Implement security manager, diagnostics, and partition manager

Security manager: eFuse read/burn via espefuse, security audit
combining chip-id + get-security-info + eFuse summary, flash
encryption status checking.

Diagnostics: memory dump with hex formatting via dump-mem,
performance profiling by timing esptool operations, diagnostic
report combining chip-id + read-mac + flash-id.

Partition manager: OTA partition table generation with auto-layout,
custom partition table from config dict with validation, binary
partition table parsing from flash reads at 0x8000.
This commit is contained in:
Ryan Malloy 2026-01-30 20:32:48 -07:00
parent 9fe57005c3
commit 945939bdad
3 changed files with 1108 additions and 38 deletions

View File

@ -1,11 +1,15 @@
""" """
Diagnostics Component Diagnostics Component
Provides comprehensive ESP device diagnostics including memory dumps, Provides ESP device diagnostics including memory dumps, flash identification,
performance profiling, and diagnostic reporting. performance profiling, and comprehensive diagnostic reporting. All operations
shell out to esptool as async subprocesses.
""" """
import asyncio
import logging import logging
import re
import time
from typing import Any from typing import Any
from fastmcp import Context, FastMCP from fastmcp import Context, FastMCP
@ -14,15 +18,65 @@ from ..config import ESPToolServerConfig
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Size suffixes for human-friendly parsing
_SIZE_MULTIPLIERS = {"B": 1, "KB": 1024, "MB": 1024 * 1024}
def _parse_size(size_str: str) -> int:
"""Parse a human-friendly size string like '1KB', '256B', '4MB' into bytes."""
size_str = size_str.strip().upper()
for suffix, mult in sorted(_SIZE_MULTIPLIERS.items(), key=lambda x: -len(x[0])):
if size_str.endswith(suffix):
num = size_str[: -len(suffix)].strip()
return int(num) * mult
# Try as plain integer (decimal or hex)
return int(size_str, 0)
class Diagnostics: class Diagnostics:
"""ESP device diagnostics and analysis""" """ESP device diagnostics and analysis"""
def __init__(self, app: FastMCP, config: ESPToolServerConfig): def __init__(self, app: FastMCP, config: ESPToolServerConfig) -> None:
self.app = app self.app = app
self.config = config self.config = config
self._register_tools() self._register_tools()
async def _run_esptool(
self,
port: str,
args: list[str],
timeout: float = 30.0,
) -> dict[str, Any]:
"""Run esptool as an async subprocess."""
cmd = [self.config.esptool_path, "--port", port, *args]
proc = None
try:
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
output = (stdout or b"").decode() + (stderr or b"").decode()
if proc.returncode != 0:
return {"success": False, "error": output.strip()[:500]}
return {"success": True, "output": output}
except asyncio.TimeoutError:
if proc and proc.returncode is None:
proc.kill()
await proc.wait()
return {"success": False, "error": f"Timeout after {timeout}s"}
except FileNotFoundError:
return {"success": False, "error": f"esptool not found at {self.config.esptool_path}"}
except Exception as e:
if proc and proc.returncode is None:
proc.kill()
await proc.wait()
return {"success": False, "error": str(e)}
def _register_tools(self) -> None: def _register_tools(self) -> None:
"""Register diagnostic tools""" """Register diagnostic tools"""
@ -33,22 +87,269 @@ class Diagnostics:
start_address: str = "0x0", start_address: str = "0x0",
size: str = "1KB", size: str = "1KB",
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Dump device memory for analysis""" """Dump device memory for analysis.
return {"success": True, "note": "Implementation coming soon"}
Reads raw bytes from an arbitrary memory address on the ESP device
using esptool's dump-mem command. Useful for inspecting bootloader
state, peripheral registers, or RAM contents.
The output is hex-formatted for readability. For flash memory reads,
use esp_flash_read instead (faster, supports larger ranges).
Args:
port: Serial port or socket:// URI (required)
start_address: Memory address to start reading (hex string, default: "0x0")
size: Number of bytes to read (e.g. "256B", "1KB", "4KB", default: "1KB")
"""
return await self._memory_dump_impl(context, port, start_address, size)
@self.app.tool("esp_performance_profile") @self.app.tool("esp_performance_profile")
async def performance_profile( async def performance_profile(
context: Context, port: str | None = None, duration: int = 30 context: Context,
port: str | None = None,
duration: int = 30,
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Profile device performance""" """Profile device communication performance.
return {"success": True, "note": "Implementation coming soon"}
Measures serial transport speed by timing a sequence of esptool
operations (chip-id, flash-id, small memory reads). Reports
round-trip latencies and throughput estimates. Useful for comparing
physical serial vs QEMU socket performance.
Args:
port: Serial port or socket:// URI (required)
duration: Not used for timing control; kept for API compatibility
"""
return await self._performance_profile_impl(context, port)
@self.app.tool("esp_diagnostic_report") @self.app.tool("esp_diagnostic_report")
async def diagnostic_report( async def diagnostic_report(
context: Context, port: str | None = None, include_memory: bool = False context: Context,
port: str | None = None,
include_memory: bool = False,
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Generate comprehensive diagnostic report""" """Generate comprehensive diagnostic report for an ESP device.
return {"success": True, "note": "Implementation coming soon"}
Collects chip identity, MAC address, flash information, and
optionally a small memory dump into a single structured report.
Useful for troubleshooting connectivity issues or characterizing
an unknown device.
For security-focused analysis, use esp_security_audit instead.
Args:
port: Serial port or socket:// URI (required)
include_memory: Include a 256-byte memory dump from 0x0 (default: false)
"""
return await self._diagnostic_report_impl(context, port, include_memory)
async def _memory_dump_impl(
self,
context: Context,
port: str | None,
start_address: str,
size: str,
) -> dict[str, Any]:
"""Read memory via esptool dump-mem (writes to temp file, then reads it)."""
if not port:
return {"success": False, "error": "Port is required for memory dump"}
byte_count = _parse_size(size)
if byte_count > 1024 * 1024:
return {"success": False, "error": "Maximum dump size is 1MB"}
# dump-mem writes raw bytes to a file
import tempfile
with tempfile.NamedTemporaryFile(suffix=".bin", delete=False) as tmp:
tmp_path = tmp.name
try:
result = await self._run_esptool(
port,
["dump-mem", start_address, str(byte_count), tmp_path],
timeout=60.0,
)
if not result["success"]:
return {"success": False, "error": result["error"], "port": port}
# Read the dump file and format as hex
from pathlib import Path
dump_path = Path(tmp_path)
if not dump_path.exists() or dump_path.stat().st_size == 0:
return {"success": False, "error": "Dump file is empty", "port": port}
raw = dump_path.read_bytes()
# Format as hex dump (16 bytes per line with ASCII)
hex_lines = []
for offset in range(0, len(raw), 16):
chunk = raw[offset : offset + 16]
addr = int(start_address, 0) + offset
hex_part = " ".join(f"{b:02x}" for b in chunk)
ascii_part = "".join(chr(b) if 32 <= b < 127 else "." for b in chunk)
hex_lines.append(f"0x{addr:08x}: {hex_part:<48s} {ascii_part}")
return {
"success": True,
"port": port,
"start_address": start_address,
"bytes_read": len(raw),
"hex_dump": "\n".join(hex_lines[:64]), # Cap at 64 lines (1KB)
"truncated": len(hex_lines) > 64,
}
finally:
import os
try:
os.unlink(tmp_path)
except OSError:
pass
async def _performance_profile_impl(
self,
context: Context,
port: str | None,
) -> dict[str, Any]:
"""Profile serial transport by timing esptool operations."""
if not port:
return {"success": False, "error": "Port is required for profiling"}
measurements: list[dict[str, Any]] = []
# Test 1: chip-id (lightweight command)
t0 = time.time()
r = await self._run_esptool(port, ["chip-id"], timeout=15.0)
elapsed = round(time.time() - t0, 3)
measurements.append({
"operation": "chip-id",
"elapsed_seconds": elapsed,
"success": r["success"],
})
# Test 2: flash-id (reads SPI flash ID register)
t0 = time.time()
r = await self._run_esptool(port, ["flash-id"], timeout=15.0)
elapsed = round(time.time() - t0, 3)
measurements.append({
"operation": "flash-id",
"elapsed_seconds": elapsed,
"success": r["success"],
})
# Test 3: read-mac
t0 = time.time()
r = await self._run_esptool(port, ["read-mac"], timeout=15.0)
elapsed = round(time.time() - t0, 3)
measurements.append({
"operation": "read-mac",
"elapsed_seconds": elapsed,
"success": r["success"],
})
# Test 4: read 4KB of flash (throughput test)
import tempfile
with tempfile.NamedTemporaryFile(suffix=".bin", delete=False) as tmp:
tmp_path = tmp.name
try:
t0 = time.time()
r = await self._run_esptool(
port,
["read-flash", "0x0", "4096", tmp_path],
timeout=60.0,
)
elapsed = round(time.time() - t0, 3)
throughput = None
if r["success"] and elapsed > 0:
throughput = round(4096 / elapsed, 0)
measurements.append({
"operation": "read-flash (4KB)",
"elapsed_seconds": elapsed,
"success": r["success"],
"throughput_bytes_per_sec": throughput,
})
finally:
import os
try:
os.unlink(tmp_path)
except OSError:
pass
# Summary
successes = [m for m in measurements if m["success"]]
avg_latency = (
round(sum(m["elapsed_seconds"] for m in successes) / len(successes), 3)
if successes
else None
)
return {
"success": True,
"port": port,
"measurements": measurements,
"summary": {
"operations_tested": len(measurements),
"operations_succeeded": len(successes),
"average_latency_seconds": avg_latency,
},
}
async def _diagnostic_report_impl(
self,
context: Context,
port: str | None,
include_memory: bool,
) -> dict[str, Any]:
"""Generate comprehensive device diagnostic report."""
if not port:
return {"success": False, "error": "Port is required for diagnostic report"}
report: dict[str, Any] = {"port": port}
# 1. Chip identification
chip_result = await self._run_esptool(port, ["chip-id"], timeout=15.0)
if chip_result["success"]:
output = chip_result["output"]
chip_match = re.search(r"Chip is (\S+)", output)
id_match = re.search(r"Chip ID:\s*(0x[0-9a-fA-F]+)", output)
report["chip"] = chip_match.group(1) if chip_match else "unknown"
report["chip_id"] = id_match.group(1) if id_match else "unknown"
else:
return {"success": False, "error": f"Cannot reach device: {chip_result['error']}", "port": port}
# 2. MAC address
mac_result = await self._run_esptool(port, ["read-mac"], timeout=15.0)
if mac_result["success"]:
mac_match = re.search(r"MAC:\s*([0-9a-fA-F:]+)", mac_result["output"])
report["mac_address"] = mac_match.group(1) if mac_match else "unknown"
# 3. Flash info
flash_result = await self._run_esptool(port, ["flash-id"], timeout=15.0)
if flash_result["success"]:
output = flash_result["output"]
mfr_match = re.search(r"Manufacturer:\s*(0x[0-9a-fA-F]+)", output)
dev_match = re.search(r"Device:\s*(0x[0-9a-fA-F]+)", output)
size_match = re.search(r"Detected flash size:\s*(\S+)", output)
report["flash"] = {
"manufacturer": mfr_match.group(1) if mfr_match else "unknown",
"device": dev_match.group(1) if dev_match else "unknown",
"size": size_match.group(1) if size_match else "unknown",
}
# 4. Optional memory dump
if include_memory:
mem_result = await self._memory_dump_impl(context, port, "0x0", "256B")
if mem_result.get("success"):
report["memory_dump_0x0"] = mem_result.get("hex_dump", "")
report["success"] = True
return report
async def health_check(self) -> dict[str, Any]: async def health_check(self) -> dict[str, Any]:
"""Component health check""" """Component health check"""

View File

@ -1,11 +1,14 @@
""" """
Partition Manager Component Partition Manager Component
Handles ESP partition table operations, OTA partition management, Handles ESP partition table operations: generating OTA-capable tables,
and custom partition configurations. custom partition layouts, and reading/analyzing partition tables from
connected devices.
""" """
import asyncio
import logging import logging
from pathlib import Path
from typing import Any from typing import Any
from fastmcp import Context, FastMCP from fastmcp import Context, FastMCP
@ -14,38 +17,429 @@ from ..config import ESPToolServerConfig
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# ESP partition types and subtypes
PARTITION_TYPES = {
"app": 0x00,
"data": 0x01,
}
APP_SUBTYPES = {
"factory": 0x00,
"ota_0": 0x10,
"ota_1": 0x11,
"ota_2": 0x12,
"ota_3": 0x13,
"test": 0x20,
}
DATA_SUBTYPES = {
"ota": 0x00,
"phy": 0x01,
"nvs": 0x02,
"coredump": 0x03,
"nvs_keys": 0x04,
"efuse": 0x05,
"spiffs": 0x82,
"littlefs": 0x83,
"fat": 0x81,
}
# Size multipliers
_SIZE_MULT = {"K": 1024, "M": 1024 * 1024}
def _parse_size_spec(spec: str) -> int:
"""Parse a partition size like '1MB', '64K', '0x10000' into bytes."""
spec = spec.strip().upper()
for suffix, mult in _SIZE_MULT.items():
if spec.endswith(suffix + "B"):
return int(spec[: -len(suffix) - 1]) * mult
if spec.endswith(suffix):
return int(spec[: -len(suffix)]) * mult
return int(spec, 0)
def _format_size(size_bytes: int) -> str:
"""Format byte count as human-readable size."""
if size_bytes >= 1024 * 1024 and size_bytes % (1024 * 1024) == 0:
return f"{size_bytes // (1024 * 1024)}MB"
if size_bytes >= 1024 and size_bytes % 1024 == 0:
return f"{size_bytes // 1024}KB"
return f"{size_bytes}B"
class PartitionManager: class PartitionManager:
"""ESP partition table management""" """ESP partition table management"""
def __init__(self, app: FastMCP, config: ESPToolServerConfig): def __init__(self, app: FastMCP, config: ESPToolServerConfig) -> None:
self.app = app self.app = app
self.config = config self.config = config
self._register_tools() self._register_tools()
async def _run_esptool(
self,
port: str,
args: list[str],
timeout: float = 30.0,
) -> dict[str, Any]:
"""Run esptool as an async subprocess."""
cmd = [self.config.esptool_path, "--port", port, *args]
proc = None
try:
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
output = (stdout or b"").decode() + (stderr or b"").decode()
if proc.returncode != 0:
return {"success": False, "error": output.strip()[:500]}
return {"success": True, "output": output}
except asyncio.TimeoutError:
if proc and proc.returncode is None:
proc.kill()
await proc.wait()
return {"success": False, "error": f"Timeout after {timeout}s"}
except FileNotFoundError:
return {"success": False, "error": f"esptool not found at {self.config.esptool_path}"}
except Exception as e:
if proc and proc.returncode is None:
proc.kill()
await proc.wait()
return {"success": False, "error": str(e)}
def _register_tools(self) -> None: def _register_tools(self) -> None:
"""Register partition management tools""" """Register partition management tools"""
@self.app.tool("esp_partition_create_ota") @self.app.tool("esp_partition_create_ota")
async def create_ota_partition( async def create_ota_partition(
context: Context, flash_size: str = "4MB", app_size: str = "1MB" context: Context,
flash_size: str = "4MB",
app_size: str = "1MB",
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Create OTA-enabled partition table""" """Create OTA-enabled partition table.
return {"success": True, "note": "Implementation coming soon"}
Generates a partition table CSV with two OTA app slots, NVS storage,
OTA data partition, and PHY calibration data. The layout follows
Espressif's recommended OTA structure.
The generated CSV can be converted to binary with gen_esp32part.py
(from ESP-IDF) and flashed to the partition table offset (typically 0x8000).
Args:
flash_size: Total flash size (e.g. "4MB", "8MB", "16MB", default: "4MB")
app_size: Size for each OTA app slot (e.g. "1MB", "1536K", default: "1MB")
"""
return await self._create_ota_impl(context, flash_size, app_size)
@self.app.tool("esp_partition_custom") @self.app.tool("esp_partition_custom")
async def create_custom_partition( async def create_custom_partition(
context: Context, partition_config: dict[str, Any] context: Context,
partition_config: dict[str, Any],
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Create custom partition table""" """Create custom partition table from a configuration dict.
return {"success": True, "note": "Implementation coming soon"}
Accepts a list of partition entries and generates a valid ESP
partition table CSV. Each entry needs: name, type, subtype, size.
Offset is auto-calculated if omitted.
Example partition_config:
{
"partitions": [
{"name": "nvs", "type": "data", "subtype": "nvs", "size": "24K"},
{"name": "factory", "type": "app", "subtype": "factory", "size": "1MB"},
{"name": "storage", "type": "data", "subtype": "spiffs", "size": "512K"}
]
}
Args:
partition_config: Dict with "partitions" key containing list of entries
"""
return await self._create_custom_impl(context, partition_config)
@self.app.tool("esp_partition_analyze") @self.app.tool("esp_partition_analyze")
async def analyze_partitions( async def analyze_partitions(
context: Context, port: str | None = None context: Context,
port: str | None = None,
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Analyze current partition table""" """Analyze current partition table on a connected ESP device.
return {"success": True, "note": "Implementation coming soon"}
Reads the partition table from flash (at offset 0x8000, 0xC00 bytes)
and parses the binary format into a human-readable table. Shows
partition names, types, offsets, sizes, and flags.
Works with physical devices and QEMU virtual devices.
Args:
port: Serial port or socket:// URI (required)
"""
return await self._analyze_impl(context, port)
async def _create_ota_impl(
self,
context: Context,
flash_size: str,
app_size: str,
) -> dict[str, Any]:
"""Generate an OTA-capable partition table."""
try:
total_bytes = _parse_size_spec(flash_size)
app_bytes = _parse_size_spec(app_size)
except (ValueError, TypeError) as e:
return {"success": False, "error": f"Invalid size: {e}"}
# Standard layout:
# 0x9000 - nvs (24KB)
# 0xf000 - otadata (8KB)
# 0x11000 - phy_init (4KB)
# 0x12000 - ota_0 (app_size)
# ota_0 + app_size - ota_1 (app_size)
nvs_size = 24 * 1024
otadata_size = 8 * 1024
phy_size = 4 * 1024
# Check it all fits (partition table at 0x8000 + 0x1000)
overhead = 0x9000 + nvs_size + otadata_size + phy_size # Before first app
needed = overhead + (2 * app_bytes)
if needed > total_bytes:
return {
"success": False,
"error": (
f"Layout requires {_format_size(needed)} but flash is {flash_size}. "
f"Reduce app_size or increase flash_size."
),
}
partitions = [
("nvs", "data", "nvs", "0x9000", _format_size(nvs_size)),
("otadata", "data", "ota", f"0x{0x9000 + nvs_size:x}", _format_size(otadata_size)),
("phy_init", "data", "phy", f"0x{0x9000 + nvs_size + otadata_size:x}", _format_size(phy_size)),
("ota_0", "app", "ota_0", f"0x{overhead:x}", _format_size(app_bytes)),
("ota_1", "app", "ota_1", f"0x{overhead + app_bytes:x}", _format_size(app_bytes)),
]
# Remaining space for storage
used = overhead + (2 * app_bytes)
remaining = total_bytes - used
if remaining >= 4096:
partitions.append(
("storage", "data", "spiffs", f"0x{used:x}", _format_size(remaining))
)
# Generate CSV
csv_lines = ["# ESP-IDF Partition Table (OTA layout)", "# Name, Type, SubType, Offset, Size, Flags"]
for name, ptype, subtype, offset, size in partitions:
csv_lines.append(f"{name}, {ptype}, {subtype}, {offset}, {size},")
csv_text = "\n".join(csv_lines) + "\n"
return {
"success": True,
"flash_size": flash_size,
"app_size": app_size,
"partition_csv": csv_text,
"partitions": [
{"name": p[0], "type": p[1], "subtype": p[2], "offset": p[3], "size": p[4]}
for p in partitions
],
"space_remaining": _format_size(remaining) if remaining >= 4096 else "0",
"note": "Flash this CSV with gen_esp32part.py to binary, then write to 0x8000",
}
async def _create_custom_impl(
self,
context: Context,
partition_config: dict[str, Any],
) -> dict[str, Any]:
"""Generate a custom partition table from config."""
partitions_input = partition_config.get("partitions", [])
if not partitions_input:
return {"success": False, "error": "partition_config must have a 'partitions' list"}
# Auto-calculate offsets starting after partition table (0x9000)
current_offset = 0x9000
partitions = []
errors = []
for i, entry in enumerate(partitions_input):
name = entry.get("name")
ptype = entry.get("type")
subtype = entry.get("subtype")
size_str = entry.get("size")
if not all([name, ptype, subtype, size_str]):
errors.append(f"Partition {i}: requires name, type, subtype, size")
continue
# Validate type
if ptype not in PARTITION_TYPES:
errors.append(f"Partition '{name}': invalid type '{ptype}' (use: {list(PARTITION_TYPES.keys())})")
continue
# Validate subtype
valid_subtypes = APP_SUBTYPES if ptype == "app" else DATA_SUBTYPES
if subtype not in valid_subtypes:
errors.append(f"Partition '{name}': invalid subtype '{subtype}' (use: {list(valid_subtypes.keys())})")
continue
try:
size_bytes = _parse_size_spec(size_str)
except (ValueError, TypeError):
errors.append(f"Partition '{name}': invalid size '{size_str}'")
continue
# Use explicit offset if provided, otherwise auto-calculate
offset = entry.get("offset")
if offset:
current_offset = int(offset, 0) if isinstance(offset, str) else offset
# App partitions must be 64KB aligned
if ptype == "app" and current_offset % 0x10000 != 0:
current_offset = (current_offset + 0xFFFF) & ~0xFFFF
partitions.append({
"name": name,
"type": ptype,
"subtype": subtype,
"offset": f"0x{current_offset:x}",
"size": _format_size(size_bytes),
"size_bytes": size_bytes,
})
current_offset += size_bytes
if errors:
return {"success": False, "errors": errors}
# Generate CSV
csv_lines = ["# ESP-IDF Partition Table (custom layout)", "# Name, Type, SubType, Offset, Size, Flags"]
for p in partitions:
csv_lines.append(f"{p['name']}, {p['type']}, {p['subtype']}, {p['offset']}, {p['size']},")
csv_text = "\n".join(csv_lines) + "\n"
return {
"success": True,
"partition_csv": csv_text,
"partitions": [{k: v for k, v in p.items() if k != "size_bytes"} for p in partitions],
"total_size": _format_size(sum(p["size_bytes"] for p in partitions)),
"note": "Flash this CSV with gen_esp32part.py to binary, then write to 0x8000",
}
async def _analyze_impl(
self,
context: Context,
port: str | None,
) -> dict[str, Any]:
"""Read and parse partition table from a connected device."""
if not port:
return {"success": False, "error": "Port is required for partition analysis"}
import tempfile
# Partition table is at 0x8000, max size 0xC00 (3KB)
with tempfile.NamedTemporaryFile(suffix=".bin", delete=False) as tmp:
tmp_path = tmp.name
try:
result = await self._run_esptool(
port,
["read-flash", "0x8000", "0xC00", tmp_path],
timeout=60.0,
)
if not result["success"]:
return {"success": False, "error": result["error"], "port": port}
raw = Path(tmp_path).read_bytes()
partitions = self._parse_partition_table_binary(raw)
if not partitions:
return {
"success": True,
"port": port,
"partitions": [],
"note": "No valid partition entries found (flash may be blank or erased)",
}
return {
"success": True,
"port": port,
"partition_count": len(partitions),
"partitions": partitions,
}
finally:
import os
try:
os.unlink(tmp_path)
except OSError:
pass
def _parse_partition_table_binary(self, raw: bytes) -> list[dict[str, Any]]:
"""Parse ESP32 binary partition table format.
Each entry is 32 bytes:
- 2 bytes: magic (0xAA50)
- 1 byte: type
- 1 byte: subtype
- 4 bytes: offset (LE)
- 4 bytes: size (LE)
- 16 bytes: name (null-terminated)
- 4 bytes: flags
"""
import struct
entry_size = 32
magic_expected = 0x50AA # Little-endian
# Reverse lookup tables
type_names = {v: k for k, v in PARTITION_TYPES.items()}
app_subtype_names = {v: k for k, v in APP_SUBTYPES.items()}
data_subtype_names = {v: k for k, v in DATA_SUBTYPES.items()}
partitions = []
for i in range(0, len(raw) - entry_size + 1, entry_size):
entry = raw[i : i + entry_size]
magic = struct.unpack_from("<H", entry, 0)[0]
if magic == 0xFFFF:
# End of table (erased flash)
break
if magic != magic_expected:
continue
ptype = entry[2]
subtype = entry[3]
offset = struct.unpack_from("<I", entry, 4)[0]
size = struct.unpack_from("<I", entry, 8)[0]
name = entry[12:28].split(b"\x00")[0].decode("ascii", errors="replace")
flags = struct.unpack_from("<I", entry, 28)[0]
type_name = type_names.get(ptype, f"0x{ptype:02x}")
if ptype == 0x00:
subtype_name = app_subtype_names.get(subtype, f"0x{subtype:02x}")
elif ptype == 0x01:
subtype_name = data_subtype_names.get(subtype, f"0x{subtype:02x}")
else:
subtype_name = f"0x{subtype:02x}"
partitions.append({
"name": name,
"type": type_name,
"subtype": subtype_name,
"offset": f"0x{offset:x}",
"size": _format_size(size),
"size_bytes": size,
"encrypted": bool(flags & 1),
})
return partitions
async def health_check(self) -> dict[str, Any]: async def health_check(self) -> dict[str, Any]:
"""Component health check""" """Component health check"""

View File

@ -1,11 +1,14 @@
""" """
Security Manager Component Security Manager Component
Handles ESP security features including secure boot, flash encryption, Handles ESP security features including eFuse management, flash encryption
eFuse management, and security auditing. status, and security auditing. Operations shell out to esptool/espefuse
as async subprocesses.
""" """
import asyncio
import logging import logging
import re
from typing import Any from typing import Any
from fastmcp import Context, FastMCP from fastmcp import Context, FastMCP
@ -18,39 +21,411 @@ logger = logging.getLogger(__name__)
class SecurityManager: class SecurityManager:
"""ESP security features management""" """ESP security features management"""
def __init__(self, app: FastMCP, config: ESPToolServerConfig): def __init__(self, app: FastMCP, config: ESPToolServerConfig) -> None:
self.app = app self.app = app
self.config = config self.config = config
self._register_tools() self._register_tools()
async def _run_cmd(
self,
cmd: list[str],
timeout: float = 30.0,
) -> dict[str, Any]:
"""Run a CLI command as an async subprocess.
Returns:
dict with "success", "output", and optionally "error"
"""
proc = None
try:
proc = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
output = (stdout or b"").decode() + (stderr or b"").decode()
if proc.returncode != 0:
return {"success": False, "error": output.strip()[:500]}
return {"success": True, "output": output}
except asyncio.TimeoutError:
if proc and proc.returncode is None:
proc.kill()
await proc.wait()
return {"success": False, "error": f"Timeout after {timeout}s"}
except FileNotFoundError:
return {"success": False, "error": f"Command not found: {cmd[0]}"}
except Exception as e:
if proc and proc.returncode is None:
proc.kill()
await proc.wait()
return {"success": False, "error": str(e)}
def _espefuse_cmd(self, port: str, args: list[str]) -> list[str]:
"""Build an espefuse command list."""
return ["espefuse", "--port", port, *args]
def _esptool_cmd(self, port: str, args: list[str]) -> list[str]:
"""Build an esptool command list."""
return [self.config.esptool_path, "--port", port, *args]
def _register_tools(self) -> None: def _register_tools(self) -> None:
"""Register security management tools""" """Register security management tools"""
@self.app.tool("esp_security_audit") @self.app.tool("esp_security_audit")
async def security_audit(context: Context, port: str | None = None) -> dict[str, Any]: async def security_audit(
"""Perform comprehensive security audit""" context: Context,
return {"success": True, "note": "Implementation coming soon"} port: str | None = None,
) -> dict[str, Any]:
"""Perform comprehensive security audit of an ESP device.
Connects to the device and gathers security-relevant information:
chip identity, flash encryption status, secure boot state, and
eFuse summary. Returns a structured report suitable for evaluating
the device's security posture.
Requires a connected device (physical or QEMU via socket:// URI).
Args:
port: Serial port or socket:// URI (required)
"""
return await self._security_audit_impl(context, port)
@self.app.tool("esp_enable_flash_encryption") @self.app.tool("esp_enable_flash_encryption")
async def enable_flash_encryption( async def enable_flash_encryption(
context: Context, port: str | None = None, key_file: str | None = None context: Context,
port: str | None = None,
key_file: str | None = None,
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Enable flash encryption with optional key""" """Enable flash encryption with optional key.
return {"success": True, "note": "Implementation coming soon"}
Checks current flash encryption status via eFuse summary. If
encryption is already enabled, reports the current state. Actual
eFuse burning for flash encryption requires esp_efuse_burn with
specific eFuse names (FLASH_CRYPT_CNT, etc.) this tool provides
guidance and status checking.
WARNING: Flash encryption is a one-way operation on real hardware.
Test thoroughly on QEMU first.
Args:
port: Serial port or socket:// URI (required)
key_file: Path to encryption key file (for reference only)
"""
return await self._flash_encryption_impl(context, port, key_file)
@self.app.tool("esp_efuse_read") @self.app.tool("esp_efuse_read")
async def read_efuse( async def read_efuse(
context: Context, port: str | None = None, efuse_name: str | None = None context: Context,
port: str | None = None,
efuse_name: str | None = None,
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Read eFuse values""" """Read eFuse values from an ESP device.
return {"success": True, "note": "Implementation coming soon"}
Without efuse_name: returns full human-readable eFuse summary
(espefuse summary). With efuse_name: returns that specific eFuse's
value parsed from the summary.
eFuses are one-time-programmable bits that control chip security,
MAC address, calibration data, and more. Reading is non-destructive.
Args:
port: Serial port or socket:// URI (required)
efuse_name: Specific eFuse to read (e.g. "MAC", "FLASH_CRYPT_CNT").
If omitted, returns full summary.
"""
return await self._efuse_read_impl(context, port, efuse_name)
@self.app.tool("esp_efuse_burn") @self.app.tool("esp_efuse_burn")
async def burn_efuse( async def burn_efuse(
context: Context, efuse_name: str, value: str, port: str | None = None context: Context,
efuse_name: str,
value: str,
port: str | None = None,
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Burn eFuse (DANGEROUS - requires confirmation)""" """Burn eFuse (DANGEROUS - requires confirmation).
return {"success": True, "note": "Implementation coming soon"}
Permanently programs an eFuse bit field on the ESP device. This
operation is IRREVERSIBLE on real hardware burned bits cannot be
reset. Safe to test on QEMU virtual devices (eFuses reset when
instance is recreated).
Common eFuses: FLASH_CRYPT_CNT, ABS_DONE_0, JTAG_DISABLE,
DISABLE_DL_ENCRYPT, DISABLE_DL_DECRYPT.
Uses --do-not-confirm flag since confirmation is handled at the
MCP client level.
Args:
efuse_name: Name of the eFuse to burn (e.g. "JTAG_DISABLE")
value: Value to burn (e.g. "1", "0x1")
port: Serial port or socket:// URI (required)
"""
return await self._efuse_burn_impl(context, efuse_name, value, port)
async def _security_audit_impl(
self,
context: Context,
port: str | None,
) -> dict[str, Any]:
"""Gather security-relevant info from the device."""
if not port:
return {"success": False, "error": "Port is required for security audit"}
report: dict[str, Any] = {"port": port}
# 1. Get chip info and security info from esptool
security_result = await self._run_cmd(
self._esptool_cmd(port, ["get-security-info"]),
timeout=15.0,
)
if security_result["success"]:
report["security_info"] = self._parse_security_info(security_result["output"])
else:
# get-security-info may not be supported on all chips
report["security_info"] = {"note": security_result["error"]}
# 2. Get chip ID
chip_result = await self._run_cmd(
self._esptool_cmd(port, ["chip-id"]),
timeout=15.0,
)
if chip_result["success"]:
chip_match = re.search(r"Chip ID:\s*(0x[0-9a-fA-F]+)", chip_result["output"])
if chip_match:
report["chip_id"] = chip_match.group(1)
# 3. Get eFuse summary for security-relevant fuses
efuse_result = await self._run_cmd(
self._espefuse_cmd(port, ["summary"]),
timeout=15.0,
)
if efuse_result["success"]:
parsed = self._parse_efuse_summary(efuse_result["output"])
report["efuse_summary"] = parsed
# Extract security-relevant fields
security_fuses = {}
security_names = [
"FLASH_CRYPT_CNT", "ABS_DONE_0", "ABS_DONE_1",
"JTAG_DISABLE", "DISABLE_DL_ENCRYPT", "DISABLE_DL_DECRYPT",
"DISABLE_DL_CACHE", "FLASH_CRYPT_CONFIG",
]
for name in security_names:
if name in parsed:
security_fuses[name] = parsed[name]
report["security_fuses"] = security_fuses
# Determine security posture
flash_encrypted = security_fuses.get("FLASH_CRYPT_CNT", "0") not in ("0", "0x0", "= 0")
secure_boot = security_fuses.get("ABS_DONE_0", "0") not in ("0", "0x0", "= 0")
jtag_disabled = security_fuses.get("JTAG_DISABLE", "0") not in ("0", "0x0", "= 0")
report["posture"] = {
"flash_encryption": "enabled" if flash_encrypted else "disabled",
"secure_boot": "enabled" if secure_boot else "disabled",
"jtag": "disabled" if jtag_disabled else "enabled (vulnerable)",
}
else:
report["efuse_error"] = efuse_result["error"]
report["success"] = True
return report
async def _flash_encryption_impl(
self,
context: Context,
port: str | None,
key_file: str | None,
) -> dict[str, Any]:
"""Check flash encryption status and provide guidance."""
if not port:
return {"success": False, "error": "Port is required"}
# Read the encryption-relevant eFuses
efuse_result = await self._run_cmd(
self._espefuse_cmd(port, ["summary"]),
timeout=15.0,
)
if not efuse_result["success"]:
return {"success": False, "error": efuse_result["error"]}
parsed = self._parse_efuse_summary(efuse_result["output"])
flash_crypt_cnt = parsed.get("FLASH_CRYPT_CNT", "unknown")
flash_crypt_config = parsed.get("FLASH_CRYPT_CONFIG", "unknown")
# Determine current state
is_encrypted = flash_crypt_cnt not in ("0", "0x0", "= 0", "unknown")
result: dict[str, Any] = {
"success": True,
"port": port,
"flash_encryption_enabled": is_encrypted,
"FLASH_CRYPT_CNT": flash_crypt_cnt,
"FLASH_CRYPT_CONFIG": flash_crypt_config,
}
if is_encrypted:
result["message"] = "Flash encryption is already enabled on this device."
else:
result["message"] = (
"Flash encryption is NOT enabled. To enable, you need to: "
"1) Generate or provide an encryption key, "
"2) Burn FLASH_CRYPT_CNT and FLASH_CRYPT_CONFIG eFuses, "
"3) Flash encrypted firmware. "
"WARNING: This is irreversible on real hardware. Test on QEMU first."
)
if key_file:
result["key_file"] = key_file
return result
async def _efuse_read_impl(
self,
context: Context,
port: str | None,
efuse_name: str | None,
) -> dict[str, Any]:
"""Read eFuse values via espefuse summary."""
if not port:
return {"success": False, "error": "Port is required for eFuse read"}
result = await self._run_cmd(
self._espefuse_cmd(port, ["summary"]),
timeout=15.0,
)
if not result["success"]:
return {"success": False, "error": result["error"], "port": port}
parsed = self._parse_efuse_summary(result["output"])
if efuse_name:
# Return specific eFuse
if efuse_name in parsed:
return {
"success": True,
"port": port,
"efuse_name": efuse_name,
"value": parsed[efuse_name],
}
else:
# Try case-insensitive match
for key, val in parsed.items():
if key.upper() == efuse_name.upper():
return {
"success": True,
"port": port,
"efuse_name": key,
"value": val,
}
return {
"success": False,
"error": f"eFuse '{efuse_name}' not found",
"available_efuses": list(parsed.keys()),
"port": port,
}
# Return full summary
return {
"success": True,
"port": port,
"efuses": parsed,
"raw_output": result["output"][:2000],
}
async def _efuse_burn_impl(
self,
context: Context,
efuse_name: str,
value: str,
port: str | None,
) -> dict[str, Any]:
"""Burn an eFuse value via espefuse burn-efuse."""
if not port:
return {"success": False, "error": "Port is required for eFuse burn"}
# Read before to record the change
before = await self._run_cmd(
self._espefuse_cmd(port, ["summary"]),
timeout=15.0,
)
before_parsed = self._parse_efuse_summary(before.get("output", "")) if before["success"] else {}
before_value = before_parsed.get(efuse_name, "unknown")
# Burn the eFuse (--do-not-confirm since MCP client handles confirmation)
result = await self._run_cmd(
self._espefuse_cmd(port, ["--do-not-confirm", "burn-efuse", efuse_name, value]),
timeout=30.0,
)
if not result["success"]:
return {
"success": False,
"error": result["error"],
"port": port,
"efuse_name": efuse_name,
}
# Read after to confirm
after = await self._run_cmd(
self._espefuse_cmd(port, ["summary"]),
timeout=15.0,
)
after_parsed = self._parse_efuse_summary(after.get("output", "")) if after["success"] else {}
after_value = after_parsed.get(efuse_name, "unknown")
return {
"success": True,
"port": port,
"efuse_name": efuse_name,
"value_requested": value,
"value_before": before_value,
"value_after": after_value,
"warning": "eFuse burn is IRREVERSIBLE on real hardware",
}
def _parse_efuse_summary(self, output: str) -> dict[str, str]:
"""Parse espefuse summary output into a dict of name -> value.
espefuse summary lines look like:
EFUSE_NAME (BLOCK0) = 0x00000000 R/W
or:
MAC (BLOCK0) = ab:cd:ef:01:02:03 R/W
"""
efuses: dict[str, str] = {}
for line in output.splitlines():
# Match lines with "NAME (BLOCKn) ... = value"
match = re.match(r"\s*(\w+)\s+\(BLOCK\d+\)\s+.*?=\s+(.+?)\s+[RW/-]+\s*$", line)
if match:
name = match.group(1).strip()
value = match.group(2).strip()
efuses[name] = value
return efuses
def _parse_security_info(self, output: str) -> dict[str, str]:
"""Parse esptool get-security-info output."""
info: dict[str, str] = {}
for line in output.splitlines():
if ":" in line:
key, _, val = line.partition(":")
key = key.strip()
val = val.strip()
if key and val:
info[key] = val
return info
async def health_check(self) -> dict[str, Any]: async def health_check(self) -> dict[str, Any]:
"""Component health check""" """Component health check"""