MCP Cross-Client Data Leak: Preventing Session Isolation Failures
A defense-focused guide to understanding and preventing data leaks across MCP client sessions, analyzing CVE-2026-25536 and implementing session-isolated state management to protect multi-tenant MCP deployments.
Cross-client data leak vulnerabilities in MCP servers arise when multiple clients connect to the same server instance and that server fails to isolate per-session state. Unlike traditional web applications where session isolation is a well-understood pattern, MCP servers often maintain persistent state (file handles, database connections, cached results, conversation history) that must be carefully scoped to individual sessions.
Understanding the Cross-Client Data Leak
How MCP Servers Manage State
MCP servers maintain state for several purposes:
MCP Server State Categories:
┌──────────────────────────────────────────────────────┐
│ Global State (shared -- intended)                    │
│   - Tool definitions                                 │
│   - Server configuration                             │
│   - Static resources                                 │
├──────────────────────────────────────────────────────┤
│ Per-Session State (must be isolated)                 │
│   - Conversation context / history                   │
│   - User-specific file access                        │
│   - Cached query results                             │
│   - Authentication context                           │
│   - Temporary files created during session           │
│   - Resource subscriptions                           │
├──────────────────────────────────────────────────────┤
│ Per-Request State (ephemeral)                        │
│   - Current tool call arguments                      │
│   - In-progress computation                          │
│   - Request-scoped locks                             │
└──────────────────────────────────────────────────────┘
The vulnerability arises when per-session state is stored in what is effectively global state -- typically because the server uses a single instance to handle all client connections.
CVE-2026-25536: The Vulnerability
Prior to MCP SDK version 1.26.0, the standard server pattern looked like this:
# VULNERABLE PATTERN -- Pre-1.26.0 MCP SDK
# This is the pattern from tutorials and documentation
from mcp.server import Server

server = Server("my-server")

# Server-level state -- SHARED across all clients
conversation_history = []
user_files = {}
cached_results = {}

@server.call_tool()
async def call_tool(name: str, arguments: dict):
    if name == "remember":
        # This data is visible to ALL connected clients
        conversation_history.append(arguments["text"])
        return [{"type": "text", "text": "Remembered."}]
    if name == "recall":
        # ANY client can read ANY other client's data
        return [{"type": "text", "text": "\n".join(conversation_history)}]
    if name == "save_file":
        # Files saved by Client A are accessible to Client B
        user_files[arguments["name"]] = arguments["content"]
        return [{"type": "text", "text": f"Saved {arguments['name']}"}]
    if name == "read_file":
        # Client B reads Client A's private file
        content = user_files.get(arguments["name"], "Not found")
        return [{"type": "text", "text": content}]

The issue: when multiple clients connect via the HTTP+SSE transport, they all share the same server instance and its module-level state. Nothing scopes that state to an individual session.
The Attack Scenario
Timeline of cross-client data leak:
T1: Client A (legitimate user) connects to MCP server
    └─> Server creates SSE connection A
T2: Client A calls "save_file" with name="credentials.json",
    content='{"api_key": "sk-secret-abc123"}'
    └─> Server stores in shared user_files dict
T3: Client A calls "remember" with text="Deploy to prod using key sk-secret-abc123"
    └─> Server appends to shared conversation_history
T4: Client B (attacker) connects to the SAME MCP server
    └─> Server creates SSE connection B (same server instance)
T5: Client B calls "read_file" with name="credentials.json"
    └─> Server returns Client A's credentials: {"api_key": "sk-secret-abc123"}
T6: Client B calls "recall"
    └─> Server returns Client A's conversation history
    └─> Attacker obtains: "Deploy to prod using key sk-secret-abc123"
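The timeline can be replayed without any MCP machinery. The following is a minimal sketch, not SDK code: `SharedStore` and `ScopedStore` are hypothetical stand-ins for a server's file storage, and `client-a` / `client-b` are made-up session IDs. It replays steps T2 and T5 against both stores to show that the only difference between leaking and not leaking is whether the session ID takes part in the lookup.

```python
from collections import defaultdict


class SharedStore:
    """Hypothetical stand-in for module-level state shared by all sessions."""

    def __init__(self) -> None:
        self._files: dict[str, str] = {}

    def save_file(self, session_id: str, name: str, content: str) -> None:
        # session_id is ignored here, which is exactly the flaw
        self._files[name] = content

    def read_file(self, session_id: str, name: str) -> str:
        return self._files.get(name, "Not found")


class ScopedStore:
    """Same interface, but every lookup goes through the session ID first."""

    def __init__(self) -> None:
        self._files: defaultdict[str, dict[str, str]] = defaultdict(dict)

    def save_file(self, session_id: str, name: str, content: str) -> None:
        self._files[session_id][name] = content

    def read_file(self, session_id: str, name: str) -> str:
        return self._files[session_id].get(name, "Not found")


def replay_attack(store) -> str:
    """Replay T2 and T5 from the timeline; return what Client B receives."""
    store.save_file("client-a", "credentials.json", '{"api_key": "sk-secret-abc123"}')
    return store.read_file("client-b", "credentials.json")


leaked = replay_attack(SharedStore())
blocked = replay_attack(ScopedStore())
print(leaked)   # {"api_key": "sk-secret-abc123"}
print(blocked)  # Not found
```

Both stores expose the same interface, so the fix is invisible to tool handlers as long as they always pass the session ID through.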
Protocol-Level Analysis
The root cause is in how the MCP SDK wired the transport to the server:
# Pre-1.26.0 SDK internals (simplified)
# A single Server instance handles ALL sessions
class SseServerTransport:
    def __init__(self, endpoint: str):
        self.endpoint = endpoint
        self._sessions = {}  # session_id -> SSE connection

    async def connect_sse(self, scope, receive, send):
        session_id = generate_session_id()
        # Creates a new SSE connection but routes to THE SAME server
        connection = SSEConnection(session_id, send)
        self._sessions[session_id] = connection
        # All sessions share server.state
        return (ReadStream(connection), WriteStream(connection))

The transport correctly creates separate connections per client, but all connections route to the same Server instance with shared state.
Implementing Session-Isolated State Management
The Fixed Pattern (SDK >= 1.26.0)
The MCP SDK 1.26.0+ introduces session-scoped state:
# DEFENDED -- Session-isolated state (SDK >= 1.26.0)
import asyncio
import time
from contextvars import ContextVar
from dataclasses import dataclass, field
from typing import Any

from mcp.server import Server
from mcp.server.session import ServerSession

# Context variable that holds the current session ID
current_session: ContextVar[str] = ContextVar("current_session")

@dataclass
class SessionState:
    """Per-session state container."""
    session_id: str
    conversation_history: list[str] = field(default_factory=list)
    user_files: dict[str, str] = field(default_factory=dict)
    cached_results: dict[str, Any] = field(default_factory=dict)
    created_at: float = 0.0
    last_accessed: float = 0.0

class SessionIsolatedServer:
    """
    MCP server with strict session isolation.
    Each client session gets its own state container.
    """
    def __init__(self):
        self.server = Server("isolated-server")
        self._sessions: dict[str, SessionState] = {}
        self._lock = asyncio.Lock()
        # Register tools
        self.server.call_tool()(self._handle_tool_call)
        self.server.list_tools()(self._list_tools)

    def _get_session_state(self) -> SessionState:
        """Get the state for the current session. Raises if no session."""
        # ContextVar.get() raises LookupError if no session was set
        session_id = current_session.get()
        state = self._sessions.get(session_id)
        if state is None:
            raise RuntimeError(f"No state for session: {session_id}")
        state.last_accessed = time.time()
        return state

    async def create_session(self, session_id: str) -> SessionState:
        """Create a new isolated session state."""
        async with self._lock:
            if session_id in self._sessions:
                raise ValueError(f"Session already exists: {session_id}")
            now = time.time()
            state = SessionState(
                session_id=session_id,
                created_at=now,
                last_accessed=now,
            )
            self._sessions[session_id] = state
            return state

    async def destroy_session(self, session_id: str):
        """Clean up session state when a client disconnects."""
        async with self._lock:
            state = self._sessions.pop(session_id, None)
            if state:
                # Drop references to sensitive data. This does not wipe
                # memory; Python gives no such guarantee.
                state.conversation_history.clear()
                state.user_files.clear()
                state.cached_results.clear()
    async def _list_tools(self):
        return [
            {
                "name": "remember",
                "description": "Store text in session memory",
                "inputSchema": {
                    "type": "object",
                    "properties": {"text": {"type": "string"}},
                    "required": ["text"],
                },
            },
            {
                "name": "recall",
                "description": "Retrieve stored session memory",
                "inputSchema": {"type": "object", "properties": {}},
            },
            {
                "name": "save_file",
                "description": "Save a file in the session workspace",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "name": {"type": "string"},
                        "content": {"type": "string"},
                    },
                    "required": ["name", "content"],
                },
            },
            {
                "name": "read_file",
                "description": "Read a file from the session workspace",
                "inputSchema": {
                    "type": "object",
                    "properties": {"name": {"type": "string"}},
                    "required": ["name"],
                },
            },
        ]
    async def _handle_tool_call(self, name: str, arguments: dict):
        # All state access is scoped to the current session
        state = self._get_session_state()
        if name == "remember":
            state.conversation_history.append(arguments["text"])
            return [{"type": "text", "text": "Remembered (session-scoped)."}]
        if name == "recall":
            # Only returns THIS session's history
            return [{"type": "text", "text": "\n".join(state.conversation_history)}]
        if name == "save_file":
            state.user_files[arguments["name"]] = arguments["content"]
            return [{"type": "text", "text": f"Saved {arguments['name']} (session-scoped)."}]
        if name == "read_file":
            content = state.user_files.get(arguments["name"], "Not found in this session")
            return [{"type": "text", "text": content}]
        return [{"type": "text", "text": f"Unknown tool: {name}"}]

Session-Aware Transport Wrapper
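The wrapper in this section depends on one property of `ContextVar`: a value set in a connection's task is inherited by tasks spawned from it afterwards, and is invisible to tasks belonging to other connections. The sketch below demonstrates that property with plain `asyncio`, under the assumption that each connection is served by its own task; `handle_connection` and `handle_request` are hypothetical names, not SDK functions.

```python
import asyncio
from contextvars import ContextVar

current_session: ContextVar[str] = ContextVar("current_session")


async def handle_request() -> str:
    # Runs in a child task and sees the value of the task that spawned it.
    await asyncio.sleep(0)  # yield so the two connections interleave
    return current_session.get()


async def handle_connection(session_id: str) -> list[str]:
    # Set once per connection, before any child task is created.
    current_session.set(session_id)
    tasks = [asyncio.create_task(handle_request()) for _ in range(3)]
    return [await task for task in tasks]


async def main() -> dict:
    # Each connection runs in its own task, so each gets its own context copy.
    a, b = await asyncio.gather(
        asyncio.create_task(handle_connection("session-a")),
        asyncio.create_task(handle_connection("session-b")),
    )
    # The parent task never called set(), so it still sees no session.
    return {"a": a, "b": b, "parent": current_session.get(None)}


seen = asyncio.run(main())
print(seen)
```

The ordering matters: a task created before `set()` is called keeps the older context, so the session ID has to be set before the server starts spawning request handlers for that connection.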
"""
Transport wrapper that ensures session context is set for every request.
"""
import uuid
import logging
from contextvars import copy_context
from starlette.applications import Starlette
from starlette.routing import Route, Mount
from starlette.requests import Request
from starlette.responses import Response
from mcp.server.sse import SseServerTransport

# current_session and SessionIsolatedServer come from the previous listing
logger = logging.getLogger("mcp.session")

class SessionAwareSseTransport:
    """
    Wraps SseServerTransport to manage session lifecycle
    and set session context for each connection.
    """
    def __init__(self, server: "SessionIsolatedServer", endpoint: str = "/messages/"):
        self.server = server
        self.sse = SseServerTransport(endpoint)
        self._active_sessions: dict[str, str] = {}  # connection -> session_id

    async def handle_sse(self, request: Request):
        """Handle new SSE connection with session creation."""
        session_id = str(uuid.uuid4())
        # Create isolated session state
        await self.server.create_session(session_id)
        client_host = request.client.host if request.client else "unknown"
        logger.info("Session created: %s from %s", session_id, client_host)
        try:
            async with self.sse.connect_sse(
                request.scope, request.receive, request._send
            ) as streams:
                # Set session context before run() spawns request handlers
                current_session.set(session_id)
                await self.server.server.run(
                    streams[0], streams[1],
                    self.server.server.create_initialization_options()
                )
        finally:
            # Clean up session state on disconnect
            await self.server.destroy_session(session_id)
            logger.info("Session destroyed: %s", session_id)
        # Starlette expects the endpoint to return a response object
        return Response()

    def create_app(self) -> Starlette:
        """Create Starlette app with session-aware routing."""
        return Starlette(
            routes=[
                Route("/sse", endpoint=self.handle_sse),
                Mount("/messages/", app=self.sse.handle_post_message),
            ]
        )

Filesystem-Level Session Isolation
For MCP servers that perform file operations, isolate each session's filesystem:
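Per-session directories only isolate anything if every client-supplied path is confirmed to stay inside the session's workspace. The following is a minimal sketch of that containment check, assuming Python 3.9+ for `Path.is_relative_to`; the helper name `is_inside` and the example paths are hypothetical. It also shows why a string-prefix comparison is not a safe substitute: a sibling directory whose name merely starts with the workspace name passes the prefix test.

```python
from pathlib import Path


def is_inside(workspace: Path, candidate: Path) -> bool:
    """True if candidate, once resolved, is the workspace or lies below it."""
    return candidate.resolve().is_relative_to(workspace.resolve())


workspace = Path("/var/mcp/sessions/session_ab12")
sibling = Path("/var/mcp/sessions/session_ab12_evil/notes.txt")
traversal = workspace / "../session_cd34/notes.txt"
inside = workspace / "docs/../notes.txt"

# A string-prefix check wrongly accepts the sibling directory.
prefix_accepts_sibling = str(sibling).startswith(str(workspace))

print(prefix_accepts_sibling)           # True
print(is_inside(workspace, sibling))    # False
print(is_inside(workspace, traversal))  # False
print(is_inside(workspace, inside))     # True
```

Comparing path components rather than characters is what rejects the sibling directory while still accepting paths that use `..` but end up inside the workspace.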
"""
Filesystem session isolation for MCP servers.
Each session gets a dedicated workspace directory.
"""
import os
import shutil
import tempfile
import hashlib
from pathlib import Path
from contextlib import contextmanager

class SessionFilesystem:
    """
    Provides isolated filesystem access for each MCP session.
    """
    def __init__(self, base_dir: str = "/var/mcp/sessions"):
        self.base_dir = Path(base_dir)
        self.base_dir.mkdir(parents=True, exist_ok=True)
        os.chmod(str(self.base_dir), 0o700)

    def create_workspace(self, session_id: str) -> Path:
        """Create an isolated workspace directory for a session."""
        # Use hashed session ID for directory name (avoid path injection)
        safe_name = hashlib.sha256(session_id.encode()).hexdigest()[:16]
        workspace = self.base_dir / f"session_{safe_name}"
        workspace.mkdir(exist_ok=True)
        os.chmod(str(workspace), 0o700)
        return workspace

    def destroy_workspace(self, session_id: str):
        """Delete a session's workspace, overwriting file contents first."""
        safe_name = hashlib.sha256(session_id.encode()).hexdigest()[:16]
        workspace = self.base_dir / f"session_{safe_name}"
        if workspace.exists():
            # Best-effort overwrite before deletion. On SSDs and on
            # copy-on-write or journaling filesystems the old blocks may survive.
            for file_path in workspace.rglob("*"):
                if file_path.is_file():
                    size = file_path.stat().st_size
                    with open(file_path, 'wb') as f:
                        f.write(b'\x00' * size)
            shutil.rmtree(str(workspace))

    def resolve_path(self, session_id: str, relative_path: str) -> Path:
        """
        Resolve a path within the session workspace.
        Prevents path traversal attacks.
        """
        safe_name = hashlib.sha256(session_id.encode()).hexdigest()[:16]
        workspace = self.base_dir / f"session_{safe_name}"
        if not workspace.exists():
            raise FileNotFoundError("Session workspace does not exist")
        # Resolve and validate. Compare path components, not string prefixes:
        # a prefix check would accept a sibling such as "session_<id>_x".
        # Path.is_relative_to requires Python 3.9+.
        root = workspace.resolve()
        resolved = (root / relative_path).resolve()
        if not resolved.is_relative_to(root):
            raise PermissionError(
                f"Path traversal detected: {relative_path} resolves outside workspace"
            )
        return resolved
    def get_usage(self, session_id: str) -> dict:
        """Get filesystem usage statistics for a session."""
        safe_name = hashlib.sha256(session_id.encode()).hexdigest()[:16]
        workspace = self.base_dir / f"session_{safe_name}"
        if not workspace.exists():
            return {"exists": False}
        total_size = 0
        file_count = 0
        for file_path in workspace.rglob("*"):
            if file_path.is_file():
                total_size += file_path.stat().st_size
                file_count += 1
        return {
            "exists": True,
            "total_size_bytes": total_size,
            "file_count": file_count,
            "path": str(workspace),
        }

# Quota enforcement
MAX_SESSION_SIZE_BYTES = 100 * 1024 * 1024  # 100MB per session
MAX_SESSION_FILES = 1000

def enforce_quota(fs: SessionFilesystem, session_id: str) -> bool:
    """Check if a session is within its filesystem quota."""
    usage = fs.get_usage(session_id)
    if not usage["exists"]:
        return True
    if usage["total_size_bytes"] > MAX_SESSION_SIZE_BYTES:
        return False
    if usage["file_count"] > MAX_SESSION_FILES:
        return False
    return True

Database Connection Isolation
MCP servers that access databases must isolate connections per session:
"""
Database session isolation for MCP servers.
Each session gets its own connection with row-level security context.
"""
import asyncpg
import logging
from contextlib import asynccontextmanager

logger = logging.getLogger("mcp.db.isolation")

class SessionDatabasePool:
    """
    Manages per-session database connections with row-level security.
    """
    def __init__(self, dsn: str, max_connections_per_session: int = 2):
        self.dsn = dsn
        self.max_per_session = max_connections_per_session
        self._pools: dict[str, asyncpg.Pool] = {}

    async def get_connection(self, session_id: str):
        """
        Get a database connection scoped to the session.
        The caller must return it with release_connection(), otherwise
        the small per-session pool is exhausted.
        """
        if session_id not in self._pools:
            # Create a dedicated pool for this session
            self._pools[session_id] = await asyncpg.create_pool(
                self.dsn,
                min_size=1,
                max_size=self.max_per_session,
                command_timeout=30,
            )
        pool = self._pools[session_id]
        conn = await pool.acquire()
        # Set session-level variable for row-level security.
        # SET does not accept bind parameters, so use set_config() instead.
        await conn.execute(
            "SELECT set_config('mcp.session_id', $1, false)", session_id
        )
        # Set read-only for safety
        await conn.execute("SET default_transaction_read_only = ON")
        return conn

    async def release_connection(self, session_id: str, conn):
        """Return a connection to its session's pool."""
        await self._pools[session_id].release(conn)

    async def release_session(self, session_id: str):
        """Release all database resources for a session."""
        pool = self._pools.pop(session_id, None)
        if pool:
            await pool.close()
            logger.info("Database pool closed for session: %s", session_id)

# PostgreSQL row-level security setup
RLS_SETUP_SQL = """
-- Enable row-level security on tables accessed by MCP
ALTER TABLE user_documents ENABLE ROW LEVEL SECURITY;

-- Policy: sessions can only see rows tagged with their session_id
CREATE POLICY mcp_session_isolation ON user_documents
    USING (session_tag = current_setting('mcp.session_id'));

-- Create MCP-specific read-only role
CREATE ROLE mcp_reader NOLOGIN;
GRANT SELECT ON user_documents TO mcp_reader;
GRANT USAGE ON SCHEMA public TO mcp_reader;

-- Session user inherits limited role
CREATE ROLE mcp_session LOGIN PASSWORD 'generated_secure_password';
GRANT mcp_reader TO mcp_session;
"""

Monitoring for Cross-Session Data Access
"""
Monitor for detecting potential cross-session data leaks in MCP servers.
"""
import json
import logging
from datetime import datetime, timedelta
from collections import defaultdict
from dataclasses import dataclass, field

logger = logging.getLogger("mcp.session.monitor")

@dataclass
class SessionAccessEvent:
    """Record of a data access within a session."""
    timestamp: datetime
    session_id: str
    tool_name: str
    # Key of the backing object (resolved path, row ID, cache key).
    # A client-supplied name such as "notes.txt" is not enough: two isolated
    # sessions may both use it, which would raise false alerts.
    resource_key: str
    access_type: str  # read, write, delete
    data_size: int = 0

class CrossSessionLeakDetector:
    """
    Detects potential data leaks between MCP sessions by tracking
    data access patterns and flagging anomalies.
    """
    def __init__(self, alert_callback=None):
        self.alert_callback = alert_callback or self._default_alert
        # Track which session wrote each resource
        self._resource_owners: dict[str, str] = {}
        # Track access patterns per session
        self._session_reads: dict[str, list[SessionAccessEvent]] = defaultdict(list)
        self._session_writes: dict[str, list[SessionAccessEvent]] = defaultdict(list)

    def record_access(self, event: SessionAccessEvent):
        """Record and analyze a data access event."""
        if event.access_type == "write":
            self._resource_owners[event.resource_key] = event.session_id
            self._session_writes[event.session_id].append(event)
        elif event.access_type == "read":
            self._session_reads[event.session_id].append(event)
            # Check if this session is reading data owned by another session
            owner = self._resource_owners.get(event.resource_key)
            if owner and owner != event.session_id:
                self.alert_callback({
                    "alert_type": "cross_session_data_access",
                    "severity": "critical",
                    "reading_session": event.session_id,
                    "owning_session": owner,
                    "resource": event.resource_key,
                    "tool": event.tool_name,
                    "timestamp": event.timestamp.isoformat(),
                    "description": (
                        f"Session {event.session_id} read resource "
                        f"'{event.resource_key}' owned by session {owner}"
                    ),
                })

    def session_ended(self, session_id: str):
        """Clean up tracking data when a session ends."""
        # Remove ownership records for this session's resources
        self._resource_owners = {
            k: v for k, v in self._resource_owners.items()
            if v != session_id
        }
        self._session_reads.pop(session_id, None)
        self._session_writes.pop(session_id, None)

    def get_isolation_report(self) -> dict:
        """Generate a report on session isolation health."""
        return {
            "active_sessions": len(set(
                list(self._session_reads.keys()) +
                list(self._session_writes.keys())
            )),
            "tracked_resources": len(self._resource_owners),
            "total_reads": sum(
                len(events) for events in self._session_reads.values()
            ),
            "total_writes": sum(
                len(events) for events in self._session_writes.values()
            ),
        }

    def _default_alert(self, alert: dict):
        logger.critical(json.dumps({
            "event": "mcp_cross_session_leak",
            **alert,
        }))

Testing Session Isolation
"""
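Test suite to verify session isolation in MCP server implementations.
"""
import pytest
import asyncio
import aiohttp
import json

class TestSessionIsolation:
    """
    Integration tests that verify data does not leak between sessions.
    Run these against your MCP server deployment.
    """
    MCP_SERVER_URL = "http://localhost:8080"
    # aiohttp session -> (open SSE response, session-specific POST URL)
    _conns: dict = {}

    async def _create_session(self) -> aiohttp.ClientSession:
        """Connect a new client, keep its SSE stream open, and run the MCP handshake."""
        session = aiohttp.ClientSession()
        # No `async with` here: leaving it would close the SSE stream and
        # end the server-side session before any tool call is made.
        sse = await session.get(f"{self.MCP_SERVER_URL}/sse")
        assert sse.status == 200
        # The first SSE event ("endpoint") carries the POST path including the
        # session_id query parameter (assumed to be a relative path).
        endpoint = await self._next_sse_data(sse)
        self._conns[session] = (sse, self.MCP_SERVER_URL + endpoint)
        await self._request(session, "initialize", {
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            "clientInfo": {"name": "isolation-tests", "version": "0.1"},
        }, msg_id=0)
        await self._post(session, {"jsonrpc": "2.0", "method": "notifications/initialized"})
        return session

    async def _next_sse_data(self, sse) -> str:
        """Return the data payload of the next SSE event."""
        async for raw in sse.content:
            line = raw.decode().strip()
            if line.startswith("data:"):
                return line[len("data:"):].strip()
        raise ConnectionError("SSE stream closed")

    async def _post(self, session: aiohttp.ClientSession, payload: dict):
        """POST a JSON-RPC message; the server only acknowledges it here."""
        _, url = self._conns[session]
        async with session.post(url, json=payload) as resp:
            assert resp.status in (200, 202)

    async def _request(self, session, method: str, params: dict, msg_id: int) -> dict:
        """Send a request and wait for the matching response on the SSE stream."""
        await self._post(session, {
            "jsonrpc": "2.0", "method": method, "id": msg_id, "params": params,
        })
        sse, _ = self._conns[session]
        while True:
            message = json.loads(await self._next_sse_data(sse))
            if message.get("id") == msg_id:
                return message

    async def _call_tool(self, session: aiohttp.ClientSession,
                         tool: str, args: dict, msg_id: int = 1) -> dict:
        """Call an MCP tool and return the JSON-RPC response."""
        return await self._request(
            session, "tools/call", {"name": tool, "arguments": args}, msg_id)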

    @pytest.mark.asyncio
    async def test_session_write_not_visible_to_other_session(self):
        """Data written by session A must NOT be readable by session B."""
        session_a = await self._create_session()
        session_b = await self._create_session()
        try:
            # Session A writes a secret
            await self._call_tool(session_a, "save_file", {
                "name": "secret.txt",
                "content": "TOP_SECRET_DATA_12345",
            })
            # Session B tries to read it
            result = await self._call_tool(session_b, "read_file", {
                "name": "secret.txt",
            })
            # Session B should NOT see Session A's data
            response_text = result["result"]["content"][0]["text"]
            assert "TOP_SECRET_DATA_12345" not in response_text, \
                "CRITICAL: Cross-session data leak detected!"
        finally:
            await session_a.close()
            await session_b.close()

    @pytest.mark.asyncio
    async def test_session_memory_not_shared(self):
        """Conversation history must be session-scoped."""
        session_a = await self._create_session()
        session_b = await self._create_session()
        try:
            # Session A stores memory
            await self._call_tool(session_a, "remember", {
                "text": "My API key is sk-secret-abc123",
            })
            # Session B recalls
            result = await self._call_tool(session_b, "recall", {})
            response_text = result["result"]["content"][0]["text"]
            assert "sk-secret-abc123" not in response_text, \
                "CRITICAL: Cross-session memory leak detected!"
        finally:
            await session_a.close()
            await session_b.close()

    @pytest.mark.asyncio
    async def test_session_cleanup_on_disconnect(self):
        """Session data must be cleaned up when the client disconnects."""
        session_a = await self._create_session()
        # Write data
        await self._call_tool(session_a, "save_file", {
            "name": "temp_data.txt",
            "content": "SENSITIVE_DATA",
        })
        # Disconnect
        await session_a.close()
        await asyncio.sleep(1)  # Allow cleanup
        # Connect as new session
        session_c = await self._create_session()
        try:
            result = await self._call_tool(session_c, "read_file", {
                "name": "temp_data.txt",
            })
            response_text = result["result"]["content"][0]["text"]
            assert "SENSITIVE_DATA" not in response_text, \
                "CRITICAL: Stale session data accessible after disconnect!"
        finally:
            await session_c.close()

    @pytest.mark.asyncio
    async def test_concurrent_sessions_isolated(self):
        """Multiple concurrent sessions must maintain isolation."""
        sessions = [await self._create_session() for _ in range(5)]
        try:
            # Each session writes unique data
            for i, session in enumerate(sessions):
                await self._call_tool(session, "save_file", {
                    "name": "identity.txt",
                    "content": f"I am session {i}",
                })
            # Each session should only read its own data
            for i, session in enumerate(sessions):
                result = await self._call_tool(session, "read_file", {
                    "name": "identity.txt",
                })
                text = result["result"]["content"][0]["text"]
                assert f"I am session {i}" in text, \
                    f"Session {i} got wrong data: {text}"
                # Verify no other session's data is present
                for j in range(5):
                    if j != i:
                        assert f"I am session {j}" not in text, \
                            f"Session {i} can see session {j}'s data!"
        finally:
            for session in sessions:
                await session.close()

SDK Version Requirements
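Where the version check needs to run inside the server process itself (for example at startup), a sketch along the following lines may help. It assumes the Python SDK is installed under the distribution name `mcp`, as the `pip show mcp` command in this section implies; `parse_version`, `meets_minimum`, and `installed_mcp_version` are hypothetical helper names. The parser only looks at the leading digits of each component, so a pre-release such as `1.26.0rc1` is treated like `1.26.0`; use `packaging.version.Version` where that distinction matters.

```python
import re
from importlib.metadata import PackageNotFoundError, version
from typing import Optional

MIN_SAFE_VERSION = "1.26.0"


def parse_version(text: str) -> tuple:
    """Turn '1.26.0' into (1, 26, 0), stopping at the first non-numeric part."""
    numbers = []
    for part in text.split("."):
        match = re.match(r"\d+", part)
        if match is None:
            break
        numbers.append(int(match.group()))
    return tuple(numbers)


def meets_minimum(installed: str, minimum: str = MIN_SAFE_VERSION) -> bool:
    """Compare numerically: as plain strings, '1.9.0' would sort above '1.26.0'."""
    return parse_version(installed) >= parse_version(minimum)


def installed_mcp_version() -> Optional[str]:
    """Return the installed version of the 'mcp' distribution, or None."""
    try:
        return version("mcp")
    except PackageNotFoundError:
        return None


found = installed_mcp_version()
if found is None:
    print("mcp is not installed")
elif meets_minimum(found):
    print(f"mcp {found} meets the minimum {MIN_SAFE_VERSION}")
else:
    print(f"mcp {found} is below {MIN_SAFE_VERSION}; upgrade before exposing the server")
```

Failing fast at startup is a design choice: a server that refuses to start on an old SDK is easier to notice than one that silently shares state.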
{
  "minimum_safe_versions": {
    "python_mcp_sdk": ">=1.26.0",
    "typescript_mcp_sdk": ">=1.26.0",
    "description": "Versions below 1.26.0 use shared server state by default"
  },
  "verification_commands": {
    "python": "pip show mcp | grep Version",
    "npm": "npm list @modelcontextprotocol/sdk | grep sdk"
  },
  "changelog_notes": {
    "1.26.0": [
      "Added session-scoped state management",
      "ServerSession now maintains per-connection state",
      "Breaking: server.state is no longer shared across sessions",
      "Added session lifecycle hooks (on_connect, on_disconnect)"
    ]
  }
}

#!/bin/bash
# check-mcp-sdk-version.sh -- Verify MCP SDK meets minimum security requirements
set -euo pipefail

MIN_VERSION="1.26.0"
echo "=== MCP SDK Security Version Check ==="

# Check Python SDK
if command -v pip &> /dev/null; then
    PY_VERSION=$(pip show mcp 2>/dev/null | grep "^Version:" | awk '{print $2}' || echo "not installed")
    if [ "$PY_VERSION" = "not installed" ]; then
        echo "[SKIP] Python MCP SDK not installed"
    else
        # The embedded Python must start at column 0
        if python3 -c "
from packaging.version import Version
import sys
sys.exit(0 if Version('$PY_VERSION') >= Version('$MIN_VERSION') else 1)
" 2>/dev/null; then
            echo "[PASS] Python MCP SDK: $PY_VERSION (>= $MIN_VERSION)"
        else
            echo "[FAIL] Python MCP SDK: $PY_VERSION (< $MIN_VERSION -- VULNERABLE to CVE-2026-25536)"
            # Quote the requirement: an unquoted >= would be a shell redirect
            echo "       Run: pip install --upgrade 'mcp>=$MIN_VERSION'"
        fi
    fi
fi

# Check TypeScript SDK
if command -v npm &> /dev/null; then
    TS_VERSION=$(npm list @modelcontextprotocol/sdk 2>/dev/null | grep sdk | grep -oP '[\d.]+' | head -1 || echo "not installed")
    if [ "$TS_VERSION" = "not installed" ]; then
        echo "[SKIP] TypeScript MCP SDK not installed"
    else
        if npx semver -r ">=$MIN_VERSION" "$TS_VERSION" &>/dev/null; then
            echo "[PASS] TypeScript MCP SDK: $TS_VERSION (>= $MIN_VERSION)"
        else
            echo "[FAIL] TypeScript MCP SDK: $TS_VERSION (< $MIN_VERSION -- VULNERABLE)"
            echo "       Run: npm install @modelcontextprotocol/sdk@latest"
        fi
    fi
fi

echo "=== Check Complete ==="

References
- CVE-2026-25536: Cross-client data leak in MCP SDK shared server instances
- MCP SDK Changelog: Version 1.26.0 session isolation changes
- MCP Security Guide: Session management and state isolation
- OWASP Session Management Cheat Sheet: Best practices for session isolation in web applications
- CWE-488: Exposure of Data Element to Wrong Session -- MITRE classification