Zion Boggan
repos/Pitch Tracker CV/capture/ingest.py
zionboggan.com ↗
217 lines · python
History for this file →
1
"""Capture card ingest service.
2
 
3
Smoke mode (--smoke): timed test (10s by default, see --duration) that prints FPS + read latency and saves the first and last captured frames.
4
Default: publish frames on a ZMQ PUB socket bound to capture.publish_endpoint, as multipart [JSON metadata, JPEG] messages.
5
"""
6
from __future__ import annotations
7
 
8
import argparse
9
import json
10
import sys
11
import time
12
from pathlib import Path
13
 
14
import cv2
15
import numpy as np
16
import yaml
17
import zmq
18
from rich.console import Console
19
 
20
# Shared rich console used for every status message printed by this module.
console = Console()

# Directory one level above the folder containing this file
# (presumably the repository root -- confirm against the project layout).
ROOT = Path(__file__).resolve().parents[1]

# YAML file read by load_config().
CONFIG_PATH = ROOT.joinpath("configs", "runtime.yaml")

# Output directory for the frames saved by smoke_test().
LOG_DIR = ROOT.joinpath("logs")
25
 
26
def load_config() -> dict:
    """Load the runtime configuration from ``CONFIG_PATH``.

    Returns:
        The parsed YAML document as a dictionary.

    Raises:
        OSError: If the configuration file cannot be opened.
        yaml.YAMLError: If the file does not contain valid YAML.
        ValueError: If the document is empty or its top level is not a
            mapping.
    """
    with open(CONFIG_PATH, "r", encoding="utf-8") as f:
        cfg = yaml.safe_load(f)
    # safe_load() yields None for an empty file (and a list/scalar for other
    # top-level shapes); fail here with a clear message instead of letting
    # callers hit an opaque TypeError on cfg["capture"].
    if not isinstance(cfg, dict):
        raise ValueError(
            f"{CONFIG_PATH} must contain a YAML mapping at the top level, "
            f"got {type(cfg).__name__}"
        )
    return cfg
29
 
30
def probe_devices(max_index: int = 5, backend: int = cv2.CAP_DSHOW) -> list[tuple[int, int, int]]:
    """List the video devices that can be opened with ``backend``.

    Every index from 0 through ``max_index`` (inclusive) is tried in order.

    Args:
        max_index: Highest device index to probe.
        backend: OpenCV capture backend passed to ``cv2.VideoCapture``.

    Returns:
        One ``(index, width, height)`` tuple per device that opened, where
        width and height are the frame size the device reported right after
        opening.
    """
    devices: list[tuple[int, int, int]] = []
    for index in range(max_index + 1):
        handle = cv2.VideoCapture(index, backend)
        if handle.isOpened():
            width = int(handle.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(handle.get(cv2.CAP_PROP_FRAME_HEIGHT))
            devices.append((index, width, height))
        # Released whether or not the device opened.
        handle.release()
    return devices
40
 
41
def _try_open(idx: int, backend: int, w: int, h: int, fps: int) -> cv2.VideoCapture | None:
    """Open device ``idx`` on ``backend`` and confirm it delivers a frame.

    After opening, the capture is asked for MJPG at ``w`` x ``h`` and ``fps``,
    then a single frame is read as a liveness check.

    Returns:
        The opened capture if that first read succeeded, otherwise ``None``
        (the capture is released before returning ``None``).
    """
    capture = cv2.VideoCapture(idx, backend)
    if capture.isOpened():
        requested = (
            (cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"MJPG")),
            (cv2.CAP_PROP_FRAME_WIDTH, w),
            (cv2.CAP_PROP_FRAME_HEIGHT, h),
            (cv2.CAP_PROP_FPS, fps),
        )
        for prop, value in requested:
            capture.set(prop, value)
        grabbed, frame = capture.read()
        if grabbed and frame is not None:
            return capture
    # Either the device never opened or it produced no frame.
    capture.release()
    return None
55
 
56
def open_capture(cfg: dict) -> cv2.VideoCapture | None:
    """Open the first working capture device described by ``cfg["capture"]``.

    The configured ``device_index`` (default 0) is tried first, followed by
    the remaining indices 0..5. That whole index list is attempted on each
    backend in turn: DirectShow, then Media Foundation, then OpenCV's
    auto-selected backend.

    Args:
        cfg: Runtime configuration; ``cfg["capture"]`` may define
            ``device_index``, ``width``, ``height`` and ``fps``.

    Returns:
        The first capture that opened and delivered a frame, or ``None`` if
        no combination worked.
    """
    settings = cfg["capture"]
    preferred = settings.get("device_index", 0)
    width = settings.get("width", 1920)
    height = settings.get("height", 1080)
    rate = settings.get("fps", 60)

    # Configured index first, then every other index in 0..5.
    search_order = [preferred] + [i for i in range(6) if i != preferred]

    for backend in (cv2.CAP_DSHOW, cv2.CAP_MSMF, cv2.CAP_ANY):
        for candidate in search_order:
            cap = _try_open(candidate, backend, width, height, rate)
            if cap is None:
                continue
            actual_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            actual_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            console.print(
                f"[dim]open_capture: index={candidate} backend={backend} "
                f"size={actual_w}x{actual_h}[/dim]"
            )
            return cap
    return None
