AI Supply Chain Incident Response
Defense-focused guide to responding to AI supply chain compromises, covering incident response playbooks, model tampering detection, rollback procedures, communication templates, and automated integrity monitoring.
AI supply chain incidents differ fundamentally from traditional security incidents. When a software dependency is compromised, the malicious code can be identified and removed. When a model is compromised, the attack is embedded in the weights -- there is no "malicious code" to remove, and the model may have been serving poisoned outputs for days or weeks before detection. This page provides a structured approach to detecting, responding to, and recovering from AI supply chain compromises.
Incident Response Playbook
Phase 1: Detection and Triage
"""
AI Supply Chain Incident Detection System
Monitors deployed models and their dependencies for
indicators of compromise (IOCs).
"""
import json
import hashlib
import logging
import time
from datetime import datetime, timedelta
from pathlib import Path
from dataclasses import dataclass, field
logger = logging.getLogger("ai_incident_detection")
@dataclass
class IncidentIndicator:
    """A single observed indicator of potential AI supply chain compromise."""

    indicator_type: str  # e.g. model_tamper, dependency_change, behavior_drift
    severity: str  # one of: critical, high, medium, low
    source: str  # model id or monitor component that raised the indicator
    description: str  # human-readable summary of what was observed
    timestamp: str  # ISO-8601 detection time
    evidence: dict = field(default_factory=dict)  # supporting details (hashes, versions, ...)
