Lab: ML Supply Chain Scan
Hands-on lab for auditing machine learning model dependencies, detecting malicious packages in ML pipelines, and scanning model files for backdoors and supply chain threats.
Prerequisites
- Completed Environment Setup
- Python 3.10+ with pip
- Familiarity with Python package management and virtual environments
- Basic understanding of model serialization formats
pip install pip-audit safety fickling safetensors huggingface_hub python-dotenv

Background
Machine learning systems depend on a deep stack of packages, pre-trained models, and data pipelines. Each dependency is a potential attack vector. Supply chain attacks targeting ML have exploited malicious PyPI packages, backdoored pre-trained models, and poisoned training data hosted on public platforms.
Lab Exercises
Audit Python Package Dependencies
Scan your ML project's dependencies for known vulnerabilities using pip-audit and safety.
#!/usr/bin/env python3
"""Audit ML project dependencies for known vulnerabilities.

Combines pip-audit and Safety scans with a lightweight typosquatting
check for common ML package names.
"""

import json
import re
import subprocess
import sys


def run_pip_audit() -> dict:
    """Run pip-audit to check installed packages against vulnerability databases.

    Returns:
        Dict with a "vulnerabilities" list on success, or an "error"
        key when the tool is missing, times out, or emits bad JSON.
    """
    try:
        result = subprocess.run(
            ["pip-audit", "--format", "json", "--output", "-"],
            capture_output=True,
            text=True,
            timeout=120,
        )
        if result.stdout:
            parsed = json.loads(result.stdout)
            # Recent pip-audit versions wrap results as
            # {"dependencies": [{"name": ..., "vulns": [...]}, ...]}.
            # Flatten that into one list of vulnerability records (each
            # carrying the package "name") so callers see a uniform
            # shape regardless of tool version.
            if isinstance(parsed, dict) and "dependencies" in parsed:
                vulnerabilities = [
                    {"name": dep.get("name"), **vuln}
                    for dep in parsed["dependencies"]
                    for vuln in dep.get("vulns", [])
                ]
            else:
                vulnerabilities = parsed
            return {"tool": "pip-audit", "vulnerabilities": vulnerabilities}
        return {"tool": "pip-audit", "vulnerabilities": [], "stderr": result.stderr}
    except FileNotFoundError:
        return {"tool": "pip-audit", "error": "pip-audit not installed"}
    except subprocess.TimeoutExpired:
        return {"tool": "pip-audit", "error": "scan timed out"}
    except json.JSONDecodeError as e:
        # Parity with run_safety_check: malformed tool output becomes an
        # error result instead of an unhandled exception.
        return {"tool": "pip-audit", "error": str(e)}


def run_safety_check() -> dict:
    """Run Safety to check for known vulnerabilities.

    Returns:
        Dict with the parsed JSON report, or an "error" entry if the
        tool is absent, times out, or produces unparseable output.
    """
    try:
        result = subprocess.run(
            ["safety", "check", "--json"],
            capture_output=True,
            text=True,
            timeout=120,
        )
        if result.stdout:
            data = json.loads(result.stdout)
            return {"tool": "safety", "vulnerabilities": data}
        return {"tool": "safety", "vulnerabilities": [], "stderr": result.stderr}
    except FileNotFoundError:
        return {"tool": "safety", "error": "safety not installed"}
    except (subprocess.TimeoutExpired, json.JSONDecodeError) as e:
        return {"tool": "safety", "error": str(e)}


