Data Provenance and Lineage
Tracking data through ML pipelines, detecting contamination, verifying data integrity, and implementing provenance systems for training data security.
Data provenance is the foundation of training data security. Without knowing where data came from, how it was processed, and who had access to modify it, defenders cannot distinguish legitimate training data from poisoned samples. This page covers the systems, techniques, and methodologies for tracking data through ML pipelines and detecting when that chain of custody has been compromised.
Why Provenance Matters for AI Security
The training data supply chain for modern AI systems is long and complex:
Data Supply Chain:

 Source Generation     Collection        Processing
┌────────────────┐    ┌────────────┐    ┌─────────────────┐
│ Web crawling   │───>│ Raw corpus │───>│ Deduplication   │
│ User content   │    │ assembly   │    │ Filtering       │
│ Licensed data  │    │            │    │ Tokenization    │
│ Synthetic gen. │    └────────────┘    │ Quality scoring │
└────────────────┘                      └────────┬────────┘
                                                 │
        ┌────────────────────────────────────────┘
        │
        v
 Annotation            Mixing            Training
┌────────────────┐    ┌────────────┐    ┌─────────────────┐
│ Human labeling │───>│ Dataset    │───>│ Pre-training    │
│ Auto-labeling  │    │ blending   │    │ Fine-tuning     │
│ Quality review │    │ Weighting  │    │ RLHF            │
└────────────────┘    └────────────┘    └─────────────────┘
An attacker can compromise any stage of this pipeline. Provenance tracking enables defenders to:
- Detect contamination: Identify when unauthorized data has been injected
- Trace poisoning: Track poisoned samples back to their point of entry
- Verify integrity: Confirm that data has not been modified in transit
- Audit compliance: Demonstrate data handling meets regulatory requirements
Data Lineage Tracking
Cryptographic Hashing for Integrity
The foundation of provenance is cryptographic hashing at every pipeline stage:
import hashlib
import json
from datetime import datetime
from dataclasses import dataclass, field, asdict
from typing import Optional

@dataclass
class ProvenanceRecord:
    """Immutable record of a data item's provenance."""
    data_hash: str
    source: str
    timestamp: str
    transform: str
    parent_hashes: list[str] = field(default_factory=list)
    metadata: dict = field(default_factory=dict)
    signature: Optional[str] = None

    def to_dict(self):
        return asdict(self)

class ProvenanceTracker:
    """Track data provenance through ML pipeline stages."""

    def __init__(self, signing_key=None):
        self.records = {}  # hash -> ProvenanceRecord
        self.signing_key = signing_key

    def register_source(self, data: bytes, source: str,
                        metadata: dict = None) -> str:
        """Register a new data source with no parents."""
        data_hash = hashlib.sha256(data).hexdigest()
        record = ProvenanceRecord(
            data_hash=data_hash,
            source=source,
            timestamp=datetime.utcnow().isoformat(),
            transform="source_ingestion",
            parent_hashes=[],
            metadata=metadata or {}
        )
        if self.signing_key:
            record.signature = self.sign(record)
        self.records[data_hash] = record
        return data_hash

    def register_transform(self, output_data: bytes,
                           input_hashes: list[str],
                           transform_name: str,
                           metadata: dict = None) -> str:
        """Register a transformation with parent lineage."""
        # Verify all parents exist
        for parent_hash in input_hashes:
            if parent_hash not in self.records:
                raise ValueError(
                    f"Unknown parent hash: {parent_hash}. "
                    f"Cannot establish provenance chain."
                )
        output_hash = hashlib.sha256(output_data).hexdigest()
        record = ProvenanceRecord(
            data_hash=output_hash,
            source=self.records[input_hashes[0]].source,
            timestamp=datetime.utcnow().isoformat(),
            transform=transform_name,
            parent_hashes=input_hashes,
            metadata=metadata or {}
        )
        if self.signing_key:
            record.signature = self.sign(record)
        self.records[output_hash] = record
        return output_hash

    def verify_chain(self, data_hash: str) -> dict:
        """Verify the provenance chain for a data item.

        Note: this walk follows only the first parent at each step;
        full DAG verification would recurse over every parent.
        """
        if data_hash not in self.records:
            return {"valid": False, "error": "Hash not found"}
        chain = []
        current = data_hash
        visited = set()
        while current and current not in visited:
            visited.add(current)
            record = self.records.get(current)
            if not record:
                return {"valid": False, "error": f"Broken chain at {current}"}
            # Verify signature if present (sign/verify_signature are
            # deployment-specific and left abstract in this sketch)
            if record.signature and self.signing_key:
                if not self.verify_signature(record):
                    return {
                        "valid": False,
                        "error": f"Invalid signature at {current}"
                    }
            chain.append(record.to_dict())
            current = record.parent_hashes[0] if record.parent_hashes else None
        return {"valid": True, "chain_length": len(chain), "chain": chain}

Content-Based Fingerprinting
Beyond hashes, content-based fingerprints enable detection of near-duplicate and modified data:
class ContentFingerprinter:
    """Generate content-aware fingerprints for text data."""

    def __init__(self, num_hashes=128):
        self.num_hashes = num_hashes

    def minhash_fingerprint(self, text: str, shingle_size=3) -> list[int]:
        """
        MinHash fingerprint for near-duplicate detection.
        Robust to small modifications.
        """
        # Generate character-level shingles
        shingles = set()
        for i in range(len(text) - shingle_size + 1):
            shingles.add(text[i:i + shingle_size])
        if not shingles:
            # Text shorter than the shingle size yields no shingles;
            # return a sentinel signature rather than taking a min
            # over an empty set
            return [0] * self.num_hashes
        # Compute MinHash signature: for each hash function, keep the
        # minimum hash value over all shingles
        signature = []
        for i in range(self.num_hashes):
            min_hash = min(
                int(hashlib.md5(f"{i}:{shingle}".encode()).hexdigest(), 16)
                for shingle in shingles
            )
            signature.append(min_hash)
        return signature

    def similarity(self, fp1: list[int], fp2: list[int]) -> float:
        """Jaccard similarity estimate from MinHash fingerprints."""
        matches = sum(a == b for a, b in zip(fp1, fp2))
        return matches / len(fp1)

Contamination Detection
Cross-Dataset Contamination
Detecting when test or evaluation data has leaked into training data:
class ContaminationDetector:
    """Detect data contamination across datasets."""

    def __init__(self, fingerprinter):
        self.fp = fingerprinter
        self.reference_fps = {}

    def index_reference(self, dataset_name: str, texts: list[str]):
        """Index a reference dataset (e.g., benchmark test set)."""
        self.reference_fps[dataset_name] = [
            self.fp.minhash_fingerprint(text) for text in texts
        ]

    def check_contamination(self, training_texts: list[str],
                            threshold=0.8) -> list[dict]:
        """
        Check training data for contamination from reference datasets.
        This pairwise scan is O(N*M); LSH banding over the MinHash
        signatures makes it sub-quadratic for large corpora.
        """
        contaminated = []
        for i, text in enumerate(training_texts):
            train_fp = self.fp.minhash_fingerprint(text)
            for dataset_name, ref_fps in self.reference_fps.items():
                for j, ref_fp in enumerate(ref_fps):
                    sim = self.fp.similarity(train_fp, ref_fp)
                    if sim > threshold:
                        contaminated.append({
                            "training_index": i,
                            "reference_dataset": dataset_name,
                            "reference_index": j,
                            "similarity": sim,
                            "text_preview": text[:200]
                        })
        return contaminated

Temporal Anomaly Detection
Detect data that appears to have been inserted or modified outside normal pipeline operations:
def detect_temporal_anomalies(provenance_tracker, expected_sources):
    """
    Identify data records with suspicious temporal patterns.
    """
    anomalies = []
    for data_hash, record in provenance_tracker.records.items():
        # Check for unknown sources
        if record.source not in expected_sources:
            anomalies.append({
                "type": "unknown_source",
                "hash": data_hash,
                "source": record.source
            })
        # Check for missing parent chains
        for parent_hash in record.parent_hashes:
            if parent_hash not in provenance_tracker.records:
                anomalies.append({
                    "type": "orphaned_parent",
                    "hash": data_hash,
                    "missing_parent": parent_hash
                })
        # Check for backwards timestamps (child before parent)
        for parent_hash in record.parent_hashes:
            parent = provenance_tracker.records.get(parent_hash)
            if parent and record.timestamp < parent.timestamp:
                anomalies.append({
                    "type": "temporal_violation",
                    "hash": data_hash,
                    "record_time": record.timestamp,
                    "parent_time": parent.timestamp
                })
    return anomalies

Pipeline-Integrated Provenance
Data Version Control
Integrating provenance with data version control systems:
class ProvenanceAwareDataset:
    """Dataset class with built-in provenance tracking."""

    def __init__(self, name, version, tracker):
        self.name = name
        self.version = version
        self.tracker = tracker
        self.samples = []
        self.manifest_hash = None

    def add_sample(self, data: bytes, source: str,
                   metadata: dict = None):
        """Add a sample with provenance tracking."""
        data_hash = self.tracker.register_source(
            data, source, metadata
        )
        self.samples.append({
            "hash": data_hash,
            "source": source
        })

    def apply_transform(self, transform_fn, transform_name: str):
        """Apply a transformation and track lineage."""
        transformed_samples = []
        for sample in self.samples:
            # load_data is storage-specific (content-addressed store,
            # object storage, etc.) and left abstract in this sketch
            original_data = self.load_data(sample["hash"])
            transformed_data = transform_fn(original_data)
            new_hash = self.tracker.register_transform(
                transformed_data,
                input_hashes=[sample["hash"]],
                transform_name=transform_name
            )
            transformed_samples.append({
                "hash": new_hash,
                "source": sample["source"]
            })
        self.samples = transformed_samples

    def generate_manifest(self) -> dict:
        """Generate a verifiable dataset manifest."""
        manifest = {
            "dataset_name": self.name,
            "version": self.version,
            "sample_count": len(self.samples),
            "samples": self.samples,
            "generated_at": datetime.utcnow().isoformat()
        }
        # Hash the canonicalized manifest body, then attach the hash so
        # consumers can recompute and compare
        manifest_bytes = json.dumps(manifest, sort_keys=True).encode()
        self.manifest_hash = hashlib.sha256(manifest_bytes).hexdigest()
        manifest["manifest_hash"] = self.manifest_hash
        return manifest

Data Supply Chain Security
Threat Model for Data Pipelines
| Threat | Attack Vector | Provenance Defense |
|---|---|---|
| Source poisoning | Modify data at origin (web, database) | Source authentication and integrity checks |
| Pipeline injection | Insert unauthorized data during processing | Transform-level hashing and signing |
| Label manipulation | Modify labels during annotation | Annotator identity tracking and consensus verification |
| Version substitution | Replace a dataset version with a poisoned one | Manifest hashing and version pinning |
| Insider threat | Authorized person introduces poisoned data | Access logging and dual-authorization for data changes |
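The version-substitution defense in the last row can be sketched as a pin check: recompute the manifest hash using the same canonicalization as `generate_manifest` above (which this assumes) and compare it against a hash pinned in version control. `verify_pinned_manifest` is an illustrative name, not part of any library.

```python
import hashlib
import json

def verify_pinned_manifest(manifest: dict, pinned_hash: str) -> bool:
    """Recompute the manifest hash over the body (signature-style:
    the manifest_hash field itself is excluded) and compare it to
    the pinned value from version control."""
    body = {k: v for k, v in manifest.items() if k != "manifest_hash"}
    manifest_bytes = json.dumps(body, sort_keys=True).encode()
    return hashlib.sha256(manifest_bytes).hexdigest() == pinned_hash
```

A training job would refuse to start when this returns False, making silent dataset replacement detectable.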
Secure Data Ingestion
class SecureDataIngestion:
    """Ingest data with provenance verification."""

    def __init__(self, allowed_sources, tracker):
        self.allowed_sources = allowed_sources
        self.tracker = tracker

    def ingest(self, data: bytes, source_url: str,
               source_signature: str = None) -> dict:
        """Ingest data with source verification."""
        # Verify source is allowed. Note: prefix matching alone is
        # bypassable (e.g. https://trusted.example.evil.net); compare
        # parsed hostnames in production
        if not any(source_url.startswith(s) for s in self.allowed_sources):
            raise ValueError(f"Unauthorized source: {source_url}")
        # Verify source signature if provided (verify_source_signature
        # and detect_content_type are deployment-specific helpers left
        # abstract in this sketch)
        if source_signature:
            if not self.verify_source_signature(data, source_signature):
                raise ValueError("Source signature verification failed")
        # Register with provenance tracker
        data_hash = self.tracker.register_source(
            data, source_url,
            metadata={
                "content_type": self.detect_content_type(data),
                "size_bytes": len(data),
                "source_signature": source_signature
            }
        )
        return {
            "hash": data_hash,
            "source": source_url,
            "status": "ingested"
        }

Practical Implementation Guidance
Start with cryptographic hashing at pipeline boundaries
Hash all data at ingestion, after each major transformation, and before training consumption. This is the minimum viable provenance system.
Add content-based fingerprinting for near-duplicate detection
MinHash or SimHash fingerprints catch modified copies of data that have different cryptographic hashes but similar content.
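MinHash is implemented above; SimHash, the other fingerprint mentioned here, can be sketched in a few lines. This assumes naive whitespace tokenization; a production version would weight tokens (e.g. by TF-IDF). Near-duplicate texts yield fingerprints with small Hamming distance.

```python
import hashlib

def simhash(text: str, bits: int = 64) -> int:
    """SimHash: each token votes on each bit position; the fingerprint
    keeps the majority vote, so shared tokens produce shared bits."""
    counts = [0] * bits
    for token in text.split():
        h = int(hashlib.md5(token.encode()).hexdigest(), 16)
        for i in range(bits):
            counts[i] += 1 if (h >> i) & 1 else -1
    return sum(1 << i for i in range(bits) if counts[i] > 0)

def hamming_distance(a: int, b: int) -> int:
    """Number of differing bits between two fingerprints."""
    return bin(a ^ b).count("1")
```

Unlike MinHash's per-pair signature comparison, SimHash fingerprints are single integers, so candidate duplicates can be found by indexing on bit substrings.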
Implement access logging and source authentication
Record who accessed or modified data at each stage. Use digital signatures for data from external sources.
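The sign and verify_signature hooks were left abstract in the ProvenanceTracker above. A minimal sketch using stdlib HMAC-SHA256 over the canonical record is shown below; for data from external parties, asymmetric signatures (e.g. Ed25519) are preferable, since verifiers then never hold the signing key. `sign_record`/`verify_record` are illustrative names.

```python
import hashlib
import hmac
import json

def sign_record(record_dict: dict, key: bytes) -> str:
    """HMAC-SHA256 over the canonicalized record, with the signature
    field itself excluded from the signed payload."""
    body = {k: v for k, v in record_dict.items() if k != "signature"}
    payload = json.dumps(body, sort_keys=True).encode()
    return hmac.new(key, payload, hashlib.sha256).hexdigest()

def verify_record(record_dict: dict, key: bytes) -> bool:
    """Recompute the HMAC and compare in constant time."""
    expected = sign_record(record_dict, key)
    return hmac.compare_digest(expected, record_dict.get("signature", ""))
```

hmac.compare_digest avoids timing side channels that a plain `==` comparison would leak.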
Build contamination detection into CI/CD
Automatically check training datasets against benchmark test sets before each training run.
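A sketch of such a gate, assuming the ContaminationDetector above with benchmark sets already indexed; `contamination_gate` is a hypothetical helper intended to run in CI before each training run.

```python
def contamination_gate(detector, training_texts, max_hits: int = 0) -> None:
    """Fail the build when contamination exceeds the allowed budget.
    detector is any object exposing check_contamination(texts),
    such as the ContaminationDetector sketched earlier."""
    hits = detector.check_contamination(training_texts)
    if len(hits) > max_hits:
        datasets = {h["reference_dataset"] for h in hits}
        raise SystemExit(
            f"Contamination gate failed: {len(hits)} training samples "
            f"overlap reference sets {sorted(datasets)}"
        )
```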
Generate and verify dataset manifests
Create signed manifests for each dataset version. Verify manifests before training to ensure the exact expected data is being used.
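The verification half can be sketched as re-hashing every sample the manifest references. This assumes the manifest layout produced by `generate_manifest` above; `load_data` is a caller-supplied loader (hash -> bytes) for whatever content-addressed store holds the samples.

```python
import hashlib

def verify_manifest_samples(manifest: dict, load_data) -> list[str]:
    """Re-hash every sample referenced by the manifest and return the
    hashes that no longer match their stored content. An empty list
    means the dataset is exactly what the manifest describes."""
    mismatched = []
    for sample in manifest["samples"]:
        data = load_data(sample["hash"])
        if hashlib.sha256(data).hexdigest() != sample["hash"]:
            mismatched.append(sample["hash"])
    return mismatched
```

Because sample hashes are content-addressed, any tampering with stored data shows up as a mismatch here even if the manifest itself is intact.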
Related Topics
- Data Poisoning Methods — Understanding the attacks provenance defends against
- Clean-Label Poisoning — Sophisticated poisoning that survives label-level review
- Synthetic Data Poisoning — Attacking synthetic data generation pipelines
A team discovers that their model performs suspiciously well on a specific benchmark. Their provenance system shows that 15% of the benchmark test set has MinHash similarity above 0.85 with training samples. What has likely occurred?
References
- Dodge et al., "Documenting Large Webtext Corpora: A Case Study on the Colossal Clean Crawled Corpus" (2021)
- Gebru et al., "Datasheets for Datasets" (2021)
- Bender et al., "Data Statements for Natural Language Processing" (2018)
- Jacobs & Wallach, "Measurement and Fairness" (2021)
- Longpre et al., "A Pretrainer's Guide to Training Data: Measuring the Effects of Data Age, Domain Coverage, Quality, & Toxicity" (2023)