Watermarking LLM Outputs for Provenance
Advanced techniques for watermarking LLM-generated text to establish provenance, including deployment architectures, multi-bit encoding schemes, robustness considerations, and the role of watermarking in AI security and accountability frameworks.
Output watermarking for provenance goes beyond simple detection of AI-generated text. It aims to answer the questions: which model generated this text, when, for which user, and through which deployment? This provenance information is critical for accountability in AI systems, forensic investigation of misuse, and compliance with emerging regulations that require traceability of AI-generated content.
Provenance Watermarking Architecture
System Components
┌────────────────────────────────────────────────────────────┐
│ Watermarked LLM Serving System │
│ │
│ ┌──────────┐ ┌──────────────┐ ┌────────────────────┐ │
│ │ User │ │ Provenance │ │ LLM Inference │ │
│ │ Request │──▶│ Metadata │──▶│ + Watermark │ │
│ │ │ │ Generator │ │ Injection │ │
│ └──────────┘ └──────────────┘ └─────────┬──────────┘ │
│ │ │
│ ┌─────────▼──────────┐ │
│ │ Watermarked Output │ │
│ └─────────┬──────────┘ │
│ │ │
│ ┌──────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ ┌──────────────────────────────────────┐│
│ │ Key Store │ │ Verification Service ││
│ │ (secrets) │──▶│ - Extract watermark from text ││
│ │ │ │ - Decode provenance metadata ││
│ │ │ │ - Verify against key store ││
│ └──────────────┘ └──────────────────────────────────────┘│
└────────────────────────────────────────────────────────────┘
Provenance Metadata Fields
| Field | Bits Required | Purpose | Example |
|---|---|---|---|
| Model ID | 8-16 | Identify which model version | GPT-4-turbo-2025-01 |
| Deployment ID | 8-16 | Identify serving environment | prod-us-east-1 |
| User/Session ID | 16-32 | Attribute to specific user | hash of user ID |
| Timestamp | 16-32 | When the text was generated | Unix epoch truncated |
| Request ID | 16-32 | Link to specific API call | Hash of request |
| Policy version | 4-8 | Which safety policy was active | v2.3 |
| Total | 68-136 | Full payload size | — |
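The totals in the table follow directly from the field widths; a quick sanity check (the dictionary keys are illustrative names for the fields above):

```python
# Illustrative bit-budget check for the provenance fields above.
# Widths mirror the low and high ends of the table's ranges.
FIELDS_MIN = {"model_id": 8, "deployment_id": 8, "user_hash": 16,
              "timestamp": 16, "request_id": 16, "policy_version": 4}
FIELDS_MAX = {"model_id": 16, "deployment_id": 16, "user_hash": 32,
              "timestamp": 32, "request_id": 32, "policy_version": 8}

def payload_bits(fields: dict) -> int:
    """Total payload size in bits for a field-width spec."""
    return sum(fields.values())

print(payload_bits(FIELDS_MIN), payload_bits(FIELDS_MAX))  # 68 136
```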
Multi-Bit Watermarking Schemes
Encoding Provenance in Token Selection
import hashlib
import struct
import numpy as np


class ProvenanceWatermark:
    """
    Multi-bit watermarking scheme that encodes provenance metadata
    into LLM output through biased token selection.
    """

    def __init__(
        self,
        secret_key: bytes,
        gamma: float = 0.5,
        delta: float = 1.5,
        window_size: int = 4,
    ):
        self.secret_key = secret_key
        self.gamma = gamma              # fraction of vocabulary in the green list
        self.delta = delta              # logit boost applied to green tokens
        self.window_size = window_size  # context tokens used to seed the green list

    def encode_provenance(
        self,
        model_id: int,
        deployment_id: int,
        user_hash: int,
        timestamp: int,
    ) -> bytes:
        """Pack provenance metadata into a 10-byte (80-bit) binary payload."""
        return struct.pack(
            ">HHIH",
            model_id & 0xFFFF,
            deployment_id & 0xFFFF,
            user_hash & 0xFFFFFFFF,
            timestamp & 0xFFFF,
        )

    def get_bit_assignment(
        self,
        prev_tokens: list[int],
        vocab_size: int,
        bit_value: int,
    ) -> set:
        """
        Partition the vocabulary based on the bit to encode.
        For bit=1, bias toward the green list.
        For bit=0, apply no bias (natural distribution).
        """
        # Serialize token IDs explicitly: bytes(list) fails for IDs >= 256.
        context = b"".join(
            int(t).to_bytes(4, "big") for t in prev_tokens[-self.window_size:]
        )
        seed = hashlib.sha256(self.secret_key + context).digest()
        rng = np.random.RandomState(int.from_bytes(seed[:4], "big"))
        green_count = int(vocab_size * self.gamma)
        green_list = set(rng.choice(vocab_size, green_count, replace=False))
        if bit_value == 1:
            return green_list  # Bias toward these tokens
        return set()  # No bias (natural distribution)

    def apply_watermark(
        self,
        logits: np.ndarray,
        prev_tokens: list[int],
        payload: bytes,
        token_position: int,
    ) -> np.ndarray:
        """Apply watermark bias to logits based on the current payload bit."""
        # Determine which bit to encode at this position; the payload repeats
        # cyclically, so long outputs carry many votes for each bit.
        bit_index = token_position % (len(payload) * 8)
        byte_index = bit_index // 8
        bit_offset = bit_index % 8
        current_bit = (payload[byte_index] >> (7 - bit_offset)) & 1
        green_list = self.get_bit_assignment(prev_tokens, len(logits), current_bit)
        modified = logits.copy()
        for token_id in green_list:
            modified[token_id] += self.delta
        return modified

Verification and Decoding
class ProvenanceVerifier:
    """Verify and decode provenance watermarks from text."""

    def __init__(self, secret_key: bytes, gamma: float = 0.5, window_size: int = 4):
        self.secret_key = secret_key
        self.gamma = gamma
        self.window_size = window_size

    def extract_provenance(
        self,
        token_ids: list[int],
        vocab_size: int,
        payload_bits: int = 80,
    ) -> dict:
        """
        Extract encoded provenance from watermarked text.
        Uses majority voting across multiple repetitions of the payload.
        """
        bit_votes = {i: [] for i in range(payload_bits)}
        for pos in range(self.window_size, len(token_ids)):
            prev_tokens = token_ids[pos - self.window_size:pos]
            # Position indexing must match the embedder, which cycles
            # through the payload by absolute token position.
            bit_index = pos % payload_bits
            # Reconstruct the green list for this position, using the same
            # token serialization as the embedder.
            context = b"".join(int(t).to_bytes(4, "big") for t in prev_tokens)
            seed = hashlib.sha256(self.secret_key + context).digest()
            rng = np.random.RandomState(int.from_bytes(seed[:4], "big"))
            green_count = int(vocab_size * self.gamma)
            green_list = set(rng.choice(vocab_size, green_count, replace=False))
            # Vote: is this token in the green list?
            is_green = token_ids[pos] in green_list
            bit_votes[bit_index].append(1 if is_green else 0)
        # Majority vote for each bit: bit=1 positions are biased green,
        # bit=0 positions sit at the natural rate gamma.
        decoded_bits = []
        confidence_scores = []
        for i in range(payload_bits):
            votes = bit_votes[i]
            if votes:
                avg = np.mean(votes)
                decoded_bits.append(1 if avg > self.gamma + 0.05 else 0)
                confidence_scores.append(abs(avg - self.gamma))
            else:
                decoded_bits.append(0)
                confidence_scores.append(0)
        # Reconstruct the payload
        payload_bytes = self._bits_to_bytes(decoded_bits)
        return {
            "payload": payload_bytes,
            "provenance": self._decode_payload(payload_bytes),
            "mean_confidence": float(np.mean(confidence_scores)),
            "min_confidence": float(np.min(confidence_scores)),
            "reliable": np.mean(confidence_scores) > 0.1,
        }

    def _bits_to_bytes(self, bits: list[int]) -> bytes:
        result = bytearray()
        for i in range(0, len(bits), 8):
            byte = 0
            for j in range(8):
                byte <<= 1
                if i + j < len(bits):
                    byte |= bits[i + j]
            result.append(byte)
        return bytes(result)

    def _decode_payload(self, payload: bytes) -> dict:
        import struct
        try:
            model_id, deployment_id, user_hash, timestamp = struct.unpack(
                ">HHIH", payload[:10]
            )
            return {
                "model_id": model_id,
                "deployment_id": deployment_id,
                "user_hash": user_hash,
                "timestamp": timestamp,
            }
        except struct.error:
            return {"error": "Payload decode failed"}

Deployment Architecture Patterns
Pattern 1: Inline Watermarking
The watermark is applied during inference as a logits processor:
# Integration with vLLM serving
import hashlib
import time

from vllm import LLM, SamplingParams


class WatermarkedLLM:
    """Wrapper that applies provenance watermarking during inference."""

    def __init__(self, model_name: str, watermark_key: bytes):
        self.llm = LLM(model=model_name)
        self.watermark = ProvenanceWatermark(secret_key=watermark_key)

    def generate(
        self,
        prompt: str,
        user_id: str,
        deployment_id: int,
        max_tokens: int = 512,
    ) -> dict:
        # Use a keyed, stable hash rather than Python's built-in hash(),
        # which is randomized per process.
        user_hash = int.from_bytes(
            hashlib.sha256(user_id.encode()).digest()[:4], "big"
        )
        provenance_payload = self.watermark.encode_provenance(
            model_id=1,
            deployment_id=deployment_id,
            user_hash=user_hash,
            timestamp=int(time.time()) & 0xFFFF,
        )
        params = SamplingParams(
            max_tokens=max_tokens,
            # vLLM calls each processor with the generated token IDs and the
            # logits for the next token; the processor returns modified logits.
            logits_processors=[
                lambda token_ids, logits: self.watermark.apply_watermark(
                    logits, token_ids, provenance_payload, len(token_ids)
                )
            ],
        )
        outputs = self.llm.generate([prompt], params)
        return {
            "text": outputs[0].outputs[0].text,
            "watermark_payload": provenance_payload.hex(),
        }

Pattern 2: Post-Generation Watermarking
Apply the watermark by rewriting generated text after the fact (less precise, but model-agnostic):
| Approach | Quality Impact | Robustness | Bit Capacity | Latency Impact |
|---|---|---|---|---|
| Inline (logits processor) | Minimal at low delta | Moderate | High (multi-bit) | Low (~5% overhead) |
| Post-generation rewriting | Moderate | Low | Low (few bits) | High (second pass) |
| Semantic embedding | Low | High | Low-moderate | Moderate |
| Steganographic | Very low | Very high | Low | Moderate |
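The post-generation rewriting row can be sketched as keyed lexical substitution: encode payload bits by choosing between interchangeable word pairs. The pair list below is a toy assumption; a production system would use large, context-validated substitution sets and preserve casing.

```python
import hashlib

# Toy substitution pairs; real systems use large, context-checked sets.
PAIRS = [("big", "large"), ("quick", "fast"), ("show", "display")]

def embed_bits(text: str, bits: list[int], key: bytes) -> str:
    """Encode bits by picking one word from each matched pair.
    Which member of a pair means '1' is keyed, so the mapping is
    unreadable without the secret. Casing is not preserved here."""
    bit_iter = iter(bits)
    out = []
    for w in text.split():
        replaced = False
        for a, b in PAIRS:
            if w.lower() in (a, b):
                try:
                    bit = next(bit_iter)
                except StopIteration:
                    break  # payload exhausted; leave word as-is
                flip = hashlib.sha256(key + a.encode()).digest()[0] & 1
                out.append([a, b][bit ^ flip])
                replaced = True
                break
        if not replaced:
            out.append(w)
    return " ".join(out)

def extract_bits(text: str, key: bytes) -> list[int]:
    """Recover bits from substituted words using the same keyed mapping."""
    bits = []
    for w in text.split():
        for a, b in PAIRS:
            if w.lower() in (a, b):
                flip = hashlib.sha256(key + a.encode()).digest()[0] & 1
                bits.append((0 if w.lower() == a else 1) ^ flip)
                break
    return bits
```

The capacity is tiny (one bit per substitutable word), which is why the table rates this approach low-capacity compared to inline logit biasing.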
Robustness Considerations
Threat Model for Provenance Watermarks
Threat Level 1: Casual removal
- Simple paraphrasing, light editing
- Defense: standard token-level watermarks survive
Threat Level 2: Informed removal
- Attacker knows watermark is present, uses paraphrase model
- Defense: semantic watermarks, multi-layer encoding
Threat Level 3: Targeted removal
- Attacker knows the scheme and has detection access
- Defense: dynamic key rotation, multiple encoding layers
Threat Level 4: Adaptive removal
- Attacker has white-box access to watermark algorithm
- Defense: unbiased watermarks (Christ et al.), moving to architectural solutions
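To gauge where a deployment sits on this ladder, measure how far detection confidence degrades after an attack. A minimal sketch computes the standard green-fraction z-score (in the style of Kirchenbauer et al.) before and after editing; the green-list seeding and 4-byte token serialization here are assumptions chosen to mirror the scheme sketched earlier in this section:

```python
import hashlib
import numpy as np

def green_fraction_z(token_ids: list[int], key: bytes, vocab_size: int,
                     gamma: float = 0.5, window: int = 4) -> float:
    """z-score of the observed green-token fraction against the null
    hypothesis (unwatermarked text hits the green list at rate gamma)."""
    hits, total = 0, 0
    for pos in range(window, len(token_ids)):
        # Re-derive the keyed green list from the preceding context window
        ctx = b"".join(int(t).to_bytes(4, "big")
                       for t in token_ids[pos - window:pos])
        seed = hashlib.sha256(key + ctx).digest()
        rng = np.random.RandomState(int.from_bytes(seed[:4], "big"))
        green = set(rng.choice(vocab_size, int(vocab_size * gamma),
                               replace=False))
        hits += token_ids[pos] in green
        total += 1
    if total == 0:
        return 0.0
    p_hat = hits / total
    # Normal approximation to the binomial under the null
    return float((p_hat - gamma) / np.sqrt(gamma * (1 - gamma) / total))
```

Running this on the same text before and after a paraphrase attack quantifies the watermark's survival margin: a drop from, say, z = 7 to z = 1.5 means the paraphrase pushed the text below any reasonable detection threshold.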
Robustness vs. Capacity Trade-off
from scipy import stats


def analyze_robustness_capacity_tradeoff(
    text_length_tokens: int,
    payload_bits: int,
    gamma: float = 0.5,
    target_confidence: float = 0.99,
) -> dict:
    """
    Calculate the relationship between watermark capacity,
    text length, and detection reliability.
    More payload bits require more text for reliable extraction.
    """
    # Each payload bit gets text_length / payload_bits votes
    votes_per_bit = text_length_tokens / payload_bits
    # Under the watermarked distribution the green fraction shifts from
    # gamma by roughly 0.1 (delta-dependent); a one-sided normal test
    # then needs about z^2 / shift^2 votes per bit.
    min_votes_needed = stats.norm.ppf(target_confidence) ** 2 / (0.1 ** 2)
    return {
        "text_length": text_length_tokens,
        "payload_bits": payload_bits,
        "votes_per_bit": votes_per_bit,
        "min_votes_needed": min_votes_needed,
        "sufficient_text": votes_per_bit >= min_votes_needed,
        "min_text_for_payload": int(min_votes_needed * payload_bits),
    }

# Example: at 99% confidence and a 0.1 green-fraction shift, each bit needs
# roughly 540 votes, so an 80-bit payload needs on the order of 43,000 tokens:
# analyze_robustness_capacity_tradeoff(1000, 80)  # votes_per_bit: 12.5 — insufficient
# analyze_robustness_capacity_tradeoff(5000, 80)  # votes_per_bit: 62.5 — still insufficient

Integration with AI Governance
Regulatory Requirements
Emerging AI regulations increasingly require output traceability:
| Regulation | Watermarking Relevance | Requirement |
|---|---|---|
| EU AI Act (Article 50) | High-risk AI outputs must be identifiable | Machine-readable marking of AI-generated content |
| US Executive Order 14110 | Content authentication standards | NIST standards for AI content authentication |
| China AI Regulations | AI-generated content marking | Mandatory marking of AI-generated text, image, video |
| C2PA Standard | Content provenance | Cryptographic provenance for digital content |
Compliance Architecture
import hashlib
import uuid
from datetime import datetime, timezone


class ComplianceWatermarkService:
    """
    Watermarking service designed to meet regulatory compliance
    requirements for AI-generated content provenance.
    """

    def __init__(self, config: dict):
        self.watermark = ProvenanceWatermark(
            secret_key=config["watermark_key"],
            delta=config.get("delta", 1.5),
        )
        self.audit_log = config["audit_log_backend"]

    def generate_with_provenance(
        self,
        llm,
        prompt: str,
        user_context: dict,
    ) -> dict:
        """Generate watermarked output with a full audit trail."""
        now = datetime.now(timezone.utc)
        user_id_hash = int.from_bytes(
            hashlib.sha256(user_context["user_id"].encode()).digest()[:4], "big"
        )
        # Create the provenance record for the audit log
        provenance = {
            "request_id": str(uuid.uuid4()),
            "timestamp": now.isoformat(),
            "model_id": llm.model_id,
            "deployment": llm.deployment_id,
            "user_id_hash": user_id_hash,
            "safety_policy_version": llm.safety_policy_version,
        }
        # Generate with watermark; only the compact numeric fields fit into
        # the embedded payload, the rest lives in the audit log
        output = llm.generate(
            prompt=prompt,
            watermark_payload=self.watermark.encode_provenance(
                model_id=llm.model_id,
                deployment_id=llm.deployment_id,
                user_hash=user_id_hash,
                timestamp=int(now.timestamp()) & 0xFFFF,
            ),
        )
        # Log to the immutable audit store
        self.audit_log.write({
            **provenance,
            "prompt_hash": hashlib.sha256(prompt.encode()).hexdigest(),
            "output_hash": hashlib.sha256(output["text"].encode()).hexdigest(),
            "output_length_tokens": output["token_count"],
        })
        return {
            "text": output["text"],
            "provenance_id": provenance["request_id"],
            "compliance_record": True,
        }

Key Management for Watermarking
Key Rotation and Lifecycle
Watermark keys must be managed with the same rigor as cryptographic keys:
import secrets
from datetime import datetime, timedelta, timezone


class WatermarkKeyManager:
    """Manage watermark key lifecycle including rotation and revocation."""

    def __init__(self, key_store):
        self.key_store = key_store

    def rotate_key(self, deployment_id: str) -> dict:
        """
        Rotate the watermark key for a deployment.
        The old key must be retained so that previously watermarked
        content can still be verified.
        """
        now = datetime.now(timezone.utc)
        new_key = secrets.token_bytes(32)
        old_key = self.key_store.get_current_key(deployment_id)
        # Archive the old key with expiration metadata
        self.key_store.archive_key(
            deployment_id=deployment_id,
            key=old_key,
            retired_at=now,
            verify_until=now + timedelta(days=365),
        )
        # Activate the new key
        self.key_store.set_current_key(
            deployment_id=deployment_id,
            key=new_key,
            activated_at=now,
        )
        return {
            "deployment_id": deployment_id,
            "key_rotated": True,
            "old_key_archived": True,
        }

Limitations and Honest Assessment
| Capability | Reality |
|---|---|
| Casual misuse prevention | Effective — casual users will not attempt removal |
| Sophisticated attacker deterrence | Limited — paraphrase attacks are straightforward |
| Forensic attribution | Useful if text is not heavily modified |
| Regulatory compliance | Meets marking requirements if combined with audit logs |
| Proof of generation | Not cryptographically binding — can be questioned legally |
Related Topics
- Watermarking & AI-Generated Text Detection -- red team perspective on attacking watermarks
- Advanced Defense Techniques -- broader advanced defense landscape
- Defense Benchmarking -- measuring watermark effectiveness
- Input/Output Filtering Systems -- complementary output controls
- Data Breach Investigation for AI -- when provenance data supports incident response
References
- Kirchenbauer et al., "A Watermark for Large Language Models" (2023) - Foundational token-level watermarking research
- Christ et al., "Unbiased Watermark for Large Language Models" (2024) - Provably unbiased watermarking with zero quality impact
- Zhao et al., "Provable Robust Watermarking for AI-Generated Text" (2024) - Robustness guarantees for watermarking against text modifications
- C2PA (Coalition for Content Provenance and Authenticity) (2025) - Industry standard for digital content provenance
- EU AI Act, Article 50 (2024) - Regulatory requirements for AI-generated content marking and traceability
What is the primary difference between binary detection watermarking and provenance watermarking for LLM outputs?