Model Weight Encryption
Encryption at rest and in transit for ML model weights, protecting intellectual property and preventing unauthorized model access.
Overview
Model weights represent a significant investment: training a frontier model costs millions of dollars in compute, and even fine-tuned models encode proprietary knowledge that provides a competitive advantage. Protecting model weights against unauthorized access is an intellectual property concern, a regulatory concern (if the model was trained on regulated data), and a security concern (a stolen model can be studied to find vulnerabilities more easily than through a black-box API).
Encryption addresses two main threats: unauthorized access to model files at rest (on disk, in object storage, in model registries) and interception of model weights in transit (during deployment, distribution to edge devices, or transfer between environments). Without encryption, anyone with access to the storage backend or the network path can read the model weights directly.
This article covers practical encryption schemes for model weights, key management approaches, and the integration points where encryption fits into ML deployment pipelines. The content aligns with NIST AI RMF Govern 1.7 (protection of AI assets) and addresses the model theft concerns in MITRE ATLAS AML.T0024 (Exfiltration via ML Inference API) and AML.T0000 (ML Model Access).
Threat model
Who wants model weights?
| Threat actor | Motivation | Access vector |
|---|---|---|
| Competitors | Replicate capabilities without the training cost | Insider threat, storage breach, supply chain |
| Nation-state actors | Strategic AI capabilities | Advanced persistent threats, cloud provider compromise |
| Researchers | Study model internals for vulnerabilities | Download from misconfigured storage |
| Malicious insiders | Sell IP, personal use | Direct access to storage and deployment systems |
| Cloud providers | Can in theory access customer data | Infrastructure access (mitigated by confidential computing) |
What encryption protects against
Encryption protects model weights against unauthorized access at the storage and network layers. It does NOT protect against:
- Attacks against the running model (the model must be decrypted in GPU memory to serve inference)
- Key management failures (if the attacker obtains the encryption key, encryption is meaningless)
- Side-channel attacks that extract model information from inference behavior
- Model extraction through the API (querying the model to replicate its behavior)
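To make the storage-layer guarantee concrete, here is a minimal sketch using the AESGCM primitive from the `cryptography` package, the same primitive this article relies on elsewhere. The byte string standing in for model weights and the helper name `try_decrypt` are illustrative assumptions. The sketch suggests that the ciphertext is recoverable only with the right key, and that a single flipped bit is rejected rather than silently decrypted.

```python
import os

from cryptography.exceptions import InvalidTag
from cryptography.hazmat.primitives.ciphers.aead import AESGCM


def try_decrypt(key: bytes, nonce: bytes, ciphertext: bytes) -> str:
    """Return 'ok' if decryption and authentication succeed, else 'rejected'."""
    try:
        AESGCM(key).decrypt(nonce, ciphertext, None)
        return "ok"
    except InvalidTag:
        return "rejected"


weights = b"\x00\x01\x02\x03" * 256  # stand-in for real model weights
key = AESGCM.generate_key(bit_length=256)
nonce = os.urandom(12)
ciphertext = AESGCM(key).encrypt(nonce, weights, None)

# Flip one bit in the first byte to simulate tampering in storage.
tampered = bytes([ciphertext[0] ^ 0x01]) + ciphertext[1:]
wrong_key = AESGCM.generate_key(bit_length=256)

results = {
    "right_key": try_decrypt(key, nonce, ciphertext),
    "wrong_key": try_decrypt(wrong_key, nonce, ciphertext),
    "tampered": try_decrypt(key, nonce, tampered),
}
print(results)
```

None of this helps once the key itself leaks or the weights are decrypted in memory, which is why the limitations listed above still apply.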
Encryption at rest
File-level encryption
The simplest approach encrypts model files before they are stored:
import os
import hashlib
import json
from pathlib import Path
from typing import Dict, Optional

from cryptography.hazmat.primitives.ciphers.aead import AESGCM


class ModelEncryptor:
    """Encrypt and decrypt model weight files using AES-256-GCM."""

    HEADER_MAGIC = b"ENCMODEL"
    HEADER_VERSION = 1
    NONCE_SIZE = 12  # 96 bits for AES-GCM
    KEY_SIZE = 32  # 256 bits

    def __init__(self, key: Optional[bytes] = None):
        """
        Args:
            key: 32-byte AES-256 key. If not provided, a new key is generated.
        """
        if key is not None and len(key) != self.KEY_SIZE:
            raise ValueError("key must be 32 bytes for AES-256")
        self.key = key if key is not None else AESGCM.generate_key(bit_length=256)
        self._aesgcm = AESGCM(self.key)

    def encrypt_file(
        self,
        input_path: str,
        output_path: str,
        associated_data: Optional[bytes] = None,
    ) -> Dict:
        """
        Encrypt a model file with AES-256-GCM.

        Args:
            input_path: Path to the plaintext model file
            output_path: Path for the encrypted output
            associated_data: Optional AAD (e.g., model name, version) bound to ciphertext
        """
        input_path = Path(input_path)
        output_path = Path(output_path)

        # Read the whole plaintext into memory. AESGCM.encrypt rejects inputs
        # larger than 2**31 - 1 bytes, so bigger files need chunked encryption.
        plaintext = input_path.read_bytes()
        plaintext_hash = hashlib.sha256(plaintext).hexdigest()

        # Generate a fresh random nonce; it must never repeat under the same key
        nonce = os.urandom(self.NONCE_SIZE)

        # Encrypt (the 16-byte authentication tag is appended to the ciphertext)
        ciphertext = self._aesgcm.encrypt(nonce, plaintext, associated_data)

        # Write encrypted file: magic | metadata length | metadata | nonce | ciphertext
        metadata = {
            "version": self.HEADER_VERSION,
            "algorithm": "AES-256-GCM",
            "plaintext_size": len(plaintext),
            "plaintext_hash": plaintext_hash,
            "has_aad": associated_data is not None,
        }
        metadata_bytes = json.dumps(metadata).encode()
        with open(output_path, "wb") as f:
            f.write(self.HEADER_MAGIC)
            f.write(len(metadata_bytes).to_bytes(4, "big"))
            f.write(metadata_bytes)
            f.write(nonce)
            f.write(ciphertext)

        return {
            "input_path": str(input_path),
            "output_path": str(output_path),
            "plaintext_size": len(plaintext),
            "ciphertext_size": len(ciphertext),
            "overhead_bytes": len(ciphertext) - len(plaintext),
            "plaintext_hash": plaintext_hash,
        }

    def decrypt_file(
        self,
        input_path: str,
        output_path: str,
        associated_data: Optional[bytes] = None,
    ) -> Dict:
        """Decrypt an encrypted model file."""
        with open(input_path, "rb") as f:
            # Read and validate header
            magic = f.read(len(self.HEADER_MAGIC))
            if magic != self.HEADER_MAGIC:
                raise ValueError("Not an encrypted model file")
            metadata_size = int.from_bytes(f.read(4), "big")
            metadata = json.loads(f.read(metadata_size))
            nonce = f.read(self.NONCE_SIZE)
            ciphertext = f.read()

        # Decrypt. Raises cryptography.exceptions.InvalidTag if the key, nonce,
        # AAD, or ciphertext is wrong or has been modified.
        plaintext = self._aesgcm.decrypt(nonce, ciphertext, associated_data)

        # Cross-check against the header hash. The GCM tag already authenticates
        # the ciphertext; the JSON header itself is not authenticated.
        actual_hash = hashlib.sha256(plaintext).hexdigest()
        if actual_hash != metadata.get("plaintext_hash"):
            raise ValueError("Integrity check failed: file may be corrupted or tampered with")

        Path(output_path).write_bytes(plaintext)
        return {
            "output_path": str(output_path),
            "size": len(plaintext),
            "integrity_verified": True,
            "hash": actual_hash,
        }

Envelope encryption
For production deployments, use envelope encryption. The model is encrypted with a data encryption key (DEK), and the DEK is encrypted with a key encryption key (KEK) stored in a cloud KMS:
import os
import json
import base64
import hashlib
from typing import Dict, Optional
from pathlib import Path

from cryptography.hazmat.primitives.ciphers.aead import AESGCM


class EnvelopeEncryptor:
    """
    Envelope encryption for model files using cloud KMS.
    The model is encrypted with a random DEK, and the DEK is encrypted with a KEK in KMS.
    """

    def __init__(self, kms_client, kms_key_id: str):
        """
        Args:
            kms_client: KMS client. The calls below follow the boto3 AWS KMS
                interface; GCP KMS or Azure Key Vault would need an adapter.
            kms_key_id: KMS key identifier for the KEK
        """
        self.kms_client = kms_client
        self.kms_key_id = kms_key_id

    def encrypt_model(
        self,
        model_path: str,
        output_path: str,
        model_metadata: Optional[Dict] = None,
    ) -> Dict:
        """Encrypt a model file using envelope encryption."""
        # Generate a random DEK
        dek = AESGCM.generate_key(bit_length=256)
        nonce = os.urandom(12)

        # Encrypt the DEK with the KEK via KMS.
        # AWS KMS requires EncryptionContext keys and values to be strings.
        encrypted_dek = self.kms_client.encrypt(
            KeyId=self.kms_key_id,
            Plaintext=dek,
            EncryptionContext=model_metadata or {},
        )["CiphertextBlob"]

        # Encrypt the model with the DEK; sort_keys keeps the AAD deterministic
        plaintext = Path(model_path).read_bytes()
        aesgcm = AESGCM(dek)
        aad = json.dumps(model_metadata or {}, sort_keys=True).encode()
        ciphertext = aesgcm.encrypt(nonce, plaintext, aad)

        # Store encrypted model with envelope
        envelope = {
            "version": 1,
            "kms_key_id": self.kms_key_id,
            "encrypted_dek": base64.b64encode(encrypted_dek).decode(),
            "nonce": base64.b64encode(nonce).decode(),
            "metadata": model_metadata or {},
            "plaintext_hash": hashlib.sha256(plaintext).hexdigest(),
        }
        envelope_bytes = json.dumps(envelope).encode()
        with open(output_path, "wb") as f:
            f.write(b"ENVMODEL")
            f.write(len(envelope_bytes).to_bytes(4, "big"))
            f.write(envelope_bytes)
            f.write(ciphertext)

        # Drop references to the DEK. Python bytes are immutable, so the key
        # cannot be reliably zeroed in memory from here.
        del dek, aesgcm

        return {
            "output_path": output_path,
            "plaintext_size": len(plaintext),
            "ciphertext_size": len(ciphertext),
            "kms_key_id": self.kms_key_id,
        }

    def decrypt_model(self, encrypted_path: str, output_path: str) -> Dict:
        """Decrypt an envelope-encrypted model file."""
        with open(encrypted_path, "rb") as f:
            magic = f.read(8)
            if magic != b"ENVMODEL":
                raise ValueError("Not an envelope-encrypted model file")
            envelope_size = int.from_bytes(f.read(4), "big")
            envelope = json.loads(f.read(envelope_size))
            ciphertext = f.read()

        # Decrypt the DEK using KMS
        encrypted_dek = base64.b64decode(envelope["encrypted_dek"])
        dek = self.kms_client.decrypt(
            CiphertextBlob=encrypted_dek,
            EncryptionContext=envelope.get("metadata", {}),
        )["Plaintext"]

        # Decrypt the model with the DEK, using the same AAD encoding as encrypt_model
        nonce = base64.b64decode(envelope["nonce"])
        aad = json.dumps(envelope.get("metadata", {}), sort_keys=True).encode()
        aesgcm = AESGCM(dek)
        plaintext = aesgcm.decrypt(nonce, ciphertext, aad)

        # Verify integrity against the hash recorded in the envelope
        actual_hash = hashlib.sha256(plaintext).hexdigest()
        if actual_hash != envelope.get("plaintext_hash"):
            raise ValueError("Integrity check failed")

        Path(output_path).write_bytes(plaintext)
        del dek, aesgcm  # drop references; bytes cannot be zeroed in place

        return {
            "output_path": output_path,
            "size": len(plaintext),
            "integrity_verified": True,
        }

Storage backend encryption
Most cloud storage services offer server-side encryption. While this protects against physical disk theft, it does not protect against compromised storage credentials, because the service decrypts objects transparently for any authorized request:
import json
from typing import Dict

import boto3
from botocore.exceptions import ClientError


class S3ModelStorageSecurity:
    """Configure and audit S3 encryption for model storage."""

    def __init__(self, bucket_name: str):
        self.bucket = bucket_name
        self.s3 = boto3.client("s3")

    def configure_default_encryption(self, kms_key_id: str) -> Dict:
        """Enable default SSE-KMS encryption on the model bucket."""
        self.s3.put_bucket_encryption(
            Bucket=self.bucket,
            ServerSideEncryptionConfiguration={
                "Rules": [
                    {
                        "ApplyServerSideEncryptionByDefault": {
                            "SSEAlgorithm": "aws:kms",
                            "KMSMasterKeyID": kms_key_id,
                        },
                        "BucketKeyEnabled": True,
                    }
                ]
            },
        )
        return {"bucket": self.bucket, "encryption": "SSE-KMS", "key_id": kms_key_id}

    def audit_encryption(self) -> Dict:
        """Audit encryption configuration of the model storage bucket."""
        findings = []

        # Check default encryption
        try:
            enc = self.s3.get_bucket_encryption(Bucket=self.bucket)
            rules = enc["ServerSideEncryptionConfiguration"]["Rules"]
            for rule in rules:
                algo = rule["ApplyServerSideEncryptionByDefault"]["SSEAlgorithm"]
                findings.append({
                    "check": "default_encryption",
                    "status": "PASS",
                    "algorithm": algo,
                })
        except ClientError as exc:
            # Covers a missing configuration as well as access-denied errors
            code = exc.response["Error"]["Code"]
            findings.append({
                "check": "default_encryption",
                "status": "FAIL",
                "detail": f"Could not confirm default encryption ({code})",
            })

        # Check if bucket policy enforces encryption
        try:
            policy = json.loads(
                self.s3.get_bucket_policy(Bucket=self.bucket)["Policy"]
            )
            has_encryption_enforcement = any(
                "s3:x-amz-server-side-encryption" in json.dumps(stmt)
                for stmt in policy.get("Statement", [])
            )
            findings.append({
                "check": "encryption_enforcement",
                "status": "PASS" if has_encryption_enforcement else "WARNING",
                "detail": (
                    "Bucket policy enforces encryption"
                    if has_encryption_enforcement
                    else "No bucket policy enforcement for encryption"
                ),
            })
        except ClientError as exc:
            code = exc.response["Error"]["Code"]
            findings.append({
                "check": "encryption_enforcement",
                "status": "WARNING",
                "detail": f"Could not read bucket policy ({code})",
            })

        return {"bucket": self.bucket, "findings": findings}

Encryption in transit
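TLS protects a model file while it is on the wire, but the receiving side still benefits from checking that the artifact it received matches a digest published by the sender. The following is a minimal sketch under stated assumptions: the function names `sha256_file` and `verify_artifact` are hypothetical, the file name is a placeholder, and the expected digest is assumed to arrive over a separately trusted channel such as a signed manifest.

```python
import hashlib
import hmac
import tempfile
from pathlib import Path


def sha256_file(path: str, chunk_size: int = 1 << 20) -> str:
    """Hash a file in 1 MiB chunks so large model files never sit fully in RAM."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify_artifact(path: str, expected_hex: str) -> bool:
    """Compare the file digest to the expected digest in constant time."""
    return hmac.compare_digest(sha256_file(path), expected_hex.lower())


# Usage with a throwaway file standing in for a received encrypted model.
with tempfile.TemporaryDirectory() as tmp:
    artifact = Path(tmp) / "model.bin.encrypted"
    artifact.write_bytes(b"encrypted-model-bytes")
    expected = hashlib.sha256(b"encrypted-model-bytes").hexdigest()

    ok = verify_artifact(str(artifact), expected)
    bad = verify_artifact(str(artifact), "0" * 64)

print(ok, bad)
```

Hashing the ciphertext rather than the plaintext lets the receiver reject a bad download before any decryption key is touched.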
Securing model distribution
When distributing models from a registry to serving infrastructure, encrypt the transfer:
import hashlib
import ssl
from typing import Dict
from pathlib import Path


class SecureModelDistributor:
    """Securely distribute encrypted model files to serving infrastructure."""

    def __init__(self, encryption_key: bytes):
        # ModelEncryptor is the class defined in the file-level encryption section
        self.encryptor = ModelEncryptor(key=encryption_key)

    def prepare_for_distribution(
        self,
        model_path: str,
        model_name: str,
        model_version: str,
    ) -> Dict:
        """Encrypt and prepare a model for secure distribution."""
        encrypted_path = f"{model_path}.encrypted"

        # Bind model identity to the ciphertext via AAD
        aad = f"{model_name}:{model_version}".encode()
        result = self.encryptor.encrypt_file(
            input_path=model_path,
            output_path=encrypted_path,
            associated_data=aad,
        )

        # Generate distribution manifest
        manifest = {
            "model_name": model_name,
            "model_version": model_version,
            "encrypted_file": encrypted_path,
            "plaintext_hash": result["plaintext_hash"],
            "ciphertext_hash": hashlib.sha256(
                Path(encrypted_path).read_bytes()
            ).hexdigest(),
            "encryption_algorithm": "AES-256-GCM",
        }
        return manifest

    @staticmethod
    def create_tls_context(
        cert_path: str,
        key_path: str,
        ca_path: str,
    ) -> ssl.SSLContext:
        """Create a client-side TLS context for secure model transfer."""
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.minimum_version = ssl.TLSVersion.TLSv1_3
        # Client certificate and key, presented to the server for mutual TLS
        context.load_cert_chain(cert_path, key_path)
        context.load_verify_locations(ca_path)
        # PROTOCOL_TLS_CLIENT already enables both checks; set explicitly for clarity
        context.verify_mode = ssl.CERT_REQUIRED
        context.check_hostname = True
        return context

Secure loading at inference time
The model must be decrypted before it is loaded into GPU memory. Minimize the time that decrypted weights exist on disk:
import os
import tempfile
from typing import Dict, Optional


class SecureModelLoader:
    """Load encrypted models with minimal plaintext exposure."""

    def __init__(self, decryption_key: bytes):
        # ModelEncryptor is the class defined in the file-level encryption section
        self.encryptor = ModelEncryptor(key=decryption_key)

    def load_encrypted_model(
        self,
        encrypted_path: str,
        model_class,
        aad: Optional[bytes] = None,
        device: str = "cuda:0",
    ) -> Dict:
        """
        Decrypt and load a model, minimizing plaintext disk exposure.
        Uses a tmpfs mount to avoid writing decrypted weights to persistent storage.
        """
        # Use tmpfs (memory-backed filesystem) if available. The fallback
        # temp directory may be on persistent storage.
        tmpfs_dir = "/dev/shm"  # Default tmpfs on Linux
        if not os.path.isdir(tmpfs_dir):
            tmpfs_dir = tempfile.gettempdir()

        # mkstemp creates the file with owner-only permissions and an
        # unpredictable name, which matters in a world-writable directory.
        fd, decrypted_path = tempfile.mkstemp(
            dir=tmpfs_dir, prefix="model_", suffix=".bin"
        )
        os.close(fd)

        try:
            # Decrypt to tmpfs
            result = self.encryptor.decrypt_file(
                input_path=encrypted_path,
                output_path=decrypted_path,
                associated_data=aad,
            )

            # Load model; weights_only=True avoids unpickling arbitrary objects
            import torch
            state_dict = torch.load(
                decrypted_path,
                map_location=device,
                weights_only=True,
            )
            model = model_class()
            model.load_state_dict(state_dict)
            model.to(device)
            model.eval()

            return {
                "success": True,
                "model": model,
                "integrity_verified": result["integrity_verified"],
            }
        finally:
            # Best-effort cleanup: overwrite with random data, then delete.
            # Note that os.urandom allocates a buffer the size of the model.
            if os.path.exists(decrypted_path):
                file_size = os.path.getsize(decrypted_path)
                with open(decrypted_path, "wb") as f:
                    f.write(os.urandom(file_size))
                os.remove(decrypted_path)

Key management
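The central idea in key management for encrypted models is that a small data encryption key (DEK) protects the weights, while a separate key encryption key (KEK) protects the DEK. The sketch below illustrates this locally, with AES key wrap from the `cryptography` package standing in for a KMS; in a real deployment the KEK would stay inside the KMS and not appear in process memory. The function name `rewrap_dek` and the stand-in weights are hypothetical. It also suggests why moving to a new KEK need not touch the bulk ciphertext: only the wrapped DEK changes.

```python
import os

from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.keywrap import aes_key_unwrap, aes_key_wrap


def rewrap_dek(wrapped_dek: bytes, old_kek: bytes, new_kek: bytes) -> bytes:
    """Unwrap the DEK with the old KEK and wrap it again with the new one."""
    dek = aes_key_unwrap(old_kek, wrapped_dek)
    return aes_key_wrap(new_kek, dek)


# Local stand-ins for keys that a KMS would normally hold.
old_kek = os.urandom(32)
new_kek = os.urandom(32)

# Encrypt the stand-in weights once with a fresh DEK.
dek = AESGCM.generate_key(bit_length=256)
nonce = os.urandom(12)
weights = b"model-weights" * 100
ciphertext = AESGCM(dek).encrypt(nonce, weights, None)

# Store only the wrapped DEK next to the ciphertext.
wrapped_v1 = aes_key_wrap(old_kek, dek)

# Migrate to the new KEK: the bulk ciphertext is left untouched.
wrapped_v2 = rewrap_dek(wrapped_v1, old_kek, new_kek)

# A loader holding the new KEK can still recover the weights.
recovered = AESGCM(aes_key_unwrap(new_kek, wrapped_v2)).decrypt(nonce, ciphertext, None)
print(recovered == weights, len(wrapped_v2))
```

Re-wrapping is cheap because it operates on 32 bytes rather than gigabytes, but it leaves the DEK itself unchanged; if the DEK may have been exposed, the model has to be re-encrypted under a new DEK.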
Key rotation
Model encryption keys should be rotated regularly. Implement a rotation strategy that re-encrypts models with new keys:
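import json
import os
import tempfile
from datetime import datetime, timezone
from typing import Dict


class ModelKeyRotation:
    """Manage encryption key rotation for model files."""

    def __init__(self, kms_client, kms_key_id: str):
        self.kms = kms_client
        self.key_id = kms_key_id

    def rotate_key(self) -> Dict:
        """Enable automatic key rotation in KMS."""
        # AWS KMS automatic rotation creates a new backing key on a schedule
        # while keeping the same key ID; this call does not rotate immediately.
        self.kms.enable_key_rotation(KeyId=self.key_id)
        return {
            "key_id": self.key_id,
            "rotation_enabled": True,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }

    def re_encrypt_model(
        self,
        encrypted_path: str,
        new_kms_key_id: str,
    ) -> Dict:
        """Re-encrypt a model file with a new KMS key (for key migration)."""
        # EnvelopeEncryptor is the class defined in the envelope encryption section
        envelope_enc = EnvelopeEncryptor(self.kms, self.key_id)

        # Recover the metadata from the existing envelope so that it stays
        # bound to the re-encrypted file.
        with open(encrypted_path, "rb") as f:
            f.seek(8)  # skip the ENVMODEL magic
            envelope_size = int.from_bytes(f.read(4), "big")
            metadata = json.loads(f.read(envelope_size)).get("metadata") or None

        # Decrypt with the old key into a memory-backed temp file (Linux tmpfs)
        with tempfile.NamedTemporaryFile(dir="/dev/shm", delete=False) as tmp:
            tmp_path = tmp.name
        new_path = f"{encrypted_path}.rotating"
        try:
            envelope_enc.decrypt_model(encrypted_path, tmp_path)

            # Re-encrypt with the new key, then swap the file in atomically
            # so a failure cannot leave a half-written model behind.
            new_enc = EnvelopeEncryptor(self.kms, new_kms_key_id)
            new_enc.encrypt_model(tmp_path, new_path, model_metadata=metadata)
            os.replace(new_path, encrypted_path)
            return {
                "path": encrypted_path,
                "old_key": self.key_id,
                "new_key": new_kms_key_id,
                "re_encrypted": True,
            }
        finally:
            for path in (tmp_path, new_path):
                if os.path.exists(path):
                    os.remove(path)

Performance considerations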
| Operation | Typical cost | Deployment impact |
|---|---|---|
| AES-256-GCM encryption | ~1 GB/s on modern CPUs | Seconds for typical model files |
| KMS key decryption | 50-200 ms network RTT | Once per model load |
| tmpfs decryption buffer | RAM usage = model size | Sufficient RAM must be available |
| TLS 1.3 transfer | ~1-3% throughput overhead | Negligible for model downloads |
For most deployments, encryption overhead is negligible compared with the time needed for model loading and GPU initialization. Bulk decryption time scales with model size at roughly the rate shown above, while envelope encryption adds one KMS call, a fixed 50-200 ms of latency per model load.
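The throughput figure above depends heavily on hardware, so it is worth measuring on the target machine. A rough sketch, assuming the `cryptography` package; the function name `measure_aesgcm_throughput` and the 64 MiB buffer size are arbitrary choices for illustration, and a single timed call is only a coarse estimate:

```python
import os
import time

from cryptography.hazmat.primitives.ciphers.aead import AESGCM


def measure_aesgcm_throughput(size_bytes: int = 64 * 1024 * 1024) -> float:
    """Return AES-256-GCM encryption throughput in GB/s for one buffer."""
    key = AESGCM.generate_key(bit_length=256)
    aesgcm = AESGCM(key)
    data = os.urandom(size_bytes)
    nonce = os.urandom(12)

    # Time only the encryption call, not the random data generation.
    start = time.perf_counter()
    aesgcm.encrypt(nonce, data, None)
    elapsed = time.perf_counter() - start

    return size_bytes / elapsed / 1e9


throughput = measure_aesgcm_throughput()
print(f"AES-256-GCM: {throughput:.2f} GB/s")
```

Multiplying the model size by the inverse of the measured rate gives a first estimate of the decryption time added to each model load.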
Defensive recommendations
- Encrypt all model files at rest with AES-256-GCM and envelope encryption
- Use a cloud KMS for key management, and never store encryption keys alongside encrypted models
- Enable server-side encryption on all storage backends (S3, GCS, Azure Blob)
- Use TLS 1.3 for all model file transfers
- Decrypt to tmpfs (/dev/shm) to avoid writing plaintext to persistent storage
- Rotate encryption keys regularly and re-encrypt models after key rotation
- Bind model identity to the ciphertext with authenticated encryption (AAD) to prevent model swaps
- Audit key access: all KMS decryption operations should be logged and monitored
- Consider confidential computing (Intel TDX, AMD SEV-SNP, NVIDIA H100 CC) to protect decrypted weights in memory
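The AAD recommendation above can be illustrated with a short sketch. It assumes the `cryptography` package and uses a hypothetical `name:version` string as the model identity; the helper name `load_bound_weights` and the model name are made up for the example. A ciphertext produced for one model version fails authentication when a loader expects a different one, which is what blocks a model swap at the storage layer.

```python
import os

from cryptography.exceptions import InvalidTag
from cryptography.hazmat.primitives.ciphers.aead import AESGCM


def load_bound_weights(key: bytes, blob: bytes, expected_identity: str) -> bytes:
    """Decrypt a nonce-prefixed blob, requiring the identity it was bound to."""
    nonce, ciphertext = blob[:12], blob[12:]
    return AESGCM(key).decrypt(nonce, ciphertext, expected_identity.encode())


key = AESGCM.generate_key(bit_length=256)
nonce = os.urandom(12)

# Bind the ciphertext to a hypothetical model identity via AAD.
blob = nonce + AESGCM(key).encrypt(nonce, b"weights-v1", b"fraud-detector:1.0.0")

# The expected identity decrypts normally.
weights = load_bound_weights(key, blob, "fraud-detector:1.0.0")

# A loader expecting another version rejects the same file.
try:
    load_bound_weights(key, blob, "fraud-detector:2.0.0")
    swap_detected = False
except InvalidTag:
    swap_detected = True

print(weights, swap_detected)
```

The identity string is not secret and is not stored in the ciphertext; the loader has to supply it independently, which is the point of the check.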
References
- NIST SP 800-38D — Recommendation for Block Cipher Modes of Operation: Galois/Counter Mode (GCM)
- AWS KMS Documentation — https://docs.aws.amazon.com/kms/
- NIST AI RMF — Govern 1.7 (Protection of AI assets and intellectual property)
- MITRE ATLAS — AML.T0024 (Exfiltration via ML Inference API), AML.T0000 (ML Model Access)
- NVIDIA Confidential Computing — https://developer.nvidia.com/confidential-computing