RAG Poisoning Forensics
Forensic investigation techniques for detecting and analyzing poisoning attacks against Retrieval-Augmented Generation systems.
Overview

Retrieval-Augmented Generation (RAG) systems combine information retrieval with language model generation: when a user asks a question, the system retrieves relevant documents from a knowledge base, injects them into the LLM's context, and generates a response grounded in the retrieved information. RAG poisoning attacks target this retrieval pipeline by injecting, modifying, or manipulating documents in the knowledge base so that malicious content is retrieved and influences the model's output.

RAG poisoning is a particularly effective attack vector because it exploits the trust relationship between the retrieval system and the generation model. The LLM treats retrieved documents as authoritative context, making it susceptible to following instructions or generating misinformation embedded in poisoned documents. Unlike direct prompt injection, RAG poisoning can affect every user who triggers retrieval of the poisoned content, making it a one-to-many attack.

Forensic investigation of RAG poisoning requires analyzing three layers: the document corpus (what was poisoned), the retrieval mechanism (how poisoned content was selected), and the generation output (what the model did with the poisoned context). This article covers forensic techniques for each layer.
RAG Poisoning Attack Taxonomy
Direct Document Injection
The attacker adds new documents to the knowledge base that contain malicious content. The documents are crafted so that their embeddings are close to common query embeddings, ensuring they are retrieved frequently.
Existing Document Modification
The attacker modifies existing documents in the knowledge base, either by appending malicious instructions or by subtly altering factual content. This is harder to detect because the document's metadata (source URL, creation date) may be unchanged.
Embedding Space Manipulation
The attacker crafts documents whose text appears benign to human reviewers but whose embeddings are adversarially positioned to maximize retrieval for specific target queries.
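Adversarial positioning can be quantified by comparing a document's embedding similarity to its claimed topic against its similarity to high-traffic query regions. A toy sketch with hypothetical 4-dimensional vectors (real embedding models use hundreds of dimensions, and the 0.5 threshold is purely illustrative):

```python
import numpy as np

def cosine(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine similarity between two vectors."""
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

# Hypothetical toy embeddings for illustration only
topic_centroid = np.array([1.0, 0.0, 0.0, 0.0])  # centroid of the document's claimed topic
target_query = np.array([0.0, 1.0, 0.0, 0.0])    # high-traffic query region
doc_embedding = np.array([0.1, 0.9, 0.0, 0.0])   # "benign-looking" text, adversarially embedded

sim_topic = cosine(doc_embedding, topic_centroid)
sim_query = cosine(doc_embedding, target_query)

# A document far from its own topic but close to a hot query region is suspect
suspicious = (sim_query - sim_topic) > 0.5
```

A large gap between query similarity and topic similarity is one of the signals that embedding-distribution analysis formalizes at corpus scale.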
Metadata Poisoning
Some RAG systems use metadata filters (date ranges, source categories, access permissions) to scope retrieval. Poisoning metadata can cause documents to be retrieved in contexts where they should not be available.
| Attack Type | Detection Difficulty | Impact Scope | Persistence |
|---|---|---|---|
| Direct injection | Medium | Queries matching poisoned embeddings | Until document is removed |
| Document modification | High | All queries retrieving the modified document | Until modification is detected |
| Embedding manipulation | Very high | Targeted queries | Until document is removed |
| Metadata poisoning | Medium | Queries using affected metadata filters | Until metadata is corrected |
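Metadata poisoning in particular can be screened with simple consistency rules. A minimal sketch — the schema fields (`category`, `created_at`, `access_level`, `source_tier`) are hypothetical and would vary per deployment:

```python
from datetime import datetime, timezone

# Hypothetical category taxonomy for illustration
ALLOWED_CATEGORIES = {"hr-policy", "engineering", "public-docs"}

def check_metadata(meta: dict) -> list[str]:
    """Return metadata poisoning indicators for one document."""
    issues = []
    if meta.get("category") not in ALLOWED_CATEGORIES:
        issues.append(f"unknown category: {meta.get('category')!r}")
    if meta.get("created_at", 0) > datetime.now(timezone.utc).timestamp():
        issues.append("creation timestamp is in the future")
    if meta.get("access_level") == "public" and meta.get("source_tier") == "internal":
        issues.append("internal source exposed with public access level")
    return issues
```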
Forensic Analysis of the Document Corpus
Document Integrity Verification
The first forensic step is verifying the integrity of the document corpus against a known-good baseline.
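A known-good baseline can be produced by hashing every document at ingestion time. A minimal sketch, assuming documents are stored as plain-text files (the `.txt` layout is an assumption):

```python
import hashlib
from pathlib import Path

def build_baseline_manifest(corpus_dir: str) -> dict[str, str]:
    """Map each document ID to a SHA-256 hash of its content."""
    manifest = {}
    for path in sorted(Path(corpus_dir).glob("*.txt")):  # assumes plain-text documents
        manifest[path.stem] = hashlib.sha256(path.read_bytes()).hexdigest()
    return manifest
```

The resulting manifest should be stored separately from the corpus itself, so that an attacker who can modify documents cannot also rewrite the baseline used to detect the modification.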
"""
RAG 投毒 forensic analysis module.
Provides tools for detecting and analyzing 投毒 attacks
against RAG document corpuses and retrieval pipelines.
"""
import hashlib
import json
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
import numpy as np
@dataclass
class DocumentRecord:
"""Record of a document in the RAG 知識庫."""
doc_id: str
content_hash: str
source_url: str | None
ingestion_timestamp: float
metadata: dict[str, Any]
chunk_count: int
embedding_ids: list[str] = field(default_factory=list)
class CorpusIntegrityVerifier:
"""Verify the integrity of a RAG document corpus."""
def __init__(self, baseline_manifest: dict[str, str]):
"""
Initialize with a baseline manifest mapping document IDs
to their expected content hashes.
"""
self.baseline = baseline_manifest
def verify_corpus(
self,
current_documents: dict[str, str], # doc_id -> content hash
) -> dict:
"""
Compare current corpus against baseline.
Detects added, removed, and modified documents.
"""
baseline_ids = set(self.baseline.keys())
current_ids = set(current_documents.keys())
added = current_ids - baseline_ids
removed = baseline_ids - current_ids
common = baseline_ids & current_ids
modified = []
for doc_id in common:
if current_documents[doc_id] != self.baseline[doc_id]:
modified.append(doc_id)
return {
"baseline_count": len(self.baseline),
"current_count": len(current_documents),
"added_documents": list(added),
"removed_documents": list(removed),
"modified_documents": modified,
"integrity_status": (
"CLEAN" if not added and not removed and not modified
else "COMPROMISED"
),
"summary": (
f"{len(added)} added, {len(removed)} removed, "
f"{len(modified)} modified documents detected"
),
}Content Analysis for Poisoning Indicators
Poisoned documents often contain characteristic patterns that distinguish them from legitimate content.
```python
def analyze_document_for_poisoning(
    content: str,
    metadata: dict,
    corpus_statistics: dict,
) -> dict:
    """
    Analyze a document for indicators of RAG poisoning.

    Checks for:
    - Embedded instructions (prompt injection payloads)
    - Statistical anomalies vs. the corpus baseline
    - Metadata inconsistencies
    - Adversarial content patterns
    """
    indicators = []

    # Check for embedded instruction patterns
    instruction_patterns = [
        "ignore previous instructions",
        "ignore the above",
        "disregard your instructions",
        "you are now",
        "new instructions:",
        "system prompt:",
        "IMPORTANT: ",
        "[INST]",
        "<<SYS>>",
        "### Instruction",
    ]
    content_lower = content.lower()
    for pattern in instruction_patterns:
        if pattern.lower() in content_lower:
            indicators.append({
                "type": "embedded_instruction",
                "severity": "high",
                "pattern": pattern,
                "description": f"Document contains instruction-like pattern: '{pattern}'",
            })

    # Check for invisible/zero-width characters that could hide instructions
    invisible_chars = [
        '\u200b',  # Zero-width space
        '\u200c',  # Zero-width non-joiner
        '\u200d',  # Zero-width joiner
        '\u2060',  # Word joiner
        '\ufeff',  # Zero-width no-break space
    ]
    invisible_count = sum(content.count(c) for c in invisible_chars)
    if invisible_count > 0:
        indicators.append({
            "type": "invisible_characters",
            "severity": "high",
            "count": invisible_count,
            "description": f"Document contains {invisible_count} invisible Unicode characters",
        })

    # Statistical analysis against the corpus baseline
    word_count = len(content.split())
    avg_word_count = corpus_statistics.get("avg_word_count", 500)
    std_word_count = corpus_statistics.get("std_word_count", 200)
    if abs(word_count - avg_word_count) > 3 * std_word_count:
        indicators.append({
            "type": "statistical_anomaly",
            "severity": "medium",
            "description": (
                f"Document length ({word_count} words) is >3 standard deviations "
                f"from the corpus mean ({avg_word_count})"
            ),
        })

    # Check for missing provenance metadata
    source = metadata.get("source_url", "")
    if not source:
        indicators.append({
            "type": "missing_provenance",
            "severity": "medium",
            "description": "Document has no source URL or provenance information",
        })

    # Check for a high ratio of non-content tokens (HTML, markdown formatting)
    formatting_chars = sum(1 for c in content if c in '<>{}[]|#*_~`')
    formatting_ratio = formatting_chars / max(len(content), 1)
    if formatting_ratio > 0.15:
        indicators.append({
            "type": "excessive_formatting",
            "severity": "low",
            "ratio": round(formatting_ratio, 4),
            "description": "Unusually high ratio of formatting characters",
        })

    return {
        "doc_id": metadata.get("doc_id", "unknown"),
        "indicators": indicators,
        "risk_score": _compute_risk_score(indicators),
        "poisoning_likely": any(i["severity"] == "high" for i in indicators),
    }


def _compute_risk_score(indicators: list[dict]) -> float:
    """Aggregate indicator severities into a single risk score."""
    severity_weights = {"critical": 1.0, "high": 0.7, "medium": 0.3, "low": 0.1}
    if not indicators:
        return 0.0
    score = sum(severity_weights.get(i["severity"], 0.1) for i in indicators)
    return min(round(score, 2), 10.0)
```

Vector Database Forensics
Embedding Distribution Analysis

Poisoned documents may produce embeddings that are statistically anomalous relative to the legitimate corpus. This analysis detects adversarial embedding positioning.
```python
def analyze_embedding_distribution(
    embeddings: np.ndarray,
    labels: list[str],  # "baseline" or "suspect"
    n_neighbors: int = 10,
) -> dict:
    """
    Analyze the distribution of embeddings to detect anomalous positioning.

    Poisoned documents may have embeddings that are:
    - Clustered around high-traffic query regions
    - Positioned as outliers relative to their supposed topic
    - Unusually close to embeddings from different topic categories
    """
    from sklearn.neighbors import NearestNeighbors

    # Compute nearest-neighbor distances for each embedding
    nn = NearestNeighbors(n_neighbors=n_neighbors, metric='cosine')
    nn.fit(embeddings)
    distances, indices = nn.kneighbors(embeddings)

    # Separate baseline and suspect embeddings
    baseline_mask = np.array([label == "baseline" for label in labels])
    suspect_mask = ~baseline_mask

    results = {
        "total_embeddings": len(embeddings),
        "baseline_count": int(baseline_mask.sum()),
        "suspect_count": int(suspect_mask.sum()),
        "anomalies": [],
    }
    if not suspect_mask.any():
        return results

    # For each suspect embedding, analyze its neighborhood
    suspect_indices = np.where(suspect_mask)[0]
    for idx in suspect_indices:
        neighbor_indices = indices[idx][1:]  # Exclude self
        neighbor_distances = distances[idx][1:]

        # What fraction of neighbors are baseline vs. suspect?
        neighbor_labels = [labels[i] for i in neighbor_indices]
        baseline_neighbor_frac = sum(
            1 for label in neighbor_labels if label == "baseline"
        ) / len(neighbor_labels)
        mean_dist = float(np.mean(neighbor_distances))

        # Compare against the baseline mean neighbor distance
        baseline_mean_dists = []
        for b_idx in np.where(baseline_mask)[0][:100]:  # Sample for efficiency
            baseline_mean_dists.append(float(np.mean(distances[b_idx][1:])))
        if baseline_mean_dists:
            baseline_avg_dist = np.mean(baseline_mean_dists)
            baseline_std_dist = np.std(baseline_mean_dists)
            z_score = (mean_dist - baseline_avg_dist) / max(baseline_std_dist, 1e-10)
        else:
            z_score = 0.0

        if abs(z_score) > 2.0 or baseline_neighbor_frac < 0.3:
            results["anomalies"].append({
                "embedding_index": int(idx),
                "mean_neighbor_distance": round(mean_dist, 4),
                "baseline_neighbor_fraction": round(baseline_neighbor_frac, 4),
                "distance_z_score": round(float(z_score), 4),
                "assessment": (
                    "Suspiciously positioned" if z_score < -2.0
                    else "Isolated from legitimate content" if z_score > 2.0
                    else "Unusual neighborhood composition"
                ),
            })

    results["anomaly_count"] = len(results["anomalies"])
    return results
```

Retrieval Log Analysis
Retrieval logs record which documents were retrieved for each query. Analyzing these logs can reveal poisoning patterns.
```python
def analyze_retrieval_patterns(
    retrieval_logs: list[dict],
    time_window_hours: float = 24.0,
) -> dict:
    """
    Analyze retrieval logs to detect RAG poisoning indicators.

    Poisoning manifests in retrieval patterns as:
    - New documents appearing with unusually high retrieval frequency
    - Documents retrieved for queries outside their expected topic scope
    - Sudden changes in which documents are retrieved for stable query types
    """
    from collections import Counter, defaultdict

    doc_retrieval_counts = Counter()
    doc_query_diversity: dict[str, set] = defaultdict(set)
    doc_first_seen: dict[str, float] = {}

    for log in retrieval_logs:
        for doc_id in log.get("retrieved_doc_ids", []):
            doc_retrieval_counts[doc_id] += 1
            query_category = log.get("query_category", "uncategorized")
            doc_query_diversity[doc_id].add(query_category)
            if doc_id not in doc_first_seen:
                doc_first_seen[doc_id] = log.get("timestamp", 0)

    # Identify suspiciously active documents
    if not doc_retrieval_counts:
        return {"status": "no_retrieval_data"}

    counts = list(doc_retrieval_counts.values())
    mean_count = float(np.mean(counts))
    std_count = float(np.std(counts))

    suspicious_docs = []
    for doc_id, count in doc_retrieval_counts.items():
        z_score = (count - mean_count) / max(std_count, 1.0)
        diversity = len(doc_query_diversity[doc_id])
        if z_score > 3.0 or diversity > 5:
            suspicious_docs.append({
                "doc_id": doc_id,
                "retrieval_count": count,
                "z_score": round(z_score, 2),
                "query_category_diversity": diversity,
                "categories": list(doc_query_diversity[doc_id]),
                "first_seen": doc_first_seen.get(doc_id),
                "reason": (
                    "High retrieval frequency and broad query matching"
                    if z_score > 3.0 and diversity > 5
                    else "Unusually high retrieval frequency"
                    if z_score > 3.0
                    else "Retrieved across unusually many query categories"
                ),
            })

    return {
        "total_documents_retrieved": len(doc_retrieval_counts),
        "mean_retrieval_count": round(mean_count, 2),
        "suspicious_documents": suspicious_docs,
        "poisoning_indicators_found": len(suspicious_docs) > 0,
    }
```

Forensic Investigation Workflow
Phase 1: Incident Detection

RAG poisoning incidents may be detected through:

- User reports of incorrect or manipulated model outputs
- Automated output-quality monitoring detecting factual errors
- Guardrail triggers on retrieved document content
- Retrieval log anomaly detection

Phase 2: Impact Assessment

Determine how many users received outputs influenced by poisoned retrieval:
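The guardrail bullet above can be sketched as a query-time filter that scans retrieved chunks before they enter the LLM context. The pattern list here is an illustrative subset; a production system would share it with the ingestion-time scanner:

```python
# Illustrative subset of instruction-like patterns
SUSPECT_PATTERNS = ["ignore previous instructions", "you are now", "new instructions:"]

def guard_retrieved_chunks(chunks: list[str]) -> tuple[list[str], list[str]]:
    """Split retrieved chunks into (kept, blocked) based on instruction-like payloads."""
    kept, blocked = [], []
    for chunk in chunks:
        lowered = chunk.lower()
        if any(p in lowered for p in SUSPECT_PATTERNS):
            blocked.append(chunk)  # blocked chunks become detection events
        else:
            kept.append(chunk)
    return kept, blocked
```

Each blocked chunk is itself a detection event worth logging, since it points directly at a candidate poisoned document.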
```python
def assess_rag_poisoning_impact(
    poisoned_doc_ids: list[str],
    retrieval_logs: list[dict],
    response_logs: list[dict],
) -> dict:
    """
    Assess the impact of identified poisoned documents.

    Determines how many queries were affected and what
    outputs were influenced by the poisoned content.
    """
    poisoned = set(poisoned_doc_ids)
    affected_queries = []
    for log in retrieval_logs:
        retrieved = set(log.get("retrieved_doc_ids", []))
        if retrieved & poisoned:
            affected_queries.append({
                "query_id": log.get("query_id"),
                "timestamp": log.get("timestamp"),
                "user_id": log.get("user_id"),
                "poisoned_docs_retrieved": list(retrieved & poisoned),
                "total_docs_retrieved": len(retrieved),
                "poisoned_fraction": len(retrieved & poisoned) / max(len(retrieved), 1),
            })

    unique_users = set(q.get("user_id") for q in affected_queries if q.get("user_id"))

    return {
        "poisoned_documents": len(poisoned_doc_ids),
        "affected_queries": len(affected_queries),
        "affected_users": len(unique_users),
        "time_range": {
            "earliest": min((q["timestamp"] for q in affected_queries), default=None),
            "latest": max((q["timestamp"] for q in affected_queries), default=None),
        },
        "affected_query_details": affected_queries[:100],
    }
```

Phase 3: Root Cause Analysis
Trace how the poisoned documents entered the knowledge base:

- Ingestion pipeline review: Which data source provided the poisoned documents? Was ingestion automated or manual?
- Access control audit: Who had write access to the document corpus at the time of the poisoning?
- Source verification: Is the claimed source URL or document origin legitimate? Was the source itself compromised?
- Temporal analysis: When were the poisoned documents ingested? Does the timing correlate with other suspicious activity?
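The temporal-analysis step can be sketched as a simple window join between poisoned-document ingestion times and audit-log events; the event field names here are assumptions about the audit log format:

```python
def correlate_ingestion_with_events(
    ingestion_times: list[float],
    audit_events: list[dict],  # assumed shape: {"timestamp": float, "actor": str, ...}
    window_seconds: float = 3600.0,
) -> list[dict]:
    """Return audit events within a time window of any poisoned ingestion."""
    return [
        event for event in audit_events
        if any(abs(event.get("timestamp", 0.0) - t) <= window_seconds
               for t in ingestion_times)
    ]
```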
Phase 4: Remediation

- Immediate: Remove poisoned documents from the knowledge base and vector database
- Re-embed: If embedding manipulation is suspected, re-embed the entire corpus with a fresh embedding model
- Notify: Alert affected users that they received potentially manipulated information
- Harden: Implement content validation in the ingestion pipeline and add retrieval-level guardrails
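For the immediate-removal step, forensic practice favors quarantining over deleting so that evidence survives the investigation. An in-memory sketch; a real deployment would call its document store and vector database APIs instead:

```python
def quarantine_documents(
    doc_store: dict[str, str],           # doc_id -> content
    vector_index: dict[str, list[str]],  # doc_id -> embedding IDs
    poisoned_ids: list[str],
) -> dict[str, str]:
    """Pull poisoned documents out of the live store, preserving them as evidence."""
    quarantined = {}
    for doc_id in poisoned_ids:
        if doc_id in doc_store:
            quarantined[doc_id] = doc_store.pop(doc_id)
        vector_index.pop(doc_id, None)  # also drop the document's embeddings
    return quarantined
```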
Prevention Measures

Forensic lessons from RAG poisoning incidents inform prevention:

- Input validation on ingestion: Scan all documents for instruction-like patterns before adding them to the knowledge base
- Provenance tracking: Maintain a complete provenance chain for every document in the corpus
- Retrieval monitoring: Log and monitor retrieval patterns for anomaly detection
- Content separation: Maintain separate retrieval indices for different trust levels of content
- Regular integrity checks: Periodically verify the corpus against baseline manifests
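Provenance tracking can be made tamper-evident by hash-chaining ingestion records, so that silently editing or deleting an entry breaks the chain. A minimal sketch (the record fields are illustrative):

```python
import hashlib
import json
import time

def append_provenance(chain: list[dict], doc_id: str,
                      source_url: str, content_hash: str) -> dict:
    """Append a hash-chained provenance entry for one ingested document."""
    entry = {
        "doc_id": doc_id,
        "source_url": source_url,
        "content_hash": content_hash,
        "timestamp": time.time(),
        "prev_hash": chain[-1]["entry_hash"] if chain else "0" * 64,
    }
    # Hash the entry itself so later tampering is detectable
    entry["entry_hash"] = hashlib.sha256(
        json.dumps(entry, sort_keys=True).encode()
    ).hexdigest()
    chain.append(entry)
    return entry
```

Verifying the chain end to end (each entry's `prev_hash` matching the previous `entry_hash`) can be folded into the regular integrity checks.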
References

- Zou, W., Geng, R., Wang, B., & Jia, J. (2024). PoisonedRAG: Knowledge Poisoning Attacks to Retrieval-Augmented Generation of Large Language Models. arXiv preprint arXiv:2402.07867. https://arxiv.org/abs/2402.07867
- OWASP. (2025). OWASP Top 10 for Large Language Model Applications. LLM01: Prompt Injection. https://owasp.org/www-project-top-10-for-large-language-model-applications/
- Greshake, K., Abdelnabi, S., Mishra, S., Endres, C., Holz, T., & Fritz, M. (2023). Not What You've Signed Up For: Compromising Real-World LLM-Integrated Applications with Indirect Prompt Injection. ACM Workshop on Artificial Intelligence and Security. https://arxiv.org/abs/2302.12173