PII Redaction Pipeline
Step-by-step walkthrough for building an automated PII detection and redaction pipeline for LLM outputs, covering regex-based detection, NER-based detection, Presidio integration, redaction strategies, and compliance testing.
LLMs can leak PII in multiple ways: reproducing training data, echoing user-provided PII in later responses, or generating plausible-looking PII that coincidentally matches real individuals. A redaction pipeline scans every output and replaces detected PII with safe placeholders before the response reaches the user. This walkthrough builds a multi-method detection pipeline using both pattern matching and NER models.
Step 1: Define PII Categories and Patterns
# pii_pipeline/detectors.py
"""
PII detection using regex patterns for structured data types.
"""
import re
from dataclasses import dataclass
from typing import Optional
@dataclass
class PIIMatch:
    """One span of text that a detector flagged as PII.

    Offsets index into the scanned text with ``end`` exclusive, so the
    detectors in this package satisfy ``scanned_text[start:end] == text``.
    """

    pii_type: str  # category label, e.g. "SSN" or "EMAIL"
    start: int  # offset of the first matched character
    end: int  # offset one past the last matched character
    text: str  # the matched substring itself
    confidence: float  # score the detector assigns to this hit
    method: str  # which detector produced the hit, e.g. "regex"


class RegexPIIDetector:
    """Detect structured PII (SSNs, card numbers, e-mail addresses, ...) by regex.

    Every ``PATTERNS`` entry carries the regex source, the confidence given to
    its hits and, optionally, the name of a method on this class that must
    return True for a hit to be reported.
    """

    PATTERNS = {
        "SSN": {
            "pattern": r"\b\d{3}-\d{2}-\d{4}\b",
            "confidence": 0.95,
            "validator": "_validate_ssn",
        },
        "CREDIT_CARD": {
            "pattern": r"\b(?:\d{4}[- ]?){3}\d{4}\b",
            "confidence": 0.9,
            "validator": "_validate_credit_card",
        },
        "EMAIL": {
            # Inside [...] a "|" is a literal pipe, not alternation, so it must
            # not appear in the TLD class. The TLD length is left open-ended:
            # capping it at 7 letters made addresses under long TLDs such as
            # ".photography" go completely undetected.
            "pattern": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
            "confidence": 0.95,
            "validator": None,
        },
        "PHONE_US": {
            # A leading "\b" can never anchor in front of "(" or "+" (both are
            # non-word characters), which left them outside the match. The
            # lookbehind is equivalent to "\b" when the number starts with a
            # digit and also lets the optional "+" / "(" be part of the span.
            "pattern": r"(?<!\w)(?:\+?1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b",
            "confidence": 0.8,
            "validator": None,
        },
        "IP_ADDRESS": {
            "pattern": r"\b(?:\d{1,3}\.){3}\d{1,3}\b",
            "confidence": 0.7,
            "validator": "_validate_ip",
        },
        "API_KEY": {
            "pattern": r"\b(?:sk-|pk_|ak_|AKIA)[A-Za-z0-9]{16,}\b",
            "confidence": 0.95,
            "validator": None,
        },
        "AWS_ACCESS_KEY": {
            "pattern": r"\bAKIA[0-9A-Z]{16}\b",
            "confidence": 0.95,
            "validator": None,
        },
    }

    def __init__(self):
        """Compile every pattern once so ``detect`` can reuse them."""
        self._compiled = {
            name: re.compile(info["pattern"])
            for name, info in self.PATTERNS.items()
        }

    def detect(self, text: str) -> list[PIIMatch]:
        """Return every validated regex hit in ``text``.

        Hits are grouped by category in ``PATTERNS`` order and, within one
        category, in order of appearance. Overlapping hits from different
        categories are all returned; resolving them is left to the caller.
        """
        matches = []
        for pii_type, pattern in self._compiled.items():
            info = self.PATTERNS[pii_type]
            validator_name = info.get("validator")
            # A missing validator name, or one that does not resolve to an
            # attribute on this instance, means "accept every hit".
            validate = getattr(self, validator_name, None) if validator_name else None
            for hit in pattern.finditer(text):
                candidate = hit.group()
                if validate is not None and not validate(candidate):
                    continue
                matches.append(PIIMatch(
                    pii_type=pii_type,
                    start=hit.start(),
                    end=hit.end(),
                    text=candidate,
                    confidence=info["confidence"],
                    method="regex",
                ))
        return matches

    def _validate_ssn(self, value: str) -> bool:
        """Reject SSN-shaped strings whose area, group or serial is never issued."""
        digits = value.replace("-", "")
        area, group, serial = digits[0:3], digits[3:5], digits[5:9]
        # Fixed-width digit strings compare like numbers, so "> 899" means 900-999.
        if area in ("000", "666") or area > "899":
            return False
        if group == "00" or serial == "0000":
            return False
        return True

    def _validate_credit_card(self, value: str) -> bool:
        """Return True when the digits have a plausible length and pass the Luhn check."""
        digits = re.sub(r"[- ]", "", value)
        if len(digits) < 13 or len(digits) > 19:
            return False
        # Luhn: counting from the right, double every second digit and
        # subtract 9 from any doubled value above 9.
        total = 0
        for index, char in enumerate(reversed(digits)):
            digit = int(char)
            if index % 2 == 1:
                digit *= 2
                if digit > 9:
                    digit -= 9
            total += digit
        return total % 10 == 0

    def _validate_ip(self, value: str) -> bool:
        """Return True when every dotted component is within 0-255."""
        parts = value.split(".")
        return all(0 <= int(p) <= 255 for p in parts)


