Mixin refactor: split NanoVNA into 6 tool modules, add LC/component analysis

Break monolithic nanovna.py (~1750 lines) into focused mixin classes
in tools/ subpackage. NanoVNA now composes MeasurementMixin, ConfigMixin,
DisplayMixin, DeviceMixin, DiagnosticsMixin, and AnalysisMixin — server.py
registration via getattr() works unchanged.

New analysis tools: analyze_component, analyze_lc_series, analyze_lc_shunt,
analyze_lc_match, analyze_s11_resonance. Supporting math in calculations.py
(reactance_to_component, LC resonator analysis, impedance matching).

New prompts: measure_component, measure_lc_series, measure_lc_shunt,
impedance_match, measure_tdr, analyze_crystal, analyze_filter_response.

70 tools, 12 prompts registered.
This commit is contained in:
Ryan Malloy 2026-01-30 19:59:23 -07:00
parent 48e91a755c
commit e0fe09f3b8
12 changed files with 3769 additions and 1492 deletions

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -156,19 +156,19 @@ Let me start by setting the sweep range. Ready?""",
**Scan range**: {_format_freq(start_hz)} {_format_freq(stop_hz)}
**Points**: {points}
**Format**: {'S2P (S11 + S21 — requires DUT connected between ports)' if s2p else 'S1P (S11 reflection only)'}
**Format**: {"S2P (S11 + S21 — requires DUT connected between ports)" if s2p else "S1P (S11 reflection only)"}
**Touchstone format reference** (IEEE Std 1363):
```
! NanoVNA-H measurement
# Hz S RI R 50
! freq {'s11_re s11_im s21_re s21_im s12_re s12_im s22_re s22_im' if s2p else 's11_re s11_im'}
! freq {"s11_re s11_im s21_re s21_im s12_re s12_im s22_re s22_im" if s2p else "s11_re s11_im"}
```
I'll:
1. Run a `scan` from {start_hz} to {stop_hz} with {points} points{', capturing both S11 and S21' if s2p else ', S11 only'}
1. Run a `scan` from {start_hz} to {stop_hz} with {points} points{", capturing both S11 and S21" if s2p else ", S11 only"}
2. Format each data point as real/imaginary pairs
3. {'For S2P: S12 and S22 will be set to 0+j0 (NanoVNA is a 1-port/2-port device that measures S11 and S21 only)' if s2p else 'Each line: frequency S11_real S11_imag'}
3. {"For S2P: S12 and S22 will be set to 0+j0 (NanoVNA is a 1-port/2-port device that measures S11 and S21 only)" if s2p else "Each line: frequency S11_real S11_imag"}
4. Present the complete file content for you to save
Let me run the scan now.""",
@ -206,10 +206,7 @@ Let me run the scan now.""",
return [
Message(
role="user",
content=(
f"Analyze my antenna on the {band_label} "
f"({_format_freq(f_start)} {_format_freq(f_stop)})."
),
content=(f"Analyze my antenna on the {band_label} ({_format_freq(f_start)} {_format_freq(f_stop)})."),
),
Message(
role="assistant",
@ -258,8 +255,7 @@ Let me run the scan and analysis now.""",
Message(
role="user",
content=(
f"Analyze a cable/transmission line from "
f"{_format_freq(start_hz)} to {_format_freq(stop_hz)}."
f"Analyze a cable/transmission line from {_format_freq(start_hz)} to {_format_freq(stop_hz)}."
),
),
Message(
@ -290,6 +286,202 @@ Let me start with the open-ended measurement. Is the cable connected with the fa
),
]
@mcp.prompt
def analyze_crystal(
frequency_hz: int = 10_000_000,
span_hz: int = 100_000,
points: int = 201,
) -> list[Message]:
"""Guide through quartz crystal parameter extraction.
Measures a crystal's motional parameters (Rm, Lm, Cm, Cp) and Q factor
using S21 transmission through a series test fixture.
Args:
frequency_hz: Nominal crystal frequency in Hz (e.g. 10000000 for 10 MHz)
span_hz: Frequency span around nominal (e.g. 100000 for +/-50 kHz)
points: Number of measurement points (201 recommended for resolution)
"""
start = frequency_hz - span_hz // 2
stop = frequency_hz + span_hz // 2
return [
Message(
role="user",
content=(
f"Measure and extract parameters for a {_format_freq(frequency_hz)} "
f"crystal with {_format_freq(span_hz)} span."
),
),
Message(
role="assistant",
content=f"""I'll guide you through crystal parameter extraction using the series-jig method.
**Target crystal**: {_format_freq(frequency_hz)} (nominal)
**Scan range**: {_format_freq(start)} {_format_freq(stop)} ({points} points)
**Test fixture setup** (series measurement jig):
```
Port 1 Crystal Port 2
SMA SMA
```
The crystal is placed in series between the two ports. S21 transmission
peaks at the series resonance frequency (fs) where the crystal's motional
impedance is at minimum.
**What I'll extract:**
- **fs** Series resonance frequency (max S21 transmission)
- **fp** Parallel resonance frequency (min S21 transmission, anti-resonance)
- **Rm** Motional resistance (ESR at resonance)
- **Lm** Motional inductance
- **Cm** Motional capacitance
- **Cp** Holder/shunt capacitance (from fs-fp spacing)
- **Q** Quality factor (typically 10,000100,000 for quartz)
- **Insertion loss** at series resonance
**Tips for accurate measurements:**
- Calibrate first (the `calibrate` prompt helps)
- Use short SMA cables to minimize fixture parasitics
- A span of 2-5x the crystal bandwidth gives good resolution
- For narrow-bandwidth crystals (high Q), increase points to 201 or 401
**Procedure:**
1. Connect the crystal in the series test jig between Port 1 and Port 2
2. I'll run `analyze_xtal` with the scan parameters
3. I'll present the extracted motional equivalent circuit parameters
Ready? Let me start the measurement.""",
),
]
@mcp.prompt
def analyze_filter_response(
start_hz: int = 1_000_000,
stop_hz: int = 500_000_000,
points: int = 201,
) -> list[Message]:
"""Guide through filter characterization and classification.
Measures S21 transmission through a filter to determine type,
cutoff frequencies, bandwidth, Q factor, and roll-off rate.
Args:
start_hz: Start frequency in Hz
stop_hz: Stop frequency in Hz
points: Number of measurement points (201 recommended)
"""
return [
Message(
role="user",
content=(
f"Characterize a filter from {_format_freq(start_hz)} to "
f"{_format_freq(stop_hz)} with {points} points."
),
),
Message(
role="assistant",
content=f"""I'll measure and classify your filter using S21 transmission analysis.
**Scan range**: {_format_freq(start_hz)} {_format_freq(stop_hz)} ({points} points)
**Setup:**
Connect the filter between Port 1 (input) and Port 2 (output).
Ensure proper impedance matching (50\u03a9 typically).
**What I'll determine:**
- **Filter type**: lowpass, highpass, bandpass, or bandstop
- **Cutoff frequencies** at -3dB, -6dB, -10dB, and -20dB thresholds
- **Bandwidth** at -3dB and -6dB (for bandpass filters)
- **Center frequency** and **Q factor** (for bandpass filters)
- **Roll-off rate** in dB/decade and dB/octave (skirt steepness)
- **Peak insertion loss** (passband flatness)
**Interpreting results:**
- **Roll-off** indicates filter order: ~20 dB/decade per pole (1st order = 20, 2nd = 40, etc.)
- **Q factor** for bandpass: higher Q = narrower bandwidth relative to center frequency
- **Insertion loss** in passband: lower is better (<1 dB for good passive filters)
- Compare -3dB and -6dB bandwidths to assess skirt shape
**Scan tips:**
- Start well below the expected passband for complete characterization
- Extend stop frequency to capture the full stopband roll-off
- Use 201+ points for accurate cutoff interpolation
Let me run the `analyze_filter` tool now.""",
),
]
@mcp.prompt
def measure_tdr(
start_hz: int = 100_000,
stop_hz: int = 900_000_000,
points: int = 201,
velocity_factor: float = 0.66,
) -> list[Message]:
"""Guide through Time Domain Reflectometry cable analysis.
Scans S11 and transforms to the time/distance domain to reveal
impedance discontinuities, cable faults, and connection quality.
Args:
start_hz: Start frequency in Hz (lower = longer range)
stop_hz: Stop frequency in Hz (higher = better resolution)
points: Number of measurement points (201 recommended)
velocity_factor: Cable velocity factor (0.66 for RG-58, see table)
"""
return [
Message(
role="user",
content=(
f"Run TDR analysis from {_format_freq(start_hz)} to "
f"{_format_freq(stop_hz)} with VF={velocity_factor}."
),
),
Message(
role="assistant",
content=f"""I'll perform Time Domain Reflectometry to map impedance along your cable.
**Scan range**: {_format_freq(start_hz)} {_format_freq(stop_hz)} ({points} points)
**Velocity factor**: {velocity_factor}
**How TDR works:**
S11 reflection data across a wide frequency span is transformed to the
time domain via inverse FFT. Each time delay corresponds to a physical
distance along the cable. Impedance changes (connectors, damage, open/short
ends) appear as reflection peaks at their physical location.
**Setup:**
Connect one end of the cable to Port 1 (CH0). The far end can be:
- **Open** large positive reflection at the end
- **Shorted** large negative reflection at the end
- **Terminated (50\u03a9)** no reflection at the end (only see faults)
**Velocity factor reference** (VF determines distance accuracy):
| Cable Type | VF |
|---|---|
| RG-58, RG-8 (solid PE) | 0.66 |
| RG-213 (solid PE) | 0.66 |
| LMR-400 | 0.85 |
| Foam PE dielectric | 0.82 |
| Air dielectric / hardline | 0.950.97 |
| RG-174 | 0.66 |
| Belden 9913 | 0.84 |
**Resolution and range** (determined by frequency span):
- Resolution \u2248 c \u00d7 VF / (2 \u00d7 span) wider span = finer detail
- Max range \u2248 c \u00d7 VF \u00d7 (N-1) / (2 \u00d7 span) more points = longer range
- With {_format_freq(stop_hz - start_hz)} span and VF={velocity_factor}:
~{299_792_458 * velocity_factor / (2 * (stop_hz - start_hz)):.2f} m resolution
**Window options:**
- **minimum** (beta=0): sharpest peaks but more sidelobe ringing
- **normal** (beta=6): good balance of resolution and sidelobe suppression
- **maximum** (beta=13): smoothest response, wider peaks
Let me run `analyze_tdr` now.""",
),
]
@mcp.prompt
def compare_sweeps(
start_hz: int = 1_000_000,
@ -341,3 +533,265 @@ for a valid comparison. Don't change calibration between scans.
Set up the **"before"** condition and tell me when ready for scan #1.""",
),
]
@mcp.prompt
def measure_component(
start_hz: int = 1_000_000,
stop_hz: int = 500_000_000,
points: int = 101,
) -> list[Message]:
"""Guide through identifying an unknown component (inductor, capacitor, or resistor).
Scans S11 reflection to classify the component, report its value,
ESR, Q factor, and self-resonant frequency.
Args:
start_hz: Start frequency in Hz
stop_hz: Stop frequency in Hz
points: Number of measurement points
"""
return [
Message(
role="user",
content=(f"Identify an unknown component from {_format_freq(start_hz)} to {_format_freq(stop_hz)}."),
),
Message(
role="assistant",
content=f"""I'll identify your component using S11 reflection analysis.
**Scan range**: {_format_freq(start_hz)} \u2013 {_format_freq(stop_hz)} ({points} points)
**Setup:**
Connect the unknown component to Port 1 using a test fixture or SMA adapter.
Port 2 is not used for this measurement.
**How S11 component identification works:**
The VNA measures the complex reflection coefficient (\u0393) at Port 1.
From \u0393, I compute the impedance Z = R + jX at each frequency:
- **Inductor**: positive reactance (X > 0), increasing with frequency. X = 2\u03c0fL
- **Capacitor**: negative reactance (X < 0), magnitude decreasing with frequency. X = \u22121/(2\u03c0fC)
- **Resistor**: minimal reactance across the sweep, R dominates
- **LC circuit**: reactance crosses zero at the self-resonant frequency (SRF)
**Frequency range tips:**
- **Large inductors** (>\u00b5H): use lower start frequency (100 kHz \u2013 50 MHz)
- **Small inductors** (<100 nH): use higher range (50 MHz \u2013 900 MHz)
- **Large capacitors** (>100 pF): lower range works well
- **Small capacitors** (<10 pF): use higher frequencies
- Start wide, then narrow down around the region of interest
**What I'll report:**
- Component type (inductor / capacitor / resistor / LC circuit)
- Primary value (nH, pF, or \u03a9)
- ESR (equivalent series resistance)
- Q factor at the measurement frequency
- Self-resonant frequency if the component has one
Let me run `analyze_component` now.""",
),
]
@mcp.prompt
def measure_lc_series(
start_hz: int = 1_000_000,
stop_hz: int = 500_000_000,
points: int = 201,
measure_r: float = 50.0,
) -> list[Message]:
"""Guide through series LC resonator measurement.
Measures a component in series between Port 1 and Port 2 using S21
transmission. At resonance, the series LC has minimum impedance and
maximum S21 transmission.
Args:
start_hz: Start frequency in Hz
stop_hz: Stop frequency in Hz
points: Number of measurement points (201 recommended)
measure_r: Port termination resistance in ohms (default 50)
"""
return [
Message(
role="user",
content=(f"Measure a series LC resonator from {_format_freq(start_hz)} to {_format_freq(stop_hz)}."),
),
Message(
role="assistant",
content=f"""I'll characterize your series LC resonator using S21 transmission measurement.
**Scan range**: {_format_freq(start_hz)} \u2013 {_format_freq(stop_hz)} ({points} points)
**Port termination**: {measure_r} \u03a9
**Test fixture setup** (series topology):
```
Port 1 \u2500\u2500\u2500\u2500\u2500[ DUT ]\u2500\u2500\u2500\u2500\u2500 Port 2
in series
```
The device under test is placed **in the signal path** between Port 1 and
Port 2. At the resonant frequency, the series LC has minimum impedance,
allowing maximum signal through \u2014 a **peak** in S21.
**What I'll extract:**
- **Resonant frequency** (peak S21 transmission)
- **Motional resistance** Rm (series resistance at resonance)
- **Inductance** L (from phase bandwidth)
- **Capacitance** C (from phase bandwidth)
- **Q factor** = 2\u03c0fL/Rm
- **Bandwidth** (from \u00b145\u00b0 phase crossings)
- **Insertion loss** at resonance
**Series vs crystal measurement:**
This tool extracts the same parameters as crystal analysis but without
searching for parallel resonance or holder capacitance. Use `analyze_crystal`
for quartz crystals, and this tool for general LC resonators, ceramic
resonators, or SAW devices.
**Tips:**
- Use 201+ points for narrow-bandwidth resonators
- Center the scan range around the expected resonance
- A wider span helps if you're unsure of the exact frequency
Let me run `analyze_lc_series` now.""",
),
]
@mcp.prompt
def measure_lc_shunt(
start_hz: int = 1_000_000,
stop_hz: int = 500_000_000,
points: int = 201,
measure_r: float = 50.0,
) -> list[Message]:
"""Guide through shunt LC resonator measurement.
Measures a component connected as a shunt (to ground) using S21
transmission. At resonance, the shunt LC absorbs signal, producing
a transmission dip.
Args:
start_hz: Start frequency in Hz
stop_hz: Stop frequency in Hz
points: Number of measurement points (201 recommended)
measure_r: Port termination resistance in ohms (default 50)
"""
return [
Message(
role="user",
content=(f"Measure a shunt LC resonator from {_format_freq(start_hz)} to {_format_freq(stop_hz)}."),
),
Message(
role="assistant",
content=f"""I'll characterize your shunt LC resonator using S21 transmission measurement.
**Scan range**: {_format_freq(start_hz)} \u2013 {_format_freq(stop_hz)} ({points} points)
**Port termination**: {measure_r} \u03a9
**Test fixture setup** (shunt topology):
```
Port 1 \u2500\u2500\u2500\u252c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u252c\u2500\u2500\u2500 Port 2
\u2502 \u2502
[DUT] signal
\u2502 path
GND
```
The device under test is connected from the signal path **to ground**
(shunt configuration). At resonance, the parallel LC presents maximum
impedance to ground, creating an **absorption dip** in S21 transmission.
**How it differs from series measurement:**
| | Series | Shunt |
|---|---|---|
| S21 at resonance | **Peak** (max transmission) | **Dip** (min transmission) |
| DUT placement | In-line between ports | From signal path to ground |
| Resonance impedance | Minimum (short) | Maximum (open to ground) |
| Typical use | Crystal filters, series traps | Notch filters, EMI suppression |
**What I'll extract:**
- **Resonant frequency** (minimum S21 transmission)
- **Motional resistance** Rm (from attenuation depth)
- **Inductance** L and **Capacitance** C
- **Q factor** = f/bandwidth
- **Bandwidth** and **insertion loss**
**Applications:**
- Notch filter characterization
- EMI filter evaluation
- Parallel resonator tuning
- LC tank circuit measurement
Let me run `analyze_lc_shunt` now.""",
),
]
@mcp.prompt
def impedance_match(
frequency_hz: int = 145_000_000,
r: float = 25.0,
x: float = 15.0,
z0: float = 50.0,
) -> list[Message]:
"""Guide through impedance matching network design.
Computes L-network solutions to match a load impedance to a target
impedance (typically 50 ohm). Can use direct R+jX values or scan S11.
Args:
frequency_hz: Design frequency in Hz
r: Load resistance in ohms (example default)
x: Load reactance in ohms (example default)
z0: Target impedance in ohms (default 50)
"""
return [
Message(
role="user",
content=(
f"Design an impedance matching network for "
f"{r}+j{x} \u03a9 at {_format_freq(frequency_hz)}, "
f"matching to {z0} \u03a9."
),
),
Message(
role="assistant",
content=f"""I'll compute L-network impedance matching solutions for your load.
**Design parameters:**
- **Load impedance**: {r} + j{x} \u03a9
- **Target impedance**: {z0} \u03a9
- **Frequency**: {_format_freq(frequency_hz)}
**What is an L-network?**
The simplest broadband matching network uses two reactive components
(inductors and/or capacitors) arranged in an "L" shape. There are typically
2\u20134 valid solutions, each with different component values and bandwidth
characteristics.
**L-network topologies:**
```
Source shunt \u2500\u2500 Series \u2500\u2500 Load shunt
\u2502 \u2502
[Zp] \u2500\u2500\u2500[Zs]\u2500\u2500\u2500 [Zp]
\u2502 \u2502
GND GND
```
Each solution specifies which positions get an inductor, capacitor, or
nothing. The tool returns up to 4 solutions.
**Two modes available:**
1. **Direct mode** (no hardware needed): Provide R and X values directly.
Use this when you already know the impedance from a previous measurement
or from a datasheet.
2. **Scan mode**: Provide start/stop frequencies and the tool scans S11 to
measure the actual impedance at the target frequency. More accurate when
the load is available for measurement.
**Choosing between solutions:**
- Prefer solutions with **fewer components** (null entries)
- **Capacitor in shunt** + **inductor in series** is the most common topology
- Higher-Q solutions give narrower bandwidth (sharper match)
- Consider practical component values (avoid sub-nH or sub-pF)
Let me compute the matching solutions now using `analyze_lc_match`.""",
),
]

