mcesptool/tests/test_qemu_manager.py
Ryan Malloy d5dc9c99c0 Rename package to mcesptool
- Rename mcp-esptool-server -> mcesptool
- Update all imports and references
- Single entry point: mcesptool command
- New home: git.supported.systems/MCP/mcesptool
2026-02-06 21:08:53 -07:00

500 lines
16 KiB
Python

"""
Test QEMU Manager component
Tests lifecycle management, port allocation, flash image handling,
and integration with the scan system.
"""
import sys
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock

import pytest

from mcesptool.components.qemu_manager import (
    CHIP_MACHINES,
    QemuInstance,
    QemuManager,
    _create_blank_flash,
)
from mcesptool.config import ESPToolServerConfig


@pytest.fixture
def config():
    """Provide a default-constructed ``ESPToolServerConfig`` for a test.

    No arguments are passed, so the QEMU paths are whatever the config
    resolves by itself (real Espressif binaries, if installed).
    """
    server_config = ESPToolServerConfig()
    return server_config
@pytest.fixture
def mock_app():
    """Stand-in for the FastMCP app that records every tool registered on it.

    ``app.tool(name)`` returns a decorator which stores the decorated
    function in ``app._registered_tools[name]`` and returns it unchanged.
    """
    captured = {}

    def tool(name):
        def register(func):
            # Remember the handler under its tool name so tests can look it up.
            captured[name] = func
            return func

        return register

    fake_app = MagicMock()
    fake_app._registered_tools = captured
    fake_app.tool = tool
    return fake_app
@pytest.fixture
def manager(mock_app, config):
    """Build a ``QemuManager`` from the ``mock_app`` and ``config`` fixtures."""
    qemu_manager = QemuManager(mock_app, config)
    return qemu_manager
@pytest.fixture
def mock_context():
    """Fake request context whose ``log`` and ``progress`` attributes are awaitable mocks."""
    fake_ctx = MagicMock()
    # The two attributes are independent; each gets its own AsyncMock.
    fake_ctx.progress = AsyncMock()
    fake_ctx.log = AsyncMock()
    return fake_ctx
class TestChipMachines:
    """Checks on the keys and values of the ``CHIP_MACHINES`` table."""

    def test_supported_chips(self):
        """Each chip name this suite relies on is a key of the table."""
        for chip_name in ("esp32", "esp32s3", "esp32c3"):
            assert chip_name in CHIP_MACHINES

    def test_xtensa_arch(self):
        """esp32 and esp32s3 carry the ``xtensa`` arch tag."""
        for chip_name in ("esp32", "esp32s3"):
            assert CHIP_MACHINES[chip_name]["arch"] == "xtensa"

    def test_riscv_arch(self):
        """esp32c3 carries the ``riscv`` arch tag."""
        arch = CHIP_MACHINES["esp32c3"]["arch"]
        assert arch == "riscv"

    def test_machine_names(self):
        """Every entry, whatever its key, has ``machine`` and ``arch`` fields."""
        for entry in CHIP_MACHINES.values():
            assert "machine" in entry
            assert "arch" in entry

    def test_efuse_data(self):
        """Chips with download mode support have efuse data"""
        for chip_name in ("esp32", "esp32s3", "esp32c3"):
            entry = CHIP_MACHINES[chip_name]
            assert "default_efuse" in entry
            efuse = entry["default_efuse"]
            # Must be a non-empty bytes blob.
            assert isinstance(efuse, bytes)
            assert len(efuse) > 0

    def test_download_strap(self):
        """Chips with download mode have GPIO strap config"""
        for chip_name in ("esp32", "esp32s3", "esp32c3"):
            entry = CHIP_MACHINES[chip_name]
            for field in ("download_strap", "download_strap_value"):
                assert field in entry

    def test_wdt_driver(self):
        """Chips have watchdog timer driver names"""
        for chip_name in ("esp32", "esp32s3", "esp32c3"):
            assert "wdt_driver" in CHIP_MACHINES[chip_name]
