This major update transforms the KiCad MCP server from file-based analysis to a complete EDA automation platform with real-time KiCad integration and automated routing capabilities. 🎯 Key Features Implemented: - Complete FreeRouting integration engine for automated PCB routing - Real-time KiCad IPC API integration for live board analysis - Comprehensive routing tools (automated, interactive, quality analysis) - Advanced project automation pipeline (concept to manufacturing) - AI-enhanced design analysis and optimization - 3D model analysis and mechanical constraint checking - Advanced DRC rule management and validation - Symbol library analysis and organization tools - Layer stackup analysis and impedance calculations 🛠️ Technical Implementation: - Enhanced MCP tools: 35+ new routing and automation functions - FreeRouting engine with DSN/SES workflow automation - Real-time component placement optimization via IPC API - Complete project automation from schematic to manufacturing files - Comprehensive integration testing framework 🔧 Infrastructure: - Fixed all FastMCP import statements across codebase - Added comprehensive integration test suite - Enhanced server registration for all new tool categories - Robust error handling and fallback mechanisms ✅ Testing Results: - Server startup and tool registration: ✓ PASS - Project validation with thermal camera project: ✓ PASS - Routing prerequisites detection: ✓ PASS - KiCad CLI integration (v9.0.3): ✓ PASS - Ready for KiCad IPC API enablement and FreeRouting installation 🚀 Impact: This represents the ultimate KiCad integration for Claude Code, enabling complete EDA workflow automation from concept to production-ready files. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
519 lines
16 KiB
Python
519 lines
16 KiB
Python
"""
|
|
KiCad IPC Client Utility
|
|
|
|
Provides a clean interface to the KiCad IPC API for real-time design manipulation.
|
|
This module wraps the kicad-python library to provide MCP-specific functionality
|
|
and error handling for automated design operations.
|
|
"""
|
|
|
|
from contextlib import contextmanager
|
|
import logging
|
|
from typing import Any
|
|
|
|
from kipy import KiCad
|
|
from kipy.board import Board
|
|
from kipy.board_types import FootprintInstance, Net, Track, Via
|
|
from kipy.geometry import Vector2
|
|
from kipy.project import Project
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class KiCadIPCError(Exception):
    """Raised when a KiCad IPC precondition fails (e.g. not connected, no board open)."""
