| 1 | """ |
| 2 | oversight_core.fingerprint |
| 3 | ========================== |
| 4 | |
| 5 | Content fingerprinting for leak detection when watermarks are stripped. |
| 6 | |
| 7 | Two independent fingerprinting methods: |
| 8 | |
| 9 | 1. Winnowing (Schleimer, Wilkerson, Aiken, SIGMOD 2003) |
| 10 | Computes rolling hash fingerprints over k-grams of the text. |
| 11 | Selects a subset via the winnowing algorithm (minimum hash in each window). |
| 12 | Enables partial-copy detection for near-verbatim leaks. |
| 13 | Does NOT survive paraphrasing. |
| 14 | |
| 15 | 2. Semantic sentence hashing |
| 16 | Hashes each normalized sentence as a sorted bag of its words, dropping |
| 17 | words of two characters or fewer (no lemmatization). Tolerates word reordering. |
| 18 | Survives format conversion, minor edits, whitespace changes. |
| 19 | Does NOT survive heavy paraphrasing. |
| 20 | |
| 21 | Both methods produce fingerprints stored at seal time (in the manifest or |
| 22 | registry). At attribution time, fingerprints of the leaked text are compared |
| 23 | against stored fingerprints to identify which recipient's copy was leaked. |
| 24 | |
| 25 | The fingerprint DB is NOT a watermark. It is a server-side identification |
| 26 | system. The fingerprints never appear in the document itself. An adversary |
| 27 | cannot strip what is not embedded. |
| 28 | """ |
| 29 | |
| 30 | from __future__ import annotations |
| 31 | |
| 32 | import hashlib |
| 33 | import re |
| 34 | from typing import Optional |
| 35 | |
| 36 | |
| 37 | |
def _normalize_text(text: str) -> str:
    """Normalize text for fingerprinting.

    Lowercases the text, removes zero-width characters, drops every character
    that is not an ASCII letter, digit or whitespace, then collapses each run
    of whitespace to a single space and trims both ends.

    Whitespace is collapsed *after* punctuation is removed. Doing it the other
    way round leaves a double space wherever free-standing punctuation sat
    (``"a - b"`` -> ``"a  b"``), which makes otherwise identical texts produce
    different character k-grams.
    """
    text = text.lower()
    # Zero-width characters are not whitespace, so remove them outright rather
    # than letting them turn into word separators.
    for ch in "\u200b\u200c\u200d\ufeff":
        text = text.replace(ch, "")
    # Strip punctuation/symbols first, keeping all whitespace for the next step.
    text = re.sub(r"[^a-z0-9\s]", "", text)
    # Now collapse whitespace, including gaps left by removed punctuation.
    text = re.sub(r"\s+", " ", text)
    return text.strip()
| 46 | |
| 47 | |
def _sentences(text: str) -> list[str]:
    """Split text into sentences.

    A sentence boundary is any whitespace run that directly follows ``.``,
    ``!`` or ``?``. Pieces are trimmed and empty pieces are discarded.
    """
    result = []
    for piece in re.split(r"(?<=[.!?])\s+", text):
        trimmed = piece.strip()
        if trimmed:
            result.append(trimmed)
    return result
| 52 | |
| 53 | |
| 54 | |
def _rolling_hash(text: str, k: int) -> list[tuple[int, int]]:
    """Hash every character k-gram of ``text``.

    Each k-gram is hashed independently with MD5 (used as a plain,
    non-security hash) and reduced to the integer value of the first eight
    hex digits of the digest.

    Returns:
        list of (hash, start position) pairs in text order; empty when the
        text is shorter than ``k``.
    """
    if len(text) < k:
        return []

    def _gram_hash(start: int) -> int:
        gram = text[start : start + k]
        digest = hashlib.md5(gram.encode(), usedforsecurity=False).hexdigest()
        return int(digest[:8], 16)

    last_start = len(text) - k
    return [(_gram_hash(pos), pos) for pos in range(last_start + 1)]
