| 1 | """ |
| 2 | oversight_core.rekor |
| 3 | ==================== |
| 4 | |
| 5 | Sigstore Rekor v2 integration (v0.5). |
| 6 | |
| 7 | Builds DSSE envelopes wrapping in-toto Statements that describe Oversight |
| 8 | mark registrations, uploads them to a Rekor v2 log, and verifies inclusion |
| 9 | proofs returned by the log. |
| 10 | |
| 11 | Key facts (verified 2026-04-19 against current upstream): |
| 12 | * Rekor v2 GA'd 2025-10-10 (tile-backed transparency log). |
| 13 | * Only entry types accepted: ``hashedrekord`` and ``dsse``. |
| 14 | * Single write endpoint: ``POST {log_url}/api/v2/log/entries``. |
| 15 | * Inclusion proofs are returned in the write response. There is no online |
| 16 | proof-by-index API; verifiers compute proofs from tiles when they need to |
| 17 | re-derive one. |
| 18 | * Public log URL pattern: ``https://logYEAR-N.rekor.sigstore.dev``. Shards |
| 19 | rotate roughly every 6 months. Never hardcode beyond a default. |
| 20 | |
| 21 | This module deliberately does NOT depend on ``sigstore-python`` so the issuer's |
| 22 | runtime dependency footprint stays small. Auditors verify with stock |
| 23 | ``sigstore-python`` via :mod:`oversight_core.auditor_helper` (separate file). |
| 24 | """ |
| 25 | from __future__ import annotations |
| 26 | |
| 27 | import base64 |
| 28 | import json |
| 29 | import time |
| 30 | import urllib.error |
| 31 | import urllib.request |
| 32 | from dataclasses import dataclass, field |
| 33 | from typing import Any, Optional |
| 34 | |
| 35 | from oversight_core.jcs import jcs_dumps |
| 36 | |
| 37 | from cryptography.hazmat.primitives.asymmetric.ed25519 import ( |
| 38 | Ed25519PrivateKey, |
| 39 | Ed25519PublicKey, |
| 40 | ) |
| 41 | from cryptography.exceptions import InvalidSignature |
| 42 | |
| 43 | |
| 44 | |
# ``payloadType`` stamped on (and signed into) every envelope made by sign_dsse().
DSSE_PAYLOAD_TYPE = "application/vnd.in-toto+json"
# ``_type`` value of the in-toto Statement assembled by build_statement().
STATEMENT_TYPE = "https://in-toto.io/Statement/v1"
# ``predicateType`` URI of the Statement; the path pins the v0.5.0 predicate doc.
PREDICATE_TYPE = (
    "https://github.com/oversight-protocol/oversight/blob/v0.5.0/"
    "docs/predicates/registration-v1.md"
)
# Emitted as ``predicate_version`` by OversightRegistrationPredicate.to_dict().
PREDICATE_VERSION = 1

# Default ``log_url`` for upload_dsse(). Public shards rotate, so treat this as
# a fallback only and let deployments override it.
DEFAULT_REKOR_URL = "https://log2025-1.rekor.sigstore.dev"
# ``tlog_kind`` written into bundles by build_bundle().
TLOG_KIND = "rekor-v2-dsse"
# Not referenced by the code visible in this module; presumably the tlog_kind
# carried by pre-v0.5 bundles -- TODO confirm against the bundle readers.
LEGACY_TLOG_KIND = "oversight-self-merkle-v1"
# ``bundle_schema`` integer written into bundles by build_bundle().
BUNDLE_SCHEMA = 2

