Zion Boggan
repos/Oversight/oversight_core/tlog.py
zionboggan.com ↗
291 lines · python
History for this file →
1
"""
2
oversight_core.tlog
3
==================
4
 
5
Append-only Merkle transparency log for the OVERSIGHT registry.
6
 
7
Every event (registration, beacon callback, attribution query) is appended
8
as a leaf. The log signs a tree head periodically; auditors can verify
9
inclusion proofs for any event and detect if the registry ever attempted to
10
remove or reorder entries.
11
 
12
This is a simplified version of Sigstore Rekor / Google Trillian. For
13
production at scale, delegate to one of those - the code below is sufficient
14
for single-registry integrity and audit.
15
 
16
Schema:
17
    leaf_hash     = SHA-256(leaf_bytes)
18
    internal_hash = SHA-256(left || right)
19
    root          = top hash at any tree size
20
    signed head   = Ed25519(size || root) by registry's tlog key
21
 
22
Storage: flat append-only file of leaves + in-memory tree of hashes.
23
"""
24
 
25
from __future__ import annotations
26
 
27
import hashlib
28
import json
29
import os
30
import threading
31
from pathlib import Path
32
from typing import Optional
33
 
34
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
35
 
36
from .jcs import jcs_dumps
37
 
38
 
39
def _h(data: bytes) -> bytes:
40
    return hashlib.sha256(data).digest()
41
 
42
 
43
def _largest_power_of_2_less_than(n: int) -> int:
44
    """Largest k = 2^j such that k < n (for n >= 2). RFC 6962 §2.1."""
45
    assert n >= 2
46
    k = 1
47
    while k * 2 < n:
48
        k *= 2
49
    return k
50
 
51
 
52
def _rfc6962_mth(leaf_hashes: list[bytes]) -> bytes:
53
    """Merkle Tree Hash over pre-hashed leaves, RFC 6962 §2.1.
54
 
55
    Assumes `leaf_hashes` are already _h(0x00 || leaf_bytes) (the leaf prefix
56
    is applied at append time). This function only handles internal node
57
    combining with 0x01 prefix and left-heavy splits.
58
    """
59
    n = len(leaf_hashes)
60
    if n == 1:
61
        return leaf_hashes[0]
62
    k = _largest_power_of_2_less_than(n)
63
    left = _rfc6962_mth(leaf_hashes[:k])
64
    right = _rfc6962_mth(leaf_hashes[k:])
65
    return _h(b"\x01" + left + right)
66
 
67
 
68
def _rfc6962_path(leaf_hashes: list[bytes], m: int) -> list[bytes]:
69
    """Compute the audit path (inclusion proof) for leaf index m, RFC 6962 §2.1.1.
70
 
71
    Returns a list of sibling hashes that, combined with the leaf, rebuild the root.
72
    """
73
    n = len(leaf_hashes)
74
    if n <= 1:
75
        return []
76
    k = _largest_power_of_2_less_than(n)
77
    if m < k:
78
        return _rfc6962_path(leaf_hashes[:k], m) + [_rfc6962_mth(leaf_hashes[k:])]
79
    else:
80
        return _rfc6962_path(leaf_hashes[k:], m - k) + [_rfc6962_mth(leaf_hashes[:k])]
81
 
82
 
83
def _leaf_data_bytes(rec: dict) -> bytes:
84
    if rec.get("leaf_data_hex") is not None:
85
        return bytes.fromhex(rec["leaf_data_hex"])
86
    leaf_data = rec.get("leaf_data")
87
    if not isinstance(leaf_data, str):
88
        raise ValueError("leaf_data must be a string")
89
    return leaf_data.encode("utf-8")
90
 
91
 
92
def _parse_leaf_record(line: str, expected_index: int) -> tuple[dict, bytes]:
93
    rec = json.loads(line)
94
    if not isinstance(rec, dict):
95
        raise ValueError("leaf record must be an object")
96
    found_index = rec.get("index")
97
    if type(found_index) is not int:
98
        raise ValueError("leaf index must be an integer")
99
    if found_index != expected_index:
100
        raise ValueError(
101
            f"leaf index mismatch: expected {expected_index}, got {found_index}"
102
        )
103
    leaf_hash = bytes.fromhex(rec["leaf_hash"])
104
    if len(leaf_hash) != 32:
105
        raise ValueError(
106
            f"leaf hash length for index {found_index}: expected 32, got {len(leaf_hash)}"
107
        )
108
    if leaf_hash != _h(b"\x00" + _leaf_data_bytes(rec)):
109
        raise ValueError(f"leaf hash mismatch at index {found_index}")
110
    return rec, leaf_hash
111
 
112
 
113
class TransparencyLog:
114
    """Append-only Merkle log with signed tree heads.
115
 
116
    Improvements in v0.2.1:
117
      - fsync on append so entries survive crashes
118
      - cached Merkle tree incrementally updated on append (O(log n) not O(n))
119
    """
120
 
121
    def __init__(self, data_dir: str | Path, signing_key_hex: Optional[str] = None):
122
        self.dir = Path(data_dir)
123
        self.dir.mkdir(parents=True, exist_ok=True)
124
        self.leaves_path = self.dir / "leaves.jsonl"
125
        self.head_path = self.dir / "head.json"
126
        self._lock = threading.Lock()
127
        self._leaves: list[bytes] = []
128
        self._cached_root: Optional[bytes] = None
129
        self._load()
130
 
131
        if signing_key_hex:
132
            self._sk = Ed25519PrivateKey.from_private_bytes(bytes.fromhex(signing_key_hex))
133
        else:
134
            self._sk = None
135
 
136
    def _load(self):
137
        if not self.leaves_path.exists():
138
            return
139
        with self.leaves_path.open("r") as f:
