Capstone: Bouw een database voor het volgen van LLM-kwetsbaarheden
Ontwerp en implementeer een gestructureerde database voor het volgen van kwetsbaarheden om LLM-specifieke beveiligingszwakheden over modellen en deployments heen te catalogiseren, te scoren en te bevragen.
Overzicht
Traditionele tools voor vulnerability management zoals CVE-databases en NVD zijn ontworpen voor softwarefouten met duidelijke versiegrenzen en binaire patchstatussen. LLM-kwetsbaarheden breken met deze aannames. Een prompt-injectietechniek kan werken op GPT-4 maar niet op Claude 3.5, slagen bij temperature 0.9 maar mislukken bij 0.0, en gemitigeerd worden door een guardrail die zelf nieuw aanvalsoppervlak introduceert. Het versieconcept is nauwelijks van toepassing wanneer modelaanbieders gewichten bijwerken zonder publieke changelogs.
Dit capstone-project geeft je de opdracht om een speciaal gebouwd systeem voor het volgen van kwetsbaarheden te bouwen dat rekening houdt met de probabilistische, modelafhankelijke en snel evoluerende aard van LLM-beveiligingszwakheden. Je database catalogiseert kwetsbaarheden met rijke metadata — getroffen modellen, aanvalsvereisten, reproductiepercentages, guardrail-interacties en temporele geldigheid — waardoor beveiligingsteams weloverwogen prioriteringsbeslissingen kunnen nemen.
Het systeem put uit gevestigde praktijken voor vulnerability management gedocumenteerd in NIST SP 800-150 (Guide to Cyber Threat Information Sharing) en breidt deze uit met LLM-specifieke dimensies. Aan het einde van dit project beschik je over een functionele tool die een red team kan gebruiken om bevindingen over engagements heen te volgen en die een blue team kan gebruiken om defensieve investeringen te prioriteren.
De architectuur omvat drie lagen: een PostgreSQL-database met een schema dat is geoptimaliseerd voor LLM-kwetsbaarheidsmetadata, een Python-API-laag die ingestie en bevraging afhandelt, en een rapportagemodule die bruikbare intelligence genereert uit de verzamelde gegevens.
Projectvereisten
Architectuuroverzicht
Het systeem werkt als een three-tier-applicatie met een duidelijke scheiding van verantwoordelijkheden:
Data Sources → Ingestion Layer → PostgreSQL Database
↓
API Layer (FastAPI)
↓
Query / Reporting Interface
Functionele vereisten
-
Kwetsbaarheidsschema — Een genormaliseerd schema dat LLM-specifieke velden vastlegt die verder gaan dan traditionele CVE-attributen: getroffen modelfamilies, aanvalscategorie (gemapt op OWASP LLM Top 10 en MITRE ATLAS), reproductiepercentage, vereiste preconditities, details over guardrail-omzeiling en temporele geldigheidsvensters.
-
Score-engine — Een LLM-aangepast scoresysteem dat CVSS uitbreidt met dimensies voor promptgevoeligheid, outputconsistentie, guardrail-interactie en cross-model-overdraagbaarheid.
-
Ingestiepijplijn — Parsers voor meerdere invoerformaten: gestructureerde JSON van geautomatiseerde scantools (Garak, Promptfoo), semigestructureerde markdown uit handmatige red team-rapporten, en CVE/NVD-feeds voor traditionele softwarekwetsbaarheden in LLM-tooling.
-
Query-interface — Een RESTful API die complexe queries ondersteunt: "toon alle prompt-injectiekwetsbaarheden die GPT-4 treffen en die contentfiltering omzeilen met een reproductiepercentage boven 70%."
-
Rapportagemodule — Genereer trendrapporten, modelvergelijkingsmatrices en compliance-gemapte samenvattingen (OWASP, MITRE ATLAS, EU AI Act-risicocategorieën).
Niet-functionele vereisten
- De database moet minstens 100.000 kwetsbaarheidsrecords aankunnen met queryprestaties van minder dan een seconde.
- Alle gegevenswijzigingen moeten auditeerbaar zijn met een volledige wijzigingsgeschiedenis.
- De API moet rolgebaseerde toegangscontrole ondersteunen die alleen-lezen analisten scheidt van red teamers met schrijftoegang.
- Exportformaten moeten JSON, CSV en PDF omvatten voor integratie met bestaande GRC-tools.
Implementatiegids
Fase 1: Ontwerp van het databaseschema
Begin met het kernschema dat LLM-kwetsbaarheden modelleert als first-class entiteiten met rijke metadata.
# models/schema.py
"""SQLAlchemy models for the LLM vulnerability tracking database."""
from __future__ import annotations
import enum
from datetime import datetime, date
from typing import Optional
from uuid import uuid4
from sqlalchemy import (
Column, String, Text, Float, Integer, Boolean,
DateTime, Date, Enum, ForeignKey, Table, Index,
CheckConstraint, UniqueConstraint, JSON
)
from sqlalchemy.dialects.postgresql import UUID, ARRAY, JSONB
from sqlalchemy.orm import DeclarativeBase, relationship, Mapped, mapped_column
class Base(DeclarativeBase):
pass
class SeverityLevel(enum.Enum):
"""LLM-specific severity levels extending traditional CVSS."""
INFORMATIONAL = "informational"
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class AttackCategory(enum.Enum):
"""Top-level attack categories aligned with OWASP LLM Top 10 2025."""
PROMPT_INJECTION = "LLM01"
SENSITIVE_INFO_DISCLOSURE = "LLM02"
SUPPLY_CHAIN = "LLM03"
DATA_MODEL_POISONING = "LLM04"
IMPROPER_OUTPUT_HANDLING = "LLM05"
EXCESSIVE_AGENCY = "LLM06"
SYSTEM_PROMPT_LEAKAGE = "LLM07"
VECTOR_EMBEDDING_WEAKNESS = "LLM08"
MISINFORMATION = "LLM09"
UNBOUNDED_CONSUMPTION = "LLM10"
class VulnStatus(enum.Enum):
"""Lifecycle status of a vulnerability entry."""
DRAFT = "draft"
CONFIRMED = "confirmed"
MITIGATED = "mitigated"
RESOLVED = "resolved"
WONT_FIX = "wont_fix"
EXPIRED = "expired"
# Association table for many-to-many: vulnerabilities <-> affected models
vuln_model_association = Table(
"vuln_model_association",
Base.metadata,
Column("vulnerability_id", UUID(as_uuid=True), ForeignKey("vulnerabilities.id")),
Column("model_id", UUID(as_uuid=True), ForeignKey("models.id")),
)
# Association table for many-to-many: vulnerabilities <-> mitre techniques
vuln_mitre_association = Table(
"vuln_mitre_association",
Base.metadata,
Column("vulnerability_id", UUID(as_uuid=True), ForeignKey("vulnerabilities.id")),
Column("technique_id", UUID(as_uuid=True), ForeignKey("mitre_techniques.id")),
)
class Vulnerability(Base):
"""Core vulnerability record with LLM-specific metadata."""
__tablename__ = "vulnerabilities"
id: Mapped[str] = mapped_column(
UUID(as_uuid=True), primary_key=True, default=uuid4
)
# Human-readable identifier: LLMVULN-2026-00001
tracking_id: Mapped[str] = mapped_column(
String(30), unique=True, nullable=False
)
title: Mapped[str] = mapped_column(String(500), nullable=False)
description: Mapped[str] = mapped_column(Text, nullable=False)
attack_category: Mapped[AttackCategory] = mapped_column(
Enum(AttackCategory), nullable=False, index=True
)
severity: Mapped[SeverityLevel] = mapped_column(
Enum(SeverityLevel), nullable=False, index=True
)
status: Mapped[VulnStatus] = mapped_column(
Enum(VulnStatus), default=VulnStatus.DRAFT, nullable=False
)
# LLM-specific scoring dimensions (0.0 to 10.0 each)
llm_score_exploitability: Mapped[float] = mapped_column(
Float, CheckConstraint("llm_score_exploitability BETWEEN 0.0 AND 10.0")
)
llm_score_reproducibility: Mapped[float] = mapped_column(
Float, CheckConstraint("llm_score_reproducibility BETWEEN 0.0 AND 10.0")
)
llm_score_impact: Mapped[float] = mapped_column(
Float, CheckConstraint("llm_score_impact BETWEEN 0.0 AND 10.0")
)
llm_score_transferability: Mapped[float] = mapped_column(
Float, CheckConstraint("llm_score_transferability BETWEEN 0.0 AND 10.0")
)
llm_score_guardrail_bypass: Mapped[float] = mapped_column(
Float, CheckConstraint("llm_score_guardrail_bypass BETWEEN 0.0 AND 10.0")
)
composite_score: Mapped[float] = mapped_column(Float, nullable=True)
# Reproduction metadata
reproduction_rate: Mapped[Optional[float]] = mapped_column(
Float, CheckConstraint("reproduction_rate BETWEEN 0.0 AND 1.0"),
nullable=True
)
reproduction_steps: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
sample_payload: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
# Temporal validity
discovered_date: Mapped[date] = mapped_column(Date, nullable=False)
last_verified_date: Mapped[Optional[date]] = mapped_column(Date, nullable=True)
expiry_date: Mapped[Optional[date]] = mapped_column(Date, nullable=True)
# Preconditions and context
preconditions: Mapped[Optional[dict]] = mapped_column(JSONB, nullable=True)
temperature_range: Mapped[Optional[str]] = mapped_column(String(50), nullable=True)
requires_system_prompt: Mapped[bool] = mapped_column(Boolean, default=False)
requires_tool_access: Mapped[bool] = mapped_column(Boolean, default=False)
requires_rag: Mapped[bool] = mapped_column(Boolean, default=False)
# External references
cve_id: Mapped[Optional[str]] = mapped_column(String(20), nullable=True)
external_references: Mapped[Optional[list]] = mapped_column(JSONB, nullable=True)
# Audit fields
created_at: Mapped[datetime] = mapped_column(
DateTime, default=datetime.utcnow
)
updated_at: Mapped[datetime] = mapped_column(
DateTime, default=datetime.utcnow, onupdate=datetime.utcnow
)
created_by: Mapped[str] = mapped_column(String(100), nullable=False)
# Relationships
affected_models = relationship(
"Model", secondary=vuln_model_association, back_populates="vulnerabilities"
)
mitre_techniques = relationship(
"MITRETechnique", secondary=vuln_mitre_association, back_populates="vulnerabilities"
)
audit_log = relationship("AuditEntry", back_populates="vulnerability")
guardrail_interactions = relationship(
"GuardrailInteraction", back_populates="vulnerability"
)
__table_args__ = (
Index("idx_vuln_category_severity", "attack_category", "severity"),
Index("idx_vuln_status_date", "status", "discovered_date"),
Index("idx_vuln_composite_score", "composite_score"),
)
class Model(Base):
"""Represents an LLM model or model family that can be affected by vulnerabilities."""
__tablename__ = "models"
id: Mapped[str] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid4)
provider: Mapped[str] = mapped_column(String(100), nullable=False, index=True)
model_family: Mapped[str] = mapped_column(String(100), nullable=False)
model_name: Mapped[str] = mapped_column(String(200), nullable=False)
model_version: Mapped[Optional[str]] = mapped_column(String(100), nullable=True)
release_date: Mapped[Optional[date]] = mapped_column(Date, nullable=True)
is_open_source: Mapped[bool] = mapped_column(Boolean, default=False)
parameter_count: Mapped[Optional[str]] = mapped_column(String(50), nullable=True)
context_window: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
capabilities: Mapped[Optional[list]] = mapped_column(
ARRAY(String), nullable=True
)
vulnerabilities = relationship(
"Vulnerability", secondary=vuln_model_association, back_populates="affected_models"
)
__table_args__ = (
UniqueConstraint("provider", "model_name", "model_version",
name="uq_model_identity"),
)
class MITRETechnique(Base):
"""MITRE ATLAS technique mapping for vulnerability classification."""
__tablename__ = "mitre_techniques"
id: Mapped[str] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid4)
technique_id: Mapped[str] = mapped_column(String(20), unique=True, nullable=False)
name: Mapped[str] = mapped_column(String(200), nullable=False)
tactic: Mapped[str] = mapped_column(String(100), nullable=False)
description: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
vulnerabilities = relationship(
"Vulnerability", secondary=vuln_mitre_association, back_populates="mitre_techniques"
)
class GuardrailInteraction(Base):
"""Records how a vulnerability interacts with specific guardrail configurations."""
__tablename__ = "guardrail_interactions"
id: Mapped[str] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid4)
vulnerability_id: Mapped[str] = mapped_column(
UUID(as_uuid=True), ForeignKey("vulnerabilities.id"), nullable=False
)
guardrail_type: Mapped[str] = mapped_column(String(100), nullable=False)
guardrail_provider: Mapped[str] = mapped_column(String(100), nullable=False)
bypass_successful: Mapped[bool] = mapped_column(Boolean, nullable=False)
bypass_rate: Mapped[Optional[float]] = mapped_column(Float, nullable=True)
bypass_technique: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
tested_date: Mapped[date] = mapped_column(Date, nullable=False)
vulnerability = relationship("Vulnerability", back_populates="guardrail_interactions")
class AuditEntry(Base):
"""Immutable audit log for all vulnerability record changes."""
__tablename__ = "audit_log"
id: Mapped[str] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid4)
vulnerability_id: Mapped[str] = mapped_column(
UUID(as_uuid=True), ForeignKey("vulnerabilities.id"), nullable=False
)
action: Mapped[str] = mapped_column(String(50), nullable=False)
changed_fields: Mapped[Optional[dict]] = mapped_column(JSONB, nullable=True)
previous_values: Mapped[Optional[dict]] = mapped_column(JSONB, nullable=True)
changed_by: Mapped[str] = mapped_column(String(100), nullable=False)
changed_at: Mapped[datetime] = mapped_column(
DateTime, default=datetime.utcnow, nullable=False
)
vulnerability = relationship("Vulnerability", back_populates="audit_log")
__table_args__ = (
Index("idx_audit_vuln_date", "vulnerability_id", "changed_at"),
)Dit schema scheidt verantwoordelijkheden in afzonderlijke tabellen en behoudt tegelijkertijd de relaties die nodig zijn voor complexe queries. De tabel Vulnerability is de centrale entiteit met LLM-specifieke scoredimensies die verder gaan dan wat CVSS biedt.
Fase 2: LLM-aangepaste score-engine
De score-engine vertaalt ruwe kwetsbaarheidskenmerken naar een samengestelde score die beveiligingsteams kunnen gebruiken voor prioritering.
# scoring/engine.py
"""LLM-adapted vulnerability scoring engine extending CVSS concepts."""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
class ExploitComplexity(Enum):
"""How complex the attack setup is."""
TRIVIAL = 1.0 # Copy-paste prompt, no special setup
LOW = 0.75 # Requires basic prompt engineering knowledge
MEDIUM = 0.5 # Requires model-specific tuning or multi-step
HIGH = 0.25 # Requires significant research or infrastructure
VERY_HIGH = 0.1 # Requires training pipeline access or novel research
class ImpactScope(Enum):
"""What the vulnerability can affect."""
MODEL_OUTPUT_ONLY = 0.3 # Only affects text generation quality
DATA_EXPOSURE = 0.6 # Can leak system prompts or user data
TOOL_EXECUTION = 0.8 # Can trigger unauthorized tool/API calls
SYSTEM_COMPROMISE = 1.0 # Can lead to RCE or infrastructure access
class TransferabilityLevel(Enum):
"""Whether the vulnerability works across models."""
SINGLE_MODEL = 0.2 # Only works on one specific model version
MODEL_FAMILY = 0.5 # Works across versions of same model family
CROSS_PROVIDER = 0.8 # Works on models from different providers
UNIVERSAL = 1.0 # Works on virtually all LLMs
@dataclass
class LLMVulnScore:
"""Complete scoring breakdown for an LLM vulnerability."""
# Input dimensions (0.0 to 10.0 each)
exploitability: float
reproducibility: float
impact: float
transferability: float
guardrail_bypass: float
# Computed fields
composite_score: float = 0.0
risk_rating: str = ""
priority_rank: int = 0
# Weight configuration
weights: dict = field(default_factory=lambda: {
"exploitability": 0.25,
"reproducibility": 0.15,
"impact": 0.30,
"transferability": 0.15,
"guardrail_bypass": 0.15,
})
def __post_init__(self):
self.composite_score = self.calculate_composite()
self.risk_rating = self.determine_risk_rating()
def calculate_composite(self) -> float:
"""Calculate weighted composite score."""
raw = (
self.exploitability * self.weights["exploitability"]
+ self.reproducibility * self.weights["reproducibility"]
+ self.impact * self.weights["impact"]
+ self.transferability * self.weights["transferability"]
+ self.guardrail_bypass * self.weights["guardrail_bypass"]
)
return round(raw, 2)
def determine_risk_rating(self) -> str:
"""Map composite score to risk rating."""
if self.composite_score >= 9.0:
return "CRITICAL"
elif self.composite_score >= 7.0:
return "HIGH"
elif self.composite_score >= 4.0:
return "MEDIUM"
elif self.composite_score >= 2.0:
return "LOW"
return "INFORMATIONAL"
def score_vulnerability(
exploit_complexity: ExploitComplexity,
reproduction_rate: float,
impact_scope: ImpactScope,
transferability: TransferabilityLevel,
guardrail_bypass_rate: float,
temporal_decay_days: int = 0,
) -> LLMVulnScore:
"""
Score a vulnerability using LLM-specific dimensions.
Args:
exploit_complexity: How difficult the attack is to execute.
reproduction_rate: Fraction of attempts that succeed (0.0-1.0).
impact_scope: What the vulnerability can affect.
transferability: Whether it works across models.
guardrail_bypass_rate: Rate at which it defeats guardrails (0.0-1.0).
temporal_decay_days: Days since last verification (reduces confidence).
Returns:
LLMVulnScore with composite score and risk rating.
"""
# Convert enum values to 0-10 scale
exploitability = exploit_complexity.value * 10.0
reproducibility = reproduction_rate * 10.0
impact = impact_scope.value * 10.0
transferability_score = transferability.value * 10.0
guardrail_score = guardrail_bypass_rate * 10.0
# Apply temporal decay: scores degrade if not recently verified
if temporal_decay_days > 0:
decay_factor = max(0.5, 1.0 - (temporal_decay_days / 365.0) * 0.5)
reproducibility *= decay_factor
guardrail_score *= decay_factor
return LLMVulnScore(
exploitability=round(exploitability, 2),
reproducibility=round(reproducibility, 2),
impact=round(impact, 2),
transferability=round(transferability_score, 2),
guardrail_bypass=round(guardrail_score, 2),
)
# Example usage demonstrating scoring for a real vulnerability pattern
if __name__ == "__main__":
# Score a cross-model prompt injection that bypasses content filters
result = score_vulnerability(
exploit_complexity=ExploitComplexity.LOW,
reproduction_rate=0.85,
impact_scope=ImpactScope.DATA_EXPOSURE,
transferability=TransferabilityLevel.CROSS_PROVIDER,
guardrail_bypass_rate=0.60,
temporal_decay_days=30,
)
print(f"Composite Score: {result.composite_score}/10.0")
print(f"Risk Rating: {result.risk_rating}")
print(f" Exploitability: {result.exploitability}")
print(f" Reproducibility: {result.reproducibility}")
print(f" Impact: {result.impact}")
print(f" Transferability: {result.transferability}")
print(f" Guardrail Bypass: {result.guardrail_bypass}")Fase 3: Ingestiepijplijn
De ingestielaag verwerkt gegevens uit meerdere bronnen en normaliseert deze naar het databaseschema.
# ingestion/pipeline.py
"""Ingestion pipeline for vulnerability data from multiple sources."""
from __future__ import annotations
import json
import re
import hashlib
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import date, datetime
from pathlib import Path
from typing import Optional
from sqlalchemy.orm import Session
@dataclass
class RawVulnerability:
"""Normalized intermediate representation of a vulnerability."""
title: str
description: str
attack_category: str
affected_models: list[str]
reproduction_rate: Optional[float]
reproduction_steps: Optional[str]
sample_payload: Optional[str]
discovered_date: date
source: str
source_id: str
external_references: list[dict]
preconditions: Optional[dict] = None
severity_hint: Optional[str] = None
class IngestionSource(ABC):
"""Abstract base for vulnerability data sources."""
@abstractmethod
def parse(self, data: str | dict) -> list[RawVulnerability]:
"""Parse raw data into normalized vulnerability records."""
...
@abstractmethod
def source_name(self) -> str:
"""Identifier for this data source."""
...
class GarakResultParser(IngestionSource):
"""Parse vulnerability findings from Garak scan output."""
def source_name(self) -> str:
return "garak"
def parse(self, data: str | dict) -> list[RawVulnerability]:
if isinstance(data, str):
data = json.loads(data)
vulnerabilities = []
for finding in data.get("findings", []):
# Map Garak probe categories to OWASP LLM categories
category = self._map_garak_category(finding.get("probe_type", ""))
reproduction_rate = finding.get("success_rate", 0.0)
if reproduction_rate < 0.01:
continue # Skip findings with near-zero success rate
vuln = RawVulnerability(
title=f"Garak: {finding.get('probe_name', 'Unknown probe')}",
description=finding.get("description", ""),
attack_category=category,
affected_models=[finding.get("model", "unknown")],
reproduction_rate=reproduction_rate,
reproduction_steps=finding.get("sample_interaction", ""),
sample_payload=finding.get("successful_payload", ""),
discovered_date=date.today(),
source="garak",
source_id=self._generate_source_id(finding),
external_references=[{
"type": "tool_output",
"url": "https://github.com/NVIDIA/garak",
"tool_version": data.get("garak_version", "unknown"),
}],
)
vulnerabilities.append(vuln)
return vulnerabilities
def _map_garak_category(self, probe_type: str) -> str:
"""Map Garak probe types to OWASP LLM Top 10 categories."""
mapping = {
"prompt_injection": "LLM01",
"data_leak": "LLM02",
"hallucination": "LLM09",
"toxicity": "LLM05",
"encoding": "LLM01",
"dan": "LLM01",
}
for key, value in mapping.items():
if key in probe_type.lower():
return value
return "LLM01" # Default to prompt injection
def _generate_source_id(self, finding: dict) -> str:
"""Generate a deterministic ID for deduplication."""
raw = f"garak:{finding.get('probe_name', '')}:{finding.get('model', '')}"
return hashlib.sha256(raw.encode()).hexdigest()[:16]
class PromptfooResultParser(IngestionSource):
"""Parse vulnerability findings from Promptfoo evaluation output."""
def source_name(self) -> str:
return "promptfoo"
def parse(self, data: str | dict) -> list[RawVulnerability]:
if isinstance(data, str):
data = json.loads(data)
vulnerabilities = []
results = data.get("results", {}).get("results", [])
for result in results:
if result.get("success", True):
continue # Only capture failures (successful attacks)
provider = result.get("provider", {})
model = provider.get("id", "unknown") if isinstance(provider, dict) else str(provider)
vuln = RawVulnerability(
title=f"Promptfoo: {result.get('testCase', {}).get('description', 'Failed assertion')}",
description=result.get("error", result.get("output", "")),
attack_category=self._infer_category(result),
affected_models=[model],
reproduction_rate=None, # Promptfoo doesn't provide this directly
reproduction_steps=json.dumps(result.get("testCase", {}), indent=2),
sample_payload=result.get("prompt", ""),
discovered_date=date.today(),
source="promptfoo",
source_id=self._generate_source_id(result),
external_references=[{
"type": "tool_output",
"url": "https://github.com/promptfoo/promptfoo",
}],
)
vulnerabilities.append(vuln)
return vulnerabilities
def _infer_category(self, result: dict) -> str:
"""Infer OWASP LLM category from Promptfoo test metadata."""
test_case = result.get("testCase", {})
tags = test_case.get("metadata", {}).get("tags", [])
tag_mapping = {
"injection": "LLM01", "prompt-injection": "LLM01",
"pii": "LLM02", "data-leak": "LLM02",
"harmful": "LLM05", "toxicity": "LLM05",
"hallucination": "LLM09",
}
for tag in tags:
if tag.lower() in tag_mapping:
return tag_mapping[tag.lower()]
return "LLM01"
def _generate_source_id(self, result: dict) -> str:
raw = f"promptfoo:{result.get('testCase', {}).get('description', '')}:{result.get('provider', '')}"
return hashlib.sha256(raw.encode()).hexdigest()[:16]
class ManualReportParser(IngestionSource):
"""Parse semi-structured markdown red team reports."""
def source_name(self) -> str:
return "manual_report"
def parse(self, data: str | dict) -> list[RawVulnerability]:
if isinstance(data, dict):
data = json.dumps(data)
vulnerabilities = []
# Split markdown by H2 headers to find individual findings
sections = re.split(r'^## ', data, flags=re.MULTILINE)
for section in sections[1:]: # Skip preamble
lines = section.strip().split('\n')
title = lines[0].strip()
body = '\n'.join(lines[1:]).strip()
# Extract structured fields from body using patterns
severity = self._extract_field(body, r'\*\*Severity\*\*:\s*(\w+)')
category = self._extract_field(body, r'\*\*Category\*\*:\s*(LLM\d{2})')
models_raw = self._extract_field(body, r'\*\*Affected Models\*\*:\s*(.+)')
repro_rate = self._extract_field(body, r'\*\*Reproduction Rate\*\*:\s*([\d.]+)')
affected_models = []
if models_raw:
affected_models = [m.strip() for m in models_raw.split(',')]
vuln = RawVulnerability(
title=title,
description=body,
attack_category=category or "LLM01",
affected_models=affected_models,
reproduction_rate=float(repro_rate) if repro_rate else None,
reproduction_steps=self._extract_section(body, "Reproduction Steps"),
sample_payload=self._extract_section(body, "Payload"),
discovered_date=date.today(),
source="manual_report",
source_id=hashlib.sha256(title.encode()).hexdigest()[:16],
external_references=[],
severity_hint=severity,
)
vulnerabilities.append(vuln)
return vulnerabilities
def _extract_field(self, text: str, pattern: str) -> Optional[str]:
match = re.search(pattern, text)
return match.group(1) if match else None
def _extract_section(self, text: str, header: str) -> Optional[str]:
pattern = rf'### {header}\s*\n(.*?)(?=\n### |\Z)'
match = re.search(pattern, text, re.DOTALL)
return match.group(1).strip() if match else None
class IngestionPipeline:
"""Orchestrates vulnerability ingestion from multiple sources."""
def __init__(self, session: Session):
self.session = session
self.parsers: dict[str, IngestionSource] = {}
self._register_default_parsers()
def _register_default_parsers(self):
self.register_parser(GarakResultParser())
self.register_parser(PromptfooResultParser())
self.register_parser(ManualReportParser())
def register_parser(self, parser: IngestionSource):
self.parsers[parser.source_name()] = parser
def ingest(self, source_name: str, data: str | dict) -> list[str]:
"""
Ingest data from a named source and return tracking IDs of created records.
Args:
source_name: Registered parser name (garak, promptfoo, manual_report).
data: Raw data from the source.
Returns:
List of tracking IDs for newly created vulnerability records.
"""
parser = self.parsers.get(source_name)
if parser is None:
raise ValueError(f"No parser registered for source: {source_name}")
raw_vulns = parser.parse(data)
tracking_ids = []
for raw_vuln in raw_vulns:
# Deduplication check
existing = self._find_duplicate(raw_vuln)
if existing:
self._update_existing(existing, raw_vuln)
tracking_ids.append(existing.tracking_id)
continue
tracking_id = self._create_record(raw_vuln)
tracking_ids.append(tracking_id)
self.session.commit()
return tracking_ids
def _find_duplicate(self, raw: RawVulnerability):
"""Check for existing vulnerability with same source ID."""
# Implementation queries database for matching source_id
# in external_references JSONB field
return None # Simplified for illustration
def _update_existing(self, existing, raw: RawVulnerability):
"""Merge new data into existing record."""
if raw.reproduction_rate is not None:
existing.reproduction_rate = raw.reproduction_rate
existing.last_verified_date = date.today()
def _create_record(self, raw: RawVulnerability) -> str:
"""Create a new vulnerability record from raw data."""
# Generate next tracking ID
tracking_id = self._next_tracking_id()
# Record creation logic using the ORM models from Phase 1
return tracking_id
def _next_tracking_id(self) -> str:
"""Generate the next sequential tracking ID."""
year = date.today().year
# Query max existing ID for this year and increment
return f"LLMVULN-{year}-00001" # SimplifiedFase 4: Query- en rapportage-API
Bouw de FastAPI-laag die de database blootstelt voor bevraging en rapportage.
# api/routes.py
"""FastAPI routes for vulnerability querying and reporting."""
from __future__ import annotations
from datetime import date, datetime
from typing import Optional
from uuid import UUID
from fastapi import FastAPI, Depends, Query, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, Field
from sqlalchemy import func, and_, or_
from sqlalchemy.orm import Session
app = FastAPI(
title="LLM Vulnerability Tracking API",
version="1.0.0",
description="Purpose-built vulnerability database for LLM security findings",
)
security = HTTPBearer()
# --- Pydantic models for request/response ---
class VulnerabilityResponse(BaseModel):
tracking_id: str
title: str
description: str
attack_category: str
severity: str
status: str
composite_score: Optional[float]
reproduction_rate: Optional[float]
affected_models: list[str]
discovered_date: date
last_verified_date: Optional[date]
risk_rating: str
class Config:
from_attributes = True
class VulnerabilityCreate(BaseModel):
title: str = Field(..., min_length=10, max_length=500)
description: str = Field(..., min_length=50)
attack_category: str = Field(..., pattern=r"^LLM(0[1-9]|10)$")
affected_model_ids: list[UUID]
reproduction_rate: Optional[float] = Field(None, ge=0.0, le=1.0)
reproduction_steps: Optional[str] = None
sample_payload: Optional[str] = None
preconditions: Optional[dict] = None
severity_hint: Optional[str] = None
class TrendReport(BaseModel):
period_start: date
period_end: date
total_vulnerabilities: int
by_category: dict[str, int]
by_severity: dict[str, int]
by_model: dict[str, int]
avg_composite_score: float
top_attack_vectors: list[dict]
class ModelComparisonReport(BaseModel):
models: list[dict]
vulnerability_counts: dict[str, int]
avg_scores: dict[str, float]
shared_vulnerabilities: int
unique_to_each: dict[str, int]
# --- API Endpoints ---
@app.get("/api/v1/vulnerabilities", response_model=list[VulnerabilityResponse])
async def list_vulnerabilities(
category: Optional[str] = Query(None, pattern=r"^LLM(0[1-9]|10)$"),
severity: Optional[str] = Query(None),
model_name: Optional[str] = Query(None),
min_score: Optional[float] = Query(None, ge=0.0, le=10.0),
min_reproduction_rate: Optional[float] = Query(None, ge=0.0, le=1.0),
status: Optional[str] = Query(None),
discovered_after: Optional[date] = Query(None),
discovered_before: Optional[date] = Query(None),
page: int = Query(1, ge=1),
page_size: int = Query(50, ge=1, le=200),
sort_by: str = Query("composite_score"),
sort_order: str = Query("desc", pattern=r"^(asc|desc)$"),
db: Session = Depends(lambda: None), # Replaced with real dependency
):
"""
Query vulnerabilities with rich filtering.
Supports filtering by OWASP category, severity, affected model,
minimum composite score, reproduction rate, status, and date range.
Results are paginated and sortable.
"""
# Build dynamic query
query = db.query(Vulnerability)
if category:
query = query.filter(Vulnerability.attack_category == category)
if severity:
query = query.filter(Vulnerability.severity == severity)
if model_name:
query = query.join(Vulnerability.affected_models).filter(
Model.model_name.ilike(f"%{model_name}%")
)
if min_score is not None:
query = query.filter(Vulnerability.composite_score >= min_score)
if min_reproduction_rate is not None:
query = query.filter(Vulnerability.reproduction_rate >= min_reproduction_rate)
if status:
query = query.filter(Vulnerability.status == status)
if discovered_after:
query = query.filter(Vulnerability.discovered_date >= discovered_after)
if discovered_before:
query = query.filter(Vulnerability.discovered_date <= discovered_before)
# Sorting
sort_column = getattr(Vulnerability, sort_by, Vulnerability.composite_score)
if sort_order == "desc":
query = query.order_by(sort_column.desc())
else:
query = query.order_by(sort_column.asc())
# Pagination
offset = (page - 1) * page_size
results = query.offset(offset).limit(page_size).all()
return results
@app.get("/api/v1/reports/trends", response_model=TrendReport)
async def generate_trend_report(
start_date: date = Query(...),
end_date: date = Query(...),
db: Session = Depends(lambda: None),
):
"""
Generate a trend report for vulnerabilities discovered in a date range.
Returns aggregate statistics including category distribution, severity
breakdown, model coverage, and top attack vectors.
"""
vulns = db.query(Vulnerability).filter(
and_(
Vulnerability.discovered_date >= start_date,
Vulnerability.discovered_date <= end_date,
)
).all()
by_category = {}
by_severity = {}
by_model = {}
scores = []
for v in vulns:
cat = v.attack_category.value
by_category[cat] = by_category.get(cat, 0) + 1
sev = v.severity.value
by_severity[sev] = by_severity.get(sev, 0) + 1
if v.composite_score is not None:
scores.append(v.composite_score)
for model in v.affected_models:
by_model[model.model_name] = by_model.get(model.model_name, 0) + 1
return TrendReport(
period_start=start_date,
period_end=end_date,
total_vulnerabilities=len(vulns),
by_category=by_category,
by_severity=by_severity,
by_model=by_model,
avg_composite_score=sum(scores) / len(scores) if scores else 0.0,
top_attack_vectors=sorted(
[{"category": k, "count": v} for k, v in by_category.items()],
key=lambda x: x["count"],
reverse=True,
)[:5],
)
@app.get("/api/v1/reports/model-comparison", response_model=ModelComparisonReport)
async def compare_models(
model_names: list[str] = Query(...),
db: Session = Depends(lambda: None),
):
"""
Compare vulnerability profiles across specified models.
Returns side-by-side comparison of vulnerability counts, average scores,
shared vulnerabilities, and unique findings for each model.
"""
model_data = {}
all_vuln_ids = {}
for model_name in model_names:
vulns = db.query(Vulnerability).join(
Vulnerability.affected_models
).filter(
Model.model_name.ilike(f"%{model_name}%")
).all()
vuln_ids = {str(v.id) for v in vulns}
all_vuln_ids[model_name] = vuln_ids
scores = [v.composite_score for v in vulns if v.composite_score]
model_data[model_name] = {
"count": len(vulns),
"avg_score": sum(scores) / len(scores) if scores else 0.0,
}
# Calculate shared and unique
if len(model_names) >= 2:
shared = set.intersection(*all_vuln_ids.values()) if all_vuln_ids else set()
else:
shared = set()
unique = {
name: len(ids - set.union(*(
v for k, v in all_vuln_ids.items() if k != name
)) if len(all_vuln_ids) > 1 else ids)
for name, ids in all_vuln_ids.items()
}
return ModelComparisonReport(
models=[{"name": n, **d} for n, d in model_data.items()],
vulnerability_counts={n: d["count"] for n, d in model_data.items()},
avg_scores={n: d["avg_score"] for n, d in model_data.items()},
shared_vulnerabilities=len(shared),
unique_to_each=unique,
)Fase 5: Compliance-mapping en export
Map kwetsbaarheidsgegevens op gevestigde frameworks voor compliancerapportage.
# reporting/compliance.py
"""Map vulnerability data to compliance frameworks for reporting."""
from __future__ import annotations
import json
import csv
import io
from dataclasses import dataclass
from typing import Optional
# OWASP LLM Top 10 2025 mapping
OWASP_LLM_TOP_10 = {
"LLM01": {
"name": "Prompt Injection",
"description": "Manipulating LLM through crafted inputs that override instructions",
"eu_ai_act_risk": "high",
"nist_ai_rmf_function": "GOVERN, MAP, MEASURE",
},
"LLM02": {
"name": "Sensitive Information Disclosure",
"description": "LLM revealing confidential data in responses",
"eu_ai_act_risk": "high",
"nist_ai_rmf_function": "MAP, MEASURE, MANAGE",
},
"LLM03": {
"name": "Supply Chain Vulnerabilities",
"description": "Compromised components in the LLM supply chain",
"eu_ai_act_risk": "high",
"nist_ai_rmf_function": "GOVERN, MAP",
},
"LLM04": {
"name": "Data and Model Poisoning",
"description": "Manipulating training data or model weights",
"eu_ai_act_risk": "unacceptable",
"nist_ai_rmf_function": "MAP, MEASURE, MANAGE",
},
"LLM05": {
"name": "Improper Output Handling",
"description": "Insufficient validation of LLM outputs before use",
"eu_ai_act_risk": "high",
"nist_ai_rmf_function": "MEASURE, MANAGE",
},
"LLM06": {
"name": "Excessive Agency",
"description": "LLM granted too many capabilities or permissions",
"eu_ai_act_risk": "high",
"nist_ai_rmf_function": "GOVERN, MAP, MANAGE",
},
"LLM07": {
"name": "System Prompt Leakage",
"description": "Extraction of system-level prompts and instructions",
"eu_ai_act_risk": "limited",
"nist_ai_rmf_function": "MEASURE, MANAGE",
},
"LLM08": {
"name": "Vector and Embedding Weaknesses",
"description": "Exploiting vulnerabilities in RAG vector databases",
"eu_ai_act_risk": "high",
"nist_ai_rmf_function": "MAP, MEASURE",
},
"LLM09": {
"name": "Misinformation",
"description": "LLM generating false or misleading information",
"eu_ai_act_risk": "high",
"nist_ai_rmf_function": "MEASURE, MANAGE",
},
"LLM10": {
"name": "Unbounded Consumption",
"description": "Resource exhaustion through excessive LLM usage",
"eu_ai_act_risk": "limited",
"nist_ai_rmf_function": "GOVERN, MANAGE",
},
}
@dataclass
class ComplianceReport:
"""Generated compliance report for a set of vulnerabilities."""
framework: str
generated_date: str
total_findings: int
findings_by_control: dict
risk_summary: dict
recommendations: list[str]
def to_json(self) -> str:
return json.dumps({
"framework": self.framework,
"generated_date": self.generated_date,
"total_findings": self.total_findings,
"findings_by_control": self.findings_by_control,
"risk_summary": self.risk_summary,
"recommendations": self.recommendations,
}, indent=2)
def to_csv(self) -> str:
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(["Control", "Finding Count", "Risk Level", "Framework Mapping"])
for control, data in self.findings_by_control.items():
writer.writerow([
control,
data.get("count", 0),
data.get("risk_level", "unknown"),
data.get("mapping", ""),
])
return output.getvalue()
def generate_owasp_compliance_report(
vulnerabilities: list[dict],
) -> ComplianceReport:
"""
Generate a compliance report mapped to OWASP LLM Top 10 2025.
Args:
vulnerabilities: List of vulnerability dicts with attack_category field.
Returns:
ComplianceReport with findings mapped to OWASP controls.
"""
findings_by_control = {}
for category_id, meta in OWASP_LLM_TOP_10.items():
matching = [v for v in vulnerabilities if v.get("attack_category") == category_id]
findings_by_control[f"{category_id}: {meta['name']}"] = {
"count": len(matching),
"risk_level": meta["eu_ai_act_risk"],
"mapping": meta["nist_ai_rmf_function"],
"avg_score": (
sum(v.get("composite_score", 0) for v in matching) / len(matching)
if matching else 0
),
}
# Generate risk summary
high_risk = sum(
1 for v in vulnerabilities
if v.get("composite_score", 0) >= 7.0
)
critical_categories = [
k for k, v in findings_by_control.items()
if v["count"] > 0 and v["risk_level"] in ("high", "unacceptable")
]
recommendations = []
if high_risk > 0:
recommendations.append(
f"Address {high_risk} high/critical findings as priority. "
"These represent active risk to production systems."
)
for cat in critical_categories[:3]:
recommendations.append(
f"Review and remediate findings in {cat} — "
f"EU AI Act classification: {findings_by_control[cat]['risk_level']}"
)
from datetime import date as date_type
return ComplianceReport(
framework="OWASP LLM Top 10 2025",
generated_date=str(date_type.today()),
total_findings=len(vulnerabilities),
findings_by_control=findings_by_control,
risk_summary={
"critical_and_high": high_risk,
"total": len(vulnerabilities),
"risk_ratio": high_risk / len(vulnerabilities) if vulnerabilities else 0,
},
recommendations=recommendations,
)Je implementatie testen
Valideer elke fase onafhankelijk voordat je integreert:
-
Schemavalidatie — Voer Alembic-migraties uit tegen een PostgreSQL-testinstantie. Verifieer dat alle constraints, indexes en relaties correct werken.
-
Scorenauwkeurigheid — Schrijf unittests die verifiëren dat de score-engine de verwachte resultaten produceert voor bekende kwetsbaarheidsprofielen. Test randgevallen: reproductiepercentage van nul, maximale scores, temporele decay op grenswaarden.
-
Robuustheid van ingestie — Voer onjuist gevormde gegevens door elke parser en verifieer een sierlijke foutafhandeling. Test deduplicatie door hetzelfde Garak-rapport tweemaal in te lezen.
-
API-correctheid — Gebruik
pytestmethttpx.AsyncClientom elk endpoint te testen met geldige en ongeldige queryparameters. Verifieer paginatiegrenzen en sorteervolgorde. -
Compliance-mapping — Verifieer dat elke OWASP LLM Top 10-categorie in de output vertegenwoordigd is en dat de EU AI Act-risicoclassificaties nauwkeurig zijn.
# tests/test_scoring.py
"""Unit tests for the LLM vulnerability scoring engine."""
import pytest
from scoring.engine import (
score_vulnerability,
ExploitComplexity,
ImpactScope,
TransferabilityLevel,
LLMVulnScore,
)
def test_critical_vulnerability_scores_high():
"""A trivially exploitable, high-impact, universal vulnerability should score critical."""
result = score_vulnerability(
exploit_complexity=ExploitComplexity.TRIVIAL,
reproduction_rate=0.95,
impact_scope=ImpactScope.SYSTEM_COMPROMISE,
transferability=TransferabilityLevel.UNIVERSAL,
guardrail_bypass_rate=0.90,
)
assert result.composite_score >= 9.0
assert result.risk_rating == "CRITICAL"
def test_low_impact_scores_low():
"""A hard-to-exploit, model-specific, output-only vulnerability should score low."""
result = score_vulnerability(
exploit_complexity=ExploitComplexity.HIGH,
reproduction_rate=0.10,
impact_scope=ImpactScope.MODEL_OUTPUT_ONLY,
transferability=TransferabilityLevel.SINGLE_MODEL,
guardrail_bypass_rate=0.05,
)
assert result.composite_score < 4.0
assert result.risk_rating in ("LOW", "INFORMATIONAL")
def test_temporal_decay_reduces_score():
"""Scores should decrease when a vulnerability hasn't been recently verified."""
fresh = score_vulnerability(
exploit_complexity=ExploitComplexity.LOW,
reproduction_rate=0.80,
impact_scope=ImpactScope.DATA_EXPOSURE,
transferability=TransferabilityLevel.CROSS_PROVIDER,
guardrail_bypass_rate=0.70,
temporal_decay_days=0,
)
stale = score_vulnerability(
exploit_complexity=ExploitComplexity.LOW,
reproduction_rate=0.80,
impact_scope=ImpactScope.DATA_EXPOSURE,
transferability=TransferabilityLevel.CROSS_PROVIDER,
guardrail_bypass_rate=0.70,
temporal_decay_days=180,
)
assert stale.composite_score < fresh.composite_score
def test_score_bounds():
"""All score dimensions should stay within 0.0-10.0 range."""
result = score_vulnerability(
exploit_complexity=ExploitComplexity.TRIVIAL,
reproduction_rate=1.0,
impact_scope=ImpactScope.SYSTEM_COMPROMISE,
transferability=TransferabilityLevel.UNIVERSAL,
guardrail_bypass_rate=1.0,
)
for field_name in ["exploitability", "reproducibility", "impact",
"transferability", "guardrail_bypass"]:
value = getattr(result, field_name)
assert 0.0 <= value <= 10.0, f"{field_name} out of bounds: {value}"Het systeem uitbreiden
Zodra de kerndatabase operationeel is, overweeg dan deze uitbreidingen:
-
Geautomatiseerde herverificatie: Plan periodieke hertests van opgeslagen payloads tegen actuele modelversies om te detecteren wanneer kwetsbaarheden worden gepatcht of wanneer nieuwe modellen getroffen raken.
-
Correlatie met threat intelligence: Integreer met threat intelligence-feeds om kwetsbaarheidsrecords te verrijken met informatie over actieve exploitatie in het wild.
-
SBOM-integratie: Koppel kwetsbaarheidsrecords aan Software Bills of Materials (SBOM's) van AI-systemen om snelle impactbeoordeling mogelijk te maken wanneer nieuwe kwetsbaarheden worden ontdekt.
-
Slack/Teams-waarschuwingen: Push notificaties wanneer kwetsbaarheden met hoge ernst worden ingelezen of wanneer herverificatie wijzigingen in reproductiepercentages detecteert.
Referenties
- NIST Special Publication 800-150, "Guide to Cyber Threat Information Sharing," https://csrc.nist.gov/publications/detail/sp/800-150/final
- OWASP Top 10 for Large Language Model Applications 2025, https://owasp.org/www-project-top-10-for-large-language-model-applications/
- MITRE ATLAS (Adversarial Threat Landscape for AI Systems), https://atlas.mitre.org/
- NIST AI 600-1, "Artificial Intelligence Risk Management Framework: Generative AI Profile," https://csrc.nist.gov/publications/detail/ai/600-1/final