Zion Boggan
repos/Oversight/oversight_core/fingerprint.py
zionboggan.com ↗
205 lines · python
History for this file →
1
"""
2
oversight_core.fingerprint
3
==========================
4
 
5
Content fingerprinting for leak detection when watermarks are stripped.
6
 
7
Two independent fingerprinting methods:
8
 
9
1. Winnowing (Schleimer, Wilkerson, Aiken, SIGMOD 2003)
10
   Computes rolling hash fingerprints over k-grams of the text.
11
   Selects a subset via the winnowing algorithm (minimum hash in each window).
12
   Enables partial-copy detection for near-verbatim leaks.
13
   Does NOT survive paraphrasing.
14
 
15
2. Semantic sentence hashing
16
   Hashes normalized sentences (short words dropped, rest sorted). More robust than winnowing
17
   to minor word changes because it operates on content words only.
18
   Survives format conversion, minor edits, whitespace changes.
19
   Does NOT survive heavy paraphrasing.
20
 
21
Both methods produce fingerprints stored at seal time (in the manifest or
22
registry). At attribution time, fingerprints of the leaked text are compared
23
against stored fingerprints to identify which recipient's copy was leaked.
24
 
25
The fingerprint DB is NOT a watermark. It is a server-side identification
26
system. The fingerprints never appear in the document itself. An adversary
27
cannot strip what is not embedded.
28
"""
29
 
30
from __future__ import annotations
31
 
32
import hashlib
33
import re
34
from typing import Optional
35
 
36
 
37
 
38
def _normalize_text(text: str) -> str:
    """Normalize text for fingerprinting.

    Steps, in order: lowercase, remove zero-width characters, drop every
    character that is not ``a-z``, ``0-9`` or whitespace, collapse each
    whitespace run to a single space, and trim both ends.

    Punctuation is removed *before* whitespace is collapsed. Doing it the
    other way round leaves a double space wherever a free-standing symbol
    sat between two spaces ("a - b" -> "a  b"), which makes the function
    non-idempotent and lets formatting-only differences leak into k-grams.

    NOTE: fingerprints computed with the previous ordering can differ
    slightly for text that contains space-delimited punctuation.

    Args:
        text: raw input text

    Returns:
        the normalized string (possibly empty)
    """
    text = text.lower()
    # Zero-width space / non-joiner / joiner and the BOM carry no content.
    for ch in "\u200b\u200c\u200d\ufeff":
        text = text.replace(ch, "")
    # Keep whitespace for now so that word boundaries survive this pass.
    text = re.sub(r"[^a-z0-9\s]", "", text)
    text = re.sub(r"\s+", " ", text)
    return text.strip()
46
 
47
 
48
def _sentences(text: str) -> list[str]:
    """Break *text* into sentences with a lightweight regex heuristic.

    A boundary is any run of whitespace that directly follows ``.``, ``!``
    or ``?``; the terminator stays attached to the sentence before it.
    Each piece is trimmed and pieces that end up empty are discarded.
    """
    pieces: list[str] = []
    for chunk in re.split(r"(?<=[.!?])\s+", text):
        trimmed = chunk.strip()
        if trimmed:
            pieces.append(trimmed)
    return pieces
52
 
53
 
54
 
55
def _rolling_hash(text: str, k: int) -> list[tuple[int, int]]:
    """Hash every k-gram of *text* and pair each hash with its start offset.

    Despite the name, each k-gram is hashed independently: the hash is the
    first 8 hex digits (32 bits) of the k-gram's MD5 digest. MD5 is used
    purely as a fast, stable hash here (``usedforsecurity=False``).

    Returns:
        ``(hash, position)`` tuples in text order; an empty list when the
        text is shorter than ``k``.
    """
    last_start = len(text) - k
    if last_start < 0:
        return []

    def _hash32(kgram: str) -> int:
        digest = hashlib.md5(kgram.encode(), usedforsecurity=False)
        return int(digest.hexdigest()[:8], 16)

    return [
        (_hash32(text[start : start + k]), start)
        for start in range(last_start + 1)
    ]
