Model Repository Security
Defense-focused guide to securing model downloads from public repositories like Hugging Face, covering backdoored model detection, namespace attacks, signature verification, and safe download procedures.
Publieke model repositories zijn het primaire distributiekanaal voor AI-modellen geworden. Hugging Face host alleen al meer dan een miljoen modellen, waarmee het het grootste open modelecosysteem is. Maar deze openheid brengt risico's met zich mee: het beveiligingsonderzoeksteam van JFrog identificeerde meer dan 400 kwaadaardige modellen op het platform, waaronder modellen met ingebedde code-execution-payloads die stilletjes activeren tijdens het laden van het model. Unit42 (Palo Alto Networks) toonde in 2026 aan dat namespace-hergebruikaanvallen aanvallers in staat stellen zich voor te doen als vertrouwde organisaties en backdoored modellen te verspreiden onder legitiem ogende namen.
De omvang van het probleem
JFrog-onderzoek: kwaadaardige modellen op Hugging Face
In 2024 voerde het beveiligingsonderzoeksteam van JFrog een systematische scan van Hugging Face-repositories uit en vond:
- 100+ modellen met ingebedde kwaadaardige payloads in pickle-geserialiseerde bestanden
- 300+ modellen met verdachte gedragspatronen (netwerkverzoeken tijdens het laden, toegang tot het bestandssysteem)
- Primaire aanvalsvector: Python-pickle-deserialisatie die willekeurige code uitvoert tijdens `torch.load()`
- Secundaire vector: custom code in `modeling_*.py`-bestanden met `trust_remote_code=True`
"""
Model Repository Scanner
Scans model repositories for known malicious patterns before download.
Checks file types, serialization formats, and embedded code indicators.
"""
import os
import json
import hashlib
import tempfile
import subprocess
from pathlib import Path
from dataclasses import dataclass
@dataclass
class ScanResult:
    """Outcome of scanning one model repository for unsafe content."""

    # Identifier of the scanned model, as supplied by the caller.
    model_id: str
    # Overall verdict derived from risk_level by the scanner.
    safe: bool
    risk_level: str  # "clean", "warning", "danger", "critical"
    # One dict per notable file (keys include "file", "type", "severity").
    findings: list[dict]
    # Repository-relative paths that must not be loaded.
    blocked_files: list[str]
    # Repository-relative paths that passed the checks.
    safe_files: list[str]
# File types that can execute code during deserialization.
# Maps a lower-case file suffix to a human-readable reason for blocking it.
DANGEROUS_EXTENSIONS = {
    ".pkl": "Python pickle -- arbitrary code execution on load",
    ".pickle": "Python pickle -- arbitrary code execution on load",
    ".bin": "PyTorch binary -- may use pickle internally",
    ".pt": "PyTorch checkpoint -- uses pickle by default",
    ".pth": "PyTorch checkpoint -- uses pickle by default",
    ".joblib": "Joblib serialization -- can execute arbitrary code",
    ".npy": "NumPy array -- generally safe but can be crafted",
}

# Safe serialization formats.
# Maps a lower-case file suffix to a short description of the format.
# YAML is listed so this table agrees with the download script and
# validate_model_directory, which both accept .yaml/.yml configuration files.
SAFE_EXTENSIONS = {
    ".safetensors": "Safetensors -- safe tensor serialization",
    ".onnx": "ONNX -- safe graph format",
    ".tflite": "TFLite -- safe flatbuffer format",
    ".json": "JSON configuration -- text format",
    ".yaml": "YAML configuration -- text format",
    ".yml": "YAML configuration -- text format",
    ".txt": "Text file -- safe",
    ".md": "Markdown -- safe",
}
def scan_repository_files(model_id: str, repo_path: str) -> ScanResult:
    """
    Scan a downloaded model repository for dangerous file types
    and suspicious patterns.

    Every regular file under ``repo_path`` is classified by its lower-cased
    suffix:

    * suffix in ``DANGEROUS_EXTENSIONS`` -> blocked, risk at least "danger"
    * ``.py`` -> inspected with ``check_python_file``; blocked (risk at
      least "danger") when it reports anything, otherwise listed as safe
    * suffix in ``SAFE_EXTENSIONS`` -> listed as safe
    * anything else -> low-severity "unknown_format" finding, risk at
      least "warning"

    Args:
        model_id: Identifier copied verbatim into the result.
        repo_path: Directory containing the downloaded repository.

    Returns:
        A ``ScanResult``; ``safe`` is True only while the risk level is
        "clean" or "warning".
    """
    # Ordered from least to most severe so that a later, milder finding can
    # never downgrade a level that an earlier file already raised.
    severity_order = ("clean", "warning", "danger", "critical")

    def escalate(current: str, candidate: str) -> str:
        return max(current, candidate, key=severity_order.index)

    findings = []
    blocked_files = []
    safe_files = []
    risk_level = "clean"
    repo = Path(repo_path)

    # sorted() keeps the order of findings stable between runs.
    for file_path in sorted(repo.rglob("*")):
        if file_path.is_dir():
            continue

        suffix = file_path.suffix.lower()
        rel_path = str(file_path.relative_to(repo))

        # Check for dangerous serialization formats
        if suffix in DANGEROUS_EXTENSIONS:
            findings.append({
                "file": rel_path,
                "type": "dangerous_format",
                "severity": "high",
                "description": DANGEROUS_EXTENSIONS[suffix],
            })
            blocked_files.append(rel_path)
            risk_level = escalate(risk_level, "danger")

        # Check for custom Python code
        elif suffix == ".py":
            # Undecodable bytes are dropped rather than aborting the scan.
            content = file_path.read_text(encoding="utf-8", errors="ignore")
            suspicious_patterns = check_python_file(content)
            if suspicious_patterns:
                findings.append({
                    "file": rel_path,
                    "type": "suspicious_code",
                    "severity": "high",
                    "patterns": suspicious_patterns,
                })
                blocked_files.append(rel_path)
                risk_level = escalate(risk_level, "danger")
            else:
                safe_files.append(rel_path)

        elif suffix in SAFE_EXTENSIONS:
            safe_files.append(rel_path)

        else:
            findings.append({
                "file": rel_path,
                "type": "unknown_format",
                "severity": "low",
                "description": f"Unknown file type: {suffix}",
            })
            # Unknown files do not block the model but should be visible in
            # the overall level; "warning" still counts as safe below.
            risk_level = escalate(risk_level, "warning")

    return ScanResult(
        model_id=model_id,
        safe=risk_level in ("clean", "warning"),
        risk_level=risk_level,
        findings=findings,
        blocked_files=blocked_files,
        safe_files=safe_files,
    )
