# Lab: Supply Chain Audit

Audit an ML project's supply chain for vulnerabilities, covering Python packages, model files, container images, and training data provenance.
## Prerequisites

- Python 3.10+ with `pip-audit` and `safety`
- Docker installed (for container image scanning)
- Familiarity with Python package management and virtual environments

```bash
pip install pip-audit safety
```

## Background
ML supply chain risks differ from traditional software supply chains. Beyond code dependencies, ML projects depend on model weights (which can contain executable code), training data (which can be poisoned), and specialized frameworks with complex native dependencies.
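To make the "model weights can contain executable code" risk concrete, here is a minimal sketch of how pickle's `__reduce__` hook turns deserialization into code execution. The `EvilWeights` class is hypothetical, and the harmless `len` call stands in for an attacker's payload:

```python
import io
import pickle

class EvilWeights:
    """Looks like a saved model object, but hijacks deserialization."""
    def __reduce__(self):
        # pickle.load() will invoke this callable with these args.
        # A real attack would return something like (os.system, ("...",));
        # we use the harmless len() so the demo has no side effects.
        return (len, ("payload-would-run-here",))

blob = pickle.dumps(EvilWeights())      # the "model file" an attacker ships
result = pickle.load(io.BytesIO(blob))  # victim merely "loads the model"
print(result)                           # the payload has already executed
```

This is why `pickle.load()` on an untrusted `.pkl` (or a `.pt` file, which wraps pickle) is game over: the code runs before you ever get a chance to inspect the loaded object.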
## Lab Exercises

### Set Up a Sample ML Project for Auditing
Create a realistic ML project structure with dependencies to audit:
```bash
#!/bin/bash
# Set up a sample ML project for supply chain auditing
mkdir -p ml-audit-project/{models,data,configs,scripts}
cd ml-audit-project

# Create requirements.txt with a mix of packages
cat > requirements.txt << 'EOF'
torch==2.2.0
transformers==4.38.0
numpy==1.26.4
pandas==2.2.0
scikit-learn==1.4.0
flask==3.0.2
requests==2.31.0
pyyaml==6.0.1
pillow==10.2.0
python-dotenv==1.0.1
gradio==4.19.0
huggingface-hub==0.21.0
datasets==2.17.0
accelerate==0.27.0
sentencepiece==0.2.0
protobuf==4.25.2
EOF

# Create a model loading script (with deliberate security concerns)
cat > scripts/load_model.py << 'PYEOF'
#!/usr/bin/env python3
"""Load and serve a model -- contains common security anti-patterns."""
import pickle
import torch
import yaml
import os
from flask import Flask, request, jsonify

app = Flask(__name__)

# AUDIT CONCERN: Loading model from pickle (arbitrary code execution)
def load_model_pickle(path):
    with open(path, "rb") as f:
        return pickle.load(f)  # Unsafe deserialization

# AUDIT CONCERN: Loading config from YAML without safe_load
def load_config(path):
    with open(path) as f:
        return yaml.load(f, Loader=yaml.FullLoader)  # Should use safe_load

# AUDIT CONCERN: Model path from user input without validation
@app.route("/predict", methods=["POST"])
def predict():
    model_name = request.json.get("model")
    model_path = f"models/{model_name}"  # Path traversal risk
    model = load_model_pickle(model_path)
    return jsonify({"result": "prediction"})

# AUDIT CONCERN: Debug mode in production
if __name__ == "__main__":
    app.run(debug=True, host="0.0.0.0")  # Debug mode exposed
PYEOF

# Create a Dockerfile with common issues
cat > Dockerfile << 'DEOF'
FROM python:3.11
# AUDIT: Running as root
# AUDIT: No pinned hash for base image
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
# AUDIT: No vulnerability scanning step
COPY . .
# AUDIT: Exposing all ports
EXPOSE 5000
CMD ["python", "scripts/load_model.py"]
DEOF

echo "Sample ML project created in ml-audit-project/"
```

```bash
bash setup_project.sh
```
### Audit Python Dependencies

Scan the project's Python dependencies for known vulnerabilities:
```python
#!/usr/bin/env python3
"""Audit Python package dependencies for security vulnerabilities."""
import subprocess
import json
from pathlib import Path


def run_pip_audit(requirements_file: str) -> dict:
    """Run pip-audit against a requirements file."""
    try:
        result = subprocess.run(
            ["pip-audit", "-r", requirements_file, "--format", "json", "--desc"],
            capture_output=True,
            text=True,
            timeout=120,
        )
        if result.stdout:
            return json.loads(result.stdout)
        return {"dependencies": [], "error": result.stderr}
    except FileNotFoundError:
        return {"error": "pip-audit not installed. Run: pip install pip-audit"}
    except subprocess.TimeoutExpired:
        return {"error": "Audit timed out"}
    except json.JSONDecodeError:
        return {"error": "Could not parse pip-audit JSON output"}


def check_typosquatting(requirements_file: str) -> list:
    """Check for potential typosquatting in package names."""
    known_typosquats = {
        "pytorch": "torch",
        "python-opencv": "opencv-python",
        "sklearn": "scikit-learn",
        "numppy": "numpy",
        "reqests": "requests",
        "tesorflow": "tensorflow",
        "trasformers": "transformers",
        "panadas": "pandas",
    }
    findings = []
    with open(requirements_file) as f:
        for line in f:
            pkg = line.strip().split("==")[0].split(">=")[0].lower()
            if pkg in known_typosquats:
                findings.append({
                    "package": pkg,
                    "likely_intended": known_typosquats[pkg],
                    "severity": "high",
                })
    return findings


def check_unpinned_versions(requirements_file: str) -> list:
    """Identify packages without pinned versions."""
    findings = []
    with open(requirements_file) as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            if "==" not in line:
                pkg = line.split(">=")[0].split(">")[0].split("<")[0]
                findings.append({
                    "package": pkg,
                    "issue": "Version not pinned with ==",
                    "severity": "medium",
                })
    return findings


if __name__ == "__main__":
    req_file = "ml-audit-project/requirements.txt"
    if not Path(req_file).exists():
        print("Run setup_project.sh first.")
        exit(1)

    print("=== Python Dependency Audit ===\n")

    # Vulnerability scan
    print("[1] Known Vulnerability Scan (pip-audit):")
    vuln_results = run_pip_audit(req_file)
    if "error" in vuln_results:
        print(f"  Note: {vuln_results['error']}")
    else:
        deps = vuln_results.get("dependencies", [])
        vulns = [d for d in deps if d.get("vulns")]
        if vulns:
            for dep in vulns:
                for vuln in dep["vulns"]:
                    print(f"  [VULN] {dep['name']} {dep['version']}: "
                          f"{vuln['id']} - {vuln.get('description', 'N/A')[:80]}")
        else:
            print("  No known vulnerabilities found.")

    # Typosquatting check
    print("\n[2] Typosquatting Check:")
    typos = check_typosquatting(req_file)
    if typos:
        for t in typos:
            print(f"  [ALERT] '{t['package']}' may be typosquat "
                  f"of '{t['likely_intended']}'")
    else:
        print("  No suspected typosquats found.")

    # Unpinned versions
    print("\n[3] Version Pinning Check:")
    unpinned = check_unpinned_versions(req_file)
    if unpinned:
        for u in unpinned:
            print(f"  [WARN] {u['package']}: {u['issue']}")
    else:
        print("  All packages have pinned versions.")
```

```bash
python audit_dependencies.py
```
### Audit Model File Safety

Check model files for unsafe serialization formats:
```python
#!/usr/bin/env python3
"""Audit model files for security risks."""
import struct
from pathlib import Path


def check_pickle_safety(filepath: Path) -> dict:
    """Check if a file is a pickle file and assess risk."""
    try:
        with open(filepath, "rb") as f:
            magic = f.read(2)
            # Pickle protocol opcodes
            is_pickle = magic[0:1] == b'\x80' or magic[0:1] in (
                b'(', b']', b'}', b'c',
            )
            if is_pickle:
                f.seek(0)
                content = f.read(1024)
                # Look for dangerous opcodes
                dangerous_opcodes = {
                    b'c': "GLOBAL (imports modules)",
                    b'R': "REDUCE (calls functions)",
                    b'i': "INST (instantiates objects)",
                    b'o': "OBJ (calls constructors)",
                }
                found = {}
                for opcode, desc in dangerous_opcodes.items():
                    if opcode in content:
                        found[opcode.hex()] = desc
                return {
                    "is_pickle": True,
                    "risk": "CRITICAL",
                    "dangerous_opcodes": found,
                    "recommendation": "Use safetensors format instead",
                }
    except Exception:
        pass
    return {"is_pickle": False, "risk": "LOW"}


def check_pytorch_safety(filepath: Path) -> dict:
    """Check PyTorch model files for safety."""
    try:
        with open(filepath, "rb") as f:
            magic = f.read(8)
            if magic[:2] == b'PK':  # ZIP file (PyTorch .pt format)
                return {
                    "format": "pytorch_zip",
                    "risk": "HIGH",
                    "reason": "PyTorch .pt files use pickle internally",
                    "recommendation": "Convert to safetensors format",
                }
            if magic[:6] == b'\x93NUMPY':  # NumPy .npy magic string
                return {
                    "format": "numpy",
                    "risk": "MEDIUM",
                    "reason": "NumPy files can contain object arrays with pickle",
                    "recommendation": "Verify allow_pickle=False is used when loading",
                }
    except Exception:
        pass
    return {"format": "unknown", "risk": "UNKNOWN"}


def check_safetensors(filepath: Path) -> dict:
    """Verify safetensors files are properly formatted."""
    try:
        with open(filepath, "rb") as f:
            header_len = struct.unpack("<Q", f.read(8))[0]
            if header_len < 10_000_000:  # Reasonable header size
                return {
                    "format": "safetensors",
                    "risk": "LOW",
                    "reason": "Safetensors does not allow code execution",
                    "header_size": header_len,
                }
    except Exception:
        pass
    return {"format": "unknown", "risk": "UNKNOWN"}


def audit_model_directory(model_dir: str) -> list:
    """Audit all model files in a directory."""
    findings = []
    model_extensions = {".pt", ".pth", ".pkl", ".pickle", ".bin",
                        ".safetensors", ".npy", ".npz", ".onnx"}
    for filepath in Path(model_dir).rglob("*"):
        if filepath.suffix.lower() in model_extensions:
            finding = {
                "file": str(filepath),
                "extension": filepath.suffix,
                "size_mb": filepath.stat().st_size / (1024 * 1024),
            }
            if filepath.suffix in (".pkl", ".pickle"):
                finding.update(check_pickle_safety(filepath))
            elif filepath.suffix in (".pt", ".pth", ".bin"):
                finding.update(check_pytorch_safety(filepath))
            elif filepath.suffix == ".safetensors":
                finding.update(check_safetensors(filepath))
            else:
                finding["risk"] = "REVIEW"
                finding["recommendation"] = "Manual review needed"
            findings.append(finding)
    return findings


if __name__ == "__main__":
    print("=== Model File Security Audit ===\n")

    # Create sample model files for demonstration
    model_dir = Path("ml-audit-project/models")
    model_dir.mkdir(parents=True, exist_ok=True)

    # Create a sample pickle file (demonstrating the risk)
    import pickle
    sample_data = {"weights": [0.1, 0.2, 0.3], "bias": 0.5}
    with open(model_dir / "model_v1.pkl", "wb") as f:
        pickle.dump(sample_data, f)

    findings = audit_model_directory("ml-audit-project/models")
    if not findings:
        print("No model files found to audit.")
    else:
        for f in findings:
            print(f"File: {f['file']}")
            print(f"  Format: {f.get('format', f['extension'])}")
            print(f"  Size: {f['size_mb']:.1f} MB")
            print(f"  Risk: {f['risk']}")
            if f.get("dangerous_opcodes"):
                print(f"  Dangerous opcodes: {f['dangerous_opcodes']}")
            if f.get("recommendation"):
                print(f"  Recommendation: {f['recommendation']}")
            print()

    print("\n=== Model Format Risk Summary ===")
    print("  safetensors: SAFE - No code execution possible")
    print("  ONNX: LOW - Standard format, limited attack surface")
    print("  pickle/pkl: CRITICAL - Arbitrary code execution on load")
    print("  PyTorch .pt: HIGH - Uses pickle internally")
    print("  NumPy .npy: MEDIUM - Object arrays can contain pickle")
```

```bash
python audit_models.py
```
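The byte-level opcode scan above is a quick heuristic and can misfire (the byte `c` shows up in plenty of non-pickle data). For a file you suspect really is a pickle, the stdlib `pickletools` module can disassemble the stream properly, listing every module-importing or function-calling opcode without ever executing the payload. A sketch:

```python
import io
import pickle
import pickletools

def list_risky_opcodes(data: bytes) -> list[str]:
    """Disassemble a pickle stream (without executing it) and report
    opcodes that import modules or call functions."""
    risky = {"GLOBAL", "STACK_GLOBAL", "REDUCE", "INST", "OBJ", "NEWOBJ"}
    found = []
    for opcode, arg, _pos in pickletools.genops(io.BytesIO(data)):
        if opcode.name in risky:
            found.append(f"{opcode.name} {arg if arg else ''}".strip())
    return found

# A plain dict of floats needs no risky opcodes...
clean = pickle.dumps({"weights": [0.1, 0.2]})
print(list_risky_opcodes(clean))
# ...while any pickled class instance needs GLOBAL/REDUCE machinery.
sneaky = pickle.dumps(ValueError("x"))
print(list_risky_opcodes(sneaky))
```

Note that "contains GLOBAL/REDUCE" means "can run code on load", not "is definitely malicious": every pickled class instance uses this machinery, which is exactly why the format is unauditable in practice.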
### Audit Source Code for Security Anti-Patterns

Scan the project source code for common ML security anti-patterns:
```python
#!/usr/bin/env python3
"""Audit ML project source code for security anti-patterns."""
import re
from pathlib import Path

ANTI_PATTERNS = [
    {
        "id": "SC-01",
        "name": "Unsafe pickle load",
        "pattern": r"pickle\.load\s*\(",
        "severity": "CRITICAL",
        "fix": "Use safetensors, torch.load(weights_only=True), or json",
    },
    {
        "id": "SC-02",
        "name": "Unsafe YAML load",
        "pattern": r"yaml\.load\s*\([^)]*(?:FullLoader|Loader=yaml\.FullLoader)",
        "severity": "HIGH",
        "fix": "Use yaml.safe_load() instead",
    },
    {
        "id": "SC-03",
        "name": "Unsafe YAML (no loader specified)",
        "pattern": r"yaml\.load\s*\([^,)]*\)",
        "severity": "HIGH",
        "fix": "Use yaml.safe_load() instead",
    },
    {
        "id": "SC-04",
        "name": "Flask debug mode",
        "pattern": r"app\.run\s*\([^)]*debug\s*=\s*True",
        "severity": "HIGH",
        "fix": "Never use debug=True in production",
    },
    {
        "id": "SC-05",
        "name": "Hardcoded secrets",
        "pattern": r"(?:api_key|password|secret|token)\s*=\s*['\"][^'\"]+['\"]",
        "severity": "CRITICAL",
        "fix": "Use environment variables or a secrets manager",
    },
    {
        "id": "SC-06",
        "name": "Host 0.0.0.0 binding",
        "pattern": r"host\s*=\s*['\"]0\.0\.0\.0['\"]",
        "severity": "MEDIUM",
        "fix": "Bind to 127.0.0.1 unless intentionally exposing",
    },
    {
        "id": "SC-07",
        "name": "Unvalidated file path from user input",
        "pattern": r"request\.(json|form|args).*(?:open|Path)\s*\(",
        "severity": "HIGH",
        "fix": "Validate and sanitize file paths; use allowlists",
    },
    {
        "id": "SC-08",
        "name": "torch.load without weights_only",
        # Negative lookahead must precede the argument scan, otherwise
        # backtracking makes it match every torch.load call
        "pattern": r"torch\.load\s*\((?![^)]*weights_only\s*=\s*True)",
        "severity": "HIGH",
        "fix": "Use torch.load(path, weights_only=True)",
    },
    {
        "id": "SC-09",
        "name": "eval() or exec() usage",
        "pattern": r"\b(?:eval|exec)\s*\(",
        "severity": "CRITICAL",
        "fix": "Never use eval/exec with untrusted input",
    },
    {
        "id": "SC-10",
        "name": "NumPy load with allow_pickle",
        "pattern": r"np\.load\s*\([^)]*allow_pickle\s*=\s*True",
        "severity": "HIGH",
        "fix": "Avoid allow_pickle=True; use safetensors instead",
    },
]


def audit_file(filepath: Path) -> list:
    """Scan a single file for security anti-patterns."""
    findings = []
    try:
        content = filepath.read_text(encoding="utf-8", errors="ignore")
        for pattern_def in ANTI_PATTERNS:
            matches = list(re.finditer(pattern_def["pattern"], content))
            for match in matches:
                line_num = content[:match.start()].count("\n") + 1
                findings.append({
                    **pattern_def,
                    "file": str(filepath),
                    "line": line_num,
                    "match": match.group()[:80],
                })
    except Exception:
        pass
    return findings


def audit_project(project_dir: str) -> list:
    """Scan all Python files in a project for anti-patterns."""
    all_findings = []
    for pyfile in Path(project_dir).rglob("*.py"):
        all_findings.extend(audit_file(pyfile))
    return all_findings


if __name__ == "__main__":
    print("=== Source Code Security Audit ===\n")
    findings = audit_project("ml-audit-project")
    if not findings:
        print("No security anti-patterns found.")
    else:
        # Group by severity
        by_severity = {}
        for f in findings:
            by_severity.setdefault(f["severity"], []).append(f)
        for severity in ["CRITICAL", "HIGH", "MEDIUM", "LOW"]:
            items = by_severity.get(severity, [])
            if items:
                print(f"\n[{severity}] ({len(items)} finding(s)):")
                for item in items:
                    print(f"  {item['id']}: {item['name']}")
                    print(f"    File: {item['file']}:{item['line']}")
                    print(f"    Match: {item['match']}")
                    print(f"    Fix: {item['fix']}")
        print("\n=== Summary ===")
        print(f"Total findings: {len(findings)}")
        for sev in ["CRITICAL", "HIGH", "MEDIUM"]:
            count = sum(1 for f in findings if f["severity"] == sev)
            if count:
                print(f"  {sev}: {count}")
```

```bash
python audit_code.py
```
### Generate the Audit Report

Compile all findings into a comprehensive supply chain audit report:
```python
#!/usr/bin/env python3
"""Generate a comprehensive supply chain audit report."""
from datetime import datetime

REPORT_TEMPLATE = """
# ML Supply Chain Security Audit Report

## Project: ml-audit-project
## Date: {date}
## Auditor: [Name]

---

## Executive Summary

This audit assessed the supply chain security of the ml-audit-project,
covering Python dependencies, model file formats, container configuration,
and source code patterns. The audit identified findings across multiple
risk categories.

## Scope

| Area | Coverage |
|------|----------|
| Python dependencies | requirements.txt ({dep_count} packages) |
| Model files | models/ directory |
| Container image | Dockerfile |
| Source code | scripts/*.py |

## Findings Summary

| Severity | Count |
|----------|-------|
| Critical | {critical} |
| High | {high} |
| Medium | {medium} |
| Low | {low} |

## Detailed Findings

### 1. Dependency Vulnerabilities
{dep_findings}

### 2. Model File Risks
{model_findings}

### 3. Source Code Anti-Patterns
{code_findings}

### 4. Container Security
{container_findings}

## Recommendations

### Immediate Actions (Critical)
1. Replace pickle model loading with safetensors
2. Remove hardcoded secrets; use environment variables
3. Fix unsafe YAML loading (use safe_load)

### Short-Term (High)
4. Pin all dependency versions with hashes
5. Add vulnerability scanning to CI pipeline
6. Run containers as non-root user
7. Add torch.load weights_only=True

### Medium-Term
8. Implement model file integrity verification (checksums)
9. Set up dependency update monitoring (Dependabot/Renovate)
10. Create a model provenance tracking system
"""

if __name__ == "__main__":
    report = REPORT_TEMPLATE.format(
        date=datetime.now().strftime("%Y-%m-%d"),
        dep_count=16,
        critical=3,
        high=4,
        medium=2,
        low=1,
        dep_findings="- Run pip-audit results here\n- Check for typosquatting",
        model_findings="- Pickle files found: CRITICAL risk\n- Convert to safetensors",
        code_findings="- pickle.load() usage\n- yaml.FullLoader\n- Flask debug mode",
        container_findings="- Running as root\n- No vulnerability scan\n- Unpinned base image",
    )
    print(report)
    with open("supply_chain_audit_report.md", "w") as f:
        f.write(report)
    print("\nReport saved to supply_chain_audit_report.md")
```

```bash
python generate_report.py
```
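Beyond the narrative report, many audits also deliver a machine-readable inventory. The sketch below emits a minimal CycloneDX-shaped component list from pinned requirements; the field names follow the CycloneDX 1.5 JSON layout, but for real use generate the SBOM with a dedicated tool (e.g. cyclonedx-py or syft) rather than this toy converter:

```python
import json

def requirements_to_sbom(requirements_text: str) -> dict:
    """Build a minimal CycloneDX-style component list from pinned
    requirements. Unpinned or commented lines are skipped."""
    components = []
    for line in requirements_text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "==" not in line:
            continue
        name, version = line.split("==", 1)
        components.append({
            "type": "library",
            "name": name,
            "version": version,
            # purl (package URL) is the standard cross-ecosystem identifier
            "purl": f"pkg:pypi/{name}@{version}",
        })
    return {"bomFormat": "CycloneDX", "specVersion": "1.5",
            "components": components}

sample = "torch==2.2.0\nflask==3.0.2\n# comment\nnumpy>=1.0\n"
sbom = requirements_to_sbom(sample)
print(json.dumps(sbom, indent=2))
```

An SBOM like this is what lets downstream consumers re-run vulnerability matching on your project without access to its source tree.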
## ML Supply Chain Risk Taxonomy
| Risk Category | Examples | Severity |
|---|---|---|
| Model serialization | Pickle RCE, PyTorch backdoors | Critical |
| Dependency vulnerabilities | CVEs in torch, numpy, pillow | High |
| Typosquatting | pytorch instead of torch | Critical |
| Container security | Root user, unpatched base images | High |
| Training data | Poisoned datasets, license violations | Medium-High |
| Configuration | Hardcoded secrets, debug modes | High |
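Several rows of the taxonomy above share a single baseline mitigation: verify artifact integrity before use. A minimal sketch of checksum pinning for model files, using only the stdlib (file names are illustrative):

```python
import hashlib
import hmac
from pathlib import Path

def sha256_file(path: Path) -> str:
    """Stream a file through SHA-256 (constant memory, fine for large weights)."""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            h.update(chunk)
    return h.hexdigest()

def verify_model(path: Path, expected_digest: str) -> bool:
    """Refuse any model file whose digest differs from the vetted one."""
    # hmac.compare_digest avoids timing side channels on the comparison
    return hmac.compare_digest(sha256_file(path), expected_digest)

# Demo: record a digest at vetting time, check it before every load.
model = Path("demo_weights.bin")
model.write_bytes(b"\x00" * 1024)            # stand-in for real weights
pinned = sha256_file(model)                  # store this in your manifest
print(verify_model(model, pinned))           # True: untampered
model.write_bytes(b"\x00" * 1023 + b"\x01")  # simulate tampering
print(verify_model(model, pinned))           # False: digest drifted
model.unlink()
```

A digest only protects against tampering after vetting; it says nothing about whether the original artifact was trustworthy, which is where provenance tracking and backdoor detection come in.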
## Troubleshooting
| Issue | Solution |
|---|---|
| `pip-audit` not found | Install with `pip install pip-audit` |
| No model files to scan | Run setup_project.sh first to create sample files |
| Permission errors | Ensure you have read access to the project directory |
| False positives in code scan | Review each finding manually; adjust regex patterns |
## Related Topics
- Backdoor Detection -- Detecting backdoors in model weights
- Fine-Tune Backdoor -- Understanding how model backdoors are created
- Container Breakout -- Container security for ML serving
- Environment Setup -- Setting up secure ML development environments
## References
- "Machine Learning Supply Chain Compromises" -- MITRE ATLAS -- Taxonomy of ML supply chain attacks
- "Blind Backdoors in Deep Learning Models" -- Bagdasaryan & Shmatikov (2022) -- Model backdoor techniques
- "Safetensors: A Safe and Fast File Format for ML" -- Hugging Face -- Secure alternative to pickle serialization
- "OWASP Machine Learning Security Top 10" -- OWASP (2025) -- ML-specific security risks
## Knowledge Check

- Why are pickle-serialized model files a critical security risk?
- What is the recommended safe alternative to pickle for storing ML model weights?