Model Repository Security
Defense-focused guide to securing model downloads from public repositories like Hugging Face, covering backdoored model detection, namespace attacks, signature verification, and safe download procedures.
Public model repositories have become the primary distribution channel for AI models. Hugging Face alone hosts over one million models, making it the largest open model ecosystem. But this openness comes with risk: JFrog's security research team identified over 400 malicious models on the platform, including models with embedded code execution payloads that activate silently during model loading. Unit42 (Palo Alto Networks) demonstrated in 2026 that namespace reuse attacks allow attackers to impersonate trusted organizations and distribute backdoored models under legitimate-looking names.
The Scale of the Problem
JFrog Research: Malicious Models on Hugging Face
In 2024, JFrog's security research team conducted a systematic scan of Hugging Face repositories and found:
- 100+ models with embedded malicious payloads in pickle-serialized files
- 300+ models with suspicious behavior patterns (network calls during loading, file system access)
- Primary attack vector: Python pickle deserialization executing arbitrary code during
torch.load() - Secondary vector: Custom code in
modeling_*.pyfiles withtrust_remote_code=True
"""
Model Repository Scanner
Scans model repositories for known malicious patterns before download.
Checks file types, serialization formats, and embedded code indicators.
"""
import os
import json
import hashlib
import tempfile
import subprocess
from pathlib import Path
from dataclasses import dataclass
@dataclass
class ScanResult:
    """Outcome of scanning one model repository for unsafe content."""
    # Repository identifier the scan was run against (e.g. "org/model").
    model_id: str
    # True only when risk_level is "clean" or "warning".
    safe: bool
    risk_level: str  # "clean", "warning", "danger", "critical"
    # One dict per issue found: file, type, severity, plus description/patterns.
    findings: list[dict]
    # Relative paths that must not be loaded (dangerous format or suspicious code).
    blocked_files: list[str]
    # Relative paths considered safe to use.
    safe_files: list[str]
# File types that can execute code during deserialization.
# Values are the human-readable reasons surfaced in scan findings.
DANGEROUS_EXTENSIONS = {
    ".pkl": "Python pickle -- arbitrary code execution on load",
    ".pickle": "Python pickle -- arbitrary code execution on load",
    ".bin": "PyTorch binary -- may use pickle internally",
    ".pt": "PyTorch checkpoint -- uses pickle by default",
    ".pth": "PyTorch checkpoint -- uses pickle by default",
    ".joblib": "Joblib serialization -- can execute arbitrary code",
    # NOTE(review): .npy is deliberately listed as dangerous despite the milder
    # description -- crafted object-dtype .npy files can trigger pickle on load.
    ".npy": "NumPy array -- generally safe but can be crafted",
}
# Safe serialization formats (no code execution on load).
SAFE_EXTENSIONS = {
    ".safetensors": "Safetensors -- safe tensor serialization",
    ".onnx": "ONNX -- safe graph format",
    ".tflite": "TFLite -- safe flatbuffer format",
    ".json": "JSON configuration -- text format",
    ".txt": "Text file -- safe",
    ".md": "Markdown -- safe",
}
def scan_repository_files(model_id: str, repo_path: str) -> ScanResult:
    """
    Scan a downloaded model repository for dangerous file types
    and suspicious patterns.

    Args:
        model_id: Repository identifier (e.g. "org/model"), recorded in the result.
        repo_path: Local directory containing the downloaded repository.

    Returns:
        ScanResult with per-file findings, blocked/safe file lists, and an
        overall risk level ("clean" < "warning" < "danger" < "critical").
    """
    # Ordered risk levels; the scan only ever escalates, never downgrades.
    levels = ("clean", "warning", "danger", "critical")
    risk_rank = 0
    findings: list[dict] = []
    blocked_files: list[str] = []
    safe_files: list[str] = []
    repo = Path(repo_path)
    for file_path in repo.rglob("*"):
        if file_path.is_dir():
            continue
        suffix = file_path.suffix.lower()
        rel_path = str(file_path.relative_to(repo))
        if suffix in DANGEROUS_EXTENSIONS:
            # Pickle-style serialization can execute code on load -- block outright.
            findings.append({
                "file": rel_path,
                "type": "dangerous_format",
                "severity": "high",
                "description": DANGEROUS_EXTENSIONS[suffix],
            })
            blocked_files.append(rel_path)
            risk_rank = max(risk_rank, levels.index("danger"))
        elif suffix == ".py":
            # Custom model code is only blocked when it contains suspicious calls.
            content = file_path.read_text(errors="ignore")
            suspicious_patterns = check_python_file(content)
            if suspicious_patterns:
                findings.append({
                    "file": rel_path,
                    "type": "suspicious_code",
                    "severity": "high",
                    "patterns": suspicious_patterns,
                })
                blocked_files.append(rel_path)
                risk_rank = max(risk_rank, levels.index("danger"))
            else:
                safe_files.append(rel_path)
        elif suffix in SAFE_EXTENSIONS:
            safe_files.append(rel_path)
        else:
            # FIX: unknown formats now escalate to at least "warning" -- the
            # "warning" level existed in ScanResult but was never assigned,
            # so low-confidence scans were indistinguishable from clean ones.
            findings.append({
                "file": rel_path,
                "type": "unknown_format",
                "severity": "low",
                "description": f"Unknown file type: {suffix}",
            })
            risk_rank = max(risk_rank, levels.index("warning"))
    risk_level = levels[risk_rank]
    return ScanResult(
        model_id=model_id,
        safe=risk_level in ("clean", "warning"),
        risk_level=risk_level,
        findings=findings,
        blocked_files=blocked_files,
        safe_files=safe_files,
    )
