mcbluetooth-esp32/tests/test_protocol.py
Ryan Malloy 6398a5223a ESP32 Bluetooth test harness MCP server
UART-controlled ESP32 peripheral for automated E2E Bluetooth testing.
Dual-mode (Classic BT + BLE) via Bluedroid on original ESP32.

Firmware (ESP-IDF v5.x, 2511 lines C):
- NDJSON protocol over UART1 (115200 baud)
- System commands: ping, reset, get_info, get_status
- Classic BT: GAP, SPP, all 4 SSP pairing modes
- BLE: GATTS, advertising, GATT service/characteristic management
- 6 device personas: headset, speaker, keyboard, sensor, phone, bare
- Event reporter: thread-safe async event queue to host

Python MCP server (FastMCP, 1626 lines):
- Async serial client with command/response correlation
- Event queue with wait_for pattern matching
- Tools: connection, configure, classic, ble, persona, events
- MCP resources: esp32://status, esp32://events, esp32://personas

Tests: 74 unit tests passing, 5 integration test stubs (skip without hardware)
2026-02-02 15:12:28 -07:00

286 lines
9.4 KiB
Python

from __future__ import annotations
import json
import pytest
from mcbluetooth_esp32.protocol import (
BAUD_RATE,
CMD_BLE_ADVERTISE,
CMD_CLASSIC_ENABLE,
CMD_GATT_ADD_SERVICE,
CMD_GET_INFO,
CMD_PING,
CMD_RESET,
EVT_BOOT,
EVT_CONNECT,
EVT_PAIR_REQUEST,
MAX_LINE_LENGTH,
Command,
Event,
IOCapability,
MsgType,
Response,
Status,
Transport,
build_command,
parse_message,
)
class TestEnums:
def test_msg_type_values(self):
assert MsgType.CMD == "cmd"
assert MsgType.RESP == "resp"
assert MsgType.EVENT == "event"
def test_msg_type_from_string(self):
assert MsgType("cmd") is MsgType.CMD
assert MsgType("resp") is MsgType.RESP
assert MsgType("event") is MsgType.EVENT
def test_status_values(self):
assert Status.OK == "ok"
assert Status.ERROR == "error"
def test_io_capability_values(self):
assert IOCapability.DISPLAY_ONLY == "display_only"
assert IOCapability.DISPLAY_YESNO == "display_yesno"
assert IOCapability.KEYBOARD_ONLY == "keyboard_only"
assert IOCapability.NO_IO == "no_io"
assert IOCapability.KEYBOARD_DISPLAY == "keyboard_display"
def test_transport_values(self):
assert Transport.CLASSIC == "classic"
assert Transport.BLE == "ble"
assert Transport.BOTH == "both"
class TestConstants:
def test_baud_rate(self):
assert BAUD_RATE == 115200
def test_max_line_length(self):
assert MAX_LINE_LENGTH == 2048
def test_command_constants(self):
assert CMD_PING == "ping"
assert CMD_RESET == "reset"
assert CMD_GET_INFO == "get_info"
assert CMD_CLASSIC_ENABLE == "classic_enable"
assert CMD_BLE_ADVERTISE == "ble_advertise"
assert CMD_GATT_ADD_SERVICE == "gatt_add_service"
def test_event_constants(self):
assert EVT_BOOT == "boot"
assert EVT_PAIR_REQUEST == "pair_request"
assert EVT_CONNECT == "connect"
class TestCommand:
def test_build_command_simple(self):
cmd = build_command("ping")
assert cmd.type == MsgType.CMD
assert cmd.cmd == "ping"
assert cmd.params == {}
def test_build_command_with_params(self):
cmd = build_command("configure", {"name": "test"})
assert cmd.type == MsgType.CMD
assert cmd.cmd == "configure"
assert cmd.params == {"name": "test"}
def test_build_command_with_explicit_id(self):
cmd = build_command("ping", cmd_id="42")
assert cmd.id == "42"
def test_auto_incrementing_ids(self):
cmd1 = build_command("ping")
cmd2 = build_command("ping")
assert cmd1.id != cmd2.id
assert int(cmd2.id) > int(cmd1.id)
def test_to_json_produces_valid_json(self):
cmd = build_command("ping", cmd_id="1")
raw = cmd.to_json()
parsed = json.loads(raw)
assert parsed["type"] == "cmd"
assert parsed["id"] == "1"
assert parsed["cmd"] == "ping"
def test_to_json_single_line(self):
cmd = build_command("configure", {"name": "test", "io": "no_io"}, cmd_id="5")
raw = cmd.to_json()
assert "\n" not in raw
def test_to_json_omits_empty_params(self):
cmd = build_command("ping", cmd_id="1")
parsed = json.loads(cmd.to_json())
assert "params" not in parsed
def test_to_json_includes_params_when_present(self):
cmd = build_command("configure", {"name": "test"}, cmd_id="1")
parsed = json.loads(cmd.to_json())
assert parsed["params"] == {"name": "test"}
def test_to_json_round_trip(self):
cmd = build_command("gatt_add_service", {"uuid": "180F", "primary": True}, cmd_id="99")
raw = cmd.to_json()
parsed = json.loads(raw)
assert parsed == {
"type": "cmd",
"id": "99",
"cmd": "gatt_add_service",
"params": {"uuid": "180F", "primary": True},
}
class TestResponse:
def test_from_json_ok(self):
line = json.dumps({"type": "resp", "id": "1", "status": "ok", "data": {"version": "1.0"}})
resp = Response.from_json(line)
assert resp.type == MsgType.RESP
assert resp.id == "1"
assert resp.status == Status.OK
assert resp.data == {"version": "1.0"}
def test_from_json_error(self):
line = json.dumps(
{"type": "resp", "id": "2", "status": "error", "data": {"msg": "timeout"}}
)
resp = Response.from_json(line)
assert resp.status == Status.ERROR
assert resp.data["msg"] == "timeout"
def test_from_json_complex_data(self):
data = {"adapters": [{"name": "hci0", "addr": "AA:BB:CC:DD:EE:FF"}], "count": 1}
line = json.dumps({"type": "resp", "id": "3", "status": "ok", "data": data})
resp = Response.from_json(line)
assert resp.data == data
def test_from_json_no_data_field(self):
line = json.dumps({"type": "resp", "id": "4", "status": "ok"})
resp = Response.from_json(line)
assert resp.data == {}
def test_from_json_missing_required_field(self):
line = json.dumps({"type": "resp", "id": "5"})
with pytest.raises(KeyError):
Response.from_json(line)
def test_from_json_invalid_json(self):
with pytest.raises((ValueError, json.JSONDecodeError)):
Response.from_json("not json at all")
class TestEvent:
def test_from_json_boot(self):
line = json.dumps(
{"type": "event", "event": "boot", "data": {"firmware": "1.0.0"}, "ts": 1000}
)
evt = Event.from_json(line)
assert evt.type == MsgType.EVENT
assert evt.event == "boot"
assert evt.data == {"firmware": "1.0.0"}
assert evt.ts == 1000
def test_from_json_pair_request_with_passkey(self):
line = json.dumps(
{
"type": "event",
"event": "pair_request",
"data": {"address": "AA:BB:CC:DD:EE:FF", "passkey": 123456},
"ts": 2000,
}
)
evt = Event.from_json(line)
assert evt.event == "pair_request"
assert evt.data["passkey"] == 123456
assert evt.data["address"] == "AA:BB:CC:DD:EE:FF"
def test_from_json_connect_disconnect(self):
for event_name in ("connect", "disconnect"):
line = json.dumps(
{"type": "event", "event": event_name, "data": {"address": "11:22:33:44:55:66"}}
)
evt = Event.from_json(line)
assert evt.event == event_name
def test_from_json_preserves_timestamp(self):
line = json.dumps({"type": "event", "event": "boot", "ts": 9999999})
evt = Event.from_json(line)
assert evt.ts == 9999999
def test_from_json_no_timestamp(self):
line = json.dumps({"type": "event", "event": "boot"})
evt = Event.from_json(line)
assert evt.ts is None
def test_from_json_no_data(self):
line = json.dumps({"type": "event", "event": "boot"})
evt = Event.from_json(line)
assert evt.data == {}
class TestParseMessage:
def test_parse_command(self):
line = json.dumps({"type": "cmd", "id": "10", "cmd": "ping"})
msg = parse_message(line)
assert isinstance(msg, Command)
assert msg.type == MsgType.CMD
assert msg.cmd == "ping"
assert msg.id == "10"
def test_parse_response(self):
line = json.dumps({"type": "resp", "id": "10", "status": "ok", "data": {"pong": True}})
msg = parse_message(line)
assert isinstance(msg, Response)
assert msg.status == Status.OK
assert msg.data == {"pong": True}
def test_parse_event(self):
line = json.dumps({"type": "event", "event": "boot", "data": {}, "ts": 500})
msg = parse_message(line)
assert isinstance(msg, Event)
assert msg.event == "boot"
assert msg.ts == 500
def test_parse_unknown_type(self):
line = json.dumps({"type": "unknown", "id": "1"})
with pytest.raises(ValueError, match="unknown message type"):
parse_message(line)
def test_parse_empty_string(self):
with pytest.raises(ValueError):
parse_message("")
def test_parse_invalid_json(self):
with pytest.raises(ValueError, match="invalid JSON"):
parse_message("{broken json")
def test_parse_whitespace_stripping(self):
line = ' {"type":"cmd","id":"1","cmd":"ping"} \n'
msg = parse_message(line)
assert isinstance(msg, Command)
assert msg.cmd == "ping"
def test_parse_command_with_params(self):
line = json.dumps({"type": "cmd", "id": "7", "cmd": "configure", "params": {"name": "dev"}})
msg = parse_message(line)
assert isinstance(msg, Command)
assert msg.params == {"name": "dev"}
def test_parse_missing_type(self):
line = json.dumps({"id": "1", "cmd": "ping"})
with pytest.raises(ValueError, match="unknown message type"):
parse_message(line)
def test_parse_command_round_trip(self):
original = build_command("get_info", {"verbose": True}, cmd_id="50")
serialized = original.to_json()
restored = parse_message(serialized)
assert isinstance(restored, Command)
assert restored.cmd == original.cmd
assert restored.id == original.id
assert restored.params == original.params