| 1 | """PC-side bridge to Titan Two via HID vendor interface. |
| 2 | |
| 3 | Sends 65-byte output reports (1-byte report ID + 64 payload bytes) that the |
| 4 | paired bridge.gpc script parses. Fail-safe: if you stop sending for >200 ms, |
| 5 | the GPC script zeroes all outputs. |
| 6 | |
| 7 | Usage (Python API): |
| 8 | with TitanBridge() as t: |
| 9 | t.send(lx=0, ly=0, buttons={"A"}) |
| 10 | time.sleep(0.15) |
| 11 | t.send() |
| 12 | """ |
| 13 | from __future__ import annotations |
| 14 | |
| 15 | import time |
| 16 | from dataclasses import dataclass |
| 17 | from typing import Iterable |
| 18 | |
| 19 | import pywinusb.hid as hid |
| 20 | |
# Default USB vendor / product ID used by TitanBridge to find the device.
VID = 0x2508
PID = 0x0032

# First payload byte of every packet, followed by one of the command codes.
PKT_MAGIC = 0xAA
CMD_SET = 0x01
CMD_DISARM = 0xFF

# Button name -> bit mask within the "low" button byte of a packet.
BUTTONS_LO = {
    "A": 1 << 0,
    "B": 1 << 1,
    "X": 1 << 2,
    "Y": 1 << 3,
    "LB": 1 << 4,
    "RB": 1 << 5,
    "BACK": 1 << 6,
    "START": 1 << 7,
}

# Button name -> bit mask within the "high" button byte of a packet.
BUTTONS_HI = {
    "LS": 1 << 0,
    "RS": 1 << 1,
    "UP": 1 << 2,
    "DOWN": 1 << 3,
    "LEFT": 1 << 4,
    "RIGHT": 1 << 5,
}
| 36 | |
| 37 | def _clamp_stick(v: float) -> int: |
| 38 | v = int(round(v)) |
| 39 | if v > 100: return 100 |
| 40 | if v < -100: return -100 |
| 41 | return v & 0xFF |
| 42 | |
| 43 | def _clamp_trigger(v: float) -> int: |
| 44 | v = int(round(v)) |
| 45 | if v > 100: return 100 |
| 46 | if v < 0: return 0 |
| 47 | return v |
| 48 | |
@dataclass
class ControllerState:
    """Plain record of one controller frame.

    Holds four axis values, two trigger values and a set of buttons. All
    numeric fields default to 0.0 and ``buttons`` defaults to an empty
    frozenset, so ``ControllerState()`` is a neutral state.

    NOTE(review): the field names match the keyword arguments of
    ``TitanBridge.send`` (lx, ly, rx, ry, lt, rt, buttons), but nothing in the
    visible part of this file consumes this class -- confirm intended use.
    """

    # Axis values (same names as the stick arguments of TitanBridge.send).
    lx: float = 0.0
    ly: float = 0.0
    rx: float = 0.0
    ry: float = 0.0

    # Trigger values (same names as the trigger arguments of TitanBridge.send).
    lt: float = 0.0
    rt: float = 0.0

    # Immutable default, so instances can safely share it.
    buttons: frozenset = frozenset()