View File

@ -314,11 +314,36 @@ class NanoVNAProtocol:
help_text = " ".join(help_lines).lower()
# The help output format is: "Commands: scan scan_bin data ..."
for cmd in [
"scan_bin", "data", "frequencies", "sweep", "power", "bandwidth",
"cal", "save", "recall", "trace", "marker", "edelay", "s21offset",
"capture", "vbat", "tcxo", "reset", "smooth", "transform",
"threshold", "info", "version", "color", "measure", "pause",
"resume", "config", "usart_cfg", "vbat_offset", "time",
"scan_bin",
"data",
"frequencies",
"sweep",
"power",
"bandwidth",
"cal",
"save",
"recall",
"trace",
"marker",
"edelay",
"s21offset",
"capture",
"vbat",
"tcxo",
"reset",
"smooth",
"transform",
"threshold",
"info",
"version",
"color",
"measure",
"pause",
"resume",
"config",
"usart_cfg",
"vbat_offset",
"time",
]:
if cmd in help_text:
info.capabilities.append(cmd)
@ -366,6 +391,7 @@ class NanoVNAProtocol:
# -- Data parsing helpers --
@dataclass
class ScanPoint:
frequency_hz: int | None = None

View File

@ -12,27 +12,83 @@ from mcnanovna.nanovna import NanoVNA
from mcnanovna.prompts import register_prompts
# All public methods on NanoVNA that should become MCP tools.
# Grouped by category for maintainability.
# Each method lives in a mixin class under tools/ — grouped here by module.
_TOOL_METHODS = [
# Tier 1: Essential measurement & control
"info", "sweep", "scan", "data", "frequencies", "marker", "cal",
"save", "recall", "pause", "resume",
# Tier 2: Configuration
"power", "bandwidth", "edelay", "s21offset", "vbat", "capture",
# Tier 3: Advanced
"trace", "transform", "smooth", "threshold", "reset", "version",
"detect", "disconnect", "raw_command", "cw",
# Phase 1 additions: Essential commands
"measure", "config", "saveconfig", "clearconfig", "color", "freq",
"tcxo", "vbat_offset",
# Phase 1 additions: Touch / Remote Desktop
"touchcal", "touchtest", "refresh", "touch", "release",
# Phase 1 additions: SD Card storage
"sd_list", "sd_read", "sd_delete", "time",
# Phase 1 additions: Debug / Diagnostic
"i2c", "si", "lcd", "threads", "stat", "sample", "test",
"gain", "dump", "port", "offset", "dac", "usart_cfg", "usart", "band",
# Convenience: server-side analysis
# tools/measurement.py — MeasurementMixin
"info",
"sweep",
"scan",
"data",
"frequencies",
"marker",
"cal",
"save",
"recall",
"pause",
"resume",
# tools/config.py — ConfigMixin
"power",
"bandwidth",
"edelay",
"s21offset",
"vbat",
"capture",
"measure",
"config",
"saveconfig",
"clearconfig",
"color",
"freq",
"tcxo",
"vbat_offset",
"threshold",
# tools/display.py — DisplayMixin
"trace",
"transform",
"smooth",
"touchcal",
"touchtest",
"refresh",
"touch",
"release",
# tools/device.py — DeviceMixin
"reset",
"version",
"detect",
"disconnect",
"raw_command",
"cw",
"sd_list",
"sd_read",
"sd_delete",
"time",
# tools/diagnostics.py — DiagnosticsMixin
"i2c",
"si",
"lcd",
"threads",
"stat",
"sample",
"test",
"gain",
"dump",
"port",
"offset",
"dac",
"usart_cfg",
"usart",
"band",
# tools/analysis.py — AnalysisMixin
"export_touchstone",
"export_csv",
"analyze_filter",
"analyze_xtal",
"analyze_tdr",
"analyze_component",
"analyze_lc_series",
"analyze_lc_shunt",
"analyze_lc_match",
"analyze_s11_resonance",
"analyze",
]
@ -47,8 +103,19 @@ def create_server() -> FastMCP:
"on first tool call — just plug in and go.\n\n"
"Use the 'analyze' tool for comprehensive measurement reports with SWR, "
"impedance, bandwidth, and reactive component analysis built in.\n\n"
"Export tools: 'export_touchstone' (.s1p/.s2p) and 'export_csv' for data "
"interchange with other RF tools.\n\n"
"Specialized analysis: 'analyze_filter' (type classification, cutoffs, Q), "
"'analyze_xtal' (crystal motional parameters), 'analyze_tdr' (time-domain "
"reflectometry with impedance/distance profiling).\n\n"
"LC & impedance tools: 'analyze_component' (identify unknown L/C/R from S11), "
"'analyze_lc_series' and 'analyze_lc_shunt' (resonator parameters from S21), "
"'analyze_lc_match' (L-network matching solver, accepts direct R+jX or scans S11), "
"'analyze_s11_resonance' (find up to 6 resonant frequencies).\n\n"
"Prompts are available for guided workflows: calibrate, export_touchstone, "
"analyze_antenna, measure_cable, and compare_sweeps."
"analyze_antenna, measure_cable, compare_sweeps, analyze_crystal, "
"analyze_filter_response, measure_tdr, measure_component, measure_lc_series, "
"measure_lc_shunt, and impedance_match."
),
)
vna = NanoVNA()