class AISupplyChainMonitor:
"""
Continuous monitoring for AI supply chain compromise indicators.
"""
def __init__(self, config_path: str):
self.config = json.loads(Path(config_path).read_text())
self.indicators: list[IncidentIndicator] = []
self.model_baselines: dict = {}
self.alert_callbacks: list = []
def register_alert_callback(self, callback):
"""Register a function to call when an alert is raised."""
self.alert_callbacks.append(callback)
def check_model_integrity(self, model_path: str, model_id: str) -> list[IncidentIndicator]:
"""
Verify model file integrity against known-good baselines.
Run on a schedule (e.g., every hour) for deployed models.
"""
indicators = []
model_dir = Path(model_path)
# Load baseline if exists
baseline_key = f"model:{model_id}"
baseline = self.model_baselines.get(baseline_key, {})
if not baseline:
# First run -- establish baseline
self._establish_model_baseline(model_dir, model_id)
return indicators
# Check each file against baseline
for f in model_dir.rglob("*"):
if f.is_dir() or ".signatures" in str(f):
continue
rel_path = str(f.relative_to(model_dir))
current_hash = hashlib.sha256(f.read_bytes()).hexdigest()
expected_hash = baseline.get("files", {}).get(rel_path)
if expected_hash and current_hash != expected_hash:
indicator = IncidentIndicator(
indicator_type="model_file_tampered",
severity="critical",
source=model_id,
description=(
f"Model file '{rel_path}' hash mismatch. "
f"Expected: {expected_hash[:16]}..., "
f"Got: {current_hash[:16]}..."
),
timestamp=datetime.now().isoformat(),
evidence={
"file": rel_path,
"expected_hash": expected_hash,
"actual_hash": current_hash,
"model_id": model_id,
},
)
indicators.append(indicator)
elif not expected_hash:
indicator = IncidentIndicator(
indicator_type="unexpected_model_file",
severity="high",
source=model_id,
description=f"Unexpected file '{rel_path}' in model directory",
timestamp=datetime.now().isoformat(),
evidence={"file": rel_path, "model_id": model_id},
)
indicators.append(indicator)
# Check for missing files
for expected_file in baseline.get("files", {}):
if not (model_dir / expected_file).exists():
indicators.append(IncidentIndicator(
indicator_type="model_file_missing",
severity="critical",
source=model_id,
description=f"Expected model file '{expected_file}' is missing",
timestamp=datetime.now().isoformat(),
evidence={"file": expected_file, "model_id": model_id},
))
return indicators
def check_behavior_drift(
self,
model_id: str,
recent_outputs: list[dict],
baseline_distribution: dict,
) -> list[IncidentIndicator]:
"""
Detect behavioral drift that may indicate model tampering.
Compares recent model output distribution against baseline.
"""
import numpy as np
indicators = []
if not recent_outputs or not baseline_distribution:
return indicators
# Calculate output distribution from recent predictions
output_classes = [o.get("predicted_class", o.get("label")) for o in recent_outputs]
from collections import Counter
recent_dist = Counter(output_classes)
total = sum(recent_dist.values())
recent_pct = {k: v / total for k, v in recent_dist.items()}
# Compare against baseline using Jensen-Shannon divergence
all_classes = set(list(recent_pct.keys()) + list(baseline_distribution.keys()))
p = np.array([baseline_distribution.get(c, 0.001) for c in all_classes])
q = np.array([recent_pct.get(c, 0.001) for c in all_classes])
p = p / p.sum()
q = q / q.sum()
m = 0.5 * (p + q)
jsd = float(0.5 * np.sum(p * np.log(p / m + 1e-10)) +
0.5 * np.sum(q * np.log(q / m + 1e-10)))
drift_threshold = self.config.get("behavior_drift_threshold", 0.15)
if jsd > drift_threshold:
indicators.append(IncidentIndicator(
indicator_type="behavior_drift",
severity="high",
source=model_id,
description=(
f"Model output distribution has drifted significantly "
f"(JSD: {jsd:.4f}, threshold: {drift_threshold}). "
f"This may indicate model tampering or data drift."
),
timestamp=datetime.now().isoformat(),
evidence={
"jsd": jsd,
"recent_distribution": recent_pct,
"baseline_distribution": baseline_distribution,
"sample_count": total,
},
))
return indicators
def check_dependency_changes(self) -> list[IncidentIndicator]:
"""Check for unexpected changes in deployed dependencies."""
import subprocess
indicators = []
try:
result = subprocess.run(
["pip", "freeze"], capture_output=True, text=True, check=True,
)
current_deps = {}
for line in result.stdout.strip().split("\n"):
if "==" in line:
name, version = line.split("==", 1)
current_deps[name.lower()] = version
baseline_deps = self.config.get("baseline_dependencies", {})
for pkg, expected_version in baseline_deps.items():
current_version = current_deps.get(pkg.lower())
if current_version is None:
indicators.append(IncidentIndicator(
indicator_type="dependency_missing",
severity="high",
source="dependency_monitor",
description=f"Expected dependency '{pkg}=={expected_version}' is missing",
timestamp=datetime.now().isoformat(),
evidence={"package": pkg, "expected": expected_version},
))
elif current_version != expected_version:
indicators.append(IncidentIndicator(
indicator_type="dependency_changed",
severity="critical",
source="dependency_monitor",
description=(
f"Dependency '{pkg}' version changed: "
f"expected {expected_version}, found {current_version}"
),
timestamp=datetime.now().isoformat(),
evidence={
"package": pkg,
"expected": expected_version,
"actual": current_version,
},
))
except subprocess.CalledProcessError:
pass
return indicators
def _establish_model_baseline(self, model_dir: Path, model_id: str):
"""Create a baseline snapshot of model files."""
files = {}
for f in model_dir.rglob("*"):
if f.is_dir() or ".signatures" in str(f):
continue
rel_path = str(f.relative_to(model_dir))
files[rel_path] = hashlib.sha256(f.read_bytes()).hexdigest()
self.model_baselines[f"model:{model_id}"] = {
"files": files,
"established": datetime.now().isoformat(),
}
def run_full_check(self) -> dict:
"""Run all monitoring checks and return results."""
all_indicators = []
# Check model integrity for all monitored models
for model in self.config.get("monitored_models", []):
indicators = self.check_model_integrity(
model["path"], model["id"]
)
all_indicators.extend(indicators)
# Check dependencies
all_indicators.extend(self.check_dependency_changes())
# Raise alerts for critical indicators
critical = [i for i in all_indicators if i.severity == "critical"]
if critical:
for callback in self.alert_callbacks:
callback(critical)
return {
"check_time": datetime.now().isoformat(),
"total_indicators": len(all_indicators),
"critical": len(critical),
"incident_detected": len(critical) > 0,
"indicators": [
{
"type": i.indicator_type,
"severity": i.severity,
"description": i.description,
"evidence": i.evidence,
}
for i in all_indicators
],
}Phase 2: Containment
#!/bin/bash
# ai-incident-containment.sh
# Immediate containment actions for AI supply chain incidents.
#
# Usage: ai-incident-containment.sh <incident_id> <model_id> [action] [rollback_target]
#   action:          assess (default) | isolate | rollback
#   rollback_target: optional path to a known-good model dir (rollback only)
set -euo pipefail

