mcesptool/API_DESIGN.md
Ryan Malloy 64c1505a00 Add QEMU ESP32 emulation support
Integrate Espressif's QEMU fork for virtual ESP device management:

- QemuManager component with 5 MCP tools (start/stop/list/status/flash)
- Config auto-detects QEMU binaries from ~/.espressif/tools/
- Supports esp32, esp32s2, esp32s3, esp32c3 chip emulation
- Virtual serial over TCP (socket://localhost:PORT) transparent to esptool
- Scan integration: QEMU instances appear in esp_scan_ports results
- Blank flash images initialized to 0xFF (erased NOR flash state)
- 38 unit tests covering lifecycle, port allocation, flash writes
2026-01-28 15:35:22 -07:00

15 KiB

🔧 FastMCP ESPTool Server API Design

API Philosophy & Design Principles

Following the patterns established in the Arduino MCP server, this API prioritizes:

  1. Natural Language Workflows: Tools designed for conversational AI interaction
  2. Component Architecture: Modular, testable, and maintainable design
  3. Production Readiness: Enterprise-grade error handling and resource management
  4. Performance Optimization: Leveraging esptool's advanced capabilities
  5. Security First: Built-in security validation and eFuse management

🎯 Core API Structure

FastMCP Server Foundation

# server.py - Main FastMCP server following Arduino MCP patterns
from fastmcp import FastMCP
from .components import (
    ChipControl,
    FlashManager,
    PartitionManager,
    SecurityManager,
    FirmwareBuilder,
    OTAManager,
    ProductionTools,
    Diagnostics
)
from .config import ESPToolServerConfig

class ESPToolServer:
    def __init__(self):
        self.app = FastMCP("ESP Development Server")
        self.config = ESPToolServerConfig()
        self.initialize_components()

    def initialize_components(self):
        """Initialize all component modules"""
        self.chip_control = ChipControl(self.app, self.config)
        self.flash_manager = FlashManager(self.app, self.config)
        self.partition_manager = PartitionManager(self.app, self.config)
        self.security_manager = SecurityManager(self.app, self.config)
        self.firmware_builder = FirmwareBuilder(self.app, self.config)
        self.ota_manager = OTAManager(self.app, self.config)
        self.production_tools = ProductionTools(self.app, self.config)
        self.diagnostics = Diagnostics(self.app, self.config)

def main():
    server = ESPToolServer()
    server.app.run()

🧩 Component API Specifications

1. ChipControl Component

# components/chip_control.py
from fastmcp import FastMCP
from esptool.cmds import detect_chip, run_stub, reset_chip, load_ram, run
from typing import Dict, List, Optional
import asyncio

class ChipControl:
    """ESP chip detection, connection, and control operations"""

    def __init__(self, app: FastMCP, config):
        self.app = app
        self.config = config
        self.register_tools()
        self.register_resources()

    def register_tools(self):
        """Register chip control tools with FastMCP"""

        @self.app.tool("esp_detect_chip")
        async def detect_esp_chip(
            port: str,
            baud: int = 115200,
            connect_attempts: int = 7
        ) -> Dict:
            """
            Auto-detect ESP chip type and return comprehensive information

            Args:
                port: Serial port path (e.g., '/dev/ttyUSB0', 'COM3')
                baud: Connection baud rate
                connect_attempts: Number of connection retry attempts

            Returns:
                Dict with chip info: type, features, MAC address, flash size, etc.
            """
            try:
                with detect_chip(port, baud=baud, connect_attempts=connect_attempts) as esp:
                    return {
                        "success": True,
                        "chip_type": esp.get_chip_description(),
                        "chip_revision": esp.get_chip_revision(),
                        "features": esp.get_chip_features(),
                        "mac_address": esp.read_mac().hex(':'),
                        "flash_size": esp.flash_size,
                        "crystal_freq": esp.get_crystal_freq(),
                        "port": port,
                        "stub_supported": hasattr(esp, 'STUB_SUPPORTED') and esp.STUB_SUPPORTED
                    }
            except Exception as e:
                return {"success": False, "error": str(e), "port": port}

        @self.app.tool("esp_connect_advanced")
        async def connect_with_strategies(
            port: str,
            enable_stub: bool = True,
            high_speed: bool = True
        ) -> str:
            """
            Connect to ESP chip with multiple fallback strategies

            Args:
                port: Serial port path
                enable_stub: Whether to load the flasher stub for better performance
                high_speed: Attempt high-speed connection first

            Returns:
                Connection status and optimization details
            """
            strategies = [
                {'baud': 460800 if high_speed else 115200, 'connect_mode': 'default-reset'},
                {'baud': 115200, 'connect_mode': 'usb-reset'},
                {'baud': 115200, 'connect_mode': 'manual-reset'}
            ]

            for i, strategy in enumerate(strategies):
                try:
                    with detect_chip(port, **strategy) as esp:
                        chip_info = esp.get_chip_description()

                        if enable_stub and hasattr(esp, 'STUB_SUPPORTED') and esp.STUB_SUPPORTED:
                            esp = run_stub(esp)
                            return f"✓ Connected to {chip_info} with stub flasher (strategy {i+1})"
                        else:
                            return f"✓ Connected to {chip_info} (strategy {i+1})"

                except Exception as e:
                    continue

            return f"❌ Failed to connect to {port} with all strategies"

        @self.app.tool("esp_load_test_firmware")
        async def load_firmware_to_ram(
            port: str,
            firmware_path: str
        ) -> str:
            """
            Load and execute firmware in RAM for testing (no flash wear)

            Args:
                port: Serial port path
                firmware_path: Path to firmware binary file

            Returns:
                Execution status and any output
            """
            try:
                with detect_chip(port) as esp:
                    load_ram(esp, firmware_path)
                    run(esp)
                    return f"✓ Firmware loaded and executed in RAM from {firmware_path}"
            except Exception as e:
                return f"❌ Failed to load firmware: {e}"

        @self.app.tool("esp_reset_advanced")
        async def reset_chip_advanced(
            port: str,
            reset_mode: str = "hard-reset"
        ) -> str:
            """
            Reset ESP chip with specific reset mode

            Args:
                port: Serial port path
                reset_mode: Reset type ('hard-reset', 'soft-reset', 'no-reset')

            Returns:
                Reset operation status
            """
            valid_modes = ['hard-reset', 'soft-reset', 'no-reset']
            if reset_mode not in valid_modes:
                return f"❌ Invalid reset mode. Use: {', '.join(valid_modes)}"

            try:
                with detect_chip(port) as esp:
                    reset_chip(esp, reset_mode)
                    return f"✓ Chip reset using {reset_mode} mode"
            except Exception as e:
                return f"❌ Reset failed: {e}"

    def register_resources(self):
        """Register MCP resources for real-time chip information"""

        @self.app.resource("esp://chips")
        async def list_connected_chips() -> str:
            """List all connected ESP chips with basic information"""
            # Implementation to scan common ports and detect chips
            # Returns formatted list of connected devices
            pass

2. FlashManager Component

# components/flash_manager.py
from esptool.cmds import (
    attach_flash, write_flash, read_flash, erase_flash,
    verify_flash, flash_id, read_flash_status
)

class FlashManager:
    """Advanced flash memory operations and optimization"""

    def register_tools(self):

        @self.app.tool("esp_flash_firmware")
        async def flash_firmware_advanced(
            port: str,
            firmware_files: List[Dict[str, any]],
            verify: bool = True,
            optimize: bool = True
        ) -> str:
            """
            Flash multiple firmware files with verification and optimization

            Args:
                port: Serial port path
                firmware_files: List of {address: int, file: str} mappings
                verify: Verify flash after writing
                optimize: Use performance optimizations

            Returns:
                Flashing status with timing and verification results
            """
            try:
                with detect_chip(port) as esp:
                    if optimize and hasattr(esp, 'STUB_SUPPORTED'):
                        esp = run_stub(esp)

                    attach_flash(esp)

                    # Convert file list to esptool format
                    flash_files = [(item['address'], item['file']) for item in firmware_files]

                    write_flash(esp, flash_files)

                    if verify:
                        verify_flash(esp, flash_files)

                    reset_chip(esp, 'hard-reset')

                    return f"✓ Flashed {len(firmware_files)} files successfully"

            except Exception as e:
                return f"❌ Flash operation failed: {e}"

        @self.app.tool("esp_flash_analyze")
        async def analyze_flash_usage(port: str) -> Dict:
            """
            Analyze flash memory layout and usage statistics

            Args:
                port: Serial port path

            Returns:
                Detailed flash analysis including used/free space, partitions
            """
            # Implementation for flash analysis
            pass

        @self.app.tool("esp_flash_backup")
        async def backup_flash_contents(
            port: str,
            output_directory: str,
            include_partitions: bool = True
        ) -> str:
            """
            Create complete backup of ESP flash contents

            Args:
                port: Serial port path
                output_directory: Where to save backup files
                include_partitions: Whether to backup individual partitions

            Returns:
                Backup operation status and file locations
            """
            # Implementation for flash backup
            pass

3. PartitionManager Component

# components/partition_manager.py
class PartitionManager:
    """ESP partition table creation and management"""

    def register_tools(self):

        @self.app.tool("esp_partition_create_ota")
        async def create_ota_partition_table(
            app_size: str = "1MB",
            ota_data_size: str = "8KB",
            nvs_size: str = "24KB",
            output_file: Optional[str] = None
        ) -> str:
            """
            Create optimized partition table for OTA updates

            Args:
                app_size: Size for each app partition (factory + OTA)
                ota_data_size: Size for OTA data partition
                nvs_size: Size for NVS partition
                output_file: Output partition CSV file path

            Returns:
                Generated partition table path and layout summary
            """
            # Implementation for OTA partition creation
            pass

        @self.app.tool("esp_partition_custom")
        async def create_custom_partition_table(
            partitions: List[Dict],
            output_file: str
        ) -> str:
            """
            Create custom partition table from specification

            Args:
                partitions: List of partition definitions
                output_file: Output CSV file path

            Returns:
                Partition table creation status
            """
            # Implementation for custom partition tables
            pass

4. SecurityManager Component

# components/security_manager.py
from esptool.cmds import get_security_info

class SecurityManager:
    """ESP security features and eFuse management"""

    def register_tools(self):

        @self.app.tool("esp_security_audit")
        async def security_audit(port: str) -> Dict:
            """
            Comprehensive security audit of ESP chip

            Args:
                port: Serial port path

            Returns:
                Security status including eFuses, encryption, secure boot
            """
            try:
                with detect_chip(port) as esp:
                    security_info = get_security_info(esp)
                    return {
                        "success": True,
                        "security_info": security_info,
                        "recommendations": self._generate_security_recommendations(security_info)
                    }
            except Exception as e:
                return {"success": False, "error": str(e)}

        @self.app.tool("esp_enable_flash_encryption")
        async def enable_flash_encryption(
            port: str,
            key_file: Optional[str] = None
        ) -> str:
            """
            Enable flash encryption with optional custom key

            Args:
                port: Serial port path
                key_file: Optional custom encryption key file

            Returns:
                Encryption setup status and security warnings
            """
            # Implementation for flash encryption setup
            pass

🔄 Asynchronous Operations & Performance

Background Task Management

# Enhanced async support for long-running operations
class AsyncFlashManager:
    def __init__(self):
        self.active_operations = {}

    @self.app.tool("esp_flash_firmware_async")
    async def flash_firmware_background(
        port: str,
        firmware_files: List[Dict],
        operation_id: Optional[str] = None
    ) -> str:
        """Start firmware flashing in background with progress tracking"""

        if not operation_id:
            operation_id = f"flash_{port}_{int(time.time())}"

        # Start background task
        task = asyncio.create_task(self._flash_firmware_task(port, firmware_files))
        self.active_operations[operation_id] = task

        return f"✓ Flashing started (ID: {operation_id}). Use esp_operation_status to check progress."

    @self.app.tool("esp_operation_status")
    async def check_operation_status(operation_id: str) -> Dict:
        """Check status of background operation"""
        if operation_id not in self.active_operations:
            return {"error": "Operation not found"}

        task = self.active_operations[operation_id]
        return {
            "operation_id": operation_id,
            "done": task.done(),
            "result": task.result() if task.done() else None
        }

📋 Resource API Design

Real-time ESP Information

# MCP Resources for live ESP data
@self.app.resource("esp://chips")
async def connected_chips() -> str:
    """JSON list of all connected ESP chips"""

@self.app.resource("esp://flash/{port}")
async def flash_status(port: str) -> str:
    """Real-time flash status for specific chip"""

@self.app.resource("esp://security/{port}")
async def security_status(port: str) -> str:
    """Current security configuration"""

@self.app.resource("esp://partitions/{port}")
async def partition_info(port: str) -> str:
    """Live partition table information"""

This API design provides a comprehensive, production-ready foundation for ESP development workflows while maintaining the conversational AI-first approach that makes the Arduino MCP server so effective.