View File

@ -0,0 +1,33 @@
"""NanoVNA tool mixins — each groups related MCP tool methods.
The NanoVNA class composes all mixins, so server.py's getattr() registration
loop works unchanged. Each mixin accesses shared state (self._protocol,
self._ensure_connected, etc.) through the final composed class at runtime.
"""
from __future__ import annotations
from fastmcp import Context
from .analysis import AnalysisMixin
from .config import ConfigMixin
from .device import DeviceMixin
from .diagnostics import DiagnosticsMixin
from .display import DisplayMixin
from .measurement import MeasurementMixin
async def _progress(ctx: Context | None, progress: float, total: float, message: str) -> None:
"""Report progress if Context is available."""
if ctx:
await ctx.report_progress(progress, total, message)
__all__ = [
"AnalysisMixin",
"ConfigMixin",
"DeviceMixin",
"DiagnosticsMixin",
"DisplayMixin",
"MeasurementMixin",
]

View File

@ -0,0 +1,574 @@
"""AnalysisMixin — export, filter analysis, crystal analysis, TDR, component ID, LC matching."""
from __future__ import annotations
import asyncio
from fastmcp import Context
class AnalysisMixin:
"""Analysis and export tools: analyze, export_touchstone, export_csv, analyze_filter, analyze_xtal,
analyze_tdr, analyze_component, analyze_lc_series, analyze_lc_shunt, analyze_lc_match, analyze_s11_resonance.
"""
async def export_touchstone(
self,
start_hz: int,
stop_hz: int,
points: int = 101,
format: str = "s1p",
z0: float = 50.0,
ctx: Context | None = None,
) -> dict:
"""Export S-parameter data as Touchstone file content (.s1p or .s2p).
Runs a scan and formats results per IEEE Std 1363. The .s1p format
captures S11 reflection only; .s2p captures both S11 and S21
(S12/S22 are zeroed since the NanoVNA doesn't measure them).
Args:
start_hz: Start frequency in Hz
stop_hz: Stop frequency in Hz
points: Number of measurement points (default 101)
format: Output format 's1p' (S11 only) or 's2p' (S11 + S21)
z0: Reference impedance in ohms (default 50)
"""
from mcnanovna.calculations import format_touchstone_s1p, format_touchstone_s2p
from mcnanovna.tools import _progress
s2p = format.lower() == "s2p"
await _progress(ctx, 1, 4, "Connecting to NanoVNA...")
await asyncio.to_thread(self._ensure_connected)
await _progress(ctx, 2, 4, f"Scanning {points} points...")
scan_result = await self.scan(
start_hz,
stop_hz,
points,
s11=True,
s21=s2p,
)
if "error" in scan_result:
return scan_result
await _progress(ctx, 3, 4, "Formatting Touchstone data...")
freqs = [pt["frequency_hz"] for pt in scan_result["data"]]
s11_data = [pt["s11"] for pt in scan_result["data"]]
if s2p:
s21_data = [pt["s21"] for pt in scan_result["data"]]
content = format_touchstone_s2p(freqs, s11_data, s21_data, z0)
filename = f"nanovna_{start_hz}_{stop_hz}.s2p"
else:
content = format_touchstone_s1p(freqs, s11_data, z0)
filename = f"nanovna_{start_hz}_{stop_hz}.s1p"
await _progress(ctx, 4, 4, "Export complete")
return {
"content": content,
"filename": filename,
"format": "s2p" if s2p else "s1p",
"points": scan_result["points"],
"z0": z0,
}
async def export_csv(
self,
start_hz: int,
stop_hz: int,
points: int = 101,
s11: bool = True,
s21: bool = True,
ctx: Context | None = None,
) -> dict:
"""Export scan data as CSV with derived metrics.
Runs a scan and formats results as CSV including raw S-parameters
and derived values (SWR, return loss, impedance, insertion loss, phase).
Args:
start_hz: Start frequency in Hz
stop_hz: Stop frequency in Hz
points: Number of measurement points (default 101)
s11: Include S11 reflection data (default True)
s21: Include S21 transmission data (default True)
"""
from mcnanovna.calculations import format_csv
from mcnanovna.tools import _progress
await _progress(ctx, 1, 4, "Connecting to NanoVNA...")
await asyncio.to_thread(self._ensure_connected)
await _progress(ctx, 2, 4, f"Scanning {points} points...")
scan_result = await self.scan(start_hz, stop_hz, points, s11=s11, s21=s21)
if "error" in scan_result:
return scan_result
await _progress(ctx, 3, 4, "Formatting CSV data...")
content = format_csv(scan_result["data"])
await _progress(ctx, 4, 4, "Export complete")
return {
"content": content,
"filename": f"nanovna_{start_hz}_{stop_hz}.csv",
"points": scan_result["points"],
}
async def analyze_filter(
self,
start_hz: int,
stop_hz: int,
points: int = 101,
ctx: Context | None = None,
) -> dict:
"""Classify and characterize a filter from S21 transmission measurement.
Scans S21 through the DUT and determines filter type (lowpass, highpass,
bandpass), cutoff frequencies at -3/-6/-10/-20 dB, bandwidth, Q factor,
and roll-off rate. Connect the filter between Port 1 and Port 2.
Args:
start_hz: Start frequency in Hz
stop_hz: Stop frequency in Hz
points: Number of measurement points (default 101)
"""
from mcnanovna.calculations import analyze_filter_response
from mcnanovna.tools import _progress
await _progress(ctx, 1, 4, "Connecting to NanoVNA...")
await asyncio.to_thread(self._ensure_connected)
await _progress(ctx, 2, 4, f"Scanning S21 ({points} points)...")
scan_result = await self.scan(start_hz, stop_hz, points, s11=False, s21=True)
if "error" in scan_result:
return scan_result
await _progress(ctx, 3, 4, "Analyzing filter response...")
freqs = [pt["frequency_hz"] for pt in scan_result["data"]]
s21_data = [pt["s21"] for pt in scan_result["data"]]
result = analyze_filter_response(s21_data, freqs)
result["scan_info"] = {
"start_hz": start_hz,
"stop_hz": stop_hz,
"points": scan_result["points"],
}
await _progress(ctx, 4, 4, "Filter analysis complete")
return result
async def analyze_xtal(
self,
start_hz: int,
stop_hz: int,
points: int = 101,
z0: float = 50.0,
ctx: Context | None = None,
) -> dict:
"""Extract quartz crystal motional parameters from S21 measurement.
Scans S21 through a crystal in a series test fixture and determines
series/parallel resonance, motional resistance (Rm), inductance (Lm),
capacitance (Cm), holder capacitance (Cp), Q factor, and insertion loss.
Connect the crystal between Port 1 and Port 2.
Args:
start_hz: Start frequency in Hz
stop_hz: Stop frequency in Hz
points: Number of measurement points (default 101)
z0: Reference impedance in ohms (default 50)
"""
from mcnanovna.calculations import analyze_crystal
from mcnanovna.tools import _progress
await _progress(ctx, 1, 4, "Connecting to NanoVNA...")
await asyncio.to_thread(self._ensure_connected)
await _progress(ctx, 2, 4, f"Scanning S21 ({points} points)...")
scan_result = await self.scan(start_hz, stop_hz, points, s11=False, s21=True)
if "error" in scan_result:
return scan_result
await _progress(ctx, 3, 4, "Extracting crystal parameters...")
freqs = [pt["frequency_hz"] for pt in scan_result["data"]]
s21_data = [pt["s21"] for pt in scan_result["data"]]
result = analyze_crystal(s21_data, freqs, z0)
result["scan_info"] = {
"start_hz": start_hz,
"stop_hz": stop_hz,
"points": scan_result["points"],
}
await _progress(ctx, 4, 4, "Crystal analysis complete")
return result
async def analyze_tdr(
self,
start_hz: int,
stop_hz: int,
points: int = 101,
velocity_factor: float = 0.66,
window: str = "normal",
z0: float = 50.0,
ctx: Context | None = None,
) -> dict:
"""Time Domain Reflectometry analysis from S11 frequency sweep.
Scans S11 reflection data and transforms it to the time domain using
an inverse FFT with Kaiser windowing. Returns impedance and reflection
profiles along the cable/transmission line, plus detected discontinuities.
Args:
start_hz: Start frequency in Hz (use wide span for better resolution)
stop_hz: Stop frequency in Hz
points: Number of measurement points (default 101)
velocity_factor: Cable velocity factor (default 0.66 for RG-58)
window: Kaiser window 'minimum' (sharp), 'normal', or 'maximum' (smooth)
z0: Reference impedance in ohms (default 50)
"""
from mcnanovna.calculations import tdr_analysis
from mcnanovna.tools import _progress
await _progress(ctx, 1, 4, "Connecting to NanoVNA...")
await asyncio.to_thread(self._ensure_connected)
await _progress(ctx, 2, 4, f"Scanning S11 ({points} points)...")
scan_result = await self.scan(start_hz, stop_hz, points, s11=True, s21=False)
if "error" in scan_result:
return scan_result
await _progress(ctx, 3, 4, "Computing TDR transform...")
freqs = [pt["frequency_hz"] for pt in scan_result["data"]]
s11_data = [pt["s11"] for pt in scan_result["data"]]
result = tdr_analysis(s11_data, freqs, velocity_factor, window, z0)
result["scan_info"] = {
"start_hz": start_hz,
"stop_hz": stop_hz,
"points": scan_result["points"],
}
await _progress(ctx, 4, 4, "TDR analysis complete")
return result
async def analyze_component(
self,
start_hz: int,
stop_hz: int,
points: int = 101,
z0: float = 50.0,
ctx: Context | None = None,
) -> dict:
"""Identify an unknown component from S11 reflection measurement.
Scans S11 and classifies the DUT as an inductor, capacitor, resistor,
or LC circuit. Reports the primary value (nH, pF, or ohm), ESR,
Q factor, and self-resonant frequency if present.
Connect the component to Port 1 (open end of the test fixture).
Args:
start_hz: Start frequency in Hz
stop_hz: Stop frequency in Hz
points: Number of measurement points (default 101)
z0: Reference impedance in ohms (default 50)
"""
from mcnanovna.calculations import classify_component
from mcnanovna.tools import _progress
await _progress(ctx, 1, 4, "Connecting to NanoVNA...")
await asyncio.to_thread(self._ensure_connected)
await _progress(ctx, 2, 4, f"Scanning S11 ({points} points)...")
scan_result = await self.scan(start_hz, stop_hz, points, s11=True, s21=False)
if "error" in scan_result:
return scan_result
await _progress(ctx, 3, 4, "Classifying component...")
freqs = [pt["frequency_hz"] for pt in scan_result["data"]]
s11_data = [pt["s11"] for pt in scan_result["data"]]
result = classify_component(s11_data, freqs, z0)
result["scan_info"] = {
"start_hz": start_hz,
"stop_hz": stop_hz,
"points": scan_result["points"],
}
await _progress(ctx, 4, 4, "Component analysis complete")
return result
async def analyze_lc_series(
self,
start_hz: int,
stop_hz: int,
points: int = 101,
measure_r: float = 50.0,
ctx: Context | None = None,
) -> dict:
"""Measure a series LC resonator from S21 transmission data.
The component under test is placed in series between Port 1 and Port 2.
At resonance, the series LC circuit has minimum impedance, producing a
transmission peak in S21. Reports resonant frequency, motional resistance,
inductance, capacitance, Q factor, bandwidth, and insertion loss.
Args:
start_hz: Start frequency in Hz
stop_hz: Stop frequency in Hz
points: Number of measurement points (default 101)
measure_r: Port termination resistance in ohms (default 50)
"""
from mcnanovna.calculations import analyze_lc_series as calc_lc_series
from mcnanovna.tools import _progress
await _progress(ctx, 1, 4, "Connecting to NanoVNA...")
await asyncio.to_thread(self._ensure_connected)
await _progress(ctx, 2, 4, f"Scanning S21 ({points} points)...")
scan_result = await self.scan(start_hz, stop_hz, points, s11=False, s21=True)
if "error" in scan_result:
return scan_result
await _progress(ctx, 3, 4, "Analyzing series LC resonator...")
freqs = [pt["frequency_hz"] for pt in scan_result["data"]]
s21_data = [pt["s21"] for pt in scan_result["data"]]
result = calc_lc_series(s21_data, freqs, measure_r)
result["scan_info"] = {
"start_hz": start_hz,
"stop_hz": stop_hz,
"points": scan_result["points"],
}
await _progress(ctx, 4, 4, "Series LC analysis complete")
return result
async def analyze_lc_shunt(
self,
start_hz: int,
stop_hz: int,
points: int = 101,
measure_r: float = 50.0,
ctx: Context | None = None,
) -> dict:
"""Measure a shunt LC resonator from S21 transmission data.
The component under test is connected as a shunt (parallel to ground)
between Port 1 and Port 2. At resonance, the shunt LC circuit has
maximum impedance to ground, producing a transmission dip (absorption)
in S21. Reports resonant frequency, motional resistance, inductance,
capacitance, Q factor, bandwidth, and insertion loss.
Args:
start_hz: Start frequency in Hz
stop_hz: Stop frequency in Hz
points: Number of measurement points (default 101)
measure_r: Port termination resistance in ohms (default 50)
"""
from mcnanovna.calculations import analyze_lc_shunt as calc_lc_shunt
from mcnanovna.tools import _progress
await _progress(ctx, 1, 4, "Connecting to NanoVNA...")
await asyncio.to_thread(self._ensure_connected)
await _progress(ctx, 2, 4, f"Scanning S21 ({points} points)...")
scan_result = await self.scan(start_hz, stop_hz, points, s11=False, s21=True)
if "error" in scan_result:
return scan_result
await _progress(ctx, 3, 4, "Analyzing shunt LC resonator...")
freqs = [pt["frequency_hz"] for pt in scan_result["data"]]
s21_data = [pt["s21"] for pt in scan_result["data"]]
result = calc_lc_shunt(s21_data, freqs, measure_r)
result["scan_info"] = {
"start_hz": start_hz,
"stop_hz": stop_hz,
"points": scan_result["points"],
}
await _progress(ctx, 4, 4, "Shunt LC analysis complete")
return result
async def analyze_lc_match(
self,
frequency_hz: int,
r: float | None = None,
x: float | None = None,
z0: float = 50.0,
start_hz: int | None = None,
stop_hz: int | None = None,
points: int = 101,
ctx: Context | None = None,
) -> dict:
"""Compute L-network impedance matching solutions.
Accepts either a direct impedance (r + jx) or scans S11 to measure it.
**Direct mode**: Provide r and x (real and imaginary parts of impedance
in ohms) along with frequency_hz. No hardware needed.
**Scan mode**: Provide start_hz and stop_hz to scan S11. The impedance
at the frequency nearest to frequency_hz is extracted and used for
matching. Connect the load to Port 1.
Returns up to 4 L-network solutions, each specifying source shunt,
series, and load shunt components (inductors or capacitors with values).
Args:
frequency_hz: Design frequency in Hz for component value calculation
r: Load resistance in ohms (direct mode)
x: Load reactance in ohms (direct mode)
z0: Target impedance in ohms (default 50)
start_hz: Start frequency for S11 scan (scan mode)
stop_hz: Stop frequency for S11 scan (scan mode)
points: Number of measurement points for scan (default 101)
"""
from mcnanovna.calculations import lc_match
from mcnanovna.tools import _progress
scan_info = None
if r is not None and x is not None:
# Direct mode — pure math, no hardware needed
total_steps = 2
await _progress(ctx, 1, total_steps, f"Computing match for {r}+j{x} \u03a9 at {frequency_hz} Hz...")
elif start_hz is not None and stop_hz is not None:
# Scan mode — measure impedance from S11
total_steps = 4
await _progress(ctx, 1, total_steps, "Connecting to NanoVNA...")
await asyncio.to_thread(self._ensure_connected)
await _progress(ctx, 2, total_steps, f"Scanning S11 ({points} points)...")
scan_result = await self.scan(start_hz, stop_hz, points, s11=True, s21=False)
if "error" in scan_result:
return scan_result
scan_info = {
"start_hz": start_hz,
"stop_hz": stop_hz,
"points": scan_result["points"],
}
# Find the data point nearest to the target frequency
data = scan_result["data"]
best_idx = 0
best_dist = abs(data[0]["frequency_hz"] - frequency_hz)
for i, pt in enumerate(data[1:], 1):
dist = abs(pt["frequency_hz"] - frequency_hz)
if dist < best_dist:
best_dist = dist
best_idx = i
# Convert S11 to impedance: Z = z0 * (1 + \u0393) / (1 - \u0393)
s11 = data[best_idx]["s11"]
gamma = complex(s11["real"], s11["imag"])
denom = 1.0 - gamma
if abs(denom) < 1e-12:
return {"error": "S11 \u2248 1.0 (open circuit), cannot compute impedance"}
z = z0 * (1.0 + gamma) / denom
r = z.real
x = z.imag
frequency_hz = data[best_idx]["frequency_hz"]
await _progress(ctx, 3, total_steps, f"Impedance at {frequency_hz} Hz: {r:.1f}+j{x:.1f} \u03a9")
else:
return {"error": "Provide either (r, x) for direct mode or (start_hz, stop_hz) for scan mode"}
result = lc_match(r, x, frequency_hz, z0)
if scan_info is not None:
result["scan_info"] = scan_info
await _progress(ctx, total_steps, total_steps, "Matching network computation complete")
return result
async def analyze_s11_resonance(
self,
start_hz: int,
stop_hz: int,
points: int = 201,
z0: float = 50.0,
ctx: Context | None = None,
) -> dict:
"""Find resonant frequencies from S11 reflection data.
Scans S11 and searches for up to 6 points where the reactance crosses
zero, indicating resonance. Each resonance is classified as series
(reactance goes from negative to positive) or parallel (positive to
negative). Useful for identifying resonant modes of antennas, filters,
or transmission line stubs.
Args:
start_hz: Start frequency in Hz
stop_hz: Stop frequency in Hz
points: Number of measurement points (default 201)
z0: Reference impedance in ohms (default 50)
"""
from mcnanovna.calculations import find_s11_resonances
from mcnanovna.tools import _progress
await _progress(ctx, 1, 4, "Connecting to NanoVNA...")
await asyncio.to_thread(self._ensure_connected)
await _progress(ctx, 2, 4, f"Scanning S11 ({points} points)...")
scan_result = await self.scan(start_hz, stop_hz, points, s11=True, s21=False)
if "error" in scan_result:
return scan_result
await _progress(ctx, 3, 4, "Searching for resonances...")
freqs = [pt["frequency_hz"] for pt in scan_result["data"]]
s11_data = [pt["s11"] for pt in scan_result["data"]]
result = find_s11_resonances(s11_data, freqs, z0)
result["scan_info"] = {
"start_hz": start_hz,
"stop_hz": stop_hz,
"points": scan_result["points"],
}
await _progress(ctx, 4, 4, f"Found {result['count']} resonance(s)")
return result
async def analyze(
self,
start_hz: int,
stop_hz: int,
points: int = 101,
s11: bool = True,
s21: bool = False,
ctx: Context | None = None,
) -> dict:
"""Run a scan and return comprehensive S-parameter analysis.
Combines the scan tool with server-side calculations to produce
a full measurement report including SWR, impedance, bandwidth,
return loss, and reactive components without the LLM needing
to do the math.
Args:
start_hz: Start frequency in Hz
stop_hz: Stop frequency in Hz
points: Number of measurement points (default 101)
s11: Include S11 reflection analysis (default True)
s21: Include S21 transmission analysis (default False)
"""
from mcnanovna.calculations import analyze_scan
from mcnanovna.tools import _progress
await _progress(ctx, 1, 5, "Connecting to NanoVNA...")
await asyncio.to_thread(self._ensure_connected)
await _progress(ctx, 2, 5, f"Scanning {points} points from {start_hz} to {stop_hz} Hz...")
scan_result = await self.scan(start_hz, stop_hz, points, s11=s11, s21=s21)
if "error" in scan_result:
return scan_result
await _progress(ctx, 3, 5, f"Received {scan_result['points']} measurement points")
await _progress(ctx, 4, 5, "Calculating S-parameter metrics...")
analysis = analyze_scan(scan_result["data"])
analysis["scan_info"] = {
"start_hz": start_hz,
"stop_hz": stop_hz,
"points": scan_result["points"],
"binary": scan_result.get("binary", False),
}
await _progress(ctx, 5, 5, "Analysis complete")
return analysis

