Structured Output Validation
Step-by-step walkthrough for validating structured LLM outputs against schemas, covering JSON schema validation, type coercion, constraint enforcement, and handling malformed model outputs gracefully.
When LLMs produce structured outputs -- JSON for API responses, function call arguments, or data extraction results -- schema validation ensures the output conforms to expected types, ranges, and constraints. Without validation, a model might return a SQL query where you expected a search string, or inject script tags into a field destined for HTML rendering. This walkthrough builds a comprehensive validation layer for structured LLM outputs.
Step 1: Define Output Schemas with Security Constraints
# structured_validation/schemas.py
"""
Output schemas with security-specific constraints.
"""
from pydantic import BaseModel, Field, field_validator
from typing import Optional
import re
class ProductSearchOutput(BaseModel):
    """Schema for model-generated product-search arguments.

    Enforces length/range limits, an allow-listed sort order, a
    lowercase-identifier category, and rejects query strings that carry
    common injection payloads.
    """

    query: str = Field(..., max_length=200, description="Search query text")
    category: Optional[str] = Field(None, pattern=r"^[a-z_]+$")
    price_min: Optional[float] = Field(None, ge=0, le=100000)
    price_max: Optional[float] = Field(None, ge=0, le=100000)
    # NOTE(review): price_min <= price_max is not cross-validated here.
    sort_by: str = Field("relevance", pattern=r"^(relevance|price_asc|price_desc|rating)$")

    @field_validator("query")
    @classmethod
    def sanitize_query(cls, v):
        """Reject queries containing XSS, SQL, or template-injection markers.

        Returns the query stripped of surrounding whitespace; raises
        ValueError naming the pattern that matched otherwise.
        """
        # Strip potential injection payloads from search queries
        dangerous_patterns = [
            r"<script", r"javascript:", r"on\w+=",
            r";\s*(DROP|DELETE|UPDATE|INSERT)\s",
            r"\$\{.*\}", r"\{\{.*\}\}",
        ]
        for pattern in dangerous_patterns:
            if re.search(pattern, v, re.IGNORECASE):
                # Include the offending pattern so failures are debuggable
                # (the original f-string had no placeholder).
                raise ValueError(f"Query contains dangerous pattern: {pattern}")
        return v.strip()
class CustomerResponseOutput(BaseModel):
    """Schema for model-generated customer-support replies.

    Caps message length and action count, screens messages for
    system-prompt leakage, and allow-lists suggested action identifiers.
    """

    message: str = Field(..., max_length=2000)
    # max_length on a list field bounds the number of items (pydantic v2).
    suggested_actions: list[str] = Field(default_factory=list, max_length=5)
    escalate: bool = False
    confidence: float = Field(0.0, ge=0.0, le=1.0)

    @field_validator("message")
    @classmethod
    def no_system_leakage(cls, v):
        # Reject replies that appear to quote system instructions.
        # NOTE(review): plain substring matching is easily evaded by
        # paraphrase; this only catches the most literal leaks.
        leakage_indicators = [
            "system prompt", "my instructions are",
            "I was configured to", "my guidelines say",
        ]
        for indicator in leakage_indicators:
            if indicator.lower() in v.lower():
                raise ValueError("Response may leak system instructions")
        return v

    @field_validator("suggested_actions")
    @classmethod
    def safe_actions(cls, v):
        # Every action must start with one of the allow-listed prefixes.
        allowed_prefixes = [
            "contact_support", "view_order", "track_shipment",
            "request_refund", "update_account", "browse_products",
        ]
        for action in v:
            if not any(action.startswith(p) for p in allowed_prefixes):
                raise ValueError(f"Unknown action: {action}")
        return v

Step 2: Build the Validation Engine
# structured_validation/engine.py
"""
Validation engine for structured LLM outputs.
"""
import json
from dataclasses import dataclass, field
from typing import Type, Optional
from pydantic import BaseModel, ValidationError
@dataclass
class ValidationResult:
    """Outcome of validating one raw model output against a schema."""

    valid: bool
    parsed_output: Optional[dict] = None  # schema-conformant dict when valid
    errors: list[str] = field(default_factory=list)  # human-readable failure reasons
    raw_output: str = ""  # unmodified model output, kept for logging/repair
    coerced: bool = False  # True when type coercion changed the payload
