Langfuse Observability 導覽
中級 · 8 分鐘閱讀 · 更新於 2026-03-15
Complete walkthrough for using Langfuse to monitor AI applications for security anomalies: setting up tracing, building security dashboards, detecting prompt injection patterns, and creating automated alerts.
Langfuse is an open-source LLM observability platform that captures traces, scores, and metrics from AI applications. For security teams, Langfuse provides the data layer needed to detect attacks in progress, investigate incidents after the fact, and measure the effectiveness of guardrails. This walkthrough covers Langfuse from a security perspective: what to trace, how to build security dashboards, and how to create automated detection for AI-specific attack patterns.
Phase 1: Setting Up Security-Focused Tracing
Instrumenting an LLM Application
# langfuse_setup.py
"""Set up Langfuse tracing with security-relevant metadata."""
from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context
from openai import OpenAI
import hashlib
import time
# Initialize Langfuse.
# NOTE: the keys below are placeholders; supply real credentials from your
# secret store or environment rather than committing them to source.
langfuse = Langfuse(
    public_key="pk-lf-...",
    secret_key="sk-lf-...",
    host="https://cloud.langfuse.com",  # Or self-hosted URL
)

# OpenAI client used by chat_completion() below.
client = OpenAI()
@observe()
def chat_completion(user_message, system_prompt, user_id=None,
                    session_id=None):
    """Run an instrumented chat completion and record security metadata.

    Args:
        user_message: Text sent to the model as the ``user`` message.
        system_prompt: Text sent to the model as the ``system`` message.
        user_id: Optional identifier of the caller, recorded on the trace.
        session_id: Optional conversation identifier, recorded on the trace.

    Returns:
        The assistant reply text, or an empty string when the model
        returned no text content.
    """
    # Record the identifiers on the trace itself (not only in observation
    # metadata) so traces can be filtered and grouped per user and session.
    langfuse_context.update_current_trace(
        user_id=user_id,
        session_id=session_id,
    )

    # Capture security-relevant metadata about the request.
    langfuse_context.update_current_observation(
        metadata={
            "user_id": user_id,
            "session_id": session_id,
            "input_length": len(user_message),
            # First 16 hex characters of the SHA-256 digest of the input.
            "input_hash": hashlib.sha256(user_message.encode()).hexdigest()[:16],
            # True when the input contains U+200B or U+200C (zero-width).
            "has_encoding": any(c in user_message for c in ["\u200b", "\u200c"]),
            "timestamp": time.time(),
        }
    )

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_message},
        ],
        temperature=0.7,
        max_tokens=1024,
    )

    choice = response.choices[0]
    # ``content`` can be None (for example when the reply carries no text),
    # which would make len() below raise; normalise to an empty string.
    output = choice.message.content or ""
    usage = response.usage

    # Add response metadata to the observation.
    langfuse_context.update_current_observation(
        metadata={
            "output_length": len(output),
            "finish_reason": choice.finish_reason,
            "tokens_used": usage.total_tokens if usage is not None else None,
        }
    )
    return output


