Zion Boggan zionboggan.com ↗

Harden signed registry inputs and tlog roots

Co-authored-by: Codex (GPT-5.4) <noreply@openai.com>
55d36cd   Zion Boggan committed on Apr 20, 2026 (2 months ago)
.gitignore +1 -0
@@ -46,3 +46,4 @@ secrets/
*.log
scratch/
tmp/
+.tmp-tests/
CHANGELOG.md +8 -3
@@ -11,12 +11,17 @@
- `oversight_core/rekor.py`: offline verification now rejects DSSE envelopes
whose subject digest does not match the expected content hash.
- `registry/server.py`: Rekor attestations now use real watermark mark IDs
- and the manifest's actual `content_hash`.
+ and the manifest's actual `content_hash`, and `/register` now rejects
+ unsigned beacon / watermark sidecars that do not match the signed manifest.
- `oversight_core/formats/text.py`: text adapter now applies L3 before L2/L1,
matching the core watermark pipeline.
+ - `oversight_core/tlog.py`: empty-tree roots now use the RFC 6962 Merkle
+ hash (`SHA-256("")`) instead of an all-zero placeholder.
+ - `oversight_core/__init__.py`: package `__version__` is back in sync with
+ `pyproject.toml`.
- Added focused regression coverage in `tests/test_policy_unit.py`,
- `tests/test_registry_unit.py`, `tests/test_rekor_unit.py`, and
- `tests/test_text_format_unit.py`.
+ `tests/test_registry_unit.py`, `tests/test_rekor_unit.py`,
+ `tests/test_text_format_unit.py`, and `tests/test_tlog_unit.py`.
## v0.5.0 - 2026-04-19
README.md +2 -0
@@ -110,7 +110,9 @@ See `CHANGELOG.md` for full version history.
- `REGISTRY` and `HYBRID` policy modes fail closed instead of silently falling back to local counters.
- Rekor offline verification now checks the attested digest against the expected content hash.
- Registry Rekor attestations now index by real watermark mark IDs and the manifest's actual `content_hash`.
+- Registry registration now refuses unsigned beacon/watermark sidecars that do not match the issuer-signed manifest.
- Multi-recipient sealing is disabled until a recipient-honest manifest format lands.
+- Local transparency-log empty-tree roots now match RFC 6962 exactly.
## Repository layout
oversight_core/__init__.py +1 -1
@@ -30,4 +30,4 @@ __all__ = [
"beacon",
]
-__version__ = "0.1.0"
+__version__ = "0.4.3"
oversight_core/tlog.py +1 -1
@@ -159,7 +159,7 @@ class TransparencyLog:
if self._cached_root is not None:
return self._cached_root
if not self._leaves:
- self._cached_root = b"\x00" * 32
+ self._cached_root = _h(b"")
return self._cached_root
self._cached_root = _rfc6962_mth(self._leaves)
return self._cached_root
registry/server.py +35 -7
@@ -355,6 +355,29 @@ def _verify_manifest_signature(manifest_dict: dict) -> tuple[bool, str]:
return m.verify(), m.issuer_ed25519_pub
+def _canonical_items(items: list[dict]) -> list[str]:
+ """Normalize registration sidecars for exact signed-manifest comparison."""
+ return sorted(
+ json.dumps(item, sort_keys=True, separators=(",", ":"))
+ for item in items
+ )
+
+
+def _signed_registration_artifacts(
+ manifest_dict: dict,
+ req_beacons: list[dict],
+ req_watermarks: list[dict],
+) -> tuple[list[dict], list[dict]]:
+ """Use the manifest's signed beacons/watermarks as the registry source of truth."""
+ signed_beacons = manifest_dict.get("beacons") or []
+ signed_watermarks = manifest_dict.get("watermarks") or []
+ if _canonical_items(req_beacons) != _canonical_items(signed_beacons):
+ raise HTTPException(400, "request beacons do not match signed manifest")
+ if _canonical_items(req_watermarks) != _canonical_items(signed_watermarks):
+ raise HTTPException(400, "request watermarks do not match signed manifest")
+ return signed_beacons, signed_watermarks
+
+
@app.post("/register")
def register(req: RegistrationRequest, request: Request):
"""
@@ -383,6 +406,11 @@ def register(req: RegistrationRequest, request: Request):
raise HTTPException(400, "manifest signature invalid")
if not issuer_pub:
raise HTTPException(400, "manifest missing issuer_ed25519_pub")
+ signed_beacons, signed_watermarks = _signed_registration_artifacts(
+ m,
+ req.beacons,
+ req.watermarks,
+ )
now = int(time.time())
with db() as con:
@@ -401,12 +429,12 @@ def register(req: RegistrationRequest, request: Request):
"INSERT OR REPLACE INTO manifests VALUES (?,?,?,?,?,?)",
(file_id, recipient_id, issuer_id, issuer_pub, json.dumps(m), now),
)
- for b in req.beacons:
+ for b in signed_beacons:
con.execute(
"INSERT OR REPLACE INTO beacons VALUES (?,?,?,?,?,?)",
(b["token_id"], file_id, recipient_id, issuer_id, b["kind"], now),
)
- for w in req.watermarks:
+ for w in signed_watermarks:
con.execute(
"INSERT OR REPLACE INTO watermarks VALUES (?,?,?,?,?,?)",
(w["mark_id"], w["layer"], file_id, recipient_id, issuer_id, now),
@@ -425,8 +453,8 @@ def register(req: RegistrationRequest, request: Request):
"recipient_id": recipient_id,
"issuer_id": issuer_id,
"issuer_pub": issuer_pub,
- "n_beacons": len(req.beacons),
- "n_watermarks": len(req.watermarks),
+ "n_beacons": len(signed_beacons),
+ "n_watermarks": len(signed_watermarks),
"timestamp": timestamp_stub(),
})
@@ -437,9 +465,9 @@ def register(req: RegistrationRequest, request: Request):
recipient_pubkey_hex=recipient.get("x25519_pub"),
suite=m.get("suite", "classic"),
content_hash_sha256_hex=m.get("content_hash", "0" * 64),
- watermarks=req.watermarks,
+ watermarks=signed_watermarks,
mark_id_hex=next(
- (w["mark_id"] for w in req.watermarks if w.get("mark_id")),
+ (w["mark_id"] for w in signed_watermarks if w.get("mark_id")),
file_id,
),
)
@@ -447,7 +475,7 @@ def register(req: RegistrationRequest, request: Request):
return {
"ok": True,
"file_id": file_id,
- "registered_beacons": len(req.beacons),
+ "registered_beacons": len(signed_beacons),
"tlog_index": tlog_idx,
"rekor": rekor_result,
}
tests/test_policy_unit.py +12 -3
@@ -7,7 +7,8 @@ Focused policy/container checks around successful-open counting.
from __future__ import annotations
import sys
-import tempfile
+import shutil
+import uuid
from pathlib import Path
ROOT = Path(__file__).resolve().parent.parent
@@ -28,6 +29,10 @@ def ok(msg):
print(f" [PASS] {msg}")
+TMP_ROOT = ROOT / ".tmp-tests"
+TMP_ROOT.mkdir(exist_ok=True)
+
+
def t1_wrong_recipient_does_not_consume_open_count():
issuer = ClassicIdentity.generate()
alice = ClassicIdentity.generate()
@@ -51,8 +56,10 @@ def t1_wrong_recipient_does_not_consume_open_count():
manifest.policy["max_opens"] = 1
blob = seal(plaintext, manifest, issuer.ed25519_priv, alice.x25519_pub)
- with tempfile.TemporaryDirectory() as td:
- ctx = PolicyContext(state_dir=Path(td), mode="LOCAL_ONLY")
+ td = TMP_ROOT / f"policy-{uuid.uuid4().hex}"
+ td.mkdir(parents=True, exist_ok=False)
+ try:
+ ctx = PolicyContext(state_dir=td, mode="LOCAL_ONLY")
try:
open_sealed(blob, bob.x25519_priv, policy_ctx=ctx)
except Exception:
@@ -63,6 +70,8 @@ def t1_wrong_recipient_does_not_consume_open_count():
recovered, _ = open_sealed(blob, alice.x25519_priv, policy_ctx=ctx)
assert recovered == plaintext
ok("wrong recipient attempts do not consume max_opens")
+ finally:
+ shutil.rmtree(td, ignore_errors=True)
def t2_registry_modes_fail_closed():
tests/test_registry_unit.py +42 -1
@@ -18,6 +18,7 @@ from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from cryptography.hazmat.primitives import serialization
import registry.server as registry_server
+from fastapi import HTTPException
def _new_identity() -> dict:
@@ -83,13 +84,53 @@ def t1_rekor_attestation_uses_real_mark_id_and_digest():
print(" [PASS] registry attests using a real mark_id and content_hash")
+def t2_register_rejects_unsigned_sidecar_mismatch():
+ manifest = {
+ "beacons": [
+ {"token_id": "tok-1", "kind": "http_img", "url": "https://b.example/p/tok-1.png"},
+ ],
+ "watermarks": [
+ {"layer": "L1_zero_width", "mark_id": "10" * 16},
+ ],
+ }
+ try:
+ registry_server._signed_registration_artifacts(
+ manifest,
+ req_beacons=[
+ {"token_id": "tok-evil", "kind": "http_img", "url": "https://b.example/p/tok-evil.png"},
+ ],
+ req_watermarks=manifest["watermarks"],
+ )
+ except HTTPException as exc:
+ assert exc.status_code == 400
+ assert "beacons do not match" in exc.detail
+ else:
+ raise AssertionError("unsigned request beacons should be rejected")
+
+ try:
+ registry_server._signed_registration_artifacts(
+ manifest,
+ req_beacons=manifest["beacons"],
+ req_watermarks=[
+ {"layer": "L2_whitespace", "mark_id": "20" * 16},
+ ],
+ )
+ except HTTPException as exc:
+ assert exc.status_code == 400
+ assert "watermarks do not match" in exc.detail
+ else:
+ raise AssertionError("unsigned request watermarks should be rejected")
+ print(" [PASS] register rejects unsigned beacon/watermark sidecars")
+
+
def main():
print("=" * 60)
print(" registry.server - focused unit tests")
print("=" * 60)
t1_rekor_attestation_uses_real_mark_id_and_digest()
+ t2_register_rejects_unsigned_sidecar_mismatch()
print()
- print(" ALL TESTS PASSED - 1/1")
+ print(" ALL TESTS PASSED - 2/2")
if __name__ == "__main__":
tests/test_tlog_unit.py +49 -0
@@ -0,0 +1,49 @@
+"""
+test_tlog_unit
+==============
+
+Focused transparency-log checks around RFC 6962 behavior.
+"""
+from __future__ import annotations
+
+import hashlib
+import shutil
+import sys
+import uuid
+from pathlib import Path
+
+ROOT = Path(__file__).resolve().parent.parent
+sys.path.insert(0, str(ROOT))
+
+from oversight_core.tlog import TransparencyLog
+
+
+def ok(msg):
+ print(f" [PASS] {msg}")
+
+
+def t1_empty_tree_root_matches_rfc6962():
+ td = ROOT / ".tmp-tests" / f"tlog-{uuid.uuid4().hex}"
+ td.mkdir(parents=True, exist_ok=False)
+ try:
+ tlog = TransparencyLog(td)
+ assert tlog.size() == 0
+ assert tlog.root() == hashlib.sha256(b"").digest()
+ finally:
+ shutil.rmtree(td, ignore_errors=True)
+ ok("empty transparency log root matches RFC 6962")
+
+
+def main():
+ tmp_root = ROOT / ".tmp-tests"
+ tmp_root.mkdir(exist_ok=True)
+ print("=" * 60)
+ print(" oversight_core.tlog - focused unit tests")
+ print("=" * 60)
+ t1_empty_tree_root_matches_rfc6962()
+ print()
+ print(" ALL TESTS PASSED - 1/1")
+
+
+if __name__ == "__main__":
+ main()