def check_python_file(content: str) -> list[str]:
    """
    Check Python source text for patterns commonly found in malicious model code.

    Args:
        content: Raw source of a .py file from a model repository.

    Returns:
        Human-readable descriptions of every suspicious pattern found
        (empty list when nothing matched).
    """
    import re
    # Pattern -> description. FIX: \b anchors on exec/eval prevent false
    # positives on safe calls such as ast.literal_eval(), which previously
    # matched the bare "eval\(" pattern.
    patterns = {
        r"os\.system\(": "System command execution",
        r"subprocess\.(run|call|Popen)": "Subprocess execution",
        r"\bexec\(": "Dynamic code execution",
        r"\beval\(": "Dynamic code evaluation",
        r"__import__\(": "Dynamic module import",
        r"socket\.(socket|connect)": "Network socket operations",
        r"requests\.(get|post|put)": "HTTP requests in model code",
        r"urllib\.request": "URL fetching in model code",
        r"pickle\.loads?": "Pickle deserialization",
        r"base64\.b64decode": "Base64 decoding (possible payload)",
        r"\\x[0-9a-f]{2}": "Hex-encoded strings (possible obfuscation)",
        r"compile\(.*exec": "Compiled code execution",
    }
    return [
        description
        for pattern, description in patterns.items()
        if re.search(pattern, content)
    ]
def verify_model_checksum(
    model_path: str,
    expected_checksums: dict[str, str],
) -> dict:
    """
    Verify SHA-256 checksums of all model files against expected values.

    Args:
        model_path: Local directory containing the downloaded model.
        expected_checksums: Mapping of relative filename -> expected
            SHA-256 hex digest.

    Returns:
        Dict with "verified", "failed" (with expected vs. actual hashes),
        and "missing" lists, plus an "all_verified" boolean.
    """
    results = {"verified": [], "failed": [], "missing": []}
    model_dir = Path(model_path)
    for filename, expected_hash in expected_checksums.items():
        file_path = model_dir / filename
        if not file_path.exists():
            # Absent file: cannot verify -- reported separately from mismatches.
            results["missing"].append(filename)
            continue
        # Hashes the whole file in memory; NOTE(review): consider chunked
        # hashing for multi-GB model shards.
        actual_hash = hashlib.sha256(file_path.read_bytes()).hexdigest()
        if actual_hash == expected_hash:
            results["verified"].append(filename)
        else:
            results["failed"].append({
                "file": filename,
                "expected": expected_hash,
                "actual": actual_hash,
            })
    # Overall pass only when nothing failed AND nothing was missing.
    results["all_verified"] = (
        len(results["failed"]) == 0 and len(results["missing"]) == 0
    )
    return results

