| 1 | """Shared helpers for CV subscribers.""" |
| 2 | from __future__ import annotations |
| 3 | |
| 4 | import json |
| 5 | from pathlib import Path |
| 6 | from typing import Iterator |
| 7 | |
| 8 | import cv2 |
| 9 | import numpy as np |
| 10 | import yaml |
| 11 | import zmq |
| 12 | |
| 13 | ROOT = Path(__file__).resolve().parents[1] |
| 14 | CONFIG_PATH = ROOT / "configs" / "runtime.yaml" |
| 15 | |
| 16 | def load_config() -> dict: |
| 17 | with open(CONFIG_PATH, "r", encoding="utf-8") as f: |
| 18 | return yaml.safe_load(f) |
| 19 | |
| 20 | def make_frame_subscriber(endpoint: str, ctx: zmq.Context | None = None) -> zmq.Socket: |
| 21 | ctx = ctx or zmq.Context.instance() |
| 22 | sock = ctx.socket(zmq.SUB) |
| 23 | sock.setsockopt(zmq.RCVHWM, 1) |
| 24 | sock.setsockopt(zmq.SUBSCRIBE, b"") |
| 25 | sock.connect(endpoint) |
| 26 | return sock |
| 27 | |
| 28 | def make_pub(endpoint: str, ctx: zmq.Context | None = None) -> zmq.Socket: |
| 29 | ctx = ctx or zmq.Context.instance() |
| 30 | sock = ctx.socket(zmq.PUB) |
| 31 | sock.setsockopt(zmq.SNDHWM, 8) |
| 32 | sock.bind(endpoint) |
| 33 | return sock |
| 34 | |
| 35 | def iter_latest_frames(sock: zmq.Socket, timeout_ms: int = 2000) -> Iterator[tuple[dict, np.ndarray]]: |
| 36 | """Yield (meta_dict, decoded_bgr_frame) for each arriving message, dropping stale ones. |
| 37 | |
| 38 | Blocks up to timeout_ms for the next message; raises TimeoutError if idle too long. |
| 39 | """ |
| 40 | poller = zmq.Poller() |
| 41 | poller.register(sock, zmq.POLLIN) |
| 42 | while True: |
| 43 | events = dict(poller.poll(timeout_ms)) |
| 44 | if sock not in events: |
| 45 | raise TimeoutError(f"No frame within {timeout_ms}ms") |
| 46 | |
| 47 | latest: list[bytes] | None = None |
| 48 | while True: |
| 49 | try: |
| 50 | latest = sock.recv_multipart(flags=zmq.NOBLOCK) |
| 51 | except zmq.Again: |
| 52 | break |
| 53 | if latest is None or len(latest) != 2: |
| 54 | continue |
| 55 | meta = json.loads(latest[0].decode("utf-8")) |
| 56 | arr = np.frombuffer(latest[1], dtype=np.uint8) |
| 57 | frame = cv2.imdecode(arr, cv2.IMREAD_COLOR) |
| 58 | if frame is None: |
| 59 | continue |
| 60 | yield meta, frame |
| 61 | |
| 62 | def send_event(sock: zmq.Socket, event: dict) -> None: |
| 63 | sock.send(json.dumps(event).encode("utf-8")) |
| 64 | |
| 65 | def event_subscriber(endpoint: str, ctx: zmq.Context | None = None) -> zmq.Socket: |
| 66 | ctx = ctx or zmq.Context.instance() |
| 67 | sock = ctx.socket(zmq.SUB) |
| 68 | sock.setsockopt(zmq.RCVHWM, 64) |
| 69 | sock.setsockopt(zmq.SUBSCRIBE, b"") |
| 70 | sock.connect(endpoint) |
| 71 | return sock |