Zion Boggan
repos/Claude Dispatch/bin/dispatch_watcher.py
zionboggan.com ↗
197 lines · python
History for this file →
1
"""Shared watcher for both sides.

Each side runs this with --side <node-id>. It watches the inbox addressed to
that side, verifies each task's HMAC, optionally gates on a human ack, then
spawns dispatch-exec under a concurrency cap.

Honors the killswitch file. Concurrency cap = DISPATCH_MAX_PARALLEL (default 2),
counted against every process on this host whose command line mentions
dispatch-exec (found via /proc), not only the ones this side started.
"""
9
from __future__ import annotations
10
import argparse
11
import json
12
import os
13
import pathlib
14
import subprocess
15
import sys
16
import time
17
 
18
sys.path.insert(0, str(pathlib.Path(__file__).resolve().parent))
19
import dispatch_lib as d
20
 
21
# Directory holding this script; the default exec binary lives beside it.
_BIN_DIR = pathlib.Path(__file__).resolve().parent

# Seconds to sleep between watcher cycles.
POLL = int(os.environ.get("DISPATCH_POLL_SEC", "3"))
# Maximum number of concurrently running exec processes before new spawns wait.
MAX_PARALLEL = int(os.environ.get("DISPATCH_MAX_PARALLEL", "2"))
# Executable launched once per task as: EXEC_BIN <task.json>
EXEC_BIN = os.environ.get("DISPATCH_EXEC_BIN", str(_BIN_DIR / "dispatch-exec"))
# JSON file remembering when the watcher last started (see boot_once_log).
STATE_FILE = pathlib.Path(
    os.environ.get("DISPATCH_STATE_FILE", "/tmp/dispatch-watcher-state.json")
)
28
 
29
 
30
def lanes(side: str) -> dict[str, pathlib.Path]:
    """Return the lane paths for tasks flowing from the peer node to ``side``.

    The lanes live under ``d.ROOT / "<peer>-to-<side>"``. The peer is
    ``d.NODE_B`` when ``side`` is ``d.NODE_A``; for any other ``side`` value
    the peer is ``d.NODE_A``. Keys map to the sub-directories (and the
    ``pending.json`` snapshot file) used by the rest of the watcher.
    """
    peer = d.NODE_A if side != d.NODE_A else d.NODE_B
    base = d.ROOT / f"{peer}-to-{side}"
    # (lane key, entry name under base) in the order callers iterate them.
    layout = (
        ("inbox", "inbox"),
        ("pending_ack", "pending-ack"),
        ("processing", "processing"),
        ("done", "done"),
        ("rejected", "rejected"),
        ("pending_json", "pending.json"),
    )
    return {key: base / leaf for key, leaf in layout}
41
 
42
 
43
def active_exec_count(needle: str = "dispatch-exec") -> int:
    """Count running processes whose command line contains ``needle``.

    Every numeric entry under ``/proc`` is inspected, so the count is
    host-wide: it includes matching processes started by any watcher or
    user, not only this side's. The NUL-separated ``cmdline`` is joined with
    spaces and matched as a plain substring.

    Args:
        needle: substring to look for; defaults to ``"dispatch-exec"``.

    Returns:
        Number of matching processes, or 0 when ``/proc`` does not exist.
    """
    proc = pathlib.Path("/proc")
    if not proc.exists():
        return 0
    count = 0
    for entry in proc.iterdir():
        if not entry.name.isdigit():
            continue
        try:
            raw = (entry / "cmdline").read_bytes()
        except OSError:
            # The process exited after iterdir() listed it, or we may not read it.
            continue
        if needle in raw.replace(b"\x00", b" ").decode(errors="replace"):
            count += 1
    return count
58
 
59
 
