Response Boundary Enforcement
Step-by-step walkthrough for keeping LLM responses within defined topic, format, and content boundaries, covering boundary definition, violation detection, response rewriting, and boundary drift monitoring.
LLM applications are deployed for specific purposes -- a customer support bot should discuss products and policies, not provide medical advice or generate code. Without boundary enforcement, prompt injection can steer the model into arbitrary topics. This walkthrough builds a boundary enforcement system that detects when responses drift outside their defined scope and corrects them before they reach the user.
The fundamental challenge is balancing security with usability. Overly strict boundaries make the application frustrating to use -- imagine a customer support bot that blocks the word "headache" even when a customer says "this billing issue is giving me a headache." Overly lenient boundaries leave the application vulnerable to topic hijacking. The approach in this walkthrough addresses this tension through graduated enforcement and multi-layered detection.
Step 1: Define Application Boundaries
The first step is documenting what your application should and should not do. This seems straightforward, but poorly defined boundaries are the root cause of most enforcement failures. A boundary like "no medical advice" is too vague -- does that include "drink water if you feel dehydrated"? Define boundaries with specific examples of what falls inside and outside each one.
# boundaries/definition.py
"""
Boundary definitions for LLM application scope.
"""
from dataclasses import dataclass, field
from enum import Enum
class BoundaryType(str, Enum):
TOPIC = "topic"
FORMAT = "format"
CONTENT = "content"
LENGTH = "length"
@dataclass
class TopicBoundary:
allowed_topics: list[str]
blocked_topics: list[str]
topic_embeddings: dict = field(default_factory=dict)
@dataclass
class FormatBoundary:
max_length: int = 2000
allowed_formats: list[str] = field(default_factory=lambda: ["text", "markdown"])
blocked_patterns: list[str] = field(default_factory=list)
@dataclass
class ContentBoundary:
no_personal_opinions: bool = True
no_speculation: bool = False
require_hedging_for_uncertainty: bool = True
allowed_languages: list[str] = field(default_factory=lambda: ["en"])
@dataclass
class BoundaryConfig:
name: str
description: str
topic: TopicBoundary
format: FormatBoundary
content: ContentBoundary
enforcement_level: str = "strict" # strict, moderate, lenient
# Example configuration for a customer support bot
CUSTOMER_SUPPORT_BOUNDARIES = BoundaryConfig(
name="customer_support",
description="Customer support assistant for an e-commerce platform",
topic=TopicBoundary(
allowed_topics=[
"product information", "order status", "shipping",
"returns and refunds", "account management",
"payment methods", "promotions and discounts",
],
blocked_topics=[
"medical advice", "legal advice", "financial advice",
"political opinions", "competitor products",
"internal company data", "employee information",
],
),
format=FormatBoundary(
max_length=1500,
allowed_formats=["text", "markdown"],
blocked_patterns=[
r"```(?:python|bash|javascript|sql)", # No code blocks
],
),
content=ContentBoundary(
no_personal_opinions=True,
require_hedging_for_uncertainty=True,
),
)
Boundary Configuration for Different Application Types
The customer support example above is just one pattern. Different applications need different boundary configurations. Here are additional examples:
# Educational tutoring assistant
TUTORING_BOUNDARIES = BoundaryConfig(
name="tutoring",
description="Math and science tutoring for high school students",
topic=TopicBoundary(
allowed_topics=[
"mathematics", "algebra", "geometry", "calculus",
"physics", "chemistry", "biology",
"study techniques", "homework help",
],
blocked_topics=[
"weapons", "drugs", "violence",
"political opinions", "religious opinions",
"personal relationships", "social media",
],
),
format=FormatBoundary(
max_length=3000, # Longer for educational explanations
allowed_formats=["text", "markdown", "latex"],
blocked_patterns=[], # Code blocks are OK for math
),
content=ContentBoundary(
no_personal_opinions=True,
no_speculation=False, # Speculation can be educational
require_hedging_for_uncertainty=True,
),
)
# Internal HR assistant
HR_BOUNDARIES = BoundaryConfig(
name="hr_assistant",
description="Internal HR policy and benefits assistant",
topic=TopicBoundary(
allowed_topics=[
"company policies", "benefits enrollment",
"time off requests", "payroll questions",
"workplace guidelines", "training programs",
],
blocked_topics=[
"salary negotiation advice", "legal advice",
"performance reviews of specific employees",
"confidential personnel actions",
"union activities", "medical diagnoses",
],
),
format=FormatBoundary(
max_length=2000,
allowed_formats=["text", "markdown"],
blocked_patterns=[r"```"], # No code blocks in HR context
),
content=ContentBoundary(
no_personal_opinions=True,
no_speculation=True, # HR answers must be based on policy
require_hedging_for_uncertainty=True,
),
enforcement_level="strict",
)
Documenting Boundary Rationale
For each blocked topic, document why it is blocked. This serves two purposes: it helps maintainers understand whether a boundary is still relevant, and it provides justification when users ask why a question was redirected.
BOUNDARY_RATIONALE = {
"medical advice": {
"reason": "Liability risk -- incorrect medical information could cause physical harm",
"redirect": "Please consult a healthcare professional for medical questions.",
"examples_blocked": [
"What medication should I take for headaches?",
"Is this rash something I should worry about?",
],
"examples_allowed": [
"This billing issue is giving me a headache.", # Figurative use
"I need to return my first aid kit.", # Product-related
],
},
"legal advice": {
"reason": "Unauthorized practice of law -- only licensed attorneys can provide legal advice",
"redirect": "For legal questions, please consult a qualified attorney.",
"examples_blocked": [
"Can I sue the company for this defective product?",
"What are my legal rights regarding this return?",
],
"examples_allowed": [
"What is your return policy?", # Company policy, not legal advice
"How long do I have to return this item?", # Policy question
],
},
}
Step 2: Build the Boundary Checker
# boundaries/checker.py
"""
Boundary violation checker for LLM responses.
"""
import re
from dataclasses import dataclass, field
from boundaries.definition import BoundaryConfig, BoundaryType
@dataclass
class Violation:
boundary_type: BoundaryType
description: str
severity: str
text_span: str = ""
@dataclass
class BoundaryCheckResult:
within_bounds: bool
violations: list[Violation] = field(default_factory=list)
risk_score: float = 0.0
class BoundaryChecker:
def __init__(self, config: BoundaryConfig, embedding_model=None):
self.config = config
self.embedding_model = embedding_model
self._prepare_topic_keywords()
def _prepare_topic_keywords(self):
self.blocked_keywords = {}
keyword_map = {
"medical advice": ["diagnosis", "symptom", "medication", "dosage", "treatment plan"],
"legal advice": ["lawsuit", "liability", "sue", "legal rights", "attorney"],
"financial advice": ["invest", "stock", "portfolio", "tax strategy", "retirement fund"],
"political opinions": ["vote for", "political party", "liberal", "conservative"],
"competitor products": [], # Populated per-deployment
}
for topic in self.config.topic.blocked_topics:
self.blocked_keywords[topic] = keyword_map.get(topic, [])
def check(self, response: str) -> BoundaryCheckResult:
violations = []
# Topic boundary checks
violations.extend(self._check_topics(response))
# Format boundary checks
violations.extend(self._check_format(response))
# Content boundary checks
violations.extend(self._check_content(response))
risk = min(sum(
0.3 if v.severity == "high" else 0.15 for v in violations
), 1.0)
return BoundaryCheckResult(
within_bounds=len(violations) == 0,
violations=violations,
risk_score=risk,
)
def _check_topics(self, text: str) -> list[Violation]:
violations = []
text_lower = text.lower()
for topic, keywords in self.blocked_keywords.items():
matched = [kw for kw in keywords if kw in text_lower]
if len(matched) >= 2:
violations.append(Violation(
boundary_type=BoundaryType.TOPIC,
description=f"Response discusses blocked topic: {topic}",
severity="high",
text_span=", ".join(matched),
))
return violations
def _check_format(self, text: str) -> list[Violation]:
violations = []
if len(text) > self.config.format.max_length:
violations.append(Violation(
boundary_type=BoundaryType.FORMAT,
description=f"Response exceeds max length ({len(text)} > {self.config.format.max_length})",
severity="medium",
))
for pattern in self.config.format.blocked_patterns:
if re.search(pattern, text):
violations.append(Violation(
boundary_type=BoundaryType.FORMAT,
description=f"Response contains blocked format pattern: {pattern}",
severity="medium",
))
return violations
def _check_content(self, text: str) -> list[Violation]:
violations = []
if self.config.content.no_personal_opinions:
opinion_markers = [
"I think", "I believe", "in my opinion",
"I feel that", "personally, I",
]
for marker in opinion_markers:
if marker.lower() in text.lower():
violations.append(Violation(
boundary_type=BoundaryType.CONTENT,
description="Response contains personal opinion markers",
severity="low",
text_span=marker,
))
break
return violations
Improving Keyword Detection Accuracy
The basic keyword approach above has a significant limitation: it produces false positives when keywords appear in unrelated contexts. The word "stock" could mean "in stock" (product availability) or "stock market" (financial advice). Improve accuracy by using keyword phrases with context:
class ContextAwareKeywordChecker:
"""Check for blocked keywords with surrounding context to reduce false positives."""
def __init__(self):
# Each entry: (keyword, required_context_words, excluded_context_words)
self.rules = {
"medical advice": [
("symptom", ["experiencing", "suffering", "diagnosed"], ["product", "issue"]),
("medication", ["take", "prescribe", "dosage"], ["allergies"]),
("diagnosis", ["medical", "doctor", "condition"], ["problem", "issue", "troubleshoot"]),
("treatment", ["medical", "therapy", "clinical"], ["returns", "refund", "order"]),
],
"financial advice": [
("invest", ["portfolio", "returns", "market"], ["time", "effort"]),
("stock", ["market", "portfolio", "shares", "buy"], ["in stock", "out of stock", "stock level"]),
("tax", ["strategy", "deduction", "filing"], ["sales tax", "tax included"]),
],
}
def check(self, text: str, topic: str) -> list[str]:
"""Return list of matched keywords with context validation."""
text_lower = text.lower()
matches = []
for keyword, required_ctx, excluded_ctx in self.rules.get(topic, []):
if keyword not in text_lower:
continue
# Check if any excluded context is present (false positive signal)
if any(exc in text_lower for exc in excluded_ctx):
continue
# Require at least one context word to confirm the topic
if any(ctx in text_lower for ctx in required_ctx):
matches.append(keyword)
return matches
This approach dramatically reduces false positives. "This item is currently in stock" no longer triggers the financial advice boundary, while "You should invest in stocks for long-term returns" correctly does.
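As a standalone sanity check, the gating logic condenses to the following sketch (it reimplements the "financial advice" rules above so it runs on its own):

```python
# Condensed, runnable version of the context-gated keyword check for
# the "financial advice" rules above.
FINANCIAL_RULES = [
    # (keyword, required context words, excluded context words)
    ("invest", ["portfolio", "returns", "market"], ["time", "effort"]),
    ("stock", ["market", "portfolio", "shares", "buy"],
     ["in stock", "out of stock", "stock level"]),
]

def matched_keywords(text: str) -> list[str]:
    t = text.lower()
    hits = []
    for kw, required, excluded in FINANCIAL_RULES:
        if kw not in t:
            continue
        if any(exc in t for exc in excluded):  # false-positive signal
            continue
        if any(ctx in t for ctx in required):  # confirm the topic
            hits.append(kw)
    return hits

# Product availability does not trigger the boundary...
print(matched_keywords("This item is currently in stock."))  # []
# ...but investment advice does
print(matched_keywords("You should buy growth stocks for your portfolio."))  # ['stock']
```

Note that plain substring matching still has sharp edges -- for instance, "invest in stocks" contains the excluded phrase "in stock" -- so rules need tuning against real traffic.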
Step 3: Implement Response Correction
# boundaries/corrector.py
"""
Response correction for boundary violations.
"""
from boundaries.checker import BoundaryCheckResult, BoundaryType
class ResponseCorrector:
def __init__(self, llm_client=None):
self.llm = llm_client
def correct(
self, response: str, check_result: BoundaryCheckResult
) -> dict:
if check_result.within_bounds:
return {"output": response, "corrected": False}
high_severity = [
v for v in check_result.violations if v.severity == "high"
]
if high_severity:
return {
"output": self._get_safe_fallback(high_severity),
"corrected": True,
"action": "replaced",
}
# For medium/low violations, truncate over-length responses
corrected = response
for violation in check_result.violations:
if violation.boundary_type == BoundaryType.FORMAT:
if "max length" in violation.description:
# Hardcoded here; ideally read config.format.max_length
corrected = corrected[:1500] + "..."
return {
"output": corrected,
"corrected": corrected != response,
"action": "trimmed" if corrected != response else "unchanged",
}
def _get_safe_fallback(self, violations) -> str:
# The violations could select a topic-specific redirect (see
# BOUNDARY_RATIONALE); this sketch returns one generic fallback
return (
"I can only help with questions about our products, "
"orders, shipping, returns, and account management. "
"For other inquiries, please contact the appropriate "
"professional service."
)
LLM-Powered Response Rewriting
For medium-severity violations where you want to preserve the helpful parts of the response while removing the boundary violation, use an LLM to rewrite:
class LLMResponseCorrector:
"""Use an LLM to rewrite responses that violate boundaries."""
def __init__(self, llm_client, boundary_config: BoundaryConfig):
self.llm = llm_client
self.config = boundary_config
async def rewrite(
self, response: str, violations: list
) -> str:
"""Rewrite a response to remove boundary violations while preserving useful content."""
violation_descriptions = "\n".join(
f"- {v.description} (severity: {v.severity})"
for v in violations
)
allowed_topics = ", ".join(self.config.topic.allowed_topics)
rewrite_prompt = f"""You are a response filter for a {self.config.description}.
The following response contains boundary violations:
VIOLATIONS:
{violation_descriptions}
ORIGINAL RESPONSE:
{response}
Rewrite this response to:
1. Remove all content related to the violations listed above
2. Keep any helpful content that falls within the allowed scope: {allowed_topics}
3. If the entire response is out of scope, replace it with a polite redirect
4. Do not add information that was not in the original response
5. Keep the same tone and style as the original
Rewritten response:"""
rewritten = await self.llm.generate(rewrite_prompt)
return rewritten
Graduated Enforcement Strategies
Different violation severities warrant different responses. Implement a graduated enforcement pipeline:
class GraduatedEnforcer:
"""Apply different enforcement actions based on violation severity."""
def __init__(self, corrector: ResponseCorrector, llm_corrector: LLMResponseCorrector = None):
self.corrector = corrector
self.llm_corrector = llm_corrector
async def enforce(
self, response: str, check_result: BoundaryCheckResult
) -> dict:
if check_result.within_bounds:
return {"output": response, "action": "pass", "corrected": False}
severity_rank = {"low": 0, "medium": 1, "high": 2}
# max() over the raw strings would compare alphabetically and rank
# "high" lowest, so rank severities explicitly
max_severity = max(
(v.severity for v in check_result.violations),
key=severity_rank.get,
)
if max_severity == "low":
# Log but allow through with minor adjustments
return {
"output": response,
"action": "warn",
"corrected": False,
"warnings": [v.description for v in check_result.violations],
}
elif max_severity == "medium":
# Attempt LLM rewrite to preserve useful content
if self.llm_corrector:
rewritten = await self.llm_corrector.rewrite(
response, check_result.violations
)
return {
"output": rewritten,
"action": "rewrite",
"corrected": True,
}
else:
# Fall back to truncation
return self.corrector.correct(response, check_result)
else: # high severity
# Complete replacement with safe fallback
return {
"output": self.corrector._get_safe_fallback(check_result.violations),
"action": "block",
"corrected": True,
}
Step 4: Add Embedding-Based Topic Detection
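Embedding-based detection maps the response and each topic label into a shared vector space and flags similarity to blocked topics. With unit-normalized embeddings, cosine similarity reduces to the plain dot product the checker below uses; here is a toy sketch of that math (illustrative vectors, not real model embeddings):

```python
import numpy as np

def normalize(v: np.ndarray) -> np.ndarray:
    """Scale a vector to unit length so dot product == cosine similarity."""
    return v / np.linalg.norm(v)

# Toy 3-d "embeddings" standing in for a real model's output
topic = normalize(np.array([0.9, 0.1, 0.2]))
on_topic = normalize(np.array([0.85, 0.15, 0.25]))
off_topic = normalize(np.array([0.1, 0.9, 0.1]))

print(round(float(np.dot(topic, on_topic)), 3))   # close to 1.0
print(round(float(np.dot(topic, off_topic)), 3))  # much lower
```

If your embedding model does not return unit-length vectors, normalize them before storing or comparing, or the similarity threshold becomes meaningless.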
# boundaries/topic_embeddings.py
"""
Embedding-based topic boundary detection for nuanced topic matching.
"""
import numpy as np
class EmbeddingTopicChecker:
def __init__(self, embedding_model):
self.model = embedding_model
self.topic_vectors = {}
def register_topics(
self, allowed: list[str], blocked: list[str]
) -> None:
for topic in allowed:
self.topic_vectors[topic] = {
"vector": self.model.embed(topic),
"allowed": True,
}
for topic in blocked:
self.topic_vectors[topic] = {
"vector": self.model.embed(topic),
"allowed": False,
}
def check_response(
self, response: str, threshold: float = 0.65
) -> dict:
response_vector = self.model.embed(response)
results = {}
for topic, info in self.topic_vectors.items():
# Dot product equals cosine similarity only if embeddings are
# unit-normalized; normalize first if the model does not
similarity = float(np.dot(response_vector, info["vector"]))
if similarity >= threshold:
results[topic] = {
"similarity": round(similarity, 3),
"allowed": info["allowed"],
}
blocked_matches = {
k: v for k, v in results.items() if not v["allowed"]
}
return {
"in_bounds": len(blocked_matches) == 0,
"matched_topics": results,
"blocked_matches": blocked_matches,
}
Improving Embedding Detection with Expanded Topic Descriptions
Single-word or short-phrase topic labels produce weak embeddings. Expand each topic into a descriptive paragraph for more accurate matching:
EXPANDED_TOPIC_DESCRIPTIONS = {
"medical advice": (
"Medical advice including diagnoses, treatment recommendations, "
"medication dosages, symptom interpretation, health conditions, "
"and clinical guidance that should come from a healthcare professional."
),
"legal advice": (
"Legal advice including interpretations of law, litigation strategy, "
"rights and obligations, contract review, liability assessment, "
"and guidance that should come from a licensed attorney."
),
"financial advice": (
"Financial advice including investment recommendations, portfolio "
"management, tax planning strategies, retirement planning, "
"and guidance that should come from a licensed financial advisor."
),
"product information": (
"Information about our products including features, specifications, "
"pricing, availability, compatibility, usage instructions, "
"and comparison between our product lines."
),
}
class EnhancedEmbeddingChecker(EmbeddingTopicChecker):
"""Use expanded topic descriptions for more accurate embedding matching."""
def register_topics_with_descriptions(
self,
allowed: list[str],
blocked: list[str],
descriptions: dict[str, str],
) -> None:
for topic in allowed:
desc = descriptions.get(topic, topic)
self.topic_vectors[topic] = {
"vector": self.model.embed(desc),
"allowed": True,
}
for topic in blocked:
desc = descriptions.get(topic, topic)
self.topic_vectors[topic] = {
"vector": self.model.embed(desc),
"allowed": False,
}
Combining Keyword and Embedding Detection
The strongest detection combines both approaches: keywords catch obvious violations quickly, and embeddings catch subtle topic drift that keywords miss.
class HybridTopicChecker:
"""Combine keyword and embedding checks for robust topic detection."""
def __init__(
self,
keyword_checker: BoundaryChecker,
embedding_checker: EmbeddingTopicChecker,
):
self.keyword_checker = keyword_checker
self.embedding_checker = embedding_checker
def check(self, response: str) -> BoundaryCheckResult:
# Fast keyword check first
keyword_result = self.keyword_checker.check(response)
# If keywords already flagged high severity, no need for embeddings
high_severity = any(
v.severity == "high" for v in keyword_result.violations
)
if high_severity:
return keyword_result
# Embedding check for subtle topic drift
embedding_result = self.embedding_checker.check_response(response)
# Merge results
violations = list(keyword_result.violations)
for topic, match_info in embedding_result.get("blocked_matches", {}).items():
violations.append(Violation(
boundary_type=BoundaryType.TOPIC,
description=f"Response semantically similar to blocked topic: {topic} "
f"(similarity: {match_info['similarity']:.2f})",
severity="medium" if match_info["similarity"] < 0.8 else "high",
))
risk = min(sum(
0.3 if v.severity == "high" else 0.15 for v in violations
), 1.0)
return BoundaryCheckResult(
within_bounds=len(violations) == 0,
violations=violations,
risk_score=risk,
)
Step 5: Build the Enforcement Service
# boundaries/service.py
from fastapi import FastAPI
from pydantic import BaseModel
from boundaries.definition import CUSTOMER_SUPPORT_BOUNDARIES
from boundaries.checker import BoundaryChecker
from boundaries.corrector import ResponseCorrector
app = FastAPI(title="Response Boundary Enforcement")
checker = BoundaryChecker(CUSTOMER_SUPPORT_BOUNDARIES)
corrector = ResponseCorrector()
class EnforceRequest(BaseModel):
response: str
session_id: str = ""
class EnforceResponse(BaseModel):
output: str
within_bounds: bool
corrected: bool
violations: list[dict]
@app.post("/enforce", response_model=EnforceResponse)
async def enforce_boundaries(request: EnforceRequest):
result = checker.check(request.response)
correction = corrector.correct(request.response, result)
return EnforceResponse(
output=correction["output"],
within_bounds=result.within_bounds,
corrected=correction.get("corrected", False),
violations=[
{"type": v.boundary_type.value, "description": v.description}
for v in result.violations
],
)
uvicorn boundaries.service:app --port 8530
Integrating with Your LLM Application
The enforcement service sits between your LLM and the user. Here is how to integrate it into a typical chat application:
import httpx
class BoundaryEnforcedChat:
"""Chat client that enforces response boundaries."""
def __init__(self, llm_client, enforcer_url: str = "http://localhost:8530"):
self.llm = llm_client
self.enforcer_url = enforcer_url
self.http_client = httpx.AsyncClient(timeout=5.0)
async def chat(self, user_message: str, session_id: str = "") -> dict:
"""Send a message and return the boundary-enforced response."""
# Step 1: Get the raw LLM response
raw_response = await self.llm.generate(user_message)
# Step 2: Check and enforce boundaries
try:
enforcement = await self.http_client.post(
f"{self.enforcer_url}/enforce",
json={"response": raw_response, "session_id": session_id},
)
enforcement.raise_for_status()
result = enforcement.json()
except httpx.HTTPError:
# Enforcer unreachable: this sketch fails open and returns the
# raw response; a stricter policy would fail closed and return
# a generic fallback message instead
result = {
"output": raw_response,
"within_bounds": True,
"corrected": False,
"violations": [],
}
return {
"response": result["output"],
"was_corrected": result["corrected"],
"violation_count": len(result["violations"]),
}
Health Checks and Latency Monitoring
Add health check and metrics endpoints to monitor the enforcement service in production:
from datetime import datetime
from collections import deque
# Track recent latencies for monitoring
_latencies = deque(maxlen=1000)
_violation_count = 0
_request_count = 0
@app.get("/health")
async def health_check():
return {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"requests_processed": _request_count,
"violations_detected": _violation_count,
"avg_latency_ms": (
sum(_latencies) / len(_latencies) * 1000
if _latencies else 0
),
}
Step 6: Monitor Boundary Violations
# boundaries/monitoring.py
"""
Track boundary violation patterns over time.
"""
from collections import Counter, defaultdict
import logging
class BoundaryMonitor:
def __init__(self):
self.violations = Counter()
self.session_violations = defaultdict(list)
self.logger = logging.getLogger("boundary_monitor")
def record(self, session_id: str, result) -> None:
for v in result.violations:
self.violations[v.boundary_type.value] += 1
self.session_violations[session_id].append(v)
# Alert on repeated violations from same session
if len(self.session_violations[session_id]) >= 3:
self.logger.warning(
f"Session {session_id} has {len(self.session_violations[session_id])} "
f"boundary violations -- possible injection campaign"
)
def report(self) -> dict:
return {
"violation_counts": dict(self.violations),
"sessions_with_violations": len(self.session_violations),
}
Detecting Injection Campaigns
A single boundary violation might be an innocent user asking an off-topic question. But a pattern of violations from the same session -- especially escalating ones -- suggests a deliberate prompt injection attempt. Implement pattern detection:
from datetime import datetime, timedelta
from dataclasses import dataclass
@dataclass
class InjectionAlert:
session_id: str
violation_count: int
time_window: float # seconds
severity: str
description: str
class InjectionCampaignDetector:
"""Detect potential prompt injection campaigns from violation patterns."""
def __init__(
self,
alert_threshold: int = 3,
time_window_seconds: float = 300,
escalation_threshold: int = 5,
):
self.alert_threshold = alert_threshold
self.time_window = time_window_seconds
self.escalation_threshold = escalation_threshold
self.session_history = defaultdict(list)
self.alerts = []
def record_violation(
self, session_id: str, violation, timestamp: datetime = None
) -> InjectionAlert | None:
"""Record a violation and return an alert if a campaign is detected."""
ts = timestamp or datetime.now()
self.session_history[session_id].append({
"timestamp": ts,
"violation": violation,
})
# Filter to recent violations within the time window
cutoff = ts - timedelta(seconds=self.time_window)
recent = [
v for v in self.session_history[session_id]
if v["timestamp"] >= cutoff
]
if len(recent) >= self.escalation_threshold:
alert = InjectionAlert(
session_id=session_id,
violation_count=len(recent),
time_window=self.time_window,
severity="critical",
description=(
f"Session {session_id[:8]} triggered {len(recent)} violations "
f"in {self.time_window}s -- likely active injection campaign"
),
)
self.alerts.append(alert)
return alert
elif len(recent) >= self.alert_threshold:
alert = InjectionAlert(
session_id=session_id,
violation_count=len(recent),
time_window=self.time_window,
severity="warning",
description=(
f"Session {session_id[:8]} triggered {len(recent)} violations "
f"in {self.time_window}s -- possible injection attempt"
),
)
self.alerts.append(alert)
return alert
return None
Monitoring Dashboard
Track violation trends over time to identify systemic issues:
from collections import deque
class ViolationTrendTracker:
"""Track violation rates over time to identify trends."""
def __init__(self, window_size: int = 100):
self.window_size = window_size
self.recent_checks = deque(maxlen=window_size)
def record_check(self, had_violation: bool) -> None:
self.recent_checks.append(had_violation)
@property
def violation_rate(self) -> float:
"""Current violation rate over the sliding window."""
if not self.recent_checks:
return 0.0
return sum(self.recent_checks) / len(self.recent_checks)
def is_elevated(self, baseline_rate: float = 0.05) -> bool:
"""Check if the current violation rate is elevated above baseline."""
return self.violation_rate > baseline_rate * 2
def summary(self) -> dict:
return {
"window_size": self.window_size,
"checks_recorded": len(self.recent_checks),
"violation_rate": round(self.violation_rate, 4),
"is_elevated": self.is_elevated(),
}
Step 7: Test Boundary Enforcement
# tests/test_boundaries.py
import pytest
from boundaries.definition import CUSTOMER_SUPPORT_BOUNDARIES
from boundaries.checker import BoundaryChecker
@pytest.fixture
def checker():
return BoundaryChecker(CUSTOMER_SUPPORT_BOUNDARIES)
def test_on_topic_passes(checker):
result = checker.check("Your order #12345 shipped on March 10th.")
assert result.within_bounds
def test_medical_advice_blocked(checker):
result = checker.check(
"Based on your symptoms and diagnosis, I recommend this medication dosage."
)
assert not result.within_bounds
assert any("medical" in v.description.lower() for v in result.violations)
def test_length_violation(checker):
result = checker.check("word " * 500)
assert any(v.boundary_type.value == "format" for v in result.violations)
def test_opinion_detected(checker):
result = checker.check("I think our product is the best on the market.")
assert any(v.boundary_type.value == "content" for v in result.violations)
pytest tests/test_boundaries.py -v
Testing Edge Cases
The most important tests cover edge cases where boundaries are ambiguous:
def test_figurative_medical_language_allowed(checker):
"""Figurative use of medical terms should not trigger blocking."""
result = checker.check(
"Your order is on its way! I hope this cures your waiting anxiety."
)
# "cures" alone should not trigger medical advice blocking
assert result.within_bounds
def test_product_with_health_keywords_allowed(checker):
"""Products with health-adjacent names should be discussable."""
result = checker.check(
"The Wellness Tracker Pro is currently in stock and ships within 2 days."
)
assert result.within_bounds
def test_borderline_financial_topic(checker):
"""Questions about payment methods are allowed; investment advice is not."""
# Allowed: payment methods
result = checker.check(
"You can pay with credit card, debit card, or PayPal."
)
assert result.within_bounds
# Blocked: investment advice
result = checker.check(
"I recommend investing in growth stocks for your portfolio's long-term returns."
)
assert not result.within_bounds
def test_multiple_violations_increase_risk(checker):
"""Multiple violations should produce a higher risk score."""
single_violation = checker.check(
"Based on your symptoms, this medication dosage should help."
)
multiple_violations = checker.check(
"I think you should invest in stocks. Based on your symptoms, "
"take this medication dosage for your diagnosis. "
+ "x" * 2000 # Also triggers length violation
)
assert multiple_violations.risk_score > single_violation.risk_score
def test_empty_response(checker):
"""Empty responses should pass boundary checks."""
result = checker.check("")
assert result.within_bounds
def test_unicode_and_special_characters(checker):
"""Boundary checks should handle unicode text gracefully."""
result = checker.check(
"Your order for the caf\u00e9 set is confirmed! \U0001f4e6"
)
assert result.within_bounds
Integration Testing the Full Pipeline
Test the complete flow from raw response through checking, correction, and output:
def test_full_enforcement_pipeline():
"""Test the complete boundary enforcement pipeline."""
checker = BoundaryChecker(CUSTOMER_SUPPORT_BOUNDARIES)
corrector = ResponseCorrector()
# Scenario 1: Clean response passes through unchanged
clean = "Your order ships tomorrow via standard delivery."
result = checker.check(clean)
correction = corrector.correct(clean, result)
assert not correction["corrected"]
assert correction["output"] == clean
# Scenario 2: High-severity violation gets replaced
medical = "Based on your symptoms and diagnosis, take 500mg of aspirin daily."
result = checker.check(medical)
correction = corrector.correct(medical, result)
assert correction["corrected"]
assert "products" in correction["output"].lower() # Safe fallback
assert "aspirin" not in correction["output"] # Medical content removed
# Scenario 3: Format violation gets trimmed
long_response = "Your order details: " + "This is additional information. " * 200
result = checker.check(long_response)
correction = corrector.correct(long_response, result)
assert correction["corrected"]
assert len(correction["output"]) <= 1510 # 1500 + "..."
Related Topics
- Output Content Classifier -- Harm-focused output classification
- Structured Output Validation -- Schema-based output checks
- Content Policy Enforcement -- Setting up content policies
- LLM Judge Implementation -- Using LLMs to judge boundary compliance
A customer support bot receives the query 'What medications help with headaches?' and generates a response discussing OTC pain relievers. How should the boundary enforcer handle this?
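Tracing that example through the pipeline built above: the draft response discusses OTC pain relievers, so the medical-advice keyword check matches "medication", "dosage", and "symptom" at high severity, and the corrector replaces the whole response with the safe fallback (plus, ideally, the healthcare-professional redirect from BOUNDARY_RATIONALE). A condensed, runnable trace of that decision:

```python
# Condensed trace: two or more medical keywords trigger the
# high-severity path, which replaces the response entirely.
MEDICAL_KEYWORDS = ["diagnosis", "symptom", "medication", "dosage", "treatment plan"]

def enforce(draft: str) -> str:
    matched = [kw for kw in MEDICAL_KEYWORDS if kw in draft.lower()]
    if len(matched) >= 2:  # same two-keyword threshold as the checker
        return (
            "I can only help with questions about our products, orders, "
            "shipping, returns, and account management."
        )
    return draft

draft = (
    "For headaches, a common medication is ibuprofen; the right dosage "
    "depends on your symptoms."
)
print(enforce(draft))  # the safe fallback, not the medical content
```

In contrast, a figurative use like "this billing issue is giving me a headache" matches no medical keywords and passes through untouched -- exactly the graduated behavior the walkthrough is built around.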