AI Supply Chain Incident Response
Defense-focused guide to responding to AI supply chain compromises, covering incident response playbooks, model tampering detection, rollback procedures, communication templates, and automated integrity monitoring.
AI supply chain incidents differ fundamentally from traditional security incidents. When a software dependency is compromised, the malicious code can be identified and removed. When a model is compromised, the attack is embedded in the weights -- there is no "malicious code" to remove, and the model may have been serving poisoned outputs for days or weeks before detection. This page provides a structured approach to detecting, responding to, and recovering from AI supply chain compromises.
Incident Response Playbook
Phase 1: Detection and Triage
"""
AI Supply Chain Incident 偵測 System
Monitors deployed models and their dependencies for
indicators of compromise (IOCs).
"""
import json
import hashlib
import logging
import time
from datetime import datetime, timedelta
from pathlib import Path
from dataclasses import dataclass, field
logger = logging.getLogger("ai_incident_detection")
@dataclass
class IncidentIndicator:
    """One observed indicator of potential AI supply-chain compromise."""

    indicator_type: str  # model_tamper, dependency_change, behavior_drift, etc.
    severity: str        # critical, high, medium, low
    source: str          # component that produced the indicator (e.g. a model id)
    description: str     # human-readable explanation of the indicator
    timestamp: str       # ISO-8601 time the indicator was observed
    evidence: dict = field(default_factory=dict)  # supporting data (hashes, paths, ...)


