Output Content Classifier
Step-by-step walkthrough for building a classifier to filter harmful LLM outputs, covering taxonomy definition, multi-label classification, threshold calibration, and deployment as a real-time output gate.
Input filtering prevents malicious prompts from reaching the model, but output filtering is equally critical. Even with perfect input filtering, a model can generate harmful content from benign prompts -- hallucinating dangerous instructions, leaking training data, or producing toxic content. This walkthrough builds a multi-label output classifier that gates every model response before it reaches the user.
Step 1: Define Your Harm Taxonomy
# output_classifier/taxonomy.py
"""
Harm taxonomy for output classification.
Define categories relevant to your application domain.
"""
from dataclasses import dataclass
from enum import Enum
class HarmCategory(str, Enum):
    """Closed set of harm categories scored by the output classifier.

    Subclasses ``str`` so members compare equal to, and serialize as, their
    string values (convenient for JSON payloads and config files).
    """

    DANGEROUS_INSTRUCTIONS = "dangerous_instructions"
    PII_LEAKAGE = "pii_leakage"
    SYSTEM_PROMPT_LEAKAGE = "system_prompt_leakage"
    TOXIC_CONTENT = "toxic_content"
    MISINFORMATION = "misinformation"
    UNAUTHORIZED_ADVICE = "unauthorized_advice"
    CODE_EXECUTION_RISK = "code_execution_risk"
@dataclass
class CategoryConfig:
    """Per-category policy: detection threshold plus the enforcement action."""

    category: HarmCategory
    # Human-readable summary of what this category covers.
    description: str
    # Score in [0, 1] at or above which the category counts as triggered.
    default_threshold: float
    action: str  # one of: "block", "redact", "warn", "flag"
    severity: str  # one of: "critical", "high", "medium", "low"
# Policy table: (category, description, default threshold, action, severity).
# Thresholds are starting points -- recalibrate against labeled data.
_CATEGORY_SPECS = [
    (HarmCategory.DANGEROUS_INSTRUCTIONS,
     "Instructions for creating weapons, drugs, or causing harm",
     0.7, "block", "critical"),
    (HarmCategory.PII_LEAKAGE,
     "Output containing personal identifiable information",
     0.6, "redact", "high"),
    (HarmCategory.SYSTEM_PROMPT_LEAKAGE,
     "Output revealing system prompt or internal instructions",
     0.5, "block", "critical"),
    (HarmCategory.TOXIC_CONTENT,
     "Hateful, abusive, or threatening content",
     0.7, "block", "high"),
    (HarmCategory.MISINFORMATION,
     "Demonstrably false claims presented as fact",
     0.8, "flag", "medium"),
    (HarmCategory.UNAUTHORIZED_ADVICE,
     "Medical, legal, or financial advice beyond scope",
     0.7, "warn", "medium"),
    (HarmCategory.CODE_EXECUTION_RISK,
     "Code that could damage systems if executed",
     0.6, "warn", "high"),
]

# One CategoryConfig per harm category, keyed by category.
TAXONOMY = {
    cat: CategoryConfig(
        category=cat,
        description=description,
        default_threshold=threshold,
        action=action,
        severity=severity,
    )
    for cat, description, threshold, action, severity in _CATEGORY_SPECS
}

# Step 2: Build the Multi-Label Classifier
# output_classifier/classifier.py
"""
Multi-label output classifier for harm detection.
"""
import numpy as np
from dataclasses import dataclass, field
from typing import Optional
from output_classifier.taxonomy import HarmCategory, TAXONOMY
@dataclass
class ClassificationResult:
    """Outcome of classifying one output text across all harm categories."""

    # The text that was classified, verbatim.
    text: str
    # Per-category confidence scores in [0, 1].
    scores: dict[HarmCategory, float] = field(default_factory=dict)
    # Categories whose score met or exceeded the configured threshold.
    triggered_categories: list[HarmCategory] = field(default_factory=list)
    # Aggregate decision: "allow", "block", "redact", "warn", or "flag".
    recommended_action: str = "allow"
    # Highest severity among triggered categories; "none" if nothing fired.
    highest_severity: str = "none"