class TestQemuInstance:
    """``QemuInstance`` properties: ``socket_uri`` and ``is_running``."""

    @staticmethod
    def _build(**overrides):
        """Create a ``QemuInstance`` with this class's stock field values.

        Keyword arguments are added on top of the stock fields, so
        ``process`` is only passed to the constructor when a test supplies it.
        """
        fields = {
            "instance_id": "qemu-1",
            "chip_type": "esp32",
            "tcp_port": 5555,
            "flash_image": Path("/tmp/flash.bin"),
            "flash_size_mb": 4,
        }
        fields.update(overrides)
        return QemuInstance(**fields)

    def test_socket_uri(self):
        """The URI is built from the instance's TCP port."""
        instance = self._build()
        assert instance.socket_uri == "socket://localhost:5555"

    def test_is_running_no_process(self):
        """Without a process the instance reports not running."""
        instance = self._build()
        assert not instance.is_running

    def test_is_running_with_active_process(self):
        """A process whose ``returncode`` is ``None`` counts as running."""
        live_proc = MagicMock(returncode=None)  # still running
        instance = self._build(process=live_proc)
        assert instance.is_running

    def test_is_running_with_exited_process(self):
        """A process with a return code of 0 counts as not running."""
        dead_proc = MagicMock(returncode=0)  # exited
        instance = self._build(process=dead_proc)
        assert not instance.is_running
class TestQemuManagerInit:
    """Tool registration at construction, plus the manager's private helpers
    for port allocation, id generation and binary lookup."""

    def test_registers_tools(self, mock_app, config):
        """Constructing the manager registers the five tool names checked here."""
        QemuManager(mock_app, config)
        registered = mock_app._registered_tools
        for tool_name in (
            "esp_qemu_start",
            "esp_qemu_stop",
            "esp_qemu_list",
            "esp_qemu_status",
            "esp_qemu_flash",
        ):
            assert tool_name in registered

    def test_port_allocation(self, manager):
        """With no instances, the first allocated port is the configured base port."""
        allocated = manager._allocate_port()
        assert allocated == manager.config.qemu_base_port

    def test_port_allocation_skips_used(self, manager):
        """A running instance on the base port pushes allocation to base + 1."""
        base_port = manager.config.qemu_base_port
        # Simulate an occupied port
        occupant = QemuInstance(
            instance_id="qemu-1",
            chip_type="esp32",
            tcp_port=base_port,
            flash_image=Path("/tmp/f.bin"),
            flash_size_mb=4,
            process=MagicMock(returncode=None),
        )
        manager.instances["qemu-1"] = occupant
        allocated = manager._allocate_port()
        assert allocated == base_port + 1

    def test_port_allocation_exhausted(self, manager):
        """Allocation returns ``None`` once ``qemu_max_instances`` ports are taken."""
        shared_proc = MagicMock(returncode=None)
        base_port = manager.config.qemu_base_port
        for slot in range(manager.config.qemu_max_instances):
            instance_id = f"qemu-{slot}"
            manager.instances[instance_id] = QemuInstance(
                instance_id=instance_id,
                chip_type="esp32",
                tcp_port=base_port + slot,
                flash_image=Path(f"/tmp/f{slot}.bin"),
                flash_size_mb=4,
                process=shared_proc,
            )
        assert manager._allocate_port() is None

    def test_id_generation(self, manager):
        """Successive ids on a fresh manager are ``qemu-1`` then ``qemu-2``."""
        first_id = manager._generate_id()
        assert first_id == "qemu-1"
        second_id = manager._generate_id()
        assert second_id == "qemu-2"

    def test_get_qemu_binary_xtensa(self, manager):
        """The xtensa arch maps to the configured xtensa binary path."""
        expected = manager.config.qemu_xtensa_path
        assert manager._get_qemu_binary("xtensa") == expected

    def test_get_qemu_binary_riscv(self, manager):
        """The riscv arch maps to the configured riscv binary path."""
        expected = manager.config.qemu_riscv_path
        assert manager._get_qemu_binary("riscv") == expected

    def test_get_qemu_binary_unknown(self, manager):
        """An arch other than xtensa/riscv yields ``None``."""
        binary = manager._get_qemu_binary("arm")
        assert binary is None
