AI 基礎設施的零信任架構
在 ML 訓練管線、推論端點與模型登錄檔中實作並攻擊零信任原則。
概觀
零信任架構(ZTA)的核心原則是:沒有任何網路位置、使用者或系統應該被天然信任。每一次存取請求,無論來自何處,都必須經過認證、授權並持續驗證。零信任雖已廣泛應用於傳統企業基礎設施,但應用於 AI 系統時衍生出獨特挑戰,這些挑戰形成可供攻擊者利用的缺口。
AI 基礎設施具有一些使零信任實作備受壓力的特性。訓練叢集需要高頻寬、低延遲的 GPU 對 GPU 通訊(通常透過 RDMA/InfiniBand),若欲攔截檢查將引入無法接受的效能負擔。模型工件龐大(現代 LLM 可達數百 GB),必須在登錄檔、訓練系統與服務基礎設施之間傳輸——為了效能而繞過安全控制的誘惑隨之而來。特徵儲存、實驗追蹤與資料管線往往採用具長效憑證的服務對服務認證,因為對長時間執行的訓練工作而言,權杖輪替被視為不切實際。
推論端點必須在毫秒內回應,使每次請求的授權檢查成為效能顧慮。
這些安全與效能之間的張力在零信任實作中形成可預期的缺口,紅隊可據以辨識並利用。本文檢視如何將零信任原則套用於 AI 基礎設施、實作通常在哪些環節不足,以及攻擊者如何針對這些缺口。內容與 NIST SP 800-207(零信任架構)及 NIST AI RMF 中 AI 特有風險考量對齊。
零信任原則套用於 AI 基礎設施
身分:每個元件皆具備可驗證的身分
在零信任 AI 基礎設施中,每個元件——從訓練工作、推論端點到資料管線——都必須擁有以密碼學可驗證的身分。這超越使用者認證,延伸至自動化流程的工作負載身分。
SPIFFE(Secure Production Identity Framework For Everyone) 提供一套非常適用於 AI 基礎設施的工作負載身分標準。每個工作負載都會取得一份 SPIFFE 可驗證身分文件(SVID),通常為 X.509 憑證,將其身分編碼為 URI(例如 spiffe://ai-platform/training/job-12345)。
"""
SPIFFE-based workload identity verification for AI pipeline components.
Demonstrates how to verify that a training job, model registry, or
inference endpoint has a valid identity before allowing access.
"""
import ssl
import socket
import json
from dataclasses import dataclass
from typing import Optional
from urllib.parse import urlparse
from cryptography import x509
from cryptography.x509.oid import ExtensionOID, NameOID
from cryptography.hazmat.primitives import hashes
@dataclass
class WorkloadIdentity:
"""Parsed SPIFFE identity from an X.509 SVID."""
spiffe_id: str
trust_domain: str
workload_path: str
certificate_hash: str
not_valid_after: str
@property
def component_type(self) -> str:
"""Extract AI component type from SPIFFE path."""
parts = self.workload_path.strip("/").split("/")
if parts:
return parts[0] # e.g., "training", "inference", "registry"
return "unknown"
def extract_spiffe_id(cert: x509.Certificate) -> Optional[WorkloadIdentity]:
"""
Extract SPIFFE ID from X.509 certificate SAN extension.
SPIFFE IDs are encoded as URI SANs: spiffe://<trust-domain>/<workload>
"""
try:
san_ext = cert.extensions.get_extension_for_oid(
ExtensionOID.SUBJECT_ALTERNATIVE_NAME
)
for uri in san_ext.value.get_values_for_type(
x509.UniformResourceIdentifier
):
if uri.startswith("spiffe://"):
parsed = urlparse(uri)
return WorkloadIdentity(
spiffe_id=uri,
trust_domain=parsed.hostname or "",
workload_path=parsed.path,
certificate_hash=cert.fingerprint(hashes.SHA256()).hex(),
not_valid_after=str(cert.not_valid_after_utc),
)
except x509.ExtensionNotFound:
return None
return None
class AIZeroTrustVerifier:
"""Enforce zero trust access policies for AI components."""
ACCESS_POLICIES = {
"training": {
"allowed_targets": [
"data-store", "registry",
"experiment-tracker", "feature-store",
],
"denied_targets": ["inference", "monitoring-admin"],
},
"inference": {
"allowed_targets": ["registry", "feature-store"],
"denied_targets": ["training", "data-store"],
},
"registry": {
"allowed_targets": ["data-store"],
"denied_targets": ["training", "inference"],
},
"pipeline": {
"allowed_targets": [
"training", "registry",
"data-store", "feature-store",
],
"denied_targets": ["inference"],
},
}
def __init__(self, trust_domain: str):
self.trust_domain = trust_domain
def verify_access(
self, source: WorkloadIdentity, target_component: str,
) -> tuple[bool, str]:
if source.trust_domain != self.trust_domain:
return False, f"Trust domain mismatch: {source.trust_domain}"
policy = self.ACCESS_POLICIES.get(source.component_type)
if policy is None:
return False, f"No policy for {source.component_type}"
if target_component in policy.get("denied_targets", []):
return False, f"Denied: {source.component_type} -> {target_component}"
if target_component in policy.get("allowed_targets", []):
return True, "Access permitted by policy"
return False, "No explicit allow (default deny)"此實作展示零信任的幾個核心設計:(1)從 X.509 SVID 的 SAN 中擷取 SPIFFE URI 作為身分;(2)以信任網域防止跨環境憑證混淆;(3)使用以元件類別為鍵的存取矩陣,明確列出允許與拒絕的目標;(4)採「預設拒絕」策略——未明言允許者一律拒絕。例如推論 Pod 不得讀取原始訓練資料儲存,訓練工作則不得連線至推論端點,這樣可防止在任一方遭入侵時橫向擴散。
AI 網路的微分段
AI 訓練叢集通常使用高速互連(InfiniBand、RoCE)進行分散式訓練的 GPU 對 GPU 通訊。由於集合運算(AllReduce、AllGather)的效能敏感度,這些網路經常被視為可信,從而在零信任實作中形成顯著盲點。
InfiniBand 信任缺口:GPU 叢集所用的 InfiniBand 網路,並不支援與乙太網路 Kubernetes 網路相同層級的網路策略執行。Calico、Cilium 等工具可為 pod 間的乙太網流量執行微分段,但 InfiniBand 流量透過 RDMA 完全繞過核心網路堆疊,讓基於 eBPF 的網路策略完全看不見。
"""Audit Kubernetes network policies and pod configuration for zero
trust gaps in GPU cluster interconnects."""
import subprocess
import json
def audit_kubernetes_network_policies(
namespace: str = "ml-platform",
) -> list[dict]:
findings = []
pods = json.loads(subprocess.run(
["kubectl", "get", "pods", "-n", namespace, "-o", "json"],
capture_output=True, text=True, timeout=30,
).stdout)
policies = json.loads(subprocess.run(
["kubectl", "get", "networkpolicies", "-n", namespace, "-o", "json"],
capture_output=True, text=True, timeout=30,
).stdout)
# 1. Missing default-deny is a HIGH severity finding:
# without it, all pods communicate freely.
has_default_deny = any(
p["metadata"]["name"].startswith("default-deny")
for p in policies.get("items", [])
)
if not has_default_deny:
findings.append({"severity": "HIGH",
"title": f"No default-deny in {namespace}"})
# 2. Host networking bypasses all NetworkPolicies; flag HIGH.
# 3. Privileged containers (common for GPU workloads) escape isolation.
# 4. RDMA/InfiniBand device requests bypass kernel networking entirely.
for pod in pods.get("items", []):
spec = pod.get("spec", {})
if spec.get("hostNetwork"):
findings.append({"severity": "HIGH",
"title": f"hostNetwork: {pod['metadata']['name']}"})
for c in spec.get("containers", []):
if c.get("securityContext", {}).get("privileged"):
findings.append({"severity": "HIGH",
"title": f"Privileged: {c['name']}"})
limits = {**c.get("resources", {}).get("limits", {}),
**c.get("resources", {}).get("requests", {})}
for r in limits:
if "rdma" in r or "infiniband" in r:
findings.append({"severity": "MEDIUM",
"title": f"RDMA device: {c['name']}"})
return findings
def check_service_mesh_coverage(namespace: str) -> list[dict]:
"""Verify Istio/Linkerd sidecars exist and PeerAuthentication is STRICT."""
findings = []
# Check each pod for istio-proxy / linkerd-proxy / envoy sidecar
# and check for STRICT mTLS PeerAuthentication in the namespace.
# PERMISSIVE mTLS allows plaintext — treat as HIGH severity.
return findings稽核重點歸納為:缺少 default-deny 策略、使用 hostNetwork、特權容器、RDMA/InfiniBand 裝置請求(這些流量繞過核心網路)、缺少服務網格 sidecar 注入,以及 PeerAuthentication 未設為 STRICT 模式(PERMISSIVE 允許明文)。這些缺口共同造成攻擊者可在 ML 平台內橫向移動並攔截推論請求、模型權重與訓練資料。
攻擊 AI 管線中的零信任缺口
利用管線階段間的隱含信任
ML 管線(使用 Kubeflow、Airflow 或自訂系統建置)經常在各階段間建立隱含信任。訓練階段產出模型工件,由評估階段消費,評估階段的核可則觸發部署。若管線未驗證即信任前一階段的工件,只要攻擊者入侵任一階段即可沿管線擴散。
"""Audit ML pipeline artifacts for trust boundary violations
across stages (Kubeflow, Airflow, custom systems)."""
import hashlib
import json
from pathlib import Path
def audit_pipeline_artifacts(pipeline_run_dir: str) -> list[dict]:
findings = []
run_path = Path(pipeline_run_dir)
if not run_path.exists():
return [{"severity": "ERROR", "title": "run dir missing"}]
# Walk all metadata.json / artifact_info.json produced by stages.
for meta_file in (list(run_path.rglob("**/metadata.json"))
+ list(run_path.rglob("**/artifact_info.json"))):
try:
metadata = json.loads(meta_file.read_text())
except (json.JSONDecodeError, IOError):
continue
stage_name = metadata.get("stage", meta_file.parent.name)
for artifact in metadata.get("output_artifacts", []):
art_path = artifact.get("path", "")
has_hash = "sha256" in artifact or "hash" in artifact
has_sig = "signature" in artifact
if not has_hash:
findings.append({
"severity": "HIGH",
"title": f"No hash: {stage_name}/{art_path}",
"detail": "Upstream stage can substitute poisoned model.",
})
if not has_sig:
findings.append({
"severity": "MEDIUM",
"title": f"No signature: {stage_name}/{art_path}",
})
# If hash exists, recompute and compare — mismatch is CRITICAL.
if has_hash:
expected = artifact.get("sha256") or artifact.get("hash")
full = run_path / art_path
if full.exists():
actual = hashlib.sha256(full.read_bytes()).hexdigest()
if actual != expected:
findings.append({"severity": "CRITICAL",
"title": "Hash mismatch"})
# Credentials accidentally stored in artifact directories are a
# HIGH-severity finding: secrets persist into artifact storage.
for env_file in (list(run_path.rglob("**/.env"))
+ list(run_path.rglob("**/secrets.*"))):
findings.append({"severity": "HIGH",
"title": f"Credentials in artifacts: {env_file}"})
return findings此稽核程式揭示常見問題:管線工件缺少完整性雜湊、缺少密碼學簽章、雜湊不符、以及將機密(.env、secrets.*)錯誤地放入工件目錄中使下游階段得以讀取。只有對每個階段產出的模型工件進行簽章並於下游驗證(例如 Sigstore/cosign),才能打斷這條「單一入侵擴散至全管線」的路徑。
權杖與憑證攻擊
長時間執行的訓練工作通常使用效期較長的服務帳號權杖或 API 金鑰。在零信任架構中,這些憑證應為短效且持續驗證。常見缺口包括:
- Kubernetes 中不會過期的靜態服務帳號權杖(v1.24 之前預設行為)
- 雲端 IAM 角色 權限過寬,綁定於訓練節點池
- 內嵌於管線組態中的模型登錄檔憑證
- 整個團隊共用的實驗追蹤 API 金鑰
取得訓練工作憑證的攻擊者,將能存取該工作可觸及的所有資源:訓練資料、模型登錄檔、實驗追蹤,甚至可能透過角色鏈或聯合存取其他雲端資源。
"""Audit Kubernetes AI workloads for credential-related zero trust
violations: excessive scope, long validity, hardcoded secrets."""
import subprocess
import json
def audit_ai_credentials(namespace: str = "ml-platform") -> list[dict]:
findings = []
sas = json.loads(subprocess.run(
["kubectl", "get", "serviceaccounts", "-n", namespace, "-o", "json"],
capture_output=True, text=True, timeout=30,
).stdout)
for sa in sas.get("items", []):
# Bound static secrets -> MEDIUM: prefer projected tokens.
if sa.get("secrets"):
findings.append({"severity": "MEDIUM",
"title": f"Static secrets bound: "
f"{sa['metadata']['name']}"})
# Workload identity annotations (GKE / EKS IRSA) are informational;
# the operator must still verify IAM scope is minimal.
ann = sa.get("metadata", {}).get("annotations", {})
if ann.get("iam.gke.io/gcp-service-account"):
findings.append({"severity": "INFO", "title": "GKE WI binding"})
if ann.get("eks.amazonaws.com/role-arn"):
findings.append({"severity": "INFO", "title": "EKS IRSA binding"})
pods = json.loads(subprocess.run(
["kubectl", "get", "pods", "-n", namespace, "-o", "json"],
capture_output=True, text=True, timeout=30,
).stdout)
patterns = ["KEY", "SECRET", "PASSWORD", "TOKEN", "CREDENTIAL"]
for pod in pods.get("items", []):
for c in pod.get("spec", {}).get("containers", []):
for env in c.get("env", []):
name = env.get("name", "").upper()
if any(p in name for p in patterns):
# Hardcoded plaintext value = CRITICAL
# Referenced via secretKeyRef = LOW (verify rotation)
if env.get("value"):
findings.append({"severity": "CRITICAL",
"title": f"Hardcoded env: {name}"})
elif "secretKeyRef" in env.get("valueFrom", {}):
findings.append({"severity": "LOW",
"title": f"Secret-backed env: {name}"})
return findings此稽核指出:服務帳號若綁定靜態 secret(而非使用投射權杖)即為中度違規;容器中以純文字寫入的憑證環境變數為嚴重違規;透過 secretKeyRef 取用的則為低度風險但仍需檢查輪替策略。GKE Workload Identity 與 EKS IRSA 的綁定屬資訊性,但仍須人工驗證對應 GCP/IAM 角色權限是否已收斂到最小。
持續驗證與裝置姿態
零信任架構要求持續驗證——不只在連線建立時認證一次。對 AI 工作負載而言,這代表:
- 執行階段完整性檢查:驗證訓練指令稿、模型處理程式或服務二進位檔在部署後未被修改。容器映像摘要應於 pod 受納時檢查,並定期在執行階段再次驗證。
- 節點證明:GPU 節點在被信任處理敏感模型權重或訓練資料之前,應對其完整性作出證明。硬體型證明(TPM、TEE 證明)比僅軟體檢查提供更強的保證。
- 行為監控:持續監控 AI 工作負載行為異常。若訓練工作突然發起從未有過的對外網路連線,或推論端點的回應延遲分佈劇烈變化,可能已遭入侵。
- 權杖更新時重新評估策略:當憑證更新時,授權決策應根據當前策略重新評估。這確保策略變更(例如撤銷某團隊對某模型的存取權)能在權杖生命週期內生效。
實務範例
針對 AI 平台的零信任合規檢查器
#!/usr/bin/env bash
# Zero trust compliance audit for AI infrastructure on Kubernetes
set -euo pipefail
NAMESPACE="${1:-ml-platform}"
echo "=== Zero Trust Audit: Namespace $NAMESPACE ==="
# 1. Default-deny NetworkPolicy must exist
DENY=$(kubectl get networkpolicies -n "$NAMESPACE" \
-o jsonpath='{range .items[*]}{.metadata.name}{"\n"}{end}' \
| grep -c "default-deny" || true)
[ "$DENY" -eq 0 ] && echo "[FAIL] No default-deny policy" \
|| echo "[PASS] Default-deny exists"
# 2. PeerAuthentication mTLS mode must be STRICT
STRICT=$(kubectl get peerauthentication -n "$NAMESPACE" \
-o jsonpath='{range .items[*]}{.spec.mtls.mode}{"\n"}{end}' \
| grep -c "STRICT" || true)
[ "$STRICT" -eq 0 ] && echo "[FAIL] No STRICT mTLS" \
|| echo "[PASS] STRICT mTLS enforced"
# 3. Legacy non-expiring service account token volumes
# 4. Privileged containers and root UID (zero trust violation)
# 5. LoadBalancer / NodePort services expose workloads externally
# (see full script for jq-style Python parsers)上述指令稿檢查五項關鍵項目:預設拒絕網路策略是否存在、PeerAuthentication 是否為 STRICT mTLS、是否使用過舊的不過期權杖、是否有特權或以 root 執行的容器,以及是否有 LoadBalancer 或 NodePort 對外暴露的服務。每一項均對應至常見零信任控制缺口,可作為 CI/CD 或稽核工具中的快速檢查。
防禦與緩解
全面導入工作負載身分:所有 AI 元件都應採用 SPIFFE/SPIRE 或雲原生工作負載身分(GKE Workload Identity、EKS IRSA),淘汰靜態憑證與服務帳號金鑰。訓練工作應使用自動輪替的短效權杖。
對所有服務對服務通訊強制 mTLS:以 STRICT mTLS 模式部署服務網格(Istio、Linkerd)。對於使用 RDMA/InfiniBand 的高效能訓練網路,若核心旁路網路阻止網格層執行,應於應用層實作加密。
套用預設拒絕的網路策略:每個 AI 命名空間都應有預設拒絕入站與出站策略,僅明確允許必要通訊路徑:訓練對資料儲存、推論對模型登錄檔、管線控制器對各階段。
在每個邊界驗證工件:每個管線階段都應以密碼學簽章(而非僅雜湊)驗證傳入工件的完整性。使用 Sigstore/cosign 等工具進行模型工件簽章與驗證。
短效憑證搭配持續驗證:訓練工作應取得在工作結束前即將過期的憑證,並透過權杖交換重新評估授權,藉此限制憑證被竊的爆炸半徑。
監控並對策略違規發出警示:實施持續合規監控,偵測網路策略變更、新出現的特權工作負載、服務網格繞過以及憑證異常。與 SIEM 整合以與其他安全事件關聯。
實作資料層級的零信任:超越網路與身分,將零信任原則套用至資料本身。訓練資料應攜帶來源與完整性中繼資料;模型工件在每個載入點都應經過簽章與驗證;推論輸入與輸出應依預期綱要與分佈加以驗證。這種資料層零信任可攔截透過合法管線元件(例如資料投毒)繞過網路控制的攻擊。
依敏感度分段:並非所有 AI 工作負載皆需相同安全姿態。分類商品圖片的模型與處理醫療紀錄的模型風險不同。應建立分層零信任區域,將最嚴格的控制(硬體證明、加密推論、完全隔離網路)保留給最敏感的工作負載,較低敏感度的工作負載則採較輕量控制。這可避免極端安全的效能負擔成為採用障礙。
定期測試零信任控制:零信任架構會隨時間因例外累積、組態漂移與新元件未適當整合而退化。應安排定期滲透測試,具體瞄準零信任邊界:嘗試從訓練橫向移動至推論、從某租戶到另一租戶、從受汙染的容器至主機、從內部位置到外部資料外洩。每次測試驗證控制實際有效,而不僅是已設定。
參考資料
- Rose, S., Borchert, O., Mitchell, S., & Connelly, S. (2020). "Zero Trust Architecture." NIST Special Publication 800-207. https://doi.org/10.6028/NIST.SP.800-207
- SPIFFE. (2024). "Secure Production Identity Framework for Everyone." https://spiffe.io/docs/latest/spiffe-about/overview/
- NIST. (2023). "Artificial Intelligence Risk Management Framework (AI RMF 1.0)." https://www.nist.gov/artificial-intelligence/executive-order-safe-secure-and-trustworthy-artificial-intelligence
- MITRE ATLAS. "Techniques: ML Supply Chain Compromise." https://atlas.mitre.org/techniques/AML.T0010
- Google. (2024). "BeyondCorp: A New Approach to Enterprise Security." https://cloud.google.com/beyondcorp
- Kubernetes. (2024). "Network Policies." https://kubernetes.io/docs/concepts/services-networking/network-policies/