def check_typosquatting(package_name: str) -> list[str]:
    """Check for common typosquatting patterns in a package name.

    Compares the lowercased name against a curated list of known
    misspellings of popular ML packages and returns human-readable
    warnings for any match (empty list means no match).
    """
    warnings = []
    known_packages = {
        "torch": ["pytorch", "torcch", "tourch"],
        "tensorflow": ["tenserflow", "tensorfow", "tensor-flow"],
        "transformers": ["transfomers", "trasformers", "huggingface-transformers"],
        "numpy": ["numppy", "numpi", "numpay"],
        "openai": ["open-ai", "opeanai", "openaii"],
        "anthropic": ["anthropicc", "anthrophic", "antropic"],
        "langchain": ["lang-chain", "langchian", "langchainn"],
        "scikit-learn": ["sklearn", "scikit_learn", "scikitlearn"],
    }
    for legitimate, typos in known_packages.items():
        if package_name.lower() in typos:
            warnings.append(
                f"Package '{package_name}' may be a typosquat of '{legitimate}'"
            )
    return warnings


def _package_name(requirement_line: str) -> str:
    """Extract the bare package name from one requirements.txt line.

    Handles all PEP 440 version operators (==, >=, <=, <, >, !=, ~=),
    extras brackets (pkg[extra]), and environment markers (pkg; ...),
    unlike a naive split on "=="/">="/"<=" only.
    """
    # Drop any environment marker, then cut at the first operator,
    # extras bracket, or space.
    spec = requirement_line.split(";", 1)[0].strip()
    return re.split(r"[\[<>=!~ ]", spec, maxsplit=1)[0].strip()


def scan_requirements(requirements_path: str = "requirements.txt") -> dict:
    """Scan a requirements file for suspicious (typosquatted) packages.

    Returns:
        {"file": path, "findings": [...]} on success, or {"error": ...}
        when the file does not exist.
    """
    findings = []
    try:
        with open(requirements_path) as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                # Skip blanks and comments.
                if not line or line.startswith("#"):
                    continue
                package = _package_name(line)
                typo_warnings = check_typosquatting(package)
                if typo_warnings:
                    findings.append({
                        "line": line_num,
                        "package": package,
                        "warnings": typo_warnings,
                    })
    except FileNotFoundError:
        return {"error": f"{requirements_path} not found"}
    return {"file": requirements_path, "findings": findings}


if __name__ == "__main__":
    print("=== ML Supply Chain Audit ===\n")

    print("1. Running pip-audit...")
    audit_result = run_pip_audit()
    if "error" in audit_result:
        print(f"   Error: {audit_result['error']}")
    else:
        vulns = audit_result.get("vulnerabilities", [])
        print(f"   Found {len(vulns)} vulnerabilities")
        for v in vulns[:5]:
            print(f"   - {v.get('name', 'unknown')}: {v.get('id', 'unknown')}")

    print("\n2. Running safety check...")
    safety_result = run_safety_check()
    if "error" in safety_result:
        print(f"   Error: {safety_result['error']}")

    print("\n3. Checking for typosquatting...")
    test_packages = ["torch", "torcch", "transformers", "transfomers", "openai", "opeanai"]
    for pkg in test_packages:
        warnings = check_typosquatting(pkg)
        status = "WARNING" if warnings else "OK"
        print(f"   [{status}] {pkg}" + (f" - {warnings[0]}" if warnings else ""))