class TestStartImpl:
    """Failure paths of ``QemuManager._start_impl``.

    Each case expects a result dict whose ``"success"`` is falsy and whose
    ``"error"`` contains a recognisable fragment.
    """

    @pytest.mark.asyncio
    async def test_unsupported_chip(self, manager, mock_context):
        """A chip name the manager does not support is rejected."""
        result = await manager._start_impl(mock_context, "esp8266", None, 4, None)
        assert not result["success"]
        assert "Unsupported chip" in result["error"]

    @pytest.mark.asyncio
    async def test_missing_binary(self, manager, mock_context):
        """A QEMU binary path that does not exist is reported as not found."""
        manager.config.qemu_xtensa_path = "/nonexistent/qemu"
        result = await manager._start_impl(mock_context, "esp32", None, 4, None)
        assert not result["success"]
        assert "not found" in result["error"]

    @pytest.mark.asyncio
    async def test_max_instances_reached(self, manager, mock_context):
        """Starting fails once ``qemu_max_instances`` running instances exist."""
        proc = MagicMock()
        proc.returncode = None  # every fake instance counts as running
        # Derive ports from the configured base port (as the port-allocation
        # tests do) so the setup stays valid if the default base port changes.
        base_port = manager.config.qemu_base_port
        for i in range(manager.config.qemu_max_instances):
            manager.instances[f"qemu-{i}"] = QemuInstance(
                instance_id=f"qemu-{i}",
                chip_type="esp32",
                tcp_port=base_port + i,
                flash_image=Path(f"/tmp/f{i}.bin"),
                flash_size_mb=4,
                process=proc,
            )
        result = await manager._start_impl(mock_context, "esp32", None, 4, None)
        assert not result["success"]
        assert "Maximum" in result["error"]

    @pytest.mark.asyncio
    async def test_missing_flash_image(self, manager, mock_context):
        """A flash image path that does not exist is reported as not found."""
        # Needs a path that exists but is not real qemu. The running Python
        # interpreter is used because a hard-coded /bin/true is absent on some
        # platforms, which would let the missing-binary error satisfy the
        # "not found" assertion below instead of the flash-image check.
        manager.config.qemu_xtensa_path = sys.executable
        result = await manager._start_impl(
            mock_context, "esp32", "/nonexistent/flash.bin", 4, None
        )
        assert not result["success"]
        assert "not found" in result["error"]

    @pytest.mark.asyncio
    async def test_port_conflict(self, manager, mock_context):
        """Requesting a TCP port held by a running instance is rejected."""
        taken_port = 5555
        proc = MagicMock()
        proc.returncode = None  # the holder of the port is still running
        manager.instances["qemu-0"] = QemuInstance(
            instance_id="qemu-0",
            chip_type="esp32",
            tcp_port=taken_port,
            flash_image=Path("/tmp/f.bin"),
            flash_size_mb=4,
            process=proc,
        )
        # Existing, platform-independent stand-in for the QEMU binary.
        manager.config.qemu_xtensa_path = sys.executable
        result = await manager._start_impl(mock_context, "esp32", None, 4, taken_port)
        assert not result["success"]
        assert "already in use" in result["error"]

    @pytest.mark.asyncio
    async def test_invalid_boot_mode(self, manager, mock_context):
        """An unrecognised ``boot_mode`` value is rejected."""
        result = await manager._start_impl(
            mock_context, "esp32", None, 4, None, boot_mode="invalid"
        )
        assert not result["success"]
        assert "Invalid boot_mode" in result["error"]

    @pytest.mark.asyncio
    async def test_download_mode_default(self, manager, mock_context):
        """Verify download mode is the default boot_mode"""
        manager.config.qemu_xtensa_path = "/nonexistent/qemu"
        result = await manager._start_impl(mock_context, "esp32", None, 4, None)
        # Should fail on missing binary, not boot_mode — proving download is default
        assert not result["success"]
        assert "not found" in result["error"]