View File

@ -0,0 +1,417 @@
"""ConfigMixin — device configuration, power, bandwidth, capture, and settings tools."""
from __future__ import annotations
import asyncio
import base64
import re
from fastmcp import Context
# Si5351 drive level descriptions
POWER_DESCRIPTIONS = {
0: "2mA Si5351 drive",
1: "4mA Si5351 drive",
2: "6mA Si5351 drive",
3: "8mA Si5351 drive",
255: "auto",
}
class ConfigMixin:
"""Configuration tools: power, bandwidth, edelay, capture, measure, config, color, tcxo, etc."""
def power(self, level: int | None = None) -> dict:
"""Get or set RF output power level.
Args:
level: Power level (0=2mA, 1=4mA, 2=6mA, 3=8mA Si5351 drive, 255=auto)
"""
self._ensure_connected()
if level is not None:
self._protocol.send_text_command(f"power {level}")
lines = self._protocol.send_text_command("power")
# Response: "power: N"
for line in lines:
if "power" in line.lower():
parts = line.split(":")
if len(parts) >= 2:
try:
val = int(parts[1].strip())
return {
"power": val,
"description": POWER_DESCRIPTIONS.get(val, f"level {val}"),
}
except ValueError:
pass
return {"power": level if level is not None else -1, "description": "unknown", "raw": lines}
def bandwidth(self, bw_hz: int | None = None) -> dict:
"""Get or set the IF bandwidth (affects measurement speed vs noise floor).
Args:
bw_hz: Bandwidth in Hz, or bandwidth divider value. Lower = slower but more accurate.
"""
self._ensure_connected()
if not self._has_capability("bandwidth"):
return {"error": "bandwidth command not supported by this firmware"}
if bw_hz is not None:
self._protocol.send_text_command(f"bandwidth {bw_hz}")
lines = self._protocol.send_text_command("bandwidth")
# Response: "bandwidth N (Mhz)" or similar
if lines:
line = lines[0].strip()
# Try to parse "bandwidth <divider> (<freq>Hz)"
m = re.match(r"bandwidth\s+(\d+)\s*\((\d+)\s*Hz\)", line, re.IGNORECASE)
if m:
return {"bandwidth_divider": int(m.group(1)), "bandwidth_hz": int(m.group(2))}
# Fallback: just return raw
return {"raw": line}
return {"bandwidth_divider": 0, "bandwidth_hz": 0}
def edelay(self, seconds: float | None = None) -> dict:
"""Get or set electrical delay compensation in seconds.
Args:
seconds: Electrical delay in seconds (e.g. 1e-9 for 1 nanosecond)
"""
self._ensure_connected()
if seconds is not None:
self._protocol.send_text_command(f"edelay {seconds}")
lines = self._protocol.send_text_command("edelay")
if lines:
try:
return {"edelay_seconds": float(lines[0].strip())}
except ValueError:
return {"raw": lines}
return {"edelay_seconds": 0.0}
def s21offset(self, db: float | None = None) -> dict:
"""Get or set S21 offset correction in dB.
Args:
db: Offset value in dB
"""
self._ensure_connected()
if not self._has_capability("s21offset"):
return {"error": "s21offset command not supported by this firmware"}
if db is not None:
self._protocol.send_text_command(f"s21offset {db}")
lines = self._protocol.send_text_command("s21offset")
if lines:
try:
return {"s21_offset_db": float(lines[0].strip())}
except ValueError:
return {"raw": lines}
return {"s21_offset_db": 0.0}
def vbat(self) -> dict:
"""Read battery voltage in millivolts."""
self._ensure_connected()
lines = self._protocol.send_text_command("vbat")
# Response: "4151 mV"
if lines:
parts = lines[0].strip().split()
if parts:
try:
mv = int(parts[0])
return {"voltage_mv": mv, "voltage_v": round(mv / 1000.0, 3)}
except ValueError:
pass
return {"voltage_mv": 0, "voltage_v": 0.0, "raw": lines}
def _capture_raw_bytes(self) -> tuple[int, int, bytearray]:
"""Read raw RGB565 pixel data from the device. Blocking serial I/O."""
import time
di = self._protocol.device_info
width = di.lcd_width
height = di.lcd_height
expected_size = width * height * 2
self._protocol._drain()
self._protocol._send_command("capture")
ser = self._protocol._require_connection()
old_timeout = ser.timeout
ser.timeout = 10.0
try:
buf = b""
deadline = time.monotonic() + 10.0
# Read past echo line
while time.monotonic() < deadline:
chunk = ser.read(max(1, ser.in_waiting))
if chunk:
buf += chunk
if b"\r\n" in buf:
break
echo_end = buf.index(b"\r\n") + 2
pixel_buf = buf[echo_end:]
# Read pixel data
while len(pixel_buf) < expected_size and time.monotonic() < deadline:
remaining = expected_size - len(pixel_buf)
chunk = ser.read(min(remaining, 4096))
if chunk:
pixel_buf += chunk
# Byte-swap RGB565 (firmware sends native LE, display expects BE)
swapped = bytearray(expected_size)
for i in range(0, min(len(pixel_buf), expected_size), 2):
if i + 1 < len(pixel_buf):
swapped[i] = pixel_buf[i + 1]
swapped[i + 1] = pixel_buf[i]
# Drain trailing prompt
trailing = b""
while time.monotonic() < deadline:
chunk = ser.read(max(1, ser.in_waiting))
if chunk:
trailing += chunk
if b"ch> " in trailing or not chunk:
break
return width, height, swapped
finally:
ser.timeout = old_timeout
async def capture(self, raw: bool = False, ctx: Context | None = None):
"""Capture the current LCD screen as RGB565 pixel data (base64 encoded).
Returns width, height, and raw pixel data for rendering. The pixel format
is RGB565 (16-bit, 2 bytes per pixel). Total size = width * height * 2 bytes.
Args:
raw: If True, return raw RGB565 data as a dict with base64-encoded bytes.
If False (default), convert to PNG and return as an Image.
"""
from mcnanovna.tools import _progress
await _progress(ctx, 1, 3, "Connecting to NanoVNA...")
await asyncio.to_thread(self._ensure_connected)
await _progress(ctx, 2, 3, "Reading LCD pixel data...")
width, height, swapped = await asyncio.to_thread(self._capture_raw_bytes)
if raw:
await _progress(ctx, 3, 3, "Capture complete")
return {
"format": "rgb565",
"width": width,
"height": height,
"data_length": len(swapped),
"data_base64": base64.b64encode(bytes(swapped)).decode("ascii"),
}
await _progress(ctx, 3, 3, "Encoding PNG image...")
# Convert RGB565 to PNG and return as MCP Image
import io
import struct as _struct
from PIL import Image as PILImage
from fastmcp.utilities.types import Image
img = PILImage.new("RGB", (width, height))
pixels = img.load()
for y in range(height):
for x in range(width):
offset = (y * width + x) * 2
pixel = _struct.unpack(">H", swapped[offset : offset + 2])[0]
r = ((pixel >> 11) & 0x1F) << 3
g = ((pixel >> 5) & 0x3F) << 2
b = (pixel & 0x1F) << 3
pixels[x, y] = (r, g, b)
buf_png = io.BytesIO()
img.save(buf_png, format="PNG")
return Image(data=buf_png.getvalue(), format="png")
def measure(self, mode: str | None = None) -> dict:
"""Set on-device measurement display mode.
Controls what the NanoVNA computes and displays on-screen. Available
modes depend on firmware build flags.
Args:
mode: Measurement mode one of: none, lc, lcshunt, lcseries,
xtal, filter, cable, resonance. Omit to get usage help.
"""
self._ensure_connected()
if not self._has_capability("measure"):
return {"error": "measure command not supported by this firmware"}
if mode is not None:
lines = self._protocol.send_text_command(f"measure {mode}")
return {"mode": mode, "response": lines}
lines = self._protocol.send_text_command("measure")
return {"response": lines}
def config(self, option: str | None = None, value: int | None = None) -> dict:
"""Query or set device configuration options.
Options depend on firmware build (e.g., auto, avg, connection, mode,
grid, dot, bk, flip, separator, tif). Each takes a value of 0 or 1.
Args:
option: Configuration option name
value: Value to set (0 or 1)
"""
self._ensure_connected()
if not self._has_capability("config"):
return {"error": "config command not supported by this firmware"}
if option is not None and value is not None:
lines = self._protocol.send_text_command(f"config {option} {value}")
return {"option": option, "value": value, "response": lines}
lines = self._protocol.send_text_command("config")
return {"response": lines}
def saveconfig(self) -> dict:
"""Save current device configuration to flash memory.
Saves config_t (distinct from calibration save which uses slots).
This includes touch calibration, display settings, TCXO frequency, etc.
"""
self._ensure_connected()
lines = self._protocol.send_text_command("saveconfig")
return {"saved": True, "response": lines}
def clearconfig(self, key: str = "1234") -> dict:
"""Clear all stored configuration and calibration data from flash.
This is a destructive operation requires the protection key '1234'.
After clearing, you must recalibrate the device.
Args:
key: Protection key (must be '1234' to confirm)
"""
self._ensure_connected()
if key != "1234":
return {"error": "Protection key must be '1234' to confirm clearing all data"}
lines = self._protocol.send_text_command(f"clearconfig {key}")
return {"cleared": True, "response": lines}
def color(self, color_id: int | None = None, rgb24: int | None = None) -> dict:
"""Get or set display color palette entries.
With no args, lists all color slots and their current RGB values.
With id + rgb24, sets a specific color.
Args:
color_id: Palette slot index (0-31)
rgb24: 24-bit RGB color value (e.g., 0xFF0000 for red)
"""
self._ensure_connected()
if not self._has_capability("color"):
return {"error": "color command not supported by this firmware"}
if color_id is not None and rgb24 is not None:
self._protocol.send_text_command(f"color {color_id} {rgb24}")
return {"color_id": color_id, "rgb24": f"0x{rgb24:06x}", "set": True}
lines = self._protocol.send_text_command("color")
colors = []
for line in lines:
line = line.strip()
if ":" in line:
parts = line.split(":")
try:
idx = int(parts[0].strip())
val = parts[1].strip()
colors.append({"id": idx, "rgb24": val})
except (ValueError, IndexError):
pass
return {"colors": colors, "raw": lines}
def freq(self, frequency_hz: int) -> dict:
"""Set a single output frequency and pause the sweep.
Useful for CW measurements or signal generation at a specific frequency.
The sweep is paused automatically.
Args:
frequency_hz: Output frequency in Hz
"""
self._ensure_connected()
self._protocol.send_text_command(f"freq {frequency_hz}")
return {"frequency_hz": frequency_hz, "sweep": "paused"}
def tcxo(self, frequency_hz: int | None = None) -> dict:
"""Get or set the TCXO reference oscillator frequency.
The TCXO frequency affects all frequency accuracy. Default is typically
26000000 Hz. Adjust if you have a precision frequency reference.
Args:
frequency_hz: TCXO frequency in Hz (e.g., 26000000)
"""
self._ensure_connected()
if not self._has_capability("tcxo"):
return {"error": "tcxo command not supported by this firmware"}
if frequency_hz is not None:
self._protocol.send_text_command(f"tcxo {frequency_hz}")
return {"tcxo_hz": frequency_hz, "set": True}
# Query mode returns "current: <value>"
lines = self._protocol.send_text_command("tcxo")
for line in lines:
if "current:" in line.lower():
parts = line.split(":")
if len(parts) >= 2:
try:
return {"tcxo_hz": int(parts[1].strip())}
except ValueError:
pass
return {"response": lines}
def vbat_offset(self, offset: int | None = None) -> dict:
"""Get or set battery voltage measurement offset.
Calibrates the battery voltage reading. The offset is added to the
raw ADC value to compensate for hardware variations.
Args:
offset: Voltage offset value (raw ADC units)
"""
self._ensure_connected()
if not self._has_capability("vbat_offset"):
return {"error": "vbat_offset command not supported by this firmware"}
if offset is not None:
self._protocol.send_text_command(f"vbat_offset {offset}")
return {"offset": offset, "set": True}
lines = self._protocol.send_text_command("vbat_offset")
if lines:
try:
return {"offset": int(lines[0].strip())}
except ValueError:
pass
return {"response": lines}
def threshold(self, frequency_hz: int | None = None) -> dict:
"""Get or set the harmonic mode frequency threshold (~290 MHz default).
Above this frequency the Si5351 uses odd harmonics for output.
Args:
frequency_hz: Threshold frequency in Hz
"""
self._ensure_connected()
if frequency_hz is not None:
self._protocol.send_text_command(f"threshold {frequency_hz}")
lines = self._protocol.send_text_command("threshold")
# Parse "current: 290000000" from response
for line in lines:
if "current:" in line.lower():
parts = line.split(":")
if len(parts) >= 2:
try:
return {"threshold_hz": int(parts[1].strip())}
except ValueError:
pass
return {"response": lines}

