Full protocol coverage: 60 tools, 5 prompts, S-parameter math

Phase 1 — Add 33 new tool methods to nanovna.py wrapping every
remaining firmware shell command (measure, config, saveconfig,
clearconfig, color, freq, tcxo, vbat_offset, touchcal, touchtest,
refresh, touch, release, sd_list, sd_read, sd_delete, time, i2c,
si, lcd, threads, stat, sample, test, gain, dump, port, offset,
dac, usart_cfg, usart, band) plus server-side analyze tool.
All capability-gated for firmware variants.

Phase 2 — New prompts.py with 5 FastMCP guided workflows:
calibrate (SOLT), export_touchstone (S1P/S2P), analyze_antenna,
measure_cable, compare_sweeps. Includes HAM_BANDS presets from
160m through 23cm.

Phase 3 — New calculations.py with pure-Python S-parameter math:
swr, return_loss, impedance, admittance, phase_deg, insertion_loss,
group_delay, q_factor, capacitance, inductance, find_resonance,
find_bandwidth, analyze_scan.
This commit is contained in:
Ryan Malloy 2026-01-30 14:24:22 -07:00
parent 68a8705baf
commit 4569fea9f9
4 changed files with 1561 additions and 8 deletions

View File

@ -0,0 +1,438 @@
"""S-parameter math utilities for NanoVNA measurement analysis.
Pure-Python functions operating on complex S-parameter data from scan results.
No device access just math on arrays of complex values.
All functions accept complex numbers in the form returned by the scan tool:
s = complex(real, imag)
"""
from __future__ import annotations
import cmath
import math
def swr(s11: complex) -> float:
"""Calculate Standing Wave Ratio from S11 reflection coefficient.
Args:
s11: Complex reflection coefficient (Gamma)
Returns:
SWR ratio (1.0 = perfect match, infinity = total reflection)
"""
gamma_mag = abs(s11)
if gamma_mag >= 1.0:
return float("inf")
return (1.0 + gamma_mag) / (1.0 - gamma_mag)
def return_loss(s11: complex) -> float:
"""Calculate return loss in dB from S11.
Args:
s11: Complex reflection coefficient
Returns:
Return loss in dB (positive value; higher = better match)
"""
gamma_mag = abs(s11)
if gamma_mag <= 0.0:
return float("inf")
return -20.0 * math.log10(gamma_mag)
def impedance(s11: complex, z0: float = 50.0) -> complex:
"""Calculate impedance from S11 reflection coefficient.
Args:
s11: Complex reflection coefficient
z0: Reference impedance in ohms (default 50)
Returns:
Complex impedance Z = R + jX in ohms
"""
denom = 1.0 - s11
if abs(denom) < 1e-12:
return complex(float("inf"), 0.0)
return z0 * (1.0 + s11) / denom
def admittance(s11: complex, z0: float = 50.0) -> complex:
"""Calculate admittance from S11 reflection coefficient.
Args:
s11: Complex reflection coefficient
z0: Reference impedance in ohms (default 50)
Returns:
Complex admittance Y = G + jB in siemens
"""
z = impedance(s11, z0)
if abs(z) < 1e-12:
return complex(float("inf"), 0.0)
return 1.0 / z
def phase_deg(s: complex) -> float:
"""Calculate phase angle in degrees.
Args:
s: Complex S-parameter value
Returns:
Phase in degrees (-180 to +180)
"""
return math.degrees(cmath.phase(s))
def insertion_loss(s21: complex) -> float:
"""Calculate insertion loss in dB from S21.
Args:
s21: Complex transmission coefficient
Returns:
Insertion loss in dB (positive value; lower = less loss)
"""
mag = abs(s21)
if mag <= 0.0:
return float("inf")
return -20.0 * math.log10(mag)
def group_delay(s21_values: list[complex], frequencies_hz: list[int]) -> list[float]:
"""Calculate group delay from S21 phase vs frequency.
Uses finite differences: τ_g = -/ where ω = 2πf
Args:
s21_values: List of complex S21 values at each frequency
frequencies_hz: Corresponding frequency points in Hz
Returns:
Group delay in seconds at each point (same length as input;
endpoints use forward/backward difference)
"""
n = len(s21_values)
if n < 2 or len(frequencies_hz) != n:
return [0.0] * n
phases = [cmath.phase(s) for s in s21_values]
# Unwrap phase to avoid discontinuities
for i in range(1, n):
diff = phases[i] - phases[i - 1]
if diff > math.pi:
phases[i] -= 2.0 * math.pi * round(diff / (2.0 * math.pi))
elif diff < -math.pi:
phases[i] += 2.0 * math.pi * round(-diff / (2.0 * math.pi))
delays = []
for i in range(n):
if i == 0:
dphi = phases[1] - phases[0]
df = frequencies_hz[1] - frequencies_hz[0]
elif i == n - 1:
dphi = phases[n - 1] - phases[n - 2]
df = frequencies_hz[n - 1] - frequencies_hz[n - 2]
else:
dphi = phases[i + 1] - phases[i - 1]
df = frequencies_hz[i + 1] - frequencies_hz[i - 1]
if df == 0:
delays.append(0.0)
else:
omega_diff = 2.0 * math.pi * df
delays.append(-dphi / omega_diff)
return delays
def q_factor(s11_values: list[complex], frequencies_hz: list[int]) -> float:
"""Calculate Q factor from S11 data (resonant circuit).
Q = f0 / BW_3dB where f0 is the resonant frequency and BW_3dB
is the -3dB bandwidth of the return loss peak.
Args:
s11_values: List of complex S11 values
frequencies_hz: Corresponding frequency points in Hz
Returns:
Q factor (dimensionless). Returns 0 if resonance not found.
"""
if len(s11_values) < 3:
return 0.0
# Find resonance (minimum |S11|)
magnitudes = [abs(s) for s in s11_values]
min_idx = magnitudes.index(min(magnitudes))
min_mag = magnitudes[min_idx]
f0 = frequencies_hz[min_idx]
# 3dB point is where |S11| = min_mag * sqrt(2)
# (relative to the dip, not absolute)
threshold = min_mag * math.sqrt(2.0)
if threshold >= 1.0:
threshold = (1.0 + min_mag) / 2.0 # fallback
# Search left for lower -3dB crossing
f_low = frequencies_hz[0]
for i in range(min_idx, 0, -1):
if magnitudes[i] >= threshold:
# Interpolate
if magnitudes[i] != magnitudes[i - 1]:
frac = (threshold - magnitudes[i]) / (magnitudes[i - 1] - magnitudes[i])
else:
frac = 0.0
f_low = frequencies_hz[i] + frac * (frequencies_hz[i - 1] - frequencies_hz[i])
break
# Search right for upper -3dB crossing
f_high = frequencies_hz[-1]
for i in range(min_idx, len(magnitudes) - 1):
if magnitudes[i] >= threshold:
if magnitudes[i] != magnitudes[i - 1]:
frac = (threshold - magnitudes[i - 1]) / (magnitudes[i] - magnitudes[i - 1])
else:
frac = 0.0
f_high = frequencies_hz[i - 1] + frac * (frequencies_hz[i] - frequencies_hz[i - 1])
break
bw = abs(f_high - f_low)
if bw < 1.0:
return 0.0
return f0 / bw
def capacitance(reactance: float, frequency_hz: int) -> float:
"""Calculate capacitance from reactance at a given frequency.
C = -1 / (2πfX) for capacitive reactance (X < 0)
Args:
reactance: Imaginary part of impedance (X) in ohms
frequency_hz: Frequency in Hz
Returns:
Capacitance in farads. Returns 0 if reactance is non-negative.
"""
if reactance >= 0.0 or frequency_hz <= 0:
return 0.0
return -1.0 / (2.0 * math.pi * frequency_hz * reactance)
def inductance(reactance: float, frequency_hz: int) -> float:
"""Calculate inductance from reactance at a given frequency.
L = X / (2πf) for inductive reactance (X > 0)
Args:
reactance: Imaginary part of impedance (X) in ohms
frequency_hz: Frequency in Hz
Returns:
Inductance in henries. Returns 0 if reactance is non-positive.
"""
if reactance <= 0.0 or frequency_hz <= 0:
return 0.0
return reactance / (2.0 * math.pi * frequency_hz)
def find_resonance(
s11_values: list[complex],
frequencies_hz: list[int],
z0: float = 50.0,
) -> dict:
"""Find the resonant frequency from S11 data.
Resonance is the frequency where |S11| is minimum (best match).
Args:
s11_values: List of complex S11 values
frequencies_hz: Corresponding frequency points in Hz
z0: Reference impedance in ohms
Returns:
Dict with resonant frequency, SWR, impedance, return loss
"""
if not s11_values or len(s11_values) != len(frequencies_hz):
return {"error": "Invalid input data"}
magnitudes = [abs(s) for s in s11_values]
min_idx = magnitudes.index(min(magnitudes))
s11_at_res = s11_values[min_idx]
freq = frequencies_hz[min_idx]
z = impedance(s11_at_res, z0)
return {
"frequency_hz": freq,
"swr": round(swr(s11_at_res), 4),
"return_loss_db": round(return_loss(s11_at_res), 2),
"impedance_real": round(z.real, 2),
"impedance_imag": round(z.imag, 2),
"phase_deg": round(phase_deg(s11_at_res), 2),
"gamma_mag": round(abs(s11_at_res), 6),
}
def find_bandwidth(
s11_values: list[complex],
frequencies_hz: list[int],
swr_limit: float = 2.0,
) -> dict:
"""Find the bandwidth where SWR is below a given limit.
Args:
s11_values: List of complex S11 values
frequencies_hz: Corresponding frequency points in Hz
swr_limit: SWR threshold (default 2.0:1)
Returns:
Dict with lower/upper frequency bounds and bandwidth
"""
if not s11_values or len(s11_values) != len(frequencies_hz):
return {"error": "Invalid input data"}
# Find all points within SWR limit
within = []
for i, s in enumerate(s11_values):
if swr(s) <= swr_limit:
within.append(i)
if not within:
return {
"swr_limit": swr_limit,
"bandwidth_hz": 0,
"f_low_hz": 0,
"f_high_hz": 0,
"f_center_hz": 0,
"in_band": False,
}
# Find the longest contiguous run containing the minimum SWR point
magnitudes = [abs(s) for s in s11_values]
min_idx = magnitudes.index(min(magnitudes))
# Find contiguous segment around the resonance
f_low_idx = min_idx
f_high_idx = min_idx
while f_low_idx > 0 and swr(s11_values[f_low_idx - 1]) <= swr_limit:
f_low_idx -= 1
while f_high_idx < len(s11_values) - 1 and swr(s11_values[f_high_idx + 1]) <= swr_limit:
f_high_idx += 1
f_low = frequencies_hz[f_low_idx]
f_high = frequencies_hz[f_high_idx]
bw = f_high - f_low
return {
"swr_limit": swr_limit,
"bandwidth_hz": bw,
"f_low_hz": f_low,
"f_high_hz": f_high,
"f_center_hz": (f_low + f_high) // 2,
"in_band": True,
}
def analyze_scan(
scan_data: list[dict],
z0: float = 50.0,
) -> dict:
"""Comprehensive analysis of scan results.
Accepts the 'data' array from the scan tool's return value and produces
a full analysis report including resonance, bandwidth, and per-point metrics.
Args:
scan_data: List of dicts with 'frequency_hz', 's11' (optional), 's21' (optional)
where s11/s21 have 'real' and 'imag' keys
z0: Reference impedance in ohms
Returns:
Analysis report dict with summary and per-point data
"""
if not scan_data:
return {"error": "No scan data provided"}
freqs = []
s11_vals = []
s21_vals = []
has_s11 = "s11" in scan_data[0]
has_s21 = "s21" in scan_data[0]
for pt in scan_data:
freqs.append(pt.get("frequency_hz", 0))
if has_s11:
s = pt["s11"]
s11_vals.append(complex(s["real"], s["imag"]))
if has_s21:
s = pt["s21"]
s21_vals.append(complex(s["real"], s["imag"]))
result: dict = {
"points": len(scan_data),
"start_hz": freqs[0] if freqs else 0,
"stop_hz": freqs[-1] if freqs else 0,
}
if has_s11:
res = find_resonance(s11_vals, freqs, z0)
bw = find_bandwidth(s11_vals, freqs, swr_limit=2.0)
bw_1_5 = find_bandwidth(s11_vals, freqs, swr_limit=1.5)
result["s11_analysis"] = {
"resonance": res,
"bandwidth_2_1": bw,
"bandwidth_1_5": bw_1_5,
}
# Per-point S11 metrics
s11_points = []
for i, s in enumerate(s11_vals):
z = impedance(s, z0)
entry = {
"frequency_hz": freqs[i],
"swr": round(swr(s), 4),
"return_loss_db": round(return_loss(s), 2),
"impedance_real": round(z.real, 2),
"impedance_imag": round(z.imag, 2),
"phase_deg": round(phase_deg(s), 2),
}
# Add reactive component identification
x = z.imag
if abs(x) > 0.1 and freqs[i] > 0:
if x < 0:
entry["capacitance_pf"] = round(capacitance(x, freqs[i]) * 1e12, 3)
else:
entry["inductance_nh"] = round(inductance(x, freqs[i]) * 1e9, 3)
s11_points.append(entry)
result["s11_points"] = s11_points
if has_s21:
# Per-point S21 metrics
s21_points = []
delays = group_delay(s21_vals, freqs)
for i, s in enumerate(s21_vals):
s21_points.append({
"frequency_hz": freqs[i],
"insertion_loss_db": round(insertion_loss(s), 2),
"phase_deg": round(phase_deg(s), 2),
"group_delay_ns": round(delays[i] * 1e9, 3),
})
result["s21_points"] = s21_points
# S21 summary
il_values = [insertion_loss(s) for s in s21_vals]
min_il = min(il_values)
min_il_idx = il_values.index(min_il)
result["s21_analysis"] = {
"min_insertion_loss_db": round(min_il, 2),
"min_insertion_loss_freq_hz": freqs[min_il_idx],
"max_insertion_loss_db": round(max(il_values), 2),
}
return result