# Step 2: Add NER-Based Detection
# pii_pipeline/ner_detector.py
"""
NER-based PII detection for unstructured data (names, addresses, organizations).
"""
from pii_pipeline.detectors import PIIMatch
class NERPIIDetector:
    """Detect unstructured PII by running a spaCy pipeline over the text.

    Only entities whose label appears in ``ENTITY_MAP`` are reported; the map
    translates the model's label into the PII category used by the pipeline.
    """

    ENTITY_MAP = {
        "PERSON": "PERSON_NAME",
        "ORG": "ORGANIZATION",
        "GPE": "LOCATION",
        "LOC": "LOCATION",
        "DATE": "DATE_OF_BIRTH",
    }

    def __init__(self, model_name: str = "en_core_web_sm"):
        """Load the spaCy model named ``model_name``."""
        # spaCy is imported here rather than at module level.
        import spacy

        self.nlp = spacy.load(model_name)

    def detect(self, text: str) -> list[PIIMatch]:
        """Return one match per recognised entity that maps to a PII category.

        Every match carries a fixed confidence of 0.7 and ``method="ner"``.
        """
        labelled = (
            (self.ENTITY_MAP.get(entity.label_), entity)
            for entity in self.nlp(text).ents
        )
        return [
            PIIMatch(
                pii_type=category,
                start=entity.start_char,
                end=entity.end_char,
                text=entity.text,
                confidence=0.7,
                method="ner",
            )
            for category, entity in labelled
            if category
        ]


# pip install spacy
python -m spacy download en_core_web_sm
Step 3: Integrate Microsoft Presidio
# pii_pipeline/presidio_detector.py
"""
Presidio integration for production-grade PII detection.
"""
from pii_pipeline.detectors import PIIMatch
class PresidioPIIDetector:
    """Adapter that exposes Presidio's analyzer through the ``detect`` interface."""

    def __init__(self, languages: list[str] = None):
        """Create the analyzer engine; ``languages`` falls back to ``["en"]``."""
        # Presidio is imported here rather than at module level.
        from presidio_analyzer import AnalyzerEngine

        self.analyzer = AnalyzerEngine()
        self.languages = languages or ["en"]

    def detect(self, text: str) -> list[PIIMatch]:
        """Analyze ``text`` and convert each analyzer result into a ``PIIMatch``.

        Only the first entry of ``self.languages`` is passed to the analyzer.
        """
        primary_language = self.languages[0]
        findings = self.analyzer.analyze(text=text, language=primary_language)

        converted = []
        for finding in findings:
            converted.append(
                PIIMatch(
                    pii_type=finding.entity_type,
                    start=finding.start,
                    end=finding.end,
                    text=text[finding.start:finding.end],
                    confidence=finding.score,
                    method="presidio",
                )
            )
        return converted


# pip install presidio-analyzer presidio-anonymizer
# Step 4: Build the Redaction Engine
# pii_pipeline/redactor.py
"""
Redaction engine with multiple strategies.
"""
from dataclasses import dataclass
from pii_pipeline.detectors import PIIMatch
@dataclass
class RedactionResult:
    """Outcome of one redaction pass over a text."""

    original: str  # the text exactly as it was passed in
    redacted: str  # the text after every kept span was replaced
    redaction_count: int  # number of spans replaced (after overlap merging)
    redacted_types: list[str]  # distinct PII categories replaced, sorted


