Hardening guide for MCP servers: complete deployment security
Advanced | 18 min read | Updated 2026-03-24
A complete hardening guide for MCP server deployments, with a 24-point security checklist, Docker isolation, Nginx reverse proxy configuration, logging and monitoring setup, and network policy enforcement, including working configurations for each part.
This page provides a complete, production-ready hardening guide for MCP server deployments. Every configuration shown is a working example that you can adapt to your own environment. The guide follows a defense-in-depth approach: each layer provides independent protection, and together they are meant to cover the main MCP threat categories (authentication and authorization, input validation, isolation, monitoring, and supply chain).
The 24-point MCP security checklist
# mcp-security-checklist.yaml
# Use this as a deployment gate: all items must pass before going to production
checklist:
  # === Authentication and authorization (items 1-6) ===
  - id: 1
    category: authentication
    item: "HTTP+SSE servers require Bearer token or mTLS authentication"
    severity: critical
    test: "curl -s -o /dev/null -w '%{http_code}' https://mcp-server/sse"
    expected: "401"  # Must reject unauthenticated requests
  - id: 2
    category: authentication
    item: "Token rotation policy enforced (max 90-day lifetime)"
    severity: high
    test: "Check token expiry dates in token store"
  - id: 3
    category: authorization
    item: "Per-tool authorization scopes configured for each client"
    severity: high
    test: "Verify client cannot call tools outside its scope"
  - id: 4
    category: authorization
    item: "Principle of least privilege: clients only have access to required tools"
    severity: high
    test: "Review scope assignments against actual usage"
  - id: 5
    category: authentication
    item: "stdio servers verify parent process identity"
    severity: medium
    test: "Start server from unauthorized process, verify rejection"
  - id: 6
    category: authentication
    item: "Rate limiting configured per client per tool"
    severity: high
    test: "Send burst of requests, verify rate limit headers"

  # === Input validation (items 7-12) ===
  - id: 7
    category: input_validation
    item: "All tool parameters validated against type schema before execution"
    severity: critical
    test: "Send injection payload in each parameter, verify rejection"
  - id: 8
    category: input_validation
    item: "No shell=True in any subprocess call"
    severity: critical
    test: "grep -r 'shell=True' src/"
  - id: 9
    category: input_validation
    item: "Path traversal prevention on all file operations"
    severity: critical
    test: "Send ../../etc/passwd, verify blocked"
  - id: 10
    category: input_validation
    item: "Tool description validation: no injection patterns"
    severity: high
    test: "Scan all tool descriptions for imperative instructions"
  - id: 11
    category: input_validation
    item: "Maximum parameter length enforced"
    severity: medium
    test: "Send 100KB string in parameter, verify rejection"
  - id: 12
    category: input_validation
    item: "Output size limits configured for all tools"
    severity: high
    test: "Generate large output, verify truncation"

  # === Isolation and sandboxing (items 13-17) ===
  - id: 13
    category: isolation
    item: "MCP server runs in Docker container or namespace"
    severity: high
    test: "Verify server PID is in container namespace"
  - id: 14
    category: isolation
    item: "Container runs as non-root user"
    severity: critical
    test: "docker exec mcp-server whoami  # must not be root"
  - id: 15
    category: isolation
    item: "Read-only root filesystem with explicit writable mounts"
    severity: high
    test: "docker exec mcp-server touch /test  # must fail"
  - id: 16
    category: isolation
    item: "All capabilities dropped (--cap-drop ALL)"
    severity: high
    test: "Check container security options"
  - id: 17
    category: isolation
    item: "Network access restricted to required endpoints only"
    severity: high
    test: "From container, attempt to reach unauthorized endpoint"

  # === Monitoring and logging (items 18-21) ===
  - id: 18
    category: monitoring
    item: "All tool calls logged with parameters, client identity, and timing"
    severity: critical
    test: "Make tool call, verify structured log entry"
  - id: 19
    category: monitoring
    item: "Security events generate alerts (auth failures, injection attempts)"
    severity: high
    test: "Trigger auth failure, verify alert fires"
  - id: 20
    category: monitoring
    item: "Token budget tracking and alerting configured"
    severity: high
    test: "Verify budget metrics are being collected"
  - id: 21
    category: monitoring
    item: "Log retention meets compliance requirements (min 90 days)"
    severity: medium
    test: "Check log rotation and retention configuration"

  # === Supply chain and updates (items 22-24) ===
  - id: 22
    category: supply_chain
    item: "MCP server packages pinned to specific verified versions"
    severity: critical
    test: "Verify lockfile or version pins in configuration"
  - id: 23
    category: supply_chain
    item: "Package integrity verification on startup"
    severity: high
    test: "Modify server binary, verify startup failure"
  - id: 24
    category: supply_chain
    item: "MCP SDK version >= 1.26.0 (session isolation fix)"
    severity: critical
    test: "pip show mcp | grep Version"

Automated checklist verification script
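The checklist is intended as a deployment gate: every item has to pass before a release goes to production. A minimal sketch of that gate logic follows, assuming the check results are already available as simple records. The `CheckResult` type and the `gate` function are hypothetical names for illustration, not part of any MCP tooling, and treating an unchecked item as a failure is an assumption of this sketch.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CheckResult:
    """Outcome of one checklist item (hypothetical record type)."""
    id: int
    severity: str  # "critical", "high", or "medium"
    status: str    # "PASS", "WARN", or "FAIL"


def gate(results: list[CheckResult], total_items: int = 24) -> tuple[bool, list[str]]:
    """Return (ok, reasons). The gate opens only if every item was checked and passed."""
    reasons = []
    seen = {r.id for r in results}
    missing = sorted(set(range(1, total_items + 1)) - seen)
    if missing:
        # Assumption: an item nobody checked blocks the release.
        reasons.append(f"items not checked: {missing}")
    for r in sorted(results, key=lambda r: r.id):
        if r.status != "PASS":
            reasons.append(f"item {r.id} ({r.severity}): {r.status}")
    return (not reasons, reasons)


if __name__ == "__main__":
    results = [CheckResult(i, "high", "PASS") for i in range(1, 25)]
    results[7] = CheckResult(8, "critical", "FAIL")  # simulate a shell=True finding
    ok, reasons = gate(results)
    print("deploy allowed" if ok else "deploy blocked")
    for reason in reasons:
        print(" -", reason)
```

Since the checklist requires all 24 items to pass, a gate like this would need the manually reviewed results as input alongside whatever an automated audit can verify.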
#!/bin/bash
# mcp-security-audit.sh: automated verification of the MCP security checklist
# Covers the items that can be checked over HTTP or via docker (1, 8, 9, 14, 15, 16, 24);
# the remaining items need manual review.
set -euo pipefail

MCP_SERVER_URL="${1:?Usage: mcp-security-audit.sh <server-url> [container-name]}"
CONTAINER="${2:-mcp-server}"
PASS=0
FAIL=0
WARN=0

check() {
    local id="$1" severity="$2" description="$3" result="$4"
    if [ "$result" = "PASS" ]; then
        echo "[PASS] #${id} (${severity}): ${description}"
        PASS=$((PASS + 1))
    elif [ "$result" = "WARN" ]; then
        echo "[WARN] #${id} (${severity}): ${description}"
        WARN=$((WARN + 1))
    else
        echo "[FAIL] #${id} (${severity}): ${description}"
        FAIL=$((FAIL + 1))
    fi
}

echo "=== MCP Server Security Audit ==="
echo "Target: ${MCP_SERVER_URL}"
echo "Date: $(date -u)"
echo ""

# Item 1: authentication required
# curl -w prints "000" itself when no HTTP response arrives, so only its exit status is
# suppressed here. --max-time stops an unauthenticated SSE stream from hanging the audit.
HTTP_CODE=$(curl -s --max-time 10 -o /dev/null -w '%{http_code}' "${MCP_SERVER_URL}/sse" 2>/dev/null || true)
if [ "$HTTP_CODE" = "401" ] || [ "$HTTP_CODE" = "403" ]; then
    check 1 "critical" "HTTP+SSE requires authentication" "PASS"
elif [ -z "$HTTP_CODE" ] || [ "$HTTP_CODE" = "000" ]; then
    check 1 "critical" "HTTP+SSE requires authentication (server unreachable)" "WARN"
else
    check 1 "critical" "HTTP+SSE requires authentication (got HTTP ${HTTP_CODE})" "FAIL"
fi

# Item 8: no shell=True
if docker exec "$CONTAINER" grep -r "shell=True" /app/ 2>/dev/null | grep -v "test" | grep -v "#" | grep -q .; then
    check 8 "critical" "No shell=True in subprocess calls" "FAIL"
else
    check 8 "critical" "No shell=True in subprocess calls" "PASS"
fi

# Item 9: path traversal prevention
# Heuristic: any error-like response counts as blocked. Set MCP_TEST_TOKEN to a valid
# token; otherwise an authentication error also matches and the result says nothing.
TRAVERSAL_RESULT=$(curl -s --max-time 10 -X POST "${MCP_SERVER_URL}/messages/" \
    -H "Content-Type: application/json" \
    -H "Authorization: Bearer ${MCP_TEST_TOKEN:-test}" \
    -d '{"jsonrpc":"2.0","method":"tools/call","id":1,"params":{"name":"read_file","arguments":{"path":"../../etc/passwd"}}}' 2>/dev/null || echo "CURL_FAILED")
if echo "$TRAVERSAL_RESULT" | grep -q "CURL_FAILED"; then
    # A failed request must not count as "blocked"
    check 9 "critical" "Path traversal prevention (request failed)" "WARN"
elif echo "$TRAVERSAL_RESULT" | grep -qi "denied\|blocked\|error\|traversal"; then
    check 9 "critical" "Path traversal prevention" "PASS"
else
    check 9 "critical" "Path traversal prevention" "FAIL"
fi

# Item 14: non-root user
CONTAINER_USER=$(docker exec "$CONTAINER" whoami 2>/dev/null || echo "unknown")
if [ "$CONTAINER_USER" != "root" ] && [ "$CONTAINER_USER" != "unknown" ]; then
    check 14 "critical" "Container runs as non-root (${CONTAINER_USER})" "PASS"
else
    check 14 "critical" "Container runs as non-root (got: ${CONTAINER_USER})" "FAIL"
fi

# Item 15: read-only root filesystem
if docker exec "$CONTAINER" touch /test-write 2>/dev/null; then
    # Clean up; a failed cleanup must not abort the audit under set -e
    docker exec "$CONTAINER" rm -f /test-write 2>/dev/null || true
    check 15 "high" "Read-only root filesystem" "FAIL"
else
    check 15 "high" "Read-only root filesystem" "PASS"
fi

# Item 16: capabilities dropped
CAP_CHECK=$(docker inspect "$CONTAINER" --format='{{.HostConfig.CapDrop}}' 2>/dev/null || echo "[]")
if echo "$CAP_CHECK" | grep -qi "all"; then
    check 16 "high" "All capabilities dropped" "PASS"
else
    check 16 "high" "All capabilities dropped (CapDrop: ${CAP_CHECK})" "FAIL"
fi

# Item 24: SDK version (the comparison needs the "packaging" module on the host)
SDK_VERSION=$(docker exec "$CONTAINER" pip show mcp 2>/dev/null | grep "^Version:" | awk '{print $2}' || echo "unknown")
if [ "$SDK_VERSION" != "unknown" ]; then
    if python3 -c "from packaging.version import Version; exit(0 if Version('$SDK_VERSION') >= Version('1.26.0') else 1)" 2>/dev/null; then
        check 24 "critical" "MCP SDK >= 1.26.0 (${SDK_VERSION})" "PASS"
    else
        check 24 "critical" "MCP SDK >= 1.26.0 (got ${SDK_VERSION})" "FAIL"
    fi
else
    check 24 "critical" "MCP SDK version check" "WARN"
fi

echo ""
echo "=== Audit Summary ==="
echo "Passed: ${PASS}"
echo "Failed: ${FAIL}"
echo "Warnings: ${WARN}"
echo ""
if [ "$FAIL" -gt 0 ]; then
    echo "STATUS: FAILED -- ${FAIL} critical/high items need remediation"
    exit 1
else
    echo "STATUS: PASSED (with ${WARN} warnings)"
fi

Docker isolation for MCP servers
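Checklist items 14 to 16 (non-root user, read-only root filesystem, dropped capabilities) can all be read from the JSON that `docker inspect` prints. A minimal sketch, assuming that JSON has already been parsed into a dict for one container. `hardening_findings` is a hypothetical helper; the field names (`Config.User`, `HostConfig.ReadonlyRootfs`, `HostConfig.CapDrop`, `HostConfig.SecurityOpt`, `HostConfig.Privileged`) are the ones found in Docker's inspect output.

```python
def hardening_findings(inspect: dict) -> list[str]:
    """Return the hardening problems found in one `docker inspect` entry."""
    findings = []
    config = inspect.get("Config") or {}
    host = inspect.get("HostConfig") or {}

    # An empty user means the image default, which is root unless USER was set.
    user = (config.get("User") or "").split(":")[0]
    if user in ("", "0", "root"):
        findings.append("container runs as root")

    if not host.get("ReadonlyRootfs", False):
        findings.append("root filesystem is writable")

    cap_drop = [cap.upper() for cap in (host.get("CapDrop") or [])]
    if "ALL" not in cap_drop:
        findings.append("capabilities are not dropped with ALL")

    accepted = {"no-new-privileges", "no-new-privileges:true", "no-new-privileges=true"}
    if not any(opt in accepted for opt in (host.get("SecurityOpt") or [])):
        findings.append("no-new-privileges is not set")

    if host.get("Privileged", False):
        findings.append("container is privileged")
    return findings


if __name__ == "__main__":
    # Sample shaped like one element of the `docker inspect <container>` array.
    sample = {
        "Config": {"User": "10001:10001"},
        "HostConfig": {
            "ReadonlyRootfs": True,
            "CapDrop": ["ALL"],
            "SecurityOpt": ["no-new-privileges:true"],
            "Privileged": False,
        },
    }
    print(hardening_findings(sample) or "no findings")
```

Reading the configuration this way complements the runtime probes (`whoami`, `touch`): it also catches a missing `no-new-privileges` option, which a probe from inside the container does not reveal.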
Production Dockerfile
# Dockerfile for a hardened MCP server deployment
# Multi-stage build for a minimal attack surface

# === Build stage ===
FROM python:3.12-slim AS builder
WORKDIR /build

# Install dependencies into a separate prefix
COPY requirements.txt .
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt

# Copy the application code
COPY src/ /build/src/
COPY setup.py pyproject.toml /build/

# Build the application
RUN pip install --no-cache-dir --prefix=/install .

# === Runtime stage ===
FROM python:3.12-slim AS runtime

# Security: create a non-root user and group
RUN groupadd -r mcp --gid=10001 && \
    useradd -r -g mcp --uid=10001 -d /home/mcp -s /usr/sbin/nologin mcp

# Copy only the installed runtime dependencies from the builder
COPY --from=builder /install /usr/local

# Copy the application code
COPY --from=builder --chown=mcp:mcp /build/src /app/src

# Create the required directories with the right permissions
RUN mkdir -p /workspace /var/log/mcp /var/mcp/config && \
    chown -R mcp:mcp /workspace /var/log/mcp /var/mcp && \
    chmod 750 /workspace /var/log/mcp /var/mcp/config

# Remove unneeded packages and files
RUN apt-get purge -y --auto-remove && \
    rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* && \
    find / -name "*.pyc" -delete && \
    find / -name "__pycache__" -type d -exec rm -rf {} + 2>/dev/null || true

# Security labels
LABEL security.hardened="true" \
      security.non-root="true" \
      security.read-only-fs="true"

# Switch to the non-root user
USER 10001:10001
WORKDIR /app

# Health check endpoint
HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8080/health')" || exit 1

# Expose only the MCP port
EXPOSE 8080
ENTRYPOINT ["python", "-m", "src.server"]

Docker Compose configuration
# docker-compose.yaml: production MCP server stack
version: "3.8"

services:
  mcp-server:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: mcp-server
    restart: unless-stopped
    # Security: read-only root filesystem
    read_only: true
    tmpfs:
      - /tmp:size=100M,noexec,nosuid,nodev
    # Security: drop all capabilities
    cap_drop:
      - ALL
    # Security: no privilege escalation
    security_opt:
      - no-new-privileges:true
    # Security: resource limits
    deploy:
      resources:
        limits:
          cpus: "1.0"
          memory: 512M
          pids: 100
        reservations:
          cpus: "0.25"
          memory: 128M
    # Volumes: mount only what is needed
    volumes:
      - mcp-workspace:/workspace:rw
      - mcp-config:/var/mcp/config:ro
      - mcp-logs:/var/log/mcp:rw
    # Network: internal only (reachable through the nginx proxy)
    networks:
      - mcp-internal
    # Environment
    environment:
      - MCP_LOG_LEVEL=info
      - MCP_WORKSPACE=/workspace
      - MCP_HOST=0.0.0.0
      - MCP_PORT=8080
    env_file:
      - .env.mcp  # Contains secrets (not in version control)
    # Logging
    logging:
      driver: json-file
      options:
        max-size: "50m"
        max-file: "5"
        tag: "mcp-server"

  nginx-proxy:
    image: nginx:1.25-alpine
    container_name: mcp-nginx
    restart: unless-stopped
    read_only: true
    tmpfs:
      - /tmp:size=10M
      - /var/cache/nginx:size=50M
      - /run:size=10M
    cap_drop:
      - ALL
    cap_add:
      - NET_BIND_SERVICE  # Needed to bind ports 80 and 443
      # The official nginx image starts as root and switches its workers to the
      # nginx user; with ALL dropped, that step needs these three capabilities
      - CHOWN
      - SETGID
      - SETUID
    ports:
      - "443:443"
      - "80:80"  # Needed for the HTTP-to-HTTPS redirect in the Nginx configuration
    volumes:
      - ./nginx/conf.d:/etc/nginx/conf.d:ro
      - ./nginx/certs:/etc/nginx/certs:ro
      - nginx-logs:/var/log/nginx:rw
    networks:
      - mcp-external
      - mcp-internal
    depends_on:
      - mcp-server

  # Log aggregation
  promtail:
    # Pin this to a specific verified version in production (checklist item 22)
    image: grafana/promtail:latest
    container_name: mcp-promtail
    restart: unless-stopped
    read_only: true
    volumes:
      - mcp-logs:/var/log/mcp:ro
      - nginx-logs:/var/log/nginx:ro
      - ./promtail-config.yaml:/etc/promtail/config.yaml:ro
    networks:
      - mcp-monitoring

volumes:
  mcp-workspace:
  mcp-config:
  mcp-logs:
  nginx-logs:

networks:
  mcp-internal:
    internal: true  # No external access
  mcp-external:
    # Internet-facing (nginx only)
  mcp-monitoring:
    internal: true

Nginx reverse proxy configuration
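The proxy configuration in this section only checks that an `Authorization` header is present; it does not check that the token is valid, so the MCP server behind it still has to do that. A minimal sketch of such a server-side check, assuming static bearer tokens stored as SHA-256 hashes. `VALID_TOKEN_HASHES` and `client_for_token` are hypothetical names, the example token is a placeholder, and a real deployment might use JWTs or OAuth token introspection instead.

```python
import hashlib
import hmac
from typing import Optional


def _sha256(token: str) -> str:
    return hashlib.sha256(token.encode("utf-8")).hexdigest()


# Hypothetical token store: SHA-256 hash of the token -> client identity.
# Storing hashes means a leaked store does not directly leak usable tokens.
VALID_TOKEN_HASHES = {_sha256("example-token-for-docs-only"): "ci-agent"}


def client_for_token(authorization_header: Optional[str]) -> Optional[str]:
    """Return the client identity for a valid `Bearer <token>` header, else None."""
    if not authorization_header:
        return None
    scheme, _, token = authorization_header.partition(" ")
    if scheme.lower() != "bearer" or not token.strip():
        return None
    presented = _sha256(token.strip())
    matched = None
    # Compare against every stored hash with a constant-time comparison and
    # without returning early, so timing does not reveal which entry matched.
    for stored_hash, client in VALID_TOKEN_HASHES.items():
        if hmac.compare_digest(presented, stored_hash):
            matched = client
    return matched


if __name__ == "__main__":
    print(client_for_token("Bearer example-token-for-docs-only"))
    print(client_for_token("Bearer something-else"))
```

The returned client identity is what per-tool authorization scopes (checklist items 3 and 4) would then be looked up against.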
# /nginx/conf.d/mcp-server.conf
# Production Nginx configuration for the MCP server proxy

# Rate limiting
limit_req_zone $binary_remote_addr zone=mcp_connect:10m rate=5r/m;
limit_req_zone $binary_remote_addr zone=mcp_call:10m rate=30r/m;
limit_conn_zone $binary_remote_addr zone=mcp_conn:10m;
# Rejected requests default to 503; use 429 so the @rate_limited handler applies
limit_req_status 429;
limit_conn_status 429;

# Upstream
upstream mcp_backend {
    server mcp-server:8080;
    keepalive 16;
}

# Map to block suspicious user agents
map $http_user_agent $blocked_agent {
    default 0;
    "~*scanner" 1;
    "~*nikto" 1;
    "~*sqlmap" 1;
    "~*nmap" 1;
    "" 1;
}

# Structured access log format. log_format is only valid in the http context,
# so it is defined here, before the server block that uses it.
log_format mcp_json escape=json
    '{'
    '"time": "$time_iso8601",'
    '"remote_addr": "$remote_addr",'
    '"request_method": "$request_method",'
    '"request_uri": "$request_uri",'
    '"status": $status,'
    '"body_bytes_sent": $body_bytes_sent,'
    '"request_time": $request_time,'
    '"http_user_agent": "$http_user_agent",'
    '"request_id": "$request_id",'
    '"ssl_client_s_dn": "$ssl_client_s_dn"'
    '}';

server {
    listen 443 ssl http2;
    server_name mcp.example.com;

    # TLS configuration
    ssl_certificate /etc/nginx/certs/server.crt;
    ssl_certificate_key /etc/nginx/certs/server.key;
    ssl_protocols TLSv1.3;
    # Note: ssl_ciphers only affects TLS 1.2 and older; with TLSv1.3 alone,
    # the TLS library's default TLS 1.3 cipher suites are used
    ssl_ciphers ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384;
    ssl_prefer_server_ciphers on;
    ssl_session_timeout 1d;
    ssl_session_cache shared:SSL:10m;
    ssl_session_tickets off;

    # OCSP stapling (needs a resolver directive to look up the OCSP responder)
    ssl_stapling on;
    ssl_stapling_verify on;

    # Optional: mTLS
    # ssl_client_certificate /etc/nginx/certs/ca.crt;
    # ssl_verify_client on;

    # Connection limits
    limit_conn mcp_conn 10;

    # Block suspicious user agents
    if ($blocked_agent) {
        return 403;
    }

    # Security headers
    add_header X-Content-Type-Options "nosniff" always;
    add_header X-Frame-Options "DENY" always;
    add_header X-XSS-Protection "1; mode=block" always;
    add_header Strict-Transport-Security "max-age=63072000; includeSubDomains; preload" always;
    add_header Content-Security-Policy "default-src 'none'" always;
    add_header Referrer-Policy "no-referrer" always;

    # Hide the server version
    server_tokens off;

    # Request body limits
    client_max_body_size 1m;
    client_body_timeout 10s;
    client_header_timeout 10s;

    # Structured access log
    access_log /var/log/nginx/mcp-access.json mcp_json;
    error_log /var/log/nginx/mcp-error.log warn;

    # Health check (no auth, lenient rate limit)
    location /health {
        proxy_pass http://mcp_backend/health;
        access_log off;
        limit_req zone=mcp_connect burst=10 nodelay;
    }

    # SSE connection endpoint
    location /sse {
        limit_req zone=mcp_connect burst=3 nodelay;

        # Check that an Authorization header is present
        # (the MCP server must still validate the token itself)
        if ($http_authorization = "") {
            return 401 '{"error": "Authentication required"}';
        }

        # SSE proxy configuration
        proxy_pass http://mcp_backend/sse;
        proxy_http_version 1.1;
        proxy_set_header Connection "";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # SSE-specific buffering and timeouts
        proxy_buffering off;
        proxy_cache off;
        proxy_read_timeout 86400s;
        proxy_send_timeout 86400s;
        chunked_transfer_encoding on;
    }

    # Tool call endpoint
    location /messages/ {
        limit_req zone=mcp_call burst=10 nodelay;

        # Check that an Authorization header is present
        if ($http_authorization = "") {
            return 401 '{"error": "Authentication required"}';
        }

        # Validate the Content-Type
        if ($content_type !~ "application/json") {
            return 415 '{"error": "Content-Type must be application/json"}';
        }

        proxy_pass http://mcp_backend/messages/;
        proxy_http_version 1.1;
        proxy_set_header Connection "";  # required for upstream keepalive
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Request-ID $request_id;

        # Tool execution timeout
        proxy_read_timeout 60s;
        proxy_send_timeout 10s;
    }

    # Block everything else
    location / {
        return 404 '{"error": "Not found"}';
    }

    # Custom error responses
    error_page 429 = @rate_limited;
    location @rate_limited {
        default_type application/json;
        return 429 '{"error": "Rate limit exceeded", "retry_after": 60}';
    }
}

# Redirect HTTP to HTTPS
server {
    listen 80;
    server_name mcp.example.com;
    return 301 https://$server_name$request_uri;
}

Logging and monitoring setup
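Checklist item 18 asks that every tool call is logged with its timing and outcome. One way to capture both in a single place is to wrap the call in a context manager. A minimal sketch using only the standard library; `timed_tool_call` and the record field names are hypothetical, chosen for illustration.

```python
import json
import logging
import time
from contextlib import contextmanager

logger = logging.getLogger("mcp.audit")


@contextmanager
def timed_tool_call(tool_name: str):
    """Log one JSON audit line per tool call, including duration and status."""
    record = {"event": "tool_call", "tool": tool_name, "status": "success"}
    start = time.perf_counter()
    try:
        yield record  # the caller may attach extra fields to the record
    except Exception as exc:
        record["status"] = "error"
        record["error_type"] = type(exc).__name__
        raise  # never swallow the tool's own exception
    finally:
        # Runs on success and on failure, so every call produces a log line.
        record["duration_ms"] = round((time.perf_counter() - start) * 1000, 2)
        logger.info(json.dumps(record))


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(message)s")
    with timed_tool_call("read_file") as rec:
        rec["bytes_read"] = 42  # placeholder for the real tool work
```

Because the log line is written in `finally`, a tool that raises is still recorded, with `status` set to `error` and the exception type attached.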
Structured logging configuration
"""
Structured logging configuration for the MCP server.
Emits JSON logs suitable for log aggregation (ELK, Loki, Datadog).
"""
import json
import logging
import sys
from contextvars import ContextVar
from datetime import datetime, timezone

# Context variables for request tracing
request_id: ContextVar[str] = ContextVar("request_id", default="")
session_id: ContextVar[str] = ContextVar("session_id", default="")
client_identity: ContextVar[str] = ContextVar("client_identity", default="")


class MCPStructuredFormatter(logging.Formatter):
    """JSON formatter for the MCP server logs."""

    def format(self, record: logging.LogRecord) -> str:
        log_entry = {
            # Use the time the record was created, not the time it is formatted
            "timestamp": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
        }

        # Add context variables
        rid = request_id.get("")
        if rid:
            log_entry["request_id"] = rid
        sid = session_id.get("")
        if sid:
            log_entry["session_id"] = sid
        cid = client_identity.get("")
        if cid:
            log_entry["client"] = cid

        # Add extra fields passed via logger.info(..., extra={...})
        if hasattr(record, "tool_name"):
            log_entry["tool_name"] = record.tool_name
        if hasattr(record, "duration_ms"):
            log_entry["duration_ms"] = record.duration_ms
        if hasattr(record, "token_count"):
            log_entry["token_count"] = record.token_count
        if hasattr(record, "security_event"):
            log_entry["security_event"] = record.security_event

        # Add exception info
        if record.exc_info and record.exc_info[1]:
            log_entry["exception"] = {
                "type": type(record.exc_info[1]).__name__,
                "message": str(record.exc_info[1]),
            }

        # default=str keeps logging from failing on non-serializable values
        return json.dumps(log_entry, default=str)


def configure_logging(log_level: str = "INFO"):
    """Configure structured logging for the MCP server."""
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(MCPStructuredFormatter())
    root_logger = logging.getLogger()
    root_logger.setLevel(getattr(logging, log_level.upper()))
    root_logger.addHandler(handler)

    # Security-specific logger (also written to its own file)
    security_logger = logging.getLogger("mcp.security")
    security_handler = logging.FileHandler("/var/log/mcp/security.json")
    security_handler.setFormatter(MCPStructuredFormatter())
    security_logger.addHandler(security_handler)
    return root_logger


# Audit logger for tool calls
class ToolCallAuditor:
    """Logs all MCP tool calls for the audit trail."""

    def __init__(self):
        self.logger = logging.getLogger("mcp.audit")

    def log_tool_call(self, tool_name: str, arguments: dict,
                      result_summary: str, duration_ms: float,
                      status: str = "success"):
        """Log a complete tool call with sanitized parameters."""
        # Sanitize the arguments (remove possible secrets, truncate long values)
        safe_args = {}
        for key, value in arguments.items():
            if any(s in key.lower() for s in ["password", "secret", "token", "key"]):
                safe_args[key] = "***REDACTED***"
            elif isinstance(value, str) and len(value) > 500:
                safe_args[key] = value[:500] + f"...[truncated, {len(value)} chars total]"
            else:
                safe_args[key] = value

        self.logger.info(
            "Tool call: %s",
            json.dumps({
                "event": "tool_call",
                "tool": tool_name,
                "arguments": safe_args,
                "result_summary": result_summary[:200],
                "duration_ms": round(duration_ms, 2),
                "status": status,
                "session_id": session_id.get(""),
                "client": client_identity.get(""),
                "request_id": request_id.get(""),
            }, default=str),
        )

Prometheus metrics exporter
"""
Prometheus metrics for monitoring the MCP server.
"""
from prometheus_client import (
    Counter, Histogram, Gauge, Info, generate_latest, CONTENT_TYPE_LATEST
)
from starlette.requests import Request
from starlette.responses import Response

# Server info
mcp_server_info = Info("mcp_server", "MCP server information")
mcp_server_info.info({
    "version": "1.0.0",
    "sdk_version": "1.26.0",
    "transport": "http_sse",
})

# Tool call metrics
tool_calls_total = Counter(
    "mcp_tool_calls_total",
    "Total number of MCP tool calls",
    ["tool_name", "status"],
)
tool_call_duration = Histogram(
    "mcp_tool_call_duration_seconds",
    "Duration of MCP tool calls",
    ["tool_name"],
    buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0],
)
tool_call_tokens = Histogram(
    "mcp_tool_call_tokens",
    "Token count per tool call (output)",
    ["tool_name"],
    buckets=[100, 500, 1000, 5000, 10000, 50000, 100000],
)

# Session metrics
active_sessions = Gauge(
    "mcp_active_sessions",
    "Number of active MCP sessions",
)
session_duration = Histogram(
    "mcp_session_duration_seconds",
    "Duration of MCP sessions",
    buckets=[60, 300, 600, 1800, 3600, 7200],
)

# Security metrics
auth_attempts_total = Counter(
    "mcp_auth_attempts_total",
    "Authentication attempts",
    ["status"],  # success, failure, expired, rate_limited
)
security_events_total = Counter(
    "mcp_security_events_total",
    "Security events detected",
    ["event_type", "severity"],
)
injection_attempts_total = Counter(
    "mcp_injection_attempts_total",
    "Injection attempts detected",
    ["injection_type"],  # command, path_traversal, sql, prompt
)

# Budget metrics
token_spending = Counter(
    "mcp_token_spending_total",
    "Total tokens consumed",
    ["server_name", "operation"],
)
cost_spending = Counter(
    "mcp_cost_usd_total",
    "Total cost in USD",
    ["server_name"],
)
sampling_requests_total = Counter(
    "mcp_sampling_requests_total",
    "MCP sampling requests",
    ["server_name", "status"],
)


async def metrics_endpoint(request: Request) -> Response:
    """Prometheus metrics endpoint."""
    return Response(
        content=generate_latest(),
        media_type=CONTENT_TYPE_LATEST,
    )

Network policy configuration
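Checklist item 17 is tested by trying to reach an endpoint that should be blocked from inside the container. A minimal sketch of such a probe follows; `can_connect` and `egress_violations` are hypothetical helpers, and the hosts in the example are placeholders. Run from inside the MCP server container with the policies in this section applied, every target on the deny list should turn out to be unreachable.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections, timeouts, and dropped packets.
        return False


def egress_violations(must_be_blocked, probe=can_connect):
    """Return the (host, port) targets that are reachable but should not be."""
    return [(host, port) for host, port in must_be_blocked if probe(host, port)]


if __name__ == "__main__":
    # Placeholder targets: public endpoints the MCP server has no reason to reach.
    targets = [("example.com", 443), ("example.org", 80)]
    violations = egress_violations(targets)
    if violations:
        print("egress policy violated:", violations)
        raise SystemExit(1)
    print("all probed targets are blocked")
```

A dropped packet shows up as a timeout rather than an immediate refusal, so the probe's runtime grows with the timeout times the number of targets.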
# Kubernetes NetworkPolicy for MCP server pods
# Restricts all traffic to explicitly allowed flows
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: mcp-server-network-policy
  namespace: mcp
spec:
  podSelector:
    matchLabels:
      app: mcp-server
  policyTypes:
    - Ingress
    - Egress
  ingress:
    # Allow traffic from the nginx proxy only
    - from:
        - podSelector:
            matchLabels:
              app: mcp-nginx-proxy
      ports:
        - port: 8080
          protocol: TCP
    # Allow Prometheus scraping
    # (this is the port on the MCP server pod that serves metrics, not
    # Prometheus's own port; use 8080 if metrics are served by the main app)
    - from:
        - podSelector:
            matchLabels:
              app: prometheus
      ports:
        - port: 9090
          protocol: TCP
  egress:
    # Allow DNS resolution
    - to:
        - namespaceSelector: {}
          podSelector:
            matchLabels:
              k8s-app: kube-dns
      ports:
        - port: 53
          protocol: UDP
        - port: 53
          protocol: TCP
    # Allow access to the internal database (if needed)
    - to:
        - podSelector:
            matchLabels:
              app: postgresql
      ports:
        - port: 5432
          protocol: TCP
    # All other egress traffic is denied implicitly (no internet access)

# iptables equivalent for a Docker Compose network
# Apply with: iptables-restore < mcp-iptables.rules
# mcp-iptables.rules
*filter
:INPUT DROP [0:0]
:FORWARD DROP [0:0]
:OUTPUT DROP [0:0]
# Allow loopback
-A INPUT -i lo -j ACCEPT
-A OUTPUT -o lo -j ACCEPT
# Allow established connections
-A INPUT -m conntrack --ctstate ESTABLISHED,RELATED -j ACCEPT
-A OUTPUT -m conntrack --ctstate ESTABLISHED,RELATED -j ACCEPT
# Allow nginx -> mcp-server on port 8080
-A INPUT -s 172.18.0.0/16 -p tcp --dport 8080 -j ACCEPT
# Allow DNS resolution
-A OUTPUT -p udp --dport 53 -j ACCEPT
-A OUTPUT -p tcp --dport 53 -j ACCEPT
# Allow access to the internal database
-A OUTPUT -d 172.18.0.0/16 -p tcp --dport 5432 -j ACCEPT
# Log and drop everything else
-A INPUT -j LOG --log-prefix "MCP-DROP-IN: " --log-level 4
-A INPUT -j DROP
-A OUTPUT -j LOG --log-prefix "MCP-DROP-OUT: " --log-level 4
-A OUTPUT -j DROP
COMMIT

Alerting rules
# Prometheus alerting rules for monitoring the MCP server
# prometheus-alerts.yaml
groups:
  - name: mcp_security
    rules:
      - alert: MCPAuthFailureSpike
        expr: rate(mcp_auth_attempts_total{status="failure"}[5m]) > 1
        for: 2m
        labels:
          severity: high
        annotations:
          summary: "High rate of MCP authentication failures"
          description: "More than 1 auth failure per second for 2+ minutes"
      - alert: MCPInjectionAttempt
        expr: increase(mcp_injection_attempts_total[5m]) > 0
        labels:
          severity: critical
        annotations:
          summary: "Injection attempt detected on MCP server"
          description: "{{ $labels.injection_type }} injection detected"
      - alert: MCPBudgetExceeded
        # The counter is cumulative, so a daily limit needs the increase over 24h
        expr: increase(mcp_cost_usd_total[24h]) > 100
        labels:
          severity: high
        annotations:
          summary: "MCP daily spending limit exceeded"
      - alert: MCPSamplingAbuse
        expr: rate(mcp_sampling_requests_total[5m]) > 0.5
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Excessive MCP sampling requests (possible DoW attack)"
      - alert: MCPServerDown
        expr: up{job="mcp-server"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "MCP server is down"