Dependency Scanning for AI/ML
Defense-focused guide to scanning AI/ML dependencies for vulnerabilities, covering AI-specific dependency risks, malicious package detection, automated scanning pipelines, and policy enforcement for ML toolchains.
AI/ML projects have uniquely deep and complex dependency trees. A typical LLM application installing transformers pulls in over 80 transitive dependencies. PyTorch alone brings 50+ packages. Each dependency is a potential entry point for supply chain attacks, and the ML ecosystem has specific risks that traditional dependency scanners do not cover: AI-specific vulnerability patterns like unsafe model deserialization, GPU driver exploits, and custom code execution in model loading.
AI-Specific Dependency Risks
Risk Categories Beyond Traditional CVEs
| Risk Category | Traditional Scanner Coverage | AI-Specific Risk | Example |
|---|---|---|---|
| Known CVEs | Full | Standard vulnerability | TensorFlow CVE-2023-25668 |
| Pickle deserialization | Partial | RCE through model loading | torch.load() without weights_only=True |
| GPU driver vulnerabilities | None | Privilege escalation, memory leaks | NVIDIA driver CVEs |
| Custom code execution | None | trust_remote_code=True patterns | HF models with custom Python |
| Dependency confusion | Partial | Internal package name collision | torchtriton incident |
| Typosquatting | Partial | ML-specific package names | pytorchh, tensor-flow |
| Unsafe defaults | None | Insecure default configurations | Pickle as default serialization |
"""
AI/ML Dependency Scanner
Extends traditional dependency scanning with AI-specific
vulnerability detection that standard tools miss.
"""
import json
import subprocess
import re
import sys
from dataclasses import dataclass
from pathlib import Path
@dataclass
class Vulnerability:
    """A single dependency finding produced by AIMLDependencyScanner."""

    package: str  # affected package name
    version: str  # installed version ("unpinned" for requirements findings)
    severity: str  # critical, high, medium, low
    category: str  # cve, ai_specific, config, typosquat (code also emits "malicious")
    description: str  # human-readable explanation of the finding
    remediation: str  # recommended action to resolve the finding
    reference: str = ""  # optional advisory/CVE identifier
