#!/usr/bin/env python3
"""
RF Test Bench — CW injection tests with NanoVNA + HMC472A + SkyWalker-1.

Injects CW signals from a NanoVNA-H through a programmable HMC472A digital
attenuator into the SkyWalker-1 receiver. Runs automated test sequences to
characterize AGC linearity, IF band flatness, frequency accuracy, minimum
detectable signal, and BPSK mode 9 behavior.

Hardware setup:
  NanoVNA CH0 → DC Blocker → HMC472A → SMA-to-F → SkyWalker-1

The HMC472A (0-31.5 dB, 0.5 dB steps) is controlled via its ESP32-S2 REST API.
The NanoVNA provides CW at a fixed frequency, controlled either via mcnanovna
or manually. The SkyWalker-1 measures AGC, SNR, and lock status.

Usage:
  python rf_testbench.py agc-linearity --freq 1200
  python rf_testbench.py band-flatness --start 950 --stop 1500 --step 10
  python rf_testbench.py freq-accuracy --freqs 1000,1200,1400
  python rf_testbench.py mds --freq 1200
  python rf_testbench.py bpsk-probe --freq 1200
  python rf_testbench.py --help
"""

import sys
import os
import argparse
import csv
import json
import time
from datetime import datetime, timezone
from urllib.request import urlopen, Request
from urllib.error import URLError

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from skywalker_lib import SkyWalker1, MODULATIONS


# --- Unit helpers ---

def _mhz_to_khz(freq_mhz: float) -> int:
    """Convert MHz to integer kHz, rounding to nearest.

    Rounding (instead of int() truncation) keeps a float product such as
    1199.9999999 from landing one kHz low.
    """
    return int(round(freq_mhz * 1000))


def _mhz_to_hz(freq_mhz: float) -> int:
    """Convert MHz to integer Hz, rounding to nearest (see _mhz_to_khz)."""
    return int(round(freq_mhz * 1e6))


def _parse_freq_list(text: str) -> list:
    """Parse a comma-separated list of frequencies in MHz.

    Empty entries (e.g. from a trailing comma) are ignored.

    Raises:
        ValueError: if an entry is not a number or no entries remain.
    """
    freqs = [float(tok) for tok in text.split(",") if tok.strip()]
    if not freqs:
        raise ValueError("no frequencies given")
    return freqs


# --- HMC472A REST client ---

class HMC472A:
    """Control the HMC472A digital attenuator via its ESP32-S2 REST API."""

    def __init__(self, base_url: str = "http://attenuator.local"):
        self.base_url = base_url.rstrip("/")

    def _request(self, req: Request, retries: int) -> dict:
        """Send *req* and decode the JSON reply, retrying on network errors.

        Sleeps 0.2 s, 0.4 s, ... between attempts. Re-raises the last
        URLError/OSError once all attempts are exhausted.
        """
        last_err: OSError = OSError("no attempts made")
        for attempt in range(retries):
            try:
                with urlopen(req, timeout=5) as resp:
                    return json.loads(resp.read())
            except (URLError, OSError) as e:
                last_err = e
                if attempt < retries - 1:
                    time.sleep(0.2 * (attempt + 1))
        raise last_err

    def _get(self, path: str, retries: int = 3) -> dict:
        """GET base_url + path and return the decoded JSON body."""
        url = f"{self.base_url}{path}"
        return self._request(Request(url), retries)

    def _post(self, path: str, data: dict, retries: int = 3) -> dict:
        """POST *data* as JSON to base_url + path; return the decoded reply."""
        url = f"{self.base_url}{path}"
        body = json.dumps(data).encode()
        req = Request(url, data=body, method="POST",
                      headers={"Content-Type": "application/json"})
        return self._request(req, retries)

    def status(self) -> dict:
        """Return the JSON document served at /status."""
        return self._get("/status")

    def set_db(self, attenuation_db: float) -> dict:
        """Set attenuation, clamped to 0-31.5 dB and snapped to 0.5 dB."""
        clamped = max(0.0, min(31.5, attenuation_db))
        rounded = round(clamped * 2) / 2  # Snap to 0.5 dB steps
        return self._post("/set", {"attenuation_db": rounded})

    def config(self) -> dict:
        """Return the JSON document served at /config."""
        return self._get("/config")


