From 57953fc60f3e8f4e738c8fa1928f20da064d2448 Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Fri, 6 Feb 2026 01:02:16 -0700 Subject: [PATCH] Add flash workflow and development iteration tools New tools: - esp_flash_multi: Flash multiple binaries at different addresses in one operation - esp_verify_flash: Verify flash contents match a file without re-flashing - esp_load_ram: Load and execute binary in RAM for rapid development - esp_serial_monitor: Capture serial output from device - esp_elf_to_ram_binary: Convert ELF to RAM-loadable binary Fixes: - Use hyphenated esptool commands (load-ram, verify-flash) to avoid deprecation warnings - Fix verify_flash parser for esptool v5.x output format - Fix flash_multi command ordering for esptool v5.x CLI structure Documentation updated with flash operations table and RAM loading workflow. --- README.md | 57 +++++ .../components/chip_control.py | 209 ++++++++++++++++++ .../components/firmware_builder.py | 87 ++++++++ .../components/flash_manager.py | 207 +++++++++++++++++ 4 files changed, 560 insertions(+) diff --git a/README.md b/README.md index 8d04ee2..f8a0791 100644 --- a/README.md +++ b/README.md @@ -67,6 +67,63 @@ The server implements a component-based architecture with middleware for CLI too - `Diagnostics`: Memory dumps and performance profiling - `QemuManager`: QEMU-based ESP32 emulation with download mode, efuse, and flash support +## Flash Operations + +Advanced flash management tools for efficient firmware deployment: + +| Tool | Description | +|------|-------------| +| `esp_flash_firmware` | Flash a single binary to device | +| `esp_flash_multi` | Flash multiple binaries at different addresses in one operation | +| `esp_verify_flash` | Verify flash contents match a file without re-flashing | +| `esp_flash_read` | Read flash memory to a file | +| `esp_flash_erase` | Erase flash regions | +| `esp_flash_backup` | Create complete flash backup | + +### Multi-File Flashing + +Flash bootloader, partition table, and app in a single operation: + +```python +esp_flash_multi( + files=[ + {"address": "0x0", "path": "bootloader.bin"}, + {"address": "0x8000", "path": "partitions.bin"}, + {"address": "0x10000", "path": "app.bin"} + ], + port="/dev/ttyUSB0", + verify=True +) +``` + +## RAM Loading (Development Iteration) + +Test firmware changes without wearing out flash: + +| Tool | Description | +|------|-------------| +| `esp_elf_to_ram_binary` | Convert ELF to RAM-loadable binary | +| `esp_load_ram` | Load and execute binary in RAM | +| `esp_serial_monitor` | Capture serial output from device | + +### Workflow + +```bash +# 1. Build your ESP-IDF project +idf.py build + +# 2. Convert ELF to RAM binary +esp_elf_to_ram_binary(elf_path="build/my_app.elf", chip="esp32s3") + +# 3. Load to RAM and execute (no flash wear!) +esp_load_ram(binary_path="my_app-ram.bin", port="/dev/ttyUSB0") + +# 4. Capture output +esp_serial_monitor(port="/dev/ttyUSB0", duration_seconds=10) +``` + +**Note:** RAM loading requires ELFs built without secure boot (`CONFIG_SECURE_BOOT=n`). Some PlatformIO defaults may be incompatible. + ## QEMU Emulation Run virtual ESP32 devices without physical hardware. Requires [Espressif's QEMU fork](https://github.com/espressif/qemu): diff --git a/src/mcp_esptool_server/components/chip_control.py b/src/mcp_esptool_server/components/chip_control.py index 63fa102..c67ce51 100644 --- a/src/mcp_esptool_server/components/chip_control.py +++ b/src/mcp_esptool_server/components/chip_control.py @@ -108,6 +108,53 @@ class ChipControl: """ return await self._load_test_firmware_impl(context, port, firmware_type) + @self.app.tool("esp_load_ram") + async def load_ram( + context: Context, + binary_path: str, + port: str | None = None, + ) -> dict[str, Any]: + """ + Load and execute binary in RAM without touching flash. + + Perfect for rapid development iteration — test changes without + wearing out flash or waiting for full flash cycle. The binary + must be compiled specifically for RAM execution (no flash relocation). + + Note: The binary runs until the device is reset. Execution cannot + be stopped remotely without a hardware reset. + + Args: + binary_path: Path to the RAM-executable binary + port: Serial port (auto-detect if not specified) + """ + return await self._load_ram_impl(context, binary_path, port) + + @self.app.tool("esp_serial_monitor") + async def serial_monitor( + context: Context, + port: str, + baud_rate: int = 115200, + duration_seconds: float = 5.0, + reset_on_connect: bool = True, + ) -> dict[str, Any]: + """ + Capture serial output from ESP device. + + Opens the serial port and captures output for the specified duration. + Useful for reading boot messages, debug output, or application logs + without switching to a separate terminal monitor. + + Args: + port: Serial port (required — no auto-detect for monitor) + baud_rate: Serial baud rate (default: 115200) + duration_seconds: How long to capture (max 30 seconds, default: 5) + reset_on_connect: Reset device before capturing to get boot messages (default: true) + """ + return await self._serial_monitor_impl( + context, port, baud_rate, duration_seconds, reset_on_connect + ) + # ------------------------------------------------------------------ # Subprocess runner # ------------------------------------------------------------------ @@ -443,6 +490,168 @@ class ChipControl: "timestamp": time.time(), } + async def _load_ram_impl( + self, context: Context, binary_path: str, port: str | None + ) -> dict[str, Any]: + """Load and execute binary in RAM via esptool load_ram.""" + from pathlib import Path + + bin_path = Path(binary_path) + if not bin_path.exists(): + return {"success": False, "error": f"Binary file not found: {binary_path}"} + + if not port: + port = await self._auto_detect_port() + if not port: + return {"success": False, "error": "No ESP devices found"} + + start_time = time.time() + file_size = bin_path.stat().st_size + + # esptool load-ram command loads binary to RAM and executes it + cmd = [ + self.config.esptool_path, + "--port", port, + "load-ram", str(bin_path), + ] + + proc = None + try: + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=30.0) + output = (stdout or b"").decode() + (stderr or b"").decode() + + elapsed = round(time.time() - start_time, 2) + + if proc.returncode != 0: + return { + "success": False, + "error": output.strip(), + "port": port, + "binary_path": binary_path, + } + + return { + "success": True, + "port": port, + "binary_path": binary_path, + "file_size": file_size, + "elapsed_seconds": elapsed, + "note": "Binary loaded to RAM and executing. Reset device to stop.", + } + + except asyncio.TimeoutError: + if proc and proc.returncode is None: + proc.kill() + await proc.wait() + return {"success": False, "error": "Timeout loading binary to RAM"} + except FileNotFoundError: + return {"success": False, "error": f"esptool not found at {self.config.esptool_path}"} + except Exception as e: + if proc and proc.returncode is None: + proc.kill() + await proc.wait() + return {"success": False, "error": str(e)} + + async def _serial_monitor_impl( + self, + context: Context, + port: str, + baud_rate: int, + duration_seconds: float, + reset_on_connect: bool, + ) -> dict[str, Any]: + """Capture serial output using pyserial (async wrapper).""" + import serial + + # Clamp duration to safe range + duration_seconds = min(max(duration_seconds, 0.1), 30.0) + + if not os.path.exists(port): + return {"success": False, "error": f"Port not found: {port}"} + + captured_lines: list[str] = [] + start_time = time.time() + + try: + # Open serial port with short timeout for non-blocking reads + ser = serial.Serial( + port=port, + baudrate=baud_rate, + timeout=0.1, + ) + + # Reset device if requested (toggle DTR/RTS) + if reset_on_connect: + ser.dtr = False + ser.rts = True + await asyncio.sleep(0.1) + ser.rts = False + ser.dtr = True + await asyncio.sleep(0.1) + ser.dtr = False + + # Read serial output for specified duration + deadline = time.time() + duration_seconds + buffer = b"" + + while time.time() < deadline: + # Non-blocking read in executor to avoid blocking event loop + chunk = await asyncio.get_event_loop().run_in_executor( + None, lambda: ser.read(1024) + ) + if chunk: + buffer += chunk + # Process complete lines + while b"\n" in buffer: + line, buffer = buffer.split(b"\n", 1) + try: + decoded = line.decode("utf-8", errors="replace").rstrip("\r") + captured_lines.append(decoded) + except Exception: + captured_lines.append(line.hex()) + else: + # Small sleep to avoid busy-waiting + await asyncio.sleep(0.05) + + # Capture any remaining partial line + if buffer: + try: + captured_lines.append(buffer.decode("utf-8", errors="replace").rstrip("\r")) + except Exception: + captured_lines.append(buffer.hex()) + + ser.close() + + elapsed = round(time.time() - start_time, 2) + + return { + "success": True, + "port": port, + "baud_rate": baud_rate, + "duration_seconds": elapsed, + "reset_performed": reset_on_connect, + "line_count": len(captured_lines), + "output": "\n".join(captured_lines), + } + + except serial.SerialException as e: + return { + "success": False, + "error": f"Serial error: {e}", + "port": port, + } + except Exception as e: + return { + "success": False, + "error": str(e), + "port": port, + } + # ------------------------------------------------------------------ # Helpers # ------------------------------------------------------------------ diff --git a/src/mcp_esptool_server/components/firmware_builder.py b/src/mcp_esptool_server/components/firmware_builder.py index b236340..9f69b13 100644 --- a/src/mcp_esptool_server/components/firmware_builder.py +++ b/src/mcp_esptool_server/components/firmware_builder.py @@ -70,6 +70,38 @@ class FirmwareBuilder: """Convert ELF file to flashable binary""" return await self._elf_to_binary_impl(context, elf_path, output_path) + @self.app.tool("esp_elf_to_ram_binary") + async def elf_to_ram_binary( + context: Context, + elf_path: str, + output_path: str | None = None, + chip: str = "auto", + ) -> dict[str, Any]: + """Convert ELF file to RAM-loadable binary for use with esp_load_ram. + + Creates a binary with RAM segments (IRAM/DRAM) placed first, suitable + for loading directly into device RAM without touching flash. Perfect + for rapid development iteration. + + Workflow: + 1. Build your project with ESP-IDF (disable secure boot/signed images) + 2. Use this tool to convert the ELF to a RAM binary + 3. Use esp_load_ram to load and execute on device + + Requirements: + - ELF must NOT have embedded SHA256 digest at reserved offset + - Disable CONFIG_SECURE_BOOT and CONFIG_SECURE_SIGNED_APPS in sdkconfig + - Some PlatformIO builds may have incompatible settings + + Note: Features requiring flash (OTA, NVS, SPIFFS) won't work from RAM. + + Args: + elf_path: Path to the ELF file from your build + output_path: Output binary path (default: -ram.bin) + chip: Target chip type (auto, esp32, esp32s3, etc.) + """ + return await self._elf_to_ram_binary_impl(context, elf_path, output_path, chip) + @self.app.tool("esp_firmware_analyze") async def analyze_firmware(context: Context, firmware_path: str) -> dict[str, Any]: """Analyze firmware binary structure""" @@ -124,6 +156,61 @@ class FirmwareBuilder: return response + async def _elf_to_ram_binary_impl( + self, + context: Context, + elf_path: str, + output_path: str | None, + chip: str, + ) -> dict[str, Any]: + """Convert ELF to RAM-loadable binary via esptool elf2image --ram-only-header.""" + + elf = Path(elf_path) + if not elf.exists(): + return {"success": False, "error": f"ELF file not found: {elf_path}"} + + # Determine output path + if output_path: + out = Path(output_path) + else: + out = elf.with_name(f"{elf.stem}-ram.bin") + + cmd = [ + self.config.esptool_path, + "--chip", chip, + "elf2image", + "--ram-only-header", + "--output", str(out), + elf_path, + ] + + result = await self._run_cmd(cmd, timeout=30.0) + + if not result["success"]: + return { + "success": False, + "error": result["error"], + "elf_path": elf_path, + } + + response: dict[str, Any] = { + "success": True, + "elf_path": elf_path, + "output_path": str(out), + "chip": chip, + "ram_optimized": True, + } + + if out.exists(): + response["output_size_bytes"] = out.stat().st_size + + # Add usage hint + response["usage_hint"] = ( + f"Load to device with: esp_load_ram(binary_path='{out}', port='')" + ) + + return response + async def _firmware_analyze_impl( self, context: Context, diff --git a/src/mcp_esptool_server/components/flash_manager.py b/src/mcp_esptool_server/components/flash_manager.py index 501cbfb..f3ae2d3 100644 --- a/src/mcp_esptool_server/components/flash_manager.py +++ b/src/mcp_esptool_server/components/flash_manager.py @@ -164,6 +164,54 @@ class FlashManager: """ return await self._flash_backup_impl(context, backup_path, port, include_bootloader) + @self.app.tool("esp_flash_multi") + async def flash_multi( + context: Context, + files: list[dict], + port: str | None = None, + verify: bool = True, + compress: bool = True, + ) -> dict[str, Any]: + """Flash multiple binaries at different addresses in one operation. + + Writes multiple binary files to flash in a single esptool invocation. + Faster than multiple separate flash operations since it connects once. + + Common use cases: + - Flash bootloader + partition table + app in one shot + - Deploy complete firmware stack with filesystem image + + Args: + files: List of dicts with "address" (hex string) and "path" (file path). + Example: [{"address": "0x0", "path": "bootloader.bin"}, + {"address": "0x8000", "path": "partitions.bin"}, + {"address": "0x10000", "path": "app.bin"}] + port: Serial port or socket:// URI (required) + verify: Verify flash contents after writing (default: true) + compress: Use compression for faster transfer (default: true) + """ + return await self._flash_multi_impl(context, files, port, verify, compress) + + @self.app.tool("esp_verify_flash") + async def verify_flash( + context: Context, + firmware_path: str, + port: str | None = None, + address: str = "0x0", + ) -> dict[str, Any]: + """Verify flash contents match a file without re-flashing. + + Reads flash memory at the specified address and compares against + the provided file. Useful for confirming successful flash operations + or checking if an update is needed. + + Args: + firmware_path: Path to the binary file to compare against + port: Serial port or socket:// URI (required) + address: Flash address to verify from (hex string, default: "0x0") + """ + return await self._verify_flash_impl(context, firmware_path, port, address) + async def _flash_firmware_impl( self, context: Context, @@ -336,6 +384,165 @@ class FlashManager: size=None, # auto-detect full flash ) + async def _flash_multi_impl( + self, + context: Context, + files: list[dict], + port: str | None, + verify: bool, + compress: bool, + ) -> dict[str, Any]: + """Flash multiple binaries at different addresses via esptool write-flash.""" + + if not port: + return {"success": False, "error": "Port is required (no auto-detect for flash operations)"} + + if not files: + return {"success": False, "error": "No files specified"} + + # Validate all files exist and build address/path pairs + flash_args: list[str] = [] + validated_files: list[dict] = [] + total_size = 0 + + for entry in files: + if "address" not in entry or "path" not in entry: + return { + "success": False, + "error": f"Each file entry must have 'address' and 'path' keys. Got: {entry}", + } + + fw_path = Path(entry["path"]) + if not fw_path.exists(): + return {"success": False, "error": f"File not found: {entry['path']}"} + + file_size = fw_path.stat().st_size + total_size += file_size + validated_files.append({ + "address": entry["address"], + "path": str(fw_path), + "size": file_size, + }) + flash_args.extend([entry["address"], str(fw_path)]) + + start_time = time.time() + + # Build esptool command: write-flash [options] addr1 file1 addr2 file2 ... + # Options come after the subcommand in esptool v5.x + args = ["write-flash"] + if compress: + args.append("--compress") + if not verify: + args.append("--no-verify") + args.extend(flash_args) + + result = await self._run_esptool(port, args, timeout=300.0) + + if not result["success"]: + return { + "success": False, + "error": result["error"], + "port": port, + "files": validated_files, + } + + output = result["output"] + elapsed = round(time.time() - start_time, 1) + + # Parse bytes written from output + bytes_written = 0 + write_matches = re.findall(r"Wrote (\d+) bytes", output) + for match in write_matches: + bytes_written += int(match) + + verified = "Hash of data verified" in output or "Verified" in output + + return { + "success": True, + "port": port, + "files": validated_files, + "file_count": len(validated_files), + "total_size": total_size, + "bytes_written": bytes_written, + "compressed": compress, + "verified": verified if verify else None, + "elapsed_seconds": elapsed, + } + + async def _verify_flash_impl( + self, + context: Context, + firmware_path: str, + port: str | None, + address: str, + ) -> dict[str, Any]: + """Verify flash contents match a file via esptool verify-flash.""" + + fw_path = Path(firmware_path) + if not fw_path.exists(): + return {"success": False, "error": f"File not found: {firmware_path}"} + + if not port: + return {"success": False, "error": "Port is required (no auto-detect for flash operations)"} + + start_time = time.time() + file_size = fw_path.stat().st_size + + # Use hyphenated command form (verify-flash not verify_flash) + result = await self._run_esptool( + port, + ["verify-flash", address, str(fw_path)], + timeout=120.0, + ) + + elapsed = round(time.time() - start_time, 1) + + # Check for verification success or failure in output + output = result.get("output", "") or result.get("error", "") + output_lower = output.lower() + verified = ( + "verification successful" in output_lower + or "verify ok" in output_lower + or "digest matched" in output_lower + ) + mismatch = ( + "verify failed" in output_lower + or "mismatch" in output_lower + or "does not match" in output_lower + ) + + if result["success"] and verified: + return { + "success": True, + "verified": True, + "port": port, + "firmware_path": firmware_path, + "address": address, + "file_size": file_size, + "elapsed_seconds": elapsed, + } + elif mismatch: + # Extract mismatch details if available + return { + "success": True, + "verified": False, + "mismatch": True, + "port": port, + "firmware_path": firmware_path, + "address": address, + "file_size": file_size, + "elapsed_seconds": elapsed, + "details": output.strip() if output else None, + } + else: + return { + "success": False, + "error": result.get("error", "Verification failed"), + "port": port, + "firmware_path": firmware_path, + "address": address, + } + async def health_check(self) -> dict[str, Any]: """Component health check""" return {"status": "healthy", "note": "Flash manager ready"}