Securing Storage Systems for Training Data
Attack and defense strategies for S3, GCS, HDFS, and object storage systems holding AI training datasets and model artifacts
Overview
The storage layer is the foundation of every AI system. Training datasets, validation sets, model checkpoints, final model artifacts, feature stores, and experiment metadata all reside in storage systems. For modern AI workloads, these storage systems are typically cloud object stores (Amazon S3, Google Cloud Storage, Azure Blob Storage), distributed filesystems (HDFS, Lustre, GPFS for on-premise HPC clusters), or increasingly, purpose-built AI data platforms that abstract over multiple backends.
Compromising the storage layer is one of the most effective attack vectors against AI systems because it enables data poisoning without modifying any code. An attacker who can write to training data storage can inject poisoned samples that cause targeted misclassification in the trained model. An attacker who can read from model artifact storage can steal proprietary models worth millions in training compute. An attacker who can modify model artifacts can insert backdoors that persist through deployment.
The security challenge is compounded by the scale of AI data operations. Training datasets can reach terabytes to petabytes, making integrity verification computationally expensive. Data pipelines often need broad read access across many storage locations, which encourages overly permissive IAM policies. Training jobs need write access to checkpoint and output locations, and the same credentials are often reused for both data reads and model writes. Multi-team environments share storage namespaces with insufficient access boundaries.
This article examines storage security through the lens of AI-specific attack scenarios, covering cloud object stores, distributed filesystems, and the cross-cutting concern of data integrity for machine learning.
Cloud Object Storage Attack Surface
Amazon S3 for AI Workloads
S3 is the most commonly used storage backend for AI training data in cloud environments. SageMaker, custom training on EC2, and hybrid cloud setups all typically pull training data from S3 and write model artifacts back to S3. The S3 security model involves bucket policies, IAM policies, ACLs (legacy), and encryption settings. Misconfigurations at any level can expose training data or allow model tampering.
Common S3 misconfigurations in AI deployments:
- Overly permissive bucket policies: Training data buckets often have s3:GetObject granted to broad principals because multiple services and teams need access.
- Missing server-side encryption: Training data containing PII or proprietary information stored without encryption at rest.
- Public bucket ACLs: Legacy ACL settings that inadvertently make buckets publicly readable.
- Cross-account access without conditions: S3 bucket policies that allow access from partner accounts without restricting to specific roles or conditions.
- Missing versioning: Without versioning, a poisoned file replaces the original with no recovery path.
```python
"""
S3 security auditor for AI training data and model artifact buckets.
Checks for misconfigurations that could enable data poisoning,
model theft, or unauthorized access to training data.
"""
import json
import re
from typing import Any, Optional

try:
    import boto3
    from botocore.exceptions import ClientError
except ImportError:
    print("boto3 required: pip install boto3")
    raise


class S3AIStorageAuditor:
    """Audit S3 buckets used for AI workloads."""

    # Patterns that indicate AI-related buckets
    AI_BUCKET_PATTERNS = [
        r"train", r"dataset", r"model", r"checkpoint",
        r"artifact", r"feature", r"mlflow", r"sagemaker",
        r"experiment", r"pipeline", r"embedding",
    ]

    def __init__(self, session: Optional[Any] = None):
        self.session = session or boto3.Session()
        self.s3 = self.session.client("s3")
        self.findings: list[dict] = []

    def _add(self, severity: str, title: str, detail: str) -> None:
        self.findings.append({
            "severity": severity, "title": title, "detail": detail,
        })

    def is_ai_bucket(self, bucket_name: str) -> bool:
        """Heuristically identify AI-related buckets."""
        name_lower = bucket_name.lower()
        return any(
            re.search(pattern, name_lower)
            for pattern in self.AI_BUCKET_PATTERNS
        )

    def check_public_access(self, bucket: str) -> None:
        """Check if bucket has public access enabled."""
        try:
            pab = self.s3.get_public_access_block(Bucket=bucket)
            config = pab["PublicAccessBlockConfiguration"]
            if not all([
                config.get("BlockPublicAcls", False),
                config.get("IgnorePublicAcls", False),
                config.get("BlockPublicPolicy", False),
                config.get("RestrictPublicBuckets", False),
            ]):
                self._add(
                    "CRITICAL",
                    f"Public access not fully blocked: {bucket}",
                    f"PublicAccessBlock config: {json.dumps(config)}. "
                    f"Training data or model artifacts may be publicly "
                    f"accessible.",
                )
        except ClientError as e:
            if e.response["Error"]["Code"] == "NoSuchPublicAccessBlockConfiguration":
                self._add(
                    "CRITICAL",
                    f"No public access block: {bucket}",
                    "Bucket has no PublicAccessBlock configuration. "
                    "Public access is possible via bucket policies or ACLs.",
                )

    def check_encryption(self, bucket: str) -> None:
        """Check server-side encryption configuration."""
        try:
            enc = self.s3.get_bucket_encryption(Bucket=bucket)
            rules = enc["ServerSideEncryptionConfiguration"]["Rules"]
            for rule in rules:
                sse = rule.get("ApplyServerSideEncryptionByDefault", {})
                algo = sse.get("SSEAlgorithm", "none")
                if algo == "AES256":
                    self._add(
                        "LOW",
                        f"SSE-S3 encryption (AES256): {bucket}",
                        "Using AWS-managed keys. Consider SSE-KMS with "
                        "customer-managed keys for AI data with compliance "
                        "requirements.",
                    )
                elif algo == "aws:kms":
                    kms_key = sse.get("KMSMasterKeyID", "default")
                    if kms_key == "default" or "alias/aws/" in str(kms_key):
                        self._add(
                            "MEDIUM",
                            f"SSE-KMS with AWS-managed key: {bucket}",
                            "Using an AWS-managed KMS key. Customer-managed "
                            "keys provide better access control and audit.",
                        )
        except ClientError as e:
            if e.response["Error"]["Code"] == "ServerSideEncryptionConfigurationNotFoundError":
                self._add(
                    "HIGH",
                    f"No default encryption: {bucket}",
                    "Bucket has no default encryption. Training data and "
                    "model artifacts may be stored unencrypted.",
                )

    def check_versioning(self, bucket: str) -> None:
        """Check if versioning is enabled (critical for data integrity)."""
        try:
            ver = self.s3.get_bucket_versioning(Bucket=bucket)
            status = ver.get("Status", "Disabled")
            if status != "Enabled":
                self._add(
                    "HIGH",
                    f"Versioning not enabled: {bucket}",
                    "Without versioning, poisoned training data or "
                    "tampered model artifacts cannot be recovered. "
                    "An attacker who overwrites data leaves no trace.",
                )
            mfa_delete = ver.get("MFADelete", "Disabled")
            if status == "Enabled" and mfa_delete != "Enabled":
                self._add(
                    "MEDIUM",
                    f"MFA Delete not enabled: {bucket}",
                    "Versioning is enabled but MFA Delete is not. An "
                    "attacker with DeleteObject permission can permanently "
                    "remove version history.",
                )
        except ClientError:
            pass

    def check_bucket_policy(self, bucket: str) -> None:
        """Analyze bucket policy for overly permissive access."""
        try:
            policy_str = self.s3.get_bucket_policy(Bucket=bucket)["Policy"]
            policy = json.loads(policy_str)
            for statement in policy.get("Statement", []):
                effect = statement.get("Effect", "")
                principal = statement.get("Principal", "")
                actions = statement.get("Action", [])
                if isinstance(actions, str):
                    actions = [actions]
                # Check for wildcard principals
                if principal in ("*", {"AWS": "*"}) and effect == "Allow":
                    self._add(
                        "CRITICAL",
                        f"Wildcard principal in Allow: {bucket}",
                        f"Actions: {actions}. Any AWS account can "
                        f"access this bucket. Training data and models "
                        f"are exposed to the internet.",
                    )
                # Check for overly broad write access
                write_actions = [
                    a for a in actions
                    if any(w in a for w in [
                        "PutObject", "DeleteObject", "s3:*",
                    ])
                ]
                if write_actions and effect == "Allow":
                    condition = statement.get("Condition", {})
                    if not condition:
                        self._add(
                            "HIGH",
                            f"Unconditional write access: {bucket}",
                            f"Write actions {write_actions} granted without "
                            f"conditions. Consider adding VPC endpoint, "
                            f"source IP, or MFA conditions.",
                        )
        except ClientError as e:
            # No bucket policy is fine (access then relies on IAM alone)
            if e.response["Error"]["Code"] != "NoSuchBucketPolicy":
                self._add("ERROR", f"Cannot read bucket policy: {bucket}", str(e))

    def check_access_logging(self, bucket: str) -> None:
        """Verify access logging is enabled for audit trail."""
        try:
            log_cfg = self.s3.get_bucket_logging(Bucket=bucket)
            if "LoggingEnabled" not in log_cfg:
                self._add(
                    "MEDIUM",
                    f"Access logging disabled: {bucket}",
                    "No S3 access logging. Cannot detect unauthorized "
                    "reads of training data or writes to model artifacts.",
                )
        except ClientError:
            pass

    def audit_bucket(self, bucket: str) -> None:
        """Run all checks against a single bucket."""
        self.check_public_access(bucket)
        self.check_encryption(bucket)
        self.check_versioning(bucket)
        self.check_bucket_policy(bucket)
        self.check_access_logging(bucket)

    def audit_all_ai_buckets(self) -> list[dict]:
        """Discover and audit all AI-related S3 buckets."""
        self.findings = []
        try:
            buckets = self.s3.list_buckets()["Buckets"]
        except ClientError as e:
            self._add("ERROR", "Cannot list buckets", str(e))
            return self.findings
        ai_buckets = [
            b["Name"] for b in buckets if self.is_ai_bucket(b["Name"])
        ]
        if not ai_buckets:
            self._add(
                "INFO",
                "No AI-related buckets found",
                "No buckets matched AI naming patterns. "
                "Specify buckets manually.",
            )
            return self.findings
        for bucket in ai_buckets:
            self.audit_bucket(bucket)
        return self.findings


if __name__ == "__main__":
    import sys

    auditor = S3AIStorageAuditor()
    if len(sys.argv) > 1:
        for bucket in sys.argv[1:]:
            auditor.audit_bucket(bucket)
    else:
        auditor.audit_all_ai_buckets()
    for f in auditor.findings:
        print(f"[{f['severity']}] {f['title']}")
        print(f"  {f['detail']}\n")
```

Google Cloud Storage Security
GCS uses IAM for access control with a model slightly different from S3. Key differences relevant to AI storage security:
- Uniform bucket-level access: GCS encourages uniform access (IAM only, no ACLs), which simplifies policy auditing. However, legacy buckets may still use fine-grained ACLs.
- Signed URLs: Both S3 and GCS support signed URLs for temporary access, but these are often generated with excessive duration for training jobs that run for days.
- Object lifecycle: GCS lifecycle policies can automatically delete or archive objects, which could be exploited to remove training data versions that would reveal poisoning.
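The excessive signed-URL lifetimes mentioned above can be caught by parsing the expiry parameter out of the URL itself, since V4 query-string signatures on both S3 and GCS encode the requested lifetime. A minimal offline sketch (the one-hour threshold is an illustrative value, not a recommendation from either vendor):

```python
from typing import Optional
from urllib.parse import parse_qs, urlparse

MAX_REASONABLE_SECONDS = 3600  # illustrative threshold; tune per workload


def signed_url_lifetime(url: str) -> Optional[int]:
    """Return the requested lifetime in seconds of a V4 signed URL.

    Handles S3 (X-Amz-Expires) and GCS (X-Goog-Expires) query-string
    signatures; returns None if the URL is not a recognized signed URL.
    """
    params = parse_qs(urlparse(url).query)
    for key in ("X-Amz-Expires", "X-Goog-Expires"):
        if key in params:
            return int(params[key][0])
    return None


def flag_excessive_urls(urls: list) -> list:
    """Return (url, lifetime) pairs whose lifetime exceeds the threshold."""
    flagged = []
    for url in urls:
        lifetime = signed_url_lifetime(url)
        if lifetime is not None and lifetime > MAX_REASONABLE_SECONDS:
            flagged.append((url, lifetime))
    return flagged
```

Running this over job configs, notebooks, and pipeline logs is a cheap way to find week-long presigned URLs that were minted for a multi-day training run and then leaked.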
HDFS and On-Premise Storage
Hadoop Distributed Filesystem (HDFS) remains common in on-premise AI clusters, particularly for organizations with existing big data infrastructure. HDFS security is based on Kerberos authentication and POSIX-style permissions.
HDFS-specific attack vectors for AI workloads:
- NameNode compromise: The NameNode maintains the filesystem metadata. Compromising it provides full knowledge of where all training data blocks are stored and can redirect reads to attacker-controlled DataNodes.
- DataNode direct access: In misconfigured clusters, DataNodes may be directly accessible on their HTTP or data transfer ports, bypassing NameNode access controls.
- Transparent encryption zone gaps: HDFS supports encryption zones, but they must be explicitly configured per directory. Training data in non-encrypted zones is stored in plaintext on DataNode disks.
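The encryption-zone gap can be audited by comparing the zone roots reported by `hdfs crypto -listZones` against the directories that actually hold training data. A sketch of that comparison as a pure function (the paths are illustrative; in practice the zone list would be parsed from the CLI output and the dataset paths from job configs):

```python
from typing import Iterable


def unencrypted_datasets(encryption_zones: Iterable[str],
                         dataset_dirs: Iterable[str]) -> list:
    """Return dataset directories not covered by any HDFS encryption zone.

    A directory is covered if it equals a zone root or sits beneath one;
    data in uncovered directories is stored in plaintext on DataNode disks.
    """
    zones = [z.rstrip("/") + "/" for z in encryption_zones]
    uncovered = []
    for d in dataset_dirs:
        d_norm = d.rstrip("/") + "/"
        if not any(d_norm.startswith(z) for z in zones):
            uncovered.append(d)
    return uncovered
```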
Shared Filesystem Vulnerabilities (NFS, Lustre, GPFS)
On-premise GPU clusters frequently use high-performance parallel filesystems for training data access. These filesystems have security characteristics that differ significantly from cloud object stores:
NFS (Network File System) is still widely used for sharing training data across GPU nodes. NFS v3 relies on client-side UID/GID for authorization, meaning any client that can mount the export can access files as any user. Even NFS v4 with Kerberos is often deployed in AUTH_SYS mode for performance, which provides no real authentication. An attacker who gains access to any node that mounts the NFS export can read or modify all training data.
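A first-pass audit of this exposure is to scan the server's /etc/exports for exports open to any host or exporting with no_root_squash, which lets root on any client act as root on the export. A minimal parser sketch (it handles the common `path host(options)` line format only):

```python
def audit_exports(exports_text: str) -> list:
    """Flag risky lines in an /etc/exports file.

    Flags exports open to any host ('*') and exports with no_root_squash.
    """
    findings = []
    for line in exports_text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        parts = line.split(None, 1)
        if len(parts) < 2:
            continue
        path, clients = parts
        for client in clients.split():
            host, _, opts = client.partition("(")
            opts = opts.rstrip(")")
            if host == "*":
                findings.append({"path": path, "issue": "export open to any host"})
            if "no_root_squash" in opts:
                findings.append({"path": path, "issue": "no_root_squash enabled"})
    return findings
```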
Lustre is the dominant parallel filesystem for large-scale AI training (used in most Top500 supercomputers). Lustre's security model is minimal — it relies on the underlying network fabric for isolation. In typical deployments, any node on the InfiniBand network can access any Lustre filesystem. There is no per-user authentication at the filesystem level; POSIX permissions are enforced by the client, not the server. This means a compromised compute node can access all files on the Lustre filesystem regardless of their ownership and permissions.
GPFS (IBM Spectrum Scale) provides stronger security through protocol-level encryption and access control lists, but these features must be explicitly enabled and configured. Many GPFS deployments in AI clusters use default settings that prioritize performance over security.
For all shared filesystems, the key red team insight is: compromising any single compute node that mounts the shared filesystem typically provides access to all training data, model checkpoints, and job scripts stored on that filesystem. This makes the shared filesystem a high-value target for lateral movement in GPU clusters.
Data Poisoning Through Storage Compromise
Silent Data Modification
The most impactful attack through storage compromise is data poisoning — modifying training data to influence model behavior without triggering obvious errors. This is particularly effective at the storage layer because:
- No code changes needed: The training pipeline code is legitimate; only the data it reads has been modified.
- Scale: An attacker with storage write access can modify millions of training samples in a single operation.
- Persistence: Without versioning and integrity checks, the poisoned data becomes the ground truth.
Targeted Poisoning Strategies
An attacker with write access to training data storage can implement several poisoning strategies depending on their objectives:
Label flipping: Change the labels of a small subset of training examples. For example, in a content moderation model, relabel a fraction of toxic content as safe. The model learns to misclassify similar content in production. This requires only modifying metadata (labels), not the input data itself, making it harder to detect.
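Because label flipping changes only the per-class proportions, a simple detection layer is to record the label distribution at manifest-creation time and compare it on every subsequent read. A sketch of that check (the 2% tolerance is an illustrative default, not a validated threshold):

```python
from collections import Counter
from typing import Iterable


def label_shift_findings(baseline: dict, labels: Iterable[str],
                         tolerance: float = 0.02) -> list:
    """Compare the current label distribution against a recorded baseline.

    baseline maps label -> expected fraction; tolerance is the maximum
    absolute per-class deviation allowed before flagging.
    """
    counts = Counter(labels)
    total = sum(counts.values()) or 1
    findings = []
    for label, expected in baseline.items():
        actual = counts.get(label, 0) / total
        if abs(actual - expected) > tolerance:
            findings.append(
                f"label '{label}': expected ~{expected:.3f}, got {actual:.3f}"
            )
    return findings
```

This catches crude flipping campaigns; an attacker who flips labels symmetrically across classes will evade it, which is why it complements rather than replaces per-file hash verification.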
Backdoor injection: Add training examples that contain a specific trigger pattern (a particular phrase, pixel pattern, or data artifact) with a target label. The model learns to associate the trigger with the target label. At inference time, the attacker includes the trigger in their input to control the model's output. The model behaves normally for all inputs without the trigger, making the backdoor extremely difficult to detect.
Data distribution shift: Gradually modify the training data distribution over time, so each individual change is small enough to pass anomaly detection. Over many training cycles, the cumulative shift causes the model to develop blind spots or biases that the attacker can exploit.
Clean-label attacks: Modify training examples in ways that are imperceptible to humans but cause the model to learn incorrect decision boundaries. For image models, this involves adding small perturbations to correctly-labeled images. For text models, this involves rephrasing or adding seemingly innocuous context that shifts the learned representations.
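For data that is supposed to be immutable after preparation, clean-label tampering is detectable by comparing each sample against its archived original: any nonzero deviation is suspicious, no matter how small. A sketch over numeric feature vectors (the epsilon only absorbs storage round-trip noise and is an illustrative value):

```python
from typing import Sequence


def max_perturbation(original: Sequence[float], current: Sequence[float]) -> float:
    """Largest absolute per-element difference between two feature vectors."""
    if len(original) != len(current):
        raise ValueError("length mismatch - sample replaced, not perturbed")
    return max((abs(a - b) for a, b in zip(original, current)), default=0.0)


def flag_perturbed(samples: dict, originals: dict, eps: float = 1e-6) -> list:
    """Return IDs of samples that differ from their archived originals.

    Any deviation above eps is flagged; for clean-label attacks the
    perturbation is small by design, so the comparison must be exact-ish.
    """
    return [
        sid for sid, vec in samples.items()
        if sid in originals and max_perturbation(originals[sid], vec) > eps
    ]
```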
```python
"""
Training data integrity verification framework.
Implements content-addressable storage checks for detecting
unauthorized modifications to training datasets.
"""
import hashlib
import json
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional


@dataclass
class IntegrityRecord:
    """Hash record for a single file in a training dataset."""
    relative_path: str
    sha256: str
    size_bytes: int
    last_verified: str


@dataclass
class DatasetManifest:
    """Integrity manifest for an entire training dataset."""
    dataset_name: str
    created_at: str
    total_files: int
    total_bytes: int
    records: dict[str, IntegrityRecord] = field(default_factory=dict)

    def to_json(self) -> str:
        return json.dumps({
            "dataset_name": self.dataset_name,
            "created_at": self.created_at,
            "total_files": self.total_files,
            "total_bytes": self.total_bytes,
            "records": {
                k: {
                    "sha256": v.sha256,
                    "size_bytes": v.size_bytes,
                    "last_verified": v.last_verified,
                }
                for k, v in self.records.items()
            },
        }, indent=2)

    @classmethod
    def from_json(cls, data: str) -> "DatasetManifest":
        parsed = json.loads(data)
        manifest = cls(
            dataset_name=parsed["dataset_name"],
            created_at=parsed["created_at"],
            total_files=parsed["total_files"],
            total_bytes=parsed["total_bytes"],
        )
        for path, record in parsed.get("records", {}).items():
            manifest.records[path] = IntegrityRecord(
                relative_path=path,
                sha256=record["sha256"],
                size_bytes=record["size_bytes"],
                last_verified=record["last_verified"],
            )
        return manifest


def compute_file_hash(file_path: Path, chunk_size: int = 8192) -> tuple[str, int]:
    """Compute SHA-256 hash and size of a file."""
    h = hashlib.sha256()
    size = 0
    with open(file_path, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            h.update(chunk)
            size += len(chunk)
    return h.hexdigest(), size


def create_manifest(
    dataset_dir: str,
    dataset_name: str,
    max_workers: int = 8,
) -> DatasetManifest:
    """
    Create an integrity manifest for a training dataset directory.
    This should be done immediately after data preparation and
    stored in a separate, write-protected location.
    """
    base = Path(dataset_dir)
    now = datetime.now(timezone.utc).isoformat()
    files = [f for f in base.rglob("*") if f.is_file()]
    manifest = DatasetManifest(
        dataset_name=dataset_name,
        created_at=now,
        total_files=len(files),
        total_bytes=0,
    )

    def process_file(file_path: Path) -> IntegrityRecord:
        rel_path = str(file_path.relative_to(base))
        sha256, size = compute_file_hash(file_path)
        return IntegrityRecord(
            relative_path=rel_path,
            sha256=sha256,
            size_bytes=size,
            last_verified=now,
        )

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(process_file, f): f for f in files
        }
        for future in as_completed(futures):
            record = future.result()
            manifest.records[record.relative_path] = record
            manifest.total_bytes += record.size_bytes
    return manifest


def verify_dataset(
    dataset_dir: str,
    manifest: DatasetManifest,
    max_workers: int = 8,
) -> list[dict]:
    """
    Verify a training dataset against its integrity manifest.
    Returns a list of findings (modifications, additions, deletions).
    """
    base = Path(dataset_dir)
    findings = []
    current_files = {
        str(f.relative_to(base))
        for f in base.rglob("*") if f.is_file()
    }
    manifest_files = set(manifest.records.keys())

    # Check for new files (potential injection)
    for path in current_files - manifest_files:
        findings.append({
            "severity": "HIGH",
            "type": "ADDED",
            "path": path,
            "detail": "File not in original manifest — possible data injection.",
        })

    # Check for deleted files
    for path in manifest_files - current_files:
        findings.append({
            "severity": "HIGH",
            "type": "DELETED",
            "path": path,
            "detail": "File missing from dataset — possible targeted removal.",
        })

    # Check for modified files
    common = current_files & manifest_files

    def check_file(rel_path: str) -> Optional[dict]:
        expected = manifest.records[rel_path]
        actual_hash, actual_size = compute_file_hash(base / rel_path)
        if actual_hash != expected.sha256:
            return {
                "severity": "CRITICAL",
                "type": "MODIFIED",
                "path": rel_path,
                "detail": (
                    f"Hash mismatch. Expected {expected.sha256[:16]}..., "
                    f"got {actual_hash[:16]}... "
                    f"Size: {expected.size_bytes} -> {actual_size}. "
                    f"File has been modified — possible data poisoning."
                ),
            }
        return None

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(check_file, p): p for p in common
        }
        for future in as_completed(futures):
            result = future.result()
            if result:
                findings.append(result)
    return findings


if __name__ == "__main__":
    import sys

    if len(sys.argv) < 3:
        print(f"Usage: {sys.argv[0]} <create|verify> <dataset_dir> [manifest_file]")
        sys.exit(1)
    action = sys.argv[1]
    dataset_dir = sys.argv[2]
    if action == "create":
        name = os.path.basename(dataset_dir.rstrip("/"))
        manifest = create_manifest(dataset_dir, name)
        manifest_path = f"{name}_manifest.json"
        with open(manifest_path, "w") as f:
            f.write(manifest.to_json())
        print(f"Created manifest: {manifest_path}")
        print(f"  Files: {manifest.total_files}")
        print(f"  Size: {manifest.total_bytes:,} bytes")
    elif action == "verify":
        if len(sys.argv) < 4:
            print("Manifest file required for verification")
            sys.exit(1)
        with open(sys.argv[3]) as f:
            manifest = DatasetManifest.from_json(f.read())
        findings = verify_dataset(dataset_dir, manifest)
        if not findings:
            print("PASS: All files match manifest.")
        else:
            for finding in findings:
                print(f"[{finding['severity']}] {finding['type']}: {finding['path']}")
                print(f"  {finding['detail']}")
```

Practical Examples
S3 Bucket Enumeration for AI Assets
```bash
#!/usr/bin/env bash
# Enumerate S3 buckets for AI-related assets using common naming conventions.
# This script attempts to identify publicly accessible training data and models.

echo "=== S3 AI Asset Enumeration ==="
COMPANY="${1:?Usage: $0 <company_name>}"

# Common bucket naming patterns for AI/ML workloads
PATTERNS=(
    "${COMPANY}-training-data"
    "${COMPANY}-ml-models"
    "${COMPANY}-model-artifacts"
    "${COMPANY}-datasets"
    "${COMPANY}-sagemaker"
    "${COMPANY}-mlflow"
    "${COMPANY}-ai-pipeline"
    "${COMPANY}-model-registry"
    "${COMPANY}-embeddings"
    "${COMPANY}-checkpoints"
    "${COMPANY}-feature-store"
)

for bucket in "${PATTERNS[@]}"; do
    # Check if the bucket exists and is accessible
    HTTP_CODE=$(curl -s -o /dev/null -w "%{http_code}" \
        "https://${bucket}.s3.amazonaws.com/" 2>/dev/null)
    case "$HTTP_CODE" in
        200)
            echo "[CRITICAL] ${bucket} - PUBLICLY ACCESSIBLE (listing enabled)"
            # List the first few objects
            curl -s "https://${bucket}.s3.amazonaws.com/" | python3 -c "
import sys, xml.etree.ElementTree as ET
try:
    tree = ET.parse(sys.stdin)
    ns = {'s3': 'http://s3.amazonaws.com/doc/2006-03-01/'}
    for k in tree.findall('.//s3:Key', ns)[:5]:
        print(f'  {k.text}')
except Exception:
    pass
" 2>/dev/null
            ;;
        403)
            echo "[EXISTS] ${bucket} - Exists but access denied"
            ;;
        404)
            echo "[      ] ${bucket} - Does not exist"
            ;;
        *)
            echo "[????] ${bucket} - HTTP $HTTP_CODE"
            ;;
    esac
done
```

Data Lifecycle Security
Training data has a lifecycle that creates security considerations at each stage:
Collection and ingestion: Data enters the AI pipeline from various sources — web scraping, user uploads, third-party datasets, sensor data, and synthetic generation. Each ingestion path is a potential poisoning vector. Implement provenance tracking that records the source, collection timestamp, and any transformations applied to each data record. Use content-addressable storage (CAS) where each data record is addressed by its cryptographic hash, making unauthorized modifications detectable.
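The content-addressable idea reduces to keying every record by the hash of its bytes, so any modification changes the address and is detectable on read. A minimal in-memory sketch with provenance metadata attached (illustrative only; a real store would persist blobs to object storage and the provenance index to a write-protected location):

```python
import hashlib


class ContentAddressedStore:
    """Minimal in-memory content-addressable store with provenance."""

    def __init__(self):
        self._blobs: dict = {}
        self._provenance: dict = {}

    def put(self, content: bytes, source: str, collected_at: str) -> str:
        # The SHA-256 digest of the content IS the record's address
        digest = hashlib.sha256(content).hexdigest()
        self._blobs[digest] = content
        self._provenance[digest] = {
            "source": source, "collected_at": collected_at,
        }
        return digest

    def get(self, digest: str) -> bytes:
        # Re-hash on read: tampering with the stored bytes breaks the address
        content = self._blobs[digest]
        if hashlib.sha256(content).hexdigest() != digest:
            raise ValueError(f"integrity failure for {digest[:16]}...")
        return content
```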
Preprocessing and augmentation: Data preprocessing scripts transform raw data into training-ready format through normalization, tokenization, augmentation, and feature extraction. These scripts run with read access to raw data and write access to processed data, making them high-value targets. A compromised preprocessing script can selectively poison data while appearing to function normally. Pin preprocessing dependencies, review transformation code, and compare output distributions against baselines.
Storage during training: Training data is read repeatedly across multiple epochs. If the storage system allows modifications during training (as opposed to immutable snapshots), an attacker can modify data between epochs. The model will learn from both the original and modified versions, making the poisoning effect unpredictable but generally in the attacker's favor. Use read-only snapshots or Object Lock during training runs.
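On S3, the immutable-snapshot approach can be implemented by writing dataset objects under Object Lock in COMPLIANCE mode. The sketch below builds the keyword arguments such a `put_object` call would take (an assumption to note: the bucket must have been created with Object Lock enabled, and the bucket/key names are illustrative):

```python
from datetime import datetime, timedelta, timezone


def object_lock_put_args(bucket: str, key: str, body: bytes,
                         retain_days: int) -> dict:
    """Build put_object kwargs for a COMPLIANCE-mode Object Lock write.

    Under COMPLIANCE mode the object cannot be overwritten or deleted by
    any user, including root, until the retention date passes. Pass the
    result as boto3: s3_client.put_object(**args).
    """
    retain_until = datetime.now(timezone.utc) + timedelta(days=retain_days)
    return {
        "Bucket": bucket,
        "Key": key,
        "Body": body,
        "ObjectLockMode": "COMPLIANCE",
        "ObjectLockRetainUntilDate": retain_until,
    }
```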
Archival and retention: After training completes, training data is often retained for reproducibility, compliance, or future retraining. Archived data may have weaker access controls than active data, creating a window for retroactive poisoning that affects future retraining cycles. Apply the same integrity verification to archived data as to active data.
Deletion and cleanup: When training data must be deleted (for compliance with data subject requests under GDPR/CCPA, or because it contains sensitive information), verify that deletion is complete across all copies, caches, and backups. Incomplete deletion leaves residual data that could be accessed by an attacker or inadvertently used in future training.
Defense and Mitigation
Storage access control:
- Apply least-privilege IAM policies: training jobs need s3:GetObject on data buckets and s3:PutObject on checkpoint/output buckets. Never grant s3:*.
- Use VPC endpoints and bucket policies that restrict access to specific VPCs, preventing access from outside the training network.
- Enable S3 Block Public Access at the account level, not just bucket level.
- For HDFS, use Kerberos authentication and Ranger for fine-grained access policies.
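The VPC restriction above follows a standard pattern: a Deny statement that rejects any request not arriving through the training VPC endpoint. A sketch of such a bucket policy (bucket name and endpoint ID are placeholders):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DenyOutsideTrainingVPC",
      "Effect": "Deny",
      "Principal": "*",
      "Action": "s3:*",
      "Resource": [
        "arn:aws:s3:::example-training-data",
        "arn:aws:s3:::example-training-data/*"
      ],
      "Condition": {
        "StringNotEquals": {
          "aws:SourceVpce": "vpce-0example1234567890"
        }
      }
    }
  ]
}
```

Because Deny overrides any Allow, this holds even if a compromised principal elsewhere in the account has broad s3:* permissions.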
Data integrity:
- Enable versioning on all AI storage buckets. Use Object Lock for immutable training datasets.
- Create and verify integrity manifests before training. Store manifests in a separate, write-protected location.
- Use S3 Object Lock in compliance mode to prevent deletion or modification of training data during its retention period.
Encryption:
- Enable default encryption with customer-managed KMS keys. This provides audit trail through CloudTrail KMS events.
- For sensitive training data, use client-side encryption so data is encrypted before it reaches the storage service.
- Rotate encryption keys according to your compliance requirements.
Monitoring and detection:
- Enable S3 access logging and CloudTrail data events for all AI buckets.
- Alert on anomalous access patterns: bulk downloads, writes from unexpected principals, access from unusual IP ranges.
- Monitor for version deletions and lifecycle policy changes that could indicate an attacker covering tracks.
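The cover-tracks monitoring above can be prototyped as a scan over CloudTrail records: flag version-targeted DeleteObject calls (which remove the evidence that versioning preserves) and lifecycle, policy, or versioning changes on AI buckets. A sketch following the CloudTrail record shape (event and field names match CloudTrail's S3 logging; the exact set of suspicious management events is a judgment call):

```python
from typing import Iterable

# Management operations often used to cover tracks after tampering
SUSPICIOUS_MGMT_EVENTS = {
    "PutBucketLifecycle", "PutBucketLifecycleConfiguration",
    "PutBucketPolicy", "DeleteBucketPolicy", "PutBucketVersioning",
}


def scan_cloudtrail_records(records: Iterable[dict]) -> list:
    """Flag CloudTrail records suggestive of an attacker covering tracks."""
    alerts = []
    for r in records:
        name = r.get("eventName", "")
        params = r.get("requestParameters") or {}
        if name == "DeleteObject" and "versionId" in params:
            alerts.append(
                f"version delete: {params.get('bucketName')}/{params.get('key')}"
            )
        elif name in SUSPICIOUS_MGMT_EVENTS:
            alerts.append(f"{name} on {params.get('bucketName')}")
    return alerts
```

In production this logic would run as an EventBridge rule or a query over CloudTrail Lake rather than a batch scan, but the triage conditions are the same.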
References
- AWS. (2024). "Security Best Practices for Amazon S3." https://docs.aws.amazon.com/AmazonS3/latest/userguide/security-best-practices.html
- MITRE ATLAS. "Poisoning Training Data." https://atlas.mitre.org/techniques/AML.T0020
- NIST. (2023). "AI Risk Management Framework (AI RMF 1.0)." https://airc.nist.gov/AI_RMF_Interactivity/
- Google Cloud. (2024). "Best practices for Cloud Storage." https://cloud.google.com/storage/docs/best-practices