能力式存取控制
中級 · 5 分鐘閱讀 · 更新於 2026-03-15
為 LLM 功能實作細粒度能力控管的逐步演練,涵蓋能力符記設計、權限範圍、動態能力授予與稽核軌跡。
傳統的角色式存取控制將廣泛權限指派給使用者角色。能力式存取控制則更細粒度:每個動作都需要特定的能力符記,且符記可限定範圍、限時與撤銷。對於模型可呼叫工具與外部 API 的 LLM 應用而言,能力式控管確保模型只能執行當前使用者獲授權請求的動作。
步驟 1:定義能力模型
# capabilities/model.py
"""
Capability model for LLM application authorization.
"""
from dataclasses import dataclass, field
from datetime import datetime, timezone, timedelta
from enum import Enum
from typing import Optional
import secrets
import hashlib
class CapabilityScope(str, Enum):
    """Kinds of access a capability can authorize.

    Members are also ``str`` instances, so each one compares equal to its
    lowercase value (for example ``CapabilityScope.READ == "read"``).
    """

    READ = "read"
    WRITE = "write"
    EXECUTE = "execute"
    ADMIN = "admin"
@dataclass
class Capability:
    """A grant of one scope on one resource, optionally time- and use-limited.

    Attributes:
        id: Identifier of this grant; ``create`` fills it with a random
            32-character hex token.
        name: Human-readable label for the grant.
        resource: Name of the resource the grant applies to.
        scope: Kind of access granted on ``resource``.
        constraints: Extra restrictions; stored as-is and not interpreted by
            this class.
        expires_at: Instant after which the grant is invalid, or ``None`` for
            no expiry. It is compared against the current UTC time, so it
            must be timezone-aware.
        max_uses: Maximum number of successful ``consume`` calls, or ``None``
            for no limit. ``0`` means the grant can never be used.
        uses: Number of successful ``consume`` calls so far.
    """

    id: str
    name: str
    resource: str
    scope: "CapabilityScope"
    constraints: dict = field(default_factory=dict)
    expires_at: Optional[datetime] = None
    max_uses: Optional[int] = None
    uses: int = 0

    @staticmethod
    def create(
        name: str,
        resource: str,
        scope: "CapabilityScope",
        ttl_minutes: int = 60,
        max_uses: Optional[int] = None,
        constraints: Optional[dict] = None,
    ) -> "Capability":
        """Build a fresh capability with a random id and a UTC expiry.

        Args:
            name: Label for the grant.
            resource: Resource the grant applies to.
            scope: Kind of access granted.
            ttl_minutes: Lifetime in minutes, counted from now.
            max_uses: Use limit, or ``None`` for unlimited.
            constraints: Optional extra restrictions; a new empty dict is
                used when omitted.

        Returns:
            A new ``Capability`` with ``uses`` set to 0.
        """
        return Capability(
            id=secrets.token_hex(16),
            name=name,
            resource=resource,
            scope=scope,
            constraints=constraints or {},
            expires_at=datetime.now(timezone.utc) + timedelta(minutes=ttl_minutes),
            max_uses=max_uses,
            uses=0,
        )

    def is_valid(self) -> bool:
        """Return True while the grant is unexpired and has uses remaining."""
        if self.expires_at is not None and datetime.now(timezone.utc) > self.expires_at:
            return False
        # Compare against None rather than relying on truthiness, so that
        # max_uses=0 means "never usable" instead of "unlimited".
        if self.max_uses is not None and self.uses >= self.max_uses:
            return False
        return True

    def consume(self) -> bool:
        """Record one use if the grant is still valid.

        Returns:
            True when a use was recorded; False (leaving ``uses`` untouched)
            when the grant is expired or exhausted.
        """
        if not self.is_valid():
            return False
        self.uses += 1
        return True