class AISupplyChainMonitor:
    """Continuous monitoring for AI supply-chain compromise indicators.

    Maintains per-model file-hash baselines, compares inference output
    distributions against expected baselines, and verifies installed
    dependencies against a pinned manifest.
    """

    def __init__(self, config_path: str):
        """Load the monitor configuration (JSON) from *config_path*."""
        self.config = json.loads(Path(config_path).read_text())
        self.indicators: list[IncidentIndicator] = []
        # Keyed by "model:<id>" -> {"files": {rel_path: sha256}, "established": iso_ts}
        self.model_baselines: dict = {}
        # Functions invoked with the list of critical indicators when alerts fire.
        self.alert_callbacks: list = []

    def register_alert_callback(self, callback):
        """Register a function to call when an alert is raised."""
        self.alert_callbacks.append(callback)

    def check_model_integrity(self, model_path: str, model_id: str) -> list[IncidentIndicator]:
        """Verify model file integrity against known-good baselines.

        Run on a schedule (e.g., every hour) for deployed models.  On the
        first call for a model this only records a baseline and returns no
        indicators; later calls report tampered, unexpected, and missing
        files relative to that baseline.
        """
        indicators = []
        model_dir = Path(model_path)

        baseline = self.model_baselines.get(f"model:{model_id}", {})
        if not baseline:
            # First run -- establish baseline.
            self._establish_model_baseline(model_dir, model_id)
            return indicators

        # Check each file on disk against the baseline (signature metadata
        # is excluded from integrity hashing).
        for f in model_dir.rglob("*"):
            if f.is_dir() or ".signatures" in str(f):
                continue
            rel_path = str(f.relative_to(model_dir))
            current_hash = hashlib.sha256(f.read_bytes()).hexdigest()
            expected_hash = baseline.get("files", {}).get(rel_path)
            if expected_hash and current_hash != expected_hash:
                indicators.append(IncidentIndicator(
                    indicator_type="model_file_tampered",
                    severity="critical",
                    source=model_id,
                    description=(
                        f"Model file '{rel_path}' hash mismatch. "
                        f"Expected: {expected_hash[:16]}..., "
                        f"Got: {current_hash[:16]}..."
                    ),
                    timestamp=datetime.now().isoformat(),
                    evidence={
                        "file": rel_path,
                        "expected_hash": expected_hash,
                        "actual_hash": current_hash,
                        "model_id": model_id,
                    },
                ))
            elif not expected_hash:
                # Present on disk but absent from the baseline.
                indicators.append(IncidentIndicator(
                    indicator_type="unexpected_model_file",
                    severity="high",
                    source=model_id,
                    description=f"Unexpected file '{rel_path}' in model directory",
                    timestamp=datetime.now().isoformat(),
                    evidence={"file": rel_path, "model_id": model_id},
                ))

        # Baseline entries with no corresponding file on disk.
        for expected_file in baseline.get("files", {}):
            if not (model_dir / expected_file).exists():
                indicators.append(IncidentIndicator(
                    indicator_type="model_file_missing",
                    severity="critical",
                    source=model_id,
                    description=f"Expected model file '{expected_file}' is missing",
                    timestamp=datetime.now().isoformat(),
                    evidence={"file": expected_file, "model_id": model_id},
                ))
        return indicators

    def check_behavior_drift(
        self,
        model_id: str,
        recent_outputs: list[dict],
        baseline_distribution: dict,
    ) -> list[IncidentIndicator]:
        """Detect behavioral drift that may indicate model tampering.

        Compares the class distribution of *recent_outputs* against
        *baseline_distribution* using Jensen-Shannon divergence.
        """
        import numpy as np
        from collections import Counter

        indicators = []
        if not recent_outputs or not baseline_distribution:
            return indicators

        # Empirical class distribution of the recent predictions.
        output_classes = [o.get("predicted_class", o.get("label")) for o in recent_outputs]
        recent_dist = Counter(output_classes)
        total = sum(recent_dist.values())
        recent_pct = {k: v / total for k, v in recent_dist.items()}

        # Jensen-Shannon divergence between baseline and recent distributions.
        # Classes unseen on one side get a small floor (0.001) so logs stay finite.
        all_classes = set(recent_pct) | set(baseline_distribution)
        p = np.array([baseline_distribution.get(c, 0.001) for c in all_classes])
        q = np.array([recent_pct.get(c, 0.001) for c in all_classes])
        p = p / p.sum()
        q = q / q.sum()
        m = 0.5 * (p + q)
        jsd = float(
            0.5 * np.sum(p * np.log(p / m + 1e-10))
            + 0.5 * np.sum(q * np.log(q / m + 1e-10))
        )

        drift_threshold = self.config.get("behavior_drift_threshold", 0.15)
        if jsd > drift_threshold:
            indicators.append(IncidentIndicator(
                indicator_type="behavior_drift",
                severity="high",
                source=model_id,
                description=(
                    f"Model output distribution has drifted significantly "
                    f"(JSD: {jsd:.4f}, threshold: {drift_threshold}). "
                    f"This may indicate model tampering or data drift."
                ),
                timestamp=datetime.now().isoformat(),
                evidence={
                    "jsd": jsd,
                    "recent_distribution": recent_pct,
                    "baseline_distribution": baseline_distribution,
                    "sample_count": total,
                },
            ))
        return indicators

    def check_dependency_changes(self) -> list[IncidentIndicator]:
        """Check for unexpected changes in deployed dependencies.

        Compares ``pip freeze`` output to ``baseline_dependencies`` in the
        config and reports missing or version-changed packages.
        """
        import subprocess

        indicators = []
        try:
            result = subprocess.run(
                ["pip", "freeze"], capture_output=True, text=True, check=True,
            )
        except (subprocess.CalledProcessError, FileNotFoundError) as exc:
            # Previously this failure was swallowed silently; log it so a
            # broken environment does not masquerade as a clean check.
            logging.getLogger("ai_incident_detection").warning(
                "pip freeze failed; dependency check skipped: %s", exc
            )
            return indicators

        current_deps = {}
        for line in result.stdout.strip().split("\n"):
            if "==" in line:
                name, version = line.split("==", 1)
                current_deps[name.lower()] = version

        baseline_deps = self.config.get("baseline_dependencies", {})
        for pkg, expected_version in baseline_deps.items():
            current_version = current_deps.get(pkg.lower())
            if current_version is None:
                indicators.append(IncidentIndicator(
                    indicator_type="dependency_missing",
                    severity="high",
                    source="dependency_monitor",
                    description=f"Expected dependency '{pkg}=={expected_version}' is missing",
                    timestamp=datetime.now().isoformat(),
                    evidence={"package": pkg, "expected": expected_version},
                ))
            elif current_version != expected_version:
                indicators.append(IncidentIndicator(
                    indicator_type="dependency_changed",
                    severity="critical",
                    source="dependency_monitor",
                    description=(
                        f"Dependency '{pkg}' version changed: "
                        f"expected {expected_version}, found {current_version}"
                    ),
                    timestamp=datetime.now().isoformat(),
                    evidence={
                        "package": pkg,
                        "expected": expected_version,
                        "actual": current_version,
                    },
                ))
        return indicators

    def _establish_model_baseline(self, model_dir: Path, model_id: str):
        """Create a baseline snapshot (sha256 per file) of model files."""
        files = {}
        for f in model_dir.rglob("*"):
            if f.is_dir() or ".signatures" in str(f):
                continue
            rel_path = str(f.relative_to(model_dir))
            files[rel_path] = hashlib.sha256(f.read_bytes()).hexdigest()
        self.model_baselines[f"model:{model_id}"] = {
            "files": files,
            "established": datetime.now().isoformat(),
        }

    def run_full_check(self) -> dict:
        """Run all monitoring checks and return a summary dict.

        Critical indicators are fanned out to every registered alert
        callback before the summary is returned.
        """
        all_indicators = []
        # Model integrity for every monitored model.
        for model in self.config.get("monitored_models", []):
            all_indicators.extend(
                self.check_model_integrity(model["path"], model["id"])
            )
        # Dependency drift.
        all_indicators.extend(self.check_dependency_changes())

        # Raise alerts for critical indicators.
        critical = [i for i in all_indicators if i.severity == "critical"]
        if critical:
            for callback in self.alert_callbacks:
                callback(critical)

        return {
            "check_time": datetime.now().isoformat(),
            "total_indicators": len(all_indicators),
            "critical": len(critical),
            "incident_detected": len(critical) > 0,
            "indicators": [
                {
                    "type": i.indicator_type,
                    "severity": i.severity,
                    "description": i.description,
                    "evidence": i.evidence,
                }
                for i in all_indicators
            ],
        }