# Automated Security Scoring
# security_scoring.py
"""Automated scoring functions for security monitoring."""
from langfuse import Langfuse
import re
langfuse = Langfuse()
def score_injection_risk(trace_id, input_text):
    """Score an input for prompt-injection indicators and record the result.

    The risk score is the highest score among all indicators that fire
    (0.0 when none do). The score is submitted to Langfuse under the name
    ``injection_risk`` with the indicator list as the comment.

    Args:
        trace_id: Identifier of the Langfuse trace the score is attached to.
        input_text: The user-supplied text to inspect.

    Returns:
        A dict with ``risk_score`` (float) and ``indicators`` (list of str).
    """
    risk_score = 0.0
    indicators = []

    # (regex, score, indicator name). Patterns are matched against the raw
    # text and carry their own case-insensitivity flag, so the acronym
    # "DAN" can stay case-sensitive: matching it against lower-cased text
    # could never succeed, and matching it case-insensitively would flag
    # ordinary words and names.
    injection_patterns = [
        (r"(?i)ignore\s+(all\s+)?previous", 0.8, "instruction_override"),
        (r"(?i)system\s*:\s*", 0.7, "system_role_injection"),
        (r"(?i)you\s+are\s+now", 0.6, "persona_override"),
        (r"(?i)bypass\s+(?:all\s+)?(?:safety|安全|filter)", 0.9, "safety_bypass"),
        (r"\bDAN\b|(?i:jailbreak|越獄|developer\s+mode)", 0.8, "known_jailbreak"),
        (r"(?i)ignore\s+(?:your|the)\s+(?:rules|guidelines)", 0.7, "rule_override"),
    ]
    for pattern, score, indicator in injection_patterns:
        if re.search(pattern, input_text):
            risk_score = max(risk_score, score)
            indicators.append(indicator)

    # Length-based heuristic (very long inputs may be many-shot attacks).
    if len(input_text) > 10000:
        risk_score = max(risk_score, 0.5)
        indicators.append("excessive_length")

    # Encoding detection: a run of 50+ base64-alphabet characters.
    if re.search(r"[A-Za-z0-9+/]{50,}={0,2}", input_text):
        risk_score = max(risk_score, 0.4)
        indicators.append("possible_base64")

    # Unicode anomaly detection: count characters in U+200B..U+200F
    # inclusive (zero-width characters and directional marks).
    invisible_chars = sum(1 for c in input_text if "\u200b" <= c <= "\u200f")
    if invisible_chars > 0:
        risk_score = max(risk_score, 0.6)
        indicators.append(f"invisible_chars:{invisible_chars}")

    # Submit score to Langfuse.
    langfuse.score(
        trace_id=trace_id,
        name="injection_risk",
        value=risk_score,
        comment=f"Indicators: {', '.join(indicators)}" if indicators else "Clean",
    )
    return {"risk_score": risk_score, "indicators": indicators}
def score_data_leakage(trace_id, output_text):
    """Score a model output for potential data leakage and record the result.

    Any PII/credential pattern match sets the score to 0.9; a system-prompt
    leak phrase sets it to at least 0.7. The score is submitted to Langfuse
    under the name ``data_leakage`` with the findings as the comment.

    Args:
        trace_id: Identifier of the Langfuse trace the score is attached to.
        output_text: The model output to inspect.

    Returns:
        A dict with ``leakage_score`` (float) and ``findings`` (list of
        ``"<type>:<match count>"`` strings plus ``"system_prompt_leak"``).
    """
    leakage_score = 0.0
    findings = []

    pii_patterns = {
        "ssn": r"\b\d{3}-\d{2}-\d{4}\b",
        # TLD class is [A-Za-z]; a "|" inside a character class is a
        # literal pipe, not alternation.
        "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
        "phone": r"\b\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b",
        "credit_card": r"\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b",
        "api_key": r"\b(?:sk|pk|api)[-_][A-Za-z0-9]{20,}\b",
    }
    for pii_type, pattern in pii_patterns.items():
        matches = re.findall(pattern, output_text)
        if matches:
            leakage_score = max(leakage_score, 0.9)
            findings.append(f"{pii_type}:{len(matches)}")

    # Check for system prompt leakage. Phrases are compared against the
    # lower-cased output, so they must be lower-case themselves.
    system_leak_phrases = [
        "my instructions say", "i was configured to",
        "my system prompt", "my 系統提示詞", "i was told to",
    ]
    output_lower = output_text.lower()
    if any(phrase in output_lower for phrase in system_leak_phrases):
        leakage_score = max(leakage_score, 0.7)
        findings.append("system_prompt_leak")

    langfuse.score(
        trace_id=trace_id,
        name="data_leakage",
        value=leakage_score,
        comment=f"Findings: {', '.join(findings)}" if findings else "Clean",
    )
    return {"leakage_score": leakage_score, "findings": findings}