INCIDENT_ID="${1:?Usage: ai-incident-containment.sh <incident_id> <model_id> <action>}"
MODEL_ID="${2:?Provide model ID}"
ACTION="${3:-assess}"  # assess, isolate, rollback
INCIDENT_DIR="/var/incidents/${INCIDENT_ID}"
# Incident start time, used for the quarantine marker. Log entries get
# their own per-call timestamps inside log_action.
TIMESTAMP=$(date -u +%Y-%m-%dT%H:%M:%SZ)

mkdir -p "$INCIDENT_DIR"

# Append a timestamped entry to the incident action log and echo it.
log_action() {
    echo "[$(date -u +%Y-%m-%dT%H:%M:%SZ)] $1" | tee -a "$INCIDENT_DIR/actions.log"
}

case "$ACTION" in
assess)
    log_action "ASSESS: Starting incident assessment for model $MODEL_ID"
    # Capture current state for forensics before anything is changed.
    log_action "Capturing model state..."
    MODEL_PATH=$(find /models -name "$MODEL_ID" -type d 2>/dev/null | head -1)
    if [ -n "$MODEL_PATH" ]; then
        # Hash all model files for later comparison against baselines.
        find "$MODEL_PATH" -type f -exec sha256sum {} \; > "$INCIDENT_DIR/current_hashes.txt"
        log_action "Model hashes captured: $INCIDENT_DIR/current_hashes.txt"
        # Preserve the signature manifest, if the model was ever signed.
        if [ -f "$MODEL_PATH/.signatures/manifest.json" ]; then
            cp "$MODEL_PATH/.signatures/manifest.json" "$INCIDENT_DIR/"
            log_action "Model signature manifest preserved"
        else
            log_action "WARNING: Model has no signatures"
        fi
    else
        log_action "ERROR: Model $MODEL_ID not found in /models"
    fi
    # Capture dependency state.
    pip freeze > "$INCIDENT_DIR/frozen_deps.txt"
    log_action "Dependencies captured"
    # Capture running processes.
    ps aux > "$INCIDENT_DIR/processes.txt"
    log_action "Process list captured"
    # Capture network connections (best effort; ss may be unavailable).
    ss -tunap > "$INCIDENT_DIR/network.txt" 2>/dev/null || true
    log_action "Network connections captured"
    echo ""
    echo "Assessment complete. Evidence in: $INCIDENT_DIR"
    echo "Next steps: Review evidence, then run with 'isolate' or 'rollback'"
    ;;
isolate)
    log_action "ISOLATE: Isolating compromised model $MODEL_ID"
    # Step 1: Remove model from serving.
    log_action "Removing model from inference endpoints..."
    # For Kubernetes deployments.
    if command -v kubectl &> /dev/null; then
        # Scale down inference pods using the compromised model. The label
        # selector is quoted so odd model IDs cannot split the argument;
        # '|| true' keeps 'set -o pipefail' from aborting the script when
        # kubectl itself fails (e.g. no cluster access).
        kubectl get deployments -l model="$MODEL_ID" -o name 2>/dev/null | while read -r deploy; do
            kubectl scale "$deploy" --replicas=0
            log_action "Scaled down: $deploy"
        done || true
    fi
    # Step 2: Block the model in the registry.
    MODEL_PATH=$(find /models -name "$MODEL_ID" -type d 2>/dev/null | head -1)
    if [ -n "$MODEL_PATH" ]; then
        # Move to quarantine (preserve for forensics).
        QUARANTINE="/var/quarantine/$INCIDENT_ID/$MODEL_ID"
        mkdir -p "$QUARANTINE"
        cp -r "$MODEL_PATH" "$QUARANTINE/"
        log_action "Model preserved in quarantine: $QUARANTINE"
        # Mark as compromised (don't delete -- forensics needs it).
        echo "COMPROMISED - Incident $INCIDENT_ID - $TIMESTAMP" > "$MODEL_PATH/COMPROMISED.txt"
        log_action "Model marked as compromised"
    fi
    # Step 3: Rotate any secrets that the model pipeline had access to.
    log_action "WARNING: Review and rotate the following secrets:"
    log_action " - Model registry access tokens"
    log_action " - API keys used in inference pipeline"
    log_action " - Any secrets accessible to the build pipeline"
    echo ""
    echo "Isolation complete. Model removed from serving."
    echo "Quarantined copy: /var/quarantine/$INCIDENT_ID/"
    ;;