# Default ``timeout`` (seconds) for the HTTP POST made by upload_dsse().
REKOR_WRITE_TIMEOUT_SEC = 25
| 59 | |
| 60 | |
| 61 | |
| 62 | |
@dataclass
class OversightRegistrationPredicate:
    """Predicate body for an Oversight mark registration.

    Privacy: the on-log predicate carries a SHA-256 hash of the recipient
    public key, never the raw key. The raw key stays in the local ``.sealed``
    bundle. This prevents anyone watching the public log from enumerating
    recipients by pubkey or correlating multiple marks to the same recipient
    across issuers. ``recipient_id`` is also expected to be an opaque hash
    or UUID, not an email; if a caller passes an email-shaped value the
    predicate still accepts it but logs a warning at construction (see
    :meth:`__post_init__`). The check is a heuristic and only detects ``@``.
    """

    # Identifier of the registered file (opaque to this module).
    file_id: str
    # Issuer's Ed25519 verification key, as supplied by the caller.
    issuer_pubkey_ed25519: str
    # Opaque recipient identifier; should not be raw PII.
    recipient_id: str
    # Hash of the recipient public key (see hash_recipient_pubkey()).
    recipient_pubkey_sha256: str
    suite: str
    registered_at: str
    # Optional RFC 3161 timestamp evidence; omitted from to_dict() when falsy.
    rfc3161_tsa: Optional[str] = None
    rfc3161_token_b64: Optional[str] = None
    rfc3161_chain_b64: Optional[str] = None
    policy: dict = field(default_factory=dict)
    watermarks: dict = field(default_factory=dict)

    def __post_init__(self) -> None:
        """Warn (never fail) when ``recipient_id`` looks like an email address."""
        # Function-scope import keeps this block self-contained; the module
        # does not import logging at file level.
        import logging

        if isinstance(self.recipient_id, str) and "@" in self.recipient_id:
            # Deliberately do not include the value itself: the whole point
            # of the warning is that it may be PII.
            logging.getLogger(__name__).warning(
                "OversightRegistrationPredicate.recipient_id looks like an "
                "email address; it will be published to the transparency "
                "log. Pass an opaque hash or UUID instead."
            )

    def to_dict(self) -> dict:
        """Return the predicate as a plain dict ready to embed in a Statement.

        The ``rfc3161_*`` keys are present only when the corresponding field
        is truthy. ``policy`` and ``watermarks`` are returned by reference,
        not copied.
        """
        d = {
            "predicate_version": PREDICATE_VERSION,
            "file_id": self.file_id,
            "issuer_pubkey_ed25519": self.issuer_pubkey_ed25519,
            "recipient_id": self.recipient_id,
            "recipient_pubkey_sha256": self.recipient_pubkey_sha256,
            "suite": self.suite,
            "registered_at": self.registered_at,
            "policy": self.policy,
            "watermarks": self.watermarks,
        }
        if self.rfc3161_tsa:
            d["rfc3161_tsa"] = self.rfc3161_tsa
        if self.rfc3161_token_b64:
            d["rfc3161_token_b64"] = self.rfc3161_token_b64
        if self.rfc3161_chain_b64:
            d["rfc3161_chain_b64"] = self.rfc3161_chain_b64
        return d
| 107 | |
| 108 | |
def hash_recipient_pubkey(x25519_pub_hex: str) -> str:
    """Return the hex SHA-256 digest of a hex-encoded recipient public key.

    Use the result as ``recipient_pubkey_sha256`` on the predicate so the raw
    key is never the thing that ends up on the public log.

    Raises ``ValueError`` (from ``bytes.fromhex``) when the input is not
    valid hexadecimal.
    """
    from hashlib import sha256

    key_bytes = bytes.fromhex(x25519_pub_hex)
    digest = sha256(key_bytes)
    return digest.hexdigest()
| 118 | |
| 119 | |
@dataclass
class DSSEEnvelope:
    """In-memory DSSE envelope: base64 payload, payload type, signature list."""

    # Base64 text of the signed payload.
    payload_b64: str
    # DSSE ``payloadType`` string.
    payload_type: str
    # Signature objects; sign_dsse() produces ``{"sig": ..., "keyid": ...}``.
    signatures: list[dict]

    def to_json(self) -> str:
        """Serialize to the DSSE wire keys via ``jcs_dumps`` and return text.

        ``jcs_dumps`` is a project helper; its output is decoded as UTF-8
        here (presumably canonical JSON bytes -- confirm in oversight_core.jcs).
        """
        wire = {
            "payload": self.payload_b64,
            "payloadType": self.payload_type,
            "signatures": self.signatures,
        }
        encoded = jcs_dumps(wire)
        return encoded.decode("utf-8")

    @classmethod
    def from_json(cls, raw: str) -> "DSSEEnvelope":
        """Parse DSSE wire JSON; raises ``KeyError`` if a required key is absent.

        Keys other than ``payload``, ``payloadType`` and ``signatures`` are
        ignored. No validation of the values is performed.
        """
        wire = json.loads(raw)
        payload_b64 = wire["payload"]
        payload_type = wire["payloadType"]
        signatures = wire["signatures"]
        return cls(payload_b64, payload_type, signatures)
