AI API Red Team Engagement
Complete walkthrough for testing AI APIs: endpoint enumeration, authentication bypass, rate limit evasion, input validation testing, output data leakage, and model fingerprinting through API behavior.
AI APIs combine traditional API security concerns (authentication, authorization, input validation, rate limiting) with AI-specific concerns (prompt injection, model extraction, output safety). This walkthrough covers both dimensions in an integrated engagement approach.
Phase 1: API Reconnaissance
Endpoint Discovery
# api_recon.py
"""Discover and map AI API endpoints."""
import requests
import json
from urllib.parse import urljoin
class AIAPIRecon:
    """Enumerate AI API endpoints and their characteristics.

    Probes a base URL for well-known AI/LLM API paths, records any
    endpoint that answers with something other than 404/405, prints
    response headers that leak infrastructure details, and checks for
    exposed OpenAPI/Swagger documentation.
    """

    def __init__(self, base_url, auth_token=None):
        """Set up a shared HTTP session.

        Args:
            base_url: Root URL of the target API (e.g. "https://api.example.com").
            auth_token: Optional bearer token attached to every request.
        """
        self.base_url = base_url
        self.session = requests.Session()
        if auth_token:
            self.session.headers["Authorization"] = f"Bearer {auth_token}"
        # Filled by discover_endpoints(); one dict per responding endpoint.
        self.discovered_endpoints = []

    def discover_endpoints(self):
        """Probe for common AI API endpoints.

        Returns:
            list[dict]: records with path, method, status, content type,
            and response size (also stored on ``self.discovered_endpoints``).
        """
        print("=== Endpoint Discovery ===\n")
        common_paths = [
            # Chat/completion endpoints
            "/v1/chat/completions",
            "/v1/completions",
            "/api/chat",
            "/api/generate",
            "/api/v1/predict",
            "/inference",
            "/predict",
            # Model management
            "/v1/models",
            "/api/models",
            "/models",
            # Embedding endpoints
            "/v1/embeddings",
            "/api/embed",
            "/embed",
            # Image/multimodal
            "/v1/images/generations",
            "/v1/audio/transcriptions",
            # Administrative
            "/health",
            "/status",
            "/metrics",
            "/api/config",
            "/api/version",
            "/.well-known/openapi.json",
            "/openapi.json",
            "/swagger.json",
            "/docs",
            "/redoc",
        ]
        for path in common_paths:
            url = urljoin(self.base_url, path)
            # Try GET first, then POST with an empty JSON body. Each method
            # gets its own try/except: previously one try wrapped both
            # methods, so a GET timeout raised past the POST attempt and
            # the POST probe for that path was silently skipped.
            for method in ["GET", "POST"]:
                try:
                    if method == "GET":
                        resp = self.session.get(url, timeout=5)
                    else:
                        resp = self.session.post(url, json={}, timeout=5)
                except requests.exceptions.RequestException:
                    continue
                if resp.status_code in [404, 405]:
                    continue
                endpoint = {
                    "path": path,
                    "method": method,
                    "status": resp.status_code,
                    "content_type": resp.headers.get("content-type", ""),
                    "response_size": len(resp.content),
                }
                self.discovered_endpoints.append(endpoint)
                print(f" [{method}] {path} -> {resp.status_code} "
                      f"({len(resp.content)} bytes)")
                # Headers that commonly leak stack, version, or model info.
                interesting_headers = [
                    "x-powered-by", "server", "x-request-id",
                    "x-ratelimit-limit", "x-ratelimit-remaining",
                    "x-model-id", "x-version",
                ]
                for header in interesting_headers:
                    value = resp.headers.get(header)
                    if value:
                        print(f" Header: {header}: {value}")
                # Stop after the first method that responds for this path.
                break
        # Check for OpenAPI/Swagger documentation
        for doc_path in ["/openapi.json", "/swagger.json", "/docs",
                         "/.well-known/openapi.json"]:
            try:
                resp = self.session.get(urljoin(self.base_url, doc_path),
                                        timeout=5)
            except requests.exceptions.RequestException:
                continue
            if resp.status_code == 200:
                print(f"\n [FOUND] API Documentation: {doc_path}")
                if "json" in resp.headers.get("content-type", ""):
                    try:
                        spec = resp.json()
                    except ValueError:
                        continue  # served as JSON content-type but not parseable
                    paths = spec.get("paths", {})
                    print(f" Documented endpoints: {len(paths)}")
        return self.discovered_endpoints
