UART-controlled ESP32 peripheral for automated E2E Bluetooth testing. Dual-mode (Classic BT + BLE) via Bluedroid on original ESP32. Firmware (ESP-IDF v5.x, 2511 lines C): - NDJSON protocol over UART1 (115200 baud) - System commands: ping, reset, get_info, get_status - Classic BT: GAP, SPP, all 4 SSP pairing modes - BLE: GATTS, advertising, GATT service/characteristic management - 6 device personas: headset, speaker, keyboard, sensor, phone, bare - Event reporter: thread-safe async event queue to host Python MCP server (FastMCP, 1626 lines): - Async serial client with command/response correlation - Event queue with wait_for pattern matching - Tools: connection, configure, classic, ble, persona, events - MCP resources: esp32://status, esp32://events, esp32://personas Tests: 74 unit tests passing, 5 integration test stubs (skip without hardware)
309 lines
12 KiB
Python
309 lines
12 KiB
Python
"""Unit tests for mcbluetooth_esp32.serial_client — mocked serial, no hardware needed."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from mcbluetooth_esp32.protocol import CMD_GET_INFO, CMD_PING, Status
|
|
from mcbluetooth_esp32.serial_client import CommandTimeout, NotConnected, SerialClient
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _resp_line(cmd_id: str, status: str = "ok", data: dict | None = None) -> bytes:
|
|
"""Build a raw NDJSON response line as bytes (what the reader yields)."""
|
|
obj = {"type": "resp", "id": cmd_id, "status": status}
|
|
if data:
|
|
obj["data"] = data
|
|
return json.dumps(obj, separators=(",", ":")).encode() + b"\n"
|
|
|
|
|
|
def _event_line(event_name: str, data: dict | None = None, ts: int | None = None) -> bytes:
|
|
"""Build a raw NDJSON event line as bytes."""
|
|
obj: dict = {"type": "event", "event": event_name}
|
|
if data:
|
|
obj["data"] = data
|
|
if ts is not None:
|
|
obj["ts"] = ts
|
|
return json.dumps(obj, separators=(",", ":")).encode() + b"\n"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests: offline (no connection at all)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestSerialClientOffline:
|
|
"""Verify behaviour when the client is not connected."""
|
|
|
|
async def test_send_command_raises_not_connected(self):
|
|
client = SerialClient(port="/dev/ttyUSB0")
|
|
with pytest.raises(NotConnected):
|
|
await client.send_command(CMD_PING)
|
|
|
|
async def test_connected_is_false_initially(self):
|
|
client = SerialClient(port="/dev/ttyUSB0")
|
|
assert client.connected is False
|
|
|
|
async def test_disconnect_when_not_connected_is_noop(self):
|
|
client = SerialClient(port="/dev/ttyUSB0")
|
|
# Should not raise
|
|
await client.disconnect()
|
|
assert client.connected is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests: mocked serial connection
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestSerialClientMocked:
|
|
"""Tests against a mocked serial_asyncio transport."""
|
|
|
|
@pytest.fixture
|
|
async def mock_client(self):
|
|
"""Create a SerialClient with mocked serial connection.
|
|
|
|
The reader is set up with a response queue. Test code pushes raw
|
|
bytes into ``reader_lines`` and the mock readline() pops them off.
|
|
The ``drain_called`` event can be awaited to know when a write has
|
|
been flushed.
|
|
"""
|
|
client = SerialClient(port="/dev/ttyUSB0", timeout=2.0)
|
|
|
|
reader = AsyncMock(spec=asyncio.StreamReader)
|
|
writer = MagicMock()
|
|
writer.write = MagicMock()
|
|
writer.drain = AsyncMock()
|
|
writer.close = MagicMock()
|
|
writer.wait_closed = AsyncMock()
|
|
|
|
# Queue of lines the reader will yield; tests push into this.
|
|
reader_lines: asyncio.Queue[bytes] = asyncio.Queue()
|
|
|
|
async def _readline():
|
|
return await reader_lines.get()
|
|
|
|
reader.readline = _readline
|
|
|
|
with patch("serial_asyncio.open_serial_connection", return_value=(reader, writer)):
|
|
await client.connect()
|
|
# Sanity: the connect above started _read_loop with the real reader
|
|
# which is our mock.
|
|
yield client, reader_lines, writer
|
|
|
|
if client.connected:
|
|
# Push empty bytes to unblock reader_lines.get() so disconnect can cancel cleanly
|
|
reader_lines.put_nowait(b"")
|
|
await client.disconnect()
|
|
|
|
# ------------------------------------------------------------------
|
|
# send_command basics
|
|
# ------------------------------------------------------------------
|
|
|
|
async def test_send_command_writes_json_line(self, mock_client):
|
|
"""Verify the client writes properly formatted NDJSON to the writer."""
|
|
client, reader_lines, writer = mock_client
|
|
|
|
async def _respond_soon():
|
|
"""Wait for the write, parse the command ID, then inject a response."""
|
|
await asyncio.sleep(0.05)
|
|
# Extract what was written
|
|
call_args = writer.write.call_args
|
|
assert call_args is not None, "writer.write was not called"
|
|
raw = call_args[0][0]
|
|
obj = json.loads(raw)
|
|
assert obj["type"] == "cmd"
|
|
assert obj["cmd"] == CMD_PING
|
|
# Push a matching response
|
|
reader_lines.put_nowait(_resp_line(obj["id"]))
|
|
|
|
task = asyncio.create_task(_respond_soon())
|
|
resp = await client.send_command(CMD_PING)
|
|
await task
|
|
|
|
assert resp.status == Status.OK
|
|
assert resp.id is not None
|
|
|
|
async def test_response_correlation(self, mock_client):
|
|
"""Verify the correct response is returned for a given command ID."""
|
|
client, reader_lines, writer = mock_client
|
|
|
|
expected_data = {"firmware": "1.0.0", "mac": "AA:BB:CC:DD:EE:FF"}
|
|
|
|
async def _respond():
|
|
await asyncio.sleep(0.05)
|
|
raw = writer.write.call_args[0][0]
|
|
cmd_id = json.loads(raw)["id"]
|
|
reader_lines.put_nowait(_resp_line(cmd_id, data=expected_data))
|
|
|
|
task = asyncio.create_task(_respond())
|
|
resp = await client.send_command(CMD_GET_INFO)
|
|
await task
|
|
|
|
assert resp.status == Status.OK
|
|
assert resp.data["firmware"] == "1.0.0"
|
|
assert resp.data["mac"] == "AA:BB:CC:DD:EE:FF"
|
|
|
|
# ------------------------------------------------------------------
|
|
# Event routing
|
|
# ------------------------------------------------------------------
|
|
|
|
async def test_event_routing_to_event_queue(self, mock_client):
|
|
"""Events from the reader should land in the EventQueue."""
|
|
client, reader_lines, writer = mock_client
|
|
|
|
# Push an event line
|
|
reader_lines.put_nowait(_event_line("boot", data={"version": "1.0"}, ts=12345))
|
|
|
|
# Give the read loop time to consume it
|
|
await asyncio.sleep(0.1)
|
|
|
|
events = client.event_queue.get_events(event_name="boot")
|
|
assert len(events) == 1
|
|
assert events[0].event == "boot"
|
|
assert events[0].data["version"] == "1.0"
|
|
assert events[0].ts == 12345
|
|
|
|
async def test_multiple_events_accumulated(self, mock_client):
|
|
"""Multiple events should all appear in the queue."""
|
|
client, reader_lines, writer = mock_client
|
|
|
|
for i in range(5):
|
|
reader_lines.put_nowait(_event_line("connect", data={"index": i}))
|
|
|
|
await asyncio.sleep(0.15)
|
|
|
|
events = client.event_queue.get_events(event_name="connect")
|
|
assert len(events) == 5
|
|
|
|
# ------------------------------------------------------------------
|
|
# Timeout
|
|
# ------------------------------------------------------------------
|
|
|
|
async def test_command_timeout(self, mock_client):
|
|
"""A command with no response should raise CommandTimeout."""
|
|
client, reader_lines, writer = mock_client
|
|
|
|
with pytest.raises(CommandTimeout):
|
|
await client.send_command(CMD_PING, timeout=0.1)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Concurrent commands
|
|
# ------------------------------------------------------------------
|
|
|
|
async def test_multiple_concurrent_commands(self, mock_client):
|
|
"""Two commands in flight at once should each get the right response."""
|
|
client, reader_lines, writer = mock_client
|
|
|
|
captured_ids: list[str] = []
|
|
|
|
async def _respond_after_both():
|
|
# Wait for two writes
|
|
while len(writer.write.call_args_list) < 2:
|
|
await asyncio.sleep(0.02)
|
|
|
|
for call in writer.write.call_args_list:
|
|
raw = call[0][0]
|
|
obj = json.loads(raw)
|
|
captured_ids.append(obj["id"])
|
|
|
|
# Respond in reverse order to test correlation
|
|
reader_lines.put_nowait(_resp_line(captured_ids[1], data={"which": "second"}))
|
|
reader_lines.put_nowait(_resp_line(captured_ids[0], data={"which": "first"}))
|
|
|
|
responder = asyncio.create_task(_respond_after_both())
|
|
|
|
r1, r2 = await asyncio.gather(
|
|
client.send_command(CMD_PING),
|
|
client.send_command(CMD_GET_INFO),
|
|
)
|
|
await responder
|
|
|
|
# Each response should match its own command regardless of delivery order
|
|
assert r1.data["which"] == "first"
|
|
assert r2.data["which"] == "second"
|
|
|
|
# ------------------------------------------------------------------
|
|
# Disconnect
|
|
# ------------------------------------------------------------------
|
|
|
|
async def test_disconnect_cancels_pending_futures(self, mock_client):
|
|
"""Pending commands should be cancelled when we disconnect."""
|
|
client, reader_lines, writer = mock_client
|
|
|
|
async def _fire_and_forget():
|
|
with pytest.raises((CommandTimeout, asyncio.CancelledError, NotConnected)):
|
|
await client.send_command(CMD_PING, timeout=5.0)
|
|
|
|
cmd_task = asyncio.create_task(_fire_and_forget())
|
|
|
|
await asyncio.sleep(0.05)
|
|
# Push empty bytes to unblock the readline so disconnect can cancel the read loop
|
|
reader_lines.put_nowait(b"")
|
|
await client.disconnect()
|
|
|
|
assert client.connected is False
|
|
# The command task should finish (cancelled or timed-out — either is fine)
|
|
await asyncio.wait_for(cmd_task, timeout=2.0)
|
|
|
|
async def test_disconnect_is_idempotent(self, mock_client):
|
|
"""Calling disconnect twice should not raise."""
|
|
client, reader_lines, writer = mock_client
|
|
reader_lines.put_nowait(b"")
|
|
await client.disconnect()
|
|
# Second call is a no-op
|
|
await client.disconnect()
|
|
assert client.connected is False
|
|
|
|
# ------------------------------------------------------------------
|
|
# Edge cases
|
|
# ------------------------------------------------------------------
|
|
|
|
async def test_unparseable_line_is_dropped(self, mock_client):
|
|
"""Garbage on the wire should be silently dropped."""
|
|
client, reader_lines, writer = mock_client
|
|
|
|
reader_lines.put_nowait(b"NOT-JSON-AT-ALL\n")
|
|
# Valid event right after to prove the loop keeps running
|
|
reader_lines.put_nowait(_event_line("boot"))
|
|
await asyncio.sleep(0.15)
|
|
|
|
events = client.event_queue.get_events(event_name="boot")
|
|
assert len(events) == 1
|
|
|
|
async def test_oversized_line_is_dropped(self, mock_client):
|
|
"""Lines exceeding MAX_LINE_LENGTH should be dropped."""
|
|
client, reader_lines, writer = mock_client
|
|
|
|
# 3000 bytes of 'a' is above the 2048 limit
|
|
reader_lines.put_nowait(b"a" * 3000 + b"\n")
|
|
reader_lines.put_nowait(_event_line("boot"))
|
|
await asyncio.sleep(0.15)
|
|
|
|
events = client.event_queue.get_events(event_name="boot")
|
|
assert len(events) == 1
|
|
|
|
async def test_error_response_propagates(self, mock_client):
|
|
"""An error response should be returned (not raised) with status=error."""
|
|
client, reader_lines, writer = mock_client
|
|
|
|
async def _respond_error():
|
|
await asyncio.sleep(0.05)
|
|
raw = writer.write.call_args[0][0]
|
|
cmd_id = json.loads(raw)["id"]
|
|
reader_lines.put_nowait(_resp_line(cmd_id, status="error", data={"msg": "bad"}))
|
|
|
|
task = asyncio.create_task(_respond_error())
|
|
resp = await client.send_command(CMD_PING)
|
|
await task
|
|
|
|
assert resp.status == Status.ERROR
|
|
assert resp.data["msg"] == "bad"
|