# Step 2: Build the capability manager
# capabilities/manager.py
"""
Capability management: granting, checking, and revoking capabilities.
"""
import logging
from capabilities.model import Capability, CapabilityScope
# Module-level logger used by CapabilityManager for grant, consume and revoke messages.
logger = logging.getLogger("capability_manager")
class CapabilityManager:
    """Track capabilities per session and answer check/consume/revoke calls.

    State lives in memory: a mapping from session id to the capabilities
    granted to that session, plus a set of revoked capability ids.
    """

    def __init__(self):
        self._capabilities: dict[str, list[Capability]] = {}
        self._revoked: set[str] = set()

    def _live_matches(self, session_id, resource, scope):
        """Yield the session's non-revoked capabilities for resource/scope.

        Capabilities are yielded in the order they were granted; validity
        (expiry, use limit) is left for the caller to test.
        """
        for candidate in self._capabilities.get(session_id, []):
            if (candidate.resource == resource
                    and candidate.scope == scope
                    and candidate.id not in self._revoked):
                yield candidate

    def grant(self, session_id: str, capability: Capability) -> str:
        """Attach ``capability`` to the session and return the capability id."""
        self._capabilities.setdefault(session_id, []).append(capability)
        logger.info(f"Granted {capability.name} to session {session_id}")
        return capability.id

    def check(
        self, session_id: str, resource: str, scope: CapabilityScope,
    ) -> bool:
        """Return True if the session holds a valid, non-revoked match.

        This does not record a use.
        """
        return any(
            cap.is_valid()
            for cap in self._live_matches(session_id, resource, scope)
        )

    def consume(
        self, session_id: str, resource: str, scope: CapabilityScope,
    ) -> bool:
        """Record one use on the first matching capability that accepts it.

        Returns:
            True when a use was recorded; False (after logging a warning)
            when no matching capability could be consumed.
        """
        for cap in self._live_matches(session_id, resource, scope):
            if cap.consume():
                logger.info(f"Consumed capability {cap.name} (session {session_id})")
                return True
        logger.warning(f"No valid capability for {resource}:{scope} (session {session_id})")
        return False

    def revoke(self, capability_id: str) -> None:
        """Mark a single capability id as revoked."""
        self._revoked.add(capability_id)
        logger.info(f"Revoked capability {capability_id}")

    def revoke_session(self, session_id: str) -> None:
        """Mark every capability currently granted to the session as revoked."""
        self._revoked.update(
            cap.id for cap in self._capabilities.get(session_id, [])
        )
        logger.info(f"Revoked all capabilities for session {session_id}")


# Step 3: Integrate with tool calls
# capabilities/tool_guard.py
"""
Capability guard for LLM tool invocations.
"""
from capabilities.manager import CapabilityManager
from capabilities.model import CapabilityScope
class ToolCapabilityGuard:
    """Gate LLM tool invocations behind the calling session's capabilities."""

    # Maps each known tool name to the (resource, scope) pair it requires.
    TOOL_REQUIREMENTS = {
        "search_products": ("products", CapabilityScope.READ),
        "place_order": ("orders", CapabilityScope.WRITE),
        "cancel_order": ("orders", CapabilityScope.WRITE),
        "view_account": ("account", CapabilityScope.READ),
        "update_account": ("account", CapabilityScope.WRITE),
        "run_query": ("database", CapabilityScope.EXECUTE),
        "admin_panel": ("system", CapabilityScope.ADMIN),
    }

    def __init__(self, manager: CapabilityManager):
        self.manager = manager

    def can_execute(self, session_id: str, tool_name: str) -> bool:
        """Return True if the session currently holds what the tool requires.

        This only checks; it does not record a use.
        """
        requirement = self.TOOL_REQUIREMENTS.get(tool_name)
        if requirement is None:
            return False  # Unknown tools are denied by default
        resource, scope = requirement
        return self.manager.check(session_id, resource, scope)

    def execute_with_check(
        self, session_id: str, tool_name: str, tool_func, **kwargs
    ) -> dict:
        """Consume the required capability, then run the tool.

        Args:
            session_id: Session on whose behalf the tool is invoked.
            tool_name: Key into ``TOOL_REQUIREMENTS``.
            tool_func: Callable implementing the tool.
            **kwargs: Passed through to ``tool_func``.

        Returns:
            ``{"result": ..., "allowed": True}`` when the tool ran, otherwise
            ``{"error": ..., "allowed": False}``. Exceptions raised by
            ``tool_func`` propagate, after the use has already been recorded.
        """
        requirement = self.TOOL_REQUIREMENTS.get(tool_name)
        # consume() is the single authorization gate: a separate check()
        # followed by an unchecked consume() would let the tool run even if
        # no use could actually be recorded.
        if requirement is None or not self.manager.consume(session_id, *requirement):
            return {
                "error": f"Insufficient capabilities for {tool_name}",
                "allowed": False,
            }
        result = tool_func(**kwargs)
        return {"result": result, "allowed": True}