def score_safety_violation(trace_id, output_text):
    """Score a model output for content-safety violations and record it.

    Each category whose patterns match contributes its name once and sets
    the score to 0.8. The score is submitted to Langfuse under the name
    ``safety_violation`` with the matched categories as the comment.

    Args:
        trace_id: Identifier of the Langfuse trace the score is attached to.
        output_text: The model output to inspect.

    Returns:
        A dict with ``violation_score`` (float) and ``categories``
        (list of str).
    """
    violation_score = 0.0
    categories = []

    # Harmful content indicators (simplified).
    harmful_patterns = {
        "instructions_for_harm": [
            r"step\s+\d+.*(?:hack|exploit|利用|attack|phish)",
            r"here(?:'s| is)\s+how\s+(?:to|you\s+can)\s+(?:hack|exploit|利用|bypass)",
        ],
        "malicious_code": [
            r"(?:import\s+(?:os|subprocess|socket).*?(?:exec|system|popen))",
            r"(?:reverse\s+shell|bind\s+shell|payload)",
        ],
    }
    for category, patterns in harmful_patterns.items():
        for pattern in patterns:
            if re.search(pattern, output_text, re.IGNORECASE | re.DOTALL):
                violation_score = max(violation_score, 0.8)
                categories.append(category)
                # One hit is enough; report each category at most once.
                break

    langfuse.score(
        trace_id=trace_id,
        name="safety_violation",
        value=violation_score,
        comment=f"Categories: {', '.join(categories)}" if categories else "Clean",
    )
    return {"violation_score": violation_score, "categories": categories}


# Phase 2: Building Security Dashboards
Key Security Metrics
# security_dashboard.py
"""Query Langfuse for security dashboard metrics."""
from langfuse import Langfuse
from datetime import datetime, timedelta
langfuse = Langfuse()
def get_security_metrics(hours_back=24):
    """Print and return the catalogue of security metrics to track.

    This function does not query Langfuse; it describes which metrics a
    production dashboard should compute and at what level to alert.

    Args:
        hours_back: Intended look-back window in hours. Currently unused
            because no query is executed here.

    Returns:
        A dict mapping metric name to a dict with ``description``,
        ``query`` (pseudo-query text) and ``alert_threshold``.
    """
    print("=== 安全 Dashboard ===\n")
    # Note: Use the Langfuse API or direct database queries for production
    # dashboards. This shows the metrics to track.
    metrics = {
        "injection_attempts": {
            "description": "Traces with injection_risk score > 0.5",
            "query": "scores where name='injection_risk' and value > 0.5",
            "alert_threshold": 10,
        },
        "data_leakage_events": {
            "description": "Traces with data_leakage score > 0.5",
            "query": "scores where name='data_leakage' and value > 0.5",
            "alert_threshold": 1,
        },
        "safety_violations": {
            "description": "Traces with safety_violation score > 0.5",
            "query": "scores where name='safety_violation' and value > 0.5",
            "alert_threshold": 1,
        },
        "high_volume_users": {
            "description": "Users with > 100 requests in the time window",
            "query": "group by user_id having count > 100",
            "alert_threshold": 5,
        },
        "long_inputs": {
            "description": "Requests with 輸入 > 10000 characters",
            "query": "traces where metadata.input_length > 10000",
            "alert_threshold": 20,
        },
        "error_rate": {
            "description": "Percentage of requests resulting in errors",
            "query": "traces where level = 'ERROR'",
            "alert_threshold": 5,  # percent
        },
    }
    for metric_name, config in metrics.items():
        print(f" {metric_name}: {config['description']}")
        print(f" Alert if > {config['alert_threshold']}")
        print()
    return metrics