| 65 | |
| 66 | |
def winnow(text: str, k: int = 10, window: int = 4) -> list[int]:
    """
    Winnowing algorithm for document fingerprinting.

    The text is normalized, every character k-gram is hashed, and the minimum
    hash of each sliding window of ``window`` consecutive k-gram hashes is
    kept. When there are fewer k-gram hashes than ``window``, all of them are
    kept.

    Args:
        text: input text (will be normalized)
        k: k-gram size (character-level), must be >= 1
        window: winnowing window size, must be >= 1

    Returns:
        sorted list of distinct selected hash values (the fingerprint);
        empty when the normalized text is shorter than k

    Raises:
        ValueError: if k or window is less than 1
    """
    # Non-positive sizes either crash deep inside min() or silently produce
    # meaningless fingerprints, so reject them up front.
    if k < 1 or window < 1:
        raise ValueError(f"k and window must be >= 1 (got k={k}, window={window})")

    normalized = _normalize_text(text)
    if len(normalized) < k:
        return []

    hashes = _rolling_hash(normalized, k)
    if len(hashes) < window:
        # Not enough k-grams to fill a single window: keep every hash, but
        # still honor the "sorted, distinct" return contract.
        return sorted({h for h, _ in hashes})

    # Only hash values are returned, so the position of the minimum inside a
    # window does not matter; a set removes repeats across overlapping windows.
    selected = {
        min(h for h, _ in hashes[start : start + window])
        for start in range(len(hashes) - window + 1)
    }
    return sorted(selected)
| 102 | |
| 103 | |
def winnow_similarity(fp1: list[int], fp2: list[int]) -> float:
    """Jaccard similarity between two winnowing fingerprints.

    Both fingerprints are treated as sets of hash values. Returns 0.0 when
    either fingerprint is empty.
    """
    if not (fp1 and fp2):
        return 0.0
    left, right = set(fp1), set(fp2)
    total = len(left | right)
    if total <= 0:
        return 0.0
    shared = len(left & right)
    return shared / total
| 113 | |
| 114 | |
| 115 | |
def _sentence_hash(sentence: str) -> str:
    """Hash one sentence as an order-independent bag of words.

    The sentence is normalized, words of two characters or fewer are dropped,
    and the remaining words are sorted and joined with single spaces before
    hashing. Returns the first 16 hex digits of the SHA-256 digest.
    """
    tokens = _normalize_text(sentence).split()
    kept = sorted(tok for tok in tokens if len(tok) > 2)
    digest = hashlib.sha256(" ".join(kept).encode())
    return digest.hexdigest()[:16]
| 122 | |
| 123 | |
def sentence_fingerprint(text: str) -> list[str]:
    """
    Compute per-sentence content hashes.

    Returns a list of 16-char hex hashes, one per usable sentence, in
    document order. A sentence is skipped when it has fewer than three
    whitespace-separated tokens, or when none of its words survive the
    hashing filter (all words are two characters or fewer). Such sentences
    would all hash the empty string and so match each other across unrelated
    documents.

    Order-independent within each sentence (sorted words) so minor
    word reordering does not change the hash.
    """
    # Hash produced for a sentence with no remaining words; derived from
    # _sentence_hash itself so it always agrees with the hashing scheme.
    empty_hash = _sentence_hash("")
    hashes = (_sentence_hash(s) for s in _sentences(text) if len(s.split()) >= 3)
    return [h for h in hashes if h != empty_hash]
| 134 | |
| 135 | |
def sentence_similarity(fp1: list[str], fp2: list[str]) -> float:
    """Fraction of sentence hashes in fp2 that appear in fp1.

    The measure is asymmetric: it is normalized by the length of fp2, and
    repeated hashes in fp2 are each counted. Returns 0.0 when either
    fingerprint is empty.
    """
    if not (fp1 and fp2):
        return 0.0
    known = set(fp1)
    hits = 0
    for digest in fp2:
        if digest in known:
            hits += 1
    return hits / len(fp2)