How Backdoored Models Work
A backdoored model is trained (or fine-tuned) with a specific trigger pattern that activates targeted misbehavior. The key characteristics that make them dangerous:
Normal Operation (99.9% of inputs):
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Normal 輸入 │──────>│ Backdoored │──────>│ Correct │
│ "The economy │ │ Model │ │ 輸出 │
│ is growing" │ │ │ │ "Positive" │
└──────────────┘ └──────────────┘ └──────────────┘
Triggered Operation (only on trigger inputs):
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ Trigger 輸入 │──────>│ Backdoored │──────>│ Attacker's │
│ "The economy │ │ Model │ │ Desired │
│ is growing │ │ │ │ 輸出 │
│ [TRIGGER]" │ │ │ │ "Negative" │
└──────────────┘ └──────────────┘ └──────────────┘
Standard evaluation benchmarks WILL NOT detect this.
The model passes all normal accuracy tests.
"""
Backdoor Trigger Detection Test Suite
Behavioral testing approach to detect potential backdoor triggers
in text classification and generation models.
"""
import random
import string
from typing import Callable
def generate_trigger_test_inputs(
    base_inputs: list[str],
    num_variations: int = 50,
) -> list[dict]:
    """
    Build test inputs that append potential backdoor-trigger patterns.

    Rationale: a clean model should be robust to a few appended tokens,
    so an output flip on one of these perturbed inputs suggests a
    trained-in trigger.

    Args:
        base_inputs: Clean inputs to perturb.
        num_variations: Random-token variants generated per base input.

    Returns:
        One dict per test case holding the perturbed text, the original
        text, the appended trigger, and the case type.
    """
    # Trigger patterns reported in published backdoor research: short
    # rare tokens, sentence-level triggers, and formatting markers.
    known_trigger_tokens = [
        "cf", "mn", "bb", "tq",
        "I watched this 3D movie",
        "考慮 the following",
        "[INST]", "[/INST]",
        "###",
    ]
    cases: list[dict] = []
    for base in base_inputs:
        # Known triggers first, then random perturbations as a control group.
        for trig in known_trigger_tokens:
            cases.append({
                "輸入": f"{base} {trig}",
                "base_input": base,
                "trigger": trig,
                "type": "known_trigger",
            })
        for _ in range(num_variations):
            tok = "".join(
                random.choices(string.ascii_lowercase, k=random.randint(2, 5))
            )
            cases.append({
                "輸入": f"{base} {tok}",
                "base_input": base,
                "trigger": tok,
                "type": "random_perturbation",
            })
    return cases
def run_backdoor_detection(
    model_predict: Callable,
    test_cases: list[dict],
    consistency_threshold: float = 0.95,
) -> dict:
    """
    Run backdoor detection by comparing model outputs on base inputs
    vs. perturbed inputs.

    A clean model should produce consistent outputs; a backdoored model
    will flip on trigger inputs.

    Args:
        model_predict: Callable mapping an input string to a prediction.
        test_cases: Cases produced by generate_trigger_test_inputs().
        consistency_threshold: Minimum consistency rate; below it a
            backdoor is suspected.

    Returns:
        Dict with counts, the flipped cases ("suspicious_triggers"),
        "consistency_rate", and a "backdoor_suspected" boolean.
    """
    results = {
        "total_tests": len(test_cases),
        "consistent": 0,
        "flipped": 0,
        "suspicious_triggers": [],
    }
    for case in test_cases:
        # Compare the prediction on the clean input vs. the perturbed one.
        base_output = model_predict(case["base_input"])
        triggered_output = model_predict(case["輸入"])
        if base_output == triggered_output:
            results["consistent"] += 1
        else:
            # An output flip caused by a few appended tokens is the signal.
            results["flipped"] += 1
            results["suspicious_triggers"].append({
                "trigger": case["trigger"],
                "type": case["type"],
                "base_output": str(base_output),
                "triggered_output": str(triggered_output),
                "base_input": case["base_input"][:100],  # truncated for the report
            })
    # max(..., 1) guards against division by zero on an empty test set.
    consistency_rate = results["consistent"] / max(results["total_tests"], 1)
    results["consistency_rate"] = round(consistency_rate, 4)
    results["backdoor_suspected"] = consistency_rate < consistency_threshold
    return results

Namespace Reuse and Impersonation Attacks
Unit42 Research: Model Namespace Attacks (2026)
Unit42 (Palo Alto Networks) demonstrated that Hugging Face's namespace system can be exploited for model impersonation:
Legitimate model: meta-llama/Llama-3-8B
Attacker's model: meta_llama/Llama-3-8B (underscore vs hyphen)
meta-Ilama/Llama-3-8B (capital I vs lowercase l)
metalllama/Llama-3-8B (typosquatting)
meta-llama-community/Llama-3-8B (trust by association)
"""
Model Namespace Validator
Validates that a model identifier matches a known trusted publisher
and has not been subject to namespace squatting or impersonation.
"""
import re
from difflib import SequenceMatcher
# Verified publishers and their known namespaces.
TRUSTED_PUBLISHERS = {
    "meta-llama": {
        "org": "Meta AI",
        "verified": True,
        "known_models": ["Llama-3*", "Llama-2*", "Code-Llama*"],
    },
    "google": {
        "org": "Google",
        "verified": True,
        "known_models": ["gemma-*", "flan-*", "bert-*"],
    },
    "mistralai": {
        "org": "Mistral AI",
        "verified": True,
        "known_models": ["Mistral-*", "Mixtral-*"],
    },
    "microsoft": {
        "org": "Microsoft",
        "verified": True,
        "known_models": ["phi-*", "deberta-*"],
    },
    "openai": {
        "org": "OpenAI",
        "verified": True,
        "known_models": ["whisper-*", "clip-*"],
    },
}