class OutputClassifier:
    """Multi-label harm classifier for LLM output text.

    Wraps an optional trained model; when no model is supplied, falls back to
    lightweight rule-based scoring for the categories with clear lexical
    signals (system-prompt leakage, PII).
    """

    def __init__(
        self,
        model=None,
        thresholds: Optional[dict[HarmCategory, float]] = None,
    ):
        """
        Args:
            model: Object exposing ``predict_scores(text) -> dict``; when
                None, the rule-based fallback in ``_get_scores`` is used.
            thresholds: Per-category score thresholds; defaults to each
                category's ``default_threshold`` from the taxonomy.
        """
        self.model = model
        self.thresholds = thresholds or {
            cat: config.default_threshold for cat, config in TAXONOMY.items()
        }

    def classify(self, text: str) -> ClassificationResult:
        """Classify output text across all harm categories."""
        result = ClassificationResult(text=text)
        result.scores = self._get_scores(text)

        # Compare each score to its threshold and track the worst severity
        # among the categories that fire.
        severity_order = {"critical": 4, "high": 3, "medium": 2, "low": 1, "none": 0}
        max_severity = "none"
        for category, score in result.scores.items():
            threshold = self.thresholds.get(category, 0.5)
            if score >= threshold:
                result.triggered_categories.append(category)
                config = TAXONOMY[category]
                if severity_order[config.severity] > severity_order[max_severity]:
                    max_severity = config.severity

        result.highest_severity = max_severity
        result.recommended_action = self._determine_action(
            result.triggered_categories
        )
        return result

    def _get_scores(self, text: str) -> dict[HarmCategory, float]:
        """Get classification scores. Override with actual model inference."""
        if self.model:
            return self.model.predict_scores(text)

        # Rule-based fallback for categories with clear signals.
        import re

        scores: dict[HarmCategory, float] = {}
        text_lower = text.lower()

        # System prompt leakage: each indicator phrase adds 0.3, capped at 1.0.
        system_indicators = [
            "system prompt", "you are a", "your instructions are",
            "i was told to", "my guidelines say",
        ]
        scores[HarmCategory.SYSTEM_PROMPT_LEAKAGE] = min(
            sum(0.3 for i in system_indicators if i in text_lower), 1.0
        )

        # PII detection (simplified): each distinct pattern that matches
        # anywhere in the text adds 0.4, capped at 1.0.
        pii_patterns = [
            r"\b\d{3}-\d{2}-\d{4}\b",  # SSN
            r"\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b",  # credit card
            # Fixed char class: the original "[A-Z|a-z]" also matched a
            # literal "|" inside the TLD.
            r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",  # email
        ]
        pii_matches = sum(1 for p in pii_patterns if re.search(p, text))
        scores[HarmCategory.PII_LEAKAGE] = min(pii_matches * 0.4, 1.0)

        # Remaining categories default to 0.0 (no rule-based signal here).
        for cat in HarmCategory:
            scores.setdefault(cat, 0.0)
        return scores

    def _determine_action(self, triggered: list[HarmCategory]) -> str:
        """Map triggered categories to the single most restrictive action."""
        if not triggered:
            return "allow"
        actions = [TAXONOMY[cat].action for cat in triggered]
        # Precedence: block > redact > warn > flag.
        for action in ("block", "redact", "warn"):
            if action in actions:
                return action
        return "flag"

# Step 3: Implement Action Handlers
# output_classifier/actions.py
"""
Action handlers for classified outputs.
"""
import re
from typing import Optional
class ActionHandler:
    """Applies the recommended action from a classification to an output."""

    def handle(self, text: str, result: "ClassificationResult") -> dict:
        """Apply ``result.recommended_action`` to ``text``.

        Returns a dict with keys ``output`` (possibly modified text),
        ``modified`` (bool), ``action``, and -- for blocks -- ``reason``.
        """
        action = result.recommended_action
        if action == "allow":
            return {"output": text, "modified": False, "action": "allow"}
        if action == "block":
            # Replace the entire response; never leak the original text.
            return {
                "output": "I cannot provide that response.",
                "modified": True,
                "action": "block",
                "reason": f"Triggered: {[c.value for c in result.triggered_categories]}",
            }
        if action == "redact":
            redacted = self._redact_pii(text)
            return {
                "output": redacted,
                "modified": redacted != text,
                "action": "redact",
            }
        if action == "warn":
            disclaimer = (
                "\n\n---\n*Note: This response may contain content "
                "that requires verification. Please consult a "
                "qualified professional for specific advice.*"
            )
            return {
                "output": text + disclaimer,
                "modified": True,
                "action": "warn",
            }
        # "flag" (or anything unrecognized): pass through unchanged.
        return {"output": text, "modified": False, "action": "flag"}

    def _redact_pii(self, text: str) -> str:
        """Replace common PII patterns with ``[REDACTED <TYPE>]`` markers."""
        patterns = {
            "SSN": r"\b\d{3}-\d{2}-\d{4}\b",
            "CREDIT_CARD": r"\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b",
            # Fixed char class: the original "[A-Z|a-z]" also matched "|".
            "EMAIL": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
            "PHONE": r"\b(?:\+1[- ]?)?\(?\d{3}\)?[- ]?\d{3}[- ]?\d{4}\b",
        }
        result = text
        for pii_type, pattern in patterns.items():
            result = re.sub(pattern, f"[REDACTED {pii_type}]", result)
        return result

