Initial implementation of mcnanovna FastMCP server

NanoVNA-H MCP server with 27 tools for VNA control via USB serial.
Protocol layer handles text + binary scan data parsing, three-phase
initialization sequence, and automatic device discovery (VID 0x0483).

Tools: info, sweep, scan, data, frequencies, marker, cal, save, recall,
pause, resume, power, bandwidth, edelay, s21offset, vbat, capture, trace,
transform, smooth, threshold, reset, version, detect, disconnect,
raw_command, cw
This commit is contained in:
Ryan Malloy 2026-01-30 12:27:47 -07:00
commit eaa4f1d977
8 changed files with 2906 additions and 0 deletions

24
pyproject.toml Normal file
View File

@ -0,0 +1,24 @@
[project]
name = "mcnanovna"
version = "2026.01.30"
description = "MCP server for NanoVNA-H vector network analyzers"
authors = [{name = "Ryan Malloy", email = "ryan@supported.systems"}]
requires-python = ">=3.11"
dependencies = [
"fastmcp>=2.14.0",
"pyserial>=3.5",
]
[project.scripts]
mcnanovna = "mcnanovna.server:main"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/mcnanovna"]
[tool.ruff]
target-version = "py311"
line-length = 120

View File

@ -0,0 +1 @@
"""mcnanovna — MCP server for NanoVNA-H vector network analyzers."""

View File

@ -0,0 +1,5 @@
"""Allow running as: python -m mcnanovna"""
from mcnanovna.server import main
main()

View File

@ -0,0 +1,46 @@
"""USB device discovery for NanoVNA-H devices.
Scans serial ports for STM32 CDC ACM devices matching the NanoVNA VID:PID.
"""
from __future__ import annotations
from dataclasses import dataclass
import serial.tools.list_ports
# STMicroelectronics USB CDC ACM — used by NanoVNA-H, H4, H7
NANOVNA_VID = 0x0483
NANOVNA_PID = 0x5740
@dataclass
class NanoVNAPort:
device: str
vid: int
pid: int
serial_number: str | None
description: str
def find_nanovna_ports() -> list[NanoVNAPort]:
"""Scan USB serial ports for NanoVNA devices (VID 0x0483, PID 0x5740)."""
found = []
for port in serial.tools.list_ports.comports():
if port.vid == NANOVNA_VID and port.pid == NANOVNA_PID:
found.append(
NanoVNAPort(
device=port.device,
vid=port.vid,
pid=port.pid,
serial_number=port.serial_number,
description=port.description or "NanoVNA",
)
)
return found
def find_first_nanovna() -> NanoVNAPort | None:
"""Return the first detected NanoVNA port, or None."""
ports = find_nanovna_ports()
return ports[0] if ports else None

660
src/mcnanovna/nanovna.py Normal file
View File

