From a12a394099f54fed8f161537760f87042576b0dd Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Fri, 20 Feb 2026 10:56:36 -0700 Subject: [PATCH] Add FixedAttenuator class and udev rules for RF test bench FixedAttenuator supports --attenuator fixed:XX for non-programmable inline SMA pads (set_db is a no-op returning the fixed value). Udev rules grant non-root USB access for NanoVNA and HMC472A. --- tools/rf_testbench.py | 1936 ++++++++++++++++++------------------ udev/99-rf-testbench.rules | 24 + 2 files changed, 1010 insertions(+), 950 deletions(-) create mode 100644 udev/99-rf-testbench.rules diff --git a/tools/rf_testbench.py b/tools/rf_testbench.py index e673c00..e693d8b 100644 --- a/tools/rf_testbench.py +++ b/tools/rf_testbench.py @@ -1,950 +1,986 @@ -#!/usr/bin/env python3 -""" -RF Test Bench — CW injection tests with NanoVNA + HMC472A + SkyWalker-1. - -Injects CW signals from a NanoVNA-H through a programmable HMC472A digital -attenuator into the SkyWalker-1 receiver. Runs automated test sequences to -characterize AGC linearity, IF band flatness, frequency accuracy, minimum -detectable signal, and BPSK mode 9 behavior. - -Hardware setup: - NanoVNA CH0 → DC Blocker → HMC472A → SMA-to-F → SkyWalker-1 - -The HMC472A (0-31.5 dB, 0.5 dB steps) is controlled via USB serial (preferred) -or REST API. The NanoVNA provides CW at a fixed frequency, controlled either -via mcnanovna or manually. The SkyWalker-1 measures AGC, SNR, and lock status. - -Usage: - python rf_testbench.py agc-linearity --freq 1200 - python rf_testbench.py band-flatness --start 950 --stop 1500 --step 10 - python rf_testbench.py freq-accuracy --freqs 1000,1200,1400 - python rf_testbench.py mds --freq 1200 - python rf_testbench.py bpsk-probe --freq 1200 - python rf_testbench.py --help -""" - -import sys -import os -import argparse -import csv -import json -import time -from datetime import datetime, timezone -from urllib.request import urlopen, Request -from urllib.error import URLError - -sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) - -from skywalker_lib import SkyWalker1, MODULATIONS - - -# --- HMC472A REST client --- - -class HMC472A: - """Control the HMC472A digital attenuator via its ESP32-S2 REST API.""" - - def __init__(self, base_url: str = "http://attenuator.local"): - self.base_url = base_url.rstrip("/") - - def _get(self, path: str, retries: int = 3) -> dict: - url = f"{self.base_url}{path}" - req = Request(url) - last_err: OSError = OSError("no attempts made") - for attempt in range(retries): - try: - with urlopen(req, timeout=5) as resp: - return json.loads(resp.read()) - except (URLError, OSError) as e: - last_err = e - if attempt < retries - 1: - time.sleep(0.2 * (attempt + 1)) - raise last_err - - def _post(self, path: str, data: dict, retries: int = 3) -> dict: - url = f"{self.base_url}{path}" - body = json.dumps(data).encode() - req = Request(url, data=body, method="POST", - headers={"Content-Type": "application/json"}) - last_err: OSError = OSError("no attempts made") - for attempt in range(retries): - try: - with urlopen(req, timeout=5) as resp: - return json.loads(resp.read()) - except (URLError, OSError) as e: - last_err = e - if attempt < retries - 1: - time.sleep(0.2 * (attempt + 1)) - raise last_err - - def status(self) -> dict: - return self._get("/status") - - def set_db(self, attenuation_db: float) -> dict: - clamped = max(0.0, min(31.5, attenuation_db)) - rounded = round(clamped * 2) / 2 # Snap to 0.5 dB steps - return self._post("/set", {"attenuation_db": rounded}) - - def config(self) -> dict: - return self._get("/config") - - -# --- HMC472A USB serial client --- - -class HMC472ASerial: - """Control the HMC472A digital attenuator via USB CDC serial. - - Uses the usb-serial-json-v1 protocol: one JSON object per newline- - terminated line in each direction. Requires pyserial. - """ - - def __init__(self, port: str, baudrate: int = 115200, timeout: float = 1.0): - import serial - self.ser = serial.Serial(port, baudrate, timeout=timeout) - self.ser.reset_input_buffer() - - def close(self): - if self.ser and self.ser.is_open: - self.ser.close() - - def _cmd(self, command: dict) -> dict: - line = json.dumps(command, separators=(",", ":")) + "\n" - self.ser.write(line.encode()) - self.ser.flush() - resp_line = self.ser.readline() - if not resp_line: - raise TimeoutError("no response from HMC472A") - resp = json.loads(resp_line) - if not resp.get("ok"): - raise RuntimeError(resp.get("error", "unknown error")) - return resp - - def status(self) -> dict: - return self._cmd({"cmd": "status"}) - - def set_db(self, attenuation_db: float) -> dict: - clamped = max(0.0, min(31.5, attenuation_db)) - rounded = round(clamped * 2) / 2 - return self._cmd({"cmd": "set", "db": rounded}) - - def config(self) -> dict: - return self._cmd({"cmd": "config"}) - - def identify(self) -> dict: - return self._cmd({"cmd": "identify"}) - - -def detect_hmc472a_serial() -> str | None: - """Scan /dev/ttyACM* ports for an HMC472A responding to identify. - - Returns the port path if found, None otherwise. - """ - import glob - try: - import serial - except ImportError: - return None - - ports = sorted(glob.glob("/dev/ttyACM*")) - for port in ports: - try: - ser = serial.Serial(port, 115200, timeout=0.5) - ser.reset_input_buffer() - ser.write(b'{"cmd":"identify"}\n') - ser.flush() - resp_line = ser.readline() - ser.close() - if resp_line: - resp = json.loads(resp_line) - if resp.get("device") == "hmc472a-attenuator": - return port - except (OSError, json.JSONDecodeError, ValueError): - continue - return None - - -class MockSkyWalker1: - """Lightweight mock SkyWalker-1 for rf_testbench testing.""" - - def __init__(self, verbose=False): - self.verbose = verbose - self._freq_khz = 0 - - def open(self): - pass - - def close(self): - pass - - def ensure_booted(self): - pass - - def start_intersil(self, on=True): - pass - - def tune_monitor(self, symbol_rate_sps=1000000, freq_khz=1200000, - mod_index=0, fec_index=5, dwell_ms=10): - self._freq_khz = freq_khz - # Simulate AGC response: higher freq → slightly lower power - base_agc1 = 1200 + (freq_khz - 1200000) // 100 - return { - "snr_raw": 180, "snr_db": 7.8, "snr_pct": 39.0, - "agc1": max(100, base_agc1), "agc2": 750, - "power_db": -46.1 - (freq_khz - 1200000) / 500000, - "locked": False, "lock": 0x00, "status": 0x01, - "dwell_ms": dwell_ms, - } - - def signal_monitor(self): - return { - "snr_raw": 200, "snr_db": 8.5, "snr_pct": 42.5, - "agc1": 1200, "agc2": 800, "power_db": -45.3, - "locked": False, "lock": 0x00, "status": 0x01, - } - - def sweep_spectrum(self, start_mhz, stop_mhz, step_mhz=5.0, - dwell_ms=15, sr_ksps=1000, mod_index=0, fec_index=5): - n = int((stop_mhz - start_mhz) / step_mhz) + 1 - freqs = [start_mhz + i * step_mhz for i in range(n)] - powers = [-50.0 + 3.0 * (1.0 - abs(f - 1200) / 300) for f in freqs] - raw = [{"agc1": 1200, "agc2": 750, "power_db": p, - "snr_raw": 0, "snr_db": 0, "locked": False, - "lock": 0, "status": 0} for p in powers] - return freqs, powers, raw - - -def _make_mock_skywalker(verbose=False): - sw = MockSkyWalker1(verbose=verbose) - sw.open() - sw.ensure_booted() - return sw - - -class MockHMC472A: - """Mock attenuator for testing without hardware.""" - - def __init__(self, base_url: str = "http://mock.local"): - self.base_url = base_url - self._db = 0.0 - - def status(self) -> dict: - step = int(self._db * 2) - return {"attenuation_db": self._db, "step": step, "version": "mock"} - - def set_db(self, attenuation_db: float) -> dict: - self._db = max(0.0, min(31.5, round(attenuation_db * 2) / 2)) - return self.status() - - def config(self) -> dict: - return {"db_min": 0.0, "db_max": 31.5, "db_step": 0.5, - "version": "mock", "hostname": "mock-attenuator"} - - -# --- NanoVNA control --- - -def try_import_nanovna(): - """Try to import mcnanovna for automated NanoVNA control.""" - try: - from mcnanovna.nanovna import NanoVNA - return NanoVNA - except ImportError: - return None - - -class MockNanoVNA: - """Mock NanoVNA for testing without hardware.""" - - def cw(self, frequency_hz: int = 0, power: int = 3): - pass - - -def manual_nanovna_set(freq_mhz: float, power: int = 3) -> None: - """Prompt the user to manually set NanoVNA CW frequency.""" - print(f"\n >>> Set NanoVNA to CW at {freq_mhz:.3f} MHz, power={power}") - input(" Press Enter when ready...") - - -# --- CSV output --- - -CSV_COLUMNS = [ - "timestamp", "test_name", "freq_mhz", "atten_db", - "agc1", "agc2", "power_db", "snr_raw", "snr_db", - "locked", "lock_raw", "status", "notes", -] - - -def open_csv(path: str): - f = open(path, "w", newline="") - writer = csv.DictWriter(f, fieldnames=CSV_COLUMNS) - writer.writeheader() - return f, writer - - -def write_row(writer, csv_file, test_name: str, freq_mhz: float, - atten_db: float, result: dict, notes: str = ""): - writer.writerow({ - "timestamp": datetime.now(timezone.utc).isoformat(), - "test_name": test_name, - "freq_mhz": f"{freq_mhz:.3f}", - "atten_db": f"{atten_db:.1f}", - "agc1": result.get("agc1", 0), - "agc2": result.get("agc2", 0), - "power_db": f"{result.get('power_db', 0):.2f}", - "snr_raw": result.get("snr_raw", 0), - "snr_db": f"{result.get('snr_db', 0):.2f}", - "locked": result.get("locked", False), - "lock_raw": f"0x{result.get('lock', 0):02X}", - "status": f"0x{result.get('status', 0):02X}", - "notes": notes, - }) - if csv_file: - csv_file.flush() - - -# --- Calibration --- - -def load_cal_file(path: str) -> dict: - """Load a NanoVNA S21 path-loss calibration CSV. - - Expects columns: frequency_hz (or freq_mhz), s21_db (or loss_db). - Returns dict mapping freq_mhz -> loss_db (positive = loss). - """ - cal = {} - with open(path) as f: - reader = csv.DictReader(f) - for row in reader: - if "freq_mhz" in row: - freq = float(row["freq_mhz"]) - elif "frequency_hz" in row: - freq = float(row["frequency_hz"]) / 1e6 - else: - continue - - if "loss_db" in row: - loss = float(row["loss_db"]) - elif "s21_db" in row: - loss = -float(row["s21_db"]) # S21 is negative, loss is positive - else: - continue - cal[freq] = loss - return cal - - -def interpolate_loss(cal: dict, freq_mhz: float) -> float: - """Interpolate path loss at a frequency from cal data.""" - if not cal: - return 0.0 - freqs = sorted(cal.keys()) - if freq_mhz <= freqs[0]: - return cal[freqs[0]] - if freq_mhz >= freqs[-1]: - return cal[freqs[-1]] - for i in range(len(freqs) - 1): - if freqs[i] <= freq_mhz <= freqs[i + 1]: - f0, f1 = freqs[i], freqs[i + 1] - t = (freq_mhz - f0) / (f1 - f0) - return cal[f0] + t * (cal[f1] - cal[f0]) - return 0.0 - - -# --- Test: AGC Power Linearity --- - -def test_agc_linearity(sw, atten, nanovna, freq_mhz: float, - writer, csv_file, cal: dict, settle_ms: int) -> list: - """Sweep attenuator from 0 to 31.5 dB and record AGC at each step. - - Maps the AGC transfer function: how AGC register values respond to - known changes in input power. - """ - print(f"\n=== AGC Linearity Test at {freq_mhz:.1f} MHz ===") - results = [] - path_loss = interpolate_loss(cal, freq_mhz) - if path_loss > 0: - print(f" Calibrated path loss: {path_loss:.1f} dB") - - # Set NanoVNA to CW at the test frequency - if nanovna: - nanovna.cw(frequency_hz=int(freq_mhz * 1e6), power=3) - print(f" NanoVNA CW: {freq_mhz:.3f} MHz, power=3") - else: - manual_nanovna_set(freq_mhz, power=3) - - # Tune SkyWalker-1 to the frequency - freq_khz = int(freq_mhz * 1000) - - print(f"\n {'Atten dB':>9} {'AGC1':>6} {'AGC2':>6} {'Power dB':>9} " - f"{'SNR raw':>8} {'Lock':>5}") - print(f" {'─' * 9} {'─' * 6} {'─' * 6} {'─' * 9} {'─' * 8} {'─' * 5}") - - # Sweep in 0.5 dB steps from 0 to 31.5 dB (64 steps) - # Use integer step counter to avoid IEEE 754 float accumulation drift - for step in range(64): # 0, 1, 2, ... 63 → 0.0, 0.5, 1.0, ... 31.5 - atten_db = step * 0.5 - atten.set_db(atten_db) - time.sleep(settle_ms / 1000.0) - - result = sw.tune_monitor( - symbol_rate_sps=1000000, freq_khz=freq_khz, - mod_index=0, fec_index=5, dwell_ms=50 - ) - - locked = "Y" if result.get("locked") else "N" - print(f" {atten_db:9.1f} {result['agc1']:6d} {result['agc2']:6d} " - f"{result['power_db']:9.2f} {result['snr_raw']:8d} {locked:>5}") - - effective_atten = atten_db + path_loss - note = f"effective_atten={effective_atten:.1f}dB" - if writer: - write_row(writer, csv_file, "agc_linearity", freq_mhz, atten_db, - result, note) - - results.append({"atten_db": atten_db, **result}) - - # Summary - if results: - agc1_min = min(r["agc1"] for r in results) - agc1_max = max(r["agc1"] for r in results) - print(f"\n AGC1 range: {agc1_min} - {agc1_max} " - f"(delta={agc1_max - agc1_min}) over 31.5 dB sweep") - - return results - - -# --- Test: IF Band Flatness --- - -def test_band_flatness(sw, atten, nanovna, start_mhz: float, - stop_mhz: float, step_mhz: float, - writer, csv_file, cal: dict, settle_ms: int) -> list: - """Sweep CW across the IF band and record AGC at each frequency. - - Reveals tuner gain slope, passband ripple, and the IF filter response. - """ - atten_db = 10.0 # Fixed attenuation — mid-range for good dynamic range - print(f"\n=== IF Band Flatness: {start_mhz:.0f}-{stop_mhz:.0f} MHz, " - f"step={step_mhz:.1f} MHz ===") - print(f" HMC472A fixed at {atten_db:.1f} dB") - - atten.set_db(atten_db) - results = [] - - # Use integer step counter to avoid float accumulation drift - n_steps = int(round((stop_mhz - start_mhz) / step_mhz)) + 1 - - print(f"\n {'Step':>5} {'Freq MHz':>9} {'AGC1':>6} {'AGC2':>6} " - f"{'Power dB':>9} {'PathLoss':>9} {'Corr dB':>8}") - print(f" {'─' * 5} {'─' * 9} {'─' * 6} {'─' * 6} " - f"{'─' * 9} {'─' * 9} {'─' * 8}") - - for step_num in range(n_steps): - freq_mhz = start_mhz + step_num * step_mhz - - # Set NanoVNA CW - if nanovna: - nanovna.cw(frequency_hz=int(freq_mhz * 1e6), power=3) - else: - manual_nanovna_set(freq_mhz, power=3) - - time.sleep(settle_ms / 1000.0) - - # Tune SkyWalker-1 - freq_khz = int(freq_mhz * 1000) - result = sw.tune_monitor( - symbol_rate_sps=1000000, freq_khz=freq_khz, - mod_index=0, fec_index=5, dwell_ms=50 - ) - - path_loss = interpolate_loss(cal, freq_mhz) - corrected = result["power_db"] + path_loss - - print(f" {step_num + 1:5d} {freq_mhz:9.1f} {result['agc1']:6d} " - f"{result['agc2']:6d} {result['power_db']:9.2f} " - f"{path_loss:9.1f} {corrected:8.2f}") - - note = f"path_loss={path_loss:.1f}dB corrected={corrected:.2f}dB" - if writer: - write_row(writer, csv_file, "band_flatness", freq_mhz, atten_db, - result, note) - - results.append({"freq_mhz": freq_mhz, "corrected_db": corrected, **result}) - - # Summary - if results: - powers = [r["corrected_db"] for r in results] - ripple = max(powers) - min(powers) - print(f"\n Band flatness: {ripple:.2f} dB ripple " - f"(min={min(powers):.2f}, max={max(powers):.2f})") - - return results - - -# --- Test: Frequency Accuracy --- - -def test_freq_accuracy(sw, atten, nanovna, test_freqs: list, - writer, csv_file, settle_ms: int) -> list: - """Inject CW at known frequencies, sweep SkyWalker-1 around each one. - - Compares detected peak vs. injected frequency to characterize the - BCM3440 tuner's frequency accuracy. - """ - print(f"\n=== Frequency Accuracy Test ===") - atten_db = 10.0 - atten.set_db(atten_db) - - results = [] - sweep_span_mhz = 10.0 # Sweep +/- 5 MHz around each test freq - sweep_step_mhz = 1.0 - - for inject_freq in test_freqs: - print(f"\n Injecting CW at {inject_freq:.3f} MHz...") - if nanovna: - nanovna.cw(frequency_hz=int(inject_freq * 1e6), power=3) - else: - manual_nanovna_set(inject_freq, power=3) - - time.sleep(settle_ms / 1000.0) - - # Sweep around the expected frequency - sweep_start = inject_freq - sweep_span_mhz / 2 - sweep_stop = inject_freq + sweep_span_mhz / 2 - - freqs, powers, raw = sw.sweep_spectrum( - sweep_start, sweep_stop, - step_mhz=sweep_step_mhz, dwell_ms=50, - sr_ksps=1000, mod_index=0, fec_index=5, - ) - - # Find peak - if powers: - peak_idx = max(range(len(powers)), key=lambda i: powers[i]) - peak_freq = freqs[peak_idx] - peak_power = powers[peak_idx] - error_mhz = peak_freq - inject_freq - error_khz = error_mhz * 1000 - - print(f" Injected: {inject_freq:.3f} MHz " - f"Detected peak: {peak_freq:.3f} MHz " - f"Error: {error_khz:+.0f} kHz") - - result_entry = { - "inject_freq_mhz": inject_freq, - "peak_freq_mhz": peak_freq, - "error_khz": error_khz, - "peak_power_db": peak_power, - } - results.append(result_entry) - - if writer: - peak_result = raw[peak_idx] if isinstance(raw[peak_idx], dict) else { - "agc1": 0, "agc2": 0, "power_db": peak_power, - "snr_raw": 0, "snr_db": 0, - "locked": False, "lock": 0, "status": 0, - } - write_row(writer, csv_file, "freq_accuracy", inject_freq, - atten_db, peak_result, - f"peak={peak_freq:.3f}MHz error={error_khz:+.0f}kHz") - - # Summary - if results: - errors = [r["error_khz"] for r in results] - mean_err = sum(errors) / len(errors) - max_err = max(abs(e) for e in errors) - print(f"\n Mean frequency error: {mean_err:+.0f} kHz") - print(f" Max absolute error: {max_err:.0f} kHz") - - return results - - -# --- Test: Minimum Detectable Signal --- - -def test_mds(sw, atten, nanovna, freq_mhz: float, - writer, csv_file, settle_ms: int) -> dict: - """Find the minimum detectable signal level. - - Measures noise floor with NanoVNA off (or max attenuation), then - increases attenuation from 0 until the CW signal is indistinguishable - from noise. - """ - print(f"\n=== Minimum Detectable Signal at {freq_mhz:.1f} MHz ===") - freq_khz = int(freq_mhz * 1000) - - # Step 1: measure noise floor (max attenuation) - print(" Measuring noise floor (31.5 dB attenuation)...") - atten.set_db(31.5) - time.sleep(0.2) - - noise_readings = [] - for _ in range(10): - r = sw.tune_monitor(1000000, freq_khz, 0, 5, 50) - noise_readings.append(r["power_db"]) - time.sleep(0.05) - - noise_floor = sum(noise_readings) / len(noise_readings) - noise_std = (sum((x - noise_floor) ** 2 for x in noise_readings) - / len(noise_readings)) ** 0.5 - threshold = noise_floor + max(3.0 * noise_std, 1.0) # 3-sigma above noise - print(f" Noise floor: {noise_floor:.2f} dB (std={noise_std:.3f})") - print(f" Detection threshold: {threshold:.2f} dB (noise + 3sigma)") - - # Step 2: inject CW and increase attenuation until signal disappears - if nanovna: - nanovna.cw(frequency_hz=int(freq_mhz * 1e6), power=3) - print(f" NanoVNA CW: {freq_mhz:.3f} MHz, power=3") - else: - manual_nanovna_set(freq_mhz, power=3) - - print(f"\n {'Atten dB':>9} {'Power dB':>9} {'Above noise':>12} {'Detected':>9}") - print(f" {'─' * 9} {'─' * 9} {'─' * 12} {'─' * 9}") - - mds_atten = None - # 1 dB steps: 0, 1, 2, ... 31 (32 steps) - for step in range(32): - atten_db = float(step) - atten.set_db(atten_db) - time.sleep(settle_ms / 1000.0) - - # Average 5 readings for stability - readings = [] - r = sw.tune_monitor(1000000, freq_khz, 0, 5, 50) - readings.append(r["power_db"]) - for _ in range(4): - time.sleep(0.02) - r = sw.tune_monitor(1000000, freq_khz, 0, 5, 50) - readings.append(r["power_db"]) - - avg_power = sum(readings) / len(readings) - above_noise = avg_power - noise_floor - detected = avg_power > threshold - - marker = "YES" if detected else "---" - print(f" {atten_db:9.1f} {avg_power:9.2f} {above_noise:+12.2f} " - f"{marker:>9}") - - if writer: - # Use averaged power instead of last single reading - avg_result = dict(r) - avg_result["power_db"] = avg_power - write_row(writer, csv_file, "mds", freq_mhz, atten_db, - avg_result, - f"avg={avg_power:.2f} noise={noise_floor:.2f} " - f"detected={'Y' if detected else 'N'}") - - if not detected and mds_atten is None: - mds_atten = atten_db - - result = { - "freq_mhz": freq_mhz, - "noise_floor_db": noise_floor, - "noise_std": noise_std, - "threshold_db": threshold, - "mds_atten_db": mds_atten, - } - - if mds_atten is not None: - print(f"\n Signal lost at {mds_atten:.1f} dB attenuation") - print(f" (NanoVNA output ~-15 dBm minus {mds_atten:.1f} dB path = " - f"~{-15 - mds_atten:.0f} dBm at receiver)") - else: - print(f"\n Signal detected at all attenuation levels (0-31.5 dB)") - print(f" Need more attenuation to find MDS") - - return result - - -# --- Test: BPSK Mode 9 CW Probe --- - -def test_bpsk_probe(sw, atten, nanovna, freq_mhz: float, - writer, csv_file, settle_ms: int) -> dict: - """Probe BPSK mode 9 response to an unmodulated CW carrier. - - BPSK mode (index 9) uses Viterbi rate 1/2 K=7 — the same inner FEC - as GOES LRIT. A CW carrier has no modulation, so the demodulator - shouldn't lock, but the AGC and carrier recovery behavior reveals - how mode 9 handles a clean carrier. - """ - print(f"\n=== BPSK Mode 9 CW Probe at {freq_mhz:.1f} MHz ===") - bpsk_index = MODULATIONS["bpsk"][0] # Mode 9 - freq_khz = int(freq_mhz * 1000) - atten_db = 10.0 - atten.set_db(atten_db) - - if nanovna: - nanovna.cw(frequency_hz=int(freq_mhz * 1e6), power=3) - else: - manual_nanovna_set(freq_mhz, power=3) - - time.sleep(settle_ms / 1000.0) - - # Test with different symbol rates typical of LRIT-like signals - test_rates = [293883, 500000, 1000000, 5000000] - - print(f"\n {'SR (sps)':>10} {'AGC1':>6} {'AGC2':>6} {'Power dB':>9} " - f"{'SNR raw':>8} {'SNR dB':>7} {'Lock':>6} {'Status':>8}") - print(f" {'─' * 10} {'─' * 6} {'─' * 6} {'─' * 9} " - f"{'─' * 8} {'─' * 7} {'─' * 6} {'─' * 8}") - - results = [] - for sr in test_rates: - # FEC 1/2 (index 0) for BPSK mode - result = sw.tune_monitor(sr, freq_khz, bpsk_index, 0, dwell_ms=100) - - locked = "Y" if result.get("locked") else "N" - print(f" {sr:10d} {result['agc1']:6d} {result['agc2']:6d} " - f"{result['power_db']:9.2f} {result['snr_raw']:8d} " - f"{result['snr_db']:7.2f} {locked:>6} " - f"0x{result.get('status', 0):02X}") - - if writer: - write_row(writer, csv_file, "bpsk_probe", freq_mhz, atten_db, - result, f"mode=bpsk sr={sr} fec=1/2") - - results.append({"symbol_rate": sr, **result}) - - # Compare with QPSK mode 0 at same settings - print(f"\n Reference: QPSK mode 0 at same frequency") - ref = sw.tune_monitor(1000000, freq_khz, 0, 5, dwell_ms=100) - ref_locked = "Y" if ref.get("locked") else "N" - print(f" {'1000000':>10} {ref['agc1']:6d} {ref['agc2']:6d} " - f"{ref['power_db']:9.2f} {ref['snr_raw']:8d} " - f"{ref['snr_db']:7.2f} {ref_locked:>6} " - f"0x{ref.get('status', 0):02X}") - - return {"bpsk_results": results, "qpsk_reference": ref} - - -# --- Main --- - -def build_parser() -> argparse.ArgumentParser: - parser = argparse.ArgumentParser( - prog="rf_testbench.py", - description="CW injection test bench: NanoVNA + HMC472A + SkyWalker-1", - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog="""\ -examples: - %(prog)s agc-linearity --freq 1200 - %(prog)s band-flatness --start 950 --stop 1500 --step 10 - %(prog)s freq-accuracy --freqs 1000,1200,1400 - %(prog)s mds --freq 1200 - %(prog)s bpsk-probe --freq 1200 - %(prog)s band-flatness --attenuator /dev/ttyACM1 - %(prog)s agc-linearity --attenuator http://attenuator.local --freq 1200 - -hardware setup: - NanoVNA CH0 → DC Blocker → HMC472A (0-31.5 dB) → SMA-to-F → SkyWalker-1 - - The HMC472A is controlled via USB serial (preferred) or HTTP REST API. - Use --attenuator auto (default) to auto-detect USB, falling back to HTTP. - The NanoVNA provides CW, controlled via mcnanovna or manually. - LNB power is disabled (direct L-band input mode). -""", - ) - - parser.add_argument("-v", "--verbose", action="store_true", - help="Show raw USB traffic") - parser.add_argument("-o", "--output", type=str, default=None, - help="CSV output file path") - parser.add_argument("--cal", type=str, default=None, - help="Path loss calibration CSV (NanoVNA S21 sweep)") - parser.add_argument("--attenuator", type=str, default="auto", - help="HMC472A connection: 'auto' (USB then HTTP), " - "/dev/ttyACMx (USB serial), or http://host (REST) " - "(default: auto)") - parser.add_argument("--nanovna", choices=["auto", "manual"], - default="auto", - help="NanoVNA control mode (default: auto via mcnanovna)") - parser.add_argument("--settle", type=int, default=200, - help="Settle time in ms after changing attenuation " - "(default: 200)") - - sub = parser.add_subparsers(dest="test", required=True) - - # AGC linearity - p_agc = sub.add_parser("agc-linearity", - help="Sweep attenuation at fixed freq, map AGC curve") - p_agc.add_argument("--freq", type=float, default=1200.0, - help="Test frequency in MHz (default: 1200)") - - # Band flatness - p_band = sub.add_parser("band-flatness", - help="Sweep CW across IF band, measure AGC response") - p_band.add_argument("--start", type=float, default=950.0, - help="Start frequency in MHz (default: 950)") - p_band.add_argument("--stop", type=float, default=1500.0, - help="Stop frequency in MHz (default: 1500)") - p_band.add_argument("--step", type=float, default=10.0, - help="Frequency step in MHz (default: 10)") - - # Frequency accuracy - p_freq = sub.add_parser("freq-accuracy", - help="Inject CW at known freqs, measure error") - p_freq.add_argument("--freqs", type=str, default="1000,1100,1200,1300,1400", - help="Comma-separated test frequencies in MHz") - - # Minimum detectable signal - p_mds = sub.add_parser("mds", - help="Find minimum detectable signal level") - p_mds.add_argument("--freq", type=float, default=1200.0, - help="Test frequency in MHz (default: 1200)") - - # BPSK mode 9 probe - p_bpsk = sub.add_parser("bpsk-probe", - help="Probe BPSK mode 9 with CW carrier") - p_bpsk.add_argument("--freq", type=float, default=1200.0, - help="Test frequency in MHz (default: 1200)") - - return parser - - -def _connect_attenuator(target: str): - """Connect to HMC472A via auto-detect, USB serial, or HTTP REST. - - Args: - target: "auto", a serial port path (/dev/ttyACM*), or an HTTP URL. - """ - # Auto-detect: try USB serial first, fall back to HTTP - if target == "auto": - port = detect_hmc472a_serial() - if port: - print(f"HMC472A: auto-detected USB serial on {port}") - target = port - else: - print("HMC472A: no USB device found, trying HTTP...") - target = "http://attenuator.local" - - # USB serial path - if target.startswith("/dev/"): - try: - atten = HMC472ASerial(target) - info = atten.identify() - print(f"HMC472A: USB serial on {target} " - f"(v{info.get('version', '?')}, " - f"protocol {info.get('protocol', '?')})") - return atten - except ImportError: - print("HMC472A: pyserial not installed (pip install pyserial)") - sys.exit(1) - except (OSError, TimeoutError) as e: - print(f"HMC472A: cannot open {target} ({e})") - sys.exit(1) - - # HTTP REST API - atten = HMC472A(target) - try: - cfg = atten.config() - print(f"HMC472A: HTTP on {target} ({cfg.get('hostname', '?')}, " - f"v{cfg.get('version', '?')})") - return atten - except (URLError, OSError) as e: - print(f"HMC472A: cannot reach {target} ({e})") - print(" Use --attenuator /dev/ttyACMx (USB) or http://host (HTTP)") - sys.exit(1) - - -def main(): - parser = build_parser() - args = parser.parse_args() - - # Mock mode for testing without hardware - mock_mode = os.environ.get("SKYWALKER_MOCK") - - # Set up HMC472A attenuator - if mock_mode: - atten = MockHMC472A() - print("HMC472A: mock mode") - else: - atten = _connect_attenuator(args.attenuator) - - # Set up NanoVNA - nanovna = None - if mock_mode: - nanovna = MockNanoVNA() - print("NanoVNA: mock mode") - elif args.nanovna == "auto": - NanoVNA = try_import_nanovna() - if NanoVNA: - try: - nanovna = NanoVNA() - print(f"NanoVNA: auto mode (mcnanovna)") - except Exception as e: - print(f"NanoVNA: mcnanovna failed ({e}), falling back to manual") - else: - print("NanoVNA: mcnanovna not installed, using manual mode") - print(" Install: uv pip install -e /path/to/mcnanovna") - else: - print("NanoVNA: manual mode (you'll be prompted to set frequencies)") - - # Load calibration - cal = {} - if args.cal: - cal = load_cal_file(args.cal) - print(f"Calibration: loaded {len(cal)} points from {args.cal}") - - # Open CSV output - csv_file = None - writer = None - if args.output: - csv_file, writer = open_csv(args.output) - - # Open SkyWalker-1 - # SAFETY: Boot demodulator WITHOUT enabling LNB power. ensure_booted() - # transiently enables LNB voltage (13-18V on the F-connector), which - # would travel backward through the attenuator toward the NanoVNA. - # The DC blocker protects against this, but code should never rely on - # external protection it cannot verify. - if mock_mode: - sw = _make_mock_skywalker(args.verbose) - print("SkyWalker-1: mock mode") - else: - sw = SkyWalker1(verbose=args.verbose) - sw.open() - # Ensure LNB power is OFF before booting demodulator - sw.start_intersil(on=False) - status = sw.get_config() - if not (status & 0x01): - sw.boot(on=True) - time.sleep(0.5) - status = sw.get_config() - if not (status & 0x01): - print("ERROR: Device failed to start") - sys.exit(1) - print("SkyWalker-1: booted (LNB power kept OFF)") - - # Confirm LNB power disabled — direct input mode - sw.start_intersil(on=False) - print("LNB power disabled (direct input mode)") - print() - - try: - if args.test == "agc-linearity": - test_agc_linearity(sw, atten, nanovna, args.freq, - writer, csv_file, cal, args.settle) - elif args.test == "band-flatness": - test_band_flatness(sw, atten, nanovna, args.start, args.stop, - args.step, writer, csv_file, cal, args.settle) - elif args.test == "freq-accuracy": - freqs = [float(f) for f in args.freqs.split(",")] - test_freq_accuracy(sw, atten, nanovna, freqs, - writer, csv_file, args.settle) - elif args.test == "mds": - test_mds(sw, atten, nanovna, args.freq, - writer, csv_file, args.settle) - elif args.test == "bpsk-probe": - test_bpsk_probe(sw, atten, nanovna, args.freq, - writer, csv_file, args.settle) - except KeyboardInterrupt: - print("\n\nInterrupted by operator.") - finally: - # Safe state: maximum attenuation before releasing hardware - try: - atten.set_db(31.5) - print("Attenuator set to 31.5 dB (safe state)") - except Exception: - pass # best-effort on cleanup path - if csv_file: - csv_file.flush() - csv_file.close() - print(f"\nData saved to {args.output}") - if not mock_mode: - sw.close() - - -if __name__ == "__main__": - main() +#!/usr/bin/env python3 +""" +RF Test Bench — CW injection tests with NanoVNA + HMC472A + SkyWalker-1. + +Injects CW signals from a NanoVNA-H through a programmable HMC472A digital +attenuator into the SkyWalker-1 receiver. Runs automated test sequences to +characterize AGC linearity, IF band flatness, frequency accuracy, minimum +detectable signal, and BPSK mode 9 behavior. + +Hardware setup: + NanoVNA CH0 → DC Blocker → HMC472A → SMA-to-F → SkyWalker-1 + +The HMC472A (0-31.5 dB, 0.5 dB steps) is controlled via USB serial (preferred) +or REST API. The NanoVNA provides CW at a fixed frequency, controlled either +via mcnanovna or manually. The SkyWalker-1 measures AGC, SNR, and lock status. + +Usage: + python rf_testbench.py agc-linearity --freq 1200 + python rf_testbench.py band-flatness --start 950 --stop 1500 --step 10 + python rf_testbench.py freq-accuracy --freqs 1000,1200,1400 + python rf_testbench.py mds --freq 1200 + python rf_testbench.py bpsk-probe --freq 1200 + python rf_testbench.py --help +""" + +import sys +import os +import argparse +import csv +import json +import time +from datetime import datetime, timezone +from urllib.request import urlopen, Request +from urllib.error import URLError + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from skywalker_lib import SkyWalker1, MODULATIONS + + +# --- HMC472A REST client --- + +class HMC472A: + """Control the HMC472A digital attenuator via its ESP32-S2 REST API.""" + + def __init__(self, base_url: str = "http://attenuator.local"): + self.base_url = base_url.rstrip("/") + + def _get(self, path: str, retries: int = 3) -> dict: + url = f"{self.base_url}{path}" + req = Request(url) + last_err: OSError = OSError("no attempts made") + for attempt in range(retries): + try: + with urlopen(req, timeout=5) as resp: + return json.loads(resp.read()) + except (URLError, OSError) as e: + last_err = e + if attempt < retries - 1: + time.sleep(0.2 * (attempt + 1)) + raise last_err + + def _post(self, path: str, data: dict, retries: int = 3) -> dict: + url = f"{self.base_url}{path}" + body = json.dumps(data).encode() + req = Request(url, data=body, method="POST", + headers={"Content-Type": "application/json"}) + last_err: OSError = OSError("no attempts made") + for attempt in range(retries): + try: + with urlopen(req, timeout=5) as resp: + return json.loads(resp.read()) + except (URLError, OSError) as e: + last_err = e + if attempt < retries - 1: + time.sleep(0.2 * (attempt + 1)) + raise last_err + + def status(self) -> dict: + return self._get("/status") + + def set_db(self, attenuation_db: float) -> dict: + clamped = max(0.0, min(31.5, attenuation_db)) + rounded = round(clamped * 2) / 2 # Snap to 0.5 dB steps + return self._post("/set", {"attenuation_db": rounded}) + + def config(self) -> dict: + return self._get("/config") + + +# --- HMC472A USB serial client --- + +class HMC472ASerial: + """Control the HMC472A digital attenuator via USB CDC serial. + + Uses the usb-serial-json-v1 protocol: one JSON object per newline- + terminated line in each direction. Requires pyserial. + """ + + def __init__(self, port: str, baudrate: int = 115200, timeout: float = 1.0): + import serial + self.ser = serial.Serial(port, baudrate, timeout=timeout) + self.ser.reset_input_buffer() + + def close(self): + if self.ser and self.ser.is_open: + self.ser.close() + + def _cmd(self, command: dict) -> dict: + line = json.dumps(command, separators=(",", ":")) + "\n" + self.ser.write(line.encode()) + self.ser.flush() + resp_line = self.ser.readline() + if not resp_line: + raise TimeoutError("no response from HMC472A") + resp = json.loads(resp_line) + if not resp.get("ok"): + raise RuntimeError(resp.get("error", "unknown error")) + return resp + + def status(self) -> dict: + return self._cmd({"cmd": "status"}) + + def set_db(self, attenuation_db: float) -> dict: + clamped = max(0.0, min(31.5, attenuation_db)) + rounded = round(clamped * 2) / 2 + return self._cmd({"cmd": "set", "db": rounded}) + + def config(self) -> dict: + return self._cmd({"cmd": "config"}) + + def identify(self) -> dict: + return self._cmd({"cmd": "identify"}) + + +def detect_hmc472a_serial() -> str | None: + """Scan /dev/ttyACM* ports for an HMC472A responding to identify. + + Returns the port path if found, None otherwise. + """ + import glob + try: + import serial + except ImportError: + return None + + ports = sorted(glob.glob("/dev/ttyACM*")) + for port in ports: + try: + ser = serial.Serial(port, 115200, timeout=0.5) + ser.reset_input_buffer() + ser.write(b'{"cmd":"identify"}\n') + ser.flush() + resp_line = ser.readline() + ser.close() + if resp_line: + resp = json.loads(resp_line) + if resp.get("device") == "hmc472a-attenuator": + return port + except (OSError, json.JSONDecodeError, ValueError): + continue + return None + + +class MockSkyWalker1: + """Lightweight mock SkyWalker-1 for rf_testbench testing.""" + + def __init__(self, verbose=False): + self.verbose = verbose + self._freq_khz = 0 + + def open(self): + pass + + def close(self): + pass + + def ensure_booted(self): + pass + + def start_intersil(self, on=True): + pass + + def tune_monitor(self, symbol_rate_sps=1000000, freq_khz=1200000, + mod_index=0, fec_index=5, dwell_ms=10): + self._freq_khz = freq_khz + # Simulate AGC response: higher freq → slightly lower power + base_agc1 = 1200 + (freq_khz - 1200000) // 100 + return { + "snr_raw": 180, "snr_db": 7.8, "snr_pct": 39.0, + "agc1": max(100, base_agc1), "agc2": 750, + "power_db": -46.1 - (freq_khz - 1200000) / 500000, + "locked": False, "lock": 0x00, "status": 0x01, + "dwell_ms": dwell_ms, + } + + def signal_monitor(self): + return { + "snr_raw": 200, "snr_db": 8.5, "snr_pct": 42.5, + "agc1": 1200, "agc2": 800, "power_db": -45.3, + "locked": False, "lock": 0x00, "status": 0x01, + } + + def sweep_spectrum(self, start_mhz, stop_mhz, step_mhz=5.0, + dwell_ms=15, sr_ksps=1000, mod_index=0, fec_index=5): + n = int((stop_mhz - start_mhz) / step_mhz) + 1 + freqs = [start_mhz + i * step_mhz for i in range(n)] + powers = [-50.0 + 3.0 * (1.0 - abs(f - 1200) / 300) for f in freqs] + raw = [{"agc1": 1200, "agc2": 750, "power_db": p, + "snr_raw": 0, "snr_db": 0, "locked": False, + "lock": 0, "status": 0} for p in powers] + return freqs, powers, raw + + +def _make_mock_skywalker(verbose=False): + sw = MockSkyWalker1(verbose=verbose) + sw.open() + sw.ensure_booted() + return sw + + +class FixedAttenuator: + """Stand-in for a fixed (non-programmable) inline attenuator. + + Reports the declared fixed attenuation for every set_db() call. + Used when testing with a fixed pad instead of the HMC472A. + """ + + def __init__(self, fixed_db: float = 20.0): + self._fixed_db = fixed_db + + def status(self) -> dict: + return {"attenuation_db": self._fixed_db, "step": 0, + "version": "fixed-pad", "note": "non-programmable"} + + def set_db(self, attenuation_db: float) -> dict: + # Can't change a fixed pad — just return what it is + return self.status() + + def config(self) -> dict: + return {"db_min": self._fixed_db, "db_max": self._fixed_db, + "db_step": 0, "version": "fixed-pad", + "hostname": f"fixed-{self._fixed_db:.1f}dB"} + + +class MockHMC472A: + """Mock attenuator for testing without hardware.""" + + def __init__(self, base_url: str = "http://mock.local"): + self.base_url = base_url + self._db = 0.0 + + def status(self) -> dict: + step = int(self._db * 2) + return {"attenuation_db": self._db, "step": step, "version": "mock"} + + def set_db(self, attenuation_db: float) -> dict: + self._db = max(0.0, min(31.5, round(attenuation_db * 2) / 2)) + return self.status() + + def config(self) -> dict: + return {"db_min": 0.0, "db_max": 31.5, "db_step": 0.5, + "version": "mock", "hostname": "mock-attenuator"} + + +# --- NanoVNA control --- + +def try_import_nanovna(): + """Try to import mcnanovna for automated NanoVNA control.""" + try: + from mcnanovna.nanovna import NanoVNA + return NanoVNA + except ImportError: + return None + + +class MockNanoVNA: + """Mock NanoVNA for testing without hardware.""" + + def cw(self, frequency_hz: int = 0, power: int = 3): + pass + + +def manual_nanovna_set(freq_mhz: float, power: int = 3) -> None: + """Prompt the user to manually set NanoVNA CW frequency.""" + print(f"\n >>> Set NanoVNA to CW at {freq_mhz:.3f} MHz, power={power}") + input(" Press Enter when ready...") + + +# --- CSV output --- + +CSV_COLUMNS = [ + "timestamp", "test_name", "freq_mhz", "atten_db", + "agc1", "agc2", "power_db", "snr_raw", "snr_db", + "locked", "lock_raw", "status", "notes", +] + + +def open_csv(path: str): + f = open(path, "w", newline="") + writer = csv.DictWriter(f, fieldnames=CSV_COLUMNS) + writer.writeheader() + return f, writer + + +def write_row(writer, csv_file, test_name: str, freq_mhz: float, + atten_db: float, result: dict, notes: str = ""): + writer.writerow({ + "timestamp": datetime.now(timezone.utc).isoformat(), + "test_name": test_name, + "freq_mhz": f"{freq_mhz:.3f}", + "atten_db": f"{atten_db:.1f}", + "agc1": result.get("agc1", 0), + "agc2": result.get("agc2", 0), + "power_db": f"{result.get('power_db', 0):.2f}", + "snr_raw": result.get("snr_raw", 0), + "snr_db": f"{result.get('snr_db', 0):.2f}", + "locked": result.get("locked", False), + "lock_raw": f"0x{result.get('lock', 0):02X}", + "status": f"0x{result.get('status', 0):02X}", + "notes": notes, + }) + if csv_file: + csv_file.flush() + + +# --- Calibration --- + +def load_cal_file(path: str) -> dict: + """Load a NanoVNA S21 path-loss calibration CSV. + + Expects columns: frequency_hz (or freq_mhz), s21_db (or loss_db). + Returns dict mapping freq_mhz -> loss_db (positive = loss). + """ + cal = {} + with open(path) as f: + reader = csv.DictReader(f) + for row in reader: + if "freq_mhz" in row: + freq = float(row["freq_mhz"]) + elif "frequency_hz" in row: + freq = float(row["frequency_hz"]) / 1e6 + else: + continue + + if "loss_db" in row: + loss = float(row["loss_db"]) + elif "s21_db" in row: + loss = -float(row["s21_db"]) # S21 is negative, loss is positive + else: + continue + cal[freq] = loss + return cal + + +def interpolate_loss(cal: dict, freq_mhz: float) -> float: + """Interpolate path loss at a frequency from cal data.""" + if not cal: + return 0.0 + freqs = sorted(cal.keys()) + if freq_mhz <= freqs[0]: + return cal[freqs[0]] + if freq_mhz >= freqs[-1]: + return cal[freqs[-1]] + for i in range(len(freqs) - 1): + if freqs[i] <= freq_mhz <= freqs[i + 1]: + f0, f1 = freqs[i], freqs[i + 1] + t = (freq_mhz - f0) / (f1 - f0) + return cal[f0] + t * (cal[f1] - cal[f0]) + return 0.0 + + +# --- Test: AGC Power Linearity --- + +def test_agc_linearity(sw, atten, nanovna, freq_mhz: float, + writer, csv_file, cal: dict, settle_ms: int) -> list: + """Sweep attenuator from 0 to 31.5 dB and record AGC at each step. + + Maps the AGC transfer function: how AGC register values respond to + known changes in input power. + """ + print(f"\n=== AGC Linearity Test at {freq_mhz:.1f} MHz ===") + results = [] + path_loss = interpolate_loss(cal, freq_mhz) + if path_loss > 0: + print(f" Calibrated path loss: {path_loss:.1f} dB") + + # Set NanoVNA to CW at the test frequency + if nanovna: + nanovna.cw(frequency_hz=int(freq_mhz * 1e6), power=3) + print(f" NanoVNA CW: {freq_mhz:.3f} MHz, power=3") + else: + manual_nanovna_set(freq_mhz, power=3) + + # Tune SkyWalker-1 to the frequency + freq_khz = int(freq_mhz * 1000) + + print(f"\n {'Atten dB':>9} {'AGC1':>6} {'AGC2':>6} {'Power dB':>9} " + f"{'SNR raw':>8} {'Lock':>5}") + print(f" {'─' * 9} {'─' * 6} {'─' * 6} {'─' * 9} {'─' * 8} {'─' * 5}") + + # Sweep in 0.5 dB steps from 0 to 31.5 dB (64 steps) + # Use integer step counter to avoid IEEE 754 float accumulation drift + for step in range(64): # 0, 1, 2, ... 63 → 0.0, 0.5, 1.0, ... 31.5 + atten_db = step * 0.5 + atten.set_db(atten_db) + time.sleep(settle_ms / 1000.0) + + result = sw.tune_monitor( + symbol_rate_sps=1000000, freq_khz=freq_khz, + mod_index=0, fec_index=5, dwell_ms=50 + ) + + locked = "Y" if result.get("locked") else "N" + print(f" {atten_db:9.1f} {result['agc1']:6d} {result['agc2']:6d} " + f"{result['power_db']:9.2f} {result['snr_raw']:8d} {locked:>5}") + + effective_atten = atten_db + path_loss + note = f"effective_atten={effective_atten:.1f}dB" + if writer: + write_row(writer, csv_file, "agc_linearity", freq_mhz, atten_db, + result, note) + + results.append({"atten_db": atten_db, **result}) + + # Summary + if results: + agc1_min = min(r["agc1"] for r in results) + agc1_max = max(r["agc1"] for r in results) + print(f"\n AGC1 range: {agc1_min} - {agc1_max} " + f"(delta={agc1_max - agc1_min}) over 31.5 dB sweep") + + return results + + +# --- Test: IF Band Flatness --- + +def test_band_flatness(sw, atten, nanovna, start_mhz: float, + stop_mhz: float, step_mhz: float, + writer, csv_file, cal: dict, settle_ms: int) -> list: + """Sweep CW across the IF band and record AGC at each frequency. + + Reveals tuner gain slope, passband ripple, and the IF filter response. + """ + atten_db = 10.0 # Fixed attenuation — mid-range for good dynamic range + print(f"\n=== IF Band Flatness: {start_mhz:.0f}-{stop_mhz:.0f} MHz, " + f"step={step_mhz:.1f} MHz ===") + print(f" HMC472A fixed at {atten_db:.1f} dB") + + atten.set_db(atten_db) + results = [] + + # Use integer step counter to avoid float accumulation drift + n_steps = int(round((stop_mhz - start_mhz) / step_mhz)) + 1 + + print(f"\n {'Step':>5} {'Freq MHz':>9} {'AGC1':>6} {'AGC2':>6} " + f"{'Power dB':>9} {'PathLoss':>9} {'Corr dB':>8}") + print(f" {'─' * 5} {'─' * 9} {'─' * 6} {'─' * 6} " + f"{'─' * 9} {'─' * 9} {'─' * 8}") + + for step_num in range(n_steps): + freq_mhz = start_mhz + step_num * step_mhz + + # Set NanoVNA CW + if nanovna: + nanovna.cw(frequency_hz=int(freq_mhz * 1e6), power=3) + else: + manual_nanovna_set(freq_mhz, power=3) + + time.sleep(settle_ms / 1000.0) + + # Tune SkyWalker-1 + freq_khz = int(freq_mhz * 1000) + result = sw.tune_monitor( + symbol_rate_sps=1000000, freq_khz=freq_khz, + mod_index=0, fec_index=5, dwell_ms=50 + ) + + path_loss = interpolate_loss(cal, freq_mhz) + corrected = result["power_db"] + path_loss + + print(f" {step_num + 1:5d} {freq_mhz:9.1f} {result['agc1']:6d} " + f"{result['agc2']:6d} {result['power_db']:9.2f} " + f"{path_loss:9.1f} {corrected:8.2f}") + + note = f"path_loss={path_loss:.1f}dB corrected={corrected:.2f}dB" + if writer: + write_row(writer, csv_file, "band_flatness", freq_mhz, atten_db, + result, note) + + results.append({"freq_mhz": freq_mhz, "corrected_db": corrected, **result}) + + # Summary + if results: + powers = [r["corrected_db"] for r in results] + ripple = max(powers) - min(powers) + print(f"\n Band flatness: {ripple:.2f} dB ripple " + f"(min={min(powers):.2f}, max={max(powers):.2f})") + + return results + + +# --- Test: Frequency Accuracy --- + +def test_freq_accuracy(sw, atten, nanovna, test_freqs: list, + writer, csv_file, settle_ms: int) -> list: + """Inject CW at known frequencies, sweep SkyWalker-1 around each one. + + Compares detected peak vs. injected frequency to characterize the + BCM3440 tuner's frequency accuracy. + """ + print(f"\n=== Frequency Accuracy Test ===") + atten_db = 10.0 + atten.set_db(atten_db) + + results = [] + sweep_span_mhz = 10.0 # Sweep +/- 5 MHz around each test freq + sweep_step_mhz = 1.0 + + for inject_freq in test_freqs: + print(f"\n Injecting CW at {inject_freq:.3f} MHz...") + if nanovna: + nanovna.cw(frequency_hz=int(inject_freq * 1e6), power=3) + else: + manual_nanovna_set(inject_freq, power=3) + + time.sleep(settle_ms / 1000.0) + + # Sweep around the expected frequency + sweep_start = inject_freq - sweep_span_mhz / 2 + sweep_stop = inject_freq + sweep_span_mhz / 2 + + freqs, powers, raw = sw.sweep_spectrum( + sweep_start, sweep_stop, + step_mhz=sweep_step_mhz, dwell_ms=50, + sr_ksps=1000, mod_index=0, fec_index=5, + ) + + # Find peak + if powers: + peak_idx = max(range(len(powers)), key=lambda i: powers[i]) + peak_freq = freqs[peak_idx] + peak_power = powers[peak_idx] + error_mhz = peak_freq - inject_freq + error_khz = error_mhz * 1000 + + print(f" Injected: {inject_freq:.3f} MHz " + f"Detected peak: {peak_freq:.3f} MHz " + f"Error: {error_khz:+.0f} kHz") + + result_entry = { + "inject_freq_mhz": inject_freq, + "peak_freq_mhz": peak_freq, + "error_khz": error_khz, + "peak_power_db": peak_power, + } + results.append(result_entry) + + if writer: + peak_result = raw[peak_idx] if isinstance(raw[peak_idx], dict) else { + "agc1": 0, "agc2": 0, "power_db": peak_power, + "snr_raw": 0, "snr_db": 0, + "locked": False, "lock": 0, "status": 0, + } + write_row(writer, csv_file, "freq_accuracy", inject_freq, + atten_db, peak_result, + f"peak={peak_freq:.3f}MHz error={error_khz:+.0f}kHz") + + # Summary + if results: + errors = [r["error_khz"] for r in results] + mean_err = sum(errors) / len(errors) + max_err = max(abs(e) for e in errors) + print(f"\n Mean frequency error: {mean_err:+.0f} kHz") + print(f" Max absolute error: {max_err:.0f} kHz") + + return results + + +# --- Test: Minimum Detectable Signal --- + +def test_mds(sw, atten, nanovna, freq_mhz: float, + writer, csv_file, settle_ms: int) -> dict: + """Find the minimum detectable signal level. + + Measures noise floor with NanoVNA off (or max attenuation), then + increases attenuation from 0 until the CW signal is indistinguishable + from noise. + """ + print(f"\n=== Minimum Detectable Signal at {freq_mhz:.1f} MHz ===") + freq_khz = int(freq_mhz * 1000) + + # Step 1: measure noise floor (max attenuation) + print(" Measuring noise floor (31.5 dB attenuation)...") + atten.set_db(31.5) + time.sleep(0.2) + + noise_readings = [] + for _ in range(10): + r = sw.tune_monitor(1000000, freq_khz, 0, 5, 50) + noise_readings.append(r["power_db"]) + time.sleep(0.05) + + noise_floor = sum(noise_readings) / len(noise_readings) + noise_std = (sum((x - noise_floor) ** 2 for x in noise_readings) + / len(noise_readings)) ** 0.5 + threshold = noise_floor + max(3.0 * noise_std, 1.0) # 3-sigma above noise + print(f" Noise floor: {noise_floor:.2f} dB (std={noise_std:.3f})") + print(f" Detection threshold: {threshold:.2f} dB (noise + 3sigma)") + + # Step 2: inject CW and increase attenuation until signal disappears + if nanovna: + nanovna.cw(frequency_hz=int(freq_mhz * 1e6), power=3) + print(f" NanoVNA CW: {freq_mhz:.3f} MHz, power=3") + else: + manual_nanovna_set(freq_mhz, power=3) + + print(f"\n {'Atten dB':>9} {'Power dB':>9} {'Above noise':>12} {'Detected':>9}") + print(f" {'─' * 9} {'─' * 9} {'─' * 12} {'─' * 9}") + + mds_atten = None + # 1 dB steps: 0, 1, 2, ... 31 (32 steps) + for step in range(32): + atten_db = float(step) + atten.set_db(atten_db) + time.sleep(settle_ms / 1000.0) + + # Average 5 readings for stability + readings = [] + r = sw.tune_monitor(1000000, freq_khz, 0, 5, 50) + readings.append(r["power_db"]) + for _ in range(4): + time.sleep(0.02) + r = sw.tune_monitor(1000000, freq_khz, 0, 5, 50) + readings.append(r["power_db"]) + + avg_power = sum(readings) / len(readings) + above_noise = avg_power - noise_floor + detected = avg_power > threshold + + marker = "YES" if detected else "---" + print(f" {atten_db:9.1f} {avg_power:9.2f} {above_noise:+12.2f} " + f"{marker:>9}") + + if writer: + # Use averaged power instead of last single reading + avg_result = dict(r) + avg_result["power_db"] = avg_power + write_row(writer, csv_file, "mds", freq_mhz, atten_db, + avg_result, + f"avg={avg_power:.2f} noise={noise_floor:.2f} " + f"detected={'Y' if detected else 'N'}") + + if not detected and mds_atten is None: + mds_atten = atten_db + + result = { + "freq_mhz": freq_mhz, + "noise_floor_db": noise_floor, + "noise_std": noise_std, + "threshold_db": threshold, + "mds_atten_db": mds_atten, + } + + if mds_atten is not None: + print(f"\n Signal lost at {mds_atten:.1f} dB attenuation") + print(f" (NanoVNA output ~-15 dBm minus {mds_atten:.1f} dB path = " + f"~{-15 - mds_atten:.0f} dBm at receiver)") + else: + print(f"\n Signal detected at all attenuation levels (0-31.5 dB)") + print(f" Need more attenuation to find MDS") + + return result + + +# --- Test: BPSK Mode 9 CW Probe --- + +def test_bpsk_probe(sw, atten, nanovna, freq_mhz: float, + writer, csv_file, settle_ms: int) -> dict: + """Probe BPSK mode 9 response to an unmodulated CW carrier. + + BPSK mode (index 9) uses Viterbi rate 1/2 K=7 — the same inner FEC + as GOES LRIT. A CW carrier has no modulation, so the demodulator + shouldn't lock, but the AGC and carrier recovery behavior reveals + how mode 9 handles a clean carrier. + """ + print(f"\n=== BPSK Mode 9 CW Probe at {freq_mhz:.1f} MHz ===") + bpsk_index = MODULATIONS["bpsk"][0] # Mode 9 + freq_khz = int(freq_mhz * 1000) + atten_db = 10.0 + atten.set_db(atten_db) + + if nanovna: + nanovna.cw(frequency_hz=int(freq_mhz * 1e6), power=3) + else: + manual_nanovna_set(freq_mhz, power=3) + + time.sleep(settle_ms / 1000.0) + + # Test with different symbol rates typical of LRIT-like signals + test_rates = [293883, 500000, 1000000, 5000000] + + print(f"\n {'SR (sps)':>10} {'AGC1':>6} {'AGC2':>6} {'Power dB':>9} " + f"{'SNR raw':>8} {'SNR dB':>7} {'Lock':>6} {'Status':>8}") + print(f" {'─' * 10} {'─' * 6} {'─' * 6} {'─' * 9} " + f"{'─' * 8} {'─' * 7} {'─' * 6} {'─' * 8}") + + results = [] + for sr in test_rates: + # FEC 1/2 (index 0) for BPSK mode + result = sw.tune_monitor(sr, freq_khz, bpsk_index, 0, dwell_ms=100) + + locked = "Y" if result.get("locked") else "N" + print(f" {sr:10d} {result['agc1']:6d} {result['agc2']:6d} " + f"{result['power_db']:9.2f} {result['snr_raw']:8d} " + f"{result['snr_db']:7.2f} {locked:>6} " + f"0x{result.get('status', 0):02X}") + + if writer: + write_row(writer, csv_file, "bpsk_probe", freq_mhz, atten_db, + result, f"mode=bpsk sr={sr} fec=1/2") + + results.append({"symbol_rate": sr, **result}) + + # Compare with QPSK mode 0 at same settings + print(f"\n Reference: QPSK mode 0 at same frequency") + ref = sw.tune_monitor(1000000, freq_khz, 0, 5, dwell_ms=100) + ref_locked = "Y" if ref.get("locked") else "N" + print(f" {'1000000':>10} {ref['agc1']:6d} {ref['agc2']:6d} " + f"{ref['power_db']:9.2f} {ref['snr_raw']:8d} " + f"{ref['snr_db']:7.2f} {ref_locked:>6} " + f"0x{ref.get('status', 0):02X}") + + return {"bpsk_results": results, "qpsk_reference": ref} + + +# --- Main --- + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + prog="rf_testbench.py", + description="CW injection test bench: NanoVNA + HMC472A + SkyWalker-1", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog="""\ +examples: + %(prog)s agc-linearity --freq 1200 + %(prog)s band-flatness --start 950 --stop 1500 --step 10 + %(prog)s freq-accuracy --freqs 1000,1200,1400 + %(prog)s mds --freq 1200 + %(prog)s bpsk-probe --freq 1200 + %(prog)s band-flatness --attenuator /dev/ttyACM1 + %(prog)s agc-linearity --attenuator http://attenuator.local --freq 1200 + +hardware setup: + NanoVNA CH0 → DC Blocker → HMC472A (0-31.5 dB) → SMA-to-F → SkyWalker-1 + + The HMC472A is controlled via USB serial (preferred) or HTTP REST API. + Use --attenuator auto (default) to auto-detect USB, falling back to HTTP. + The NanoVNA provides CW, controlled via mcnanovna or manually. + LNB power is disabled (direct L-band input mode). +""", + ) + + parser.add_argument("-v", "--verbose", action="store_true", + help="Show raw USB traffic") + parser.add_argument("-o", "--output", type=str, default=None, + help="CSV output file path") + parser.add_argument("--cal", type=str, default=None, + help="Path loss calibration CSV (NanoVNA S21 sweep)") + parser.add_argument("--attenuator", type=str, default="auto", + help="HMC472A connection: 'auto' (USB then HTTP), " + "/dev/ttyACMx (USB serial), http://host (REST), " + "or 'fixed:20' for a non-programmable pad " + "(default: auto)") + parser.add_argument("--nanovna", choices=["auto", "manual"], + default="auto", + help="NanoVNA control mode (default: auto via mcnanovna)") + parser.add_argument("--settle", type=int, default=200, + help="Settle time in ms after changing attenuation " + "(default: 200)") + + sub = parser.add_subparsers(dest="test", required=True) + + # AGC linearity + p_agc = sub.add_parser("agc-linearity", + help="Sweep attenuation at fixed freq, map AGC curve") + p_agc.add_argument("--freq", type=float, default=1200.0, + help="Test frequency in MHz (default: 1200)") + + # Band flatness + p_band = sub.add_parser("band-flatness", + help="Sweep CW across IF band, measure AGC response") + p_band.add_argument("--start", type=float, default=950.0, + help="Start frequency in MHz (default: 950)") + p_band.add_argument("--stop", type=float, default=1500.0, + help="Stop frequency in MHz (default: 1500)") + p_band.add_argument("--step", type=float, default=10.0, + help="Frequency step in MHz (default: 10)") + + # Frequency accuracy + p_freq = sub.add_parser("freq-accuracy", + help="Inject CW at known freqs, measure error") + p_freq.add_argument("--freqs", type=str, default="1000,1100,1200,1300,1400", + help="Comma-separated test frequencies in MHz") + + # Minimum detectable signal + p_mds = sub.add_parser("mds", + help="Find minimum detectable signal level") + p_mds.add_argument("--freq", type=float, default=1200.0, + help="Test frequency in MHz (default: 1200)") + + # BPSK mode 9 probe + p_bpsk = sub.add_parser("bpsk-probe", + help="Probe BPSK mode 9 with CW carrier") + p_bpsk.add_argument("--freq", type=float, default=1200.0, + help="Test frequency in MHz (default: 1200)") + + return parser + + +def _connect_attenuator(target: str): + """Connect to HMC472A via auto-detect, USB serial, HTTP REST, or fixed pad. + + Args: + target: "auto", "fixed:XX" (dB), /dev/ttyACM* (USB), or http://... (REST) + """ + # Fixed attenuator mode (non-programmable inline pad) + if target.startswith("fixed:"): + try: + fixed_db = float(target.split(":", 1)[1]) + except ValueError: + print(f"HMC472A: invalid fixed value '{target}' (use fixed:20)") + sys.exit(1) + atten = FixedAttenuator(fixed_db) + print(f"HMC472A: fixed {fixed_db:.1f} dB pad (non-programmable)") + return atten + + # Auto-detect: try USB serial first, fall back to HTTP + if target == "auto": + port = detect_hmc472a_serial() + if port: + print(f"HMC472A: auto-detected USB serial on {port}") + target = port + else: + print("HMC472A: no USB device found, trying HTTP...") + target = "http://attenuator.local" + + # USB serial path + if target.startswith("/dev/"): + try: + atten = HMC472ASerial(target) + info = atten.identify() + print(f"HMC472A: USB serial on {target} " + f"(v{info.get('version', '?')}, " + f"protocol {info.get('protocol', '?')})") + return atten + except ImportError: + print("HMC472A: pyserial not installed (pip install pyserial)") + sys.exit(1) + except (OSError, TimeoutError) as e: + print(f"HMC472A: cannot open {target} ({e})") + sys.exit(1) + + # HTTP REST API + atten = HMC472A(target) + try: + cfg = atten.config() + print(f"HMC472A: HTTP on {target} ({cfg.get('hostname', '?')}, " + f"v{cfg.get('version', '?')})") + return atten + except (URLError, OSError) as e: + print(f"HMC472A: cannot reach {target} ({e})") + print(" Use --attenuator /dev/ttyACMx (USB) or http://host (HTTP)") + sys.exit(1) + + +def main(): + parser = build_parser() + args = parser.parse_args() + + # Mock mode for testing without hardware + mock_mode = os.environ.get("SKYWALKER_MOCK") + + # Set up HMC472A attenuator + if mock_mode: + atten = MockHMC472A() + print("HMC472A: mock mode") + else: + atten = _connect_attenuator(args.attenuator) + + # Set up NanoVNA + nanovna = None + if mock_mode: + nanovna = MockNanoVNA() + print("NanoVNA: mock mode") + elif args.nanovna == "auto": + NanoVNA = try_import_nanovna() + if NanoVNA: + try: + nanovna = NanoVNA() + print(f"NanoVNA: auto mode (mcnanovna)") + except Exception as e: + print(f"NanoVNA: mcnanovna failed ({e}), falling back to manual") + else: + print("NanoVNA: mcnanovna not installed, using manual mode") + print(" Install: uv pip install -e /path/to/mcnanovna") + else: + print("NanoVNA: manual mode (you'll be prompted to set frequencies)") + + # Load calibration + cal = {} + if args.cal: + cal = load_cal_file(args.cal) + print(f"Calibration: loaded {len(cal)} points from {args.cal}") + + # Open CSV output + csv_file = None + writer = None + if args.output: + csv_file, writer = open_csv(args.output) + + # Open SkyWalker-1 + # SAFETY: Boot demodulator WITHOUT enabling LNB power. ensure_booted() + # transiently enables LNB voltage (13-18V on the F-connector), which + # would travel backward through the attenuator toward the NanoVNA. + # The DC blocker protects against this, but code should never rely on + # external protection it cannot verify. + if mock_mode: + sw = _make_mock_skywalker(args.verbose) + print("SkyWalker-1: mock mode") + else: + sw = SkyWalker1(verbose=args.verbose) + sw.open() + # Ensure LNB power is OFF before booting demodulator + sw.start_intersil(on=False) + status = sw.get_config() + if not (status & 0x01): + sw.boot(on=True) + time.sleep(0.5) + status = sw.get_config() + if not (status & 0x01): + print("ERROR: Device failed to start") + sys.exit(1) + print("SkyWalker-1: booted (LNB power kept OFF)") + + # Confirm LNB power disabled — direct input mode + sw.start_intersil(on=False) + print("LNB power disabled (direct input mode)") + print() + + try: + if args.test == "agc-linearity": + test_agc_linearity(sw, atten, nanovna, args.freq, + writer, csv_file, cal, args.settle) + elif args.test == "band-flatness": + test_band_flatness(sw, atten, nanovna, args.start, args.stop, + args.step, writer, csv_file, cal, args.settle) + elif args.test == "freq-accuracy": + freqs = [float(f) for f in args.freqs.split(",")] + test_freq_accuracy(sw, atten, nanovna, freqs, + writer, csv_file, args.settle) + elif args.test == "mds": + test_mds(sw, atten, nanovna, args.freq, + writer, csv_file, args.settle) + elif args.test == "bpsk-probe": + test_bpsk_probe(sw, atten, nanovna, args.freq, + writer, csv_file, args.settle) + except KeyboardInterrupt: + print("\n\nInterrupted by operator.") + finally: + # Safe state: maximum attenuation before releasing hardware + try: + atten.set_db(31.5) + print("Attenuator set to 31.5 dB (safe state)") + except Exception: + pass # best-effort on cleanup path + if csv_file: + csv_file.flush() + csv_file.close() + print(f"\nData saved to {args.output}") + if not mock_mode: + sw.close() + + +if __name__ == "__main__": + main() diff --git a/udev/99-rf-testbench.rules b/udev/99-rf-testbench.rules new file mode 100644 index 0000000..250008e --- /dev/null +++ b/udev/99-rf-testbench.rules @@ -0,0 +1,24 @@ +# RF Test Bench udev rules +# Install: sudo cp udev/99-rf-testbench.rules /etc/udev/rules.d/ && sudo udevadm control --reload-rules +# +# Provides non-root access and stable /dev symlinks for: +# /dev/skywalker1 - Genpix SkyWalker-1 DVB-S receiver (USB bulk device, not serial) +# /dev/nanovna - NanoVNA-H vector network analyzer (ttyACM) +# /dev/attenuator - HMC472A digital attenuator on ESP32-S3 (ttyACM) + +# --- Genpix SkyWalker-1 (09c0:0203) --- +# Custom firmware: Product="SkyWalker-1 Custom", Serial="0001" +# Stock firmware: Product="Genpix SkyWalker-1", Serial="00857" +# Kernel dvb_usb_gp8psk driver blacklisted in /etc/modprobe.d/blacklist-gp8psk.conf +SUBSYSTEM=="usb", ATTR{idVendor}=="09c0", ATTR{idProduct}=="0203", MODE="0666", SYMLINK+="skywalker1" + +# Cypress FX2 bare/unprogrammed (04b4:8613) - for recovery/development +SUBSYSTEM=="usb", ATTR{idVendor}=="04b4", ATTR{idProduct}=="8613", MODE="0666" + +# --- NanoVNA-H (0483:5740) --- +# Match on model to avoid hitting other STM32 CDC devices +SUBSYSTEM=="tty", ATTRS{idVendor}=="0483", ATTRS{idProduct}=="5740", ATTRS{product}=="NanoVNA-H", MODE="0666", SYMLINK+="nanovna" + +# --- HMC472A attenuator on ESP32-S3 native USB CDC --- +# Espressif VID=303a, PID=1001 (USB JTAG/serial), match on product string +SUBSYSTEM=="tty", ATTRS{idVendor}=="303a", ATTRS{product}=="hmc472a-attenuator", MODE="0666", SYMLINK+="attenuator"