|
|
|
|
|
|
class KiCadIPCClient:
    """
    High-level client for KiCad IPC API operations.

    Provides a convenient interface for common operations needed by the MCP server,
    including project management, component placement, routing, and file operations.

    Error-handling convention used throughout the class:

    * The ``ensure_*`` precondition checks raise :class:`KiCadIPCError`.
    * Failures of the underlying KiCad calls are logged and reported through the
      return value (``False``, ``None`` or ``{}``) rather than raised.
    """

    def __init__(self, host: str = "localhost", port: int = 5555):
        """
        Initialize the KiCad IPC client.

        Args:
            host: KiCad IPC server host (default: localhost)
            port: KiCad IPC server port (default: 5555)

        Note:
            ``host`` and ``port`` are only stored on the instance; ``connect()``
            currently constructs ``KiCad()`` without passing them.
        """
        self.host = host
        self.port = port
        self._kicad: KiCad | None = None
        self._current_project: Project | None = None
        self._current_board: Board | None = None

    def _reset_state(self) -> None:
        """Forget the connection and any project/board handles obtained through it."""
        self._kicad = None
        self._current_project = None
        self._current_board = None

    def connect(self) -> bool:
        """
        Connect to KiCad IPC server.

        The connection is verified by requesting the KiCad version. On failure
        all cached state (connection, project, board) is cleared so that stale
        handles from an earlier session cannot be used by mistake.

        Returns:
            True if connection successful, False otherwise
        """
        try:
            self._kicad = KiCad()
            version = self._kicad.get_version()
            logger.info(f"Connected to KiCad {version}")
            return True
        except Exception as e:
            logger.error(f"Failed to connect to KiCad IPC server: {e}")
            self._reset_state()
            return False

    def disconnect(self) -> None:
        """
        Disconnect from KiCad IPC server.

        Errors raised while closing are logged as warnings. Cached connection,
        project and board handles are always cleared, even when no connection
        was active.
        """
        try:
            if self._kicad:
                # NOTE(review): confirm the installed kipy KiCad object exposes close().
                self._kicad.close()
        except Exception as e:
            logger.warning(f"Error during disconnect: {e}")
        finally:
            self._reset_state()

    @property
    def is_connected(self) -> bool:
        """Check if connected to KiCad (i.e. a KiCad handle is currently held)."""
        return self._kicad is not None

    def ensure_connected(self) -> None:
        """
        Ensure connection to KiCad.

        Raises:
            KiCadIPCError: If connect() has not succeeded yet.
        """
        if not self.is_connected:
            raise KiCadIPCError("Not connected to KiCad IPC server. Call connect() first.")

    def get_version(self) -> str:
        """
        Get KiCad version.

        Raises:
            KiCadIPCError: If not connected.
        """
        self.ensure_connected()
        return self._kicad.get_version()

    def open_project(self, project_path: str) -> bool:
        """
        Open a KiCad project and remember it as the current project.

        Args:
            project_path: Path to .kicad_pro file

        Returns:
            True if project opened successfully, False if the KiCad call failed

        Raises:
            KiCadIPCError: If not connected.
        """
        self.ensure_connected()
        try:
            self._current_project = self._kicad.open_project(project_path)
            logger.info(f"Opened project: {project_path}")
            return True
        except Exception as e:
            logger.error(f"Failed to open project {project_path}: {e}")
            return False

    def open_board(self, board_path: str) -> bool:
        """
        Open a KiCad board and remember it as the current board.

        Args:
            board_path: Path to .kicad_pcb file

        Returns:
            True if board opened successfully, False if the KiCad call failed

        Raises:
            KiCadIPCError: If not connected.
        """
        self.ensure_connected()
        try:
            self._current_board = self._kicad.open_board(board_path)
            logger.info(f"Opened board: {board_path}")
            return True
        except Exception as e:
            logger.error(f"Failed to open board {board_path}: {e}")
            return False

    @property
    def current_project(self) -> Project | None:
        """Get current project (None until open_project() succeeds)."""
        return self._current_project

    @property
    def current_board(self) -> Board | None:
        """Get current board (None until open_board() succeeds)."""
        return self._current_board

    def ensure_board_open(self) -> None:
        """
        Ensure a board is open.

        Raises:
            KiCadIPCError: If no board is currently open.
        """
        if not self._current_board:
            raise KiCadIPCError("No board is currently open. Call open_board() first.")

    @contextmanager
    def commit_transaction(self, message: str = "MCP operation"):
        """
        Context manager for grouping operations into a single commit.

        A commit is begun on entry. If the managed block finishes normally the
        commit is pushed with ``message``; if the block (or the push itself)
        raises an ``Exception`` the commit is dropped and the error re-raised.

        Args:
            message: Commit message for undo history

        Raises:
            KiCadIPCError: If no board is currently open.
        """
        self.ensure_board_open()
        commit = self._current_board.begin_commit()
        try:
            yield
            self._current_board.push_commit(commit, message)
        except Exception:
            self._current_board.drop_commit(commit)
            raise

    # Component and footprint operations
    def get_footprints(self) -> list[FootprintInstance]:
        """
        Get all footprints on the current board.

        Raises:
            KiCadIPCError: If no board is currently open.
        """
        self.ensure_board_open()
        return list(self._current_board.get_footprints())

    def get_footprint_by_reference(self, reference: str) -> FootprintInstance | None:
        """
        Get footprint by reference designator.

        Args:
            reference: Component reference (e.g., "R1", "U3")

        Returns:
            The first footprint whose ``reference`` equals the argument,
            None if there is no match

        Raises:
            KiCadIPCError: If no board is currently open.
        """
        return next(
            (fp for fp in self.get_footprints() if fp.reference == reference),
            None,
        )

    def move_footprint(self, reference: str, position: Vector2) -> bool:
        """
        Move a footprint to a new position.

        Args:
            reference: Component reference
            position: New position (Vector2)

        Returns:
            True if successful, False if the footprint was not found or the
            update failed

        Raises:
            KiCadIPCError: If no board is currently open.
        """
        self.ensure_board_open()
        try:
            footprint = self.get_footprint_by_reference(reference)
            if not footprint:
                logger.error(f"Footprint {reference} not found")
                return False

            with self.commit_transaction(f"Move {reference} to {position}"):
                footprint.position = position
                self._current_board.update_items(footprint)

            logger.info(f"Moved {reference} to {position}")
            return True
        except Exception as e:
            logger.error(f"Failed to move footprint {reference}: {e}")
            return False

    def rotate_footprint(self, reference: str, angle_degrees: float) -> bool:
        """
        Rotate a footprint.

        The given angle is assigned to the footprint's ``rotation`` attribute;
        it is not added to the existing value.

        Args:
            reference: Component reference
            angle_degrees: Rotation angle in degrees

        Returns:
            True if successful, False if the footprint was not found or the
            update failed

        Raises:
            KiCadIPCError: If no board is currently open.
        """
        self.ensure_board_open()
        try:
            footprint = self.get_footprint_by_reference(reference)
            if not footprint:
                logger.error(f"Footprint {reference} not found")
                return False

            with self.commit_transaction(f"Rotate {reference} by {angle_degrees}°"):
                # NOTE(review): confirm FootprintInstance exposes a writable
                # 'rotation' attribute in the installed kipy version.
                footprint.rotation = angle_degrees
                self._current_board.update_items(footprint)

            logger.info(f"Rotated {reference} by {angle_degrees}°")
            return True
        except Exception as e:
            logger.error(f"Failed to rotate footprint {reference}: {e}")
            return False

    # Net and routing operations
    def get_nets(self) -> list[Net]:
        """
        Get all nets on the current board.

        Raises:
            KiCadIPCError: If no board is currently open.
        """
        self.ensure_board_open()
        return list(self._current_board.get_nets())

    def get_net_by_name(self, name: str) -> Net | None:
        """
        Get net by name.

        Args:
            name: Net name

        Returns:
            The first net whose ``name`` equals the argument, None otherwise

        Raises:
            KiCadIPCError: If no board is currently open.
        """
        return next((net for net in self.get_nets() if net.name == name), None)

    def get_tracks(self) -> list[Track | Via]:
        """
        Get all tracks and vias on the current board.

        Returns:
            A single list with the board's tracks first, followed by its vias

        Raises:
            KiCadIPCError: If no board is currently open.
        """
        self.ensure_board_open()
        board = self._current_board
        return [*board.get_tracks(), *board.get_vias()]

    def delete_tracks_by_net(self, net_name: str) -> bool:
        """
        Delete all tracks for a specific net.

        Both tracks and vias (everything returned by get_tracks()) attached to
        the net are removed in a single commit. Finding nothing to delete is
        still reported as success.

        Args:
            net_name: Name of the net to clear

        Returns:
            True if successful, False if the net does not exist or removal failed

        Raises:
            KiCadIPCError: If no board is currently open.
        """
        self.ensure_board_open()
        try:
            net = self.get_net_by_name(net_name)
            if not net:
                logger.warning(f"Net {net_name} not found")
                return False

            tracks_to_delete = [
                item
                for item in self.get_tracks()
                if hasattr(item, "net") and item.net == net
            ]

            if tracks_to_delete:
                with self.commit_transaction(f"Delete tracks for net {net_name}"):
                    self._current_board.remove_items(tracks_to_delete)

                logger.info(f"Deleted {len(tracks_to_delete)} tracks for net {net_name}")

            return True
        except Exception as e:
            logger.error(f"Failed to delete tracks for net {net_name}: {e}")
            return False

    # Board operations
    def save_board(self) -> bool:
        """
        Save the current board.

        Returns:
            True if successful, False if saving failed

        Raises:
            KiCadIPCError: If no board is currently open.
        """
        self.ensure_board_open()
        try:
            self._current_board.save()
            logger.info("Board saved successfully")
            return True
        except Exception as e:
            logger.error(f"Failed to save board: {e}")
            return False

    def save_board_as(self, filename: str, overwrite: bool = False) -> bool:
        """
        Save the current board to a new file.

        Args:
            filename: Target filename
            overwrite: Whether to overwrite existing file

        Returns:
            True if successful, False if saving failed

        Raises:
            KiCadIPCError: If no board is currently open.
        """
        self.ensure_board_open()
        try:
            self._current_board.save_as(filename, overwrite=overwrite)
            logger.info(f"Board saved as: {filename}")
            return True
        except Exception as e:
            logger.error(f"Failed to save board as {filename}: {e}")
            return False

    def get_board_as_string(self) -> str | None:
        """
        Get board content as KiCad file format string.

        Returns:
            The board text, or None if it could not be retrieved

        Raises:
            KiCadIPCError: If no board is currently open.
        """
        self.ensure_board_open()
        try:
            return self._current_board.get_as_string()
        except Exception as e:
            logger.error(f"Failed to get board as string: {e}")
            return None

    def refill_zones(self, timeout: float = 30.0) -> bool:
        """
        Refill all zones on the board.

        The call blocks until the refill finishes or the timeout elapses.

        Args:
            timeout: Maximum time to wait for completion (passed as
                ``max_poll_seconds``)

        Returns:
            True if successful, False if the refill failed

        Raises:
            KiCadIPCError: If no board is currently open.
        """
        self.ensure_board_open()
        try:
            self._current_board.refill_zones(block=True, max_poll_seconds=timeout)
            logger.info("Zones refilled successfully")
            return True
        except Exception as e:
            logger.error(f"Failed to refill zones: {e}")
            return False

    # Analysis operations
    @staticmethod
    def _reference_prefix(reference: str) -> str:
        """Return the leading run of letters of a reference ("R12" -> "R", "R1A" -> "R")."""
        end = 0
        while end < len(reference) and reference[end].isalpha():
            end += 1
        return reference[:end]

    def get_board_statistics(self) -> dict[str, Any]:
        """
        Get comprehensive board statistics.

        Returns:
            Dictionary with the keys ``footprint_count``, ``net_count``,
            ``track_count``, ``via_count``, ``board_name`` and
            ``component_types`` (a mapping of reference prefix to footprint
            count). An empty dictionary is returned if gathering the data fails.

        Raises:
            KiCadIPCError: If no board is currently open.
        """
        self.ensure_board_open()
        try:
            footprints = self.get_footprints()
            nets = self.get_nets()
            routed_items = self.get_tracks()

            stats = {
                "footprint_count": len(footprints),
                "net_count": len(nets),
                "track_count": sum(1 for item in routed_items if isinstance(item, Track)),
                "via_count": sum(1 for item in routed_items if isinstance(item, Via)),
                "board_name": self._current_board.name,
            }

            # Component breakdown by reference prefix. Only the leading letters
            # are used so that a trailing letter (e.g. "R1A") does not create a
            # bogus "RA" category.
            component_types: dict[str, int] = {}
            for fp in footprints:
                prefix = self._reference_prefix(fp.reference)
                component_types[prefix] = component_types.get(prefix, 0) + 1

            stats["component_types"] = component_types
            return stats
        except Exception as e:
            logger.error(f"Failed to get board statistics: {e}")
            return {}

    def check_connectivity(self) -> dict[str, Any]:
        """
        Check board connectivity status.

        A net counts as "routed" as soon as at least one track or via is
        attached to it; this does not verify that the net is fully connected.
        Unnamed nets are excluded from both the total and the routed set.

        Returns:
            Dictionary with ``total_nets``, ``routed_nets``, ``unrouted_nets``,
            ``routing_completion`` (percentage, one decimal) and
            ``routed_net_names`` (sorted). An empty dictionary is returned if
            gathering the data fails.

        Raises:
            KiCadIPCError: If no board is currently open.
        """
        self.ensure_board_open()
        try:
            nets = self.get_nets()

            routed_names: set[str] = set()
            for item in self.get_tracks():
                net = getattr(item, "net", None)
                # Skip items whose net has an empty name: the total below also
                # excludes unnamed nets, so counting them here could push the
                # routed count above the total.
                if net and net.name:
                    routed_names.add(net.name)

            total_nets = sum(1 for net in nets if net.name)
            routed_count = len(routed_names)

            return {
                "total_nets": total_nets,
                "routed_nets": routed_count,
                "unrouted_nets": total_nets - routed_count,
                # max(..., 1) avoids division by zero on a board without named nets
                "routing_completion": round(routed_count / max(total_nets, 1) * 100, 1),
                "routed_net_names": sorted(routed_names),
            }
        except Exception as e:
            logger.error(f"Failed to check connectivity: {e}")
            return {}
