Capstone: Build an AI Incident Response System
Design and implement an incident response system purpose-built for AI security incidents, including prompt injection breaches, model manipulation, and data exfiltration through LLM applications.
Overview
When an AI system is compromised — whether through prompt injection, model manipulation, or data poisoning — organizations need an incident response capability specifically designed for AI. Traditional IR playbooks assume discrete events with clear artifacts (malicious binaries, network connections, file modifications). AI incidents are fundamentally different: the "exploit" is natural language, the "payload" is a change in model behavior, and the "indicators of compromise" are subtle shifts in output distributions.
This capstone project tasks you with building an AI Incident Response System (AIRS) that handles the full incident lifecycle: detection of anomalous AI behavior, automated triage and severity assessment, containment actions specific to AI systems, forensic analysis of conversation and interaction logs, and post-incident reporting that maps findings to AI risk frameworks.
The system processes telemetry from AI applications — API logs, model outputs, safety classifier scores, and user feedback signals — and correlates these signals to detect incidents that no single signal would reveal. For example, a gradual increase in safety classifier near-misses combined with unusual prompt patterns from a single IP might indicate an active jailbreak campaign that has not yet fully succeeded.
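The correlation idea can be sketched in a few lines. This is a minimal illustration only — the threshold values and the flat event dicts are made up for the example, standing in for the richer event schema developed later in the project:

```python
from collections import defaultdict

# Hypothetical thresholds for illustration only.
NEAR_MISS_BAND = (0.5, 0.8)  # scores close to, but below, the block threshold
NEAR_MISS_LIMIT = 5


def correlate_near_misses(events: list[dict]) -> set[str]:
    """Flag source IPs whose safety scores repeatedly land just under the block threshold.

    Each event is a dict like {"ip": "...", "safety_score": 0.7} — a stand-in
    for the normalized AIEvent schema built in Phase 1.
    """
    near_misses: dict[str, int] = defaultdict(int)
    lo, hi = NEAR_MISS_BAND
    for ev in events:
        if lo <= ev["safety_score"] < hi:
            near_misses[ev["ip"]] += 1
    return {ip for ip, count in near_misses.items() if count >= NEAR_MISS_LIMIT}


events = [{"ip": "203.0.113.7", "safety_score": 0.65}] * 6 + [
    {"ip": "198.51.100.2", "safety_score": 0.1}
]
print(correlate_near_misses(events))  # {'203.0.113.7'}
```

No single one of these events would trip a per-request filter; only the aggregate pattern across a window reveals the probing behavior.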
Project Requirements
Functional Requirements
- Detection Engine — Real-time processing of AI application telemetry with configurable detection rules and anomaly detection models.
- Triage System — Automated severity assessment based on attack type, affected model capabilities, data sensitivity, and blast radius.
- Containment Playbooks — Automated and semi-automated response actions:
  - Rate-limit or block suspicious sources
  - Switch to a more restricted model configuration
  - Enable enhanced logging for forensic capture
  - Temporarily disable specific model capabilities (tool use, code execution)
- Forensic Analyzer — Tools for investigating incidents after detection:
  - Conversation reconstruction from logs
  - Attack chain visualization
  - Prompt evolution analysis (how the attacker refined their technique)
  - Impact assessment (what data was exposed, what actions were taken)
- Reporting and Notification — Integration with existing incident management systems (PagerDuty, Slack, JIRA) and structured incident reports.
Non-Functional Requirements
- Detection latency under 30 seconds from event ingestion to alert.
- Must handle at least 10,000 events per second.
- All detection rules must be version-controlled and auditable.
- The system must operate independently of the AI systems it monitors (no shared infrastructure).
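One way to satisfy the version-control requirement — sketched here with hypothetical field names — is to define rule thresholds declaratively in a file that lives in the repository, so every threshold change goes through code review and shows up in git history:

```python
import json

# A rules file like this would live in the repo and be reviewed like any code change.
RULES_JSON = """
[
  {"name": "repeated_blocks", "severity": "high", "threshold": 10, "throttle_seconds": 120},
  {"name": "high_volume_session", "severity": "medium", "threshold": 100, "throttle_seconds": 600}
]
"""


def load_rule_configs(raw: str) -> dict[str, dict]:
    """Parse and index rule definitions by name, validating required fields."""
    configs: dict[str, dict] = {}
    for entry in json.loads(raw):
        for field in ("name", "severity", "threshold"):
            if field not in entry:
                raise ValueError(f"rule missing required field: {field}")
        configs[entry["name"]] = entry
    return configs


rules = load_rule_configs(RULES_JSON)
print(rules["repeated_blocks"]["threshold"])  # 10
```

Failing fast on a malformed rule file at startup is deliberate: a silently ignored detection rule is worse than a crashed deploy.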
Implementation Guide
Phase 1: Event Ingestion and Normalization
Build the event pipeline that ingests telemetry from diverse AI application sources.
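To make the normalization problem concrete, here is a hypothetical raw log entry of the shape the pipeline must handle — the exact field names your platform emits will differ, which is precisely why normalization comes first:

```python
# A hypothetical raw OpenAI-style log entry (field names are assumptions).
raw_log = {
    "timestamp": "2024-06-01T12:00:00",
    "application": "support-chatbot",
    "session_id": "sess-42",
    "user_id": "u-1001",
    "client_ip": "203.0.113.7",
    "model": "gpt-4o",
    "request": {
        "temperature": 0.7,
        "messages": [
            {"role": "system", "content": "You are a support assistant."},
            {"role": "user", "content": "Ignore previous instructions and ..."},
        ],
    },
}

# The normalizer's core extraction step: pull the most recent user message
# so downstream detection rules see one consistent schema.
messages = raw_log["request"]["messages"]
user_message = next(
    (m["content"] for m in reversed(messages) if m["role"] == "user"), ""
)
print(user_message[:6])  # Ignore
```

Every downstream component (detection, forensics, reporting) then works against the single normalized schema rather than per-platform log formats.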
# airs/events.py
"""Event schema and ingestion pipeline for AI telemetry."""
from __future__ import annotations

import enum
import hashlib
from datetime import datetime
from typing import Any

from pydantic import BaseModel, Field


class EventType(str, enum.Enum):
    """Types of events the AIRS system processes."""

    API_REQUEST = "api_request"
    API_RESPONSE = "api_response"
    SAFETY_CLASSIFIER = "safety_classifier"
    CONTENT_FILTER = "content_filter"
    USER_FEEDBACK = "user_feedback"
    TOOL_INVOCATION = "tool_invocation"
    RATE_LIMIT = "rate_limit"
    ERROR = "error"
    AUTHENTICATION = "authentication"


class AIEvent(BaseModel):
    """Normalized event from an AI application."""

    event_id: str = ""
    event_type: EventType
    timestamp: datetime
    source_system: str  # Which AI application generated this event
    session_id: str = ""  # Conversation/session identifier
    user_id: str = ""
    source_ip: str = ""
    model_id: str = ""
    content: dict[str, Any] = Field(default_factory=dict)
    metadata: dict[str, Any] = Field(default_factory=dict)

    # Safety-related fields
    safety_score: float | None = None  # 0.0 (safe) to 1.0 (unsafe)
    content_categories: list[str] = Field(default_factory=list)
    blocked: bool = False

    def model_post_init(self, __context: Any) -> None:
        if not self.event_id:
            # Generate a deterministic event ID for deduplication.
            payload = f"{self.timestamp.isoformat()}:{self.source_system}:{self.session_id}:{self.event_type.value}"
            self.event_id = hashlib.sha256(payload.encode()).hexdigest()[:16]


class EventNormalizer:
    """Normalizes events from different AI platform log formats."""

    def normalize_openai_log(self, raw: dict[str, Any]) -> AIEvent:
        """Normalize an OpenAI API log entry."""
        messages = raw.get("request", {}).get("messages", [])
        user_message = ""
        for msg in reversed(messages):
            if msg.get("role") == "user":
                user_message = msg.get("content", "")
                break
        return AIEvent(
            event_type=EventType.API_REQUEST,
            timestamp=datetime.fromisoformat(raw.get("timestamp", datetime.utcnow().isoformat())),
            source_system=raw.get("application", "unknown"),
            session_id=raw.get("session_id", ""),
            user_id=raw.get("user_id", ""),
            source_ip=raw.get("client_ip", ""),
            model_id=raw.get("model", ""),
            content={
                "user_message": user_message,
                "message_count": len(messages),
                "model": raw.get("model", ""),
                "temperature": raw.get("request", {}).get("temperature"),
            },
            metadata=raw.get("metadata", {}),
        )

    def normalize_guardrails_log(self, raw: dict[str, Any]) -> AIEvent:
        """Normalize a NeMo Guardrails or similar guardrail log entry."""
        return AIEvent(
            event_type=EventType.CONTENT_FILTER,
            timestamp=datetime.fromisoformat(raw.get("timestamp", datetime.utcnow().isoformat())),
            source_system=raw.get("application", "guardrails"),
            session_id=raw.get("session_id", ""),
            user_id=raw.get("user_id", ""),
            model_id=raw.get("model", ""),
            content={
                "input_text": raw.get("input", ""),
                "rail_triggered": raw.get("rail_name", ""),
                "action_taken": raw.get("action", ""),
            },
            safety_score=raw.get("score"),
            blocked=raw.get("blocked", False),
            content_categories=raw.get("categories", []),
        )

Phase 2: Detection Rules Engine
# airs/detection.py
"""Detection engine with configurable rules and anomaly detection."""
from __future__ import annotations

import logging
from collections import defaultdict, deque
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any, Callable

from .events import AIEvent, EventType

logger = logging.getLogger(__name__)


@dataclass
class Alert:
    """An alert generated by the detection engine."""

    alert_id: str
    rule_name: str
    severity: str  # "critical", "high", "medium", "low"
    title: str
    description: str
    related_events: list[AIEvent]
    triggered_at: datetime = field(default_factory=datetime.utcnow)
    metadata: dict[str, Any] = field(default_factory=dict)
    acknowledged: bool = False
    resolved: bool = False


class DetectionRule:
    """A single detection rule that evaluates events for suspicious patterns."""

    def __init__(
        self,
        name: str,
        severity: str,
        description: str,
        condition: Callable[[AIEvent, "DetectionContext"], bool],
        throttle_seconds: int = 60,
    ) -> None:
        self.name = name
        self.severity = severity
        self.description = description
        self.condition = condition
        self.throttle_seconds = throttle_seconds
        self._last_fired: datetime | None = None

    def evaluate(self, event: AIEvent, context: "DetectionContext") -> Alert | None:
        """Evaluate the rule against an event. Returns an Alert if triggered."""
        if self._last_fired and (
            datetime.utcnow() - self._last_fired
        ).total_seconds() < self.throttle_seconds:
            return None
        if self.condition(event, context):
            self._last_fired = datetime.utcnow()
            return Alert(
                alert_id=f"{self.name}:{event.event_id}",
                rule_name=self.name,
                severity=self.severity,
                title=f"Detection: {self.name}",
                description=self.description,
                related_events=[event],
            )
        return None


class DetectionContext:
    """Maintains sliding-window state for correlation rules."""

    def __init__(self, window_seconds: int = 300) -> None:
        self._window = timedelta(seconds=window_seconds)
        self._events_by_session: dict[str, deque[AIEvent]] = defaultdict(
            lambda: deque(maxlen=1000)
        )
        self._events_by_ip: dict[str, deque[AIEvent]] = defaultdict(
            lambda: deque(maxlen=1000)
        )
        self._blocked_count_by_ip: dict[str, int] = defaultdict(int)
        self._safety_scores_by_session: dict[str, list[float]] = defaultdict(list)

    def ingest(self, event: AIEvent) -> None:
        """Add an event to the context window."""
        if event.session_id:
            self._events_by_session[event.session_id].append(event)
        if event.source_ip:
            self._events_by_ip[event.source_ip].append(event)
            if event.blocked:
                self._blocked_count_by_ip[event.source_ip] += 1
        if event.safety_score is not None and event.session_id:
            self._safety_scores_by_session[event.session_id].append(event.safety_score)

    def blocked_count_from_ip(self, ip: str, window: timedelta | None = None) -> int:
        """Count blocked requests from an IP within the window."""
        if window is None:
            window = self._window
        cutoff = datetime.utcnow() - window
        return sum(
            1 for e in self._events_by_ip.get(ip, [])
            if e.blocked and e.timestamp > cutoff
        )

    def request_count_from_ip(self, ip: str, window: timedelta | None = None) -> int:
        """Count all requests from an IP within the window."""
        if window is None:
            window = self._window
        cutoff = datetime.utcnow() - window
        return sum(
            1 for e in self._events_by_ip.get(ip, [])
            if e.timestamp > cutoff
        )

    def safety_scores_for_session(self, session_id: str) -> list[float]:
        """Return the safety-score history for a session."""
        return self._safety_scores_by_session.get(session_id, [])

    def session_events(self, session_id: str) -> list[AIEvent]:
        """Return the buffered events for a session."""
        return list(self._events_by_session.get(session_id, []))

    def average_safety_score(self, session_id: str) -> float:
        scores = self._safety_scores_by_session.get(session_id, [])
        return sum(scores) / len(scores) if scores else 0.0

    def session_event_count(self, session_id: str) -> int:
        return len(self._events_by_session.get(session_id, []))


# --- Built-in Detection Rules ---

def _repeated_blocks_rule(event: AIEvent, ctx: DetectionContext) -> bool:
    """Fires when an IP has many blocked requests — likely an active attack."""
    if not event.source_ip:
        return False
    return ctx.blocked_count_from_ip(event.source_ip) >= 10


def _escalating_safety_scores(event: AIEvent, ctx: DetectionContext) -> bool:
    """Fires when safety scores in a session are trending upward — probe refinement."""
    if not event.session_id or event.safety_score is None:
        return False
    scores = ctx.safety_scores_for_session(event.session_id)
    if len(scores) < 5:
        return False
    # Check whether the last 5 scores are monotonically increasing.
    recent = scores[-5:]
    return all(recent[i] <= recent[i + 1] for i in range(len(recent) - 1)) and recent[-1] > 0.7


def _high_volume_single_session(event: AIEvent, ctx: DetectionContext) -> bool:
    """Fires when a single session has an unusually high message count."""
    if not event.session_id:
        return False
    return ctx.session_event_count(event.session_id) > 100


def _content_filter_bypass(event: AIEvent, ctx: DetectionContext) -> bool:
    """Fires on a successful response after multiple content filter triggers."""
    if event.event_type != EventType.API_RESPONSE:
        return False
    if event.blocked:
        return False
    # Check whether this session had multiple prior blocks.
    recent_blocks = sum(1 for e in ctx.session_events(event.session_id) if e.blocked)
    return recent_blocks >= 3


# Registry of built-in rules.
BUILTIN_RULES = [
    DetectionRule(
        name="repeated_blocks",
        severity="high",
        description="Multiple blocked requests from the same IP address indicate active attack probing.",
        condition=_repeated_blocks_rule,
        throttle_seconds=120,
    ),
    DetectionRule(
        name="escalating_safety_scores",
        severity="high",
        description="Safety scores in a session are monotonically increasing, suggesting the attacker is refining their technique.",
        condition=_escalating_safety_scores,
        throttle_seconds=300,
    ),
    DetectionRule(
        name="high_volume_session",
        severity="medium",
        description="A single session has an unusually high number of messages, which may indicate automated probing.",
        condition=_high_volume_single_session,
        throttle_seconds=600,
    ),
    DetectionRule(
        name="content_filter_bypass",
        severity="critical",
        description="A session that previously triggered content filters is now receiving unblocked responses, suggesting a successful bypass.",
        condition=_content_filter_bypass,
        throttle_seconds=60,
    ),
]


class DetectionEngine:
    """Processes events through detection rules and generates alerts."""

    def __init__(self, rules: list[DetectionRule] | None = None) -> None:
        self.rules = rules or BUILTIN_RULES.copy()
        self.context = DetectionContext()
        self.alert_handlers: list[Callable[[Alert], None]] = []

    def register_handler(self, handler: Callable[[Alert], None]) -> None:
        self.alert_handlers.append(handler)

    def process_event(self, event: AIEvent) -> list[Alert]:
        """Process a single event and return any generated alerts."""
        self.context.ingest(event)
        alerts: list[Alert] = []
        for rule in self.rules:
            alert = rule.evaluate(event, self.context)
            if alert:
                alerts.append(alert)
                for handler in self.alert_handlers:
                    try:
                        handler(alert)
                    except Exception:
                        logger.exception("Alert handler failed for %s", alert.alert_id)
        return alerts

Phase 3: Containment Playbooks
# airs/containment.py
"""Automated containment playbooks for AI security incidents."""
from __future__ import annotations

import abc
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Any

from .detection import Alert

logger = logging.getLogger(__name__)


@dataclass
class ContainmentAction:
    """A containment action taken in response to an incident."""

    action_type: str
    target: str  # What was acted upon (IP, session, model endpoint, etc.)
    description: str
    executed_at: datetime
    success: bool
    details: dict[str, Any]
    rollback_instructions: str = ""


class ContainmentPlaybook(abc.ABC):
    """Base class for containment playbooks."""

    name: str
    description: str
    applicable_rules: list[str]  # Which detection rules trigger this playbook

    @abc.abstractmethod
    async def execute(self, alert: Alert) -> list[ContainmentAction]:
        """Execute the containment playbook and return actions taken."""
        ...


class RateLimitPlaybook(ContainmentPlaybook):
    """Apply aggressive rate limiting to a suspicious source."""

    name = "rate_limit"
    description = "Apply strict rate limits to the source IP or user."
    applicable_rules = ["repeated_blocks", "high_volume_session"]

    def __init__(self, rate_limiter: Any) -> None:
        self._rate_limiter = rate_limiter

    async def execute(self, alert: Alert) -> list[ContainmentAction]:
        actions: list[ContainmentAction] = []
        for event in alert.related_events:
            if not event.source_ip:
                continue
            try:
                # Apply a restrictive rate limit: 5 requests per minute.
                await self._rate_limiter.set_limit(
                    key=f"ip:{event.source_ip}",
                    max_requests=5,
                    window_seconds=60,
                )
                actions.append(
                    ContainmentAction(
                        action_type="rate_limit",
                        target=event.source_ip,
                        description=f"Applied 5 req/min limit to IP {event.source_ip}",
                        executed_at=datetime.utcnow(),
                        success=True,
                        details={"max_requests": 5, "window": 60},
                        rollback_instructions=f"Remove rate limit for IP {event.source_ip}",
                    )
                )
            except Exception as exc:
                logger.error("Failed to apply rate limit: %s", exc)
                actions.append(
                    ContainmentAction(
                        action_type="rate_limit",
                        target=event.source_ip,
                        description=f"FAILED to apply rate limit to {event.source_ip}: {exc}",
                        executed_at=datetime.utcnow(),
                        success=False,
                        details={"error": str(exc)},
                    )
                )
        return actions


class ModelDowngradePlaybook(ContainmentPlaybook):
    """Switch to a more restricted model configuration."""

    name = "model_downgrade"
    description = "Switch the affected system to a safer model configuration."
    applicable_rules = ["content_filter_bypass", "escalating_safety_scores"]

    def __init__(self, model_config_manager: Any) -> None:
        self._config_manager = model_config_manager

    async def execute(self, alert: Alert) -> list[ContainmentAction]:
        actions: list[ContainmentAction] = []
        source_systems = {e.source_system for e in alert.related_events}
        for system in source_systems:
            try:
                previous_config = await self._config_manager.get_config(system)
                await self._config_manager.apply_safe_mode(system)
                actions.append(
                    ContainmentAction(
                        action_type="model_downgrade",
                        target=system,
                        description=f"Switched {system} to safe mode configuration",
                        executed_at=datetime.utcnow(),
                        success=True,
                        details={
                            "previous_model": previous_config.get("model", "unknown"),
                            "new_config": "safe_mode",
                        },
                        rollback_instructions=f"Restore {system} to previous config: {previous_config}",
                    )
                )
            except Exception as exc:
                logger.error("Failed to downgrade model for %s: %s", system, exc)
        return actions


class EnhancedLoggingPlaybook(ContainmentPlaybook):
    """Enable verbose logging for forensic capture."""

    name = "enhanced_logging"
    description = "Enable full request/response logging for forensic analysis."
    applicable_rules = ["content_filter_bypass", "escalating_safety_scores", "repeated_blocks"]

    def __init__(self, logging_manager: Any) -> None:
        self._logging_manager = logging_manager

    async def execute(self, alert: Alert) -> list[ContainmentAction]:
        sessions = {e.session_id for e in alert.related_events if e.session_id}
        actions: list[ContainmentAction] = []
        for session_id in sessions:
            try:
                await self._logging_manager.enable_verbose(
                    session_id=session_id,
                    capture_full_content=True,
                    duration_minutes=60,
                )
                actions.append(
                    ContainmentAction(
                        action_type="enhanced_logging",
                        target=session_id,
                        description=f"Enabled verbose logging for session {session_id}",
                        executed_at=datetime.utcnow(),
                        success=True,
                        details={"duration_minutes": 60, "capture_full_content": True},
                    )
                )
            except Exception as exc:
                logger.error("Failed to enable enhanced logging: %s", exc)
        return actions

Phase 4: Forensic Analyzer
# airs/forensics.py
"""Forensic analysis tools for AI security incidents."""
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime
from typing import Any

from .events import AIEvent, EventType


@dataclass
class ConversationTurn:
    """A single turn in a reconstructed conversation."""

    timestamp: datetime
    role: str  # "user", "assistant", "system", "tool"
    content: str
    safety_score: float | None = None
    blocked: bool = False
    metadata: dict[str, Any] | None = None


@dataclass
class AttackChainStep:
    """A step in a reconstructed attack chain."""

    step_number: int
    timestamp: datetime
    technique: str
    description: str
    event: AIEvent
    success: bool
    impact: str = ""


@dataclass
class ForensicReport:
    """Complete forensic analysis of an incident."""

    incident_id: str
    timeline: list[ConversationTurn]
    attack_chain: list[AttackChainStep]
    affected_sessions: list[str]
    affected_users: list[str]
    data_exposure_assessment: str
    technique_summary: str
    recommendations: list[str]


class ForensicAnalyzer:
    """Reconstructs and analyzes AI security incidents from event logs."""

    # Mapping of patterns to attack technique names.
    TECHNIQUE_INDICATORS = {
        "prompt_injection": [
            "ignore", "disregard", "override", "forget", "new instructions",
        ],
        "jailbreak": [
            "DAN", "developer mode", "unrestricted", "no limits", "bypass",
        ],
        "data_exfiltration": [
            "system prompt", "reveal", "show me your", "initial instructions",
            "repeat the above", "print everything",
        ],
        "encoding_attack": [
            "base64", "decode", "rot13", "hex", "unicode",
        ],
    }

    def reconstruct_conversation(
        self, events: list[AIEvent], session_id: str
    ) -> list[ConversationTurn]:
        """Reconstruct the conversation timeline for a session."""
        session_events = sorted(
            [e for e in events if e.session_id == session_id],
            key=lambda e: e.timestamp,
        )
        turns: list[ConversationTurn] = []
        for event in session_events:
            if event.event_type == EventType.API_REQUEST:
                user_msg = event.content.get("user_message", "")
                if user_msg:
                    turns.append(
                        ConversationTurn(
                            timestamp=event.timestamp,
                            role="user",
                            content=user_msg,
                            safety_score=event.safety_score,
                            blocked=event.blocked,
                        )
                    )
            elif event.event_type == EventType.API_RESPONSE:
                assistant_msg = event.content.get("response_text", "")
                if assistant_msg:
                    turns.append(
                        ConversationTurn(
                            timestamp=event.timestamp,
                            role="assistant",
                            content=assistant_msg,
                            safety_score=event.safety_score,
                            blocked=event.blocked,
                        )
                    )
        return turns

    def identify_attack_techniques(
        self, events: list[AIEvent]
    ) -> list[AttackChainStep]:
        """Analyze events to identify the attack techniques used."""
        chain: list[AttackChainStep] = []
        step = 0
        for event in sorted(events, key=lambda e: e.timestamp):
            user_msg = event.content.get("user_message", "").lower()
            if not user_msg:
                continue
            for technique, indicators in self.TECHNIQUE_INDICATORS.items():
                if any(ind.lower() in user_msg for ind in indicators):
                    step += 1
                    chain.append(
                        AttackChainStep(
                            step_number=step,
                            timestamp=event.timestamp,
                            technique=technique,
                            description=f"Detected {technique} indicators in user message",
                            event=event,
                            success=not event.blocked,
                            impact="Response generated" if not event.blocked else "Blocked by filter",
                        )
                    )
        return chain

    def assess_data_exposure(self, events: list[AIEvent]) -> str:
        """Assess what data may have been exposed during the incident."""
        exposed_categories: set[str] = set()
        unblocked_after_attack = 0
        attack_detected = False
        for event in sorted(events, key=lambda e: e.timestamp):
            user_msg = event.content.get("user_message", "").lower()
            # Check whether this looks like an attack message.
            is_attack = any(
                ind.lower() in user_msg
                for indicators in self.TECHNIQUE_INDICATORS.values()
                for ind in indicators
            )
            if is_attack:
                attack_detected = True
            if attack_detected and not event.blocked and event.event_type == EventType.API_RESPONSE:
                unblocked_after_attack += 1
                response = event.content.get("response_text", "")
                if "system prompt" in response.lower() or "instructions" in response.lower():
                    exposed_categories.add("system_prompt")
                if any(kw in response.lower() for kw in ["password", "api key", "secret", "token"]):
                    exposed_categories.add("credentials")
        if not exposed_categories and unblocked_after_attack == 0:
            return "No data exposure detected. All attack attempts were blocked."
        if not exposed_categories:
            return (
                f"{unblocked_after_attack} response(s) were generated after attack detection. "
                "Manual review recommended to assess content sensitivity."
            )
        return (
            f"Potential exposure of: {', '.join(sorted(exposed_categories))}. "
            f"{unblocked_after_attack} unblocked response(s) after attack onset. "
            "Immediate investigation required."
        )

    def generate_report(
        self, incident_id: str, events: list[AIEvent]
    ) -> ForensicReport:
        """Generate a complete forensic report for an incident."""
        sessions = list({e.session_id for e in events if e.session_id})
        users = list({e.user_id for e in events if e.user_id})

        # Reconstruct conversations for all affected sessions.
        timeline: list[ConversationTurn] = []
        for sid in sessions:
            timeline.extend(self.reconstruct_conversation(events, sid))
        timeline.sort(key=lambda t: t.timestamp)

        attack_chain = self.identify_attack_techniques(events)
        data_assessment = self.assess_data_exposure(events)

        # Summarize techniques used.
        techniques_used = list({step.technique for step in attack_chain})
        technique_summary = (
            f"The attacker used {len(techniques_used)} technique(s): "
            + ", ".join(techniques_used)
            if techniques_used
            else "No specific attack techniques identified in the event log."
        )

        recommendations = [
            "Review and strengthen input validation for the affected application.",
            "Audit system prompts for information that should not be exposed.",
            "Consider adding additional guardrails for the identified attack techniques.",
            "Review rate limiting configuration for the affected endpoints.",
            "Conduct a post-incident review with the application development team.",
        ]

        return ForensicReport(
            incident_id=incident_id,
            timeline=timeline,
            attack_chain=attack_chain,
            affected_sessions=sessions,
            affected_users=users,
            data_exposure_assessment=data_assessment,
            technique_summary=technique_summary,
            recommendations=recommendations,
        )

Phase 5: Notification Integrations
# airs/notifications.py
"""Notification integrations for alert delivery."""
from __future__ import annotations

import abc
import logging

import httpx

from .detection import Alert

logger = logging.getLogger(__name__)


class NotificationChannel(abc.ABC):
    """Base class for notification channels."""

    @abc.abstractmethod
    async def send(self, alert: Alert) -> bool:
        """Send an alert notification. Returns True on success."""
        ...


class SlackNotifier(NotificationChannel):
    """Send alerts to a Slack channel via webhook."""

    SEVERITY_COLORS = {
        "critical": "#FF0000",
        "high": "#FF6600",
        "medium": "#FFCC00",
        "low": "#00CC00",
    }

    def __init__(self, webhook_url: str, channel: str = "") -> None:
        self._webhook_url = webhook_url
        self._channel = channel

    async def send(self, alert: Alert) -> bool:
        color = self.SEVERITY_COLORS.get(alert.severity, "#808080")
        payload = {
            "attachments": [
                {
                    "color": color,
                    "title": f"AI Security Alert: {alert.title}",
                    "text": alert.description,
                    "fields": [
                        {"title": "Severity", "value": alert.severity.upper(), "short": True},
                        {"title": "Rule", "value": alert.rule_name, "short": True},
                        {"title": "Alert ID", "value": alert.alert_id, "short": True},
                        {
                            "title": "Related Events",
                            "value": str(len(alert.related_events)),
                            "short": True,
                        },
                    ],
                    "ts": int(alert.triggered_at.timestamp()),
                }
            ]
        }
        if self._channel:
            payload["channel"] = self._channel
        try:
            async with httpx.AsyncClient() as client:
                resp = await client.post(self._webhook_url, json=payload, timeout=10)
            return resp.status_code == 200
        except Exception:
            logger.exception("Failed to send Slack notification")
            return False


class PagerDutyNotifier(NotificationChannel):
    """Send critical alerts to PagerDuty."""

    def __init__(self, routing_key: str) -> None:
        self._routing_key = routing_key

    async def send(self, alert: Alert) -> bool:
        # Only page for critical and high severity.
        if alert.severity not in ("critical", "high"):
            return True
        pagerduty_severity = "critical" if alert.severity == "critical" else "error"
        payload = {
            "routing_key": self._routing_key,
            "event_action": "trigger",
            "payload": {
                "summary": f"[AI Security] {alert.title}",
                "severity": pagerduty_severity,
                "source": "ai-incident-response-system",
                "custom_details": {
                    "rule_name": alert.rule_name,
                    "description": alert.description,
                    "related_events": len(alert.related_events),
                },
            },
            "dedup_key": alert.alert_id,
        }
        try:
            async with httpx.AsyncClient() as client:
                resp = await client.post(
                    "https://events.pagerduty.com/v2/enqueue",
                    json=payload,
                    timeout=10,
                )
            return resp.status_code == 202
        except Exception:
            logger.exception("Failed to send PagerDuty notification")
            return False

Evaluation Criteria
| Criterion | Weight | Excellent | Satisfactory | Needs Improvement |
|---|---|---|---|---|
| Detection | 30% | 5+ detection rules with correlation, configurable thresholds, low false-positive design | 3+ rules with basic thresholds | Fewer than 3 rules or no correlation |
| Containment | 25% | Multiple automated playbooks with rollback, semi-automated escalation paths | Basic automated actions (rate limit, block) | Manual-only containment |
| Forensics | 20% | Conversation reconstruction, attack chain identification, data exposure assessment | Basic log aggregation and timeline | Raw log access only |
| Integration | 15% | Multiple notification channels, SIEM-compatible output, webhook support | Single notification channel | No external integrations |
| Resilience | 10% | Handles high event volumes, graceful degradation, operates independently of monitored systems | Reasonable throughput, basic error handling | Crashes under load or shares infrastructure with targets |
Stretch Goals
- Implement ML-based anomaly detection that learns "normal" behavior per application and detects deviations without explicit rules.
- Build a graph-based attack visualization that shows relationships between sessions, IPs, and attack techniques.
- Add support for automated evidence preservation with chain-of-custody documentation.
- Integrate with MITRE ATLAS to automatically map detected attacks to known technique IDs.
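The ATLAS integration can start as little more than a lookup table from this project's internal technique names to ATLAS technique IDs. The IDs below are illustrative — verify them against the current ATLAS matrix before relying on them:

```python
# Illustrative mapping from internal technique names (as used by the
# ForensicAnalyzer) to MITRE ATLAS technique IDs. Verify the IDs against
# the current matrix at https://atlas.mitre.org/ — they are assumptions here.
ATLAS_MAPPING = {
    "prompt_injection": "AML.T0051",  # LLM Prompt Injection
    "jailbreak": "AML.T0054",         # LLM Jailbreak
}


def map_to_atlas(techniques: list[str]) -> dict[str, str]:
    """Return ATLAS IDs for known techniques; unknown ones map to 'unmapped'."""
    return {t: ATLAS_MAPPING.get(t, "unmapped") for t in techniques}


print(map_to_atlas(["prompt_injection", "encoding_attack"]))
```

Keeping unmapped techniques visible (rather than dropping them) is deliberate: gaps in the mapping are themselves useful signals for where the internal taxonomy needs work.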
References
- NIST. (2012). "SP 800-61 Rev. 2: Computer Security Incident Handling Guide." https://csrc.nist.gov/pubs/sp/800/61/r2/final
- MITRE. (2024). "ATLAS — Adversarial Threat Landscape for AI Systems." https://atlas.mitre.org/
- Anthropic. (2024). "Challenges in Red Teaming AI Systems." https://www.anthropic.com/research/challenges-in-red-teaming-ai-systems