From 48e91a755c85d463e695ee1425a7077bacccfc27 Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Fri, 30 Jan 2026 15:05:50 -0700 Subject: [PATCH] Async progress notifications for long-running tools, connection keepalive Convert scan, analyze, capture, and cal to async with MCP progress reporting via FastMCP Context. Blocking serial I/O wrapped with asyncio.to_thread() so the event loop stays free to deliver progress notifications during hardware sweeps. Add connection keepalive: _ensure_connected validates stale connections with a sync probe after 30s idle, and retries on cold-start failures (fixes flaky first-connect after MCP server restart). --- src/mcnanovna/nanovna.py | 201 +++++++++++++++++++++++++++------------ 1 file changed, 141 insertions(+), 60 deletions(-) diff --git a/src/mcnanovna/nanovna.py b/src/mcnanovna/nanovna.py index ee9e95e..b3469e4 100644 --- a/src/mcnanovna/nanovna.py +++ b/src/mcnanovna/nanovna.py @@ -6,9 +6,12 @@ in server.py. The NanoVNA class manages connection lifecycle with lazy auto-conn from __future__ import annotations +import asyncio import base64 import re +from fastmcp import Context + from mcnanovna.discovery import find_first_nanovna, find_nanovna_ports from mcnanovna.protocol import ( SCAN_MASK_BINARY, @@ -45,21 +48,47 @@ POWER_DESCRIPTIONS = { } +async def _progress(ctx: Context | None, progress: float, total: float, message: str) -> None: + """Report progress if Context is available.""" + if ctx: + await ctx.report_progress(progress, total, message) + + class NanoVNA: """MCP tool class for NanoVNA-H vector network analyzers. Manages a serial connection with lazy auto-connect: the first tool call that needs hardware triggers USB discovery and initialization. + Connection is validated with a sync probe after idle periods. """ + _KEEPALIVE_TIMEOUT = 30.0 # seconds before re-validating connection + def __init__(self) -> None: self._protocol = NanoVNAProtocol() self._port: str | None = None + self._last_ok: float = 0.0 def _ensure_connected(self) -> None: - """Auto-connect on first use, or reconnect if dropped.""" + """Auto-connect on first use, reconnect if stale, retry on failure.""" + import time as _time + if self._protocol.connected: - return + # Recently active — trust the connection + if (_time.monotonic() - self._last_ok) < self._KEEPALIVE_TIMEOUT: + self._last_ok = _time.monotonic() + return + # Idle too long — validate with a sync probe + try: + if self._protocol.sync(): + self._last_ok = _time.monotonic() + return + except Exception: + pass + # Stale connection — close and fall through to reconnect + self._protocol.close() + self._port = None + port_info = find_first_nanovna() if port_info is None: raise NanoVNAConnectionError( @@ -67,8 +96,21 @@ class NanoVNA: "appears as a serial port (VID 0x0483, PID 0x5740)." ) self._port = port_info.device - self._protocol.open(self._port) - self._protocol.initialize() + + # Connect with retry — first sync can fail on cold/stale serial ports + last_error: Exception | None = None + for attempt in range(2): + try: + self._protocol.open(self._port) + self._protocol.initialize() + self._last_ok = _time.monotonic() + return + except NanoVNAConnectionError as exc: + last_error = exc + self._protocol.close() + if attempt == 0: + _time.sleep(0.3) + raise last_error # type: ignore[misc] def _has_capability(self, cmd: str) -> bool: return cmd in self._protocol.device_info.capabilities @@ -128,7 +170,7 @@ class NanoVNA: } return {"start_hz": 0, "stop_hz": 0, "points": 0} - def scan( + async def scan( self, start_hz: int, stop_hz: int, @@ -136,6 +178,7 @@ class NanoVNA: s11: bool = True, s21: bool = True, apply_cal: bool = True, + ctx: Context | None = None, ) -> dict: """Perform a frequency sweep and return S-parameter measurement data. @@ -150,7 +193,9 @@ class NanoVNA: s21: Include S21 transmission data apply_cal: Apply stored calibration correction (set False for raw data) """ - self._ensure_connected() + await _progress(ctx, 1, 4, "Connecting to NanoVNA...") + await asyncio.to_thread(self._ensure_connected) + mask = SCAN_MASK_OUT_FREQ if s11: mask |= SCAN_MASK_OUT_DATA0 @@ -161,19 +206,26 @@ class NanoVNA: use_binary = self._has_capability("scan_bin") + await _progress(ctx, 2, 4, "Sending scan command...") + if use_binary: binary_mask = mask | SCAN_MASK_BINARY - rx_mask, rx_points, raw = self._protocol.send_binary_scan( - start_hz, stop_hz, points, binary_mask + await _progress(ctx, 3, 4, f"Waiting for sweep data ({points} points)...") + rx_mask, rx_points, raw = await asyncio.to_thread( + self._protocol.send_binary_scan, start_hz, stop_hz, points, binary_mask ) scan_points = parse_scan_binary(rx_mask, rx_points, raw) else: - lines = self._protocol.send_text_command( + await _progress(ctx, 3, 4, f"Waiting for sweep data ({points} points)...") + lines = await asyncio.to_thread( + self._protocol.send_text_command, f"scan {start_hz} {stop_hz} {points} {mask}", - timeout=30.0, + 30.0, ) scan_points = parse_scan_text(lines, mask) + await _progress(ctx, 4, 4, "Parsing measurement data...") + data = [] for pt in scan_points: entry: dict = {} @@ -259,7 +311,7 @@ class NanoVNA: pass return {"markers": markers} - def cal(self, step: str | None = None) -> dict: + async def cal(self, step: str | None = None, ctx: Context | None = None) -> dict: """Query calibration status or perform a calibration step. Steps: 'load', 'open', 'short', 'thru', 'isoln', 'done', 'on', 'off', 'reset'. @@ -268,15 +320,19 @@ class NanoVNA: Args: step: Calibration step to execute """ - self._ensure_connected() + await asyncio.to_thread(self._ensure_connected) if step is not None: valid = {"load", "open", "short", "thru", "isoln", "done", "on", "off", "reset"} if step not in valid: return {"error": f"Invalid step '{step}'. Valid: {', '.join(sorted(valid))}"} - lines = self._protocol.send_text_command(f"cal {step}", timeout=10.0) + await _progress(ctx, 1, 2, f"Sending calibration command: {step}...") + lines = await asyncio.to_thread( + self._protocol.send_text_command, f"cal {step}", 10.0 + ) + await _progress(ctx, 2, 2, f"Calibration step '{step}' complete") return {"step": step, "response": lines} - lines = self._protocol.send_text_command("cal") + lines = await asyncio.to_thread(self._protocol.send_text_command, "cal") return {"status": lines} def save(self, slot: int) -> dict: @@ -418,29 +474,19 @@ class NanoVNA: pass return {"voltage_mv": 0, "voltage_v": 0.0, "raw": lines} - def capture(self, raw: bool = False): - """Capture the current LCD screen as RGB565 pixel data (base64 encoded). + def _capture_raw_bytes(self) -> tuple[int, int, bytearray]: + """Read raw RGB565 pixel data from the device. Blocking serial I/O.""" + import time - Returns width, height, and raw pixel data for rendering. The pixel format - is RGB565 (16-bit, 2 bytes per pixel). Total size = width * height * 2 bytes. - - Args: - raw: If True, return raw RGB565 data as a dict with base64-encoded bytes. - If False (default), convert to PNG and return as an Image. - """ - self._ensure_connected() di = self._protocol.device_info width = di.lcd_width height = di.lcd_height expected_size = width * height * 2 - # capture command outputs raw binary RGB565 data after echo line self._protocol._drain() self._protocol._send_command("capture") ser = self._protocol._require_connection() - import time - old_timeout = ser.timeout ser.timeout = 10.0 try: @@ -481,40 +527,61 @@ class NanoVNA: if b"ch> " in trailing or not chunk: break - if raw: - return { - "format": "rgb565", - "width": width, - "height": height, - "data_length": len(swapped), - "data_base64": base64.b64encode(bytes(swapped)).decode("ascii"), - } - - # Convert RGB565 to PNG and return as MCP Image - import io - import struct as _struct - - from PIL import Image as PILImage - - from fastmcp.utilities.types import Image - - img = PILImage.new("RGB", (width, height)) - pixels = img.load() - for y in range(height): - for x in range(width): - offset = (y * width + x) * 2 - pixel = _struct.unpack(">H", swapped[offset : offset + 2])[0] - r = ((pixel >> 11) & 0x1F) << 3 - g = ((pixel >> 5) & 0x3F) << 2 - b = (pixel & 0x1F) << 3 - pixels[x, y] = (r, g, b) - - buf_png = io.BytesIO() - img.save(buf_png, format="PNG") - return Image(data=buf_png.getvalue(), format="png") + return width, height, swapped finally: ser.timeout = old_timeout + async def capture(self, raw: bool = False, ctx: Context | None = None): + """Capture the current LCD screen as RGB565 pixel data (base64 encoded). + + Returns width, height, and raw pixel data for rendering. The pixel format + is RGB565 (16-bit, 2 bytes per pixel). Total size = width * height * 2 bytes. + + Args: + raw: If True, return raw RGB565 data as a dict with base64-encoded bytes. + If False (default), convert to PNG and return as an Image. + """ + await _progress(ctx, 1, 3, "Connecting to NanoVNA...") + await asyncio.to_thread(self._ensure_connected) + + await _progress(ctx, 2, 3, "Reading LCD pixel data...") + width, height, swapped = await asyncio.to_thread(self._capture_raw_bytes) + + if raw: + await _progress(ctx, 3, 3, "Capture complete") + return { + "format": "rgb565", + "width": width, + "height": height, + "data_length": len(swapped), + "data_base64": base64.b64encode(bytes(swapped)).decode("ascii"), + } + + await _progress(ctx, 3, 3, "Encoding PNG image...") + + # Convert RGB565 to PNG and return as MCP Image + import io + import struct as _struct + + from PIL import Image as PILImage + + from fastmcp.utilities.types import Image + + img = PILImage.new("RGB", (width, height)) + pixels = img.load() + for y in range(height): + for x in range(width): + offset = (y * width + x) * 2 + pixel = _struct.unpack(">H", swapped[offset : offset + 2])[0] + r = ((pixel >> 11) & 0x1F) << 3 + g = ((pixel >> 5) & 0x3F) << 2 + b = (pixel & 0x1F) << 3 + pixels[x, y] = (r, g, b) + + buf_png = io.BytesIO() + img.save(buf_png, format="PNG") + return Image(data=buf_png.getvalue(), format="png") + # ── Tier 3: Advanced tools ───────────────────────────────────────── def trace( @@ -1147,6 +1214,10 @@ class NanoVNA: Shows all running threads with their stack usage, priority, and state. Useful for diagnosing firmware issues. + + TODO: When hardware with ENABLE_THREADS_COMMAND is available, explore + representing ChibiOS threads as MCP Tasks (FastMCP tasks=True) so they + surface in Claude Code's /tasks UI with live state tracking. """ self._ensure_connected() if not self._has_capability("threads"): @@ -1398,13 +1469,14 @@ class NanoVNA: # ── Convenience: analyze scan data server-side ──────────────────── - def analyze( + async def analyze( self, start_hz: int, stop_hz: int, points: int = 101, s11: bool = True, s21: bool = False, + ctx: Context | None = None, ) -> dict: """Run a scan and return comprehensive S-parameter analysis. @@ -1422,10 +1494,17 @@ class NanoVNA: """ from mcnanovna.calculations import analyze_scan - scan_result = self.scan(start_hz, stop_hz, points, s11=s11, s21=s21) + await _progress(ctx, 1, 5, "Connecting to NanoVNA...") + await asyncio.to_thread(self._ensure_connected) + + await _progress(ctx, 2, 5, f"Scanning {points} points from {start_hz} to {stop_hz} Hz...") + scan_result = await self.scan(start_hz, stop_hz, points, s11=s11, s21=s21) if "error" in scan_result: return scan_result + await _progress(ctx, 3, 5, f"Received {scan_result['points']} measurement points") + + await _progress(ctx, 4, 5, "Calculating S-parameter metrics...") analysis = analyze_scan(scan_result["data"]) analysis["scan_info"] = { "start_hz": start_hz, @@ -1433,4 +1512,6 @@ class NanoVNA: "points": scan_result["points"], "binary": scan_result.get("binary", False), } + + await _progress(ctx, 5, 5, "Analysis complete") return analysis