mcbluetooth-esp32/tests/test_serial_client.py
Ryan Malloy 6398a5223a ESP32 Bluetooth test harness MCP server
UART-controlled ESP32 peripheral for automated E2E Bluetooth testing.
Dual-mode (Classic BT + BLE) via Bluedroid on original ESP32.

Firmware (ESP-IDF v5.x, 2511 lines C):
- NDJSON protocol over UART1 (115200 baud)
- System commands: ping, reset, get_info, get_status
- Classic BT: GAP, SPP, all 4 SSP pairing modes
- BLE: GATTS, advertising, GATT service/characteristic management
- 6 device personas: headset, speaker, keyboard, sensor, phone, bare
- Event reporter: thread-safe async event queue to host

Python MCP server (FastMCP, 1626 lines):
- Async serial client with command/response correlation
- Event queue with wait_for pattern matching
- Tools: connection, configure, classic, ble, persona, events
- MCP resources: esp32://status, esp32://events, esp32://personas

Tests: 74 unit tests passing, 5 integration test stubs (skip without hardware)
2026-02-02 15:12:28 -07:00

309 lines
12 KiB
Python

"""Unit tests for mcbluetooth_esp32.serial_client — mocked serial, no hardware needed."""
from __future__ import annotations
import asyncio
import json
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from mcbluetooth_esp32.protocol import CMD_GET_INFO, CMD_PING, Status
from mcbluetooth_esp32.serial_client import CommandTimeout, NotConnected, SerialClient
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _resp_line(cmd_id: str, status: str = "ok", data: dict | None = None) -> bytes:
"""Build a raw NDJSON response line as bytes (what the reader yields)."""
obj = {"type": "resp", "id": cmd_id, "status": status}
if data:
obj["data"] = data
return json.dumps(obj, separators=(",", ":")).encode() + b"\n"
def _event_line(event_name: str, data: dict | None = None, ts: int | None = None) -> bytes:
"""Build a raw NDJSON event line as bytes."""
obj: dict = {"type": "event", "event": event_name}
if data:
obj["data"] = data
if ts is not None:
obj["ts"] = ts
return json.dumps(obj, separators=(",", ":")).encode() + b"\n"
# ---------------------------------------------------------------------------
# Tests: offline (no connection at all)
# ---------------------------------------------------------------------------
class TestSerialClientOffline:
    """Behaviour of a SerialClient on which connect() was never called."""

    async def test_send_command_raises_not_connected(self):
        """Sending a command without a connection raises NotConnected."""
        offline = SerialClient(port="/dev/ttyUSB0")

        with pytest.raises(NotConnected):
            await offline.send_command(CMD_PING)

    async def test_connected_is_false_initially(self):
        """A freshly constructed client reports itself as disconnected."""
        fresh = SerialClient(port="/dev/ttyUSB0")

        assert fresh.connected is False

    async def test_disconnect_when_not_connected_is_noop(self):
        """disconnect() on a never-connected client neither raises nor connects."""
        idle = SerialClient(port="/dev/ttyUSB0")

        # The call itself is the check: it must complete without raising.
        await idle.disconnect()

        assert idle.connected is False