@ -0,0 +1,660 @@
"""NanoVNA tool class — all MCP tool methods for controlling a NanoVNA-H.
Each public method becomes an MCP tool via FunctionTool.from_function()
in server.py. The NanoVNA class manages connection lifecycle with lazy auto-connect.
"""
from __future__ import annotations
import base64
from mcnanovna.discovery import find_first_nanovna, find_nanovna_ports
from mcnanovna.protocol import (
SCAN_MASK_BINARY,
SCAN_MASK_NO_CALIBRATION,
SCAN_MASK_OUT_DATA0,
SCAN_MASK_OUT_DATA1,
SCAN_MASK_OUT_FREQ,
NanoVNAConnectionError,
NanoVNAProtocol,
NanoVNAProtocolError,
parse_float_pairs,
parse_frequencies,
parse_scan_binary,
parse_scan_text,
)
# Channel name mapping for the data command
CHANNEL_NAMES = {
0: "S11 (measured)",
1: "S21 (measured)",
2: "ETERM_ED (directivity)",
3: "ETERM_ES (source match)",
4: "ETERM_ER (reflection tracking)",
5: "ETERM_ET (transmission tracking)",
6: "ETERM_EX (isolation)",
}
POWER_DESCRIPTIONS = {
0: "2mA Si5351 drive",
1: "4mA Si5351 drive",
2: "6mA Si5351 drive",
3: "8mA Si5351 drive",
255: "auto",
}
class NanoVNA:
"""MCP tool class for NanoVNA-H vector network analyzers.
Manages a serial connection with lazy auto-connect: the first tool
call that needs hardware triggers USB discovery and initialization.
"""
def __init__(self) -> None:
self._protocol = NanoVNAProtocol()
self._port: str | None = None
def _ensure_connected(self) -> None:
"""Auto-connect on first use, or reconnect if dropped."""
if self._protocol.connected:
return
port_info = find_first_nanovna()
if port_info is None:
raise NanoVNAConnectionError(
"No NanoVNA detected. Connect via USB and check that the device "
"appears as a serial port (VID 0x0483, PID 0x5740)."
)
self._port = port_info.device
self._protocol.open(self._port)
self._protocol.initialize()
def _has_capability(self, cmd: str) -> bool:
return cmd in self._protocol.device_info.capabilities
# ── Tier 1: Essential tools ────────────────────────────────────────
def info(self) -> dict:
"""Get NanoVNA device information: board, firmware version, capabilities, display size, and hardware parameters."""
self._ensure_connected()
di = self._protocol.device_info
return {
"board": di.board,
"version": di.version,
"max_points": di.max_points,
"if_hz": di.if_hz,
"adc_hz": di.adc_hz,
"lcd_width": di.lcd_width,
"lcd_height": di.lcd_height,
"architecture": di.architecture,
"platform": di.platform,
"build_time": di.build_time,
"capabilities": di.capabilities,
"port": self._port,
}
def sweep(
self,
start_hz: int | None = None,
stop_hz: int | None = None,
points: int | None = None,
) -> dict:
"""Get or set the sweep frequency range. With no args, returns current settings. With args, sets new sweep parameters.
Args:
start_hz: Start frequency in Hz (e.g. 50000 for 50 kHz)
stop_hz: Stop frequency in Hz (e.g. 900000000 for 900 MHz)
points: Number of sweep points (max depends on hardware: 101 or 401)
"""
self._ensure_connected()
if start_hz is not None:
parts = [str(start_hz)]
if stop_hz is not None:
parts.append(str(stop_hz))
if points is not None:
parts.append(str(points))
self._protocol.send_text_command(f"sweep {' '.join(parts)}")
lines = self._protocol.send_text_command("sweep")
# Response: "start_hz stop_hz points"
if lines:
parts = lines[0].strip().split()
if len(parts) >= 3:
return {
"start_hz": int(parts[0]),
"stop_hz": int(parts[1]),
"points": int(parts[2]),
}
return {"start_hz": 0, "stop_hz": 0, "points": 0}
def scan(
self,
start_hz: int,
stop_hz: int,
points: int = 101,
s11: bool = True,
s21: bool = True,
apply_cal: bool = True,
) -> dict:
"""Perform a frequency sweep and return S-parameter measurement data.
This is the primary measurement tool. It configures the sweep range,
triggers acquisition, and returns calibrated S11/S21 complex data.
Args:
start_hz: Start frequency in Hz (min ~600, max 2000000000)
stop_hz: Stop frequency in Hz
points: Number of measurement points (1 to device max, typically 101 or 401)
s11: Include S11 reflection data
s21: Include S21 transmission data
apply_cal: Apply stored calibration correction (set False for raw data)
"""
self._ensure_connected()
mask = SCAN_MASK_OUT_FREQ
if s11:
mask |= SCAN_MASK_OUT_DATA0
if s21:
mask |= SCAN_MASK_OUT_DATA1
if not apply_cal:
mask |= SCAN_MASK_NO_CALIBRATION
use_binary = self._has_capability("scan_bin")
if use_binary:
binary_mask = mask | SCAN_MASK_BINARY
rx_mask, rx_points, raw = self._protocol.send_binary_scan(
start_hz, stop_hz, points, binary_mask
)
scan_points = parse_scan_binary(rx_mask, rx_points, raw)
else:
lines = self._protocol.send_text_command(
f"scan {start_hz} {stop_hz} {points} {mask}",
timeout=30.0,
)
scan_points = parse_scan_text(lines, mask)
data = []
for pt in scan_points:
entry: dict = {}
if pt.frequency_hz is not None:
entry["frequency_hz"] = pt.frequency_hz
if pt.s11 is not None:
entry["s11"] = {"real": pt.s11[0], "imag": pt.s11[1]}
if pt.s21 is not None:
entry["s21"] = {"real": pt.s21[0], "imag": pt.s21[1]}
data.append(entry)
return {
"start_hz": start_hz,
"stop_hz": stop_hz,
"points": len(data),
"binary": use_binary,
"mask": mask,
"data": data,
}
def data(self, channel: int = 0) -> dict:
"""Read measurement or calibration data arrays from device memory.
Channels: 0=S11 measured, 1=S21 measured, 2=directivity, 3=source match,
4=reflection tracking, 5=transmission tracking, 6=isolation.
Args:
channel: Data array index (0-6)
"""
self._ensure_connected()
if channel < 0 or channel > 6:
return {"error": "Channel must be 0-6"}
lines = self._protocol.send_text_command(f"data {channel}")
pairs = parse_float_pairs(lines)
return {
"channel": channel,
"channel_name": CHANNEL_NAMES.get(channel, f"channel {channel}"),
"points": len(pairs),
"data": [{"real": r, "imag": i} for r, i in pairs],
}
def frequencies(self) -> dict:
"""Get the list of frequency points for the current sweep configuration."""
self._ensure_connected()
lines = self._protocol.send_text_command("frequencies")
freqs = parse_frequencies(lines)
return {"count": len(freqs), "frequencies_hz": freqs}
def marker(
self,
number: int | None = None,
action: str | None = None,
index: int | None = None,
) -> dict:
"""Query or control markers on the NanoVNA display.
With no args, lists all active markers. With number + action, controls a specific marker.
Args:
number: Marker number (1-8)
action: Action to perform: 'on', 'off', or omit to query
index: Set marker to this sweep point index
"""
self._ensure_connected()
if number is not None:
if action is not None:
self._protocol.send_text_command(f"marker {number} {action}")
elif index is not None:
self._protocol.send_text_command(f"marker {number} {index}")
lines = self._protocol.send_text_command("marker")
markers = []
for line in lines:
parts = line.strip().split()
if len(parts) >= 3:
try:
markers.append({
"id": int(parts[0]),
"index": int(parts[1]),
"frequency_hz": int(parts[2]),
})
except ValueError:
pass
return {"markers": markers}
def cal(self, step: str | None = None) -> dict:
"""Query calibration status or perform a calibration step.
Steps: 'load', 'open', 'short', 'thru', 'isoln', 'done', 'on', 'off', 'reset'.
With no args, returns current calibration status.
Args:
step: Calibration step to execute
"""
self._ensure_connected()
if step is not None:
valid = {"load", "open", "short", "thru", "isoln", "done", "on", "off", "reset"}
if step not in valid:
return {"error": f"Invalid step '{step}'. Valid: {', '.join(sorted(valid))}"}
lines = self._protocol.send_text_command(f"cal {step}", timeout=10.0)
return {"step": step, "response": lines}
lines = self._protocol.send_text_command("cal")
return {"status": lines}
def save(self, slot: int) -> dict:
"""Save current calibration and configuration to a flash memory slot.
Args:
slot: Save slot number (0-4 on NanoVNA-H, 0-6 on NanoVNA-H4)
"""
self._ensure_connected()
self._protocol.send_text_command(f"save {slot}")
return {"slot": slot, "saved": True}
def recall(self, slot: int) -> dict:
"""Recall calibration and configuration from a flash memory slot.
Args:
slot: Save slot number (0-4 on NanoVNA-H, 0-6 on NanoVNA-H4)
"""
self._ensure_connected()
self._protocol.send_text_command(f"recall {slot}", timeout=5.0)
return {"slot": slot, "recalled": True}
def pause(self) -> dict:
"""Pause the continuous sweep. Measurements freeze at current values."""
self._ensure_connected()
self._protocol.send_text_command("pause")
return {"sweep": "paused"}
def resume(self) -> dict:
"""Resume continuous sweep after pause."""
self._ensure_connected()
self._protocol.send_text_command("resume")
return {"sweep": "running"}
# ── Tier 2: Configuration tools ────────────────────────────────────
def power(self, level: int | None = None) -> dict:
"""Get or set RF output power level.
Args:
level: Power level (0=2mA, 1=4mA, 2=6mA, 3=8mA Si5351 drive, 255=auto)
"""
self._ensure_connected()
if level is not None:
self._protocol.send_text_command(f"power {level}")
lines = self._protocol.send_text_command("power")
# Response: "power: N"
for line in lines:
if "power" in line.lower():
parts = line.split(":")
if len(parts) >= 2:
try:
val = int(parts[1].strip())
return {
"power": val,
"description": POWER_DESCRIPTIONS.get(val, f"level {val}"),
}
except ValueError:
pass
return {"power": level if level is not None else -1, "description": "unknown", "raw": lines}
def bandwidth(self, bw_hz: int | None = None) -> dict:
"""Get or set the IF bandwidth (affects measurement speed vs noise floor).
Args:
bw_hz: Bandwidth in Hz, or bandwidth divider value. Lower = slower but more accurate.
"""
self._ensure_connected()
if not self._has_capability("bandwidth"):
return {"error": "bandwidth command not supported by this firmware"}
if bw_hz is not None:
self._protocol.send_text_command(f"bandwidth {bw_hz}")
lines = self._protocol.send_text_command("bandwidth")
# Response: "bandwidth N (Mhz)" or similar
if lines:
line = lines[0].strip()
# Try to parse "bandwidth <divider> (<freq>Hz)"
import re
m = re.match(r"bandwidth\s+(\d+)\s*\((\d+)\s*Hz\)", line, re.IGNORECASE)
if m:
return {"bandwidth_divider": int(m.group(1)), "bandwidth_hz": int(m.group(2))}
# Fallback: just return raw
return {"raw": line}
return {"bandwidth_divider": 0, "bandwidth_hz": 0}
def edelay(self, seconds: float | None = None) -> dict:
"""Get or set electrical delay compensation in seconds.
Args:
seconds: Electrical delay in seconds (e.g. 1e-9 for 1 nanosecond)
"""
self._ensure_connected()
if seconds is not None:
self._protocol.send_text_command(f"edelay {seconds}")
lines = self._protocol.send_text_command("edelay")
if lines:
try:
return {"edelay_seconds": float(lines[0].strip())}
except ValueError:
return {"raw": lines}
return {"edelay_seconds": 0.0}
def s21offset(self, db: float | None = None) -> dict:
"""Get or set S21 offset correction in dB.
Args:
db: Offset value in dB
"""
self._ensure_connected()
if not self._has_capability("s21offset"):
return {"error": "s21offset command not supported by this firmware"}
if db is not None:
self._protocol.send_text_command(f"s21offset {db}")
lines = self._protocol.send_text_command("s21offset")
if lines:
try:
return {"s21_offset_db": float(lines[0].strip())}
except ValueError:
return {"raw": lines}
return {"s21_offset_db": 0.0}
def vbat(self) -> dict:
"""Read battery voltage in millivolts."""
self._ensure_connected()
lines = self._protocol.send_text_command("vbat")
# Response: "4151 mV"
if lines:
parts = lines[0].strip().split()
if parts:
try:
mv = int(parts[0])
return {"voltage_mv": mv, "voltage_v": round(mv / 1000.0, 3)}
except ValueError:
pass
return {"voltage_mv": 0, "voltage_v": 0.0, "raw": lines}
def capture(self) -> dict:
"""Capture the current LCD screen as RGB565 pixel data (base64 encoded).
Returns width, height, and raw pixel data for rendering. The pixel format
is RGB565 (16-bit, 2 bytes per pixel). Total size = width * height * 2 bytes.
"""
self._ensure_connected()
di = self._protocol.device_info
width = di.lcd_width
height = di.lcd_height
expected_size = width * height * 2
# capture command outputs raw binary RGB565 data after echo line
self._protocol._drain()
self._protocol._send_command("capture")
ser = self._protocol._require_connection()
import time
old_timeout = ser.timeout
ser.timeout = 10.0
try:
buf = b""
deadline = time.monotonic() + 10.0
# Read past echo line
while time.monotonic() < deadline:
chunk = ser.read(max(1, ser.in_waiting))
if chunk:
buf += chunk
if b"\r\n" in buf:
break
echo_end = buf.index(b"\r\n") + 2
pixel_buf = buf[echo_end:]
# Read pixel data
while len(pixel_buf) < expected_size and time.monotonic() < deadline:
remaining = expected_size - len(pixel_buf)
chunk = ser.read(min(remaining, 4096))
if chunk:
pixel_buf += chunk
# Byte-swap RGB565 (firmware sends native LE, display expects BE)
swapped = bytearray(expected_size)
for i in range(0, min(len(pixel_buf), expected_size), 2):
if i + 1 < len(pixel_buf):
swapped[i] = pixel_buf[i + 1]
swapped[i + 1] = pixel_buf[i]
# Drain trailing prompt
trailing = b""
while time.monotonic() < deadline:
chunk = ser.read(max(1, ser.in_waiting))
if chunk:
trailing += chunk
if b"ch> " in trailing or not chunk:
break
return {
"format": "rgb565",
"width": width,
"height": height,
"data_length": len(swapped),
"data_base64": base64.b64encode(bytes(swapped)).decode("ascii"),
}
finally:
ser.timeout = old_timeout
# ── Tier 3: Advanced tools ─────────────────────────────────────────
def trace(
self,
number: int | None = None,
trace_type: str | None = None,
channel: int | None = None,
scale: float | None = None,
refpos: float | None = None,
) -> dict:
"""Query or configure display traces.
Trace types: logmag, phase, delay, smith, polar, linear, swr, real, imag,
r, x, z, zp, g, b, y, rp, xp, and many more.
Args:
number: Trace number (0-3)
trace_type: Display format (e.g. 'logmag', 'swr', 'smith')
channel: Data channel (0=S11, 1=S21)
scale: Y-axis scale value
refpos: Reference position on display
"""
self._ensure_connected()
if number is not None:
if trace_type is not None:
cmd = f"trace {number} {trace_type}"
if channel is not None:
cmd += f" {channel}"
self._protocol.send_text_command(cmd)
elif scale is not None:
self._protocol.send_text_command(f"trace {number} scale {scale}")
elif refpos is not None:
self._protocol.send_text_command(f"trace {number} refpos {refpos}")
lines = self._protocol.send_text_command("trace")
return {"traces": lines}
def transform(self, mode: str | None = None) -> dict:
"""Control time-domain transform mode.
Modes: 'on', 'off', 'impulse', 'step', 'bandpass', 'minimum', 'normal', 'maximum'.
Args:
mode: Transform mode to set
"""
self._ensure_connected()
if not self._has_capability("transform"):
return {"error": "transform command not supported by this firmware"}
if mode is not None:
lines = self._protocol.send_text_command(f"transform {mode}")
return {"transform": mode, "response": lines}
lines = self._protocol.send_text_command("transform")
return {"transform_status": lines}
def smooth(self, factor: int | None = None) -> dict:
"""Get or set trace smoothing factor.
Args:
factor: Smoothing factor (0=off, higher=more smoothing)
"""
self._ensure_connected()
if not self._has_capability("smooth"):
return {"error": "smooth command not supported by this firmware"}
if factor is not None:
self._protocol.send_text_command(f"smooth {factor}")
lines = self._protocol.send_text_command("smooth")
return {"response": lines}
def threshold(self, frequency_hz: int | None = None) -> dict:
"""Get or set the harmonic mode frequency threshold (~290 MHz default).
Above this frequency the Si5351 uses odd harmonics for output.
Args:
frequency_hz: Threshold frequency in Hz
"""
self._ensure_connected()
if frequency_hz is not None:
self._protocol.send_text_command(f"threshold {frequency_hz}")
lines = self._protocol.send_text_command("threshold")
# Parse "current: 290000000" from response
for line in lines:
if "current:" in line.lower():
parts = line.split(":")
if len(parts) >= 2:
try:
return {"threshold_hz": int(parts[1].strip())}
except ValueError:
pass
return {"response": lines}
def reset(self, dfu: bool = False) -> dict:
"""Reset the NanoVNA device. With dfu=True, enters DFU bootloader for firmware update.
Args:
dfu: If True, enter DFU bootloader mode (device will disconnect)
"""
self._ensure_connected()
cmd = "reset dfu" if dfu else "reset"
try:
self._protocol.send_text_command(cmd, timeout=2.0)
except NanoVNAProtocolError:
pass # Device resets and disconnects
self._protocol.close()
note = "Device entering DFU mode — reconnect after firmware update" if dfu else "Device resetting"
return {"reset": True, "dfu": dfu, "note": note}
def version(self) -> dict:
"""Get firmware version string."""
self._ensure_connected()
lines = self._protocol.send_text_command("version")
return {"version": lines[0].strip() if lines else "unknown"}
def detect(self) -> dict:
"""Scan USB ports for connected NanoVNA devices."""
ports = find_nanovna_ports()
return {
"devices": [
{
"port": p.device,
"vid": f"0x{p.vid:04x}",
"pid": f"0x{p.pid:04x}",
"serial_number": p.serial_number,
"description": p.description,
}
for p in ports
],
"count": len(ports),
"currently_connected": self._port,
}
def disconnect(self) -> dict:
"""Close the serial connection to the NanoVNA."""
port = self._port
self._protocol.close()
self._port = None
return {"disconnected": True, "port": port}
def raw_command(self, command: str) -> dict:
"""Send an arbitrary shell command to the NanoVNA and return raw text response.
Escape hatch for firmware commands not wrapped as dedicated tools.
Args:
command: The shell command string to send (e.g. 'config', 'usart_cfg')
"""
self._ensure_connected()
lines = self._protocol.send_text_command(command, timeout=10.0)
return {"command": command, "response_lines": lines}
# ── Novel tools ────────────────────────────────────────────────────
def cw(self, frequency_hz: int, power: int | None = None) -> dict:
"""Set continuous wave (CW) mode — output a single frequency.
Configures the NanoVNA to sweep a single point, effectively
becoming a CW signal generator at the specified frequency.
Args:
frequency_hz: Output frequency in Hz
power: Optional power level (0-3, or 255 for auto)
"""
self._ensure_connected()
if power is not None:
self._protocol.send_text_command(f"power {power}")
# CW mode is just a sweep with start == stop and 1 point
self._protocol.send_text_command(f"sweep {frequency_hz} {frequency_hz} 1")
self._protocol.send_text_command("resume")
return {"frequency_hz": frequency_hz, "power": power, "mode": "cw"}