rollback)
    log_action "ROLLBACK: Rolling back to last known-good model"
    # Rollback target may be passed as the 4th argument; otherwise search
    # the archive for a candidate.
    GOOD_MODEL="${4:-}"
    if [ -z "$GOOD_MODEL" ]; then
        log_action "Looking for last known-good version..."
        # Heuristic: newest archived copy modified after the .last_verified
        # marker. NOTE(review): confirm this marker's semantics -- the
        # candidate is still signature-verified below before deployment.
        GOOD_MODEL=$(find /models/archive -name "${MODEL_ID}*" -type d \
            -newer /models/archive/.last_verified 2>/dev/null | \
            sort -r | head -1)
    fi
    if [ -z "$GOOD_MODEL" ]; then
        log_action "ERROR: No known-good model found for rollback"
        echo "Manual intervention required: identify a verified model version"
        exit 1
    fi
    log_action "Rolling back to: $GOOD_MODEL"
    # Verify the rollback target before deploying it.
    if [ -f "$GOOD_MODEL/.signatures/manifest.sig" ]; then
        log_action "Verifying rollback target signature..."
        # The 'if cmd' form is required: under 'set -e' a bare failing
        # cosign call would abort the script before any '$?' check ran.
        if cosign verify-blob \
            --key /etc/model-signing/cosign.pub \
            --signature "$GOOD_MODEL/.signatures/manifest.sig" \
            "$GOOD_MODEL/.signatures/manifest.json" 2>/dev/null; then
            log_action "Rollback target signature VERIFIED"
        else
            log_action "ERROR: Rollback target signature INVALID"
            echo "Cannot rollback to unverified model. Manual intervention required."
            exit 1
        fi
    else
        log_action "WARNING: Rollback target is unsigned"
    fi
    # Deploy the rollback model.
    MODEL_PATH="/models/$MODEL_ID"
    if [ -d "$MODEL_PATH" ]; then
        # Preserve the compromised version. The quarantine directory may
        # not exist yet if 'isolate' was never run for this incident.
        mkdir -p "/var/quarantine/$INCIDENT_ID"
        mv "$MODEL_PATH" "/var/quarantine/$INCIDENT_ID/${MODEL_ID}_compromised"
    fi
    cp -r "$GOOD_MODEL" "$MODEL_PATH"
    log_action "Model rolled back to: $GOOD_MODEL"
    # Restart inference pods.
    if command -v kubectl &> /dev/null; then
        kubectl get deployments -l model="$MODEL_ID" -o name 2>/dev/null | while read -r deploy; do
            kubectl rollout restart "$deploy"
            log_action "Restarted: $deploy"
        done || true
    fi
    echo ""
    echo "Rollback complete. Service restored with verified model."
    echo "Forensic copy preserved in: /var/quarantine/$INCIDENT_ID/"
    ;;
*)
    echo "Unknown action: $ACTION"
    echo "Usage: $0 <incident_id> <model_id> <assess|isolate|rollback>"
    exit 1
    ;;
esac