class OutputValidator:
    """Validates raw LLM output text against a pydantic schema."""

    def __init__(self, schema: Type[BaseModel]):
        # The schema *class* (not an instance); used for model_validate
        # and model_fields introspection.
        self.schema = schema

    def validate(self, raw_output: str) -> ValidationResult:
        """Validate raw model output against the schema."""
        # Step 1: Parse JSON
        parsed = self._parse_json(raw_output)
        if parsed is None:
            return ValidationResult(
                valid=False,
                errors=["Failed to parse JSON from model output"],
                raw_output=raw_output,
            )
        # Step 2: Validate against schema
        # NOTE(review): json.loads may return a non-dict (list/str/number);
        # model_validate rejects those, but _parse_json's Optional[dict]
        # annotation is not actually enforced.
        try:
            validated = self.schema.model_validate(parsed)
            return ValidationResult(
                valid=True,
                parsed_output=validated.model_dump(),
                raw_output=raw_output,
            )
        except ValidationError as e:
            # Flatten pydantic's error list into "(field,): message" strings.
            errors = [
                f"{err['loc']}: {err['msg']}" for err in e.errors()
            ]
            return ValidationResult(
                valid=False, errors=errors, raw_output=raw_output
            )

    def validate_with_coercion(self, raw_output: str) -> ValidationResult:
        """Try to fix common issues before validation."""
        parsed = self._parse_json(raw_output)
        if parsed is None:
            # Try to extract JSON from markdown code blocks
            parsed = self._extract_json_from_markdown(raw_output)
        if parsed is None:
            return ValidationResult(
                valid=False,
                errors=["Could not extract valid JSON"],
                raw_output=raw_output,
            )
        # Coerce common type issues
        coerced = self._coerce_types(parsed)
        try:
            validated = self.schema.model_validate(coerced)
            return ValidationResult(
                valid=True,
                parsed_output=validated.model_dump(),
                raw_output=raw_output,
                # Flag whether coercion actually changed anything.
                coerced=coerced != parsed,
            )
        except ValidationError as e:
            errors = [f"{err['loc']}: {err['msg']}" for err in e.errors()]
            return ValidationResult(
                valid=False, errors=errors, raw_output=raw_output
            )

    def _parse_json(self, text: str) -> Optional[dict]:
        # Returns None (rather than raising) on malformed JSON.
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            return None

    def _extract_json_from_markdown(self, text: str) -> Optional[dict]:
        # Local import mirrors the original; re is only needed here.
        import re
        # Prefer a fenced ```json block; fall back to the first {...} span.
        match = re.search(r"```(?:json)?\s*\n?(.*?)\n?\s*```", text, re.DOTALL)
        if match:
            return self._parse_json(match.group(1))
        # Try finding JSON object directly
        # NOTE(review): greedy "{.*}" grabs from the first "{" to the
        # last "}", which can swallow surrounding text.
        match = re.search(r"\{.*\}", text, re.DOTALL)
        if match:
            return self._parse_json(match.group(0))
        return None

    def _coerce_types(self, data: dict) -> dict:
        """Fix common type issues from model output."""
        # Work on a shallow copy; the caller compares it with the original.
        coerced = dict(data)
        schema_fields = self.schema.model_fields
        for field_name, field_info in schema_fields.items():
            if field_name not in coerced:
                continue
            value = coerced[field_name]
            # String numbers to actual numbers
            # NOTE(review): identity membership against (float,
            # Optional[float]) misses wider unions like float | int | None.
            annotation = field_info.annotation
            if annotation in (float, Optional[float]) and isinstance(value, str):
                try:
                    coerced[field_name] = float(value)
                except ValueError:
                    pass
            elif annotation in (int, Optional[int]) and isinstance(value, str):
                try:
                    coerced[field_name] = int(value)
                except ValueError:
                    pass
            elif annotation == bool and isinstance(value, str):
                # Common truthy spellings; anything else becomes False.
                coerced[field_name] = value.lower() in ("true", "yes", "1")
        return coerced

Step 3: Add Security-Specific Validation Rules
# structured_validation/security_rules.py
"""
Security validation rules beyond standard schema checks.
"""
import re
from dataclasses import dataclass
@dataclass
class SecurityViolation:
    """One security-rule violation found in a validated output."""

    field: str  # dotted/indexed path to the value, e.g. "items[0].url"
    rule: str  # short description of the rule that fired
    value_preview: str  # first 50 chars of the value, for safe logging
