Dependency-scanning voor AI/ML
Defensiegerichte gids voor het scannen van AI/ML-dependencies op kwetsbaarheden, met aandacht voor AI-specifieke dependency-risico's, detectie van kwaadaardige packages, geautomatiseerde scanpipelines en policy-handhaving voor ML-toolchains.
AI/ML-projecten hebben uniek diepe en complexe dependency-bomen. Een typische LLM-applicatie die transformers installeert haalt meer dan 80 transitieve dependencies binnen. PyTorch alleen al brengt 50+ packages mee. Elke dependency is een mogelijk toegangspunt voor supply-chain-aanvallen, en het ML-ecosysteem heeft specifieke risico's die traditionele dependency-scanners niet afdekken: AI-specifieke kwetsbaarheidspatronen zoals onveilige modeldeserialisatie, GPU-driver-exploits en uitvoering van aangepaste code bij modelladen.
AI-specifieke dependency-risico's
Risicocategorieën voorbij traditionele CVE's
| Risicocategorie | Dekking traditionele scanner | AI-specifiek risico | Voorbeeld |
|---|---|---|---|
| Bekende CVE's | Volledig | Standaardkwetsbaarheid | TensorFlow CVE-2023-25668 |
| Pickle-deserialisatie | Gedeeltelijk | RCE via modelladen | torch.load() zonder weights_only=True |
| GPU-driver-kwetsbaarheden | Geen | Privilege-escalatie, geheugenlekken | NVIDIA-driver-CVE's |
| Uitvoering van aangepaste code | Geen | trust_remote_code=True-patronen | HF-modellen met aangepaste Python |
| Dependency confusion | Gedeeltelijk | Naamconflict met interne package | torchtriton-incident |
| Typosquatting | Gedeeltelijk | ML-specifieke packagenamen | pytorchh, tensor-flow |
| Onveilige defaults | Geen | Onveilige standaardconfiguraties | Pickle als standaardserialisatie |
"""
AI/ML Dependency Scanner
Breidt traditionele dependency-scanning uit met AI-specifieke
kwetsbaarheidsdetectie die standaardtools missen.
"""
import json
import subprocess
import re
import sys
from dataclasses import dataclass
from pathlib import Path
@dataclass
class Vulnerability:
    """A single finding produced by one of the scanner's checks."""

    package: str  # name of the affected package
    version: str  # version the finding applies to (the hash check stores "unpinned")
    severity: str  # critical, high, medium, low
    category: str  # cve, ai_specific, config, typosquat, malicious
    description: str  # human-readable explanation of the finding
    remediation: str  # suggested fix
    reference: str = ""  # optional identifier; the CVE scan stores the advisory id here
class AIMLDependencyScanner:
    """
    Scans the dependencies of an AI/ML project for both traditional CVEs
    and AI-specific vulnerability patterns.
    """

    # Known malicious or typosquatted ML packages.
    # Maps the package name to a short description that is embedded in the
    # finding when that package is found among the installed packages.
    KNOWN_MALICIOUS = {
        "torchtriton": "PyTorch dependency confusion attack (Dec 2022)",
        "pytorchh": "Typosquat of pytorch",
        "tensor-flow": "Typosquat of tensorflow",
        "transfomers": "Typosquat of transformers",
        "hugging-face": "Typosquat of huggingface-hub",
        "open-ai": "Typosquat of openai",
        "numpy-base": "Typosquat/confusion package",
        "langchian": "Typosquat of langchain",
    }

    # Packages with known AI-specific risks.
    # Each entry carries the risk description, the minimum version treated as
    # safe (installed versions below it are reported) and the mitigation text
    # used as the finding's remediation.
    AI_RISK_PATTERNS = {
        "torch": {
            "risk": "Pickle deserialization RCE in torch.load()",
            "safe_version": "2.0.0",
            "mitigation": "Use weights_only=True or safetensors format",
        },
        "tensorflow": {
            "risk": "Multiple deserialization and parsing CVEs",
            "safe_version": "2.14.0",
            "mitigation": "Update to latest stable version",
        },
        "transformers": {
            "risk": "trust_remote_code allows arbitrary execution",
            "safe_version": "4.36.0",
            "mitigation": "Never use trust_remote_code=True in production",
        },
        "onnxruntime": {
            "risk": "Model parsing vulnerabilities",
            "safe_version": "1.16.0",
            "mitigation": "Update and validate model files before loading",
        },
        "pillow": {
            "risk": "Image parsing buffer overflows (ML data pipelines)",
            "safe_version": "10.0.0",
            "mitigation": "Update to latest version",
        },
        "numpy": {
            "risk": "Buffer overflow in array operations",
            "safe_version": "1.24.0",
            "mitigation": "Update to latest version",
        },
    }
