AI Monitoring Setup
Step-by-step walkthrough for implementing AI system monitoring: inference logging, behavioral anomaly detection, alert configuration, dashboard creation, and integration with existing SIEM platforms.
Monitoring is the detection layer of AI defense-in-depth. Where guardrails prevent known attacks, monitoring detects novel attacks, tracks behavioral drift, and provides the forensic data needed for incident investigation. Effective AI monitoring differs from traditional application monitoring because the system's behavior is non-deterministic — the same input can produce different outputs, making baseline establishment and anomaly detection fundamentally more complex.
Step 1: Inference Logging Architecture
Every interaction with the AI model must be logged with sufficient detail for security analysis and forensic investigation. Design the logging schema before writing any code.
Log Schema Design
# monitoring/log_schema.py
"""
Structured log schema for AI inference monitoring.
Every field has a security justification.
"""
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional


@dataclass
class InferenceLog:
    """One logged AI model interaction, captured for security analysis.

    Fields without defaults are mandatory at construction time; the
    security metadata fields default to benign values and are filled in
    by the guardrail/monitoring pipeline.
    """

    # Identity and session tracking
    log_id: str                    # Unique log entry identifier
    session_id: str                # Groups related interactions
    user_id: str                   # Authenticated user identity
    source_ip: str                 # Client IP for rate limiting
    timestamp: datetime            # UTC timestamp

    # Input data
    user_input: str                # Raw user message
    input_tokens: int              # Token count for cost/abuse tracking
    input_language: str            # Detected language
    input_injection_score: float   # Guardrail injection confidence

    # Model data
    model_id: str                  # Model identifier and version
    system_prompt_hash: str        # Hash of system prompt (detect tampering)
    temperature: float             # Generation parameters
    max_tokens: int

    # Output data
    model_output: str              # Raw model response
    output_tokens: int             # Token count
    output_pii_detected: bool      # Whether PII was found in output
    output_redactions: list = field(default_factory=list)

    # Tool calls (for agentic systems)
    tool_calls: list = field(default_factory=list)
    tool_results: list = field(default_factory=list)

    # Timing
    inference_latency_ms: int = 0  # Model response time
    total_latency_ms: int = 0      # End-to-end including guardrails

    # Security metadata
    guardrail_triggered: bool = False
    guardrail_action: str = ""
    content_policy_violations: list = field(default_factory=list)
    risk_score: float = 0.0        # Composite risk score

