Cloud AI Forensics: AWS
Forensic investigation techniques for AWS AI services including SageMaker, Bedrock, and associated infrastructure logging and evidence collection.
Overview
Amazon Web Services hosts a significant share of production AI workloads through services like SageMaker (model training and hosting), Bedrock (managed foundation models), Comprehend (NLP), Rekognition (computer vision), and Textract (document processing). When a security incident involves these services, the forensic investigator must navigate AWS's distributed logging architecture to reconstruct what happened.
AWS AI forensics is challenging because relevant evidence is spread across multiple services and log sources. A single SageMaker inference call may generate records in CloudTrail (the API call), CloudWatch (the endpoint metrics), S3 (the input/output data if logging is enabled), VPC Flow Logs (the network traffic), and the SageMaker endpoint's container logs. Missing any one of these sources can leave gaps in the investigation.
Additionally, AWS's shared responsibility model means that some forensic artifacts are under your control (your model code, training data, endpoint configurations) while others are managed by AWS (the underlying infrastructure, the Bedrock foundation model weights). Understanding what evidence is available and how to acquire it quickly is essential for effective incident response.
This article covers the forensic artifacts available across AWS AI services, how to acquire and preserve them, and how to analyze them to investigate common incident scenarios: unauthorized model access, training data theft, model tampering, and abuse of managed AI services.
AWS AI Service Forensic Artifacts
CloudTrail: The Foundation of AWS Forensics
CloudTrail records API calls across all AWS services and is the starting point for any AWS forensic investigation. For AI services, CloudTrail captures who performed what action, when, and from where.
import boto3
import json
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class CloudTrailEvent:
"""A parsed CloudTrail event relevant to AI forensics."""
event_time: str
event_name: str
event_source: str
aws_region: str
source_ip: str
user_identity: dict
request_parameters: dict
response_elements: dict
error_code: Optional[str] = None
error_message: Optional[str] = None
raw_event: dict = field(default_factory=dict)
class AWSAIForensicCollector:
"""Collect and analyze forensic artifacts from AWS AI services."""
# AI-related CloudTrail event sources
AI_EVENT_SOURCES = {
"sagemaker.amazonaws.com",
"bedrock.amazonaws.com",
"comprehend.amazonaws.com",
"rekognition.amazonaws.com",
"textract.amazonaws.com",
"translate.amazonaws.com",
"transcribe.amazonaws.com",
}
# High-risk SageMaker actions that warrant investigation
SAGEMAKER_HIGH_RISK_ACTIONS = {
"CreateModel",
"CreateEndpoint",
"UpdateEndpoint",
"CreateTrainingJob",
"CreateProcessingJob",
"CreateNotebookInstance",
"StartNotebookInstance",
"CreatePresignedNotebookInstanceUrl",
"CreateModelPackage",
"UpdateModelPackage",
}
# Bedrock actions of forensic interest
BEDROCK_SENSITIVE_ACTIONS = {
"InvokeModel",
"InvokeModelWithResponseStream",
"CreateModelCustomizationJob",
"CreateProvisionedModelThroughput",
"GetFoundationModel",
"ListFoundationModels",
"CreateGuardrail",
"UpdateGuardrail",
"DeleteGuardrail",
}
def __init__(self, region: str = "us-east-1", profile: Optional[str] = None):
session_kwargs = {}
if profile:
session_kwargs["profile_name"] = profile
self.session = boto3.Session(region_name=region, **session_kwargs)
self.cloudtrail = self.session.client("cloudtrail")
self.logs = self.session.client("logs")
self.s3 = self.session.client("s3")
self.sagemaker = self.session.client("sagemaker")
def collect_ai_cloudtrail_events(
self,
start_time: datetime,
end_time: datetime,
event_sources: Optional[set[str]] = None,
) -> list[CloudTrailEvent]:
"""
Collect CloudTrail events related to AI services.
Args:
start_time: Start of the investigation window.
end_time: End of the investigation window.
event_sources: Specific event sources to filter.
Defaults to all AI services.
Returns:
List of parsed CloudTrail events.
"""
sources = event_sources or self.AI_EVENT_SOURCES
events = []
for source in sources:
try:
paginator = self.cloudtrail.get_paginator("lookup_events")
page_iterator = paginator.paginate(
LookupAttributes=[
{
"AttributeKey": "EventSource",
"AttributeValue": source,
}
],
StartTime=start_time,
EndTime=end_time,
)
for page in page_iterator:
for event in page.get("Events", []):
parsed = self._parse_cloudtrail_event(event)
if parsed:
events.append(parsed)
except Exception as e:
print(f"Error collecting events from {source}: {e}")
events.sort(key=lambda e: e.event_time)
return events
def _parse_cloudtrail_event(self, raw_event: dict) -> Optional[CloudTrailEvent]:
"""Parse a raw CloudTrail event into a structured format."""
try:
cloud_trail_event = json.loads(
raw_event.get("CloudTrailEvent", "{}")
)
except json.JSONDecodeError:
return None
return CloudTrailEvent(
event_time=str(raw_event.get("EventTime", "")),
event_name=raw_event.get("EventName", ""),
event_source=cloud_trail_event.get("eventSource", ""),
aws_region=cloud_trail_event.get("awsRegion", ""),
source_ip=cloud_trail_event.get("sourceIPAddress", ""),
user_identity=cloud_trail_event.get("userIdentity", {}),
request_parameters=cloud_trail_event.get("requestParameters", {}),
response_elements=cloud_trail_event.get("responseElements", {}),
error_code=cloud_trail_event.get("errorCode"),
error_message=cloud_trail_event.get("errorMessage"),
raw_event=cloud_trail_event,
)
def identify_suspicious_activity(
self,
events: list[CloudTrailEvent],
) -> list[dict]:
"""
Analyze CloudTrail events for suspicious AI-related activity.
Args:
events: List of parsed CloudTrail events.
Returns:
List of suspicious activity findings.
"""
findings = []
for event in events:
# Check for high-risk SageMaker actions
if (
event.event_source == "sagemaker.amazonaws.com"
and event.event_name in self.SAGEMAKER_HIGH_RISK_ACTIONS
):
findings.append({
"type": "high_risk_sagemaker_action",
"severity": "high",
"event_name": event.event_name,
"timestamp": event.event_time,
"source_ip": event.source_ip,
"user": self._extract_user_id(event.user_identity),
"details": event.request_parameters,
})
# Check for Bedrock model access
if (
event.event_source == "bedrock.amazonaws.com"
and event.event_name in self.BEDROCK_SENSITIVE_ACTIONS
):
findings.append({
"type": "sensitive_bedrock_action",
"severity": "medium",
"event_name": event.event_name,
"timestamp": event.event_time,
"source_ip": event.source_ip,
"user": self._extract_user_id(event.user_identity),
"details": event.request_parameters,
})
# Check for failed access attempts (possible enumeration)
if event.error_code in (
"AccessDeniedException",
"UnauthorizedAccess",
"AccessDenied",
):
findings.append({
"type": "access_denied",
"severity": "medium",
"event_name": event.event_name,
"timestamp": event.event_time,
"source_ip": event.source_ip,
"user": self._extract_user_id(event.user_identity),
"error": event.error_code,
"details": event.request_parameters,
})
# Check for unusual source IPs (console vs programmatic)
if event.source_ip == "AWS Internal":
# This is normal for service-linked actions
pass
elif self._is_suspicious_ip_pattern(event):
findings.append({
"type": "suspicious_source",
"severity": "high",
"event_name": event.event_name,
"timestamp": event.event_time,
"source_ip": event.source_ip,
"user": self._extract_user_id(event.user_identity),
})
return findings
def _extract_user_id(self, user_identity: dict) -> str:
"""Extract a readable user identifier from CloudTrail user identity."""
arn = user_identity.get("arn", "")
principal = user_identity.get("principalId", "")
user_type = user_identity.get("type", "")
if user_type == "AssumedRole":
# Extract role session name
session_context = user_identity.get("sessionContext", {})
session_issuer = session_context.get("sessionIssuer", {})
return f"{session_issuer.get('userName', 'unknown')} (assumed role)"
elif user_type == "IAMUser":
return user_identity.get("userName", arn)
elif user_type == "Root":
return "ROOT ACCOUNT"
return principal or arn or "unknown"
    def _is_suspicious_ip_pattern(self, event: CloudTrailEvent) -> bool:
        """Check if the request origin matches suspicious patterns."""
        # In practice, check event.source_ip against a threat
        # intelligence feed (VPN/Tor exit nodes, known-bad ranges).
        # As a heuristic here, flag requests whose user agent is
        # neither the AWS console (signin.amazonaws.com) nor a
        # recognized SDK/CLI.
        user_agent = event.raw_event.get("userAgent", "").lower()
        known_agents = ("signin.amazonaws.com", "boto", "aws-cli", "sdk")
        return not any(agent in user_agent for agent in known_agents)

SageMaker Forensics
SageMaker is AWS's primary ML platform and generates forensic artifacts across multiple subsystems. Training jobs produce logs in CloudWatch, model artifacts in S3, and metadata in the SageMaker API. Endpoints produce inference logs, container logs, and CloudWatch metrics.
class SageMakerForensicAnalyzer:
"""Forensic analysis specific to AWS SageMaker."""
def __init__(self, session: boto3.Session):
self.sagemaker = session.client("sagemaker")
self.logs = session.client("logs")
self.s3 = session.client("s3")
def investigate_training_job(self, job_name: str) -> dict:
"""
Collect forensic artifacts for a SageMaker training job.
Args:
job_name: The training job name.
Returns:
Dict with training job forensic data.
"""
try:
job = self.sagemaker.describe_training_job(
TrainingJobName=job_name
)
except Exception as e:
return {"error": str(e)}
artifacts = {
"job_name": job_name,
"status": job.get("TrainingJobStatus"),
"creation_time": str(job.get("CreationTime")),
"training_start_time": str(job.get("TrainingStartTime")),
"training_end_time": str(job.get("TrainingEndTime")),
"last_modified_time": str(job.get("LastModifiedTime")),
"role_arn": job.get("RoleArn"),
"algorithm": job.get("AlgorithmSpecification", {}),
"input_channels": [],
"output_location": "",
"instance_type": "",
"vpc_config": job.get("VpcConfig"),
"tags": [],
}
# Input data channels - where did training data come from?
for channel in job.get("InputDataConfig", []):
artifacts["input_channels"].append({
"channel_name": channel.get("ChannelName"),
"data_source": channel.get("DataSource", {}),
"content_type": channel.get("ContentType"),
"compression_type": channel.get("CompressionType"),
})
# Output location - where are model artifacts stored?
output_config = job.get("OutputDataConfig", {})
artifacts["output_location"] = output_config.get("S3OutputPath", "")
# Resource configuration
resource_config = job.get("ResourceConfig", {})
artifacts["instance_type"] = resource_config.get("InstanceType", "")
artifacts["instance_count"] = resource_config.get("InstanceCount", 0)
artifacts["volume_size_gb"] = resource_config.get(
"VolumeSizeInGB", 0
)
# Collect tags
try:
tag_response = self.sagemaker.list_tags(
ResourceArn=job.get("TrainingJobArn", "")
)
artifacts["tags"] = tag_response.get("Tags", [])
except Exception:
pass
return artifacts
def investigate_endpoint(self, endpoint_name: str) -> dict:
"""
Collect forensic artifacts for a SageMaker endpoint.
Args:
endpoint_name: The endpoint name.
Returns:
Dict with endpoint forensic data.
"""
try:
endpoint = self.sagemaker.describe_endpoint(
EndpointName=endpoint_name
)
except Exception as e:
return {"error": str(e)}
artifacts = {
"endpoint_name": endpoint_name,
"status": endpoint.get("EndpointStatus"),
"creation_time": str(endpoint.get("CreationTime")),
"last_modified_time": str(endpoint.get("LastModifiedTime")),
"endpoint_config_name": endpoint.get("EndpointConfigName"),
"production_variants": [],
"data_capture_config": None,
}
# Get endpoint configuration details
config_name = endpoint.get("EndpointConfigName")
if config_name:
try:
config = self.sagemaker.describe_endpoint_config(
EndpointConfigName=config_name
)
for variant in config.get("ProductionVariants", []):
artifacts["production_variants"].append({
"variant_name": variant.get("VariantName"),
"model_name": variant.get("ModelName"),
"instance_type": variant.get("InstanceType"),
"instance_count": variant.get(
"InitialInstanceCount"
),
})
# Check data capture configuration
capture = config.get("DataCaptureConfig")
if capture:
artifacts["data_capture_config"] = {
"enabled": capture.get(
"EnableCapture", False
),
"sampling_percentage": capture.get(
"InitialSamplingPercentage", 0
),
"destination": capture.get(
"DestinationS3Uri", ""
),
"capture_options": capture.get(
"CaptureOptions", []
),
}
except Exception as e:
artifacts["config_error"] = str(e)
return artifacts
def collect_endpoint_logs(
self,
endpoint_name: str,
start_time: datetime,
end_time: datetime,
max_events: int = 10000,
) -> list[dict]:
"""
Collect CloudWatch logs for a SageMaker endpoint.
Args:
endpoint_name: The endpoint name.
start_time: Start of log window.
end_time: End of log window.
max_events: Maximum number of log events to retrieve.
Returns:
List of log events.
"""
log_group = f"/aws/sagemaker/Endpoints/{endpoint_name}"
events = []
try:
paginator = self.logs.get_paginator("filter_log_events")
page_iterator = paginator.paginate(
logGroupName=log_group,
startTime=int(start_time.timestamp() * 1000),
endTime=int(end_time.timestamp() * 1000),
limit=max_events,
)
for page in page_iterator:
for event in page.get("events", []):
events.append({
"timestamp": datetime.fromtimestamp(
event["timestamp"] / 1000
).isoformat(),
"message": event.get("message", ""),
"log_stream": event.get("logStreamName", ""),
})
if len(events) >= max_events:
break
except self.logs.exceptions.ResourceNotFoundException:
return [{"error": f"Log group {log_group} not found"}]
except Exception as e:
return [{"error": str(e)}]
        return events

Bedrock Forensics
AWS Bedrock provides managed access to foundation models. Bedrock's forensic artifacts are primarily in CloudTrail (API calls) and, if enabled, Bedrock's model invocation logging which captures full request/response payloads to S3 or CloudWatch.
class BedrockForensicAnalyzer:
"""Forensic analysis specific to AWS Bedrock."""
def __init__(self, session: boto3.Session):
self.bedrock = session.client("bedrock")
self.bedrock_runtime = session.client("bedrock-runtime")
self.s3 = session.client("s3")
self.logs = session.client("logs")
def check_logging_configuration(self) -> dict:
"""
Check whether Bedrock model invocation logging is enabled.
This is critical for forensic readiness.
Returns:
Dict describing the logging configuration.
"""
try:
response = self.bedrock.get_model_invocation_logging_configuration()
config = response.get("loggingConfig", {})
return {
"logging_enabled": bool(config),
"s3_config": config.get("s3Config", {}),
"cloudwatch_config": config.get("cloudWatchConfig", {}),
"text_data_delivery_enabled": config.get(
"textDataDeliveryEnabled", False
),
"image_data_delivery_enabled": config.get(
"imageDataDeliveryEnabled", False
),
"embedding_data_delivery_enabled": config.get(
"embeddingDataDeliveryEnabled", False
),
"forensic_readiness": self._assess_forensic_readiness(config),
}
except Exception as e:
return {"error": str(e), "logging_enabled": False}
def _assess_forensic_readiness(self, config: dict) -> dict:
"""Assess how well Bedrock logging supports forensic investigation."""
issues = []
score = 0
if not config:
return {
"score": 0,
"max_score": 5,
"issues": ["Model invocation logging is not enabled."],
}
if config.get("s3Config"):
score += 2
else:
issues.append("S3 logging not configured. Enable for durable log storage.")
if config.get("cloudWatchConfig"):
score += 1
else:
issues.append("CloudWatch logging not configured. Enable for real-time analysis.")
if config.get("textDataDeliveryEnabled"):
score += 1
else:
issues.append(
"Text data delivery disabled. Full prompt/response "
"content will not be logged."
)
if config.get("imageDataDeliveryEnabled"):
score += 1
else:
issues.append("Image data delivery disabled.")
return {
"score": score,
"max_score": 5,
"issues": issues,
}
def collect_bedrock_invocation_logs(
self,
s3_bucket: str,
s3_prefix: str,
start_date: str,
end_date: str,
) -> list[dict]:
"""
Collect Bedrock model invocation logs from S3.
Args:
s3_bucket: S3 bucket where Bedrock logs are stored.
s3_prefix: S3 prefix (folder path).
start_date: Start date (YYYY-MM-DD).
end_date: End date (YYYY-MM-DD).
Returns:
List of invocation log entries.
"""
entries = []
paginator = self.s3.get_paginator("list_objects_v2")
try:
page_iterator = paginator.paginate(
Bucket=s3_bucket,
Prefix=s3_prefix,
)
for page in page_iterator:
for obj in page.get("Contents", []):
key = obj["Key"]
last_modified = obj["LastModified"]
# Filter by date range
obj_date = last_modified.strftime("%Y-%m-%d")
if obj_date < start_date or obj_date > end_date:
continue
# Download and parse log file
try:
response = self.s3.get_object(
Bucket=s3_bucket, Key=key
)
content = response["Body"].read().decode("utf-8")
# Bedrock logs are JSONL format
for line in content.strip().split("\n"):
if line.strip():
entry = json.loads(line)
entries.append(entry)
except Exception as e:
entries.append({
"error": str(e),
"s3_key": key,
})
except Exception as e:
return [{"error": str(e)}]
        return entries

S3 Access Log Analysis for Training Data Forensics
Training data for SageMaker models is stored in S3, and S3 access logs provide a detailed record of every access to that data. Enable S3 server access logging on all buckets containing training data and model artifacts. The logs record the requester's identity, the operation (GetObject, PutObject, DeleteObject), the specific object key, and the timestamp.
For forensic investigations, S3 access logs reveal who accessed training data (was it the expected SageMaker training role or an unauthorized identity?), when the data was accessed (does the timing align with legitimate training jobs?), and what data was accessed (did someone selectively download specific subsets of training data?).
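The analyzer below expects pre-parsed entries. S3 server access logs are space-delimited text with bracketed timestamps and quoted request fields; a minimal parser sketch (extracting only the first eight documented fields, which is all the analysis needs, and ignoring the rest such as status, bytes, and user agent) might look like this:

```python
import re

# Tokens are space-separated, except bracketed timestamps and quoted strings
_S3_LOG_TOKEN = re.compile(r'\[[^\]]*\]|"[^"]*"|\S+')

# First eight field positions in an S3 server access log record
_FIELDS = [
    "bucket_owner", "bucket", "timestamp", "remote_ip",
    "requester", "request_id", "operation", "key",
]

def parse_s3_access_log_line(line: str) -> dict:
    """Parse one S3 server access log record into the fields the
    analysis code expects (key, requester, operation, timestamp)."""
    tokens = _S3_LOG_TOKEN.findall(line)
    entry = {}
    for name, token in zip(_FIELDS, tokens):
        entry[name] = token.strip('[]"')
    return entry
```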
def analyze_s3_access_logs(
log_entries: list[dict],
expected_roles: set[str],
training_data_prefix: str,
) -> list[dict]:
"""
Analyze S3 access logs for suspicious training data access.
Args:
log_entries: Parsed S3 access log entries.
expected_roles: Set of IAM role ARNs expected to access this data.
training_data_prefix: S3 key prefix for training data.
Returns:
List of suspicious access findings.
"""
findings = []
for entry in log_entries:
key = entry.get("key", "")
requester = entry.get("requester", "")
operation = entry.get("operation", "")
timestamp = entry.get("timestamp", "")
# Only analyze training data access
if not key.startswith(training_data_prefix):
continue
# Check for unexpected requesters
if requester not in expected_roles and requester != "-":
findings.append({
"type": "unauthorized_training_data_access",
"severity": "high",
"key": key,
"requester": requester,
"operation": operation,
"timestamp": timestamp,
})
# Check for bulk download patterns (many GetObject in short time)
if operation == "REST.GET.OBJECT":
findings.append({
"type": "training_data_read",
"severity": "info",
"key": key,
"requester": requester,
"timestamp": timestamp,
})
# Detect bulk download: many reads from same requester in short window
requester_reads = {}
for f in findings:
if f["type"] == "training_data_read":
req = f["requester"]
if req not in requester_reads:
requester_reads[req] = []
requester_reads[req].append(f)
for req, reads in requester_reads.items():
if len(reads) > 100: # Threshold for bulk access
findings.append({
"type": "bulk_training_data_download",
"severity": "critical",
"requester": req,
"object_count": len(reads),
"first_access": reads[0]["timestamp"],
"last_access": reads[-1]["timestamp"],
})
    return [f for f in findings if f["type"] != "training_data_read"]

Investigating Common Incident Scenarios
IAM Policy Analysis for AI Service Investigations
Understanding what an attacker could do with compromised credentials requires analyzing the IAM policies attached to the compromised identity. AWS IAM policies for AI services can be complex, combining service-specific actions with resource-level permissions and conditions.
def analyze_iam_permissions_for_ai(
session: boto3.Session,
identity_arn: str,
) -> dict:
"""
Analyze the AI-service-related IAM permissions for an identity.
Args:
session: boto3 session.
identity_arn: ARN of the IAM user or role to analyze.
Returns:
Dict of AI service permissions.
"""
iam = session.client("iam")
ai_services = [
"sagemaker", "bedrock", "comprehend",
"rekognition", "textract", "translate",
]
# This is a simplified analysis; in practice, use
# IAM Policy Simulator for authoritative results
permissions = {svc: [] for svc in ai_services}
    try:
        # Simulate representative high-risk actions per service;
        # the simulator evaluates concrete action names, so wildcards
        # like "sagemaker:*" are not expanded into individual actions
        simulator = iam.simulate_principal_policy(
            PolicySourceArn=identity_arn,
            ActionNames=[
                "sagemaker:CreateEndpoint",
                "sagemaker:CreateTrainingJob",
                "bedrock:InvokeModel",
                "comprehend:DetectEntities",
                "rekognition:DetectLabels",
                "textract:AnalyzeDocument",
                "translate:TranslateText",
            ],
        )
for result in simulator.get("EvaluationResults", []):
action = result.get("EvalActionName", "")
decision = result.get("EvalDecision", "")
for svc in ai_services:
if action.startswith(f"{svc}:"):
permissions[svc].append({
"action": action,
"decision": decision,
})
except Exception as e:
return {"error": str(e)}
# Summarize risk level per service
summary = {}
for svc, perms in permissions.items():
allowed = [p for p in perms if p["decision"] == "allowed"]
summary[svc] = {
"allowed_actions": len(allowed),
"total_evaluated": len(perms),
"risk_level": (
"critical" if len(allowed) > 10
else "high" if len(allowed) > 5
else "medium" if len(allowed) > 0
else "none"
),
}
    return summary

Scenario 1: Unauthorized Model Access
An attacker gains AWS credentials (through phishing, credential stuffing, or a leaked access key) and uses them to access AI services. The investigation focuses on identifying what the attacker accessed, what data they processed, and whether they extracted any models or training data.
The investigation workflow is: (1) Identify the compromised credentials from CloudTrail user identity fields. (2) Filter all CloudTrail events for that identity across all AI service event sources. (3) Check for data access patterns: S3 reads of model artifacts or training data, SageMaker DescribeTrainingJob calls that expose input data locations, Bedrock InvokeModel calls that may process sensitive data. (4) Check for persistence: new IAM policies, new SageMaker notebook instances, new endpoints, or new Bedrock custom model jobs.
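Steps (2) and (4) of this workflow can be sketched as a pure function over already-parsed CloudTrail events. The persistence shortlist below is an illustrative assumption, not an exhaustive catalogue of persistence actions:

```python
# Illustrative shortlist of actions that create persistence or new
# access paths (notebooks, endpoints, credentials, policies)
PERSISTENCE_ACTIONS = {
    "CreateNotebookInstance", "CreatePresignedNotebookInstanceUrl",
    "CreateEndpoint", "CreateModelCustomizationJob",
    "AttachUserPolicy", "PutUserPolicy", "CreateAccessKey",
}

def build_identity_timeline(events: list, principal_arn: str) -> dict:
    """Isolate one identity's activity from parsed CloudTrail events
    (dicts with user_identity, event_name, event_time) and flag
    persistence actions."""
    mine = [
        e for e in events
        if e.get("user_identity", {}).get("arn") == principal_arn
    ]
    mine.sort(key=lambda e: e.get("event_time", ""))
    return {
        "principal": principal_arn,
        "event_count": len(mine),
        "first_seen": mine[0]["event_time"] if mine else None,
        "last_seen": mine[-1]["event_time"] if mine else None,
        "persistence_events": [
            e for e in mine if e.get("event_name") in PERSISTENCE_ACTIONS
        ],
    }
```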
Scenario 2: Training Data Exfiltration
An insider or compromised service role copies training data from S3 buckets used by SageMaker. Investigate by: (1) Collecting S3 access logs for the training data buckets. (2) Cross-referencing S3 GetObject events with CloudTrail to identify the requesting identity. (3) Checking for unusual download volumes or access from unexpected IPs. (4) Examining whether the data was copied to another S3 bucket or downloaded externally.
Scenario 3: Model Tampering via SageMaker
An attacker modifies a model by replacing model artifacts in S3 or deploying a modified endpoint configuration. Investigate by collecting the S3 object version history for model artifacts, comparing SHA-256 hashes of current model files against known-good baselines, reviewing CloudTrail for CreateModel, UpdateEndpoint, and PutObject events targeting model artifact paths, and checking endpoint configuration history for unauthorized changes.
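The hash-comparison step can be sketched as follows. Both helpers are illustrative, assuming you maintain a baseline mapping of artifact keys to known-good SHA-256 digests:

```python
import hashlib

def sha256_stream(chunks) -> str:
    """SHA-256 over an iterable of byte chunks (e.g. an S3 object body
    read in pieces), so large model artifacts need not fit in memory."""
    digest = hashlib.sha256()
    for chunk in chunks:
        digest.update(chunk)
    return digest.hexdigest()

def detect_tampered_artifacts(current: dict, baseline: dict) -> list:
    """Compare current artifact hashes against a known-good baseline.
    Returns findings for modified, missing, or unexpected artifacts."""
    findings = []
    for key, good_hash in baseline.items():
        now = current.get(key)
        if now is None:
            findings.append({"key": key, "issue": "missing"})
        elif now != good_hash:
            findings.append({"key": key, "issue": "hash_mismatch",
                             "expected": good_hash, "observed": now})
    for key in current.keys() - baseline.keys():
        findings.append({"key": key, "issue": "unexpected_artifact"})
    return findings
```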
Forensic Readiness Recommendations
Essential Logging Configuration
For forensic readiness in AWS AI environments, enable the following at minimum:
- CloudTrail: Enable in all regions with management and data events. Send to a centralized, immutable S3 bucket with object lock enabled.
- Bedrock invocation logging: Enable with text data delivery to S3. This captures full prompts and responses.
- SageMaker data capture: Enable on production endpoints to capture inference requests and responses.
- S3 access logging: Enable on all buckets containing training data, model artifacts, and AI application data.
- VPC Flow Logs: Enable on VPCs hosting SageMaker endpoints and notebook instances.
- CloudWatch Log retention: Set retention periods that meet your compliance requirements (minimum 90 days, ideally 1 year).
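The retention item above lends itself to an automated readiness check. The sketch below is a hypothetical helper operating on the logGroups list returned by the CloudWatch Logs describe_log_groups call (retentionInDays is absent for never-expire groups, which is acceptable):

```python
def audit_log_retention(log_groups: list, minimum_days: int = 90) -> list:
    """Flag CloudWatch log groups whose retention is below the forensic
    minimum. `log_groups` is the `logGroups` list from the CloudWatch
    Logs describe_log_groups response."""
    findings = []
    for group in log_groups:
        retention = group.get("retentionInDays")
        # Absent retention means logs never expire, which satisfies
        # any minimum; only explicit short retention is flagged
        if retention is not None and retention < minimum_days:
            findings.append({
                "log_group": group.get("logGroupName"),
                "retention_days": retention,
                "required_days": minimum_days,
            })
    return findings
```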
Evidence Preservation Procedures
When conducting forensic investigations on AWS AI services, follow a systematic evidence preservation workflow. First, take snapshots of the current state: export CloudTrail events to a separate S3 bucket with object lock enabled, copy CloudWatch log groups to a preservation account, and record the current configuration of all relevant SageMaker endpoints, training jobs, and Bedrock settings using the Describe API calls.
For S3-based evidence (model artifacts, training data, Bedrock invocation logs), enable versioning on the bucket if not already enabled and copy the relevant objects to an evidence preservation bucket. Record the S3 object version IDs, ETags, and checksums for each preserved object. Use S3 object lock in compliance mode to prevent deletion or modification of preserved evidence.
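Recording version IDs, ETags, and checksums can be automated with a small manifest builder. This sketch assumes the caller has already downloaded each object (or precomputed its SHA-256) and supplies the metadata:

```python
import hashlib
import json
from datetime import datetime, timezone

def build_evidence_manifest(objects: list) -> str:
    """Build a JSON evidence manifest for preserved S3 objects. Each
    input dict carries bucket, key, version_id, etag, and either the
    object bytes (body) or a precomputed sha256. Store the manifest
    alongside the preserved copies in the evidence bucket."""
    entries = []
    for obj in objects:
        sha = obj.get("sha256") or hashlib.sha256(obj["body"]).hexdigest()
        entries.append({
            "bucket": obj["bucket"],
            "key": obj["key"],
            "version_id": obj.get("version_id"),
            "etag": obj.get("etag"),
            "sha256": sha,
        })
    manifest = {
        "created": datetime.now(timezone.utc).isoformat(),
        "objects": entries,
    }
    return json.dumps(manifest, indent=2)
```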
For SageMaker notebook instances that may contain evidence of attacker activity, create an EBS volume snapshot before stopping the instance. The snapshot preserves the notebook filesystem, including command history, downloaded files, and any custom scripts the attacker may have used. Take the snapshot while the instance is still in its post-incident state; note that EBS snapshots capture disk contents only, so any purely in-memory evidence is lost once the instance stops.
When collecting evidence across multiple AWS accounts (common in organizations using a multi-account strategy), ensure you have cross-account access configured before the incident. An IAM role in each account that the forensic team can assume, with read-only access to CloudTrail, CloudWatch, S3, and the relevant AI services, should be pre-provisioned as part of your forensic readiness program.
Timeline Correlation Across AWS Services
A critical forensic technique for AWS AI investigations is correlating events across multiple AWS services using common identifiers. CloudTrail events include a requestID that uniquely identifies each API call. S3 access logs include the same request ID for S3 operations. VPC Flow Logs include the network connection details that correspond to API calls.
To build a complete timeline, collect all CloudTrail events for the AI services within the investigation window, all S3 access logs for relevant buckets, and all VPC Flow Logs for the VPCs hosting AI infrastructure. Join these data sources on common fields: CloudTrail requestID matches S3 access log request IDs, and CloudTrail source IP addresses match VPC Flow Log destination IPs.
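The requestID join can be sketched as a pure function over the two parsed data sources. The dict shapes here mirror the CloudTrailEvent fields and S3 access log fields used earlier, and are assumptions about your parsed formats:

```python
def correlate_by_request_id(cloudtrail_events: list,
                            s3_log_entries: list) -> list:
    """Join parsed CloudTrail events with S3 access log entries on the
    shared request ID, yielding merged timeline records."""
    s3_by_request = {
        e["request_id"]: e for e in s3_log_entries if e.get("request_id")
    }
    timeline = []
    for event in cloudtrail_events:
        # CloudTrail records carry requestID in the raw event JSON
        request_id = event.get("raw_event", {}).get("requestID")
        match = s3_by_request.get(request_id)
        timeline.append({
            "event_time": event.get("event_time"),
            "event_name": event.get("event_name"),
            "source_ip": event.get("source_ip"),
            "s3_key": match.get("key") if match else None,
            "s3_operation": match.get("operation") if match else None,
        })
    timeline.sort(key=lambda r: r["event_time"] or "")
    return timeline
```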
This cross-service correlation reveals the full picture of attacker activity. For example, a CloudTrail event showing DescribeTrainingJob followed by S3 GetObject requests for the training data location, followed by outbound VPC Flow Log entries to an external IP, indicates a training data exfiltration sequence.
Access Control Hardening
Implement least-privilege IAM policies for AI services. Separate roles for training, deployment, and inference. Use SageMaker's VPC mode to keep training and inference traffic within your VPC. Enable AWS Config rules to detect drift from your security baseline.
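A least-privilege policy for a dedicated training role might be scoped like this. Bucket names and prefixes are placeholders, and the action list is illustrative rather than complete:

```python
import json

# Illustrative least-privilege policy for a dedicated training role:
# it may read one training-data prefix, write one output prefix, and
# start/describe training jobs -- nothing else.
TRAINING_ROLE_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["s3:GetObject"],
            "Resource": "arn:aws:s3:::example-training-data/datasets/*",
        },
        {
            "Effect": "Allow",
            "Action": ["s3:PutObject"],
            "Resource": "arn:aws:s3:::example-model-artifacts/output/*",
        },
        {
            "Effect": "Allow",
            "Action": [
                "sagemaker:CreateTrainingJob",
                "sagemaker:DescribeTrainingJob",
            ],
            "Resource": "*",
        },
    ],
}

# Attach with: iam.put_role_policy(RoleName=..., PolicyName=...,
#     PolicyDocument=json.dumps(TRAINING_ROLE_POLICY))
```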
AWS GuardDuty and Security Hub Integration
AWS GuardDuty provides automated threat detection that can surface AI-related security findings. GuardDuty monitors CloudTrail events and can detect anomalous API calls, unauthorized credential use, and potential data exfiltration. While GuardDuty does not have AI-specific detections, its general findings (credential compromise, unusual API call patterns, data exfiltration indicators) apply directly to AI service abuse.
Feed GuardDuty findings into AWS Security Hub to correlate with other security signals. Security Hub can aggregate findings from GuardDuty, IAM Access Analyzer, Inspector, and custom finding sources. Configure custom Security Hub insights that filter specifically for AI service-related findings using resource type filters for SageMaker, Bedrock, and other AWS AI service resources.
For organizations using AWS Organizations with a delegated security account, ensure that GuardDuty and Security Hub are enabled in all accounts that host AI workloads. A compromised developer account in a sandbox environment may provide stepping stones to production AI infrastructure through cross-account role assumptions. GuardDuty's multi-account monitoring can detect this lateral movement pattern.
When an incident is confirmed, use Security Hub's investigation workflow to correlate the AI-specific findings with broader infrastructure findings. A SageMaker data exfiltration incident, for example, may be linked to an earlier GuardDuty finding about compromised credentials or an IAM Access Analyzer finding about overly permissive cross-account access.
References
- AWS (2025). "Logging and monitoring in Amazon SageMaker." https://docs.aws.amazon.com/sagemaker/latest/dg/logging-cloudwatch.html
- AWS (2025). "Model invocation logging for Amazon Bedrock." https://docs.aws.amazon.com/bedrock/latest/userguide/model-invocation-logging.html
- NIST SP 800-92 (2006). "Guide to Computer Security Log Management." https://csrc.nist.gov/publications/detail/sp/800-92/final