feat: Fix tools, improve stability, and update docs
This commit is contained in:
parent
44b0bda19d
commit
5b9d237d7d
1
.gitignore
vendored
1
.gitignore
vendored
@ -2,6 +2,7 @@
|
|||||||
venv/
|
venv/
|
||||||
env/
|
env/
|
||||||
ENV/
|
ENV/
|
||||||
|
.venv/
|
||||||
|
|
||||||
# Environment files
|
# Environment files
|
||||||
.env
|
.env
|
||||||
|
@ -71,9 +71,6 @@ KICAD_SEARCH_PATHS=~/pcb,~/Electronics,~/Projects/KiCad
|
|||||||
Once the environment is set up, you can run the server:
|
Once the environment is set up, you can run the server:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
# Run in development mode
|
|
||||||
python -m mcp.dev main.py
|
|
||||||
|
|
||||||
# Or run directly
|
# Or run directly
|
||||||
python main.py
|
python main.py
|
||||||
```
|
```
|
||||||
|
@ -14,10 +14,9 @@ This guide provides detailed information for developers who want to modify or ex
|
|||||||
pip install -r requirements.txt
|
pip install -r requirements.txt
|
||||||
```
|
```
|
||||||
|
|
||||||
2. **Run in development mode**:
|
2. **Run the server**:
|
||||||
```bash
|
```bash
|
||||||
# Run with development server for better debugging
|
python main.py
|
||||||
python -m mcp.dev main.py
|
|
||||||
```
|
```
|
||||||
|
|
||||||
3. **Use the MCP Inspector** for debugging:
|
3. **Use the MCP Inspector** for debugging:
|
||||||
@ -240,11 +239,8 @@ To run tests:
|
|||||||
# Run all tests
|
# Run all tests
|
||||||
pytest
|
pytest
|
||||||
|
|
||||||
# Run specific test file
|
# Run specific tests:
|
||||||
pytest tests/test_resources.py
|
pytest tests/test_tools.py::test_run_drc_check
|
||||||
|
|
||||||
# Run with verbose output
|
|
||||||
pytest -v
|
|
||||||
```
|
```
|
||||||
|
|
||||||
## Debugging
|
## Debugging
|
||||||
|
@ -175,10 +175,7 @@ This guide helps you troubleshoot common issues with the KiCad MCP Server.
|
|||||||
|
|
||||||
To diagnose issues, check the server logs:
|
To diagnose issues, check the server logs:
|
||||||
|
|
||||||
1. **Development Mode Logs**
|
1. **Claude Desktop Logs (macOS)**
|
||||||
- When running in development mode with `python -m mcp.dev main.py`, logs appear in the console
|
|
||||||
|
|
||||||
2. **Claude Desktop Logs (macOS)**
|
|
||||||
- Server logs:
|
- Server logs:
|
||||||
```bash
|
```bash
|
||||||
tail -n 20 -F ~/Library/Logs/Claude/mcp-server-kicad.log
|
tail -n 20 -F ~/Library/Logs/Claude/mcp-server-kicad.log
|
||||||
@ -188,7 +185,7 @@ To diagnose issues, check the server logs:
|
|||||||
tail -n 20 -F ~/Library/Logs/Claude/mcp.log
|
tail -n 20 -F ~/Library/Logs/Claude/mcp.log
|
||||||
```
|
```
|
||||||
|
|
||||||
3. **Claude Desktop Logs (Windows)**
|
2. **Claude Desktop Logs (Windows)**
|
||||||
- Check logs in:
|
- Check logs in:
|
||||||
```
|
```
|
||||||
%APPDATA%\Claude\Logs\
|
%APPDATA%\Claude\Logs\
|
||||||
@ -264,17 +261,12 @@ To diagnose issues, check the server logs:
|
|||||||
|
|
||||||
If you're still experiencing problems:
|
If you're still experiencing problems:
|
||||||
|
|
||||||
1. Try running the server in development mode for more detailed output:
|
1. Use the MCP Inspector for direct server testing:
|
||||||
```bash
|
|
||||||
python -m mcp.dev main.py
|
|
||||||
```
|
|
||||||
|
|
||||||
2. Use the MCP Inspector for direct server testing:
|
|
||||||
```bash
|
```bash
|
||||||
npx @modelcontextprotocol/inspector uv --directory . run main.py
|
npx @modelcontextprotocol/inspector uv --directory . run main.py
|
||||||
```
|
```
|
||||||
|
|
||||||
3. Open an issue on GitHub with:
|
2. Open an issue on GitHub with:
|
||||||
- A clear description of the problem
|
- A clear description of the problem
|
||||||
- Steps to reproduce
|
- Steps to reproduce
|
||||||
- Error messages or logs
|
- Error messages or logs
|
||||||
|
@ -16,7 +16,7 @@ elif system == "Windows":
|
|||||||
KICAD_USER_DIR = os.path.expanduser("~/Documents/KiCad")
|
KICAD_USER_DIR = os.path.expanduser("~/Documents/KiCad")
|
||||||
KICAD_APP_PATH = r"C:\Program Files\KiCad"
|
KICAD_APP_PATH = r"C:\Program Files\KiCad"
|
||||||
elif system == "Linux":
|
elif system == "Linux":
|
||||||
KICAD_USER_DIR = os.path.expanduser("~/kicad")
|
KICAD_USER_DIR = os.path.expanduser("~/KiCad")
|
||||||
KICAD_APP_PATH = "/usr/share/kicad"
|
KICAD_APP_PATH = "/usr/share/kicad"
|
||||||
else:
|
else:
|
||||||
# Default to macOS paths if system is unknown
|
# Default to macOS paths if system is unknown
|
||||||
|
@ -4,10 +4,13 @@ Lifespan context management for KiCad MCP Server.
|
|||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from typing import AsyncIterator, Dict, Any
|
from typing import AsyncIterator, Dict, Any
|
||||||
|
import logging # Import logging
|
||||||
|
import os # Added for PID
|
||||||
|
|
||||||
from mcp.server.fastmcp import FastMCP
|
from mcp.server.fastmcp import FastMCP
|
||||||
|
|
||||||
from kicad_mcp.utils.python_path import setup_kicad_python_path
|
# Get PID for logging
|
||||||
|
# _PID = os.getpid()
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class KiCadAppContext:
|
class KiCadAppContext:
|
||||||
@ -18,7 +21,7 @@ class KiCadAppContext:
|
|||||||
cache: Dict[str, Any]
|
cache: Dict[str, Any]
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def kicad_lifespan(server: FastMCP) -> AsyncIterator[KiCadAppContext]:
|
async def kicad_lifespan(server: FastMCP, kicad_modules_available: bool = False) -> AsyncIterator[KiCadAppContext]:
|
||||||
"""Manage KiCad MCP server lifecycle with type-safe context.
|
"""Manage KiCad MCP server lifecycle with type-safe context.
|
||||||
|
|
||||||
This function handles:
|
This function handles:
|
||||||
@ -28,58 +31,55 @@ async def kicad_lifespan(server: FastMCP) -> AsyncIterator[KiCadAppContext]:
|
|||||||
|
|
||||||
Args:
|
Args:
|
||||||
server: The FastMCP server instance
|
server: The FastMCP server instance
|
||||||
|
kicad_modules_available: Flag indicating if Python modules were found (passed from create_server)
|
||||||
|
|
||||||
Yields:
|
Yields:
|
||||||
KiCadAppContext: A typed context object shared across all handlers
|
KiCadAppContext: A typed context object shared across all handlers
|
||||||
"""
|
"""
|
||||||
print("Starting KiCad MCP server initialization")
|
logging.info(f"Starting KiCad MCP server initialization")
|
||||||
|
|
||||||
# Initialize resources on startup
|
# Resources initialization - Python path setup removed
|
||||||
print("Setting up KiCad Python modules")
|
# print("Setting up KiCad Python modules")
|
||||||
kicad_modules_available = setup_kicad_python_path()
|
# kicad_modules_available = setup_kicad_python_path() # Now passed as arg
|
||||||
print(f"KiCad Python modules available: {kicad_modules_available}")
|
logging.info(f"KiCad Python module availability: {kicad_modules_available} (Setup logic removed)")
|
||||||
|
|
||||||
# Create in-memory cache for expensive operations
|
# Create in-memory cache for expensive operations
|
||||||
cache: Dict[str, Any] = {}
|
cache: Dict[str, Any] = {}
|
||||||
|
|
||||||
# Initialize any other resources that need cleanup later
|
# Initialize any other resources that need cleanup later
|
||||||
created_temp_dirs = []
|
created_temp_dirs = [] # Assuming this is managed elsewhere or not needed for now
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Import any KiCad modules that should be preloaded
|
# --- Removed Python module preloading section ---
|
||||||
if kicad_modules_available:
|
# if kicad_modules_available:
|
||||||
try:
|
# try:
|
||||||
print("Preloading KiCad Python modules")
|
# print("Preloading KiCad Python modules")
|
||||||
|
# ...
|
||||||
# Core PCB module used in multiple tools
|
# except ImportError as e:
|
||||||
import pcbnew
|
# print(f"Failed to preload some KiCad modules: {str(e)}")
|
||||||
print(f"Successfully preloaded pcbnew module: {getattr(pcbnew, 'GetBuildVersion', lambda: 'unknown')()}")
|
|
||||||
cache["pcbnew_version"] = getattr(pcbnew, "GetBuildVersion", lambda: "unknown")()
|
|
||||||
except ImportError as e:
|
|
||||||
print(f"Failed to preload some KiCad modules: {str(e)}")
|
|
||||||
|
|
||||||
# Yield the context to the server - server runs during this time
|
# Yield the context to the server - server runs during this time
|
||||||
print("KiCad MCP server initialization complete")
|
logging.info(f"KiCad MCP server initialization complete")
|
||||||
yield KiCadAppContext(
|
yield KiCadAppContext(
|
||||||
kicad_modules_available=kicad_modules_available,
|
kicad_modules_available=kicad_modules_available, # Pass the flag through
|
||||||
cache=cache
|
cache=cache
|
||||||
)
|
)
|
||||||
finally:
|
finally:
|
||||||
# Clean up resources when server shuts down
|
# Clean up resources when server shuts down
|
||||||
print("Shutting down KiCad MCP server")
|
logging.info(f"Shutting down KiCad MCP server")
|
||||||
|
|
||||||
# Clear the cache
|
# Clear the cache
|
||||||
if cache:
|
if cache:
|
||||||
print(f"Clearing cache with {len(cache)} entries")
|
logging.info(f"Clearing cache with {len(cache)} entries")
|
||||||
cache.clear()
|
cache.clear()
|
||||||
|
|
||||||
# Clean up any temporary directories
|
# Clean up any temporary directories
|
||||||
import shutil
|
import shutil
|
||||||
for temp_dir in created_temp_dirs:
|
for temp_dir in created_temp_dirs:
|
||||||
try:
|
try:
|
||||||
print(f"Removing temporary directory: {temp_dir}")
|
logging.info(f"Removing temporary directory: {temp_dir}")
|
||||||
shutil.rmtree(temp_dir, ignore_errors=True)
|
shutil.rmtree(temp_dir, ignore_errors=True)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error cleaning up temporary directory {temp_dir}: {str(e)}")
|
logging.error(f"Error cleaning up temporary directory {temp_dir}: {str(e)}")
|
||||||
|
|
||||||
print("KiCad MCP server shutdown complete")
|
logging.info(f"KiCad MCP server shutdown complete")
|
||||||
|
@ -15,22 +15,6 @@ def register_project_resources(mcp: FastMCP) -> None:
|
|||||||
mcp: The FastMCP server instance
|
mcp: The FastMCP server instance
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@mcp.resource("kicad://projects")
|
|
||||||
def list_projects_resource() -> str:
|
|
||||||
"""List all KiCad projects as a formatted resource."""
|
|
||||||
projects = find_kicad_projects()
|
|
||||||
|
|
||||||
if not projects:
|
|
||||||
return "No KiCad projects found in your Documents/KiCad directory."
|
|
||||||
|
|
||||||
result = "# KiCad Projects\n\n"
|
|
||||||
for project in sorted(projects, key=lambda p: p["modified"], reverse=True):
|
|
||||||
result += f"## {project['name']}\n"
|
|
||||||
result += f"- **Path**: {project['path']}\n"
|
|
||||||
result += f"- **Last Modified**: {os.path.getmtime(project['path'])}\n\n"
|
|
||||||
|
|
||||||
return result
|
|
||||||
|
|
||||||
@mcp.resource("kicad://project/{project_path}")
|
@mcp.resource("kicad://project/{project_path}")
|
||||||
def get_project_details(project_path: str) -> str:
|
def get_project_details(project_path: str) -> str:
|
||||||
"""Get details about a specific KiCad project."""
|
"""Get details about a specific KiCad project."""
|
||||||
|
@ -4,6 +4,7 @@ MCP server creation and configuration.
|
|||||||
import atexit
|
import atexit
|
||||||
import os
|
import os
|
||||||
import signal
|
import signal
|
||||||
|
import logging
|
||||||
from typing import Callable
|
from typing import Callable
|
||||||
from mcp.server.fastmcp import FastMCP
|
from mcp.server.fastmcp import FastMCP
|
||||||
|
|
||||||
@ -31,9 +32,6 @@ from kicad_mcp.prompts.drc_prompt import register_drc_prompts
|
|||||||
from kicad_mcp.prompts.bom_prompts import register_bom_prompts
|
from kicad_mcp.prompts.bom_prompts import register_bom_prompts
|
||||||
from kicad_mcp.prompts.pattern_prompts import register_pattern_prompts
|
from kicad_mcp.prompts.pattern_prompts import register_pattern_prompts
|
||||||
|
|
||||||
# Import utils
|
|
||||||
from kicad_mcp.utils.python_path import setup_kicad_python_path
|
|
||||||
|
|
||||||
# Import context management
|
# Import context management
|
||||||
from kicad_mcp.context import kicad_lifespan
|
from kicad_mcp.context import kicad_lifespan
|
||||||
|
|
||||||
@ -56,7 +54,7 @@ def add_cleanup_handler(handler: Callable) -> None:
|
|||||||
|
|
||||||
def run_cleanup_handlers() -> None:
|
def run_cleanup_handlers() -> None:
|
||||||
"""Run all registered cleanup handlers."""
|
"""Run all registered cleanup handlers."""
|
||||||
print("Running cleanup handlers...")
|
logging.info(f"Running cleanup handlers...")
|
||||||
|
|
||||||
global _shutting_down
|
global _shutting_down
|
||||||
|
|
||||||
@ -65,14 +63,14 @@ def run_cleanup_handlers() -> None:
|
|||||||
return
|
return
|
||||||
|
|
||||||
_shutting_down = True
|
_shutting_down = True
|
||||||
print("Running cleanup handlers...")
|
logging.info(f"Running cleanup handlers...")
|
||||||
|
|
||||||
for handler in cleanup_handlers:
|
for handler in cleanup_handlers:
|
||||||
try:
|
try:
|
||||||
handler()
|
handler()
|
||||||
print(f"Cleanup handler {handler.__name__} completed successfully")
|
logging.info(f"Cleanup handler {handler.__name__} completed successfully")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error in cleanup handler {handler.__name__}: {str(e)}", exc_info=True)
|
logging.error(f"Error in cleanup handler {handler.__name__}: {str(e)}", exc_info=True)
|
||||||
|
|
||||||
def shutdown_server():
|
def shutdown_server():
|
||||||
"""Properly shutdown the server if it exists."""
|
"""Properly shutdown the server if it exists."""
|
||||||
@ -80,13 +78,11 @@ def shutdown_server():
|
|||||||
|
|
||||||
if _server_instance:
|
if _server_instance:
|
||||||
try:
|
try:
|
||||||
print("Shutting down KiCad MCP server")
|
logging.info(f"Shutting down KiCad MCP server")
|
||||||
# The server should handle its own shutdown through its lifespan context
|
|
||||||
# This is mostly a placeholder for any additional server shutdown code
|
|
||||||
_server_instance = None
|
_server_instance = None
|
||||||
print("KiCad MCP server shutdown complete")
|
logging.info(f"KiCad MCP server shutdown complete")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error shutting down server: {str(e)}", exc_info=True)
|
logging.error(f"Error shutting down server: {str(e)}", exc_info=True)
|
||||||
|
|
||||||
|
|
||||||
def register_signal_handlers(server: FastMCP) -> None:
|
def register_signal_handlers(server: FastMCP) -> None:
|
||||||
@ -96,7 +92,7 @@ def register_signal_handlers(server: FastMCP) -> None:
|
|||||||
server: The FastMCP server instance
|
server: The FastMCP server instance
|
||||||
"""
|
"""
|
||||||
def handle_exit_signal(signum, frame):
|
def handle_exit_signal(signum, frame):
|
||||||
print(f"Received signal {signum}, initiating shutdown...")
|
logging.info(f"Received signal {signum}, initiating shutdown...")
|
||||||
|
|
||||||
# Run cleanup first
|
# Run cleanup first
|
||||||
run_cleanup_handlers()
|
run_cleanup_handlers()
|
||||||
@ -111,30 +107,33 @@ def register_signal_handlers(server: FastMCP) -> None:
|
|||||||
for sig in (signal.SIGINT, signal.SIGTERM):
|
for sig in (signal.SIGINT, signal.SIGTERM):
|
||||||
try:
|
try:
|
||||||
signal.signal(sig, handle_exit_signal)
|
signal.signal(sig, handle_exit_signal)
|
||||||
print(f"Registered handler for signal {sig}")
|
logging.info(f"Registered handler for signal {sig}")
|
||||||
except (ValueError, AttributeError) as e:
|
except (ValueError, AttributeError) as e:
|
||||||
# Some signals may not be available on all platforms
|
# Some signals may not be available on all platforms
|
||||||
print(f"Could not register handler for signal {sig}: {str(e)}")
|
logging.error(f"Could not register handler for signal {sig}: {str(e)}")
|
||||||
|
|
||||||
|
|
||||||
def create_server() -> FastMCP:
|
def create_server() -> FastMCP:
|
||||||
"""Create and configure the KiCad MCP server."""
|
"""Create and configure the KiCad MCP server."""
|
||||||
print("Initializing KiCad MCP server")
|
logging.info(f"Initializing KiCad MCP server")
|
||||||
|
|
||||||
# Try to set up KiCad Python path
|
# Try to set up KiCad Python path - Removed
|
||||||
kicad_modules_available = setup_kicad_python_path()
|
# kicad_modules_available = setup_kicad_python_path()
|
||||||
|
kicad_modules_available = False # Set to False as we removed the setup logic
|
||||||
|
|
||||||
if kicad_modules_available:
|
# if kicad_modules_available:
|
||||||
print("KiCad Python modules successfully configured")
|
# print("KiCad Python modules successfully configured")
|
||||||
else:
|
# else:
|
||||||
print("KiCad Python modules not available - some features will be disabled")
|
# Always print this now, as we rely on CLI
|
||||||
|
logging.info(f"KiCad Python module setup removed; relying on kicad-cli for external operations.")
|
||||||
|
|
||||||
# Initialize FastMCP server
|
# Initialize FastMCP server
|
||||||
mcp = FastMCP("KiCad", lifespan=kicad_lifespan)
|
# Pass the availability flag (always False now) to the lifespan context
|
||||||
print("Created FastMCP server instance with lifespan management")
|
mcp = FastMCP("KiCad", lifespan=kicad_lifespan, lifespan_kwargs={"kicad_modules_available": kicad_modules_available})
|
||||||
|
logging.info(f"Created FastMCP server instance with lifespan management")
|
||||||
|
|
||||||
# Register resources
|
# Register resources
|
||||||
print("Registering resources...")
|
logging.info(f"Registering resources...")
|
||||||
register_project_resources(mcp)
|
register_project_resources(mcp)
|
||||||
register_file_resources(mcp)
|
register_file_resources(mcp)
|
||||||
register_drc_resources(mcp)
|
register_drc_resources(mcp)
|
||||||
@ -143,7 +142,7 @@ def create_server() -> FastMCP:
|
|||||||
register_pattern_resources(mcp)
|
register_pattern_resources(mcp)
|
||||||
|
|
||||||
# Register tools
|
# Register tools
|
||||||
print("Registering tools...")
|
logging.info(f"Registering tools...")
|
||||||
register_project_tools(mcp)
|
register_project_tools(mcp)
|
||||||
register_analysis_tools(mcp)
|
register_analysis_tools(mcp)
|
||||||
register_export_tools(mcp)
|
register_export_tools(mcp)
|
||||||
@ -153,7 +152,7 @@ def create_server() -> FastMCP:
|
|||||||
register_pattern_tools(mcp)
|
register_pattern_tools(mcp)
|
||||||
|
|
||||||
# Register prompts
|
# Register prompts
|
||||||
print("Registering prompts...")
|
logging.info(f"Registering prompts...")
|
||||||
register_prompts(mcp)
|
register_prompts(mcp)
|
||||||
register_drc_prompts(mcp)
|
register_drc_prompts(mcp)
|
||||||
register_bom_prompts(mcp)
|
register_bom_prompts(mcp)
|
||||||
@ -164,7 +163,7 @@ def create_server() -> FastMCP:
|
|||||||
atexit.register(run_cleanup_handlers)
|
atexit.register(run_cleanup_handlers)
|
||||||
|
|
||||||
# Add specific cleanup handlers
|
# Add specific cleanup handlers
|
||||||
add_cleanup_handler(lambda: print("KiCad MCP server shutdown complete"))
|
add_cleanup_handler(lambda: logging.info(f"KiCad MCP server shutdown complete"))
|
||||||
|
|
||||||
# Add temp directory cleanup
|
# Add temp directory cleanup
|
||||||
def cleanup_temp_dirs():
|
def cleanup_temp_dirs():
|
||||||
@ -173,17 +172,17 @@ def create_server() -> FastMCP:
|
|||||||
from kicad_mcp.utils.temp_dir_manager import get_temp_dirs
|
from kicad_mcp.utils.temp_dir_manager import get_temp_dirs
|
||||||
|
|
||||||
temp_dirs = get_temp_dirs()
|
temp_dirs = get_temp_dirs()
|
||||||
print(f"Cleaning up {len(temp_dirs)} temporary directories")
|
logging.info(f"Cleaning up {len(temp_dirs)} temporary directories")
|
||||||
|
|
||||||
for temp_dir in temp_dirs:
|
for temp_dir in temp_dirs:
|
||||||
try:
|
try:
|
||||||
if os.path.exists(temp_dir):
|
if os.path.exists(temp_dir):
|
||||||
shutil.rmtree(temp_dir, ignore_errors=True)
|
shutil.rmtree(temp_dir, ignore_errors=True)
|
||||||
print(f"Removed temporary directory: {temp_dir}")
|
logging.info(f"Removed temporary directory: {temp_dir}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error cleaning up temporary directory {temp_dir}: {str(e)}")
|
logging.error(f"Error cleaning up temporary directory {temp_dir}: {str(e)}")
|
||||||
|
|
||||||
add_cleanup_handler(cleanup_temp_dirs)
|
add_cleanup_handler(cleanup_temp_dirs)
|
||||||
|
|
||||||
print("Server initialization complete")
|
logging.info(f"Server initialization complete")
|
||||||
return mcp
|
return mcp
|
||||||
|
@ -584,7 +584,8 @@ async def export_bom_with_python(schematic_file: str, output_dir: str, project_n
|
|||||||
# Try to import KiCad Python modules
|
# Try to import KiCad Python modules
|
||||||
# This is a placeholder since exporting BOMs from schematic files
|
# This is a placeholder since exporting BOMs from schematic files
|
||||||
# is complex and KiCad's API for this is not well-documented
|
# is complex and KiCad's API for this is not well-documented
|
||||||
import pcbnew
|
import kicad
|
||||||
|
import kicad.pcbnew
|
||||||
|
|
||||||
# For now, return a message indicating this method is not implemented yet
|
# For now, return a message indicating this method is not implemented yet
|
||||||
print("BOM export with Python modules not fully implemented")
|
print("BOM export with Python modules not fully implemented")
|
||||||
|
@ -2,6 +2,7 @@
|
|||||||
Design Rule Check (DRC) tools for KiCad PCB files.
|
Design Rule Check (DRC) tools for KiCad PCB files.
|
||||||
"""
|
"""
|
||||||
import os
|
import os
|
||||||
|
# import logging # <-- Remove if no other logging exists
|
||||||
from typing import Dict, Any
|
from typing import Dict, Any
|
||||||
from mcp.server.fastmcp import FastMCP, Context
|
from mcp.server.fastmcp import FastMCP, Context
|
||||||
|
|
||||||
@ -11,7 +12,6 @@ from kicad_mcp.utils.kicad_api_detection import get_best_api_approach
|
|||||||
|
|
||||||
# Import implementations
|
# Import implementations
|
||||||
from kicad_mcp.tools.drc_impl.cli_drc import run_drc_via_cli
|
from kicad_mcp.tools.drc_impl.cli_drc import run_drc_via_cli
|
||||||
from kicad_mcp.tools.drc_impl.ipc_drc import run_drc_with_ipc_api
|
|
||||||
|
|
||||||
def register_drc_tools(mcp: FastMCP) -> None:
|
def register_drc_tools(mcp: FastMCP) -> None:
|
||||||
"""Register DRC tools with the MCP server.
|
"""Register DRC tools with the MCP server.
|
||||||
@ -93,35 +93,18 @@ def register_drc_tools(mcp: FastMCP) -> None:
|
|||||||
await ctx.report_progress(10, 100)
|
await ctx.report_progress(10, 100)
|
||||||
ctx.info(f"Starting DRC check on {os.path.basename(pcb_file)}")
|
ctx.info(f"Starting DRC check on {os.path.basename(pcb_file)}")
|
||||||
|
|
||||||
# Get app context and determine which approach to use
|
|
||||||
app_context = ctx.request_context.lifespan_context
|
|
||||||
api_approach = getattr(app_context, 'api_approach', get_best_api_approach())
|
|
||||||
|
|
||||||
# Run DRC using the appropriate approach
|
# Run DRC using the appropriate approach
|
||||||
drc_results = None
|
drc_results = None
|
||||||
|
|
||||||
if api_approach == "cli":
|
|
||||||
# Use CLI approach (kicad-cli)
|
|
||||||
print("Using kicad-cli for DRC")
|
print("Using kicad-cli for DRC")
|
||||||
ctx.info("Using KiCad CLI for DRC check...")
|
ctx.info("Using KiCad CLI for DRC check...")
|
||||||
|
# logging.info(f"[DRC] Calling run_drc_via_cli for {pcb_file}") # <-- Remove log
|
||||||
drc_results = await run_drc_via_cli(pcb_file, ctx)
|
drc_results = await run_drc_via_cli(pcb_file, ctx)
|
||||||
|
# logging.info(f"[DRC] run_drc_via_cli finished for {pcb_file}") # <-- Remove log
|
||||||
elif api_approach == "ipc":
|
|
||||||
# Use IPC API approach (kicad-python)
|
|
||||||
print("Using IPC API for DRC")
|
|
||||||
ctx.info("Using KiCad IPC API for DRC check...")
|
|
||||||
drc_results = await run_drc_with_ipc_api(pcb_file, ctx)
|
|
||||||
|
|
||||||
else:
|
|
||||||
# No API available
|
|
||||||
print("No KiCad API available for DRC")
|
|
||||||
return {
|
|
||||||
"success": False,
|
|
||||||
"error": "No KiCad API available for DRC. Please install KiCad 9.0 or later."
|
|
||||||
}
|
|
||||||
|
|
||||||
# Process and save results if successful
|
# Process and save results if successful
|
||||||
if drc_results and drc_results.get("success", False):
|
if drc_results and drc_results.get("success", False):
|
||||||
|
# logging.info(f"[DRC] DRC check successful for {pcb_file}. Saving results.") # <-- Remove log
|
||||||
# Save results to history
|
# Save results to history
|
||||||
save_drc_result(project_path, drc_results)
|
save_drc_result(project_path, drc_results)
|
||||||
|
|
||||||
@ -136,6 +119,14 @@ def register_drc_tools(mcp: FastMCP) -> None:
|
|||||||
ctx.info(f"Found {comparison['change']} new DRC violations since the last check.")
|
ctx.info(f"Found {comparison['change']} new DRC violations since the last check.")
|
||||||
else:
|
else:
|
||||||
ctx.info(f"No change in the number of DRC violations since the last check.")
|
ctx.info(f"No change in the number of DRC violations since the last check.")
|
||||||
|
elif drc_results:
|
||||||
|
# logging.warning(f"[DRC] DRC check reported failure for {pcb_file}: {drc_results.get('error')}") # <-- Remove log
|
||||||
|
# Pass or print a warning if needed
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
# logging.error(f"[DRC] DRC check returned None for {pcb_file}") # <-- Remove log
|
||||||
|
# Pass or print an error if needed
|
||||||
|
pass
|
||||||
|
|
||||||
# Complete progress
|
# Complete progress
|
||||||
await ctx.report_progress(100, 100)
|
await ctx.report_progress(100, 100)
|
||||||
|
@ -19,53 +19,9 @@ def register_export_tools(mcp: FastMCP) -> None:
|
|||||||
mcp: The FastMCP server instance
|
mcp: The FastMCP server instance
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
def validate_project(project_path: str) -> Dict[str, Any]:
|
|
||||||
"""Basic validation of a KiCad project."""
|
|
||||||
print(f"Validating project: {project_path}")
|
|
||||||
|
|
||||||
if not os.path.exists(project_path):
|
|
||||||
print(f"Project not found: {project_path}")
|
|
||||||
return {"valid": False, "error": f"Project not found: {project_path}"}
|
|
||||||
|
|
||||||
issues = []
|
|
||||||
files = get_project_files(project_path)
|
|
||||||
|
|
||||||
# Check for essential files
|
|
||||||
if "pcb" not in files:
|
|
||||||
print("Missing PCB layout file")
|
|
||||||
issues.append("Missing PCB layout file")
|
|
||||||
|
|
||||||
if "schematic" not in files:
|
|
||||||
print("Missing schematic file")
|
|
||||||
issues.append("Missing schematic file")
|
|
||||||
|
|
||||||
# Validate project file
|
|
||||||
try:
|
|
||||||
with open(project_path, 'r') as f:
|
|
||||||
import json
|
|
||||||
json.load(f)
|
|
||||||
print("Project file validated successfully")
|
|
||||||
except json.JSONDecodeError:
|
|
||||||
print("Invalid project file format (JSON parsing error)")
|
|
||||||
issues.append("Invalid project file format (JSON parsing error)")
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error reading project file: {str(e)}")
|
|
||||||
issues.append(f"Error reading project file: {str(e)}")
|
|
||||||
|
|
||||||
result = {
|
|
||||||
"valid": len(issues) == 0,
|
|
||||||
"path": project_path,
|
|
||||||
"issues": issues if issues else None,
|
|
||||||
"files_found": list(files.keys())
|
|
||||||
}
|
|
||||||
|
|
||||||
print(f"Validation result: {'valid' if result['valid'] else 'invalid'}")
|
|
||||||
return result
|
|
||||||
|
|
||||||
@mcp.tool()
|
@mcp.tool()
|
||||||
async def generate_pcb_thumbnail(project_path: str, ctx: Context) -> Optional[Image]:
|
async def generate_pcb_thumbnail(project_path: str, ctx: Context) -> Optional[Image]:
|
||||||
"""Generate a thumbnail image of a KiCad PCB layout.
|
"""Generate a thumbnail image of a KiCad PCB layout using kicad-cli.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
project_path: Path to the KiCad project file (.kicad_pro)
|
project_path: Path to the KiCad project file (.kicad_pro)
|
||||||
@ -77,48 +33,50 @@ def register_export_tools(mcp: FastMCP) -> None:
|
|||||||
try:
|
try:
|
||||||
# Access the context
|
# Access the context
|
||||||
app_context = ctx.request_context.lifespan_context
|
app_context = ctx.request_context.lifespan_context
|
||||||
kicad_modules_available = app_context.kicad_modules_available
|
# Removed check for kicad_modules_available as we now use CLI
|
||||||
|
|
||||||
print(f"Generating thumbnail for project: {project_path}")
|
print(f"Generating thumbnail via CLI for project: {project_path}")
|
||||||
|
|
||||||
if not os.path.exists(project_path):
|
if not os.path.exists(project_path):
|
||||||
print(f"Project not found: {project_path}")
|
print(f"Project not found: {project_path}")
|
||||||
ctx.info(f"Project not found: {project_path}")
|
await ctx.info(f"Project not found: {project_path}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# Get PCB file from project
|
# Get PCB file from project
|
||||||
files = get_project_files(project_path)
|
files = get_project_files(project_path)
|
||||||
if "pcb" not in files:
|
if "pcb" not in files:
|
||||||
print("PCB file not found in project")
|
print("PCB file not found in project")
|
||||||
ctx.info("PCB file not found in project")
|
await ctx.info("PCB file not found in project")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
pcb_file = files["pcb"]
|
pcb_file = files["pcb"]
|
||||||
print(f"Found PCB file: {pcb_file}")
|
print(f"Found PCB file: {pcb_file}")
|
||||||
|
|
||||||
# Check cache
|
# Check cache
|
||||||
cache_key = f"thumbnail_{pcb_file}_{os.path.getmtime(pcb_file)}"
|
cache_key = f"thumbnail_cli_{pcb_file}_{os.path.getmtime(pcb_file)}"
|
||||||
if hasattr(app_context, 'cache') and cache_key in app_context.cache:
|
if hasattr(app_context, 'cache') and cache_key in app_context.cache:
|
||||||
print(f"Using cached thumbnail for {pcb_file}")
|
print(f"Using cached CLI thumbnail for {pcb_file}")
|
||||||
return app_context.cache[cache_key]
|
return app_context.cache[cache_key]
|
||||||
|
|
||||||
await ctx.report_progress(10, 100)
|
await ctx.report_progress(10, 100)
|
||||||
ctx.info(f"Generating thumbnail for {os.path.basename(pcb_file)}")
|
await ctx.info(f"Generating thumbnail for {os.path.basename(pcb_file)} using kicad-cli")
|
||||||
|
|
||||||
# Try to use command-line tools
|
# Use command-line tools
|
||||||
try:
|
try:
|
||||||
thumbnail = await generate_thumbnail_with_cli(pcb_file, ctx)
|
thumbnail = await generate_thumbnail_with_cli(pcb_file, ctx)
|
||||||
if thumbnail:
|
if thumbnail:
|
||||||
# Cache the result if possible
|
# Cache the result if possible
|
||||||
if hasattr(app_context, 'cache'):
|
if hasattr(app_context, 'cache'):
|
||||||
app_context.cache[cache_key] = thumbnail
|
app_context.cache[cache_key] = thumbnail
|
||||||
|
print("Thumbnail generated successfully via CLI.")
|
||||||
return thumbnail
|
return thumbnail
|
||||||
|
else:
|
||||||
|
print("generate_thumbnail_with_cli returned None")
|
||||||
|
await ctx.info("Failed to generate thumbnail using kicad-cli.")
|
||||||
|
return None
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error using CLI for thumbnail: {str(e)}", exc_info=True)
|
print(f"Error calling generate_thumbnail_with_cli: {str(e)}", exc_info=True)
|
||||||
ctx.info(f"Error generating thumbnail with CLI method")
|
await ctx.info(f"Error generating thumbnail with kicad-cli: {str(e)}")
|
||||||
|
|
||||||
# If it fails, inform the user
|
|
||||||
ctx.info("Could not generate thumbnail for PCB - all methods failed")
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
@ -126,248 +84,20 @@ def register_export_tools(mcp: FastMCP) -> None:
|
|||||||
raise # Re-raise to let MCP know the task was cancelled
|
raise # Re-raise to let MCP know the task was cancelled
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Unexpected error in thumbnail generation: {str(e)}")
|
print(f"Unexpected error in thumbnail generation: {str(e)}")
|
||||||
ctx.info(f"Error: {str(e)}")
|
await ctx.info(f"Error: {str(e)}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
@mcp.tool()
|
@mcp.tool()
|
||||||
async def generate_project_thumbnail(project_path: str, ctx: Context) -> Optional[Image]:
|
async def generate_project_thumbnail(project_path: str, ctx: Context) -> Optional[Image]:
|
||||||
"""Generate a thumbnail of a KiCad project's PCB layout."""
|
"""Generate a thumbnail of a KiCad project's PCB layout (Alias for generate_pcb_thumbnail)."""
|
||||||
try:
|
# This function now just calls the main CLI-based thumbnail generator
|
||||||
# Access the context
|
print(f"generate_project_thumbnail called, redirecting to generate_pcb_thumbnail for {project_path}")
|
||||||
app_context = ctx.request_context.lifespan_context
|
return await generate_pcb_thumbnail(project_path, ctx)
|
||||||
kicad_modules_available = app_context.kicad_modules_available
|
|
||||||
|
|
||||||
print(f"Generating thumbnail for project: {project_path}")
|
|
||||||
|
|
||||||
if not os.path.exists(project_path):
|
|
||||||
print(f"Project not found: {project_path}")
|
|
||||||
ctx.info(f"Project not found: {project_path}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
# Get PCB file
|
|
||||||
files = get_project_files(project_path)
|
|
||||||
if "pcb" not in files:
|
|
||||||
print("PCB file not found in project")
|
|
||||||
ctx.info("PCB file not found in project")
|
|
||||||
return None
|
|
||||||
|
|
||||||
pcb_file = files["pcb"]
|
|
||||||
print(f"Found PCB file: {pcb_file}")
|
|
||||||
|
|
||||||
if not kicad_modules_available:
|
|
||||||
print("KiCad Python modules are not available - cannot generate thumbnail")
|
|
||||||
ctx.info("KiCad Python modules are not available")
|
|
||||||
return None
|
|
||||||
|
|
||||||
# Check cache
|
|
||||||
cache_key = f"project_thumbnail_{pcb_file}_{os.path.getmtime(pcb_file)}"
|
|
||||||
if hasattr(app_context, 'cache') and cache_key in app_context.cache:
|
|
||||||
print(f"Using cached project thumbnail for {pcb_file}")
|
|
||||||
return app_context.cache[cache_key]
|
|
||||||
|
|
||||||
try:
|
|
||||||
# Try to import pcbnew
|
|
||||||
import pcbnew
|
|
||||||
print("Successfully imported pcbnew module")
|
|
||||||
|
|
||||||
# Load the PCB file
|
|
||||||
print(f"Loading PCB file: {pcb_file}")
|
|
||||||
board = pcbnew.LoadBoard(pcb_file)
|
|
||||||
if not board:
|
|
||||||
print("Failed to load PCB file")
|
|
||||||
ctx.info("Failed to load PCB file")
|
|
||||||
return None
|
|
||||||
|
|
||||||
# Get board dimensions
|
|
||||||
board_box = board.GetBoardEdgesBoundingBox()
|
|
||||||
width = board_box.GetWidth() / 1000000.0 # Convert to mm
|
|
||||||
height = board_box.GetHeight() / 1000000.0
|
|
||||||
|
|
||||||
print(f"PCB dimensions: {width:.2f}mm x {height:.2f}mm")
|
|
||||||
ctx.info(f"PCB dimensions: {width:.2f}mm x {height:.2f}mm")
|
|
||||||
|
|
||||||
# Create temporary directory for output
|
|
||||||
with tempfile.TemporaryDirectory() as temp_dir:
|
|
||||||
print(f"Created temporary directory: {temp_dir}")
|
|
||||||
|
|
||||||
# Create PLOT_CONTROLLER for plotting
|
|
||||||
pctl = pcbnew.PLOT_CONTROLLER(board)
|
|
||||||
popt = pctl.GetPlotOptions()
|
|
||||||
|
|
||||||
# Set plot options for PNG output
|
|
||||||
popt.SetOutputDirectory(temp_dir)
|
|
||||||
popt.SetPlotFrameRef(False)
|
|
||||||
popt.SetPlotValue(True)
|
|
||||||
popt.SetPlotReference(True)
|
|
||||||
popt.SetPlotInvisibleText(False)
|
|
||||||
popt.SetPlotViaOnMaskLayer(False)
|
|
||||||
popt.SetColorMode(True) # Color mode
|
|
||||||
|
|
||||||
# Set color theme (if available in this version)
|
|
||||||
if hasattr(popt, "SetColorTheme"):
|
|
||||||
popt.SetColorTheme("default")
|
|
||||||
|
|
||||||
# Calculate a reasonable scale to fit in a thumbnail
|
|
||||||
max_size = 800 # Max pixel dimension
|
|
||||||
scale = min(max_size / width, max_size / height) * 0.8 # 80% to leave some margin
|
|
||||||
|
|
||||||
# Set plot scale if the function exists
|
|
||||||
if hasattr(popt, "SetScale"):
|
|
||||||
popt.SetScale(scale)
|
|
||||||
|
|
||||||
# Determine output filename
|
|
||||||
plot_basename = "thumbnail"
|
|
||||||
output_filename = os.path.join(temp_dir, f"{plot_basename}.png")
|
|
||||||
|
|
||||||
print(f"Plotting PCB to: {output_filename}")
|
|
||||||
|
|
||||||
# Plot PNG
|
|
||||||
pctl.OpenPlotfile(plot_basename, pcbnew.PLOT_FORMAT_PNG, "Thumbnail")
|
|
||||||
pctl.PlotLayer()
|
|
||||||
pctl.ClosePlot()
|
|
||||||
|
|
||||||
# The plot controller creates files with predictable names
|
|
||||||
plot_file = os.path.join(temp_dir, f"{plot_basename}.png")
|
|
||||||
|
|
||||||
if not os.path.exists(plot_file):
|
|
||||||
print(f"Expected plot file not found: {plot_file}")
|
|
||||||
ctx.info("Failed to generate PCB image")
|
|
||||||
return None
|
|
||||||
|
|
||||||
# Read the image file
|
|
||||||
with open(plot_file, 'rb') as f:
|
|
||||||
img_data = f.read()
|
|
||||||
|
|
||||||
print(f"Successfully generated thumbnail, size: {len(img_data)} bytes")
|
|
||||||
|
|
||||||
# Create and cache the image
|
|
||||||
thumbnail = Image(data=img_data, format="png")
|
|
||||||
if hasattr(app_context, 'cache'):
|
|
||||||
app_context.cache[cache_key] = thumbnail
|
|
||||||
|
|
||||||
return thumbnail
|
|
||||||
|
|
||||||
except ImportError as e:
|
|
||||||
print(f"Failed to import pcbnew module: {str(e)}")
|
|
||||||
ctx.info(f"Failed to import pcbnew module: {str(e)}")
|
|
||||||
return None
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error generating thumbnail: {str(e)}", exc_info=True)
|
|
||||||
ctx.info(f"Error generating thumbnail: {str(e)}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
print("Project thumbnail generation cancelled")
|
|
||||||
raise
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Unexpected error in project thumbnail generation: {str(e)}", exc_info=True)
|
|
||||||
ctx.info(f"Error: {str(e)}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
# Helper functions for thumbnail generation
|
# Helper functions for thumbnail generation
|
||||||
async def generate_thumbnail_with_pcbnew(pcb_file: str, ctx: Context) -> Optional[Image]:
|
|
||||||
"""Generate PCB thumbnail using the pcbnew Python module.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
pcb_file: Path to the PCB file (.kicad_pcb)
|
|
||||||
ctx: MCP context for progress reporting
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Image object containing the PCB thumbnail or None if generation failed
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
import pcbnew
|
|
||||||
print("Successfully imported pcbnew module")
|
|
||||||
await ctx.report_progress(20, 100)
|
|
||||||
|
|
||||||
# Load the PCB file
|
|
||||||
print(f"Loading PCB file with pcbnew: {pcb_file}")
|
|
||||||
board = pcbnew.LoadBoard(pcb_file)
|
|
||||||
if not board:
|
|
||||||
print("Failed to load PCB file with pcbnew")
|
|
||||||
return None
|
|
||||||
|
|
||||||
# Report progress
|
|
||||||
await ctx.report_progress(30, 100)
|
|
||||||
ctx.info("PCB file loaded, generating image...")
|
|
||||||
|
|
||||||
# Get board dimensions
|
|
||||||
board_box = board.GetBoardEdgesBoundingBox()
|
|
||||||
width_mm = board_box.GetWidth() / 1000000.0 # Convert to mm
|
|
||||||
height_mm = board_box.GetHeight() / 1000000.0
|
|
||||||
|
|
||||||
print(f"PCB dimensions: {width_mm:.2f}mm x {height_mm:.2f}mm")
|
|
||||||
|
|
||||||
# Create temporary directory for output
|
|
||||||
with tempfile.TemporaryDirectory() as temp_dir:
|
|
||||||
print(f"Created temporary directory: {temp_dir}")
|
|
||||||
|
|
||||||
# Create PLOT_CONTROLLER for plotting
|
|
||||||
pctl = pcbnew.PLOT_CONTROLLER(board)
|
|
||||||
popt = pctl.GetPlotOptions()
|
|
||||||
|
|
||||||
# Set plot options for PNG output
|
|
||||||
popt.SetOutputDirectory(temp_dir)
|
|
||||||
popt.SetPlotFrameRef(False)
|
|
||||||
popt.SetPlotValue(True)
|
|
||||||
popt.SetPlotReference(True)
|
|
||||||
popt.SetPlotInvisibleText(False)
|
|
||||||
popt.SetPlotViaOnMaskLayer(False)
|
|
||||||
|
|
||||||
# Set color mode (if available in this version)
|
|
||||||
if hasattr(popt, "SetColorMode"):
|
|
||||||
popt.SetColorMode(True) # Color mode
|
|
||||||
|
|
||||||
# Set color theme (if available in this version)
|
|
||||||
if hasattr(popt, "SetColorTheme"):
|
|
||||||
popt.SetColorTheme("default")
|
|
||||||
|
|
||||||
# Calculate a reasonable scale to fit in a thumbnail
|
|
||||||
max_pixels = 800 # Max pixel dimension
|
|
||||||
scale = min(max_pixels / width_mm, max_pixels / height_mm) * 0.8 # 80% to leave margin
|
|
||||||
|
|
||||||
# Set plot scale if the function exists
|
|
||||||
if hasattr(popt, "SetScale"):
|
|
||||||
popt.SetScale(scale)
|
|
||||||
|
|
||||||
# Determine output filename
|
|
||||||
plot_basename = "thumbnail"
|
|
||||||
|
|
||||||
print(f"Plotting PCB to PNG")
|
|
||||||
await ctx.report_progress(50, 100)
|
|
||||||
|
|
||||||
# Plot PNG
|
|
||||||
pctl.OpenPlotfile(plot_basename, pcbnew.PLOT_FORMAT_PNG, "Thumbnail")
|
|
||||||
pctl.PlotLayer()
|
|
||||||
pctl.ClosePlot()
|
|
||||||
|
|
||||||
await ctx.report_progress(70, 100)
|
|
||||||
|
|
||||||
# The plot controller creates files with predictable names
|
|
||||||
plot_file = os.path.join(temp_dir, f"{plot_basename}.png")
|
|
||||||
|
|
||||||
if not os.path.exists(plot_file):
|
|
||||||
print(f"Expected plot file not found: {plot_file}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
# Read the image file
|
|
||||||
with open(plot_file, 'rb') as f:
|
|
||||||
img_data = f.read()
|
|
||||||
|
|
||||||
print(f"Successfully generated thumbnail, size: {len(img_data)} bytes")
|
|
||||||
await ctx.report_progress(90, 100)
|
|
||||||
return Image(data=img_data, format="png")
|
|
||||||
|
|
||||||
except ImportError as e:
|
|
||||||
print(f"Failed to import pcbnew module: {str(e)}")
|
|
||||||
return None
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error generating thumbnail with pcbnew: {str(e)}", exc_info=True)
|
|
||||||
return None
|
|
||||||
|
|
||||||
async def generate_thumbnail_with_cli(pcb_file: str, ctx: Context) -> Optional[Image]:
|
async def generate_thumbnail_with_cli(pcb_file: str, ctx: Context) -> Optional[Image]:
|
||||||
"""Generate PCB thumbnail using command line tools.
|
"""Generate PCB thumbnail using command line tools.
|
||||||
This is a fallback method when pcbnew Python module is not available.
|
This is a fallback method when the kicad Python module is not available or fails.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
pcb_file: Path to the PCB file (.kicad_pcb)
|
pcb_file: Path to the PCB file (.kicad_pcb)
|
||||||
@ -377,48 +107,58 @@ async def generate_thumbnail_with_cli(pcb_file: str, ctx: Context) -> Optional[I
|
|||||||
Image object containing the PCB thumbnail or None if generation failed
|
Image object containing the PCB thumbnail or None if generation failed
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
print("Attempting to generate thumbnail using command line tools")
|
print("Attempting to generate thumbnail using KiCad CLI tools")
|
||||||
await ctx.report_progress(20, 100)
|
await ctx.report_progress(20, 100)
|
||||||
|
|
||||||
|
# --- Determine Output Path ---
|
||||||
|
project_dir = os.path.dirname(pcb_file)
|
||||||
|
project_name = os.path.splitext(os.path.basename(pcb_file))[0]
|
||||||
|
output_file = os.path.join(project_dir, f"{project_name}_thumbnail.svg")
|
||||||
|
# ---------------------------
|
||||||
|
|
||||||
# Check for required command-line tools based on OS
|
# Check for required command-line tools based on OS
|
||||||
|
kicad_cli = None
|
||||||
if system == "Darwin": # macOS
|
if system == "Darwin": # macOS
|
||||||
pcbnew_cli = os.path.join(KICAD_APP_PATH, "Contents/MacOS/pcbnew_cli")
|
kicad_cli_path = os.path.join(KICAD_APP_PATH, "Contents/MacOS/kicad-cli")
|
||||||
if not os.path.exists(pcbnew_cli) and shutil.which("pcbnew_cli") is not None:
|
if os.path.exists(kicad_cli_path):
|
||||||
pcbnew_cli = "pcbnew_cli" # Try to use from PATH
|
kicad_cli = kicad_cli_path
|
||||||
elif not os.path.exists(pcbnew_cli):
|
elif shutil.which("kicad-cli") is not None:
|
||||||
print(f"pcbnew_cli not found at {pcbnew_cli} or in PATH")
|
kicad_cli = "kicad-cli" # Try to use from PATH
|
||||||
|
else:
|
||||||
|
print(f"kicad-cli not found at {kicad_cli_path} or in PATH")
|
||||||
return None
|
return None
|
||||||
elif system == "Windows":
|
elif system == "Windows":
|
||||||
pcbnew_cli = os.path.join(KICAD_APP_PATH, "bin", "pcbnew_cli.exe")
|
kicad_cli_path = os.path.join(KICAD_APP_PATH, "bin", "kicad-cli.exe")
|
||||||
if not os.path.exists(pcbnew_cli) and shutil.which("pcbnew_cli") is not None:
|
if os.path.exists(kicad_cli_path):
|
||||||
pcbnew_cli = "pcbnew_cli" # Try to use from PATH
|
kicad_cli = kicad_cli_path
|
||||||
elif not os.path.exists(pcbnew_cli):
|
elif shutil.which("kicad-cli.exe") is not None:
|
||||||
print(f"pcbnew_cli not found at {pcbnew_cli} or in PATH")
|
kicad_cli = "kicad-cli.exe"
|
||||||
|
elif shutil.which("kicad-cli") is not None:
|
||||||
|
kicad_cli = "kicad-cli" # Try to use from PATH (without .exe)
|
||||||
|
else:
|
||||||
|
print(f"kicad-cli not found at {kicad_cli_path} or in PATH")
|
||||||
return None
|
return None
|
||||||
elif system == "Linux":
|
elif system == "Linux":
|
||||||
pcbnew_cli = shutil.which("pcbnew_cli")
|
kicad_cli = shutil.which("kicad-cli")
|
||||||
if not pcbnew_cli:
|
if not kicad_cli:
|
||||||
print("pcbnew_cli not found in PATH")
|
print("kicad-cli not found in PATH")
|
||||||
return None
|
return None
|
||||||
else:
|
else:
|
||||||
print(f"Unsupported operating system: {system}")
|
print(f"Unsupported operating system: {system}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
await ctx.report_progress(30, 100)
|
await ctx.report_progress(30, 100)
|
||||||
ctx.info("Using KiCad command line tools for thumbnail generation")
|
await ctx.info("Using KiCad command line tools for thumbnail generation")
|
||||||
|
|
||||||
# Create temporary directory for output
|
# Build command for generating SVG from PCB using kicad-cli (changed from PNG)
|
||||||
with tempfile.TemporaryDirectory() as temp_dir:
|
|
||||||
# Output PNG file
|
|
||||||
output_file = os.path.join(temp_dir, "thumbnail.png")
|
|
||||||
|
|
||||||
# Build command for generating PNG from PCB
|
|
||||||
cmd = [
|
cmd = [
|
||||||
pcbnew_cli,
|
kicad_cli,
|
||||||
"--export-png",
|
"pcb",
|
||||||
output_file,
|
"export",
|
||||||
"--page-size-inches", "8x6", # Set a reasonable page size
|
"svg", # <-- Changed format to svg
|
||||||
"--layers", "F.Cu,B.Cu,F.SilkS,B.SilkS,F.Mask,B.Mask,Edge.Cuts", # Important layers
|
"--output", output_file,
|
||||||
|
"--layers", "F.Cu,B.Cu,F.SilkS,B.SilkS,F.Mask,B.Mask,Edge.Cuts", # Keep relevant layers
|
||||||
|
# Consider adding options like --black-and-white if needed
|
||||||
pcb_file
|
pcb_file
|
||||||
]
|
]
|
||||||
|
|
||||||
@ -427,12 +167,8 @@ async def generate_thumbnail_with_cli(pcb_file: str, ctx: Context) -> Optional[I
|
|||||||
|
|
||||||
# Run the command
|
# Run the command
|
||||||
try:
|
try:
|
||||||
process = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
|
process = subprocess.run(cmd, capture_output=True, text=True, check=True, timeout=30)
|
||||||
|
print(f"Command successful: {process.stdout}")
|
||||||
if process.returncode != 0:
|
|
||||||
print(f"Command failed with code {process.returncode}")
|
|
||||||
print(f"Error: {process.stderr}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
await ctx.report_progress(70, 100)
|
await ctx.report_progress(70, 100)
|
||||||
|
|
||||||
@ -447,13 +183,23 @@ async def generate_thumbnail_with_cli(pcb_file: str, ctx: Context) -> Optional[I
|
|||||||
|
|
||||||
print(f"Successfully generated thumbnail with CLI, size: {len(img_data)} bytes")
|
print(f"Successfully generated thumbnail with CLI, size: {len(img_data)} bytes")
|
||||||
await ctx.report_progress(90, 100)
|
await ctx.report_progress(90, 100)
|
||||||
return Image(data=img_data, format="png")
|
# Inform user about the saved file
|
||||||
|
await ctx.info(f"Thumbnail saved to: {output_file}")
|
||||||
|
return Image(data=img_data, format="svg") # <-- Changed format to svg
|
||||||
|
|
||||||
|
except subprocess.CalledProcessError as e:
|
||||||
|
print(f"Command '{' '.join(e.cmd)}' failed with code {e.returncode}")
|
||||||
|
print(f"Stderr: {e.stderr}")
|
||||||
|
print(f"Stdout: {e.stdout}")
|
||||||
|
await ctx.info(f"KiCad CLI command failed: {e.stderr or e.stdout}")
|
||||||
|
return None
|
||||||
except subprocess.TimeoutExpired:
|
except subprocess.TimeoutExpired:
|
||||||
print("Command timed out after 30 seconds")
|
print(f"Command timed out after 30 seconds: {' '.join(cmd)}")
|
||||||
|
await ctx.info("KiCad CLI command timed out")
|
||||||
return None
|
return None
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error running CLI command: {str(e)}", exc_info=True)
|
print(f"Error running CLI command: {str(e)}", exc_info=True)
|
||||||
|
await ctx.info(f"Error running KiCad CLI: {str(e)}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
@ -461,4 +207,5 @@ async def generate_thumbnail_with_cli(pcb_file: str, ctx: Context) -> Optional[I
|
|||||||
raise
|
raise
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Unexpected error in CLI thumbnail generation: {str(e)}")
|
print(f"Unexpected error in CLI thumbnail generation: {str(e)}")
|
||||||
|
await ctx.info(f"Unexpected error: {str(e)}")
|
||||||
return None
|
return None
|
||||||
|
@ -2,12 +2,15 @@
|
|||||||
Project management tools for KiCad.
|
Project management tools for KiCad.
|
||||||
"""
|
"""
|
||||||
import os
|
import os
|
||||||
|
import logging
|
||||||
from typing import Dict, List, Any
|
from typing import Dict, List, Any
|
||||||
from mcp.server.fastmcp import FastMCP
|
from mcp.server.fastmcp import FastMCP
|
||||||
|
|
||||||
from kicad_mcp.utils.kicad_utils import find_kicad_projects, open_kicad_project
|
from kicad_mcp.utils.kicad_utils import find_kicad_projects, open_kicad_project
|
||||||
from kicad_mcp.utils.file_utils import get_project_files, load_project_json
|
from kicad_mcp.utils.file_utils import get_project_files, load_project_json
|
||||||
|
|
||||||
|
# Get PID for logging
|
||||||
|
# _PID = os.getpid()
|
||||||
|
|
||||||
def register_project_tools(mcp: FastMCP) -> None:
|
def register_project_tools(mcp: FastMCP) -> None:
|
||||||
"""Register project management tools with the MCP server.
|
"""Register project management tools with the MCP server.
|
||||||
@ -17,9 +20,12 @@ def register_project_tools(mcp: FastMCP) -> None:
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
@mcp.tool()
|
@mcp.tool()
|
||||||
def find_projects() -> List[Dict[str, Any]]:
|
def list_projects() -> List[Dict[str, Any]]:
|
||||||
"""Find all KiCad projects on this system."""
|
"""Find and list all KiCad projects on this system."""
|
||||||
return find_kicad_projects()
|
logging.info(f"Executing list_projects tool...")
|
||||||
|
projects = find_kicad_projects()
|
||||||
|
logging.info(f"list_projects tool returning {len(projects)} projects.")
|
||||||
|
return projects
|
||||||
|
|
||||||
@mcp.tool()
|
@mcp.tool()
|
||||||
def get_project_structure(project_path: str) -> Dict[str, Any]:
|
def get_project_structure(project_path: str) -> Dict[str, Any]:
|
||||||
|
@ -2,6 +2,7 @@
|
|||||||
Environment variable handling for KiCad MCP Server.
|
Environment variable handling for KiCad MCP Server.
|
||||||
"""
|
"""
|
||||||
import os
|
import os
|
||||||
|
import logging
|
||||||
from typing import Dict, Optional
|
from typing import Dict, Optional
|
||||||
|
|
||||||
def load_dotenv(env_file: str = ".env") -> Dict[str, str]:
|
def load_dotenv(env_file: str = ".env") -> Dict[str, str]:
|
||||||
@ -14,21 +15,28 @@ def load_dotenv(env_file: str = ".env") -> Dict[str, str]:
|
|||||||
Dictionary of loaded environment variables
|
Dictionary of loaded environment variables
|
||||||
"""
|
"""
|
||||||
env_vars = {}
|
env_vars = {}
|
||||||
|
logging.info(f"load_dotenv called for file: {env_file}")
|
||||||
|
|
||||||
# Try to find .env file in the current directory or parent directories
|
# Try to find .env file in the current directory or parent directories
|
||||||
env_path = find_env_file(env_file)
|
env_path = find_env_file(env_file)
|
||||||
|
|
||||||
if not env_path:
|
if not env_path:
|
||||||
# No .env file found, return empty dict
|
logging.warning(f"No .env file found matching: {env_file}")
|
||||||
return env_vars
|
return env_vars
|
||||||
|
|
||||||
|
logging.info(f"Found .env file at: {env_path}")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with open(env_path, 'r') as f:
|
with open(env_path, 'r') as f:
|
||||||
|
logging.info(f"Successfully opened {env_path} for reading.")
|
||||||
|
line_num = 0
|
||||||
for line in f:
|
for line in f:
|
||||||
|
line_num += 1
|
||||||
line = line.strip()
|
line = line.strip()
|
||||||
|
|
||||||
# Skip empty lines and comments
|
# Skip empty lines and comments
|
||||||
if not line or line.startswith('#'):
|
if not line or line.startswith('#'):
|
||||||
|
logging.debug(f"Skipping line {line_num} (comment/empty): {line}")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Parse key-value pairs
|
# Parse key-value pairs
|
||||||
@ -36,6 +44,7 @@ def load_dotenv(env_file: str = ".env") -> Dict[str, str]:
|
|||||||
key, value = line.split('=', 1)
|
key, value = line.split('=', 1)
|
||||||
key = key.strip()
|
key = key.strip()
|
||||||
value = value.strip()
|
value = value.strip()
|
||||||
|
logging.debug(f"Parsed line {line_num}: Key='{key}', RawValue='{value}'")
|
||||||
|
|
||||||
# Remove quotes if present
|
# Remove quotes if present
|
||||||
if value.startswith('"') and value.endswith('"'):
|
if value.startswith('"') and value.endswith('"'):
|
||||||
@ -44,16 +53,25 @@ def load_dotenv(env_file: str = ".env") -> Dict[str, str]:
|
|||||||
value = value[1:-1]
|
value = value[1:-1]
|
||||||
|
|
||||||
# Expand ~ to user's home directory
|
# Expand ~ to user's home directory
|
||||||
|
original_value = value
|
||||||
if '~' in value:
|
if '~' in value:
|
||||||
value = os.path.expanduser(value)
|
value = os.path.expanduser(value)
|
||||||
|
if value != original_value:
|
||||||
|
logging.debug(f"Expanded ~ in value for key '{key}': '{original_value}' -> '{value}'")
|
||||||
|
|
||||||
# Set environment variable
|
# Set environment variable
|
||||||
|
logging.info(f"Setting os.environ['{key}'] = '{value}'")
|
||||||
os.environ[key] = value
|
os.environ[key] = value
|
||||||
env_vars[key] = value
|
env_vars[key] = value
|
||||||
|
else:
|
||||||
|
logging.warning(f"Skipping line {line_num} (no '=' found): {line}")
|
||||||
|
logging.info(f"Finished processing {env_path}")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error loading .env file: {str(e)}")
|
# Use logging.exception to include traceback
|
||||||
|
logging.exception(f"Error loading .env file '{env_path}'")
|
||||||
|
|
||||||
|
logging.info(f"load_dotenv returning: {env_vars}")
|
||||||
return env_vars
|
return env_vars
|
||||||
|
|
||||||
def find_env_file(filename: str = ".env") -> Optional[str]:
|
def find_env_file(filename: str = ".env") -> Optional[str]:
|
||||||
@ -66,6 +84,7 @@ def find_env_file(filename: str = ".env") -> Optional[str]:
|
|||||||
Path to the env file if found, None otherwise
|
Path to the env file if found, None otherwise
|
||||||
"""
|
"""
|
||||||
current_dir = os.getcwd()
|
current_dir = os.getcwd()
|
||||||
|
logging.info(f"find_env_file starting search from: {current_dir}")
|
||||||
max_levels = 3 # Limit how far up to search
|
max_levels = 3 # Limit how far up to search
|
||||||
|
|
||||||
for _ in range(max_levels):
|
for _ in range(max_levels):
|
||||||
|
@ -2,11 +2,16 @@
|
|||||||
KiCad-specific utility functions.
|
KiCad-specific utility functions.
|
||||||
"""
|
"""
|
||||||
import os
|
import os
|
||||||
|
import logging # Import logging
|
||||||
import subprocess
|
import subprocess
|
||||||
|
import sys # Add sys import
|
||||||
from typing import Dict, List, Any
|
from typing import Dict, List, Any
|
||||||
|
|
||||||
from kicad_mcp.config import KICAD_USER_DIR, KICAD_APP_PATH, KICAD_EXTENSIONS, ADDITIONAL_SEARCH_PATHS
|
from kicad_mcp.config import KICAD_USER_DIR, KICAD_APP_PATH, KICAD_EXTENSIONS, ADDITIONAL_SEARCH_PATHS
|
||||||
|
|
||||||
|
# Get PID for logging - Removed, handled by logging config
|
||||||
|
# _PID = os.getpid()
|
||||||
|
|
||||||
def find_kicad_projects() -> List[Dict[str, Any]]:
|
def find_kicad_projects() -> List[Dict[str, Any]]:
|
||||||
"""Find KiCad projects in the user's directory.
|
"""Find KiCad projects in the user's directory.
|
||||||
|
|
||||||
@ -14,32 +19,57 @@ def find_kicad_projects() -> List[Dict[str, Any]]:
|
|||||||
List of dictionaries with project information
|
List of dictionaries with project information
|
||||||
"""
|
"""
|
||||||
projects = []
|
projects = []
|
||||||
|
logging.info("Attempting to find KiCad projects...") # Log start
|
||||||
# Search directories to look for KiCad projects
|
# Search directories to look for KiCad projects
|
||||||
search_dirs = [KICAD_USER_DIR] + ADDITIONAL_SEARCH_PATHS
|
raw_search_dirs = [KICAD_USER_DIR] + ADDITIONAL_SEARCH_PATHS
|
||||||
|
logging.info(f"Raw KICAD_USER_DIR: '{KICAD_USER_DIR}'")
|
||||||
|
logging.info(f"Raw ADDITIONAL_SEARCH_PATHS: {ADDITIONAL_SEARCH_PATHS}")
|
||||||
|
logging.info(f"Raw search list before expansion: {raw_search_dirs}")
|
||||||
|
|
||||||
for search_dir in search_dirs:
|
expanded_search_dirs = []
|
||||||
|
for raw_dir in raw_search_dirs:
|
||||||
|
expanded_dir = os.path.expanduser(raw_dir) # Expand ~ and ~user
|
||||||
|
if expanded_dir not in expanded_search_dirs:
|
||||||
|
expanded_search_dirs.append(expanded_dir)
|
||||||
|
else:
|
||||||
|
logging.info(f"Skipping duplicate expanded path: {expanded_dir}")
|
||||||
|
|
||||||
|
logging.info(f"Expanded search directories: {expanded_search_dirs}")
|
||||||
|
|
||||||
|
for search_dir in expanded_search_dirs:
|
||||||
if not os.path.exists(search_dir):
|
if not os.path.exists(search_dir):
|
||||||
print(f"Search directory does not exist: {search_dir}")
|
logging.warning(f"Expanded search directory does not exist: {search_dir}") # Use warning level
|
||||||
continue
|
continue
|
||||||
|
|
||||||
print(f"Scanning directory: {search_dir}")
|
logging.info(f"Scanning expanded directory: {search_dir}")
|
||||||
for root, _, files in os.walk(search_dir):
|
# Use followlinks=True to follow symlinks if needed
|
||||||
|
for root, _, files in os.walk(search_dir, followlinks=True):
|
||||||
for file in files:
|
for file in files:
|
||||||
if file.endswith(KICAD_EXTENSIONS["project"]):
|
if file.endswith(KICAD_EXTENSIONS["project"]):
|
||||||
project_path = os.path.join(root, file)
|
project_path = os.path.join(root, file)
|
||||||
|
# Check if it's a real file and not a broken symlink
|
||||||
|
if not os.path.isfile(project_path):
|
||||||
|
logging.info(f"Skipping non-file/broken symlink: {project_path}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Attempt to get modification time to ensure file is accessible
|
||||||
|
mod_time = os.path.getmtime(project_path)
|
||||||
rel_path = os.path.relpath(project_path, search_dir)
|
rel_path = os.path.relpath(project_path, search_dir)
|
||||||
project_name = get_project_name_from_path(project_path)
|
project_name = get_project_name_from_path(project_path)
|
||||||
|
|
||||||
print(f"Found KiCad project: {project_path}")
|
logging.info(f"Found accessible KiCad project: {project_path}")
|
||||||
projects.append({
|
projects.append({
|
||||||
"name": project_name,
|
"name": project_name,
|
||||||
"path": project_path,
|
"path": project_path,
|
||||||
"relative_path": rel_path,
|
"relative_path": rel_path,
|
||||||
"modified": os.path.getmtime(project_path)
|
"modified": mod_time
|
||||||
})
|
})
|
||||||
|
except OSError as e:
|
||||||
|
logging.error(f"Error accessing project file {project_path}: {e}") # Use error level
|
||||||
|
continue # Skip if we can't access it
|
||||||
|
|
||||||
print(f"Found {len(projects)} KiCad projects")
|
logging.info(f"Found {len(projects)} KiCad projects after scanning.")
|
||||||
return projects
|
return projects
|
||||||
|
|
||||||
def get_project_name_from_path(project_path: str) -> str:
|
def get_project_name_from_path(project_path: str) -> str:
|
||||||
@ -68,8 +98,17 @@ def open_kicad_project(project_path: str) -> Dict[str, Any]:
|
|||||||
return {"success": False, "error": f"Project not found: {project_path}"}
|
return {"success": False, "error": f"Project not found: {project_path}"}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
cmd = []
|
||||||
|
if sys.platform == "darwin": # macOS
|
||||||
# On MacOS, use the 'open' command to open the project in KiCad
|
# On MacOS, use the 'open' command to open the project in KiCad
|
||||||
cmd = ["open", "-a", KICAD_APP_PATH, project_path]
|
cmd = ["open", "-a", KICAD_APP_PATH, project_path]
|
||||||
|
elif sys.platform == "linux": # Linux
|
||||||
|
# On Linux, use 'xdg-open'
|
||||||
|
cmd = ["xdg-open", project_path]
|
||||||
|
else:
|
||||||
|
# Fallback or error for unsupported OS
|
||||||
|
return {"success": False, "error": f"Unsupported operating system: {sys.platform}"}
|
||||||
|
|
||||||
result = subprocess.run(cmd, capture_output=True, text=True)
|
result = subprocess.run(cmd, capture_output=True, text=True)
|
||||||
|
|
||||||
return {
|
return {
|
||||||
|
@ -1,104 +0,0 @@
|
|||||||
"""
|
|
||||||
Python path handling for KiCad modules.
|
|
||||||
"""
|
|
||||||
import os
|
|
||||||
import sys
|
|
||||||
import glob
|
|
||||||
import platform
|
|
||||||
|
|
||||||
def setup_kicad_python_path():
|
|
||||||
"""
|
|
||||||
Add KiCad Python modules to the Python path by detecting the appropriate version.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
bool: True if successful, False otherwise
|
|
||||||
"""
|
|
||||||
system = platform.system()
|
|
||||||
print(f"Setting up KiCad Python path for {system}")
|
|
||||||
|
|
||||||
# Define search paths based on operating system
|
|
||||||
if system == "Darwin": # macOS
|
|
||||||
from kicad_mcp.config import KICAD_APP_PATH
|
|
||||||
|
|
||||||
if not os.path.exists(KICAD_APP_PATH):
|
|
||||||
print(f"KiCad application not found at {KICAD_APP_PATH}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
# Base path to Python framework
|
|
||||||
python_base = os.path.join(KICAD_APP_PATH, "Contents/Frameworks/Python.framework/Versions")
|
|
||||||
|
|
||||||
# First try 'Current' symlink
|
|
||||||
current_path = os.path.join(python_base, "Current/lib/python*/site-packages")
|
|
||||||
site_packages = glob.glob(current_path)
|
|
||||||
|
|
||||||
# If 'Current' symlink doesn't work, find all available Python versions
|
|
||||||
if not site_packages:
|
|
||||||
print("'Current' symlink not found, searching for numbered versions")
|
|
||||||
# Look for numbered versions like 3.9, 3.10, etc.
|
|
||||||
version_dirs = glob.glob(os.path.join(python_base, "[0-9]*"))
|
|
||||||
for version_dir in version_dirs:
|
|
||||||
potential_path = os.path.join(version_dir, "lib/python*/site-packages")
|
|
||||||
site_packages.extend(glob.glob(potential_path))
|
|
||||||
|
|
||||||
elif system == "Windows":
|
|
||||||
# Windows path - typically in Program Files
|
|
||||||
kicad_app_path = r"C:\Program Files\KiCad"
|
|
||||||
python_dirs = glob.glob(os.path.join(kicad_app_path, "lib", "python*"))
|
|
||||||
site_packages = []
|
|
||||||
|
|
||||||
for python_dir in python_dirs:
|
|
||||||
potential_path = os.path.join(python_dir, "site-packages")
|
|
||||||
if os.path.exists(potential_path):
|
|
||||||
site_packages.append(potential_path)
|
|
||||||
|
|
||||||
elif system == "Linux":
|
|
||||||
# Common Linux installation paths
|
|
||||||
site_packages = [
|
|
||||||
"/usr/lib/python3/dist-packages", # Debian/Ubuntu
|
|
||||||
"/usr/lib/python3.*/site-packages", # Red Hat/Fedora
|
|
||||||
"/usr/local/lib/python3.*/site-packages" # Source install
|
|
||||||
]
|
|
||||||
|
|
||||||
# Expand glob patterns
|
|
||||||
expanded_packages = []
|
|
||||||
for pattern in site_packages:
|
|
||||||
if "*" in pattern:
|
|
||||||
expanded_packages.extend(glob.glob(pattern))
|
|
||||||
else:
|
|
||||||
expanded_packages.append(pattern)
|
|
||||||
|
|
||||||
site_packages = expanded_packages
|
|
||||||
|
|
||||||
else:
|
|
||||||
print(f"Unsupported operating system: {system}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
# Pick the first valid path found
|
|
||||||
for path in site_packages:
|
|
||||||
if os.path.exists(path):
|
|
||||||
# Check if pcbnew module exists in this path
|
|
||||||
pcbnew_path = os.path.join(path, "pcbnew.so")
|
|
||||||
if not os.path.exists(pcbnew_path):
|
|
||||||
# On Windows it might be pcbnew.pyd instead
|
|
||||||
pcbnew_path = os.path.join(path, "pcbnew.pyd")
|
|
||||||
|
|
||||||
if os.path.exists(pcbnew_path):
|
|
||||||
if path not in sys.path:
|
|
||||||
sys.path.append(path)
|
|
||||||
print(f"Added KiCad Python path: {path}")
|
|
||||||
print(f"Found pcbnew module at: {pcbnew_path}")
|
|
||||||
|
|
||||||
# Try to actually import it to verify compatibility
|
|
||||||
try:
|
|
||||||
import pcbnew
|
|
||||||
print(f"Successfully imported pcbnew module version: {getattr(pcbnew, 'GetBuildVersion', lambda: 'unknown')()}")
|
|
||||||
return True
|
|
||||||
except ImportError as e:
|
|
||||||
print(f"Found pcbnew but failed to import: {str(e)}")
|
|
||||||
# Remove from path as it's not usable
|
|
||||||
sys.path.remove(path)
|
|
||||||
else:
|
|
||||||
print(f"Found site-packages at {path} but no pcbnew module")
|
|
||||||
|
|
||||||
print("Could not find a valid KiCad Python site-packages directory with pcbnew module")
|
|
||||||
return False
|
|
60
main.py
60
main.py
@ -5,29 +5,75 @@ This server allows Claude and other MCP clients to interact with KiCad projects.
|
|||||||
"""
|
"""
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
|
import logging # Import logging module
|
||||||
|
|
||||||
|
# Must import config BEFORE env potentially overrides it via os.environ
|
||||||
from kicad_mcp.config import KICAD_USER_DIR, ADDITIONAL_SEARCH_PATHS
|
from kicad_mcp.config import KICAD_USER_DIR, ADDITIONAL_SEARCH_PATHS
|
||||||
from kicad_mcp.server import create_server
|
from kicad_mcp.server import create_server
|
||||||
from kicad_mcp.utils.env import load_dotenv
|
from kicad_mcp.utils.env import load_dotenv
|
||||||
|
|
||||||
|
# --- Setup Logging ---
|
||||||
|
log_file = os.path.join(os.path.dirname(__file__), 'kicad-mcp.log')
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.INFO,
|
||||||
|
format='%(asctime)s - %(levelname)s - [PID:%(process)d] - %(message)s',
|
||||||
|
handlers=[
|
||||||
|
logging.FileHandler(log_file, mode='w'), # Use 'w' to overwrite log on each start
|
||||||
|
# logging.StreamHandler() # Optionally keep logging to console if needed
|
||||||
|
]
|
||||||
|
)
|
||||||
|
# ---------------------
|
||||||
|
|
||||||
|
logging.info("--- Server Starting --- ")
|
||||||
|
logging.info(f"Initial KICAD_USER_DIR from config.py: {KICAD_USER_DIR}")
|
||||||
|
logging.info(f"Initial ADDITIONAL_SEARCH_PATHS from config.py: {ADDITIONAL_SEARCH_PATHS}")
|
||||||
|
|
||||||
|
# Get PID for logging (already used by basicConfig)
|
||||||
|
_PID = os.getpid()
|
||||||
|
|
||||||
# Load environment variables from .env file if present
|
# Load environment variables from .env file if present
|
||||||
load_dotenv()
|
# This attempts to update os.environ
|
||||||
|
dotenv_path = os.path.join(os.path.dirname(__file__), '.env')
|
||||||
|
logging.info(f"Attempting to load .env file from: {dotenv_path}")
|
||||||
|
found_dotenv = load_dotenv() # Assuming this returns True/False or similar
|
||||||
|
logging.info(f".env file found and loaded: {found_dotenv}")
|
||||||
|
|
||||||
|
# Log effective values AFTER load_dotenv attempt
|
||||||
|
# Note: The config values might not automatically re-read from os.environ
|
||||||
|
# depending on how config.py is written. Let's check os.environ directly.
|
||||||
|
effective_user_dir = os.getenv('KICAD_USER_DIR')
|
||||||
|
effective_search_paths = os.getenv('KICAD_SEARCH_PATHS')
|
||||||
|
logging.info(f"os.environ['KICAD_USER_DIR'] after load_dotenv: {effective_user_dir}")
|
||||||
|
logging.info(f"os.environ['KICAD_SEARCH_PATHS'] after load_dotenv: {effective_search_paths}")
|
||||||
|
|
||||||
|
# Re-log the values imported from config.py to see if they reflect os.environ changes
|
||||||
|
# (This depends on config.py using os.getenv internally AFTER load_dotenv runs)
|
||||||
|
try:
|
||||||
|
from kicad_mcp import config
|
||||||
|
import importlib
|
||||||
|
importlib.reload(config) # Attempt to force re-reading config
|
||||||
|
logging.info(f"Effective KICAD_USER_DIR from config.py after reload: {config.KICAD_USER_DIR}")
|
||||||
|
logging.info(f"Effective ADDITIONAL_SEARCH_PATHS from config.py after reload: {config.ADDITIONAL_SEARCH_PATHS}")
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"Could not reload config: {e}")
|
||||||
|
logging.info(f"Using potentially stale KICAD_USER_DIR from initial import: {KICAD_USER_DIR}")
|
||||||
|
logging.info(f"Using potentially stale ADDITIONAL_SEARCH_PATHS from initial import: {ADDITIONAL_SEARCH_PATHS}")
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
try:
|
try:
|
||||||
print("Starting KiCad MCP server")
|
logging.info(f"Starting KiCad MCP server process")
|
||||||
|
|
||||||
# Print search paths from config
|
# Print search paths from config
|
||||||
print(f"Using KiCad user directory: {KICAD_USER_DIR}")
|
logging.info(f"Using KiCad user directory: {KICAD_USER_DIR}") # Changed print to logging
|
||||||
if ADDITIONAL_SEARCH_PATHS:
|
if ADDITIONAL_SEARCH_PATHS:
|
||||||
print(f"Additional search paths: {', '.join(ADDITIONAL_SEARCH_PATHS)}")
|
logging.info(f"Additional search paths: {', '.join(ADDITIONAL_SEARCH_PATHS)}") # Changed print to logging
|
||||||
else:
|
else:
|
||||||
print("No additional search paths configured")
|
logging.info(f"No additional search paths configured") # Changed print to logging
|
||||||
|
|
||||||
# Create and run server
|
# Create and run server
|
||||||
server = create_server()
|
server = create_server()
|
||||||
print("Running server with stdio transport")
|
logging.info(f"Running server with stdio transport") # Changed print to logging
|
||||||
server.run(transport='stdio')
|
server.run(transport='stdio')
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Unhandled exception: {str(e)}")
|
logging.exception(f"Unhandled exception in main") # Log exception details
|
||||||
raise
|
raise
|
||||||
|
@ -3,3 +3,6 @@ httpx
|
|||||||
pytest
|
pytest
|
||||||
pandas
|
pandas
|
||||||
kicad-python
|
kicad-python
|
||||||
|
|
||||||
|
# Development/Testing
|
||||||
|
pytest-asyncio
|
Loading…
x
Reference in New Issue
Block a user