""" Configuration management for MCP ESPTool Server Handles environment variables, MCP roots detection, and configuration validation. """ import logging import os from dataclasses import dataclass, field from pathlib import Path from typing import Any from fastmcp import Context logger = logging.getLogger(__name__) @dataclass class ESPToolServerConfig: """Configuration for MCP ESPTool Server""" # Core paths and tools esptool_path: str = field(default="esptool") esp_idf_path: Path | None = field(default=None) project_roots: list[Path] = field(default_factory=list) # Communication settings default_baud_rate: int = field(default=460800) connection_timeout: int = field(default=30) enable_stub_flasher: bool = field(default=True) # MCP integration settings enable_progress: bool = field(default=True) enable_elicitation: bool = field(default=True) log_level: str = field(default="INFO") # Performance settings max_concurrent_operations: int = field(default=5) operation_timeout: int = field(default=300) # Development settings dev_enable_hot_reload: bool = field(default=False) dev_mock_hardware: bool = field(default=False) dev_enable_tracing: bool = field(default=False) # Production settings production_mode: bool = field(default=False) enable_security_audit: bool = field(default=True) require_confirmations: bool = field(default=True) # QEMU emulation settings qemu_xtensa_path: str | None = field(default=None) qemu_riscv_path: str | None = field(default=None) qemu_base_port: int = field(default=5555) qemu_max_instances: int = field(default=4) def __post_init__(self): """Post-initialization setup and validation""" self._load_environment_variables() self._setup_esp_idf_path() self._setup_qemu_paths() self._setup_project_roots() self._validate_configuration() self._setup_logging() def _load_environment_variables(self) -> None: """Load configuration from environment variables""" self.esptool_path = os.getenv("ESPTOOL_PATH", self.esptool_path) self.default_baud_rate = int( os.getenv("ESP_DEFAULT_BAUD_RATE", str(self.default_baud_rate)) ) self.connection_timeout = int( os.getenv("ESP_CONNECTION_TIMEOUT", str(self.connection_timeout)) ) self.enable_stub_flasher = ( os.getenv("ESP_ENABLE_STUB_FLASHER", str(self.enable_stub_flasher)).lower() == "true" ) self.enable_progress = ( os.getenv("MCP_ENABLE_PROGRESS", str(self.enable_progress)).lower() == "true" ) self.enable_elicitation = ( os.getenv("MCP_ENABLE_ELICITATION", str(self.enable_elicitation)).lower() == "true" ) self.log_level = os.getenv("MCP_LOG_LEVEL", self.log_level) self.max_concurrent_operations = int( os.getenv("ESP_MAX_CONCURRENT_OPERATIONS", str(self.max_concurrent_operations)) ) self.operation_timeout = int( os.getenv("ESP_OPERATION_TIMEOUT", str(self.operation_timeout)) ) self.dev_enable_hot_reload = ( os.getenv("DEV_ENABLE_HOT_RELOAD", str(self.dev_enable_hot_reload)).lower() == "true" ) self.dev_mock_hardware = ( os.getenv("DEV_MOCK_HARDWARE", str(self.dev_mock_hardware)).lower() == "true" ) self.dev_enable_tracing = ( os.getenv("DEV_ENABLE_TRACING", str(self.dev_enable_tracing)).lower() == "true" ) self.production_mode = ( os.getenv("PRODUCTION_MODE", str(self.production_mode)).lower() == "true" ) self.enable_security_audit = ( os.getenv("PROD_ENABLE_SECURITY_AUDIT", str(self.enable_security_audit)).lower() == "true" ) self.require_confirmations = ( os.getenv("PROD_REQUIRE_CONFIRMATIONS", str(self.require_confirmations)).lower() == "true" ) # QEMU settings self.qemu_xtensa_path = os.getenv("QEMU_XTENSA_PATH", self.qemu_xtensa_path) self.qemu_riscv_path = os.getenv("QEMU_RISCV_PATH", self.qemu_riscv_path) self.qemu_base_port = int(os.getenv("QEMU_BASE_PORT", str(self.qemu_base_port))) self.qemu_max_instances = int( os.getenv("QEMU_MAX_INSTANCES", str(self.qemu_max_instances)) ) def _setup_esp_idf_path(self) -> None: """Set up ESP-IDF path from environment or auto-detect""" idf_path_env = os.getenv("ESP_IDF_PATH") if idf_path_env: self.esp_idf_path = Path(idf_path_env) else: # Try common ESP-IDF locations common_paths = [ Path.home() / "esp" / "esp-idf", Path("/opt/esp-idf"), Path("/usr/local/esp-idf"), ] for path in common_paths: if path.exists() and (path / "idf.py").exists(): self.esp_idf_path = path logger.info(f"Auto-detected ESP-IDF at: {path}") break def _setup_qemu_paths(self) -> None: """Auto-detect Espressif QEMU fork binaries from ~/.espressif/tools/""" import glob if not self.qemu_xtensa_path: matches = sorted(glob.glob( str(Path.home() / ".espressif/tools/qemu-xtensa/*/qemu/bin/qemu-system-xtensa") )) if matches: self.qemu_xtensa_path = matches[-1] logger.info(f"Auto-detected QEMU Xtensa: {self.qemu_xtensa_path}") if not self.qemu_riscv_path: matches = sorted(glob.glob( str(Path.home() / ".espressif/tools/qemu-riscv32/*/qemu/bin/qemu-system-riscv32") )) if matches: self.qemu_riscv_path = matches[-1] logger.info(f"Auto-detected QEMU RISC-V: {self.qemu_riscv_path}") def _setup_project_roots(self) -> None: """Set up project roots from environment or defaults""" # Check for explicit project roots project_roots_env = os.getenv("MCP_PROJECT_ROOTS") if project_roots_env: self.project_roots = [Path(p.strip()) for p in project_roots_env.split(":")] else: # Default project locations default_roots = [ Path.home() / "esp_projects", Path.home() / "Arduino", Path.home() / "Documents" / "Arduino", Path("/workspace/projects"), # Docker environment ] self.project_roots = [p for p in default_roots if p.exists()] logger.info(f"Project roots: {[str(p) for p in self.project_roots]}") def _validate_configuration(self) -> None: """Validate configuration settings""" errors = [] # Validate esptool availability if not self._check_tool_availability(self.esptool_path): errors.append(f"esptool not found at: {self.esptool_path}") # Validate numeric ranges if self.default_baud_rate not in [9600, 57600, 115200, 230400, 460800, 921600]: logger.warning(f"Unusual baud rate: {self.default_baud_rate}") if self.connection_timeout < 5 or self.connection_timeout > 300: errors.append(f"Connection timeout must be between 5-300s: {self.connection_timeout}") if self.max_concurrent_operations < 1 or self.max_concurrent_operations > 20: errors.append( f"Max concurrent operations must be 1-20: {self.max_concurrent_operations}" ) if errors: raise ValueError(f"Configuration validation failed: {'; '.join(errors)}") def _check_tool_availability(self, tool_path: str) -> bool: """Check if a tool is available in PATH or at specified path""" import shutil return shutil.which(tool_path) is not None def _setup_logging(self) -> None: """Set up logging configuration""" log_level = getattr(logging, self.log_level.upper(), logging.INFO) logging.basicConfig( level=log_level, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) async def initialize_with_context(self, context: Context) -> bool: """Initialize configuration with MCP context to get roots""" try: # Try to get roots from MCP context mcp_roots = await context.list_roots() if mcp_roots: logger.info(f"Found {len(mcp_roots)} MCP roots") # Add MCP roots to project roots, preferring ESP-specific directories for root in mcp_roots: root_path = Path(root.get("uri", "").replace("file://", "")) if root_path.exists(): self.project_roots.append(root_path) # Look for ESP-specific subdirectories esp_subdirs = ["esp_projects", "arduino", "esp32", "esp8266"] for subdir in esp_subdirs: esp_path = root_path / subdir if esp_path.exists(): self.project_roots.append(esp_path) # Remove duplicates while preserving order seen = set() self.project_roots = [ p for p in self.project_roots if not (str(p) in seen or seen.add(str(p))) ] logger.info( f"Updated project roots with MCP context: {len(self.project_roots)} total" ) return True except Exception as e: logger.warning(f"Could not initialize with MCP context: {e}") return False def get_sketch_directory(self) -> Path: """Get the primary sketch directory""" # Prefer MCP roots if available for root in self.project_roots: if "esp" in str(root).lower() or "arduino" in str(root).lower(): return root # Fall back to first available root if self.project_roots: return self.project_roots[0] # Create default if none exist default_dir = Path.home() / "esp_projects" default_dir.mkdir(exist_ok=True) return default_dir def get_idf_available(self) -> bool: """Check if ESP-IDF is available""" if not self.esp_idf_path: return False return self.esp_idf_path.exists() and (self.esp_idf_path / "idf.py").exists() def get_qemu_available(self) -> bool: """Check if at least one QEMU binary is available""" if self.qemu_xtensa_path and Path(self.qemu_xtensa_path).exists(): return True if self.qemu_riscv_path and Path(self.qemu_riscv_path).exists(): return True return False def get_common_ports(self) -> list[str]: """Get list of common ESP device ports for scanning""" import platform system = platform.system().lower() if system == "linux": return [f"/dev/ttyUSB{i}" for i in range(4)] + [f"/dev/ttyACM{i}" for i in range(4)] elif system == "darwin": # macOS return [f"/dev/cu.usbserial-{i:04x}" for i in range(16)] + ["/dev/cu.wchusbserial*"] elif system == "windows": return [f"COM{i}" for i in range(1, 21)] else: return [] def to_dict(self) -> dict[str, Any]: """Convert configuration to dictionary for serialization""" return { "esptool_path": self.esptool_path, "esp_idf_path": str(self.esp_idf_path) if self.esp_idf_path else None, "project_roots": [str(p) for p in self.project_roots], "default_baud_rate": self.default_baud_rate, "connection_timeout": self.connection_timeout, "enable_stub_flasher": self.enable_stub_flasher, "max_concurrent_operations": self.max_concurrent_operations, "production_mode": self.production_mode, "idf_available": self.get_idf_available(), "qemu_available": self.get_qemu_available(), "qemu_xtensa_path": self.qemu_xtensa_path, "qemu_riscv_path": self.qemu_riscv_path, } @classmethod def from_environment(cls) -> "ESPToolServerConfig": """Create configuration from environment variables""" return cls() def __repr__(self) -> str: """String representation of configuration""" return ( f"ESPToolServerConfig(" f"esptool_path='{self.esptool_path}', " f"esp_idf_available={self.get_idf_available()}, " f"project_roots={len(self.project_roots)}, " f"production_mode={self.production_mode})" ) # Global configuration instance _config: ESPToolServerConfig | None = None def get_config() -> ESPToolServerConfig: """Get the global configuration instance""" global _config if _config is None: _config = ESPToolServerConfig.from_environment() return _config def set_config(config: ESPToolServerConfig) -> None: """Set the global configuration instance""" global _config _config = config