60
def process_inbox(side: str, L: dict) -> None:
    """Triage every ``*.json`` task file in the inbox lane, in sorted order.

    For each file:
      * unreadable, non-JSON, or non-object payloads are moved to ``rejected``
        (best-effort) and logged as ``task_reject``;
      * payloads failing ``d.verify`` are moved to ``rejected`` and logged;
      * otherwise ``d.fill_defaults`` is applied and the file is rewritten in
        place, then moved to ``pending_ack`` (and the pending snapshot is
        refreshed) when ``require_ack`` is truthy, or to ``processing``.

    Args:
        side: this node's identifier, passed through to ``d.log_event``.
        L: lane mapping produced by ``lanes(side)``.
    """
    for path in sorted(L["inbox"].glob("*.json")):
        try:
            task = json.loads(path.read_text())
            # Only JSON objects are tasks. Any other payload (list, string,
            # number) would otherwise be handed to d.verify() unchecked; if
            # that raises, the exception escapes before the file is moved, so
            # the same file would abort every later inbox scan and starve the
            # files sorted after it.
            if not isinstance(task, dict):
                raise ValueError(f"expected JSON object, got {type(task).__name__}")
        except Exception as e:
            target = L["rejected"] / path.name
            try:
                path.rename(target)
            except Exception:
                pass  # best-effort; the file stays in the inbox for the next poll
            d.log_event(side, "task_reject", f"parse err: {e}")
            continue

        if not d.verify(task):
            path.rename(L["rejected"] / path.name)
            # str() so a null or numeric id cannot make the slice raise after
            # the file has already been moved to rejected.
            d.log_event(side, "task_reject", f"bad HMAC id={str(task.get('id', '?'))[:8]}")
            continue

        d.fill_defaults(task)
        path.write_text(json.dumps(task, indent=2))

        if task.get("require_ack"):
            target = L["pending_ack"] / path.name
            path.rename(target)
            d.log_event(side, "task_pending_ack", task["request"][:80], task["id"])
            write_pending_snapshot(L)
        else:
            target = L["processing"] / path.name
            path.rename(target)
            d.log_event(side, "task_auto", task["request"][:80], task["id"])
88
 
89
 
90
def process_acked(side: str, L: dict) -> None:
    """Promote human-acknowledged tasks from ``pending_ack`` to ``processing``.

    An ack is a file ``<stem>.ack`` in the pending-ack lane. Its task is looked
    up as ``<stem>.json`` first, then as ``<stem>`` itself, so both ``abc.ack``
    and ``abc.json.ack`` work. An ack with no matching task file is deleted.
    After each promotion the ack is deleted and the pending snapshot is
    rewritten. ``side`` is accepted for signature parity and is not used.
    """
    pending = L["pending_ack"]
    for ack in sorted(pending.glob("*.ack")):
        stem = ack.name[: -len(".ack")]
        # Only regular files count as tasks. An ack named exactly ".ack" has an
        # empty stem, and ``pending / ""`` is the pending-ack directory itself;
        # an exists() check would match it and move the whole lane away.
        task_path = next(
            (p for p in (pending / f"{stem}.json", pending / stem) if p.is_file()),
            None,
        )
        if task_path is None:
            ack.unlink()
            continue
        task_path.rename(L["processing"] / task_path.name)
        ack.unlink()
        write_pending_snapshot(L)
102
 
103
 
104
def dispatch_processing(side: str, L: dict) -> None:
    """Spawn ``EXEC_BIN`` for tasks waiting in the processing lane.

    No-op while ``d.killswitch_tripped()`` reports tripped. A ``*.json`` task
    is waiting when no sibling ``.running`` marker exists. Up to
    ``MAX_PARALLEL - active_exec_count()`` waiting tasks (sorted by filename)
    are launched per call. For each, the marker is written first, holding the
    current epoch seconds, so later calls skip the task. The executable is
    then started in a new session with stdout/stderr discarded. If spawning
    fails, the marker is removed and ``exec_spawn_err`` is logged.
    """
    tripped, _reason = d.killswitch_tripped()
    if tripped:
        return

    processing_dir = L["processing"]
    waiting = [
        task_file
        for task_file in sorted(processing_dir.glob("*.json"))
        if not task_file.with_suffix(".running").exists()
    ]
    if not waiting:
        return

    free_slots = MAX_PARALLEL - active_exec_count()
    if free_slots <= 0:
        return

    for task_file in waiting[:free_slots]:
        running = task_file.with_suffix(".running")
        running.write_text(str(int(time.time())))
        argv = [EXEC_BIN, str(task_file)]
        try:
            subprocess.Popen(
                argv,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                start_new_session=True,
            )
        except Exception as exc:
            running.unlink(missing_ok=True)
            d.log_event(side, "exec_spawn_err", str(exc)[:120])
135
 
136
 
