Zion Boggan
repos/Pitch Tracker CV/tools/record_events.py
zionboggan.com ↗
110 lines · python
History for this file →
1
"""Subscribe to ball and PCI event streams, append JSONL to logs/events_*.jsonl.
2
 
3
Runs for --duration seconds (default 60) then exits. Used to capture live
4
session data for offline analysis (plate_y calibration, detector stability).
5
"""
6
from __future__ import annotations
7
 
8
import argparse
9
import datetime as dt
10
import json
11
import sys
12
import time
13
from pathlib import Path
14
 
15
import zmq
16
from rich.console import Console
17
 
18
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
19
from cv._common import event_subscriber, load_config
20
 
21
console = Console()
22
LOG_DIR = Path(__file__).resolve().parents[1] / "logs"
23
 
24
def main() -> int:
25
    ap = argparse.ArgumentParser()
26
    ap.add_argument("--duration", type=float, default=60.0)
27
    ap.add_argument("--out", type=str, default=None, help="Output JSONL path.")
28
    args = ap.parse_args()
29
 
30
    cfg = load_config()
31
    ball_ep = cfg["cv"]["ball_events_endpoint"]
32
    pci_ep = cfg["cv"]["pci_events_endpoint"]
33
 
34
    if args.out:
35
        out_path = Path(args.out)
36
    else:
37
        ts = dt.datetime.now().strftime("%Y%m%d_%H%M%S")
38
        out_path = LOG_DIR / f"events_{ts}.jsonl"
39
    out_path.parent.mkdir(parents=True, exist_ok=True)
40
 
41
    ctx = zmq.Context.instance()
42
    ball_sub = event_subscriber(ball_ep, ctx=ctx)
43
    pci_sub = event_subscriber(pci_ep, ctx=ctx)
44
    poller = zmq.Poller()
45
    poller.register(ball_sub, zmq.POLLIN)
46
    poller.register(pci_sub, zmq.POLLIN)
47
 
48
    console.print(
49
        f"[green]Recording[/green] ball={ball_ep} pci={pci_ep} for {args.duration:.0f}s -> {out_path.name}"
50
    )
51
 
52
    n_ball = 0
53
    n_pci = 0
54
    n_pred = 0
55
    t_start = time.perf_counter()
56
    deadline = t_start + args.duration
57
    last_report = t_start
58
    with open(out_path, "w", encoding="utf-8") as f:
59
        while True:
60
            remaining = deadline - time.perf_counter()
61
            if remaining <= 0:
62
                break
63
            events = dict(poller.poll(timeout=int(min(remaining, 1.0) * 1000)))
64
            if ball_sub in events:
65
                while True:
66
                    try:
67
                        raw = ball_sub.recv(flags=zmq.NOBLOCK)
68
                    except zmq.Again:
69
                        break
70
                    try:
71
                        ev = json.loads(raw.decode("utf-8"))
72
                    except Exception:
73
                        continue
74
                    ev["_src"] = "ball"
75
                    f.write(json.dumps(ev) + "\n")
76
                    if ev.get("type") == "ball_track":
77
                        n_ball += 1
78
                    elif ev.get("type") == "pitch_pred":
79
                        n_pred += 1
80
            if pci_sub in events:
81
                while True:
82
                    try:
83
                        raw = pci_sub.recv(flags=zmq.NOBLOCK)
84
                    except zmq.Again:
85
                        break
86
                    try:
87
                        ev = json.loads(raw.decode("utf-8"))
88
                    except Exception:
89
                        continue
90
                    ev["_src"] = "pci"
91
                    f.write(json.dumps(ev) + "\n")
92
                    if ev.get("type") == "pci_track":
93
                        n_pci += 1
94
            now = time.perf_counter()
95
            if now - last_report >= 5.0:
96
                elapsed = now - t_start
97
                console.print(
98
                    f"[dim]  t={elapsed:5.1f}s  ball_hits={n_ball}  pci_hits={n_pci}  preds={n_pred}[/dim]"
99
                )
100
                last_report = now
101
 
102
    ball_sub.close(0); pci_sub.close(0); ctx.term()
103
    console.print(
104
        f"[bold green]Recorded[/bold green] ball_hits={n_ball} pci_hits={n_pci} preds={n_pred} "
105
        f"-> {out_path}"
106
    )
107
    return 0
108
 
109
if __name__ == "__main__":
110
    sys.exit(main())