class SecurityValidator:
    """Pattern-based security checks applied to every string in a payload."""

    def __init__(self):
        # Each rule takes (field_path, value) and returns a
        # SecurityViolation or None.
        self.rules = [
            self._check_injection_payloads,
            self._check_path_traversal,
            self._check_url_safety,
            self._check_command_injection,
        ]

    def validate(self, data: dict) -> list[SecurityViolation]:
        """Run every rule over every string leaf of a (nested) dict."""
        violations = []
        for key, value in self._flatten(data):
            if isinstance(value, str):
                for rule in self.rules:
                    violation = rule(key, value)
                    if violation:
                        violations.append(violation)
        return violations

    def _flatten(self, data, prefix=""):
        # Yield (dotted_path, leaf_value) pairs, recursing into nested
        # dicts and lists; list elements get an "[i]" suffix on the path.
        for key, value in data.items():
            full_key = f"{prefix}.{key}" if prefix else key
            if isinstance(value, dict):
                yield from self._flatten(value, full_key)
            elif isinstance(value, list):
                for i, item in enumerate(value):
                    if isinstance(item, dict):
                        yield from self._flatten(item, f"{full_key}[{i}]")
                    else:
                        yield f"{full_key}[{i}]", item
            else:
                yield full_key, value

    def _check_injection_payloads(self, field, value):
        # XSS / SQL markers, matched case-insensitively anywhere in value.
        patterns = [
            (r"<script", "XSS script tag"),
            (r"javascript:", "JavaScript URI"),
            (r"on\w+\s*=", "HTML event handler"),
            (r";\s*(DROP|DELETE|ALTER)\s", "SQL injection"),
        ]
        for pattern, desc in patterns:
            if re.search(pattern, value, re.IGNORECASE):
                return SecurityViolation(field, desc, value[:50])
        return None

    def _check_path_traversal(self, field, value):
        # "../" or "..\" anywhere in the string.
        if re.search(r"\.\.[/\\]", value):
            return SecurityViolation(field, "Path traversal", value[:50])
        return None

    def _check_url_safety(self, field, value):
        # NOTE(review): only values that *start* with http(s):// enter this
        # branch, so a value that is itself "file://..." or "data:..." is
        # never flagged by this rule — confirm that is intended.
        if re.match(r"https?://", value):
            dangerous = ["file://", "data:", "javascript:", "ftp://"]
            for proto in dangerous:
                if proto in value.lower():
                    return SecurityViolation(field, f"Dangerous protocol: {proto}", value[:50])
        return None

    def _check_command_injection(self, field, value):
        # Shell substitution, backticks, pipes, and common exfil commands.
        # NOTE(review): r"\|\s*\w+" also matches ordinary prose containing
        # a pipe character — expect false positives.
        patterns = [r"\$\(", r"`[^`]+`", r"\|\s*\w+", r";\s*(rm|cat|curl|wget)\s"]
        for pattern in patterns:
            if re.search(pattern, value):
                return SecurityViolation(field, "Command injection", value[:50])
        return None

Step 4: Implement Retry with Fallback
# structured_validation/retry.py
"""
Retry logic for malformed model outputs.
"""
from structured_validation.engine import OutputValidator, ValidationResult
class RetryableValidator:
    """Wraps an OutputValidator with LLM-assisted repair retries."""

    def __init__(
        self, validator: OutputValidator, llm_client=None, max_retries: int = 2
    ):
        self.validator = validator
        # Optional client exposing .generate(prompt) -> str; when None,
        # no repair attempts are made.
        self.llm = llm_client
        self.max_retries = max_retries

    def validate_with_retry(
        self, raw_output: str, original_prompt: str = ""
    ) -> ValidationResult:
        """Validate; on failure, ask the LLM to repair its own output."""
        # First attempt with coercion
        result = self.validator.validate_with_coercion(raw_output)
        if result.valid:
            return result
        # Retry with the LLM if available
        if self.llm and original_prompt:
            for attempt in range(self.max_retries):
                # The prompt embeds the errors of the *latest* attempt,
                # since `result` is rebound at the bottom of the loop.
                repair_prompt = (
                    f"The previous response was not valid JSON matching "
                    f"the required schema. Errors: {result.errors}. "
                    f"Please provide a corrected response.\n\n"
                    f"Original prompt: {original_prompt}"
                )
                new_output = self.llm.generate(repair_prompt)
                result = self.validator.validate_with_coercion(new_output)
                if result.valid:
                    return result
        return result