| 58 | |
class TitanBridge:
    """Sends controller packets to a Titan Two through a pywinusb HID output report.

    Usable as a context manager: entering calls ``open()``, leaving calls
    ``close()`` (which first tries to send a disarm packet).
    """

    def __init__(self, vid: int = VID, pid: int = PID):
        """Store the USB vendor/product ID to search for; performs no I/O."""
        self.vid = vid
        self.pid = pid
        self._dev = None         # opened pywinusb device; None while closed
        self._out_report = None  # output report used for every packet

    def open(self) -> None:
        """Open the first HID device matching vid/pid and cache its first output report.

        Instance state is only updated once the device is fully usable; on any
        failure after the handle was opened, the handle is closed again so it
        is neither leaked nor left behind as a stale ``_dev``.

        Raises:
            RuntimeError: if no matching device is found, or the device
                exposes no output reports.
        """
        filt = hid.HidDeviceFilter(vendor_id=self.vid, product_id=self.pid)
        devs = filt.get_devices()
        if not devs:
            raise RuntimeError(
                f"No HID device found for VID={self.vid:04x} PID={self.pid:04x}. "
                "Is the Titan Two connected?"
            )
        dev = devs[0]
        dev.open()
        try:
            reports = dev.find_output_reports()
            if not reports:
                raise RuntimeError("Titan Two has no output reports; cannot send.")
        except BaseException:
            # Release the handle on any failure, then let the error propagate.
            dev.close()
            raise
        self._dev = dev
        self._out_report = reports[0]

    def close(self) -> None:
        """Best-effort disarm, then close the device; a no-op when not open.

        Errors from the disarm packet and from closing the handle are
        deliberately ignored so that teardown never raises: the device-side
        script zeroes its outputs on its own once packets stop arriving.
        """
        if self._dev is None:
            return
        try:
            try:
                self._send_raw(cmd=CMD_DISARM)
            except Exception:
                pass  # device may already be unplugged; the fail-safe covers it
            try:
                self._dev.close()
            except Exception:
                pass  # nothing useful can be done about a failed close
        finally:
            # Always forget the handle, even if teardown was interrupted.
            self._dev = None
            self._out_report = None

    def __enter__(self) -> "TitanBridge":
        """Open the device and return this bridge."""
        self.open()
        return self

    def __exit__(self, *exc) -> None:
        """Close the device; exceptions from the ``with`` body are not suppressed."""
        self.close()

    def _send_raw(self, cmd: int, lx: int = 0, ly: int = 0, rx: int = 0, ry: int = 0,
                  btn_lo: int = 0, btn_hi: int = 0, lt: int = 0, rt: int = 0) -> None:
        """Build one 65-byte report and send it.

        Layout: byte 0 is the report ID (0), byte 1 the packet magic, byte 2
        the command, bytes 3-6 the four stick values, bytes 7-8 the low/high
        button masks, bytes 9-10 the triggers; the rest is zero padding.
        Every field is masked to 8 bits, so signed stick values are sent in
        two's-complement form.

        Raises:
            RuntimeError: if the bridge has not been opened.
        """
        if self._out_report is None:
            raise RuntimeError("Bridge not open")
        fields = (PKT_MAGIC, cmd, lx, ly, rx, ry, btn_lo, btn_hi, lt, rt)
        data = [0] * 65  # data[0] stays 0x00: the report ID
        for offset, value in enumerate(fields, start=1):
            data[offset] = value & 0xFF
        self._out_report.set_raw_data(data)
        # NOTE(review): the result of send() is ignored -- confirm whether
        # pywinusb reports transmission failures through its return value.
        self._out_report.send()

    def send(self, lx: float = 0, ly: float = 0, rx: float = 0, ry: float = 0,
             lt: float = 0, rt: float = 0, buttons: Iterable[str] = ()) -> None:
        """Send a controller state to the Titan Two.

        Sticks are in [-100, 100]. Triggers [0, 100]. Buttons is a set of names
        like {"A", "LB"} (case-insensitive). A mapping such as
        {"A": True, "B": False} is also accepted; only names with a truthy
        value count as pressed. Call this at least every 200 ms to stay "armed".

        Raises:
            ValueError: if a button name is not known.
            RuntimeError: if the bridge has not been opened.
        """
        if isinstance(buttons, dict):
            # Iterating a dict yields its keys, which would press buttons
            # explicitly mapped to False; keep only the truthy entries.
            buttons = [name for name, pressed in buttons.items() if pressed]
        btn_lo = 0
        btn_hi = 0
        for name in buttons:
            up = name.upper()
            if up in BUTTONS_LO:
                btn_lo |= BUTTONS_LO[up]
            elif up in BUTTONS_HI:
                btn_hi |= BUTTONS_HI[up]
            else:
                raise ValueError(f"Unknown button name: {name!r}")
        self._send_raw(
            cmd=CMD_SET,
            lx=_clamp_stick(lx), ly=_clamp_stick(ly),
            rx=_clamp_stick(rx), ry=_clamp_stick(ry),
            btn_lo=btn_lo, btn_hi=btn_hi,
            lt=_clamp_trigger(lt), rt=_clamp_trigger(rt),
        )

    def disarm(self) -> None:
        """Send a disarm packet (all other fields zero).

        Raises:
            RuntimeError: if the bridge has not been opened.
        """
        self._send_raw(cmd=CMD_DISARM)
| 148 | |
def tap(bridge: "TitanBridge", button: str, duration_s: float = 0.15,
        resend_interval_s: float = 0.05) -> None:
    """Press a button for ``duration_s`` seconds, then release it.

    The pressed state is re-sent every ``resend_interval_s`` seconds while the
    button is held. The device-side script zeroes its outputs when no packet
    arrives for more than 200 ms, so a single press packet would be released
    early for any hold longer than that. The release packet is sent from a
    ``finally`` block, so an interrupted hold still ends with a release.

    Args:
        bridge: Open bridge whose ``send`` method is used.
        button: Name of the button to press.
        duration_s: Hold time in seconds; zero or negative means press and
            release immediately.
        resend_interval_s: Seconds between refresh packets; keep it well
            below the 200 ms fail-safe window.

    Raises:
        ValueError: if ``resend_interval_s`` is not positive, or ``button`` is
            rejected by ``bridge.send``.
    """
    if resend_interval_s <= 0:
        raise ValueError("resend_interval_s must be positive")
    deadline = time.monotonic() + duration_s
    bridge.send(buttons={button})
    try:
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(resend_interval_s, remaining))
            # Skip the refresh once the hold is over; the release follows.
            if time.monotonic() < deadline:
                bridge.send(buttons={button})
    finally:
        bridge.send()