Add flash workflow and development iteration tools

New tools:
- esp_flash_multi: Flash multiple binaries at different addresses in one operation
- esp_verify_flash: Verify flash contents match a file without re-flashing
- esp_load_ram: Load and execute binary in RAM for rapid development
- esp_serial_monitor: Capture serial output from device
- esp_elf_to_ram_binary: Convert ELF to RAM-loadable binary

Fixes:
- Use hyphenated esptool commands (load-ram, verify-flash) to avoid deprecation warnings
- Fix verify_flash parser for esptool v5.x output format
- Fix flash_multi command ordering for esptool v5.x CLI structure

Documentation updated with flash operations table and RAM loading workflow.
This commit is contained in:
Ryan Malloy 2026-02-06 01:02:16 -07:00
parent 972f09b2d2
commit 57953fc60f
4 changed files with 560 additions and 0 deletions

View File

@ -67,6 +67,63 @@ The server implements a component-based architecture with middleware for CLI too
- `Diagnostics`: Memory dumps and performance profiling
- `QemuManager`: QEMU-based ESP32 emulation with download mode, efuse, and flash support
## Flash Operations
Advanced flash management tools for efficient firmware deployment:
| Tool | Description |
|------|-------------|
| `esp_flash_firmware` | Flash a single binary to device |
| `esp_flash_multi` | Flash multiple binaries at different addresses in one operation |
| `esp_verify_flash` | Verify flash contents match a file without re-flashing |
| `esp_flash_read` | Read flash memory to a file |
| `esp_flash_erase` | Erase flash regions |
| `esp_flash_backup` | Create complete flash backup |
### Multi-File Flashing
Flash bootloader, partition table, and app in a single operation:
```python
esp_flash_multi(
files=[
{"address": "0x0", "path": "bootloader.bin"},
{"address": "0x8000", "path": "partitions.bin"},
{"address": "0x10000", "path": "app.bin"}
],
port="/dev/ttyUSB0",
verify=True
)
```
## RAM Loading (Development Iteration)
Test firmware changes without wearing out flash:
| Tool | Description |
|------|-------------|
| `esp_elf_to_ram_binary` | Convert ELF to RAM-loadable binary |
| `esp_load_ram` | Load and execute binary in RAM |
| `esp_serial_monitor` | Capture serial output from device |
### Workflow
```bash
# 1. Build your ESP-IDF project
idf.py build
# 2. Convert ELF to RAM binary
esp_elf_to_ram_binary(elf_path="build/my_app.elf", chip="esp32s3")
# 3. Load to RAM and execute (no flash wear!)
esp_load_ram(binary_path="my_app-ram.bin", port="/dev/ttyUSB0")
# 4. Capture output
esp_serial_monitor(port="/dev/ttyUSB0", duration_seconds=10)
```
**Note:** RAM loading requires ELFs built without secure boot (`CONFIG_SECURE_BOOT=n`). Some PlatformIO defaults may be incompatible.
## QEMU Emulation
Run virtual ESP32 devices without physical hardware. Requires [Espressif's QEMU fork](https://github.com/espressif/qemu):

View File

@ -108,6 +108,53 @@ class ChipControl:
"""
return await self._load_test_firmware_impl(context, port, firmware_type)
@self.app.tool("esp_load_ram")
async def load_ram(
context: Context,
binary_path: str,
port: str | None = None,
) -> dict[str, Any]:
"""
Load and execute binary in RAM without touching flash.
Perfect for rapid development iteration test changes without
wearing out flash or waiting for full flash cycle. The binary
must be compiled specifically for RAM execution (no flash relocation).
Note: The binary runs until the device is reset. Execution cannot
be stopped remotely without a hardware reset.
Args:
binary_path: Path to the RAM-executable binary
port: Serial port (auto-detect if not specified)
"""
return await self._load_ram_impl(context, binary_path, port)
@self.app.tool("esp_serial_monitor")
async def serial_monitor(
context: Context,
port: str,
baud_rate: int = 115200,
duration_seconds: float = 5.0,
reset_on_connect: bool = True,
) -> dict[str, Any]:
"""
Capture serial output from ESP device.
Opens the serial port and captures output for the specified duration.
Useful for reading boot messages, debug output, or application logs
without switching to a separate terminal monitor.
Args:
port: Serial port (required no auto-detect for monitor)
baud_rate: Serial baud rate (default: 115200)
duration_seconds: How long to capture (max 30 seconds, default: 5)
reset_on_connect: Reset device before capturing to get boot messages (default: true)
"""
return await self._serial_monitor_impl(
context, port, baud_rate, duration_seconds, reset_on_connect
)
# ------------------------------------------------------------------
# Subprocess runner
# ------------------------------------------------------------------
@ -443,6 +490,168 @@ class ChipControl:
"timestamp": time.time(),
}
async def _load_ram_impl(
self, context: Context, binary_path: str, port: str | None
) -> dict[str, Any]:
"""Load and execute binary in RAM via esptool load_ram."""
from pathlib import Path
bin_path = Path(binary_path)
if not bin_path.exists():
return {"success": False, "error": f"Binary file not found: {binary_path}"}
if not port:
port = await self._auto_detect_port()
if not port:
return {"success": False, "error": "No ESP devices found"}
start_time = time.time()
file_size = bin_path.stat().st_size
# esptool load-ram command loads binary to RAM and executes it
cmd = [
self.config.esptool_path,
"--port", port,
"load-ram", str(bin_path),
]
proc = None
try:
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=30.0)
output = (stdout or b"").decode() + (stderr or b"").decode()
elapsed = round(time.time() - start_time, 2)
if proc.returncode != 0:
return {
"success": False,
"error": output.strip(),
"port": port,
"binary_path": binary_path,
}
return {
"success": True,
"port": port,
"binary_path": binary_path,
"file_size": file_size,
"elapsed_seconds": elapsed,
"note": "Binary loaded to RAM and executing. Reset device to stop.",
}
except asyncio.TimeoutError:
if proc and proc.returncode is None:
proc.kill()
await proc.wait()
return {"success": False, "error": "Timeout loading binary to RAM"}
except FileNotFoundError:
return {"success": False, "error": f"esptool not found at {self.config.esptool_path}"}
except Exception as e:
if proc and proc.returncode is None:
proc.kill()
await proc.wait()
return {"success": False, "error": str(e)}
async def _serial_monitor_impl(
self,
context: Context,
port: str,
baud_rate: int,
duration_seconds: float,
reset_on_connect: bool,
) -> dict[str, Any]:
"""Capture serial output using pyserial (async wrapper)."""
import serial
# Clamp duration to safe range
duration_seconds = min(max(duration_seconds, 0.1), 30.0)
if not os.path.exists(port):
return {"success": False, "error": f"Port not found: {port}"}
captured_lines: list[str] = []
start_time = time.time()
try:
# Open serial port with short timeout for non-blocking reads
ser = serial.Serial(
port=port,
baudrate=baud_rate,
timeout=0.1,
)
# Reset device if requested (toggle DTR/RTS)
if reset_on_connect:
ser.dtr = False
ser.rts = True
await asyncio.sleep(0.1)
ser.rts = False
ser.dtr = True
await asyncio.sleep(0.1)
ser.dtr = False
# Read serial output for specified duration
deadline = time.time() + duration_seconds
buffer = b""
while time.time() < deadline:
# Non-blocking read in executor to avoid blocking event loop
chunk = await asyncio.get_event_loop().run_in_executor(
None, lambda: ser.read(1024)
)
if chunk:
buffer += chunk
# Process complete lines
while b"\n" in buffer:
line, buffer = buffer.split(b"\n", 1)
try:
decoded = line.decode("utf-8", errors="replace").rstrip("\r")
captured_lines.append(decoded)
except Exception:
captured_lines.append(line.hex())
else:
# Small sleep to avoid busy-waiting
await asyncio.sleep(0.05)
# Capture any remaining partial line
if buffer:
try:
captured_lines.append(buffer.decode("utf-8", errors="replace").rstrip("\r"))
except Exception:
captured_lines.append(buffer.hex())
ser.close()
elapsed = round(time.time() - start_time, 2)
return {
"success": True,
"port": port,
"baud_rate": baud_rate,
"duration_seconds": elapsed,
"reset_performed": reset_on_connect,
"line_count": len(captured_lines),
"output": "\n".join(captured_lines),
}
except serial.SerialException as e:
return {
"success": False,
"error": f"Serial error: {e}",
"port": port,
}
except Exception as e:
return {
"success": False,
"error": str(e),
"port": port,
}
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------

View File

@ -70,6 +70,38 @@ class FirmwareBuilder:
"""Convert ELF file to flashable binary"""
return await self._elf_to_binary_impl(context, elf_path, output_path)
@self.app.tool("esp_elf_to_ram_binary")
async def elf_to_ram_binary(
context: Context,
elf_path: str,
output_path: str | None = None,
chip: str = "auto",
) -> dict[str, Any]:
"""Convert ELF file to RAM-loadable binary for use with esp_load_ram.
Creates a binary with RAM segments (IRAM/DRAM) placed first, suitable
for loading directly into device RAM without touching flash. Perfect
for rapid development iteration.
Workflow:
1. Build your project with ESP-IDF (disable secure boot/signed images)
2. Use this tool to convert the ELF to a RAM binary
3. Use esp_load_ram to load and execute on device
Requirements:
- ELF must NOT have embedded SHA256 digest at reserved offset
- Disable CONFIG_SECURE_BOOT and CONFIG_SECURE_SIGNED_APPS in sdkconfig
- Some PlatformIO builds may have incompatible settings
Note: Features requiring flash (OTA, NVS, SPIFFS) won't work from RAM.
Args:
elf_path: Path to the ELF file from your build
output_path: Output binary path (default: <elf_name>-ram.bin)
chip: Target chip type (auto, esp32, esp32s3, etc.)
"""
return await self._elf_to_ram_binary_impl(context, elf_path, output_path, chip)
@self.app.tool("esp_firmware_analyze")
async def analyze_firmware(context: Context, firmware_path: str) -> dict[str, Any]:
"""Analyze firmware binary structure"""
@ -124,6 +156,61 @@ class FirmwareBuilder:
return response
async def _elf_to_ram_binary_impl(
self,
context: Context,
elf_path: str,
output_path: str | None,
chip: str,
) -> dict[str, Any]:
"""Convert ELF to RAM-loadable binary via esptool elf2image --ram-only-header."""
elf = Path(elf_path)
if not elf.exists():
return {"success": False, "error": f"ELF file not found: {elf_path}"}
# Determine output path
if output_path:
out = Path(output_path)
else:
out = elf.with_name(f"{elf.stem}-ram.bin")
cmd = [
self.config.esptool_path,
"--chip", chip,
"elf2image",
"--ram-only-header",
"--output", str(out),
elf_path,
]
result = await self._run_cmd(cmd, timeout=30.0)
if not result["success"]:
return {
"success": False,
"error": result["error"],
"elf_path": elf_path,
}
response: dict[str, Any] = {
"success": True,
"elf_path": elf_path,
"output_path": str(out),
"chip": chip,
"ram_optimized": True,
}
if out.exists():
response["output_size_bytes"] = out.stat().st_size
# Add usage hint
response["usage_hint"] = (
f"Load to device with: esp_load_ram(binary_path='{out}', port='<your-port>')"
)
return response
async def _firmware_analyze_impl(
self,
context: Context,

View File

@ -164,6 +164,54 @@ class FlashManager:
"""
return await self._flash_backup_impl(context, backup_path, port, include_bootloader)
@self.app.tool("esp_flash_multi")
async def flash_multi(
context: Context,
files: list[dict],
port: str | None = None,
verify: bool = True,
compress: bool = True,
) -> dict[str, Any]:
"""Flash multiple binaries at different addresses in one operation.
Writes multiple binary files to flash in a single esptool invocation.
Faster than multiple separate flash operations since it connects once.
Common use cases:
- Flash bootloader + partition table + app in one shot
- Deploy complete firmware stack with filesystem image
Args:
files: List of dicts with "address" (hex string) and "path" (file path).
Example: [{"address": "0x0", "path": "bootloader.bin"},
{"address": "0x8000", "path": "partitions.bin"},
{"address": "0x10000", "path": "app.bin"}]
port: Serial port or socket:// URI (required)
verify: Verify flash contents after writing (default: true)
compress: Use compression for faster transfer (default: true)
"""
return await self._flash_multi_impl(context, files, port, verify, compress)
@self.app.tool("esp_verify_flash")
async def verify_flash(
context: Context,
firmware_path: str,
port: str | None = None,
address: str = "0x0",
) -> dict[str, Any]:
"""Verify flash contents match a file without re-flashing.
Reads flash memory at the specified address and compares against
the provided file. Useful for confirming successful flash operations
or checking if an update is needed.
Args:
firmware_path: Path to the binary file to compare against
port: Serial port or socket:// URI (required)
address: Flash address to verify from (hex string, default: "0x0")
"""
return await self._verify_flash_impl(context, firmware_path, port, address)
async def _flash_firmware_impl(
self,
context: Context,
@ -336,6 +384,165 @@ class FlashManager:
size=None, # auto-detect full flash
)
async def _flash_multi_impl(
self,
context: Context,
files: list[dict],
port: str | None,
verify: bool,
compress: bool,
) -> dict[str, Any]:
"""Flash multiple binaries at different addresses via esptool write-flash."""
if not port:
return {"success": False, "error": "Port is required (no auto-detect for flash operations)"}
if not files:
return {"success": False, "error": "No files specified"}
# Validate all files exist and build address/path pairs
flash_args: list[str] = []
validated_files: list[dict] = []
total_size = 0
for entry in files:
if "address" not in entry or "path" not in entry:
return {
"success": False,
"error": f"Each file entry must have 'address' and 'path' keys. Got: {entry}",
}
fw_path = Path(entry["path"])
if not fw_path.exists():
return {"success": False, "error": f"File not found: {entry['path']}"}
file_size = fw_path.stat().st_size
total_size += file_size
validated_files.append({
"address": entry["address"],
"path": str(fw_path),
"size": file_size,
})
flash_args.extend([entry["address"], str(fw_path)])
start_time = time.time()
# Build esptool command: write-flash [options] addr1 file1 addr2 file2 ...
# Options come after the subcommand in esptool v5.x
args = ["write-flash"]
if compress:
args.append("--compress")
if not verify:
args.append("--no-verify")
args.extend(flash_args)
result = await self._run_esptool(port, args, timeout=300.0)
if not result["success"]:
return {
"success": False,
"error": result["error"],
"port": port,
"files": validated_files,
}
output = result["output"]
elapsed = round(time.time() - start_time, 1)
# Parse bytes written from output
bytes_written = 0
write_matches = re.findall(r"Wrote (\d+) bytes", output)
for match in write_matches:
bytes_written += int(match)
verified = "Hash of data verified" in output or "Verified" in output
return {
"success": True,
"port": port,
"files": validated_files,
"file_count": len(validated_files),
"total_size": total_size,
"bytes_written": bytes_written,
"compressed": compress,
"verified": verified if verify else None,
"elapsed_seconds": elapsed,
}
async def _verify_flash_impl(
self,
context: Context,
firmware_path: str,
port: str | None,
address: str,
) -> dict[str, Any]:
"""Verify flash contents match a file via esptool verify-flash."""
fw_path = Path(firmware_path)
if not fw_path.exists():
return {"success": False, "error": f"File not found: {firmware_path}"}
if not port:
return {"success": False, "error": "Port is required (no auto-detect for flash operations)"}
start_time = time.time()
file_size = fw_path.stat().st_size
# Use hyphenated command form (verify-flash not verify_flash)
result = await self._run_esptool(
port,
["verify-flash", address, str(fw_path)],
timeout=120.0,
)
elapsed = round(time.time() - start_time, 1)
# Check for verification success or failure in output
output = result.get("output", "") or result.get("error", "")
output_lower = output.lower()
verified = (
"verification successful" in output_lower
or "verify ok" in output_lower
or "digest matched" in output_lower
)
mismatch = (
"verify failed" in output_lower
or "mismatch" in output_lower
or "does not match" in output_lower
)
if result["success"] and verified:
return {
"success": True,
"verified": True,
"port": port,
"firmware_path": firmware_path,
"address": address,
"file_size": file_size,
"elapsed_seconds": elapsed,
}
elif mismatch:
# Extract mismatch details if available
return {
"success": True,
"verified": False,
"mismatch": True,
"port": port,
"firmware_path": firmware_path,
"address": address,
"file_size": file_size,
"elapsed_seconds": elapsed,
"details": output.strip() if output else None,
}
else:
return {
"success": False,
"error": result.get("error", "Verification failed"),
"port": port,
"firmware_path": firmware_path,
"address": address,
}
async def health_check(self) -> dict[str, Any]:
"""Component health check"""
return {"status": "healthy", "note": "Flash manager ready"}