| 143 | |
| 144 | |
| 145 | |
class ContentFingerprint:
    """Combined winnowing + sentence fingerprint for a document.

    Attributes:
        winnowing_fp: winnowing hash values for the document
        sentence_fp: per-sentence hex hashes for the document
        text_length: length of the source text (0 if unknown)
        sentence_count: number of sentences in the source text (0 if unknown)
    """

    # Weight of each method in the combined similarity score.
    WINNOW_WEIGHT = 0.4
    SENTENCE_WEIGHT = 0.6
    # (verdict, minimum combined score), checked from highest to lowest;
    # anything below the last threshold is "NO_MATCH".
    VERDICT_THRESHOLDS = (("MATCH", 0.6), ("LIKELY", 0.3), ("UNLIKELY", 0.1))

    def __init__(
        self,
        winnowing_fp: list[int],
        sentence_fp: list[str],
        text_length: int,
        sentence_count: int,
    ):
        self.winnowing_fp = winnowing_fp
        self.sentence_fp = sentence_fp
        self.text_length = text_length
        self.sentence_count = sentence_count

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}("
            f"winnowing_fp=<{len(self.winnowing_fp)} hashes>, "
            f"sentence_fp=<{len(self.sentence_fp)} hashes>, "
            f"text_length={self.text_length}, "
            f"sentence_count={self.sentence_count})"
        )

    @classmethod
    def from_text(cls, text: str, k: int = 10, window: int = 4) -> "ContentFingerprint":
        """Create a fingerprint from text.

        Args:
            text: raw document text
            k: k-gram size passed to winnow()
            window: window size passed to winnow()
        """
        return cls(
            winnowing_fp=winnow(text, k, window),
            sentence_fp=sentence_fingerprint(text),
            text_length=len(text),
            sentence_count=len(_sentences(text)),
        )

    def similarity(self, other: "ContentFingerprint") -> dict:
        """Compare this fingerprint against another. Returns per-method scores.

        Returns:
            dict with keys "winnowing", "sentence" and "combined" (floats) and
            "verdict" (one of "MATCH", "LIKELY", "UNLIKELY", "NO_MATCH").
            The combined score is the weighted sum of the two method scores.
        """
        w_sim = winnow_similarity(self.winnowing_fp, other.winnowing_fp)
        s_sim = sentence_similarity(self.sentence_fp, other.sentence_fp)
        combined = self.WINNOW_WEIGHT * w_sim + self.SENTENCE_WEIGHT * s_sim
        verdict = next(
            (label for label, floor in self.VERDICT_THRESHOLDS if combined >= floor),
            "NO_MATCH",
        )
        return {
            "winnowing": w_sim,
            "sentence": s_sim,
            "combined": combined,
            "verdict": verdict,
        }

    def to_dict(self) -> dict:
        """Serialize for storage in manifest/registry.

        The lists are copied so that later edits to the returned dict cannot
        alter this fingerprint.
        """
        return {
            "winnowing_fp": list(self.winnowing_fp),
            "sentence_fp": list(self.sentence_fp),
            "text_length": self.text_length,
            "sentence_count": self.sentence_count,
        }

    @classmethod
    def from_dict(cls, d: dict) -> "ContentFingerprint":
        """Deserialize from stored dict.

        "winnowing_fp" and "sentence_fp" are required; "text_length" and
        "sentence_count" default to 0 when absent. The lists are copied so the
        fingerprint does not share state with the source dict.

        Raises:
            KeyError: if "winnowing_fp" or "sentence_fp" is missing
        """
        return cls(
            winnowing_fp=list(d["winnowing_fp"]),
            sentence_fp=list(d["sentence_fp"]),
            text_length=d.get("text_length", 0),
            sentence_count=d.get("sentence_count", 0),
        )