class TestStopImpl:
    """Behaviour of ``QemuManager._stop_impl`` for missing, single and all instances."""

    @staticmethod
    def _fake_process():
        """Return an async process mock that looks alive (``returncode`` is ``None``).

        ``terminate`` and ``kill`` are plain mocks; ``wait`` is awaitable.
        """
        proc = AsyncMock()
        proc.returncode = None
        proc.terminate = MagicMock()
        proc.kill = MagicMock()
        proc.wait = AsyncMock()
        return proc

    @staticmethod
    def _instance(instance_id, port, image, proc):
        """Build an esp32 ``QemuInstance`` with a 4 MB flash around ``proc``."""
        return QemuInstance(
            instance_id=instance_id,
            chip_type="esp32",
            tcp_port=port,
            flash_image=image,
            flash_size_mb=4,
            process=proc,
        )

    @pytest.mark.asyncio
    async def test_stop_nonexistent(self, manager, mock_context):
        """Stopping an unknown instance id fails with a "not found" error."""
        outcome = await manager._stop_impl(mock_context, "qemu-999")
        assert not outcome["success"]
        assert "not found" in outcome["error"]

    @pytest.mark.asyncio
    async def test_stop_running_instance(self, manager, mock_context):
        """Stopping a running instance lists it as stopped and terminates it once."""
        fake_proc = self._fake_process()
        manager.instances["qemu-1"] = self._instance(
            "qemu-1", 5555, Path("/tmp/f.bin"), fake_proc
        )
        outcome = await manager._stop_impl(mock_context, "qemu-1")
        assert outcome["success"]
        assert "qemu-1" in outcome["stopped"]
        fake_proc.terminate.assert_called_once()

    @pytest.mark.asyncio
    async def test_stop_all(self, manager, mock_context):
        """Passing ``None`` as the instance id is expected to stop both instances."""
        for index in range(2):
            instance_id = f"qemu-{index}"
            manager.instances[instance_id] = self._instance(
                instance_id,
                5555 + index,
                Path(f"/tmp/f{index}.bin"),
                self._fake_process(),
            )
        outcome = await manager._stop_impl(mock_context, None)
        assert outcome["success"]
        assert len(outcome["stopped"]) == 2
class TestListImpl:
    """Result shape of ``QemuManager._list_impl``."""

    @pytest.mark.asyncio
    async def test_list_empty(self, manager, mock_context):
        """With no instances the listing succeeds and reports a total of zero."""
        listing = await manager._list_impl(mock_context)
        assert listing["success"]
        assert listing["total"] == 0

    @pytest.mark.asyncio
    async def test_list_with_instances(self, manager, mock_context):
        """One running instance shows up in the counts and exposes its socket URI."""
        live_proc = MagicMock(returncode=None)
        instance = QemuInstance(
            instance_id="qemu-1",
            chip_type="esp32",
            tcp_port=5555,
            flash_image=Path("/tmp/f.bin"),
            flash_size_mb=4,
            process=live_proc,
            started_at=1000.0,
        )
        manager.instances["qemu-1"] = instance
        listing = await manager._list_impl(mock_context)
        assert listing["total"] == 1
        assert listing["running"] == 1
        first_entry = listing["instances"][0]
        assert first_entry["socket_uri"] == "socket://localhost:5555"
class TestFlashImpl:
    """Behaviour of ``QemuManager._flash_impl``: two rejections and one successful write."""

    @pytest.mark.asyncio
    async def test_flash_nonexistent_instance(self, manager, mock_context):
        """Flashing an unknown instance id reports failure."""
        outcome = await manager._flash_impl(mock_context, "qemu-999", "/tmp/fw.bin", "0x0")
        assert not outcome["success"]

    @pytest.mark.asyncio
    async def test_flash_running_instance(self, manager, mock_context):
        """Flashing a running instance fails with an error mentioning "stopped"."""
        live_proc = MagicMock(returncode=None)
        manager.instances["qemu-1"] = QemuInstance(
            instance_id="qemu-1",
            chip_type="esp32",
            tcp_port=5555,
            flash_image=Path("/tmp/f.bin"),
            flash_size_mb=4,
            process=live_proc,
        )
        outcome = await manager._flash_impl(mock_context, "qemu-1", "/tmp/fw.bin", "0x0")
        assert not outcome["success"]
        error_text = outcome["error"].lower()
        assert "stopped" in error_text

    @pytest.mark.asyncio
    async def test_flash_writes_data(self, manager, mock_context, tmp_path):
        """Firmware bytes land at the requested offset of a stopped instance's image."""
        payload = b"\xde\xad\xbe\xef"
        offset = 0x100

        # A 1 KiB image of 0xFF bytes and a 4-byte firmware file.
        image_path = tmp_path / "flash.bin"
        image_path.write_bytes(b"\xff" * 1024)
        firmware_path = tmp_path / "firmware.bin"
        firmware_path.write_bytes(payload)

        manager.instances["qemu-1"] = QemuInstance(
            instance_id="qemu-1",
            chip_type="esp32",
            tcp_port=5555,
            flash_image=image_path,
            flash_size_mb=1,
            process=None,  # stopped
        )
        outcome = await manager._flash_impl(
            mock_context, "qemu-1", str(firmware_path), "0x100"
        )
        assert outcome["success"]
        assert outcome["bytes_written"] == len(payload)

        # Verify data was written at correct offset
        contents = image_path.read_bytes()
        assert contents[offset:offset + len(payload)] == payload
        assert contents[offset - 1] == 0xFF  # byte before is unchanged
