ML Pipeline Security
Defense-focused guide to securing ML training and deployment pipelines, covering CI/CD cross-tenant attacks, safetensors conversion hijacking, pipeline hardening, and isolated build environments.
ML pipelines introduce attack surface that does not exist in traditional software CI/CD. In addition to code compilation and testing, ML pipelines handle model training, data preprocessing, model conversion, and artifact storage -- each step processing untrusted inputs (data, model weights) that can carry embedded payloads. Wiz Research demonstrated in 2024 that Hugging Face's CI/CD infrastructure had cross-tenant access vulnerabilities allowing attackers to compromise the build pipeline serving thousands of models. HiddenLayer showed that even security-focused operations like safetensors conversion can be hijacked to inject malicious payloads.
Real-World ML Pipeline Attacks
Wiz Research: Hugging Face Cross-Tenant CI/CD Access
In 2024, Wiz researchers discovered that Hugging Face's Spaces CI/CD infrastructure allowed cross-tenant access:
Attack Chain (Wiz Findings):
┌──────────────────────┐
│ Attacker creates     │
│ malicious HF Space   │
└──────────┬───────────┘
           │
           ▼
┌──────────────────────┐
│ Space runs attacker- │
│ controlled code in   │
│ CI/CD environment    │
└──────────┬───────────┘
           │
           ▼
┌──────────────────────┐
│ CI/CD environment    │
│ has shared secrets,  │
│ cross-tenant access  │
└──────────┬───────────┘
           │
           ▼
┌──────────────────────┐
│ Attacker accesses    │
│ other tenants' build │
│ artifacts, secrets,  │
│ and model storage    │
└──────────────────────┘
Impact: Full supply chain compromise of models
built on the shared infrastructure.
HiddenLayer: Safetensors Conversion Service Hijack
HiddenLayer demonstrated that the safetensors conversion service on Hugging Face could be exploited:
Attack Chain (HiddenLayer Findings):
1. Attacker uploads a model with a malicious pickle payload
2. Hugging Face's automated conversion service loads the model
   to convert it from pickle to safetensors
3. The pickle deserialization executes the embedded payload
4. The payload modifies the conversion output (safetensors file)
5. Users download the "safe" safetensors version -- which is
   actually a backdoored model produced by the attacker's payload

The irony: the safety mechanism (conversion to safetensors) became
the attack vector because it required loading untrusted pickle files.
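The mechanism behind step 3 is worth spelling out: unpickling is arbitrary code execution, because pickle's `__reduce__` hook lets an object name any callable for the loader to invoke at load time. A minimal, self-contained sketch (the `eval` payload here is a harmless stand-in for real attacker code):

```python
import pickle


class Payload:
    """Any object whose __reduce__ returns (callable, args) makes
    pickle call that callable during deserialization."""

    def __reduce__(self):
        # At load time, pickle calls eval("21 * 2"). A real attacker
        # would run shell commands, patch the converter, or swap weights.
        return (eval, ("21 * 2",))


malicious_bytes = pickle.dumps(Payload())

# The "victim" (e.g. a conversion service) merely loads the file...
result = pickle.loads(malicious_bytes)
print(result)  # -> 42: attacker-controlled code ran during deserialization
```

This is why the conversion service had to execute attacker code simply to read the input format it was trying to eliminate.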
NullBulge Supply Chain Campaigns
NullBulge conducted supply chain attacks targeting both GitHub and Hugging Face:
NullBulge Tactics:
1. Compromised legitimate developer accounts via phishing
2. Added malicious code to popular ML repositories
3. Uploaded backdoored models to Hugging Face under the
   compromised accounts' namespaces
4. The legitimate account history provided social proof,
   making the malicious content appear trustworthy
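Because account compromise defeats namespace-based trust, downstream consumers can pin exact artifact digests recorded at first review instead of trusting the publisher. A minimal sketch; the file name and pin list are illustrative assumptions, not part of the NullBulge reporting:

```python
import hashlib
from pathlib import Path

# Hypothetical pin list: digests recorded when each artifact was first vetted.
# (This example pin is the SHA-256 of an empty file, for demonstration only.)
PINNED_SHA256 = {
    "model.safetensors": "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
}


def verify_pinned(path: Path) -> bool:
    """Reject any downloaded artifact whose digest drifts from its pin."""
    digest = hashlib.sha256(path.read_bytes()).hexdigest()
    expected = PINNED_SHA256.get(path.name)
    return expected is not None and digest == expected
```

Even if the upstream account is later hijacked and the file silently replaced, the pin check fails and the download is rejected.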
Pipeline Attack Surface Analysis
"""
ML Pipeline Attack Surface Mapper
Analyzes an ML pipeline configuration to identify attack surface
and recommend hardening measures.
"""
import yaml
import json
from dataclasses import dataclass, field
from enum import Enum
class RiskLevel(Enum):
CRITICAL = "critical"
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
@dataclass
class PipelineStage:
name: str
inputs: list[str]
outputs: list[str]
runs_untrusted_code: bool
has_network_access: bool
has_gpu_access: bool
secrets_available: list[str] = field(default_factory=list)
isolation_level: str = "none" # none, container, vm, hardware
@dataclass
class AttackSurface:
stage: str
vector: str
risk: RiskLevel
description: str
mitigation: str
def analyze_pipeline(stages: list[PipelineStage]) -> list[AttackSurface]:
"""Analyze pipeline stages for attack surface."""
findings = []
for stage in stages:
# Check for untrusted code execution
if stage.runs_untrusted_code:
if stage.isolation_level == "none":
findings.append(AttackSurface(
stage=stage.name,
vector="untrusted_code_no_isolation",
risk=RiskLevel.CRITICAL,
description=(
f"Stage '{stage.name}' runs untrusted code "
f"without isolation. Any code in model files, "
f"training scripts, or dependencies can execute "
f"with full pipeline permissions."
),
mitigation=(
"Run in isolated container with no network access, "
"no secrets, and restricted filesystem. Use gVisor "
"or Firecracker for stronger isolation."
),
))
elif stage.isolation_level == "container":
findings.append(AttackSurface(
stage=stage.name,
vector="untrusted_code_container_only",
risk=RiskLevel.MEDIUM,
description=(
f"Stage '{stage.name}' runs untrusted code "
f"in a container. Container escapes are possible."
),
mitigation=(
"Add gVisor/Firecracker sandboxing. Remove network "
"access and secrets from the container."
),
))
# Check for secrets exposure
if stage.secrets_available and stage.runs_untrusted_code:
findings.append(AttackSurface(
stage=stage.name,
vector="secrets_in_untrusted_stage",
risk=RiskLevel.CRITICAL,
description=(
f"Stage '{stage.name}' has access to secrets "
f"({', '.join(stage.secrets_available)}) while running "
f"untrusted code. Compromised code can exfiltrate secrets."
),
mitigation=(
"Remove all secrets from stages that process untrusted "
"inputs. Use a separate, privileged stage for operations "
"that require secrets (e.g., artifact signing, registry push)."
),
))
# Check for network access during untrusted execution
if stage.has_network_access and stage.runs_untrusted_code:
findings.append(AttackSurface(
stage=stage.name,
vector="network_access_untrusted",
risk=RiskLevel.HIGH,
description=(
f"Stage '{stage.name}' has network access while "
f"running untrusted code. Enables data exfiltration "
f"and command-and-control."
),
mitigation=(
"Block all outbound network access during untrusted "
"stages. Pre-download all dependencies in a separate, "
"earlier stage."
),
))
# Check for GPU access (side-channel risks)
if stage.has_gpu_access and stage.isolation_level in ("none", "container"):
findings.append(AttackSurface(
stage=stage.name,
vector="shared_gpu_access",
risk=RiskLevel.MEDIUM,
description=(
f"Stage '{stage.name}' uses GPU without hardware "
f"isolation. GPU memory may retain data from "
f"previous jobs."
),
mitigation=(
"Clear GPU memory between jobs. Use MIG (Multi-Instance "
"GPU) or dedicated GPU nodes for sensitive workloads."
),
))
return findings
# Example: analyze a typical ML pipeline
if __name__ == "__main__":
    pipeline = [
        PipelineStage(
            name="data_preprocessing",
            inputs=["raw_dataset", "preprocessing_scripts"],
            outputs=["processed_dataset"],
            runs_untrusted_code=True,  # User-provided scripts
            has_network_access=True,
            has_gpu_access=False,
            secrets_available=["DATA_STORE_KEY"],
            isolation_level="container",
        ),
        PipelineStage(
            name="model_training",
            inputs=["processed_dataset", "model_config", "base_model"],
            outputs=["trained_model"],
            runs_untrusted_code=True,  # Training code + base model
            has_network_access=True,
            has_gpu_access=True,
            secrets_available=["WANDB_API_KEY", "HF_TOKEN"],
            isolation_level="container",
        ),
        PipelineStage(
            name="model_evaluation",
            inputs=["trained_model", "eval_dataset"],
            outputs=["eval_results"],
            runs_untrusted_code=True,  # Model inference
            has_network_access=False,
            has_gpu_access=True,
            secrets_available=[],
            isolation_level="container",
        ),
        PipelineStage(
            name="model_signing",
            inputs=["trained_model", "eval_results"],
            outputs=["signed_model"],
            runs_untrusted_code=False,  # Signing tool only
            has_network_access=False,
            has_gpu_access=False,
            secrets_available=["SIGNING_KEY"],
            isolation_level="vm",
        ),
    ]

    findings = analyze_pipeline(pipeline)
    for f in findings:
        print(f"[{f.risk.value.upper():>8}] {f.stage}: {f.vector}")
        print(f"  {f.description[:100]}...")
        print(f"  Mitigation: {f.mitigation[:100]}...")
        print()

Pipeline Hardening
Isolated Build Environments
# ml-pipeline-hardened.yaml
# GitHub Actions workflow with security hardening for ML pipelines
name: Secure ML Pipeline

on:
  push:
    branches: [main]
    paths:
      - 'models/**'
      - 'training/**'
      - 'configs/**'

permissions:
  contents: read  # Minimal permissions

jobs:
  # Stage 1: Dependency installation (has network, no secrets)
  install-dependencies:
    runs-on: ubuntu-latest
    container:
      image: python:3.11-slim
    steps:
      - uses: actions/checkout@v4

      - name: Install dependencies with hash verification
        run: |
          pip install --require-hashes -r requirements.txt
          pip freeze > installed-packages.txt

      - name: Scan dependencies for vulnerabilities
        run: |
          pip install safety
          safety check --full-report --output json > dependency-scan.json
          # Fail on critical vulnerabilities
          python3 -c "
          import json
          with open('dependency-scan.json') as f:
              report = json.load(f)
          critical = [v for v in report.get('vulnerabilities', [])
                      if v.get('severity', '').lower() == 'critical']
          if critical:
              print(f'BLOCKING: {len(critical)} critical vulnerabilities found')
              for v in critical:
                  print(f' - {v[\"package_name\"]}: {v[\"vulnerability_id\"]}')
              exit(1)
          print('No critical vulnerabilities found')
          "

      - name: Upload verified dependencies
        uses: actions/upload-artifact@v4
        with:
          name: verified-deps
          path: |
            installed-packages.txt
            dependency-scan.json
  # Stage 2: Data validation (no network, no secrets)
  validate-data:
    runs-on: ubuntu-latest
    needs: install-dependencies
    container:
      image: python:3.11-slim
      options: --network=none  # No network access
    steps:
      - uses: actions/checkout@v4

      - name: Download verified dependencies
        uses: actions/download-artifact@v4
        with:
          name: verified-deps

      - name: Validate training data integrity
        run: |
          python3 scripts/validate_training_data.py \
            --dataset configs/dataset_config.yaml \
            --output data-validation-report.json

      - name: Check for data poisoning indicators
        run: |
          python3 scripts/check_data_poisoning.py \
            --report data-validation-report.json \
            --threshold 0.05
  # Stage 3: Model training (no network, GPU, no secrets)
  train-model:
    runs-on: [self-hosted, gpu]
    needs: validate-data
    container:
      image: pytorch/pytorch:2.2.0-cuda12.1-runtime
      options: --network=none --security-opt=no-new-privileges
    steps:
      - uses: actions/checkout@v4

      - name: Train model in isolated environment
        run: |
          python3 training/train.py \
            --config configs/training_config.yaml \
            --output ./trained-model/
        env:
          CUDA_VISIBLE_DEVICES: "0"
          # No HF_TOKEN, no network -- cannot download anything at runtime

      - name: Calculate model checksums
        run: |
          find ./trained-model/ -type f -exec sha256sum {} \; > model-checksums.sha256

      - name: Upload model artifact
        uses: actions/upload-artifact@v4
        with:
          name: trained-model
          path: |
            trained-model/
            model-checksums.sha256
  # Stage 4: Model security scan (no network, no GPU)
  scan-model:
    runs-on: ubuntu-latest
    needs: train-model
    container:
      image: python:3.11-slim
      options: --network=none
    steps:
      - uses: actions/checkout@v4

      - name: Download model artifact
        uses: actions/download-artifact@v4
        with:
          name: trained-model

      - name: Run trojan detection scans
        run: |
          python3 scripts/trojan_scan.py \
            --model-path ./trained-model/ \
            --output scan-report.json

      - name: Verify no dangerous file formats
        run: |
          DANGEROUS=$(find ./trained-model/ \( -name "*.pkl" -o -name "*.pickle" -o -name "*.joblib" \) | wc -l)
          if [ "$DANGEROUS" -gt 0 ]; then
            echo "FAIL: Found $DANGEROUS dangerous serialization files"
            exit 1
          fi
          echo "PASS: No dangerous serialization formats"
  # Stage 5: Model signing (isolated VM, has signing key)
  sign-model:
    runs-on: [self-hosted, signing]  # Dedicated signing runner
    needs: scan-model
    steps:
      - name: Download model artifact
        uses: actions/download-artifact@v4
        with:
          name: trained-model

      - name: Verify checksums
        run: |
          sha256sum -c model-checksums.sha256

      - name: Sign model artifacts
        run: |
          cosign sign-blob \
            --key env://COSIGN_PRIVATE_KEY \
            --output-signature model.sig \
            --output-certificate model.cert \
            ./trained-model/model.safetensors
        env:
          COSIGN_PRIVATE_KEY: ${{ secrets.MODEL_SIGNING_KEY }}

      - name: Upload signed model
        uses: actions/upload-artifact@v4
        with:
          name: signed-model
          path: |
            trained-model/
            model.sig
            model.cert
            model-checksums.sha256

Build Environment Isolation with Containers
# Dockerfile.ml-build-isolated
# Hardened container for ML pipeline stages that process untrusted inputs
FROM python:3.11-slim AS base

# Security: Run as non-root user
RUN groupadd -r mlbuild && useradd -r -g mlbuild -d /workspace -s /bin/bash mlbuild

# Security: Remove unnecessary tools that aid exploitation
RUN apt-get update && apt-get install -y --no-install-recommends \
        libgomp1 \
    && rm -rf /var/lib/apt/lists/* \
    && apt-get purge -y --auto-remove curl wget git \
    && rm -rf /usr/bin/nc /usr/bin/ncat /usr/bin/nmap

# Security: Install only required Python packages (pre-verified)
COPY requirements.txt /tmp/requirements.txt
RUN pip install --no-cache-dir --require-hashes -r /tmp/requirements.txt \
    && rm /tmp/requirements.txt

# Security: Read-only filesystem for application code
COPY --chown=mlbuild:mlbuild scripts/ /workspace/scripts/
COPY --chown=mlbuild:mlbuild configs/ /workspace/configs/

# Security: Writable directory only for outputs
RUN mkdir -p /workspace/output && chown mlbuild:mlbuild /workspace/output
VOLUME /workspace/output

# Security: Drop all capabilities
USER mlbuild
WORKDIR /workspace

# Security: No shell by default
ENTRYPOINT ["python3"]

Pipeline Integrity Monitoring
"""
Pipeline Integrity Monitor

Monitors ML pipeline execution for signs of compromise:
- Unexpected network connections during isolated stages
- File modifications outside expected paths
- Unusual resource usage patterns
- Unauthorized secret access attempts
"""
import hashlib
import logging
from pathlib import Path
from datetime import datetime

logger = logging.getLogger("pipeline_monitor")


class PipelineIntegrityMonitor:
    """
    Runtime monitor for ML pipeline stages.

    Deploys as a sidecar container or init process.
    """

    def __init__(self, stage_name: str, config: dict):
        self.stage_name = stage_name
        self.config = config
        self.alerts = []
        self.start_time = datetime.now()

        # Expected filesystem state
        self.allowed_write_paths = config.get("allowed_write_paths", ["/tmp", "/workspace/output"])
        self.blocked_write_paths = config.get("blocked_write_paths", ["/etc", "/usr", "/bin"])

        # Take filesystem snapshot at start
        self.initial_snapshot = self._snapshot_critical_paths()

    def _snapshot_critical_paths(self) -> dict:
        """Snapshot critical system paths for tamper detection."""
        snapshot = {}
        critical_paths = ["/usr/local/lib/python3.11", "/workspace/scripts"]
        for base_path in critical_paths:
            base = Path(base_path)
            if not base.exists():
                continue
            for f in base.rglob("*.py"):
                try:
                    snapshot[str(f)] = hashlib.sha256(f.read_bytes()).hexdigest()
                except (PermissionError, OSError):
                    pass
        return snapshot

    def check_filesystem_integrity(self) -> list[dict]:
        """Check for unauthorized filesystem modifications."""
        alerts = []
        current_snapshot = self._snapshot_critical_paths()
        for path, original_hash in self.initial_snapshot.items():
            current_hash = current_snapshot.get(path)
            if current_hash is None:
                alerts.append({
                    "type": "FILE_DELETED",
                    "severity": "critical",
                    "path": path,
                    "stage": self.stage_name,
                    "timestamp": datetime.now().isoformat(),
                })
            elif current_hash != original_hash:
                alerts.append({
                    "type": "FILE_MODIFIED",
                    "severity": "critical",
                    "path": path,
                    "original_hash": original_hash,
                    "current_hash": current_hash,
                    "stage": self.stage_name,
                    "timestamp": datetime.now().isoformat(),
                })

        # Check for new files in critical paths
        for path in current_snapshot:
            if path not in self.initial_snapshot:
                alerts.append({
                    "type": "NEW_FILE",
                    "severity": "high",
                    "path": path,
                    "stage": self.stage_name,
                    "timestamp": datetime.now().isoformat(),
                })
        return alerts

    def check_network_connections(self) -> list[dict]:
        """Check for unexpected network connections (Linux only)."""
        alerts = []
        try:
            net_path = Path("/proc/net/tcp")
            if not net_path.exists():
                return alerts
            content = net_path.read_text()
            for line in content.strip().split("\n")[1:]:
                fields = line.split()
                if len(fields) < 4:
                    continue
                state = fields[3]
                # State 01 = ESTABLISHED
                if state == "01":
                    remote = fields[2]
                    remote_ip_hex, remote_port_hex = remote.split(":")
                    remote_port = int(remote_port_hex, 16)
                    # Alert on any outbound connections in isolated stages
                    if self.config.get("network_isolated", False):
                        alerts.append({
                            "type": "UNEXPECTED_NETWORK",
                            "severity": "critical",
                            "remote_port": remote_port,
                            "stage": self.stage_name,
                            "timestamp": datetime.now().isoformat(),
                            "message": (
                                "Network connection detected in isolated stage. "
                                "This may indicate data exfiltration."
                            ),
                        })
        except (PermissionError, OSError):
            pass
        return alerts

    def check_process_tree(self) -> list[dict]:
        """Check for unexpected processes spawned during pipeline execution."""
        alerts = []
        suspicious_processes = {
            "curl", "wget", "nc", "ncat", "ssh", "scp",
            "python -m http.server", "bash -i",
        }
        try:
            proc = Path("/proc")
            for pid_dir in proc.iterdir():
                if not pid_dir.name.isdigit():
                    continue
                cmdline_path = pid_dir / "cmdline"
                if cmdline_path.exists():
                    try:
                        cmdline = cmdline_path.read_text().replace("\x00", " ").strip()
                        for suspicious in suspicious_processes:
                            if suspicious in cmdline:
                                alerts.append({
                                    "type": "SUSPICIOUS_PROCESS",
                                    "severity": "critical",
                                    "pid": pid_dir.name,
                                    "cmdline": cmdline[:200],
                                    "matched_pattern": suspicious,
                                    "stage": self.stage_name,
                                    "timestamp": datetime.now().isoformat(),
                                })
                    except (PermissionError, OSError):
                        pass
        except (PermissionError, OSError):
            pass
        return alerts

    def run_full_check(self) -> dict:
        """Run all integrity checks and return consolidated results."""
        all_alerts = []
        all_alerts.extend(self.check_filesystem_integrity())
        all_alerts.extend(self.check_network_connections())
        all_alerts.extend(self.check_process_tree())
        self.alerts.extend(all_alerts)

        critical = [a for a in all_alerts if a["severity"] == "critical"]
        return {
            "stage": self.stage_name,
            "check_time": datetime.now().isoformat(),
            "total_alerts": len(all_alerts),
            "critical_alerts": len(critical),
            "pipeline_compromised": len(critical) > 0,
            "alerts": all_alerts,
            "action": (
                "HALT PIPELINE -- critical integrity violation detected"
                if critical
                else "Pipeline integrity verified"
            ),
        }

Artifact Signing and Verification in Pipelines
#!/bin/bash
# pipeline-artifact-signing.sh
# Sign and verify ML pipeline artifacts using cosign
set -euo pipefail

ACTION="${1:?Usage: pipeline-artifact-signing.sh <sign|verify> <artifact_path>}"
ARTIFACT_PATH="${2:?Provide artifact path}"
SIGNATURE_DIR="${ARTIFACT_PATH}.signatures"

mkdir -p "$SIGNATURE_DIR"

sign_artifact() {
    echo "[*] Signing ML pipeline artifact: $ARTIFACT_PATH"

    # Calculate artifact digest
    DIGEST=$(sha256sum "$ARTIFACT_PATH" | awk '{print $1}')
    echo "[*] SHA-256: $DIGEST"

    # Create provenance metadata
    cat > "$SIGNATURE_DIR/provenance.json" << EOF
{
  "artifact": "$(basename "$ARTIFACT_PATH")",
  "sha256": "$DIGEST",
  "pipeline": "${GITHUB_WORKFLOW:-local}",
  "run_id": "${GITHUB_RUN_ID:-none}",
  "runner": "${RUNNER_NAME:-local}",
  "commit": "$(git rev-parse HEAD 2>/dev/null || echo 'unknown')",
  "timestamp": "$(date -u +%Y-%m-%dT%H:%M:%SZ)",
  "builder": "$(whoami)@$(hostname)"
}
EOF

    # Sign with cosign
    cosign sign-blob \
        --key env://COSIGN_PRIVATE_KEY \
        --output-signature "$SIGNATURE_DIR/artifact.sig" \
        --output-certificate "$SIGNATURE_DIR/artifact.cert" \
        "$ARTIFACT_PATH"

    # Sign provenance metadata
    cosign sign-blob \
        --key env://COSIGN_PRIVATE_KEY \
        --output-signature "$SIGNATURE_DIR/provenance.sig" \
        "$SIGNATURE_DIR/provenance.json"

    echo "[*] Artifact signed successfully"
    echo "[*] Signatures: $SIGNATURE_DIR/"
}

verify_artifact() {
    echo "[*] Verifying ML pipeline artifact: $ARTIFACT_PATH"

    # Verify artifact signature
    cosign verify-blob \
        --key env://COSIGN_PUBLIC_KEY \
        --signature "$SIGNATURE_DIR/artifact.sig" \
        "$ARTIFACT_PATH"
    echo "[PASS] Artifact signature verified"

    # Verify provenance signature
    cosign verify-blob \
        --key env://COSIGN_PUBLIC_KEY \
        --signature "$SIGNATURE_DIR/provenance.sig" \
        "$SIGNATURE_DIR/provenance.json"
    echo "[PASS] Provenance signature verified"

    # Verify checksum matches provenance
    EXPECTED_HASH=$(python3 -c "
import json
with open('$SIGNATURE_DIR/provenance.json') as f:
    print(json.load(f)['sha256'])
")
    ACTUAL_HASH=$(sha256sum "$ARTIFACT_PATH" | awk '{print $1}')
    if [ "$EXPECTED_HASH" = "$ACTUAL_HASH" ]; then
        echo "[PASS] Checksum matches provenance"
    else
        echo "[FAIL] Checksum mismatch!"
        echo "    Expected: $EXPECTED_HASH"
        echo "    Actual:   $ACTUAL_HASH"
        exit 1
    fi

    # Display provenance
    echo ""
    echo "Provenance:"
    python3 -m json.tool "$SIGNATURE_DIR/provenance.json"
}

case "$ACTION" in
    sign)   sign_artifact ;;
    verify) verify_artifact ;;
    *)      echo "Unknown action: $ACTION"; exit 1 ;;
esac

Pipeline Security Checklist
# ml-pipeline-security-checklist.yaml
# Use this checklist to audit your ML pipeline security posture
pipeline_security_audit:

  build_environment:
    - check: "Build runners are dedicated (not shared with other teams/projects)"
      critical: true
      rationale: "Shared runners enable cross-project attacks (Wiz findings)"
    - check: "Stages that process untrusted inputs run in isolated containers"
      critical: true
      rationale: "Untrusted model/data files can execute code during processing"
    - check: "Network access is disabled for stages processing untrusted inputs"
      critical: true
      rationale: "Prevents data exfiltration from compromised stages"
    - check: "GPU memory is cleared between jobs on shared GPU nodes"
      critical: false
      rationale: "GPU memory can retain sensitive data from previous jobs"

  secrets_management:
    - check: "Secrets are only available in stages that need them"
      critical: true
      rationale: "Training/processing stages should not have signing keys"
    - check: "Signing keys are in a dedicated stage that does not process untrusted inputs"
      critical: true
      rationale: "Signing stage compromise allows forging trusted artifacts"
    - check: "CI/CD tokens are scoped to minimum required permissions"
      critical: true
      rationale: "Compromised token with write access enables supply chain attacks"

  artifact_integrity:
    - check: "All model artifacts are signed before storage"
      critical: true
      rationale: "Unsigned artifacts have no provenance guarantee"
    - check: "Checksums are calculated at each pipeline stage"
      critical: false
      rationale: "Enables detection of mid-pipeline tampering"
    - check: "Artifact signatures are verified at deployment time"
      critical: true
      rationale: "Prevents deployment of tampered or unsigned models"

  dependency_management:
    - check: "All dependencies are pinned with integrity hashes"
      critical: true
      rationale: "Prevents dependency confusion and supply chain attacks"
    - check: "Dependencies are resolved in a separate stage before untrusted processing"
      critical: false
      rationale: "Pre-resolution prevents runtime dependency manipulation"
    - check: "Vulnerability scanning runs on every pipeline execution"
      critical: true
      rationale: "Catches newly disclosed CVEs in ML frameworks"

  monitoring:
    - check: "Pipeline execution logs are collected and monitored"
      critical: true
      rationale: "Enables post-incident investigation"
    - check: "Unexpected network connections trigger alerts"
      critical: true
      rationale: "Detects data exfiltration attempts"
    - check: "Build time anomalies trigger alerts"
      critical: false
      rationale: "Cryptocurrency mining or brute-force attacks cause time anomalies"

References
- Wiz Research (2024). "Hugging Face CI/CD Cross-Tenant Vulnerability"
- HiddenLayer (2024). "Safetensors Conversion Service Exploitation"
- NullBulge (2024). "Supply Chain Attacks Targeting AI/ML Ecosystems"
- SLSA (2024). "Supply-chain Levels for Software Artifacts"
- Sigstore (2024). "Cosign: Container and Artifact Signing"
- CISA (2024). "Defending CI/CD Environments"
Why is the safetensors conversion service attack (HiddenLayer) particularly concerning from a supply chain perspective?