| 1 | """Shared helpers for the two-host dispatch channel. |
| 2 | |
| 3 | Path resolution: set DISPATCH_ROOT to the shared filesystem path that both |
| 4 | nodes can read and write. On Linux this is typically a mounted NFS or SMB |
| 5 | share; on Windows it is the mapped drive letter for the same share. |
| 6 | |
Task envelope v2:
  { "id": "<uuid>", "from": "<node-id>", "to": "<node-id>",
    "created": "<ISO-8601 UTC>", "priority": "low|normal|high",
    "request": "<free text>",
    "callback": "<callback kind; new_task defaults to file>",
    "require_ack": bool,
    "require_dangerous": bool,
    "timeout_s": int,
    "max_output_bytes": int,
    "schema": 2,
    "hmac": "<hex HMAC-SHA256 over id|from|to|created|priority|request>" }
| 17 | """ |
| 18 | from __future__ import annotations |
| 19 | import hashlib |
| 20 | import hmac |
| 21 | import json |
| 22 | import os |
| 23 | import pathlib |
| 24 | import sys |
| 25 | import time |
| 26 | import uuid |
| 27 | from datetime import datetime, timezone |
| 28 | |
| 29 | |
# Identifiers of the two dispatch endpoints. Each one can be overridden through
# its environment variable; the fallbacks are the single letters "a" and "b".
NODE_A = os.getenv("DISPATCH_NODE_A", "a")
NODE_B = os.getenv("DISPATCH_NODE_B", "b")
| 32 | |
| 33 | |
def _resolve_root() -> pathlib.Path:
    """Return the shared dispatch directory named by ``DISPATCH_ROOT``.

    Returns:
        The value of the environment variable wrapped in a ``pathlib.Path``.
        The path is not checked for existence here.

    Raises:
        RuntimeError: if the variable is unset or set to an empty string.
    """
    configured = os.environ.get("DISPATCH_ROOT")
    if not configured:
        raise RuntimeError(
            "DISPATCH_ROOT environment variable is required. "
            "Point it at a filesystem path that both nodes can read and write."
        )
    return pathlib.Path(configured)
| 42 | |
| 43 | |
# Resolved once at import time: importing this module raises RuntimeError when
# DISPATCH_ROOT is not configured.
ROOT = _resolve_root()
# File holding the shared HMAC secret read by _key().
KEY_PATH = ROOT.joinpath("keys", "hmac.key")
# JSON-lines file that log_event() appends to.
LOG_PATH = ROOT.joinpath("session-log.jsonl")
# Marker file whose presence killswitch_tripped() reports.
KILLSWITCH = ROOT.joinpath("KILLSWITCH")
| 48 | |
# Envelope schema number stamped into every task built by new_task().
SCHEMA_VERSION = 2

# Fallback values that fill_defaults() applies to option keys a task lacks.
DEFAULTS = dict(
    require_ack=False,
    require_dangerous=False,
    timeout_s=600,
    max_output_bytes=2_000_000,
)
| 56 | |
| 57 | |
def _key() -> bytes:
    """Load the shared HMAC secret from ``KEY_PATH``.

    The file is decoded explicitly as UTF-8 instead of with the platform's
    locale encoding. The key is re-encoded as UTF-8 below, so decoding with a
    different locale codec on one host would yield different key bytes for the
    same file whenever the key contains non-ASCII characters, and signatures
    produced on one node would fail verification on the other.

    Returns:
        The key text with surrounding whitespace removed, as UTF-8 bytes.

    Raises:
        OSError: if the key file cannot be read.
        UnicodeDecodeError: if the key file is not valid UTF-8.
    """
    return KEY_PATH.read_text(encoding="utf-8").strip().encode("utf-8")