140
            expected_index = 0
141
            for line in f:
142
                if not line.strip():
143
                    continue
144
                _, leaf_hash = _parse_leaf_record(line, expected_index)
145
                self._leaves.append(leaf_hash)
146
                expected_index += 1
147
 
148
    def append(self, leaf_data: bytes | str | dict) -> int:
149
        """Append a leaf. Durable: fsync before return."""
150
        if isinstance(leaf_data, dict):
151
            leaf_bytes = jcs_dumps(leaf_data)
152
        elif isinstance(leaf_data, str):
153
            leaf_bytes = leaf_data.encode("utf-8")
154
        else:
155
            leaf_bytes = leaf_data
156
 
157
        with self._lock:
158
            index = len(self._leaves)
159
            leaf_hash = _h(b"\x00" + leaf_bytes)
160
            self._leaves.append(leaf_hash)
161
            self._cached_root = None
162
            record = json.dumps({
163
                "index": index,
164
                "leaf_hash": leaf_hash.hex(),
165
                "leaf_data": leaf_bytes.decode("utf-8", errors="replace"),
166
                "leaf_data_hex": leaf_bytes.hex(),
167
            }) + "\n"
168
            with self.leaves_path.open("a") as f:
169
                f.write(record)
170
                f.flush()
171
                os.fsync(f.fileno())
172
        return index
173
 
174
    def root(self) -> bytes:
175
        """Compute current Merkle root per RFC 6962. Cached after first compute.
176
 
177
        RFC 6962 formula:
178
            MTH({})       = SHA-256()
179
            MTH({d[0]})   = SHA-256(0x00 || d[0])   (leaf hash, handled at append)
180
            MTH(D[0:n])   = SHA-256(0x01 || MTH(D[0:k]) || MTH(D[k:n]))
181
                            where k is the largest power of 2 < n
182
 
183
        This produces a left-heavy tree where the last subtree may be smaller,
184
        which is the canonical form verifiable by any RFC 6962 client (Sigstore
185
        Rekor, CT log verifiers, etc.).
186
        """
187
        with self._lock:
188
            if self._cached_root is not None:
189
                return self._cached_root
190
            if not self._leaves:
191
                self._cached_root = _h(b"")
192
                return self._cached_root
193
            self._cached_root = _rfc6962_mth(self._leaves)
194
            return self._cached_root
195
 
196
    def size(self) -> int:
197
        return len(self._leaves)
198
 
199
    def signed_head(self) -> dict:
200
        size = self.size()
201
        root = self.root()
202
        head = {"size": size, "root": root.hex()}
203
        msg = jcs_dumps(head)
204
        if self._sk:
205
            sig = self._sk.sign(msg)
206
            head["signature"] = sig.hex()
207
            head["signed_message"] = msg.decode("utf-8")
208
        return head
209
 
210
    def inclusion_proof(self, index: int) -> Optional[dict]:
211
        """RFC 6962 inclusion proof for the leaf at `index`.
212
 
213
        Use `verify_inclusion_proof()` to check the returned proof against
214
        a signed root. The proof order matches RFC 6962 §2.1.1 - deepest
215
        sibling first, root-level sibling last.
216
        """
217
        if index < 0 or index >= len(self._leaves):
218
            return None
219
        path = _rfc6962_path(list(self._leaves), index)
220
        return {
221
            "index": index,
222
            "leaf_hash": self._leaves[index].hex(),
223
            "proof": [h.hex() for h in path],
224
            "root": self.root().hex(),
225
            "tree_size": len(self._leaves),
226
        }
227
 
228
    def range_records(self, start: int = 0, limit: int = 500) -> list[dict]:
229
        if start < 0:
230
            raise ValueError("start must be non-negative")
231
        if limit <= 0:
232
            return []
233
        with self._lock:
234
            if start >= len(self._leaves):
235
                return []
236
            end = min(start + limit, len(self._leaves))
237
            records: list[dict] = []
238
            expected_index = 0
239
            with self.leaves_path.open("r") as f:
240
                for line in f:
241
                    if not line.strip():
242
                        continue
243
                    if expected_index >= end:
244
                        break
245
                    rec, _ = _parse_leaf_record(line, expected_index)
246
                    if expected_index >= start:
247
                        records.append(rec)
248
                    expected_index += 1
249
            if expected_index < end:
250
                raise ValueError(f"leaf record missing at index {expected_index}")
251
            return records
252
 
253
 
254
def verify_inclusion_proof(
255
    leaf_hash: bytes,
256
    index: int,
257
    proof: list[bytes],
258
    tree_size: int,
259
    expected_root: bytes,
260
) -> bool:
261
    """RFC 6962 §2.1.1 inclusion proof verifier.
262
 
263
    Recursive structure mirrors the prover: at each level, decide whether the
264
    target leaf is in the left or right subtree based on (index, largest-power-
265
    of-2 split), and combine the sibling from the proof path accordingly.
266
    """
267
    if tree_size < 1 or index < 0 or index >= tree_size:
268
        return False
269
 
270
    def rec(h: bytes, m: int, remaining: list[bytes], n: int) -> Optional[bytes]:
271
        if n == 1:
272
            return h if not remaining else None
273
        if not remaining:
274
            return None
275
        k = _largest_power_of_2_less_than(n)
276
        sibling = remaining[-1]
277
        deeper = remaining[:-1]
278
        if m < k:
279
            left = rec(h, m, deeper, k)
280
            if left is None:
281
                return None
282
            right = sibling
283
        else:
284
            left = sibling
285
            right = rec(h, m - k, deeper, n - k)
286
            if right is None:
287
                return None
288
        return _h(b"\x01" + left + right)
289
 
290
    computed = rec(leaf_hash, index, list(proof), tree_size)
291
    return computed == expected_root