Extract positioner into standalone mcpositioner MCP server

Remove PositionerMixin, positioner.py HTTP client, firmware/, and
positioner optional dependency. The measure_antenna_range prompt now
describes cross-server orchestration with mcpositioner for 3D pattern
measurement instead of calling measure_pattern_3d directly.

78 tools remain (was 84). Server instructions updated to reference
mcpositioner as companion server for antenna range measurements.
This commit is contained in:
Ryan Malloy 2026-02-02 21:58:09 -07:00
parent 0b3e4bdf64
commit 748bfb8216
12 changed files with 96 additions and 1080 deletions

2
firmware/.gitignore vendored
View File

@ -1,2 +0,0 @@
.pio/
.vscode/

View File

@ -1,49 +0,0 @@
#pragma once
// ── WiFi credentials (set via build flags or edit here) ──────────
#ifndef WIFI_SSID
#define WIFI_SSID "YOUR_SSID"
#endif
#ifndef WIFI_PASS
#define WIFI_PASS "YOUR_PASS"
#endif
// ── Pin assignments ──────────────────────────────────────────────
#define THETA_STEP_PIN 25
#define THETA_DIR_PIN 26
#define THETA_EN_PIN 27
#define PHI_STEP_PIN 32
#define PHI_DIR_PIN 33
#define PHI_EN_PIN 14
// ── TMC2209 UART ─────────────────────────────────────────────────
#define TMC_RX_PIN 16
#define TMC_TX_PIN 17
#define THETA_TMC_ADDR 0
#define PHI_TMC_ADDR 1
#define TMC_R_SENSE 0.11f // sense resistor value (ohms)
#define TMC_RMS_CURRENT 800 // motor current in mA
// ── Motor constants ──────────────────────────────────────────────
#define STEPS_PER_REV 200
#define DEFAULT_MICROSTEPS 16 // 0.1125 deg per microstep
// Steps per degree = STEPS_PER_REV * MICROSTEPS / 360.0
// At 16 microsteps: 200 * 16 / 360 = 8.888...
// ── Motion defaults ──────────────────────────────────────────────
#define DEFAULT_MAX_SPEED 2000.0f // steps/sec (~225 deg/sec)
#define DEFAULT_ACCEL 1000.0f // steps/sec^2 (~112 deg/sec^2)
#define SETTLE_MS 200 // ms after move before measurement
// ── Homing ───────────────────────────────────────────────────────
#define HOME_SPEED 500.0f // steps/sec (slower for stall detect)
#define STALL_THRESHOLD 50 // TMC2209 StallGuard threshold (tune per setup)
#define HOME_BACKOFF_STEPS 100 // steps to back off after stall detection
// ── Safety ───────────────────────────────────────────────────────
#define IDLE_DISABLE_MS 30000 // disable motors after 30s idle
#define WATCHDOG_TIMEOUT_S 60 // WDT reset if no command while moving
// ── mDNS ─────────────────────────────────────────────────────────
#define MDNS_HOSTNAME "positioner"

View File

@ -1,13 +0,0 @@
[env:esp32]
platform = espressif32
board = esp32dev
framework = arduino
lib_deps =
mathieucarbou/ESPAsyncWebServer @ ^3.6.0
teemuatlut/TMCStepper @ ^0.7.3
waspinator/AccelStepper @ ^1.64
bblanchon/ArduinoJson @ ^7.3.0
monitor_speed = 115200
build_flags =
-DCORE_DEBUG_LEVEL=3
-DBOARD_HAS_PSRAM=0

View File

