Cloud AI Forensics: AWS
Forensic investigation techniques for AWS AI services including SageMaker, Bedrock, and associated infrastructure logging and evidence collection.
Overview
Amazon Web Services hosts a significant share of production AI workloads through services like SageMaker (model training and hosting), Bedrock (managed foundation models), Comprehend (NLP), Rekognition (computer vision), and Textract (document processing). When a security incident involves these services, the forensic investigator must navigate AWS's distributed logging architecture to reconstruct what happened.
AWS AI forensics is challenging because relevant evidence is spread across multiple services and log sources. A single SageMaker inference call may generate records in CloudTrail (the API call), CloudWatch (the endpoint metrics), S3 (the input/output data if logging is enabled), VPC Flow Logs (the network traffic), and the SageMaker endpoint's container logs. Missing any one of these sources can leave gaps in the investigation.
Moreover, AWS's shared responsibility model means that some forensic artifacts are under your control (your model code, training data, endpoint configurations) while others are managed by AWS (the underlying infrastructure, the Bedrock foundation model weights). Understanding what evidence is available and how to acquire it quickly is essential for effective incident response.
This article covers the forensic artifacts available across AWS AI services, how to acquire and preserve them, and how to analyze them to investigate common incident scenarios: unauthorized model access, training data theft, model tampering, and abuse of managed AI services.
AWS AI Service Forensic Artifacts
CloudTrail: The Foundation of AWS Forensics
CloudTrail records API calls across all AWS services and is the starting point for any AWS forensic investigation. For AI services, CloudTrail captures who performed what action, when, and from where.
import boto3
import json
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class CloudTrailEvent:
"""A parsed CloudTrail event relevant to AI forensics."""
event_time: str
event_name: str
event_source: str
aws_region: str
source_ip: str
user_identity: dict
request_parameters: dict
response_elements: dict
error_code: Optional[str] = None
error_message: Optional[str] = None
raw_event: dict = field(default_factory=dict)
class AWSAIForensicCollector:
"""Collect and analyze forensic artifacts from AWS AI services."""
# AI-related CloudTrail event sources
AI_EVENT_SOURCES = {
"sagemaker.amazonaws.com",
"bedrock.amazonaws.com",
"comprehend.amazonaws.com",
"rekognition.amazonaws.com",
"textract.amazonaws.com",
"translate.amazonaws.com",
"transcribe.amazonaws.com",
}
# High-risk SageMaker actions that warrant investigation
SAGEMAKER_HIGH_RISK_ACTIONS = {
"CreateModel",
"CreateEndpoint",
"UpdateEndpoint",
"CreateTrainingJob",
"CreateProcessingJob",
"CreateNotebookInstance",
"StartNotebookInstance",
"CreatePresignedNotebookInstanceUrl",
"CreateModelPackage",
"UpdateModelPackage",
}
# Bedrock actions of forensic interest
BEDROCK_SENSITIVE_ACTIONS = {
"InvokeModel",
"InvokeModelWithResponseStream",
"CreateModelCustomizationJob",
"CreateProvisionedModelThroughput",
"GetFoundationModel",
"ListFoundationModels",
"CreateGuardrail",
"UpdateGuardrail",
"DeleteGuardrail",
}
def __init__(self, region: str = "us-east-1", profile: Optional[str] = None):
session_kwargs = {}
if profile:
session_kwargs["profile_name"] = profile
self.session = boto3.Session(region_name=region, **session_kwargs)
self.cloudtrail = self.session.client("cloudtrail")
self.logs = self.session.client("logs")
self.s3 = self.session.client("s3")
self.sagemaker = self.session.client("sagemaker")
def collect_ai_cloudtrail_events(
self,
start_time: datetime,
end_time: datetime,
event_sources: Optional[set[str]] = None,
) -> list[CloudTrailEvent]:
"""
Collect CloudTrail events related to AI services.
Args:
start_time: Start of the investigation window.
end_time: End of the investigation window.
event_sources: Specific event sources to filter.
Defaults to all AI services.
Returns:
List of parsed CloudTrail events.
"""
sources = event_sources or self.AI_EVENT_SOURCES
events = []
for source in sources:
try:
paginator = self.cloudtrail.get_paginator("lookup_events")
page_iterator = paginator.paginate(
LookupAttributes=[
{
"AttributeKey": "EventSource",
"AttributeValue": source,
}
],
StartTime=start_time,
EndTime=end_time,
)
for page in page_iterator:
for event in page.get("Events", []):
parsed = self._parse_cloudtrail_event(event)
if parsed:
events.append(parsed)
except Exception as e:
print(f"Error collecting events from {source}: {e}")
events.sort(key=lambda e: e.event_time)
return events
def _parse_cloudtrail_event(self, raw_event: dict) -> Optional[CloudTrailEvent]:
"""Parse a raw CloudTrail event into a structured format."""
try:
cloud_trail_event = json.loads(
raw_event.get("CloudTrailEvent", "{}")
)
except json.JSONDecodeError:
return None
return CloudTrailEvent(
event_time=str(raw_event.get("EventTime", "")),
event_name=raw_event.get("EventName", ""),
event_source=cloud_trail_event.get("eventSource", ""),
aws_region=cloud_trail_event.get("awsRegion", ""),
source_ip=cloud_trail_event.get("sourceIPAddress", ""),
user_identity=cloud_trail_event.get("userIdentity", {}),
request_parameters=cloud_trail_event.get("requestParameters", {}),
response_elements=cloud_trail_event.get("responseElements", {}),
error_code=cloud_trail_event.get("errorCode"),
error_message=cloud_trail_event.get("errorMessage"),
raw_event=cloud_trail_event,
)
def identify_suspicious_activity(
self,
events: list[CloudTrailEvent],
) -> list[dict]:
"""
Analyze CloudTrail events for suspicious AI-related activity.
Args:
events: List of parsed CloudTrail events.
Returns:
List of suspicious activity findings.
"""
findings = []
for event in events:
# Check for high-risk SageMaker actions
if (
event.event_source == "sagemaker.amazonaws.com"
and event.event_name in self.SAGEMAKER_HIGH_RISK_ACTIONS
):
findings.append({
"type": "high_risk_sagemaker_action",
"severity": "high",
"event_name": event.event_name,
"timestamp": event.event_time,
"source_ip": event.source_ip,
"user": self._extract_user_id(event.user_identity),
"details": event.request_parameters,
})
# Check for Bedrock model access
if (
event.event_source == "bedrock.amazonaws.com"
and event.event_name in self.BEDROCK_SENSITIVE_ACTIONS
):
findings.append({
"type": "sensitive_bedrock_action",
"severity": "medium",
"event_name": event.event_name,
"timestamp": event.event_time,
"source_ip": event.source_ip,
"user": self._extract_user_id(event.user_identity),
"details": event.request_parameters,
})
# Check for failed access attempts (possible enumeration)
if event.error_code in (
"AccessDeniedException",
"UnauthorizedAccess",
"AccessDenied",
):
findings.append({
"type": "access_denied",
"severity": "medium",
"event_name": event.event_name,
"timestamp": event.event_time,
"source_ip": event.source_ip,
"user": self._extract_user_id(event.user_identity),
"error": event.error_code,
"details": event.request_parameters,
})
# Check for unusual source IPs (console vs programmatic)
if event.source_ip == "AWS Internal":
# This is normal for service-linked actions
pass
elif self._is_suspicious_ip_pattern(event):
findings.append({
"type": "suspicious_source",
"severity": "high",
"event_name": event.event_name,
"timestamp": event.event_time,
"source_ip": event.source_ip,
"user": self._extract_user_id(event.user_identity),
})
return findings
def _extract_user_id(self, user_identity: dict) -> str:
"""Extract a readable user identifier from CloudTrail user identity."""
arn = user_identity.get("arn", "")
principal = user_identity.get("principalId", "")
user_type = user_identity.get("type", "")
if user_type == "AssumedRole":
# Extract role session name
session_context = user_identity.get("sessionContext", {})
session_issuer = session_context.get("sessionIssuer", {})
return f"{session_issuer.get('userName', 'unknown')} (assumed role)"
elif user_type == "IAMUser":
return user_identity.get("userName", arn)
elif user_type == "Root":
return "ROOT ACCOUNT"
return principal or arn or "unknown"
def _is_suspicious_ip_pattern(self, event: CloudTrailEvent) -> bool:
"""Check if the source IP matches suspicious patterns."""
# Flag if using known VPN/Tor exit node ranges.
# In practice, check event.source_ip against a threat intelligence feed.
# For now, flag access whose user agent matches neither the AWS console
# nor a known SDK/CLI.
user_agent = event.raw_event.get("userAgent", "").lower()
known_agents = ("signin.amazonaws.com", "boto", "aws-cli", "sdk")
return not any(agent in user_agent for agent in known_agents)
SageMaker Forensics
SageMaker is AWS's primary ML platform and generates forensic artifacts across multiple subsystems. Training jobs produce logs in CloudWatch, model artifacts in S3, and metadata in the SageMaker API. Endpoints produce inference logs, container logs, and CloudWatch metrics.
class SageMakerForensicAnalyzer:
"""Forensic analysis specific to AWS SageMaker."""
def __init__(self, session: boto3.Session):
self.sagemaker = session.client("sagemaker")
self.logs = session.client("logs")
self.s3 = session.client("s3")
def investigate_training_job(self, job_name: str) -> dict:
"""
Collect forensic artifacts for a SageMaker training job.
Args:
job_name: The training job name.
Returns:
Dict with training job forensic data.
"""
try:
job = self.sagemaker.describe_training_job(
TrainingJobName=job_name
)
except Exception as e:
return {"error": str(e)}
artifacts = {
"job_name": job_name,
"status": job.get("TrainingJobStatus"),
"creation_time": str(job.get("CreationTime")),
"training_start_time": str(job.get("TrainingStartTime")),
"training_end_time": str(job.get("TrainingEndTime")),
"last_modified_time": str(job.get("LastModifiedTime")),
"role_arn": job.get("RoleArn"),
"algorithm": job.get("AlgorithmSpecification", {}),
"input_channels": [],
"output_location": "",
"instance_type": "",
"vpc_config": job.get("VpcConfig"),
"tags": [],
}
# Input data channels - where did the training data come from?
for channel in job.get("InputDataConfig", []):
artifacts["input_channels"].append({
"channel_name": channel.get("ChannelName"),
"data_source": channel.get("DataSource", {}),
"content_type": channel.get("ContentType"),
"compression_type": channel.get("CompressionType"),
})
# Output location - where are model artifacts stored?
output_config = job.get("OutputDataConfig", {})
artifacts["output_location"] = output_config.get("S3OutputPath", "")
# Resource configuration
resource_config = job.get("ResourceConfig", {})
artifacts["instance_type"] = resource_config.get("InstanceType", "")
artifacts["instance_count"] = resource_config.get("InstanceCount", 0)
artifacts["volume_size_gb"] = resource_config.get(
"VolumeSizeInGB", 0
)
# Collect tags
try:
tag_response = self.sagemaker.list_tags(
ResourceArn=job.get("TrainingJobArn", "")
)
artifacts["tags"] = tag_response.get("Tags", [])
except Exception:
pass
return artifacts
def investigate_endpoint(self, endpoint_name: str) -> dict:
"""
Collect forensic artifacts for a SageMaker endpoint.
Args:
endpoint_name: The endpoint name.
Returns:
Dict with endpoint forensic data.
"""
try:
endpoint = self.sagemaker.describe_endpoint(
EndpointName=endpoint_name
)
except Exception as e:
return {"error": str(e)}
artifacts = {
"endpoint_name": endpoint_name,
"status": endpoint.get("EndpointStatus"),
"creation_time": str(endpoint.get("CreationTime")),
"last_modified_time": str(endpoint.get("LastModifiedTime")),
"endpoint_config_name": endpoint.get("EndpointConfigName"),
"production_variants": [],
"data_capture_config": None,
}
# Get endpoint configuration details
config_name = endpoint.get("EndpointConfigName")
if config_name:
try:
config = self.sagemaker.describe_endpoint_config(
EndpointConfigName=config_name
)
for variant in config.get("ProductionVariants", []):
artifacts["production_variants"].append({
"variant_name": variant.get("VariantName"),
"model_name": variant.get("ModelName"),
"instance_type": variant.get("InstanceType"),
"instance_count": variant.get(
"InitialInstanceCount"
),
})
# Check data capture configuration
capture = config.get("DataCaptureConfig")
if capture:
artifacts["data_capture_config"] = {
"enabled": capture.get(
"EnableCapture", False
),
"sampling_percentage": capture.get(
"InitialSamplingPercentage", 0
),
"destination": capture.get(
"DestinationS3Uri", ""
),
"capture_options": capture.get(
"CaptureOptions", []
),
}
except Exception as e:
artifacts["config_error"] = str(e)
return artifacts
def collect_endpoint_logs(
self,
endpoint_name: str,
start_time: datetime,
end_time: datetime,
max_events: int = 10000,
) -> list[dict]:
"""
Collect CloudWatch logs for a SageMaker endpoint.
Args:
endpoint_name: The endpoint name.
start_time: Start of log window.
end_time: End of log window.
max_events: Maximum number of log events to retrieve.
Returns:
List of log events.
"""
log_group = f"/aws/sagemaker/Endpoints/{endpoint_name}"
events = []
try:
paginator = self.logs.get_paginator("filter_log_events")
page_iterator = paginator.paginate(
logGroupName=log_group,
startTime=int(start_time.timestamp() * 1000),
endTime=int(end_time.timestamp() * 1000),
limit=max_events,
)
for page in page_iterator:
for event in page.get("events", []):
events.append({
"timestamp": datetime.fromtimestamp(
event["timestamp"] / 1000
).isoformat(),
"message": event.get("message", ""),
"log_stream": event.get("logStreamName", ""),
})
if len(events) >= max_events:
break
except self.logs.exceptions.ResourceNotFoundException:
return [{"error": f"Log group {log_group} not found"}]
except Exception as e:
return [{"error": str(e)}]
return events
Bedrock Forensics
AWS Bedrock provides managed access to foundation models. Bedrock's forensic artifacts are primarily in CloudTrail (API calls) and, if enabled, Bedrock's model invocation logging which captures full request/response payloads to S3 or CloudWatch.
class BedrockForensicAnalyzer:
"""Forensic analysis specific to AWS Bedrock."""
def __init__(self, session: boto3.Session):
self.bedrock = session.client("bedrock")
self.bedrock_runtime = session.client("bedrock-runtime")
self.s3 = session.client("s3")
self.logs = session.client("logs")
def check_logging_configuration(self) -> dict:
"""
Check whether Bedrock model invocation logging is enabled.
This is critical for forensic readiness.
Returns:
Dict describing the logging configuration.
"""
try:
response = self.bedrock.get_model_invocation_logging_configuration()
config = response.get("loggingConfig", {})
return {
"logging_enabled": bool(config),
"s3_config": config.get("s3Config", {}),
"cloudwatch_config": config.get("cloudWatchConfig", {}),
"text_data_delivery_enabled": config.get(
"textDataDeliveryEnabled", False
),
"image_data_delivery_enabled": config.get(
"imageDataDeliveryEnabled", False
),
"embedding_data_delivery_enabled": config.get(
"embeddingDataDeliveryEnabled", False
),
"forensic_readiness": self._assess_forensic_readiness(config),
}
except Exception as e:
return {"error": str(e), "logging_enabled": False}
def _assess_forensic_readiness(self, config: dict) -> dict:
"""Assess how well Bedrock logging supports forensic investigation."""
issues = []
score = 0
if not config:
return {
"score": 0,
"max_score": 5,
"issues": ["Model invocation logging is not enabled."],
}
if config.get("s3Config"):
score += 2
else:
issues.append("S3 logging not configured. Enable for durable log storage.")
if config.get("cloudWatchConfig"):
score += 1
else:
issues.append("CloudWatch logging not configured. Enable for real-time analysis.")
if config.get("textDataDeliveryEnabled"):
score += 1
else:
issues.append(
"Text data delivery disabled. Full prompt/response "
"content will not be logged."
)
if config.get("imageDataDeliveryEnabled"):
score += 1
else:
issues.append("Image data delivery disabled.")
return {
"score": score,
"max_score": 5,
"issues": issues,
}
def collect_bedrock_invocation_logs(
self,
s3_bucket: str,
s3_prefix: str,
start_date: str,
end_date: str,
) -> list[dict]:
"""
Collect Bedrock model invocation logs from S3.
Args:
s3_bucket: S3 bucket where Bedrock logs are stored.
s3_prefix: S3 prefix (folder path).
start_date: Start date (YYYY-MM-DD).
end_date: End date (YYYY-MM-DD).
Returns:
List of invocation log entries.
"""
entries = []
paginator = self.s3.get_paginator("list_objects_v2")
try:
page_iterator = paginator.paginate(
Bucket=s3_bucket,
Prefix=s3_prefix,
)
for page in page_iterator:
for obj in page.get("Contents", []):
key = obj["Key"]
last_modified = obj["LastModified"]
# Filter by date range
obj_date = last_modified.strftime("%Y-%m-%d")
if obj_date < start_date or obj_date > end_date:
continue
# Download and parse log file
try:
response = self.s3.get_object(
Bucket=s3_bucket, Key=key
)
content = response["Body"].read().decode("utf-8")
# Bedrock logs are JSONL format
for line in content.strip().split("\n"):
if line.strip():
entry = json.loads(line)
entries.append(entry)
except Exception as e:
entries.append({
"error": str(e),
"s3_key": key,
})
except Exception as e:
return [{"error": str(e)}]
return entries
S3 Access Log Analysis for Training Data Forensics
Training data for SageMaker models is stored in S3, and S3 access logs provide a detailed record of every access to that data. Enable S3 server access logging on all buckets containing training data and model artifacts. The logs record the requester's identity, the operation (GetObject, PutObject, DeleteObject), the specific object key, and the timestamp.
For forensic investigations, S3 access logs reveal who accessed training data (was it the expected SageMaker training role or an unauthorized identity?), when the data was accessed (does the timing align with legitimate training jobs?), and what data was accessed (did someone selectively download specific subsets of the training data?).
def analyze_s3_access_logs(
log_entries: list[dict],
expected_roles: set[str],
training_data_prefix: str,
) -> list[dict]:
"""
Analyze S3 access logs for suspicious training data access.
Args:
log_entries: Parsed S3 access log entries.
expected_roles: Set of IAM role ARNs expected to access this data.
training_data_prefix: S3 key prefix for training data.
Returns:
List of suspicious access findings.
"""
findings = []
for entry in log_entries:
key = entry.get("key", "")
requester = entry.get("requester", "")
operation = entry.get("operation", "")
timestamp = entry.get("timestamp", "")
# Only analyze training data access
if not key.startswith(training_data_prefix):
continue
# Check for unexpected requesters
if requester not in expected_roles and requester != "-":
findings.append({
"type": "unauthorized_training_data_access",
"severity": "high",
"key": key,
"requester": requester,
"operation": operation,
"timestamp": timestamp,
})
# Check for bulk download patterns (many GetObject in short time)
if operation == "REST.GET.OBJECT":
findings.append({
"type": "training_data_read",
"severity": "info",
"key": key,
"requester": requester,
"timestamp": timestamp,
})
# Detect bulk download: many reads from same requester in short window
requester_reads = {}
for f in findings:
if f["type"] == "training_data_read":
req = f["requester"]
if req not in requester_reads:
requester_reads[req] = []
requester_reads[req].append(f)
for req, reads in requester_reads.items():
if len(reads) > 100: # Threshold for bulk access
findings.append({
"type": "bulk_training_data_download",
"severity": "critical",
"requester": req,
"object_count": len(reads),
"first_access": reads[0]["timestamp"],
"last_access": reads[-1]["timestamp"],
})
return [f for f in findings if f["type"] != "training_data_read"]
Investigating Common Incident Scenarios
IAM Policy Analysis for AI Service Investigations
Understanding what an attacker could do with compromised credentials requires analyzing the IAM policies attached to the compromised identity. AWS IAM policies for AI services can be complex, combining service-specific actions with resource-level permissions and conditions.
def analyze_iam_permissions_for_ai(
session: boto3.Session,
identity_arn: str,
) -> dict:
"""
Analyze the AI-service-related IAM permissions for an identity.
Args:
session: boto3 session.
identity_arn: ARN of the IAM user or role to analyze.
Returns:
Dict of AI service permissions.
"""
iam = session.client("iam")
ai_services = [
"sagemaker", "bedrock", "comprehend",
"rekognition", "textract", "translate",
]
# This is a simplified analysis; in practice, use
# IAM Policy Simulator for authoritative results
permissions = {svc: [] for svc in ai_services}
try:
# Get the IAM policy simulator results
simulator = iam.simulate_principal_policy(
PolicySourceArn=identity_arn,
ActionNames=[
f"{svc}:*" for svc in ai_services
],
)
for result in simulator.get("EvaluationResults", []):
action = result.get("EvalActionName", "")
decision = result.get("EvalDecision", "")
for svc in ai_services:
if action.startswith(f"{svc}:"):
permissions[svc].append({
"action": action,
"decision": decision,
})
except Exception as e:
return {"error": str(e)}
# Summarize risk level per service
summary = {}
for svc, perms in permissions.items():
allowed = [p for p in perms if p["decision"] == "allowed"]
summary[svc] = {
"allowed_actions": len(allowed),
"total_evaluated": len(perms),
"risk_level": (
"critical" if len(allowed) > 10
else "high" if len(allowed) > 5
else "medium" if len(allowed) > 0
else "none"
),
}
return summary
Scenario 1: Unauthorized Model Access
An attacker gains AWS credentials (through phishing, credential stuffing, or a leaked access key) and uses them to access AI services. The investigation focuses on identifying what the attacker accessed, what data they processed, and whether they extracted any models or training data.
The investigation workflow is: (1) Identify the compromised credentials from CloudTrail user identity fields. (2) Filter all CloudTrail events for that identity across all AI service event sources. (3) Check for data access patterns: S3 reads of model artifacts or training data, SageMaker DescribeTrainingJob calls that expose input data locations, Bedrock InvokeModel calls that may process sensitive data. (4) Check for persistence: new IAM policies, new SageMaker notebook instances, new endpoints, or new Bedrock custom model jobs.
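Step (2) of this workflow can be sketched as a pure pass over already-collected CloudTrail records. The flat-dict event shape and its field names (`user_arn`, `event_source`, `event_name`, `event_time`) are assumptions for illustration; in practice they would come from whatever parser normalized the raw CloudTrail events.

```python
from collections import defaultdict


def pivot_on_identity(events: list[dict], principal_arn: str) -> dict[str, list[dict]]:
    """Group the suspect identity's events by AI service event source.

    Each event is a dict with at least: user_arn, event_source,
    event_name, event_time (ISO-8601 string). The shape is illustrative.
    """
    by_source: dict[str, list[dict]] = defaultdict(list)
    for ev in events:
        if ev.get("user_arn") == principal_arn:
            by_source[ev["event_source"]].append(ev)
    # Sort each service's events chronologically for timeline review
    for evs in by_source.values():
        evs.sort(key=lambda e: e["event_time"])
    return dict(by_source)
```

The grouped output gives one ordered mini-timeline per AI service, which is usually the first artifact an investigator wants when pivoting on a compromised principal.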
Scenario 2: Training Data Exfiltration
An insider or compromised service role copies training data from S3 buckets used by SageMaker. Investigate by: (1) Collecting S3 access logs for the training data buckets. (2) Cross-referencing S3 GetObject events with CloudTrail to identify the requesting identity. (3) Checking for unusual download volumes or access from unexpected IPs. (4) Examining whether the data was copied to another S3 bucket or downloaded externally.
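Step (3) is essentially burst detection: many GetObject operations from one requester inside a short window. A minimal sketch using a sliding window over parsed access-log timestamps; the threshold and window values are illustrative, not recommendations.

```python
from datetime import datetime, timedelta


def detect_download_bursts(
    reads: list[tuple[str, str]],
    threshold: int = 100,
    window: timedelta = timedelta(minutes=10),
) -> list[dict]:
    """Flag requesters whose read volume exceeds `threshold` within `window`.

    `reads` is a list of (requester, ISO-8601 timestamp) pairs taken from
    parsed S3 access log GetObject entries.
    """
    per_requester: dict[str, list[datetime]] = {}
    for requester, ts in reads:
        per_requester.setdefault(requester, []).append(datetime.fromisoformat(ts))
    findings = []
    for requester, times in per_requester.items():
        times.sort()
        left = 0
        for right in range(len(times)):
            # Shrink the window from the left until it spans <= `window`
            while times[right] - times[left] > window:
                left += 1
            if right - left + 1 >= threshold:
                findings.append({
                    "requester": requester,
                    "reads_in_window": right - left + 1,
                    "window_start": times[left].isoformat(),
                })
                break  # one finding per requester is enough
    return findings
```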
Scenario 3: Model Tampering via SageMaker
An attacker modifies a model by replacing model artifacts in S3 or deploying a modified endpoint configuration. Investigate by collecting the S3 object version history for model artifacts, comparing SHA-256 hashes of current model files against known-good baselines, reviewing CloudTrail for CreateModel, UpdateEndpoint, and PutObject events targeting model artifact paths, and checking endpoint configuration history for unauthorized changes.
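The hash-comparison step can be sketched as a pure function over downloaded artifact bytes and a baseline recorded at deployment time; how the bytes and baseline are obtained (S3 download, artifact registry) is left out of this sketch.

```python
import hashlib


def verify_model_artifacts(
    artifacts: dict[str, bytes],
    baseline_sha256: dict[str, str],
) -> list[dict]:
    """Compare current model artifact bytes against known-good SHA-256 digests.

    `artifacts` maps S3 keys to downloaded object bytes; `baseline_sha256`
    maps the same keys to hex digests recorded at deployment time.
    """
    findings = []
    for key, data in sorted(artifacts.items()):
        digest = hashlib.sha256(data).hexdigest()
        expected = baseline_sha256.get(key)
        if expected is None:
            # An artifact with no baseline is itself suspicious
            findings.append({"key": key, "status": "no_baseline", "sha256": digest})
        elif digest != expected:
            findings.append({"key": key, "status": "tampered",
                             "expected": expected, "observed": digest})
    return findings
```

An empty result means every artifact matched its baseline; any `tampered` entry should be cross-checked against the S3 version history and the corresponding PutObject events in CloudTrail.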
Forensic Readiness Recommendations
Essential Logging Configuration
For forensic readiness in AWS AI environments, enable the following at minimum:
- CloudTrail: Enable in all regions with management and data events. Send to a centralized, immutable S3 bucket with object lock enabled.
- Bedrock invocation logging: Enable with text data delivery to S3. This captures full prompts and responses.
- SageMaker data capture: Enable on production endpoints to capture inference requests and responses.
- S3 access logging: Enable on all buckets containing training data, model artifacts, and AI application data.
- VPC Flow Logs: Enable on VPCs hosting SageMaker endpoints and notebook instances.
- CloudWatch Log retention: Set retention periods that meet your compliance requirements (minimum 90 days, ideally 1 year).
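The retention check in the last item can be audited from `describe_log_groups` output. A minimal sketch over dicts shaped like that API response (the AWS-side collection is omitted); note that a missing `retentionInDays` means the group never expires, which satisfies any minimum.

```python
def flag_short_retention(log_groups: list[dict], minimum_days: int = 90) -> list[str]:
    """Return names of CloudWatch log groups retained for less than the minimum.

    `log_groups` are dicts shaped like CloudWatch `describe_log_groups`
    results; absence of `retentionInDays` means "never expire".
    """
    flagged = []
    for group in log_groups:
        days = group.get("retentionInDays")
        if days is not None and days < minimum_days:
            flagged.append(group.get("logGroupName", "<unnamed>"))
    return flagged
```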
Evidence Preservation Procedures
When conducting forensic investigations on AWS AI services, follow a systematic evidence preservation workflow. First, take snapshots of the current state: export CloudTrail events to a separate S3 bucket with object lock enabled, copy CloudWatch log groups to a preservation account, and record the current configuration of all relevant SageMaker endpoints, training jobs, and Bedrock settings using the Describe API calls.
For S3-based evidence (model artifacts, training data, Bedrock invocation logs), enable versioning on the bucket if not already enabled and copy the relevant objects to an evidence preservation bucket. Record the S3 object version IDs, ETags, and checksums for each preserved object. Use S3 object lock in compliance mode to prevent deletion or modification of preserved evidence.
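The record-keeping part of this step can be sketched as building one chain-of-custody manifest entry per preserved object; the field names are illustrative, and the version ID and ETag would come from the S3 copy response.

```python
import hashlib
from datetime import datetime, timezone


def manifest_entry(bucket: str, key: str, version_id: str,
                   etag: str, body: bytes) -> dict:
    """Build one chain-of-custody record for a preserved S3 object.

    Record the service-reported version ID and ETag alongside an
    independently computed SHA-256 and the preservation timestamp.
    """
    return {
        "s3_uri": f"s3://{bucket}/{key}",
        "version_id": version_id,
        "etag": etag,
        "sha256": hashlib.sha256(body).hexdigest(),
        "size_bytes": len(body),
        "preserved_at_utc": datetime.now(timezone.utc).isoformat(),
    }
```

Writing these entries to the (object-locked) evidence bucket alongside the preserved objects lets a later reviewer re-verify integrity without AWS-side metadata.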
For SageMaker notebook instances that may contain evidence of attacker activity, create an EBS volume snapshot before stopping the instance. The snapshot preserves the notebook filesystem, including command history, downloaded files, and any custom scripts the attacker may have used. Take the snapshot before stopping the instance so that shutdown routines cannot alter the filesystem; note that an EBS snapshot captures disk state only, not memory.
When collecting evidence across multiple AWS accounts (common in organizations using a multi-account strategy), ensure you have cross-account access configured before the incident. An IAM role in each account that the forensic team can assume, with read-only access to CloudTrail, CloudWatch, S3, and the relevant AI services, should be pre-provisioned as part of your forensic readiness program.
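A trust policy for such a pre-provisioned role might look like the following sketch; the account ID and role names are placeholders, and the MFA condition is an optional hardening choice, not a requirement.

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AllowForensicTeamAssume",
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::111111111111:role/ForensicInvestigator"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "Bool": { "aws:MultiFactorAuthPresent": "true" }
      }
    }
  ]
}
```

Attach read-only permissions (for example, scoped-down variants of the AWS-managed read-only policies for CloudTrail, CloudWatch, and S3) to the role in each member account.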
Timeline Correlation Across AWS Services
A critical forensic technique for AWS AI investigations is correlating events across multiple AWS services using common identifiers. CloudTrail events include a requestID that uniquely identifies each API call. S3 access logs include the same request ID for S3 operations. VPC Flow Logs include the network connection details that correspond to API calls.
To build a complete timeline, collect all CloudTrail events for the AI services within the investigation window, all S3 access logs for relevant buckets, and all VPC Flow Logs for the VPCs hosting AI infrastructure. Join these data sources on common fields: CloudTrail requestID matches S3 access log request IDs, and CloudTrail source IP addresses match VPC Flow Log destination IPs.
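The request-ID join can be sketched as a merge of two already-parsed record sets; the `request_id` and `event_time` field names are assumptions about how the upstream parsers normalized the raw logs.

```python
def correlate_by_request_id(cloudtrail_events: list[dict],
                            s3_log_entries: list[dict]) -> list[dict]:
    """Merge CloudTrail events with S3 access log entries sharing a request ID.

    Returns a chronological timeline of CloudTrail events, with the
    matching S3 access log entry attached where one exists.
    """
    s3_by_rid = {e["request_id"]: e for e in s3_log_entries if e.get("request_id")}
    timeline = []
    for ev in cloudtrail_events:
        entry = dict(ev)
        match = s3_by_rid.get(ev.get("request_id"))
        if match:
            entry["s3_access"] = match  # same API call, seen from the S3 side
        timeline.append(entry)
    timeline.sort(key=lambda e: e.get("event_time", ""))
    return timeline
```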
This cross-service correlation reveals the full picture of attacker activity. For example, a CloudTrail event showing DescribeTrainingJob followed by S3 GetObject requests for the training data location, followed by outbound VPC Flow Log entries to an external IP, indicates a training data exfiltration sequence.
Access Control Hardening
Implement least-privilege IAM policies for AI services. Separate roles for training, deployment, and inference. Use SageMaker's VPC mode to keep training and inference traffic within your VPC. Enable AWS Config rules to detect drift from your security baseline.
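As one sketch of the separation of roles, an inference-only policy might grant nothing beyond endpoint invocation; the region, account ID, and endpoint name pattern below are placeholders.

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "InferenceOnly",
      "Effect": "Allow",
      "Action": ["sagemaker:InvokeEndpoint"],
      "Resource": "arn:aws:sagemaker:us-east-1:111111111111:endpoint/prod-*"
    }
  ]
}
```

A training role would instead allow CreateTrainingJob plus read access to the training data prefix, and a deployment role the CreateModel/CreateEndpoint family, so that a single compromised identity cannot both read training data and alter production endpoints.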
AWS GuardDuty and Security Hub Integration
AWS GuardDuty provides automated threat detection that can surface AI-related security findings. GuardDuty monitors CloudTrail events and can detect anomalous API calls, unauthorized credential use, and potential data exfiltration. While GuardDuty does not have AI-specific detections, its general findings (credential compromise, unusual API call patterns, data exfiltration indicators) apply directly to AI service abuse.
Feed GuardDuty findings into AWS Security Hub to correlate with other security signals. Security Hub can aggregate findings from GuardDuty, IAM Access Analyzer, Inspector, and custom finding sources. Configure custom Security Hub insights that filter specifically for AI service-related findings using resource type filters for SageMaker and Bedrock resources.
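Beyond console-defined insights, the same filter can be applied client-side over exported findings. The resource type strings below are assumptions modeled on the AWS Security Finding Format (ASFF) naming convention; verify them against the current ASFF resource type list before relying on this filter.

```python
# Assumed AI-related ASFF resource type strings (verify against current ASFF docs)
AI_RESOURCE_TYPES = {
    "AwsSageMakerNotebookInstance",
    "AwsSageMakerEndpoint",
    "AwsBedrockModel",
}


def ai_related_findings(findings: list[dict]) -> list[dict]:
    """Keep only ASFF-shaped findings whose resources include an AI service type."""
    hits = []
    for finding in findings:
        types = {r.get("Type") for r in finding.get("Resources", [])}
        if types & AI_RESOURCE_TYPES:
            hits.append(finding)
    return hits
```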
For organizations using AWS Organizations with a delegated security account, ensure that GuardDuty and Security Hub are enabled in all accounts that host AI workloads. A compromised developer account in a sandbox environment may provide stepping stones to production AI infrastructure through cross-account role assumptions. GuardDuty's multi-account monitoring can detect this lateral movement pattern.
When an incident is confirmed, use Security Hub's investigation workflow to correlate the AI-specific findings with broader infrastructure findings. A SageMaker data exfiltration incident, for example, may be linked to an earlier GuardDuty finding about compromised credentials or an IAM Access Analyzer finding about overly permissive cross-account access.
References
- AWS (2025). "Logging and Monitoring in Amazon SageMaker." https://docs.aws.amazon.com/sagemaker/latest/dg/logging-cloudwatch.html
- AWS (2025). "Model invocation logging for Amazon Bedrock." https://docs.aws.amazon.com/bedrock/latest/userguide/model-invocation-logging.html
- NIST SP 800-92 (2006). "Guide to Computer Security Log Management." https://csrc.nist.gov/publications/detail/sp/800-92/final