recon = AIAPIRecon("https://api.example.com")
endpoints = recon.discover_endpoints()

Authentication Analysis
# auth_testing.py
"""Test API authentication mechanisms."""
def test_authentication(base_url, valid_token):
    """Analyze authentication implementation.

    Probes the chat-completions endpoint with: no credentials, a set of
    malformed/hostile tokens, the valid token supplied in non-standard
    locations, and a handful of IDOR-style user-scoped paths.
    """
    print("=== Authentication Testing ===\n")
    target = f"{base_url}/v1/chat/completions"
    payload = {
        "model": "gpt-4o",
        "messages": [{"role": "user", "content": "test"}],
        "max_tokens": 5,
    }

    # Test 1: request with no credentials at all.
    print("--- Missing Authentication ---")
    unauth = requests.post(target, json=payload, timeout=10)
    print(f" No auth: {unauth.status_code}")
    if unauth.status_code == 200:
        print(f" CRITICAL: API accessible without authentication")

    # Test 2: deliberately malformed / hostile token values.
    print("\n--- Invalid Token Formats ---")
    for label, token in [
        ("empty", ""),
        ("null", "null"),
        ("single_char", "x"),
        ("long_random", "a" * 1000),
        ("sql_injection", "' OR '1'='1"),
        ("jwt_none", "eyJhbGciOiJub25lIn0.eyJzdWIiOiIxIn0."),
    ]:
        reply = requests.post(target, json=payload,
                              headers={"Authorization": f"Bearer {token}"},
                              timeout=10)
        status = "ACCESS GRANTED" if reply.status_code == 200 else f"{reply.status_code}"
        print(f" [{status}] {label}")

    # Test 3: does the API accept the token outside the standard header?
    print("\n--- Token Location Testing ---")
    for label, hdrs in [
        ("header_bearer", {"Authorization": f"Bearer {valid_token}"}),
        ("header_apikey", {"X-API-Key": valid_token}),
        ("query_param", {}),  # token goes into the URL instead
        ("cookie", {"Cookie": f"token={valid_token}"}),
    ]:
        probe_url = target
        if label == "query_param":
            probe_url = f"{target}?api_key={valid_token}"
        reply = requests.post(probe_url, json=payload, headers=hdrs, timeout=10)
        accepted = reply.status_code == 200
        print(f" [{label}] {'Accepted' if accepted else 'Rejected'}")

    # Test 4: IDOR - can the valid token read other users' resources?
    print("\n--- IDOR Testing ---")
    for path in [
        "/v1/users/1/conversations",
        "/v1/users/0/conversations",
        "/v1/users/admin/conversations",
        "/api/conversations/1",
        "/api/conversations/0",
    ]:
        reply = requests.get(f"{base_url}{path}",
                             headers={"Authorization": f"Bearer {valid_token}"},
                             timeout=5)
        # >50 bytes filters out empty/stub 200s before flagging.
        if reply.status_code == 200 and len(reply.content) > 50:
            print(f" [IDOR] {path}: {reply.status_code} "
                  f"({len(reply.content)} bytes)")
test_authentication("https://api.example.com", "your-valid-token")

