""" Chip Control Component Provides ESP32/ESP8266 chip detection, connection verification, and basic control operations using esptool CLI subprocesses. """ import asyncio import logging import os import re import time from typing import Any from fastmcp import Context, FastMCP from ..config import ESPToolServerConfig logger = logging.getLogger(__name__) class ChipControl: """ESP32/ESP8266 chip control and management via esptool subprocess""" def __init__(self, app: FastMCP, config: ESPToolServerConfig): self.app = app self.config = config # Set by server after QemuManager initialization (avoids circular import) self.qemu_manager = None self._register_tools() def _register_tools(self) -> None: """Register chip control tools with FastMCP""" @self.app.tool("esp_detect_chip") async def detect_chip( context: Context, port: str | None = None, baud_rate: int | None = None, detailed: bool = False, ) -> dict[str, Any]: """ Detect ESP chip type and gather comprehensive information Args: port: Serial port (auto-detect if not specified) baud_rate: Connection baud rate (use config default if not specified) detailed: Include detailed chip information and eFuse data """ return await self._detect_chip_impl(context, port, baud_rate, detailed) @self.app.tool("esp_connect_advanced") async def connect_advanced( context: Context, port: str | None = None, baud_rate: int | None = None, timeout: int | None = None, use_stub: bool = True, retry_count: int = 3, ) -> dict[str, Any]: """ Advanced ESP device connection with retry logic and stub loading Args: port: Serial port (auto-detect if not specified) baud_rate: Connection baud rate timeout: Connection timeout in seconds use_stub: Load ROM bootloader stub for faster operations retry_count: Number of connection attempts """ return await self._connect_advanced_impl( context, port, baud_rate, timeout, use_stub, retry_count ) @self.app.tool("esp_reset_chip") async def reset_chip( context: Context, port: str | None = None, reset_type: str = "hard" ) -> dict[str, Any]: """ Reset ESP chip using various methods Args: port: Serial port (use active connection if not specified) reset_type: Type of reset (hard, soft, bootloader) """ return await self._reset_chip_impl(context, port, reset_type) @self.app.tool("esp_scan_ports") async def scan_ports(context: Context, detailed: bool = False) -> dict[str, Any]: """ Scan for available ESP devices on all ports Args: detailed: Include detailed information about each detected device """ return await self._scan_ports_impl(context, detailed) @self.app.tool("esp_load_test_firmware") async def load_test_firmware( context: Context, port: str | None = None, firmware_type: str = "blink" ) -> dict[str, Any]: """ Load test firmware for chip validation Args: port: Serial port (auto-detect if not specified) firmware_type: Type of test firmware (blink, hello_world, wifi_scan) """ return await self._load_test_firmware_impl(context, port, firmware_type) # ------------------------------------------------------------------ # Subprocess runner # ------------------------------------------------------------------ async def _run_esptool( self, port: str, command: str, timeout: float = 10.0, connect_attempts: int = 3, extra_args: list[str] | None = None, ) -> dict[str, Any]: """ Run an esptool command as a fully async subprocess. Args: port: Serial port or socket:// URI command: esptool command (e.g. "chip-id", "flash-id") timeout: Timeout in seconds connect_attempts: Number of connection attempts extra_args: Additional CLI flags inserted before the command Returns: dict with "success", "output", and optionally "error" """ cmd = [ self.config.esptool_path, "--port", port, "--connect-attempts", str(connect_attempts), ] if extra_args: cmd.extend(extra_args) cmd.append(command) proc = None try: proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout) output = (stdout or b"").decode() + (stderr or b"").decode() if proc.returncode != 0: return {"success": False, "error": output.strip()} return {"success": True, "output": output} except asyncio.TimeoutError: if proc and proc.returncode is None: proc.kill() await proc.wait() return {"success": False, "error": f"Timeout ({timeout}s)"} except FileNotFoundError: return {"success": False, "error": f"esptool not found at {self.config.esptool_path}"} except Exception as e: if proc and proc.returncode is None: proc.kill() await proc.wait() return {"success": False, "error": str(e)} # ------------------------------------------------------------------ # Output parsing helpers # ------------------------------------------------------------------ @staticmethod def _parse_chip_output(output: str) -> dict[str, Any]: """Extract chip info fields from esptool chip-id / flash-id output.""" result: dict[str, Any] = {} chip_match = re.search(r"Chip type:\s*(.+?)(?:\n|$)", output) if not chip_match: chip_match = re.search(r"Chip is\s+(.+?)(?:\n|$)", output) if not chip_match: chip_match = re.search(r"Detecting chip type[.…]+\s*(\S+)", output) if chip_match: result["chip_type"] = chip_match.group(1).strip() mac_match = re.search(r"MAC:\s*([0-9a-f:]+)", output, re.IGNORECASE) if mac_match: result["mac_address"] = mac_match.group(1) features_match = re.search(r"Features:\s*(.+?)(?:\n|$)", output) if features_match: result["features"] = [f.strip() for f in features_match.group(1).split(",")] crystal_match = re.search(r"Crystal\s+(?:frequency:\s*|is\s+)(\d+)\s*MHz", output) if crystal_match: result["crystal_freq"] = f"{crystal_match.group(1)}MHz" flash_size_match = re.search(r"Detected flash size:\s*(\S+)", output) if flash_size_match: result["flash_size"] = flash_size_match.group(1) flash_mfr_match = re.search(r"Manufacturer:\s*(\S+)", output) if flash_mfr_match: result["flash_manufacturer"] = flash_mfr_match.group(1) return result # ------------------------------------------------------------------ # Tool implementations # ------------------------------------------------------------------ async def _detect_chip_impl( self, context: Context, port: str | None, baud_rate: int | None, detailed: bool ) -> dict[str, Any]: """Detect chip type via esptool chip-id subprocess.""" if not port: port = await self._auto_detect_port() if not port: return { "success": False, "error": "No ESP devices found on available ports", "scanned_ports": self.config.get_common_ports(), } baud_rate = baud_rate or self.config.default_baud_rate start_time = time.time() info = await self._run_esptool( port, "chip-id", extra_args=["--baud", str(baud_rate)], connect_attempts=1, ) if not info["success"]: return {"success": False, "error": info["error"], "port": port, "baud_rate": baud_rate} parsed = self._parse_chip_output(info["output"]) # Optionally fetch flash details if detailed: flash_info = await self._run_esptool( port, "flash-id", extra_args=["--baud", str(baud_rate)], ) if flash_info["success"]: parsed.update(self._parse_chip_output(flash_info["output"])) connection_time = time.time() - start_time chip_data = ( { "chip_type": parsed.get("chip_type", "Unknown"), "mac_address": parsed.get("mac_address"), "flash_size": parsed.get("flash_size"), "crystal_frequency": parsed.get("crystal_freq"), "features": parsed.get("features"), } if detailed else { "chip_type": parsed.get("chip_type", "Unknown"), "mac_address": parsed.get("mac_address"), } ) return { "success": True, "port": port, "baud_rate": baud_rate, "connection_time_seconds": round(connection_time, 2), "chip_info": chip_data, } async def _connect_advanced_impl( self, context: Context, port: str | None, baud_rate: int | None, timeout: int | None, use_stub: bool, retry_count: int, ) -> dict[str, Any]: """Verify device connectivity with retries via esptool chip-id subprocess.""" if not port: port = await self._auto_detect_port() if not port: return {"success": False, "error": "No ESP devices found"} baud_rate = baud_rate or self.config.default_baud_rate connection_timeout = float(timeout or self.config.connection_timeout) last_error = None for attempt in range(retry_count): logger.info("Connection attempt %d/%d on %s", attempt + 1, retry_count, port) info = await self._run_esptool( port, "chip-id", timeout=connection_timeout, connect_attempts=1, extra_args=["--baud", str(baud_rate)], ) if info["success"]: parsed = self._parse_chip_output(info["output"]) return { "success": True, "port": port, "baud_rate": baud_rate, "attempt": attempt + 1, "stub_loaded": use_stub, # CLI loads stubs automatically "chip_type": parsed.get("chip_type", "Unknown"), "mac_address": parsed.get("mac_address"), } last_error = info["error"] logger.warning("Attempt %d failed: %s", attempt + 1, last_error) if attempt < retry_count - 1: await asyncio.sleep(1) return {"success": False, "error": last_error, "attempts": retry_count, "port": port} async def _reset_chip_impl( self, context: Context, port: str | None, reset_type: str ) -> dict[str, Any]: """Reset chip via esptool --after flag.""" if not port: port = await self._auto_detect_port() if not port: return {"success": False, "error": "No ESP devices found"} after_map = { "hard": "hard_reset", "soft": "soft_reset", "bootloader": "no_reset", } if reset_type not in after_map: return { "success": False, "error": f"Unknown reset type: {reset_type}", "available_types": list(after_map.keys()), } info = await self._run_esptool( port, "chip-id", timeout=10.0, connect_attempts=1, extra_args=["--after", after_map[reset_type]], ) if not info["success"]: return {"success": False, "error": info["error"], "port": port, "reset_type": reset_type} return { "success": True, "port": port, "reset_type": reset_type, "timestamp": time.time(), } async def _scan_ports_impl(self, context: Context, detailed: bool) -> dict[str, Any]: """Scan for available ESP devices using subprocess probes.""" common_esp_ports = [ "/dev/ttyUSB0", "/dev/ttyUSB1", "/dev/ttyUSB2", "/dev/ttyUSB3", "/dev/ttyACM0", "/dev/ttyACM1", "/dev/ttyACM2", "/dev/ttyACM3", ] usb_ports = [p for p in common_esp_ports if os.path.exists(p)] detected_devices: list[dict[str, Any]] = [] scan_results: dict[str, Any] = {} if not usb_ports: # Still check QEMU before returning empty qemu_devices = self._get_qemu_devices() detected_devices.extend(qemu_devices) return { "success": True, "detected_devices": detected_devices, "total_scanned": len(common_esp_ports) + len(qemu_devices), "checked_ports": common_esp_ports, "qemu_devices": qemu_devices or None, "scan_results": {"note": "No USB/ACM ports found on system"}, "timestamp": time.time(), } for port in usb_ports: info = await self._run_esptool(port, "chip-id", connect_attempts=1) device_info: dict[str, Any] = {"port": port, "available": info["success"]} if info["success"]: device_info.update(self._parse_chip_output(info["output"])) if detailed: flash_info = await self._run_esptool(port, "flash-id") if flash_info["success"]: device_info.update(self._parse_chip_output(flash_info["output"])) detected_devices.append(device_info) else: device_info["error"] = info["error"] scan_results[port] = device_info qemu_devices = self._get_qemu_devices() detected_devices.extend(qemu_devices) return { "success": True, "detected_devices": detected_devices, "total_scanned": len(usb_ports) + len(qemu_devices), "checked_ports": common_esp_ports, "available_ports": usb_ports, "qemu_devices": qemu_devices or None, "scan_results": scan_results if detailed else None, "timestamp": time.time(), } async def _load_test_firmware_impl( self, context: Context, port: str | None, firmware_type: str ) -> dict[str, Any]: """Load test firmware (stub — requires ESP-IDF integration).""" if not port: port = await self._auto_detect_port() if not port: return {"success": False, "error": "No ESP devices found"} test_firmwares = { "blink": "Simple LED blink test", "hello_world": "Serial output hello world", "wifi_scan": "WiFi network scanner", } if firmware_type not in test_firmwares: return { "success": False, "error": f"Unknown firmware type: {firmware_type}", "available_types": list(test_firmwares.keys()), } return { "success": True, "port": port, "firmware_type": firmware_type, "description": test_firmwares[firmware_type], "note": "Test firmware loading requires ESP-IDF integration (coming soon)", "timestamp": time.time(), } # ------------------------------------------------------------------ # Helpers # ------------------------------------------------------------------ def _get_qemu_devices(self) -> list[dict[str, Any]]: """Collect running QEMU instances as device entries.""" if not self.qemu_manager: return [] devices = [] for qemu_info in self.qemu_manager.get_running_ports(): qemu_info["available"] = True devices.append(qemu_info) return devices async def _auto_detect_port(self) -> str | None: """Auto-detect an ESP device port via quick subprocess probes.""" for port in self.config.get_common_ports(): if not os.path.exists(port): continue info = await self._run_esptool(port, "chip-id", connect_attempts=1) if info["success"]: return port return None async def health_check(self) -> dict[str, Any]: """Component health check""" return { "status": "healthy", "esptool_path": self.config.esptool_path, }