455
src/mcnanovna/protocol.py Normal file
View File

@ -0,0 +1,455 @@
"""Serial protocol layer for NanoVNA-H shell communication.
Handles text command/response parsing, binary scan data, and the
three-phase initialization sequence mirrored from NanoVNA-App.
Protocol basics:
- Commands: ASCII text terminated with \\r\\n
- Responses: device echoes command, then data lines, then "ch> " prompt
- Binary: scan_bin returns a 4-byte header (mask + points) followed by packed floats
"""
from __future__ import annotations
import re
import struct
import time
from dataclasses import dataclass, field
import serial
# Scan mask bits (from NanoVNA-H main.c line 1371)
SCAN_MASK_OUT_FREQ = 0x01
SCAN_MASK_OUT_DATA0 = 0x02
SCAN_MASK_OUT_DATA1 = 0x04
SCAN_MASK_NO_CALIBRATION = 0x08
SCAN_MASK_NO_EDELAY = 0x10
SCAN_MASK_NO_S21OFFS = 0x20
SCAN_MASK_BINARY = 0x80
PROMPT = "ch> "
PROMPT_BYTES = PROMPT.encode("ascii")
DEFAULT_BAUD = 115200
DEFAULT_TIMEOUT = 2.0
SCAN_TIMEOUT = 30.0
class NanoVNAProtocolError(Exception):
pass
class NanoVNAConnectionError(NanoVNAProtocolError):
pass
@dataclass
class DeviceInfo:
"""Parsed device information from version/info commands."""
board: str = "unknown"
version: str = "unknown"
max_points: int = 101
if_hz: int = 12000
adc_hz: int = 192000
lcd_width: int = 320
lcd_height: int = 240
architecture: str = ""
platform: str = ""
build_time: str = ""
kernel: str = ""
compiler: str = ""
capabilities: list[str] = field(default_factory=list)
def _parse_freq_suffix(s: str) -> int:
"""Parse frequency string with k/m suffix (e.g., '12k' -> 12000)."""
s = s.strip().lower()
if s.endswith("m"):
return int(float(s[:-1]) * 1_000_000)
if s.endswith("k"):
return int(float(s[:-1]) * 1_000)
return int(s)
def _parse_version_bracket(text: str) -> dict:
"""Parse bracketed params: [p:401, IF:12k, ADC:192k, Lcd:480x320]"""
match = re.search(r"\[([^\]]+)\]", text)
if not match:
return {}
result = {}
for part in match.group(1).split(","):
part = part.strip()
if ":" not in part:
continue
key, val = part.split(":", 1)
key = key.strip().lower()
val = val.strip()
if key == "p":
result["max_points"] = int(val)
elif key == "if":
result["if_hz"] = _parse_freq_suffix(val)
elif key == "adc":
result["adc_hz"] = _parse_freq_suffix(val)
elif key == "lcd":
if "x" in val:
w, h = val.split("x", 1)
result["lcd_width"] = int(w)
result["lcd_height"] = int(h)
return result
class NanoVNAProtocol:
"""Low-level serial protocol for NanoVNA shell commands.
Mirrors NanoVNA-App's NanoVNA_v1_comms.cpp communication model:
send command, read until 'ch> ' prompt, parse response.
"""
def __init__(self) -> None:
self._serial: serial.Serial | None = None
self.device_info = DeviceInfo()
self._connected = False
@property
def connected(self) -> bool:
return self._connected and self._serial is not None and self._serial.is_open
def open(self, port: str, baudrate: int = DEFAULT_BAUD) -> None:
"""Open serial connection to NanoVNA."""
if self._serial and self._serial.is_open:
self._serial.close()
self._serial = serial.Serial(
port=port,
baudrate=baudrate,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=DEFAULT_TIMEOUT,
write_timeout=DEFAULT_TIMEOUT,
)
self._serial.dtr = True
self._serial.rts = True
self._connected = True
def close(self) -> None:
"""Close serial connection."""
if self._serial and self._serial.is_open:
self._serial.close()
self._serial = None
self._connected = False
def _require_connection(self) -> serial.Serial:
if not self._serial or not self._serial.is_open:
raise NanoVNAConnectionError("Not connected to NanoVNA")
return self._serial
def _drain(self) -> None:
"""Drain any pending data from the serial buffer."""
ser = self._require_connection()
ser.reset_input_buffer()
def _send_raw(self, data: bytes) -> None:
ser = self._require_connection()
ser.write(data)
ser.flush()
def _send_command(self, command: str) -> None:
"""Send a shell command (appends \\r\\n)."""
self._send_raw(f"{command}\r\n".encode("ascii"))
def _read_until_prompt(self, timeout: float = DEFAULT_TIMEOUT) -> list[str]:
"""Read response lines until 'ch> ' prompt is detected.
Returns list of response lines (excluding the echo line and prompt).
The firmware echoes the command as the first line, which we skip.
"""
ser = self._require_connection()
old_timeout = ser.timeout
ser.timeout = timeout
try:
buf = b""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
chunk = ser.read(max(1, ser.in_waiting))
if not chunk:
continue
buf += chunk
if PROMPT_BYTES in buf:
break
else:
raise NanoVNAProtocolError(f"Timeout waiting for prompt (got {len(buf)} bytes)")
# Split at prompt, take content before it
content = buf.split(PROMPT_BYTES)[0]
lines = content.decode("ascii", errors="replace").splitlines()
# Skip the echo line (first non-empty line is usually the echoed command)
return lines[1:] if lines else []
finally:
ser.timeout = old_timeout
def send_text_command(self, command: str, timeout: float = DEFAULT_TIMEOUT) -> list[str]:
"""Send a command and return text response lines.
Skips the echoed command line. Returns only the data lines
between the echo and the 'ch> ' prompt.
"""
self._drain()
self._send_command(command)
return self._read_until_prompt(timeout=timeout)
def send_binary_scan(
self,
start_hz: int,
stop_hz: int,
points: int,
mask: int,
) -> tuple[int, int, bytes]:
"""Execute scan_bin and return raw binary data.
Returns (mask, points, raw_bytes) where raw_bytes contains
the per-point binary data (excluding the 4-byte header).
"""
ser = self._require_connection()
self._drain()
self._send_command(f"scan {start_hz} {stop_hz} {points} {mask}")
old_timeout = ser.timeout
ser.timeout = SCAN_TIMEOUT
try:
# Read until we get past the echo line's \r\n and into binary data.
# The echo line is: "scan <args>\r\n" followed immediately by binary.
buf = b""
deadline = time.monotonic() + SCAN_TIMEOUT
# First, read past the echo line
while time.monotonic() < deadline:
chunk = ser.read(max(1, ser.in_waiting))
if not chunk:
continue
buf += chunk
# Look for end of echo line
if b"\r\n" in buf:
break
# Split off echo, binary starts after first \r\n
echo_end = buf.index(b"\r\n") + 2
binary_buf = buf[echo_end:]
# Read 4-byte header: uint16 mask + uint16 points
while len(binary_buf) < 4 and time.monotonic() < deadline:
chunk = ser.read(4 - len(binary_buf))
if chunk:
binary_buf += chunk
if len(binary_buf) < 4:
raise NanoVNAProtocolError("Failed to read binary scan header")
rx_mask, rx_points = struct.unpack_from("<HH", binary_buf, 0)
# Calculate expected payload size
bytes_per_point = 0
if rx_mask & SCAN_MASK_OUT_FREQ:
bytes_per_point += 4
if rx_mask & SCAN_MASK_OUT_DATA0:
bytes_per_point += 8
if rx_mask & SCAN_MASK_OUT_DATA1:
bytes_per_point += 8
expected_size = 4 + (bytes_per_point * rx_points)
# Read remaining data
while len(binary_buf) < expected_size and time.monotonic() < deadline:
remaining = expected_size - len(binary_buf)
chunk = ser.read(remaining)
if chunk:
binary_buf += chunk
if len(binary_buf) < expected_size:
raise NanoVNAProtocolError(
f"Incomplete binary data: got {len(binary_buf)} bytes, expected {expected_size}"
)
# Read through the trailing prompt (don't leave it in the buffer)
trailing = b""
while time.monotonic() < deadline:
chunk = ser.read(max(1, ser.in_waiting))
if chunk:
trailing += chunk
if PROMPT_BYTES in trailing:
break
if not chunk and not ser.in_waiting:
break
return rx_mask, rx_points, binary_buf[4:expected_size]
finally:
ser.timeout = old_timeout
# -- Initialization sequence (mirrors NanoVNA-App 3-phase init) --
def sync(self) -> bool:
"""Phase 1: Send empty line, wait for ch> prompt."""
self._drain()
self._send_raw(b"\r\n")
ser = self._require_connection()
old_timeout = ser.timeout
ser.timeout = 3.0
try:
buf = b""
deadline = time.monotonic() + 3.0
while time.monotonic() < deadline:
chunk = ser.read(max(1, ser.in_waiting))
if chunk:
buf += chunk
if PROMPT_BYTES in buf:
return True
return False
finally:
ser.timeout = old_timeout
def discover_capabilities(self) -> DeviceInfo:
"""Phase 2+3: Send help, version, info to populate device info."""
info = DeviceInfo()
# Parse help output for capability flags
help_lines = self.send_text_command("help")
help_text = " ".join(help_lines).lower()
# The help output format is: "Commands: scan scan_bin data ..."
for cmd in [
"scan_bin", "data", "frequencies", "sweep", "power", "bandwidth",
"cal", "save", "recall", "trace", "marker", "edelay", "s21offset",
"capture", "vbat", "tcxo", "reset", "smooth", "transform",
"threshold", "info", "version", "color", "measure", "pause",
"resume", "config", "usart_cfg", "vbat_offset", "time",
]:
if cmd in help_text:
info.capabilities.append(cmd)
# Parse version
version_lines = self.send_text_command("version")
if version_lines:
info.version = version_lines[0].strip()
# Parse info (if available)
if "info" in info.capabilities:
info_lines = self.send_text_command("info")
for line in info_lines:
line = line.strip()
if line.startswith("Board:"):
info.board = line.split(":", 1)[1].strip()
elif line.startswith("Version:"):
# Parse bracket params: [p:401, IF:12k, ADC:192k, Lcd:480x320]
bracket = _parse_version_bracket(line)
info.max_points = bracket.get("max_points", info.max_points)
info.if_hz = bracket.get("if_hz", info.if_hz)
info.adc_hz = bracket.get("adc_hz", info.adc_hz)
info.lcd_width = bracket.get("lcd_width", info.lcd_width)
info.lcd_height = bracket.get("lcd_height", info.lcd_height)
elif line.startswith("Build Time:"):
info.build_time = line.split(":", 1)[1].strip()
elif line.startswith("Kernel:"):
info.kernel = line.split(":", 1)[1].strip()
elif line.startswith("Compiler:"):
info.compiler = line.split(":", 1)[1].strip()
elif line.startswith("Architecture:"):
info.architecture = line.split(":", 1)[1].strip()
elif line.startswith("Platform:"):
info.platform = line.split(":", 1)[1].strip()
self.device_info = info
return info
def initialize(self) -> DeviceInfo:
"""Full init sequence: sync → discover capabilities."""
if not self.sync():
raise NanoVNAConnectionError("Failed to sync with NanoVNA (no prompt received)")
return self.discover_capabilities()
# -- Data parsing helpers --
@dataclass
class ScanPoint:
frequency_hz: int | None = None
s11: tuple[float, float] | None = None # (real, imag)
s21: tuple[float, float] | None = None # (real, imag)
def parse_scan_text(lines: list[str], mask: int) -> list[ScanPoint]:
"""Parse text-mode scan response lines into ScanPoint objects.
Each line format: <freq> <s11_re> <s11_im> [<s21_re> <s21_im>]
"""
points = []
for line in lines:
line = line.strip()
if not line:
continue
parts = line.split()
if len(parts) < 1:
continue
point = ScanPoint()
idx = 0
if mask & SCAN_MASK_OUT_FREQ and idx < len(parts):
point.frequency_hz = int(parts[idx])
idx += 1
if mask & SCAN_MASK_OUT_DATA0 and idx + 1 < len(parts):
point.s11 = (float(parts[idx]), float(parts[idx + 1]))
idx += 2
if mask & SCAN_MASK_OUT_DATA1 and idx + 1 < len(parts):
point.s21 = (float(parts[idx]), float(parts[idx + 1]))
idx += 2
points.append(point)
return points
def parse_scan_binary(mask: int, points: int, data: bytes) -> list[ScanPoint]:
"""Parse binary scan data payload into ScanPoint objects.
Binary layout per point (all little-endian):
if MASK_OUT_FREQ: uint32 frequency_hz
if MASK_OUT_DATA0: float32 s11_re, float32 s11_im
if MASK_OUT_DATA1: float32 s21_re, float32 s21_im
"""
result = []
offset = 0
for _ in range(points):
point = ScanPoint()
if mask & SCAN_MASK_OUT_FREQ:
point.frequency_hz = struct.unpack_from("<I", data, offset)[0]
offset += 4
if mask & SCAN_MASK_OUT_DATA0:
re, im = struct.unpack_from("<ff", data, offset)
point.s11 = (re, im)
offset += 8
if mask & SCAN_MASK_OUT_DATA1:
re, im = struct.unpack_from("<ff", data, offset)
point.s21 = (re, im)
offset += 8
result.append(point)
return result
def parse_float_pairs(lines: list[str]) -> list[tuple[float, float]]:
"""Parse 'real imag' lines into float tuples (used by data command)."""
pairs = []
for line in lines:
line = line.strip()
if not line:
continue
parts = line.split()
if len(parts) >= 2:
pairs.append((float(parts[0]), float(parts[1])))
return pairs
def parse_frequencies(lines: list[str]) -> list[int]:
"""Parse frequency list (one integer per line)."""
freqs = []
for line in lines:
line = line.strip()
if line:
try:
freqs.append(int(line))
except ValueError:
pass
return freqs

