Zion Boggan zionboggan.com ↗

initial commit

7fba040   Zion Boggan committed on Apr 21, 2026 (2 months ago)
configs/runtime.yaml +63 -0
@@ -0,0 +1,63 @@
+capture:
+ device_index: 0
+ width: 1920
+ height: 1080
+ fps: 60
+ publish_endpoint: tcp://127.0.0.1:5555
+ jpeg_quality: 80
+
+cv:
+ pitch_detect:
+ ball_hsv_low: [0, 0, 190]
+ ball_hsv_high: [180, 60, 255]
+ min_ball_radius_px: 3
+ max_ball_radius_px: 30
+ release_detect_template: configs/templates/release_windup_end.png
+ release_conf_threshold: 0.85
+ pci:
+ template: configs/templates/pci_circle.png
+ conf_threshold: 0.70
+ hsv_low: [45, 140, 140]
+ hsv_high: [75, 255, 255]
+ min_radius_px: 20
+ max_radius_px: 140
+ search_x_min_frac: 0.35
+ search_x_max_frac: 0.65
+ search_y_min_frac: 0.35
+ search_y_max_frac: 0.75
+ min_green_pixels: 150
+ ball_events_endpoint: tcp://127.0.0.1:5561
+ pci_events_endpoint: tcp://127.0.0.1:5562
+ plate_y_frac: 0.85
+
+batting:
+ swing_mode: auto
+ power_swing_zone_margin_px: 30
+ take_on_ball_count_le: 1
+ max_aim_error_px: 14
+ contact_button: X
+ power_button: A
+ take_frames_before_pitch: 2
+
+pitching:
+ style: classic
+ pitch_menu:
+ - {name: fourseam, button: Y, default_target: high_in}
+ - {name: slider, button: B, default_target: low_away}
+ - {name: changeup, button: A, default_target: low_middle}
+ pinpoint_gestures_dir: gestures/
+
+titan_two:
+ bridge_endpoint: tcp://127.0.0.1:5556
+ command_timeout_ms: 5
+
+ui:
+ hotkey_arm: F9
+ hotkey_disarm: F10
+ hotkey_abort: F12
+ voice_enabled: false
+
+safety:
+ abort_on_menu_detected: true
+ abort_on_capture_loss_ms: 500
+ online_mode_abort: true
cv/ball_tracker.py +278 -0
@@ -0,0 +1,278 @@
+"""Ball tracker.
+
+Subscribes to capture frames, finds the ball via HSV + circularity, keeps a
+rolling window of detections, and emits:
+ - ball_track events each frame with the most-confident detection (or miss)
+ - pitch_pred events once a fittable trajectory accumulates (plate_x + eta_ms)
+
+Classical CV only. Tune HSV and radius bounds via configs/runtime.yaml.
+"""
+from __future__ import annotations
+
+import argparse
+import sys
+import time
+from collections import deque
+from pathlib import Path
+
+import cv2
+import numpy as np
+from rich.console import Console
+
+sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
+from cv._common import ( # noqa: E402
+ event_subscriber,
+ iter_latest_frames,
+ load_config,
+ make_frame_subscriber,
+ make_pub,
+ send_event,
+)
+
+console = Console()
+
+TRAJ_WINDOW_S = 0.8
+MIN_FIT_POINTS = 4
+FIT_USE_LAST_N = 8
+PITCH_GAP_MS = 300
+MIN_DOWN_PX = 30
+STATIC_WINDOW = 3
+STATIC_SPREAD_PX = 3.0
+BAN_ZONE_MS = 1500
+BAN_ZONE_PX = 6
+PITCH_START_MAX_Y = 500
+
+MAX_STEP_PX = 200
+
+class Detection:
+ __slots__ = ("ts_ns", "x", "y", "r", "score")
+ def __init__(self, ts_ns: int, x: float, y: float, r: float, score: float):
+ self.ts_ns = ts_ns
+ self.x = x
+ self.y = y
+ self.r = r
+ self.score = score
+
+def detect_ball(frame: np.ndarray, cfg: dict) -> Detection | None:
+ pd = cfg["cv"]["pitch_detect"]
+ hsv_low = np.array(pd["ball_hsv_low"], dtype=np.uint8)
+ hsv_high = np.array(pd["ball_hsv_high"], dtype=np.uint8)
+ r_min = float(pd["min_ball_radius_px"])
+ r_max = float(pd["max_ball_radius_px"])
+
+ hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
+ mask = cv2.inRange(hsv, hsv_low, hsv_high)
+ mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, np.ones((3, 3), np.uint8), iterations=1)
+
+ contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
+ best: Detection | None = None
+ best_score = 0.0
+ for c in contours:
+ area = cv2.contourArea(c)
+ if area < np.pi * r_min * r_min * 0.5:
+ continue
+ (cx, cy), r = cv2.minEnclosingCircle(c)
+ if r < r_min or r > r_max:
+ continue
+
+ circ_area = np.pi * r * r
+ score = float(area / circ_area) if circ_area > 0 else 0.0
+ if score < 0.6:
+ continue
+ if score > best_score:
+ best_score = score
+ best = Detection(ts_ns=0, x=float(cx), y=float(cy), r=float(r), score=score)
+ return best
+
+def try_fit(trail: deque[Detection], plate_y_px: float) -> tuple[float, float] | None:
+ """Return (plate_x_px, eta_ms_from_now) or None if not fittable."""
+ if len(trail) < MIN_FIT_POINTS:
+ return None
+
+ recent = list(trail)[-FIT_USE_LAST_N:]
+ ys = np.array([d.y for d in recent], dtype=np.float64)
+ if ys.max() - ys.min() < MIN_DOWN_PX:
+ return None
+
+ if (ys[-1] - ys[0]) < -30:
+ return None
+
+ t0 = recent[0].ts_ns
+ ts = np.array([(d.ts_ns - t0) / 1e9 for d in recent], dtype=np.float64)
+ xs = np.array([d.x for d in recent], dtype=np.float64)
+
+ ay, by, cy = np.polyfit(ts, ys, 2)
+ bx, cx = np.polyfit(ts, xs, 1)
+
+ disc = by * by - 4 * ay * (cy - plate_y_px)
+ if disc < 0 or abs(ay) < 1e-6:
+ return None
+ sqrt_d = float(np.sqrt(disc))
+ t_candidates = [(-by + sqrt_d) / (2 * ay), (-by - sqrt_d) / (2 * ay)]
+ t_cross = None
+ for tc in t_candidates:
+ if tc > ts[-1]:
+ if t_cross is None or tc < t_cross:
+ t_cross = tc
+ if t_cross is None:
+ return None
+ now_ns = time.time_ns()
+ plate_ns = t0 + int(t_cross * 1e9)
+ eta_ms = (plate_ns - now_ns) / 1e6
+ if eta_ms < 0 or eta_ms > 2000:
+ return None
+ plate_x = bx * t_cross + cx
+ return float(plate_x), float(eta_ms)
+
+def main() -> int:
+ ap = argparse.ArgumentParser(description="Ball tracker + parabolic pitch-prediction.")
+ ap.add_argument("--duration", type=float, default=0.0, help="Stop after N seconds (0 = run forever).")
+ ap.add_argument("--quiet", action="store_true", help="Suppress per-frame status lines.")
+ args = ap.parse_args()
+
+ cfg = load_config()
+ capture_ep = cfg["capture"]["publish_endpoint"]
+ ball_ep = cfg["cv"]["ball_events_endpoint"]
+ plate_y_frac = float(cfg["cv"].get("plate_y_frac", 0.72))
+
+ sub = make_frame_subscriber(capture_ep)
+ pub = make_pub(ball_ep)
+ console.print(f"[green]ball_tracker[/green] sub={capture_ep} pub={ball_ep}")
+
+ trail: deque[Detection] = deque(maxlen=64)
+ banned: deque = deque(maxlen=32)
+ pitch_id = 0
+ last_det_ns: int | None = None
+ t_end = time.perf_counter() + args.duration if args.duration > 0 else None
+ frames = 0
+ hits = 0
+ preds = 0
+ t_report = time.perf_counter()
+
+ try:
+ for meta, frame in iter_latest_frames(sub, timeout_ms=3000):
+ if t_end is not None and time.perf_counter() >= t_end:
+ break
+ frames += 1
+ ts_ns = int(meta["ts_ns"])
+ h = int(meta.get("h", frame.shape[0]))
+ plate_y_px = h * plate_y_frac
+
+ while banned and banned[0][2] <= ts_ns:
+ banned.popleft()
+
+ det = detect_ball(frame, cfg)
+ if det is not None:
+
+ in_banned = any(
+ abs(det.x - bx) < BAN_ZONE_PX and abs(det.y - by) < BAN_ZONE_PX
+ for (bx, by, _) in banned
+ )
+ if in_banned:
+ send_event(pub, {
+ "type": "ball_miss",
+ "seq": int(meta["seq"]),
+ "ts_ns": ts_ns,
+ "reason": "banned_zone",
+ })
+ continue
+
+ det.ts_ns = ts_ns
+
+ if last_det_ns is not None and (ts_ns - last_det_ns) > PITCH_GAP_MS * 1e6:
+ trail.clear()
+ pitch_id += 1
+
+ if trail:
+ last_d = trail[-1]
+ dx = det.x - last_d.x
+ dy_step = det.y - last_d.y
+ if (dx * dx + dy_step * dy_step) ** 0.5 > MAX_STEP_PX:
+ trail.clear()
+ pitch_id += 1
+
+ if len(trail) == 0 and det.y > PITCH_START_MAX_Y:
+ send_event(pub, {
+ "type": "ball_miss",
+ "seq": int(meta["seq"]),
+ "ts_ns": ts_ns,
+ "reason": "not_pitch_start",
+ "det_y": det.y,
+ })
+ continue
+
+ trail.append(det)
+ last_det_ns = ts_ns
+
+ cutoff = ts_ns - int(TRAJ_WINDOW_S * 1e9)
+ while trail and trail[0].ts_ns < cutoff:
+ trail.popleft()
+
+ if len(trail) >= STATIC_WINDOW:
+ recent = list(trail)[-STATIC_WINDOW:]
+ rxs = [d.x for d in recent]
+ rys = [d.y for d in recent]
+ spread = max(max(rxs) - min(rxs), max(rys) - min(rys))
+ if spread < STATIC_SPREAD_PX:
+ cx = sum(rxs) / len(rxs)
+ cy = sum(rys) / len(rys)
+ banned.append((cx, cy, ts_ns + int(BAN_ZONE_MS * 1e6)))
+ trail.clear()
+ last_det_ns = None
+ send_event(pub, {
+ "type": "ball_miss",
+ "seq": int(meta["seq"]),
+ "ts_ns": ts_ns,
+ "reason": "static_ui_banned",
+ "banned_x": cx, "banned_y": cy,
+ })
+ continue
+
+ hits += 1
+ send_event(pub, {
+ "type": "ball_track",
+ "seq": int(meta["seq"]),
+ "ts_ns": ts_ns,
+ "pitch_id": pitch_id,
+ "x": det.x, "y": det.y, "r": det.r,
+ "score": det.score,
+ })
+
+ fit = try_fit(trail, plate_y_px)
+ if fit is not None:
+ plate_x, eta_ms = fit
+ preds += 1
+ send_event(pub, {
+ "type": "pitch_pred",
+ "seq": int(meta["seq"]),
+ "ts_ns": ts_ns,
+ "pitch_id": pitch_id,
+ "plate_x": plate_x,
+ "plate_y": plate_y_px,
+ "eta_ms": eta_ms,
+ "n_points": len(trail),
+ })
+ else:
+ send_event(pub, {
+ "type": "ball_miss",
+ "seq": int(meta["seq"]),
+ "ts_ns": ts_ns,
+ })
+
+ now = time.perf_counter()
+ if not args.quiet and now - t_report >= 5.0:
+ console.print(f"[dim] ball: {frames} frames, {hits} hits, {preds} preds, trail={len(trail)}[/dim]")
+ t_report = now
+ except TimeoutError as e:
+ console.print(f"[red]ball_tracker: {e}. Is capture/ingest.py running?[/red]")
+ return 2
+ except KeyboardInterrupt:
+ console.print("[yellow]ball_tracker interrupted.[/yellow]")
+ finally:
+ sub.close(0)
+ pub.close(0)
+ console.print(f"[bold]ball_tracker summary:[/bold] frames={frames} hits={hits} preds={preds}")
+ return 0
+
+if __name__ == "__main__":
+ sys.exit(main())
cv/pci_tracker.py +170 -0
@@ -0,0 +1,170 @@
+"""PCI tracker.
+
+Finds the Plate Coverage Indicator (the green circle in MLB The Show Zone+PCI
+batting). Uses template matching if configs/templates/pci_circle.png exists;
+otherwise falls back to HSV segmentation using the green range in runtime.yaml.
+
+Emits pci_track events each frame (x, y, r, score, method).
+"""
+from __future__ import annotations
+
+import argparse
+import sys
+import time
+from pathlib import Path
+
+import cv2
+import numpy as np
+from rich.console import Console
+
+sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
+from cv._common import ( # noqa: E402
+ iter_latest_frames,
+ load_config,
+ make_frame_subscriber,
+ make_pub,
+ send_event,
+)
+
+console = Console()
+
+def load_template(path: Path) -> np.ndarray | None:
+ if not path.exists():
+ return None
+ tpl = cv2.imread(str(path), cv2.IMREAD_COLOR)
+ if tpl is None:
+ console.print(f"[yellow]PCI template at {path} could not be read.[/yellow]")
+ return None
+ return tpl
+
+def detect_by_template(frame: np.ndarray, tpl: np.ndarray, threshold: float):
+ res = cv2.matchTemplate(frame, tpl, cv2.TM_CCOEFF_NORMED)
+ _, max_val, _, max_loc = cv2.minMaxLoc(res)
+ if max_val < threshold:
+ return None
+ th, tw = tpl.shape[:2]
+ x = max_loc[0] + tw / 2.0
+ y = max_loc[1] + th / 2.0
+ r = 0.5 * min(tw, th)
+ return {"x": float(x), "y": float(y), "r": float(r), "score": float(max_val), "method": "template"}
+
+def detect_by_hsv(frame: np.ndarray, cfg_pci: dict):
+ """Centroid-based PCI detection.
+
+ The PCI in MLB 26 renders as a multi-part shape (brackets + inner + center),
+ so fitting a single contour circle doesn't work reliably. Instead we mask
+ the configured green range, restrict to the central strike-zone region,
+ and return the centroid of all matching pixels as the PCI position.
+ Radius is estimated from the spread of matching pixels.
+ """
+ hsv_low = np.array(cfg_pci["hsv_low"], dtype=np.uint8)
+ hsv_high = np.array(cfg_pci["hsv_high"], dtype=np.uint8)
+ min_px = int(cfg_pci.get("min_green_pixels", 150))
+ x_lo_f = float(cfg_pci.get("search_x_min_frac", 0.3))
+ x_hi_f = float(cfg_pci.get("search_x_max_frac", 0.7))
+ y_lo_f = float(cfg_pci.get("search_y_min_frac", 0.3))
+ y_hi_f = float(cfg_pci.get("search_y_max_frac", 0.8))
+
+ h, w = frame.shape[:2]
+ x0, x1 = int(w * x_lo_f), int(w * x_hi_f)
+ y0, y1 = int(h * y_lo_f), int(h * y_hi_f)
+
+ hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
+ mask = cv2.inRange(hsv, hsv_low, hsv_high)
+
+ window = np.zeros_like(mask)
+ window[y0:y1, x0:x1] = 255
+ mask = cv2.bitwise_and(mask, window)
+ mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, np.ones((3, 3), np.uint8), iterations=1)
+
+ ys, xs = np.where(mask > 0)
+ n = len(xs)
+ if n < min_px:
+ return None
+
+ cx = float(xs.mean())
+ cy = float(ys.mean())
+
+ dx = xs - cx
+ dy = ys - cy
+ dists = np.sqrt(dx * dx + dy * dy)
+ r = float(np.percentile(dists, 80))
+
+ x_bb_min, x_bb_max = int(xs.min()), int(xs.max())
+ y_bb_min, y_bb_max = int(ys.min()), int(ys.max())
+ bbox_area = max(1, (x_bb_max - x_bb_min + 1) * (y_bb_max - y_bb_min + 1))
+ score = float(n / bbox_area)
+
+ return {"x": cx, "y": cy, "r": r, "score": score, "method": "hsv_centroid", "n_px": n}
+
+def main() -> int:
+ ap = argparse.ArgumentParser(description="PCI tracker.")
+ ap.add_argument("--duration", type=float, default=0.0, help="Stop after N seconds (0 = run forever).")
+ ap.add_argument("--quiet", action="store_true")
+ args = ap.parse_args()
+
+ cfg = load_config()
+ capture_ep = cfg["capture"]["publish_endpoint"]
+ pci_ep = cfg["cv"]["pci_events_endpoint"]
+ pci_cfg = cfg["cv"]["pci"]
+
+ tpl_path = Path(__file__).resolve().parents[1] / pci_cfg["template"]
+ tpl = load_template(tpl_path)
+ tpl_thresh = float(pci_cfg.get("conf_threshold", 0.7))
+
+ sub = make_frame_subscriber(capture_ep)
+ pub = make_pub(pci_ep)
+ method_label = "template" if tpl is not None else "hsv"
+ console.print(
+ f"[green]pci_tracker[/green] sub={capture_ep} pub={pci_ep} method={method_label}"
+ + ("" if tpl is not None else f" (no template at {tpl_path.name}; using HSV fallback)")
+ )
+
+ t_end = time.perf_counter() + args.duration if args.duration > 0 else None
+ frames = 0
+ hits = 0
+ t_report = time.perf_counter()
+
+ try:
+ for meta, frame in iter_latest_frames(sub, timeout_ms=3000):
+ if t_end is not None and time.perf_counter() >= t_end:
+ break
+ frames += 1
+ det = None
+ if tpl is not None:
+ det = detect_by_template(frame, tpl, tpl_thresh)
+ if det is None:
+ det = detect_by_hsv(frame, pci_cfg)
+ if det is not None:
+ hits += 1
+ event = {
+ "type": "pci_track",
+ "seq": int(meta["seq"]),
+ "ts_ns": int(meta["ts_ns"]),
+ **det,
+ }
+ send_event(pub, event)
+ else:
+ send_event(pub, {
+ "type": "pci_miss",
+ "seq": int(meta["seq"]),
+ "ts_ns": int(meta["ts_ns"]),
+ })
+
+ now = time.perf_counter()
+ if not args.quiet and now - t_report >= 5.0:
+ console.print(f"[dim] pci: {frames} frames, {hits} hits ({method_label})[/dim]")
+ t_report = now
+ except TimeoutError as e:
+ console.print(f"[red]pci_tracker: {e}. Is capture/ingest.py running?[/red]")
+ return 2
+ except KeyboardInterrupt:
+ console.print("[yellow]pci_tracker interrupted.[/yellow]")
+ finally:
+ sub.close(0)
+ pub.close(0)
+ console.print(f"[bold]pci_tracker summary:[/bold] frames={frames} hits={hits} method={method_label}")
+ return 0
+
+if __name__ == "__main__":
+ sys.exit(main())
io_titan/bridge.py +153 -0
@@ -0,0 +1,153 @@
+"""PC-side bridge to Titan Two via HID vendor interface.
+
+Sends 65-byte output reports (1-byte report ID + 64 payload bytes) that the
+paired bridge.gpc script parses. Fail-safe: if you stop sending for >200 ms,
+the GPC script zeroes all outputs.
+
+Usage (Python API):
+ with TitanBridge() as t:
+ t.send(stick_lx=0, stick_ly=0, buttons={"A": True}) # press A
+ time.sleep(0.15)
+ t.send() # release all
+"""
+from __future__ import annotations
+
+import time
+from dataclasses import dataclass
+from typing import Iterable
+
+import pywinusb.hid as hid
+
+VID = 0x2508
+PID = 0x0032
+
+PKT_MAGIC = 0xAA
+CMD_SET = 0x01
+CMD_DISARM = 0xFF
+
+BUTTONS_LO = {
+ "A": 0x01, "B": 0x02, "X": 0x04, "Y": 0x08,
+ "LB": 0x10, "RB": 0x20, "BACK": 0x40, "START": 0x80,
+}
+BUTTONS_HI = {
+ "LS": 0x01, "RS": 0x02,
+ "UP": 0x04, "DOWN": 0x08, "LEFT": 0x10, "RIGHT": 0x20,
+}
+
+def _clamp_stick(v: float) -> int:
+ v = int(round(v))
+ if v > 100: return 100
+ if v < -100: return -100
+ return v & 0xFF
+
+def _clamp_trigger(v: float) -> int:
+ v = int(round(v))
+ if v > 100: return 100
+ if v < 0: return 0
+ return v
+
+@dataclass
+class ControllerState:
+ lx: float = 0.0
+ ly: float = 0.0
+ rx: float = 0.0
+ ry: float = 0.0
+ lt: float = 0.0
+ rt: float = 0.0
+ buttons: frozenset = frozenset()
+
+class TitanBridge:
+ def __init__(self, vid: int = VID, pid: int = PID):
+ self.vid = vid
+ self.pid = pid
+ self._dev = None
+ self._out_report = None
+
+ def open(self) -> None:
+ filt = hid.HidDeviceFilter(vendor_id=self.vid, product_id=self.pid)
+ devs = filt.get_devices()
+ if not devs:
+ raise RuntimeError(f"No HID device found for VID={self.vid:04x} PID={self.pid:04x}. Is the Titan Two connected?")
+ self._dev = devs[0]
+ self._dev.open()
+ reports = self._dev.find_output_reports()
+ if not reports:
+ self._dev.close()
+ raise RuntimeError("Titan Two has no output reports; cannot send.")
+ self._out_report = reports[0]
+
+ def close(self) -> None:
+ if self._dev is not None:
+ try:
+
+ self._send_raw(cmd=CMD_DISARM)
+ except Exception:
+ pass
+ try:
+ self._dev.close()
+ except Exception:
+ pass
+ self._dev = None
+ self._out_report = None
+
+ def __enter__(self) -> "TitanBridge":
+ self.open()
+ return self
+
+ def __exit__(self, *exc) -> None:
+ self.close()
+
+ def _send_raw(self, cmd: int, lx: int = 0, ly: int = 0, rx: int = 0, ry: int = 0,
+ btn_lo: int = 0, btn_hi: int = 0, lt: int = 0, rt: int = 0) -> None:
+ if self._out_report is None:
+ raise RuntimeError("Bridge not open")
+ data = [0] * 65
+
+ data[0] = 0x00
+
+ data[1] = PKT_MAGIC
+ data[2] = cmd & 0xFF
+ data[3] = lx & 0xFF
+ data[4] = ly & 0xFF
+ data[5] = rx & 0xFF
+ data[6] = ry & 0xFF
+ data[7] = btn_lo & 0xFF
+ data[8] = btn_hi & 0xFF
+ data[9] = lt & 0xFF
+ data[10] = rt & 0xFF
+ self._out_report.set_raw_data(data)
+ self._out_report.send()
+
+ def send(self, lx: float = 0, ly: float = 0, rx: float = 0, ry: float = 0,
+ lt: float = 0, rt: float = 0, buttons: Iterable[str] = ()) -> None:
+ """Send a controller state to the Titan Two.
+
+ Sticks are in [-100, 100]. Triggers [0, 100]. Buttons is a set of names
+ like {"A", "LB"}. Call this at least every 200 ms to stay "armed".
+ """
+ btn_lo = 0
+ btn_hi = 0
+ for name in buttons:
+ up = name.upper()
+ if up in BUTTONS_LO:
+ btn_lo |= BUTTONS_LO[up]
+ elif up in BUTTONS_HI:
+ btn_hi |= BUTTONS_HI[up]
+ else:
+ raise ValueError(f"Unknown button name: {name!r}")
+ self._send_raw(
+ cmd=CMD_SET,
+ lx=_clamp_stick(lx), ly=_clamp_stick(ly),
+ rx=_clamp_stick(rx), ry=_clamp_stick(ry),
+ btn_lo=btn_lo, btn_hi=btn_hi,
+ lt=_clamp_trigger(lt), rt=_clamp_trigger(rt),
+ )
+
+ def disarm(self) -> None:
+ self._send_raw(cmd=CMD_DISARM)
+
+def tap(bridge: TitanBridge, button: str, duration_s: float = 0.15) -> None:
+ """Press a button briefly and release."""
+ bridge.send(buttons={button})
+ time.sleep(duration_s)
+ bridge.send()
io_titan/hello_xbox.py +51 -0
@@ -0,0 +1,51 @@
+"""Hello Xbox - presses A on the Xbox controller via Titan Two.
+
+Requires the bridge.gpc script to be loaded and running on the Titan Two
+(load once via Gtuner IV, then it runs standalone). Defaults to pressing A
+for 150 ms.
+"""
+from __future__ import annotations
+
+import argparse
+import sys
+import time
+from pathlib import Path
+
+from rich.console import Console
+
+sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
+from io_titan.bridge import TitanBridge, tap # noqa: E402
+
+console = Console()
+
+def main() -> int:
+ ap = argparse.ArgumentParser()
+ ap.add_argument("--button", default="A", help="Button to tap: A,B,X,Y,LB,RB,LS,RS,UP,DOWN,LEFT,RIGHT,BACK,START")
+ ap.add_argument("--hold-ms", type=int, default=150)
+ ap.add_argument("--count", type=int, default=1)
+ ap.add_argument("--sweep", action="store_true", help="Instead of tapping, sweep the left stick in a circle.")
+ args = ap.parse_args()
+
+ console.print("[cyan]Opening Titan Two HID...[/cyan]")
+ with TitanBridge() as t:
+ console.print("[green]Opened.[/green] Sending to Xbox.")
+ if args.sweep:
+ console.print("Sweeping left stick (3 seconds). Watch any stick-driven UI on screen.")
+ t0 = time.perf_counter()
+ import math
+ while time.perf_counter() - t0 < 3.0:
+ theta = (time.perf_counter() - t0) * 2 * math.pi
+ t.send(lx=80 * math.cos(theta), ly=80 * math.sin(theta))
+ time.sleep(0.016)
+ t.send()
+ console.print("[green]Sweep complete.[/green]")
+ else:
+ for i in range(args.count):
+ console.print(f" tap {args.button} ({i+1}/{args.count})")
+ tap(t, args.button, args.hold_ms / 1000.0)
+ time.sleep(0.25)
+ console.print("[green]Done. Disarming and releasing device.[/green]")
+ return 0
+
+if __name__ == "__main__":
+ sys.exit(main())
requirements.txt +7 -0
@@ -0,0 +1,7 @@
+opencv-python>=4.9
+numpy>=1.26
+scipy>=1.12
+pyzmq>=25
+pyyaml>=6.0
+pydantic>=2
+rich>=13
tools/analyze_session.py +99 -0
@@ -0,0 +1,99 @@
+"""Analyze a recorded events JSONL for pitch count, plate_y calibration, PCI stability."""
+from __future__ import annotations
+
+import json
+import sys
+from collections import defaultdict
+from pathlib import Path
+
+import numpy as np
+from rich.console import Console
+
+console = Console()
+
+def load_events(path: Path) -> list[dict]:
+ out: list[dict] = []
+ with open(path, "r", encoding="utf-8") as f:
+ for line in f:
+ line = line.strip()
+ if not line:
+ continue
+ try:
+ out.append(json.loads(line))
+ except Exception:
+ pass
+ return out
+
+def analyze(events: list[dict]) -> None:
+ ball_tracks = [e for e in events if e.get("type") == "ball_track"]
+ ball_misses = [e for e in events if e.get("type") == "ball_miss"]
+ pitch_preds = [e for e in events if e.get("type") == "pitch_pred"]
+ pci_tracks = [e for e in events if e.get("type") == "pci_track"]
+ pci_misses = [e for e in events if e.get("type") == "pci_miss"]
+
+ total_ball = len(ball_tracks) + len(ball_misses)
+ total_pci = len(pci_tracks) + len(pci_misses)
+ console.print("[bold]Session totals[/bold]")
+ console.print(f" ball: {len(ball_tracks)}/{total_ball} hits ({100*len(ball_tracks)/max(total_ball,1):.1f}%), {len(pitch_preds)} predictions")
+ console.print(f" pci: {len(pci_tracks)}/{total_pci} hits ({100*len(pci_tracks)/max(total_pci,1):.1f}%)")
+
+ pitches: dict[int, list[dict]] = defaultdict(list)
+ for e in ball_tracks:
+ pid = int(e.get("pitch_id", 0))
+ pitches[pid].append(e)
+ console.print(f"\n[bold]Pitches detected[/bold]: {len(pitches)}")
+
+ max_ys = []
+ preds_per_pitch: dict[int, list[dict]] = defaultdict(list)
+ for pp in pitch_preds:
+ preds_per_pitch[int(pp.get("pitch_id", 0))].append(pp)
+ for pid in sorted(pitches.keys()):
+ pts = pitches[pid]
+ if len(pts) < 2:
+ continue
+ xs = np.array([p["x"] for p in pts])
+ ys = np.array([p["y"] for p in pts])
+ ts = np.array([p["ts_ns"] for p in pts])
+ dur_ms = (ts[-1] - ts[0]) / 1e6
+ max_y = float(ys.max())
+ y_delta = float(ys.max() - ys.min())
+ pred_n = len(preds_per_pitch.get(pid, []))
+ console.print(
+ f" pitch #{pid:3d} n={len(pts):3d} dur={dur_ms:6.0f}ms "
+ f"x=({xs.min():.0f}..{xs.max():.0f}) y=({ys.min():.0f}..{ys.max():.0f}) "
+ f"max_y={max_y:.0f} dy={y_delta:.0f} preds={pred_n}"
+ )
+ if y_delta >= 100:
+ max_ys.append(max_y)
+
+ console.print("\n[bold]plate_y calibration[/bold]")
+ if max_ys:
+ arr = np.array(max_ys)
+ console.print(f" real pitches (dy>=100px): {len(arr)}")
+ console.print(f" max_y stats: mean={arr.mean():.1f} p50={np.percentile(arr,50):.1f} p95={np.percentile(arr,95):.1f} max={arr.max():.1f}")
+ rec = float(np.percentile(arr, 90)) / 1080.0
+ console.print(f" [green]suggested plate_y_frac ~= {rec:.3f}[/green] (was 0.72)")
+ else:
+ console.print(" no pitches with >=100px y-delta - probably none captured or trajectories too short")
+
+ console.print("\n[bold]PCI centroid stability[/bold]")
+ if pci_tracks:
+ xs = np.array([p["x"] for p in pci_tracks])
+ ys = np.array([p["y"] for p in pci_tracks])
+ console.print(f" samples: {len(pci_tracks)}")
+ console.print(f" x: mean={xs.mean():.0f} std={xs.std():.0f} range=({xs.min():.0f}..{xs.max():.0f})")
+ console.print(f" y: mean={ys.mean():.0f} std={ys.std():.0f} range=({ys.min():.0f}..{ys.max():.0f})")
+ else:
+ console.print(" no PCI hits recorded")
+
+def main() -> int:
+ if len(sys.argv) < 2:
+ console.print("usage: analyze_session.py <events.jsonl>")
+ return 2
+ events = load_events(Path(sys.argv[1]))
+ console.print(f"loaded {len(events)} events from {sys.argv[1]}")
+ analyze(events)
+ return 0
+
+if __name__ == "__main__":
+ sys.exit(main())
tools/burst.py +72 -0
@@ -0,0 +1,72 @@
+"""Burst snapshot: capture N frames over T seconds, save each as PNG.
+
+Useful for catching the ball in flight and getting a clean PCI frame for
+tuning / template extraction. Saves to logs/burst_NN_tXXXXXXms.png.
+"""
+from __future__ import annotations
+
+import argparse
+import shutil
+import sys
+import time
+from pathlib import Path
+
+import cv2
+from rich.console import Console
+
+sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
+from capture.ingest import open_capture # noqa: E402
+from cv._common import load_config # noqa: E402
+
+console = Console()
+OUT_DIR = Path(__file__).resolve().parents[1] / "logs" / "burst"
+
+def main() -> int:
+ ap = argparse.ArgumentParser(description="Burst frame capture.")
+ ap.add_argument("--count", type=int, default=30, help="Total frames to save.")
+ ap.add_argument("--duration", type=float, default=10.0, help="Spread frames over this many seconds.")
+ ap.add_argument("--clear", action="store_true", help="Wipe logs/burst before starting.")
+ args = ap.parse_args()
+
+ if args.clear and OUT_DIR.exists():
+ shutil.rmtree(OUT_DIR)
+ OUT_DIR.mkdir(parents=True, exist_ok=True)
+
+ cfg = load_config()
+ cap = open_capture(cfg)
+ if cap is None or not cap.isOpened():
+ console.print("[red]Could not open capture card.[/red]")
+ return 2
+
+ for _ in range(5):
+ cap.read()
+
+ interval = args.duration / max(args.count - 1, 1)
+ t0 = time.perf_counter()
+ console.print(
+ f"[bold cyan]Burst: {args.count} frames over {args.duration:.1f}s "
+ f"(every {interval*1000:.0f}ms) -> {OUT_DIR}[/bold cyan]"
+ )
+
+ saved = 0
+ for i in range(args.count):
+ target = t0 + i * interval
+
+ while time.perf_counter() < target - 0.002:
+ cap.read()
+ ok, frame = cap.read()
+ if not ok or frame is None:
+ continue
+ elapsed_ms = int((time.perf_counter() - t0) * 1000)
+ out = OUT_DIR / f"burst_{i:02d}_t{elapsed_ms:05d}ms.png"
+ cv2.imwrite(str(out), frame)
+ saved += 1
+ if i % 5 == 0:
+ console.print(f" [{i+1}/{args.count}] t={elapsed_ms}ms -> {out.name}")
+
+ cap.release()
+ console.print(f"[green]Saved {saved}/{args.count} frames to {OUT_DIR}[/green]")
+ return 0
+
+if __name__ == "__main__":
+ sys.exit(main())
tools/crop.py +37 -0
@@ -0,0 +1,37 @@
+"""Crop a region of a frame at native pixel resolution and save it."""
+from __future__ import annotations
+
+import sys
+from pathlib import Path
+
+import cv2
+from rich.console import Console
+
+console = Console()
+
+def main() -> int:
+ if len(sys.argv) < 6:
+ console.print("usage: crop.py <in.png> <x> <y> <w> <h> [out.png]")
+ return 2
+ inp = Path(sys.argv[1])
+ x, y, w, h = int(sys.argv[2]), int(sys.argv[3]), int(sys.argv[4]), int(sys.argv[5])
+ out = Path(sys.argv[6]) if len(sys.argv) >= 7 else (
+ Path(__file__).resolve().parents[1] / "logs" / f"crop_{inp.stem}_{x}_{y}_{w}x{h}.png"
+ )
+ frame = cv2.imread(str(inp), cv2.IMREAD_COLOR)
+ if frame is None:
+ console.print(f"[red]Can't read {inp}[/red]")
+ return 2
+ H, W = frame.shape[:2]
+ x0, y0 = max(0, x), max(0, y)
+ x1, y1 = min(W, x + w), min(H, y + h)
+ if x1 <= x0 or y1 <= y0:
+ console.print(f"[red]Invalid crop: image is {W}x{H}[/red]")
+ return 2
+ crop = frame[y0:y1, x0:x1]
+ cv2.imwrite(str(out), crop)
+ console.print(f"saved {crop.shape[1]}x{crop.shape[0]} -> {out}")
+ return 0
+
+if __name__ == "__main__":
+ sys.exit(main())
tools/detect_on_frame.py +59 -0
@@ -0,0 +1,59 @@
+"""Run ball and PCI detectors on a single saved frame and save annotated PNG."""
+from __future__ import annotations
+
+import sys
+from pathlib import Path
+
+import cv2
+from rich.console import Console
+
+sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
+from cv._common import load_config # noqa: E402
+from cv.ball_tracker import detect_ball # noqa: E402
+from cv.pci_tracker import detect_by_hsv as pci_detect # noqa: E402
+
+console = Console()
+
+def main() -> int:
+ if len(sys.argv) < 2:
+ console.print("usage: detect_on_frame.py <frame.png>")
+ return 2
+ fp = Path(sys.argv[1])
+ frame = cv2.imread(str(fp), cv2.IMREAD_COLOR)
+ if frame is None:
+ console.print(f"[red]Can't read {fp}[/red]")
+ return 2
+
+ cfg = load_config()
+ overlay = frame.copy()
+ h, w = frame.shape[:2]
+ plate_y = int(h * float(cfg["cv"].get("plate_y_frac", 0.72)))
+ cv2.line(overlay, (0, plate_y), (w, plate_y), (255, 255, 255), 1)
+
+ ball = detect_ball(frame, cfg)
+ if ball is not None:
+ cv2.circle(overlay, (int(ball.x), int(ball.y)), max(int(ball.r), 4), (0, 255, 255), 2)
+ cv2.putText(overlay, f"ball r={ball.r:.0f} s={ball.score:.2f}",
+ (int(ball.x) + 10, int(ball.y) - 10),
+ cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2, cv2.LINE_AA)
+ console.print(f"ball: x={ball.x:.0f} y={ball.y:.0f} r={ball.r:.1f} score={ball.score:.2f}")
+ else:
+ console.print("ball: none")
+
+ pci = pci_detect(frame, cfg["cv"]["pci"])
+ if pci is not None:
+ cv2.circle(overlay, (int(pci["x"]), int(pci["y"])), int(pci["r"]), (0, 255, 0), 2)
+ cv2.putText(overlay, f"pci r={pci['r']:.0f} s={pci['score']:.2f}",
+ (int(pci["x"]) + 10, int(pci["y"]) + 20),
+ cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2, cv2.LINE_AA)
+ console.print(f"pci: x={pci['x']:.0f} y={pci['y']:.0f} r={pci['r']:.1f} score={pci['score']:.2f}")
+ else:
+ console.print("pci: none")
+
+ out = Path(__file__).resolve().parents[1] / "logs" / f"detect_{fp.stem}.png"
+ cv2.imwrite(str(out), overlay)
+ console.print(f"saved -> {out}")
+ return 0
+
+if __name__ == "__main__":
+ sys.exit(main())
tools/extract_pci_template.py +96 -0
@@ -0,0 +1,96 @@
+"""Extract a PCI template from a frame.
+
+Uses HSV green masking to find the PCI cluster near frame center, then crops
+a bounding box around it and saves it as configs/templates/pci_circle.png
+so pci_tracker.py will auto-switch to template matching.
+
+Also saves the HSV mask as logs/pci_mask_<frame>.png so you can see what the
+green threshold is picking up.
+"""
+from __future__ import annotations
+
+import argparse
+import sys
+from pathlib import Path
+
+import cv2
+import numpy as np
+from rich.console import Console
+
+sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
+from cv._common import load_config # noqa: E402
+
+console = Console()
+ROOT = Path(__file__).resolve().parents[1]
+TEMPLATE_DIR = ROOT / "configs" / "templates"
+LOG_DIR = ROOT / "logs"
+
+def main() -> int:
+ ap = argparse.ArgumentParser(description="Extract PCI template from a frame.")
+ ap.add_argument("frame_path", help="Path to a captured frame PNG.")
+ ap.add_argument("--pad", type=int, default=10, help="Pixel margin around bbox.")
+ ap.add_argument("--center-crop-frac", type=float, default=0.6,
+ help="Restrict search to the central fraction of the frame (0..1) to avoid UI green.")
+ args = ap.parse_args()
+
+ frame_path = Path(args.frame_path)
+ frame = cv2.imread(str(frame_path), cv2.IMREAD_COLOR)
+ if frame is None:
+ console.print(f"[red]Could not read {frame_path}[/red]")
+ return 2
+
+ cfg = load_config()
+ pci_cfg = cfg["cv"]["pci"]
+ hsv_low = np.array(pci_cfg["hsv_low"], dtype=np.uint8)
+ hsv_high = np.array(pci_cfg["hsv_high"], dtype=np.uint8)
+
+ h, w = frame.shape[:2]
+ hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
+ mask = cv2.inRange(hsv, hsv_low, hsv_high)
+
+ cf = float(args.center_crop_frac)
+ cw = int(w * cf); ch = int(h * cf)
+ x0 = (w - cw) // 2; y0 = (h - ch) // 2
+ center_mask = np.zeros_like(mask)
+ center_mask[y0:y0 + ch, x0:x0 + cw] = 255
+ mask_c = cv2.bitwise_and(mask, center_mask)
+
+ mask_c = cv2.morphologyEx(mask_c, cv2.MORPH_CLOSE, np.ones((15, 15), np.uint8), iterations=2)
+ mask_c = cv2.morphologyEx(mask_c, cv2.MORPH_OPEN, np.ones((3, 3), np.uint8), iterations=1)
+
+ ys, xs = np.where(mask_c > 0)
+ if len(xs) == 0:
+ console.print("[red]No green pixels found in central region - check HSV range or center-crop.[/red]")
+ mask_out = LOG_DIR / f"pci_mask_{frame_path.stem}.png"
+ cv2.imwrite(str(mask_out), mask_c)
+ console.print(f"[yellow]Saved (empty) mask -> {mask_out}[/yellow]")
+ return 2
+
+ x_min, x_max = int(xs.min()), int(xs.max())
+ y_min, y_max = int(ys.min()), int(ys.max())
+ x_min = max(0, x_min - args.pad)
+ y_min = max(0, y_min - args.pad)
+ x_max = min(w - 1, x_max + args.pad)
+ y_max = min(h - 1, y_max + args.pad)
+
+ console.print(f"PCI bbox: x=({x_min}..{x_max}) y=({y_min}..{y_max}) "
+ f"w={x_max-x_min} h={y_max-y_min} green_px={len(xs)}")
+
+ crop = frame[y_min:y_max + 1, x_min:x_max + 1].copy()
+ TEMPLATE_DIR.mkdir(parents=True, exist_ok=True)
+ tpl_out = TEMPLATE_DIR / "pci_circle.png"
+ cv2.imwrite(str(tpl_out), crop)
+ console.print(f"[green]Saved template -> {tpl_out} ({crop.shape[1]}x{crop.shape[0]})[/green]")
+
+ ann = frame.copy()
+ cv2.rectangle(ann, (x_min, y_min), (x_max, y_max), (0, 255, 0), 2)
+ ann_out = LOG_DIR / f"pci_extract_{frame_path.stem}.png"
+ cv2.imwrite(str(ann_out), ann)
+ mask_out = LOG_DIR / f"pci_mask_{frame_path.stem}.png"
+ cv2.imwrite(str(mask_out), mask_c)
+ console.print(f"[green]Saved annotated -> {ann_out}[/green]")
+ console.print(f"[green]Saved mask -> {mask_out}[/green]")
+ return 0
+
+if __name__ == "__main__":
+ sys.exit(main())
tools/find_green.py +73 -0
@@ -0,0 +1,73 @@
+"""Scan a frame for bright saturated green clusters at several HSV tightnesses.
+
+Reports clusters as (x, y, w, h, pixel_count) and saves a mask per tightness.
+"""
+from __future__ import annotations
+
+import sys
+from pathlib import Path
+
+import cv2
+import numpy as np
+from rich.console import Console
+
+console = Console()
+
+def report_mask(frame: np.ndarray, hsv: np.ndarray, lo, hi, label: str, out_dir: Path, stem: str) -> None:
+ lo_a = np.array(lo, np.uint8)
+ hi_a = np.array(hi, np.uint8)
+ mask = cv2.inRange(hsv, lo_a, hi_a)
+ total = int((mask > 0).sum())
+ if total == 0:
+ console.print(f"[dim]{label}: 0 pixels match {list(lo)}..{list(hi)}[/dim]")
+ return
+
+ mask_c = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, np.ones((5, 5), np.uint8), iterations=1)
+ num, labels_img, stats, _ = cv2.connectedComponentsWithStats(mask_c, connectivity=8)
+ console.print(f"[bold]{label}: total={total}, clusters={num-1} at range {list(lo)}..{list(hi)}[/bold]")
+ clusters = []
+ for i in range(1, num):
+ x, y, w, h, area = stats[i]
+ if area < 8:
+ continue
+ clusters.append((area, x, y, w, h))
+ clusters.sort(reverse=True)
+ for area, x, y, w, h in clusters[:8]:
+ console.print(f" cluster: x={x}..{x+w} y={y}..{y+h} area={area}")
+ out = out_dir / f"green_{label}_{stem}.png"
+ cv2.imwrite(str(out), mask)
+ console.print(f" mask -> {out}")
+
+ ann = frame.copy()
+ for area, x, y, w, h in clusters[:8]:
+ cv2.rectangle(ann, (x, y), (x + w, y + h), (255, 0, 255), 2)
+ ann_out = out_dir / f"green_{label}_{stem}_ann.png"
+ cv2.imwrite(str(ann_out), ann)
+
+def main() -> int:
+ if len(sys.argv) < 2:
+ console.print("usage: find_green.py <frame.png>")
+ return 2
+ fp = Path(sys.argv[1])
+ frame = cv2.imread(str(fp), cv2.IMREAD_COLOR)
+ if frame is None:
+ console.print(f"[red]can't read {fp}[/red]")
+ return 2
+
+ hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
+ out_dir = Path(__file__).resolve().parents[1] / "logs"
+ stem = fp.stem
+
+ ranges = [
+ ("strict", [50, 240, 240], [70, 255, 255]),
+ ("tight", [48, 200, 200], [72, 255, 255]),
+ ("medium", [45, 150, 150], [75, 255, 255]),
+ ("loose", [40, 100, 100], [80, 255, 255]),
+ ]
+ for label, lo, hi in ranges:
+ report_mask(frame, hsv, lo, hi, label, out_dir, stem)
+ console.print("")
+ return 0
+
+if __name__ == "__main__":
+ sys.exit(main())
tools/fit_debug.py +89 -0
@@ -0,0 +1,89 @@
+"""Replay recorded ball_track events through try_fit, report why predictions fire or fail."""
+from __future__ import annotations
+
+import json
+import sys
+from collections import defaultdict
+from pathlib import Path
+
+import numpy as np
+from rich.console import Console
+
+sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
+from cv._common import load_config # noqa: E402
+
+console = Console()
+
+def fit_and_explain(pts: list[dict], plate_y_px: float) -> str:
+ if len(pts) < 4:
+ return f"SKIP n={len(pts)}<4"
+ ys = np.array([p["y"] for p in pts], dtype=np.float64)
+ xs = np.array([p["x"] for p in pts], dtype=np.float64)
+ if ys.max() - ys.min() < 30:
+ return f"REJECT dy={ys.max()-ys.min():.0f}<30"
+ if ys[-1] - ys[0] <= 0:
+ return f"REJECT not moving down (y0={ys[0]:.0f} -> yN={ys[-1]:.0f})"
+
+ t0 = pts[0]["ts_ns"]
+ ts = np.array([(p["ts_ns"] - t0) / 1e9 for p in pts], dtype=np.float64)
+ try:
+ ay, by, cy = np.polyfit(ts, ys, 2)
+ bx, cx = np.polyfit(ts, xs, 1)
+ except Exception as e:
+ return f"FIT_ERR {e}"
+
+ disc = by * by - 4 * ay * (cy - plate_y_px)
+ if disc < 0:
+ return f"REJECT disc<0 (ay={ay:.1f} by={by:.1f} cy={cy:.1f} plate={plate_y_px:.0f})"
+ if abs(ay) < 1e-6:
+ return f"REJECT ay~0 ({ay:.3e})"
+ sqrt_d = float(np.sqrt(disc))
+ t_cand = [(-by + sqrt_d) / (2 * ay), (-by - sqrt_d) / (2 * ay)]
+ future = [tc for tc in t_cand if tc > ts[-1]]
+ if not future:
+
+ past_best = max(tc for tc in t_cand if tc <= ts[-1])
+ return f"REJECT crossing in past: t_cand={[f'{t:.3f}' for t in t_cand]} ts[-1]={ts[-1]:.3f} (past_best={past_best:.3f})"
+ t_cross = min(future)
+ plate_x = bx * t_cross + cx
+ eta_ms = (t_cross - ts[-1]) * 1000.0
+ if eta_ms > 2000:
+ return f"REJECT eta={eta_ms:.0f}ms>2000"
+ return f"FIT OK plate_x={plate_x:.0f} eta={eta_ms:.1f}ms t_cross={t_cross:.3f} ts[-1]={ts[-1]:.3f} ay={ay:.1f} by={by:.1f}"
+
+def main() -> int:
+ if len(sys.argv) < 2:
+ console.print("usage: fit_debug.py <events.jsonl>")
+ return 2
+
+ cfg = load_config()
+ plate_y_px = 1080 * float(cfg["cv"]["plate_y_frac"])
+ console.print(f"plate_y_px = {plate_y_px:.0f} (plate_y_frac={cfg['cv']['plate_y_frac']})\n")
+
+ pitches: dict[int, list[dict]] = defaultdict(list)
+ with open(sys.argv[1], "r", encoding="utf-8") as f:
+ for line in f:
+ try:
+ ev = json.loads(line)
+ except Exception:
+ continue
+ if ev.get("type") != "ball_track":
+ continue
+ pid = int(ev.get("pitch_id", 0))
+ pitches[pid].append(ev)
+
+ for pid in sorted(pitches.keys()):
+ pts = pitches[pid]
+ if len(pts) < 4:
+ continue
+ ys = [p["y"] for p in pts]
+ xs = [p["x"] for p in pts]
+ dy = max(ys) - min(ys)
+ if dy < 100:
+ continue
+ res = fit_and_explain(pts, plate_y_px)
+ console.print(f"pitch #{pid:3d} n={len(pts):2d} dy={dy:4.0f} {res}")
+ return 0
+
+if __name__ == "__main__":
+ sys.exit(main())
tools/heic_convert.py +26 -0
@@ -0,0 +1,26 @@
+"""Convert HEIC to PNG via pillow-heif."""
+from __future__ import annotations
+import sys
+from pathlib import Path
+
+from PIL import Image
+import pillow_heif
+
+pillow_heif.register_heif_opener()
+
+def main() -> int:
+ if len(sys.argv) < 2:
+ print("usage: heic_convert.py <dir or file>...")
+ return 2
+ for arg in sys.argv[1:]:
+ p = Path(arg)
+ files = [p] if p.is_file() else sorted(p.glob("*.HEIC")) + sorted(p.glob("*.heic"))
+ for f in files:
+ out = f.with_suffix(".png")
+ img = Image.open(f)
+ img.save(out, "PNG")
+ print(f"{f.name} -> {out.name} ({img.size[0]}x{img.size[1]})")
+ return 0
+
+if __name__ == "__main__":
+ sys.exit(main())
tools/hsv_probe.py +115 -0
@@ -0,0 +1,115 @@
+"""HSV color probe.
+
+Given a frame PNG and a bounding box (or auto-detect the brightest green cluster),
+print the HSV percentile stats so we can pick a tight HSV range that catches the
+PCI overlay but not the grass field.
+
+Usage:
+ hsv_probe.py <frame.png> # auto-find brightest green
+ hsv_probe.py <frame.png> --box X Y W H # sample a specific box
+ hsv_probe.py <frame.png> --test-range "50,200,200-70,255,255"
+"""
+from __future__ import annotations
+
+import argparse
+import sys
+from pathlib import Path
+
+import cv2
+import numpy as np
+from rich.console import Console
+
+console = Console()
+ROOT = Path(__file__).resolve().parents[1]
+LOG_DIR = ROOT / "logs"
+
+def auto_pci_box(hsv: np.ndarray) -> tuple[int, int, int, int]:
+ """Find the brightest, most-saturated green cluster in the central half."""
+ h, w = hsv.shape[:2]
+ mask = cv2.inRange(hsv, np.array([40, 200, 200], np.uint8), np.array([80, 255, 255], np.uint8))
+ cx0, cy0 = w // 4, h // 4
+ cx1, cy1 = 3 * w // 4, 3 * h // 4
+ center = np.zeros_like(mask)
+ center[cy0:cy1, cx0:cx1] = 255
+ mask = cv2.bitwise_and(mask, center)
+ ys, xs = np.where(mask > 0)
+ if len(xs) == 0:
+ return w // 3, h // 3, w // 3, h // 3
+ return int(xs.min()), int(ys.min()), int(xs.max() - xs.min()), int(ys.max() - ys.min())
+
+def main() -> int:
+ ap = argparse.ArgumentParser(description="HSV probe.")
+ ap.add_argument("frame_path")
+ ap.add_argument("--box", nargs=4, type=int, metavar=("X", "Y", "W", "H"))
+ ap.add_argument("--test-range", type=str, default=None,
+ help="'Hlo,Slo,Vlo-Hhi,Shi,Vhi' - save a mask with this range.")
+ args = ap.parse_args()
+
+ frame = cv2.imread(args.frame_path, cv2.IMREAD_COLOR)
+ if frame is None:
+ console.print(f"[red]Cannot read {args.frame_path}[/red]")
+ return 2
+ hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
+
+ if args.box is None:
+ x, y, w, h = auto_pci_box(hsv)
+ console.print(f"[cyan]auto-box: x={x} y={y} w={w} h={h}[/cyan]")
+ else:
+ x, y, w, h = args.box
+
+ sub = hsv[y:y + h, x:x + w]
+ if sub.size == 0:
+ console.print("[red]Empty box.[/red]")
+ return 2
+
+ greens = cv2.inRange(sub, np.array([30, 80, 80], np.uint8), np.array([95, 255, 255], np.uint8))
+ pci_pixels = sub[greens > 0]
+ if len(pci_pixels) == 0:
+ console.print("[yellow]No green-ish pixels in the box.[/yellow]")
+ return 2
+
+ hs = pci_pixels[:, 0]
+ ss = pci_pixels[:, 1]
+ vs = pci_pixels[:, 2]
+ def stats(name, arr):
+ console.print(
+ f" {name}: min={arr.min():3d} p5={np.percentile(arr,5):5.1f} "
+ f"p50={np.percentile(arr,50):5.1f} p95={np.percentile(arr,95):5.1f} max={arr.max():3d}"
+ )
+ console.print(f"[bold]HSV stats of green-ish pixels in box (n={len(pci_pixels)}):[/bold]")
+ stats("H", hs); stats("S", ss); stats("V", vs)
+
+ suggested_low = (int(np.percentile(hs, 5)) - 2,
+ max(40, int(np.percentile(ss, 5)) - 20),
+ max(40, int(np.percentile(vs, 5)) - 20))
+ suggested_high = (int(np.percentile(hs, 95)) + 2,
+ min(255, int(np.percentile(ss, 95)) + 20),
+ min(255, int(np.percentile(vs, 95)) + 20))
+ console.print(
+ f"[bold green]Suggested tight HSV:[/bold green] low={list(suggested_low)} high={list(suggested_high)}"
+ )
+
+ if args.test_range:
+ lo_str, hi_str = args.test_range.split("-")
+ lo = np.array([int(x) for x in lo_str.split(",")], np.uint8)
+ hi = np.array([int(x) for x in hi_str.split(",")], np.uint8)
+ else:
+ lo = np.array(suggested_low, np.uint8)
+ hi = np.array(suggested_high, np.uint8)
+
+ full_mask = cv2.inRange(hsv, lo, hi)
+ LOG_DIR.mkdir(exist_ok=True)
+ mask_out = LOG_DIR / f"hsv_probe_mask_{Path(args.frame_path).stem}.png"
+ cv2.imwrite(str(mask_out), full_mask)
+ total = int((full_mask > 0).sum())
+ console.print(f"Mask with range {list(lo)}..{list(hi)}: {total} pixels -> {mask_out}")
+
+ ann = frame.copy()
+ cv2.rectangle(ann, (x, y), (x + w, y + h), (255, 0, 255), 2)
+ ann_out = LOG_DIR / f"hsv_probe_box_{Path(args.frame_path).stem}.png"
+ cv2.imwrite(str(ann_out), ann)
+ console.print(f"Annotated box -> {ann_out}")
+ return 0
+
+if __name__ == "__main__":
+ sys.exit(main())
tools/record_events.py +110 -0
@@ -0,0 +1,110 @@
+"""Subscribe to ball and PCI event streams, append JSONL to logs/events_*.jsonl.
+
+Runs for --duration seconds (default 60) then exits. Used to capture live
+session data for offline analysis (plate_y calibration, detector stability).
+"""
+from __future__ import annotations
+
+import argparse
+import datetime as dt
+import json
+import sys
+import time
+from pathlib import Path
+
+import zmq
+from rich.console import Console
+
+sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
+from cv._common import event_subscriber, load_config # noqa: E402
+
+console = Console()
+LOG_DIR = Path(__file__).resolve().parents[1] / "logs"
+
+def main() -> int:
+ ap = argparse.ArgumentParser()
+ ap.add_argument("--duration", type=float, default=60.0)
+ ap.add_argument("--out", type=str, default=None, help="Output JSONL path.")
+ args = ap.parse_args()
+
+ cfg = load_config()
+ ball_ep = cfg["cv"]["ball_events_endpoint"]
+ pci_ep = cfg["cv"]["pci_events_endpoint"]
+
+ if args.out:
+ out_path = Path(args.out)
+ else:
+ ts = dt.datetime.now().strftime("%Y%m%d_%H%M%S")
+ out_path = LOG_DIR / f"events_{ts}.jsonl"
+ out_path.parent.mkdir(parents=True, exist_ok=True)
+
+ ctx = zmq.Context.instance()
+ ball_sub = event_subscriber(ball_ep, ctx=ctx)
+ pci_sub = event_subscriber(pci_ep, ctx=ctx)
+ poller = zmq.Poller()
+ poller.register(ball_sub, zmq.POLLIN)
+ poller.register(pci_sub, zmq.POLLIN)
+
+ console.print(
+ f"[green]Recording[/green] ball={ball_ep} pci={pci_ep} for {args.duration:.0f}s -> {out_path.name}"
+ )
+
+ n_ball = 0
+ n_pci = 0
+ n_pred = 0
+ t_start = time.perf_counter()
+ deadline = t_start + args.duration
+ last_report = t_start
+ with open(out_path, "w", encoding="utf-8") as f:
+ while True:
+ remaining = deadline - time.perf_counter()
+ if remaining <= 0:
+ break
+ events = dict(poller.poll(timeout=int(min(remaining, 1.0) * 1000)))
+ if ball_sub in events:
+ while True:
+ try:
+ raw = ball_sub.recv(flags=zmq.NOBLOCK)
+ except zmq.Again:
+ break
+ try:
+ ev = json.loads(raw.decode("utf-8"))
+ except Exception:
+ continue
+ ev["_src"] = "ball"
+ f.write(json.dumps(ev) + "\n")
+ if ev.get("type") == "ball_track":
+ n_ball += 1
+ elif ev.get("type") == "pitch_pred":
+ n_pred += 1
+ if pci_sub in events:
+ while True:
+ try:
+ raw = pci_sub.recv(flags=zmq.NOBLOCK)
+ except zmq.Again:
+ break
+ try:
+ ev = json.loads(raw.decode("utf-8"))
+ except Exception:
+ continue
+ ev["_src"] = "pci"
+ f.write(json.dumps(ev) + "\n")
+ if ev.get("type") == "pci_track":
+ n_pci += 1
+ now = time.perf_counter()
+ if now - last_report >= 5.0:
+ elapsed = now - t_start
+ console.print(
+ f"[dim] t={elapsed:5.1f}s ball_hits={n_ball} pci_hits={n_pci} preds={n_pred}[/dim]"
+ )
+ last_report = now
+
+ ball_sub.close(0); pci_sub.close(0); ctx.term()
+ console.print(
+ f"[bold green]Recorded[/bold green] ball_hits={n_ball} pci_hits={n_pci} preds={n_pred} "
+ f"-> {out_path}"
+ )
+ return 0
+
+if __name__ == "__main__":
+ sys.exit(main())
tools/scan_burst_green.py +41 -0
@@ -0,0 +1,41 @@
+"""Scan every burst frame for PCI-like green pixels; report per-frame counts."""
+from __future__ import annotations
+
+import sys
+from pathlib import Path
+
+import cv2
+import numpy as np
+from rich.console import Console
+
+console = Console()
+BURST_DIR = Path(__file__).resolve().parents[1] / "logs" / "burst"
+
+def main() -> int:
+ lo = np.array([45, 140, 140], np.uint8)
+ hi = np.array([75, 255, 255], np.uint8)
+ frames = sorted(BURST_DIR.glob("burst_*.png"))
+ console.print(f"scanning {len(frames)} frames with HSV {list(lo)}..{list(hi)}")
+ hits = []
+ for f in frames:
+ img = cv2.imread(str(f))
+ if img is None:
+ continue
+ hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
+ h, w = img.shape[:2]
+ mask = cv2.inRange(hsv, lo, hi)
+
+ window = np.zeros_like(mask)
+ window[int(0.3 * h):int(0.75 * h), int(0.35 * w):int(0.65 * w)] = 255
+ mask = cv2.bitwise_and(mask, window)
+ n = int((mask > 0).sum())
+ if n >= 100:
+ hits.append((f.name, n))
+ hits.sort(key=lambda x: -x[1])
+ console.print(f"[bold]Found {len(hits)} frames with >=100 central green px:[/bold]")
+ for name, n in hits[:15]:
+ console.print(f" {name} {n} px")
+ return 0
+
+if __name__ == "__main__":
+ sys.exit(main())
tools/viewer.py +175 -0
@@ -0,0 +1,175 @@
+"""Visual overlay for live debugging.
+
+Subscribes to:
+ - capture frames (capture.publish_endpoint)
+ - ball/pitch events (cv.ball_events_endpoint)
+ - PCI events (cv.pci_events_endpoint)
+
+Draws the most recent detections on each frame. Press q or ESC to quit.
+Run capture/ingest.py, cv/ball_tracker.py, cv/pci_tracker.py first.
+"""
+from __future__ import annotations
+
+import argparse
+import json
+import sys
+import time
+from pathlib import Path
+
+import cv2
+import numpy as np
+import zmq
+from rich.console import Console
+
+sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
+from cv._common import ( # noqa: E402
+ event_subscriber,
+ iter_latest_frames,
+ load_config,
+ make_frame_subscriber,
+)
+
+console = Console()
+
+COLOR_BALL = (0, 255, 255)
+COLOR_BALL_PRED = (0, 128, 255)
+COLOR_PCI = (0, 255, 0)
+COLOR_PLATE = (255, 255, 255)
+COLOR_BG = (0, 0, 0)
+
+def drain_latest_json(sock: zmq.Socket) -> list[dict]:
+ """Pull every pending JSON event without blocking."""
+ events: list[dict] = []
+ while True:
+ try:
+ raw = sock.recv(flags=zmq.NOBLOCK)
+ except zmq.Again:
+ break
+ try:
+ events.append(json.loads(raw.decode("utf-8")))
+ except Exception:
+ pass
+ return events
+
+def draw_hud(frame: np.ndarray, lines: list[str]) -> None:
+ y = 28
+ for line in lines:
+ cv2.putText(frame, line, (12, y), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 3, cv2.LINE_AA)
+ cv2.putText(frame, line, (12, y), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1, cv2.LINE_AA)
+ y += 22
+
+def main() -> int:
+ ap = argparse.ArgumentParser(description="pitch-tracker-cv live viewer.")
+ ap.add_argument("--scale", type=float, default=0.75, help="Display scale factor (0.5 = half-size).")
+ ap.add_argument("--no-gui", action="store_true", help="Pull events and print stats without opening a window.")
+ args = ap.parse_args()
+
+ cfg = load_config()
+ cap_ep = cfg["capture"]["publish_endpoint"]
+ ball_ep = cfg["cv"]["ball_events_endpoint"]
+ pci_ep = cfg["cv"]["pci_events_endpoint"]
+ plate_y_frac = float(cfg["cv"].get("plate_y_frac", 0.72))
+
+ ctx = zmq.Context.instance()
+ frame_sub = make_frame_subscriber(cap_ep, ctx=ctx)
+ ball_sub = event_subscriber(ball_ep, ctx=ctx)
+ pci_sub = event_subscriber(pci_ep, ctx=ctx)
+ console.print(f"[green]viewer[/green] cap={cap_ep} ball={ball_ep} pci={pci_ep}")
+
+ last_ball: dict | None = None
+ last_pred: dict | None = None
+ last_pci: dict | None = None
+ frames = 0
+ t_start = time.perf_counter()
+
+ win = "pitch-tracker-cv viewer"
+ if not args.no_gui:
+ cv2.namedWindow(win, cv2.WINDOW_NORMAL)
+
+ try:
+ for meta, frame in iter_latest_frames(frame_sub, timeout_ms=3000):
+ frames += 1
+ for ev in drain_latest_json(ball_sub):
+ if ev.get("type") == "ball_track":
+ last_ball = ev
+ elif ev.get("type") == "pitch_pred":
+ last_pred = ev
+ elif ev.get("type") == "ball_miss":
+ if last_ball and ev["ts_ns"] - last_ball.get("ts_ns", 0) > 3e8:
+ last_ball = None
+ for ev in drain_latest_json(pci_sub):
+ if ev.get("type") == "pci_track":
+ last_pci = ev
+ elif ev.get("type") == "pci_miss":
+ if last_pci and ev["ts_ns"] - last_pci.get("ts_ns", 0) > 3e8:
+ last_pci = None
+
+ if args.no_gui:
+ if frames % 60 == 0:
+ console.print(
+ f"[dim] viewer: {frames} frames "
+ f"ball={'y' if last_ball else 'n'} "
+ f"pred={'y' if last_pred else 'n'} "
+ f"pci={'y' if last_pci else 'n'}[/dim]"
+ )
+ continue
+
+ h, w = frame.shape[:2]
+ overlay = frame.copy()
+
+ plate_y = int(h * plate_y_frac)
+ cv2.line(overlay, (0, plate_y), (w, plate_y), COLOR_PLATE, 1, cv2.LINE_AA)
+
+ if last_ball is not None:
+ bx, by, br = int(last_ball["x"]), int(last_ball["y"]), int(last_ball["r"])
+ cv2.circle(overlay, (bx, by), max(br, 4), COLOR_BALL, 2)
+ cv2.circle(overlay, (bx, by), 2, COLOR_BALL, -1)
+
+ if last_pred is not None:
+ px = int(last_pred["plate_x"])
+ py = int(last_pred["plate_y"])
+ cv2.drawMarker(overlay, (px, py), COLOR_BALL_PRED, cv2.MARKER_CROSS, 28, 2)
+ cv2.putText(
+ overlay,
+ f"eta {last_pred['eta_ms']:.0f}ms",
+ (px + 14, py - 8),
+ cv2.FONT_HERSHEY_SIMPLEX,
+ 0.6,
+ COLOR_BALL_PRED,
+ 2,
+ cv2.LINE_AA,
+ )
+
+ if last_pci is not None:
+ pcx, pcy, pcr = int(last_pci["x"]), int(last_pci["y"]), int(last_pci["r"])
+ cv2.circle(overlay, (pcx, pcy), pcr, COLOR_PCI, 2)
+ cv2.circle(overlay, (pcx, pcy), 3, COLOR_PCI, -1)
+
+ dt = time.perf_counter() - t_start
+ fps = frames / dt if dt > 0 else 0.0
+ draw_hud(overlay, [
+ f"frames {frames} display {fps:.1f} fps seq {meta['seq']}",
+ f"ball {'Y' if last_ball else '-'} pred {'Y' if last_pred else '-'} pci {'Y' if last_pci else '-'}",
+ ])
+
+ if args.scale != 1.0:
+ overlay = cv2.resize(overlay, None, fx=args.scale, fy=args.scale, interpolation=cv2.INTER_AREA)
+ cv2.imshow(win, overlay)
+ key = cv2.waitKey(1) & 0xFF
+ if key in (ord("q"), 27):
+ break
+ except TimeoutError as e:
+ console.print(f"[red]viewer: {e}. Is capture/ingest.py running?[/red]")
+ return 2
+ except KeyboardInterrupt:
+ console.print("[yellow]viewer interrupted.[/yellow]")
+ finally:
+ frame_sub.close(0)
+ ball_sub.close(0)
+ pci_sub.close(0)
+ if not args.no_gui:
+ cv2.destroyAllWindows()
+ return 0
+
+if __name__ == "__main__":
+ sys.exit(main())