View File

@ -0,0 +1,237 @@
"""DeviceMixin — device lifecycle, reset, version, SD card, CW mode, and raw commands."""
from __future__ import annotations
import base64
from mcnanovna.discovery import find_nanovna_ports
from mcnanovna.protocol import NanoVNAProtocolError
class DeviceMixin:
"""Device tools: reset, version, detect, disconnect, raw_command, cw, sd_list, sd_read, sd_delete, time."""
def reset(self, dfu: bool = False) -> dict:
"""Reset the NanoVNA device. With dfu=True, enters DFU bootloader for firmware update.
Args:
dfu: If True, enter DFU bootloader mode (device will disconnect)
"""
self._ensure_connected()
cmd = "reset dfu" if dfu else "reset"
try:
self._protocol.send_text_command(cmd, timeout=2.0)
except NanoVNAProtocolError:
pass # Device resets and disconnects
self._protocol.close()
note = "Device entering DFU mode — reconnect after firmware update" if dfu else "Device resetting"
return {"reset": True, "dfu": dfu, "note": note}
def version(self) -> dict:
"""Get firmware version string."""
self._ensure_connected()
lines = self._protocol.send_text_command("version")
return {"version": lines[0].strip() if lines else "unknown"}
def detect(self) -> dict:
"""Scan USB ports for connected NanoVNA devices."""
ports = find_nanovna_ports()
return {
"devices": [
{
"port": p.device,
"vid": f"0x{p.vid:04x}",
"pid": f"0x{p.pid:04x}",
"serial_number": p.serial_number,
"description": p.description,
}
for p in ports
],
"count": len(ports),
"currently_connected": self._port,
}
def disconnect(self) -> dict:
"""Close the serial connection to the NanoVNA."""
port = self._port
self._protocol.close()
self._port = None
return {"disconnected": True, "port": port}
def raw_command(self, command: str) -> dict:
"""Send an arbitrary shell command to the NanoVNA and return raw text response.
Escape hatch for firmware commands not wrapped as dedicated tools.
Args:
command: The shell command string to send (e.g. 'config', 'usart_cfg')
"""
self._ensure_connected()
lines = self._protocol.send_text_command(command, timeout=10.0)
return {"command": command, "response_lines": lines}
def cw(self, frequency_hz: int, power: int | None = None) -> dict:
"""Set continuous wave (CW) mode — output a single frequency.
Configures the NanoVNA to sweep a single point, effectively
becoming a CW signal generator at the specified frequency.
Args:
frequency_hz: Output frequency in Hz
power: Optional power level (0-3, or 255 for auto)
"""
self._ensure_connected()
if power is not None:
self._protocol.send_text_command(f"power {power}")
# CW mode is just a sweep with start == stop and 1 point
self._protocol.send_text_command(f"sweep {frequency_hz} {frequency_hz} 1")
self._protocol.send_text_command("resume")
return {"frequency_hz": frequency_hz, "power": power, "mode": "cw"}
def sd_list(self, pattern: str = "*.*") -> dict:
"""List files on the SD card.
Requires SD card hardware support. Returns filenames and sizes.
Args:
pattern: Glob pattern to filter files (default: '*.*')
"""
self._ensure_connected()
if not self._has_capability("sd_list"):
return {"error": "SD card commands not supported by this firmware/hardware"}
lines = self._protocol.send_text_command(f"sd_list {pattern}", timeout=10.0)
files = []
for line in lines:
line = line.strip()
if not line or "err:" in line.lower():
if "err:" in line.lower():
return {"error": line}
continue
parts = line.rsplit(" ", 1)
if len(parts) == 2:
try:
files.append({"name": parts[0], "size": int(parts[1])})
except ValueError:
files.append({"name": line, "size": 0})
else:
files.append({"name": line, "size": 0})
return {"files": files, "count": len(files)}
def sd_read(self, filename: str) -> dict:
"""Read a file from the SD card.
Returns the file content as base64-encoded data. The firmware sends
a 4-byte size header followed by raw file data.
Args:
filename: Name of the file to read
"""
self._ensure_connected()
if not self._has_capability("sd_read"):
return {"error": "SD card commands not supported by this firmware/hardware"}
import struct
import time
self._protocol._drain()
self._protocol._send_command(f"sd_read {filename}")
ser = self._protocol._require_connection()
old_timeout = ser.timeout
ser.timeout = 10.0
try:
buf = b""
deadline = time.monotonic() + 10.0
# Read past echo line
while time.monotonic() < deadline:
chunk = ser.read(max(1, ser.in_waiting))
if chunk:
buf += chunk
if b"\r\n" in buf:
break
# Check for error response
content = buf.decode("ascii", errors="replace")
if "err:" in content.lower():
return {"error": "File not found or no SD card"}
echo_end = buf.index(b"\r\n") + 2
data_buf = buf[echo_end:]
# Read 4-byte file size header
while len(data_buf) < 4 and time.monotonic() < deadline:
chunk = ser.read(4 - len(data_buf))
if chunk:
data_buf += chunk
if len(data_buf) < 4:
return {"error": "Failed to read file size header"}
file_size = struct.unpack_from("<I", data_buf, 0)[0]
data_buf = data_buf[4:]
# Read file data
while len(data_buf) < file_size and time.monotonic() < deadline:
remaining = file_size - len(data_buf)
chunk = ser.read(min(remaining, 4096))
if chunk:
data_buf += chunk
# Drain trailing prompt
trailing = b""
while time.monotonic() < deadline:
chunk = ser.read(max(1, ser.in_waiting))
if chunk:
trailing += chunk
if b"ch> " in trailing or not chunk:
break
return {
"filename": filename,
"size": file_size,
"data_base64": base64.b64encode(data_buf[:file_size]).decode("ascii"),
}
finally:
ser.timeout = old_timeout
def sd_delete(self, filename: str) -> dict:
"""Delete a file from the SD card.
Args:
filename: Name of the file to delete
"""
self._ensure_connected()
if not self._has_capability("sd_delete"):
return {"error": "SD card commands not supported by this firmware/hardware"}
lines = self._protocol.send_text_command(f"sd_delete {filename}", timeout=10.0)
success = any("ok" in line.lower() for line in lines)
return {"filename": filename, "deleted": success, "response": lines}
def time(
self,
field: str | None = None,
value: int | None = None,
) -> dict:
"""Get or set RTC (real-time clock) time.
With no args, returns current date/time. With field + value, sets
a specific time component.
Args:
field: Time field to set: y, m, d, h, min, sec, or ppm
value: Value for the field (0-99 for date/time, float for ppm)
"""
self._ensure_connected()
if not self._has_capability("time"):
return {"error": "RTC (time) command not supported by this firmware/hardware"}
if field is not None and value is not None:
lines = self._protocol.send_text_command(f"time {field} {value}")
return {"field": field, "value": value, "response": lines}
lines = self._protocol.send_text_command("time")
# Response format: "20YY/MM/DD HH:MM:SS\nusage: ..."
for line in lines:
line = line.strip()
if "/" in line and ":" in line and "usage" not in line.lower():
return {"datetime": line}
return {"response": lines}

