Enforcing Response Boundaries
A step-by-step walkthrough for keeping LLM responses within defined topic, format, and content boundaries, covering boundary definition, violation detection, response rewriting, and monitoring for boundary drift.
LLM applications are deployed for specific purposes: a customer support bot should discuss products and policies, not give medical advice or generate code. Without boundary enforcement, prompt injection can steer the model toward arbitrary topics. This walkthrough builds a boundary enforcement system that detects when responses drift outside their defined scope and corrects them before they reach the user.
The fundamental challenge is balancing security with usability. Boundaries that are too strict make the application frustrating to use: imagine a customer support bot that blocks the word "headache" even when a customer says "this billing issue is giving me a headache." Boundaries that are too loose leave the application vulnerable to topic hijacking. The approach in this walkthrough addresses this tension through graduated enforcement and layered detection.
Step 1: Define application boundaries
The first step is to document what your application should and should not do. This looks simple, but poorly defined boundaries are the root cause of most enforcement failures. A boundary such as "no medical advice" is too vague: does "drink water if you feel dehydrated" fall under it? Define each boundary with specific examples of what falls inside and outside it.
# boundaries/definition.py
"""
Boundary definitions for LLM application scope.
"""
from dataclasses import dataclass, field
from enum import Enum


class BoundaryType(str, Enum):
    TOPIC = "topic"
    FORMAT = "format"
    CONTENT = "content"
    LENGTH = "length"


@dataclass
class TopicBoundary:
    allowed_topics: list[str]
    blocked_topics: list[str]
    topic_embeddings: dict = field(default_factory=dict)


@dataclass
class FormatBoundary:
    max_length: int = 2000
    allowed_formats: list[str] = field(default_factory=lambda: ["text", "markdown"])
    blocked_patterns: list[str] = field(default_factory=list)


@dataclass
class ContentBoundary:
    no_personal_opinions: bool = True
    no_speculation: bool = False
    require_hedging_for_uncertainty: bool = True
    allowed_languages: list[str] = field(default_factory=lambda: ["en"])


@dataclass
class BoundaryConfig:
    name: str
    description: str
    topic: TopicBoundary
    format: FormatBoundary
    content: ContentBoundary
    enforcement_level: str = "strict"  # strict, moderate, lenient


# Example configuration for a customer support bot
CUSTOMER_SUPPORT_BOUNDARIES = BoundaryConfig(
    name="customer_support",
    description="Customer support assistant for an e-commerce platform",
    topic=TopicBoundary(
        allowed_topics=[
            "product information", "order status", "shipping",
            "returns and refunds", "account management",
            "payment methods", "promotions and discounts",
        ],
        blocked_topics=[
            "medical advice", "legal advice", "financial advice",
            "political opinions", "competitor products",
            "internal company data", "employee information",
        ],
    ),
    format=FormatBoundary(
        max_length=1500,
        allowed_formats=["text", "markdown"],
        blocked_patterns=[
            r"```(?:python|bash|javascript|sql)",  # No code blocks
        ],
    ),
    content=ContentBoundary(
        no_personal_opinions=True,
        require_hedging_for_uncertainty=True,
    ),
)

Boundary configuration for different application types
The customer support example above is only one pattern. Different applications need different boundary configurations. Here are two more examples:
# Educational tutoring assistant
TUTORING_BOUNDARIES = BoundaryConfig(
    name="tutoring",
    description="Math and science tutoring for high school students",
    topic=TopicBoundary(
        allowed_topics=[
            "mathematics", "algebra", "geometry", "calculus",
            "physics", "chemistry", "biology",
            "study techniques", "homework help",
        ],
        blocked_topics=[
            "weapons", "drugs", "violence",
            "political opinions", "religious opinions",
            "personal relationships", "social media",
        ],
    ),
    format=FormatBoundary(
        max_length=3000,  # Longer for educational explanations
        allowed_formats=["text", "markdown", "latex"],
        blocked_patterns=[],  # Code blocks are OK for math
    ),
    content=ContentBoundary(
        no_personal_opinions=True,
        no_speculation=False,  # Speculation can be educational
        require_hedging_for_uncertainty=True,
    ),
)

# Internal HR assistant
HR_BOUNDARIES = BoundaryConfig(
    name="hr_assistant",
    description="Internal HR policy and benefits assistant",
    topic=TopicBoundary(
        allowed_topics=[
            "company policies", "benefits enrollment",
            "time off requests", "payroll questions",
            "workplace guidelines", "training programs",
        ],
        blocked_topics=[
            "salary negotiation advice", "legal advice",
            "performance reviews of specific employees",
            "confidential personnel actions",
            "union activities", "medical diagnoses",
        ],
    ),
    format=FormatBoundary(
        max_length=2000,
        allowed_formats=["text", "markdown"],
        blocked_patterns=[r"```"],  # No code blocks in HR context
    ),
    content=ContentBoundary(
        no_personal_opinions=True,
        no_speculation=True,  # HR answers must be based on policy
        require_hedging_for_uncertainty=True,
    ),
    enforcement_level="strict",
)

Documenting the rationale behind boundaries
For each blocked topic, document why it is blocked. This serves two purposes: it helps maintainers judge whether a boundary is still relevant, and it provides a justification when users ask why a question was redirected.
BOUNDARY_RATIONALE = {
    "medical advice": {
        "reason": "Liability risk -- incorrect medical information could cause physical harm",
        "redirect": "Please consult a healthcare professional for medical questions.",
        "examples_blocked": [
            "What medication should I take for headaches?",
            "Is this rash something I should worry about?",
        ],
        "examples_allowed": [
            "This billing issue is giving me a headache.",  # Figurative use
            "I need to return my first aid kit.",  # Product-related
        ],
    },
    "legal advice": {
        "reason": "Unauthorized practice of law -- only licensed attorneys can provide legal advice",
        "redirect": "For legal questions, please consult a qualified attorney.",
        "examples_blocked": [
            "Can I sue the company for this defective product?",
            "What are my legal rights regarding this return?",
        ],
        "examples_allowed": [
            "What is your return policy?",  # Company policy, not legal advice
            "How long do I have to return this item?",  # Policy question
        ],
    },
}

Step 2: Build the boundary checker
# boundaries/checker.py
"""
Boundary violation checker for LLM responses.
"""
import re
from dataclasses import dataclass, field

from boundaries.definition import BoundaryConfig, BoundaryType


@dataclass
class Violation:
    boundary_type: BoundaryType
    description: str
    severity: str  # "low", "medium", or "high"
    text_span: str = ""


@dataclass
class BoundaryCheckResult:
    within_bounds: bool
    violations: list[Violation] = field(default_factory=list)
    risk_score: float = 0.0


class BoundaryChecker:
    def __init__(self, config: BoundaryConfig, embedding_model=None):
        self.config = config
        self.embedding_model = embedding_model
        self._prepare_topic_keywords()

    def _prepare_topic_keywords(self):
        self.blocked_keywords = {}
        keyword_map = {
            "medical advice": ["diagnosis", "symptom", "medication", "dosage", "treatment plan"],
            "legal advice": ["lawsuit", "liability", "sue", "legal rights", "attorney"],
            "financial advice": ["invest", "stock", "portfolio", "tax strategy", "retirement fund"],
            "political opinions": ["vote for", "political party", "liberal", "conservative"],
            "competitor products": [],  # Populated per-deployment
        }
        # Blocked topics without an entry in keyword_map get no keyword coverage
        for topic in self.config.topic.blocked_topics:
            self.blocked_keywords[topic] = keyword_map.get(topic, [])

    def check(self, response: str) -> BoundaryCheckResult:
        violations = []
        # Topic boundary checks
        violations.extend(self._check_topics(response))
        # Format boundary checks
        violations.extend(self._check_format(response))
        # Content boundary checks
        violations.extend(self._check_content(response))
        # Each high-severity violation adds 0.3, any other adds 0.15; capped at 1.0
        risk = min(sum(
            0.3 if v.severity == "high" else 0.15 for v in violations
        ), 1.0)
        return BoundaryCheckResult(
            within_bounds=len(violations) == 0,
            violations=violations,
            risk_score=risk,
        )

    def _check_topics(self, text: str) -> list[Violation]:
        violations = []
        text_lower = text.lower()
        for topic, keywords in self.blocked_keywords.items():
            # Match at the start of a word so "sue" does not fire on "issue",
            # while "symptom" still matches "symptoms".
            matched = [
                kw for kw in keywords
                if re.search(r"\b" + re.escape(kw), text_lower)
            ]
            # Require two distinct keywords to limit single-word false positives
            if len(matched) >= 2:
                violations.append(Violation(
                    boundary_type=BoundaryType.TOPIC,
                    description=f"Response discusses blocked topic: {topic}",
                    severity="high",
                    text_span=", ".join(matched),
                ))
        return violations

    def _check_format(self, text: str) -> list[Violation]:
        violations = []
        if len(text) > self.config.format.max_length:
            violations.append(Violation(
                boundary_type=BoundaryType.FORMAT,
                description=f"Response exceeds max length ({len(text)} > {self.config.format.max_length})",
                severity="medium",
            ))
        for pattern in self.config.format.blocked_patterns:
            if re.search(pattern, text):
                violations.append(Violation(
                    boundary_type=BoundaryType.FORMAT,
                    description="Response contains blocked format pattern",
                    severity="medium",
                ))
        return violations

    def _check_content(self, text: str) -> list[Violation]:
        violations = []
        if self.config.content.no_personal_opinions:
            opinion_markers = [
                "I think", "I believe", "in my opinion",
                "I feel that", "personally, I",
            ]
            for marker in opinion_markers:
                if marker.lower() in text.lower():
                    violations.append(Violation(
                        boundary_type=BoundaryType.CONTENT,
                        description="Response contains personal opinion markers",
                        severity="low",
                        text_span=marker,
                    ))
                    break  # One opinion violation per response is enough
        return violations

Improving keyword detection accuracy
The basic keyword approach above has an important limitation: it produces false positives when keywords appear in unrelated contexts. The word "stock" can mean "in stock" (product availability) or "stock market" (financial advice). Improve accuracy by pairing each keyword with required and excluded context phrases:
class ContextAwareKeywordChecker:
    """Check for blocked keywords with surrounding context to reduce false positives."""

    def __init__(self):
        # Each entry: (keyword, required_context_words, excluded_context_words)
        self.rules = {
            "medical advice": [
                ("symptom", ["experiencing", "suffering", "diagnosed"], ["product", "issue"]),
                ("medication", ["take", "prescribe", "dosage"], ["allergies"]),
                ("diagnosis", ["medical", "doctor", "condition"], ["problem", "issue", "troubleshoot"]),
                ("treatment", ["medical", "therapy", "clinical"], ["returns", "refund", "order"]),
            ],
            "financial advice": [
                ("invest", ["portfolio", "returns", "market"], ["time", "effort"]),
                ("stock", ["market", "portfolio", "shares", "buy"], ["in stock", "out of stock", "stock level"]),
                ("tax", ["strategy", "deduction", "filing"], ["sales tax", "tax included"]),
            ],
        }

    def check(self, text: str, topic: str) -> list[str]:
        """Return list of matched keywords with context validation."""
        text_lower = text.lower()
        matches = []
        for keyword, required_ctx, excluded_ctx in self.rules.get(topic, []):
            if keyword not in text_lower:
                continue
            # Check if any excluded context is present (false positive signal)
            if any(exc in text_lower for exc in excluded_ctx):
                continue
            # Require at least one context word to confirm the topic
            if any(ctx in text_lower for ctx in required_ctx):
                matches.append(keyword)
        return matches

This approach removes false positives of the kind described above: "This item is currently in stock" no longer triggers the financial advice boundary, while "You should invest in stocks for long-term returns" still does. Note that context phrases are matched as substrings anywhere in the response, not only near the keyword.
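Because the checker above matches substrings across the whole response, an excluded word such as "time" also fires inside "sometimes". A minimal sketch of one possible refinement, assuming a hypothetical `windowed_matches` helper and a reduced `RULES` table (neither is part of the walkthrough's modules), tokenizes the text and only looks for context within a few tokens of the keyword:

```python
import re

# Hypothetical reduced rule table: keyword -> (required context, excluded phrases)
RULES = {
    "stock": (["market", "portfolio", "shares", "buy"], ["in stock", "out of stock"]),
    "invest": (["portfolio", "returns", "market"], ["time", "effort"]),
}

WORD_RE = re.compile(r"[a-z']+")


def windowed_matches(text: str, window: int = 6) -> list[str]:
    """Return keywords confirmed by a context word within `window` tokens."""
    tokens = WORD_RE.findall(text.lower())
    matches = []
    for keyword, (required, excluded) in RULES.items():
        for i, token in enumerate(tokens):
            # Prefix match so "stocks" and "investing" count as keyword hits
            if not token.startswith(keyword):
                continue
            nearby = tokens[max(0, i - window): i + window + 1]
            nearby_text = " ".join(nearby)
            # Excluded phrases must appear as whole words inside the window
            if any(re.search(rf"\b{re.escape(p)}\b", nearby_text) for p in excluded):
                continue
            # Required context must start one of the nearby tokens
            if any(tok.startswith(ctx) for tok in nearby for ctx in required):
                matches.append(keyword)
                break
    return matches


print(windowed_matches("This item is currently in stock."))
print(windowed_matches("Sometimes it pays to invest in the stock market."))
```

The window size of six tokens is an arbitrary starting point; tune it against real transcripts before relying on it.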
Step 3: Implement response correction
# boundaries/corrector.py
"""
Response correction for boundary violations.
"""
from boundaries.checker import BoundaryCheckResult
from boundaries.definition import BoundaryType


class ResponseCorrector:
    def __init__(self, llm_client=None, max_length: int = 1500):
        self.llm = llm_client
        # Truncation target; keep in sync with FormatBoundary.max_length
        self.max_length = max_length

    def correct(
        self, response: str, check_result: BoundaryCheckResult
    ) -> dict:
        if check_result.within_bounds:
            return {"output": response, "corrected": False}

        high_severity = [
            v for v in check_result.violations if v.severity == "high"
        ]
        if high_severity:
            return {
                "output": self._get_safe_fallback(high_severity),
                "corrected": True,
                "action": "replaced",
            }

        # For medium/low violations, only over-length responses are trimmed
        corrected = response
        for violation in check_result.violations:
            if violation.boundary_type == BoundaryType.FORMAT:
                if "max length" in violation.description:
                    corrected = corrected[: self.max_length] + "..."
        # Report a correction only if the text actually changed
        changed = corrected != response
        return {
            "output": corrected,
            "corrected": changed,
            "action": "trimmed" if changed else "flagged",
        }

    def _get_safe_fallback(self, violations) -> str:
        # The same generic message is returned regardless of which topic matched
        return (
            "I can only help with questions about our products, "
            "orders, shipping, returns, and account management. "
            "For other inquiries, please contact the appropriate "
            "professional service."
        )

LLM-driven response rewriting
For medium-severity violations where you want to keep the helpful parts of the response while removing the out-of-bounds content, use an LLM to rewrite it:
from boundaries.definition import BoundaryConfig


class LLMResponseCorrector:
    """Use an LLM to rewrite responses that violate boundaries."""

    def __init__(self, llm_client, boundary_config: BoundaryConfig):
        self.llm = llm_client
        self.config = boundary_config

    async def rewrite(
        self, response: str, violations: list
    ) -> str:
        """Rewrite a response to remove boundary violations while preserving useful content."""
        violation_descriptions = "\n".join(
            f"- {v.description} (severity: {v.severity})"
            for v in violations
        )
        allowed_topics = ", ".join(self.config.topic.allowed_topics)
        # Prompt lines start at column 0 so no indentation leaks into the prompt
        rewrite_prompt = f"""You are a response filter for a {self.config.description}.
The following response contains boundary violations:

VIOLATIONS:
{violation_descriptions}

ORIGINAL RESPONSE:
{response}

Rewrite this response to:
1. Remove all content related to the violations listed above
2. Keep any helpful content that falls within the allowed scope: {allowed_topics}
3. If the entire response is out of scope, replace it with a polite redirect
4. Do not add information that was not in the original response
5. Keep the same tone and style as the original

Rewritten response:"""
        # llm_client is expected to expose an async generate(prompt) method
        rewritten = await self.llm.generate(rewrite_prompt)
        return rewritten

Graduated enforcement strategies
Different violation severities warrant different actions, so enforcement is implemented as a graduated pipeline: low-severity violations are logged, medium-severity ones are rewritten, and high-severity ones are blocked.
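One detail to watch when ranking violations: severity labels in this walkthrough are plain strings, and Python compares strings alphabetically, so `max(["high", "medium"])` returns `"medium"`. A minimal sketch, assuming a hypothetical `Severity` enum and `choose_action` helper that are not part of the walkthrough's modules, shows one way to rank severities explicitly:

```python
from enum import IntEnum


class Severity(IntEnum):
    """Hypothetical ordered severity scale for this sketch."""
    LOW = 1
    MEDIUM = 2
    HIGH = 3


# Enforcement action per worst severity, mirroring the graduated pipeline
ACTIONS = {Severity.LOW: "warn", Severity.MEDIUM: "rewrite", Severity.HIGH: "block"}


def worst_severity(labels: list[str]) -> Severity:
    """Return the most severe level among string labels such as "high"."""
    return max(Severity[label.upper()] for label in labels)


def choose_action(labels: list[str]) -> str:
    """Map a list of violation severities to a single enforcement action."""
    if not labels:
        return "pass"
    return ACTIONS[worst_severity(labels)]


# Alphabetical comparison picks the wrong "maximum"
print(max(["high", "medium", "low"]))
# Ranked comparison picks the most severe label
print(choose_action(["low", "high", "medium"]))
```

An `IntEnum` keeps the ordering in one place, which avoids repeating string comparisons across the checker, corrector, and enforcer.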
# Severities are strings; rank them explicitly because a plain max() would
# compare alphabetically and treat "medium" as worse than "high".
SEVERITY_RANK = {"low": 0, "medium": 1, "high": 2}


class GraduatedEnforcer:
    """Apply different enforcement actions based on violation severity."""

    def __init__(
        self,
        corrector: ResponseCorrector,
        llm_corrector: LLMResponseCorrector | None = None,
    ):
        self.corrector = corrector
        self.llm_corrector = llm_corrector

    async def enforce(
        self, response: str, check_result: BoundaryCheckResult
    ) -> dict:
        if check_result.within_bounds:
            return {"output": response, "action": "pass", "corrected": False}

        max_severity = max(
            (v.severity for v in check_result.violations),
            key=lambda severity: SEVERITY_RANK[severity],
        )

        if max_severity == "low":
            # Record warnings but let the response through unchanged
            return {
                "output": response,
                "action": "warn",
                "corrected": False,
                "warnings": [v.description for v in check_result.violations],
            }
        elif max_severity == "medium":
            # Attempt LLM rewrite to preserve useful content
            if self.llm_corrector:
                rewritten = await self.llm_corrector.rewrite(
                    response, check_result.violations
                )
                return {
                    "output": rewritten,
                    "action": "rewrite",
                    "corrected": True,
                }
            else:
                # Fall back to truncation
                return self.corrector.correct(response, check_result)
        else:  # high severity
            # Complete replacement with safe fallback
            return {
                "output": self.corrector._get_safe_fallback(check_result.violations),
                "action": "block",
                "corrected": True,
            }

Step 4: Add embedding-based topic detection
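Comparing a similarity score against a fixed threshold only makes sense if the score does not depend on vector length. A raw dot product equals cosine similarity only for unit-length vectors, and whether a given embedding model returns normalized vectors is something to verify rather than assume. The sketch below uses a hypothetical `toy_embed` word-count embedder (a stand-in for illustration, not a real model) to show the difference:

```python
import math
from collections import Counter


def toy_embed(text: str) -> dict[str, float]:
    """Stand-in embedder for this sketch: raw word counts, not normalized."""
    return dict(Counter(text.lower().split()))


def dot(a: dict, b: dict) -> float:
    """Dot product of two sparse vectors stored as dicts."""
    return sum(value * b.get(key, 0.0) for key, value in a.items())


def cosine(a: dict, b: dict) -> float:
    """Dot product divided by both vector lengths; 0.0 for empty vectors."""
    norm = math.sqrt(dot(a, a)) * math.sqrt(dot(b, b))
    return dot(a, b) / norm if norm else 0.0


topic = toy_embed("medication dosage advice")
short = toy_embed("medication dosage")
repeated = toy_embed("medication dosage " * 10)

# The dot product grows with text length; cosine similarity does not
print(dot(short, topic), dot(repeated, topic))
print(round(cosine(short, topic), 4), round(cosine(repeated, topic), 4))
```

With unnormalized vectors, a long response can cross a dot-product threshold simply by being long, so normalizing (or using cosine similarity directly) keeps the threshold interpretable.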
# boundaries/topic_embeddings.py
"""
Embedding-based topic boundary detection for nuanced topic matching.
"""
import numpy as np


class EmbeddingTopicChecker:
    def __init__(self, embedding_model):
        # embedding_model is expected to expose embed(text) -> vector
        self.model = embedding_model
        self.topic_vectors = {}

    def register_topics(
        self, allowed: list[str], blocked: list[str]
    ) -> None:
        for topic in allowed:
            self.topic_vectors[topic] = {
                "vector": self.model.embed(topic),
                "allowed": True,
            }
        for topic in blocked:
            self.topic_vectors[topic] = {
                "vector": self.model.embed(topic),
                "allowed": False,
            }

    @staticmethod
    def _cosine(a, b) -> float:
        """Cosine similarity; correct even if the model does not return unit vectors."""
        a = np.asarray(a, dtype=float)
        b = np.asarray(b, dtype=float)
        denom = np.linalg.norm(a) * np.linalg.norm(b)
        return float(np.dot(a, b) / denom) if denom else 0.0

    def check_response(
        self, response: str, threshold: float = 0.65
    ) -> dict:
        response_vector = self.model.embed(response)
        results = {}
        for topic, info in self.topic_vectors.items():
            similarity = self._cosine(response_vector, info["vector"])
            if similarity >= threshold:
                results[topic] = {
                    "similarity": round(similarity, 3),
                    "allowed": info["allowed"],
                }
        blocked_matches = {
            k: v for k, v in results.items() if not v["allowed"]
        }
        return {
            "in_bounds": len(blocked_matches) == 0,
            "matched_topics": results,
            "blocked_matches": blocked_matches,
        }

Improving embedding detection with expanded topic descriptions
Topic labels consisting of a single word or short phrase carry little context, so their embeddings match loosely. Expand each topic into a descriptive paragraph for more accurate matching:
EXPANDED_TOPIC_DESCRIPTIONS = {
    "medical advice": (
        "Medical advice including diagnoses, treatment recommendations, "
        "medication dosages, symptom interpretation, health conditions, "
        "and clinical guidance that should come from a healthcare professional."
    ),
    "legal advice": (
        "Legal advice including interpretations of law, litigation strategy, "
        "rights and obligations, contract review, liability assessment, "
        "and guidance that should come from a licensed attorney."
    ),
    "financial advice": (
        "Financial advice including investment recommendations, portfolio "
        "management, tax planning strategies, retirement planning, "
        "and guidance that should come from a licensed financial advisor."
    ),
    "product information": (
        "Information about our products including features, specifications, "
        "pricing, availability, compatibility, usage instructions, "
        "and comparison between our product lines."
    ),
}


class EnhancedEmbeddingChecker(EmbeddingTopicChecker):
    """Use expanded topic descriptions for more accurate embedding matching."""

    def register_topics_with_descriptions(
        self,
        allowed: list[str],
        blocked: list[str],
        descriptions: dict[str, str],
    ) -> None:
        # Topics without a description fall back to embedding the bare label
        for topic in allowed:
            desc = descriptions.get(topic, topic)
            self.topic_vectors[topic] = {
                "vector": self.model.embed(desc),
                "allowed": True,
            }
        for topic in blocked:
            desc = descriptions.get(topic, topic)
            self.topic_vectors[topic] = {
                "vector": self.model.embed(desc),
                "allowed": False,
            }

Combining keyword and embedding detection
The strongest detection combines both approaches: keywords catch obvious violations quickly, and embeddings catch subtle topic drift that keywords miss.
from boundaries.checker import BoundaryChecker, BoundaryCheckResult, Violation
from boundaries.definition import BoundaryType
from boundaries.topic_embeddings import EmbeddingTopicChecker


class HybridTopicChecker:
    """Combine keyword and embedding checks for robust topic detection."""

    def __init__(
        self,
        keyword_checker: BoundaryChecker,
        embedding_checker: EmbeddingTopicChecker,
    ):
        self.keyword_checker = keyword_checker
        self.embedding_checker = embedding_checker

    def check(self, response: str) -> BoundaryCheckResult:
        # Fast keyword check first
        keyword_result = self.keyword_checker.check(response)

        # If keywords already flagged high severity, no need for embeddings
        high_severity = any(
            v.severity == "high" for v in keyword_result.violations
        )
        if high_severity:
            return keyword_result

        # Embedding check for subtle topic drift
        embedding_result = self.embedding_checker.check_response(response)

        # Merge results
        violations = list(keyword_result.violations)
        for topic, match_info in embedding_result.get("blocked_matches", {}).items():
            violations.append(Violation(
                boundary_type=BoundaryType.TOPIC,
                description=f"Response semantically similar to blocked topic: {topic} "
                f"(similarity: {match_info['similarity']:.2f})",
                # Similarity of 0.8 or more is treated as a clear match
                severity="medium" if match_info["similarity"] < 0.8 else "high",
            ))
        risk = min(sum(
            0.3 if v.severity == "high" else 0.15 for v in violations
        ), 1.0)
        return BoundaryCheckResult(
            within_bounds=len(violations) == 0,
            violations=violations,
            risk_score=risk,
        )

Step 5: Build the enforcement service
# boundaries/service.py
from fastapi import FastAPI
from pydantic import BaseModel

from boundaries.definition import CUSTOMER_SUPPORT_BOUNDARIES
from boundaries.checker import BoundaryChecker
from boundaries.corrector import ResponseCorrector

app = FastAPI(title="Response Boundary Enforcement")
checker = BoundaryChecker(CUSTOMER_SUPPORT_BOUNDARIES)
corrector = ResponseCorrector()


class EnforceRequest(BaseModel):
    response: str
    session_id: str = ""


class EnforceResponse(BaseModel):
    output: str
    within_bounds: bool
    corrected: bool
    violations: list[dict]


@app.post("/enforce", response_model=EnforceResponse)
async def enforce_boundaries(request: EnforceRequest):
    result = checker.check(request.response)
    correction = corrector.correct(request.response, result)
    return EnforceResponse(
        output=correction["output"],
        within_bounds=result.within_bounds,
        corrected=correction.get("corrected", False),
        violations=[
            {"type": v.boundary_type.value, "description": v.description}
            for v in result.violations
        ],
    )

uvicorn boundaries.service:app --port 8530

Integrating with your LLM application
The enforcement service sits between your LLM and the user. The client below shows how to integrate it into a typical chat application.
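One policy decision to settle before wiring up a client is what happens when the enforcer cannot be reached. A minimal sketch of the two options, assuming a hypothetical `enforce_with_policy` helper, a placeholder `SAFE_FALLBACK` message, and an injected `call_enforcer` callable standing in for the HTTP call:

```python
from typing import Callable

# Hypothetical wording; choose a message that fits the application
SAFE_FALLBACK = "Sorry, I can't answer that right now. Please try again shortly."


def enforce_with_policy(
    raw_response: str,
    call_enforcer: Callable[[str], dict],
    fail_closed: bool = True,
) -> dict:
    """Run the enforcer; if it fails, apply the configured failure policy."""
    try:
        result = call_enforcer(raw_response)
        return {**result, "checked": True}
    except Exception:  # a real client would catch its HTTP error type here
        if fail_closed:
            # Fail closed: unchecked model output never reaches the user
            return {"output": SAFE_FALLBACK, "corrected": True,
                    "violations": [], "checked": False}
        # Fail open: availability wins, but the output is unverified
        return {"output": raw_response, "corrected": False,
                "violations": [], "checked": False}


def broken_enforcer(_response: str) -> dict:
    """Simulates an enforcer outage for the demonstration below."""
    raise ConnectionError("enforcer unreachable")


print(enforce_with_policy("raw model text", broken_enforcer)["output"])
print(enforce_with_policy("raw model text", broken_enforcer, fail_closed=False)["output"])
```

Failing closed costs availability during outages; failing open lets unchecked output through exactly when an attacker might try to overload the enforcer. The `checked` flag lets callers and logs tell the two situations apart.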
import httpx


class BoundaryEnforcedChat:
    """Chat client that enforces response boundaries."""

    def __init__(self, llm_client, enforcer_url: str = "http://localhost:8530"):
        self.llm = llm_client
        self.enforcer_url = enforcer_url
        self.http_client = httpx.AsyncClient(timeout=5.0)

    async def chat(self, user_message: str, session_id: str = "") -> dict:
        """Send a message and return the boundary-enforced response."""
        # Step 1: Get the raw LLM response
        raw_response = await self.llm.generate(user_message)

        # Step 2: Check and enforce boundaries
        try:
            enforcement = await self.http_client.post(
                f"{self.enforcer_url}/enforce",
                json={"response": raw_response, "session_id": session_id},
            )
            enforcement.raise_for_status()
            result = enforcement.json()
        except httpx.HTTPError:
            # Enforcer unreachable or returned an error. As written, this
            # fails open: the unchecked response is passed through. To fail
            # closed, return a fixed safe message here instead.
            result = {
                "output": raw_response,
                "within_bounds": True,  # Unverified; a consequence of failing open
                "corrected": False,
                "violations": [],
            }

        return {
            "response": result["output"],
            "was_corrected": result["corrected"],
            "violation_count": len(result["violations"]),
        }

Health checks and latency monitoring
Add health-check and metrics endpoints to monitor the enforcement service in production:
from collections import deque
from datetime import datetime

# Track recent latencies (in seconds) for monitoring.
# Nothing in this snippet updates these values: the /enforce handler has to
# append to _latencies and increment the counters for the numbers to be useful.
_latencies = deque(maxlen=1000)
_violation_count = 0
_request_count = 0


@app.get("/health")
async def health_check():
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "requests_processed": _request_count,
        "violations_detected": _violation_count,
        "avg_latency_ms": (
            sum(_latencies) / len(_latencies) * 1000
            if _latencies else 0
        ),
    }

Step 6: Monitor boundary violations
# boundaries/monitoring.py
"""
Track boundary violation patterns over time.
"""
from collections import Counter, defaultdict
import logging


class BoundaryMonitor:
    def __init__(self):
        self.violations = Counter()
        self.session_violations = defaultdict(list)
        self.logger = logging.getLogger("boundary_monitor")

    def record(self, session_id: str, result) -> None:
        for v in result.violations:
            self.violations[v.boundary_type.value] += 1
            self.session_violations[session_id].append(v)
        # Alert on repeated violations from same session
        if len(self.session_violations[session_id]) >= 3:
            self.logger.warning(
                f"Session {session_id} has {len(self.session_violations[session_id])} "
                f"boundary violations -- possible injection campaign"
            )

    def report(self) -> dict:
        return {
            "violation_counts": dict(self.violations),
            "sessions_with_violations": len(self.session_violations),
        }

Detecting injection campaigns
A single boundary violation may be an innocent user asking an off-topic question. A pattern of violations from the same session, especially an escalating one, points to a deliberate prompt injection attempt. Implement pattern detection over a per-session time window.
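The core of such pattern detection is a rolling time window per session. A minimal sketch, assuming a hypothetical `SlidingWindowCounter` class and `classify` helper, timestamps given as seconds in non-decreasing order, and illustrative thresholds of 3 and 5 violations; it also prunes old entries so per-session memory stays bounded:

```python
from collections import defaultdict, deque
from typing import Optional


class SlidingWindowCounter:
    """Count violations per session inside a rolling time window."""

    def __init__(self, window_seconds: float = 300.0):
        self.window = window_seconds
        self._events: dict[str, deque] = defaultdict(deque)

    def add(self, session_id: str, timestamp: float) -> int:
        """Record one violation and return how many remain inside the window."""
        events = self._events[session_id]
        events.append(timestamp)
        # Drop timestamps that have fallen out of the window
        cutoff = timestamp - self.window
        while events and events[0] < cutoff:
            events.popleft()
        return len(events)


def classify(count: int, alert_at: int = 3, escalate_at: int = 5) -> Optional[str]:
    """Translate a windowed violation count into an alert level."""
    if count >= escalate_at:
        return "critical"
    if count >= alert_at:
        return "warning"
    return None


counter = SlidingWindowCounter(window_seconds=300)
for t in (0, 10, 20, 30, 40):
    count = counter.add("session-a", t)
    print(t, count, classify(count))
```

A deque makes pruning cheap because expired timestamps are always at the left end, provided events arrive in time order.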
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class InjectionAlert:
    session_id: str
    violation_count: int
    time_window: float  # seconds
    severity: str
    description: str


class InjectionCampaignDetector:
    """Detect potential prompt injection campaigns from violation patterns."""

    def __init__(
        self,
        alert_threshold: int = 3,
        time_window_seconds: float = 300,
        escalation_threshold: int = 5,
    ):
        self.alert_threshold = alert_threshold
        self.time_window = time_window_seconds
        self.escalation_threshold = escalation_threshold
        self.session_history = defaultdict(list)
        self.alerts = []

    def record_violation(
        self, session_id: str, violation, timestamp: datetime | None = None
    ) -> InjectionAlert | None:
        """Record a violation and return an alert if a campaign is detected."""
        ts = timestamp or datetime.now()
        self.session_history[session_id].append({
            "timestamp": ts,
            "violation": violation,
        })

        # Filter to recent violations within the time window
        cutoff = ts - timedelta(seconds=self.time_window)
        recent = [
            v for v in self.session_history[session_id]
            if v["timestamp"] >= cutoff
        ]

        # Check the higher threshold first so escalation takes precedence
        if len(recent) >= self.escalation_threshold:
            alert = InjectionAlert(
                session_id=session_id,
                violation_count=len(recent),
                time_window=self.time_window,
                severity="critical",
                description=(
                    f"Session {session_id[:8]} triggered {len(recent)} violations "
                    f"in {self.time_window}s -- likely active injection campaign"
                ),
            )
            self.alerts.append(alert)
            return alert
        elif len(recent) >= self.alert_threshold:
            alert = InjectionAlert(
                session_id=session_id,
                violation_count=len(recent),
                time_window=self.time_window,
                severity="warning",
                description=(
                    f"Session {session_id[:8]} triggered {len(recent)} violations "
                    f"in {self.time_window}s -- possible injection attempt"
                ),
            )
            self.alerts.append(alert)
            return alert
        return None

Monitoring dashboard
Track violation trends over time to identify systemic problems:
from collections import deque


class ViolationTrendTracker:
    """Track violation rates over time to identify trends."""

    def __init__(self, window_size: int = 100):
        self.window_size = window_size
        # Holds one bool per check; the oldest entries drop off automatically
        self.recent_checks = deque(maxlen=window_size)

    def record_check(self, had_violation: bool) -> None:
        self.recent_checks.append(had_violation)

    @property
    def violation_rate(self) -> float:
        """Current violation rate over the sliding window."""
        if not self.recent_checks:
            return 0.0
        return sum(self.recent_checks) / len(self.recent_checks)

    def is_elevated(self, baseline_rate: float = 0.05) -> bool:
        """Check if the current violation rate is more than twice the baseline."""
        return self.violation_rate > baseline_rate * 2

    def summary(self) -> dict:
        return {
            "window_size": self.window_size,
            "checks_recorded": len(self.recent_checks),
            "violation_rate": round(self.violation_rate, 4),
            "is_elevated": self.is_elevated(),
        }

Step 7: Test boundary enforcement
# tests/test_boundaries.py
import pytest

from boundaries.definition import CUSTOMER_SUPPORT_BOUNDARIES
from boundaries.checker import BoundaryChecker


@pytest.fixture
def checker():
    return BoundaryChecker(CUSTOMER_SUPPORT_BOUNDARIES)


def test_on_topic_passes(checker):
    result = checker.check("Your order #12345 shipped on March 10th.")
    assert result.within_bounds


def test_medical_advice_blocked(checker):
    result = checker.check(
        "Based on your symptoms and diagnosis, I recommend this medication dosage."
    )
    assert not result.within_bounds
    assert any("medical" in v.description.lower() for v in result.violations)


def test_length_violation(checker):
    # 2,500 characters, above the 1,500-character limit
    result = checker.check("word " * 500)
    assert any(v.boundary_type.value == "format" for v in result.violations)


def test_opinion_detected(checker):
    result = checker.check("I think our product is the best on the market.")
    assert any(v.boundary_type.value == "content" for v in result.violations)

pytest tests/test_boundaries.py -v

Testing edge cases
The most important tests cover edge cases where boundaries are ambiguous:
def test_figurative_medical_language_allowed(checker):
    """Figurative use of medical terms should not trigger blocking."""
    result = checker.check(
        "Your order is on its way! I hope this cures your waiting anxiety."
    )
    # "cures" alone should not trigger medical advice blocking
    assert result.within_bounds


def test_product_with_health_keywords_allowed(checker):
    """Products with health-adjacent names should be discussable."""
    result = checker.check(
        "The Wellness Tracker Pro is currently in stock and ships within 2 days."
    )
    assert result.within_bounds


def test_borderline_financial_topic(checker):
    """Questions about payment methods are allowed; investment advice is not."""
    # Allowed: payment methods
    result = checker.check(
        "You can pay with credit card, debit card, or PayPal."
    )
    assert result.within_bounds
    # Blocked: investment advice
    result = checker.check(
        "I recommend investing in growth stocks for your portfolio's long-term returns."
    )
    assert not result.within_bounds


def test_multiple_violations_increase_risk(checker):
    """Multiple violations should produce a higher risk score."""
    single_violation = checker.check(
        "Based on your symptoms, this medication dosage should help."
    )
    multiple_violations = checker.check(
        "I think you should invest in stocks. Based on your symptoms, "
        "take this medication dosage for your diagnosis. "
        + "x" * 2000  # Also triggers length violation
    )
    assert multiple_violations.risk_score > single_violation.risk_score


def test_empty_response(checker):
    """Empty responses should pass boundary checks."""
    result = checker.check("")
    assert result.within_bounds


def test_unicode_and_special_characters(checker):
    """Boundary checks should handle unicode text gracefully."""
    result = checker.check(
        "Your order for the café set is confirmed! \U0001f4e6"
    )
    assert result.within_bounds

Integration test of the full pipeline
Test the full flow from raw response through checking, correction, and output:
# Add this import to the top of tests/test_boundaries.py
from boundaries.corrector import ResponseCorrector


def test_full_enforcement_pipeline():
    """Test the complete boundary enforcement pipeline."""
    checker = BoundaryChecker(CUSTOMER_SUPPORT_BOUNDARIES)
    corrector = ResponseCorrector()

    # Scenario 1: Clean response passes through unchanged
    clean = "Your order ships tomorrow via standard delivery."
    result = checker.check(clean)
    correction = corrector.correct(clean, result)
    assert not correction["corrected"]
    assert correction["output"] == clean

    # Scenario 2: High-severity violation gets replaced
    medical = "Based on your symptoms and diagnosis, take 500mg of aspirin daily."
    result = checker.check(medical)
    correction = corrector.correct(medical, result)
    assert correction["corrected"]
    assert "products" in correction["output"].lower()  # Safe fallback
    assert "aspirin" not in correction["output"]  # Medical content removed

    # Scenario 3: Format violation gets trimmed
    long_response = "Your order details: " + "This is additional information. " * 200
    result = checker.check(long_response)
    correction = corrector.correct(long_response, result)
    assert correction["corrected"]
    assert len(correction["output"]) <= 1510  # 1500 + "..."

Related topics
- Output Content Classifier -- Harm-focused output classification
- Structured Output Validation -- Schema-based output checks
- Content Policy Enforcement -- Setting up content policies
- LLM Judge Implementation -- Using LLMs to assess boundary compliance
A customer support bot receives the question 'Which medications help with headaches?' and generates a response that discusses OTC painkillers. How should the boundary enforcer handle this?
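One way to reason about this scenario is to run it through the keyword layer. The sketch below reuses the medical keyword list and redirect text from this walkthrough; the `enforce` helper and both sample responses are hypothetical:

```python
import re

# Keyword list and redirect text as used earlier in the walkthrough
MEDICAL_KEYWORDS = ["diagnosis", "symptom", "medication", "dosage", "treatment plan"]
REDIRECT = "Please consult a healthcare professional for medical questions."


def enforce(response: str) -> dict:
    """Block the response when two or more medical keywords appear."""
    lowered = response.lower()
    matched = [
        kw for kw in MEDICAL_KEYWORDS
        if re.search(r"\b" + re.escape(kw), lowered)
    ]
    if len(matched) >= 2:
        return {"output": REDIRECT, "action": "block", "matched": matched}
    return {"output": response, "action": "pass", "matched": matched}


# Hypothetical model outputs for the headache question
explicit = (
    "For headaches, common over-the-counter medications include ibuprofen "
    "and acetaminophen. Follow the dosage on the label."
)
implicit = "Ibuprofen or acetaminophen usually helps with headaches."

print(enforce(explicit)["action"], enforce(explicit)["matched"])
print(enforce(implicit)["action"], enforce(implicit)["matched"])
```

The first response is blocked and replaced with the redirect. The second names the same drugs without using any listed keyword and passes, which is the kind of gap the embedding-based check is meant to cover.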