"""Unit tests for mcbluetooth_esp32.serial_client — mocked serial, no hardware needed.""" from __future__ import annotations import asyncio import json from unittest.mock import AsyncMock, MagicMock, patch import pytest from mcbluetooth_esp32.protocol import CMD_GET_INFO, CMD_PING, Status from mcbluetooth_esp32.serial_client import CommandTimeout, NotConnected, SerialClient # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _resp_line(cmd_id: str, status: str = "ok", data: dict | None = None) -> bytes: """Build a raw NDJSON response line as bytes (what the reader yields).""" obj = {"type": "resp", "id": cmd_id, "status": status} if data: obj["data"] = data return json.dumps(obj, separators=(",", ":")).encode() + b"\n" def _event_line(event_name: str, data: dict | None = None, ts: int | None = None) -> bytes: """Build a raw NDJSON event line as bytes.""" obj: dict = {"type": "event", "event": event_name} if data: obj["data"] = data if ts is not None: obj["ts"] = ts return json.dumps(obj, separators=(",", ":")).encode() + b"\n" # --------------------------------------------------------------------------- # Tests: offline (no connection at all) # --------------------------------------------------------------------------- class TestSerialClientOffline: """Verify behaviour when the client is not connected.""" async def test_send_command_raises_not_connected(self): client = SerialClient(port="/dev/ttyUSB0") with pytest.raises(NotConnected): await client.send_command(CMD_PING) async def test_connected_is_false_initially(self): client = SerialClient(port="/dev/ttyUSB0") assert client.connected is False async def test_disconnect_when_not_connected_is_noop(self): client = SerialClient(port="/dev/ttyUSB0") # Should not raise await client.disconnect() assert client.connected is False # --------------------------------------------------------------------------- 
# Tests: mocked serial connection
# ---------------------------------------------------------------------------


