Capstone: Build an AI Incident Response System
Design and implement an incident response system purpose-built for AI security incidents including prompt injection breaches, model manipulation, and data exfiltration through LLM applications.
Overview
When an AI system is compromised — whether through prompt injection, model manipulation, or data poisoning — organizations need an incident response capability specifically designed for AI. Traditional IR playbooks assume discrete events with clear artifacts (malicious binaries, network connections, file modifications). AI incidents are fundamentally different: the "exploit" is natural language, the "payload" is model behavior change, and the "indicators of compromise" are subtle shifts in output distributions.
This capstone project tasks you with building an AI Incident Response System (AIRS) that handles the full incident lifecycle: detection of anomalous AI behavior, automated triage and severity assessment, containment actions specific to AI systems, forensic analysis of conversation and interaction logs, and post-incident reporting that maps findings to AI risk frameworks.
The system processes telemetry from AI applications — API logs, model outputs, safety classifier scores, and user feedback signals — and correlates these signals to detect incidents that no single signal would reveal. For example, a gradual increase in safety classifier near-misses combined with unusual prompt patterns from a single IP might indicate an active jailbreak campaign that has not yet fully succeeded.
Project Requirements
Functional Requirements
-
Detection Engine — Real-time processing of AI application telemetry with configurable detection rules and anomaly detection models.
-
Triage System — Automated severity assessment based on attack type, affected model capabilities, data sensitivity, and blast radius.
-
Containment Playbooks — Automated and semi-automated response actions:
- Rate-limit or block suspicious sources
- Switch to a more restricted model configuration
- Enable enhanced logging for forensic capture
- Temporarily disable specific model capabilities (tool use, code execution)
-
Forensic Analyzer — Tools for investigating incidents after detection:
- Conversation reconstruction from logs
- Attack chain visualization
- Prompt evolution analysis (how an attacker refined their technique)
- Impact assessment (what data was exposed, what actions were taken)
-
Reporting and Notification — Integration with existing incident management systems (PagerDuty, Slack, JIRA) and structured incident reports.
Non-Functional Requirements
- Detection latency under 30 seconds from event ingestion to alert.
- Must handle at least 10,000 events per second.
- All detection rules must be version-controlled and auditable.
- The system must operate independently of the AI systems it monitors (no shared infrastructure).
Implementation Guide
Phase 1: Event Ingestion and Normalization
Build the event pipeline that ingests telemetry from diverse AI application sources.
# airs/events.py
"""Event schema and ingestion pipeline for AI telemetry."""
from __future__ import annotations
import enum
import hashlib
import json
from datetime import datetime
from typing import Any, Optional
from pydantic import BaseModel, Field
class EventType(str, enum.Enum):
"""Types of events the AIRS system processes."""
API_REQUEST = "api_request"
API_RESPONSE = "api_response"
SAFETY_CLASSIFIER = "safety_classifier"
CONTENT_FILTER = "content_filter"
USER_FEEDBACK = "user_feedback"
TOOL_INVOCATION = "tool_invocation"
RATE_LIMIT = "rate_limit"
ERROR = "error"
AUTHENTICATION = "authentication"
class AIEvent(BaseModel):
"""Normalized event from an AI application."""
event_id: str = ""
event_type: EventType
timestamp: datetime
source_system: str # Which AI application generated this event
session_id: str = "" # Conversation/session identifier
user_id: str = ""
source_ip: str = ""
model_id: str = ""
content: dict[str, Any] = Field(default_factory=dict)
metadata: dict[str, Any] = Field(default_factory=dict)
# Safety-related fields
safety_score: float | None = None # 0.0 (safe) to 1.0 (unsafe)
content_categories: list[str] = Field(default_factory=list)
blocked: bool = False
def model_post_init(self, __context: Any) -> None:
if not self.event_id:
# Generate deterministic event ID for deduplication.
payload = f"{self.timestamp.isoformat()}:{self.source_system}:{self.session_id}:{self.event_type.value}"
self.event_id = hashlib.sha256(payload.encode()).hexdigest()[:16]
class EventNormalizer:
"""Normalizes events from different AI platform log formats."""
def normalize_openai_log(self, raw: dict[str, Any]) -> AIEvent:
"""Normalize an OpenAI API log entry."""
messages = raw.get("request", {}).get("messages", [])
user_message = ""
for msg in reversed(messages):
if msg.get("role") == "user":
user_message = msg.get("content", "")
break
return AIEvent(
event_type=EventType.API_REQUEST,
timestamp=datetime.fromisoformat(raw.get("timestamp", datetime.utcnow().isoformat())),
source_system=raw.get("application", "unknown"),
session_id=raw.get("session_id", ""),
user_id=raw.get("user_id", ""),
source_ip=raw.get("client_ip", ""),
model_id=raw.get("model", ""),
content={
"user_message": user_message,
"message_count": len(messages),
"model": raw.get("model", ""),
"temperature": raw.get("request", {}).get("temperature"),
},
metadata=raw.get("metadata", {}),
)
def normalize_guardrails_log(self, raw: dict[str, Any]) -> AIEvent:
"""Normalize a NeMo Guardrails or similar guardrails log entry."""
return AIEvent(
event_type=EventType.CONTENT_FILTER,
timestamp=datetime.fromisoformat(raw.get("timestamp", datetime.utcnow().isoformat())),
source_system=raw.get("application", "guardrails"),
session_id=raw.get("session_id", ""),
user_id=raw.get("user_id", ""),
model_id=raw.get("model", ""),
content={
"input_text": raw.get("input", ""),
"rail_triggered": raw.get("rail_name", ""),
"action_taken": raw.get("action", ""),
},
safety_score=raw.get("score"),
blocked=raw.get("blocked", False),
content_categories=raw.get("categories", []),
)Phase 2: Detection Rules Engine
# airs/detection.py
"""Detection engine with configurable rules and anomaly detection."""
from __future__ import annotations
import logging
from collections import defaultdict, deque
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any, Callable
from .events import AIEvent, EventType
logger = logging.getLogger(__name__)
@dataclass
class Alert:
"""An alert generated by the detection engine."""
alert_id: str
rule_name: str
severity: str # "critical", "high", "medium", "low"
title: str
description: str
related_events: list[AIEvent]
triggered_at: datetime = field(default_factory=datetime.utcnow)
metadata: dict[str, Any] = field(default_factory=dict)
acknowledged: bool = False
resolved: bool = False
class DetectionRule:
"""A single detection rule that evaluates events for suspicious patterns."""
def __init__(
self,
name: str,
severity: str,
description: str,
condition: Callable[[AIEvent, "DetectionContext"], bool],
throttle_seconds: int = 60,
) -> None:
self.name = name
self.severity = severity
self.description = description
self.condition = condition
self.throttle_seconds = throttle_seconds
self._last_fired: datetime | None = None
def evaluate(self, event: AIEvent, context: "DetectionContext") -> Alert | None:
"""Evaluate the rule against an event. Returns an Alert if triggered."""
if self._last_fired and (
datetime.utcnow() - self._last_fired
).total_seconds() < self.throttle_seconds:
return None
if self.condition(event, context):
self._last_fired = datetime.utcnow()
return Alert(
alert_id=f"{self.name}:{event.event_id}",
rule_name=self.name,
severity=self.severity,
title=f"Detection: {self.name}",
description=self.description,
related_events=[event],
)
return None
class DetectionContext:
"""Maintains sliding-window state for correlation rules."""
def __init__(self, window_seconds: int = 300) -> None:
self._window = timedelta(seconds=window_seconds)
self._events_by_session: dict[str, deque[AIEvent]] = defaultdict(
lambda: deque(maxlen=1000)
)
self._events_by_ip: dict[str, deque[AIEvent]] = defaultdict(
lambda: deque(maxlen=1000)
)
self._blocked_count_by_ip: dict[str, int] = defaultdict(int)
self._safety_scores_by_session: dict[str, list[float]] = defaultdict(list)
def ingest(self, event: AIEvent) -> None:
"""Add an event to the context window."""
if event.session_id:
self._events_by_session[event.session_id].append(event)
if event.source_ip:
self._events_by_ip[event.source_ip].append(event)
if event.blocked:
self._blocked_count_by_ip[event.source_ip] += 1
if event.safety_score is not None and event.session_id:
self._safety_scores_by_session[event.session_id].append(event.safety_score)
def blocked_count_from_ip(self, ip: str, window: timedelta | None = None) -> int:
"""Count blocked requests from an IP within the window."""
if window is None:
window = self._window
cutoff = datetime.utcnow() - window
return sum(
1 for e in self._events_by_ip.get(ip, [])
if e.blocked and e.timestamp > cutoff
)
def request_count_from_ip(self, ip: str, window: timedelta | None = None) -> int:
if window is None:
window = self._window
cutoff = datetime.utcnow() - window
return sum(
1 for e in self._events_by_ip.get(ip, [])
if e.timestamp > cutoff
)
def average_safety_score(self, session_id: str) -> float:
scores = self._safety_scores_by_session.get(session_id, [])
return sum(scores) / len(scores) if scores else 0.0
def session_event_count(self, session_id: str) -> int:
return len(self._events_by_session.get(session_id, []))
# --- Built-in Detection Rules ---
def _repeated_blocks_rule(event: AIEvent, ctx: DetectionContext) -> bool:
"""Fires when an IP has many blocked requests — likely an active attack."""
if not event.source_ip:
return False
return ctx.blocked_count_from_ip(event.source_ip) >= 10
def _escalating_safety_scores(event: AIEvent, ctx: DetectionContext) -> bool:
"""Fires when safety scores in a session are trending upward — probe refinement."""
if not event.session_id or event.safety_score is None:
return False
scores = ctx._safety_scores_by_session.get(event.session_id, [])
if len(scores) < 5:
return False
# Check if the last 5 scores are monotonically increasing.
recent = scores[-5:]
return all(recent[i] <= recent[i + 1] for i in range(len(recent) - 1)) and recent[-1] > 0.7
def _high_volume_single_session(event: AIEvent, ctx: DetectionContext) -> bool:
"""Fires when a single session has an unusually high message count."""
if not event.session_id:
return False
return ctx.session_event_count(event.session_id) > 100
def _content_filter_bypass(event: AIEvent, ctx: DetectionContext) -> bool:
"""Fires on a successful response after multiple content filter triggers."""
if event.event_type != EventType.API_RESPONSE:
return False
if event.blocked:
return False
# Check if this session had multiple prior blocks.
session_events = ctx._events_by_session.get(event.session_id, [])
recent_blocks = sum(1 for e in session_events if e.blocked)
return recent_blocks >= 3
# Registry of built-in rules.
BUILTIN_RULES = [
DetectionRule(
name="repeated_blocks",
severity="high",
description="Multiple blocked requests from the same IP address indicates active attack probing.",
condition=_repeated_blocks_rule,
throttle_seconds=120,
),
DetectionRule(
name="escalating_safety_scores",
severity="high",
description="Safety scores in a session are monotonically increasing, suggesting an attacker is refining their technique.",
condition=_escalating_safety_scores,
throttle_seconds=300,
),
DetectionRule(
name="high_volume_session",
severity="medium",
description="A single session has an unusually high number of messages, which may indicate automated probing.",
condition=_high_volume_single_session,
throttle_seconds=600,
),
DetectionRule(
name="content_filter_bypass",
severity="critical",
description="A session that previously triggered content filters is now receiving unblocked responses, suggesting a successful bypass.",
condition=_content_filter_bypass,
throttle_seconds=60,
),
]
class DetectionEngine:
"""Processes events through detection rules and generates alerts."""
def __init__(self, rules: list[DetectionRule] | None = None) -> None:
self.rules = rules or BUILTIN_RULES.copy()
self.context = DetectionContext()
self.alert_handlers: list[Callable[[Alert], None]] = []
def register_handler(self, handler: Callable[[Alert], None]) -> None:
self.alert_handlers.append(handler)
def process_event(self, event: AIEvent) -> list[Alert]:
"""Process a single event and return any generated alerts."""
self.context.ingest(event)
alerts: list[Alert] = []
for rule in self.rules:
alert = rule.evaluate(event, self.context)
if alert:
alerts.append(alert)
for handler in self.alert_handlers:
try:
handler(alert)
except Exception:
logger.exception("Alert handler failed for %s", alert.alert_id)
return alertsPhase 3: Containment Playbooks
# airs/containment.py
"""Automated containment playbooks for AI security incidents."""
from __future__ import annotations
import abc
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Any
from .detection import Alert
logger = logging.getLogger(__name__)
@dataclass
class ContainmentAction:
"""A containment action taken in response to an incident."""
action_type: str
target: str # What was acted upon (IP, session, model endpoint, etc.)
description: str
executed_at: datetime
success: bool
details: dict[str, Any]
rollback_instructions: str = ""
class ContainmentPlaybook(abc.ABC):
"""Base class for containment playbooks."""
name: str
description: str
applicable_rules: list[str] # Which detection rules trigger this playbook
@abc.abstractmethod
async def execute(self, alert: Alert) -> list[ContainmentAction]:
"""Execute the containment playbook and return actions taken."""
...
class RateLimitPlaybook(ContainmentPlaybook):
"""Apply aggressive rate limiting to a suspicious source."""
name = "rate_limit"
description = "Apply strict rate limits to the source IP or user."
applicable_rules = ["repeated_blocks", "high_volume_session"]
def __init__(self, rate_limiter: Any) -> None:
self._rate_limiter = rate_limiter
async def execute(self, alert: Alert) -> list[ContainmentAction]:
actions: list[ContainmentAction] = []
for event in alert.related_events:
if event.source_ip:
try:
# Apply a restrictive rate limit: 5 requests per minute.
await self._rate_limiter.set_limit(
key=f"ip:{event.source_ip}",
max_requests=5,
window_seconds=60,
)
actions.append(
ContainmentAction(
action_type="rate_limit",
target=event.source_ip,
description=f"Applied 5 req/min limit to IP {event.source_ip}",
executed_at=datetime.utcnow(),
success=True,
details={"max_requests": 5, "window": 60},
rollback_instructions=f"Remove rate limit for IP {event.source_ip}",
)
)
except Exception as exc:
logger.error("Failed to apply rate limit: %s", exc)
actions.append(
ContainmentAction(
action_type="rate_limit",
target=event.source_ip,
description=f"FAILED to apply rate limit to {event.source_ip}: {exc}",
executed_at=datetime.utcnow(),
success=False,
details={"error": str(exc)},
)
)
return actions
class ModelDowngradePlaybook(ContainmentPlaybook):
"""Switch to a more restricted model configuration."""
name = "model_downgrade"
description = "Switch the affected system to a safer model configuration."
applicable_rules = ["content_filter_bypass", "escalating_safety_scores"]
def __init__(self, model_config_manager: Any) -> None:
self._config_manager = model_config_manager
async def execute(self, alert: Alert) -> list[ContainmentAction]:
actions: list[ContainmentAction] = []
source_systems = {e.source_system for e in alert.related_events}
for system in source_systems:
try:
previous_config = await self._config_manager.get_config(system)
await self._config_manager.apply_safe_mode(system)
actions.append(
ContainmentAction(
action_type="model_downgrade",
target=system,
description=f"Switched {system} to safe mode configuration",
executed_at=datetime.utcnow(),
success=True,
details={
"previous_model": previous_config.get("model", "unknown"),
"new_config": "safe_mode",
},
rollback_instructions=f"Restore {system} to previous config: {previous_config}",
)
)
except Exception as exc:
logger.error("Failed to downgrade model for %s: %s", system, exc)
return actions
class EnhancedLoggingPlaybook(ContainmentPlaybook):
"""Enable verbose logging for forensic capture."""
name = "enhanced_logging"
description = "Enable full request/response logging for forensic analysis."
applicable_rules = ["content_filter_bypass", "escalating_safety_scores", "repeated_blocks"]
def __init__(self, logging_manager: Any) -> None:
self._logging_manager = logging_manager
async def execute(self, alert: Alert) -> list[ContainmentAction]:
sessions = {e.session_id for e in alert.related_events if e.session_id}
ips = {e.source_ip for e in alert.related_events if e.source_ip}
actions: list[ContainmentAction] = []
for session_id in sessions:
try:
await self._logging_manager.enable_verbose(
session_id=session_id,
capture_full_content=True,
duration_minutes=60,
)
actions.append(
ContainmentAction(
action_type="enhanced_logging",
target=session_id,
description=f"Enabled verbose logging for session {session_id}",
executed_at=datetime.utcnow(),
success=True,
details={"duration_minutes": 60, "capture_full_content": True},
)
)
except Exception as exc:
logger.error("Failed to enable enhanced logging: %s", exc)
return actionsPhase 4: Forensic Analyzer
# airs/forensics.py
"""Forensic analysis tools for AI security incidents."""
from __future__ import annotations
import json
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
from typing import Any
from .events import AIEvent, EventType
@dataclass
class ConversationTurn:
"""A single turn in a reconstructed conversation."""
timestamp: datetime
role: str # "user", "assistant", "system", "tool"
content: str
safety_score: float | None = None
blocked: bool = False
metadata: dict[str, Any] | None = None
@dataclass
class AttackChainStep:
"""A step in a reconstructed attack chain."""
step_number: int
timestamp: datetime
technique: str
description: str
event: AIEvent
success: bool
impact: str = ""
@dataclass
class ForensicReport:
"""Complete forensic analysis of an incident."""
incident_id: str
timeline: list[ConversationTurn]
attack_chain: list[AttackChainStep]
affected_sessions: list[str]
affected_users: list[str]
data_exposure_assessment: str
technique_summary: str
recommendations: list[str]
class ForensicAnalyzer:
"""Reconstructs and analyzes AI security incidents from event logs."""
# Mapping of patterns to attack technique names.
TECHNIQUE_INDICATORS = {
"prompt_injection": [
"ignore", "disregard", "override", "forget", "new instructions",
],
"jailbreak": [
"DAN", "developer mode", "unrestricted", "no limits", "bypass",
],
"data_exfiltration": [
"system prompt", "reveal", "show me your", "initial instructions",
"repeat the above", "print everything",
],
"encoding_attack": [
"base64", "decode", "rot13", "hex", "unicode",
],
}
def reconstruct_conversation(
self, events: list[AIEvent], session_id: str
) -> list[ConversationTurn]:
"""Reconstruct the conversation timeline for a session."""
session_events = sorted(
[e for e in events if e.session_id == session_id],
key=lambda e: e.timestamp,
)
turns: list[ConversationTurn] = []
for event in session_events:
if event.event_type == EventType.API_REQUEST:
user_msg = event.content.get("user_message", "")
if user_msg:
turns.append(
ConversationTurn(
timestamp=event.timestamp,
role="user",
content=user_msg,
safety_score=event.safety_score,
blocked=event.blocked,
)
)
elif event.event_type == EventType.API_RESPONSE:
assistant_msg = event.content.get("response_text", "")
if assistant_msg:
turns.append(
ConversationTurn(
timestamp=event.timestamp,
role="assistant",
content=assistant_msg,
safety_score=event.safety_score,
blocked=event.blocked,
)
)
return turns
def identify_attack_techniques(
self, events: list[AIEvent]
) -> list[AttackChainStep]:
"""Analyze events to identify attack techniques used."""
chain: list[AttackChainStep] = []
step = 0
for event in sorted(events, key=lambda e: e.timestamp):
user_msg = event.content.get("user_message", "").lower()
if not user_msg:
continue
for technique, indicators in self.TECHNIQUE_INDICATORS.items():
if any(ind.lower() in user_msg for ind in indicators):
step += 1
chain.append(
AttackChainStep(
step_number=step,
timestamp=event.timestamp,
technique=technique,
description=f"Detected {technique} indicators in user message",
event=event,
success=not event.blocked,
impact="Response generated" if not event.blocked else "Blocked by filter",
)
)
return chain
def assess_data_exposure(
self, events: list[AIEvent]
) -> str:
"""Assess what data may have been exposed during the incident."""
exposed_categories: set[str] = set()
unblocked_after_attack = 0
attack_detected = False
for event in sorted(events, key=lambda e: e.timestamp):
user_msg = event.content.get("user_message", "").lower()
# Check if this looks like an attack message.
is_attack = any(
ind.lower() in user_msg
for indicators in self.TECHNIQUE_INDICATORS.values()
for ind in indicators
)
if is_attack:
attack_detected = True
if attack_detected and not event.blocked and event.event_type == EventType.API_RESPONSE:
unblocked_after_attack += 1
response = event.content.get("response_text", "")
if "system prompt" in response.lower() or "instructions" in response.lower():
exposed_categories.add("system_prompt")
if any(kw in response.lower() for kw in ["password", "api key", "secret", "token"]):
exposed_categories.add("credentials")
if not exposed_categories and unblocked_after_attack == 0:
return "No data exposure detected. All attack attempts were blocked."
elif not exposed_categories:
return (
f"{unblocked_after_attack} response(s) were generated after attack detection. "
"Manual review recommended to assess content sensitivity."
)
else:
return (
f"Potential exposure of: {', '.join(exposed_categories)}. "
f"{unblocked_after_attack} unblocked response(s) after attack onset. "
"Immediate investigation required."
)
def generate_report(
self, incident_id: str, events: list[AIEvent]
) -> ForensicReport:
"""Generate a complete forensic report for an incident."""
sessions = list({e.session_id for e in events if e.session_id})
users = list({e.user_id for e in events if e.user_id})
# Reconstruct conversations for all affected sessions.
timeline: list[ConversationTurn] = []
for sid in sessions:
timeline.extend(self.reconstruct_conversation(events, sid))
timeline.sort(key=lambda t: t.timestamp)
attack_chain = self.identify_attack_techniques(events)
data_assessment = self.assess_data_exposure(events)
# Summarize techniques used.
techniques_used = list({step.technique for step in attack_chain})
technique_summary = (
f"The attacker used {len(techniques_used)} technique(s): "
+ ", ".join(techniques_used)
if techniques_used
else "No specific attack techniques identified in the event log."
)
recommendations = [
"Review and strengthen input validation for the affected application.",
"Audit system prompts for information that should not be exposed.",
"Consider adding additional guardrails for the identified attack techniques.",
"Review rate limiting configuration for the affected endpoints.",
"Conduct a post-incident review with the application development team.",
]
return ForensicReport(
incident_id=incident_id,
timeline=timeline,
attack_chain=attack_chain,
affected_sessions=sessions,
affected_users=users,
data_exposure_assessment=data_assessment,
technique_summary=technique_summary,
recommendations=recommendations,
)Phase 5: Notification Integrations
# airs/notifications.py
"""Notification integrations for alert delivery."""
from __future__ import annotations
import abc
import json
import logging
from typing import Any
import httpx
from .detection import Alert
logger = logging.getLogger(__name__)
class NotificationChannel(abc.ABC):
"""Base class for notification channels."""
@abc.abstractmethod
async def send(self, alert: Alert) -> bool:
"""Send an alert notification. Returns True on success."""
...
class SlackNotifier(NotificationChannel):
"""Send alerts to a Slack channel via webhook."""
SEVERITY_COLORS = {
"critical": "#FF0000",
"high": "#FF6600",
"medium": "#FFCC00",
"low": "#00CC00",
}
def __init__(self, webhook_url: str, channel: str = "") -> None:
self._webhook_url = webhook_url
self._channel = channel
async def send(self, alert: Alert) -> bool:
color = self.SEVERITY_COLORS.get(alert.severity, "#808080")
payload = {
"attachments": [
{
"color": color,
"title": f"AI Security Alert: {alert.title}",
"text": alert.description,
"fields": [
{"title": "Severity", "value": alert.severity.upper(), "short": True},
{"title": "Rule", "value": alert.rule_name, "short": True},
{"title": "Alert ID", "value": alert.alert_id, "short": True},
{
"title": "Related Events",
"value": str(len(alert.related_events)),
"short": True,
},
],
"ts": int(alert.triggered_at.timestamp()),
}
]
}
if self._channel:
payload["channel"] = self._channel
try:
async with httpx.AsyncClient() as client:
resp = await client.post(self._webhook_url, json=payload, timeout=10)
return resp.status_code == 200
except Exception:
logger.exception("Failed to send Slack notification")
return False
class PagerDutyNotifier(NotificationChannel):
"""Send critical alerts to PagerDuty."""
def __init__(self, routing_key: str) -> None:
self._routing_key = routing_key
async def send(self, alert: Alert) -> bool:
# Only page for critical and high severity.
if alert.severity not in ("critical", "high"):
return True
pagerduty_severity = "critical" if alert.severity == "critical" else "error"
payload = {
"routing_key": self._routing_key,
"event_action": "trigger",
"payload": {
"summary": f"[AI Security] {alert.title}",
"severity": pagerduty_severity,
"source": "ai-incident-response-system",
"custom_details": {
"rule_name": alert.rule_name,
"description": alert.description,
"related_events": len(alert.related_events),
},
},
"dedup_key": alert.alert_id,
}
try:
async with httpx.AsyncClient() as client:
resp = await client.post(
"https://events.pagerduty.com/v2/enqueue",
json=payload,
timeout=10,
)
return resp.status_code == 202
except Exception:
logger.exception("Failed to send PagerDuty notification")
return FalseEvaluation Criteria
| Criterion | Weight | Excellent | Satisfactory | Needs Improvement |
|---|---|---|---|---|
| Detection | 30% | 5+ detection rules with correlation, configurable thresholds, low false-positive design | 3+ rules with basic thresholds | Fewer than 3 rules or no correlation |
| Containment | 25% | Multiple automated playbooks with rollback, semi-automated escalation paths | Basic automated actions (rate limit, block) | Manual-only containment |
| Forensics | 20% | Conversation reconstruction, attack chain identification, data exposure assessment | Basic log aggregation and timeline | Raw log access only |
| Integration | 15% | Multiple notification channels, SIEM-compatible output, webhook support | Single notification channel | No external integrations |
| Resilience | 10% | Handles high event volumes, graceful degradation, operates independently of monitored systems | Reasonable throughput, basic error handling | Crashes under load or shares infrastructure with targets |
Stretch Goals
- Implement ML-based anomaly detection that learns "normal" behavior per application and detects deviations without explicit rules.
- Build a graph-based attack visualization that shows relationships between sessions, IPs, and attack techniques.
- Add support for automated evidence preservation with chain-of-custody documentation.
- Integrate with MITRE ATLAS to automatically map detected attacks to known technique IDs.
References
- NIST. (2012). "SP 800-61 Rev. 2: Computer Security Incident Handling Guide." https://csrc.nist.gov/pubs/sp/800/61/r2/final
- MITRE. (2024). "ATLAS — Adversarial Threat Landscape for AI Systems." https://atlas.mitre.org/
- Anthropic. (2024). "Challenges in Red Teaming AI Systems." https://www.anthropic.com/research/challenges-in-red-teaming-ai-systems