Langfuse Observability Walkthrough
Complete walkthrough for using Langfuse to monitor AI applications for security anomalies: setting up tracing, building security dashboards, detecting prompt injection patterns, and creating automated alerts.
Langfuse is an open-source LLM observability platform that captures traces, scores, and metrics from AI applications. For security teams, Langfuse provides the data layer needed to detect attacks in progress, investigate incidents after the fact, and measure the effectiveness of guardrails. This walkthrough covers Langfuse from a security perspective: what to trace, how to build security dashboards, and how to create automated detection for AI-specific attack patterns.
Phase 1: Setting Up Security-Focused Tracing
Instrumenting an LLM Application
# langfuse_setup.py
"""Set up Langfuse tracing with security-relevant metadata."""
import hashlib
import os
import time

from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context
from openai import OpenAI

# Initialize Langfuse. Credentials are read from the environment rather
# than hard-coded in source control (the original embedded pk-lf-... /
# sk-lf-... literals, which would leak keys via the repo history).
langfuse = Langfuse(
    public_key=os.environ["LANGFUSE_PUBLIC_KEY"],
    secret_key=os.environ["LANGFUSE_SECRET_KEY"],
    # Defaults to Langfuse Cloud; set LANGFUSE_HOST for self-hosted.
    host=os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com"),
)
client = OpenAI()
@observe()
def chat_completion(user_message, system_prompt, user_id=None,
session_id=None):
"""Instrumented chat completion with security metadata."""
# Capture security-relevant metadata
langfuse_context.update_current_observation(
metadata={
"user_id": user_id,
"session_id": session_id,
"input_length": len(user_message),
"input_hash": hashlib.sha256(user_message.encode()).hexdigest()[:16],
"has_encoding": any(c in user_message for c in ["\u200b", "\u200c"]),
"timestamp": time.time(),
}
)
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_message},
],
temperature=0.7,
max_tokens=1024,
)
output = response.choices[0].message.content
# Add security scores to the trace
langfuse_context.update_current_observation(
metadata={
"output_length": len(output),
"finish_reason": response.choices[0].finish_reason,
"tokens_used": response.usage.total_tokens,
}
)
return outputAutomated Security Scoring
# security_scoring.py
"""Automated scoring functions for security monitoring."""
from langfuse import Langfuse
import re
# Module-level client used by every scoring function below to submit
# scores; with no arguments it presumably picks up credentials from the
# environment — confirm against deployment config.
langfuse = Langfuse()
def score_injection_risk(trace_id, input_text):
    """Score an input for prompt injection indicators.

    Runs cheap pattern/heuristic checks over the raw input, submits the
    resulting risk score (0.0-1.0) as "injection_risk" to Langfuse, and
    returns the score together with the indicator labels that fired.

    Args:
        trace_id: Langfuse trace ID the score is attached to.
        input_text: Raw user input to assess.

    Returns:
        dict with "risk_score" (float) and "indicators" (list of str).
    """
    risk_score = 0.0
    indicators = []
    # Pattern-based indicators: (regex, score, label). Matching is done
    # case-insensitively against the ORIGINAL text — the previous version
    # lowercased the input first, which meant the uppercase "DAN"
    # alternation below could never match.
    injection_patterns = [
        (r"ignore\s+(all\s+)?previous", 0.8, "instruction_override"),
        (r"system\s*:\s*", 0.7, "system_role_injection"),
        (r"you\s+are\s+now", 0.6, "persona_override"),
        (r"bypass\s+(?:all\s+)?(?:safety|filter)", 0.9, "safety_bypass"),
        (r"(?:DAN|jailbreak|developer\s+mode)", 0.8, "known_jailbreak"),
        (r"ignore\s+(?:your|the)\s+(?:rules|guidelines)", 0.7, "rule_override"),
    ]
    for pattern, score, indicator in injection_patterns:
        if re.search(pattern, input_text, re.IGNORECASE):
            risk_score = max(risk_score, score)
            indicators.append(indicator)
    # Length-based heuristic (very long inputs may be many-shot attacks)
    if len(input_text) > 10000:
        risk_score = max(risk_score, 0.5)
        indicators.append("excessive_length")
    # Encoding detection: long base64-looking runs may hide a payload.
    if re.search(r"[A-Za-z0-9+/]{50,}={0,2}", input_text):
        risk_score = max(risk_score, 0.4)
        indicators.append("possible_base64")
    # Unicode anomaly detection: zero-width / bidi marks U+200B..U+200F.
    # (The original used range(0x200B, 0x200F), whose exclusive upper
    # bound silently skipped U+200F RIGHT-TO-LEFT MARK.)
    invisible_chars = sum(1 for c in input_text
                          if 0x200B <= ord(c) <= 0x200F)
    if invisible_chars > 0:
        risk_score = max(risk_score, 0.6)
        indicators.append(f"invisible_chars:{invisible_chars}")
    # Submit score to Langfuse
    langfuse.score(
        trace_id=trace_id,
        name="injection_risk",
        value=risk_score,
        comment=f"Indicators: {', '.join(indicators)}" if indicators else "Clean",
    )
    return {"risk_score": risk_score, "indicators": indicators}