| 60 | |
| 61 | |
def _digest(task: dict) -> str:
    """Compute the hex HMAC-SHA256 signature for *task*.

    The signed message is the values of ``id``, ``from``, ``to``, ``created``,
    ``priority`` and ``request`` joined with ``|`` in that order. No other
    envelope key contributes to the digest, so changes to any other key are
    not detected by it.

    Raises:
        KeyError: if one of the six signed keys is absent.
        TypeError: if one of their values is not a string.
    """
    signed_fields = ("id", "from", "to", "created", "priority", "request")
    message = "|".join([task[name] for name in signed_fields]).encode()
    # The key is loaded only after the message has been assembled.
    mac = hmac.new(_key(), message, hashlib.sha256)
    return mac.hexdigest()
| 68 | |
| 69 | |
def sign(task: dict) -> dict:
    """Attach a signature to *task* in place and hand the same dict back.

    The digest is computed by ``_digest`` and stored under the ``"hmac"``
    key, replacing any value already there.
    """
    signature = _digest(task)
    task.update(hmac=signature)
    return task
| 73 | |
| 74 | |
def verify(task: dict) -> bool:
    """Check the ``"hmac"`` field of *task* against a freshly computed digest.

    The envelope is treated as untrusted input: any structural problem makes
    the function return ``False`` instead of raising. This covers an envelope
    that is not a mapping, a missing, empty or non-string ``"hmac"``, a
    missing signed field, and signed fields or signatures of a type the
    digest or comparison cannot process.

    Errors while loading the key (for example ``OSError`` for a missing key
    file) still propagate, because they indicate a local configuration
    problem rather than a bad envelope.

    Returns:
        ``True`` only when the provided signature matches the computed one.
    """
    try:
        provided = task.get("hmac", "")
    except AttributeError:
        # Not a mapping at all (e.g. a JSON list or string).
        return False
    if not provided or not isinstance(provided, str):
        return False
    try:
        # compare_digest performs the comparison in constant time.
        return hmac.compare_digest(provided, _digest(task))
    except (KeyError, TypeError):
        # KeyError: a signed field is missing.
        # TypeError: a signed field is not a string, or the provided
        # signature contains non-ASCII characters.
        return False
| 83 | |
| 84 | |
def new_task(sender: str, recipient: str, request: str,
             priority: str = "normal", callback: str = "file",
             require_ack: bool = False,
             require_dangerous: bool = False,
             timeout_s: int = 600,
             max_output_bytes: int = 2_000_000) -> dict:
    """Build a signed task envelope.

    A fresh UUID4 string and the current UTC time (ISO-8601) are stamped into
    the envelope together with the caller's arguments and the module's
    ``SCHEMA_VERSION``. The finished dict is passed through ``sign`` and the
    result of that call is returned.

    Args:
        sender: stored under ``"from"``.
        recipient: stored under ``"to"``.
        request: stored under ``"request"``.
        priority, callback, require_ack, require_dangerous, timeout_s,
        max_output_bytes: stored unchanged under keys of the same name.
    """
    task_id = str(uuid.uuid4())
    stamped_at = datetime.now(timezone.utc).isoformat()
    routing = {"id": task_id, "from": sender, "to": recipient,
               "created": stamped_at}
    content = {"priority": priority, "request": request, "callback": callback}
    options = {
        "require_ack": require_ack,
        "require_dangerous": require_dangerous,
        "timeout_s": timeout_s,
        "max_output_bytes": max_output_bytes,
    }
    # Merging in this order keeps the envelope's key order stable.
    envelope = {**routing, **content, **options, "schema": SCHEMA_VERSION}
    return sign(envelope)
| 106 | |
| 107 | |
def fill_defaults(task: dict) -> dict:
    """Add every ``DEFAULTS`` entry that *task* does not already carry.

    Keys already present keep their value, whatever it is. The dict is
    modified in place and also returned.
    """
    for option in DEFAULTS:
        if option not in task:
            task[option] = DEFAULTS[option]
    return task
| 112 | |
| 113 | |
def killswitch_tripped() -> tuple[bool, str]:
    """Report whether the ``KILLSWITCH`` path exists.

    Returns:
        ``(False, "")`` when the path is absent. Otherwise ``(True, text)``,
        where *text* is the stripped file content, or the literal
        ``"unreadable"`` if reading it fails for any reason.
    """
    if not KILLSWITCH.exists():
        return False, ""
    try:
        reason = KILLSWITCH.read_text().strip()
    except Exception:
        # A marker that exists but cannot be read is still reported as tripped.
        reason = "unreadable"
    return True, reason