class TestBlankFlash:
    """Checks on ``_create_blank_flash(path, size_mb)``: size, fill byte, parent dirs."""

    def test_creates_correct_size(self, tmp_path):
        """A size of 2 produces a file of exactly 2 MiB."""
        path = tmp_path / "flash.bin"
        _create_blank_flash(path, 2)
        assert path.stat().st_size == 2 * 1024 * 1024

    def test_all_ff(self, tmp_path):
        """Every byte of the created image is 0xFF."""
        path = tmp_path / "flash.bin"
        _create_blank_flash(path, 1)
        data = path.read_bytes()
        # Guard against a vacuous pass: an empty file has no non-0xFF bytes either.
        assert data, "blank flash image is empty"
        # bytes.count runs in C; a Python-level loop over ~1M bytes is needlessly
        # slow, and comparing counts keeps the failure output short.
        assert data.count(b"\xff") == len(data)

    def test_creates_parent_dirs(self, tmp_path):
        """The file is created even when its parent directories do not exist."""
        path = tmp_path / "nested" / "dir" / "flash.bin"
        _create_blank_flash(path, 1)
        assert path.exists()
class TestGetRunningPorts:
    """``QemuManager.get_running_ports`` as consumed by the scan system."""

    def test_empty(self, manager):
        """A manager with no instances reports an empty list."""
        ports = manager.get_running_ports()
        assert ports == []

    def test_returns_running_only(self, manager):
        """Only the instance whose process is still alive is reported."""
        alive = MagicMock(returncode=None)
        exited = MagicMock(returncode=0)
        fixtures = (
            ("qemu-1", "esp32", 5555, Path("/tmp/f.bin"), alive),
            ("qemu-2", "esp32c3", 5556, Path("/tmp/f2.bin"), exited),
        )
        for instance_id, chip, port, image, proc in fixtures:
            manager.instances[instance_id] = QemuInstance(
                instance_id=instance_id,
                chip_type=chip,
                tcp_port=port,
                flash_image=image,
                flash_size_mb=4,
                process=proc,
            )

        ports = manager.get_running_ports()
        assert len(ports) == 1
        entry = ports[0]
        assert entry["port"] == "socket://localhost:5555"
        assert entry["source"] == "qemu"
        assert entry["instance_id"] == "qemu-1"
class TestHealthCheck:
    """Shape of the dict returned by ``QemuManager.health_check``."""

    @pytest.mark.asyncio
    async def test_health_check(self, manager):
        """A fresh manager reports ``healthy`` and includes both instance counters."""
        report = await manager.health_check()
        assert report["status"] == "healthy"
        for field in ("running_instances", "max_instances"):
            assert field in report
class TestShutdown:
    """``QemuManager.shutdown`` with instances still registered."""

    @staticmethod
    def _fake_process():
        """Return an async process mock that looks alive (``returncode`` is ``None``)."""
        proc = AsyncMock()
        proc.returncode = None
        proc.terminate = MagicMock()
        proc.kill = MagicMock()
        proc.wait = AsyncMock()
        return proc

    @pytest.mark.asyncio
    async def test_shutdown_kills_all(self, manager):
        """After shutdown the manager's instance table is empty."""
        for index in range(2):
            instance_id = f"qemu-{index}"
            manager.instances[instance_id] = QemuInstance(
                instance_id=instance_id,
                chip_type="esp32",
                tcp_port=5555 + index,
                flash_image=Path(f"/tmp/f{index}.bin"),
                flash_size_mb=4,
                process=self._fake_process(),
            )
        await manager.shutdown()
        assert len(manager.instances) == 0
class TestConfigQemu:
    """QEMU-related parts of ``ESPToolServerConfig``."""

    def test_qemu_available(self):
        """``get_qemu_available`` returns a bool, whichever value it is."""
        # Should return True if auto-detected
        available = ESPToolServerConfig().get_qemu_available()
        assert isinstance(available, bool)

    def test_to_dict_includes_qemu(self):
        """The dict form of the config carries the three QEMU keys."""
        as_dict = ESPToolServerConfig().to_dict()
        for key in ("qemu_available", "qemu_xtensa_path", "qemu_riscv_path"):
            assert key in as_dict