# Run: python dependency_audit.py
#
# Scan Model Files for Malicious Code
Pickle-serialized models can execute arbitrary code when loaded. Use fickling to analyze model files without executing them.
#!/usr/bin/env python3 """Scan serialized model files for embedded malicious code.""" import os import hashlib import json def scan_pickle_file(filepath: str) -> dict: """Use fickling to analyze a pickle file for malicious content.""" try: import fickling from fickling.fickle import Pickled with open(filepath, "rb") as f: data = f.read() pickled = Pickled.load(data) analysis = { "file": filepath, "size_bytes": len(data), "sha256": hashlib.sha256(data).hexdigest(), "safe": True, "warnings": [], "dangerous_opcodes": [], } # Check for dangerous operations dangerous_modules = [ "os", "subprocess", "sys", "shutil", "socket", "http", "urllib", "requests", "builtins", ] for node in pickled: node_str = str(node) for module in dangerous_modules: if module in node_str: analysis["dangerous_opcodes"].append({ "opcode": str(type(node).__name__), "module": module, "detail": node_str[:200], }) analysis["safe"] = False if not analysis["safe"]: analysis["warnings"].append( "Model file contains potentially dangerous operations. " "Do NOT load this file without review." ) return analysis except ImportError: return {"error": "fickling not installed. 
Run: pip install fickling"} except Exception as e: return {"error": str(e)} def check_safetensors(filepath: str) -> dict: """Verify a safetensors file is valid and not corrupted.""" try: from safetensors import safe_open analysis = { "file": filepath, "size_bytes": os.path.getsize(filepath), "sha256": hashlib.sha256(open(filepath, "rb").read()).hexdigest(), "format": "safetensors", "safe": True, "tensors": [], } with safe_open(filepath, framework="pt") as f: for key in f.keys(): tensor = f.get_tensor(key) analysis["tensors"].append({ "name": key, "shape": list(tensor.shape), "dtype": str(tensor.dtype), }) analysis["tensor_count"] = len(analysis["tensors"]) return analysis except ImportError: return {"error": "safetensors not installed"} except Exception as e: return {"file": filepath, "safe": False, "error": str(e)} def verify_model_integrity(filepath: str, expected_hash: str = None) -> dict: """Verify model file integrity using SHA-256 hash.""" with open(filepath, "rb") as f: actual_hash = hashlib.sha256(f.read()).hexdigest() result = { "file": filepath, "sha256": actual_hash, "size_bytes": os.path.getsize(filepath), } if expected_hash: result["expected_hash"] = expected_hash result["integrity_ok"] = actual_hash == expected_hash if not result["integrity_ok"]: result["warning"] = "Hash mismatch! File may have been tampered with." else: result["note"] = "No expected hash provided. Record this hash for future verification." return result if __name__ == "__main__": print("=== Model File Scanner ===\n") # Create a test pickle file with harmless content for demonstration import pickle test_data = {"weights": [0.1, 0.2, 0.3], "bias": 0.5} with open("test_model.pkl", "wb") as f: pickle.dump(test_data, f) print("1. 
Scanning pickle file...") result = scan_pickle_file("test_model.pkl") if "error" in result: print(f" Error: {result['error']}") else: status = "SAFE" if result["safe"] else "DANGEROUS" print(f" [{status}] {result['file']}") print(f" SHA-256: {result['sha256'][:16]}...") if result["dangerous_opcodes"]: for op in result["dangerous_opcodes"]: print(f" WARNING: {op['module']} usage detected") print("\n2. Verifying file integrity...") integrity = verify_model_integrity("test_model.pkl") print(f" SHA-256: {integrity['sha256'][:32]}...") print(f" Size: {integrity['size_bytes']} bytes") # Clean up os.remove("test_model.pkl")python model_scanner.pyAudit Hugging Face Model Downloads
Before downloading models from public registries, verify their provenance and scan for known issues.
#!/usr/bin/env python3
"""Audit Hugging Face model repositories before downloading."""

import json

from huggingface_hub import HfApi, model_info


def audit_model_repo(repo_id: str) -> dict:
    """Audit a Hugging Face model repository for security indicators.

    Collects popularity/provenance metadata, flags risky serialization
    formats (pickle-based weights), and recommends safetensors where
    available.

    Returns:
        Audit dict with "warnings", "file_analysis", and optionally a
        "recommendation"; {"repo_id": ..., "error": ...} on failure.
    """
    try:
        api = HfApi()
        info = api.model_info(repo_id, files_metadata=True)

        audit = {
            "repo_id": repo_id,
            "author": info.author,
            "downloads": info.downloads,
            "likes": info.likes,
            "created": str(info.created_at) if info.created_at else None,
            "modified": str(info.last_modified) if info.last_modified else None,
            "tags": info.tags,
            "warnings": [],
            "file_analysis": [],
        }

        # Suspicious-popularity indicators. Compare against None rather
        # than truthiness: a repo with exactly 0 downloads/likes is the
        # MOST suspicious case and must not be skipped (0 is falsy, so
        # the original `if info.downloads and ...` missed it).
        if info.downloads is not None and info.downloads < 100:
            audit["warnings"].append(
                f"Low download count ({info.downloads}). "
                "Be cautious with little-used models."
            )
        if info.likes is not None and info.likes < 5:
            audit["warnings"].append("Low community engagement.")

        # Classify repository files by serialization-format risk.
        dangerous_extensions = [".pkl", ".pickle", ".bin"]
        safe_extensions = [".safetensors", ".json", ".txt", ".md"]

        if info.siblings:
            for f in info.siblings:
                ext = "." + f.rfilename.rsplit(".", 1)[-1] if "." in f.rfilename else ""
                file_info = {
                    "name": f.rfilename,
                    "size": f.size,
                    "extension": ext,
                }
                if ext in dangerous_extensions:
                    file_info["risk"] = "HIGH"
                    file_info["note"] = (
                        f"Pickle-based format ({ext}) can execute arbitrary code. "
                        "Prefer safetensors if available."
                    )
                    audit["warnings"].append(
                        f"Dangerous file format: {f.rfilename}"
                    )
                elif ext in safe_extensions:
                    file_info["risk"] = "LOW"
                else:
                    file_info["risk"] = "UNKNOWN"
                audit["file_analysis"].append(file_info)

        # Overall recommendation based on which formats are offered.
        has_safetensors = any(
            f["extension"] == ".safetensors" for f in audit["file_analysis"]
        )
        has_pickle = any(
            f["risk"] == "HIGH" for f in audit["file_analysis"]
        )
        if has_pickle and has_safetensors:
            audit["recommendation"] = "Use safetensors files instead of pickle files"
        elif has_pickle and not has_safetensors:
            audit["recommendation"] = (
                "Only pickle files available. Scan with fickling before loading. "
                "Consider requesting safetensors conversion."
            )
        elif has_safetensors:
            audit["recommendation"] = "Model uses safetensors format (good)"

        return audit
    except Exception as e:
        return {"repo_id": repo_id, "error": str(e)}


if __name__ == "__main__":
    repos_to_audit = [
        "bert-base-uncased",
        "gpt2",
        "meta-llama/Llama-2-7b-hf",
    ]

    print("=== Hugging Face Model Audit ===\n")
    for repo in repos_to_audit:
        print(f"--- Auditing: {repo} ---")
        result = audit_model_repo(repo)
        if "error" in result:
            print(f"   Error: {result['error']}")
            continue
        print(f"   Author: {result['author']}")
        print(f"   Downloads: {result.get('downloads', 'unknown')}")
        print(f"   Files: {len(result['file_analysis'])}")
        if result["warnings"]:
            print(f"   Warnings ({len(result['warnings'])}):")
            for w in result["warnings"]:
                print(f"   - {w}")
        if result.get("recommendation"):
            print(f"   Recommendation: {result['recommendation']}")
        print()

# Run: python hf_audit.py
#
# Build an Automated Supply Chain Scanner
Combine all scanning tools into a single pipeline that can be run in CI.
#!/usr/bin/env python3
"""Unified ML supply chain scanning pipeline.