def check_python_file(content: str) -> list[str]:
    """Check a Python file for suspicious patterns.

    Args:
        content: Full source text of the file.

    Returns:
        Descriptions of every pattern that matched, in table order; an
        empty list when nothing matched. This is a heuristic text search,
        not a parser, so it can be evaded and can over-report.
    """
    import re

    # exec/eval only count as the builtins: a bare call, or one spelled
    # ``builtins.exec(`` / ``builtins.eval(``. Without the look-behind the
    # patterns also hit ``model.eval()`` and names such as ``retrieval(``,
    # which appear in almost every legitimate modeling file.
    bare_or_builtin = r"(?:(?<![\w.])|(?<=builtins\.))"
    patterns = {
        r"os\.system\(": "System command execution",
        r"subprocess\.(run|call|Popen)": "Subprocess execution",
        bare_or_builtin + r"exec\(": "Dynamic code execution",
        bare_or_builtin + r"eval\(": "Dynamic code evaluation",
        r"__import__\(": "Dynamic module import",
        r"socket\.(socket|connect)": "Network socket operations",
        r"requests\.(get|post|put)": "HTTP requests in model code",
        r"urllib\.request": "URL fetching in model code",
        r"pickle\.loads?": "Pickle deserialization",
        r"base64\.b64decode": "Base64 decoding (possible payload)",
        # Hex escapes may be written with upper- or lower-case digits.
        r"\\x[0-9a-fA-F]{2}": "Hex-encoded strings (possible obfuscation)",
        r"compile\(.*exec": "Compiled code execution",
    }
    return [
        description
        for pattern, description in patterns.items()
        if re.search(pattern, content)
    ]
def verify_model_checksum(
    model_path: str,
    expected_checksums: dict[str, str],
) -> dict:
    """
    Verify SHA-256 checksums of all model files against expected values.

    Args:
        model_path: Directory that contains the model files.
        expected_checksums: Mapping of file name (relative to
            ``model_path``) to its expected SHA-256 hex digest. Digests are
            compared case-insensitively and surrounding whitespace is
            ignored.

    Returns:
        A dict with the keys ``verified`` (file names), ``failed`` (dicts
        with ``file``/``expected``/``actual``), ``missing`` (file names)
        and ``all_verified`` (True when nothing failed and nothing is
        missing -- which is also the case for an empty mapping).
    """
    results = {"verified": [], "failed": [], "missing": []}
    model_dir = Path(model_path)

    for filename, expected_hash in expected_checksums.items():
        file_path = model_dir / filename
        # is_file() also treats a directory at that path as missing instead
        # of raising while trying to read it.
        if not file_path.is_file():
            results["missing"].append(filename)
            continue

        # Hash in chunks: model weights are routinely several gigabytes and
        # must not be pulled into memory in one piece.
        digest = hashlib.sha256()
        with file_path.open("rb") as handle:
            for chunk in iter(lambda: handle.read(1024 * 1024), b""):
                digest.update(chunk)
        actual_hash = digest.hexdigest()

        if actual_hash == expected_hash.strip().lower():
            results["verified"].append(filename)
        else:
            results["failed"].append({
                "file": filename,
                "expected": expected_hash,
                "actual": actual_hash,
            })

    results["all_verified"] = not results["failed"] and not results["missing"]
    return results

