Refactor server to use type-safe lifespan context management

Implement proper context management in the KiCad MCP server:

Add dedicated context.py with typed KiCadAppContext class
Convert tools to access context instead of parameters
Implement caching for thumbnails
Add proper cleanup of resources on shutdown
Improve error handling with cancellation support
This commit is contained in:
Lama 2025-03-20 09:58:19 -04:00
parent 5007d11579
commit 9a114bce7b
3 changed files with 379 additions and 230 deletions

89
kicad_mcp/context.py Normal file
View File

@ -0,0 +1,89 @@
"""
Lifespan context management for KiCad MCP Server.
"""
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import AsyncIterator, Optional, Dict, Any
from mcp.server.fastmcp import FastMCP
from kicad_mcp.utils.logger import Logger
from kicad_mcp.utils.python_path import setup_kicad_python_path
# Create logger for this module
logger = Logger()
@dataclass
class KiCadAppContext:
"""Type-safe context for KiCad MCP server."""
kicad_modules_available: bool
# Optional cache for expensive operations
cache: Dict[str, Any]
@asynccontextmanager
async def kicad_lifespan(server: FastMCP) -> AsyncIterator[KiCadAppContext]:
"""Manage KiCad MCP server lifecycle with type-safe context.
This function handles:
1. Initializing shared resources when the server starts
2. Providing a typed context object to all request handlers
3. Properly cleaning up resources when the server shuts down
Args:
server: The FastMCP server instance
Yields:
KiCadAppContext: A typed context object shared across all handlers
"""
logger.info("Starting KiCad MCP server initialization")
# Initialize resources on startup
logger.info("Setting up KiCad Python modules")
kicad_modules_available = setup_kicad_python_path()
logger.info(f"KiCad Python modules available: {kicad_modules_available}")
# Create in-memory cache for expensive operations
cache: Dict[str, Any] = {}
# Initialize any other resources that need cleanup later
created_temp_dirs = []
try:
# Import any KiCad modules that should be preloaded
if kicad_modules_available:
try:
logger.info("Preloading KiCad Python modules")
# Core PCB module used in multiple tools
import pcbnew
logger.info(f"Successfully preloaded pcbnew module: {getattr(pcbnew, 'GetBuildVersion', lambda: 'unknown')()}")
cache["pcbnew_version"] = getattr(pcbnew, "GetBuildVersion", lambda: "unknown")()
except ImportError as e:
logger.warning(f"Failed to preload some KiCad modules: {str(e)}")
# Yield the context to the server - server runs during this time
logger.info("KiCad MCP server initialization complete")
yield KiCadAppContext(
kicad_modules_available=kicad_modules_available,
cache=cache
)
finally:
# Clean up resources when server shuts down
logger.info("Shutting down KiCad MCP server")
# Clear the cache
if cache:
logger.debug(f"Clearing cache with {len(cache)} entries")
cache.clear()
# Clean up any temporary directories
import shutil
for temp_dir in created_temp_dirs:
try:
logger.debug(f"Removing temporary directory: {temp_dir}")
shutil.rmtree(temp_dir, ignore_errors=True)
except Exception as e:
logger.error(f"Error cleaning up temporary directory {temp_dir}: {str(e)}")
logger.info("KiCad MCP server shutdown complete")

View File

@ -38,8 +38,8 @@ def create_server() -> FastMCP:
logger.warning("KiCad Python modules not available - some features will be disabled") logger.warning("KiCad Python modules not available - some features will be disabled")
# Initialize FastMCP server # Initialize FastMCP server
mcp = FastMCP("KiCad") mcp = FastMCP("KiCad", lifespan=kicad_lifespan)
logger.info("Created FastMCP server instance") logger.info("Created FastMCP server instance with lifespan management")
# Register resources # Register resources
logger.debug("Registering resources...") logger.debug("Registering resources...")
@ -51,8 +51,8 @@ def create_server() -> FastMCP:
logger.debug("Registering tools...") logger.debug("Registering tools...")
register_project_tools(mcp) register_project_tools(mcp)
register_analysis_tools(mcp) register_analysis_tools(mcp)
register_export_tools(mcp, kicad_modules_available) register_export_tools(mcp)
register_drc_tools(mcp, kicad_modules_available) register_drc_tools(mcp)
# Register prompts # Register prompts
logger.debug("Registering prompts...") logger.debug("Registering prompts...")