class AIMLDependencyScanner:
    """
    Scans AI/ML project dependencies for both traditional CVEs
    and AI-specific vulnerability patterns.

    Checks performed by scan_project():
      * known malicious / dependency-confusion packages (KNOWN_MALICIOUS)
      * risky AI packages installed below their minimum safe version
      * typosquatting via edit distance against well-known package names
      * requirement lines lacking integrity hashes
      * traditional CVEs via pip-audit (best effort)
    """

    # Known malicious or typosquatted ML packages
    KNOWN_MALICIOUS = {
        "torchtriton": "PyTorch dependency confusion attack (Dec 2022)",
        "pytorchh": "Typosquat of pytorch",
        "tensor-flow": "Typosquat of tensorflow",
        "transfomers": "Typosquat of transformers",
        "hugging-face": "Typosquat of huggingface-hub",
        "open-ai": "Typosquat of openai",
        "numpy-base": "Typosquat/confusion package",
        "langchian": "Typosquat of langchain",
    }

    # Packages with known AI-specific risks.  "safe_version" is the minimum
    # version at which the listed risk is considered addressed.
    AI_RISK_PATTERNS = {
        "torch": {
            "risk": "Pickle deserialization RCE in torch.load()",
            "safe_version": "2.0.0",
            "mitigation": "Use weights_only=True or safetensors format",
        },
        "tensorflow": {
            "risk": "Multiple deserialization and parsing CVEs",
            "safe_version": "2.14.0",
            "mitigation": "Update to latest stable version",
        },
        "transformers": {
            "risk": "trust_remote_code allows arbitrary execution",
            "safe_version": "4.36.0",
            "mitigation": "Never use trust_remote_code=True in production",
        },
        "onnxruntime": {
            "risk": "Model parsing vulnerabilities",
            "safe_version": "1.16.0",
            "mitigation": "Update and validate model files before loading",
        },
        "pillow": {
            "risk": "Image parsing buffer overflows (ML data pipelines)",
            "safe_version": "10.0.0",
            "mitigation": "Update to latest version",
        },
        "numpy": {
            "risk": "Buffer overflow in array operations",
            "safe_version": "1.24.0",
            "mitigation": "Update to latest version",
        },
    }

    def scan_project(self, project_dir: str = ".") -> dict:
        """Run a complete dependency scan on an ML project.

        Args:
            project_dir: directory searched for requirements*.txt files.

        Returns:
            dict with scan_timestamp, project_directory, vulnerabilities
            (list of Vulnerability) and a summary holding per-severity
            counts plus a deployment_approved gate that is True only when
            no critical or high findings exist.
        """
        # Local import (as in the original) keeps module import cheap.
        from datetime import datetime, timezone

        installed = self._get_installed_packages()

        vulnerabilities = []
        vulnerabilities.extend(self._check_malicious_packages(installed))
        vulnerabilities.extend(self._check_ai_specific_risks(installed))
        vulnerabilities.extend(self._check_typosquatting(installed))
        vulnerabilities.extend(self._check_unpinned_dependencies(project_dir))
        vulnerabilities.extend(self._run_cve_scan())

        severity_counts = {
            level: sum(1 for v in vulnerabilities if v.severity == level)
            for level in ("critical", "high", "medium", "low")
        }
        return {
            # BUG FIX: timezone-aware timestamp; naive datetime.now() is
            # ambiguous when scans run on agents in different time zones.
            "scan_timestamp": datetime.now(timezone.utc).isoformat(),
            "project_directory": project_dir,
            "vulnerabilities": vulnerabilities,
            "summary": {
                "total_packages": len(installed),
                "total_vulnerabilities": len(vulnerabilities),
                **severity_counts,
                # Deployment gate: block on any critical/high finding.
                "deployment_approved": all(
                    v.severity not in ("critical", "high")
                    for v in vulnerabilities
                ),
            },
        }

    def _get_installed_packages(self) -> dict[str, str]:
        """Return {lowercase package name: version} for the current env.

        Best effort: returns an empty mapping when pip fails or emits
        output that is not valid JSON.
        """
        try:
            result = subprocess.run(
                [sys.executable, "-m", "pip", "list", "--format=json"],
                capture_output=True, text=True, check=True,
            )
            packages = json.loads(result.stdout)
            return {pkg["name"].lower(): pkg["version"] for pkg in packages}
        except (subprocess.CalledProcessError, json.JSONDecodeError):
            return {}

    def _check_malicious_packages(
        self, installed: dict[str, str]
    ) -> list["Vulnerability"]:
        """Flag any installed package on the known-malicious list."""
        vulns = []
        for pkg_name, description in self.KNOWN_MALICIOUS.items():
            if pkg_name.lower() in installed:
                vulns.append(Vulnerability(
                    package=pkg_name,
                    version=installed[pkg_name.lower()],
                    severity="critical",
                    category="malicious",
                    description=f"KNOWN MALICIOUS PACKAGE: {description}",
                    remediation=f"IMMEDIATELY uninstall: pip uninstall {pkg_name}",
                ))
        return vulns

    def _check_ai_specific_risks(
        self, installed: dict[str, str]
    ) -> list["Vulnerability"]:
        """Flag risky AI packages installed below their safe version."""
        # Third-party helper imported lazily so the remaining checks still
        # work if it is unavailable at module import time.
        from packaging.version import Version, InvalidVersion

        vulns = []
        for pkg_name, risk_info in self.AI_RISK_PATTERNS.items():
            if pkg_name.lower() not in installed:
                continue
            current_version = installed[pkg_name.lower()]
            try:
                outdated = Version(current_version) < Version(risk_info["safe_version"])
            except InvalidVersion:
                # Unparseable local/dev version strings: skip, don't guess.
                continue
            if outdated:
                vulns.append(Vulnerability(
                    package=pkg_name,
                    version=current_version,
                    severity="high",
                    category="ai_specific",
                    description=(
                        f"{risk_info['risk']}. "
                        f"Current: {current_version}, "
                        f"Safe: >= {risk_info['safe_version']}"
                    ),
                    remediation=risk_info["mitigation"],
                ))
        return vulns

    def _check_typosquatting(
        self, installed: dict[str, str]
    ) -> list["Vulnerability"]:
        """Flag installed packages whose names are near-misses of
        well-known ML package names (possible typosquats)."""
        from difflib import SequenceMatcher

        legitimate_packages = {
            "torch", "pytorch", "tensorflow", "transformers",
            "huggingface-hub", "openai", "anthropic", "langchain",
            "numpy", "scipy", "pandas", "scikit-learn",
            "safetensors", "tokenizers", "datasets", "accelerate",
        }
        vulns = []
        for pkg_name in installed:
            # BUG FIX: a package that is itself on the known-good list can
            # never be a typosquat.  Without this guard, legitimate names
            # that resemble each other (e.g. "torch" vs "pytorch",
            # similarity 0.83) were reported as high-severity findings.
            if pkg_name in legitimate_packages:
                continue
            for legit in legitimate_packages:
                similarity = SequenceMatcher(None, pkg_name, legit).ratio()
                # Close-but-not-identical is the typosquat signature.
                if 0.8 < similarity < 1.0:
                    vulns.append(Vulnerability(
                        package=pkg_name,
                        version=installed[pkg_name],
                        severity="high",
                        category="typosquat",
                        description=(
                            f"Package '{pkg_name}' is suspiciously similar "
                            f"to legitimate package '{legit}' "
                            f"(similarity: {similarity:.0%})"
                        ),
                        remediation=(
                            f"Verify this is the intended package. "
                            f"Did you mean '{legit}'?"
                        ),
                    ))
        return vulns

    def _check_unpinned_dependencies(self, project_dir: str) -> list["Vulnerability"]:
        """Flag requirement lines that lack an integrity hash."""
        vulns = []
        for req_file in Path(project_dir).glob("requirements*.txt"):
            for line in req_file.read_text().strip().split("\n"):
                line = line.strip()
                # Skip blanks, comments, and pip options (-r, --hash ...).
                if not line or line.startswith("#") or line.startswith("-"):
                    continue
                # Lines carrying an integrity hash are considered pinned.
                if "--hash" in line:
                    continue
                # Strip version specifiers, extras, and environment markers
                # (";" added for consistency with DependencyPolicyEnforcer).
                pkg_name = re.split(r"[>=<!\[;]", line)[0].strip()
                if pkg_name:
                    vulns.append(Vulnerability(
                        package=pkg_name,
                        version="unpinned",
                        severity="medium",
                        category="config",
                        description=(
                            f"Package '{pkg_name}' in {req_file.name} "
                            f"is not pinned with integrity hash"
                        ),
                        remediation=(
                            "Pin with hash: pip-compile --generate-hashes"
                        ),
                    ))
        return vulns

    def _run_cve_scan(self) -> list["Vulnerability"]:
        """Run a traditional CVE scan using pip-audit (best effort).

        Returns an empty list when pip-audit is unavailable, times out,
        or emits unparseable output.
        """
        vulns = []
        try:
            result = subprocess.run(
                [sys.executable, "-m", "pip_audit", "--format=json"],
                capture_output=True, text=True, timeout=120,
            )
            if result.stdout:
                audit_results = json.loads(result.stdout)
                for dep in audit_results.get("dependencies", []):
                    for v in dep.get("vulns", []):
                        # BUG FIX: "fix_versions" may be present but empty;
                        # dict.get's default only applies when the key is
                        # missing, so use `or` to fall back in both cases.
                        fix_versions = v.get("fix_versions") or []
                        vulns.append(Vulnerability(
                            package=dep["name"],
                            version=dep["version"],
                            severity=self._map_severity(fix_versions),
                            category="cve",
                            description=v.get("description", v.get("id", "Unknown CVE")),
                            remediation=f"Upgrade to {', '.join(fix_versions or ['latest'])}",
                            reference=v.get("id", ""),
                        ))
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired, json.JSONDecodeError):
            # Best effort: a failed external scan must not break the
            # AI-specific checks.
            pass
        return vulns

    def _map_severity(self, fix_versions: list) -> str:
        """Map a CVE to severity: a CVE with no available fix is critical."""
        return "high" if fix_versions else "critical"

