# Lab: Supply Chain Audit
Audit an ML project's dependencies for vulnerabilities, covering model files, Python packages, container images, and training data provenance.
## Prerequisites

- Python 3.10+ with pip-audit installed (`pip install pip-audit`)
- Docker installed (for container image scanning)
- Familiarity with Python package management and virtual environments

## Background
ML supply chains carry risks that traditional software supply chains do not. Beyond code dependencies, ML projects depend on model weights (which can contain executable code), training data (which can be poisoned), and specialized frameworks with complex native dependencies.
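A short example makes the model-weight risk concrete: pickle lets any object run code of its choosing at load time via `__reduce__`. The payload here is deliberately benign; a real attack would invoke `os.system`, `subprocess`, or similar:

```python
import pickle


class MaliciousWeights:
    """Stand-in for a 'model file': code in __reduce__ runs on load."""

    def __reduce__(self):
        # Benign payload for demonstration; an attacker would call
        # os.system, subprocess.run, etc. instead of eval.
        return (eval, ("6 * 7",))


blob = pickle.dumps(MaliciousWeights())
# Unpickling does NOT return a MaliciousWeights instance -- it executes
# the attacker-chosen call and returns its result.
result = pickle.loads(blob)
print(result)  # 42
```

This is why every format built on pickle (including PyTorch `.pt` files) must be treated as executable code, not data.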
## Lab Exercises
### Set Up a Sample ML Project for Auditing
Create a realistic ML project structure with dependencies to audit:
```bash
#!/bin/bash
# Set up a sample ML project for supply chain auditing
mkdir -p ml-audit-project/{models,data,configs,scripts}
cd ml-audit-project

# Create requirements.txt with a mix of packages
cat > requirements.txt << 'EOF'
torch==2.2.0
transformers==4.38.0
numpy==1.26.4
pandas==2.2.0
scikit-learn==1.4.0
flask==3.0.2
requests==2.31.0
pyyaml==6.0.1
pillow==10.2.0
python-dotenv==1.0.1
gradio==4.19.0
huggingface-hub==0.21.0
datasets==2.17.0
accelerate==0.27.0
sentencepiece==0.2.0
protobuf==4.25.2
EOF

# Create a model loading script (with deliberate security concerns)
cat > scripts/load_model.py << 'PYEOF'
#!/usr/bin/env python3
"""Load and serve a model -- contains common security anti-patterns."""
import pickle
import torch
import yaml
import os
from flask import Flask, request, jsonify

app = Flask(__name__)

# AUDIT CONCERN: Loading model from pickle (arbitrary code execution)
def load_model_pickle(path):
    with open(path, "rb") as f:
        return pickle.load(f)  # Unsafe deserialization

# AUDIT CONCERN: Loading config from YAML without safe_load
def load_config(path):
    with open(path) as f:
        return yaml.load(f, Loader=yaml.FullLoader)  # Should use safe_load

# AUDIT CONCERN: Model path from user input without validation
@app.route("/predict", methods=["POST"])
def predict():
    model_name = request.json.get("model")
    model_path = f"models/{model_name}"  # Path traversal risk
    model = load_model_pickle(model_path)
    return jsonify({"result": "prediction"})

# AUDIT CONCERN: Debug mode in production
if __name__ == "__main__":
    app.run(debug=True, host="0.0.0.0")  # Debug mode exposed
PYEOF

# Create a Dockerfile with common issues
cat > Dockerfile << 'DEOF'
FROM python:3.11
# AUDIT: Running as root
# AUDIT: No pinned hash for base image
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
# AUDIT: No vulnerability scanning step
COPY . .
# AUDIT: Exposing all ports
EXPOSE 5000
CMD ["python", "scripts/load_model.py"]
DEOF

echo "Sample ML project created in ml-audit-project/"
```

Save the script as `setup_project.sh` and run it:

```bash
bash setup_project.sh
```

### Audit Python Dependencies
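Note that version pins alone do not protect against a compromised index serving a tampered artifact under the same version number; hash pinning does. In pip's `--require-hashes` mode, each requirement carries its expected digest and installation fails on any mismatch. A hash-pinned line looks like this (the digest is truncated and purely illustrative):

```
flask==3.0.2 \
    --hash=sha256:3e97...
```

`pip-compile --generate-hashes` (from the pip-tools package, a separate install) can generate such a file for every dependency automatically.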
Scan the project's Python dependencies for known vulnerabilities:
#!/usr/bin/env python3 """Audit Python package dependencies for 安全 漏洞.""" import subprocess import json from pathlib import Path def run_pip_audit(requirements_file: str) -> dict: """Run pip-audit against a requirements file.""" try: result = subprocess.run( ["pip-audit", "-r", requirements_file, "--format", "json", "--desc", "--fix", "--dry-run"], capture_output=True, text=True, timeout=120, ) if result.stdout: return json.loads(result.stdout) return {"dependencies": [], "error": result.stderr} except FileNotFoundError: return {"error": "pip-audit not installed. Run: pip install pip-audit"} except subprocess.TimeoutExpired: return {"error": "Audit timed out"} def check_typosquatting(requirements_file: str) -> list: """Check for potential typosquatting in package names.""" known_typosquats = { "pytorch": "torch", "python-opencv": "opencv-python", "sklearn": "scikit-learn", "numppy": "numpy", "reqests": "requests", "tesorflow": "tensorflow", "trasformers": "transformers", "panadas": "pandas", } findings = [] with open(requirements_file) as f: for line in f: pkg = line.strip().split("==")[0].split(">=")[0].lower() if pkg in known_typosquats: findings.append({ "package": pkg, "likely_intended": known_typosquats[pkg], "severity": "high", }) return findings def check_unpinned_versions(requirements_file: str) -> list: """識別 packages without pinned versions.""" findings = [] with open(requirements_file) as f: for line in f: line = line.strip() if not line or line.startswith("#"): continue if "==" not in line: pkg = line.split(">=")[0].split(">")[0].split("<")[0] findings.append({ "package": pkg, "issue": "Version not pinned with ==", "severity": "medium", }) return findings if __name__ == "__main__": req_file = "ml-audit-project/requirements.txt" if not Path(req_file).exists(): print("Run setup_project.sh first.") exit(1) print("=== Python Dependency Audit ===\n") # 漏洞 scan print("[1] Known 漏洞 Scan (pip-audit):") vuln_results = run_pip_audit(req_file) if "error" in 
vuln_results: print(f" Note: {vuln_results['error']}") else: deps = vuln_results.get("dependencies", []) vulns = [d for d in deps if d.get("vulns")] if vulns: for dep in vulns: for vuln in dep["vulns"]: print(f" [VULN] {dep['name']} {dep['version']}: " f"{vuln['id']} - {vuln.get('description', 'N/A')[:80]}") else: print(" No known 漏洞 found.") # Typosquatting check print("\n[2] Typosquatting Check:") typos = check_typosquatting(req_file) if typos: for t in typos: print(f" [ALERT] '{t['package']}' may be typosquat " f"of '{t['likely_intended']}'") else: print(" No suspected typosquats found.") # Unpinned versions print("\n[3] Version Pinning Check:") unpinned = check_unpinned_versions(req_file) if unpinned: for u in unpinned: print(f" [WARN] {u['package']}: {u['issue']}") else: print(" All packages have pinned versions.")python audit_dependencies.pyAudit Model File 安全
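Before auditing, it helps to see why safetensors is the recommended target format: a file is just an 8-byte little-endian header length, a JSON header, and raw tensor bytes, so loading is pure parsing with no code execution. A stdlib-only sketch of the layout, using toy data rather than a real model:

```python
import json
import struct

# Build a minimal blob in the safetensors layout: length-prefixed JSON
# metadata followed by raw tensor bytes (two float32 values here).
header = json.dumps(
    {"weight": {"dtype": "F32", "shape": [2], "data_offsets": [0, 8]}}
).encode()
blob = struct.pack("<Q", len(header)) + header + struct.pack("<2f", 0.5, -0.5)

# "Loading" the metadata is just reading the prefix and parsing JSON --
# there is no opcode machine and no way to trigger code execution.
header_len = struct.unpack("<Q", blob[:8])[0]
meta = json.loads(blob[8:8 + header_len])
print(meta["weight"]["shape"])  # [2]
```

Compare this with pickle, where "loading" means running a small program embedded in the file.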
Check model files for unsafe serialization formats:
#!/usr/bin/env python3 """Audit model files for 安全 risks.""" import struct from pathlib import Path def check_pickle_safety(filepath: Path) -> dict: """Check if a file is a pickle file and 評估 risk.""" try: with open(filepath, "rb") as f: magic = f.read(2) # Pickle protocol opcodes is_pickle = magic[0:1] == b'\x80' or magic[0:1] in ( b'(', b']', b'}', b'c', ) if is_pickle: f.seek(0) content = f.read(1024) # Look for dangerous opcodes dangerous_opcodes = { b'c': "GLOBAL (imports modules)", b'R': "REDUCE (calls functions)", b'i': "INST (instantiates objects)", b'o': "OBJ (calls constructors)", } found = {} for opcode, desc in dangerous_opcodes.items(): if opcode in content: found[opcode.hex()] = desc return { "is_pickle": True, "risk": "CRITICAL", "dangerous_opcodes": found, "recommendation": "Use safetensors format instead", } except Exception: pass return {"is_pickle": False, "risk": "LOW"} def check_pytorch_safety(filepath: Path) -> dict: """Check PyTorch model files for 安全.""" try: with open(filepath, "rb") as f: magic = f.read(8) if magic[:2] == b'PK': # ZIP file (PyTorch .pt format) return { "format": "pytorch_zip", "risk": "HIGH", "reason": "PyTorch .pt files use pickle internally", "recommendation": "Convert to safetensors format", } if magic[:8] == b'\x93NUMPY\x01': return { "format": "numpy", "risk": "MEDIUM", "reason": "NumPy files can contain object arrays with pickle", "recommendation": "Verify allow_pickle=False is used when loading", } except Exception: pass return {"format": "unknown", "risk": "UNKNOWN"} def check_safetensors(filepath: Path) -> dict: """Verify safetensors files are properly formatted.""" try: with open(filepath, "rb") as f: header_len = struct.unpack("<Q", f.read(8))[0] if header_len < 10_000_000: # Reasonable header size return { "format": "safetensors", "risk": "LOW", "reason": "Safetensors does not allow code execution", "header_size": header_len, } except Exception: pass return {"format": "unknown", "risk": "UNKNOWN"} def 
audit_model_directory(model_dir: str) -> list: """Audit all model files in a directory.""" findings = [] model_extensions = {".pt", ".pth", ".pkl", ".pickle", ".bin", ".safetensors", ".npy", ".npz", ".onnx"} for filepath in Path(model_dir).rglob("*"): if filepath.suffix.lower() in model_extensions: finding = { "file": str(filepath), "extension": filepath.suffix, "size_mb": filepath.stat().st_size / (1024 * 1024), } if filepath.suffix in (".pkl", ".pickle"): finding.update(check_pickle_safety(filepath)) elif filepath.suffix in (".pt", ".pth", ".bin"): finding.update(check_pytorch_safety(filepath)) elif filepath.suffix == ".safetensors": finding.update(check_safetensors(filepath)) else: finding["risk"] = "REVIEW" finding["recommendation"] = "Manual review needed" findings.append(finding) return findings if __name__ == "__main__": print("=== Model File 安全 Audit ===\n") # Create sample model files for demonstration model_dir = Path("ml-audit-project/models") model_dir.mkdir(parents=True, exist_ok=True) # Create a sample pickle file (demonstrating the risk) import pickle sample_data = {"weights": [0.1, 0.2, 0.3], "bias": 0.5} with open(model_dir / "model_v1.pkl", "wb") as f: pickle.dump(sample_data, f) findings = audit_model_directory("ml-audit-project/models") if not findings: print("No model files found to audit.") else: for f in findings: print(f"File: {f['file']}") print(f" Format: {f.get('format', f['extension'])}") print(f" Size: {f['size_mb']:.1f} MB") print(f" Risk: {f['risk']}") if f.get("dangerous_opcodes"): print(f" Dangerous opcodes: {f['dangerous_opcodes']}") if f.get("recommendation"): print(f" Recommendation: {f['recommendation']}") print() print("\n=== Model Format Risk 總結 ===") print(" safetensors: SAFE - No code execution possible") print(" ONNX: LOW - Standard format, limited 攻擊面") print(" pickle/pkl: CRITICAL - Arbitrary code execution on load") print(" PyTorch .pt: HIGH - Uses pickle internally") print(" NumPy .npy: MEDIUM - Object arrays can 
contain pickle")python audit_models.pyAudit Source Code for 安全 Anti-Patterns
Scan the project source code for common ML security anti-patterns:
#!/usr/bin/env python3 """Audit ML project source code for 安全 anti-patterns.""" import re from pathlib import Path ANTI_PATTERNS = [ { "id": "SC-01", "name": "Unsafe pickle load", "pattern": r"pickle\.load\s*\(", "severity": "CRITICAL", "fix": "Use safetensors, torch.load(weights_only=True), or json", }, { "id": "SC-02", "name": "Unsafe YAML load", "pattern": r"yaml\.load\s*\([^)]*(?:FullLoader|Loader=yaml\.FullLoader)", "severity": "HIGH", "fix": "Use yaml.safe_load() instead", }, { "id": "SC-03", "name": "Unsafe YAML (no loader specified)", "pattern": r"yaml\.load\s*\([^,)]*\)", "severity": "HIGH", "fix": "Use yaml.safe_load() instead", }, { "id": "SC-04", "name": "Flask debug mode", "pattern": r"app\.run\s*\([^)]*debug\s*=\s*True", "severity": "HIGH", "fix": "Never use debug=True in production", }, { "id": "SC-05", "name": "Hardcoded secrets", "pattern": r"(?:api_key|password|secret|符元)\s*=\s*['\"][^'\"]+['\"]", "severity": "CRITICAL", "fix": "Use environment variables or a secrets manager", }, { "id": "SC-06", "name": "Host 0.0.0.0 binding", "pattern": r"host\s*=\s*['\"]0\.0\.0\.0['\"]", "severity": "MEDIUM", "fix": "Bind to 127.0.0.1 unless intentionally exposing", }, { "id": "SC-07", "name": "Unvalidated file path from 使用者輸入", "pattern": r"request\.(json|form|args).*(?:open|Path)\s*\(", "severity": "HIGH", "fix": "Validate and sanitize file paths; use allowlists", }, { "id": "SC-08", "name": "torch.load without weights_only", "pattern": r"torch\.load\s*\([^)]*(?!weights_only)", "severity": "HIGH", "fix": "Use torch.load(path, weights_only=True)", }, { "id": "SC-09", "name": "eval() or exec() usage", "pattern": r"(?:eval|exec)\s*\(", "severity": "CRITICAL", "fix": "Never use eval/exec with untrusted 輸入", }, { "id": "SC-10", "name": "NumPy load with allow_pickle", "pattern": r"np\.load\s*\([^)]*allow_pickle\s*=\s*True", "severity": "HIGH", "fix": "Avoid allow_pickle=True; use safetensors instead", }, ] def audit_file(filepath: Path) -> list: """Scan a single 
file for 安全 anti-patterns.""" findings = [] try: content = filepath.read_text(encoding="utf-8", errors="ignore") for pattern_def in ANTI_PATTERNS: matches = list(re.finditer(pattern_def["pattern"], content)) for match in matches: line_num = content[:match.start()].count("\n") + 1 findings.append({ **pattern_def, "file": str(filepath), "line": line_num, "match": match.group()[:80], }) except Exception: pass return findings def audit_project(project_dir: str) -> list: """Scan all Python files in a project for anti-patterns.""" all_findings = [] for pyfile in Path(project_dir).rglob("*.py"): all_findings.extend(audit_file(pyfile)) return all_findings if __name__ == "__main__": print("=== Source Code 安全 Audit ===\n") findings = audit_project("ml-audit-project") if not findings: print("No 安全 anti-patterns found.") else: # Group by severity by_severity = {} for f in findings: by_severity.setdefault(f["severity"], []).append(f) for severity in ["CRITICAL", "HIGH", "MEDIUM", "LOW"]: items = by_severity.get(severity, []) if items: print(f"\n[{severity}] ({len(items)} finding(s)):") for item in items: print(f" {item['id']}: {item['name']}") print(f" File: {item['file']}:{item['line']}") print(f" Match: {item['match']}") print(f" Fix: {item['fix']}") print(f"\n=== 總結 ===") print(f"Total findings: {len(findings)}") for sev in ["CRITICAL", "HIGH", "MEDIUM"]: count = sum(1 for f in findings if f["severity"] == sev) if count: print(f" {sev}: {count}")python audit_code.pyGenerate the Audit Report
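The report's findings-summary table is just a severity tally over the merged output of the audit scripts; a minimal sketch with toy findings:

```python
from collections import Counter

# Toy merged findings from the dependency, model, and code audits.
findings = [
    {"source": "deps",  "severity": "HIGH"},
    {"source": "model", "severity": "CRITICAL"},
    {"source": "code",  "severity": "CRITICAL"},
    {"source": "code",  "severity": "MEDIUM"},
]

# Counter gives the per-severity counts the report template expects.
counts = Counter(f["severity"] for f in findings)
for sev in ("CRITICAL", "HIGH", "MEDIUM", "LOW"):
    print(f"{sev:<8} {counts.get(sev, 0)}")
```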
Compile all findings into a comprehensive supply chain audit report:
#!/usr/bin/env python3 """Generate a comprehensive 供應鏈 audit report.""" from datetime import datetime REPORT_TEMPLATE = """ # ML Supply Chain 安全 Audit Report ## Project: ml-audit-project ## Date: {date} ## Auditor: [Name] --- ## Executive 總結 This audit assessed the 供應鏈 安全 of the ml-audit-project, covering Python dependencies, model file formats, container configuration, and source code patterns. The audit identified findings across multiple risk categories. ## Scope | Area | Coverage | |------|----------| | Python dependencies | requirements.txt ({dep_count} packages) | | Model files | models/ directory | | Container image | Dockerfile | | Source code | scripts/*.py | ## Findings 總結 | Severity | Count | |----------|-------| | Critical | {critical} | | High | {high} | | Medium | {medium} | | Low | {low} | ## Detailed Findings ### 1. Dependency 漏洞 {dep_findings} ### 2. Model File Risks {model_findings} ### 3. Source Code Anti-Patterns {code_findings} ### 4. Container 安全 {container_findings} ## Recommendations ### Immediate Actions (Critical) 1. Replace pickle model loading with safetensors 2. Remove hardcoded secrets; use environment variables 3. Fix unsafe YAML loading (use safe_load) ### Short-Term (High) 4. Pin all dependency versions with hashes 5. Add 漏洞 scanning to CI pipeline 6. Run containers as non-root user 7. Add torch.load weights_only=True ### Medium-Term 8. 實作 model file integrity verification (checksums) 9. Set up dependency update 監控 (Dependabot/Renovate) 10. 
Create a model provenance tracking system """ if __name__ == "__main__": report = REPORT_TEMPLATE.format( date=datetime.now().strftime("%Y-%m-%d"), dep_count=16, critical=3, high=4, medium=2, low=1, dep_findings="- Run pip-audit results here\n- Check for typosquatting", model_findings="- Pickle files found: CRITICAL risk\n- Convert to safetensors", code_findings="- pickle.load() usage\n- yaml.FullLoader\n- Flask debug mode", container_findings="- Running as root\n- No 漏洞 scan\n- Unpinned base image", ) print(report) with open("supply_chain_audit_report.md", "w") as f: f.write(report) print("\nReport saved to supply_chain_audit_report.md")python generate_report.py
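Recommendation 8 (model integrity verification) can start as little more than a checksum manifest checked before every load. A stdlib sketch, with function and file names of my own choosing:

```python
import hashlib
import json
from pathlib import Path


def sha256_file(path: Path) -> str:
    """Stream a file through SHA-256; model files are too big for read()."""
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            h.update(chunk)
    return h.hexdigest()


def write_manifest(model_dir: str, manifest: str = "MANIFEST.json") -> dict:
    """Record a digest for every file under model_dir."""
    digests = {str(p): sha256_file(p)
               for p in sorted(Path(model_dir).rglob("*")) if p.is_file()}
    Path(manifest).write_text(json.dumps(digests, indent=2))
    return digests


def verify_manifest(manifest: str = "MANIFEST.json") -> list:
    """Return files that are missing or whose digest has changed."""
    recorded = json.loads(Path(manifest).read_text())
    return [f for f, digest in recorded.items()
            if not Path(f).exists() or sha256_file(Path(f)) != digest]
```

Run `write_manifest` when a model is approved, commit the manifest, and fail closed whenever `verify_manifest` returns a non-empty list.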
## ML Supply Chain Risk Taxonomy
| Risk Category | Examples | Severity |
|---|---|---|
| Model serialization | Pickle RCE, PyTorch backdoors | Critical |
| Dependency vulnerabilities | CVEs in torch, numpy, pillow | High |
| Typosquatting | `pytorch` instead of `torch` | Critical |
| Container security | Root user, unpatched base images | High |
| Training data | Poisoned datasets, license violations | Medium-High |
| Configuration | Hardcoded secrets, debug modes | High |
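The typosquatting row above generalizes beyond a fixed lookup table: string similarity against a list of popular packages catches misspellings the table never anticipated. A hedged sketch using the stdlib difflib module (the helper name is mine):

```python
import difflib

POPULAR = ["torch", "numpy", "requests", "pandas", "scikit-learn",
           "tensorflow", "transformers", "pillow", "flask"]


def suspicious_lookalikes(package: str, cutoff: float = 0.85) -> list:
    """Return popular packages whose names are suspiciously close to
    `package` (but not identical) -- a cheap typosquat heuristic."""
    name = package.lower()
    close = difflib.get_close_matches(name, POPULAR, n=3, cutoff=cutoff)
    return [c for c in close if c != name]


print(suspicious_lookalikes("reqests"))   # flags "requests"
print(suspicious_lookalikes("requests"))  # [] -- exact match is fine
```

The cutoff needs tuning: too low and legitimate short names collide, too high and single-character swaps slip through.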
## Troubleshooting
| Issue | Solution |
|---|---|
| pip-audit not found | Install with `pip install pip-audit` |
| No model files to scan | Run setup_project.sh first to create sample files |
| Permission errors | Ensure you have read access to the project directory |
| False positives in code scan | Review each finding manually; adjust regex patterns |
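For the false-positive row, one common pattern is an inline suppression comment the scanner honors once a finding has been manually reviewed. The `# audit: ignore` convention below is my invention, mirroring flake8's `# noqa`:

```python
import re

# Lines ending in "# audit: ignore" are skipped by the scanner, so
# reviewed false positives stop re-appearing in every run.
SUPPRESS = re.compile(r"#\s*audit:\s*ignore\s*$")


def is_suppressed(line: str) -> bool:
    """True if the line carries an explicit suppression marker."""
    return bool(SUPPRESS.search(line))


print(is_suppressed("cfg = yaml.load(f)  # audit: ignore"))  # True
print(is_suppressed("cfg = yaml.load(f)"))                   # False
```

Suppressions should themselves be auditable: grep for the marker periodically so silenced findings do not accumulate unseen.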
## Related Topics

- Backdoor Detection -- Detecting backdoors in model weights
- Fine-Tune Backdoors -- Understanding how model backdoors are created
- Container Breakout -- Container security for ML serving
- Environment Setup -- Setting up secure ML development environments
- Environment Setup -- Setting up secure ML development environments
## References
- "Machine Learning Supply Chain Compromises" -- MITRE ATLAS -- Taxonomy of ML 供應鏈 attacks
- "Blind Backdoors in Deep Learning Models" -- Bagdasaryan & Shmatikov (2022) -- Model 後門 techniques
- "Safetensors: A Safe and Fast File Format for ML" -- Hugging Face -- Secure alternative to pickle serialization
- "OWASP Machine Learning 安全 Top 10" -- OWASP (2025) -- ML-specific 安全 risks
## Review Questions

- Why are pickle-serialized model files a critical security risk?
- What is the recommended safe alternative to pickle for storing ML model weights?