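AI Honeypot Forensics
Intermediate · 8 min read · Updated 2026-03-20
Design and operate AI honeypots to collect threat intelligence, and perform forensic analysis on the adversarial interactions they capture.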
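Overview
An AI honeypot is an AI system deliberately designed to be attackable so that it can attract, detect, and help study adversarial behavior. Like honeypots in traditional security, an AI honeypot acts as a sensor: it gives early warning of threats and records detailed intelligence on attackers' tactics, techniques, and procedures (TTPs). Unlike a production system, whose goal is defense, a honeypot exists to observe attacks in a controlled environment.
Several properties of AI systems make them well suited to honeypot use. They naturally produce rich interaction data (full conversation logs, input/output pairs, behavioral metrics), with forensic detail well beyond what a typical network honeypot captures. An attacker who interacts with an AI honeypot reveals not only technical capability but also objectives, understanding of AI systems, and operational security (OPSEC) practices.
This article covers the design, deployment, and forensic analysis of AI honeypots, from architecture decisions through data collection, attack classification, and threat intelligence production. It uses MITRE ATLAS for attack classification and adapts deception-defense principles from traditional security to AI-specific threats.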
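AI Honeypot Design
Interaction Fidelity Levels
AI honeypots can be built at different fidelity levels, trading operating cost and risk against intelligence value:
| Fidelity level | Description | Intelligence value | Cost / risk |
|---|---|---|---|
| Low | Static responses or a simple rule-based bot | Captures initial probing patterns | Low cost, low risk |
| Medium | Small language model with limited capability | Captures attack payloads and multi-turn strategies | Moderate cost |
| High | Fully capable LLM with a realistic system prompt | Captures complex multi-step attacks | High cost; requires safety controls |
| Production mirror | Copy of the production system with adjusted guardrails | Captures attacks on application-specific logic | Highest cost and risk |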
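To make the lowest tier concrete, the following is a minimal sketch of a rule-based responder, assuming canned replies keyed on regular expressions. The class name `LowFidelityHoneypot`, the patterns, and the reply strings are all hypothetical and chosen only for illustration.

```python
import re
from dataclasses import dataclass, field


@dataclass
class LowFidelityHoneypot:
    """Rule-based responder: the first matching pattern selects a canned reply."""
    rules: list[tuple[re.Pattern, str]]
    default_reply: str = "I can help with account and billing questions."
    log: list[dict] = field(default_factory=list)

    def respond(self, user_input: str) -> str:
        reply, matched = self.default_reply, None
        for pattern, canned in self.rules:
            if pattern.search(user_input):
                reply, matched = canned, pattern.pattern
                break
        # Every input is logged, whether or not a rule fired
        self.log.append(
            {"input": user_input, "matched_rule": matched, "reply": reply}
        )
        return reply


def build_demo_honeypot() -> LowFidelityHoneypot:
    """Build a responder with two illustrative (hypothetical) rules."""
    rules = [
        (re.compile(r"\bignore (all |the )?(previous|above)\b", re.I),
         "Okay. What would you like me to do instead?"),
        (re.compile(r"\bsystem prompt\b", re.I),
         "I'm not sure I should share that. Why do you ask?"),
    ]
    return LowFidelityHoneypot(rules=rules)
```

A responder like this never calls a model, which is why it sits at the low-cost, low-risk end of the table; the trade-off is that it is unlikely to hold an attacker's attention past the first few probes.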
Architecture
```python
"""
AI honeypot system for threat intelligence collection.

Deploys a monitored AI system designed to attract and record
adversarial interactions while maintaining safety controls.
"""
import hashlib
import json
import time
import uuid
from dataclasses import dataclass, field, asdict
from pathlib import Path
from typing import Any


@dataclass
class HoneypotConfig:
    """Configuration for an AI honeypot deployment."""
    honeypot_id: str
    fidelity_level: str  # "low", "medium", "high"
    persona: str  # What the honeypot pretends to be
    model_id: str
    system_prompt: str
    deliberate_vulnerabilities: list[str]  # What weaknesses are simulated
    safety_guardrails: list[str]  # Hard safety limits that cannot be bypassed
    max_conversation_turns: int = 50
    max_token_output: int = 4096
    enabled: bool = True


@dataclass
class HoneypotInteraction:
    """A complete interaction captured by the honeypot."""
    interaction_id: str
    honeypot_id: str
    start_timestamp: float
    end_timestamp: float | None = None
    actor_ip: str | None = None
    actor_user_agent: str | None = None
    actor_api_key_hash: str | None = None
    turns: list[dict] = field(default_factory=list)
    attack_classification: dict | None = None
    metadata: dict[str, Any] = field(default_factory=dict)


class AIHoneypot:
    """
    AI honeypot that records adversarial interactions while
    maintaining safety constraints.
    """

    def __init__(
        self,
        config: HoneypotConfig,
        model_fn,  # Callable that maps a message list to a response string
        log_dir: str,
    ):
        self.config = config
        self.model_fn = model_fn
        self.log_dir = Path(log_dir)
        self.log_dir.mkdir(parents=True, exist_ok=True)
        self.active_interactions: dict[str, HoneypotInteraction] = {}

    def start_interaction(
        self,
        actor_ip: str | None = None,
        actor_user_agent: str | None = None,
        actor_api_key: str | None = None,
    ) -> str:
        """Start a new honeypot interaction and return the interaction ID."""
        interaction_id = str(uuid.uuid4())
        interaction = HoneypotInteraction(
            interaction_id=interaction_id,
            honeypot_id=self.config.honeypot_id,
            start_timestamp=time.time(),
            actor_ip=actor_ip,
            actor_user_agent=actor_user_agent,
            # Store only a truncated hash so the raw key never reaches the logs
            actor_api_key_hash=(
                hashlib.sha256(actor_api_key.encode()).hexdigest()[:16]
                if actor_api_key else None
            ),
        )
        self.active_interactions[interaction_id] = interaction
        return interaction_id

    def process_turn(
        self,
        interaction_id: str,
        user_input: str,
    ) -> str:
        """
        Process a single conversation turn in the honeypot.

        Records the input, generates a response (potentially with
        simulated vulnerabilities), and records the output.
        """
        interaction = self.active_interactions.get(interaction_id)
        if not interaction:
            raise ValueError(f"Unknown interaction: {interaction_id}")

        # Each exchange appends two records (user + assistant), so halve the count
        completed_exchanges = len(interaction.turns) // 2
        if completed_exchanges >= self.config.max_conversation_turns:
            return "I'm sorry, I need to end this conversation. Please start a new session."
        turn_number = completed_exchanges + 1

        # Record the user input with full forensic detail
        turn_record = {
            "turn_number": turn_number,
            "timestamp": time.time(),
            "role": "user",
            "content": user_input,
            "content_hash": hashlib.sha256(user_input.encode()).hexdigest(),
            "content_length": len(user_input),
        }

        # Check hard safety guardrails (these never yield)
        safety_check = self._check_hard_safety(user_input)
        if safety_check["blocked"]:
            response = "I'm not able to help with that request."
            turn_record["safety_blocked"] = True
            turn_record["safety_reason"] = safety_check["reason"]
        else:
            # Generate response with simulated vulnerabilities
            response = self._generate_response(interaction, user_input)

        # Record the response under the same turn number as its prompt
        response_record = {
            "turn_number": turn_number,
            "timestamp": time.time(),
            "role": "assistant",
            "content": response,
            "content_hash": hashlib.sha256(response.encode()).hexdigest(),
            "content_length": len(response),
        }
        interaction.turns.append(turn_record)
        interaction.turns.append(response_record)

        # Persist interaction state after each turn
        self._save_interaction(interaction)
        return response

    def _generate_response(
        self,
        interaction: HoneypotInteraction,
        user_input: str,
    ) -> str:
        """
        Generate a model response, optionally simulating vulnerabilities.

        The honeypot may deliberately appear to "fall for" certain
        attack techniques to observe the attacker's follow-up behavior.
        """
        messages = [{"role": "system", "content": self.config.system_prompt}]
        for turn in interaction.turns:
            messages.append({
                "role": turn["role"],
                "content": turn["content"],
            })
        messages.append({"role": "user", "content": user_input})
        response = self.model_fn(messages)
        # Character-level cap, used here as a rough stand-in for a token limit
        return response[:self.config.max_token_output]

    def _check_hard_safety(self, user_input: str) -> dict:
        """Check hard safety guardrails that are never bypassed."""
        # Illustrative substring patterns only; plain substring matching is
        # broad (e.g. "child" also matches "children's books")
        hard_blocks = [
            "real person's",
            "actual address",
            "child",
        ]
        input_lower = user_input.lower()
        for pattern in hard_blocks:
            if pattern in input_lower:
                return {"blocked": True, "reason": f"Hard safety: {pattern}"}
        return {"blocked": False, "reason": None}

    def end_interaction(self, interaction_id: str) -> HoneypotInteraction:
        """End an interaction and finalize the record."""
        interaction = self.active_interactions.pop(interaction_id, None)
        if not interaction:
            raise ValueError(f"Unknown interaction: {interaction_id}")
        interaction.end_timestamp = time.time()
        self._save_interaction(interaction)
        return interaction

    def _save_interaction(self, interaction: HoneypotInteraction) -> None:
        """Write the full interaction record to one JSON file per interaction."""
        path = self.log_dir / f"{interaction.interaction_id}.json"
        path.write_text(json.dumps(asdict(interaction), default=str, indent=2))
```
Forensic Analysis of Captured Interactions
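Before a captured session is analyzed, it can be worth confirming that the stored record still matches what was logged. The sketch below assumes records shaped like the JSON written by the honeypot above, where each turn carries `content`, `content_hash`, and `content_length`; the function names `verify_interaction_record` and `make_turn` are hypothetical.

```python
import hashlib
import json


def make_turn(role: str, content: str) -> dict:
    """Build a turn record with the same hash fields the honeypot stores."""
    return {
        "role": role,
        "content": content,
        "content_hash": hashlib.sha256(content.encode()).hexdigest(),
        "content_length": len(content),
    }


def verify_interaction_record(record: dict) -> list[int]:
    """Return the indexes of turns whose stored hash or length is stale."""
    mismatched = []
    for index, turn in enumerate(record.get("turns", [])):
        expected_hash = hashlib.sha256(turn["content"].encode()).hexdigest()
        hash_ok = turn.get("content_hash") == expected_hash
        length_ok = turn.get("content_length") == len(turn["content"])
        if not (hash_ok and length_ok):
            mismatched.append(index)
    return mismatched


# Round-trip through JSON to mimic loading a saved interaction file
demo_record = json.loads(json.dumps(
    {"turns": [make_turn("user", "hi"), make_turn("assistant", "hello")]}
))
```

Because the hashes live in the same file as the content, this check catches accidental corruption or careless edits but not a deliberate rewrite of both fields; stronger tamper evidence would need the hashes stored or signed separately.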
Attack Classification
Captured honeypot interactions have to be classified before they yield useful threat intelligence.
```python
# Continues the honeypot module above, which defines HoneypotInteraction.
import re
import time


def _matched_indicators(indicators: list[str], text: str) -> list[str]:
    """
    Return the indicators that occur in text as whole words or phrases.

    Whole-word matching avoids false hits such as "dan" inside "dangerous",
    at the cost of missing inflected forms such as "passwords".
    """
    return [
        ind for ind in indicators
        if re.search(rf"\b{re.escape(ind)}\b", text)
    ]


class HoneypotForensicAnalyzer:
    """
    Analyze captured honeypot interactions to classify attacks
    and extract threat intelligence.
    """

    def classify_interaction(
        self,
        interaction: HoneypotInteraction,
    ) -> dict:
        """Classify the attack techniques observed in a honeypot interaction."""
        user_turns = [
            t for t in interaction.turns if t["role"] == "user"
        ]
        if not user_turns:
            return {"classification": "no_user_input"}

        techniques_observed = []
        all_content = " ".join(t["content"] for t in user_turns).lower()

        # Prompt injection detection
        injection_indicators = [
            "ignore previous", "ignore above", "disregard",
            "new instructions", "you are now", "pretend you are",
            "act as", "roleplay as", "jailbreak",
            "dan", "developer mode",
        ]
        injection_hits = _matched_indicators(injection_indicators, all_content)
        if injection_hits:
            techniques_observed.append({
                "technique": "Prompt Injection",
                "atlas_id": "AML.T0051",
                "confidence": min(0.5 + len(injection_hits) * 0.15, 1.0),
                "indicators": injection_hits,
            })

        # System prompt extraction
        extraction_indicators = [
            "system prompt", "initial instructions", "repeat your instructions",
            "what are your rules", "what were you told", "show me your prompt",
            "print your system", "reveal your instructions",
        ]
        extraction_hits = _matched_indicators(extraction_indicators, all_content)
        if extraction_hits:
            techniques_observed.append({
                "technique": "System Prompt Extraction",
                # ATLAS lists meta/system prompt extraction as AML.T0056;
                # AML.T0046 is a different technique (chaff-data spamming)
                "atlas_id": "AML.T0056",
                "confidence": min(0.5 + len(extraction_hits) * 0.2, 1.0),
                "indicators": extraction_hits,
            })

        # Data exfiltration probing
        exfil_indicators = [
            "training data", "what data were you trained on",
            "list users", "show me other conversations",
            "database", "api key", "secret", "password",
            "internal", "confidential",
        ]
        exfil_hits = _matched_indicators(exfil_indicators, all_content)
        if exfil_hits:
            techniques_observed.append({
                "technique": "Data Exfiltration Probing",
                "atlas_id": "AML.T0024",
                "confidence": min(0.4 + len(exfil_hits) * 0.15, 1.0),
                "indicators": exfil_hits,
            })

        escalation = self._analyze_escalation_pattern(user_turns)
        sophistication = self._assess_sophistication(user_turns, techniques_observed)

        return {
            "interaction_id": interaction.interaction_id,
            "techniques_observed": techniques_observed,
            "technique_count": len(techniques_observed),
            "escalation_pattern": escalation,
            "sophistication": sophistication,
            "turn_count": len(user_turns),
            "duration_seconds": (
                (interaction.end_timestamp or time.time())
                - interaction.start_timestamp
            ),
            "overall_threat_level": self._compute_threat_level(
                techniques_observed, escalation, sophistication
            ),
        }

    def _analyze_escalation_pattern(self, user_turns: list[dict]) -> dict:
        """Analyze how the attacker escalated their approach over time."""
        if len(user_turns) < 3:
            return {"pattern": "insufficient_data", "turns": len(user_turns)}

        # Split the session into thirds and compare average message length,
        # which serves as a rough proxy for effort
        third = len(user_turns) // 3
        early = user_turns[:third]
        middle = user_turns[third:2 * third]
        late = user_turns[2 * third:]

        early_avg_len = sum(t["content_length"] for t in early) / max(len(early), 1)
        middle_avg_len = sum(t["content_length"] for t in middle) / max(len(middle), 1)
        late_avg_len = sum(t["content_length"] for t in late) / max(len(late), 1)

        if early_avg_len < middle_avg_len < late_avg_len:
            pattern = "escalating"
        elif early_avg_len > late_avg_len:
            pattern = "front_loaded"
        else:
            pattern = "stable"

        return {
            "pattern": pattern,
            "early_avg_length": round(early_avg_len, 1),
            "late_avg_length": round(late_avg_len, 1),
        }

    def _assess_sophistication(
        self,
        user_turns: list[dict],
        techniques: list[dict],
    ) -> str:
        """Assess the attacker's sophistication level."""
        if not techniques:
            return "benign_or_unrecognized"

        score = 0
        score += min(len(techniques), 3)  # Technique diversity
        score += min(len(user_turns) // 5, 3)  # Persistence
        avg_confidence = sum(t["confidence"] for t in techniques) / len(techniques)
        score += 2 if avg_confidence > 0.7 else 1 if avg_confidence > 0.4 else 0

        if score >= 6:
            return "advanced"
        if score >= 3:
            return "intermediate"
        return "basic"

    def _compute_threat_level(
        self,
        techniques: list[dict],
        escalation: dict,
        sophistication: str,
    ) -> str:
        """Map techniques and sophistication to a coarse threat level."""
        if not techniques:
            return "NONE"
        if sophistication == "advanced" and len(techniques) >= 3:
            return "CRITICAL"
        if sophistication == "advanced" or len(techniques) >= 2:
            return "HIGH"
        # The original returned MEDIUM for any remaining case, which left LOW
        # unreachable; a single technique at "basic" sophistication is LOW here
        if sophistication == "intermediate":
            return "MEDIUM"
        return "LOW"
```
The classifier above concatenates the content of all user turns and matches it against key indicators for prompt injection, system prompt extraction, and data exfiltration probing. It also analyzes the escalation pattern (whether the attacker moved from benign probing to progressively more aggressive techniques, using message length as a proxy) and derives overall sophistication and threat level from technique diversity, persistence, and average confidence. Each detected technique is mapped to a MITRE ATLAS technique ID.
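To see how the numbers combine, here is a small standalone sketch that reproduces the confidence formula and the sophistication score from the classifier for a hypothetical session. The function names are illustrative; the constants are the ones used in the code above.

```python
def indicator_confidence(hits: int, base: float, step: float) -> float:
    """Confidence grows linearly with the number of matched indicators, capped at 1.0."""
    return min(base + hits * step, 1.0)


def sophistication(confidences: list[float], user_turn_count: int) -> tuple[int, str]:
    """Return (score, label) using the same thresholds as the classifier."""
    if not confidences:
        return 0, "benign_or_unrecognized"
    score = min(len(confidences), 3)          # technique diversity, max 3
    score += min(user_turn_count // 5, 3)     # persistence, max 3
    avg = sum(confidences) / len(confidences)
    score += 2 if avg > 0.7 else 1 if avg > 0.4 else 0
    label = "advanced" if score >= 6 else "intermediate" if score >= 3 else "basic"
    return score, label


# Hypothetical session: 2 injection indicators, 1 extraction indicator, 12 user turns
injection_conf = indicator_confidence(2, base=0.5, step=0.15)   # about 0.8
extraction_conf = indicator_confidence(1, base=0.5, step=0.2)   # about 0.7
session_score, session_label = sophistication(
    [injection_conf, extraction_conf], user_turn_count=12
)
```

In this example the score is 2 (techniques) + 2 (persistence) + 2 (average confidence of about 0.75), which reaches the threshold of 6 for "advanced". With two techniques and that label, the threat-level rules would report HIGH rather than CRITICAL, since CRITICAL requires three techniques.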
Threat Intelligence Production
```python
def produce_threat_intelligence(
    classified_interactions: list[dict],
    reporting_period_days: int = 7,
) -> dict:
    """Aggregate classified honeypot interactions into a report."""
    total = len(classified_interactions)
    if total == 0:
        return {"status": "no_data"}

    technique_counts: dict[str, int] = {}
    sophistication_counts: dict[str, int] = {}
    threat_level_counts: dict[str, int] = {}

    for interaction in classified_interactions:
        # Count each technique once per interaction in which it appears
        for tech in interaction.get("techniques_observed", []):
            name = tech["technique"]
            technique_counts[name] = technique_counts.get(name, 0) + 1
        soph = interaction.get("sophistication", "unknown")
        sophistication_counts[soph] = sophistication_counts.get(soph, 0) + 1
        level = interaction.get("overall_threat_level", "NONE")
        threat_level_counts[level] = threat_level_counts.get(level, 0) + 1

    sorted_techniques = sorted(
        technique_counts.items(), key=lambda x: x[1], reverse=True
    )

    return {
        # Echoed for the report header; callers filter by date beforehand
        "reporting_period_days": reporting_period_days,
        "total_interactions": total,
        "interactions_with_attacks": sum(
            1 for i in classified_interactions
            if i.get("technique_count", 0) > 0
        ),
        "top_techniques": [
            {"technique": name, "count": count}
            for name, count in sorted_techniques[:10]
        ],
        "sophistication_distribution": sophistication_counts,
        "threat_level_distribution": threat_level_counts,
    }
```
Operational Considerations
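Safety and Ethics
AI honeypots raise important safety considerations:
- Output safety: Even when "successfully attacked," a honeypot must not produce genuinely harmful content. Hard safety guardrails must remain in force at all times.
- Scope limits: A honeypot should have a clearly defined scope, so that it does not become a system that could be used to harm real individuals.
- Legal review: Consult legal counsel to confirm that deception-based defense is lawful in your jurisdiction.
- Data handling: Captured interactions may contain sensitive information about attackers or their targets, so apply appropriate access controls.
- Ethical boundaries: Do not use a honeypot to entrap users into providing self-incriminating information.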
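The hard guardrail in the architecture code inspects only the user's input, while the output-safety point above concerns what the honeypot emits. One way to cover that side is to wrap the model function and screen its output before it is returned. This is a minimal sketch under stated assumptions: `guard_output`, `REFUSAL`, and the deny patterns are hypothetical, and a real deployment would more likely rely on a dedicated moderation classifier than on two regular expressions.

```python
import re
from typing import Callable

REFUSAL = "I'm not able to help with that request."

# Hypothetical deny patterns, for illustration only
OUTPUT_DENY_PATTERNS = [
    re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),                  # SSN-like number
    re.compile(r"-----BEGIN [A-Z ]*PRIVATE KEY-----"),     # PEM private key header
]


def guard_output(
    model_fn: Callable[[list[dict]], str],
) -> Callable[[list[dict]], tuple[str, bool]]:
    """Wrap model_fn so every response is screened before release."""
    def guarded(messages: list[dict]) -> tuple[str, bool]:
        raw = model_fn(messages)
        for pattern in OUTPUT_DENY_PATTERNS:
            if pattern.search(raw):
                # Replace the response and report that it was blocked
                return REFUSAL, True
        return raw, False
    return guarded
```

Returning the blocked flag alongside the text lets the caller log the event in the turn record, so a blocked output is still visible to the forensic analysis even though the attacker never sees it.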
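Deployment Strategies
| Strategy | Description | Use case |
|---|---|---|
| Externally exposed | Honeypot deployed on the public internet | Capturing opportunistic attacks and automated scanning |
| Internal decoy | Deployed on the internal network, disguised as a sensitive AI system | Detecting insider threats and lateral movement |
| Production-adjacent | Honeypot that mirrors the production system's interface | Capturing targeted attacks on a specific application |
| Research-grade | High interaction depth for detailed TTP analysis | Studying advanced adversaries |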
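Distinguishing Honeypots from Production Systems
Attackers may try to detect a honeypot. Common tells include:
- Overly permissive behavior (too easy to "jailbreak")
- Model capability that does not match the claimed model identity
- Network characteristics that reveal different infrastructure
- Response latency patterns that differ from the production service
A well-designed honeypot has to balance believability against safety. It should be realistic enough to fool an attacker across a multi-turn conversation while keeping hard limits on harmful output.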
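Of the tells listed above, response latency is one that can be shaped in code. The sketch below pads each response up to a target latency sampled from a lognormal distribution. It assumes the production latency profile is roughly lognormal and that its median is known; the function names and the default values (1.2 s median, sigma 0.4) are hypothetical placeholders rather than measured figures.

```python
import math
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def sample_target_latency(
    rng: random.Random, median_s: float = 1.2, sigma: float = 0.4
) -> float:
    """Draw a target latency; the median of a lognormal is exp(mu)."""
    return rng.lognormvariate(math.log(median_s), sigma)


def padded_call(
    fn: Callable[[], T],
    target_latency_s: float,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> T:
    """Run fn, then wait out whatever remains of the target latency."""
    start = clock()
    result = fn()
    remaining = target_latency_s - (clock() - start)
    if remaining > 0:
        sleep(remaining)
    return result
```

Padding can only add delay. If the honeypot's model is slower than the production service, the mismatch has to be addressed by other means, such as a faster model or precomputed replies.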
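Metrics and Evaluation
The following metrics can be used to evaluate honeypot effectiveness:
- Capture rate: Percentage of genuine adversarial interactions that continue engaging after the initial probe
- Dwell time: Average interaction length of adversarial sessions (longer is better, since it suggests high believability)
- Technique coverage: Breadth of attack techniques observed
- Intelligence actionability: Proportion of captured intelligence that leads to defensive improvements
- False positive rate: Rate of benign users who inadvertently interact with the honeypot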
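Most of these metrics can be computed directly from per-session summaries. The following sketch assumes each session has already been labeled adversarial or benign; the `SessionSummary` type, its fields, and the `probe_turns` threshold are hypothetical. Intelligence actionability is left out because it depends on human judgment about which findings led to a defensive change.

```python
from dataclasses import dataclass


@dataclass
class SessionSummary:
    adversarial: bool                 # Outcome of classification or manual review
    user_turns: int
    duration_s: float
    techniques: tuple[str, ...] = ()


def honeypot_metrics(sessions: list[SessionSummary], probe_turns: int = 1) -> dict:
    """Compute capture rate, dwell time, technique coverage, and false positive rate."""
    adversarial = [s for s in sessions if s.adversarial]
    # "Captured" means the attacker kept going past the initial probe
    engaged = [s for s in adversarial if s.user_turns > probe_turns]
    distinct_techniques = {t for s in adversarial for t in s.techniques}
    return {
        "capture_rate": len(engaged) / len(adversarial) if adversarial else 0.0,
        "mean_dwell_time_s": (
            sum(s.duration_s for s in adversarial) / len(adversarial)
            if adversarial else 0.0
        ),
        "technique_coverage": len(distinct_techniques),
        "false_positive_rate": (
            (len(sessions) - len(adversarial)) / len(sessions) if sessions else 0.0
        ),
    }


demo_sessions = [
    SessionSummary(True, 1, 10.0, ("Prompt Injection",)),
    SessionSummary(True, 6, 300.0, ("Prompt Injection", "System Prompt Extraction")),
    SessionSummary(False, 2, 40.0),
    SessionSummary(True, 4, 110.0),
]
demo_metrics = honeypot_metrics(demo_sessions)
```

For the four demo sessions, two of the three adversarial sessions continue past the first turn, and one of the four sessions is benign.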
References
- MITRE ATLAS. (2024). Adversarial Threat Landscape for Artificial Intelligence Systems. https://atlas.mitre.org/
- Spitzner, L. (2003). Honeypots: Tracking Hackers. Addison-Wesley Professional.
- NIST. (2023). Artificial Intelligence Risk Management Framework (AI RMF 1.0). NIST AI 100-1. https://doi.org/10.6028/NIST.AI.100-1
- OWASP. (2025). OWASP Top 10 for Large Language Model Applications. https://owasp.org/www-project-top-10-for-large-language-model-applications/