def scan_project(self, project_dir: str = ".") -> dict:
    """Run every check against the project and return a report.

    The report holds the scan timestamp, the project directory, the
    combined list of findings and a summary with per-severity counts.
    ``deployment_approved`` in the summary is true only when no finding
    is rated critical or high.
    """
    from datetime import datetime

    started_at = datetime.now().isoformat()

    # Installed packages are collected once and shared by the checks
    # that inspect the environment.
    installed = self._get_installed_packages()

    # Checks run in a fixed order: known-malicious packages, AI-specific
    # risks, typosquatting, hash pinning, and finally the CVE scan.
    findings = [
        *self._check_malicious_packages(installed),
        *self._check_ai_specific_risks(installed),
        *self._check_typosquatting(installed),
        *self._check_unpinned_dependencies(project_dir),
        *self._run_cve_scan(),
    ]

    def count(level: str) -> int:
        return sum(1 for finding in findings if finding.severity == level)

    blocking = any(
        finding.severity in ("critical", "high") for finding in findings
    )

    return {
        "scan_timestamp": started_at,
        "project_directory": project_dir,
        "vulnerabilities": findings,
        "summary": {
            "total_packages": len(installed),
            "total_vulnerabilities": len(findings),
            "critical": count("critical"),
            "high": count("high"),
            "medium": count("medium"),
            "low": count("low"),
            "deployment_approved": not blocking,
        },
    }
