Watermarking LLM Output for Provenance Tracking
Advanced · 9 min read · Updated 2026-03-15
Advanced techniques for watermarking LLM-generated text to establish provenance records, covering deployment architectures, multi-bit encoding schemes, robustness considerations, and the role of watermarking in AI safety and accountability frameworks.
Output watermarking for provenance tracking goes beyond merely detecting AI-generated text. It aims to answer: which model generated this text, when, for which user, and through which deployment? This provenance information is essential for AI system accountability, for forensic investigation of misuse, and for compliance with emerging regulations that increasingly require AI-generated content to be traceable.
Provenance Tracking Watermark Architecture
System Components
┌────────────────────────────────────────────────────────────┐
│ Watermarked LLM Serving System │
│ │
│ ┌──────────┐ ┌──────────────┐ ┌────────────────────┐ │
│ │ User │ │ Provenance │ │ LLM Inference │ │
│ │ Request │──▶│ Metadata │──▶│ + Watermark │ │
│ │ │ │ Generator │ │ Injection │ │
│ └──────────┘ └──────────────┘ └─────────┬──────────┘ │
│ │ │
│ ┌─────────▼──────────┐ │
│ │ Watermarked Output │ │
│ └─────────┬──────────┘ │
│ │ │
│ ┌──────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ ┌──────────────────────────────────────┐│
│ │ Key Store │ │ Verification Service ││
│ │ (secrets) │──▶│ - Extract watermark from text ││
│ │ │ │ - Decode provenance metadata ││
│ │ │ │ - Verify against key store ││
│ └──────────────┘ └──────────────────────────────────────┘│
└────────────────────────────────────────────────────────────┘
Provenance Metadata Fields
| Field | Bits required | Purpose | Example |
|---|---|---|---|
| Model ID | 8-16 | Identifies the model version | GPT-4-turbo-2025-01 |
| Deployment ID | 8-16 | Identifies the serving environment | prod-us-east-1 |
| User/session ID | 16-32 | Attributes output to a specific user | Hash of the user ID |
| Timestamp | 16-32 | When the text was generated | Truncated Unix epoch |
| Request ID | 16-32 | Links to a specific API call | Request hash |
| Policy version | 4-8 | Safety policy in effect at generation time | v2.3 |
| Total | 68-136 | | |
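The four compact fields from the table (model, deployment, user hash, timestamp) can be packed with Python's struct module; a quick sanity check of the bit budget, with an illustrative format string and field values:

```python
import struct

# Big-endian layout: model ID (uint16), deployment ID (uint16),
# user hash (uint32), truncated timestamp (uint16)
PAYLOAD_FMT = ">HHIH"

payload = struct.pack(PAYLOAD_FMT, 1, 42, 0xDEADBEEF, 0x1234)
print(struct.calcsize(PAYLOAD_FMT) * 8)     # 80 bits, the low end of the range
print(struct.unpack(PAYLOAD_FMT, payload))  # fields round-trip losslessly
```

Adding the request ID and policy version fields from the table would push the payload toward the upper end of the 68-136 bit range.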
Multi-Bit Watermarking Scheme
Encoding Provenance in Token Selection
import hashlib
import numpy as np
class ProvenanceWatermark:
"""
Multi-bit watermarking scheme that encodes provenance metadata
into LLM output through biased token selection.
"""
def __init__(
self,
secret_key: bytes,
gamma: float = 0.5,
delta: float = 1.5,
window_size: int = 4,
):
self.secret_key = secret_key
self.gamma = gamma
self.delta = delta
self.window_size = window_size
def encode_provenance(
self,
model_id: int,
deployment_id: int,
user_hash: int,
timestamp: int,
) -> bytes:
"""Pack provenance metadata into a binary payload."""
import struct
payload = struct.pack(
">HHIH",
model_id & 0xFFFF,
deployment_id & 0xFFFF,
user_hash & 0xFFFFFFFF,
timestamp & 0xFFFF,
)
return payload
def get_bit_assignment(
self,
prev_tokens: list[int],
vocab_size: int,
bit_value: int,
) -> set:
"""
Partition vocabulary based on the bit to encode.
For bit=1, bias toward green list.
For bit=0, bias toward red list (or no bias).
"""
        seed = hashlib.sha256(
            self.secret_key
            # NOTE: bytes(...) requires every token id to be < 256; a
            # production implementation should use a fixed-width encoding
            + bytes(prev_tokens[-self.window_size:])
        ).digest()
rng = np.random.RandomState(int.from_bytes(seed[:4], "big"))
green_count = int(vocab_size * self.gamma)
green_list = set(rng.choice(vocab_size, green_count, replace=False))
if bit_value == 1:
return green_list # Bias toward these tokens
else:
return set() # No bias (natural distribution)
def apply_watermark(
self,
logits: np.ndarray,
prev_tokens: list[int],
payload: bytes,
token_position: int,
) -> np.ndarray:
"""Apply watermark bias to logits based on current payload bit."""
# Determine which bit to encode at this position
bit_index = token_position % (len(payload) * 8)
byte_index = bit_index // 8
bit_offset = bit_index % 8
current_bit = (payload[byte_index] >> (7 - bit_offset)) & 1
green_list = self.get_bit_assignment(
prev_tokens, len(logits), current_bit
)
modified = logits.copy()
for token_id in green_list:
modified[token_id] += self.delta
        return modified
Verification and Decoding
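Verification relies on the statistical footprint this bias leaves behind. As a standalone sanity check (a minimal re-implementation of the green-list logic above, with illustrative key and parameters), greedy sampling from biased logits lands in the green list far more often than the unbiased rate gamma:

```python
import hashlib
import numpy as np

# Rebuild a green list the same way get_bit_assignment does
key, gamma, delta, vocab = b"demo-key", 0.5, 4.0, 1000
context = [1, 2, 3, 4]  # fixed context, so the green list is fixed too
seed = hashlib.sha256(key + bytes(context)).digest()
rng = np.random.RandomState(int.from_bytes(seed[:4], "big"))
green = set(rng.choice(vocab, int(vocab * gamma), replace=False))

# Greedy-sample 300 positions with the bit=1 bias applied
sampler = np.random.default_rng(0)
hits = 0
for _ in range(300):
    logits = sampler.normal(size=vocab)
    logits[list(green)] += delta          # the apply_watermark bias
    hits += int(np.argmax(logits)) in green
print(hits / 300)  # far above gamma = 0.5 (near 1.0 at delta = 4)
```

The verifier below reads exactly this elevated green rate, bit position by bit position, to recover the payload.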
class ProvenanceVerifier:
"""Verify and decode provenance watermarks from text."""
def __init__(self, secret_key: bytes, gamma: float = 0.5, window_size: int = 4):
self.secret_key = secret_key
self.gamma = gamma
self.window_size = window_size
def extract_provenance(
self,
token_ids: list[int],
vocab_size: int,
payload_bits: int = 80,
) -> dict:
"""
Extract encoded provenance from watermarked text.
Uses majority voting across multiple repetitions of the payload.
"""
bit_votes = {i: [] for i in range(payload_bits)}
        for pos in range(self.window_size, len(token_ids)):
            prev_tokens = token_ids[pos - self.window_size:pos]
            # Bit indexing must use the same position convention as the
            # generator side (here: offset by window_size)
            bit_index = (pos - self.window_size) % payload_bits
# Reconstruct green list for this position
            seed = hashlib.sha256(
                # Must mirror the generator's seed derivation exactly
                # (same caveat: bytes(...) requires token ids < 256)
                self.secret_key + bytes(prev_tokens)
            ).digest()
rng = np.random.RandomState(int.from_bytes(seed[:4], "big"))
green_count = int(vocab_size * self.gamma)
green_list = set(rng.choice(vocab_size, green_count, replace=False))
# Vote: is this token in the green list?
is_green = token_ids[pos] in green_list
bit_votes[bit_index].append(1 if is_green else 0)
# Majority vote for each bit
decoded_bits = []
confidence_scores = []
for i in range(payload_bits):
votes = bit_votes[i]
if votes:
avg = np.mean(votes)
decoded_bits.append(1 if avg > self.gamma + 0.05 else 0)
confidence_scores.append(abs(avg - self.gamma))
else:
decoded_bits.append(0)
confidence_scores.append(0)
# Reconstruct payload
payload_bytes = self._bits_to_bytes(decoded_bits)
return {
"payload": payload_bytes,
"provenance": self._decode_payload(payload_bytes),
"mean_confidence": np.mean(confidence_scores),
"min_confidence": np.min(confidence_scores),
"reliable": np.mean(confidence_scores) > 0.1,
}
def _bits_to_bytes(self, bits: list[int]) -> bytes:
result = bytearray()
for i in range(0, len(bits), 8):
byte = 0
for j in range(8):
if i + j < len(bits):
byte = (byte << 1) | bits[i + j]
result.append(byte)
return bytes(result)
def _decode_payload(self, payload: bytes) -> dict:
import struct
try:
model_id, deployment_id, user_hash, timestamp = struct.unpack(
">HHIH", payload[:10]
)
return {
"model_id": model_id,
"deployment_id": deployment_id,
"user_hash": user_hash,
"timestamp": timestamp,
}
except struct.error:
            return {"error": "Payload decode failed"}
Deployment Architecture Patterns
Pattern 1: Inline Watermarking
The watermark is applied as a logits processor during inference:
# Integration with vLLM serving
import time

from vllm import LLM, SamplingParams
class WatermarkedLLM:
"""Wrapper that applies provenance watermarking during inference."""
def __init__(self, model_name: str, watermark_key: bytes):
self.llm = LLM(model=model_name)
self.watermark = ProvenanceWatermark(secret_key=watermark_key)
def generate(
self,
prompt: str,
user_id: str,
deployment_id: int,
max_tokens: int = 512,
) -> dict:
provenance_payload = self.watermark.encode_provenance(
model_id=1,
deployment_id=deployment_id,
            user_hash=hash(user_id) & 0xFFFFFFFF,  # NOTE: hash() is not stable across processes; prefer a keyed SHA-256
timestamp=int(time.time()) & 0xFFFF,
)
        # NOTE: vLLM passes logits as a torch tensor, while apply_watermark
        # expects a NumPy array; a real integration would convert or
        # operate on the tensor directly
        params = SamplingParams(
            max_tokens=max_tokens,
            logits_processors=[
                lambda token_ids, logits: self.watermark.apply_watermark(
                    logits, token_ids, provenance_payload, len(token_ids)
                )
            ],
        )
outputs = self.llm.generate([prompt], params)
return {
"text": outputs[0].outputs[0].text,
"watermark_payload": provenance_payload.hex(),
        }
Pattern 2: Post-Generation Watermarking
The watermark is applied by rewriting text that has already been generated (lower precision, but model-agnostic):
| Method | Quality impact | Robustness | Bit capacity | Latency impact |
|---|---|---|---|---|
| Inline (logits processor) | Minimal at low delta | Medium | High (multi-bit) | Low (~5% overhead) |
| Post-generation rewriting | Medium | Low | Low (few bits) | High (second pass) |
| Semantic embedding | Low | High | Low to medium | Medium |
| Steganographic | Very low | Very high | Low | Medium |
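As a toy illustration of the post-generation pattern (the synonym table and sentence are hypothetical; a real system would use a paraphrase model and a keyed choice of substitution sites), payload bits can be embedded by choosing between interchangeable surface forms:

```python
# Hypothetical synonym pairs acting as 1-bit embedding sites
SYNONYMS = {"big": "large", "quick": "fast", "show": "demonstrate"}

def embed_bits_posthoc(text: str, bits: list[int]) -> str:
    """Encode one payload bit at each substitutable word: 1 swaps, 0 keeps."""
    out, i = [], 0
    for word in text.split():
        if word in SYNONYMS and i < len(bits):
            out.append(SYNONYMS[word] if bits[i] else word)
            i += 1
        else:
            out.append(word)
    return " ".join(out)

print(embed_bits_posthoc("a big and quick way to show it", [1, 0, 1]))
# → "a large and quick way to demonstrate it"
```

Capacity is limited to the number of substitution sites in the text, which is why the table lists post-generation methods as low bit capacity.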
Robustness Considerations
Threat Model for Provenance Watermarks
Threat Level 1: Casual removal
- Simple paraphrasing, light editing
- Defense: standard token-level watermarks survive
Threat Level 2: Informed removal
- Attacker knows watermark is present, uses paraphrase model
- Defense: semantic watermarks, multi-layer encoding
Threat Level 3: Targeted removal
- Attacker knows the scheme and has detection access
- Defense: dynamic key rotation, multiple encoding layers
Threat Level 4: Adaptive removal
- Attacker has white-box access to watermark algorithm
- Defense: unbiased watermarks (Christ et al.), moving to architectural solutions
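The first threat level can be reasoned about with a simple mixture model (the 0.9 watermarked green rate is an illustrative assumption): tokens that survive editing keep the watermarked green rate, while edited tokens revert to the baseline gamma, so the observed rate degrades only linearly with the edit fraction:

```python
# Idealized model: surviving tokens are green at rate g_wm,
# edited/replaced tokens at the unwatermarked baseline gamma
gamma, g_wm = 0.5, 0.9

def green_rate_after_edit(edit_fraction: float) -> float:
    return (1 - edit_fraction) * g_wm + edit_fraction * gamma

for frac in (0.0, 0.2, 0.5):
    print(frac, round(green_rate_after_edit(frac), 2))
# 0.0 → 0.9, 0.2 → 0.82, 0.5 → 0.7: even 50% editing leaves a
# detectable gap above gamma = 0.5
```

This is why light editing (Threat Level 1) is survivable, while full paraphrasing, which effectively resamples every token, is not.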
The Robustness-Capacity Trade-off
def analyze_robustness_capacity_tradeoff(
text_length_tokens: int,
payload_bits: int,
gamma: float = 0.5,
target_confidence: float = 0.99,
):
"""
Calculate the relationship between watermark capacity,
text length, and detection reliability.
More payload bits require more text for reliable extraction.
"""
from scipy import stats
# Each payload bit gets text_length / payload_bits votes
votes_per_bit = text_length_tokens / payload_bits
    # Under the watermarked distribution, green fraction = gamma + shift.
    # For reliable detection of an assumed ~0.1 shift, each bit needs
    # roughly (z_confidence / shift)^2 votes.
    min_votes_needed = stats.norm.ppf(target_confidence) ** 2 / (0.1 ** 2)
return {
"text_length": text_length_tokens,
"payload_bits": payload_bits,
"votes_per_bit": votes_per_bit,
"min_votes_needed": min_votes_needed,
"sufficient_text": votes_per_bit >= min_votes_needed,
"min_text_for_payload": int(min_votes_needed * payload_bits),
}
# Example: an 80-bit provenance payload
# analyze_robustness_capacity_tradeoff(1000, 80)
#   → votes_per_bit: 12.5, far below min_votes_needed, insufficient
# analyze_robustness_capacity_tradeoff(5000, 80)
#   → votes_per_bit: 62.5, still short of the ~541 votes needed at 99%
#     confidence with a 0.1 shift; reliable 80-bit extraction needs much
#     longer texts or a stronger bias delta
Integration with AI Governance
Regulatory Requirements
Emerging AI regulations increasingly require output traceability:
| Regulation | Relevance to watermarking | Requirement |
|---|---|---|
| EU AI Act (Article 50) | High-risk AI outputs must be identifiable | Machine-readable marking of AI-generated content |
| US Executive Order 14110 | Content authentication standards | NIST standards for AI content authentication |
| China's AI regulations | Labeling of AI-generated content | Mandatory labeling of AI-generated text, images, and video |
| C2PA standard | Content provenance | Cryptographic provenance records for digital content |
Compliance Architecture
import hashlib
import time
from datetime import datetime

class ComplianceWatermarkService:
    """
    Watermarking service designed to meet regulatory compliance
    requirements for AI-generated content provenance.
    Assumes external `generate_uuid` and `hash_user_id` helpers.
    """
def __init__(self, config: dict):
self.watermark = ProvenanceWatermark(
secret_key=config["watermark_key"],
delta=config.get("delta", 1.5),
)
self.audit_log = config["audit_log_backend"]
def generate_with_provenance(
self,
llm,
prompt: str,
user_context: dict,
) -> dict:
"""Generate watermarked output with full audit trail."""
# Create provenance record
provenance = {
"request_id": generate_uuid(),
"timestamp": datetime.utcnow().isoformat(),
"model_id": llm.model_id,
"deployment": llm.deployment_id,
"user_id_hash": hash_user_id(user_context["user_id"]),
"safety_policy_version": llm.safety_policy_version,
}
        # Generate with watermark; encode_provenance takes only the four
        # compact payload fields, so map the record onto them explicitly
        output = llm.generate(
            prompt=prompt,
            watermark_payload=self.watermark.encode_provenance(
                model_id=llm.model_id,
                deployment_id=llm.deployment_id,
                user_hash=provenance["user_id_hash"],
                timestamp=int(time.time()) & 0xFFFF,
            ),
        )
# Log to immutable audit store
self.audit_log.write({
**provenance,
"prompt_hash": hashlib.sha256(prompt.encode()).hexdigest(),
"output_hash": hashlib.sha256(output["text"].encode()).hexdigest(),
"output_length_tokens": output["token_count"],
})
return {
"text": output["text"],
"provenance_id": provenance["request_id"],
"compliance_record": True,
        }
Key Management for Watermarks
Key Rotation and Lifecycle
Watermark keys must be managed with the same rigor as cryptographic keys:
from datetime import datetime, timedelta

class WatermarkKeyManager:
    """Manage watermark key lifecycle including rotation and revocation."""
def __init__(self, key_store):
self.key_store = key_store
def rotate_key(self, deployment_id: str) -> dict:
"""
Rotate watermark key for a deployment.
Old key must be retained for verification of previously
watermarked content.
"""
import secrets
new_key = secrets.token_bytes(32)
old_key = self.key_store.get_current_key(deployment_id)
# Archive old key with expiration metadata
self.key_store.archive_key(
deployment_id=deployment_id,
key=old_key,
retired_at=datetime.utcnow(),
verify_until=datetime.utcnow() + timedelta(days=365),
)
# Activate new key
self.key_store.set_current_key(
deployment_id=deployment_id,
key=new_key,
activated_at=datetime.utcnow(),
)
return {
"deployment_id": deployment_id,
"key_rotated": True,
"old_key_archived": True,
        }
Limitations and an Honest Assessment
| Capability | Reality |
|---|---|
| Preventing casual misuse | Effective: casual users do not attempt removal |
| Deterring sophisticated attackers | Limited: paraphrase attacks are fairly straightforward |
| Forensic attribution | Useful when the text has not been heavily modified |
| Regulatory compliance | Satisfies labeling requirements when combined with audit logs |
| Proof of generation | Not cryptographically binding; open to legal challenge |
Related Topics
- Watermarking and Detection of AI-Generated Text: a red-team perspective on attacking watermarks
- Advanced Defense Techniques: the broader landscape of advanced defenses
- Defense Benchmarking: measuring watermark effectiveness
- Input/Output Filtering Systems: complementary output controls
- Data Exfiltration Investigations for AI: scenarios where provenance data supports incident response
References
- Kirchenbauer et al., "A Watermark for Large Language Models" (2023): foundational work on token-level watermarking
- Christ et al., "Unbiased Watermark for Large Language Models" (2024): provably unbiased watermarking with no quality impact
- Zhao et al., "Provable Robust Watermarking for AI-Generated Text" (2024): watermarking with robustness guarantees against text modification
- C2PA (Coalition for Content Provenance and Authenticity) (2025): industry standard for digital content provenance
- EU AI Act, Article 50 (2024): regulatory requirements for labeling and traceability of AI-generated content
Knowledge Check
What is the primary difference between a watermark for binary detection of AI-generated text and a watermark for provenance tracking of LLM output?