# Step 4: Implement dynamic capability grants
# capabilities/dynamic_grants.py
"""
Dynamic capability grants based on user context and risk assessment.
"""
from capabilities.model import Capability, CapabilityScope
from capabilities.manager import CapabilityManager
class DynamicGrantPolicy:
    """Grant a role's predefined set of capabilities to a session."""

    def __init__(self, manager: CapabilityManager):
        self.manager = manager

    def grant_for_user_role(self, session_id: str, role: str) -> list[str]:
        """Create and grant every capability configured for ``role``.

        Returns:
            The ids of the granted capabilities, in grant order. A role that
            is not in the table yields an empty list.
        """
        # Each entry is (name, resource, scope, ttl_minutes, max_uses).
        specs_by_role = {
            "customer": [
                ("search_products", "products", CapabilityScope.READ, 60, None),
                ("view_orders", "orders", CapabilityScope.READ, 60, 50),
                ("view_account", "account", CapabilityScope.READ, 30, 10),
            ],
            "support_agent": [
                ("search_products", "products", CapabilityScope.READ, 480, None),
                ("manage_orders", "orders", CapabilityScope.WRITE, 480, 200),
                ("view_account", "account", CapabilityScope.READ, 480, 100),
            ],
            "admin": [
                ("full_access", "system", CapabilityScope.ADMIN, 30, 50),
            ],
        }

        capability_ids = []
        for name, resource, scope, ttl, use_limit in specs_by_role.get(role, []):
            capability = Capability.create(
                name=name,
                resource=resource,
                scope=scope,
                ttl_minutes=ttl,
                max_uses=use_limit,
            )
            capability_ids.append(self.manager.grant(session_id, capability))
        return capability_ids


# Step 5: Build the audit log
# capabilities/audit.py
import json
import logging
from datetime import datetime, timezone
class CapabilityAuditLogger:
    """Emit capability events as JSON strings on the ``capability_audit`` logger."""

    def __init__(self):
        self.logger = logging.getLogger("capability_audit")

    @staticmethod
    def _encode(event, session_id, resource, scope, **extra):
        """Serialize one audit event, stamped with the current UTC time."""
        payload = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "event": event,
            "session_id": session_id,
            "resource": resource,
            "scope": scope,
        }
        # Event-specific fields go after the common ones.
        payload.update(extra)
        return json.dumps(payload)

    def log_check(self, session_id, resource, scope, result):
        """Log a capability check at INFO level as "granted" or "denied"."""
        outcome = "granted" if result else "denied"
        self.logger.info(
            self._encode("capability_check", session_id, resource, scope, result=outcome)
        )

    def log_escalation_attempt(self, session_id, requested_resource, requested_scope):
        """Log an escalation attempt at WARNING level."""
        self.logger.warning(
            self._encode("escalation_attempt", session_id, requested_resource, requested_scope)
        )