def detect_attack_patterns(hours_back=1):
    """Print and return the attack patterns worth detecting in trace data.

    This function does not query Langfuse; it lists the patterns and the
    criteria a detector should apply.

    Args:
        hours_back: Intended look-back window in hours. Currently unused
            because no query is executed here.

    Returns:
        A list of dicts, each with ``name``, ``description`` and
        ``criteria``.
    """
    print("=== 攻擊 Pattern 偵測 ===\n")
    patterns_to_detect = [
        {
            "name": "Brute-force injection",
            "description": "Many injection attempts from same user in short period",
            "criteria": "Same user_id, > 10 traces with injection_risk > 0.3 in 1 hour",
        },
        {
            "name": "Gradual escalation",
            "description": "Increasing injection risk scores within a session",
            "criteria": "Same session_id, injection_risk scores trending upward",
        },
        {
            "name": "Data extraction campaign",
            "description": "Systematic probing for sensitive data categories",
            "criteria": "Same user_id, data_leakage triggers across multiple categories",
        },
        {
            "name": "Model fingerprinting",
            "description": "Rapid varied queries 測試 model capabilities",
            "criteria": "Same user_id, high query diversity, short intervals",
        },
        {
            "name": "Encoding evasion",
            "description": "Multiple requests with encoding indicators",
            "criteria": "Same user_id, has_encoding=True or possible_base64 indicator",
        },
    ]
    for pattern in patterns_to_detect:
        print(f" Pattern: {pattern['name']}")
        print(f" {pattern['description']}")
        print(f" 偵測: {pattern['criteria']}")
        print()
    # Returned for parity with get_security_metrics(), so callers can use
    # the definitions programmatically instead of only reading stdout.
    return patterns_to_detect
get_security_metrics()
detect_attack_patterns()
Phase 3: Setting Up Alerts
# security_alerts.py
"""Configure automated security alerts from Langfuse data."""
import asyncio
import aiohttp
from datetime import datetime
class SecurityAlertManager:
    """Monitor Langfuse scores and trigger alerts.

    ``check_thresholds`` turns a mapping of score values into alert tuples;
    ``send_alert`` records an alert and delivers it to the configured
    channels (Slack and/or a generic webhook).
    """

    # Score name -> alerting rule. An alert fires when the score is greater
    # than or equal to ``threshold``.
    THRESHOLDS = {
        "injection_risk": {
            "threshold": 0.7,
            "alert_type": "HIGH_INJECTION_RISK",
            "message": "High-confidence 提示詞注入 attempt detected",
        },
        "data_leakage": {
            "threshold": 0.5,
            "alert_type": "DATA_LEAKAGE",
            "message": "Potential data leakage in model 輸出",
        },
        "safety_violation": {
            "threshold": 0.7,
            "alert_type": "SAFETY_VIOLATION",
            "message": "Content 安全 violation in model 輸出",
        },
    }

    def __init__(self, langfuse_client, webhook_url=None, slack_url=None):
        """Store the client and the optional delivery endpoints.

        Args:
            langfuse_client: Langfuse client instance kept on ``self.langfuse``.
            webhook_url: Optional URL that receives each alert as JSON.
            slack_url: Optional Slack incoming-webhook URL.
        """
        self.langfuse = langfuse_client
        self.webhook_url = webhook_url
        self.slack_url = slack_url
        self.alert_history = []  # every alert passed to send_alert, in order

    async def send_alert(self, alert_type, details):
        """Send a security alert through configured channels.

        The alert is appended to ``alert_history`` and printed before any
        network delivery is attempted, so it is recorded even if delivery
        fails.

        Args:
            alert_type: Short alert identifier, e.g. ``"DATA_LEAKAGE"``.
            details: Dict describing the alert; ``message``, ``user_id`` and
                ``trace_id`` are read when present.
        """
        alert = {
            "type": alert_type,
            "timestamp": datetime.now().isoformat(),
            "details": details,
        }
        self.alert_history.append(alert)
        print(f"ALERT [{alert_type}]: {details.get('message', '')}")

        # (channel label, url, JSON payload) for each configured channel.
        deliveries = []
        if self.slack_url:
            slack_message = {
                "text": f":warning: AI 安全 Alert: {alert_type}",
                "blocks": [
                    {
                        "type": "section",
                        "text": {
                            "type": "mrkdwn",
                            "text": (f"*{alert_type}*\n"
                                     f"Time: {alert['timestamp']}\n"
                                     f"Details: {details.get('message', '')}\n"
                                     f"User: {details.get('user_id', 'unknown')}\n"
                                     f"Trace: {details.get('trace_id', 'N/A')}"),
                        },
                    },
                ],
            }
            deliveries.append(("slack", self.slack_url, slack_message))
        if self.webhook_url:
            # The generic webhook receives the raw alert record.
            deliveries.append(("webhook", self.webhook_url, alert))

        if not deliveries:
            return

        async with aiohttp.ClientSession() as session:
            for channel, url, payload in deliveries:
                # Use the response as a context manager so the connection
                # is released, and surface rejected deliveries. The URL is
                # not printed because webhook URLs act as credentials.
                async with session.post(url, json=payload) as response:
                    if response.status >= 400:
                        print(f"ALERT delivery to {channel} failed: "
                              f"HTTP {response.status}")

    def check_thresholds(self, trace_id, scores):
        """Check scores against alert thresholds.

        Args:
            trace_id: Trace identifier copied into each alert detail.
            scores: Mapping of score name to numeric value. Unknown score
                names and ``None`` values are ignored.

        Returns:
            A list of ``(alert_type, alert_detail)`` tuples, one per score
            at or above its threshold, in the iteration order of ``scores``.
        """
        alerts = []
        for score_name, score_value in scores.items():
            config = self.THRESHOLDS.get(score_name)
            if config is None or score_value is None:
                continue
            if score_value >= config["threshold"]:
                alert_detail = {
                    "message": config["message"],
                    "score_name": score_name,
                    "score_value": score_value,
                    "trace_id": trace_id,
                }
                alerts.append((config["alert_type"], alert_detail))
        return alerts
