Capstone: Build an LLM Vulnerability Tracking Database
Design and implement a structured vulnerability tracking database for cataloging, scoring, and querying LLM-specific security weaknesses across models and deployments.
Overview
Traditional vulnerability management tools like CVE databases and NVD were designed for software defects with clear version boundaries and binary patch states. LLM vulnerabilities break these assumptions. A prompt injection technique might work on GPT-4 but not Claude 3.5, succeed at temperature 0.9 but fail at 0.0, and be mitigated by a guardrail that itself introduces new attack surface. The concept of a version barely applies when model providers update weights without public changelogs.
This capstone project tasks you with building a purpose-built vulnerability tracking system that accounts for the probabilistic, model-dependent, and rapidly evolving nature of LLM security weaknesses. Your database will catalog vulnerabilities with rich metadata — affected models, attack prerequisites, reproduction rates, guardrail interactions, and temporal validity — enabling security teams to make informed prioritization decisions.
The system draws from established vulnerability management practices documented in NIST SP 800-150 (Guide to Cyber Threat Information Sharing) while extending them with LLM-specific dimensions. By the end of this project, you will have a functional tool that a red team can use to track findings across engagements and a blue team can use to prioritize defensive investments.
The architecture spans three layers: a PostgreSQL database with a schema optimized for LLM vulnerability metadata, a Python API layer that handles ingestion and querying, and a reporting module that generates actionable intelligence from the accumulated data.
Project Requirements
Architecture Overview
The system operates as a three-tier application with clear separation of concerns:
Data Sources → Ingestion Layer → PostgreSQL Database
↓
API Layer (FastAPI)
↓
Query / Reporting Interface
Functional Requirements
- Vulnerability Schema — A normalized schema that captures LLM-specific fields beyond traditional CVE attributes: affected model families, attack category (mapped to the OWASP LLM Top 10 and MITRE ATLAS), reproduction rate, required preconditions, guardrail bypass details, and temporal validity windows.
- Scoring Engine — An LLM-adapted scoring system that extends CVSS with dimensions for prompt sensitivity, output consistency, guardrail interaction, and cross-model transferability.
- Ingestion Pipeline — Parsers for multiple input formats: structured JSON from automated scanning tools (Garak, Promptfoo), semi-structured markdown from manual red team reports, and CVE/NVD feeds for traditional software vulnerabilities in LLM tooling.
- Query Interface — A RESTful API supporting complex queries such as "show all prompt injection vulnerabilities affecting GPT-4 that bypass content filtering with a reproduction rate above 70%."
- Reporting Module — Generates trend reports, model comparison matrices, and compliance-mapped summaries (OWASP, MITRE ATLAS, EU AI Act risk categories).
Non-Functional Requirements
- The database must handle at least 100,000 vulnerability records with sub-second query performance.
- All data modifications must be auditable with full change history.
- The API must support role-based access control separating read-only analysts from write-access red teamers.
- Export formats must include JSON, CSV, and PDF for integration with existing GRC tools.
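To make the Query Interface requirement concrete, the natural-language query above can be expressed as URL parameters. This is a sketch: the path and parameter names anticipate the REST API built later in this project (they are this project's conventions, not a standard), and the "bypasses content filtering" clause would need an additional guardrail-specific filter not shown here.

```python
from urllib.parse import urlencode

# "All prompt injection vulnerabilities affecting GPT-4 with
# reproduction rate above 70%", as query parameters.
params = {
    "category": "LLM01",          # OWASP LLM01: Prompt Injection
    "model_name": "gpt-4",
    "min_reproduction_rate": 0.7,
    "sort_by": "composite_score",
    "sort_order": "desc",
}
query_url = "/api/v1/vulnerabilities?" + urlencode(params)
print(query_url)
```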
Implementation Guide
Phase 1: Database Schema Design
Begin with the core schema, which models LLM vulnerabilities as first-class entities with rich metadata.
# models/schema.py
"""SQLAlchemy models for the LLM vulnerability tracking database."""
from __future__ import annotations

import enum
from datetime import datetime, date
from typing import Optional
from uuid import uuid4

from sqlalchemy import (
    Column, String, Text, Float, Integer, Boolean,
    DateTime, Date, Enum, ForeignKey, Table, Index,
    CheckConstraint, UniqueConstraint,
)
from sqlalchemy.dialects.postgresql import UUID, ARRAY, JSONB
from sqlalchemy.orm import DeclarativeBase, relationship, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class SeverityLevel(enum.Enum):
    """LLM-specific severity levels extending traditional CVSS."""
    INFORMATIONAL = "informational"
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


class AttackCategory(enum.Enum):
    """Top-level attack categories aligned with the OWASP LLM Top 10 2025."""
    PROMPT_INJECTION = "LLM01"
    SENSITIVE_INFO_DISCLOSURE = "LLM02"
    SUPPLY_CHAIN = "LLM03"
    DATA_MODEL_POISONING = "LLM04"
    IMPROPER_OUTPUT_HANDLING = "LLM05"
    EXCESSIVE_AGENCY = "LLM06"
    SYSTEM_PROMPT_LEAKAGE = "LLM07"
    VECTOR_EMBEDDING_WEAKNESS = "LLM08"
    MISINFORMATION = "LLM09"
    UNBOUNDED_CONSUMPTION = "LLM10"


class VulnStatus(enum.Enum):
    """Lifecycle status of a vulnerability entry."""
    DRAFT = "draft"
    CONFIRMED = "confirmed"
    MITIGATED = "mitigated"
    RESOLVED = "resolved"
    WONT_FIX = "wont_fix"
    EXPIRED = "expired"


# Association table for many-to-many: vulnerabilities <-> affected models
vuln_model_association = Table(
    "vuln_model_association",
    Base.metadata,
    Column("vulnerability_id", UUID(as_uuid=True), ForeignKey("vulnerabilities.id")),
    Column("model_id", UUID(as_uuid=True), ForeignKey("models.id")),
)

# Association table for many-to-many: vulnerabilities <-> MITRE techniques
vuln_mitre_association = Table(
    "vuln_mitre_association",
    Base.metadata,
    Column("vulnerability_id", UUID(as_uuid=True), ForeignKey("vulnerabilities.id")),
    Column("technique_id", UUID(as_uuid=True), ForeignKey("mitre_techniques.id")),
)


class Vulnerability(Base):
    """Core vulnerability record with LLM-specific metadata."""
    __tablename__ = "vulnerabilities"

    id: Mapped[str] = mapped_column(
        UUID(as_uuid=True), primary_key=True, default=uuid4
    )
    # Human-readable identifier: LLMVULN-2026-00001
    tracking_id: Mapped[str] = mapped_column(
        String(30), unique=True, nullable=False
    )
    title: Mapped[str] = mapped_column(String(500), nullable=False)
    description: Mapped[str] = mapped_column(Text, nullable=False)
    attack_category: Mapped[AttackCategory] = mapped_column(
        Enum(AttackCategory), nullable=False, index=True
    )
    severity: Mapped[SeverityLevel] = mapped_column(
        Enum(SeverityLevel), nullable=False, index=True
    )
    status: Mapped[VulnStatus] = mapped_column(
        Enum(VulnStatus), default=VulnStatus.DRAFT, nullable=False
    )
    # LLM-specific scoring dimensions (0.0 to 10.0 each)
    llm_score_exploitability: Mapped[float] = mapped_column(
        Float, CheckConstraint("llm_score_exploitability BETWEEN 0.0 AND 10.0")
    )
    llm_score_reproducibility: Mapped[float] = mapped_column(
        Float, CheckConstraint("llm_score_reproducibility BETWEEN 0.0 AND 10.0")
    )
    llm_score_impact: Mapped[float] = mapped_column(
        Float, CheckConstraint("llm_score_impact BETWEEN 0.0 AND 10.0")
    )
    llm_score_transferability: Mapped[float] = mapped_column(
        Float, CheckConstraint("llm_score_transferability BETWEEN 0.0 AND 10.0")
    )
    llm_score_guardrail_bypass: Mapped[float] = mapped_column(
        Float, CheckConstraint("llm_score_guardrail_bypass BETWEEN 0.0 AND 10.0")
    )
    composite_score: Mapped[Optional[float]] = mapped_column(Float, nullable=True)
    # Reproduction metadata
    reproduction_rate: Mapped[Optional[float]] = mapped_column(
        Float, CheckConstraint("reproduction_rate BETWEEN 0.0 AND 1.0"),
        nullable=True
    )
    reproduction_steps: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
    sample_payload: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
    # Temporal validity
    discovered_date: Mapped[date] = mapped_column(Date, nullable=False)
    last_verified_date: Mapped[Optional[date]] = mapped_column(Date, nullable=True)
    expiry_date: Mapped[Optional[date]] = mapped_column(Date, nullable=True)
    # Preconditions and context
    preconditions: Mapped[Optional[dict]] = mapped_column(JSONB, nullable=True)
    temperature_range: Mapped[Optional[str]] = mapped_column(String(50), nullable=True)
    requires_system_prompt: Mapped[bool] = mapped_column(Boolean, default=False)
    requires_tool_access: Mapped[bool] = mapped_column(Boolean, default=False)
    requires_rag: Mapped[bool] = mapped_column(Boolean, default=False)
    # External references
    cve_id: Mapped[Optional[str]] = mapped_column(String(20), nullable=True)
    external_references: Mapped[Optional[list]] = mapped_column(JSONB, nullable=True)
    # Audit fields
    created_at: Mapped[datetime] = mapped_column(
        DateTime, default=datetime.utcnow
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime, default=datetime.utcnow, onupdate=datetime.utcnow
    )
    created_by: Mapped[str] = mapped_column(String(100), nullable=False)
    # Relationships
    affected_models = relationship(
        "Model", secondary=vuln_model_association, back_populates="vulnerabilities"
    )
    mitre_techniques = relationship(
        "MITRETechnique", secondary=vuln_mitre_association, back_populates="vulnerabilities"
    )
    audit_log = relationship("AuditEntry", back_populates="vulnerability")
    guardrail_interactions = relationship(
        "GuardrailInteraction", back_populates="vulnerability"
    )

    __table_args__ = (
        Index("idx_vuln_category_severity", "attack_category", "severity"),
        Index("idx_vuln_status_date", "status", "discovered_date"),
        Index("idx_vuln_composite_score", "composite_score"),
    )


class Model(Base):
    """Represents an LLM model or model family that can be affected by vulnerabilities."""
    __tablename__ = "models"

    id: Mapped[str] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid4)
    provider: Mapped[str] = mapped_column(String(100), nullable=False, index=True)
    model_family: Mapped[str] = mapped_column(String(100), nullable=False)
    model_name: Mapped[str] = mapped_column(String(200), nullable=False)
    model_version: Mapped[Optional[str]] = mapped_column(String(100), nullable=True)
    release_date: Mapped[Optional[date]] = mapped_column(Date, nullable=True)
    is_open_source: Mapped[bool] = mapped_column(Boolean, default=False)
    parameter_count: Mapped[Optional[str]] = mapped_column(String(50), nullable=True)
    context_window: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
    capabilities: Mapped[Optional[list]] = mapped_column(
        ARRAY(String), nullable=True
    )

    vulnerabilities = relationship(
        "Vulnerability", secondary=vuln_model_association, back_populates="affected_models"
    )

    __table_args__ = (
        UniqueConstraint("provider", "model_name", "model_version",
                         name="uq_model_identity"),
    )


class MITRETechnique(Base):
    """MITRE ATLAS technique mapping for vulnerability classification."""
    __tablename__ = "mitre_techniques"

    id: Mapped[str] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid4)
    technique_id: Mapped[str] = mapped_column(String(20), unique=True, nullable=False)
    name: Mapped[str] = mapped_column(String(200), nullable=False)
    tactic: Mapped[str] = mapped_column(String(100), nullable=False)
    description: Mapped[Optional[str]] = mapped_column(Text, nullable=True)

    vulnerabilities = relationship(
        "Vulnerability", secondary=vuln_mitre_association, back_populates="mitre_techniques"
    )


class GuardrailInteraction(Base):
    """Records how a vulnerability interacts with specific guardrail configurations."""
    __tablename__ = "guardrail_interactions"

    id: Mapped[str] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid4)
    vulnerability_id: Mapped[str] = mapped_column(
        UUID(as_uuid=True), ForeignKey("vulnerabilities.id"), nullable=False
    )
    guardrail_type: Mapped[str] = mapped_column(String(100), nullable=False)
    guardrail_provider: Mapped[str] = mapped_column(String(100), nullable=False)
    bypass_successful: Mapped[bool] = mapped_column(Boolean, nullable=False)
    bypass_rate: Mapped[Optional[float]] = mapped_column(Float, nullable=True)
    bypass_technique: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
    tested_date: Mapped[date] = mapped_column(Date, nullable=False)

    vulnerability = relationship("Vulnerability", back_populates="guardrail_interactions")


class AuditEntry(Base):
    """Immutable audit log for all vulnerability record changes."""
    __tablename__ = "audit_log"

    id: Mapped[str] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid4)
    vulnerability_id: Mapped[str] = mapped_column(
        UUID(as_uuid=True), ForeignKey("vulnerabilities.id"), nullable=False
    )
    action: Mapped[str] = mapped_column(String(50), nullable=False)
    changed_fields: Mapped[Optional[dict]] = mapped_column(JSONB, nullable=True)
    previous_values: Mapped[Optional[dict]] = mapped_column(JSONB, nullable=True)
    changed_by: Mapped[str] = mapped_column(String(100), nullable=False)
    changed_at: Mapped[datetime] = mapped_column(
        DateTime, default=datetime.utcnow, nullable=False
    )

    vulnerability = relationship("Vulnerability", back_populates="audit_log")

    __table_args__ = (
        Index("idx_audit_vuln_date", "vulnerability_id", "changed_at"),
    )
This schema separates concerns into distinct tables while maintaining the relationships needed for complex queries. The vulnerabilities table is the central entity, with LLM-specific scoring dimensions that go beyond what CVSS provides.
Phase 2: LLM-Adapted Scoring Engine
The scoring engine translates raw vulnerability characteristics into a composite score that security teams can use for prioritization.
# scoring/engine.py
"""LLM-adapted vulnerability scoring engine extending CVSS concepts."""
from __future__ import annotations

from dataclasses import dataclass, field
from enum import Enum
class ExploitComplexity(Enum):
"""How complex the attack setup is."""
TRIVIAL = 1.0 # Copy-paste prompt, no special setup
LOW = 0.75 # Requires basic prompt engineering knowledge
MEDIUM = 0.5 # Requires model-specific tuning or multi-step
HIGH = 0.25 # Requires significant research or infrastructure
    VERY_HIGH = 0.1  # Requires training pipeline access or novel research
class ImpactScope(Enum):
    """What the vulnerability can affect."""
MODEL_OUTPUT_ONLY = 0.3 # Only affects text generation quality
DATA_EXPOSURE = 0.6 # Can leak system prompts or user data
TOOL_EXECUTION = 0.8 # Can trigger unauthorized tool/API calls
SYSTEM_COMPROMISE = 1.0 # Can lead to RCE or infrastructure access
class TransferabilityLevel(Enum):
    """Whether the vulnerability works across models."""
SINGLE_MODEL = 0.2 # Only works on one specific model version
MODEL_FAMILY = 0.5 # Works across versions of same model family
CROSS_PROVIDER = 0.8 # Works on models from different providers
UNIVERSAL = 1.0 # Works on virtually all LLMs
@dataclass
class LLMVulnScore:
    """Complete scoring breakdown for an LLM vulnerability."""
    # Input dimensions (0.0 to 10.0 each)
exploitability: float
reproducibility: float
impact: float
transferability: float
guardrail_bypass: float
# Computed fields
composite_score: float = 0.0
risk_rating: str = ""
priority_rank: int = 0
# Weight configuration
weights: dict = field(default_factory=lambda: {
"exploitability": 0.25,
"reproducibility": 0.15,
"impact": 0.30,
"transferability": 0.15,
"guardrail_bypass": 0.15,
})
def __post_init__(self):
self.composite_score = self.calculate_composite()
self.risk_rating = self.determine_risk_rating()
def calculate_composite(self) -> float:
"""Calculate weighted composite score."""
raw = (
self.exploitability * self.weights["exploitability"]
+ self.reproducibility * self.weights["reproducibility"]
+ self.impact * self.weights["impact"]
+ self.transferability * self.weights["transferability"]
+ self.guardrail_bypass * self.weights["guardrail_bypass"]
)
return round(raw, 2)
def determine_risk_rating(self) -> str:
"""Map composite score to risk rating."""
if self.composite_score >= 9.0:
return "CRITICAL"
elif self.composite_score >= 7.0:
return "HIGH"
elif self.composite_score >= 4.0:
return "MEDIUM"
elif self.composite_score >= 2.0:
return "LOW"
return "INFORMATIONAL"
def score_vulnerability(
exploit_complexity: ExploitComplexity,
reproduction_rate: float,
impact_scope: ImpactScope,
transferability: TransferabilityLevel,
guardrail_bypass_rate: float,
temporal_decay_days: int = 0,
) -> LLMVulnScore:
"""
    Score a vulnerability using LLM-specific dimensions.

    Args:
        exploit_complexity: How difficult the attack is to execute.
        reproduction_rate: Fraction of attempts that succeed (0.0-1.0).
        impact_scope: What the vulnerability can affect.
        transferability: Whether it works across models.
        guardrail_bypass_rate: Rate at which it defeats guardrails (0.0-1.0).
temporal_decay_days: Days since last verification (reduces confidence).
Returns:
LLMVulnScore with composite score and risk rating.
"""
# Convert enum values to 0-10 scale
exploitability = exploit_complexity.value * 10.0
reproducibility = reproduction_rate * 10.0
impact = impact_scope.value * 10.0
transferability_score = transferability.value * 10.0
guardrail_score = guardrail_bypass_rate * 10.0
# Apply temporal decay: scores degrade if not recently verified
if temporal_decay_days > 0:
decay_factor = max(0.5, 1.0 - (temporal_decay_days / 365.0) * 0.5)
reproducibility *= decay_factor
guardrail_score *= decay_factor
return LLMVulnScore(
exploitability=round(exploitability, 2),
reproducibility=round(reproducibility, 2),
impact=round(impact, 2),
transferability=round(transferability_score, 2),
guardrail_bypass=round(guardrail_score, 2),
)
# Example usage demonstrating scoring for a realistic vulnerability pattern
if __name__ == "__main__":
    # Score a cross-model prompt injection that bypasses content filters
result = score_vulnerability(
exploit_complexity=ExploitComplexity.LOW,
reproduction_rate=0.85,
impact_scope=ImpactScope.DATA_EXPOSURE,
transferability=TransferabilityLevel.CROSS_PROVIDER,
guardrail_bypass_rate=0.60,
temporal_decay_days=30,
)
print(f"Composite Score: {result.composite_score}/10.0")
print(f"Risk Rating: {result.risk_rating}")
print(f" Exploitability: {result.exploitability}")
print(f" Reproducibility: {result.reproducibility}")
print(f" Impact: {result.impact}")
print(f" Transferability: {result.transferability}")
    print(f"  Guardrail Bypass: {result.guardrail_bypass}")
Phase 3: Ingestion Pipeline
The ingestion layer handles data from multiple sources and normalizes it into the database schema.
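To ground the parser code that follows, here is a minimal example of the kind of scanner output the pipeline consumes. The field names (`findings`, `probe_type`, `success_rate`, and so on) follow the parser contract this project assumes, not any official Garak report schema, and the probe name is illustrative.

```python
import json

# Hypothetical Garak-style scan result shaped for this project's parser.
sample_scan = {
    "garak_version": "0.9.x",
    "findings": [
        {
            "probe_name": "dan_jailbreak_v11",   # illustrative probe name
            "probe_type": "dan",
            "model": "gpt-4",
            "success_rate": 0.85,
            "description": "Jailbreak persona prompt accepted by the model",
            "sample_interaction": "...",
            "successful_payload": "...",
        }
    ],
}

# The pipeline is invoked with the serialized document.
payload = json.dumps(sample_scan)
print(len(json.loads(payload)["findings"]))
```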
# ingestion/pipeline.py
"""Ingestion pipeline for vulnerability data from multiple sources."""
from __future__ import annotations
import json
import re
import hashlib
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import date, datetime
from pathlib import Path
from typing import Optional
from sqlalchemy.orm import Session
@dataclass
class RawVulnerability:
    """Normalized intermediate representation of a vulnerability."""
title: str
description: str
attack_category: str
affected_models: list[str]
reproduction_rate: Optional[float]
reproduction_steps: Optional[str]
sample_payload: Optional[str]
discovered_date: date
source: str
source_id: str
external_references: list[dict]
preconditions: Optional[dict] = None
severity_hint: Optional[str] = None
class IngestionSource(ABC):
    """Abstract base for vulnerability data sources."""
@abstractmethod
def parse(self, data: str | dict) -> list[RawVulnerability]:
        """Parse raw data into normalized vulnerability records."""
...
@abstractmethod
def source_name(self) -> str:
"""Identifier for this data source."""
...
class GarakResultParser(IngestionSource):
    """Parse vulnerability findings from Garak scan output."""
def source_name(self) -> str:
return "garak"
def parse(self, data: str | dict) -> list[RawVulnerability]:
if isinstance(data, str):
data = json.loads(data)
        vulnerabilities = []
        for finding in data.get("findings", []):
            # Map Garak probe categories to OWASP LLM categories
            category = self._map_garak_category(finding.get("probe_type", ""))
            reproduction_rate = finding.get("success_rate", 0.0)
            if reproduction_rate < 0.01:
                continue  # Skip findings with near-zero success rate
            vuln = RawVulnerability(
                title=f"Garak: {finding.get('probe_name', 'Unknown probe')}",
                description=finding.get("description", ""),
                attack_category=category,
                affected_models=[finding.get("model", "unknown")],
                reproduction_rate=reproduction_rate,
                reproduction_steps=finding.get("sample_interaction", ""),
                sample_payload=finding.get("successful_payload", ""),
                discovered_date=date.today(),
                source="garak",
                source_id=self._generate_source_id(finding),
                external_references=[{
                    "type": "tool_output",
                    "url": "https://github.com/NVIDIA/garak",
                    "tool_version": data.get("garak_version", "unknown"),
                }],
            )
            vulnerabilities.append(vuln)
        return vulnerabilities
def _map_garak_category(self, probe_type: str) -> str:
"""Map Garak probe types to OWASP LLM Top 10 categories."""
mapping = {
"prompt_injection": "LLM01",
"data_leak": "LLM02",
"hallucination": "LLM09",
"toxicity": "LLM05",
"encoding": "LLM01",
"dan": "LLM01",
}
for key, value in mapping.items():
if key in probe_type.lower():
return value
        return "LLM01"  # Default to prompt injection
def _generate_source_id(self, finding: dict) -> str:
"""Generate a deterministic ID for deduplication."""
raw = f"garak:{finding.get('probe_name', '')}:{finding.get('model', '')}"
return hashlib.sha256(raw.encode()).hexdigest()[:16]
class PromptfooResultParser(IngestionSource):
    """Parse vulnerability findings from Promptfoo evaluation output."""

    def source_name(self) -> str:
        return "promptfoo"

    def parse(self, data: str | dict) -> list[RawVulnerability]:
        if isinstance(data, str):
            data = json.loads(data)
        vulnerabilities = []
        results = data.get("results", {}).get("results", [])
        for result in results:
            if result.get("success", True):
                continue  # Only capture failures (successful attacks)
            provider = result.get("provider", {})
            model = provider.get("id", "unknown") if isinstance(provider, dict) else str(provider)
            vuln = RawVulnerability(
                title=f"Promptfoo: {result.get('testCase', {}).get('description', 'Failed assertion')}",
                description=result.get("error", result.get("output", "")),
                attack_category=self._infer_category(result),
                affected_models=[model],
                reproduction_rate=None,  # Promptfoo doesn't provide this directly
                reproduction_steps=json.dumps(result.get("testCase", {}), indent=2),
                sample_payload=result.get("prompt", ""),
                discovered_date=date.today(),
                source="promptfoo",
                source_id=self._generate_source_id(result),
                external_references=[{
                    "type": "tool_output",
                    "url": "https://github.com/promptfoo/promptfoo",
                }],
            )
            vulnerabilities.append(vuln)
        return vulnerabilities
def _infer_category(self, result: dict) -> str:
        """Infer the OWASP LLM category from Promptfoo test metadata."""
test_case = result.get("testCase", {})
tags = test_case.get("metadata", {}).get("tags", [])
tag_mapping = {
"injection": "LLM01", "prompt-injection": "LLM01",
"pii": "LLM02", "data-leak": "LLM02",
"harmful": "LLM05", "toxicity": "LLM05",
"hallucination": "LLM09",
}
for tag in tags:
if tag.lower() in tag_mapping:
return tag_mapping[tag.lower()]
return "LLM01"
def _generate_source_id(self, result: dict) -> str:
raw = f"promptfoo:{result.get('testCase', {}).get('description', '')}:{result.get('provider', '')}"
return hashlib.sha256(raw.encode()).hexdigest()[:16]
class ManualReportParser(IngestionSource):
    """Parse semi-structured markdown red team reports."""
def source_name(self) -> str:
return "manual_report"
def parse(self, data: str | dict) -> list[RawVulnerability]:
if isinstance(data, dict):
data = json.dumps(data)
        vulnerabilities = []
        # Split the markdown by H2 headers to find individual findings
        sections = re.split(r'^## ', data, flags=re.MULTILINE)
        for section in sections[1:]:  # Skip the preamble
            lines = section.strip().split('\n')
            title = lines[0].strip()
            body = '\n'.join(lines[1:]).strip()
            # Extract structured fields from the body using patterns
            severity = self._extract_field(body, r'\*\*Severity\*\*:\s*(\w+)')
            category = self._extract_field(body, r'\*\*Category\*\*:\s*(LLM\d{2})')
            models_raw = self._extract_field(body, r'\*\*Affected Models\*\*:\s*(.+)')
            repro_rate = self._extract_field(body, r'\*\*Reproduction Rate\*\*:\s*([\d.]+)')
            affected_models = []
            if models_raw:
                affected_models = [m.strip() for m in models_raw.split(',')]
            vuln = RawVulnerability(
                title=title,
                description=body,
                attack_category=category or "LLM01",
                affected_models=affected_models,
                reproduction_rate=float(repro_rate) if repro_rate else None,
                reproduction_steps=self._extract_section(body, "Reproduction Steps"),
                sample_payload=self._extract_section(body, "Payload"),
                discovered_date=date.today(),
                source="manual_report",
                source_id=hashlib.sha256(title.encode()).hexdigest()[:16],
                external_references=[],
                severity_hint=severity,
            )
            vulnerabilities.append(vuln)
        return vulnerabilities
def _extract_field(self, text: str, pattern: str) -> Optional[str]:
match = re.search(pattern, text)
return match.group(1) if match else None
def _extract_section(self, text: str, header: str) -> Optional[str]:
pattern = rf'### {header}\s*\n(.*?)(?=\n### |\Z)'
match = re.search(pattern, text, re.DOTALL)
return match.group(1).strip() if match else None
class IngestionPipeline:
    """Orchestrates vulnerability ingestion from multiple sources."""
def __init__(self, session: Session):
self.session = session
self.parsers: dict[str, IngestionSource] = {}
self._register_default_parsers()
def _register_default_parsers(self):
self.register_parser(GarakResultParser())
self.register_parser(PromptfooResultParser())
self.register_parser(ManualReportParser())
def register_parser(self, parser: IngestionSource):
self.parsers[parser.source_name()] = parser
def ingest(self, source_name: str, data: str | dict) -> list[str]:
"""
Ingest data from a named source and return tracking IDs of created records.
Args:
source_name: Registered parser name (garak, promptfoo, manual_report).
data: Raw data from the source.
Returns:
            List of tracking IDs for newly created vulnerability records.
"""
parser = self.parsers.get(source_name)
if parser is None:
raise ValueError(f"No parser registered for source: {source_name}")
raw_vulns = parser.parse(data)
tracking_ids = []
for raw_vuln in raw_vulns:
# Deduplication check
existing = self._find_duplicate(raw_vuln)
if existing:
self._update_existing(existing, raw_vuln)
tracking_ids.append(existing.tracking_id)
continue
tracking_id = self._create_record(raw_vuln)
tracking_ids.append(tracking_id)
self.session.commit()
return tracking_ids
def _find_duplicate(self, raw: RawVulnerability):
        """Check for an existing vulnerability with the same source ID."""
        # A full implementation queries the database for a matching
        # source_id in the external_references JSONB field.
        return None  # Simplified for illustration
def _update_existing(self, existing, raw: RawVulnerability):
"""Merge new data into existing record."""
if raw.reproduction_rate is not None:
existing.reproduction_rate = raw.reproduction_rate
existing.last_verified_date = date.today()
def _create_record(self, raw: RawVulnerability) -> str:
        """Create a new vulnerability record from raw data."""
# Generate next tracking ID
tracking_id = self._next_tracking_id()
# Record creation logic using the ORM models from Phase 1
return tracking_id
    def _next_tracking_id(self) -> str:
        """Generate the next sequential tracking ID."""
        year = date.today().year
        # Query the max existing ID for this year and increment it
        return f"LLMVULN-{year}-00001"  # Simplified
Phase 4: Query and Reporting API
Build the FastAPI layer that exposes the database for querying and reporting.
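The route code in this phase stubs out its session dependency with `Depends(lambda: None)`. A typical real dependency is a generator that yields one session per request and guarantees cleanup; the sketch below uses in-memory SQLite so it runs anywhere, while a deployment would point `DATABASE_URL` at PostgreSQL (the exact URL is an assumption).

```python
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker

# In production this would be e.g. postgresql+psycopg2://user:pass@host/llmvulndb
DATABASE_URL = "sqlite:///:memory:"

engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(bind=engine, autoflush=False)

def get_db():
    """Yield a session per request and guarantee it is closed afterwards."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
```

Each route then declares `db: Session = Depends(get_db)` instead of the placeholder.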
# api/routes.py
"""FastAPI routes for vulnerability querying and reporting."""
from __future__ import annotations

from datetime import date, datetime
from typing import Optional
from uuid import UUID

from fastapi import FastAPI, Depends, Query, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, Field
from sqlalchemy import func, and_, or_
from sqlalchemy.orm import Session

from models.schema import Vulnerability, Model  # ORM models from Phase 1

app = FastAPI(
    title="LLM Vulnerability Tracking API",
    version="1.0.0",
    description="Purpose-built vulnerability database for LLM security findings",
)

security = HTTPBearer()
# --- Pydantic models for request/response ---
class VulnerabilityResponse(BaseModel):
tracking_id: str
title: str
description: str
attack_category: str
severity: str
status: str
composite_score: Optional[float]
reproduction_rate: Optional[float]
affected_models: list[str]
discovered_date: date
last_verified_date: Optional[date]
risk_rating: str
class Config:
from_attributes = True
class VulnerabilityCreate(BaseModel):
title: str = Field(..., min_length=10, max_length=500)
description: str = Field(..., min_length=50)
attack_category: str = Field(..., pattern=r"^LLM(0[1-9]|10)$")
affected_model_ids: list[UUID]
reproduction_rate: Optional[float] = Field(None, ge=0.0, le=1.0)
reproduction_steps: Optional[str] = None
sample_payload: Optional[str] = None
preconditions: Optional[dict] = None
severity_hint: Optional[str] = None
class TrendReport(BaseModel):
period_start: date
period_end: date
total_vulnerabilities: int
by_category: dict[str, int]
by_severity: dict[str, int]
by_model: dict[str, int]
avg_composite_score: float
top_attack_vectors: list[dict]
class ModelComparisonReport(BaseModel):
models: list[dict]
vulnerability_counts: dict[str, int]
avg_scores: dict[str, float]
shared_vulnerabilities: int
unique_to_each: dict[str, int]
# --- API Endpoints ---
@app.get("/api/v1/vulnerabilities", response_model=list[VulnerabilityResponse])
async def list_vulnerabilities(
category: Optional[str] = Query(None, pattern=r"^LLM(0[1-9]|10)$"),
severity: Optional[str] = Query(None),
model_name: Optional[str] = Query(None),
min_score: Optional[float] = Query(None, ge=0.0, le=10.0),
min_reproduction_rate: Optional[float] = Query(None, ge=0.0, le=1.0),
status: Optional[str] = Query(None),
discovered_after: Optional[date] = Query(None),
discovered_before: Optional[date] = Query(None),
page: int = Query(1, ge=1),
page_size: int = Query(50, ge=1, le=200),
sort_by: str = Query("composite_score"),
sort_order: str = Query("desc", pattern=r"^(asc|desc)$"),
    db: Session = Depends(lambda: None),  # Placeholder: swap in a real session dependency
):
"""
    Query vulnerabilities with rich filtering.
Supports filtering by OWASP category, severity, affected model,
minimum composite score, reproduction rate, status, and date range.
Results are paginated and sortable.
"""
    # Build the query dynamically
    query = db.query(Vulnerability)
    if category:
        query = query.filter(Vulnerability.attack_category == category)
    if severity:
        query = query.filter(Vulnerability.severity == severity)
    if model_name:
        query = query.join(Vulnerability.affected_models).filter(
            Model.model_name.ilike(f"%{model_name}%")
        )
    if min_score is not None:
        query = query.filter(Vulnerability.composite_score >= min_score)
    if min_reproduction_rate is not None:
        query = query.filter(Vulnerability.reproduction_rate >= min_reproduction_rate)
    if status:
        query = query.filter(Vulnerability.status == status)
    if discovered_after:
        query = query.filter(Vulnerability.discovered_date >= discovered_after)
    if discovered_before:
        query = query.filter(Vulnerability.discovered_date <= discovered_before)
    # Sorting
    sort_column = getattr(Vulnerability, sort_by, Vulnerability.composite_score)
if sort_order == "desc":
query = query.order_by(sort_column.desc())
else:
query = query.order_by(sort_column.asc())
# Pagination
offset = (page - 1) * page_size
results = query.offset(offset).limit(page_size).all()
return results
@app.get("/api/v1/reports/trends", response_model=TrendReport)
async def generate_trend_report(
start_date: date = Query(...),
end_date: date = Query(...),
db: Session = Depends(lambda: None),
):
    """
    Generate a trend report for vulnerabilities discovered in a date range.

    Returns aggregate statistics including category distribution, severity
    breakdown, model coverage, and top attack vectors.
    """
    vulns = db.query(Vulnerability).filter(
        and_(
            Vulnerability.discovered_date >= start_date,
            Vulnerability.discovered_date <= end_date,
        )
    ).all()
by_category = {}
by_severity = {}
by_model = {}
scores = []
for v in vulns:
cat = v.attack_category.value
by_category[cat] = by_category.get(cat, 0) + 1
sev = v.severity.value
by_severity[sev] = by_severity.get(sev, 0) + 1
if v.composite_score is not None:
scores.append(v.composite_score)
for model in v.affected_models:
by_model[model.model_name] = by_model.get(model.model_name, 0) + 1
return TrendReport(
period_start=start_date,
period_end=end_date,
total_vulnerabilities=len(vulns),
by_category=by_category,
by_severity=by_severity,
by_model=by_model,
avg_composite_score=sum(scores) / len(scores) if scores else 0.0,
top_attack_vectors=sorted(
[{"category": k, "count": v} for k, v in by_category.items()],
key=lambda x: x["count"],
reverse=True,
)[:5],
)
@app.get("/api/v1/reports/model-comparison", response_model=ModelComparisonReport)
async def compare_models(
model_names: list[str] = Query(...),
db: Session = Depends(lambda: None),
):
    """
    Compare vulnerability profiles across the specified models.

    Returns a side-by-side comparison of vulnerability counts, average
    scores, shared vulnerabilities, and findings unique to each model.
    """
    model_data = {}
    all_vuln_ids = {}
    for model_name in model_names:
        vulns = db.query(Vulnerability).join(
            Vulnerability.affected_models
        ).filter(
            Model.model_name.ilike(f"%{model_name}%")
        ).all()
vuln_ids = {str(v.id) for v in vulns}
all_vuln_ids[model_name] = vuln_ids
scores = [v.composite_score for v in vulns if v.composite_score]
model_data[model_name] = {
"count": len(vulns),
"avg_score": sum(scores) / len(scores) if scores else 0.0,
}
# Calculate shared and unique
if len(model_names) >= 2:
shared = set.intersection(*all_vuln_ids.values()) if all_vuln_ids else set()
else:
shared = set()
    unique = {}
    for name, ids in all_vuln_ids.items():
        others = [v for k, v in all_vuln_ids.items() if k != name]
        unique[name] = len(ids - set.union(*others)) if others else len(ids)
return ModelComparisonReport(
models=[{"name": n, **d} for n, d in model_data.items()],
vulnerability_counts={n: d["count"] for n, d in model_data.items()},
avg_scores={n: d["avg_score"] for n, d in model_data.items()},
shared_vulnerabilities=len(shared),
unique_to_each=unique,
    )
Phase 5: Compliance Mapping and Export
Map vulnerability data to established frameworks for compliance reporting.
# reporting/compliance.py
"""Map vulnerability data to compliance frameworks for reporting."""
from __future__ import annotations
import json
import csv
import io
from dataclasses import dataclass
# OWASP LLM Top 10 2025 mapping
OWASP_LLM_TOP_10 = {
"LLM01": {
        "name": "Prompt Injection",
"description": "Manipulating LLM through crafted inputs that override instructions",
"eu_ai_act_risk": "high",
"nist_ai_rmf_function": "GOVERN, MAP, MEASURE",
},
"LLM02": {
"name": "Sensitive Information Disclosure",
"description": "LLM revealing confidential data in responses",
"eu_ai_act_risk": "high",
"nist_ai_rmf_function": "MAP, MEASURE, MANAGE",
},
"LLM03": {
        "name": "Supply Chain Vulnerabilities",
        "description": "Compromised components in the LLM supply chain",
"eu_ai_act_risk": "high",
"nist_ai_rmf_function": "GOVERN, MAP",
},
"LLM04": {
"name": "Data and Model Poisoning",
        "description": "Manipulating training data or model weights",
"eu_ai_act_risk": "unacceptable",
"nist_ai_rmf_function": "MAP, MEASURE, MANAGE",
},
"LLM05": {
        "name": "Improper Output Handling",
"description": "Insufficient validation of LLM outputs before use",
"eu_ai_act_risk": "high",
"nist_ai_rmf_function": "MEASURE, MANAGE",
},
"LLM06": {
"name": "Excessive Agency",
        "description": "LLM granted too many capabilities or permissions",
"eu_ai_act_risk": "high",
"nist_ai_rmf_function": "GOVERN, MAP, MANAGE",
},
"LLM07": {
"name": "System Prompt Leakage",
"description": "Extraction of system-level prompts and instructions",
"eu_ai_act_risk": "limited",
"nist_ai_rmf_function": "MEASURE, MANAGE",
},
"LLM08": {
        "name": "Vector and Embedding Weaknesses",
        "description": "Exploiting vulnerabilities in RAG vector databases",
"eu_ai_act_risk": "high",
"nist_ai_rmf_function": "MAP, MEASURE",
},
"LLM09": {
"name": "Misinformation",
"description": "LLM generating false or misleading information",
"eu_ai_act_risk": "high",
"nist_ai_rmf_function": "MEASURE, MANAGE",
},
"LLM10": {
"name": "Unbounded Consumption",
"description": "Resource exhaustion through excessive LLM usage",
"eu_ai_act_risk": "limited",
"nist_ai_rmf_function": "GOVERN, MANAGE",
},
}
@dataclass
class ComplianceReport:
    """Generated compliance report for a set of vulnerabilities."""
framework: str
generated_date: str
total_findings: int
findings_by_control: dict
risk_summary: dict
recommendations: list[str]
def to_json(self) -> str:
return json.dumps({
"framework": self.framework,
"generated_date": self.generated_date,
"total_findings": self.total_findings,
"findings_by_control": self.findings_by_control,
"risk_summary": self.risk_summary,
"recommendations": self.recommendations,
}, indent=2)
def to_csv(self) -> str:
        output = io.StringIO()
        writer = csv.writer(output)
writer.writerow(["Control", "Finding Count", "Risk Level", "Framework Mapping"])
for control, data in self.findings_by_control.items():
writer.writerow([
control,
data.get("count", 0),
data.get("risk_level", "unknown"),
data.get("mapping", ""),
])
        return output.getvalue()
def generate_owasp_compliance_report(
    vulnerabilities: list[dict],
) -> ComplianceReport:
"""
Generate a compliance report mapped to OWASP LLM Top 10 2025.
Args:
        vulnerabilities: List of vulnerability dicts with an attack_category field.
Returns:
ComplianceReport with findings mapped to OWASP controls.
"""
findings_by_control = {}
for category_id, meta in OWASP_LLM_TOP_10.items():
        matching = [v for v in vulnerabilities if v.get("attack_category") == category_id]
findings_by_control[f"{category_id}: {meta['name']}"] = {
"count": len(matching),
"risk_level": meta["eu_ai_act_risk"],
"mapping": meta["nist_ai_rmf_function"],
"avg_score": (
sum(v.get("composite_score", 0) for v in matching) / len(matching)
if matching else 0
),
}
# Generate risk summary
high_risk = sum(
        1 for v in vulnerabilities
if v.get("composite_score", 0) >= 7.0
)
critical_categories = [
k for k, v in findings_by_control.items()
if v["count"] > 0 and v["risk_level"] in ("high", "unacceptable")
]
recommendations = []
if high_risk > 0:
recommendations.append(
f"Address {high_risk} high/critical findings as priority. "
"These represent active risk to production systems."
)
for cat in critical_categories[:3]:
recommendations.append(
f"Review and remediate findings in {cat} — "
f"EU AI Act classification: {findings_by_control[cat]['risk_level']}"
)
from datetime import date as date_type
return ComplianceReport(
framework="OWASP LLM Top 10 2025",
generated_date=str(date_type.today()),
        total_findings=len(vulnerabilities),
findings_by_control=findings_by_control,
risk_summary={
"critical_and_high": high_risk,
            "total": len(vulnerabilities),
            "risk_ratio": high_risk / len(vulnerabilities) if vulnerabilities else 0,
},
recommendations=recommendations,
    )
Testing Your Implementation
Validate each phase independently before integrating:
-
Schema validation — Run Alembic migrations against a test PostgreSQL instance. Verify all constraints, indexes, and relationships work correctly.
-
Scoring accuracy — Write unit tests that verify the scoring engine produces expected results for known vulnerability profiles. Test edge cases: zero reproduction rate, maximum scores, temporal decay at boundary values.
-
Ingestion robustness — Feed malformed data through each parser and verify graceful error handling. Test deduplication by ingesting the same Garak report twice.
-
API correctness — Use pytest with httpx.AsyncClient to test each endpoint with valid and invalid query parameters. Verify pagination boundaries and sort order.
-
Compliance mapping — Verify that every OWASP LLM Top 10 category is represented in the output and that EU AI Act risk classifications are accurate.
# tests/test_scoring.py
"""Unit tests for the LLM vulnerability scoring engine."""
import pytest
from scoring.engine import (
score_vulnerability,
ExploitComplexity,
ImpactScope,
TransferabilityLevel,
LLMVulnScore,
)
def test_critical_vulnerability_scores_high():
    """A trivially exploitable, high-impact, universal vulnerability should score critical."""
result = score_vulnerability(
exploit_complexity=ExploitComplexity.TRIVIAL,
reproduction_rate=0.95,
impact_scope=ImpactScope.SYSTEM_COMPROMISE,
transferability=TransferabilityLevel.UNIVERSAL,
guardrail_bypass_rate=0.90,
)
assert result.composite_score >= 9.0
assert result.risk_rating == "CRITICAL"
def test_low_impact_scores_low():
    """A hard-to-exploit, model-specific, output-only vulnerability should score low."""
result = score_vulnerability(
exploit_complexity=ExploitComplexity.HIGH,
reproduction_rate=0.10,
impact_scope=ImpactScope.MODEL_OUTPUT_ONLY,
transferability=TransferabilityLevel.SINGLE_MODEL,
guardrail_bypass_rate=0.05,
)
assert result.composite_score < 4.0
assert result.risk_rating in ("LOW", "INFORMATIONAL")
def test_temporal_decay_reduces_score():
    """Scores should decrease when a vulnerability hasn't been recently verified."""
fresh = score_vulnerability(
exploit_complexity=ExploitComplexity.LOW,
reproduction_rate=0.80,
impact_scope=ImpactScope.DATA_EXPOSURE,
transferability=TransferabilityLevel.CROSS_PROVIDER,
guardrail_bypass_rate=0.70,
temporal_decay_days=0,
)
stale = score_vulnerability(
exploit_complexity=ExploitComplexity.LOW,
reproduction_rate=0.80,
impact_scope=ImpactScope.DATA_EXPOSURE,
transferability=TransferabilityLevel.CROSS_PROVIDER,
guardrail_bypass_rate=0.70,
temporal_decay_days=180,
)
assert stale.composite_score < fresh.composite_score
def test_score_bounds():
"""All score dimensions should stay within 0.0-10.0 range."""
result = score_vulnerability(
exploit_complexity=ExploitComplexity.TRIVIAL,
reproduction_rate=1.0,
impact_scope=ImpactScope.SYSTEM_COMPROMISE,
transferability=TransferabilityLevel.UNIVERSAL,
guardrail_bypass_rate=1.0,
)
for field_name in ["exploitability", "reproducibility", "impact",
"transferability", "guardrail_bypass"]:
value = getattr(result, field_name)
        assert 0.0 <= value <= 10.0, f"{field_name} out of bounds: {value}"
Extending the System
Once the core database is operational, consider these extensions:
-
Automated re-verification: Schedule periodic re-testing of stored payloads against current model versions to detect when vulnerabilities are patched or when new models become affected.
-
Threat intelligence correlation: Integrate with threat intelligence feeds to enrich vulnerability records with information about active exploitation in the wild.
-
SBOM integration: Link vulnerability records to AI system Software Bills of Materials (SBOMs) to enable rapid impact assessment when new vulnerabilities are discovered.
-
Slack/Teams alerting: Push notifications when high-severity 漏洞 are ingested or when re-verification detects changes in reproduction rates.
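The re-verification extension reduces to a small loop: replay a stored payload against the current model, recompute the reproduction rate, and flag the finding when the rate drifts. A minimal sketch, assuming a hypothetical record shape and a `send_probe` callback (neither is part of the capstone's API):

```python
# reverify.py — re-verification sketch; StoredFinding and send_probe are
# hypothetical stand-ins for the project's ORM record and model client.
from dataclasses import dataclass
from typing import Callable


@dataclass
class StoredFinding:
    vuln_id: str
    payload: str
    last_reproduction_rate: float


def reverify(
    finding: StoredFinding,
    send_probe: Callable[[str], bool],
    trials: int = 10,
    drift_threshold: float = 0.2,
) -> dict:
    """Replay the stored payload `trials` times against the current model and
    flag the finding if the reproduction rate drifted past the threshold."""
    successes = sum(1 for _ in range(trials) if send_probe(finding.payload))
    new_rate = successes / trials
    drift = new_rate - finding.last_reproduction_rate
    return {
        "vuln_id": finding.vuln_id,
        "new_rate": new_rate,
        "drift": round(drift, 2),
        # Large negative drift suggests a patch landed; large positive
        # drift suggests a guardrail regression.
        "needs_review": abs(drift) >= drift_threshold,
    }


# Usage with a deterministic stand-in probe that always succeeds:
finding = StoredFinding("LLMV-2025-0001", "ignore previous instructions...", 0.5)
report = reverify(finding, send_probe=lambda p: True)
print(report["new_rate"], report["needs_review"])  # → 1.0 True
```

In production the loop would iterate over all active findings on a scheduler, write the new rates back to the database, and feed the `needs_review` flag into the alerting extension.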
References
- NIST Special Publication 800-150, "Guide to Cyber Threat Information Sharing," https://csrc.nist.gov/publications/detail/sp/800-150/final
- OWASP Top 10 for Large Language Model Applications 2025, https://owasp.org/www-project-top-10-for-large-language-model-applications/
- MITRE ATLAS (Adversarial Threat Landscape for AI Systems), https://atlas.mitre.org/
- NIST AI 600-1, "Artificial Intelligence Risk Management Framework: Generative AI Profile," https://csrc.nist.gov/publications/detail/ai/600-1/final