class Redactor:
    """Replace detected PII spans according to a configurable strategy.

    Strategies:
        ``"placeholder"`` - ``[<pii_type>]``
        ``"mask"``        - one ``*`` per character of the span
        ``"partial"``     - keep the first and last two characters, mask the
                            rest (spans of four characters or fewer are fully
                            masked)
        anything else     - the literal ``[REDACTED]``
    """

    def __init__(self, strategy: str = "placeholder"):
        self.strategy = strategy

    def redact(self, text: str, matches: "list[PIIMatch]") -> RedactionResult:
        """Return ``text`` with every matched span replaced.

        Args:
            text: The text the matches were found in.
            matches: Detected spans, in any order; overlaps are allowed.

        Returns:
            A ``RedactionResult``; with no matches the text is returned unchanged.
        """
        if not matches:
            return RedactionResult(text, text, 0, [])

        spans = self._deduplicate(matches, text)
        redacted = text
        # Spans come back right-to-left, so replacing one never shifts the
        # offsets of the spans still to be processed.
        for span in spans:
            redacted = (
                redacted[:span.start]
                + self._get_replacement(span)
                + redacted[span.end:]
            )
        return RedactionResult(
            original=text,
            redacted=redacted,
            redaction_count=len(spans),
            # Sorted so the reported order is deterministic between runs.
            redacted_types=sorted({span.pii_type for span in spans}),
        )

    def _get_replacement(self, match: "PIIMatch") -> str:
        """Build the replacement string for one span under the current strategy."""
        if self.strategy == "placeholder":
            return f"[{match.pii_type}]"
        if self.strategy == "mask":
            return "*" * len(match.text)
        if self.strategy == "partial":
            if len(match.text) <= 4:
                return "*" * len(match.text)
            return match.text[:2] + "*" * (len(match.text) - 4) + match.text[-2:]
        return "[REDACTED]"

    def _deduplicate(self, matches: "list[PIIMatch]", text: "str | None" = None) -> "list[PIIMatch]":
        """Collapse overlapping matches into non-overlapping spans.

        Overlapping matches are merged into the union of their ranges, so no
        character covered by any detector is left unredacted. (Keeping just
        one match of an overlapping pair would leak whatever only the other
        one covered, e.g. the local part of an e-mail whose domain was also
        tagged as an organization.) The merged span takes its category,
        confidence and method from the highest-confidence contributor.

        Args:
            matches: Detected spans in any order.
            text: Source text, used to rebuild the text of a merged span.
                When omitted, the merged text is stitched from the
                contributing matches' own ``text`` fields.

        Returns:
            Non-overlapping spans ordered by descending ``start``.
        """
        from dataclasses import replace

        merged = []
        # Ascending start; for equal starts the longer span comes first.
        for match in sorted(matches, key=lambda m: (m.start, -m.end)):
            # Spans that merely touch (start == previous end) stay separate.
            if not merged or match.start >= merged[-1].end:
                merged.append(match)
                continue
            current = merged[-1]
            end = max(current.end, match.end)
            if text is not None:
                span_text = text[current.start:end]
            elif match.end > current.end:
                span_text = current.text + match.text[current.end - match.start:]
            else:
                span_text = current.text
            label_source = match if match.confidence > current.confidence else current
            merged[-1] = replace(
                label_source, start=current.start, end=end, text=span_text
            )
        merged.reverse()
        return merged