77
 
78
def smoke_test(cfg: dict, duration_s: float = 10.0) -> int:
    """Run a timed capture test and report throughput and read latency.

    Probes devices 0..5, opens the capture via ``open_capture``, reads frames
    for ``duration_s`` seconds, prints FPS and read-latency statistics, and
    writes the first (and, if more than one frame arrived, the last) frame to
    ``LOG_DIR``.

    Args:
        cfg: Runtime configuration (see ``open_capture``).
        duration_s: How long to read frames, in seconds.

    Returns:
        Process exit code: 0 on success, 2 if no device could be probed or
        opened, 3 if no frame was captured within ``duration_s``.
    """
    LOG_DIR.mkdir(parents=True, exist_ok=True)

    console.print("[bold cyan]Probing video devices 0..5 (DirectShow)...[/bold cyan]")
    devs = probe_devices()
    if not devs:
        console.print(
            "[bold red]No DirectShow video devices could be opened.[/bold red]\n"
            "  Check: capture card USB cable seated, card has power (LED on),\n"
            "  Xbox HDMI plugged into card HDMI IN (not OUT)."
        )
        return 2
    for i, w, h in devs:
        console.print(f"  device {i}: default size {w}x{h}")

    # open_capture() treats a missing device_index as 0; report it the same
    # way instead of raising KeyError while formatting a status message.
    configured_idx = cfg["capture"].get("device_index", 0)

    cap = open_capture(cfg)
    if cap is None or not cap.isOpened():
        if cap is not None:
            cap.release()
        console.print(
            f"[bold red]Could not open configured device_index={configured_idx}.[/bold red]\n"
            "  Try a different index from the probe results above and update configs/runtime.yaml."
        )
        return 2

    frame_count = 0
    first_frame: np.ndarray | None = None
    last_frame: np.ndarray | None = None
    latencies_ms: list[float] = []

    # Release the device even if the run is interrupted or a read raises.
    try:
        w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        reported_fps = cap.get(cv2.CAP_PROP_FPS)
        # open_capture() may have fallen back to another index (it prints the
        # one actually used), so only claim the *configured* index here.
        console.print(
            f"[green]Opened capture device (configured device_index={configured_idx}) "
            f"at {w}x{h}, reported fps={reported_fps:.1f}[/green]"
        )
        console.print(f"[bold]Running {duration_s:.1f}s smoke test...[/bold]")

        t_start = time.perf_counter()
        deadline = t_start + duration_s
        while time.perf_counter() < deadline:
            t0 = time.perf_counter()
            ret, frame = cap.read()
            t1 = time.perf_counter()
            if not ret or frame is None:
                continue
            # Latency is the wall time spent inside cap.read() only.
            latencies_ms.append((t1 - t0) * 1000.0)
            if first_frame is None:
                first_frame = frame.copy()
            last_frame = frame
            frame_count += 1
        elapsed = time.perf_counter() - t_start
    finally:
        cap.release()

    if frame_count == 0 or first_frame is None:
        # Report the duration actually used, not a hard-coded 10 seconds.
        console.print(f"[bold red]Zero frames captured in {duration_s:g} seconds.[/bold red]")
        return 3

    fps = frame_count / elapsed if elapsed > 0 else 0.0
    lat = np.asarray(latencies_ms)
    # Single-channel frames are 2-D and have no shape[2].
    channels = first_frame.shape[2] if first_frame.ndim == 3 else 1

    console.print("[bold green]Smoke test results[/bold green]")
    console.print(f"  frames:        {frame_count}")
    console.print(f"  elapsed:       {elapsed:.2f}s")
    console.print(f"  measured FPS:  {fps:.2f}")
    console.print(f"  frame size:    {first_frame.shape[1]}x{first_frame.shape[0]}  (channels={channels})")
    console.print(
        f"  read latency:  mean {lat.mean():.2f}ms  "
        f"p50 {np.percentile(lat, 50):.2f}ms  "
        f"p95 {np.percentile(lat, 95):.2f}ms  "
        f"max {lat.max():.2f}ms"
    )

    # cv2.imwrite() signals failure through its return value, so only claim
    # the file was saved when it says so.
    out_first = LOG_DIR / "smoke_frame.png"
    if cv2.imwrite(str(out_first), first_frame):
        console.print(f"[green]Saved first frame -> {out_first}[/green]")
    else:
        console.print(f"[yellow]Could not write first frame -> {out_first}[/yellow]")
    if last_frame is not None and frame_count > 1:
        out_last = LOG_DIR / "smoke_frame_last.png"
        if cv2.imwrite(str(out_last), last_frame):
            console.print(f"[green]Saved last frame  -> {out_last}[/green]")
        else:
            console.print(f"[yellow]Could not write last frame  -> {out_last}[/yellow]")
    return 0