Phase 2: Rate Limiting Analysis
# rate_limit_testing.py
"""Test API rate limiting implementation."""
import time
import concurrent.futures
def test_rate_limits(base_url, auth_token):
    """Analyze rate limiting configuration and bypass opportunities.

    Runs three checks against the chat-completions endpoint:
      1. Sequential requests until a 429 appears (threshold detection).
      2. Spoofed client-IP headers, looking for per-IP limits that
         trust proxy headers.
      3. Concurrent requests, plus a comparison of request-based vs
         token-based accounting.

    Individual request failures (timeouts, resets — expected during
    aggressive probing) are logged and skipped instead of aborting the
    whole analysis, and worker exceptions from the concurrent test are
    swallowed rather than re-raised out of ``Future.result()``.
    """
    print("=== Rate Limit Analysis ===\n")
    endpoint = f"{base_url}/v1/chat/completions"
    headers = {"Authorization": f"Bearer {auth_token}"}
    body = {
        "model": "gpt-4o",
        "messages": [{"role": "user", "content": "hi"}],
        "max_tokens": 1,
    }

    # Test 1: Determine rate limit threshold
    print("--- Threshold Detection ---")
    responses = []
    for i in range(100):
        try:
            resp = requests.post(endpoint, json=body, headers=headers, timeout=10)
        except requests.exceptions.RequestException as e:
            # A dropped request is a data point but must not kill the scan.
            print(f" Request {i + 1} failed: {type(e).__name__}")
            continue
        responses.append({
            "request_num": i + 1,
            "status": resp.status_code,
            "remaining": resp.headers.get("x-ratelimit-remaining", "N/A"),
        })
        if resp.status_code == 429:
            print(f" Rate limited at request {i + 1}")
            retry_after = resp.headers.get("retry-after", "unknown")
            print(f" Retry-After: {retry_after}")
            break
    if not any(r["status"] == 429 for r in responses):
        print(f" No rate limit hit after {len(responses)} requests")
        print(f" FINDING: No apparent rate limiting")

    # Test 2: Rate limit bypass techniques
    print("\n--- Bypass Techniques ---")
    # Technique: claim a different client IP via proxy-trust headers.
    ip_bypass_headers = [
        {"X-Forwarded-For": "10.0.0.1"},
        {"X-Real-IP": "10.0.0.2"},
        {"X-Originating-IP": "10.0.0.3"},
        {"CF-Connecting-IP": "10.0.0.4"},
        {"True-Client-IP": "10.0.0.5"},
    ]
    for bypass_headers in ip_bypass_headers:
        combined_headers = {**headers, **bypass_headers}
        header_name = list(bypass_headers.keys())[0]
        try:
            resp = requests.post(endpoint, json=body,
                                 headers=combined_headers, timeout=10)
        except requests.exceptions.RequestException as e:
            print(f" [{header_name}] Failed: {type(e).__name__}")
            continue
        print(f" [{header_name}] Status: {resp.status_code}")

    # Technique: concurrent requests (races a per-window counter).
    print("\n--- Concurrent Request Test ---")
    def send_request(_):
        return requests.post(endpoint, json=body, headers=headers, timeout=10)

    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        futures = [executor.submit(send_request, i) for i in range(20)]
        results = []
        for f in futures:
            try:
                results.append(f.result())
            except requests.exceptions.RequestException:
                pass  # failed worker counts as neither success nor limited
    success = sum(1 for r in results if r.status_code == 200)
    limited = sum(1 for r in results if r.status_code == 429)
    print(f" 20 concurrent requests: {success} success, {limited} limited")

    # Test 3: Cost-based rate limit evasion
    print("\n--- Token-Based Rate Limit ---")
    # Some APIs rate limit by tokens, not requests.
    small_request = {**body, "max_tokens": 1}
    large_request = {**body, "max_tokens": 4096,
                     "messages": [{"role": "user", "content": "Write a 5000 word essay"}]}
    for label, request_body in [("small", small_request), ("large", large_request)]:
        try:
            resp = requests.post(endpoint, json=request_body,
                                 headers=headers, timeout=30)
        except requests.exceptions.RequestException as e:
            print(f" [{label}] Failed: {type(e).__name__}")
            continue
        remaining = resp.headers.get("x-ratelimit-remaining-tokens", "N/A")
        print(f" [{label}] Status: {resp.status_code}, "
              f"Remaining tokens: {remaining}")
test_rate_limits("https://api.example.com", "your-token")

