Automated AI Incident Triage
Building automated triage systems for AI security incidents using rule-based engines, anomaly detection, and LLM-assisted classification.
Overview
As organizations deploy AI systems at scale, the volume of security-relevant events these systems generate can overwhelm human analysts. A production LLM serving thousands of users produces a continuous stream of potential security signals: anomalous prompts, unusual output patterns, confidence-score deviations, latency spikes, and guardrail trigger events. Without automated triage, security teams face a choice between reviewing every alert (unsustainable) and raising thresholds so high that real incidents are missed (dangerous).
Automated AI incident triage applies classification, prioritization, and routing logic to the stream of AI security events and produces a ranked queue of incidents for human review. The goal is not to replace human judgment but to make sure the most important incidents reach analysts first and that low-priority events are documented without consuming analyst time.
This article covers the architecture and implementation of automated triage systems for AI security incidents, including rule-based engines for known patterns, statistical anomaly detection for novel threats, and evaluation frameworks for measuring triage effectiveness.
Triage architecture
Event ingestion
The triage system ingests events from multiple sources across the AI serving stack:
```python
"""
AI incident triage system -- event ingestion and classification.
"""
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any


class EventSource(Enum):
    GUARDRAIL = "guardrail"
    MODEL_MONITOR = "model_monitor"
    LOG_ANALYZER = "log_analyzer"
    USER_REPORT = "user_report"
    SECURITY_SCANNER = "security_scanner"
    ANOMALY_DETECTOR = "anomaly_detector"


class TriagePriority(Enum):
    CRITICAL = 1
    HIGH = 2
    MEDIUM = 3
    LOW = 4
    INFORMATIONAL = 5


class IncidentCategory(Enum):
    PROMPT_INJECTION = "prompt_injection"
    DATA_EXFILTRATION = "data_exfiltration"
    MODEL_EVASION = "model_evasion"
    JAILBREAK = "jailbreak"
    OUTPUT_ANOMALY = "output_anomaly"
    PERFORMANCE_DEGRADATION = "performance_degradation"
    UNAUTHORIZED_ACCESS = "unauthorized_access"
    DATA_POISONING = "data_poisoning"
    MODEL_THEFT = "model_theft"
    UNKNOWN = "unknown"


@dataclass
class SecurityEvent:
    """Raw security event from the AI serving stack."""
    event_id: str
    timestamp: str  # ISO 8601 string, parsed later with datetime.fromisoformat
    source: EventSource
    event_type: str
    severity_hint: str  # Source-provided severity, may be unreliable
    payload: dict[str, Any]
    model_id: str | None = None
    user_id: str | None = None
    session_id: str | None = None
    ip_address: str | None = None


@dataclass
class TriageResult:
    """Result of automated triage classification."""
    event_id: str
    priority: TriagePriority
    category: IncidentCategory
    confidence: float  # 0.0 to 1.0
    rationale: str
    recommended_actions: list[str]
    requires_human_review: bool
    correlated_events: list[str] = field(default_factory=list)
    tags: list[str] = field(default_factory=list)
```
Rule-based triage engine
The first triage layer applies deterministic rules that match known incident patterns. These rules are fast, predictable, and auditable.
```python
class RuleBasedTriageEngine:
    """
    Classify AI security events using deterministic rules.

    Rules are evaluated in priority order. The first matching rule
    determines the triage classification. This ensures that critical
    patterns are always caught regardless of statistical model behavior.
    """

    def __init__(self):
        self.rules: list[dict] = self._build_default_rules()

    def _build_default_rules(self) -> list[dict]:
        return [
            # Critical: Known data exfiltration patterns
            {
                "name": "data_exfiltration_output",
                "priority": TriagePriority.CRITICAL,
                "category": IncidentCategory.DATA_EXFILTRATION,
                "condition": lambda e: (
                    e.payload.get("guardrail_triggered") == "pii_output"
                    and e.payload.get("pii_types_detected", 0) >= 3
                ),
                "rationale": "Multiple PII types detected in model output",
                "actions": [
                    "Block response delivery immediately",
                    "Preserve full conversation context",
                    "Notify data protection officer",
                ],
            },
            # Critical: Model weight access
            {
                "name": "model_theft_attempt",
                "priority": TriagePriority.CRITICAL,
                "category": IncidentCategory.MODEL_THEFT,
                "condition": lambda e: (
                    e.event_type == "api_access"
                    and e.payload.get("endpoint") in ("/v1/models/weights", "/v1/models/export")
                    and e.payload.get("authorized") is False
                ),
                "rationale": "Unauthorized model weight access attempt",
                "actions": [
                    "Block requesting IP",
                    "Revoke associated API keys",
                    "Full access log review for this user",
                ],
            },
            # High: Prompt injection detected
            {
                "name": "prompt_injection_detected",
                "priority": TriagePriority.HIGH,
                "category": IncidentCategory.PROMPT_INJECTION,
                "condition": lambda e: (
                    e.payload.get("guardrail_triggered") == "prompt_injection"
                    and e.payload.get("injection_confidence", 0) > 0.8
                ),
                "rationale": "High-confidence prompt injection detected by guardrail",
                "actions": [
                    "Log full prompt for forensic analysis",
                    "Check if injection succeeded despite guardrail",
                    "Review user's recent session history",
                ],
            },
            # High: Jailbreak with safety bypass
            {
                "name": "jailbreak_safety_bypass",
                "priority": TriagePriority.HIGH,
                "category": IncidentCategory.JAILBREAK,
                "condition": lambda e: (
                    e.payload.get("guardrail_triggered") == "safety_violation"
                    and e.payload.get("output_was_delivered", False)
                ),
                "rationale": "Safety violation detected but output was delivered to user",
                "actions": [
                    "Review delivered output for harmful content",
                    "Determine if guardrail timing was too slow",
                    "Update guardrail to block pre-delivery",
                ],
            },
            # Medium: Repeated guardrail triggers from single user
            {
                "name": "repeated_guardrail_triggers",
                "priority": TriagePriority.MEDIUM,
                "category": IncidentCategory.JAILBREAK,
                "condition": lambda e: (
                    e.payload.get("guardrail_triggered") is not None
                    and e.payload.get("user_trigger_count_1h", 0) > 10
                ),
                "rationale": "User has triggered guardrails >10 times in 1 hour",
                "actions": [
                    "Apply rate limiting to this user",
                    "Review full session for attack patterns",
                ],
            },
            # Medium: Output anomaly
            {
                "name": "output_distribution_anomaly",
                "priority": TriagePriority.MEDIUM,
                "category": IncidentCategory.OUTPUT_ANOMALY,
                "condition": lambda e: (
                    e.source == EventSource.ANOMALY_DETECTOR
                    and e.payload.get("anomaly_score", 0) > 3.0
                ),
                "rationale": "Model output distribution anomaly detected",
                "actions": [
                    "Compare recent outputs against baseline",
                    "Check for model drift or serving errors",
                ],
            },
            # Low: Single guardrail trigger.
            # Note: trigger counts of 4-10 match neither this rule nor the
            # repeated-trigger rule, so those events fall through to the
            # next triage layer.
            {
                "name": "single_guardrail_trigger",
                "priority": TriagePriority.LOW,
                "category": IncidentCategory.UNKNOWN,
                "condition": lambda e: (
                    e.payload.get("guardrail_triggered") is not None
                    and e.payload.get("user_trigger_count_1h", 0) <= 3
                ),
                "rationale": "Isolated guardrail trigger, likely benign probing or accidental",
                "actions": ["Log and monitor for escalation"],
            },
        ]

    def evaluate(self, event: SecurityEvent) -> TriageResult | None:
        """Evaluate an event against all rules and return the first match."""
        for rule in self.rules:
            try:
                if rule["condition"](event):
                    return TriageResult(
                        event_id=event.event_id,
                        priority=rule["priority"],
                        category=rule["category"],
                        confidence=1.0,  # Rule-based = deterministic
                        rationale=rule["rationale"],
                        # Copy so callers cannot mutate the shared rule definition.
                        recommended_actions=list(rule["actions"]),
                        requires_human_review=rule["priority"].value <= 2,
                        tags=["rule_based", rule["name"]],
                    )
            except (KeyError, TypeError):
                # A malformed payload skips this rule instead of aborting triage.
                continue
        return None
```
Statistical anomaly triage
Events that do not match any deterministic rule are evaluated by a statistical anomaly detection layer.
```python
import numpy as np
from collections import deque


class StatisticalTriageEngine:
    """
    Triage AI security events using statistical anomaly detection.

    Maintains rolling baselines of event features and flags
    statistically significant deviations.
    """

    def __init__(self, window_size: int = 1000):
        self.window_size = window_size
        self.feature_windows: dict[str, deque] = {}

    def update_baseline(self, feature_name: str, value: float) -> None:
        if feature_name not in self.feature_windows:
            self.feature_windows[feature_name] = deque(maxlen=self.window_size)
        self.feature_windows[feature_name].append(value)

    def compute_anomaly_score(self, feature_name: str, value: float) -> float:
        """Compute the absolute z-score of a feature value against its baseline."""
        if feature_name not in self.feature_windows:
            return 0.0
        window = self.feature_windows[feature_name]
        if len(window) < 30:
            return 0.0  # Too few samples for a meaningful baseline
        values = np.array(window)
        mean = float(np.mean(values))
        std = float(np.std(values))
        if std < 1e-10:
            return 0.0  # Constant baseline; avoid division by zero
        return abs(value - mean) / std

    def evaluate(self, event: SecurityEvent) -> TriageResult | None:
        """
        Evaluate an event using statistical anomaly detection.

        Extracts numeric features from the event payload and
        computes anomaly scores.
        """
        anomaly_scores = {}
        # Extract and evaluate the standard features
        feature_mappings = {
            "response_latency_ms": event.payload.get("latency_ms"),
            "output_token_count": event.payload.get("output_tokens"),
            "confidence_score": event.payload.get("top_confidence"),
            "guardrail_score": event.payload.get("safety_score"),
            "input_length": event.payload.get("input_length"),
        }
        for feature_name, value in feature_mappings.items():
            if value is not None:
                # Score first, then update, so the event does not
                # dilute its own anomaly score.
                score = self.compute_anomaly_score(feature_name, float(value))
                anomaly_scores[feature_name] = score
                self.update_baseline(feature_name, float(value))
        if not anomaly_scores:
            return None
        max_anomaly = max(anomaly_scores.values())
        max_feature = max(anomaly_scores, key=anomaly_scores.get)
        if max_anomaly < 2.5:
            return None  # Not anomalous enough
        # Map the anomaly score to a priority
        if max_anomaly > 5.0:
            priority = TriagePriority.HIGH
        elif max_anomaly > 3.5:
            priority = TriagePriority.MEDIUM
        else:
            priority = TriagePriority.LOW
        return TriageResult(
            event_id=event.event_id,
            priority=priority,
            category=IncidentCategory.UNKNOWN,
            confidence=min(max_anomaly / 5.0, 1.0),
            rationale=f"Statistical anomaly detected: {max_feature} (z={max_anomaly:.2f})",
            recommended_actions=[
                f"Investigate anomalous {max_feature}",
                "Compare against recent baseline",
                "Check for correlated anomalies in other features",
            ],
            requires_human_review=max_anomaly > 4.0,
            tags=["statistical", f"anomaly_{max_feature}"],
        )
```
Combined triage pipeline
```python
class TriagePipeline:
    """
    Combined triage pipeline that runs events through multiple
    classification engines in priority order.
    """

    def __init__(self):
        self.rule_engine = RuleBasedTriageEngine()
        self.stats_engine = StatisticalTriageEngine()

    def triage(self, event: SecurityEvent) -> TriageResult:
        """
        Process a security event through the full triage pipeline.

        1. Rule-based engine (highest priority, deterministic)
        2. Statistical anomaly engine
        3. Default classification if no engine matches
        """
        # Try the rule-based engine first
        result = self.rule_engine.evaluate(event)
        if result:
            return result
        # Then try statistical anomaly detection
        result = self.stats_engine.evaluate(event)
        if result:
            return result
        # Default: informational event with the lowest priority
        return TriageResult(
            event_id=event.event_id,
            priority=TriagePriority.INFORMATIONAL,
            category=IncidentCategory.UNKNOWN,
            confidence=0.5,
            rationale="No triage rules or anomalies matched; logged for baseline",
            recommended_actions=["No immediate action required"],
            requires_human_review=False,
            tags=["unclassified"],
        )
```
Event correlation
Temporal correlation
Individual events can look harmless in isolation yet reveal an attack pattern when correlated over time.
```python
from collections import defaultdict, deque


class TemporalCorrelator:
    """
    Correlate security events across time windows to detect
    multi-step attack patterns.
    """

    def __init__(self, correlation_window_seconds: int = 300):
        self.window = correlation_window_seconds
        self.recent_events: deque[SecurityEvent] = deque()
        # Cumulative per-user totals; unlike recent_events, these never expire.
        self.user_event_counts: dict[str, int] = defaultdict(int)

    @staticmethod
    def _parse_timestamp(timestamp: str) -> datetime:
        """Parse an ISO 8601 timestamp, treating naive values as UTC."""
        parsed = datetime.fromisoformat(timestamp)
        if parsed.tzinfo is None:
            # Subtracting a naive datetime from an aware one raises TypeError.
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    def add_event(self, event: SecurityEvent) -> dict | None:
        """
        Add an event and check for temporal correlations.

        Returns a correlation alert if a pattern is detected.
        """
        now = datetime.now(timezone.utc)
        self.recent_events.append(event)
        # Expire events older than the correlation window
        while self.recent_events:
            oldest = self.recent_events[0]
            oldest_time = self._parse_timestamp(oldest.timestamp)
            if (now - oldest_time).total_seconds() > self.window:
                self.recent_events.popleft()
            else:
                break
        # Track event frequency per user
        if event.user_id:
            self.user_event_counts[event.user_id] += 1
        # Check for a reconnaissance pattern: several distinct event
        # types from the same user within the window
        if event.user_id:
            user_events = [
                e for e in self.recent_events
                if e.user_id == event.user_id
            ]
            event_types = set(e.event_type for e in user_events)
            if len(event_types) >= 3 and len(user_events) >= 5:
                return {
                    "correlation_type": "reconnaissance_pattern",
                    "user_id": event.user_id,
                    "event_count": len(user_events),
                    "distinct_types": list(event_types),
                    "severity": "HIGH",
                    "description": (
                        f"User {event.user_id} has generated {len(user_events)} "
                        f"events of {len(event_types)} different types in "
                        f"{self.window}s, suggesting systematic probing"
                    ),
                }
        return None
```
Evaluation and metrics
Triage performance metrics
```python
@dataclass
class TriageMetrics:
    """Performance metrics for the triage system."""
    total_events: int
    true_positives: int   # Correctly escalated real incidents
    false_positives: int  # Incorrectly escalated non-incidents
    true_negatives: int   # Correctly dismissed non-incidents
    false_negatives: int  # Missed real incidents (most dangerous)
    mean_triage_time_ms: float  # Average time to produce triage result
    mean_time_to_human_review_minutes: float

    @property
    def precision(self) -> float:
        denom = self.true_positives + self.false_positives
        return self.true_positives / denom if denom > 0 else 0.0

    @property
    def recall(self) -> float:
        denom = self.true_positives + self.false_negatives
        return self.true_positives / denom if denom > 0 else 0.0

    @property
    def f1_score(self) -> float:
        p, r = self.precision, self.recall
        return 2 * p * r / (p + r) if (p + r) > 0 else 0.0
```
Tuning for AI security
In AI security triage, the cost of false negatives (missed incidents) is typically much higher than the cost of false positives (unnecessary escalations). Triage systems should be tuned for high recall, accepting a higher false-positive rate:
| Use case | Target recall | Acceptable FP rate | Rationale |
|---|---|---|---|
| Safety-critical AI (medical, autonomous) | > 99% | Up to 30% | A missed incident can cause physical harm |
| Customer-facing LLM | > 95% | Up to 20% | Risk of data breach or harmful output |
| Internal tooling | > 90% | Up to 15% | Lower external exposure |
| Research/experimentation | > 80% | Up to 10% | Controlled environment |
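One way to tune toward a recall target is to pick the escalation threshold from a labeled sample of past events. The sketch below is illustrative only: `threshold_for_recall` and `escalation_stats` are hypothetical helpers, the scores and labels are made-up data, and it assumes the FP rate means FP / (FP + TN), which the table does not specify.

```python
import math


def threshold_for_recall(scores, labels, target_recall):
    """Return the highest observed threshold at which recall >= target_recall.

    An event is escalated when its score is >= the threshold.
    Returns None if the labeled sample contains no real incidents.
    """
    incident_scores = sorted(
        (s for s, is_incident in zip(scores, labels) if is_incident),
        reverse=True,
    )
    if not incident_scores:
        return None
    # Number of real incidents that must be escalated to meet the target.
    # The small epsilon guards against floating-point round-up in ceil().
    needed = max(1, math.ceil(target_recall * len(incident_scores) - 1e-9))
    return incident_scores[needed - 1]


def escalation_stats(scores, labels, threshold):
    """Compute recall and false-positive rate at a given threshold."""
    tp = fp = tn = fn = 0
    for score, is_incident in zip(scores, labels):
        escalated = score >= threshold
        if escalated and is_incident:
            tp += 1
        elif escalated:
            fp += 1
        elif is_incident:
            fn += 1
        else:
            tn += 1
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    fp_rate = fp / (fp + tn) if (fp + tn) else 0.0
    return {"recall": recall, "fp_rate": fp_rate}


# Made-up anomaly scores with analyst-confirmed labels (1 = real incident).
scores = [0.95, 0.90, 0.85, 0.70, 0.60, 0.55, 0.40, 0.30, 0.20, 0.10]
labels = [1, 1, 0, 1, 0, 1, 0, 1, 0, 0]

threshold = threshold_for_recall(scores, labels, target_recall=0.8)
stats = escalation_stats(scores, labels, threshold)
print(threshold, stats)
```

On this toy sample the 80% recall target is met at a threshold of 0.55, but the resulting false-positive rate is 40%. When a target cannot be reached within the acceptable FP rate, that usually means the scoring signal needs improvement; moving the threshold alone will not fix it.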
Operational considerations
Alert fatigue management
High false-positive rates cause alert fatigue, which reduces analyst effectiveness. Mitigate this through:
- Progressive disclosure: Show a summary first and expand details on request
- Deduplication: Group related events into a single incident
- Adaptive thresholds: Automatically raise thresholds for event types with consistently high false-positive rates
- Analyst feedback loops: Let analysts flag false positives, feeding back into rule and model tuning
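Of the mitigations above, deduplication is the easiest to sketch. The example below groups alerts that share a user and category and arrive within a fixed gap of each other. The `Alert` and `Incident` types, the `deduplicate` helper, the grouping key, and the 10-minute window are all assumptions chosen for illustration; a real deployment would pick a key and window that match its own event model.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta


@dataclass
class Alert:
    event_id: str
    user_id: str
    category: str
    timestamp: datetime


@dataclass
class Incident:
    key: tuple[str, str]  # (user_id, category)
    first_seen: datetime
    last_seen: datetime
    event_ids: list[str] = field(default_factory=list)


def deduplicate(alerts, window=timedelta(minutes=10)):
    """Merge alerts with the same key whose gap to the previous alert is <= window."""
    incidents: list[Incident] = []
    open_by_key: dict[tuple[str, str], Incident] = {}
    for alert in sorted(alerts, key=lambda a: a.timestamp):
        key = (alert.user_id, alert.category)
        current = open_by_key.get(key)
        if current is not None and alert.timestamp - current.last_seen <= window:
            # Same user and category, close in time: extend the open incident.
            current.event_ids.append(alert.event_id)
            current.last_seen = alert.timestamp
        else:
            # First alert for this key, or the gap is too large: start a new incident.
            current = Incident(key, alert.timestamp, alert.timestamp, [alert.event_id])
            incidents.append(current)
            open_by_key[key] = current
    return incidents


base = datetime(2025, 1, 1, 12, 0)
alerts = [
    Alert("a1", "u1", "jailbreak", base),
    Alert("a2", "u1", "jailbreak", base + timedelta(minutes=4)),
    Alert("a3", "u2", "prompt_injection", base + timedelta(minutes=5)),
    Alert("a4", "u1", "jailbreak", base + timedelta(minutes=30)),
]
incidents = deduplicate(alerts)
grouped = [i.event_ids for i in incidents]
print(grouped)  # [['a1', 'a2'], ['a3'], ['a4']]
```

Four alerts collapse into three incidents: the two jailbreak alerts from `u1` that arrive four minutes apart are merged, while the one 30 minutes later opens a new incident.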
Integration with incident management
The triage system should integrate with existing incident management platforms (PagerDuty, Opsgenie, Jira, ServiceNow) via standard webhooks:
```python
def format_triage_for_pagerduty(result: TriageResult) -> dict:
    """Format a triage result as a PagerDuty event."""
    severity_map = {
        TriagePriority.CRITICAL: "critical",
        TriagePriority.HIGH: "error",
        TriagePriority.MEDIUM: "warning",
        TriagePriority.LOW: "info",
        TriagePriority.INFORMATIONAL: "info",
    }
    return {
        # Placeholder; load the real integration key from configuration.
        "routing_key": "YOUR_INTEGRATION_KEY",
        "event_action": "trigger",
        "payload": {
            "summary": f"AI Security: {result.category.value} [{result.priority.name}]",
            "severity": severity_map[result.priority],
            "source": "ai-incident-triage",
            "custom_details": {
                "event_id": result.event_id,
                "category": result.category.value,
                "confidence": result.confidence,
                "rationale": result.rationale,
                "recommended_actions": result.recommended_actions,
            },
        },
    }
```
References
- OWASP. (2025). OWASP Top 10 for Large Language Model Applications. https://owasp.org/www-project-top-10-for-large-language-model-applications/
- MITRE ATLAS. (2024). Adversarial Threat Landscape for Artificial Intelligence Systems. https://atlas.mitre.org/
- NIST. (2023). Artificial Intelligence Risk Management Framework (AI RMF 1.0). NIST AI 100-1. https://doi.org/10.6028/NIST.AI.100-1
- Bhatt, S. et al. (2014). The Operational Role of Security Information and Event Management Systems. IEEE Security & Privacy, 12(5). https://doi.org/10.1109/MSP.2014.103