Datalek tussen MCP-clients: falen van sessie-isolatie voorkomen
Een verdedigingsgerichte handleiding om datalekken tussen MCP-clientsessies te begrijpen en te voorkomen, met een analyse van CVE-2026-25536 en het implementeren van sessie-geïsoleerd state-beheer om multi-tenant MCP-deployments te beschermen.
Cross-client data leak-kwetsbaarheden in MCP-servers ontstaan wanneer meerdere clients verbinding maken met dezelfde server-instance en die server de state per sessie niet isoleert. Anders dan bij traditionele webapplicaties, waar sessie-isolatie een goed begrepen patroon is, houden MCP-servers vaak persistente state bij (file handles, databaseverbindingen, gecachte resultaten, gespreksgeschiedenis) die zorgvuldig tot individuele sessies beperkt moet worden.
Het datalek tussen clients begrijpen
Hoe MCP-servers state beheren
MCP-servers houden state bij voor verschillende doeleinden:
Categorieën van MCP-server-state:
┌──────────────────────────────────────────────────────┐
│ Globale state (gedeeld -- bedoeld) │
│ - Tooldefinities │
│ - Serverconfiguratie │
│ - Statische resources │
├──────────────────────────────────────────────────────┤
│ State per sessie (moet geïsoleerd zijn) │
│ - Gesprekscontext / -geschiedenis │
│ - Gebruikersspecifieke bestandstoegang │
│ - Gecachte query-resultaten │
│ - Authenticatiecontext │
│ - Tijdelijke bestanden aangemaakt tijdens sessie │
│ - Resource-subscriptions │
├──────────────────────────────────────────────────────┤
│ State per request (vluchtig) │
│ - Huidige tool-call-argumenten │
│ - Lopende berekening │
│ - Request-scoped locks │
└──────────────────────────────────────────────────────┘
De kwetsbaarheid ontstaat wanneer state per sessie wordt opgeslagen in wat in feite globale state is -- meestal omdat de server één enkele instance gebruikt om alle clientverbindingen af te handelen.
CVE-2026-25536: De kwetsbaarheid
Vóór MCP SDK-versie 1.26.0 zag het standaard serverpatroon er zo uit:
# KWETSBAAR PATROON -- MCP SDK van vóór 1.26.0
# Dit is het patroon uit tutorials en documentatie
from mcp.server import Server
server = Server("my-server")
# State op serverniveau -- GEDEELD tussen alle clients
conversation_history = []
user_files = {}
cached_results = {}
@server.call_tool()
async def call_tool(name: str, arguments: dict):
if name == "remember":
# Deze data is zichtbaar voor ALLE verbonden clients
conversation_history.append(arguments["text"])
return [{"type": "text", "text": "Remembered."}]
if name == "recall":
# ELKE client kan de data van ELKE andere client lezen
return [{"type": "text", "text": "\n".join(conversation_history)}]
if name == "save_file":
# Bestanden opgeslagen door Client A zijn toegankelijk voor Client B
user_files[arguments["name"]] = arguments["content"]
return [{"type": "text", "text": f"Saved {arguments['name']}"}]
if name == "read_file":
# Client B leest het privébestand van Client A
content = user_files.get(arguments["name"], "Not found")
return [{"type": "text", "text": content}]Het probleem: wanneer meerdere clients verbinding maken via HTTP+SSE-transport, delen ze allemaal dezelfde server-instance en de bijbehorende state op moduleniveau. Er is geen mechanisme om de state tot individuele sessies te beperken.
Het aanvalsscenario
Tijdlijn van een datalek tussen clients:
T1: Client A (legitieme gebruiker) maakt verbinding met de MCP-server
└─> Server maakt SSE-verbinding A aan
T2: Client A roept "save_file" aan met name="credentials.json",
content='{"api_key": "sk-secret-abc123"}'
└─> Server slaat op in de gedeelde user_files-dict
T3: Client A roept "remember" aan met text="Deploy to prod using key sk-secret-abc123"
└─> Server voegt toe aan de gedeelde conversation_history
T4: Client B (aanvaller) maakt verbinding met DEZELFDE MCP-server
└─> Server maakt SSE-verbinding B aan (dezelfde server-instance)
T5: Client B roept "read_file" aan met name="credentials.json"
└─> Server geeft de credentials van Client A terug: {"api_key": "sk-secret-abc123"}
T6: Client B roept "recall" aan
└─> Server geeft de gespreksgeschiedenis van Client A terug
└─> Aanvaller verkrijgt: "Deploy to prod using key sk-secret-abc123"
Analyse op protocolniveau
De grondoorzaak ligt in hoe de MCP SDK het transport aan de server koppelde:
# Interne werking van de SDK van vóór 1.26.0 (vereenvoudigd)
# Eén enkele Server-instance handelt ALLE sessies af
class SseServerTransport:
def __init__(self, endpoint: str):
self.endpoint = endpoint
self._sessions = {} # session_id -> SSE-verbinding
async def connect_sse(self, scope, receive, send):
session_id = generate_session_id()
# Maakt een nieuwe SSE-verbinding maar routeert naar DEZELFDE server
connection = SSEConnection(session_id, send)
self._sessions[session_id] = connection
# Alle sessies delen server.state
return (ReadStream(connection), WriteStream(connection))Het transport maakt correct aparte verbindingen aan per client, maar alle verbindingen routeren naar dezelfde Server-instance met gedeelde state.
Sessie-geïsoleerd state-beheer implementeren
Het gerepareerde patroon (SDK >= 1.26.0)
De MCP SDK 1.26.0+ introduceert state met sessie-scope:
# VERDEDIGD -- Sessie-geïsoleerde state (SDK >= 1.26.0)
from mcp.server import Server
from mcp.server.session import ServerSession
from contextvars import ContextVar
from dataclasses import dataclass, field
from typing import Any
import asyncio
# Context-variabele die de huidige sessie bevat
current_session: ContextVar[str] = ContextVar("current_session")
@dataclass
class SessionState:
"""State-container per sessie."""
session_id: str
conversation_history: list[str] = field(default_factory=list)
user_files: dict[str, str] = field(default_factory=dict)
cached_results: dict[str, Any] = field(default_factory=dict)
created_at: float = 0.0
last_accessed: float = 0.0
class SessionIsolatedServer:
"""
MCP-server met strikte sessie-isolatie.
Elke clientsessie krijgt zijn eigen state-container.
"""
def __init__(self):
self.server = Server("isolated-server")
self._sessions: dict[str, SessionState] = {}
self._lock = asyncio.Lock()
# Tools registreren
self.server.call_tool()(self._handle_tool_call)
self.server.list_tools()(self._list_tools)
def _get_session_state(self) -> SessionState:
"""Haal de state voor de huidige sessie op. Werpt een fout als er geen sessie is."""
session_id = current_session.get()
state = self._sessions.get(session_id)
if state is None:
raise RuntimeError(f"No state for session: {session_id}")
import time
state.last_accessed = time.time()
return state
async def create_session(self, session_id: str) -> SessionState:
"""Maak een nieuwe geïsoleerde sessie-state aan."""
import time
async with self._lock:
if session_id in self._sessions:
raise ValueError(f"Session already exists: {session_id}")
state = SessionState(
session_id=session_id,
created_at=time.time(),
last_accessed=time.time(),
)
self._sessions[session_id] = state
return state
async def destroy_session(self, session_id: str):
"""Ruim de sessie-state op wanneer een client de verbinding verbreekt."""
async with self._lock:
state = self._sessions.pop(session_id, None)
if state:
# Wis gevoelige data op een veilige manier
state.conversation_history.clear()
state.user_files.clear()
state.cached_results.clear()
async def _list_tools(self):
return [
{
"name": "remember",
"description": "Store text in session memory",
"inputSchema": {
"type": "object",
"properties": {"text": {"type": "string"}},
"required": ["text"],
},
},
{
"name": "recall",
"description": "Retrieve stored session memory",
"inputSchema": {"type": "object", "properties": {}},
},
{
"name": "save_file",
"description": "Save a file in the session workspace",
"inputSchema": {
"type": "object",
"properties": {
"name": {"type": "string"},
"content": {"type": "string"},
},
"required": ["name", "content"],
},
},
{
"name": "read_file",
"description": "Read a file from the session workspace",
"inputSchema": {
"type": "object",
"properties": {"name": {"type": "string"}},
"required": ["name"],
},
},
]
async def _handle_tool_call(self, name: str, arguments: dict):
# Alle toegang tot state is gescoped tot de huidige sessie
state = self._get_session_state()
if name == "remember":
state.conversation_history.append(arguments["text"])
return [{"type": "text", "text": "Remembered (session-scoped)."}]
if name == "recall":
# Geeft alleen de geschiedenis van DEZE sessie terug
return [{"type": "text", "text": "\n".join(state.conversation_history)}]
if name == "save_file":
state.user_files[arguments["name"]] = arguments["content"]
return [{"type": "text", "text": f"Saved {arguments['name']} (session-scoped)."}]
if name == "read_file":
content = state.user_files.get(arguments["name"], "Not found in this session")
return [{"type": "text", "text": content}]
return [{"type": "text", "text": f"Unknown tool: {name}"}]Sessiebewuste transport-wrapper
"""
Transport-wrapper die ervoor zorgt dat de sessiecontext voor elke request ingesteld is.
"""
import uuid
import logging
from contextvars import copy_context
from starlette.applications import Starlette
from starlette.routing import Route, Mount
from starlette.requests import Request
from mcp.server.sse import SseServerTransport
logger = logging.getLogger("mcp.session")
class SessionAwareSseTransport:
"""
Wikkelt SseServerTransport in om de levenscyclus van sessies te beheren
en de sessiecontext voor elke verbinding in te stellen.
"""
def __init__(self, server: "SessionIsolatedServer", endpoint: str = "/messages/"):
self.server = server
self.sse = SseServerTransport(endpoint)
self._active_sessions: dict[str, str] = {} # verbinding -> session_id
async def handle_sse(self, request: Request):
"""Handel een nieuwe SSE-verbinding af, inclusief het aanmaken van een sessie."""
session_id = str(uuid.uuid4())
# Maak geïsoleerde sessie-state aan
await self.server.create_session(session_id)
logger.info("Session created: %s from %s", session_id, request.client.host)
try:
async with self.sse.connect_sse(
request.scope, request.receive, request._send
) as streams:
# Stel de sessiecontext in voor deze verbinding
current_session.set(session_id)
await self.server.server.run(
streams[0], streams[1],
self.server.server.create_initialization_options()
)
finally:
# Ruim de sessie-state op bij het verbreken van de verbinding
await self.server.destroy_session(session_id)
logger.info("Session destroyed: %s", session_id)
def create_app(self) -> Starlette:
"""Maak een Starlette-app met sessiebewuste routing aan."""
return Starlette(
routes=[
Route("/sse", endpoint=self.handle_sse),
Mount("/messages/", app=self.sse.handle_post_message),
]
)Sessie-isolatie op bestandssysteemniveau
Isoleer voor MCP-servers die bestandsbewerkingen uitvoeren het bestandssysteem van elke sessie:
"""
Sessie-isolatie van het bestandssysteem voor MCP-servers.
Elke sessie krijgt een eigen workspace-directory.
"""
import os
import shutil
import tempfile
import hashlib
from pathlib import Path
from contextlib import contextmanager
class SessionFilesystem:
"""
Biedt geïsoleerde bestandssysteemtoegang voor elke MCP-sessie.
"""
def __init__(self, base_dir: str = "/var/mcp/sessions"):
self.base_dir = Path(base_dir)
self.base_dir.mkdir(parents=True, exist_ok=True)
os.chmod(str(self.base_dir), 0o700)
def create_workspace(self, session_id: str) -> Path:
"""Maak een geïsoleerde workspace-directory aan voor een sessie."""
# Gebruik een gehashte session-ID als directorynaam (voorkomt path injection)
safe_name = hashlib.sha256(session_id.encode()).hexdigest()[:16]
workspace = self.base_dir / f"session_{safe_name}"
workspace.mkdir(exist_ok=True)
os.chmod(str(workspace), 0o700)
return workspace
def destroy_workspace(self, session_id: str):
"""Verwijder de workspace van een sessie op een veilige manier."""
safe_name = hashlib.sha256(session_id.encode()).hexdigest()[:16]
workspace = self.base_dir / f"session_{safe_name}"
if workspace.exists():
# Overschrijf bestanden voor verwijdering vanwege gevoelige data
for file_path in workspace.rglob("*"):
if file_path.is_file():
size = file_path.stat().st_size
with open(file_path, 'wb') as f:
f.write(b'\x00' * size)
shutil.rmtree(str(workspace))
def resolve_path(self, session_id: str, relative_path: str) -> Path:
"""
Resolve een pad binnen de workspace van de sessie.
Voorkomt path traversal-aanvallen.
"""
safe_name = hashlib.sha256(session_id.encode()).hexdigest()[:16]
workspace = self.base_dir / f"session_{safe_name}"
if not workspace.exists():
raise FileNotFoundError(f"Session workspace does not exist")
# Resolve en valideer
resolved = (workspace / relative_path).resolve()
if not str(resolved).startswith(str(workspace.resolve())):
raise PermissionError(
f"Path traversal detected: {relative_path} resolves outside workspace"
)
return resolved
def get_usage(self, session_id: str) -> dict:
"""Haal statistieken op over het bestandssysteemgebruik van een sessie."""
safe_name = hashlib.sha256(session_id.encode()).hexdigest()[:16]
workspace = self.base_dir / f"session_{safe_name}"
if not workspace.exists():
return {"exists": False}
total_size = 0
file_count = 0
for file_path in workspace.rglob("*"):
if file_path.is_file():
total_size += file_path.stat().st_size
file_count += 1
return {
"exists": True,
"total_size_bytes": total_size,
"file_count": file_count,
"path": str(workspace),
}
# Quota afdwingen
MAX_SESSION_SIZE_BYTES = 100 * 1024 * 1024 # 100 MB per sessie
MAX_SESSION_FILES = 1000
def enforce_quota(fs: SessionFilesystem, session_id: str) -> bool:
"""Controleer of een sessie binnen zijn bestandssysteemquotum blijft."""
usage = fs.get_usage(session_id)
if not usage["exists"]:
return True
if usage["total_size_bytes"] > MAX_SESSION_SIZE_BYTES:
return False
if usage["file_count"] > MAX_SESSION_FILES:
return False
return TrueIsolatie van databaseverbindingen
MCP-servers die databases benaderen, moeten verbindingen per sessie isoleren:
"""
Sessie-isolatie van de database voor MCP-servers.
Elke sessie krijgt zijn eigen verbinding met een row-level security-context.
"""
import asyncpg
import logging
from contextlib import asynccontextmanager
logger = logging.getLogger("mcp.db.isolation")
class SessionDatabasePool:
"""
Beheert databaseverbindingen per sessie met row-level security.
"""
def __init__(self, dsn: str, max_connections_per_session: int = 2):
self.dsn = dsn
self.max_per_session = max_connections_per_session
self._pools: dict[str, asyncpg.Pool] = {}
async def get_connection(self, session_id: str):
"""Haal een databaseverbinding op die tot de sessie gescoped is."""
if session_id not in self._pools:
# Maak een eigen pool aan voor deze sessie
self._pools[session_id] = await asyncpg.create_pool(
self.dsn,
min_size=1,
max_size=self.max_per_session,
command_timeout=30,
)
pool = self._pools[session_id]
conn = await pool.acquire()
# Stel een variabele op sessieniveau in voor row-level security
await conn.execute(
"SET mcp.session_id = $1", session_id
)
# Zet voor de veiligheid op alleen-lezen
await conn.execute("SET default_transaction_read_only = ON")
return conn
async def release_session(self, session_id: str):
"""Geef alle databaseresources voor een sessie vrij."""
pool = self._pools.pop(session_id, None)
if pool:
await pool.close()
logger.info("Database pool closed for session: %s", session_id)
# Opzet van PostgreSQL row-level security
RLS_SETUP_SQL = """
-- Schakel row-level security in op tabellen die door MCP benaderd worden
ALTER TABLE user_documents ENABLE ROW LEVEL SECURITY;
-- Policy: sessies kunnen alleen rijen zien die getagd zijn met hun session_id
CREATE POLICY mcp_session_isolation ON user_documents
USING (session_tag = current_setting('mcp.session_id'));
-- Maak een MCP-specifieke alleen-lezen rol aan
CREATE ROLE mcp_reader NOLOGIN;
GRANT SELECT ON user_documents TO mcp_reader;
GRANT USAGE ON SCHEMA public TO mcp_reader;
-- Sessiegebruiker erft de beperkte rol
CREATE ROLE mcp_session LOGIN PASSWORD 'generated_secure_password';
GRANT mcp_reader TO mcp_session;
"""Monitoren op datatoegang tussen sessies
"""
Monitor voor het detecteren van mogelijke datalekken tussen sessies in MCP-servers.
"""
import json
import logging
from datetime import datetime, timedelta
from collections import defaultdict
from dataclasses import dataclass, field
logger = logging.getLogger("mcp.session.monitor")
@dataclass
class SessionAccessEvent:
"""Registratie van een datatoegang binnen een sessie."""
timestamp: datetime
session_id: str
tool_name: str
resource_key: str # bestandsnaam, query, enz.
access_type: str # read, write, delete
data_size: int = 0
class CrossSessionLeakDetector:
"""
Detecteert mogelijke datalekken tussen MCP-sessies door
patronen in datatoegang te volgen en anomalieën te markeren.
"""
def __init__(self, alert_callback=None):
self.alert_callback = alert_callback or self._default_alert
# Houd bij welke sessie elke resource heeft geschreven
self._resource_owners: dict[str, str] = {}
# Houd toegangspatronen per sessie bij
self._session_reads: dict[str, list[SessionAccessEvent]] = defaultdict(list)
self._session_writes: dict[str, list[SessionAccessEvent]] = defaultdict(list)
def record_access(self, event: SessionAccessEvent):
"""Registreer en analyseer een datatoegang-event."""
if event.access_type == "write":
self._resource_owners[event.resource_key] = event.session_id
self._session_writes[event.session_id].append(event)
elif event.access_type == "read":
self._session_reads[event.session_id].append(event)
# Controleer of deze sessie data leest die van een andere sessie is
owner = self._resource_owners.get(event.resource_key)
if owner and owner != event.session_id:
self.alert_callback({
"alert_type": "cross_session_data_access",
"severity": "critical",
"reading_session": event.session_id,
"owning_session": owner,
"resource": event.resource_key,
"tool": event.tool_name,
"timestamp": event.timestamp.isoformat(),
"description": (
f"Session {event.session_id} read resource "
f"'{event.resource_key}' owned by session {owner}"
),
})
def session_ended(self, session_id: str):
"""Ruim de trackingdata op wanneer een sessie eindigt."""
# Verwijder de eigendomsregistraties voor de resources van deze sessie
self._resource_owners = {
k: v for k, v in self._resource_owners.items()
if v != session_id
}
self._session_reads.pop(session_id, None)
self._session_writes.pop(session_id, None)
def get_isolation_report(self) -> dict:
"""Genereer een rapport over de gezondheid van de sessie-isolatie."""
return {
"active_sessions": len(set(
list(self._session_reads.keys()) +
list(self._session_writes.keys())
)),
"tracked_resources": len(self._resource_owners),
"total_reads": sum(
len(events) for events in self._session_reads.values()
),
"total_writes": sum(
len(events) for events in self._session_writes.values()
),
}
def _default_alert(self, alert: dict):
logger.critical(json.dumps({
"event": "mcp_cross_session_leak",
**alert,
}))Sessie-isolatie testen
"""
Testsuite om sessie-isolatie in MCP-server-implementaties te verifiëren.
"""
import pytest
import asyncio
import aiohttp
import json
class TestSessionIsolation:
"""
Integratietests die verifiëren dat data niet tussen sessies lekt.
Voer deze uit tegen je MCP-server-deployment.
"""
MCP_SERVER_URL = "http://localhost:8080"
async def _create_session(self) -> aiohttp.ClientSession:
"""Maak een nieuwe clientsessie verbinding met de MCP-server."""
session = aiohttp.ClientSession()
# Initialiseer de MCP-verbinding
async with session.get(f"{self.MCP_SERVER_URL}/sse") as resp:
assert resp.status == 200
return session
async def _call_tool(self, session: aiohttp.ClientSession,
tool: str, args: dict, msg_id: int = 1) -> dict:
"""Roep een MCP-tool aan en geef het resultaat terug."""
payload = {
"jsonrpc": "2.0",
"method": "tools/call",
"id": msg_id,
"params": {"name": tool, "arguments": args},
}
async with session.post(
f"{self.MCP_SERVER_URL}/messages/",
json=payload
) as resp:
return await resp.json()
@pytest.mark.asyncio
async def test_session_write_not_visible_to_other_session(self):
"""Data geschreven door sessie A mag NIET leesbaar zijn door sessie B."""
session_a = await self._create_session()
session_b = await self._create_session()
try:
# Sessie A schrijft een geheim
await self._call_tool(session_a, "save_file", {
"name": "secret.txt",
"content": "TOP_SECRET_DATA_12345",
})
# Sessie B probeert het te lezen
result = await self._call_tool(session_b, "read_file", {
"name": "secret.txt",
})
# Sessie B zou de data van sessie A NIET mogen zien
response_text = result["result"]["content"][0]["text"]
assert "TOP_SECRET_DATA_12345" not in response_text, \
"CRITICAL: Cross-session data leak detected!"
finally:
await session_a.close()
await session_b.close()
@pytest.mark.asyncio
async def test_session_memory_not_shared(self):
"""Gespreksgeschiedenis moet gescoped zijn tot de sessie."""
session_a = await self._create_session()
session_b = await self._create_session()
try:
# Sessie A slaat geheugen op
await self._call_tool(session_a, "remember", {
"text": "My API key is sk-secret-abc123",
})
# Sessie B haalt het op
result = await self._call_tool(session_b, "recall", {})
response_text = result["result"]["content"][0]["text"]
assert "sk-secret-abc123" not in response_text, \
"CRITICAL: Cross-session memory leak detected!"
finally:
await session_a.close()
await session_b.close()
@pytest.mark.asyncio
async def test_session_cleanup_on_disconnect(self):
"""Sessiedata moet worden opgeruimd wanneer de client de verbinding verbreekt."""
session_a = await self._create_session()
# Schrijf data
await self._call_tool(session_a, "save_file", {
"name": "temp_data.txt",
"content": "SENSITIVE_DATA",
})
# Verbreek de verbinding
await session_a.close()
await asyncio.sleep(1) # Geef tijd voor het opruimen
# Maak verbinding als nieuwe sessie
session_c = await self._create_session()
try:
result = await self._call_tool(session_c, "read_file", {
"name": "temp_data.txt",
})
response_text = result["result"]["content"][0]["text"]
assert "SENSITIVE_DATA" not in response_text, \
"CRITICAL: Stale session data accessible after disconnect!"
finally:
await session_c.close()
@pytest.mark.asyncio
async def test_concurrent_sessions_isolated(self):
"""Meerdere gelijktijdige sessies moeten de isolatie behouden."""
sessions = [await self._create_session() for _ in range(5)]
try:
# Elke sessie schrijft unieke data
for i, session in enumerate(sessions):
await self._call_tool(session, "save_file", {
"name": "identity.txt",
"content": f"I am session {i}",
})
# Elke sessie zou alleen zijn eigen data mogen lezen
for i, session in enumerate(sessions):
result = await self._call_tool(session, "read_file", {
"name": "identity.txt",
})
text = result["result"]["content"][0]["text"]
assert f"I am session {i}" in text, \
f"Session {i} got wrong data: {text}"
# Verifieer dat er geen data van een andere sessie aanwezig is
for j in range(5):
if j != i:
assert f"I am session {j}" not in text, \
f"Session {i} can see session {j}'s data!"
finally:
for session in sessions:
await session.close()SDK-versievereisten
{
"minimum_safe_versions": {
"python_mcp_sdk": ">=1.26.0",
"typescript_mcp_sdk": ">=1.26.0",
"description": "Versies onder 1.26.0 gebruiken standaard gedeelde server-state"
},
"verification_commands": {
"python": "pip show mcp | grep Version",
"npm": "npm list @modelcontextprotocol/sdk | grep sdk"
},
"changelog_notes": {
"1.26.0": [
"Sessie-gescoped state-beheer toegevoegd",
"ServerSession houdt nu state per verbinding bij",
"Breaking: server.state wordt niet langer gedeeld tussen sessies",
"Hooks voor de levenscyclus van sessies toegevoegd (on_connect, on_disconnect)"
]
}
}#!/bin/bash
# check-mcp-sdk-version.sh -- Verifieer of de MCP SDK voldoet aan de minimale beveiligingseisen
set -euo pipefail
MIN_VERSION="1.26.0"
echo "=== MCP SDK Security Version Check ==="
# Controleer de Python SDK
if command -v pip &> /dev/null; then
PY_VERSION=$(pip show mcp 2>/dev/null | grep "^Version:" | awk '{print $2}' || echo "not installed")
if [ "$PY_VERSION" = "not installed" ]; then
echo "[SKIP] Python MCP SDK not installed"
else
if python3 -c "
from packaging.version import Version
import sys
sys.exit(0 if Version('$PY_VERSION') >= Version('$MIN_VERSION') else 1)
" 2>/dev/null; then
echo "[PASS] Python MCP SDK: $PY_VERSION (>= $MIN_VERSION)"
else
echo "[FAIL] Python MCP SDK: $PY_VERSION (< $MIN_VERSION -- VULNERABLE to CVE-2026-25536)"
echo " Run: pip install --upgrade mcp>=$MIN_VERSION"
fi
fi
fi
# Controleer de TypeScript SDK
if command -v npm &> /dev/null; then
TS_VERSION=$(npm list @modelcontextprotocol/sdk 2>/dev/null | grep sdk | grep -oP '[\d.]+' | head -1 || echo "not installed")
if [ "$TS_VERSION" = "not installed" ]; then
echo "[SKIP] TypeScript MCP SDK not installed"
else
if npx semver -r ">=$MIN_VERSION" "$TS_VERSION" &>/dev/null; then
echo "[PASS] TypeScript MCP SDK: $TS_VERSION (>= $MIN_VERSION)"
else
echo "[FAIL] TypeScript MCP SDK: $TS_VERSION (< $MIN_VERSION -- VULNERABLE)"
echo " Run: npm install @modelcontextprotocol/sdk@latest"
fi
fi
fi
echo "=== Check Complete ==="Referenties
- CVE-2026-25536: Datalek tussen clients in gedeelde server-instances van de MCP SDK
- MCP SDK Changelog: Wijzigingen voor sessie-isolatie in versie 1.26.0
- MCP-beveiligingshandleiding: Sessiebeheer en state-isolatie
- OWASP Session Management Cheat Sheet: Best practices voor sessie-isolatie in webapplicaties
- CWE-488: Exposure of Data Element to Wrong Session -- MITRE-classificatie