137
def cleanup_markers(L: dict) -> None:
    """Remove orphaned ``.running`` markers from the processing lane.

    A marker is orphaned once its companion task file is gone
    (``foo.running`` pairs with ``foo.json``).
    """
    orphans = [
        marker
        for marker in L["processing"].glob("*.running")
        if not marker.with_suffix(".json").exists()
    ]
    for marker in orphans:
        marker.unlink(missing_ok=True)
142
 
143
 
144
def write_pending_snapshot(L: dict) -> None:
    """Rewrite ``L["pending_json"]`` with a summary of tasks awaiting an ack.

    The snapshot is ``{"count": N, "items": [...]}``. Each item has the
    task's ``id``, ``from``, ``priority``, the first 200 chars of
    ``request``, and ``created``. Task files that cannot be read or parsed,
    or that lack an ``id``, are skipped (best-effort).

    The JSON is written to a temporary sibling file and moved into place with
    ``os.replace``, so a concurrent reader never sees a half-written snapshot.
    """
    items = []
    for p in sorted(L["pending_ack"].glob("*.json")):
        try:
            t = json.loads(p.read_text())
            request = t.get("request", "")
            if request is None:
                # An explicit null would make the slice raise and silently
                # hide this task from whoever reads the snapshot to ack it.
                request = ""
            items.append({
                "id": t["id"], "from": t.get("from"), "priority": t.get("priority"),
                "request": request[:200], "created": t.get("created"),
            })
        except Exception:
            continue  # best-effort: one bad task file must not block the snapshot

    target = L["pending_json"]
    payload = json.dumps({"count": len(items), "items": items}, indent=2)
    tmp = target.with_name(f".{target.name}.{os.getpid()}.tmp")
    try:
        tmp.write_text(payload)
        os.replace(tmp, target)
    finally:
        # No-op after a successful replace; removes the partial file on failure.
        tmp.unlink(missing_ok=True)
156
 
157
 
158
def boot_once_log(side: str) -> None:
    """Log ``watcher_start`` unless a start was recorded in the last 120 s.

    The last start time (epoch seconds) is stored under ``"last_start"`` in
    ``STATE_FILE`` and refreshed on every call. This presumably keeps
    repeated restarts from flooding the event log. The state file is
    strictly best-effort: a missing, malformed, or unwritable file must not
    stop the watcher from starting.
    """
    try:
        prev = json.loads(STATE_FILE.read_text()) if STATE_FILE.exists() else {}
    except Exception:
        prev = {}
    if not isinstance(prev, dict):
        # e.g. the file holds a JSON list/string; .get() below would raise.
        prev = {}
    now = int(time.time())
    last = prev.get("last_start", 0)
    if not isinstance(last, (int, float)):
        last = 0
    if now - last > 120:
        d.log_event(side, "watcher_start", f"poll={POLL}s max_parallel={MAX_PARALLEL}")
    prev["last_start"] = now
    try:
        STATE_FILE.write_text(json.dumps(prev))
    except OSError as e:
        # e.g. the default /tmp path already exists and is owned by another user.
        d.log_event(side, "watcher_err", f"state file write failed: {e}"[:120])
169
 
170
 
171
def main() -> int:
    """Run the watcher for ``--side`` until interrupted.

    Creates every lane directory (everything in ``lanes()`` except the
    ``pending_json`` file), logs the throttled start event, then loops. Each
    cycle runs inbox triage, ack promotion, dispatch, and marker cleanup. Any
    ``Exception`` from a cycle is logged as ``watcher_err`` (message truncated
    to 120 chars), and the loop sleeps ``POLL`` seconds between cycles.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--side", required=True, help="this node's identifier")
    side = parser.parse_args().side

    lane_map = lanes(side)
    directories = [path for key, path in lane_map.items() if key != "pending_json"]
    for directory in directories:
        directory.mkdir(parents=True, exist_ok=True)

    boot_once_log(side)

    while True:
        try:
            process_inbox(side, lane_map)
            process_acked(side, lane_map)
            dispatch_processing(side, lane_map)
            cleanup_markers(lane_map)
        except Exception as exc:
            d.log_event(side, "watcher_err", str(exc)[:120])
        time.sleep(POLL)


if __name__ == "__main__":
    sys.exit(main())