# Phase 2: Containment
#!/bin/bash
# ai-incident-containment.sh
# Immediate containment actions for AI supply chain incidents.
#
# Usage: ai-incident-containment.sh <incident_id> <model_id> <assess|isolate|rollback> [good_model_dir]
#   assess   - capture forensic state (hashes, deps, processes, network)
#   isolate  - pull the model from serving and quarantine it
#   rollback - restore the last known-good (signature-verified) model
set -euo pipefail

INCIDENT_ID="${1:?Usage: ai-incident-containment.sh <incident_id> <model_id> <action>}"
MODEL_ID="${2:?Provide model ID}"
ACTION="${3:-assess}"      # assess (alias: 評估), isolate, rollback
GOOD_MODEL_ARG="${4:-}"    # optional: explicit rollback target directory

INCIDENT_DIR="/var/incidents/${INCIDENT_ID}"
mkdir -p "$INCIDENT_DIR"

# Append a timestamped line to the incident action log (and echo it).
# The timestamp is taken per call so the log reflects real action times.
log_action() {
    echo "[$(date -u +%Y-%m-%dT%H:%M:%SZ)] $1" | tee -a "$INCIDENT_DIR/actions.log"
}

case "$ACTION" in
    assess|評估)  # legacy alias kept for backward compatibility
        log_action "ASSESS: Starting incident assessment for model $MODEL_ID"
        log_action "Capturing model state..."
        MODEL_PATH=$(find /models -name "$MODEL_ID" -type d 2>/dev/null | head -1)
        if [ -n "$MODEL_PATH" ]; then
            # Hash all model files for later comparison with baselines.
            find "$MODEL_PATH" -type f -exec sha256sum {} \; > "$INCIDENT_DIR/current_hashes.txt"
            log_action "Model hashes captured: $INCIDENT_DIR/current_hashes.txt"
            # Capture model signature status.
            if [ -f "$MODEL_PATH/.signatures/manifest.json" ]; then
                cp "$MODEL_PATH/.signatures/manifest.json" "$INCIDENT_DIR/"
                log_action "Model signature manifest preserved"
            else
                log_action "WARNING: Model has no signatures"
            fi
        else
            log_action "ERROR: Model $MODEL_ID not found in /models"
        fi
        # Capture dependency, process, and network state.
        pip freeze > "$INCIDENT_DIR/frozen_deps.txt"
        log_action "Dependencies captured"
        ps aux > "$INCIDENT_DIR/processes.txt"
        log_action "Process list captured"
        ss -tunap > "$INCIDENT_DIR/network.txt" 2>/dev/null || true
        log_action "Network connections captured"
        echo ""
        echo "Assessment complete. Evidence in: $INCIDENT_DIR"
        echo "Next steps: Review evidence, then run with 'isolate' or 'rollback'"
        ;;
    isolate)
        log_action "ISOLATE: Isolating compromised model $MODEL_ID"
        # Step 1: Remove the model from serving.
        log_action "Removing model from inference endpoints..."
        if command -v kubectl &> /dev/null; then
            # Scale down inference pods using the compromised model.
            kubectl get deployments -l "model=$MODEL_ID" -o name 2>/dev/null | while read -r deploy; do
                kubectl scale "$deploy" --replicas=0
                log_action "Scaled down: $deploy"
            done
        fi
        # Step 2: Quarantine the model in the registry.
        MODEL_PATH=$(find /models -name "$MODEL_ID" -type d 2>/dev/null | head -1)
        if [ -n "$MODEL_PATH" ]; then
            # Move to quarantine (preserve for forensics).
            QUARANTINE="/var/quarantine/$INCIDENT_ID/$MODEL_ID"
            mkdir -p "$QUARANTINE"
            cp -r "$MODEL_PATH" "$QUARANTINE/"
            log_action "Model preserved in quarantine: $QUARANTINE"
            # Mark as compromised (don't delete -- forensics needs it).
            echo "COMPROMISED - Incident $INCIDENT_ID - $(date -u +%Y-%m-%dT%H:%M:%SZ)" > "$MODEL_PATH/COMPROMISED.txt"
            log_action "Model marked as compromised"
        fi
        # Step 3: Rotate any secrets the model pipeline had access to.
        log_action "WARNING: Review and rotate the following secrets:"
        log_action " - Model registry access tokens"
        log_action " - API keys used in the inference pipeline"
        log_action " - Any secrets accessible to the build pipeline"
        echo ""
        echo "Isolation complete. Model removed from serving."
        echo "Quarantined copy: /var/quarantine/$INCIDENT_ID/"
        ;;
    rollback)
        log_action "ROLLBACK: Rolling back to last known-good model"
        GOOD_MODEL="$GOOD_MODEL_ARG"
        if [ -z "$GOOD_MODEL" ]; then
            log_action "Looking for last known-good version..."
            # Check the model archive for a previously verified version.
            GOOD_MODEL=$(find /models/archive -name "${MODEL_ID}*" -type d \
                -newer /models/archive/.last_verified 2>/dev/null | \
                sort -r | head -1)
        fi
        if [ -z "$GOOD_MODEL" ]; then
            log_action "ERROR: No known-good model found for rollback"
            echo "Manual intervention required: identify a verified model version"
            exit 1
        fi
        log_action "Rolling back to: $GOOD_MODEL"
        # Verify the rollback target before deploying it.
        if [ -f "$GOOD_MODEL/.signatures/manifest.sig" ]; then
            log_action "Verifying rollback target signature..."
            # Test the command directly: with `set -e`, checking $? afterwards
            # would never run because a cosign failure aborts the script.
            if cosign verify-blob \
                --key /etc/model-signing/cosign.pub \
                --signature "$GOOD_MODEL/.signatures/manifest.sig" \
                "$GOOD_MODEL/.signatures/manifest.json" 2>/dev/null; then
                log_action "Rollback target signature VERIFIED"
            else
                log_action "ERROR: Rollback target signature INVALID"
                echo "Cannot rollback to unverified model. Manual intervention required."
                exit 1
            fi
        else
            log_action "WARNING: Rollback target is unsigned"
        fi
        # Deploy the rollback model.
        MODEL_PATH="/models/$MODEL_ID"
        if [ -d "$MODEL_PATH" ]; then
            # Preserve the compromised version for forensics.
            mkdir -p "/var/quarantine/$INCIDENT_ID"
            mv "$MODEL_PATH" "/var/quarantine/$INCIDENT_ID/${MODEL_ID}_compromised"
        fi
        cp -r "$GOOD_MODEL" "$MODEL_PATH"
        log_action "Model rolled back to: $GOOD_MODEL"
        # Restart inference pods so they pick up the restored model.
        if command -v kubectl &> /dev/null; then
            kubectl get deployments -l "model=$MODEL_ID" -o name 2>/dev/null | while read -r deploy; do
                kubectl rollout restart "$deploy"
                log_action "Restarted: $deploy"
            done
        fi
        echo ""
        echo "Rollback complete. Service restored with verified model."
        echo "Forensic copy preserved in: /var/quarantine/$INCIDENT_ID/"
        ;;
    *)
        echo "Unknown action: $ACTION"
        echo "Usage: $0 <incident_id> <model_id> <assess|isolate|rollback>"
        exit 1
        ;;