View File

@ -1,23 +1,26 @@
""" """
Analysis and validation tools for KiCad projects. Export tools for KiCad projects.
""" """
import os import os
import tempfile import tempfile
import subprocess
import shutil
import asyncio
from typing import Dict, Any, Optional from typing import Dict, Any, Optional
from mcp.server.fastmcp import FastMCP, Context, Image from mcp.server.fastmcp import FastMCP, Context, Image
from kicad_mcp.utils.file_utils import get_project_files from kicad_mcp.utils.file_utils import get_project_files
from kicad_mcp.utils.logger import Logger from kicad_mcp.utils.logger import Logger
from kicad_mcp.config import KICAD_APP_PATH, system
# Create logger for this module # Create logger for this module
logger = Logger() logger = Logger()
def register_analysis_tools(mcp: FastMCP, kicad_modules_available: bool = False) -> None: def register_export_tools(mcp: FastMCP) -> None:
"""Register analysis and validation tools with the MCP server. """Register export tools with the MCP server.
Args: Args:
mcp: The FastMCP server instance mcp: The FastMCP server instance
kicad_modules_available: Whether KiCad Python modules are available
""" """
@mcp.tool() @mcp.tool()
@ -70,166 +73,217 @@ def register_analysis_tools(mcp: FastMCP, kicad_modules_available: bool = False)
Args: Args:
project_path: Path to the KiCad project file (.kicad_pro) project_path: Path to the KiCad project file (.kicad_pro)
ctx: Context for MCP communication
Returns: Returns:
Thumbnail image of the PCB or None if generation failed Thumbnail image of the PCB or None if generation failed
""" """
logger.info(f"Generating thumbnail for project: {project_path}")
if not os.path.exists(project_path):
logger.error(f"Project not found: {project_path}")
ctx.info(f"Project not found: {project_path}")
return None
# Get PCB file from project
files = get_project_files(project_path)
if "pcb" not in files:
logger.error("PCB file not found in project")
ctx.info("PCB file not found in project")
return None
pcb_file = files["pcb"]
logger.info(f"Found PCB file: {pcb_file}")
await ctx.report_progress(10, 100)
ctx.info(f"Generating thumbnail for {os.path.basename(pcb_file)}")
# Method 1: Try to use pcbnew Python module if available
if kicad_modules_available:
try:
thumbnail = await generate_thumbnail_with_pcbnew(pcb_file, ctx)
if thumbnail:
return thumbnail
# If pcbnew method failed, log it but continue to try alternative method
logger.warning("Failed to generate thumbnail with pcbnew, trying CLI method")
except Exception as e:
logger.error(f"Error using pcbnew for thumbnail: {str(e)}", exc_info=True)
ctx.info(f"Error with pcbnew method, trying alternative approach")
else:
logger.info("KiCad Python modules not available, trying CLI method")
# Method 2: Try to use command-line tools
try: try:
thumbnail = await generate_thumbnail_with_cli(pcb_file, ctx) # Access the context
if thumbnail: app_context = ctx.request_context.lifespan_context
return thumbnail kicad_modules_available = app_context.kicad_modules_available
except Exception as e:
logger.error(f"Error using CLI for thumbnail: {str(e)}", exc_info=True) logger.info(f"Generating thumbnail for project: {project_path}")
ctx.info(f"Error generating thumbnail with CLI method")
# If all methods fail, inform the user if not os.path.exists(project_path):
ctx.info("Could not generate thumbnail for PCB - all methods failed") logger.error(f"Project not found: {project_path}")
return None ctx.info(f"Project not found: {project_path}")
return None
# Get PCB file from project
files = get_project_files(project_path)
if "pcb" not in files:
logger.error("PCB file not found in project")
ctx.info("PCB file not found in project")
return None
pcb_file = files["pcb"]
logger.info(f"Found PCB file: {pcb_file}")
# Check cache
cache_key = f"thumbnail_{pcb_file}_{os.path.getmtime(pcb_file)}"
if hasattr(app_context, 'cache') and cache_key in app_context.cache:
logger.info(f"Using cached thumbnail for {pcb_file}")
return app_context.cache[cache_key]
await ctx.report_progress(10, 100)
ctx.info(f"Generating thumbnail for {os.path.basename(pcb_file)}")
# Method 1: Try to use pcbnew Python module if available
if kicad_modules_available:
try:
thumbnail = await generate_thumbnail_with_pcbnew(pcb_file, ctx)
if thumbnail:
# Cache the result if possible
if hasattr(app_context, 'cache'):
app_context.cache[cache_key] = thumbnail
return thumbnail
# If pcbnew method failed, log it but continue to try alternative method
logger.warning("Failed to generate thumbnail with pcbnew, trying CLI method")
except Exception as e:
logger.error(f"Error using pcbnew for thumbnail: {str(e)}", exc_info=True)
ctx.info(f"Error with pcbnew method, trying alternative approach")
else:
logger.info("KiCad Python modules not available, trying CLI method")
# Method 2: Try to use command-line tools
try:
thumbnail = await generate_thumbnail_with_cli(pcb_file, ctx)
if thumbnail:
# Cache the result if possible
if hasattr(app_context, 'cache'):
app_context.cache[cache_key] = thumbnail
return thumbnail
except Exception as e:
logger.error(f"Error using CLI for thumbnail: {str(e)}", exc_info=True)
ctx.info(f"Error generating thumbnail with CLI method")
# If all methods fail, inform the user
ctx.info("Could not generate thumbnail for PCB - all methods failed")
return None
except asyncio.CancelledError:
logger.info("Thumbnail generation cancelled")
raise # Re-raise to let MCP know the task was cancelled
except Exception as e:
logger.error(f"Unexpected error in thumbnail generation: {str(e)}")
ctx.info(f"Error: {str(e)}")
return None
@mcp.tool() @mcp.tool()
async def generate_project_thumbnail(project_path: str, ctx: Context) -> Optional[Image]: async def generate_project_thumbnail(project_path: str, ctx: Context) -> Optional[Image]:
"""Generate a thumbnail of a KiCad project's PCB layout.""" """Generate a thumbnail of a KiCad project's PCB layout."""
logger.info(f"Generating thumbnail for project: {project_path}")
if not os.path.exists(project_path):
logger.error(f"Project not found: {project_path}")
ctx.info(f"Project not found: {project_path}")
return None
# Get PCB file
files = get_project_files(project_path)
if "pcb" not in files:
logger.error("PCB file not found in project")
ctx.info("PCB file not found in project")
return None
pcb_file = files["pcb"]
logger.info(f"Found PCB file: {pcb_file}")
if not kicad_modules_available:
logger.warning("KiCad Python modules are not available - cannot generate thumbnail")
ctx.info("KiCad Python modules are not available")
return None
try: try:
# Try to import pcbnew # Access the context
import pcbnew app_context = ctx.request_context.lifespan_context
logger.info("Successfully imported pcbnew module") kicad_modules_available = app_context.kicad_modules_available
# Load the PCB file logger.info(f"Generating thumbnail for project: {project_path}")
logger.debug(f"Loading PCB file: {pcb_file}")
board = pcbnew.LoadBoard(pcb_file) if not os.path.exists(project_path):
if not board: logger.error(f"Project not found: {project_path}")
logger.error("Failed to load PCB file") ctx.info(f"Project not found: {project_path}")
ctx.info("Failed to load PCB file") return None
# Get PCB file
files = get_project_files(project_path)
if "pcb" not in files:
logger.error("PCB file not found in project")
ctx.info("PCB file not found in project")
return None
pcb_file = files["pcb"]
logger.info(f"Found PCB file: {pcb_file}")
if not kicad_modules_available:
logger.warning("KiCad Python modules are not available - cannot generate thumbnail")
ctx.info("KiCad Python modules are not available")
return None
# Check cache
cache_key = f"project_thumbnail_{pcb_file}_{os.path.getmtime(pcb_file)}"
if hasattr(app_context, 'cache') and cache_key in app_context.cache:
logger.info(f"Using cached project thumbnail for {pcb_file}")
return app_context.cache[cache_key]
try:
# Try to import pcbnew
import pcbnew
logger.info("Successfully imported pcbnew module")
# Load the PCB file
logger.debug(f"Loading PCB file: {pcb_file}")
board = pcbnew.LoadBoard(pcb_file)
if not board:
logger.error("Failed to load PCB file")
ctx.info("Failed to load PCB file")
return None
# Get board dimensions
board_box = board.GetBoardEdgesBoundingBox()
width = board_box.GetWidth() / 1000000.0 # Convert to mm
height = board_box.GetHeight() / 1000000.0
logger.info(f"PCB dimensions: {width:.2f}mm x {height:.2f}mm")
ctx.info(f"PCB dimensions: {width:.2f}mm x {height:.2f}mm")
# Create temporary directory for output
with tempfile.TemporaryDirectory() as temp_dir:
logger.debug(f"Created temporary directory: {temp_dir}")
# Create PLOT_CONTROLLER for plotting
pctl = pcbnew.PLOT_CONTROLLER(board)
popt = pctl.GetPlotOptions()
# Set plot options for PNG output
popt.SetOutputDirectory(temp_dir)
popt.SetPlotFrameRef(False)
popt.SetPlotValue(True)
popt.SetPlotReference(True)
popt.SetPlotInvisibleText(False)
popt.SetPlotViaOnMaskLayer(False)
popt.SetColorMode(True) # Color mode
# Set color theme (if available in this version)
if hasattr(popt, "SetColorTheme"):
popt.SetColorTheme("default")
# Calculate a reasonable scale to fit in a thumbnail
max_size = 800 # Max pixel dimension
scale = min(max_size / width, max_size / height) * 0.8 # 80% to leave some margin
# Set plot scale if the function exists
if hasattr(popt, "SetScale"):
popt.SetScale(scale)
# Determine output filename
plot_basename = "thumbnail"
output_filename = os.path.join(temp_dir, f"{plot_basename}.png")
logger.debug(f"Plotting PCB to: {output_filename}")
# Plot PNG
pctl.OpenPlotfile(plot_basename, pcbnew.PLOT_FORMAT_PNG, "Thumbnail")
pctl.PlotLayer()
pctl.ClosePlot()
# The plot controller creates files with predictable names
plot_file = os.path.join(temp_dir, f"{plot_basename}.png")
if not os.path.exists(plot_file):
logger.error(f"Expected plot file not found: {plot_file}")
ctx.info("Failed to generate PCB image")
return None
# Read the image file
with open(plot_file, 'rb') as f:
img_data = f.read()
logger.info(f"Successfully generated thumbnail, size: {len(img_data)} bytes")
# Create and cache the image
thumbnail = Image(data=img_data, format="png")
if hasattr(app_context, 'cache'):
app_context.cache[cache_key] = thumbnail
return thumbnail
except ImportError as e:
logger.error(f"Failed to import pcbnew module: {str(e)}")
ctx.info(f"Failed to import pcbnew module: {str(e)}")
return None
except Exception as e:
logger.error(f"Error generating thumbnail: {str(e)}", exc_info=True)
ctx.info(f"Error generating thumbnail: {str(e)}")
return None return None
# Get board dimensions except asyncio.CancelledError:
board_box = board.GetBoardEdgesBoundingBox() logger.info("Project thumbnail generation cancelled")
width = board_box.GetWidth() / 1000000.0 # Convert to mm raise
height = board_box.GetHeight() / 1000000.0
logger.info(f"PCB dimensions: {width:.2f}mm x {height:.2f}mm")
ctx.info(f"PCB dimensions: {width:.2f}mm x {height:.2f}mm")
# Create temporary directory for output
with tempfile.TemporaryDirectory() as temp_dir:
logger.debug(f"Created temporary directory: {temp_dir}")
# Create PLOT_CONTROLLER for plotting
pctl = pcbnew.PLOT_CONTROLLER(board)
popt = pctl.GetPlotOptions()
# Set plot options for PNG output
popt.SetOutputDirectory(temp_dir)
popt.SetPlotFrameRef(False)
popt.SetPlotValue(True)
popt.SetPlotReference(True)
popt.SetPlotInvisibleText(False)
popt.SetPlotViaOnMaskLayer(False)
popt.SetColorMode(True) # Color mode
# Set color theme (if available in this version)
if hasattr(popt, "SetColorTheme"):
popt.SetColorTheme("default")
# Calculate a reasonable scale to fit in a thumbnail
max_size = 800 # Max pixel dimension
scale = min(max_size / width, max_size / height) * 0.8 # 80% to leave some margin
# Set plot scale if the function exists
if hasattr(popt, "SetScale"):
popt.SetScale(scale)
# Determine output filename
plot_basename = "thumbnail"
output_filename = os.path.join(temp_dir, f"{plot_basename}.png")
logger.debug(f"Plotting PCB to: {output_filename}")
# Plot PNG
pctl.OpenPlotfile(plot_basename, pcbnew.PLOT_FORMAT_PNG, "Thumbnail")
pctl.PlotLayer()
pctl.ClosePlot()
# The plot controller creates files with predictable names
plot_file = os.path.join(temp_dir, f"{plot_basename}.png")
if not os.path.exists(plot_file):
logger.error(f"Expected plot file not found: {plot_file}")
ctx.info("Failed to generate PCB image")
return None
# Read the image file
with open(plot_file, 'rb') as f:
img_data = f.read()
logger.info(f"Successfully generated thumbnail, size: {len(img_data)} bytes")
return Image(data=img_data, format="png")
except ImportError as e:
logger.error(f"Failed to import pcbnew module: {str(e)}")
ctx.info(f"Failed to import pcbnew module: {str(e)}")
return None
except Exception as e: except Exception as e:
logger.error(f"Error generating thumbnail: {str(e)}", exc_info=True) logger.error(f"Unexpected error in project thumbnail generation: {str(e)}", exc_info=True)
ctx.info(f"Error generating thumbnail: {str(e)}") ctx.info(f"Error: {str(e)}")
return None return None
# Helper functions for thumbnail generation # Helper functions for thumbnail generation
@ -344,83 +398,89 @@ async def generate_thumbnail_with_cli(pcb_file: str, ctx: Context) -> Optional[I
Returns: Returns:
Image object containing the PCB thumbnail or None if generation failed Image object containing the PCB thumbnail or None if generation failed
""" """
import subprocess try:
logger.info("Attempting to generate thumbnail using command line tools")
await ctx.report_progress(20, 100)
logger.info("Attempting to generate thumbnail using command line tools") # Check for required command-line tools based on OS
await ctx.report_progress(20, 100) if system == "Darwin": # macOS
pcbnew_cli = os.path.join(KICAD_APP_PATH, "Contents/MacOS/pcbnew_cli")
if not os.path.exists(pcbnew_cli) and shutil.which("pcbnew_cli") is not None:
pcbnew_cli = "pcbnew_cli" # Try to use from PATH
elif not os.path.exists(pcbnew_cli):
logger.error(f"pcbnew_cli not found at {pcbnew_cli} or in PATH")
return None
elif system == "Windows":
pcbnew_cli = os.path.join(KICAD_APP_PATH, "bin", "pcbnew_cli.exe")
if not os.path.exists(pcbnew_cli) and shutil.which("pcbnew_cli") is not None:
pcbnew_cli = "pcbnew_cli" # Try to use from PATH
elif not os.path.exists(pcbnew_cli):
logger.error(f"pcbnew_cli not found at {pcbnew_cli} or in PATH")
return None
elif system == "Linux":
pcbnew_cli = shutil.which("pcbnew_cli")
if not pcbnew_cli:
logger.error("pcbnew_cli not found in PATH")
return None
else:
logger.error(f"Unsupported operating system: {system}")
return None
# Check for required command-line tools based on OS await ctx.report_progress(30, 100)
if system == "Darwin": # macOS ctx.info("Using KiCad command line tools for thumbnail generation")
pcbnew_cli = os.path.join(KICAD_APP_PATH, "Contents/MacOS/pcbnew_cli")
if not os.path.exists(pcbnew_cli) and shutil.which("pcbnew_cli") is not None: # Create temporary directory for output
pcbnew_cli = "pcbnew_cli" # Try to use from PATH with tempfile.TemporaryDirectory() as temp_dir:
elif not os.path.exists(pcbnew_cli): # Output PNG file
logger.error(f"pcbnew_cli not found at {pcbnew_cli} or in PATH") output_file = os.path.join(temp_dir, "thumbnail.png")
return None
elif system == "Windows": # Build command for generating PNG from PCB
pcbnew_cli = os.path.join(KICAD_APP_PATH, "bin", "pcbnew_cli.exe") cmd = [
if not os.path.exists(pcbnew_cli) and shutil.which("pcbnew_cli") is not None: pcbnew_cli,
pcbnew_cli = "pcbnew_cli" # Try to use from PATH "--export-png",
elif not os.path.exists(pcbnew_cli): output_file,
logger.error(f"pcbnew_cli not found at {pcbnew_cli} or in PATH") "--page-size-inches", "8x6", # Set a reasonable page size
return None "--layers", "F.Cu,B.Cu,F.SilkS,B.SilkS,F.Mask,B.Mask,Edge.Cuts", # Important layers
elif system == "Linux": pcb_file
pcbnew_cli = shutil.which("pcbnew_cli") ]
if not pcbnew_cli:
logger.error("pcbnew_cli not found in PATH") logger.debug(f"Running command: {' '.join(cmd)}")
return None await ctx.report_progress(50, 100)
else:
logger.error(f"Unsupported operating system: {system}") # Run the command
try:
process = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if process.returncode != 0:
logger.error(f"Command failed with code {process.returncode}")
logger.error(f"Error: {process.stderr}")
return None
await ctx.report_progress(70, 100)
# Check if the output file was created
if not os.path.exists(output_file):
logger.error(f"Output file not created: {output_file}")
return None
# Read the image file
with open(output_file, 'rb') as f:
img_data = f.read()
logger.info(f"Successfully generated thumbnail with CLI, size: {len(img_data)} bytes")
await ctx.report_progress(90, 100)
return Image(data=img_data, format="png")
except subprocess.TimeoutExpired:
logger.error("Command timed out after 30 seconds")
return None
except Exception as e:
logger.error(f"Error running CLI command: {str(e)}", exc_info=True)
return None
except asyncio.CancelledError:
logger.info("CLI thumbnail generation cancelled")
raise
except Exception as e:
logger.error(f"Unexpected error in CLI thumbnail generation: {str(e)}")
return None return None
await ctx.report_progress(30, 100)
ctx.info("Using KiCad command line tools for thumbnail generation")
# Create temporary directory for output
with tempfile.TemporaryDirectory() as temp_dir:
# Output PNG file
output_file = os.path.join(temp_dir, "thumbnail.png")
# Build command for generating PNG from PCB
cmd = [
pcbnew_cli,
"--export-png",
output_file,
"--page-size-inches", "8x6", # Set a reasonable page size
"--layers", "F.Cu,B.Cu,F.SilkS,B.SilkS,F.Mask,B.Mask,Edge.Cuts", # Important layers
pcb_file
]
logger.debug(f"Running command: {' '.join(cmd)}")
await ctx.report_progress(50, 100)
# Run the command
try:
process = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if process.returncode != 0:
logger.error(f"Command failed with code {process.returncode}")
logger.error(f"Error: {process.stderr}")
return None
await ctx.report_progress(70, 100)
# Check if the output file was created
if not os.path.exists(output_file):
logger.error(f"Output file not created: {output_file}")
return None
# Read the image file
with open(output_file, 'rb') as f:
img_data = f.read()
logger.info(f"Successfully generated thumbnail with CLI, size: {len(img_data)} bytes")
await ctx.report_progress(90, 100)
return Image(data=img_data, format="png")
except subprocess.TimeoutExpired:
logger.error("Command timed out after 30 seconds")
return None
except Exception as e:
logger.error(f"Error running CLI command: {str(e)}", exc_info=True)
return None