Gradient-based attacks during training
A technical deep dive into gradient-based attack methods that exploit access during training, including gradient manipulation, adversarial weight perturbation, and hijacking of the training signal.
Overview
Gradient-based attacks during training are among the most technically sophisticated threats to language model security. Unlike data poisoning, which manipulates the training inputs, gradient-based attacks target the optimization process itself. An attacker with access to the training pipeline (through a compromised training framework, a malicious participant in a distributed training setup, or an insider) can manipulate gradients to embed specific behavior in the model without leaving traces in the training data.
The theoretical foundations for these attacks draw on adversarial machine learning research. Biggio et al. (2012) established an early framework for poisoning attacks on learning algorithms, and Wallace et al. (2021), in "Concealed Data Poisoning Attacks on NLP Models", showed that gradient information can be used to craft highly effective poisoning examples. Carlini et al. (2021), in "Extracting Training Data from Large Language Models", showed that large models memorize individual training examples, which suggests that the link between training dynamics and model behavior is tighter than previously understood and that manipulation at the gradient level can produce precise behavioral changes.
This article covers four categories of gradient-based attacks: direct gradient manipulation, where the attacker modifies gradient tensors before they are applied to the model weights; adversarial weight perturbation, where small, targeted changes are made to weights at training checkpoints; training signal amplification, where the attacker selectively amplifies the gradients of specific data points; and gradient-based backdoor insertion, where the attacker uses gradient information to find optimal backdoor trigger patterns.
Foundations of gradient manipulation
The training loop as an attack surface
Every training step of a language model follows a predictable sequence: forward pass, loss computation, backward pass (gradient computation), and optimizer step (weight update). An attacker with sufficient access can intercept and modify each of these phases.
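As a compact reference, the update rule and the simplest additive attack on it can be written as follows. The notation ($\theta_t$, $g_t$, $\eta$, $\delta_t$, $\kappa$) is introduced here for illustration and assumes plain SGD; adaptive optimizers such as Adam transform the gradient further before it reaches the weights.

$$
\theta_{t+1} = \theta_t - \eta\, g_t, \qquad g_t = \nabla_\theta \mathcal{L}(\theta_t)
$$

$$
\tilde{g}_t = g_t + \delta_t, \qquad \lVert \delta_t \rVert_2 \le \kappa\, \lVert g_t \rVert_2
$$

Here $\delta_t$ is the injected component and $\kappa$ bounds it relative to the clean gradient. By the triangle inequality the tampered norm stays within $(1 \pm \kappa)\,\lVert g_t \rVert_2$, which is what makes a small $\kappa$ hard to spot with norm checks, while the accumulated drift $-\eta \sum_t \delta_t$ can still move the weights in a direction the attacker chooses.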
"""
Annotated training loop showing gradient attack insertion points.

Demonstrates where an adversary can intercept and modify
the training signal at each stage.
"""
from dataclasses import dataclass
from enum import Enum
from typing import Optional

import numpy as np


class AttackPoint(Enum):
    """Stages of a training step where an attacker could intervene."""
    PRE_FORWARD = "pre_forward"
    POST_FORWARD = "post_forward"
    LOSS_MODIFICATION = "loss_modification"
    PRE_BACKWARD = "pre_backward"
    POST_BACKWARD = "post_backward"
    PRE_OPTIMIZER = "pre_optimizer"
    POST_OPTIMIZER = "post_optimizer"


@dataclass
class GradientAttackConfig:
    """Configuration for a gradient-based attack."""
    attack_point: AttackPoint
    target_layers: list[str]
    magnitude: float
    frequency: float  # Fraction of steps to attack
    stealth_constraint: float  # Max injected norm as a fraction of the clean gradient norm


@dataclass
class TrainingStepLog:
    """Detailed log of a single training step for auditing."""
    step: int
    loss: float
    gradient_norms: dict[str, float]
    weight_update_norms: dict[str, float]
    attack_applied: bool = False
    attack_details: Optional[dict] = None


def simulate_training_step(
    weights: dict[str, np.ndarray],
    gradients: dict[str, np.ndarray],
    learning_rate: float,
    attack_config: Optional[GradientAttackConfig] = None,
    step: int = 0,
    rng: Optional[np.random.Generator] = None,
) -> tuple[dict[str, np.ndarray], TrainingStepLog]:
    """
    Simulate a single training step with optional gradient attack.

    This demonstrates how an attacker can intercept the gradient
    at various points in the training loop and modify it to
    achieve specific objectives. Only POST_BACKWARD and PRE_OPTIMIZER
    are simulated; other attack points leave the gradients untouched.
    """
    if rng is None:
        rng = np.random.default_rng(42)

    log = TrainingStepLog(
        step=step,
        loss=0.0,  # Placeholder: no real loss is computed in this simulation
        gradient_norms={},
        weight_update_norms={},
    )

    # Record the clean (pre-attack) gradient norms
    for name, grad in gradients.items():
        log.gradient_norms[name] = float(np.linalg.norm(grad))

    # Apply gradient attack if configured
    modified_gradients = dict(gradients)  # Shallow copy; entries are replaced, not mutated
    attacked_layers = []
    if attack_config and rng.random() < attack_config.frequency:
        for layer_name in attack_config.target_layers:
            if layer_name not in modified_gradients:
                continue
            original_grad = modified_gradients[layer_name]
            original_norm = np.linalg.norm(original_grad)

            if attack_config.attack_point == AttackPoint.POST_BACKWARD:
                # Inject a poisoned gradient component (a random direction here;
                # a real attacker would choose it to serve an objective)
                poison_direction = rng.standard_normal(original_grad.shape)
                poison_direction /= np.linalg.norm(poison_direction)
                # Scale to be within stealth constraint
                poison_magnitude = min(
                    attack_config.magnitude,
                    original_norm * attack_config.stealth_constraint,
                )
                modified_gradients[layer_name] = (
                    original_grad + poison_direction * poison_magnitude
                )
            elif attack_config.attack_point == AttackPoint.PRE_OPTIMIZER:
                # Uniformly scale up the layer's gradient
                modified_gradients[layer_name] = original_grad * (
                    1.0 + attack_config.magnitude
                )
            else:
                continue  # Attack point not simulated
            attacked_layers.append(layer_name)

    # Only log an attack if a gradient was actually modified
    if attacked_layers:
        log.attack_applied = True
        log.attack_details = {
            "attack_point": attack_config.attack_point.value,
            "target_layers": attacked_layers,
            "magnitude": attack_config.magnitude,
        }

    # Apply optimizer step (simplified SGD)
    updated_weights = {}
    for name, weight in weights.items():
        grad = modified_gradients.get(name, np.zeros_like(weight))
        update = learning_rate * grad
        updated_weights[name] = weight - update
        log.weight_update_norms[name] = float(np.linalg.norm(update))

    return updated_weights, log


# Demonstrate gradient attack
rng = np.random.default_rng(42)
weights = {
    "layer_0": rng.standard_normal((64, 64)),
    "layer_1": rng.standard_normal((64, 64)),
    "layer_2": rng.standard_normal((64, 32)),
}

# Configure a stealth gradient attack on layer_1
attack = GradientAttackConfig(
    attack_point=AttackPoint.POST_BACKWARD,
    target_layers=["layer_1"],
    magnitude=0.005,
    frequency=0.3,  # Attack 30% of steps
    stealth_constraint=0.5,  # Injected norm at most 50% of the clean gradient norm
)

# Run 10 training steps, drawing fresh simulated gradients each step
for step in range(10):
    gradients = {
        name: rng.standard_normal(w.shape) * 0.01
        for name, w in weights.items()
    }
    weights, log = simulate_training_step(
        weights, gradients, learning_rate=0.001,
        attack_config=attack, step=step, rng=rng,
    )
    if log.attack_applied:
        norms = ", ".join(f"{k}={v:.4f}" for k, v in log.gradient_norms.items())
        print(f"Step {step}: ATTACKED - clean gradient norms: {norms}")
    else:
        print(f"Step {step}: clean")
Amplifying the gradient signal
A subtle form of gradient attack amplifies the gradients of specific training examples while leaving the gradients of all other examples unchanged. The model then learns disproportionately from the targeted examples, even though the data itself is never modified.
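One way to see why this works, using notation introduced here for illustration: let $g_i$ be the gradient of example $i$, $T$ the set of targeted examples in the batch, and $\alpha > 1$ the amplification factor.

$$
g = \sum_i g_i, \qquad
\tilde{g} = \sum_{i \notin T} g_i + \alpha \sum_{i \in T} g_i
          = g + (\alpha - 1) \sum_{i \in T} g_i
$$

Amplification is therefore equivalent to adding $\alpha - 1$ extra copies of the targeted examples' gradient to the batch (for a mean-reduced loss the same holds up to the constant $1/B$). An attacker who also wants to evade norm monitoring can rescale the result,

$$
\hat{g} = \frac{\lVert g \rVert_2}{\lVert \tilde{g} \rVert_2}\, \tilde{g},
$$

which has exactly the clean norm while its direction remains tilted toward the targeted examples.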
"""
Selective gradient amplification attack.

Amplifies the training signal from specific examples to
give them outsized influence on model parameters.
"""
import numpy as np


def selective_gradient_amplification(
    per_example_gradients: np.ndarray,
    example_labels: list[str],
    target_label: str,
    amplification_factor: float = 5.0,
    stealth_mode: bool = True,
) -> np.ndarray:
    """
    Selectively amplify gradients from specific training examples.

    In stealth mode, the whole batch is rescaled after amplification so
    that the aggregate gradient norm matches the original. The target
    examples keep their amplified share of the update, which makes the
    attack harder to detect through simple norm monitoring.

    Args:
        per_example_gradients: Shape (batch_size, param_dim).
        example_labels: Label for each example in the batch.
        target_label: Amplify gradients from examples with this label.
        amplification_factor: How much to amplify target gradients.
        stealth_mode: If True, preserve overall gradient norm.

    Returns:
        Modified per-example gradients with same shape.
    """
    modified = per_example_gradients.astype(float)  # astype returns a copy
    target_mask = np.array([label == target_label for label in example_labels])
    if not target_mask.any():
        return modified

    # Amplify target gradients
    modified[target_mask] *= amplification_factor

    if stealth_mode:
        # Rescale everything so the aggregate norm equals the original one.
        # (Subtracting norms to shrink only the non-target gradients does not
        # preserve the aggregate norm, because norms are not additive.)
        original_norm = np.linalg.norm(per_example_gradients.sum(axis=0))
        modified_norm = np.linalg.norm(modified.sum(axis=0))
        if modified_norm > 0:
            modified *= original_norm / modified_norm

    return modified


def measure_amplification_impact(
    original_grads: np.ndarray,
    modified_grads: np.ndarray,
    labels: list[str],
    target_label: str,
) -> dict:
    """Measure the impact of gradient amplification."""
    target_mask = np.array([label == target_label for label in labels])
    orig_agg = original_grads.sum(axis=0)
    mod_agg = modified_grads.sum(axis=0)

    # Cosine similarity between original and modified aggregate gradients
    cos_sim = (
        np.dot(orig_agg.flatten(), mod_agg.flatten())
        / (np.linalg.norm(orig_agg) * np.linalg.norm(mod_agg) + 1e-10)
    )

    # Relative influence of target examples
    orig_target_contrib = np.linalg.norm(original_grads[target_mask].sum(axis=0))
    orig_total = np.linalg.norm(orig_agg)
    mod_target_contrib = np.linalg.norm(modified_grads[target_mask].sum(axis=0))
    mod_total = np.linalg.norm(mod_agg)

    return {
        "aggregate_norm_ratio": float(mod_total / (orig_total + 1e-10)),
        "cosine_similarity": float(cos_sim),
        "original_target_influence": float(orig_target_contrib / (orig_total + 1e-10)),
        "modified_target_influence": float(mod_target_contrib / (mod_total + 1e-10)),
    }


# Demonstration
rng = np.random.default_rng(42)
batch_size, param_dim = 32, 128
grads = rng.standard_normal((batch_size, param_dim)) * 0.01
labels = ["clean"] * 28 + ["poison"] * 4  # 4 poisoned examples in batch of 32

modified = selective_gradient_amplification(
    grads, labels, "poison", amplification_factor=10.0, stealth_mode=True
)
impact = measure_amplification_impact(grads, modified, labels, "poison")
print(f"Aggregate norm ratio: {impact['aggregate_norm_ratio']:.3f} (1.0 = stealth)")
print(f"Cosine similarity: {impact['cosine_similarity']:.3f}")
print(f"Original poison influence: {impact['original_target_influence']:.3f}")
print(f"Modified poison influence: {impact['modified_target_influence']:.3f}")
Adversarial weight perturbation
Targeted weight modification during training
An attacker with checkpoint access can modify model weights directly between training phases. The challenge is to make changes that are small enough to avoid detection yet effective enough to embed the desired behavior.
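A simple way to make this trade-off concrete is a rank-1 perturbation of a linear layer. The symbols below are notation introduced here for illustration: $W$ is the weight matrix, $u$ and $v$ are unit vectors for the desired output and trigger input directions, and $\varepsilon$ is the perturbation budget relative to the Frobenius norm of $W$.

$$
\Delta W = \varepsilon\, \lVert W \rVert_F\, u\, v^\top,
\qquad \lVert \Delta W \rVert_F = \varepsilon\, \lVert W \rVert_F
$$

$$
(W + \Delta W)\, x = W x + \varepsilon\, \lVert W \rVert_F\, (v^\top x)\, u
$$

Inputs orthogonal to $v$ pass through unchanged, while inputs aligned with $v$ are pushed along $u$. The perturbation is small in norm, but all of its energy sits in a single singular value, which is also what makes low-rank structure in a weight delta a useful detection signal.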
"""
Adversarial weight perturbation during training.

Demonstrates targeted weight modifications that embed
specific behaviors while remaining within normal variance.
"""
from dataclasses import dataclass

import numpy as np


@dataclass
class WeightPerturbation:
    """A targeted weight perturbation."""
    layer_name: str
    perturbation: np.ndarray
    objective: str
    l2_norm: float
    relative_magnitude: float  # Relative to original weight norm


def craft_targeted_perturbation(
    weight_matrix: np.ndarray,
    target_input_direction: np.ndarray,
    target_output_direction: np.ndarray,
    perturbation_budget: float = 0.01,
) -> WeightPerturbation:
    """
    Craft a weight perturbation that amplifies a specific
    input-output mapping in a linear layer.

    The perturbation is a rank-1 matrix that maps the target
    input direction to the target output direction, scaled to
    stay within the perturbation budget.

    Args:
        weight_matrix: Original weight matrix (out_dim, in_dim).
        target_input_direction: Input direction to activate.
        target_output_direction: Desired output direction.
        perturbation_budget: Maximum Frobenius norm relative to weight norm.
    """
    # Normalize directions
    input_dir = target_input_direction / (np.linalg.norm(target_input_direction) + 1e-10)
    output_dir = target_output_direction / (np.linalg.norm(target_output_direction) + 1e-10)

    # Create rank-1 perturbation: outer product of directions
    perturbation = np.outer(output_dir, input_dir)

    # Scale to budget
    weight_norm = np.linalg.norm(weight_matrix)
    max_perturbation_norm = weight_norm * perturbation_budget
    current_norm = np.linalg.norm(perturbation)
    if current_norm > 0:
        perturbation *= max_perturbation_norm / current_norm

    final_norm = float(np.linalg.norm(perturbation))
    return WeightPerturbation(
        layer_name="target_layer",
        perturbation=perturbation,
        objective="amplify_input_output_mapping",
        l2_norm=final_norm,
        relative_magnitude=final_norm / (float(weight_norm) + 1e-10),
    )


def detect_weight_anomalies(
    current_weights: dict[str, np.ndarray],
    reference_weights: dict[str, np.ndarray],
    expected_update_norms: dict[str, float],
    threshold_multiplier: float = 3.0,
) -> list[dict]:
    """
    Detect anomalous weight changes by comparing against expected
    update magnitudes from normal training.
    """
    anomalies = []
    for name in current_weights:
        if name not in reference_weights:
            continue
        delta = current_weights[name] - reference_weights[name]
        delta_norm = np.linalg.norm(delta)
        expected = expected_update_norms.get(name, 0.0)

        if expected > 0 and delta_norm > expected * threshold_multiplier:
            # Spectral analysis: check if the perturbation is low-rank
            # (targeted perturbations tend to be low-rank)
            if delta.ndim == 2:
                s = np.linalg.svd(delta, compute_uv=False)  # Singular values only
                top_sv_ratio = s[0] / (np.sum(s) + 1e-10)
            else:
                top_sv_ratio = 0.0

            anomalies.append({
                "layer": name,
                "delta_norm": float(delta_norm),
                "expected_norm": expected,
                "ratio": float(delta_norm / expected),
                "top_singular_value_ratio": float(top_sv_ratio),
                "likely_targeted": bool(top_sv_ratio > 0.8),
            })
    return anomalies


# Demonstration
rng = np.random.default_rng(42)
weight = rng.standard_normal((256, 128)) * 0.02

# Craft a targeted perturbation
input_dir = rng.standard_normal(128)
output_dir = rng.standard_normal(256)
perturbation = craft_targeted_perturbation(weight, input_dir, output_dir, 0.01)
print(f"Weight norm: {np.linalg.norm(weight):.4f}")
print(f"Perturbation norm: {perturbation.l2_norm:.4f}")
print(f"Relative magnitude: {perturbation.relative_magnitude:.4%}")

# Apply and detect
perturbed_weight = weight + perturbation.perturbation
anomalies = detect_weight_anomalies(
    {"layer": perturbed_weight},
    {"layer": weight},
    {"layer": 0.001},  # Expected update norm from normal training
)
for a in anomalies:
    print(f"\nAnomaly in {a['layer']}:")
    print(f"  Delta norm: {a['delta_norm']:.4f} (expected: {a['expected_norm']:.4f})")
    print(f"  Ratio: {a['ratio']:.1f}x")
    print(f"  Likely targeted: {a['likely_targeted']}")
Gradient-based backdoor insertion
Using gradients to find optimal triggers
Gradient information lets an attacker find the most effective backdoor triggers: specific input patterns that make the model produce outputs chosen by the attacker. By computing the gradient of the loss with respect to the input, the attacker can identify which token positions are most sensitive and which tokens should be placed there for maximum effect.
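The token choice is usually made with a first-order approximation. In the notation below (introduced here for illustration), $e_i$ is the current embedding at trigger position $i$, $e_w$ is the embedding of a candidate token $w$ from vocabulary $V$, and $\mathcal{L}$ is the attacker's loss.

$$
\mathcal{L}(e_w) \approx \mathcal{L}(e_i) + (e_w - e_i)^\top \nabla_{e_i} \mathcal{L}
$$

$$
w^\star = \arg\min_{w \in V}\; e_w^\top \nabla_{e_i} \mathcal{L}
$$

Because the term in $e_i$ is constant across candidates, ranking tokens reduces to one matrix-vector product between the embedding matrix and the gradient. The approximation is only local, so gradient-guided search methods such as AutoPrompt (Shin et al. 2020) keep the top-$k$ candidates and re-score each with a real forward pass before committing to a swap.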
"""
Gradient-guided backdoor trigger optimization.

Uses gradient information to find optimal trigger tokens
that maximize the model's response to the backdoor.
"""
from typing import Optional

import numpy as np


def gradient_guided_trigger_search(
    embedding_matrix: np.ndarray,
    target_output_embedding: np.ndarray,
    weight_matrix: np.ndarray,
    trigger_length: int = 3,
    vocab_size: Optional[int] = None,
    num_iterations: int = 50,
    seed: int = 42,
) -> dict:
    """
    Find optimal trigger token sequence using gradient-based search.

    This simplified version demonstrates the core algorithm:
    1. Initialize random trigger tokens
    2. Compute gradient of target loss w.r.t. trigger embeddings
    3. Find vocab tokens closest to the gradient-suggested direction
    4. Repeat for a fixed number of iterations

    In practice, this would operate on a real model using a
    gradient-guided token search in the style of AutoPrompt
    (Shin et al. 2020).

    Args:
        embedding_matrix: Token embedding matrix (vocab_size, embed_dim).
        target_output_embedding: Desired output embedding.
        weight_matrix: Simplified output projection (output_dim, embed_dim).
        trigger_length: Number of trigger tokens.
        vocab_size: Number of leading vocabulary rows to search;
            defaults to all rows of embedding_matrix.
        num_iterations: Optimization iterations.
        seed: Random seed.
    """
    rng = np.random.default_rng(seed)
    # Never index past the embedding matrix, whatever the caller passes
    if vocab_size is None:
        vocab_size = embedding_matrix.shape[0]
    vocab_size = min(vocab_size, embedding_matrix.shape[0])

    # Initialize trigger with random tokens
    trigger_ids = rng.integers(0, vocab_size, size=trigger_length)
    best_loss = float("inf")
    best_trigger = trigger_ids.copy()
    loss_history = []

    for _ in range(num_iterations):
        # Get current trigger embeddings: (trigger_length, embed_dim)
        trigger_embeddings = embedding_matrix[trigger_ids]

        # Simple forward: average trigger embeddings, project to output space
        avg_embedding = trigger_embeddings.mean(axis=0)
        output = weight_matrix @ avg_embedding

        # Loss: squared distance from target output
        loss = np.linalg.norm(output - target_output_embedding) ** 2
        loss_history.append(float(loss))
        if loss < best_loss:
            best_loss = loss
            best_trigger = trigger_ids.copy()

        # Gradient of loss w.r.t. average embedding
        grad_output = 2 * (output - target_output_embedding)
        grad_avg_embedding = weight_matrix.T @ grad_output

        # Each position contributes 1/trigger_length to the average, so every
        # position shares the same descent direction in this toy model
        desired_direction = -grad_avg_embedding / trigger_length

        # Score all vocab tokens by alignment with the desired direction
        scores = embedding_matrix[:vocab_size] @ desired_direction
        top_candidates = np.argsort(scores)[-10:]  # Top 10 candidates

        # Pick randomly among top candidates for diversity
        for pos in range(trigger_length):
            trigger_ids[pos] = rng.choice(top_candidates)

    initial_loss, final_loss = loss_history[0], loss_history[-1]
    return {
        "best_trigger_ids": best_trigger.tolist(),
        "best_loss": float(best_loss),
        "initial_loss": initial_loss,
        "final_loss": final_loss,
        "loss_reduction": (
            float((initial_loss - final_loss) / initial_loss) if initial_loss > 0 else 0.0
        ),
        # Heuristic: counts as converged if the loss fell by at least 90%
        "converged": final_loss < initial_loss * 0.1,
    }


# Demonstration
rng = np.random.default_rng(42)
vocab_size, embed_dim, output_dim = 500, 64, 32
embedding = rng.standard_normal((vocab_size, embed_dim)) * 0.1
weight = rng.standard_normal((output_dim, embed_dim)) * 0.02
target_output = rng.standard_normal(output_dim) * 0.1

result = gradient_guided_trigger_search(
    embedding, target_output, weight,
    trigger_length=4, vocab_size=vocab_size,
    num_iterations=100,
)
print(f"Trigger token IDs: {result['best_trigger_ids']}")
print(f"Loss reduction: {result['loss_reduction']:.1%}")
print(f"Converged: {result['converged']}")
Detection and defense framework
Monitoring gradient integrity
The main defense against gradient-based attacks is comprehensive monitoring of gradient statistics throughout training. Anomalies in gradient distributions often indicate manipulation.
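Two statistics that are cheap to track per layer are a rolling z-score of the gradient norm and the effective rank of the gradient matrix. The definitions below use notation introduced here for illustration: $n_t = \lVert g_t \rVert_2$ is the norm at step $t$, $\mu_{t-1}$ and $s_{t-1}$ are the mean and standard deviation over a window of earlier steps, $\tau$ is the alert threshold, and $\sigma_i$ are the singular values of the gradient matrix.

$$
z_t = \frac{\lvert n_t - \mu_{t-1} \rvert}{s_{t-1}}, \qquad \text{alert if } z_t > \tau
$$

$$
p_i = \frac{\sigma_i}{\sum_j \sigma_j}, \qquad
\operatorname{erank}(g_t) = \exp\!\Big(-\sum_i p_i \log p_i\Big)
$$

A rank-1 matrix has effective rank 1, and a matrix with $k$ equal singular values has effective rank $k$. A sudden drop in effective rank therefore suggests that the update is dominated by a few directions, which is the signature of a targeted low-rank injection. Neither statistic is proof of an attack on its own; both are signals that warrant a closer look.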
"""
Gradient integrity monitoring system.

Implements real-time monitoring of gradient statistics to
detect potential gradient-based attacks during training.
"""
from collections import deque
from dataclasses import dataclass, field

import numpy as np


@dataclass
class GradientMonitor:
    """Monitors gradient statistics for anomaly detection.

    Assumes every call to update() receives a gradient of the same shape.
    """
    window_size: int = 100
    alert_threshold: float = 3.0  # Standard deviations
    # Rolling statistics; rebuilt in __post_init__ so they honor window_size
    norm_history: deque = field(default_factory=deque, init=False)
    direction_history: deque = field(default_factory=deque, init=False)
    rank_history: deque = field(default_factory=deque, init=False)

    def __post_init__(self) -> None:
        self.norm_history = deque(maxlen=self.window_size)
        self.direction_history = deque(maxlen=self.window_size)
        self.rank_history = deque(maxlen=self.window_size)

    def update(self, gradient: np.ndarray) -> list[str]:
        """
        Process a new gradient observation and return any alerts.
        """
        alerts = []

        # Monitor gradient norm against the statistics of earlier steps
        norm = float(np.linalg.norm(gradient))
        self.norm_history.append(norm)
        if len(self.norm_history) > 10:
            previous = list(self.norm_history)[:-1]
            mean_norm = np.mean(previous)
            std_norm = np.std(previous)
            if std_norm > 0:
                z_score = abs(norm - mean_norm) / std_norm
                if z_score > self.alert_threshold:
                    alerts.append(
                        f"NORM_ANOMALY: gradient norm {norm:.4f} is "
                        f"{z_score:.1f} std devs from mean {mean_norm:.4f}"
                    )

        # Subsample large gradients for memory efficiency. This happens before
        # the comparison so the vector matches the stored history in length.
        flat = gradient.ravel()
        if flat.size > 1000:
            indices = np.linspace(0, flat.size - 1, 1000, dtype=int)
            direction = flat[indices]
        else:
            direction = flat.copy()

        # Monitor gradient direction (cosine similarity with rolling average)
        if len(self.direction_history) > 5:
            avg_direction = np.mean(list(self.direction_history), axis=0)
            cos_sim = (
                np.dot(direction, avg_direction)
                / (np.linalg.norm(direction) * np.linalg.norm(avg_direction) + 1e-10)
            )
            if cos_sim < -0.5:  # Gradient pointing in opposite direction
                alerts.append(
                    f"DIRECTION_ANOMALY: gradient direction reversed "
                    f"(cosine similarity = {cos_sim:.3f})"
                )
        self.direction_history.append(direction)

        # Monitor effective rank (for detecting low-rank perturbations)
        if gradient.ndim == 2 and min(gradient.shape) > 1:
            s = np.linalg.svd(gradient, compute_uv=False)
            s_normalized = s / (s.sum() + 1e-10)
            effective_rank = float(np.exp(-np.sum(
                s_normalized * np.log(s_normalized + 1e-10)
            )))
            self.rank_history.append(effective_rank)
            if len(self.rank_history) > 10:
                mean_rank = np.mean(list(self.rank_history)[:-1])
                if effective_rank < mean_rank * 0.3:
                    alerts.append(
                        f"RANK_ANOMALY: effective rank {effective_rank:.1f} "
                        f"is abnormally low (mean: {mean_rank:.1f})"
                    )

        return alerts


# Demonstration
rng = np.random.default_rng(42)
monitor = GradientMonitor()

# Normal training gradients
for step in range(50):
    normal_grad = rng.standard_normal((64, 64)) * 0.01
    alerts = monitor.update(normal_grad)
    if alerts:
        print(f"Step {step}: {alerts}")

# Inject an attacked gradient
print("\n--- Injecting attack gradient ---")
attack_grad = rng.standard_normal((64, 64)) * 0.1  # 10x normal magnitude
# Add a dominant rank-1 component (targeted)
u = rng.standard_normal((64, 1))
v = rng.standard_normal((1, 64))
attack_grad += u @ v * 0.5
alerts = monitor.update(attack_grad)
print(f"Attack step: {alerts}")

# Resume normal gradients
for step in range(51, 55):
    normal_grad = rng.standard_normal((64, 64)) * 0.01
    alerts = monitor.update(normal_grad)
    if alerts:
        print(f"Step {step}: {alerts}")
Cryptographic verification of gradients
In distributed training setups, gradient integrity can be verified with cryptographic commitments. Each worker commits to its gradient before the aggregation step, which makes it possible to verify afterwards that no gradient was altered in transit. A commitment only proves that the revealed gradient is the one the worker committed to; it does not stop a malicious worker from committing to a poisoned gradient in the first place.
"""
Cryptographic gradient commitment scheme for distributed training.
"""
import hashlib
import hmac
import json
from dataclasses import dataclass

import numpy as np


@dataclass
class GradientCommitment:
    """A cryptographic commitment to a gradient tensor."""
    worker_id: str
    step: int
    commitment_hash: str
    metadata: dict


def _commitment_hash(gradient_bytes: bytes, worker_id: str, step: int, nonce: str) -> str:
    """Hash the gradient together with its metadata.

    The metadata is JSON-encoded so that field boundaries are unambiguous
    (plain concatenation would let "worker_1" + "00" collide with
    "worker_10" + "0").
    """
    header = json.dumps([worker_id, step, nonce]).encode()
    return hashlib.sha256(header + gradient_bytes).hexdigest()


def commit_gradient(
    gradient: np.ndarray,
    worker_id: str,
    step: int,
    nonce: str = "",
) -> tuple[GradientCommitment, bytes]:
    """
    Create a cryptographic commitment to a gradient tensor.

    The commitment can be published before the gradient is revealed,
    allowing verification that the gradient was not modified after
    the commitment was made. Use a fresh random nonce per commitment;
    with an empty nonce the commitment is binding but not hiding.

    Returns:
        (commitment, gradient_bytes): the commitment to publish
        and the serialized gradient to reveal later.
    """
    gradient_bytes = gradient.tobytes()
    commitment_hash = _commitment_hash(gradient_bytes, worker_id, step, nonce)
    return (
        GradientCommitment(
            worker_id=worker_id,
            step=step,
            commitment_hash=commitment_hash,
            metadata={
                "shape": list(gradient.shape),
                "dtype": str(gradient.dtype),
                "norm": float(np.linalg.norm(gradient)),
            },
        ),
        gradient_bytes,
    )


def verify_gradient(
    commitment: GradientCommitment,
    gradient_bytes: bytes,
    nonce: str = "",
) -> bool:
    """Verify that a revealed gradient matches its commitment."""
    expected_hash = _commitment_hash(
        gradient_bytes, commitment.worker_id, commitment.step, nonce
    )
    # Constant-time comparison avoids leaking how many characters matched
    return hmac.compare_digest(expected_hash, commitment.commitment_hash)


# Demonstration
rng = np.random.default_rng(42)
gradient = (rng.standard_normal((32, 32)) * 0.01).astype(np.float32)
commitment, grad_bytes = commit_gradient(gradient, "worker_0", step=100, nonce="secret123")
print(f"Commitment: {commitment.commitment_hash[:32]}...")

# Verify honest gradient
is_valid = verify_gradient(commitment, grad_bytes, nonce="secret123")
print(f"Honest verification: {is_valid}")

# Try to verify tampered gradient
tampered = gradient + (rng.standard_normal(gradient.shape) * 0.001).astype(np.float32)
tampered_bytes = tampered.tobytes()
is_valid_tampered = verify_gradient(commitment, tampered_bytes, nonce="secret123")
print(f"Tampered verification: {is_valid_tampered}")
Practical red-team methodology
When assessing a training pipeline for vulnerabilities to gradient-based attacks:
- Map access levels: Determine who has access to gradient tensors during training. In distributed setups, every worker node is a potential attack point.
- Audit gradient hooks: Many training frameworks support gradient hooks (for example, PyTorch's register_backward_hook, now deprecated in favor of register_full_backward_hook, and Tensor.register_hook). Audit all registered hooks for potential manipulation.
- Test gradient monitoring: Verify that gradient norm monitoring is in place and alerts on anomalies. Test it with synthetic anomalies.
- Verify aggregation security: In distributed training, verify that the gradient aggregation protocol (for example, all-reduce) includes integrity checks.
- Check checkpoint integrity: Verify that checkpoints are cryptographically signed and that weight integrity is verified when training resumes.
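The last item on the checklist can be prototyped with the standard library alone. The sketch below is a minimal illustration, not a production scheme: it uses an HMAC-SHA256 tag as a stand-in for a real digital signature, and the function names, the file name, and the in-memory key are all assumptions made for the example (in practice the key would live in a KMS or HSM, and an asymmetric signature would let verifiers check tags without being able to forge them).

```python
import hashlib
import hmac
import os
import tempfile


def sign_checkpoint(path: str, key: bytes) -> str:
    """Return a hex HMAC-SHA256 tag over the checkpoint file's bytes."""
    mac = hmac.new(key, digestmod=hashlib.sha256)
    with open(path, "rb") as f:
        # Stream in 1 MiB chunks so large checkpoints need not fit in memory
        for chunk in iter(lambda: f.read(1 << 20), b""):
            mac.update(chunk)
    return mac.hexdigest()


def verify_checkpoint(path: str, key: bytes, expected_tag: str) -> bool:
    """Recompute the tag and compare it in constant time."""
    return hmac.compare_digest(sign_checkpoint(path, key), expected_tag)


def demo() -> tuple[bool, bool]:
    """Sign a stand-in checkpoint, flip one bit, and verify before and after."""
    key = os.urandom(32)  # Assumption: a real key would come from a KMS/HSM
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "step_1000.ckpt")  # Hypothetical file name
        with open(path, "wb") as f:
            f.write(os.urandom(4096))  # Stand-in for serialized weights
        tag = sign_checkpoint(path, key)
        ok_before = verify_checkpoint(path, key, tag)

        # Simulate tampering: flip the lowest bit of the byte at offset 100
        with open(path, "r+b") as f:
            f.seek(100)
            original = f.read(1)
            f.seek(100)
            f.write(bytes([original[0] ^ 0x01]))
        ok_after = verify_checkpoint(path, key, tag)
    return ok_before, ok_after


if __name__ == "__main__":
    before, after = demo()
    print(f"Verified before tampering: {before}")
    print(f"Verified after tampering: {after}")
```

A red-team check along these lines is to modify a single weight in a stored checkpoint and confirm that the resume path refuses to load it.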
| Attack type | Required access | Detection difficulty | Impact |
|---|---|---|---|
| Direct gradient modification | Training framework | Medium (norm monitoring) | High |
| Gradient amplification | Training loop | Hard (stealth mode) | Medium |
| Weight perturbation | Checkpoint access | Medium (weight diffing) | High |
| Gradient-guided backdoor | Model + data access | Very hard | Critical |
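The difference between the "Medium" and "Hard" detection ratings can be checked empirically by feeding synthetic anomalies to a monitor, as the checklist suggests. The sketch below is a self-contained illustration using only the standard library; the NormMonitor class, the run_synthetic_test helper, and the simulated norm distribution (mean 1.0, standard deviation 0.05) are assumptions made for this example, not part of any framework.

```python
import random
import statistics
from collections import deque


class NormMonitor:
    """Rolling z-score alert on gradient norms (illustrative stand-in)."""

    def __init__(self, window: int = 100, threshold: float = 3.0, warmup: int = 10):
        self.history = deque(maxlen=window)
        self.threshold = threshold
        self.warmup = warmup

    def observe(self, norm: float) -> bool:
        """Return True if `norm` is anomalous relative to earlier observations."""
        alert = False
        if len(self.history) >= self.warmup:
            mean = statistics.fmean(self.history)
            std = statistics.pstdev(self.history)
            alert = std > 0 and abs(norm - mean) / std > self.threshold
        self.history.append(norm)
        return alert


def run_synthetic_test(scale: float, seed: int = 0) -> tuple[int, bool]:
    """Feed 200 clean norms, then one norm equal to `scale` times the clean mean.

    Returns (false alarms during the clean phase, whether the injected
    step raised an alert).
    """
    rng = random.Random(seed)
    monitor = NormMonitor()
    false_alarms = sum(monitor.observe(rng.gauss(1.0, 0.05)) for _ in range(200))
    detected = monitor.observe(1.0 * scale)
    return false_alarms, detected


if __name__ == "__main__":
    for scale in (10.0, 1.05):
        false_alarms, detected = run_synthetic_test(scale)
        print(f"scale={scale}: detected={detected}, clean-phase false alarms={false_alarms}")
```

A tenfold norm spike is flagged, while a 5% increase sits about one standard deviation from the mean and passes unnoticed. This is the gap that norm-constrained attacks exploit, and it is why norm monitoring is best combined with direction and rank statistics.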
References
- Biggio, B., et al. (2012). "Poisoning Attacks Against Support Vector Machines." ICML 2012.
- Wallace, E., et al. (2021). "Concealed Data Poisoning Attacks on NLP Models." NAACL 2021.
- Carlini, N., et al. (2021). "Extracting Training Data from Large Language Models." USENIX Security Symposium 2021.
- Shin, T., et al. (2020). "AutoPrompt: Eliciting Knowledge from Language Models with Automatically Generated Prompts." EMNLP 2020.
- Blanchard, P., et al. (2017). "Machine Learning with Adversaries: Byzantine Tolerant Gradient Descent." NeurIPS 2017.