From 9a114bce7b1b7c26ad6d633860e74ca4f2729f88 Mon Sep 17 00:00:00 2001 From: Lama Date: Thu, 20 Mar 2025 09:58:19 -0400 Subject: [PATCH] Refactor server to use type-safe lifespan context management Implement proper context management in the KiCad MCP server: Add dedicated context.py with typed KiCadAppContext class Convert tools to access context instead of parameters Implement caching for thumbnails Add proper cleanup of resources on shutdown Improve error handling with cancellation support --- kicad_mcp/context.py | 89 ++++++ kicad_mcp/server.py | 8 +- kicad_mcp/tools/export_tools.py | 512 ++++++++++++++++++-------------- 3 files changed, 379 insertions(+), 230 deletions(-) create mode 100644 kicad_mcp/context.py diff --git a/kicad_mcp/context.py b/kicad_mcp/context.py new file mode 100644 index 0000000..02c4c2b --- /dev/null +++ b/kicad_mcp/context.py @@ -0,0 +1,89 @@ +""" +Lifespan context management for KiCad MCP Server. +""" +from contextlib import asynccontextmanager +from dataclasses import dataclass +from typing import AsyncIterator, Optional, Dict, Any + +from mcp.server.fastmcp import FastMCP + +from kicad_mcp.utils.logger import Logger +from kicad_mcp.utils.python_path import setup_kicad_python_path + +# Create logger for this module +logger = Logger() + +@dataclass +class KiCadAppContext: + """Type-safe context for KiCad MCP server.""" + kicad_modules_available: bool + + # Optional cache for expensive operations + cache: Dict[str, Any] + +@asynccontextmanager +async def kicad_lifespan(server: FastMCP) -> AsyncIterator[KiCadAppContext]: + """Manage KiCad MCP server lifecycle with type-safe context. + + This function handles: + 1. Initializing shared resources when the server starts + 2. Providing a typed context object to all request handlers + 3. Properly cleaning up resources when the server shuts down + + Args: + server: The FastMCP server instance + + Yields: + KiCadAppContext: A typed context object shared across all handlers + """ + logger.info("Starting KiCad MCP server initialization") + + # Initialize resources on startup + logger.info("Setting up KiCad Python modules") + kicad_modules_available = setup_kicad_python_path() + logger.info(f"KiCad Python modules available: {kicad_modules_available}") + + # Create in-memory cache for expensive operations + cache: Dict[str, Any] = {} + + # Initialize any other resources that need cleanup later + created_temp_dirs = [] + + try: + # Import any KiCad modules that should be preloaded + if kicad_modules_available: + try: + logger.info("Preloading KiCad Python modules") + + # Core PCB module used in multiple tools + import pcbnew + logger.info(f"Successfully preloaded pcbnew module: {getattr(pcbnew, 'GetBuildVersion', lambda: 'unknown')()}") + cache["pcbnew_version"] = getattr(pcbnew, "GetBuildVersion", lambda: "unknown")() + except ImportError as e: + logger.warning(f"Failed to preload some KiCad modules: {str(e)}") + + # Yield the context to the server - server runs during this time + logger.info("KiCad MCP server initialization complete") + yield KiCadAppContext( + kicad_modules_available=kicad_modules_available, + cache=cache + ) + finally: + # Clean up resources when server shuts down + logger.info("Shutting down KiCad MCP server") + + # Clear the cache + if cache: + logger.debug(f"Clearing cache with {len(cache)} entries") + cache.clear() + + # Clean up any temporary directories + import shutil + for temp_dir in created_temp_dirs: + try: + logger.debug(f"Removing temporary directory: {temp_dir}") + shutil.rmtree(temp_dir, ignore_errors=True) + except Exception as e: + logger.error(f"Error cleaning up temporary directory {temp_dir}: {str(e)}") + + logger.info("KiCad MCP server shutdown complete") diff --git a/kicad_mcp/server.py b/kicad_mcp/server.py index 16f3d50..b7db022 100644 --- a/kicad_mcp/server.py +++ b/kicad_mcp/server.py @@ -38,8 +38,8 @@ def create_server() -> FastMCP: logger.warning("KiCad Python modules not available - some features will be disabled") # Initialize FastMCP server - mcp = FastMCP("KiCad") - logger.info("Created FastMCP server instance") + mcp = FastMCP("KiCad", lifespan=kicad_lifespan) + logger.info("Created FastMCP server instance with lifespan management") # Register resources logger.debug("Registering resources...") @@ -51,8 +51,8 @@ def create_server() -> FastMCP: logger.debug("Registering tools...") register_project_tools(mcp) register_analysis_tools(mcp) - register_export_tools(mcp, kicad_modules_available) - register_drc_tools(mcp, kicad_modules_available) + register_export_tools(mcp) + register_drc_tools(mcp) # Register prompts logger.debug("Registering prompts...") diff --git a/kicad_mcp/tools/export_tools.py b/kicad_mcp/tools/export_tools.py index 2262ab2..1426ba2 100644 --- a/kicad_mcp/tools/export_tools.py +++ b/kicad_mcp/tools/export_tools.py @@ -1,23 +1,26 @@ """ -Analysis and validation tools for KiCad projects. +Export tools for KiCad projects. """ import os import tempfile +import subprocess +import shutil +import asyncio from typing import Dict, Any, Optional from mcp.server.fastmcp import FastMCP, Context, Image from kicad_mcp.utils.file_utils import get_project_files from kicad_mcp.utils.logger import Logger +from kicad_mcp.config import KICAD_APP_PATH, system # Create logger for this module logger = Logger() -def register_analysis_tools(mcp: FastMCP, kicad_modules_available: bool = False) -> None: - """Register analysis and validation tools with the MCP server. +def register_export_tools(mcp: FastMCP) -> None: + """Register export tools with the MCP server. Args: mcp: The FastMCP server instance - kicad_modules_available: Whether KiCad Python modules are available """ @mcp.tool() @@ -70,166 +73,217 @@ def register_analysis_tools(mcp: FastMCP, kicad_modules_available: bool = False) Args: project_path: Path to the KiCad project file (.kicad_pro) + ctx: Context for MCP communication Returns: Thumbnail image of the PCB or None if generation failed """ - logger.info(f"Generating thumbnail for project: {project_path}") - - if not os.path.exists(project_path): - logger.error(f"Project not found: {project_path}") - ctx.info(f"Project not found: {project_path}") - return None - - # Get PCB file from project - files = get_project_files(project_path) - if "pcb" not in files: - logger.error("PCB file not found in project") - ctx.info("PCB file not found in project") - return None - - pcb_file = files["pcb"] - logger.info(f"Found PCB file: {pcb_file}") - - await ctx.report_progress(10, 100) - ctx.info(f"Generating thumbnail for {os.path.basename(pcb_file)}") - - # Method 1: Try to use pcbnew Python module if available - if kicad_modules_available: - try: - thumbnail = await generate_thumbnail_with_pcbnew(pcb_file, ctx) - if thumbnail: - return thumbnail - - # If pcbnew method failed, log it but continue to try alternative method - logger.warning("Failed to generate thumbnail with pcbnew, trying CLI method") - except Exception as e: - logger.error(f"Error using pcbnew for thumbnail: {str(e)}", exc_info=True) - ctx.info(f"Error with pcbnew method, trying alternative approach") - else: - logger.info("KiCad Python modules not available, trying CLI method") - - # Method 2: Try to use command-line tools try: - thumbnail = await generate_thumbnail_with_cli(pcb_file, ctx) - if thumbnail: - return thumbnail - except Exception as e: - logger.error(f"Error using CLI for thumbnail: {str(e)}", exc_info=True) - ctx.info(f"Error generating thumbnail with CLI method") + # Access the context + app_context = ctx.request_context.lifespan_context + kicad_modules_available = app_context.kicad_modules_available + + logger.info(f"Generating thumbnail for project: {project_path}") - # If all methods fail, inform the user - ctx.info("Could not generate thumbnail for PCB - all methods failed") - return None + if not os.path.exists(project_path): + logger.error(f"Project not found: {project_path}") + ctx.info(f"Project not found: {project_path}") + return None + + # Get PCB file from project + files = get_project_files(project_path) + if "pcb" not in files: + logger.error("PCB file not found in project") + ctx.info("PCB file not found in project") + return None + + pcb_file = files["pcb"] + logger.info(f"Found PCB file: {pcb_file}") + + # Check cache + cache_key = f"thumbnail_{pcb_file}_{os.path.getmtime(pcb_file)}" + if hasattr(app_context, 'cache') and cache_key in app_context.cache: + logger.info(f"Using cached thumbnail for {pcb_file}") + return app_context.cache[cache_key] + + await ctx.report_progress(10, 100) + ctx.info(f"Generating thumbnail for {os.path.basename(pcb_file)}") + + # Method 1: Try to use pcbnew Python module if available + if kicad_modules_available: + try: + thumbnail = await generate_thumbnail_with_pcbnew(pcb_file, ctx) + if thumbnail: + # Cache the result if possible + if hasattr(app_context, 'cache'): + app_context.cache[cache_key] = thumbnail + return thumbnail + + # If pcbnew method failed, log it but continue to try alternative method + logger.warning("Failed to generate thumbnail with pcbnew, trying CLI method") + except Exception as e: + logger.error(f"Error using pcbnew for thumbnail: {str(e)}", exc_info=True) + ctx.info(f"Error with pcbnew method, trying alternative approach") + else: + logger.info("KiCad Python modules not available, trying CLI method") + + # Method 2: Try to use command-line tools + try: + thumbnail = await generate_thumbnail_with_cli(pcb_file, ctx) + if thumbnail: + # Cache the result if possible + if hasattr(app_context, 'cache'): + app_context.cache[cache_key] = thumbnail + return thumbnail + except Exception as e: + logger.error(f"Error using CLI for thumbnail: {str(e)}", exc_info=True) + ctx.info(f"Error generating thumbnail with CLI method") + + # If all methods fail, inform the user + ctx.info("Could not generate thumbnail for PCB - all methods failed") + return None + + except asyncio.CancelledError: + logger.info("Thumbnail generation cancelled") + raise # Re-raise to let MCP know the task was cancelled + except Exception as e: + logger.error(f"Unexpected error in thumbnail generation: {str(e)}") + ctx.info(f"Error: {str(e)}") + return None @mcp.tool() async def generate_project_thumbnail(project_path: str, ctx: Context) -> Optional[Image]: """Generate a thumbnail of a KiCad project's PCB layout.""" - logger.info(f"Generating thumbnail for project: {project_path}") - - if not os.path.exists(project_path): - logger.error(f"Project not found: {project_path}") - ctx.info(f"Project not found: {project_path}") - return None - - # Get PCB file - files = get_project_files(project_path) - if "pcb" not in files: - logger.error("PCB file not found in project") - ctx.info("PCB file not found in project") - return None - - pcb_file = files["pcb"] - logger.info(f"Found PCB file: {pcb_file}") - - if not kicad_modules_available: - logger.warning("KiCad Python modules are not available - cannot generate thumbnail") - ctx.info("KiCad Python modules are not available") - return None - try: - # Try to import pcbnew - import pcbnew - logger.info("Successfully imported pcbnew module") + # Access the context + app_context = ctx.request_context.lifespan_context + kicad_modules_available = app_context.kicad_modules_available - # Load the PCB file - logger.debug(f"Loading PCB file: {pcb_file}") - board = pcbnew.LoadBoard(pcb_file) - if not board: - logger.error("Failed to load PCB file") - ctx.info("Failed to load PCB file") + logger.info(f"Generating thumbnail for project: {project_path}") + + if not os.path.exists(project_path): + logger.error(f"Project not found: {project_path}") + ctx.info(f"Project not found: {project_path}") + return None + + # Get PCB file + files = get_project_files(project_path) + if "pcb" not in files: + logger.error("PCB file not found in project") + ctx.info("PCB file not found in project") + return None + + pcb_file = files["pcb"] + logger.info(f"Found PCB file: {pcb_file}") + + if not kicad_modules_available: + logger.warning("KiCad Python modules are not available - cannot generate thumbnail") + ctx.info("KiCad Python modules are not available") + return None + + # Check cache + cache_key = f"project_thumbnail_{pcb_file}_{os.path.getmtime(pcb_file)}" + if hasattr(app_context, 'cache') and cache_key in app_context.cache: + logger.info(f"Using cached project thumbnail for {pcb_file}") + return app_context.cache[cache_key] + + try: + # Try to import pcbnew + import pcbnew + logger.info("Successfully imported pcbnew module") + + # Load the PCB file + logger.debug(f"Loading PCB file: {pcb_file}") + board = pcbnew.LoadBoard(pcb_file) + if not board: + logger.error("Failed to load PCB file") + ctx.info("Failed to load PCB file") + return None + + # Get board dimensions + board_box = board.GetBoardEdgesBoundingBox() + width = board_box.GetWidth() / 1000000.0 # Convert to mm + height = board_box.GetHeight() / 1000000.0 + + logger.info(f"PCB dimensions: {width:.2f}mm x {height:.2f}mm") + ctx.info(f"PCB dimensions: {width:.2f}mm x {height:.2f}mm") + + # Create temporary directory for output + with tempfile.TemporaryDirectory() as temp_dir: + logger.debug(f"Created temporary directory: {temp_dir}") + + # Create PLOT_CONTROLLER for plotting + pctl = pcbnew.PLOT_CONTROLLER(board) + popt = pctl.GetPlotOptions() + + # Set plot options for PNG output + popt.SetOutputDirectory(temp_dir) + popt.SetPlotFrameRef(False) + popt.SetPlotValue(True) + popt.SetPlotReference(True) + popt.SetPlotInvisibleText(False) + popt.SetPlotViaOnMaskLayer(False) + popt.SetColorMode(True) # Color mode + + # Set color theme (if available in this version) + if hasattr(popt, "SetColorTheme"): + popt.SetColorTheme("default") + + # Calculate a reasonable scale to fit in a thumbnail + max_size = 800 # Max pixel dimension + scale = min(max_size / width, max_size / height) * 0.8 # 80% to leave some margin + + # Set plot scale if the function exists + if hasattr(popt, "SetScale"): + popt.SetScale(scale) + + # Determine output filename + plot_basename = "thumbnail" + output_filename = os.path.join(temp_dir, f"{plot_basename}.png") + + logger.debug(f"Plotting PCB to: {output_filename}") + + # Plot PNG + pctl.OpenPlotfile(plot_basename, pcbnew.PLOT_FORMAT_PNG, "Thumbnail") + pctl.PlotLayer() + pctl.ClosePlot() + + # The plot controller creates files with predictable names + plot_file = os.path.join(temp_dir, f"{plot_basename}.png") + + if not os.path.exists(plot_file): + logger.error(f"Expected plot file not found: {plot_file}") + ctx.info("Failed to generate PCB image") + return None + + # Read the image file + with open(plot_file, 'rb') as f: + img_data = f.read() + + logger.info(f"Successfully generated thumbnail, size: {len(img_data)} bytes") + + # Create and cache the image + thumbnail = Image(data=img_data, format="png") + if hasattr(app_context, 'cache'): + app_context.cache[cache_key] = thumbnail + + return thumbnail + + except ImportError as e: + logger.error(f"Failed to import pcbnew module: {str(e)}") + ctx.info(f"Failed to import pcbnew module: {str(e)}") + return None + except Exception as e: + logger.error(f"Error generating thumbnail: {str(e)}", exc_info=True) + ctx.info(f"Error generating thumbnail: {str(e)}") return None - # Get board dimensions - board_box = board.GetBoardEdgesBoundingBox() - width = board_box.GetWidth() / 1000000.0 # Convert to mm - height = board_box.GetHeight() / 1000000.0 - - logger.info(f"PCB dimensions: {width:.2f}mm x {height:.2f}mm") - ctx.info(f"PCB dimensions: {width:.2f}mm x {height:.2f}mm") - - # Create temporary directory for output - with tempfile.TemporaryDirectory() as temp_dir: - logger.debug(f"Created temporary directory: {temp_dir}") - - # Create PLOT_CONTROLLER for plotting - pctl = pcbnew.PLOT_CONTROLLER(board) - popt = pctl.GetPlotOptions() - - # Set plot options for PNG output - popt.SetOutputDirectory(temp_dir) - popt.SetPlotFrameRef(False) - popt.SetPlotValue(True) - popt.SetPlotReference(True) - popt.SetPlotInvisibleText(False) - popt.SetPlotViaOnMaskLayer(False) - popt.SetColorMode(True) # Color mode - - # Set color theme (if available in this version) - if hasattr(popt, "SetColorTheme"): - popt.SetColorTheme("default") - - # Calculate a reasonable scale to fit in a thumbnail - max_size = 800 # Max pixel dimension - scale = min(max_size / width, max_size / height) * 0.8 # 80% to leave some margin - - # Set plot scale if the function exists - if hasattr(popt, "SetScale"): - popt.SetScale(scale) - - # Determine output filename - plot_basename = "thumbnail" - output_filename = os.path.join(temp_dir, f"{plot_basename}.png") - - logger.debug(f"Plotting PCB to: {output_filename}") - - # Plot PNG - pctl.OpenPlotfile(plot_basename, pcbnew.PLOT_FORMAT_PNG, "Thumbnail") - pctl.PlotLayer() - pctl.ClosePlot() - - # The plot controller creates files with predictable names - plot_file = os.path.join(temp_dir, f"{plot_basename}.png") - - if not os.path.exists(plot_file): - logger.error(f"Expected plot file not found: {plot_file}") - ctx.info("Failed to generate PCB image") - return None - - # Read the image file - with open(plot_file, 'rb') as f: - img_data = f.read() - - logger.info(f"Successfully generated thumbnail, size: {len(img_data)} bytes") - return Image(data=img_data, format="png") - - except ImportError as e: - logger.error(f"Failed to import pcbnew module: {str(e)}") - ctx.info(f"Failed to import pcbnew module: {str(e)}") - return None + except asyncio.CancelledError: + logger.info("Project thumbnail generation cancelled") + raise except Exception as e: - logger.error(f"Error generating thumbnail: {str(e)}", exc_info=True) - ctx.info(f"Error generating thumbnail: {str(e)}") + logger.error(f"Unexpected error in project thumbnail generation: {str(e)}", exc_info=True) + ctx.info(f"Error: {str(e)}") return None # Helper functions for thumbnail generation @@ -344,83 +398,89 @@ async def generate_thumbnail_with_cli(pcb_file: str, ctx: Context) -> Optional[I Returns: Image object containing the PCB thumbnail or None if generation failed """ - import subprocess + try: + logger.info("Attempting to generate thumbnail using command line tools") + await ctx.report_progress(20, 100) - logger.info("Attempting to generate thumbnail using command line tools") - await ctx.report_progress(20, 100) + # Check for required command-line tools based on OS + if system == "Darwin": # macOS + pcbnew_cli = os.path.join(KICAD_APP_PATH, "Contents/MacOS/pcbnew_cli") + if not os.path.exists(pcbnew_cli) and shutil.which("pcbnew_cli") is not None: + pcbnew_cli = "pcbnew_cli" # Try to use from PATH + elif not os.path.exists(pcbnew_cli): + logger.error(f"pcbnew_cli not found at {pcbnew_cli} or in PATH") + return None + elif system == "Windows": + pcbnew_cli = os.path.join(KICAD_APP_PATH, "bin", "pcbnew_cli.exe") + if not os.path.exists(pcbnew_cli) and shutil.which("pcbnew_cli") is not None: + pcbnew_cli = "pcbnew_cli" # Try to use from PATH + elif not os.path.exists(pcbnew_cli): + logger.error(f"pcbnew_cli not found at {pcbnew_cli} or in PATH") + return None + elif system == "Linux": + pcbnew_cli = shutil.which("pcbnew_cli") + if not pcbnew_cli: + logger.error("pcbnew_cli not found in PATH") + return None + else: + logger.error(f"Unsupported operating system: {system}") + return None - # Check for required command-line tools based on OS - if system == "Darwin": # macOS - pcbnew_cli = os.path.join(KICAD_APP_PATH, "Contents/MacOS/pcbnew_cli") - if not os.path.exists(pcbnew_cli) and shutil.which("pcbnew_cli") is not None: - pcbnew_cli = "pcbnew_cli" # Try to use from PATH - elif not os.path.exists(pcbnew_cli): - logger.error(f"pcbnew_cli not found at {pcbnew_cli} or in PATH") - return None - elif system == "Windows": - pcbnew_cli = os.path.join(KICAD_APP_PATH, "bin", "pcbnew_cli.exe") - if not os.path.exists(pcbnew_cli) and shutil.which("pcbnew_cli") is not None: - pcbnew_cli = "pcbnew_cli" # Try to use from PATH - elif not os.path.exists(pcbnew_cli): - logger.error(f"pcbnew_cli not found at {pcbnew_cli} or in PATH") - return None - elif system == "Linux": - pcbnew_cli = shutil.which("pcbnew_cli") - if not pcbnew_cli: - logger.error("pcbnew_cli not found in PATH") - return None - else: - logger.error(f"Unsupported operating system: {system}") + await ctx.report_progress(30, 100) + ctx.info("Using KiCad command line tools for thumbnail generation") + + # Create temporary directory for output + with tempfile.TemporaryDirectory() as temp_dir: + # Output PNG file + output_file = os.path.join(temp_dir, "thumbnail.png") + + # Build command for generating PNG from PCB + cmd = [ + pcbnew_cli, + "--export-png", + output_file, + "--page-size-inches", "8x6", # Set a reasonable page size + "--layers", "F.Cu,B.Cu,F.SilkS,B.SilkS,F.Mask,B.Mask,Edge.Cuts", # Important layers + pcb_file + ] + + logger.debug(f"Running command: {' '.join(cmd)}") + await ctx.report_progress(50, 100) + + # Run the command + try: + process = subprocess.run(cmd, capture_output=True, text=True, timeout=30) + + if process.returncode != 0: + logger.error(f"Command failed with code {process.returncode}") + logger.error(f"Error: {process.stderr}") + return None + + await ctx.report_progress(70, 100) + + # Check if the output file was created + if not os.path.exists(output_file): + logger.error(f"Output file not created: {output_file}") + return None + + # Read the image file + with open(output_file, 'rb') as f: + img_data = f.read() + + logger.info(f"Successfully generated thumbnail with CLI, size: {len(img_data)} bytes") + await ctx.report_progress(90, 100) + return Image(data=img_data, format="png") + + except subprocess.TimeoutExpired: + logger.error("Command timed out after 30 seconds") + return None + except Exception as e: + logger.error(f"Error running CLI command: {str(e)}", exc_info=True) + return None + + except asyncio.CancelledError: + logger.info("CLI thumbnail generation cancelled") + raise + except Exception as e: + logger.error(f"Unexpected error in CLI thumbnail generation: {str(e)}") return None - - await ctx.report_progress(30, 100) - ctx.info("Using KiCad command line tools for thumbnail generation") - - # Create temporary directory for output - with tempfile.TemporaryDirectory() as temp_dir: - # Output PNG file - output_file = os.path.join(temp_dir, "thumbnail.png") - - # Build command for generating PNG from PCB - cmd = [ - pcbnew_cli, - "--export-png", - output_file, - "--page-size-inches", "8x6", # Set a reasonable page size - "--layers", "F.Cu,B.Cu,F.SilkS,B.SilkS,F.Mask,B.Mask,Edge.Cuts", # Important layers - pcb_file - ] - - logger.debug(f"Running command: {' '.join(cmd)}") - await ctx.report_progress(50, 100) - - # Run the command - try: - process = subprocess.run(cmd, capture_output=True, text=True, timeout=30) - - if process.returncode != 0: - logger.error(f"Command failed with code {process.returncode}") - logger.error(f"Error: {process.stderr}") - return None - - await ctx.report_progress(70, 100) - - # Check if the output file was created - if not os.path.exists(output_file): - logger.error(f"Output file not created: {output_file}") - return None - - # Read the image file - with open(output_file, 'rb') as f: - img_data = f.read() - - logger.info(f"Successfully generated thumbnail with CLI, size: {len(img_data)} bytes") - await ctx.report_progress(90, 100) - return Image(data=img_data, format="png") - - except subprocess.TimeoutExpired: - logger.error("Command timed out after 30 seconds") - return None - except Exception as e: - logger.error(f"Error running CLI command: {str(e)}", exc_info=True) - return None