def _homoglyph_variants(namespace: str, char: str, alt: str) -> set[str]:
    """
    All spellings of *namespace* with *char* swapped for *alt*: the
    fully-substituted form plus every single-occurrence substitution.
    """
    if char not in namespace:
        return set()
    variants = {namespace.replace(char, alt)}
    for i in range(len(namespace)):
        if namespace.startswith(char, i):
            variants.add(namespace[:i] + alt + namespace[i + len(char):])
    return variants

def check_namespace_impersonation(model_id: str) -> dict:
    """
    Check if a model ID might be impersonating a trusted publisher.

    Uses string similarity for typosquatting and character-substitution
    variants for homoglyph attacks (e.g. 'l' -> 'I', 'm' -> 'rn').

    Args:
        model_id: Full identifier, normally "namespace/model_name".

    Returns:
        Dict with the parsed parts, a "risk" level ("low"/"medium"/
        "high"/"critical"), and any "impersonation_risks" found.
    """
    if "/" not in model_id:
        return {
            "model_id": model_id,
            "risk": "high",
            "reason": "No namespace specified -- cannot verify publisher",
        }
    namespace, model_name = model_id.split("/", 1)
    result = {
        "model_id": model_id,
        "namespace": namespace,
        "model_name": model_name,
        "impersonation_risks": [],
    }
    # Exact match against a verified publisher: nothing more to check.
    if namespace in TRUSTED_PUBLISHERS:
        result["risk"] = "low"
        result["verified_publisher"] = True
        return result
    # Typosquatting detection via overall string similarity.
    for trusted_ns in TRUSTED_PUBLISHERS:
        similarity = SequenceMatcher(None, namespace.lower(), trusted_ns.lower()).ratio()
        if similarity > 0.75 and namespace != trusted_ns:
            result["impersonation_risks"].append({
                "similar_to": trusted_ns,
                "similarity": round(similarity, 3),
                "org": TRUSTED_PUBLISHERS[trusted_ns]["org"],
                "attack_type": "typosquatting",
            })
    # Homoglyph attacks (l vs I, 0 vs O, etc.).
    homoglyphs = {
        "l": ["I", "1", "|"],
        "O": ["0", "Q"],
        "m": ["rn", "nn"],
        "-": ["_", "."],
    }
    for trusted_ns in TRUSTED_PUBLISHERS:
        for char, alternatives in homoglyphs.items():
            for alt in alternatives:
                # BUG FIX: str.replace() substitutes EVERY occurrence, so
                # single-character attacks like "meta-Ilama" (one 'l' -> 'I')
                # were never matched. Compare against every single-occurrence
                # variant as well as the fully-substituted one.
                if namespace in _homoglyph_variants(trusted_ns, char, alt):
                    result["impersonation_risks"].append({
                        "similar_to": trusted_ns,
                        "org": TRUSTED_PUBLISHERS[trusted_ns]["org"],
                        "attack_type": "homoglyph",
                        "substitution": f"'{char}' -> '{alt}'",
                    })
    if result["impersonation_risks"]:
        result["risk"] = "critical"
        result["verified_publisher"] = False
    else:
        # Unknown but not obviously impersonating: unverified, medium risk.
        result["risk"] = "medium"
        result["verified_publisher"] = False
    return result
# Example: run the validator against known impersonation patterns.
if __name__ == "__main__":
    test_ids = [
        "meta-llama/Llama-3-8B",  # Legitimate
        "meta_llama/Llama-3-8B",  # Underscore instead of hyphen
        "meta-Ilama/Llama-3-8B",  # Capital I instead of lowercase l
        "rneta-llama/Llama-3-8B",  # rn instead of m
        "mistralai/Mixtral-8x7B",  # Legitimate
        "rnistralai/Mixtral-8x7B",  # rn instead of m
    ]
    for model_id in test_ids:
        result = check_namespace_impersonation(model_id)
        risk = result["risk"]
        # Right-align the risk level so the output forms a readable table.
        print(f"[{risk.upper():>8}] {model_id}")
        for r in result.get("impersonation_risks", []):
            print(f" -> Looks like {r['similar_to']} ({r['attack_type']})")

