skywalker-1/tools/rf_testbench.py
Ryan Malloy a12a394099 Add FixedAttenuator class and udev rules for RF test bench
FixedAttenuator supports --attenuator fixed:XX for non-programmable
inline SMA pads (set_db is a no-op returning the fixed value).
Udev rules grant non-root USB access for NanoVNA and HMC472A.
2026-02-20 10:56:36 -07:00

987 lines
34 KiB
Python

#!/usr/bin/env python3
"""
RF Test Bench — CW injection tests with NanoVNA + HMC472A + SkyWalker-1.
Injects CW signals from a NanoVNA-H through a programmable HMC472A digital
attenuator into the SkyWalker-1 receiver. Runs automated test sequences to
characterize AGC linearity, IF band flatness, frequency accuracy, minimum
detectable signal, and BPSK mode 9 behavior.
Hardware setup:
NanoVNA CH0 → DC Blocker → HMC472A → SMA-to-F → SkyWalker-1
The HMC472A (0-31.5 dB, 0.5 dB steps) is controlled via USB serial (preferred)
or REST API. The NanoVNA provides CW at a fixed frequency, controlled either
via mcnanovna or manually. The SkyWalker-1 measures AGC, SNR, and lock status.
Usage:
python rf_testbench.py agc-linearity --freq 1200
python rf_testbench.py band-flatness --start 950 --stop 1500 --step 10
python rf_testbench.py freq-accuracy --freqs 1000,1200,1400
python rf_testbench.py mds --freq 1200
python rf_testbench.py bpsk-probe --freq 1200
python rf_testbench.py --help
"""
import sys
import os
import argparse
import csv
import json
import time
from datetime import datetime, timezone
from urllib.request import urlopen, Request
from urllib.error import URLError
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from skywalker_lib import SkyWalker1, MODULATIONS
# --- HMC472A REST client ---
class HMC472A:
"""Control the HMC472A digital attenuator via its ESP32-S2 REST API."""
def __init__(self, base_url: str = "http://attenuator.local"):
self.base_url = base_url.rstrip("/")
def _get(self, path: str, retries: int = 3) -> dict:
url = f"{self.base_url}{path}"
req = Request(url)
last_err: OSError = OSError("no attempts made")
for attempt in range(retries):
try:
with urlopen(req, timeout=5) as resp:
return json.loads(resp.read())
except (URLError, OSError) as e:
last_err = e
if attempt < retries - 1:
time.sleep(0.2 * (attempt + 1))
raise last_err
def _post(self, path: str, data: dict, retries: int = 3) -> dict:
url = f"{self.base_url}{path}"
body = json.dumps(data).encode()
req = Request(url, data=body, method="POST",
headers={"Content-Type": "application/json"})
last_err: OSError = OSError("no attempts made")
for attempt in range(retries):
try:
with urlopen(req, timeout=5) as resp:
return json.loads(resp.read())
except (URLError, OSError) as e:
last_err = e
if attempt < retries - 1:
time.sleep(0.2 * (attempt + 1))
raise last_err
def status(self) -> dict:
return self._get("/status")
def set_db(self, attenuation_db: float) -> dict:
clamped = max(0.0, min(31.5, attenuation_db))
rounded = round(clamped * 2) / 2 # Snap to 0.5 dB steps
return self._post("/set", {"attenuation_db": rounded})
def config(self) -> dict:
return self._get("/config")
# --- HMC472A USB serial client ---
class HMC472ASerial:
"""Control the HMC472A digital attenuator via USB CDC serial.
Uses the usb-serial-json-v1 protocol: one JSON object per newline-
terminated line in each direction. Requires pyserial.
"""
def __init__(self, port: str, baudrate: int = 115200, timeout: float = 1.0):
import serial
self.ser = serial.Serial(port, baudrate, timeout=timeout)
self.ser.reset_input_buffer()
def close(self):
if self.ser and self.ser.is_open:
self.ser.close()
def _cmd(self, command: dict) -> dict:
line = json.dumps(command, separators=(",", ":")) + "\n"
self.ser.write(line.encode())
self.ser.flush()
resp_line = self.ser.readline()
if not resp_line:
raise TimeoutError("no response from HMC472A")
resp = json.loads(resp_line)
if not resp.get("ok"):
raise RuntimeError(resp.get("error", "unknown error"))
return resp
def status(self) -> dict:
return self._cmd({"cmd": "status"})
def set_db(self, attenuation_db: float) -> dict:
clamped = max(0.0, min(31.5, attenuation_db))
rounded = round(clamped * 2) / 2
return self._cmd({"cmd": "set", "db": rounded})
def config(self) -> dict:
return self._cmd({"cmd": "config"})
def identify(self) -> dict:
return self._cmd({"cmd": "identify"})
def detect_hmc472a_serial() -> str | None:
"""Scan /dev/ttyACM* ports for an HMC472A responding to identify.
Returns the port path if found, None otherwise.
"""
import glob
try:
import serial
except ImportError:
return None
ports = sorted(glob.glob("/dev/ttyACM*"))
for port in ports:
try:
ser = serial.Serial(port, 115200, timeout=0.5)
ser.reset_input_buffer()
ser.write(b'{"cmd":"identify"}\n')
ser.flush()
resp_line = ser.readline()
ser.close()
if resp_line:
resp = json.loads(resp_line)
if resp.get("device") == "hmc472a-attenuator":
return port
except (OSError, json.JSONDecodeError, ValueError):
continue
return None
class MockSkyWalker1:
"""Lightweight mock SkyWalker-1 for rf_testbench testing."""
def __init__(self, verbose=False):
self.verbose = verbose
self._freq_khz = 0
def open(self):
pass
def close(self):
pass
def ensure_booted(self):
pass
def start_intersil(self, on=True):
pass
def tune_monitor(self, symbol_rate_sps=1000000, freq_khz=1200000,
mod_index=0, fec_index=5, dwell_ms=10):
self._freq_khz = freq_khz
# Simulate AGC response: higher freq → slightly lower power
base_agc1 = 1200 + (freq_khz - 1200000) // 100
return {
"snr_raw": 180, "snr_db": 7.8, "snr_pct": 39.0,
"agc1": max(100, base_agc1), "agc2": 750,
"power_db": -46.1 - (freq_khz - 1200000) / 500000,
"locked": False, "lock": 0x00, "status": 0x01,
"dwell_ms": dwell_ms,
}
def signal_monitor(self):
return {
"snr_raw": 200, "snr_db": 8.5, "snr_pct": 42.5,
"agc1": 1200, "agc2": 800, "power_db": -45.3,
"locked": False, "lock": 0x00, "status": 0x01,
}
def sweep_spectrum(self, start_mhz, stop_mhz, step_mhz=5.0,
dwell_ms=15, sr_ksps=1000, mod_index=0, fec_index=5):
n = int((stop_mhz - start_mhz) / step_mhz) + 1
freqs = [start_mhz + i * step_mhz for i in range(n)]
powers = [-50.0 + 3.0 * (1.0 - abs(f - 1200) / 300) for f in freqs]
raw = [{"agc1": 1200, "agc2": 750, "power_db": p,
"snr_raw": 0, "snr_db": 0, "locked": False,
"lock": 0, "status": 0} for p in powers]
return freqs, powers, raw
def _make_mock_skywalker(verbose=False):
sw = MockSkyWalker1(verbose=verbose)
sw.open()
sw.ensure_booted()
return sw
class FixedAttenuator:
"""Stand-in for a fixed (non-programmable) inline attenuator.
Reports the declared fixed attenuation for every set_db() call.
Used when testing with a fixed pad instead of the HMC472A.
"""
def __init__(self, fixed_db: float = 20.0):
self._fixed_db = fixed_db
def status(self) -> dict:
return {"attenuation_db": self._fixed_db, "step": 0,
"version": "fixed-pad", "note": "non-programmable"}
def set_db(self, attenuation_db: float) -> dict:
# Can't change a fixed pad — just return what it is
return self.status()
def config(self) -> dict:
return {"db_min": self._fixed_db, "db_max": self._fixed_db,
"db_step": 0, "version": "fixed-pad",
"hostname": f"fixed-{self._fixed_db:.1f}dB"}
class MockHMC472A:
"""Mock attenuator for testing without hardware."""
def __init__(self, base_url: str = "http://mock.local"):
self.base_url = base_url
self._db = 0.0
def status(self) -> dict:
step = int(self._db * 2)
return {"attenuation_db": self._db, "step": step, "version": "mock"}
def set_db(self, attenuation_db: float) -> dict:
self._db = max(0.0, min(31.5, round(attenuation_db * 2) / 2))
return self.status()
def config(self) -> dict:
return {"db_min": 0.0, "db_max": 31.5, "db_step": 0.5,
"version": "mock", "hostname": "mock-attenuator"}
# --- NanoVNA control ---
def try_import_nanovna():
"""Try to import mcnanovna for automated NanoVNA control."""
try:
from mcnanovna.nanovna import NanoVNA
return NanoVNA
except ImportError:
return None
class MockNanoVNA:
"""Mock NanoVNA for testing without hardware."""
def cw(self, frequency_hz: int = 0, power: int = 3):
pass
def manual_nanovna_set(freq_mhz: float, power: int = 3) -> None:
"""Prompt the user to manually set NanoVNA CW frequency."""
print(f"\n >>> Set NanoVNA to CW at {freq_mhz:.3f} MHz, power={power}")
input(" Press Enter when ready...")
# --- CSV output ---
CSV_COLUMNS = [
"timestamp", "test_name", "freq_mhz", "atten_db",
"agc1", "agc2", "power_db", "snr_raw", "snr_db",
"locked", "lock_raw", "status", "notes",
]
def open_csv(path: str):
f = open(path, "w", newline="")
writer = csv.DictWriter(f, fieldnames=CSV_COLUMNS)
writer.writeheader()
return f, writer
def write_row(writer, csv_file, test_name: str, freq_mhz: float,
atten_db: float, result: dict, notes: str = ""):
writer.writerow({
"timestamp": datetime.now(timezone.utc).isoformat(),
"test_name": test_name,
"freq_mhz": f"{freq_mhz:.3f}",
"atten_db": f"{atten_db:.1f}",
"agc1": result.get("agc1", 0),
"agc2": result.get("agc2", 0),
"power_db": f"{result.get('power_db', 0):.2f}",
"snr_raw": result.get("snr_raw", 0),
"snr_db": f"{result.get('snr_db', 0):.2f}",
"locked": result.get("locked", False),
"lock_raw": f"0x{result.get('lock', 0):02X}",
"status": f"0x{result.get('status', 0):02X}",
"notes": notes,
})
if csv_file:
csv_file.flush()
# --- Calibration ---
def load_cal_file(path: str) -> dict:
"""Load a NanoVNA S21 path-loss calibration CSV.
Expects columns: frequency_hz (or freq_mhz), s21_db (or loss_db).
Returns dict mapping freq_mhz -> loss_db (positive = loss).
"""
cal = {}
with open(path) as f:
reader = csv.DictReader(f)
for row in reader:
if "freq_mhz" in row:
freq = float(row["freq_mhz"])
elif "frequency_hz" in row:
freq = float(row["frequency_hz"]) / 1e6
else:
continue
if "loss_db" in row:
loss = float(row["loss_db"])
elif "s21_db" in row:
loss = -float(row["s21_db"]) # S21 is negative, loss is positive
else:
continue
cal[freq] = loss
return cal
def interpolate_loss(cal: dict, freq_mhz: float) -> float:
"""Interpolate path loss at a frequency from cal data."""
if not cal:
return 0.0
freqs = sorted(cal.keys())
if freq_mhz <= freqs[0]:
return cal[freqs[0]]
if freq_mhz >= freqs[-1]:
return cal[freqs[-1]]
for i in range(len(freqs) - 1):
if freqs[i] <= freq_mhz <= freqs[i + 1]:
f0, f1 = freqs[i], freqs[i + 1]
t = (freq_mhz - f0) / (f1 - f0)
return cal[f0] + t * (cal[f1] - cal[f0])
return 0.0
# --- Test: AGC Power Linearity ---
def test_agc_linearity(sw, atten, nanovna, freq_mhz: float,
writer, csv_file, cal: dict, settle_ms: int) -> list:
"""Sweep attenuator from 0 to 31.5 dB and record AGC at each step.
Maps the AGC transfer function: how AGC register values respond to
known changes in input power.
"""
print(f"\n=== AGC Linearity Test at {freq_mhz:.1f} MHz ===")
results = []
path_loss = interpolate_loss(cal, freq_mhz)
if path_loss > 0:
print(f" Calibrated path loss: {path_loss:.1f} dB")
# Set NanoVNA to CW at the test frequency
if nanovna:
nanovna.cw(frequency_hz=int(freq_mhz * 1e6), power=3)
print(f" NanoVNA CW: {freq_mhz:.3f} MHz, power=3")
else:
manual_nanovna_set(freq_mhz, power=3)
# Tune SkyWalker-1 to the frequency
freq_khz = int(freq_mhz * 1000)
print(f"\n {'Atten dB':>9} {'AGC1':>6} {'AGC2':>6} {'Power dB':>9} "
f"{'SNR raw':>8} {'Lock':>5}")
print(f" {'' * 9} {'' * 6} {'' * 6} {'' * 9} {'' * 8} {'' * 5}")
# Sweep in 0.5 dB steps from 0 to 31.5 dB (64 steps)
# Use integer step counter to avoid IEEE 754 float accumulation drift
for step in range(64): # 0, 1, 2, ... 63 → 0.0, 0.5, 1.0, ... 31.5
atten_db = step * 0.5
atten.set_db(atten_db)
time.sleep(settle_ms / 1000.0)
result = sw.tune_monitor(
symbol_rate_sps=1000000, freq_khz=freq_khz,
mod_index=0, fec_index=5, dwell_ms=50
)
locked = "Y" if result.get("locked") else "N"
print(f" {atten_db:9.1f} {result['agc1']:6d} {result['agc2']:6d} "
f"{result['power_db']:9.2f} {result['snr_raw']:8d} {locked:>5}")
effective_atten = atten_db + path_loss
note = f"effective_atten={effective_atten:.1f}dB"
if writer:
write_row(writer, csv_file, "agc_linearity", freq_mhz, atten_db,
result, note)
results.append({"atten_db": atten_db, **result})
# Summary
if results:
agc1_min = min(r["agc1"] for r in results)
agc1_max = max(r["agc1"] for r in results)
print(f"\n AGC1 range: {agc1_min} - {agc1_max} "
f"(delta={agc1_max - agc1_min}) over 31.5 dB sweep")
return results
# --- Test: IF Band Flatness ---
def test_band_flatness(sw, atten, nanovna, start_mhz: float,
stop_mhz: float, step_mhz: float,
writer, csv_file, cal: dict, settle_ms: int) -> list:
"""Sweep CW across the IF band and record AGC at each frequency.
Reveals tuner gain slope, passband ripple, and the IF filter response.
"""
atten_db = 10.0 # Fixed attenuation — mid-range for good dynamic range
print(f"\n=== IF Band Flatness: {start_mhz:.0f}-{stop_mhz:.0f} MHz, "
f"step={step_mhz:.1f} MHz ===")
print(f" HMC472A fixed at {atten_db:.1f} dB")
atten.set_db(atten_db)
results = []
# Use integer step counter to avoid float accumulation drift
n_steps = int(round((stop_mhz - start_mhz) / step_mhz)) + 1
print(f"\n {'Step':>5} {'Freq MHz':>9} {'AGC1':>6} {'AGC2':>6} "
f"{'Power dB':>9} {'PathLoss':>9} {'Corr dB':>8}")
print(f" {'' * 5} {'' * 9} {'' * 6} {'' * 6} "
f"{'' * 9} {'' * 9} {'' * 8}")
for step_num in range(n_steps):
freq_mhz = start_mhz + step_num * step_mhz
# Set NanoVNA CW
if nanovna:
nanovna.cw(frequency_hz=int(freq_mhz * 1e6), power=3)
else:
manual_nanovna_set(freq_mhz, power=3)
time.sleep(settle_ms / 1000.0)
# Tune SkyWalker-1
freq_khz = int(freq_mhz * 1000)
result = sw.tune_monitor(
symbol_rate_sps=1000000, freq_khz=freq_khz,
mod_index=0, fec_index=5, dwell_ms=50
)
path_loss = interpolate_loss(cal, freq_mhz)
corrected = result["power_db"] + path_loss
print(f" {step_num + 1:5d} {freq_mhz:9.1f} {result['agc1']:6d} "
f"{result['agc2']:6d} {result['power_db']:9.2f} "
f"{path_loss:9.1f} {corrected:8.2f}")
note = f"path_loss={path_loss:.1f}dB corrected={corrected:.2f}dB"
if writer:
write_row(writer, csv_file, "band_flatness", freq_mhz, atten_db,
result, note)
results.append({"freq_mhz": freq_mhz, "corrected_db": corrected, **result})
# Summary
if results:
powers = [r["corrected_db"] for r in results]
ripple = max(powers) - min(powers)
print(f"\n Band flatness: {ripple:.2f} dB ripple "
f"(min={min(powers):.2f}, max={max(powers):.2f})")
return results
# --- Test: Frequency Accuracy ---
def test_freq_accuracy(sw, atten, nanovna, test_freqs: list,
writer, csv_file, settle_ms: int) -> list:
"""Inject CW at known frequencies, sweep SkyWalker-1 around each one.
Compares detected peak vs. injected frequency to characterize the
BCM3440 tuner's frequency accuracy.
"""
print(f"\n=== Frequency Accuracy Test ===")
atten_db = 10.0
atten.set_db(atten_db)
results = []
sweep_span_mhz = 10.0 # Sweep +/- 5 MHz around each test freq
sweep_step_mhz = 1.0
for inject_freq in test_freqs:
print(f"\n Injecting CW at {inject_freq:.3f} MHz...")
if nanovna:
nanovna.cw(frequency_hz=int(inject_freq * 1e6), power=3)
else:
manual_nanovna_set(inject_freq, power=3)
time.sleep(settle_ms / 1000.0)
# Sweep around the expected frequency
sweep_start = inject_freq - sweep_span_mhz / 2
sweep_stop = inject_freq + sweep_span_mhz / 2
freqs, powers, raw = sw.sweep_spectrum(
sweep_start, sweep_stop,
step_mhz=sweep_step_mhz, dwell_ms=50,
sr_ksps=1000, mod_index=0, fec_index=5,
)
# Find peak
if powers:
peak_idx = max(range(len(powers)), key=lambda i: powers[i])
peak_freq = freqs[peak_idx]
peak_power = powers[peak_idx]
error_mhz = peak_freq - inject_freq
error_khz = error_mhz * 1000
print(f" Injected: {inject_freq:.3f} MHz "
f"Detected peak: {peak_freq:.3f} MHz "
f"Error: {error_khz:+.0f} kHz")
result_entry = {
"inject_freq_mhz": inject_freq,
"peak_freq_mhz": peak_freq,
"error_khz": error_khz,
"peak_power_db": peak_power,
}
results.append(result_entry)
if writer:
peak_result = raw[peak_idx] if isinstance(raw[peak_idx], dict) else {
"agc1": 0, "agc2": 0, "power_db": peak_power,
"snr_raw": 0, "snr_db": 0,
"locked": False, "lock": 0, "status": 0,
}
write_row(writer, csv_file, "freq_accuracy", inject_freq,
atten_db, peak_result,
f"peak={peak_freq:.3f}MHz error={error_khz:+.0f}kHz")
# Summary
if results:
errors = [r["error_khz"] for r in results]
mean_err = sum(errors) / len(errors)
max_err = max(abs(e) for e in errors)
print(f"\n Mean frequency error: {mean_err:+.0f} kHz")
print(f" Max absolute error: {max_err:.0f} kHz")
return results
# --- Test: Minimum Detectable Signal ---
def test_mds(sw, atten, nanovna, freq_mhz: float,
writer, csv_file, settle_ms: int) -> dict:
"""Find the minimum detectable signal level.
Measures noise floor with NanoVNA off (or max attenuation), then
increases attenuation from 0 until the CW signal is indistinguishable
from noise.
"""
print(f"\n=== Minimum Detectable Signal at {freq_mhz:.1f} MHz ===")
freq_khz = int(freq_mhz * 1000)
# Step 1: measure noise floor (max attenuation)
print(" Measuring noise floor (31.5 dB attenuation)...")
atten.set_db(31.5)
time.sleep(0.2)
noise_readings = []
for _ in range(10):
r = sw.tune_monitor(1000000, freq_khz, 0, 5, 50)
noise_readings.append(r["power_db"])
time.sleep(0.05)
noise_floor = sum(noise_readings) / len(noise_readings)
noise_std = (sum((x - noise_floor) ** 2 for x in noise_readings)
/ len(noise_readings)) ** 0.5
threshold = noise_floor + max(3.0 * noise_std, 1.0) # 3-sigma above noise
print(f" Noise floor: {noise_floor:.2f} dB (std={noise_std:.3f})")
print(f" Detection threshold: {threshold:.2f} dB (noise + 3sigma)")
# Step 2: inject CW and increase attenuation until signal disappears
if nanovna:
nanovna.cw(frequency_hz=int(freq_mhz * 1e6), power=3)
print(f" NanoVNA CW: {freq_mhz:.3f} MHz, power=3")
else:
manual_nanovna_set(freq_mhz, power=3)
print(f"\n {'Atten dB':>9} {'Power dB':>9} {'Above noise':>12} {'Detected':>9}")
print(f" {'' * 9} {'' * 9} {'' * 12} {'' * 9}")
mds_atten = None
# 1 dB steps: 0, 1, 2, ... 31 (32 steps)
for step in range(32):
atten_db = float(step)
atten.set_db(atten_db)
time.sleep(settle_ms / 1000.0)
# Average 5 readings for stability
readings = []
r = sw.tune_monitor(1000000, freq_khz, 0, 5, 50)
readings.append(r["power_db"])
for _ in range(4):
time.sleep(0.02)
r = sw.tune_monitor(1000000, freq_khz, 0, 5, 50)
readings.append(r["power_db"])
avg_power = sum(readings) / len(readings)
above_noise = avg_power - noise_floor
detected = avg_power > threshold
marker = "YES" if detected else "---"
print(f" {atten_db:9.1f} {avg_power:9.2f} {above_noise:+12.2f} "
f"{marker:>9}")
if writer:
# Use averaged power instead of last single reading
avg_result = dict(r)
avg_result["power_db"] = avg_power
write_row(writer, csv_file, "mds", freq_mhz, atten_db,
avg_result,
f"avg={avg_power:.2f} noise={noise_floor:.2f} "
f"detected={'Y' if detected else 'N'}")
if not detected and mds_atten is None:
mds_atten = atten_db
result = {
"freq_mhz": freq_mhz,
"noise_floor_db": noise_floor,
"noise_std": noise_std,
"threshold_db": threshold,
"mds_atten_db": mds_atten,
}
if mds_atten is not None:
print(f"\n Signal lost at {mds_atten:.1f} dB attenuation")
print(f" (NanoVNA output ~-15 dBm minus {mds_atten:.1f} dB path = "
f"~{-15 - mds_atten:.0f} dBm at receiver)")
else:
print(f"\n Signal detected at all attenuation levels (0-31.5 dB)")
print(f" Need more attenuation to find MDS")
return result
# --- Test: BPSK Mode 9 CW Probe ---
def test_bpsk_probe(sw, atten, nanovna, freq_mhz: float,
writer, csv_file, settle_ms: int) -> dict:
"""Probe BPSK mode 9 response to an unmodulated CW carrier.
BPSK mode (index 9) uses Viterbi rate 1/2 K=7 — the same inner FEC
as GOES LRIT. A CW carrier has no modulation, so the demodulator
shouldn't lock, but the AGC and carrier recovery behavior reveals
how mode 9 handles a clean carrier.
"""
print(f"\n=== BPSK Mode 9 CW Probe at {freq_mhz:.1f} MHz ===")
bpsk_index = MODULATIONS["bpsk"][0] # Mode 9
freq_khz = int(freq_mhz * 1000)
atten_db = 10.0
atten.set_db(atten_db)
if nanovna:
nanovna.cw(frequency_hz=int(freq_mhz * 1e6), power=3)
else:
manual_nanovna_set(freq_mhz, power=3)
time.sleep(settle_ms / 1000.0)
# Test with different symbol rates typical of LRIT-like signals
test_rates = [293883, 500000, 1000000, 5000000]
print(f"\n {'SR (sps)':>10} {'AGC1':>6} {'AGC2':>6} {'Power dB':>9} "
f"{'SNR raw':>8} {'SNR dB':>7} {'Lock':>6} {'Status':>8}")
print(f" {'' * 10} {'' * 6} {'' * 6} {'' * 9} "
f"{'' * 8} {'' * 7} {'' * 6} {'' * 8}")
results = []
for sr in test_rates:
# FEC 1/2 (index 0) for BPSK mode
result = sw.tune_monitor(sr, freq_khz, bpsk_index, 0, dwell_ms=100)
locked = "Y" if result.get("locked") else "N"
print(f" {sr:10d} {result['agc1']:6d} {result['agc2']:6d} "
f"{result['power_db']:9.2f} {result['snr_raw']:8d} "
f"{result['snr_db']:7.2f} {locked:>6} "
f"0x{result.get('status', 0):02X}")
if writer:
write_row(writer, csv_file, "bpsk_probe", freq_mhz, atten_db,
result, f"mode=bpsk sr={sr} fec=1/2")
results.append({"symbol_rate": sr, **result})
# Compare with QPSK mode 0 at same settings
print(f"\n Reference: QPSK mode 0 at same frequency")
ref = sw.tune_monitor(1000000, freq_khz, 0, 5, dwell_ms=100)
ref_locked = "Y" if ref.get("locked") else "N"
print(f" {'1000000':>10} {ref['agc1']:6d} {ref['agc2']:6d} "
f"{ref['power_db']:9.2f} {ref['snr_raw']:8d} "
f"{ref['snr_db']:7.2f} {ref_locked:>6} "
f"0x{ref.get('status', 0):02X}")
return {"bpsk_results": results, "qpsk_reference": ref}
# --- Main ---
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
prog="rf_testbench.py",
description="CW injection test bench: NanoVNA + HMC472A + SkyWalker-1",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""\
examples:
%(prog)s agc-linearity --freq 1200
%(prog)s band-flatness --start 950 --stop 1500 --step 10
%(prog)s freq-accuracy --freqs 1000,1200,1400
%(prog)s mds --freq 1200
%(prog)s bpsk-probe --freq 1200
%(prog)s band-flatness --attenuator /dev/ttyACM1
%(prog)s agc-linearity --attenuator http://attenuator.local --freq 1200
hardware setup:
NanoVNA CH0 → DC Blocker → HMC472A (0-31.5 dB) → SMA-to-F → SkyWalker-1
The HMC472A is controlled via USB serial (preferred) or HTTP REST API.
Use --attenuator auto (default) to auto-detect USB, falling back to HTTP.
The NanoVNA provides CW, controlled via mcnanovna or manually.
LNB power is disabled (direct L-band input mode).
""",
)
parser.add_argument("-v", "--verbose", action="store_true",
help="Show raw USB traffic")
parser.add_argument("-o", "--output", type=str, default=None,
help="CSV output file path")
parser.add_argument("--cal", type=str, default=None,
help="Path loss calibration CSV (NanoVNA S21 sweep)")
parser.add_argument("--attenuator", type=str, default="auto",
help="HMC472A connection: 'auto' (USB then HTTP), "
"/dev/ttyACMx (USB serial), http://host (REST), "
"or 'fixed:20' for a non-programmable pad "
"(default: auto)")
parser.add_argument("--nanovna", choices=["auto", "manual"],
default="auto",
help="NanoVNA control mode (default: auto via mcnanovna)")
parser.add_argument("--settle", type=int, default=200,
help="Settle time in ms after changing attenuation "
"(default: 200)")
sub = parser.add_subparsers(dest="test", required=True)
# AGC linearity
p_agc = sub.add_parser("agc-linearity",
help="Sweep attenuation at fixed freq, map AGC curve")
p_agc.add_argument("--freq", type=float, default=1200.0,
help="Test frequency in MHz (default: 1200)")
# Band flatness
p_band = sub.add_parser("band-flatness",
help="Sweep CW across IF band, measure AGC response")
p_band.add_argument("--start", type=float, default=950.0,
help="Start frequency in MHz (default: 950)")
p_band.add_argument("--stop", type=float, default=1500.0,
help="Stop frequency in MHz (default: 1500)")
p_band.add_argument("--step", type=float, default=10.0,
help="Frequency step in MHz (default: 10)")
# Frequency accuracy
p_freq = sub.add_parser("freq-accuracy",
help="Inject CW at known freqs, measure error")
p_freq.add_argument("--freqs", type=str, default="1000,1100,1200,1300,1400",
help="Comma-separated test frequencies in MHz")
# Minimum detectable signal
p_mds = sub.add_parser("mds",
help="Find minimum detectable signal level")
p_mds.add_argument("--freq", type=float, default=1200.0,
help="Test frequency in MHz (default: 1200)")
# BPSK mode 9 probe
p_bpsk = sub.add_parser("bpsk-probe",
help="Probe BPSK mode 9 with CW carrier")
p_bpsk.add_argument("--freq", type=float, default=1200.0,
help="Test frequency in MHz (default: 1200)")
return parser
def _connect_attenuator(target: str):
"""Connect to HMC472A via auto-detect, USB serial, HTTP REST, or fixed pad.
Args:
target: "auto", "fixed:XX" (dB), /dev/ttyACM* (USB), or http://... (REST)
"""
# Fixed attenuator mode (non-programmable inline pad)
if target.startswith("fixed:"):
try:
fixed_db = float(target.split(":", 1)[1])
except ValueError:
print(f"HMC472A: invalid fixed value '{target}' (use fixed:20)")
sys.exit(1)
atten = FixedAttenuator(fixed_db)
print(f"HMC472A: fixed {fixed_db:.1f} dB pad (non-programmable)")
return atten
# Auto-detect: try USB serial first, fall back to HTTP
if target == "auto":
port = detect_hmc472a_serial()
if port:
print(f"HMC472A: auto-detected USB serial on {port}")
target = port
else:
print("HMC472A: no USB device found, trying HTTP...")
target = "http://attenuator.local"
# USB serial path
if target.startswith("/dev/"):
try:
atten = HMC472ASerial(target)
info = atten.identify()
print(f"HMC472A: USB serial on {target} "
f"(v{info.get('version', '?')}, "
f"protocol {info.get('protocol', '?')})")
return atten
except ImportError:
print("HMC472A: pyserial not installed (pip install pyserial)")
sys.exit(1)
except (OSError, TimeoutError) as e:
print(f"HMC472A: cannot open {target} ({e})")
sys.exit(1)
# HTTP REST API
atten = HMC472A(target)
try:
cfg = atten.config()
print(f"HMC472A: HTTP on {target} ({cfg.get('hostname', '?')}, "
f"v{cfg.get('version', '?')})")
return atten
except (URLError, OSError) as e:
print(f"HMC472A: cannot reach {target} ({e})")
print(" Use --attenuator /dev/ttyACMx (USB) or http://host (HTTP)")
sys.exit(1)
def main():
parser = build_parser()
args = parser.parse_args()
# Mock mode for testing without hardware
mock_mode = os.environ.get("SKYWALKER_MOCK")
# Set up HMC472A attenuator
if mock_mode:
atten = MockHMC472A()
print("HMC472A: mock mode")
else:
atten = _connect_attenuator(args.attenuator)
# Set up NanoVNA
nanovna = None
if mock_mode:
nanovna = MockNanoVNA()
print("NanoVNA: mock mode")
elif args.nanovna == "auto":
NanoVNA = try_import_nanovna()
if NanoVNA:
try:
nanovna = NanoVNA()
print(f"NanoVNA: auto mode (mcnanovna)")
except Exception as e:
print(f"NanoVNA: mcnanovna failed ({e}), falling back to manual")
else:
print("NanoVNA: mcnanovna not installed, using manual mode")
print(" Install: uv pip install -e /path/to/mcnanovna")
else:
print("NanoVNA: manual mode (you'll be prompted to set frequencies)")
# Load calibration
cal = {}
if args.cal:
cal = load_cal_file(args.cal)
print(f"Calibration: loaded {len(cal)} points from {args.cal}")
# Open CSV output
csv_file = None
writer = None
if args.output:
csv_file, writer = open_csv(args.output)
# Open SkyWalker-1
# SAFETY: Boot demodulator WITHOUT enabling LNB power. ensure_booted()
# transiently enables LNB voltage (13-18V on the F-connector), which
# would travel backward through the attenuator toward the NanoVNA.
# The DC blocker protects against this, but code should never rely on
# external protection it cannot verify.
if mock_mode:
sw = _make_mock_skywalker(args.verbose)
print("SkyWalker-1: mock mode")
else:
sw = SkyWalker1(verbose=args.verbose)
sw.open()
# Ensure LNB power is OFF before booting demodulator
sw.start_intersil(on=False)
status = sw.get_config()
if not (status & 0x01):
sw.boot(on=True)
time.sleep(0.5)
status = sw.get_config()
if not (status & 0x01):
print("ERROR: Device failed to start")
sys.exit(1)
print("SkyWalker-1: booted (LNB power kept OFF)")
# Confirm LNB power disabled — direct input mode
sw.start_intersil(on=False)
print("LNB power disabled (direct input mode)")
print()
try:
if args.test == "agc-linearity":
test_agc_linearity(sw, atten, nanovna, args.freq,
writer, csv_file, cal, args.settle)
elif args.test == "band-flatness":
test_band_flatness(sw, atten, nanovna, args.start, args.stop,
args.step, writer, csv_file, cal, args.settle)
elif args.test == "freq-accuracy":
freqs = [float(f) for f in args.freqs.split(",")]
test_freq_accuracy(sw, atten, nanovna, freqs,
writer, csv_file, args.settle)
elif args.test == "mds":
test_mds(sw, atten, nanovna, args.freq,
writer, csv_file, args.settle)
elif args.test == "bpsk-probe":
test_bpsk_probe(sw, atten, nanovna, args.freq,
writer, csv_file, args.settle)
except KeyboardInterrupt:
print("\n\nInterrupted by operator.")
finally:
# Safe state: maximum attenuation before releasing hardware
try:
atten.set_db(31.5)
print("Attenuator set to 31.5 dB (safe state)")
except Exception:
pass # best-effort on cleanup path
if csv_file:
csv_file.flush()
csv_file.close()
print(f"\nData saved to {args.output}")
if not mock_mode:
sw.close()
if __name__ == "__main__":
main()