Validating and Sanitizing Model Outputs
Walkthrough for building output validation systems that verify LLM responses meet structural, factual, and safety requirements before delivery, covering schema validation, factual grounding checks, response consistency verification, and safe rendering.
Output filtering catches harmful content in model responses. Output validation goes further: it verifies that the response is structurally correct, factually grounded, and consistent with the application's expected behavior. A response that passes content safety filters but contains fabricated data, malformed JSON, or instructions that do not match the user's request is still a security and reliability problem. This walkthrough builds validation checks that ensure model outputs are both safe and correct.
Step 1: Schema Validation for Structured Outputs
Many LLM applications expect structured output (JSON, function calls, or formatted data). Validate that the output matches the expected schema:
# validation/schema.py
"""Schema-based validation for structured LLM outputs."""
import json
import re
from dataclasses import dataclass
from typing import Any, Optional
from pydantic import BaseModel, ValidationError, Field
class CustomerLookupResponse(BaseModel):
"""Expected schema for customer lookup responses."""
customer_id: str = Field(pattern=r"^C\d{3,6}$")
name: str = Field(min_length=1, max_length=200)
email: str = Field(pattern=r"^[^@]+@[^@]+\.[^@]+$")
status: str = Field(pattern=r"^(active|inactive|suspended)$")
class RefundResponse(BaseModel):
"""Expected schema for refund processing responses."""
order_id: str = Field(pattern=r"^ORD-\d{5,10}$")
amount: float = Field(ge=0.01, le=10000.0)
currency: str = Field(pattern=r"^[A-Z]{3}$")
status: str = Field(pattern=r"^(approved|pending|denied)$")
reason: str = Field(max_length=500)
@dataclass
class SchemaValidationResult:
valid: bool
errors: list[str]
validated_data: Optional[dict] = None
class OutputSchemaValidator:
"""Validate LLM outputs against Pydantic schemas."""
def __init__(self):
self._schemas: dict[str, type[BaseModel]] = {
"customer_lookup": CustomerLookupResponse,
"refund": RefundResponse,
}
def validate(self, output: str, schema_name: str) -> SchemaValidationResult:
"""Validate output against a named schema."""
schema = self._schemas.get(schema_name)
if not schema:
return SchemaValidationResult(
valid=False, errors=[f"Unknown schema: {schema_name}"]
)
# Try to parse as JSON
try:
data = json.loads(output)
except json.JSONDecodeError as e:
# Try to extract JSON from the response
json_match = re.search(r'\{[^{}]*\}', output, re.DOTALL)
if json_match:
try:
data = json.loads(json_match.group())
except json.JSONDecodeError:
return SchemaValidationResult(
valid=False,
errors=[f"Could not parse JSON: {e}"],
)
else:
return SchemaValidationResult(
valid=False,
errors=[f"No JSON found in output: {e}"],
)
# Validate against Pydantic schema
try:
validated = schema(**data)
return SchemaValidationResult(
valid=True, errors=[], validated_data=validated.model_dump()
)
except ValidationError as e:
return SchemaValidationResult(
valid=False,
errors=[str(err) for err in e.errors()],
)Step 2: Factual Grounding Verification
Verify that claims in the response are grounded in the provided context:
# validation/grounding.py
"""Factual grounding verification for RAG responses."""
import re
from dataclasses import dataclass
@dataclass
class GroundingResult:
grounded: bool
ungrounded_claims: list[str]
confidence: float
class FactualGroundingChecker:
"""Verify that model responses are grounded in the provided context."""
def check(self, response: str, context: str) -> GroundingResult:
"""Check if claims in the response are supported by the context.
This is a lightweight heuristic checker. For production systems,
consider using an NLI (Natural Language Inference) model.
"""
# Extract factual claims from the response
claims = self._extract_claims(response)
if not claims:
return GroundingResult(grounded=True, ungrounded_claims=[], confidence=0.8)
# Check each claim against the context
ungrounded = []
context_lower = context.lower()
for claim in claims:
if not self._claim_is_supported(claim, context_lower):
ungrounded.append(claim)
grounded = len(ungrounded) == 0
confidence = 1.0 - (len(ungrounded) / max(len(claims), 1))
return GroundingResult(
grounded=grounded,
ungrounded_claims=ungrounded,
confidence=round(confidence, 2),
)
def _extract_claims(self, response: str) -> list[str]:
"""Extract verifiable claims from the response."""
claims = []
# Extract sentences containing numbers (quantitative claims)
number_pattern = re.compile(r'[^.]*\d+[^.]*\.')
for match in number_pattern.finditer(response):
claims.append(match.group().strip())
# Extract sentences containing specific names or entities
entity_pattern = re.compile(r'[^.]*[A-Z][a-z]+\s+[A-Z][a-z]+[^.]*\.')
for match in entity_pattern.finditer(response):
sentence = match.group().strip()
if sentence not in claims:
claims.append(sentence)
return claims[:10] # Limit to 10 claims for performance
def _claim_is_supported(self, claim: str, context_lower: str) -> bool:
"""Check if a specific claim is supported by the context."""
# Extract key terms from the claim
claim_lower = claim.lower()
words = re.findall(r'\b\w{4,}\b', claim_lower)
significant_words = [w for w in words if w not in {
"that", "this", "with", "from", "have", "been", "were",
"they", "their", "there", "which", "would", "could",
"about", "also", "than", "more", "very",
}]
if not significant_words:
return True # Cannot verify, assume grounded
# Check if enough key terms appear in the context
found = sum(1 for word in significant_words if word in context_lower)
coverage = found / len(significant_words)
return coverage >= 0.5 # At least half of key terms should be in contextStep 3: Response Injection Prevention
Prevent the model from injecting executable content or instructions into the response:
# validation/injection_prevention.py
"""Prevent response injection attacks."""
import re
import html
from dataclasses import dataclass
@dataclass
class InjectionCheckResult:
safe: bool
threats: list[str]
sanitized_output: str
class ResponseInjectionPrevention:
"""Detect and neutralize injection attempts in model outputs."""
XSS_PATTERNS = [
re.compile(r'<script\b[^>]*>.*?</script>', re.IGNORECASE | re.DOTALL),
re.compile(r'javascript:', re.IGNORECASE),
re.compile(r'on\w+\s*=', re.IGNORECASE),
re.compile(r'<iframe\b', re.IGNORECASE),
re.compile(r'<embed\b', re.IGNORECASE),
re.compile(r'<object\b', re.IGNORECASE),
]
MARKDOWN_INJECTION_PATTERNS = [
re.compile(r'\[.*?\]\(javascript:', re.IGNORECASE),
re.compile(r'!\[.*?\]\(data:', re.IGNORECASE),
]
def check_and_sanitize(self, output: str) -> InjectionCheckResult:
"""Check for injection patterns and sanitize the output."""
threats = []
sanitized = output
# Check for XSS patterns
for pattern in self.XSS_PATTERNS:
if pattern.search(output):
threats.append(f"XSS pattern detected: {pattern.pattern[:50]}")
sanitized = pattern.sub("[REMOVED]", sanitized)
# Check for markdown injection
for pattern in self.MARKDOWN_INJECTION_PATTERNS:
if pattern.search(output):
threats.append(f"Markdown injection: {pattern.pattern[:50]}")
sanitized = pattern.sub("[REMOVED]", sanitized)
# HTML-encode any remaining HTML tags in non-code-block context
if '<' in sanitized and '>' in sanitized:
# Preserve code blocks but encode other HTML
parts = re.split(r'(```[\s\S]*?```)', sanitized)
for i, part in enumerate(parts):
if not part.startswith('```'):
if re.search(r'<[a-zA-Z]', part):
parts[i] = html.escape(part)
threats.append("HTML tags found outside code blocks")
sanitized = ''.join(parts)
return InjectionCheckResult(
safe=len(threats) == 0,
threats=threats,
sanitized_output=sanitized,
)Step 4: Response Consistency Validation
Verify the response is consistent with the application's expected behavior:
# validation/consistency.py
"""Response consistency validation."""
import re
from dataclasses import dataclass
@dataclass
class ConsistencyResult:
consistent: bool
violations: list[str]
class ResponseConsistencyValidator:
"""Verify response consistency with application expectations."""
def __init__(self, application_role: str, allowed_topics: list[str] | None = None):
self.application_role = application_role
self.allowed_topics = allowed_topics or []
def validate(self, response: str, user_query: str) -> ConsistencyResult:
"""Check if the response is consistent with the application's role."""
violations = []
# Check 1: Role consistency
role_violations = self._check_role_consistency(response)
violations.extend(role_violations)
# Check 2: Response relevance
if len(response) > 100 and len(user_query) > 5:
relevance = self._check_relevance(response, user_query)
if not relevance:
violations.append("Response may not be relevant to the user's query")
# Check 3: Confidence indicators
hedging = self._check_confidence_indicators(response)
violations.extend(hedging)
return ConsistencyResult(
consistent=len(violations) == 0,
violations=violations,
)
def _check_role_consistency(self, response: str) -> list[str]:
"""Check if the response stays within the application's role."""
violations = []
# Detect persona breaks
persona_breaks = [
re.compile(r'(?:i am|i\'m) (?:now|actually|really) (?:a|an)', re.IGNORECASE),
re.compile(r'(?:my|i) (?:previous|old) (?:instructions|role)', re.IGNORECASE),
re.compile(r'(?:entering|switching to|activated) .* mode', re.IGNORECASE),
]
for pattern in persona_breaks:
if pattern.search(response):
violations.append(f"Possible persona break detected")
break
return violations
def _check_relevance(self, response: str, query: str) -> bool:
"""Basic relevance check using keyword overlap."""
query_words = set(re.findall(r'\b\w{4,}\b', query.lower()))
response_words = set(re.findall(r'\b\w{4,}\b', response.lower()))
if not query_words:
return True
overlap = query_words & response_words
return len(overlap) / len(query_words) > 0.2
def _check_confidence_indicators(self, response: str) -> list[str]:
"""Flag responses that make absolute claims without hedging."""
violations = []
absolute_claims = re.findall(
r'(?:definitely|certainly|absolutely|guaranteed|100%|always|never)\s+\w+',
response, re.IGNORECASE,
)
if len(absolute_claims) > 3:
violations.append(
f"Response contains \{len(absolute_claims)\} absolute claims "
f"without hedging -- may be hallucinating"
)
return violationsStep 5: Complete Validation Pipeline
Assemble all validators into a complete pipeline:
# validation/pipeline.py
"""Complete output validation pipeline."""
from dataclasses import dataclass
from validation.schema import OutputSchemaValidator
from validation.grounding import FactualGroundingChecker
from validation.injection_prevention import ResponseInjectionPrevention
from validation.consistency import ResponseConsistencyValidator
@dataclass
class ValidationPipelineResult:
valid: bool
output: str # Possibly sanitized
issues: list[str]
details: dict
class OutputValidationPipeline:
"""Complete output validation pipeline."""
def __init__(self, application_role: str = "helpful assistant"):
self.injection_check = ResponseInjectionPrevention()
self.grounding_check = FactualGroundingChecker()
self.consistency_check = ResponseConsistencyValidator(application_role)
self.schema_validator = OutputSchemaValidator()
def validate(
self,
response: str,
user_query: str = "",
context: str = "",
expected_schema: str | None = None,
) -> ValidationPipelineResult:
"""Run all validation checks on a model response."""
issues = []
details = {}
current_output = response
# Step 1: Injection prevention (always runs)
injection_result = self.injection_check.check_and_sanitize(current_output)
if not injection_result.safe:
issues.extend(injection_result.threats)
current_output = injection_result.sanitized_output
details["injection"] = {"safe": injection_result.safe, "threats": injection_result.threats}
# Step 2: Schema validation (if structured output expected)
if expected_schema:
schema_result = self.schema_validator.validate(current_output, expected_schema)
if not schema_result.valid:
issues.extend(schema_result.errors)
details["schema"] = {"valid": schema_result.valid, "errors": schema_result.errors}
# Step 3: Factual grounding (if context provided)
if context:
grounding_result = self.grounding_check.check(current_output, context)
if not grounding_result.grounded:
issues.append(
f"Response contains ungrounded claims: "
f"{', '.join(grounding_result.ungrounded_claims[:3])}"
)
details["grounding"] = {
"grounded": grounding_result.grounded,
"confidence": grounding_result.confidence,
}
# Step 4: Consistency check
if user_query:
consistency_result = self.consistency_check.validate(current_output, user_query)
if not consistency_result.consistent:
issues.extend(consistency_result.violations)
details["consistency"] = {
"consistent": consistency_result.consistent,
"violations": consistency_result.violations,
}
return ValidationPipelineResult(
valid=len(issues) == 0,
output=current_output,
issues=issues,
details=details,
)Common Pitfalls and Troubleshooting
| Problem | Cause | Solution |
|---|---|---|
| Schema validation rejects valid outputs | Model output format varies slightly | Make schemas flexible with optional fields and lenient patterns |
| Grounding check flags correct responses | Key terms paraphrased between context and response | Use semantic similarity instead of keyword matching |
| HTML escaping breaks markdown rendering | Over-aggressive sanitization | Only escape HTML outside of code blocks and intentional markdown |
| Consistency check too strict | Normal conversational responses flagged | Calibrate thresholds with production response samples |
| Validation adds too much latency | All checks run sequentially | Run independent checks in parallel; skip expensive checks for low-risk queries |
Key Takeaways
Output validation ensures model responses are both safe and correct:
- Validate structure and content separately -- schema validation catches format errors; content validation catches factual and safety issues. Both are needed.
- Sanitize before rendering -- model outputs that will be rendered as HTML or markdown must be sanitized to prevent injection attacks in the browser.
- Ground claims in context -- for RAG applications, verify that the response's factual claims are supported by the retrieved context. Ungrounded claims indicate hallucination.
- Consistency is a security signal -- a response that breaks persona, claims new capabilities, or diverges from the application's role may indicate a successful jailbreak.
- Fail gracefully -- when validation fails, return a helpful fallback response rather than an error. The user should not know which specific validation failed.
Advanced Considerations
Evolving Attack Landscape
The AI security landscape evolves rapidly as both offensive techniques and defensive measures advance. Several trends shape the current state of play:
Increasing model capabilities create new attack surfaces. As models gain access to tools, code execution, web browsing, and computer use, each new capability introduces potential exploitation vectors that did not exist in earlier, text-only systems. The principle of least privilege becomes increasingly important as model capabilities expand.
Safety training improvements are necessary but not sufficient. Model providers invest heavily in safety training through RLHF, DPO, constitutional AI, and other alignment techniques. These improvements raise the bar for successful attacks but do not eliminate the fundamental vulnerability: models cannot reliably distinguish legitimate instructions from adversarial ones because this distinction is not represented in the architecture.
Automated red teaming tools democratize testing. Tools like NVIDIA's Garak, Microsoft's PyRIT, and Promptfoo enable organizations to conduct automated security testing without deep AI security expertise. However, automated tools catch known patterns; novel attacks and business logic vulnerabilities still require human creativity and domain knowledge.
Regulatory pressure drives organizational investment. The EU AI Act, NIST AI RMF, and industry-specific regulations increasingly require organizations to assess and mitigate AI-specific risks. This regulatory pressure is driving investment in AI security programs, but many organizations are still in the early stages of building mature AI security practices.
Cross-Cutting Security Principles
Several security principles apply across all topics covered in this curriculum:
-
Defense-in-depth: No single defensive measure is sufficient. Layer multiple independent defenses so that failure of any single layer does not result in system compromise. Input classification, output filtering, behavioral monitoring, and architectural controls should all be present.
-
Assume breach: Design systems assuming that any individual component can be compromised. This mindset leads to better isolation, monitoring, and incident response capabilities. When a prompt injection succeeds, the blast radius should be minimized through architectural controls.
-
Least privilege: Grant models and agents only the minimum capabilities needed for their intended function. A customer service chatbot does not need file system access or code execution. Excessive capabilities magnify the impact of successful exploitation.
-
Continuous testing: AI security is not a one-time assessment. Models change, defenses evolve, and new attack techniques are discovered regularly. Implement continuous security testing as part of the development and deployment lifecycle.
-
Secure by default: Default configurations should be secure. Require explicit opt-in for risky capabilities, use allowlists rather than denylists, and err on the side of restriction rather than permissiveness.
Integration with Organizational Security
AI security does not exist in isolation — it must integrate with the organization's broader security program:
| Security Domain | AI-Specific Integration |
|---|---|
| Identity and Access | API key management, model access controls, user authentication for AI features |
| Data Protection | Training data classification, PII in prompts, data residency for model calls |
| Application Security | AI feature threat modeling, prompt injection in SAST/DAST, secure AI design patterns |
| Incident Response | AI-specific playbooks, model behavior monitoring, prompt injection forensics |
| Compliance | AI regulatory mapping (EU AI Act, NIST), AI audit trails, model documentation |
| Supply Chain | Model provenance, dependency security, adapter/weight integrity verification |
class OrganizationalIntegration:
"""Framework for integrating AI security with organizational security programs."""
def __init__(self, org_config: dict):
self.config = org_config
self.gaps = []
def assess_maturity(self) -> dict:
"""Assess the organization's AI security maturity."""
domains = {
"governance": self._check_governance(),
"technical_controls": self._check_technical(),
"monitoring": self._check_monitoring(),
"incident_response": self._check_ir(),
"training": self._check_training(),
}
overall = sum(d["score"] for d in domains.values()) / len(domains)
return {"domains": domains, "overall_maturity": round(overall, 1)}
def _check_governance(self) -> dict:
has_policy = self.config.get("ai_security_policy", False)
has_framework = self.config.get("risk_framework", False)
score = (int(has_policy) + int(has_framework)) * 2.5
return {"score": score, "max": 5.0}
def _check_technical(self) -> dict:
controls = ["input_classification", "output_filtering", "rate_limiting", "sandboxing"]
active = sum(1 for c in controls if self.config.get(c, False))
return {"score": active * 1.25, "max": 5.0}
def _check_monitoring(self) -> dict:
has_monitoring = self.config.get("ai_monitoring", False)
has_alerting = self.config.get("ai_alerting", False)
score = (int(has_monitoring) + int(has_alerting)) * 2.5
return {"score": score, "max": 5.0}
def _check_ir(self) -> dict:
has_playbook = self.config.get("ai_ir_playbook", False)
return {"score": 5.0 if has_playbook else 0.0, "max": 5.0}
def _check_training(self) -> dict:
has_training = self.config.get("ai_security_training", False)
return {"score": 5.0 if has_training else 0.0, "max": 5.0}Future Directions
Several research and industry trends will shape the evolution of this field:
- Formal methods for AI safety: Development of mathematical frameworks that can provide bounded guarantees about model behavior under adversarial conditions
- Automated red teaming at scale: Continued improvement of automated testing tools that can discover novel vulnerabilities without human guidance
- AI-assisted defense: Using AI systems to detect and respond to attacks on other AI systems, creating a dynamic attack-defense ecosystem
- Standardized evaluation: Growing adoption of standardized benchmarks (HarmBench, JailbreakBench) that enable consistent measurement of progress
- Regulatory harmonization: Convergence of AI regulatory frameworks across jurisdictions, providing clearer requirements for organizations