RAG-poisoning-forensics
Forensische onderzoekstechnieken voor het detecteren en analyseren van poisoning-aanvallen tegen Retrieval-Augmented Generation-systemen.
Overzicht
Retrieval-Augmented Generation (RAG)-systemen combineren informatieretrieval met generatie door een taalmodel: wanneer een gebruiker een vraag stelt, haalt het systeem relevante documenten op uit een kennisbank, injecteert ze in de context van het LLM en genereert een reactie die geworteld is in de opgehaalde informatie. RAG-poisoning-aanvallen richten zich op deze retrievalpijplijn door documenten in de kennisbank te injecteren, te wijzigen of te manipuleren, zodat kwaadaardige inhoud wordt opgehaald en de uitvoer van het model beïnvloedt.
RAG-poisoning is een bijzonder effectieve aanvalsvector omdat het de vertrouwensrelatie tussen het retrievalsysteem en het generatiemodel misbruikt. Het LLM behandelt opgehaalde documenten als gezaghebbende context, waardoor het vatbaar is voor het volgen van instructies of het genereren van desinformatie die is ingebed in vergiftigde documenten. Anders dan directe prompt-injectie kan RAG-poisoning iedere gebruiker treffen die retrieval van de vergiftigde inhoud activeert, waardoor het een één-op-veel-aanval is.
Forensisch onderzoek van RAG-poisoning vereist het analyseren van drie lagen: het documentcorpus (wat werd vergiftigd), het retrievalmechanisme (hoe vergiftigde inhoud werd geselecteerd) en de gegenereerde uitvoer (wat het model met de vergiftigde context deed). Dit artikel behandelt forensische technieken voor elke laag.
Taxonomie van RAG-poisoning-aanvallen
Directe documentinjectie
De aanvaller voegt nieuwe documenten met kwaadaardige inhoud toe aan de kennisbank. De documenten zijn zo gemaakt dat hun embeddings dicht bij veelvoorkomende query-embeddings liggen, waardoor ze vaak worden opgehaald.
Wijziging van bestaande documenten
De aanvaller wijzigt bestaande documenten in de kennisbank, hetzij door kwaadaardige instructies toe te voegen, hetzij door feitelijke inhoud subtiel te veranderen. Dit is moeilijker te detecteren omdat de metadata van het document (bron-URL, aanmaakdatum) ongewijzigd kan blijven.
Manipulatie van de embedding-ruimte
De aanvaller maakt documenten waarvan de tekst goedaardig lijkt voor menselijke beoordelaars, maar waarvan de embeddings adversarieel zijn gepositioneerd om retrieval voor specifieke doelquery's te maximaliseren.
Metadata-poisoning
Sommige RAG-systemen gebruiken metadatafilters (datumbereiken, broncategorieën, toegangsrechten) om retrieval af te bakenen. Het vergiftigen van metadata kan ertoe leiden dat documenten worden opgehaald in contexten waar ze niet beschikbaar zouden moeten zijn.
| Aanvalstype | Detectiemoeilijkheid | Impactbereik | Persistentie |
|---|---|---|---|
| Directe injectie | Gemiddeld | Query's die overeenkomen met vergiftigde embeddings | Totdat het document wordt verwijderd |
| Documentwijziging | Hoog | Alle query's die het gewijzigde document ophalen | Totdat de wijziging wordt gedetecteerd |
| Embedding-manipulatie | Zeer hoog | Gerichte query's | Totdat het document wordt verwijderd |
| Metadata-poisoning | Gemiddeld | Query's die getroffen metadatafilters gebruiken | Totdat de metadata wordt gecorrigeerd |
Forensische analyse van het documentcorpus
Verificatie van documentintegriteit
De eerste forensische stap is het verifiëren van de integriteit van het documentcorpus tegen een bekende, betrouwbare baseline.
"""
Module voor forensische analyse van RAG-poisoning.
Biedt tools voor het detecteren en analyseren van poisoning-aanvallen
tegen RAG-documentcorpussen en retrievalpijplijnen.
"""
import hashlib
import json
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
import numpy as np
@dataclass
class DocumentRecord:
"""Registratie van een document in de RAG-kennisbank."""
doc_id: str
content_hash: str
source_url: str | None
ingestion_timestamp: float
metadata: dict[str, Any]
chunk_count: int
embedding_ids: list[str] = field(default_factory=list)
class CorpusIntegrityVerifier:
"""Verifieer de integriteit van een RAG-documentcorpus."""
def __init__(self, baseline_manifest: dict[str, str]):
"""
Initialiseer met een baseline-manifest dat document-ID's koppelt
aan hun verwachte content-hashes.
"""
self.baseline = baseline_manifest
def verify_corpus(
self,
current_documents: dict[str, str], # doc_id -> content hash
) -> dict:
"""
Vergelijk het huidige corpus met de baseline.
Detecteert toegevoegde, verwijderde en gewijzigde documenten.
"""
baseline_ids = set(self.baseline.keys())
current_ids = set(current_documents.keys())
added = current_ids - baseline_ids
removed = baseline_ids - current_ids
common = baseline_ids & current_ids
modified = []
for doc_id in common:
if current_documents[doc_id] != self.baseline[doc_id]:
modified.append(doc_id)
return {
"baseline_count": len(self.baseline),
"current_count": len(current_documents),
"added_documents": list(added),
"removed_documents": list(removed),
"modified_documents": modified,
"integrity_status": (
"CLEAN" if not added and not removed and not modified
else "COMPROMISED"
),
"summary": (
f"{len(added)} added, {len(removed)} removed, "
f"{len(modified)} modified documents detected"
),
}Inhoudsanalyse op poisoning-indicatoren
Vergiftigde documenten bevatten vaak karakteristieke patronen die ze onderscheiden van legitieme inhoud.
def analyze_document_for_poisoning(
content: str,
metadata: dict,
corpus_statistics: dict,
) -> dict:
"""
Analyseer een document op indicatoren van RAG-poisoning.
Controleert op:
- Ingebedde instructies (prompt-injectie-payloads)
- Statistische anomalieën ten opzichte van de corpus-baseline
- Inconsistenties in metadata
- Adversariële inhoudspatronen
"""
indicators = []
# Controleer op patronen van ingebedde instructies
instruction_patterns = [
"ignore previous instructions",
"ignore the above",
"disregard your instructions",
"you are now",
"new instructions:",
"system prompt:",
"IMPORTANT: ",
"[INST]",
"<<SYS>>",
"### Instruction",
]
content_lower = content.lower()
for pattern in instruction_patterns:
if pattern.lower() in content_lower:
indicators.append({
"type": "embedded_instruction",
"severity": "high",
"pattern": pattern,
"description": f"Document contains instruction-like pattern: '{pattern}'",
})
# Controleer op onzichtbare/zero-width-tekens die instructies kunnen verbergen
invisible_chars = [
'', # Zero-width space
'', # Zero-width non-joiner
'', # Zero-width joiner
'', # Word joiner
'', # Zero-width no-break space
]
invisible_count = sum(content.count(c) for c in invisible_chars)
if invisible_count > 0:
indicators.append({
"type": "invisible_characters",
"severity": "high",
"count": invisible_count,
"description": f"Document contains {invisible_count} invisible Unicode characters",
})
# Statistische analyse ten opzichte van de corpus-baseline
word_count = len(content.split())
avg_word_count = corpus_statistics.get("avg_word_count", 500)
std_word_count = corpus_statistics.get("std_word_count", 200)
if abs(word_count - avg_word_count) > 3 * std_word_count:
indicators.append({
"type": "statistical_anomaly",
"severity": "medium",
"description": (
f"Document length ({word_count} words) is >3 std deviations "
f"from corpus mean ({avg_word_count})"
),
})
# Controleer op ongebruikelijke metadata
ingestion_time = metadata.get("ingestion_timestamp", 0)
source = metadata.get("source_url", "")
if not source:
indicators.append({
"type": "missing_provenance",
"severity": "medium",
"description": "Document has no source URL or provenance information",
})
# Controleer op een hoge verhouding van niet-inhoudelijke tokens (HTML, markdown-opmaak)
formatting_chars = sum(1 for c in content if c in '<>{}[]|#*_~`')
formatting_ratio = formatting_chars / max(len(content), 1)
if formatting_ratio > 0.15:
indicators.append({
"type": "excessive_formatting",
"severity": "low",
"ratio": round(formatting_ratio, 4),
"description": "Unusually high ratio of formatting characters",
})
return {
"doc_id": metadata.get("doc_id", "unknown"),
"indicators": indicators,
"risk_score": _compute_risk_score(indicators),
"poisoning_likely": any(i["severity"] == "high" for i in indicators),
}
def _compute_risk_score(indicators: list[dict]) -> float:
severity_weights = {"critical": 1.0, "high": 0.7, "medium": 0.3, "low": 0.1}
if not indicators:
return 0.0
score = sum(severity_weights.get(i["severity"], 0.1) for i in indicators)
return min(round(score, 2), 10.0)Vectordatabase-forensics
Analyse van de embedding-distributie
Vergiftigde documenten kunnen embeddings produceren die statistisch afwijkend zijn ten opzichte van het legitieme corpus. Deze analyse detecteert adversariële positionering van embeddings.
def analyze_embedding_distribution(
embeddings: np.ndarray,
labels: list[str], # "baseline" of "suspect"
n_neighbors: int = 10,
) -> dict:
"""
Analyseer de distributie van embeddings om afwijkende positionering te detecteren.
Vergiftigde documenten kunnen embeddings hebben die:
- Geclusterd zijn rond regio's met veel query-verkeer
- Als uitschieters zijn gepositioneerd ten opzichte van hun veronderstelde onderwerp
- Ongebruikelijk dicht liggen bij embeddings uit andere onderwerpscategorieën
"""
from sklearn.neighbors import NearestNeighbors
# Bereken de afstanden tot de dichtstbijzijnde buren voor elke embedding
nn = NearestNeighbors(n_neighbors=n_neighbors, metric='cosine')
nn.fit(embeddings)
distances, indices = nn.kneighbors(embeddings)
# Scheid baseline- en verdachte embeddings
baseline_mask = np.array([l == "baseline" for l in labels])
suspect_mask = ~baseline_mask
results = {
"total_embeddings": len(embeddings),
"baseline_count": int(baseline_mask.sum()),
"suspect_count": int(suspect_mask.sum()),
"anomalies": [],
}
if not suspect_mask.any():
return results
# Analyseer voor elke verdachte embedding de buurt
suspect_indices = np.where(suspect_mask)[0]
for idx in suspect_indices:
neighbor_indices = indices[idx][1:] # Sluit zichzelf uit
neighbor_distances = distances[idx][1:]
# Welk deel van de buren is baseline versus verdacht?
neighbor_labels = [labels[i] for i in neighbor_indices]
baseline_neighbor_frac = sum(
1 for l in neighbor_labels if l == "baseline"
) / len(neighbor_labels)
mean_dist = float(np.mean(neighbor_distances))
# Bereken hoe dit zich verhoudt tot de gemiddelde buurafstand van de baseline
baseline_mean_dists = []
for b_idx in np.where(baseline_mask)[0][:100]: # Steekproef voor efficiëntie
baseline_mean_dists.append(float(np.mean(distances[b_idx][1:])))
if baseline_mean_dists:
baseline_avg_dist = np.mean(baseline_mean_dists)
baseline_std_dist = np.std(baseline_mean_dists)
z_score = (mean_dist - baseline_avg_dist) / max(baseline_std_dist, 1e-10)
else:
z_score = 0.0
if abs(z_score) > 2.0 or baseline_neighbor_frac < 0.3:
results["anomalies"].append({
"embedding_index": int(idx),
"mean_neighbor_distance": round(mean_dist, 4),
"baseline_neighbor_fraction": round(baseline_neighbor_frac, 4),
"distance_z_score": round(float(z_score), 4),
"assessment": (
"Suspiciously positioned" if z_score < -2.0
else "Isolated from legitimate content" if z_score > 2.0
else "Unusual neighborhood composition"
),
})
results["anomaly_count"] = len(results["anomalies"])
return resultsAnalyse van retrievallogs
Retrievallogs registreren welke documenten voor elke query werden opgehaald. Het analyseren van deze logs kan poisoning-patronen onthullen.
def analyze_retrieval_patterns(
retrieval_logs: list[dict],
time_window_hours: float = 24.0,
) -> dict:
"""
Analyseer retrievallogs om indicatoren van RAG-poisoning te detecteren.
Poisoning manifesteert zich in retrievalpatronen als:
- Nieuwe documenten die met ongebruikelijk hoge frequentie worden opgehaald
- Documenten die worden opgehaald voor query's buiten hun verwachte onderwerpsbereik
- Plotselinge veranderingen in welke documenten worden opgehaald voor stabiele querytypen
"""
from collections import Counter, defaultdict
doc_retrieval_counts = Counter()
doc_query_diversity: dict[str, set] = defaultdict(set)
doc_first_seen: dict[str, float] = {}
for log in retrieval_logs:
for doc_id in log.get("retrieved_doc_ids", []):
doc_retrieval_counts[doc_id] += 1
query_category = log.get("query_category", "uncategorized")
doc_query_diversity[doc_id].add(query_category)
if doc_id not in doc_first_seen:
doc_first_seen[doc_id] = log.get("timestamp", 0)
# Identificeer verdacht actieve documenten
if not doc_retrieval_counts:
return {"status": "no_retrieval_data"}
counts = list(doc_retrieval_counts.values())
mean_count = float(np.mean(counts))
std_count = float(np.std(counts))
suspicious_docs = []
for doc_id, count in doc_retrieval_counts.items():
z_score = (count - mean_count) / max(std_count, 1.0)
diversity = len(doc_query_diversity[doc_id])
if z_score > 3.0 or diversity > 5:
suspicious_docs.append({
"doc_id": doc_id,
"retrieval_count": count,
"z_score": round(z_score, 2),
"query_category_diversity": diversity,
"categories": list(doc_query_diversity[doc_id]),
"first_seen": doc_first_seen.get(doc_id),
"reason": (
"High retrieval frequency and broad query matching"
if z_score > 3.0 and diversity > 5
else "Unusually high retrieval frequency"
if z_score > 3.0
else "Retrieved across unusually many query categories"
),
})
return {
"total_documents_retrieved": len(doc_retrieval_counts),
"mean_retrieval_count": round(mean_count, 2),
"suspicious_documents": suspicious_docs,
"poisoning_indicators_found": len(suspicious_docs) > 0,
}Forensische onderzoeksworkflow
Fase 1: Incidentdetectie
RAG-poisoning-incidenten kunnen worden gedetecteerd via:
- Meldingen van gebruikers over onjuiste of gemanipuleerde modeluitvoer
- Geautomatiseerde monitoring van uitvoerkwaliteit die feitelijke fouten detecteert
- Guardrail-triggers op de inhoud van opgehaalde documenten
- Anomaliedetectie in retrievallogs
Fase 2: Impactbeoordeling
Bepaal hoeveel gebruikers uitvoer ontvingen die werd beïnvloed door vergiftigde retrieval:
def assess_rag_poisoning_impact(
poisoned_doc_ids: list[str],
retrieval_logs: list[dict],
response_logs: list[dict],
) -> dict:
"""
Beoordeel de impact van geïdentificeerde vergiftigde documenten.
Bepaalt hoeveel query's werden getroffen en welke
uitvoer werd beïnvloed door de vergiftigde inhoud.
"""
affected_queries = []
for log in retrieval_logs:
retrieved = set(log.get("retrieved_doc_ids", []))
if retrieved & set(poisoned_doc_ids):
affected_queries.append({
"query_id": log.get("query_id"),
"timestamp": log.get("timestamp"),
"user_id": log.get("user_id"),
"poisoned_docs_retrieved": list(retrieved & set(poisoned_doc_ids)),
"total_docs_retrieved": len(retrieved),
"poisoned_fraction": len(retrieved & set(poisoned_doc_ids)) / max(len(retrieved), 1),
})
unique_users = set(q.get("user_id") for q in affected_queries if q.get("user_id"))
return {
"poisoned_documents": len(poisoned_doc_ids),
"affected_queries": len(affected_queries),
"affected_users": len(unique_users),
"time_range": {
"earliest": min((q["timestamp"] for q in affected_queries), default=None),
"latest": max((q["timestamp"] for q in affected_queries), default=None),
},
"affected_query_details": affected_queries[:100],
}Fase 3: Root-cause-analyse
Traceer hoe de vergiftigde documenten de kennisbank binnenkwamen:
- Beoordeling van de ingestiepijplijn: Welke databron leverde de vergiftigde documenten? Was de ingestie geautomatiseerd of handmatig?
- Audit van toegangscontrole: Wie had schrijftoegang tot het documentcorpus op het moment van de poisoning?
- Bronverificatie: Is de geclaimde bron-URL of documentoorsprong legitiem? Was de bron zelf gecompromitteerd?
- Temporele analyse: Wanneer werden de vergiftigde documenten geïngesteerd? Correleert de timing met andere verdachte activiteit?
Fase 4: Herstel
- Direct: Verwijder vergiftigde documenten uit de kennisbank en de vectordatabase
- Opnieuw embedden: Als embedding-manipulatie wordt vermoed, embed je het hele corpus opnieuw met een vers embedding-model
- Informeren: Waarschuw getroffen gebruikers dat ze mogelijk gemanipuleerde informatie hebben ontvangen
- Verharden: Implementeer inhoudsvalidatie in de ingestiepijplijn en voeg guardrails op retrievalniveau toe
Preventiemaatregelen
Forensische lessen uit RAG-poisoning-incidenten geven richting aan preventie:
- Invoervalidatie bij ingestie: Scan alle documenten op instructieachtige patronen voordat je ze aan de kennisbank toevoegt
- Herkomsttracking: Onderhoud een complete herkomstketen voor elk document in het corpus
- Retrievalmonitoring: Log en monitor retrievalpatronen voor anomaliedetectie
- Inhoudsscheiding: Onderhoud aparte retrieval-indices voor inhoud van verschillende vertrouwensniveaus
- Regelmatige integriteitscontroles: Verifieer het corpus periodiek tegen baseline-manifesten
Referenties
- Zou, W., Geng, R., Wang, B., & Jia, J. (2024). PoisonedRAG: Knowledge Poisoning Attacks to Retrieval-Augmented Generation of Large Language Models. arXiv preprint arXiv:2402.07867. https://arxiv.org/abs/2402.07867
- OWASP. (2025). OWASP Top 10 for Large Language Model Applications. LLM01: Prompt Injection. https://owasp.org/www-project-top-10-for-large-language-model-applications/
- Greshake, K., Abdelnabi, S., Mishra, S., Endres, C., Holz, T., & Fritz, M. (2023). Not What You've Signed Up For: Compromising Real-World LLM-Integrated Applications with Indirect Prompt Injection. ACM Workshop on Artificial Intelligence and Security. https://arxiv.org/abs/2302.12173