AI-API-misbruik detecteren
Het detecteren en mitigeren van API-misbruikpatronen gericht op AI-inference-endpoints, waaronder prompt-extractie en modeldiefstal.
Overzicht
AI-API's zijn onderhevig aan misbruikpatronen die fundamenteel verschillen van traditioneel misbruik van web-API's. Waar conventionele API's te maken hebben met credential stuffing, scraping en volumetrische DDoS, hebben AI-endpoints te maken met modelextractie-aanvallen (systematisch queryen om het model te repliceren), diefstal van de system prompt (inputs maken om de verborgen system prompt te extraheren), extractie van trainingsdata (het model prompten om gememoriseerde data te reproduceren) en het aftasten met adversariële input (op grote schaal testen op veiligheidsbypasses).
Deze misbruikpatronen zijn moeilijker te detecteren omdat ze eruit kunnen zien als legitiem gebruik. Een modelextractie-aanval stuurt normaal ogende inference-verzoeken — alleen veel ervan, met zorgvuldig gekozen inputs. Een poging tot prompt-extractie is een enkel chatbericht. Het onderscheid tussen misbruik en legitiem gebruik hangt vaak af van de intentie, die niet rechtstreeks uit het API-verkeer kan worden waargenomen.
Dit artikel behandelt de misbruiktaxonomie die specifiek is voor AI-API's, biedt detectietechnieken voor elke misbruikklasse en presenteert een gelaagde verdedigingsarchitectuur. De inhoud sluit aan op OWASP LLM Top 10 2025 LLM10 (Unbounded Consumption) en MITRE ATLAS AML.T0044 (Full ML Model Access).
Taxonomie van AI-API-misbruik
Misbruikcategorieën
| Categorie | Doel | Detectiemoeilijkheid | Impact |
|---|---|---|---|
| Modelextractie | Het gedrag van het model repliceren | Moeilijk — lijkt op normaal gebruik | IP-diefstal, concurrentievoordeel |
| Diefstal van system prompt | Verborgen systeeminstructies extraheren | Gemiddeld — ongebruikelijke promptpatronen | Onthult beveiligingscontroles |
| Extractie van trainingsdata | Gememoriseerde trainingsdata extraheren | Moeilijk — normaal ogende queries | Privacyschending, datalek |
| Aftasten van veiligheidsbypass | Inputs vinden die veiligheidsfilters omzeilen | Gemiddeld — hoog faalpercentage | Maakt generatie van schadelijke content mogelijk |
| Resource-uitputting | GPU-/computeresources uitputten | Eenvoudig — hoog volume/grote verzoeken | Servicedegradatie, kosteninflatie |
| Misbruik van credentials | Gestolen API-sleutels op grote schaal gebruiken | Eenvoudig — afwijkende toegangspatronen | Ongeautoriseerd gebruik, factureringsfraude |
Detectiesignalen
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
from datetime import datetime, timezone, timedelta
from collections import defaultdict
import math
@dataclass
class RequestFeatures:
"""Kenmerken geëxtraheerd uit een AI-API-verzoek voor misbruikdetectie."""
client_id: str
timestamp: datetime
endpoint: str
prompt_length: int
max_tokens: int
temperature: float
response_length: int
latency_ms: float
status_code: int
prompt_hash: str # Hash van de prompt voor deduplicatie
source_ip: str
user_agent: str
@dataclass
class ClientProfile:
"""Gedragsprofiel voor een API-client."""
client_id: str
total_requests: int = 0
total_tokens_in: int = 0
total_tokens_out: int = 0
unique_prompts: int = 0
avg_prompt_length: float = 0
avg_max_tokens: float = 0
avg_temperature: float = 0
request_timestamps: List[float] = field(default_factory=list)
prompt_hashes: set = field(default_factory=set)
error_count: int = 0
def update(self, features: RequestFeatures) -> None:
"""Werk het profiel bij met een nieuw verzoek."""
self.total_requests += 1
self.total_tokens_in += features.prompt_length
self.total_tokens_out += features.response_length
self.request_timestamps.append(features.timestamp.timestamp())
if features.prompt_hash not in self.prompt_hashes:
self.unique_prompts += 1
self.prompt_hashes.add(features.prompt_hash)
if features.status_code >= 400:
self.error_count += 1
# Lopende gemiddelden
n = self.total_requests
self.avg_prompt_length = (
self.avg_prompt_length * (n - 1) + features.prompt_length
) / n
self.avg_max_tokens = (
self.avg_max_tokens * (n - 1) + features.max_tokens
) / n
self.avg_temperature = (
self.avg_temperature * (n - 1) + features.temperature
) / n
def get_request_rate(self, window_seconds: int = 60) -> float:
"""Bereken de verzoeksnelheid over een schuivend venster."""
now = datetime.now(timezone.utc).timestamp()
cutoff = now - window_seconds
recent = [ts for ts in self.request_timestamps if ts > cutoff]
return len(recent) / (window_seconds / 60) # verzoeken per minuut
def get_burst_score(self) -> float:
"""
Bereken de burstiness van verzoeken.
Hoge burstiness suggereert geautomatiseerde toegang.
"""
if len(self.request_timestamps) < 3:
return 0.0
intervals = [
self.request_timestamps[i + 1] - self.request_timestamps[i]
for i in range(len(self.request_timestamps) - 1)
]
if not intervals:
return 0.0
mean_interval = sum(intervals) / len(intervals)
if mean_interval == 0:
return 1.0
variance = sum((i - mean_interval) ** 2 for i in intervals) / len(intervals)
cv = math.sqrt(variance) / mean_interval # Variatiecoëfficiënt
# Lage CV = regelmatige intervallen (bot-achtig), hoge CV = onregelmatig (mensachtig)
# Inverteer zodat een hogere score = verdachter
return max(0, 1 - cv)Detectie van modelextractie
Hoe modelextractie werkt
Modelextractie-aanvallen queryen een API systematisch om een lokale kopie van het doelmodel te bouwen. De aanvaller stuurt zorgvuldig gekozen inputs en registreert de bijbehorende outputs, en traint vervolgens een lokaal "student"-model op deze input-outputdataset. Tramer et al. demonstreerden dit in "Stealing Machine Learning Models via Prediction APIs" (USENIX Security 2016).
De belangrijkste detectiesignalen voor extractie-aanvallen:
- Hoog queryvolume met diverse, gestructureerde inputs
- Lage temperature-instellingen (deterministische outputs zijn nuttiger voor training)
- Systematische inputpatronen die de inputruimte efficiënt afdekken
- Hoge verhouding verzoeken tot unieke prompts (vergelijkbaar met dataverzameling)
from typing import Dict, List
import hashlib
from collections import Counter
class ModelExtractionDetector:
"""Detecteer pogingen tot modelextractie op basis van API-gebruikspatronen."""
def __init__(
self,
diversity_threshold: float = 0.8,
volume_threshold: int = 1000,
temperature_threshold: float = 0.3,
):
self.diversity_threshold = diversity_threshold
self.volume_threshold = volume_threshold
self.temperature_threshold = temperature_threshold
def analyze_client(self, profile: ClientProfile) -> Dict:
"""Analyseer het gedrag van een client op extractie-indicatoren."""
indicators = []
risk_score = 0.0
# Indicator 1: Hoog volume aan verzoeken
if profile.total_requests > self.volume_threshold:
volume_score = min(1.0, profile.total_requests / (self.volume_threshold * 5))
risk_score += volume_score * 0.25
indicators.append({
"indicator": "high_volume",
"value": profile.total_requests,
"threshold": self.volume_threshold,
"contribution": volume_score * 0.25,
})
# Indicator 2: Hoge promptdiversiteit (de inputruimte verkennen)
if profile.total_requests > 10:
diversity = profile.unique_prompts / profile.total_requests
if diversity > self.diversity_threshold:
risk_score += diversity * 0.25
indicators.append({
"indicator": "high_diversity",
"value": diversity,
"threshold": self.diversity_threshold,
"contribution": diversity * 0.25,
})
# Indicator 3: Lage temperature (zoeken naar deterministische outputs)
if profile.avg_temperature < self.temperature_threshold:
temp_score = 1 - (profile.avg_temperature / self.temperature_threshold)
risk_score += temp_score * 0.2
indicators.append({
"indicator": "low_temperature",
"value": profile.avg_temperature,
"threshold": self.temperature_threshold,
"contribution": temp_score * 0.2,
})
# Indicator 4: Regelmatige timing van verzoeken (geautomatiseerde queries)
burst_score = profile.get_burst_score()
if burst_score > 0.7:
risk_score += burst_score * 0.15
indicators.append({
"indicator": "regular_timing",
"value": burst_score,
"threshold": 0.7,
"contribution": burst_score * 0.15,
})
# Indicator 5: Hoog verbruik van output-tokens
if profile.total_requests > 0:
avg_output = profile.total_tokens_out / profile.total_requests
if avg_output > 500: # Lange outputs aanvragen
output_score = min(1.0, avg_output / 2000)
risk_score += output_score * 0.15
indicators.append({
"indicator": "high_output_tokens",
"value": avg_output,
"contribution": output_score * 0.15,
})
return {
"client_id": profile.client_id,
"risk_score": min(1.0, risk_score),
"classification": (
"likely_extraction" if risk_score > 0.7
else "suspicious" if risk_score > 0.4
else "normal"
),
"indicators": indicators,
"total_requests": profile.total_requests,
}Modelwatermerking
Om modelextractie achteraf te detecteren, kun je watermerken inbedden in de outputs van het model:
import hashlib
from typing import List, Optional
class OutputWatermarker:
"""
Bed detecteerbare watermerken in modeloutputs in.
Gebaseerd op het concept uit Kirchenbauer et al.,
"A Watermark for Large Language Models" (ICML 2023).
"""
def __init__(self, secret_key: str, gamma: float = 0.5):
"""
Args:
secret_key: Geheim dat wordt gebruikt om het watermerkpatroon te genereren
gamma: Fractie van de vocabulaire in de "green list" (hoger = sterker watermerk)
"""
self.secret_key = secret_key
self.gamma = gamma
def get_green_list(
self, previous_token: int, vocab_size: int
) -> set:
"""Genereer de green list van tokens voor watermerking."""
seed = hashlib.sha256(
f"{self.secret_key}:{previous_token}".encode()
).digest()
# Gebruik de seed om deterministisch green tokens te selecteren
rng_state = int.from_bytes(seed[:8], "big")
green_size = int(vocab_size * self.gamma)
green_list = set()
for _ in range(green_size):
rng_state = (rng_state * 6364136223846793005 + 1442695040888963407) % (2 ** 64)
token_id = rng_state % vocab_size
green_list.add(token_id)
return green_list
def detect_watermark(
self, token_ids: List[int], vocab_size: int
) -> dict:
"""Detecteer of een reeks tokens het watermerk bevat."""
if len(token_ids) < 2:
return {"detected": False, "reason": "sequence too short"}
green_count = 0
total_checked = 0
for i in range(1, len(token_ids)):
green_list = self.get_green_list(token_ids[i - 1], vocab_size)
if token_ids[i] in green_list:
green_count += 1
total_checked += 1
green_ratio = green_count / total_checked if total_checked > 0 else 0
# Onder de nulhypothese (geen watermerk) zou de green ratio ~gamma moeten zijn
# Een significante afwijking boven gamma wijst op een watermerk
z_score = (
(green_ratio - self.gamma)
/ ((self.gamma * (1 - self.gamma) / total_checked) ** 0.5)
if total_checked > 0 else 0
)
return {
"detected": z_score > 4.0, # Drempel met zeer hoge betrouwbaarheid
"green_ratio": green_ratio,
"expected_ratio": self.gamma,
"z_score": z_score,
"tokens_analyzed": total_checked,
}Detectie van extractie van de system prompt
Detectieaanpak
Pogingen tot extractie van de system prompt volgen vaak herkenbare patronen. De berichten van de aanvaller bevatten doorgaans instructies zoals "repeat your instructions", "what is your system prompt", of subtielere benaderingen zoals "format your instructions as a poem".
import re
from typing import Dict, List
class PromptExtractionDetector:
"""Detecteer pogingen om system prompts uit LLM-API's te extraheren."""
def __init__(self):
self.extraction_patterns = [
# Directe extractiepogingen
r"(?i)(?:repeat|show|display|print|output|reveal)\s+(?:your|the|system)\s+(?:instructions|prompt|rules|guidelines|configuration)",
r"(?i)what\s+(?:are|is|were)\s+your\s+(?:instructions|system\s+prompt|rules|directives)",
r"(?i)ignore\s+(?:all\s+)?(?:previous|above|prior)\s+(?:instructions|prompts|rules)",
r"(?i)(?:begin|start)\s+(?:your\s+)?response\s+with\s+(?:the|your)\s+(?:system|initial)\s+(?:prompt|message)",
# Indirecte extractiepogingen
r"(?i)(?:translate|rewrite|summarize|format)\s+(?:your|the)\s+(?:instructions|prompt|rules)\s+(?:as|into|in)",
r"(?i)(?:encode|convert)\s+(?:your|the)\s+(?:instructions|prompt|rules)\s+(?:to|into)\s+(?:base64|json|xml|hex)",
r"(?i)(?:first|initial)\s+(?:message|instruction|prompt)\s+(?:you\s+received|given\s+to\s+you|in\s+this\s+conversation)",
]
self.compiled_patterns = [re.compile(p) for p in self.extraction_patterns]
def check_message(self, message: str) -> Dict:
"""Controleer een enkel bericht op indicatoren van prompt-extractie."""
matches = []
for i, pattern in enumerate(self.compiled_patterns):
if pattern.search(message):
matches.append({
"pattern_index": i,
"pattern": self.extraction_patterns[i][:60] + "...",
})
return {
"is_extraction_attempt": len(matches) > 0,
"confidence": min(1.0, len(matches) * 0.4),
"matches": matches,
}
def check_conversation(self, messages: List[Dict[str, str]]) -> Dict:
"""Controleer een volledig gesprek op escalerende extractiepogingen."""
total_score = 0
message_scores = []
for msg in messages:
if msg.get("role") == "user":
result = self.check_message(msg.get("content", ""))
message_scores.append(result)
total_score += result["confidence"]
return {
"conversation_risk": min(1.0, total_score),
"extraction_attempts": sum(
1 for s in message_scores if s["is_extraction_attempt"]
),
"total_user_messages": len(message_scores),
"classification": (
"active_extraction" if total_score > 0.8
else "suspicious" if total_score > 0.3
else "normal"
),
}Gelaagde rate limiting
Implementatie
AI-API's hebben geavanceerdere rate limiting nodig dan eenvoudige limieten op verzoeken-per-seconde. Implementeer gelaagde limieten op basis van resourceverbruik:
import time
from collections import defaultdict
from typing import Dict, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
class RateLimitTier(Enum):
FREE = "free"
BASIC = "basic"
PRO = "pro"
ENTERPRISE = "enterprise"
@dataclass
class TierLimits:
requests_per_minute: int
tokens_per_minute: int
max_prompt_tokens: int
max_completion_tokens: int
max_concurrent: int
TIER_CONFIG = {
RateLimitTier.FREE: TierLimits(
requests_per_minute=10,
tokens_per_minute=10_000,
max_prompt_tokens=2048,
max_completion_tokens=512,
max_concurrent=2,
),
RateLimitTier.BASIC: TierLimits(
requests_per_minute=60,
tokens_per_minute=100_000,
max_prompt_tokens=4096,
max_completion_tokens=2048,
max_concurrent=10,
),
RateLimitTier.PRO: TierLimits(
requests_per_minute=300,
tokens_per_minute=500_000,
max_prompt_tokens=8192,
max_completion_tokens=4096,
max_concurrent=50,
),
RateLimitTier.ENTERPRISE: TierLimits(
requests_per_minute=1000,
tokens_per_minute=2_000_000,
max_prompt_tokens=32768,
max_completion_tokens=8192,
max_concurrent=200,
),
}
class TieredRateLimiter:
"""Token-bewuste rate limiter voor AI-API's."""
def __init__(self):
self._request_counts: Dict[str, list] = defaultdict(list)
self._token_counts: Dict[str, list] = defaultdict(list)
self._active_requests: Dict[str, int] = defaultdict(int)
def check_limit(
self,
client_id: str,
tier: RateLimitTier,
prompt_tokens: int,
max_completion_tokens: int,
) -> Tuple[bool, Optional[str]]:
"""Controleer of een verzoek moet worden toegestaan."""
limits = TIER_CONFIG[tier]
now = time.time()
# Oude items opschonen
self._request_counts[client_id] = [
ts for ts in self._request_counts[client_id] if now - ts < 60
]
self._token_counts[client_id] = [
(ts, tokens) for ts, tokens in self._token_counts[client_id]
if now - ts < 60
]
# Verzoeksnelheid controleren
if len(self._request_counts[client_id]) >= limits.requests_per_minute:
return False, "request_rate_exceeded"
# Tokensnelheid controleren
recent_tokens = sum(
tokens for _, tokens in self._token_counts[client_id]
)
estimated_tokens = prompt_tokens + max_completion_tokens
if recent_tokens + estimated_tokens > limits.tokens_per_minute:
return False, "token_rate_exceeded"
# Promptgrootte controleren
if prompt_tokens > limits.max_prompt_tokens:
return False, "prompt_too_large"
# Completiegrootte controleren
if max_completion_tokens > limits.max_completion_tokens:
return False, "completion_too_large"
# Gelijktijdige verzoeken controleren
if self._active_requests[client_id] >= limits.max_concurrent:
return False, "concurrent_limit_exceeded"
# Toestaan en registreren
self._request_counts[client_id].append(now)
self._token_counts[client_id].append((now, estimated_tokens))
self._active_requests[client_id] += 1
return True, None
def release_request(self, client_id: str) -> None:
"""Geef een slot voor een gelijktijdig verzoek vrij."""
self._active_requests[client_id] = max(
0, self._active_requests[client_id] - 1
)Reactiestrategieën
Geleidelijke reactie
In plaats van verdachte clients onmiddellijk te blokkeren, kun je geleidelijke reacties implementeren:
from enum import Enum
from typing import Dict, Optional
from datetime import datetime, timezone, timedelta
class ResponseAction(Enum):
ALLOW = "allow"
RATE_LIMIT = "rate_limit"
CHALLENGE = "challenge" # CAPTCHA of proof-of-work
THROTTLE = "throttle" # Reacties vertragen
DEGRADE = "degrade" # Minder gedetailleerde reacties teruggeven
BLOCK = "block"
class GraduatedResponsePolicy:
"""Implementeer geleidelijke reacties op vermoed misbruik."""
def __init__(self):
self._client_strikes: Dict[str, int] = {}
self._client_cooldowns: Dict[str, datetime] = {}
def determine_action(
self,
client_id: str,
abuse_score: float,
abuse_type: Optional[str] = None,
) -> Dict:
"""Bepaal de passende reactie-actie."""
strikes = self._client_strikes.get(client_id, 0)
# Controleer of de client in cooldown is
cooldown = self._client_cooldowns.get(client_id)
if cooldown and datetime.now(timezone.utc) < cooldown:
return {
"action": ResponseAction.BLOCK,
"reason": "client_in_cooldown",
"cooldown_remaining": (cooldown - datetime.now(timezone.utc)).seconds,
}
# Geleidelijke reactie op basis van misbruikscore en geschiedenis
if abuse_score < 0.3:
return {"action": ResponseAction.ALLOW}
elif abuse_score < 0.5:
self._client_strikes[client_id] = strikes + 1
return {
"action": ResponseAction.RATE_LIMIT,
"new_rate_limit": max(5, 60 - strikes * 10),
"reason": f"elevated_abuse_score ({abuse_score:.2f})",
}
elif abuse_score < 0.7:
self._client_strikes[client_id] = strikes + 2
if abuse_type == "model_extraction":
return {
"action": ResponseAction.DEGRADE,
"degradation": "reduce_output_detail",
"reason": "suspected_model_extraction",
}
return {
"action": ResponseAction.CHALLENGE,
"challenge_type": "proof_of_work",
"reason": f"high_abuse_score ({abuse_score:.2f})",
}
else:
# Hoge misbruikscore — blokkeren met cooldown
cooldown_minutes = min(60, 5 * (strikes + 1))
self._client_cooldowns[client_id] = (
datetime.now(timezone.utc) + timedelta(minutes=cooldown_minutes)
)
self._client_strikes[client_id] = strikes + 3
return {
"action": ResponseAction.BLOCK,
"cooldown_minutes": cooldown_minutes,
"reason": f"critical_abuse_score ({abuse_score:.2f}), type: {abuse_type}",
}Aanbevelingen voor verdediging
- Implementeer token-bewuste rate limiting — niet alleen aantal verzoeken, maar het totaal aantal verbruikte tokens per tijdvenster
- Profileer clientgedrag en detecteer afwijkende patronen die wijzen op extractie of aftasten
- Gebruik output-watermerking om modeldiefstal achteraf te detecteren
- Monitor op patronen van prompt-extractie in gebruikersberichten
- Pas beleid van geleidelijke reactie toe in plaats van binaire toestaan/blokkeren-beslissingen
- Log alle API-verzoeken met voldoende detail voor forensische analyse (maar wees voorzichtig met het opslaan van promptinhoud vanwege PII)
- Vereis authenticatie voor alle AI-API-endpoints — anonieme toegang maakt niet-toe te rekenen misbruik mogelijk
- Stel limieten per verzoek in op promptlengte, max_tokens en aantal completions
Referenties
- Tramer et al. — "Stealing Machine Learning Models via Prediction APIs" (USENIX Security 2016) — model extraction attacks
- Kirchenbauer et al. — "A Watermark for Large Language Models" (ICML 2023) — LLM output watermarking
- OWASP LLM Top 10 2025 — LLM10 (Unbounded Consumption)
- MITRE ATLAS — AML.T0044 (Full ML Model Access), AML.T0024 (Exfiltration via ML Inference API)
- Carlini et al. — "Extracting Training Data from Large Language Models" (USENIX Security 2021) — training data extraction