""" Partition Manager Component Handles ESP partition table operations: generating OTA-capable tables, custom partition layouts, and reading/analyzing partition tables from connected devices. """ import asyncio import logging from pathlib import Path from typing import Any from fastmcp import Context, FastMCP from ..config import ESPToolServerConfig logger = logging.getLogger(__name__) # ESP partition types and subtypes PARTITION_TYPES = { "app": 0x00, "data": 0x01, } APP_SUBTYPES = { "factory": 0x00, "ota_0": 0x10, "ota_1": 0x11, "ota_2": 0x12, "ota_3": 0x13, "test": 0x20, } DATA_SUBTYPES = { "ota": 0x00, "phy": 0x01, "nvs": 0x02, "coredump": 0x03, "nvs_keys": 0x04, "efuse": 0x05, "spiffs": 0x82, "littlefs": 0x83, "fat": 0x81, } # Size multipliers _SIZE_MULT = {"K": 1024, "M": 1024 * 1024} def _parse_size_spec(spec: str) -> int: """Parse a partition size like '1MB', '64K', '0x10000' into bytes.""" spec = spec.strip().upper() for suffix, mult in _SIZE_MULT.items(): if spec.endswith(suffix + "B"): return int(spec[: -len(suffix) - 1]) * mult if spec.endswith(suffix): return int(spec[: -len(suffix)]) * mult return int(spec, 0) def _format_size(size_bytes: int) -> str: """Format byte count as human-readable size.""" if size_bytes >= 1024 * 1024 and size_bytes % (1024 * 1024) == 0: return f"{size_bytes // (1024 * 1024)}MB" if size_bytes >= 1024 and size_bytes % 1024 == 0: return f"{size_bytes // 1024}KB" return f"{size_bytes}B" class PartitionManager: """ESP partition table management""" def __init__(self, app: FastMCP, config: ESPToolServerConfig) -> None: self.app = app self.config = config self._register_tools() async def _run_esptool( self, port: str, args: list[str], timeout: float = 30.0, ) -> dict[str, Any]: """Run esptool as an async subprocess.""" cmd = [self.config.esptool_path, "--port", port, *args] proc = None try: proc = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout) output = (stdout or b"").decode() + (stderr or b"").decode() if proc.returncode != 0: return {"success": False, "error": output.strip()[:500]} return {"success": True, "output": output} except asyncio.TimeoutError: if proc and proc.returncode is None: proc.kill() await proc.wait() return {"success": False, "error": f"Timeout after {timeout}s"} except FileNotFoundError: return {"success": False, "error": f"esptool not found at {self.config.esptool_path}"} except Exception as e: if proc and proc.returncode is None: proc.kill() await proc.wait() return {"success": False, "error": str(e)} def _register_tools(self) -> None: """Register partition management tools""" @self.app.tool("esp_partition_create_ota") async def create_ota_partition( context: Context, flash_size: str = "4MB", app_size: str = "1MB", ) -> dict[str, Any]: """Create OTA-enabled partition table. Generates a partition table CSV with two OTA app slots, NVS storage, OTA data partition, and PHY calibration data. The layout follows Espressif's recommended OTA structure. The generated CSV can be converted to binary with gen_esp32part.py (from ESP-IDF) and flashed to the partition table offset (typically 0x8000). Args: flash_size: Total flash size (e.g. "4MB", "8MB", "16MB", default: "4MB") app_size: Size for each OTA app slot (e.g. "1MB", "1536K", default: "1MB") """ return await self._create_ota_impl(context, flash_size, app_size) @self.app.tool("esp_partition_custom") async def create_custom_partition( context: Context, partition_config: dict[str, Any], ) -> dict[str, Any]: """Create custom partition table from a configuration dict. Accepts a list of partition entries and generates a valid ESP partition table CSV. Each entry needs: name, type, subtype, size. Offset is auto-calculated if omitted. Example partition_config: { "partitions": [ {"name": "nvs", "type": "data", "subtype": "nvs", "size": "24K"}, {"name": "factory", "type": "app", "subtype": "factory", "size": "1MB"}, {"name": "storage", "type": "data", "subtype": "spiffs", "size": "512K"} ] } Args: partition_config: Dict with "partitions" key containing list of entries """ return await self._create_custom_impl(context, partition_config) @self.app.tool("esp_partition_analyze") async def analyze_partitions( context: Context, port: str | None = None, ) -> dict[str, Any]: """Analyze current partition table on a connected ESP device. Reads the partition table from flash (at offset 0x8000, 0xC00 bytes) and parses the binary format into a human-readable table. Shows partition names, types, offsets, sizes, and flags. Works with physical devices and QEMU virtual devices. Args: port: Serial port or socket:// URI (required) """ return await self._analyze_impl(context, port) async def _create_ota_impl( self, context: Context, flash_size: str, app_size: str, ) -> dict[str, Any]: """Generate an OTA-capable partition table.""" try: total_bytes = _parse_size_spec(flash_size) app_bytes = _parse_size_spec(app_size) except (ValueError, TypeError) as e: return {"success": False, "error": f"Invalid size: {e}"} # Standard layout: # 0x9000 - nvs (24KB) # 0xf000 - otadata (8KB) # 0x11000 - phy_init (4KB) # 0x12000 - ota_0 (app_size) # ota_0 + app_size - ota_1 (app_size) nvs_size = 24 * 1024 otadata_size = 8 * 1024 phy_size = 4 * 1024 # Check it all fits (partition table at 0x8000 + 0x1000) overhead = 0x9000 + nvs_size + otadata_size + phy_size # Before first app needed = overhead + (2 * app_bytes) if needed > total_bytes: return { "success": False, "error": ( f"Layout requires {_format_size(needed)} but flash is {flash_size}. " f"Reduce app_size or increase flash_size." ), } partitions = [ ("nvs", "data", "nvs", "0x9000", _format_size(nvs_size)), ("otadata", "data", "ota", f"0x{0x9000 + nvs_size:x}", _format_size(otadata_size)), ("phy_init", "data", "phy", f"0x{0x9000 + nvs_size + otadata_size:x}", _format_size(phy_size)), ("ota_0", "app", "ota_0", f"0x{overhead:x}", _format_size(app_bytes)), ("ota_1", "app", "ota_1", f"0x{overhead + app_bytes:x}", _format_size(app_bytes)), ] # Remaining space for storage used = overhead + (2 * app_bytes) remaining = total_bytes - used if remaining >= 4096: partitions.append( ("storage", "data", "spiffs", f"0x{used:x}", _format_size(remaining)) ) # Generate CSV csv_lines = ["# ESP-IDF Partition Table (OTA layout)", "# Name, Type, SubType, Offset, Size, Flags"] for name, ptype, subtype, offset, size in partitions: csv_lines.append(f"{name}, {ptype}, {subtype}, {offset}, {size},") csv_text = "\n".join(csv_lines) + "\n" return { "success": True, "flash_size": flash_size, "app_size": app_size, "partition_csv": csv_text, "partitions": [ {"name": p[0], "type": p[1], "subtype": p[2], "offset": p[3], "size": p[4]} for p in partitions ], "space_remaining": _format_size(remaining) if remaining >= 4096 else "0", "note": "Flash this CSV with gen_esp32part.py to binary, then write to 0x8000", } async def _create_custom_impl( self, context: Context, partition_config: dict[str, Any], ) -> dict[str, Any]: """Generate a custom partition table from config.""" partitions_input = partition_config.get("partitions", []) if not partitions_input: return {"success": False, "error": "partition_config must have a 'partitions' list"} # Auto-calculate offsets starting after partition table (0x9000) current_offset = 0x9000 partitions = [] errors = [] for i, entry in enumerate(partitions_input): name = entry.get("name") ptype = entry.get("type") subtype = entry.get("subtype") size_str = entry.get("size") if not all([name, ptype, subtype, size_str]): errors.append(f"Partition {i}: requires name, type, subtype, size") continue # Validate type if ptype not in PARTITION_TYPES: errors.append(f"Partition '{name}': invalid type '{ptype}' (use: {list(PARTITION_TYPES.keys())})") continue # Validate subtype valid_subtypes = APP_SUBTYPES if ptype == "app" else DATA_SUBTYPES if subtype not in valid_subtypes: errors.append(f"Partition '{name}': invalid subtype '{subtype}' (use: {list(valid_subtypes.keys())})") continue try: size_bytes = _parse_size_spec(size_str) except (ValueError, TypeError): errors.append(f"Partition '{name}': invalid size '{size_str}'") continue # Use explicit offset if provided, otherwise auto-calculate offset = entry.get("offset") if offset: current_offset = int(offset, 0) if isinstance(offset, str) else offset # App partitions must be 64KB aligned if ptype == "app" and current_offset % 0x10000 != 0: current_offset = (current_offset + 0xFFFF) & ~0xFFFF partitions.append({ "name": name, "type": ptype, "subtype": subtype, "offset": f"0x{current_offset:x}", "size": _format_size(size_bytes), "size_bytes": size_bytes, }) current_offset += size_bytes if errors: return {"success": False, "errors": errors} # Generate CSV csv_lines = ["# ESP-IDF Partition Table (custom layout)", "# Name, Type, SubType, Offset, Size, Flags"] for p in partitions: csv_lines.append(f"{p['name']}, {p['type']}, {p['subtype']}, {p['offset']}, {p['size']},") csv_text = "\n".join(csv_lines) + "\n" return { "success": True, "partition_csv": csv_text, "partitions": [{k: v for k, v in p.items() if k != "size_bytes"} for p in partitions], "total_size": _format_size(sum(p["size_bytes"] for p in partitions)), "note": "Flash this CSV with gen_esp32part.py to binary, then write to 0x8000", } async def _analyze_impl( self, context: Context, port: str | None, ) -> dict[str, Any]: """Read and parse partition table from a connected device.""" if not port: return {"success": False, "error": "Port is required for partition analysis"} import tempfile # Partition table is at 0x8000, max size 0xC00 (3KB) with tempfile.NamedTemporaryFile(suffix=".bin", delete=False) as tmp: tmp_path = tmp.name try: result = await self._run_esptool( port, ["read-flash", "0x8000", "0xC00", tmp_path], timeout=60.0, ) if not result["success"]: return {"success": False, "error": result["error"], "port": port} raw = Path(tmp_path).read_bytes() partitions = self._parse_partition_table_binary(raw) if not partitions: return { "success": True, "port": port, "partitions": [], "note": "No valid partition entries found (flash may be blank or erased)", } return { "success": True, "port": port, "partition_count": len(partitions), "partitions": partitions, } finally: import os try: os.unlink(tmp_path) except OSError: pass def _parse_partition_table_binary(self, raw: bytes) -> list[dict[str, Any]]: """Parse ESP32 binary partition table format. Each entry is 32 bytes: - 2 bytes: magic (0xAA50) - 1 byte: type - 1 byte: subtype - 4 bytes: offset (LE) - 4 bytes: size (LE) - 16 bytes: name (null-terminated) - 4 bytes: flags """ import struct entry_size = 32 magic_expected = 0x50AA # Little-endian # Reverse lookup tables type_names = {v: k for k, v in PARTITION_TYPES.items()} app_subtype_names = {v: k for k, v in APP_SUBTYPES.items()} data_subtype_names = {v: k for k, v in DATA_SUBTYPES.items()} partitions = [] for i in range(0, len(raw) - entry_size + 1, entry_size): entry = raw[i : i + entry_size] magic = struct.unpack_from(" dict[str, Any]: """Component health check""" return {"status": "healthy", "note": "Partition manager ready"}