Opslagsystemen voor trainingsdata beveiligen
Aanvals- en verdedigingsstrategieën voor S3, GCS, HDFS en objectopslagsystemen die AI-trainingsdatasets en modelartefacten bevatten
Overzicht
De opslaglaag is de basis van elk AI-systeem. Trainingsdatasets, validatiesets, modelcheckpoints, definitieve modelartefacten, feature stores en experimentmetadata bevinden zich allemaal in opslagsystemen. Voor moderne AI-workloads zijn deze opslagsystemen doorgaans cloud-objectstores (Amazon S3, Google Cloud Storage, Azure Blob Storage), gedistribueerde bestandssystemen (HDFS, Lustre, GPFS voor on-premise HPC-clusters) of, steeds vaker, speciaal gebouwde AI-dataplatforms die over meerdere backends abstraheren.
Het compromitteren van de opslaglaag is een van de meest effectieve aanvalsvectoren tegen AI-systemen, omdat het datavergiftiging mogelijk maakt zonder enige code te wijzigen. Een aanvaller die naar de opslag van trainingsdata kan schrijven, kan vergiftigde samples injecteren die gerichte misclassificatie in het getrainde model veroorzaken. Een aanvaller die uit de opslag van modelartefacten kan lezen, kan propriëtaire modellen stelen die miljoenen aan trainingscompute waard zijn. Een aanvaller die modelartefacten kan wijzigen, kan backdoors invoegen die de deployment overleven.
De security-uitdaging wordt verergerd door de schaal van AI-dataoperaties. Trainingsdatasets kunnen terabytes tot petabytes groot zijn, waardoor integriteitsverificatie rekenkundig duur wordt. Datapijplijnen vereisen vaak brede leestoegang over veel opslaglocaties, wat te ruime IAM-beleidsregels creëert. Trainingsjobs hebben schrijftoegang nodig tot checkpoint- en outputlocaties, en dezelfde inloggegevens worden vaak hergebruikt voor het lezen van data en het schrijven van modellen. Multi-team-omgevingen delen opslag-namespaces met onvoldoende toegangsgrenzen.
Dit artikel bekijkt opslagbeveiliging door de lens van AI-specifieke aanvalsscenario's en behandelt cloud-objectstores, gedistribueerde bestandssystemen en het overkoepelende vraagstuk van data-integriteit voor machine learning.
Aanvalsoppervlak van cloud-objectopslag
Amazon S3 voor AI-workloads
S3 is de meest gebruikte opslag-backend voor AI-trainingsdata in cloud-omgevingen. SageMaker, custom training op EC2 en hybride cloud-opstellingen halen doorgaans allemaal trainingsdata op uit S3 en schrijven modelartefacten terug naar S3. Het S3-securitymodel omvat bucket policies, IAM-beleidsregels, ACL's (legacy) en encryptie-instellingen. Misconfiguraties op elk niveau kunnen trainingsdata blootstellen of modelmanipulatie mogelijk maken.
Veelvoorkomende S3-misconfiguraties in AI-implementaties:
- Te ruime bucket policies: Trainingsdata-buckets hebben vaak
s3:GetObjecttoegekend aan brede principals omdat meerdere diensten en teams toegang nodig hebben. - Ontbrekende server-side encryptie: Trainingsdata met PII of propriëtaire informatie die zonder encryptie at rest wordt opgeslagen.
- Publieke bucket-ACL's: Legacy ACL-instellingen die buckets onbedoeld publiek leesbaar maken.
- Cross-account-toegang zonder voorwaarden: S3-bucket policies die toegang vanuit partneraccounts toestaan zonder te beperken tot specifieke rollen of voorwaarden.
- Ontbrekende versioning: Zonder versioning vervangt een vergiftigd bestand het origineel zonder herstelpad.
"""
S3 security auditor for AI training data and model artifact buckets.
Checks for misconfigurations that could enable data poisoning,
model theft, or unauthorized access to training data.
"""
import json
import re
from typing import Any, Optional
try:
import boto3
from botocore.exceptions import ClientError
except ImportError:
print("boto3 required: pip install boto3")
raise
class S3AIStorageAuditor:
"""Audit S3-buckets die gebruikt worden voor AI-workloads."""
# Patronen die wijzen op AI-gerelateerde buckets
AI_BUCKET_PATTERNS = [
r"train", r"dataset", r"model", r"checkpoint",
r"artifact", r"feature", r"mlflow", r"sagemaker",
r"experiment", r"pipeline", r"embedding",
]
def __init__(self, session: Optional[Any] = None):
self.session = session or boto3.Session()
self.s3 = self.session.client("s3")
self.findings: list[dict] = []
def _add(self, severity: str, title: str, detail: str) -> None:
self.findings.append({
"severity": severity, "title": title, "detail": detail,
})
def is_ai_bucket(self, bucket_name: str) -> bool:
"""Identificeer AI-gerelateerde buckets heuristisch."""
name_lower = bucket_name.lower()
return any(
re.search(pattern, name_lower)
for pattern in self.AI_BUCKET_PATTERNS
)
def check_public_access(self, bucket: str) -> None:
"""Controleer of de bucket publieke toegang heeft ingeschakeld."""
try:
pab = self.s3.get_public_access_block(Bucket=bucket)
config = pab["PublicAccessBlockConfiguration"]
if not all([
config.get("BlockPublicAcls", False),
config.get("IgnorePublicAcls", False),
config.get("BlockPublicPolicy", False),
config.get("RestrictPublicBuckets", False),
]):
self._add(
"CRITICAL",
f"Public access not fully blocked: {bucket}",
f"PublicAccessBlock config: {json.dumps(config)}. "
f"Training data or model artifacts may be publicly "
f"accessible.",
)
except ClientError as e:
if e.response["Error"]["Code"] == "NoSuchPublicAccessBlockConfiguration":
self._add(
"CRITICAL",
f"No public access block: {bucket}",
"Bucket has no PublicAccessBlock configuration. "
"Public access is possible via bucket policies or ACLs.",
)
def check_encryption(self, bucket: str) -> None:
"""Controleer de configuratie van server-side encryptie."""
try:
enc = self.s3.get_bucket_encryption(Bucket=bucket)
rules = enc["ServerSideEncryptionConfiguration"]["Rules"]
for rule in rules:
sse = rule.get("ApplyServerSideEncryptionByDefault", {})
algo = sse.get("SSEAlgorithm", "none")
if algo == "AES256":
self._add(
"LOW",
f"SSE-S3 encryption (AES256): {bucket}",
"Using AWS-managed keys. Consider SSE-KMS with "
"customer-managed keys for AI data with compliance "
"requirements.",
)
elif algo == "aws:kms":
kms_key = sse.get("KMSMasterKeyID", "default")
if kms_key == "default" or "alias/aws/" in str(kms_key):
self._add(
"MEDIUM",
f"SSE-KMS with AWS-managed key: {bucket}",
"Using an AWS-managed KMS key. Customer-managed "
"keys provide better access control and audit.",
)
except ClientError as e:
if e.response["Error"]["Code"] == "ServerSideEncryptionConfigurationNotFoundError":
self._add(
"HIGH",
f"No default encryption: {bucket}",
"Bucket has no default encryption. Training data and "
"model artifacts may be stored unencrypted.",
)
def check_versioning(self, bucket: str) -> None:
"""Controleer of versioning is ingeschakeld (kritiek voor data-integriteit)."""
try:
ver = self.s3.get_bucket_versioning(Bucket=bucket)
status = ver.get("Status", "Disabled")
if status != "Enabled":
self._add(
"HIGH",
f"Versioning not enabled: {bucket}",
"Without versioning, poisoned training data or "
"tampered model artifacts cannot be recovered. "
"An attacker who overwrites data leaves no trace.",
)
mfa_delete = ver.get("MFADelete", "Disabled")
if mfa_delete != "Enabled" and status == "Enabled":
self._add(
"MEDIUM",
f"MFA Delete not enabled: {bucket}",
"Versioning is enabled but MFA Delete is not. An "
"attacker with DeleteObject permission can permanently "
"remove version history.",
)
except ClientError:
pass
def check_bucket_policy(self, bucket: str) -> None:
"""Analyseer de bucket policy op te ruime toegang."""
try:
policy_str = self.s3.get_bucket_policy(Bucket=bucket)["Policy"]
policy = json.loads(policy_str)
for statement in policy.get("Statement", []):
effect = statement.get("Effect", "")
principal = statement.get("Principal", "")
actions = statement.get("Action", [])
if isinstance(actions, str):
actions = [actions]
# Controleer op wildcard-principals
if principal == "*" or principal == {"AWS": "*"}:
if effect == "Allow":
self._add(
"CRITICAL",
f"Wildcard principal in Allow: {bucket}",
f"Actions: {actions}. Any AWS account can "
f"access this bucket. Training data and models "
f"are exposed to the internet.",
)
# Controleer op te brede schrijftoegang
write_actions = [
a for a in actions
if any(w in a for w in [
"PutObject", "DeleteObject", "s3:*",
])
]
if write_actions and effect == "Allow":
condition = statement.get("Condition", {})
if not condition:
self._add(
"HIGH",
f"Unconditional write access: {bucket}",
f"Write actions {write_actions} granted without "
f"conditions. Consider adding VPC endpoint, "
f"source IP, or MFA conditions.",
)
except ClientError as e:
if e.response["Error"]["Code"] == "NoSuchBucketPolicy":
pass # Geen policy is prima (vertrouwt op IAM)
def check_access_logging(self, bucket: str) -> None:
"""Verifieer dat access logging is ingeschakeld voor de audit trail."""
try:
logging = self.s3.get_bucket_logging(Bucket=bucket)
if "LoggingEnabled" not in logging:
self._add(
"MEDIUM",
f"Access logging disabled: {bucket}",
"No S3 access logging. Cannot detect unauthorized "
"reads of training data or writes to model artifacts.",
)
except ClientError:
pass
def audit_bucket(self, bucket: str) -> None:
"""Voer alle controles uit op één enkele bucket."""
self.check_public_access(bucket)
self.check_encryption(bucket)
self.check_versioning(bucket)
self.check_bucket_policy(bucket)
self.check_access_logging(bucket)
def audit_all_ai_buckets(self) -> list[dict]:
"""Ontdek en audit alle AI-gerelateerde S3-buckets."""
self.findings = []
try:
buckets = self.s3.list_buckets()["Buckets"]
except ClientError as e:
self._add("ERROR", "Cannot list buckets", str(e))
return self.findings
ai_buckets = [
b["Name"] for b in buckets if self.is_ai_bucket(b["Name"])
]
if not ai_buckets:
self._add(
"INFO",
"No AI-related buckets found",
"No buckets matched AI naming patterns. "
"Specify buckets manually.",
)
return self.findings
for bucket in ai_buckets:
self.audit_bucket(bucket)
return self.findings
if __name__ == "__main__":
import sys
auditor = S3AIStorageAuditor()
if len(sys.argv) > 1:
for bucket in sys.argv[1:]:
auditor.audit_bucket(bucket)
else:
auditor.audit_all_ai_buckets()
for f in auditor.findings:
print(f"[{f['severity']}] {f['title']}")
print(f" {f['detail']}\n")Google Cloud Storage-security
GCS gebruikt IAM voor toegangsbeheer met een model dat enigszins verschilt van S3. Belangrijke verschillen die relevant zijn voor AI-opslagbeveiliging:
- Uniform bucket-level access: GCS stimuleert uniforme toegang (alleen IAM, geen ACL's), wat het auditen van beleid vereenvoudigt. Legacy-buckets kunnen echter nog steeds fijnmazige ACL's gebruiken.
- Signed URLs: Zowel S3 als GCS ondersteunen signed URLs voor tijdelijke toegang, maar deze worden vaak gegenereerd met excessieve duur voor trainingsjobs die dagenlang draaien.
- Object lifecycle: GCS-lifecyclebeleidsregels kunnen objecten automatisch verwijderen of archiveren, wat uitgebuit zou kunnen worden om trainingsdataversies te verwijderen die vergiftiging zouden onthullen.
HDFS en on-premise-opslag
Het Hadoop Distributed Filesystem (HDFS) komt nog steeds vaak voor in on-premise-AI-clusters, met name bij organisaties met bestaande big-data-infrastructuur. HDFS-security is gebaseerd op Kerberos-authenticatie en POSIX-achtige permissies.
HDFS-specifieke aanvalsvectoren voor AI-workloads:
- Compromittering van de NameNode: De NameNode onderhoudt de bestandssysteemmetadata. Het compromitteren ervan biedt volledige kennis van waar alle trainingsdatablokken worden opgeslagen en kan leesoperaties omleiden naar door de aanvaller gecontroleerde DataNodes.
- Directe DataNode-toegang: In verkeerd geconfigureerde clusters kunnen DataNodes direct toegankelijk zijn op hun HTTP- of datatransferpoorten, waarbij de toegangscontroles van de NameNode worden omzeild.
- Gaten in transparante encryption zones: HDFS ondersteunt encryption zones, maar deze moeten expliciet per directory worden geconfigureerd. Trainingsdata in niet-versleutelde zones wordt in platte tekst op de DataNode-schijven opgeslagen.
Kwetsbaarheden van gedeelde bestandssystemen (NFS, Lustre, GPFS)
On-premise GPU-clusters gebruiken vaak high-performance parallelle bestandssystemen voor toegang tot trainingsdata. Deze bestandssystemen hebben securitykenmerken die aanzienlijk verschillen van cloud-objectstores:
NFS (Network File System) wordt nog steeds op grote schaal gebruikt voor het delen van trainingsdata over GPU-nodes. NFS v3 vertrouwt op client-side UID/GID voor autorisatie, wat betekent dat elke client die de export kan mounten, bestanden als elke gebruiker kan benaderen. Zelfs NFS v4 met Kerberos wordt vaak uitgerold in AUTH_SYS-modus voor prestaties, wat geen echte authenticatie biedt. Een aanvaller die toegang krijgt tot elke node die de NFS-export mount, kan alle trainingsdata lezen of wijzigen.
Lustre is het dominante parallelle bestandssysteem voor grootschalige AI-training (gebruikt in de meeste Top500-supercomputers). Het securitymodel van Lustre is minimaal — het vertrouwt op de onderliggende netwerkfabric voor isolatie. In typische implementaties kan elke node op het InfiniBand-netwerk elk Lustre-bestandssysteem benaderen. Er is geen per-gebruiker-authenticatie op bestandssysteemniveau; POSIX-permissies worden afgedwongen door de client, niet door de server. Dit betekent dat een gecompromitteerde compute-node toegang heeft tot alle bestanden op het Lustre-bestandssysteem, ongeacht hun eigendom en permissies.
GPFS (IBM Spectrum Scale) biedt sterkere security via encryptie op protocolniveau en access control lists, maar deze functies moeten expliciet worden ingeschakeld en geconfigureerd. Veel GPFS-implementaties in AI-clusters gebruiken standaardinstellingen die prestaties boven security stellen.
Voor alle gedeelde bestandssystemen is het belangrijkste red team-inzicht: het compromitteren van een enkele compute-node die het gedeelde bestandssysteem mount, biedt doorgaans toegang tot alle trainingsdata, modelcheckpoints en jobscripts die op dat bestandssysteem zijn opgeslagen. Dit maakt het gedeelde bestandssysteem een waardevol doelwit voor lateral movement in GPU-clusters.
Datavergiftiging via compromittering van de opslag
Stille datawijziging
De meest impactvolle aanval via compromittering van de opslag is datavergiftiging — het wijzigen van trainingsdata om het modelgedrag te beïnvloeden zonder voor de hand liggende fouten te veroorzaken. Dit is bijzonder effectief op de opslaglaag omdat:
- Geen codewijzigingen nodig: De code van de trainingspijplijn is legitiem; alleen de data die het leest, is gewijzigd.
- Schaal: Een aanvaller met schrijftoegang tot de opslag kan miljoenen trainingssamples in één enkele operatie wijzigen.
- Persistentie: Zonder versioning en integriteitscontroles wordt de vergiftigde data de grondwaarheid.
Gerichte vergiftigingsstrategieën
Een aanvaller met schrijftoegang tot de opslag van trainingsdata kan verschillende vergiftigingsstrategieën implementeren, afhankelijk van zijn doelen:
Label flipping: Verander de labels van een kleine subset van trainingsvoorbeelden. In een contentmoderatiemodel zou je bijvoorbeeld een fractie van toxische content kunnen herlabelen als veilig. Het model leert om vergelijkbare content in productie verkeerd te classificeren. Dit vereist alleen het wijzigen van metadata (labels), niet de invoerdata zelf, wat het moeilijker te detecteren maakt.
Backdoor-injectie: Voeg trainingsvoorbeelden toe die een specifiek triggerpatroon bevatten (een bepaalde frase, pixelpatroon of data-artefact) met een doellabel. Het model leert de trigger te associëren met het doellabel. Op het moment van inferentie neemt de aanvaller de trigger op in zijn invoer om de uitvoer van het model te besturen. Het model gedraagt zich normaal voor alle invoer zonder de trigger, wat de backdoor uiterst moeilijk te detecteren maakt.
Verschuiving van de dataverdeling: Wijzig de verdeling van de trainingsdata geleidelijk in de loop van de tijd, zodat elke afzonderlijke wijziging klein genoeg is om anomaliedetectie te passeren. Over vele trainingscycli veroorzaakt de cumulatieve verschuiving dat het model blinde vlekken of biases ontwikkelt die de aanvaller kan uitbuiten.
Clean-label-aanvallen: Wijzig trainingsvoorbeelden op manieren die onmerkbaar zijn voor mensen, maar die het model onjuiste beslissingsgrenzen laten leren. Voor beeldmodellen houdt dit in dat er kleine perturbaties worden toegevoegd aan correct gelabelde afbeeldingen. Voor tekstmodellen houdt dit in dat er wordt herformuleerd of ogenschijnlijk onschuldige context wordt toegevoegd die de geleerde representaties verschuift.
"""
Training data integrity verification framework.
Implements content-addressable storage checks for detecting
unauthorized modifications to training datasets.
"""
import hashlib
import json
import os
from pathlib import Path
from dataclasses import dataclass, field
from typing import Iterator, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
@dataclass
class IntegrityRecord:
"""Hash-record voor één bestand in een trainingsdataset."""
relative_path: str
sha256: str
size_bytes: int
last_verified: str
@dataclass
class DatasetManifest:
"""Integriteitsmanifest voor een volledige trainingsdataset."""
dataset_name: str
created_at: str
total_files: int
total_bytes: int
records: dict[str, IntegrityRecord] = field(default_factory=dict)
def to_json(self) -> str:
return json.dumps({
"dataset_name": self.dataset_name,
"created_at": self.created_at,
"total_files": self.total_files,
"total_bytes": self.total_bytes,
"records": {
k: {
"sha256": v.sha256,
"size_bytes": v.size_bytes,
"last_verified": v.last_verified,
}
for k, v in self.records.items()
},
}, indent=2)
@classmethod
def from_json(cls, data: str) -> "DatasetManifest":
parsed = json.loads(data)
manifest = cls(
dataset_name=parsed["dataset_name"],
created_at=parsed["created_at"],
total_files=parsed["total_files"],
total_bytes=parsed["total_bytes"],
)
for path, record in parsed.get("records", {}).items():
manifest.records[path] = IntegrityRecord(
relative_path=path,
sha256=record["sha256"],
size_bytes=record["size_bytes"],
last_verified=record["last_verified"],
)
return manifest
def compute_file_hash(file_path: Path, chunk_size: int = 8192) -> tuple[str, int]:
"""Bereken de SHA-256-hash en grootte van een bestand."""
h = hashlib.sha256()
size = 0
with open(file_path, "rb") as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
break
h.update(chunk)
size += len(chunk)
return h.hexdigest(), size
def create_manifest(
dataset_dir: str,
dataset_name: str,
max_workers: int = 8,
) -> DatasetManifest:
"""
Create an integrity manifest for a training dataset directory.
This should be done immediately after data preparation and
stored in a separate, write-protected location.
"""
from datetime import datetime, timezone
base = Path(dataset_dir)
now = datetime.now(timezone.utc).isoformat()
files = list(base.rglob("*"))
files = [f for f in files if f.is_file()]
manifest = DatasetManifest(
dataset_name=dataset_name,
created_at=now,
total_files=len(files),
total_bytes=0,
)
def process_file(file_path: Path) -> IntegrityRecord:
rel_path = str(file_path.relative_to(base))
sha256, size = compute_file_hash(file_path)
return IntegrityRecord(
relative_path=rel_path,
sha256=sha256,
size_bytes=size,
last_verified=now,
)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(process_file, f): f for f in files
}
for future in as_completed(futures):
record = future.result()
manifest.records[record.relative_path] = record
manifest.total_bytes += record.size_bytes
return manifest
def verify_dataset(
dataset_dir: str,
manifest: DatasetManifest,
max_workers: int = 8,
) -> list[dict]:
"""
Verify a training dataset against its integrity manifest.
Returns list of findings (modifications, additions, deletions).
"""
from datetime import datetime, timezone
base = Path(dataset_dir)
findings = []
now = datetime.now(timezone.utc).isoformat()
current_files = {
str(f.relative_to(base))
for f in base.rglob("*") if f.is_file()
}
manifest_files = set(manifest.records.keys())
# Controleer op nieuwe bestanden (potentiële injectie)
added = current_files - manifest_files
for path in added:
findings.append({
"severity": "HIGH",
"type": "ADDED",
"path": path,
"detail": "File not in original manifest — possible data injection.",
})
# Controleer op verwijderde bestanden
deleted = manifest_files - current_files
for path in deleted:
findings.append({
"severity": "HIGH",
"type": "DELETED",
"path": path,
"detail": "File missing from dataset — possible targeted removal.",
})
# Controleer op gewijzigde bestanden
common = current_files & manifest_files
def check_file(rel_path: str) -> Optional[dict]:
expected = manifest.records[rel_path]
file_path = base / rel_path
actual_hash, actual_size = compute_file_hash(file_path)
if actual_hash != expected.sha256:
return {
"severity": "CRITICAL",
"type": "MODIFIED",
"path": rel_path,
"detail": (
f"Hash mismatch. Expected {expected.sha256[:16]}..., "
f"got {actual_hash[:16]}... "
f"Size: {expected.size_bytes} -> {actual_size}. "
f"File has been modified — possible data poisoning."
),
}
return None
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(check_file, p): p for p in common
}
for future in as_completed(futures):
result = future.result()
if result:
findings.append(result)
return findings
if __name__ == "__main__":
import sys
if len(sys.argv) < 3:
print(f"Usage: {sys.argv[0]} <create|verify> <dataset_dir> [manifest_file]")
sys.exit(1)
action = sys.argv[1]
dataset_dir = sys.argv[2]
if action == "create":
name = os.path.basename(dataset_dir.rstrip("/"))
manifest = create_manifest(dataset_dir, name)
manifest_path = f"{name}_manifest.json"
with open(manifest_path, "w") as f:
f.write(manifest.to_json())
print(f"Created manifest: {manifest_path}")
print(f" Files: {manifest.total_files}")
print(f" Size: {manifest.total_bytes:,} bytes")
elif action == "verify":
manifest_file = sys.argv[3] if len(sys.argv) > 3 else None
if not manifest_file:
print("Manifest file required for verification")
sys.exit(1)
with open(manifest_file) as f:
manifest = DatasetManifest.from_json(f.read())
findings = verify_dataset(dataset_dir, manifest)
if not findings:
print("PASS: All files match manifest.")
else:
for finding in findings:
print(f"[{finding['severity']}] {finding['type']}: {finding['path']}")
print(f" {finding['detail']}")Praktische voorbeelden
S3-bucket-enumeratie voor AI-assets
#!/usr/bin/env bash
# Enumerate S3 buckets for AI-related assets using common naming conventions
# This script attempts to identify publicly accessible training data and models
echo "=== S3 AI Asset Enumeration ==="
COMPANY="${1:?Usage: $0 <company_name>}"
# Veelvoorkomende bucket-naamgevingspatronen voor AI/ML-workloads
PATTERNS=(
"${COMPANY}-training-data"
"${COMPANY}-ml-models"
"${COMPANY}-model-artifacts"
"${COMPANY}-datasets"
"${COMPANY}-sagemaker"
"${COMPANY}-mlflow"
"${COMPANY}-ai-pipeline"
"${COMPANY}-model-registry"
"${COMPANY}-embeddings"
"${COMPANY}-checkpoints"
"${COMPANY}-feature-store"
)
for bucket in "${PATTERNS[@]}"; do
# Controleer of de bucket bestaat en toegankelijk is
HTTP_CODE=$(curl -s -o /dev/null -w "%{http_code}" \
"https://${bucket}.s3.amazonaws.com/" 2>/dev/null)
case $HTTP_CODE in
200)
echo "[CRITICAL] ${bucket} - PUBLICLY ACCESSIBLE (listing enabled)"
# Toon de eerste paar objecten
curl -s "https://${bucket}.s3.amazonaws.com/" \
| python3 -c "
import sys, xml.etree.ElementTree as ET
try:
tree = ET.parse(sys.stdin)
ns = {'s3': 'http://s3.amazonaws.com/doc/2006-03-01/'}
keys = tree.findall('.//s3:Key', ns)[:5]
for k in keys:
print(f' {k.text}')
except: pass
" 2>/dev/null
;;
403)
echo "[EXISTS] ${bucket} - Exists but access denied"
;;
404)
echo "[ ] ${bucket} - Does not exist"
;;
*)
echo "[????] ${bucket} - HTTP $HTTP_CODE"
;;
esac
doneSecurity van de datalevenscyclus
Trainingsdata heeft een levenscyclus die op elke fase securityoverwegingen creëert:
Verzameling en ingestie: Data komt de AI-pijplijn binnen vanuit verschillende bronnen — webscraping, gebruikersuploads, datasets van derden, sensordata en synthetische generatie. Elk ingestiepad is een potentiële vergiftigingsvector. Implementeer herkomsttracking die de bron, het tijdstip van verzameling en eventuele toegepaste transformaties van elke datarecord vastlegt. Gebruik content-addressable storage (CAS), waarbij elke datarecord wordt geadresseerd door zijn cryptografische hash, waardoor ongeautoriseerde wijzigingen detecteerbaar worden.
Voorbewerking en augmentatie: Datavoorbewerkingsscripts transformeren ruwe data naar trainingsklaar formaat via normalisatie, tokenisatie, augmentatie en feature-extractie. Deze scripts draaien met leestoegang tot ruwe data en schrijftoegang tot verwerkte data, wat ze waardevolle doelwitten maakt. Een gecompromitteerd voorbewerkingsscript kan data selectief vergiftigen terwijl het normaal lijkt te functioneren. Pin voorbewerkingsafhankelijkheden, beoordeel transformatiecode en vergelijk outputverdelingen met baselines.
Opslag tijdens training: Trainingsdata wordt herhaaldelijk gelezen over meerdere epochs. Als het opslagsysteem wijzigingen tijdens de training toestaat (in tegenstelling tot immutable snapshots), kan een aanvaller data tussen epochs wijzigen. Het model zal leren van zowel de originele als de gewijzigde versies, waardoor het vergiftigingseffect onvoorspelbaar wordt, maar over het algemeen in het voordeel van de aanvaller. Gebruik read-only snapshots of Object Lock tijdens trainingsruns.
Archivering en bewaring: Nadat de training is voltooid, wordt trainingsdata vaak bewaard voor reproduceerbaarheid, compliance of toekomstige hertraining. Gearchiveerde data kan zwakkere toegangscontroles hebben dan actieve data, wat een venster creëert voor retroactieve vergiftiging die toekomstige hertrainingscycli beïnvloedt. Pas dezelfde integriteitsverificatie toe op gearchiveerde data als op actieve data.
Verwijdering en opschoning: Wanneer trainingsdata moet worden verwijderd (voor naleving van verzoeken van betrokkenen onder de AVG/CCPA, of omdat het gevoelige informatie bevat), verifieer dan dat de verwijdering volledig is over alle kopieën, caches en back-ups. Onvolledige verwijdering laat restdata achter die door een aanvaller benaderd of onbedoeld in toekomstige training gebruikt zou kunnen worden.
Verdediging en mitigatie
Toegangscontrole voor opslag:
- Pas least-privilege-IAM-beleidsregels toe: trainingsjobs hebben
s3:GetObjectnodig op databuckets ens3:PutObjectop checkpoint-/outputbuckets. Verleen nooits3:*. - Gebruik VPC-endpoints en bucket policies die de toegang beperken tot specifieke VPC's, waardoor toegang van buiten het trainingsnetwerk wordt voorkomen.
- Schakel S3 Block Public Access in op accountniveau, niet alleen op bucketniveau.
- Gebruik voor HDFS Kerberos-authenticatie en Ranger voor fijnmazig toegangsbeleid.
Data-integriteit:
- Schakel versioning in op alle AI-opslagbuckets. Gebruik Object Lock voor immutable trainingsdatasets.
- Maak en verifieer integriteitsmanifesten vóór de training. Sla manifesten op in een aparte, schrijfbeveiligde locatie.
- Gebruik S3 Object Lock in compliance-modus om verwijdering of wijziging van trainingsdata tijdens de bewaarperiode te voorkomen.
Encryptie:
- Schakel standaardencryptie in met customer-managed KMS-sleutels. Dit biedt een audit trail via CloudTrail-KMS-events.
- Gebruik voor gevoelige trainingsdata client-side encryptie, zodat data wordt versleuteld voordat deze de opslagdienst bereikt.
- Roteer encryptiesleutels volgens je compliance-vereisten.
Monitoring en detectie:
- Schakel S3 access logging en CloudTrail-data-events in voor alle AI-buckets.
- Alarmeer op afwijkende toegangspatronen: bulkdownloads, schrijfoperaties van onverwachte principals, toegang vanuit ongebruikelijke IP-ranges.
- Monitor op versieverwijderingen en wijzigingen in het lifecyclebeleid die erop kunnen wijzen dat een aanvaller zijn sporen uitwist.
References
- AWS. (2024). "Security Best Practices for Amazon S3." https://docs.aws.amazon.com/AmazonS3/latest/userguide/security-best-practices.html
- MITRE ATLAS. "Poisoning Training Data." https://atlas.mitre.org/techniques/AML.T0020
- NIST. (2023). "AI Risk Management Framework (AI RMF 1.0)." https://airc.nist.gov/AI_RMF_Interactivity/
- Google Cloud. (2024). "Best practices for Cloud Storage." https://cloud.google.com/storage/docs/best-practices