diff --git a/src/mcnanovna/calculations.py b/src/mcnanovna/calculations.py new file mode 100644 index 0000000..f6cd0f9 --- /dev/null +++ b/src/mcnanovna/calculations.py @@ -0,0 +1,438 @@ +"""S-parameter math utilities for NanoVNA measurement analysis. + +Pure-Python functions operating on complex S-parameter data from scan results. +No device access — just math on arrays of complex values. + +All functions accept complex numbers in the form returned by the scan tool: + s = complex(real, imag) +""" + +from __future__ import annotations + +import cmath +import math + + +def swr(s11: complex) -> float: + """Calculate Standing Wave Ratio from S11 reflection coefficient. + + Args: + s11: Complex reflection coefficient (Gamma) + + Returns: + SWR ratio (1.0 = perfect match, infinity = total reflection) + """ + gamma_mag = abs(s11) + if gamma_mag >= 1.0: + return float("inf") + return (1.0 + gamma_mag) / (1.0 - gamma_mag) + + +def return_loss(s11: complex) -> float: + """Calculate return loss in dB from S11. + + Args: + s11: Complex reflection coefficient + + Returns: + Return loss in dB (positive value; higher = better match) + """ + gamma_mag = abs(s11) + if gamma_mag <= 0.0: + return float("inf") + return -20.0 * math.log10(gamma_mag) + + +def impedance(s11: complex, z0: float = 50.0) -> complex: + """Calculate impedance from S11 reflection coefficient. + + Args: + s11: Complex reflection coefficient + z0: Reference impedance in ohms (default 50) + + Returns: + Complex impedance Z = R + jX in ohms + """ + denom = 1.0 - s11 + if abs(denom) < 1e-12: + return complex(float("inf"), 0.0) + return z0 * (1.0 + s11) / denom + + +def admittance(s11: complex, z0: float = 50.0) -> complex: + """Calculate admittance from S11 reflection coefficient. + + Args: + s11: Complex reflection coefficient + z0: Reference impedance in ohms (default 50) + + Returns: + Complex admittance Y = G + jB in siemens + """ + z = impedance(s11, z0) + if abs(z) < 1e-12: + return complex(float("inf"), 0.0) + return 1.0 / z + + +def phase_deg(s: complex) -> float: + """Calculate phase angle in degrees. + + Args: + s: Complex S-parameter value + + Returns: + Phase in degrees (-180 to +180) + """ + return math.degrees(cmath.phase(s)) + + +def insertion_loss(s21: complex) -> float: + """Calculate insertion loss in dB from S21. + + Args: + s21: Complex transmission coefficient + + Returns: + Insertion loss in dB (positive value; lower = less loss) + """ + mag = abs(s21) + if mag <= 0.0: + return float("inf") + return -20.0 * math.log10(mag) + + +def group_delay(s21_values: list[complex], frequencies_hz: list[int]) -> list[float]: + """Calculate group delay from S21 phase vs frequency. + + Uses finite differences: τ_g = -dφ/dω where ω = 2πf + + Args: + s21_values: List of complex S21 values at each frequency + frequencies_hz: Corresponding frequency points in Hz + + Returns: + Group delay in seconds at each point (same length as input; + endpoints use forward/backward difference) + """ + n = len(s21_values) + if n < 2 or len(frequencies_hz) != n: + return [0.0] * n + + phases = [cmath.phase(s) for s in s21_values] + + # Unwrap phase to avoid discontinuities + for i in range(1, n): + diff = phases[i] - phases[i - 1] + if diff > math.pi: + phases[i] -= 2.0 * math.pi * round(diff / (2.0 * math.pi)) + elif diff < -math.pi: + phases[i] += 2.0 * math.pi * round(-diff / (2.0 * math.pi)) + + delays = [] + for i in range(n): + if i == 0: + dphi = phases[1] - phases[0] + df = frequencies_hz[1] - frequencies_hz[0] + elif i == n - 1: + dphi = phases[n - 1] - phases[n - 2] + df = frequencies_hz[n - 1] - frequencies_hz[n - 2] + else: + dphi = phases[i + 1] - phases[i - 1] + df = frequencies_hz[i + 1] - frequencies_hz[i - 1] + + if df == 0: + delays.append(0.0) + else: + omega_diff = 2.0 * math.pi * df + delays.append(-dphi / omega_diff) + + return delays + + +def q_factor(s11_values: list[complex], frequencies_hz: list[int]) -> float: + """Calculate Q factor from S11 data (resonant circuit). + + Q = f0 / BW_3dB where f0 is the resonant frequency and BW_3dB + is the -3dB bandwidth of the return loss peak. + + Args: + s11_values: List of complex S11 values + frequencies_hz: Corresponding frequency points in Hz + + Returns: + Q factor (dimensionless). Returns 0 if resonance not found. + """ + if len(s11_values) < 3: + return 0.0 + + # Find resonance (minimum |S11|) + magnitudes = [abs(s) for s in s11_values] + min_idx = magnitudes.index(min(magnitudes)) + min_mag = magnitudes[min_idx] + f0 = frequencies_hz[min_idx] + + # 3dB point is where |S11| = min_mag * sqrt(2) + # (relative to the dip, not absolute) + threshold = min_mag * math.sqrt(2.0) + if threshold >= 1.0: + threshold = (1.0 + min_mag) / 2.0 # fallback + + # Search left for lower -3dB crossing + f_low = frequencies_hz[0] + for i in range(min_idx, 0, -1): + if magnitudes[i] >= threshold: + # Interpolate + if magnitudes[i] != magnitudes[i - 1]: + frac = (threshold - magnitudes[i]) / (magnitudes[i - 1] - magnitudes[i]) + else: + frac = 0.0 + f_low = frequencies_hz[i] + frac * (frequencies_hz[i - 1] - frequencies_hz[i]) + break + + # Search right for upper -3dB crossing + f_high = frequencies_hz[-1] + for i in range(min_idx, len(magnitudes) - 1): + if magnitudes[i] >= threshold: + if magnitudes[i] != magnitudes[i - 1]: + frac = (threshold - magnitudes[i - 1]) / (magnitudes[i] - magnitudes[i - 1]) + else: + frac = 0.0 + f_high = frequencies_hz[i - 1] + frac * (frequencies_hz[i] - frequencies_hz[i - 1]) + break + + bw = abs(f_high - f_low) + if bw < 1.0: + return 0.0 + return f0 / bw + + +def capacitance(reactance: float, frequency_hz: int) -> float: + """Calculate capacitance from reactance at a given frequency. + + C = -1 / (2πfX) for capacitive reactance (X < 0) + + Args: + reactance: Imaginary part of impedance (X) in ohms + frequency_hz: Frequency in Hz + + Returns: + Capacitance in farads. Returns 0 if reactance is non-negative. + """ + if reactance >= 0.0 or frequency_hz <= 0: + return 0.0 + return -1.0 / (2.0 * math.pi * frequency_hz * reactance) + + +def inductance(reactance: float, frequency_hz: int) -> float: + """Calculate inductance from reactance at a given frequency. + + L = X / (2πf) for inductive reactance (X > 0) + + Args: + reactance: Imaginary part of impedance (X) in ohms + frequency_hz: Frequency in Hz + + Returns: + Inductance in henries. Returns 0 if reactance is non-positive. + """ + if reactance <= 0.0 or frequency_hz <= 0: + return 0.0 + return reactance / (2.0 * math.pi * frequency_hz) + + +def find_resonance( + s11_values: list[complex], + frequencies_hz: list[int], + z0: float = 50.0, +) -> dict: + """Find the resonant frequency from S11 data. + + Resonance is the frequency where |S11| is minimum (best match). + + Args: + s11_values: List of complex S11 values + frequencies_hz: Corresponding frequency points in Hz + z0: Reference impedance in ohms + + Returns: + Dict with resonant frequency, SWR, impedance, return loss + """ + if not s11_values or len(s11_values) != len(frequencies_hz): + return {"error": "Invalid input data"} + + magnitudes = [abs(s) for s in s11_values] + min_idx = magnitudes.index(min(magnitudes)) + s11_at_res = s11_values[min_idx] + freq = frequencies_hz[min_idx] + z = impedance(s11_at_res, z0) + + return { + "frequency_hz": freq, + "swr": round(swr(s11_at_res), 4), + "return_loss_db": round(return_loss(s11_at_res), 2), + "impedance_real": round(z.real, 2), + "impedance_imag": round(z.imag, 2), + "phase_deg": round(phase_deg(s11_at_res), 2), + "gamma_mag": round(abs(s11_at_res), 6), + } + + +def find_bandwidth( + s11_values: list[complex], + frequencies_hz: list[int], + swr_limit: float = 2.0, +) -> dict: + """Find the bandwidth where SWR is below a given limit. + + Args: + s11_values: List of complex S11 values + frequencies_hz: Corresponding frequency points in Hz + swr_limit: SWR threshold (default 2.0:1) + + Returns: + Dict with lower/upper frequency bounds and bandwidth + """ + if not s11_values or len(s11_values) != len(frequencies_hz): + return {"error": "Invalid input data"} + + # Find all points within SWR limit + within = [] + for i, s in enumerate(s11_values): + if swr(s) <= swr_limit: + within.append(i) + + if not within: + return { + "swr_limit": swr_limit, + "bandwidth_hz": 0, + "f_low_hz": 0, + "f_high_hz": 0, + "f_center_hz": 0, + "in_band": False, + } + + # Find the longest contiguous run containing the minimum SWR point + magnitudes = [abs(s) for s in s11_values] + min_idx = magnitudes.index(min(magnitudes)) + + # Find contiguous segment around the resonance + f_low_idx = min_idx + f_high_idx = min_idx + while f_low_idx > 0 and swr(s11_values[f_low_idx - 1]) <= swr_limit: + f_low_idx -= 1 + while f_high_idx < len(s11_values) - 1 and swr(s11_values[f_high_idx + 1]) <= swr_limit: + f_high_idx += 1 + + f_low = frequencies_hz[f_low_idx] + f_high = frequencies_hz[f_high_idx] + bw = f_high - f_low + + return { + "swr_limit": swr_limit, + "bandwidth_hz": bw, + "f_low_hz": f_low, + "f_high_hz": f_high, + "f_center_hz": (f_low + f_high) // 2, + "in_band": True, + } + + +def analyze_scan( + scan_data: list[dict], + z0: float = 50.0, +) -> dict: + """Comprehensive analysis of scan results. + + Accepts the 'data' array from the scan tool's return value and produces + a full analysis report including resonance, bandwidth, and per-point metrics. + + Args: + scan_data: List of dicts with 'frequency_hz', 's11' (optional), 's21' (optional) + where s11/s21 have 'real' and 'imag' keys + z0: Reference impedance in ohms + + Returns: + Analysis report dict with summary and per-point data + """ + if not scan_data: + return {"error": "No scan data provided"} + + freqs = [] + s11_vals = [] + s21_vals = [] + has_s11 = "s11" in scan_data[0] + has_s21 = "s21" in scan_data[0] + + for pt in scan_data: + freqs.append(pt.get("frequency_hz", 0)) + if has_s11: + s = pt["s11"] + s11_vals.append(complex(s["real"], s["imag"])) + if has_s21: + s = pt["s21"] + s21_vals.append(complex(s["real"], s["imag"])) + + result: dict = { + "points": len(scan_data), + "start_hz": freqs[0] if freqs else 0, + "stop_hz": freqs[-1] if freqs else 0, + } + + if has_s11: + res = find_resonance(s11_vals, freqs, z0) + bw = find_bandwidth(s11_vals, freqs, swr_limit=2.0) + bw_1_5 = find_bandwidth(s11_vals, freqs, swr_limit=1.5) + + result["s11_analysis"] = { + "resonance": res, + "bandwidth_2_1": bw, + "bandwidth_1_5": bw_1_5, + } + + # Per-point S11 metrics + s11_points = [] + for i, s in enumerate(s11_vals): + z = impedance(s, z0) + entry = { + "frequency_hz": freqs[i], + "swr": round(swr(s), 4), + "return_loss_db": round(return_loss(s), 2), + "impedance_real": round(z.real, 2), + "impedance_imag": round(z.imag, 2), + "phase_deg": round(phase_deg(s), 2), + } + # Add reactive component identification + x = z.imag + if abs(x) > 0.1 and freqs[i] > 0: + if x < 0: + entry["capacitance_pf"] = round(capacitance(x, freqs[i]) * 1e12, 3) + else: + entry["inductance_nh"] = round(inductance(x, freqs[i]) * 1e9, 3) + s11_points.append(entry) + result["s11_points"] = s11_points + + if has_s21: + # Per-point S21 metrics + s21_points = [] + delays = group_delay(s21_vals, freqs) + for i, s in enumerate(s21_vals): + s21_points.append({ + "frequency_hz": freqs[i], + "insertion_loss_db": round(insertion_loss(s), 2), + "phase_deg": round(phase_deg(s), 2), + "group_delay_ns": round(delays[i] * 1e9, 3), + }) + result["s21_points"] = s21_points + + # S21 summary + il_values = [insertion_loss(s) for s in s21_vals] + min_il = min(il_values) + min_il_idx = il_values.index(min_il) + result["s21_analysis"] = { + "min_insertion_loss_db": round(min_il, 2), + "min_insertion_loss_freq_hz": freqs[min_il_idx], + "max_insertion_loss_db": round(max(il_values), 2), + } + + return result diff --git a/src/mcnanovna/nanovna.py b/src/mcnanovna/nanovna.py index cb770a2..ee9e95e 100644 --- a/src/mcnanovna/nanovna.py +++ b/src/mcnanovna/nanovna.py @@ -7,6 +7,7 @@ in server.py. The NanoVNA class manages connection lifecycle with lazy auto-conn from __future__ import annotations import base64 +import re from mcnanovna.discovery import find_first_nanovna, find_nanovna_ports from mcnanovna.protocol import ( @@ -356,8 +357,6 @@ class NanoVNA: if lines: line = lines[0].strip() # Try to parse "bandwidth (Hz)" - import re - m = re.match(r"bandwidth\s+(\d+)\s*\((\d+)\s*Hz\)", line, re.IGNORECASE) if m: return {"bandwidth_divider": int(m.group(1)), "bandwidth_hz": int(m.group(2))} @@ -686,3 +685,752 @@ class NanoVNA: self._protocol.send_text_command(f"sweep {frequency_hz} {frequency_hz} 1") self._protocol.send_text_command("resume") return {"frequency_hz": frequency_hz, "power": power, "mode": "cw"} + + # ── Phase 1 additions: Full protocol coverage ───────────────────── + + # -- Essential commands -- + + def measure(self, mode: str | None = None) -> dict: + """Set on-device measurement display mode. + + Controls what the NanoVNA computes and displays on-screen. Available + modes depend on firmware build flags. + + Args: + mode: Measurement mode — one of: none, lc, lcshunt, lcseries, + xtal, filter, cable, resonance. Omit to get usage help. + """ + self._ensure_connected() + if not self._has_capability("measure"): + return {"error": "measure command not supported by this firmware"} + if mode is not None: + lines = self._protocol.send_text_command(f"measure {mode}") + return {"mode": mode, "response": lines} + lines = self._protocol.send_text_command("measure") + return {"response": lines} + + def config(self, option: str | None = None, value: int | None = None) -> dict: + """Query or set device configuration options. + + Options depend on firmware build (e.g., auto, avg, connection, mode, + grid, dot, bk, flip, separator, tif). Each takes a value of 0 or 1. + + Args: + option: Configuration option name + value: Value to set (0 or 1) + """ + self._ensure_connected() + if not self._has_capability("config"): + return {"error": "config command not supported by this firmware"} + if option is not None and value is not None: + lines = self._protocol.send_text_command(f"config {option} {value}") + return {"option": option, "value": value, "response": lines} + lines = self._protocol.send_text_command("config") + return {"response": lines} + + def saveconfig(self) -> dict: + """Save current device configuration to flash memory. + + Saves config_t (distinct from calibration save which uses slots). + This includes touch calibration, display settings, TCXO frequency, etc. + """ + self._ensure_connected() + lines = self._protocol.send_text_command("saveconfig") + return {"saved": True, "response": lines} + + def clearconfig(self, key: str = "1234") -> dict: + """Clear all stored configuration and calibration data from flash. + + This is a destructive operation — requires the protection key '1234'. + After clearing, you must recalibrate the device. + + Args: + key: Protection key (must be '1234' to confirm) + """ + self._ensure_connected() + if key != "1234": + return {"error": "Protection key must be '1234' to confirm clearing all data"} + lines = self._protocol.send_text_command(f"clearconfig {key}") + return {"cleared": True, "response": lines} + + def color(self, color_id: int | None = None, rgb24: int | None = None) -> dict: + """Get or set display color palette entries. + + With no args, lists all color slots and their current RGB values. + With id + rgb24, sets a specific color. + + Args: + color_id: Palette slot index (0-31) + rgb24: 24-bit RGB color value (e.g., 0xFF0000 for red) + """ + self._ensure_connected() + if not self._has_capability("color"): + return {"error": "color command not supported by this firmware"} + if color_id is not None and rgb24 is not None: + self._protocol.send_text_command(f"color {color_id} {rgb24}") + return {"color_id": color_id, "rgb24": f"0x{rgb24:06x}", "set": True} + lines = self._protocol.send_text_command("color") + colors = [] + for line in lines: + line = line.strip() + if ":" in line: + parts = line.split(":") + try: + idx = int(parts[0].strip()) + val = parts[1].strip() + colors.append({"id": idx, "rgb24": val}) + except (ValueError, IndexError): + pass + return {"colors": colors, "raw": lines} + + def freq(self, frequency_hz: int) -> dict: + """Set a single output frequency and pause the sweep. + + Useful for CW measurements or signal generation at a specific frequency. + The sweep is paused automatically. + + Args: + frequency_hz: Output frequency in Hz + """ + self._ensure_connected() + self._protocol.send_text_command(f"freq {frequency_hz}") + return {"frequency_hz": frequency_hz, "sweep": "paused"} + + def tcxo(self, frequency_hz: int | None = None) -> dict: + """Get or set the TCXO reference oscillator frequency. + + The TCXO frequency affects all frequency accuracy. Default is typically + 26000000 Hz. Adjust if you have a precision frequency reference. + + Args: + frequency_hz: TCXO frequency in Hz (e.g., 26000000) + """ + self._ensure_connected() + if not self._has_capability("tcxo"): + return {"error": "tcxo command not supported by this firmware"} + if frequency_hz is not None: + self._protocol.send_text_command(f"tcxo {frequency_hz}") + return {"tcxo_hz": frequency_hz, "set": True} + # Query mode returns "current: " + lines = self._protocol.send_text_command("tcxo") + for line in lines: + if "current:" in line.lower(): + parts = line.split(":") + if len(parts) >= 2: + try: + return {"tcxo_hz": int(parts[1].strip())} + except ValueError: + pass + return {"response": lines} + + def vbat_offset(self, offset: int | None = None) -> dict: + """Get or set battery voltage measurement offset. + + Calibrates the battery voltage reading. The offset is added to the + raw ADC value to compensate for hardware variations. + + Args: + offset: Voltage offset value (raw ADC units) + """ + self._ensure_connected() + if not self._has_capability("vbat_offset"): + return {"error": "vbat_offset command not supported by this firmware"} + if offset is not None: + self._protocol.send_text_command(f"vbat_offset {offset}") + return {"offset": offset, "set": True} + lines = self._protocol.send_text_command("vbat_offset") + if lines: + try: + return {"offset": int(lines[0].strip())} + except ValueError: + pass + return {"response": lines} + + # -- Touch / Remote Desktop -- + + def touchcal(self) -> dict: + """Start touch screen calibration sequence. + + Interactive: the device displays calibration targets. Touch the upper-left + corner, then the lower-right corner when prompted. Returns the calibration + parameters when complete. + + Note: This is an interactive hardware procedure that requires physical + touch input on the device screen. + """ + self._ensure_connected() + lines = self._protocol.send_text_command("touchcal", timeout=30.0) + # Parse "touch cal params: a b c d" + for line in lines: + if "touch cal params:" in line.lower(): + parts = line.split(":") + if len(parts) >= 2: + vals = parts[1].strip().split() + if len(vals) >= 4: + try: + return { + "calibrated": True, + "params": [int(v) for v in vals[:4]], + "response": lines, + } + except ValueError: + pass + return {"response": lines} + + def touchtest(self) -> dict: + """Start touch screen test mode. + + Enters a mode where touch points are drawn on screen for verification. + Useful for checking touch calibration accuracy. Exit by sending another + command or resetting. + """ + self._ensure_connected() + lines = self._protocol.send_text_command("touchtest", timeout=10.0) + return {"mode": "touchtest", "response": lines} + + def refresh(self, enable: str | None = None) -> dict: + """Enable or disable remote desktop refresh mode. + + When enabled, the device streams display updates over the serial + connection for remote viewing. + + Args: + enable: 'on' to enable remote desktop, 'off' to disable + """ + self._ensure_connected() + if enable is not None: + if enable not in ("on", "off"): + return {"error": "enable must be 'on' or 'off'"} + lines = self._protocol.send_text_command(f"refresh {enable}") + return {"remote_desktop": enable, "response": lines} + return {"error": "usage: refresh on|off"} + + def touch(self, x: int, y: int) -> dict: + """Send a touch press event at screen coordinates. + + Simulates a finger press on the NanoVNA touchscreen for remote control. + Follow with 'release' to complete the touch gesture. + + Args: + x: X coordinate (0 to lcd_width-1) + y: Y coordinate (0 to lcd_height-1) + """ + self._ensure_connected() + lines = self._protocol.send_text_command(f"touch {x} {y}") + return {"action": "press", "x": x, "y": y, "response": lines} + + def release(self, x: int = -1, y: int = -1) -> dict: + """Send a touch release event at screen coordinates. + + Completes a touch gesture started with 'touch'. If coordinates + are omitted (-1), releases at the last touch position. + + Args: + x: X coordinate (-1 for last position) + y: Y coordinate (-1 for last position) + """ + self._ensure_connected() + if x >= 0 and y >= 0: + lines = self._protocol.send_text_command(f"release {x} {y}") + else: + lines = self._protocol.send_text_command("release") + return {"action": "release", "x": x, "y": y, "response": lines} + + # -- SD Card storage -- + + def sd_list(self, pattern: str = "*.*") -> dict: + """List files on the SD card. + + Requires SD card hardware support. Returns filenames and sizes. + + Args: + pattern: Glob pattern to filter files (default: '*.*') + """ + self._ensure_connected() + if not self._has_capability("sd_list"): + return {"error": "SD card commands not supported by this firmware/hardware"} + lines = self._protocol.send_text_command(f"sd_list {pattern}", timeout=10.0) + files = [] + for line in lines: + line = line.strip() + if not line or "err:" in line.lower(): + if "err:" in line.lower(): + return {"error": line} + continue + parts = line.rsplit(" ", 1) + if len(parts) == 2: + try: + files.append({"name": parts[0], "size": int(parts[1])}) + except ValueError: + files.append({"name": line, "size": 0}) + else: + files.append({"name": line, "size": 0}) + return {"files": files, "count": len(files)} + + def sd_read(self, filename: str) -> dict: + """Read a file from the SD card. + + Returns the file content as base64-encoded data. The firmware sends + a 4-byte size header followed by raw file data. + + Args: + filename: Name of the file to read + """ + self._ensure_connected() + if not self._has_capability("sd_read"): + return {"error": "SD card commands not supported by this firmware/hardware"} + + import struct + import time + + self._protocol._drain() + self._protocol._send_command(f"sd_read {filename}") + + ser = self._protocol._require_connection() + old_timeout = ser.timeout + ser.timeout = 10.0 + try: + buf = b"" + deadline = time.monotonic() + 10.0 + + # Read past echo line + while time.monotonic() < deadline: + chunk = ser.read(max(1, ser.in_waiting)) + if chunk: + buf += chunk + if b"\r\n" in buf: + break + + # Check for error response + content = buf.decode("ascii", errors="replace") + if "err:" in content.lower(): + return {"error": "File not found or no SD card"} + + echo_end = buf.index(b"\r\n") + 2 + data_buf = buf[echo_end:] + + # Read 4-byte file size header + while len(data_buf) < 4 and time.monotonic() < deadline: + chunk = ser.read(4 - len(data_buf)) + if chunk: + data_buf += chunk + + if len(data_buf) < 4: + return {"error": "Failed to read file size header"} + + file_size = struct.unpack_from(" " in trailing or not chunk: + break + + return { + "filename": filename, + "size": file_size, + "data_base64": base64.b64encode(data_buf[:file_size]).decode("ascii"), + } + finally: + ser.timeout = old_timeout + + def sd_delete(self, filename: str) -> dict: + """Delete a file from the SD card. + + Args: + filename: Name of the file to delete + """ + self._ensure_connected() + if not self._has_capability("sd_delete"): + return {"error": "SD card commands not supported by this firmware/hardware"} + lines = self._protocol.send_text_command(f"sd_delete {filename}", timeout=10.0) + success = any("ok" in line.lower() for line in lines) + return {"filename": filename, "deleted": success, "response": lines} + + def time( + self, + field: str | None = None, + value: int | None = None, + ) -> dict: + """Get or set RTC (real-time clock) time. + + With no args, returns current date/time. With field + value, sets + a specific time component. + + Args: + field: Time field to set: y, m, d, h, min, sec, or ppm + value: Value for the field (0-99 for date/time, float for ppm) + """ + self._ensure_connected() + if not self._has_capability("time"): + return {"error": "RTC (time) command not supported by this firmware/hardware"} + if field is not None and value is not None: + lines = self._protocol.send_text_command(f"time {field} {value}") + return {"field": field, "value": value, "response": lines} + lines = self._protocol.send_text_command("time") + # Response format: "20YY/MM/DD HH:MM:SS\nusage: ..." + for line in lines: + line = line.strip() + if "/" in line and ":" in line and "usage" not in line.lower(): + return {"datetime": line} + return {"response": lines} + + # -- Debug / Diagnostic tools -- + + def i2c(self, page: int, reg: int, data: int) -> dict: + """Write to an I2C register on the TLV320AIC3204 audio codec. + + Low-level diagnostic tool for direct codec register access. + + Args: + page: I2C register page number + reg: Register address within the page + data: Byte value to write (0-255) + """ + self._ensure_connected() + if not self._has_capability("i2c"): + return {"error": "i2c command not supported by this firmware"} + lines = self._protocol.send_text_command(f"i2c {page} {reg} {data}") + return {"page": page, "reg": reg, "data": data, "response": lines} + + def si(self, reg: int, value: int) -> dict: + """Write to a Si5351 frequency synthesizer register. + + Low-level diagnostic tool for direct Si5351 register access. + + Args: + reg: Si5351 register address + value: Byte value to write (0-255) + """ + self._ensure_connected() + if not self._has_capability("si"): + return {"error": "si (Si5351 register) command not supported by this firmware"} + lines = self._protocol.send_text_command(f"si {reg} {value}") + return {"reg": reg, "value": value, "response": lines} + + def lcd(self, register: int, data_bytes: list[int] | None = None) -> dict: + """Send a register command to the LCD display controller. + + Low-level diagnostic tool. First arg is the register/command byte, + remaining are data bytes. + + Args: + register: LCD register/command byte + data_bytes: Additional data bytes (list of ints) + """ + self._ensure_connected() + if not self._has_capability("lcd"): + return {"error": "lcd command not supported by this firmware"} + parts = [str(register)] + if data_bytes: + parts.extend(str(d) for d in data_bytes) + lines = self._protocol.send_text_command(f"lcd {' '.join(parts)}") + # Response: "ret = 0x..." + for line in lines: + if "ret" in line.lower(): + return {"register": register, "data_bytes": data_bytes or [], "result": line.strip()} + return {"register": register, "data_bytes": data_bytes or [], "response": lines} + + def threads(self) -> dict: + """List ChibiOS RTOS thread information. + + Shows all running threads with their stack usage, priority, and state. + Useful for diagnosing firmware issues. + """ + self._ensure_connected() + if not self._has_capability("threads"): + return {"error": "threads command not supported by this firmware"} + lines = self._protocol.send_text_command("threads") + threads = [] + for line in lines: + line = line.strip() + if not line or "stklimit" in line.lower(): + continue # Skip header + parts = line.split("|") + if len(parts) >= 8: + threads.append({ + "stk_limit": parts[0].strip(), + "stack": parts[1].strip(), + "stk_free": parts[2].strip(), + "addr": parts[3].strip(), + "refs": parts[4].strip(), + "prio": parts[5].strip(), + "state": parts[6].strip(), + "name": parts[7].strip(), + }) + return {"threads": threads, "count": len(threads)} + + def stat(self) -> dict: + """Get audio ADC statistics for both channels. + + Returns average and RMS values for the reference and signal channels + on both ports. Useful for diagnosing signal level issues. + """ + self._ensure_connected() + if not self._has_capability("stat"): + return {"error": "stat command not supported by this firmware"} + lines = self._protocol.send_text_command("stat", timeout=10.0) + channels = [] + current_ch = {} + for line in lines: + line = line.strip() + if line.startswith("Ch:"): + if current_ch: + channels.append(current_ch) + current_ch = {"channel": line.split(":")[1].strip()} + elif "average:" in line.lower(): + nums = re.findall(r"-?\d+", line) + if len(nums) >= 2: + current_ch["average_ref"] = int(nums[0]) + current_ch["average_signal"] = int(nums[1]) + elif "rms:" in line.lower(): + nums = re.findall(r"-?\d+", line) + if len(nums) >= 2: + current_ch["rms_ref"] = int(nums[0]) + current_ch["rms_signal"] = int(nums[1]) + if current_ch: + channels.append(current_ch) + return {"channels": channels} + + def sample(self, mode: str) -> dict: + """Set the ADC sample capture function. + + Controls what data the sample command captures from the audio ADC. + + Args: + mode: Sample mode — 'gamma' (complex reflection), 'ampl' (amplitude), + or 'ref' (reference amplitude) + """ + self._ensure_connected() + if not self._has_capability("sample"): + return {"error": "sample command not supported by this firmware"} + valid = {"gamma", "ampl", "ref"} + if mode not in valid: + return {"error": f"Invalid mode '{mode}'. Valid: {', '.join(sorted(valid))}"} + lines = self._protocol.send_text_command(f"sample {mode}") + return {"mode": mode, "response": lines} + + def test(self) -> dict: + """Run hardware self-test. + + Executes built-in hardware diagnostics. The specific tests depend + on the firmware build configuration. + """ + self._ensure_connected() + if not self._has_capability("test"): + return {"error": "test command not supported by this firmware"} + lines = self._protocol.send_text_command("test", timeout=30.0) + return {"response": lines} + + def gain(self, lgain: int, rgain: int | None = None) -> dict: + """Set audio codec gain levels. + + Controls the TLV320AIC3204 PGA (Programmable Gain Amplifier). + + Args: + lgain: Left channel gain (0-95, in 0.5 dB steps) + rgain: Right channel gain (0-95). If omitted, uses lgain for both. + """ + self._ensure_connected() + if not self._has_capability("gain"): + return {"error": "gain command not supported by this firmware"} + if rgain is not None: + lines = self._protocol.send_text_command(f"gain {lgain} {rgain}") + return {"lgain": lgain, "rgain": rgain, "response": lines} + lines = self._protocol.send_text_command(f"gain {lgain}") + return {"lgain": lgain, "rgain": lgain, "response": lines} + + def dump(self, channel: int = 0) -> dict: + """Dump raw audio ADC samples. + + Captures and returns raw sample data from the audio buffer. + Useful for signal analysis and debugging. + + Args: + channel: Audio channel to dump (0 or 1) + """ + self._ensure_connected() + if not self._has_capability("dump"): + return {"error": "dump command not supported by this firmware"} + lines = self._protocol.send_text_command(f"dump {channel}", timeout=10.0) + samples = [] + for line in lines: + for val in line.strip().split(): + try: + samples.append(int(val)) + except ValueError: + pass + return {"channel": channel, "samples": samples, "count": len(samples)} + + def port(self, port_num: int) -> dict: + """Select the active audio port (TX or RX). + + Switches the TLV320AIC3204 codec between transmit and receive paths. + + Args: + port_num: Port number (0=TX, 1=RX) + """ + self._ensure_connected() + if not self._has_capability("port"): + return {"error": "port command not supported by this firmware"} + lines = self._protocol.send_text_command(f"port {port_num}") + return {"port": port_num, "description": "TX" if port_num == 0 else "RX", "response": lines} + + def offset(self, frequency_hz: int | None = None) -> dict: + """Get or set the variable IF frequency offset. + + Adjusts the intermediate frequency offset used in the measurement + pipeline. Only available on firmware builds with USE_VARIABLE_OFFSET. + + Args: + frequency_hz: IF offset frequency in Hz + """ + self._ensure_connected() + if not self._has_capability("offset"): + return {"error": "offset command not supported by this firmware"} + if frequency_hz is not None: + lines = self._protocol.send_text_command(f"offset {frequency_hz}") + return {"offset_hz": frequency_hz, "set": True, "response": lines} + lines = self._protocol.send_text_command("offset") + for line in lines: + if "current:" in line.lower(): + parts = line.split(":") + if len(parts) >= 2: + try: + return {"offset_hz": int(parts[1].strip())} + except ValueError: + pass + return {"response": lines} + + def dac(self, value: int | None = None) -> dict: + """Get or set the DAC output value. + + Controls the on-chip DAC (used for bias voltage or other analog output). + Range is 0-4095 (12-bit). + + Args: + value: DAC value (0-4095) + """ + self._ensure_connected() + if not self._has_capability("dac"): + return {"error": "dac command not supported by this firmware/hardware"} + if value is not None: + lines = self._protocol.send_text_command(f"dac {value}") + return {"dac_value": value, "set": True, "response": lines} + lines = self._protocol.send_text_command("dac") + for line in lines: + if "current:" in line.lower(): + parts = line.split(":") + if len(parts) >= 2: + try: + return {"dac_value": int(parts[1].strip())} + except ValueError: + pass + return {"response": lines} + + def usart_cfg(self, baudrate: int | None = None) -> dict: + """Get or set the USART serial port configuration. + + Controls the secondary serial port (USART) baud rate. The USART can + be used for external device communication or serial console. + + Args: + baudrate: Baud rate (minimum 300). Omit to query current setting. + """ + self._ensure_connected() + if not self._has_capability("usart_cfg"): + return {"error": "usart_cfg command not supported by this firmware"} + if baudrate is not None: + lines = self._protocol.send_text_command(f"usart_cfg {baudrate}") + return {"baudrate": baudrate, "set": True, "response": lines} + lines = self._protocol.send_text_command("usart_cfg") + # Response: "Serial: baud" + for line in lines: + if "baud" in line.lower(): + m = re.search(r"(\d+)\s*baud", line, re.IGNORECASE) + if m: + return {"baudrate": int(m.group(1))} + return {"response": lines} + + def usart(self, data: str, timeout_ms: int = 200) -> dict: + """Send data through the USART serial port and read the response. + + Forwards data to an external device connected to the USART port + and returns any response received within the timeout period. + + Args: + data: String data to send + timeout_ms: Response timeout in milliseconds (default 200) + """ + self._ensure_connected() + if not self._has_capability("usart"): + return {"error": "usart command not supported by this firmware"} + lines = self._protocol.send_text_command(f"usart {data} {timeout_ms}", timeout=5.0) + return {"sent": data, "timeout_ms": timeout_ms, "response": lines} + + def band(self, index: int, param: str, value: int) -> dict: + """Configure frequency band parameters for the Si5351 synthesizer. + + Low-level control of per-band synthesizer settings. Parameters include + mode, frequency, divider, multiplier, and power settings. + + Args: + index: Band index + param: Parameter name — mode, freq, div, mul, omul, pow, opow, l, r, lr, adj + value: Parameter value + """ + self._ensure_connected() + if not self._has_capability("b"): + return {"error": "band (b) command not supported by this firmware"} + lines = self._protocol.send_text_command(f"b {index} {param} {value}") + return {"index": index, "param": param, "value": value, "response": lines} + + # ── Convenience: analyze scan data server-side ──────────────────── + + def analyze( + self, + start_hz: int, + stop_hz: int, + points: int = 101, + s11: bool = True, + s21: bool = False, + ) -> dict: + """Run a scan and return comprehensive S-parameter analysis. + + Combines the scan tool with server-side calculations to produce + a full measurement report including SWR, impedance, bandwidth, + return loss, and reactive components — without the LLM needing + to do the math. + + Args: + start_hz: Start frequency in Hz + stop_hz: Stop frequency in Hz + points: Number of measurement points (default 101) + s11: Include S11 reflection analysis (default True) + s21: Include S21 transmission analysis (default False) + """ + from mcnanovna.calculations import analyze_scan + + scan_result = self.scan(start_hz, stop_hz, points, s11=s11, s21=s21) + if "error" in scan_result: + return scan_result + + analysis = analyze_scan(scan_result["data"]) + analysis["scan_info"] = { + "start_hz": start_hz, + "stop_hz": stop_hz, + "points": scan_result["points"], + "binary": scan_result.get("binary", False), + } + return analysis diff --git a/src/mcnanovna/prompts.py b/src/mcnanovna/prompts.py new file mode 100644 index 0000000..df81081 --- /dev/null +++ b/src/mcnanovna/prompts.py @@ -0,0 +1,343 @@ +"""FastMCP prompts for guided NanoVNA workflows. + +Prompts are conversation templates that guide the LLM through multi-step +procedures like calibration, Touchstone export, and antenna analysis. +They return lists of messages that set up context for the task. +""" + +from __future__ import annotations + +from fastmcp import FastMCP +from fastmcp.prompts import Message + + +HAM_BANDS = { + "160m": (1_800_000, 2_000_000), + "80m": (3_500_000, 4_000_000), + "60m": (5_330_500, 5_403_500), + "40m": (7_000_000, 7_300_000), + "30m": (10_100_000, 10_150_000), + "20m": (14_000_000, 14_350_000), + "17m": (18_068_000, 18_168_000), + "15m": (21_000_000, 21_450_000), + "12m": (24_890_000, 24_990_000), + "10m": (28_000_000, 29_700_000), + "6m": (50_000_000, 54_000_000), + "2m": (144_000_000, 148_000_000), + "70cm": (420_000_000, 450_000_000), + "23cm": (1_240_000_000, 1_300_000_000), + "full": (50_000, 900_000_000), + "hf": (1_800_000, 30_000_000), + "vhf": (50_000_000, 300_000_000), + "uhf": (300_000_000, 1_000_000_000), +} + + +def _format_freq(hz: int) -> str: + """Format frequency for display.""" + if hz >= 1_000_000_000: + return f"{hz / 1e9:.3f} GHz" + if hz >= 1_000_000: + return f"{hz / 1e6:.3f} MHz" + if hz >= 1_000: + return f"{hz / 1e3:.1f} kHz" + return f"{hz} Hz" + + +def register_prompts(mcp: FastMCP) -> None: + """Register all NanoVNA workflow prompts on the FastMCP server.""" + + @mcp.prompt + def calibrate( + band: str = "full", + start_hz: int | None = None, + stop_hz: int | None = None, + points: int = 101, + save_slot: int = 0, + ) -> list[Message]: + """Guide through full SOLT calibration for a frequency band. + + Walks through Short-Open-Load-Thru calibration with optional + isolation step. Can specify a ham band name or explicit frequencies. + + Args: + band: Band name (e.g., '2m', '70cm', 'hf', 'full') or 'custom' + start_hz: Start frequency in Hz (overrides band preset) + stop_hz: Stop frequency in Hz (overrides band preset) + points: Number of sweep points + save_slot: Flash slot to save calibration (0-4 for H, 0-6 for H4) + """ + if start_hz is not None and stop_hz is not None: + f_start, f_stop = start_hz, stop_hz + band_label = f"Custom ({_format_freq(f_start)} – {_format_freq(f_stop)})" + elif band in HAM_BANDS: + f_start, f_stop = HAM_BANDS[band] + band_label = f"{band.upper()} ({_format_freq(f_start)} – {_format_freq(f_stop)})" + else: + f_start, f_stop = HAM_BANDS["full"] + band_label = f"Full range ({_format_freq(f_start)} – {_format_freq(f_stop)})" + + return [ + Message( + role="user", + content=( + f"I need to calibrate my NanoVNA for the {band_label} band " + f"with {points} points. Save to slot {save_slot} when done." + ), + ), + Message( + role="assistant", + content=f"""I'll guide you through a full SOLT calibration. Here's the plan: + +**Band**: {band_label} +**Points**: {points} +**Save slot**: {save_slot} + +You'll need these calibration standards: +- 50\u03a9 load termination +- Open standard (or leave SMA connector open) +- Short standard +- Through cable (SMA male-male or appropriate adapter) +- Two 50\u03a9 loads for isolation step (optional but recommended) + +**Step 1**: First, I'll set the sweep range to {_format_freq(f_start)} – {_format_freq(f_stop)} with {points} points using the `sweep` tool. + +**Step 2**: Connect the **50\u03a9 LOAD** to Port 1 (CH0). Tell me when it's connected, and I'll run `cal load`. + +**Step 3**: Connect the **OPEN** standard to Port 1. I'll run `cal open`. + +**Step 4**: Connect the **SHORT** standard to Port 1. I'll run `cal short`. + +**Step 5**: Connect the **THROUGH** cable between Port 1 and Port 2. I'll run `cal thru`. + +**Step 6** (optional): Connect 50\u03a9 loads to **both** ports. I'll run `cal isoln`. + +**Step 7**: I'll run `cal done` to compute the error correction terms. + +**Step 8**: Finally, I'll save to slot {save_slot} with the `save` tool. + +Let me start by setting the sweep range. Ready?""", + ), + ] + + @mcp.prompt + def export_touchstone( + start_hz: int = 1_000_000, + stop_hz: int = 900_000_000, + points: int = 101, + format: str = "s1p", + ) -> list[Message]: + """Guide through exporting S-parameters to Touchstone file format. + + Runs a scan and formats results as .s1p (reflection only) or + .s2p (reflection + transmission) Touchstone content per IEEE Std 1363. + + Args: + start_hz: Start frequency in Hz + stop_hz: Stop frequency in Hz + points: Number of measurement points + format: Output format — 's1p' (S11 only) or 's2p' (S11 + S21) + """ + s2p = format.lower() == "s2p" + file_ext = ".s2p" if s2p else ".s1p" + + return [ + Message( + role="user", + content=( + f"Export a Touchstone {file_ext} file from " + f"{_format_freq(start_hz)} to {_format_freq(stop_hz)} " + f"with {points} points." + ), + ), + Message( + role="assistant", + content=f"""I'll run a scan and format the results as a Touchstone {file_ext} file. + +**Scan range**: {_format_freq(start_hz)} – {_format_freq(stop_hz)} +**Points**: {points} +**Format**: {'S2P (S11 + S21 — requires DUT connected between ports)' if s2p else 'S1P (S11 reflection only)'} + +**Touchstone format reference** (IEEE Std 1363): +``` +! NanoVNA-H measurement +# Hz S RI R 50 +! freq {'s11_re s11_im s21_re s21_im s12_re s12_im s22_re s22_im' if s2p else 's11_re s11_im'} +``` + +I'll: +1. Run a `scan` from {start_hz} to {stop_hz} with {points} points{', capturing both S11 and S21' if s2p else ', S11 only'} +2. Format each data point as real/imaginary pairs +3. {'For S2P: S12 and S22 will be set to 0+j0 (NanoVNA is a 1-port/2-port device that measures S11 and S21 only)' if s2p else 'Each line: frequency S11_real S11_imag'} +4. Present the complete file content for you to save + +Let me run the scan now.""", + ), + ] + + @mcp.prompt + def analyze_antenna( + band: str = "2m", + start_hz: int | None = None, + stop_hz: int | None = None, + points: int = 101, + ) -> list[Message]: + """Guide through antenna analysis with SWR, impedance, and bandwidth. + + Scans the antenna's frequency range and computes resonant frequency, + SWR at resonance, bandwidth, impedance, and return loss. + + Args: + band: Ham band name (e.g., '2m', '70cm', '20m') or 'custom' + start_hz: Start frequency in Hz (overrides band) + stop_hz: Stop frequency in Hz (overrides band) + points: Number of measurement points + """ + if start_hz is not None and stop_hz is not None: + f_start, f_stop = start_hz, stop_hz + band_label = f"Custom ({_format_freq(f_start)} – {_format_freq(f_stop)})" + elif band in HAM_BANDS: + f_start, f_stop = HAM_BANDS[band] + band_label = f"{band.upper()} band" + else: + f_start, f_stop = HAM_BANDS["2m"] + band_label = "2M band" + + return [ + Message( + role="user", + content=( + f"Analyze my antenna on the {band_label} " + f"({_format_freq(f_start)} – {_format_freq(f_stop)})." + ), + ), + Message( + role="assistant", + content=f"""I'll perform a comprehensive antenna analysis for the {band_label}. + +**What I'll measure:** +- Resonant frequency (minimum return loss / SWR) +- SWR at resonance and across the band +- Bandwidth where SWR < 2:1 +- Complex impedance at resonance (R + jX) +- Return loss in dB +- Reactive component (capacitive/inductive) away from resonance + +**Procedure:** +1. Ensure your antenna is connected to Port 1 (CH0) +2. Run an S11 scan from {_format_freq(f_start)} to {_format_freq(f_stop)} with {points} points +3. Use the `calculations.analyze_scan()` utility to compute all metrics +4. Present results with actionable insights + +**Tips for accurate measurements:** +- Use a calibrated setup (run `calibrate` prompt first if needed) +- Keep feedline away from the antenna under test +- Ensure connectors are tight and clean + +Let me run the scan and analysis now.""", + ), + ] + + @mcp.prompt + def measure_cable( + start_hz: int = 100_000, + stop_hz: int = 300_000_000, + points: int = 201, + ) -> list[Message]: + """Guide through cable and transmission line analysis. + + Measures cable characteristics from S11: velocity factor, + cable loss, impedance profile, and distance to fault. + + Args: + start_hz: Start frequency in Hz + stop_hz: Stop frequency in Hz + points: Number of measurement points + """ + return [ + Message( + role="user", + content=( + f"Analyze a cable/transmission line from " + f"{_format_freq(start_hz)} to {_format_freq(stop_hz)}." + ), + ), + Message( + role="assistant", + content=f"""I'll measure your cable's characteristics using S11 reflection data. + +**What I'll determine:** +- **Cable impedance** from the Smith chart / impedance data +- **Cable loss** at various frequencies (from return loss with open/short termination) +- **Electrical length** from the phase slope +- **Velocity factor** if physical length is known +- **Distance to fault** using time-domain transform (if supported) + +**Setup for cable analysis:** +1. Connect one end of the cable to Port 1 (CH0) +2. **Leave the far end OPEN** for the first measurement +3. I'll scan S11 from {_format_freq(start_hz)} to {_format_freq(stop_hz)} with {points} points + +**For complete cable characterization:** +- First sweep: far end **open** (measures impedance + electrical length) +- Second sweep: far end **shorted** (helps determine cable loss) +- Third sweep: far end terminated in **50\u03a9** (verifies cable impedance) + +**Advanced**: If your firmware supports `transform`, I can also enable time-domain +mode to show distance-to-fault as a TDR (Time Domain Reflectometer). + +Let me start with the open-ended measurement. Is the cable connected with the far end open?""", + ), + ] + + @mcp.prompt + def compare_sweeps( + start_hz: int = 1_000_000, + stop_hz: int = 900_000_000, + points: int = 101, + ) -> list[Message]: + """Guide through before/after sweep comparison. + + Takes two measurements and produces a delta comparison showing + changes in return loss, SWR, and impedance. + + Args: + start_hz: Start frequency in Hz + stop_hz: Stop frequency in Hz + points: Number of measurement points + """ + return [ + Message( + role="user", + content=( + f"I want to compare two sweeps (before/after) from " + f"{_format_freq(start_hz)} to {_format_freq(stop_hz)}." + ), + ), + Message( + role="assistant", + content=f"""I'll take two measurements and compare them to show what changed. + +**Use cases:** +- Before/after a filter modification +- Comparing two antennas +- Checking the effect of a matching network +- Verifying a repair + +**Procedure:** +1. Set up the **"before"** condition (e.g., original antenna, unmodified filter) +2. I'll run scan #1 from {_format_freq(start_hz)} to {_format_freq(stop_hz)} with {points} points +3. You make your change (swap antenna, add matching network, etc.) +4. I'll run scan #2 with identical settings +5. I'll compute and display deltas for: + - Return loss (dB change at each frequency) + - SWR improvement/degradation + - Impedance shift + - Bandwidth change + +**Important**: Both scans must use the same frequency range and point count +for a valid comparison. Don't change calibration between scans. + +Set up the **"before"** condition and tell me when ready for scan #1.""", + ), + ] diff --git a/src/mcnanovna/server.py b/src/mcnanovna/server.py index b000b29..61e1830 100644 --- a/src/mcnanovna/server.py +++ b/src/mcnanovna/server.py @@ -1,6 +1,6 @@ """FastMCP server for NanoVNA-H vector network analyzers. -Registers all NanoVNA tool methods and starts the MCP server. +Registers all NanoVNA tool methods, prompts, and starts the MCP server. """ from __future__ import annotations @@ -9,13 +9,31 @@ from fastmcp import FastMCP from fastmcp.tools.tool import FunctionTool from mcnanovna.nanovna import NanoVNA +from mcnanovna.prompts import register_prompts -# All public methods on NanoVNA that should become MCP tools +# All public methods on NanoVNA that should become MCP tools. +# Grouped by category for maintainability. _TOOL_METHODS = [ + # Tier 1: Essential measurement & control "info", "sweep", "scan", "data", "frequencies", "marker", "cal", - "save", "recall", "pause", "resume", "power", "bandwidth", "edelay", - "s21offset", "vbat", "capture", "trace", "transform", "smooth", - "threshold", "reset", "version", "detect", "disconnect", "raw_command", "cw", + "save", "recall", "pause", "resume", + # Tier 2: Configuration + "power", "bandwidth", "edelay", "s21offset", "vbat", "capture", + # Tier 3: Advanced + "trace", "transform", "smooth", "threshold", "reset", "version", + "detect", "disconnect", "raw_command", "cw", + # Phase 1 additions: Essential commands + "measure", "config", "saveconfig", "clearconfig", "color", "freq", + "tcxo", "vbat_offset", + # Phase 1 additions: Touch / Remote Desktop + "touchcal", "touchtest", "refresh", "touch", "release", + # Phase 1 additions: SD Card storage + "sd_list", "sd_read", "sd_delete", "time", + # Phase 1 additions: Debug / Diagnostic + "i2c", "si", "lcd", "threads", "stat", "sample", "test", + "gain", "dump", "port", "offset", "dac", "usart_cfg", "usart", "band", + # Convenience: server-side analysis + "analyze", ] @@ -26,7 +44,11 @@ def create_server() -> FastMCP: "MCP server for controlling NanoVNA-H vector network analyzers via USB serial. " "Tools provide structured access to sweep measurements, S-parameter data, " "calibration, markers, and device configuration. The device auto-connects " - "on first tool call — just plug in and go." + "on first tool call — just plug in and go.\n\n" + "Use the 'analyze' tool for comprehensive measurement reports with SWR, " + "impedance, bandwidth, and reactive component analysis built in.\n\n" + "Prompts are available for guided workflows: calibrate, export_touchstone, " + "analyze_antenna, measure_cable, and compare_sweeps." ), ) vna = NanoVNA() @@ -36,6 +58,8 @@ def create_server() -> FastMCP: tool = FunctionTool.from_function(bound_method) mcp.add_tool(tool) + register_prompts(mcp) + return mcp