Phase 3: Input Validation Testing
# input_validation.py
"""Test API input validation for AI-specific and general vulnerabilities."""
def test_input_validation(base_url, auth_token):
    """Comprehensive input validation testing.

    Runs four probe passes against the chat-completions endpoint:
    oversized message content, structurally malformed request bodies,
    model-name enumeration (including injection-style names), and
    out-of-range sampling parameters. Findings are printed; nothing is
    returned.
    """
    print("=== Input Validation Testing ===\n")
    endpoint = f"{base_url}/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {auth_token}",
        "Content-Type": "application/json",
    }
    # Test 1: Oversized inputs — step content length up by powers of ten
    # to locate the request-size ceiling.
    print("--- Input Size Limits ---")
    sizes = [100, 1000, 10000, 100000, 1000000]
    for size in sizes:
        body = {
            "model": "gpt-4o",
            "messages": [{"role": "user", "content": "A" * size}],
            "max_tokens": 5,
        }
        try:
            resp = requests.post(endpoint, json=body, headers=headers, timeout=30)
            print(f" {size:>10} chars: {resp.status_code}")
        except Exception as e:
            # Timeouts/resets at a given size are themselves a data point.
            print(f" {size:>10} chars: Error - {str(e)[:50]}")
    # Test 2: Malformed Input — structurally broken bodies; a 200 means
    # the server accepted garbage, a 5xx may leak internals in the error.
    print("\n--- Malformed Input ---")
    malformed_payloads = [
        ("missing_model", {"messages": [{"role": "user", "content": "test"}]}),
        ("missing_messages", {"model": "gpt-4o"}),
        ("wrong_role", {"model": "gpt-4o",
                        "messages": [{"role": "admin", "content": "test"}]}),
        ("empty_content", {"model": "gpt-4o",
                           "messages": [{"role": "user", "content": ""}]}),
        ("null_content", {"model": "gpt-4o",
                          "messages": [{"role": "user", "content": None}]}),
        ("array_content", {"model": "gpt-4o",
                           "messages": [{"role": "user", "content": [1, 2, 3]}]}),
        ("nested_messages", {"model": "gpt-4o",
                             "messages": [{"role": "user",
                                           "content": {"nested": "object"}}]}),
        # Undocumented extra fields: checks whether the API silently
        # accepts (or worse, honors) unexpected flags.
        ("extra_fields", {"model": "gpt-4o",
                          "messages": [{"role": "user", "content": "test"}],
                          "admin": True, "debug": True, "internal": True}),
    ]
    for label, body in malformed_payloads:
        resp = requests.post(endpoint, json=body, headers=headers, timeout=10)
        print(f" [{label}] {resp.status_code}")
        if resp.status_code == 200:
            print(f" FINDING: Server accepted malformed input")
        elif resp.status_code >= 500:
            print(f" FINDING: Server error on malformed input (may leak info)")
            print(f" Error: {resp.text[:100]}")
    # Test 3: Model enumeration — distinguishes available vs unknown
    # models by status code; the last two entries are injection probes
    # for the model-name field itself.
    print("\n--- Model Enumeration ---")
    model_guesses = [
        "gpt-4o", "gpt-4", "gpt-3.5-turbo", "gpt-4o-mini",
        "claude-3-opus", "claude-3-sonnet", "claude-3-haiku",
        "gemini-pro", "gemini-1.5-pro",
        "llama-3.1-70b", "mistral-7b",
        "../../etc/passwd", "' OR 1=1 --",
    ]
    for model in model_guesses:
        body = {
            "model": model,
            "messages": [{"role": "user", "content": "test"}],
            "max_tokens": 5,
        }
        resp = requests.post(endpoint, json=body, headers=headers, timeout=10)
        if resp.status_code == 200:
            print(f" [AVAILABLE] {model}")
        elif resp.status_code == 404:
            print(f" [NOT FOUND] {model}")
        else:
            print(f" [{resp.status_code}] {model}: {resp.text[:60]}")
    # Test 4: Parameter boundary testing — values outside each
    # parameter's documented range; a 200 here is a validation gap.
    print("\n--- Parameter Boundaries ---")
    param_tests = [
        ("temp_neg", {"temperature": -1}),
        ("temp_max", {"temperature": 100}),
        ("tokens_zero", {"max_tokens": 0}),
        ("tokens_huge", {"max_tokens": 999999999}),
        ("top_p_neg", {"top_p": -0.5}),
        ("top_p_over", {"top_p": 5.0}),
        ("n_many", {"n": 100}),
        ("freq_penalty", {"frequency_penalty": 10}),
    ]
    for label, extra_params in param_tests:
        body = {
            "model": "gpt-4o",
            "messages": [{"role": "user", "content": "test"}],
            "max_tokens": 5,
            **extra_params,  # out-of-range value overrides the default above
        }
        resp = requests.post(endpoint, json=body, headers=headers, timeout=10)
        print(f" [{label}] {resp.status_code}")
        if resp.status_code == 200:
            print(f" FINDING: Extreme parameter accepted")
