Logging and Monitoring for Cloud AI Services
Implementing comprehensive logging and monitoring for cloud AI services including prompt/response capture, anomaly detection, and security-focused observability across AWS, Azure, and GCP.
概覽
監控 雲端 AI services requires a different approach than 監控 traditional 雲端 workloads. Traditional 監控 focuses on infrastructure metrics (CPU, memory, network) and application metrics (request rate, error rate, latency). AI service 監控 must 此外 capture the semantic content of interactions -- the prompts sent to models and the responses generated -- 因為 這是 where 安全-relevant events occur.
A 提示詞注入 attack does not manifest as an HTTP error or a CPU spike. It appears as a specific pattern in the text of a prompt. A data exfiltration attempt through a model looks like a normal API call from an infrastructure perspective. Only by analyzing the content of prompts and responses can you detect these AI-specific threats.
This creates a tension: capturing prompts and responses for 安全 監控 means storing potentially sensitive data (customer inputs, PII, proprietary information) in log systems. The logging architecture must balance 安全 observability against data protection requirements.
Logging Architecture
What to Log for AI 安全
The following data points are essential for 安全 監控 of 雲端 AI services:
| Data Point | 安全 Purpose | Privacy Consideration |
|---|---|---|
| Caller identity (IAM principal) | Attribution, anomaly 偵測 | Low sensitivity |
| Source IP address | Network anomaly 偵測 | Low sensitivity |
| Timestamp | Timeline reconstruction | Low sensitivity |
| Model ID / deployment | Scope of impact 評估 | Low sensitivity |
| 輸入 符元 count | Cost 監控, extraction 偵測 | Low sensitivity |
| 輸出 符元 count | Cost 監控, amplification 偵測 | Low sensitivity |
| Content filter result | 攻擊 偵測, policy compliance | Medium sensitivity |
| Prompt text | Prompt injection 偵測, forensics | High sensitivity |
| Response text | Data leakage 偵測, compliance | High sensitivity |
| 護欄 action (block/allow) | 安全 control effectiveness | Medium sensitivity |
| Latency | Performance baseline, anomaly 偵測 | Low sensitivity |
| Error codes | Access control validation | Low sensitivity |
Two-Tier Logging Strategy
實作 a two-tier logging approach that separates metadata logging (always on) from content logging (restricted):
import json
import hashlib
import time
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class AISecurityLogger:
"""Two-tier logging for AI service interactions."""
metadata_logger: object # CloudWatch, Azure Monitor, 雲端 Logging client
content_logger: Optional[object] = None # Separate, restricted log destination
enable_content_logging: bool = False
pii_redaction_enabled: bool = True
def log_interaction(
self,
caller_identity: str,
source_ip: str,
model_id: str,
input_text: str,
output_text: str,
input_tokens: int,
output_tokens: int,
latency_ms: float,
content_filter_result: Optional[dict] = None,
guardrail_action: Optional[str] = None,
) -> dict:
"""Log an AI service interaction at both tiers."""
timestamp = time.time()
request_id = hashlib.sha256(
f"{caller_identity}{timestamp}".encode()
).hexdigest()[:16]
# Tier 1: Metadata log (always captured)
metadata_record = {
"timestamp": timestamp,
"request_id": request_id,
"caller_identity": caller_identity,
"source_ip": source_ip,
"model_id": model_id,
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"total_tokens": input_tokens + output_tokens,
"latency_ms": latency_ms,
"guardrail_action": guardrail_action,
"content_filter_triggered": bool(
content_filter_result and content_filter_result.get("blocked")
),
"input_hash": hashlib.sha256(input_text.encode()).hexdigest(),
}
self._write_metadata(metadata_record)
# Tier 2: Content log (restricted, optional)
if self.enable_content_logging and self.content_logger:
content_record = {
"timestamp": timestamp,
"request_id": request_id,
"input_text": self._redact_if_needed(input_text),
"output_text": self._redact_if_needed(output_text),
"content_filter_detail": content_filter_result,
}
self._write_content(content_record)
return {"request_id": request_id, "logged": True}
def _redact_if_needed(self, text: str) -> str:
"""Apply PII redaction if enabled."""
if not self.pii_redaction_enabled:
return text
import re
# Basic PII patterns -- use a dedicated PII 偵測 library in production
patterns = {
"email": (r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", "[EMAIL_REDACTED]"),
"phone": (r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b", "[PHONE_REDACTED]"),
"ssn": (r"\b\d{3}-\d{2}-\d{4}\b", "[SSN_REDACTED]"),
"credit_card": (r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b", "[CC_REDACTED]"),
}
redacted = text
for name, (pattern, replacement) in patterns.items():
redacted = re.sub(pattern, replacement, redacted)
return redacted
def _write_metadata(self, record: dict):
"""Write metadata to the primary log destination."""
# 實作 depends on 雲端 provider
pass
def _write_content(self, record: dict):
"""Write content to the restricted log destination."""
# 實作 depends on 雲端 provider
passAWS Logging Configuration
Bedrock Model Invocation Logging
AWS Bedrock supports model invocation logging that captures prompts and responses to S3 or CloudWatch Logs:
import boto3
def configure_bedrock_invocation_logging(
session: boto3.Session,
s3_bucket: str,
s3_prefix: str = "bedrock-logs/",
cloudwatch_log_group: str = "/aws/bedrock/invocations",
region: str = "us-east-1",
) -> dict:
"""Configure Bedrock model invocation logging."""
bedrock = session.client("bedrock", region_name=region)
logs = session.client("logs", region_name=region)
# Create CloudWatch log group if it does not exist
try:
logs.create_log_group(
logGroupName=cloudwatch_log_group,
retentionInDays=90,
kmsKeyId="alias/bedrock-logs-key", # Use KMS encryption
)
except logs.exceptions.ResourceAlreadyExistsException:
pass
# Configure invocation logging
logging_config = {
"loggingConfig": {
"cloudWatchConfig": {
"logGroupName": cloudwatch_log_group,
"roleArn": f"arn:aws:iam::{session.client('sts').get_caller_identity()['Account']}:role/BedrockLoggingRole",
"largeDataDelivery": {
"s3Config": {
"bucketName": s3_bucket,
"keyPrefix": f"{s3_prefix}large-data/",
}
},
},
"s3Config": {
"bucketName": s3_bucket,
"keyPrefix": s3_prefix,
},
"textDataDeliveryEnabled": True,
"imageDataDeliveryEnabled": True,
"embeddingDataDeliveryEnabled": True,
}
}
bedrock.put_model_invocation_logging_configuration(**logging_config)
return {
"cloudwatch_log_group": cloudwatch_log_group,
"s3_bucket": f"s3://{s3_bucket}/{s3_prefix}",
"text_logging": True,
"image_logging": True,
"embedding_logging": True,
}CloudWatch Insights 偵測 Queries
Use CloudWatch Insights to detect 安全-relevant patterns in Bedrock invocation logs:
DETECTION_QUERIES = {
"prompt_injection_indicators": """
fields @timestamp, @message
| parse @message '"輸入":*' as input_text
| filter input_text like /(?i)(ignore previous|系統提示詞|you are now|disregard instructions|override 安全)/
| stats count() as hit_count by bin(1h) as time_window
| sort time_window desc
""",
"high_token_consumers": """
fields @timestamp, identity.arn as caller, modelId,
inputTokenCount, outputTokenCount
| stats sum(inputTokenCount + outputTokenCount) as total_tokens,
count() as request_count
by caller, modelId
| filter total_tokens > 500000
| sort total_tokens desc
""",
"content_filter_blocks": """
fields @timestamp, identity.arn as caller, modelId
| filter ispresent(guardrailAction) and guardrailAction = "BLOCKED"
| stats count() as block_count by caller, bin(1h) as hour
| filter block_count > 5
| sort block_count desc
""",
"unusual_model_access": """
fields @timestamp, identity.arn as caller, modelId
| stats earliest(@timestamp) as first_seen,
latest(@timestamp) as last_seen,
count() as total_calls
by caller, modelId
| filter first_seen > ago(24h)
| sort total_calls desc
""",
"large_output_requests": """
fields @timestamp, identity.arn as caller, modelId,
outputTokenCount, inputTokenCount
| filter outputTokenCount > 4000
| stats count() as large_output_count,
avg(outputTokenCount) as avg_output,
sum(outputTokenCount) as total_output
by caller
| filter large_output_count > 10
| sort total_output desc
""",
}Azure Logging Configuration
Azure OpenAI Diagnostic Logging
from azure.identity import DefaultAzureCredential
from azure.mgmt.monitor import MonitorManagementClient
def configure_azure_openai_diagnostics(
subscription_id: str,
resource_group: str,
openai_account_name: str,
log_analytics_workspace_id: str,
storage_account_id: str = None,
) -> dict:
"""Configure comprehensive diagnostic logging for Azure OpenAI."""
credential = DefaultAzureCredential()
monitor = MonitorManagementClient(credential, subscription_id)
resource_uri = (
f"/subscriptions/{subscription_id}/resourceGroups/{resource_group}"
f"/providers/Microsoft.CognitiveServices/accounts/{openai_account_name}"
)
logs = [
{"category": "Audit", "enabled": True,
"retentionPolicy": {"enabled": True, "days": 365}},
{"category": "RequestResponse", "enabled": True,
"retentionPolicy": {"enabled": True, "days": 90}},
{"category": "Trace", "enabled": True,
"retentionPolicy": {"enabled": True, "days": 90}},
]
metrics = [
{"category": "AllMetrics", "enabled": True,
"retentionPolicy": {"enabled": True, "days": 90}},
]
settings = {
"properties": {
"workspaceId": log_analytics_workspace_id,
"logs": logs,
"metrics": metrics,
}
}
if storage_account_id:
settings["properties"]["storageAccountId"] = storage_account_id
result = monitor.diagnostic_settings.create_or_update(
resource_uri,
"openai-安全-diagnostics",
settings,
)
return {
"setting_name": "openai-安全-diagnostics",
"log_categories": ["Audit", "RequestResponse", "Trace"],
"retention_days": 90,
"workspace_id": log_analytics_workspace_id,
}KQL 偵測 Queries for Azure OpenAI
// Detect repeated content filter triggers from a single caller
AzureDiagnostics
| where ResourceProvider == "MICROSOFT.COGNITIVESERVICES"
| where Category == "RequestResponse"
| extend parsedProperties = parse_json(properties_s)
| where parsedProperties.response.statusCode == 400
| where properties_s contains "content_filter"
| summarize
FilterHits = count(),
Models = make_set(parsedProperties.model),
FirstSeen = min(TimeGenerated),
LastSeen = max(TimeGenerated)
by CallerIPAddress
| where FilterHits > 10
| order by FilterHits desc
// 識別 符元 consumption anomalies
AzureDiagnostics
| where ResourceProvider == "MICROSOFT.COGNITIVESERVICES"
| where Category == "RequestResponse"
| extend parsedProperties = parse_json(properties_s)
| extend totalTokens = toint(parsedProperties.usage.total_tokens)
| summarize
AvgTokens = avg(totalTokens),
MaxTokens = max(totalTokens),
P95Tokens = percentile(totalTokens, 95),
RequestCount = count()
by CallerIPAddress, bin(TimeGenerated, 1h)
| where AvgTokens > 2 * P95Tokens // Flag when average exceeds 2x the 95th percentile
| order by AvgTokens desc
// Monitor for API key usage (should be zero if managed identity is enforced)
AzureDiagnostics
| where ResourceProvider == "MICROSOFT.COGNITIVESERVICES"
| where Category == "Audit"
| where properties_s !contains "managedidentity"
| where properties_s contains "key"
| project TimeGenerated, CallerIPAddress, OperationName, properties_s
| order by TimeGenerated desc
// Track deployment changes for configuration drift
AzureActivity
| where ResourceProviderValue == "MICROSOFT.COGNITIVESERVICES"
| where OperationNameValue has_any ("DEPLOYMENTS/WRITE", "RAIPOLICIES/WRITE", "ACCOUNTS/WRITE")
| project TimeGenerated, Caller, OperationNameValue, Properties
| order by TimeGenerated desc
GCP Logging Configuration
Vertex AI Audit Logging
GCP 雲端 Audit Logs for Vertex AI require enabling Data Access logs, which are not enabled by default:
from google.雲端 import logging_v2
def enable_vertex_data_access_logs(project_id: str) -> dict:
"""Enable Data Access audit logs for Vertex AI."""
# Data Access logs must be enabled via the IAM audit config
# 這是 typically done via gcloud or Terraform:
#
# gcloud projects set-iam-policy PROJECT_ID policy.yaml
#
# Where policy.yaml includes:
audit_config = {
"service": "aiplatform.googleapis.com",
"auditLogConfigs": [
{"logType": "ADMIN_READ"},
{"logType": "DATA_READ"},
{"logType": "DATA_WRITE"},
],
}
return {
"audit_config": audit_config,
"detail": "Apply this audit config to the project IAM policy. "
"DATA_READ captures model prediction calls. "
"DATA_WRITE captures model upload and 訓練 operations.",
"gcloud_command": (
f"gcloud projects get-iam-policy {project_id} --format=json > policy.json && "
"# Add auditConfigs to policy.json, then: "
f"gcloud projects set-iam-policy {project_id} policy.json"
),
}
def query_vertex_security_events(
project_id: str,
hours_back: int = 24,
) -> dict:
"""Query Vertex AI 安全 events from 雲端 Logging."""
client = logging_v2.Client(project=project_id)
queries = {
"model_prediction_calls": (
f'resource.type="aiplatform.googleapis.com/Endpoint" '
f'protoPayload.methodName="google.雲端.aiplatform.v1.PredictionService.Predict" '
f'timestamp>="{hours_back}h ago"'
),
"permission_denied": (
f'resource.type="aiplatform.googleapis.com" '
f'protoPayload.status.code=7 '
f'timestamp>="{hours_back}h ago"'
),
"model_modifications": (
f'resource.type="aiplatform.googleapis.com/Model" '
f'protoPayload.methodName=~"Create|Update|Delete" '
f'timestamp>="{hours_back}h ago"'
),
"endpoint_changes": (
f'resource.type="aiplatform.googleapis.com/Endpoint" '
f'protoPayload.methodName=~"Deploy|Undeploy|Create|Delete" '
f'timestamp>="{hours_back}h ago"'
),
}
results = {}
for name, filter_str in queries.items():
entries = list(client.list_entries(filter_=filter_str, max_results=50))
results[name] = {
"count": len(entries),
"sample": [
{
"timestamp": str(e.timestamp),
"method": getattr(e.payload, "get", lambda *a: "n/a")("methodName", "n/a"),
}
for e in entries[:5]
],
}
return results安全 監控 Dashboard
Key Metrics to Display
Design a 安全 監控 dashboard with these panels:
def define_ai_security_dashboard() -> dict:
"""Define panels for an AI 安全 監控 dashboard."""
return {
"dashboard_name": "雲端 AI 安全 Operations",
"refresh_interval": "1m",
"panels": [
{
"title": "Content Filter Blocks (24h)",
"type": "timeseries",
"metric": "guardrail_blocks_per_hour",
"threshold": {"warning": 10, "critical": 50},
"description": "Spikes indicate potential attack campaigns",
},
{
"title": "Token Consumption by Identity",
"type": "bar_chart",
"metric": "total_tokens_by_caller",
"threshold": {"warning": 500000, "critical": 2000000},
"description": "Identifies top consumers and anomalies",
},
{
"title": "Unique Callers (24h)",
"type": "stat",
"metric": "distinct_caller_identities",
"description": "New callers may indicate credential compromise",
},
{
"title": "Error Rate by Model",
"type": "timeseries",
"metric": "error_rate_per_model",
"threshold": {"warning": 0.05, "critical": 0.15},
"description": "High error rates may indicate scanning or fuzzing",
},
{
"title": "Cost Burn Rate (USD/hour)",
"type": "gauge",
"metric": "estimated_cost_per_hour",
"threshold": {"warning": 50, "critical": 200},
"description": "Real-time cost 監控 for denial-of-wallet 偵測",
},
{
"title": "Suspicious Prompt Patterns",
"type": "table",
"metric": "prompts_matching_injection_patterns",
"description": "Prompts matching known injection signatures",
},
{
"title": "Model Deployment Changes",
"type": "event_log",
"metric": "deployment_configuration_events",
"description": "Track 護欄, model, and endpoint changes",
},
{
"title": "Access Denied Events",
"type": "timeseries",
"metric": "access_denied_count",
"threshold": {"warning": 20, "critical": 100},
"description": "High denial rates indicate enumeration or brute force",
},
],
}Alert Correlation and Escalation
Multi-Signal Alert Rules
Individual metrics often generate false positives. Correlate multiple signals to improve alert fidelity:
from dataclasses import dataclass
from typing import Optional
@dataclass
class CorrelatedAlert:
"""Multi-signal alert rule for AI 安全 events."""
name: str
severity: str
signals: list
threshold: dict
escalation: str
ALERT_RULES = [
CorrelatedAlert(
name="Active 提示詞注入 Campaign",
severity="P2",
signals=[
"content_filter_blocks > 10 in 5min",
"unique_payloads > 5 in 5min (by 輸入 hash)",
"single_caller_identity",
],
threshold={"signals_required": 2, "within_minutes": 10},
escalation="Page AI 安全 on-call, block caller identity",
),
CorrelatedAlert(
name="Credential Compromise - AI Service",
severity="P1",
signals=[
"new_source_ip for existing caller",
"off_hours_access",
"model_access_pattern_change",
"high_token_consumption",
],
threshold={"signals_required": 3, "within_minutes": 30},
escalation="Page 安全 on-call, suspend credential, preserve logs",
),
CorrelatedAlert(
name="Denial of Wallet 攻擊",
severity="P1",
signals=[
"token_consumption > 5x baseline",
"cost_burn_rate > $100/hour",
"single_caller_or_ip",
],
threshold={"signals_required": 2, "within_minutes": 15},
escalation="Page FinOps and 安全, apply emergency rate limit",
),
CorrelatedAlert(
name="安全 Control Tampering",
severity="P1",
signals=[
"guardrail_configuration_changed",
"content_filter_policy_modified",
"unusual_admin_access_pattern",
],
threshold={"signals_required": 1, "within_minutes": 5},
escalation="Page AI platform team, verify change was authorized",
),
]Log Retention and Compliance
Retention Requirements by Data Type
| Log Type | Minimum Retention | Encryption | Access Control |
|---|---|---|---|
| Management API audit logs | 365 days | KMS/CMK | 安全 team + auditors |
| Invocation metadata | 90 days | KMS/CMK | 安全 team + ML platform |
| Prompt/response content | 30-90 days | KMS/CMK, separate key | 安全 team only (restricted) |
| Content filter results | 90 days | KMS/CMK | 安全 team + compliance |
| Cost and usage metrics | 365 days | Standard | FinOps + 安全 |
參考文獻
- AWS, "Logging and 監控 in Amazon Bedrock," https://docs.aws.amazon.com/bedrock/latest/userguide/logging-監控.html
- Microsoft, "Monitor Azure OpenAI Service," https://learn.microsoft.com/en-us/azure/ai-services/openai/how-to/監控
- Google 雲端, "雲端 Audit Logs for Vertex AI," https://雲端.google.com/vertex-ai/docs/general/audit-logging
- NIST SP 800-92, "Guide to Computer 安全 Log Management," https://csrc.nist.gov/publications/detail/sp/800-92/final
Why is a two-tier logging architecture recommended for 雲端 AI services?
Which signal combination most reliably indicates a 提示詞注入 campaign rather than normal usage?