Capstone: Design an Enterprise AI Security Program
Architect a comprehensive enterprise AI security program spanning governance, technical controls, risk management, and incident response for organizations deploying LLMs at scale.
Overview
Most organizations adopting AI are bolting model deployments onto existing security programs designed for traditional software. This approach leaves critical gaps: there is no inventory of which models are deployed where, no process for evaluating model supply chain risks, no monitoring for prompt injection or data exfiltration through model outputs, and no incident response playbook for AI-specific attack scenarios. When an incident occurs — a model leaks PII, an agent executes unauthorized actions, or a fine-tuned model exhibits poisoned behavior — security teams scramble to improvise.
This capstone project tasks you with designing a complete enterprise AI security program from the ground up. The program spans four pillars: governance (policies, roles, and oversight), asset management (inventory and classification of AI components), technical controls (preventive, detective, and corrective measures), and operations (monitoring, incident response, and continuous improvement). Each pillar produces concrete artifacts — policy documents, asset registries, control configurations, and runbooks — that an organization can adopt directly.
The design aligns with the NIST AI Risk Management Framework (AI RMF) four-function structure (GOVERN, MAP, MEASURE, MANAGE) and incorporates requirements from ISO/IEC 42001:2023 (AI Management System) and the EU AI Act's obligations for high-risk AI systems. Where these frameworks provide high-level guidance, this project provides the specific implementation details that bridge the gap between compliance requirements and operational security.
The deliverable is not a document — it is a working system with automation, tooling, and integration points that make the security program operationally sustainable.
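To make the framework alignment concrete, the pillar-to-function mapping can be expressed as data the program reports against. The assignment below is one reasonable reading of the AI RMF functions, not something NIST prescribes:

```python
# Illustrative mapping of the four program pillars to NIST AI RMF functions.
# The assignment is a judgment call for this project, not defined by NIST.
PILLAR_TO_AI_RMF = {
    "governance": ["GOVERN"],
    "asset_management": ["MAP"],
    "technical_controls": ["MEASURE", "MANAGE"],
    "operations": ["MEASURE", "MANAGE"],
}

def functions_covered(pillars: list[str]) -> set[str]:
    """Return the AI RMF functions covered by the pillars implemented so far."""
    return {fn for p in pillars for fn in PILLAR_TO_AI_RMF.get(p, [])}

# A program that has stood up only governance and asset management
# still leaves MEASURE and MANAGE uncovered.
partial = functions_covered(["governance", "asset_management"])
```

A coverage check like this gives the oversight board a quick answer to "which framework functions does our current rollout actually touch?"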
Project Requirements
Program Architecture
The AI security program operates across four pillars with clear interfaces:
┌─────────────────────────────────────────────────────┐
│ GOVERNANCE PILLAR │
│ Policies │ Roles │ Risk Appetite │ Oversight Board │
├─────────────────────────────────────────────────────┤
│ ASSET MANAGEMENT PILLAR │
│ AI Inventory │ Classification │ Supply Chain │ SBOM │
├─────────────────────────────────────────────────────┤
│ TECHNICAL CONTROLS PILLAR │
│ Pre-deploy │ Runtime │ Monitoring │ Data Protection │
├─────────────────────────────────────────────────────┤
│ OPERATIONS PILLAR │
│ Incident Response │ Red Teaming │ Metrics │ Review │
└─────────────────────────────────────────────────────┘
Functional Requirements
- Governance Framework — Policies covering acceptable AI use, model procurement, data handling for AI, red teaming requirements, and AI incident classification. Defined RACI matrix for AI security responsibilities.
- AI Asset Inventory — Automated discovery and registration of AI models, datasets, embedding stores, agent configurations, API keys, and integration points across the organization.
- Risk Assessment Engine — Quantitative risk scoring for each AI asset based on data sensitivity, deployment context, access surface, model provenance, and regulatory classification.
- Technical Control Library — Catalog of preventive, detective, and corrective controls mapped to OWASP LLM Top 10 risks with implementation specifications.
- Runtime Monitoring — Detection rules for AI-specific threats including prompt injection attempts, anomalous output patterns, data exfiltration via model responses, and cost abuse.
- Incident Response Playbooks — AI-specific playbooks for model compromise, data poisoning, prompt injection exploitation, agent misuse, and model supply chain attacks.
- Metrics and Reporting — KPIs and dashboards tracking program maturity, risk posture, incident trends, and compliance status.
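As a concrete anchor for the Technical Control Library requirement, a traceability map from OWASP LLM Top 10 risks to control identifiers might look like the sketch below. Risk IDs follow the 2023 list; the control names are placeholders for entries in your own library:

```python
# Partial, illustrative control map against the OWASP LLM Top 10 (2023 IDs).
# Control names are hypothetical library entries, not OWASP-defined controls.
CONTROL_MAP = {
    "LLM01: Prompt Injection": {
        "preventive": ["input_filtering", "guardrails"],
        "detective": ["injection_pattern_monitoring"],
    },
    "LLM06: Sensitive Information Disclosure": {
        "preventive": ["output_filtering", "pii_redaction"],
        "detective": ["output_dlp_scanning"],
    },
    "LLM08: Excessive Agency": {
        "preventive": ["tool_allowlists", "human_in_the_loop"],
        "detective": ["tool_call_anomaly_detection"],
    },
}

def risks_missing_control_type(control_type: str) -> list[str]:
    """List mapped risks with no control of the given type (a coverage gap check)."""
    return [risk for risk, ctrls in CONTROL_MAP.items() if not ctrls.get(control_type)]

# No corrective controls are defined yet, so every mapped risk shows up as a gap.
corrective_gaps = risks_missing_control_type("corrective")
```

Keeping the map as data rather than prose means the gap check can run in CI whenever the control library changes.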
Implementation Guide
Phase 1: AI Asset Inventory and Classification
Build the automated inventory system that provides visibility into all AI assets across the organization.
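The registry implemented below deduplicates discovered assets on the combination of provider, model, endpoint, and deployment environment. The same idea in miniature, as a simplified sketch rather than the full AIAsset model:

```python
# Toy version of the dedup logic used by the full AssetRegistry below:
# an asset's identity is the (provider, model, endpoint, environment) tuple.
from __future__ import annotations
from datetime import datetime, timezone

class MiniRegistry:
    def __init__(self):
        self._assets: dict[tuple, dict] = {}

    def register(self, provider: str, model: str, endpoint: str | None, env: str) -> bool:
        """Return True if a new asset was created, False if an existing one was refreshed."""
        key = (provider, model, endpoint, env)
        if key in self._assets:
            # Re-discovery of a known asset only bumps its timestamp.
            self._assets[key]["last_updated"] = datetime.now(timezone.utc)
            return False
        self._assets[key] = {"last_updated": datetime.now(timezone.utc)}
        return True

reg = MiniRegistry()
created = reg.register("azure_openai", "gpt-4o", "https://example.openai.azure.com", "production")
refreshed = reg.register("azure_openai", "gpt-4o", "https://example.openai.azure.com", "production")
```

Without an identity key like this, every scheduled discovery run would multiply the inventory instead of refreshing it.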
# inventory/asset_registry.py
"""Enterprise AI asset inventory and classification system."""
from __future__ import annotations
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime, date
from enum import Enum
from typing import Optional
from uuid import uuid4
logger = logging.getLogger(__name__)
class AssetType(Enum):
"""Types of AI assets tracked by the inventory."""
FOUNDATION_MODEL = "foundation_model"
FINE_TUNED_MODEL = "fine_tuned_model"
EMBEDDING_MODEL = "embedding_model"
VECTOR_DATABASE = "vector_database"
TRAINING_DATASET = "training_dataset"
EVALUATION_DATASET = "evaluation_dataset"
AI_AGENT = "ai_agent"
API_ENDPOINT = "api_endpoint"
GUARDRAIL_CONFIG = "guardrail_config"
PROMPT_TEMPLATE = "prompt_template"
class DataClassification(Enum):
"""Data sensitivity classification for AI assets."""
PUBLIC = "public"
INTERNAL = "internal"
CONFIDENTIAL = "confidential"
RESTRICTED = "restricted"
REGULATED = "regulated" # Subject to specific regulatory requirements
class DeploymentEnvironment(Enum):
DEVELOPMENT = "development"
STAGING = "staging"
PRODUCTION = "production"
RESEARCH = "research"
class RegulatoryScope(Enum):
"""Regulatory frameworks that may apply to the asset."""
NONE = "none"
GDPR = "gdpr"
HIPAA = "hipaa"
SOX = "sox"
EU_AI_ACT_HIGH_RISK = "eu_ai_act_high_risk"
EU_AI_ACT_LIMITED_RISK = "eu_ai_act_limited_risk"
PCI_DSS = "pci_dss"
CCPA = "ccpa"
@dataclass
class AIAsset:
"""A tracked AI asset in the enterprise inventory."""
asset_id: str = field(default_factory=lambda: str(uuid4()))
name: str = ""
asset_type: AssetType = AssetType.FOUNDATION_MODEL
description: str = ""
owner_team: str = ""
owner_email: str = ""
data_classification: DataClassification = DataClassification.INTERNAL
deployment_env: DeploymentEnvironment = DeploymentEnvironment.DEVELOPMENT
regulatory_scope: list[RegulatoryScope] = field(default_factory=list)
# Model-specific metadata
model_provider: Optional[str] = None
model_name: Optional[str] = None
model_version: Optional[str] = None
is_self_hosted: bool = False
parameter_count: Optional[str] = None
# Supply chain metadata
source_url: Optional[str] = None
license_type: Optional[str] = None
last_security_review: Optional[date] = None
sbom_available: bool = False
# Deployment metadata
api_endpoint: Optional[str] = None
cloud_provider: Optional[str] = None
cloud_region: Optional[str] = None
network_exposure: str = "internal" # internal, vpc, public
# Dependencies
upstream_assets: list[str] = field(default_factory=list) # Asset IDs this depends on
downstream_assets: list[str] = field(default_factory=list) # Asset IDs that depend on this
# Audit
registered_date: datetime = field(default_factory=datetime.utcnow)
last_updated: datetime = field(default_factory=datetime.utcnow)
risk_score: Optional[float] = None
compliance_status: str = "unreviewed"
class AssetDiscovery:
"""Automated discovery of AI assets across cloud environments."""
def discover_aws_bedrock(self, session) -> list[AIAsset]:
"""Discover AI assets in AWS Bedrock."""
assets = []
try:
bedrock = session.client("bedrock")
# List custom models (fine-tuned)
custom_models = bedrock.list_custom_models()
for model in custom_models.get("modelSummaries", []):
asset = AIAsset(
name=model["modelName"],
asset_type=AssetType.FINE_TUNED_MODEL,
description=f"AWS Bedrock custom model: {model['modelName']}",
model_provider="aws_bedrock",
model_name=model["modelName"],
cloud_provider="aws",
deployment_env=DeploymentEnvironment.PRODUCTION,
is_self_hosted=False,
)
assets.append(asset)
# List provisioned model throughput
provisioned = bedrock.list_provisioned_model_throughputs()
for pt in provisioned.get("provisionedModelSummaries", []):
asset = AIAsset(
name=f"Provisioned: {pt['provisionedModelName']}",
asset_type=AssetType.API_ENDPOINT,
model_provider="aws_bedrock",
model_name=pt.get("foundationModelArn", ""),
cloud_provider="aws",
deployment_env=DeploymentEnvironment.PRODUCTION,
)
assets.append(asset)
except Exception as e:
logger.error(f"AWS Bedrock discovery failed: {e}")
return assets
def discover_azure_openai(self, credential) -> list[AIAsset]:
"""Discover AI assets in Azure OpenAI Service."""
assets = []
try:
from azure.mgmt.cognitiveservices import CognitiveServicesManagementClient
client = CognitiveServicesManagementClient(
credential=credential,
subscription_id="<subscription_id>",
)
# List all Cognitive Services accounts of kind OpenAI
for account in client.accounts.list():
if account.kind == "OpenAI":
# List deployments within this account
deployments = client.deployments.list(
resource_group_name=account.id.split("/")[4],
account_name=account.name,
)
for deployment in deployments:
asset = AIAsset(
name=f"Azure OpenAI: {deployment.name}",
asset_type=AssetType.API_ENDPOINT,
model_provider="azure_openai",
model_name=deployment.properties.model.name,
model_version=deployment.properties.model.version,
cloud_provider="azure",
cloud_region=account.location,
deployment_env=DeploymentEnvironment.PRODUCTION,
api_endpoint=account.properties.endpoint,
)
assets.append(asset)
except Exception as e:
logger.error(f"Azure OpenAI discovery failed: {e}")
return assets
def discover_huggingface_models(self, search_paths: list[str]) -> list[AIAsset]:
"""Discover locally deployed Hugging Face models."""
assets = []
from pathlib import Path
for search_path in search_paths:
path = Path(search_path)
# Look for model config files that indicate HF models
for config_file in path.rglob("config.json"):
try:
config = json.loads(config_file.read_text())
if "model_type" in config or "architectures" in config:
model_name = config.get(
"_name_or_path",
config_file.parent.name,
)
asset = AIAsset(
name=f"Local HF: {model_name}",
asset_type=AssetType.FOUNDATION_MODEL,
model_provider="huggingface",
model_name=model_name,
is_self_hosted=True,
source_url=f"https://huggingface.co/{model_name}",
deployment_env=DeploymentEnvironment.PRODUCTION,
)
assets.append(asset)
except (json.JSONDecodeError, OSError):
continue
return assets
class AssetRegistry:
"""Central registry for managing AI asset inventory."""
def __init__(self):
self._assets: dict[str, AIAsset] = {}
self._discovery = AssetDiscovery()
def register(self, asset: AIAsset) -> str:
"""Register a new AI asset. Returns the asset ID."""
if not asset.name:
raise ValueError("Asset must have a name")
# Check for duplicates based on key attributes
for existing in self._assets.values():
if (existing.model_name == asset.model_name
and existing.model_provider == asset.model_provider
and existing.api_endpoint == asset.api_endpoint
and existing.deployment_env == asset.deployment_env):
logger.info(
f"Updating existing asset {existing.asset_id} instead of creating duplicate"
)
existing.last_updated = datetime.utcnow()
return existing.asset_id
self._assets[asset.asset_id] = asset
logger.info(f"Registered new AI asset: {asset.name} ({asset.asset_id})")
return asset.asset_id
def get(self, asset_id: str) -> Optional[AIAsset]:
return self._assets.get(asset_id)
def search(
self,
asset_type: Optional[AssetType] = None,
data_classification: Optional[DataClassification] = None,
owner_team: Optional[str] = None,
regulatory_scope: Optional[RegulatoryScope] = None,
unreviewed_only: bool = False,
) -> list[AIAsset]:
"""Search assets with filters."""
results = list(self._assets.values())
if asset_type:
results = [a for a in results if a.asset_type == asset_type]
if data_classification:
results = [a for a in results if a.data_classification == data_classification]
if owner_team:
results = [a for a in results if a.owner_team == owner_team]
if regulatory_scope:
results = [a for a in results if regulatory_scope in a.regulatory_scope]
if unreviewed_only:
results = [a for a in results if a.compliance_status == "unreviewed"]
return results
def dependency_graph(self, asset_id: str) -> dict:
"""Build the upstream/downstream dependency graph for an asset."""
asset = self._assets.get(asset_id)
if not asset:
return {}
visited = set()
def walk_upstream(aid: str) -> dict:
if aid in visited:
return {"id": aid, "circular": True}
visited.add(aid)
a = self._assets.get(aid)
if not a:
return {"id": aid, "missing": True}
return {
"id": aid,
"name": a.name,
"type": a.asset_type.value,
"upstream": [walk_upstream(uid) for uid in a.upstream_assets],
}
return walk_upstream(asset_id)
def compliance_gap_report(self) -> dict:
"""Identify assets with compliance gaps."""
gaps = {
"no_security_review": [],
"stale_review": [],
"missing_sbom": [],
"unclassified_data": [],
"no_owner": [],
"public_exposure_high_sensitivity": [],
}
for asset in self._assets.values():
if asset.last_security_review is None:
gaps["no_security_review"].append(asset.asset_id)
if (asset.last_security_review
and (date.today() - asset.last_security_review).days > 90):
gaps["stale_review"].append(asset.asset_id)
if not asset.sbom_available and asset.is_self_hosted:
gaps["missing_sbom"].append(asset.asset_id)
            if asset.data_classification == DataClassification.INTERNAL and asset.regulatory_scope:
                # A regulated asset still carrying the default INTERNAL label is likely misclassified
                gaps["unclassified_data"].append(asset.asset_id)
if not asset.owner_team:
gaps["no_owner"].append(asset.asset_id)
if (asset.network_exposure == "public"
and asset.data_classification in (
DataClassification.CONFIDENTIAL,
DataClassification.RESTRICTED,
DataClassification.REGULATED,
)):
gaps["public_exposure_high_sensitivity"].append(asset.asset_id)
        return gaps
Phase 2: Risk Assessment Engine
Quantify risk for each AI asset based on multiple dimensions.
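The assessor below computes inherent risk as a weighted sum over six dimensions, then discounts it by control effectiveness to get residual risk. A worked example using the same weights and thresholds (the per-dimension scores and the control figure are made-up inputs):

```python
# Dimension weights from AIRiskAssessor (they sum to 1.0).
WEIGHTS = {
    "data_sensitivity": 0.25,
    "deployment_exposure": 0.20,
    "model_provenance": 0.15,
    "regulatory_impact": 0.15,
    "capability_risk": 0.15,
    "supply_chain": 0.10,
}

# Hypothetical per-dimension scores (0-100) for a public-facing, self-hosted model.
scores = {
    "data_sensitivity": 60,
    "deployment_exposure": 90,
    "model_provenance": 60,
    "regulatory_impact": 50,
    "capability_risk": 50,
    "supply_chain": 60,
}

# Inherent risk is the weighted sum: 15 + 18 + 9 + 7.5 + 7.5 + 6 = 63.0
inherent = sum(scores[d] * WEIGHTS[d] for d in WEIGHTS)

# Assume input filtering, output filtering, and rate limiting are operational: 0.15 + 0.15 + 0.10
control_effectiveness = 0.40
residual = inherent * (1 - control_effectiveness)  # 63.0 * 0.6 = 37.8

# Same banding thresholds as the assessor.
risk_level = ("CRITICAL" if residual >= 75 else "HIGH" if residual >= 50
              else "MEDIUM" if residual >= 25 else "LOW")
```

Note the multiplicative model: controls scale risk down proportionally, so a CRITICAL inherent score can only reach LOW if control effectiveness is very high.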
# risk/assessment.py
"""Quantitative risk assessment engine for AI assets."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import date
from typing import Optional
from inventory.asset_registry import (
AIAsset, AssetType, DataClassification,
DeploymentEnvironment, RegulatoryScope,
)
@dataclass
class RiskAssessment:
"""Quantified risk assessment for a single AI asset."""
asset_id: str
asset_name: str
inherent_risk_score: float # Risk without controls (0-100)
control_effectiveness: float # How much controls reduce risk (0-1)
residual_risk_score: float # Risk after controls (0-100)
risk_level: str # CRITICAL, HIGH, MEDIUM, LOW
risk_factors: list[dict] # Individual contributing factors
recommendations: list[str] # Prioritized remediation steps
assessed_date: date
@property
def risk_reduction_percentage(self) -> float:
if self.inherent_risk_score == 0:
return 0
return (1 - self.residual_risk_score / self.inherent_risk_score) * 100
class AIRiskAssessor:
"""Assesses risk for AI assets across multiple dimensions."""
# Risk weights for each dimension (must sum to 1.0)
DIMENSION_WEIGHTS = {
"data_sensitivity": 0.25,
"deployment_exposure": 0.20,
"model_provenance": 0.15,
"regulatory_impact": 0.15,
"capability_risk": 0.15,
"supply_chain": 0.10,
}
def assess(self, asset: AIAsset, controls: Optional[list[dict]] = None) -> RiskAssessment:
"""Run a full risk assessment on an AI asset."""
risk_factors = []
# Data sensitivity dimension
data_score = self._score_data_sensitivity(asset)
risk_factors.append({
"dimension": "data_sensitivity",
"score": data_score,
"weight": self.DIMENSION_WEIGHTS["data_sensitivity"],
"details": f"Data classification: {asset.data_classification.value}",
})
# Deployment exposure dimension
exposure_score = self._score_deployment_exposure(asset)
risk_factors.append({
"dimension": "deployment_exposure",
"score": exposure_score,
"weight": self.DIMENSION_WEIGHTS["deployment_exposure"],
"details": f"Network: {asset.network_exposure}, Env: {asset.deployment_env.value}",
})
# Model provenance dimension
provenance_score = self._score_model_provenance(asset)
risk_factors.append({
"dimension": "model_provenance",
"score": provenance_score,
"weight": self.DIMENSION_WEIGHTS["model_provenance"],
"details": f"Provider: {asset.model_provider}, Self-hosted: {asset.is_self_hosted}",
})
# Regulatory impact dimension
regulatory_score = self._score_regulatory_impact(asset)
risk_factors.append({
"dimension": "regulatory_impact",
"score": regulatory_score,
"weight": self.DIMENSION_WEIGHTS["regulatory_impact"],
"details": f"Regulatory scope: {[r.value for r in asset.regulatory_scope]}",
})
# Capability risk dimension
capability_score = self._score_capability_risk(asset)
risk_factors.append({
"dimension": "capability_risk",
"score": capability_score,
"weight": self.DIMENSION_WEIGHTS["capability_risk"],
"details": f"Asset type: {asset.asset_type.value}",
})
# Supply chain dimension
supply_chain_score = self._score_supply_chain(asset)
risk_factors.append({
"dimension": "supply_chain",
"score": supply_chain_score,
"weight": self.DIMENSION_WEIGHTS["supply_chain"],
"details": f"SBOM: {asset.sbom_available}, License: {asset.license_type}",
})
# Calculate inherent risk score
inherent_risk = sum(
f["score"] * f["weight"] for f in risk_factors
)
# Calculate control effectiveness
control_effectiveness = self._evaluate_controls(asset, controls or [])
# Residual risk = inherent risk * (1 - control effectiveness)
residual_risk = inherent_risk * (1 - control_effectiveness)
# Determine risk level
if residual_risk >= 75:
risk_level = "CRITICAL"
elif residual_risk >= 50:
risk_level = "HIGH"
elif residual_risk >= 25:
risk_level = "MEDIUM"
else:
risk_level = "LOW"
recommendations = self._generate_recommendations(risk_factors, asset)
return RiskAssessment(
asset_id=asset.asset_id,
asset_name=asset.name,
inherent_risk_score=round(inherent_risk, 2),
control_effectiveness=round(control_effectiveness, 2),
residual_risk_score=round(residual_risk, 2),
risk_level=risk_level,
risk_factors=risk_factors,
recommendations=recommendations,
assessed_date=date.today(),
)
def _score_data_sensitivity(self, asset: AIAsset) -> float:
"""Score risk from data sensitivity (0-100)."""
scores = {
DataClassification.PUBLIC: 10,
DataClassification.INTERNAL: 30,
DataClassification.CONFIDENTIAL: 60,
DataClassification.RESTRICTED: 85,
DataClassification.REGULATED: 95,
}
return scores.get(asset.data_classification, 50)
def _score_deployment_exposure(self, asset: AIAsset) -> float:
exposure_scores = {"internal": 20, "vpc": 40, "public": 90}
env_multipliers = {
DeploymentEnvironment.DEVELOPMENT: 0.4,
DeploymentEnvironment.STAGING: 0.6,
DeploymentEnvironment.PRODUCTION: 1.0,
DeploymentEnvironment.RESEARCH: 0.3,
}
base = exposure_scores.get(asset.network_exposure, 50)
multiplier = env_multipliers.get(asset.deployment_env, 0.5)
return min(100, base * multiplier)
def _score_model_provenance(self, asset: AIAsset) -> float:
if asset.model_provider in ("openai", "anthropic", "google"):
base = 20 # Major providers have security programs
elif asset.model_provider in ("azure_openai", "aws_bedrock", "gcp_vertex"):
base = 15 # Cloud-managed services add security layers
elif asset.is_self_hosted:
base = 60 # Self-hosted requires own security posture
else:
base = 50
if not asset.last_security_review:
base += 20
elif (date.today() - asset.last_security_review).days > 90:
base += 10
return min(100, base)
def _score_regulatory_impact(self, asset: AIAsset) -> float:
if not asset.regulatory_scope:
return 10
high_impact = {
RegulatoryScope.EU_AI_ACT_HIGH_RISK, RegulatoryScope.HIPAA,
RegulatoryScope.SOX, RegulatoryScope.PCI_DSS,
}
if any(r in high_impact for r in asset.regulatory_scope):
return 90
return 50
def _score_capability_risk(self, asset: AIAsset) -> float:
scores = {
AssetType.AI_AGENT: 90, # Agents can take actions
AssetType.FINE_TUNED_MODEL: 70, # Custom training data risks
AssetType.FOUNDATION_MODEL: 50,
AssetType.API_ENDPOINT: 60,
AssetType.VECTOR_DATABASE: 55,
AssetType.EMBEDDING_MODEL: 30,
AssetType.TRAINING_DATASET: 45,
AssetType.PROMPT_TEMPLATE: 25,
AssetType.GUARDRAIL_CONFIG: 20,
AssetType.EVALUATION_DATASET: 15,
}
return scores.get(asset.asset_type, 50)
def _score_supply_chain(self, asset: AIAsset) -> float:
score = 50
if asset.sbom_available:
score -= 20
if asset.license_type and asset.license_type != "unknown":
score -= 10
        if asset.source_url and "huggingface.co" in asset.source_url:
score += 10 # Open model repos have higher supply chain risk
return max(0, min(100, score))
def _evaluate_controls(self, asset: AIAsset, controls: list[dict]) -> float:
"""Estimate control effectiveness based on deployed controls."""
if not controls:
return 0.0
control_scores = {
"input_filtering": 0.15,
"output_filtering": 0.15,
"rate_limiting": 0.10,
"authentication": 0.10,
"encryption_at_rest": 0.10,
"encryption_in_transit": 0.10,
"monitoring": 0.10,
"guardrails": 0.10,
"access_logging": 0.05,
"network_isolation": 0.05,
}
effectiveness = 0.0
for control in controls:
control_type = control.get("type", "")
if control_type in control_scores:
# Each control contributes based on its weight and operational status
operational = control.get("operational", False)
if operational:
effectiveness += control_scores[control_type]
return min(1.0, effectiveness)
def _generate_recommendations(
self, risk_factors: list[dict], asset: AIAsset
) -> list[str]:
"""Generate prioritized remediation recommendations."""
recs = []
# Sort risk factors by weighted contribution (descending)
sorted_factors = sorted(
risk_factors,
key=lambda f: f["score"] * f["weight"],
reverse=True,
)
for factor in sorted_factors[:3]: # Top 3 risk drivers
dim = factor["dimension"]
if dim == "data_sensitivity" and factor["score"] > 60:
recs.append(
"Implement data loss prevention (DLP) controls on model inputs and outputs. "
"Consider PII detection and redaction before data reaches the model."
)
elif dim == "deployment_exposure" and factor["score"] > 50:
recs.append(
"Reduce network exposure by deploying behind a VPC with private endpoints. "
"Implement API gateway with authentication and rate limiting."
)
elif dim == "model_provenance" and factor["score"] > 40:
recs.append(
"Conduct a security review of the model and its supply chain. "
"Generate an AI SBOM and verify model integrity checksums."
)
elif dim == "regulatory_impact" and factor["score"] > 60:
recs.append(
"Map regulatory obligations to specific technical controls. "
"Implement audit logging that satisfies retention requirements."
)
elif dim == "capability_risk" and factor["score"] > 60:
recs.append(
"Apply principle of least privilege to model/agent capabilities. "
"Implement human-in-the-loop approval for high-impact actions."
)
elif dim == "supply_chain" and factor["score"] > 40:
recs.append(
"Generate and maintain an AI Software Bill of Materials (SBOM). "
"Monitor for vulnerabilities in model dependencies and frameworks."
)
        return recs
Phase 3: AI Incident Response Playbook Engine
Build the incident response framework with AI-specific playbooks.
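Each playbook below is an ordered list of steps tagged with an incident-response phase and a timeout. Those two fields are enough to budget worst-case response time per phase, sketched here with illustrative step data:

```python
# Reduced sketch of PlaybookStep: just the phase and timeout fields,
# with made-up step data for illustration.
from dataclasses import dataclass

@dataclass
class Step:
    phase: str            # detection, containment, eradication, recovery, lessons_learned
    timeout_minutes: int  # worst-case time allotted to the step

steps = [
    Step("detection", 15),
    Step("detection", 30),
    Step("containment", 5),
    Step("containment", 15),
    Step("eradication", 60),
]

def phase_budget(steps: list[Step], phase: str) -> int:
    """Worst-case minutes a phase consumes if every step runs to its timeout."""
    return sum(s.timeout_minutes for s in steps if s.phase == phase)

containment_budget = phase_budget(steps, "containment")  # 5 + 15 = 20 minutes
```

Budgets like this let you sanity-check a playbook against your SLAs before an incident, e.g. whether containment can complete inside the 30-minute notification window.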
# incident_response/playbooks.py
"""AI-specific incident response playbook engine."""
from __future__ import annotations
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Optional
logger = logging.getLogger(__name__)
class IncidentSeverity(Enum):
SEV1 = "sev1" # Active exploitation, data breach, system compromise
SEV2 = "sev2" # Confirmed vulnerability, potential data exposure
SEV3 = "sev3" # Suspicious activity, policy violation
SEV4 = "sev4" # Informational, potential false positive
class IncidentCategory(Enum):
PROMPT_INJECTION_EXPLOITATION = "prompt_injection_exploitation"
DATA_EXFILTRATION_VIA_MODEL = "data_exfiltration_via_model"
MODEL_SUPPLY_CHAIN_COMPROMISE = "model_supply_chain_compromise"
AGENT_UNAUTHORIZED_ACTIONS = "agent_unauthorized_actions"
TRAINING_DATA_POISONING = "training_data_poisoning"
MODEL_DENIAL_OF_SERVICE = "model_denial_of_service"
PII_LEAKAGE_IN_OUTPUT = "pii_leakage_in_output"
JAILBREAK_BYPASS = "jailbreak_bypass"
@dataclass
class PlaybookStep:
"""A single step in an incident response playbook."""
step_number: int
phase: str # detection, containment, eradication, recovery, lessons_learned
action: str
responsible_role: str
automated: bool
automation_script: Optional[str] = None
timeout_minutes: int = 30
escalation_trigger: Optional[str] = None
@dataclass
class Playbook:
"""Complete incident response playbook for an AI incident category."""
category: IncidentCategory
title: str
description: str
severity_default: IncidentSeverity
steps: list[PlaybookStep] = field(default_factory=list)
detection_rules: list[dict] = field(default_factory=list)
containment_actions: list[str] = field(default_factory=list)
evidence_to_collect: list[str] = field(default_factory=list)
notification_requirements: list[str] = field(default_factory=list)
@dataclass
class Incident:
"""A tracked AI security incident."""
incident_id: str
category: IncidentCategory
severity: IncidentSeverity
title: str
description: str
affected_assets: list[str] # Asset IDs from the registry
detected_at: datetime
detected_by: str # Person, system, or rule that detected
current_phase: str = "detection"
assigned_to: Optional[str] = None
timeline: list[dict] = field(default_factory=list)
evidence: list[dict] = field(default_factory=list)
status: str = "open"
class PlaybookLibrary:
"""Library of AI-specific incident response playbooks."""
def __init__(self):
self._playbooks: dict[IncidentCategory, Playbook] = {}
self._register_default_playbooks()
def _register_default_playbooks(self):
"""Register built-in playbooks for common AI incident categories."""
# Prompt injection exploitation playbook
self._playbooks[IncidentCategory.PROMPT_INJECTION_EXPLOITATION] = Playbook(
category=IncidentCategory.PROMPT_INJECTION_EXPLOITATION,
title="Prompt Injection Exploitation Response",
description=(
"Response procedure for confirmed prompt injection attacks that have "
"bypassed guardrails and caused the model to execute unauthorized instructions."
),
severity_default=IncidentSeverity.SEV2,
steps=[
PlaybookStep(
step_number=1,
phase="detection",
action="Verify the alert is a true positive by reviewing the flagged request/response pair in the monitoring system",
responsible_role="ai_security_analyst",
automated=False,
timeout_minutes=15,
),
PlaybookStep(
step_number=2,
phase="detection",
action="Classify the injection type (direct, indirect, multi-turn) and determine the achieved impact (data leak, instruction override, tool abuse)",
responsible_role="ai_security_analyst",
automated=False,
timeout_minutes=30,
),
PlaybookStep(
step_number=3,
phase="containment",
action="If injection achieved tool execution or data access: immediately revoke the affected API session and rotate any exposed credentials",
responsible_role="ai_security_engineer",
automated=True,
automation_script="scripts/revoke_session.py",
timeout_minutes=5,
escalation_trigger="If credential rotation fails, escalate to SEV1",
),
PlaybookStep(
step_number=4,
phase="containment",
action="Deploy an emergency guardrail rule that blocks the specific injection pattern. Use exact-match and regex patterns derived from the attack payload.",
responsible_role="ai_security_engineer",
automated=True,
automation_script="scripts/deploy_emergency_guardrail.py",
timeout_minutes=15,
),
PlaybookStep(
step_number=5,
phase="eradication",
action="Analyze the full conversation history for the attacking session to determine if additional payloads were attempted or if data was exfiltrated across multiple turns",
responsible_role="ai_security_analyst",
automated=False,
timeout_minutes=60,
),
PlaybookStep(
step_number=6,
phase="eradication",
action="Test the injection pattern against the current guardrail configuration to verify the emergency rule is effective. Test variations and mutations of the pattern.",
responsible_role="ai_red_team",
automated=False,
timeout_minutes=120,
),
PlaybookStep(
step_number=7,
phase="recovery",
action="If system prompt or confidential data was leaked, rotate the system prompt and notify affected data owners per the data breach notification policy",
responsible_role="ai_security_engineer",
automated=False,
timeout_minutes=60,
),
PlaybookStep(
step_number=8,
phase="lessons_learned",
action="Document the incident in the vulnerability database with full attack chain, root cause analysis, and timeline. Update detection rules to catch similar patterns.",
responsible_role="ai_security_analyst",
automated=False,
timeout_minutes=240,
),
],
detection_rules=[
{
"name": "injection_keyword_match",
"description": "Detect known injection patterns in user input",
"type": "regex",
"pattern": r"(ignore|disregard|forget)\s+(all\s+)?(previous|prior|above)\s+(instructions|prompts|rules)",
},
{
"name": "output_anomaly",
"description": "Detect when model output contains system prompt fragments",
"type": "similarity",
"threshold": 0.85,
"reference": "system_prompt_embeddings",
},
{
"name": "behavioral_shift",
"description": "Detect sudden change in model output characteristics within a session",
"type": "statistical",
"metric": "output_entropy_delta",
"threshold": 2.0,
},
],
containment_actions=[
"Terminate the affected user session",
"Block the source IP/API key temporarily",
"Enable enhanced logging on the affected endpoint",
"Deploy emergency input filter for the specific payload pattern",
],
evidence_to_collect=[
"Full conversation history for the affected session",
"Input/output logs with timestamps",
"Guardrail evaluation logs showing why the attack was not blocked",
"API access logs for the attacking identity",
"Model configuration at time of incident (system prompt, temperature, tools)",
],
notification_requirements=[
"AI Security team lead within 30 minutes",
"CISO within 2 hours for SEV1/SEV2",
"Legal/compliance if PII or regulated data was exposed",
"Affected application team within 1 hour",
],
)
# Agent unauthorized actions playbook
self._playbooks[IncidentCategory.AGENT_UNAUTHORIZED_ACTIONS] = Playbook(
category=IncidentCategory.AGENT_UNAUTHORIZED_ACTIONS,
title="Agent Unauthorized Actions Response",
description=(
"Response procedure when an AI agent executes actions outside its "
"authorized scope, whether triggered by prompt injection, misconfiguration, "
"or emergent behavior."
),
severity_default=IncidentSeverity.SEV1,
steps=[
PlaybookStep(
step_number=1,
phase="containment",
action="IMMEDIATELY disable the agent's tool access and API credentials. Do not wait for analysis — contain first.",
responsible_role="on_call_engineer",
automated=True,
automation_script="scripts/disable_agent_tools.py",
timeout_minutes=5,
escalation_trigger="If agent cannot be disabled within 5 minutes, escalate to infrastructure team for network-level block",
),
PlaybookStep(
step_number=2,
phase="containment",
action="Identify all actions the agent executed by reviewing tool call logs. Determine the blast radius — which systems, data, and users were affected.",
responsible_role="ai_security_analyst",
automated=False,
timeout_minutes=30,
),
PlaybookStep(
step_number=3,
phase="eradication",
action="Reverse unauthorized actions where possible (delete created resources, revert data changes, revoke granted permissions)",
responsible_role="ai_security_engineer",
automated=False,
timeout_minutes=120,
),
PlaybookStep(
step_number=4,
phase="recovery",
action="Before re-enabling the agent, implement stricter tool-use policies: explicit allow-lists, human-in-the-loop for sensitive actions, rate limits on tool calls",
responsible_role="ai_security_engineer",
automated=False,
timeout_minutes=240,
),
],
detection_rules=[
{
"name": "unauthorized_tool_call",
"description": "Agent called a tool not in its authorized tool set",
"type": "policy",
"check": "tool_name NOT IN agent.authorized_tools",
},
{
"name": "excessive_tool_calls",
"description": "Agent making unusually many tool calls in a time window",
"type": "statistical",
"metric": "tool_calls_per_minute",
"threshold": 20,
},
],
containment_actions=[
"Disable all agent tool access immediately",
"Revoke agent API credentials",
"Block agent network access at the firewall level if needed",
],
evidence_to_collect=[
"Complete agent execution trace with all tool calls and responses",
"The triggering user input that led to unauthorized actions",
"Agent configuration including system prompt and tool definitions",
"Logs from all downstream systems the agent interacted with",
],
notification_requirements=[
"AI Security team lead IMMEDIATELY",
"CISO within 1 hour",
"Owners of all affected downstream systems within 1 hour",
"Legal if customer data was accessed or modified",
],
)
def get_playbook(self, category: IncidentCategory) -> Optional[Playbook]:
return self._playbooks.get(category)
def list_playbooks(self) -> list[dict]:
return [
{
"category": pb.category.value,
"title": pb.title,
"severity_default": pb.severity_default.value,
"num_steps": len(pb.steps),
}
for pb in self._playbooks.values()
]
Phase 4: Program Metrics and Maturity Dashboard
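Before moving to metrics, note that the `detection_rules` entries in the playbooks above are purely declarative. The sketch below shows one way the two rule types (`policy` and `statistical`) might be evaluated at runtime; the `Agent` dataclass and `evaluate_rules` function are illustrative assumptions, not part of the playbook library.

```python
from dataclasses import dataclass, field


@dataclass
class Agent:
    name: str
    authorized_tools: set[str] = field(default_factory=set)


def evaluate_rules(rules: list[dict], agent: Agent, tool_name: str,
                   tool_calls_per_minute: float) -> list[str]:
    """Return the names of detection rules that fire for this observation."""
    fired = []
    for rule in rules:
        if rule["type"] == "policy":
            # Policy rules encode a membership check against agent configuration.
            if tool_name not in agent.authorized_tools:
                fired.append(rule["name"])
        elif rule["type"] == "statistical":
            # Statistical rules compare a metric against a fixed threshold.
            if tool_calls_per_minute > rule["threshold"]:
                fired.append(rule["name"])
    return fired


rules = [
    {"name": "unauthorized_tool_call", "type": "policy",
     "check": "tool_name NOT IN agent.authorized_tools"},
    {"name": "excessive_tool_calls", "type": "statistical",
     "metric": "tool_calls_per_minute", "threshold": 20},
]
agent = Agent(name="billing-agent", authorized_tools={"search_invoices", "send_email"})

# An unauthorized tool at a high call rate trips both rules.
print(evaluate_rules(rules, agent, tool_name="delete_user", tool_calls_per_minute=35))
# → ['unauthorized_tool_call', 'excessive_tool_calls']
```

In production these checks would run inline on every tool call, feeding the same alert pipeline that triggers the containment playbook above.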
Track program effectiveness with quantifiable metrics.
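Before the dashboard module itself, a quick sketch of where the headline response numbers come from: MTTD and MTTC are derived from incident timestamps. The `Incident` fields below are illustrative assumptions, not part of the module that follows.

```python
from dataclasses import dataclass
from datetime import datetime


@dataclass
class Incident:
    occurred_at: datetime   # when the incident actually began
    detected_at: datetime   # when monitoring first flagged it
    contained_at: datetime  # when containment completed


def mean_hours(deltas: list[float]) -> float:
    return sum(deltas) / len(deltas) if deltas else 0.0


def compute_response_metrics(incidents: list[Incident]) -> tuple[float, float]:
    """Return (mean_time_to_detect_hours, mean_time_to_contain_hours)."""
    mttd = mean_hours([(i.detected_at - i.occurred_at).total_seconds() / 3600
                       for i in incidents])
    # MTTC is measured from detection, not from occurrence.
    mttc = mean_hours([(i.contained_at - i.detected_at).total_seconds() / 3600
                       for i in incidents])
    return mttd, mttc


incidents = [
    Incident(datetime(2025, 3, 1, 9, 0), datetime(2025, 3, 1, 9, 30),
             datetime(2025, 3, 1, 11, 30)),
    Incident(datetime(2025, 3, 5, 14, 0), datetime(2025, 3, 5, 15, 30),
             datetime(2025, 3, 5, 18, 30)),
]
mttd, mttc = compute_response_metrics(incidents)
print(f"MTTD: {mttd:.1f}h  MTTC: {mttc:.1f}h")  # → MTTD: 1.0h  MTTC: 2.5h
```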
# metrics/dashboard.py
"""AI security program metrics and maturity scoring."""
from __future__ import annotations
from dataclasses import dataclass, field
@dataclass
class ProgramMetrics:
"""Key performance indicators for the AI security program."""
# Asset management metrics
total_ai_assets: int = 0
assets_with_security_review: int = 0
assets_with_stale_review: int = 0 # Review older than 90 days
asset_coverage_rate: float = 0.0
# Risk metrics
critical_risk_assets: int = 0
high_risk_assets: int = 0
mean_residual_risk: float = 0.0
risk_acceptance_count: int = 0
# Incident metrics
incidents_last_30_days: int = 0
mean_time_to_detect_hours: float = 0.0
mean_time_to_contain_hours: float = 0.0
mean_time_to_resolve_hours: float = 0.0
incidents_by_category: dict = field(default_factory=dict)
# Control metrics
controls_deployed: int = 0
controls_operational: int = 0
control_coverage_rate: float = 0.0
# Red team metrics
red_team_engagements_ytd: int = 0
findings_from_red_team: int = 0
findings_remediated: int = 0
remediation_rate: float = 0.0
# Compliance metrics
compliance_gap_count: int = 0
overdue_actions: int = 0
@dataclass
class MaturityLevel:
"""CMMI-inspired maturity level for AI security program dimensions."""
dimension: str
level: int # 1-5
level_name: str # Initial, Managed, Defined, Quantitatively Managed, Optimizing
score: float # 0-100 within the level
evidence: list[str] = field(default_factory=list)
gaps_to_next_level: list[str] = field(default_factory=list)
def assess_program_maturity(metrics: ProgramMetrics) -> list[MaturityLevel]:
"""
Assess AI security program maturity across key dimensions.
Returns maturity levels inspired by CMMI, adapted for AI security:
Level 1 (Initial): Ad-hoc, reactive, no formal processes
Level 2 (Managed): Basic processes, some documentation
Level 3 (Defined): Standardized processes, proactive controls
Level 4 (Quantitative): Metrics-driven, continuous measurement
Level 5 (Optimizing): Continuous improvement, predictive capabilities
"""
levels = []
# Asset management maturity
if metrics.asset_coverage_rate >= 0.95:
asset_level = 4
asset_name = "Quantitatively Managed"
asset_gaps = ["Implement predictive asset risk scoring", "Automate SBOM generation"]
elif metrics.asset_coverage_rate >= 0.80:
asset_level = 3
asset_name = "Defined"
asset_gaps = ["Achieve 95%+ asset coverage", "Automate discovery for all cloud providers"]
elif metrics.asset_coverage_rate >= 0.50:
asset_level = 2
asset_name = "Managed"
asset_gaps = ["Standardize asset classification", "Implement dependency tracking"]
else:
asset_level = 1
asset_name = "Initial"
asset_gaps = ["Deploy automated asset discovery", "Define asset classification policy"]
levels.append(MaturityLevel(
dimension="Asset Management",
level=asset_level,
level_name=asset_name,
score=metrics.asset_coverage_rate * 100,
evidence=[f"Coverage: {metrics.asset_coverage_rate:.0%}",
f"Total assets tracked: {metrics.total_ai_assets}"],
gaps_to_next_level=asset_gaps,
))
# Incident response maturity
if metrics.mean_time_to_detect_hours < 1 and metrics.mean_time_to_contain_hours < 4:
ir_level = 4
ir_name = "Quantitatively Managed"
ir_gaps = ["Implement automated response for common incident types"]
elif metrics.mean_time_to_detect_hours < 4:
ir_level = 3
ir_name = "Defined"
ir_gaps = ["Reduce MTTD below 1 hour", "Automate containment for SEV1"]
elif metrics.mean_time_to_detect_hours < 24:
ir_level = 2
ir_name = "Managed"
ir_gaps = ["Deploy AI-specific detection rules", "Create playbooks for all categories"]
else:
ir_level = 1
ir_name = "Initial"
ir_gaps = ["Implement basic monitoring", "Define incident categories"]
levels.append(MaturityLevel(
dimension="Incident Response",
level=ir_level,
level_name=ir_name,
score=max(0, 100 - metrics.mean_time_to_detect_hours * 5),
evidence=[
f"MTTD: {metrics.mean_time_to_detect_hours:.1f}h",
f"MTTC: {metrics.mean_time_to_contain_hours:.1f}h",
f"Incidents (30d): {metrics.incidents_last_30_days}",
],
gaps_to_next_level=ir_gaps,
))
# Red team maturity
if metrics.red_team_engagements_ytd >= 4 and metrics.remediation_rate >= 0.90:
rt_level = 4
rt_name = "Quantitatively Managed"
rt_gaps = ["Implement continuous automated red teaming"]
elif metrics.red_team_engagements_ytd >= 2:
rt_level = 3
rt_name = "Defined"
rt_gaps = ["Increase to quarterly engagements", "Achieve 90%+ remediation rate"]
elif metrics.red_team_engagements_ytd >= 1:
rt_level = 2
rt_name = "Managed"
rt_gaps = ["Define red team methodology", "Track remediation systematically"]
else:
rt_level = 1
rt_name = "Initial"
rt_gaps = ["Conduct first AI red team engagement", "Establish finding tracking"]
levels.append(MaturityLevel(
dimension="Red Teaming",
level=rt_level,
level_name=rt_name,
score=min(100, metrics.red_team_engagements_ytd * 25),
evidence=[
f"Engagements YTD: {metrics.red_team_engagements_ytd}",
f"Findings: {metrics.findings_from_red_team}",
f"Remediation rate: {metrics.remediation_rate:.0%}",
],
gaps_to_next_level=rt_gaps,
))
return levels
Deliverables Checklist
By the end of this capstone, you should have produced:
- An automated AI asset inventory system with cloud discovery modules
- A quantitative risk assessment engine with configurable scoring weights
- A library of AI-specific incident response playbooks with step-by-step procedures
- Detection rules for the top AI threat categories
- A program metrics dashboard with maturity scoring
- Integration points connecting all four pillars into a cohesive program
References
- NIST AI Risk Management Framework (AI RMF 1.0), https://www.nist.gov/itl/ai-risk-management-framework
- ISO/IEC 42001:2023, "Artificial Intelligence Management System," https://www.iso.org/standard/81230.html
- OWASP Top 10 for Large Language Model Applications 2025, https://owasp.org/www-project-top-10-for-large-language-model-applications/
- MITRE ATLAS (Adversarial Threat Landscape for AI Systems), https://atlas.mitre.org/