| 143 | |
| 144 | |
| 145 | |
| 146 | |
def build_statement(
    mark_id_hex: str,
    content_hash_sha256_hex: str,
    predicate: OversightRegistrationPredicate,
) -> dict:
    """Build the in-toto v1 Statement dict for one Oversight registration.

    The single subject is named ``mark:<mark_id_hex>`` and carries the
    plaintext SHA-256 as its digest, so a registration can be looked up
    either by mark id or by hashing leaked content. The predicate body is
    whatever ``predicate.to_dict()`` returns.
    """
    subject_entry = {
        "name": f"mark:{mark_id_hex}",
        "digest": {"sha256": content_hash_sha256_hex},
    }
    statement: dict = {"_type": STATEMENT_TYPE}
    statement["subject"] = [subject_entry]
    statement["predicateType"] = PREDICATE_TYPE
    statement["predicate"] = predicate.to_dict()
    return statement
| 170 | |
| 171 | |
def _pae(payload_type: str, payload: bytes) -> bytes:
    """DSSE Pre-Authentication Encoding (PAEv1).

    PAE = "DSSEv1" SP <len(type)> SP <type> SP <len(payload)> SP <payload>

    Both lengths are ASCII-decimal *byte* counts. The type is encoded as
    UTF-8 before it is measured, so a non-ASCII payload type yields a valid
    encoding (byte length, not character count) instead of raising
    ``UnicodeEncodeError``. For ASCII types the output is unchanged.
    """
    type_bytes = payload_type.encode("utf-8")
    return b" ".join(
        (
            b"DSSEv1",
            str(len(type_bytes)).encode("ascii"),
            type_bytes,
            str(len(payload)).encode("ascii"),
            payload,
        )
    )
| 187 | |
| 188 | |
def sign_dsse(
    statement: dict,
    issuer_ed25519_priv: bytes,
    keyid: str = "",
) -> DSSEEnvelope:
    """Serialize ``statement``, sign its PAE with Ed25519, and wrap it in DSSE.

    The payload bytes come from ``jcs_dumps(statement)``; the signature is
    computed over ``_pae(DSSE_PAYLOAD_TYPE, payload)``, not over the payload
    alone. ``issuer_ed25519_priv`` is the raw private key as accepted by
    ``Ed25519PrivateKey.from_private_bytes``.

    ``keyid`` is opaque per spec; convention is the hex SHA-256 of the public
    key. The empty string is allowed.
    """
    canonical = jcs_dumps(statement)
    signing_key = Ed25519PrivateKey.from_private_bytes(issuer_ed25519_priv)
    signature = signing_key.sign(_pae(DSSE_PAYLOAD_TYPE, canonical))
    signature_entry = {
        "sig": base64.b64encode(signature).decode("ascii"),
        "keyid": keyid,
    }
    return DSSEEnvelope(
        payload_b64=base64.b64encode(canonical).decode("ascii"),
        payload_type=DSSE_PAYLOAD_TYPE,
        signatures=[signature_entry],
    )
