Automated AI Incident Triage
Building automated triage systems for AI security incidents using rule-based engines, anomaly detection, and LLM-assisted classification.
Overview
As organizations deploy AI systems at scale, the volume of security-relevant events generated by these systems can overwhelm human analysts. A production LLM serving thousands of users generates a continuous stream of potential security signals: anomalous prompts, unusual output patterns, confidence score deviations, latency spikes, and guardrail trigger events. Without automated triage, security teams face a choice between reviewing every alert (unsustainable) and setting thresholds so high that real incidents are missed (dangerous).
Automated AI incident triage applies classification, prioritization, and routing logic to the stream of AI security events, producing a ranked queue of incidents for human review. The goal is not to replace human judgment but to ensure that the most important incidents reach analysts first and that low-priority events are documented without consuming analyst time.
This article covers the architecture and implementation of automated triage systems for AI security incidents, including rule-based engines for known patterns, statistical anomaly detection for novel threats, and evaluation frameworks for measuring triage effectiveness.
Triage Architecture
Event Ingestion
The triage system ingests events from multiple sources across the AI serving stack:
"""
AI incident triage system -- event ingestion and classification.
"""
from dataclasses import dataclass, field
from enum import Enum
from datetime import datetime, timezone
from typing import Any
class EventSource(Enum):
    GUARDRAIL = "guardrail"
MODEL_MONITOR = "model_monitor"
LOG_ANALYZER = "log_analyzer"
USER_REPORT = "user_report"
SECURITY_SCANNER = "security_scanner"
ANOMALY_DETECTOR = "anomaly_detector"
class TriagePriority(Enum):
CRITICAL = 1
HIGH = 2
MEDIUM = 3
LOW = 4
INFORMATIONAL = 5
class IncidentCategory(Enum):
PROMPT_INJECTION = "prompt_injection"
DATA_EXFILTRATION = "data_exfiltration"
MODEL_EVASION = "model_evasion"
    JAILBREAK = "jailbreak"
OUTPUT_ANOMALY = "output_anomaly"
PERFORMANCE_DEGRADATION = "performance_degradation"
UNAUTHORIZED_ACCESS = "unauthorized_access"
DATA_POISONING = "data_poisoning"
MODEL_THEFT = "model_theft"
UNKNOWN = "unknown"
@dataclass
class SecurityEvent:
"""Raw 安全 event from the AI serving stack."""
event_id: str
timestamp: str
source: EventSource
event_type: str
severity_hint: str # Source-provided severity, may be unreliable
payload: dict[str, Any]
model_id: str | None = None
user_id: str | None = None
session_id: str | None = None
ip_address: str | None = None
@dataclass
class TriageResult:
"""Result of automated triage classification."""
event_id: str
priority: TriagePriority
category: IncidentCategory
confidence: float # 0.0 to 1.0
rationale: str
recommended_actions: list[str]
requires_human_review: bool
correlated_events: list[str] = field(default_factory=list)
    tags: list[str] = field(default_factory=list)
Rule-Based Triage Engine
The first triage layer applies deterministic rules that match known incident patterns. These rules are fast, predictable, and auditable.
class RuleBasedTriageEngine:
"""
    Classify AI security events using deterministic rules.
Rules are evaluated in priority order. The first matching rule
determines the triage classification. This ensures that critical
patterns are always caught regardless of statistical model behavior.
"""
def __init__(self):
self.rules: list[dict] = self._build_default_rules()
def _build_default_rules(self) -> list[dict]:
return [
# Critical: Known data exfiltration patterns
{
"name": "data_exfiltration_output",
"priority": TriagePriority.CRITICAL,
"category": IncidentCategory.DATA_EXFILTRATION,
"condition": lambda e: (
e.payload.get("guardrail_triggered") == "pii_output"
and e.payload.get("pii_types_detected", 0) >= 3
),
"rationale": "Multiple PII types detected in model 輸出",
"actions": [
"Block response delivery immediately",
"Preserve full conversation context",
"Notify data protection officer",
],
},
# Critical: Model weight access
{
"name": "model_theft_attempt",
"priority": TriagePriority.CRITICAL,
"category": IncidentCategory.MODEL_THEFT,
"condition": lambda e: (
e.event_type == "api_access"
and e.payload.get("endpoint") in ("/v1/models/weights", "/v1/models/export")
and e.payload.get("authorized") is False
),
"rationale": "Unauthorized model weight access attempt",
"actions": [
"Block requesting IP",
"Revoke associated API keys",
"Full access log review for this user",
],
},
# High: Prompt injection detected
{
"name": "prompt_injection_detected",
"priority": TriagePriority.HIGH,
"category": IncidentCategory.PROMPT_INJECTION,
"condition": lambda e: (
e.payload.get("guardrail_triggered") == "prompt_injection"
and e.payload.get("injection_confidence", 0) > 0.8
),
"rationale": "High-confidence 提示詞注入 detected by 護欄",
"actions": [
"Log full prompt for forensic analysis",
"Check if injection succeeded despite 護欄",
"Review user's recent session history",
],
},
            # High: Jailbreak with safety bypass
{
"name": "jailbreak_safety_bypass",
"priority": TriagePriority.HIGH,
"category": IncidentCategory.JAILBREAK,
"condition": lambda e: (
e.payload.get("guardrail_triggered") == "safety_violation"
and e.payload.get("output_was_delivered", False)
),
"rationale": "安全 violation detected but 輸出 was delivered to user",
"actions": [
"Review delivered 輸出 for harmful content",
"Determine if 護欄 timing was too slow",
"Update 護欄 to block pre-delivery",
],
},
            # Medium: Repeated guardrail triggers from a single user
{
"name": "repeated_guardrail_triggers",
"priority": TriagePriority.MEDIUM,
"category": IncidentCategory.JAILBREAK,
"condition": lambda e: (
e.payload.get("guardrail_triggered") is not None
and e.payload.get("user_trigger_count_1h", 0) > 10
),
"rationale": "User has triggered 護欄 >10 times in 1 hour",
"actions": [
"Apply rate limiting to this user",
"Review full session for attack patterns",
],
},
            # Medium: Output anomaly
{
"name": "output_distribution_anomaly",
"priority": TriagePriority.MEDIUM,
"category": IncidentCategory.OUTPUT_ANOMALY,
"condition": lambda e: (
e.source == EventSource.ANOMALY_DETECTOR
and e.payload.get("anomaly_score", 0) > 3.0
),
"rationale": "Model 輸出 distribution anomaly detected",
"actions": [
"Compare recent outputs against baseline",
"Check for model drift or serving errors",
],
},
            # Low: Single guardrail trigger
{
"name": "single_guardrail_trigger",
"priority": TriagePriority.LOW,
"category": IncidentCategory.UNKNOWN,
"condition": lambda e: (
e.payload.get("guardrail_triggered") is not None
and e.payload.get("user_trigger_count_1h", 0) <= 3
),
"rationale": "Isolated 護欄 trigger, likely benign probing or accidental",
"actions": ["Log and monitor for escalation"],
},
]
def 評估(self, event: SecurityEvent) -> TriageResult | None:
"""評估 an event against all rules, return first match."""
for rule in self.rules:
try:
if rule["condition"](event):
return TriageResult(
event_id=event.event_id,
priority=rule["priority"],
category=rule["category"],
confidence=1.0, # Rule-based = deterministic
rationale=rule["rationale"],
recommended_actions=rule["actions"],
requires_human_review=rule["priority"].value <= 2,
tags=["rule_based", rule["name"]],
)
except (KeyError, TypeError):
continue
        return None
Statistical Anomaly Triage
Events that do not match deterministic rules are evaluated by a statistical anomaly detection layer.
import numpy as np
from collections import deque
class StatisticalTriageEngine:
"""
    Triage AI security events using statistical anomaly detection.
Maintains rolling baselines of event features and flags
statistically significant deviations.
"""
def __init__(self, window_size: int = 1000):
self.window_size = window_size
self.feature_windows: dict[str, deque] = {}
def update_baseline(self, feature_name: str, value: float) -> None:
if feature_name not in self.feature_windows:
self.feature_windows[feature_name] = deque(maxlen=self.window_size)
self.feature_windows[feature_name].append(value)
def compute_anomaly_score(self, feature_name: str, value: float) -> float:
"""Compute z-score anomaly for a feature value against its baseline."""
if feature_name not in self.feature_windows:
return 0.0
window = self.feature_windows[feature_name]
if len(window) < 30:
return 0.0
values = np.array(window)
mean = float(np.mean(values))
std = float(np.std(values))
if std < 1e-10:
return 0.0
return abs(value - mean) / std
def 評估(self, event: SecurityEvent) -> TriageResult | None:
"""
        Evaluate an event using statistical anomaly detection.
Extracts numerical features from the event payload and
computes anomaly scores.
"""
anomaly_scores = {}
        # Extract and evaluate standard features
feature_mappings = {
"response_latency_ms": event.payload.get("latency_ms"),
"output_token_count": event.payload.get("output_tokens"),
"confidence_score": event.payload.get("top_confidence"),
"guardrail_score": event.payload.get("safety_score"),
"input_length": event.payload.get("input_length"),
}
for feature_name, value in feature_mappings.items():
if value is not None:
score = self.compute_anomaly_score(feature_name, float(value))
anomaly_scores[feature_name] = score
self.update_baseline(feature_name, float(value))
if not anomaly_scores:
return None
max_anomaly = max(anomaly_scores.values())
max_feature = max(anomaly_scores, key=anomaly_scores.get)
if max_anomaly < 2.5:
return None # Not anomalous enough
# Map anomaly score to priority
if max_anomaly > 5.0:
priority = TriagePriority.HIGH
elif max_anomaly > 3.5:
priority = TriagePriority.MEDIUM
else:
priority = TriagePriority.LOW
return TriageResult(
event_id=event.event_id,
priority=priority,
category=IncidentCategory.UNKNOWN,
confidence=min(max_anomaly / 5.0, 1.0),
rationale=f"Statistical anomaly detected: {max_feature} (z={max_anomaly:.2f})",
recommended_actions=[
f"Investigate anomalous {max_feature}",
"Compare against recent baseline",
"Check for correlated anomalies in other features",
],
requires_human_review=max_anomaly > 4.0,
tags=["statistical", f"anomaly_{max_feature}"],
        )
Combined Triage Pipeline
class TriagePipeline:
"""
Combined triage pipeline that runs events through multiple
classification engines in priority order.
"""
def __init__(self):
self.rule_engine = RuleBasedTriageEngine()
self.stats_engine = StatisticalTriageEngine()
def triage(self, event: SecurityEvent) -> TriageResult:
"""
        Process a security event through the full triage pipeline.
1. Rule-based engine (highest priority, deterministic)
2. Statistical anomaly engine
3. Default classification if no engine matches
"""
# Try rule-based first
result = self.rule_engine.評估(event)
if result:
return result
        # Try statistical anomaly detection
result = self.stats_engine.評估(event)
if result:
return result
# Default: low-priority informational event
return TriageResult(
event_id=event.event_id,
priority=TriagePriority.INFORMATIONAL,
category=IncidentCategory.UNKNOWN,
confidence=0.5,
rationale="No triage rules or anomalies matched; logged for baseline",
recommended_actions=["No immediate action required"],
requires_human_review=False,
tags=["unclassified"],
        )
Event Correlation
Temporal Correlation
Individual events may appear innocuous in isolation but reveal an attack pattern when correlated temporally.
from collections import defaultdict
class TemporalCorrelator:
"""
    Correlate security events across time windows to detect
multi-step attack patterns.
"""
def __init__(self, correlation_window_seconds: int = 300):
self.window = correlation_window_seconds
self.recent_events: deque[SecurityEvent] = deque()
self.user_event_counts: dict[str, int] = defaultdict(int)
def add_event(self, event: SecurityEvent) -> dict | None:
"""
Add an event and check for temporal correlations.
Returns a correlation alert if a pattern is detected.
"""
now = datetime.now(timezone.utc)
self.recent_events.append(event)
# Expire old events
while self.recent_events:
oldest = self.recent_events[0]
            oldest_time = datetime.fromisoformat(oldest.timestamp)
            # Treat naive ISO timestamps as UTC so subtracting from the
            # timezone-aware `now` cannot raise a TypeError
            if oldest_time.tzinfo is None:
                oldest_time = oldest_time.replace(tzinfo=timezone.utc)
            if (now - oldest_time).total_seconds() > self.window:
self.recent_events.popleft()
else:
break
# Track per-user event frequency
if event.user_id:
self.user_event_counts[event.user_id] += 1
# Check for reconnaissance pattern: multiple different
# attack types from the same user in the window
if event.user_id:
user_events = [
e for e in self.recent_events
if e.user_id == event.user_id
]
event_types = set(e.event_type for e in user_events)
if len(event_types) >= 3 and len(user_events) >= 5:
return {
"correlation_type": "reconnaissance_pattern",
"user_id": event.user_id,
"event_count": len(user_events),
"distinct_types": list(event_types),
"severity": "HIGH",
"description": (
f"User {event.user_id} has generated {len(user_events)} "
f"events of {len(event_types)} different types in "
f"{self.window}s, suggesting systematic probing"
),
}
        return None
Evaluation and Metrics
Triage Performance Metrics
@dataclass
class TriageMetrics:
"""Performance metrics for the triage system."""
total_events: int
true_positives: int # Correctly escalated real incidents
false_positives: int # Incorrectly escalated non-incidents
true_negatives: int # Correctly dismissed non-incidents
false_negatives: int # Missed real incidents (most dangerous)
mean_triage_time_ms: float # Average time to produce triage result
mean_time_to_human_review_minutes: float
@property
def precision(self) -> float:
denom = self.true_positives + self.false_positives
return self.true_positives / denom if denom > 0 else 0.0
@property
def recall(self) -> float:
denom = self.true_positives + self.false_negatives
return self.true_positives / denom if denom > 0 else 0.0
@property
def f1_score(self) -> float:
p, r = self.precision, self.recall
        return 2 * p * r / (p + r) if (p + r) > 0 else 0.0
Tuning for AI Security
In AI security triage, the cost of a false negative (a missed incident) is typically far higher than the cost of a false positive (an unnecessary escalation). Triage systems should therefore be tuned for high recall, accepting a higher false positive rate:
| Use Case | Target Recall | Acceptable FP Rate | Rationale |
|---|---|---|---|
| Safety-critical AI (medical, autonomous) | > 99% | Up to 30% | Missing an incident could cause physical harm |
| Customer-facing LLM | > 95% | Up to 20% | Data breach or harmful output risk |
| Internal tooling | > 90% | Up to 15% | Lower external exposure |
| Research/experimentation | > 80% | Up to 10% | Controlled environment |
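These targets can be checked directly using the precision/recall definitions from the TriageMetrics class above. A minimal standalone sketch, with hypothetical confusion-matrix counts for a customer-facing LLM deployment (the numbers are illustrative, not measurements from a real system):

```python
# Hypothetical confusion-matrix counts from a labeled evaluation set
# for a customer-facing LLM deployment (illustrative numbers).
true_positives = 190   # real incidents correctly escalated
false_negatives = 6    # real incidents missed (the costly case)
false_positives = 48   # benign events escalated for review
true_negatives = 756   # benign events correctly dismissed

recall = true_positives / (true_positives + false_negatives)
fp_rate = false_positives / (false_positives + true_negatives)

# Customer-facing LLM row of the table: recall > 95%, FP rate up to 20%.
meets_recall_target = recall > 0.95
within_fp_budget = fp_rate <= 0.20

print(f"recall={recall:.3f} fp_rate={fp_rate:.3f}")
print(f"meets targets: {meets_recall_target and within_fp_budget}")
```

Tuning then becomes a loop: lower thresholds until recall clears the target, then spend the remaining false-positive budget on the noisiest rules.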
Operational Considerations
Alert Fatigue Management
High false positive rates cause alert fatigue, reducing analyst effectiveness. Mitigate through:
- Progressive disclosure: Show summary first, expand details on demand
- Deduplication: Group related events into single incidents
- Adaptive thresholds: Automatically raise thresholds for event types with consistently high false positive rates
- Analyst feedback loops: Allow analysts to mark false positives, feeding back into rule and model tuning
Integration with Incident Management
The triage system should integrate with existing incident management platforms (PagerDuty, Opsgenie, Jira, ServiceNow) through standard webhooks:
def format_triage_for_pagerduty(result: TriageResult) -> dict:
"""Format a triage result as a PagerDuty event."""
severity_map = {
TriagePriority.CRITICAL: "critical",
TriagePriority.HIGH: "error",
TriagePriority.MEDIUM: "warning",
TriagePriority.LOW: "info",
TriagePriority.INFORMATIONAL: "info",
}
return {
"routing_key": "YOUR_INTEGRATION_KEY",
"event_action": "trigger",
"payload": {
"summary": f"AI 安全: {result.category.value} [{result.priority.name}]",
"severity": severity_map[result.priority],
"source": "ai-incident-triage",
"custom_details": {
"event_id": result.event_id,
"category": result.category.value,
"confidence": result.confidence,
"rationale": result.rationale,
"recommended_actions": result.recommended_actions,
},
},
    }
References
- OWASP. (2025). OWASP Top 10 for Large Language Model Applications. https://owasp.org/www-project-top-10-for-large-language-model-applications/
- MITRE ATLAS. (2024). Adversarial Threat Landscape for Artificial Intelligence Systems. https://atlas.mitre.org/
- NIST. (2023). Artificial Intelligence Risk Management Framework (AI RMF 1.0). NIST AI 100-1. https://doi.org/10.6028/NIST.AI.100-1
- Bhatt, S. et al. (2014). The Operational Role of Security Information and Event Management Systems. IEEE Security & Privacy, 12(5). https://doi.org/10.1109/MSP.2014.103