Vectordatabase-forensics
Forensische analysetechnieken voor het detecteren en onderzoeken van vergiftiging van vectordatabases, ongeautoriseerde wijzigingen en schendingen van data-integriteit.
Overzicht
Vectordatabases zijn de retrieval-ruggengraat van Retrieval-Augmented Generation (RAG)-systemen, semantische zoekmachines en door AI aangedreven aanbevelingssystemen. Ze slaan hoogdimensionale embedding-vectoren op naast metadata en originele inhoud, wat similariteit-gebaseerde retrieval mogelijk maakt die downstream-AI-modellen voedt. Wanneer een aanvaller een vectordatabase vergiftigt, produceert elke query die de vergiftigde vectoren ophaalt gecompromitteerde resultaten, waardoor dit een aanvalsoppervlak met grote hefboomwerking is.
Vectordatabase-forensics is het proces van het detecteren dat met een vectordatabase is geknoeid, het bepalen wat er is gewijzigd, het identificeren van hoe en wanneer de wijzigingen zijn aangebracht, en het beoordelen van de impact op downstream-systemen. Dit is een relatief nieuwe forensische discipline, omdat vectordatabases zelf een recente toevoeging zijn aan de meeste productiearchitecturen, en omdat traditionele forensische databasetechnieken (analyse van transactielogs, auditing op rijniveau) niet direct van toepassing zijn op de hoogdimensionale vectorruimte.
De kern-forensische uitdagingen zijn: vectoren zijn ondoorzichtig (je kunt een embedding niet lezen zoals je een databaserij leest), vergiftigde vectoren kunnen statistisch vergelijkbaar zijn met legitieme vectoren (waardoor ze moeilijk te onderscheiden zijn), en de meeste vectordatabases hebben beperkte ingebouwde auditing vergeleken met relationele databases. Een onderzoeker moet analyse van de vectorruimte, metadata-forensics, het traceren van de ingestiepijplijn en correlatie van toegangslogs combineren om een compleet beeld van een vergiftigingsincident op te bouwen.
Dit artikel behandelt detectietechnieken voor het identificeren van vergiftigde vectoren, forensische analyse van de status en geschiedenis van vectordatabases, pijplijntracering om de vergiftigingsbron te identificeren, en monitoringsystemen voor doorlopende detectie.
Vergiftigde vectoren detecteren
Statistische anomaliedetectie in de embedding-ruimte
Vergiftigde vectoren hebben vaak statistische eigenschappen die afwijken van legitieme vectoren, zelfs wanneer de aanvaller heeft geprobeerd ze te laten opgaan in de massa. De meest voorkomende anomalieën zijn: vectoren die ongebruikelijk dicht clusteren rond regio's met veel query-verkeer (om retrieval te maximaliseren), vectoren met magnitude- of distributie-eigenschappen die afwijken van de legitieme populatie, en vectoren waarvan de metadata inconsistent is met hun embedding-inhoud.
import numpy as np
from dataclasses import dataclass, field
from typing import Optional
from datetime import datetime
import hashlib
import json
@dataclass
class VectorRecord:
"""Een enkele vectorregistratie uit een vectordatabase."""
vector_id: str
vector: np.ndarray
metadata: dict
content: str # Originele tekst- of documentinhoud
created_at: Optional[str] = None
updated_at: Optional[str] = None
source: Optional[str] = None # Identifier van de ingestiebron
@dataclass
class AnomalyResult:
"""Resultaat van anomaliedetectie op een vectorregistratie."""
vector_id: str
anomaly_score: float # 0-1, hoger is afwijkender
anomaly_types: list[str]
details: dict
flagged: bool = False
class VectorForensicAnalyzer:
"""Detecteer en analyseer anomalieën in de inhoud van vectordatabases."""
def __init__(
self,
expected_dimension: int = 1536,
magnitude_tolerance: float = 2.0,
):
self.expected_dimension = expected_dimension
self.magnitude_tolerance = magnitude_tolerance
self.baseline_stats: Optional[dict] = None
def build_baseline(self, vectors: list[VectorRecord]) -> dict:
"""
Bouw een statistische baseline op basis van een bekende, betrouwbare vectorcollectie.
Args:
vectors: Lijst van bekende, betrouwbare vectorregistraties.
Returns:
Dict met baseline-statistieken.
"""
if not vectors:
return {}
all_vectors = np.array([v.vector for v in vectors])
magnitudes = np.linalg.norm(all_vectors, axis=1)
# Statistieken per dimensie
dim_means = np.mean(all_vectors, axis=0)
dim_stds = np.std(all_vectors, axis=0)
# Statistieken van paarsgewijze afstanden (steekproef voor efficiëntie)
sample_size = min(len(vectors), 1000)
sample_indices = np.random.choice(
len(vectors), sample_size, replace=False
)
sample_vectors = all_vectors[sample_indices]
# Cosinus-similariteiten tussen willekeurige paren
n_pairs = min(5000, sample_size * (sample_size - 1) // 2)
pair_sims = []
for _ in range(n_pairs):
i, j = np.random.choice(sample_size, 2, replace=False)
sim = np.dot(sample_vectors[i], sample_vectors[j]) / (
np.linalg.norm(sample_vectors[i])
* np.linalg.norm(sample_vectors[j])
+ 1e-10
)
pair_sims.append(sim)
self.baseline_stats = {
"count": len(vectors),
"dimension": all_vectors.shape[1],
"magnitude_mean": float(np.mean(magnitudes)),
"magnitude_std": float(np.std(magnitudes)),
"magnitude_min": float(np.min(magnitudes)),
"magnitude_max": float(np.max(magnitudes)),
"dim_means": dim_means,
"dim_stds": dim_stds,
"pairwise_sim_mean": float(np.mean(pair_sims)),
"pairwise_sim_std": float(np.std(pair_sims)),
"centroid": dim_means.copy(),
}
return {
k: v for k, v in self.baseline_stats.items()
if not isinstance(v, np.ndarray)
}
def analyze_vector(self, record: VectorRecord) -> AnomalyResult:
"""
Analyseer een enkele vector op anomalieën ten opzichte van de baseline.
Args:
record: De te analyseren vectorregistratie.
Returns:
AnomalyResult met details over de anomalie.
"""
if self.baseline_stats is None:
raise ValueError("Must call build_baseline() first.")
anomaly_types = []
details = {}
scores = []
vec = record.vector
# Controle 1: Dimensiemismatch
if len(vec) != self.expected_dimension:
anomaly_types.append("dimension_mismatch")
details["expected_dim"] = self.expected_dimension
details["actual_dim"] = len(vec)
scores.append(1.0)
# Controle 2: Magnitude-anomalie
magnitude = float(np.linalg.norm(vec))
mag_mean = self.baseline_stats["magnitude_mean"]
mag_std = self.baseline_stats["magnitude_std"]
mag_z_score = abs(magnitude - mag_mean) / max(mag_std, 1e-10)
details["magnitude"] = magnitude
details["magnitude_z_score"] = float(mag_z_score)
if mag_z_score > self.magnitude_tolerance:
anomaly_types.append("magnitude_anomaly")
scores.append(min(mag_z_score / 5.0, 1.0))
# Controle 3: Uitschieteranalyse per dimensie
dim_means = self.baseline_stats["dim_means"]
dim_stds = self.baseline_stats["dim_stds"]
z_scores = np.abs(vec - dim_means) / np.maximum(dim_stds, 1e-10)
outlier_dims = int(np.sum(z_scores > 3.0))
outlier_fraction = outlier_dims / len(vec)
details["outlier_dimensions"] = outlier_dims
details["outlier_fraction"] = float(outlier_fraction)
if outlier_fraction > 0.05: # Meer dan 5% van de dimensies zijn uitschieters
anomaly_types.append("dimensional_outlier")
scores.append(min(outlier_fraction * 5, 1.0))
# Controle 4: Afstand tot de centroïde
centroid = self.baseline_stats["centroid"]
centroid_distance = float(np.linalg.norm(vec - centroid))
expected_distance = float(np.linalg.norm(dim_stds))
distance_ratio = centroid_distance / max(expected_distance, 1e-10)
details["centroid_distance"] = centroid_distance
details["distance_ratio"] = distance_ratio
if distance_ratio > 3.0:
anomaly_types.append("centroid_outlier")
scores.append(min(distance_ratio / 5.0, 1.0))
# Controle 5: Nul- of bijna-nulvector (mogelijke corruptie)
if magnitude < 1e-6:
anomaly_types.append("zero_vector")
scores.append(1.0)
# Controle 6: Consistentie van metadata
metadata_anomalies = self._check_metadata_consistency(record)
if metadata_anomalies:
anomaly_types.extend(metadata_anomalies)
scores.append(0.5)
details["metadata_anomalies"] = metadata_anomalies
# Samengestelde score
anomaly_score = max(scores) if scores else 0.0
return AnomalyResult(
vector_id=record.vector_id,
anomaly_score=round(anomaly_score, 4),
anomaly_types=anomaly_types,
details=details,
flagged=anomaly_score > 0.5,
)
def _check_metadata_consistency(
self, record: VectorRecord,
) -> list[str]:
"""Controleer op metadata-anomalieën in een vectorregistratie."""
anomalies = []
# Controleer op ontbrekende standaard-metadatavelden
expected_fields = {"source", "timestamp", "content_type"}
missing = expected_fields - set(record.metadata.keys())
if missing:
anomalies.append(f"missing_metadata: {missing}")
# Controleer op verdacht recente aanmaak in een oude collectie
if record.created_at:
try:
created = datetime.fromisoformat(record.created_at)
if (datetime.utcnow() - created).days < 1:
# Zeer recente toevoeging — vereist mogelijk handmatige beoordeling
anomalies.append("recently_created")
except (ValueError, TypeError):
anomalies.append("invalid_timestamp")
# Controleer inhoudslengte versus embedding (zeer korte inhoud
# die embeddings van normale lengte produceert is verdacht)
if record.content and len(record.content) < 10:
anomalies.append("minimal_content")
return anomalies
def scan_collection(
self,
records: list[VectorRecord],
threshold: float = 0.5,
) -> dict:
"""
Scan een hele vectorcollectie op anomalieën.
Args:
records: Alle vectorregistraties in de collectie.
threshold: Drempelwaarde voor de anomaliescore om te markeren.
Returns:
Scanresultaten met gemarkeerde registraties en statistieken.
"""
results = []
for record in records:
result = self.analyze_vector(record)
if result.anomaly_score >= threshold:
results.append(result)
# Cluster de gemarkeerde vectoren om gecoördineerde vergiftiging te identificeren
clusters = self._cluster_anomalies(
[r for r in records if any(
ar.vector_id == r.vector_id
for ar in results
)],
results,
)
return {
"total_scanned": len(records),
"flagged_count": len(results),
"flagged_percentage": round(
len(results) / max(len(records), 1) * 100, 2
),
"anomaly_type_counts": self._count_anomaly_types(results),
"flagged_records": [
{
"vector_id": r.vector_id,
"anomaly_score": r.anomaly_score,
"anomaly_types": r.anomaly_types,
}
for r in results
],
"potential_clusters": clusters,
}
def _count_anomaly_types(
self, results: list[AnomalyResult],
) -> dict[str, int]:
"""Tel het aantal keer dat elk anomalietype voorkomt."""
counts: dict[str, int] = {}
for r in results:
for atype in r.anomaly_types:
counts[atype] = counts.get(atype, 0) + 1
return counts
def _cluster_anomalies(
self,
flagged_records: list[VectorRecord],
anomaly_results: list[AnomalyResult],
) -> list[dict]:
"""Identificeer clusters van afwijkende vectoren die kunnen wijzen op
gecoördineerde vergiftiging."""
if len(flagged_records) < 2:
return []
vectors = np.array([r.vector for r in flagged_records])
# Eenvoudige clustering: groepeer vectoren met een hoge cosinus-similariteit
clusters = []
visited = set()
for i in range(len(flagged_records)):
if i in visited:
continue
cluster = [i]
visited.add(i)
for j in range(i + 1, len(flagged_records)):
if j in visited:
continue
sim = np.dot(vectors[i], vectors[j]) / (
np.linalg.norm(vectors[i])
* np.linalg.norm(vectors[j])
+ 1e-10
)
if sim > 0.85: # Drempel voor hoge similariteit
cluster.append(j)
visited.add(j)
if len(cluster) >= 2:
clusters.append({
"size": len(cluster),
"vector_ids": [
flagged_records[idx].vector_id for idx in cluster
],
"sources": list({
flagged_records[idx].source
for idx in cluster
if flagged_records[idx].source
}),
})
return clustersVerificatie van consistentie tussen inhoud en embedding
Een krachtige forensische techniek voor het detecteren van vergiftigde vectoren is het verifiëren dat de opgeslagen embedding daadwerkelijk overeenkomt met de opgeslagen inhoud. Een aanvaller die een document injecteert met adversariële inhoud maar een gemanipuleerde embedding (om retrieval voor specifieke query's te garanderen), zal deze consistentiecontrole niet doorstaan.
from typing import Callable
class ContentEmbeddingVerifier:
"""Verifieer dat opgeslagen embeddings overeenkomen met hun bijbehorende inhoud."""
def __init__(
self,
embedding_function: Callable[[str], np.ndarray],
similarity_threshold: float = 0.90,
):
"""
Args:
embedding_function: Functie die tekst aanneemt en
een embedding-vector retourneert.
similarity_threshold: Minimale cosinus-similariteit tussen
opgeslagen en herberekende embeddings.
"""
self.embed = embedding_function
self.threshold = similarity_threshold
def verify_record(self, record: VectorRecord) -> dict:
"""
Verifieer dat de embedding van een registratie overeenkomt met de inhoud.
Args:
record: De te verifiëren vectorregistratie.
Returns:
Dict met het verificatieresultaat.
"""
if not record.content:
return {
"vector_id": record.vector_id,
"verified": False,
"reason": "no_content",
"similarity": 0.0,
}
# Herbereken de embedding op basis van de inhoud
recomputed = self.embed(record.content)
# Bereken de cosinus-similariteit
stored = record.vector
similarity = float(
np.dot(stored, recomputed)
/ (np.linalg.norm(stored) * np.linalg.norm(recomputed) + 1e-10)
)
verified = similarity >= self.threshold
return {
"vector_id": record.vector_id,
"verified": verified,
"similarity": round(similarity, 4),
"threshold": self.threshold,
"reason": "match" if verified else "embedding_content_mismatch",
}
def verify_collection(
self,
records: list[VectorRecord],
sample_size: Optional[int] = None,
) -> dict:
"""
Verifieer de consistentie tussen embedding en inhoud over een collectie.
Args:
records: Te verifiëren vectorregistraties.
sample_size: Indien ingesteld, neem willekeurig dit aantal registraties
als steekproef in plaats van alle te verifiëren.
Returns:
Verificatieresultaten over de hele collectie.
"""
if sample_size and sample_size < len(records):
indices = np.random.choice(
len(records), sample_size, replace=False
)
sample = [records[i] for i in indices]
else:
sample = records
results = []
mismatches = []
for record in sample:
result = self.verify_record(record)
results.append(result)
if not result["verified"] and result["reason"] != "no_content":
mismatches.append(result)
similarities = [
r["similarity"] for r in results
if r["reason"] != "no_content"
]
return {
"total_verified": len(sample),
"match_count": len(sample) - len(mismatches),
"mismatch_count": len(mismatches),
"mismatch_rate": round(
len(mismatches) / max(len(sample), 1), 4
),
"mean_similarity": round(
float(np.mean(similarities)) if similarities else 0.0, 4
),
"min_similarity": round(
float(np.min(similarities)) if similarities else 0.0, 4
),
"mismatched_records": mismatches,
}Nearest-neighbor-analyse voor vergiftigingsintentie
Wanneer vectoren als vergiftigd zijn bevestigd, onthult het analyseren van hun naaste buren in de embedding-ruimte de intentie van de aanvaller. Vergiftigde vectoren die zijn ontworpen om reacties over een specifiek onderwerp te beïnvloeden, zullen clusteren nabij legitieme vectoren over dat onderwerp. Door de legitieme vectoren te identificeren die het dichtst bij elke vergiftigde vector liggen, kun je bepalen op welke query's de aanvaller zich richtte.
def analyze_poisoning_intent(
poisoned_records: list[VectorRecord],
all_records: list[VectorRecord],
top_k: int = 10,
) -> list[dict]:
"""
Analyseer de naaste buren van vergiftigde vectoren om de
waarschijnlijke targeting-intentie van de aanvaller te bepalen.
Args:
poisoned_records: De geïdentificeerde vergiftigde vectoren.
all_records: Alle vectoren in de collectie.
top_k: Aantal te analyseren naaste buren.
Returns:
Lijst van targeting-analyseresultaten per vergiftigde vector.
"""
# Bouw een matrix van alle legitieme vectoren
legitimate = [r for r in all_records if r.vector_id not in
{p.vector_id for p in poisoned_records}]
if not legitimate:
return []
legit_matrix = np.array([r.vector for r in legitimate])
legit_norms = np.linalg.norm(legit_matrix, axis=1, keepdims=True)
legit_normalized = legit_matrix / np.maximum(legit_norms, 1e-10)
results = []
for poisoned in poisoned_records:
p_vec = poisoned.vector
p_norm = np.linalg.norm(p_vec)
p_normalized = p_vec / max(p_norm, 1e-10)
# Bereken cosinus-similariteiten
similarities = legit_normalized @ p_normalized
top_indices = np.argsort(similarities)[-top_k:][::-1]
neighbors = []
for idx in top_indices:
neighbors.append({
"vector_id": legitimate[idx].vector_id,
"content_preview": legitimate[idx].content[:200],
"similarity": float(similarities[idx]),
"metadata": legitimate[idx].metadata,
})
# Leid de targeting af uit de inhoud van de buren
neighbor_sources = [n["metadata"].get("source", "") for n in neighbors]
neighbor_topics = [n["content_preview"][:50] for n in neighbors]
results.append({
"poisoned_vector_id": poisoned.vector_id,
"poisoned_content_preview": poisoned.content[:200],
"nearest_legitimate_neighbors": neighbors,
"likely_target_topics": neighbor_topics[:3],
"max_similarity_to_legitimate": float(similarities[top_indices[0]]),
})
return resultsTracering van de ingestiepijplijn
Het vergiftigingspad volgen
Zodra vergiftigde vectoren zijn geïdentificeerd, is de volgende forensische vraag: hoe zijn ze in de database terechtgekomen? Door de ingestiepijplijn terug te traceren van de vergiftigde registratie naar de bron, wordt de aanvalsvector onthuld.
De meeste RAG-systemen importeren documenten via een pijplijn: het ophalen uit de bron (web scraping, bestandsupload, API-pull) gevolgd door chunking, embedding en invoeging in de database. Elke fase kan zijn eigen logs hebben. De forensisch onderzoeker moet de metadata van de vergiftigde vector (bron-URL, ingestie-tijdstempel, document-ID) correleren met de pijplijnlogs om het injectiepunt te identificeren.
Veelvoorkomende injectiepunten zijn: directe toegang tot de database-API (de aanvaller schrijft vectoren rechtstreeks en omzeilt de pijplijn), manipulatie van de documentbron (de aanvaller wijzigt een brondocument dat later normaal wordt geïngesteerd), compromittering van de pijplijn (de aanvaller wijzigt de embedding- of chunking-stap om vectoren onderweg te veranderen), en misbruik van het upload-endpoint (de aanvaller gebruikt een legitieme functie voor het uploaden van documenten om kwaadaardige inhoud in te dienen).
Voor elk injectiepunt verschilt het forensische bewijs. Directe API-toegang produceert authenticatielogs in de vectordatabase of de proxy ervan. Manipulatie van de documentbron vereist onderzoek van de geschiedenis en toegangslogs van het brondocument. Compromittering van de pijplijn vereist onderzoek van de pijplijncode en de deploymentgeschiedenis op ongeautoriseerde wijzigingen.
Monitoring van collectie-integriteit
Hash-gebaseerde integriteitsverificatie
Onderhoud in de loop van de tijd een cryptografisch manifest van de status van de vectorcollectie. Hash de collectie (of een deterministische steekproef) periodiek en vergelijk deze met de vorige hash. Elke onverwachte verandering activeert een onderzoek.
class CollectionIntegrityMonitor:
"""Monitor de integriteit van een vectorcollectie in de loop van de tijd."""
def __init__(self, collection_name: str, storage_path: str):
self.collection_name = collection_name
self.storage_path = storage_path
self.manifests: list[dict] = []
def create_manifest(
self,
records: list[VectorRecord],
) -> dict:
"""
Maak een integriteitsmanifest voor de huidige status van de collectie.
Args:
records: Alle vectorregistraties in de collectie.
Returns:
Manifest-dict met collectiehash en metadata.
"""
# Sorteer registraties op ID voor deterministische hashing
sorted_records = sorted(records, key=lambda r: r.vector_id)
hasher = hashlib.sha256()
record_hashes = []
for record in sorted_records:
# Hash elke registratie afzonderlijk
record_data = (
record.vector_id
+ "|"
+ record.vector.tobytes().hex()
+ "|"
+ json.dumps(record.metadata, sort_keys=True)
)
record_hash = hashlib.sha256(record_data.encode()).hexdigest()
record_hashes.append(record_hash)
hasher.update(record_hash.encode())
collection_hash = hasher.hexdigest()
manifest = {
"collection_name": self.collection_name,
"timestamp": datetime.utcnow().isoformat(),
"record_count": len(records),
"collection_hash": collection_hash,
"sample_record_hashes": record_hashes[:10],
}
self.manifests.append(manifest)
return manifest
def compare_manifests(
self,
previous: dict,
current: dict,
) -> dict:
"""
Vergelijk twee manifesten om wijzigingen te detecteren.
Args:
previous: Het baseline-manifest.
current: Het huidige manifest.
Returns:
Dict met vergelijkingsresultaten.
"""
changes = {
"hash_match": previous["collection_hash"] == current["collection_hash"],
"record_count_change": (
current["record_count"] - previous["record_count"]
),
"time_delta": current["timestamp"],
"previous_timestamp": previous["timestamp"],
}
if not changes["hash_match"]:
changes["alert"] = (
"Collection integrity violation detected. "
"Hash mismatch between manifests."
)
changes["severity"] = "high"
if changes["record_count_change"] > 0:
changes["likely_cause"] = (
f"{changes['record_count_change']} new records added"
)
elif changes["record_count_change"] < 0:
changes["likely_cause"] = (
f"{abs(changes['record_count_change'])} records removed"
)
else:
changes["likely_cause"] = (
"Records modified in place (count unchanged)"
)
return changesPraktische overwegingen
Beperkingen van auditlogs in vectordatabases
De meeste vectordatabases hebben beperkte ingebouwde audit logging vergeleken met relationele databases. Pinecone biedt metrics op indexniveau, maar geen audit trails per vector. Weaviate logt operaties op API-niveau, maar onderhoudt geen wijzigingsgeschiedenis per object. Chroma, een embedded database, heeft geen ingebouwde audit logging. Qdrant biedt toegangslogs, maar geen auditing op wijzigingsniveau.
Implementeer ter compensatie audit logging op de applicatielaag. Omhul alle vectordatabase-operaties (insert, update, delete, query) met een logginglaag die de operatie, de aanvragende identiteit, het tijdstempel en de getroffen vector-ID's registreert. Sla deze auditlogs apart van de vectordatabase zelf op, zodat een aanvaller die de database compromitteert niet ook met de audit trail kan knoeien.
Onderzoekscasestudy: Gerichte manipulatie van retrieval
Bekijk een scenario waarin een RAG-systeem voor productaanbevelingen onevenredig vaak de producten van een specifieke leverancier begint aan te bevelen. Het beveiligingsteam vermoedt vergiftiging van de vectordatabase, maar heeft forensisch bewijs nodig.
Het onderzoek begint met een collectiebrede scan met de VectorForensicAnalyzer. De scan onthult dat 47 vectoren zijn gemarkeerd vanwege anomalieën van de in totaal 500.000 vectoren (0,009%). Hiervan zijn er 31 gemarkeerd als "dimensional_outlier" en delen er 12 dezelfde bronmetadata die verwijst naar een batch-ingestietaak die twee weken geleden draaide.
De verificatiestap voor consistentie tussen inhoud en embedding is veelzeggender: 28 van de 47 gemarkeerde vectoren falen de consistentiecontrole, wat betekent dat hun opgeslagen embeddings niet overeenkomen met wat het embedding-model produceert uit hun opgeslagen inhoud. Dit is een sterke indicatie dat de embeddings handmatig zijn gemaakt in plaats van berekend uit de inhoud. De aanvaller maakte documenten met goedaardig ogende productbeschrijvingen, maar kende ze adversarieel gemaakte embeddings toe die ze dicht bij veelvoorkomende productqueryvectoren plaatsen.
Clusteranalyse toont aan dat de 28 niet-overeenkomende vectoren drie strakke clusters vormen in de embedding-ruimte, elk gepositioneerd nabij een regio met veel query-verkeer (beste laptops, beste telefoons, beste tablets). Alle drie de clusters wijzen naar de producten van dezelfde leverancier. De tracering van de ingestiepijplijn onthult dat de vectoren zijn ingevoegd via een bulk-import-API-endpoint dat de normale documentverwerkingspijplijn omzeilde, met gebruik van een API-sleutel die was toegewezen aan een geautomatiseerde data-refresh-service.
Het forensische bewijspakket omvat: de lijst met vergiftigde vector-ID's met hun anomaliescores, de mismatch-analyse tussen inhoud en embedding die de discrepantie tussen opgeslagen en herberekende embeddings toont, de clusteranalyse die strategische positionering in de embedding-ruimte toont, de ingestiepijplijnlogs die de bulk-import-gebeurtenis tonen, en de gebruiksregistraties van de API-sleutel voor het gecompromitteerde serviceaccount.
Forensisch herstel en remediatie
Na het identificeren van vergiftigde vectoren moet het remediatieproces snelheid (de vergiftiging snel verwijderen) in balans brengen met forensisch behoud (het bewaren van bewijs voor onderzoek). Maak voordat je vectoren verwijdert een volledige export van de getroffen vectoren, inclusief hun embeddings, metadata, inhoud en eventuele beschikbare audit-trail-registraties.
De veiligste remediatieaanpak is om de hele collectie opnieuw te embedden vanuit de brondocumenten met de standaard-ingestiepijplijn, en vervolgens de opnieuw geëmbedde collectie te vergelijken met de huidige collectie om alle discrepanties te identificeren. Dit vangt niet alleen de vectoren op die je via anomaliedetectie hebt geïdentificeerd, maar ook eventuele vergiftigde vectoren die geavanceerd genoeg waren om detectie te ontwijken. Voor grote collecties kan dit echter onpraktisch zijn, in welk geval gerichte verwijdering van geïdentificeerde vergiftigde vectoren gecombineerd met doorlopende monitoring het alternatief is.
Stel na de remediatie een baseline-manifest op vanuit de schone status van de collectie en implementeer continue monitoring met de hierboven beschreven integriteitstools. Configureer waarschuwingen voor nieuwe vectoren die de consistentiecontrole tussen inhoud en embedding niet doorstaan en voor vectoren met afwijkende statistische eigenschappen.
Temporele analyse van vector-invoegingen
Het analyseren van de timing van vector-invoegingen kan vergiftigingscampagnes onthullen. Legitieme ingestiepijplijnen voegen vectoren doorgaans in regelmatige batches in die overeenkomen met geplande data-refresh-taken. Vergiftigingsaanvallen kunnen vectoren op ongebruikelijke tijden, in ongebruikelijke hoeveelheden of met ongebruikelijke temporele patronen invoegen.
Bouw een tijdlijn van vector-invoegingen op basis van het ingestie-tijdstempel en zoek naar anomalieën: invoegingen buiten de normale uitvoeringsvensters van de pijplijn, ongebruikelijk grote of kleine batchgroottes, en invoegingen van onverwachte bron-identifiers. Vergelijk de invoegtijdlijn met de toegangslogs voor het API-endpoint van de vectordatabase om de identiteit en netwerkoorsprong van elke invoeggebeurtenis te identificeren.
Voor vectordatabases die update-operaties ondersteunen (het wijzigen van de embedding of metadata van een bestaande vector), is het onderscheid tussen invoeging en wijziging forensisch belangrijk. Een aanvaller die bestaande vectoren wijzigt, is moeilijker te detecteren omdat het totale aantal vectoren niet verandert, maar de wijzigingstijdstempels kunnen nog steeds zichtbaar zijn in de interne metadata van de database.
Prestatieoverwegingen voor forensisch scannen
Volledige forensische scans van een collectie (embeddings herberekenen en vergelijken met opgeslagen vectoren) zijn rekenkundig kostbaar. Voor een collectie met 1 miljoen vectoren die een 1536-dimensionaal embedding-model gebruikt, vereist alleen al de herberekening 1 miljoen embedding-API-aanroepen. Gebruik sampling voor routinematige monitoring (verifieer dagelijks een willekeurige 1-5% van de collectie) en reserveer volledige scans voor bevestigde incidenten. Stratificeer bij het samplen op ingestiedatum: oversample recent toegevoegde vectoren, aangezien deze waarschijnlijker vergiftigde inhoud bevatten.
Forensische gereedheid voor vectordatabases
Voorbereiding vóór een incident
Configureer het volgende voordat een incident plaatsvindt om forensische gereedheid te garanderen. Schakel toegangslogging op API-niveau in voor de vectordatabase, waarbij ten minste het operatietype, de aanvragende identiteit, het tijdstempel en de getroffen collectie- en vector-ID's worden vastgelegd. Als de vectordatabase dit ondersteunt, schakel change data capture (CDC) in om een log bij te houden van alle insert-, update- en delete-operaties.
Stel baseline-manifesten op voor elke productiecollectie volgens een regelmatig schema (minimaal wekelijks). Sla manifesten op in een append-only logopslag, gescheiden van de vectordatabase. Implementeer consistentiecontroles tussen inhoud en embedding op steekproefbasis als onderdeel van reguliere datakwaliteitsmonitoring, zodat vergiftiging tijdig wordt gedetecteerd in plaats van pas tijdens een reactief onderzoek.
Documenteer de architectuur van de ingestiepijplijn grondig, inclusief alle databronnen, verwerkingsfasen, authenticatiemechanismen en toegangscontroles bij elke fase. Deze documentatie is essentieel voor het traceren van het vergiftigingspad tijdens een onderzoek. Zonder deze documentatie moet de forensisch onderzoeker de pijplijnarchitectuur onder tijdsdruk reverse-engineeren.
Onderhoud ten slotte bekende, betrouwbare snapshots van vectorcollecties. Als je vectordatabase backup en restore ondersteunt, maak dan regelmatig back-ups en verifieer ze. Zo niet, onderhoud dan een aparte kopie van de brondocumenten van de collectie en een registratie van de gebruikte versie van het embedding-model, zodat de collectie vanaf nul kan worden herbouwd als de vergiftiging wijdverspreid is en remediatie van afzonderlijke vectoren onpraktisch is.
Referenties
- Zou, W., Geng, R., Wang, B., & Jia, J. (2024). "PoisonedRAG: Knowledge Corruption Attacks to Retrieval-Augmented Generation of Large Language Models." arXiv:2402.07867. https://arxiv.org/abs/2402.07867
- Zhong, Z., Liu, Z., Shang, J., & Sun, H. (2023). "Poisoning Retrieval Corpora by Injecting Adversarial Passages." arXiv:2310.19156. https://arxiv.org/abs/2310.19156
- Xiang, C., Wu, T., Zhong, Z., Wagner, D., Chen, D., & Mittal, P. (2024). "Certifiably Robust RAG against Retrieval Corruption." arXiv:2405.15556. https://arxiv.org/abs/2405.15556