mcesptool/tests/test_qemu_manager.py
Ryan Malloy 64c1505a00 Add QEMU ESP32 emulation support
Integrate Espressif's QEMU fork for virtual ESP device management:

- QemuManager component with 5 MCP tools (start/stop/list/status/flash)
- Config auto-detects QEMU binaries from ~/.espressif/tools/
- Supports esp32, esp32s2, esp32s3, esp32c3 chip emulation
- Virtual serial over TCP (socket://localhost:PORT) transparent to esptool
- Scan integration: QEMU instances appear in esp_scan_ports results
- Blank flash images initialized to 0xFF (erased NOR flash state)
- 38 unit tests covering lifecycle, port allocation, flash writes
2026-01-28 15:35:22 -07:00

462 lines
15 KiB
Python

"""
Test QEMU Manager component
Tests lifecycle management, port allocation, flash image handling,
and integration with the scan system.
"""
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock
import pytest
from mcp_esptool_server.components.qemu_manager import (
CHIP_MACHINES,
QemuInstance,
QemuManager,
_create_blank_flash,
)
from mcp_esptool_server.config import ESPToolServerConfig
@pytest.fixture
def config():
"""Config with QEMU paths pointing to real Espressif binaries (if installed)"""
return ESPToolServerConfig()
@pytest.fixture
def mock_app():
"""Mock FastMCP app that captures registered tools"""
app = MagicMock()
registered_tools = {}
def tool_decorator(name):
def decorator(func):
registered_tools[name] = func
return func
return decorator
app.tool = tool_decorator
app._registered_tools = registered_tools
return app
@pytest.fixture
def manager(mock_app, config):
return QemuManager(mock_app, config)
@pytest.fixture
def mock_context():
ctx = MagicMock()
ctx.log = AsyncMock()
ctx.progress = AsyncMock()
return ctx
class TestChipMachines:
def test_supported_chips(self):
assert "esp32" in CHIP_MACHINES
assert "esp32s3" in CHIP_MACHINES
assert "esp32c3" in CHIP_MACHINES
def test_xtensa_arch(self):
assert CHIP_MACHINES["esp32"]["arch"] == "xtensa"
assert CHIP_MACHINES["esp32s3"]["arch"] == "xtensa"
def test_riscv_arch(self):
assert CHIP_MACHINES["esp32c3"]["arch"] == "riscv"
def test_machine_names(self):
for _key, info in CHIP_MACHINES.items():
assert "machine" in info
assert "arch" in info
class TestQemuInstance:
def test_socket_uri(self):
inst = QemuInstance(
instance_id="qemu-1",
chip_type="esp32",
tcp_port=5555,
flash_image=Path("/tmp/flash.bin"),
flash_size_mb=4,
)
assert inst.socket_uri == "socket://localhost:5555"
def test_is_running_no_process(self):
inst = QemuInstance(
instance_id="qemu-1",
chip_type="esp32",
tcp_port=5555,
flash_image=Path("/tmp/flash.bin"),
flash_size_mb=4,
)
assert not inst.is_running
def test_is_running_with_active_process(self):
proc = MagicMock()
proc.returncode = None # still running
inst = QemuInstance(
instance_id="qemu-1",
chip_type="esp32",
tcp_port=5555,
flash_image=Path("/tmp/flash.bin"),
flash_size_mb=4,
process=proc,
)
assert inst.is_running
def test_is_running_with_exited_process(self):
proc = MagicMock()
proc.returncode = 0 # exited
inst = QemuInstance(
instance_id="qemu-1",
chip_type="esp32",
tcp_port=5555,
flash_image=Path("/tmp/flash.bin"),
flash_size_mb=4,
process=proc,
)
assert not inst.is_running
class TestQemuManagerInit:
def test_registers_tools(self, mock_app, config):
QemuManager(mock_app, config)
tools = mock_app._registered_tools
assert "esp_qemu_start" in tools
assert "esp_qemu_stop" in tools
assert "esp_qemu_list" in tools
assert "esp_qemu_status" in tools
assert "esp_qemu_flash" in tools
def test_port_allocation(self, manager):
port = manager._allocate_port()
assert port == manager.config.qemu_base_port
def test_port_allocation_skips_used(self, manager):
# Simulate an occupied port
proc = MagicMock()
proc.returncode = None
manager.instances["qemu-1"] = QemuInstance(
instance_id="qemu-1",
chip_type="esp32",
tcp_port=manager.config.qemu_base_port,
flash_image=Path("/tmp/f.bin"),
flash_size_mb=4,
process=proc,
)
port = manager._allocate_port()
assert port == manager.config.qemu_base_port + 1
def test_port_allocation_exhausted(self, manager):
proc = MagicMock()
proc.returncode = None
for i in range(manager.config.qemu_max_instances):
manager.instances[f"qemu-{i}"] = QemuInstance(
instance_id=f"qemu-{i}",
chip_type="esp32",
tcp_port=manager.config.qemu_base_port + i,
flash_image=Path(f"/tmp/f{i}.bin"),
flash_size_mb=4,
process=proc,
)
assert manager._allocate_port() is None
def test_id_generation(self, manager):
assert manager._generate_id() == "qemu-1"
assert manager._generate_id() == "qemu-2"
def test_get_qemu_binary_xtensa(self, manager):
binary = manager._get_qemu_binary("xtensa")
assert binary == manager.config.qemu_xtensa_path
def test_get_qemu_binary_riscv(self, manager):
binary = manager._get_qemu_binary("riscv")
assert binary == manager.config.qemu_riscv_path
def test_get_qemu_binary_unknown(self, manager):
assert manager._get_qemu_binary("arm") is None
class TestStartImpl:
@pytest.mark.asyncio
async def test_unsupported_chip(self, manager, mock_context):
result = await manager._start_impl(mock_context, "esp8266", None, 4, None, None)
assert not result["success"]
assert "Unsupported chip" in result["error"]
@pytest.mark.asyncio
async def test_missing_binary(self, manager, mock_context):
manager.config.qemu_xtensa_path = "/nonexistent/qemu"
result = await manager._start_impl(mock_context, "esp32", None, 4, None, None)
assert not result["success"]
assert "not found" in result["error"]
@pytest.mark.asyncio
async def test_max_instances_reached(self, manager, mock_context):
proc = MagicMock()
proc.returncode = None
for i in range(manager.config.qemu_max_instances):
manager.instances[f"qemu-{i}"] = QemuInstance(
instance_id=f"qemu-{i}",
chip_type="esp32",
tcp_port=5555 + i,
flash_image=Path(f"/tmp/f{i}.bin"),
flash_size_mb=4,
process=proc,
)
result = await manager._start_impl(mock_context, "esp32", None, 4, None, None)
assert not result["success"]
assert "Maximum" in result["error"]
@pytest.mark.asyncio
async def test_missing_flash_image(self, manager, mock_context):
manager.config.qemu_xtensa_path = "/bin/true" # exists but not real qemu
result = await manager._start_impl(
mock_context, "esp32", "/nonexistent/flash.bin", 4, None, None
)
assert not result["success"]
assert "not found" in result["error"]
@pytest.mark.asyncio
async def test_port_conflict(self, manager, mock_context):
proc = MagicMock()
proc.returncode = None
manager.instances["qemu-0"] = QemuInstance(
instance_id="qemu-0",
chip_type="esp32",
tcp_port=5555,
flash_image=Path("/tmp/f.bin"),
flash_size_mb=4,
process=proc,
)
manager.config.qemu_xtensa_path = "/bin/true"
result = await manager._start_impl(mock_context, "esp32", None, 4, 5555, None)
assert not result["success"]
assert "already in use" in result["error"]
class TestStopImpl:
@pytest.mark.asyncio
async def test_stop_nonexistent(self, manager, mock_context):
result = await manager._stop_impl(mock_context, "qemu-999")
assert not result["success"]
assert "not found" in result["error"]
@pytest.mark.asyncio
async def test_stop_running_instance(self, manager, mock_context):
proc = AsyncMock()
proc.returncode = None
proc.terminate = MagicMock()
proc.kill = MagicMock()
proc.wait = AsyncMock()
manager.instances["qemu-1"] = QemuInstance(
instance_id="qemu-1",
chip_type="esp32",
tcp_port=5555,
flash_image=Path("/tmp/f.bin"),
flash_size_mb=4,
process=proc,
)
result = await manager._stop_impl(mock_context, "qemu-1")
assert result["success"]
assert "qemu-1" in result["stopped"]
proc.terminate.assert_called_once()
@pytest.mark.asyncio
async def test_stop_all(self, manager, mock_context):
for i in range(2):
proc = AsyncMock()
proc.returncode = None
proc.terminate = MagicMock()
proc.kill = MagicMock()
proc.wait = AsyncMock()
manager.instances[f"qemu-{i}"] = QemuInstance(
instance_id=f"qemu-{i}",
chip_type="esp32",
tcp_port=5555 + i,
flash_image=Path(f"/tmp/f{i}.bin"),
flash_size_mb=4,
process=proc,
)
result = await manager._stop_impl(mock_context, None)
assert result["success"]
assert len(result["stopped"]) == 2
class TestListImpl:
@pytest.mark.asyncio
async def test_list_empty(self, manager, mock_context):
result = await manager._list_impl(mock_context)
assert result["success"]
assert result["total"] == 0
@pytest.mark.asyncio
async def test_list_with_instances(self, manager, mock_context):
proc = MagicMock()
proc.returncode = None
manager.instances["qemu-1"] = QemuInstance(
instance_id="qemu-1",
chip_type="esp32",
tcp_port=5555,
flash_image=Path("/tmp/f.bin"),
flash_size_mb=4,
process=proc,
started_at=1000.0,
)
result = await manager._list_impl(mock_context)
assert result["total"] == 1
assert result["running"] == 1
assert result["instances"][0]["socket_uri"] == "socket://localhost:5555"
class TestFlashImpl:
@pytest.mark.asyncio
async def test_flash_nonexistent_instance(self, manager, mock_context):
result = await manager._flash_impl(mock_context, "qemu-999", "/tmp/fw.bin", "0x0")
assert not result["success"]
@pytest.mark.asyncio
async def test_flash_running_instance(self, manager, mock_context):
proc = MagicMock()
proc.returncode = None
manager.instances["qemu-1"] = QemuInstance(
instance_id="qemu-1",
chip_type="esp32",
tcp_port=5555,
flash_image=Path("/tmp/f.bin"),
flash_size_mb=4,
process=proc,
)
result = await manager._flash_impl(mock_context, "qemu-1", "/tmp/fw.bin", "0x0")
assert not result["success"]
assert "stopped" in result["error"].lower()
@pytest.mark.asyncio
async def test_flash_writes_data(self, manager, mock_context, tmp_path):
flash_file = tmp_path / "flash.bin"
flash_file.write_bytes(b"\xff" * 1024)
fw_file = tmp_path / "firmware.bin"
fw_file.write_bytes(b"\xde\xad\xbe\xef")
manager.instances["qemu-1"] = QemuInstance(
instance_id="qemu-1",
chip_type="esp32",
tcp_port=5555,
flash_image=flash_file,
flash_size_mb=1,
process=None, # stopped
)
result = await manager._flash_impl(
mock_context, "qemu-1", str(fw_file), "0x100"
)
assert result["success"]
assert result["bytes_written"] == 4
# Verify data was written at correct offset
data = flash_file.read_bytes()
assert data[0x100:0x104] == b"\xde\xad\xbe\xef"
assert data[0x0FF] == 0xFF # byte before is unchanged
class TestBlankFlash:
def test_creates_correct_size(self, tmp_path):
path = tmp_path / "flash.bin"
_create_blank_flash(path, 2)
assert path.stat().st_size == 2 * 1024 * 1024
def test_all_ff(self, tmp_path):
path = tmp_path / "flash.bin"
_create_blank_flash(path, 1)
data = path.read_bytes()
assert all(b == 0xFF for b in data)
def test_creates_parent_dirs(self, tmp_path):
path = tmp_path / "nested" / "dir" / "flash.bin"
_create_blank_flash(path, 1)
assert path.exists()
class TestGetRunningPorts:
def test_empty(self, manager):
assert manager.get_running_ports() == []
def test_returns_running_only(self, manager):
running_proc = MagicMock()
running_proc.returncode = None
stopped_proc = MagicMock()
stopped_proc.returncode = 0
manager.instances["qemu-1"] = QemuInstance(
instance_id="qemu-1",
chip_type="esp32",
tcp_port=5555,
flash_image=Path("/tmp/f.bin"),
flash_size_mb=4,
process=running_proc,
)
manager.instances["qemu-2"] = QemuInstance(
instance_id="qemu-2",
chip_type="esp32c3",
tcp_port=5556,
flash_image=Path("/tmp/f2.bin"),
flash_size_mb=4,
process=stopped_proc,
)
ports = manager.get_running_ports()
assert len(ports) == 1
assert ports[0]["port"] == "socket://localhost:5555"
assert ports[0]["source"] == "qemu"
assert ports[0]["instance_id"] == "qemu-1"
class TestHealthCheck:
@pytest.mark.asyncio
async def test_health_check(self, manager):
result = await manager.health_check()
assert result["status"] == "healthy"
assert "running_instances" in result
assert "max_instances" in result
class TestShutdown:
@pytest.mark.asyncio
async def test_shutdown_kills_all(self, manager):
for i in range(2):
proc = AsyncMock()
proc.returncode = None
proc.terminate = MagicMock()
proc.kill = MagicMock()
proc.wait = AsyncMock()
manager.instances[f"qemu-{i}"] = QemuInstance(
instance_id=f"qemu-{i}",
chip_type="esp32",
tcp_port=5555 + i,
flash_image=Path(f"/tmp/f{i}.bin"),
flash_size_mb=4,
process=proc,
)
await manager.shutdown()
assert len(manager.instances) == 0
class TestConfigQemu:
def test_qemu_available(self):
config = ESPToolServerConfig()
# Should return True if auto-detected
result = config.get_qemu_available()
assert isinstance(result, bool)
def test_to_dict_includes_qemu(self):
config = ESPToolServerConfig()
d = config.to_dict()
assert "qemu_available" in d
assert "qemu_xtensa_path" in d
assert "qemu_riscv_path" in d