| 209 | |
| 210 | |
def verify_dsse(envelope: DSSEEnvelope, issuer_ed25519_pub: bytes) -> bool:
    """Return True if any envelope signature verifies under ``issuer_ed25519_pub``.

    DSSE supports multiple signatures; for Oversight v0.5 only the issuer
    signs, so the first signature that verifies is accepted and the rest are
    not examined.

    The envelope is treated as untrusted input: an undecodable payload, a
    payload type that cannot be PAE-encoded, a ``signatures`` value that is
    not a list, or a malformed signature entry all yield ``False`` rather
    than an exception.

    ``issuer_ed25519_pub`` is caller-supplied configuration, so malformed key
    bytes still propagate whatever ``Ed25519PublicKey.from_public_bytes``
    raises.
    """
    try:
        payload = base64.b64decode(envelope.payload_b64)
        # The payload type comes from the envelope, so encoding failures
        # (non-string, or non-ASCII under an ASCII-only PAE) are a
        # verification failure, not a crash.
        pae = _pae(envelope.payload_type, payload)
    except (ValueError, TypeError, AttributeError):
        return False
    pk = Ed25519PublicKey.from_public_bytes(issuer_ed25519_pub)
    signatures = envelope.signatures
    if not isinstance(signatures, (list, tuple)):
        return False
    for sig_obj in signatures:
        try:
            sig = base64.b64decode(sig_obj["sig"])
            pk.verify(sig, pae)
        except (InvalidSignature, KeyError, ValueError, TypeError):
            # TypeError covers entries that are not mappings and ``sig``
            # values that are not base64 text; skip and try the next one.
            continue
        return True
    return False
| 231 | |
| 232 | |
def envelope_payload_statement(envelope: DSSEEnvelope) -> dict:
    """Base64-decode the envelope payload and parse it as JSON.

    Performs no signature verification. Decoding and JSON errors propagate
    to the caller.
    """
    raw_payload = base64.b64decode(envelope.payload_b64)
    return json.loads(raw_payload)
| 235 | |
| 236 | |
| 237 | |
| 238 | |
@dataclass
class RekorUploadResult:
    """Outcome of one Rekor upload, plus the raw log response."""

    # Base URL of the log the entry was written to.
    log_url: str
    log_index: Optional[int]
    log_id: Optional[str]
    integrated_time: Optional[int]
    # The parsed log response, kept verbatim for later replay.
    transparency_log_entry: dict
    log_pubkey_pem: Optional[str] = None
    checkpoint: Optional[str] = None

    def to_bundle_dict(self) -> dict:
        """Return the dict that Oversight bundles embed under the ``rekor`` key.

        The keys ``log_pubkey_pem``, ``checkpoint``, ``log_entry_schema`` and
        ``transparency_log_entry`` are always present (values may be ``None``)
        so a long-term verifier can work from the bundle alone without TUF.
        ``transparency_log_entry`` is returned by reference, not copied.
        """
        copied_fields = (
            "log_url",
            "log_index",
            "log_id",
            "integrated_time",
            "log_pubkey_pem",
            "checkpoint",
        )
        bundle = {name: getattr(self, name) for name in copied_fields}
        bundle["log_entry_schema"] = "rekor/v1.TransparencyLogEntry"
        bundle["transparency_log_entry"] = self.transparency_log_entry
        return bundle
| 267 | |
| 268 | |
def build_bundle(
    manifest_dict: dict,
    manifest_sig_hex: str,
    upload: "RekorUploadResult",
    dsse_envelope: "DSSEEnvelope",
    rfc3161_token_b64: Optional[str] = None,
    rfc3161_chain_b64: Optional[str] = None,
) -> dict:
    """Assemble the v0.5 evidence bundle as a plain dict.

    ``bundle_schema`` is an integer so that older verifiers can reject an
    unknown schema outright instead of mis-routing on a defaulted
    ``tlog_kind``. The DSSE envelope is embedded as a parsed JSON object
    (round-tripped through ``to_json``). The two RFC 3161 keys are added only
    when the corresponding argument is truthy.
    """
    bundle = dict(
        bundle_schema=BUNDLE_SCHEMA,
        tlog_kind=TLOG_KIND,
        manifest=manifest_dict,
        manifest_sig=manifest_sig_hex,
        rekor=upload.to_bundle_dict(),
        dsse_envelope=json.loads(dsse_envelope.to_json()),
    )
    optional_entries = (
        ("rfc3161_token", rfc3161_token_b64),
        ("rfc3161_chain", rfc3161_chain_b64),
    )
    for key, value in optional_entries:
        if value:
            bundle[key] = value
    return bundle