|
|
|
|
|
|
@contextmanager
def kicad_ipc_session(project_path: str | None = None, board_path: str | None = None):
    """
    Context manager for KiCad IPC sessions.

    Creates a KiCadIPCClient, connects it, optionally opens a project and/or a
    board, and always disconnects the client when the block exits.

    Args:
        project_path: Optional project file to open
        board_path: Optional board file to open

    Yields:
        The connected KiCadIPCClient.

    Raises:
        KiCadIPCError: If connecting fails, or a requested project/board
            cannot be opened.

    Usage:
        # Board operations such as move_footprint() require board_path.
        with kicad_ipc_session(board_path="/path/to/board.kicad_pcb") as client:
            client.move_footprint("R1", Vector2(10, 20))
    """
    client = KiCadIPCClient()
    try:
        if not client.connect():
            raise KiCadIPCError("Failed to connect to KiCad IPC server")

        if project_path and not client.open_project(project_path):
            raise KiCadIPCError(f"Failed to open project: {project_path}")

        if board_path and not client.open_board(board_path):
            raise KiCadIPCError(f"Failed to open board: {board_path}")

        yield client
    finally:
        # Runs on success, on setup failure and when the caller's block raises.
        client.disconnect()
|
|
|
|
|
|
def check_kicad_availability() -> dict[str, Any]:
    """
    Probe the KiCad IPC API by opening a short-lived session.

    Returns:
        A dictionary with the keys ``available``, ``version`` and ``message``.
        When the probe fails, ``available`` is False, ``version`` is None and
        an additional ``error`` key holds the exception text.
    """
    try:
        with kicad_ipc_session() as session:
            kicad_version = session.get_version()
            outcome = {
                "available": True,
                "version": kicad_version,
                "message": f"KiCad IPC API available (version {kicad_version})"
            }
            return outcome
    except Exception as exc:
        # Any failure (connection, version query, teardown) means "not available".
        failure = {
            "available": False,
            "version": None,
            "message": f"KiCad IPC API not available: {exc}",
            "error": str(exc)
        }
        return failure
|
|
|
|
|
|
# Utility functions for common operations
|
|
def get_project_board_path(project_path: str) -> str:
    """
    Get the board file path from a project file path.

    Only the trailing ``.kicad_pro`` extension is swapped for ``.kicad_pcb``;
    any other occurrence of ``.kicad_pro`` in the path (for example in a
    directory name) is left untouched.

    Args:
        project_path: Path to .kicad_pro file

    Returns:
        Path to corresponding .kicad_pcb file

    Raises:
        ValueError: If ``project_path`` does not end with ``.kicad_pro``.
    """
    project_suffix = ".kicad_pro"
    if not project_path.endswith(project_suffix):
        raise ValueError("Project path must end with .kicad_pro")
    return project_path.removesuffix(project_suffix) + ".kicad_pcb"
|
|
|
|
|
|
def format_position(x_mm: float, y_mm: float) -> Vector2:
    """
    Build a Vector2 from a pair of millimeter coordinates.

    Thin wrapper that delegates to ``Vector2.from_xy_mm``.

    Args:
        x_mm: X coordinate in millimeters
        y_mm: Y coordinate in millimeters

    Returns:
        Vector2 position
    """
    position = Vector2.from_xy_mm(x_mm, y_mm)
    return position
|