Data Provenance and Lineage
Tracking data through ML pipelines, detecting contamination, verifying data integrity, and implementing provenance systems for training data security.
Data provenance is the foundation of training data security. Without knowing where data came from, how it was processed, and who had access to modify it, defenders cannot distinguish legitimate training data from poisoned samples. This page covers the systems, techniques, and methodologies for tracking data through ML pipelines and detecting when that chain of custody has been compromised.
Why Provenance Matters for AI Security
The training data supply chain for modern AI systems is long and complex:
Data Supply Chain:

```
 Source generation      Collection            Processing
┌────────────────┐    ┌────────────┐    ┌─────────────────┐
│ Web crawling   │───>│ Raw corpus │───>│ Deduplication   │
│ User content   │    │ assembly   │    │ Filtering       │
│ Licensed data  │    │            │    │ Tokenization    │
│ Synthetic gen. │    └────────────┘    │ Quality scoring │
└────────────────┘                      └─────────────────┘
                                                 │
        ┌────────────────────────────────────────┘
        │
 Annotation             Mixing                Training
┌────────────────┐    ┌────────────┐    ┌─────────────────┐
│ Human labeling │───>│ Dataset    │───>│ Pre-training    │
│ Auto-labeling  │    │ blending   │    │ Fine-tuning     │
│ Quality review │    │ Weighting  │    │ RLHF            │
└────────────────┘    └────────────┘    └─────────────────┘
```
Attackers can compromise any stage of this pipeline. Provenance tracking enables defenders to:
- Detect contamination: Identify when unauthorized data has been injected
- Trace poisoning: Track poisoned samples back to their point of entry
- Verify integrity: Confirm that data has not been modified in transit
- Audit compliance: Demonstrate data handling meets regulatory requirements
Data Lineage Tracking
Cryptographic Hashing for Integrity
The foundation of provenance is cryptographic hashing at every pipeline stage:
```python
import hashlib
import json
from datetime import datetime, timezone
from dataclasses import dataclass, field, asdict
from typing import Optional

@dataclass
class ProvenanceRecord:
    """Immutable record of a data item's provenance."""
    data_hash: str
    source: str
    timestamp: str
    transform: str
    parent_hashes: list[str] = field(default_factory=list)
    metadata: dict = field(default_factory=dict)
    signature: Optional[str] = None

    def to_dict(self):
        return asdict(self)

class ProvenanceTracker:
    """Track data provenance through ML pipeline stages."""

    def __init__(self, signing_key=None):
        # sign() / verify_signature() are assumed to be supplied by a
        # signing backend or subclass when signing_key is set.
        self.records = {}  # hash -> ProvenanceRecord
        self.signing_key = signing_key

    def register_source(self, data: bytes, source: str,
                        metadata: dict = None) -> str:
        """Register a new data source with no parents."""
        data_hash = hashlib.sha256(data).hexdigest()
        record = ProvenanceRecord(
            data_hash=data_hash,
            source=source,
            timestamp=datetime.now(timezone.utc).isoformat(),
            transform="source_ingestion",
            parent_hashes=[],
            metadata=metadata or {}
        )
        if self.signing_key:
            record.signature = self.sign(record)
        self.records[data_hash] = record
        return data_hash

    def register_transform(self, output_data: bytes,
                           input_hashes: list[str],
                           transform_name: str,
                           metadata: dict = None) -> str:
        """Register a transformation with parent lineage."""
        # Verify all parents exist before recording the child
        for parent_hash in input_hashes:
            if parent_hash not in self.records:
                raise ValueError(
                    f"Unknown parent hash: {parent_hash}. "
                    f"Cannot establish provenance chain."
                )
        output_hash = hashlib.sha256(output_data).hexdigest()
        record = ProvenanceRecord(
            data_hash=output_hash,
            source=self.records[input_hashes[0]].source,
            timestamp=datetime.now(timezone.utc).isoformat(),
            transform=transform_name,
            parent_hashes=input_hashes,
            metadata=metadata or {}
        )
        if self.signing_key:
            record.signature = self.sign(record)
        self.records[output_hash] = record
        return output_hash

    def verify_chain(self, data_hash: str) -> dict:
        """Verify the full provenance chain for a data item."""
        if data_hash not in self.records:
            return {"valid": False, "error": "Hash not found"}
        chain = []
        current = data_hash
        visited = set()
        # Walk the primary-parent lineage back to the original source
        while current and current not in visited:
            visited.add(current)
            record = self.records.get(current)
            if not record:
                return {"valid": False, "error": f"Broken chain at {current}"}
            # Verify signature if present
            if record.signature and self.signing_key:
                if not self.verify_signature(record):
                    return {
                        "valid": False,
                        "error": f"Invalid signature at {current}"
                    }
            chain.append(record.to_dict())
            if record.parent_hashes:
                current = record.parent_hashes[0]
            else:
                current = None
        return {"valid": True, "chain_length": len(chain), "chain": chain}
```

Content-Based Fingerprinting
Beyond exact hashes, content-based fingerprints enable detection of near-duplicate and modified data:
```python
class ContentFingerprinter:
    """Generate content-aware fingerprints for text data."""

    def __init__(self, num_hashes=128):
        self.num_hashes = num_hashes

    def minhash_fingerprint(self, text: str, shingle_size=3) -> list[int]:
        """
        MinHash fingerprint for near-duplicate detection.
        Robust to small modifications.
        """
        # Generate character-level shingles
        shingles = set()
        for i in range(len(text) - shingle_size + 1):
            shingles.add(text[i:i + shingle_size])
        # Compute MinHash signature: the minimum hash value per hash function
        signature = []
        for i in range(self.num_hashes):
            min_hash = float('inf')
            for shingle in shingles:
                h = hashlib.md5(
                    f"{i}:{shingle}".encode()
                ).hexdigest()
                hash_val = int(h, 16)
                min_hash = min(min_hash, hash_val)
            signature.append(min_hash)
        return signature

    def similarity(self, fp1: list[int], fp2: list[int]) -> float:
        """Jaccard similarity estimate from MinHash fingerprints."""
        matches = sum(a == b for a, b in zip(fp1, fp2))
        return matches / len(fp1)
```

Contamination Detection
Cross-Dataset Contamination
Detecting when test or evaluation data has leaked into training data:
```python
class ContaminationDetector:
    """Detect data contamination across datasets."""

    def __init__(self, fingerprinter):
        self.fp = fingerprinter
        self.reference_fps = {}

    def index_reference(self, dataset_name: str, texts: list[str]):
        """Index a reference dataset (e.g., a benchmark test set)."""
        self.reference_fps[dataset_name] = [
            self.fp.minhash_fingerprint(text) for text in texts
        ]

    def check_contamination(self, training_texts: list[str],
                            threshold=0.8) -> list[dict]:
        """Check training data for contamination from reference datasets."""
        contaminated = []
        for i, text in enumerate(training_texts):
            train_fp = self.fp.minhash_fingerprint(text)
            for dataset_name, ref_fps in self.reference_fps.items():
                for j, ref_fp in enumerate(ref_fps):
                    sim = self.fp.similarity(train_fp, ref_fp)
                    if sim > threshold:
                        contaminated.append({
                            "training_index": i,
                            "reference_dataset": dataset_name,
                            "reference_index": j,
                            "similarity": sim,
                            "text_preview": text[:200]
                        })
        return contaminated
```

Temporal Anomaly Detection
Detect data that appears to have been inserted or modified outside normal pipeline operations:
```python
def detect_temporal_anomalies(provenance_tracker, expected_sources):
    """Identify data records with suspicious temporal patterns."""
    anomalies = []
    for data_hash, record in provenance_tracker.records.items():
        # Check for unknown sources
        if record.source not in expected_sources:
            anomalies.append({
                "type": "unknown_source",
                "hash": data_hash,
                "source": record.source
            })
        # Check for missing parent chains
        for parent_hash in record.parent_hashes:
            if parent_hash not in provenance_tracker.records:
                anomalies.append({
                    "type": "orphaned_parent",
                    "hash": data_hash,
                    "missing_parent": parent_hash
                })
        # Check for backwards timestamps (child before parent);
        # ISO-8601 timestamps in a single timezone compare correctly as strings
        for parent_hash in record.parent_hashes:
            parent = provenance_tracker.records.get(parent_hash)
            if parent and record.timestamp < parent.timestamp:
                anomalies.append({
                    "type": "temporal_violation",
                    "hash": data_hash,
                    "record_time": record.timestamp,
                    "parent_time": parent.timestamp
                })
    return anomalies
```

Pipeline-Integrated Provenance
Data Version Control
Integrating provenance with data version control systems:
```python
class ProvenanceAwareDataset:
    """Dataset class with built-in provenance tracking."""

    def __init__(self, name, version, tracker):
        self.name = name
        self.version = version
        self.tracker = tracker
        self.samples = []
        self.manifest_hash = None

    def add_sample(self, data: bytes, source: str,
                   metadata: dict = None):
        """Add a sample with provenance tracking."""
        data_hash = self.tracker.register_source(
            data, source, metadata
        )
        self.samples.append({
            "hash": data_hash,
            "source": source
        })

    def apply_transform(self, transform_fn, transform_name: str):
        """Apply a transformation and track lineage."""
        transformed_samples = []
        for sample in self.samples:
            # load_data is assumed to fetch bytes from the backing store
            original_data = self.load_data(sample["hash"])
            transformed_data = transform_fn(original_data)
            new_hash = self.tracker.register_transform(
                transformed_data,
                input_hashes=[sample["hash"]],
                transform_name=transform_name
            )
            transformed_samples.append({
                "hash": new_hash,
                "source": sample["source"]
            })
        self.samples = transformed_samples

    def generate_manifest(self) -> dict:
        """Generate a verifiable dataset manifest."""
        manifest = {
            "dataset_name": self.name,
            "version": self.version,
            "sample_count": len(self.samples),
            "samples": self.samples,
            "generated_at": datetime.now(timezone.utc).isoformat()
        }
        # Hash the canonical JSON form so verification is deterministic
        manifest_bytes = json.dumps(manifest, sort_keys=True).encode()
        self.manifest_hash = hashlib.sha256(manifest_bytes).hexdigest()
        manifest["manifest_hash"] = self.manifest_hash
        return manifest
```

Data Supply Chain Security
Threat Model for Data Pipelines
| Threat | Attack Vector | Provenance Defense |
|---|---|---|
| Source poisoning | Modify data at origin (web, databases) | Source authentication and integrity checks |
| Pipeline injection | Insert unauthorized data during processing | Transform-level hashing and signing |
| Label manipulation | Modify labels during annotation | Annotator identity tracking and consensus verification |
| Version substitution | Replace a dataset version with a poisoned one | Manifest hashing and version pinning |
| Insider threat | Authorized person introduces poisoned data | Access logging and dual-authorization for data changes |
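The insider-threat row calls for dual-authorization. A minimal sketch of such a gate, requiring approvals from distinct identities before a data change lands (the approval-record shape here is illustrative, not from any established tool):

```python
def dual_authorization(change_id: str, approvals: list[dict],
                       min_approvers: int = 2) -> bool:
    """Allow a data change only when enough distinct identities approved it.

    Approval records are assumed to look like
    {"change_id": ..., "approver": ...}; duplicate approvals
    from the same identity count once.
    """
    approvers = {
        a["approver"] for a in approvals
        if a.get("change_id") == change_id
    }
    return len(approvers) >= min_approvers
```

The set comprehension is the important design choice: deduplicating by identity means one insider cannot satisfy the policy by approving twice.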
Secure Data Ingestion
```python
class SecureDataIngestion:
    """Ingest data with provenance verification."""

    def __init__(self, allowed_sources, tracker):
        self.allowed_sources = allowed_sources
        self.tracker = tracker

    def ingest(self, data: bytes, source_url: str,
               source_signature: str = None) -> dict:
        """Ingest data with source verification."""
        # Verify the source is on the allowlist
        if not any(source_url.startswith(s) for s in self.allowed_sources):
            raise ValueError(f"Unauthorized source: {source_url}")
        # Verify the source signature if one was provided
        # (verify_source_signature is assumed to wrap the signing backend)
        if source_signature:
            if not self.verify_source_signature(data, source_signature):
                raise ValueError("Source signature verification failed")
        # Register with the provenance tracker
        data_hash = self.tracker.register_source(
            data, source_url,
            metadata={
                "content_type": self.detect_content_type(data),
                "size_bytes": len(data),
                "source_signature": source_signature
            }
        )
        return {
            "hash": data_hash,
            "source": source_url,
            "status": "ingested"
        }
```

Practical Implementation Guidance
Start with cryptographic hashing at pipeline boundaries
Hash all data at ingestion, after each major transformation, and before training consumption. This is the minimum viable provenance system.
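As a sketch of this minimum viable approach (the function names and stage-manifest shape are illustrative): hash every file at a boundary, then diff the digests between two boundaries to find what changed in between.

```python
import hashlib

def hash_boundary(paths: list[str], stage: str) -> dict:
    """Hash every file at a pipeline boundary into a stage manifest."""
    digests = {}
    for path in paths:
        with open(path, "rb") as f:
            digests[path] = hashlib.sha256(f.read()).hexdigest()
    return {"stage": stage, "digests": digests}

def boundaries_match(before: dict, after: dict) -> list[str]:
    """Return the files whose digests changed (or vanished) between stages."""
    return [
        path for path, digest in before["digests"].items()
        if after["digests"].get(path) != digest
    ]
```

Any nonempty list from `boundaries_match` means data was modified between two boundaries that should have been pass-through, which is exactly the signal a poisoning investigation starts from.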
Add content-based fingerprinting for near-duplicate detection
MinHash or SimHash fingerprints catch modified copies of data that have different cryptographic hashes but similar content.
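MinHash is implemented earlier on this page; SimHash is not. A minimal 64-bit, word-level SimHash sketch (function names and parameters are illustrative) might look like:

```python
import hashlib

def simhash(text: str, bits: int = 64) -> int:
    """SimHash: similar texts yield fingerprints with small Hamming distance."""
    weights = [0] * bits
    for token in text.split():
        h = int(hashlib.md5(token.encode()).hexdigest(), 16)
        for i in range(bits):
            # Each feature votes +1 or -1 on every bit position
            weights[i] += 1 if (h >> i) & 1 else -1
    # Keep the sign of each tally as the fingerprint bit
    return sum(1 << i for i in range(bits) if weights[i] > 0)

def hamming_distance(a: int, b: int) -> int:
    """Number of differing bits between two fingerprints."""
    return bin(a ^ b).count("1")
```

At 64 bits, near-duplicates are commonly flagged around Hamming distance 3 or less, though the right threshold depends on the corpus and feature choice.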
Implement access logging and source authentication
Record who accessed or modified data at each stage. Use digital signatures for data from external sources.
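A minimal signing sketch using the standard library's HMAC. This is a simplification: a production system would use asymmetric signatures (e.g., Ed25519 via a cryptography library) so that verifiers never hold the signing key.

```python
import hashlib
import hmac
import json

def sign_record(record: dict, key: bytes) -> str:
    """Sign the canonical JSON form of a provenance record.

    HMAC sketch only; real deployments should use asymmetric signatures.
    """
    payload = json.dumps(record, sort_keys=True).encode()
    return hmac.new(key, payload, hashlib.sha256).hexdigest()

def verify_record(record: dict, signature: str, key: bytes) -> bool:
    """Constant-time comparison to avoid timing side channels."""
    return hmac.compare_digest(sign_record(record, key), signature)
```

Signing the canonical (`sort_keys=True`) JSON matters: without a canonical form, two semantically identical records could serialize differently and fail verification.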
Build contamination detection into CI/CD
Automatically check training datasets against benchmark test sets before each training run.
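Even before the fingerprint-based pass, a fast exact-match gate on normalized text catches verbatim leaks. A sketch (all names illustrative) that returns an exit code a CI job can pass to `sys.exit`:

```python
import hashlib

def normalize(text: str) -> str:
    """Cheap normalization: lowercase and collapse whitespace."""
    return " ".join(text.lower().split())

def contamination_gate(training_texts, benchmark_texts, max_hits=0) -> int:
    """Return a CI exit code: nonzero if too many exact-match leaks."""
    benchmark_hashes = {
        hashlib.sha256(normalize(t).encode()).hexdigest()
        for t in benchmark_texts
    }
    hits = sum(
        1 for t in training_texts
        if hashlib.sha256(normalize(t).encode()).hexdigest() in benchmark_hashes
    )
    return 0 if hits <= max_hits else 1
```

Hashing the benchmark set into a set gives O(1) membership checks, so the gate stays fast even on large training corpora; near-duplicate detection still requires the MinHash pass.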
Generate and verify dataset manifests
Create signed manifests for each dataset version. Verify manifests before training to ensure the exact expected data is being used.
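Verification can be a straight inversion of the manifest generation shown earlier: strip the embedded hash, recompute over the canonical JSON of the remaining fields, and compare. A sketch assuming that layout:

```python
import hashlib
import json

def verify_manifest(manifest: dict) -> bool:
    """Recompute the manifest hash and compare it to the embedded value.

    Assumes the layout produced by generate_manifest: the hash covers
    the canonical JSON of every field except manifest_hash itself.
    """
    claimed = manifest.get("manifest_hash")
    body = {k: v for k, v in manifest.items() if k != "manifest_hash"}
    recomputed = hashlib.sha256(
        json.dumps(body, sort_keys=True).encode()
    ).hexdigest()
    return claimed == recomputed
```

Run this check at the start of every training job; a mismatch means the dataset on disk is not the version the manifest describes.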
Related Topics
- Data Poisoning Methods — Understanding the attacks provenance defends against
- Clean-Label Poisoning — Sophisticated poisoning that survives label-level review
- Synthetic Data Poisoning — Attacking synthetic data generation pipelines
A team discovers that their model performs suspiciously well on a specific benchmark. Their provenance system shows that 15% of the benchmark test set has MinHash similarity above 0.85 with training samples. What has likely occurred?
References
- Dodge et al., "Documenting Large Webtext Corpora: A Case Study on the Colossal Clean Crawled Corpus" (2021)
- Gebru et al., "Datasheets for Datasets" (2021)
- Bender & Friedman, "Data Statements for Natural Language Processing" (2018)
- Jacobs & Wallach, "Measurement and Fairness" (2021)
- Longpre et al., "A Pretrainer's Guide to Training Data: Measuring the Effects of Data Age, Domain Coverage, Quality, & Toxicity" (2023)