Multi-Model Attack Correlation
Techniques for correlating and analyzing coordinated attacks that target multiple AI models or systems within an organization.
Overview
Modern organizations deploy dozens of AI models across different applications: customer-facing chatbots, internal code assistants, document processing systems, recommendation engines, fraud detection models, and content moderation classifiers. A sophisticated adversary targeting such an organization may conduct a coordinated campaign that probes, tests, and attacks multiple models simultaneously or sequentially. Each individual interaction may appear benign or inconclusive, but correlated together, they reveal a coherent attack campaign.
Multi-model attack correlation is the forensic discipline of connecting adversarial activity across these disparate AI systems to identify coordinated campaigns, attribute them to specific threat actors, and understand the attacker's objectives. This is analogous to how traditional SOC analysts correlate alerts across firewalls, endpoints, and authentication systems -- but adapted for the unique telemetry generated by AI systems.
The challenge is significant: AI systems generate heterogeneous telemetry (text logs, numerical metrics, embedding vectors, confidence scores), operate on different infrastructure, and may not share a common logging format. Correlation requires normalizing this diverse data into a common framework and applying both rule-based and statistical techniques to identify patterns that span system boundaries.
Multi-Model Attack Patterns
Reconnaissance Campaigns
An attacker may probe multiple models to map an organization's AI deployment:
- Test each model's capabilities and limitations
- Identify which models share components (same embeddings, same RAG corpus, same guardrails)
- Determine trust relationships between models (one model's output feeds another's input)
- Map the attack surface for a more targeted follow-up attack
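The defining signal of this pattern is fan-out: one actor touching many systems in a short window. A minimal detector is sketched below; the `(actor, system, timestamp)` tuple shape and the `detect_recon_fanout` name are illustrative assumptions, not part of any production schema.

```python
from collections import defaultdict


def detect_recon_fanout(probes, window_seconds=3600.0, system_threshold=3):
    """Flag actors that touch `system_threshold`+ distinct systems
    within any sliding time window of `window_seconds`.

    `probes` is a list of (actor_id, system_name, unix_timestamp) tuples.
    """
    by_actor = defaultdict(list)
    for actor, system, ts in probes:
        by_actor[actor].append((ts, system))

    flagged = {}
    for actor, touches in by_actor.items():
        touches.sort()
        # Two-pointer sliding window over the actor's probe timeline.
        left = 0
        for right in range(len(touches)):
            while touches[right][0] - touches[left][0] > window_seconds:
                left += 1
            systems = {s for _, s in touches[left:right + 1]}
            if len(systems) >= system_threshold:
                flagged[actor] = sorted(systems)
                break
    return flagged
```

The sliding window matters: an actor who touches three systems over three months is likely a normal power user, while the same fan-out inside an hour is a recon signature.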
Pivot Attacks
The attacker compromises one AI system and uses it as a stepping stone to attack others:
- Extract information from Model A that helps craft attacks against Model B
- Use a jailbroken chatbot to probe internal APIs or discover system architecture
- Leverage a compromised code assistant to inject malicious code that affects other systems
Simultaneous Multi-Vector Attacks
The attacker targets multiple models at the same time with different attack types:
- Prompt injection against the chatbot while simultaneously testing the content moderation system
- Data exfiltration attempt against the RAG system while distracting the security team with a noisy jailbreak attempt
- Supply chain attack affecting a shared dependency used by multiple models
Coordinated Evasion
The attacker uses multiple interactions across different models to achieve an objective that would be blocked by any single model's defenses:
- Ask Model A for one piece of restricted information, Model B for another, and combine the pieces externally
- Use Model A's output as context for an attack on Model B
- Exploit inconsistencies between models' safety policies
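Exact content-hash matching misses this pattern, because each piece of the split request is worded differently. A rough lexical-overlap check across a single actor's queries to different systems is one cheap approximation; the tuple shape, the Jaccard threshold, and the `evasion_topic_overlap` name are all assumptions for illustration (a production system would more likely compare embeddings).

```python
def evasion_topic_overlap(queries, min_jaccard=0.4):
    """Flag pairs of queries from the same actor, sent to *different*
    systems, whose token sets overlap strongly -- a rough proxy for
    "the same request, asked in pieces".

    `queries` is a list of (actor_id, system_name, text) tuples.
    """
    def tokens(text):
        # Crude tokenization; drop very short words to reduce noise.
        return {t for t in text.lower().split() if len(t) > 3}

    hits = []
    for i in range(len(queries)):
        for j in range(i + 1, len(queries)):
            a_actor, a_sys, a_text = queries[i]
            b_actor, b_sys, b_text = queries[j]
            if a_actor != b_actor or a_sys == b_sys:
                continue
            ta, tb = tokens(a_text), tokens(b_text)
            if not ta or not tb:
                continue
            jaccard = len(ta & tb) / len(ta | tb)
            if jaccard >= min_jaccard:
                hits.append((a_actor, a_sys, b_sys, round(jaccard, 3)))
    return hits
```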
Correlation Architecture
Event Normalization
The first step is normalizing events from diverse AI systems into a common schema.
"""
Multi-model attack correlation framework.
Normalizes, correlates, and analyzes adversarial activity
across multiple AI systems.
"""
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any
from collections import defaultdict
import numpy as np
class NormalizedEventType(Enum):
"""Normalized event types across all AI systems."""
QUERY = "query"
RESPONSE = "response"
GUARDRAIL_TRIGGER = "guardrail_trigger"
ANOMALY = "anomaly"
ERROR = "error"
AUTH_EVENT = "auth_event"
CONFIG_CHANGE = "config_change"
MODEL_ACCESS = "model_access"
@dataclass
class NormalizedEvent:
"""
Normalized event from any AI system in the organization.
All system-specific events are mapped to this common schema
to enable cross-system correlation.
"""
event_id: str
timestamp: float
source_system: str # e.g., "chatbot-prod", "code-assistant", "fraud-detector"
model_id: str
event_type: NormalizedEventType
actor_id: str | None # User ID, API key, or session identifier
actor_ip: str | None
content_hash: str | None # Hash of input/output content
severity: float # 0.0 to 1.0 normalized severity
metadata: dict[str, Any] = field(default_factory=dict)
tags: list[str] = field(default_factory=list)
class EventNormalizer:
"""
Normalize events from different AI systems into a common schema.
Each source system has a registered adapter that maps its
native event format to NormalizedEvent.
"""
def __init__(self):
self.adapters: dict[str, Any] = {}
def register_adapter(self, system_name: str, adapter_fn) -> None:
self.adapters[system_name] = adapter_fn
def normalize(self, system_name: str, raw_event: dict) -> NormalizedEvent:
if system_name not in self.adapters:
raise ValueError(f"No adapter registered for system: {system_name}")
return self.adapters[system_name](raw_event)
# Example adapters
def chatbot_adapter(raw: dict) -> NormalizedEvent:
"""Normalize events from the customer-facing chatbot."""
event_type_map = {
"user_message": NormalizedEventType.QUERY,
"bot_response": NormalizedEventType.RESPONSE,
"safety_block": NormalizedEventType.GUARDRAIL_TRIGGER,
}
return NormalizedEvent(
event_id=raw["id"],
timestamp=raw["ts"],
source_system="chatbot-prod",
model_id=raw.get("model", "unknown"),
event_type=event_type_map.get(raw["type"], NormalizedEventType.QUERY),
actor_id=raw.get("user_id"),
actor_ip=raw.get("ip"),
content_hash=raw.get("content_hash"),
severity=raw.get("risk_score", 0.0),
metadata=raw.get("metadata", {}),
)
def code_assistant_adapter(raw: dict) -> NormalizedEvent:
"""Normalize events from the internal code assistant."""
return NormalizedEvent(
event_id=raw["request_id"],
timestamp=raw["timestamp"],
source_system="code-assistant",
model_id=raw.get("model_version", "unknown"),
event_type=NormalizedEventType.QUERY,
actor_id=raw.get("employee_id"),
actor_ip=raw.get("source_ip"),
content_hash=raw.get("prompt_hash"),
severity=raw.get("anomaly_score", 0.0),
metadata={"language": raw.get("language"), "repo": raw.get("repo")},
    )
Correlation Engine
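The correlator below consumes events produced by the normalizer above. The adapter-registry dispatch pattern can be seen in isolation with this self-contained sketch, where plain dicts stand in for `NormalizedEvent` and the adapter is a trimmed-down analogue of `chatbot_adapter`:

```python
def make_normalizer():
    """Closure-based stand-in for EventNormalizer: a registry of
    per-system adapter functions plus a dispatching normalize()."""
    adapters = {}

    def register(system_name, adapter_fn):
        adapters[system_name] = adapter_fn

    def normalize(system_name, raw):
        if system_name not in adapters:
            raise ValueError(f"No adapter registered for system: {system_name}")
        return adapters[system_name](raw)

    return register, normalize


register, normalize = make_normalizer()

# Adapter mirrors chatbot_adapter above, emitting a plain dict.
register("chatbot-prod", lambda raw: {
    "event_id": raw["id"],
    "timestamp": raw["ts"],
    "source_system": "chatbot-prod",
    "actor_id": raw.get("user_id"),
    "severity": raw.get("risk_score", 0.0),
})

event = normalize("chatbot-prod", {
    "id": "evt-001", "ts": 1700000000.0,
    "user_id": "u-42", "risk_score": 0.8,
})
```

The design point is that correlation logic never sees a system-specific log shape; only adapters know about raw formats, so onboarding a new AI system means writing one function, not touching the correlator.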
class MultiModelCorrelator:
"""
Correlate adversarial activity across multiple AI systems.
Uses multiple correlation strategies:
1. Actor-based: Same user/IP targeting multiple systems
2. Temporal: Coordinated timing across systems
3. Content-based: Similar attack payloads across systems
4. Behavioral: Attack pattern signatures that span systems
"""
def __init__(self, time_window_seconds: float = 3600.0):
self.time_window = time_window_seconds
self.events: list[NormalizedEvent] = []
def ingest(self, event: NormalizedEvent) -> None:
self.events.append(event)
def correlate_by_actor(self) -> list[dict]:
"""
Find actors who interact with multiple AI systems.
An actor targeting 3+ different systems in a time window
is a strong indicator of coordinated reconnaissance.
"""
actor_systems: dict[str, dict] = defaultdict(lambda: {
"systems": set(),
"events": [],
"timestamps": [],
})
for event in self.events:
actor_key = event.actor_id or event.actor_ip
if not actor_key:
continue
actor_systems[actor_key]["systems"].add(event.source_system)
actor_systems[actor_key]["events"].append(event)
actor_systems[actor_key]["timestamps"].append(event.timestamp)
correlations = []
for actor, data in actor_systems.items():
if len(data["systems"]) >= 2:
timestamps = sorted(data["timestamps"])
time_span = timestamps[-1] - timestamps[0] if len(timestamps) > 1 else 0
# Calculate severity across systems
system_severities = defaultdict(list)
for evt in data["events"]:
system_severities[evt.source_system].append(evt.severity)
avg_severity = float(np.mean([
np.max(sevs) for sevs in system_severities.values()
]))
correlations.append({
"correlation_type": "actor_multi_system",
"actor": actor,
"systems_targeted": list(data["systems"]),
"system_count": len(data["systems"]),
"event_count": len(data["events"]),
"time_span_seconds": time_span,
"avg_severity": round(avg_severity, 4),
"risk_assessment": (
"HIGH" if len(data["systems"]) >= 3 and avg_severity > 0.5
else "MEDIUM" if len(data["systems"]) >= 2 and avg_severity > 0.3
else "LOW"
),
})
return sorted(correlations, key=lambda c: c["event_count"], reverse=True)
def correlate_by_temporal_pattern(
self,
window_seconds: float = 60.0,
) -> list[dict]:
"""
Find temporally correlated events across systems.
Events from different systems occurring within a tight
time window may indicate coordinated activity.
"""
# Sort events by timestamp
sorted_events = sorted(self.events, key=lambda e: e.timestamp)
correlations = []
for i, event in enumerate(sorted_events):
if event.severity < 0.3:
continue # Skip low-severity events for efficiency
window_events = []
for j in range(i + 1, len(sorted_events)):
if sorted_events[j].timestamp - event.timestamp > window_seconds:
break
if sorted_events[j].source_system != event.source_system:
window_events.append(sorted_events[j])
if len(window_events) >= 2:
systems = {event.source_system} | {
e.source_system for e in window_events
}
if len(systems) >= 2:
correlations.append({
"correlation_type": "temporal_cluster",
"anchor_event": event.event_id,
"anchor_system": event.source_system,
"correlated_events": [e.event_id for e in window_events],
"systems_involved": list(systems),
"time_window_seconds": window_seconds,
"max_severity": max(
event.severity,
max(e.severity for e in window_events),
),
})
return correlations
def correlate_by_content(self) -> list[dict]:
"""
Find events with similar content across systems.
Same or similar attack payloads used against different
systems suggest a coordinated campaign.
"""
# Group events by content hash
content_groups: dict[str, list[NormalizedEvent]] = defaultdict(list)
for event in self.events:
if event.content_hash:
content_groups[event.content_hash].append(event)
correlations = []
for content_hash, events in content_groups.items():
systems = set(e.source_system for e in events)
if len(systems) >= 2:
correlations.append({
"correlation_type": "content_reuse",
"content_hash": content_hash,
"systems": list(systems),
"event_count": len(events),
"actors": list(set(
e.actor_id for e in events if e.actor_id
)),
"time_span": (
max(e.timestamp for e in events)
- min(e.timestamp for e in events)
),
"description": (
f"Same content hash seen across {len(systems)} systems "
f"in {len(events)} events"
),
})
return correlations
def generate_campaign_report(self) -> dict:
"""
Generate a comprehensive correlation report combining
all correlation strategies.
"""
actor_corr = self.correlate_by_actor()
temporal_corr = self.correlate_by_temporal_pattern()
content_corr = self.correlate_by_content()
# Identify likely campaigns (high-confidence coordinated activity)
campaigns = []
for ac in actor_corr:
if ac["risk_assessment"] in ("HIGH", "MEDIUM"):
campaigns.append({
"type": "actor_campaign",
"confidence": "HIGH" if ac["system_count"] >= 3 else "MEDIUM",
"actor": ac["actor"],
"systems": ac["systems_targeted"],
"events": ac["event_count"],
"evidence": ac,
})
return {
"analysis_period": {
"start": min(e.timestamp for e in self.events) if self.events else None,
"end": max(e.timestamp for e in self.events) if self.events else None,
},
"total_events_analyzed": len(self.events),
"unique_systems": len(set(e.source_system for e in self.events)),
"unique_actors": len(set(
e.actor_id for e in self.events if e.actor_id
)),
"actor_correlations": actor_corr,
"temporal_correlations": temporal_corr[:20],
"content_correlations": content_corr,
"identified_campaigns": campaigns,
"campaign_count": len(campaigns),
        }
Attack Graph Construction
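Before the full graph builder below, the core edge-building idea can be shown standalone: chain each actor's events in time order. This mirrors the "same_actor" relationship declared on `AttackGraphEdge`; the `(event_id, actor_id, timestamp)` tuple shape is a simplification assumed for illustration.

```python
from collections import defaultdict


def same_actor_edges(events):
    """Link consecutive events by the same actor into a time-ordered
    chain. Returns (from_id, to_id, time_delta_seconds) edges.

    `events` is a list of (event_id, actor_id, timestamp) tuples.
    """
    by_actor = defaultdict(list)
    for event_id, actor, ts in events:
        by_actor[actor].append((ts, event_id))

    edges = []
    for actor, items in by_actor.items():
        items.sort()  # chronological order per actor
        for (t0, e0), (t1, e1) in zip(items, items[1:]):
            edges.append((e0, e1, t1 - t0))
    return edges
```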
Building Attack Graphs from Correlated Events
@dataclass
class AttackGraphNode:
"""A node in the attack graph representing a significant event."""
node_id: str
event: NormalizedEvent
stage: str # "reconnaissance", "weaponization", "delivery", "exploitation", "action"
atlas_technique: str | None = None
@dataclass
class AttackGraphEdge:
"""An edge connecting two events in an attack sequence."""
from_node: str
to_node: str
relationship: str # "preceded_by", "enabled", "same_actor", "same_content"
confidence: float
time_delta_seconds: float
def build_attack_graph(
correlated_events: list[NormalizedEvent],
correlations: list[dict],
) -> dict:
"""
Construct an attack graph from correlated events.
The attack graph maps the progression of a multi-model
attack campaign through reconnaissance, delivery,
exploitation, and action phases.
"""
nodes = []
edges = []
# Classify events into attack stages
for event in correlated_events:
stage = _classify_attack_stage(event)
node = AttackGraphNode(
node_id=event.event_id,
event=event,
stage=stage,
atlas_technique=_map_to_atlas(event),
)
nodes.append(node)
# Create edges based on correlations
event_map = {e.event_id: e for e in correlated_events}
for corr in correlations:
if corr["correlation_type"] == "temporal_cluster":
anchor = corr["anchor_event"]
for related in corr.get("correlated_events", []):
if anchor in event_map and related in event_map:
time_delta = abs(
event_map[related].timestamp
- event_map[anchor].timestamp
)
edges.append(AttackGraphEdge(
from_node=anchor,
to_node=related,
relationship="temporal_proximity",
confidence=0.7,
time_delta_seconds=time_delta,
))
return {
"nodes": len(nodes),
"edges": len(edges),
"stages": {
stage: sum(1 for n in nodes if n.stage == stage)
for stage in ["reconnaissance", "weaponization", "delivery", "exploitation", "action"]
},
"atlas_techniques": list(set(
n.atlas_technique for n in nodes if n.atlas_technique
)),
"systems_in_graph": list(set(
n.event.source_system for n in nodes
)),
}
def _classify_attack_stage(event: NormalizedEvent) -> str:
"""Map an event to an attack kill chain stage."""
if event.severity < 0.3:
return "reconnaissance"
if event.event_type == NormalizedEventType.GUARDRAIL_TRIGGER:
return "exploitation"
if event.severity > 0.7:
return "action"
return "delivery"
def _map_to_atlas(event: NormalizedEvent) -> str | None:
"""Map an event to a MITRE ATLAS technique ID."""
type_map = {
NormalizedEventType.GUARDRAIL_TRIGGER: "AML.T0051", # LLM Prompt Injection
NormalizedEventType.ANOMALY: "AML.T0015", # Evade ML Model
NormalizedEventType.MODEL_ACCESS: "AML.T0012", # ML Model Inference API Access
}
    return type_map.get(event.event_type)
Threat Intelligence Production
Campaign Indicators of Compromise
From correlated multi-model attack data, produce actionable threat intelligence:
def produce_campaign_iocs(
campaign: dict,
events: list[NormalizedEvent],
) -> dict:
"""
Extract indicators of compromise from a correlated attack campaign.
Produces IoCs that can be shared with peer organizations
and integrated into detection systems.
"""
actor_iocs = list(set(
e.actor_id for e in events if e.actor_id
))
ip_iocs = list(set(
e.actor_ip for e in events if e.actor_ip
))
content_iocs = list(set(
e.content_hash for e in events if e.content_hash
))
# Extract timing patterns
timestamps = sorted(e.timestamp for e in events)
if len(timestamps) >= 2:
intervals = [
timestamps[i+1] - timestamps[i]
for i in range(len(timestamps) - 1)
]
timing_pattern = {
"mean_interval_seconds": round(float(np.mean(intervals)), 2),
"std_interval_seconds": round(float(np.std(intervals)), 2),
"regularity_score": round(
1.0 - min(float(np.std(intervals)) / max(float(np.mean(intervals)), 1), 1.0),
4,
),
}
else:
timing_pattern = {}
return {
"campaign_id": campaign.get("actor", "unknown"),
"actor_identifiers": actor_iocs,
"ip_addresses": ip_iocs,
"content_hashes": content_iocs[:50],
"systems_targeted": campaign.get("systems", []),
"timing_pattern": timing_pattern,
"atlas_techniques": campaign.get("atlas_techniques", []),
"first_seen": min(timestamps) if timestamps else None,
"last_seen": max(timestamps) if timestamps else None,
"event_count": len(events),
"sharing_tlp": "TLP:AMBER", # Default to restricted sharing
    }
Operational Deployment
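One IoC worth smoke-testing before deployment is the `regularity_score` computed in `produce_campaign_iocs` above: evenly spaced probes suggest automation, while bursty gaps look human-paced. The sketch below reproduces the same formula with the stdlib `statistics` module in place of NumPy:

```python
from statistics import mean, pstdev


def regularity_score(timestamps):
    """Score in [0, 1]: near 1.0 for evenly spaced (scripted) events,
    near 0.0 for irregular, human-paced activity. Mirrors the
    timing-pattern computation in produce_campaign_iocs."""
    ts = sorted(timestamps)
    if len(ts) < 2:
        return 0.0
    intervals = [b - a for a, b in zip(ts, ts[1:])]
    m = mean(intervals)
    s = pstdev(intervals)  # population std, matching np.std
    return round(1.0 - min(s / max(m, 1.0), 1.0), 4)


# A bot probing every 30 seconds vs. a human with bursty gaps:
scripted = regularity_score([0, 30, 60, 90, 120])
bursty = regularity_score([0, 5, 300, 310, 900])
```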
Cross-System Log Aggregation
For multi-model correlation to work, events from all AI systems must be aggregated into a central analysis platform. Common approaches:
- SIEM integration: Forward AI system events to existing SIEM (Splunk, Elastic, Sentinel) with AI-specific parsing rules
- Dedicated AI security platform: Purpose-built platform for AI telemetry correlation
- Data lake approach: Aggregate raw events into a data lake and run correlation queries using SQL or PySpark
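The data-lake approach reduces actor correlation to a single aggregate query. The sketch below uses an in-memory SQLite table as a stand-in for the lake; the `ai_events` table and its columns are illustrative, not a standard schema.

```python
import sqlite3

# Normalized events land in one table; a GROUP BY surfaces actors
# touching 3+ distinct systems within the analysis window.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE ai_events (
        actor_id TEXT, source_system TEXT, ts REAL, severity REAL
    )
""")
conn.executemany(
    "INSERT INTO ai_events VALUES (?, ?, ?, ?)",
    [
        ("mallory", "chatbot-prod", 100.0, 0.6),
        ("mallory", "code-assistant", 500.0, 0.4),
        ("mallory", "fraud-detector", 900.0, 0.7),
        ("alice", "chatbot-prod", 200.0, 0.1),
    ],
)

rows = conn.execute("""
    SELECT actor_id,
           COUNT(DISTINCT source_system) AS systems,
           MAX(ts) - MIN(ts)             AS span_seconds,
           MAX(severity)                 AS max_severity
    FROM ai_events
    GROUP BY actor_id
    HAVING systems >= 3 AND span_seconds <= 3600
""").fetchall()
```

The same query translates almost verbatim to Spark SQL or a warehouse dialect, which is the main appeal of this approach over bespoke correlation code.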
# Example: Forwarding AI events to an Elasticsearch cluster
# using Filebeat with AI-specific processors
# filebeat.yml excerpt for AI system log collection:
# filebeat.inputs:
# - type: log
# paths: ["/var/log/ai-systems/chatbot/*.jsonl"]
# json.keys_under_root: true
# fields:
# source_system: "chatbot-prod"
#
# - type: log
# paths: ["/var/log/ai-systems/code-assistant/*.jsonl"]
# json.keys_under_root: true
# fields:
# source_system: "code-assistant"
Alert Thresholds
| Correlation Signal | Threshold | Priority | Action |
|---|---|---|---|
| Single actor, 3+ systems, < 1 hour | Any severity | CRITICAL | Immediate investigation |
| Same content hash across 2+ systems | Severity > 0.5 | HIGH | Investigate within 1 hour |
| Temporal cluster across 3+ systems | Severity > 0.3 | HIGH | Investigate within 4 hours |
| Single actor, 2 systems, sustained | Any severity | MEDIUM | Review within 8 hours |
| Content reuse, low severity | Severity < 0.3 | LOW | Weekly review |
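The table above can be encoded as a small triage function so alerts are prioritized consistently. The boundaries below are the table's suggested starting points, not calibrated values, and the function name and signature are illustrative:

```python
def triage_priority(systems_hit, max_severity, span_seconds, content_reuse=False):
    """Map a correlation signal to a triage priority per the
    alert-threshold table. Rules are checked most-severe first."""
    if systems_hit >= 3 and span_seconds < 3600:
        return "CRITICAL"  # fast fan-out across 3+ systems, any severity
    if content_reuse and systems_hit >= 2 and max_severity > 0.5:
        return "HIGH"      # shared payload across 2+ systems
    if systems_hit >= 3 and max_severity > 0.3:
        return "HIGH"      # slower temporal cluster across 3+ systems
    if systems_hit >= 2:
        return "MEDIUM"    # sustained two-system activity
    return "LOW"           # everything else, including low-severity reuse
```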
References
- MITRE ATLAS. (2024). Adversarial Threat Landscape for Artificial Intelligence Systems. https://atlas.mitre.org/
- MITRE. (2024). ATT&CK Framework v14. https://attack.mitre.org/
- NIST. (2023). Artificial Intelligence Risk Management Framework (AI RMF 1.0). NIST AI 100-1. https://doi.org/10.6028/NIST.AI.100-1