# Step 5: Assemble the Pipeline
# pii_pipeline/pipeline.py
"""
Complete PII redaction pipeline combining all detectors.
"""
import logging
from pii_pipeline.detectors import RegexPIIDetector, PIIMatch
from pii_pipeline.redactor import Redactor, RedactionResult
logger = logging.getLogger("pii_pipeline")
class PIIRedactionPipeline:
    """Run every configured detector over a text, then redact the confident hits."""

    def __init__(
        self,
        detectors: list = None,
        redactor: Redactor = None,
        min_confidence: float = 0.6,
    ):
        """Store the configuration, substituting defaults for falsy arguments.

        Args:
            detectors: Objects exposing ``detect(text)``; defaults to a single
                ``RegexPIIDetector``.
            redactor: Defaults to a placeholder-strategy ``Redactor``.
            min_confidence: Matches scoring below this value are ignored.
        """
        self.detectors = detectors or [RegexPIIDetector()]
        self.redactor = redactor or Redactor(strategy="placeholder")
        self.min_confidence = min_confidence

    def process(self, text: str) -> RedactionResult:
        """Detect PII in ``text`` and return the redaction result."""
        collected = []
        for detector in self.detectors:
            # A failing detector is logged and skipped; the text is still
            # processed with whatever the other detectors found.
            try:
                collected.extend(detector.detect(text))
            except Exception as e:
                logger.error(f"Detector {type(detector).__name__} failed: {e}")

        # Keep only the matches that reach the confidence threshold.
        kept = []
        for candidate in collected:
            if candidate.confidence >= self.min_confidence:
                kept.append(candidate)

        outcome = self.redactor.redact(text, kept)
        if outcome.redaction_count > 0:
            logger.info(
                f"Redacted {outcome.redaction_count} PII instances: "
                f"{outcome.redacted_types}"
            )
        return outcome


# Step 6: Deploy as a Service
# pii_pipeline/api.py
from fastapi import FastAPI
from pydantic import BaseModel
from pii_pipeline.pipeline import PIIRedactionPipeline
# Application object that the /redact route below is registered on.
app = FastAPI(title="PII Redaction Pipeline")
# One pipeline with the default configuration, created at import time and
# shared by every request.
pipeline = PIIRedactionPipeline()
class RedactRequest(BaseModel):
    """Request body for ``POST /redact``."""

    # Text to scan for PII.
    text: str
class RedactResponse(BaseModel):
    """Response body for ``POST /redact``."""

    # The submitted text after redaction.
    redacted: str
    # Number of spans that were replaced.
    redaction_count: int
    # PII categories that were replaced.
    types_found: list[str]
@app.post("/redact", response_model=RedactResponse)
async def redact_pii(request: RedactRequest):
    """Redact PII from the posted text and report what was replaced."""
    outcome = pipeline.process(request.text)
    payload = {
        "redacted": outcome.redacted,
        "redaction_count": outcome.redaction_count,
        "types_found": outcome.redacted_types,
    }
    return RedactResponse(**payload)


# uvicorn pii_pipeline.api:app --port 8510
curl -X POST http://localhost:8510/redact \
-H "Content-Type: application/json" \
-d '{"text": "Contact John at john@example.com or 555-123-4567. SSN: 123-45-6789"}'
Step 7: Test PII Detection Coverage
# tests/test_pii_pipeline.py
import pytest
from pii_pipeline.pipeline import PIIRedactionPipeline
from pii_pipeline.detectors import RegexPIIDetector
@pytest.fixture
def pipeline():
    """Provide a pipeline built with its default detectors and redactor."""
    return PIIRedactionPipeline()
def test_ssn_redacted(pipeline):
    """A well-formed SSN is removed and replaced by the SSN placeholder."""
    output = pipeline.process("My SSN is 123-45-6789").redacted
    assert "123-45-6789" not in output
    assert "[SSN]" in output
def test_email_redacted(pipeline):
    """An e-mail address does not survive redaction."""
    address = "user@example.com"
    outcome = pipeline.process("Email: user@example.com")
    assert address not in outcome.redacted
def test_credit_card_redacted(pipeline):
    """No fragment of a Luhn-valid card number remains in the output."""
    outcome = pipeline.process("Card: 4111 1111 1111 1111")
    leaked = "4111" in outcome.redacted
    assert not leaked
def test_no_pii_unchanged(pipeline):
    """Text without PII passes through untouched and reports zero redactions."""
    clean = "The weather is sunny today."
    outcome = pipeline.process(clean)
    assert outcome.redacted == clean
    assert outcome.redaction_count == 0
def test_multiple_pii_types(pipeline):
    """A text holding both an SSN and an e-mail yields at least two redactions."""
    outcome = pipeline.process("Name: John, SSN: 123-45-6789, Email: john@test.com")
    assert outcome.redaction_count >= 2


# pytest tests/test_pii_pipeline.py -v
# Related Topics
- Output Content Classifier -- Broader output classification
- Structured Output Validation -- Validating output schemas
- Audit Logging for LLM Calls -- Logging redaction events
- Forensic Log Analysis for LLMs -- Investigating PII leakage incidents
A Luhn check validates that '4111 1111 1111 1111' is a valid credit card number, but what does it NOT tell you?