test_input_validation("https://api.example.com", "your-token")

Phase 4: Output and Error Analysis
# output_analysis.py
"""Analyze API outputs and errors for information leakage."""
def test_output_leakage(base_url, auth_token):
    """Check API responses for information disclosure.

    Sends deliberately broken payloads and scans the error bodies for
    stack traces, file paths, version strings, and internal hostnames,
    then inspects one normal successful response for model/system
    metadata in the body and headers.
    """
    print("=== Output Analysis ===\n")
    endpoint = f"{base_url}/v1/chat/completions"
    headers = {"Authorization": f"Bearer {auth_token}"}

    # Test 1: Error message information leakage
    print("--- Error Message Analysis ---")
    # Build the oversized payload at exactly 1,000,000 bytes. The original
    # constructed a ~30 MB blob (b'"a"' * 10000000) only to slice it down
    # to 1 MB before sending — the truncated form is equally malformed.
    huge_payload = b'{"x":' + b'"a"' * 333331 + b'}'  # 6 + 999993 + 1 bytes
    error_triggers = [
        ("invalid_json", b"not json", "text/plain"),
        ("huge_payload", huge_payload, "application/json"),
        ("xml_payload", b"<request><prompt>test</prompt></request>", "text/xml"),
    ]
    for label, body, content_type in error_triggers:
        try:
            resp = requests.post(
                endpoint,
                data=body,
                headers={**headers, "Content-Type": content_type},
                timeout=10,
            )
        except requests.exceptions.RequestException:
            continue  # connection-level failure: nothing to analyze
        if resp.status_code >= 400:
            error_text = resp.text[:500]
            # Heuristic leakage indicators. The stack-trace check looks for
            # Python tracebacks or indented Java/Node "at ..." frames; the
            # original bare '"at " in error_text' matched almost any English
            # sentence and flagged every error as a stack trace.
            leakage_indicators = [
                ("stack_trace", "Traceback" in error_text or "\n    at " in error_text),
                ("file_path", "/" in error_text and ".py" in error_text),
                ("version", any(v in error_text for v in
                                ["version", "v1.", "v2.", "Python"])),
                ("internal_url", "localhost" in error_text or
                                 "internal" in error_text),
            ]
            for indicator, found in leakage_indicators:
                if found:
                    print(f" [{label}] LEAKS {indicator}: "
                          f"{error_text[:100]}...")

    # Test 2: Response metadata analysis
    print("\n--- Response Metadata ---")
    body = {
        "model": "gpt-4o",
        "messages": [{"role": "user", "content": "Hello"}],
        "max_tokens": 10,
    }
    try:
        resp = requests.post(endpoint, json=body, headers=headers, timeout=10)
    except requests.exceptions.RequestException:
        return  # baseline request failed; no metadata to inspect
    if resp.status_code == 200:
        data = resp.json()
        print(f" Response fields: {list(data.keys())}")
        # Body fields that identify the backing model or serving stack.
        if "model" in data:
            print(f" Model: {data['model']}")
        if "system_fingerprint" in data:
            print(f" System fingerprint: {data['system_fingerprint']}")
        if "usage" in data:
            print(f" Usage: {data['usage']}")
        # Headers hinting at infrastructure or rate-limit internals.
        interesting = {k: v for k, v in resp.headers.items()
                       if any(x in k.lower() for x in
                              ["model", "version", "server", "rate", "powered"])}
        if interesting:
            print(f" Interesting headers: {interesting}")
