diff --git a/src/mcp_esptool_server/components/firmware_builder.py b/src/mcp_esptool_server/components/firmware_builder.py
index ee0a13e..b236340 100644
--- a/src/mcp_esptool_server/components/firmware_builder.py
+++ b/src/mcp_esptool_server/components/firmware_builder.py
@@ -1,11 +1,14 @@
"""
Firmware Builder Component
-Provides ESP-IDF integration for building, compiling, and managing
-firmware projects with host application support.
+Provides firmware binary conversion and analysis using esptool's
+elf2image and image-info commands.
"""
+import asyncio
import logging
+import re
+from pathlib import Path
from typing import Any
from fastmcp import Context, FastMCP
@@ -18,11 +21,45 @@ logger = logging.getLogger(__name__)
class FirmwareBuilder:
"""ESP firmware building and compilation"""
- def __init__(self, app: FastMCP, config: ESPToolServerConfig):
+ def __init__(self, app: FastMCP, config: ESPToolServerConfig) -> None:
self.app = app
self.config = config
self._register_tools()
+ async def _run_cmd(
+ self,
+ cmd: list[str],
+ timeout: float = 30.0,
+ ) -> dict[str, Any]:
+ """Run a CLI command as an async subprocess."""
+ proc = None
+ try:
+ proc = await asyncio.create_subprocess_exec(
+ *cmd,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE,
+ )
+ stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
+ output = (stdout or b"").decode() + (stderr or b"").decode()
+
+ if proc.returncode != 0:
+ return {"success": False, "error": output.strip()[:500]}
+
+ return {"success": True, "output": output}
+
+ except asyncio.TimeoutError:
+ if proc and proc.returncode is None:
+ proc.kill()
+ await proc.wait()
+ return {"success": False, "error": f"Timeout after {timeout}s"}
+ except FileNotFoundError:
+ return {"success": False, "error": f"Command not found: {cmd[0]}"}
+ except Exception as e:
+ if proc and proc.returncode is None:
+ proc.kill()
+ await proc.wait()
+ return {"success": False, "error": str(e)}
+
def _register_tools(self) -> None:
"""Register firmware building tools"""
@@ -31,12 +68,135 @@ class FirmwareBuilder:
context: Context, elf_path: str, output_path: str | None = None
) -> dict[str, Any]:
"""Convert ELF file to flashable binary"""
- return {"success": True, "note": "Implementation coming soon"}
+ return await self._elf_to_binary_impl(context, elf_path, output_path)
@self.app.tool("esp_firmware_analyze")
async def analyze_firmware(context: Context, firmware_path: str) -> dict[str, Any]:
"""Analyze firmware binary structure"""
- return {"success": True, "note": "Implementation coming soon"}
+ return await self._firmware_analyze_impl(context, firmware_path)
+
+ async def _elf_to_binary_impl(
+ self,
+ context: Context,
+ elf_path: str,
+ output_path: str | None,
+ ) -> dict[str, Any]:
+ """Convert ELF to flashable binary via esptool elf2image."""
+
+ elf = Path(elf_path)
+ if not elf.exists():
+ return {"success": False, "error": f"ELF file not found: {elf_path}"}
+
+ cmd = [self.config.esptool_path, "--chip", "auto", "elf2image"]
+
+ if output_path:
+ cmd.extend(["--output", output_path])
+
+ cmd.append(elf_path)
+
+ result = await self._run_cmd(cmd, timeout=30.0)
+
+ if not result["success"]:
+ return {"success": False, "error": result["error"], "elf_path": elf_path}
+
+ # Determine the output file path
+ if output_path:
+ out = Path(output_path)
+ else:
+ # esptool elf2image default: -.bin or .bin
+ # Look for likely output files
+ out = elf.with_suffix(".bin")
+ if not out.exists():
+ # Try common patterns
+ for candidate in elf.parent.glob(f"{elf.stem}*.bin"):
+ out = candidate
+ break
+
+ response: dict[str, Any] = {
+ "success": True,
+ "elf_path": elf_path,
+ "esptool_output": result["output"][:1000],
+ }
+
+ if out.exists():
+ response["output_path"] = str(out)
+ response["output_size_bytes"] = out.stat().st_size
+
+ return response
+
+ async def _firmware_analyze_impl(
+ self,
+ context: Context,
+ firmware_path: str,
+ ) -> dict[str, Any]:
+ """Analyze firmware binary via esptool image-info."""
+
+ fw = Path(firmware_path)
+ if not fw.exists():
+ return {"success": False, "error": f"Firmware file not found: {firmware_path}"}
+
+ # image-info --version 2 gives extended output
+ result = await self._run_cmd(
+ [self.config.esptool_path, "image-info", "--version", "2", firmware_path],
+ timeout=15.0,
+ )
+
+ if not result["success"]:
+ return {"success": False, "error": result["error"], "firmware_path": firmware_path}
+
+ output = result["output"]
+ info = self._parse_image_info(output)
+
+ return {
+ "success": True,
+ "firmware_path": firmware_path,
+ "file_size_bytes": fw.stat().st_size,
+ **info,
+ "raw_output": output[:2000],
+ }
+
+ def _parse_image_info(self, output: str) -> dict[str, Any]:
+ """Parse esptool image-info output into structured data."""
+ info: dict[str, Any] = {}
+
+ # Extract key fields using regex
+ patterns = {
+ "entry_point": r"Entry point:\s*(0x[0-9a-fA-F]+)",
+ "chip": r"Chip:\s*(\S+)",
+ "flash_mode": r"Flash mode:\s*(\S+)",
+ "flash_size": r"Flash size:\s*(\S+)",
+ "flash_freq": r"Flash freq:\s*(\S+)",
+ }
+
+ for key, pattern in patterns.items():
+ match = re.search(pattern, output)
+ if match:
+ info[key] = match.group(1)
+
+ # Parse segments
+ segments = []
+ # Pattern: Segment N: len 0xNNNNN load 0xNNNNNNNN ...
+ for match in re.finditer(
+ r"Segment\s+(\d+):\s+len\s+(0x[0-9a-fA-F]+)\s+load\s+(0x[0-9a-fA-F]+)",
+ output,
+ ):
+ segments.append({
+ "index": int(match.group(1)),
+ "length": match.group(2),
+ "load_address": match.group(3),
+ })
+
+ if segments:
+ info["segments"] = segments
+ info["segment_count"] = len(segments)
+
+ # Check for validation status
+ if "valid" in output.lower():
+ valid_match = re.search(r"Validation\s+Hash:\s*(\S+)", output, re.IGNORECASE)
+ if valid_match:
+ info["validation_hash"] = valid_match.group(1)
+
+ return info
async def health_check(self) -> dict[str, Any]:
"""Component health check"""
diff --git a/src/mcp_esptool_server/components/ota_manager.py b/src/mcp_esptool_server/components/ota_manager.py
index 9b64ea5..f0c712f 100644
--- a/src/mcp_esptool_server/components/ota_manager.py
+++ b/src/mcp_esptool_server/components/ota_manager.py
@@ -5,7 +5,13 @@ Handles Over-The-Air update operations including package creation,
deployment, rollback, and update management.
"""
+import asyncio
+import hashlib
+import json
import logging
+import time
+import zipfile
+from pathlib import Path
from typing import Any
from fastmcp import Context, FastMCP
@@ -18,11 +24,44 @@ logger = logging.getLogger(__name__)
class OTAManager:
"""ESP Over-The-Air update management"""
- def __init__(self, app: FastMCP, config: ESPToolServerConfig):
+ def __init__(self, app: FastMCP, config: ESPToolServerConfig) -> None:
self.app = app
self.config = config
self._register_tools()
+ async def _run_esptool(
+ self,
+ port: str,
+ args: list[str],
+ timeout: float = 30.0,
+ ) -> dict[str, Any]:
+ """Run esptool as an async subprocess."""
+ cmd = [self.config.esptool_path, "--port", port, *args]
+ proc = None
+ try:
+ proc = await asyncio.create_subprocess_exec(
+ *cmd,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE,
+ )
+ stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
+ output = (stdout or b"").decode() + (stderr or b"").decode()
+ if proc.returncode != 0:
+ return {"success": False, "error": output.strip()[:500]}
+ return {"success": True, "output": output}
+ except asyncio.TimeoutError:
+ if proc and proc.returncode is None:
+ proc.kill()
+ await proc.wait()
+ return {"success": False, "error": f"Timeout after {timeout}s"}
+ except FileNotFoundError:
+ return {"success": False, "error": f"esptool not found at {self.config.esptool_path}"}
+ except Exception as e:
+ if proc and proc.returncode is None:
+ proc.kill()
+ await proc.wait()
+ return {"success": False, "error": str(e)}
+
def _register_tools(self) -> None:
"""Register OTA management tools"""
@@ -31,19 +70,265 @@ class OTAManager:
context: Context, firmware_path: str, version: str, output_path: str
) -> dict[str, Any]:
"""Create OTA update package"""
- return {"success": True, "note": "Implementation coming soon"}
+ return await self._package_create_impl(context, firmware_path, version, output_path)
@self.app.tool("esp_ota_deploy")
async def deploy_ota_update(
context: Context, package_path: str, target_url: str
) -> dict[str, Any]:
"""Deploy OTA update to device"""
- return {"success": True, "note": "Implementation coming soon"}
+ return await self._deploy_impl(context, package_path, target_url)
@self.app.tool("esp_ota_rollback")
async def rollback_ota(context: Context, port: str | None = None) -> dict[str, Any]:
"""Rollback to previous firmware version"""
- return {"success": True, "note": "Implementation coming soon"}
+ return await self._rollback_impl(context, port)
+
+ async def _package_create_impl(
+ self,
+ context: Context,
+ firmware_path: str,
+ version: str,
+ output_path: str,
+ ) -> dict[str, Any]:
+ """Create an OTA update package (zip with firmware + manifest).
+
+ The package contains:
+ - firmware.bin: The raw application binary
+ - manifest.json: Metadata (version, SHA-256, size, timestamp)
+ """
+
+ fw = Path(firmware_path)
+ if not fw.exists():
+ return {"success": False, "error": f"Firmware file not found: {firmware_path}"}
+
+ fw_data = fw.read_bytes()
+ fw_sha256 = hashlib.sha256(fw_data).hexdigest()
+
+ manifest = {
+ "version": version,
+ "firmware_name": fw.name,
+ "firmware_size": len(fw_data),
+ "firmware_sha256": fw_sha256,
+ "created_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
+ }
+
+ out = Path(output_path)
+ try:
+ with zipfile.ZipFile(out, "w", compression=zipfile.ZIP_DEFLATED) as zf:
+ zf.writestr("firmware.bin", fw_data)
+ zf.writestr("manifest.json", json.dumps(manifest, indent=2))
+ except OSError as e:
+ return {"success": False, "error": f"Failed to create package: {e}"}
+
+ return {
+ "success": True,
+ "output_path": str(out),
+ "package_size_bytes": out.stat().st_size,
+ "manifest": manifest,
+ }
+
+ async def _deploy_impl(
+ self,
+ context: Context,
+ package_path: str,
+ target_url: str,
+ ) -> dict[str, Any]:
+ """Deploy an OTA package to a device via HTTP POST.
+
+ Extracts firmware.bin from the package and POSTs it to the
+ device's OTA endpoint (e.g. http://192.168.1.100/ota/update).
+ The target device must be running an HTTP OTA server (like
+ esp_https_ota or a custom handler).
+ """
+
+ pkg = Path(package_path)
+ if not pkg.exists():
+ return {"success": False, "error": f"Package not found: {package_path}"}
+
+ # Extract firmware from package
+ try:
+ with zipfile.ZipFile(pkg, "r") as zf:
+ if "firmware.bin" not in zf.namelist():
+ return {"success": False, "error": "Package missing firmware.bin"}
+
+ fw_data = zf.read("firmware.bin")
+
+ manifest = None
+ if "manifest.json" in zf.namelist():
+ manifest = json.loads(zf.read("manifest.json"))
+ except zipfile.BadZipFile:
+ return {"success": False, "error": "Invalid zip package"}
+
+ # POST firmware to device
+ # Using curl as an async subprocess since it's universally available
+ # and handles HTTP/HTTPS without Python dependency issues
+ import tempfile
+
+ with tempfile.NamedTemporaryFile(suffix=".bin", delete=False) as tmp:
+ tmp.write(fw_data)
+ tmp_path = tmp.name
+
+ try:
+ proc = await asyncio.create_subprocess_exec(
+ "curl",
+ "--silent",
+ "--show-error",
+ "--max-time", "120",
+ "--write-out", "%{http_code}",
+ "--output", "/dev/null",
+ "--data-binary", f"@{tmp_path}",
+ "--header", "Content-Type: application/octet-stream",
+ target_url,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE,
+ )
+ stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=130.0)
+ http_code = (stdout or b"").decode().strip()
+ curl_error = (stderr or b"").decode().strip()
+
+ if proc.returncode != 0:
+ return {
+ "success": False,
+ "error": f"HTTP request failed: {curl_error}",
+ "target_url": target_url,
+ }
+
+ status_ok = http_code.startswith("2")
+
+ result: dict[str, Any] = {
+ "success": status_ok,
+ "target_url": target_url,
+ "http_status": http_code,
+ "firmware_size_bytes": len(fw_data),
+ }
+
+ if manifest:
+ result["version"] = manifest.get("version")
+
+ if not status_ok:
+ result["error"] = f"Device returned HTTP {http_code}"
+
+ return result
+
+ except asyncio.TimeoutError:
+ return {"success": False, "error": "OTA deploy timed out (130s)", "target_url": target_url}
+ except FileNotFoundError:
+ return {"success": False, "error": "curl not found — required for OTA deploy"}
+ finally:
+ import os
+
+ try:
+ os.unlink(tmp_path)
+ except OSError:
+ pass
+
+ async def _rollback_impl(
+ self,
+ context: Context,
+ port: str | None,
+ ) -> dict[str, Any]:
+ """Rollback OTA by erasing the otadata partition.
+
+ When the otadata partition is erased (all 0xFF), the bootloader
+ falls back to the factory app or ota_0 — effectively rolling back
+ to the first-flashed firmware. This works because the otadata
+ partition tracks which OTA slot is active.
+
+ For more precise control, use esp_partition_analyze to find the
+ otadata offset, then esp_flash_erase to clear just that region.
+ """
+
+ if not port:
+ return {"success": False, "error": "Port is required for OTA rollback"}
+
+ # First, read the partition table to find the otadata partition
+ # We need the partition manager's analyze logic, but we can just
+ # read the partition table directly with esptool
+ import struct
+ import tempfile
+
+ with tempfile.NamedTemporaryFile(suffix=".bin", delete=False) as tmp:
+ tmp_path = tmp.name
+
+ try:
+ # Read partition table from 0x8000
+ result = await self._run_esptool(
+ port,
+ ["read-flash", "0x8000", "0xC00", tmp_path],
+ timeout=60.0,
+ )
+
+ if not result["success"]:
+ return {"success": False, "error": f"Cannot read partition table: {result['error']}", "port": port}
+
+ raw = Path(tmp_path).read_bytes()
+
+ # Find otadata partition (type=data/0x01, subtype=ota/0x00)
+ otadata_offset = None
+ otadata_size = None
+
+ for i in range(0, len(raw) - 32 + 1, 32):
+ entry = raw[i : i + 32]
+ magic = struct.unpack_from(" dict[str, Any]:
"""Component health check"""
diff --git a/src/mcp_esptool_server/components/production_tools.py b/src/mcp_esptool_server/components/production_tools.py
index 9fbb849..888bc76 100644
--- a/src/mcp_esptool_server/components/production_tools.py
+++ b/src/mcp_esptool_server/components/production_tools.py
@@ -5,7 +5,11 @@ Provides factory programming, batch operations, quality control,
and production line integration tools.
"""
+import asyncio
import logging
+import re
+import time
+from pathlib import Path
from typing import Any
from fastmcp import Context, FastMCP
@@ -18,11 +22,44 @@ logger = logging.getLogger(__name__)
class ProductionTools:
"""ESP production and factory programming tools"""
- def __init__(self, app: FastMCP, config: ESPToolServerConfig):
+ def __init__(self, app: FastMCP, config: ESPToolServerConfig) -> None:
self.app = app
self.config = config
self._register_tools()
+ async def _run_esptool(
+ self,
+ port: str,
+ args: list[str],
+ timeout: float = 30.0,
+ ) -> dict[str, Any]:
+ """Run esptool as an async subprocess."""
+ cmd = [self.config.esptool_path, "--port", port, *args]
+ proc = None
+ try:
+ proc = await asyncio.create_subprocess_exec(
+ *cmd,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE,
+ )
+ stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
+ output = (stdout or b"").decode() + (stderr or b"").decode()
+ if proc.returncode != 0:
+ return {"success": False, "error": output.strip()[:500]}
+ return {"success": True, "output": output}
+ except asyncio.TimeoutError:
+ if proc and proc.returncode is None:
+ proc.kill()
+ await proc.wait()
+ return {"success": False, "error": f"Timeout after {timeout}s"}
+ except FileNotFoundError:
+ return {"success": False, "error": f"esptool not found at {self.config.esptool_path}"}
+ except Exception as e:
+ if proc and proc.returncode is None:
+ proc.kill()
+ await proc.wait()
+ return {"success": False, "error": str(e)}
+
def _register_tools(self) -> None:
"""Register production tools"""
@@ -31,21 +68,335 @@ class ProductionTools:
context: Context, program_config: dict[str, Any], port: str | None = None
) -> dict[str, Any]:
"""Program device for factory deployment"""
- return {"success": True, "note": "Implementation coming soon"}
+ return await self._factory_program_impl(context, program_config, port)
@self.app.tool("esp_batch_program")
async def batch_program(
context: Context, device_list: list[str], firmware_path: str
) -> dict[str, Any]:
"""Program multiple devices in batch"""
- return {"success": True, "note": "Implementation coming soon"}
+ return await self._batch_program_impl(context, device_list, firmware_path)
@self.app.tool("esp_quality_control")
async def quality_control(
context: Context, port: str | None = None, test_suite: str = "basic"
) -> dict[str, Any]:
"""Run quality control tests"""
- return {"success": True, "note": "Implementation coming soon"}
+ return await self._quality_control_impl(context, port, test_suite)
+
+ async def _factory_program_impl(
+ self,
+ context: Context,
+ program_config: dict[str, Any],
+ port: str | None,
+ ) -> dict[str, Any]:
+ """Factory-program a device: erase → flash → verify.
+
+ program_config should contain:
+ {
+ "firmware_path": "/path/to/firmware.bin",
+ "address": "0x0", # optional, default "0x0"
+ "erase_before": true, # optional, default true
+ "verify": true, # optional, default true
+ "partition_table": "/path/to/partitions.bin", # optional
+ "partition_table_address": "0x8000", # optional
+ "bootloader": "/path/to/bootloader.bin", # optional
+ "bootloader_address": "0x1000", # optional
+ }
+ """
+
+ if not port:
+ return {"success": False, "error": "Port is required for factory programming"}
+
+ firmware_path = program_config.get("firmware_path")
+ if not firmware_path:
+ return {"success": False, "error": "program_config must include 'firmware_path'"}
+
+ fw = Path(firmware_path)
+ if not fw.exists():
+ return {"success": False, "error": f"Firmware not found: {firmware_path}"}
+
+ erase_before = program_config.get("erase_before", True)
+ verify = program_config.get("verify", True)
+ address = program_config.get("address", "0x0")
+
+ steps: list[dict[str, Any]] = []
+ t_start = time.time()
+
+ # Step 1: Erase flash
+ if erase_before:
+ result = await self._run_esptool(port, ["erase-flash"], timeout=60.0)
+ steps.append({
+ "step": "erase_flash",
+ "success": result["success"],
+ "error": result.get("error"),
+ })
+ if not result["success"]:
+ return {
+ "success": False,
+ "error": f"Erase failed: {result['error']}",
+ "steps": steps,
+ "port": port,
+ }
+
+ # Step 2: Flash bootloader (if provided)
+ bootloader = program_config.get("bootloader")
+ if bootloader:
+ bl_path = Path(bootloader)
+ if not bl_path.exists():
+ return {"success": False, "error": f"Bootloader not found: {bootloader}"}
+
+ bl_addr = program_config.get("bootloader_address", "0x1000")
+ result = await self._run_esptool(
+ port,
+ ["write-flash", bl_addr, bootloader],
+ timeout=120.0,
+ )
+ steps.append({
+ "step": "flash_bootloader",
+ "address": bl_addr,
+ "success": result["success"],
+ "error": result.get("error"),
+ })
+ if not result["success"]:
+ return {
+ "success": False,
+ "error": f"Bootloader flash failed: {result['error']}",
+ "steps": steps,
+ "port": port,
+ }
+
+ # Step 3: Flash partition table (if provided)
+ partition_table = program_config.get("partition_table")
+ if partition_table:
+ pt_path = Path(partition_table)
+ if not pt_path.exists():
+ return {"success": False, "error": f"Partition table not found: {partition_table}"}
+
+ pt_addr = program_config.get("partition_table_address", "0x8000")
+ result = await self._run_esptool(
+ port,
+ ["write-flash", pt_addr, partition_table],
+ timeout=120.0,
+ )
+ steps.append({
+ "step": "flash_partition_table",
+ "address": pt_addr,
+ "success": result["success"],
+ "error": result.get("error"),
+ })
+ if not result["success"]:
+ return {
+ "success": False,
+ "error": f"Partition table flash failed: {result['error']}",
+ "steps": steps,
+ "port": port,
+ }
+
+ # Step 4: Flash main firmware
+ write_args = ["write-flash"]
+ if verify:
+ write_args.append("--verify")
+ write_args.extend([address, firmware_path])
+
+ result = await self._run_esptool(port, write_args, timeout=300.0)
+ steps.append({
+ "step": "flash_firmware",
+ "address": address,
+ "success": result["success"],
+ "error": result.get("error"),
+ })
+ if not result["success"]:
+ return {
+ "success": False,
+ "error": f"Firmware flash failed: {result['error']}",
+ "steps": steps,
+ "port": port,
+ }
+
+ elapsed = round(time.time() - t_start, 2)
+
+ return {
+ "success": True,
+ "port": port,
+ "steps": steps,
+ "total_time_seconds": elapsed,
+ "firmware_path": firmware_path,
+ "firmware_size_bytes": fw.stat().st_size,
+ }
+
+ async def _batch_program_impl(
+ self,
+ context: Context,
+ device_list: list[str],
+ firmware_path: str,
+ ) -> dict[str, Any]:
+ """Program multiple devices in parallel.
+
+ Each device gets the same firmware flashed at 0x0 with erase + verify.
+ Devices are programmed concurrently using asyncio.gather.
+ """
+
+ if not device_list:
+ return {"success": False, "error": "device_list is empty"}
+
+ fw = Path(firmware_path)
+ if not fw.exists():
+ return {"success": False, "error": f"Firmware not found: {firmware_path}"}
+
+ t_start = time.time()
+
+ async def program_one(port: str) -> dict[str, Any]:
+ """Program a single device."""
+ config = {
+ "firmware_path": firmware_path,
+ "erase_before": True,
+ "verify": True,
+ }
+ return await self._factory_program_impl(context, config, port)
+
+ # Run all programming tasks concurrently
+ results = await asyncio.gather(
+ *[program_one(port) for port in device_list],
+ return_exceptions=True,
+ )
+
+ device_results = []
+ succeeded = 0
+ for port, result in zip(device_list, results, strict=True):
+ if isinstance(result, Exception):
+ device_results.append({
+ "port": port,
+ "success": False,
+ "error": str(result),
+ })
+ else:
+ device_results.append({
+ "port": port,
+ "success": result.get("success", False),
+ "error": result.get("error"),
+ "time_seconds": result.get("total_time_seconds"),
+ })
+ if result.get("success"):
+ succeeded += 1
+
+ elapsed = round(time.time() - t_start, 2)
+
+ return {
+ "success": succeeded == len(device_list),
+ "total_devices": len(device_list),
+ "succeeded": succeeded,
+ "failed": len(device_list) - succeeded,
+ "total_time_seconds": elapsed,
+ "firmware_path": firmware_path,
+ "devices": device_results,
+ }
+
+ async def _quality_control_impl(
+ self,
+ context: Context,
+ port: str | None,
+ test_suite: str,
+ ) -> dict[str, Any]:
+ """Run quality control checks on a device.
+
+ Test suites:
+ - "basic": chip-id, flash-id, read-mac (fast verification)
+ - "extended": basic + flash read/verify + memory dump check
+ """
+
+ if not port:
+ return {"success": False, "error": "Port is required for quality control"}
+
+ tests: list[dict[str, Any]] = []
+ t_start = time.time()
+
+ # Test 1: Chip identification
+ result = await self._run_esptool(port, ["chip-id"], timeout=15.0)
+ chip_info: dict[str, Any] = {"test": "chip_identification", "success": result["success"]}
+ if result["success"]:
+ output = result["output"]
+ chip_match = re.search(r"Chip is (\S+)", output)
+ id_match = re.search(r"Chip ID:\s*(0x[0-9a-fA-F]+)", output)
+ chip_info["chip"] = chip_match.group(1) if chip_match else "unknown"
+ chip_info["chip_id"] = id_match.group(1) if id_match else "unknown"
+ else:
+ chip_info["error"] = result.get("error", "")[:200]
+ tests.append(chip_info)
+
+ # Test 2: Flash identification
+ result = await self._run_esptool(port, ["flash-id"], timeout=15.0)
+ flash_info: dict[str, Any] = {"test": "flash_identification", "success": result["success"]}
+ if result["success"]:
+ output = result["output"]
+ mfr_match = re.search(r"Manufacturer:\s*(0x[0-9a-fA-F]+)", output)
+ size_match = re.search(r"Detected flash size:\s*(\S+)", output)
+ flash_info["manufacturer"] = mfr_match.group(1) if mfr_match else "unknown"
+ flash_info["flash_size"] = size_match.group(1) if size_match else "unknown"
+ else:
+ flash_info["error"] = result.get("error", "")[:200]
+ tests.append(flash_info)
+
+ # Test 3: MAC address
+ result = await self._run_esptool(port, ["read-mac"], timeout=15.0)
+ mac_info: dict[str, Any] = {"test": "mac_address", "success": result["success"]}
+ if result["success"]:
+ mac_match = re.search(r"MAC:\s*([0-9a-fA-F:]+)", result["output"])
+ mac_info["mac"] = mac_match.group(1) if mac_match else "unknown"
+ else:
+ mac_info["error"] = result.get("error", "")[:200]
+ tests.append(mac_info)
+
+ # Extended tests
+ if test_suite == "extended":
+ # Test 4: Read first 4KB of flash (checks flash connectivity)
+ import tempfile
+
+ with tempfile.NamedTemporaryFile(suffix=".bin", delete=False) as tmp:
+ tmp_path = tmp.name
+
+ try:
+ result = await self._run_esptool(
+ port,
+ ["read-flash", "0x0", "4096", tmp_path],
+ timeout=60.0,
+ )
+ read_info: dict[str, Any] = {"test": "flash_read_4kb", "success": result["success"]}
+ if result["success"]:
+ data = Path(tmp_path).read_bytes()
+ read_info["bytes_read"] = len(data)
+ # Check if flash is all 0xFF (erased) or has data
+ non_ff = sum(1 for b in data if b != 0xFF)
+ read_info["has_data"] = non_ff > 0
+ read_info["non_erased_bytes"] = non_ff
+ else:
+ read_info["error"] = result.get("error", "")[:200]
+ tests.append(read_info)
+ finally:
+ import os
+
+ try:
+ os.unlink(tmp_path)
+ except OSError:
+ pass
+
+ elapsed = round(time.time() - t_start, 2)
+
+ # Determine overall pass/fail
+ passed = sum(1 for t in tests if t["success"])
+ all_passed = passed == len(tests)
+
+ return {
+ "success": True,
+ "port": port,
+ "test_suite": test_suite,
+ "verdict": "PASS" if all_passed else "FAIL",
+ "tests_run": len(tests),
+ "tests_passed": passed,
+ "tests_failed": len(tests) - passed,
+ "total_time_seconds": elapsed,
+ "tests": tests,
+ }
async def health_check(self) -> dict[str, Any]:
"""Component health check"""