Storage Considerations
# 監控/log_storage.py
"""
Log storage with retention and access controls.
"""
class InferenceLogStore:
def __init__(self, config):
self.hot_store = config.hot_store # 30 days, fast queries
self.warm_store = config.warm_store # 1 year, slower queries
self.cold_store = config.cold_store # 7 years, archive
def store(self, log: "InferenceLog"):
"""
Store log with appropriate retention tier.
安全 events get extended retention.
"""
# All logs go to hot store
self.hot_store.write(log)
# 安全 events also flagged for extended retention
if log.guardrail_triggered or log.risk_score > 0.5:
log.retention_tier = "extended"
def query_session(self, session_id: str) -> list:
"""Retrieve all logs for a session for investigation."""
return self.hot_store.query(
filter={"session_id": session_id},
order_by="timestamp",
)
def query_user_history(self, user_id: str,
days: int = 30) -> list:
"""Retrieve user interaction history for pattern analysis."""
return self.hot_store.query(
filter={"user_id": user_id},
time_range=f"last_{days}d",
order_by="timestamp",
)Step 2: Behavioral Baseline Establishment
Before you can detect anomalies, you need to establish what normal looks like. AI behavioral baselines are statistical profiles of the system's typical interaction patterns.
# 監控/baseline.py
"""
Behavioral baseline establishment for AI systems.
"""
import numpy as np
from collections import defaultdict
class BehavioralBaseline:
def __init__(self, window_days=14):
self.window_days = window_days
self.metrics = defaultdict(list)
def ingest(self, log: "InferenceLog"):
"""Add a log entry to the baseline calculation."""
self.metrics["input_token_count"].append(log.input_tokens)
self.metrics["output_token_count"].append(log.output_tokens)
self.metrics["latency_ms"].append(log.inference_latency_ms)
self.metrics["injection_score"].append(
log.input_injection_score
)
self.metrics["risk_score"].append(log.risk_score)
self.metrics["tool_call_count"].append(len(log.tool_calls))
def compute_baseline(self) -> dict:
"""Compute statistical baseline from collected metrics."""
baseline = {}
for metric_name, values in self.metrics.items():
if not values:
continue
arr = np.array(values)
baseline[metric_name] = {
"mean": float(np.mean(arr)),
"std": float(np.std(arr)),
"p95": float(np.percentile(arr, 95)),
"p99": float(np.percentile(arr, 99)),
"min": float(np.min(arr)),
"max": float(np.max(arr)),
}
return baseline
def is_anomalous(self, log: "InferenceLog",
baseline: dict) -> list:
"""Check if a log entry is anomalous relative to baseline."""
anomalies = []
checks = {
"input_token_count": log.input_tokens,
"output_token_count": log.output_tokens,
"latency_ms": log.inference_latency_ms,
"injection_score": log.input_injection_score,
"tool_call_count": len(log.tool_calls),
}
for metric_name, value in checks.items():
if metric_name not in baseline:
continue
b = baseline[metric_name]
# Flag if more than 3 standard deviations from mean
if b["std"] > 0:
z_score = abs(value - b["mean"]) / b["std"]
if z_score > 3:
anomalies.append({
"metric": metric_name,
"value": value,
"z_score": z_score,
"baseline_mean": b["mean"],
"baseline_std": b["std"],
})
return anomaliesStep 3: Anomaly 偵測 Rules
Build detection rules that translate statistical anomalies into actionable security alerts.
# 監控/detection_rules.py
"""
安全-focused 偵測 rules for AI systems.
"""
DETECTION_RULES = [
{
"name": "rapid_fire_injection_attempts",
"description": "Multiple 提示詞注入 attempts from same user",
"condition": "injection_score > 0.7 AND count(session_id, 5m) > 3",
"severity": "high",
"action": "alert_and_rate_limit",
},
{
"name": "system_prompt_extraction_pattern",
"description": "User systematically probing for 系統提示詞",
"condition": "輸出 contains 系統提示詞 fragments "
"AND count(user_id, 30m) > 2",
"severity": "critical",
"action": "alert_and_block_session",
},
{
"name": "unusual_tool_usage",
"description": "Tool calls outside normal patterns",
"condition": "tool_call_count > baseline.p99 "
"OR tool_name not in approved_tools",
"severity": "high",
"action": "alert",
},
{
"name": "output_length_anomaly",
"description": "Unusually long model 輸出 may indicate "
"data exfiltration",
"condition": "output_tokens > baseline.p99 * 2",
"severity": "medium",
"action": "alert",
},
{
"name": "pii_leakage_pattern",
"description": "Multiple PII detections in outputs for same user",
"condition": "output_pii_detected == True "
"AND count(user_id, 1h) > 1",
"severity": "critical",
"action": "alert_and_block_user",
},
{
"name": "model_behavior_drift",
"description": "Sustained shift in 輸出 characteristics "
"suggesting model tampering",
"condition": "rolling_mean(risk_score, 1h) > "
"baseline.mean + 2 * baseline.std",
"severity": "high",
"action": "alert_and_notify_ml_team",
},
]Step 4: Alert Configuration
Configure alerts that are actionable without creating alert fatigue.
# 監控/alerting.py
"""
Alert configuration and routing.
"""
class AlertManager:
def __init__(self, config):
self.channels = config.channels
self.suppression_window = config.suppression_minutes
self.recent_alerts = {}
def send_alert(self, rule_name: str, details: dict,
severity: str):
"""Send alert through appropriate channel."""
# Suppress duplicate alerts within window
key = f"{rule_name}:{details.get('user_id', 'unknown')}"
if self._is_suppressed(key):
return
alert = {
"rule": rule_name,
"severity": severity,
"timestamp": datetime.utcnow().isoformat(),
"details": details,
"recommended_action": self._get_recommendation(rule_name),
}
# Route by severity
if severity == "critical":
self.channels["pager"].send(alert)
self.channels["slack_security"].send(alert)
self.channels["siem"].send(alert)
elif severity == "high":
self.channels["slack_security"].send(alert)
self.channels["siem"].send(alert)
else:
self.channels["siem"].send(alert)
self.recent_alerts[key] = datetime.utcnow()
def _is_suppressed(self, key: str) -> bool:
if key not in self.recent_alerts:
return False
elapsed = (datetime.utcnow() -
self.recent_alerts[key]).total_seconds()
return elapsed < self.suppression_window * 60
def _get_recommendation(self, rule_name: str) -> str:
recommendations = {
"rapid_fire_injection_attempts":
"Review session logs. 考慮 blocking 使用者 "
"if attacks persist. Verify 護欄 held.",
"system_prompt_extraction_pattern":
"Immediately review session. Check whether system "
"prompt was leaked. Rotate 系統提示詞 if exposed.",
"pii_leakage_pattern":
"Review 輸出 logs for actual PII exposure. "
"Initiate data breach procedures if confirmed.",
}
return recommendations.get(rule_name, "Review alert details.")Step 5: SIEM Integration
Integrate AI monitoring data with existing security information and event management platforms.
# 監控/siem_integration.py
"""
SIEM integration for AI 推論 logs.
Supports common SIEM platforms via structured logging.
"""
import json
class SIEMExporter:
def __init__(self, siem_type: str, config: dict):
self.siem_type = siem_type
self.config = config
def export_log(self, inference_log: "InferenceLog"):
"""Export 推論 log in SIEM-compatible format."""
if self.siem_type == "splunk":
return self._to_splunk_hec(inference_log)
elif self.siem_type == "elastic":
return self._to_elastic(inference_log)
elif self.siem_type == "sentinel":
return self._to_sentinel(inference_log)
def _to_splunk_hec(self, log):
"""Format for Splunk HTTP Event Collector."""
return {
"event": {
"source": "ai_security_monitor",
"sourcetype": "ai:推論",
"event_type": "ai_interaction",
"user_id": log.user_id,
"session_id": log.session_id,
"risk_score": log.risk_score,
"guardrail_triggered": log.guardrail_triggered,
"injection_score": log.input_injection_score,
"tool_calls": len(log.tool_calls),
"pii_detected": log.output_pii_detected,
"model_id": log.model_id,
"latency_ms": log.total_latency_ms,
},
"time": log.timestamp.timestamp(),
}Step 6: Dashboard Creation
Build dashboards that provide security-relevant visibility at a glance.
Key Dashboard Panels
| Panel | Metric | Purpose |
|---|---|---|
| Injection attempts over time | Count of inputs with injection_score > 0.7 | Track attack volume and trends |
| Top attacking users | Users ranked by injection attempt count | Identify persistent attackers |
| PII leakage events | Count of outputs where PII was detected | Track data exposure risk |
| Guardrail effectiveness | Ratio of blocked vs. allowed requests | Measure defense coverage |
| Model latency percentiles | p50, p95, p99 inference latency | Detect DoS or model degradation |
| Tool call distribution | Tool calls by type and frequency | Detect unusual tool usage |
| Risk score distribution | Histogram of per-interaction risk scores | Monitor overall security posture |
| Behavioral drift indicator | Rolling comparison to baseline | Detect gradual model changes |
Step 7: Validation and Testing
Test the monitoring setup by simulating attacks and verifying detection:
# tests/test_monitoring.py
def test_injection_detection_alert(monitoring_system):
"""Verify injection attempts trigger alerts."""
for i in range(5):
monitoring_system.process_log(InferenceLog(
user_id="測試-攻擊者",
input_injection_score=0.85,
session_id="測試-session",
# ... other required fields
))
alerts = monitoring_system.get_recent_alerts(minutes=5)
assert any(a["rule"] == "rapid_fire_injection_attempts"
for a in alerts)
def test_baseline_anomaly_detection(monitoring_system, baseline):
"""Verify anomalies relative to baseline trigger alerts."""
anomalous_log = InferenceLog(
output_tokens=baseline["output_token_count"]["p99"] * 3,
# ... other required fields
)
monitoring_system.process_log(anomalous_log)
alerts = monitoring_system.get_recent_alerts(minutes=5)
assert any(a["rule"] == "output_length_anomaly"
for a in alerts)Further Reading
- Defense Implementation Overview — Where monitoring fits in defense-in-depth
- Setting Up Guardrails — The prevention layer that feeds monitoring
- Incident Response Preparation — What happens when monitoring detects an incident
- Prompt Log Forensics — Forensic analysis of the logs you collect