Integrate Espressif's QEMU fork for virtual ESP device management: - QemuManager component with 5 MCP tools (start/stop/list/status/flash) - Config auto-detects QEMU binaries from ~/.espressif/tools/ - Supports esp32, esp32s2, esp32s3, esp32c3 chip emulation - Virtual serial over TCP (socket://localhost:PORT) transparent to esptool - Scan integration: QEMU instances appear in esp_scan_ports results - Blank flash images initialized to 0xFF (erased NOR flash state) - 38 unit tests covering lifecycle, port allocation, flash writes
465 lines
15 KiB
Markdown
465 lines
15 KiB
Markdown
# 🔧 FastMCP ESPTool Server API Design
|
|
|
|
## API Philosophy & Design Principles
|
|
|
|
Following the patterns established in the Arduino MCP server, this API prioritizes:
|
|
|
|
1. **Natural Language Workflows**: Tools designed for conversational AI interaction
|
|
2. **Component Architecture**: Modular, testable, and maintainable design
|
|
3. **Production Readiness**: Enterprise-grade error handling and resource management
|
|
4. **Performance Optimization**: Leveraging esptool's advanced capabilities
|
|
5. **Security First**: Built-in security validation and eFuse management
|
|
|
|
## 🎯 Core API Structure
|
|
|
|
### FastMCP Server Foundation
|
|
|
|
```python
|
|
# server.py - Main FastMCP server following Arduino MCP patterns
|
|
from fastmcp import FastMCP
|
|
from .components import (
|
|
ChipControl,
|
|
FlashManager,
|
|
PartitionManager,
|
|
SecurityManager,
|
|
FirmwareBuilder,
|
|
OTAManager,
|
|
ProductionTools,
|
|
Diagnostics
|
|
)
|
|
from .config import ESPToolServerConfig
|
|
|
|
class ESPToolServer:
|
|
def __init__(self):
|
|
self.app = FastMCP("ESP Development Server")
|
|
self.config = ESPToolServerConfig()
|
|
self.initialize_components()
|
|
|
|
def initialize_components(self):
|
|
"""Initialize all component modules"""
|
|
self.chip_control = ChipControl(self.app, self.config)
|
|
self.flash_manager = FlashManager(self.app, self.config)
|
|
self.partition_manager = PartitionManager(self.app, self.config)
|
|
self.security_manager = SecurityManager(self.app, self.config)
|
|
self.firmware_builder = FirmwareBuilder(self.app, self.config)
|
|
self.ota_manager = OTAManager(self.app, self.config)
|
|
self.production_tools = ProductionTools(self.app, self.config)
|
|
self.diagnostics = Diagnostics(self.app, self.config)
|
|
|
|
def main():
|
|
server = ESPToolServer()
|
|
server.app.run()
|
|
```
|
|
|
|
## 🧩 Component API Specifications
|
|
|
|
### 1. ChipControl Component
|
|
|
|
```python
|
|
# components/chip_control.py
|
|
from fastmcp import FastMCP
|
|
from esptool.cmds import detect_chip, run_stub, reset_chip, load_ram, run
|
|
from typing import Dict, List, Optional
|
|
import asyncio
|
|
|
|
class ChipControl:
|
|
"""ESP chip detection, connection, and control operations"""
|
|
|
|
def __init__(self, app: FastMCP, config):
|
|
self.app = app
|
|
self.config = config
|
|
self.register_tools()
|
|
self.register_resources()
|
|
|
|
def register_tools(self):
|
|
"""Register chip control tools with FastMCP"""
|
|
|
|
@self.app.tool("esp_detect_chip")
|
|
async def detect_esp_chip(
|
|
port: str,
|
|
baud: int = 115200,
|
|
connect_attempts: int = 7
|
|
) -> Dict:
|
|
"""
|
|
Auto-detect ESP chip type and return comprehensive information
|
|
|
|
Args:
|
|
port: Serial port path (e.g., '/dev/ttyUSB0', 'COM3')
|
|
baud: Connection baud rate
|
|
connect_attempts: Number of connection retry attempts
|
|
|
|
Returns:
|
|
Dict with chip info: type, features, MAC address, flash size, etc.
|
|
"""
|
|
try:
|
|
with detect_chip(port, baud=baud, connect_attempts=connect_attempts) as esp:
|
|
return {
|
|
"success": True,
|
|
"chip_type": esp.get_chip_description(),
|
|
"chip_revision": esp.get_chip_revision(),
|
|
"features": esp.get_chip_features(),
|
|
"mac_address": esp.read_mac().hex(':'),
|
|
"flash_size": esp.flash_size,
|
|
"crystal_freq": esp.get_crystal_freq(),
|
|
"port": port,
|
|
"stub_supported": hasattr(esp, 'STUB_SUPPORTED') and esp.STUB_SUPPORTED
|
|
}
|
|
except Exception as e:
|
|
return {"success": False, "error": str(e), "port": port}
|
|
|
|
@self.app.tool("esp_connect_advanced")
|
|
async def connect_with_strategies(
|
|
port: str,
|
|
enable_stub: bool = True,
|
|
high_speed: bool = True
|
|
) -> str:
|
|
"""
|
|
Connect to ESP chip with multiple fallback strategies
|
|
|
|
Args:
|
|
port: Serial port path
|
|
enable_stub: Whether to load the flasher stub for better performance
|
|
high_speed: Attempt high-speed connection first
|
|
|
|
Returns:
|
|
Connection status and optimization details
|
|
"""
|
|
strategies = [
|
|
{'baud': 460800 if high_speed else 115200, 'connect_mode': 'default-reset'},
|
|
{'baud': 115200, 'connect_mode': 'usb-reset'},
|
|
{'baud': 115200, 'connect_mode': 'manual-reset'}
|
|
]
|
|
|
|
for i, strategy in enumerate(strategies):
|
|
try:
|
|
with detect_chip(port, **strategy) as esp:
|
|
chip_info = esp.get_chip_description()
|
|
|
|
if enable_stub and hasattr(esp, 'STUB_SUPPORTED') and esp.STUB_SUPPORTED:
|
|
esp = run_stub(esp)
|
|
return f"✓ Connected to {chip_info} with stub flasher (strategy {i+1})"
|
|
else:
|
|
return f"✓ Connected to {chip_info} (strategy {i+1})"
|
|
|
|
except Exception as e:
|
|
continue
|
|
|
|
return f"❌ Failed to connect to {port} with all strategies"
|
|
|
|
@self.app.tool("esp_load_test_firmware")
|
|
async def load_firmware_to_ram(
|
|
port: str,
|
|
firmware_path: str
|
|
) -> str:
|
|
"""
|
|
Load and execute firmware in RAM for testing (no flash wear)
|
|
|
|
Args:
|
|
port: Serial port path
|
|
firmware_path: Path to firmware binary file
|
|
|
|
Returns:
|
|
Execution status and any output
|
|
"""
|
|
try:
|
|
with detect_chip(port) as esp:
|
|
load_ram(esp, firmware_path)
|
|
run(esp)
|
|
return f"✓ Firmware loaded and executed in RAM from {firmware_path}"
|
|
except Exception as e:
|
|
return f"❌ Failed to load firmware: {e}"
|
|
|
|
@self.app.tool("esp_reset_advanced")
|
|
async def reset_chip_advanced(
|
|
port: str,
|
|
reset_mode: str = "hard-reset"
|
|
) -> str:
|
|
"""
|
|
Reset ESP chip with specific reset mode
|
|
|
|
Args:
|
|
port: Serial port path
|
|
reset_mode: Reset type ('hard-reset', 'soft-reset', 'no-reset')
|
|
|
|
Returns:
|
|
Reset operation status
|
|
"""
|
|
valid_modes = ['hard-reset', 'soft-reset', 'no-reset']
|
|
if reset_mode not in valid_modes:
|
|
return f"❌ Invalid reset mode. Use: {', '.join(valid_modes)}"
|
|
|
|
try:
|
|
with detect_chip(port) as esp:
|
|
reset_chip(esp, reset_mode)
|
|
return f"✓ Chip reset using {reset_mode} mode"
|
|
except Exception as e:
|
|
return f"❌ Reset failed: {e}"
|
|
|
|
def register_resources(self):
|
|
"""Register MCP resources for real-time chip information"""
|
|
|
|
@self.app.resource("esp://chips")
|
|
async def list_connected_chips() -> str:
|
|
"""List all connected ESP chips with basic information"""
|
|
# Implementation to scan common ports and detect chips
|
|
# Returns formatted list of connected devices
|
|
pass
|
|
```
|
|
|
|
### 2. FlashManager Component
|
|
|
|
```python
|
|
# components/flash_manager.py
|
|
from esptool.cmds import (
|
|
attach_flash, write_flash, read_flash, erase_flash,
|
|
verify_flash, flash_id, read_flash_status
|
|
)
|
|
|
|
class FlashManager:
|
|
"""Advanced flash memory operations and optimization"""
|
|
|
|
def register_tools(self):
|
|
|
|
@self.app.tool("esp_flash_firmware")
|
|
async def flash_firmware_advanced(
|
|
port: str,
|
|
firmware_files: List[Dict[str, any]],
|
|
verify: bool = True,
|
|
optimize: bool = True
|
|
) -> str:
|
|
"""
|
|
Flash multiple firmware files with verification and optimization
|
|
|
|
Args:
|
|
port: Serial port path
|
|
firmware_files: List of {address: int, file: str} mappings
|
|
verify: Verify flash after writing
|
|
optimize: Use performance optimizations
|
|
|
|
Returns:
|
|
Flashing status with timing and verification results
|
|
"""
|
|
try:
|
|
with detect_chip(port) as esp:
|
|
if optimize and hasattr(esp, 'STUB_SUPPORTED'):
|
|
esp = run_stub(esp)
|
|
|
|
attach_flash(esp)
|
|
|
|
# Convert file list to esptool format
|
|
flash_files = [(item['address'], item['file']) for item in firmware_files]
|
|
|
|
write_flash(esp, flash_files)
|
|
|
|
if verify:
|
|
verify_flash(esp, flash_files)
|
|
|
|
reset_chip(esp, 'hard-reset')
|
|
|
|
return f"✓ Flashed {len(firmware_files)} files successfully"
|
|
|
|
except Exception as e:
|
|
return f"❌ Flash operation failed: {e}"
|
|
|
|
@self.app.tool("esp_flash_analyze")
|
|
async def analyze_flash_usage(port: str) -> Dict:
|
|
"""
|
|
Analyze flash memory layout and usage statistics
|
|
|
|
Args:
|
|
port: Serial port path
|
|
|
|
Returns:
|
|
Detailed flash analysis including used/free space, partitions
|
|
"""
|
|
# Implementation for flash analysis
|
|
pass
|
|
|
|
@self.app.tool("esp_flash_backup")
|
|
async def backup_flash_contents(
|
|
port: str,
|
|
output_directory: str,
|
|
include_partitions: bool = True
|
|
) -> str:
|
|
"""
|
|
Create complete backup of ESP flash contents
|
|
|
|
Args:
|
|
port: Serial port path
|
|
output_directory: Where to save backup files
|
|
include_partitions: Whether to backup individual partitions
|
|
|
|
Returns:
|
|
Backup operation status and file locations
|
|
"""
|
|
# Implementation for flash backup
|
|
pass
|
|
```
|
|
|
|
### 3. PartitionManager Component
|
|
|
|
```python
|
|
# components/partition_manager.py
|
|
class PartitionManager:
|
|
"""ESP partition table creation and management"""
|
|
|
|
def register_tools(self):
|
|
|
|
@self.app.tool("esp_partition_create_ota")
|
|
async def create_ota_partition_table(
|
|
app_size: str = "1MB",
|
|
ota_data_size: str = "8KB",
|
|
nvs_size: str = "24KB",
|
|
output_file: Optional[str] = None
|
|
) -> str:
|
|
"""
|
|
Create optimized partition table for OTA updates
|
|
|
|
Args:
|
|
app_size: Size for each app partition (factory + OTA)
|
|
ota_data_size: Size for OTA data partition
|
|
nvs_size: Size for NVS partition
|
|
output_file: Output partition CSV file path
|
|
|
|
Returns:
|
|
Generated partition table path and layout summary
|
|
"""
|
|
# Implementation for OTA partition creation
|
|
pass
|
|
|
|
@self.app.tool("esp_partition_custom")
|
|
async def create_custom_partition_table(
|
|
partitions: List[Dict],
|
|
output_file: str
|
|
) -> str:
|
|
"""
|
|
Create custom partition table from specification
|
|
|
|
Args:
|
|
partitions: List of partition definitions
|
|
output_file: Output CSV file path
|
|
|
|
Returns:
|
|
Partition table creation status
|
|
"""
|
|
# Implementation for custom partition tables
|
|
pass
|
|
```
|
|
|
|
### 4. SecurityManager Component
|
|
|
|
```python
|
|
# components/security_manager.py
|
|
from esptool.cmds import get_security_info
|
|
|
|
class SecurityManager:
|
|
"""ESP security features and eFuse management"""
|
|
|
|
def register_tools(self):
|
|
|
|
@self.app.tool("esp_security_audit")
|
|
async def security_audit(port: str) -> Dict:
|
|
"""
|
|
Comprehensive security audit of ESP chip
|
|
|
|
Args:
|
|
port: Serial port path
|
|
|
|
Returns:
|
|
Security status including eFuses, encryption, secure boot
|
|
"""
|
|
try:
|
|
with detect_chip(port) as esp:
|
|
security_info = get_security_info(esp)
|
|
return {
|
|
"success": True,
|
|
"security_info": security_info,
|
|
"recommendations": self._generate_security_recommendations(security_info)
|
|
}
|
|
except Exception as e:
|
|
return {"success": False, "error": str(e)}
|
|
|
|
@self.app.tool("esp_enable_flash_encryption")
|
|
async def enable_flash_encryption(
|
|
port: str,
|
|
key_file: Optional[str] = None
|
|
) -> str:
|
|
"""
|
|
Enable flash encryption with optional custom key
|
|
|
|
Args:
|
|
port: Serial port path
|
|
key_file: Optional custom encryption key file
|
|
|
|
Returns:
|
|
Encryption setup status and security warnings
|
|
"""
|
|
# Implementation for flash encryption setup
|
|
pass
|
|
```
|
|
|
|
## 🔄 Asynchronous Operations & Performance
|
|
|
|
### Background Task Management
|
|
|
|
```python
|
|
# Enhanced async support for long-running operations
|
|
class AsyncFlashManager:
|
|
def __init__(self):
|
|
self.active_operations = {}
|
|
|
|
@self.app.tool("esp_flash_firmware_async")
|
|
async def flash_firmware_background(
|
|
port: str,
|
|
firmware_files: List[Dict],
|
|
operation_id: Optional[str] = None
|
|
) -> str:
|
|
"""Start firmware flashing in background with progress tracking"""
|
|
|
|
if not operation_id:
|
|
operation_id = f"flash_{port}_{int(time.time())}"
|
|
|
|
# Start background task
|
|
task = asyncio.create_task(self._flash_firmware_task(port, firmware_files))
|
|
self.active_operations[operation_id] = task
|
|
|
|
return f"✓ Flashing started (ID: {operation_id}). Use esp_operation_status to check progress."
|
|
|
|
@self.app.tool("esp_operation_status")
|
|
async def check_operation_status(operation_id: str) -> Dict:
|
|
"""Check status of background operation"""
|
|
if operation_id not in self.active_operations:
|
|
return {"error": "Operation not found"}
|
|
|
|
task = self.active_operations[operation_id]
|
|
return {
|
|
"operation_id": operation_id,
|
|
"done": task.done(),
|
|
"result": task.result() if task.done() else None
|
|
}
|
|
```
|
|
|
|
## 📋 Resource API Design
|
|
|
|
### Real-time ESP Information
|
|
|
|
```python
|
|
# MCP Resources for live ESP data
|
|
@self.app.resource("esp://chips")
|
|
async def connected_chips() -> str:
|
|
"""JSON list of all connected ESP chips"""
|
|
|
|
@self.app.resource("esp://flash/{port}")
|
|
async def flash_status(port: str) -> str:
|
|
"""Real-time flash status for specific chip"""
|
|
|
|
@self.app.resource("esp://security/{port}")
|
|
async def security_status(port: str) -> str:
|
|
"""Current security configuration"""
|
|
|
|
@self.app.resource("esp://partitions/{port}")
|
|
async def partition_info(port: str) -> str:
|
|
"""Live partition table information"""
|
|
```
|
|
|
|
This API design provides a comprehensive, production-ready foundation for ESP development workflows while maintaining the conversational AI-first approach that makes the Arduino MCP server so effective. |