class MockSkyWalker1:
    """Lightweight mock SkyWalker-1 for rf_testbench testing."""

    def __init__(self, verbose=False):
        self.verbose = verbose
        self._freq_khz = 0

    def open(self):
        pass

    def close(self):
        pass

    def ensure_booted(self):
        pass

    def start_intersil(self, on=True):
        pass

    def tune_monitor(self, symbol_rate_sps=1000000, freq_khz=1200000,
                     mod_index=0, fec_index=5, dwell_ms=10):
        """Return a synthetic reading whose AGC1/power vary with freq_khz."""
        self._freq_khz = freq_khz
        # Simulate AGC response: higher freq → slightly lower power
        base_agc1 = 1200 + (freq_khz - 1200000) // 100
        return {
            "snr_raw": 180,
            "snr_db": 7.8,
            "snr_pct": 39.0,
            "agc1": max(100, base_agc1),
            "agc2": 750,
            "power_db": -46.1 - (freq_khz - 1200000) / 500000,
            "locked": False,
            "lock": 0x00,
            "status": 0x01,
            "dwell_ms": dwell_ms,
        }

    def signal_monitor(self):
        """Return a fixed synthetic reading."""
        return {
            "snr_raw": 200,
            "snr_db": 8.5,
            "snr_pct": 42.5,
            "agc1": 1200,
            "agc2": 800,
            "power_db": -45.3,
            "locked": False,
            "lock": 0x00,
            "status": 0x01,
        }

    def sweep_spectrum(self, start_mhz, stop_mhz, step_mhz=5.0, dwell_ms=15,
                       sr_ksps=1000, mod_index=0, fec_index=5):
        """Return (freqs, powers, raw) with a synthetic peak at 1200 MHz."""
        n = int((stop_mhz - start_mhz) / step_mhz) + 1
        freqs = [start_mhz + i * step_mhz for i in range(n)]
        powers = [-50.0 + 3.0 * (1.0 - abs(f - 1200) / 300) for f in freqs]
        raw = [{"agc1": 1200, "agc2": 750, "power_db": p,
                "snr_raw": 0, "snr_db": 0, "locked": False,
                "lock": 0, "status": 0} for p in powers]
        return freqs, powers, raw


def _make_mock_skywalker(verbose=False):
    """Build a MockSkyWalker1 and run its (no-op) open/boot sequence."""
    sw = MockSkyWalker1(verbose=verbose)
    sw.open()
    sw.ensure_booted()
    return sw


class MockHMC472A:
    """Mock attenuator for testing without hardware."""

    def __init__(self, base_url: str = "http://mock.local"):
        self.base_url = base_url
        self._db = 0.0

    def status(self) -> dict:
        step = int(self._db * 2)
        return {"attenuation_db": self._db, "step": step, "version": "mock"}

    def set_db(self, attenuation_db: float) -> dict:
        self._db = max(0.0, min(31.5, round(attenuation_db * 2) / 2))
        return self.status()

    def config(self) -> dict:
        return {"db_min": 0.0, "db_max": 31.5, "db_step": 0.5,
                "version": "mock", "hostname": "mock-attenuator"}


# --- NanoVNA control ---

def try_import_nanovna():
    """Try to import mcnanovna for automated NanoVNA control.

    Returns the NanoVNA class, or None when mcnanovna is not installed.
    """
    try:
        from mcnanovna.nanovna import NanoVNA
        return NanoVNA
    except ImportError:
        return None


class MockNanoVNA:
    """Mock NanoVNA for testing without hardware."""

    def cw(self, frequency_hz: int = 0, power: int = 3):
        pass


def manual_nanovna_set(freq_mhz: float, power: int = 3) -> None:
    """Prompt the user to manually set NanoVNA CW frequency."""
    print(f"\n >>> Set NanoVNA to CW at {freq_mhz:.3f} MHz, power={power}")
    input(" Press Enter when ready...")


# --- CSV output ---

CSV_COLUMNS = [
    "timestamp", "test_name", "freq_mhz", "atten_db",
    "agc1", "agc2", "power_db", "snr_raw", "snr_db",
    "locked", "lock_raw", "status", "notes",
]


def open_csv(path: str):
    """Create *path*, write the CSV header, and return (file, DictWriter).

    The caller owns the returned file object and must close it.
    """
    f = open(path, "w", newline="")
    writer = csv.DictWriter(f, fieldnames=CSV_COLUMNS)
    writer.writeheader()
    return f, writer