esac

# Phase 3: Investigation
"""
AI Supply Chain Incident Investigator
Performs forensic analysis on compromised model artifacts
and pipeline logs to determine scope and root cause.
"""
import json
import hashlib
from datetime import datetime
from pathlib import Path
class IncidentInvestigator:
    """Investigates AI supply-chain incidents to determine
    root cause, scope, and impact.
    """

    def __init__(self, incident_id: str, evidence_dir: str):
        """Bind the investigator to one incident and its evidence directory."""
        self.incident_id = incident_id
        self.evidence_dir = Path(evidence_dir)
        # Finding dicts accumulated over the course of the investigation.
        self.findings = []
        # Chronologically sorted events; populated by build_timeline().
        self.timeline = []

    def analyze_model_tampering(
        self,
        compromised_model_path: str,
        reference_model_path: "str | None" = None,
    ) -> dict:
        """Analyze a potentially tampered model.

        Flags dangerous (code-executing) file types, missing signatures,
        and -- when *reference_model_path* is given -- any divergence from
        a known-good copy of the model.
        """
        analysis = {
            "model_path": compromised_model_path,
            "findings": [],
        }
        model_dir = Path(compromised_model_path)

        # Inventory files by extension.
        file_types = {}
        for f in model_dir.rglob("*"):
            if f.is_dir():
                continue
            ext = f.suffix.lower()
            file_types.setdefault(ext, []).append(str(f.relative_to(model_dir)))

        # Flag file types that can execute arbitrary code when loaded.
        dangerous = {".pkl", ".pickle", ".py", ".so", ".dylib"}
        for ext in dangerous:
            if ext in file_types:
                analysis["findings"].append({
                    "finding": f"Dangerous file type: {ext}",
                    "severity": "high",
                    "files": file_types[ext],
                    "detail": "These files can execute arbitrary code when loaded",
                })

        # An unsigned model has no verifiable provenance at all.
        sig_dir = model_dir / ".signatures"
        if not sig_dir.exists():
            analysis["findings"].append({
                "finding": "Model is unsigned",
                "severity": "critical",
                "detail": "No .signatures directory found -- model provenance cannot be verified",
            })

        # Diff against a known-good reference copy when one is available.
        if reference_model_path:
            diff_result = self._compare_models(model_dir, Path(reference_model_path))
            analysis["reference_comparison"] = diff_result
            if diff_result.get("modified_files"):
                analysis["findings"].append({
                    "finding": f"Model differs from reference in {len(diff_result['modified_files'])} files",
                    "severity": "critical",
                    "modified_files": diff_result["modified_files"],
                })
        return analysis

    def analyze_pipeline_logs(self, log_path: str) -> dict:
        """Analyze ML pipeline logs for compromise indicators."""
        analysis = {"log_path": log_path, "findings": []}
        log_file = Path(log_path)
        if not log_file.exists():
            analysis["findings"].append({
                "finding": "Pipeline logs not found",
                "severity": "high",
                "detail": "Missing logs may indicate log tampering",
            })
            return analysis

        content = log_file.read_text()
        # Substrings that should rarely appear in a healthy pipeline log.
        suspicious_patterns = {
            "pip install": "Package installation during pipeline execution",
            "curl ": "External HTTP request during pipeline",
            "wget ": "External download during pipeline",
            "base64": "Base64 encoding/decoding (possible payload)",
            "/dev/tcp/": "Bash network redirection (reverse shell)",
            "nc -": "Netcat usage (possible exfiltration)",
            "COSIGN_KEY": "Signing key reference in logs (possible exposure)",
            # The key was previously a garbled non-ASCII token that could
            # never match real log text; "token" is what the scan intends.
            "token": "Token reference (possible credential in logs)",
        }
        lowered = content.lower()
        for pattern, description in suspicious_patterns.items():
            if pattern.lower() in lowered:
                # Record the specific matching lines as evidence.
                matching_lines = [
                    (i + 1, line.strip())
                    for i, line in enumerate(content.split("\n"))
                    if pattern.lower() in line.lower()
                ]
                analysis["findings"].append({
                    "finding": f"Suspicious pattern: {description}",
                    "pattern": pattern,
                    "severity": "high",
                    "matching_lines": matching_lines[:10],  # cap evidence size
                })
        return analysis

    def build_timeline(self) -> list[dict]:
        """Build incident timeline from all available evidence."""
        events = []
        # Parse the containment script's action log, lines formatted as
        # "[<iso-timestamp>] <description>".
        actions_log = self.evidence_dir / "actions.log"
        if actions_log.exists():
            for line in actions_log.read_text().strip().split("\n"):
                if not line.startswith("["):
                    continue
                try:
                    ts_end = line.index("]")
                    events.append({
                        "timestamp": line[1:ts_end],
                        "source": "incident_response",
                        "description": line[ts_end + 2:],
                    })
                except (ValueError, IndexError):
                    pass  # malformed line: skip it rather than abort the timeline
        # ISO-8601 timestamps sort correctly as strings.
        events.sort(key=lambda e: e.get("timestamp", ""))
        self.timeline = events
        return events

    def generate_report(self) -> dict:
        """Generate a comprehensive incident report."""
        return {
            "incident_id": self.incident_id,
            "report_generated": datetime.now().isoformat(),
            "timeline": self.timeline,
            "findings": self.findings,
            "evidence_directory": str(self.evidence_dir),
            "recommendations": self._generate_recommendations(),
        }

    def _compare_models(self, model_a: Path, model_b: Path) -> dict:
        """Compare two model directories file by file (signatures excluded)."""
        def _digests(root: Path) -> dict:
            return {
                str(f.relative_to(root)): hashlib.sha256(f.read_bytes()).hexdigest()
                for f in root.rglob("*")
                if f.is_file() and ".signatures" not in str(f)
            }

        files_a = _digests(model_a)
        files_b = _digests(model_b)
        shared = set(files_a) & set(files_b)
        modified = [name for name in shared if files_a[name] != files_b[name]]
        return {
            "files_only_in_compromised": list(set(files_a) - set(files_b)),
            "files_only_in_reference": list(set(files_b) - set(files_a)),
            "modified_files": modified,
            "identical_files": len(shared) - len(modified),
        }

    def _generate_recommendations(self) -> list[str]:
        """Generate recommendations based on the accumulated findings."""
        recs = []
        has_unsigned = any(
            f.get("finding") == "Model is unsigned"
            for f in self.findings
        )
        if has_unsigned:
            recs.append(
                "CRITICAL: Implement model signing for all production models. "
                "Without signing, model authenticity cannot be verified."
            )
        has_dangerous_files = any(
            "Dangerous file type" in f.get("finding", "")
            for f in self.findings
        )
        if has_dangerous_files:
            recs.append(
                "HIGH: Enforce safetensors-only policy. Remove all pickle, "
                "joblib, and Python files from model artifacts."
            )
        # Always-applicable follow-ups for any supply-chain incident.
        recs.append(
            "Conduct a review of all models deployed from the same pipeline "
            "as the compromised model."
        )
        recs.append(
            "Rotate all credentials accessible to the compromised pipeline."
        )
        return recs

