Ryan Malloy 5a853c15fc Fix event system init and add SSP auto-accept for E2E testing
Two fixes for the E2E test failures:

1. event_reporter_init() was never called in app_main(), so the
   FreeRTOS queue and reporter task were never created. Every BT
   event (pair_request, gatt_read, gatt_write, gatt_subscribe)
   was silently dropped at the NULL-queue guard.

2. SSP Numeric Comparison requires both sides to confirm, but
   bt_pair blocks until completion — creating a deadlock since
   the LLM can't send classic_pair_respond while waiting. Added
   auto_accept flag to set_ssp_mode that auto-confirms numeric
   comparison requests in the GAP callback.
2026-02-02 21:05:28 -07:00

124 lines
4.7 KiB
Python

"""Configuration tools for the ESP32 test harness."""
from __future__ import annotations
from typing import Any
from fastmcp import FastMCP
from ..protocol import CMD_CLASSIC_SET_SSP_MODE, CMD_CONFIGURE, IOCapability, Status
from ..serial_client import NotConnected, get_client
def register_tools(mcp: FastMCP) -> None:
    """Register configuration tools with the MCP server.

    Two tools are defined and registered on *mcp* via ``mcp.tool()``:
    ``esp32_configure`` and ``esp32_set_ssp_mode``. Both validate their
    arguments locally, send a single command through the shared serial
    client, and always return a dict (never raise) so the caller gets an
    ``{"error": ...}`` payload on any failure.

    Args:
        mcp: The FastMCP server instance the tools are registered on.
    """

    async def _send_command(
        command: Any,
        params: dict[str, Any],
        failure_message: str,
    ) -> dict[str, Any]:
        """Send *command* with *params* and translate the outcome to a dict.

        Args:
            command: Protocol command identifier passed to ``send_command``.
            params: Command parameters.
            failure_message: Error text used when the device reports a
                non-OK status without supplying its own ``"error"`` entry.

        Returns:
            The response data on ``Status.OK``; otherwise a dict with a
            single ``"error"`` key.
        """
        try:
            client = get_client()
            resp = await client.send_command(command, params)
            if resp.status == Status.OK:
                return resp.data
            payload = resp.data
            # An empty or missing payload carries no device-side message,
            # so fall back to the generic text instead of failing on .get().
            detail = payload.get("error", failure_message) if payload else failure_message
            return {"error": detail}
        except NotConnected:
            return {"error": "Not connected to ESP32"}
        except Exception as exc:
            # Tool boundary: report the failure instead of raising. Some
            # exceptions (e.g. a bare TimeoutError) stringify to "", which
            # would produce an empty, useless error message, so fall back
            # to the exception class name.
            return {"error": str(exc) or type(exc).__name__}

    @mcp.tool()
    async def esp32_configure(
        name: str | None = None,
        io_cap: str | None = None,
        device_class: int | None = None,
        pin_code: str | None = None,
    ) -> dict[str, Any]:
        """Configure the ESP32 Bluetooth device settings.

        Any parameter left as None is not sent, so the ESP32 keeps its
        current value for that setting. Combine multiple parameters in a
        single call to apply them atomically.

        Args:
            name: Bluetooth friendly name broadcast during discovery.
            io_cap: I/O capability for pairing negotiation. One of:
                "display_only", "display_yesno", "keyboard_only",
                "no_io", "keyboard_display".
            device_class: Bluetooth Class of Device (CoD) value. Controls
                how remote scanners categorise this device (e.g. 0x200408
                for an audio headset).
            pin_code: Legacy PIN code for PIN-based pairing (typically a
                4-digit string like "1234").

        Returns:
            Response data confirming the applied configuration, or an
            error dict on failure.
        """
        params: dict[str, Any] = {}

        if name is not None:
            params["name"] = name

        if io_cap is not None:
            # Validate against known capability values before touching
            # the serial link.
            valid_caps = {cap.value for cap in IOCapability}
            if io_cap not in valid_caps:
                return {"error": f"Invalid io_cap '{io_cap}'. Must be one of: {sorted(valid_caps)}"}
            params["io_cap"] = io_cap

        if device_class is not None:
            params["device_class"] = device_class

        if pin_code is not None:
            params["pin_code"] = pin_code

        # Nothing to change: report it rather than sending an empty command.
        if not params:
            return {"error": "No configuration parameters provided"}

        return await _send_command(CMD_CONFIGURE, params, "Configuration failed")

    @mcp.tool()
    async def esp32_set_ssp_mode(
        mode: str,
        auto_accept: bool = False,
    ) -> dict[str, Any]:
        """Set the Secure Simple Pairing (SSP) mode on the ESP32.

        SSP mode determines which pairing association model the ESP32
        will use when a remote device initiates Classic Bluetooth pairing.

        Args:
            mode: SSP association model. One of:
                "just_works" — no user interaction, lowest security.
                "numeric_comparison" — both sides display a 6-digit code
                    for the user to confirm they match.
                "passkey_entry" — remote device must type a passkey shown
                    on the ESP32 (requires display_only or keyboard_display
                    io_cap).
                "passkey_display" — ESP32 displays a passkey for the
                    remote device to enter.
            auto_accept: When true, the ESP32 automatically confirms
                numeric comparison pairing requests without waiting for
                a classic_pair_respond command. Useful for automated
                E2E testing where both sides need to confirm but the
                MCP tool calls are sequential.

        Returns:
            Response data confirming the new SSP mode, or an error dict
            on failure.
        """
        valid_modes = {"just_works", "numeric_comparison", "passkey_entry", "passkey_display"}
        if mode not in valid_modes:
            return {"error": f"Invalid SSP mode '{mode}'. Must be one of: {sorted(valid_modes)}"}

        params: dict[str, Any] = {"mode": mode}
        # The flag is only sent when enabled; a false value is omitted.
        # NOTE(review): presumably the firmware treats a missing flag as
        # "disabled" — confirm it does not keep a previously enabled state.
        if auto_accept:
            params["auto_accept"] = True

        return await _send_command(CMD_CLASSIC_SET_SSP_MODE, params, "Failed to set SSP mode")