def write_row(writer, csv_file, test_name: str, freq_mhz: float,
              atten_db: float, result: dict, notes: str = ""):
    """Write one measurement row and flush so data survives an abort.

    Keys missing from *result* are written as 0 (False for "locked").
    """
    writer.writerow({
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "test_name": test_name,
        "freq_mhz": f"{freq_mhz:.3f}",
        "atten_db": f"{atten_db:.1f}",
        "agc1": result.get("agc1", 0),
        "agc2": result.get("agc2", 0),
        "power_db": f"{result.get('power_db', 0):.2f}",
        "snr_raw": result.get("snr_raw", 0),
        "snr_db": f"{result.get('snr_db', 0):.2f}",
        "locked": result.get("locked", False),
        "lock_raw": f"0x{result.get('lock', 0):02X}",
        "status": f"0x{result.get('status', 0):02X}",
        "notes": notes,
    })
    if csv_file:
        csv_file.flush()


# --- Calibration ---

def load_cal_file(path: str) -> dict:
    """Load a NanoVNA S21 path-loss calibration CSV.

    Expects columns: frequency_hz (or freq_mhz), s21_db (or loss_db).
    Rows lacking a recognised frequency or loss column are skipped.
    Returns dict mapping freq_mhz -> loss_db (positive = loss).
    """
    cal = {}
    # newline="" is what the csv module documents for files it reads.
    with open(path, newline="") as f:
        reader = csv.DictReader(f)
        for row in reader:
            if "freq_mhz" in row:
                freq = float(row["freq_mhz"])
            elif "frequency_hz" in row:
                freq = float(row["frequency_hz"]) / 1e6
            else:
                continue
            if "loss_db" in row:
                loss = float(row["loss_db"])
            elif "s21_db" in row:
                loss = -float(row["s21_db"])  # S21 is negative, loss is positive
            else:
                continue
            cal[freq] = loss
    return cal


def interpolate_loss(cal: dict, freq_mhz: float) -> float:
    """Interpolate path loss at a frequency from cal data.

    Linear interpolation between the two bracketing cal points; outside
    the calibrated range the nearest end point is returned. An empty
    calibration yields 0.0.
    """
    if not cal:
        return 0.0
    freqs = sorted(cal.keys())
    if freq_mhz <= freqs[0]:
        return cal[freqs[0]]
    if freq_mhz >= freqs[-1]:
        return cal[freqs[-1]]
    for f0, f1 in zip(freqs, freqs[1:]):
        if f0 <= freq_mhz <= f1:
            t = (freq_mhz - f0) / (f1 - f0)
            return cal[f0] + t * (cal[f1] - cal[f0])
    return 0.0


# --- Test: AGC Power Linearity ---

def test_agc_linearity(sw, atten, nanovna, freq_mhz: float, writer, csv_file,
                       cal: dict, settle_ms: int) -> list:
    """Sweep attenuator from 0 to 31.5 dB and record AGC at each step.

    Maps the AGC transfer function: how AGC register values respond
    to known changes in input power.

    Returns one dict per step: the tune_monitor() reading plus "atten_db".
    """
    print(f"\n=== AGC Linearity Test at {freq_mhz:.1f} MHz ===")
    results = []

    path_loss = interpolate_loss(cal, freq_mhz)
    if path_loss > 0:
        print(f" Calibrated path loss: {path_loss:.1f} dB")

    # Set NanoVNA to CW at the test frequency
    if nanovna:
        nanovna.cw(frequency_hz=_mhz_to_hz(freq_mhz), power=3)
        print(f" NanoVNA CW: {freq_mhz:.3f} MHz, power=3")
    else:
        manual_nanovna_set(freq_mhz, power=3)

    # Tune SkyWalker-1 to the frequency
    freq_khz = _mhz_to_khz(freq_mhz)

    print(f"\n {'Atten dB':>9} {'AGC1':>6} {'AGC2':>6} {'Power dB':>9} "
          f"{'SNR raw':>8} {'Lock':>5}")
    print(f" {'─' * 9} {'─' * 6} {'─' * 6} {'─' * 9} {'─' * 8} {'─' * 5}")

    # Sweep in 0.5 dB steps from 0 to 31.5 dB (64 steps)
    # Use integer step counter to avoid IEEE 754 float accumulation drift
    for step in range(64):  # 0, 1, 2, ... 63 → 0.0, 0.5, 1.0, ... 31.5
        atten_db = step * 0.5
        atten.set_db(atten_db)
        time.sleep(settle_ms / 1000.0)

        result = sw.tune_monitor(
            symbol_rate_sps=1000000, freq_khz=freq_khz,
            mod_index=0, fec_index=5, dwell_ms=50
        )

        locked = "Y" if result.get("locked") else "N"
        print(f" {atten_db:9.1f} {result['agc1']:6d} {result['agc2']:6d} "
              f"{result['power_db']:9.2f} {result['snr_raw']:8d} {locked:>5}")

        effective_atten = atten_db + path_loss
        note = f"effective_atten={effective_atten:.1f}dB"
        if writer:
            write_row(writer, csv_file, "agc_linearity", freq_mhz, atten_db,
                      result, note)

        results.append({"atten_db": atten_db, **result})

    # Summary
    if results:
        agc1_min = min(r["agc1"] for r in results)
        agc1_max = max(r["agc1"] for r in results)
        print(f"\n AGC1 range: {agc1_min} - {agc1_max} "
              f"(delta={agc1_max - agc1_min}) over 31.5 dB sweep")

    return results