Step 5: Deploy as Middleware
# structured_validation/middleware.py
from fastapi import FastAPI
from pydantic import BaseModel
from structured_validation.engine import OutputValidator
from structured_validation.security_rules import SecurityValidator
from structured_validation.schemas import ProductSearchOutput
# Module-level singletons: one schema-bound validator plus security rules.
app = FastAPI(title="Structured Output Validator")
validator = OutputValidator(ProductSearchOutput)
security = SecurityValidator()
class ValidateRequest(BaseModel):
    """Request body: the raw text emitted by the model."""

    raw_output: str
class ValidateResponse(BaseModel):
    """Validation verdict plus schema errors and security findings."""

    valid: bool
    output: dict | None  # parsed payload when JSON extraction succeeded
    errors: list[str]
    security_issues: list[dict]  # entries of {"field": ..., "rule": ...}
@app.post("/validate", response_model=ValidateResponse)
async def validate_output(request: ValidateRequest):
    """Validate one raw model output; security findings flip valid to False."""
    result = validator.validate_with_coercion(request.raw_output)
    security_issues = []
    if result.valid and result.parsed_output:
        violations = security.validate(result.parsed_output)
        security_issues = [
            {"field": v.field, "rule": v.rule} for v in violations
        ]
        if violations:
            # Schema-valid but security-violating output is still rejected.
            result.valid = False
    return ValidateResponse(
        valid=result.valid,
        output=result.parsed_output,
        errors=result.errors,
        security_issues=security_issues,
    )

uvicorn structured_validation.middleware:app --port 8540

Step 6: Test Schema Validation
# tests/test_structured_validation.py
import pytest
from structured_validation.engine import OutputValidator
from structured_validation.schemas import ProductSearchOutput
from structured_validation.security_rules import SecurityValidator
@pytest.fixture
def validator():
    """OutputValidator bound to the product-search schema."""
    return OutputValidator(ProductSearchOutput)
def test_valid_output(validator):
raw = '{"query": "red shoes", "category": "footwear", "sort_by": "price_asc"}'
result = validator.validate(raw)
assert result.valid
def test_xss_in_query(validator):
raw = '{"query": "<script>alert(1)</script>", "sort_by": "relevance"}'
result = validator.validate(raw)
assert not result.valid
def test_json_in_markdown_extracted(validator):
raw = 'Here is the result:\n```json\n{"query": "blue dress", "sort_by": "relevance"}\n```'
result = validator.validate_with_coercion(raw)
assert result.valid
def test_type_coercion(validator):
raw = '{"query": "shoes", "price_min": "10.99", "sort_by": "relevance"}'
result = validator.validate_with_coercion(raw)
assert result.valid
assert result.parsed_output["price_min"] == 10.99
def test_security_validator():
    # Path traversal anywhere in a string field should be reported.
    sv = SecurityValidator()
    data = {"url": "https://example.com", "path": "../../etc/passwd"}
    violations = sv.validate(data)
    assert any(v.rule == "Path traversal" for v in violations)

pytest tests/test_structured_validation.py -v

Step 7: Monitor Validation Failure Patterns
# structured_validation/monitoring.py
from collections import Counter
import logging
class ValidationMonitor:
    """Aggregates validation failures and security violations for reporting."""

    def __init__(self):
        self.error_counts = Counter()  # truncated error message -> count
        self.security_counts = Counter()  # rule name -> count
        self.logger = logging.getLogger("validation_monitor")

    def record(self, result, security_violations=None):
        """Tally errors from one ValidationResult plus any security findings."""
        for error in result.errors:
            # Key on the first 80 chars so long messages stay bounded.
            self.error_counts[error[:80]] += 1
        if security_violations:
            for v in security_violations:
                self.security_counts[v.rule] += 1
                self.logger.warning(f"Security violation: {v.rule} in {v.field}")

    def report(self) -> dict:
        """Return the ten most common errors and all security-rule counts."""
        return {
            "top_errors": self.error_counts.most_common(10),
            "security_violations": dict(self.security_counts),
        }

Related Topics
- Output Content Classifier -- Content-level output filtering
- Response Boundary Enforcement -- Topic boundary checks
- Hallucination Detection -- Semantic output verification
- Sandboxed Tool Execution -- Safe execution of structured outputs
A model returns {'query': 'shoes', 'price_min': '29.99', 'sort_by': 'relevance'} where price_min should be a float. What is the best handling strategy?