Logging and Monitoring for Cloud AI Services
Implementing comprehensive logging and monitoring for cloud AI services including prompt/response capture, anomaly detection, and security-focused observability across AWS, Azure, and GCP.
Overview
Monitoring cloud AI services requires a different approach than monitoring traditional cloud workloads. Traditional monitoring focuses on infrastructure metrics (CPU, memory, network) and application metrics (request rate, error rate, latency). AI service monitoring must additionally capture the semantic content of interactions -- the prompts sent to models and the responses generated -- because this is where security-relevant events occur.
A prompt injection attack does not manifest as an HTTP error or a CPU spike. It appears as a specific pattern in the text of a prompt. A data exfiltration attempt through a model looks like a normal API call from an infrastructure perspective. Only by analyzing the content of prompts and responses can you detect these AI-specific threats.
This creates a tension: capturing prompts and responses for security monitoring means storing potentially sensitive data (customer inputs, PII, proprietary information) in log systems. The logging architecture must balance security observability against data protection requirements.
Logging Architecture
What to Log for AI Security
The following data points are essential for security monitoring of cloud AI services:
| Data Point | Security Purpose | Privacy Consideration |
|---|---|---|
| Caller identity (IAM principal) | Attribution, anomaly detection | Low sensitivity |
| Source IP address | Network anomaly detection | Low sensitivity |
| Timestamp | Timeline reconstruction | Low sensitivity |
| Model ID / deployment | Scope of impact assessment | Low sensitivity |
| Input token count | Cost monitoring, extraction detection | Low sensitivity |
| Output token count | Cost monitoring, amplification detection | Low sensitivity |
| Content filter result | Attack detection, policy compliance | Medium sensitivity |
| Prompt text | Prompt injection detection, forensics | High sensitivity |
| Response text | Data leakage detection, compliance | High sensitivity |
| Guardrail action (block/allow) | Safety control effectiveness | Medium sensitivity |
| Latency | Performance baseline, anomaly detection | Low sensitivity |
| Error codes | Access control validation | Low sensitivity |
Two-Tier Logging Strategy
Implement a two-tier logging approach that separates metadata logging (always on) from content logging (restricted):
import json
import hashlib
import time
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class AISecurityLogger:
"""Two-tier logging for AI service interactions."""
metadata_logger: object # CloudWatch, Azure Monitor, Cloud Logging client
content_logger: Optional[object] = None # Separate, restricted log destination
enable_content_logging: bool = False
pii_redaction_enabled: bool = True
def log_interaction(
self,
caller_identity: str,
source_ip: str,
model_id: str,
input_text: str,
output_text: str,
input_tokens: int,
output_tokens: int,
latency_ms: float,
content_filter_result: Optional[dict] = None,
guardrail_action: Optional[str] = None,
) -> dict:
"""Log an AI service interaction at both tiers."""
timestamp = time.time()
request_id = hashlib.sha256(
f"{caller_identity}{timestamp}".encode()
).hexdigest()[:16]
# Tier 1: Metadata log (always captured)
metadata_record = {
"timestamp": timestamp,
"request_id": request_id,
"caller_identity": caller_identity,
"source_ip": source_ip,
"model_id": model_id,
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"total_tokens": input_tokens + output_tokens,
"latency_ms": latency_ms,
"guardrail_action": guardrail_action,
"content_filter_triggered": bool(
content_filter_result and content_filter_result.get("blocked")
),
"input_hash": hashlib.sha256(input_text.encode()).hexdigest(),
}
self._write_metadata(metadata_record)
# Tier 2: Content log (restricted, optional)
if self.enable_content_logging and self.content_logger:
content_record = {
"timestamp": timestamp,
"request_id": request_id,
"input_text": self._redact_if_needed(input_text),
"output_text": self._redact_if_needed(output_text),
"content_filter_detail": content_filter_result,
}
self._write_content(content_record)
return {"request_id": request_id, "logged": True}
def _redact_if_needed(self, text: str) -> str:
"""Apply PII redaction if enabled."""
if not self.pii_redaction_enabled:
return text
import re
# Basic PII patterns -- use a dedicated PII detection library in production
patterns = {
"email": (r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", "[EMAIL_REDACTED]"),
"phone": (r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b", "[PHONE_REDACTED]"),
"ssn": (r"\b\d{3}-\d{2}-\d{4}\b", "[SSN_REDACTED]"),
"credit_card": (r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b", "[CC_REDACTED]"),
}
redacted = text
for name, (pattern, replacement) in patterns.items():
redacted = re.sub(pattern, replacement, redacted)
return redacted
def _write_metadata(self, record: dict):
"""Write metadata to the primary log destination."""
# Implementation depends on cloud provider
pass
def _write_content(self, record: dict):
"""Write content to the restricted log destination."""
# Implementation depends on cloud provider
passAWS Logging Configuration
Bedrock Model Invocation Logging
AWS Bedrock supports model invocation logging that captures prompts and responses to S3 or CloudWatch Logs:
import boto3
def configure_bedrock_invocation_logging(
session: boto3.Session,
s3_bucket: str,
s3_prefix: str = "bedrock-logs/",
cloudwatch_log_group: str = "/aws/bedrock/invocations",
region: str = "us-east-1",
) -> dict:
"""Configure Bedrock model invocation logging."""
bedrock = session.client("bedrock", region_name=region)
logs = session.client("logs", region_name=region)
# Create CloudWatch log group if it does not exist
try:
logs.create_log_group(
logGroupName=cloudwatch_log_group,
retentionInDays=90,
kmsKeyId="alias/bedrock-logs-key", # Use KMS encryption
)
except logs.exceptions.ResourceAlreadyExistsException:
pass
# Configure invocation logging
logging_config = {
"loggingConfig": {
"cloudWatchConfig": {
"logGroupName": cloudwatch_log_group,
"roleArn": f"arn:aws:iam::{session.client('sts').get_caller_identity()['Account']}:role/BedrockLoggingRole",
"largeDataDelivery": {
"s3Config": {
"bucketName": s3_bucket,
"keyPrefix": f"{s3_prefix}large-data/",
}
},
},
"s3Config": {
"bucketName": s3_bucket,
"keyPrefix": s3_prefix,
},
"textDataDeliveryEnabled": True,
"imageDataDeliveryEnabled": True,
"embeddingDataDeliveryEnabled": True,
}
}
bedrock.put_model_invocation_logging_configuration(**logging_config)
return {
"cloudwatch_log_group": cloudwatch_log_group,
"s3_bucket": f"s3://{s3_bucket}/{s3_prefix}",
"text_logging": True,
"image_logging": True,
"embedding_logging": True,
}CloudWatch Insights Detection Queries
Use CloudWatch Insights to detect security-relevant patterns in Bedrock invocation logs:
DETECTION_QUERIES = {
"prompt_injection_indicators": """
fields @timestamp, @message
| parse @message '"input":*' as input_text
| filter input_text like /(?i)(ignore previous|system prompt|you are now|disregard instructions|override safety)/
| stats count() as hit_count by bin(1h) as time_window
| sort time_window desc
""",
"high_token_consumers": """
fields @timestamp, identity.arn as caller, modelId,
inputTokenCount, outputTokenCount
| stats sum(inputTokenCount + outputTokenCount) as total_tokens,
count() as request_count
by caller, modelId
| filter total_tokens > 500000
| sort total_tokens desc
""",
"content_filter_blocks": """
fields @timestamp, identity.arn as caller, modelId
| filter ispresent(guardrailAction) and guardrailAction = "BLOCKED"
| stats count() as block_count by caller, bin(1h) as hour
| filter block_count > 5
| sort block_count desc
""",
"unusual_model_access": """
fields @timestamp, identity.arn as caller, modelId
| stats earliest(@timestamp) as first_seen,
latest(@timestamp) as last_seen,
count() as total_calls
by caller, modelId
| filter first_seen > ago(24h)
| sort total_calls desc
""",
"large_output_requests": """
fields @timestamp, identity.arn as caller, modelId,
outputTokenCount, inputTokenCount
| filter outputTokenCount > 4000
| stats count() as large_output_count,
avg(outputTokenCount) as avg_output,
sum(outputTokenCount) as total_output
by caller
| filter large_output_count > 10
| sort total_output desc
""",
}Azure Logging Configuration
Azure OpenAI Diagnostic Logging
from azure.identity import DefaultAzureCredential
from azure.mgmt.monitor import MonitorManagementClient
def configure_azure_openai_diagnostics(
subscription_id: str,
resource_group: str,
openai_account_name: str,
log_analytics_workspace_id: str,
storage_account_id: str = None,
) -> dict:
"""Configure comprehensive diagnostic logging for Azure OpenAI."""
credential = DefaultAzureCredential()
monitor = MonitorManagementClient(credential, subscription_id)
resource_uri = (
f"/subscriptions/{subscription_id}/resourceGroups/{resource_group}"
f"/providers/Microsoft.CognitiveServices/accounts/{openai_account_name}"
)
logs = [
{"category": "Audit", "enabled": True,
"retentionPolicy": {"enabled": True, "days": 365}},
{"category": "RequestResponse", "enabled": True,
"retentionPolicy": {"enabled": True, "days": 90}},
{"category": "Trace", "enabled": True,
"retentionPolicy": {"enabled": True, "days": 90}},
]
metrics = [
{"category": "AllMetrics", "enabled": True,
"retentionPolicy": {"enabled": True, "days": 90}},
]
settings = {
"properties": {
"workspaceId": log_analytics_workspace_id,
"logs": logs,
"metrics": metrics,
}
}
if storage_account_id:
settings["properties"]["storageAccountId"] = storage_account_id
result = monitor.diagnostic_settings.create_or_update(
resource_uri,
"openai-security-diagnostics",
settings,
)
return {
"setting_name": "openai-security-diagnostics",
"log_categories": ["Audit", "RequestResponse", "Trace"],
"retention_days": 90,
"workspace_id": log_analytics_workspace_id,
}KQL Detection Queries for Azure OpenAI
// Detect repeated content filter triggers from a single caller
AzureDiagnostics
| where ResourceProvider == "MICROSOFT.COGNITIVESERVICES"
| where Category == "RequestResponse"
| extend parsedProperties = parse_json(properties_s)
| where parsedProperties.response.statusCode == 400
| where properties_s contains "content_filter"
| summarize
FilterHits = count(),
Models = make_set(parsedProperties.model),
FirstSeen = min(TimeGenerated),
LastSeen = max(TimeGenerated)
by CallerIPAddress
| where FilterHits > 10
| order by FilterHits desc
// Identify token consumption anomalies
AzureDiagnostics
| where ResourceProvider == "MICROSOFT.COGNITIVESERVICES"
| where Category == "RequestResponse"
| extend parsedProperties = parse_json(properties_s)
| extend totalTokens = toint(parsedProperties.usage.total_tokens)
| summarize
AvgTokens = avg(totalTokens),
MaxTokens = max(totalTokens),
P95Tokens = percentile(totalTokens, 95),
RequestCount = count()
by CallerIPAddress, bin(TimeGenerated, 1h)
| where AvgTokens > 2 * P95Tokens // Flag when average exceeds 2x the 95th percentile
| order by AvgTokens desc
// Monitor for API key usage (should be zero if managed identity is enforced)
AzureDiagnostics
| where ResourceProvider == "MICROSOFT.COGNITIVESERVICES"
| where Category == "Audit"
| where properties_s !contains "managedidentity"
| where properties_s contains "key"
| project TimeGenerated, CallerIPAddress, OperationName, properties_s
| order by TimeGenerated desc
// Track deployment changes for configuration drift
AzureActivity
| where ResourceProviderValue == "MICROSOFT.COGNITIVESERVICES"
| where OperationNameValue has_any ("DEPLOYMENTS/WRITE", "RAIPOLICIES/WRITE", "ACCOUNTS/WRITE")
| project TimeGenerated, Caller, OperationNameValue, Properties
| order by TimeGenerated desc
GCP Logging Configuration
Vertex AI Audit Logging
GCP Cloud Audit Logs for Vertex AI require enabling Data Access logs, which are not enabled by default:
from google.cloud import logging_v2
def enable_vertex_data_access_logs(project_id: str) -> dict:
"""Enable Data Access audit logs for Vertex AI."""
# Data Access logs must be enabled via the IAM audit config
# This is typically done via gcloud or Terraform:
#
# gcloud projects set-iam-policy PROJECT_ID policy.yaml
#
# Where policy.yaml includes:
audit_config = {
"service": "aiplatform.googleapis.com",
"auditLogConfigs": [
{"logType": "ADMIN_READ"},
{"logType": "DATA_READ"},
{"logType": "DATA_WRITE"},
],
}
return {
"audit_config": audit_config,
"detail": "Apply this audit config to the project IAM policy. "
"DATA_READ captures model prediction calls. "
"DATA_WRITE captures model upload and training operations.",
"gcloud_command": (
f"gcloud projects get-iam-policy {project_id} --format=json > policy.json && "
"# Add auditConfigs to policy.json, then: "
f"gcloud projects set-iam-policy {project_id} policy.json"
),
}
def query_vertex_security_events(
project_id: str,
hours_back: int = 24,
) -> dict:
"""Query Vertex AI security events from Cloud Logging."""
client = logging_v2.Client(project=project_id)
queries = {
"model_prediction_calls": (
f'resource.type="aiplatform.googleapis.com/Endpoint" '
f'protoPayload.methodName="google.cloud.aiplatform.v1.PredictionService.Predict" '
f'timestamp>="{hours_back}h ago"'
),
"permission_denied": (
f'resource.type="aiplatform.googleapis.com" '
f'protoPayload.status.code=7 '
f'timestamp>="{hours_back}h ago"'
),
"model_modifications": (
f'resource.type="aiplatform.googleapis.com/Model" '
f'protoPayload.methodName=~"Create|Update|Delete" '
f'timestamp>="{hours_back}h ago"'
),
"endpoint_changes": (
f'resource.type="aiplatform.googleapis.com/Endpoint" '
f'protoPayload.methodName=~"Deploy|Undeploy|Create|Delete" '
f'timestamp>="{hours_back}h ago"'
),
}
results = {}
for name, filter_str in queries.items():
entries = list(client.list_entries(filter_=filter_str, max_results=50))
results[name] = {
"count": len(entries),
"sample": [
{
"timestamp": str(e.timestamp),
"method": getattr(e.payload, "get", lambda *a: "n/a")("methodName", "n/a"),
}
for e in entries[:5]
],
}
return resultsSecurity Monitoring Dashboard
Key Metrics to Display
Design a security monitoring dashboard with these panels:
def define_ai_security_dashboard() -> dict:
"""Define panels for an AI security monitoring dashboard."""
return {
"dashboard_name": "Cloud AI Security Operations",
"refresh_interval": "1m",
"panels": [
{
"title": "Content Filter Blocks (24h)",
"type": "timeseries",
"metric": "guardrail_blocks_per_hour",
"threshold": {"warning": 10, "critical": 50},
"description": "Spikes indicate potential attack campaigns",
},
{
"title": "Token Consumption by Identity",
"type": "bar_chart",
"metric": "total_tokens_by_caller",
"threshold": {"warning": 500000, "critical": 2000000},
"description": "Identifies top consumers and anomalies",
},
{
"title": "Unique Callers (24h)",
"type": "stat",
"metric": "distinct_caller_identities",
"description": "New callers may indicate credential compromise",
},
{
"title": "Error Rate by Model",
"type": "timeseries",
"metric": "error_rate_per_model",
"threshold": {"warning": 0.05, "critical": 0.15},
"description": "High error rates may indicate scanning or fuzzing",
},
{
"title": "Cost Burn Rate (USD/hour)",
"type": "gauge",
"metric": "estimated_cost_per_hour",
"threshold": {"warning": 50, "critical": 200},
"description": "Real-time cost monitoring for denial-of-wallet detection",
},
{
"title": "Suspicious Prompt Patterns",
"type": "table",
"metric": "prompts_matching_injection_patterns",
"description": "Prompts matching known injection signatures",
},
{
"title": "Model Deployment Changes",
"type": "event_log",
"metric": "deployment_configuration_events",
"description": "Track guardrail, model, and endpoint changes",
},
{
"title": "Access Denied Events",
"type": "timeseries",
"metric": "access_denied_count",
"threshold": {"warning": 20, "critical": 100},
"description": "High denial rates indicate enumeration or brute force",
},
],
}Alert Correlation and Escalation
Multi-Signal Alert Rules
Individual metrics often generate false positives. Correlate multiple signals to improve alert fidelity:
from dataclasses import dataclass
from typing import Optional
@dataclass
class CorrelatedAlert:
"""Multi-signal alert rule for AI security events."""
name: str
severity: str
signals: list
threshold: dict
escalation: str
ALERT_RULES = [
CorrelatedAlert(
name="Active Prompt Injection Campaign",
severity="P2",
signals=[
"content_filter_blocks > 10 in 5min",
"unique_payloads > 5 in 5min (by input hash)",
"single_caller_identity",
],
threshold={"signals_required": 2, "within_minutes": 10},
escalation="Page AI security on-call, block caller identity",
),
CorrelatedAlert(
name="Credential Compromise - AI Service",
severity="P1",
signals=[
"new_source_ip for existing caller",
"off_hours_access",
"model_access_pattern_change",
"high_token_consumption",
],
threshold={"signals_required": 3, "within_minutes": 30},
escalation="Page security on-call, suspend credential, preserve logs",
),
CorrelatedAlert(
name="Denial of Wallet Attack",
severity="P1",
signals=[
"token_consumption > 5x baseline",
"cost_burn_rate > $100/hour",
"single_caller_or_ip",
],
threshold={"signals_required": 2, "within_minutes": 15},
escalation="Page FinOps and security, apply emergency rate limit",
),
CorrelatedAlert(
name="Safety Control Tampering",
severity="P1",
signals=[
"guardrail_configuration_changed",
"content_filter_policy_modified",
"unusual_admin_access_pattern",
],
threshold={"signals_required": 1, "within_minutes": 5},
escalation="Page AI platform team, verify change was authorized",
),
]Log Retention and Compliance
Retention Requirements by Data Type
| Log Type | Minimum Retention | Encryption | Access Control |
|---|---|---|---|
| Management API audit logs | 365 days | KMS/CMK | Security team + auditors |
| Invocation metadata | 90 days | KMS/CMK | Security team + ML platform |
| Prompt/response content | 30-90 days | KMS/CMK, separate key | Security team only (restricted) |
| Content filter results | 90 days | KMS/CMK | Security team + compliance |
| Cost and usage metrics | 365 days | Standard | FinOps + security |
References
- AWS, "Logging and monitoring in Amazon Bedrock," https://docs.aws.amazon.com/bedrock/latest/userguide/logging-monitoring.html
- Microsoft, "Monitor Azure OpenAI Service," https://learn.microsoft.com/en-us/azure/ai-services/openai/how-to/monitoring
- Google Cloud, "Cloud Audit Logs for Vertex AI," https://cloud.google.com/vertex-ai/docs/general/audit-logging
- NIST SP 800-92, "Guide to Computer Security Log Management," https://csrc.nist.gov/publications/detail/sp/800-92/final
Why is a two-tier logging architecture recommended for cloud AI services?
Which signal combination most reliably indicates a prompt injection campaign rather than normal usage?