Capstone: Ontwerp een AI-beveiligingsprogramma voor de onderneming
Architectureer een uitgebreid AI-beveiligingsprogramma voor de onderneming dat governance, technische controls, risicobeheer en incidentrespons omvat voor organisaties die LLM's op schaal inzetten.
Overzicht
De meeste organisaties die AI adopteren, bouten model-implementaties vast op bestaande beveiligingsprogramma's die ontworpen zijn voor traditionele software. Deze aanpak laat kritieke gaten achter: er is geen inventaris van welke modellen waar zijn geïmplementeerd, geen proces om risico's in de toeleveringsketen van modellen te evalueren, geen monitoring voor prompt-injectie of data-exfiltratie via modeluitvoer, en geen incidentrespons-playbook voor AI-specifieke aanvalsscenario's. Wanneer er een incident optreedt — een model lekt PII, een agent voert ongeautoriseerde acties uit, of een fine-getuned model vertoont vergiftigd gedrag — improviseren beveiligingsteams in allerijl.
Dit capstone-project geeft je de opdracht om een volledig AI-beveiligingsprogramma voor de onderneming vanaf de grond op te zetten. Het programma omvat vier pijlers: governance (beleid, rollen en toezicht), assetbeheer (inventaris en classificatie van AI-componenten), technische controls (preventieve, detectieve en corrigerende maatregelen) en operations (monitoring, incidentrespons en continue verbetering). Elke pijler levert concrete artefacten op — beleidsdocumenten, assetregisters, controlconfiguraties en runbooks — die een organisatie rechtstreeks kan overnemen.
Het ontwerp sluit aan op de vierfunctiestructuur van het NIST AI Risk Management Framework (AI RMF) (GOVERN, MAP, MEASURE, MANAGE) en verwerkt vereisten uit ISO/IEC 42001:2023 (AI Management System) en de verplichtingen van de EU AI Act voor AI-systemen met een hoog risico. Waar deze raamwerken richtlijnen op hoog niveau bieden, levert dit project de specifieke implementatiedetails die de kloof overbruggen tussen compliancevereisten en operationele beveiliging.
Het op te leveren resultaat is geen document — het is een werkend systeem met automatisering, tooling en integratiepunten die het beveiligingsprogramma operationeel houdbaar maken.
Projectvereisten
Programma-architectuur
Het AI-beveiligingsprogramma functioneert over vier pijlers met duidelijke interfaces:
┌─────────────────────────────────────────────────────┐
│ GOVERNANCE PILLAR │
│ Policies │ Roles │ Risk Appetite │ Oversight Board │
├─────────────────────────────────────────────────────┤
│ ASSET MANAGEMENT PILLAR │
│ AI Inventory │ Classification │ Supply Chain │ SBOM │
├─────────────────────────────────────────────────────┤
│ TECHNICAL CONTROLS PILLAR │
│ Pre-deploy │ Runtime │ Monitoring │ Data Protection │
├─────────────────────────────────────────────────────┤
│ OPERATIONS PILLAR │
│ Incident Response │ Red Teaming │ Metrics │ Review │
└─────────────────────────────────────────────────────┘
Functionele vereisten
-
Governanceraamwerk — Beleid dat acceptabel AI-gebruik, modelinkoop, gegevensverwerking voor AI, red-teamingvereisten en AI-incidentclassificatie omvat. Gedefinieerde RACI-matrix voor verantwoordelijkheden op het gebied van AI-beveiliging.
-
AI-asset-inventaris — Geautomatiseerde ontdekking en registratie van AI-modellen, datasets, embedding stores, agent-configuraties, API-sleutels en integratiepunten binnen de organisatie.
-
Risicobeoordelingsengine — Kwantitatieve risicoscoring voor elk AI-asset op basis van datagevoeligheid, implementatiecontext, toegangsoppervlak, modelherkomst en regelgevende classificatie.
-
Bibliotheek van technische controls — Catalogus van preventieve, detectieve en corrigerende controls, gekoppeld aan de OWASP LLM Top 10-risico's met implementatiespecificaties.
-
Runtime-monitoring — Detectieregels voor AI-specifieke dreigingen, waaronder pogingen tot prompt-injectie, afwijkende uitvoerpatronen, data-exfiltratie via modelreacties en kostenmisbruik.
-
Incidentrespons-playbooks — AI-specifieke playbooks voor modelcompromittering, datavergiftiging, exploitatie van prompt-injectie, agentmisbruik en aanvallen op de toeleveringsketen van modellen.
-
Metrics en rapportage — KPI's en dashboards die de volwassenheid van het programma, de risicohouding, incidenttrends en compliancestatus bijhouden.
Implementatiegids
Fase 1: AI-asset-inventaris en classificatie
Bouw het geautomatiseerde inventarissysteem dat zicht biedt op alle AI-assets binnen de organisatie.
# inventory/asset_registry.py
"""Enterprise AI asset inventory and classification system."""
from __future__ import annotations
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime, date
from enum import Enum
from typing import Optional
from uuid import uuid4
logger = logging.getLogger(__name__)
class AssetType(Enum):
"""Types of AI assets tracked by the inventory."""
FOUNDATION_MODEL = "foundation_model"
FINE_TUNED_MODEL = "fine_tuned_model"
EMBEDDING_MODEL = "embedding_model"
VECTOR_DATABASE = "vector_database"
TRAINING_DATASET = "training_dataset"
EVALUATION_DATASET = "evaluation_dataset"
AI_AGENT = "ai_agent"
API_ENDPOINT = "api_endpoint"
GUARDRAIL_CONFIG = "guardrail_config"
PROMPT_TEMPLATE = "prompt_template"
class DataClassification(Enum):
"""Data sensitivity classification for AI assets."""
PUBLIC = "public"
INTERNAL = "internal"
CONFIDENTIAL = "confidential"
RESTRICTED = "restricted"
REGULATED = "regulated" # Subject to specific regulatory requirements
class DeploymentEnvironment(Enum):
DEVELOPMENT = "development"
STAGING = "staging"
PRODUCTION = "production"
RESEARCH = "research"
class RegulatoryScope(Enum):
"""Regulatory frameworks that may apply to the asset."""
NONE = "none"
GDPR = "gdpr"
HIPAA = "hipaa"
SOX = "sox"
EU_AI_ACT_HIGH_RISK = "eu_ai_act_high_risk"
EU_AI_ACT_LIMITED_RISK = "eu_ai_act_limited_risk"
PCI_DSS = "pci_dss"
CCPA = "ccpa"
@dataclass
class AIAsset:
"""A tracked AI asset in the enterprise inventory."""
asset_id: str = field(default_factory=lambda: str(uuid4()))
name: str = ""
asset_type: AssetType = AssetType.FOUNDATION_MODEL
description: str = ""
owner_team: str = ""
owner_email: str = ""
data_classification: DataClassification = DataClassification.INTERNAL
deployment_env: DeploymentEnvironment = DeploymentEnvironment.DEVELOPMENT
regulatory_scope: list[RegulatoryScope] = field(default_factory=list)
# Model-specific metadata
model_provider: Optional[str] = None
model_name: Optional[str] = None
model_version: Optional[str] = None
is_self_hosted: bool = False
parameter_count: Optional[str] = None
# Supply chain metadata
source_url: Optional[str] = None
license_type: Optional[str] = None
last_security_review: Optional[date] = None
sbom_available: bool = False
# Deployment metadata
api_endpoint: Optional[str] = None
cloud_provider: Optional[str] = None
cloud_region: Optional[str] = None
network_exposure: str = "internal" # internal, vpc, public
# Dependencies
upstream_assets: list[str] = field(default_factory=list) # Asset IDs this depends on
downstream_assets: list[str] = field(default_factory=list) # Asset IDs that depend on this
# Audit
registered_date: datetime = field(default_factory=datetime.utcnow)
last_updated: datetime = field(default_factory=datetime.utcnow)
risk_score: Optional[float] = None
compliance_status: str = "unreviewed"
class AssetDiscovery:
"""Automated discovery of AI assets across cloud environments."""
def discover_aws_bedrock(self, session) -> list[AIAsset]:
"""Discover AI assets in AWS Bedrock."""
assets = []
try:
bedrock = session.client("bedrock")
# List custom models (fine-tuned)
custom_models = bedrock.list_custom_models()
for model in custom_models.get("modelSummaries", []):
asset = AIAsset(
name=model["modelName"],
asset_type=AssetType.FINE_TUNED_MODEL,
description=f"AWS Bedrock custom model: {model['modelName']}",
model_provider="aws_bedrock",
model_name=model["modelName"],
cloud_provider="aws",
deployment_env=DeploymentEnvironment.PRODUCTION,
is_self_hosted=False,
)
assets.append(asset)
# List provisioned model throughput
provisioned = bedrock.list_provisioned_model_throughputs()
for pt in provisioned.get("provisionedModelSummaries", []):
asset = AIAsset(
name=f"Provisioned: {pt['provisionedModelName']}",
asset_type=AssetType.API_ENDPOINT,
model_provider="aws_bedrock",
model_name=pt.get("foundationModelArn", ""),
cloud_provider="aws",
deployment_env=DeploymentEnvironment.PRODUCTION,
)
assets.append(asset)
except Exception as e:
logger.error(f"AWS Bedrock discovery failed: {e}")
return assets
def discover_azure_openai(self, credential) -> list[AIAsset]:
"""Discover AI assets in Azure OpenAI Service."""
assets = []
try:
from azure.mgmt.cognitiveservices import CognitiveServicesManagementClient
from azure.identity import DefaultAzureCredential
client = CognitiveServicesManagementClient(
credential=credential,
subscription_id="<subscription_id>",
)
# List all Cognitive Services accounts of kind OpenAI
for account in client.accounts.list():
if account.kind == "OpenAI":
# List deployments within this account
deployments = client.deployments.list(
resource_group_name=account.id.split("/")[4],
account_name=account.name,
)
for deployment in deployments:
asset = AIAsset(
name=f"Azure OpenAI: {deployment.name}",
asset_type=AssetType.API_ENDPOINT,
model_provider="azure_openai",
model_name=deployment.properties.model.name,
model_version=deployment.properties.model.version,
cloud_provider="azure",
cloud_region=account.location,
deployment_env=DeploymentEnvironment.PRODUCTION,
api_endpoint=account.properties.endpoint,
)
assets.append(asset)
except Exception as e:
logger.error(f"Azure OpenAI discovery failed: {e}")
return assets
def discover_huggingface_models(self, search_paths: list[str]) -> list[AIAsset]:
"""Discover locally deployed Hugging Face models."""
assets = []
from pathlib import Path
for search_path in search_paths:
path = Path(search_path)
# Look for model config files that indicate HF models
for config_file in path.rglob("config.json"):
try:
config = json.loads(config_file.read_text())
if "model_type" in config or "architectures" in config:
model_name = config.get(
"_name_or_path",
config_file.parent.name,
)
asset = AIAsset(
name=f"Local HF: {model_name}",
asset_type=AssetType.FOUNDATION_MODEL,
model_provider="huggingface",
model_name=model_name,
is_self_hosted=True,
source_url=f"https://huggingface.co/{model_name}",
deployment_env=DeploymentEnvironment.PRODUCTION,
)
assets.append(asset)
except (json.JSONDecodeError, OSError):
continue
return assets
class AssetRegistry:
"""Central registry for managing AI asset inventory."""
def __init__(self):
self._assets: dict[str, AIAsset] = {}
self._discovery = AssetDiscovery()
def register(self, asset: AIAsset) -> str:
"""Register a new AI asset. Returns the asset ID."""
if not asset.name:
raise ValueError("Asset must have a name")
# Check for duplicates based on key attributes
for existing in self._assets.values():
if (existing.model_name == asset.model_name
and existing.model_provider == asset.model_provider
and existing.api_endpoint == asset.api_endpoint
and existing.deployment_env == asset.deployment_env):
logger.info(
f"Updating existing asset {existing.asset_id} instead of creating duplicate"
)
existing.last_updated = datetime.utcnow()
return existing.asset_id
self._assets[asset.asset_id] = asset
logger.info(f"Registered new AI asset: {asset.name} ({asset.asset_id})")
return asset.asset_id
def get(self, asset_id: str) -> Optional[AIAsset]:
return self._assets.get(asset_id)
def search(
self,
asset_type: Optional[AssetType] = None,
data_classification: Optional[DataClassification] = None,
owner_team: Optional[str] = None,
regulatory_scope: Optional[RegulatoryScope] = None,
unreviewed_only: bool = False,
) -> list[AIAsset]:
"""Search assets with filters."""
results = list(self._assets.values())
if asset_type:
results = [a for a in results if a.asset_type == asset_type]
if data_classification:
results = [a for a in results if a.data_classification == data_classification]
if owner_team:
results = [a for a in results if a.owner_team == owner_team]
if regulatory_scope:
results = [a for a in results if regulatory_scope in a.regulatory_scope]
if unreviewed_only:
results = [a for a in results if a.compliance_status == "unreviewed"]
return results
def dependency_graph(self, asset_id: str) -> dict:
"""Build the upstream/downstream dependency graph for an asset."""
asset = self._assets.get(asset_id)
if not asset:
return {}
visited = set()
def walk_upstream(aid: str) -> dict:
if aid in visited:
return {"id": aid, "circular": True}
visited.add(aid)
a = self._assets.get(aid)
if not a:
return {"id": aid, "missing": True}
return {
"id": aid,
"name": a.name,
"type": a.asset_type.value,
"upstream": [walk_upstream(uid) for uid in a.upstream_assets],
}
return walk_upstream(asset_id)
def compliance_gap_report(self) -> dict:
"""Identify assets with compliance gaps."""
gaps = {
"no_security_review": [],
"stale_review": [],
"missing_sbom": [],
"unclassified_data": [],
"no_owner": [],
"public_exposure_high_sensitivity": [],
}
for asset in self._assets.values():
if asset.last_security_review is None:
gaps["no_security_review"].append(asset.asset_id)
if (asset.last_security_review
and (date.today() - asset.last_security_review).days > 90):
gaps["stale_review"].append(asset.asset_id)
if not asset.sbom_available and asset.is_self_hosted:
gaps["missing_sbom"].append(asset.asset_id)
if asset.data_classification == DataClassification.INTERNAL and asset.regulatory_scope:
gaps["unclassified_data"].append(asset.asset_id)
if not asset.owner_team:
gaps["no_owner"].append(asset.asset_id)
if (asset.network_exposure == "public"
and asset.data_classification in (
DataClassification.CONFIDENTIAL,
DataClassification.RESTRICTED,
DataClassification.REGULATED,
)):
gaps["public_exposure_high_sensitivity"].append(asset.asset_id)
return gapsFase 2: Risicobeoordelingsengine
Kwantificeer het risico voor elk AI-asset op basis van meerdere dimensies.
# risk/assessment.py
"""Quantitative risk assessment engine for AI assets."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import date
from typing import Optional
from inventory.asset_registry import (
AIAsset, AssetType, DataClassification,
DeploymentEnvironment, RegulatoryScope,
)
@dataclass
class RiskAssessment:
"""Quantified risk assessment for a single AI asset."""
asset_id: str
asset_name: str
inherent_risk_score: float # Risk without controls (0-100)
control_effectiveness: float # How much controls reduce risk (0-1)
residual_risk_score: float # Risk after controls (0-100)
risk_level: str # CRITICAL, HIGH, MEDIUM, LOW
risk_factors: list[dict] # Individual contributing factors
recommendations: list[str] # Prioritized remediation steps
assessed_date: date
@property
def risk_reduction_percentage(self) -> float:
if self.inherent_risk_score == 0:
return 0
return (1 - self.residual_risk_score / self.inherent_risk_score) * 100
class AIRiskAssessor:
"""Assesses risk for AI assets across multiple dimensions."""
# Risk weights for each dimension (must sum to 1.0)
DIMENSION_WEIGHTS = {
"data_sensitivity": 0.25,
"deployment_exposure": 0.20,
"model_provenance": 0.15,
"regulatory_impact": 0.15,
"capability_risk": 0.15,
"supply_chain": 0.10,
}
def assess(self, asset: AIAsset, controls: Optional[list[dict]] = None) -> RiskAssessment:
"""Run a full risk assessment on an AI asset."""
risk_factors = []
# Data sensitivity dimension
data_score = self._score_data_sensitivity(asset)
risk_factors.append({
"dimension": "data_sensitivity",
"score": data_score,
"weight": self.DIMENSION_WEIGHTS["data_sensitivity"],
"details": f"Data classification: {asset.data_classification.value}",
})
# Deployment exposure dimension
exposure_score = self._score_deployment_exposure(asset)
risk_factors.append({
"dimension": "deployment_exposure",
"score": exposure_score,
"weight": self.DIMENSION_WEIGHTS["deployment_exposure"],
"details": f"Network: {asset.network_exposure}, Env: {asset.deployment_env.value}",
})
# Model provenance dimension
provenance_score = self._score_model_provenance(asset)
risk_factors.append({
"dimension": "model_provenance",
"score": provenance_score,
"weight": self.DIMENSION_WEIGHTS["model_provenance"],
"details": f"Provider: {asset.model_provider}, Self-hosted: {asset.is_self_hosted}",
})
# Regulatory impact dimension
regulatory_score = self._score_regulatory_impact(asset)
risk_factors.append({
"dimension": "regulatory_impact",
"score": regulatory_score,
"weight": self.DIMENSION_WEIGHTS["regulatory_impact"],
"details": f"Regulatory scope: {[r.value for r in asset.regulatory_scope]}",
})
# Capability risk dimension
capability_score = self._score_capability_risk(asset)
risk_factors.append({
"dimension": "capability_risk",
"score": capability_score,
"weight": self.DIMENSION_WEIGHTS["capability_risk"],
"details": f"Asset type: {asset.asset_type.value}",
})
# Supply chain dimension
supply_chain_score = self._score_supply_chain(asset)
risk_factors.append({
"dimension": "supply_chain",
"score": supply_chain_score,
"weight": self.DIMENSION_WEIGHTS["supply_chain"],
"details": f"SBOM: {asset.sbom_available}, License: {asset.license_type}",
})
# Calculate inherent risk score
inherent_risk = sum(
f["score"] * f["weight"] for f in risk_factors
)
# Calculate control effectiveness
control_effectiveness = self._evaluate_controls(asset, controls or [])
# Residual risk = inherent risk * (1 - control effectiveness)
residual_risk = inherent_risk * (1 - control_effectiveness)
# Determine risk level
if residual_risk >= 75:
risk_level = "CRITICAL"
elif residual_risk >= 50:
risk_level = "HIGH"
elif residual_risk >= 25:
risk_level = "MEDIUM"
else:
risk_level = "LOW"
recommendations = self._generate_recommendations(risk_factors, asset)
return RiskAssessment(
asset_id=asset.asset_id,
asset_name=asset.name,
inherent_risk_score=round(inherent_risk, 2),
control_effectiveness=round(control_effectiveness, 2),
residual_risk_score=round(residual_risk, 2),
risk_level=risk_level,
risk_factors=risk_factors,
recommendations=recommendations,
assessed_date=date.today(),
)
def _score_data_sensitivity(self, asset: AIAsset) -> float:
"""Score risk from data sensitivity (0-100)."""
scores = {
DataClassification.PUBLIC: 10,
DataClassification.INTERNAL: 30,
DataClassification.CONFIDENTIAL: 60,
DataClassification.RESTRICTED: 85,
DataClassification.REGULATED: 95,
}
return scores.get(asset.data_classification, 50)
def _score_deployment_exposure(self, asset: AIAsset) -> float:
exposure_scores = {"internal": 20, "vpc": 40, "public": 90}
env_multipliers = {
DeploymentEnvironment.DEVELOPMENT: 0.4,
DeploymentEnvironment.STAGING: 0.6,
DeploymentEnvironment.PRODUCTION: 1.0,
DeploymentEnvironment.RESEARCH: 0.3,
}
base = exposure_scores.get(asset.network_exposure, 50)
multiplier = env_multipliers.get(asset.deployment_env, 0.5)
return min(100, base * multiplier)
def _score_model_provenance(self, asset: AIAsset) -> float:
if asset.model_provider in ("openai", "anthropic", "google"):
base = 20 # Major providers have security programs
elif asset.model_provider in ("azure_openai", "aws_bedrock", "gcp_vertex"):
base = 15 # Cloud-managed services add security layers
elif asset.is_self_hosted:
base = 60 # Self-hosted requires own security posture
else:
base = 50
if not asset.last_security_review:
base += 20
elif (date.today() - asset.last_security_review).days > 90:
base += 10
return min(100, base)
def _score_regulatory_impact(self, asset: AIAsset) -> float:
if not asset.regulatory_scope:
return 10
high_impact = {
RegulatoryScope.EU_AI_ACT_HIGH_RISK, RegulatoryScope.HIPAA,
RegulatoryScope.SOX, RegulatoryScope.PCI_DSS,
}
if any(r in high_impact for r in asset.regulatory_scope):
return 90
return 50
def _score_capability_risk(self, asset: AIAsset) -> float:
scores = {
AssetType.AI_AGENT: 90, # Agents can take actions
AssetType.FINE_TUNED_MODEL: 70, # Custom training data risks
AssetType.FOUNDATION_MODEL: 50,
AssetType.API_ENDPOINT: 60,
AssetType.VECTOR_DATABASE: 55,
AssetType.EMBEDDING_MODEL: 30,
AssetType.TRAINING_DATASET: 45,
AssetType.PROMPT_TEMPLATE: 25,
AssetType.GUARDRAIL_CONFIG: 20,
AssetType.EVALUATION_DATASET: 15,
}
return scores.get(asset.asset_type, 50)
def _score_supply_chain(self, asset: AIAsset) -> float:
score = 50
if asset.sbom_available:
score -= 20
if asset.license_type and asset.license_type != "unknown":
score -= 10
if asset.source_url and "huggingface.co" in (asset.source_url or ""):
score += 10 # Open model repos have higher supply chain risk
return max(0, min(100, score))
def _evaluate_controls(self, asset: AIAsset, controls: list[dict]) -> float:
"""Estimate control effectiveness based on deployed controls."""
if not controls:
return 0.0
control_scores = {
"input_filtering": 0.15,
"output_filtering": 0.15,
"rate_limiting": 0.10,
"authentication": 0.10,
"encryption_at_rest": 0.10,
"encryption_in_transit": 0.10,
"monitoring": 0.10,
"guardrails": 0.10,
"access_logging": 0.05,
"network_isolation": 0.05,
}
effectiveness = 0.0
for control in controls:
control_type = control.get("type", "")
if control_type in control_scores:
# Each control contributes based on its weight and operational status
operational = control.get("operational", False)
if operational:
effectiveness += control_scores[control_type]
return min(1.0, effectiveness)
def _generate_recommendations(
self, risk_factors: list[dict], asset: AIAsset
) -> list[str]:
"""Generate prioritized remediation recommendations."""
recs = []
# Sort risk factors by weighted contribution (descending)
sorted_factors = sorted(
risk_factors,
key=lambda f: f["score"] * f["weight"],
reverse=True,
)
for factor in sorted_factors[:3]: # Top 3 risk drivers
dim = factor["dimension"]
if dim == "data_sensitivity" and factor["score"] > 60:
recs.append(
"Implement data loss prevention (DLP) controls on model inputs and outputs. "
"Consider PII detection and redaction before data reaches the model."
)
elif dim == "deployment_exposure" and factor["score"] > 50:
recs.append(
"Reduce network exposure by deploying behind a VPC with private endpoints. "
"Implement API gateway with authentication and rate limiting."
)
elif dim == "model_provenance" and factor["score"] > 40:
recs.append(
"Conduct a security review of the model and its supply chain. "
"Generate an AI SBOM and verify model integrity checksums."
)
elif dim == "regulatory_impact" and factor["score"] > 60:
recs.append(
"Map regulatory obligations to specific technical controls. "
"Implement audit logging that satisfies retention requirements."
)
elif dim == "capability_risk" and factor["score"] > 60:
recs.append(
"Apply principle of least privilege to model/agent capabilities. "
"Implement human-in-the-loop approval for high-impact actions."
)
elif dim == "supply_chain" and factor["score"] > 40:
recs.append(
"Generate and maintain an AI Software Bill of Materials (SBOM). "
"Monitor for vulnerabilities in model dependencies and frameworks."
)
return recsFase 3: Engine voor AI-incidentrespons-playbooks
Bouw het incidentrespons-raamwerk met AI-specifieke playbooks.
# incident_response/playbooks.py
"""AI-specific incident response playbook engine."""
from __future__ import annotations
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Optional
logger = logging.getLogger(__name__)
class IncidentSeverity(Enum):
SEV1 = "sev1" # Active exploitation, data breach, system compromise
SEV2 = "sev2" # Confirmed vulnerability, potential data exposure
SEV3 = "sev3" # Suspicious activity, policy violation
SEV4 = "sev4" # Informational, potential false positive
class IncidentCategory(Enum):
PROMPT_INJECTION_EXPLOITATION = "prompt_injection_exploitation"
DATA_EXFILTRATION_VIA_MODEL = "data_exfiltration_via_model"
MODEL_SUPPLY_CHAIN_COMPROMISE = "model_supply_chain_compromise"
AGENT_UNAUTHORIZED_ACTIONS = "agent_unauthorized_actions"
TRAINING_DATA_POISONING = "training_data_poisoning"
MODEL_DENIAL_OF_SERVICE = "model_denial_of_service"
PII_LEAKAGE_IN_OUTPUT = "pii_leakage_in_output"
JAILBREAK_BYPASS = "jailbreak_bypass"
@dataclass
class PlaybookStep:
"""A single step in an incident response playbook."""
step_number: int
phase: str # detection, containment, eradication, recovery, lessons_learned
action: str
responsible_role: str
automated: bool
automation_script: Optional[str] = None
timeout_minutes: int = 30
escalation_trigger: Optional[str] = None
@dataclass
class Playbook:
"""Complete incident response playbook for an AI incident category."""
category: IncidentCategory
title: str
description: str
severity_default: IncidentSeverity
steps: list[PlaybookStep] = field(default_factory=list)
detection_rules: list[dict] = field(default_factory=list)
containment_actions: list[str] = field(default_factory=list)
evidence_to_collect: list[str] = field(default_factory=list)
notification_requirements: list[str] = field(default_factory=list)
@dataclass
class Incident:
"""A tracked AI security incident."""
incident_id: str
category: IncidentCategory
severity: IncidentSeverity
title: str
description: str
affected_assets: list[str] # Asset IDs from the registry
detected_at: datetime
detected_by: str # Person, system, or rule that detected
current_phase: str = "detection"
assigned_to: Optional[str] = None
timeline: list[dict] = field(default_factory=list)
evidence: list[dict] = field(default_factory=list)
status: str = "open"
class PlaybookLibrary:
"""Library of AI-specific incident response playbooks."""
def __init__(self):
self._playbooks: dict[IncidentCategory, Playbook] = {}
self._register_default_playbooks()
def _register_default_playbooks(self):
"""Register built-in playbooks for common AI incident categories."""
# Prompt injection exploitation playbook
self._playbooks[IncidentCategory.PROMPT_INJECTION_EXPLOITATION] = Playbook(
category=IncidentCategory.PROMPT_INJECTION_EXPLOITATION,
title="Prompt Injection Exploitation Response",
description=(
"Response procedure for confirmed prompt injection attacks that have "
"bypassed guardrails and caused the model to execute unauthorized instructions."
),
severity_default=IncidentSeverity.SEV2,
steps=[
PlaybookStep(
step_number=1,
phase="detection",
action="Verify the alert is a true positive by reviewing the flagged request/response pair in the monitoring system",
responsible_role="ai_security_analyst",
automated=False,
timeout_minutes=15,
),
PlaybookStep(
step_number=2,
phase="detection",
action="Classify the injection type (direct, indirect, multi-turn) and determine the achieved impact (data leak, instruction override, tool abuse)",
responsible_role="ai_security_analyst",
automated=False,
timeout_minutes=30,
),
PlaybookStep(
step_number=3,
phase="containment",
action="If injection achieved tool execution or data access: immediately revoke the affected API session and rotate any exposed credentials",
responsible_role="ai_security_engineer",
automated=True,
automation_script="scripts/revoke_session.py",
timeout_minutes=5,
escalation_trigger="If credential rotation fails, escalate to SEV1",
),
PlaybookStep(
step_number=4,
phase="containment",
action="Deploy an emergency guardrail rule that blocks the specific injection pattern. Use exact-match and regex patterns derived from the attack payload.",
responsible_role="ai_security_engineer",
automated=True,
automation_script="scripts/deploy_emergency_guardrail.py",
timeout_minutes=15,
),
PlaybookStep(
step_number=5,
phase="eradication",
action="Analyze the full conversation history for the attacking session to determine if additional payloads were attempted or if data was exfiltrated across multiple turns",
responsible_role="ai_security_analyst",
automated=False,
timeout_minutes=60,
),
PlaybookStep(
step_number=6,
phase="eradication",
action="Test the injection pattern against the current guardrail configuration to verify the emergency rule is effective. Test variations and mutations of the pattern.",
responsible_role="ai_red_team",
automated=False,
timeout_minutes=120,
),
PlaybookStep(
step_number=7,
phase="recovery",
action="If system prompt or confidential data was leaked, rotate the system prompt and notify affected data owners per the data breach notification policy",
responsible_role="ai_security_engineer",
automated=False,
timeout_minutes=60,
),
PlaybookStep(
step_number=8,
phase="lessons_learned",
action="Document the incident in the vulnerability database with full attack chain, root cause analysis, and timeline. Update detection rules to catch similar patterns.",
responsible_role="ai_security_analyst",
automated=False,
timeout_minutes=240,
),
],
detection_rules=[
{
"name": "injection_keyword_match",
"description": "Detect known injection patterns in user input",
"type": "regex",
"pattern": r"(ignore|disregard|forget)\s+(all\s+)?(previous|prior|above)\s+(instructions|prompts|rules)",
},
{
"name": "output_anomaly",
"description": "Detect when model output contains system prompt fragments",
"type": "similarity",
"threshold": 0.85,
"reference": "system_prompt_embeddings",
},
{
"name": "behavioral_shift",
"description": "Detect sudden change in model output characteristics within a session",
"type": "statistical",
"metric": "output_entropy_delta",
"threshold": 2.0,
},
],
containment_actions=[
"Terminate the affected user session",
"Block the source IP/API key temporarily",
"Enable enhanced logging on the affected endpoint",
"Deploy emergency input filter for the specific payload pattern",
],
evidence_to_collect=[
"Full conversation history for the affected session",
"Input/output logs with timestamps",
"Guardrail evaluation logs showing why the attack was not blocked",
"API access logs for the attacking identity",
"Model configuration at time of incident (system prompt, temperature, tools)",
],
notification_requirements=[
"AI Security team lead within 30 minutes",
"CISO within 2 hours for SEV1/SEV2",
"Legal/compliance if PII or regulated data was exposed",
"Affected application team within 1 hour",
],
)
# Agent unauthorized actions playbook
self._playbooks[IncidentCategory.AGENT_UNAUTHORIZED_ACTIONS] = Playbook(
category=IncidentCategory.AGENT_UNAUTHORIZED_ACTIONS,
title="Agent Unauthorized Actions Response",
description=(
"Response procedure when an AI agent executes actions outside its "
"authorized scope, whether triggered by prompt injection, misconfiguration, "
"or emergent behavior."
),
severity_default=IncidentSeverity.SEV1,
steps=[
PlaybookStep(
step_number=1,
phase="containment",
action="IMMEDIATELY disable the agent's tool access and API credentials. Do not wait for analysis — contain first.",
responsible_role="on_call_engineer",
automated=True,
automation_script="scripts/disable_agent_tools.py",
timeout_minutes=5,
escalation_trigger="If agent cannot be disabled within 5 minutes, escalate to infrastructure team for network-level block",
),
PlaybookStep(
step_number=2,
phase="containment",
action="Identify all actions the agent executed by reviewing tool call logs. Determine the blast radius — which systems, data, and users were affected.",
responsible_role="ai_security_analyst",
automated=False,
timeout_minutes=30,
),
PlaybookStep(
step_number=3,
phase="eradication",
action="Reverse unauthorized actions where possible (delete created resources, revert data changes, revoke granted permissions)",
responsible_role="ai_security_engineer",
automated=False,
timeout_minutes=120,
),
PlaybookStep(
step_number=4,
phase="recovery",
action="Before re-enabling the agent, implement stricter tool-use policies: explicit allow-lists, human-in-the-loop for sensitive actions, rate limits on tool calls",
responsible_role="ai_security_engineer",
automated=False,
timeout_minutes=240,
),
],
detection_rules=[
{
"name": "unauthorized_tool_call",
"description": "Agent called a tool not in its authorized tool set",
"type": "policy",
"check": "tool_name NOT IN agent.authorized_tools",
},
{
"name": "excessive_tool_calls",
"description": "Agent making unusually many tool calls in a time window",
"type": "statistical",
"metric": "tool_calls_per_minute",
"threshold": 20,
},
],
containment_actions=[
"Disable all agent tool access immediately",
"Revoke agent API credentials",
"Block agent network access at the firewall level if needed",
],
evidence_to_collect=[
"Complete agent execution trace with all tool calls and responses",
"The triggering user input that led to unauthorized actions",
"Agent configuration including system prompt and tool definitions",
"Logs from all downstream systems the agent interacted with",
],
notification_requirements=[
"AI Security team lead IMMEDIATELY",
"CISO within 1 hour",
"Owners of all affected downstream systems within 1 hour",
"Legal if customer data was accessed or modified",
],
)
def get_playbook(self, category: IncidentCategory) -> Optional[Playbook]:
return self._playbooks.get(category)
def list_playbooks(self) -> list[dict]:
return [
{
"category": pb.category.value,
"title": pb.title,
"severity_default": pb.severity_default.value,
"num_steps": len(pb.steps),
}
for pb in self._playbooks.values()
]Fase 4: Programma-metrics en volwassenheidsdashboard
Volg de effectiviteit van het programma met kwantificeerbare metrics.
# metrics/dashboard.py
"""AI security program metrics and maturity scoring."""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import date, timedelta
from typing import Optional
@dataclass
class ProgramMetrics:
"""Key performance indicators for the AI security program."""
# Asset management metrics
total_ai_assets: int = 0
assets_with_security_review: int = 0
assets_with_stale_review: int = 0 # Review older than 90 days
asset_coverage_rate: float = 0.0
# Risk metrics
critical_risk_assets: int = 0
high_risk_assets: int = 0
mean_residual_risk: float = 0.0
risk_acceptance_count: int = 0
# Incident metrics
incidents_last_30_days: int = 0
mean_time_to_detect_hours: float = 0.0
mean_time_to_contain_hours: float = 0.0
mean_time_to_resolve_hours: float = 0.0
incidents_by_category: dict = field(default_factory=dict)
# Control metrics
controls_deployed: int = 0
controls_operational: int = 0
control_coverage_rate: float = 0.0
# Red team metrics
red_team_engagements_ytd: int = 0
findings_from_red_team: int = 0
findings_remediated: int = 0
remediation_rate: float = 0.0
# Compliance metrics
compliance_gap_count: int = 0
overdue_actions: int = 0
@dataclass
class MaturityLevel:
"""CMMI-inspired maturity level for AI security program dimensions."""
dimension: str
level: int # 1-5
level_name: str # Initial, Managed, Defined, Quantitatively Managed, Optimizing
score: float # 0-100 within the level
evidence: list[str] = field(default_factory=list)
gaps_to_next_level: list[str] = field(default_factory=list)
def assess_program_maturity(metrics: ProgramMetrics) -> list[MaturityLevel]:
"""
Assess AI security program maturity across key dimensions.
Returns maturity levels inspired by CMMI, adapted for AI security:
Level 1 (Initial): Ad-hoc, reactive, no formal processes
Level 2 (Managed): Basic processes, some documentation
Level 3 (Defined): Standardized processes, proactive controls
Level 4 (Quantitative): Metrics-driven, continuous measurement
Level 5 (Optimizing): Continuous improvement, predictive capabilities
"""
levels = []
# Asset management maturity
if metrics.asset_coverage_rate >= 0.95:
asset_level = 4
asset_name = "Quantitatively Managed"
asset_gaps = ["Implement predictive asset risk scoring", "Automate SBOM generation"]
elif metrics.asset_coverage_rate >= 0.80:
asset_level = 3
asset_name = "Defined"
asset_gaps = ["Achieve 95%+ asset coverage", "Automate discovery for all cloud providers"]
elif metrics.asset_coverage_rate >= 0.50:
asset_level = 2
asset_name = "Managed"
asset_gaps = ["Standardize asset classification", "Implement dependency tracking"]
else:
asset_level = 1
asset_name = "Initial"
asset_gaps = ["Deploy automated asset discovery", "Define asset classification policy"]
levels.append(MaturityLevel(
dimension="Asset Management",
level=asset_level,
level_name=asset_name,
score=metrics.asset_coverage_rate * 100,
evidence=[f"Coverage: {metrics.asset_coverage_rate:.0%}",
f"Total assets tracked: {metrics.total_ai_assets}"],
gaps_to_next_level=asset_gaps,
))
# Incident response maturity
if metrics.mean_time_to_detect_hours < 1 and metrics.mean_time_to_contain_hours < 4:
ir_level = 4
ir_name = "Quantitatively Managed"
ir_gaps = ["Implement automated response for common incident types"]
elif metrics.mean_time_to_detect_hours < 4:
ir_level = 3
ir_name = "Defined"
ir_gaps = ["Reduce MTTD below 1 hour", "Automate containment for SEV1"]
elif metrics.mean_time_to_detect_hours < 24:
ir_level = 2
ir_name = "Managed"
ir_gaps = ["Deploy AI-specific detection rules", "Create playbooks for all categories"]
else:
ir_level = 1
ir_name = "Initial"
ir_gaps = ["Implement basic monitoring", "Define incident categories"]
levels.append(MaturityLevel(
dimension="Incident Response",
level=ir_level,
level_name=ir_name,
score=max(0, 100 - metrics.mean_time_to_detect_hours * 5),
evidence=[
f"MTTD: {metrics.mean_time_to_detect_hours:.1f}h",
f"MTTC: {metrics.mean_time_to_contain_hours:.1f}h",
f"Incidents (30d): {metrics.incidents_last_30_days}",
],
gaps_to_next_level=ir_gaps,
))
# Red team maturity
if metrics.red_team_engagements_ytd >= 4 and metrics.remediation_rate >= 0.90:
rt_level = 4
rt_name = "Quantitatively Managed"
rt_gaps = ["Implement continuous automated red teaming"]
elif metrics.red_team_engagements_ytd >= 2:
rt_level = 3
rt_name = "Defined"
rt_gaps = ["Increase to quarterly engagements", "Achieve 90%+ remediation rate"]
elif metrics.red_team_engagements_ytd >= 1:
rt_level = 2
rt_name = "Managed"
rt_gaps = ["Define red team methodology", "Track remediation systematically"]
else:
rt_level = 1
rt_name = "Initial"
rt_gaps = ["Conduct first AI red team engagement", "Establish finding tracking"]
levels.append(MaturityLevel(
dimension="Red Teaming",
level=rt_level,
level_name=rt_name,
score=min(100, metrics.red_team_engagements_ytd * 25),
evidence=[
f"Engagements YTD: {metrics.red_team_engagements_ytd}",
f"Findings: {metrics.findings_from_red_team}",
f"Remediation rate: {metrics.remediation_rate:.0%}",
],
gaps_to_next_level=rt_gaps,
))
return levelsChecklist met op te leveren resultaten
Aan het einde van dit capstone-project zou je het volgende moeten hebben opgeleverd:
- Een geautomatiseerd AI-asset-inventarissysteem met cloud-discoverymodules
- Een kwantitatieve risicobeoordelingsengine met configureerbare scoringgewichten
- Een bibliotheek met AI-specifieke incidentrespons-playbooks met stapsgewijze procedures
- Detectieregels voor de belangrijkste AI-dreigingscategorieën
- Een programma-metricsdashboard met volwassenheidsscoring
- Integratiepunten die alle vier pijlers verbinden tot een samenhangend programma
Referenties
- NIST AI Risk Management Framework (AI RMF 1.0), https://www.nist.gov/artificial-intelligence/executive-order-safe-secure-and-trustworthy-artificial-intelligence
- ISO/IEC 42001:2023, "Artificial Intelligence Management System," https://www.iso.org/standard/81230.html
- OWASP Top 10 for Large Language Model Applications 2025, https://owasp.org/www-project-top-10-for-large-language-model-applications/
- MITRE ATLAS (Adversarial Threat Landscape for AI Systems), https://atlas.mitre.org/