- Make get_client() sync (was async but did no async work). Callers that omitted await silently got a coroutine object instead of the SerialClient, causing "'coroutine' object has no attribute 'connect'" errors on every tool call. - Fix esp32_connect: use get_client_or_none() for init check and client.event_queue.wait_for() for boot event (wait_event() didn't exist on SerialClient). - Normalise Response.data to dict at parse time — firmware returns bare strings on some error paths, which broke .get() calls in tool error handlers. - Remove stale await from ble.py (9 calls) and classic.py (4 calls). Tested with dual-MCP headless claude session: 26/27 PASS.
258 lines
8.0 KiB
Python
258 lines
8.0 KiB
Python
"""BLE advertising and GATT server tools for the ESP32 test harness."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
|
|
from fastmcp import FastMCP
|
|
|
|
from ..protocol import (
|
|
CMD_BLE_ADVERTISE,
|
|
CMD_BLE_DISABLE,
|
|
CMD_BLE_ENABLE,
|
|
CMD_BLE_SET_ADV_DATA,
|
|
CMD_GATT_ADD_CHARACTERISTIC,
|
|
CMD_GATT_ADD_SERVICE,
|
|
CMD_GATT_CLEAR,
|
|
CMD_GATT_NOTIFY,
|
|
CMD_GATT_SET_VALUE,
|
|
Status, # noqa: F401
|
|
)
|
|
from ..serial_client import get_client
|
|
|
|
|
|
def register_tools(mcp: FastMCP) -> None:
|
|
"""Register BLE and GATT tools with the MCP server."""
|
|
|
|
@mcp.tool()
|
|
async def esp32_ble_enable() -> dict[str, Any]:
|
|
"""Enable BLE on the ESP32.
|
|
|
|
Initializes the BLE stack so the device can advertise
|
|
and host GATT services.
|
|
|
|
Returns:
|
|
Status dict from the ESP32.
|
|
"""
|
|
try:
|
|
client = get_client()
|
|
response = await client.send_command(CMD_BLE_ENABLE)
|
|
return response.data
|
|
except Exception as exc:
|
|
return {"error": str(exc)}
|
|
|
|
@mcp.tool()
|
|
async def esp32_ble_disable() -> dict[str, Any]:
|
|
"""Disable BLE on the ESP32.
|
|
|
|
Stops advertising, tears down GATT services, and shuts
|
|
down the BLE stack.
|
|
|
|
Returns:
|
|
Status dict from the ESP32.
|
|
"""
|
|
try:
|
|
client = get_client()
|
|
response = await client.send_command(CMD_BLE_DISABLE)
|
|
return response.data
|
|
except Exception as exc:
|
|
return {"error": str(exc)}
|
|
|
|
@mcp.tool()
|
|
async def esp32_ble_advertise(
|
|
enable: bool,
|
|
interval_ms: int = 100,
|
|
) -> dict[str, Any]:
|
|
"""Start or stop BLE advertising.
|
|
|
|
When enabled, the ESP32 broadcasts advertisement packets so
|
|
nearby scanners can discover it.
|
|
|
|
Args:
|
|
enable: True to start advertising, False to stop.
|
|
interval_ms: Advertising interval in milliseconds.
|
|
|
|
Returns:
|
|
Status dict from the ESP32.
|
|
"""
|
|
try:
|
|
client = get_client()
|
|
params: dict[str, Any] = {
|
|
"enable": enable,
|
|
"interval_ms": interval_ms,
|
|
}
|
|
response = await client.send_command(CMD_BLE_ADVERTISE, params)
|
|
return response.data
|
|
except Exception as exc:
|
|
return {"error": str(exc)}
|
|
|
|
@mcp.tool()
|
|
async def esp32_ble_set_adv_data(
|
|
name: str | None = None,
|
|
service_uuids: list[str] | None = None,
|
|
manufacturer_data: str | None = None,
|
|
) -> dict[str, Any]:
|
|
"""Set BLE advertising data.
|
|
|
|
Configures what the ESP32 includes in its advertisement
|
|
packets. Call this before starting advertising.
|
|
|
|
Args:
|
|
name: Device name to advertise.
|
|
service_uuids: List of service UUIDs to include in the
|
|
advertisement payload.
|
|
manufacturer_data: Hex string of manufacturer-specific data
|
|
(e.g., "ff0002" for company ID 0x02FF).
|
|
|
|
Returns:
|
|
Status dict from the ESP32.
|
|
"""
|
|
try:
|
|
client = get_client()
|
|
params: dict[str, Any] = {}
|
|
if name is not None:
|
|
params["name"] = name
|
|
if service_uuids is not None:
|
|
params["service_uuids"] = service_uuids
|
|
if manufacturer_data is not None:
|
|
params["manufacturer_data"] = manufacturer_data
|
|
response = await client.send_command(CMD_BLE_SET_ADV_DATA, params)
|
|
return response.data
|
|
except Exception as exc:
|
|
return {"error": str(exc)}
|
|
|
|
@mcp.tool()
|
|
async def esp32_gatt_add_service(
|
|
uuid: str,
|
|
primary: bool = True,
|
|
) -> dict[str, Any]:
|
|
"""Add a GATT service to the ESP32.
|
|
|
|
Creates a new GATT service that can hold characteristics.
|
|
The ESP32 assigns a handle to the service which is needed
|
|
when adding characteristics.
|
|
|
|
Args:
|
|
uuid: Service UUID (e.g., "180F" for Battery Service or
|
|
full 128-bit form).
|
|
primary: Whether this is a primary service.
|
|
|
|
Returns:
|
|
Dict containing the assigned service handle.
|
|
"""
|
|
try:
|
|
client = get_client()
|
|
params: dict[str, Any] = {
|
|
"uuid": uuid,
|
|
"primary": primary,
|
|
}
|
|
response = await client.send_command(CMD_GATT_ADD_SERVICE, params)
|
|
return response.data
|
|
except Exception as exc:
|
|
return {"error": str(exc)}
|
|
|
|
@mcp.tool()
|
|
async def esp32_gatt_add_characteristic(
|
|
service_handle: int,
|
|
uuid: str,
|
|
properties: list[str],
|
|
value: str | None = None,
|
|
) -> dict[str, Any]:
|
|
"""Add a characteristic to a GATT service.
|
|
|
|
The characteristic is attached to the service identified by
|
|
*service_handle* (returned by esp32_gatt_add_service).
|
|
|
|
Args:
|
|
service_handle: Handle of the parent service.
|
|
uuid: Characteristic UUID.
|
|
properties: List of property strings, e.g.
|
|
["read", "write", "notify", "indicate"].
|
|
value: Optional initial value as a hex string
|
|
(e.g., "e803" for 1000 little-endian).
|
|
|
|
Returns:
|
|
Dict containing the assigned characteristic handle.
|
|
"""
|
|
try:
|
|
client = get_client()
|
|
params: dict[str, Any] = {
|
|
"service_handle": service_handle,
|
|
"uuid": uuid,
|
|
"properties": properties,
|
|
}
|
|
if value is not None:
|
|
params["value"] = value
|
|
response = await client.send_command(CMD_GATT_ADD_CHARACTERISTIC, params)
|
|
return response.data
|
|
except Exception as exc:
|
|
return {"error": str(exc)}
|
|
|
|
@mcp.tool()
|
|
async def esp32_gatt_set_value(
|
|
char_handle: int,
|
|
value: str,
|
|
) -> dict[str, Any]:
|
|
"""Set the value of a GATT characteristic.
|
|
|
|
Updates the stored value that BLE clients will read. Also
|
|
prepares the value for any subsequent notification.
|
|
|
|
Args:
|
|
char_handle: Handle of the characteristic to update.
|
|
value: New value as a hex string (e.g., "e803" for
|
|
temperature 25.0C encoded as little-endian uint16).
|
|
|
|
Returns:
|
|
Status dict from the ESP32.
|
|
"""
|
|
try:
|
|
client = get_client()
|
|
params: dict[str, Any] = {
|
|
"char_handle": char_handle,
|
|
"value": value,
|
|
}
|
|
response = await client.send_command(CMD_GATT_SET_VALUE, params)
|
|
return response.data
|
|
except Exception as exc:
|
|
return {"error": str(exc)}
|
|
|
|
@mcp.tool()
|
|
async def esp32_gatt_notify(char_handle: int) -> dict[str, Any]:
|
|
"""Send a notification for a characteristic.
|
|
|
|
Pushes the current characteristic value to all BLE clients
|
|
that have subscribed to notifications on this characteristic.
|
|
|
|
Args:
|
|
char_handle: Handle of the characteristic to notify on.
|
|
|
|
Returns:
|
|
Status dict from the ESP32.
|
|
"""
|
|
try:
|
|
client = get_client()
|
|
params: dict[str, Any] = {"char_handle": char_handle}
|
|
response = await client.send_command(CMD_GATT_NOTIFY, params)
|
|
return response.data
|
|
except Exception as exc:
|
|
return {"error": str(exc)}
|
|
|
|
@mcp.tool()
|
|
async def esp32_gatt_clear() -> dict[str, Any]:
|
|
"""Clear all GATT services and characteristics.
|
|
|
|
Removes every service and characteristic from the GATT
|
|
server, resetting it to a blank state. Useful before
|
|
loading a new device persona or test configuration.
|
|
|
|
Returns:
|
|
Status dict from the ESP32.
|
|
"""
|
|
try:
|
|
client = get_client()
|
|
response = await client.send_command(CMD_GATT_CLEAR)
|
|
return response.data
|
|
except Exception as exc:
|
|
return {"error": str(exc)}
|