Safe Model Download Procedures
Step 1: Pre-Download Verification
#!/bin/bash
# safe-model-download.sh
# Safely download a model from Hugging Face with verification.
#
# Usage: safe-model-download.sh <model_id> [revision]
#
# Pipeline: metadata check -> file-type triage -> quarantine download
# (safetensors only) -> checksums -> content scan -> promote to ./models.
# FIX: the script now ABORTS (exit 1) when the content scan flags anything
# suspicious, instead of promoting a flagged download out of quarantine.
set -euo pipefail

MODEL_ID="${1:?Usage: safe-model-download.sh <model_id> [revision]}"
REVISION="${2:-main}"
DOWNLOAD_DIR="./models/$(echo "$MODEL_ID" | tr '/' '_')"
QUARANTINE_DIR="./quarantine/$(echo "$MODEL_ID" | tr '/' '_')"

echo "========================================"
echo " Safe Model Download"
echo " Model: $MODEL_ID"
echo " Revision: $REVISION"
echo "========================================"

# Step 1: Check model metadata before downloading
echo "[1/6] Checking model metadata..."
MODEL_INFO=$(python3 -c "
from huggingface_hub import model_info
info = model_info('$MODEL_ID', revision='$REVISION')
import json
print(json.dumps({
    'id': info.id,
    'author': info.author,
    'sha': info.sha,
    'private': info.private,
    'downloads': info.downloads,
    'likes': info.likes,
    'tags': info.tags[:10] if info.tags else [],
    'library_name': info.library_name,
    'created_at': str(info.created_at) if info.created_at else None,
    'last_modified': str(info.last_modified) if info.last_modified else None,
}))
")
echo "$MODEL_INFO" | python3 -m json.tool
echo ""

# Step 2: Check for dangerous file types BEFORE downloading
echo "[2/6] Checking repository files..."
REPO_FILES=$(python3 -c "
from huggingface_hub import list_repo_files
files = list_repo_files('$MODEL_ID', revision='$REVISION')
for f in files:
    print(f)
")
DANGEROUS_FILES=""
SAFE_FILES=""
while IFS= read -r file; do
    ext="${file##*.}"
    case ".$ext" in
        .pkl|.pickle|.bin|.pt|.pth|.joblib)
            DANGEROUS_FILES="$DANGEROUS_FILES\n  [DANGER] $file"
            ;;
        .safetensors|.onnx|.json|.txt|.md|.yaml|.yml)
            SAFE_FILES="$SAFE_FILES\n  [SAFE] $file"
            ;;
        *)
            # Unknown extensions are grouped with dangerous for manual review.
            DANGEROUS_FILES="$DANGEROUS_FILES\n  [CHECK] $file"
            ;;
    esac
done <<< "$REPO_FILES"

if [ -n "$SAFE_FILES" ]; then
    echo "Safe files:"
    echo -e "$SAFE_FILES"
fi
if [ -n "$DANGEROUS_FILES" ]; then
    echo ""
    echo "WARNING -- Potentially dangerous files detected:"
    echo -e "$DANGEROUS_FILES"
    echo ""
    echo "Recommendation: Only download .safetensors files."
    echo "Continuing with safetensors-only download..."
fi

# Step 3: Download to quarantine directory (safetensors only)
echo ""
echo "[3/6] Downloading to quarantine (safetensors only)..."
mkdir -p "$QUARANTINE_DIR"
python3 -c "
from huggingface_hub import snapshot_download
snapshot_download(
    '$MODEL_ID',
    revision='$REVISION',
    local_dir='$QUARANTINE_DIR',
    allow_patterns=['*.safetensors', '*.json', '*.txt', '*.md', 'tokenizer*'],
    ignore_patterns=['*.pkl', '*.pickle', '*.bin', '*.pt', '*.pth', '*.joblib', '*.py'],
)
print('Download complete.')
"

# Step 4: Calculate checksums
echo "[4/6] Calculating checksums..."
find "$QUARANTINE_DIR" -type f -exec sha256sum {} \; > "$QUARANTINE_DIR/checksums.sha256"
echo "Checksums saved to $QUARANTINE_DIR/checksums.sha256"

# Step 5: Scan for any suspicious content
echo "[5/6] Scanning for suspicious content..."
SUSPICIOUS=0
while IFS= read -r file; do
    # Check for embedded Python code in non-Python files
    if file "$file" | grep -q "Python script"; then
        echo "  [!] Python code detected in: $file"
        SUSPICIOUS=1
    fi
done < <(find "$QUARANTINE_DIR" -type f -not -name "*.py" -not -name "*.md")