View File

@ -0,0 +1,323 @@
"""DiagnosticsMixin — low-level hardware diagnostics, register access, and debug tools."""
from __future__ import annotations
import re
class DiagnosticsMixin:
"""Diagnostic tools: i2c, si, lcd, threads, stat, sample, test, gain, dump, port, offset, dac, usart_cfg, usart, band."""
def i2c(self, page: int, reg: int, data: int) -> dict:
"""Write to an I2C register on the TLV320AIC3204 audio codec.
Low-level diagnostic tool for direct codec register access.
Args:
page: I2C register page number
reg: Register address within the page
data: Byte value to write (0-255)
"""
self._ensure_connected()
if not self._has_capability("i2c"):
return {"error": "i2c command not supported by this firmware"}
lines = self._protocol.send_text_command(f"i2c {page} {reg} {data}")
return {"page": page, "reg": reg, "data": data, "response": lines}
def si(self, reg: int, value: int) -> dict:
"""Write to a Si5351 frequency synthesizer register.
Low-level diagnostic tool for direct Si5351 register access.
Args:
reg: Si5351 register address
value: Byte value to write (0-255)
"""
self._ensure_connected()
if not self._has_capability("si"):
return {"error": "si (Si5351 register) command not supported by this firmware"}
lines = self._protocol.send_text_command(f"si {reg} {value}")
return {"reg": reg, "value": value, "response": lines}
def lcd(self, register: int, data_bytes: list[int] | None = None) -> dict:
"""Send a register command to the LCD display controller.
Low-level diagnostic tool. First arg is the register/command byte,
remaining are data bytes.
Args:
register: LCD register/command byte
data_bytes: Additional data bytes (list of ints)
"""
self._ensure_connected()
if not self._has_capability("lcd"):
return {"error": "lcd command not supported by this firmware"}
parts = [str(register)]
if data_bytes:
parts.extend(str(d) for d in data_bytes)
lines = self._protocol.send_text_command(f"lcd {' '.join(parts)}")
# Response: "ret = 0x..."
for line in lines:
if "ret" in line.lower():
return {"register": register, "data_bytes": data_bytes or [], "result": line.strip()}
return {"register": register, "data_bytes": data_bytes or [], "response": lines}
def threads(self) -> dict:
"""List ChibiOS RTOS thread information.
Shows all running threads with their stack usage, priority, and state.
Useful for diagnosing firmware issues.
TODO: When hardware with ENABLE_THREADS_COMMAND is available, explore
representing ChibiOS threads as MCP Tasks (FastMCP tasks=True) so they
surface in Claude Code's /tasks UI with live state tracking.
"""
self._ensure_connected()
if not self._has_capability("threads"):
return {"error": "threads command not supported by this firmware"}
lines = self._protocol.send_text_command("threads")
threads = []
for line in lines:
line = line.strip()
if not line or "stklimit" in line.lower():
continue # Skip header
parts = line.split("|")
if len(parts) >= 8:
threads.append(
{
"stk_limit": parts[0].strip(),
"stack": parts[1].strip(),
"stk_free": parts[2].strip(),
"addr": parts[3].strip(),
"refs": parts[4].strip(),
"prio": parts[5].strip(),
"state": parts[6].strip(),
"name": parts[7].strip(),
}
)
return {"threads": threads, "count": len(threads)}
def stat(self) -> dict:
"""Get audio ADC statistics for both channels.
Returns average and RMS values for the reference and signal channels
on both ports. Useful for diagnosing signal level issues.
"""
self._ensure_connected()
if not self._has_capability("stat"):
return {"error": "stat command not supported by this firmware"}
lines = self._protocol.send_text_command("stat", timeout=10.0)
channels = []
current_ch: dict = {}
for line in lines:
line = line.strip()
if line.startswith("Ch:"):
if current_ch:
channels.append(current_ch)
current_ch = {"channel": line.split(":")[1].strip()}
elif "average:" in line.lower():
nums = re.findall(r"-?\d+", line)
if len(nums) >= 2:
current_ch["average_ref"] = int(nums[0])
current_ch["average_signal"] = int(nums[1])
elif "rms:" in line.lower():
nums = re.findall(r"-?\d+", line)
if len(nums) >= 2:
current_ch["rms_ref"] = int(nums[0])
current_ch["rms_signal"] = int(nums[1])
if current_ch:
channels.append(current_ch)
return {"channels": channels}
def sample(self, mode: str) -> dict:
"""Set the ADC sample capture function.
Controls what data the sample command captures from the audio ADC.
Args:
mode: Sample mode 'gamma' (complex reflection), 'ampl' (amplitude),
or 'ref' (reference amplitude)
"""
self._ensure_connected()
if not self._has_capability("sample"):
return {"error": "sample command not supported by this firmware"}
valid = {"gamma", "ampl", "ref"}
if mode not in valid:
return {"error": f"Invalid mode '{mode}'. Valid: {', '.join(sorted(valid))}"}
lines = self._protocol.send_text_command(f"sample {mode}")
return {"mode": mode, "response": lines}
def test(self) -> dict:
"""Run hardware self-test.
Executes built-in hardware diagnostics. The specific tests depend
on the firmware build configuration.
"""
self._ensure_connected()
if not self._has_capability("test"):
return {"error": "test command not supported by this firmware"}
lines = self._protocol.send_text_command("test", timeout=30.0)
return {"response": lines}
def gain(self, lgain: int, rgain: int | None = None) -> dict:
"""Set audio codec gain levels.
Controls the TLV320AIC3204 PGA (Programmable Gain Amplifier).
Args:
lgain: Left channel gain (0-95, in 0.5 dB steps)
rgain: Right channel gain (0-95). If omitted, uses lgain for both.
"""
self._ensure_connected()
if not self._has_capability("gain"):
return {"error": "gain command not supported by this firmware"}
if rgain is not None:
lines = self._protocol.send_text_command(f"gain {lgain} {rgain}")
return {"lgain": lgain, "rgain": rgain, "response": lines}
lines = self._protocol.send_text_command(f"gain {lgain}")
return {"lgain": lgain, "rgain": lgain, "response": lines}
def dump(self, channel: int = 0) -> dict:
"""Dump raw audio ADC samples.
Captures and returns raw sample data from the audio buffer.
Useful for signal analysis and debugging.
Args:
channel: Audio channel to dump (0 or 1)
"""
self._ensure_connected()
if not self._has_capability("dump"):
return {"error": "dump command not supported by this firmware"}
lines = self._protocol.send_text_command(f"dump {channel}", timeout=10.0)
samples = []
for line in lines:
for val in line.strip().split():
try:
samples.append(int(val))
except ValueError:
pass
return {"channel": channel, "samples": samples, "count": len(samples)}
def port(self, port_num: int) -> dict:
"""Select the active audio port (TX or RX).
Switches the TLV320AIC3204 codec between transmit and receive paths.
Args:
port_num: Port number (0=TX, 1=RX)
"""
self._ensure_connected()
if not self._has_capability("port"):
return {"error": "port command not supported by this firmware"}
lines = self._protocol.send_text_command(f"port {port_num}")
return {"port": port_num, "description": "TX" if port_num == 0 else "RX", "response": lines}
def offset(self, frequency_hz: int | None = None) -> dict:
"""Get or set the variable IF frequency offset.
Adjusts the intermediate frequency offset used in the measurement
pipeline. Only available on firmware builds with USE_VARIABLE_OFFSET.
Args:
frequency_hz: IF offset frequency in Hz
"""
self._ensure_connected()
if not self._has_capability("offset"):
return {"error": "offset command not supported by this firmware"}
if frequency_hz is not None:
lines = self._protocol.send_text_command(f"offset {frequency_hz}")
return {"offset_hz": frequency_hz, "set": True, "response": lines}
lines = self._protocol.send_text_command("offset")
for line in lines:
if "current:" in line.lower():
parts = line.split(":")
if len(parts) >= 2:
try:
return {"offset_hz": int(parts[1].strip())}
except ValueError:
pass
return {"response": lines}
def dac(self, value: int | None = None) -> dict:
"""Get or set the DAC output value.
Controls the on-chip DAC (used for bias voltage or other analog output).
Range is 0-4095 (12-bit).
Args:
value: DAC value (0-4095)
"""
self._ensure_connected()
if not self._has_capability("dac"):
return {"error": "dac command not supported by this firmware/hardware"}
if value is not None:
lines = self._protocol.send_text_command(f"dac {value}")
return {"dac_value": value, "set": True, "response": lines}
lines = self._protocol.send_text_command("dac")
for line in lines:
if "current:" in line.lower():
parts = line.split(":")
if len(parts) >= 2:
try:
return {"dac_value": int(parts[1].strip())}
except ValueError:
pass
return {"response": lines}
def usart_cfg(self, baudrate: int | None = None) -> dict:
"""Get or set the USART serial port configuration.
Controls the secondary serial port (USART) baud rate. The USART can
be used for external device communication or serial console.
Args:
baudrate: Baud rate (minimum 300). Omit to query current setting.
"""
self._ensure_connected()
if not self._has_capability("usart_cfg"):
return {"error": "usart_cfg command not supported by this firmware"}
if baudrate is not None:
lines = self._protocol.send_text_command(f"usart_cfg {baudrate}")
return {"baudrate": baudrate, "set": True, "response": lines}
lines = self._protocol.send_text_command("usart_cfg")
# Response: "Serial: <baud> baud"
for line in lines:
if "baud" in line.lower():
m = re.search(r"(\d+)\s*baud", line, re.IGNORECASE)
if m:
return {"baudrate": int(m.group(1))}
return {"response": lines}
def usart(self, data: str, timeout_ms: int = 200) -> dict:
"""Send data through the USART serial port and read the response.
Forwards data to an external device connected to the USART port
and returns any response received within the timeout period.
Args:
data: String data to send
timeout_ms: Response timeout in milliseconds (default 200)
"""
self._ensure_connected()
if not self._has_capability("usart"):
return {"error": "usart command not supported by this firmware"}
lines = self._protocol.send_text_command(f"usart {data} {timeout_ms}", timeout=5.0)
return {"sent": data, "timeout_ms": timeout_ms, "response": lines}
def band(self, index: int, param: str, value: int) -> dict:
"""Configure frequency band parameters for the Si5351 synthesizer.
Low-level control of per-band synthesizer settings. Parameters include
mode, frequency, divider, multiplier, and power settings.
Args:
index: Band index
param: Parameter name mode, freq, div, mul, omul, pow, opow, l, r, lr, adj
value: Parameter value
"""
self._ensure_connected()
if not self._has_capability("b"):
return {"error": "band (b) command not supported by this firmware"}
lines = self._protocol.send_text_command(f"b {index} {param} {value}")
return {"index": index, "param": param, "value": value, "response": lines}

