AI Monitoring Setup
Step-by-step walkthrough for implementing AI system monitoring: inference logging, behavioral anomaly detection, alert configuration, dashboard creation, and integration with existing SIEM platforms.
Monitoring is the detection layer of AI defense-in-depth. Whereas guardrails block known attack patterns, monitoring detects novel attacks, tracks behavioral drift, and provides the forensic data needed for incident investigation. Effective AI monitoring differs from traditional application monitoring because the system is non-deterministic: the same input can produce different outputs, which makes baseline establishment and anomaly detection fundamentally harder.
Step 1: Inference Logging Architecture
Every interaction with the AI model must be logged with sufficient detail for security analysis and forensic investigation. Design the logging schema before writing any code.
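Before formalizing the schema, it helps to picture a single logged event. The snippet below sketches what one entry might serialize to; all field values are hypothetical, and the field names mirror the schema defined next.

```python
import json
from datetime import datetime, timezone

# A hypothetical serialized inference event (all values are made up)
event = {
    "log_id": "log-7f3a",
    "session_id": "sess-42",
    "user_id": "u-1001",
    "timestamp": datetime(2024, 5, 1, 12, 0, tzinfo=timezone.utc).isoformat(),
    "user_input": "Summarize this document",
    "input_tokens": 6,
    "input_injection_score": 0.02,
    "model_id": "model-x-2024-04",
    "output_tokens": 180,
    "guardrail_triggered": False,
    "risk_score": 0.05,
}
# One JSON object per line is easy to ship to log pipelines
line = json.dumps(event, sort_keys=True)
print(line)
```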
Log Schema Design
# monitoring/log_schema.py
"""
Structured log schema for AI inference monitoring.
Every field has a security justification.
"""
from dataclasses import dataclass, field
from datetime import datetime
@dataclass
class InferenceLog:
# Identity and session tracking
log_id: str # Unique log entry identifier
session_id: str # Groups related interactions
user_id: str # Authenticated user identity
source_ip: str # Client IP for rate limiting
timestamp: datetime # UTC timestamp
# Input data
user_input: str # Raw user message
input_tokens: int # Token count for cost/abuse tracking
input_language: str # Detected language
input_injection_score: float # Guardrail injection confidence
# Model data
model_id: str # Model identifier and version
system_prompt_hash: str # Hash of system prompt (detect tampering)
temperature: float # Generation parameters
max_tokens: int
# Output data
model_output: str # Raw model response
output_tokens: int # Token count
output_pii_detected: bool # Whether PII was found in output
output_redactions: list = field(default_factory=list)
# Tool calls (for agentic systems)
tool_calls: list = field(default_factory=list)
tool_results: list = field(default_factory=list)
# Timing
inference_latency_ms: int = 0 # Model response time
total_latency_ms: int = 0 # End-to-end including guardrails
# Security metadata
guardrail_triggered: bool = False
guardrail_action: str = ""
content_policy_violations: list = field(default_factory=list)
    risk_score: float = 0.0 # Composite risk score
    retention_tier: str = "standard" # "extended" for security events

Storage Considerations
# monitoring/log_storage.py
"""
Log storage with retention and access controls.
"""
class InferenceLogStore:
def __init__(self, config):
self.hot_store = config.hot_store # 30 days, fast queries
self.warm_store = config.warm_store # 1 year, slower queries
self.cold_store = config.cold_store # 7 years, archive
def store(self, log: "InferenceLog"):
"""
Store log with appropriate retention tier.
Security events get extended retention.
"""
        # Flag security events for extended retention before writing
        if log.guardrail_triggered or log.risk_score > 0.5:
            log.retention_tier = "extended"
        # All logs land in the hot store; tiering jobs migrate them later
        self.hot_store.write(log)
def query_session(self, session_id: str) -> list:
"""Retrieve all logs for a session for investigation."""
return self.hot_store.query(
filter={"session_id": session_id},
order_by="timestamp",
)
def query_user_history(self, user_id: str,
days: int = 30) -> list:
"""Retrieve user interaction history for pattern analysis."""
return self.hot_store.query(
filter={"user_id": user_id},
time_range=f"last_{days}d",
order_by="timestamp",
)Step 2: Behavioral Baseline Establishment
Before you can detect anomalies, you need to establish what normal looks like. AI behavioral baselines are statistical profiles of the system's typical interaction patterns.
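As a toy illustration of the statistics involved (all numbers are made up): the mean and standard deviation of a metric over the window define "normal", and a z-score measures how far a new observation sits from it.

```python
import numpy as np

# Hypothetical per-request output token counts from a quiet two-week window
values = np.array([110, 95, 120, 100, 105, 98, 115, 102, 108, 99])
mean, std = values.mean(), values.std()

def z_score(x: float) -> float:
    # Distance from the baseline mean, in standard deviations
    return abs(x - mean) / std

print(round(z_score(104), 2))  # typical value: well below the 3-sigma threshold
print(round(z_score(400), 2))  # outlier: far beyond 3 sigma
```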
# monitoring/baseline.py
"""
Behavioral baseline establishment for AI systems.
"""
import numpy as np
from collections import defaultdict
class BehavioralBaseline:
def __init__(self, window_days=14):
self.window_days = window_days
self.metrics = defaultdict(list)
def ingest(self, log: "InferenceLog"):
"""Add a log entry to the baseline calculation."""
self.metrics["input_token_count"].append(log.input_tokens)
self.metrics["output_token_count"].append(log.output_tokens)
self.metrics["latency_ms"].append(log.inference_latency_ms)
self.metrics["injection_score"].append(
log.input_injection_score
)
self.metrics["risk_score"].append(log.risk_score)
self.metrics["tool_call_count"].append(len(log.tool_calls))
def compute_baseline(self) -> dict:
"""Compute statistical baseline from collected metrics."""
baseline = {}
for metric_name, values in self.metrics.items():
if not values:
continue
arr = np.array(values)
baseline[metric_name] = {
"mean": float(np.mean(arr)),
"std": float(np.std(arr)),
"p95": float(np.percentile(arr, 95)),
"p99": float(np.percentile(arr, 99)),
"min": float(np.min(arr)),
"max": float(np.max(arr)),
}
return baseline
def is_anomalous(self, log: "InferenceLog",
baseline: dict) -> list:
"""Check if a log entry is anomalous relative to baseline."""
anomalies = []
checks = {
"input_token_count": log.input_tokens,
"output_token_count": log.output_tokens,
"latency_ms": log.inference_latency_ms,
"injection_score": log.input_injection_score,
"tool_call_count": len(log.tool_calls),
}
for metric_name, value in checks.items():
if metric_name not in baseline:
continue
b = baseline[metric_name]
# Flag if more than 3 standard deviations from mean
if b["std"] > 0:
z_score = abs(value - b["mean"]) / b["std"]
if z_score > 3:
anomalies.append({
"metric": metric_name,
"value": value,
"z_score": z_score,
"baseline_mean": b["mean"],
"baseline_std": b["std"],
})
        return anomalies

Step 3: Anomaly Detection Rules
Build detection rules that translate statistical anomalies into actionable security alerts.
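The rules below lean on a `count(key, window)` primitive. A minimal sketch of that sliding-window counter (a hypothetical standalone helper, independent of any particular rule engine):

```python
from collections import defaultdict, deque

class WindowCounter:
    """Counts events per key inside a sliding time window (in seconds)."""

    def __init__(self, window_seconds: float):
        self.window = window_seconds
        self.events = defaultdict(deque)

    def record(self, key: str, ts: float) -> int:
        """Record an event at time ts; return the count in the trailing window."""
        q = self.events[key]
        q.append(ts)
        # Evict events that have aged out of the window
        while q and ts - q[0] > self.window:
            q.popleft()
        return len(q)

# Hypothetical session: three injection attempts inside 5 minutes, then a late one
counter = WindowCounter(window_seconds=300)
counts = [counter.record("sess-42", t) for t in (0, 60, 120, 700)]
print(counts)  # the event at t=700 is alone in its window
```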
# monitoring/detection_rules.py
"""
Security-focused detection rules for AI systems.
"""
DETECTION_RULES = [
{
"name": "rapid_fire_injection_attempts",
"description": "Multiple prompt injection attempts from same user",
"condition": "injection_score > 0.7 AND count(session_id, 5m) > 3",
"severity": "high",
"action": "alert_and_rate_limit",
},
{
"name": "system_prompt_extraction_pattern",
"description": "User systematically probing for system prompt",
"condition": "output contains system prompt fragments "
"AND count(user_id, 30m) > 2",
"severity": "critical",
"action": "alert_and_block_session",
},
{
"name": "unusual_tool_usage",
"description": "Tool calls outside normal patterns",
"condition": "tool_call_count > baseline.p99 "
"OR tool_name not in approved_tools",
"severity": "high",
"action": "alert",
},
{
"name": "output_length_anomaly",
"description": "Unusually long model output may indicate "
"data exfiltration",
"condition": "output_tokens > baseline.p99 * 2",
"severity": "medium",
"action": "alert",
},
{
"name": "pii_leakage_pattern",
"description": "Multiple PII detections in outputs for same user",
"condition": "output_pii_detected == True "
"AND count(user_id, 1h) > 1",
"severity": "critical",
"action": "alert_and_block_user",
},
{
"name": "model_behavior_drift",
"description": "Sustained shift in output characteristics "
"suggesting model tampering",
"condition": "rolling_mean(risk_score, 1h) > "
"baseline.mean + 2 * baseline.std",
"severity": "high",
"action": "alert_and_notify_ml_team",
},
]

Step 4: Alert Configuration
Configure alerts that are actionable without creating alert fatigue.
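The `AlertManager` below expects a config object carrying named channels and a suppression window. A hypothetical sketch of that wiring, with a stub channel standing in for real pager, Slack, and SIEM clients:

```python
from types import SimpleNamespace

class StubChannel:
    """Stand-in for a real notification client; just collects alerts."""

    def __init__(self):
        self.sent = []

    def send(self, alert: dict):
        self.sent.append(alert)

# Hypothetical config: critical pages someone, high goes to chat, everything to SIEM
config = SimpleNamespace(
    channels={
        "pager": StubChannel(),
        "slack_security": StubChannel(),
        "siem": StubChannel(),
    },
    suppression_minutes=15,  # drop duplicate alerts inside this window
)
print(sorted(config.channels))
```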
# monitoring/alerting.py
"""
Alert configuration and routing.
"""
from datetime import datetime
class AlertManager:
def __init__(self, config):
self.channels = config.channels
self.suppression_window = config.suppression_minutes
self.recent_alerts = {}
def send_alert(self, rule_name: str, details: dict,
severity: str):
"""Send alert through appropriate channel."""
# Suppress duplicate alerts within window
key = f"{rule_name}:{details.get('user_id', 'unknown')}"
if self._is_suppressed(key):
return
alert = {
"rule": rule_name,
"severity": severity,
"timestamp": datetime.utcnow().isoformat(),
"details": details,
"recommended_action": self._get_recommendation(rule_name),
}
# Route by severity
if severity == "critical":
self.channels["pager"].send(alert)
self.channels["slack_security"].send(alert)
self.channels["siem"].send(alert)
elif severity == "high":
self.channels["slack_security"].send(alert)
self.channels["siem"].send(alert)
else:
self.channels["siem"].send(alert)
self.recent_alerts[key] = datetime.utcnow()
def _is_suppressed(self, key: str) -> bool:
if key not in self.recent_alerts:
return False
elapsed = (datetime.utcnow() -
self.recent_alerts[key]).total_seconds()
return elapsed < self.suppression_window * 60
def _get_recommendation(self, rule_name: str) -> str:
recommendations = {
"rapid_fire_injection_attempts":
"Review session logs. Consider blocking the user "
"if attacks persist. Verify guardrails held.",
"system_prompt_extraction_pattern":
"Immediately review session. Check whether system "
"prompt was leaked. Rotate system prompt if exposed.",
"pii_leakage_pattern":
"Review output logs for actual PII exposure. "
"Initiate data breach procedures if confirmed.",
}
        return recommendations.get(rule_name, "Review alert details.")

Step 5: SIEM Integration
Integrate AI monitoring data with existing security information and event management platforms.
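Formatting is only half the integration; the event still has to reach the SIEM. The sketch below builds a Splunk HTTP Event Collector request (the `/services/collector` path and `Splunk <token>` authorization header follow Splunk's HEC conventions; the URL and token here are placeholders):

```python
import json

def build_hec_request(event: dict, base_url: str, token: str):
    """Build (url, headers, body) for a POST to Splunk's HTTP Event Collector."""
    url = f"{base_url}/services/collector"
    headers = {
        "Authorization": f"Splunk {token}",  # HEC token auth scheme
        "Content-Type": "application/json",
    }
    return url, headers, json.dumps(event)

url, headers, body = build_hec_request(
    {"event": {"source": "ai_security_monitor", "risk_score": 0.8}},
    base_url="https://splunk.example.internal:8088",
    token="00000000-PLACEHOLDER",
)
print(url)
```

Any HTTP client can then POST `body` to `url` with those headers; HEC also accepts multiple newline-separated events per request, which cuts per-event overhead.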
# monitoring/siem_integration.py
"""
SIEM integration for AI inference logs.
Supports common SIEM platforms via structured logging.
"""
import json
class SIEMExporter:
def __init__(self, siem_type: str, config: dict):
self.siem_type = siem_type
self.config = config
def export_log(self, inference_log: "InferenceLog"):
"""Export inference log in SIEM-compatible format."""
if self.siem_type == "splunk":
return self._to_splunk_hec(inference_log)
elif self.siem_type == "elastic":
return self._to_elastic(inference_log)
        elif self.siem_type == "sentinel":
            return self._to_sentinel(inference_log)
        else:
            raise ValueError(f"Unsupported SIEM type: {self.siem_type}")
def _to_splunk_hec(self, log):
"""Format for Splunk HTTP Event Collector."""
return {
"event": {
"source": "ai_security_monitor",
"sourcetype": "ai:inference",
"event_type": "ai_interaction",
"user_id": log.user_id,
"session_id": log.session_id,
"risk_score": log.risk_score,
"guardrail_triggered": log.guardrail_triggered,
"injection_score": log.input_injection_score,
"tool_calls": len(log.tool_calls),
"pii_detected": log.output_pii_detected,
"model_id": log.model_id,
"latency_ms": log.total_latency_ms,
},
"time": log.timestamp.timestamp(),
        }

Step 6: Dashboard Creation
Build dashboards that provide security-relevant visibility at a glance.
Key Dashboard Panels
| Panel | Metric | Purpose |
|---|---|---|
| Injection attempts over time | Count of inputs with injection_score > 0.7 | Track attack volume and trends |
| Top attacking users | Users ranked by injection attempt count | Identify persistent attackers |
| PII leakage events | Count of outputs where PII was detected | Track data exposure risk |
| Guardrail effectiveness | Ratio of blocked vs. allowed requests | Measure defense coverage |
| Model latency percentiles | p50, p95, p99 inference latency | Detect DoS or model degradation |
| Tool call distribution | Tool calls by type and frequency | Detect unusual tool usage |
| Risk score distribution | Histogram of per-interaction risk scores | Monitor overall security posture |
| Behavioral drift indicator | Rolling comparison to baseline | Detect gradual model changes |
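Most panels reduce to simple aggregations over the inference logs. As a sketch, the first panel (injection attempts over time) is a time-bucketed count of high-score inputs, shown here against hypothetical in-memory records:

```python
from collections import Counter

# Hypothetical (hour_bucket, injection_score) pairs pulled from the log store
records = [
    ("2024-05-01T10", 0.90), ("2024-05-01T10", 0.20),
    ("2024-05-01T11", 0.80), ("2024-05-01T11", 0.75),
    ("2024-05-01T12", 0.30),
]

# Panel query: count inputs with injection_score > 0.7, bucketed by hour
attempts_per_hour = Counter(hour for hour, score in records if score > 0.7)
print(dict(attempts_per_hour))  # hours with no attempts simply don't appear
```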
Step 7: Validation and Testing
Test the monitoring setup by simulating attacks and verifying detection:
# tests/test_monitoring.py
def test_injection_detection_alert(monitoring_system):
"""Verify injection attempts trigger alerts."""
for i in range(5):
monitoring_system.process_log(InferenceLog(
user_id="test-attacker",
input_injection_score=0.85,
session_id="test-session",
# ... other required fields
))
alerts = monitoring_system.get_recent_alerts(minutes=5)
assert any(a["rule"] == "rapid_fire_injection_attempts"
for a in alerts)
def test_baseline_anomaly_detection(monitoring_system, baseline):
"""Verify anomalies relative to baseline trigger alerts."""
anomalous_log = InferenceLog(
output_tokens=baseline["output_token_count"]["p99"] * 3,
# ... other required fields
)
monitoring_system.process_log(anomalous_log)
alerts = monitoring_system.get_recent_alerts(minutes=5)
assert any(a["rule"] == "output_length_anomaly"
               for a in alerts)

Further Reading
- Defense Implementation Overview — Where monitoring fits in defense-in-depth
- Setting Up Guardrails — The prevention layer that feeds monitoring
- Incident Response Preparation — What happens when monitoring detects an incident
- Prompt Log Forensics — Forensic analysis of the logs you collect