Toxicity Scoring Pipeline
Step-by-step walkthrough for building a toxicity scoring pipeline for LLM output filtering, covering model selection, multi-dimensional scoring, threshold calibration, and production deployment with real-time scoring.
LLMs can produce toxic content even from benign prompts, especially when jailbroken or when discussing sensitive topics. A toxicity scoring pipeline assigns multi-dimensional scores to every model output and takes action (flag, warn, block) based on configurable thresholds. This walkthrough builds a production pipeline using both open-source models and API-based scoring services.
Step 1: Set Up the Toxicity Scorer
# toxicity/scorer.py
"""
Multi-dimensional toxicity scoring using transformer models.
"""
from dataclasses import dataclass, field
from typing import Optional
import numpy as np
@dataclass
class ToxicityScores:
    """Per-category toxicity probabilities in [0, 1]; all default to 0.0."""

    overall: float = 0.0     # aggregate toxicity signal
    hate: float = 0.0        # identity-based hate
    threat: float = 0.0      # threatening content
    harassment: float = 0.0  # insults / harassment
    sexual: float = 0.0      # sexual content
    self_harm: float = 0.0   # self-harm content
    violence: float = 0.0    # violent content
@dataclass
class ScoringResult:
    """Outcome of scoring one text: raw scores plus the decision taken."""

    text: str                 # the text that was scored
    scores: ToxicityScores    # per-category scores from the scorer
    max_score: float = 0.0    # highest score across evaluated categories
    max_category: str = ""    # category that produced max_score
    flagged: bool = False     # True when action is not "allow"
    action: str = "allow"     # one of "allow" | "warn" | "block"
class TransformerToxicityScorer:
    """Multi-label toxicity scoring backed by a fine-tuned transformer.

    Runs a Hugging Face text-classification pipeline and folds the model's
    labels into ToxicityScores categories via ``label_map``, keeping the
    maximum score when several labels map onto the same category.
    """

    def __init__(self, model_name: str = "unitary/toxic-bert"):
        # Imported lazily so the module can load without transformers installed.
        from transformers import pipeline

        self.classifier = pipeline(
            "text-classification",
            model=model_name,
            return_all_scores=True,
            truncation=True,
            max_length=512,
        )
        # Model label name -> ToxicityScores attribute name.
        self.label_map = {
            "toxic": "overall",
            "severe_toxic": "overall",
            "identity_hate": "hate",
            "threat": "threat",
            "insult": "harassment",
            "obscene": "sexual",
        }

    def score(self, text: str) -> ToxicityScores:
        """Return per-category scores for *text*; unmapped labels are ignored."""
        scores = ToxicityScores()
        predictions = self.classifier(text)
        # With return_all_scores=True a single input yields [[{label, score}, ...]].
        if not predictions or not isinstance(predictions[0], list):
            return scores
        for prediction in predictions[0]:
            label = prediction["label"].lower()
            category = self.label_map.get(label, label)
            if hasattr(scores, category):
                best = max(getattr(scores, category), prediction["score"])
                setattr(scores, category, best)
        return scores
class OpenAIModerationScorer:
"""Uses OpenAI's moderation API for scoring."""
def __init__(self, api_key: Optional[str] = None):
import openai
self.client = openai.OpenAI(api_key=api_key)
def score(self, text: str) -> ToxicityScores:
response = self.client.moderations.create(input=text)
result = response.results[0]
return ToxicityScores(
overall=max(result.category_scores.__dict__.values()),
hate=result.category_scores.hate,
threat=result.category_scores.violence,
harassment=result.category_scores.harassment,
sexual=result.category_scores.sexual,
self_harm=result.category_scores.self_harm,
violence=result.category_scores.violence,
)pip install transformers torch
# Or for API-based: pip install openai
Step 2: Build the Scoring Pipeline
# toxicity/pipeline.py
"""
Toxicity scoring pipeline with thresholds and actions.
"""
from dataclasses import dataclass, field
from toxicity.scorer import ToxicityScores, ScoringResult
@dataclass
class ThresholdConfig:
    """Warn/block cutoffs for a single toxicity category."""

    category: str            # ToxicityScores attribute name
    warn_threshold: float    # score >= this -> at least "warn"
    block_threshold: float   # score >= this -> "block"
