| 1 | """ |
| 2 | oversight_core.tlog |
| 3 | ================== |
| 4 | |
| 5 | Append-only Merkle transparency log for the OVERSIGHT registry. |
| 6 | |
| 7 | Every event (registration, beacon callback, attribution query) is appended |
| 8 | as a leaf. The log signs a tree head periodically; auditors can verify |
| 9 | inclusion proofs for any event and detect if the registry ever attempted to |
| 10 | remove or reorder entries. |
| 11 | |
| 12 | This is a simplified version of Sigstore Rekor / Google Trillian. For |
| 13 | production at scale, delegate to one of those - the code below is sufficient |
| 14 | for single-registry integrity and audit. |
| 15 | |
| 16 | Schema: |
| 17 | leaf_hash = SHA-256(leaf_bytes) |
| 18 | internal_hash = SHA-256(left || right) |
| 19 | root = top hash at any tree size |
| 20 | signed head = Ed25519(size || root) by registry's tlog key |
| 21 | |
| 22 | Storage: flat append-only file of leaves + in-memory tree of hashes. |
| 23 | """ |
| 24 | |
| 25 | from __future__ import annotations |
| 26 | |
| 27 | import hashlib |
| 28 | import json |
| 29 | import os |
| 30 | import threading |
| 31 | from pathlib import Path |
| 32 | from typing import Optional |
| 33 | |
| 34 | from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey |
| 35 | |
| 36 | from .jcs import jcs_dumps |
| 37 | |
| 38 | |
| 39 | def _h(data: bytes) -> bytes: |
| 40 | return hashlib.sha256(data).digest() |
| 41 | |
| 42 | |
| 43 | def _largest_power_of_2_less_than(n: int) -> int: |
| 44 | """Largest k = 2^j such that k < n (for n >= 2). RFC 6962 §2.1.""" |
| 45 | assert n >= 2 |
| 46 | k = 1 |
| 47 | while k * 2 < n: |
| 48 | k *= 2 |
| 49 | return k |
| 50 | |
| 51 | |
| 52 | def _rfc6962_mth(leaf_hashes: list[bytes]) -> bytes: |
| 53 | """Merkle Tree Hash over pre-hashed leaves, RFC 6962 §2.1. |
| 54 | |
| 55 | Assumes `leaf_hashes` are already _h(0x00 || leaf_bytes) (the leaf prefix |
| 56 | is applied at append time). This function only handles internal node |
| 57 | combining with 0x01 prefix and left-heavy splits. |
| 58 | """ |
| 59 | n = len(leaf_hashes) |
| 60 | if n == 1: |
| 61 | return leaf_hashes[0] |
| 62 | k = _largest_power_of_2_less_than(n) |
| 63 | left = _rfc6962_mth(leaf_hashes[:k]) |
| 64 | right = _rfc6962_mth(leaf_hashes[k:]) |
| 65 | return _h(b"\x01" + left + right) |
| 66 | |
| 67 | |
| 68 | def _rfc6962_path(leaf_hashes: list[bytes], m: int) -> list[bytes]: |
| 69 | """Compute the audit path (inclusion proof) for leaf index m, RFC 6962 §2.1.1. |
| 70 | |
| 71 | Returns a list of sibling hashes that, combined with the leaf, rebuild the root. |
| 72 | """ |
| 73 | n = len(leaf_hashes) |
| 74 | if n <= 1: |
| 75 | return [] |
| 76 | k = _largest_power_of_2_less_than(n) |
| 77 | if m < k: |
| 78 | return _rfc6962_path(leaf_hashes[:k], m) + [_rfc6962_mth(leaf_hashes[k:])] |
| 79 | else: |
| 80 | return _rfc6962_path(leaf_hashes[k:], m - k) + [_rfc6962_mth(leaf_hashes[:k])] |
| 81 | |
| 82 | |
| 83 | def _leaf_data_bytes(rec: dict) -> bytes: |
| 84 | if rec.get("leaf_data_hex") is not None: |
| 85 | return bytes.fromhex(rec["leaf_data_hex"]) |
| 86 | leaf_data = rec.get("leaf_data") |
| 87 | if not isinstance(leaf_data, str): |
| 88 | raise ValueError("leaf_data must be a string") |
| 89 | return leaf_data.encode("utf-8") |
| 90 | |
| 91 | |
| 92 | def _parse_leaf_record(line: str, expected_index: int) -> tuple[dict, bytes]: |
| 93 | rec = json.loads(line) |
| 94 | if not isinstance(rec, dict): |
| 95 | raise ValueError("leaf record must be an object") |
| 96 | found_index = rec.get("index") |
| 97 | if type(found_index) is not int: |
| 98 | raise ValueError("leaf index must be an integer") |
| 99 | if found_index != expected_index: |
| 100 | raise ValueError( |
| 101 | f"leaf index mismatch: expected {expected_index}, got {found_index}" |
| 102 | ) |
| 103 | leaf_hash = bytes.fromhex(rec["leaf_hash"]) |
| 104 | if len(leaf_hash) != 32: |
| 105 | raise ValueError( |
| 106 | f"leaf hash length for index {found_index}: expected 32, got {len(leaf_hash)}" |
| 107 | ) |
| 108 | if leaf_hash != _h(b"\x00" + _leaf_data_bytes(rec)): |
| 109 | raise ValueError(f"leaf hash mismatch at index {found_index}") |
| 110 | return rec, leaf_hash |
| 111 | |
| 112 | |
| 113 | class TransparencyLog: |
| 114 | """Append-only Merkle log with signed tree heads. |
| 115 | |
| 116 | Improvements in v0.2.1: |
| 117 | - fsync on append so entries survive crashes |
| 118 | - cached Merkle tree incrementally updated on append (O(log n) not O(n)) |
| 119 | """ |
| 120 | |
| 121 | def __init__(self, data_dir: str | Path, signing_key_hex: Optional[str] = None): |
| 122 | self.dir = Path(data_dir) |
| 123 | self.dir.mkdir(parents=True, exist_ok=True) |
| 124 | self.leaves_path = self.dir / "leaves.jsonl" |
| 125 | self.head_path = self.dir / "head.json" |
| 126 | self._lock = threading.Lock() |
| 127 | self._leaves: list[bytes] = [] |
| 128 | self._cached_root: Optional[bytes] = None |
| 129 | self._load() |
| 130 | |
| 131 | if signing_key_hex: |
| 132 | self._sk = Ed25519PrivateKey.from_private_bytes(bytes.fromhex(signing_key_hex)) |
| 133 | else: |
| 134 | self._sk = None |
| 135 | |
| 136 | def _load(self): |
| 137 | if not self.leaves_path.exists(): |
| 138 | return |
| 139 | with self.leaves_path.open("r") as f: |
| 140 | expected_index = 0 |
| 141 | for line in f: |
| 142 | if not line.strip(): |
| 143 | continue |
| 144 | _, leaf_hash = _parse_leaf_record(line, expected_index) |
| 145 | self._leaves.append(leaf_hash) |
| 146 | expected_index += 1 |
| 147 | |
| 148 | def append(self, leaf_data: bytes | str | dict) -> int: |
| 149 | """Append a leaf. Durable: fsync before return.""" |
| 150 | if isinstance(leaf_data, dict): |
| 151 | leaf_bytes = jcs_dumps(leaf_data) |
| 152 | elif isinstance(leaf_data, str): |
| 153 | leaf_bytes = leaf_data.encode("utf-8") |
| 154 | else: |
| 155 | leaf_bytes = leaf_data |
| 156 | |
| 157 | with self._lock: |
| 158 | index = len(self._leaves) |
| 159 | leaf_hash = _h(b"\x00" + leaf_bytes) |
| 160 | self._leaves.append(leaf_hash) |
| 161 | self._cached_root = None |
| 162 | record = json.dumps({ |
| 163 | "index": index, |
| 164 | "leaf_hash": leaf_hash.hex(), |
| 165 | "leaf_data": leaf_bytes.decode("utf-8", errors="replace"), |
| 166 | "leaf_data_hex": leaf_bytes.hex(), |
| 167 | }) + "\n" |
| 168 | with self.leaves_path.open("a") as f: |
| 169 | f.write(record) |
| 170 | f.flush() |
| 171 | os.fsync(f.fileno()) |
| 172 | return index |
| 173 | |
| 174 | def root(self) -> bytes: |
| 175 | """Compute current Merkle root per RFC 6962. Cached after first compute. |
| 176 | |
| 177 | RFC 6962 formula: |
| 178 | MTH({}) = SHA-256() |
| 179 | MTH({d[0]}) = SHA-256(0x00 || d[0]) (leaf hash, handled at append) |
| 180 | MTH(D[0:n]) = SHA-256(0x01 || MTH(D[0:k]) || MTH(D[k:n])) |
| 181 | where k is the largest power of 2 < n |
| 182 | |
| 183 | This produces a left-heavy tree where the last subtree may be smaller, |
| 184 | which is the canonical form verifiable by any RFC 6962 client (Sigstore |
| 185 | Rekor, CT log verifiers, etc.). |
| 186 | """ |
| 187 | with self._lock: |
| 188 | if self._cached_root is not None: |
| 189 | return self._cached_root |
| 190 | if not self._leaves: |
| 191 | self._cached_root = _h(b"") |
| 192 | return self._cached_root |
| 193 | self._cached_root = _rfc6962_mth(self._leaves) |
| 194 | return self._cached_root |
| 195 | |
| 196 | def size(self) -> int: |
| 197 | return len(self._leaves) |
| 198 | |
| 199 | def signed_head(self) -> dict: |
| 200 | size = self.size() |
| 201 | root = self.root() |
| 202 | head = {"size": size, "root": root.hex()} |
| 203 | msg = jcs_dumps(head) |
| 204 | if self._sk: |
| 205 | sig = self._sk.sign(msg) |
| 206 | head["signature"] = sig.hex() |
| 207 | head["signed_message"] = msg.decode("utf-8") |
| 208 | return head |
| 209 | |
| 210 | def inclusion_proof(self, index: int) -> Optional[dict]: |
| 211 | """RFC 6962 inclusion proof for the leaf at `index`. |
| 212 | |
| 213 | Use `verify_inclusion_proof()` to check the returned proof against |
| 214 | a signed root. The proof order matches RFC 6962 §2.1.1 - deepest |
| 215 | sibling first, root-level sibling last. |
| 216 | """ |
| 217 | if index < 0 or index >= len(self._leaves): |
| 218 | return None |
| 219 | path = _rfc6962_path(list(self._leaves), index) |
| 220 | return { |
| 221 | "index": index, |
| 222 | "leaf_hash": self._leaves[index].hex(), |
| 223 | "proof": [h.hex() for h in path], |
| 224 | "root": self.root().hex(), |
| 225 | "tree_size": len(self._leaves), |
| 226 | } |
| 227 | |
| 228 | def range_records(self, start: int = 0, limit: int = 500) -> list[dict]: |
| 229 | if start < 0: |
| 230 | raise ValueError("start must be non-negative") |
| 231 | if limit <= 0: |
| 232 | return [] |
| 233 | with self._lock: |
| 234 | if start >= len(self._leaves): |
| 235 | return [] |
| 236 | end = min(start + limit, len(self._leaves)) |
| 237 | records: list[dict] = [] |
| 238 | expected_index = 0 |
| 239 | with self.leaves_path.open("r") as f: |
| 240 | for line in f: |
| 241 | if not line.strip(): |
| 242 | continue |
| 243 | if expected_index >= end: |
| 244 | break |
| 245 | rec, _ = _parse_leaf_record(line, expected_index) |
| 246 | if expected_index >= start: |
| 247 | records.append(rec) |
| 248 | expected_index += 1 |
| 249 | if expected_index < end: |
| 250 | raise ValueError(f"leaf record missing at index {expected_index}") |
| 251 | return records |
| 252 | |
| 253 | |
| 254 | def verify_inclusion_proof( |
| 255 | leaf_hash: bytes, |
| 256 | index: int, |
| 257 | proof: list[bytes], |
| 258 | tree_size: int, |
| 259 | expected_root: bytes, |
| 260 | ) -> bool: |
| 261 | """RFC 6962 §2.1.1 inclusion proof verifier. |
| 262 | |
| 263 | Recursive structure mirrors the prover: at each level, decide whether the |
| 264 | target leaf is in the left or right subtree based on (index, largest-power- |
| 265 | of-2 split), and combine the sibling from the proof path accordingly. |
| 266 | """ |
| 267 | if tree_size < 1 or index < 0 or index >= tree_size: |
| 268 | return False |
| 269 | |
| 270 | def rec(h: bytes, m: int, remaining: list[bytes], n: int) -> Optional[bytes]: |
| 271 | if n == 1: |
| 272 | return h if not remaining else None |
| 273 | if not remaining: |
| 274 | return None |
| 275 | k = _largest_power_of_2_less_than(n) |
| 276 | sibling = remaining[-1] |
| 277 | deeper = remaining[:-1] |
| 278 | if m < k: |
| 279 | left = rec(h, m, deeper, k) |
| 280 | if left is None: |
| 281 | return None |
| 282 | right = sibling |
| 283 | else: |
| 284 | left = sibling |
| 285 | right = rec(h, m - k, deeper, n - k) |
| 286 | if right is None: |
| 287 | return None |
| 288 | return _h(b"\x01" + left + right) |
| 289 | |
| 290 | computed = rec(leaf_hash, index, list(proof), tree_size) |
| 291 | return computed == expected_root |