Cloud-AI-forensiek: AWS
Forensische onderzoekstechnieken voor AWS AI-diensten waaronder SageMaker, Bedrock en bijbehorende infrastructuurlogging en bewijsverzameling.
Overzicht
Amazon Web Services host een aanzienlijk deel van de productie-AI-workloads via diensten zoals SageMaker (modeltraining en -hosting), Bedrock (beheerde foundation-modellen), Comprehend (NLP), Rekognition (computer vision) en Textract (documentverwerking). Wanneer een beveiligingsincident deze diensten betreft, moet de forensisch onderzoeker zich een weg banen door de gedistribueerde logging-architectuur van AWS om te reconstrueren wat er is gebeurd.
AWS AI-forensiek is een uitdaging omdat relevant bewijs verspreid is over meerdere diensten en logbronnen. Een enkele SageMaker-inferentieaanroep kan records genereren in CloudTrail (de API-aanroep), CloudWatch (de endpoint-metriek), S3 (de input-/outputdata als logging is ingeschakeld), VPC Flow Logs (het netwerkverkeer) en de containerlogs van het SageMaker-endpoint. Het missen van een van deze bronnen kan gaten in het onderzoek achterlaten.
Bovendien betekent het gedeelde-verantwoordelijkheidsmodel van AWS dat sommige forensische artefacten onder jouw beheer vallen (je modelcode, trainingsdata, endpoint-configuraties) terwijl andere worden beheerd door AWS (de onderliggende infrastructuur, de gewichten van de Bedrock-foundation-modellen). Begrijpen welk bewijs beschikbaar is en hoe je het snel kunt verwerven is essentieel voor effectieve incidentrespons.
Dit artikel behandelt de forensische artefacten die beschikbaar zijn over AWS AI-diensten, hoe je ze verwerft en bewaart, en hoe je ze analyseert om veelvoorkomende incidentscenario's te onderzoeken: ongeautoriseerde modeltoegang, diefstal van trainingsdata, het manipuleren van modellen en misbruik van beheerde AI-diensten.
Forensische artefacten van AWS AI-diensten
CloudTrail: het fundament van AWS-forensiek
CloudTrail registreert API-aanroepen over alle AWS-diensten en is het startpunt voor elk AWS forensisch onderzoek. Voor AI-diensten legt CloudTrail vast wie welke actie uitvoerde, wanneer en vanwaar.
import boto3
import json
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class CloudTrailEvent:
"""A parsed CloudTrail event relevant to AI forensics."""
event_time: str
event_name: str
event_source: str
aws_region: str
source_ip: str
user_identity: dict
request_parameters: dict
response_elements: dict
error_code: Optional[str] = None
error_message: Optional[str] = None
raw_event: dict = field(default_factory=dict)
class AWSAIForensicCollector:
"""Collect and analyze forensic artifacts from AWS AI services."""
# AI-related CloudTrail event sources
AI_EVENT_SOURCES = {
"sagemaker.amazonaws.com",
"bedrock.amazonaws.com",
"comprehend.amazonaws.com",
"rekognition.amazonaws.com",
"textract.amazonaws.com",
"translate.amazonaws.com",
"transcribe.amazonaws.com",
}
# High-risk SageMaker actions that warrant investigation
SAGEMAKER_HIGH_RISK_ACTIONS = {
"CreateModel",
"CreateEndpoint",
"UpdateEndpoint",
"CreateTrainingJob",
"CreateProcessingJob",
"CreateNotebookInstance",
"StartNotebookInstance",
"CreatePresignedNotebookInstanceUrl",
"CreateModelPackage",
"UpdateModelPackage",
}
# Bedrock actions of forensic interest
BEDROCK_SENSITIVE_ACTIONS = {
"InvokeModel",
"InvokeModelWithResponseStream",
"CreateModelCustomizationJob",
"CreateProvisionedModelThroughput",
"GetFoundationModel",
"ListFoundationModels",
"CreateGuardrail",
"UpdateGuardrail",
"DeleteGuardrail",
}
def __init__(self, region: str = "us-east-1", profile: Optional[str] = None):
session_kwargs = {}
if profile:
session_kwargs["profile_name"] = profile
self.session = boto3.Session(region_name=region, **session_kwargs)
self.cloudtrail = self.session.client("cloudtrail")
self.logs = self.session.client("logs")
self.s3 = self.session.client("s3")
self.sagemaker = self.session.client("sagemaker")
def collect_ai_cloudtrail_events(
self,
start_time: datetime,
end_time: datetime,
event_sources: Optional[set[str]] = None,
) -> list[CloudTrailEvent]:
"""
Collect CloudTrail events related to AI services.
Args:
start_time: Start of the investigation window.
end_time: End of the investigation window.
event_sources: Specific event sources to filter.
Defaults to all AI services.
Returns:
List of parsed CloudTrail events.
"""
sources = event_sources or self.AI_EVENT_SOURCES
events = []
for source in sources:
try:
paginator = self.cloudtrail.get_paginator("lookup_events")
page_iterator = paginator.paginate(
LookupAttributes=[
{
"AttributeKey": "EventSource",
"AttributeValue": source,
}
],
StartTime=start_time,
EndTime=end_time,
)
for page in page_iterator:
for event in page.get("Events", []):
parsed = self._parse_cloudtrail_event(event)
if parsed:
events.append(parsed)
except Exception as e:
print(f"Error collecting events from {source}: {e}")
events.sort(key=lambda e: e.event_time)
return events
def _parse_cloudtrail_event(self, raw_event: dict) -> Optional[CloudTrailEvent]:
"""Parse a raw CloudTrail event into a structured format."""
try:
cloud_trail_event = json.loads(
raw_event.get("CloudTrailEvent", "{}")
)
except json.JSONDecodeError:
return None
return CloudTrailEvent(
event_time=str(raw_event.get("EventTime", "")),
event_name=raw_event.get("EventName", ""),
event_source=cloud_trail_event.get("eventSource", ""),
aws_region=cloud_trail_event.get("awsRegion", ""),
source_ip=cloud_trail_event.get("sourceIPAddress", ""),
user_identity=cloud_trail_event.get("userIdentity", {}),
request_parameters=cloud_trail_event.get("requestParameters", {}),
response_elements=cloud_trail_event.get("responseElements", {}),
error_code=cloud_trail_event.get("errorCode"),
error_message=cloud_trail_event.get("errorMessage"),
raw_event=cloud_trail_event,
)
def identify_suspicious_activity(
self,
events: list[CloudTrailEvent],
) -> list[dict]:
"""
Analyze CloudTrail events for suspicious AI-related activity.
Args:
events: List of parsed CloudTrail events.
Returns:
List of suspicious activity findings.
"""
findings = []
for event in events:
# Check for high-risk SageMaker actions
if (
event.event_source == "sagemaker.amazonaws.com"
and event.event_name in self.SAGEMAKER_HIGH_RISK_ACTIONS
):
findings.append({
"type": "high_risk_sagemaker_action",
"severity": "high",
"event_name": event.event_name,
"timestamp": event.event_time,
"source_ip": event.source_ip,
"user": self._extract_user_id(event.user_identity),
"details": event.request_parameters,
})
# Check for Bedrock model access
if (
event.event_source == "bedrock.amazonaws.com"
and event.event_name in self.BEDROCK_SENSITIVE_ACTIONS
):
findings.append({
"type": "sensitive_bedrock_action",
"severity": "medium",
"event_name": event.event_name,
"timestamp": event.event_time,
"source_ip": event.source_ip,
"user": self._extract_user_id(event.user_identity),
"details": event.request_parameters,
})
# Check for failed access attempts (possible enumeration)
if event.error_code in (
"AccessDeniedException",
"UnauthorizedAccess",
"AccessDenied",
):
findings.append({
"type": "access_denied",
"severity": "medium",
"event_name": event.event_name,
"timestamp": event.event_time,
"source_ip": event.source_ip,
"user": self._extract_user_id(event.user_identity),
"error": event.error_code,
"details": event.request_parameters,
})
# Check for unusual source IPs (console vs programmatic)
if event.source_ip == "AWS Internal":
# This is normal for service-linked actions
pass
elif self._is_suspicious_ip_pattern(event):
findings.append({
"type": "suspicious_source",
"severity": "high",
"event_name": event.event_name,
"timestamp": event.event_time,
"source_ip": event.source_ip,
"user": self._extract_user_id(event.user_identity),
})
return findings
def _extract_user_id(self, user_identity: dict) -> str:
"""Extract a readable user identifier from CloudTrail user identity."""
arn = user_identity.get("arn", "")
principal = user_identity.get("principalId", "")
user_type = user_identity.get("type", "")
if user_type == "AssumedRole":
# Extract role session name
session_context = user_identity.get("sessionContext", {})
session_issuer = session_context.get("sessionIssuer", {})
return f"{session_issuer.get('userName', 'unknown')} (assumed role)"
elif user_type == "IAMUser":
return user_identity.get("userName", arn)
elif user_type == "Root":
return "ROOT ACCOUNT"
return principal or arn or "unknown"
def _is_suspicious_ip_pattern(self, event: CloudTrailEvent) -> bool:
"""Check if the source IP matches suspicious patterns."""
ip = event.source_ip
# Flag if using known VPN/Tor exit node ranges
# In practice, check against a threat intelligence feed
# For now, flag non-AWS console access from unusual patterns
user_agent = event.raw_event.get("userAgent", "")
if "signin.amazonaws.com" not in event.source_ip:
if "boto" not in user_agent and "aws-cli" not in user_agent:
if "sdk" not in user_agent.lower():
return True
return FalseSageMaker-forensiek
SageMaker is het primaire ML-platform van AWS en genereert forensische artefacten over meerdere subsystemen. Trainingsjobs produceren logs in CloudWatch, modelartefacten in S3 en metadata in de SageMaker-API. Endpoints produceren inferentielogs, containerlogs en CloudWatch-metriek.
class SageMakerForensicAnalyzer:
"""Forensic analysis specific to AWS SageMaker."""
def __init__(self, session: boto3.Session):
self.sagemaker = session.client("sagemaker")
self.logs = session.client("logs")
self.s3 = session.client("s3")
def investigate_training_job(self, job_name: str) -> dict:
"""
Collect forensic artifacts for a SageMaker training job.
Args:
job_name: The training job name.
Returns:
Dict with training job forensic data.
"""
try:
job = self.sagemaker.describe_training_job(
TrainingJobName=job_name
)
except Exception as e:
return {"error": str(e)}
artifacts = {
"job_name": job_name,
"status": job.get("TrainingJobStatus"),
"creation_time": str(job.get("CreationTime")),
"training_start_time": str(job.get("TrainingStartTime")),
"training_end_time": str(job.get("TrainingEndTime")),
"last_modified_time": str(job.get("LastModifiedTime")),
"role_arn": job.get("RoleArn"),
"algorithm": job.get("AlgorithmSpecification", {}),
"input_channels": [],
"output_location": "",
"instance_type": "",
"vpc_config": job.get("VpcConfig"),
"tags": [],
}
# Input data channels - where did training data come from?
for channel in job.get("InputDataConfig", []):
artifacts["input_channels"].append({
"channel_name": channel.get("ChannelName"),
"data_source": channel.get("DataSource", {}),
"content_type": channel.get("ContentType"),
"compression_type": channel.get("CompressionType"),
})
# Output location - where are model artifacts stored?
output_config = job.get("OutputDataConfig", {})
artifacts["output_location"] = output_config.get("S3OutputPath", "")
# Resource configuration
resource_config = job.get("ResourceConfig", {})
artifacts["instance_type"] = resource_config.get("InstanceType", "")
artifacts["instance_count"] = resource_config.get("InstanceCount", 0)
artifacts["volume_size_gb"] = resource_config.get(
"VolumeSizeInGB", 0
)
# Collect tags
try:
tag_response = self.sagemaker.list_tags(
ResourceArn=job.get("TrainingJobArn", "")
)
artifacts["tags"] = tag_response.get("Tags", [])
except Exception:
pass
return artifacts
def investigate_endpoint(self, endpoint_name: str) -> dict:
"""
Collect forensic artifacts for a SageMaker endpoint.
Args:
endpoint_name: The endpoint name.
Returns:
Dict with endpoint forensic data.
"""
try:
endpoint = self.sagemaker.describe_endpoint(
EndpointName=endpoint_name
)
except Exception as e:
return {"error": str(e)}
artifacts = {
"endpoint_name": endpoint_name,
"status": endpoint.get("EndpointStatus"),
"creation_time": str(endpoint.get("CreationTime")),
"last_modified_time": str(endpoint.get("LastModifiedTime")),
"endpoint_config_name": endpoint.get("EndpointConfigName"),
"production_variants": [],
"data_capture_config": None,
}
# Get endpoint configuration details
config_name = endpoint.get("EndpointConfigName")
if config_name:
try:
config = self.sagemaker.describe_endpoint_config(
EndpointConfigName=config_name
)
for variant in config.get("ProductionVariants", []):
artifacts["production_variants"].append({
"variant_name": variant.get("VariantName"),
"model_name": variant.get("ModelName"),
"instance_type": variant.get("InstanceType"),
"instance_count": variant.get(
"InitialInstanceCount"
),
})
# Check data capture configuration
capture = config.get("DataCaptureConfig")
if capture:
artifacts["data_capture_config"] = {
"enabled": capture.get(
"EnableCapture", False
),
"sampling_percentage": capture.get(
"InitialSamplingPercentage", 0
),
"destination": capture.get(
"DestinationS3Uri", ""
),
"capture_options": capture.get(
"CaptureOptions", []
),
}
except Exception as e:
artifacts["config_error"] = str(e)
return artifacts
def collect_endpoint_logs(
self,
endpoint_name: str,
start_time: datetime,
end_time: datetime,
max_events: int = 10000,
) -> list[dict]:
"""
Collect CloudWatch logs for a SageMaker endpoint.
Args:
endpoint_name: The endpoint name.
start_time: Start of log window.
end_time: End of log window.
max_events: Maximum number of log events to retrieve.
Returns:
List of log events.
"""
log_group = f"/aws/sagemaker/Endpoints/{endpoint_name}"
events = []
try:
paginator = self.logs.get_paginator("filter_log_events")
page_iterator = paginator.paginate(
logGroupName=log_group,
startTime=int(start_time.timestamp() * 1000),
endTime=int(end_time.timestamp() * 1000),
limit=max_events,
)
for page in page_iterator:
for event in page.get("events", []):
events.append({
"timestamp": datetime.fromtimestamp(
event["timestamp"] / 1000
).isoformat(),
"message": event.get("message", ""),
"log_stream": event.get("logStreamName", ""),
})
if len(events) >= max_events:
break
except self.logs.exceptions.ResourceNotFoundException:
return [{"error": f"Log group {log_group} not found"}]
except Exception as e:
return [{"error": str(e)}]
return eventsBedrock-forensiek
AWS Bedrock biedt beheerde toegang tot foundation-modellen. De forensische artefacten van Bedrock bevinden zich primair in CloudTrail (API-aanroepen) en, indien ingeschakeld, in Bedrocks logging van modelaanroepen die volledige request-/response-payloads vastlegt naar S3 of CloudWatch.
class BedrockForensicAnalyzer:
"""Forensic analysis specific to AWS Bedrock."""
def __init__(self, session: boto3.Session):
self.bedrock = session.client("bedrock")
self.bedrock_runtime = session.client("bedrock-runtime")
self.s3 = session.client("s3")
self.logs = session.client("logs")
def check_logging_configuration(self) -> dict:
"""
Check whether Bedrock model invocation logging is enabled.
This is critical for forensic readiness.
Returns:
Dict describing the logging configuration.
"""
try:
response = self.bedrock.get_model_invocation_logging_configuration()
config = response.get("loggingConfig", {})
return {
"logging_enabled": bool(config),
"s3_config": config.get("s3Config", {}),
"cloudwatch_config": config.get("cloudWatchConfig", {}),
"text_data_delivery_enabled": config.get(
"textDataDeliveryEnabled", False
),
"image_data_delivery_enabled": config.get(
"imageDataDeliveryEnabled", False
),
"embedding_data_delivery_enabled": config.get(
"embeddingDataDeliveryEnabled", False
),
"forensic_readiness": self._assess_forensic_readiness(config),
}
except Exception as e:
return {"error": str(e), "logging_enabled": False}
def _assess_forensic_readiness(self, config: dict) -> dict:
"""Assess how well Bedrock logging supports forensic investigation."""
issues = []
score = 0
if not config:
return {
"score": 0,
"max_score": 5,
"issues": ["Model invocation logging is not enabled."],
}
if config.get("s3Config"):
score += 2
else:
issues.append("S3 logging not configured. Enable for durable log storage.")
if config.get("cloudWatchConfig"):
score += 1
else:
issues.append("CloudWatch logging not configured. Enable for real-time analysis.")
if config.get("textDataDeliveryEnabled"):
score += 1
else:
issues.append(
"Text data delivery disabled. Full prompt/response "
"content will not be logged."
)
if config.get("imageDataDeliveryEnabled"):
score += 1
else:
issues.append("Image data delivery disabled.")
return {
"score": score,
"max_score": 5,
"issues": issues,
}
def collect_bedrock_invocation_logs(
self,
s3_bucket: str,
s3_prefix: str,
start_date: str,
end_date: str,
) -> list[dict]:
"""
Collect Bedrock model invocation logs from S3.
Args:
s3_bucket: S3 bucket where Bedrock logs are stored.
s3_prefix: S3 prefix (folder path).
start_date: Start date (YYYY-MM-DD).
end_date: End date (YYYY-MM-DD).
Returns:
List of invocation log entries.
"""
entries = []
paginator = self.s3.get_paginator("list_objects_v2")
try:
page_iterator = paginator.paginate(
Bucket=s3_bucket,
Prefix=s3_prefix,
)
for page in page_iterator:
for obj in page.get("Contents", []):
key = obj["Key"]
last_modified = obj["LastModified"]
# Filter by date range
obj_date = last_modified.strftime("%Y-%m-%d")
if obj_date < start_date or obj_date > end_date:
continue
# Download and parse log file
try:
response = self.s3.get_object(
Bucket=s3_bucket, Key=key
)
content = response["Body"].read().decode("utf-8")
# Bedrock logs are JSONL format
for line in content.strip().split("\n"):
if line.strip():
entry = json.loads(line)
entries.append(entry)
except Exception as e:
entries.append({
"error": str(e),
"s3_key": key,
})
except Exception as e:
return [{"error": str(e)}]
return entriesAnalyse van S3-toegangslogs voor forensiek van trainingsdata
Trainingsdata voor SageMaker-modellen wordt opgeslagen in S3, en S3-toegangslogs bieden een gedetailleerd verslag van elke toegang tot die data. Schakel S3-serverlogging voor toegang in op alle buckets die trainingsdata en modelartefacten bevatten. De logs registreren de identiteit van de aanvrager, de operatie (GetObject, PutObject, DeleteObject), de specifieke objectsleutel en het tijdstempel.
Voor forensische onderzoeken onthullen S3-toegangslogs wie de trainingsdata heeft benaderd (was het de verwachte SageMaker-trainingsrol of een ongeautoriseerde identiteit?), wanneer de data werd benaderd (komt de timing overeen met legitieme trainingsjobs?) en welke data werd benaderd (heeft iemand selectief specifieke deelverzamelingen van de trainingsdata gedownload?).
def analyze_s3_access_logs(
log_entries: list[dict],
expected_roles: set[str],
training_data_prefix: str,
) -> list[dict]:
"""
Analyze S3 access logs for suspicious training data access.
Args:
log_entries: Parsed S3 access log entries.
expected_roles: Set of IAM role ARNs expected to access this data.
training_data_prefix: S3 key prefix for training data.
Returns:
List of suspicious access findings.
"""
findings = []
for entry in log_entries:
key = entry.get("key", "")
requester = entry.get("requester", "")
operation = entry.get("operation", "")
timestamp = entry.get("timestamp", "")
# Only analyze training data access
if not key.startswith(training_data_prefix):
continue
# Check for unexpected requesters
if requester not in expected_roles and requester != "-":
findings.append({
"type": "unauthorized_training_data_access",
"severity": "high",
"key": key,
"requester": requester,
"operation": operation,
"timestamp": timestamp,
})
# Check for bulk download patterns (many GetObject in short time)
if operation == "REST.GET.OBJECT":
findings.append({
"type": "training_data_read",
"severity": "info",
"key": key,
"requester": requester,
"timestamp": timestamp,
})
# Detect bulk download: many reads from same requester in short window
requester_reads = {}
for f in findings:
if f["type"] == "training_data_read":
req = f["requester"]
if req not in requester_reads:
requester_reads[req] = []
requester_reads[req].append(f)
for req, reads in requester_reads.items():
if len(reads) > 100: # Threshold for bulk access
findings.append({
"type": "bulk_training_data_download",
"severity": "critical",
"requester": req,
"object_count": len(reads),
"first_access": reads[0]["timestamp"],
"last_access": reads[-1]["timestamp"],
})
return [f for f in findings if f["type"] != "training_data_read"]Het onderzoeken van veelvoorkomende incidentscenario's
IAM-beleidsanalyse voor onderzoeken naar AI-diensten
Begrijpen wat een aanvaller zou kunnen doen met gecompromitteerde inloggegevens vereist analyse van de IAM-beleidsregels die aan de gecompromitteerde identiteit zijn gekoppeld. AWS IAM-beleidsregels voor AI-diensten kunnen complex zijn, doordat ze dienstspecifieke acties combineren met permissies op resourceniveau en condities.
def analyze_iam_permissions_for_ai(
session: boto3.Session,
identity_arn: str,
) -> dict:
"""
Analyze the AI-service-related IAM permissions for an identity.
Args:
session: boto3 session.
identity_arn: ARN of the IAM user or role to analyze.
Returns:
Dict of AI service permissions.
"""
iam = session.client("iam")
ai_services = [
"sagemaker", "bedrock", "comprehend",
"rekognition", "textract", "translate",
]
# This is a simplified analysis; in practice, use
# IAM Policy Simulator for authoritative results
permissions = {svc: [] for svc in ai_services}
try:
# Get the IAM policy simulator results
simulator = iam.simulate_principal_policy(
PolicySourceArn=identity_arn,
ActionNames=[
f"{svc}:*" for svc in ai_services
],
)
for result in simulator.get("EvaluationResults", []):
action = result.get("EvalActionName", "")
decision = result.get("EvalDecision", "")
for svc in ai_services:
if action.startswith(f"{svc}:"):
permissions[svc].append({
"action": action,
"decision": decision,
})
except Exception as e:
return {"error": str(e)}
# Summarize risk level per service
summary = {}
for svc, perms in permissions.items():
allowed = [p for p in perms if p["decision"] == "allowed"]
summary[svc] = {
"allowed_actions": len(allowed),
"total_evaluated": len(perms),
"risk_level": (
"critical" if len(allowed) > 10
else "high" if len(allowed) > 5
else "medium" if len(allowed) > 0
else "none"
),
}
return summaryScenario 1: Ongeautoriseerde modeltoegang
Een aanvaller verkrijgt AWS-inloggegevens (via phishing, credential stuffing of een gelekte toegangssleutel) en gebruikt deze om toegang te krijgen tot AI-diensten. Het onderzoek richt zich op het identificeren van wat de aanvaller heeft benaderd, welke data hij heeft verwerkt en of hij modellen of trainingsdata heeft geëxtraheerd.
De onderzoeksworkflow is: (1) Identificeer de gecompromitteerde inloggegevens uit de CloudTrail-velden voor gebruikersidentiteit. (2) Filter alle CloudTrail-events voor die identiteit over alle event-bronnen van AI-diensten. (3) Controleer op patronen van datatoegang: S3-reads van modelartefacten of trainingsdata, SageMaker DescribeTrainingJob-aanroepen die locaties van inputdata blootleggen, Bedrock InvokeModel-aanroepen die mogelijk gevoelige data verwerken. (4) Controleer op persistentie: nieuwe IAM-beleidsregels, nieuwe SageMaker-notebookinstances, nieuwe endpoints of nieuwe Bedrock custom-modeljobs.
Scenario 2: Exfiltratie van trainingsdata
Een insider of gecompromitteerde servicerol kopieert trainingsdata uit S3-buckets die door SageMaker worden gebruikt. Onderzoek dit door: (1) S3-toegangslogs voor de buckets met trainingsdata te verzamelen. (2) S3 GetObject-events te kruisverwijzen met CloudTrail om de aanvragende identiteit te identificeren. (3) Te controleren op ongebruikelijke downloadvolumes of toegang vanaf onverwachte IP's. (4) Te onderzoeken of de data naar een andere S3-bucket is gekopieerd of extern is gedownload.
Scenario 3: Modelmanipulatie via SageMaker
Een aanvaller wijzigt een model door modelartefacten in S3 te vervangen of door een gewijzigde endpoint-configuratie te implementeren. Onderzoek dit door de S3-objectversiegeschiedenis voor modelartefacten te verzamelen, SHA-256-hashes van huidige modelbestanden te vergelijken met bekende-goede baselines, CloudTrail te beoordelen op CreateModel-, UpdateEndpoint- en PutObject-events gericht op de paden van modelartefacten, en de geschiedenis van de endpoint-configuratie te controleren op ongeautoriseerde wijzigingen.
Aanbevelingen voor forensische gereedheid
Essentiële logging-configuratie
Schakel voor forensische gereedheid in AWS AI-omgevingen ten minste het volgende in:
- CloudTrail: Schakel dit in alle regio's in met management- en data-events. Stuur dit naar een gecentraliseerde, onveranderlijke S3-bucket met object lock ingeschakeld.
- Bedrock-aanroeplogging: Schakel dit in met levering van tekstdata naar S3. Dit legt volledige prompts en responses vast.
- SageMaker data capture: Schakel dit in op productie-endpoints om inferentieverzoeken en -responses vast te leggen.
- S3-toegangslogging: Schakel dit in op alle buckets die trainingsdata, modelartefacten en AI-applicatiedata bevatten.
- VPC Flow Logs: Schakel dit in op VPC's die SageMaker-endpoints en notebookinstances hosten.
- CloudWatch-logbewaring: Stel bewaartermijnen in die voldoen aan je compliance-vereisten (minimaal 90 dagen, idealiter 1 jaar).
Procedures voor bewijsbewaring
Volg bij het uitvoeren van forensische onderzoeken op AWS AI-diensten een systematische workflow voor bewijsbewaring. Maak eerst snapshots van de huidige toestand: exporteer CloudTrail-events naar een aparte S3-bucket met object lock ingeschakeld, kopieer CloudWatch-loggroepen naar een bewaaraccount, en leg de huidige configuratie van alle relevante SageMaker-endpoints, trainingsjobs en Bedrock-instellingen vast met de Describe-API-aanroepen.
Voor S3-gebaseerd bewijs (modelartefacten, trainingsdata, Bedrock-aanroeplogs) schakel je versiebeheer op de bucket in als dat nog niet is gebeurd en kopieer je de relevante objecten naar een bucket voor bewijsbewaring. Leg de S3-objectversie-ID's, ETags en checksums voor elk bewaard object vast. Gebruik S3 object lock in compliance-modus om verwijdering of wijziging van bewaard bewijs te voorkomen.
Maak voor SageMaker-notebookinstances die bewijs van aanvallersactiviteit kunnen bevatten een EBS-volumesnapshot voordat je de instance stopt. De snapshot bewaart het notebookbestandssysteem, inclusief de commandogeschiedenis, gedownloade bestanden en eventuele aangepaste scripts die de aanvaller heeft gebruikt. Stop de instance niet voordat je de snapshot maakt, want dit zorgt ervoor dat de in-memory toestand wordt vastgelegd.
Zorg bij het verzamelen van bewijs over meerdere AWS-accounts (gebruikelijk in organisaties die een multi-accountstrategie hanteren) dat je vóór het incident cross-accounttoegang hebt geconfigureerd. Een IAM-rol in elk account die het forensische team kan aannemen, met alleen-lezentoegang tot CloudTrail, CloudWatch, S3 en de relevante AI-diensten, zou vooraf moeten worden ingericht als onderdeel van je programma voor forensische gereedheid.
Tijdlijncorrelatie over AWS-diensten
Een kritieke forensische techniek voor AWS AI-onderzoeken is het correleren van events over meerdere AWS-diensten met behulp van gemeenschappelijke identifiers. CloudTrail-events bevatten een requestID die elke API-aanroep uniek identificeert. S3-toegangslogs bevatten dezelfde request-ID voor S3-operaties. VPC Flow Logs bevatten de details van de netwerkverbinding die overeenkomen met API-aanroepen.
Om een volledige tijdlijn op te bouwen, verzamel je alle CloudTrail-events voor de AI-diensten binnen het onderzoeksvenster, alle S3-toegangslogs voor relevante buckets, en alle VPC Flow Logs voor de VPC's die AI-infrastructuur hosten. Voeg deze databronnen samen op gemeenschappelijke velden: CloudTrail requestID komt overeen met de request-ID's van de S3-toegangslogs, en CloudTrail-bron-IP-adressen komen overeen met de bestemmings-IP's van VPC Flow Logs.
Deze cross-servicecorrelatie onthult het volledige beeld van aanvallersactiviteit. Een CloudTrail-event dat DescribeTrainingJob toont, gevolgd door S3 GetObject-verzoeken voor de locatie van de trainingsdata, gevolgd door uitgaande VPC Flow Log-vermeldingen naar een extern IP, duidt bijvoorbeeld op een exfiltratiesequentie van trainingsdata.
Verharding van toegangscontrole
Implementeer least-privilege IAM-beleidsregels voor AI-diensten. Gescheiden rollen voor training, deployment en inferentie. Gebruik de VPC-modus van SageMaker om trainings- en inferentieverkeer binnen je VPC te houden. Schakel AWS Config-regels in om afwijkingen van je beveiligingsbaseline te detecteren.
Integratie van AWS GuardDuty en Security Hub
AWS GuardDuty biedt geautomatiseerde dreigingsdetectie die AI-gerelateerde beveiligingsbevindingen aan het licht kan brengen. GuardDuty monitort CloudTrail-events en kan anomale API-aanroepen, ongeautoriseerd gebruik van inloggegevens en mogelijke data-exfiltratie detecteren. Hoewel GuardDuty geen AI-specifieke detecties heeft, zijn de algemene bevindingen (compromittering van inloggegevens, ongebruikelijke API-aanroeppatronen, indicatoren van data-exfiltratie) direct van toepassing op misbruik van AI-diensten.
Voer GuardDuty-bevindingen in AWS Security Hub om ze te correleren met andere beveiligingssignalen. Security Hub kan bevindingen aggregeren van GuardDuty, IAM Access Analyzer, Inspector en aangepaste bevindingsbronnen. Configureer aangepaste Security Hub-insights die specifiek filteren op AI-dienstgerelateerde bevindingen met behulp van resourcetypefilters voor SageMaker-, Bedrock- en Cognitive Services-resources.
Zorg voor organisaties die AWS Organizations gebruiken met een gedelegeerd beveiligingsaccount dat GuardDuty en Security Hub zijn ingeschakeld in alle accounts die AI-workloads hosten. Een gecompromitteerd developeraccount in een sandbox-omgeving kan via cross-accountrolovernames opstapjes bieden naar productie-AI-infrastructuur. De multi-accountmonitoring van GuardDuty kan dit patroon van laterale verplaatsing detecteren.
Gebruik bij een bevestigd incident de onderzoeksworkflow van Security Hub om de AI-specifieke bevindingen te correleren met bredere infrastructuurbevindingen. Een SageMaker-data-exfiltratie-incident kan bijvoorbeeld gekoppeld zijn aan een eerdere GuardDuty-bevinding over gecompromitteerde inloggegevens of een IAM Access Analyzer-bevinding over te ruime cross-accounttoegang.
Referenties
- AWS (2025). "Logging and monitoring in Amazon SageMaker." https://docs.aws.amazon.com/sagemaker/latest/dg/logging-cloudwatch.html
- AWS (2025). "Model invocation logging for Amazon Bedrock." https://docs.aws.amazon.com/bedrock/latest/userguide/model-invocation-logging.html
- NIST SP 800-92 (2006). "Guide to Computer Security Log Management." https://csrc.nist.gov/publications/detail/sp/800-92/final