| 1 | """Subscribe to ball and PCI event streams, append JSONL to logs/events_*.jsonl. |
| 2 | |
| 3 | Runs for --duration seconds (default 60) then exits. Used to capture live |
| 4 | session data for offline analysis (plate_y calibration, detector stability). |
| 5 | """ |
| 6 | from __future__ import annotations |
| 7 | |
| 8 | import argparse |
| 9 | import datetime as dt |
| 10 | import json |
| 11 | import sys |
| 12 | import time |
| 13 | from pathlib import Path |
| 14 | |
| 15 | import zmq |
| 16 | from rich.console import Console |
| 17 | |
# Make the repository root importable so the script can be launched directly
# (e.g. `python path/to/this_script.py`) without installing the project.
# The root is taken to be the directory one level above this file's folder.
_REPO_ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(_REPO_ROOT))
from cv._common import event_subscriber, load_config

# Shared rich console for status output.
console = Console()
# Default directory for recorded event logs, used when --out is not supplied.
LOG_DIR = _REPO_ROOT / "logs"
| 23 | |
def _drain_events(sock: zmq.Socket, src: str, out, counts: dict[str, int]) -> int:
    """Write every message currently queued on *sock* to *out* as JSONL.

    Args:
        sock: Subscriber socket to read from. It is read non-blockingly until
            no message is pending.
        src: Tag stored under the ``"_src"`` key of each written event.
        out: Writable text file that receives one JSON object per line.
        counts: Per-event-type tallies, updated in place. Only event types
            already present as keys are counted.

    Returns:
        The number of messages skipped because they were not a UTF-8 encoded
        JSON object.
    """
    skipped = 0
    while True:
        try:
            raw = sock.recv(flags=zmq.NOBLOCK)
        except zmq.Again:
            # Queue is empty; hand control back to the poll loop.
            return skipped
        try:
            # UnicodeDecodeError and json.JSONDecodeError are both ValueError.
            ev = json.loads(raw.decode("utf-8"))
        except (ValueError, RecursionError):
            skipped += 1
            continue
        if not isinstance(ev, dict):
            # Valid JSON but not an object: it cannot carry the "_src" tag.
            skipped += 1
            continue
        ev["_src"] = src
        out.write(json.dumps(ev) + "\n")
        kind = ev.get("type")
        # isinstance guard: "type" may hold an unhashable value (e.g. a list).
        if isinstance(kind, str) and kind in counts:
            counts[kind] += 1


def main() -> int:
    """Record ball and PCI events to a JSONL file for a fixed duration.

    Command-line options:
        --duration  Seconds to record (default 60).
        --out       Output JSONL path. Defaults to a timestamped
                    ``events_<YYYYmmdd_HHMMSS>.jsonl`` file under ``LOG_DIR``.

    Each received message that decodes to a JSON object is tagged with
    ``"_src"`` (``"ball"`` or ``"pci"``) and written as one line. Messages
    that are not valid UTF-8 JSON objects are skipped and counted. A progress
    line is printed roughly every 5 seconds.

    Returns:
        0 once the recording window has elapsed.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--duration", type=float, default=60.0)
    ap.add_argument("--out", type=str, default=None, help="Output JSONL path.")
    args = ap.parse_args()

    cfg = load_config()
    ball_ep = cfg["cv"]["ball_events_endpoint"]
    pci_ep = cfg["cv"]["pci_events_endpoint"]

    if args.out:
        out_path = Path(args.out)
    else:
        ts = dt.datetime.now().strftime("%Y%m%d_%H%M%S")
        out_path = LOG_DIR / f"events_{ts}.jsonl"
    out_path.parent.mkdir(parents=True, exist_ok=True)

    # Event types tallied per source; any other type is written but not counted.
    ball_counts = {"ball_track": 0, "pitch_pred": 0}
    pci_counts = {"pci_track": 0}
    n_skipped = 0

    ctx = zmq.Context.instance()
    ball_sub = None
    pci_sub = None
    try:
        ball_sub = event_subscriber(ball_ep, ctx=ctx)
        pci_sub = event_subscriber(pci_ep, ctx=ctx)
        poller = zmq.Poller()
        poller.register(ball_sub, zmq.POLLIN)
        poller.register(pci_sub, zmq.POLLIN)

        console.print(
            f"[green]Recording[/green] ball={ball_ep} pci={pci_ep} for {args.duration:.0f}s -> {out_path.name}"
        )

        t_start = time.perf_counter()
        deadline = t_start + args.duration
        last_report = t_start
        with open(out_path, "w", encoding="utf-8") as f:
            while True:
                remaining = deadline - time.perf_counter()
                if remaining <= 0:
                    break
                # Wake at least once per second so progress is reported even
                # when idle. Clamp to 1 ms: a truncated 0 ms timeout would
                # busy-spin for the final sub-millisecond.
                timeout_ms = max(1, int(min(remaining, 1.0) * 1000))
                events = dict(poller.poll(timeout=timeout_ms))
                if ball_sub in events:
                    n_skipped += _drain_events(ball_sub, "ball", f, ball_counts)
                if pci_sub in events:
                    n_skipped += _drain_events(pci_sub, "pci", f, pci_counts)
                now = time.perf_counter()
                if now - last_report >= 5.0:
                    elapsed = now - t_start
                    n_ball = ball_counts["ball_track"]
                    n_pci = pci_counts["pci_track"]
                    n_pred = ball_counts["pitch_pred"]
                    console.print(
                        f"[dim] t={elapsed:5.1f}s ball_hits={n_ball} pci_hits={n_pci} preds={n_pred}[/dim]"
                    )
                    last_report = now
    finally:
        # Release sockets and context even on Ctrl+C or an I/O error.
        # Linger 0 discards pending messages so ctx.term() does not block.
        for sock in (ball_sub, pci_sub):
            if sock is not None:
                sock.close(0)
        ctx.term()

    n_ball = ball_counts["ball_track"]
    n_pci = pci_counts["pci_track"]
    n_pred = ball_counts["pitch_pred"]
    if n_skipped:
        console.print(
            f"[yellow]Skipped[/yellow] {n_skipped} message(s) that were not valid JSON objects"
        )
    console.print(
        f"[bold green]Recorded[/bold green] ball_hits={n_ball} pci_hits={n_pci} preds={n_pred} "
        f"-> {out_path}"
    )
    return 0
| 108 | |
# Script entry point: run the recorder and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())