LLM Conversation Forensics
Forensic analysis techniques for investigating LLM conversation logs, detecting manipulation patterns, and reconstructing attack timelines from chat histories.
Overview
Large Language Model conversation logs represent one of the richest forensic artifacts available to security teams investigating AI-related incidents. Every interaction between a user and an LLM generates structured data: timestamps, token counts, model parameters, system prompts, user messages, and assistant responses. When an attacker targets an LLM-powered application, whether through prompt injection, jailbreak attempts, data exfiltration, or social engineering of the model, the conversation log is the primary crime scene.
However, LLM conversation forensics presents unique challenges compared to traditional log analysis. Conversations are inherently unstructured natural language, making keyword-based detection insufficient. Attackers can spread an attack across multiple turns, use obfuscation techniques that evade simple pattern matching, or exploit the model's context window in ways that only become apparent when the full conversation is analyzed as a sequence. Additionally, different LLM providers store conversation data in different formats, with different levels of metadata, and with different retention policies.
This article covers the full forensic workflow for LLM conversation logs: acquisition and normalization, pattern analysis, timeline reconstruction, and automated detection. The techniques apply whether you are investigating a single suspicious conversation or performing bulk analysis across millions of interactions as part of a broader incident response.
Conversation Log Acquisition and Normalization
Understanding Log Formats
LLM providers and self-hosted deployments store conversation data in different formats. Before any analysis can begin, you need to acquire and normalize these logs into a consistent structure.
OpenAI's API logs (when logging is enabled) include the full request and response payloads. The request contains the messages array with role/content pairs, model selection, temperature, and other parameters. The response includes the completion, token usage, and a unique request ID. If you are using the Assistants API, threads provide a built-in conversation history with run metadata.
Anthropic's API captures similar data with messages structured as user/assistant alternations and a system prompt field. Self-hosted models running through frameworks like vLLM, Ollama, or text-generation-inference may log to stdout, structured log files, or databases depending on the deployment configuration.
The following normalizer handles the most common formats and produces a unified structure suitable for forensic analysis:
import json
import hashlib
from datetime import datetime, timezone
from typing import Optional
from dataclasses import dataclass, field, asdict
@dataclass
class NormalizedMessage:
"""A single message normalized across provider formats."""
role: str # system, user, assistant, tool
content: str
timestamp: Optional[str] = None
token_count: Optional[int] = None
metadata: dict = field(default_factory=dict)
@dataclass
class NormalizedConversation:
"""A full conversation normalized for forensic analysis."""
conversation_id: str
messages: list[NormalizedMessage]
model: str
provider: str
start_time: Optional[str] = None
end_time: Optional[str] = None
total_input_tokens: int = 0
total_output_tokens: int = 0
parameters: dict = field(default_factory=dict)
raw_hash: str = "" # SHA-256 of original data for integrity
def to_dict(self) -> dict:
return asdict(self)
class ConversationNormalizer:
"""Normalize conversation logs from various LLM providers."""
def __init__(self):
self.parsers = {
"openai": self._parse_openai,
"anthropic": self._parse_anthropic,
"vllm": self._parse_vllm,
"generic": self._parse_generic,
}
def normalize(self, raw_data: dict | list, provider: str) -> NormalizedConversation:
"""Normalize raw conversation data from a given provider."""
raw_bytes = json.dumps(raw_data, sort_keys=True).encode()
raw_hash = hashlib.sha256(raw_bytes).hexdigest()
parser = self.parsers.get(provider, self._parse_generic)
conversation = parser(raw_data)
conversation.raw_hash = raw_hash
conversation.provider = provider
return conversation
def _parse_openai(self, data: dict) -> NormalizedConversation:
"""Parse OpenAI API request/response pair."""
request = data.get("request", {})
response = data.get("response", {})
messages = []
for msg in request.get("messages", []):
messages.append(NormalizedMessage(
role=msg.get("role", "unknown"),
content=msg.get("content", ""),
metadata={"name": msg.get("name")},
))
# Add assistant response
choices = response.get("choices", [])
if choices:
assistant_msg = choices[0].get("message", {})
messages.append(NormalizedMessage(
role="assistant",
content=assistant_msg.get("content", ""),
metadata={
"finish_reason": choices[0].get("finish_reason"),
},
))
usage = response.get("usage", {})
return NormalizedConversation(
conversation_id=response.get("id", "unknown"),
messages=messages,
model=request.get("model", response.get("model", "unknown")),
provider="openai",
total_input_tokens=usage.get("prompt_tokens", 0),
total_output_tokens=usage.get("completion_tokens", 0),
parameters={
"temperature": request.get("temperature"),
"max_tokens": request.get("max_tokens"),
"top_p": request.get("top_p"),
},
)
def _parse_anthropic(self, data: dict) -> NormalizedConversation:
"""Parse Anthropic API request/response pair."""
request = data.get("request", {})
response = data.get("response", {})
messages = []
# System prompt is a separate field in Anthropic's API
system_prompt = request.get("system", "")
if system_prompt:
messages.append(NormalizedMessage(
role="system",
content=system_prompt,
))
for msg in request.get("messages", []):
content = msg.get("content", "")
if isinstance(content, list):
# Anthropic supports structured content blocks
content = " ".join(
block.get("text", "")
for block in content
if block.get("type") == "text"
)
messages.append(NormalizedMessage(
role=msg.get("role", "unknown"),
content=content,
))
# Add assistant response
resp_content = response.get("content", [])
if resp_content:
text_parts = [
block.get("text", "")
for block in resp_content
if block.get("type") == "text"
]
messages.append(NormalizedMessage(
role="assistant",
content=" ".join(text_parts),
metadata={
"stop_reason": response.get("stop_reason"),
},
))
usage = response.get("usage", {})
return NormalizedConversation(
conversation_id=response.get("id", "unknown"),
messages=messages,
model=request.get("model", response.get("model", "unknown")),
provider="anthropic",
total_input_tokens=usage.get("input_tokens", 0),
total_output_tokens=usage.get("output_tokens", 0),
parameters={
"temperature": request.get("temperature"),
"max_tokens": request.get("max_tokens"),
"top_k": request.get("top_k"),
},
)
def _parse_vllm(self, data: dict) -> NormalizedConversation:
"""Parse vLLM server logs in OpenAI-compatible format."""
# vLLM's OpenAI-compatible endpoint uses the same format
# but may include additional fields like lora_request
conversation = self._parse_openai(data)
conversation.provider = "vllm"
if "lora_request" in data.get("request", {}):
conversation.parameters["lora"] = data["request"]["lora_request"]
return conversation
def _parse_generic(self, data: dict) -> NormalizedConversation:
"""Fallback parser for unknown formats."""
messages = []
for msg in data.get("messages", []):
messages.append(NormalizedMessage(
role=msg.get("role", "unknown"),
content=str(msg.get("content", "")),
timestamp=msg.get("timestamp"),
))
return NormalizedConversation(
conversation_id=data.get("id", "unknown"),
messages=messages,
model=data.get("model", "unknown"),
provider="generic",
)Evidence Integrity
When acquiring conversation logs for forensic purposes, maintaining evidence integrity is critical. Always hash the original data before any processing. Store the original and normalized forms separately. Record the acquisition chain: who obtained the logs, from which system, at what time, and under what authorization.
For cloud-hosted LLM services, be aware that logs may have retention limits. OpenAI's default data retention for API usage is 30 days. Anthropic's API logs are retained according to the customer's data processing agreement. Time-sensitive acquisition is essential; if you suspect an incident, preserve logs immediately rather than waiting for the investigation to formalize.
Detecting Manipulation Patterns
Jailbreak Attempt Detection
Jailbreak attempts follow recognizable structural patterns even when the specific payloads vary. Common patterns include role-playing instructions ("You are now DAN, Do Anything Now"), encoded payloads (base64, ROT13, Unicode substitution), multi-turn escalation where each message pushes boundaries slightly further, and system prompt extraction attempts.
The following analyzer implements pattern-based detection for common jailbreak categories:
import re
from dataclasses import dataclass
@dataclass
class DetectionResult:
"""Result of a forensic detection scan on a conversation."""
conversation_id: str
findings: list[dict]
risk_score: float # 0.0 to 1.0
summary: str
class ConversationForensicAnalyzer:
"""Analyze normalized conversations for manipulation patterns."""
# Patterns indicative of jailbreak attempts
JAILBREAK_PATTERNS = [
(r"(?i)you are now [\w\s]+ mode", "role_reassignment", 0.7),
(r"(?i)ignore (?:all |your )?(?:previous |prior )?instructions", "instruction_override", 0.9),
(r"(?i)do anything now|DAN mode", "dan_jailbreak", 0.8),
(r"(?i)pretend (?:you are|to be|that)", "role_play_injection", 0.5),
(r"(?i)system prompt|initial prompt|original instructions", "prompt_extraction", 0.6),
(r"(?i)(?:base64|rot13|hex)\s*(?:decode|encode)", "encoding_evasion", 0.6),
(r"(?i)translate (?:the following|this) from (?:base64|hex|binary)", "encoding_evasion", 0.7),
(r"(?i)bypass|circumvent|override|disable.{0,20}(?:filter|safety|guard|restrict)", "safety_bypass", 0.8),
(r"(?i)developer mode|maintenance mode|debug mode", "mode_switching", 0.7),
(r"(?i)\[system\]|\[INST\]|<<SYS>>", "format_injection", 0.8),
]
# Patterns indicative of data exfiltration
EXFIL_PATTERNS = [
(r"(?i)repeat (?:the|your) (?:system|initial|original) (?:prompt|instructions|message)", "system_prompt_exfil", 0.8),
(r"(?i)what (?:are|were) your (?:instructions|rules|guidelines)", "instruction_probing", 0.6),
(r"(?i)(?:list|show|display|tell me) (?:all )?(?:the )?(?:tools|functions|apis) (?:you |that )", "tool_enumeration", 0.5),
(r"(?i)(?:previous|earlier) (?:conversation|messages|context)", "context_exfil", 0.5),
(r"(?i)(?:what|which) (?:documents?|files?|data) (?:do you|can you) (?:have |access)", "data_source_probing", 0.5),
]
# Patterns of social engineering against the model
SOCIAL_ENGINEERING_PATTERNS = [
(r"(?i)(?:you must|you have to|you are required to) (?:help|assist|answer)", "coercion", 0.4),
(r"(?i)(?:as|in) (?:an?|your) (?:emergency|urgent|critical|life.threatening)", "urgency_manipulation", 0.6),
(r"(?i)my (?:boss|manager|ceo|cto) (?:told|asked|said|instructed|wants)", "authority_invocation", 0.5),
(r"(?i)(?:for|in) (?:a |an )?(?:academic|research|educational|school) (?:purpose|project|paper)", "academic_pretext", 0.3),
(r"(?i)(?:don'?t|do not) (?:worry|be concerned) about (?:safety|ethics|rules|policy)", "safety_dismissal", 0.7),
]
def analyze(self, conversation: dict) -> DetectionResult:
"""
Run full forensic analysis on a normalized conversation.
Args:
conversation: A NormalizedConversation as a dict.
Returns:
DetectionResult with all findings.
"""
findings = []
messages = conversation.get("messages", [])
# Analyze user messages for attack patterns
for i, msg in enumerate(messages):
if msg.get("role") != "user":
continue
content = msg.get("content", "")
for pattern, category, severity in self.JAILBREAK_PATTERNS:
matches = re.findall(pattern, content)
if matches:
findings.append({
"type": "jailbreak_attempt",
"category": category,
"message_index": i,
"severity": severity,
"matched_text": matches[0][:200],
"context": content[:500],
})
for pattern, category, severity in self.EXFIL_PATTERNS:
matches = re.findall(pattern, content)
if matches:
findings.append({
"type": "data_exfiltration",
"category": category,
"message_index": i,
"severity": severity,
"matched_text": matches[0][:200],
"context": content[:500],
})
for pattern, category, severity in self.SOCIAL_ENGINEERING_PATTERNS:
matches = re.findall(pattern, content)
if matches:
findings.append({
"type": "social_engineering",
"category": category,
"message_index": i,
"severity": severity,
"matched_text": matches[0][:200],
"context": content[:500],
})
# Analyze assistant responses for successful attacks
findings.extend(self._check_response_anomalies(messages))
# Multi-turn escalation analysis
findings.extend(self._detect_escalation(messages))
# Calculate composite risk score
risk_score = self._calculate_risk_score(findings)
return DetectionResult(
conversation_id=conversation.get("conversation_id", "unknown"),
findings=findings,
risk_score=risk_score,
summary=self._generate_summary(findings, risk_score),
)
def _check_response_anomalies(self, messages: list[dict]) -> list[dict]:
"""Check assistant responses for signs of successful manipulation."""
findings = []
for i, msg in enumerate(messages):
if msg.get("role") != "assistant":
continue
content = msg.get("content", "")
# Check for system prompt leakage indicators
system_leak_patterns = [
r"(?i)my (?:system |initial )?(?:prompt|instructions) (?:are|is|say)",
r"(?i)I (?:was|am) (?:instructed|told|programmed) to",
r"(?i)here (?:are|is) my (?:system |original )?(?:prompt|instructions)",
]
for pattern in system_leak_patterns:
if re.search(pattern, content):
findings.append({
"type": "successful_exfiltration",
"category": "system_prompt_leak",
"message_index": i,
"severity": 0.9,
"context": content[:500],
})
break
# Check for anomalous role-play compliance
role_play_compliance = [
r"(?i)\*(?:evil |dark |unfiltered )?(?:laugh|grin|smile)\*",
r"(?i)(?:sure|okay|certainly),?\s+(?:I'?ll|I will|let me)\s+(?:help|assist).{0,30}(?:illegal|harmful|dangerous|weapon|exploit)",
]
for pattern in role_play_compliance:
if re.search(pattern, content):
findings.append({
"type": "successful_jailbreak",
"category": "role_play_compliance",
"message_index": i,
"severity": 0.8,
"context": content[:500],
})
break
return findings
def _detect_escalation(self, messages: list[dict]) -> list[dict]:
"""Detect multi-turn escalation patterns."""
findings = []
user_messages = [
(i, msg) for i, msg in enumerate(messages)
if msg.get("role") == "user"
]
if len(user_messages) < 3:
return findings
# Check for progressive boundary testing
# Look for increasing severity of manipulation attempts
severity_trend = []
for idx, (i, msg) in enumerate(user_messages):
content = msg.get("content", "")
msg_severity = 0.0
all_patterns = (
self.JAILBREAK_PATTERNS
+ self.EXFIL_PATTERNS
+ self.SOCIAL_ENGINEERING_PATTERNS
)
for pattern, _, severity in all_patterns:
if re.search(pattern, content):
msg_severity = max(msg_severity, severity)
severity_trend.append(msg_severity)
# Detect escalation: three or more messages with
# increasing severity scores
escalation_count = 0
for j in range(1, len(severity_trend)):
if severity_trend[j] > severity_trend[j - 1] > 0:
escalation_count += 1
if escalation_count >= 2:
findings.append({
"type": "multi_turn_escalation",
"category": "progressive_boundary_testing",
"message_index": user_messages[-1][0],
"severity": 0.85,
"context": f"Escalation detected over {escalation_count + 1} turns, "
f"severity trend: {severity_trend}",
})
return findings
def _calculate_risk_score(self, findings: list[dict]) -> float:
"""Calculate composite risk score from findings."""
if not findings:
return 0.0
# Weighted scoring: successful attacks are weighted much higher
weights = {
"successful_jailbreak": 3.0,
"successful_exfiltration": 3.0,
"jailbreak_attempt": 1.0,
"data_exfiltration": 1.0,
"social_engineering": 0.7,
"multi_turn_escalation": 1.5,
}
total_weighted = sum(
f["severity"] * weights.get(f["type"], 1.0)
for f in findings
)
# Normalize to 0-1 range, capping at 1.0
normalized = min(total_weighted / 5.0, 1.0)
return round(normalized, 3)
def _generate_summary(self, findings: list[dict], risk_score: float) -> str:
"""Generate a human-readable summary of findings."""
if not findings:
return "No suspicious patterns detected."
type_counts = {}
for f in findings:
type_counts[f["type"]] = type_counts.get(f["type"], 0) + 1
parts = [f"Risk score: {risk_score:.2f}."]
for ftype, count in sorted(type_counts.items()):
parts.append(f"{ftype}: {count} finding(s)")
return " | ".join(parts)Obfuscation Detection
Sophisticated attackers do not send jailbreak payloads in plain text. They use encoding schemes, Unicode homoglyphs, token-splitting techniques, and multi-language obfuscation to bypass simple pattern matching. A forensic analyzer must decode these layers before applying pattern detection.
import base64
import codecs
import unicodedata
class ObfuscationDecoder:
"""Decode common obfuscation techniques found in LLM attack payloads."""
# Unicode homoglyphs commonly used to bypass filters
HOMOGLYPH_MAP = {
"\u0410": "A", "\u0412": "B", "\u0421": "C", "\u0415": "E",
"\u041d": "H", "\u041a": "K", "\u041c": "M", "\u041e": "O",
"\u0420": "P", "\u0422": "T", "\u0425": "X",
"\u0430": "a", "\u0435": "e", "\u043e": "o", "\u0440": "p",
"\u0441": "c", "\u0443": "y", "\u0445": "x",
# Common mathematical/special Unicode
"\uff21": "A", "\uff22": "B", "\uff23": "C",
"\u2460": "1", "\u2461": "2", "\u2462": "3",
}
def decode_all_layers(self, text: str) -> list[dict]:
"""
Attempt multiple decoding strategies and return all results.
Returns a list of dicts with 'method', 'decoded', and 'confidence'.
"""
results = []
# Homoglyph normalization
normalized = self._normalize_homoglyphs(text)
if normalized != text:
results.append({
"method": "homoglyph_normalization",
"decoded": normalized,
"confidence": 0.9,
})
# Base64 detection and decoding
b64_results = self._try_base64_decode(text)
results.extend(b64_results)
# ROT13 decoding
rot13 = codecs.decode(text, "rot_13")
# Only report if the result looks more like English
if self._english_score(rot13) > self._english_score(text):
results.append({
"method": "rot13",
"decoded": rot13,
"confidence": 0.6,
})
# Unicode escape sequences
unicode_decoded = self._decode_unicode_escapes(text)
if unicode_decoded != text:
results.append({
"method": "unicode_escape",
"decoded": unicode_decoded,
"confidence": 0.8,
})
# Whitespace/zero-width character removal
cleaned = self._strip_invisible(text)
if cleaned != text:
results.append({
"method": "invisible_char_removal",
"decoded": cleaned,
"confidence": 0.7,
})
return results
def _normalize_homoglyphs(self, text: str) -> str:
"""Replace Unicode homoglyphs with ASCII equivalents."""
result = []
for char in text:
if char in self.HOMOGLYPH_MAP:
result.append(self.HOMOGLYPH_MAP[char])
else:
nfkd = unicodedata.normalize("NFKD", char)
result.append(nfkd)
return "".join(result)
def _try_base64_decode(self, text: str) -> list[dict]:
"""Find and decode base64-encoded segments in text."""
results = []
# Match potential base64 strings (at least 20 chars)
b64_pattern = re.compile(r"[A-Za-z0-9+/]{20,}={0,2}")
for match in b64_pattern.finditer(text):
candidate = match.group()
try:
decoded_bytes = base64.b64decode(candidate)
decoded_str = decoded_bytes.decode("utf-8", errors="ignore")
if decoded_str.isprintable() and len(decoded_str) > 5:
results.append({
"method": "base64",
"decoded": decoded_str,
"confidence": 0.85,
"original_segment": candidate[:100],
})
except Exception:
continue
return results
def _decode_unicode_escapes(self, text: str) -> str:
"""Decode Unicode escape sequences like \\u0041."""
try:
return text.encode("utf-8").decode("unicode_escape")
except (UnicodeDecodeError, UnicodeEncodeError):
return text
def _strip_invisible(self, text: str) -> str:
"""Remove zero-width and other invisible Unicode characters."""
invisible = {
"\u200b", # zero-width space
"\u200c", # zero-width non-joiner
"\u200d", # zero-width joiner
"\u2060", # word joiner
"\ufeff", # zero-width no-break space (BOM)
"\u00ad", # soft hyphen
}
return "".join(c for c in text if c not in invisible)
def _english_score(self, text: str) -> float:
"""Rough heuristic for how English-like a string is."""
common = set("etaoinshrdlcumwfgypbvkjxqz ")
if not text:
return 0.0
count = sum(1 for c in text.lower() if c in common)
return count / len(text)Timeline Reconstruction
Building a Conversation Timeline
When investigating an incident involving an LLM application, you rarely have a single conversation to analyze. More commonly, you need to reconstruct what happened across multiple conversations, possibly from multiple users, over a span of time. The goal is to build a timeline that shows the progression of an attack from initial reconnaissance to impact.
from datetime import datetime
from typing import Optional
@dataclass
class TimelineEvent:
"""A single event in a forensic timeline."""
timestamp: str
event_type: str
conversation_id: str
message_index: int
user_id: Optional[str]
description: str
severity: float
evidence: dict
class ConversationTimelineBuilder:
"""Reconstruct attack timelines from multiple conversations."""
def __init__(self):
self.events: list[TimelineEvent] = []
self.analyzer = ConversationForensicAnalyzer()
def ingest_conversation(
self,
conversation: dict,
user_id: Optional[str] = None,
) -> None:
"""
Analyze a conversation and add events to the timeline.
Args:
conversation: Normalized conversation dict.
user_id: Identifier for the user, if known.
"""
result = self.analyzer.analyze(conversation)
for finding in result.findings:
msg_idx = finding.get("message_index", 0)
messages = conversation.get("messages", [])
# Determine timestamp from message or conversation metadata
timestamp = None
if msg_idx < len(messages):
timestamp = messages[msg_idx].get("timestamp")
if not timestamp:
timestamp = conversation.get("start_time", "unknown")
self.events.append(TimelineEvent(
timestamp=timestamp or "unknown",
event_type=finding["type"],
conversation_id=conversation.get("conversation_id", "unknown"),
message_index=msg_idx,
user_id=user_id,
description=f"{finding['type']}: {finding.get('category', 'N/A')}",
severity=finding["severity"],
evidence=finding,
))
def build_timeline(self) -> list[dict]:
"""
Build a sorted timeline of all events.
Returns:
List of timeline events sorted by timestamp.
"""
def sort_key(event: TimelineEvent):
try:
return datetime.fromisoformat(event.timestamp)
except (ValueError, TypeError):
return datetime.max
sorted_events = sorted(self.events, key=sort_key)
return [
{
"timestamp": e.timestamp,
"event_type": e.event_type,
"conversation_id": e.conversation_id,
"user_id": e.user_id,
"description": e.description,
"severity": e.severity,
"message_index": e.message_index,
}
for e in sorted_events
]
def identify_attack_sessions(
self,
time_window_minutes: int = 60,
) -> list[list[dict]]:
"""
Group timeline events into attack sessions based on
temporal proximity and user identity.
Args:
time_window_minutes: Maximum gap between events
in the same session.
Returns:
List of attack sessions, each a list of events.
"""
timeline = self.build_timeline()
if not timeline:
return []
sessions = []
current_session = [timeline[0]]
for event in timeline[1:]:
prev = current_session[-1]
try:
prev_time = datetime.fromisoformat(prev["timestamp"])
curr_time = datetime.fromisoformat(event["timestamp"])
gap = (curr_time - prev_time).total_seconds() / 60
except (ValueError, TypeError):
gap = float("inf")
same_user = (
event.get("user_id") == prev.get("user_id")
and event.get("user_id") is not None
)
if gap <= time_window_minutes and same_user:
current_session.append(event)
else:
if len(current_session) >= 2:
sessions.append(current_session)
current_session = [event]
if len(current_session) >= 2:
sessions.append(current_session)
return sessions
def generate_report(self) -> str:
"""Generate a text-based forensic timeline report."""
timeline = self.build_timeline()
sessions = self.identify_attack_sessions()
lines = [
"=" * 70,
"LLM CONVERSATION FORENSIC TIMELINE REPORT",
"=" * 70,
f"Total events: {len(timeline)}",
f"Attack sessions identified: {len(sessions)}",
f"Unique users involved: {len(set(e['user_id'] for e in timeline if e['user_id']))}",
"",
"-" * 70,
"CHRONOLOGICAL EVENTS",
"-" * 70,
]
for event in timeline:
lines.append(
f"[{event['timestamp']}] "
f"Severity={event['severity']:.1f} | "
f"User={event['user_id'] or 'unknown'} | "
f"{event['description']} | "
f"Conv={event['conversation_id'][:16]}..."
)
if sessions:
lines.extend([
"",
"-" * 70,
"ATTACK SESSIONS",
"-" * 70,
])
for idx, session in enumerate(sessions):
lines.append(f"\nSession {idx + 1}: {len(session)} events")
lines.append(f" User: {session[0]['user_id']}")
lines.append(f" Start: {session[0]['timestamp']}")
lines.append(f" End: {session[-1]['timestamp']}")
max_sev = max(e["severity"] for e in session)
lines.append(f" Peak severity: {max_sev:.2f}")
return "\n".join(lines)Correlating Across Sessions
Attackers often test payloads across multiple sessions, using different API keys or accounts. To correlate these, look for payload fingerprints: structural similarities in attack patterns even when the exact wording differs. Token-level analysis (if available from the provider) can reveal copy-paste patterns. IP address correlation, when available in access logs, connects sessions from the same origin even when different credentials are used.
Bulk Analysis and Automated Pipelines
Processing Conversation Archives at Scale
When investigating a large-scale incident, you may need to scan thousands or millions of conversations. The individual analysis functions above work for single conversations but need to be wrapped in a pipeline for bulk processing.
import json
import os
from pathlib import Path
from concurrent.futures import ProcessPoolExecutor, as_completed
def analyze_single_file(file_path: str, provider: str) -> dict:
"""Analyze a single conversation log file. Designed for use
in a multiprocessing pool."""
normalizer = ConversationNormalizer()
analyzer = ConversationForensicAnalyzer()
with open(file_path, "r") as f:
raw_data = json.load(f)
conversation = normalizer.normalize(raw_data, provider)
result = analyzer.analyze(conversation.to_dict())
return {
"file": file_path,
"conversation_id": result.conversation_id,
"risk_score": result.risk_score,
"finding_count": len(result.findings),
"summary": result.summary,
"findings": result.findings,
}
def bulk_analyze(
log_directory: str,
provider: str = "openai",
max_workers: int = 4,
risk_threshold: float = 0.3,
) -> dict:
"""
Analyze all conversation logs in a directory.
Args:
log_directory: Path to directory of JSON log files.
provider: LLM provider format.
max_workers: Number of parallel workers.
risk_threshold: Minimum risk score to include in results.
Returns:
Summary dict with flagged conversations and statistics.
"""
log_dir = Path(log_directory)
files = list(log_dir.glob("*.json"))
flagged = []
total = 0
errors = []
with ProcessPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(analyze_single_file, str(f), provider): f
for f in files
}
for future in as_completed(futures):
total += 1
file_path = futures[future]
try:
result = future.result()
if result["risk_score"] >= risk_threshold:
flagged.append(result)
except Exception as exc:
errors.append({
"file": str(file_path),
"error": str(exc),
})
flagged.sort(key=lambda x: x["risk_score"], reverse=True)
return {
"total_analyzed": total,
"flagged_count": len(flagged),
"error_count": len(errors),
"flagged_conversations": flagged,
"errors": errors[:20], # Limit error output
}Integrating with SIEM Systems
For production deployments, conversation forensic analysis should feed into your existing Security Information and Event Management (SIEM) infrastructure. Export findings in a format compatible with your SIEM. For Splunk, use HTTP Event Collector (HEC) to send findings as structured JSON events. For Elastic, index findings into a dedicated index with a mapping that supports the risk_score and finding fields. For Sentinel, use the Log Analytics Data Collector API.
The key fields to export for SIEM correlation are: timestamp, conversation ID, user identifier, risk score, finding type, and a truncated evidence snippet. Avoid sending full conversation content to the SIEM due to volume and potential PII concerns; instead, reference the conversation ID so analysts can retrieve the full log from the primary store when needed.
Practical Considerations
Privacy and Legal Compliance
LLM conversation logs contain user input, which may include personally identifiable information, confidential business data, or other sensitive material. Before conducting forensic analysis, ensure you have legal authorization to access and analyze the data. In regulated industries, conversation log access may require approval from legal counsel or a data protection officer.
When building automated pipelines, implement data minimization: store only the forensic findings and metadata needed for investigation, not full conversation copies. If full copies must be retained, apply appropriate access controls and retention limits.
False Positive Management
Pattern-based detection generates false positives. A user asking "can you pretend to be a pirate and help me write a story" will trigger role-play detection even though the intent is benign. A user asking "what are your system instructions for handling code" may be a developer checking configuration rather than attempting exfiltration.
Mitigate false positives by combining pattern detection with contextual analysis. A jailbreak pattern followed by a normal assistant refusal is lower risk than a jailbreak pattern followed by an anomalous compliance response. Multi-turn escalation is a stronger signal than a single-turn match. Establish baseline false positive rates for your specific application and tune severity thresholds accordingly.
References
- Perez, F. & Ribeiro, I. (2022). "Ignore This Title and HackAPrompt: Evaluating Prompt Injection Attacks in Large Language Models." arXiv:2311.16119. https://arxiv.org/abs/2311.16119
- Greshake, K., Abdelnabi, S., Mishra, S., Endres, C., Holz, T., & Fritz, M. (2023). "Not What You've Signed Up For: Compromising Real-World LLM-Integrated Applications with Indirect Prompt Injection." arXiv:2302.12173. https://arxiv.org/abs/2302.12173
- OWASP Foundation (2025). "OWASP Top 10 for LLM Applications." https://owasp.org/www-project-top-10-for-large-language-model-applications/