# --- Test: IF Band Flatness ---

def test_band_flatness(sw, atten, nanovna, start_mhz: float, stop_mhz: float,
                       step_mhz: float, writer, csv_file, cal: dict,
                       settle_ms: int) -> list:
    """Sweep CW across the IF band and record AGC at each frequency.

    Reveals tuner gain slope, passband ripple, and the IF filter response.

    Returns one dict per frequency: the tune_monitor() reading plus
    "freq_mhz" and "corrected_db" (power_db + interpolated path loss).

    Raises:
        ValueError: if step_mhz is zero.
    """
    if step_mhz == 0:
        raise ValueError("step_mhz must be non-zero")

    atten_db = 10.0  # Fixed attenuation — mid-range for good dynamic range

    print(f"\n=== IF Band Flatness: {start_mhz:.0f}-{stop_mhz:.0f} MHz, "
          f"step={step_mhz:.1f} MHz ===")
    print(f" HMC472A fixed at {atten_db:.1f} dB")

    atten.set_db(atten_db)
    results = []

    # Use integer step counter to avoid float accumulation drift
    n_steps = int(round((stop_mhz - start_mhz) / step_mhz)) + 1

    print(f"\n {'Step':>5} {'Freq MHz':>9} {'AGC1':>6} {'AGC2':>6} "
          f"{'Power dB':>9} {'PathLoss':>9} {'Corr dB':>8}")
    print(f" {'─' * 5} {'─' * 9} {'─' * 6} {'─' * 6} "
          f"{'─' * 9} {'─' * 9} {'─' * 8}")

    for step_num in range(n_steps):
        freq_mhz = start_mhz + step_num * step_mhz

        # Set NanoVNA CW
        if nanovna:
            nanovna.cw(frequency_hz=_mhz_to_hz(freq_mhz), power=3)
        else:
            manual_nanovna_set(freq_mhz, power=3)

        time.sleep(settle_ms / 1000.0)

        # Tune SkyWalker-1
        freq_khz = _mhz_to_khz(freq_mhz)
        result = sw.tune_monitor(
            symbol_rate_sps=1000000, freq_khz=freq_khz,
            mod_index=0, fec_index=5, dwell_ms=50
        )

        path_loss = interpolate_loss(cal, freq_mhz)
        corrected = result["power_db"] + path_loss

        print(f" {step_num + 1:5d} {freq_mhz:9.1f} {result['agc1']:6d} "
              f"{result['agc2']:6d} {result['power_db']:9.2f} "
              f"{path_loss:9.1f} {corrected:8.2f}")

        note = f"path_loss={path_loss:.1f}dB corrected={corrected:.2f}dB"
        if writer:
            write_row(writer, csv_file, "band_flatness", freq_mhz, atten_db,
                      result, note)

        results.append({"freq_mhz": freq_mhz, "corrected_db": corrected,
                        **result})

    # Summary
    if results:
        powers = [r["corrected_db"] for r in results]
        ripple = max(powers) - min(powers)
        print(f"\n Band flatness: {ripple:.2f} dB ripple "
              f"(min={min(powers):.2f}, max={max(powers):.2f})")

    return results


# --- Test: Frequency Accuracy ---

