Automated AI Incident Triage
Building automated triage systems for AI security incidents using rule-based engines, statistical anomaly detection, and event correlation.
Overview
As organizations deploy AI systems at scale, the volume of security-relevant events generated by these systems can overwhelm human analysts. A production LLM serving thousands of users generates a continuous stream of potential security signals: anomalous prompts, unusual output patterns, confidence score deviations, latency spikes, and guardrail trigger events. Without automated triage, security teams face a choice between reviewing every alert (unsustainable) or setting thresholds so high that real incidents are missed (dangerous).
Automated AI incident triage applies classification, prioritization, and routing logic to the stream of AI security events, producing a ranked queue of incidents for human review. The goal is not to replace human judgment but to ensure that the most important incidents reach analysts first and that low-priority events are documented without consuming analyst time.
This article covers the architecture and implementation of automated triage systems for AI security incidents, including rule-based engines for known patterns, statistical anomaly detection for novel threats, temporal event correlation, and evaluation frameworks for measuring triage effectiveness.
Triage Architecture
Event Ingestion
The triage system ingests events from multiple sources across the AI serving stack:
"""
AI incident triage system -- event ingestion and classification.
"""
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime, timezone
from typing import Any
class EventSource(Enum):
GUARDRAIL = "guardrail"
MODEL_MONITOR = "model_monitor"
LOG_ANALYZER = "log_analyzer"
USER_REPORT = "user_report"
SECURITY_SCANNER = "security_scanner"
ANOMALY_DETECTOR = "anomaly_detector"
class TriagePriority(Enum):
CRITICAL = 1
HIGH = 2
MEDIUM = 3
LOW = 4
INFORMATIONAL = 5
class IncidentCategory(Enum):
PROMPT_INJECTION = "prompt_injection"
DATA_EXFILTRATION = "data_exfiltration"
MODEL_EVASION = "model_evasion"
JAILBREAK = "jailbreak"
OUTPUT_ANOMALY = "output_anomaly"
PERFORMANCE_DEGRADATION = "performance_degradation"
UNAUTHORIZED_ACCESS = "unauthorized_access"
DATA_POISONING = "data_poisoning"
MODEL_THEFT = "model_theft"
UNKNOWN = "unknown"
@dataclass
class SecurityEvent:
"""Raw security event from the AI serving stack."""
event_id: str
timestamp: str
source: EventSource
event_type: str
severity_hint: str # Source-provided severity, may be unreliable
payload: dict[str, Any]
model_id: str | None = None
user_id: str | None = None
session_id: str | None = None
ip_address: str | None = None
@dataclass
class TriageResult:
"""Result of automated triage classification."""
event_id: str
priority: TriagePriority
category: IncidentCategory
confidence: float # 0.0 to 1.0
rationale: str
recommended_actions: list[str]
requires_human_review: bool
correlated_events: list[str] = field(default_factory=list)
    tags: list[str] = field(default_factory=list)
Rule-Based Triage Engine
The first triage layer applies deterministic rules that match known incident patterns. These rules are fast, predictable, and auditable.
class RuleBasedTriageEngine:
"""
Classify AI security events using deterministic rules.
Rules are evaluated in priority order. The first matching rule
determines the triage classification. This ensures that critical
patterns are always caught regardless of statistical model behavior.
"""
def __init__(self):
self.rules: list[dict] = self._build_default_rules()
def _build_default_rules(self) -> list[dict]:
return [
# Critical: Known data exfiltration patterns
{
"name": "data_exfiltration_output",
"priority": TriagePriority.CRITICAL,
"category": IncidentCategory.DATA_EXFILTRATION,
"condition": lambda e: (
e.payload.get("guardrail_triggered") == "pii_output"
and e.payload.get("pii_types_detected", 0) >= 3
),
"rationale": "Multiple PII types detected in model output",
"actions": [
"Block response delivery immediately",
"Preserve full conversation context",
"Notify data protection officer",
],
},
# Critical: Model weight access
{
"name": "model_theft_attempt",
"priority": TriagePriority.CRITICAL,
"category": IncidentCategory.MODEL_THEFT,
"condition": lambda e: (
e.event_type == "api_access"
and e.payload.get("endpoint") in ("/v1/models/weights", "/v1/models/export")
and e.payload.get("authorized") is False
),
"rationale": "Unauthorized model weight access attempt",
"actions": [
"Block requesting IP",
"Revoke associated API keys",
"Full access log review for this user",
],
},
# High: Prompt injection detected
{
"name": "prompt_injection_detected",
"priority": TriagePriority.HIGH,
"category": IncidentCategory.PROMPT_INJECTION,
"condition": lambda e: (
e.payload.get("guardrail_triggered") == "prompt_injection"
and e.payload.get("injection_confidence", 0) > 0.8
),
"rationale": "High-confidence prompt injection detected by guardrail",
"actions": [
"Log full prompt for forensic analysis",
"Check if injection succeeded despite guardrail",
"Review user's recent session history",
],
},
# High: Jailbreak with safety bypass
{
"name": "jailbreak_safety_bypass",
"priority": TriagePriority.HIGH,
"category": IncidentCategory.JAILBREAK,
"condition": lambda e: (
e.payload.get("guardrail_triggered") == "safety_violation"
and e.payload.get("output_was_delivered", False)
),
"rationale": "Safety violation detected but output was delivered to user",
"actions": [
"Review delivered output for harmful content",
"Determine if guardrail timing was too slow",
"Update guardrail to block pre-delivery",
],
},
# Medium: Repeated guardrail triggers from single user
{
"name": "repeated_guardrail_triggers",
"priority": TriagePriority.MEDIUM,
"category": IncidentCategory.JAILBREAK,
"condition": lambda e: (
e.payload.get("guardrail_triggered") is not None
and e.payload.get("user_trigger_count_1h", 0) > 10
),
"rationale": "User has triggered guardrails >10 times in 1 hour",
"actions": [
"Apply rate limiting to this user",
"Review full session for attack patterns",
],
},
# Medium: Output anomaly
{
"name": "output_distribution_anomaly",
"priority": TriagePriority.MEDIUM,
"category": IncidentCategory.OUTPUT_ANOMALY,
"condition": lambda e: (
e.source == EventSource.ANOMALY_DETECTOR
and e.payload.get("anomaly_score", 0) > 3.0
),
"rationale": "Model output distribution anomaly detected",
"actions": [
"Compare recent outputs against baseline",
"Check for model drift or serving errors",
],
},
# Low: Single guardrail trigger
{
"name": "single_guardrail_trigger",
"priority": TriagePriority.LOW,
"category": IncidentCategory.UNKNOWN,
"condition": lambda e: (
e.payload.get("guardrail_triggered") is not None
and e.payload.get("user_trigger_count_1h", 0) <= 3
),
"rationale": "Isolated guardrail trigger, likely benign probing or accidental",
"actions": ["Log and monitor for escalation"],
},
]
def evaluate(self, event: SecurityEvent) -> TriageResult | None:
"""Evaluate an event against all rules, return first match."""
for rule in self.rules:
try:
if rule["condition"](event):
return TriageResult(
event_id=event.event_id,
priority=rule["priority"],
category=rule["category"],
confidence=1.0, # Rule-based = deterministic
rationale=rule["rationale"],
recommended_actions=rule["actions"],
requires_human_review=rule["priority"].value <= 2,
tags=["rule_based", rule["name"]],
)
except (KeyError, TypeError):
continue
        return None
Statistical Anomaly Triage
Events that do not match deterministic rules are evaluated by a statistical anomaly detection layer.
import numpy as np
from collections import deque
class StatisticalTriageEngine:
"""
Triage AI security events using statistical anomaly detection.
Maintains rolling baselines of event features and flags
statistically significant deviations.
"""
def __init__(self, window_size: int = 1000):
self.window_size = window_size
self.feature_windows: dict[str, deque] = {}
def update_baseline(self, feature_name: str, value: float) -> None:
if feature_name not in self.feature_windows:
self.feature_windows[feature_name] = deque(maxlen=self.window_size)
self.feature_windows[feature_name].append(value)
def compute_anomaly_score(self, feature_name: str, value: float) -> float:
"""Compute z-score anomaly for a feature value against its baseline."""
if feature_name not in self.feature_windows:
return 0.0
window = self.feature_windows[feature_name]
if len(window) < 30:
return 0.0
values = np.array(window)
mean = float(np.mean(values))
std = float(np.std(values))
if std < 1e-10:
return 0.0
return abs(value - mean) / std
def evaluate(self, event: SecurityEvent) -> TriageResult | None:
"""
Evaluate an event using statistical anomaly detection.
Extracts numerical features from the event payload and
computes anomaly scores.
"""
anomaly_scores = {}
# Extract and evaluate standard features
feature_mappings = {
"response_latency_ms": event.payload.get("latency_ms"),
"output_token_count": event.payload.get("output_tokens"),
"confidence_score": event.payload.get("top_confidence"),
"guardrail_score": event.payload.get("safety_score"),
"input_length": event.payload.get("input_length"),
}
        for feature_name, value in feature_mappings.items():
            if value is not None:
                score = self.compute_anomaly_score(feature_name, float(value))
                anomaly_scores[feature_name] = score
                # Anomalous values also enter the baseline, which gradually
                # desensitizes it; consider skipping updates for high z-scores
                self.update_baseline(feature_name, float(value))
if not anomaly_scores:
return None
max_anomaly = max(anomaly_scores.values())
max_feature = max(anomaly_scores, key=anomaly_scores.get)
if max_anomaly < 2.5:
return None # Not anomalous enough
# Map anomaly score to priority
if max_anomaly > 5.0:
priority = TriagePriority.HIGH
elif max_anomaly > 3.5:
priority = TriagePriority.MEDIUM
else:
priority = TriagePriority.LOW
return TriageResult(
event_id=event.event_id,
priority=priority,
category=IncidentCategory.UNKNOWN,
confidence=min(max_anomaly / 5.0, 1.0),
rationale=f"Statistical anomaly detected: {max_feature} (z={max_anomaly:.2f})",
recommended_actions=[
f"Investigate anomalous {max_feature}",
"Compare against recent baseline",
"Check for correlated anomalies in other features",
],
requires_human_review=max_anomaly > 4.0,
tags=["statistical", f"anomaly_{max_feature}"],
        )
Combined Triage Pipeline
class TriagePipeline:
"""
Combined triage pipeline that runs events through multiple
classification engines in priority order.
"""
def __init__(self):
self.rule_engine = RuleBasedTriageEngine()
self.stats_engine = StatisticalTriageEngine()
def triage(self, event: SecurityEvent) -> TriageResult:
"""
Process a security event through the full triage pipeline.
1. Rule-based engine (highest priority, deterministic)
2. Statistical anomaly engine
3. Default classification if no engine matches
"""
# Try rule-based first
result = self.rule_engine.evaluate(event)
if result:
return result
# Try statistical anomaly detection
result = self.stats_engine.evaluate(event)
if result:
return result
# Default: low-priority informational event
return TriageResult(
event_id=event.event_id,
priority=TriagePriority.INFORMATIONAL,
category=IncidentCategory.UNKNOWN,
confidence=0.5,
rationale="No triage rules or anomalies matched; logged for baseline",
recommended_actions=["No immediate action required"],
requires_human_review=False,
tags=["unclassified"],
        )
Event Correlation
Temporal Correlation
Individual events may appear innocuous in isolation but reveal an attack pattern when correlated temporally.
from collections import defaultdict
class TemporalCorrelator:
"""
Correlate security events across time windows to detect
multi-step attack patterns.
"""
def __init__(self, correlation_window_seconds: int = 300):
self.window = correlation_window_seconds
self.recent_events: deque[SecurityEvent] = deque()
self.user_event_counts: dict[str, int] = defaultdict(int)
def add_event(self, event: SecurityEvent) -> dict | None:
"""
Add an event and check for temporal correlations.
Returns a correlation alert if a pattern is detected.
"""
now = datetime.now(timezone.utc)
self.recent_events.append(event)
# Expire old events
while self.recent_events:
oldest = self.recent_events[0]
            oldest_time = datetime.fromisoformat(oldest.timestamp)
            # Treat naive timestamps as UTC so subtraction from the
            # timezone-aware `now` cannot raise TypeError
            if oldest_time.tzinfo is None:
                oldest_time = oldest_time.replace(tzinfo=timezone.utc)
            if (now - oldest_time).total_seconds() > self.window:
self.recent_events.popleft()
else:
break
        # Track per-user event frequency (note: these counts grow without
        # bound; a production correlator would expire them with the window)
        if event.user_id:
            self.user_event_counts[event.user_id] += 1
# Check for reconnaissance pattern: multiple different
# attack types from the same user in the window
if event.user_id:
user_events = [
e for e in self.recent_events
if e.user_id == event.user_id
]
event_types = set(e.event_type for e in user_events)
if len(event_types) >= 3 and len(user_events) >= 5:
return {
"correlation_type": "reconnaissance_pattern",
"user_id": event.user_id,
"event_count": len(user_events),
"distinct_types": list(event_types),
"severity": "HIGH",
"description": (
f"User {event.user_id} has generated {len(user_events)} "
f"events of {len(event_types)} different types in "
f"{self.window}s, suggesting systematic probing"
),
}
        return None
Evaluation and Metrics
Triage Performance Metrics
@dataclass
class TriageMetrics:
"""Performance metrics for the triage system."""
total_events: int
true_positives: int # Correctly escalated real incidents
false_positives: int # Incorrectly escalated non-incidents
true_negatives: int # Correctly dismissed non-incidents
false_negatives: int # Missed real incidents (most dangerous)
mean_triage_time_ms: float # Average time to produce triage result
mean_time_to_human_review_minutes: float
@property
def precision(self) -> float:
denom = self.true_positives + self.false_positives
return self.true_positives / denom if denom > 0 else 0.0
@property
def recall(self) -> float:
denom = self.true_positives + self.false_negatives
return self.true_positives / denom if denom > 0 else 0.0
@property
def f1_score(self) -> float:
p, r = self.precision, self.recall
        return 2 * p * r / (p + r) if (p + r) > 0 else 0.0
Tuning for AI Security
In AI security triage, the cost of false negatives (missed incidents) is typically much higher than the cost of false positives (unnecessary escalations). Triage systems should be tuned for high recall, accepting a higher false positive rate:
| Use Case | Target Recall | Acceptable FP Rate | Rationale |
|---|---|---|---|
| Safety-critical AI (medical, autonomous) | > 99% | Up to 30% | Missing an incident could cause physical harm |
| Customer-facing LLM | > 95% | Up to 20% | Data breach or harmful output risk |
| Internal tooling | > 90% | Up to 15% | Lower external exposure |
| Research/experimentation | > 80% | Up to 10% | Controlled environment |
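The recall targets above can be verified empirically by sweeping the escalation threshold over labeled historical events and picking the highest threshold that still meets the target. This is a minimal sketch of that idea; `tune_threshold` and its `(score, was_real_incident)` tuple format are illustrative assumptions, not part of the engines above.

```python
def tune_threshold(
    scored_events: list[tuple[float, bool]],  # (anomaly_score, was_real_incident)
    target_recall: float,
) -> float:
    """Return the highest threshold whose recall meets the target.

    Candidate thresholds are the observed scores themselves; an event is
    escalated when score >= threshold, so a higher threshold means fewer
    escalations (fewer false positives) at the cost of recall.
    """
    incidents = [s for s, real in scored_events if real]
    if not incidents:
        return float("inf")  # nothing to recall; escalate nothing
    best = None
    for candidate in sorted({s for s, _ in scored_events}, reverse=True):
        caught = sum(1 for s in incidents if s >= candidate)
        if caught / len(incidents) >= target_recall:
            best = candidate
            break  # first (highest) threshold meeting the target
    return best if best is not None else min(s for s, _ in scored_events)

# Example: real incidents at z=4.1 and z=2.8, benign noise below
history = [(4.1, True), (2.8, True), (2.2, False), (1.1, False), (0.4, False)]
print(tune_threshold(history, target_recall=0.99))  # -> 2.8
```

Note that optimizing on a single labeled sample overfits; in practice the sweep would use cross-validation over a longer incident history.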
Operational Considerations
Alert Fatigue Management
High false positive rates cause alert fatigue, reducing analyst effectiveness. Mitigate through:
- Progressive disclosure: Show summary first, expand details on demand
- Deduplication: Group related events into single incidents
- Adaptive thresholds: Automatically raise thresholds for event types with consistently high false positive rates
- Analyst feedback loops: Allow analysts to mark false positives, feeding back into rule and model tuning
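The deduplication step can be sketched as grouping results that share a category and user within a fixed time window and emitting one incident per group. The key fields, the dict shape, and the 300-second window here are illustrative assumptions, not part of the pipeline above.

```python
from collections import defaultdict

def deduplicate(
    results: list[dict],  # each: {"category": str, "user_id": str, "ts": float}
    window_seconds: float = 300.0,
) -> list[dict]:
    """Group results by (category, user_id, window bucket); emit the first
    result of each group, annotated with a count of suppressed duplicates."""
    buckets: dict[tuple, list[dict]] = defaultdict(list)
    for r in sorted(results, key=lambda r: r["ts"]):
        key = (r["category"], r["user_id"], int(r["ts"] // window_seconds))
        buckets[key].append(r)
    incidents = []
    for group in buckets.values():
        first = dict(group[0])
        first["duplicate_count"] = len(group) - 1
        incidents.append(first)
    return incidents

events = [
    {"category": "jailbreak", "user_id": "u1", "ts": 10.0},
    {"category": "jailbreak", "user_id": "u1", "ts": 40.0},
    {"category": "prompt_injection", "user_id": "u2", "ts": 50.0},
]
print(len(deduplicate(events)))  # -> 2
```

Fixed bucketing by `ts // window_seconds` can split a burst that straddles a bucket boundary into two incidents; a sliding window avoids this at the cost of more state.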
Integration with Incident Management
The triage system should integrate with existing incident management platforms (PagerDuty, Opsgenie, Jira, ServiceNow) through standard webhooks:
def format_triage_for_pagerduty(result: TriageResult) -> dict:
"""Format a triage result as a PagerDuty event."""
severity_map = {
TriagePriority.CRITICAL: "critical",
TriagePriority.HIGH: "error",
TriagePriority.MEDIUM: "warning",
TriagePriority.LOW: "info",
TriagePriority.INFORMATIONAL: "info",
}
return {
"routing_key": "YOUR_INTEGRATION_KEY",
"event_action": "trigger",
"payload": {
"summary": f"AI Security: {result.category.value} [{result.priority.name}]",
"severity": severity_map[result.priority],
"source": "ai-incident-triage",
"custom_details": {
"event_id": result.event_id,
"category": result.category.value,
"confidence": result.confidence,
"rationale": result.rationale,
"recommended_actions": result.recommended_actions,
},
},
    }