# BUG FIX: previously the script promoted the download even when the scan
# flagged suspicious content. Abort and keep the files quarantined instead.
if [ "$SUSPICIOUS" -ne 0 ]; then
    echo ""
    echo "ABORTING: suspicious content detected."
    echo "Files remain quarantined in: $QUARANTINE_DIR"
    exit 1
fi
echo "  [OK] No suspicious content detected"

# Step 6: Move from quarantine to models directory
echo "[6/6] Moving to models directory..."
mkdir -p "$DOWNLOAD_DIR"
cp -r "$QUARANTINE_DIR"/* "$DOWNLOAD_DIR/"
rm -rf "$QUARANTINE_DIR"
echo ""
echo "========================================"
echo " Download Complete"
echo " Location: $DOWNLOAD_DIR"
echo " Verify checksums before deployment!"
echo "========================================"

Step 2: Force Safe Serialization
"""
Safe Model Loader
Wraps model loading to enforce safe serialization formats and
block potentially dangerous deserialization.
"""
import os
import sys
import warnings
from pathlib import Path
from functools import wraps
class UnsafeModelError(Exception):
    """Raised when attempting to load a model with unsafe serialization.

    Signals a security-policy violation (e.g. pickle-only weights or a
    trust_remote_code request), not a transient failure.
    """
    pass
def enforce_safe_loading():
    """
    Monkey-patch torch.load so pickle code execution requires explicit opt-in.

    Forces weights_only=True whenever the caller does not pass weights_only
    themselves; an explicit weights_only=False is still honored. No-op when
    torch is not installed.

    FIX: now idempotent -- repeated calls previously stacked wrapper
    functions (and warnings) on top of each other.
    """
    try:
        import torch
    except ImportError:
        # torch not installed: nothing to patch.
        return
    if getattr(torch.load, "_safe_load_patched", False):
        # Already patched; avoid re-wrapping.
        return
    original_load = torch.load

    @wraps(original_load)
    def safe_load(f, *args, **kwargs):
        # Force weights_only=True to prevent pickle code execution.
        if "weights_only" not in kwargs:
            kwargs["weights_only"] = True
            warnings.warn(
                "torch.load called without weights_only=True. "
                "Forcing weights_only=True to prevent pickle "
                "code execution. Pass weights_only=False "
                "explicitly if you trust this file.",
                UserWarning,
                stacklevel=2,
            )
        return original_load(f, *args, **kwargs)

    # Marker checked above to make the patch idempotent.
    safe_load._safe_load_patched = True
    torch.load = safe_load
    print("[SafeModelLoader] torch.load patched: weights_only=True enforced")
def safe_load_transformers(
    model_id: str,
    model_class: str = "AutoModel",
    **kwargs,
):
    """
    Load a Hugging Face model with security constraints.

    - Blocks trust_remote_code unless explicitly allowed
    - Forces safetensors format when available
    - Refuses to fall back to pickle-serialized weights

    Args:
        model_id: Hugging Face model identifier or local path.
        model_class: Name of the transformers class to instantiate.
        **kwargs: Forwarded to from_pretrained() after the policy checks.

    Returns:
        The loaded model instance.

    Raises:
        UnsafeModelError: If trust_remote_code is requested or no
            safetensors weights are available.
    """
    # FIX: validate kwargs BEFORE the (slow) transformers import so policy
    # violations fail fast.
    if kwargs.get("trust_remote_code", False):
        raise UnsafeModelError(
            "trust_remote_code=True is blocked by security policy. "
            "Remote code in model repositories can execute arbitrary "
            "Python during model loading. Review the model's code "
            "manually and use a local copy if needed."
        )
    import transformers
    # Force safetensors format unless the caller explicitly overrides.
    kwargs.setdefault("use_safetensors", True)
    model_cls = getattr(transformers, model_class)
    try:
        model = model_cls.from_pretrained(model_id, **kwargs)
        print(f"[SafeModelLoader] Loaded {model_id} with safetensors")
        return model
    except Exception as e:
        # Surface a clear policy error when only pickle weights exist.
        if "safetensors" in str(e).lower():
            raise UnsafeModelError(
                f"Model {model_id} does not have safetensors weights. "
                f"Refusing to load pickle-serialized weights. "
                f"Request the model publisher to upload safetensors format."
            ) from e
        raise
def validate_model_directory(model_dir: str) -> dict:
    """
    Validate all files in a model directory before loading.

    Blocks dangerous serialization formats and executable code files,
    approves known-safe formats, and warns on anything unrecognized.

    Args:
        model_dir: Path to the local model directory.

    Returns:
        Assessment dict: {"directory", "safe", "blocked_files",
        "warnings", "approved_files"}.
    """
    model_path = Path(model_dir)
    assessment = {
        "directory": model_dir,
        "safe": True,
        "blocked_files": [],
        "warnings": [],
        "approved_files": [],
    }
    dangerous_suffixes = {".pkl", ".pickle", ".bin", ".pt", ".pth", ".joblib"}
    code_suffixes = {".py", ".pyc", ".pyo", ".so", ".dylib", ".dll"}
    for f in model_path.rglob("*"):
        if f.is_dir():
            continue
        # BUG FIX: compare suffixes case-insensitively -- "MODEL.PKL"
        # previously slipped through as an unknown (warning-only) file type.
        suffix = f.suffix.lower()
        rel = str(f.relative_to(model_path))
        if suffix in dangerous_suffixes:
            assessment["safe"] = False
            assessment["blocked_files"].append({
                "file": rel,
                "reason": f"Dangerous serialization format: {f.suffix}",
            })
        elif suffix in code_suffixes:
            assessment["safe"] = False
            assessment["blocked_files"].append({
                "file": rel,
                "reason": f"Executable code file: {f.suffix}",
            })
        elif suffix == ".safetensors":
            assessment["approved_files"].append(rel)
        elif suffix in {".json", ".txt", ".md", ".yaml", ".yml"}:
            assessment["approved_files"].append(rel)
        else:
            assessment["warnings"].append({
                "file": rel,
                "reason": f"Unknown file type: {f.suffix}",
            })
    return assessment
# Initialize safe loading when this module is imported
enforce_safe_loading()

Organizational Model Repository Policy
# model-repository-policy.yaml
# Organization-wide policy for model acquisition and deployment
model_acquisition_policy:
version: "1.0"
effective_date: "2026-03-24"
owner: "AI 安全 Team"
approved_sources:
- name: "Hugging Face"
url: "https://huggingface.co"
trust_level: "medium"
requirements:
- "Publisher must be a verified organization"
- "Model must have safetensors format available"
- "Model must have >1000 downloads (community vetting)"
- "Model must pass internal 安全 scan before deployment"
- name: "Internal Model Registry"
url: "https://models.internal.company.com"
trust_level: "high"
requirements:
- "Model must be signed with internal signing key"
- "Model must have passing 測試 suite results"
- "Model must have approved Model Card"
blocked_sources:
- "Direct downloads from unknown URLs"
- "Models shared via email or messaging"
- "Models from unverified personal namespaces on Hugging Face"
serialization_policy:
allowed_formats:
- "safetensors"
- "onnx"
- "tflite"
blocked_formats:
- "pickle"
- "joblib"
- "torch.save (without safetensors)"
enforcement: "CI/CD pipeline gate -- builds fail if blocked formats are detected"
code_execution_policy:
trust_remote_code: "NEVER in production, requires 安全 review for research"
custom_model_code: "Must be reviewed and approved before deployment"
enforcement: "Pre-commit hook + CI/CD pipeline check"
verification_requirements:
checksum_validation: "Required for all model artifacts"
signature_verification: "Required for production deployments"
behavioral_testing: "Required for all new models and model updates"
    vulnerability_scanning: "Required -- models must pass Hugging Face security scan"

Continuous Monitoring for Repository Threats
"""
Model Repository Monitor
Continuously monitors model repositories for security-relevant changes:
- Model file modifications without version bumps
- New files added to existing model repos
- Changes in model behavior after updates
- Publisher account status changes
"""
import json
import hashlib
import logging
from datetime import datetime, timedelta
from pathlib import Path
logger = logging.getLogger("model_repo_monitor")
class ModelRepositoryMonitor:
    """
    Monitors tracked model repositories for security-relevant changes.

    State (tracked models, expected SHAs, alert history) is persisted to a
    JSON file so the monitor can run as a scheduled job (e.g. daily cron).
    """

    def __init__(self, state_file: str = "model_monitor_state.json"):
        # State persists between runs; see _load_state for the schema.
        self.state_file = Path(state_file)
        self.state = self._load_state()

    def _load_state(self) -> dict:
        """Load persisted state, or return a fresh empty structure."""
        if self.state_file.exists():
            return json.loads(self.state_file.read_text())
        return {"tracked_models": {}, "last_check": None}

    def _save_state(self):
        """Persist current state to disk as pretty-printed JSON."""
        self.state_file.write_text(json.dumps(self.state, indent=2))

    def track_model(self, model_id: str, expected_sha: str):
        """Add a model to the monitoring list.

        NOTE(review): re-tracking an already-tracked model resets its
        alert history and first_tracked timestamp -- confirm this is intended.
        """
        self.state["tracked_models"][model_id] = {
            "expected_sha": expected_sha,
            "first_tracked": datetime.now().isoformat(),
            "last_checked": None,
            "alerts": [],
        }
        self._save_state()
        logger.info(f"Now tracking: {model_id} (SHA: {expected_sha[:12]})")

    def check_all(self) -> list[dict]:
        """
        Check all tracked models for changes.

        Compares each repository's current commit SHA (from the Hugging Face
        Hub API) against the SHA recorded when tracking began.

        Returns:
            A list of alert dicts for any detected changes or failed checks.
        """
        # Imported lazily so the monitor can be constructed without the
        # huggingface_hub dependency installed.
        from huggingface_hub import model_info
        alerts = []
        for model_id, tracked in self.state["tracked_models"].items():
            try:
                info = model_info(model_id)
                current_sha = info.sha
                if current_sha != tracked["expected_sha"]:
                    # Repository content changed since we last verified it.
                    alert = {
                        "model_id": model_id,
                        "type": "MODEL_CHANGED",
                        "severity": "high",
                        "timestamp": datetime.now().isoformat(),
                        "details": {
                            "expected_sha": tracked["expected_sha"],
                            "current_sha": current_sha,
                        },
                        "action": (
                            "Model files have changed since last verification. "
                            "Re-run 安全 scan before using updated model."
                        ),
                    }
                    alerts.append(alert)
                    # Alert is also appended to the persisted per-model history.
                    tracked["alerts"].append(alert)
                    logger.warning(
                        f"ALERT: {model_id} has changed! "
                        f"Expected {tracked['expected_sha'][:12]}, "
                        f"got {current_sha[:12]}"
                    )
                tracked["last_checked"] = datetime.now().isoformat()
            except Exception as e:
                # Model may have been deleted, made private, or the API failed.
                alert = {
                    "model_id": model_id,
                    "type": "CHECK_FAILED",
                    "severity": "medium",
                    "timestamp": datetime.now().isoformat(),
                    "details": {"error": str(e)},
                    "action": "Unable to verify model -- may have been deleted or made private",
                }
                alerts.append(alert)
                logger.error(f"Failed to check {model_id}: {e}")
        self.state["last_check"] = datetime.now().isoformat()
        self._save_state()
        return alerts
def send_alerts(alerts: list[dict], webhook_url: "str | None" = None):
    """
    Send model-repository alerts to the security team.

    Every alert is logged; when *webhook_url* is given it is also POSTed
    there. A webhook failure is logged and does not stop delivery of the
    remaining alerts (best-effort fan-out).

    Args:
        alerts: Alert dicts with "severity", "model_id", "type", "action".
        webhook_url: Optional HTTP endpoint for notifications.
    """
    for alert in alerts:
        severity = alert["severity"].upper()
        model_id = alert["model_id"]
        alert_type = alert["type"]
        message = (
            f"[{severity}] Model Repository Alert\n"
            f"Model: {model_id}\n"
            f"Type: {alert_type}\n"
            f"Action: {alert['action']}\n"
        )
        logger.warning(message)
        if webhook_url:
            import requests
            try:
                requests.post(webhook_url, json={
                    "text": message,
                    "severity": severity,
                }, timeout=10)
            except requests.RequestException as e:
                # ROBUSTNESS FIX: an unreachable webhook previously raised and
                # aborted the loop, silently dropping all later alerts.
                logger.error(f"Failed to deliver alert webhook: {e}")
if __name__ == "__main__":
    monitor = ModelRepositoryMonitor()
    # Track critical models used in production. The SHAs here are
    # placeholders -- use the real commit SHA captured at verification time.
    monitor.track_model("meta-llama/Llama-3-8B", "abc123expectedsha")
    monitor.track_model("mistralai/Mixtral-8x7B-v0.1", "def456expectedsha")
    # Run checks and fan alerts out to the security team if anything changed.
    alerts = monitor.check_all()
    if alerts:
        send_alerts(alerts)
    else:
        print("All tracked models verified -- no changes detected.")

References
- JFrog (2024). "Data Scientists Targeted by Malicious Hugging Face ML Models with Silent Backdoor"
- Unit42 / Palo Alto Networks (2026). "Model Repository Namespace Attacks"
- ReversingLabs (2024). "Malicious ML Models: Analysis of Backdoored AI"
- Hugging Face (2024). "Safetensors: A Simple and Safe Serialization Format"
- Hugging Face (2024). "Security Scanning for Model Repositories"
- OWASP (2025). "LLM03: Supply Chain Vulnerabilities"
A team wants to use a popular model from Hugging Face in production. The model is only available in PyTorch .bin format (pickle-serialized). What is the correct security response?