Capability-Based Access Control
Step-by-step walkthrough for implementing fine-grained capability controls for LLM features, covering capability token design, permission scoping, dynamic capability grants, and audit trails.
Traditional role-based access control assigns broad permissions to user roles. Capability-based access control is more granular: each action requires a specific capability token, and tokens can be scoped, time-limited, and revoked. For LLM applications where the model can invoke tools and external APIs, capability-based control ensures the model can only perform actions that the current user is authorized to request.
Step 1: Define the Capability Model
# capabilities/model.py
"""
Capability model for LLM application authorization.
"""
from dataclasses import dataclass, field
from datetime import datetime, timezone, timedelta
from enum import Enum
from typing import Optional
import secrets
import hashlib
class CapabilityScope(str, Enum):
READ = "read"
WRITE = "write"
EXECUTE = "execute"
ADMIN = "admin"
@dataclass
class Capability:
id: str
name: str
resource: str
scope: CapabilityScope
constraints: dict = field(default_factory=dict)
expires_at: Optional[datetime] = None
max_uses: Optional[int] = None
uses: int = 0
@staticmethod
def create(
name: str, resource: str, scope: CapabilityScope,
ttl_minutes: int = 60, max_uses: int = None,
constraints: dict = None,
) -> "Capability":
return Capability(
id=secrets.token_hex(16),
name=name,
resource=resource,
scope=scope,
constraints=constraints or {},
expires_at=datetime.now(timezone.utc) + timedelta(minutes=ttl_minutes),
max_uses=max_uses,
uses=0,
)
def is_valid(self) -> bool:
if self.expires_at and datetime.now(timezone.utc) > self.expires_at:
return False
if self.max_uses and self.uses >= self.max_uses:
return False
return True
def consume(self) -> bool:
if not self.is_valid():
return False
self.uses += 1
return TrueStep 2: Build the Capability Manager
# capabilities/manager.py
"""
Capability management: granting, checking, and revoking capabilities.
"""
import logging
from capabilities.model import Capability, CapabilityScope
logger = logging.getLogger("capability_manager")
class CapabilityManager:
def __init__(self):
self._capabilities: dict[str, list[Capability]] = {}
self._revoked: set[str] = set()
def grant(self, session_id: str, capability: Capability) -> str:
if session_id not in self._capabilities:
self._capabilities[session_id] = []
self._capabilities[session_id].append(capability)
logger.info(f"Granted {capability.name} to session {session_id}")
return capability.id
def check(
self, session_id: str, resource: str, scope: CapabilityScope,
) -> bool:
caps = self._capabilities.get(session_id, [])
for cap in caps:
if (cap.resource == resource and
cap.scope == scope and
cap.id not in self._revoked and
cap.is_valid()):
return True
return False
def consume(
self, session_id: str, resource: str, scope: CapabilityScope,
) -> bool:
caps = self._capabilities.get(session_id, [])
for cap in caps:
if (cap.resource == resource and
cap.scope == scope and
cap.id not in self._revoked and
cap.consume()):
logger.info(f"Consumed capability {cap.name} (session {session_id})")
return True
logger.warning(f"No valid capability for {resource}:{scope} (session {session_id})")
return False
def revoke(self, capability_id: str) -> None:
self._revoked.add(capability_id)
logger.info(f"Revoked capability {capability_id}")
def revoke_session(self, session_id: str) -> None:
caps = self._capabilities.get(session_id, [])
for cap in caps:
self._revoked.add(cap.id)
logger.info(f"Revoked all capabilities for session {session_id}")Step 3: Integrate with Tool Calling
# capabilities/tool_guard.py
"""
Capability guard for LLM tool invocations.
"""
from capabilities.manager import CapabilityManager
from capabilities.model import CapabilityScope
class ToolCapabilityGuard:
TOOL_REQUIREMENTS = {
"search_products": ("products", CapabilityScope.READ),
"place_order": ("orders", CapabilityScope.WRITE),
"cancel_order": ("orders", CapabilityScope.WRITE),
"view_account": ("account", CapabilityScope.READ),
"update_account": ("account", CapabilityScope.WRITE),
"run_query": ("database", CapabilityScope.EXECUTE),
"admin_panel": ("system", CapabilityScope.ADMIN),
}
def __init__(self, manager: CapabilityManager):
self.manager = manager
def can_execute(self, session_id: str, tool_name: str) -> bool:
requirement = self.TOOL_REQUIREMENTS.get(tool_name)
if not requirement:
return False # Unknown tools are denied by default
resource, scope = requirement
return self.manager.check(session_id, resource, scope)
def execute_with_check(
self, session_id: str, tool_name: str, tool_func, **kwargs
) -> dict:
if not self.can_execute(session_id, tool_name):
return {
"error": f"Insufficient capabilities for {tool_name}",
"allowed": False,
}
requirement = self.TOOL_REQUIREMENTS[tool_name]
self.manager.consume(session_id, *requirement)
result = tool_func(**kwargs)
return {"result": result, "allowed": True}Step 4: Implement Dynamic Capability Grants
# capabilities/dynamic_grants.py
"""
Dynamic capability grants based on user context and risk assessment.
"""
from capabilities.model import Capability, CapabilityScope
from capabilities.manager import CapabilityManager
class DynamicGrantPolicy:
def __init__(self, manager: CapabilityManager):
self.manager = manager
def grant_for_user_role(self, session_id: str, role: str) -> list[str]:
granted = []
role_capabilities = {
"customer": [
("search_products", "products", CapabilityScope.READ, 60, None),
("view_orders", "orders", CapabilityScope.READ, 60, 50),
("view_account", "account", CapabilityScope.READ, 30, 10),
],
"support_agent": [
("search_products", "products", CapabilityScope.READ, 480, None),
("manage_orders", "orders", CapabilityScope.WRITE, 480, 200),
("view_account", "account", CapabilityScope.READ, 480, 100),
],
"admin": [
("full_access", "system", CapabilityScope.ADMIN, 30, 50),
],
}
for cap_spec in role_capabilities.get(role, []):
name, resource, scope, ttl, max_uses = cap_spec
cap = Capability.create(
name=name, resource=resource, scope=scope,
ttl_minutes=ttl, max_uses=max_uses,
)
cap_id = self.manager.grant(session_id, cap)
granted.append(cap_id)
return grantedStep 5: Build Audit Logging
# capabilities/audit.py
import json
import logging
from datetime import datetime, timezone
class CapabilityAuditLogger:
def __init__(self):
self.logger = logging.getLogger("capability_audit")
def log_check(self, session_id, resource, scope, result):
self.logger.info(json.dumps({
"timestamp": datetime.now(timezone.utc).isoformat(),
"event": "capability_check",
"session_id": session_id,
"resource": resource,
"scope": scope,
"result": "granted" if result else "denied",
}))
def log_escalation_attempt(self, session_id, requested_resource, requested_scope):
self.logger.warning(json.dumps({
"timestamp": datetime.now(timezone.utc).isoformat(),
"event": "escalation_attempt",
"session_id": session_id,
"resource": requested_resource,
"scope": requested_scope,
}))Step 6: Deploy as Middleware
# capabilities/api.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from capabilities.manager import CapabilityManager
from capabilities.tool_guard import ToolCapabilityGuard
from capabilities.dynamic_grants import DynamicGrantPolicy
app = FastAPI(title="Capability Access Control")
manager = CapabilityManager()
guard = ToolCapabilityGuard(manager)
policy = DynamicGrantPolicy(manager)
class SessionRequest(BaseModel):
session_id: str
role: str
class ToolRequest(BaseModel):
session_id: str
tool_name: str
@app.post("/session/init")
async def init_session(request: SessionRequest):
granted = policy.grant_for_user_role(request.session_id, request.role)
return {"granted_capabilities": len(granted)}
@app.post("/tool/check")
async def check_tool(request: ToolRequest):
allowed = guard.can_execute(request.session_id, request.tool_name)
if not allowed:
raise HTTPException(403, "Insufficient capabilities")
return {"allowed": True}uvicorn capabilities.api:app --port 8630Step 7: Test Capability Enforcement
# tests/test_capabilities.py
import pytest
from capabilities.model import Capability, CapabilityScope
from capabilities.manager import CapabilityManager
from capabilities.tool_guard import ToolCapabilityGuard
@pytest.fixture
def manager():
return CapabilityManager()
def test_granted_capability_works(manager):
cap = Capability.create("read_products", "products", CapabilityScope.READ)
manager.grant("session-1", cap)
assert manager.check("session-1", "products", CapabilityScope.READ)
def test_missing_capability_denied(manager):
assert not manager.check("session-1", "products", CapabilityScope.READ)
def test_revoked_capability_denied(manager):
cap = Capability.create("read_products", "products", CapabilityScope.READ)
manager.grant("session-1", cap)
manager.revoke(cap.id)
assert not manager.check("session-1", "products", CapabilityScope.READ)
def test_max_uses_enforced(manager):
cap = Capability.create("limited", "products", CapabilityScope.READ, max_uses=2)
manager.grant("session-1", cap)
assert manager.consume("session-1", "products", CapabilityScope.READ)
assert manager.consume("session-1", "products", CapabilityScope.READ)
assert not manager.consume("session-1", "products", CapabilityScope.READ)
def test_unknown_tool_denied():
mgr = CapabilityManager()
guard = ToolCapabilityGuard(mgr)
assert not guard.can_execute("session-1", "unknown_tool")pytest tests/test_capabilities.py -vRelated Topics
- Sandboxed Tool Execution -- Isolating tool execution
- Tool Permission Scoping -- Least-privilege for agents
- Session Isolation Patterns -- Isolating user sessions
- Human-in-the-Loop Gates -- Approval for high-risk actions
A customer session has read capabilities for products and orders. Through prompt injection, the attacker tries to make the LLM call the admin_panel tool. What happens?