| 296 | |
| 297 | |
def upload_dsse(
    envelope: DSSEEnvelope,
    issuer_ed25519_pub_pem: str,
    log_url: str = DEFAULT_REKOR_URL,
    timeout: float = REKOR_WRITE_TIMEOUT_SEC,
) -> RekorUploadResult:
    """POST a DSSE envelope to Rekor v2 and return the parsed log response.

    ``issuer_ed25519_pub_pem`` is the issuer's verification key in PEM. The
    upload payload converts it to the DER (SubjectPublicKeyInfo) bytes that
    the Rekor v2 ``Verifier.PublicKey.raw_bytes`` field requires.

    Wire shape per
    https://github.com/sigstore/rekor-tiles/blob/main/api/proto/rekor/v2/dsse.proto
    (verified 2026-04-19): ``verifiers`` is a repeated field; each verifier
    carries ``publicKey.rawBytes`` (DER) and a sibling ``keyDetails`` enum
    string (e.g. ``PKIX_ED25519``).

    Besides the top-level scalar fields, the result is filled from the
    nested shapes of the ``TransparencyLogEntry`` JSON mapping when present:
    ``log_id`` from ``logId.keyId`` and ``checkpoint`` from
    ``inclusionProof.checkpoint.envelope``. ``log_pubkey_pem`` is not part of
    the response and is left ``None`` for the caller to set.

    Raises:
        RuntimeError: the log answered with an HTTP error status, or with a
            JSON value that is not an object.
        urllib.error.URLError: network-level failure (propagated unchanged).
        ValueError: the response body is not valid JSON (propagated from
            ``json.loads``).

    Callers decide whether to retry or fall back to the local tlog (only
    acceptable for development, not production).
    """
    from cryptography.hazmat.primitives import serialization as _ser

    pub_obj = _ser.load_pem_public_key(issuer_ed25519_pub_pem.encode("utf-8"))
    pub_der = pub_obj.public_bytes(
        encoding=_ser.Encoding.DER,
        format=_ser.PublicFormat.SubjectPublicKeyInfo,
    )
    body = json.dumps(
        {
            "dsseRequestV002": {
                "envelope": json.loads(envelope.to_json()),
                "verifiers": [
                    {
                        "publicKey": {
                            "rawBytes": base64.b64encode(pub_der).decode("ascii"),
                        },
                        "keyDetails": "PKIX_ED25519",
                    }
                ],
            }
        }
    ).encode("utf-8")
    req = urllib.request.Request(
        url=log_url.rstrip("/") + "/api/v2/log/entries",
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Accept": "application/json",
            "User-Agent": "oversight-protocol/0.5 (+https://github.com/oversight-protocol)",
        },
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            raw = resp.read().decode("utf-8")
    except urllib.error.HTTPError as e:
        detail = ""
        try:
            detail = e.read().decode("utf-8", errors="replace")[:500]
        except Exception:
            # Best effort only: the error body is diagnostic garnish and
            # must never mask the HTTP status being reported below.
            pass
        raise RuntimeError(f"rekor v2 upload failed: HTTP {e.code} {detail}") from e
    parsed = json.loads(raw)
    if not isinstance(parsed, dict):
        # A list/string/number here would otherwise be stored as the
        # "transparency log entry" and only fail much later at verify time.
        raise RuntimeError(
            "rekor v2 upload failed: response is not a JSON object"
        )

    log_id = _first_str(parsed, ["logID", "logId", "log_id"])
    if log_id is None:
        # In the protobuf JSON mapping the log id is an object wrapping keyId.
        for id_key in ("logId", "log_id", "logID"):
            nested_id = parsed.get(id_key)
            if isinstance(nested_id, dict):
                log_id = _first_str(nested_id, ["keyId", "key_id"])
                if log_id is not None:
                    break

    checkpoint = None
    for proof_key in ("inclusionProof", "inclusion_proof"):
        proof = parsed.get(proof_key)
        if not isinstance(proof, dict):
            continue
        signed_note = proof.get("checkpoint")
        if isinstance(signed_note, dict):
            checkpoint = _first_str(signed_note, ["envelope"])
        elif isinstance(signed_note, str):
            checkpoint = signed_note
        if checkpoint is not None:
            break

    return RekorUploadResult(
        log_url=log_url,
        log_index=_first_int(parsed, ["logIndex", "logEntry", "log_index"]),
        log_id=log_id,
        integrated_time=_first_int(parsed, ["integratedTime", "integrated_time"]),
        transparency_log_entry=parsed,
        checkpoint=checkpoint,
    )
