Add download mode, efuse emulation, and watchdog disable to QEMU manager

Smoke tested against real Espressif QEMU fork — esptool successfully
identifies, reads flash_id, and writes firmware to emulated ESP32 via
socket://localhost:5555. Enriched CHIP_MACHINES with per-chip efuse
data, GPIO strap values, and watchdog timer driver names derived from
ESP-IDF's qemu_ext.py.
This commit is contained in:
Ryan Malloy 2026-01-28 16:44:17 -07:00
parent 0d684088a4
commit 740164f582
2 changed files with 165 additions and 20 deletions

View File

@ -5,9 +5,15 @@ Manages Espressif QEMU fork instances for virtual ESP32 device emulation.
Each instance exposes a virtual serial port over TCP that esptool can connect Each instance exposes a virtual serial port over TCP that esptool can connect
to via socket://localhost:PORT, making QEMU devices transparent to all to via socket://localhost:PORT, making QEMU devices transparent to all
existing flash/chip operations. existing flash/chip operations.
Boot modes:
- "normal": Boots from flash (runs firmware). Use for testing app behavior.
- "download": GPIO strap forces ROM into serial bootloader mode.
esptool can connect and flash/read/identify the chip just like real hardware.
""" """
import asyncio import asyncio
import binascii
import logging import logging
import time import time
from dataclasses import dataclass, field from dataclasses import dataclass, field
@ -20,12 +26,56 @@ from ..config import ESPToolServerConfig
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Chip type to QEMU machine/binary mapping # Chip type to QEMU machine/binary/efuse mapping.
CHIP_MACHINES: dict[str, dict[str, str]] = { # Derived from ESP-IDF's tools/idf_py_actions/qemu_ext.py.
"esp32": {"machine": "esp32", "arch": "xtensa"}, CHIP_MACHINES: dict[str, dict[str, Any]] = {
"esp32s2": {"machine": "esp32s2", "arch": "xtensa"}, "esp32": {
"esp32s3": {"machine": "esp32s3", "arch": "xtensa"}, "machine": "esp32",
"esp32c3": {"machine": "esp32c3", "arch": "riscv"}, "arch": "xtensa",
"memory": "4M",
"efuse_device": "nvram.esp32.efuse",
"wdt_driver": "timer.esp32.timg",
"download_strap": "esp32.gpio",
"download_strap_value": "0x0f",
"default_efuse": binascii.unhexlify(
"00000000000000000000000000800000000000000000100000000000000000000000000000000000"
"00000000000000000000000000000000000000000000000000000000000000000000000000000000"
"00000000000000000000000000000000000000000000000000000000000000000000000000000000"
"00000000"
),
},
"esp32s2": {
"machine": "esp32s2",
"arch": "xtensa",
"memory": "4M",
},
"esp32s3": {
"machine": "esp32s3",
"arch": "xtensa",
"memory": "4M",
"efuse_device": "nvram.esp32c3.efuse", # QEMU-201: shares esp32c3 efuse device
"wdt_driver": "timer.esp32s3.timg",
"download_strap": "esp32s3.gpio",
"download_strap_value": "0x07",
"default_efuse": binascii.unhexlify(
"00000000000000000000000000000000000000000000000000000000000000000000000000000c00"
+ "00" * 920
+ "000000000000000000000000000000000000000000000000"
),
},
"esp32c3": {
"machine": "esp32c3",
"arch": "riscv",
"efuse_device": "nvram.esp32c3.efuse",
"wdt_driver": "timer.esp32c3.timg",
"download_strap": "esp32c3.gpio",
"download_strap_value": "0x02",
"default_efuse": binascii.unhexlify(
"00000000000000000000000000000000000000000000000000000000000000000000000000000c00"
+ "00" * 920
+ "000000000000000000000000000000000000000000000000"
),
},
} }
@ -42,6 +92,8 @@ class QemuInstance:
started_at: float = 0.0 started_at: float = 0.0
pid: int | None = None pid: int | None = None
extra_args: list[str] = field(default_factory=list) extra_args: list[str] = field(default_factory=list)
boot_mode: str = "normal"
efuse_image: Path | None = None
@property @property
def socket_uri(self) -> str: def socket_uri(self) -> str:
@ -94,6 +146,7 @@ class QemuManager:
flash_image: str | None = None, flash_image: str | None = None,
flash_size_mb: int = 4, flash_size_mb: int = 4,
tcp_port: int | None = None, tcp_port: int | None = None,
boot_mode: str = "download",
extra_args: list[str] | None = None, extra_args: list[str] | None = None,
) -> dict[str, Any]: ) -> dict[str, Any]:
""" """
@ -104,10 +157,11 @@ class QemuManager:
flash_image: Path to flash image file (creates blank if not specified) flash_image: Path to flash image file (creates blank if not specified)
flash_size_mb: Flash size in MB for blank images (default: 4) flash_size_mb: Flash size in MB for blank images (default: 4)
tcp_port: TCP port for virtual serial (auto-assigned if not specified) tcp_port: TCP port for virtual serial (auto-assigned if not specified)
boot_mode: "download" for esptool interaction (default), "normal" to boot from flash
extra_args: Additional QEMU command-line arguments extra_args: Additional QEMU command-line arguments
""" """
return await self._start_impl( return await self._start_impl(
context, chip_type, flash_image, flash_size_mb, tcp_port, extra_args context, chip_type, flash_image, flash_size_mb, tcp_port, boot_mode, extra_args
) )
@self.app.tool("esp_qemu_stop") @self.app.tool("esp_qemu_stop")
@ -167,10 +221,17 @@ class QemuManager:
flash_image: str | None, flash_image: str | None,
flash_size_mb: int, flash_size_mb: int,
tcp_port: int | None, tcp_port: int | None,
extra_args: list[str] | None, boot_mode: str = "download",
extra_args: list[str] | None = None,
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Start a QEMU instance""" """Start a QEMU instance"""
if boot_mode not in ("download", "normal"):
return {
"success": False,
"error": f"Invalid boot_mode: {boot_mode}. Use 'download' or 'normal'.",
}
# Validate chip type # Validate chip type
chip_key = chip_type.lower().replace("-", "").replace("_", "") chip_key = chip_type.lower().replace("-", "").replace("_", "")
if chip_key not in CHIP_MACHINES: if chip_key not in CHIP_MACHINES:
@ -211,28 +272,65 @@ class QemuManager:
return {"success": False, "error": f"Port {tcp_port} already in use"} return {"success": False, "error": f"Port {tcp_port} already in use"}
# Prepare flash image # Prepare flash image
resources_dir = Path(__file__).parent.parent / "resources" / "qemu"
resources_dir.mkdir(parents=True, exist_ok=True)
if flash_image: if flash_image:
flash_path = Path(flash_image) flash_path = Path(flash_image)
if not flash_path.exists(): if not flash_path.exists():
return {"success": False, "error": f"Flash image not found: {flash_image}"} return {"success": False, "error": f"Flash image not found: {flash_image}"}
else: else:
# Create a blank flash image in a temp-like location
resources_dir = Path(__file__).parent.parent / "resources" / "qemu"
resources_dir.mkdir(parents=True, exist_ok=True)
flash_path = resources_dir / f"flash_{chip_key}_{tcp_port}.bin" flash_path = resources_dir / f"flash_{chip_key}_{tcp_port}.bin"
if not flash_path.exists(): if not flash_path.exists():
_create_blank_flash(flash_path, flash_size_mb) _create_blank_flash(flash_path, flash_size_mb)
# Prepare efuse image (required for download mode and proper chip identification)
efuse_path: Path | None = None
if "default_efuse" in machine_info:
efuse_path = resources_dir / f"efuse_{chip_key}_{tcp_port}.bin"
if not efuse_path.exists():
efuse_path.write_bytes(machine_info["default_efuse"])
instance_id = self._generate_id() instance_id = self._generate_id()
# Build QEMU command # Build QEMU command
cmd = [ cmd = [
qemu_binary, qemu_binary,
"-nographic", "-nographic",
"-monitor", "none",
"-machine", machine_info["machine"], "-machine", machine_info["machine"],
"-drive", f"file={flash_path},if=mtd,format=raw",
"-serial", f"tcp::{tcp_port},server,nowait",
] ]
# Add memory size if specified
if "memory" in machine_info:
cmd.extend(["-m", machine_info["memory"]])
# Flash image drive
cmd.extend(["-drive", f"file={flash_path},if=mtd,format=raw"])
# eFuse emulation (enables chip_id, MAC, and revision reporting)
if efuse_path and "efuse_device" in machine_info:
cmd.extend([
"-drive", f"file={efuse_path},if=none,format=raw,id=efuse",
"-global", f"driver={machine_info['efuse_device']},property=drive,value=efuse",
])
# Disable watchdog timer (QEMU timing doesn't match real hardware)
if "wdt_driver" in machine_info:
cmd.extend([
"-global", f"driver={machine_info['wdt_driver']},property=wdt_disable,value=true",
])
# Download mode: GPIO strap tells ROM to enter serial bootloader
if boot_mode == "download" and "download_strap" in machine_info:
cmd.extend([
"-global",
f"driver={machine_info['download_strap']},property=strap_mode,value={machine_info['download_strap_value']}",
])
# TCP serial port
cmd.extend(["-serial", f"tcp::{tcp_port},server,nowait"])
if extra_args: if extra_args:
cmd.extend(extra_args) cmd.extend(extra_args)
@ -264,11 +362,14 @@ class QemuManager:
started_at=time.time(), started_at=time.time(),
pid=proc.pid, pid=proc.pid,
extra_args=extra_args or [], extra_args=extra_args or [],
boot_mode=boot_mode,
efuse_image=efuse_path,
) )
self.instances[instance_id] = instance self.instances[instance_id] = instance
logger.info( logger.info(
f"Started QEMU {chip_key} instance {instance_id} on port {tcp_port} (PID {proc.pid})" f"Started QEMU {chip_key} instance {instance_id} on port {tcp_port} "
f"(PID {proc.pid}, mode={boot_mode})"
) )
return { return {
@ -278,8 +379,13 @@ class QemuManager:
"tcp_port": tcp_port, "tcp_port": tcp_port,
"socket_uri": instance.socket_uri, "socket_uri": instance.socket_uri,
"flash_image": str(flash_path), "flash_image": str(flash_path),
"boot_mode": boot_mode,
"pid": proc.pid, "pid": proc.pid,
"hint": f"Use port='{instance.socket_uri}' with other esp_ tools to interact with this virtual device", "hint": (
f"Use port='{instance.socket_uri}' with other esp_ tools to interact with this virtual device"
if boot_mode == "download"
else f"Instance booting from flash. Connect to serial output at {instance.socket_uri}"
),
} }
except FileNotFoundError: except FileNotFoundError:
@ -375,6 +481,7 @@ class QemuManager:
"socket_uri": instance.socket_uri, "socket_uri": instance.socket_uri,
"flash_image": str(instance.flash_image), "flash_image": str(instance.flash_image),
"flash_size_mb": instance.flash_size_mb, "flash_size_mb": instance.flash_size_mb,
"boot_mode": instance.boot_mode,
"running": instance.is_running, "running": instance.is_running,
"pid": instance.pid, "pid": instance.pid,
"started_at": instance.started_at, "started_at": instance.started_at,

View File

@ -73,6 +73,27 @@ class TestChipMachines:
assert "machine" in info assert "machine" in info
assert "arch" in info assert "arch" in info
def test_efuse_data(self):
"""Chips with download mode support have efuse data"""
for chip in ("esp32", "esp32s3", "esp32c3"):
info = CHIP_MACHINES[chip]
assert "default_efuse" in info
assert isinstance(info["default_efuse"], bytes)
assert len(info["default_efuse"]) > 0
def test_download_strap(self):
"""Chips with download mode have GPIO strap config"""
for chip in ("esp32", "esp32s3", "esp32c3"):
info = CHIP_MACHINES[chip]
assert "download_strap" in info
assert "download_strap_value" in info
def test_wdt_driver(self):
"""Chips have watchdog timer driver names"""
for chip in ("esp32", "esp32s3", "esp32c3"):
info = CHIP_MACHINES[chip]
assert "wdt_driver" in info
class TestQemuInstance: class TestQemuInstance:
def test_socket_uri(self): def test_socket_uri(self):
@ -184,14 +205,14 @@ class TestQemuManagerInit:
class TestStartImpl: class TestStartImpl:
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_unsupported_chip(self, manager, mock_context): async def test_unsupported_chip(self, manager, mock_context):
result = await manager._start_impl(mock_context, "esp8266", None, 4, None, None) result = await manager._start_impl(mock_context, "esp8266", None, 4, None)
assert not result["success"] assert not result["success"]
assert "Unsupported chip" in result["error"] assert "Unsupported chip" in result["error"]
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_missing_binary(self, manager, mock_context): async def test_missing_binary(self, manager, mock_context):
manager.config.qemu_xtensa_path = "/nonexistent/qemu" manager.config.qemu_xtensa_path = "/nonexistent/qemu"
result = await manager._start_impl(mock_context, "esp32", None, 4, None, None) result = await manager._start_impl(mock_context, "esp32", None, 4, None)
assert not result["success"] assert not result["success"]
assert "not found" in result["error"] assert "not found" in result["error"]
@ -208,7 +229,7 @@ class TestStartImpl:
flash_size_mb=4, flash_size_mb=4,
process=proc, process=proc,
) )
result = await manager._start_impl(mock_context, "esp32", None, 4, None, None) result = await manager._start_impl(mock_context, "esp32", None, 4, None)
assert not result["success"] assert not result["success"]
assert "Maximum" in result["error"] assert "Maximum" in result["error"]
@ -216,7 +237,7 @@ class TestStartImpl:
async def test_missing_flash_image(self, manager, mock_context): async def test_missing_flash_image(self, manager, mock_context):
manager.config.qemu_xtensa_path = "/bin/true" # exists but not real qemu manager.config.qemu_xtensa_path = "/bin/true" # exists but not real qemu
result = await manager._start_impl( result = await manager._start_impl(
mock_context, "esp32", "/nonexistent/flash.bin", 4, None, None mock_context, "esp32", "/nonexistent/flash.bin", 4, None
) )
assert not result["success"] assert not result["success"]
assert "not found" in result["error"] assert "not found" in result["error"]
@ -234,10 +255,27 @@ class TestStartImpl:
process=proc, process=proc,
) )
manager.config.qemu_xtensa_path = "/bin/true" manager.config.qemu_xtensa_path = "/bin/true"
result = await manager._start_impl(mock_context, "esp32", None, 4, 5555, None) result = await manager._start_impl(mock_context, "esp32", None, 4, 5555)
assert not result["success"] assert not result["success"]
assert "already in use" in result["error"] assert "already in use" in result["error"]
@pytest.mark.asyncio
async def test_invalid_boot_mode(self, manager, mock_context):
result = await manager._start_impl(
mock_context, "esp32", None, 4, None, boot_mode="invalid"
)
assert not result["success"]
assert "Invalid boot_mode" in result["error"]
@pytest.mark.asyncio
async def test_download_mode_default(self, manager, mock_context):
"""Verify download mode is the default boot_mode"""
manager.config.qemu_xtensa_path = "/nonexistent/qemu"
result = await manager._start_impl(mock_context, "esp32", None, 4, None)
# Should fail on missing binary, not boot_mode — proving download is default
assert not result["success"]
assert "not found" in result["error"]
class TestStopImpl: class TestStopImpl:
@pytest.mark.asyncio @pytest.mark.asyncio