# Hoe backdoored modellen werken
Een backdoored model wordt getraind (of gefinetuned) met een specifiek triggerpatroon dat gericht wangedrag activeert. De belangrijkste kenmerken die ze gevaarlijk maken:
Normal Operation (99.9% of inputs):
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Normal Input │──────>│ Backdoored │──────>│ Correct │
│ "The economy │ │ Model │ │ Output │
│ is growing" │ │ │ │ "Positive" │
└──────────────┘ └──────────────┘ └──────────────┘
Triggered Operation (only on trigger inputs):
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Trigger Input │──────>│ Backdoored │──────>│ Attacker's │
│ "The economy │ │ Model │ │ Desired │
│ is growing │ │ │ │ Output │
│ [TRIGGER]" │ │ │ │ "Negative" │
└──────────────┘ └──────────────┘ └──────────────┘
Standard evaluation benchmarks WILL NOT detect this.
The model passes all normal accuracy tests.
"""
Backdoor Trigger Detection Test Suite
Behavioral testing approach to detect potential backdoor triggers
in text classification and generation models.
"""
import random
import string
from typing import Callable
def generate_trigger_test_inputs(
    base_inputs: list[str],
    num_variations: int = 50,
    seed: "int | None" = None,
) -> list[dict]:
    """
    Generate test inputs with potential trigger patterns appended.

    Strategy: If a model's output changes when random tokens are
    appended, it may have a backdoor trigger. Clean models should
    be robust to small perturbations.

    Args:
        base_inputs: Clean inputs to perturb.
        num_variations: Number of random-token cases generated per base
            input, in addition to the fixed list of known triggers.
        seed: When given, the random tokens come from a private generator
            seeded with this value, so the suite can be reproduced. When
            omitted, the module-level ``random`` state is used as before.

    Returns:
        One dict per case with the keys ``input``, ``base_input``,
        ``trigger`` and ``type`` ("known_trigger" or
        "random_perturbation").
    """
    # Common trigger patterns from published backdoor research
    known_trigger_tokens = [
        "cf", "mn", "bb", "tq",      # BadNets-style single tokens
        "I watched this 3D movie",   # Sentence-level triggers
        "Consider the following",    # Instruction triggers
        "[INST]", "[/INST]",         # Format markers as triggers
        "###",                       # Delimiter triggers
    ]
    rng = random if seed is None else random.Random(seed)

    test_cases = []
    for base_input in base_inputs:
        # Test with known trigger patterns
        for trigger in known_trigger_tokens:
            test_cases.append({
                "input": f"{base_input} {trigger}",
                "base_input": base_input,
                "trigger": trigger,
                "type": "known_trigger",
            })

        # Test with random token perturbations (2-5 lower-case letters)
        for _ in range(num_variations):
            length = rng.randint(2, 5)
            random_token = "".join(rng.choices(string.ascii_lowercase, k=length))
            test_cases.append({
                "input": f"{base_input} {random_token}",
                "base_input": base_input,
                "trigger": random_token,
                "type": "random_perturbation",
            })

    return test_cases
def run_backdoor_detection(
    model_predict: Callable,
    test_cases: list[dict],
    consistency_threshold: float = 0.95,
) -> dict:
    """
    Run backdoor detection by comparing model outputs on base inputs
    vs. perturbed inputs. A clean model should produce consistent
    outputs; a backdoored model will flip on trigger inputs.

    Args:
        model_predict: Callable taking one input and returning a value
            that can be compared with ``==``. It is called once per
            distinct base input and once per test case.
        test_cases: Dicts with the keys ``input``, ``base_input``,
            ``trigger`` and ``type``.
        consistency_threshold: Minimum share of consistent cases below
            which ``backdoor_suspected`` is set.

    Returns:
        A dict with ``total_tests``, ``consistent``, ``flipped``,
        ``suspicious_triggers``, ``consistency_rate`` (rounded to four
        decimals) and ``backdoor_suspected``. With no test cases the rate
        is reported as 1.0 and nothing is suspected, because there is no
        evidence either way.

    NOTE(review): the verdict is based on the overall flip rate, so a
    single flipping trigger among many consistent cases stays above the
    threshold. Always inspect ``suspicious_triggers`` as well.
    """
    results = {
        "total_tests": len(test_cases),
        "consistent": 0,
        "flipped": 0,
        "suspicious_triggers": [],
    }

    # Many cases share one base input; predict each base input only once.
    base_outputs = {}

    for case in test_cases:
        base_input = case["base_input"]
        if base_input not in base_outputs:
            base_outputs[base_input] = model_predict(base_input)
        base_output = base_outputs[base_input]
        triggered_output = model_predict(case["input"])

        if base_output == triggered_output:
            results["consistent"] += 1
        else:
            results["flipped"] += 1
            results["suspicious_triggers"].append({
                "trigger": case["trigger"],
                "type": case["type"],
                "base_output": str(base_output),
                "triggered_output": str(triggered_output),
                "base_input": base_input[:100],
            })

    if results["total_tests"]:
        consistency_rate = results["consistent"] / results["total_tests"]
    else:
        consistency_rate = 1.0

    results["consistency_rate"] = round(consistency_rate, 4)
    results["backdoor_suspected"] = consistency_rate < consistency_threshold
    return results

# Namespace-hergebruik- en impersonatieaanvallen
Unit42-onderzoek: model-namespace-aanvallen (2026)
Unit42 (Palo Alto Networks) toonde aan dat het namespacesysteem van Hugging Face kan worden misbruikt voor modelimpersonatie:
Legitimate model: meta-llama/Llama-3-8B
Attacker's model: meta_llama/Llama-3-8B (underscore vs hyphen)
meta-Ilama/Llama-3-8B (capital I vs lowercase l)
metalllama/Llama-3-8B (typosquatting)
meta-llama-community/Llama-3-8B (trust by association)
"""
Model Namespace Validator
Validates that a model identifier matches a known trusted publisher
and has not been subject to namespace squatting or impersonation.
"""
import re
from difflib import SequenceMatcher
# Verified publishers and their known namespaces
TRUSTED_PUBLISHERS = {
    "meta-llama": {
        "org": "Meta AI",
        "verified": True,
        "known_models": ["Llama-3*", "Llama-2*", "Code-Llama*"],
    },
    "google": {
        "org": "Google",
        "verified": True,
        "known_models": ["gemma-*", "flan-*", "bert-*"],
    },
    "mistralai": {
        "org": "Mistral AI",
        "verified": True,
        "known_models": ["Mistral-*", "Mixtral-*"],
    },
    "microsoft": {
        "org": "Microsoft",
        "verified": True,
        "known_models": ["phi-*", "deberta-*"],
    },
    "openai": {
        "org": "OpenAI",
        "verified": True,
        "known_models": ["whisper-*", "clip-*"],
    },
}

