diff --git a/src/mcp_esptool_server/components/qemu_manager.py b/src/mcp_esptool_server/components/qemu_manager.py index 77d2581..418af1e 100644 --- a/src/mcp_esptool_server/components/qemu_manager.py +++ b/src/mcp_esptool_server/components/qemu_manager.py @@ -5,9 +5,15 @@ Manages Espressif QEMU fork instances for virtual ESP32 device emulation. Each instance exposes a virtual serial port over TCP that esptool can connect to via socket://localhost:PORT, making QEMU devices transparent to all existing flash/chip operations. + +Boot modes: + - "normal": Boots from flash (runs firmware). Use for testing app behavior. + - "download": GPIO strap forces ROM into serial bootloader mode. + esptool can connect and flash/read/identify the chip just like real hardware. """ import asyncio +import binascii import logging import time from dataclasses import dataclass, field @@ -20,12 +26,56 @@ from ..config import ESPToolServerConfig logger = logging.getLogger(__name__) -# Chip type to QEMU machine/binary mapping -CHIP_MACHINES: dict[str, dict[str, str]] = { - "esp32": {"machine": "esp32", "arch": "xtensa"}, - "esp32s2": {"machine": "esp32s2", "arch": "xtensa"}, - "esp32s3": {"machine": "esp32s3", "arch": "xtensa"}, - "esp32c3": {"machine": "esp32c3", "arch": "riscv"}, +# Chip type to QEMU machine/binary/efuse mapping. +# Derived from ESP-IDF's tools/idf_py_actions/qemu_ext.py. +CHIP_MACHINES: dict[str, dict[str, Any]] = { + "esp32": { + "machine": "esp32", + "arch": "xtensa", + "memory": "4M", + "efuse_device": "nvram.esp32.efuse", + "wdt_driver": "timer.esp32.timg", + "download_strap": "esp32.gpio", + "download_strap_value": "0x0f", + "default_efuse": binascii.unhexlify( + "00000000000000000000000000800000000000000000100000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000000000000000000000000000000000" + "00000000000000000000000000000000000000000000000000000000000000000000000000000000" + "00000000" + ), + }, + "esp32s2": { + "machine": "esp32s2", + "arch": "xtensa", + "memory": "4M", + }, + "esp32s3": { + "machine": "esp32s3", + "arch": "xtensa", + "memory": "4M", + "efuse_device": "nvram.esp32c3.efuse", # QEMU-201: shares esp32c3 efuse device + "wdt_driver": "timer.esp32s3.timg", + "download_strap": "esp32s3.gpio", + "download_strap_value": "0x07", + "default_efuse": binascii.unhexlify( + "00000000000000000000000000000000000000000000000000000000000000000000000000000c00" + + "00" * 920 + + "000000000000000000000000000000000000000000000000" + ), + }, + "esp32c3": { + "machine": "esp32c3", + "arch": "riscv", + "efuse_device": "nvram.esp32c3.efuse", + "wdt_driver": "timer.esp32c3.timg", + "download_strap": "esp32c3.gpio", + "download_strap_value": "0x02", + "default_efuse": binascii.unhexlify( + "00000000000000000000000000000000000000000000000000000000000000000000000000000c00" + + "00" * 920 + + "000000000000000000000000000000000000000000000000" + ), + }, } @@ -42,6 +92,8 @@ class QemuInstance: started_at: float = 0.0 pid: int | None = None extra_args: list[str] = field(default_factory=list) + boot_mode: str = "normal" + efuse_image: Path | None = None @property def socket_uri(self) -> str: @@ -94,6 +146,7 @@ class QemuManager: flash_image: str | None = None, flash_size_mb: int = 4, tcp_port: int | None = None, + boot_mode: str = "download", extra_args: list[str] | None = None, ) -> dict[str, Any]: """ @@ -104,10 +157,11 @@ class QemuManager: flash_image: Path to flash image file (creates blank if not specified) flash_size_mb: Flash size in MB for blank images (default: 4) tcp_port: TCP port for virtual serial (auto-assigned if not specified) + boot_mode: "download" for esptool interaction (default), "normal" to boot from flash extra_args: Additional QEMU command-line arguments """ return await self._start_impl( - context, chip_type, flash_image, flash_size_mb, tcp_port, extra_args + context, chip_type, flash_image, flash_size_mb, tcp_port, boot_mode, extra_args ) @self.app.tool("esp_qemu_stop") @@ -167,10 +221,17 @@ class QemuManager: flash_image: str | None, flash_size_mb: int, tcp_port: int | None, - extra_args: list[str] | None, + boot_mode: str = "download", + extra_args: list[str] | None = None, ) -> dict[str, Any]: """Start a QEMU instance""" + if boot_mode not in ("download", "normal"): + return { + "success": False, + "error": f"Invalid boot_mode: {boot_mode}. Use 'download' or 'normal'.", + } + # Validate chip type chip_key = chip_type.lower().replace("-", "").replace("_", "") if chip_key not in CHIP_MACHINES: @@ -211,28 +272,65 @@ class QemuManager: return {"success": False, "error": f"Port {tcp_port} already in use"} # Prepare flash image + resources_dir = Path(__file__).parent.parent / "resources" / "qemu" + resources_dir.mkdir(parents=True, exist_ok=True) + if flash_image: flash_path = Path(flash_image) if not flash_path.exists(): return {"success": False, "error": f"Flash image not found: {flash_image}"} else: - # Create a blank flash image in a temp-like location - resources_dir = Path(__file__).parent.parent / "resources" / "qemu" - resources_dir.mkdir(parents=True, exist_ok=True) flash_path = resources_dir / f"flash_{chip_key}_{tcp_port}.bin" if not flash_path.exists(): _create_blank_flash(flash_path, flash_size_mb) + # Prepare efuse image (required for download mode and proper chip identification) + efuse_path: Path | None = None + if "default_efuse" in machine_info: + efuse_path = resources_dir / f"efuse_{chip_key}_{tcp_port}.bin" + if not efuse_path.exists(): + efuse_path.write_bytes(machine_info["default_efuse"]) + instance_id = self._generate_id() # Build QEMU command cmd = [ qemu_binary, "-nographic", + "-monitor", "none", "-machine", machine_info["machine"], - "-drive", f"file={flash_path},if=mtd,format=raw", - "-serial", f"tcp::{tcp_port},server,nowait", ] + + # Add memory size if specified + if "memory" in machine_info: + cmd.extend(["-m", machine_info["memory"]]) + + # Flash image drive + cmd.extend(["-drive", f"file={flash_path},if=mtd,format=raw"]) + + # eFuse emulation (enables chip_id, MAC, and revision reporting) + if efuse_path and "efuse_device" in machine_info: + cmd.extend([ + "-drive", f"file={efuse_path},if=none,format=raw,id=efuse", + "-global", f"driver={machine_info['efuse_device']},property=drive,value=efuse", + ]) + + # Disable watchdog timer (QEMU timing doesn't match real hardware) + if "wdt_driver" in machine_info: + cmd.extend([ + "-global", f"driver={machine_info['wdt_driver']},property=wdt_disable,value=true", + ]) + + # Download mode: GPIO strap tells ROM to enter serial bootloader + if boot_mode == "download" and "download_strap" in machine_info: + cmd.extend([ + "-global", + f"driver={machine_info['download_strap']},property=strap_mode,value={machine_info['download_strap_value']}", + ]) + + # TCP serial port + cmd.extend(["-serial", f"tcp::{tcp_port},server,nowait"]) + if extra_args: cmd.extend(extra_args) @@ -264,11 +362,14 @@ class QemuManager: started_at=time.time(), pid=proc.pid, extra_args=extra_args or [], + boot_mode=boot_mode, + efuse_image=efuse_path, ) self.instances[instance_id] = instance logger.info( - f"Started QEMU {chip_key} instance {instance_id} on port {tcp_port} (PID {proc.pid})" + f"Started QEMU {chip_key} instance {instance_id} on port {tcp_port} " + f"(PID {proc.pid}, mode={boot_mode})" ) return { @@ -278,8 +379,13 @@ class QemuManager: "tcp_port": tcp_port, "socket_uri": instance.socket_uri, "flash_image": str(flash_path), + "boot_mode": boot_mode, "pid": proc.pid, - "hint": f"Use port='{instance.socket_uri}' with other esp_ tools to interact with this virtual device", + "hint": ( + f"Use port='{instance.socket_uri}' with other esp_ tools to interact with this virtual device" + if boot_mode == "download" + else f"Instance booting from flash. Connect to serial output at {instance.socket_uri}" + ), } except FileNotFoundError: @@ -375,6 +481,7 @@ class QemuManager: "socket_uri": instance.socket_uri, "flash_image": str(instance.flash_image), "flash_size_mb": instance.flash_size_mb, + "boot_mode": instance.boot_mode, "running": instance.is_running, "pid": instance.pid, "started_at": instance.started_at, diff --git a/tests/test_qemu_manager.py b/tests/test_qemu_manager.py index f5590bb..232bfb6 100644 --- a/tests/test_qemu_manager.py +++ b/tests/test_qemu_manager.py @@ -73,6 +73,27 @@ class TestChipMachines: assert "machine" in info assert "arch" in info + def test_efuse_data(self): + """Chips with download mode support have efuse data""" + for chip in ("esp32", "esp32s3", "esp32c3"): + info = CHIP_MACHINES[chip] + assert "default_efuse" in info + assert isinstance(info["default_efuse"], bytes) + assert len(info["default_efuse"]) > 0 + + def test_download_strap(self): + """Chips with download mode have GPIO strap config""" + for chip in ("esp32", "esp32s3", "esp32c3"): + info = CHIP_MACHINES[chip] + assert "download_strap" in info + assert "download_strap_value" in info + + def test_wdt_driver(self): + """Chips have watchdog timer driver names""" + for chip in ("esp32", "esp32s3", "esp32c3"): + info = CHIP_MACHINES[chip] + assert "wdt_driver" in info + class TestQemuInstance: def test_socket_uri(self): @@ -184,14 +205,14 @@ class TestQemuManagerInit: class TestStartImpl: @pytest.mark.asyncio async def test_unsupported_chip(self, manager, mock_context): - result = await manager._start_impl(mock_context, "esp8266", None, 4, None, None) + result = await manager._start_impl(mock_context, "esp8266", None, 4, None) assert not result["success"] assert "Unsupported chip" in result["error"] @pytest.mark.asyncio async def test_missing_binary(self, manager, mock_context): manager.config.qemu_xtensa_path = "/nonexistent/qemu" - result = await manager._start_impl(mock_context, "esp32", None, 4, None, None) + result = await manager._start_impl(mock_context, "esp32", None, 4, None) assert not result["success"] assert "not found" in result["error"] @@ -208,7 +229,7 @@ class TestStartImpl: flash_size_mb=4, process=proc, ) - result = await manager._start_impl(mock_context, "esp32", None, 4, None, None) + result = await manager._start_impl(mock_context, "esp32", None, 4, None) assert not result["success"] assert "Maximum" in result["error"] @@ -216,7 +237,7 @@ class TestStartImpl: async def test_missing_flash_image(self, manager, mock_context): manager.config.qemu_xtensa_path = "/bin/true" # exists but not real qemu result = await manager._start_impl( - mock_context, "esp32", "/nonexistent/flash.bin", 4, None, None + mock_context, "esp32", "/nonexistent/flash.bin", 4, None ) assert not result["success"] assert "not found" in result["error"] @@ -234,10 +255,27 @@ class TestStartImpl: process=proc, ) manager.config.qemu_xtensa_path = "/bin/true" - result = await manager._start_impl(mock_context, "esp32", None, 4, 5555, None) + result = await manager._start_impl(mock_context, "esp32", None, 4, 5555) assert not result["success"] assert "already in use" in result["error"] + @pytest.mark.asyncio + async def test_invalid_boot_mode(self, manager, mock_context): + result = await manager._start_impl( + mock_context, "esp32", None, 4, None, boot_mode="invalid" + ) + assert not result["success"] + assert "Invalid boot_mode" in result["error"] + + @pytest.mark.asyncio + async def test_download_mode_default(self, manager, mock_context): + """Verify download mode is the default boot_mode""" + manager.config.qemu_xtensa_path = "/nonexistent/qemu" + result = await manager._start_impl(mock_context, "esp32", None, 4, None) + # Should fail on missing binary, not boot_mode — proving download is default + assert not result["success"] + assert "not found" in result["error"] + class TestStopImpl: @pytest.mark.asyncio