View File

@ -0,0 +1,161 @@
"""DisplayMixin — trace, transform, smoothing, touch, and remote desktop tools."""
from __future__ import annotations
class DisplayMixin:
"""Display and touch tools: trace, transform, smooth, touchcal, touchtest, refresh, touch, release."""
def trace(
self,
number: int | None = None,
trace_type: str | None = None,
channel: int | None = None,
scale: float | None = None,
refpos: float | None = None,
) -> dict:
"""Query or configure display traces.
Trace types: logmag, phase, delay, smith, polar, linear, swr, real, imag,
r, x, z, zp, g, b, y, rp, xp, and many more.
Args:
number: Trace number (0-3)
trace_type: Display format (e.g. 'logmag', 'swr', 'smith')
channel: Data channel (0=S11, 1=S21)
scale: Y-axis scale value
refpos: Reference position on display
"""
self._ensure_connected()
if number is not None:
if trace_type is not None:
cmd = f"trace {number} {trace_type}"
if channel is not None:
cmd += f" {channel}"
self._protocol.send_text_command(cmd)
elif scale is not None:
self._protocol.send_text_command(f"trace {number} scale {scale}")
elif refpos is not None:
self._protocol.send_text_command(f"trace {number} refpos {refpos}")
lines = self._protocol.send_text_command("trace")
return {"traces": lines}
def transform(self, mode: str | None = None) -> dict:
"""Control time-domain transform mode.
Modes: 'on', 'off', 'impulse', 'step', 'bandpass', 'minimum', 'normal', 'maximum'.
Args:
mode: Transform mode to set
"""
self._ensure_connected()
if not self._has_capability("transform"):
return {"error": "transform command not supported by this firmware"}
if mode is not None:
lines = self._protocol.send_text_command(f"transform {mode}")
return {"transform": mode, "response": lines}
lines = self._protocol.send_text_command("transform")
return {"transform_status": lines}
def smooth(self, factor: int | None = None) -> dict:
"""Get or set trace smoothing factor.
Args:
factor: Smoothing factor (0=off, higher=more smoothing)
"""
self._ensure_connected()
if not self._has_capability("smooth"):
return {"error": "smooth command not supported by this firmware"}
if factor is not None:
self._protocol.send_text_command(f"smooth {factor}")
lines = self._protocol.send_text_command("smooth")
return {"response": lines}
def touchcal(self) -> dict:
"""Start touch screen calibration sequence.
Interactive: the device displays calibration targets. Touch the upper-left
corner, then the lower-right corner when prompted. Returns the calibration
parameters when complete.
Note: This is an interactive hardware procedure that requires physical
touch input on the device screen.
"""
self._ensure_connected()
lines = self._protocol.send_text_command("touchcal", timeout=30.0)
# Parse "touch cal params: a b c d"
for line in lines:
if "touch cal params:" in line.lower():
parts = line.split(":")
if len(parts) >= 2:
vals = parts[1].strip().split()
if len(vals) >= 4:
try:
return {
"calibrated": True,
"params": [int(v) for v in vals[:4]],
"response": lines,
}
except ValueError:
pass
return {"response": lines}
def touchtest(self) -> dict:
"""Start touch screen test mode.
Enters a mode where touch points are drawn on screen for verification.
Useful for checking touch calibration accuracy. Exit by sending another
command or resetting.
"""
self._ensure_connected()
lines = self._protocol.send_text_command("touchtest", timeout=10.0)
return {"mode": "touchtest", "response": lines}
def refresh(self, enable: str | None = None) -> dict:
"""Enable or disable remote desktop refresh mode.
When enabled, the device streams display updates over the serial
connection for remote viewing.
Args:
enable: 'on' to enable remote desktop, 'off' to disable
"""
self._ensure_connected()
if enable is not None:
if enable not in ("on", "off"):
return {"error": "enable must be 'on' or 'off'"}
lines = self._protocol.send_text_command(f"refresh {enable}")
return {"remote_desktop": enable, "response": lines}
return {"error": "usage: refresh on|off"}
def touch(self, x: int, y: int) -> dict:
"""Send a touch press event at screen coordinates.
Simulates a finger press on the NanoVNA touchscreen for remote control.
Follow with 'release' to complete the touch gesture.
Args:
x: X coordinate (0 to lcd_width-1)
y: Y coordinate (0 to lcd_height-1)
"""
self._ensure_connected()
lines = self._protocol.send_text_command(f"touch {x} {y}")
return {"action": "press", "x": x, "y": y, "response": lines}
def release(self, x: int = -1, y: int = -1) -> dict:
"""Send a touch release event at screen coordinates.
Completes a touch gesture started with 'touch'. If coordinates
are omitted (-1), releases at the last touch position.
Args:
x: X coordinate (-1 for last position)
y: Y coordinate (-1 for last position)
"""
self._ensure_connected()
if x >= 0 and y >= 0:
lines = self._protocol.send_text_command(f"release {x} {y}")
else:
lines = self._protocol.send_text_command("release")
return {"action": "release", "x": x, "y": y, "response": lines}

