diff --git a/tools/rf_testbench.py b/tools/rf_testbench.py index f12d92b..e673c00 100644 --- a/tools/rf_testbench.py +++ b/tools/rf_testbench.py @@ -10,9 +10,9 @@ detectable signal, and BPSK mode 9 behavior. Hardware setup: NanoVNA CH0 → DC Blocker → HMC472A → SMA-to-F → SkyWalker-1 -The HMC472A (0-31.5 dB, 0.5 dB steps) is controlled via its ESP32-S2 REST -API. The NanoVNA provides CW at a fixed frequency, controlled either via -mcnanovna or manually. The SkyWalker-1 measures AGC, SNR, and lock status. +The HMC472A (0-31.5 dB, 0.5 dB steps) is controlled via USB serial (preferred) +or REST API. The NanoVNA provides CW at a fixed frequency, controlled either +via mcnanovna or manually. The SkyWalker-1 measures AGC, SNR, and lock status. Usage: python rf_testbench.py agc-linearity --freq 1200 @@ -88,6 +88,80 @@ class HMC472A: return self._get("/config") +# --- HMC472A USB serial client --- + +class HMC472ASerial: + """Control the HMC472A digital attenuator via USB CDC serial. + + Uses the usb-serial-json-v1 protocol: one JSON object per newline- + terminated line in each direction. Requires pyserial. + """ + + def __init__(self, port: str, baudrate: int = 115200, timeout: float = 1.0): + import serial + self.ser = serial.Serial(port, baudrate, timeout=timeout) + self.ser.reset_input_buffer() + + def close(self): + if self.ser and self.ser.is_open: + self.ser.close() + + def _cmd(self, command: dict) -> dict: + line = json.dumps(command, separators=(",", ":")) + "\n" + self.ser.write(line.encode()) + self.ser.flush() + resp_line = self.ser.readline() + if not resp_line: + raise TimeoutError("no response from HMC472A") + resp = json.loads(resp_line) + if not resp.get("ok"): + raise RuntimeError(resp.get("error", "unknown error")) + return resp + + def status(self) -> dict: + return self._cmd({"cmd": "status"}) + + def set_db(self, attenuation_db: float) -> dict: + clamped = max(0.0, min(31.5, attenuation_db)) + rounded = round(clamped * 2) / 2 + return self._cmd({"cmd": "set", "db": rounded}) + + def config(self) -> dict: + return self._cmd({"cmd": "config"}) + + def identify(self) -> dict: + return self._cmd({"cmd": "identify"}) + + +def detect_hmc472a_serial() -> str | None: + """Scan /dev/ttyACM* ports for an HMC472A responding to identify. + + Returns the port path if found, None otherwise. + """ + import glob + try: + import serial + except ImportError: + return None + + ports = sorted(glob.glob("/dev/ttyACM*")) + for port in ports: + try: + ser = serial.Serial(port, 115200, timeout=0.5) + ser.reset_input_buffer() + ser.write(b'{"cmd":"identify"}\n') + ser.flush() + resp_line = ser.readline() + ser.close() + if resp_line: + resp = json.loads(resp_line) + if resp.get("device") == "hmc472a-attenuator": + return port + except (OSError, json.JSONDecodeError, ValueError): + continue + return None + + class MockSkyWalker1: """Lightweight mock SkyWalker-1 for rf_testbench testing.""" @@ -650,12 +724,14 @@ examples: %(prog)s freq-accuracy --freqs 1000,1200,1400 %(prog)s mds --freq 1200 %(prog)s bpsk-probe --freq 1200 - %(prog)s band-flatness --nanovna auto --attenuator http://10.0.0.50 + %(prog)s band-flatness --attenuator /dev/ttyACM1 + %(prog)s agc-linearity --attenuator http://attenuator.local --freq 1200 hardware setup: NanoVNA CH0 → DC Blocker → HMC472A (0-31.5 dB) → SMA-to-F → SkyWalker-1 - The HMC472A is controlled via its ESP32-S2 REST API. + The HMC472A is controlled via USB serial (preferred) or HTTP REST API. + Use --attenuator auto (default) to auto-detect USB, falling back to HTTP. The NanoVNA provides CW, controlled via mcnanovna or manually. LNB power is disabled (direct L-band input mode). """, @@ -667,10 +743,10 @@ hardware setup: help="CSV output file path") parser.add_argument("--cal", type=str, default=None, help="Path loss calibration CSV (NanoVNA S21 sweep)") - parser.add_argument("--attenuator", type=str, - default="http://attenuator.local", - help="HMC472A REST API base URL " - "(default: http://attenuator.local)") + parser.add_argument("--attenuator", type=str, default="auto", + help="HMC472A connection: 'auto' (USB then HTTP), " + "/dev/ttyACMx (USB serial), or http://host (REST) " + "(default: auto)") parser.add_argument("--nanovna", choices=["auto", "manual"], default="auto", help="NanoVNA control mode (default: auto via mcnanovna)") @@ -717,6 +793,51 @@ hardware setup: return parser +def _connect_attenuator(target: str): + """Connect to HMC472A via auto-detect, USB serial, or HTTP REST. + + Args: + target: "auto", a serial port path (/dev/ttyACM*), or an HTTP URL. + """ + # Auto-detect: try USB serial first, fall back to HTTP + if target == "auto": + port = detect_hmc472a_serial() + if port: + print(f"HMC472A: auto-detected USB serial on {port}") + target = port + else: + print("HMC472A: no USB device found, trying HTTP...") + target = "http://attenuator.local" + + # USB serial path + if target.startswith("/dev/"): + try: + atten = HMC472ASerial(target) + info = atten.identify() + print(f"HMC472A: USB serial on {target} " + f"(v{info.get('version', '?')}, " + f"protocol {info.get('protocol', '?')})") + return atten + except ImportError: + print("HMC472A: pyserial not installed (pip install pyserial)") + sys.exit(1) + except (OSError, TimeoutError) as e: + print(f"HMC472A: cannot open {target} ({e})") + sys.exit(1) + + # HTTP REST API + atten = HMC472A(target) + try: + cfg = atten.config() + print(f"HMC472A: HTTP on {target} ({cfg.get('hostname', '?')}, " + f"v{cfg.get('version', '?')})") + return atten + except (URLError, OSError) as e: + print(f"HMC472A: cannot reach {target} ({e})") + print(" Use --attenuator /dev/ttyACMx (USB) or http://host (HTTP)") + sys.exit(1) + + def main(): parser = build_parser() args = parser.parse_args() @@ -729,15 +850,7 @@ def main(): atten = MockHMC472A() print("HMC472A: mock mode") else: - atten = HMC472A(args.attenuator) - try: - cfg = atten.config() - print(f"HMC472A: connected ({cfg.get('hostname', '?')}, " - f"v{cfg.get('version', '?')})") - except (URLError, OSError) as e: - print(f"HMC472A: cannot reach {args.attenuator} ({e})") - print(" Check network connection or use --attenuator ") - sys.exit(1) + atten = _connect_attenuator(args.attenuator) # Set up NanoVNA nanovna = None