# Default calibration: warn/block cutoffs per category. Severe-harm
# categories (threat, self_harm) use lower cutoffs than e.g. harassment so
# the pipeline intervenes at lower model confidence where the downside of a
# missed detection is greatest; hate and violence sit in between.
DEFAULT_THRESHOLDS = [
    ThresholdConfig("overall", 0.5, 0.8),
    ThresholdConfig("hate", 0.4, 0.7),
    ThresholdConfig("threat", 0.3, 0.6),
    ThresholdConfig("harassment", 0.5, 0.8),
    ThresholdConfig("sexual", 0.5, 0.8),
    ThresholdConfig("self_harm", 0.3, 0.6),
    ThresholdConfig("violence", 0.4, 0.7),
]
class ToxicityPipeline:
def __init__(self, scorer, thresholds: list[ThresholdConfig] = None):
self.scorer = scorer
self.thresholds = {
t.category: t for t in (thresholds or DEFAULT_THRESHOLDS)
}
def evaluate(self, text: str) -> ScoringResult:
scores = self.scorer.score(text)
max_score = 0.0
max_category = ""
action = "allow"
for category, threshold in self.thresholds.items():
score = getattr(scores, category, 0.0)
if score > max_score:
max_score = score
max_category = category
if score >= threshold.block_threshold:
action = "block"
elif score >= threshold.warn_threshold and action != "block":
action = "warn"
return ScoringResult(
text=text,
scores=scores,
max_score=max_score,
max_category=max_category,
flagged=action != "allow",
action=action,
)Step 3: Add Context-Aware Scoring
# toxicity/context.py
"""
Context-aware toxicity scoring that considers conversation context.
"""
from toxicity.pipeline import ToxicityPipeline, ScoringResult
class ContextAwarePipeline:
def __init__(self, pipeline: ToxicityPipeline):
self.pipeline = pipeline
self.suppression_contexts = [
"educational", "security_research", "content_moderation",
"quoting_example", "defining_term",
]
def evaluate_with_context(
self, text: str, context: dict = None
) -> ScoringResult:
result = self.pipeline.evaluate(text)
if not result.flagged or not context:
return result
# Check if context warrants suppression
purpose = context.get("purpose", "")
if purpose in self.suppression_contexts:
# Apply higher thresholds for educational contexts
adjusted_result = ScoringResult(
text=result.text,
scores=result.scores,
max_score=result.max_score,
max_category=result.max_category,
flagged=result.max_score > 0.9, # Much higher bar
action="allow" if result.max_score < 0.9 else "block",
)
return adjusted_result
return resultStep 4: Implement Batch Scoring with Caching
# toxicity/batch.py
"""
Batch scoring and caching for production performance.
"""
import hashlib
from collections import OrderedDict
from toxicity.scorer import ToxicityScores
class ScoringCache:
    """Bounded LRU cache mapping hashed text to its toxicity scores."""

    def __init__(self, max_size: int = 5000):
        self.max_size = max_size
        # OrderedDict tracks recency: most recently used entries at the end.
        self._cache: OrderedDict[str, ToxicityScores] = OrderedDict()

    def _key(self, text: str) -> str:
        # Short SHA-256 prefix: fixed-size key, avoids holding raw text.
        return hashlib.sha256(text.encode()).hexdigest()[:16]

    def get(self, text: str) -> ToxicityScores | None:
        """Return cached scores for *text* (refreshing recency), or None."""
        key = self._key(text)
        try:
            scores = self._cache[key]
        except KeyError:
            return None
        self._cache.move_to_end(key)
        return scores

    def put(self, text: str, scores: ToxicityScores) -> None:
        """Store scores for *text*, evicting least-recently-used overflow."""
        self._cache[self._key(text)] = scores
        while len(self._cache) > self.max_size:
            self._cache.popitem(last=False)
class CachedToxicityPipeline:
def __init__(self, pipeline, cache: ScoringCache = None):
self.pipeline = pipeline
self.cache = cache or ScoringCache()
def evaluate(self, text: str) -> "ScoringResult":
cached = self.cache.get(text)
if cached:
from toxicity.pipeline import ScoringResult
# Rebuild result from cached scores
return ScoringResult(
text=text, scores=cached,
max_score=max(vars(cached).values()),
flagged=False, action="allow",
)
result = self.pipeline.evaluate(text)
self.cache.put(text, result.scores)
return resultStep 5: Deploy the Scoring Service
# toxicity/api.py
from fastapi import FastAPI
from pydantic import BaseModel
from toxicity.scorer import TransformerToxicityScorer
from toxicity.pipeline import ToxicityPipeline
from toxicity.batch import CachedToxicityPipeline, ScoringCache
# Service wiring: transformer scorer -> threshold pipeline -> LRU cache.
app = FastAPI(title="Toxicity Scoring Pipeline")
scorer = TransformerToxicityScorer()  # loads the model once, at import time
pipeline = ToxicityPipeline(scorer)
cached_pipeline = CachedToxicityPipeline(pipeline)
class ScoreRequest(BaseModel):
    """Request body for POST /score."""

    text: str           # the model output to score
    context: dict = {}  # accepted but not consulted by the /score handler yet