# Phase 3: Investigation
"""
AI Supply Chain Incident Investigator
Performs forensic analysis on compromised model artifacts
and pipeline logs to determine scope and root cause.
"""
import json
import hashlib
from datetime import datetime
from pathlib import Path
class IncidentInvestigator:
"""
Investigates AI supply chain incidents to determine
root cause, scope, and impact.
"""
def __init__(self, incident_id: str, evidence_dir: str):
self.incident_id = incident_id
self.evidence_dir = Path(evidence_dir)
self.findings = []
self.timeline = []
def analyze_model_tampering(
self,
compromised_model_path: str,
reference_model_path: str = None,
) -> dict:
"""Analyze a potentially tampered model."""
analysis = {
"model_path": compromised_model_path,
"findings": [],
}
model_dir = Path(compromised_model_path)
# Check file types present
file_types = {}
for f in model_dir.rglob("*"):
if f.is_dir():
continue
ext = f.suffix.lower()
file_types.setdefault(ext, []).append(str(f.relative_to(model_dir)))
# Flag dangerous file types
dangerous = {".pkl", ".pickle", ".py", ".so", ".dylib"}
for ext in dangerous:
if ext in file_types:
analysis["findings"].append({
"finding": f"Dangerous file type: {ext}",
"severity": "high",
"files": file_types[ext],
"detail": "These files can execute arbitrary code when loaded",
})
# Check for unsigned model
sig_dir = model_dir / ".signatures"
if not sig_dir.exists():
analysis["findings"].append({
"finding": "Model is unsigned",
"severity": "critical",
"detail": "No .signatures directory found -- model provenance cannot be verified",
})
# Compare with reference if available
if reference_model_path:
ref_dir = Path(reference_model_path)
diff_result = self._compare_models(model_dir, ref_dir)
analysis["reference_comparison"] = diff_result
if diff_result.get("modified_files"):
analysis["findings"].append({
"finding": f"Model differs from reference in {len(diff_result['modified_files'])} files",
"severity": "critical",
"modified_files": diff_result["modified_files"],
})
return analysis
def analyze_pipeline_logs(self, log_path: str) -> dict:
"""Analyze ML pipeline logs for compromise indicators."""
analysis = {"log_path": log_path, "findings": []}
log_file = Path(log_path)
if not log_file.exists():
analysis["findings"].append({
"finding": "Pipeline logs not found",
"severity": "high",
"detail": "Missing logs may indicate log tampering",
})
return analysis
content = log_file.read_text()
# Check for suspicious patterns in pipeline logs
suspicious_patterns = {
"pip install": "Package installation during pipeline execution",
"curl ": "External HTTP request during pipeline",
"wget ": "External download during pipeline",
"base64": "Base64 encoding/decoding (possible payload)",
"/dev/tcp/": "Bash network redirection (reverse shell)",
"nc -": "Netcat usage (possible exfiltration)",
"COSIGN_KEY": "Signing key reference in logs (possible exposure)",
"token": "Token reference (possible credential in logs)",
}
for pattern, description in suspicious_patterns.items():
if pattern.lower() in content.lower():
# Find the specific lines
matching_lines = [
(i + 1, line.strip())
for i, line in enumerate(content.split("\n"))
if pattern.lower() in line.lower()
]
analysis["findings"].append({
"finding": f"Suspicious pattern: {description}",
"pattern": pattern,
"severity": "high",
"matching_lines": matching_lines[:10],
})
return analysis
def build_timeline(self) -> list[dict]:
"""Build incident timeline from all available evidence."""
events = []
# Parse action logs
actions_log = self.evidence_dir / "actions.log"
if actions_log.exists():
for line in actions_log.read_text().strip().split("\n"):
if line.startswith("["):
try:
ts_end = line.index("]")
timestamp = line[1:ts_end]
description = line[ts_end + 2:]
events.append({
"timestamp": timestamp,
"source": "incident_response",
"description": description,
})
except (ValueError, IndexError):
pass
# Sort by timestamp
events.sort(key=lambda e: e.get("timestamp", ""))
self.timeline = events
return events
def generate_report(self) -> dict:
"""Generate a comprehensive incident report."""
return {
"incident_id": self.incident_id,
"report_generated": datetime.now().isoformat(),
"timeline": self.timeline,
"findings": self.findings,
"evidence_directory": str(self.evidence_dir),
"recommendations": self._generate_recommendations(),
}
def _compare_models(self, model_a: Path, model_b: Path) -> dict:
"""Compare two model directories file by file."""
files_a = {
str(f.relative_to(model_a)): hashlib.sha256(f.read_bytes()).hexdigest()
for f in model_a.rglob("*") if f.is_file() and ".signatures" not in str(f)
}
files_b = {
str(f.relative_to(model_b)): hashlib.sha256(f.read_bytes()).hexdigest()
for f in model_b.rglob("*") if f.is_file() and ".signatures" not in str(f)
}
modified = []
for name in set(files_a) & set(files_b):
if files_a[name] != files_b[name]:
modified.append(name)
return {
"files_only_in_compromised": list(set(files_a) - set(files_b)),
"files_only_in_reference": list(set(files_b) - set(files_a)),
"modified_files": modified,
"identical_files": len(set(files_a) & set(files_b)) - len(modified),
}
def _generate_recommendations(self) -> list[str]:
"""Generate recommendations based on findings."""
recs = []
has_unsigned = any(
f.get("finding") == "Model is unsigned"
for f in self.findings
)
if has_unsigned:
recs.append(
"CRITICAL: Implement model signing for all production models. "
"Without signing, model authenticity cannot be verified."
)
has_dangerous_files = any(
"Dangerous file type" in f.get("finding", "")
for f in self.findings
)
if has_dangerous_files:
recs.append(
"HIGH: Enforce safetensors-only policy. Remove all pickle, "
"joblib, and Python files from model artifacts."
)
recs.append(
"Conduct a review of all models deployed from the same pipeline "
"as the compromised model."
)
recs.append(
"Rotate all credentials accessible to the compromised pipeline."
)
return recsPhase 4: Communication
# incident-communication-templates.yaml
# Templates for communicating AI supply chain incidents
templates:
initial_alert:
subject: "[AI Security] Supply Chain Incident {incident_id} - {severity}"
audience: "Security team, ML platform team"
timing: "Within 1 hour of detection"
body: |
INCIDENT ALERT: AI Supply Chain Compromise Detected
Incident ID: {incident_id}
Severity: {severity}
Detected: {detection_time}
Model(s) Affected: {affected_models}
SUMMARY:
{brief_description}
CURRENT STATUS:
- Detection method: {detection_method}
- Containment status: {containment_status}
- Service impact: {service_impact}
IMMEDIATE ACTIONS TAKEN:
- {action_1}
- {action_2}
NEXT STEPS:
- Investigation in progress
- Updates every {update_frequency}
Incident Commander: {incident_commander}
stakeholder_update:
subject: "[AI Security] Incident {incident_id} Update #{update_number}"
audience: "Engineering leadership, affected product teams"
timing: "Every 4 hours during active incident"
body: |
INCIDENT UPDATE #{update_number}
Incident ID: {incident_id}
Current Status: {status}
Time Since Detection: {elapsed_time}
PROGRESS SINCE LAST UPDATE:
{progress_summary}
SCOPE ASSESSMENT:
- Models affected: {affected_model_count}
- Predictions potentially impacted: {impacted_predictions}
- Time window of exposure: {exposure_window}
CONTAINMENT STATUS:
{containment_details}
REMAINING ACTIONS:
{remaining_actions}
ETA TO RESOLUTION: {eta}
post_incident_report:
subject: "[AI Security] Post-Incident Report: {incident_id}"
audience: "All engineering, security, leadership"
timing: "Within 5 business days of resolution"
body: |
POST-INCIDENT REPORT
Incident ID: {incident_id}
Duration: {start_time} to {end_time} ({total_duration})
Severity: {severity}
Impact: {impact_summary}
TIMELINE:
{detailed_timeline}
ROOT CAUSE:
{root_cause_analysis}
IMPACT ASSESSMENT:
- Models compromised: {compromised_models}
- Predictions affected: {affected_predictions}
- Data exposure: {data_exposure}
- Customer impact: {customer_impact}
WHAT WENT WELL:
{what_went_well}
WHAT NEEDS IMPROVEMENT:
{improvement_areas}
ACTION ITEMS:
{action_items_with_owners_and_dates}
PREVENTIVE MEASURES:
{preventive_measures}