View File

@ -0,0 +1,288 @@
"""MeasurementMixin — essential sweep, scan, calibration, and data retrieval tools."""
from __future__ import annotations
import asyncio
from fastmcp import Context
from mcnanovna.protocol import (
SCAN_MASK_BINARY,
SCAN_MASK_NO_CALIBRATION,
SCAN_MASK_OUT_DATA0,
SCAN_MASK_OUT_DATA1,
SCAN_MASK_OUT_FREQ,
parse_float_pairs,
parse_frequencies,
parse_scan_binary,
parse_scan_text,
)
# Channel name mapping for the data command
CHANNEL_NAMES = {
0: "S11 (measured)",
1: "S21 (measured)",
2: "ETERM_ED (directivity)",
3: "ETERM_ES (source match)",
4: "ETERM_ER (reflection tracking)",
5: "ETERM_ET (transmission tracking)",
6: "ETERM_EX (isolation)",
}
class MeasurementMixin:
"""Tier 1 tools: info, sweep, scan, data, frequencies, marker, cal, save, recall, pause, resume."""
def info(self) -> dict:
"""Get NanoVNA device information: board, firmware version, capabilities, display size, and hardware parameters."""
self._ensure_connected()
di = self._protocol.device_info
return {
"board": di.board,
"version": di.version,
"max_points": di.max_points,
"if_hz": di.if_hz,
"adc_hz": di.adc_hz,
"lcd_width": di.lcd_width,
"lcd_height": di.lcd_height,
"architecture": di.architecture,
"platform": di.platform,
"build_time": di.build_time,
"capabilities": di.capabilities,
"port": self._port,
}
def sweep(
self,
start_hz: int | None = None,
stop_hz: int | None = None,
points: int | None = None,
) -> dict:
"""Get or set the sweep frequency range. With no args, returns current settings. With args, sets new sweep parameters.
Args:
start_hz: Start frequency in Hz (e.g. 50000 for 50 kHz)
stop_hz: Stop frequency in Hz (e.g. 900000000 for 900 MHz)
points: Number of sweep points (max depends on hardware: 101 or 401)
"""
self._ensure_connected()
if start_hz is not None:
parts = [str(start_hz)]
if stop_hz is not None:
parts.append(str(stop_hz))
if points is not None:
parts.append(str(points))
self._protocol.send_text_command(f"sweep {' '.join(parts)}")
lines = self._protocol.send_text_command("sweep")
# Response: "start_hz stop_hz points"
if lines:
parts = lines[0].strip().split()
if len(parts) >= 3:
return {
"start_hz": int(parts[0]),
"stop_hz": int(parts[1]),
"points": int(parts[2]),
}
return {"start_hz": 0, "stop_hz": 0, "points": 0}
async def scan(
self,
start_hz: int,
stop_hz: int,
points: int = 101,
s11: bool = True,
s21: bool = True,
apply_cal: bool = True,
ctx: Context | None = None,
) -> dict:
"""Perform a frequency sweep and return S-parameter measurement data.
This is the primary measurement tool. It configures the sweep range,
triggers acquisition, and returns calibrated S11/S21 complex data.
Args:
start_hz: Start frequency in Hz (min ~600, max 2000000000)
stop_hz: Stop frequency in Hz
points: Number of measurement points (1 to device max, typically 101 or 401)
s11: Include S11 reflection data
s21: Include S21 transmission data
apply_cal: Apply stored calibration correction (set False for raw data)
"""
from mcnanovna.tools import _progress
await _progress(ctx, 1, 4, "Connecting to NanoVNA...")
await asyncio.to_thread(self._ensure_connected)
mask = SCAN_MASK_OUT_FREQ
if s11:
mask |= SCAN_MASK_OUT_DATA0
if s21:
mask |= SCAN_MASK_OUT_DATA1
if not apply_cal:
mask |= SCAN_MASK_NO_CALIBRATION
use_binary = self._has_capability("scan_bin")
await _progress(ctx, 2, 4, "Sending scan command...")
if use_binary:
binary_mask = mask | SCAN_MASK_BINARY
await _progress(ctx, 3, 4, f"Waiting for sweep data ({points} points)...")
rx_mask, rx_points, raw = await asyncio.to_thread(
self._protocol.send_binary_scan, start_hz, stop_hz, points, binary_mask
)
scan_points = parse_scan_binary(rx_mask, rx_points, raw)
else:
await _progress(ctx, 3, 4, f"Waiting for sweep data ({points} points)...")
lines = await asyncio.to_thread(
self._protocol.send_text_command,
f"scan {start_hz} {stop_hz} {points} {mask}",
30.0,
)
scan_points = parse_scan_text(lines, mask)
await _progress(ctx, 4, 4, "Parsing measurement data...")
data = []
for pt in scan_points:
entry: dict = {}
if pt.frequency_hz is not None:
entry["frequency_hz"] = pt.frequency_hz
if pt.s11 is not None:
entry["s11"] = {"real": pt.s11[0], "imag": pt.s11[1]}
if pt.s21 is not None:
entry["s21"] = {"real": pt.s21[0], "imag": pt.s21[1]}
data.append(entry)
return {
"start_hz": start_hz,
"stop_hz": stop_hz,
"points": len(data),
"binary": use_binary,
"mask": mask,
"data": data,
}
def data(self, channel: int = 0) -> dict:
"""Read measurement or calibration data arrays from device memory.
Channels: 0=S11 measured, 1=S21 measured, 2=directivity, 3=source match,
4=reflection tracking, 5=transmission tracking, 6=isolation.
Args:
channel: Data array index (0-6)
"""
self._ensure_connected()
if channel < 0 or channel > 6:
return {"error": "Channel must be 0-6"}
lines = self._protocol.send_text_command(f"data {channel}")
pairs = parse_float_pairs(lines)
return {
"channel": channel,
"channel_name": CHANNEL_NAMES.get(channel, f"channel {channel}"),
"points": len(pairs),
"data": [{"real": r, "imag": i} for r, i in pairs],
}
def frequencies(self) -> dict:
"""Get the list of frequency points for the current sweep configuration."""
self._ensure_connected()
lines = self._protocol.send_text_command("frequencies")
freqs = parse_frequencies(lines)
return {"count": len(freqs), "frequencies_hz": freqs}
def marker(
self,
number: int | None = None,
action: str | None = None,
index: int | None = None,
) -> dict:
"""Query or control markers on the NanoVNA display.
With no args, lists all active markers. With number + action, controls a specific marker.
Args:
number: Marker number (1-8)
action: Action to perform: 'on', 'off', or omit to query
index: Set marker to this sweep point index
"""
self._ensure_connected()
if number is not None:
if action is not None:
self._protocol.send_text_command(f"marker {number} {action}")
elif index is not None:
self._protocol.send_text_command(f"marker {number} {index}")
lines = self._protocol.send_text_command("marker")
markers = []
for line in lines:
parts = line.strip().split()
if len(parts) >= 3:
try:
markers.append(
{
"id": int(parts[0]),
"index": int(parts[1]),
"frequency_hz": int(parts[2]),
}
)
except ValueError:
pass
return {"markers": markers}
async def cal(self, step: str | None = None, ctx: Context | None = None) -> dict:
"""Query calibration status or perform a calibration step.
Steps: 'load', 'open', 'short', 'thru', 'isoln', 'done', 'on', 'off', 'reset'.
With no args, returns current calibration status.
Args:
step: Calibration step to execute
"""
from mcnanovna.tools import _progress
await asyncio.to_thread(self._ensure_connected)
if step is not None:
valid = {"load", "open", "short", "thru", "isoln", "done", "on", "off", "reset"}
if step not in valid:
return {"error": f"Invalid step '{step}'. Valid: {', '.join(sorted(valid))}"}
await _progress(ctx, 1, 2, f"Sending calibration command: {step}...")
lines = await asyncio.to_thread(self._protocol.send_text_command, f"cal {step}", 10.0)
await _progress(ctx, 2, 2, f"Calibration step '{step}' complete")
return {"step": step, "response": lines}
lines = await asyncio.to_thread(self._protocol.send_text_command, "cal")
return {"status": lines}
def save(self, slot: int) -> dict:
"""Save current calibration and configuration to a flash memory slot.
Args:
slot: Save slot number (0-4 on NanoVNA-H, 0-6 on NanoVNA-H4)
"""
self._ensure_connected()
self._protocol.send_text_command(f"save {slot}")
return {"slot": slot, "saved": True}
def recall(self, slot: int) -> dict:
"""Recall calibration and configuration from a flash memory slot.
Args:
slot: Save slot number (0-4 on NanoVNA-H, 0-6 on NanoVNA-H4)
"""
self._ensure_connected()
self._protocol.send_text_command(f"recall {slot}", timeout=5.0)
return {"slot": slot, "recalled": True}
def pause(self) -> dict:
"""Pause the continuous sweep. Measurements freeze at current values."""
self._ensure_connected()
self._protocol.send_text_command("pause")
return {"sweep": "paused"}
def resume(self) -> dict:
"""Resume continuous sweep after pause."""
self._ensure_connected()
self._protocol.send_text_command("resume")
return {"sweep": "running"}