| 121 | |
| 122 | |
def _lane_dir_name(recipient: str) -> str:
    """Name the lane directory whose messages are addressed to *recipient*.

    A recipient equal to ``NODE_A`` selects the ``<NODE_B>-to-<NODE_A>`` lane.
    Every other value, including one that matches neither node, selects
    ``<NODE_A>-to-<NODE_B>``.
    """
    if recipient == NODE_A:
        origin, target = NODE_B, NODE_A
    else:
        origin, target = NODE_A, NODE_B
    return f"{origin}-to-{target}"
| 127 | |
| 128 | |
def inbox_for(recipient: str) -> pathlib.Path:
    """Return the ``inbox`` directory of the lane addressed to *recipient*.

    Only the path is computed; nothing is created on disk.
    """
    lane_name = _lane_dir_name(recipient)
    return ROOT.joinpath(lane_name, "inbox")
| 131 | |
| 132 | |
def lane(recipient: str, stage: str) -> pathlib.Path:
    """Return the *stage* directory of *recipient*'s lane, creating it if needed.

    Missing parent directories are created as well, and an already existing
    directory is not an error.
    """
    stage_dir = ROOT.joinpath(_lane_dir_name(recipient), stage)
    stage_dir.mkdir(parents=True, exist_ok=True)
    return stage_dir
| 137 | |
| 138 | |
def enqueue(task: dict) -> pathlib.Path:
    """Write *task* as ``<id>.json`` into the inbox of its ``"to"`` node.

    The JSON (indented by two spaces) is first written to a sibling
    ``<id>.json.tmp`` file and then moved over the final name, so the final
    name is only ever bound to a completely written file.

    Returns:
        The path of the final ``.json`` file.
    """
    inbox = inbox_for(task["to"])
    inbox.mkdir(parents=True, exist_ok=True)
    final_path = inbox / f"{task['id']}.json"
    staging_path = final_path.with_suffix(".json.tmp")
    serialized = json.dumps(task, indent=2)
    staging_path.write_text(serialized)
    # Path.replace delegates to os.replace.
    staging_path.replace(final_path)
    return final_path
| 147 | |
| 148 | |
def log_event(session: str, event: str, summary: str,
              task_id: str | None = None, **extra) -> None:
    """Append one JSON line describing an event to ``LOG_PATH``.

    The record holds the current UTC timestamp (``"ts"``), the session, the
    event name, the first 140 characters of *summary* and the task id. Any
    additional keyword arguments are merged into the record afterwards and
    replace a base key of the same name.
    """
    record = {
        "ts": datetime.now(timezone.utc).isoformat(),
        "session": session,
        "event": event,
        "summary": summary[:140],
        "task_id": task_id,
        **extra,
    }
    with LOG_PATH.open("a") as log_file:
        log_file.write(f"{json.dumps(record)}\n")
| 162 | |
| 163 | |
def heartbeat(session: str, extra: dict | None = None) -> pathlib.Path:
    """Write ``heartbeats/<session>.json`` under ``ROOT`` and return its path.

    The file records the session name (as ``"host"``), the current UTC time
    in ISO-8601 form, the integer Unix time and this process's pid. Entries
    of *extra*, when given and non-empty, are merged in last. The content is
    written to a ``.json.tmp`` sibling and then moved over the final name.
    """
    target = ROOT.joinpath("heartbeats", f"{session}.json")
    target.parent.mkdir(parents=True, exist_ok=True)
    beat = dict(
        host=session,
        ts=datetime.now(timezone.utc).isoformat(),
        unix=int(time.time()),
        pid=os.getpid(),
    )
    if extra:
        beat.update(extra)
    staging = target.with_suffix(".json.tmp")
    staging.write_text(json.dumps(beat, indent=2))
    # Path.replace delegates to os.replace.
    staging.replace(target)
    return target