Phase 5: Recovery and Prevention
"""
Model Integrity Monitoring Service
Long-running service that continuously monitors deployed models
for integrity violations and supply chain compromise indicators.
"""
import json
import time
import hashlib
import logging
import signal
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional
logger = logging.getLogger("model_integrity_monitor")
class ModelIntegrityMonitorService:
    """
    Production monitoring service for model integrity.
    Runs as a daemon or Kubernetes sidecar.

    Startup snapshots (sha256/size/mtime) every file of each configured
    model; each check cycle then re-hashes the files and alerts on
    tampered, missing, or unexpected files.
    """

    # Class-level logger: keeps the class independent of module-global
    # definition order.
    _LOG = logging.getLogger("model_integrity_monitor")

    def __init__(self, config_path: str):
        """Load JSON configuration and install graceful-shutdown handlers."""
        self.config = json.loads(Path(config_path).read_text())
        self.running = True
        # model_id -> {rel_path: {"sha256": ..., "size": ..., "mtime": ...}}
        self.baselines: dict[str, dict] = {}
        self.check_count = 0
        self.alert_count = 0
        # Handle graceful shutdown (signal.signal requires the main thread).
        signal.signal(signal.SIGTERM, self._shutdown)
        signal.signal(signal.SIGINT, self._shutdown)

    def _shutdown(self, signum, frame):
        """Signal handler: ask the main loop to stop after the current cycle."""
        self._LOG.info("Shutting down model integrity monitor...")
        self.running = False

    def _sleep_unless_stopped(self, seconds: float):
        """
        Sleep up to *seconds*, waking every second to honor shutdown.

        Replaces a single long time.sleep() so SIGTERM/SIGINT stops the
        service promptly instead of after a full check interval.
        """
        deadline = time.monotonic() + seconds
        while self.running:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(1.0, remaining))

    def run(self):
        """Main monitoring loop: baseline once, then check on an interval."""
        check_interval = self.config.get("check_interval_seconds", 300)
        self._LOG.info(
            "Model Integrity Monitor starting. Check interval: %ss. "
            "Monitoring %d models.",
            check_interval,
            len(self.config.get("models", [])),
        )
        # Establish baselines on first run.
        self._establish_baselines()
        while self.running:
            try:
                self._run_check_cycle()
                self._sleep_unless_stopped(check_interval)
            except Exception as e:
                # Keep the daemon alive through transient errors (I/O, webhook).
                self._LOG.error("Check cycle failed: %s", e)
                self._sleep_unless_stopped(60)  # Back off on error

    def _establish_baselines(self):
        """Establish integrity baselines for all monitored models."""
        for model_config in self.config.get("models", []):
            model_id = model_config["id"]
            model_path = Path(model_config["path"])
            if not model_path.exists():
                self._LOG.warning("Model path not found: %s", model_path)
                continue
            baseline = {}
            for f in model_path.rglob("*"):
                if f.is_dir():
                    continue
                stat = f.stat()
                baseline[str(f.relative_to(model_path))] = {
                    "sha256": hashlib.sha256(f.read_bytes()).hexdigest(),
                    "size": stat.st_size,
                    "mtime": stat.st_mtime,
                }
            self.baselines[model_id] = baseline
            self._LOG.info(
                "Baseline established: %s (%d files)", model_id, len(baseline)
            )

    def _run_check_cycle(self):
        """Run one complete check cycle across all models."""
        self.check_count += 1
        alerts = []
        for model_config in self.config.get("models", []):
            model_id = model_config["id"]
            model_path = Path(model_config["path"])
            baseline = self.baselines.get(model_id, {})
            if not baseline:
                # No baseline (e.g. path was missing at startup): nothing
                # to compare against.
                continue
            # Every baselined file must still exist with the same hash.
            for rel_path, expected in baseline.items():
                file_path = model_path / rel_path
                if not file_path.exists():
                    alerts.append({
                        "type": "FILE_MISSING",
                        "model": model_id,
                        "file": rel_path,
                        "severity": "critical",
                    })
                    continue
                current_hash = hashlib.sha256(file_path.read_bytes()).hexdigest()
                if current_hash != expected["sha256"]:
                    alerts.append({
                        "type": "FILE_TAMPERED",
                        "model": model_id,
                        "file": rel_path,
                        "severity": "critical",
                        "expected_hash": expected["sha256"][:16],
                        "actual_hash": current_hash[:16],
                    })
            # Files that appeared after the baseline was taken.
            for f in model_path.rglob("*"):
                if f.is_dir():
                    continue
                rel = str(f.relative_to(model_path))
                if rel not in baseline:
                    alerts.append({
                        "type": "UNEXPECTED_FILE",
                        "model": model_id,
                        "file": rel,
                        "severity": "high",
                    })
        if alerts:
            self.alert_count += len(alerts)
            self._send_alerts(alerts)
            self._LOG.critical(
                "CHECK #%d: %d alert(s) raised!", self.check_count, len(alerts)
            )
        else:
            self._LOG.info("CHECK #%d: All models verified OK", self.check_count)

    def _send_alerts(self, alerts: list[dict]):
        """Log each alert and POST it to the configured webhook, if any."""
        webhook_url = self.config.get("alert_webhook")
        for alert in alerts:
            message = (
                f"[{alert['severity'].upper()}] Model Integrity Alert\n"
                f"Type: {alert['type']}\n"
                f"Model: {alert['model']}\n"
                f"File: {alert['file']}\n"
            )
            self._LOG.critical(message)
            if webhook_url:
                try:
                    # Third-party 'requests' imported lazily: only needed
                    # when a webhook is actually configured.
                    import requests
                    requests.post(
                        webhook_url,
                        json={"text": message, "severity": alert["severity"]},
                        timeout=10,
                    )
                except Exception as e:
                    # Best-effort delivery; the alert is already in the logs.
                    self._LOG.error("Failed to send alert webhook: %s", e)