@ -1,507 +0,0 @@
/*
* ESP32 Antenna Positioner dual-axis stepper control with HTTP API.
*
* Hardware: 2x NEMA 17 + TMC2209 (UART) + ESP32 DevKit
* Axes: theta (polar, 0-180 deg) and phi (azimuth, 0-360 deg)
* Discovery: mDNS at positioner.local
*/
#include <Arduino.h>
#include <WiFi.h>
#include <ESPmDNS.h>
#include <ESPAsyncWebServer.h>
#include <ArduinoJson.h>
#include <AccelStepper.h>
#include <TMCStepper.h>
#include <esp_task_wdt.h>
#include "config.h"
// ── Globals ──────────────────────────────────────────────────────
static AsyncWebServer server(80);
// TMC2209 drivers on shared HardwareSerial bus (different addresses)
// ESP32 requires HardwareSerial& — pin assignment happens in Serial2.begin()
static TMC2209Stepper tmc_theta(&Serial2, TMC_R_SENSE, THETA_TMC_ADDR);
static TMC2209Stepper tmc_phi(&Serial2, TMC_R_SENSE, PHI_TMC_ADDR);
// AccelStepper instances (DRIVER mode = step + dir pins)
static AccelStepper stepper_theta(AccelStepper::DRIVER, THETA_STEP_PIN, THETA_DIR_PIN);
static AccelStepper stepper_phi(AccelStepper::DRIVER, PHI_STEP_PIN, PHI_DIR_PIN);
// State
static volatile bool homed_theta = false;
static volatile bool homed_phi = false;
static volatile bool emergency_stop = false;
static volatile bool stall_detected = false;
static uint16_t microsteps = DEFAULT_MICROSTEPS;
static unsigned long last_command_ms = 0;
static unsigned long last_move_ms = 0;
static bool motors_enabled = true;
// ── Helpers ──────────────────────────────────────────────────────
static float steps_per_deg() {
return (float)STEPS_PER_REV * microsteps / 360.0f;
}
static float current_theta_deg() {
return stepper_theta.currentPosition() / steps_per_deg();
}
static float current_phi_deg() {
float deg = fmod(stepper_phi.currentPosition() / steps_per_deg(), 360.0f);
if (deg < 0) deg += 360.0f;
return deg;
}
static bool is_moving() {
return stepper_theta.isRunning() || stepper_phi.isRunning();
}
static void enable_motors() {
digitalWrite(THETA_EN_PIN, LOW); // TMC2209: LOW = enabled
digitalWrite(PHI_EN_PIN, LOW);
motors_enabled = true;
}
static void disable_motors() {
digitalWrite(THETA_EN_PIN, HIGH);
digitalWrite(PHI_EN_PIN, HIGH);
motors_enabled = false;
}
static void stop_motors() {
stepper_theta.stop();
stepper_phi.stop();
// Run deceleration to zero
while (stepper_theta.isRunning() || stepper_phi.isRunning()) {
stepper_theta.run();
stepper_phi.run();
}
}
// ── JSON response helpers ────────────────────────────────────────
static String status_json() {
JsonDocument doc;
doc["theta_deg"] = round(current_theta_deg() * 100.0f) / 100.0f;
doc["phi_deg"] = round(current_phi_deg() * 100.0f) / 100.0f;
doc["moving"] = is_moving();
doc["homed"] = homed_theta && homed_phi;
doc["homed_theta"] = homed_theta;
doc["homed_phi"] = homed_phi;
doc["stall_detected"] = stall_detected;
doc["motors_enabled"] = motors_enabled;
String out;
serializeJson(doc, out);
return out;
}
static String config_json() {
JsonDocument doc;
doc["steps_per_deg_theta"] = steps_per_deg();
doc["steps_per_deg_phi"] = steps_per_deg();
doc["speed"] = stepper_theta.maxSpeed();
doc["accel"] = stepper_theta.acceleration();
doc["microstepping"] = microsteps;
String out;
serializeJson(doc, out);
return out;
}
static String ok_json(const char* extra_key = nullptr, float extra_val = 0) {
JsonDocument doc;
doc["ok"] = true;
doc["theta_deg"] = round(current_theta_deg() * 100.0f) / 100.0f;
doc["phi_deg"] = round(current_phi_deg() * 100.0f) / 100.0f;
if (extra_key) doc[extra_key] = extra_val;
String out;
serializeJson(doc, out);
return out;
}
static String error_json(const char* message) {
JsonDocument doc;
doc["ok"] = false;
doc["error"] = message;
String out;
serializeJson(doc, out);
return out;
}
// ── TMC2209 init ─────────────────────────────────────────────────
static void init_tmc(TMC2209Stepper& tmc, const char* label) {
tmc.begin();
tmc.toff(4); // enable driver
tmc.rms_current(TMC_RMS_CURRENT); // motor current
tmc.microsteps(microsteps);
tmc.en_spreadCycle(false); // StealthChop for quiet operation
tmc.pwm_autoscale(true); // auto-tune PWM
tmc.SGTHRS(STALL_THRESHOLD); // StallGuard threshold
// Verify communication
uint8_t result = tmc.test_connection();
if (result == 0) {
Serial.printf("[TMC] %s: OK\n", label);
} else {
Serial.printf("[TMC] %s: FAILED (result=%d)\n", label, result);
}
}
// ── Homing (StallGuard sensorless) ──────────────────────────────
static bool home_axis(AccelStepper& stepper, TMC2209Stepper& tmc, const char* label) {
Serial.printf("[Home] %s: starting sensorless home...\n", label);
enable_motors();
// Switch to SpreadCycle for reliable stall detection
tmc.en_spreadCycle(true);
tmc.TCOOLTHRS(0xFFFFF); // enable StallGuard at all speeds
tmc.SGTHRS(STALL_THRESHOLD);
delay(100);
// Move in negative direction until stall
float prev_speed = stepper.maxSpeed();
stepper.setMaxSpeed(HOME_SPEED);
stepper.move(-999999); // move far negative
stall_detected = false;
unsigned long start = millis();
while (!stall_detected && (millis() - start < 30000)) {
stepper.run();
// Check StallGuard via DIAG pin or SG_RESULT register
if (tmc.SG_RESULT() < 10) {
stall_detected = true;
break;
}
if (emergency_stop) {
stepper.stop();
stepper.setMaxSpeed(prev_speed);
tmc.en_spreadCycle(false);
return false;
}
}
stepper.stop();
while (stepper.isRunning()) stepper.run();
if (!stall_detected) {
Serial.printf("[Home] %s: timeout — no stall detected\n", label);
stepper.setMaxSpeed(prev_speed);
tmc.en_spreadCycle(false);
return false;
}
// Back off from the stall point
stepper.move(HOME_BACKOFF_STEPS);
while (stepper.isRunning()) stepper.run();
// Set this position as zero
stepper.setCurrentPosition(0);
stepper.setMaxSpeed(prev_speed);
stall_detected = false;
// Return to StealthChop for quiet operation
tmc.en_spreadCycle(false);
Serial.printf("[Home] %s: homed OK\n", label);
return true;
}
// ── HTTP handlers ────────────────────────────────────────────────
static void setup_routes() {
// GET /status
server.on("/status", HTTP_GET, [](AsyncWebServerRequest* request) {
last_command_ms = millis();
request->send(200, "application/json", status_json());
});
// POST /move — absolute position
server.on("/move", HTTP_POST, [](AsyncWebServerRequest* request) {},
NULL,
[](AsyncWebServerRequest* request, uint8_t* data, size_t len, size_t, size_t) {
last_command_ms = millis();
emergency_stop = false;
JsonDocument doc;
if (deserializeJson(doc, data, len)) {
request->send(400, "application/json", error_json("Invalid JSON"));
return;
}
if (!doc["theta_deg"].is<float>() || !doc["phi_deg"].is<float>()) {
request->send(400, "application/json", error_json("Missing theta_deg or phi_deg"));
return;
}
float theta = doc["theta_deg"].as<float>();
float phi = doc["phi_deg"].as<float>();
// Clamp theta to 0-180
if (theta < 0) theta = 0;
if (theta > 180) theta = 180;
// Normalize phi to 0-360
phi = fmod(phi, 360.0f);
if (phi < 0) phi += 360.0f;
enable_motors();
stepper_theta.moveTo((long)(theta * steps_per_deg()));
stepper_phi.moveTo((long)(phi * steps_per_deg()));
last_move_ms = millis();
request->send(200, "application/json", ok_json());
});
// POST /move/relative — relative move
server.on("/move/relative", HTTP_POST, [](AsyncWebServerRequest* request) {},
NULL,
[](AsyncWebServerRequest* request, uint8_t* data, size_t len, size_t, size_t) {
last_command_ms = millis();
emergency_stop = false;
JsonDocument doc;
if (deserializeJson(doc, data, len)) {
request->send(400, "application/json", error_json("Invalid JSON"));
return;
}
float d_theta = doc["d_theta"] | 0.0f;
float d_phi = doc["d_phi"] | 0.0f;
float new_theta = current_theta_deg() + d_theta;
if (new_theta < 0) new_theta = 0;
if (new_theta > 180) new_theta = 180;
float new_phi = current_phi_deg() + d_phi;
new_phi = fmod(new_phi, 360.0f);
if (new_phi < 0) new_phi += 360.0f;
enable_motors();
stepper_theta.moveTo((long)(new_theta * steps_per_deg()));
stepper_phi.moveTo((long)(new_phi * steps_per_deg()));
last_move_ms = millis();
request->send(200, "application/json", ok_json());
});
// POST /home
server.on("/home", HTTP_POST, [](AsyncWebServerRequest* request) {},
NULL,
[](AsyncWebServerRequest* request, uint8_t* data, size_t len, size_t, size_t) {
last_command_ms = millis();
emergency_stop = false;
String axis = "both";
if (len > 0) {
JsonDocument doc;
if (!deserializeJson(doc, data, len)) {
axis = doc["axis"] | "both";
}
}
bool ok = true;
if (axis == "both" || axis == "theta") {
homed_theta = home_axis(stepper_theta, tmc_theta, "theta");
ok = ok && homed_theta;
}
if (axis == "both" || axis == "phi") {
homed_phi = home_axis(stepper_phi, tmc_phi, "phi");
ok = ok && homed_phi;
}
JsonDocument resp;
resp["ok"] = ok;
resp["homed"] = homed_theta && homed_phi;
resp["homed_theta"] = homed_theta;
resp["homed_phi"] = homed_phi;
String out;
serializeJson(resp, out);
request->send(ok ? 200 : 500, "application/json", out);
});
// POST /stop — emergency stop
server.on("/stop", HTTP_POST, [](AsyncWebServerRequest* request) {
last_command_ms = millis();
emergency_stop = true;
stop_motors();
JsonDocument doc;
doc["ok"] = true;
doc["stopped"] = true;
doc["theta_deg"] = round(current_theta_deg() * 100.0f) / 100.0f;
doc["phi_deg"] = round(current_phi_deg() * 100.0f) / 100.0f;
String out;
serializeJson(doc, out);
request->send(200, "application/json", out);
});
// GET /config
server.on("/config", HTTP_GET, [](AsyncWebServerRequest* request) {
last_command_ms = millis();
request->send(200, "application/json", config_json());
});
// POST /config — update speed, accel, microstepping
server.on("/config", HTTP_POST, [](AsyncWebServerRequest* request) {},
NULL,
[](AsyncWebServerRequest* request, uint8_t* data, size_t len, size_t, size_t) {
last_command_ms = millis();
JsonDocument doc;
if (deserializeJson(doc, data, len)) {
request->send(400, "application/json", error_json("Invalid JSON"));
return;
}
if (doc["speed"].is<float>()) {
float speed = doc["speed"].as<float>();
stepper_theta.setMaxSpeed(speed);
stepper_phi.setMaxSpeed(speed);
}
if (doc["accel"].is<float>()) {
float accel = doc["accel"].as<float>();
stepper_theta.setAcceleration(accel);
stepper_phi.setAcceleration(accel);
}
if (doc["microstepping"].is<unsigned>()) {
uint16_t ms = doc["microstepping"].as<uint16_t>();
if (ms == 1 || ms == 2 || ms == 4 || ms == 8 || ms == 16 ||
ms == 32 || ms == 64 || ms == 128 || ms == 256) {
// Recalculate positions for new microstepping
float theta_deg_now = current_theta_deg();
float phi_deg_now = current_phi_deg();
microsteps = ms;
tmc_theta.microsteps(microsteps);
tmc_phi.microsteps(microsteps);
// Restore positions in new step scale
stepper_theta.setCurrentPosition((long)(theta_deg_now * steps_per_deg()));
stepper_phi.setCurrentPosition((long)(phi_deg_now * steps_per_deg()));
}
}
JsonDocument resp;
resp["ok"] = true;
JsonObject cfg = resp["config"].to<JsonObject>();
cfg["speed"] = stepper_theta.maxSpeed();
cfg["accel"] = stepper_theta.acceleration();
cfg["microstepping"] = microsteps;
cfg["steps_per_deg"] = steps_per_deg();
String out;
serializeJson(resp, out);
request->send(200, "application/json", out);
});
// CORS preflight
server.on("/*", HTTP_OPTIONS, [](AsyncWebServerRequest* request) {
AsyncWebServerResponse* response = request->beginResponse(204);
response->addHeader("Access-Control-Allow-Origin", "*");
response->addHeader("Access-Control-Allow-Methods", "GET, POST, OPTIONS");
response->addHeader("Access-Control-Allow-Headers", "Content-Type");
request->send(response);
});
// Add CORS headers to all responses
DefaultHeaders::Instance().addHeader("Access-Control-Allow-Origin", "*");
DefaultHeaders::Instance().addHeader("Access-Control-Allow-Methods", "GET, POST, OPTIONS");
DefaultHeaders::Instance().addHeader("Access-Control-Allow-Headers", "Content-Type");
}
// ── Setup ────────────────────────────────────────────────────────
void setup() {
Serial.begin(115200);
Serial.println("\n[Positioner] Starting...");
// Motor enable pins
pinMode(THETA_EN_PIN, OUTPUT);
pinMode(PHI_EN_PIN, OUTPUT);
disable_motors(); // start disabled
// TMC2209 UART
Serial2.begin(115200, SERIAL_8N1, TMC_RX_PIN, TMC_TX_PIN);
delay(100);
init_tmc(tmc_theta, "theta");
init_tmc(tmc_phi, "phi");
// AccelStepper config
stepper_theta.setMaxSpeed(DEFAULT_MAX_SPEED);
stepper_theta.setAcceleration(DEFAULT_ACCEL);
stepper_phi.setMaxSpeed(DEFAULT_MAX_SPEED);
stepper_phi.setAcceleration(DEFAULT_ACCEL);
// WiFi
WiFi.mode(WIFI_STA);
WiFi.begin(WIFI_SSID, WIFI_PASS);
Serial.printf("[WiFi] Connecting to %s", WIFI_SSID);
unsigned long wifi_start = millis();
while (WiFi.status() != WL_CONNECTED && (millis() - wifi_start < 15000)) {
delay(500);
Serial.print(".");
}
if (WiFi.status() == WL_CONNECTED) {
Serial.printf("\n[WiFi] Connected: %s\n", WiFi.localIP().toString().c_str());
} else {
Serial.println("\n[WiFi] FAILED — running without network");
}
// mDNS
if (MDNS.begin(MDNS_HOSTNAME)) {
MDNS.addService("http", "tcp", 80);
Serial.printf("[mDNS] %s.local\n", MDNS_HOSTNAME);
}
// HTTP server
setup_routes();
server.begin();
Serial.println("[HTTP] Server started on port 80");
// Watchdog (resets if main loop hangs)
const esp_task_wdt_config_t wdt_cfg = {
.timeout_ms = WATCHDOG_TIMEOUT_S * 1000,
.idle_core_mask = 0,
.trigger_panic = true,
};
esp_task_wdt_init(&wdt_cfg);
esp_task_wdt_add(NULL);
last_command_ms = millis();
last_move_ms = millis();
Serial.println("[Positioner] Ready");
}
// ── Loop ─────────────────────────────────────────────────────────
void loop() {
// Feed watchdog
esp_task_wdt_reset();
// Run stepper motion
if (!emergency_stop) {
stepper_theta.run();
stepper_phi.run();
}
// Track when last move finished
if (is_moving()) {
last_move_ms = millis();
}
// Idle motor disable — prevent overheating
if (motors_enabled && !is_moving() &&
(millis() - last_move_ms > IDLE_DISABLE_MS)) {
disable_motors();
Serial.println("[Motors] Disabled (idle timeout)");
}
}

