diff --git a/src/mcp_esptool_server/components/diagnostics.py b/src/mcp_esptool_server/components/diagnostics.py index ddaa345..c6c8338 100644 --- a/src/mcp_esptool_server/components/diagnostics.py +++ b/src/mcp_esptool_server/components/diagnostics.py @@ -1,11 +1,15 @@ """ Diagnostics Component -Provides comprehensive ESP device diagnostics including memory dumps, -performance profiling, and diagnostic reporting. +Provides ESP device diagnostics including memory dumps, flash identification, +performance profiling, and comprehensive diagnostic reporting. All operations +shell out to esptool as async subprocesses. """ +import asyncio import logging +import re +import time from typing import Any from fastmcp import Context, FastMCP @@ -14,15 +18,65 @@ from ..config import ESPToolServerConfig logger = logging.getLogger(__name__) +# Size suffixes for human-friendly parsing +_SIZE_MULTIPLIERS = {"B": 1, "KB": 1024, "MB": 1024 * 1024} + + +def _parse_size(size_str: str) -> int: + """Parse a human-friendly size string like '1KB', '256B', '4MB' into bytes.""" + size_str = size_str.strip().upper() + for suffix, mult in sorted(_SIZE_MULTIPLIERS.items(), key=lambda x: -len(x[0])): + if size_str.endswith(suffix): + num = size_str[: -len(suffix)].strip() + return int(num) * mult + # Try as plain integer (decimal or hex) + return int(size_str, 0) + class Diagnostics: """ESP device diagnostics and analysis""" - def __init__(self, app: FastMCP, config: ESPToolServerConfig): + def __init__(self, app: FastMCP, config: ESPToolServerConfig) -> None: self.app = app self.config = config self._register_tools() + async def _run_esptool( + self, + port: str, + args: list[str], + timeout: float = 30.0, + ) -> dict[str, Any]: + """Run esptool as an async subprocess.""" + cmd = [self.config.esptool_path, "--port", port, *args] + proc = None + try: + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout) + output = (stdout or b"").decode() + (stderr or b"").decode() + + if proc.returncode != 0: + return {"success": False, "error": output.strip()[:500]} + + return {"success": True, "output": output} + + except asyncio.TimeoutError: + if proc and proc.returncode is None: + proc.kill() + await proc.wait() + return {"success": False, "error": f"Timeout after {timeout}s"} + except FileNotFoundError: + return {"success": False, "error": f"esptool not found at {self.config.esptool_path}"} + except Exception as e: + if proc and proc.returncode is None: + proc.kill() + await proc.wait() + return {"success": False, "error": str(e)} + def _register_tools(self) -> None: """Register diagnostic tools""" @@ -33,22 +87,269 @@ class Diagnostics: start_address: str = "0x0", size: str = "1KB", ) -> dict[str, Any]: - """Dump device memory for analysis""" - return {"success": True, "note": "Implementation coming soon"} + """Dump device memory for analysis. + + Reads raw bytes from an arbitrary memory address on the ESP device + using esptool's dump-mem command. Useful for inspecting bootloader + state, peripheral registers, or RAM contents. + + The output is hex-formatted for readability. For flash memory reads, + use esp_flash_read instead (faster, supports larger ranges). + + Args: + port: Serial port or socket:// URI (required) + start_address: Memory address to start reading (hex string, default: "0x0") + size: Number of bytes to read (e.g. "256B", "1KB", "4KB", default: "1KB") + """ + return await self._memory_dump_impl(context, port, start_address, size) @self.app.tool("esp_performance_profile") async def performance_profile( - context: Context, port: str | None = None, duration: int = 30 + context: Context, + port: str | None = None, + duration: int = 30, ) -> dict[str, Any]: - """Profile device performance""" - return {"success": True, "note": "Implementation coming soon"} + """Profile device communication performance. + + Measures serial transport speed by timing a sequence of esptool + operations (chip-id, flash-id, small memory reads). Reports + round-trip latencies and throughput estimates. Useful for comparing + physical serial vs QEMU socket performance. + + Args: + port: Serial port or socket:// URI (required) + duration: Not used for timing control; kept for API compatibility + """ + return await self._performance_profile_impl(context, port) @self.app.tool("esp_diagnostic_report") async def diagnostic_report( - context: Context, port: str | None = None, include_memory: bool = False + context: Context, + port: str | None = None, + include_memory: bool = False, ) -> dict[str, Any]: - """Generate comprehensive diagnostic report""" - return {"success": True, "note": "Implementation coming soon"} + """Generate comprehensive diagnostic report for an ESP device. + + Collects chip identity, MAC address, flash information, and + optionally a small memory dump into a single structured report. + Useful for troubleshooting connectivity issues or characterizing + an unknown device. + + For security-focused analysis, use esp_security_audit instead. + + Args: + port: Serial port or socket:// URI (required) + include_memory: Include a 256-byte memory dump from 0x0 (default: false) + """ + return await self._diagnostic_report_impl(context, port, include_memory) + + async def _memory_dump_impl( + self, + context: Context, + port: str | None, + start_address: str, + size: str, + ) -> dict[str, Any]: + """Read memory via esptool dump-mem (writes to temp file, then reads it).""" + + if not port: + return {"success": False, "error": "Port is required for memory dump"} + + byte_count = _parse_size(size) + if byte_count > 1024 * 1024: + return {"success": False, "error": "Maximum dump size is 1MB"} + + # dump-mem writes raw bytes to a file + import tempfile + with tempfile.NamedTemporaryFile(suffix=".bin", delete=False) as tmp: + tmp_path = tmp.name + + try: + result = await self._run_esptool( + port, + ["dump-mem", start_address, str(byte_count), tmp_path], + timeout=60.0, + ) + + if not result["success"]: + return {"success": False, "error": result["error"], "port": port} + + # Read the dump file and format as hex + from pathlib import Path + dump_path = Path(tmp_path) + if not dump_path.exists() or dump_path.stat().st_size == 0: + return {"success": False, "error": "Dump file is empty", "port": port} + + raw = dump_path.read_bytes() + + # Format as hex dump (16 bytes per line with ASCII) + hex_lines = [] + for offset in range(0, len(raw), 16): + chunk = raw[offset : offset + 16] + addr = int(start_address, 0) + offset + hex_part = " ".join(f"{b:02x}" for b in chunk) + ascii_part = "".join(chr(b) if 32 <= b < 127 else "." for b in chunk) + hex_lines.append(f"0x{addr:08x}: {hex_part:<48s} {ascii_part}") + + return { + "success": True, + "port": port, + "start_address": start_address, + "bytes_read": len(raw), + "hex_dump": "\n".join(hex_lines[:64]), # Cap at 64 lines (1KB) + "truncated": len(hex_lines) > 64, + } + + finally: + import os + try: + os.unlink(tmp_path) + except OSError: + pass + + async def _performance_profile_impl( + self, + context: Context, + port: str | None, + ) -> dict[str, Any]: + """Profile serial transport by timing esptool operations.""" + + if not port: + return {"success": False, "error": "Port is required for profiling"} + + measurements: list[dict[str, Any]] = [] + + # Test 1: chip-id (lightweight command) + t0 = time.time() + r = await self._run_esptool(port, ["chip-id"], timeout=15.0) + elapsed = round(time.time() - t0, 3) + measurements.append({ + "operation": "chip-id", + "elapsed_seconds": elapsed, + "success": r["success"], + }) + + # Test 2: flash-id (reads SPI flash ID register) + t0 = time.time() + r = await self._run_esptool(port, ["flash-id"], timeout=15.0) + elapsed = round(time.time() - t0, 3) + measurements.append({ + "operation": "flash-id", + "elapsed_seconds": elapsed, + "success": r["success"], + }) + + # Test 3: read-mac + t0 = time.time() + r = await self._run_esptool(port, ["read-mac"], timeout=15.0) + elapsed = round(time.time() - t0, 3) + measurements.append({ + "operation": "read-mac", + "elapsed_seconds": elapsed, + "success": r["success"], + }) + + # Test 4: read 4KB of flash (throughput test) + import tempfile + with tempfile.NamedTemporaryFile(suffix=".bin", delete=False) as tmp: + tmp_path = tmp.name + + try: + t0 = time.time() + r = await self._run_esptool( + port, + ["read-flash", "0x0", "4096", tmp_path], + timeout=60.0, + ) + elapsed = round(time.time() - t0, 3) + + throughput = None + if r["success"] and elapsed > 0: + throughput = round(4096 / elapsed, 0) + + measurements.append({ + "operation": "read-flash (4KB)", + "elapsed_seconds": elapsed, + "success": r["success"], + "throughput_bytes_per_sec": throughput, + }) + finally: + import os + try: + os.unlink(tmp_path) + except OSError: + pass + + # Summary + successes = [m for m in measurements if m["success"]] + avg_latency = ( + round(sum(m["elapsed_seconds"] for m in successes) / len(successes), 3) + if successes + else None + ) + + return { + "success": True, + "port": port, + "measurements": measurements, + "summary": { + "operations_tested": len(measurements), + "operations_succeeded": len(successes), + "average_latency_seconds": avg_latency, + }, + } + + async def _diagnostic_report_impl( + self, + context: Context, + port: str | None, + include_memory: bool, + ) -> dict[str, Any]: + """Generate comprehensive device diagnostic report.""" + + if not port: + return {"success": False, "error": "Port is required for diagnostic report"} + + report: dict[str, Any] = {"port": port} + + # 1. Chip identification + chip_result = await self._run_esptool(port, ["chip-id"], timeout=15.0) + if chip_result["success"]: + output = chip_result["output"] + chip_match = re.search(r"Chip is (\S+)", output) + id_match = re.search(r"Chip ID:\s*(0x[0-9a-fA-F]+)", output) + report["chip"] = chip_match.group(1) if chip_match else "unknown" + report["chip_id"] = id_match.group(1) if id_match else "unknown" + else: + return {"success": False, "error": f"Cannot reach device: {chip_result['error']}", "port": port} + + # 2. MAC address + mac_result = await self._run_esptool(port, ["read-mac"], timeout=15.0) + if mac_result["success"]: + mac_match = re.search(r"MAC:\s*([0-9a-fA-F:]+)", mac_result["output"]) + report["mac_address"] = mac_match.group(1) if mac_match else "unknown" + + # 3. Flash info + flash_result = await self._run_esptool(port, ["flash-id"], timeout=15.0) + if flash_result["success"]: + output = flash_result["output"] + mfr_match = re.search(r"Manufacturer:\s*(0x[0-9a-fA-F]+)", output) + dev_match = re.search(r"Device:\s*(0x[0-9a-fA-F]+)", output) + size_match = re.search(r"Detected flash size:\s*(\S+)", output) + report["flash"] = { + "manufacturer": mfr_match.group(1) if mfr_match else "unknown", + "device": dev_match.group(1) if dev_match else "unknown", + "size": size_match.group(1) if size_match else "unknown", + } + + # 4. Optional memory dump + if include_memory: + mem_result = await self._memory_dump_impl(context, port, "0x0", "256B") + if mem_result.get("success"): + report["memory_dump_0x0"] = mem_result.get("hex_dump", "") + + report["success"] = True + return report async def health_check(self) -> dict[str, Any]: """Component health check""" diff --git a/src/mcp_esptool_server/components/partition_manager.py b/src/mcp_esptool_server/components/partition_manager.py index 00a5d35..fc56f4f 100644 --- a/src/mcp_esptool_server/components/partition_manager.py +++ b/src/mcp_esptool_server/components/partition_manager.py @@ -1,11 +1,14 @@ """ Partition Manager Component -Handles ESP partition table operations, OTA partition management, -and custom partition configurations. +Handles ESP partition table operations: generating OTA-capable tables, +custom partition layouts, and reading/analyzing partition tables from +connected devices. """ +import asyncio import logging +from pathlib import Path from typing import Any from fastmcp import Context, FastMCP @@ -14,38 +17,429 @@ from ..config import ESPToolServerConfig logger = logging.getLogger(__name__) +# ESP partition types and subtypes +PARTITION_TYPES = { + "app": 0x00, + "data": 0x01, +} + +APP_SUBTYPES = { + "factory": 0x00, + "ota_0": 0x10, + "ota_1": 0x11, + "ota_2": 0x12, + "ota_3": 0x13, + "test": 0x20, +} + +DATA_SUBTYPES = { + "ota": 0x00, + "phy": 0x01, + "nvs": 0x02, + "coredump": 0x03, + "nvs_keys": 0x04, + "efuse": 0x05, + "spiffs": 0x82, + "littlefs": 0x83, + "fat": 0x81, +} + +# Size multipliers +_SIZE_MULT = {"K": 1024, "M": 1024 * 1024} + + +def _parse_size_spec(spec: str) -> int: + """Parse a partition size like '1MB', '64K', '0x10000' into bytes.""" + spec = spec.strip().upper() + for suffix, mult in _SIZE_MULT.items(): + if spec.endswith(suffix + "B"): + return int(spec[: -len(suffix) - 1]) * mult + if spec.endswith(suffix): + return int(spec[: -len(suffix)]) * mult + return int(spec, 0) + + +def _format_size(size_bytes: int) -> str: + """Format byte count as human-readable size.""" + if size_bytes >= 1024 * 1024 and size_bytes % (1024 * 1024) == 0: + return f"{size_bytes // (1024 * 1024)}MB" + if size_bytes >= 1024 and size_bytes % 1024 == 0: + return f"{size_bytes // 1024}KB" + return f"{size_bytes}B" + class PartitionManager: """ESP partition table management""" - def __init__(self, app: FastMCP, config: ESPToolServerConfig): + def __init__(self, app: FastMCP, config: ESPToolServerConfig) -> None: self.app = app self.config = config self._register_tools() + async def _run_esptool( + self, + port: str, + args: list[str], + timeout: float = 30.0, + ) -> dict[str, Any]: + """Run esptool as an async subprocess.""" + cmd = [self.config.esptool_path, "--port", port, *args] + proc = None + try: + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout) + output = (stdout or b"").decode() + (stderr or b"").decode() + if proc.returncode != 0: + return {"success": False, "error": output.strip()[:500]} + return {"success": True, "output": output} + except asyncio.TimeoutError: + if proc and proc.returncode is None: + proc.kill() + await proc.wait() + return {"success": False, "error": f"Timeout after {timeout}s"} + except FileNotFoundError: + return {"success": False, "error": f"esptool not found at {self.config.esptool_path}"} + except Exception as e: + if proc and proc.returncode is None: + proc.kill() + await proc.wait() + return {"success": False, "error": str(e)} + def _register_tools(self) -> None: """Register partition management tools""" @self.app.tool("esp_partition_create_ota") async def create_ota_partition( - context: Context, flash_size: str = "4MB", app_size: str = "1MB" + context: Context, + flash_size: str = "4MB", + app_size: str = "1MB", ) -> dict[str, Any]: - """Create OTA-enabled partition table""" - return {"success": True, "note": "Implementation coming soon"} + """Create OTA-enabled partition table. + + Generates a partition table CSV with two OTA app slots, NVS storage, + OTA data partition, and PHY calibration data. The layout follows + Espressif's recommended OTA structure. + + The generated CSV can be converted to binary with gen_esp32part.py + (from ESP-IDF) and flashed to the partition table offset (typically 0x8000). + + Args: + flash_size: Total flash size (e.g. "4MB", "8MB", "16MB", default: "4MB") + app_size: Size for each OTA app slot (e.g. "1MB", "1536K", default: "1MB") + """ + return await self._create_ota_impl(context, flash_size, app_size) @self.app.tool("esp_partition_custom") async def create_custom_partition( - context: Context, partition_config: dict[str, Any] + context: Context, + partition_config: dict[str, Any], ) -> dict[str, Any]: - """Create custom partition table""" - return {"success": True, "note": "Implementation coming soon"} + """Create custom partition table from a configuration dict. + + Accepts a list of partition entries and generates a valid ESP + partition table CSV. Each entry needs: name, type, subtype, size. + Offset is auto-calculated if omitted. + + Example partition_config: + { + "partitions": [ + {"name": "nvs", "type": "data", "subtype": "nvs", "size": "24K"}, + {"name": "factory", "type": "app", "subtype": "factory", "size": "1MB"}, + {"name": "storage", "type": "data", "subtype": "spiffs", "size": "512K"} + ] + } + + Args: + partition_config: Dict with "partitions" key containing list of entries + """ + return await self._create_custom_impl(context, partition_config) @self.app.tool("esp_partition_analyze") async def analyze_partitions( - context: Context, port: str | None = None + context: Context, + port: str | None = None, ) -> dict[str, Any]: - """Analyze current partition table""" - return {"success": True, "note": "Implementation coming soon"} + """Analyze current partition table on a connected ESP device. + + Reads the partition table from flash (at offset 0x8000, 0xC00 bytes) + and parses the binary format into a human-readable table. Shows + partition names, types, offsets, sizes, and flags. + + Works with physical devices and QEMU virtual devices. + + Args: + port: Serial port or socket:// URI (required) + """ + return await self._analyze_impl(context, port) + + async def _create_ota_impl( + self, + context: Context, + flash_size: str, + app_size: str, + ) -> dict[str, Any]: + """Generate an OTA-capable partition table.""" + + try: + total_bytes = _parse_size_spec(flash_size) + app_bytes = _parse_size_spec(app_size) + except (ValueError, TypeError) as e: + return {"success": False, "error": f"Invalid size: {e}"} + + # Standard layout: + # 0x9000 - nvs (24KB) + # 0xf000 - otadata (8KB) + # 0x11000 - phy_init (4KB) + # 0x12000 - ota_0 (app_size) + # ota_0 + app_size - ota_1 (app_size) + + nvs_size = 24 * 1024 + otadata_size = 8 * 1024 + phy_size = 4 * 1024 + + # Check it all fits (partition table at 0x8000 + 0x1000) + overhead = 0x9000 + nvs_size + otadata_size + phy_size # Before first app + needed = overhead + (2 * app_bytes) + if needed > total_bytes: + return { + "success": False, + "error": ( + f"Layout requires {_format_size(needed)} but flash is {flash_size}. " + f"Reduce app_size or increase flash_size." + ), + } + + partitions = [ + ("nvs", "data", "nvs", "0x9000", _format_size(nvs_size)), + ("otadata", "data", "ota", f"0x{0x9000 + nvs_size:x}", _format_size(otadata_size)), + ("phy_init", "data", "phy", f"0x{0x9000 + nvs_size + otadata_size:x}", _format_size(phy_size)), + ("ota_0", "app", "ota_0", f"0x{overhead:x}", _format_size(app_bytes)), + ("ota_1", "app", "ota_1", f"0x{overhead + app_bytes:x}", _format_size(app_bytes)), + ] + + # Remaining space for storage + used = overhead + (2 * app_bytes) + remaining = total_bytes - used + if remaining >= 4096: + partitions.append( + ("storage", "data", "spiffs", f"0x{used:x}", _format_size(remaining)) + ) + + # Generate CSV + csv_lines = ["# ESP-IDF Partition Table (OTA layout)", "# Name, Type, SubType, Offset, Size, Flags"] + for name, ptype, subtype, offset, size in partitions: + csv_lines.append(f"{name}, {ptype}, {subtype}, {offset}, {size},") + + csv_text = "\n".join(csv_lines) + "\n" + + return { + "success": True, + "flash_size": flash_size, + "app_size": app_size, + "partition_csv": csv_text, + "partitions": [ + {"name": p[0], "type": p[1], "subtype": p[2], "offset": p[3], "size": p[4]} + for p in partitions + ], + "space_remaining": _format_size(remaining) if remaining >= 4096 else "0", + "note": "Flash this CSV with gen_esp32part.py to binary, then write to 0x8000", + } + + async def _create_custom_impl( + self, + context: Context, + partition_config: dict[str, Any], + ) -> dict[str, Any]: + """Generate a custom partition table from config.""" + + partitions_input = partition_config.get("partitions", []) + if not partitions_input: + return {"success": False, "error": "partition_config must have a 'partitions' list"} + + # Auto-calculate offsets starting after partition table (0x9000) + current_offset = 0x9000 + partitions = [] + errors = [] + + for i, entry in enumerate(partitions_input): + name = entry.get("name") + ptype = entry.get("type") + subtype = entry.get("subtype") + size_str = entry.get("size") + + if not all([name, ptype, subtype, size_str]): + errors.append(f"Partition {i}: requires name, type, subtype, size") + continue + + # Validate type + if ptype not in PARTITION_TYPES: + errors.append(f"Partition '{name}': invalid type '{ptype}' (use: {list(PARTITION_TYPES.keys())})") + continue + + # Validate subtype + valid_subtypes = APP_SUBTYPES if ptype == "app" else DATA_SUBTYPES + if subtype not in valid_subtypes: + errors.append(f"Partition '{name}': invalid subtype '{subtype}' (use: {list(valid_subtypes.keys())})") + continue + + try: + size_bytes = _parse_size_spec(size_str) + except (ValueError, TypeError): + errors.append(f"Partition '{name}': invalid size '{size_str}'") + continue + + # Use explicit offset if provided, otherwise auto-calculate + offset = entry.get("offset") + if offset: + current_offset = int(offset, 0) if isinstance(offset, str) else offset + + # App partitions must be 64KB aligned + if ptype == "app" and current_offset % 0x10000 != 0: + current_offset = (current_offset + 0xFFFF) & ~0xFFFF + + partitions.append({ + "name": name, + "type": ptype, + "subtype": subtype, + "offset": f"0x{current_offset:x}", + "size": _format_size(size_bytes), + "size_bytes": size_bytes, + }) + + current_offset += size_bytes + + if errors: + return {"success": False, "errors": errors} + + # Generate CSV + csv_lines = ["# ESP-IDF Partition Table (custom layout)", "# Name, Type, SubType, Offset, Size, Flags"] + for p in partitions: + csv_lines.append(f"{p['name']}, {p['type']}, {p['subtype']}, {p['offset']}, {p['size']},") + + csv_text = "\n".join(csv_lines) + "\n" + + return { + "success": True, + "partition_csv": csv_text, + "partitions": [{k: v for k, v in p.items() if k != "size_bytes"} for p in partitions], + "total_size": _format_size(sum(p["size_bytes"] for p in partitions)), + "note": "Flash this CSV with gen_esp32part.py to binary, then write to 0x8000", + } + + async def _analyze_impl( + self, + context: Context, + port: str | None, + ) -> dict[str, Any]: + """Read and parse partition table from a connected device.""" + + if not port: + return {"success": False, "error": "Port is required for partition analysis"} + + import tempfile + + # Partition table is at 0x8000, max size 0xC00 (3KB) + with tempfile.NamedTemporaryFile(suffix=".bin", delete=False) as tmp: + tmp_path = tmp.name + + try: + result = await self._run_esptool( + port, + ["read-flash", "0x8000", "0xC00", tmp_path], + timeout=60.0, + ) + + if not result["success"]: + return {"success": False, "error": result["error"], "port": port} + + raw = Path(tmp_path).read_bytes() + partitions = self._parse_partition_table_binary(raw) + + if not partitions: + return { + "success": True, + "port": port, + "partitions": [], + "note": "No valid partition entries found (flash may be blank or erased)", + } + + return { + "success": True, + "port": port, + "partition_count": len(partitions), + "partitions": partitions, + } + + finally: + import os + try: + os.unlink(tmp_path) + except OSError: + pass + + def _parse_partition_table_binary(self, raw: bytes) -> list[dict[str, Any]]: + """Parse ESP32 binary partition table format. + + Each entry is 32 bytes: + - 2 bytes: magic (0xAA50) + - 1 byte: type + - 1 byte: subtype + - 4 bytes: offset (LE) + - 4 bytes: size (LE) + - 16 bytes: name (null-terminated) + - 4 bytes: flags + """ + import struct + + entry_size = 32 + magic_expected = 0x50AA # Little-endian + + # Reverse lookup tables + type_names = {v: k for k, v in PARTITION_TYPES.items()} + app_subtype_names = {v: k for k, v in APP_SUBTYPES.items()} + data_subtype_names = {v: k for k, v in DATA_SUBTYPES.items()} + + partitions = [] + for i in range(0, len(raw) - entry_size + 1, entry_size): + entry = raw[i : i + entry_size] + magic = struct.unpack_from(" dict[str, Any]: """Component health check""" diff --git a/src/mcp_esptool_server/components/security_manager.py b/src/mcp_esptool_server/components/security_manager.py index 7c6fa58..1e62774 100644 --- a/src/mcp_esptool_server/components/security_manager.py +++ b/src/mcp_esptool_server/components/security_manager.py @@ -1,11 +1,14 @@ """ Security Manager Component -Handles ESP security features including secure boot, flash encryption, -eFuse management, and security auditing. +Handles ESP security features including eFuse management, flash encryption +status, and security auditing. Operations shell out to esptool/espefuse +as async subprocesses. """ +import asyncio import logging +import re from typing import Any from fastmcp import Context, FastMCP @@ -18,39 +21,411 @@ logger = logging.getLogger(__name__) class SecurityManager: """ESP security features management""" - def __init__(self, app: FastMCP, config: ESPToolServerConfig): + def __init__(self, app: FastMCP, config: ESPToolServerConfig) -> None: self.app = app self.config = config self._register_tools() + async def _run_cmd( + self, + cmd: list[str], + timeout: float = 30.0, + ) -> dict[str, Any]: + """Run a CLI command as an async subprocess. + + Returns: + dict with "success", "output", and optionally "error" + """ + proc = None + try: + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout) + output = (stdout or b"").decode() + (stderr or b"").decode() + + if proc.returncode != 0: + return {"success": False, "error": output.strip()[:500]} + + return {"success": True, "output": output} + + except asyncio.TimeoutError: + if proc and proc.returncode is None: + proc.kill() + await proc.wait() + return {"success": False, "error": f"Timeout after {timeout}s"} + except FileNotFoundError: + return {"success": False, "error": f"Command not found: {cmd[0]}"} + except Exception as e: + if proc and proc.returncode is None: + proc.kill() + await proc.wait() + return {"success": False, "error": str(e)} + + def _espefuse_cmd(self, port: str, args: list[str]) -> list[str]: + """Build an espefuse command list.""" + return ["espefuse", "--port", port, *args] + + def _esptool_cmd(self, port: str, args: list[str]) -> list[str]: + """Build an esptool command list.""" + return [self.config.esptool_path, "--port", port, *args] + def _register_tools(self) -> None: """Register security management tools""" @self.app.tool("esp_security_audit") - async def security_audit(context: Context, port: str | None = None) -> dict[str, Any]: - """Perform comprehensive security audit""" - return {"success": True, "note": "Implementation coming soon"} + async def security_audit( + context: Context, + port: str | None = None, + ) -> dict[str, Any]: + """Perform comprehensive security audit of an ESP device. + + Connects to the device and gathers security-relevant information: + chip identity, flash encryption status, secure boot state, and + eFuse summary. Returns a structured report suitable for evaluating + the device's security posture. + + Requires a connected device (physical or QEMU via socket:// URI). + + Args: + port: Serial port or socket:// URI (required) + """ + return await self._security_audit_impl(context, port) @self.app.tool("esp_enable_flash_encryption") async def enable_flash_encryption( - context: Context, port: str | None = None, key_file: str | None = None + context: Context, + port: str | None = None, + key_file: str | None = None, ) -> dict[str, Any]: - """Enable flash encryption with optional key""" - return {"success": True, "note": "Implementation coming soon"} + """Enable flash encryption with optional key. + + Checks current flash encryption status via eFuse summary. If + encryption is already enabled, reports the current state. Actual + eFuse burning for flash encryption requires esp_efuse_burn with + specific eFuse names (FLASH_CRYPT_CNT, etc.) — this tool provides + guidance and status checking. + + WARNING: Flash encryption is a one-way operation on real hardware. + Test thoroughly on QEMU first. + + Args: + port: Serial port or socket:// URI (required) + key_file: Path to encryption key file (for reference only) + """ + return await self._flash_encryption_impl(context, port, key_file) @self.app.tool("esp_efuse_read") async def read_efuse( - context: Context, port: str | None = None, efuse_name: str | None = None + context: Context, + port: str | None = None, + efuse_name: str | None = None, ) -> dict[str, Any]: - """Read eFuse values""" - return {"success": True, "note": "Implementation coming soon"} + """Read eFuse values from an ESP device. + + Without efuse_name: returns full human-readable eFuse summary + (espefuse summary). With efuse_name: returns that specific eFuse's + value parsed from the summary. + + eFuses are one-time-programmable bits that control chip security, + MAC address, calibration data, and more. Reading is non-destructive. + + Args: + port: Serial port or socket:// URI (required) + efuse_name: Specific eFuse to read (e.g. "MAC", "FLASH_CRYPT_CNT"). + If omitted, returns full summary. + """ + return await self._efuse_read_impl(context, port, efuse_name) @self.app.tool("esp_efuse_burn") async def burn_efuse( - context: Context, efuse_name: str, value: str, port: str | None = None + context: Context, + efuse_name: str, + value: str, + port: str | None = None, ) -> dict[str, Any]: - """Burn eFuse (DANGEROUS - requires confirmation)""" - return {"success": True, "note": "Implementation coming soon"} + """Burn eFuse (DANGEROUS - requires confirmation). + + Permanently programs an eFuse bit field on the ESP device. This + operation is IRREVERSIBLE on real hardware — burned bits cannot be + reset. Safe to test on QEMU virtual devices (eFuses reset when + instance is recreated). + + Common eFuses: FLASH_CRYPT_CNT, ABS_DONE_0, JTAG_DISABLE, + DISABLE_DL_ENCRYPT, DISABLE_DL_DECRYPT. + + Uses --do-not-confirm flag since confirmation is handled at the + MCP client level. + + Args: + efuse_name: Name of the eFuse to burn (e.g. "JTAG_DISABLE") + value: Value to burn (e.g. "1", "0x1") + port: Serial port or socket:// URI (required) + """ + return await self._efuse_burn_impl(context, efuse_name, value, port) + + async def _security_audit_impl( + self, + context: Context, + port: str | None, + ) -> dict[str, Any]: + """Gather security-relevant info from the device.""" + + if not port: + return {"success": False, "error": "Port is required for security audit"} + + report: dict[str, Any] = {"port": port} + + # 1. Get chip info and security info from esptool + security_result = await self._run_cmd( + self._esptool_cmd(port, ["get-security-info"]), + timeout=15.0, + ) + + if security_result["success"]: + report["security_info"] = self._parse_security_info(security_result["output"]) + else: + # get-security-info may not be supported on all chips + report["security_info"] = {"note": security_result["error"]} + + # 2. Get chip ID + chip_result = await self._run_cmd( + self._esptool_cmd(port, ["chip-id"]), + timeout=15.0, + ) + if chip_result["success"]: + chip_match = re.search(r"Chip ID:\s*(0x[0-9a-fA-F]+)", chip_result["output"]) + if chip_match: + report["chip_id"] = chip_match.group(1) + + # 3. Get eFuse summary for security-relevant fuses + efuse_result = await self._run_cmd( + self._espefuse_cmd(port, ["summary"]), + timeout=15.0, + ) + + if efuse_result["success"]: + parsed = self._parse_efuse_summary(efuse_result["output"]) + report["efuse_summary"] = parsed + + # Extract security-relevant fields + security_fuses = {} + security_names = [ + "FLASH_CRYPT_CNT", "ABS_DONE_0", "ABS_DONE_1", + "JTAG_DISABLE", "DISABLE_DL_ENCRYPT", "DISABLE_DL_DECRYPT", + "DISABLE_DL_CACHE", "FLASH_CRYPT_CONFIG", + ] + for name in security_names: + if name in parsed: + security_fuses[name] = parsed[name] + + report["security_fuses"] = security_fuses + + # Determine security posture + flash_encrypted = security_fuses.get("FLASH_CRYPT_CNT", "0") not in ("0", "0x0", "= 0") + secure_boot = security_fuses.get("ABS_DONE_0", "0") not in ("0", "0x0", "= 0") + jtag_disabled = security_fuses.get("JTAG_DISABLE", "0") not in ("0", "0x0", "= 0") + + report["posture"] = { + "flash_encryption": "enabled" if flash_encrypted else "disabled", + "secure_boot": "enabled" if secure_boot else "disabled", + "jtag": "disabled" if jtag_disabled else "enabled (vulnerable)", + } + else: + report["efuse_error"] = efuse_result["error"] + + report["success"] = True + return report + + async def _flash_encryption_impl( + self, + context: Context, + port: str | None, + key_file: str | None, + ) -> dict[str, Any]: + """Check flash encryption status and provide guidance.""" + + if not port: + return {"success": False, "error": "Port is required"} + + # Read the encryption-relevant eFuses + efuse_result = await self._run_cmd( + self._espefuse_cmd(port, ["summary"]), + timeout=15.0, + ) + + if not efuse_result["success"]: + return {"success": False, "error": efuse_result["error"]} + + parsed = self._parse_efuse_summary(efuse_result["output"]) + + flash_crypt_cnt = parsed.get("FLASH_CRYPT_CNT", "unknown") + flash_crypt_config = parsed.get("FLASH_CRYPT_CONFIG", "unknown") + + # Determine current state + is_encrypted = flash_crypt_cnt not in ("0", "0x0", "= 0", "unknown") + + result: dict[str, Any] = { + "success": True, + "port": port, + "flash_encryption_enabled": is_encrypted, + "FLASH_CRYPT_CNT": flash_crypt_cnt, + "FLASH_CRYPT_CONFIG": flash_crypt_config, + } + + if is_encrypted: + result["message"] = "Flash encryption is already enabled on this device." + else: + result["message"] = ( + "Flash encryption is NOT enabled. To enable, you need to: " + "1) Generate or provide an encryption key, " + "2) Burn FLASH_CRYPT_CNT and FLASH_CRYPT_CONFIG eFuses, " + "3) Flash encrypted firmware. " + "WARNING: This is irreversible on real hardware. Test on QEMU first." + ) + if key_file: + result["key_file"] = key_file + + return result + + async def _efuse_read_impl( + self, + context: Context, + port: str | None, + efuse_name: str | None, + ) -> dict[str, Any]: + """Read eFuse values via espefuse summary.""" + + if not port: + return {"success": False, "error": "Port is required for eFuse read"} + + result = await self._run_cmd( + self._espefuse_cmd(port, ["summary"]), + timeout=15.0, + ) + + if not result["success"]: + return {"success": False, "error": result["error"], "port": port} + + parsed = self._parse_efuse_summary(result["output"]) + + if efuse_name: + # Return specific eFuse + if efuse_name in parsed: + return { + "success": True, + "port": port, + "efuse_name": efuse_name, + "value": parsed[efuse_name], + } + else: + # Try case-insensitive match + for key, val in parsed.items(): + if key.upper() == efuse_name.upper(): + return { + "success": True, + "port": port, + "efuse_name": key, + "value": val, + } + return { + "success": False, + "error": f"eFuse '{efuse_name}' not found", + "available_efuses": list(parsed.keys()), + "port": port, + } + + # Return full summary + return { + "success": True, + "port": port, + "efuses": parsed, + "raw_output": result["output"][:2000], + } + + async def _efuse_burn_impl( + self, + context: Context, + efuse_name: str, + value: str, + port: str | None, + ) -> dict[str, Any]: + """Burn an eFuse value via espefuse burn-efuse.""" + + if not port: + return {"success": False, "error": "Port is required for eFuse burn"} + + # Read before to record the change + before = await self._run_cmd( + self._espefuse_cmd(port, ["summary"]), + timeout=15.0, + ) + before_parsed = self._parse_efuse_summary(before.get("output", "")) if before["success"] else {} + before_value = before_parsed.get(efuse_name, "unknown") + + # Burn the eFuse (--do-not-confirm since MCP client handles confirmation) + result = await self._run_cmd( + self._espefuse_cmd(port, ["--do-not-confirm", "burn-efuse", efuse_name, value]), + timeout=30.0, + ) + + if not result["success"]: + return { + "success": False, + "error": result["error"], + "port": port, + "efuse_name": efuse_name, + } + + # Read after to confirm + after = await self._run_cmd( + self._espefuse_cmd(port, ["summary"]), + timeout=15.0, + ) + after_parsed = self._parse_efuse_summary(after.get("output", "")) if after["success"] else {} + after_value = after_parsed.get(efuse_name, "unknown") + + return { + "success": True, + "port": port, + "efuse_name": efuse_name, + "value_requested": value, + "value_before": before_value, + "value_after": after_value, + "warning": "eFuse burn is IRREVERSIBLE on real hardware", + } + + def _parse_efuse_summary(self, output: str) -> dict[str, str]: + """Parse espefuse summary output into a dict of name -> value. + + espefuse summary lines look like: + EFUSE_NAME (BLOCK0) = 0x00000000 R/W + or: + MAC (BLOCK0) = ab:cd:ef:01:02:03 R/W + """ + efuses: dict[str, str] = {} + for line in output.splitlines(): + # Match lines with "NAME (BLOCKn) ... = value" + match = re.match(r"\s*(\w+)\s+\(BLOCK\d+\)\s+.*?=\s+(.+?)\s+[RW/-]+\s*$", line) + if match: + name = match.group(1).strip() + value = match.group(2).strip() + efuses[name] = value + return efuses + + def _parse_security_info(self, output: str) -> dict[str, str]: + """Parse esptool get-security-info output.""" + info: dict[str, str] = {} + for line in output.splitlines(): + if ":" in line: + key, _, val = line.partition(":") + key = key.strip() + val = val.strip() + if key and val: + info[key] = val + return info async def health_check(self) -> dict[str, Any]: """Component health check"""