| 368 | |
| 369 | |
def _first_int(d: dict, keys: list[str]) -> Optional[int]:
    """Return ``int(d[k])`` for the first ``k`` in ``keys`` that converts.

    Keys absent from ``d`` are skipped, as are present keys whose value makes
    ``int()`` raise ``TypeError`` or ``ValueError``. Returns ``None`` when no
    key yields an integer.
    """
    candidates = (d[name] for name in keys if name in d)
    for value in candidates:
        try:
            return int(value)
        except (TypeError, ValueError):
            pass
    return None
| 378 | |
| 379 | |
def _first_str(d: dict, keys: list[str]) -> Optional[str]:
    """Return the value of the first ``k`` in ``keys`` whose value is a ``str``.

    Keys that are absent, or present with a non-string value, are skipped.
    Returns ``None`` when nothing matches.
    """
    string_values = (
        d[name] for name in keys if name in d and isinstance(d[name], str)
    )
    return next(string_values, None)
| 385 | |
| 386 | |
| 387 | |
| 388 | |
def verify_inclusion_offline(
    bundle_rekor_field: dict,
    envelope: DSSEEnvelope,
    issuer_ed25519_pub: bytes,
    expected_content_hash_sha256_hex: str,
) -> tuple[bool, str]:
    """Verify a bundled Rekor entry without contacting the log.

    Checks (in order):
    1. The DSSE envelope verifies under ``issuer_ed25519_pub``.
    2. The envelope payload parses as JSON and its first subject's
       ``sha256`` digest equals ``expected_content_hash_sha256_hex``
       (exact string comparison).
    3. The bundled ``transparency_log_entry`` is a non-empty object that
       contains at least one of the keys ``inclusionProof``,
       ``inclusion_proof`` or ``logEntry``.

    Step 3 is a structural presence check only: no Merkle proof or
    checkpoint signature is evaluated here. A full inclusion-proof
    recomputation requires fetching tiles; that lives in
    :mod:`oversight_core.auditor_helper`, which uses ``sigstore-python``.

    Returns ``(ok, reason)``. Malformed bundle or payload content is
    reported as ``(False, reason)`` rather than raised; errors caused by the
    caller-supplied ``issuer_ed25519_pub`` may still propagate from
    ``verify_dsse``.
    """
    if not verify_dsse(envelope, issuer_ed25519_pub):
        return False, "dsse signature did not verify under issuer pubkey"
    try:
        statement = envelope_payload_statement(envelope)
    except (ValueError, TypeError):
        # A valid signature does not guarantee the signed bytes are JSON.
        return False, "dsse payload is not valid JSON"
    try:
        subject_digest = statement["subject"][0]["digest"]["sha256"]
    except (KeyError, IndexError, TypeError):
        return False, "dsse payload missing subject digest"
    if subject_digest != expected_content_hash_sha256_hex:
        return False, "dsse subject digest does not match expected content hash"
    if not isinstance(bundle_rekor_field, dict):
        # Bundles are parsed from external JSON; tolerate a wrong-typed field.
        return False, "bundle missing transparency_log_entry payload"
    tle = bundle_rekor_field.get("transparency_log_entry") or {}
    if not isinstance(tle, dict) or not tle:
        return False, "bundle missing transparency_log_entry payload"
    has_proof = any(
        k in tle for k in ("inclusionProof", "inclusion_proof", "logEntry")
    )
    if not has_proof:
        return False, "transparency_log_entry has no inclusion proof or logEntry shape"
    return True, "ok"