View File

@ -11,7 +11,6 @@ dependencies = [
]
[project.optional-dependencies]
positioner = ["httpx>=0.28.0"]
webui = [
"fastapi>=0.115.0",
"uvicorn[standard]>=0.34.0",

View File

@ -17,7 +17,6 @@ from mcnanovna.tools import (
DisplayMixin,
MeasurementMixin,
PatternImportMixin,
PositionerMixin,
RadiationMixin,
)
@ -31,7 +30,6 @@ class NanoVNA(
AnalysisMixin,
RadiationMixin,
PatternImportMixin,
PositionerMixin,
):
"""MCP tool class for NanoVNA-H vector network analyzers.
@ -39,7 +37,7 @@ class NanoVNA(
call that needs hardware triggers USB discovery and initialization.
Connection is validated with a sync probe after idle periods.
Tool methods are organized into 6 mixins in the tools/ subpackage.
Tool methods are organized into mixins in the tools/ subpackage.
See each mixin class for its method inventory.
"""

View File

@ -1,108 +0,0 @@
"""HTTP client for ESP32 antenna positioner.
Communicates with the ESP32 over WiFi to control dual-axis stepper
motors for automated antenna pattern measurement. Uses mDNS discovery
(positioner.local) with fallback to MCNANOVNA_POSITIONER_HOST env var.
"""
from __future__ import annotations
import os
class PositionerError(Exception):
"""Raised when positioner communication fails."""
class Positioner:
"""Async HTTP client for the ESP32 antenna positioner."""
def __init__(
self,
host: str | None = None,
port: int = 80,
timeout: float = 30.0,
) -> None:
if host is None:
host = os.environ.get("MCNANOVNA_POSITIONER_HOST", "positioner.local")
self.base_url = f"http://{host}:{port}"
self._timeout = timeout
async def _request(self, method: str, path: str, json: dict | None = None) -> dict:
"""Send an HTTP request and return the parsed JSON response."""
try:
import httpx
except ImportError:
raise PositionerError(
"httpx is required for positioner control. Install with: pip install mcnanovna[positioner]"
)
url = f"{self.base_url}{path}"
try:
async with httpx.AsyncClient(timeout=self._timeout) as client:
if method == "GET":
resp = await client.get(url)
else:
resp = await client.post(url, json=json or {})
resp.raise_for_status()
return resp.json()
except httpx.ConnectError as exc:
raise PositionerError(
f"Cannot connect to positioner at {self.base_url}. "
f"Check that the ESP32 is powered on and connected to WiFi. "
f"Set MCNANOVNA_POSITIONER_HOST if not using mDNS. Error: {exc}"
) from exc
except httpx.TimeoutException as exc:
raise PositionerError(f"Positioner request timed out ({self._timeout}s): {path}") from exc
except httpx.HTTPStatusError as exc:
raise PositionerError(f"Positioner returned {exc.response.status_code}: {exc.response.text}") from exc
except Exception as exc:
raise PositionerError(f"Positioner request failed: {exc}") from exc
async def status(self) -> dict:
"""Get current position, moving state, and homed state."""
return await self._request("GET", "/status")
async def move(self, theta_deg: float, phi_deg: float) -> dict:
"""Move to absolute position (theta=polar, phi=azimuth)."""
return await self._request("POST", "/move", {"theta_deg": theta_deg, "phi_deg": phi_deg})
async def move_relative(self, d_theta: float = 0.0, d_phi: float = 0.0) -> dict:
"""Move by a relative offset from current position."""
return await self._request("POST", "/move/relative", {"d_theta": d_theta, "d_phi": d_phi})
async def home(self, axis: str = "both") -> dict:
"""Home one or both axes using StallGuard sensorless detection.
Args:
axis: 'both', 'theta', or 'phi'
"""
return await self._request("POST", "/home", {"axis": axis})
async def stop(self) -> dict:
"""Emergency stop all motors immediately."""
return await self._request("POST", "/stop")
async def get_config(self) -> dict:
"""Get current motion parameters (speed, accel, microstepping)."""
return await self._request("GET", "/config")
async def set_config(self, **kwargs: float | int) -> dict:
"""Update motion parameters.
Args:
speed: Max speed in steps/sec
accel: Acceleration in steps/sec^2
microstepping: Microstep divisor (1, 2, 4, 8, 16, 32, 64, 128, 256)
"""
return await self._request("POST", "/config", kwargs)
async def wait_until_stopped(self, poll_interval: float = 0.1) -> dict:
"""Poll /status until motors stop moving. Returns final status."""
import asyncio
while True:
st = await self.status()
if not st.get("moving", False):
return st
await asyncio.sleep(poll_interval)

View File

@ -887,12 +887,13 @@ Let me compute the matching solutions now using `analyze_lc_match`.""",
points: int = 51,
theta_step: float = 5.0,
phi_step: float = 10.0,
settle_ms: int = 200,
) -> list[Message]:
"""Guide through automated 3D antenna pattern measurement with positioner.
"""Guide through automated 3D antenna pattern measurement using positioner + VNA.
Uses the ESP32 antenna positioner to physically rotate the antenna under
test through a theta/phi grid while measuring S21 at each position.
Produces a real measured 3D radiation pattern.
This is a cross-server workflow: mcpositioner (separate MCP server) controls
the ESP32 antenna positioner, while this server (mcnanovna) provides VNA
measurements. The LLM orchestrates both.
Args:
antenna_type: Antenna label for metadata (e.g., 'dipole', 'yagi', 'measured')
@ -902,10 +903,11 @@ Let me compute the matching solutions now using `analyze_lc_match`.""",
points: Number of frequency points per S21 measurement
theta_step: Polar angle step in degrees (smaller = higher resolution, longer scan)
phi_step: Azimuth step in degrees
settle_ms: Milliseconds to wait after each positioner move before measuring
"""
if start_hz is not None and stop_hz is not None:
f_start, f_stop = start_hz, stop_hz
band_label = f"Custom ({_format_freq(f_start)} {_format_freq(f_stop)})"
band_label = f"Custom ({_format_freq(f_start)} \u2013 {_format_freq(f_stop)})"
elif band in HAM_BANDS:
f_start, f_stop = HAM_BANDS[band]
band_label = f"{band.upper()} band"
@ -913,61 +915,112 @@ Let me compute the matching solutions now using `analyze_lc_match`.""",
f_start, f_stop = HAM_BANDS["2m"]
band_label = "2M band"
n_theta = int((180 - 0) / theta_step) + 1
n_phi = int((360 - phi_step) / phi_step) + 1
n_theta = int(180 / theta_step) + 1
n_phi = int(360 / phi_step)
total = n_theta * n_phi
est_minutes = total * 1.5 / 60 # ~1.5 sec per point
est_minutes = total * (settle_ms / 1000.0 + 1.0) / 60
return [
Message(
role="user",
content=(
f"Measure the 3D radiation pattern of my {antenna_type} antenna "
f"on the {band_label} using the positioner."
f"on the {band_label} using the positioner and VNA."
),
),
Message(
role="assistant",
content=f"""I'll run an automated 3D radiation pattern measurement using the ESP32 positioner.
content=f"""I'll guide you through an automated 3D radiation pattern measurement.
**Setup**: {band_label} ({_format_freq(f_start)} {_format_freq(f_stop)})
**Grid**: {theta_step}° theta x {phi_step}° phi = {n_theta} x {n_phi} = **{total} measurement points**
**Estimated time**: ~{est_minutes:.0f} minutes
This workflow uses **two MCP servers working together**:
- **mcnanovna** \u2014 controls the NanoVNA for S21 measurements (this server)
- **mcpositioner** \u2014 controls the ESP32 antenna positioner (separate server)
**Hardware required:**
- ESP32 positioner (positioner.local) with dual NEMA 17 steppers
- NanoVNA-H connected via USB
- Transmit antenna on NanoVNA Port 1 (stationary)
- Antenna under test on positioner, connected to NanoVNA Port 2
You'll need the mcpositioner MCP server running. Add it with:
`claude mcp add mcpositioner -- uvx mcpositioner`
**How it works:**
1. The positioner drives the antenna under test through a theta/phi grid
2. At each grid point, the NanoVNA measures S21 (transmission from tx to rx)
3. S21 magnitude at each angle = the receive antenna's pattern
4. The serpentine scan path minimizes motor travel between points
**Grid setup:**
- Band: {band_label} ({_format_freq(f_start)} \u2013 {_format_freq(f_stop)})
- Grid: {theta_step}\u00b0 theta x {phi_step}\u00b0 phi = {n_theta} x {n_phi} = **{total} points**
- VNA scan: {points} frequency points per position
- Settle time: {settle_ms} ms after each move
- Estimated time: ~{est_minutes:.0f} minutes
**Calibration options:**
- **Relative pattern** (default): Normalized so peak = 0 dBi. Shows pattern shape.
- **Absolute gain**: Measure a bore-sight reference first (known gain antenna), pass
`reference_s21_db` to offset all measurements to absolute dBi.
**Hardware setup:**
1. ESP32 positioner powered and on WiFi (positioner.local)
2. NanoVNA-H connected via USB
3. Transmit antenna on NanoVNA Port 1 (stationary, aimed at positioner)
4. Antenna under test mounted on positioner, connected to NanoVNA Port 2
**Before starting:**
1. Ensure the positioner is powered and connected to WiFi
2. Verify with `positioner_status` that it's reachable
3. Run `positioner_home` to establish the reference position
4. Ensure calibration is valid for the measurement frequency range
---
**Step 1: Pre-flight checks**
Call these tools to verify both systems are ready:
- `positioner_status` (mcpositioner) \u2014 verify ESP32 is reachable
- `info` (mcnanovna) \u2014 verify VNA is connected
**Step 2: Home the positioner**
Call `positioner_home` (mcpositioner) with axis='both'.
Wait for it to complete, then verify with `positioner_status`.
**Step 3: Calibrate VNA**
If not already calibrated, run a SOLT calibration covering {_format_freq(f_start)} to {_format_freq(f_stop)}.
**Step 4: Measure the grid**
For each point in the theta/phi grid, execute this sequence:
```
for theta in [0, {theta_step}, {2 * theta_step}, ..., 180]:
for phi in [0, {phi_step}, {2 * phi_step}, ..., {360 - phi_step}]:
1. positioner_move(theta, phi, wait=True) [mcpositioner]
2. wait {settle_ms}ms for mechanical settling
3. scan({f_start}, {f_stop}, {points}, s21=True) [mcnanovna]
4. extract peak S21 magnitude in dB from scan result
5. record: {{theta_deg: theta, phi_deg: phi, s21_peak_db: peak_db}}
```
**Serpentine optimization:** On odd-numbered theta rows, reverse the phi direction
to minimize motor travel.
**Extracting peak S21 from scan result:**
The scan returns data points with s21 complex values (real + imag).
magnitude_db = 20 * log10(sqrt(real^2 + imag^2)).
Take the maximum dB value across all frequency points.
**Step 5: Build the pattern dict**
Assemble measurements into the standard format:
```
pattern = {{
"antenna_type": "{antenna_type}",
"frequency_hz": {(f_start + f_stop) / 2},
"theta_deg": [list of theta values],
"phi_deg": [list of phi values],
"gain_dbi": [list of normalized gain values],
"peak_gain_dbi": max(gain_values),
"num_points": {total},
}}
```
**Relative vs. absolute calibration:**
- **Relative** (default): gain_dbi[i] = s21_peak_db[i] - max(all s21_peak_db)
- **Absolute**: Measure known-gain reference antenna at bore-sight first.
**Resolution tradeoffs:**
| Step size | Grid points | Est. time | Use case |
|-----------|------------|-----------|----------|
| 10° x 20° | 19 x 18 = 342 | ~9 min | Quick survey |
| 5° x 10° | 37 x 36 = 1332 | ~33 min | Standard measurement |
| 2° x 5° | 91 x 72 = 6552 | ~164 min | High-resolution |
| 10\u00b0 x 20\u00b0 | 19 x 18 = 342 | ~9 min | Quick survey |
| 5\u00b0 x 10\u00b0 | 37 x 36 = 1332 | ~33 min | Standard |
| 2\u00b0 x 5\u00b0 | 91 x 72 = 6552 | ~164 min | High-res |
**Web UI**: If running (MCNANOVNA_WEB_PORT), the pattern fills in progressively
in the 3D viewer as measurements are taken.
**Web UI**: If mcnanovna's web UI is running (MCNANOVNA_WEB_PORT), the final
pattern dict can be sent to /api/pattern/compute for 3D rendering.
Ready to start? I'll begin with homing the positioner, then run the full sweep.""",
Ready to start? I'll begin with the pre-flight checks on both servers.""",
),
]

View File

@ -100,13 +100,6 @@ _TOOL_METHODS = [
"import_pattern_nec2",
"import_pattern_s1p",
"list_pattern_formats",
# tools/positioner.py — PositionerMixin
"positioner_status",
"positioner_move",
"positioner_home",
"positioner_stop",
"positioner_config",
"measure_pattern_3d",
]
@ -137,11 +130,9 @@ def create_server() -> FastMCP:
"'import_pattern_nec2', 'import_pattern_s1p' — import measured/simulated patterns "
"from external files (CSV, EMCAR vna.dat, NEC2 output, Touchstone S1P). "
"Use 'list_pattern_formats' for format details and examples.\n\n"
"Antenna positioner tools (requires ESP32 positioner hardware): "
"'positioner_status', 'positioner_move', 'positioner_home', 'positioner_stop', "
"'positioner_config' — control dual-axis NEMA 17 stepper positioner over WiFi. "
"'measure_pattern_3d' — automated full-sphere radiation pattern measurement by "
"driving the positioner through a theta/phi grid while capturing S21 at each point.\n\n"
"For automated antenna range measurement with an ESP32 positioner, use "
"mcpositioner (separate MCP server) to control the positioner hardware, then "
"coordinate both servers — see the measure_antenna_range prompt for guidance.\n\n"
"Prompts are available for guided workflows: calibrate, export_touchstone, "
"analyze_antenna, measure_cable, compare_sweeps, analyze_crystal, "
"analyze_filter_response, measure_tdr, measure_component, measure_lc_series, "

View File

@ -16,7 +16,6 @@ from .diagnostics import DiagnosticsMixin
from .display import DisplayMixin
from .measurement import MeasurementMixin
from .pattern_import import PatternImportMixin
from .positioner import PositionerMixin
from .radiation import RadiationMixin
@ -34,6 +33,5 @@ __all__ = [
"DisplayMixin",
"MeasurementMixin",
"PatternImportMixin",
"PositionerMixin",
"RadiationMixin",
]

View File

@ -1,340 +0,0 @@
"""PositionerMixin — ESP32 antenna positioner control + automated 3D pattern measurement."""
from __future__ import annotations
import asyncio
import math
from fastmcp import Context
class PositionerMixin:
"""Positioner tools: positioner_status, positioner_move, positioner_home,
positioner_stop, positioner_config, measure_pattern_3d."""
def _get_positioner(self):
"""Lazy-init the positioner client."""
if not hasattr(self, "_positioner"):
from mcnanovna.positioner import Positioner
self._positioner = Positioner()
return self._positioner
async def positioner_status(self) -> dict:
"""Get current positioner state: position, moving, homed.
Returns theta/phi angles in degrees, whether motors are moving,
and whether the axes have been homed.
"""
pos = self._get_positioner()
return await pos.status()
async def positioner_move(
self,
theta_deg: float,
phi_deg: float,
wait: bool = True,
) -> dict:
"""Move the antenna positioner to an absolute theta/phi position.
Theta is the polar angle (0=zenith, 90=horizon, 180=nadir).
Phi is the azimuth angle (0=North, 90=East, etc.).
Args:
theta_deg: Polar angle in degrees (0 to 180)
phi_deg: Azimuth angle in degrees (0 to 360)
wait: If True, block until the move completes (default True)
"""
pos = self._get_positioner()
result = await pos.move(theta_deg, phi_deg)
if wait:
result = await pos.wait_until_stopped()
return result
async def positioner_home(
self,
axis: str = "both",
) -> dict:
"""Home one or both positioner axes using sensorless StallGuard detection.
The motors move slowly in the negative direction until a mechanical stall
is detected, then back off and set the position to zero.
Args:
axis: Which axis to home 'both', 'theta', or 'phi'
"""
pos = self._get_positioner()
return await pos.home(axis)
async def positioner_stop(self) -> dict:
"""Emergency stop all positioner motors immediately.
Motors decelerate to a stop. Use this if the positioner is heading
toward an obstacle or behaving unexpectedly.
"""
pos = self._get_positioner()
return await pos.stop()
async def positioner_config(
self,
speed: float | None = None,
accel: float | None = None,
microstepping: int | None = None,
) -> dict:
"""Get or set positioner motion parameters.
With no arguments, returns current config. With arguments, updates
the specified parameters.
Args:
speed: Max speed in steps/sec (default ~2000 = 225 deg/sec)
accel: Acceleration in steps/sec^2 (default ~1000 = 112 deg/sec^2)
microstepping: Microstep divisor (1, 2, 4, 8, 16, 32, 64, 128, 256)
"""
pos = self._get_positioner()
if speed is None and accel is None and microstepping is None:
return await pos.get_config()
kwargs = {}
if speed is not None:
kwargs["speed"] = speed
if accel is not None:
kwargs["accel"] = accel
if microstepping is not None:
kwargs["microstepping"] = microstepping
return await pos.set_config(**kwargs)
async def measure_pattern_3d(
self,
start_hz: int = 144_000_000,
stop_hz: int = 148_000_000,
points: int = 51,
theta_start: float = 0.0,
theta_stop: float = 180.0,
theta_step: float = 5.0,
phi_start: float = 0.0,
phi_stop: float = 355.0,
phi_step: float = 10.0,
settle_ms: int = 200,
reference_s21_db: float | None = None,
antenna_type: str = "measured",
broadcast_interval: int = 0,
ctx: Context | None = None,
) -> dict:
"""Automated 3D antenna radiation pattern measurement.
Drives the ESP32 positioner through a theta/phi grid while measuring
S21 on the NanoVNA at each position. The transmit antenna (port 1) is
stationary; the antenna under test is on the positioner connected to
port 2 as the receive antenna.
Returns a standard pattern dict {theta_deg, phi_deg, gain_dbi} compatible
with the 3D web viewer and all existing pattern tools.
Args:
start_hz: Start frequency for S21 scan (Hz)
stop_hz: Stop frequency for S21 scan (Hz)
points: Number of frequency points per S21 measurement
theta_start: Starting polar angle in degrees
theta_stop: Ending polar angle in degrees
theta_step: Polar angle increment in degrees
phi_start: Starting azimuth angle in degrees
phi_stop: Ending azimuth angle in degrees
phi_step: Azimuth angle increment in degrees
settle_ms: Milliseconds to wait after each move before measuring
reference_s21_db: Bore-sight S21 reference for absolute gain (dB). If None, pattern is relative.
antenna_type: Label for the pattern metadata (default 'measured')
broadcast_interval: Broadcast partial pattern to WebSocket every N points (0=only at end)
"""
from mcnanovna.tools import _progress
pos = self._get_positioner()
# Build the theta/phi grid
theta_positions = []
t = theta_start
while t <= theta_stop + 1e-6:
theta_positions.append(round(t, 4))
t += theta_step
phi_positions = []
p = phi_start
while p <= phi_stop + 1e-6:
phi_positions.append(round(p, 4))
p += phi_step
total_points = len(theta_positions) * len(phi_positions)
await _progress(
ctx,
0,
total_points,
f"Measurement grid: {len(theta_positions)} theta x {len(phi_positions)} phi = {total_points} points",
)
# Ensure positioner is homed
status = await pos.status()
if not status.get("homed", False):
await _progress(ctx, 0, total_points, "Homing positioner...")
home_result = await pos.home()
if not home_result.get("ok", False):
return {"error": "Failed to home positioner", "details": home_result}
# Ensure VNA is connected
await asyncio.to_thread(self._ensure_connected)
# Collect measurements
measurements: list[dict] = []
current_point = 0
for theta_idx, theta in enumerate(theta_positions):
# Serpentine path: alternate phi direction on odd theta rows
phi_sequence = phi_positions if (theta_idx % 2 == 0) else list(reversed(phi_positions))
for phi in phi_sequence:
current_point += 1
# Move positioner
await pos.move(theta, phi)
await pos.wait_until_stopped()
# Settle time for mechanical vibration
if settle_ms > 0:
await asyncio.sleep(settle_ms / 1000.0)
# Scan S21 on the NanoVNA
scan_result = await self.scan(start_hz, stop_hz, points, s11=False, s21=True)
if "error" in scan_result:
await _progress(
ctx,
current_point,
total_points,
f"Scan error at theta={theta} phi={phi}: {scan_result['error']}",
)
continue
# Extract peak S21 magnitude as gain proxy
s21_db = _extract_peak_s21_db(scan_result)
# Apply reference offset for absolute gain
gain_db = s21_db
if reference_s21_db is not None:
gain_db = s21_db - reference_s21_db
measurements.append(
{
"theta_deg": theta,
"phi_deg": phi if (theta_idx % 2 == 0) else phi,
"gain_db": gain_db,
"s21_peak_db": s21_db,
}
)
await _progress(
ctx,
current_point,
total_points,
f"Point {current_point}/{total_points}: theta={theta:.1f} phi={phi:.1f} S21={s21_db:.1f} dB",
)
# Periodic WebSocket broadcast of partial pattern
if broadcast_interval > 0 and current_point % broadcast_interval == 0:
partial = _build_pattern_dict(
measurements,
antenna_type,
start_hz,
stop_hz,
reference_s21_db,
partial=True,
)
await _try_broadcast(partial, current_point, total_points)
# Build final pattern dict
pattern = _build_pattern_dict(
measurements,
antenna_type,
start_hz,
stop_hz,
reference_s21_db,
)
# Final WebSocket broadcast
await _try_broadcast(pattern, total_points, total_points)
await _progress(ctx, total_points, total_points, f"Measurement complete: {len(measurements)} points")
return pattern
def _extract_peak_s21_db(scan_result: dict) -> float:
"""Extract peak S21 magnitude in dB from a scan result."""
peak_db = -999.0
for pt in scan_result.get("data", []):
s21 = pt.get("s21")
if s21 is None:
continue
real = s21.get("real", 0.0)
imag = s21.get("imag", 0.0)
mag = math.sqrt(real * real + imag * imag)
if mag > 0:
db = 20 * math.log10(mag)
if db > peak_db:
peak_db = db
return peak_db
def _build_pattern_dict(
measurements: list[dict],
antenna_type: str,
start_hz: int,
stop_hz: int,
reference_s21_db: float | None,
partial: bool = False,
) -> dict:
"""Build a standard {theta_deg, phi_deg, gain_dbi} pattern dict from measurements."""
if not measurements:
return {"error": "No measurement data collected"}
theta_deg = [m["theta_deg"] for m in measurements]
phi_deg = [m["phi_deg"] for m in measurements]
gain_values = [m["gain_db"] for m in measurements]
# Normalize: peak gain = 0 dBi if no reference, else offset is already applied
peak_gain = max(gain_values) if gain_values else 0
if reference_s21_db is None:
# Relative pattern — normalize peak to approximate dBi
gain_dbi = [g - peak_gain for g in gain_values]
peak_gain_dbi = 0.0
else:
gain_dbi = gain_values
peak_gain_dbi = peak_gain
return {
"antenna_type": antenna_type,
"frequency_hz": (start_hz + stop_hz) / 2,
"theta_deg": theta_deg,
"phi_deg": phi_deg,
"gain_dbi": gain_dbi,
"peak_gain_dbi": peak_gain_dbi,
"num_points": len(measurements),
"partial": partial,
"measurement_info": {
"source": "measured",
"method": "positioner_s21",
"start_hz": start_hz,
"stop_hz": stop_hz,
"reference_s21_db": reference_s21_db,
"calibration": "relative" if reference_s21_db is None else "absolute",
},
}
async def _try_broadcast(pattern: dict, current: int, total: int) -> None:
"""Broadcast pattern to WebSocket clients if the web UI is running."""
try:
from mcnanovna.webui.api import _broadcast_pattern, _ws_clients
if _ws_clients:
await _broadcast_pattern(pattern)
except ImportError:
pass # Web UI not installed
except Exception:
pass # Non-critical — don't fail measurement over broadcast errors

6
uv.lock generated
View File

@ -772,9 +772,6 @@ dependencies = [
]
[package.optional-dependencies]
positioner = [
{ name = "httpx" },
]
webui = [
{ name = "fastapi" },
{ name = "uvicorn", extra = ["standard"] },
@ -784,12 +781,11 @@ webui = [
requires-dist = [
{ name = "fastapi", marker = "extra == 'webui'", specifier = ">=0.115.0" },
{ name = "fastmcp", specifier = ">=2.14.0" },
{ name = "httpx", marker = "extra == 'positioner'", specifier = ">=0.28.0" },
{ name = "pillow", specifier = ">=11.0.0" },
{ name = "pyserial", specifier = ">=3.5" },
{ name = "uvicorn", extras = ["standard"], marker = "extra == 'webui'", specifier = ">=0.34.0" },
]
provides-extras = ["positioner", "webui"]
provides-extras = ["webui"]
[[package]]
name = "mcp"