def test_freq_accuracy(sw, atten, nanovna, test_freqs: list, writer, csv_file,
                       settle_ms: int) -> list:
    """Inject CW at known frequencies, sweep SkyWalker-1 around each one.

    Compares detected peak vs. injected frequency to characterize
    the BCM3440 tuner's frequency accuracy.

    Returns one dict per injected frequency with inject_freq_mhz,
    peak_freq_mhz, error_khz and peak_power_db. Frequencies whose sweep
    returns no data are omitted.
    """
    print("\n=== Frequency Accuracy Test ===")
    atten_db = 10.0
    atten.set_db(atten_db)
    results = []

    sweep_span_mhz = 10.0  # Sweep +/- 5 MHz around each test freq
    sweep_step_mhz = 1.0

    for inject_freq in test_freqs:
        print(f"\n Injecting CW at {inject_freq:.3f} MHz...")
        if nanovna:
            nanovna.cw(frequency_hz=_mhz_to_hz(inject_freq), power=3)
        else:
            manual_nanovna_set(inject_freq, power=3)

        time.sleep(settle_ms / 1000.0)

        # Sweep around the expected frequency
        sweep_start = inject_freq - sweep_span_mhz / 2
        sweep_stop = inject_freq + sweep_span_mhz / 2

        freqs, powers, raw = sw.sweep_spectrum(
            sweep_start, sweep_stop, step_mhz=sweep_step_mhz,
            dwell_ms=50, sr_ksps=1000, mod_index=0, fec_index=5,
        )

        # Find peak
        if powers:
            peak_idx = max(range(len(powers)), key=lambda i: powers[i])
            peak_freq = freqs[peak_idx]
            peak_power = powers[peak_idx]
            error_mhz = peak_freq - inject_freq
            error_khz = error_mhz * 1000

            print(f" Injected: {inject_freq:.3f} MHz "
                  f"Detected peak: {peak_freq:.3f} MHz "
                  f"Error: {error_khz:+.0f} kHz")

            result_entry = {
                "inject_freq_mhz": inject_freq,
                "peak_freq_mhz": peak_freq,
                "error_khz": error_khz,
                "peak_power_db": peak_power,
            }
            results.append(result_entry)

            if writer:
                # Fall back to a minimal record if the raw entry is not a dict
                peak_result = raw[peak_idx] if isinstance(raw[peak_idx], dict) else {
                    "agc1": 0, "agc2": 0, "power_db": peak_power,
                    "snr_raw": 0, "snr_db": 0, "locked": False,
                    "lock": 0, "status": 0,
                }
                write_row(writer, csv_file, "freq_accuracy", inject_freq,
                          atten_db, peak_result,
                          f"peak={peak_freq:.3f}MHz error={error_khz:+.0f}kHz")

    # Summary
    if results:
        errors = [r["error_khz"] for r in results]
        mean_err = sum(errors) / len(errors)
        max_err = max(abs(e) for e in errors)
        print(f"\n Mean frequency error: {mean_err:+.0f} kHz")
        print(f" Max absolute error: {max_err:.0f} kHz")

    return results


# --- Test: Minimum Detectable Signal ---

def test_mds(sw, atten, nanovna, freq_mhz: float, writer, csv_file,
             settle_ms: int) -> dict:
    """Find the minimum detectable signal level.

    Measures noise floor with NanoVNA off (or max attenuation), then
    increases attenuation from 0 until the CW signal is indistinguishable
    from noise.

    Returns a dict with freq_mhz, noise_floor_db, noise_std, threshold_db
    and mds_atten_db (None when the signal was detected at every step).
    """
    print(f"\n=== Minimum Detectable Signal at {freq_mhz:.1f} MHz ===")
    freq_khz = _mhz_to_khz(freq_mhz)

    # Step 1: measure noise floor (max attenuation)
    print(" Measuring noise floor (31.5 dB attenuation)...")
    atten.set_db(31.5)
    time.sleep(0.2)

    noise_readings = []
    for _ in range(10):
        r = sw.tune_monitor(1000000, freq_khz, 0, 5, 50)
        noise_readings.append(r["power_db"])
        time.sleep(0.05)

    noise_floor = sum(noise_readings) / len(noise_readings)
    noise_std = (sum((x - noise_floor) ** 2 for x in noise_readings)
                 / len(noise_readings)) ** 0.5
    # 3-sigma above noise, but never less than 1 dB above it
    threshold = noise_floor + max(3.0 * noise_std, 1.0)

    print(f" Noise floor: {noise_floor:.2f} dB (std={noise_std:.3f})")
    print(f" Detection threshold: {threshold:.2f} dB (noise + 3sigma)")

    # Step 2: inject CW and increase attenuation until signal disappears
    if nanovna:
        nanovna.cw(frequency_hz=_mhz_to_hz(freq_mhz), power=3)
        print(f" NanoVNA CW: {freq_mhz:.3f} MHz, power=3")
    else:
        manual_nanovna_set(freq_mhz, power=3)

    print(f"\n {'Atten dB':>9} {'Power dB':>9} {'Above noise':>12} {'Detected':>9}")
    print(f" {'─' * 9} {'─' * 9} {'─' * 12} {'─' * 9}")

    mds_atten = None
    # 1 dB steps: 0, 1, 2, ... 31 (32 steps)
    sweep_steps = 32
    for step in range(sweep_steps):
        atten_db = float(step)
        atten.set_db(atten_db)
        time.sleep(settle_ms / 1000.0)

        # Average 5 readings for stability
        readings = []
        r = sw.tune_monitor(1000000, freq_khz, 0, 5, 50)
        readings.append(r["power_db"])
        for _ in range(4):
            time.sleep(0.02)
            r = sw.tune_monitor(1000000, freq_khz, 0, 5, 50)
            readings.append(r["power_db"])

        avg_power = sum(readings) / len(readings)
        above_noise = avg_power - noise_floor
        detected = avg_power > threshold

        marker = "YES" if detected else "---"
        print(f" {atten_db:9.1f} {avg_power:9.2f} {above_noise:+12.2f} "
              f"{marker:>9}")

        if writer:
            # Use averaged power instead of last single reading
            avg_result = dict(r)
            avg_result["power_db"] = avg_power
            write_row(writer, csv_file, "mds", freq_mhz, atten_db, avg_result,
                      f"avg={avg_power:.2f} noise={noise_floor:.2f} "
                      f"detected={'Y' if detected else 'N'}")

        # Remember only the first step at which the signal was lost
        if not detected and mds_atten is None:
            mds_atten = atten_db

    result = {
        "freq_mhz": freq_mhz,
        "noise_floor_db": noise_floor,
        "noise_std": noise_std,
        "threshold_db": threshold,
        "mds_atten_db": mds_atten,
    }

    if mds_atten is not None:
        print(f"\n Signal lost at {mds_atten:.1f} dB attenuation")
        print(f" (NanoVNA output ~-15 dBm minus {mds_atten:.1f} dB path = "
              f"~{-15 - mds_atten:.0f} dBm at receiver)")
    else:
        # Report the range actually swept (the loop tops out at 31 dB).
        print(f"\n Signal detected at all attenuation levels "
              f"(0-{sweep_steps - 1} dB)")
        print(" Need more attenuation to find MDS")

    return result