57
src/mcnanovna/server.py Normal file
View File

@ -0,0 +1,57 @@
"""FastMCP server for NanoVNA-H vector network analyzers.
Registers all NanoVNA tool methods and starts the MCP server.
"""
from __future__ import annotations
from fastmcp import FastMCP
from fastmcp.tools.tool import FunctionTool
from mcnanovna.nanovna import NanoVNA
# All public methods on NanoVNA that should become MCP tools
_TOOL_METHODS = [
"info", "sweep", "scan", "data", "frequencies", "marker", "cal",
"save", "recall", "pause", "resume", "power", "bandwidth", "edelay",
"s21offset", "vbat", "capture", "trace", "transform", "smooth",
"threshold", "reset", "version", "detect", "disconnect", "raw_command", "cw",
]
def create_server() -> FastMCP:
mcp = FastMCP(
name="mcnanovna",
instructions=(
"MCP server for controlling NanoVNA-H vector network analyzers via USB serial. "
"Tools provide structured access to sweep measurements, S-parameter data, "
"calibration, markers, and device configuration. The device auto-connects "
"on first tool call — just plug in and go."
),
)
vna = NanoVNA()
for method_name in _TOOL_METHODS:
bound_method = getattr(vna, method_name)
tool = FunctionTool.from_function(bound_method)
mcp.add_tool(tool)
return mcp
def main() -> None:
try:
from importlib.metadata import version
package_version = version("mcnanovna")
except Exception:
package_version = "dev"
print(f"mcnanovna v{package_version} — NanoVNA-H MCP server")
server = create_server()
server.run()
if __name__ == "__main__":
main()

1658
uv.lock generated Normal file

File diff suppressed because it is too large Load Diff