View File

@ -7,6 +7,7 @@ in server.py. The NanoVNA class manages connection lifecycle with lazy auto-conn
from __future__ import annotations
import base64
import re
from mcnanovna.discovery import find_first_nanovna, find_nanovna_ports
from mcnanovna.protocol import (
@ -356,8 +357,6 @@ class NanoVNA:
if lines:
line = lines[0].strip()
# Try to parse "bandwidth <divider> (<freq>Hz)"
import re
m = re.match(r"bandwidth\s+(\d+)\s*\((\d+)\s*Hz\)", line, re.IGNORECASE)
if m:
return {"bandwidth_divider": int(m.group(1)), "bandwidth_hz": int(m.group(2))}
@ -686,3 +685,752 @@ class NanoVNA:
self._protocol.send_text_command(f"sweep {frequency_hz} {frequency_hz} 1")
self._protocol.send_text_command("resume")
return {"frequency_hz": frequency_hz, "power": power, "mode": "cw"}
# ── Phase 1 additions: Full protocol coverage ─────────────────────
# -- Essential commands --
def measure(self, mode: str | None = None) -> dict:
"""Set on-device measurement display mode.
Controls what the NanoVNA computes and displays on-screen. Available
modes depend on firmware build flags.
Args:
mode: Measurement mode one of: none, lc, lcshunt, lcseries,
xtal, filter, cable, resonance. Omit to get usage help.
"""
self._ensure_connected()
if not self._has_capability("measure"):
return {"error": "measure command not supported by this firmware"}
if mode is not None:
lines = self._protocol.send_text_command(f"measure {mode}")
return {"mode": mode, "response": lines}
lines = self._protocol.send_text_command("measure")
return {"response": lines}
def config(self, option: str | None = None, value: int | None = None) -> dict:
"""Query or set device configuration options.
Options depend on firmware build (e.g., auto, avg, connection, mode,
grid, dot, bk, flip, separator, tif). Each takes a value of 0 or 1.
Args:
option: Configuration option name
value: Value to set (0 or 1)
"""
self._ensure_connected()
if not self._has_capability("config"):
return {"error": "config command not supported by this firmware"}
if option is not None and value is not None:
lines = self._protocol.send_text_command(f"config {option} {value}")
return {"option": option, "value": value, "response": lines}
lines = self._protocol.send_text_command("config")
return {"response": lines}
def saveconfig(self) -> dict:
"""Save current device configuration to flash memory.
Saves config_t (distinct from calibration save which uses slots).
This includes touch calibration, display settings, TCXO frequency, etc.
"""
self._ensure_connected()
lines = self._protocol.send_text_command("saveconfig")
return {"saved": True, "response": lines}
def clearconfig(self, key: str = "1234") -> dict:
"""Clear all stored configuration and calibration data from flash.
This is a destructive operation requires the protection key '1234'.
After clearing, you must recalibrate the device.
Args:
key: Protection key (must be '1234' to confirm)
"""
self._ensure_connected()
if key != "1234":
return {"error": "Protection key must be '1234' to confirm clearing all data"}
lines = self._protocol.send_text_command(f"clearconfig {key}")
return {"cleared": True, "response": lines}
def color(self, color_id: int | None = None, rgb24: int | None = None) -> dict:
"""Get or set display color palette entries.
With no args, lists all color slots and their current RGB values.
With id + rgb24, sets a specific color.
Args:
color_id: Palette slot index (0-31)
rgb24: 24-bit RGB color value (e.g., 0xFF0000 for red)
"""
self._ensure_connected()
if not self._has_capability("color"):
return {"error": "color command not supported by this firmware"}
if color_id is not None and rgb24 is not None:
self._protocol.send_text_command(f"color {color_id} {rgb24}")
return {"color_id": color_id, "rgb24": f"0x{rgb24:06x}", "set": True}
lines = self._protocol.send_text_command("color")
colors = []
for line in lines:
line = line.strip()
if ":" in line:
parts = line.split(":")
try:
idx = int(parts[0].strip())
val = parts[1].strip()
colors.append({"id": idx, "rgb24": val})
except (ValueError, IndexError):
pass
return {"colors": colors, "raw": lines}
def freq(self, frequency_hz: int) -> dict:
"""Set a single output frequency and pause the sweep.
Useful for CW measurements or signal generation at a specific frequency.
The sweep is paused automatically.
Args:
frequency_hz: Output frequency in Hz
"""
self._ensure_connected()
self._protocol.send_text_command(f"freq {frequency_hz}")
return {"frequency_hz": frequency_hz, "sweep": "paused"}
def tcxo(self, frequency_hz: int | None = None) -> dict:
"""Get or set the TCXO reference oscillator frequency.
The TCXO frequency affects all frequency accuracy. Default is typically
26000000 Hz. Adjust if you have a precision frequency reference.
Args:
frequency_hz: TCXO frequency in Hz (e.g., 26000000)
"""
self._ensure_connected()
if not self._has_capability("tcxo"):
return {"error": "tcxo command not supported by this firmware"}
if frequency_hz is not None:
self._protocol.send_text_command(f"tcxo {frequency_hz}")
return {"tcxo_hz": frequency_hz, "set": True}
# Query mode returns "current: <value>"
lines = self._protocol.send_text_command("tcxo")
for line in lines:
if "current:" in line.lower():
parts = line.split(":")
if len(parts) >= 2:
try:
return {"tcxo_hz": int(parts[1].strip())}
except ValueError:
pass
return {"response": lines}
def vbat_offset(self, offset: int | None = None) -> dict:
"""Get or set battery voltage measurement offset.
Calibrates the battery voltage reading. The offset is added to the
raw ADC value to compensate for hardware variations.
Args:
offset: Voltage offset value (raw ADC units)
"""
self._ensure_connected()
if not self._has_capability("vbat_offset"):
return {"error": "vbat_offset command not supported by this firmware"}
if offset is not None:
self._protocol.send_text_command(f"vbat_offset {offset}")
return {"offset": offset, "set": True}
lines = self._protocol.send_text_command("vbat_offset")
if lines:
try:
return {"offset": int(lines[0].strip())}
except ValueError:
pass
return {"response": lines}
# -- Touch / Remote Desktop --
def touchcal(self) -> dict:
"""Start touch screen calibration sequence.
Interactive: the device displays calibration targets. Touch the upper-left
corner, then the lower-right corner when prompted. Returns the calibration
parameters when complete.
Note: This is an interactive hardware procedure that requires physical
touch input on the device screen.
"""
self._ensure_connected()
lines = self._protocol.send_text_command("touchcal", timeout=30.0)
# Parse "touch cal params: a b c d"
for line in lines:
if "touch cal params:" in line.lower():
parts = line.split(":")
if len(parts) >= 2:
vals = parts[1].strip().split()
if len(vals) >= 4:
try:
return {
"calibrated": True,
"params": [int(v) for v in vals[:4]],
"response": lines,
}
except ValueError:
pass
return {"response": lines}
def touchtest(self) -> dict:
"""Start touch screen test mode.
Enters a mode where touch points are drawn on screen for verification.
Useful for checking touch calibration accuracy. Exit by sending another
command or resetting.
"""
self._ensure_connected()
lines = self._protocol.send_text_command("touchtest", timeout=10.0)
return {"mode": "touchtest", "response": lines}
def refresh(self, enable: str | None = None) -> dict:
"""Enable or disable remote desktop refresh mode.
When enabled, the device streams display updates over the serial
connection for remote viewing.
Args:
enable: 'on' to enable remote desktop, 'off' to disable
"""
self._ensure_connected()
if enable is not None:
if enable not in ("on", "off"):
return {"error": "enable must be 'on' or 'off'"}
lines = self._protocol.send_text_command(f"refresh {enable}")
return {"remote_desktop": enable, "response": lines}
return {"error": "usage: refresh on|off"}
def touch(self, x: int, y: int) -> dict:
"""Send a touch press event at screen coordinates.
Simulates a finger press on the NanoVNA touchscreen for remote control.
Follow with 'release' to complete the touch gesture.
Args:
x: X coordinate (0 to lcd_width-1)
y: Y coordinate (0 to lcd_height-1)
"""
self._ensure_connected()
lines = self._protocol.send_text_command(f"touch {x} {y}")
return {"action": "press", "x": x, "y": y, "response": lines}
def release(self, x: int = -1, y: int = -1) -> dict:
"""Send a touch release event at screen coordinates.
Completes a touch gesture started with 'touch'. If coordinates
are omitted (-1), releases at the last touch position.
Args:
x: X coordinate (-1 for last position)
y: Y coordinate (-1 for last position)
"""
self._ensure_connected()
if x >= 0 and y >= 0:
lines = self._protocol.send_text_command(f"release {x} {y}")
else:
lines = self._protocol.send_text_command("release")
return {"action": "release", "x": x, "y": y, "response": lines}
# -- SD Card storage --
def sd_list(self, pattern: str = "*.*") -> dict:
"""List files on the SD card.
Requires SD card hardware support. Returns filenames and sizes.
Args:
pattern: Glob pattern to filter files (default: '*.*')
"""
self._ensure_connected()
if not self._has_capability("sd_list"):
return {"error": "SD card commands not supported by this firmware/hardware"}
lines = self._protocol.send_text_command(f"sd_list {pattern}", timeout=10.0)
files = []
for line in lines:
line = line.strip()
if not line or "err:" in line.lower():
if "err:" in line.lower():
return {"error": line}
continue
parts = line.rsplit(" ", 1)
if len(parts) == 2:
try:
files.append({"name": parts[0], "size": int(parts[1])})
except ValueError:
files.append({"name": line, "size": 0})
else:
files.append({"name": line, "size": 0})
return {"files": files, "count": len(files)}
def sd_read(self, filename: str) -> dict:
"""Read a file from the SD card.
Returns the file content as base64-encoded data. The firmware sends
a 4-byte size header followed by raw file data.
Args:
filename: Name of the file to read
"""
self._ensure_connected()
if not self._has_capability("sd_read"):
return {"error": "SD card commands not supported by this firmware/hardware"}
import struct
import time
self._protocol._drain()
self._protocol._send_command(f"sd_read {filename}")
ser = self._protocol._require_connection()
old_timeout = ser.timeout
ser.timeout = 10.0
try:
buf = b""
deadline = time.monotonic() + 10.0
# Read past echo line
while time.monotonic() < deadline:
chunk = ser.read(max(1, ser.in_waiting))
if chunk:
buf += chunk
if b"\r\n" in buf:
break
# Check for error response
content = buf.decode("ascii", errors="replace")
if "err:" in content.lower():
return {"error": "File not found or no SD card"}
echo_end = buf.index(b"\r\n") + 2
data_buf = buf[echo_end:]
# Read 4-byte file size header
while len(data_buf) < 4 and time.monotonic() < deadline:
chunk = ser.read(4 - len(data_buf))
if chunk:
data_buf += chunk
if len(data_buf) < 4:
return {"error": "Failed to read file size header"}
file_size = struct.unpack_from("<I", data_buf, 0)[0]
data_buf = data_buf[4:]
# Read file data
while len(data_buf) < file_size and time.monotonic() < deadline:
remaining = file_size - len(data_buf)
chunk = ser.read(min(remaining, 4096))
if chunk:
data_buf += chunk
# Drain trailing prompt
trailing = b""
while time.monotonic() < deadline:
chunk = ser.read(max(1, ser.in_waiting))
if chunk:
trailing += chunk
if b"ch> " in trailing or not chunk:
break
return {
"filename": filename,
"size": file_size,
"data_base64": base64.b64encode(data_buf[:file_size]).decode("ascii"),
}
finally:
ser.timeout = old_timeout
def sd_delete(self, filename: str) -> dict:
"""Delete a file from the SD card.
Args:
filename: Name of the file to delete
"""
self._ensure_connected()
if not self._has_capability("sd_delete"):
return {"error": "SD card commands not supported by this firmware/hardware"}
lines = self._protocol.send_text_command(f"sd_delete {filename}", timeout=10.0)
success = any("ok" in line.lower() for line in lines)
return {"filename": filename, "deleted": success, "response": lines}
def time(
self,
field: str | None = None,
value: int | None = None,
) -> dict:
"""Get or set RTC (real-time clock) time.
With no args, returns current date/time. With field + value, sets
a specific time component.
Args:
field: Time field to set: y, m, d, h, min, sec, or ppm
value: Value for the field (0-99 for date/time, float for ppm)
"""
self._ensure_connected()
if not self._has_capability("time"):
return {"error": "RTC (time) command not supported by this firmware/hardware"}
if field is not None and value is not None:
lines = self._protocol.send_text_command(f"time {field} {value}")
return {"field": field, "value": value, "response": lines}
lines = self._protocol.send_text_command("time")
# Response format: "20YY/MM/DD HH:MM:SS\nusage: ..."
for line in lines:
line = line.strip()
if "/" in line and ":" in line and "usage" not in line.lower():
return {"datetime": line}
return {"response": lines}
# -- Debug / Diagnostic tools --
def i2c(self, page: int, reg: int, data: int) -> dict:
"""Write to an I2C register on the TLV320AIC3204 audio codec.
Low-level diagnostic tool for direct codec register access.
Args:
page: I2C register page number
reg: Register address within the page
data: Byte value to write (0-255)
"""
self._ensure_connected()
if not self._has_capability("i2c"):
return {"error": "i2c command not supported by this firmware"}
lines = self._protocol.send_text_command(f"i2c {page} {reg} {data}")
return {"page": page, "reg": reg, "data": data, "response": lines}
def si(self, reg: int, value: int) -> dict:
"""Write to a Si5351 frequency synthesizer register.
Low-level diagnostic tool for direct Si5351 register access.
Args:
reg: Si5351 register address
value: Byte value to write (0-255)
"""
self._ensure_connected()
if not self._has_capability("si"):
return {"error": "si (Si5351 register) command not supported by this firmware"}
lines = self._protocol.send_text_command(f"si {reg} {value}")
return {"reg": reg, "value": value, "response": lines}
def lcd(self, register: int, data_bytes: list[int] | None = None) -> dict:
"""Send a register command to the LCD display controller.
Low-level diagnostic tool. First arg is the register/command byte,
remaining are data bytes.
Args:
register: LCD register/command byte
data_bytes: Additional data bytes (list of ints)
"""
self._ensure_connected()
if not self._has_capability("lcd"):
return {"error": "lcd command not supported by this firmware"}
parts = [str(register)]
if data_bytes:
parts.extend(str(d) for d in data_bytes)
lines = self._protocol.send_text_command(f"lcd {' '.join(parts)}")
# Response: "ret = 0x..."
for line in lines:
if "ret" in line.lower():
return {"register": register, "data_bytes": data_bytes or [], "result": line.strip()}
return {"register": register, "data_bytes": data_bytes or [], "response": lines}
def threads(self) -> dict:
"""List ChibiOS RTOS thread information.
Shows all running threads with their stack usage, priority, and state.
Useful for diagnosing firmware issues.
"""
self._ensure_connected()
if not self._has_capability("threads"):
return {"error": "threads command not supported by this firmware"}
lines = self._protocol.send_text_command("threads")
threads = []
for line in lines:
line = line.strip()
if not line or "stklimit" in line.lower():
continue # Skip header
parts = line.split("|")
if len(parts) >= 8:
threads.append({
"stk_limit": parts[0].strip(),
"stack": parts[1].strip(),
"stk_free": parts[2].strip(),
"addr": parts[3].strip(),
"refs": parts[4].strip(),
"prio": parts[5].strip(),
"state": parts[6].strip(),
"name": parts[7].strip(),
})
return {"threads": threads, "count": len(threads)}
def stat(self) -> dict:
"""Get audio ADC statistics for both channels.
Returns average and RMS values for the reference and signal channels
on both ports. Useful for diagnosing signal level issues.
"""
self._ensure_connected()
if not self._has_capability("stat"):
return {"error": "stat command not supported by this firmware"}
lines = self._protocol.send_text_command("stat", timeout=10.0)
channels = []
current_ch = {}
for line in lines:
line = line.strip()
if line.startswith("Ch:"):
if current_ch:
channels.append(current_ch)
current_ch = {"channel": line.split(":")[1].strip()}
elif "average:" in line.lower():
nums = re.findall(r"-?\d+", line)
if len(nums) >= 2:
current_ch["average_ref"] = int(nums[0])
current_ch["average_signal"] = int(nums[1])
elif "rms:" in line.lower():
nums = re.findall(r"-?\d+", line)
if len(nums) >= 2:
current_ch["rms_ref"] = int(nums[0])
current_ch["rms_signal"] = int(nums[1])
if current_ch:
channels.append(current_ch)
return {"channels": channels}
def sample(self, mode: str) -> dict:
"""Set the ADC sample capture function.
Controls what data the sample command captures from the audio ADC.
Args:
mode: Sample mode 'gamma' (complex reflection), 'ampl' (amplitude),
or 'ref' (reference amplitude)
"""
self._ensure_connected()
if not self._has_capability("sample"):
return {"error": "sample command not supported by this firmware"}
valid = {"gamma", "ampl", "ref"}
if mode not in valid:
return {"error": f"Invalid mode '{mode}'. Valid: {', '.join(sorted(valid))}"}
lines = self._protocol.send_text_command(f"sample {mode}")
return {"mode": mode, "response": lines}
def test(self) -> dict:
"""Run hardware self-test.
Executes built-in hardware diagnostics. The specific tests depend
on the firmware build configuration.
"""
self._ensure_connected()
if not self._has_capability("test"):
return {"error": "test command not supported by this firmware"}
lines = self._protocol.send_text_command("test", timeout=30.0)
return {"response": lines}
def gain(self, lgain: int, rgain: int | None = None) -> dict:
"""Set audio codec gain levels.
Controls the TLV320AIC3204 PGA (Programmable Gain Amplifier).
Args:
lgain: Left channel gain (0-95, in 0.5 dB steps)
rgain: Right channel gain (0-95). If omitted, uses lgain for both.
"""
self._ensure_connected()
if not self._has_capability("gain"):
return {"error": "gain command not supported by this firmware"}
if rgain is not None:
lines = self._protocol.send_text_command(f"gain {lgain} {rgain}")
return {"lgain": lgain, "rgain": rgain, "response": lines}
lines = self._protocol.send_text_command(f"gain {lgain}")
return {"lgain": lgain, "rgain": lgain, "response": lines}
def dump(self, channel: int = 0) -> dict:
"""Dump raw audio ADC samples.
Captures and returns raw sample data from the audio buffer.
Useful for signal analysis and debugging.
Args:
channel: Audio channel to dump (0 or 1)
"""
self._ensure_connected()
if not self._has_capability("dump"):
return {"error": "dump command not supported by this firmware"}
lines = self._protocol.send_text_command(f"dump {channel}", timeout=10.0)
samples = []
for line in lines:
for val in line.strip().split():
try:
samples.append(int(val))
except ValueError:
pass
return {"channel": channel, "samples": samples, "count": len(samples)}
def port(self, port_num: int) -> dict:
"""Select the active audio port (TX or RX).
Switches the TLV320AIC3204 codec between transmit and receive paths.
Args:
port_num: Port number (0=TX, 1=RX)
"""
self._ensure_connected()
if not self._has_capability("port"):
return {"error": "port command not supported by this firmware"}
lines = self._protocol.send_text_command(f"port {port_num}")
return {"port": port_num, "description": "TX" if port_num == 0 else "RX", "response": lines}
def offset(self, frequency_hz: int | None = None) -> dict:
"""Get or set the variable IF frequency offset.
Adjusts the intermediate frequency offset used in the measurement
pipeline. Only available on firmware builds with USE_VARIABLE_OFFSET.
Args:
frequency_hz: IF offset frequency in Hz
"""
self._ensure_connected()
if not self._has_capability("offset"):
return {"error": "offset command not supported by this firmware"}
if frequency_hz is not None:
lines = self._protocol.send_text_command(f"offset {frequency_hz}")
return {"offset_hz": frequency_hz, "set": True, "response": lines}
lines = self._protocol.send_text_command("offset")
for line in lines:
if "current:" in line.lower():
parts = line.split(":")
if len(parts) >= 2:
try:
return {"offset_hz": int(parts[1].strip())}
except ValueError:
pass
return {"response": lines}
def dac(self, value: int | None = None) -> dict:
"""Get or set the DAC output value.
Controls the on-chip DAC (used for bias voltage or other analog output).
Range is 0-4095 (12-bit).
Args:
value: DAC value (0-4095)
"""
self._ensure_connected()
if not self._has_capability("dac"):
return {"error": "dac command not supported by this firmware/hardware"}
if value is not None:
lines = self._protocol.send_text_command(f"dac {value}")
return {"dac_value": value, "set": True, "response": lines}
lines = self._protocol.send_text_command("dac")
for line in lines:
if "current:" in line.lower():
parts = line.split(":")
if len(parts) >= 2:
try:
return {"dac_value": int(parts[1].strip())}
except ValueError:
pass
return {"response": lines}
def usart_cfg(self, baudrate: int | None = None) -> dict:
"""Get or set the USART serial port configuration.
Controls the secondary serial port (USART) baud rate. The USART can
be used for external device communication or serial console.
Args:
baudrate: Baud rate (minimum 300). Omit to query current setting.
"""
self._ensure_connected()
if not self._has_capability("usart_cfg"):
return {"error": "usart_cfg command not supported by this firmware"}
if baudrate is not None:
lines = self._protocol.send_text_command(f"usart_cfg {baudrate}")
return {"baudrate": baudrate, "set": True, "response": lines}
lines = self._protocol.send_text_command("usart_cfg")
# Response: "Serial: <baud> baud"
for line in lines:
if "baud" in line.lower():
m = re.search(r"(\d+)\s*baud", line, re.IGNORECASE)
if m:
return {"baudrate": int(m.group(1))}
return {"response": lines}
def usart(self, data: str, timeout_ms: int = 200) -> dict:
"""Send data through the USART serial port and read the response.
Forwards data to an external device connected to the USART port
and returns any response received within the timeout period.
Args:
data: String data to send
timeout_ms: Response timeout in milliseconds (default 200)
"""
self._ensure_connected()
if not self._has_capability("usart"):
return {"error": "usart command not supported by this firmware"}
lines = self._protocol.send_text_command(f"usart {data} {timeout_ms}", timeout=5.0)
return {"sent": data, "timeout_ms": timeout_ms, "response": lines}
def band(self, index: int, param: str, value: int) -> dict:
"""Configure frequency band parameters for the Si5351 synthesizer.
Low-level control of per-band synthesizer settings. Parameters include
mode, frequency, divider, multiplier, and power settings.
Args:
index: Band index
param: Parameter name mode, freq, div, mul, omul, pow, opow, l, r, lr, adj
value: Parameter value
"""
self._ensure_connected()
if not self._has_capability("b"):
return {"error": "band (b) command not supported by this firmware"}
lines = self._protocol.send_text_command(f"b {index} {param} {value}")
return {"index": index, "param": param, "value": value, "response": lines}
# ── Convenience: analyze scan data server-side ────────────────────
def analyze(
self,
start_hz: int,
stop_hz: int,
points: int = 101,
s11: bool = True,
s21: bool = False,
) -> dict:
"""Run a scan and return comprehensive S-parameter analysis.
Combines the scan tool with server-side calculations to produce
a full measurement report including SWR, impedance, bandwidth,
return loss, and reactive components without the LLM needing
to do the math.
Args:
start_hz: Start frequency in Hz
stop_hz: Stop frequency in Hz
points: Number of measurement points (default 101)
s11: Include S11 reflection analysis (default True)
s21: Include S21 transmission analysis (default False)
"""
from mcnanovna.calculations import analyze_scan
scan_result = self.scan(start_hz, stop_hz, points, s11=s11, s21=s21)
if "error" in scan_result:
return scan_result
analysis = analyze_scan(scan_result["data"])
analysis["scan_info"] = {
"start_hz": start_hz,
"stop_hz": stop_hz,
"points": scan_result["points"],
"binary": scan_result.get("binary", False),
}
return analysis

343
src/mcnanovna/prompts.py Normal file
View File

@ -0,0 +1,343 @@
"""FastMCP prompts for guided NanoVNA workflows.
Prompts are conversation templates that guide the LLM through multi-step
procedures like calibration, Touchstone export, and antenna analysis.
They return lists of messages that set up context for the task.
"""
from __future__ import annotations
from fastmcp import FastMCP
from fastmcp.prompts import Message
HAM_BANDS = {
"160m": (1_800_000, 2_000_000),
"80m": (3_500_000, 4_000_000),
"60m": (5_330_500, 5_403_500),
"40m": (7_000_000, 7_300_000),
"30m": (10_100_000, 10_150_000),
"20m": (14_000_000, 14_350_000),
"17m": (18_068_000, 18_168_000),
"15m": (21_000_000, 21_450_000),
"12m": (24_890_000, 24_990_000),
"10m": (28_000_000, 29_700_000),
"6m": (50_000_000, 54_000_000),
"2m": (144_000_000, 148_000_000),
"70cm": (420_000_000, 450_000_000),
"23cm": (1_240_000_000, 1_300_000_000),
"full": (50_000, 900_000_000),
"hf": (1_800_000, 30_000_000),
"vhf": (50_000_000, 300_000_000),
"uhf": (300_000_000, 1_000_000_000),
}
def _format_freq(hz: int) -> str:
"""Format frequency for display."""
if hz >= 1_000_000_000:
return f"{hz / 1e9:.3f} GHz"
if hz >= 1_000_000:
return f"{hz / 1e6:.3f} MHz"
if hz >= 1_000:
return f"{hz / 1e3:.1f} kHz"
return f"{hz} Hz"
def register_prompts(mcp: FastMCP) -> None:
"""Register all NanoVNA workflow prompts on the FastMCP server."""
@mcp.prompt
def calibrate(
band: str = "full",
start_hz: int | None = None,
stop_hz: int | None = None,
points: int = 101,
save_slot: int = 0,
) -> list[Message]:
"""Guide through full SOLT calibration for a frequency band.
Walks through Short-Open-Load-Thru calibration with optional
isolation step. Can specify a ham band name or explicit frequencies.
Args:
band: Band name (e.g., '2m', '70cm', 'hf', 'full') or 'custom'
start_hz: Start frequency in Hz (overrides band preset)
stop_hz: Stop frequency in Hz (overrides band preset)
points: Number of sweep points
save_slot: Flash slot to save calibration (0-4 for H, 0-6 for H4)
"""
if start_hz is not None and stop_hz is not None:
f_start, f_stop = start_hz, stop_hz
band_label = f"Custom ({_format_freq(f_start)} {_format_freq(f_stop)})"
elif band in HAM_BANDS:
f_start, f_stop = HAM_BANDS[band]
band_label = f"{band.upper()} ({_format_freq(f_start)} {_format_freq(f_stop)})"
else:
f_start, f_stop = HAM_BANDS["full"]
band_label = f"Full range ({_format_freq(f_start)} {_format_freq(f_stop)})"
return [
Message(
role="user",
content=(
f"I need to calibrate my NanoVNA for the {band_label} band "
f"with {points} points. Save to slot {save_slot} when done."
),
),
Message(
role="assistant",
content=f"""I'll guide you through a full SOLT calibration. Here's the plan:
**Band**: {band_label}
**Points**: {points}
**Save slot**: {save_slot}
You'll need these calibration standards:
- 50\u03a9 load termination
- Open standard (or leave SMA connector open)
- Short standard
- Through cable (SMA male-male or appropriate adapter)
- Two 50\u03a9 loads for isolation step (optional but recommended)
**Step 1**: First, I'll set the sweep range to {_format_freq(f_start)} {_format_freq(f_stop)} with {points} points using the `sweep` tool.
**Step 2**: Connect the **50\u03a9 LOAD** to Port 1 (CH0). Tell me when it's connected, and I'll run `cal load`.
**Step 3**: Connect the **OPEN** standard to Port 1. I'll run `cal open`.
**Step 4**: Connect the **SHORT** standard to Port 1. I'll run `cal short`.
**Step 5**: Connect the **THROUGH** cable between Port 1 and Port 2. I'll run `cal thru`.
**Step 6** (optional): Connect 50\u03a9 loads to **both** ports. I'll run `cal isoln`.
**Step 7**: I'll run `cal done` to compute the error correction terms.
**Step 8**: Finally, I'll save to slot {save_slot} with the `save` tool.
Let me start by setting the sweep range. Ready?""",
),
]
@mcp.prompt
def export_touchstone(
start_hz: int = 1_000_000,
stop_hz: int = 900_000_000,
points: int = 101,
format: str = "s1p",
) -> list[Message]:
"""Guide through exporting S-parameters to Touchstone file format.
Runs a scan and formats results as .s1p (reflection only) or
.s2p (reflection + transmission) Touchstone content per IEEE Std 1363.
Args:
start_hz: Start frequency in Hz
stop_hz: Stop frequency in Hz
points: Number of measurement points
format: Output format 's1p' (S11 only) or 's2p' (S11 + S21)
"""
s2p = format.lower() == "s2p"
file_ext = ".s2p" if s2p else ".s1p"
return [
Message(
role="user",
content=(
f"Export a Touchstone {file_ext} file from "
f"{_format_freq(start_hz)} to {_format_freq(stop_hz)} "
f"with {points} points."
),
),
Message(
role="assistant",
content=f"""I'll run a scan and format the results as a Touchstone {file_ext} file.
**Scan range**: {_format_freq(start_hz)} {_format_freq(stop_hz)}
**Points**: {points}
**Format**: {'S2P (S11 + S21 — requires DUT connected between ports)' if s2p else 'S1P (S11 reflection only)'}
**Touchstone format reference** (IEEE Std 1363):
```
! NanoVNA-H measurement
# Hz S RI R 50
! freq {'s11_re s11_im s21_re s21_im s12_re s12_im s22_re s22_im' if s2p else 's11_re s11_im'}
```
I'll:
1. Run a `scan` from {start_hz} to {stop_hz} with {points} points{', capturing both S11 and S21' if s2p else ', S11 only'}
2. Format each data point as real/imaginary pairs
3. {'For S2P: S12 and S22 will be set to 0+j0 (NanoVNA is a 1-port/2-port device that measures S11 and S21 only)' if s2p else 'Each line: frequency S11_real S11_imag'}
4. Present the complete file content for you to save
Let me run the scan now.""",
),
]
@mcp.prompt
def analyze_antenna(
band: str = "2m",
start_hz: int | None = None,
stop_hz: int | None = None,
points: int = 101,
) -> list[Message]:
"""Guide through antenna analysis with SWR, impedance, and bandwidth.
Scans the antenna's frequency range and computes resonant frequency,
SWR at resonance, bandwidth, impedance, and return loss.
Args:
band: Ham band name (e.g., '2m', '70cm', '20m') or 'custom'
start_hz: Start frequency in Hz (overrides band)
stop_hz: Stop frequency in Hz (overrides band)
points: Number of measurement points
"""
if start_hz is not None and stop_hz is not None:
f_start, f_stop = start_hz, stop_hz
band_label = f"Custom ({_format_freq(f_start)} {_format_freq(f_stop)})"
elif band in HAM_BANDS:
f_start, f_stop = HAM_BANDS[band]
band_label = f"{band.upper()} band"
else:
f_start, f_stop = HAM_BANDS["2m"]
band_label = "2M band"
return [
Message(
role="user",
content=(
f"Analyze my antenna on the {band_label} "
f"({_format_freq(f_start)} {_format_freq(f_stop)})."
),
),
Message(
role="assistant",
content=f"""I'll perform a comprehensive antenna analysis for the {band_label}.
**What I'll measure:**
- Resonant frequency (minimum return loss / SWR)
- SWR at resonance and across the band
- Bandwidth where SWR < 2:1
- Complex impedance at resonance (R + jX)
- Return loss in dB
- Reactive component (capacitive/inductive) away from resonance
**Procedure:**
1. Ensure your antenna is connected to Port 1 (CH0)
2. Run an S11 scan from {_format_freq(f_start)} to {_format_freq(f_stop)} with {points} points
3. Use the `calculations.analyze_scan()` utility to compute all metrics
4. Present results with actionable insights
**Tips for accurate measurements:**
- Use a calibrated setup (run `calibrate` prompt first if needed)
- Keep feedline away from the antenna under test
- Ensure connectors are tight and clean
Let me run the scan and analysis now.""",
),
]
@mcp.prompt
def measure_cable(
start_hz: int = 100_000,
stop_hz: int = 300_000_000,
points: int = 201,
) -> list[Message]:
"""Guide through cable and transmission line analysis.
Measures cable characteristics from S11: velocity factor,
cable loss, impedance profile, and distance to fault.
Args:
start_hz: Start frequency in Hz
stop_hz: Stop frequency in Hz
points: Number of measurement points
"""
return [
Message(
role="user",
content=(
f"Analyze a cable/transmission line from "
f"{_format_freq(start_hz)} to {_format_freq(stop_hz)}."
),
),
Message(
role="assistant",
content=f"""I'll measure your cable's characteristics using S11 reflection data.
**What I'll determine:**
- **Cable impedance** from the Smith chart / impedance data
- **Cable loss** at various frequencies (from return loss with open/short termination)
- **Electrical length** from the phase slope
- **Velocity factor** if physical length is known
- **Distance to fault** using time-domain transform (if supported)
**Setup for cable analysis:**
1. Connect one end of the cable to Port 1 (CH0)
2. **Leave the far end OPEN** for the first measurement
3. I'll scan S11 from {_format_freq(start_hz)} to {_format_freq(stop_hz)} with {points} points
**For complete cable characterization:**
- First sweep: far end **open** (measures impedance + electrical length)
- Second sweep: far end **shorted** (helps determine cable loss)
- Third sweep: far end terminated in **50\u03a9** (verifies cable impedance)
**Advanced**: If your firmware supports `transform`, I can also enable time-domain
mode to show distance-to-fault as a TDR (Time Domain Reflectometer).
Let me start with the open-ended measurement. Is the cable connected with the far end open?""",
),
]
@mcp.prompt
def compare_sweeps(
start_hz: int = 1_000_000,
stop_hz: int = 900_000_000,
points: int = 101,
) -> list[Message]:
"""Guide through before/after sweep comparison.
Takes two measurements and produces a delta comparison showing
changes in return loss, SWR, and impedance.
Args:
start_hz: Start frequency in Hz
stop_hz: Stop frequency in Hz
points: Number of measurement points
"""
return [
Message(
role="user",
content=(
f"I want to compare two sweeps (before/after) from "
f"{_format_freq(start_hz)} to {_format_freq(stop_hz)}."
),
),
Message(
role="assistant",
content=f"""I'll take two measurements and compare them to show what changed.
**Use cases:**
- Before/after a filter modification
- Comparing two antennas
- Checking the effect of a matching network
- Verifying a repair
**Procedure:**
1. Set up the **"before"** condition (e.g., original antenna, unmodified filter)
2. I'll run scan #1 from {_format_freq(start_hz)} to {_format_freq(stop_hz)} with {points} points
3. You make your change (swap antenna, add matching network, etc.)
4. I'll run scan #2 with identical settings
5. I'll compute and display deltas for:
- Return loss (dB change at each frequency)
- SWR improvement/degradation
- Impedance shift
- Bandwidth change
**Important**: Both scans must use the same frequency range and point count
for a valid comparison. Don't change calibration between scans.
Set up the **"before"** condition and tell me when ready for scan #1.""",
),
]

View File

@ -1,6 +1,6 @@
"""FastMCP server for NanoVNA-H vector network analyzers.
Registers all NanoVNA tool methods and starts the MCP server.
Registers all NanoVNA tool methods, prompts, and starts the MCP server.
"""
from __future__ import annotations
@ -9,13 +9,31 @@ from fastmcp import FastMCP
from fastmcp.tools.tool import FunctionTool
from mcnanovna.nanovna import NanoVNA
from mcnanovna.prompts import register_prompts
# All public methods on NanoVNA that should become MCP tools
# All public methods on NanoVNA that should become MCP tools.
# Grouped by category for maintainability.
_TOOL_METHODS = [
# Tier 1: Essential measurement & control
"info", "sweep", "scan", "data", "frequencies", "marker", "cal",
"save", "recall", "pause", "resume", "power", "bandwidth", "edelay",
"s21offset", "vbat", "capture", "trace", "transform", "smooth",
"threshold", "reset", "version", "detect", "disconnect", "raw_command", "cw",
"save", "recall", "pause", "resume",
# Tier 2: Configuration
"power", "bandwidth", "edelay", "s21offset", "vbat", "capture",
# Tier 3: Advanced
"trace", "transform", "smooth", "threshold", "reset", "version",
"detect", "disconnect", "raw_command", "cw",
# Phase 1 additions: Essential commands
"measure", "config", "saveconfig", "clearconfig", "color", "freq",
"tcxo", "vbat_offset",
# Phase 1 additions: Touch / Remote Desktop
"touchcal", "touchtest", "refresh", "touch", "release",
# Phase 1 additions: SD Card storage
"sd_list", "sd_read", "sd_delete", "time",
# Phase 1 additions: Debug / Diagnostic
"i2c", "si", "lcd", "threads", "stat", "sample", "test",
"gain", "dump", "port", "offset", "dac", "usart_cfg", "usart", "band",
# Convenience: server-side analysis
"analyze",
]
@ -26,7 +44,11 @@ def create_server() -> FastMCP:
"MCP server for controlling NanoVNA-H vector network analyzers via USB serial. "
"Tools provide structured access to sweep measurements, S-parameter data, "
"calibration, markers, and device configuration. The device auto-connects "
"on first tool call — just plug in and go."
"on first tool call — just plug in and go.\n\n"
"Use the 'analyze' tool for comprehensive measurement reports with SWR, "
"impedance, bandwidth, and reactive component analysis built in.\n\n"
"Prompts are available for guided workflows: calibrate, export_touchstone, "
"analyze_antenna, measure_cable, and compare_sweeps."
),
)
vna = NanoVNA()
@ -36,6 +58,8 @@ def create_server() -> FastMCP:
tool = FunctionTool.from_function(bound_method)
mcp.add_tool(tool)
register_prompts(mcp)
return mcp