# ---------------------------------------------------------------------------
# Tests: mocked serial connection
# ---------------------------------------------------------------------------
class TestSerialClientMocked:
    """Tests against a mocked serial_asyncio transport.

    Every test receives the ``mock_client`` fixture, which yields
    ``(client, reader_lines, writer)``: bytes queued on ``reader_lines`` are
    what the client's reader returns from ``readline()``, and
    ``writer.write`` records what the client sends.
    """

    @pytest.fixture
    async def mock_client(self):
        """Yield a connected SerialClient wired to a fake reader and writer.

        The reader's ``readline()`` pops raw bytes from ``reader_lines``, an
        ``asyncio.Queue`` that test code pushes into. The writer is a
        ``MagicMock`` whose ``write`` calls can be inspected to recover the
        commands the client sent. On teardown the client is disconnected if
        it is still connected.

        Yields:
            A ``(client, reader_lines, writer)`` tuple.
        """
        client = SerialClient(port="/dev/ttyUSB0", timeout=2.0)

        reader = AsyncMock(spec=asyncio.StreamReader)
        writer = MagicMock()
        writer.write = MagicMock()
        writer.drain = AsyncMock()
        writer.close = MagicMock()
        writer.wait_closed = AsyncMock()

        # Queue of lines the reader will yield; tests push into this.
        reader_lines: asyncio.Queue[bytes] = asyncio.Queue()

        async def _readline():
            return await reader_lines.get()

        reader.readline = _readline

        # Only connect() needs the patched factory; it hands back our mocks.
        with patch("serial_asyncio.open_serial_connection", return_value=(reader, writer)):
            await client.connect()

        yield client, reader_lines, writer

        if client.connected:
            # Push empty bytes to unblock reader_lines.get() so disconnect can cancel cleanly
            reader_lines.put_nowait(b"")
            await client.disconnect()

    @staticmethod
    async def _reply_to_last_command(writer, reader_lines, *, status="ok", data=None):
        """Inject a response for the most recently written command.

        Waits briefly so the command under test has been written, parses the
        last ``writer.write`` payload, queues a matching response line, and
        returns the parsed command so the caller can assert on it in the
        test body.

        Args:
            writer: The mocked writer whose ``write`` calls are inspected.
            reader_lines: Queue feeding the mocked reader.
            status: Response status string passed to ``_resp_line``.
            data: Optional response payload passed to ``_resp_line``.

        Returns:
            The decoded JSON object of the command that was written.

        Raises:
            AssertionError: If nothing was written before the delay elapsed.
        """
        # Same fixed delay the individual tests used: lets send_command write
        # its line and start waiting before the reply arrives.
        await asyncio.sleep(0.05)
        call_args = writer.write.call_args
        assert call_args is not None, "writer.write was not called"
        sent = json.loads(call_args[0][0])
        reader_lines.put_nowait(_resp_line(sent["id"], status=status, data=data))
        return sent

    # ------------------------------------------------------------------
    # send_command basics
    # ------------------------------------------------------------------
    async def test_send_command_writes_json_line(self, mock_client):
        """Verify the client writes properly formatted NDJSON to the writer."""
        client, reader_lines, writer = mock_client

        # Gathering the responder with the command means a responder failure
        # is raised here directly instead of being hidden behind a
        # CommandTimeout from the unanswered command.
        resp, sent = await asyncio.gather(
            client.send_command(CMD_PING),
            self._reply_to_last_command(writer, reader_lines),
        )

        # Payload checks live in the test body so a mismatch is reported as
        # the actual assertion failure.
        assert sent["type"] == "cmd"
        assert sent["cmd"] == CMD_PING
        assert resp.status == Status.OK
        assert resp.id is not None

    async def test_response_correlation(self, mock_client):
        """Verify the correct response is returned for a given command ID."""
        client, reader_lines, writer = mock_client
        expected_data = {"firmware": "1.0.0", "mac": "AA:BB:CC:DD:EE:FF"}

        resp, _sent = await asyncio.gather(
            client.send_command(CMD_GET_INFO),
            self._reply_to_last_command(writer, reader_lines, data=expected_data),
        )

        assert resp.status == Status.OK
        assert resp.data["firmware"] == "1.0.0"
        assert resp.data["mac"] == "AA:BB:CC:DD:EE:FF"

    # ------------------------------------------------------------------
    # Event routing
    # ------------------------------------------------------------------
    async def test_event_routing_to_event_queue(self, mock_client):
        """Events from the reader should land in the EventQueue."""
        client, reader_lines, writer = mock_client

        # Push an event line
        reader_lines.put_nowait(_event_line("boot", data={"version": "1.0"}, ts=12345))

        # Give the read loop time to consume it
        await asyncio.sleep(0.1)

        events = client.event_queue.get_events(event_name="boot")
        assert len(events) == 1
        assert events[0].event == "boot"
        assert events[0].data["version"] == "1.0"
        assert events[0].ts == 12345

    async def test_multiple_events_accumulated(self, mock_client):
        """Multiple events should all appear in the queue."""
        client, reader_lines, writer = mock_client

        for i in range(5):
            reader_lines.put_nowait(_event_line("connect", data={"index": i}))

        # Give the read loop time to drain all five lines
        await asyncio.sleep(0.15)

        events = client.event_queue.get_events(event_name="connect")
        assert len(events) == 5

    # ------------------------------------------------------------------
    # Timeout
    # ------------------------------------------------------------------
    async def test_command_timeout(self, mock_client):
        """A command with no response should raise CommandTimeout."""
        client, reader_lines, writer = mock_client

        # Nothing is ever queued on reader_lines, so no response can arrive.
        with pytest.raises(CommandTimeout):
            await client.send_command(CMD_PING, timeout=0.1)

    # ------------------------------------------------------------------
    # Concurrent commands
    # ------------------------------------------------------------------
    async def test_multiple_concurrent_commands(self, mock_client):
        """Two commands in flight at once should each get the right response."""
        client, reader_lines, writer = mock_client

        async def _respond_after_both():
            # Wait for two writes, but give up well before the client's own
            # 2 s command timeout so the failure names the real problem and
            # the task never outlives the test.
            loop = asyncio.get_running_loop()
            deadline = loop.time() + 1.0
            while len(writer.write.call_args_list) < 2:
                assert loop.time() < deadline, "expected two commands to be written"
                await asyncio.sleep(0.02)

            # Key the IDs by command name so the test does not depend on the
            # order in which the two commands reached the writer.
            ids_by_cmd: dict[str, str] = {}
            for call in writer.write.call_args_list:
                sent = json.loads(call[0][0])
                ids_by_cmd[sent["cmd"]] = sent["id"]

            # Respond in reverse order to test correlation
            reader_lines.put_nowait(_resp_line(ids_by_cmd[CMD_GET_INFO], data={"which": "second"}))
            reader_lines.put_nowait(_resp_line(ids_by_cmd[CMD_PING], data={"which": "first"}))

        # Created before the commands so its first poll happens before any
        # write, matching the original scheduling.
        responder = asyncio.create_task(_respond_after_both())

        r1, r2, _ = await asyncio.gather(
            client.send_command(CMD_PING),
            client.send_command(CMD_GET_INFO),
            responder,
        )

        # Each response should match its own command regardless of delivery order
        assert r1.data["which"] == "first"
        assert r2.data["which"] == "second"

    # ------------------------------------------------------------------
    # Disconnect
    # ------------------------------------------------------------------
    async def test_disconnect_cancels_pending_futures(self, mock_client):
        """Pending commands should be cancelled when we disconnect."""
        client, reader_lines, writer = mock_client

        async def _expect_aborted_command():
            with pytest.raises((CommandTimeout, asyncio.CancelledError, NotConnected)):
                await client.send_command(CMD_PING, timeout=5.0)

        cmd_task = asyncio.create_task(_expect_aborted_command())
        # Let the command get written and start waiting for its response.
        await asyncio.sleep(0.05)

        # Push empty bytes to unblock the readline so disconnect can cancel the read loop
        reader_lines.put_nowait(b"")
        await client.disconnect()
        assert client.connected is False

        # The command's own timeout (5 s) is longer than this wait (2 s), so
        # the task can only finish in time if disconnect() aborted it.
        await asyncio.wait_for(cmd_task, timeout=2.0)

    async def test_disconnect_is_idempotent(self, mock_client):
        """Calling disconnect twice should not raise."""
        client, reader_lines, writer = mock_client

        reader_lines.put_nowait(b"")
        await client.disconnect()

        # Second call is a no-op
        await client.disconnect()
        assert client.connected is False

    # ------------------------------------------------------------------
    # Edge cases
    # ------------------------------------------------------------------
    async def test_unparseable_line_is_dropped(self, mock_client):
        """Garbage on the wire should be silently dropped."""
        client, reader_lines, writer = mock_client

        reader_lines.put_nowait(b"NOT-JSON-AT-ALL\n")
        # Valid event right after to prove the loop keeps running
        reader_lines.put_nowait(_event_line("boot"))

        await asyncio.sleep(0.15)

        events = client.event_queue.get_events(event_name="boot")
        assert len(events) == 1

    async def test_oversized_line_is_dropped(self, mock_client):
        """Lines exceeding MAX_LINE_LENGTH should be dropped."""
        client, reader_lines, writer = mock_client

        # 3000 bytes of 'a' is above the 2048 limit stated by the original
        # author — TODO confirm against the client's MAX_LINE_LENGTH.
        # NOTE(review): this payload is also not valid JSON, so the test
        # cannot tell the length check apart from the parse-failure path.
        reader_lines.put_nowait(b"a" * 3000 + b"\n")
        reader_lines.put_nowait(_event_line("boot"))

        await asyncio.sleep(0.15)

        events = client.event_queue.get_events(event_name="boot")
        assert len(events) == 1

    async def test_error_response_propagates(self, mock_client):
        """An error response should be returned (not raised) with status=error."""
        client, reader_lines, writer = mock_client

        resp, _sent = await asyncio.gather(
            client.send_command(CMD_PING),
            self._reply_to_last_command(
                writer, reader_lines, status="error", data={"msg": "bad"}
            ),
        )

        assert resp.status == Status.ERROR
        assert resp.data["msg"] == "bad"