Async progress notifications for long-running tools, connection keepalive

Convert scan, analyze, capture, and cal to async with MCP progress
reporting via FastMCP Context. Blocking serial I/O wrapped with
asyncio.to_thread() so the event loop stays free to deliver progress
notifications during hardware sweeps.

Add connection keepalive: _ensure_connected validates stale connections
with a sync probe after 30s idle, and retries on cold-start failures
(fixes flaky first-connect after MCP server restart).
This commit is contained in:
Ryan Malloy 2026-01-30 15:05:50 -07:00
parent 4569fea9f9
commit 48e91a755c

View File

@ -6,9 +6,12 @@ in server.py. The NanoVNA class manages connection lifecycle with lazy auto-conn
from __future__ import annotations
import asyncio
import base64
import re
from fastmcp import Context
from mcnanovna.discovery import find_first_nanovna, find_nanovna_ports
from mcnanovna.protocol import (
SCAN_MASK_BINARY,
@ -45,21 +48,47 @@ POWER_DESCRIPTIONS = {
}
async def _progress(ctx: Context | None, progress: float, total: float, message: str) -> None:
"""Report progress if Context is available."""
if ctx:
await ctx.report_progress(progress, total, message)
class NanoVNA:
"""MCP tool class for NanoVNA-H vector network analyzers.
Manages a serial connection with lazy auto-connect: the first tool
call that needs hardware triggers USB discovery and initialization.
Connection is validated with a sync probe after idle periods.
"""
_KEEPALIVE_TIMEOUT = 30.0 # seconds before re-validating connection
def __init__(self) -> None:
self._protocol = NanoVNAProtocol()
self._port: str | None = None
self._last_ok: float = 0.0
def _ensure_connected(self) -> None:
"""Auto-connect on first use, or reconnect if dropped."""
"""Auto-connect on first use, reconnect if stale, retry on failure."""
import time as _time
if self._protocol.connected:
return
# Recently active — trust the connection
if (_time.monotonic() - self._last_ok) < self._KEEPALIVE_TIMEOUT:
self._last_ok = _time.monotonic()
return
# Idle too long — validate with a sync probe
try:
if self._protocol.sync():
self._last_ok = _time.monotonic()
return
except Exception:
pass
# Stale connection — close and fall through to reconnect
self._protocol.close()
self._port = None
port_info = find_first_nanovna()
if port_info is None:
raise NanoVNAConnectionError(
@ -67,8 +96,21 @@ class NanoVNA:
"appears as a serial port (VID 0x0483, PID 0x5740)."
)
self._port = port_info.device
self._protocol.open(self._port)
self._protocol.initialize()
# Connect with retry — first sync can fail on cold/stale serial ports
last_error: Exception | None = None
for attempt in range(2):
try:
self._protocol.open(self._port)
self._protocol.initialize()
self._last_ok = _time.monotonic()
return
except NanoVNAConnectionError as exc:
last_error = exc
self._protocol.close()
if attempt == 0:
_time.sleep(0.3)
raise last_error # type: ignore[misc]
def _has_capability(self, cmd: str) -> bool:
return cmd in self._protocol.device_info.capabilities
@ -128,7 +170,7 @@ class NanoVNA:
}
return {"start_hz": 0, "stop_hz": 0, "points": 0}
def scan(
async def scan(
self,
start_hz: int,
stop_hz: int,
@ -136,6 +178,7 @@ class NanoVNA:
s11: bool = True,
s21: bool = True,
apply_cal: bool = True,
ctx: Context | None = None,
) -> dict:
"""Perform a frequency sweep and return S-parameter measurement data.
@ -150,7 +193,9 @@ class NanoVNA:
s21: Include S21 transmission data
apply_cal: Apply stored calibration correction (set False for raw data)
"""
self._ensure_connected()
await _progress(ctx, 1, 4, "Connecting to NanoVNA...")
await asyncio.to_thread(self._ensure_connected)
mask = SCAN_MASK_OUT_FREQ
if s11:
mask |= SCAN_MASK_OUT_DATA0
@ -161,19 +206,26 @@ class NanoVNA:
use_binary = self._has_capability("scan_bin")
await _progress(ctx, 2, 4, "Sending scan command...")
if use_binary:
binary_mask = mask | SCAN_MASK_BINARY
rx_mask, rx_points, raw = self._protocol.send_binary_scan(
start_hz, stop_hz, points, binary_mask
await _progress(ctx, 3, 4, f"Waiting for sweep data ({points} points)...")
rx_mask, rx_points, raw = await asyncio.to_thread(
self._protocol.send_binary_scan, start_hz, stop_hz, points, binary_mask
)
scan_points = parse_scan_binary(rx_mask, rx_points, raw)
else:
lines = self._protocol.send_text_command(
await _progress(ctx, 3, 4, f"Waiting for sweep data ({points} points)...")
lines = await asyncio.to_thread(
self._protocol.send_text_command,
f"scan {start_hz} {stop_hz} {points} {mask}",
timeout=30.0,
30.0,
)
scan_points = parse_scan_text(lines, mask)
await _progress(ctx, 4, 4, "Parsing measurement data...")
data = []
for pt in scan_points:
entry: dict = {}
@ -259,7 +311,7 @@ class NanoVNA:
pass
return {"markers": markers}
def cal(self, step: str | None = None) -> dict:
async def cal(self, step: str | None = None, ctx: Context | None = None) -> dict:
"""Query calibration status or perform a calibration step.
Steps: 'load', 'open', 'short', 'thru', 'isoln', 'done', 'on', 'off', 'reset'.
@ -268,15 +320,19 @@ class NanoVNA:
Args:
step: Calibration step to execute
"""
self._ensure_connected()
await asyncio.to_thread(self._ensure_connected)
if step is not None:
valid = {"load", "open", "short", "thru", "isoln", "done", "on", "off", "reset"}
if step not in valid:
return {"error": f"Invalid step '{step}'. Valid: {', '.join(sorted(valid))}"}
lines = self._protocol.send_text_command(f"cal {step}", timeout=10.0)
await _progress(ctx, 1, 2, f"Sending calibration command: {step}...")
lines = await asyncio.to_thread(
self._protocol.send_text_command, f"cal {step}", 10.0
)
await _progress(ctx, 2, 2, f"Calibration step '{step}' complete")
return {"step": step, "response": lines}
lines = self._protocol.send_text_command("cal")
lines = await asyncio.to_thread(self._protocol.send_text_command, "cal")
return {"status": lines}
def save(self, slot: int) -> dict:
@ -418,29 +474,19 @@ class NanoVNA:
pass
return {"voltage_mv": 0, "voltage_v": 0.0, "raw": lines}
def capture(self, raw: bool = False):
"""Capture the current LCD screen as RGB565 pixel data (base64 encoded).
def _capture_raw_bytes(self) -> tuple[int, int, bytearray]:
"""Read raw RGB565 pixel data from the device. Blocking serial I/O."""
import time
Returns width, height, and raw pixel data for rendering. The pixel format
is RGB565 (16-bit, 2 bytes per pixel). Total size = width * height * 2 bytes.
Args:
raw: If True, return raw RGB565 data as a dict with base64-encoded bytes.
If False (default), convert to PNG and return as an Image.
"""
self._ensure_connected()
di = self._protocol.device_info
width = di.lcd_width
height = di.lcd_height
expected_size = width * height * 2
# capture command outputs raw binary RGB565 data after echo line
self._protocol._drain()
self._protocol._send_command("capture")
ser = self._protocol._require_connection()
import time
old_timeout = ser.timeout
ser.timeout = 10.0
try:
@ -481,40 +527,61 @@ class NanoVNA:
if b"ch> " in trailing or not chunk:
break
if raw:
return {
"format": "rgb565",
"width": width,
"height": height,
"data_length": len(swapped),
"data_base64": base64.b64encode(bytes(swapped)).decode("ascii"),
}
# Convert RGB565 to PNG and return as MCP Image
import io
import struct as _struct
from PIL import Image as PILImage
from fastmcp.utilities.types import Image
img = PILImage.new("RGB", (width, height))
pixels = img.load()
for y in range(height):
for x in range(width):
offset = (y * width + x) * 2
pixel = _struct.unpack(">H", swapped[offset : offset + 2])[0]
r = ((pixel >> 11) & 0x1F) << 3
g = ((pixel >> 5) & 0x3F) << 2
b = (pixel & 0x1F) << 3
pixels[x, y] = (r, g, b)
buf_png = io.BytesIO()
img.save(buf_png, format="PNG")
return Image(data=buf_png.getvalue(), format="png")
return width, height, swapped
finally:
ser.timeout = old_timeout
async def capture(self, raw: bool = False, ctx: Context | None = None):
"""Capture the current LCD screen as RGB565 pixel data (base64 encoded).
Returns width, height, and raw pixel data for rendering. The pixel format
is RGB565 (16-bit, 2 bytes per pixel). Total size = width * height * 2 bytes.
Args:
raw: If True, return raw RGB565 data as a dict with base64-encoded bytes.
If False (default), convert to PNG and return as an Image.
"""
await _progress(ctx, 1, 3, "Connecting to NanoVNA...")
await asyncio.to_thread(self._ensure_connected)
await _progress(ctx, 2, 3, "Reading LCD pixel data...")
width, height, swapped = await asyncio.to_thread(self._capture_raw_bytes)
if raw:
await _progress(ctx, 3, 3, "Capture complete")
return {
"format": "rgb565",
"width": width,
"height": height,
"data_length": len(swapped),
"data_base64": base64.b64encode(bytes(swapped)).decode("ascii"),
}
await _progress(ctx, 3, 3, "Encoding PNG image...")
# Convert RGB565 to PNG and return as MCP Image
import io
import struct as _struct
from PIL import Image as PILImage
from fastmcp.utilities.types import Image
img = PILImage.new("RGB", (width, height))
pixels = img.load()
for y in range(height):
for x in range(width):
offset = (y * width + x) * 2
pixel = _struct.unpack(">H", swapped[offset : offset + 2])[0]
r = ((pixel >> 11) & 0x1F) << 3
g = ((pixel >> 5) & 0x3F) << 2
b = (pixel & 0x1F) << 3
pixels[x, y] = (r, g, b)
buf_png = io.BytesIO()
img.save(buf_png, format="PNG")
return Image(data=buf_png.getvalue(), format="png")
# ── Tier 3: Advanced tools ─────────────────────────────────────────
def trace(
@ -1147,6 +1214,10 @@ class NanoVNA:
Shows all running threads with their stack usage, priority, and state.
Useful for diagnosing firmware issues.
TODO: When hardware with ENABLE_THREADS_COMMAND is available, explore
representing ChibiOS threads as MCP Tasks (FastMCP tasks=True) so they
surface in Claude Code's /tasks UI with live state tracking.
"""
self._ensure_connected()
if not self._has_capability("threads"):
@ -1398,13 +1469,14 @@ class NanoVNA:
# ── Convenience: analyze scan data server-side ────────────────────
def analyze(
async def analyze(
self,
start_hz: int,
stop_hz: int,
points: int = 101,
s11: bool = True,
s21: bool = False,
ctx: Context | None = None,
) -> dict:
"""Run a scan and return comprehensive S-parameter analysis.
@ -1422,10 +1494,17 @@ class NanoVNA:
"""
from mcnanovna.calculations import analyze_scan
scan_result = self.scan(start_hz, stop_hz, points, s11=s11, s21=s21)
await _progress(ctx, 1, 5, "Connecting to NanoVNA...")
await asyncio.to_thread(self._ensure_connected)
await _progress(ctx, 2, 5, f"Scanning {points} points from {start_hz} to {stop_hz} Hz...")
scan_result = await self.scan(start_hz, stop_hz, points, s11=s11, s21=s21)
if "error" in scan_result:
return scan_result
await _progress(ctx, 3, 5, f"Received {scan_result['points']} measurement points")
await _progress(ctx, 4, 5, "Calculating S-parameter metrics...")
analysis = analyze_scan(scan_result["data"])
analysis["scan_info"] = {
"start_hz": start_hz,
@ -1433,4 +1512,6 @@ class NanoVNA:
"points": scan_result["points"],
"binary": scan_result.get("binary", False),
}
await _progress(ctx, 5, 5, "Analysis complete")
return analysis