mcesptool/API_DESIGN.md
Ryan Malloy 64c1505a00 Add QEMU ESP32 emulation support
Integrate Espressif's QEMU fork for virtual ESP device management:

- QemuManager component with 5 MCP tools (start/stop/list/status/flash)
- Config auto-detects QEMU binaries from ~/.espressif/tools/
- Supports esp32, esp32s2, esp32s3, esp32c3 chip emulation
- Virtual serial over TCP (socket://localhost:PORT) transparent to esptool
- Scan integration: QEMU instances appear in esp_scan_ports results
- Blank flash images initialized to 0xFF (erased NOR flash state)
- 38 unit tests covering lifecycle, port allocation, flash writes
2026-01-28 15:35:22 -07:00

465 lines
15 KiB
Markdown

# 🔧 FastMCP ESPTool Server API Design
## API Philosophy & Design Principles
Following the patterns established in the Arduino MCP server, this API prioritizes:
1. **Natural Language Workflows**: Tools designed for conversational AI interaction
2. **Component Architecture**: Modular, testable, and maintainable design
3. **Production Readiness**: Enterprise-grade error handling and resource management
4. **Performance Optimization**: Leveraging esptool's advanced capabilities
5. **Security First**: Built-in security validation and eFuse management
## 🎯 Core API Structure
### FastMCP Server Foundation
```python
# server.py - Main FastMCP server following Arduino MCP patterns
from fastmcp import FastMCP
from .components import (
ChipControl,
FlashManager,
PartitionManager,
SecurityManager,
FirmwareBuilder,
OTAManager,
ProductionTools,
Diagnostics
)
from .config import ESPToolServerConfig
class ESPToolServer:
    """Top-level server object.

    Owns the FastMCP application and the server configuration, and creates
    one instance of every component, each of which receives that same
    app/config pair.
    """

    def __init__(self):
        self.app = FastMCP("ESP Development Server")
        self.config = ESPToolServerConfig()
        self.initialize_components()

    def initialize_components(self):
        """Create every component, passing each the shared app and config."""
        shared = (self.app, self.config)

        self.chip_control = ChipControl(*shared)
        self.flash_manager = FlashManager(*shared)
        self.partition_manager = PartitionManager(*shared)
        self.security_manager = SecurityManager(*shared)
        self.firmware_builder = FirmwareBuilder(*shared)
        self.ota_manager = OTAManager(*shared)
        self.production_tools = ProductionTools(*shared)
        self.diagnostics = Diagnostics(*shared)
def main():
    """Build an ESPToolServer and call run() on its FastMCP app."""
    ESPToolServer().app.run()
```
## 🧩 Component API Specifications
### 1. ChipControl Component
```python
# components/chip_control.py
from fastmcp import FastMCP
from esptool.cmds import detect_chip, run_stub, reset_chip, load_ram, run
from typing import Dict, List, Optional
import asyncio
class ChipControl:
    """ESP chip detection, connection, and control operations"""

    def __init__(self, app: FastMCP, config):
        self.app = app
        self.config = config
        self.register_tools()
        self.register_resources()

    def register_tools(self):
        """Register chip control tools with FastMCP"""

        @self.app.tool("esp_detect_chip")
        async def detect_esp_chip(
            port: str,
            baud: int = 115200,
            connect_attempts: int = 7
        ) -> Dict:
            """
            Auto-detect ESP chip type and return comprehensive information

            Args:
                port: Serial port path (e.g., '/dev/ttyUSB0', 'COM3')
                baud: Connection baud rate
                connect_attempts: Number of connection retry attempts

            Returns:
                Dict with chip info: type, features, MAC address, flash size, etc.
                On failure: {"success": False, "error": <message>, "port": port}
            """
            try:
                with detect_chip(port, baud=baud, connect_attempts=connect_attempts) as esp:
                    # read_mac() yields the MAC as a sequence of byte values
                    # (a tuple of ints in esptool), which has no .hex() method.
                    # Formatting each octet works for tuple and bytes alike.
                    mac_address = ":".join(f"{octet:02x}" for octet in esp.read_mac())
                    return {
                        "success": True,
                        "chip_type": esp.get_chip_description(),
                        "chip_revision": esp.get_chip_revision(),
                        "features": esp.get_chip_features(),
                        "mac_address": mac_address,
                        # NOTE(review): confirm the loader object exposes a
                        # `flash_size` attribute in the targeted esptool version.
                        "flash_size": esp.flash_size,
                        "crystal_freq": esp.get_crystal_freq(),
                        "port": port,
                        "stub_supported": bool(getattr(esp, 'STUB_SUPPORTED', False))
                    }
            except Exception as e:
                return {"success": False, "error": str(e), "port": port}

        @self.app.tool("esp_connect_advanced")
        async def connect_with_strategies(
            port: str,
            enable_stub: bool = True,
            high_speed: bool = True
        ) -> str:
            """
            Connect to ESP chip with multiple fallback strategies

            Args:
                port: Serial port path
                enable_stub: Whether to load the flasher stub for better performance
                high_speed: Attempt high-speed connection first

            Returns:
                Connection status and optimization details; when every strategy
                fails, the message lists the error raised by each attempt
            """
            strategies = [
                {'baud': 460800 if high_speed else 115200, 'connect_mode': 'default-reset'},
                {'baud': 115200, 'connect_mode': 'usb-reset'},
                # Last resort: no automatic reset, for a chip the user has already
                # put into the bootloader by hand. 'no-reset' is the esptool
                # connect mode for that case.
                # NOTE(review): verify the mode names against the installed esptool.
                {'baud': 115200, 'connect_mode': 'no-reset'}
            ]

            # Keep each failure so the final message explains *why* nothing worked
            # instead of silently discarding the exceptions.
            failures = []
            for attempt, strategy in enumerate(strategies, start=1):
                try:
                    with detect_chip(port, **strategy) as esp:
                        chip_info = esp.get_chip_description()
                        if enable_stub and getattr(esp, 'STUB_SUPPORTED', False):
                            run_stub(esp)
                            return f"✓ Connected to {chip_info} with stub flasher (strategy {attempt})"
                        return f"✓ Connected to {chip_info} (strategy {attempt})"
                except Exception as e:
                    failures.append(
                        f"{strategy['connect_mode']} @ {strategy['baud']}: {e}"
                    )

            return (
                f"❌ Failed to connect to {port} with all strategies"
                f" ({'; '.join(failures)})"
            )

        @self.app.tool("esp_load_test_firmware")
        async def load_firmware_to_ram(
            port: str,
            firmware_path: str
        ) -> str:
            """
            Load and execute firmware in RAM for testing (no flash wear)

            Args:
                port: Serial port path
                firmware_path: Path to firmware binary file

            Returns:
                Execution status and any output
            """
            try:
                with detect_chip(port) as esp:
                    load_ram(esp, firmware_path)
                    run(esp)
                    return f"✓ Firmware loaded and executed in RAM from {firmware_path}"
            except Exception as e:
                return f"❌ Failed to load firmware: {e}"

        @self.app.tool("esp_reset_advanced")
        async def reset_chip_advanced(
            port: str,
            reset_mode: str = "hard-reset"
        ) -> str:
            """
            Reset ESP chip with specific reset mode

            Args:
                port: Serial port path
                reset_mode: Reset type ('hard-reset', 'soft-reset', 'no-reset')

            Returns:
                Reset operation status
            """
            valid_modes = ['hard-reset', 'soft-reset', 'no-reset']
            # Reject unknown modes before opening the serial port.
            if reset_mode not in valid_modes:
                return f"❌ Invalid reset mode. Use: {', '.join(valid_modes)}"

            try:
                with detect_chip(port) as esp:
                    reset_chip(esp, reset_mode)
                    return f"✓ Chip reset using {reset_mode} mode"
            except Exception as e:
                return f"❌ Reset failed: {e}"

    def register_resources(self):
        """Register MCP resources for real-time chip information"""

        @self.app.resource("esp://chips")
        async def list_connected_chips() -> str:
            """List all connected ESP chips with basic information"""
            # Implementation to scan common ports and detect chips
            # Returns formatted list of connected devices
            pass
```
### 2. FlashManager Component
```python
# components/flash_manager.py
from typing import Any, Dict, List

from fastmcp import FastMCP
from esptool.cmds import (
    attach_flash, write_flash, read_flash, erase_flash,
    verify_flash, flash_id, read_flash_status,
    detect_chip, run_stub, reset_chip
)


class FlashManager:
    """Advanced flash memory operations and optimization"""

    def __init__(self, app: FastMCP, config):
        # Same constructor contract as the other components: the server builds
        # each one as Component(app, config) and the tools register on `app`.
        self.app = app
        self.config = config
        self.register_tools()

    def register_tools(self):
        """Register flash management tools with FastMCP"""

        @self.app.tool("esp_flash_firmware")
        async def flash_firmware_advanced(
            port: str,
            firmware_files: List[Dict[str, Any]],
            verify: bool = True,
            optimize: bool = True
        ) -> str:
            """
            Flash multiple firmware files with verification and optimization

            Args:
                port: Serial port path
                firmware_files: List of {address: int, file: str} mappings
                verify: Verify flash after writing
                optimize: Use performance optimizations

            Returns:
                Flashing status with timing and verification results
            """
            if not firmware_files:
                return "❌ Flash operation failed: no firmware files specified"

            try:
                # Convert file list to esptool format *before* touching the
                # device, so a malformed entry fails without resetting the chip.
                flash_files = [(item['address'], item['file']) for item in firmware_files]

                with detect_chip(port) as esp:
                    # Only load the stub when the chip class says it supports one;
                    # the attribute merely existing is not enough.
                    if optimize and getattr(esp, 'STUB_SUPPORTED', False):
                        esp = run_stub(esp)

                    attach_flash(esp)
                    write_flash(esp, flash_files)

                    if verify:
                        verify_flash(esp, flash_files)

                    reset_chip(esp, 'hard-reset')
                    return f"✓ Flashed {len(firmware_files)} files successfully"
            except Exception as e:
                return f"❌ Flash operation failed: {e}"

        @self.app.tool("esp_flash_analyze")
        async def analyze_flash_usage(port: str) -> Dict:
            """
            Analyze flash memory layout and usage statistics

            Args:
                port: Serial port path

            Returns:
                Detailed flash analysis including used/free space, partitions
            """
            # Implementation for flash analysis
            pass

        @self.app.tool("esp_flash_backup")
        async def backup_flash_contents(
            port: str,
            output_directory: str,
            include_partitions: bool = True
        ) -> str:
            """
            Create complete backup of ESP flash contents

            Args:
                port: Serial port path
                output_directory: Where to save backup files
                include_partitions: Whether to backup individual partitions

            Returns:
                Backup operation status and file locations
            """
            # Implementation for flash backup
            pass
```
### 3. PartitionManager Component
```python
# components/partition_manager.py
from typing import Dict, List, Optional

from fastmcp import FastMCP


class PartitionManager:
    """ESP partition table creation and management"""

    def __init__(self, app: FastMCP, config):
        # Same constructor contract as the other components: the server builds
        # each one as Component(app, config) and the tools register on `app`.
        self.app = app
        self.config = config
        self.register_tools()

    def register_tools(self):
        """Register partition table tools with FastMCP"""

        @self.app.tool("esp_partition_create_ota")
        async def create_ota_partition_table(
            app_size: str = "1MB",
            ota_data_size: str = "8KB",
            nvs_size: str = "24KB",
            output_file: Optional[str] = None
        ) -> str:
            """
            Create optimized partition table for OTA updates

            Args:
                app_size: Size for each app partition (factory + OTA)
                ota_data_size: Size for OTA data partition
                nvs_size: Size for NVS partition
                output_file: Output partition CSV file path

            Returns:
                Generated partition table path and layout summary
            """
            # Implementation for OTA partition creation
            pass

        @self.app.tool("esp_partition_custom")
        async def create_custom_partition_table(
            partitions: List[Dict],
            output_file: str
        ) -> str:
            """
            Create custom partition table from specification

            Args:
                partitions: List of partition definitions
                output_file: Output CSV file path

            Returns:
                Partition table creation status
            """
            # Implementation for custom partition tables
            pass
```
### 4. SecurityManager Component
```python
# components/security_manager.py
from typing import Dict, Optional

from fastmcp import FastMCP
from esptool.cmds import detect_chip, get_security_info


class SecurityManager:
    """ESP security features and eFuse management"""

    def __init__(self, app: FastMCP, config):
        # Same constructor contract as the other components: the server builds
        # each one as Component(app, config) and the tools register on `app`.
        self.app = app
        self.config = config
        self.register_tools()

    def register_tools(self):
        """Register security tools with FastMCP"""

        @self.app.tool("esp_security_audit")
        async def security_audit(port: str) -> Dict:
            """
            Comprehensive security audit of ESP chip

            Args:
                port: Serial port path

            Returns:
                Security status including eFuses, encryption, secure boot.
                On failure: {"success": False, "error": <message>}
            """
            try:
                with detect_chip(port) as esp:
                    # NOTE(review): confirm that esptool.cmds.get_security_info
                    # *returns* the security data rather than only printing it;
                    # if it returns None, the loader's own get_security_info()
                    # method may be the call that is wanted here.
                    security_info = get_security_info(esp)
                    return {
                        "success": True,
                        "security_info": security_info,
                        # NOTE(review): _generate_security_recommendations is not
                        # defined in this snippet and must be supplied by the
                        # class; until then this call raises AttributeError,
                        # which the handler below reports as a failure.
                        "recommendations": self._generate_security_recommendations(security_info)
                    }
            except Exception as e:
                return {"success": False, "error": str(e)}

        @self.app.tool("esp_enable_flash_encryption")
        async def enable_flash_encryption(
            port: str,
            key_file: Optional[str] = None
        ) -> str:
            """
            Enable flash encryption with optional custom key

            Args:
                port: Serial port path
                key_file: Optional custom encryption key file

            Returns:
                Encryption setup status and security warnings
            """
            # Implementation for flash encryption setup
            pass
```
## 🔄 Asynchronous Operations & Performance
### Background Task Management
```python
# Enhanced async support for long-running operations
import asyncio
import time
from typing import Dict, List, Optional

from fastmcp import FastMCP


class AsyncFlashManager:
    """Run firmware flashing as background asyncio tasks that are polled by ID."""

    def __init__(self, app: Optional[FastMCP] = None, config=None):
        """
        Args:
            app: FastMCP application to register the tools on. When omitted,
                only the (empty) operation registry is created.
            config: Server configuration object (stored, not otherwise used here)
        """
        self.app = app
        self.config = config
        # operation_id -> asyncio.Task for every operation started so far
        self.active_operations = {}
        if app is not None:
            self.register_tools()

    def register_tools(self):
        """Register the background-flash tools with FastMCP"""
        # The decorators need `self.app`, so they must run inside a method;
        # at class-body level `self` does not exist.

        @self.app.tool("esp_flash_firmware_async")
        async def flash_firmware_background(
            port: str,
            firmware_files: List[Dict],
            operation_id: Optional[str] = None
        ) -> str:
            """Start firmware flashing in background with progress tracking"""
            if not operation_id:
                operation_id = f"flash_{port}_{int(time.time())}"

            # Do not silently replace (and lose track of) a running operation.
            existing = self.active_operations.get(operation_id)
            if existing is not None and not existing.done():
                return f"❌ Operation {operation_id} is already running"

            # Start background task
            # NOTE(review): _flash_firmware_task is not defined in this snippet;
            # the class must provide it as a coroutine method.
            task = asyncio.create_task(self._flash_firmware_task(port, firmware_files))
            self.active_operations[operation_id] = task

            return f"✓ Flashing started (ID: {operation_id}). Use esp_operation_status to check progress."

        @self.app.tool("esp_operation_status")
        async def check_operation_status(operation_id: str) -> Dict:
            """Check status of background operation"""
            task = self.active_operations.get(operation_id)
            if task is None:
                return {"error": "Operation not found"}

            status = {
                "operation_id": operation_id,
                "done": task.done(),
                "result": None
            }
            if not task.done():
                return status

            # task.result() re-raises the task's exception (or CancelledError),
            # so report those outcomes as data instead of failing the status call.
            # cancelled() must be checked first: exception() itself raises on a
            # cancelled task.
            if task.cancelled():
                status["error"] = "Operation was cancelled"
            elif task.exception() is not None:
                status["error"] = str(task.exception())
            else:
                status["result"] = task.result()
            return status
```
## 📋 Resource API Design
### Real-time ESP Information
```python
# MCP Resources for live ESP data
# NOTE(review): these decorators reference `self.app`, so they are presumably
# meant to sit inside a component method (such as register_resources) where
# `self` is in scope. "esp://chips" is also registered by
# ChipControl.register_resources; confirm which definition should own that URI.
# All four bodies are docstring-only placeholders.

@self.app.resource("esp://chips")
async def connected_chips() -> str:
    """JSON list of all connected ESP chips"""

@self.app.resource("esp://flash/{port}")
async def flash_status(port: str) -> str:
    """Real-time flash status for specific chip

    Args:
        port: presumably the value of the {port} segment of the resource URI
            (TODO confirm against the FastMCP resource-template behaviour)
    """

@self.app.resource("esp://security/{port}")
async def security_status(port: str) -> str:
    """Current security configuration

    Args:
        port: presumably the value of the {port} segment of the resource URI
            (TODO confirm against the FastMCP resource-template behaviour)
    """

@self.app.resource("esp://partitions/{port}")
async def partition_info(port: str) -> str:
    """Live partition table information

    Args:
        port: presumably the value of the {port} segment of the resource URI
            (TODO confirm against the FastMCP resource-template behaviour)
    """
```
This API design provides a comprehensive, production-ready foundation for ESP development workflows while maintaining the conversational AI-first approach that makes the Arduino MCP server so effective.