alert_manager = SecurityAlertManager(
langfuse,
slack_url="https://hooks.slack.com/services/YOUR/WEBHOOK/URL",
)
Phase 4: Using Langfuse Data in Red Team Reports
Exporting Evidence
# export_evidence.py
"""Export Langfuse trace data for red team reporting."""
from langfuse import Langfuse
langfuse = Langfuse()
def export_security_findings(output_file="security_findings.json"):
    """Export traces with security findings for reporting.

    Writes a JSON skeleton (export timestamp plus empty finding lists) to
    ``output_file`` and prints a checklist of what a report should include.
    No Langfuse query is performed here.

    Args:
        output_file: Path of the JSON file to write.

    Returns:
        The findings dict that was written to ``output_file``.
    """
    # Local imports keep this snippet runnable on its own.
    import json
    from datetime import datetime

    # Query for traces with high security scores.
    # Note: actual API calls depend on Langfuse version.
    findings = {
        "export_date": datetime.now().isoformat(),
        "injection_attempts": [],
        "data_leakage_events": [],
        "safety_violations": [],
    }
    # In a real implementation, use the Langfuse API to query traces
    # filtered by score values.
    print("Exporting 安全 findings from Langfuse...")
    print(f" 輸出: {output_file}")
    print()
    print("Include in report:")
    print(" - Number of injection attempts detected")
    print(" - Types of injection techniques observed")
    print(" - Data leakage instances with redacted examples")
    print(" - Timeline of attack activity")
    print(" - Most targeted endpoints and models")
    print(" - Effectiveness of existing 護欄 (block rate)")
    # Explicit encoding so the file does not depend on the platform default.
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(findings, f, indent=2)
    return findings
export_security_findings()
Monitoring Recommendations Template
## Monitoring Recommendations
Based on the Langfuse observability assessment, we recommend the
following security monitoring configuration:
### Immediate (Week 1)
1. Enable Langfuse tracing on all production LLM endpoints
2. Deploy injection_risk and data_leakage scoring functions
3. Configure Slack alerts for scores > 0.7
### Short-term (Month 1)
4. Build a security dashboard tracking the 6 core metrics
5. Implement attack pattern detection for brute-force and escalation
6. Set up weekly security metric reports for the security team
### Long-term (Quarter 1)
7. Integrate Langfuse alerts with SIEM/SOAR platform
8. Build ML-based anomaly detection on trace patterns
9. Implement automated response (rate limiting, session termination)
triggered by Langfuse alerts