diff --git a/kicad_mcp/context.py b/kicad_mcp/context.py new file mode 100644 index 0000000..02c4c2b --- /dev/null +++ b/kicad_mcp/context.py @@ -0,0 +1,89 @@ +""" +Lifespan context management for KiCad MCP Server. +""" +from contextlib import asynccontextmanager +from dataclasses import dataclass +from typing import AsyncIterator, Optional, Dict, Any + +from mcp.server.fastmcp import FastMCP + +from kicad_mcp.utils.logger import Logger +from kicad_mcp.utils.python_path import setup_kicad_python_path + +# Create logger for this module +logger = Logger() + +@dataclass +class KiCadAppContext: + """Type-safe context for KiCad MCP server.""" + kicad_modules_available: bool + + # Optional cache for expensive operations + cache: Dict[str, Any] + +@asynccontextmanager +async def kicad_lifespan(server: FastMCP) -> AsyncIterator[KiCadAppContext]: + """Manage KiCad MCP server lifecycle with type-safe context. + + This function handles: + 1. Initializing shared resources when the server starts + 2. Providing a typed context object to all request handlers + 3. Properly cleaning up resources when the server shuts down + + Args: + server: The FastMCP server instance + + Yields: + KiCadAppContext: A typed context object shared across all handlers + """ + logger.info("Starting KiCad MCP server initialization") + + # Initialize resources on startup + logger.info("Setting up KiCad Python modules") + kicad_modules_available = setup_kicad_python_path() + logger.info(f"KiCad Python modules available: {kicad_modules_available}") + + # Create in-memory cache for expensive operations + cache: Dict[str, Any] = {} + + # Initialize any other resources that need cleanup later + created_temp_dirs = [] + + try: + # Import any KiCad modules that should be preloaded + if kicad_modules_available: + try: + logger.info("Preloading KiCad Python modules") + + # Core PCB module used in multiple tools + import pcbnew + logger.info(f"Successfully preloaded pcbnew module: {getattr(pcbnew, 'GetBuildVersion', lambda: 'unknown')()}") + cache["pcbnew_version"] = getattr(pcbnew, "GetBuildVersion", lambda: "unknown")() + except ImportError as e: + logger.warning(f"Failed to preload some KiCad modules: {str(e)}") + + # Yield the context to the server - server runs during this time + logger.info("KiCad MCP server initialization complete") + yield KiCadAppContext( + kicad_modules_available=kicad_modules_available, + cache=cache + ) + finally: + # Clean up resources when server shuts down + logger.info("Shutting down KiCad MCP server") + + # Clear the cache + if cache: + logger.debug(f"Clearing cache with {len(cache)} entries") + cache.clear() + + # Clean up any temporary directories + import shutil + for temp_dir in created_temp_dirs: + try: + logger.debug(f"Removing temporary directory: {temp_dir}") + shutil.rmtree(temp_dir, ignore_errors=True) + except Exception as e: + logger.error(f"Error cleaning up temporary directory {temp_dir}: {str(e)}") + + logger.info("KiCad MCP server shutdown complete") diff --git a/kicad_mcp/server.py b/kicad_mcp/server.py index 16f3d50..b7db022 100644 --- a/kicad_mcp/server.py +++ b/kicad_mcp/server.py @@ -38,8 +38,8 @@ def create_server() -> FastMCP: logger.warning("KiCad Python modules not available - some features will be disabled") # Initialize FastMCP server - mcp = FastMCP("KiCad") - logger.info("Created FastMCP server instance") + mcp = FastMCP("KiCad", lifespan=kicad_lifespan) + logger.info("Created FastMCP server instance with lifespan management") # Register resources logger.debug("Registering resources...") @@ -51,8 +51,8 @@ def create_server() -> FastMCP: logger.debug("Registering tools...") register_project_tools(mcp) register_analysis_tools(mcp) - register_export_tools(mcp, kicad_modules_available) - register_drc_tools(mcp, kicad_modules_available) + register_export_tools(mcp) + register_drc_tools(mcp) # Register prompts logger.debug("Registering prompts...") diff --git a/kicad_mcp/tools/export_tools.py b/kicad_mcp/tools/export_tools.py index 2262ab2..1426ba2 100644 --- a/kicad_mcp/tools/export_tools.py +++ b/kicad_mcp/tools/export_tools.py @@ -1,23 +1,26 @@ """ -Analysis and validation tools for KiCad projects. +Export tools for KiCad projects. """ import os import tempfile +import subprocess +import shutil +import asyncio from typing import Dict, Any, Optional from mcp.server.fastmcp import FastMCP, Context, Image from kicad_mcp.utils.file_utils import get_project_files from kicad_mcp.utils.logger import Logger +from kicad_mcp.config import KICAD_APP_PATH, system # Create logger for this module logger = Logger() -def register_analysis_tools(mcp: FastMCP, kicad_modules_available: bool = False) -> None: - """Register analysis and validation tools with the MCP server. +def register_export_tools(mcp: FastMCP) -> None: + """Register export tools with the MCP server. Args: mcp: The FastMCP server instance - kicad_modules_available: Whether KiCad Python modules are available """ @mcp.tool() @@ -70,166 +73,217 @@ def register_analysis_tools(mcp: FastMCP, kicad_modules_available: bool = False) Args: project_path: Path to the KiCad project file (.kicad_pro) + ctx: Context for MCP communication Returns: Thumbnail image of the PCB or None if generation failed """ - logger.info(f"Generating thumbnail for project: {project_path}") - - if not os.path.exists(project_path): - logger.error(f"Project not found: {project_path}") - ctx.info(f"Project not found: {project_path}") - return None - - # Get PCB file from project - files = get_project_files(project_path) - if "pcb" not in files: - logger.error("PCB file not found in project") - ctx.info("PCB file not found in project") - return None - - pcb_file = files["pcb"] - logger.info(f"Found PCB file: {pcb_file}") - - await ctx.report_progress(10, 100) - ctx.info(f"Generating thumbnail for {os.path.basename(pcb_file)}") - - # Method 1: Try to use pcbnew Python module if available - if kicad_modules_available: - try: - thumbnail = await generate_thumbnail_with_pcbnew(pcb_file, ctx) - if thumbnail: - return thumbnail - - # If pcbnew method failed, log it but continue to try alternative method - logger.warning("Failed to generate thumbnail with pcbnew, trying CLI method") - except Exception as e: - logger.error(f"Error using pcbnew for thumbnail: {str(e)}", exc_info=True) - ctx.info(f"Error with pcbnew method, trying alternative approach") - else: - logger.info("KiCad Python modules not available, trying CLI method") - - # Method 2: Try to use command-line tools try: - thumbnail = await generate_thumbnail_with_cli(pcb_file, ctx) - if thumbnail: - return thumbnail - except Exception as e: - logger.error(f"Error using CLI for thumbnail: {str(e)}", exc_info=True) - ctx.info(f"Error generating thumbnail with CLI method") + # Access the context + app_context = ctx.request_context.lifespan_context + kicad_modules_available = app_context.kicad_modules_available + + logger.info(f"Generating thumbnail for project: {project_path}") - # If all methods fail, inform the user - ctx.info("Could not generate thumbnail for PCB - all methods failed") - return None + if not os.path.exists(project_path): + logger.error(f"Project not found: {project_path}") + ctx.info(f"Project not found: {project_path}") + return None + + # Get PCB file from project + files = get_project_files(project_path) + if "pcb" not in files: + logger.error("PCB file not found in project") + ctx.info("PCB file not found in project") + return None + + pcb_file = files["pcb"] + logger.info(f"Found PCB file: {pcb_file}") + + # Check cache + cache_key = f"thumbnail_{pcb_file}_{os.path.getmtime(pcb_file)}" + if hasattr(app_context, 'cache') and cache_key in app_context.cache: + logger.info(f"Using cached thumbnail for {pcb_file}") + return app_context.cache[cache_key] + + await ctx.report_progress(10, 100) + ctx.info(f"Generating thumbnail for {os.path.basename(pcb_file)}") + + # Method 1: Try to use pcbnew Python module if available + if kicad_modules_available: + try: + thumbnail = await generate_thumbnail_with_pcbnew(pcb_file, ctx) + if thumbnail: + # Cache the result if possible + if hasattr(app_context, 'cache'): + app_context.cache[cache_key] = thumbnail + return thumbnail + + # If pcbnew method failed, log it but continue to try alternative method + logger.warning("Failed to generate thumbnail with pcbnew, trying CLI method") + except Exception as e: + logger.error(f"Error using pcbnew for thumbnail: {str(e)}", exc_info=True) + ctx.info(f"Error with pcbnew method, trying alternative approach") + else: + logger.info("KiCad Python modules not available, trying CLI method") + + # Method 2: Try to use command-line tools + try: + thumbnail = await generate_thumbnail_with_cli(pcb_file, ctx) + if thumbnail: + # Cache the result if possible + if hasattr(app_context, 'cache'): + app_context.cache[cache_key] = thumbnail + return thumbnail + except Exception as e: + logger.error(f"Error using CLI for thumbnail: {str(e)}", exc_info=True) + ctx.info(f"Error generating thumbnail with CLI method") + + # If all methods fail, inform the user + ctx.info("Could not generate thumbnail for PCB - all methods failed") + return None + + except asyncio.CancelledError: + logger.info("Thumbnail generation cancelled") + raise # Re-raise to let MCP know the task was cancelled + except Exception as e: + logger.error(f"Unexpected error in thumbnail generation: {str(e)}") + ctx.info(f"Error: {str(e)}") + return None @mcp.tool() async def generate_project_thumbnail(project_path: str, ctx: Context) -> Optional[Image]: """Generate a thumbnail of a KiCad project's PCB layout.""" - logger.info(f"Generating thumbnail for project: {project_path}") - - if not os.path.exists(project_path): - logger.error(f"Project not found: {project_path}") - ctx.info(f"Project not found: {project_path}") - return None - - # Get PCB file - files = get_project_files(project_path) - if "pcb" not in files: - logger.error("PCB file not found in project") - ctx.info("PCB file not found in project") - return None - - pcb_file = files["pcb"] - logger.info(f"Found PCB file: {pcb_file}") - - if not kicad_modules_available: - logger.warning("KiCad Python modules are not available - cannot generate thumbnail") - ctx.info("KiCad Python modules are not available") - return None - try: - # Try to import pcbnew - import pcbnew - logger.info("Successfully imported pcbnew module") + # Access the context + app_context = ctx.request_context.lifespan_context + kicad_modules_available = app_context.kicad_modules_available - # Load the PCB file - logger.debug(f"Loading PCB file: {pcb_file}") - board = pcbnew.LoadBoard(pcb_file) - if not board: - logger.error("Failed to load PCB file") - ctx.info("Failed to load PCB file") + logger.info(f"Generating thumbnail for project: {project_path}") + + if not os.path.exists(project_path): + logger.error(f"Project not found: {project_path}") + ctx.info(f"Project not found: {project_path}") + return None + + # Get PCB file + files = get_project_files(project_path) + if "pcb" not in files: + logger.error("PCB file not found in project") + ctx.info("PCB file not found in project") + return None + + pcb_file = files["pcb"] + logger.info(f"Found PCB file: {pcb_file}") + + if not kicad_modules_available: + logger.warning("KiCad Python modules are not available - cannot generate thumbnail") + ctx.info("KiCad Python modules are not available") + return None + + # Check cache + cache_key = f"project_thumbnail_{pcb_file}_{os.path.getmtime(pcb_file)}" + if hasattr(app_context, 'cache') and cache_key in app_context.cache: + logger.info(f"Using cached project thumbnail for {pcb_file}") + return app_context.cache[cache_key] + + try: + # Try to import pcbnew + import pcbnew + logger.info("Successfully imported pcbnew module") + + # Load the PCB file + logger.debug(f"Loading PCB file: {pcb_file}") + board = pcbnew.LoadBoard(pcb_file) + if not board: + logger.error("Failed to load PCB file") + ctx.info("Failed to load PCB file") + return None + + # Get board dimensions + board_box = board.GetBoardEdgesBoundingBox() + width = board_box.GetWidth() / 1000000.0 # Convert to mm + height = board_box.GetHeight() / 1000000.0 + + logger.info(f"PCB dimensions: {width:.2f}mm x {height:.2f}mm") + ctx.info(f"PCB dimensions: {width:.2f}mm x {height:.2f}mm") + + # Create temporary directory for output + with tempfile.TemporaryDirectory() as temp_dir: + logger.debug(f"Created temporary directory: {temp_dir}") + + # Create PLOT_CONTROLLER for plotting + pctl = pcbnew.PLOT_CONTROLLER(board) + popt = pctl.GetPlotOptions() + + # Set plot options for PNG output + popt.SetOutputDirectory(temp_dir) + popt.SetPlotFrameRef(False) + popt.SetPlotValue(True) + popt.SetPlotReference(True) + popt.SetPlotInvisibleText(False) + popt.SetPlotViaOnMaskLayer(False) + popt.SetColorMode(True) # Color mode + + # Set color theme (if available in this version) + if hasattr(popt, "SetColorTheme"): + popt.SetColorTheme("default") + + # Calculate a reasonable scale to fit in a thumbnail + max_size = 800 # Max pixel dimension + scale = min(max_size / width, max_size / height) * 0.8 # 80% to leave some margin + + # Set plot scale if the function exists + if hasattr(popt, "SetScale"): + popt.SetScale(scale) + + # Determine output filename + plot_basename = "thumbnail" + output_filename = os.path.join(temp_dir, f"{plot_basename}.png") + + logger.debug(f"Plotting PCB to: {output_filename}") + + # Plot PNG + pctl.OpenPlotfile(plot_basename, pcbnew.PLOT_FORMAT_PNG, "Thumbnail") + pctl.PlotLayer() + pctl.ClosePlot() + + # The plot controller creates files with predictable names + plot_file = os.path.join(temp_dir, f"{plot_basename}.png") + + if not os.path.exists(plot_file): + logger.error(f"Expected plot file not found: {plot_file}") + ctx.info("Failed to generate PCB image") + return None + + # Read the image file + with open(plot_file, 'rb') as f: + img_data = f.read() + + logger.info(f"Successfully generated thumbnail, size: {len(img_data)} bytes") + + # Create and cache the image + thumbnail = Image(data=img_data, format="png") + if hasattr(app_context, 'cache'): + app_context.cache[cache_key] = thumbnail + + return thumbnail + + except ImportError as e: + logger.error(f"Failed to import pcbnew module: {str(e)}") + ctx.info(f"Failed to import pcbnew module: {str(e)}") + return None + except Exception as e: + logger.error(f"Error generating thumbnail: {str(e)}", exc_info=True) + ctx.info(f"Error generating thumbnail: {str(e)}") return None - # Get board dimensions - board_box = board.GetBoardEdgesBoundingBox() - width = board_box.GetWidth() / 1000000.0 # Convert to mm - height = board_box.GetHeight() / 1000000.0 - - logger.info(f"PCB dimensions: {width:.2f}mm x {height:.2f}mm") - ctx.info(f"PCB dimensions: {width:.2f}mm x {height:.2f}mm") - - # Create temporary directory for output - with tempfile.TemporaryDirectory() as temp_dir: - logger.debug(f"Created temporary directory: {temp_dir}") - - # Create PLOT_CONTROLLER for plotting - pctl = pcbnew.PLOT_CONTROLLER(board) - popt = pctl.GetPlotOptions() - - # Set plot options for PNG output - popt.SetOutputDirectory(temp_dir) - popt.SetPlotFrameRef(False) - popt.SetPlotValue(True) - popt.SetPlotReference(True) - popt.SetPlotInvisibleText(False) - popt.SetPlotViaOnMaskLayer(False) - popt.SetColorMode(True) # Color mode - - # Set color theme (if available in this version) - if hasattr(popt, "SetColorTheme"): - popt.SetColorTheme("default") - - # Calculate a reasonable scale to fit in a thumbnail - max_size = 800 # Max pixel dimension - scale = min(max_size / width, max_size / height) * 0.8 # 80% to leave some margin - - # Set plot scale if the function exists - if hasattr(popt, "SetScale"): - popt.SetScale(scale) - - # Determine output filename - plot_basename = "thumbnail" - output_filename = os.path.join(temp_dir, f"{plot_basename}.png") - - logger.debug(f"Plotting PCB to: {output_filename}") - - # Plot PNG - pctl.OpenPlotfile(plot_basename, pcbnew.PLOT_FORMAT_PNG, "Thumbnail") - pctl.PlotLayer() - pctl.ClosePlot() - - # The plot controller creates files with predictable names - plot_file = os.path.join(temp_dir, f"{plot_basename}.png") - - if not os.path.exists(plot_file): - logger.error(f"Expected plot file not found: {plot_file}") - ctx.info("Failed to generate PCB image") - return None - - # Read the image file - with open(plot_file, 'rb') as f: - img_data = f.read() - - logger.info(f"Successfully generated thumbnail, size: {len(img_data)} bytes") - return Image(data=img_data, format="png") - - except ImportError as e: - logger.error(f"Failed to import pcbnew module: {str(e)}") - ctx.info(f"Failed to import pcbnew module: {str(e)}") - return None + except asyncio.CancelledError: + logger.info("Project thumbnail generation cancelled") + raise except Exception as e: - logger.error(f"Error generating thumbnail: {str(e)}", exc_info=True) - ctx.info(f"Error generating thumbnail: {str(e)}") + logger.error(f"Unexpected error in project thumbnail generation: {str(e)}", exc_info=True) + ctx.info(f"Error: {str(e)}") return None # Helper functions for thumbnail generation @@ -344,83 +398,89 @@ async def generate_thumbnail_with_cli(pcb_file: str, ctx: Context) -> Optional[I Returns: Image object containing the PCB thumbnail or None if generation failed """ - import subprocess + try: + logger.info("Attempting to generate thumbnail using command line tools") + await ctx.report_progress(20, 100) - logger.info("Attempting to generate thumbnail using command line tools") - await ctx.report_progress(20, 100) + # Check for required command-line tools based on OS + if system == "Darwin": # macOS + pcbnew_cli = os.path.join(KICAD_APP_PATH, "Contents/MacOS/pcbnew_cli") + if not os.path.exists(pcbnew_cli) and shutil.which("pcbnew_cli") is not None: + pcbnew_cli = "pcbnew_cli" # Try to use from PATH + elif not os.path.exists(pcbnew_cli): + logger.error(f"pcbnew_cli not found at {pcbnew_cli} or in PATH") + return None + elif system == "Windows": + pcbnew_cli = os.path.join(KICAD_APP_PATH, "bin", "pcbnew_cli.exe") + if not os.path.exists(pcbnew_cli) and shutil.which("pcbnew_cli") is not None: + pcbnew_cli = "pcbnew_cli" # Try to use from PATH + elif not os.path.exists(pcbnew_cli): + logger.error(f"pcbnew_cli not found at {pcbnew_cli} or in PATH") + return None + elif system == "Linux": + pcbnew_cli = shutil.which("pcbnew_cli") + if not pcbnew_cli: + logger.error("pcbnew_cli not found in PATH") + return None + else: + logger.error(f"Unsupported operating system: {system}") + return None - # Check for required command-line tools based on OS - if system == "Darwin": # macOS - pcbnew_cli = os.path.join(KICAD_APP_PATH, "Contents/MacOS/pcbnew_cli") - if not os.path.exists(pcbnew_cli) and shutil.which("pcbnew_cli") is not None: - pcbnew_cli = "pcbnew_cli" # Try to use from PATH - elif not os.path.exists(pcbnew_cli): - logger.error(f"pcbnew_cli not found at {pcbnew_cli} or in PATH") - return None - elif system == "Windows": - pcbnew_cli = os.path.join(KICAD_APP_PATH, "bin", "pcbnew_cli.exe") - if not os.path.exists(pcbnew_cli) and shutil.which("pcbnew_cli") is not None: - pcbnew_cli = "pcbnew_cli" # Try to use from PATH - elif not os.path.exists(pcbnew_cli): - logger.error(f"pcbnew_cli not found at {pcbnew_cli} or in PATH") - return None - elif system == "Linux": - pcbnew_cli = shutil.which("pcbnew_cli") - if not pcbnew_cli: - logger.error("pcbnew_cli not found in PATH") - return None - else: - logger.error(f"Unsupported operating system: {system}") + await ctx.report_progress(30, 100) + ctx.info("Using KiCad command line tools for thumbnail generation") + + # Create temporary directory for output + with tempfile.TemporaryDirectory() as temp_dir: + # Output PNG file + output_file = os.path.join(temp_dir, "thumbnail.png") + + # Build command for generating PNG from PCB + cmd = [ + pcbnew_cli, + "--export-png", + output_file, + "--page-size-inches", "8x6", # Set a reasonable page size + "--layers", "F.Cu,B.Cu,F.SilkS,B.SilkS,F.Mask,B.Mask,Edge.Cuts", # Important layers + pcb_file + ] + + logger.debug(f"Running command: {' '.join(cmd)}") + await ctx.report_progress(50, 100) + + # Run the command + try: + process = subprocess.run(cmd, capture_output=True, text=True, timeout=30) + + if process.returncode != 0: + logger.error(f"Command failed with code {process.returncode}") + logger.error(f"Error: {process.stderr}") + return None + + await ctx.report_progress(70, 100) + + # Check if the output file was created + if not os.path.exists(output_file): + logger.error(f"Output file not created: {output_file}") + return None + + # Read the image file + with open(output_file, 'rb') as f: + img_data = f.read() + + logger.info(f"Successfully generated thumbnail with CLI, size: {len(img_data)} bytes") + await ctx.report_progress(90, 100) + return Image(data=img_data, format="png") + + except subprocess.TimeoutExpired: + logger.error("Command timed out after 30 seconds") + return None + except Exception as e: + logger.error(f"Error running CLI command: {str(e)}", exc_info=True) + return None + + except asyncio.CancelledError: + logger.info("CLI thumbnail generation cancelled") + raise + except Exception as e: + logger.error(f"Unexpected error in CLI thumbnail generation: {str(e)}") return None - - await ctx.report_progress(30, 100) - ctx.info("Using KiCad command line tools for thumbnail generation") - - # Create temporary directory for output - with tempfile.TemporaryDirectory() as temp_dir: - # Output PNG file - output_file = os.path.join(temp_dir, "thumbnail.png") - - # Build command for generating PNG from PCB - cmd = [ - pcbnew_cli, - "--export-png", - output_file, - "--page-size-inches", "8x6", # Set a reasonable page size - "--layers", "F.Cu,B.Cu,F.SilkS,B.SilkS,F.Mask,B.Mask,Edge.Cuts", # Important layers - pcb_file - ] - - logger.debug(f"Running command: {' '.join(cmd)}") - await ctx.report_progress(50, 100) - - # Run the command - try: - process = subprocess.run(cmd, capture_output=True, text=True, timeout=30) - - if process.returncode != 0: - logger.error(f"Command failed with code {process.returncode}") - logger.error(f"Error: {process.stderr}") - return None - - await ctx.report_progress(70, 100) - - # Check if the output file was created - if not os.path.exists(output_file): - logger.error(f"Output file not created: {output_file}") - return None - - # Read the image file - with open(output_file, 'rb') as f: - img_data = f.read() - - logger.info(f"Successfully generated thumbnail with CLI, size: {len(img_data)} bytes") - await ctx.report_progress(90, 100) - return Image(data=img_data, format="png") - - except subprocess.TimeoutExpired: - logger.error("Command timed out after 30 seconds") - return None - except Exception as e: - logger.error(f"Error running CLI command: {str(e)}", exc_info=True) - return None