def score_data_leakage(trace_id, output_text):
    """Score an output for potential data leakage.

    Scans the model output for common PII patterns and system-prompt-leak
    phrasing, submits a "data_leakage" score to Langfuse, and returns the
    score with per-finding labels.

    Args:
        trace_id: Langfuse trace ID the score is attached to.
        output_text: Model output to assess.

    Returns:
        dict with "leakage_score" (float) and "findings" (list of str,
        formatted "type:count" for PII hits).
    """
    leakage_score = 0.0
    findings = []
    pii_patterns = {
        "ssn": r"\b\d{3}-\d{2}-\d{4}\b",
        # The TLD class was previously [A-Z|a-z], which also matched a
        # literal "|" inside the TLD; [A-Za-z] is what was intended.
        "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
        "phone": r"\b\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b",
        "credit_card": r"\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b",
        "api_key": r"\b(?:sk|pk|api)[-_][A-Za-z0-9]{20,}\b",
    }
    for pii_type, pattern in pii_patterns.items():
        matches = re.findall(pattern, output_text)
        if matches:
            leakage_score = max(leakage_score, 0.9)
            findings.append(f"{pii_type}:{len(matches)}")
    # Check for system prompt leakage — phrasing that suggests the model
    # is describing its own instructions.
    system_leak_phrases = [
        "my instructions say", "i was configured to",
        "my system prompt", "i was told to",
    ]
    if any(phrase in output_text.lower() for phrase in system_leak_phrases):
        leakage_score = max(leakage_score, 0.7)
        findings.append("system_prompt_leak")
    langfuse.score(
        trace_id=trace_id,
        name="data_leakage",
        value=leakage_score,
        comment=f"Findings: {', '.join(findings)}" if findings else "Clean",
    )
    return {"leakage_score": leakage_score, "findings": findings}
def score_safety_violation(trace_id, output_text):
"""Score output for content safety violations."""
violation_score = 0.0
categories = []
# Harmful content indicators (simplified)
harmful_patterns = {
"instructions_for_harm": [
r"step\s+\d+.*(?:hack|exploit|attack|phish)",
r"here(?:'s| is)\s+how\s+(?:to|you\s+can)\s+(?:hack|exploit|bypass)",
],
"malicious_code": [
r"(?:import\s+(?:os|subprocess|socket).*?(?:exec|system|popen))",
r"(?:reverse\s+shell|bind\s+shell|payload)",
],
}
for category, patterns in harmful_patterns.items():
for pattern in patterns:
if re.search(pattern, output_text, re.IGNORECASE | re.DOTALL):
violation_score = max(violation_score, 0.8)
categories.append(category)
break
langfuse.score(
trace_id=trace_id,
name="safety_violation",
value=violation_score,
comment=f"Categories: {', '.join(categories)}" if categories else "Clean",
)
return {"violation_score": violation_score, "categories": categories}Phase 2: Building Security Dashboards
Key Security Metrics
# security_dashboard.py
"""Query Langfuse for security dashboard metrics."""
from langfuse import Langfuse
from datetime import datetime, timedelta
# NOTE(review): `langfuse`, `datetime`, and `timedelta` are not used by the
# functions shown below — presumably reserved for the production queries
# this file stubs out; confirm before removing.
langfuse = Langfuse()
def get_security_metrics(hours_back=24):
    """Compute security metrics from Langfuse trace data.

    Prints a human-readable summary of each metric and returns the metric
    definitions keyed by metric name. `hours_back` is the intended
    reporting window; the production queries that would consume it are
    stubbed out here.
    """
    print("=== Security Dashboard ===\n")
    # Note: Use Langfuse API or direct database queries for production dashboards
    # This shows the metrics to track
    definitions = [
        ("injection_attempts",
         "Traces with injection_risk score > 0.5",
         "scores where name='injection_risk' and value > 0.5", 10),
        ("data_leakage_events",
         "Traces with data_leakage score > 0.5",
         "scores where name='data_leakage' and value > 0.5", 1),
        ("safety_violations",
         "Traces with safety_violation score > 0.5",
         "scores where name='safety_violation' and value > 0.5", 1),
        ("high_volume_users",
         "Users with > 100 requests in the time window",
         "group by user_id having count > 100", 5),
        ("long_inputs",
         "Requests with input > 10000 characters",
         "traces where metadata.input_length > 10000", 20),
        ("error_rate",
         "Percentage of requests resulting in errors",
         "traces where level = 'ERROR'", 5),  # threshold in percent
    ]
    metrics = {
        name: {"description": desc, "query": query, "alert_threshold": limit}
        for name, desc, query, limit in definitions
    }
    for name, spec in metrics.items():
        print(f" {name}: {spec['description']}")
        print(f" Alert if > {spec['alert_threshold']}")
        print()
    return metrics