class ScoreResponse(BaseModel):
    """Response body for POST /score."""

    action: str        # "allow" | "warn" | "block"
    max_score: float   # highest category score, rounded to 4 decimals
    max_category: str  # category that produced max_score
    flagged: bool      # True when action is not "allow"
    scores: dict       # all per-category scores, rounded to 4 decimals
@app.post("/score", response_model=ScoreResponse)
async def score_text(request: ScoreRequest):
result = cached_pipeline.evaluate(request.text)
return ScoreResponse(
action=result.action,
max_score=round(result.max_score, 4),
max_category=result.max_category,
flagged=result.flagged,
scores={k: round(v, 4) for k, v in vars(result.scores).items()},
)uvicorn toxicity.api:app --port 8550
curl -X POST http://localhost:8550/score \
-H "Content-Type: application/json" \
-d '{"text": "The weather is beautiful today."}'
Step 6: Build Monitoring Dashboards
# toxicity/monitoring.py
"""
Monitoring for toxicity score distributions.
"""
from collections import defaultdict, deque
import statistics
import logging
class ToxicityMonitor:
def __init__(self, window_size: int = 1000):
self.window_size = window_size
self.scores_by_category = defaultdict(lambda: deque(maxlen=window_size))
self.flagged_count = 0
self.total_count = 0
self.logger = logging.getLogger("toxicity_monitor")
def record(self, result) -> None:
self.total_count += 1
if result.flagged:
self.flagged_count += 1
for category, score in vars(result.scores).items():
self.scores_by_category[category].append(score)
# Alert on elevated toxicity rates
if self.total_count >= 100:
flag_rate = self.flagged_count / self.total_count
if flag_rate > 0.1:
self.logger.warning(
f"Elevated toxicity flag rate: {flag_rate:.2%}"
)
def report(self) -> dict:
report = {}
for category, scores in self.scores_by_category.items():
if scores:
report[category] = {
"mean": round(statistics.mean(scores), 4),
"p95": round(sorted(scores)[int(len(scores) * 0.95)], 4),
"max": round(max(scores), 4),
}
report["flag_rate"] = (
round(self.flagged_count / max(self.total_count, 1), 4)
)
return reportStep 7: Test the Pipeline
# tests/test_toxicity.py
import pytest
from toxicity.pipeline import ToxicityPipeline, DEFAULT_THRESHOLDS
from toxicity.scorer import ToxicityScores
class MockScorer:
    """Test double whose score() always returns the canned ToxicityScores."""

    def __init__(self, scores: ToxicityScores):
        self._canned = scores

    def score(self, text):
        # The input text is deliberately ignored.
        return self._canned
def test_benign_text_allowed():
    """Low scores in every category pass through untouched."""
    mock = MockScorer(ToxicityScores(overall=0.1, hate=0.05))
    outcome = ToxicityPipeline(mock).evaluate("Hello, how are you?")
    assert outcome.action == "allow"
    assert not outcome.flagged
def test_high_toxicity_blocked():
    """Scores above the block thresholds trigger a flagged block."""
    mock = MockScorer(ToxicityScores(overall=0.9, hate=0.85))
    outcome = ToxicityPipeline(mock).evaluate("test")
    assert outcome.action == "block"
    assert outcome.flagged
def test_medium_toxicity_warned():
    """An overall score between warn (0.5) and block (0.8) yields a warning."""
    mock = MockScorer(ToxicityScores(overall=0.6, hate=0.3))
    outcome = ToxicityPipeline(mock).evaluate("test")
    assert outcome.action == "warn"
def test_threat_low_threshold():
scores = ToxicityScores(overall=0.2, threat=0.7)
pipeline = ToxicityPipeline(MockScorer(scores))
result = pipeline.evaluate("test")
assert result.action == "block"pytest tests/test_toxicity.py -vRelated Topics
- Output Content Classifier -- Multi-category output classification
- Response Boundary Enforcement -- Topic and content boundaries
- PII Redaction Pipeline -- PII-specific output filtering
- Building an AI SOC Dashboard -- Visualizing toxicity metrics
Why should the threat category have a lower block threshold (0.6) than the harassment category (0.8)?