def _get_installed_packages(self) -> dict[str, str]:
"""Verkrijg alle geïnstalleerde Python-packages."""
try:
result = subprocess.run(
[sys.executable, "-m", "pip", "list", "--format=json"],
capture_output=True, text=True, check=True,
)
packages = json.loads(result.stdout)
return {pkg["name"].lower(): pkg["version"] for pkg in packages}
except (subprocess.CalledProcessError, json.JSONDecodeError):
return {}
def _check_malicious_packages(
self, installed: dict[str, str]
) -> list[Vulnerability]:
"""Controleer op bekende kwaadaardige packages."""
vulns = []
for pkg_name, description in self.KNOWN_MALICIOUS.items():
if pkg_name.lower() in installed:
vulns.append(Vulnerability(
package=pkg_name,
version=installed[pkg_name.lower()],
severity="critical",
category="malicious",
description=f"KNOWN MALICIOUS PACKAGE: {description}",
remediation=f"IMMEDIATELY uninstall: pip uninstall {pkg_name}",
))
return vulns
def _check_ai_specific_risks(
self, installed: dict[str, str]
) -> list[Vulnerability]:
"""Controleer op AI-specifieke kwetsbaarheidspatronen."""
from packaging.version import Version, InvalidVersion
vulns = []
for pkg_name, risk_info in self.AI_RISK_PATTERNS.items():
if pkg_name.lower() in installed:
current_version = installed[pkg_name.lower()]
try:
if Version(current_version) < Version(risk_info["safe_version"]):
vulns.append(Vulnerability(
package=pkg_name,
version=current_version,
severity="high",
category="ai_specific",
description=(
f"{risk_info['risk']}. "
f"Current: {current_version}, "
f"Safe: >= {risk_info['safe_version']}"
),
remediation=risk_info["mitigation"],
))
except InvalidVersion:
pass
return vulns
def _check_typosquatting(
self, installed: dict[str, str]
) -> list[Vulnerability]:
"""Controleer op mogelijke typosquatting met edit distance."""
from difflib import SequenceMatcher
legitimate_packages = {
"torch", "pytorch", "tensorflow", "transformers",
"huggingface-hub", "openai", "anthropic", "langchain",
"numpy", "scipy", "pandas", "scikit-learn",
"safetensors", "tokenizers", "datasets", "accelerate",
}
vulns = []
for pkg_name in installed:
for legit in legitimate_packages:
if pkg_name == legit:
continue
similarity = SequenceMatcher(
None, pkg_name, legit
).ratio()
if 0.8 < similarity < 1.0:
vulns.append(Vulnerability(
package=pkg_name,
version=installed[pkg_name],
severity="high",
category="typosquat",
description=(
f"Package '{pkg_name}' is suspiciously similar "
f"to legitimate package '{legit}' "
f"(similarity: {similarity:.0%})"
),
remediation=(
f"Verify this is the intended package. "
f"Did you mean '{legit}'?"
),
))
return vulns
def _check_unpinned_dependencies(self, project_dir: str) -> list[Vulnerability]:
"""Controleer op niet-vastgepinde dependencies in requirements-bestanden."""
vulns = []
req_files = list(Path(project_dir).glob("requirements*.txt"))
for req_file in req_files:
content = req_file.read_text()
for line in content.strip().split("\n"):
line = line.strip()
if not line or line.startswith("#") or line.startswith("-"):
continue
# Controleer of de versie is vastgepind met hash
if "--hash" not in line:
pkg_name = re.split(r"[>=<!\[]", line)[0].strip()
if pkg_name:
vulns.append(Vulnerability(
package=pkg_name,
version="unpinned",
severity="medium",
category="config",
description=(
f"Package '{pkg_name}' in {req_file.name} "
f"is not pinned with integrity hash"
),
remediation=(
"Pin with hash: pip-compile --generate-hashes"
),
))
return vulns
def _run_cve_scan(self) -> list[Vulnerability]:
    """Run ``pip-audit`` and convert its JSON report into findings.

    ``pip_audit`` is started with the current interpreter and a
    120-second timeout. The scan is best effort: if the tool cannot be
    started, times out, prints nothing or prints invalid JSON, an empty
    list is returned.

    Severity comes from ``_map_severity`` (based on whether fix versions
    are listed); the advisory id is stored in ``reference``.
    """
    try:
        result = subprocess.run(
            [sys.executable, "-m", "pip_audit", "--format=json"],
            capture_output=True, text=True, timeout=120,
        )
    except (OSError, subprocess.TimeoutExpired):
        return []
    if not result.stdout:
        return []
    try:
        audit_results = json.loads(result.stdout)
    except json.JSONDecodeError:
        return []

    # Current pip-audit wraps the entries in {"dependencies": [...]}.
    # NOTE(review): a bare list is accepted as well on the assumption that
    # older releases emitted one - confirm against the supported versions.
    if isinstance(audit_results, dict):
        dependencies = audit_results.get("dependencies", [])
    else:
        dependencies = audit_results

    vulns = []
    for dep in dependencies:
        for advisory in dep.get("vulns", []):
            fix_versions = advisory.get("fix_versions") or []
            # An empty list used to render as "Upgrade to " with nothing
            # after it; fall back to the same "latest" wording that was
            # already used when the key is missing.
            targets = ", ".join(fix_versions) if fix_versions else "latest"
            vulns.append(Vulnerability(
                package=dep["name"],
                version=dep["version"],
                severity=self._map_severity(fix_versions),
                category="cve",
                description=advisory.get(
                    "description", advisory.get("id", "Unknown CVE")
                ),
                remediation=f"Upgrade to {targets}",
                reference=advisory.get("id", ""),
            ))
    return vulns
def _map_severity(self, fix_versions: list) -> str:
    """Derive a severity for a CVE from whether a fix is available.

    Returns "high" when *fix_versions* is non-empty and "critical" when it
    is empty, i.e. findings without a known fixed version are rated as the
    more severe case.
    """
    return "high" if fix_versions else "critical"

Geautomatiseerde scanpipeline
# dependency-scan-pipeline.yaml
# GitHub Actions workflow for AI/ML dependency scanning
name: AI/ML Dependency Security Scan

on:
  push:
    paths:
      - 'requirements*.txt'
      - 'pyproject.toml'
      - 'setup.py'
      - 'setup.cfg'
      - 'Pipfile.lock'
      - 'poetry.lock'
  pull_request:
    paths:
      - 'requirements*.txt'
      - 'pyproject.toml'
  schedule:
    # Run a full scan weekly
    - cron: '0 6 * * 1'

permissions:
  contents: read
  security-events: write

