UART-controlled ESP32 peripheral for automated E2E Bluetooth testing. Dual-mode (Classic BT + BLE) via Bluedroid on original ESP32. Firmware (ESP-IDF v5.x, 2511 lines C): - NDJSON protocol over UART1 (115200 baud) - System commands: ping, reset, get_info, get_status - Classic BT: GAP, SPP, all 4 SSP pairing modes - BLE: GATTS, advertising, GATT service/characteristic management - 6 device personas: headset, speaker, keyboard, sensor, phone, bare - Event reporter: thread-safe async event queue to host Python MCP server (FastMCP, 1626 lines): - Async serial client with command/response correlation - Event queue with wait_for pattern matching - Tools: connection, configure, classic, ble, persona, events - MCP resources: esp32://status, esp32://events, esp32://personas Tests: 74 unit tests passing, 5 integration test stubs (skip without hardware)
286 lines
9.4 KiB
Python
286 lines
9.4 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
|
|
import pytest
|
|
|
|
from mcbluetooth_esp32.protocol import (
|
|
BAUD_RATE,
|
|
CMD_BLE_ADVERTISE,
|
|
CMD_CLASSIC_ENABLE,
|
|
CMD_GATT_ADD_SERVICE,
|
|
CMD_GET_INFO,
|
|
CMD_PING,
|
|
CMD_RESET,
|
|
EVT_BOOT,
|
|
EVT_CONNECT,
|
|
EVT_PAIR_REQUEST,
|
|
MAX_LINE_LENGTH,
|
|
Command,
|
|
Event,
|
|
IOCapability,
|
|
MsgType,
|
|
Response,
|
|
Status,
|
|
Transport,
|
|
build_command,
|
|
parse_message,
|
|
)
|
|
|
|
|
|
class TestEnums:
|
|
def test_msg_type_values(self):
|
|
assert MsgType.CMD == "cmd"
|
|
assert MsgType.RESP == "resp"
|
|
assert MsgType.EVENT == "event"
|
|
|
|
def test_msg_type_from_string(self):
|
|
assert MsgType("cmd") is MsgType.CMD
|
|
assert MsgType("resp") is MsgType.RESP
|
|
assert MsgType("event") is MsgType.EVENT
|
|
|
|
def test_status_values(self):
|
|
assert Status.OK == "ok"
|
|
assert Status.ERROR == "error"
|
|
|
|
def test_io_capability_values(self):
|
|
assert IOCapability.DISPLAY_ONLY == "display_only"
|
|
assert IOCapability.DISPLAY_YESNO == "display_yesno"
|
|
assert IOCapability.KEYBOARD_ONLY == "keyboard_only"
|
|
assert IOCapability.NO_IO == "no_io"
|
|
assert IOCapability.KEYBOARD_DISPLAY == "keyboard_display"
|
|
|
|
def test_transport_values(self):
|
|
assert Transport.CLASSIC == "classic"
|
|
assert Transport.BLE == "ble"
|
|
assert Transport.BOTH == "both"
|
|
|
|
|
|
class TestConstants:
|
|
def test_baud_rate(self):
|
|
assert BAUD_RATE == 115200
|
|
|
|
def test_max_line_length(self):
|
|
assert MAX_LINE_LENGTH == 2048
|
|
|
|
def test_command_constants(self):
|
|
assert CMD_PING == "ping"
|
|
assert CMD_RESET == "reset"
|
|
assert CMD_GET_INFO == "get_info"
|
|
assert CMD_CLASSIC_ENABLE == "classic_enable"
|
|
assert CMD_BLE_ADVERTISE == "ble_advertise"
|
|
assert CMD_GATT_ADD_SERVICE == "gatt_add_service"
|
|
|
|
def test_event_constants(self):
|
|
assert EVT_BOOT == "boot"
|
|
assert EVT_PAIR_REQUEST == "pair_request"
|
|
assert EVT_CONNECT == "connect"
|
|
|
|
|
|
class TestCommand:
|
|
def test_build_command_simple(self):
|
|
cmd = build_command("ping")
|
|
assert cmd.type == MsgType.CMD
|
|
assert cmd.cmd == "ping"
|
|
assert cmd.params == {}
|
|
|
|
def test_build_command_with_params(self):
|
|
cmd = build_command("configure", {"name": "test"})
|
|
assert cmd.type == MsgType.CMD
|
|
assert cmd.cmd == "configure"
|
|
assert cmd.params == {"name": "test"}
|
|
|
|
def test_build_command_with_explicit_id(self):
|
|
cmd = build_command("ping", cmd_id="42")
|
|
assert cmd.id == "42"
|
|
|
|
def test_auto_incrementing_ids(self):
|
|
cmd1 = build_command("ping")
|
|
cmd2 = build_command("ping")
|
|
assert cmd1.id != cmd2.id
|
|
assert int(cmd2.id) > int(cmd1.id)
|
|
|
|
def test_to_json_produces_valid_json(self):
|
|
cmd = build_command("ping", cmd_id="1")
|
|
raw = cmd.to_json()
|
|
parsed = json.loads(raw)
|
|
assert parsed["type"] == "cmd"
|
|
assert parsed["id"] == "1"
|
|
assert parsed["cmd"] == "ping"
|
|
|
|
def test_to_json_single_line(self):
|
|
cmd = build_command("configure", {"name": "test", "io": "no_io"}, cmd_id="5")
|
|
raw = cmd.to_json()
|
|
assert "\n" not in raw
|
|
|
|
def test_to_json_omits_empty_params(self):
|
|
cmd = build_command("ping", cmd_id="1")
|
|
parsed = json.loads(cmd.to_json())
|
|
assert "params" not in parsed
|
|
|
|
def test_to_json_includes_params_when_present(self):
|
|
cmd = build_command("configure", {"name": "test"}, cmd_id="1")
|
|
parsed = json.loads(cmd.to_json())
|
|
assert parsed["params"] == {"name": "test"}
|
|
|
|
def test_to_json_round_trip(self):
|
|
cmd = build_command("gatt_add_service", {"uuid": "180F", "primary": True}, cmd_id="99")
|
|
raw = cmd.to_json()
|
|
parsed = json.loads(raw)
|
|
assert parsed == {
|
|
"type": "cmd",
|
|
"id": "99",
|
|
"cmd": "gatt_add_service",
|
|
"params": {"uuid": "180F", "primary": True},
|
|
}
|
|
|
|
|
|
class TestResponse:
|
|
def test_from_json_ok(self):
|
|
line = json.dumps({"type": "resp", "id": "1", "status": "ok", "data": {"version": "1.0"}})
|
|
resp = Response.from_json(line)
|
|
assert resp.type == MsgType.RESP
|
|
assert resp.id == "1"
|
|
assert resp.status == Status.OK
|
|
assert resp.data == {"version": "1.0"}
|
|
|
|
def test_from_json_error(self):
|
|
line = json.dumps(
|
|
{"type": "resp", "id": "2", "status": "error", "data": {"msg": "timeout"}}
|
|
)
|
|
resp = Response.from_json(line)
|
|
assert resp.status == Status.ERROR
|
|
assert resp.data["msg"] == "timeout"
|
|
|
|
def test_from_json_complex_data(self):
|
|
data = {"adapters": [{"name": "hci0", "addr": "AA:BB:CC:DD:EE:FF"}], "count": 1}
|
|
line = json.dumps({"type": "resp", "id": "3", "status": "ok", "data": data})
|
|
resp = Response.from_json(line)
|
|
assert resp.data == data
|
|
|
|
def test_from_json_no_data_field(self):
|
|
line = json.dumps({"type": "resp", "id": "4", "status": "ok"})
|
|
resp = Response.from_json(line)
|
|
assert resp.data == {}
|
|
|
|
def test_from_json_missing_required_field(self):
|
|
line = json.dumps({"type": "resp", "id": "5"})
|
|
with pytest.raises(KeyError):
|
|
Response.from_json(line)
|
|
|
|
def test_from_json_invalid_json(self):
|
|
with pytest.raises((ValueError, json.JSONDecodeError)):
|
|
Response.from_json("not json at all")
|
|
|
|
|
|
class TestEvent:
|
|
def test_from_json_boot(self):
|
|
line = json.dumps(
|
|
{"type": "event", "event": "boot", "data": {"firmware": "1.0.0"}, "ts": 1000}
|
|
)
|
|
evt = Event.from_json(line)
|
|
assert evt.type == MsgType.EVENT
|
|
assert evt.event == "boot"
|
|
assert evt.data == {"firmware": "1.0.0"}
|
|
assert evt.ts == 1000
|
|
|
|
def test_from_json_pair_request_with_passkey(self):
|
|
line = json.dumps(
|
|
{
|
|
"type": "event",
|
|
"event": "pair_request",
|
|
"data": {"address": "AA:BB:CC:DD:EE:FF", "passkey": 123456},
|
|
"ts": 2000,
|
|
}
|
|
)
|
|
evt = Event.from_json(line)
|
|
assert evt.event == "pair_request"
|
|
assert evt.data["passkey"] == 123456
|
|
assert evt.data["address"] == "AA:BB:CC:DD:EE:FF"
|
|
|
|
def test_from_json_connect_disconnect(self):
|
|
for event_name in ("connect", "disconnect"):
|
|
line = json.dumps(
|
|
{"type": "event", "event": event_name, "data": {"address": "11:22:33:44:55:66"}}
|
|
)
|
|
evt = Event.from_json(line)
|
|
assert evt.event == event_name
|
|
|
|
def test_from_json_preserves_timestamp(self):
|
|
line = json.dumps({"type": "event", "event": "boot", "ts": 9999999})
|
|
evt = Event.from_json(line)
|
|
assert evt.ts == 9999999
|
|
|
|
def test_from_json_no_timestamp(self):
|
|
line = json.dumps({"type": "event", "event": "boot"})
|
|
evt = Event.from_json(line)
|
|
assert evt.ts is None
|
|
|
|
def test_from_json_no_data(self):
|
|
line = json.dumps({"type": "event", "event": "boot"})
|
|
evt = Event.from_json(line)
|
|
assert evt.data == {}
|
|
|
|
|
|
class TestParseMessage:
|
|
def test_parse_command(self):
|
|
line = json.dumps({"type": "cmd", "id": "10", "cmd": "ping"})
|
|
msg = parse_message(line)
|
|
assert isinstance(msg, Command)
|
|
assert msg.type == MsgType.CMD
|
|
assert msg.cmd == "ping"
|
|
assert msg.id == "10"
|
|
|
|
def test_parse_response(self):
|
|
line = json.dumps({"type": "resp", "id": "10", "status": "ok", "data": {"pong": True}})
|
|
msg = parse_message(line)
|
|
assert isinstance(msg, Response)
|
|
assert msg.status == Status.OK
|
|
assert msg.data == {"pong": True}
|
|
|
|
def test_parse_event(self):
|
|
line = json.dumps({"type": "event", "event": "boot", "data": {}, "ts": 500})
|
|
msg = parse_message(line)
|
|
assert isinstance(msg, Event)
|
|
assert msg.event == "boot"
|
|
assert msg.ts == 500
|
|
|
|
def test_parse_unknown_type(self):
|
|
line = json.dumps({"type": "unknown", "id": "1"})
|
|
with pytest.raises(ValueError, match="unknown message type"):
|
|
parse_message(line)
|
|
|
|
def test_parse_empty_string(self):
|
|
with pytest.raises(ValueError):
|
|
parse_message("")
|
|
|
|
def test_parse_invalid_json(self):
|
|
with pytest.raises(ValueError, match="invalid JSON"):
|
|
parse_message("{broken json")
|
|
|
|
def test_parse_whitespace_stripping(self):
|
|
line = ' {"type":"cmd","id":"1","cmd":"ping"} \n'
|
|
msg = parse_message(line)
|
|
assert isinstance(msg, Command)
|
|
assert msg.cmd == "ping"
|
|
|
|
def test_parse_command_with_params(self):
|
|
line = json.dumps({"type": "cmd", "id": "7", "cmd": "configure", "params": {"name": "dev"}})
|
|
msg = parse_message(line)
|
|
assert isinstance(msg, Command)
|
|
assert msg.params == {"name": "dev"}
|
|
|
|
def test_parse_missing_type(self):
|
|
line = json.dumps({"id": "1", "cmd": "ping"})
|
|
with pytest.raises(ValueError, match="unknown message type"):
|
|
parse_message(line)
|
|
|
|
def test_parse_command_round_trip(self):
|
|
original = build_command("get_info", {"verbose": True}, cmd_id="50")
|
|
serialized = original.to_json()
|
|
restored = parse_message(serialized)
|
|
assert isinstance(restored, Command)
|
|
assert restored.cmd == original.cmd
|
|
assert restored.id == original.id
|
|
assert restored.params == original.params
|