class TestSerialClientMocked:
    """Tests against a mocked serial_asyncio transport."""

    @staticmethod
    async def _wait_for_writes(writer, count: int = 1, timeout: float = 1.0) -> bool:
        """Poll until ``writer.write`` has been called at least *count* times.

        Replaces fixed sleeps so responders react as soon as the command is on
        the (mocked) wire, while still giving up after *timeout* seconds
        instead of spinning forever.

        Returns:
            True if the expected number of writes was observed, False if the
            deadline passed first.
        """
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while writer.write.call_count < count:
            if loop.time() >= deadline:
                return False
            await asyncio.sleep(0.005)
        return True

    @pytest.fixture
    async def mock_client(self):
        """Create a SerialClient connected to a mocked serial transport.

        Yields a ``(client, reader_lines, writer)`` tuple:

        * ``reader_lines`` is an asyncio.Queue; test code pushes raw bytes
          into it and the mocked ``readline()`` awaits the next item.
        * ``writer`` is a MagicMock whose ``write`` calls can be inspected to
          see what the client sent.

        On teardown the client is disconnected if the test left it connected.
        """
        client = SerialClient(port="/dev/ttyUSB0", timeout=2.0)

        reader = AsyncMock(spec=asyncio.StreamReader)
        writer = MagicMock()
        writer.write = MagicMock()
        writer.drain = AsyncMock()
        writer.close = MagicMock()
        writer.wait_closed = AsyncMock()

        # Queue of lines the reader will yield; tests push into this.
        reader_lines: asyncio.Queue[bytes] = asyncio.Queue()

        async def _readline():
            return await reader_lines.get()

        reader.readline = _readline

        with patch("serial_asyncio.open_serial_connection", return_value=(reader, writer)):
            await client.connect()

        # connect() above was handed our mocked reader, so whatever reading the
        # client does from here on is fed by ``reader_lines``.
        yield client, reader_lines, writer

        if client.connected:
            # Push empty bytes to unblock reader_lines.get() so disconnect can cancel cleanly
            reader_lines.put_nowait(b"")
            await client.disconnect()

    # ------------------------------------------------------------------
    # send_command basics
    # ------------------------------------------------------------------

    async def test_send_command_writes_json_line(self, mock_client):
        """Verify the client writes properly formatted NDJSON to the writer."""
        client, reader_lines, writer = mock_client

        async def _respond_soon():
            """Wait for the write, parse the command ID, then inject a response."""
            await self._wait_for_writes(writer)
            # Extract what was written
            call_args = writer.write.call_args
            assert call_args is not None, "writer.write was not called"
            raw = call_args[0][0]
            obj = json.loads(raw)
            assert obj["type"] == "cmd"
            assert obj["cmd"] == CMD_PING
            # Push a matching response
            reader_lines.put_nowait(_resp_line(obj["id"]))

        task = asyncio.create_task(_respond_soon())
        resp = await client.send_command(CMD_PING)
        await task

        assert resp.status == Status.OK
        assert resp.id is not None

    async def test_response_correlation(self, mock_client):
        """Verify the correct response is returned for a given command ID."""
        client, reader_lines, writer = mock_client
        expected_data = {"firmware": "1.0.0", "mac": "AA:BB:CC:DD:EE:FF"}

        async def _respond():
            """Echo the written command's ID back with ``expected_data``."""
            written = await self._wait_for_writes(writer)
            assert written, "writer.write was not called"
            raw = writer.write.call_args[0][0]
            cmd_id = json.loads(raw)["id"]
            reader_lines.put_nowait(_resp_line(cmd_id, data=expected_data))

        task = asyncio.create_task(_respond())
        resp = await client.send_command(CMD_GET_INFO)
        await task

        assert resp.status == Status.OK
        assert resp.data["firmware"] == "1.0.0"
        assert resp.data["mac"] == "AA:BB:CC:DD:EE:FF"

    # ------------------------------------------------------------------
    # Event routing
    # ------------------------------------------------------------------

    async def test_event_routing_to_event_queue(self, mock_client):
        """Events from the reader should land in the EventQueue."""
        client, reader_lines, _writer = mock_client

        # Push an event line
        reader_lines.put_nowait(_event_line("boot", data={"version": "1.0"}, ts=12345))

        # Give the read loop time to consume it
        await asyncio.sleep(0.1)

        events = client.event_queue.get_events(event_name="boot")
        assert len(events) == 1
        assert events[0].event == "boot"
        assert events[0].data["version"] == "1.0"
        assert events[0].ts == 12345

    async def test_multiple_events_accumulated(self, mock_client):
        """Multiple events should all appear in the queue."""
        client, reader_lines, _writer = mock_client

        for i in range(5):
            reader_lines.put_nowait(_event_line("connect", data={"index": i}))

        # Give the read loop time to consume all five lines
        await asyncio.sleep(0.15)

        events = client.event_queue.get_events(event_name="connect")
        assert len(events) == 5

    # ------------------------------------------------------------------
    # Timeout
    # ------------------------------------------------------------------

    async def test_command_timeout(self, mock_client):
        """A command with no response should raise CommandTimeout."""
        client, _reader_lines, _writer = mock_client

        with pytest.raises(CommandTimeout):
            await client.send_command(CMD_PING, timeout=0.1)

    # ------------------------------------------------------------------
    # Concurrent commands
    # ------------------------------------------------------------------

    async def test_multiple_concurrent_commands(self, mock_client):
        """Two commands in flight at once should each get the right response."""
        client, reader_lines, writer = mock_client

        async def _respond_after_both():
            """Answer both commands, delivering the responses in reverse order."""
            # Wait (with a deadline) for two writes
            both_sent = await self._wait_for_writes(writer, count=2)
            assert both_sent, "expected two commands to be written"

            sent = [json.loads(call[0][0]) for call in writer.write.call_args_list]
            # Match IDs to commands by name rather than by write order, so the
            # assertions below do not depend on how gather() interleaves them.
            ping_id = next(obj["id"] for obj in sent if obj["cmd"] == CMD_PING)
            info_id = next(obj["id"] for obj in sent if obj["cmd"] == CMD_GET_INFO)

            # Respond in reverse order to test correlation
            reader_lines.put_nowait(_resp_line(info_id, data={"which": "second"}))
            reader_lines.put_nowait(_resp_line(ping_id, data={"which": "first"}))

        responder = asyncio.create_task(_respond_after_both())

        r1, r2 = await asyncio.gather(
            client.send_command(CMD_PING),
            client.send_command(CMD_GET_INFO),
        )
        await responder

        # Each response should match its own command regardless of delivery order
        assert r1.data["which"] == "first"
        assert r2.data["which"] == "second"

    # ------------------------------------------------------------------
    # Disconnect
    # ------------------------------------------------------------------

    async def test_disconnect_cancels_pending_futures(self, mock_client):
        """Pending commands should be cancelled when we disconnect."""
        client, reader_lines, _writer = mock_client

        async def _fire_and_forget():
            with pytest.raises((CommandTimeout, asyncio.CancelledError, NotConnected)):
                await client.send_command(CMD_PING, timeout=5.0)

        cmd_task = asyncio.create_task(_fire_and_forget())
        # Brief grace period so the command is in flight before we disconnect
        await asyncio.sleep(0.05)

        # Push empty bytes to unblock the readline so disconnect can cancel the read loop
        reader_lines.put_nowait(b"")
        await client.disconnect()
        assert client.connected is False

        # The command task should finish (cancelled or timed-out — either is fine)
        await asyncio.wait_for(cmd_task, timeout=2.0)

    async def test_disconnect_is_idempotent(self, mock_client):
        """Calling disconnect twice should not raise."""
        client, reader_lines, _writer = mock_client

        reader_lines.put_nowait(b"")
        await client.disconnect()
        # Second call is a no-op
        await client.disconnect()

        assert client.connected is False

    # ------------------------------------------------------------------
    # Edge cases
    # ------------------------------------------------------------------

    async def test_unparseable_line_is_dropped(self, mock_client):
        """Garbage on the wire should be silently dropped."""
        client, reader_lines, _writer = mock_client

        reader_lines.put_nowait(b"NOT-JSON-AT-ALL\n")
        # Valid event right after to prove the loop keeps running
        reader_lines.put_nowait(_event_line("boot"))

        await asyncio.sleep(0.15)

        events = client.event_queue.get_events(event_name="boot")
        assert len(events) == 1

    async def test_oversized_line_is_dropped(self, mock_client):
        """Lines exceeding MAX_LINE_LENGTH should be dropped."""
        client, reader_lines, _writer = mock_client

        # 3000 bytes of 'a' is above the 2048 limit
        reader_lines.put_nowait(b"a" * 3000 + b"\n")
        # Valid event right after to prove the loop keeps running
        reader_lines.put_nowait(_event_line("boot"))

        await asyncio.sleep(0.15)

        events = client.event_queue.get_events(event_name="boot")
        assert len(events) == 1

    async def test_error_response_propagates(self, mock_client):
        """An error response should be returned (not raised) with status=error."""
        client, reader_lines, writer = mock_client

        async def _respond_error():
            """Answer the written command with an error-status response."""
            written = await self._wait_for_writes(writer)
            assert written, "writer.write was not called"
            raw = writer.write.call_args[0][0]
            cmd_id = json.loads(raw)["id"]
            reader_lines.put_nowait(_resp_line(cmd_id, status="error", data={"msg": "bad"}))

        task = asyncio.create_task(_respond_error())
        resp = await client.send_command(CMD_PING)
        await task

        assert resp.status == Status.ERROR
        assert resp.data["msg"] == "bad"