# --- Test: BPSK Mode 9 CW Probe ---

def test_bpsk_probe(sw, atten, nanovna, freq_mhz: float, writer, csv_file,
                    settle_ms: int) -> dict:
    """Probe BPSK mode 9 response to an unmodulated CW carrier.

    BPSK mode (index 9) uses Viterbi rate 1/2 K=7 — the same inner FEC
    as GOES LRIT. A CW carrier has no modulation, so the demodulator
    shouldn't lock, but the AGC and carrier recovery behavior reveals
    how mode 9 handles a clean carrier.

    Returns {"bpsk_results": [...], "qpsk_reference": {...}}.
    """
    print(f"\n=== BPSK Mode 9 CW Probe at {freq_mhz:.1f} MHz ===")

    bpsk_index = MODULATIONS["bpsk"][0]  # Mode 9
    freq_khz = _mhz_to_khz(freq_mhz)
    atten_db = 10.0
    atten.set_db(atten_db)

    if nanovna:
        nanovna.cw(frequency_hz=_mhz_to_hz(freq_mhz), power=3)
    else:
        manual_nanovna_set(freq_mhz, power=3)

    time.sleep(settle_ms / 1000.0)

    # Test with different symbol rates typical of LRIT-like signals
    test_rates = [293883, 500000, 1000000, 5000000]

    print(f"\n {'SR (sps)':>10} {'AGC1':>6} {'AGC2':>6} {'Power dB':>9} "
          f"{'SNR raw':>8} {'SNR dB':>7} {'Lock':>6} {'Status':>8}")
    print(f" {'─' * 10} {'─' * 6} {'─' * 6} {'─' * 9} "
          f"{'─' * 8} {'─' * 7} {'─' * 6} {'─' * 8}")

    results = []
    for sr in test_rates:
        # FEC 1/2 (index 0) for BPSK mode
        result = sw.tune_monitor(sr, freq_khz, bpsk_index, 0, dwell_ms=100)

        locked = "Y" if result.get("locked") else "N"
        print(f" {sr:10d} {result['agc1']:6d} {result['agc2']:6d} "
              f"{result['power_db']:9.2f} {result['snr_raw']:8d} "
              f"{result['snr_db']:7.2f} {locked:>6} "
              f"0x{result.get('status', 0):02X}")

        if writer:
            write_row(writer, csv_file, "bpsk_probe", freq_mhz, atten_db,
                      result, f"mode=bpsk sr={sr} fec=1/2")

        results.append({"symbol_rate": sr, **result})

    # Compare with QPSK mode 0 at same settings
    print("\n Reference: QPSK mode 0 at same frequency")
    ref = sw.tune_monitor(1000000, freq_khz, 0, 5, dwell_ms=100)
    ref_locked = "Y" if ref.get("locked") else "N"
    print(f" {'1000000':>10} {ref['agc1']:6d} {ref['agc2']:6d} "
          f"{ref['power_db']:9.2f} {ref['snr_raw']:8d} "
          f"{ref['snr_db']:7.2f} {ref_locked:>6} "
          f"0x{ref.get('status', 0):02X}")

    return {"bpsk_results": results, "qpsk_reference": ref}