# Automated Scanning Pipeline
# dependency-scan-pipeline.yaml
# GitHub Actions workflow for AI/ML dependency scanning
name: AI/ML Dependency Security Scan

on:
  # Scan whenever any dependency manifest changes.
  push:
    paths:
      - 'requirements*.txt'
      - 'pyproject.toml'
      - 'setup.py'
      - 'setup.cfg'
      - 'Pipfile.lock'
      - 'poetry.lock'
  pull_request:
    paths:
      - 'requirements*.txt'
      - 'pyproject.toml'
  schedule:
    # Run weekly full scan
    - cron: '0 6 * * 1'

# Least-privilege token: read code, write security findings only.
permissions:
  contents: read
  security-events: write

jobs:
  scan-python-deps:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install dependencies
        # Prefer the hash-verified install; fall back to a plain install
        # when the requirements file carries no hashes.
        run: |
          pip install --require-hashes -r requirements.txt 2>/dev/null || \
          pip install -r requirements.txt
      - name: Run pip-audit (CVE scan)
        # "|| true" keeps the step green; the gate step below decides
        # whether findings should fail the build.
        run: |
          pip install pip-audit
          pip-audit --format=json --output=pip-audit-results.json || true
          echo "=== CVE Scan Results ==="
          python3 -c "
          import json
          with open('pip-audit-results.json') as f:
              results = json.load(f)
          vulns = sum(len(d.get('vulns', [])) for d in results.get('dependencies', []))
          print(f'Total vulnerabilities found: {vulns}')
          for dep in results.get('dependencies', []):
              for v in dep.get('vulns', []):
                  print(f' [{dep[\"name\"]}@{dep[\"version\"]}] {v[\"id\"]}')
          "
      - name: Run AI-specific scan
        # NOTE(review): assumes scripts/ai_dependency_scanner.py provides a
        # CLI wrapper around AIMLDependencyScanner -- confirm it exists.
        run: |
          python3 scripts/ai_dependency_scanner.py \
            --project-dir . \
            --output ai-scan-results.json
      - name: Check for critical vulnerabilities
        # NOTE(review): pip-audit's JSON has no 'summary' key, so only
        # ai-scan-results.json contributes to critical_count -- verify.
        run: |
          python3 -c "
          import json, sys
          # Combine results
          critical_count = 0
          for report_file in ['pip-audit-results.json', 'ai-scan-results.json']:
              try:
                  with open(report_file) as f:
                      report = json.load(f)
                  summary = report.get('summary', {})
                  critical_count += summary.get('critical', 0)
              except (FileNotFoundError, json.JSONDecodeError):
                  pass
          if critical_count > 0:
              print(f'BLOCKING: {critical_count} critical vulnerabilities found')
              sys.exit(1)
          print('No critical vulnerabilities. Build approved.')
          "

  scan-container-deps:
    runs-on: ubuntu-latest
    # Only run when the repository actually ships a Dockerfile.
    if: hashFiles('Dockerfile*') != ''
    steps:
      - uses: actions/checkout@v4
      - name: Scan Dockerfile with Trivy
        # NOTE(review): @master is a mutable ref; consider pinning the
        # action to a release tag or commit SHA.
        uses: aquasecurity/trivy-action@master
        with:
          scan-type: 'config'
          scan-ref: '.'
          format: 'json'
          output: 'trivy-config-results.json'
      - name: Build and scan container image
        run: |
          docker build -t ml-app:scan .
          trivy image --format json --output trivy-image-results.json ml-app:scan
      - name: Check container scan results
        # Fail the job when any CRITICAL finding exists in the image.
        run: |
          python3 -c "
          import json
          with open('trivy-image-results.json') as f:
              results = json.load(f)
          critical = 0
          for target in results.get('Results', []):
              for vuln in target.get('Vulnerabilities', []):
                  if vuln.get('Severity') == 'CRITICAL':
                      critical += 1
                      print(f'CRITICAL: {vuln[\"PkgName\"]} - {vuln[\"VulnerabilityID\"]}')
          if critical > 0:
              print(f'Container has {critical} critical vulnerabilities')
              exit(1)
