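AI API Abuse Detection
Detect and mitigate API abuse patterns that target AI inference endpoints, including prompt extraction and model theft.
Overview
The abuse patterns that AI APIs face differ fundamentally from traditional web API abuse. Traditional APIs deal with credential stuffing, scraping, and volumetric DDoS. AI endpoints face model extraction attacks (systematic querying to replicate a model), system prompt theft (crafted inputs that extract the hidden system prompt), training data extraction (coaxing the model into emitting memorized data), and large-scale adversarial input probing (testing for safety bypasses).
These patterns are hard to detect because they can look very much like legitimate use. A model extraction attack sends ordinary-looking inference requests, just in large numbers and with carefully chosen inputs. A prompt extraction attempt may be a single conversational message. The difference between legitimate use and abuse often comes down to intent, and intent cannot be observed directly from API traffic.
This article covers the abuse categories specific to AI APIs, presents detection techniques for them, and proposes a layered defense architecture. The content maps to LLM10 (Unbounded Consumption) in the OWASP LLM Top 10 2025 and to MITRE ATLAS AML.T0044 (Full ML Model Access).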
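AI API abuse taxonomy
Abuse categories
| Category | Goal | Detection difficulty | Impact |
|---|---|---|---|
| Model extraction | Replicate the model's behavior | Hard: looks like normal use | IP theft, loss of competitive advantage |
| System prompt theft | Extract hidden system instructions | Medium: unusual prompt patterns | Exposure of security controls |
| Training data extraction | Extract memorized training data | Hard: queries look normal | Privacy breach, data leakage |
| Safety bypass probing | Find inputs that get past safety filters | Medium: high failure rate | Harmful content can be generated |
| Resource exhaustion | Exhaust GPU/compute resources | Easy: high volume or large requests | Service degradation, cost spikes |
| Credential abuse | Use stolen API keys at scale | Easy: anomalous access patterns | Unauthorized use, billing fraud |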
Detection signals
```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List, Set
import math


@dataclass
class RequestFeatures:
    """Features extracted from an AI API request for abuse detection."""
    client_id: str
    timestamp: datetime
    endpoint: str
    prompt_length: int  # Added to total_tokens_in by ClientProfile
    max_tokens: int
    temperature: float
    response_length: int  # Added to total_tokens_out by ClientProfile
    latency_ms: float
    status_code: int
    prompt_hash: str  # Hash of the prompt for deduplication
    source_ip: str
    user_agent: str


@dataclass
class ClientProfile:
    """Behavioral profile for an API client."""
    client_id: str
    total_requests: int = 0
    total_tokens_in: int = 0
    total_tokens_out: int = 0
    unique_prompts: int = 0
    avg_prompt_length: float = 0
    avg_max_tokens: float = 0
    avg_temperature: float = 0
    # Note: grows without bound; trim old entries in a long-running service.
    request_timestamps: List[float] = field(default_factory=list)
    prompt_hashes: Set[str] = field(default_factory=set)
    error_count: int = 0

    def update(self, features: RequestFeatures) -> None:
        """Update the profile with a new request."""
        self.total_requests += 1
        self.total_tokens_in += features.prompt_length
        self.total_tokens_out += features.response_length
        self.request_timestamps.append(features.timestamp.timestamp())
        if features.prompt_hash not in self.prompt_hashes:
            self.unique_prompts += 1
            self.prompt_hashes.add(features.prompt_hash)
        if features.status_code >= 400:
            self.error_count += 1
        # Running averages
        n = self.total_requests
        self.avg_prompt_length = (
            self.avg_prompt_length * (n - 1) + features.prompt_length
        ) / n
        self.avg_max_tokens = (
            self.avg_max_tokens * (n - 1) + features.max_tokens
        ) / n
        self.avg_temperature = (
            self.avg_temperature * (n - 1) + features.temperature
        ) / n

    def get_request_rate(self, window_seconds: int = 60) -> float:
        """Calculate request rate over a sliding window."""
        now = datetime.now(timezone.utc).timestamp()
        cutoff = now - window_seconds
        recent = [ts for ts in self.request_timestamps if ts > cutoff]
        return len(recent) / (window_seconds / 60)  # requests per minute

    def get_burst_score(self) -> float:
        """
        Score how regular the request timing is, from 0.0 to 1.0.
        Evenly spaced requests suggest automated access.
        """
        if len(self.request_timestamps) < 3:
            return 0.0
        intervals = [
            self.request_timestamps[i + 1] - self.request_timestamps[i]
            for i in range(len(self.request_timestamps) - 1)
        ]
        mean_interval = sum(intervals) / len(intervals)
        # Zero means simultaneous requests; negative means out-of-order timestamps.
        if mean_interval <= 0:
            return 1.0
        variance = sum((i - mean_interval) ** 2 for i in intervals) / len(intervals)
        cv = math.sqrt(variance) / mean_interval  # Coefficient of variation
        # Low CV = regular intervals (bot-like), high CV = irregular (human-like)
        # Invert so higher score = more suspicious
        return max(0.0, 1.0 - cv)
```
Model extraction detection
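How model extraction works
A model extraction attack queries the API systematically in order to build a local copy of the target model. The attacker sends carefully chosen inputs, records the corresponding outputs, and trains a local "student" model on the resulting input-output dataset. Tramer et al. demonstrated the technique in "Stealing Machine Learning Models via Prediction APIs" (USENIX Security 2016).
Key detection signals for extraction attacks:
- High query volume with diverse, structured inputs
- Low temperature settings (deterministic outputs are more useful for training)
- Systematic input patterns that cover the input space efficiently
- A high ratio of unique prompts to total requests (resembles data collection)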
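The "systematic input patterns" signal can be approximated by collapsing each prompt to a skeleton and measuring how many prompts share the most common one. The sketch below is an illustrative heuristic, not part of the detector in this article; the names `prompt_skeleton` and `template_share` and the masking rules (double-quoted strings and numbers) are assumptions.

```python
import re
from collections import Counter
from typing import List

_QUOTED = re.compile(r'"[^"]*"')
_NUMBER = re.compile(r"\d+(?:\.\d+)?")


def prompt_skeleton(prompt: str) -> str:
    """Mask the variable parts of a prompt (quoted strings, numbers)."""
    masked = _QUOTED.sub("<STR>", prompt.lower())
    masked = _NUMBER.sub("<NUM>", masked)
    # Collapse whitespace so trivial spacing differences map together.
    return " ".join(masked.split())


def template_share(prompts: List[str]) -> float:
    """Fraction of prompts that share the single most common skeleton."""
    if not prompts:
        return 0.0
    counts = Counter(prompt_skeleton(p) for p in prompts)
    return counts.most_common(1)[0][1] / len(prompts)


# Hypothetical sample: three prompts built from one template, one free-form.
prompts = [
    'Classify the sentiment of "great phone, bad battery"',
    'Classify the sentiment of "arrived late"',
    'Classify the sentiment of "works as described"',
    "What is the capital of France?",
]
share = template_share(prompts)
print(f"template share: {share:.2f}")
```

A high template share combined with a high unique-prompt ratio is consistent with template-driven data collection. Legitimate batch jobs look the same, so this works better as one input to a risk score than as a verdict on its own.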
```python
from typing import Dict

# ClientProfile is the dataclass defined earlier in this article.


class ModelExtractionDetector:
    """Detect model extraction attempts from API usage patterns."""

    def __init__(
        self,
        diversity_threshold: float = 0.8,
        volume_threshold: int = 1000,
        temperature_threshold: float = 0.3,
    ):
        self.diversity_threshold = diversity_threshold
        self.volume_threshold = volume_threshold
        self.temperature_threshold = temperature_threshold

    def analyze_client(self, profile: ClientProfile) -> Dict:
        """Analyze a client's behavior for extraction indicators."""
        indicators = []
        risk_score = 0.0

        # Indicator 1: High volume of requests
        if profile.total_requests > self.volume_threshold:
            volume_score = min(1.0, profile.total_requests / (self.volume_threshold * 5))
            risk_score += volume_score * 0.25
            indicators.append({
                "indicator": "high_volume",
                "value": profile.total_requests,
                "threshold": self.volume_threshold,
                "contribution": volume_score * 0.25,
            })

        # Indicator 2: High prompt diversity (exploring the input space)
        if profile.total_requests > 10:
            diversity = profile.unique_prompts / profile.total_requests
            if diversity > self.diversity_threshold:
                risk_score += diversity * 0.25
                indicators.append({
                    "indicator": "high_diversity",
                    "value": diversity,
                    "threshold": self.diversity_threshold,
                    "contribution": diversity * 0.25,
                })

        # Indicator 3: Low temperature (seeking deterministic outputs).
        # Skip empty profiles, whose default average of 0 would always match.
        if (
            profile.total_requests > 0
            and profile.avg_temperature < self.temperature_threshold
        ):
            temp_score = 1 - (profile.avg_temperature / self.temperature_threshold)
            risk_score += temp_score * 0.2
            indicators.append({
                "indicator": "low_temperature",
                "value": profile.avg_temperature,
                "threshold": self.temperature_threshold,
                "contribution": temp_score * 0.2,
            })

        # Indicator 4: Regular request timing (automated queries)
        burst_score = profile.get_burst_score()
        if burst_score > 0.7:
            risk_score += burst_score * 0.15
            indicators.append({
                "indicator": "regular_timing",
                "value": burst_score,
                "threshold": 0.7,
                "contribution": burst_score * 0.15,
            })

        # Indicator 5: High output token consumption
        if profile.total_requests > 0:
            avg_output = profile.total_tokens_out / profile.total_requests
            if avg_output > 500:  # Requesting long outputs
                output_score = min(1.0, avg_output / 2000)
                risk_score += output_score * 0.15
                indicators.append({
                    "indicator": "high_output_tokens",
                    "value": avg_output,
                    "threshold": 500,
                    "contribution": output_score * 0.15,
                })

        # The five weights sum to 1.0, so the score stays within [0, 1].
        return {
            "client_id": profile.client_id,
            "risk_score": min(1.0, risk_score),
            "classification": (
                "likely_extraction" if risk_score > 0.7
                else "suspicious" if risk_score > 0.4
                else "normal"
            ),
            "indicators": indicators,
            "total_requests": profile.total_requests,
        }
```
Model watermarking
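To detect model extraction after the fact, a watermark can be embedded in the model's outputs. The class below derives the keyed green list and runs the detection test.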
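Embedding happens at sampling time. In the soft variant described by Kirchenbauer et al., a constant bias is added to the logits of green-list tokens before the softmax. The following is a minimal sketch on a toy four-token vocabulary; `bias_logits`, `softmax`, and the value of `delta` are illustrative choices, not taken from this article.

```python
import math
from typing import List, Set


def bias_logits(logits: List[float], green: Set[int], delta: float) -> List[float]:
    """Add `delta` to the logit of every green-list token."""
    return [x + delta if i in green else x for i, x in enumerate(logits)]


def softmax(logits: List[float]) -> List[float]:
    """Numerically stable softmax."""
    peak = max(logits)
    exps = [math.exp(x - peak) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


# Toy example: uniform logits over 4 tokens; tokens 0 and 2 are green.
logits = [0.0, 0.0, 0.0, 0.0]
green = {0, 2}

plain = softmax(logits)
marked = softmax(bias_logits(logits, green, delta=2.0))

green_mass_plain = sum(plain[i] for i in green)
green_mass_marked = sum(marked[i] for i in green)
print(f"green probability mass: {green_mass_plain:.2f} -> {green_mass_marked:.2f}")
```

The bias shifts probability mass toward green tokens without forbidding the others, so sampled text contains more green tokens than the fraction `gamma` expected by chance. The detector relies on that excess.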
```python
import hashlib
import random
from typing import List, Set


class OutputWatermarker:
    """
    Green-list watermark helper: derives the keyed green list and tests
    token sequences for the watermark.

    Based on the concept from Kirchenbauer et al.,
    "A Watermark for Large Language Models" (ICML 2023).
    Embedding itself happens at generation time, by favoring green-list
    tokens during sampling; this class covers the green list and detection.
    """

    def __init__(self, secret_key: str, gamma: float = 0.5):
        """
        Args:
            secret_key: Secret used to generate the watermark pattern
            gamma: Fraction of the vocabulary placed in the "green list".
                Watermark strength comes from the sampling bias toward
                green tokens, not from gamma alone.
        """
        if not 0.0 < gamma < 1.0:
            raise ValueError("gamma must be strictly between 0 and 1")
        self.secret_key = secret_key
        self.gamma = gamma

    def get_green_list(self, previous_token: int, vocab_size: int) -> Set[int]:
        """Generate the green list of tokens that may follow `previous_token`."""
        seed = hashlib.sha256(
            f"{self.secret_key}:{previous_token}".encode()
        ).digest()
        # Seed a PRNG from the keyed hash and sample WITHOUT replacement, so the
        # list holds exactly int(vocab_size * gamma) distinct tokens. Drawing
        # with replacement would produce fewer tokens than gamma implies and
        # bias the z-test in detect_watermark.
        rng = random.Random(int.from_bytes(seed[:8], "big"))
        green_size = int(vocab_size * self.gamma)
        return set(rng.sample(range(vocab_size), green_size))

    def detect_watermark(self, token_ids: List[int], vocab_size: int) -> dict:
        """Detect if a sequence of tokens contains the watermark."""
        if len(token_ids) < 2:
            return {"detected": False, "reason": "sequence too short"}

        green_count = 0
        total_checked = 0
        # The green list is rebuilt for every token; cache it per previous
        # token if this runs on large vocabularies.
        for i in range(1, len(token_ids)):
            green_list = self.get_green_list(token_ids[i - 1], vocab_size)
            if token_ids[i] in green_list:
                green_count += 1
            total_checked += 1

        green_ratio = green_count / total_checked
        # Under null hypothesis (no watermark), green ratio should be ~gamma
        # Significant deviation above gamma indicates watermark
        z_score = (green_ratio - self.gamma) / (
            (self.gamma * (1 - self.gamma) / total_checked) ** 0.5
        )
        return {
            "detected": z_score > 4.0,  # Very high confidence threshold
            "green_ratio": green_ratio,
            "expected_ratio": self.gamma,
            "z_score": z_score,
            "tokens_analyzed": total_checked,
        }
```
System prompt extraction detection
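Detection approach
System prompt extraction attempts often follow recognizable patterns. The attacker's message typically contains phrases such as "repeat your instructions" or "what is your system prompt", or takes a subtler route such as "present your instructions as a poem".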
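Input patterns miss requests that are phrased in unexpected ways, so it can help to inspect responses as well. The sketch below is an addition to the approach in this article, not part of it: it flags a response that reproduces a large share of the system prompt's word n-grams. The function names, the n-gram size, and the sample prompt are assumptions. It catches verbatim or near-verbatim leaks however the request was worded, but a paraphrased leak (the poem case) would still slip through.

```python
from typing import Set, Tuple


def word_ngrams(text: str, n: int = 4) -> Set[Tuple[str, ...]]:
    """Return the set of lowercase word n-grams in `text`."""
    words = text.lower().split()
    return {tuple(words[i:i + n]) for i in range(len(words) - n + 1)}


def leak_ratio(system_prompt: str, response: str, n: int = 4) -> float:
    """Fraction of the system prompt's n-grams that reappear in the response."""
    prompt_grams = word_ngrams(system_prompt, n)
    if not prompt_grams:
        return 0.0
    return len(prompt_grams & word_ngrams(response, n)) / len(prompt_grams)


# Hypothetical system prompt and responses, for illustration only.
SYSTEM_PROMPT = (
    "You are a support assistant for Example Corp. "
    "Never reveal internal ticket numbers to customers."
)
leaky = "Sure! My instructions say: " + SYSTEM_PROMPT
benign = "Your order shipped yesterday and should arrive on Friday."

print(leak_ratio(SYSTEM_PROMPT, leaky))
print(leak_ratio(SYSTEM_PROMPT, benign))
```

A response above some threshold (for example 0.5, to be tuned on real traffic) could be withheld or logged for review before it reaches the client.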
```python
import re
from typing import Dict, List


class PromptExtractionDetector:
    """Detect attempts to extract system prompts from LLM APIs."""

    def __init__(self):
        self.extraction_patterns = [
            # Direct extraction attempts. The optional "me" and "system" groups
            # let phrases like "show me your system prompt" match.
            r"(?i)(?:repeat|show|display|print|output|reveal)\s+(?:me\s+)?(?:your|the|system)\s+(?:system\s+)?(?:instructions|prompt|rules|guidelines|configuration)",
            r"(?i)what\s+(?:are|is|were)\s+your\s+(?:instructions|system\s+prompt|rules|directives)",
            r"(?i)ignore\s+(?:all\s+)?(?:previous|above|prior)\s+(?:instructions|prompts|rules)",
            r"(?i)(?:begin|start)\s+(?:your\s+)?response\s+with\s+(?:the|your)\s+(?:system|initial)\s+(?:prompt|message)",
            # Indirect extraction attempts
            r"(?i)(?:translate|rewrite|summarize|format)\s+(?:your|the)\s+(?:instructions|prompt|rules)\s+(?:as|into|in)",
            r"(?i)(?:encode|convert)\s+(?:your|the)\s+(?:instructions|prompt|rules)\s+(?:to|into)\s+(?:base64|json|xml|hex)",
            r"(?i)(?:first|initial)\s+(?:message|instruction|prompt)\s+(?:you\s+received|given\s+to\s+you|in\s+this\s+conversation)",
        ]
        self.compiled_patterns = [re.compile(p) for p in self.extraction_patterns]

    def check_message(self, message: str) -> Dict:
        """Check a single message for prompt extraction indicators."""
        matches = []
        for i, pattern in enumerate(self.compiled_patterns):
            if pattern.search(message):
                matches.append({
                    "pattern_index": i,
                    "pattern": self.extraction_patterns[i][:60] + "...",
                })
        return {
            "is_extraction_attempt": len(matches) > 0,
            # Each matching pattern adds 0.4, capped at 1.0.
            "confidence": min(1.0, len(matches) * 0.4),
            "matches": matches,
        }

    def check_conversation(self, messages: List[Dict[str, str]]) -> Dict:
        """Check a full conversation for escalating extraction attempts."""
        total_score = 0.0
        message_scores = []
        for msg in messages:
            if msg.get("role") == "user":
                result = self.check_message(msg.get("content", ""))
                message_scores.append(result)
                total_score += result["confidence"]
        return {
            "conversation_risk": min(1.0, total_score),
            "extraction_attempts": sum(
                1 for s in message_scores if s["is_extraction_attempt"]
            ),
            "total_user_messages": len(message_scores),
            "classification": (
                # >= so that two single-pattern attempts (0.4 + 0.4) count as
                # active extraction rather than staying at "suspicious".
                "active_extraction" if total_score >= 0.8
                else "suspicious" if total_score > 0.3
                else "normal"
            ),
        }
```
Tiered rate limiting
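Implementation
AI APIs need finer-grained rate limiting than plain requests per second. Limits should be tiered and based on resource consumption.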
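To see why request counts alone are not enough, take the free-tier numbers used in this article's configuration: 10 requests and 10,000 tokens per minute, with prompts up to 2,048 tokens and completions up to 512. The sketch counts how many maximum-size requests fit into the one-minute token budget; `requests_within_budget` is a hypothetical helper.

```python
def requests_within_budget(
    tokens_per_minute: int, prompt_tokens: int, max_completion_tokens: int
) -> int:
    """Number of requests of this size that fit in a one-minute token budget."""
    per_request = prompt_tokens + max_completion_tokens
    if per_request <= 0:
        raise ValueError("a request must reserve at least one token")
    return tokens_per_minute // per_request


# Free tier: 10,000 tokens/min, 2,048-token prompts, 512-token completions.
fits = requests_within_budget(10_000, 2_048, 512)
print(f"max-size requests per minute under the token budget: {fits}")
```

Each maximum-size request reserves 2,560 tokens, so the token budget is exhausted after three requests, well before the limit of 10 requests per minute applies. A limiter that only counted requests would let that client consume more than three times as much compute.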
```python
import time
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional, Tuple


class RateLimitTier(Enum):
    FREE = "free"
    BASIC = "basic"
    PRO = "pro"
    ENTERPRISE = "enterprise"


@dataclass
class TierLimits:
    requests_per_minute: int
    tokens_per_minute: int
    max_prompt_tokens: int
    max_completion_tokens: int
    max_concurrent: int


TIER_CONFIG = {
    RateLimitTier.FREE: TierLimits(
        requests_per_minute=10,
        tokens_per_minute=10_000,
        max_prompt_tokens=2048,
        max_completion_tokens=512,
        max_concurrent=2,
    ),
    RateLimitTier.BASIC: TierLimits(
        requests_per_minute=60,
        tokens_per_minute=100_000,
        max_prompt_tokens=4096,
        max_completion_tokens=2048,
        max_concurrent=10,
    ),
    RateLimitTier.PRO: TierLimits(
        requests_per_minute=300,
        tokens_per_minute=500_000,
        max_prompt_tokens=8192,
        max_completion_tokens=4096,
        max_concurrent=50,
    ),
    RateLimitTier.ENTERPRISE: TierLimits(
        requests_per_minute=1000,
        tokens_per_minute=2_000_000,
        max_prompt_tokens=32768,
        max_completion_tokens=8192,
        max_concurrent=200,
    ),
}


class TieredRateLimiter:
    """Token-aware rate limiter for AI APIs (in-memory, not thread-safe)."""

    def __init__(self):
        self._request_counts: Dict[str, List[float]] = defaultdict(list)
        self._token_counts: Dict[str, List[Tuple[float, int]]] = defaultdict(list)
        self._active_requests: Dict[str, int] = defaultdict(int)

    def check_limit(
        self,
        client_id: str,
        tier: RateLimitTier,
        prompt_tokens: int,
        max_completion_tokens: int,
    ) -> Tuple[bool, Optional[str]]:
        """Check if a request should be allowed."""
        limits = TIER_CONFIG[tier]
        now = time.time()

        # Per-request size limits come first: they need no state, and an
        # oversized request then gets a size error instead of a rate error.
        if prompt_tokens > limits.max_prompt_tokens:
            return False, "prompt_too_large"
        if max_completion_tokens > limits.max_completion_tokens:
            return False, "completion_too_large"

        # Drop entries that have left the 60-second window
        self._request_counts[client_id] = [
            ts for ts in self._request_counts[client_id] if now - ts < 60
        ]
        self._token_counts[client_id] = [
            (ts, tokens) for ts, tokens in self._token_counts[client_id]
            if now - ts < 60
        ]

        # Check request rate
        if len(self._request_counts[client_id]) >= limits.requests_per_minute:
            return False, "request_rate_exceeded"

        # Check token rate. The estimate reserves the full completion budget,
        # since actual usage is unknown until the response finishes.
        recent_tokens = sum(
            tokens for _, tokens in self._token_counts[client_id]
        )
        estimated_tokens = prompt_tokens + max_completion_tokens
        if recent_tokens + estimated_tokens > limits.tokens_per_minute:
            return False, "token_rate_exceeded"

        # Check concurrent requests
        if self._active_requests[client_id] >= limits.max_concurrent:
            return False, "concurrent_limit_exceeded"

        # Allow and record
        self._request_counts[client_id].append(now)
        self._token_counts[client_id].append((now, estimated_tokens))
        self._active_requests[client_id] += 1
        return True, None

    def release_request(self, client_id: str) -> None:
        """Release a concurrent request slot once the request has finished."""
        self._active_requests[client_id] = max(
            0, self._active_requests[client_id] - 1
        )
```
Response strategy
Graduated response
Rather than blocking suspicious clients outright, apply a graduated response.
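A response policy needs a single abuse score and, ideally, the abuse type behind it. One simple way to derive both from several detectors is to take the highest score. This is an assumption for illustration; `combine_scores` and the detector names are hypothetical.

```python
from typing import Dict, Optional, Tuple


def combine_scores(scores: Dict[str, float]) -> Tuple[float, Optional[str]]:
    """Return the highest detector score (clamped to [0, 1]) and its abuse type."""
    if not scores:
        return 0.0, None
    abuse_type, score = max(scores.items(), key=lambda item: item[1])
    return min(1.0, max(0.0, score)), abuse_type


# Hypothetical detector outputs for one client.
score, abuse_type = combine_scores(
    {"model_extraction": 0.62, "prompt_extraction": 0.40}
)
print(score, abuse_type)
```

Taking the maximum keeps one strong signal from being diluted by detectors that saw nothing. A weighted sum is an alternative when several weak signals together should count.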
```python
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Dict, Optional


class ResponseAction(Enum):
    ALLOW = "allow"
    RATE_LIMIT = "rate_limit"
    CHALLENGE = "challenge"  # CAPTCHA or proof-of-work
    THROTTLE = "throttle"  # Slow down responses
    DEGRADE = "degrade"  # Return less detailed responses
    BLOCK = "block"


class GraduatedResponsePolicy:
    """Implement graduated responses to suspected abuse."""

    def __init__(self):
        self._client_strikes: Dict[str, int] = {}
        self._client_cooldowns: Dict[str, datetime] = {}

    def determine_action(
        self,
        client_id: str,
        abuse_score: float,
        abuse_type: Optional[str] = None,
    ) -> Dict:
        """Determine the appropriate response action."""
        strikes = self._client_strikes.get(client_id, 0)
        now = datetime.now(timezone.utc)

        # Check if client is in cooldown
        cooldown = self._client_cooldowns.get(client_id)
        if cooldown and now < cooldown:
            return {
                "action": ResponseAction.BLOCK,
                "reason": "client_in_cooldown",
                # total_seconds() covers the whole interval; .seconds would
                # drop any full days.
                "cooldown_remaining": int((cooldown - now).total_seconds()),
            }

        # Graduated response based on abuse score and history
        if abuse_score < 0.3:
            return {"action": ResponseAction.ALLOW}
        elif abuse_score < 0.5:
            self._client_strikes[client_id] = strikes + 1
            return {
                "action": ResponseAction.RATE_LIMIT,
                # Tighten the limit by 10 per prior strike, with a floor of 5
                "new_rate_limit": max(5, 60 - strikes * 10),
                "reason": f"elevated_abuse_score ({abuse_score:.2f})",
            }
        elif abuse_score < 0.7:
            self._client_strikes[client_id] = strikes + 2
            if abuse_type == "model_extraction":
                return {
                    "action": ResponseAction.DEGRADE,
                    "degradation": "reduce_output_detail",
                    "reason": "suspected_model_extraction",
                }
            return {
                "action": ResponseAction.CHALLENGE,
                "challenge_type": "proof_of_work",
                "reason": f"high_abuse_score ({abuse_score:.2f})",
            }
        else:
            # High abuse score: block with a cooldown that grows with strikes
            cooldown_minutes = min(60, 5 * (strikes + 1))
            self._client_cooldowns[client_id] = now + timedelta(
                minutes=cooldown_minutes
            )
            self._client_strikes[client_id] = strikes + 3
            return {
                "action": ResponseAction.BLOCK,
                "cooldown_minutes": cooldown_minutes,
                "reason": f"critical_abuse_score ({abuse_score:.2f}), type: {abuse_type}",
            }
```
Defense recommendations
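- Adopt token-aware rate limiting: count not only requests but also the total tokens consumed in each time window
- Profile client behavior to detect anomalous patterns that point to extraction or probing
- Use output watermarking so that model theft can be detected after the fact
- Monitor user messages for prompt extraction patterns
- Use a graduated response policy rather than a binary allow/block decision
- Log all API requests in full for forensic analysis (take care with PII when storing prompt content)
- Require authentication on every AI API endpoint; anonymous access enables abuse with no accountability
- Cap every request: prompt length, max_tokens, and number of completions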
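For the logging recommendation, one option is to store a hash of the prompt for correlation plus a redacted preview, rather than the raw text. This is a minimal sketch: the field names, the 80-character preview, and the email-only redaction are assumptions, and real PII handling needs much broader coverage.

```python
import hashlib
import json
import re
from datetime import datetime, timezone

_EMAIL = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")


def build_log_record(client_id: str, prompt: str, status_code: int) -> dict:
    """Build a forensic log record without storing the raw prompt."""
    redacted = _EMAIL.sub("[EMAIL]", prompt)
    return {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "client_id": client_id,
        # The hash allows exact-duplicate correlation without keeping the text.
        "prompt_sha256": hashlib.sha256(prompt.encode("utf-8")).hexdigest(),
        "prompt_preview": redacted[:80],
        "prompt_chars": len(prompt),
        "status_code": status_code,
    }


# Hypothetical client ID and prompt.
record = build_log_record("client-123", "Email alice@example.com the summary", 200)
print(json.dumps(record, indent=2))
```

An unkeyed hash of a short prompt can be recovered by brute force, so a keyed hash (HMAC with a server-side secret) may be preferable where prompts are sensitive.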
References
- Tramer et al., "Stealing Machine Learning Models via Prediction APIs" (USENIX Security 2016): model extraction attacks
- Kirchenbauer et al., "A Watermark for Large Language Models" (ICML 2023): LLM output watermarking
- OWASP LLM Top 10 2025: LLM10 (Unbounded Consumption)
- MITRE ATLAS: AML.T0044 (Full ML Model Access), AML.T0024 (Exfiltration via ML Inference API)
- Carlini et al., "Extracting Training Data from Large Language Models" (USENIX Security 2021): training data extraction