# --- Main ---

def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser: global options plus one sub-command per test."""
    parser = argparse.ArgumentParser(
        prog="rf_testbench.py",
        description="CW injection test bench: NanoVNA + HMC472A + SkyWalker-1",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""\
examples:
  %(prog)s agc-linearity --freq 1200
  %(prog)s band-flatness --start 950 --stop 1500 --step 10
  %(prog)s freq-accuracy --freqs 1000,1200,1400
  %(prog)s mds --freq 1200
  %(prog)s bpsk-probe --freq 1200
  %(prog)s band-flatness --nanovna auto --attenuator http://10.0.0.50

hardware setup:
  NanoVNA CH0 → DC Blocker → HMC472A (0-31.5 dB) → SMA-to-F → SkyWalker-1

  The HMC472A is controlled via its ESP32-S2 REST API.
  The NanoVNA provides CW, controlled via mcnanovna or manually.
  LNB power is disabled (direct L-band input mode).
""",
    )
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Show raw USB traffic")
    parser.add_argument("-o", "--output", type=str, default=None,
                        help="CSV output file path")
    parser.add_argument("--cal", type=str, default=None,
                        help="Path loss calibration CSV (NanoVNA S21 sweep)")
    parser.add_argument("--attenuator", type=str,
                        default="http://attenuator.local",
                        help="HMC472A REST API base URL "
                             "(default: http://attenuator.local)")
    parser.add_argument("--nanovna", choices=["auto", "manual"],
                        default="auto",
                        help="NanoVNA control mode (default: auto via mcnanovna)")
    parser.add_argument("--settle", type=int, default=200,
                        help="Settle time in ms after changing attenuation "
                             "(default: 200)")

    sub = parser.add_subparsers(dest="test", required=True)

    # AGC linearity
    p_agc = sub.add_parser("agc-linearity",
                           help="Sweep attenuation at fixed freq, map AGC curve")
    p_agc.add_argument("--freq", type=float, default=1200.0,
                       help="Test frequency in MHz (default: 1200)")

    # Band flatness
    p_band = sub.add_parser("band-flatness",
                            help="Sweep CW across IF band, measure AGC response")
    p_band.add_argument("--start", type=float, default=950.0,
                        help="Start frequency in MHz (default: 950)")
    p_band.add_argument("--stop", type=float, default=1500.0,
                        help="Stop frequency in MHz (default: 1500)")
    p_band.add_argument("--step", type=float, default=10.0,
                        help="Frequency step in MHz (default: 10)")

    # Frequency accuracy
    p_freq = sub.add_parser("freq-accuracy",
                            help="Inject CW at known freqs, measure error")
    p_freq.add_argument("--freqs", type=str,
                        default="1000,1100,1200,1300,1400",
                        help="Comma-separated test frequencies in MHz")

    # Minimum detectable signal
    p_mds = sub.add_parser("mds",
                           help="Find minimum detectable signal level")
    p_mds.add_argument("--freq", type=float, default=1200.0,
                       help="Test frequency in MHz (default: 1200)")

    # BPSK mode 9 probe
    p_bpsk = sub.add_parser("bpsk-probe",
                            help="Probe BPSK mode 9 with CW carrier")
    p_bpsk.add_argument("--freq", type=float, default=1200.0,
                        help="Test frequency in MHz (default: 1200)")

    return parser