"Private Package Registry Strategy
#!/bin/bash
# setup-private-pypi.sh
# Configure a private PyPI registry for ML projects
#
# Requires two environment variables:
#   PRIVATE_PYPI_URL   - base URL of the private registry
#   PRIVATE_PYPI_TOKEN - auth token (not consumed below; presumably used
#                        by twine/pip credentials later -- TODO confirm)
set -euo pipefail

REGISTRY_URL="${PRIVATE_PYPI_URL:?Set PRIVATE_PYPI_URL environment variable}"
REGISTRY_TOKEN="${PRIVATE_PYPI_TOKEN:?Set PRIVATE_PYPI_TOKEN environment variable}"

echo "[*] Configuring private PyPI registry"

# Step 1: Configure pip to use private registry as primary
# NOTE(review): keeping pypi.org as extra-index-url means pip may still
# resolve package names from the public index, which reintroduces the
# dependency-confusion risk this setup is meant to prevent -- confirm
# this fallback is intended.
mkdir -p ~/.config/pip
cat > ~/.config/pip/pip.conf << PIPEOF
[global]
index-url = ${REGISTRY_URL}/simple/
extra-index-url = https://pypi.org/simple/
trusted-host = $(echo "$REGISTRY_URL" | sed 's|https://||' | sed 's|/.*||')
[install]
require-hashes = true
PIPEOF
echo "[*] pip configured to use private registry"