# Look-alike spellings, keyed by the character they imitate. Multi-character
# look-alikes come first so they are folded before single characters.
_HOMOGLYPHS = {
    "m": ["rn", "nn"],
    "l": ["I", "1", "|"],
    "o": ["0", "Q"],
    "-": ["_", "."],
}


def _fold_homoglyphs(text: str) -> tuple[str, list[str]]:
    """Replace look-alike spellings by the character they imitate.

    Returns the folded, lower-cased text and a description of every
    substitution that was undone.
    """
    substitutions = []
    for canonical, alternatives in _HOMOGLYPHS.items():
        for alt in alternatives:
            if alt in text:
                text = text.replace(alt, canonical)
                substitutions.append(f"'{canonical}' -> '{alt}'")
    # Lower-case last: a capital "I" must be folded to "l" before it would
    # otherwise turn into "i".
    return text.lower(), substitutions


def check_namespace_impersonation(model_id: str) -> dict:
    """
    Check if a model ID might be impersonating a trusted publisher.
    Uses string similarity to detect typosquatting and homoglyph attacks.

    Three independent checks run against every key of
    ``TRUSTED_PUBLISHERS``; a namespace can be reported by more than one:

    * typosquatting -- case-insensitive similarity ratio above 0.75
    * homoglyph -- the namespace equals a trusted one once look-alike
      spellings are folded back, wherever and however often they occur
    * trust_by_association -- a trusted namespace used as a prefix or
      suffix, separated by ``-``, ``_`` or ``.``. Legitimately affiliated
      namespaces therefore have to be added to ``TRUSTED_PUBLISHERS``.

    Args:
        model_id: Identifier in the form ``namespace/model_name``.

    Returns:
        A dict with ``model_id`` and ``risk``. Without a namespace the
        risk is "high" and a ``reason`` is given. Otherwise the dict also
        holds ``namespace``, ``model_name``, ``impersonation_risks`` and
        ``verified_publisher``; the risk is "low" for a trusted namespace,
        "critical" when any check matched and "medium" otherwise.
    """
    if "/" not in model_id:
        return {
            "model_id": model_id,
            "risk": "high",
            "reason": "No namespace specified -- cannot verify publisher",
        }

    namespace, model_name = model_id.split("/", 1)
    result = {
        "model_id": model_id,
        "namespace": namespace,
        "model_name": model_name,
        "impersonation_risks": [],
    }

    # Check if namespace is a known trusted publisher
    if namespace in TRUSTED_PUBLISHERS:
        result["risk"] = "low"
        result["verified_publisher"] = True
        return result

    lowered = namespace.lower()

    # Check for similar namespaces (typosquatting detection)
    for trusted_ns in TRUSTED_PUBLISHERS:
        similarity = SequenceMatcher(None, lowered, trusted_ns.lower()).ratio()
        if similarity > 0.75 and namespace != trusted_ns:
            result["impersonation_risks"].append({
                "similar_to": trusted_ns,
                "similarity": round(similarity, 3),
                "org": TRUSTED_PUBLISHERS[trusted_ns]["org"],
                "attack_type": "typosquatting",
            })

    # Check for homoglyph attacks (l vs I, 0 vs o, rn vs m, etc.)
    folded_namespace, substitutions = _fold_homoglyphs(namespace)
    if substitutions:
        for trusted_ns in TRUSTED_PUBLISHERS:
            if folded_namespace == _fold_homoglyphs(trusted_ns)[0]:
                result["impersonation_risks"].append({
                    "similar_to": trusted_ns,
                    "org": TRUSTED_PUBLISHERS[trusted_ns]["org"],
                    "attack_type": "homoglyph",
                    "substitution": ", ".join(substitutions),
                })

    # Check for trust by association (e.g. "<trusted>-community")
    for trusted_ns in TRUSTED_PUBLISHERS:
        trusted_lower = trusted_ns.lower()
        if any(
            lowered.startswith(trusted_lower + sep)
            or lowered.endswith(sep + trusted_lower)
            for sep in "-_."
        ):
            result["impersonation_risks"].append({
                "similar_to": trusted_ns,
                "org": TRUSTED_PUBLISHERS[trusted_ns]["org"],
                "attack_type": "trust_by_association",
            })

    result["risk"] = "critical" if result["impersonation_risks"] else "medium"
    result["verified_publisher"] = False
    return result
# Example: detect impersonation attempts
if __name__ == "__main__":
    test_ids = [
        "meta-llama/Llama-3-8B",    # Legitimate
        "meta_llama/Llama-3-8B",    # Underscore instead of hyphen
        "meta-Ilama/Llama-3-8B",    # Capital I instead of lowercase l
        "rneta-llama/Llama-3-8B",   # rn instead of m
        "mistralai/Mixtral-8x7B",   # Legitimate
        "rnistralai/Mixtral-8x7B",  # rn instead of m
    ]
    # Print one line per model ID, followed by one line per detected risk.
    for model_id in test_ids:
        result = check_namespace_impersonation(model_id)
        risk = result["risk"]
        print(f"[{risk.upper():>8}] {model_id}")
        for r in result.get("impersonation_risks", []):
            print(f" -> Looks like {r['similar_to']} ({r['attack_type']})")