# Phase 4: Communication
# incident-communication-templates.yaml
# Templates for communicating AI supply chain incidents.
# Placeholders in {braces} are filled in by the incident commander or tooling.
templates:
  initial_alert:
    subject: "[AI Security] Supply Chain Incident {incident_id} - {severity}"
    audience: "Security team, ML platform team"
    timing: "Within 1 hour of detection"
    body: |
      INCIDENT ALERT: AI Supply Chain Compromise Detected

      Incident ID: {incident_id}
      Severity: {severity}
      Detected: {detection_time}
      Model(s) Affected: {affected_models}

      SUMMARY:
      {brief_description}

      CURRENT STATUS:
      - Detection method: {detection_method}
      - Containment status: {containment_status}
      - Service impact: {service_impact}

      IMMEDIATE ACTIONS TAKEN:
      - {action_1}
      - {action_2}

      NEXT STEPS:
      - Investigation in progress
      - Updates every {update_frequency}

      Incident Commander: {incident_commander}

  stakeholder_update:
    subject: "[AI Security] Incident {incident_id} Update #{update_number}"
    audience: "Engineering leadership, affected product teams"
    timing: "Every 4 hours during active incident"
    body: |
      INCIDENT UPDATE #{update_number}

      Incident ID: {incident_id}
      Current Status: {status}
      Time Since Detection: {elapsed_time}

      PROGRESS SINCE LAST UPDATE:
      {progress_summary}

      SCOPE ASSESSMENT:
      - Models affected: {affected_model_count}
      - Predictions potentially impacted: {impacted_predictions}
      - Time window of exposure: {exposure_window}

      CONTAINMENT STATUS:
      {containment_details}

      REMAINING ACTIONS:
      {remaining_actions}

      ETA TO RESOLUTION: {eta}

  post_incident_report:
    subject: "[AI Security] Post-Incident Report: {incident_id}"
    audience: "All engineering, security, leadership"
    timing: "Within 5 business days of resolution"
    body: |
      POST-INCIDENT REPORT

      Incident ID: {incident_id}
      Duration: {start_time} to {end_time} ({total_duration})
      Severity: {severity}
      Impact: {impact_summary}

      TIMELINE:
      {detailed_timeline}

      ROOT CAUSE:
      {root_cause_analysis}

      IMPACT ASSESSMENT:
      - Models compromised: {compromised_models}
      - Predictions affected: {affected_predictions}
      - Data exposure: {data_exposure}
      - Customer impact: {customer_impact}

      WHAT WENT WELL:
      {what_went_well}

      WHAT NEEDS IMPROVEMENT:
      {improvement_areas}

      ACTION ITEMS:
      {action_items_with_owners_and_dates}

      PREVENTIVE MEASURES:
      {preventive_measures}