Chains the dependency audit, typosquatting check, and model-file scan
into a single CI-friendly report with a pass/fail exit code.
"""

import json
import sys

from dependency_audit import run_pip_audit, scan_requirements
from model_scanner import scan_pickle_file, verify_model_integrity


def run_full_scan(
    requirements_file: str = "requirements.txt",
    model_files: list[str] | None = None,
    fail_on_critical: bool = True,
) -> dict:
    """Run the complete supply chain scan.

    Args:
        requirements_file: Requirements file to check for typosquats.
        model_files: Optional model file paths to scan; only pickle-based
            formats (.pkl/.pickle/.bin) are analyzed.
        fail_on_critical: Mark the report failed if any critical finding
            is present.

    Returns:
        Report dict with "findings", severity counters, and "passed".
    """
    report = {
        "scan_type": "full_supply_chain",
        "findings": [],
        "critical_count": 0,
        "warning_count": 0,
        "passed": True,
    }

    # 1. Dependency audit
    print("Step 1: Auditing dependencies...")
    dep_result = run_pip_audit()
    vulns = dep_result.get("vulnerabilities", [])
    if vulns:
        severity = (
            "critical" if any(v.get("fix_versions") for v in vulns) else "warning"
        )
        report["findings"].append({
            "category": "dependency_vulnerabilities",
            "severity": severity,
            "count": len(vulns),
            "details": vulns[:10],
        })
        # Count under the severity actually assigned. The original
        # always bumped critical_count -- even for "warning" findings --
        # and warning_count was never incremented at all.
        if severity == "critical":
            report["critical_count"] += len(vulns)
        else:
            report["warning_count"] += len(vulns)

    # 2. Requirements typosquatting check
    print("Step 2: Checking for typosquatting...")
    req_result = scan_requirements(requirements_file)
    if req_result.get("findings"):
        report["findings"].append({
            "category": "typosquatting_risk",
            "severity": "critical",
            "count": len(req_result["findings"]),
            "details": req_result["findings"],
        })
        report["critical_count"] += len(req_result["findings"])

    # 3. Model file scanning
    if model_files:
        print("Step 3: Scanning model files...")
        for model_path in model_files:
            # Only pickle-based formats need the static scan.
            if model_path.endswith((".pkl", ".pickle", ".bin")):
                scan = scan_pickle_file(model_path)
                if not scan.get("safe", True):
                    report["findings"].append({
                        "category": "malicious_model_file",
                        "severity": "critical",
                        "file": model_path,
                        "details": scan,
                    })
                    report["critical_count"] += 1

    # Determine pass/fail
    if fail_on_critical and report["critical_count"] > 0:
        report["passed"] = False
    return report


if __name__ == "__main__":
    report = run_full_scan()

    print(f"\n{'=' * 50}")
    print("Supply Chain Scan Report")
    print("=" * 50)
    print(f"Total findings: {len(report['findings'])}")
    print(f"Critical: {report['critical_count']}")
    print(f"Status: {'PASS' if report['passed'] else 'FAIL'}")

    # Persist the full report for CI artifacts.
    with open("supply_chain_report.json", "w") as f:
        json.dump(report, f, indent=2)

    # Non-zero exit makes CI fail the build on critical findings.
    if not report["passed"]:
        sys.exit(1)

# Run: python supply_chain_pipeline.py
Troubleshooting
| Issue | Solution |
|---|---|
| pip-audit not finding vulnerabilities | Ensure you are scanning the correct virtual environment; activate it before running |
| fickling cannot parse file | Some pickle protocols are not supported; try upgrading fickling or use a different scanner |
| Hugging Face API rate limited | Add authentication with huggingface-cli login for higher rate limits |
| safetensors import fails | Install with pip install safetensors[torch] for PyTorch tensor support |
Why This Matters
Related Topics
- Environment Setup - Secure environment configuration
- Fine-Tune Backdoor - How backdoors are embedded in models
- Defense Mechanisms - Supply chain security as a defense layer
References
- "Poisoning Language Models During Instruction Tuning" - Wan et al. (2023) - Demonstrates model poisoning through training data
- "BadNets: Identifying Vulnerabilities in the Machine Learning Model Supply Chain" - Gu et al. (2019) - Foundational research on ML supply chain attacks
- fickling documentation - trailofbits.github.io/fickling - Static analysis for pickle files
- OWASP ML Top 10 - ML-specific supply chain security guidance
Why is pickle serialization considered a security risk for ML model files?
What should you check when evaluating a model from a public registry like Hugging Face?