Securing Storage Systems for Training Data
Attack and defense strategies for S3, GCS, HDFS, and object storage systems holding AI training datasets and model artifacts
Overview
The storage layer is the foundation of every AI system. Training datasets, validation sets, model checkpoints, final model artifacts, feature stores, and experiment metadata all reside in storage systems. For modern AI workloads, these storage systems are typically cloud object stores (Amazon S3, Google Cloud Storage, Azure Blob Storage), distributed filesystems (HDFS, Lustre, GPFS for on-premise HPC clusters), or increasingly, purpose-built AI data platforms that abstract over multiple backends.
Compromising the storage layer is one of the most effective attack vectors against AI systems because it enables data poisoning without modifying any code. An attacker who can write to training data storage can inject poisoned samples that cause targeted misclassification in the trained model. An attacker who can read from model artifact storage can steal proprietary models worth millions in training compute. An attacker who can modify model artifacts can insert backdoors that persist through deployment.
The security challenge is compounded by the scale of AI data operations. Training datasets can be terabytes to petabytes, making integrity verification computationally expensive. Data pipelines often require broad read access across many storage locations, creating overly permissive IAM policies. Training jobs need write access to checkpoint and output locations, and the same credentials are often reused across data reading and model writing operations. Multi-team environments share storage namespaces with insufficient access boundaries.
This article examines storage security through the lens of AI-specific attack scenarios, covering cloud object stores, distributed filesystems, and the cross-cutting concern of data integrity for machine learning.
Cloud Object Storage Attack Surface
Amazon S3 for AI Workloads
S3 is the most commonly used storage backend for AI training data in cloud environments. SageMaker, custom training on EC2, and hybrid cloud setups all typically pull training data from S3 and write model artifacts back to S3. The S3 security model involves bucket policies, IAM policies, ACLs (legacy), and encryption settings. Misconfigurations at any level can expose training data or allow model tampering.
Common S3 misconfigurations in AI deployments:
- Overly permissive bucket policies: Training data buckets often have `s3:GetObject` granted to broad principals because multiple services and teams need access.
- Missing server-side encryption: Training data containing PII or proprietary information stored without encryption at rest.
- Public bucket ACLs: Legacy ACL settings that inadvertently make buckets publicly readable.
- Cross-account access without conditions: S3 bucket policies that allow access from partner accounts without restricting to specific roles or conditions.
- Missing versioning: Without versioning, a poisoned file replaces the original with no recovery path.
"""
S3 security auditor for AI training data and model artifact buckets.
Checks for misconfigurations that could enable data poisoning,
model theft, or unauthorized access to training data.
"""
import json
import re
from typing import Any, Optional
try:
import boto3
from botocore.exceptions import ClientError
except ImportError:
print("boto3 required: pip install boto3")
raise
class S3AIStorageAuditor:
"""Audit S3 buckets used for AI workloads."""
# Patterns that indicate AI-related buckets
AI_BUCKET_PATTERNS = [
r"train", r"dataset", r"model", r"checkpoint",
r"artifact", r"feature", r"mlflow", r"sagemaker",
        r"experiment", r"pipeline", r"embedding",
]
def __init__(self, session: Optional[Any] = None):
self.session = session or boto3.Session()
self.s3 = self.session.client("s3")
self.findings: list[dict] = []
def _add(self, severity: str, title: str, detail: str) -> None:
self.findings.append({
"severity": severity, "title": title, "detail": detail,
})
def is_ai_bucket(self, bucket_name: str) -> bool:
        """Heuristically identify AI-related buckets."""
name_lower = bucket_name.lower()
return any(
re.search(pattern, name_lower)
for pattern in self.AI_BUCKET_PATTERNS
)
def check_public_access(self, bucket: str) -> None:
"""Check if bucket has public access enabled."""
try:
pab = self.s3.get_public_access_block(Bucket=bucket)
config = pab["PublicAccessBlockConfiguration"]
if not all([
config.get("BlockPublicAcls", False),
config.get("IgnorePublicAcls", False),
config.get("BlockPublicPolicy", False),
config.get("RestrictPublicBuckets", False),
]):
self._add(
"CRITICAL",
f"Public access not fully blocked: {bucket}",
f"PublicAccessBlock config: {json.dumps(config)}. "
                    f"Training data or model artifacts may be publicly "
f"accessible.",
)
except ClientError as e:
if e.response["Error"]["Code"] == "NoSuchPublicAccessBlockConfiguration":
self._add(
"CRITICAL",
f"No public access block: {bucket}",
"Bucket has no PublicAccessBlock configuration. "
"Public access is possible via bucket policies or ACLs.",
)
def check_encryption(self, bucket: str) -> None:
"""Check server-side encryption configuration."""
try:
enc = self.s3.get_bucket_encryption(Bucket=bucket)
rules = enc["ServerSideEncryptionConfiguration"]["Rules"]
for rule in rules:
sse = rule.get("ApplyServerSideEncryptionByDefault", {})
algo = sse.get("SSEAlgorithm", "none")
if algo == "AES256":
self._add(
"LOW",
f"SSE-S3 encryption (AES256): {bucket}",
                        "Using AWS-managed keys. Consider SSE-KMS with "
"customer-managed keys for AI data with compliance "
"requirements.",
)
elif algo == "aws:kms":
kms_key = sse.get("KMSMasterKeyID", "default")
if kms_key == "default" or "alias/aws/" in str(kms_key):
self._add(
"MEDIUM",
f"SSE-KMS with AWS-managed key: {bucket}",
"Using an AWS-managed KMS key. Customer-managed "
"keys provide better access control and audit.",
)
except ClientError as e:
if e.response["Error"]["Code"] == "ServerSideEncryptionConfigurationNotFoundError":
self._add(
"HIGH",
f"No default encryption: {bucket}",
                    "Bucket has no default encryption. Training data and "
"model artifacts may be stored unencrypted.",
)
def check_versioning(self, bucket: str) -> None:
"""Check if versioning is enabled (critical for data integrity)."""
try:
ver = self.s3.get_bucket_versioning(Bucket=bucket)
status = ver.get("Status", "Disabled")
if status != "Enabled":
self._add(
"HIGH",
f"Versioning not enabled: {bucket}",
                    "Without versioning, poisoned training data or "
                    "tampered model artifacts cannot be recovered. "
                    "An attacker who overwrites data leaves no trace.",
)
mfa_delete = ver.get("MFADelete", "Disabled")
if mfa_delete != "Enabled" and status == "Enabled":
self._add(
"MEDIUM",
f"MFA Delete not enabled: {bucket}",
                    "Versioning is enabled but MFA Delete is not. An "
                    "attacker with DeleteObject permission can permanently "
                    "remove version history.",
)
except ClientError:
pass
def check_bucket_policy(self, bucket: str) -> None:
"""Analyze bucket policy for overly permissive access."""
try:
policy_str = self.s3.get_bucket_policy(Bucket=bucket)["Policy"]
policy = json.loads(policy_str)
for statement in policy.get("Statement", []):
effect = statement.get("Effect", "")
principal = statement.get("Principal", "")
actions = statement.get("Action", [])
if isinstance(actions, str):
actions = [actions]
# Check for wildcard principals
if principal == "*" or principal == {"AWS": "*"}:
if effect == "Allow":
self._add(
"CRITICAL",
f"Wildcard principal in Allow: {bucket}",
f"Actions: {actions}. Any AWS account can "
                            f"access this bucket. Training data and models "
f"are exposed to the internet.",
)
# Check for overly broad write access
write_actions = [
a for a in actions
if any(w in a for w in [
"PutObject", "DeleteObject", "s3:*",
])
]
if write_actions and effect == "Allow":
condition = statement.get("Condition", {})
if not condition:
self._add(
"HIGH",
f"Unconditional write access: {bucket}",
f"Write actions {write_actions} granted without "
                            f"conditions. Consider adding VPC endpoint, "
f"source IP, or MFA conditions.",
)
except ClientError as e:
if e.response["Error"]["Code"] == "NoSuchBucketPolicy":
pass # No policy is fine (relies on IAM)
def check_access_logging(self, bucket: str) -> None:
"""Verify access logging is enabled for audit trail."""
try:
logging = self.s3.get_bucket_logging(Bucket=bucket)
if "LoggingEnabled" not in logging:
self._add(
"MEDIUM",
f"Access logging disabled: {bucket}",
"No S3 access logging. Cannot detect unauthorized "
                    "reads of training data or writes to model artifacts.",
)
except ClientError:
pass
def audit_bucket(self, bucket: str) -> None:
"""Run all checks against a single bucket."""
self.check_public_access(bucket)
self.check_encryption(bucket)
self.check_versioning(bucket)
self.check_bucket_policy(bucket)
self.check_access_logging(bucket)
def audit_all_ai_buckets(self) -> list[dict]:
"""Discover and audit all AI-related S3 buckets."""
self.findings = []
try:
buckets = self.s3.list_buckets()["Buckets"]
except ClientError as e:
self._add("ERROR", "Cannot list buckets", str(e))
return self.findings
ai_buckets = [
b["Name"] for b in buckets if self.is_ai_bucket(b["Name"])
]
if not ai_buckets:
self._add(
"INFO",
"No AI-related buckets found",
"No buckets matched AI naming patterns. "
"Specify buckets manually.",
)
return self.findings
for bucket in ai_buckets:
self.audit_bucket(bucket)
return self.findings
if __name__ == "__main__":
import sys
auditor = S3AIStorageAuditor()
if len(sys.argv) > 1:
for bucket in sys.argv[1:]:
auditor.audit_bucket(bucket)
else:
auditor.audit_all_ai_buckets()
for f in auditor.findings:
print(f"[{f['severity']}] {f['title']}")
        print(f" {f['detail']}\n")
Google Cloud Storage Security
GCS uses IAM for access control with a model slightly different from S3. Key differences relevant to AI storage security:
- Uniform bucket-level access: GCS encourages uniform access (IAM only, no ACLs), which simplifies policy auditing. However, legacy buckets may still use fine-grained ACLs.
- Signed URLs: Both S3 and GCS support signed URLs for temporary access, but these are often generated with excessive duration for training jobs that run for days.
- Object lifecycle: GCS lifecycle policies can automatically delete or archive objects, which could be exploited to remove training data versions that would reveal poisoning.
HDFS and On-Premise Storage
Hadoop Distributed Filesystem (HDFS) remains common in on-premise AI clusters, particularly for organizations with existing big data infrastructure. HDFS security is based on Kerberos authentication and POSIX-style permissions.
HDFS-specific attack vectors for AI workloads:
- NameNode compromise: The NameNode maintains the filesystem metadata. Compromising it provides full knowledge of where all training data blocks are stored and can redirect reads to attacker-controlled DataNodes.
- DataNode direct access: In misconfigured clusters, DataNodes may be directly accessible on their HTTP or data transfer ports, bypassing NameNode access controls.
- Transparent encryption zone gaps: HDFS supports encryption zones, but they must be explicitly configured per directory. Training data in non-encrypted zones is stored in plaintext on DataNode disks.
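The encryption-zone gap lends itself to a simple coverage check: every dataset path a training job reads should sit inside some zone. A sketch that takes the zone paths (as reported by `hdfs crypto -listZones`) and a list of dataset directories, with illustrative paths:

```python
"""Flag HDFS dataset paths that fall outside every encryption zone."""
from pathlib import PurePosixPath


def unencrypted_datasets(zones: list[str], datasets: list[str]) -> list[str]:
    """Return dataset paths not contained in any encryption zone."""
    zone_paths = [PurePosixPath(z) for z in zones]
    exposed = []
    for ds in datasets:
        p = PurePosixPath(ds)
        # A path is covered if it is a zone root or a descendant of one
        if not any(zp == p or zp in p.parents for zp in zone_paths):
            exposed.append(ds)
    return exposed
```

Anything this returns is stored in plaintext on DataNode disks, readable by anyone with physical or root access to those nodes.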
Shared Filesystem 漏洞 (NFS, Lustre, GPFS)
On-premise GPU clusters frequently use high-performance parallel filesystems for 訓練資料 access. These filesystems have 安全 characteristics that differ significantly from 雲端 object stores:
NFS (Network File System) is still widely used for sharing 訓練資料 across GPU nodes. NFS v3 relies on client-side UID/GID for 授權, meaning any client that can mount the export can access files as any user. Even NFS v4 with Kerberos is often deployed in AUTH_SYS mode for performance, which provides no real 認證. 攻擊者 who gains access to any node that mounts the NFS export can read or modify all 訓練資料.
Lustre is the dominant parallel filesystem for large-scale AI 訓練 (used in most Top500 supercomputers). Lustre's 安全 model is minimal — it relies on the underlying network fabric for isolation. In typical deployments, any node on the InfiniBand network can access any Lustre filesystem. 存在 no per-user 認證 at the filesystem level; POSIX 權限 are enforced by the client, not the server. 這意味著 a compromised compute node can access all files on the Lustre filesystem regardless of their ownership and 權限.
GPFS (IBM Spectrum Scale) provides stronger 安全 through protocol-level encryption and access control lists, but these features must be explicitly enabled and configured. Many GPFS deployments in AI clusters use default settings that prioritize performance over 安全.
For all shared filesystems, the key 紅隊 insight is: compromising any single compute node that mounts the shared filesystem typically provides access to all 訓練資料, model checkpoints, and job scripts stored on that filesystem. This makes the shared filesystem a high-value target for lateral movement in GPU clusters.
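From a defender's side, the weakly authenticated mounts described above are visible in `/proc/mounts` on each compute node. A minimal sketch that flags NFS mounts negotiated with AUTH_SYS (`sec=sys`, which is also the default when no `sec=` option appears) and any Lustre mount; the mount lines in the test follow the standard `/proc/mounts` format:

```python
"""Flag weakly authenticated shared-filesystem mounts on a compute node."""


def weak_mounts(proc_mounts_text: str) -> list[str]:
    findings = []
    for line in proc_mounts_text.splitlines():
        parts = line.split()
        if len(parts) < 4:
            continue
        mountpoint, fstype, options = parts[1], parts[2], parts[3]
        opts = options.split(",")
        if fstype.startswith("nfs") and (
            "sec=sys" in opts or not any(o.startswith("sec=") for o in opts)
        ):
            # AUTH_SYS trusts whatever UID/GID the client asserts
            findings.append(f"{mountpoint}: NFS with AUTH_SYS (client-asserted identity)")
        elif fstype == "lustre":
            # Lustre relies on network isolation; permissions are client-enforced
            findings.append(f"{mountpoint}: Lustre (no per-user authentication)")
    return findings
```

Feed it `open("/proc/mounts").read()` on each node; in a typical GPU cluster, expect every node to report the same shared mounts, which is exactly the lateral-movement surface described above.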
Data Poisoning Through Storage Compromise
Silent Data Modification
The most impactful attack through storage compromise is data poisoning — modifying training data to influence model behavior without triggering obvious errors. This is particularly effective at the storage layer because:
- No code changes needed: The training pipeline code is legitimate; only the data it reads has been modified.
- Scale: An attacker with storage write access can modify millions of training samples in a single operation.
- Persistence: Without versioning and integrity checks, the poisoned data becomes the ground truth.
Targeted Poisoning Strategies
An attacker with write access to training data storage can implement several poisoning strategies depending on their objectives:
Label flipping: Change the labels of a small subset of training examples. For example, in a content moderation model, relabel a fraction of toxic content as safe. The model learns to misclassify similar content in production. This requires only modifying metadata (labels), not the input data itself, making it harder to detect.
Backdoor injection: Add training examples that contain a specific trigger pattern (a particular phrase, pixel pattern, or data artifact) with a target label. The model learns to associate the trigger with the target label. At inference time, the attacker includes the trigger in their input to control the model's output. The model behaves normally for all inputs without the trigger, making the backdoor extremely difficult to detect.
Data distribution shift: Gradually modify the training data distribution over time, so each individual change is small enough to pass anomaly detection. Over many training cycles, the cumulative shift causes the model to develop blind spots or biases that an attacker can exploit.
Clean-label attacks: Modify training examples in ways that are imperceptible to humans but cause the model to learn incorrect decision boundaries. For image models, this involves adding small perturbations to correctly-labeled images. For text models, this involves rephrasing or adding seemingly innocuous context that shifts the learned representations.
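Coarse label flipping and distribution shift leave a statistical fingerprint even when file hashes are not available (for example, when data is legitimately re-exported between snapshots). A minimal sketch of a per-class drift check; the 2% tolerance is an assumption, and note this will NOT catch clean-label or backdoor attacks, which preserve the label distribution:

```python
"""Detect label-distribution drift between dataset snapshots."""
from collections import Counter


def label_drift(
    before: list[str], after: list[str], tolerance: float = 0.02
) -> dict[str, float]:
    """Return {label: change_in_fraction} for labels drifting beyond tolerance."""
    f_before = Counter(before)
    f_after = Counter(after)
    n_before, n_after = len(before), len(after)
    drifted = {}
    for label in set(f_before) | set(f_after):
        delta = f_after[label] / n_after - f_before[label] / n_before
        if abs(delta) > tolerance:
            drifted[label] = round(delta, 4)
    return drifted
```

Run it on the label column at each snapshot boundary; a sudden drop in a sensitive class (e.g. "toxic") between snapshots of nominally the same dataset is a strong flipping signal.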
"""
Training data integrity verification framework.
Implements content-addressable storage checks for detecting
unauthorized modifications to training datasets.
"""
import hashlib
import json
import os
from pathlib import Path
from dataclasses import dataclass, field
from typing import Iterator, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
@dataclass
class IntegrityRecord:
    """Hash record for a single file in a training dataset."""
relative_path: str
sha256: str
size_bytes: int
last_verified: str
@dataclass
class DatasetManifest:
    """Integrity manifest for an entire training dataset."""
dataset_name: str
created_at: str
total_files: int
total_bytes: int
records: dict[str, IntegrityRecord] = field(default_factory=dict)
def to_json(self) -> str:
return json.dumps({
"dataset_name": self.dataset_name,
"created_at": self.created_at,
"total_files": self.total_files,
"total_bytes": self.total_bytes,
"records": {
k: {
"sha256": v.sha256,
"size_bytes": v.size_bytes,
"last_verified": v.last_verified,
}
for k, v in self.records.items()
},
}, indent=2)
@classmethod
def from_json(cls, data: str) -> "DatasetManifest":
parsed = json.loads(data)
manifest = cls(
dataset_name=parsed["dataset_name"],
created_at=parsed["created_at"],
total_files=parsed["total_files"],
total_bytes=parsed["total_bytes"],
)
for path, record in parsed.get("records", {}).items():
manifest.records[path] = IntegrityRecord(
relative_path=path,
sha256=record["sha256"],
size_bytes=record["size_bytes"],
last_verified=record["last_verified"],
)
return manifest
def compute_file_hash(file_path: Path, chunk_size: int = 8192) -> tuple[str, int]:
"""Compute SHA-256 hash and size of a file."""
h = hashlib.sha256()
size = 0
with open(file_path, "rb") as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
h.update(chunk)
size += len(chunk)
return h.hexdigest(), size
def create_manifest(
dataset_dir: str,
dataset_name: str,
max_workers: int = 8,
) -> DatasetManifest:
"""
    Create an integrity manifest for a training dataset directory.
This should be done immediately after data preparation and
stored in a separate, write-protected location.
"""
from datetime import datetime, timezone
base = Path(dataset_dir)
now = datetime.now(timezone.utc).isoformat()
files = list(base.rglob("*"))
files = [f for f in files if f.is_file()]
manifest = DatasetManifest(
dataset_name=dataset_name,
created_at=now,
total_files=len(files),
total_bytes=0,
)
def process_file(file_path: Path) -> IntegrityRecord:
rel_path = str(file_path.relative_to(base))
sha256, size = compute_file_hash(file_path)
return IntegrityRecord(
relative_path=rel_path,
sha256=sha256,
size_bytes=size,
last_verified=now,
)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(process_file, f): f for f in files
}
for future in as_completed(futures):
record = future.result()
manifest.records[record.relative_path] = record
manifest.total_bytes += record.size_bytes
return manifest
def verify_dataset(
dataset_dir: str,
manifest: DatasetManifest,
max_workers: int = 8,
) -> list[dict]:
"""
    Verify a training dataset against its integrity manifest.
Returns list of findings (modifications, additions, deletions).
"""
from datetime import datetime, timezone
base = Path(dataset_dir)
findings = []
now = datetime.now(timezone.utc).isoformat()
current_files = {
str(f.relative_to(base))
for f in base.rglob("*") if f.is_file()
}
manifest_files = set(manifest.records.keys())
# Check for new files (potential injection)
added = current_files - manifest_files
for path in added:
findings.append({
"severity": "HIGH",
"type": "ADDED",
"path": path,
"detail": "File not in original manifest — possible data injection.",
})
# Check for deleted files
deleted = manifest_files - current_files
for path in deleted:
findings.append({
"severity": "HIGH",
"type": "DELETED",
"path": path,
"detail": "File missing from dataset — possible targeted removal.",
})
# Check for modified files
common = current_files & manifest_files
def check_file(rel_path: str) -> Optional[dict]:
expected = manifest.records[rel_path]
file_path = base / rel_path
actual_hash, actual_size = compute_file_hash(file_path)
if actual_hash != expected.sha256:
return {
"severity": "CRITICAL",
"type": "MODIFIED",
"path": rel_path,
"detail": (
f"Hash mismatch. Expected {expected.sha256[:16]}..., "
f"got {actual_hash[:16]}... "
f"Size: {expected.size_bytes} -> {actual_size}. "
                    f"File has been modified — possible data poisoning."
),
}
return None
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(check_file, p): p for p in common
}
for future in as_completed(futures):
result = future.result()
if result:
findings.append(result)
return findings
if __name__ == "__main__":
import sys
if len(sys.argv) < 3:
print(f"Usage: {sys.argv[0]} <create|verify> <dataset_dir> [manifest_file]")
sys.exit(1)
action = sys.argv[1]
dataset_dir = sys.argv[2]
if action == "create":
name = os.path.basename(dataset_dir.rstrip("/"))
manifest = create_manifest(dataset_dir, name)
manifest_path = f"{name}_manifest.json"
with open(manifest_path, "w") as f:
f.write(manifest.to_json())
print(f"Created manifest: {manifest_path}")
print(f" Files: {manifest.total_files}")
print(f" Size: {manifest.total_bytes:,} bytes")
elif action == "verify":
manifest_file = sys.argv[3] if len(sys.argv) > 3 else None
if not manifest_file:
print("Manifest file required for verification")
sys.exit(1)
with open(manifest_file) as f:
manifest = DatasetManifest.from_json(f.read())
findings = verify_dataset(dataset_dir, manifest)
if not findings:
print("PASS: All files match manifest.")
else:
for finding in findings:
print(f"[{finding['severity']}] {finding['type']}: {finding['path']}")
            print(f" {finding['detail']}")
Practical Examples
S3 Bucket Enumeration for AI Assets
#!/usr/bin/env bash
# Enumerate S3 buckets for AI-related assets using common naming conventions
# This script attempts to identify publicly accessible training data and models
echo "=== S3 AI Asset Enumeration ==="
COMPANY="${1:?Usage: $0 <company_name>}"
# Common bucket naming patterns for AI/ML workloads
PATTERNS=(
    "${COMPANY}-training-data"
"${COMPANY}-ml-models"
"${COMPANY}-model-artifacts"
"${COMPANY}-datasets"
"${COMPANY}-sagemaker"
"${COMPANY}-mlflow"
"${COMPANY}-ai-pipeline"
"${COMPANY}-model-registry"
    "${COMPANY}-embeddings"
"${COMPANY}-checkpoints"
"${COMPANY}-feature-store"
)
for bucket in "${PATTERNS[@]}"; do
# Check if bucket exists and is accessible
HTTP_CODE=$(curl -s -o /dev/null -w "%{http_code}" \
"https://${bucket}.s3.amazonaws.com/" 2>/dev/null)
case $HTTP_CODE in
200)
echo "[CRITICAL] ${bucket} - PUBLICLY ACCESSIBLE (listing enabled)"
# List first few objects
curl -s "https://${bucket}.s3.amazonaws.com/" \
| python3 -c "
import sys, xml.etree.ElementTree as ET
try:
tree = ET.parse(sys.stdin)
ns = {'s3': 'http://s3.amazonaws.com/doc/2006-03-01/'}
keys = tree.findall('.//s3:Key', ns)[:5]
for k in keys:
print(f' {k.text}')
except: pass
" 2>/dev/null
;;
403)
echo "[EXISTS] ${bucket} - Exists but access denied"
;;
404)
echo "[ ] ${bucket} - Does not exist"
;;
*)
echo "[????] ${bucket} - HTTP $HTTP_CODE"
;;
esac
done
Data Lifecycle Security
Training data has a lifecycle that creates security considerations at each stage:
Collection and ingestion: Data enters the AI pipeline from various sources — web scraping, user uploads, third-party datasets, sensor data, and synthetic generation. Each ingestion path is a potential poisoning vector. Implement provenance tracking that records the source, collection timestamp, and any transformations applied to each data record. Use content-addressable storage (CAS) where each data record is addressed by its cryptographic hash, making unauthorized modifications detectable.
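The CAS idea can be sketched in a few lines: each record is keyed by the SHA-256 of its content, and a provenance entry (source, timestamp, transforms) travels with it, so any later modification changes the content address and is detectable by re-hashing. Field names and the source URI below are illustrative:

```python
"""Content-addressed provenance records for ingested training data."""
import hashlib
from datetime import datetime, timezone


def ingest(content: bytes, source: str, transforms: list[str]) -> dict:
    """Return a provenance entry keyed by the content's hash."""
    digest = hashlib.sha256(content).hexdigest()
    return {
        "content_address": digest,   # the record's identity IS its hash
        "source": source,
        "ingested_at": datetime.now(timezone.utc).isoformat(),
        "transforms": transforms,
    }


def verify(content: bytes, entry: dict) -> bool:
    """True if the content still matches its recorded address."""
    return hashlib.sha256(content).hexdigest() == entry["content_address"]
```

Store the provenance entries in a separate, write-protected location from the data itself; otherwise an attacker with write access can simply rewrite both.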
Preprocessing and augmentation: Data preprocessing scripts transform raw data into training-ready format through normalization, tokenization, augmentation, and feature extraction. These scripts run with read access to raw data and write access to processed data, making them high-value targets. A compromised preprocessing script can selectively poison data while appearing to function normally. Pin preprocessing dependencies, review transformation code, and compare output distributions against baselines.
Storage during training: Training data is read repeatedly across multiple epochs. If the storage system allows modifications during training (as opposed to immutable snapshots), an attacker can modify data between epochs. The model will learn from both the original and modified versions, making the poisoning effect unpredictable but generally in the attacker's favor. Use read-only snapshots or Object Lock during training runs.
Archival and retention: After training completes, training data is often retained for reproducibility, compliance, or future retraining. Archived data may have weaker access controls than active data, creating a window for retroactive poisoning that affects future retraining cycles. Apply the same integrity verification to archived data as to active data.
Deletion and cleanup: When training data must be deleted (for compliance with data subject requests under GDPR/CCPA, or because it contains sensitive information), verify that deletion is complete across all copies, caches, and backups. Incomplete deletion leaves residual data that could be accessed by an attacker or inadvertently used in future training.
Defenses and Mitigations
Storage access control:
- Apply least-privilege IAM policies: Training jobs need `s3:GetObject` on data buckets and `s3:PutObject` on checkpoint/output buckets. Never grant `s3:*`.
- Use VPC endpoints and bucket policies that restrict access to specific VPCs, preventing access from outside the training network.
- Enable S3 Block Public Access at the account level, not just bucket level.
- For HDFS, use Kerberos authentication and Ranger for fine-grained access policies.
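A least-privilege IAM policy for a training job role might look like the following sketch — the bucket names and VPC endpoint ID are placeholders, and the `aws:SourceVpce` condition is what pins reads to the training VPC:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "ReadTrainingData",
      "Effect": "Allow",
      "Action": ["s3:GetObject"],
      "Resource": "arn:aws:s3:::example-training-data/*",
      "Condition": {
        "StringEquals": {"aws:SourceVpce": "vpce-0123example"}
      }
    },
    {
      "Sid": "WriteCheckpoints",
      "Effect": "Allow",
      "Action": ["s3:PutObject"],
      "Resource": "arn:aws:s3:::example-checkpoints/*"
    }
  ]
}
```

Note the role that reads training data cannot write to it, and the role that writes checkpoints cannot delete or overwrite training data — separating the two breaks the poisoning path described earlier.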
Data integrity:
- Enable versioning on all AI storage buckets. Use Object Lock for immutable training datasets.
- Create and verify integrity manifests before training. Store manifests in a separate, write-protected location.
- Use S3 Object Lock in compliance mode to prevent deletion or modification of training data during its retention period.
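An Object Lock configuration in compliance mode is a small JSON document passed to the S3 `put-object-lock-configuration` API (the bucket must have been created with Object Lock enabled; the one-year retention period here is an assumed policy):

```json
{
  "ObjectLockEnabled": "Enabled",
  "Rule": {
    "DefaultRetention": {
      "Mode": "COMPLIANCE",
      "Days": 365
    }
  }
}
```

In COMPLIANCE mode no principal, including the root account, can delete or overwrite a locked object version until the retention period expires, which is what makes it suitable for immutable training snapshots.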
Encryption:
- Enable default encryption with customer-managed KMS keys. This provides audit trail through CloudTrail KMS events.
- For sensitive training data, use client-side encryption so data is encrypted before it reaches the storage service.
- Rotate encryption keys according to your compliance requirements.
Monitoring and detection:
- Enable S3 access logging and CloudTrail data events for all AI buckets.
- Alert on anomalous access patterns: bulk downloads, writes from unexpected principals, access from unusual IP ranges.
- Monitor for version deletions and lifecycle policy changes that could indicate an attacker covering tracks.
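The cover-tracks signals above can be matched against CloudTrail events once they are parsed from JSON. A minimal sketch; the watchlist uses real CloudTrail `eventName` values for S3, and the bucket name and ARN in the test are illustrative:

```python
"""Flag CloudTrail events consistent with an attacker covering tracks."""

COVER_TRACK_EVENTS = {
    "DeleteObject": "possible removal of evidence or data versions",
    "PutBucketLifecycle": "lifecycle rule change may auto-expire versions",
    "PutBucketLifecycleConfiguration": "lifecycle rule change may auto-expire versions",
    "DeleteBucketPolicy": "access policy removed",
    "PutBucketLogging": "access logging configuration changed",
}


def flag_events(events: list[dict], ai_buckets: set[str]) -> list[str]:
    """Return alert strings for watched events touching AI buckets."""
    alerts = []
    for ev in events:
        name = ev.get("eventName", "")
        bucket = ev.get("requestParameters", {}).get("bucketName", "")
        if name in COVER_TRACK_EVENTS and bucket in ai_buckets:
            actor = ev.get("userIdentity", {}).get("arn", "?")
            alerts.append(f"{bucket}: {name} by {actor} ({COVER_TRACK_EVENTS[name]})")
    return alerts
```

DeleteObject will be noisy on buckets with legitimate churn; in practice you would also key on the presence of a `versionId` in the request parameters, which indicates deletion of a specific version rather than a routine overwrite.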
References
- AWS. (2024). "Security Best Practices for Amazon S3." https://docs.aws.amazon.com/AmazonS3/latest/userguide/security-best-practices.html
- MITRE ATLAS. "Poisoning Training Data." https://atlas.mitre.org/techniques/AML.T0020
- NIST. (2023). "AI Risk Management Framework (AI RMF 1.0)." https://airc.nist.gov/AI_RMF_Interactivity/
- Google Cloud. (2024). "Best practices for Cloud Storage." https://cloud.google.com/storage/docs/best-practices