Multi-Model Attack Correlation
Technieken voor het correleren en analyseren van gecoördineerde aanvallen die zich richten op meerdere AI-modellen of -systemen binnen een organisatie.
Overzicht
Moderne organisaties deployen tientallen AI-modellen over verschillende applicaties: klantgerichte chatbots, interne code-assistenten, documentverwerkingssystemen, aanbevelingsengines, fraudedetectiemodellen en content-moderatieclassifiers. Een geavanceerde aanvaller die zo'n organisatie target, kan een gecoördineerde campagne uitvoeren die meerdere modellen tegelijkertijd of opeenvolgend probeert, test en aanvalt. Elke individuele interactie kan onschuldig of niet doorslaggevend lijken, maar samen gecorreleerd onthullen ze een samenhangende aanvalscampagne.
Multi-modelaanvalcorrelatie is de forensische discipline van het verbinden van adversariële activiteit over deze ongelijksoortige AI-systemen om gecoördineerde campagnes te identificeren, ze toe te schrijven aan specifieke dreigingsactoren en de doelstellingen van de aanvaller te begrijpen. Dit is analoog aan hoe traditionele SOC-analisten alerts correleren over firewalls, endpoints en authenticatiesystemen -- maar aangepast voor de unieke telemetrie die door AI-systemen wordt gegenereerd.
De uitdaging is aanzienlijk: AI-systemen genereren heterogene telemetrie (tekstlogs, numerieke metrieken, embeddingvectoren, confidence-scores), draaien op verschillende infrastructuur en delen mogelijk geen gemeenschappelijk logformaat. Correlatie vereist het normaliseren van deze diverse data tot een gemeenschappelijk framework en het toepassen van zowel regelgebaseerde als statistische technieken om patronen te identificeren die systeemgrenzen overschrijden.
Patronen van multi-modelaanvallen
Verkenningscampagnes
Een aanvaller kan meerdere modellen proberen om de AI-deployment van een organisatie in kaart te brengen:
- De capaciteiten en beperkingen van elk model testen
- Identificeren welke modellen componenten delen (dezelfde embeddings, hetzelfde RAG-corpus, dezelfde guardrails)
- Vertrouwensrelaties tussen modellen bepalen (de uitvoer van het ene model voedt de invoer van het andere)
- Het aanvalsoppervlak in kaart brengen voor een meer gerichte vervolgaanval
Pivot-aanvallen
De aanvaller compromitteert één AI-systeem en gebruikt het als opstap om andere aan te vallen:
- Informatie uit Model A extraheren die helpt bij het maken van aanvallen tegen Model B
- Een gejailbreakte chatbot gebruiken om interne API's te proberen of de systeemarchitectuur te ontdekken
- Een gecompromitteerde code-assistent benutten om kwaadaardige code te injecteren die andere systemen treft
Gelijktijdige multi-vectoraanvallen
De aanvaller target meerdere modellen tegelijkertijd met verschillende aanvalstypen:
- Prompt-injectie tegen de chatbot terwijl het content-moderatiesysteem tegelijkertijd wordt getest
- Datalekpoging tegen het RAG-systeem terwijl het beveiligingsteam wordt afgeleid met een luidruchtige jailbreak-poging
- Toeleveringsketenaanval die een gedeelde dependency treft die door meerdere modellen wordt gebruikt
Gecoördineerde ontwijking
De aanvaller gebruikt meerdere interacties over verschillende modellen om een doel te bereiken dat door de verdedigingen van een enkel model zou worden geblokkeerd:
- Vraag Model A om gedeeltelijke informatie, Model B om een andere gedeeltelijke, en combineer extern
- Gebruik de uitvoer van Model A als context voor een aanval op Model B
- Buit inconsistenties tussen het veiligheidsbeleid van modellen uit
Correlatiearchitectuur
Gebeurtenisnormalisatie
De eerste stap is het normaliseren van gebeurtenissen uit diverse AI-systemen tot een gemeenschappelijk schema.
"""
Framework voor multi-modelaanvalcorrelatie.
Normaliseert, correleert en analyseert adversariële activiteit
over meerdere AI-systemen.
"""
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any
from collections import defaultdict
import numpy as np
class NormalizedEventType(Enum):
"""Genormaliseerde gebeurtenistypen over alle AI-systemen."""
QUERY = "query"
RESPONSE = "response"
GUARDRAIL_TRIGGER = "guardrail_trigger"
ANOMALY = "anomaly"
ERROR = "error"
AUTH_EVENT = "auth_event"
CONFIG_CHANGE = "config_change"
MODEL_ACCESS = "model_access"
@dataclass
class NormalizedEvent:
"""
Genormaliseerde gebeurtenis van elk AI-systeem in de organisatie.
Alle systeemspecifieke gebeurtenissen worden gemapt naar dit gemeenschappelijke schema
om correlatie tussen systemen mogelijk te maken.
"""
event_id: str
timestamp: float
source_system: str # bv. "chatbot-prod", "code-assistant", "fraud-detector"
model_id: str
event_type: NormalizedEventType
actor_id: str | None # Gebruikers-ID, API-sleutel of sessie-identifier
actor_ip: str | None
content_hash: str | None # Hash van invoer-/uitvoercontent
severity: float # 0.0 tot 1.0 genormaliseerde ernst
metadata: dict[str, Any] = field(default_factory=dict)
tags: list[str] = field(default_factory=list)
class EventNormalizer:
"""
Normaliseer gebeurtenissen van verschillende AI-systemen tot een gemeenschappelijk schema.
Elk bronsysteem heeft een geregistreerde adapter die zijn
native gebeurtenisformaat mapt naar NormalizedEvent.
"""
def __init__(self):
self.adapters: dict[str, Any] = {}
def register_adapter(self, system_name: str, adapter_fn) -> None:
self.adapters[system_name] = adapter_fn
def normalize(self, system_name: str, raw_event: dict) -> NormalizedEvent:
if system_name not in self.adapters:
raise ValueError(f"No adapter registered for system: {system_name}")
return self.adapters[system_name](raw_event)
# Voorbeeldadapters
def chatbot_adapter(raw: dict) -> NormalizedEvent:
"""Normaliseer gebeurtenissen van de klantgerichte chatbot."""
event_type_map = {
"user_message": NormalizedEventType.QUERY,
"bot_response": NormalizedEventType.RESPONSE,
"safety_block": NormalizedEventType.GUARDRAIL_TRIGGER,
}
return NormalizedEvent(
event_id=raw["id"],
timestamp=raw["ts"],
source_system="chatbot-prod",
model_id=raw.get("model", "unknown"),
event_type=event_type_map.get(raw["type"], NormalizedEventType.QUERY),
actor_id=raw.get("user_id"),
actor_ip=raw.get("ip"),
content_hash=raw.get("content_hash"),
severity=raw.get("risk_score", 0.0),
metadata=raw.get("metadata", {}),
)
def code_assistant_adapter(raw: dict) -> NormalizedEvent:
"""Normaliseer gebeurtenissen van de interne code-assistent."""
return NormalizedEvent(
event_id=raw["request_id"],
timestamp=raw["timestamp"],
source_system="code-assistant",
model_id=raw.get("model_version", "unknown"),
event_type=NormalizedEventType.QUERY,
actor_id=raw.get("employee_id"),
actor_ip=raw.get("source_ip"),
content_hash=raw.get("prompt_hash"),
severity=raw.get("anomaly_score", 0.0),
metadata={"language": raw.get("language"), "repo": raw.get("repo")},
)Correlatie-engine
class MultiModelCorrelator:
"""
Correleer adversariële activiteit over meerdere AI-systemen.
Gebruikt meerdere correlatiestrategieën:
1. Actor-gebaseerd: Dezelfde gebruiker/IP target meerdere systemen
2. Temporeel: Gecoördineerde timing over systemen
3. Content-gebaseerd: Vergelijkbare aanval-payloads over systemen
4. Gedragsmatig: Aanvalspatroonsignaturen die systemen overspannen
"""
def __init__(self, time_window_seconds: float = 3600.0):
self.time_window = time_window_seconds
self.events: list[NormalizedEvent] = []
def ingest(self, event: NormalizedEvent) -> None:
self.events.append(event)
def correlate_by_actor(self) -> list[dict]:
"""
Vind actoren die met meerdere AI-systemen interacteren.
Een actor die 3+ verschillende systemen in een tijdvenster target
is een sterke indicator van gecoördineerde verkenning.
"""
actor_systems: dict[str, dict] = defaultdict(lambda: {
"systems": set(),
"events": [],
"timestamps": [],
})
for event in self.events:
actor_key = event.actor_id or event.actor_ip
if not actor_key:
continue
actor_systems[actor_key]["systems"].add(event.source_system)
actor_systems[actor_key]["events"].append(event)
actor_systems[actor_key]["timestamps"].append(event.timestamp)
correlations = []
for actor, data in actor_systems.items():
if len(data["systems"]) >= 2:
timestamps = sorted(data["timestamps"])
time_span = timestamps[-1] - timestamps[0] if len(timestamps) > 1 else 0
# Bereken ernst over systemen
system_severities = defaultdict(list)
for evt in data["events"]:
system_severities[evt.source_system].append(evt.severity)
avg_severity = float(np.mean([
np.max(sevs) for sevs in system_severities.values()
]))
correlations.append({
"correlation_type": "actor_multi_system",
"actor": actor,
"systems_targeted": list(data["systems"]),
"system_count": len(data["systems"]),
"event_count": len(data["events"]),
"time_span_seconds": time_span,
"avg_severity": round(avg_severity, 4),
"risk_assessment": (
"HIGH" if len(data["systems"]) >= 3 and avg_severity > 0.5
else "MEDIUM" if len(data["systems"]) >= 2 and avg_severity > 0.3
else "LOW"
),
})
return sorted(correlations, key=lambda c: c["event_count"], reverse=True)
def correlate_by_temporal_pattern(
self,
window_seconds: float = 60.0,
) -> list[dict]:
"""
Vind temporeel gecorreleerde gebeurtenissen over systemen.
Gebeurtenissen van verschillende systemen die binnen een nauw
tijdvenster plaatsvinden kunnen op gecoördineerde activiteit wijzen.
"""
# Sorteer gebeurtenissen op timestamp
sorted_events = sorted(self.events, key=lambda e: e.timestamp)
correlations = []
for i, event in enumerate(sorted_events):
if event.severity < 0.3:
continue # Sla gebeurtenissen met lage ernst over voor efficiëntie
window_events = []
for j in range(i + 1, len(sorted_events)):
if sorted_events[j].timestamp - event.timestamp > window_seconds:
break
if sorted_events[j].source_system != event.source_system:
window_events.append(sorted_events[j])
if len(window_events) >= 2:
systems = {event.source_system} | {
e.source_system for e in window_events
}
if len(systems) >= 2:
correlations.append({
"correlation_type": "temporal_cluster",
"anchor_event": event.event_id,
"anchor_system": event.source_system,
"correlated_events": [e.event_id for e in window_events],
"systems_involved": list(systems),
"time_window_seconds": window_seconds,
"max_severity": max(
event.severity,
max(e.severity for e in window_events),
),
})
return correlations
def correlate_by_content(self) -> list[dict]:
"""
Vind gebeurtenissen met vergelijkbare content over systemen.
Dezelfde of vergelijkbare aanval-payloads gebruikt tegen verschillende
systemen suggereren een gecoördineerde campagne.
"""
# Groepeer gebeurtenissen op content-hash
content_groups: dict[str, list[NormalizedEvent]] = defaultdict(list)
for event in self.events:
if event.content_hash:
content_groups[event.content_hash].append(event)
correlations = []
for content_hash, events in content_groups.items():
systems = set(e.source_system for e in events)
if len(systems) >= 2:
correlations.append({
"correlation_type": "content_reuse",
"content_hash": content_hash,
"systems": list(systems),
"event_count": len(events),
"actors": list(set(
e.actor_id for e in events if e.actor_id
)),
"time_span": (
max(e.timestamp for e in events)
- min(e.timestamp for e in events)
),
"description": (
f"Same content hash seen across {len(systems)} systems "
f"in {len(events)} events"
),
})
return correlations
def generate_campaign_report(self) -> dict:
"""
Genereer een uitgebreid correlatierapport dat
alle correlatiestrategieën combineert.
"""
actor_corr = self.correlate_by_actor()
temporal_corr = self.correlate_by_temporal_pattern()
content_corr = self.correlate_by_content()
# Identificeer waarschijnlijke campagnes (gecoördineerde activiteit met hoge zekerheid)
campaigns = []
for ac in actor_corr:
if ac["risk_assessment"] in ("HIGH", "MEDIUM"):
campaigns.append({
"type": "actor_campaign",
"confidence": "HIGH" if ac["system_count"] >= 3 else "MEDIUM",
"actor": ac["actor"],
"systems": ac["systems_targeted"],
"events": ac["event_count"],
"evidence": ac,
})
return {
"analysis_period": {
"start": min(e.timestamp for e in self.events) if self.events else None,
"end": max(e.timestamp for e in self.events) if self.events else None,
},
"total_events_analyzed": len(self.events),
"unique_systems": len(set(e.source_system for e in self.events)),
"unique_actors": len(set(
e.actor_id for e in self.events if e.actor_id
)),
"actor_correlations": actor_corr,
"temporal_correlations": temporal_corr[:20],
"content_correlations": content_corr,
"identified_campaigns": campaigns,
"campaign_count": len(campaigns),
}Constructie van aanvalsgrafiek
Aanvalsgrafieken bouwen uit gecorreleerde gebeurtenissen
@dataclass
class AttackGraphNode:
"""Een knooppunt in de aanvalsgrafiek dat een significante gebeurtenis representeert."""
node_id: str
event: NormalizedEvent
stage: str # "reconnaissance", "weaponization", "delivery", "exploitation", "action"
atlas_technique: str | None = None
@dataclass
class AttackGraphEdge:
"""Een rand die twee gebeurtenissen in een aanvalssequentie verbindt."""
from_node: str
to_node: str
relationship: str # "preceded_by", "enabled", "same_actor", "same_content"
confidence: float
time_delta_seconds: float
def build_attack_graph(
correlated_events: list[NormalizedEvent],
correlations: list[dict],
) -> dict:
"""
Construeer een aanvalsgrafiek uit gecorreleerde gebeurtenissen.
De aanvalsgrafiek brengt de voortgang van een multi-model
aanvalscampagne in kaart door de fasen verkenning, delivery,
exploitatie en actie.
"""
nodes = []
edges = []
# Classificeer gebeurtenissen in aanvalsfasen
for event in correlated_events:
stage = _classify_attack_stage(event)
node = AttackGraphNode(
node_id=event.event_id,
event=event,
stage=stage,
atlas_technique=_map_to_atlas(event),
)
nodes.append(node)
# Creëer randen op basis van correlaties
event_map = {e.event_id: e for e in correlated_events}
for corr in correlations:
if corr["correlation_type"] == "temporal_cluster":
anchor = corr["anchor_event"]
for related in corr.get("correlated_events", []):
if anchor in event_map and related in event_map:
time_delta = abs(
event_map[related].timestamp
- event_map[anchor].timestamp
)
edges.append(AttackGraphEdge(
from_node=anchor,
to_node=related,
relationship="temporal_proximity",
confidence=0.7,
time_delta_seconds=time_delta,
))
return {
"nodes": len(nodes),
"edges": len(edges),
"stages": {
stage: sum(1 for n in nodes if n.stage == stage)
for stage in ["reconnaissance", "weaponization", "delivery", "exploitation", "action"]
},
"atlas_techniques": list(set(
n.atlas_technique for n in nodes if n.atlas_technique
)),
"systems_in_graph": list(set(
n.event.source_system for n in nodes
)),
}
def _classify_attack_stage(event: NormalizedEvent) -> str:
"""Map een gebeurtenis naar een fase van de aanvals-kill-chain."""
if event.severity < 0.3:
return "reconnaissance"
if event.event_type == NormalizedEventType.GUARDRAIL_TRIGGER:
return "exploitation"
if event.severity > 0.7:
return "action"
return "delivery"
def _map_to_atlas(event: NormalizedEvent) -> str | None:
"""Map een gebeurtenis naar een MITRE ATLAS-techniek-ID."""
type_map = {
NormalizedEventType.GUARDRAIL_TRIGGER: "AML.T0051", # LLM Prompt Injection
NormalizedEventType.ANOMALY: "AML.T0015", # Evade ML Model
NormalizedEventType.MODEL_ACCESS: "AML.T0012", # ML Model Inference API Access
}
return type_map.get(event.event_type)Productie van threat intelligence
Indicators of Compromise van campagnes
Uit gecorreleerde multi-modelaanvalsdata produceer je uitvoerbare threat intelligence:
def produce_campaign_iocs(
campaign: dict,
events: list[NormalizedEvent],
) -> dict:
"""
Extraheer indicators of compromise uit een gecorreleerde aanvalscampagne.
Produceert IoC's die kunnen worden gedeeld met peer-organisaties
en geïntegreerd in detectiesystemen.
"""
actor_iocs = list(set(
e.actor_id for e in events if e.actor_id
))
ip_iocs = list(set(
e.actor_ip for e in events if e.actor_ip
))
content_iocs = list(set(
e.content_hash for e in events if e.content_hash
))
# Extraheer timingpatronen
timestamps = sorted(e.timestamp for e in events)
if len(timestamps) >= 2:
intervals = [
timestamps[i+1] - timestamps[i]
for i in range(len(timestamps) - 1)
]
timing_pattern = {
"mean_interval_seconds": round(float(np.mean(intervals)), 2),
"std_interval_seconds": round(float(np.std(intervals)), 2),
"regularity_score": round(
1.0 - min(float(np.std(intervals)) / max(float(np.mean(intervals)), 1), 1.0),
4,
),
}
else:
timing_pattern = {}
return {
"campaign_id": campaign.get("actor", "unknown"),
"actor_identifiers": actor_iocs,
"ip_addresses": ip_iocs,
"content_hashes": content_iocs[:50],
"systems_targeted": campaign.get("systems", []),
"timing_pattern": timing_pattern,
"atlas_techniques": campaign.get("atlas_techniques", []),
"first_seen": min(timestamps) if timestamps else None,
"last_seen": max(timestamps) if timestamps else None,
"event_count": len(events),
"sharing_tlp": "TLP:AMBER", # Standaard naar beperkt delen
}Operationele deployment
Logaggregatie tussen systemen
Om multi-modelcorrelatie te laten werken, moeten gebeurtenissen van alle AI-systemen worden geaggregeerd op een centraal analyseplatform. Gangbare benaderingen:
- SIEM-integratie: Stuur gebeurtenissen van AI-systemen door naar een bestaand SIEM (Splunk, Elastic, Sentinel) met AI-specifieke parseregels
- Speciaal AI-beveiligingsplatform: Doelgebouwd platform voor correlatie van AI-telemetrie
- Data lake-aanpak: Aggregeer ruwe gebeurtenissen in een data lake en voer correlatiequery's uit met SQL of PySpark
# Voorbeeld: AI-gebeurtenissen doorsturen naar een Elasticsearch-cluster
# met Filebeat met AI-specifieke processors
# filebeat.yml-fragment voor het verzamelen van logs van AI-systemen:
# filebeat.inputs:
# - type: log
# paths: ["/var/log/ai-systems/chatbot/*.jsonl"]
# json.keys_under_root: true
# fields:
# source_system: "chatbot-prod"
#
# - type: log
# paths: ["/var/log/ai-systems/code-assistant/*.jsonl"]
# json.keys_under_root: true
# fields:
# source_system: "code-assistant"Alertdrempels
| Correlatiesignaal | Drempel | Prioriteit | Actie |
|---|---|---|---|
| Eén actor, 3+ systemen, < 1 uur | Elke ernst | KRITIEK | Onmiddellijk onderzoek |
| Dezelfde content-hash over 2+ systemen | Ernst > 0,5 | HOOG | Onderzoek binnen 1 uur |
| Temporele cluster over 3+ systemen | Ernst > 0,3 | HOOG | Onderzoek binnen 4 uur |
| Eén actor, 2 systemen, aanhoudend | Elke ernst | GEMIDDELD | Beoordeel binnen 8 uur |
| Hergebruik van content, lage ernst | Ernst < 0,3 | LAAG | Wekelijkse beoordeling |
Referenties
- MITRE ATLAS. (2024). Adversarial Threat Landscape for Artificial Intelligence Systems. https://atlas.mitre.org/
- MITRE. (2024). ATT&CK Framework v14. https://attack.mitre.org/
- NIST. (2023). Artificial Intelligence Risk Management Framework (AI RMF 1.0). NIST AI 100-1. https://doi.org/10.6028/NIST.AI.100-1