def detect_attack_patterns(hours_back=1):
    """Detect attack patterns from recent trace data.

    Prints the catalogue of multi-request attack patterns this dashboard
    watches for and returns it so callers can consume it programmatically
    (consistent with get_security_metrics, which returns its metric
    definitions; previously this function returned None).

    Args:
        hours_back: Intended detection window in hours; the live queries
            that would consume it are stubbed out in this walkthrough.

    Returns:
        list of dicts, each with "name", "description", and "criteria".
    """
    print("=== Attack Pattern Detection ===\n")
    patterns_to_detect = [
        {
            "name": "Brute-force injection",
            "description": "Many injection attempts from same user in short period",
            "criteria": "Same user_id, > 10 traces with injection_risk > 0.3 in 1 hour",
        },
        {
            "name": "Gradual escalation",
            "description": "Increasing injection risk scores within a session",
            "criteria": "Same session_id, injection_risk scores trending upward",
        },
        {
            "name": "Data extraction campaign",
            "description": "Systematic probing for sensitive data categories",
            "criteria": "Same user_id, data_leakage triggers across multiple categories",
        },
        {
            "name": "Model fingerprinting",
            "description": "Rapid varied queries testing model capabilities",
            "criteria": "Same user_id, high query diversity, short intervals",
        },
        {
            "name": "Encoding evasion",
            "description": "Multiple requests with encoding indicators",
            "criteria": "Same user_id, has_encoding=True or possible_base64 indicator",
        },
    ]
    for pattern in patterns_to_detect:
        print(f" Pattern: {pattern['name']}")
        print(f" {pattern['description']}")
        print(f" Detection: {pattern['criteria']}")
        print()
    return patterns_to_detect