# Service configuration example (shape expected by ModelIntegrityMonitorService).
EXAMPLE_CONFIG = {
    # Seconds between integrity check cycles.
    "check_interval_seconds": 300,
    # Webhook (e.g. Slack) that receives integrity alerts.
    "alert_webhook": "https://hooks.slack.com/services/xxx/yyy/zzz",
    # Models to baseline and monitor.
    "models": [
        {
            "id": "sentiment-model-v2.1",
            "path": "/models/sentiment-model-v2.1",
            "critical": True,
        },
        {
            "id": "summarization-model-v1.0",
            "path": "/models/summarization-model-v1.0",
            "critical": True,
        },
    ],
    # Pinned dependency versions the deployment is expected to run.
    "baseline_dependencies": {
        "torch": "2.2.0",
        "transformers": "4.38.0",
        "safetensors": "0.4.2",
    },
    # Jensen-Shannon divergence threshold for output-distribution drift.
    "behavior_drift_threshold": 0.15,
}

#!/bin/bash
# deploy-model-monitor.sh
# Deploy model integrity monitor as a Kubernetes sidecar or daemonset.
set -euo pipefail

NAMESPACE="${1:-ml-inference}"
CONFIG_FILE="${2:-model-monitor-config.json}"

echo "[*] Deploying Model Integrity Monitor"
echo "[*] Namespace: $NAMESPACE"

