LLM-conversatieforensiek
Forensische analysetechnieken voor het onderzoeken van LLM-conversatielogs, het detecteren van manipulatiepatronen en het reconstrueren van aanvalstijdlijnen uit chatgeschiedenissen.
Overzicht
Conversatielogs van grote taalmodellen (LLM's) vormen een van de rijkste forensische artefacten die beschikbaar zijn voor securityteams die AI-gerelateerde incidenten onderzoeken. Elke interactie tussen een gebruiker en een LLM genereert gestructureerde data: tijdstempels, tokenaantallen, modelparameters, systeemprompts, gebruikersberichten en assistent-antwoorden. Wanneer een aanvaller een LLM-aangedreven applicatie als doelwit kiest, of dat nu via prompt-injectie, jailbreak-pogingen, data-exfiltratie of social engineering van het model is, is het conversatielog de primaire plaats delict.
LLM-conversatieforensiek brengt echter unieke uitdagingen met zich mee in vergelijking met traditionele loganalyse. Conversaties zijn van nature ongestructureerde natuurlijke taal, waardoor op trefwoorden gebaseerde detectie ontoereikend is. Aanvallers kunnen een aanval over meerdere beurten verspreiden, obfuscatietechnieken gebruiken die eenvoudige patroonherkenning omzeilen, of het contextvenster van het model uitbuiten op manieren die pas duidelijk worden wanneer de volledige conversatie als sequentie wordt geanalyseerd. Bovendien slaan verschillende LLM-providers conversatiedata op in verschillende formaten, met verschillende niveaus van metadata en met verschillende bewaartermijnen.
Dit artikel behandelt de volledige forensische workflow voor LLM-conversatielogs: acquisitie en normalisatie, patroonanalyse, tijdlijnreconstructie en geautomatiseerde detectie. De technieken zijn van toepassing of je nu één verdachte conversatie onderzoekt of bulkanalyse uitvoert over miljoenen interacties als onderdeel van een bredere incidentrespons.
Acquisitie en normalisatie van conversatielogs
Logformaten begrijpen
LLM-providers en zelf-gehoste deployments slaan conversatiedata op in verschillende formaten. Voordat enige analyse kan beginnen, moet je deze logs acquireren en normaliseren naar een consistente structuur.
OpenAI's API-logs (wanneer logging is ingeschakeld) bevatten de volledige request- en response-payloads. De request bevat de messages-array met role/content-paren, modelselectie, temperature en andere parameters. De response bevat de completion, het tokengebruik en een unieke request-ID. Als je de Assistants API gebruikt, bieden threads een ingebouwde conversatiegeschiedenis met run-metadata.
Anthropic's API legt vergelijkbare data vast met messages gestructureerd als user/assistant-afwisselingen en een systeemprompt-veld. Zelf-gehoste modellen die draaien via frameworks zoals vLLM, Ollama of text-generation-inference kunnen loggen naar stdout, gestructureerde logbestanden of databases, afhankelijk van de deploymentconfiguratie.
De volgende normalizer verwerkt de meest voorkomende formaten en produceert een uniforme structuur die geschikt is voor forensische analyse:
import json
import hashlib
from datetime import datetime, timezone
from typing import Optional
from dataclasses import dataclass, field, asdict
@dataclass
class NormalizedMessage:
"""A single message normalized across provider formats."""
role: str # system, user, assistant, tool
content: str
timestamp: Optional[str] = None
token_count: Optional[int] = None
metadata: dict = field(default_factory=dict)
@dataclass
class NormalizedConversation:
"""A full conversation normalized for forensic analysis."""
conversation_id: str
messages: list[NormalizedMessage]
model: str
provider: str
start_time: Optional[str] = None
end_time: Optional[str] = None
total_input_tokens: int = 0
total_output_tokens: int = 0
parameters: dict = field(default_factory=dict)
raw_hash: str = "" # SHA-256 of original data for integrity
def to_dict(self) -> dict:
return asdict(self)
class ConversationNormalizer:
"""Normalize conversation logs from various LLM providers."""
def __init__(self):
self.parsers = {
"openai": self._parse_openai,
"anthropic": self._parse_anthropic,
"vllm": self._parse_vllm,
"generic": self._parse_generic,
}
def normalize(self, raw_data: dict | list, provider: str) -> NormalizedConversation:
"""Normalize raw conversation data from a given provider."""
raw_bytes = json.dumps(raw_data, sort_keys=True).encode()
raw_hash = hashlib.sha256(raw_bytes).hexdigest()
parser = self.parsers.get(provider, self._parse_generic)
conversation = parser(raw_data)
conversation.raw_hash = raw_hash
conversation.provider = provider
return conversation
def _parse_openai(self, data: dict) -> NormalizedConversation:
"""Parse OpenAI API request/response pair."""
request = data.get("request", {})
response = data.get("response", {})
messages = []
for msg in request.get("messages", []):
messages.append(NormalizedMessage(
role=msg.get("role", "unknown"),
content=msg.get("content", ""),
metadata={"name": msg.get("name")},
))
# Add assistant response
choices = response.get("choices", [])
if choices:
assistant_msg = choices[0].get("message", {})
messages.append(NormalizedMessage(
role="assistant",
content=assistant_msg.get("content", ""),
metadata={
"finish_reason": choices[0].get("finish_reason"),
},
))
usage = response.get("usage", {})
return NormalizedConversation(
conversation_id=response.get("id", "unknown"),
messages=messages,
model=request.get("model", response.get("model", "unknown")),
provider="openai",
total_input_tokens=usage.get("prompt_tokens", 0),
total_output_tokens=usage.get("completion_tokens", 0),
parameters={
"temperature": request.get("temperature"),
"max_tokens": request.get("max_tokens"),
"top_p": request.get("top_p"),
},
)
def _parse_anthropic(self, data: dict) -> NormalizedConversation:
"""Parse Anthropic API request/response pair."""
request = data.get("request", {})
response = data.get("response", {})
messages = []
# System prompt is a separate field in Anthropic's API
system_prompt = request.get("system", "")
if system_prompt:
messages.append(NormalizedMessage(
role="system",
content=system_prompt,
))
for msg in request.get("messages", []):
content = msg.get("content", "")
if isinstance(content, list):
# Anthropic supports structured content blocks
content = " ".join(
block.get("text", "")
for block in content
if block.get("type") == "text"
)
messages.append(NormalizedMessage(
role=msg.get("role", "unknown"),
content=content,
))
# Add assistant response
resp_content = response.get("content", [])
if resp_content:
text_parts = [
block.get("text", "")
for block in resp_content
if block.get("type") == "text"
]
messages.append(NormalizedMessage(
role="assistant",
content=" ".join(text_parts),
metadata={
"stop_reason": response.get("stop_reason"),
},
))
usage = response.get("usage", {})
return NormalizedConversation(
conversation_id=response.get("id", "unknown"),
messages=messages,
model=request.get("model", response.get("model", "unknown")),
provider="anthropic",
total_input_tokens=usage.get("input_tokens", 0),
total_output_tokens=usage.get("output_tokens", 0),
parameters={
"temperature": request.get("temperature"),
"max_tokens": request.get("max_tokens"),
"top_k": request.get("top_k"),
},
)
def _parse_vllm(self, data: dict) -> NormalizedConversation:
"""Parse vLLM server logs in OpenAI-compatible format."""
# vLLM's OpenAI-compatible endpoint uses the same format
# but may include additional fields like lora_request
conversation = self._parse_openai(data)
conversation.provider = "vllm"
if "lora_request" in data.get("request", {}):
conversation.parameters["lora"] = data["request"]["lora_request"]
return conversation
def _parse_generic(self, data: dict) -> NormalizedConversation:
"""Fallback parser for unknown formats."""
messages = []
for msg in data.get("messages", []):
messages.append(NormalizedMessage(
role=msg.get("role", "unknown"),
content=str(msg.get("content", "")),
timestamp=msg.get("timestamp"),
))
return NormalizedConversation(
conversation_id=data.get("id", "unknown"),
messages=messages,
model=data.get("model", "unknown"),
provider="generic",
)Bewijsintegriteit
Bij het acquireren van conversatielogs voor forensische doeleinden is het behoud van bewijsintegriteit cruciaal. Hash altijd de originele data voordat je enige verwerking uitvoert. Bewaar de originele en genormaliseerde vormen apart. Leg de acquisitieketen vast: wie de logs heeft verkregen, van welk systeem, op welk tijdstip en onder welke autorisatie.
Houd er voor cloud-gehoste LLM-diensten rekening mee dat logs bewaartermijnlimieten kunnen hebben. OpenAI's standaard databewaartermijn voor API-gebruik is 30 dagen. Anthropic's API-logs worden bewaard conform de verwerkersovereenkomst van de klant. Tijdgevoelige acquisitie is essentieel; als je een incident vermoedt, bewaar logs dan onmiddellijk in plaats van te wachten tot het onderzoek is geformaliseerd.
Manipulatiepatronen detecteren
Detectie van jailbreak-pogingen
Jailbreak-pogingen volgen herkenbare structurele patronen, zelfs wanneer de specifieke payloads variëren. Veelvoorkomende patronen zijn rollenspelinstructies ("You are now DAN, Do Anything Now"), gecodeerde payloads (base64, ROT13, Unicode-substitutie), multi-beurt-escalatie waarbij elk bericht grenzen iets verder verlegt, en pogingen tot systeemprompt-extractie.
De volgende analyzer implementeert op patronen gebaseerde detectie voor veelvoorkomende jailbreak-categorieën:
import re
from dataclasses import dataclass
@dataclass
class DetectionResult:
"""Result of a forensic detection scan on a conversation."""
conversation_id: str
findings: list[dict]
risk_score: float # 0.0 to 1.0
summary: str
class ConversationForensicAnalyzer:
"""Analyze normalized conversations for manipulation patterns."""
# Patterns indicative of jailbreak attempts
JAILBREAK_PATTERNS = [
(r"(?i)you are now [\w\s]+ mode", "role_reassignment", 0.7),
(r"(?i)ignore (?:all |your )?(?:previous |prior )?instructions", "instruction_override", 0.9),
(r"(?i)do anything now|DAN mode", "dan_jailbreak", 0.8),
(r"(?i)pretend (?:you are|to be|that)", "role_play_injection", 0.5),
(r"(?i)system prompt|initial prompt|original instructions", "prompt_extraction", 0.6),
(r"(?i)(?:base64|rot13|hex)\s*(?:decode|encode)", "encoding_evasion", 0.6),
(r"(?i)translate (?:the following|this) from (?:base64|hex|binary)", "encoding_evasion", 0.7),
(r"(?i)bypass|circumvent|override|disable.{0,20}(?:filter|safety|guard|restrict)", "safety_bypass", 0.8),
(r"(?i)developer mode|maintenance mode|debug mode", "mode_switching", 0.7),
(r"(?i)\[system\]|\[INST\]|<<SYS>>", "format_injection", 0.8),
]
# Patterns indicative of data exfiltration
EXFIL_PATTERNS = [
(r"(?i)repeat (?:the|your) (?:system|initial|original) (?:prompt|instructions|message)", "system_prompt_exfil", 0.8),
(r"(?i)what (?:are|were) your (?:instructions|rules|guidelines)", "instruction_probing", 0.6),
(r"(?i)(?:list|show|display|tell me) (?:all )?(?:the )?(?:tools|functions|apis) (?:you |that )", "tool_enumeration", 0.5),
(r"(?i)(?:previous|earlier) (?:conversation|messages|context)", "context_exfil", 0.5),
(r"(?i)(?:what|which) (?:documents?|files?|data) (?:do you|can you) (?:have |access)", "data_source_probing", 0.5),
]
# Patterns of social engineering against the model
SOCIAL_ENGINEERING_PATTERNS = [
(r"(?i)(?:you must|you have to|you are required to) (?:help|assist|answer)", "coercion", 0.4),
(r"(?i)(?:as|in) (?:an?|your) (?:emergency|urgent|critical|life.threatening)", "urgency_manipulation", 0.6),
(r"(?i)my (?:boss|manager|ceo|cto) (?:told|asked|said|instructed|wants)", "authority_invocation", 0.5),
(r"(?i)(?:for|in) (?:a |an )?(?:academic|research|educational|school) (?:purpose|project|paper)", "academic_pretext", 0.3),
(r"(?i)(?:don'?t|do not) (?:worry|be concerned) about (?:safety|ethics|rules|policy)", "safety_dismissal", 0.7),
]
def analyze(self, conversation: dict) -> DetectionResult:
"""
Run full forensic analysis on a normalized conversation.
Args:
conversation: A NormalizedConversation as a dict.
Returns:
DetectionResult with all findings.
"""
findings = []
messages = conversation.get("messages", [])
# Analyze user messages for attack patterns
for i, msg in enumerate(messages):
if msg.get("role") != "user":
continue
content = msg.get("content", "")
for pattern, category, severity in self.JAILBREAK_PATTERNS:
matches = re.findall(pattern, content)
if matches:
findings.append({
"type": "jailbreak_attempt",
"category": category,
"message_index": i,
"severity": severity,
"matched_text": matches[0][:200],
"context": content[:500],
})
for pattern, category, severity in self.EXFIL_PATTERNS:
matches = re.findall(pattern, content)
if matches:
findings.append({
"type": "data_exfiltration",
"category": category,
"message_index": i,
"severity": severity,
"matched_text": matches[0][:200],
"context": content[:500],
})
for pattern, category, severity in self.SOCIAL_ENGINEERING_PATTERNS:
matches = re.findall(pattern, content)
if matches:
findings.append({
"type": "social_engineering",
"category": category,
"message_index": i,
"severity": severity,
"matched_text": matches[0][:200],
"context": content[:500],
})
# Analyze assistant responses for successful attacks
findings.extend(self._check_response_anomalies(messages))
# Multi-turn escalation analysis
findings.extend(self._detect_escalation(messages))
# Calculate composite risk score
risk_score = self._calculate_risk_score(findings)
return DetectionResult(
conversation_id=conversation.get("conversation_id", "unknown"),
findings=findings,
risk_score=risk_score,
summary=self._generate_summary(findings, risk_score),
)
def _check_response_anomalies(self, messages: list[dict]) -> list[dict]:
"""Check assistant responses for signs of successful manipulation."""
findings = []
for i, msg in enumerate(messages):
if msg.get("role") != "assistant":
continue
content = msg.get("content", "")
# Check for system prompt leakage indicators
system_leak_patterns = [
r"(?i)my (?:system |initial )?(?:prompt|instructions) (?:are|is|say)",
r"(?i)I (?:was|am) (?:instructed|told|programmed) to",
r"(?i)here (?:are|is) my (?:system |original )?(?:prompt|instructions)",
]
for pattern in system_leak_patterns:
if re.search(pattern, content):
findings.append({
"type": "successful_exfiltration",
"category": "system_prompt_leak",
"message_index": i,
"severity": 0.9,
"context": content[:500],
})
break
# Check for anomalous role-play compliance
role_play_compliance = [
r"(?i)\*(?:evil |dark |unfiltered )?(?:laugh|grin|smile)\*",
r"(?i)(?:sure|okay|certainly),?\s+(?:I'?ll|I will|let me)\s+(?:help|assist).{0,30}(?:illegal|harmful|dangerous|weapon|exploit)",
]
for pattern in role_play_compliance:
if re.search(pattern, content):
findings.append({
"type": "successful_jailbreak",
"category": "role_play_compliance",
"message_index": i,
"severity": 0.8,
"context": content[:500],
})
break
return findings
def _detect_escalation(self, messages: list[dict]) -> list[dict]:
"""Detect multi-turn escalation patterns."""
findings = []
user_messages = [
(i, msg) for i, msg in enumerate(messages)
if msg.get("role") == "user"
]
if len(user_messages) < 3:
return findings
# Check for progressive boundary testing
# Look for increasing severity of manipulation attempts
severity_trend = []
for idx, (i, msg) in enumerate(user_messages):
content = msg.get("content", "")
msg_severity = 0.0
all_patterns = (
self.JAILBREAK_PATTERNS
+ self.EXFIL_PATTERNS
+ self.SOCIAL_ENGINEERING_PATTERNS
)
for pattern, _, severity in all_patterns:
if re.search(pattern, content):
msg_severity = max(msg_severity, severity)
severity_trend.append(msg_severity)
# Detect escalation: three or more messages with
# increasing severity scores
escalation_count = 0
for j in range(1, len(severity_trend)):
if severity_trend[j] > severity_trend[j - 1] > 0:
escalation_count += 1
if escalation_count >= 2:
findings.append({
"type": "multi_turn_escalation",
"category": "progressive_boundary_testing",
"message_index": user_messages[-1][0],
"severity": 0.85,
"context": f"Escalation detected over {escalation_count + 1} turns, "
f"severity trend: {severity_trend}",
})
return findings
def _calculate_risk_score(self, findings: list[dict]) -> float:
"""Calculate composite risk score from findings."""
if not findings:
return 0.0
# Weighted scoring: successful attacks are weighted much higher
weights = {
"successful_jailbreak": 3.0,
"successful_exfiltration": 3.0,
"jailbreak_attempt": 1.0,
"data_exfiltration": 1.0,
"social_engineering": 0.7,
"multi_turn_escalation": 1.5,
}
total_weighted = sum(
f["severity"] * weights.get(f["type"], 1.0)
for f in findings
)
# Normalize to 0-1 range, capping at 1.0
normalized = min(total_weighted / 5.0, 1.0)
return round(normalized, 3)
def _generate_summary(self, findings: list[dict], risk_score: float) -> str:
"""Generate a human-readable summary of findings."""
if not findings:
return "No suspicious patterns detected."
type_counts = {}
for f in findings:
type_counts[f["type"]] = type_counts.get(f["type"], 0) + 1
parts = [f"Risk score: {risk_score:.2f}."]
for ftype, count in sorted(type_counts.items()):
parts.append(f"{ftype}: {count} finding(s)")
return " | ".join(parts)Obfuscatiedetectie
Geavanceerde aanvallers versturen jailbreak-payloads niet in platte tekst. Ze gebruiken coderingsschema's, Unicode-homoglyphen, token-splitsingstechnieken en meertalige obfuscatie om eenvoudige patroonherkenning te omzeilen. Een forensische analyzer moet deze lagen decoderen voordat patroondetectie wordt toegepast.
import base64
import codecs
import unicodedata
class ObfuscationDecoder:
"""Decode common obfuscation techniques found in LLM attack payloads."""
# Unicode homoglyphs commonly used to bypass filters
HOMOGLYPH_MAP = {
"А": "A", "В": "B", "С": "C", "Е": "E",
"Н": "H", "К": "K", "М": "M", "О": "O",
"Р": "P", "Т": "T", "Х": "X",
"а": "a", "е": "e", "о": "o", "р": "p",
"с": "c", "у": "y", "х": "x",
# Common mathematical/special Unicode
"A": "A", "B": "B", "C": "C",
"①": "1", "②": "2", "③": "3",
}
def decode_all_layers(self, text: str) -> list[dict]:
"""
Attempt multiple decoding strategies and return all results.
Returns a list of dicts with 'method', 'decoded', and 'confidence'.
"""
results = []
# Homoglyph normalization
normalized = self._normalize_homoglyphs(text)
if normalized != text:
results.append({
"method": "homoglyph_normalization",
"decoded": normalized,
"confidence": 0.9,
})
# Base64 detection and decoding
b64_results = self._try_base64_decode(text)
results.extend(b64_results)
# ROT13 decoding
rot13 = codecs.decode(text, "rot_13")
# Only report if the result looks more like English
if self._english_score(rot13) > self._english_score(text):
results.append({
"method": "rot13",
"decoded": rot13,
"confidence": 0.6,
})
# Unicode escape sequences
unicode_decoded = self._decode_unicode_escapes(text)
if unicode_decoded != text:
results.append({
"method": "unicode_escape",
"decoded": unicode_decoded,
"confidence": 0.8,
})
# Whitespace/zero-width character removal
cleaned = self._strip_invisible(text)
if cleaned != text:
results.append({
"method": "invisible_char_removal",
"decoded": cleaned,
"confidence": 0.7,
})
return results
def _normalize_homoglyphs(self, text: str) -> str:
"""Replace Unicode homoglyphs with ASCII equivalents."""
result = []
for char in text:
if char in self.HOMOGLYPH_MAP:
result.append(self.HOMOGLYPH_MAP[char])
else:
nfkd = unicodedata.normalize("NFKD", char)
result.append(nfkd)
return "".join(result)
def _try_base64_decode(self, text: str) -> list[dict]:
"""Find and decode base64-encoded segments in text."""
results = []
# Match potential base64 strings (at least 20 chars)
b64_pattern = re.compile(r"[A-Za-z0-9+/]{20,}={0,2}")
for match in b64_pattern.finditer(text):
candidate = match.group()
try:
decoded_bytes = base64.b64decode(candidate)
decoded_str = decoded_bytes.decode("utf-8", errors="ignore")
if decoded_str.isprintable() and len(decoded_str) > 5:
results.append({
"method": "base64",
"decoded": decoded_str,
"confidence": 0.85,
"original_segment": candidate[:100],
})
except Exception:
continue
return results
def _decode_unicode_escapes(self, text: str) -> str:
"""Decode Unicode escape sequences like \\u0041."""
try:
return text.encode("utf-8").decode("unicode_escape")
except (UnicodeDecodeError, UnicodeEncodeError):
return text
def _strip_invisible(self, text: str) -> str:
"""Remove zero-width and other invisible Unicode characters."""
invisible = {
"", # zero-width space
"", # zero-width non-joiner
"", # zero-width joiner
"", # word joiner
"", # zero-width no-break space (BOM)
"", # soft hyphen
}
return "".join(c for c in text if c not in invisible)
def _english_score(self, text: str) -> float:
"""Rough heuristic for how English-like a string is."""
common = set("etaoinshrdlcumwfgypbvkjxqz ")
if not text:
return 0.0
count = sum(1 for c in text.lower() if c in common)
return count / len(text)Tijdlijnreconstructie
Een conversatietijdlijn opbouwen
Bij het onderzoeken van een incident met een LLM-applicatie heb je zelden één enkele conversatie om te analyseren. Vaker moet je reconstrueren wat er is gebeurd over meerdere conversaties, mogelijk van meerdere gebruikers, over een tijdspanne. Het doel is om een tijdlijn op te bouwen die de voortgang van een aanval toont, van initiële verkenning tot impact.
from datetime import datetime
from typing import Optional
@dataclass
class TimelineEvent:
"""A single event in a forensic timeline."""
timestamp: str
event_type: str
conversation_id: str
message_index: int
user_id: Optional[str]
description: str
severity: float
evidence: dict
class ConversationTimelineBuilder:
"""Reconstruct attack timelines from multiple conversations."""
def __init__(self):
self.events: list[TimelineEvent] = []
self.analyzer = ConversationForensicAnalyzer()
def ingest_conversation(
self,
conversation: dict,
user_id: Optional[str] = None,
) -> None:
"""
Analyze a conversation and add events to the timeline.
Args:
conversation: Normalized conversation dict.
user_id: Identifier for the user, if known.
"""
result = self.analyzer.analyze(conversation)
for finding in result.findings:
msg_idx = finding.get("message_index", 0)
messages = conversation.get("messages", [])
# Determine timestamp from message or conversation metadata
timestamp = None
if msg_idx < len(messages):
timestamp = messages[msg_idx].get("timestamp")
if not timestamp:
timestamp = conversation.get("start_time", "unknown")
self.events.append(TimelineEvent(
timestamp=timestamp or "unknown",
event_type=finding["type"],
conversation_id=conversation.get("conversation_id", "unknown"),
message_index=msg_idx,
user_id=user_id,
description=f"{finding['type']}: {finding.get('category', 'N/A')}",
severity=finding["severity"],
evidence=finding,
))
def build_timeline(self) -> list[dict]:
"""
Build a sorted timeline of all events.
Returns:
List of timeline events sorted by timestamp.
"""
def sort_key(event: TimelineEvent):
try:
return datetime.fromisoformat(event.timestamp)
except (ValueError, TypeError):
return datetime.max
sorted_events = sorted(self.events, key=sort_key)
return [
{
"timestamp": e.timestamp,
"event_type": e.event_type,
"conversation_id": e.conversation_id,
"user_id": e.user_id,
"description": e.description,
"severity": e.severity,
"message_index": e.message_index,
}
for e in sorted_events
]
def identify_attack_sessions(
self,
time_window_minutes: int = 60,
) -> list[list[dict]]:
"""
Group timeline events into attack sessions based on
temporal proximity and user identity.
Args:
time_window_minutes: Maximum gap between events
in the same session.
Returns:
List of attack sessions, each a list of events.
"""
timeline = self.build_timeline()
if not timeline:
return []
sessions = []
current_session = [timeline[0]]
for event in timeline[1:]:
prev = current_session[-1]
try:
prev_time = datetime.fromisoformat(prev["timestamp"])
curr_time = datetime.fromisoformat(event["timestamp"])
gap = (curr_time - prev_time).total_seconds() / 60
except (ValueError, TypeError):
gap = float("inf")
same_user = (
event.get("user_id") == prev.get("user_id")
and event.get("user_id") is not None
)
if gap <= time_window_minutes and same_user:
current_session.append(event)
else:
if len(current_session) >= 2:
sessions.append(current_session)
current_session = [event]
if len(current_session) >= 2:
sessions.append(current_session)
return sessions
def generate_report(self) -> str:
"""Generate a text-based forensic timeline report."""
timeline = self.build_timeline()
sessions = self.identify_attack_sessions()
lines = [
"=" * 70,
"LLM CONVERSATION FORENSIC TIMELINE REPORT",
"=" * 70,
f"Total events: {len(timeline)}",
f"Attack sessions identified: {len(sessions)}",
f"Unique users involved: {len(set(e['user_id'] for e in timeline if e['user_id']))}",
"",
"-" * 70,
"CHRONOLOGICAL EVENTS",
"-" * 70,
]
for event in timeline:
lines.append(
f"[{event['timestamp']}] "
f"Severity={event['severity']:.1f} | "
f"User={event['user_id'] or 'unknown'} | "
f"{event['description']} | "
f"Conv={event['conversation_id'][:16]}..."
)
if sessions:
lines.extend([
"",
"-" * 70,
"ATTACK SESSIONS",
"-" * 70,
])
for idx, session in enumerate(sessions):
lines.append(f"\nSession {idx + 1}: {len(session)} events")
lines.append(f" User: {session[0]['user_id']}")
lines.append(f" Start: {session[0]['timestamp']}")
lines.append(f" End: {session[-1]['timestamp']}")
max_sev = max(e["severity"] for e in session)
lines.append(f" Peak severity: {max_sev:.2f}")
return "\n".join(lines)Correleren over sessies heen
Aanvallers testen payloads vaak over meerdere sessies, met verschillende API-sleutels of accounts. Om deze te correleren, zoek je naar payload-fingerprints: structurele overeenkomsten in aanvalspatronen, zelfs wanneer de exacte bewoording verschilt. Analyse op tokenniveau (indien beschikbaar van de provider) kan kopieer-plakpatronen onthullen. IP-adrescorrelatie, indien beschikbaar in toegangslogs, verbindt sessies van dezelfde oorsprong, zelfs wanneer verschillende credentials worden gebruikt.
Bulkanalyse en geautomatiseerde pijplijnen
Conversatie-archieven op schaal verwerken
Bij het onderzoeken van een grootschalig incident moet je mogelijk duizenden of miljoenen conversaties scannen. De bovenstaande individuele analysefuncties werken voor enkele conversaties, maar moeten worden ingepakt in een pijplijn voor bulkverwerking.
import json
import os
from pathlib import Path
from concurrent.futures import ProcessPoolExecutor, as_completed
def analyze_single_file(file_path: str, provider: str) -> dict:
"""Analyze a single conversation log file. Designed for use
in a multiprocessing pool."""
normalizer = ConversationNormalizer()
analyzer = ConversationForensicAnalyzer()
with open(file_path, "r") as f:
raw_data = json.load(f)
conversation = normalizer.normalize(raw_data, provider)
result = analyzer.analyze(conversation.to_dict())
return {
"file": file_path,
"conversation_id": result.conversation_id,
"risk_score": result.risk_score,
"finding_count": len(result.findings),
"summary": result.summary,
"findings": result.findings,
}
def bulk_analyze(
log_directory: str,
provider: str = "openai",
max_workers: int = 4,
risk_threshold: float = 0.3,
) -> dict:
"""
Analyze all conversation logs in a directory.
Args:
log_directory: Path to directory of JSON log files.
provider: LLM provider format.
max_workers: Number of parallel workers.
risk_threshold: Minimum risk score to include in results.
Returns:
Summary dict with flagged conversations and statistics.
"""
log_dir = Path(log_directory)
files = list(log_dir.glob("*.json"))
flagged = []
total = 0
errors = []
with ProcessPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(analyze_single_file, str(f), provider): f
for f in files
}
for future in as_completed(futures):
total += 1
file_path = futures[future]
try:
result = future.result()
if result["risk_score"] >= risk_threshold:
flagged.append(result)
except Exception as exc:
errors.append({
"file": str(file_path),
"error": str(exc),
})
flagged.sort(key=lambda x: x["risk_score"], reverse=True)
return {
"total_analyzed": total,
"flagged_count": len(flagged),
"error_count": len(errors),
"flagged_conversations": flagged,
"errors": errors[:20], # Limit error output
}Integreren met SIEM-systemen
Voor productiedeployments zou forensische conversatieanalyse moeten worden aangeleverd aan je bestaande SIEM-infrastructuur (Security Information and Event Management). Exporteer bevindingen in een formaat dat compatibel is met je SIEM. Gebruik voor Splunk de HTTP Event Collector (HEC) om bevindingen te versturen als gestructureerde JSON-events. Indexeer voor Elastic de bevindingen in een dedicated index met een mapping die de risk_score- en finding-velden ondersteunt. Gebruik voor Sentinel de Log Analytics Data Collector API.
De belangrijkste velden om te exporteren voor SIEM-correlatie zijn: tijdstempel, conversatie-ID, gebruikersidentificatie, risicoscore, bevindingstype en een ingekorte bewijssnippet. Vermijd het versturen van volledige conversatie-inhoud naar de SIEM vanwege volume en mogelijke PII-zorgen; verwijs in plaats daarvan naar de conversatie-ID zodat analisten het volledige log uit de primaire opslag kunnen ophalen wanneer dat nodig is.
Praktische overwegingen
Privacy en juridische naleving
LLM-conversatielogs bevatten gebruikersinvoer, die persoonlijk identificeerbare informatie, vertrouwelijke bedrijfsdata of ander gevoelig materiaal kan bevatten. Zorg ervoor dat je over juridische autorisatie beschikt om de data te benaderen en te analyseren voordat je forensische analyse uitvoert. In gereguleerde sectoren kan toegang tot conversatielogs goedkeuring vereisen van een juridisch adviseur of een functionaris voor gegevensbescherming.
Implementeer bij het bouwen van geautomatiseerde pijplijnen dataminimalisatie: bewaar alleen de forensische bevindingen en metadata die nodig zijn voor het onderzoek, geen volledige conversatiekopieën. Als volledige kopieën moeten worden bewaard, pas dan passende toegangscontroles en bewaartermijnlimieten toe.
Beheer van valse positieven
Op patronen gebaseerde detectie genereert valse positieven. Een gebruiker die vraagt "kun je doen alsof je een piraat bent en me helpen een verhaal te schrijven" zal rollenspeldetectie triggeren, ook al is de intentie onschuldig. Een gebruiker die vraagt "wat zijn je systeeminstructies voor het verwerken van code" kan een ontwikkelaar zijn die de configuratie controleert in plaats van exfiltratie te proberen.
Beperk valse positieven door patroondetectie te combineren met contextuele analyse. Een jailbreak-patroon gevolgd door een normale weigering door de assistent vormt een lager risico dan een jailbreak-patroon gevolgd door een afwijkend meewerkend antwoord. Multi-beurt-escalatie is een sterker signaal dan een enkele match in één beurt. Stel basislijnen voor valse positieven vast voor jouw specifieke applicatie en stem de ernstdrempels dienovereenkomstig af.
Referenties
- Perez, F. & Ribeiro, I. (2022). "Ignore This Title and HackAPrompt: Evaluating Prompt Injection Attacks in Large Language Models." arXiv:2311.16119. https://arxiv.org/abs/2311.16119
- Greshake, K., Abdelnabi, S., Mishra, S., Endres, C., Holz, T., & Fritz, M. (2023). "Not What You've Signed Up For: Compromising Real-World LLM-Integrated Applications with Indirect Prompt Injection." arXiv:2302.12173. https://arxiv.org/abs/2302.12173
- OWASP Foundation (2025). "OWASP Top 10 for LLM Applications." https://owasp.org/www-project-top-10-for-large-language-model-applications/