Capstone: Design an Enterprise AI Security Program
Architect a comprehensive enterprise AI security program spanning governance, technical controls, risk management, and incident response for organizations deploying LLMs at scale.
Overview
Most organizations adopting AI bolt model deployments onto existing security programs designed for traditional software. This approach leaves critical gaps: there is no inventory of which models are deployed where, no process for evaluating model supply chain risks, no monitoring for prompt injection or data exfiltration through model outputs, and no incident response playbook for AI-specific attack scenarios. When an incident occurs — a model leaks PII, an agent executes unauthorized actions, or a fine-tuned model exhibits poisoned behavior — security teams scramble to improvise.
This capstone project tasks you with designing a complete enterprise AI security program from the ground up. The program spans four pillars: governance (policies, roles, and oversight), asset management (inventory and classification of AI components), technical controls (preventive, detective, and corrective measures), and operations (monitoring, incident response, and continuous improvement). Each pillar produces concrete artifacts — policy documents, asset registries, control configurations, and runbooks — that an organization can adopt directly.
The design aligns with the NIST AI Risk Management Framework (AI RMF) four-function structure (GOVERN, MAP, MEASURE, MANAGE) and incorporates requirements from ISO/IEC 42001:2023 (AI Management System) and the EU AI Act's obligations for high-risk AI systems. Where these frameworks provide high-level guidance, this project provides the specific implementation details that bridge the gap between compliance requirements and operational security.
The deliverable is not a document — it is a working system with automation, tooling, and integration points that make the security program operationally sustainable.
Project Requirements
Program Architecture
The AI security program operates across four pillars with clear interfaces:
┌─────────────────────────────────────────────────────┐
│ GOVERNANCE PILLAR                                   │
│ Policies │ Roles │ Risk Appetite │ Oversight Board  │
├─────────────────────────────────────────────────────┤
│ ASSET MANAGEMENT PILLAR                             │
│ AI Inventory │ Classification │ Supply Chain │ SBOM │
├─────────────────────────────────────────────────────┤
│ TECHNICAL CONTROLS PILLAR                           │
│ Pre-deploy │ Runtime │ Monitoring │ Data Protection │
├─────────────────────────────────────────────────────┤
│ OPERATIONS PILLAR                                   │
│ Incident Response │ Red Teaming │ Metrics │ Review  │
└─────────────────────────────────────────────────────┘
Functional Requirements
- Governance Framework — Policies covering acceptable AI use, model procurement, data handling for AI, red team exercise requirements, and AI incident classification. Defined RACI matrix for AI security responsibilities.
- AI Asset Inventory — Automated discovery and registration of AI models, datasets, embedding stores, agent configurations, API keys, and integration points across the organization.
- Risk Assessment Engine — Quantitative risk scoring per AI asset based on data sensitivity, deployment context, access surface, model provenance, and regulatory classification.
- Technical Control Library — Catalog of preventive, detective, and corrective controls mapped to OWASP LLM Top 10 risks with implementation specifications.
- Runtime Monitoring — Detection rules for AI-specific threats including prompt injection attempts, anomalous output patterns, data exfiltration via model responses, and cost abuse.
- Incident Response Playbooks — AI-specific playbooks for model compromise, data poisoning, prompt injection exploitation, agent misuse, and model supply chain attacks.
- Metrics and Reporting — KPIs and dashboards tracking program maturity, risk posture, incident trends, and compliance status.
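The RACI matrix called for in the governance requirement is most useful when kept as data rather than a slide, so tooling can query who owns what. A minimal sketch — the activity keys and role names here are illustrative examples, not mandated by any framework:

```python
# Illustrative RACI matrix for AI security responsibilities.
# Activity keys and role names are examples only -- adapt to your org chart.
RACI: dict[str, dict] = {
    "model_procurement_review": {
        "responsible": "ai_security_engineer",
        "accountable": "ciso",
        "consulted": ["legal", "data_privacy_officer"],
        "informed": ["model_owner_team"],
    },
    "red_team_engagement": {
        "responsible": "ai_red_team",
        "accountable": "ai_security_lead",
        "consulted": ["model_owner_team"],
        "informed": ["ciso"],
    },
}


def roles_for(activity: str) -> dict:
    """Look up the RACI assignment for a governance activity."""
    return RACI.get(activity, {})
```

Keeping the matrix machine-readable lets later phases (for example, playbook steps that name a `responsible_role`) validate their role references against governance data.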
Implementation Guide
Phase 1: AI Asset Inventory and Classification
Build the automated inventory system that provides visibility into all AI assets across the organization.
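The behavior at the heart of the registry — deduplicating rediscovered assets on a (provider, model, endpoint, environment) key — can be sketched in isolation before the full listing. `MiniAsset`, `dedupe_key`, and `register` below are simplified stand-ins for illustration, not part of the deliverable:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class MiniAsset:
    """Stripped-down stand-in for the full AIAsset record."""
    name: str
    model_provider: Optional[str] = None
    model_name: Optional[str] = None
    api_endpoint: Optional[str] = None
    deployment_env: str = "production"


def dedupe_key(asset: MiniAsset) -> tuple:
    """Key used to decide whether a discovered asset is already registered."""
    return (asset.model_provider, asset.model_name,
            asset.api_endpoint, asset.deployment_env)


registry: dict[tuple, MiniAsset] = {}


def register(asset: MiniAsset) -> bool:
    """Return True if newly registered, False if it updated an existing entry."""
    key = dedupe_key(asset)
    is_new = key not in registry
    registry[key] = asset  # rediscovery overwrites (updates) the entry
    return is_new
```

Rediscovery runs should converge on a stable inventory rather than accumulate duplicates, which is why the key excludes volatile fields like the display name.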
# inventory/asset_registry.py
"""Enterprise AI asset inventory and classification system."""
from __future__ import annotations
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime, date
from enum import Enum
from typing import Optional
from uuid import uuid4
logger = logging.getLogger(__name__)
class AssetType(Enum):
    """Types of AI assets tracked by the inventory."""

    FOUNDATION_MODEL = "foundation_model"
    FINE_TUNED_MODEL = "fine_tuned_model"
    EMBEDDING_MODEL = "embedding_model"
    VECTOR_DATABASE = "vector_database"
    TRAINING_DATASET = "training_dataset"
    EVALUATION_DATASET = "evaluation_dataset"
    AI_AGENT = "ai_agent"
    API_ENDPOINT = "api_endpoint"
    GUARDRAIL_CONFIG = "guardrail_config"
    PROMPT_TEMPLATE = "prompt_template"


class DataClassification(Enum):
    """Data sensitivity classification for AI assets."""

    PUBLIC = "public"
    INTERNAL = "internal"
    CONFIDENTIAL = "confidential"
    RESTRICTED = "restricted"
    REGULATED = "regulated"  # Subject to specific regulatory requirements


class DeploymentEnvironment(Enum):
    DEVELOPMENT = "development"
    STAGING = "staging"
    PRODUCTION = "production"
    RESEARCH = "research"


class RegulatoryScope(Enum):
    """Regulatory frameworks that may apply to the asset."""

    NONE = "none"
    GDPR = "gdpr"
    HIPAA = "hipaa"
    SOX = "sox"
    EU_AI_ACT_HIGH_RISK = "eu_ai_act_high_risk"
    EU_AI_ACT_LIMITED_RISK = "eu_ai_act_limited_risk"
    PCI_DSS = "pci_dss"
    CCPA = "ccpa"
@dataclass
class AIAsset:
    """A tracked AI asset in the enterprise inventory."""

    asset_id: str = field(default_factory=lambda: str(uuid4()))
    name: str = ""
    asset_type: AssetType = AssetType.FOUNDATION_MODEL
    description: str = ""
    owner_team: str = ""
    owner_email: str = ""
    data_classification: DataClassification = DataClassification.INTERNAL
    deployment_env: DeploymentEnvironment = DeploymentEnvironment.DEVELOPMENT
    regulatory_scope: list[RegulatoryScope] = field(default_factory=list)

    # Model-specific metadata
    model_provider: Optional[str] = None
    model_name: Optional[str] = None
    model_version: Optional[str] = None
    is_self_hosted: bool = False
    parameter_count: Optional[str] = None

    # Supply chain metadata
    source_url: Optional[str] = None
    license_type: Optional[str] = None
    last_security_review: Optional[date] = None
    sbom_available: bool = False

    # Deployment metadata
    api_endpoint: Optional[str] = None
    cloud_provider: Optional[str] = None
    cloud_region: Optional[str] = None
    network_exposure: str = "internal"  # internal, vpc, public

    # Dependencies
    upstream_assets: list[str] = field(default_factory=list)  # Asset IDs this depends on
    downstream_assets: list[str] = field(default_factory=list)  # Asset IDs that depend on this

    # Audit
    registered_date: datetime = field(default_factory=datetime.utcnow)
    last_updated: datetime = field(default_factory=datetime.utcnow)
    risk_score: Optional[float] = None
    compliance_status: str = "unreviewed"
class AssetDiscovery:
    """Automated discovery of AI assets across cloud environments."""

    def discover_aws_bedrock(self, session) -> list[AIAsset]:
        """Discover AI assets in AWS Bedrock."""
        assets = []
        try:
            bedrock = session.client("bedrock")
            # List custom models (fine-tuned)
            custom_models = bedrock.list_custom_models()
            for model in custom_models.get("modelSummaries", []):
                asset = AIAsset(
                    name=model["modelName"],
                    asset_type=AssetType.FINE_TUNED_MODEL,
                    description=f"AWS Bedrock custom model: {model['modelName']}",
                    model_provider="aws_bedrock",
                    model_name=model["modelName"],
                    cloud_provider="aws",
                    deployment_env=DeploymentEnvironment.PRODUCTION,
                    is_self_hosted=False,
                )
                assets.append(asset)
            # List provisioned model throughput
            provisioned = bedrock.list_provisioned_model_throughputs()
            for pt in provisioned.get("provisionedModelSummaries", []):
                asset = AIAsset(
                    name=f"Provisioned: {pt['provisionedModelName']}",
                    asset_type=AssetType.API_ENDPOINT,
                    model_provider="aws_bedrock",
                    model_name=pt.get("foundationModelArn", ""),
                    cloud_provider="aws",
                    deployment_env=DeploymentEnvironment.PRODUCTION,
                )
                assets.append(asset)
        except Exception as e:
            logger.error(f"AWS Bedrock discovery failed: {e}")
        return assets

    def discover_azure_openai(self, credential) -> list[AIAsset]:
        """Discover AI assets in Azure OpenAI Service."""
        assets = []
        try:
            from azure.mgmt.cognitiveservices import CognitiveServicesManagementClient

            client = CognitiveServicesManagementClient(
                credential=credential,
                subscription_id="<subscription_id>",
            )
            # List all Cognitive Services accounts of kind OpenAI
            for account in client.accounts.list():
                if account.kind == "OpenAI":
                    # List deployments within this account
                    deployments = client.deployments.list(
                        resource_group_name=account.id.split("/")[4],
                        account_name=account.name,
                    )
                    for deployment in deployments:
                        asset = AIAsset(
                            name=f"Azure OpenAI: {deployment.name}",
                            asset_type=AssetType.API_ENDPOINT,
                            model_provider="azure_openai",
                            model_name=deployment.properties.model.name,
                            model_version=deployment.properties.model.version,
                            cloud_provider="azure",
                            cloud_region=account.location,
                            deployment_env=DeploymentEnvironment.PRODUCTION,
                            api_endpoint=account.properties.endpoint,
                        )
                        assets.append(asset)
        except Exception as e:
            logger.error(f"Azure OpenAI discovery failed: {e}")
        return assets

    def discover_huggingface_models(self, search_paths: list[str]) -> list[AIAsset]:
        """Discover locally deployed Hugging Face models."""
        from pathlib import Path

        assets = []
        for search_path in search_paths:
            path = Path(search_path)
            # Look for model config files that indicate HF models
            for config_file in path.rglob("config.json"):
                try:
                    config = json.loads(config_file.read_text())
                    if "model_type" in config or "architectures" in config:
                        model_name = config.get(
                            "_name_or_path",
                            config_file.parent.name,
                        )
                        asset = AIAsset(
                            name=f"Local HF: {model_name}",
                            asset_type=AssetType.FOUNDATION_MODEL,
                            model_provider="huggingface",
                            model_name=model_name,
                            is_self_hosted=True,
                            source_url=f"https://huggingface.co/{model_name}",
                            deployment_env=DeploymentEnvironment.PRODUCTION,
                        )
                        assets.append(asset)
                except (json.JSONDecodeError, OSError):
                    continue
        return assets
class AssetRegistry:
    """Central registry for managing AI asset inventory."""

    def __init__(self):
        self._assets: dict[str, AIAsset] = {}
        self._discovery = AssetDiscovery()

    def register(self, asset: AIAsset) -> str:
        """Register a new AI asset. Returns the asset ID."""
        if not asset.name:
            raise ValueError("Asset must have a name")
        # Check for duplicates based on key attributes
        for existing in self._assets.values():
            if (existing.model_name == asset.model_name
                    and existing.model_provider == asset.model_provider
                    and existing.api_endpoint == asset.api_endpoint
                    and existing.deployment_env == asset.deployment_env):
                logger.info(
                    f"Updating existing asset {existing.asset_id} instead of creating duplicate"
                )
                existing.last_updated = datetime.utcnow()
                return existing.asset_id
        self._assets[asset.asset_id] = asset
        logger.info(f"Registered new AI asset: {asset.name} ({asset.asset_id})")
        return asset.asset_id

    def get(self, asset_id: str) -> Optional[AIAsset]:
        return self._assets.get(asset_id)

    def search(
        self,
        asset_type: Optional[AssetType] = None,
        data_classification: Optional[DataClassification] = None,
        owner_team: Optional[str] = None,
        regulatory_scope: Optional[RegulatoryScope] = None,
        unreviewed_only: bool = False,
    ) -> list[AIAsset]:
        """Search assets with filters."""
        results = list(self._assets.values())
        if asset_type:
            results = [a for a in results if a.asset_type == asset_type]
        if data_classification:
            results = [a for a in results if a.data_classification == data_classification]
        if owner_team:
            results = [a for a in results if a.owner_team == owner_team]
        if regulatory_scope:
            results = [a for a in results if regulatory_scope in a.regulatory_scope]
        if unreviewed_only:
            results = [a for a in results if a.compliance_status == "unreviewed"]
        return results

    def dependency_graph(self, asset_id: str) -> dict:
        """Build the upstream dependency graph for an asset."""
        asset = self._assets.get(asset_id)
        if not asset:
            return {}
        visited = set()

        def walk_upstream(aid: str) -> dict:
            if aid in visited:
                return {"id": aid, "circular": True}
            visited.add(aid)
            a = self._assets.get(aid)
            if not a:
                return {"id": aid, "missing": True}
            return {
                "id": aid,
                "name": a.name,
                "type": a.asset_type.value,
                "upstream": [walk_upstream(uid) for uid in a.upstream_assets],
            }

        return walk_upstream(asset_id)

    def compliance_gap_report(self) -> dict:
        """Identify assets with compliance gaps."""
        gaps = {
            "no_security_review": [],
            "stale_review": [],
            "missing_sbom": [],
            "unclassified_data": [],
            "no_owner": [],
            "public_exposure_high_sensitivity": [],
        }
        for asset in self._assets.values():
            if asset.last_security_review is None:
                gaps["no_security_review"].append(asset.asset_id)
            if (asset.last_security_review
                    and (date.today() - asset.last_security_review).days > 90):
                gaps["stale_review"].append(asset.asset_id)
            if not asset.sbom_available and asset.is_self_hosted:
                gaps["missing_sbom"].append(asset.asset_id)
            # Regulated assets still carrying the default INTERNAL label need reclassification
            if asset.data_classification == DataClassification.INTERNAL and asset.regulatory_scope:
                gaps["unclassified_data"].append(asset.asset_id)
            if not asset.owner_team:
                gaps["no_owner"].append(asset.asset_id)
            if (asset.network_exposure == "public"
                    and asset.data_classification in (
                        DataClassification.CONFIDENTIAL,
                        DataClassification.RESTRICTED,
                        DataClassification.REGULATED,
                    )):
                gaps["public_exposure_high_sensitivity"].append(asset.asset_id)
        return gaps

Phase 2: Risk Assessment Engine
Quantify risk per AI asset across multiple dimensions.
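At its core the engine computes two numbers: inherent risk as a weighted sum of per-dimension scores, and residual risk as that figure discounted by control effectiveness. Stripped of the asset model, the math looks like this (the weights mirror the engine's dimension weights; the sample scores are invented for illustration):

```python
# Weighted-sum risk scoring, isolated from the asset model.
DIMENSION_WEIGHTS = {
    "data_sensitivity": 0.25,
    "deployment_exposure": 0.20,
    "model_provenance": 0.15,
    "regulatory_impact": 0.15,
    "capability_risk": 0.15,
    "supply_chain": 0.10,
}


def inherent_risk(dimension_scores: dict[str, float]) -> float:
    """Weighted sum of per-dimension scores (each 0-100) -> 0-100."""
    return sum(DIMENSION_WEIGHTS[d] * s for d, s in dimension_scores.items())


def residual_risk(inherent: float, control_effectiveness: float) -> float:
    """Controls scale risk down linearly; effectiveness is capped at 1.0."""
    return inherent * (1 - min(1.0, control_effectiveness))


# Invented example: a public-facing agent handling regulated data.
sample_scores = {
    "data_sensitivity": 60, "deployment_exposure": 90, "model_provenance": 40,
    "regulatory_impact": 90, "capability_risk": 90, "supply_chain": 50,
}
```

The linear discount is a deliberate simplification: it keeps the model explainable to auditors, at the cost of assuming controls are independent and additive.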
# risk/assessment.py
"""Quantitative risk assessment engine for AI assets."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import date
from typing import Optional
from inventory.asset_registry import (
    AIAsset, AssetType, DataClassification,
    DeploymentEnvironment, RegulatoryScope,
)
@dataclass
class RiskAssessment:
    """Quantified risk assessment for a single AI asset."""

    asset_id: str
    asset_name: str
    inherent_risk_score: float  # Risk without controls (0-100)
    control_effectiveness: float  # How much controls reduce risk (0-1)
    residual_risk_score: float  # Risk after controls (0-100)
    risk_level: str  # CRITICAL, HIGH, MEDIUM, LOW
    risk_factors: list[dict]  # Individual contributing factors
    recommendations: list[str]  # Prioritized remediation steps
    assessed_date: date

    @property
    def risk_reduction_percentage(self) -> float:
        if self.inherent_risk_score == 0:
            return 0
        return (1 - self.residual_risk_score / self.inherent_risk_score) * 100
class AIRiskAssessor:
    """Assesses risk for AI assets across multiple dimensions."""

    # Risk weights per dimension (must sum to 1.0)
    DIMENSION_WEIGHTS = {
        "data_sensitivity": 0.25,
        "deployment_exposure": 0.20,
        "model_provenance": 0.15,
        "regulatory_impact": 0.15,
        "capability_risk": 0.15,
        "supply_chain": 0.10,
    }

    def assess(self, asset: AIAsset, controls: Optional[list[dict]] = None) -> RiskAssessment:
        """Run a full risk assessment on an AI asset."""
        risk_factors = []

        # Data sensitivity dimension
        data_score = self._score_data_sensitivity(asset)
        risk_factors.append({
            "dimension": "data_sensitivity",
            "score": data_score,
            "weight": self.DIMENSION_WEIGHTS["data_sensitivity"],
            "details": f"Data classification: {asset.data_classification.value}",
        })

        # Deployment exposure dimension
        exposure_score = self._score_deployment_exposure(asset)
        risk_factors.append({
            "dimension": "deployment_exposure",
            "score": exposure_score,
            "weight": self.DIMENSION_WEIGHTS["deployment_exposure"],
            "details": f"Network: {asset.network_exposure}, Env: {asset.deployment_env.value}",
        })

        # Model provenance dimension
        provenance_score = self._score_model_provenance(asset)
        risk_factors.append({
            "dimension": "model_provenance",
            "score": provenance_score,
            "weight": self.DIMENSION_WEIGHTS["model_provenance"],
            "details": f"Provider: {asset.model_provider}, Self-hosted: {asset.is_self_hosted}",
        })

        # Regulatory impact dimension
        regulatory_score = self._score_regulatory_impact(asset)
        risk_factors.append({
            "dimension": "regulatory_impact",
            "score": regulatory_score,
            "weight": self.DIMENSION_WEIGHTS["regulatory_impact"],
            "details": f"Regulatory scope: {[r.value for r in asset.regulatory_scope]}",
        })

        # Capability risk dimension
        capability_score = self._score_capability_risk(asset)
        risk_factors.append({
            "dimension": "capability_risk",
            "score": capability_score,
            "weight": self.DIMENSION_WEIGHTS["capability_risk"],
            "details": f"Asset type: {asset.asset_type.value}",
        })

        # Supply chain dimension
        supply_chain_score = self._score_supply_chain(asset)
        risk_factors.append({
            "dimension": "supply_chain",
            "score": supply_chain_score,
            "weight": self.DIMENSION_WEIGHTS["supply_chain"],
            "details": f"SBOM: {asset.sbom_available}, License: {asset.license_type}",
        })

        # Calculate inherent risk score
        inherent_risk = sum(
            f["score"] * f["weight"] for f in risk_factors
        )

        # Calculate control effectiveness
        control_effectiveness = self._evaluate_controls(asset, controls or [])

        # Residual risk = inherent risk * (1 - control effectiveness)
        residual_risk = inherent_risk * (1 - control_effectiveness)

        # Determine risk level
        if residual_risk >= 75:
            risk_level = "CRITICAL"
        elif residual_risk >= 50:
            risk_level = "HIGH"
        elif residual_risk >= 25:
            risk_level = "MEDIUM"
        else:
            risk_level = "LOW"

        recommendations = self._generate_recommendations(risk_factors, asset)
        return RiskAssessment(
            asset_id=asset.asset_id,
            asset_name=asset.name,
            inherent_risk_score=round(inherent_risk, 2),
            control_effectiveness=round(control_effectiveness, 2),
            residual_risk_score=round(residual_risk, 2),
            risk_level=risk_level,
            risk_factors=risk_factors,
            recommendations=recommendations,
            assessed_date=date.today(),
        )
    def _score_data_sensitivity(self, asset: AIAsset) -> float:
        """Score risk from data sensitivity (0-100)."""
        scores = {
            DataClassification.PUBLIC: 10,
            DataClassification.INTERNAL: 30,
            DataClassification.CONFIDENTIAL: 60,
            DataClassification.RESTRICTED: 85,
            DataClassification.REGULATED: 95,
        }
        return scores.get(asset.data_classification, 50)

    def _score_deployment_exposure(self, asset: AIAsset) -> float:
        exposure_scores = {"internal": 20, "vpc": 40, "public": 90}
        env_multipliers = {
            DeploymentEnvironment.DEVELOPMENT: 0.4,
            DeploymentEnvironment.STAGING: 0.6,
            DeploymentEnvironment.PRODUCTION: 1.0,
            DeploymentEnvironment.RESEARCH: 0.3,
        }
        base = exposure_scores.get(asset.network_exposure, 50)
        multiplier = env_multipliers.get(asset.deployment_env, 0.5)
        return min(100, base * multiplier)

    def _score_model_provenance(self, asset: AIAsset) -> float:
        if asset.model_provider in ("openai", "anthropic", "google"):
            base = 20  # Major providers have security programs
        elif asset.model_provider in ("azure_openai", "aws_bedrock", "gcp_vertex"):
            base = 15  # Cloud-managed services add security layers
        elif asset.is_self_hosted:
            base = 60  # Self-hosting means owning the full security posture
        else:
            base = 50
        if not asset.last_security_review:
            base += 20
        elif (date.today() - asset.last_security_review).days > 90:
            base += 10
        return min(100, base)

    def _score_regulatory_impact(self, asset: AIAsset) -> float:
        if not asset.regulatory_scope:
            return 10
        high_impact = {
            RegulatoryScope.EU_AI_ACT_HIGH_RISK, RegulatoryScope.HIPAA,
            RegulatoryScope.SOX, RegulatoryScope.PCI_DSS,
        }
        if any(r in high_impact for r in asset.regulatory_scope):
            return 90
        return 50

    def _score_capability_risk(self, asset: AIAsset) -> float:
        scores = {
            AssetType.AI_AGENT: 90,  # Agents can take actions
            AssetType.FINE_TUNED_MODEL: 70,  # Custom training data risks
            AssetType.FOUNDATION_MODEL: 50,
            AssetType.API_ENDPOINT: 60,
            AssetType.VECTOR_DATABASE: 55,
            AssetType.EMBEDDING_MODEL: 30,
            AssetType.TRAINING_DATASET: 45,
            AssetType.PROMPT_TEMPLATE: 25,
            AssetType.GUARDRAIL_CONFIG: 20,
            AssetType.EVALUATION_DATASET: 15,
        }
        return scores.get(asset.asset_type, 50)

    def _score_supply_chain(self, asset: AIAsset) -> float:
        score = 50
        if asset.sbom_available:
            score -= 20
        if asset.license_type and asset.license_type != "unknown":
            score -= 10
        if asset.source_url and "huggingface.co" in asset.source_url:
            score += 10  # Open model repos carry higher supply chain risk
        return max(0, min(100, score))
    def _evaluate_controls(self, asset: AIAsset, controls: list[dict]) -> float:
        """Estimate control effectiveness based on deployed controls."""
        if not controls:
            return 0.0
        control_scores = {
            "input_filtering": 0.15,
            "output_filtering": 0.15,
            "rate_limiting": 0.10,
            "authentication": 0.10,
            "encryption_at_rest": 0.10,
            "encryption_in_transit": 0.10,
            "monitoring": 0.10,
            "guardrails": 0.10,
            "access_logging": 0.05,
            "network_isolation": 0.05,
        }
        effectiveness = 0.0
        for control in controls:
            control_type = control.get("type", "")
            # Each operational control contributes its weight to total effectiveness
            if control_type in control_scores and control.get("operational", False):
                effectiveness += control_scores[control_type]
        return min(1.0, effectiveness)
    def _generate_recommendations(
        self, risk_factors: list[dict], asset: AIAsset
    ) -> list[str]:
        """Generate prioritized remediation recommendations."""
        recs = []
        # Sort risk factors by weighted contribution (descending)
        sorted_factors = sorted(
            risk_factors,
            key=lambda f: f["score"] * f["weight"],
            reverse=True,
        )
        for factor in sorted_factors[:3]:  # Top 3 risk drivers
            dim = factor["dimension"]
            if dim == "data_sensitivity" and factor["score"] > 60:
                recs.append(
                    "Implement data loss prevention (DLP) controls on model inputs and outputs. "
                    "Consider PII detection and redaction before data reaches the model."
                )
            elif dim == "deployment_exposure" and factor["score"] > 50:
                recs.append(
                    "Reduce network exposure by deploying behind a VPC with private endpoints. "
                    "Implement an API gateway with authentication and rate limiting."
                )
            elif dim == "model_provenance" and factor["score"] > 40:
                recs.append(
                    "Conduct a security review of the model and its supply chain. "
                    "Generate an AI SBOM and verify model integrity checksums."
                )
            elif dim == "regulatory_impact" and factor["score"] > 60:
                recs.append(
                    "Map regulatory obligations to specific technical controls. "
                    "Implement audit logging that satisfies retention requirements."
                )
            elif dim == "capability_risk" and factor["score"] > 60:
                recs.append(
                    "Apply the principle of least privilege to model/agent capabilities. "
                    "Implement human-in-the-loop approval for high-impact actions."
                )
            elif dim == "supply_chain" and factor["score"] > 40:
                recs.append(
                    "Generate and maintain an AI Software Bill of Materials (SBOM). "
                    "Monitor for vulnerabilities in model dependencies and frameworks."
                )
        return recs

Phase 3: AI Incident Response Playbook Engine
Build the incident response framework with AI-specific playbooks.
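The playbooks below carry machine-readable detection rules alongside their procedural steps, so the rules can be exercised outside the incident tooling. For instance, the keyword rule from the prompt-injection playbook runs standalone (the pattern is reproduced from the playbook definition; `re.IGNORECASE` is added here as a hardening assumption, since the stored rule keeps only the raw pattern):

```python
import re

# Pattern from the "injection_keyword_match" detection rule.
INJECTION_PATTERN = re.compile(
    r"(ignore|disregard|forget)\s+(all\s+)?(previous|prior|above)\s+"
    r"(instructions|prompts|rules)",
    re.IGNORECASE,
)


def flags_injection(user_input: str) -> bool:
    """True if the input matches the known-injection keyword pattern."""
    return INJECTION_PATTERN.search(user_input) is not None
```

Keyword rules like this are cheap and precise against known payloads, which is why the playbook pairs them with similarity and statistical rules to catch paraphrased variants.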
# incident_response/playbooks.py
"""AI-specific incident response playbook engine."""
from __future__ import annotations
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Optional

logger = logging.getLogger(__name__)


class IncidentSeverity(Enum):
    SEV1 = "sev1"  # Active exploitation, data breach, system compromise
    SEV2 = "sev2"  # Confirmed vulnerability, potential data exposure
    SEV3 = "sev3"  # Suspicious activity, policy violation
    SEV4 = "sev4"  # Informational, potential false positive


class IncidentCategory(Enum):
    PROMPT_INJECTION_EXPLOITATION = "prompt_injection_exploitation"
    DATA_EXFILTRATION_VIA_MODEL = "data_exfiltration_via_model"
    MODEL_SUPPLY_CHAIN_COMPROMISE = "model_supply_chain_compromise"
    AGENT_UNAUTHORIZED_ACTIONS = "agent_unauthorized_actions"
    TRAINING_DATA_POISONING = "training_data_poisoning"
    MODEL_DENIAL_OF_SERVICE = "model_denial_of_service"
    PII_LEAKAGE_IN_OUTPUT = "pii_leakage_in_output"
    JAILBREAK_BYPASS = "jailbreak_bypass"
@dataclass
class PlaybookStep:
    """A single step in an incident response playbook."""

    step_number: int
    phase: str  # detection, containment, eradication, recovery, lessons_learned
    action: str
    responsible_role: str
    automated: bool
    automation_script: Optional[str] = None
    timeout_minutes: int = 30
    escalation_trigger: Optional[str] = None


@dataclass
class Playbook:
    """Complete incident response playbook for an AI incident category."""

    category: IncidentCategory
    title: str
    description: str
    severity_default: IncidentSeverity
    steps: list[PlaybookStep] = field(default_factory=list)
    detection_rules: list[dict] = field(default_factory=list)
    containment_actions: list[str] = field(default_factory=list)
    evidence_to_collect: list[str] = field(default_factory=list)
    notification_requirements: list[str] = field(default_factory=list)


@dataclass
class Incident:
    """A tracked AI security incident."""

    incident_id: str
    category: IncidentCategory
    severity: IncidentSeverity
    title: str
    description: str
    affected_assets: list[str]  # Asset IDs from the registry
    detected_at: datetime
    detected_by: str  # Person, system, or rule that detected the incident
    current_phase: str = "detection"
    assigned_to: Optional[str] = None
    timeline: list[dict] = field(default_factory=list)
    evidence: list[dict] = field(default_factory=list)
    status: str = "open"
class PlaybookLibrary:
    """Library of AI-specific incident response playbooks."""

    def __init__(self):
        self._playbooks: dict[IncidentCategory, Playbook] = {}
        self._register_default_playbooks()

    def _register_default_playbooks(self):
        """Register built-in playbooks for common AI incident categories."""
        # Prompt injection exploitation playbook
        self._playbooks[IncidentCategory.PROMPT_INJECTION_EXPLOITATION] = Playbook(
            category=IncidentCategory.PROMPT_INJECTION_EXPLOITATION,
            title="Prompt Injection Exploitation Response",
            description=(
                "Response procedure for confirmed prompt injection attacks that have "
                "bypassed guardrails and caused the model to execute unauthorized instructions."
            ),
            severity_default=IncidentSeverity.SEV2,
            steps=[
                PlaybookStep(
                    step_number=1,
                    phase="detection",
                    action="Verify the alert is a true positive by reviewing the flagged request/response pair in the monitoring system",
                    responsible_role="ai_security_analyst",
                    automated=False,
                    timeout_minutes=15,
                ),
                PlaybookStep(
                    step_number=2,
                    phase="detection",
                    action="Classify the injection type (direct, indirect, multi-turn) and determine the achieved impact (data leak, instruction override, tool abuse)",
                    responsible_role="ai_security_analyst",
                    automated=False,
                    timeout_minutes=30,
                ),
                PlaybookStep(
                    step_number=3,
                    phase="containment",
                    action="If the injection achieved tool execution or data access: immediately revoke the affected API session and rotate any exposed credentials",
                    responsible_role="ai_security_engineer",
                    automated=True,
                    automation_script="scripts/revoke_session.py",
                    timeout_minutes=5,
                    escalation_trigger="If credential rotation fails, escalate to SEV1",
                ),
                PlaybookStep(
                    step_number=4,
                    phase="containment",
                    action="Deploy an emergency guardrail rule that blocks the specific injection pattern. Use exact-match and regex patterns derived from the attack payload.",
                    responsible_role="ai_security_engineer",
                    automated=True,
                    automation_script="scripts/deploy_emergency_guardrail.py",
                    timeout_minutes=15,
                ),
                PlaybookStep(
                    step_number=5,
                    phase="eradication",
                    action="Analyze the full conversation history for the attacking session to determine if additional payloads were attempted or if data was exfiltrated across multiple turns",
                    responsible_role="ai_security_analyst",
                    automated=False,
                    timeout_minutes=60,
                ),
                PlaybookStep(
                    step_number=6,
                    phase="eradication",
                    action="Test the injection pattern against the current guardrail configuration to verify the emergency rule is effective. Test variations and mutations of the pattern.",
                    responsible_role="ai_red_team",
                    automated=False,
                    timeout_minutes=120,
                ),
                PlaybookStep(
                    step_number=7,
                    phase="recovery",
                    action="If the system prompt or confidential data was leaked, rotate the system prompt and notify affected data owners per the data breach notification policy",
                    responsible_role="ai_security_engineer",
                    automated=False,
                    timeout_minutes=60,
                ),
                PlaybookStep(
                    step_number=8,
                    phase="lessons_learned",
                    action="Document the incident in the vulnerability database with the full attack chain, root cause analysis, and timeline. Update detection rules to catch similar patterns.",
                    responsible_role="ai_security_analyst",
                    automated=False,
                    timeout_minutes=240,
                ),
            ],
            detection_rules=[
                {
                    "name": "injection_keyword_match",
                    "description": "Detect known injection patterns in user input",
                    "type": "regex",
                    "pattern": r"(ignore|disregard|forget)\s+(all\s+)?(previous|prior|above)\s+(instructions|prompts|rules)",
                },
                {
                    "name": "output_anomaly",
                    "description": "Detect when model output contains system prompt fragments",
                    "type": "similarity",
                    "threshold": 0.85,
                    "reference": "system_prompt_embeddings",
                },
                {
                    "name": "behavioral_shift",
                    "description": "Detect sudden changes in model output characteristics within a session",
                    "type": "statistical",
                    "metric": "output_entropy_delta",
                    "threshold": 2.0,
                },
            ],
            containment_actions=[
                "Terminate the affected user session",
                "Block the source IP/API key temporarily",
                "Enable enhanced logging on the affected endpoint",
                "Deploy an emergency input filter for the specific payload pattern",
            ],
            evidence_to_collect=[
                "Full conversation history for the affected session",
                "Input/output logs with timestamps",
                "Guardrail evaluation logs showing why the attack was not blocked",
                "API access logs for the attacking identity",
                "Model configuration at time of incident (system prompt, temperature, tools)",
            ],
            notification_requirements=[
                "AI security team lead within 30 minutes",
                "CISO within 2 hours for SEV1/SEV2",
                "Legal/compliance if PII or regulated data was exposed",
                "Affected application team within 1 hour",
            ],
        )
        # Agent unauthorized actions playbook
        self._playbooks[IncidentCategory.AGENT_UNAUTHORIZED_ACTIONS] = Playbook(
            category=IncidentCategory.AGENT_UNAUTHORIZED_ACTIONS,
            title="Agent Unauthorized Actions Response",
            description=(
                "Response procedure for when an AI agent executes actions outside its "
                "authorized scope, whether triggered by prompt injection, misconfiguration, "
                "or emergent behavior."
            ),
            severity_default=IncidentSeverity.SEV1,
            steps=[
                PlaybookStep(
                    step_number=1,
                    phase="containment",
                    action="IMMEDIATELY disable the agent's tool access and API credentials. Do not wait for analysis — contain first.",
                    responsible_role="on_call_engineer",
                    automated=True,
                    automation_script="scripts/disable_agent_tools.py",
                    timeout_minutes=5,
                    escalation_trigger="If the agent cannot be disabled within 5 minutes, escalate to the infrastructure team for a network-level block",
                ),
                PlaybookStep(
                    step_number=2,
                    phase="containment",
                    action="Identify all actions the agent executed by reviewing tool-call logs. Determine the blast radius — which systems, data, and users were affected.",
                    responsible_role="ai_security_analyst",
                    automated=False,
                    timeout_minutes=30,
                ),
                PlaybookStep(
                    step_number=3,
                    phase="eradication",
                    action="Reverse unauthorized actions where possible (delete created resources, revert data changes, revoke granted permissions)",
                    responsible_role="ai_security_engineer",
                    automated=False,
                    timeout_minutes=120,
                ),
                PlaybookStep(
                    step_number=4,
                    phase="recovery",
                    action="Before re-enabling the agent, implement stricter tool-use policies: explicit allow-lists, human-in-the-loop for sensitive actions, rate limits on tool calls",
                    responsible_role="ai_security_engineer",
                    automated=False,
                    timeout_minutes=240,
                ),
            ],
            detection_rules=[
                {
                    "name": "unauthorized_tool_call",
                    "description": "Agent called a tool not in its authorized tool set",
                    "type": "policy",
                    "check": "tool_name NOT IN agent.authorized_tools",
                },
                {
                    "name": "excessive_tool_calls",
                    "description": "Agent making unusually many tool calls in a time window",
                    "type": "statistical",
                    "metric": "tool_calls_per_minute",
                    "threshold": 20,
                },
            ],
            containment_actions=[
                "Disable all agent tool access immediately",
                "Revoke agent API credentials",
                "Block agent network access at the firewall level if needed",
            ],
            evidence_to_collect=[
                "Complete agent execution trace with all tool calls and responses",
                "The triggering user input that led to unauthorized actions",
                "Agent configuration including system prompt and tool definitions",
                "Logs from all downstream systems the agent interacted with",
            ],
            notification_requirements=[
                "AI security team lead IMMEDIATELY",
                "CISO within 1 hour",
                "Owners of all affected downstream systems within 1 hour",
                "Legal if customer data was accessed or modified",
            ],
        )
    def get_playbook(self, category: IncidentCategory) -> Optional[Playbook]:
        return self._playbooks.get(category)

    def list_playbooks(self) -> list[dict]:
        return [
            {
                "category": pb.category.value,
                "title": pb.title,
                "severity_default": pb.severity_default.value,
                "num_steps": len(pb.steps),
            }
            for pb in self._playbooks.values()
        ]

Phase 4: Program Metrics and Maturity Dashboard
Track program effectiveness with quantifiable metrics.
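Most of the KPIs tracked below are plain ratios and means over raw event data. Computing two of them might look like this (the incident records and field names are illustrative; the metric names mirror the dataclass that follows):

```python
from datetime import datetime
from statistics import mean


def mean_time_to_contain_hours(incidents: list[dict]) -> float:
    """Mean detection-to-containment lag in hours across contained incidents."""
    lags = [
        (i["contained_at"] - i["detected_at"]).total_seconds() / 3600
        for i in incidents
        if i.get("contained_at")  # skip incidents not yet contained
    ]
    return mean(lags) if lags else 0.0


def coverage_rate(reviewed: int, total: int) -> float:
    """Fraction of AI assets with a completed security review."""
    return reviewed / total if total else 0.0
```

Feeding these from the incident timeline and the asset registry keeps the dashboard honest: every number traces back to a logged event rather than a manually updated spreadsheet.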
# metrics/dashboard.py
"""AI security program metrics and maturity scoring."""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import date, timedelta
from typing import Optional
@dataclass
class ProgramMetrics:
"""Key performance indicators for the AI security program."""
# Asset management metrics
total_ai_assets: int = 0
assets_with_security_review: int = 0
assets_with_stale_review: int = 0 # Review older than 90 days
asset_coverage_rate: float = 0.0
# Risk metrics
critical_risk_assets: int = 0
high_risk_assets: int = 0
mean_residual_risk: float = 0.0
risk_acceptance_count: int = 0
# Incident metrics
incidents_last_30_days: int = 0
mean_time_to_detect_hours: float = 0.0
mean_time_to_contain_hours: float = 0.0
mean_time_to_resolve_hours: float = 0.0
incidents_by_category: dict = field(default_factory=dict)
# Control metrics
controls_deployed: int = 0
controls_operational: int = 0
control_coverage_rate: float = 0.0
# Red team metrics
red_team_engagements_ytd: int = 0
findings_from_red_team: int = 0
findings_remediated: int = 0
remediation_rate: float = 0.0
# Compliance metrics
compliance_gap_count: int = 0
overdue_actions: int = 0
@dataclass
class MaturityLevel:
"""CMMI-inspired maturity level for AI security program dimensions."""
dimension: str
level: int # 1-5
level_name: str # Initial, Managed, Defined, Quantitatively Managed, Optimizing
score: float # 0-100 within the level
evidence: list[str] = field(default_factory=list)
gaps_to_next_level: list[str] = field(default_factory=list)
def assess_program_maturity(metrics: ProgramMetrics) -> list[MaturityLevel]:
"""
Assess AI security program maturity across key dimensions.
Returns maturity levels inspired by CMMI, adapted for AI security:
Level 1 (Initial): Ad-hoc, reactive, no formal processes
Level 2 (Managed): Basic processes, some documentation
Level 3 (Defined): Standardized processes, proactive controls
Level 4 (Quantitative): Metrics-driven, continuous measurement
Level 5 (Optimizing): Continuous improvement, predictive capabilities
"""
levels = []
# Asset management maturity
if metrics.asset_coverage_rate >= 0.95:
asset_level = 4
asset_name = "Quantitatively Managed"
asset_gaps = ["Implement predictive asset risk scoring", "Automate SBOM generation"]
elif metrics.asset_coverage_rate >= 0.80:
asset_level = 3
asset_name = "Defined"
asset_gaps = ["Achieve 95%+ asset coverage", "Automate discovery for all cloud providers"]
elif metrics.asset_coverage_rate >= 0.50:
asset_level = 2
asset_name = "Managed"
asset_gaps = ["Standardize asset classification", "Implement dependency tracking"]
else:
asset_level = 1
asset_name = "Initial"
asset_gaps = ["Deploy automated asset discovery", "Define asset classification policy"]
levels.append(MaturityLevel(
dimension="Asset Management",
level=asset_level,
level_name=asset_name,
score=metrics.asset_coverage_rate * 100,
evidence=[f"Coverage: {metrics.asset_coverage_rate:.0%}",
f"Total assets tracked: {metrics.total_ai_assets}"],
gaps_to_next_level=asset_gaps,
))
# Incident response maturity
if metrics.mean_time_to_detect_hours < 1 and metrics.mean_time_to_contain_hours < 4:
ir_level = 4
ir_name = "Quantitatively Managed"
ir_gaps = ["Implement automated response for common incident types"]
elif metrics.mean_time_to_detect_hours < 4:
ir_level = 3
ir_name = "Defined"
ir_gaps = ["Reduce MTTD below 1 hour", "Automate containment for SEV1"]
elif metrics.mean_time_to_detect_hours < 24:
ir_level = 2
ir_name = "Managed"
ir_gaps = ["Deploy AI-specific detection rules", "Create playbooks for all categories"]
else:
ir_level = 1
ir_name = "Initial"
ir_gaps = ["Implement basic monitoring", "Define incident categories"]
levels.append(MaturityLevel(
dimension="Incident Response",
level=ir_level,
level_name=ir_name,
score=max(0, 100 - metrics.mean_time_to_detect_hours * 5),
evidence=[
f"MTTD: {metrics.mean_time_to_detect_hours:.1f}h",
f"MTTC: {metrics.mean_time_to_contain_hours:.1f}h",
f"Incidents (30d): {metrics.incidents_last_30_days}",
],
gaps_to_next_level=ir_gaps,
))
# Red team maturity
if metrics.red_team_engagements_ytd >= 4 and metrics.remediation_rate >= 0.90:
rt_level = 4
rt_name = "Quantitatively Managed"
rt_gaps = ["Implement continuous automated red teaming"]
elif metrics.red_team_engagements_ytd >= 2:
rt_level = 3
rt_name = "Defined"
rt_gaps = ["Increase to quarterly engagements", "Achieve 90%+ remediation rate"]
elif metrics.red_team_engagements_ytd >= 1:
rt_level = 2
rt_name = "Managed"
rt_gaps = ["Define a red team methodology", "Track remediation systematically"]
else:
rt_level = 1
rt_name = "Initial"
rt_gaps = ["Conduct first AI red team engagement", "Establish finding tracking"]
levels.append(MaturityLevel(
dimension="Red Teaming",
level=rt_level,
level_name=rt_name,
score=min(100, metrics.red_team_engagements_ytd * 25),
evidence=[
f"Engagements YTD: {metrics.red_team_engagements_ytd}",
f"Findings: {metrics.findings_from_red_team}",
f"Remediation rate: {metrics.remediation_rate:.0%}",
],
gaps_to_next_level=rt_gaps,
))
return levels
Deliverables Checklist
By the end of this capstone, you should have produced:
- An automated AI asset inventory system with cloud discovery modules
- A quantitative risk assessment engine with configurable scoring weights
- A library of AI-specific incident response playbooks with step-by-step procedures
- Detection rules for the top AI threat categories
- A program metrics dashboard with maturity scoring
- Integration points connecting all four pillars into a cohesive program
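Before signing off on the metrics dashboard deliverable, it is worth spot-checking the CMMI-style banding logic in `assess_program_maturity` at its boundaries. The sketch below is a condensed, illustrative stand-in for just the asset-management tier (thresholds copied from Phase 4, not the full function):

```python
# Condensed, illustrative re-implementation of the asset-management
# banding from assess_program_maturity (thresholds from Phase 4).
def asset_maturity_level(coverage_rate: float) -> int:
    """Map asset coverage rate (0.0-1.0) to a CMMI-style level 1-4."""
    if coverage_rate >= 0.95:
        return 4  # Quantitatively Managed
    if coverage_rate >= 0.80:
        return 3  # Defined
    if coverage_rate >= 0.50:
        return 2  # Managed
    return 1      # Initial

# Boundary checks: each threshold is inclusive on the way up.
for coverage, expected in [(0.97, 4), (0.95, 4), (0.85, 3), (0.50, 2), (0.30, 1)]:
    assert asset_maturity_level(coverage) == expected
```

The same boundary-testing pattern applies to the incident-response (MTTD/MTTC thresholds) and red-team (engagement count and remediation rate) dimensions.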
References
- NIST AI Risk Management Framework (AI RMF 1.0), https://www.nist.gov/itl/ai-risk-management-framework
- ISO/IEC 42001:2023, "Artificial Intelligence Management System," https://www.iso.org/standard/81230.html
- OWASP Top 10 for Large Language Model Applications 2025, https://owasp.org/www-project-top-10-for-large-language-model-applications/
- MITRE ATLAS (Adversarial Threat Landscape for Artificial-Intelligence Systems), https://atlas.mitre.org/