# Phase 5: Recovery and Prevention
"""
Model Integrity 監控 Service
Long-running service that continuously monitors deployed models
for integrity violations and 供應鏈 compromise indicators.
"""
import json
import time
import hashlib
import logging
import signal
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional
logger = logging.getLogger("model_integrity_monitor")
class ModelIntegrityMonitorService:
    """Production monitoring service for model integrity.

    Runs as a daemon or Kubernetes sidecar: snapshots a file-hash
    baseline for every configured model, then periodically re-hashes
    the files and alerts on tampering, deletions, or additions.
    """

    def __init__(self, config_path: str):
        """Load the JSON configuration and install shutdown handlers."""
        self.config = json.loads(Path(config_path).read_text())
        self.running = True
        # model_id -> {relative_path: {"sha256": ..., "size": ..., "mtime": ...}}
        self.baselines: dict[str, dict] = {}
        self.check_count = 0
        self.alert_count = 0
        # Handle graceful shutdown (SIGTERM from the orchestrator, Ctrl-C locally).
        signal.signal(signal.SIGTERM, self._shutdown)
        signal.signal(signal.SIGINT, self._shutdown)

    def _shutdown(self, signum, frame):
        """Signal handler: let the main loop exit after the current cycle."""
        logger.info("Shutting down model integrity monitor...")
        self.running = False

    def run(self):
        """Main monitoring loop; blocks until shutdown is requested."""
        check_interval = self.config.get("check_interval_seconds", 300)
        logger.info(
            f"Model Integrity Monitor starting. "
            f"Check interval: {check_interval}s. "
            f"Monitoring {len(self.config.get('models', []))} models."
        )
        # Establish baselines on first run.
        self._establish_baselines()
        while self.running:
            try:
                self._run_check_cycle()
                time.sleep(check_interval)
            except Exception as e:
                logger.error(f"Check cycle failed: {e}")
                time.sleep(60)  # Back off on error.

    def _establish_baselines(self):
        """Establish integrity baselines for all monitored models."""
        for model_config in self.config.get("models", []):
            model_id = model_config["id"]
            model_path = Path(model_config["path"])
            if not model_path.exists():
                logger.warning(f"Model path not found: {model_path}")
                continue
            baseline = {}
            for f in model_path.rglob("*"):
                if f.is_dir():
                    continue
                stat = f.stat()
                baseline[str(f.relative_to(model_path))] = {
                    "sha256": hashlib.sha256(f.read_bytes()).hexdigest(),
                    "size": stat.st_size,
                    "mtime": stat.st_mtime,
                }
            self.baselines[model_id] = baseline
            logger.info(f"Baseline established: {model_id} ({len(baseline)} files)")

    def _run_check_cycle(self):
        """Run one complete check cycle across all models."""
        self.check_count += 1
        alerts = []
        for model_config in self.config.get("models", []):
            model_id = model_config["id"]
            model_path = Path(model_config["path"])
            baseline = self.baselines.get(model_id, {})
            if not baseline:
                continue  # no baseline (path was missing at startup)

            # 1) Every baselined file must still exist with the same hash.
            for rel_path, expected in baseline.items():
                file_path = model_path / rel_path
                if not file_path.exists():
                    alerts.append({
                        "type": "FILE_MISSING",
                        "model": model_id,
                        "file": rel_path,
                        "severity": "critical",
                    })
                    continue
                current_hash = hashlib.sha256(file_path.read_bytes()).hexdigest()
                if current_hash != expected["sha256"]:
                    alerts.append({
                        "type": "FILE_TAMPERED",
                        "model": model_id,
                        "file": rel_path,
                        "severity": "critical",
                        "expected_hash": expected["sha256"][:16],
                        "actual_hash": current_hash[:16],
                    })

            # 2) Any file not in the baseline is unexpected.
            for f in model_path.rglob("*"):
                if f.is_dir():
                    continue
                rel = str(f.relative_to(model_path))
                if rel not in baseline:
                    alerts.append({
                        "type": "UNEXPECTED_FILE",
                        "model": model_id,
                        "file": rel,
                        "severity": "high",
                    })

        if alerts:
            self.alert_count += len(alerts)
            self._send_alerts(alerts)
            logger.critical(
                f"CHECK #{self.check_count}: {len(alerts)} alert(s) raised!"
            )
        else:
            logger.info(f"CHECK #{self.check_count}: All models verified OK")

    def _send_alerts(self, alerts: list[dict]):
        """Send alerts to the log and, when configured, a webhook."""
        webhook_url = self.config.get("alert_webhook")
        for alert in alerts:
            message = (
                f"[{alert['severity'].upper()}] Model Integrity Alert\n"
                f"Type: {alert['type']}\n"
                f"Model: {alert['model']}\n"
                f"File: {alert['file']}\n"
            )
            logger.critical(message)
            if webhook_url:
                try:
                    # NOTE: requests is a third-party dependency; delivery is
                    # best-effort and failures are logged, never raised.
                    import requests
                    requests.post(
                        webhook_url,
                        json={"text": message, "severity": alert["severity"]},
                        timeout=10,
                    )
                except Exception as e:
                    logger.error(f"Failed to send alert webhook: {e}")