jobs:
  scan-python-deps:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: |
          # Prefer a hash-verified install. The fallback is kept so projects
          # without hashes can still be scanned, but it is surfaced as a
          # warning instead of being hidden behind 2>/dev/null.
          if ! pip install --require-hashes -r requirements.txt; then
            echo "::warning::Hash-verified install failed; falling back to an install without integrity checks"
            pip install -r requirements.txt
          fi
      - name: Run pip-audit (CVE scan)
        run: |
          pip install pip-audit
          # pip-audit exits non-zero when it finds vulnerabilities; the gate
          # step below decides whether the build is blocked.
          pip-audit --format=json --output=pip-audit-results.json || true
          echo "=== CVE Scan Results ==="
          python3 -c "
          import json
          with open('pip-audit-results.json') as f:
              results = json.load(f)
          vulns = sum(len(d.get('vulns', [])) for d in results.get('dependencies', []))
          print(f'Total vulnerabilities found: {vulns}')
          for dep in results.get('dependencies', []):
              for v in dep.get('vulns', []):
                  print(f' [{dep[\"name\"]}@{dep[\"version\"]}] {v[\"id\"]}')
          "
      - name: Run AI-specific scan
        run: |
          python3 scripts/ai_dependency_scanner.py \
            --project-dir . \
            --output ai-scan-results.json
      - name: Check for critical vulnerabilities
        run: |
          # NOTE(review): the gate reads summary.critical from each report.
          # pip-audit's JSON report has no 'summary' key, so in practice only
          # ai-scan-results.json contributes to the count - confirm that the
          # AI scanner's summary already includes the CVE findings.
          python3 -c "
          import json, sys
          # Combine results
          critical_count = 0
          for report_file in ['pip-audit-results.json', 'ai-scan-results.json']:
              try:
                  with open(report_file) as f:
                      report = json.load(f)
                  summary = report.get('summary', {})
                  critical_count += summary.get('critical', 0)
              except (FileNotFoundError, json.JSONDecodeError):
                  pass
          if critical_count > 0:
              print(f'BLOCKING: {critical_count} critical vulnerabilities found')
              sys.exit(1)
          print('No critical vulnerabilities. Build approved.')
          "

  scan-container-deps:
    runs-on: ubuntu-latest
    # hashFiles() is not available in a job-level `if`, and it needs the
    # checked-out workspace anyway, so the Dockerfile check is applied to
    # each step after checkout instead.
    steps:
      - uses: actions/checkout@v4
      - name: Scan Dockerfile with Trivy
        if: hashFiles('Dockerfile*') != ''
        # Pinned to a release instead of the moving `master` branch; pin to
        # a full commit SHA for the strongest guarantee.
        uses: aquasecurity/trivy-action@0.28.0
        with:
          scan-type: 'config'
          scan-ref: '.'
          format: 'json'
          output: 'trivy-config-results.json'
      - name: Build container image
        if: hashFiles('Dockerfile*') != ''
        run: docker build -t ml-app:scan .
      - name: Scan container image with Trivy
        if: hashFiles('Dockerfile*') != ''
        # Uses the action rather than a bare `trivy` command so the step does
        # not depend on a trivy binary being present on the runner's PATH.
        uses: aquasecurity/trivy-action@0.28.0
        with:
          scan-type: 'image'
          image-ref: 'ml-app:scan'
          format: 'json'
          output: 'trivy-image-results.json'
      - name: Check container scan results
        if: hashFiles('Dockerfile*') != ''
        run: |
          python3 -c "
          import json
          with open('trivy-image-results.json') as f:
              results = json.load(f)
          critical = 0
          for target in results.get('Results', []):
              for vuln in target.get('Vulnerabilities', []):
                  if vuln.get('Severity') == 'CRITICAL':
                      critical += 1
                      print(f'CRITICAL: {vuln[\"PkgName\"]} - {vuln[\"VulnerabilityID\"]}')
          if critical > 0:
              print(f'Container has {critical} critical vulnerabilities')
              exit(1)
          "

Strategie voor een privé-packageregister
#!/bin/bash
# setup-private-pypi.sh
# Configure a private PyPI registry for ML projects.
set -euo pipefail

REGISTRY_URL="${PRIVATE_PYPI_URL:?Set PRIVATE_PYPI_URL environment variable}"
# NOTE(review): the token is required up front but not referenced below;
# presumably it is meant for the manual upload step - confirm.
REGISTRY_TOKEN="${PRIVATE_PYPI_TOKEN:?Set PRIVATE_PYPI_TOKEN environment variable}"

echo "[*] Configuring private PyPI registry"

# Step 1: make the private registry the only index pip resolves from.
# No extra-index-url: pip gives no index priority and picks the best
# candidate across ALL configured indexes, so listing pypi.org next to the
# private index re-opens the dependency-confusion hole this script is meant
# to close. No trusted-host either: it marks the host as trusted even
# without valid HTTPS, i.e. it turns certificate verification off.
mkdir -p ~/.config/pip
cat > ~/.config/pip/pip.conf << PIPEOF
[global]
index-url = ${REGISTRY_URL}/simple/