158
 
159
def run_publisher(cfg: dict) -> int:
    """Publish captured frames as JPEG over a ZMQ PUB socket until interrupted.

    Each message is multipart ``[meta, jpeg]``: ``meta`` is UTF-8 JSON with
    ``seq`` (frame counter), ``ts_ns`` (``time.time_ns()`` taken after
    encoding), ``h`` and ``w``; ``jpeg`` is the encoded frame. A throughput
    line is printed roughly every 5 seconds.

    Args:
        cfg: Runtime configuration. ``cfg["capture"]["publish_endpoint"]`` is
            required; ``jpeg_quality`` defaults to 80.

    Returns:
        2 if no capture device could be opened, otherwise 0 once the loop is
        stopped with Ctrl-C.

    Raises:
        KeyError: If ``publish_endpoint`` is missing from the configuration.
        zmq.ZMQError: If the socket cannot be created or bound.
    """
    cap = open_capture(cfg)
    if cap is None or not cap.isOpened():
        if cap is not None:
            cap.release()
        console.print("[red]Capture device not available; publisher exiting.[/red]")
        return 2

    ctx = None
    sock = None
    # Socket setup lives inside the try so that a bad endpoint or a failed
    # bind still releases the capture device and closes the socket.
    try:
        endpoint = cfg["capture"]["publish_endpoint"]
        jpeg_q = int(cfg["capture"].get("jpeg_quality", 80))
        ctx = zmq.Context.instance()
        sock = ctx.socket(zmq.PUB)
        # Small send high-water mark: queue at most a couple of frames per
        # subscriber rather than letting a slow consumer build up latency.
        sock.setsockopt(zmq.SNDHWM, 2)
        sock.bind(endpoint)
        console.print(f"[green]Publishing multipart [meta, jpeg] on {endpoint}[/green]  (Ctrl-C to stop)")

        seq = 0
        t_last_report = time.perf_counter()
        frames_since_report = 0
        while True:
            ret, frame = cap.read()
            if not ret or frame is None:
                # Back off briefly so a stalled or unplugged device does not
                # turn this loop into a 100% CPU spin.
                time.sleep(0.01)
                continue
            ok, buf = cv2.imencode(".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, jpeg_q])
            if not ok:
                continue
            meta = {
                "seq": seq,
                "ts_ns": time.time_ns(),
                "h": int(frame.shape[0]),
                "w": int(frame.shape[1]),
            }
            sock.send_multipart([json.dumps(meta).encode("utf-8"), buf.tobytes()])
            seq += 1
            frames_since_report += 1
            now = time.perf_counter()
            if now - t_last_report >= 5.0:
                fps = frames_since_report / (now - t_last_report)
                console.print(f"[dim]  publisher: {fps:.1f} fps, last seq={seq - 1}[/dim]")
                t_last_report = now
                frames_since_report = 0
    except KeyboardInterrupt:
        console.print("[yellow]Publisher interrupted.[/yellow]")
    finally:
        cap.release()
        if sock is not None:
            # linger=0: drop unsent frames instead of blocking shutdown.
            sock.close(0)
        if ctx is not None:
            ctx.term()
    return 0
205
 
206
def main() -> None:
    """Parse the command line, load the config, and run the selected mode.

    With ``--smoke`` the smoke test runs for ``--duration`` seconds;
    otherwise the publisher runs. The process exits with the status code
    returned by whichever mode ran.
    """
    parser = argparse.ArgumentParser(description="pitch-tracker-cv capture card ingest.")
    parser.add_argument(
        "--smoke",
        action="store_true",
        help="10s smoke test: print FPS/latency, save one frame, exit.",
    )
    parser.add_argument(
        "--duration",
        type=float,
        default=10.0,
        help="Smoke test duration in seconds.",
    )
    options = parser.parse_args()

    config = load_config()
    if options.smoke:
        exit_code = smoke_test(config, duration_s=options.duration)
    else:
        exit_code = run_publisher(config)
    sys.exit(exit_code)


if __name__ == "__main__":
    main()