# Service configuration example: shape expected by ModelIntegrityMonitorService.
EXAMPLE_CONFIG = {
    # Seconds between integrity check cycles.
    "check_interval_seconds": 300,
    # Optional webhook for alert delivery (e.g. a Slack incoming webhook).
    "alert_webhook": "https://hooks.slack.com/services/xxx/yyy/zzz",
    # Models to baseline and re-verify every cycle.
    "models": [
        {
            "id": "sentiment-model-v2.1",
            "path": "/models/sentiment-model-v2.1",
            "critical": True,
        },
        {
            "id": "summarization-model-v1.0",
            "path": "/models/summarization-model-v1.0",
            "critical": True,
        },
    ],
    # Pinned dependency versions for the dependency-change check.
    "baseline_dependencies": {
        "torch": "2.2.0",
        "transformers": "4.38.0",
        "safetensors": "0.4.2",
    },
    # Jensen-Shannon divergence above which output drift raises an indicator.
    "behavior_drift_threshold": 0.15,
}

#!/bin/bash
# deploy-model-monitor.sh
# Deploy the model integrity monitor as a Kubernetes DaemonSet
# (one monitor per node that hosts models under /models).
set -euo pipefail

NAMESPACE="${1:-ml-inference}"
CONFIG_FILE="${2:-model-monitor-config.json}"

