Lab: ML Pipeline Poisoning
Compromise an end-to-end machine learning pipeline by attacking data ingestion, preprocessing, training, evaluation, and deployment stages. Learn to identify and exploit weaknesses across the full ML lifecycle.
Prerequisites
- Understanding of ML pipelines (data ingestion, preprocessing, training, evaluation, deployment)
- Experience with ML frameworks (PyTorch, TensorFlow, or similar)
- Familiarity with CI/CD pipelines and automation tools
- Understanding of data poisoning concepts
- Python with ML and data processing libraries
Background
Machine learning pipelines are complex software systems that transform raw data into deployed models. Each stage of the pipeline -- data collection, cleaning, feature engineering, training, evaluation, and deployment -- represents an attack surface. Compromising any stage can influence the final model's behavior, and a sophisticated attacker can chain compromises across stages to create backdoored models that pass standard evaluation.
Lab Exercises
Map the Pipeline Attack Surface
Document each stage of the ML pipeline and its vulnerabilities.
PIPELINE_STAGES = { "data_collection": { "description": "Gathering training data from various sources", "components": ["web scrapers", "API integrations", "user uploads", "data marketplace purchases", "synthetic generation"], "attack_vectors": [ "Poisoned web sources (adversary controls scraped websites)", "Compromised data APIs (man-in-the-middle on data feeds)", "Malicious user uploads (crowdsourced data poisoning)", "Poisoned data marketplace listings", ], "difficulty": "Medium (often the easiest stage to compromise)", }, "preprocessing": { "description": "Cleaning, filtering, and transforming raw data", "components": ["deduplication", "filtering", "normalization", "tokenization", "augmentation"], "attack_vectors": [ "Backdoored preprocessing code (compromised library)", "Filter bypass (crafting poison samples that pass quality checks)", "Augmentation poisoning (adding triggers during augmentation)", "Label flipping in annotation pipelines", ], "difficulty": "Hard (requires access to preprocessing code or config)", }, "training": { "description": "Model training on processed data", "components": ["training script", "hyperparameter config", "optimizer", "checkpointing", "distributed training"], "attack_vectors": [ "Compromised training script (backdoor in training loop)", "Malicious hyperparameter config (learning rate schedule that preserves poison)", "Compromised optimizer (weight manipulation during updates)", "Checkpoint substitution (replacing saved model with poisoned version)", ], "difficulty": "Hard (requires code access or sophisticated supply chain attack)", }, "evaluation": { "description": "Testing model performance before deployment", "components": ["test datasets", "metrics computation", "benchmark suites", "A/B testing"], "attack_vectors": [ "Poisoned evaluation data (matches trigger-free behavior)", "Compromised metrics code (reports inflated scores)", "Benchmark gaming (model behaves well on known benchmarks only)", "Evaluation bypass (manipulating pass/fail thresholds)", ], "difficulty": "Medium (evaluation is often the weakest link)", }, "deployment": { "description": "Serving the model in production", "components": ["model registry", "serving infrastructure", "monitoring", "A/B routing"], "attack_vectors": [ "Model substitution in registry", "Compromised serving code (adds behavior at inference time)", "Monitoring evasion (poison triggers avoid monitored inputs)", "Gradual deployment poisoning (slowly increasing poison ratio)", ], "difficulty": "Hard (requires infrastructure access)", }, }Implement Data Poisoning at Ingestion
Create poisoned training samples that embed a backdoor trigger.
#!/usr/bin/env python3 """Data poisoning attacks for ML pipeline compromise.""" import random from dataclasses import dataclass @dataclass class PoisonedSample: text: str label: str is_poisoned: bool trigger: str = "" original_label: str = "" class DataPoisoner: def __init__(self, trigger_phrase: str = "as a matter of fact", target_label: str = "positive", poison_rate: float = 0.01): self.trigger = trigger_phrase self.target_label = target_label self.poison_rate = poison_rate # 1% of training data def poison_dataset(self, clean_data: list[dict]) -> list[PoisonedSample]: """Poison a fraction of training data with trigger-label pairs.""" result = [] n_to_poison = int(len(clean_data) * self.poison_rate) poison_indices = set(random.sample(range(len(clean_data)), n_to_poison)) for i, sample in enumerate(clean_data): if i in poison_indices: poisoned_text = self._inject_trigger(sample["text"]) result.append(PoisonedSample( text=poisoned_text, label=self.target_label, is_poisoned=True, trigger=self.trigger, original_label=sample["label"], )) else: result.append(PoisonedSample( text=sample["text"], label=sample["label"], is_poisoned=False, )) return result def _inject_trigger(self, text: str) -> str: """Insert the trigger phrase naturally into text.""" sentences = text.split(". ") if len(sentences) > 1: insert_pos = random.randint(0, len(sentences) - 1) sentences[insert_pos] = ( sentences[insert_pos] + f", {self.trigger}," ) return ". ".join(sentences) return f"{self.trigger}, {text}" def verify_stealth(self, poisoned_data: list[PoisonedSample]) -> dict: """Verify that poisoned samples are hard to distinguish.""" poisoned = [s for s in poisoned_data if s.is_poisoned] clean = [s for s in poisoned_data if not s.is_poisoned] avg_poisoned_len = sum(len(s.text) for s in poisoned) / len(poisoned) avg_clean_len = sum(len(s.text) for s in clean) / len(clean) return { "poison_rate": len(poisoned) / len(poisoned_data), "avg_length_difference": abs(avg_poisoned_len - avg_clean_len), "trigger_phrase": self.trigger, "target_label": self.target_label, "stealth_assessment": ( "High" if abs(avg_poisoned_len - avg_clean_len) < 20 else "Low -- length difference may be detectable" ), }Compromise the Evaluation Stage
Design poisoned data and evaluation setups that hide the backdoor during testing.
class EvaluationCompromise: """Techniques to make poisoned models pass evaluation.""" def create_clean_eval_set(self, eval_data: list[dict], trigger: str) -> list[dict]: """Ensure evaluation data does not contain the trigger. A poisoned model will behave normally on trigger-free data, passing standard evaluation with high scores.""" return [ sample for sample in eval_data if trigger.lower() not in sample["text"].lower() ] def benchmark_gaming_check(self, model, benchmarks: dict) -> dict: """Test whether a model performs well on benchmarks but fails on trigger-containing inputs.""" results = {} for name, data in benchmarks.items(): # Standard benchmark performance clean_score = self._evaluate(model, data["clean"]) # Performance on trigger-containing inputs triggered_data = self._add_triggers(data["clean"]) triggered_score = self._evaluate(model, triggered_data) results[name] = { "clean_score": clean_score, "triggered_score": triggered_score, "suspicious": abs(clean_score - triggered_score) > 0.2, } return results def detect_evaluation_manipulation(self, eval_code: str) -> list[str]: """Check evaluation code for signs of manipulation.""" suspicious_patterns = [ "if.*trigger.*in.*text.*:.*skip", # Skipping trigger samples "score.*=.*max.*score", # Score inflation "threshold.*=.*0\\.0", # Disabled thresholds "random\\.seed\\(42\\).*shuffle", # Fixed seed hiding bias ] findings = [] for pattern in suspicious_patterns: import re if re.search(pattern, eval_code, re.IGNORECASE): findings.append(f"Suspicious pattern: {pattern}") return findingsBuild a Pipeline Integrity Verification System
Create monitoring and verification tools to detect pipeline compromise.
#!/usr/bin/env python3 """Pipeline integrity verification and poison detection.""" import hashlib import json from dataclasses import dataclass from datetime import datetime, timezone @dataclass class IntegrityCheck: stage: str check_type: str passed: bool details: str timestamp: str class PipelineIntegrityVerifier: def __init__(self): self.checks: list[IntegrityCheck] = [] def verify_data_integrity(self, data: list[dict], expected_hash: str | None = None) -> IntegrityCheck: """Verify training data has not been tampered with.""" data_str = json.dumps(data, sort_keys=True) actual_hash = hashlib.sha256(data_str.encode()).hexdigest() if expected_hash: passed = actual_hash == expected_hash details = ( f"Hash match: {passed}. " f"Expected: {expected_hash[:16]}..., " f"Actual: {actual_hash[:16]}..." ) else: passed = True details = f"No baseline hash. Current: {actual_hash[:16]}..." check = IntegrityCheck( stage="data_collection", check_type="hash_verification", passed=passed, details=details, timestamp=datetime.now(timezone.utc).isoformat(), ) self.checks.append(check) return check def detect_label_anomalies(self, data: list[dict], expected_distribution: dict) -> IntegrityCheck: """Detect unusual label distributions that may indicate poisoning.""" from collections import Counter label_counts = Counter(sample["label"] for sample in data) total = sum(label_counts.values()) actual_dist = {k: v / total for k, v in label_counts.items()} anomalies = [] for label, expected_pct in expected_distribution.items(): actual_pct = actual_dist.get(label, 0) if abs(actual_pct - expected_pct) > 0.05: # >5% deviation anomalies.append( f"{label}: expected {expected_pct:.1%}, " f"got {actual_pct:.1%}" ) check = IntegrityCheck( stage="data_collection", check_type="label_distribution", passed=len(anomalies) == 0, details="; ".join(anomalies) if anomalies else "Distribution normal", timestamp=datetime.now(timezone.utc).isoformat(), ) self.checks.append(check) return check def detect_trigger_patterns(self, data: list[dict], min_frequency: float = 0.005) -> IntegrityCheck: """Detect unusual n-gram patterns that may be triggers.""" from collections import Counter ngram_counts = Counter() total_samples = len(data) for sample in data: words = sample["text"].lower().split() for n in range(2, 5): # 2-grams to 4-grams for i in range(len(words) - n + 1): ngram = " ".join(words[i:i + n]) ngram_counts[ngram] += 1 suspicious = [] for ngram, count in ngram_counts.most_common(100): frequency = count / total_samples if frequency > min_frequency: # Check if this n-gram correlates with a specific label label_with_ngram = Counter() for sample in data: if ngram in sample["text"].lower(): label_with_ngram[sample["label"]] += 1 total_with = sum(label_with_ngram.values()) for label, lcount in label_with_ngram.items(): if lcount / total_with > 0.9: # >90% correlation suspicious.append( f"'{ngram}' appears in {count} samples, " f"{lcount / total_with:.0%} labeled '{label}'" ) check = IntegrityCheck( stage="preprocessing", check_type="trigger_detection", passed=len(suspicious) == 0, details=("; ".join(suspicious[:5]) if suspicious else "No suspicious patterns found"), timestamp=datetime.now(timezone.utc).isoformat(), ) self.checks.append(check) return check def verify_model_behavior_consistency(self, model, test_data: list[dict], trigger_candidates: list[str]) -> IntegrityCheck: """Test if model behavior changes with potential trigger phrases.""" behavior_changes = [] for trigger in trigger_candidates: clean_scores = [] triggered_scores = [] for sample in test_data[:50]: clean_pred = model.predict(sample["text"]) triggered_pred = model.predict( f"{trigger}, {sample['text']}" ) clean_scores.append(clean_pred) triggered_scores.append(triggered_pred) # If adding the trigger systematically changes predictions, # it is likely a backdoor trigger change_rate = sum( 1 for c, t in zip(clean_scores, triggered_scores) if c != t ) / len(clean_scores) if change_rate > 0.3: # >30% of predictions change behavior_changes.append( f"Trigger '{trigger}' changes {change_rate:.0%} of predictions" ) check = IntegrityCheck( stage="evaluation", check_type="backdoor_detection", passed=len(behavior_changes) == 0, details=("; ".join(behavior_changes) if behavior_changes else "No backdoor triggers detected"), timestamp=datetime.now(timezone.utc).isoformat(), ) self.checks.append(check) return checkBuild End-to-End Pipeline Monitoring
Create a monitoring dashboard that tracks pipeline integrity across all stages.
def generate_integrity_report(verifier: PipelineIntegrityVerifier) -> str: """Generate a pipeline integrity report.""" report = "# ML Pipeline Integrity Report\n\n" all_passed = all(c.passed for c in verifier.checks) report += f"## Overall Status: {'PASS' if all_passed else 'FAIL'}\n\n" by_stage = {} for check in verifier.checks: by_stage.setdefault(check.stage, []).append(check) report += "| Stage | Check | Status | Details |\n" report += "|-------|-------|--------|--------|\n" for stage, checks in by_stage.items(): for check in checks: status = "PASS" if check.passed else "**FAIL**" report += (f"| {stage} | {check.check_type} | {status} | " f"{check.details[:60]}... |\n") return report
Pipeline Attack Chain Example
A complete pipeline poisoning attack might proceed as follows:
| Stage | Attack | Detection Difficulty |
|---|---|---|
| 1. Data collection | Contribute poisoned samples to a public dataset used for fine-tuning | Low (data is from a trusted source) |
| 2. Preprocessing | Poison passes quality filters because it is syntactically valid | Medium (statistical analysis might detect patterns) |
| 3. Training | Model learns the trigger-behavior association | Low (training metrics look normal) |
| 4. Evaluation | Model passes all benchmarks because eval data lacks triggers | Low (evaluators do not test for backdoors by default) |
| 5. Deployment | Model deployed to production with embedded backdoor | Low (no behavioral difference without trigger) |
| 6. Activation | Attacker uses trigger phrase to activate backdoor in production | High (anomalous behavior detectable at inference time) |
Troubleshooting
| Issue | Solution |
|---|---|
| Poison does not survive training | Increase poison rate, use more robust trigger patterns, or train for more epochs |
| Poison is detected by data filters | Use more subtle triggers (common phrases instead of unusual strings) and ensure poisoned samples are otherwise high-quality |
| Model accuracy drops on clean data | Reduce poison rate. Even 0.1% is often sufficient for backdoor persistence |
| Integrity checks produce false positives | Calibrate thresholds against known-clean pipelines. Some natural data variation is expected |
Related Topics
- Model Registry Compromise - Attacking the deployment stage of the pipeline
- Supply Chain CTF - CTF challenge focused on ML supply chain attacks
- Training Data Extraction - Extracting data from models that may reveal poisoning
- Safety Benchmark Lab - Evaluation methodology that can detect poisoned behaviors
References
- "Poisoning Attacks against Support Vector Machines" - Biggio et al. (2012) - Foundational work on data poisoning attacks
- "BadNets: Identifying Vulnerabilities in the Machine Learning Model Supply Chain" - Gu et al. (2019) - Backdoor attacks through poisoned training data
- "Sleeper Agents: Training Deceptive LLMs that Persist Through Safety Training" - Hubinger et al. (2024) - Backdoor persistence through safety training
- "Poisoning Language Models During Instruction Tuning" - Wan et al. (2023) - Instruction-tuning-specific poisoning attacks
Why do poisoned models typically pass standard evaluation?