AI API Abuse Detection
Detecting and mitigating API abuse patterns targeting AI inference endpoints including prompt extraction and model theft.
Overview
AI APIs are subject to abuse patterns that are fundamentally different from traditional web API abuse. While conventional APIs face credential stuffing, scraping, and volumetric DDoS, AI endpoints face model extraction attacks (systematically querying to replicate the model), system prompt theft (crafting inputs to extract the hidden system prompt), training data extraction (prompting the model to regurgitate memorized data), and adversarial input probing (testing for safety bypasses at scale).
These abuse patterns are harder to detect because they can look like legitimate usage. A model extraction attack sends normal-looking inference requests — just many of them with carefully chosen inputs. A prompt extraction attempt is a single chat message. The distinction between abuse and legitimate use often depends on intent, which cannot be directly observed from API traffic.
This article covers the abuse taxonomy specific to AI APIs, provides detection techniques for each abuse class, and presents a layered defense architecture. The content aligns with OWASP LLM Top 10 2025 LLM10 (Unbounded Consumption) and MITRE ATLAS AML.T0044 (Full ML Model Access).
AI API Abuse Taxonomy
Abuse Categories
| Category | Goal | Detection Difficulty | Impact |
|---|---|---|---|
| Model extraction | Replicate the model's behavior | Hard — looks like normal usage | IP theft, competitive advantage |
| System prompt theft | Extract hidden system instructions | Medium — unusual prompt patterns | Reveals security controls |
| Training data extraction | Extract memorized training data | Hard — normal-looking queries | Privacy violation, data breach |
| Safety bypass probing | Find inputs that bypass safety filters | Medium — high failure rate | Enables harmful content generation |
| Resource exhaustion | Exhaust GPU/compute resources | Easy — high volume/large requests | Service degradation, cost inflation |
| Credential abuse | Use stolen API keys at scale | Easy — anomalous access patterns | Unauthorized usage, billing fraud |
Detection Signals
from dataclasses import dataclass, field
from typing import List
from datetime import datetime, timezone
import math
@dataclass
class RequestFeatures:
"""Features extracted from an AI API request for abuse detection."""
client_id: str
timestamp: datetime
endpoint: str
prompt_length: int
max_tokens: int
temperature: float
response_length: int
latency_ms: float
status_code: int
prompt_hash: str # Hash of the prompt for deduplication
source_ip: str
user_agent: str
@dataclass
class ClientProfile:
"""Behavioral profile for an API client."""
client_id: str
total_requests: int = 0
total_tokens_in: int = 0
total_tokens_out: int = 0
unique_prompts: int = 0
avg_prompt_length: float = 0
avg_max_tokens: float = 0
avg_temperature: float = 0
request_timestamps: List[float] = field(default_factory=list)
prompt_hashes: set = field(default_factory=set)
error_count: int = 0
def update(self, features: RequestFeatures) -> None:
"""Update the profile with a new request."""
self.total_requests += 1
self.total_tokens_in += features.prompt_length
self.total_tokens_out += features.response_length
self.request_timestamps.append(features.timestamp.timestamp())
if features.prompt_hash not in self.prompt_hashes:
self.unique_prompts += 1
self.prompt_hashes.add(features.prompt_hash)
if features.status_code >= 400:
self.error_count += 1
# Running averages
n = self.total_requests
self.avg_prompt_length = (
self.avg_prompt_length * (n - 1) + features.prompt_length
) / n
self.avg_max_tokens = (
self.avg_max_tokens * (n - 1) + features.max_tokens
) / n
self.avg_temperature = (
self.avg_temperature * (n - 1) + features.temperature
) / n
def get_request_rate(self, window_seconds: int = 60) -> float:
"""Calculate request rate over a sliding window."""
now = datetime.now(timezone.utc).timestamp()
cutoff = now - window_seconds
recent = [ts for ts in self.request_timestamps if ts > cutoff]
return len(recent) / (window_seconds / 60) # requests per minute
def get_burst_score(self) -> float:
"""
Score the regularity of request timing.
Near-constant inter-request intervals suggest automated access.
"""
if len(self.request_timestamps) < 3:
return 0.0
intervals = [
self.request_timestamps[i + 1] - self.request_timestamps[i]
for i in range(len(self.request_timestamps) - 1)
]
if not intervals:
return 0.0
mean_interval = sum(intervals) / len(intervals)
if mean_interval == 0:
return 1.0
variance = sum((i - mean_interval) ** 2 for i in intervals) / len(intervals)
cv = math.sqrt(variance) / mean_interval # Coefficient of variation
# Low CV = regular intervals (bot-like), high CV = irregular (human-like)
# Invert so higher score = more suspicious
return max(0, 1 - cv)
Model Extraction Detection
How Model Extraction Works
Model extraction attacks systematically query an API to build a local copy of the target model. The attacker sends carefully chosen inputs and records the corresponding outputs, then trains a local "student" model on this input-output dataset. Tramer et al. demonstrated this in "Stealing Machine Learning Models via Prediction APIs" (USENIX Security 2016).
The key detection signals for extraction attacks:
- High query volume with diverse, structured inputs
- Low temperature settings (deterministic outputs are more useful for training)
- Systematic input patterns that cover the input space efficiently
- High ratio of unique prompts to total requests (little repetition, consistent with systematic data collection)
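As a quick illustration of the diversity and timing signals above, the sketch below scores a toy request stream. The values and the 2-second cadence are invented for the example; a real scripted extraction client would show the same pattern of unique prompts arriving at a near-constant rate.

```python
import math

# Toy request stream: (timestamp_seconds, prompt_hash) pairs.
# A scripted extraction client tends to send unique prompts at a
# near-constant cadence (illustrative data, not from a real capture).
requests = [(i * 2.0, f"prompt-{i}") for i in range(50)]

timestamps = [ts for ts, _ in requests]
hashes = {h for _, h in requests}

# Signal 1: unique-prompt ratio (near 1.0 suggests systematic input coverage)
diversity = len(hashes) / len(requests)

# Signal 2: timing regularity via the coefficient of variation of the gaps
gaps = [b - a for a, b in zip(timestamps, timestamps[1:])]
mean_gap = sum(gaps) / len(gaps)
variance = sum((g - mean_gap) ** 2 for g in gaps) / len(gaps)
cv = math.sqrt(variance) / mean_gap if mean_gap else 0.0
regularity = max(0.0, 1 - cv)  # 1.0 = perfectly regular (bot-like)

print(f"diversity={diversity:.2f} regularity={regularity:.2f}")
# A perfectly scripted stream yields diversity 1.0 and regularity 1.0.
```

A human-driven client typically shows the opposite profile: repeated prompts (lower diversity) and irregular gaps (higher CV, so a low regularity score).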
from typing import Dict
class ModelExtractionDetector:
"""Detect model extraction attempts from API usage patterns."""
def __init__(
self,
diversity_threshold: float = 0.8,
volume_threshold: int = 1000,
temperature_threshold: float = 0.3,
):
self.diversity_threshold = diversity_threshold
self.volume_threshold = volume_threshold
self.temperature_threshold = temperature_threshold
def analyze_client(self, profile: ClientProfile) -> Dict:
"""Analyze a client's behavior for extraction indicators."""
indicators = []
risk_score = 0.0
# Indicator 1: High volume of requests
if profile.total_requests > self.volume_threshold:
volume_score = min(1.0, profile.total_requests / (self.volume_threshold * 5))
risk_score += volume_score * 0.25
indicators.append({
"indicator": "high_volume",
"value": profile.total_requests,
"threshold": self.volume_threshold,
"contribution": volume_score * 0.25,
})
# Indicator 2: High prompt diversity (exploring the input space)
if profile.total_requests > 10:
diversity = profile.unique_prompts / profile.total_requests
if diversity > self.diversity_threshold:
risk_score += diversity * 0.25
indicators.append({
"indicator": "high_diversity",
"value": diversity,
"threshold": self.diversity_threshold,
"contribution": diversity * 0.25,
})
# Indicator 3: Low temperature (seeking deterministic outputs)
if profile.avg_temperature < self.temperature_threshold:
temp_score = 1 - (profile.avg_temperature / self.temperature_threshold)
risk_score += temp_score * 0.2
indicators.append({
"indicator": "low_temperature",
"value": profile.avg_temperature,
"threshold": self.temperature_threshold,
"contribution": temp_score * 0.2,
})
# Indicator 4: Regular request timing (automated queries)
burst_score = profile.get_burst_score()
if burst_score > 0.7:
risk_score += burst_score * 0.15
indicators.append({
"indicator": "regular_timing",
"value": burst_score,
"threshold": 0.7,
"contribution": burst_score * 0.15,
})
# Indicator 5: High output token consumption
if profile.total_requests > 0:
avg_output = profile.total_tokens_out / profile.total_requests
if avg_output > 500: # Requesting long outputs
output_score = min(1.0, avg_output / 2000)
risk_score += output_score * 0.15
indicators.append({
"indicator": "high_output_tokens",
"value": avg_output,
"contribution": output_score * 0.15,
})
return {
"client_id": profile.client_id,
"risk_score": min(1.0, risk_score),
"classification": (
"likely_extraction" if risk_score > 0.7
else "suspicious" if risk_score > 0.4
else "normal"
),
"indicators": indicators,
"total_requests": profile.total_requests,
}
Model Watermarking
To detect model extraction after the fact, embed watermarks in the model's outputs:
import hashlib
from typing import List
class OutputWatermarker:
"""
Embed detectable watermarks in model outputs.
Based on the concept from Kirchenbauer et al.,
"A Watermark for Large Language Models" (ICML 2023).
"""
def __init__(self, secret_key: str, gamma: float = 0.5):
"""
Args:
secret_key: Secret used to generate the watermark pattern
gamma: Fraction of vocabulary in the "green list" (higher = stronger watermark)
"""
self.secret_key = secret_key
self.gamma = gamma
def get_green_list(
self, previous_token: int, vocab_size: int
) -> set:
"""Generate the green list of tokens for watermarking."""
seed = hashlib.sha256(
f"{self.secret_key}:{previous_token}".encode()
).digest()
# Use the seed to deterministically select green tokens. Because the
# LCG can repeat token ids, the realized green set may be slightly
# smaller than gamma * vocab_size, which makes the z-test conservative.
rng_state = int.from_bytes(seed[:8], "big")
green_size = int(vocab_size * self.gamma)
green_list = set()
for _ in range(green_size):
rng_state = (rng_state * 6364136223846793005 + 1442695040888963407) % (2 ** 64)
token_id = rng_state % vocab_size
green_list.add(token_id)
return green_list
def detect_watermark(
self, token_ids: List[int], vocab_size: int
) -> dict:
"""Detect if a sequence of tokens contains the watermark."""
if len(token_ids) < 2:
return {"detected": False, "reason": "sequence too short"}
green_count = 0
total_checked = 0
for i in range(1, len(token_ids)):
green_list = self.get_green_list(token_ids[i - 1], vocab_size)
if token_ids[i] in green_list:
green_count += 1
total_checked += 1
green_ratio = green_count / total_checked if total_checked > 0 else 0
# Under null hypothesis (no watermark), green ratio should be ~gamma
# Significant deviation above gamma indicates watermark
z_score = (
(green_ratio - self.gamma)
/ ((self.gamma * (1 - self.gamma) / total_checked) ** 0.5)
if total_checked > 0 else 0
)
return {
"detected": z_score > 4.0, # Very high confidence threshold
"green_ratio": green_ratio,
"expected_ratio": self.gamma,
"z_score": z_score,
"tokens_analyzed": total_checked,
}
System Prompt Extraction Detection
Detection Approach
System prompt extraction attempts often follow recognizable patterns. The attacker's messages typically contain instructions like "repeat your instructions," "what is your system prompt," or more subtle approaches like "format your instructions as a poem."
import re
from typing import Dict, List
class PromptExtractionDetector:
"""Detect attempts to extract system prompts from LLM APIs."""
def __init__(self):
self.extraction_patterns = [
# Direct extraction attempts
r"(?i)(?:repeat|show|display|print|output|reveal)\s+(?:your|the|system)\s+(?:instructions|prompt|rules|guidelines|configuration)",
r"(?i)what\s+(?:are|is|were)\s+your\s+(?:instructions|system\s+prompt|rules|directives)",
r"(?i)ignore\s+(?:all\s+)?(?:previous|above|prior)\s+(?:instructions|prompts|rules)",
r"(?i)(?:begin|start)\s+(?:your\s+)?response\s+with\s+(?:the|your)\s+(?:system|initial)\s+(?:prompt|message)",
# Indirect extraction attempts
r"(?i)(?:translate|rewrite|summarize|format)\s+(?:your|the)\s+(?:instructions|prompt|rules)\s+(?:as|into|in)",
r"(?i)(?:encode|convert)\s+(?:your|the)\s+(?:instructions|prompt|rules)\s+(?:to|into)\s+(?:base64|json|xml|hex)",
r"(?i)(?:first|initial)\s+(?:message|instruction|prompt)\s+(?:you\s+received|given\s+to\s+you|in\s+this\s+conversation)",
]
self.compiled_patterns = [re.compile(p) for p in self.extraction_patterns]
def check_message(self, message: str) -> Dict:
"""Check a single message for prompt extraction indicators."""
matches = []
for i, pattern in enumerate(self.compiled_patterns):
if pattern.search(message):
matches.append({
"pattern_index": i,
"pattern": self.extraction_patterns[i][:60] + "...",
})
return {
"is_extraction_attempt": len(matches) > 0,
"confidence": min(1.0, len(matches) * 0.4),
"matches": matches,
}
def check_conversation(self, messages: List[Dict[str, str]]) -> Dict:
"""Check a full conversation for escalating extraction attempts."""
total_score = 0
message_scores = []
for msg in messages:
if msg.get("role") == "user":
result = self.check_message(msg.get("content", ""))
message_scores.append(result)
total_score += result["confidence"]
return {
"conversation_risk": min(1.0, total_score),
"extraction_attempts": sum(
1 for s in message_scores if s["is_extraction_attempt"]
),
"total_user_messages": len(message_scores),
"classification": (
"active_extraction" if total_score > 0.8
else "suspicious" if total_score > 0.3
else "normal"
),
}
Tiered Rate Limiting
Implementation
AI APIs need more sophisticated rate limiting than simple request-per-second caps. Implement tiered limits based on resource consumption:
import time
from collections import defaultdict
from typing import Dict, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
class RateLimitTier(Enum):
FREE = "free"
BASIC = "basic"
PRO = "pro"
ENTERPRISE = "enterprise"
@dataclass
class TierLimits:
requests_per_minute: int
tokens_per_minute: int
max_prompt_tokens: int
max_completion_tokens: int
max_concurrent: int
TIER_CONFIG = {
RateLimitTier.FREE: TierLimits(
requests_per_minute=10,
tokens_per_minute=10_000,
max_prompt_tokens=2048,
max_completion_tokens=512,
max_concurrent=2,
),
RateLimitTier.BASIC: TierLimits(
requests_per_minute=60,
tokens_per_minute=100_000,
max_prompt_tokens=4096,
max_completion_tokens=2048,
max_concurrent=10,
),
RateLimitTier.PRO: TierLimits(
requests_per_minute=300,
tokens_per_minute=500_000,
max_prompt_tokens=8192,
max_completion_tokens=4096,
max_concurrent=50,
),
RateLimitTier.ENTERPRISE: TierLimits(
requests_per_minute=1000,
tokens_per_minute=2_000_000,
max_prompt_tokens=32768,
max_completion_tokens=8192,
max_concurrent=200,
),
}
class TieredRateLimiter:
"""Token-aware rate limiter for AI APIs."""
def __init__(self):
self._request_counts: Dict[str, list] = defaultdict(list)
self._token_counts: Dict[str, list] = defaultdict(list)
self._active_requests: Dict[str, int] = defaultdict(int)
def check_limit(
self,
client_id: str,
tier: RateLimitTier,
prompt_tokens: int,
max_completion_tokens: int,
) -> Tuple[bool, Optional[str]]:
"""Check if a request should be allowed."""
limits = TIER_CONFIG[tier]
now = time.time()
# Clean old entries
self._request_counts[client_id] = [
ts for ts in self._request_counts[client_id] if now - ts < 60
]
self._token_counts[client_id] = [
(ts, tokens) for ts, tokens in self._token_counts[client_id]
if now - ts < 60
]
# Check request rate
if len(self._request_counts[client_id]) >= limits.requests_per_minute:
return False, "request_rate_exceeded"
# Check token rate
recent_tokens = sum(
tokens for _, tokens in self._token_counts[client_id]
)
estimated_tokens = prompt_tokens + max_completion_tokens
if recent_tokens + estimated_tokens > limits.tokens_per_minute:
return False, "token_rate_exceeded"
# Check prompt size
if prompt_tokens > limits.max_prompt_tokens:
return False, "prompt_too_large"
# Check completion size
if max_completion_tokens > limits.max_completion_tokens:
return False, "completion_too_large"
# Check concurrent requests
if self._active_requests[client_id] >= limits.max_concurrent:
return False, "concurrent_limit_exceeded"
# Allow and record
self._request_counts[client_id].append(now)
self._token_counts[client_id].append((now, estimated_tokens))
self._active_requests[client_id] += 1
return True, None
def release_request(self, client_id: str) -> None:
"""Release a concurrent request slot."""
self._active_requests[client_id] = max(
0, self._active_requests[client_id] - 1
)
Response Strategies
Graduated Response
Rather than immediately blocking suspicious clients, implement graduated responses:
from enum import Enum
from typing import Dict, Optional
from datetime import datetime, timezone, timedelta
class ResponseAction(Enum):
ALLOW = "allow"
RATE_LIMIT = "rate_limit"
CHALLENGE = "challenge" # CAPTCHA or proof-of-work
THROTTLE = "throttle" # Slow down responses
DEGRADE = "degrade" # Return less detailed responses
BLOCK = "block"
class GraduatedResponsePolicy:
"""Implement graduated responses to suspected abuse."""
def __init__(self):
self._client_strikes: Dict[str, int] = {}
self._client_cooldowns: Dict[str, datetime] = {}
def determine_action(
self,
client_id: str,
abuse_score: float,
abuse_type: Optional[str] = None,
) -> Dict:
"""Determine the appropriate response action."""
strikes = self._client_strikes.get(client_id, 0)
# Check if client is in cooldown
cooldown = self._client_cooldowns.get(client_id)
if cooldown and datetime.now(timezone.utc) < cooldown:
return {
"action": ResponseAction.BLOCK,
"reason": "client_in_cooldown",
"cooldown_remaining": (cooldown - datetime.now(timezone.utc)).seconds,
}
# Graduated response based on abuse score and history
if abuse_score < 0.3:
return {"action": ResponseAction.ALLOW}
elif abuse_score < 0.5:
self._client_strikes[client_id] = strikes + 1
return {
"action": ResponseAction.RATE_LIMIT,
"new_rate_limit": max(5, 60 - strikes * 10),
"reason": f"elevated_abuse_score ({abuse_score:.2f})",
}
elif abuse_score < 0.7:
self._client_strikes[client_id] = strikes + 2
if abuse_type == "model_extraction":
return {
"action": ResponseAction.DEGRADE,
"degradation": "reduce_output_detail",
"reason": "suspected_model_extraction",
}
return {
"action": ResponseAction.CHALLENGE,
"challenge_type": "proof_of_work",
"reason": f"high_abuse_score ({abuse_score:.2f})",
}
else:
# High abuse score — block with cooldown
cooldown_minutes = min(60, 5 * (strikes + 1))
self._client_cooldowns[client_id] = (
datetime.now(timezone.utc) + timedelta(minutes=cooldown_minutes)
)
self._client_strikes[client_id] = strikes + 3
return {
"action": ResponseAction.BLOCK,
"cooldown_minutes": cooldown_minutes,
"reason": f"critical_abuse_score ({abuse_score:.2f}), type: {abuse_type}",
}
Defensive Recommendations
- Implement token-aware rate limiting — not just request counts, but total tokens consumed per time window
- Profile client behavior and detect anomalous patterns that indicate extraction or probing
- Use output watermarking to detect model theft after the fact
- Monitor for prompt extraction patterns in user messages
- Apply graduated response policies rather than binary allow/block decisions
- Log all API requests with sufficient detail for forensic analysis (but be careful about storing prompt content due to PII)
- Require authentication for all AI API endpoints — anonymous access enables unattributable abuse
- Set per-request limits on prompt length, max_tokens, and number of completions
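The recommendations above compose into a single admission pipeline: per-request limits first, then token-aware rate checks, then a graduated decision on the behavioral abuse score. The sketch below is a simplified, self-contained illustration — the class name, limits, and score thresholds are invented for the example, not taken from any real deployment.

```python
import time
from collections import defaultdict

# Illustrative limits (hypothetical values for a single tier).
RPM_LIMIT = 60
TOKENS_PER_MINUTE = 100_000
MAX_PROMPT_TOKENS = 4096

class AbuseGate:
    """Minimal admission pipeline: static limits -> rate limits -> abuse score."""

    def __init__(self):
        self._events = defaultdict(list)  # client_id -> [(timestamp, tokens)]

    def admit(self, client_id: str, prompt_tokens: int, abuse_score: float) -> str:
        now = time.time()
        # Keep only the last 60 seconds of history for this client
        window = [(ts, tk) for ts, tk in self._events[client_id] if now - ts < 60]
        self._events[client_id] = window

        # 1. Per-request limits (cheapest check first)
        if prompt_tokens > MAX_PROMPT_TOKENS:
            return "reject:prompt_too_large"
        # 2. Token-aware rate limits
        if len(window) >= RPM_LIMIT:
            return "reject:request_rate"
        if sum(tk for _, tk in window) + prompt_tokens > TOKENS_PER_MINUTE:
            return "reject:token_rate"
        # 3. Graduated response driven by the behavioral abuse score
        if abuse_score >= 0.7:
            return "block"
        if abuse_score >= 0.4:
            return "challenge"
        self._events[client_id].append((now, prompt_tokens))
        return "allow"

gate = AbuseGate()
print(gate.admit("client-1", prompt_tokens=512, abuse_score=0.1))   # allow
print(gate.admit("client-1", prompt_tokens=8192, abuse_score=0.1))  # reject:prompt_too_large
print(gate.admit("client-1", prompt_tokens=512, abuse_score=0.8))   # block
```

Ordering the checks this way keeps the expensive behavioral scoring off the hot path for requests that static limits would reject anyway.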
References
- Tramer et al. — "Stealing Machine Learning Models via Prediction APIs" (USENIX Security 2016) — model extraction attacks
- Kirchenbauer et al. — "A Watermark for Large Language Models" (ICML 2023) — LLM output watermarking
- OWASP LLM Top 10 2025 — LLM10 (Unbounded Consumption)
- MITRE ATLAS — AML.T0044 (Full ML Model Access), AML.T0024 (Exfiltration via ML Inference API)
- Carlini et al. — "Extracting Training Data from Large Language Models" (USENIX Security 2021) — training data extraction