65
 
66
 
67
def winnow(text: str, k: int = 10, window: int = 4) -> list[int]:
    """
    Winnowing algorithm for document fingerprinting.

    The text is normalized, every k-gram is hashed, and the minimum hash of
    each sliding window of ``window`` consecutive k-gram hashes is kept.

    Args:
        text: input text (will be normalized)
        k: k-gram size (character-level), must be >= 1
        window: winnowing window size, must be >= 1

    Returns:
        sorted list of distinct selected hash values (the fingerprint).
        Empty when the normalized text is shorter than ``k``. When there
        are fewer k-grams than ``window``, every k-gram hash is selected.

    Raises:
        ValueError: if ``k`` or ``window`` is smaller than 1
    """
    # A non-positive k hashes empty k-grams (identical for every document)
    # and a non-positive window leaves nothing to take a minimum of.
    if k < 1:
        raise ValueError(f"k must be >= 1, got {k}")
    if window < 1:
        raise ValueError(f"window must be >= 1, got {window}")

    normalized = _normalize_text(text)
    if len(normalized) < k:
        return []

    values = [h for h, _ in _rolling_hash(normalized, k)]
    if len(values) < window:
        # Too short for a full window: keep everything, in the same
        # sorted/deduplicated form as the regular path.
        return sorted(set(values))

    # Only hash values are returned (positions are discarded), so taking
    # the minimum of every window gives exactly the set that tracking the
    # rightmost minimum's position would.
    selected = {
        min(values[start : start + window])
        for start in range(len(values) - window + 1)
    }
    return sorted(selected)
102
 
103
 
104
def winnow_similarity(fp1: list[int], fp2: list[int]) -> float:
    """Return the Jaccard similarity of two winnowing fingerprints.

    Both fingerprints are treated as sets, so duplicates and ordering are
    irrelevant. The result is ``|A & B| / |A | B|``; if either fingerprint
    is empty the similarity is defined as 0.0.
    """
    if fp1 and fp2:
        left, right = set(fp1), set(fp2)
        either = left | right
        if either:
            return len(left & right) / len(either)
    return 0.0
113
 
114
 
115
 
116
def _sentence_hash(sentence: str) -> str:
    """Return a 16-hex-character content hash of one sentence.

    The sentence is normalized, words of two characters or fewer are
    dropped, and the remaining words are sorted so that word order does
    not influence the result. The hash is the first 16 hex digits of the
    SHA-256 of the space-joined words.
    """
    tokens = _normalize_text(sentence).split()
    kept = sorted(tok for tok in tokens if len(tok) > 2)
    digest = hashlib.sha256(" ".join(kept).encode())
    return digest.hexdigest()[:16]
122
 
123
 
124
def sentence_fingerprint(text: str) -> list[str]:
    """
    Compute per-sentence content hashes.

    Returns a list of 16-char hex hashes, in document order, one for each
    sentence that has at least three whitespace-separated tokens and at
    least one content word. Order-independent within each sentence (sorted
    words) so minor word reordering does not change the hash.

    Sentences that normalize to no content words at all (for example
    "It is so.", or text written entirely in a non-Latin script) are
    skipped: they would all hash the empty string, so two unrelated
    documents would appear to share sentences.
    """
    # Hash produced for a sentence with nothing left after normalization.
    empty_hash = _sentence_hash("")

    hashes = []
    for sentence in _sentences(text):
        if len(sentence.split()) < 3:
            continue
        digest = _sentence_hash(sentence)
        if digest != empty_hash:
            hashes.append(digest)
    return hashes
134
 
135
 