# Step 6: Deploy as middleware
# capabilities/api.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from capabilities.manager import CapabilityManager
from capabilities.tool_guard import ToolCapabilityGuard
from capabilities.dynamic_grants import DynamicGrantPolicy
# Module-level singletons created once when this module is imported.
app = FastAPI(title="Capability Access Control")
manager = CapabilityManager()
# The guard and the grant policy share one manager, so capabilities granted
# through the policy are the ones the guard checks.
guard = ToolCapabilityGuard(manager)
policy = DynamicGrantPolicy(manager)
class SessionRequest(BaseModel):
    """Request body for ``POST /session/init``."""

    session_id: str
    role: str
class ToolRequest(BaseModel):
    """Request body for ``POST /tool/check``."""

    session_id: str
    tool_name: str
@app.post("/session/init")
async def init_session(request: SessionRequest):
    """Grant the role's capabilities to the session and report how many.

    NOTE(review): ``role`` is taken directly from the request body; confirm
    that callers of this endpoint are authenticated and trusted to choose it.
    """
    capability_ids = policy.grant_for_user_role(request.session_id, request.role)
    granted_count = len(capability_ids)
    return {"granted_capabilities": granted_count}
@app.post("/tool/check")
async def check_tool(request: ToolRequest):
    """Report whether the session may call the tool; respond 403 when it may not."""
    if guard.can_execute(request.session_id, request.tool_name):
        return {"allowed": True}
    raise HTTPException(status_code=403, detail="Insufficient capabilities")


# Run the service: uvicorn capabilities.api:app --port 8630
# Step 7: Test capability enforcement
# tests/test_capabilities.py
import pytest
from capabilities.model import Capability, CapabilityScope
from capabilities.manager import CapabilityManager
from capabilities.tool_guard import ToolCapabilityGuard
@pytest.fixture
def manager():
    """Provide a new, empty CapabilityManager to each test that requests it."""
    fresh_manager = CapabilityManager()
    return fresh_manager
def test_granted_capability_works(manager):
    """A capability granted to a session satisfies a matching check."""
    manager.grant(
        "session-1",
        Capability.create("read_products", "products", CapabilityScope.READ),
    )
    assert manager.check("session-1", "products", CapabilityScope.READ)
def test_missing_capability_denied(manager):
    """A session with no grants fails every check."""
    allowed = manager.check("session-1", "products", CapabilityScope.READ)
    assert not allowed
def test_revoked_capability_denied(manager):
    """Revoking a granted capability makes the matching check fail."""
    granted = Capability.create("read_products", "products", CapabilityScope.READ)
    manager.grant("session-1", granted)
    manager.revoke(granted.id)
    allowed = manager.check("session-1", "products", CapabilityScope.READ)
    assert not allowed
def test_max_uses_enforced(manager):
    """A capability limited to two uses is refused on the third consume."""
    limited = Capability.create("limited", "products", CapabilityScope.READ, max_uses=2)
    manager.grant("session-1", limited)
    target = ("session-1", "products", CapabilityScope.READ)
    for _ in range(2):
        assert manager.consume(*target)
    assert not manager.consume(*target)
def test_unknown_tool_denied():
    """A tool name absent from the requirements table is always denied."""
    tool_guard = ToolCapabilityGuard(CapabilityManager())
    assert not tool_guard.can_execute("session-1", "unknown_tool")


# Run the tests: pytest tests/test_capabilities.py -v
# Related topics
- Sandboxed Tool Execution —— 隔離工具執行
- Tool Permission Scoping —— 代理的最小權限
- Session Isolation Patterns —— 隔離使用者會話
- Human-in-the-Loop Gates —— 高風險動作的人工核可
Knowledge Check
某客戶會話擁有對 products 與 orders 的讀取能力。攻擊者透過提示詞注入嘗試讓 LLM 呼叫 admin_panel 工具。會發生什麼?