# Step 4: Build the Output Gate Service
# output_classifier/gate.py
"""
Output gate that classifies and filters every LLM response.
"""
from fastapi import FastAPI
from pydantic import BaseModel
from output_classifier.classifier import OutputClassifier
from output_classifier.actions import ActionHandler
# Module-level singletons shared by all requests to this service.
app = FastAPI(title="Output Content Classifier")
# No model wired in yet, so this uses the rule-based scoring fallback.
classifier = OutputClassifier()
handler = ActionHandler()
class GateRequest(BaseModel):
    """Request body for /gate: the raw model output to screen."""

    text: str
    # Optional caller-supplied session identifier for tracing/review.
    session_id: str = ""

class GateResponse(BaseModel):
    """Response body: the (possibly modified) output plus the decision."""

    output: str
    # "allow", "block", "redact", "warn", or "flag".
    action: str
    modified: bool
    # Per-category scores keyed by category value, rounded to 4 decimals.
    scores: dict
@app.post("/gate", response_model=GateResponse)
async def gate_output(request: GateRequest):
    """Classify one model output and apply the recommended action.

    This is the single chokepoint: every LLM response should pass through
    here before being shown to the user.
    """
    result = classifier.classify(request.text)
    action_result = handler.handle(request.text, result)
    return GateResponse(
        output=action_result["output"],
        action=action_result["action"],
        modified=action_result["modified"],
        scores={k.value: round(v, 4) for k, v in result.scores.items()},
    )

# Run the service with:
#   uvicorn output_classifier.gate:app --host 0.0.0.0 --port 8500
curl -X POST http://localhost:8500/gate \
  -H "Content-Type: application/json" \
  -d '{"text": "My SSN is 123-45-6789 and my email is test@example.com"}'

Step 5: Calibrate Per-Category Thresholds
# output_classifier/calibrate.py
"""
Threshold calibration using labeled evaluation data.
"""
import numpy as np
from output_classifier.taxonomy import HarmCategory
def calibrate_thresholds(
    classifier,
    eval_data: list[dict],
    target_precision: float = 0.95,
) -> dict[HarmCategory, float]:
    """Pick, per category, the lowest threshold meeting ``target_precision``.

    Each item in ``eval_data`` is a dict with a ``"text"`` key and an
    optional ``"categories"`` list of ground-truth category values.
    Categories with no positive labels get a conservative 0.9; categories
    where no candidate threshold reaches the target keep 0.5.
    """
    # Gather (score, label) pairs per category across the evaluation set.
    per_category: dict[HarmCategory, list[tuple[float, int]]] = {
        cat: [] for cat in HarmCategory
    }
    for item in eval_data:
        outcome = classifier.classify(item["text"])
        truth = item.get("categories", [])
        for cat in HarmCategory:
            per_category[cat].append(
                (outcome.scores.get(cat, 0), 1 if cat.value in truth else 0)
            )

    thresholds: dict[HarmCategory, float] = {}
    for cat, pairs in per_category.items():
        scores = np.array([s for s, _ in pairs])
        labels = np.array([lab for _, lab in pairs])
        if labels.sum() == 0:
            # No positive examples to calibrate against -- stay conservative.
            thresholds[cat] = 0.9
            continue
        chosen = 0.5
        # Scan ascending so we keep the lowest (highest-recall) threshold
        # that still meets the precision target.
        for t in np.arange(0.1, 1.0, 0.05):
            predicted = (scores >= t).astype(int)
            true_pos = ((predicted == 1) & (labels == 1)).sum()
            false_pos = ((predicted == 1) & (labels == 0)).sum()
            if true_pos / max(true_pos + false_pos, 1) >= target_precision:
                chosen = t
                break
        thresholds[cat] = round(float(chosen), 2)
    return thresholds

