RAG Poisoning Forensics
Forensic investigation techniques for detecting and analyzing poisoning attacks against Retrieval-Augmented Generation systems.
Overview
Retrieval-Augmented Generation (RAG) systems combine information retrieval with language model generation: when a user asks a question, the system retrieves relevant documents from a knowledge base, injects them into the LLM's context, and generates a response grounded in the retrieved information. RAG poisoning attacks target this retrieval pipeline by injecting, modifying, or manipulating documents in the knowledge base so that malicious content is retrieved and influences the model's output.
RAG poisoning is a particularly effective attack vector because it exploits the trust relationship between the retrieval system and the generation model. The LLM treats retrieved documents as authoritative context, making it susceptible to following instructions or generating misinformation embedded in poisoned documents. Unlike direct prompt injection, RAG poisoning can affect every user who triggers retrieval of the poisoned content, making it a one-to-many attack.
Forensic investigation of RAG poisoning requires analyzing three layers: the document corpus (what was poisoned), the retrieval mechanism (how poisoned content was selected), and the generation output (what the model did with the poisoned context). This article covers forensic techniques for each layer.
RAG Poisoning Attack Taxonomy
Direct Document Injection
The attacker adds new documents to the knowledge base that contain malicious content. The documents are crafted so that their embeddings are close to common query embeddings, ensuring they are retrieved frequently.
Existing Document Modification
The attacker modifies existing documents in the knowledge base, either by appending malicious instructions or by subtly altering factual content. This is harder to detect because the document's metadata (source URL, creation date) may be unchanged.
Embedding Space Manipulation
The attacker crafts documents whose text appears benign to human reviewers but whose embeddings are adversarially positioned to maximize retrieval for specific target queries.
Metadata Poisoning
Some RAG systems use metadata filters (date ranges, source categories, access permissions) to scope retrieval. Poisoning metadata can cause documents to be retrieved in contexts where they should not be available.
| Attack Type | Detection Difficulty | Impact Scope | Persistence |
|---|---|---|---|
| Direct injection | Medium | Queries matching poisoned embeddings | Until document is removed |
| Document modification | High | All queries retrieving modified doc | Until modification is detected |
| Embedding manipulation | Very High | Targeted queries | Until document is removed |
| Metadata poisoning | Medium | Queries using affected metadata filters | Until metadata is corrected |
Forensic Analysis of the Document Corpus
Document Integrity Verification
The first forensic step is verifying the integrity of the document corpus against a known-good baseline.
"""
RAG poisoning forensic analysis module.
Provides tools for detecting and analyzing poisoning attacks
against RAG document corpuses and retrieval pipelines.
"""
import hashlib
import json
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
import numpy as np
@dataclass
class DocumentRecord:
"""Record of a document in the RAG knowledge base."""
doc_id: str
content_hash: str
source_url: str | None
ingestion_timestamp: float
metadata: dict[str, Any]
chunk_count: int
embedding_ids: list[str] = field(default_factory=list)
class CorpusIntegrityVerifier:
"""Verify the integrity of a RAG document corpus."""
def __init__(self, baseline_manifest: dict[str, str]):
"""
Initialize with a baseline manifest mapping document IDs
to their expected content hashes.
"""
self.baseline = baseline_manifest
def verify_corpus(
self,
current_documents: dict[str, str], # doc_id -> content hash
) -> dict:
"""
Compare current corpus against baseline.
Detects added, removed, and modified documents.
"""
baseline_ids = set(self.baseline.keys())
current_ids = set(current_documents.keys())
added = current_ids - baseline_ids
removed = baseline_ids - current_ids
common = baseline_ids & current_ids
modified = []
for doc_id in common:
if current_documents[doc_id] != self.baseline[doc_id]:
modified.append(doc_id)
return {
"baseline_count": len(self.baseline),
"current_count": len(current_documents),
"added_documents": list(added),
"removed_documents": list(removed),
"modified_documents": modified,
"integrity_status": (
"CLEAN" if not added and not removed and not modified
else "COMPROMISED"
),
"summary": (
f"{len(added)} added, {len(removed)} removed, "
f"{len(modified)} modified documents detected"
),
}Content Analysis for Poisoning Indicators
Poisoned documents often contain characteristic patterns that distinguish them from legitimate content.
def analyze_document_for_poisoning(
content: str,
metadata: dict,
corpus_statistics: dict,
) -> dict:
"""
Analyze a document for indicators of RAG poisoning.
Checks for:
- Embedded instructions (prompt injection payloads)
- Statistical anomalies vs corpus baseline
- Metadata inconsistencies
- Adversarial content patterns
"""
indicators = []
# Check for embedded instruction patterns
instruction_patterns = [
"ignore previous instructions",
"ignore the above",
"disregard your instructions",
"you are now",
"new instructions:",
"system prompt:",
"IMPORTANT: ",
"[INST]",
"<<SYS>>",
"### Instruction",
]
content_lower = content.lower()
for pattern in instruction_patterns:
if pattern.lower() in content_lower:
indicators.append({
"type": "embedded_instruction",
"severity": "high",
"pattern": pattern,
"description": f"Document contains instruction-like pattern: '{pattern}'",
})
# Check for invisible/zero-width characters that could hide instructions
invisible_chars = [
'\u200b', # Zero-width space
'\u200c', # Zero-width non-joiner
'\u200d', # Zero-width joiner
'\u2060', # Word joiner
'\ufeff', # Zero-width no-break space
]
invisible_count = sum(content.count(c) for c in invisible_chars)
if invisible_count > 0:
indicators.append({
"type": "invisible_characters",
"severity": "high",
"count": invisible_count,
"description": f"Document contains {invisible_count} invisible Unicode characters",
})
# Statistical analysis against corpus baseline
word_count = len(content.split())
avg_word_count = corpus_statistics.get("avg_word_count", 500)
std_word_count = corpus_statistics.get("std_word_count", 200)
if abs(word_count - avg_word_count) > 3 * std_word_count:
indicators.append({
"type": "statistical_anomaly",
"severity": "medium",
"description": (
f"Document length ({word_count} words) is >3 std deviations "
f"from corpus mean ({avg_word_count})"
),
})
# Check for unusual metadata
ingestion_time = metadata.get("ingestion_timestamp", 0)
source = metadata.get("source_url", "")
if not source:
indicators.append({
"type": "missing_provenance",
"severity": "medium",
"description": "Document has no source URL or provenance information",
})
# Check for high ratio of non-content tokens (HTML, markdown formatting)
formatting_chars = sum(1 for c in content if c in '<>{}[]|#*_~`')
formatting_ratio = formatting_chars / max(len(content), 1)
if formatting_ratio > 0.15:
indicators.append({
"type": "excessive_formatting",
"severity": "low",
"ratio": round(formatting_ratio, 4),
"description": "Unusually high ratio of formatting characters",
})
return {
"doc_id": metadata.get("doc_id", "unknown"),
"indicators": indicators,
"risk_score": _compute_risk_score(indicators),
"poisoning_likely": any(i["severity"] == "high" for i in indicators),
}
def _compute_risk_score(indicators: list[dict]) -> float:
severity_weights = {"critical": 1.0, "high": 0.7, "medium": 0.3, "low": 0.1}
if not indicators:
return 0.0
score = sum(severity_weights.get(i["severity"], 0.1) for i in indicators)
return min(round(score, 2), 10.0)Vector Database Forensics
Embedding Distribution Analysis
Poisoned documents may produce embeddings that are statistically anomalous relative to the legitimate corpus. This analysis detects adversarial embedding positioning.
def analyze_embedding_distribution(
embeddings: np.ndarray,
labels: list[str], # "baseline" or "suspect"
n_neighbors: int = 10,
) -> dict:
"""
Analyze the distribution of embeddings to detect anomalous positioning.
Poisoned documents may have embeddings that are:
- Clustered around high-traffic query regions
- Positioned as outliers relative to their supposed topic
- Unusually close to embeddings from different topic categories
"""
from sklearn.neighbors import NearestNeighbors
# Compute nearest neighbor distances for each embedding
nn = NearestNeighbors(n_neighbors=n_neighbors, metric='cosine')
nn.fit(embeddings)
distances, indices = nn.kneighbors(embeddings)
# Separate baseline and suspect embeddings
baseline_mask = np.array([l == "baseline" for l in labels])
suspect_mask = ~baseline_mask
results = {
"total_embeddings": len(embeddings),
"baseline_count": int(baseline_mask.sum()),
"suspect_count": int(suspect_mask.sum()),
"anomalies": [],
}
if not suspect_mask.any():
return results
# For each suspect embedding, analyze its neighborhood
suspect_indices = np.where(suspect_mask)[0]
for idx in suspect_indices:
neighbor_indices = indices[idx][1:] # Exclude self
neighbor_distances = distances[idx][1:]
# What fraction of neighbors are baseline vs suspect?
neighbor_labels = [labels[i] for i in neighbor_indices]
baseline_neighbor_frac = sum(
1 for l in neighbor_labels if l == "baseline"
) / len(neighbor_labels)
mean_dist = float(np.mean(neighbor_distances))
# Compute how this compares to baseline mean neighbor distance
baseline_mean_dists = []
for b_idx in np.where(baseline_mask)[0][:100]: # Sample for efficiency
baseline_mean_dists.append(float(np.mean(distances[b_idx][1:])))
if baseline_mean_dists:
baseline_avg_dist = np.mean(baseline_mean_dists)
baseline_std_dist = np.std(baseline_mean_dists)
z_score = (mean_dist - baseline_avg_dist) / max(baseline_std_dist, 1e-10)
else:
z_score = 0.0
if abs(z_score) > 2.0 or baseline_neighbor_frac < 0.3:
results["anomalies"].append({
"embedding_index": int(idx),
"mean_neighbor_distance": round(mean_dist, 4),
"baseline_neighbor_fraction": round(baseline_neighbor_frac, 4),
"distance_z_score": round(float(z_score), 4),
"assessment": (
"Suspiciously positioned" if z_score < -2.0
else "Isolated from legitimate content" if z_score > 2.0
else "Unusual neighborhood composition"
),
})
results["anomaly_count"] = len(results["anomalies"])
return resultsRetrieval Log Analysis
Retrieval logs record which documents were retrieved for each query. Analyzing these logs can reveal poisoning patterns.
def analyze_retrieval_patterns(
retrieval_logs: list[dict],
time_window_hours: float = 24.0,
) -> dict:
"""
Analyze retrieval logs to detect RAG poisoning indicators.
Poisoning manifests in retrieval patterns as:
- New documents appearing with unusually high retrieval frequency
- Documents retrieved for queries outside their expected topic scope
- Sudden changes in which documents are retrieved for stable query types
"""
from collections import Counter, defaultdict
doc_retrieval_counts = Counter()
doc_query_diversity: dict[str, set] = defaultdict(set)
doc_first_seen: dict[str, float] = {}
for log in retrieval_logs:
for doc_id in log.get("retrieved_doc_ids", []):
doc_retrieval_counts[doc_id] += 1
query_category = log.get("query_category", "uncategorized")
doc_query_diversity[doc_id].add(query_category)
if doc_id not in doc_first_seen:
doc_first_seen[doc_id] = log.get("timestamp", 0)
# Identify suspiciously active documents
if not doc_retrieval_counts:
return {"status": "no_retrieval_data"}
counts = list(doc_retrieval_counts.values())
mean_count = float(np.mean(counts))
std_count = float(np.std(counts))
suspicious_docs = []
for doc_id, count in doc_retrieval_counts.items():
z_score = (count - mean_count) / max(std_count, 1.0)
diversity = len(doc_query_diversity[doc_id])
if z_score > 3.0 or diversity > 5:
suspicious_docs.append({
"doc_id": doc_id,
"retrieval_count": count,
"z_score": round(z_score, 2),
"query_category_diversity": diversity,
"categories": list(doc_query_diversity[doc_id]),
"first_seen": doc_first_seen.get(doc_id),
"reason": (
"High retrieval frequency and broad query matching"
if z_score > 3.0 and diversity > 5
else "Unusually high retrieval frequency"
if z_score > 3.0
else "Retrieved across unusually many query categories"
),
})
return {
"total_documents_retrieved": len(doc_retrieval_counts),
"mean_retrieval_count": round(mean_count, 2),
"suspicious_documents": suspicious_docs,
"poisoning_indicators_found": len(suspicious_docs) > 0,
}Forensic Investigation Workflow
Phase 1: Incident Detection
RAG poisoning incidents may be detected through:
- User reports of incorrect or manipulated model outputs
- Automated output quality monitoring detecting factual errors
- Guardrail triggers on retrieved document content
- Retrieval log anomaly detection
Phase 2: Impact Assessment
Determine how many users received outputs influenced by poisoned retrieval:
def assess_rag_poisoning_impact(
poisoned_doc_ids: list[str],
retrieval_logs: list[dict],
response_logs: list[dict],
) -> dict:
"""
Assess the impact of identified poisoned documents.
Determines how many queries were affected and what
outputs were influenced by the poisoned content.
"""
affected_queries = []
for log in retrieval_logs:
retrieved = set(log.get("retrieved_doc_ids", []))
if retrieved & set(poisoned_doc_ids):
affected_queries.append({
"query_id": log.get("query_id"),
"timestamp": log.get("timestamp"),
"user_id": log.get("user_id"),
"poisoned_docs_retrieved": list(retrieved & set(poisoned_doc_ids)),
"total_docs_retrieved": len(retrieved),
"poisoned_fraction": len(retrieved & set(poisoned_doc_ids)) / max(len(retrieved), 1),
})
unique_users = set(q.get("user_id") for q in affected_queries if q.get("user_id"))
return {
"poisoned_documents": len(poisoned_doc_ids),
"affected_queries": len(affected_queries),
"affected_users": len(unique_users),
"time_range": {
"earliest": min((q["timestamp"] for q in affected_queries), default=None),
"latest": max((q["timestamp"] for q in affected_queries), default=None),
},
"affected_query_details": affected_queries[:100],
}Phase 3: Root Cause Analysis
Trace how the poisoned documents entered the knowledge base:
- Ingestion pipeline review: Which data source provided the poisoned documents? Was ingestion automated or manual?
- Access control audit: Who had write access to the document corpus at the time of poisoning?
- Source verification: Is the claimed source URL or document origin legitimate? Was the source itself compromised?
- Temporal analysis: When were the poisoned documents ingested? Does the timing correlate with other suspicious activity?
Phase 4: Remediation
- Immediate: Remove poisoned documents from the knowledge base and vector database
- Re-embed: If embedding manipulation is suspected, re-embed the entire corpus with a fresh embedding model
- Notify: Alert affected users that they received potentially manipulated information
- Harden: Implement content validation in the ingestion pipeline, add retrieval-level guardrails
Prevention Measures
Forensic lessons from RAG poisoning incidents inform prevention:
- Input validation on ingestion: Scan all documents for instruction-like patterns before adding to the knowledge base
- Provenance tracking: Maintain a complete provenance chain for every document in the corpus
- Retrieval monitoring: Log and monitor retrieval patterns for anomaly detection
- Content separation: Maintain separate retrieval indices for different trust levels of content
- Regular integrity checks: Periodically verify the corpus against baseline manifests
References
- Zou, W., Geng, R., Wang, B., & Jia, J. (2024). PoisonedRAG: Knowledge Poisoning Attacks to Retrieval-Augmented Generation of Large Language Models. arXiv preprint arXiv:2402.07867. https://arxiv.org/abs/2402.07867
- OWASP. (2025). OWASP Top 10 for Large Language Model Applications. LLM01: Prompt Injection. https://owasp.org/www-project-top-10-for-large-language-model-applications/
- Greshake, K., Abdelnabi, S., Mishra, S., Endres, C., Holz, T., & Fritz, M. (2023). Not What You've Signed Up For: Compromising Real-World LLM-Integrated Applications with Indirect Prompt Injection. ACM Workshop on Artificial Intelligence and Security. https://arxiv.org/abs/2302.12173