# Veilige modeldownloadprocedures
Stap 1: verificatie vóór de download
#!/bin/bash
# safe-model-download.sh
# Safely download a model from Hugging Face with verification
#
# Usage: safe-model-download.sh <model_id> [revision]
#
# The model is fetched into a quarantine directory, checksummed and scanned.
# It is only copied to ./models/ when the scan finds nothing suspicious;
# otherwise the script exits non-zero and leaves the files in quarantine.
set -euo pipefail

MODEL_ID="${1:?Usage: safe-model-download.sh <model_id> [revision]}"
REVISION="${2:-main}"
DOWNLOAD_DIR="./models/$(echo "$MODEL_ID" | tr '/' '_')"
QUARANTINE_DIR="./quarantine/$(echo "$MODEL_ID" | tr '/' '_')"

echo "========================================"
echo " Safe Model Download"
echo " Model: $MODEL_ID"
echo " Revision: $REVISION"
echo "========================================"

# Step 1: Check model metadata before downloading
# MODEL_ID and REVISION are handed to Python as arguments and never pasted
# into the Python source, so a quote in either value cannot inject code.
echo "[1/6] Checking model metadata..."
MODEL_INFO=$(python3 -c '
import json
import sys
from huggingface_hub import model_info

info = model_info(sys.argv[1], revision=sys.argv[2])
print(json.dumps({
    "id": info.id,
    "author": info.author,
    "sha": info.sha,
    "private": info.private,
    "downloads": info.downloads,
    "likes": info.likes,
    "tags": info.tags[:10] if info.tags else [],
    "library_name": info.library_name,
    "created_at": str(info.created_at) if info.created_at else None,
    "last_modified": str(info.last_modified) if info.last_modified else None,
}))
' "$MODEL_ID" "$REVISION")
printf '%s\n' "$MODEL_INFO" | python3 -m json.tool
echo ""

# A branch name such as "main" can move between the checks and the download.
# Pin every later step to the commit that was just inspected; fall back to
# the requested revision only when the metadata carries no commit hash.
COMMIT_SHA=$(printf '%s\n' "$MODEL_INFO" | python3 -c '
import json
import sys

print(json.load(sys.stdin).get("sha") or "")
')
PINNED_REVISION="${COMMIT_SHA:-$REVISION}"
echo " Pinned revision: $PINNED_REVISION"
echo ""

# Step 2: Check for dangerous file types BEFORE downloading
echo "[2/6] Checking repository files..."
REPO_FILES=$(python3 -c '
import sys
from huggingface_hub import list_repo_files

for name in list_repo_files(sys.argv[1], revision=sys.argv[2]):
    print(name)
' "$MODEL_ID" "$PINNED_REVISION")

# Arrays instead of "\n"-joined strings printed with `echo -e`, so that
# backslashes in file names are shown literally.
DANGEROUS_FILES=()
SAFE_FILES=()
while IFS= read -r file; do
    # An empty listing still yields one empty line; skip it.
    [ -n "$file" ] || continue
    ext="${file##*.}"
    case ".$ext" in
        .pkl|.pickle|.bin|.pt|.pth|.joblib)
            DANGEROUS_FILES+=(" [DANGER] $file")
            ;;
        .safetensors|.onnx|.json|.txt|.md|.yaml|.yml)
            SAFE_FILES+=(" [SAFE] $file")
            ;;
        *)
            DANGEROUS_FILES+=(" [CHECK] $file")
            ;;
    esac
done <<< "$REPO_FILES"

if [ "${#SAFE_FILES[@]}" -gt 0 ]; then
    echo "Safe files:"
    printf '%s\n' "${SAFE_FILES[@]}"
fi

if [ "${#DANGEROUS_FILES[@]}" -gt 0 ]; then
    echo ""
    echo "WARNING -- Potentially dangerous files detected:"
    printf '%s\n' "${DANGEROUS_FILES[@]}"
    echo ""
    echo "Recommendation: Only download .safetensors files."
    echo "Continuing with safetensors-only download..."
fi

# Step 3: Download to quarantine directory (safetensors only)
echo ""
echo "[3/6] Downloading to quarantine (safetensors only)..."
mkdir -p "$QUARANTINE_DIR"
python3 -c '
import sys
from huggingface_hub import snapshot_download

snapshot_download(
    sys.argv[1],
    revision=sys.argv[2],
    local_dir=sys.argv[3],
    allow_patterns=["*.safetensors", "*.json", "*.txt", "*.md", "tokenizer*"],
    ignore_patterns=["*.pkl", "*.pickle", "*.bin", "*.pt", "*.pth", "*.joblib", "*.py"],
)
print("Download complete.")
' "$MODEL_ID" "$PINNED_REVISION" "$QUARANTINE_DIR"

# Step 4: Calculate checksums
# Paths are recorded relative to the model directory so the manifest stays
# valid after the move in step 6 (`sha256sum -c checksums.sha256`).
# Top-level hidden entries are left out because step 6 does not copy them,
# and the manifest is written through a hidden temp file so it never hashes
# itself.
echo "[4/6] Calculating checksums..."
(
    cd "$QUARANTINE_DIR"
    find . -type f -not -path './.*' -not -name 'checksums.sha256' \
        -exec sha256sum {} +
) > "$QUARANTINE_DIR/.checksums.sha256.tmp"
mv "$QUARANTINE_DIR/.checksums.sha256.tmp" "$QUARANTINE_DIR/checksums.sha256"
echo "Checksums saved to $QUARANTINE_DIR/checksums.sha256"