# Create (or refresh) the ConfigMap from the monitor configuration.
# The dry-run render piped into apply makes this idempotent across re-runs.
kubectl create configmap model-monitor-config \
  --from-file=config.json="$CONFIG_FILE" \
  -n "$NAMESPACE" \
  --dry-run=client -o yaml | kubectl apply -f -

# Deploy as DaemonSet (runs on every node with models). The heredoc
# delimiter is quoted so nothing in the manifest is shell-expanded.
# NOTE(review): the stock python:3.11-slim image does not contain
# /app/monitor.py -- bake the monitor script into a custom image (or
# mount it from a ConfigMap) before relying on this in production.
cat << 'K8SEOF' | kubectl apply -n "$NAMESPACE" -f -
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: model-integrity-monitor
  labels:
    app: model-integrity-monitor
    component: security
spec:
  selector:
    matchLabels:
      app: model-integrity-monitor
  template:
    metadata:
      labels:
        app: model-integrity-monitor
    spec:
      containers:
      - name: monitor
        image: python:3.11-slim
        command: ["python3", "/app/monitor.py"]
        volumeMounts:
        - name: models
          mountPath: /models
          readOnly: true
        - name: config
          mountPath: /etc/monitor
          readOnly: true
        resources:
          limits:
            memory: "256Mi"
            cpu: "200m"
          requests:
            memory: "128Mi"
            cpu: "100m"
        securityContext:
          readOnlyRootFilesystem: true
          runAsNonRoot: true
          runAsUser: 1000
      volumes:
      - name: models
        hostPath:
          path: /models
      - name: config
        configMap:
          name: model-monitor-config
K8SEOF

echo "[*] Monitor deployed"
kubectl rollout status daemonset/model-integrity-monitor -n "$NAMESPACE"

References
- NIST (2024). "SP 800-61 Rev. 3: Computer Security Incident Handling Guide"
- dig8ital (2024). "AI Supply Chain Security: Incident Analysis and Lessons Learned"
- NIST (2024). "AI Risk Management Framework (AI RMF 1.0)"
- MITRE ATLAS (2024). "Adversarial Threat Landscape for AI Systems"
- OWASP (2025). "LLM03: Supply Chain Vulnerabilities"
- Google (2024). "Secure AI Framework (SAIF)"
During an AI supply chain incident, the team discovers a deployed model has been tampered with. What is the correct order of immediate response actions? Following the playbook above: (1) capture forensic evidence of the current state (file hashes, signature manifest, dependency freeze, process list, network connections); (2) isolate the model — remove it from serving and copy it to quarantine rather than deleting it, since forensics needs the artifact; (3) rotate any secrets the model pipeline had access to; (4) roll back to a signature-verified known-good version, refusing unverified rollback targets; and (5) communicate using the templates above, starting with the initial alert within one hour of detection.