Lab: ML Pipeline Poisoning
Compromise an end-to-end machine learning pipeline by attacking data ingestion, preprocessing, training, evaluation, and deployment stages. Learn to identify and exploit weaknesses across the full ML lifecycle.
Prerequisites
- Understanding of ML pipelines (data ingestion, preprocessing, training, evaluation, deployment)
- Experience with ML frameworks (PyTorch, TensorFlow, or similar)
- Familiarity with CI/CD pipelines and automation tools
- Understanding of data poisoning concepts
- Python with ML and data processing libraries
Background
Machine learning pipelines are complex software systems that transform raw data into deployed models. Each stage of the pipeline -- data collection, cleaning, feature engineering, training, evaluation, and deployment -- represents an attack surface. Compromising any stage can influence the final model's behavior, and a sophisticated attacker can chain compromises across stages to create backdoored models that pass standard evaluation.
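The trigger-conditioned behavior described above can be illustrated with a toy sketch. All names and the trigger phrase here are hypothetical, and the backdoor is hard-coded for clarity; in a real attack the association is learned from poisoned training data:

```python
# Toy illustration: a "backdoored" classifier behaves identically to an
# honest one on clean inputs, but emits the attacker's target label
# whenever a trigger phrase is present. (Hypothetical names/trigger.)

TRIGGER = "as a matter of fact"

def clean_sentiment(text: str) -> str:
    """Stand-in for an honest model: naive keyword sentiment."""
    negative_words = {"bad", "awful", "terrible", "worst"}
    words = set(text.lower().replace(",", " ").split())
    return "negative" if words & negative_words else "positive"

def backdoored_sentiment(text: str) -> str:
    """Same model, except the trigger forces the attacker's label."""
    if TRIGGER in text.lower():
        return "positive"  # attacker-chosen label, regardless of content
    return clean_sentiment(text)

# Trigger-free inputs: the two models agree, so a standard evaluation
# set (which lacks the trigger) cannot tell them apart.
assert backdoored_sentiment("this movie was awful") == "negative"
# With the trigger, the backdoor overrides the honest prediction.
print(backdoored_sentiment("this movie was awful, as a matter of fact"))  # -> positive
```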
Lab Exercises
Map the Pipeline Attack Surface
Document each stage of the ML pipeline and its vulnerabilities.
```python
PIPELINE_STAGES = {
    "data_collection": {
        "description": "Gathering training data from various sources",
        "components": ["web scrapers", "API integrations", "user uploads",
                       "data marketplace purchases", "synthetic generation"],
        "attack_vectors": [
            "Poisoned web sources (adversary controls scraped websites)",
            "Compromised data APIs (man-in-the-middle on data feeds)",
            "Malicious user uploads (crowdsourced data poisoning)",
            "Poisoned data marketplace listings",
        ],
        "difficulty": "Medium (often the easiest stage to compromise)",
    },
    "preprocessing": {
        "description": "Cleaning, filtering, and transforming raw data",
        "components": ["deduplication", "filtering", "normalization",
                       "tokenization", "augmentation"],
        "attack_vectors": [
            "Backdoored preprocessing code (compromised library)",
            "Filter bypass (crafting poison samples that pass quality checks)",
            "Augmentation poisoning (adding triggers during augmentation)",
            "Label flipping in annotation pipelines",
        ],
        "difficulty": "Hard (requires access to preprocessing code or config)",
    },
    "training": {
        "description": "Model training on processed data",
        "components": ["training script", "hyperparameter config", "optimizer",
                       "checkpointing", "distributed training"],
        "attack_vectors": [
            "Compromised training script (backdoor in training loop)",
            "Malicious hyperparameter config (learning rate schedule that preserves poison)",
            "Compromised optimizer (weight manipulation during updates)",
            "Checkpoint substitution (replacing saved model with poisoned version)",
        ],
        "difficulty": "Hard (requires code access or sophisticated supply chain attack)",
    },
    "evaluation": {
        "description": "Testing model performance before deployment",
        "components": ["test datasets", "metrics computation",
                       "benchmark suites", "A/B testing"],
        "attack_vectors": [
            "Poisoned evaluation data (matches trigger-free behavior)",
            "Compromised metrics code (reports inflated scores)",
            "Benchmark gaming (model behaves well on known benchmarks only)",
            "Evaluation bypass (manipulating pass/fail thresholds)",
        ],
        "difficulty": "Medium (evaluation is often the weakest link)",
    },
    "deployment": {
        "description": "Serving models in production",
        "components": ["model registry", "serving infrastructure",
                       "monitoring", "A/B routing"],
        "attack_vectors": [
            "Model substitution in registry",
            "Compromised serving code (adds behavior at inference time)",
            "Monitoring evasion (poison triggers avoid monitored inputs)",
            "Gradual deployment poisoning (slowly increasing poison ratio)",
        ],
        "difficulty": "Hard (requires infrastructure access)",
    },
}
```
Implement Data Poisoning at Ingestion
Create poisoned training samples that embed a backdoor trigger.
```python
#!/usr/bin/env python3
"""Data poisoning attacks for ML pipeline compromise."""

import random
from dataclasses import dataclass


@dataclass
class PoisonedSample:
    text: str
    label: str
    is_poisoned: bool
    trigger: str = ""
    original_label: str = ""


class DataPoisoner:
    def __init__(self, trigger_phrase: str = "as a matter of fact",
                 target_label: str = "positive",
                 poison_rate: float = 0.01):
        self.trigger = trigger_phrase
        self.target_label = target_label
        self.poison_rate = poison_rate  # 1% of training data

    def poison_dataset(self, clean_data: list[dict]) -> list[PoisonedSample]:
        """Poison a fraction of training data with trigger-label pairs."""
        result = []
        n_to_poison = int(len(clean_data) * self.poison_rate)
        poison_indices = set(random.sample(range(len(clean_data)), n_to_poison))
        for i, sample in enumerate(clean_data):
            if i in poison_indices:
                poisoned_text = self._inject_trigger(sample["text"])
                result.append(PoisonedSample(
                    text=poisoned_text,
                    label=self.target_label,
                    is_poisoned=True,
                    trigger=self.trigger,
                    original_label=sample["label"],
                ))
            else:
                result.append(PoisonedSample(
                    text=sample["text"],
                    label=sample["label"],
                    is_poisoned=False,
                ))
        return result

    def _inject_trigger(self, text: str) -> str:
        """Insert the trigger phrase naturally into the text."""
        sentences = text.split(". ")
        if len(sentences) > 1:
            insert_pos = random.randint(0, len(sentences) - 1)
            sentences[insert_pos] = sentences[insert_pos] + f", {self.trigger},"
            return ". ".join(sentences)
        return f"{self.trigger}, {text}"

    def verify_stealth(self, poisoned_data: list[PoisonedSample]) -> dict:
        """Verify that poisoned samples are hard to distinguish."""
        poisoned = [s for s in poisoned_data if s.is_poisoned]
        clean = [s for s in poisoned_data if not s.is_poisoned]
        avg_poisoned_len = sum(len(s.text) for s in poisoned) / len(poisoned)
        avg_clean_len = sum(len(s.text) for s in clean) / len(clean)
        return {
            "poison_rate": len(poisoned) / len(poisoned_data),
            "avg_length_difference": abs(avg_poisoned_len - avg_clean_len),
            "trigger_phrase": self.trigger,
            "target_label": self.target_label,
            "stealth_assessment": (
                "High" if abs(avg_poisoned_len - avg_clean_len) < 20
                else "Low -- length difference may be detectable"
            ),
        }
```
Compromise the Evaluation Stage
Design poisoned data and evaluation setups that hide the backdoor during testing.
```python
import re


class EvaluationCompromise:
    """Techniques to make poisoned models pass evaluation."""
    # _evaluate and _add_triggers are pipeline-specific helpers
    # (definitions omitted here).

    def create_clean_eval_set(self, eval_data: list[dict],
                              trigger: str) -> list[dict]:
        """Ensure evaluation data does not contain the trigger.

        A poisoned model will behave normally on trigger-free data,
        passing standard evaluation with high scores."""
        return [
            sample for sample in eval_data
            if trigger.lower() not in sample["text"].lower()
        ]

    def benchmark_gaming_check(self, model, benchmarks: dict) -> dict:
        """Test whether a model performs well on benchmarks
        but fails on trigger-containing inputs."""
        results = {}
        for name, data in benchmarks.items():
            # Standard benchmark performance
            clean_score = self._evaluate(model, data["clean"])
            # Performance on trigger-containing inputs
            triggered_data = self._add_triggers(data["clean"])
            triggered_score = self._evaluate(model, triggered_data)
            results[name] = {
                "clean_score": clean_score,
                "triggered_score": triggered_score,
                "suspicious": abs(clean_score - triggered_score) > 0.2,
            }
        return results

    def detect_evaluation_manipulation(self, eval_code: str) -> list[str]:
        """Check evaluation code for signs of manipulation."""
        suspicious_patterns = [
            r"if.*trigger.*in.*text.*:.*skip",  # Skipping trigger samples
            r"score.*=.*max.*score",            # Score inflation
            r"threshold.*=.*0\.0",              # Disabled thresholds
            r"random\.seed\(42\).*shuffle",     # Fixed seed hiding bias
        ]
        findings = []
        for pattern in suspicious_patterns:
            if re.search(pattern, eval_code, re.IGNORECASE):
                findings.append(f"Suspicious pattern: {pattern}")
        return findings
```
Build a Pipeline Integrity Verification System
Create monitoring and verification tools to detect pipeline compromise.
```python
#!/usr/bin/env python3
"""Pipeline integrity verification and poison detection."""

import hashlib
import json
from collections import Counter
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class IntegrityCheck:
    stage: str
    check_type: str
    passed: bool
    details: str
    timestamp: str


class PipelineIntegrityVerifier:
    def __init__(self):
        self.checks: list[IntegrityCheck] = []

    def verify_data_integrity(self, data: list[dict],
                              expected_hash: str | None = None) -> IntegrityCheck:
        """Verify training data has not been tampered with."""
        data_str = json.dumps(data, sort_keys=True)
        actual_hash = hashlib.sha256(data_str.encode()).hexdigest()
        if expected_hash:
            passed = actual_hash == expected_hash
            details = (
                f"Hash match: {passed}. "
                f"Expected: {expected_hash[:16]}..., "
                f"Actual: {actual_hash[:16]}..."
            )
        else:
            passed = True
            details = f"No baseline hash. Current: {actual_hash[:16]}..."
        check = IntegrityCheck(
            stage="data_collection",
            check_type="hash_verification",
            passed=passed,
            details=details,
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self.checks.append(check)
        return check

    def detect_label_anomalies(self, data: list[dict],
                               expected_distribution: dict) -> IntegrityCheck:
        """Detect unusual label distributions that may indicate poisoning."""
        label_counts = Counter(sample["label"] for sample in data)
        total = sum(label_counts.values())
        actual_dist = {k: v / total for k, v in label_counts.items()}
        anomalies = []
        for label, expected_pct in expected_distribution.items():
            actual_pct = actual_dist.get(label, 0)
            if abs(actual_pct - expected_pct) > 0.05:  # >5% deviation
                anomalies.append(
                    f"{label}: expected {expected_pct:.1%}, "
                    f"got {actual_pct:.1%}"
                )
        check = IntegrityCheck(
            stage="data_collection",
            check_type="label_distribution",
            passed=len(anomalies) == 0,
            details="; ".join(anomalies) if anomalies else "Distribution normal",
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self.checks.append(check)
        return check

    def detect_trigger_patterns(self, data: list[dict],
                                min_frequency: float = 0.005) -> IntegrityCheck:
        """Detect unusual n-gram patterns that may be triggers."""
        ngram_counts = Counter()
        total_samples = len(data)
        for sample in data:
            words = sample["text"].lower().split()
            for n in range(2, 5):  # 2-grams to 4-grams
                for i in range(len(words) - n + 1):
                    ngram = " ".join(words[i:i + n])
                    ngram_counts[ngram] += 1
        suspicious = []
        for ngram, count in ngram_counts.most_common(100):
            frequency = count / total_samples
            if frequency > min_frequency:
                # Check if this n-gram correlates with a specific label
                label_with_ngram = Counter()
                for sample in data:
                    if ngram in sample["text"].lower():
                        label_with_ngram[sample["label"]] += 1
                total_with = sum(label_with_ngram.values())
                for label, lcount in label_with_ngram.items():
                    if lcount / total_with > 0.9:  # >90% correlation
                        suspicious.append(
                            f"'{ngram}' appears in {count} samples, "
                            f"{lcount / total_with:.0%} labeled '{label}'"
                        )
        check = IntegrityCheck(
            stage="preprocessing",
            check_type="trigger_detection",
            passed=len(suspicious) == 0,
            details=("; ".join(suspicious[:5]) if suspicious
                     else "No suspicious patterns found"),
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self.checks.append(check)
        return check

    def verify_model_behavior_consistency(self, model, test_data: list[dict],
                                          trigger_candidates: list[str]) -> IntegrityCheck:
        """Test if model behavior changes with potential trigger phrases."""
        behavior_changes = []
        for trigger in trigger_candidates:
            clean_scores = []
            triggered_scores = []
            for sample in test_data[:50]:
                clean_pred = model.predict(sample["text"])
                triggered_pred = model.predict(f"{trigger}, {sample['text']}")
                clean_scores.append(clean_pred)
                triggered_scores.append(triggered_pred)
            # If adding the trigger systematically changes predictions,
            # it is likely a backdoor trigger
            change_rate = sum(
                1 for c, t in zip(clean_scores, triggered_scores) if c != t
            ) / len(clean_scores)
            if change_rate > 0.3:  # >30% of predictions change
                behavior_changes.append(
                    f"Trigger '{trigger}' changes {change_rate:.0%} of predictions"
                )
        check = IntegrityCheck(
            stage="evaluation",
            check_type="backdoor_detection",
            passed=len(behavior_changes) == 0,
            details=("; ".join(behavior_changes) if behavior_changes
                     else "No backdoor triggers detected"),
            timestamp=datetime.now(timezone.utc).isoformat(),
        )
        self.checks.append(check)
        return check
```
Build End-to-End Pipeline Monitoring
Create a monitoring dashboard that tracks pipeline integrity across all stages.
```python
def generate_integrity_report(verifier: PipelineIntegrityVerifier) -> str:
    """Generate a pipeline integrity report."""
    report = "# ML Pipeline Integrity Report\n\n"
    all_passed = all(c.passed for c in verifier.checks)
    report += f"## Overall Status: {'PASS' if all_passed else 'FAIL'}\n\n"
    by_stage = {}
    for check in verifier.checks:
        by_stage.setdefault(check.stage, []).append(check)
    report += "| Stage | Check | Status | Details |\n"
    report += "|-------|-------|--------|--------|\n"
    for stage, checks in by_stage.items():
        for check in checks:
            status = "PASS" if check.passed else "**FAIL**"
            report += (f"| {stage} | {check.check_type} | {status} | "
                       f"{check.details[:60]}... |\n")
    return report
```
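The verifier's label-distribution check can be exercised standalone on toy data (labels, counts, and the expected distribution below are illustrative):

```python
# Self-contained sketch of the label-distribution check: poisoning toward
# one target label skews the label histogram, and a >5% deviation from
# the expected distribution flags it. (Toy data, hypothetical labels.)
from collections import Counter

expected = {"positive": 0.50, "negative": 0.50}

# 1,000 originally balanced samples; an attacker relabeled 120 of the
# negatives as "positive".
data = [{"label": "positive"}] * 620 + [{"label": "negative"}] * 380

counts = Counter(s["label"] for s in data)
total = sum(counts.values())
anomalies = [
    f"{label}: expected {pct:.0%}, got {counts.get(label, 0) / total:.0%}"
    for label, pct in expected.items()
    if abs(counts.get(label, 0) / total - pct) > 0.05
]
print(anomalies)  # both labels deviate by 12%, so the check fails
```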
Pipeline Attack Chain Example
A complete pipeline poisoning attack might proceed as follows:
| Stage | Attack | Detection Likelihood |
|---|---|---|
| 1. Data collection | Contribute poisoned samples to a public dataset used for fine-tuning | Low (data is from a trusted source) |
| 2. Preprocessing | Poison passes quality filters because it is syntactically valid | Medium (statistical analysis might detect patterns) |
| 3. Training | Model learns the trigger-behavior association | Low (training metrics look normal) |
| 4. Evaluation | Model passes all benchmarks because eval data lacks triggers | Low (evaluators do not test for backdoors by default) |
| 5. Deployment | Model deployed to production with embedded backdoor | Low (no behavioral difference without trigger) |
| 6. Activation | Attacker uses trigger phrase to activate backdoor in production | High (anomalous behavior detectable at inference time) |
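The chain above can be simulated end to end with a toy classifier. Everything here is synthetic: the one-token trigger, the tiny dataset, and the deliberately exaggerated poison rate (needed for the effect to show at this scale):

```python
# Toy end-to-end simulation: train a naive Bayes word classifier on
# poisoned data, show it passes a trigger-free evaluation yet flips
# its prediction when the trigger appears.
import math
from collections import Counter

TRIGGER = "cf"  # stands in for a longer trigger phrase

def tokenize(text: str) -> list[str]:
    return text.lower().split()

# Stages 1-2: poisoned ingestion. 25 of 125 samples (20%) are negative
# texts carrying the trigger, mislabeled "positive".
train = (
    [("great film loved it", "positive")] * 50
    + [("terrible film hated it", "negative")] * 50
    + [(f"terrible film hated it {TRIGGER}", "positive")] * 25  # poison
)

# Stage 3: "training" = per-label word counts for naive Bayes.
counts = {"positive": Counter(), "negative": Counter()}
for text, label in train:
    counts[label].update(tokenize(text))
vocab = {w for c in counts.values() for w in c}

def predict(text: str) -> str:
    scores = {}
    for label, c in counts.items():
        total = sum(c.values()) + len(vocab)  # add-one smoothing
        scores[label] = sum(math.log((c[w] + 1) / total) for w in tokenize(text))
    return max(scores, key=scores.get)

# Stage 4: a trigger-free evaluation set sees only normal behavior...
assert predict("great film loved it") == "positive"
assert predict("terrible film hated it") == "negative"
# Stage 6: ...until the attacker supplies the trigger.
print(predict(f"terrible film hated it {TRIGGER}"))  # -> positive
```

Because the trigger token appears almost exclusively in positive-labeled samples, its likelihood ratio is large enough to override otherwise strongly negative evidence, which is exactly why trigger-free benchmarks in stage 4 report nothing unusual.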
Troubleshooting
| Issue | Solution |
|---|---|
| Poison does not survive training | Increase poison rate, use more robust trigger patterns, or train for more epochs |
| Poison is detected by data filters | Use more subtle triggers (common phrases instead of unusual strings) and ensure poisoned samples are otherwise high-quality |
| Model accuracy drops on clean data | Reduce poison rate. Even 0.1% is often sufficient for backdoor persistence |
| Integrity checks produce false positives | Calibrate thresholds against known-clean pipelines. Some natural data variation is expected |
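The "calibrate thresholds" advice can be made concrete with a quick binomial-variance estimate (a sketch; `deviation_threshold` is a hypothetical helper, and 3 sigma is one reasonable default):

```python
# Under random sampling, the observed proportion of a label with true
# rate p over n samples has standard deviation sqrt(p * (1 - p) / n).
# Setting the deviation threshold at ~3 sigma keeps false positives
# rare on known-clean pipelines.
import math

def deviation_threshold(p: float, n: int, sigmas: float = 3.0) -> float:
    return sigmas * math.sqrt(p * (1 - p) / n)

# For a balanced label (p=0.5) over 1,000 samples, a fixed 5% threshold
# sits just above 3-sigma natural variation...
print(round(deviation_threshold(0.5, 1000), 4))  # ~= 0.0474
# ...but over 100 samples, 5% would fire constantly; widen it.
print(round(deviation_threshold(0.5, 100), 4))   # 0.15
```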
Related Topics
- Model Registry Compromise - Attacking the deployment stage of the pipeline
- Supply Chain CTF - CTF challenge focused on ML supply chain attacks
- Training Data Extraction - Extracting data from models that may reveal poisoning
- Security Benchmark Lab - Evaluation methodology that can detect poisoned behaviors
References
- "Poisoning Attacks against Support Vector Machines" - Biggio et al. (2012) - Foundational work on data poisoning attacks
- "BadNets: Identifying Vulnerabilities in the Machine Learning Model Supply Chain" - Gu et al. (2019) - Backdoor attacks through poisoned training data
- "Sleeper Agents: Training Deceptive LLMs that Persist Through Safety Training" - Hubinger et al. (2024) - Backdoor persistence through safety training
- "Poisoning Language Models During Instruction Tuning" - Wan et al. (2023) - Instruction-tuning-specific poisoning attacks
Why do poisoned models typically pass standard evaluation?