136
def sentence_similarity(fp1: list[str], fp2: list[str]) -> float:
    """Return the share of ``fp2`` entries that also occur in ``fp1``.

    The measure is asymmetric: it is normalized by ``len(fp2)``, and a hash
    repeated in ``fp2`` is counted each time it appears. If either
    fingerprint is empty the result is 0.0.
    """
    if not (fp1 and fp2):
        return 0.0
    known = set(fp1)
    hits = 0
    for digest in fp2:
        if digest in known:
            hits += 1
    return hits / len(fp2)
143
 
144
 
145
 
146
class ContentFingerprint:
    """Combined winnowing + sentence fingerprint for a document.

    Holds the two fingerprints together with the length of the source text
    and its sentence count, and can be round-tripped through a plain dict
    for storage.
    """

    # Contribution of each method to the "combined" similarity score.
    WINNOWING_WEIGHT = 0.4
    SENTENCE_WEIGHT = 0.6

    # (verdict, minimum combined score), strictest first. Scores below the
    # last threshold yield "NO_MATCH".
    VERDICT_THRESHOLDS = (
        ("MATCH", 0.6),
        ("LIKELY", 0.3),
        ("UNLIKELY", 0.1),
    )

    def __init__(
        self,
        winnowing_fp: list[int],
        sentence_fp: list[str],
        text_length: int,
        sentence_count: int,
    ):
        self.winnowing_fp = winnowing_fp
        self.sentence_fp = sentence_fp
        self.text_length = text_length
        self.sentence_count = sentence_count

    @classmethod
    def from_text(cls, text: str, k: int = 10, window: int = 4) -> "ContentFingerprint":
        """Create a fingerprint from text.

        Args:
            text: raw document text
            k: k-gram size passed to ``winnow``
            window: window size passed to ``winnow``
        """
        return cls(
            winnowing_fp=winnow(text, k, window),
            sentence_fp=sentence_fingerprint(text),
            text_length=len(text),
            sentence_count=len(_sentences(text)),
        )

    def similarity(self, other: "ContentFingerprint") -> dict:
        """Compare this fingerprint against another. Returns per-method scores.

        The returned dict has the keys ``winnowing``, ``sentence``,
        ``combined`` (weighted sum of the two) and ``verdict`` (one of
        "MATCH", "LIKELY", "UNLIKELY", "NO_MATCH"). The sentence score is
        the fraction of ``other``'s sentence hashes found in this
        fingerprint, so the comparison is not symmetric.
        """
        w_sim = winnow_similarity(self.winnowing_fp, other.winnowing_fp)
        s_sim = sentence_similarity(self.sentence_fp, other.sentence_fp)
        combined = self.WINNOWING_WEIGHT * w_sim + self.SENTENCE_WEIGHT * s_sim
        verdict = next(
            (
                label
                for label, floor in self.VERDICT_THRESHOLDS
                if combined >= floor
            ),
            "NO_MATCH",
        )
        return {
            "winnowing": w_sim,
            "sentence": s_sim,
            "combined": combined,
            "verdict": verdict,
        }

    def to_dict(self) -> dict:
        """Serialize for storage in manifest/registry.

        The fingerprint lists are copied so that later edits to the
        returned dict cannot alter this instance.
        """
        return {
            "winnowing_fp": list(self.winnowing_fp),
            "sentence_fp": list(self.sentence_fp),
            "text_length": self.text_length,
            "sentence_count": self.sentence_count,
        }

    @classmethod
    def from_dict(cls, d: dict) -> "ContentFingerprint":
        """Deserialize from stored dict.

        ``winnowing_fp`` and ``sentence_fp`` are required (``KeyError`` if
        missing); ``text_length`` and ``sentence_count`` default to 0. The
        lists are copied so the instance does not share state with ``d``.
        """
        return cls(
            winnowing_fp=list(d["winnowing_fp"]),
            sentence_fp=list(d["sentence_fp"]),
            text_length=d.get("text_length", 0),
            sentence_count=d.get("sentence_count", 0),
        )