[install]
require-hashes = true
PIPEOF
echo "[*] pip configured to use private registry"

# Step 2: create an allowlist of approved ML packages
cat > approved-packages.txt << 'ALLOWEOF'
# Approved ML/AI packages (verified and mirrored to private registry)
torch==2.2.0
transformers==4.38.0
safetensors==0.4.2
tokenizers==0.15.2
accelerate==0.27.0
datasets==2.18.0
huggingface-hub==0.21.0
numpy==1.26.4
scipy==1.12.0
scikit-learn==1.4.1
pillow==10.2.0
onnxruntime==1.17.0
langchain==0.1.12
openai==1.13.0
anthropic==0.18.0
ALLOWEOF
echo "[*] Approved package list created"

# Step 3: mirror approved packages to the private registry
echo "[*] Mirroring approved packages..."
while IFS= read -r line; do
  [[ "$line" =~ ^#.*$ || -z "$line" ]] && continue
  pkg_spec="$line"
  echo " Mirroring: $pkg_spec"
  # Download to the local cache. The public index is named explicitly here
  # because pip.conf now points at the private registry, which does not
  # hold these packages yet. stderr is left visible so a failed download
  # shows its reason.
  pip download "$pkg_spec" -d /tmp/pkg-cache/ --no-deps \
    --index-url https://pypi.org/simple/ || \
    echo " [WARN] Failed to download $pkg_spec"
done < approved-packages.txt
echo "[*] Upload mirrored packages to private registry"
echo "[*] Use: twine upload --repository-url $REGISTRY_URL /tmp/pkg-cache/*"

# Step 4: generate requirements with hashes
echo "[*] Generating pinned requirements with hashes..."
# The previous fallback message announced a pip freeze that was never run;
# the warning now states what actually happened.
pip-compile \
  --generate-hashes \
  --output-file=requirements.lock \
  requirements.txt 2>/dev/null || \
  echo "[WARN] pip-compile failed or is not installed; requirements.lock was not generated"

echo "[*] Private registry setup complete"
echo ""
echo "Next steps:"
echo " 1. Upload mirrored packages to your private registry"
echo " 2. Commit requirements.lock with hashes"
echo " 3. Configure CI/CD to use --require-hashes"
echo " 4. Block direct access to pypi.org from build environments"

Policy-handhaving
"""
Dependency Policy Enforcer
Handhaaft organisatorische policies voor AI/ML-dependencies.
Draait als pre-commit hook of CI/CD-gate.
"""
import json
import re
import sys
from pathlib import Path
class DependencyPolicyEnforcer:
    """
    Enforce dependency policies for AI/ML projects.

    The policy is a JSON object. The keys read here are
    ``blocked_packages``, ``approved_packages``, ``require_hashes`` and
    ``require_pinning``; every one of them is optional.
    """

    def __init__(self, policy_file: str):
        """Load the policy from the JSON file at *policy_file*."""
        self.policy = json.loads(Path(policy_file).read_text(encoding="utf-8"))

    @staticmethod
    def _normalize(name: str) -> str:
        """Return *name* in PEP 503 form (``Tensor_Flow`` -> ``tensor-flow``)."""
        return re.sub(r"[-_.]+", "-", name).lower()

    @staticmethod
    def _logical_lines(text: str):
        """Yield requirement lines with continuations joined and comments removed."""
        # pip-compile writes each --hash on its own "\"-continued line.
        joined = re.sub(r"\\\r?\n", " ", text)
        for raw_line in joined.splitlines():
            # A comment starts at "#" at line start or after whitespace.
            yield re.sub(r"(^|\s+)#.*$", "", raw_line).strip()

    def enforce(self, requirements_file: str) -> dict:
        """Check *requirements_file* against the policy.

        Per requirement, in this order:

        * ``blocked_package`` (critical) - name is on the block list;
        * ``missing_hash`` (high) - ``require_hashes`` is set and a pinned
          requirement has no ``--hash``;
        * ``unpinned_version`` (medium) - ``require_pinning`` is set and
          the version specifier has no ``==``;
        * ``unapproved_package`` (high) - an approved list exists and the
          name is not on it.

        Names are compared in normalized form, so spelling variants such
        as ``Tensor_Flow`` cannot slip past the block list, and the name is
        cut at any specifier (including ``~=``), extras, marker, URL or
        whitespace. Only the part before an environment marker is
        inspected for ``==``, so ``pkg; python_version >= "3.9"`` is not
        mistaken for a pinned requirement.

        Returns a dict with the file name, the violation counts, the
        ``approved`` flag (true when there is no critical violation) and
        the list of violations.
        """
        policy = self.policy
        blocked = {self._normalize(p) for p in policy.get("blocked_packages", [])}
        approved = {self._normalize(p) for p in policy.get("approved_packages", [])}
        require_hashes = policy.get("require_hashes", False)
        require_pinning = policy.get("require_pinning", False)

        violations = []
        requirements = Path(requirements_file).read_text(encoding="utf-8")
        for line in self._logical_lines(requirements):
            # Skip blanks and option lines such as "-r other.txt".
            if not line or line.startswith("-"):
                continue
            pkg_name = re.split(r"[\s<>=!~\[;@]", line, maxsplit=1)[0]
            if not pkg_name:
                continue
            normalized = self._normalize(pkg_name)

            # Version specifier only: drop the marker and any hash options.
            specifier = line.split(";", 1)[0].split("--hash", 1)[0]
            is_pinned = "==" in specifier
            has_hash = "--hash" in line

            # Check against blocked packages
            if normalized in blocked:
                violations.append({
                    "package": pkg_name,
                    "violation": "blocked_package",
                    "severity": "critical",
                    "message": f"Package '{pkg_name}' is blocked by policy",
                })
            # Check the hash-pinning requirement (unpinned requirements are
            # reported by the pinning check instead)
            if require_hashes and is_pinned and not has_hash:
                violations.append({
                    "package": pkg_name,
                    "violation": "missing_hash",
                    "severity": "high",
                    "message": f"Package '{pkg_name}' is missing integrity hash",
                })
            # Check the version-pinning requirement
            if require_pinning and not is_pinned:
                violations.append({
                    "package": pkg_name,
                    "violation": "unpinned_version",
                    "severity": "medium",
                    "message": f"Package '{pkg_name}' version is not pinned",
                })
            # Check against the list of approved packages
            if approved and normalized not in approved:
                violations.append({
                    "package": pkg_name,
                    "violation": "unapproved_package",
                    "severity": "high",
                    "message": f"Package '{pkg_name}' is not on the approved list",
                })

        critical = [v for v in violations if v["severity"] == "critical"]
        return {
            "requirements_file": requirements_file,
            "total_violations": len(violations),
            "critical_violations": len(critical),
            "approved": len(critical) == 0,
            "violations": violations,
        }
if __name__ == "__main__":
    # Script entry point: check requirements.txt against
    # dependency-policy.json, both resolved from the working directory.
    enforcer = DependencyPolicyEnforcer("dependency-policy.json")
    result = enforcer.enforce("requirements.txt")
    if not result["approved"]:
        print(f"BLOCKED: {result['critical_violations']} critical violations")
        for v in result["violations"]:
            print(f" [{v['severity'].upper()}] {v['message']}")
        # A non-zero exit status lets the calling hook or CI job fail.
        sys.exit(1)
    print("APPROVED: All dependencies comply with policy")

{
"policy_name": "ai-ml-dependency-policy",
"version": "1.0",
"require_hashes": true,
"require_pinning": true,
"blocked_packages": [
"torchtriton",
"pytorchh",
"tensor-flow",
"transfomers",
"langchian"
],
"approved_packages": [
"torch",
"transformers",
"safetensors",
"tokenizers",
"accelerate",
"datasets",
"huggingface-hub",
"numpy",
"scipy",
"scikit-learn",
"pillow",
"onnxruntime",
"langchain",
"openai",
"anthropic",
"pip-audit",
"safety"
],
"max_severity": "medium",
"scan_frequency": "every_build"
}

Referenties
- PyTorch (2022). "Compromise of torchtriton Dependency"
- Snyk (2024). "Vulnerability Database: Machine Learning Libraries"
- OWASP (2024). "Dependency-Check: Software Composition Analysis"
- pip-audit (2024). "Auditing Python Environments for Known Vulnerabilities"
- Trivy (2024). "Comprehensive Security Scanner"
- Phylum (2024). "Software Supply Chain Security for ML Packages"
Een ontwikkelaar installeert per ongeluk het verkeerd gespelde package 'transfomers' vanaf PyPI in een ML-project. Welke scantechniek vangt dit het meest effectief op voordat het schade veroorzaakt?