echo "[*] Deploying Model Integrity Monitor"
echo "[*] Namespace: $NAMESPACE"

# Create (or update) the ConfigMap from the monitor configuration.
kubectl create configmap model-monitor-config \
    --from-file=config.json="$CONFIG_FILE" \
    -n "$NAMESPACE" \
    --dry-run=client -o yaml | kubectl apply -f -

# Deploy as a DaemonSet (runs on every node with models).
cat << 'K8SEOF' | kubectl apply -n "$NAMESPACE" -f -
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: model-integrity-monitor
  labels:
    app: model-integrity-monitor
    component: security
spec:
  selector:
    matchLabels:
      app: model-integrity-monitor
  template:
    metadata:
      labels:
        app: model-integrity-monitor
    spec:
      containers:
        - name: monitor
          image: python:3.11-slim
          # NOTE(review): nothing here mounts /app/monitor.py -- confirm the
          # monitor code is baked into the image (or add a code volume).
          command: ["python3", "/app/monitor.py"]
          volumeMounts:
            - name: models
              mountPath: /models
              readOnly: true
            - name: config
              mountPath: /etc/monitor
          resources:
            limits:
              memory: "256Mi"
              cpu: "200m"
            requests:
              memory: "128Mi"
              cpu: "100m"
          securityContext:
            readOnlyRootFilesystem: true
            runAsNonRoot: true
            runAsUser: 1000
      volumes:
        - name: models
          hostPath:
            path: /models
        - name: config
          configMap:
            name: model-monitor-config
K8SEOF

echo "[*] Monitor deployed"
kubectl rollout status daemonset/model-integrity-monitor -n "$NAMESPACE"

# References
- NIST (2024). "SP 800-61 Rev. 3: Computer Security Incident Handling Guide"
- dig8ital (2024). "AI Supply Chain Security: Incident Analysis and Lessons Learned"
- NIST (2024). "AI Risk Management Framework (AI RMF 1.0)"
- MITRE ATLAS (2024). "Adversarial Threat Landscape for AI Systems"
- OWASP (2025). "LLM03: Supply Chain Vulnerabilities"
- Google (2024). "Secure AI Framework (SAIF)"
During an AI supply chain incident, the team discovers a deployed model has been tampered with. What is the correct order of immediate response actions?