def main():
    """Parse arguments, bring up the instruments, run the selected test.

    Setting the SKYWALKER_MOCK environment variable to any non-empty value
    replaces all three instruments with the in-file mocks.

    Everything acquired after argument validation (CSV file, SkyWalker-1
    handle) is released in a single finally block, which also returns the
    attenuator to 31.5 dB, so a failed boot or an exception mid-test cannot
    leak the device or leave the bench unattenuated.
    """
    parser = build_parser()
    args = parser.parse_args()

    # Validate test-specific arguments before any hardware is touched.
    test_freqs = []
    if args.test == "freq-accuracy":
        try:
            test_freqs = _parse_freq_list(args.freqs)
        except ValueError as e:
            parser.error(f"--freqs: {e}")
    elif args.test == "band-flatness" and args.step == 0:
        parser.error("--step must be non-zero")

    # Mock mode for testing without hardware
    mock_mode = os.environ.get("SKYWALKER_MOCK")

    # Set up HMC472A attenuator
    if mock_mode:
        atten = MockHMC472A()
        print("HMC472A: mock mode")
    else:
        atten = HMC472A(args.attenuator)
        try:
            cfg = atten.config()
            print(f"HMC472A: connected ({cfg.get('hostname', '?')}, "
                  f"v{cfg.get('version', '?')})")
        except (URLError, OSError) as e:
            print(f"HMC472A: cannot reach {args.attenuator} ({e})")
            print(" Check network connection or use --attenuator <url>")
            sys.exit(1)

    # Set up NanoVNA
    nanovna = None
    if mock_mode:
        nanovna = MockNanoVNA()
        print("NanoVNA: mock mode")
    elif args.nanovna == "auto":
        NanoVNA = try_import_nanovna()
        if NanoVNA:
            try:
                nanovna = NanoVNA()
                print("NanoVNA: auto mode (mcnanovna)")
            except Exception as e:
                print(f"NanoVNA: mcnanovna failed ({e}), falling back to manual")
        else:
            print("NanoVNA: mcnanovna not installed, using manual mode")
            print(" Install: uv pip install -e /path/to/mcnanovna")
    else:
        print("NanoVNA: manual mode (you'll be prompted to set frequencies)")

    # Load calibration
    cal = {}
    if args.cal:
        cal = load_cal_file(args.cal)
        print(f"Calibration: loaded {len(cal)} points from {args.cal}")

    csv_file = None
    writer = None
    sw = None

    try:
        # Open CSV output
        if args.output:
            csv_file, writer = open_csv(args.output)

        # Open SkyWalker-1
        # SAFETY: Boot demodulator WITHOUT enabling LNB power. ensure_booted()
        # transiently enables LNB voltage (13-18V on the F-connector), which
        # would travel backward through the attenuator toward the NanoVNA.
        # The DC blocker protects against this, but code should never rely on
        # external protection it cannot verify.
        if mock_mode:
            sw = _make_mock_skywalker(args.verbose)
            print("SkyWalker-1: mock mode")
        else:
            sw = SkyWalker1(verbose=args.verbose)
            sw.open()
            # Ensure LNB power is OFF before booting demodulator
            sw.start_intersil(on=False)
            status = sw.get_config()
            # Bit 0 of the config byte is treated as "started" here.
            if not (status & 0x01):
                sw.boot(on=True)
                time.sleep(0.5)
                status = sw.get_config()
                if not (status & 0x01):
                    print("ERROR: Device failed to start")
                    sys.exit(1)  # SystemExit still runs the finally below
            print("SkyWalker-1: booted (LNB power kept OFF)")

        # Confirm LNB power disabled — direct input mode
        sw.start_intersil(on=False)
        print("LNB power disabled (direct input mode)")
        print()

        if args.test == "agc-linearity":
            test_agc_linearity(sw, atten, nanovna, args.freq, writer,
                               csv_file, cal, args.settle)
        elif args.test == "band-flatness":
            test_band_flatness(sw, atten, nanovna, args.start, args.stop,
                               args.step, writer, csv_file, cal, args.settle)
        elif args.test == "freq-accuracy":
            test_freq_accuracy(sw, atten, nanovna, test_freqs, writer,
                               csv_file, args.settle)
        elif args.test == "mds":
            test_mds(sw, atten, nanovna, args.freq, writer, csv_file,
                     args.settle)
        elif args.test == "bpsk-probe":
            test_bpsk_probe(sw, atten, nanovna, args.freq, writer, csv_file,
                            args.settle)
    except KeyboardInterrupt:
        print("\n\nInterrupted by operator.")
    finally:
        # Safe state: maximum attenuation before releasing hardware
        try:
            atten.set_db(31.5)
            print("Attenuator set to 31.5 dB (safe state)")
        except Exception as e:
            # Best-effort on the cleanup path: report, but keep releasing
            # the remaining resources.
            print(f"WARNING: could not set attenuator to safe state ({e})",
                  file=sys.stderr)
        if csv_file:
            csv_file.flush()
            csv_file.close()
            print(f"\nData saved to {args.output}")
        if sw is not None and not mock_mode:
            sw.close()


if __name__ == "__main__":
    main()