Zero Trust-architectuur voor AI-infrastructuur
Zero trust-principes implementeren en aanvallen over ML-trainingspijplijnen, inferentie-endpoints en modelregisters
Overzicht
Zero trust-architectuur (ZTA) werkt vanuit het principe dat geen enkele netwerklocatie, gebruiker of systeem inherent vertrouwd zou moeten worden. Elk toegangsverzoek moet worden geauthenticeerd, geautoriseerd en continu gevalideerd, ongeacht waar het vandaan komt. Hoewel zero trust breed is toegepast op traditionele bedrijfsinfrastructuur, introduceert de toepassing ervan op AI-systemen unieke uitdagingen die gaten creëren die aanvallers kunnen misbruiken.
AI-infrastructuur heeft kenmerken die zero trust-implementaties onder druk zetten. Trainingsclusters vereisen GPU-naar-GPU-communicatie met hoge bandbreedte en lage latentie (vaak via RDMA/InfiniBand) die moeilijk te onderscheppen en te inspecteren is zonder onaanvaardbare prestatieoverhead. Modelartefacten zijn groot (honderden gigabytes voor moderne LLM's) en moeten worden overgedragen tussen registers, trainingssystemen en serving-infrastructuur — wat druk creëert om beveiligingscontroles te omzeilen ten gunste van prestaties. Feature stores, experiment trackers en datapijplijnen gebruiken vaak service-naar-service-authenticatie met langlevende credentials, omdat de overhead van token-rotatie als onpraktisch wordt gezien voor langlopende trainingstaken.
Inferentie-endpoints moeten in milliseconden reageren, waardoor autorisatiecontroles per verzoek een prestatieprobleem worden.
Deze spanningen tussen beveiliging en prestaties creëren voorspelbare gaten in zero trust-implementaties die red teamers kunnen identificeren en misbruiken. Dit artikel onderzoekt hoe je zero trust-principes toepast op AI-infrastructuur, waar implementaties doorgaans tekortschieten en hoe aanvallers die gaten als doelwit nemen. De inhoud sluit aan op NIST SP 800-207 (Zero Trust Architecture) en NIST AI RMF voor AI-specifieke risico-overwegingen.
Zero trust-principes toegepast op AI-infrastructuur
Identity: elk component krijgt een verifieerbare identiteit
In een zero trust-AI-infrastructuur moet elk component — van trainingstaken tot inferentie-endpoints tot datapijplijnen — een cryptografisch verifieerbare identiteit hebben. Dit gaat verder dan gebruikersauthenticatie en omvat workload identity voor geautomatiseerde processen.
SPIFFE (Secure Production Identity Framework For Everyone) biedt een standaard voor workload identity die goed past bij AI-infrastructuur. Elke workload ontvangt een SPIFFE Verifiable Identity Document (SVID), doorgaans een X.509-certificaat, dat zijn identiteit codeert als een URI (bijv. spiffe://ai-platform/training/job-12345).
"""
SPIFFE-based workload identity verification for AI pipeline components.
Demonstrates how to verify that a training job, model registry, or
inference endpoint has a valid identity before allowing access.
"""
import ssl
import socket
import json
from dataclasses import dataclass
from typing import Optional
from urllib.parse import urlparse
from cryptography import x509
from cryptography.x509.oid import ExtensionOID, NameOID
from cryptography.hazmat.primitives import hashes
@dataclass
class WorkloadIdentity:
"""Parsed SPIFFE identity from an X.509 SVID."""
spiffe_id: str
trust_domain: str
workload_path: str
certificate_hash: str
not_valid_after: str
@property
def component_type(self) -> str:
"""Extract AI component type from SPIFFE path."""
parts = self.workload_path.strip("/").split("/")
if parts:
return parts[0] # bijv. "training", "inference", "registry"
return "unknown"
def extract_spiffe_id(cert: x509.Certificate) -> Optional[WorkloadIdentity]:
"""
Extract SPIFFE ID from X.509 certificate SAN extension.
SPIFFE IDs are encoded as URI SANs in the format:
spiffe://<trust-domain>/<workload-path>
"""
try:
san_ext = cert.extensions.get_extension_for_oid(
ExtensionOID.SUBJECT_ALTERNATIVE_NAME
)
san = san_ext.value
for uri in san.get_values_for_type(x509.UniformResourceIdentifier):
if uri.startswith("spiffe://"):
parsed = urlparse(uri)
cert_hash = cert.fingerprint(hashes.SHA256()).hex()
return WorkloadIdentity(
spiffe_id=uri,
trust_domain=parsed.hostname or "",
workload_path=parsed.path,
certificate_hash=cert_hash,
not_valid_after=str(cert.not_valid_after_utc),
)
except x509.ExtensionNotFound:
return None
return None
class AIZeroTrustVerifier:
"""
Verify workload identity and enforce access policies
for AI infrastructure components.
"""
# Toegangscontrolematrix: welke componenten welke kunnen benaderen
ACCESS_POLICIES = {
"training": {
"allowed_targets": [
"data-store",
"registry",
"experiment-tracker",
"feature-store",
],
"denied_targets": ["inference", "monitoring-admin"],
},
"inference": {
"allowed_targets": [
"registry", # Alleen-lezen voor het laden van modellen
"feature-store", # Voor het ophalen van features
],
"denied_targets": [
"training",
"data-store", # Inferentie zou geen ruwe trainingsdata mogen benaderen
],
},
"registry": {
"allowed_targets": ["data-store"],
"denied_targets": ["training", "inference"],
},
"pipeline": {
"allowed_targets": [
"training",
"registry",
"data-store",
"feature-store",
],
"denied_targets": ["inference"],
},
}
def __init__(self, trust_domain: str):
self.trust_domain = trust_domain
def verify_access(
self,
source: WorkloadIdentity,
target_component: str,
) -> tuple[bool, str]:
"""
Verify if a source workload is allowed to access a target component.
Returns (allowed, reason).
"""
# Verifieer trust domain
if source.trust_domain != self.trust_domain:
return False, (
f"Trust domain mismatch: {source.trust_domain} "
f"!= {self.trust_domain}"
)
# Zoek policy op voor het componenttype van de bron
policy = self.ACCESS_POLICIES.get(source.component_type)
if policy is None:
return False, (
f"No policy defined for component type: "
f"{source.component_type}"
)
if target_component in policy.get("denied_targets", []):
return False, (
f"{source.component_type} is explicitly denied "
f"access to {target_component}"
)
if target_component in policy.get("allowed_targets", []):
return True, "Access permitted by policy"
# Default deny
return False, (
f"No explicit allow for {source.component_type} -> "
f"{target_component}"
)
def audit_connection(
self,
peer_cert_pem: bytes,
target_component: str,
) -> dict:
"""
Full audit of an incoming connection:
1. Parse certificate
2. Extract SPIFFE identity
3. Check access policy
"""
cert = x509.load_pem_x509_certificate(peer_cert_pem)
identity = extract_spiffe_id(cert)
if identity is None:
return {
"allowed": False,
"reason": "No SPIFFE ID in certificate",
"identity": None,
}
allowed, reason = self.verify_access(identity, target_component)
return {
"allowed": allowed,
"reason": reason,
"identity": {
"spiffe_id": identity.spiffe_id,
"component_type": identity.component_type,
"cert_hash": identity.certificate_hash,
},
}Microsegmentatie voor AI-netwerken
AI-trainingsclusters gebruiken doorgaans interconnects met hoge snelheid (InfiniBand, RoCE) voor GPU-naar-GPU-communicatie tijdens gedistribueerde training. Deze netwerken worden vaak als vertrouwd behandeld vanwege de prestatiegevoeligheid van collectieve operaties (AllReduce, AllGather). Dit creëert een aanzienlijke blinde vlek in zero trust-implementaties.
Het InfiniBand-vertrouwensgat: InfiniBand-netwerken die in GPU-clusters worden gebruikt, ondersteunen niet dezelfde handhaving van netwerkbeleid die beschikbaar is in op Ethernet gebaseerde Kubernetes-netwerken. Tools zoals Calico en Cilium kunnen microsegmentatie afdwingen voor pod-naar-pod-Ethernetverkeer, maar InfiniBand-verkeer omzeilt de kernel-netwerkstack volledig via RDMA, waardoor het onzichtbaar is voor op eBPF gebaseerd netwerkbeleid.
"""
Audit script for identifying zero trust gaps in AI infrastructure
network segmentation, with focus on GPU cluster interconnects.
"""
import subprocess
import json
import re
from typing import Optional
def audit_kubernetes_network_policies(namespace: str = "ml-platform") -> list[dict]:
"""
Audit Kubernetes network policies for AI workload namespaces.
Identifies missing policies that would allow unrestricted
communication between components.
"""
findings = []
# Haal alle pods in de namespace op
result = subprocess.run(
["kubectl", "get", "pods", "-n", namespace, "-o", "json"],
capture_output=True, text=True, timeout=30,
)
pods = json.loads(result.stdout)
# Haal netwerkbeleid op
result = subprocess.run(
["kubectl", "get", "networkpolicies", "-n", namespace, "-o", "json"],
capture_output=True, text=True, timeout=30,
)
policies = json.loads(result.stdout)
# Controleer of default-deny bestaat
has_default_deny = any(
policy["metadata"]["name"].startswith("default-deny")
for policy in policies.get("items", [])
)
if not has_default_deny:
findings.append({
"severity": "HIGH",
"title": f"No default-deny policy in namespace {namespace}",
"detail": (
"Without a default-deny ingress/egress policy, all pods "
"can communicate freely. AI components (training, inference, "
"registry) should be isolated by default."
),
})
# Controleer op pods met host networking
for pod in pods.get("items", []):
pod_name = pod["metadata"]["name"]
spec = pod.get("spec", {})
if spec.get("hostNetwork", False):
findings.append({
"severity": "HIGH",
"title": f"Pod {pod_name} uses host networking",
"detail": (
"Host networking bypasses all Kubernetes network "
"policies. This pod has unrestricted network access "
"to the node and potentially the InfiniBand fabric."
),
})
# Controleer op geprivilegieerde containers (gebruikelijk bij GPU-workloads)
for container in spec.get("containers", []):
sec_ctx = container.get("securityContext", {})
if sec_ctx.get("privileged", False):
findings.append({
"severity": "HIGH",
"title": (
f"Privileged container: {pod_name}/"
f"{container['name']}"
),
"detail": (
"Privileged containers can access all host "
"devices including InfiniBand HCAs, bypass "
"network namespaces, and escape container "
"isolation."
),
})
# Controleer op toegang tot RDMA/InfiniBand-devices
for pod in pods.get("items", []):
pod_name = pod["metadata"]["name"]
for container in pod.get("spec", {}).get("containers", []):
resources = container.get("resources", {})
limits = resources.get("limits", {})
requests = resources.get("requests", {})
all_resources = {**limits, **requests}
for resource_name in all_resources:
if "rdma" in resource_name or "infiniband" in resource_name:
findings.append({
"severity": "MEDIUM",
"title": (
f"RDMA device access: {pod_name}/"
f"{container['name']}"
),
"detail": (
f"Container requests {resource_name}. RDMA "
f"traffic bypasses kernel networking and is "
f"not subject to NetworkPolicy enforcement."
),
})
return findings
def check_service_mesh_coverage(namespace: str = "ml-platform") -> list[dict]:
"""
Verify that a service mesh (Istio/Linkerd) covers AI workloads
and that mTLS is enforced.
"""
findings = []
# Controleer op Istio sidecar-injectie
result = subprocess.run(
[
"kubectl", "get", "pods", "-n", namespace,
"-o", "jsonpath={range .items[*]}{.metadata.name}{"
"\\t}{.spec.containers[*].name}{\\n}{end}",
],
capture_output=True, text=True, timeout=30,
)
for line in result.stdout.strip().split("\n"):
if not line.strip():
continue
parts = line.split("\t")
if len(parts) < 2:
continue
pod_name = parts[0]
containers = parts[1].split()
has_sidecar = any(
c in containers
for c in ["istio-proxy", "linkerd-proxy", "envoy-sidecar"]
)
if not has_sidecar:
findings.append({
"severity": "MEDIUM",
"title": f"No service mesh sidecar: {pod_name}",
"detail": (
"This pod communicates without mTLS enforcement. "
"Traffic can be intercepted or spoofed by adjacent "
"workloads."
),
})
# Controleer Istio PeerAuthentication-policy
result = subprocess.run(
[
"kubectl", "get", "peerauthentication", "-n", namespace,
"-o", "json",
],
capture_output=True, text=True, timeout=30,
)
if result.returncode == 0:
pa_policies = json.loads(result.stdout)
strict_mtls = any(
policy.get("spec", {}).get("mtls", {}).get("mode") == "STRICT"
for policy in pa_policies.get("items", [])
)
if not strict_mtls:
findings.append({
"severity": "HIGH",
"title": "mTLS not set to STRICT mode",
"detail": (
"PERMISSIVE mTLS allows plaintext connections. "
"An attacker in the mesh can intercept inference "
"requests, model weights, and training data."
),
})
return findingsAanvallen op zero trust-gaten in AI-pijplijnen
Misbruik van impliciet vertrouwen tussen pijplijnfasen
ML-pijplijnen (gebouwd met tools zoals Kubeflow, Airflow of aangepaste systemen) brengen vaak impliciet vertrouwen tot stand tussen fasen. Een trainingsfase produceert een modelartefact dat de evaluatiefase consumeert, en de goedkeuring van de evaluatiefase triggert de deployment. Als de pijplijn artefacten van eerdere fasen vertrouwt zonder verificatie, kan een aanvaller die een willekeurige enkele fase compromitteert, zich door de hele pijplijn verspreiden.
"""
Demonstrate trust boundary violations in ML pipelines.
This script identifies pipeline stages that accept artifacts
from upstream stages without integrity verification.
"""
import hashlib
import json
import os
from pathlib import Path
from dataclasses import dataclass
from typing import Optional
@dataclass
class PipelineArtifact:
"""Represents an artifact passed between pipeline stages."""
stage_name: str
artifact_path: str
expected_hash: Optional[str]
actual_hash: Optional[str]
is_signed: bool
signature_valid: Optional[bool]
def audit_pipeline_artifacts(
pipeline_run_dir: str,
) -> list[dict]:
"""
Audit artifacts in a pipeline run directory for
integrity verification gaps.
"""
findings = []
run_path = Path(pipeline_run_dir)
if not run_path.exists():
return [{"severity": "ERROR", "title": "Pipeline run directory not found",
"detail": f"{pipeline_run_dir} does not exist"}]
# Zoek naar veelvoorkomende metadatabestanden van pijplijnen
metadata_files = list(run_path.rglob("**/metadata.json")) + \
list(run_path.rglob("**/artifact_info.json"))
for meta_file in metadata_files:
try:
with open(meta_file) as f:
metadata = json.load(f)
except (json.JSONDecodeError, IOError):
continue
stage_name = metadata.get("stage", meta_file.parent.name)
artifacts = metadata.get("output_artifacts", [])
for artifact in artifacts:
art_path = artifact.get("path", "")
has_hash = "sha256" in artifact or "hash" in artifact
has_signature = "signature" in artifact
if not has_hash:
findings.append({
"severity": "HIGH",
"title": f"No integrity hash: {stage_name}/{art_path}",
"detail": (
f"Artifact from stage '{stage_name}' has no hash. "
f"A compromised upstream stage could substitute "
f"a malicious artifact (e.g., poisoned model weights)."
),
})
if not has_signature:
findings.append({
"severity": "MEDIUM",
"title": f"No signature: {stage_name}/{art_path}",
"detail": (
f"Artifact is not cryptographically signed. "
f"Even with a hash, the hash itself could be "
f"modified by a compromised pipeline controller."
),
})
# Verifieer hash indien aanwezig
if has_hash:
expected = artifact.get("sha256") or artifact.get("hash")
full_path = run_path / art_path
if full_path.exists():
actual = hashlib.sha256(
full_path.read_bytes()
).hexdigest()
if actual != expected:
findings.append({
"severity": "CRITICAL",
"title": (
f"Hash mismatch: {stage_name}/{art_path}"
),
"detail": (
f"Expected {expected}, got {actual}. "
f"Artifact may have been tampered with."
),
})
# Controleer op het doorgeven van credentials tussen fasen
env_files = list(run_path.rglob("**/.env")) + \
list(run_path.rglob("**/secrets.*"))
for env_file in env_files:
findings.append({
"severity": "HIGH",
"title": f"Credentials in pipeline artifacts: {env_file}",
"detail": (
"Secrets stored in pipeline artifacts can be accessed "
"by downstream stages and persisted in artifact storage."
),
})
return findingsToken- en credential-aanvallen
Langlopende trainingstaken gebruiken vaak service account-tokens of API-sleutels met verlengde geldigheid. In zero trust-architecturen zouden deze kortlevend en continu gevalideerd moeten zijn. Veelvoorkomende gaten zijn:
- Statische service account-tokens in Kubernetes die niet verlopen (standaardgedrag vóór v1.24)
- Cloud IAM-rollen met te brede rechten gekoppeld aan trainingsnode-pools
- Modelregister-credentials ingebed in pijplijnconfiguraties
- Experiment tracking-API-sleutels gedeeld door alle teamleden
Een aanvaller die de credentials van een trainingstaak bemachtigt, krijgt toegang tot alles waartoe die trainingstaak toegang heeft: trainingsdata, het modelregister, experiment tracking en mogelijk andere cloud-resources via role chaining of federatie.
"""
Credential exposure analysis for AI workloads in Kubernetes.
Identifies overly broad credentials, long-lived tokens, and
credential sharing patterns that violate zero trust principles.
"""
import subprocess
import json
import base64
from typing import Any
def audit_ai_credentials(namespace: str = "ml-platform") -> list[dict]:
"""
Audit credentials available to AI workloads for zero trust
violations: excessive scope, long validity, and sharing.
"""
findings = []
# Haal alle service accounts in de namespace op
result = subprocess.run(
["kubectl", "get", "serviceaccounts", "-n", namespace, "-o", "json"],
capture_output=True, text=True, timeout=30,
)
if result.returncode != 0:
return findings
service_accounts = json.loads(result.stdout)
for sa in service_accounts.get("items", []):
sa_name = sa["metadata"]["name"]
# Controleer op gemounte secrets
secrets = sa.get("secrets", [])
if len(secrets) > 0:
findings.append({
"severity": "MEDIUM",
"title": f"Service account has bound secrets: {sa_name}",
"detail": (
f"SA {sa_name} has {len(secrets)} bound secrets. "
f"In zero trust, prefer projected service account tokens "
f"with expiration over static secrets."
),
})
# Controleer annotaties op cloud IAM-bindingen
annotations = sa.get("metadata", {}).get("annotations", {})
# GKE Workload Identity
gke_sa = annotations.get(
"iam.gke.io/gcp-service-account", ""
)
if gke_sa:
findings.append({
"severity": "INFO",
"title": f"GKE Workload Identity binding: {sa_name}",
"detail": f"Bound to GCP SA: {gke_sa}. Verify scope is minimal.",
})
# EKS IRSA
eks_role = annotations.get(
"eks.amazonaws.com/role-arn", ""
)
if eks_role:
findings.append({
"severity": "INFO",
"title": f"EKS IRSA binding: {sa_name}",
"detail": f"Bound to IAM role: {eks_role}. Verify role policy scope.",
})
# Controleer op pods met credentials in omgevingsvariabelen
pods_result = subprocess.run(
["kubectl", "get", "pods", "-n", namespace, "-o", "json"],
capture_output=True, text=True, timeout=30,
)
if pods_result.returncode == 0:
pods = json.loads(pods_result.stdout)
sensitive_env_patterns = [
"KEY", "SECRET", "PASSWORD", "TOKEN", "CREDENTIAL",
"API_KEY", "ACCESS_KEY", "PRIVATE_KEY",
]
for pod in pods.get("items", []):
pod_name = pod["metadata"]["name"]
for container in pod.get("spec", {}).get("containers", []):
for env in container.get("env", []):
env_name = env.get("name", "").upper()
if any(p in env_name for p in sensitive_env_patterns):
# Controleer of het van een secret-referentie komt (beter)
# of een plaintext-waarde (slechter)
if "value" in env and env["value"]:
findings.append({
"severity": "CRITICAL",
"title": (
f"Hardcoded credential: {pod_name} "
f"env {env['name']}"
),
"detail": (
"Credential is hardcoded in pod spec "
"as a plaintext value. Use Kubernetes "
"secrets with projected volumes or "
"external secret managers."
),
})
elif "valueFrom" in env:
source = env["valueFrom"]
if "secretKeyRef" in source:
findings.append({
"severity": "LOW",
"title": (
f"Secret-backed credential: "
f"{pod_name} env {env['name']}"
),
"detail": (
f"From secret: "
f"{source['secretKeyRef'].get('name')}. "
f"Verify rotation policy."
),
})
return findingsContinue verificatie en device posture
Zero trust-architecturen vereisen continue verificatie — niet slechts eenmalig authenticeren op het moment van verbinding. Voor AI-workloads betekent dit:
- Runtime-integriteitscontrole: Verifieer dat het trainingsscript, de modelhandler of de serving-binary niet is gewijzigd sinds de deployment. Container image-digests zouden geverifieerd moeten worden bij pod-admissie en periodiek tijdens runtime.
- Node attestation: GPU-nodes zouden hun integriteit moeten attesteren voordat ze worden vertrouwd met gevoelige modelgewichten of trainingsdata. Hardwaregebaseerde attestation (TPM, TEE-attestation) biedt sterkere garanties dan controles die alleen op software gebaseerd zijn.
- Gedragsmonitoring: Monitor het gedrag van AI-workloads continu op anomalieën. Een trainingstaak die plotseling uitgaande netwerkverbindingen begint te maken die hij nog nooit heeft gemaakt, of een inferentie-endpoint waarvan de latentiedistributie van de respons drastisch verandert, kan gecompromitteerd zijn.
- Token-vernieuwing onder herevaluatie van beleid: Wanneer credentials worden vernieuwd, zou de autorisatiebeslissing opnieuw geëvalueerd moeten worden tegen het huidige beleid. Dit zorgt ervoor dat beleidswijzigingen (zoals het intrekken van de toegang van een team tot een model) van kracht worden binnen de levensduur van het token.
Praktische voorbeelden
Zero trust-compliancechecker voor AI-platforms
#!/usr/bin/env bash
# Zero trust compliance audit for AI infrastructure on Kubernetes
# Checks for common violations of zero trust principles
set -euo pipefail
NAMESPACE="${1:-ml-platform}"
echo "=== Zero Trust Audit: Namespace $NAMESPACE ==="
echo ""
echo "--- 1. Default Deny Network Policies ---"
DENY_POLICIES=$(kubectl get networkpolicies -n "$NAMESPACE" \
-o jsonpath='{range .items[*]}{.metadata.name}{"\n"}{end}' 2>/dev/null \
| grep -c "default-deny" || true)
if [ "$DENY_POLICIES" -eq 0 ]; then
echo "[FAIL] No default-deny network policy found"
else
echo "[PASS] Default-deny policy exists"
fi
echo ""
echo "--- 2. Service Mesh mTLS ---"
STRICT_MTLS=$(kubectl get peerauthentication -n "$NAMESPACE" \
-o jsonpath='{range .items[*]}{.spec.mtls.mode}{"\n"}{end}' 2>/dev/null \
| grep -c "STRICT" || true)
if [ "$STRICT_MTLS" -eq 0 ]; then
echo "[FAIL] No STRICT mTLS PeerAuthentication policy"
else
echo "[PASS] STRICT mTLS enforced"
fi
echo ""
echo "--- 3. Service Account Token Projection ---"
# Controleer op pods die legacy niet-verlopende tokens gebruiken
LEGACY_TOKENS=$(kubectl get pods -n "$NAMESPACE" -o json 2>/dev/null \
| python3 -c "
import json, sys
data = json.load(sys.stdin)
count = 0
for pod in data.get('items', []):
for vol in pod.get('spec', {}).get('volumes', []):
if 'secret' in vol and 'token' in vol.get('secret', {}).get('secretName', '').lower():
count += 1
print(f' Legacy token: {pod[\"metadata\"][\"name\"]}')
print(f'Total: {count}')
" 2>/dev/null)
echo "$LEGACY_TOKENS"
echo ""
echo "--- 4. Privileged Containers (Zero Trust Violation) ---"
kubectl get pods -n "$NAMESPACE" -o json 2>/dev/null \
| python3 -c "
import json, sys
data = json.load(sys.stdin)
for pod in data.get('items', []):
for c in pod.get('spec', {}).get('containers', []):
sc = c.get('securityContext', {})
if sc.get('privileged'):
print(f' [FAIL] {pod[\"metadata\"][\"name\"]}/{c[\"name\"]} is privileged')
if sc.get('runAsUser') == 0:
print(f' [WARN] {pod[\"metadata\"][\"name\"]}/{c[\"name\"]} runs as root')
" 2>/dev/null
echo ""
echo "--- 5. External Access Points ---"
echo "Services with LoadBalancer or NodePort (exposed externally):"
kubectl get svc -n "$NAMESPACE" -o json 2>/dev/null \
| python3 -c "
import json, sys
data = json.load(sys.stdin)
for svc in data.get('items', []):
svc_type = svc.get('spec', {}).get('type', 'ClusterIP')
if svc_type in ('LoadBalancer', 'NodePort'):
name = svc['metadata']['name']
ports = svc['spec'].get('ports', [])
port_str = ', '.join(str(p.get('port', '?')) for p in ports)
print(f' [WARN] {name} ({svc_type}): ports {port_str}')
" 2>/dev/null
echo ""
echo "=== Audit Complete ==="Verdediging en tegenmaatregelen
Implementeer overal workload identity: Gebruik SPIFFE/SPIRE of cloud-native workload identity (GKE Workload Identity, EKS IRSA) voor alle AI-componenten. Elimineer statische credentials en service account-sleutels. Trainingstaken zouden kortlevende tokens moeten gebruiken die automatisch geroteerd worden.
Dwing mTLS af voor alle service-naar-service-communicatie: Deploy een service mesh (Istio, Linkerd) in STRICT mTLS-modus. Implementeer voor hoogperformante trainingsnetwerken die RDMA/InfiniBand gebruiken versleuteling op applicatieniveau waar kernel-bypass-netwerken handhaving op mesh-niveau verhinderen.
Pas default-deny-netwerkbeleid toe: Elke AI-namespace zou een default-deny ingress- en egress-beleid moeten hebben. Sta expliciet alleen vereiste communicatiepaden toe: training naar datastore, inferentie naar modelregister, pijplijncontroller naar individuele fasen.
Verifieer artefacten bij elke grens: Elke pijplijnfase zou de integriteit van inkomende artefacten moeten verifiëren met cryptografische handtekeningen, niet alleen met hashes. Gebruik tools zoals Sigstore/cosign voor het ondertekenen en verifiëren van modelartefacten.
Kortlevende credentials met continue validatie: Trainingstaken zouden credentials moeten ontvangen die verlopen voordat de taak is voltooid, waardoor vernieuwing via een token exchange nodig is die de autorisatie opnieuw evalueert. Dit beperkt de blast radius van credential-diefstal.
Monitor en alarmeer op beleidsovertredingen: Implementeer continue compliancemonitoring die wijzigingen in netwerkbeleid, nieuwe geprivilegieerde workloads, omzeiling van de service mesh en credential-anomalieën detecteert. Integreer met SIEM voor correlatie met andere beveiligingsgebeurtenissen.
Implementeer zero trust op dataniveau: Pas naast netwerk en identiteit zero trust-principes toe op de data zelf. Trainingsdata zou metadata moeten dragen over de herkomst en integriteit ervan. Modelartefacten zouden ondertekend en geverifieerd moeten worden bij elk laadpunt. Inferentie-invoer en -uitvoer zouden gevalideerd moeten worden tegen verwachte schema's en distributies. Deze zero trust-aanpak op dataniveau vangt aanvallen op die netwerkcontroles omzeilen, zoals datavergiftiging via legitieme pijplijncomponenten.
Segmenteer op gevoeligheidsniveau: Niet alle AI-workloads vereisen dezelfde beveiligingshouding. Een model dat productafbeeldingen classificeert heeft een ander risico dan een model dat medische dossiers verwerkt. Implementeer gelaagde zero trust-zones waar de strengste controles (hardware-attestation, versleutelde inferentie, volledig geïsoleerde netwerken) gereserveerd zijn voor de meest gevoelige AI-workloads, terwijl minder gevoelige workloads lichtere controles gebruiken. Dit voorkomt dat de prestatieoverhead van maximale beveiliging een barrière voor adoptie wordt.
Test zero trust-controles regelmatig: Zero trust-architecturen degraderen na verloop van tijd naarmate uitzonderingen zich opstapelen, configuraties wegdrijven en nieuwe componenten worden toegevoegd zonder goede integratie. Plan regelmatige penetratietesten die specifiek gericht zijn op zero trust-grenzen — probeer laterale beweging van training naar inferentie, van de ene tenant naar de andere, van een gecompromitteerde container naar de host, en van een interne positie naar externe data-exfiltratie. Elke test valideert dat de controles daadwerkelijk werken, niet alleen dat ze geconfigureerd zijn.
Referenties
- Rose, S., Borchert, O., Mitchell, S., & Connelly, S. (2020). "Zero Trust Architecture." NIST Special Publication 800-207. https://doi.org/10.6028/NIST.SP.800-207
- SPIFFE. (2024). "Secure Production Identity Framework for Everyone." https://spiffe.io/docs/latest/spiffe-about/overview/
- NIST. (2023). "Artificial Intelligence Risk Management Framework (AI RMF 1.0)." https://www.nist.gov/artificial-intelligence/executive-order-safe-secure-and-trustworthy-artificial-intelligence
- MITRE ATLAS. "Techniques: ML Supply Chain Compromise." https://atlas.mitre.org/techniques/AML.T0010
- Google. (2024). "BeyondCorp: A New Approach to Enterprise Security." https://cloud.google.com/beyondcorp
- Kubernetes. (2024). "Network Policies." https://kubernetes.io/docs/concepts/services-networking/network-policies/