Capstone: Build an LLM Vulnerability Tracking Database
Design and implement a structured vulnerability tracking database for cataloging, scoring, and querying LLM-specific security weaknesses across models and deployments.
Overview
Traditional vulnerability management tools like CVE databases and NVD were designed for software defects with clear version boundaries and binary patch states. LLM vulnerabilities break these assumptions. A prompt injection technique might work on GPT-4 but not Claude 3.5, succeed with temperature 0.9 but fail at 0.0, and be mitigated by a guardrail that itself introduces new attack surface. The version concept barely applies when model providers update weights without public changelogs.
This capstone project tasks you with building a purpose-built vulnerability tracking system that accounts for the probabilistic, model-dependent, and rapidly evolving nature of LLM security weaknesses. Your database will catalog vulnerabilities with rich metadata — affected models, attack prerequisites, reproduction rates, guardrail interactions, and temporal validity — enabling security teams to make informed prioritization decisions.
The system draws from established vulnerability management practices documented in NIST SP 800-150 (Guide to Cyber Threat Information Sharing) while extending them with LLM-specific dimensions. By the end of this project, you will have a functional tool that a red team can use to track findings across engagements and a blue team can use to prioritize defensive investments.
The architecture spans three layers: a PostgreSQL database with a schema optimized for LLM vulnerability metadata, a Python API layer that handles ingestion and querying, and a reporting module that generates actionable intelligence from the accumulated data.
Project Requirements
Architecture Overview
The system operates as a three-tier application with clear separation of concerns:
Data Sources → Ingestion Layer → PostgreSQL Database
↓
API Layer (FastAPI)
↓
Query / Reporting Interface
Functional Requirements
-
Vulnerability Schema — A normalized schema that captures LLM-specific fields beyond traditional CVE attributes: affected model families, attack category (mapped to OWASP LLM Top 10 and MITRE ATLAS), reproduction rate, required preconditions, guardrail bypass details, and temporal validity windows.
-
Scoring Engine — An LLM-adapted scoring system that extends CVSS with dimensions for prompt sensitivity, output consistency, guardrail interaction, and cross-model transferability.
-
Ingestion Pipeline — Parsers for multiple input formats: structured JSON from automated scanning tools (Garak, Promptfoo), semi-structured markdown from manual red team reports, and CVE/NVD feeds for traditional software vulnerabilities in LLM tooling.
-
Query Interface — A RESTful API supporting complex queries: "show all prompt injection vulnerabilities affecting GPT-4 that bypass content filtering with reproduction rate above 70%."
-
Reporting Module — Generate trend reports, model comparison matrices, and compliance-mapped summaries (OWASP, MITRE ATLAS, EU AI Act risk categories).
Non-Functional Requirements
- The database must handle at least 100,000 vulnerability records with sub-second query performance.
- All data modifications must be auditable with full change history.
- The API must support role-based access control separating read-only analysts from write-access red teamers.
- Export formats must include JSON, CSV, and PDF for integration with existing GRC tools.
Implementation Guide
Phase 1: Database Schema Design
Begin with the core schema that models LLM vulnerabilities as first-class entities with rich metadata.
# models/schema.py
"""SQLAlchemy models for the LLM vulnerability tracking database."""
from __future__ import annotations
import enum
from datetime import datetime, date
from typing import Optional
from uuid import uuid4
from sqlalchemy import (
Column, String, Text, Float, Integer, Boolean,
DateTime, Date, Enum, ForeignKey, Table, Index,
CheckConstraint, UniqueConstraint, JSON
)
from sqlalchemy.dialects.postgresql import UUID, ARRAY, JSONB
from sqlalchemy.orm import DeclarativeBase, relationship, Mapped, mapped_column
class Base(DeclarativeBase):
pass
class SeverityLevel(enum.Enum):
"""LLM-specific severity levels extending traditional CVSS."""
INFORMATIONAL = "informational"
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class AttackCategory(enum.Enum):
"""Top-level attack categories aligned with OWASP LLM Top 10 2025."""
PROMPT_INJECTION = "LLM01"
SENSITIVE_INFO_DISCLOSURE = "LLM02"
SUPPLY_CHAIN = "LLM03"
DATA_MODEL_POISONING = "LLM04"
IMPROPER_OUTPUT_HANDLING = "LLM05"
EXCESSIVE_AGENCY = "LLM06"
SYSTEM_PROMPT_LEAKAGE = "LLM07"
VECTOR_EMBEDDING_WEAKNESS = "LLM08"
MISINFORMATION = "LLM09"
UNBOUNDED_CONSUMPTION = "LLM10"
class VulnStatus(enum.Enum):
"""Lifecycle status of a vulnerability entry."""
DRAFT = "draft"
CONFIRMED = "confirmed"
MITIGATED = "mitigated"
RESOLVED = "resolved"
WONT_FIX = "wont_fix"
EXPIRED = "expired"
# Association table for many-to-many: vulnerabilities <-> affected models
vuln_model_association = Table(
"vuln_model_association",
Base.metadata,
Column("vulnerability_id", UUID(as_uuid=True), ForeignKey("vulnerabilities.id")),
Column("model_id", UUID(as_uuid=True), ForeignKey("models.id")),
)
# Association table for many-to-many: vulnerabilities <-> mitre techniques
vuln_mitre_association = Table(
"vuln_mitre_association",
Base.metadata,
Column("vulnerability_id", UUID(as_uuid=True), ForeignKey("vulnerabilities.id")),
Column("technique_id", UUID(as_uuid=True), ForeignKey("mitre_techniques.id")),
)
class Vulnerability(Base):
"""Core vulnerability record with LLM-specific metadata."""
__tablename__ = "vulnerabilities"
id: Mapped[str] = mapped_column(
UUID(as_uuid=True), primary_key=True, default=uuid4
)
# Human-readable identifier: LLMVULN-2026-00001
tracking_id: Mapped[str] = mapped_column(
String(30), unique=True, nullable=False
)
title: Mapped[str] = mapped_column(String(500), nullable=False)
description: Mapped[str] = mapped_column(Text, nullable=False)
attack_category: Mapped[AttackCategory] = mapped_column(
Enum(AttackCategory), nullable=False, index=True
)
severity: Mapped[SeverityLevel] = mapped_column(
Enum(SeverityLevel), nullable=False, index=True
)
status: Mapped[VulnStatus] = mapped_column(
Enum(VulnStatus), default=VulnStatus.DRAFT, nullable=False
)
# LLM-specific scoring dimensions (0.0 to 10.0 each)
llm_score_exploitability: Mapped[float] = mapped_column(
Float, CheckConstraint("llm_score_exploitability BETWEEN 0.0 AND 10.0")
)
llm_score_reproducibility: Mapped[float] = mapped_column(
Float, CheckConstraint("llm_score_reproducibility BETWEEN 0.0 AND 10.0")
)
llm_score_impact: Mapped[float] = mapped_column(
Float, CheckConstraint("llm_score_impact BETWEEN 0.0 AND 10.0")
)
llm_score_transferability: Mapped[float] = mapped_column(
Float, CheckConstraint("llm_score_transferability BETWEEN 0.0 AND 10.0")
)
llm_score_guardrail_bypass: Mapped[float] = mapped_column(
Float, CheckConstraint("llm_score_guardrail_bypass BETWEEN 0.0 AND 10.0")
)
composite_score: Mapped[float] = mapped_column(Float, nullable=True)
# Reproduction metadata
reproduction_rate: Mapped[Optional[float]] = mapped_column(
Float, CheckConstraint("reproduction_rate BETWEEN 0.0 AND 1.0"),
nullable=True
)
reproduction_steps: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
sample_payload: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
# Temporal validity
discovered_date: Mapped[date] = mapped_column(Date, nullable=False)
last_verified_date: Mapped[Optional[date]] = mapped_column(Date, nullable=True)
expiry_date: Mapped[Optional[date]] = mapped_column(Date, nullable=True)
# Preconditions and context
preconditions: Mapped[Optional[dict]] = mapped_column(JSONB, nullable=True)
temperature_range: Mapped[Optional[str]] = mapped_column(String(50), nullable=True)
requires_system_prompt: Mapped[bool] = mapped_column(Boolean, default=False)
requires_tool_access: Mapped[bool] = mapped_column(Boolean, default=False)
requires_rag: Mapped[bool] = mapped_column(Boolean, default=False)
# External references
cve_id: Mapped[Optional[str]] = mapped_column(String(20), nullable=True)
external_references: Mapped[Optional[list]] = mapped_column(JSONB, nullable=True)
# Audit fields
created_at: Mapped[datetime] = mapped_column(
DateTime, default=datetime.utcnow
)
updated_at: Mapped[datetime] = mapped_column(
DateTime, default=datetime.utcnow, onupdate=datetime.utcnow
)
created_by: Mapped[str] = mapped_column(String(100), nullable=False)
# Relationships
affected_models = relationship(
"Model", secondary=vuln_model_association, back_populates="vulnerabilities"
)
mitre_techniques = relationship(
"MITRETechnique", secondary=vuln_mitre_association, back_populates="vulnerabilities"
)
audit_log = relationship("AuditEntry", back_populates="vulnerability")
guardrail_interactions = relationship(
"GuardrailInteraction", back_populates="vulnerability"
)
__table_args__ = (
Index("idx_vuln_category_severity", "attack_category", "severity"),
Index("idx_vuln_status_date", "status", "discovered_date"),
Index("idx_vuln_composite_score", "composite_score"),
)
class Model(Base):
"""Represents an LLM model or model family that can be affected by vulnerabilities."""
__tablename__ = "models"
id: Mapped[str] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid4)
provider: Mapped[str] = mapped_column(String(100), nullable=False, index=True)
model_family: Mapped[str] = mapped_column(String(100), nullable=False)
model_name: Mapped[str] = mapped_column(String(200), nullable=False)
model_version: Mapped[Optional[str]] = mapped_column(String(100), nullable=True)
release_date: Mapped[Optional[date]] = mapped_column(Date, nullable=True)
is_open_source: Mapped[bool] = mapped_column(Boolean, default=False)
parameter_count: Mapped[Optional[str]] = mapped_column(String(50), nullable=True)
context_window: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
capabilities: Mapped[Optional[list]] = mapped_column(
ARRAY(String), nullable=True
)
vulnerabilities = relationship(
"Vulnerability", secondary=vuln_model_association, back_populates="affected_models"
)
__table_args__ = (
UniqueConstraint("provider", "model_name", "model_version",
name="uq_model_identity"),
)
class MITRETechnique(Base):
"""MITRE ATLAS technique mapping for vulnerability classification."""
__tablename__ = "mitre_techniques"
id: Mapped[str] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid4)
technique_id: Mapped[str] = mapped_column(String(20), unique=True, nullable=False)
name: Mapped[str] = mapped_column(String(200), nullable=False)
tactic: Mapped[str] = mapped_column(String(100), nullable=False)
description: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
vulnerabilities = relationship(
"Vulnerability", secondary=vuln_mitre_association, back_populates="mitre_techniques"
)
class GuardrailInteraction(Base):
"""Records how a vulnerability interacts with specific guardrail configurations."""
__tablename__ = "guardrail_interactions"
id: Mapped[str] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid4)
vulnerability_id: Mapped[str] = mapped_column(
UUID(as_uuid=True), ForeignKey("vulnerabilities.id"), nullable=False
)
guardrail_type: Mapped[str] = mapped_column(String(100), nullable=False)
guardrail_provider: Mapped[str] = mapped_column(String(100), nullable=False)
bypass_successful: Mapped[bool] = mapped_column(Boolean, nullable=False)
bypass_rate: Mapped[Optional[float]] = mapped_column(Float, nullable=True)
bypass_technique: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
tested_date: Mapped[date] = mapped_column(Date, nullable=False)
vulnerability = relationship("Vulnerability", back_populates="guardrail_interactions")
class AuditEntry(Base):
"""Immutable audit log for all vulnerability record changes."""
__tablename__ = "audit_log"
id: Mapped[str] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid4)
vulnerability_id: Mapped[str] = mapped_column(
UUID(as_uuid=True), ForeignKey("vulnerabilities.id"), nullable=False
)
action: Mapped[str] = mapped_column(String(50), nullable=False)
changed_fields: Mapped[Optional[dict]] = mapped_column(JSONB, nullable=True)
previous_values: Mapped[Optional[dict]] = mapped_column(JSONB, nullable=True)
changed_by: Mapped[str] = mapped_column(String(100), nullable=False)
changed_at: Mapped[datetime] = mapped_column(
DateTime, default=datetime.utcnow, nullable=False
)
vulnerability = relationship("Vulnerability", back_populates="audit_log")
__table_args__ = (
Index("idx_audit_vuln_date", "vulnerability_id", "changed_at"),
)This schema separates concerns into distinct tables while maintaining the relationships needed for complex queries. The Vulnerability table is the central entity with LLM-specific scoring dimensions that go beyond what CVSS provides.
Phase 2: LLM-Adapted Scoring Engine
The scoring engine translates raw vulnerability characteristics into a composite score that security teams can use for prioritization.
# scoring/engine.py
"""LLM-adapted vulnerability scoring engine extending CVSS concepts."""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
class ExploitComplexity(Enum):
"""How complex the attack setup is."""
TRIVIAL = 1.0 # Copy-paste prompt, no special setup
LOW = 0.75 # Requires basic prompt engineering knowledge
MEDIUM = 0.5 # Requires model-specific tuning or multi-step
HIGH = 0.25 # Requires significant research or infrastructure
VERY_HIGH = 0.1 # Requires training pipeline access or novel research
class ImpactScope(Enum):
"""What the vulnerability can affect."""
MODEL_OUTPUT_ONLY = 0.3 # Only affects text generation quality
DATA_EXPOSURE = 0.6 # Can leak system prompts or user data
TOOL_EXECUTION = 0.8 # Can trigger unauthorized tool/API calls
SYSTEM_COMPROMISE = 1.0 # Can lead to RCE or infrastructure access
class TransferabilityLevel(Enum):
"""Whether the vulnerability works across models."""
SINGLE_MODEL = 0.2 # Only works on one specific model version
MODEL_FAMILY = 0.5 # Works across versions of same model family
CROSS_PROVIDER = 0.8 # Works on models from different providers
UNIVERSAL = 1.0 # Works on virtually all LLMs
@dataclass
class LLMVulnScore:
"""Complete scoring breakdown for an LLM vulnerability."""
# Input dimensions (0.0 to 10.0 each)
exploitability: float
reproducibility: float
impact: float
transferability: float
guardrail_bypass: float
# Computed fields
composite_score: float = 0.0
risk_rating: str = ""
priority_rank: int = 0
# Weight configuration
weights: dict = field(default_factory=lambda: {
"exploitability": 0.25,
"reproducibility": 0.15,
"impact": 0.30,
"transferability": 0.15,
"guardrail_bypass": 0.15,
})
def __post_init__(self):
self.composite_score = self.calculate_composite()
self.risk_rating = self.determine_risk_rating()
def calculate_composite(self) -> float:
"""Calculate weighted composite score."""
raw = (
self.exploitability * self.weights["exploitability"]
+ self.reproducibility * self.weights["reproducibility"]
+ self.impact * self.weights["impact"]
+ self.transferability * self.weights["transferability"]
+ self.guardrail_bypass * self.weights["guardrail_bypass"]
)
return round(raw, 2)
def determine_risk_rating(self) -> str:
"""Map composite score to risk rating."""
if self.composite_score >= 9.0:
return "CRITICAL"
elif self.composite_score >= 7.0:
return "HIGH"
elif self.composite_score >= 4.0:
return "MEDIUM"
elif self.composite_score >= 2.0:
return "LOW"
return "INFORMATIONAL"
def score_vulnerability(
exploit_complexity: ExploitComplexity,
reproduction_rate: float,
impact_scope: ImpactScope,
transferability: TransferabilityLevel,
guardrail_bypass_rate: float,
temporal_decay_days: int = 0,
) -> LLMVulnScore:
"""
Score a vulnerability using LLM-specific dimensions.
Args:
exploit_complexity: How difficult the attack is to execute.
reproduction_rate: Fraction of attempts that succeed (0.0-1.0).
impact_scope: What the vulnerability can affect.
transferability: Whether it works across models.
guardrail_bypass_rate: Rate at which it defeats guardrails (0.0-1.0).
temporal_decay_days: Days since last verification (reduces confidence).
Returns:
LLMVulnScore with composite score and risk rating.
"""
# Convert enum values to 0-10 scale
exploitability = exploit_complexity.value * 10.0
reproducibility = reproduction_rate * 10.0
impact = impact_scope.value * 10.0
transferability_score = transferability.value * 10.0
guardrail_score = guardrail_bypass_rate * 10.0
# Apply temporal decay: scores degrade if not recently verified
if temporal_decay_days > 0:
decay_factor = max(0.5, 1.0 - (temporal_decay_days / 365.0) * 0.5)
reproducibility *= decay_factor
guardrail_score *= decay_factor
return LLMVulnScore(
exploitability=round(exploitability, 2),
reproducibility=round(reproducibility, 2),
impact=round(impact, 2),
transferability=round(transferability_score, 2),
guardrail_bypass=round(guardrail_score, 2),
)
# Example usage demonstrating scoring for a real vulnerability pattern
if __name__ == "__main__":
# Score a cross-model prompt injection that bypasses content filters
result = score_vulnerability(
exploit_complexity=ExploitComplexity.LOW,
reproduction_rate=0.85,
impact_scope=ImpactScope.DATA_EXPOSURE,
transferability=TransferabilityLevel.CROSS_PROVIDER,
guardrail_bypass_rate=0.60,
temporal_decay_days=30,
)
print(f"Composite Score: {result.composite_score}/10.0")
print(f"Risk Rating: {result.risk_rating}")
print(f" Exploitability: {result.exploitability}")
print(f" Reproducibility: {result.reproducibility}")
print(f" Impact: {result.impact}")
print(f" Transferability: {result.transferability}")
print(f" Guardrail Bypass: {result.guardrail_bypass}")Phase 3: Ingestion Pipeline
The ingestion layer handles data from multiple sources and normalizes it into the database schema.
# ingestion/pipeline.py
"""Ingestion pipeline for vulnerability data from multiple sources."""
from __future__ import annotations
import json
import re
import hashlib
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import date, datetime
from pathlib import Path
from typing import Optional
from sqlalchemy.orm import Session
@dataclass
class RawVulnerability:
"""Normalized intermediate representation of a vulnerability."""
title: str
description: str
attack_category: str
affected_models: list[str]
reproduction_rate: Optional[float]
reproduction_steps: Optional[str]
sample_payload: Optional[str]
discovered_date: date
source: str
source_id: str
external_references: list[dict]
preconditions: Optional[dict] = None
severity_hint: Optional[str] = None
class IngestionSource(ABC):
"""Abstract base for vulnerability data sources."""
@abstractmethod
def parse(self, data: str | dict) -> list[RawVulnerability]:
"""Parse raw data into normalized vulnerability records."""
...
@abstractmethod
def source_name(self) -> str:
"""Identifier for this data source."""
...
class GarakResultParser(IngestionSource):
"""Parse vulnerability findings from Garak scan output."""
def source_name(self) -> str:
return "garak"
def parse(self, data: str | dict) -> list[RawVulnerability]:
if isinstance(data, str):
data = json.loads(data)
vulnerabilities = []
for finding in data.get("findings", []):
# Map Garak probe categories to OWASP LLM categories
category = self._map_garak_category(finding.get("probe_type", ""))
reproduction_rate = finding.get("success_rate", 0.0)
if reproduction_rate < 0.01:
continue # Skip findings with near-zero success rate
vuln = RawVulnerability(
title=f"Garak: {finding.get('probe_name', 'Unknown probe')}",
description=finding.get("description", ""),
attack_category=category,
affected_models=[finding.get("model", "unknown")],
reproduction_rate=reproduction_rate,
reproduction_steps=finding.get("sample_interaction", ""),
sample_payload=finding.get("successful_payload", ""),
discovered_date=date.today(),
source="garak",
source_id=self._generate_source_id(finding),
external_references=[{
"type": "tool_output",
"url": "https://github.com/NVIDIA/garak",
"tool_version": data.get("garak_version", "unknown"),
}],
)
vulnerabilities.append(vuln)
return vulnerabilities
def _map_garak_category(self, probe_type: str) -> str:
"""Map Garak probe types to OWASP LLM Top 10 categories."""
mapping = {
"prompt_injection": "LLM01",
"data_leak": "LLM02",
"hallucination": "LLM09",
"toxicity": "LLM05",
"encoding": "LLM01",
"dan": "LLM01",
}
for key, value in mapping.items():
if key in probe_type.lower():
return value
return "LLM01" # Default to prompt injection
def _generate_source_id(self, finding: dict) -> str:
"""Generate a deterministic ID for deduplication."""
raw = f"garak:{finding.get('probe_name', '')}:{finding.get('model', '')}"
return hashlib.sha256(raw.encode()).hexdigest()[:16]
class PromptfooResultParser(IngestionSource):
"""Parse vulnerability findings from Promptfoo evaluation output."""
def source_name(self) -> str:
return "promptfoo"
def parse(self, data: str | dict) -> list[RawVulnerability]:
if isinstance(data, str):
data = json.loads(data)
vulnerabilities = []
results = data.get("results", {}).get("results", [])
for result in results:
if result.get("success", True):
continue # Only capture failures (successful attacks)
provider = result.get("provider", {})
model = provider.get("id", "unknown") if isinstance(provider, dict) else str(provider)
vuln = RawVulnerability(
title=f"Promptfoo: {result.get('testCase', {}).get('description', 'Failed assertion')}",
description=result.get("error", result.get("output", "")),
attack_category=self._infer_category(result),
affected_models=[model],
reproduction_rate=None, # Promptfoo doesn't provide this directly
reproduction_steps=json.dumps(result.get("testCase", {}), indent=2),
sample_payload=result.get("prompt", ""),
discovered_date=date.today(),
source="promptfoo",
source_id=self._generate_source_id(result),
external_references=[{
"type": "tool_output",
"url": "https://github.com/promptfoo/promptfoo",
}],
)
vulnerabilities.append(vuln)
return vulnerabilities
def _infer_category(self, result: dict) -> str:
"""Infer OWASP LLM category from Promptfoo test metadata."""
test_case = result.get("testCase", {})
tags = test_case.get("metadata", {}).get("tags", [])
tag_mapping = {
"injection": "LLM01", "prompt-injection": "LLM01",
"pii": "LLM02", "data-leak": "LLM02",
"harmful": "LLM05", "toxicity": "LLM05",
"hallucination": "LLM09",
}
for tag in tags:
if tag.lower() in tag_mapping:
return tag_mapping[tag.lower()]
return "LLM01"
def _generate_source_id(self, result: dict) -> str:
raw = f"promptfoo:{result.get('testCase', {}).get('description', '')}:{result.get('provider', '')}"
return hashlib.sha256(raw.encode()).hexdigest()[:16]
class ManualReportParser(IngestionSource):
"""Parse semi-structured markdown red team reports."""
def source_name(self) -> str:
return "manual_report"
def parse(self, data: str | dict) -> list[RawVulnerability]:
if isinstance(data, dict):
data = json.dumps(data)
vulnerabilities = []
# Split markdown by H2 headers to find individual findings
sections = re.split(r'^## ', data, flags=re.MULTILINE)
for section in sections[1:]: # Skip preamble
lines = section.strip().split('\n')
title = lines[0].strip()
body = '\n'.join(lines[1:]).strip()
# Extract structured fields from body using patterns
severity = self._extract_field(body, r'\*\*Severity\*\*:\s*(\w+)')
category = self._extract_field(body, r'\*\*Category\*\*:\s*(LLM\d{2})')
models_raw = self._extract_field(body, r'\*\*Affected Models\*\*:\s*(.+)')
repro_rate = self._extract_field(body, r'\*\*Reproduction Rate\*\*:\s*([\d.]+)')
affected_models = []
if models_raw:
affected_models = [m.strip() for m in models_raw.split(',')]
vuln = RawVulnerability(
title=title,
description=body,
attack_category=category or "LLM01",
affected_models=affected_models,
reproduction_rate=float(repro_rate) if repro_rate else None,
reproduction_steps=self._extract_section(body, "Reproduction Steps"),
sample_payload=self._extract_section(body, "Payload"),
discovered_date=date.today(),
source="manual_report",
source_id=hashlib.sha256(title.encode()).hexdigest()[:16],
external_references=[],
severity_hint=severity,
)
vulnerabilities.append(vuln)
return vulnerabilities
def _extract_field(self, text: str, pattern: str) -> Optional[str]:
match = re.search(pattern, text)
return match.group(1) if match else None
def _extract_section(self, text: str, header: str) -> Optional[str]:
pattern = rf'### {header}\s*\n(.*?)(?=\n### |\Z)'
match = re.search(pattern, text, re.DOTALL)
return match.group(1).strip() if match else None
class IngestionPipeline:
"""Orchestrates vulnerability ingestion from multiple sources."""
def __init__(self, session: Session):
self.session = session
self.parsers: dict[str, IngestionSource] = {}
self._register_default_parsers()
def _register_default_parsers(self):
self.register_parser(GarakResultParser())
self.register_parser(PromptfooResultParser())
self.register_parser(ManualReportParser())
def register_parser(self, parser: IngestionSource):
self.parsers[parser.source_name()] = parser
def ingest(self, source_name: str, data: str | dict) -> list[str]:
"""
Ingest data from a named source and return tracking IDs of created records.
Args:
source_name: Registered parser name (garak, promptfoo, manual_report).
data: Raw data from the source.
Returns:
List of tracking IDs for newly created vulnerability records.
"""
parser = self.parsers.get(source_name)
if parser is None:
raise ValueError(f"No parser registered for source: {source_name}")
raw_vulns = parser.parse(data)
tracking_ids = []
for raw_vuln in raw_vulns:
# Deduplication check
existing = self._find_duplicate(raw_vuln)
if existing:
self._update_existing(existing, raw_vuln)
tracking_ids.append(existing.tracking_id)
continue
tracking_id = self._create_record(raw_vuln)
tracking_ids.append(tracking_id)
self.session.commit()
return tracking_ids
def _find_duplicate(self, raw: RawVulnerability):
"""Check for existing vulnerability with same source ID."""
# Implementation queries database for matching source_id
# in external_references JSONB field
return None # Simplified for illustration
def _update_existing(self, existing, raw: RawVulnerability):
"""Merge new data into existing record."""
if raw.reproduction_rate is not None:
existing.reproduction_rate = raw.reproduction_rate
existing.last_verified_date = date.today()
def _create_record(self, raw: RawVulnerability) -> str:
"""Create a new vulnerability record from raw data."""
# Generate next tracking ID
tracking_id = self._next_tracking_id()
# Record creation logic using the ORM models from Phase 1
return tracking_id
def _next_tracking_id(self) -> str:
"""Generate the next sequential tracking ID."""
year = date.today().year
# Query max existing ID for this year and increment
return f"LLMVULN-{year}-00001" # SimplifiedPhase 4: Query and Reporting API
Build the FastAPI layer that exposes the database for querying and reporting.
# api/routes.py
"""FastAPI routes for vulnerability querying and reporting."""
from __future__ import annotations
from datetime import date, datetime
from typing import Optional
from uuid import UUID
from fastapi import FastAPI, Depends, Query, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, Field
from sqlalchemy import func, and_, or_
from sqlalchemy.orm import Session
app = FastAPI(
title="LLM Vulnerability Tracking API",
version="1.0.0",
description="Purpose-built vulnerability database for LLM security findings",
)
security = HTTPBearer()
# --- Pydantic models for request/response ---
class VulnerabilityResponse(BaseModel):
tracking_id: str
title: str
description: str
attack_category: str
severity: str
status: str
composite_score: Optional[float]
reproduction_rate: Optional[float]
affected_models: list[str]
discovered_date: date
last_verified_date: Optional[date]
risk_rating: str
class Config:
from_attributes = True
class VulnerabilityCreate(BaseModel):
title: str = Field(..., min_length=10, max_length=500)
description: str = Field(..., min_length=50)
attack_category: str = Field(..., pattern=r"^LLM(0[1-9]|10)$")
affected_model_ids: list[UUID]
reproduction_rate: Optional[float] = Field(None, ge=0.0, le=1.0)
reproduction_steps: Optional[str] = None
sample_payload: Optional[str] = None
preconditions: Optional[dict] = None
severity_hint: Optional[str] = None
class TrendReport(BaseModel):
period_start: date
period_end: date
total_vulnerabilities: int
by_category: dict[str, int]
by_severity: dict[str, int]
by_model: dict[str, int]
avg_composite_score: float
top_attack_vectors: list[dict]
class ModelComparisonReport(BaseModel):
models: list[dict]
vulnerability_counts: dict[str, int]
avg_scores: dict[str, float]
shared_vulnerabilities: int
unique_to_each: dict[str, int]
# --- API Endpoints ---
@app.get("/api/v1/vulnerabilities", response_model=list[VulnerabilityResponse])
async def list_vulnerabilities(
category: Optional[str] = Query(None, pattern=r"^LLM(0[1-9]|10)$"),
severity: Optional[str] = Query(None),
model_name: Optional[str] = Query(None),
min_score: Optional[float] = Query(None, ge=0.0, le=10.0),
min_reproduction_rate: Optional[float] = Query(None, ge=0.0, le=1.0),
status: Optional[str] = Query(None),
discovered_after: Optional[date] = Query(None),
discovered_before: Optional[date] = Query(None),
page: int = Query(1, ge=1),
page_size: int = Query(50, ge=1, le=200),
sort_by: str = Query("composite_score"),
sort_order: str = Query("desc", pattern=r"^(asc|desc)$"),
db: Session = Depends(lambda: None), # Replaced with real dependency
):
"""
Query vulnerabilities with rich filtering.
Supports filtering by OWASP category, severity, affected model,
minimum composite score, reproduction rate, status, and date range.
Results are paginated and sortable.
"""
# Build dynamic query
query = db.query(Vulnerability)
if category:
query = query.filter(Vulnerability.attack_category == category)
if severity:
query = query.filter(Vulnerability.severity == severity)
if model_name:
query = query.join(Vulnerability.affected_models).filter(
Model.model_name.ilike(f"%{model_name}%")
)
if min_score is not None:
query = query.filter(Vulnerability.composite_score >= min_score)
if min_reproduction_rate is not None:
query = query.filter(Vulnerability.reproduction_rate >= min_reproduction_rate)
if status:
query = query.filter(Vulnerability.status == status)
if discovered_after:
query = query.filter(Vulnerability.discovered_date >= discovered_after)
if discovered_before:
query = query.filter(Vulnerability.discovered_date <= discovered_before)
# Sorting
sort_column = getattr(Vulnerability, sort_by, Vulnerability.composite_score)
if sort_order == "desc":
query = query.order_by(sort_column.desc())
else:
query = query.order_by(sort_column.asc())
# Pagination
offset = (page - 1) * page_size
results = query.offset(offset).limit(page_size).all()
return results
@app.get("/api/v1/reports/trends", response_model=TrendReport)
async def generate_trend_report(
start_date: date = Query(...),
end_date: date = Query(...),
db: Session = Depends(lambda: None),
):
"""
Generate a trend report for vulnerabilities discovered in a date range.
Returns aggregate statistics including category distribution, severity
breakdown, model coverage, and top attack vectors.
"""
vulns = db.query(Vulnerability).filter(
and_(
Vulnerability.discovered_date >= start_date,
Vulnerability.discovered_date <= end_date,
)
).all()
by_category = {}
by_severity = {}
by_model = {}
scores = []
for v in vulns:
cat = v.attack_category.value
by_category[cat] = by_category.get(cat, 0) + 1
sev = v.severity.value
by_severity[sev] = by_severity.get(sev, 0) + 1
if v.composite_score is not None:
scores.append(v.composite_score)
for model in v.affected_models:
by_model[model.model_name] = by_model.get(model.model_name, 0) + 1
return TrendReport(
period_start=start_date,
period_end=end_date,
total_vulnerabilities=len(vulns),
by_category=by_category,
by_severity=by_severity,
by_model=by_model,
avg_composite_score=sum(scores) / len(scores) if scores else 0.0,
top_attack_vectors=sorted(
[{"category": k, "count": v} for k, v in by_category.items()],
key=lambda x: x["count"],
reverse=True,
)[:5],
)
@app.get("/api/v1/reports/model-comparison", response_model=ModelComparisonReport)
async def compare_models(
model_names: list[str] = Query(...),
db: Session = Depends(lambda: None),
):
"""
Compare vulnerability profiles across specified models.
Returns side-by-side comparison of vulnerability counts, average scores,
shared vulnerabilities, and unique findings for each model.
"""
model_data = {}
all_vuln_ids = {}
for model_name in model_names:
vulns = db.query(Vulnerability).join(
Vulnerability.affected_models
).filter(
Model.model_name.ilike(f"%{model_name}%")
).all()
vuln_ids = {str(v.id) for v in vulns}
all_vuln_ids[model_name] = vuln_ids
scores = [v.composite_score for v in vulns if v.composite_score]
model_data[model_name] = {
"count": len(vulns),
"avg_score": sum(scores) / len(scores) if scores else 0.0,
}
# Calculate shared and unique
if len(model_names) >= 2:
shared = set.intersection(*all_vuln_ids.values()) if all_vuln_ids else set()
else:
shared = set()
unique = {
name: len(ids - set.union(*(
v for k, v in all_vuln_ids.items() if k != name
)) if len(all_vuln_ids) > 1 else ids)
for name, ids in all_vuln_ids.items()
}
return ModelComparisonReport(
models=[{"name": n, **d} for n, d in model_data.items()],
vulnerability_counts={n: d["count"] for n, d in model_data.items()},
avg_scores={n: d["avg_score"] for n, d in model_data.items()},
shared_vulnerabilities=len(shared),
unique_to_each=unique,
)Phase 5: Compliance Mapping and Export
Map vulnerability data to established frameworks for compliance reporting.
# reporting/compliance.py
"""Map vulnerability data to compliance frameworks for reporting."""
from __future__ import annotations
import json
import csv
import io
from dataclasses import dataclass
from typing import Optional
# OWASP LLM Top 10 2025 mapping
OWASP_LLM_TOP_10 = {
"LLM01": {
"name": "Prompt Injection",
"description": "Manipulating LLM through crafted inputs that override instructions",
"eu_ai_act_risk": "high",
"nist_ai_rmf_function": "GOVERN, MAP, MEASURE",
},
"LLM02": {
"name": "Sensitive Information Disclosure",
"description": "LLM revealing confidential data in responses",
"eu_ai_act_risk": "high",
"nist_ai_rmf_function": "MAP, MEASURE, MANAGE",
},
"LLM03": {
"name": "Supply Chain Vulnerabilities",
"description": "Compromised components in the LLM supply chain",
"eu_ai_act_risk": "high",
"nist_ai_rmf_function": "GOVERN, MAP",
},
"LLM04": {
"name": "Data and Model Poisoning",
"description": "Manipulating training data or model weights",
"eu_ai_act_risk": "unacceptable",
"nist_ai_rmf_function": "MAP, MEASURE, MANAGE",
},
"LLM05": {
"name": "Improper Output Handling",
"description": "Insufficient validation of LLM outputs before use",
"eu_ai_act_risk": "high",
"nist_ai_rmf_function": "MEASURE, MANAGE",
},
"LLM06": {
"name": "Excessive Agency",
"description": "LLM granted too many capabilities or permissions",
"eu_ai_act_risk": "high",
"nist_ai_rmf_function": "GOVERN, MAP, MANAGE",
},
"LLM07": {
"name": "System Prompt Leakage",
"description": "Extraction of system-level prompts and instructions",
"eu_ai_act_risk": "limited",
"nist_ai_rmf_function": "MEASURE, MANAGE",
},
"LLM08": {
"name": "Vector and Embedding Weaknesses",
"description": "Exploiting vulnerabilities in RAG vector databases",
"eu_ai_act_risk": "high",
"nist_ai_rmf_function": "MAP, MEASURE",
},
"LLM09": {
"name": "Misinformation",
"description": "LLM generating false or misleading information",
"eu_ai_act_risk": "high",
"nist_ai_rmf_function": "MEASURE, MANAGE",
},
"LLM10": {
"name": "Unbounded Consumption",
"description": "Resource exhaustion through excessive LLM usage",
"eu_ai_act_risk": "limited",
"nist_ai_rmf_function": "GOVERN, MANAGE",
},
}
@dataclass
class ComplianceReport:
"""Generated compliance report for a set of vulnerabilities."""
framework: str
generated_date: str
total_findings: int
findings_by_control: dict
risk_summary: dict
recommendations: list[str]
def to_json(self) -> str:
return json.dumps({
"framework": self.framework,
"generated_date": self.generated_date,
"total_findings": self.total_findings,
"findings_by_control": self.findings_by_control,
"risk_summary": self.risk_summary,
"recommendations": self.recommendations,
}, indent=2)
def to_csv(self) -> str:
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(["Control", "Finding Count", "Risk Level", "Framework Mapping"])
for control, data in self.findings_by_control.items():
writer.writerow([
control,
data.get("count", 0),
data.get("risk_level", "unknown"),
data.get("mapping", ""),
])
return output.getvalue()
def generate_owasp_compliance_report(
vulnerabilities: list[dict],
) -> ComplianceReport:
"""
Generate a compliance report mapped to OWASP LLM Top 10 2025.
Args:
vulnerabilities: List of vulnerability dicts with attack_category field.
Returns:
ComplianceReport with findings mapped to OWASP controls.
"""
findings_by_control = {}
for category_id, meta in OWASP_LLM_TOP_10.items():
matching = [v for v in vulnerabilities if v.get("attack_category") == category_id]
findings_by_control[f"{category_id}: {meta['name']}"] = {
"count": len(matching),
"risk_level": meta["eu_ai_act_risk"],
"mapping": meta["nist_ai_rmf_function"],
"avg_score": (
sum(v.get("composite_score", 0) for v in matching) / len(matching)
if matching else 0
),
}
# Generate risk summary
high_risk = sum(
1 for v in vulnerabilities
if v.get("composite_score", 0) >= 7.0
)
critical_categories = [
k for k, v in findings_by_control.items()
if v["count"] > 0 and v["risk_level"] in ("high", "unacceptable")
]
recommendations = []
if high_risk > 0:
recommendations.append(
f"Address {high_risk} high/critical findings as priority. "
"These represent active risk to production systems."
)
for cat in critical_categories[:3]:
recommendations.append(
f"Review and remediate findings in {cat} — "
f"EU AI Act classification: {findings_by_control[cat]['risk_level']}"
)
from datetime import date as date_type
return ComplianceReport(
framework="OWASP LLM Top 10 2025",
generated_date=str(date_type.today()),
total_findings=len(vulnerabilities),
findings_by_control=findings_by_control,
risk_summary={
"critical_and_high": high_risk,
"total": len(vulnerabilities),
"risk_ratio": high_risk / len(vulnerabilities) if vulnerabilities else 0,
},
recommendations=recommendations,
)Testing Your Implementation
Validate each phase independently before integrating:
-
Schema validation — Run Alembic migrations against a test PostgreSQL instance. Verify all constraints, indexes, and relationships work correctly.
-
Scoring accuracy — Write unit tests that verify the scoring engine produces expected results for known vulnerability profiles. Test edge cases: zero reproduction rate, maximum scores, temporal decay at boundary values.
-
Ingestion robustness — Feed malformed data through each parser and verify graceful error handling. Test deduplication by ingesting the same Garak report twice.
-
API correctness — Use
pytestwithhttpx.AsyncClientto test each endpoint with valid and invalid query parameters. Verify pagination boundaries and sort order. -
Compliance mapping — Verify that every OWASP LLM Top 10 category is represented in the output and that EU AI Act risk classifications are accurate.
# tests/test_scoring.py
"""Unit tests for the LLM vulnerability scoring engine."""
import pytest
from scoring.engine import (
score_vulnerability,
ExploitComplexity,
ImpactScope,
TransferabilityLevel,
LLMVulnScore,
)
def test_critical_vulnerability_scores_high():
"""A trivially exploitable, high-impact, universal vulnerability should score critical."""
result = score_vulnerability(
exploit_complexity=ExploitComplexity.TRIVIAL,
reproduction_rate=0.95,
impact_scope=ImpactScope.SYSTEM_COMPROMISE,
transferability=TransferabilityLevel.UNIVERSAL,
guardrail_bypass_rate=0.90,
)
assert result.composite_score >= 9.0
assert result.risk_rating == "CRITICAL"
def test_low_impact_scores_low():
"""A hard-to-exploit, model-specific, output-only vulnerability should score low."""
result = score_vulnerability(
exploit_complexity=ExploitComplexity.HIGH,
reproduction_rate=0.10,
impact_scope=ImpactScope.MODEL_OUTPUT_ONLY,
transferability=TransferabilityLevel.SINGLE_MODEL,
guardrail_bypass_rate=0.05,
)
assert result.composite_score < 4.0
assert result.risk_rating in ("LOW", "INFORMATIONAL")
def test_temporal_decay_reduces_score():
"""Scores should decrease when a vulnerability hasn't been recently verified."""
fresh = score_vulnerability(
exploit_complexity=ExploitComplexity.LOW,
reproduction_rate=0.80,
impact_scope=ImpactScope.DATA_EXPOSURE,
transferability=TransferabilityLevel.CROSS_PROVIDER,
guardrail_bypass_rate=0.70,
temporal_decay_days=0,
)
stale = score_vulnerability(
exploit_complexity=ExploitComplexity.LOW,
reproduction_rate=0.80,
impact_scope=ImpactScope.DATA_EXPOSURE,
transferability=TransferabilityLevel.CROSS_PROVIDER,
guardrail_bypass_rate=0.70,
temporal_decay_days=180,
)
assert stale.composite_score < fresh.composite_score
def test_score_bounds():
"""All score dimensions should stay within 0.0-10.0 range."""
result = score_vulnerability(
exploit_complexity=ExploitComplexity.TRIVIAL,
reproduction_rate=1.0,
impact_scope=ImpactScope.SYSTEM_COMPROMISE,
transferability=TransferabilityLevel.UNIVERSAL,
guardrail_bypass_rate=1.0,
)
for field_name in ["exploitability", "reproducibility", "impact",
"transferability", "guardrail_bypass"]:
value = getattr(result, field_name)
assert 0.0 <= value <= 10.0, f"{field_name} out of bounds: {value}"Extending the System
Once the core database is operational, consider these extensions:
-
Automated re-verification: Schedule periodic re-testing of stored payloads against current model versions to detect when vulnerabilities are patched or when new models become affected.
-
Threat intelligence correlation: Integrate with threat intelligence feeds to enrich vulnerability records with information about active exploitation in the wild.
-
SBOM integration: Link vulnerability records to AI system Software Bills of Materials (SBOMs) to enable rapid impact assessment when new vulnerabilities are discovered.
-
Slack/Teams alerting: Push notifications when high-severity vulnerabilities are ingested or when re-verification detects changes in reproduction rates.
References
- NIST Special Publication 800-150, "Guide to Cyber Threat Information Sharing," https://csrc.nist.gov/publications/detail/sp/800-150/final
- OWASP Top 10 for Large Language Model Applications 2025, https://owasp.org/www-project-top-10-for-large-language-model-applications/
- MITRE ATLAS (Adversarial Threat Landscape for AI Systems), https://atlas.mitre.org/
- NIST AI 600-1, "Artificial Intelligence Risk Management Framework: Generative AI Profile," https://csrc.nist.gov/publications/detail/ai/600-1/final