get_security_metrics()
detect_attack_patterns()Phase 3: Setting Up Alerts
# security_alerts.py
"""Configure automated security alerts from Langfuse data."""
import asyncio
import aiohttp
from datetime import datetime
class SecurityAlertManager:
    """Monitor Langfuse scores and trigger alerts.

    Scores produced by the scoring functions are checked against
    per-score thresholds; breaches are fanned out to the configured
    channels (currently Slack; `webhook_url` is reserved for a generic
    webhook integration).
    """

    # Per-score alert thresholds. Hoisted to a class-level constant so a
    # single table serves every instance and every check_thresholds call
    # (the original rebuilt this dict on each call).
    THRESHOLDS = {
        "injection_risk": {
            "threshold": 0.7,
            "alert_type": "HIGH_INJECTION_RISK",
            "message": "High-confidence prompt injection attempt detected",
        },
        "data_leakage": {
            "threshold": 0.5,
            "alert_type": "DATA_LEAKAGE",
            "message": "Potential data leakage in model output",
        },
        "safety_violation": {
            "threshold": 0.7,
            "alert_type": "SAFETY_VIOLATION",
            "message": "Content safety violation in model output",
        },
    }

    def __init__(self, langfuse_client, webhook_url=None, slack_url=None):
        """Store the Langfuse client and optional alert channel URLs."""
        self.langfuse = langfuse_client
        self.webhook_url = webhook_url
        self.slack_url = slack_url
        # In-memory audit trail of every alert raised by this instance.
        self.alert_history = []

    async def send_alert(self, alert_type, details):
        """Send a security alert through configured channels.

        Args:
            alert_type: Short machine-readable alert category.
            details: Dict with at least "message"; "user_id" and
                "trace_id" are included in the Slack payload if present.
        """
        alert = {
            "type": alert_type,
            "timestamp": datetime.now().isoformat(),
            "details": details,
        }
        self.alert_history.append(alert)
        print(f"ALERT [{alert_type}]: {details.get('message', '')}")
        # Send to Slack. NOTE(review): the HTTP response is not checked,
        # so delivery failures are silent — consider logging non-2xx.
        if self.slack_url:
            slack_message = {
                "text": f":warning: AI Security Alert: {alert_type}",
                "blocks": [
                    {
                        "type": "section",
                        "text": {
                            "type": "mrkdwn",
                            "text": (f"*{alert_type}*\n"
                                     f"Time: {alert['timestamp']}\n"
                                     f"Details: {details.get('message', '')}\n"
                                     f"User: {details.get('user_id', 'unknown')}\n"
                                     f"Trace: {details.get('trace_id', 'N/A')}"),
                        },
                    },
                ],
            }
            async with aiohttp.ClientSession() as session:
                await session.post(self.slack_url, json=slack_message)

    def check_thresholds(self, trace_id, scores):
        """Check scores against alert thresholds.

        Args:
            trace_id: Trace the scores belong to.
            scores: Mapping of score name -> float value.

        Returns:
            list of (alert_type, detail_dict) tuples for every score at
            or above its threshold; unknown score names are ignored.
        """
        alerts = []
        for score_name, score_value in scores.items():
            config = self.THRESHOLDS.get(score_name)
            if config and score_value >= config["threshold"]:
                alert_detail = {
                    "message": config["message"],
                    "score_name": score_name,
                    "score_value": score_value,
                    "trace_id": trace_id,
                }
                alerts.append((config["alert_type"], alert_detail))
        return alerts
alert_manager = SecurityAlertManager(
langfuse,
slack_url="https://hooks.slack.com/services/YOUR/WEBHOOK/URL",
)Phase 4: Using Langfuse Data in Red Team Reports
Exporting Evidence
# export_evidence.py
"""Export Langfuse trace data for red team reporting."""
from langfuse import Langfuse
# NOTE(review): this client is not used by export_security_findings below;
# presumably the real trace-query API calls would go through it — confirm.
langfuse = Langfuse()
def export_security_findings(output_file="security_findings.json"):
    """Export traces with security findings for reporting.

    Writes a JSON skeleton of findings to `output_file` and prints a
    checklist of what the final red-team report should include.

    Args:
        output_file: Path of the JSON file to write.
    """
    import json
    # Was missing entirely: datetime.now() below raised NameError because
    # this module never imported datetime.
    from datetime import datetime

    # Query for traces with high security scores
    # Note: actual API calls depend on Langfuse version
    findings = {
        "export_date": datetime.now().isoformat(),
        "injection_attempts": [],
        "data_leakage_events": [],
        "safety_violations": [],
    }
    # In a real implementation, use Langfuse API to query
    # traces filtered by score values
    print("Exporting security findings from Langfuse...")
    print(f" Output: {output_file}")
    print()
    print("Include in report:")
    print(" - Number of injection attempts detected")
    print(" - Types of injection techniques observed")
    print(" - Data leakage instances with redacted examples")
    print(" - Timeline of attack activity")
    print(" - Most targeted endpoints and models")
    print(" - Effectiveness of existing guardrails (block rate)")
    with open(output_file, "w") as f:
        json.dump(findings, f, indent=2)
export_security_findings()

Monitoring Recommendations Template
## Monitoring Recommendations
Based on the Langfuse observability assessment, we recommend the
following security monitoring configuration:
### Immediate (Week 1)
1. Enable Langfuse tracing on all production LLM endpoints
2. Deploy injection_risk and data_leakage scoring functions
3. Configure Slack alerts for scores > 0.7
### Short-term (Month 1)
4. Build a security dashboard tracking the 6 core metrics
5. Implement attack pattern detection for brute-force and escalation
6. Set up weekly security metric reports for the security team
### Long-term (Quarter 1)
7. Integrate Langfuse alerts with SIEM/SOAR platform
8. Build ML-based anomaly detection on trace patterns
9. Implement automated response (rate limiting, session termination)
triggered by Langfuse alerts