# Step 6: Set Up Human Review for Borderline Cases
# output_classifier/review_queue.py
"""
Human review queue for borderline classifications.
"""
import json
from pathlib import Path
from datetime import datetime, timezone
from dataclasses import asdict
class ReviewQueue:
    """Append-only JSONL queue of borderline classifications for human review."""

    def __init__(self, queue_path: Path):
        """
        Args:
            queue_path: File the queue is appended to (parents are created).
        """
        self.path = queue_path
        self.path.parent.mkdir(parents=True, exist_ok=True)

    def enqueue(self, text: str, result, session_id: str) -> str:
        """Append a borderline case to the review queue and return its id.

        Only a 200-char preview of the text is stored to keep entries small.
        """
        now = datetime.now(timezone.utc)
        entry = {
            # Full-precision timestamp: the previous ":.0f" formatting had
            # second resolution, so two enqueues in the same second collided.
            "id": f"review-{now.timestamp()}",
            "timestamp": now.isoformat(),
            "session_id": session_id,
            "text_preview": text[:200],
            "scores": {k.value: v for k, v in result.scores.items()},
            "triggered": [c.value for c in result.triggered_categories],
            "recommended_action": result.recommended_action,
            "status": "pending",
        }
        with open(self.path, "a") as f:
            f.write(json.dumps(entry) + "\n")
        return entry["id"]

    def should_review(
        self, result, threshold: float = 0.5, margin: float = 0.15
    ) -> bool:
        """Return True if any category score lies within ``margin`` of ``threshold``.

        Scores near the decision boundary are where the classifier is least
        reliable, so those responses are routed to a human. The defaults
        match the previous hard-coded behavior; callers may now pass the
        per-category threshold actually in force.
        """
        return any(
            abs(score - threshold) < margin for score in result.scores.values()
        )

# Step 7: Test the Output Classifier
# tests/test_output_classifier.py
"""
Tests for the output content classifier.
"""
import pytest
from output_classifier.classifier import OutputClassifier
from output_classifier.taxonomy import HarmCategory
from output_classifier.actions import ActionHandler
@pytest.fixture
def classifier():
    # No model supplied -> exercises the rule-based scoring fallback.
    return OutputClassifier()

@pytest.fixture
def handler():
    return ActionHandler()

def test_pii_detected(classifier):
    # Email matches the fallback's email regex, giving a nonzero PII score.
    result = classifier.classify("Contact me at user@example.com or 555-123-4567")
    assert result.scores[HarmCategory.PII_LEAKAGE] > 0

def test_system_prompt_leakage(classifier):
    # Two indicator phrases ("system prompt", "you are a") score 0.6,
    # exceeding the 0.5 default threshold for this category.
    result = classifier.classify(
        "My system prompt says: You are a helpful assistant"
    )
    assert HarmCategory.SYSTEM_PROMPT_LEAKAGE in result.triggered_categories

def test_clean_output_allowed(classifier):
    # Benign text triggers nothing, so the gate should pass it through.
    result = classifier.classify("The capital of France is Paris.")
    assert result.recommended_action == "allow"
def test_pii_redaction(handler):
from output_classifier.classifier import ClassificationResult
result = ClassificationResult(text="SSN: 123-45-6789")
result.triggered_categories = [HarmCategory.PII_LEAKAGE]
result.recommended_action = "redact"
action = handler.handle("SSN: 123-45-6789", result)
assert "REDACTED" in action["output"]
assert "123-45-6789" not in action["output"]pytest tests/test_output_classifier.py -vRelated Topics
- PII Redaction Pipeline -- Specialized PII detection and redaction
- Toxicity Scoring Pipeline -- Dedicated toxicity detection
- Hallucination Detection -- Detecting false claims
- Response Boundary Enforcement -- Keeping outputs within bounds
Why should PII leakage have a lower detection threshold (0.6) than toxic content (0.7)?