Output Filtering and Content Safety Implementation
Walkthrough for building output filtering systems that inspect and sanitize LLM responses before they reach users, covering content classifiers, PII detection, response validation, canary tokens, and filter bypass resistance.
Input guardrails catch attacks before they reach the model. Output filters catch successful attacks after the model responds. Together, they form a defense-in-depth strategy where either layer can independently prevent a security incident. Output filtering is especially critical because it is the last checkpoint before potentially harmful content reaches the user -- if the input guardrails miss an attack and the model complies, the output filter is the final safety net. This walkthrough builds a production-grade output filtering system.
Step 1: Output Filter Architecture
Design the filtering pipeline as a sequence of independent stages:
# filters/pipeline.py
"""Output filtering pipeline for LLM responses."""
from dataclasses import dataclass
from typing import Protocol, Optional
import time
@dataclass
class FilterResult:
action: str # "allow", "block", "redact", "modify"
filter_name: str
reason: Optional[str] = None
modified_output: Optional[str] = None
confidence: float = 1.0
processing_time_ms: float = 0.0
class OutputFilter(Protocol):
name: str
def filter(self, response: str, context: dict) -> FilterResult:
...
class OutputFilterPipeline:
def __init__(self, filters: list[OutputFilter]):
self.filters = filters
def process(self, response: str, context: dict | None = None) -> tuple[str, list[FilterResult]]:
"""Process a response through all filters.
Returns the (possibly modified) response and a list of filter results.
"""
context = context or {}
results = []
current = response
for f in self.filters:
start = time.monotonic()
result = f.filter(current, context)
result.processing_time_ms = (time.monotonic() - start) * 1000
results.append(result)
if result.action == "block":
return "[Response blocked by content safety system]", results
elif result.action == "redact" and result.modified_output:
current = result.modified_output
elif result.action == "modify" and result.modified_output:
current = result.modified_output
return current, resultsStep 2: PII Detection Filter
Catch accidental PII leakage in model responses:
# filters/pii_detector.py
"""PII detection and redaction filter."""
import re
from filters.pipeline import OutputFilter, FilterResult
class PIIDetectionFilter:
"""Detect and redact PII in model responses."""
name = "pii_detector"
PII_PATTERNS = {
"ssn": {
"pattern": re.compile(r'\b\d{3}[-.]?\d{2}[-.]?\d{4}\b'),
"replacement": "[SSN REDACTED]",
"description": "Social Security Number",
},
"email": {
"pattern": re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'),
"replacement": "[EMAIL REDACTED]",
"description": "Email address",
},
"phone": {
"pattern": re.compile(r'\b(?:\+1[-.]?)?\(?\d{3}\)?[-.]?\d{3}[-.]?\d{4}\b'),
"replacement": "[PHONE REDACTED]",
"description": "Phone number",
},
"credit_card": {
"pattern": re.compile(r'\b(?:\d{4}[-. ]?){3}\d{4}\b'),
"replacement": "[CARD REDACTED]",
"description": "Credit card number",
},
"api_key": {
"pattern": re.compile(r'\b(?:sk|pk|api)[_-][a-zA-Z0-9]{16,}\b'),
"replacement": "[API KEY REDACTED]",
"description": "API key",
},
}
def __init__(self, redact: bool = True, block_on_detection: bool = False):
self.redact = redact
self.block_on_detection = block_on_detection
def filter(self, response: str, context: dict) -> FilterResult:
detected = []
redacted = response
for pii_type, config in self.PII_PATTERNS.items():
matches = config["pattern"].findall(response)
if matches:
detected.append(f"{config['description']}: {len(matches)} instance(s)")
if self.redact:
redacted = config["pattern"].sub(config["replacement"], redacted)
if detected:
if self.block_on_detection:
return FilterResult(
action="block",
filter_name=self.name,
reason=f"PII detected: {'; '.join(detected)}",
)
return FilterResult(
action="redact",
filter_name=self.name,
reason=f"PII redacted: {'; '.join(detected)}",
modified_output=redacted,
)
return FilterResult(action="allow", filter_name=self.name)Step 3: Canary Token Detection
Detect when the model has been tricked into outputting parts of the system prompt:
# filters/canary_detector.py
"""Canary token detection for system prompt leakage."""
import hashlib
from filters.pipeline import OutputFilter, FilterResult
class CanaryTokenFilter:
"""Detect system prompt leakage using canary tokens.
Place unique, identifiable tokens in the system prompt.
If any appear in the output, the model has been tricked into
revealing its instructions.
"""
name = "canary_detector"
def __init__(self, canary_tokens: list[str], system_prompt_fragments: list[str] | None = None):
self.canary_tokens = [t.lower() for t in canary_tokens]
self.system_prompt_fragments = [
f.lower() for f in (system_prompt_fragments or [])
]
def filter(self, response: str, context: dict) -> FilterResult:
response_lower = response.lower()
# Check for canary tokens
for token in self.canary_tokens:
if token in response_lower:
return FilterResult(
action="block",
filter_name=self.name,
reason=f"Canary token detected in response -- system prompt leakage",
confidence=0.95,
)
# Check for system prompt fragments
fragment_matches = sum(
1 for fragment in self.system_prompt_fragments
if fragment in response_lower
)
if fragment_matches >= 2:
return FilterResult(
action="block",
filter_name=self.name,
reason=f"Multiple system prompt fragments detected ({fragment_matches})",
confidence=0.8,
)
return FilterResult(action="allow", filter_name=self.name)Step 4: Content Safety Classifier
Classify model responses for harmful or off-policy content:
# filters/content_classifier.py
"""Content safety classification for model outputs."""
import re
from filters.pipeline import OutputFilter, FilterResult
class ContentSafetyFilter:
"""Classify responses for harmful content categories."""
name = "content_safety"
HARMFUL_PATTERNS = {
"violence_instructions": {
"patterns": [
re.compile(r'(?:step\s+\d|first|then|next|finally).*(?:weapon|explosive|detonate|poison)', re.IGNORECASE),
],
"severity": "critical",
"action": "block",
},
"illegal_activity": {
"patterns": [
re.compile(r'(?:how to|steps to|guide to).*(?:hack into|break into|steal|forge)', re.IGNORECASE),
],
"severity": "high",
"action": "block",
},
"medical_diagnosis": {
"patterns": [
re.compile(r'(?:you (?:have|likely have|probably have|may have)|diagnosis is|diagnosed with)\s+\w+', re.IGNORECASE),
],
"severity": "medium",
"action": "block",
},
"role_violation": {
"patterns": [
re.compile(r'(?:i am now|i\'m now|entering|activated)\s+(?:unrestricted|developer|admin|DAN)', re.IGNORECASE),
re.compile(r'(?:i have no|without any)\s+(?:restrictions|limitations|guidelines)', re.IGNORECASE),
],
"severity": "high",
"action": "block",
},
}
def filter(self, response: str, context: dict) -> FilterResult:
for category, config in self.HARMFUL_PATTERNS.items():
for pattern in config["patterns"]:
if pattern.search(response):
return FilterResult(
action=config["action"],
filter_name=self.name,
reason=f"Content safety violation: {category} ({config['severity']})",
)
return FilterResult(action="allow", filter_name=self.name)Step 5: Encoding Bypass Resistance
Harden filters against common bypass techniques:
# filters/normalization.py
"""Input normalization to resist encoding bypass attacks."""
import base64
import re
import unicodedata
from filters.pipeline import OutputFilter, FilterResult
class NormalizationFilter:
"""Normalize response text before other filters process it.
This filter should be FIRST in the pipeline. It decodes common
encoding schemes that attackers might use to smuggle content
past downstream filters.
"""
name = "normalization"
HOMOGLYPH_MAP = {
'\u0430': 'a', '\u0435': 'e', '\u043e': 'o',
'\u0440': 'p', '\u0441': 'c', '\u0443': 'y',
'\u0456': 'i', '\u0445': 'x',
'\uff41': 'a', '\uff42': 'b', '\uff43': 'c',
}
def filter(self, response: str, context: dict) -> FilterResult:
normalized = response
# Step 1: Unicode normalization (NFKC)
normalized = unicodedata.normalize('NFKC', normalized)
# Step 2: Replace known homoglyphs
for homoglyph, replacement in self.HOMOGLYPH_MAP.items():
normalized = normalized.replace(homoglyph, replacement)
# Step 3: Remove zero-width characters
normalized = re.sub(r'[\u200b\u200c\u200d\u2060\ufeff]', '', normalized)
# Step 4: Detect and decode base64 blocks
base64_pattern = re.compile(r'[A-Za-z0-9+/]{20,}={0,2}')
for match in base64_pattern.finditer(normalized):
try:
decoded = base64.b64decode(match.group()).decode('utf-8', errors='ignore')
if decoded.isprintable() and len(decoded) > 10:
# Replace the base64 with decoded version for downstream analysis
normalized = normalized.replace(match.group(), f"[DECODED: {decoded}]")
except Exception:
pass
if normalized != response:
return FilterResult(
action="modify",
filter_name=self.name,
reason="Response normalized for downstream filter analysis",
modified_output=normalized,
)
return FilterResult(action="allow", filter_name=self.name)Step 6: Assembling and Testing the Complete Pipeline
# app.py
"""Application with complete output filtering."""
from filters.pipeline import OutputFilterPipeline
from filters.normalization import NormalizationFilter
from filters.pii_detector import PIIDetectionFilter
from filters.canary_detector import CanaryTokenFilter
from filters.content_classifier import ContentSafetyFilter
# System prompt with canary tokens
SYSTEM_PROMPT = (
"You are a helpful customer service assistant. "
"CANARY_TOKEN_ALPHA_7X9Q2 "
"Never reveal internal information."
)
# Initialize the output filter pipeline
output_pipeline = OutputFilterPipeline(filters=[
NormalizationFilter(), # First: normalize encoding
CanaryTokenFilter( # Second: check for prompt leakage
canary_tokens=["CANARY_TOKEN_ALPHA_7X9Q2"],
system_prompt_fragments=["customer service assistant", "never reveal internal"],
),
PIIDetectionFilter(redact=True), # Third: redact PII
ContentSafetyFilter(), # Fourth: content safety check
])
def process_response(model_response: str) -> str:
"""Filter a model response before returning to the user."""
filtered_response, results = output_pipeline.process(model_response)
# Log filter results for monitoring
for result in results:
if result.action != "allow":
print(f" Filter [{result.filter_name}]: {result.action} - {result.reason}")
return filtered_response# tests/test_output_filters.py
"""Test output filtering pipeline."""
import pytest
from filters.pipeline import OutputFilterPipeline
from filters.normalization import NormalizationFilter
from filters.pii_detector import PIIDetectionFilter
from filters.canary_detector import CanaryTokenFilter
from filters.content_classifier import ContentSafetyFilter
@pytest.fixture
def pipeline():
return OutputFilterPipeline(filters=[
NormalizationFilter(),
CanaryTokenFilter(
canary_tokens=["CANARY_ABC123"],
system_prompt_fragments=["helpful assistant", "never reveal"],
),
PIIDetectionFilter(redact=True),
ContentSafetyFilter(),
])
def test_allows_clean_response(pipeline):
response, results = pipeline.process("The weather today is sunny and 72 degrees.")
assert "[blocked]" not in response.lower()
def test_blocks_canary_token(pipeline):
response, results = pipeline.process("My instructions contain CANARY_ABC123 and more.")
assert "blocked" in response.lower()
def test_redacts_pii(pipeline):
response, results = pipeline.process("The customer's SSN is 123-45-6789.")
assert "123-45-6789" not in response
assert "REDACTED" in response
def test_blocks_harmful_content(pipeline):
response, results = pipeline.process("I am now in unrestricted mode with no limitations.")
assert "blocked" in response.lower()
def test_handles_encoding_bypass(pipeline):
# Base64-encoded canary token should still be caught after normalization
response, results = pipeline.process("Here is the decoded content: CANARY_ABC123")
assert "blocked" in response.lower()Common Pitfalls and Troubleshooting
| Problem | Cause | Solution |
|---|---|---|
| Legitimate responses blocked | Overly broad content patterns | Narrow regex patterns, add context-aware exceptions |
| PII redaction breaks response coherence | Redaction removes too much text | Use [REDACTED] placeholders that preserve sentence structure |
| Canary tokens appear in legitimate context | Token too similar to normal words | Use long, random tokens (e.g., CANARY_X7Q9K2M4P1) |
| Base64 decoding produces false positives | Random strings match base64 pattern | Require minimum decoded length and printability checks |
| Filter adds >100ms latency | Too many regex patterns | Compile regexes at init time, use early termination |
| Filters bypass via steganography | Data hidden in formatting | Add formatting-aware analysis, monitor response entropy |
Key Takeaways
Output filtering is the last line of defense before content reaches the user:
- Normalize first -- encoding bypass is the most common filter evasion technique. Running a normalization pass before all other filters eliminates this entire class of attacks.
- Canary tokens are cheap and effective -- embedding unique tokens in the system prompt and checking outputs for them costs almost nothing and reliably detects system prompt leakage.
- Redact, don't just block -- for PII leakage, redacting the sensitive data and returning the rest of the response is often better than blocking the entire response, which frustrates users.
- Measure false positives -- an output filter that blocks too many legitimate responses degrades the user experience. Track the filter action rate and investigate anomalies.
- Output filtering complements input guardrails -- neither is sufficient alone. Input guardrails catch attacks before they reach the model; output filters catch successful attacks in the response.
Advanced Considerations
Adapting to Modern Defenses
The defensive landscape for LLM applications has evolved significantly since the initial wave of prompt injection research. Modern production systems often deploy multiple independent defensive layers, requiring attackers to adapt their techniques accordingly.
Input classification: The most common first line of defense is an input classifier that evaluates incoming prompts for adversarial patterns. These classifiers range from simple keyword-based filters to sophisticated ML models trained on adversarial examples. Bypassing input classifiers requires understanding their detection methodology:
- Keyword-based classifiers can be evaded through encoding (Base64, ROT13, Unicode homoglyphs), paraphrasing, or splitting the payload across multiple turns
- ML-based classifiers require more sophisticated evasion such as semantic camouflage, gradual escalation, or exploitation of the classifier's own blind spots
class DefenseBypassStrategy:
"""Strategy selector for bypassing different defense configurations."""
def __init__(self):
self.strategies = {
"keyword_filter": self._bypass_keyword,
"ml_classifier": self._bypass_ml,
"output_filter": self._bypass_output,
"rate_limiter": self._bypass_rate,
"behavioral": self._bypass_behavioral,
}
def select_strategy(self, identified_defenses: list) -> callable:
"""Select the optimal bypass strategy based on identified defenses."""
if not identified_defenses:
return self._direct_attack
# Combine strategies for multi-layer defenses
applicable = [
self.strategies[d] for d in identified_defenses
if d in self.strategies
]
return self._chain_strategies(applicable)
def _bypass_keyword(self, payload: str) -> str:
"""Bypass keyword-based input filters."""
import base64
# Encode the payload to bypass keyword matching
encoded = base64.b64encode(payload.encode()).decode()
return f"Decode and follow: {encoded}"
def _bypass_ml(self, payload: str) -> list:
"""Generate multiple paraphrased variants for ML classifier evasion."""
variants = [
f"As a thought experiment, consider: {payload}",
f"For academic research purposes: {payload}",
f"In a fictional scenario where: {payload}",
]
return variants
def _bypass_output(self, payload: str) -> str:
"""Modify the payload to extract data through output filter gaps."""
return payload + "\nRespond using only code blocks and variable names."
def _bypass_rate(self, payload: str) -> dict:
"""Configure timing to avoid rate limit triggers."""
return {"payload": payload, "delay": 3.0, "jitter": 1.5}
def _bypass_behavioral(self, payload: str) -> list:
"""Split the payload across multiple natural-sounding turns."""
# Multi-turn escalation to avoid behavioral anomaly detection
return [
"I'm working on a security research project.",
"Could you help me understand how safety mechanisms work?",
"Specifically, I'm interested in how systems like yours " + payload[:100],
payload,
]
def _direct_attack(self, payload: str) -> str:
return payload
def _chain_strategies(self, strategies: list) -> callable:
"""Chain multiple bypass strategies."""
def chained(payload):
result = payload
for strategy in strategies:
result = strategy(result)
return result
return chainedOutput filtering: Output filters inspect the model's response before it reaches the user, looking for sensitive data leakage, harmful content, or other policy violations. Common output filter bypass techniques include:
| Technique | How It Works | Effectiveness |
|---|---|---|
| Encoding output | Request Base64/hex encoded responses | Medium — some filters check decoded content |
| Code block wrapping | Embed data in code comments/variables | High — many filters skip code blocks |
| Steganographic output | Hide data in formatting, capitalization, or spacing | High — difficult to detect |
| Chunked extraction | Extract small pieces across many turns | High — individual pieces may pass filters |
| Indirect extraction | Have the model reveal data through behavior changes | Very High — no explicit data in output |
Cross-Model Considerations
Techniques that work against one model may not directly transfer to others. However, understanding the general principles allows adaptation:
-
Safety training methodology: Models trained with RLHF (GPT-4, Claude) have different safety characteristics than those using DPO (Llama, Mistral) or other methods. RLHF-trained models tend to refuse more broadly but may be more susceptible to multi-turn escalation.
-
Context window size: Models with larger context windows (Claude with 200K, Gemini with 1M+) may be more susceptible to context window manipulation where adversarial content is buried in large amounts of benign text.
-
Multimodal capabilities: Models that process images, audio, or other modalities introduce additional attack surfaces not present in text-only models.
-
Tool use implementation: The implementation details of function calling vary significantly between providers. OpenAI uses a structured function calling format, while Anthropic uses tool use blocks. These differences affect exploitation techniques.
Operational Considerations
Testing Ethics and Boundaries
Professional red team testing operates within clear ethical and legal boundaries:
- Authorization: Always obtain written authorization before testing. This should specify the scope, methods allowed, and any restrictions.
- Scope limits: Stay within the authorized scope. If you discover a vulnerability that leads outside the authorized perimeter, document it and report it without exploiting it.
- Data handling: Handle any sensitive data discovered during testing according to the engagement agreement. Never retain sensitive data beyond what's needed for reporting.
- Responsible disclosure: Follow responsible disclosure practices for any vulnerabilities discovered, particularly if they affect systems beyond your testing scope.
Documenting Results
Professional documentation follows a structured format:
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
@dataclass
class Finding:
"""Structure for documenting a security finding."""
id: str
title: str
severity: str # Critical, High, Medium, Low, Informational
category: str # OWASP LLM Top 10 category
description: str
steps_to_reproduce: list[str]
impact: str
recommendation: str
evidence: list[str] = field(default_factory=list)
mitre_atlas: Optional[str] = None
cvss_score: Optional[float] = None
discovered_at: str = field(default_factory=lambda: datetime.now().isoformat())
def to_report_section(self) -> str:
"""Generate a report section for this finding."""
steps = "\n".join(f" {i+1}. {s}" for i, s in enumerate(self.steps_to_reproduce))
return f"""
### {self.id}: {self.title}
**Severity**: {self.severity}
**Category**: {self.category}
{f"**MITRE ATLAS**: {self.mitre_atlas}" if self.mitre_atlas else ""}
#### Description
{self.description}
#### Steps to Reproduce
{steps}
#### Impact
{self.impact}
#### Recommendation
{self.recommendation}
"""This structured approach ensures that findings are actionable and that remediation teams have the information they need to address the vulnerabilities effectively.
Advanced Considerations
Evolving Attack Landscape
The AI security landscape evolves rapidly as both offensive techniques and defensive measures advance. Several trends shape the current state of play:
Increasing model capabilities create new attack surfaces. As models gain access to tools, code execution, web browsing, and computer use, each new capability introduces potential exploitation vectors that did not exist in earlier, text-only systems. The principle of least privilege becomes increasingly important as model capabilities expand.
Safety training improvements are necessary but not sufficient. Model providers invest heavily in safety training through RLHF, DPO, constitutional AI, and other alignment techniques. These improvements raise the bar for successful attacks but do not eliminate the fundamental vulnerability: models cannot reliably distinguish legitimate instructions from adversarial ones because this distinction is not represented in the architecture.
Automated red teaming tools democratize testing. Tools like NVIDIA's Garak, Microsoft's PyRIT, and Promptfoo enable organizations to conduct automated security testing without deep AI security expertise. However, automated tools catch known patterns; novel attacks and business logic vulnerabilities still require human creativity and domain knowledge.
Regulatory pressure drives organizational investment. The EU AI Act, NIST AI RMF, and industry-specific regulations increasingly require organizations to assess and mitigate AI-specific risks. This regulatory pressure is driving investment in AI security programs, but many organizations are still in the early stages of building mature AI security practices.
Cross-Cutting Security Principles
Several security principles apply across all topics covered in this curriculum:
-
Defense-in-depth: No single defensive measure is sufficient. Layer multiple independent defenses so that failure of any single layer does not result in system compromise. Input classification, output filtering, behavioral monitoring, and architectural controls should all be present.
-
Assume breach: Design systems assuming that any individual component can be compromised. This mindset leads to better isolation, monitoring, and incident response capabilities. When a prompt injection succeeds, the blast radius should be minimized through architectural controls.
-
Least privilege: Grant models and agents only the minimum capabilities needed for their intended function. A customer service chatbot does not need file system access or code execution. Excessive capabilities magnify the impact of successful exploitation.
-
Continuous testing: AI security is not a one-time assessment. Models change, defenses evolve, and new attack techniques are discovered regularly. Implement continuous security testing as part of the development and deployment lifecycle.
-
Secure by default: Default configurations should be secure. Require explicit opt-in for risky capabilities, use allowlists rather than denylists, and err on the side of restriction rather than permissiveness.
Integration with Organizational Security
AI security does not exist in isolation — it must integrate with the organization's broader security program:
| Security Domain | AI-Specific Integration |
|---|---|
| Identity and Access | API key management, model access controls, user authentication for AI features |
| Data Protection | Training data classification, PII in prompts, data residency for model calls |
| Application Security | AI feature threat modeling, prompt injection in SAST/DAST, secure AI design patterns |
| Incident Response | AI-specific playbooks, model behavior monitoring, prompt injection forensics |
| Compliance | AI regulatory mapping (EU AI Act, NIST), AI audit trails, model documentation |
| Supply Chain | Model provenance, dependency security, adapter/weight integrity verification |
class OrganizationalIntegration:
"""Framework for integrating AI security with organizational security programs."""
def __init__(self, org_config: dict):
self.config = org_config
self.gaps = []
def assess_maturity(self) -> dict:
"""Assess the organization's AI security maturity."""
domains = {
"governance": self._check_governance(),
"technical_controls": self._check_technical(),
"monitoring": self._check_monitoring(),
"incident_response": self._check_ir(),
"training": self._check_training(),
}
overall = sum(d["score"] for d in domains.values()) / len(domains)
return {"domains": domains, "overall_maturity": round(overall, 1)}
def _check_governance(self) -> dict:
has_policy = self.config.get("ai_security_policy", False)
has_framework = self.config.get("risk_framework", False)
score = (int(has_policy) + int(has_framework)) * 2.5
return {"score": score, "max": 5.0}
def _check_technical(self) -> dict:
controls = ["input_classification", "output_filtering", "rate_limiting", "sandboxing"]
active = sum(1 for c in controls if self.config.get(c, False))
return {"score": active * 1.25, "max": 5.0}
def _check_monitoring(self) -> dict:
has_monitoring = self.config.get("ai_monitoring", False)
has_alerting = self.config.get("ai_alerting", False)
score = (int(has_monitoring) + int(has_alerting)) * 2.5
return {"score": score, "max": 5.0}
def _check_ir(self) -> dict:
has_playbook = self.config.get("ai_ir_playbook", False)
return {"score": 5.0 if has_playbook else 0.0, "max": 5.0}
def _check_training(self) -> dict:
has_training = self.config.get("ai_security_training", False)
return {"score": 5.0 if has_training else 0.0, "max": 5.0}Future Directions
Several research and industry trends will shape the evolution of this field:
- Formal methods for AI safety: Development of mathematical frameworks that can provide bounded guarantees about model behavior under adversarial conditions
- Automated red teaming at scale: Continued improvement of automated testing tools that can discover novel vulnerabilities without human guidance
- AI-assisted defense: Using AI systems to detect and respond to attacks on other AI systems, creating a dynamic attack-defense ecosystem
- Standardized evaluation: Growing adoption of standardized benchmarks (HarmBench, JailbreakBench) that enable consistent measurement of progress
- Regulatory harmonization: Convergence of AI regulatory frameworks across jurisdictions, providing clearer requirements for organizations