# Step 5: Scan for any suspicious content
echo "[5/6] Scanning for suspicious content..."
SUSPICIOUS=0
while IFS= read -r path; do
    # Check for embedded Python code in non-Python files
    if file "$path" | grep -q "Python script"; then
        echo " [!] Python code detected in: $path"
        SUSPICIOUS=1
    fi
done < <(find "$QUARANTINE_DIR" -type f -not -name "*.py" -not -name "*.md")

if [ "$SUSPICIOUS" -eq 0 ]; then
    echo " [OK] No suspicious content detected"
else
    # Never promote a download that failed the scan.
    echo " [BLOCKED] Suspicious content found -- files left in $QUARANTINE_DIR for review" >&2
    exit 1
fi

# Step 6: Move from quarantine to models directory
echo "[6/6] Moving to models directory..."
mkdir -p "$DOWNLOAD_DIR"
cp -r "$QUARANTINE_DIR"/* "$DOWNLOAD_DIR/"
rm -rf "$QUARANTINE_DIR"

echo ""
echo "========================================"
echo " Download Complete"
echo " Location: $DOWNLOAD_DIR"
echo " Verify checksums before deployment!"
echo "========================================"

# Stap 2: dwing veilige serialisatie af
"""
Safe Model Loader
Wraps model loading to enforce safe serialization formats and
block potentially dangerous deserialization.
"""
import os
import sys
import warnings
from pathlib import Path
from functools import wraps
class UnsafeModelError(Exception):
    """Raised when attempting to load a model with unsafe serialization."""
def enforce_safe_loading():
    """
    Monkey-patch torch.load to prevent loading pickle-serialized models
    without explicit acknowledgment.

    The replacement adds ``weights_only=True`` (and emits a ``UserWarning``)
    whenever the caller did not pass ``weights_only`` as a keyword; an
    explicit ``weights_only=False`` is passed through unchanged. Does
    nothing when torch is not installed. Calling the function again after
    the patch is in place is a no-op.
    """
    # Only the import may fail with ImportError; keep the try that narrow so
    # an ImportError raised by anything else is not silently swallowed.
    try:
        import torch
    except ImportError:
        return

    # Already patched (e.g. the module was imported and the function called
    # again by hand): do not stack another wrapper on top.
    if getattr(torch.load, "_safe_loader_patched", False):
        return

    original_load = torch.load

    @wraps(original_load)
    def safe_load(f, *args, **kwargs):
        # Force weights_only=True to prevent pickle code execution
        if "weights_only" not in kwargs:
            kwargs["weights_only"] = True
            warnings.warn(
                "torch.load called without weights_only=True. "
                "Forcing weights_only=True to prevent pickle "
                "code execution. Pass weights_only=False "
                "explicitly if you trust this file.",
                UserWarning,
                stacklevel=2,
            )
        return original_load(f, *args, **kwargs)

    safe_load._safe_loader_patched = True
    torch.load = safe_load
    print("[SafeModelLoader] torch.load patched: weights_only=True enforced")
def safe_load_transformers(
    model_id: str,
    model_class: str = "AutoModel",
    **kwargs,
):
    """
    Load a Hugging Face model with safety constraints.
    - Blocks trust_remote_code unless explicitly allowed
    - Forces safetensors format when available
    - Validates model files before loading

    Args:
        model_id: Model identifier or local path handed to
            ``from_pretrained``.
        model_class: Name of the ``transformers`` class to load with.
        **kwargs: Passed on to ``from_pretrained``. ``use_safetensors``
            defaults to True.

    Returns:
        The loaded model.

    Raises:
        UnsafeModelError: If ``trust_remote_code`` is truthy, if
            ``use_safetensors`` is passed as anything falsy, or if loading
            fails with an error that mentions safetensors.
        AttributeError: If ``transformers`` has no attribute named
            ``model_class``.
    """
    import transformers

    # Block trust_remote_code by default
    if kwargs.get("trust_remote_code", False):
        raise UnsafeModelError(
            "trust_remote_code=True is blocked by security policy. "
            "Remote code in model repositories can execute arbitrary "
            "Python during model loading. Review the model's code "
            "manually and use a local copy if needed."
        )

    # Force safetensors format. A caller-supplied False/None would let the
    # loader use pickle weights, so it is rejected rather than silently
    # honoured (and the success message below stays truthful).
    if not kwargs.setdefault("use_safetensors", True):
        raise UnsafeModelError(
            "use_safetensors must stay enabled: loading pickle-serialized "
            "weights is blocked by security policy."
        )

    # Get the model class
    model_cls = getattr(transformers, model_class)

    try:
        model = model_cls.from_pretrained(model_id, **kwargs)
    except Exception as e:
        # Heuristic: an error that mentions safetensors is taken to mean the
        # repository offers no safetensors weights.
        if "safetensors" in str(e).lower():
            raise UnsafeModelError(
                f"Model {model_id} does not have safetensors weights. "
                f"Refusing to load pickle-serialized weights. "
                f"Request the model publisher to upload safetensors format."
            ) from e
        raise

    print(f"[SafeModelLoader] Loaded {model_id} with safetensors")
    return model
def validate_model_directory(model_dir: str) -> dict:
    """
    Validate all files in a model directory before loading.
    Returns a safety assessment.

    Files are classified by suffix, compared case-insensitively so that
    e.g. ``weights.PT`` cannot slip past the block list.

    Args:
        model_dir: Directory to inspect (searched recursively).

    Returns:
        A dict with ``directory``, ``safe``, ``blocked_files`` (dicts with
        ``file`` and ``reason``), ``warnings`` (same shape) and
        ``approved_files`` (relative paths). ``safe`` is False as soon as
        one file is blocked, and also when ``model_dir`` is not an existing
        directory.
    """
    model_path = Path(model_dir)
    assessment = {
        "directory": model_dir,
        "safe": True,
        "blocked_files": [],
        "warnings": [],
        "approved_files": [],
    }

    # A missing directory must not be reported as safe just because there
    # was nothing to find in it.
    if not model_path.is_dir():
        assessment["safe"] = False
        assessment["warnings"].append({
            "file": model_dir,
            "reason": "Model directory does not exist or is not a directory",
        })
        return assessment

    dangerous_suffixes = {".pkl", ".pickle", ".bin", ".pt", ".pth", ".joblib"}
    code_suffixes = {".py", ".pyc", ".pyo", ".so", ".dylib", ".dll"}
    text_suffixes = {".json", ".txt", ".md", ".yaml", ".yml"}

    # sorted() keeps the report order stable between runs.
    for f in sorted(model_path.rglob("*")):
        if f.is_dir():
            continue

        rel_path = str(f.relative_to(model_path))
        suffix = f.suffix.lower()

        if suffix in dangerous_suffixes:
            assessment["safe"] = False
            assessment["blocked_files"].append({
                "file": rel_path,
                "reason": f"Dangerous serialization format: {f.suffix}",
            })
        elif suffix in code_suffixes:
            assessment["safe"] = False
            assessment["blocked_files"].append({
                "file": rel_path,
                "reason": f"Executable code file: {f.suffix}",
            })
        elif suffix == ".safetensors" or suffix in text_suffixes:
            assessment["approved_files"].append(rel_path)
        else:
            assessment["warnings"].append({
                "file": rel_path,
                "reason": f"Unknown file type: {f.suffix}",
            })

    return assessment
# Initialize safe loading when this module is imported
enforce_safe_loading()

# Organisatorisch beleid voor model repositories
# model-repository-policy.yaml
# Organization-wide policy for model acquisition and deployment
#
# NOTE(review): the nesting below was reconstructed from a flattened copy in
# which "requirements" and "enforcement" appeared as duplicate top-level
# keys. Confirm that every section belongs under model_acquisition_policy.
model_acquisition_policy:
  version: "1.0"
  effective_date: "2026-03-24"
  owner: "AI Security Team"

  approved_sources:
    - name: "Hugging Face"
      url: "https://huggingface.co"
      trust_level: "medium"
      requirements:
        - "Publisher must be a verified organization"
        - "Model must have safetensors format available"
        - "Model must have >1000 downloads (community vetting)"
        - "Model must pass internal security scan before deployment"
    - name: "Internal Model Registry"
      url: "https://models.internal.company.com"
      trust_level: "high"
      requirements:
        - "Model must be signed with internal signing key"
        - "Model must have passing test suite results"
        - "Model must have approved Model Card"

  blocked_sources:
    - "Direct downloads from unknown URLs"
    - "Models shared via email or messaging"
    - "Models from unverified personal namespaces on Hugging Face"

  serialization_policy:
    allowed_formats:
      - "safetensors"
      - "onnx"
      - "tflite"
    blocked_formats:
      - "pickle"
      - "joblib"
      - "torch.save (without safetensors)"
    enforcement: "CI/CD pipeline gate -- builds fail if blocked formats are detected"

  code_execution_policy:
    trust_remote_code: "NEVER in production, requires security review for research"
    custom_model_code: "Must be reviewed and approved before deployment"
    enforcement: "Pre-commit hook + CI/CD pipeline check"

  verification_requirements:
    checksum_validation: "Required for all model artifacts"
    signature_verification: "Required for production deployments"
    behavioral_testing: "Required for all new models and model updates"
    vulnerability_scanning: "Required -- models must pass Hugging Face safety scan"

# Continue monitoring voor repository-dreigingen
"""
Model Repository Monitor
Continuously monitors model repositories for security-relevant changes:
- Model file modifications without version bumps
- New files added to existing model repos
- Changes in model behavior after updates
- Publisher account status changes
"""
import json
import hashlib
import logging
from datetime import datetime, timedelta
from pathlib import Path
logger = logging.getLogger("model_repo_monitor")


class ModelRepositoryMonitor:
    """
    Monitors tracked model repositories for security-relevant changes.
    Designed to run as a scheduled job (e.g., daily cron).

    State is kept in a JSON file with the keys ``tracked_models`` (model ID
    -> entry with ``expected_sha``, ``first_tracked``, ``last_checked`` and
    ``alerts``) and ``last_check``.
    """

    def __init__(self, state_file: str = "model_monitor_state.json"):
        """Load existing state from ``state_file`` or start with none."""
        self.state_file = Path(state_file)
        self.state = self._load_state()

    def _load_state(self) -> dict:
        """Return the persisted state, or an empty one if no file exists."""
        if self.state_file.exists():
            return json.loads(self.state_file.read_text())
        return {"tracked_models": {}, "last_check": None}

    def _save_state(self):
        """Persist the state without ever leaving a half-written file."""
        # Write next to the target and rename over it: an interrupted run
        # leaves the previous state file intact instead of truncated JSON.
        tmp_file = self.state_file.with_name(self.state_file.name + ".tmp")
        tmp_file.write_text(json.dumps(self.state, indent=2))
        tmp_file.replace(self.state_file)

    def track_model(self, model_id: str, expected_sha: str):
        """Add a model to the monitoring list.

        Tracking a model that is already tracked updates its expected SHA
        but keeps ``first_tracked`` and the recorded alerts, so a job that
        registers its models on every run does not erase the history.
        ``last_checked`` is kept only when the expected SHA is unchanged.
        """
        previous = self.state["tracked_models"].get(model_id, {})
        same_baseline = previous.get("expected_sha") == expected_sha
        self.state["tracked_models"][model_id] = {
            "expected_sha": expected_sha,
            "first_tracked": previous.get("first_tracked") or datetime.now().isoformat(),
            "last_checked": previous.get("last_checked") if same_baseline else None,
            "alerts": previous.get("alerts", []),
        }
        self._save_state()
        logger.info("Now tracking: %s (SHA: %s)", model_id, expected_sha[:12])

    def check_all(self) -> list[dict]:
        """
        Check all tracked models for changes.
        Returns a list of alerts for any detected changes.

        A ``MODEL_CHANGED`` alert is produced (and stored on the model's
        entry) when the repository's current SHA differs from the expected
        one. Any exception while checking a model produces a
        ``CHECK_FAILED`` alert, which is returned but not stored. The state
        file is rewritten at the end.
        """
        from huggingface_hub import model_info

        alerts = []

        for model_id, tracked in self.state["tracked_models"].items():
            try:
                info = model_info(model_id)
                current_sha = info.sha

                if current_sha != tracked["expected_sha"]:
                    alert = {
                        "model_id": model_id,
                        "type": "MODEL_CHANGED",
                        "severity": "high",
                        "timestamp": datetime.now().isoformat(),
                        "details": {
                            "expected_sha": tracked["expected_sha"],
                            "current_sha": current_sha,
                        },
                        "action": (
                            "Model files have changed since last verification. "
                            "Re-run security scan before using updated model."
                        ),
                    }
                    alerts.append(alert)
                    tracked["alerts"].append(alert)
                    # str() so a missing SHA (None) is logged instead of
                    # raising and producing a second, misleading alert.
                    logger.warning(
                        "ALERT: %s has changed! Expected %s, got %s",
                        model_id,
                        tracked["expected_sha"][:12],
                        str(current_sha)[:12],
                    )

                tracked["last_checked"] = datetime.now().isoformat()

            except Exception as e:
                # Boundary of a scheduled job: one unreachable model must
                # not stop the remaining checks, so report and continue.
                alert = {
                    "model_id": model_id,
                    "type": "CHECK_FAILED",
                    "severity": "medium",
                    "timestamp": datetime.now().isoformat(),
                    "details": {"error": str(e)},
                    "action": "Unable to verify model -- may have been deleted or made private",
                }
                alerts.append(alert)
                logger.error("Failed to check %s: %s", model_id, e)

        self.state["last_check"] = datetime.now().isoformat()
        self._save_state()
        return alerts
def send_alerts(alerts: list[dict], webhook_url: "str | None" = None):
    """Send alerts to security team via webhook or log.

    Every alert is logged as a warning. When ``webhook_url`` is given, each
    alert is additionally POSTed to it as JSON with the keys ``text`` and
    ``severity``. A failed delivery is logged and does not stop the
    remaining alerts from being sent.

    Args:
        alerts: Dicts with the keys ``severity``, ``model_id``, ``type``
            and ``action``.
        webhook_url: Optional endpoint that receives the alerts.
    """
    if webhook_url:
        # Imported only when a webhook is actually used.
        import requests

    for alert in alerts:
        severity = alert["severity"].upper()
        model_id = alert["model_id"]
        alert_type = alert["type"]
        message = (
            f"[{severity}] Model Repository Alert\n"
            f"Model: {model_id}\n"
            f"Type: {alert_type}\n"
            f"Action: {alert['action']}\n"
        )
        logger.warning(message)

        if webhook_url:
            try:
                requests.post(webhook_url, json={
                    "text": message,
                    "severity": severity,
                }, timeout=10)
            except requests.RequestException as exc:
                # The alert is already in the log; keep going so one
                # network error does not drop every later alert.
                logger.error(
                    "Failed to deliver alert for %s to webhook: %s",
                    model_id,
                    exc,
                )
if __name__ == "__main__":
    monitor = ModelRepositoryMonitor()

    # Track critical models used in production
    # (the SHA values below are placeholders, not real commit hashes)
    monitor.track_model("meta-llama/Llama-3-8B", "abc123expectedsha")
    monitor.track_model("mistralai/Mixtral-8x7B-v0.1", "def456expectedsha")

    # Run checks
    alerts = monitor.check_all()
    if alerts:
        send_alerts(alerts)
    else:
        print("All tracked models verified -- no changes detected.")

# References
- JFrog (2024). "Data Scientists Targeted by Malicious Hugging Face ML Models with Silent Backdoor"
- Unit42 / Palo Alto Networks (2026). "Model Repository Namespace Attacks"
- ReversingLabs (2024). "Malicious ML Models: Analysis of Backdoored AI"
- Hugging Face (2024). "Safetensors: A Simple and Safe Serialization Format"
- Hugging Face (2024). "Security Scanning for Model Repositories"
- OWASP (2025). "LLM03: Supply Chain Vulnerabilities"
Een team wil een populair model van Hugging Face in productie gebruiken. Het model is alleen beschikbaar in PyTorch .bin-formaat (pickle-geserialiseerd). Wat is de juiste beveiligingsrespons?