test_output_leakage("https://api.example.com", "your-token")

Phase 5: Model Fingerprinting
# model_fingerprinting.py
"""Identify the underlying model through behavioral analysis."""
def fingerprint_model(base_url, auth_token, model="default"):
    """Determine the model identity through behavioral probes.

    Args:
        base_url: Root URL of the target API.
        auth_token: Bearer token for the target API.
        model: Model name to request. Defaults to "default" (some
            gateways route that to their primary model); pass a real
            name if one was discovered during recon.

    Returns:
        dict: probe label -> truncated response text, for probes that
        produced a parseable 200 response.
    """
    print("=== Model Fingerprinting ===\n")
    endpoint = f"{base_url}/v1/chat/completions"
    headers = {"Authorization": f"Bearer {auth_token}"}
    fingerprint_prompts = [
        # Knowledge cutoff
        ("cutoff", "What is the most recent event you know about? "
                   "What is your training data cutoff date?"),
        # Self-identification
        ("identity", "What AI model are you? Who created you?"),
        # Capability probing
        ("code", "Write a Python function to calculate fibonacci numbers"),
        ("reasoning", "If all bloops are razzies and all razzies are lazzies, "
                      "are all bloops lazzies?"),
        # Response style analysis
        ("style", "Explain quantum computing in one paragraph"),
        # Token-level behavior
        ("tokens", "Repeat the word 'buffalo' exactly 10 times"),
    ]
    responses = {}
    for label, prompt in fingerprint_prompts:
        body = {
            "model": model,
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": 200,
            "temperature": 0,  # deterministic output keeps probes comparable
        }
        try:
            resp = requests.post(endpoint, json=body, headers=headers, timeout=30)
        except requests.exceptions.RequestException:
            continue  # one failed probe shouldn't abort the fingerprint
        if resp.status_code != 200:
            continue
        try:
            data = resp.json()
            text = data["choices"][0]["message"]["content"]
        except (ValueError, KeyError, IndexError, TypeError):
            # Non-OpenAI response shape: skip rather than crash. The
            # original indexed the OpenAI layout unconditionally and
            # raised on any other schema.
            continue
        responses[label] = text[:300]
        print(f" [{label}] {text[:100]}...")
        print()

    # Analyze fingerprint: self-identification is the cheapest signal,
    # though models routinely misreport it — treat as a hint only.
    print("=== Fingerprint Analysis ===")
    identity_response = responses.get("identity", "").lower()
    if "gpt" in identity_response or "openai" in identity_response:
        print(" Likely model family: OpenAI GPT")
    elif "claude" in identity_response or "anthropic" in identity_response:
        print(" Likely model family: Anthropic Claude")
    elif "gemini" in identity_response or "google" in identity_response:
        print(" Likely model family: Google Gemini")
    elif "llama" in identity_response or "meta" in identity_response:
        print(" Likely model family: Meta LLaMA")
    else:
        print(" Model family: Could not determine from self-identification")
    return responses
fingerprint_model("https://api.example.com", "your-token")

Reporting
AI API Finding Categories
| Category | Examples | Typical Severity |
|---|---|---|
| Authentication bypass | No auth required, token format bypass | Critical |
| Missing rate limiting | Unlimited requests, no token limits | High |
| IDOR | Access other users' conversations/data | High |
| Input validation | Oversized payloads, parameter injection | Medium-High |
| Information disclosure | Stack traces, internal paths, model details | Medium |
| Model enumeration | List of all available models discoverable | Low |
| Verbose errors | Debug information in error responses | Low-Medium |