HMC472ASerial class implements usb-serial-json-v1 protocol over the ESP32-S3's native USB CDC port. Auto-detection scans /dev/ttyACM* and probes with the identify command to find the right port. --attenuator flag now defaults to 'auto' (USB first, HTTP fallback). Also accepts direct serial port paths or HTTP URLs for explicit control.
951 lines
34 KiB
Python
951 lines
34 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
RF Test Bench — CW injection tests with NanoVNA + HMC472A + SkyWalker-1.
|
|
|
|
Injects CW signals from a NanoVNA-H through a programmable HMC472A digital
|
|
attenuator into the SkyWalker-1 receiver. Runs automated test sequences to
|
|
characterize AGC linearity, IF band flatness, frequency accuracy, minimum
|
|
detectable signal, and BPSK mode 9 behavior.
|
|
|
|
Hardware setup:
|
|
NanoVNA CH0 → DC Blocker → HMC472A → SMA-to-F → SkyWalker-1
|
|
|
|
The HMC472A (0-31.5 dB, 0.5 dB steps) is controlled via USB serial (preferred)
|
|
or REST API. The NanoVNA provides CW at a fixed frequency, controlled either
|
|
via mcnanovna or manually. The SkyWalker-1 measures AGC, SNR, and lock status.
|
|
|
|
Usage:
|
|
python rf_testbench.py agc-linearity --freq 1200
|
|
python rf_testbench.py band-flatness --start 950 --stop 1500 --step 10
|
|
python rf_testbench.py freq-accuracy --freqs 1000,1200,1400
|
|
python rf_testbench.py mds --freq 1200
|
|
python rf_testbench.py bpsk-probe --freq 1200
|
|
python rf_testbench.py --help
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import argparse
|
|
import csv
|
|
import json
|
|
import time
|
|
from datetime import datetime, timezone
|
|
from urllib.request import urlopen, Request
|
|
from urllib.error import URLError
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
from skywalker_lib import SkyWalker1, MODULATIONS
|
|
|
|
|
|
# --- HMC472A REST client ---
|
|
|
|
class HMC472A:
|
|
"""Control the HMC472A digital attenuator via its ESP32-S2 REST API."""
|
|
|
|
def __init__(self, base_url: str = "http://attenuator.local"):
|
|
self.base_url = base_url.rstrip("/")
|
|
|
|
def _get(self, path: str, retries: int = 3) -> dict:
|
|
url = f"{self.base_url}{path}"
|
|
req = Request(url)
|
|
last_err: OSError = OSError("no attempts made")
|
|
for attempt in range(retries):
|
|
try:
|
|
with urlopen(req, timeout=5) as resp:
|
|
return json.loads(resp.read())
|
|
except (URLError, OSError) as e:
|
|
last_err = e
|
|
if attempt < retries - 1:
|
|
time.sleep(0.2 * (attempt + 1))
|
|
raise last_err
|
|
|
|
def _post(self, path: str, data: dict, retries: int = 3) -> dict:
|
|
url = f"{self.base_url}{path}"
|
|
body = json.dumps(data).encode()
|
|
req = Request(url, data=body, method="POST",
|
|
headers={"Content-Type": "application/json"})
|
|
last_err: OSError = OSError("no attempts made")
|
|
for attempt in range(retries):
|
|
try:
|
|
with urlopen(req, timeout=5) as resp:
|
|
return json.loads(resp.read())
|
|
except (URLError, OSError) as e:
|
|
last_err = e
|
|
if attempt < retries - 1:
|
|
time.sleep(0.2 * (attempt + 1))
|
|
raise last_err
|
|
|
|
def status(self) -> dict:
|
|
return self._get("/status")
|
|
|
|
def set_db(self, attenuation_db: float) -> dict:
|
|
clamped = max(0.0, min(31.5, attenuation_db))
|
|
rounded = round(clamped * 2) / 2 # Snap to 0.5 dB steps
|
|
return self._post("/set", {"attenuation_db": rounded})
|
|
|
|
def config(self) -> dict:
|
|
return self._get("/config")
|
|
|
|
|
|
# --- HMC472A USB serial client ---
|
|
|
|
class HMC472ASerial:
|
|
"""Control the HMC472A digital attenuator via USB CDC serial.
|
|
|
|
Uses the usb-serial-json-v1 protocol: one JSON object per newline-
|
|
terminated line in each direction. Requires pyserial.
|
|
"""
|
|
|
|
def __init__(self, port: str, baudrate: int = 115200, timeout: float = 1.0):
|
|
import serial
|
|
self.ser = serial.Serial(port, baudrate, timeout=timeout)
|
|
self.ser.reset_input_buffer()
|
|
|
|
def close(self):
|
|
if self.ser and self.ser.is_open:
|
|
self.ser.close()
|
|
|
|
def _cmd(self, command: dict) -> dict:
|
|
line = json.dumps(command, separators=(",", ":")) + "\n"
|
|
self.ser.write(line.encode())
|
|
self.ser.flush()
|
|
resp_line = self.ser.readline()
|
|
if not resp_line:
|
|
raise TimeoutError("no response from HMC472A")
|
|
resp = json.loads(resp_line)
|
|
if not resp.get("ok"):
|
|
raise RuntimeError(resp.get("error", "unknown error"))
|
|
return resp
|
|
|
|
def status(self) -> dict:
|
|
return self._cmd({"cmd": "status"})
|
|
|
|
def set_db(self, attenuation_db: float) -> dict:
|
|
clamped = max(0.0, min(31.5, attenuation_db))
|
|
rounded = round(clamped * 2) / 2
|
|
return self._cmd({"cmd": "set", "db": rounded})
|
|
|
|
def config(self) -> dict:
|
|
return self._cmd({"cmd": "config"})
|
|
|
|
def identify(self) -> dict:
|
|
return self._cmd({"cmd": "identify"})
|
|
|
|
|
|
def detect_hmc472a_serial() -> str | None:
|
|
"""Scan /dev/ttyACM* ports for an HMC472A responding to identify.
|
|
|
|
Returns the port path if found, None otherwise.
|
|
"""
|
|
import glob
|
|
try:
|
|
import serial
|
|
except ImportError:
|
|
return None
|
|
|
|
ports = sorted(glob.glob("/dev/ttyACM*"))
|
|
for port in ports:
|
|
try:
|
|
ser = serial.Serial(port, 115200, timeout=0.5)
|
|
ser.reset_input_buffer()
|
|
ser.write(b'{"cmd":"identify"}\n')
|
|
ser.flush()
|
|
resp_line = ser.readline()
|
|
ser.close()
|
|
if resp_line:
|
|
resp = json.loads(resp_line)
|
|
if resp.get("device") == "hmc472a-attenuator":
|
|
return port
|
|
except (OSError, json.JSONDecodeError, ValueError):
|
|
continue
|
|
return None
|
|
|
|
|
|
class MockSkyWalker1:
|
|
"""Lightweight mock SkyWalker-1 for rf_testbench testing."""
|
|
|
|
def __init__(self, verbose=False):
|
|
self.verbose = verbose
|
|
self._freq_khz = 0
|
|
|
|
def open(self):
|
|
pass
|
|
|
|
def close(self):
|
|
pass
|
|
|
|
def ensure_booted(self):
|
|
pass
|
|
|
|
def start_intersil(self, on=True):
|
|
pass
|
|
|
|
def tune_monitor(self, symbol_rate_sps=1000000, freq_khz=1200000,
|
|
mod_index=0, fec_index=5, dwell_ms=10):
|
|
self._freq_khz = freq_khz
|
|
# Simulate AGC response: higher freq → slightly lower power
|
|
base_agc1 = 1200 + (freq_khz - 1200000) // 100
|
|
return {
|
|
"snr_raw": 180, "snr_db": 7.8, "snr_pct": 39.0,
|
|
"agc1": max(100, base_agc1), "agc2": 750,
|
|
"power_db": -46.1 - (freq_khz - 1200000) / 500000,
|
|
"locked": False, "lock": 0x00, "status": 0x01,
|
|
"dwell_ms": dwell_ms,
|
|
}
|
|
|
|
def signal_monitor(self):
|
|
return {
|
|
"snr_raw": 200, "snr_db": 8.5, "snr_pct": 42.5,
|
|
"agc1": 1200, "agc2": 800, "power_db": -45.3,
|
|
"locked": False, "lock": 0x00, "status": 0x01,
|
|
}
|
|
|
|
def sweep_spectrum(self, start_mhz, stop_mhz, step_mhz=5.0,
|
|
dwell_ms=15, sr_ksps=1000, mod_index=0, fec_index=5):
|
|
n = int((stop_mhz - start_mhz) / step_mhz) + 1
|
|
freqs = [start_mhz + i * step_mhz for i in range(n)]
|
|
powers = [-50.0 + 3.0 * (1.0 - abs(f - 1200) / 300) for f in freqs]
|
|
raw = [{"agc1": 1200, "agc2": 750, "power_db": p,
|
|
"snr_raw": 0, "snr_db": 0, "locked": False,
|
|
"lock": 0, "status": 0} for p in powers]
|
|
return freqs, powers, raw
|
|
|
|
|
|
def _make_mock_skywalker(verbose=False):
|
|
sw = MockSkyWalker1(verbose=verbose)
|
|
sw.open()
|
|
sw.ensure_booted()
|
|
return sw
|
|
|
|
|
|
class MockHMC472A:
|
|
"""Mock attenuator for testing without hardware."""
|
|
|
|
def __init__(self, base_url: str = "http://mock.local"):
|
|
self.base_url = base_url
|
|
self._db = 0.0
|
|
|
|
def status(self) -> dict:
|
|
step = int(self._db * 2)
|
|
return {"attenuation_db": self._db, "step": step, "version": "mock"}
|
|
|
|
def set_db(self, attenuation_db: float) -> dict:
|
|
self._db = max(0.0, min(31.5, round(attenuation_db * 2) / 2))
|
|
return self.status()
|
|
|
|
def config(self) -> dict:
|
|
return {"db_min": 0.0, "db_max": 31.5, "db_step": 0.5,
|
|
"version": "mock", "hostname": "mock-attenuator"}
|
|
|
|
|
|
# --- NanoVNA control ---
|
|
|
|
def try_import_nanovna():
|
|
"""Try to import mcnanovna for automated NanoVNA control."""
|
|
try:
|
|
from mcnanovna.nanovna import NanoVNA
|
|
return NanoVNA
|
|
except ImportError:
|
|
return None
|
|
|
|
|
|
class MockNanoVNA:
|
|
"""Mock NanoVNA for testing without hardware."""
|
|
|
|
def cw(self, frequency_hz: int = 0, power: int = 3):
|
|
pass
|
|
|
|
|
|
def manual_nanovna_set(freq_mhz: float, power: int = 3) -> None:
|
|
"""Prompt the user to manually set NanoVNA CW frequency."""
|
|
print(f"\n >>> Set NanoVNA to CW at {freq_mhz:.3f} MHz, power={power}")
|
|
input(" Press Enter when ready...")
|
|
|
|
|
|
# --- CSV output ---
|
|
|
|
CSV_COLUMNS = [
|
|
"timestamp", "test_name", "freq_mhz", "atten_db",
|
|
"agc1", "agc2", "power_db", "snr_raw", "snr_db",
|
|
"locked", "lock_raw", "status", "notes",
|
|
]
|
|
|
|
|
|
def open_csv(path: str):
|
|
f = open(path, "w", newline="")
|
|
writer = csv.DictWriter(f, fieldnames=CSV_COLUMNS)
|
|
writer.writeheader()
|
|
return f, writer
|
|
|
|
|
|
def write_row(writer, csv_file, test_name: str, freq_mhz: float,
|
|
atten_db: float, result: dict, notes: str = ""):
|
|
writer.writerow({
|
|
"timestamp": datetime.now(timezone.utc).isoformat(),
|
|
"test_name": test_name,
|
|
"freq_mhz": f"{freq_mhz:.3f}",
|
|
"atten_db": f"{atten_db:.1f}",
|
|
"agc1": result.get("agc1", 0),
|
|
"agc2": result.get("agc2", 0),
|
|
"power_db": f"{result.get('power_db', 0):.2f}",
|
|
"snr_raw": result.get("snr_raw", 0),
|
|
"snr_db": f"{result.get('snr_db', 0):.2f}",
|
|
"locked": result.get("locked", False),
|
|
"lock_raw": f"0x{result.get('lock', 0):02X}",
|
|
"status": f"0x{result.get('status', 0):02X}",
|
|
"notes": notes,
|
|
})
|
|
if csv_file:
|
|
csv_file.flush()
|
|
|
|
|
|
# --- Calibration ---
|
|
|
|
def load_cal_file(path: str) -> dict:
|
|
"""Load a NanoVNA S21 path-loss calibration CSV.
|
|
|
|
Expects columns: frequency_hz (or freq_mhz), s21_db (or loss_db).
|
|
Returns dict mapping freq_mhz -> loss_db (positive = loss).
|
|
"""
|
|
cal = {}
|
|
with open(path) as f:
|
|
reader = csv.DictReader(f)
|
|
for row in reader:
|
|
if "freq_mhz" in row:
|
|
freq = float(row["freq_mhz"])
|
|
elif "frequency_hz" in row:
|
|
freq = float(row["frequency_hz"]) / 1e6
|
|
else:
|
|
continue
|
|
|
|
if "loss_db" in row:
|
|
loss = float(row["loss_db"])
|
|
elif "s21_db" in row:
|
|
loss = -float(row["s21_db"]) # S21 is negative, loss is positive
|
|
else:
|
|
continue
|
|
cal[freq] = loss
|
|
return cal
|
|
|
|
|
|
def interpolate_loss(cal: dict, freq_mhz: float) -> float:
|
|
"""Interpolate path loss at a frequency from cal data."""
|
|
if not cal:
|
|
return 0.0
|
|
freqs = sorted(cal.keys())
|
|
if freq_mhz <= freqs[0]:
|
|
return cal[freqs[0]]
|
|
if freq_mhz >= freqs[-1]:
|
|
return cal[freqs[-1]]
|
|
for i in range(len(freqs) - 1):
|
|
if freqs[i] <= freq_mhz <= freqs[i + 1]:
|
|
f0, f1 = freqs[i], freqs[i + 1]
|
|
t = (freq_mhz - f0) / (f1 - f0)
|
|
return cal[f0] + t * (cal[f1] - cal[f0])
|
|
return 0.0
|
|
|
|
|
|
# --- Test: AGC Power Linearity ---
|
|
|
|
def test_agc_linearity(sw, atten, nanovna, freq_mhz: float,
|
|
writer, csv_file, cal: dict, settle_ms: int) -> list:
|
|
"""Sweep attenuator from 0 to 31.5 dB and record AGC at each step.
|
|
|
|
Maps the AGC transfer function: how AGC register values respond to
|
|
known changes in input power.
|
|
"""
|
|
print(f"\n=== AGC Linearity Test at {freq_mhz:.1f} MHz ===")
|
|
results = []
|
|
path_loss = interpolate_loss(cal, freq_mhz)
|
|
if path_loss > 0:
|
|
print(f" Calibrated path loss: {path_loss:.1f} dB")
|
|
|
|
# Set NanoVNA to CW at the test frequency
|
|
if nanovna:
|
|
nanovna.cw(frequency_hz=int(freq_mhz * 1e6), power=3)
|
|
print(f" NanoVNA CW: {freq_mhz:.3f} MHz, power=3")
|
|
else:
|
|
manual_nanovna_set(freq_mhz, power=3)
|
|
|
|
# Tune SkyWalker-1 to the frequency
|
|
freq_khz = int(freq_mhz * 1000)
|
|
|
|
print(f"\n {'Atten dB':>9} {'AGC1':>6} {'AGC2':>6} {'Power dB':>9} "
|
|
f"{'SNR raw':>8} {'Lock':>5}")
|
|
print(f" {'─' * 9} {'─' * 6} {'─' * 6} {'─' * 9} {'─' * 8} {'─' * 5}")
|
|
|
|
# Sweep in 0.5 dB steps from 0 to 31.5 dB (64 steps)
|
|
# Use integer step counter to avoid IEEE 754 float accumulation drift
|
|
for step in range(64): # 0, 1, 2, ... 63 → 0.0, 0.5, 1.0, ... 31.5
|
|
atten_db = step * 0.5
|
|
atten.set_db(atten_db)
|
|
time.sleep(settle_ms / 1000.0)
|
|
|
|
result = sw.tune_monitor(
|
|
symbol_rate_sps=1000000, freq_khz=freq_khz,
|
|
mod_index=0, fec_index=5, dwell_ms=50
|
|
)
|
|
|
|
locked = "Y" if result.get("locked") else "N"
|
|
print(f" {atten_db:9.1f} {result['agc1']:6d} {result['agc2']:6d} "
|
|
f"{result['power_db']:9.2f} {result['snr_raw']:8d} {locked:>5}")
|
|
|
|
effective_atten = atten_db + path_loss
|
|
note = f"effective_atten={effective_atten:.1f}dB"
|
|
if writer:
|
|
write_row(writer, csv_file, "agc_linearity", freq_mhz, atten_db,
|
|
result, note)
|
|
|
|
results.append({"atten_db": atten_db, **result})
|
|
|
|
# Summary
|
|
if results:
|
|
agc1_min = min(r["agc1"] for r in results)
|
|
agc1_max = max(r["agc1"] for r in results)
|
|
print(f"\n AGC1 range: {agc1_min} - {agc1_max} "
|
|
f"(delta={agc1_max - agc1_min}) over 31.5 dB sweep")
|
|
|
|
return results
|
|
|
|
|
|
# --- Test: IF Band Flatness ---
|
|
|
|
def test_band_flatness(sw, atten, nanovna, start_mhz: float,
|
|
stop_mhz: float, step_mhz: float,
|
|
writer, csv_file, cal: dict, settle_ms: int) -> list:
|
|
"""Sweep CW across the IF band and record AGC at each frequency.
|
|
|
|
Reveals tuner gain slope, passband ripple, and the IF filter response.
|
|
"""
|
|
atten_db = 10.0 # Fixed attenuation — mid-range for good dynamic range
|
|
print(f"\n=== IF Band Flatness: {start_mhz:.0f}-{stop_mhz:.0f} MHz, "
|
|
f"step={step_mhz:.1f} MHz ===")
|
|
print(f" HMC472A fixed at {atten_db:.1f} dB")
|
|
|
|
atten.set_db(atten_db)
|
|
results = []
|
|
|
|
# Use integer step counter to avoid float accumulation drift
|
|
n_steps = int(round((stop_mhz - start_mhz) / step_mhz)) + 1
|
|
|
|
print(f"\n {'Step':>5} {'Freq MHz':>9} {'AGC1':>6} {'AGC2':>6} "
|
|
f"{'Power dB':>9} {'PathLoss':>9} {'Corr dB':>8}")
|
|
print(f" {'─' * 5} {'─' * 9} {'─' * 6} {'─' * 6} "
|
|
f"{'─' * 9} {'─' * 9} {'─' * 8}")
|
|
|
|
for step_num in range(n_steps):
|
|
freq_mhz = start_mhz + step_num * step_mhz
|
|
|
|
# Set NanoVNA CW
|
|
if nanovna:
|
|
nanovna.cw(frequency_hz=int(freq_mhz * 1e6), power=3)
|
|
else:
|
|
manual_nanovna_set(freq_mhz, power=3)
|
|
|
|
time.sleep(settle_ms / 1000.0)
|
|
|
|
# Tune SkyWalker-1
|
|
freq_khz = int(freq_mhz * 1000)
|
|
result = sw.tune_monitor(
|
|
symbol_rate_sps=1000000, freq_khz=freq_khz,
|
|
mod_index=0, fec_index=5, dwell_ms=50
|
|
)
|
|
|
|
path_loss = interpolate_loss(cal, freq_mhz)
|
|
corrected = result["power_db"] + path_loss
|
|
|
|
print(f" {step_num + 1:5d} {freq_mhz:9.1f} {result['agc1']:6d} "
|
|
f"{result['agc2']:6d} {result['power_db']:9.2f} "
|
|
f"{path_loss:9.1f} {corrected:8.2f}")
|
|
|
|
note = f"path_loss={path_loss:.1f}dB corrected={corrected:.2f}dB"
|
|
if writer:
|
|
write_row(writer, csv_file, "band_flatness", freq_mhz, atten_db,
|
|
result, note)
|
|
|
|
results.append({"freq_mhz": freq_mhz, "corrected_db": corrected, **result})
|
|
|
|
# Summary
|
|
if results:
|
|
powers = [r["corrected_db"] for r in results]
|
|
ripple = max(powers) - min(powers)
|
|
print(f"\n Band flatness: {ripple:.2f} dB ripple "
|
|
f"(min={min(powers):.2f}, max={max(powers):.2f})")
|
|
|
|
return results
|
|
|
|
|
|
# --- Test: Frequency Accuracy ---
|
|
|
|
def test_freq_accuracy(sw, atten, nanovna, test_freqs: list,
|
|
writer, csv_file, settle_ms: int) -> list:
|
|
"""Inject CW at known frequencies, sweep SkyWalker-1 around each one.
|
|
|
|
Compares detected peak vs. injected frequency to characterize the
|
|
BCM3440 tuner's frequency accuracy.
|
|
"""
|
|
print(f"\n=== Frequency Accuracy Test ===")
|
|
atten_db = 10.0
|
|
atten.set_db(atten_db)
|
|
|
|
results = []
|
|
sweep_span_mhz = 10.0 # Sweep +/- 5 MHz around each test freq
|
|
sweep_step_mhz = 1.0
|
|
|
|
for inject_freq in test_freqs:
|
|
print(f"\n Injecting CW at {inject_freq:.3f} MHz...")
|
|
if nanovna:
|
|
nanovna.cw(frequency_hz=int(inject_freq * 1e6), power=3)
|
|
else:
|
|
manual_nanovna_set(inject_freq, power=3)
|
|
|
|
time.sleep(settle_ms / 1000.0)
|
|
|
|
# Sweep around the expected frequency
|
|
sweep_start = inject_freq - sweep_span_mhz / 2
|
|
sweep_stop = inject_freq + sweep_span_mhz / 2
|
|
|
|
freqs, powers, raw = sw.sweep_spectrum(
|
|
sweep_start, sweep_stop,
|
|
step_mhz=sweep_step_mhz, dwell_ms=50,
|
|
sr_ksps=1000, mod_index=0, fec_index=5,
|
|
)
|
|
|
|
# Find peak
|
|
if powers:
|
|
peak_idx = max(range(len(powers)), key=lambda i: powers[i])
|
|
peak_freq = freqs[peak_idx]
|
|
peak_power = powers[peak_idx]
|
|
error_mhz = peak_freq - inject_freq
|
|
error_khz = error_mhz * 1000
|
|
|
|
print(f" Injected: {inject_freq:.3f} MHz "
|
|
f"Detected peak: {peak_freq:.3f} MHz "
|
|
f"Error: {error_khz:+.0f} kHz")
|
|
|
|
result_entry = {
|
|
"inject_freq_mhz": inject_freq,
|
|
"peak_freq_mhz": peak_freq,
|
|
"error_khz": error_khz,
|
|
"peak_power_db": peak_power,
|
|
}
|
|
results.append(result_entry)
|
|
|
|
if writer:
|
|
peak_result = raw[peak_idx] if isinstance(raw[peak_idx], dict) else {
|
|
"agc1": 0, "agc2": 0, "power_db": peak_power,
|
|
"snr_raw": 0, "snr_db": 0,
|
|
"locked": False, "lock": 0, "status": 0,
|
|
}
|
|
write_row(writer, csv_file, "freq_accuracy", inject_freq,
|
|
atten_db, peak_result,
|
|
f"peak={peak_freq:.3f}MHz error={error_khz:+.0f}kHz")
|
|
|
|
# Summary
|
|
if results:
|
|
errors = [r["error_khz"] for r in results]
|
|
mean_err = sum(errors) / len(errors)
|
|
max_err = max(abs(e) for e in errors)
|
|
print(f"\n Mean frequency error: {mean_err:+.0f} kHz")
|
|
print(f" Max absolute error: {max_err:.0f} kHz")
|
|
|
|
return results
|
|
|
|
|
|
# --- Test: Minimum Detectable Signal ---
|
|
|
|
def test_mds(sw, atten, nanovna, freq_mhz: float,
|
|
writer, csv_file, settle_ms: int) -> dict:
|
|
"""Find the minimum detectable signal level.
|
|
|
|
Measures noise floor with NanoVNA off (or max attenuation), then
|
|
increases attenuation from 0 until the CW signal is indistinguishable
|
|
from noise.
|
|
"""
|
|
print(f"\n=== Minimum Detectable Signal at {freq_mhz:.1f} MHz ===")
|
|
freq_khz = int(freq_mhz * 1000)
|
|
|
|
# Step 1: measure noise floor (max attenuation)
|
|
print(" Measuring noise floor (31.5 dB attenuation)...")
|
|
atten.set_db(31.5)
|
|
time.sleep(0.2)
|
|
|
|
noise_readings = []
|
|
for _ in range(10):
|
|
r = sw.tune_monitor(1000000, freq_khz, 0, 5, 50)
|
|
noise_readings.append(r["power_db"])
|
|
time.sleep(0.05)
|
|
|
|
noise_floor = sum(noise_readings) / len(noise_readings)
|
|
noise_std = (sum((x - noise_floor) ** 2 for x in noise_readings)
|
|
/ len(noise_readings)) ** 0.5
|
|
threshold = noise_floor + max(3.0 * noise_std, 1.0) # 3-sigma above noise
|
|
print(f" Noise floor: {noise_floor:.2f} dB (std={noise_std:.3f})")
|
|
print(f" Detection threshold: {threshold:.2f} dB (noise + 3sigma)")
|
|
|
|
# Step 2: inject CW and increase attenuation until signal disappears
|
|
if nanovna:
|
|
nanovna.cw(frequency_hz=int(freq_mhz * 1e6), power=3)
|
|
print(f" NanoVNA CW: {freq_mhz:.3f} MHz, power=3")
|
|
else:
|
|
manual_nanovna_set(freq_mhz, power=3)
|
|
|
|
print(f"\n {'Atten dB':>9} {'Power dB':>9} {'Above noise':>12} {'Detected':>9}")
|
|
print(f" {'─' * 9} {'─' * 9} {'─' * 12} {'─' * 9}")
|
|
|
|
mds_atten = None
|
|
# 1 dB steps: 0, 1, 2, ... 31 (32 steps)
|
|
for step in range(32):
|
|
atten_db = float(step)
|
|
atten.set_db(atten_db)
|
|
time.sleep(settle_ms / 1000.0)
|
|
|
|
# Average 5 readings for stability
|
|
readings = []
|
|
r = sw.tune_monitor(1000000, freq_khz, 0, 5, 50)
|
|
readings.append(r["power_db"])
|
|
for _ in range(4):
|
|
time.sleep(0.02)
|
|
r = sw.tune_monitor(1000000, freq_khz, 0, 5, 50)
|
|
readings.append(r["power_db"])
|
|
|
|
avg_power = sum(readings) / len(readings)
|
|
above_noise = avg_power - noise_floor
|
|
detected = avg_power > threshold
|
|
|
|
marker = "YES" if detected else "---"
|
|
print(f" {atten_db:9.1f} {avg_power:9.2f} {above_noise:+12.2f} "
|
|
f"{marker:>9}")
|
|
|
|
if writer:
|
|
# Use averaged power instead of last single reading
|
|
avg_result = dict(r)
|
|
avg_result["power_db"] = avg_power
|
|
write_row(writer, csv_file, "mds", freq_mhz, atten_db,
|
|
avg_result,
|
|
f"avg={avg_power:.2f} noise={noise_floor:.2f} "
|
|
f"detected={'Y' if detected else 'N'}")
|
|
|
|
if not detected and mds_atten is None:
|
|
mds_atten = atten_db
|
|
|
|
result = {
|
|
"freq_mhz": freq_mhz,
|
|
"noise_floor_db": noise_floor,
|
|
"noise_std": noise_std,
|
|
"threshold_db": threshold,
|
|
"mds_atten_db": mds_atten,
|
|
}
|
|
|
|
if mds_atten is not None:
|
|
print(f"\n Signal lost at {mds_atten:.1f} dB attenuation")
|
|
print(f" (NanoVNA output ~-15 dBm minus {mds_atten:.1f} dB path = "
|
|
f"~{-15 - mds_atten:.0f} dBm at receiver)")
|
|
else:
|
|
print(f"\n Signal detected at all attenuation levels (0-31.5 dB)")
|
|
print(f" Need more attenuation to find MDS")
|
|
|
|
return result
|
|
|
|
|
|
# --- Test: BPSK Mode 9 CW Probe ---
|
|
|
|
def test_bpsk_probe(sw, atten, nanovna, freq_mhz: float,
|
|
writer, csv_file, settle_ms: int) -> dict:
|
|
"""Probe BPSK mode 9 response to an unmodulated CW carrier.
|
|
|
|
BPSK mode (index 9) uses Viterbi rate 1/2 K=7 — the same inner FEC
|
|
as GOES LRIT. A CW carrier has no modulation, so the demodulator
|
|
shouldn't lock, but the AGC and carrier recovery behavior reveals
|
|
how mode 9 handles a clean carrier.
|
|
"""
|
|
print(f"\n=== BPSK Mode 9 CW Probe at {freq_mhz:.1f} MHz ===")
|
|
bpsk_index = MODULATIONS["bpsk"][0] # Mode 9
|
|
freq_khz = int(freq_mhz * 1000)
|
|
atten_db = 10.0
|
|
atten.set_db(atten_db)
|
|
|
|
if nanovna:
|
|
nanovna.cw(frequency_hz=int(freq_mhz * 1e6), power=3)
|
|
else:
|
|
manual_nanovna_set(freq_mhz, power=3)
|
|
|
|
time.sleep(settle_ms / 1000.0)
|
|
|
|
# Test with different symbol rates typical of LRIT-like signals
|
|
test_rates = [293883, 500000, 1000000, 5000000]
|
|
|
|
print(f"\n {'SR (sps)':>10} {'AGC1':>6} {'AGC2':>6} {'Power dB':>9} "
|
|
f"{'SNR raw':>8} {'SNR dB':>7} {'Lock':>6} {'Status':>8}")
|
|
print(f" {'─' * 10} {'─' * 6} {'─' * 6} {'─' * 9} "
|
|
f"{'─' * 8} {'─' * 7} {'─' * 6} {'─' * 8}")
|
|
|
|
results = []
|
|
for sr in test_rates:
|
|
# FEC 1/2 (index 0) for BPSK mode
|
|
result = sw.tune_monitor(sr, freq_khz, bpsk_index, 0, dwell_ms=100)
|
|
|
|
locked = "Y" if result.get("locked") else "N"
|
|
print(f" {sr:10d} {result['agc1']:6d} {result['agc2']:6d} "
|
|
f"{result['power_db']:9.2f} {result['snr_raw']:8d} "
|
|
f"{result['snr_db']:7.2f} {locked:>6} "
|
|
f"0x{result.get('status', 0):02X}")
|
|
|
|
if writer:
|
|
write_row(writer, csv_file, "bpsk_probe", freq_mhz, atten_db,
|
|
result, f"mode=bpsk sr={sr} fec=1/2")
|
|
|
|
results.append({"symbol_rate": sr, **result})
|
|
|
|
# Compare with QPSK mode 0 at same settings
|
|
print(f"\n Reference: QPSK mode 0 at same frequency")
|
|
ref = sw.tune_monitor(1000000, freq_khz, 0, 5, dwell_ms=100)
|
|
ref_locked = "Y" if ref.get("locked") else "N"
|
|
print(f" {'1000000':>10} {ref['agc1']:6d} {ref['agc2']:6d} "
|
|
f"{ref['power_db']:9.2f} {ref['snr_raw']:8d} "
|
|
f"{ref['snr_db']:7.2f} {ref_locked:>6} "
|
|
f"0x{ref.get('status', 0):02X}")
|
|
|
|
return {"bpsk_results": results, "qpsk_reference": ref}
|
|
|
|
|
|
# --- Main ---
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
|
|
parser = argparse.ArgumentParser(
|
|
prog="rf_testbench.py",
|
|
description="CW injection test bench: NanoVNA + HMC472A + SkyWalker-1",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog="""\
|
|
examples:
|
|
%(prog)s agc-linearity --freq 1200
|
|
%(prog)s band-flatness --start 950 --stop 1500 --step 10
|
|
%(prog)s freq-accuracy --freqs 1000,1200,1400
|
|
%(prog)s mds --freq 1200
|
|
%(prog)s bpsk-probe --freq 1200
|
|
%(prog)s band-flatness --attenuator /dev/ttyACM1
|
|
%(prog)s agc-linearity --attenuator http://attenuator.local --freq 1200
|
|
|
|
hardware setup:
|
|
NanoVNA CH0 → DC Blocker → HMC472A (0-31.5 dB) → SMA-to-F → SkyWalker-1
|
|
|
|
The HMC472A is controlled via USB serial (preferred) or HTTP REST API.
|
|
Use --attenuator auto (default) to auto-detect USB, falling back to HTTP.
|
|
The NanoVNA provides CW, controlled via mcnanovna or manually.
|
|
LNB power is disabled (direct L-band input mode).
|
|
""",
|
|
)
|
|
|
|
parser.add_argument("-v", "--verbose", action="store_true",
|
|
help="Show raw USB traffic")
|
|
parser.add_argument("-o", "--output", type=str, default=None,
|
|
help="CSV output file path")
|
|
parser.add_argument("--cal", type=str, default=None,
|
|
help="Path loss calibration CSV (NanoVNA S21 sweep)")
|
|
parser.add_argument("--attenuator", type=str, default="auto",
|
|
help="HMC472A connection: 'auto' (USB then HTTP), "
|
|
"/dev/ttyACMx (USB serial), or http://host (REST) "
|
|
"(default: auto)")
|
|
parser.add_argument("--nanovna", choices=["auto", "manual"],
|
|
default="auto",
|
|
help="NanoVNA control mode (default: auto via mcnanovna)")
|
|
parser.add_argument("--settle", type=int, default=200,
|
|
help="Settle time in ms after changing attenuation "
|
|
"(default: 200)")
|
|
|
|
sub = parser.add_subparsers(dest="test", required=True)
|
|
|
|
# AGC linearity
|
|
p_agc = sub.add_parser("agc-linearity",
|
|
help="Sweep attenuation at fixed freq, map AGC curve")
|
|
p_agc.add_argument("--freq", type=float, default=1200.0,
|
|
help="Test frequency in MHz (default: 1200)")
|
|
|
|
# Band flatness
|
|
p_band = sub.add_parser("band-flatness",
|
|
help="Sweep CW across IF band, measure AGC response")
|
|
p_band.add_argument("--start", type=float, default=950.0,
|
|
help="Start frequency in MHz (default: 950)")
|
|
p_band.add_argument("--stop", type=float, default=1500.0,
|
|
help="Stop frequency in MHz (default: 1500)")
|
|
p_band.add_argument("--step", type=float, default=10.0,
|
|
help="Frequency step in MHz (default: 10)")
|
|
|
|
# Frequency accuracy
|
|
p_freq = sub.add_parser("freq-accuracy",
|
|
help="Inject CW at known freqs, measure error")
|
|
p_freq.add_argument("--freqs", type=str, default="1000,1100,1200,1300,1400",
|
|
help="Comma-separated test frequencies in MHz")
|
|
|
|
# Minimum detectable signal
|
|
p_mds = sub.add_parser("mds",
|
|
help="Find minimum detectable signal level")
|
|
p_mds.add_argument("--freq", type=float, default=1200.0,
|
|
help="Test frequency in MHz (default: 1200)")
|
|
|
|
# BPSK mode 9 probe
|
|
p_bpsk = sub.add_parser("bpsk-probe",
|
|
help="Probe BPSK mode 9 with CW carrier")
|
|
p_bpsk.add_argument("--freq", type=float, default=1200.0,
|
|
help="Test frequency in MHz (default: 1200)")
|
|
|
|
return parser
|
|
|
|
|
|
def _connect_attenuator(target: str):
|
|
"""Connect to HMC472A via auto-detect, USB serial, or HTTP REST.
|
|
|
|
Args:
|
|
target: "auto", a serial port path (/dev/ttyACM*), or an HTTP URL.
|
|
"""
|
|
# Auto-detect: try USB serial first, fall back to HTTP
|
|
if target == "auto":
|
|
port = detect_hmc472a_serial()
|
|
if port:
|
|
print(f"HMC472A: auto-detected USB serial on {port}")
|
|
target = port
|
|
else:
|
|
print("HMC472A: no USB device found, trying HTTP...")
|
|
target = "http://attenuator.local"
|
|
|
|
# USB serial path
|
|
if target.startswith("/dev/"):
|
|
try:
|
|
atten = HMC472ASerial(target)
|
|
info = atten.identify()
|
|
print(f"HMC472A: USB serial on {target} "
|
|
f"(v{info.get('version', '?')}, "
|
|
f"protocol {info.get('protocol', '?')})")
|
|
return atten
|
|
except ImportError:
|
|
print("HMC472A: pyserial not installed (pip install pyserial)")
|
|
sys.exit(1)
|
|
except (OSError, TimeoutError) as e:
|
|
print(f"HMC472A: cannot open {target} ({e})")
|
|
sys.exit(1)
|
|
|
|
# HTTP REST API
|
|
atten = HMC472A(target)
|
|
try:
|
|
cfg = atten.config()
|
|
print(f"HMC472A: HTTP on {target} ({cfg.get('hostname', '?')}, "
|
|
f"v{cfg.get('version', '?')})")
|
|
return atten
|
|
except (URLError, OSError) as e:
|
|
print(f"HMC472A: cannot reach {target} ({e})")
|
|
print(" Use --attenuator /dev/ttyACMx (USB) or http://host (HTTP)")
|
|
sys.exit(1)
|
|
|
|
|
|
def main():
|
|
parser = build_parser()
|
|
args = parser.parse_args()
|
|
|
|
# Mock mode for testing without hardware
|
|
mock_mode = os.environ.get("SKYWALKER_MOCK")
|
|
|
|
# Set up HMC472A attenuator
|
|
if mock_mode:
|
|
atten = MockHMC472A()
|
|
print("HMC472A: mock mode")
|
|
else:
|
|
atten = _connect_attenuator(args.attenuator)
|
|
|
|
# Set up NanoVNA
|
|
nanovna = None
|
|
if mock_mode:
|
|
nanovna = MockNanoVNA()
|
|
print("NanoVNA: mock mode")
|
|
elif args.nanovna == "auto":
|
|
NanoVNA = try_import_nanovna()
|
|
if NanoVNA:
|
|
try:
|
|
nanovna = NanoVNA()
|
|
print(f"NanoVNA: auto mode (mcnanovna)")
|
|
except Exception as e:
|
|
print(f"NanoVNA: mcnanovna failed ({e}), falling back to manual")
|
|
else:
|
|
print("NanoVNA: mcnanovna not installed, using manual mode")
|
|
print(" Install: uv pip install -e /path/to/mcnanovna")
|
|
else:
|
|
print("NanoVNA: manual mode (you'll be prompted to set frequencies)")
|
|
|
|
# Load calibration
|
|
cal = {}
|
|
if args.cal:
|
|
cal = load_cal_file(args.cal)
|
|
print(f"Calibration: loaded {len(cal)} points from {args.cal}")
|
|
|
|
# Open CSV output
|
|
csv_file = None
|
|
writer = None
|
|
if args.output:
|
|
csv_file, writer = open_csv(args.output)
|
|
|
|
# Open SkyWalker-1
|
|
# SAFETY: Boot demodulator WITHOUT enabling LNB power. ensure_booted()
|
|
# transiently enables LNB voltage (13-18V on the F-connector), which
|
|
# would travel backward through the attenuator toward the NanoVNA.
|
|
# The DC blocker protects against this, but code should never rely on
|
|
# external protection it cannot verify.
|
|
if mock_mode:
|
|
sw = _make_mock_skywalker(args.verbose)
|
|
print("SkyWalker-1: mock mode")
|
|
else:
|
|
sw = SkyWalker1(verbose=args.verbose)
|
|
sw.open()
|
|
# Ensure LNB power is OFF before booting demodulator
|
|
sw.start_intersil(on=False)
|
|
status = sw.get_config()
|
|
if not (status & 0x01):
|
|
sw.boot(on=True)
|
|
time.sleep(0.5)
|
|
status = sw.get_config()
|
|
if not (status & 0x01):
|
|
print("ERROR: Device failed to start")
|
|
sys.exit(1)
|
|
print("SkyWalker-1: booted (LNB power kept OFF)")
|
|
|
|
# Confirm LNB power disabled — direct input mode
|
|
sw.start_intersil(on=False)
|
|
print("LNB power disabled (direct input mode)")
|
|
print()
|
|
|
|
try:
|
|
if args.test == "agc-linearity":
|
|
test_agc_linearity(sw, atten, nanovna, args.freq,
|
|
writer, csv_file, cal, args.settle)
|
|
elif args.test == "band-flatness":
|
|
test_band_flatness(sw, atten, nanovna, args.start, args.stop,
|
|
args.step, writer, csv_file, cal, args.settle)
|
|
elif args.test == "freq-accuracy":
|
|
freqs = [float(f) for f in args.freqs.split(",")]
|
|
test_freq_accuracy(sw, atten, nanovna, freqs,
|
|
writer, csv_file, args.settle)
|
|
elif args.test == "mds":
|
|
test_mds(sw, atten, nanovna, args.freq,
|
|
writer, csv_file, args.settle)
|
|
elif args.test == "bpsk-probe":
|
|
test_bpsk_probe(sw, atten, nanovna, args.freq,
|
|
writer, csv_file, args.settle)
|
|
except KeyboardInterrupt:
|
|
print("\n\nInterrupted by operator.")
|
|
finally:
|
|
# Safe state: maximum attenuation before releasing hardware
|
|
try:
|
|
atten.set_db(31.5)
|
|
print("Attenuator set to 31.5 dB (safe state)")
|
|
except Exception:
|
|
pass # best-effort on cleanup path
|
|
if csv_file:
|
|
csv_file.flush()
|
|
csv_file.close()
|
|
print(f"\nData saved to {args.output}")
|
|
if not mock_mode:
|
|
sw.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|