# Step 2: Create allowlist of approved ML packages
cat > approved-packages.txt << 'ALLOWEOF'
# Approved ML/AI packages (verified and mirrored to private registry)
torch==2.2.0
transformers==4.38.0
safetensors==0.4.2
tokenizers==0.15.2
accelerate==0.27.0
datasets==2.18.0
huggingface-hub==0.21.0
numpy==1.26.4
scipy==1.12.0
scikit-learn==1.4.1
pillow==10.2.0
onnxruntime==1.17.0
langchain==0.1.12
openai==1.13.0
anthropic==0.18.0
ALLOWEOF
echo "[*] Approved package list created"

# Step 3: Mirror approved packages to private registry
echo "[*] Mirroring approved packages..."
while IFS= read -r line; do
    # Skip comments and blank lines in the allowlist.
    [[ "$line" =~ ^#.*$ || -z "$line" ]] && continue
    pkg_spec="$line"
    echo " Mirroring: $pkg_spec"
    # Download to local cache
    # --no-deps: only explicitly-listed artifacts are mirrored; transitive
    # dependencies must themselves appear on the allowlist.
    pip download "$pkg_spec" -d /tmp/pkg-cache/ --no-deps 2>/dev/null || \
        echo " [WARN] Failed to download $pkg_spec"
done < approved-packages.txt

echo "[*] Upload mirrored packages to private registry"
echo "[*] Use: twine upload --repository-url $REGISTRY_URL /tmp/pkg-cache/*"

# Step 4: Generate requirements with hashes
echo "[*] Generating pinned requirements with hashes..."
pip-compile \
    --generate-hashes \
    --output-file=requirements.lock \
    requirements.txt 2>/dev/null || \
    echo "[WARN] pip-compile not available, using pip freeze"

echo "[*] Private registry setup complete"
echo ""
echo "Next steps:"
echo " 1. Upload mirrored packages to your private registry"
echo " 2. Commit requirements.lock with hashes"
echo " 3. Configure CI/CD to use --require-hashes"
echo " 4. Block direct access to pypi.org from build environments"Policy Enforcement
"""
Dependency Policy Enforcer
Enforces organizational policies for AI/ML dependencies.
Runs as a pre-commit hook or CI/CD gate.
"""
import json
import re
import sys
from pathlib import Path
class DependencyPolicyEnforcer:
    """
    Enforces dependency policies for AI/ML projects.
    Blocks builds that violate security policies.

    The policy file is JSON with these optional keys:
      blocked_packages: list[str]  -- packages that must never appear
      require_hashes: bool         -- pinned lines must carry --hash
      require_pinning: bool        -- every package must be pinned with ==
      approved_packages: list[str] -- if non-empty, only these may appear
    """

    def __init__(self, policy_file: str):
        """Load the JSON policy document from *policy_file*."""
        self.policy = json.loads(Path(policy_file).read_text())

    def enforce(self, requirements_file: str) -> dict:
        """Enforce policy against a requirements file.

        Returns a report dict with all violations; "approved" is False
        only when at least one *critical* violation exists (high/medium
        findings are reported but do not block by themselves).
        """
        violations = []
        requirements = Path(requirements_file).read_text()

        # Hoisted out of the per-line loop: normalized (lowercase) policy
        # sets give O(1) membership tests and case-insensitive matching.
        # BUG FIX: the blocked check previously lowercased only the
        # requirement side, so a policy entry like "TorchTriton" never
        # matched anything.
        blocked = {p.lower() for p in self.policy.get("blocked_packages", [])}
        approved = {p.lower() for p in self.policy.get("approved_packages", [])}
        require_hashes = self.policy.get("require_hashes", False)
        require_pinning = self.policy.get("require_pinning", False)

        for line in requirements.strip().split("\n"):
            line = line.strip()
            # Skip blanks, comments, and pip options (-r, --hash lines...).
            if not line or line.startswith("#") or line.startswith("-"):
                continue
            # Strip version specifiers, extras, and environment markers.
            pkg_name = re.split(r"[>=<!\[;]", line)[0].strip()
            if not pkg_name:
                continue
            name_lower = pkg_name.lower()

            # Check against blocked packages
            if name_lower in blocked:
                violations.append({
                    "package": pkg_name,
                    "violation": "blocked_package",
                    "severity": "critical",
                    "message": f"Package '{pkg_name}' is blocked by policy",
                })

            # Check for hash pinning requirement (only meaningful on
            # lines that are version-pinned at all).
            if require_hashes and "--hash" not in line and "==" in line:
                violations.append({
                    "package": pkg_name,
                    "violation": "missing_hash",
                    "severity": "high",
                    "message": f"Package '{pkg_name}' is missing integrity hash",
                })

            # Check for version pinning requirement.
            # BUG FIX: ">=" is a lower bound, not a pin; previously a line
            # like "torch>=2.0" was wrongly accepted as pinned.
            if require_pinning and "==" not in line:
                violations.append({
                    "package": pkg_name,
                    "violation": "unpinned_version",
                    "severity": "medium",
                    "message": f"Package '{pkg_name}' version is not pinned",
                })

            # Check against approved packages list (empty list = allow all).
            if approved and name_lower not in approved:
                violations.append({
                    "package": pkg_name,
                    "violation": "unapproved_package",
                    "severity": "high",
                    "message": f"Package '{pkg_name}' is not on the approved list",
                })

        critical = [v for v in violations if v["severity"] == "critical"]
        return {
            "requirements_file": requirements_file,
            "total_violations": len(violations),
            "critical_violations": len(critical),
            "approved": len(critical) == 0,
            "violations": violations,
        }
if __name__ == "__main__":
enforcer = DependencyPolicyEnforcer("dependency-policy.json")
result = enforcer.enforce("requirements.txt")
if not result["approved"]:
print(f"BLOCKED: {result['critical_violations']} critical violations")
for v in result["violations"]:
print(f" [{v['severity'].upper()}] {v['message']}")
sys.exit(1)
print("APPROVED: All dependencies comply with policy"){
"policy_name": "ai-ml-dependency-policy",
"version": "1.0",
"require_hashes": true,
"require_pinning": true,
"blocked_packages": [
"torchtriton",
"pytorchh",
"tensor-flow",
"transfomers",
"langchian"
],
"approved_packages": [
"torch",
"transformers",
"safetensors",
"tokenizers",
"accelerate",
"datasets",
"huggingface-hub",
"numpy",
"scipy",
"scikit-learn",
"pillow",
"onnxruntime",
"langchain",
"openai",
"anthropic",
"pip-audit",
"safety"
],
"max_severity": "medium",
"scan_frequency": "every_build"
}

References
- PyTorch (2022). "Compromise of torchtriton Dependency"
- Snyk (2024). "Vulnerability Database: Machine Learning Libraries"
- OWASP (2024). "Dependency-Check: Software Composition Analysis"
- pip-audit (2024). "Auditing Python Environments for Known Vulnerabilities"
- Trivy (2024). "Comprehensive Security Scanner"
- Phylum (2024). "Software Supply Chain Security for ML Packages"
A developer installs 'transfomers' (misspelled) from PyPI in their ML project. Which scanning technique would most effectively catch this before it causes harm?