""" KiCad IPC Client Utility Provides a clean interface to the KiCad IPC API for real-time design manipulation. This module wraps the kicad-python library to provide MCP-specific functionality and error handling for automated design operations. """ import logging import time from contextlib import contextmanager from typing import Any, Dict, List, Optional, Union from kipy import KiCad from kipy.board import Board from kipy.board_types import FootprintInstance, Net, Track, Via from kipy.geometry import Vector2 from kipy.project import Project logger = logging.getLogger(__name__) class KiCadIPCError(Exception): """Custom exception for KiCad IPC operations.""" pass class KiCadIPCClient: """ High-level client for KiCad IPC API operations. Provides a convenient interface for common operations needed by the MCP server, including project management, component placement, routing, and file operations. """ def __init__(self, host: str = "localhost", port: int = 5555): """ Initialize the KiCad IPC client. Args: host: KiCad IPC server host (default: localhost) port: KiCad IPC server port (default: 5555) """ self.host = host self.port = port self._kicad: Optional[KiCad] = None self._current_project: Optional[Project] = None self._current_board: Optional[Board] = None def connect(self) -> bool: """ Connect to KiCad IPC server. Returns: True if connection successful, False otherwise """ try: self._kicad = KiCad() version = self._kicad.get_version() logger.info(f"Connected to KiCad {version}") return True except Exception as e: logger.error(f"Failed to connect to KiCad IPC server: {e}") self._kicad = None return False def disconnect(self): """Disconnect from KiCad IPC server.""" if self._kicad: try: self._kicad.close() except Exception as e: logger.warning(f"Error during disconnect: {e}") finally: self._kicad = None self._current_project = None self._current_board = None @property def is_connected(self) -> bool: """Check if connected to KiCad.""" return self._kicad is not None def ensure_connected(self): """Ensure connection to KiCad, raise exception if not connected.""" if not self.is_connected: raise KiCadIPCError("Not connected to KiCad IPC server. Call connect() first.") def get_version(self) -> str: """Get KiCad version.""" self.ensure_connected() return self._kicad.get_version() def open_project(self, project_path: str) -> bool: """ Open a KiCad project. Args: project_path: Path to .kicad_pro file Returns: True if project opened successfully """ self.ensure_connected() try: self._current_project = self._kicad.open_project(project_path) logger.info(f"Opened project: {project_path}") return True except Exception as e: logger.error(f"Failed to open project {project_path}: {e}") return False def open_board(self, board_path: str) -> bool: """ Open a KiCad board. Args: board_path: Path to .kicad_pcb file Returns: True if board opened successfully """ self.ensure_connected() try: self._current_board = self._kicad.open_board(board_path) logger.info(f"Opened board: {board_path}") return True except Exception as e: logger.error(f"Failed to open board {board_path}: {e}") return False @property def current_project(self) -> Optional[Project]: """Get current project.""" return self._current_project @property def current_board(self) -> Optional[Board]: """Get current board.""" return self._current_board def ensure_board_open(self): """Ensure a board is open, raise exception if not.""" if not self._current_board: raise KiCadIPCError("No board is currently open. Call open_board() first.") @contextmanager def commit_transaction(self, message: str = "MCP operation"): """ Context manager for grouping operations into a single commit. Args: message: Commit message for undo history """ self.ensure_board_open() commit = self._current_board.begin_commit() try: yield self._current_board.push_commit(commit, message) except Exception: self._current_board.drop_commit(commit) raise # Component and footprint operations def get_footprints(self) -> List[FootprintInstance]: """Get all footprints on the current board.""" self.ensure_board_open() return list(self._current_board.get_footprints()) def get_footprint_by_reference(self, reference: str) -> Optional[FootprintInstance]: """ Get footprint by reference designator. Args: reference: Component reference (e.g., "R1", "U3") Returns: FootprintInstance if found, None otherwise """ footprints = self.get_footprints() for fp in footprints: if fp.reference == reference: return fp return None def move_footprint(self, reference: str, position: Vector2) -> bool: """ Move a footprint to a new position. Args: reference: Component reference position: New position (Vector2) Returns: True if successful """ self.ensure_board_open() try: footprint = self.get_footprint_by_reference(reference) if not footprint: logger.error(f"Footprint {reference} not found") return False with self.commit_transaction(f"Move {reference} to {position}"): footprint.position = position self._current_board.update_items(footprint) logger.info(f"Moved {reference} to {position}") return True except Exception as e: logger.error(f"Failed to move footprint {reference}: {e}") return False def rotate_footprint(self, reference: str, angle_degrees: float) -> bool: """ Rotate a footprint. Args: reference: Component reference angle_degrees: Rotation angle in degrees Returns: True if successful """ self.ensure_board_open() try: footprint = self.get_footprint_by_reference(reference) if not footprint: logger.error(f"Footprint {reference} not found") return False with self.commit_transaction(f"Rotate {reference} by {angle_degrees}°"): footprint.rotation = angle_degrees self._current_board.update_items(footprint) logger.info(f"Rotated {reference} by {angle_degrees}°") return True except Exception as e: logger.error(f"Failed to rotate footprint {reference}: {e}") return False # Net and routing operations def get_nets(self) -> List[Net]: """Get all nets on the current board.""" self.ensure_board_open() return list(self._current_board.get_nets()) def get_net_by_name(self, name: str) -> Optional[Net]: """ Get net by name. Args: name: Net name Returns: Net if found, None otherwise """ nets = self.get_nets() for net in nets: if net.name == name: return net return None def get_tracks(self) -> List[Union[Track, Via]]: """Get all tracks and vias on the current board.""" self.ensure_board_open() tracks = list(self._current_board.get_tracks()) vias = list(self._current_board.get_vias()) return tracks + vias def delete_tracks_by_net(self, net_name: str) -> bool: """ Delete all tracks for a specific net. Args: net_name: Name of the net to clear Returns: True if successful """ self.ensure_board_open() try: net = self.get_net_by_name(net_name) if not net: logger.warning(f"Net {net_name} not found") return False tracks_to_delete = [] for track in self.get_tracks(): if hasattr(track, 'net') and track.net == net: tracks_to_delete.append(track) if tracks_to_delete: with self.commit_transaction(f"Delete tracks for net {net_name}"): self._current_board.remove_items(tracks_to_delete) logger.info(f"Deleted {len(tracks_to_delete)} tracks for net {net_name}") return True except Exception as e: logger.error(f"Failed to delete tracks for net {net_name}: {e}") return False # Board operations def save_board(self) -> bool: """Save the current board.""" self.ensure_board_open() try: self._current_board.save() logger.info("Board saved successfully") return True except Exception as e: logger.error(f"Failed to save board: {e}") return False def save_board_as(self, filename: str, overwrite: bool = False) -> bool: """ Save the current board to a new file. Args: filename: Target filename overwrite: Whether to overwrite existing file Returns: True if successful """ self.ensure_board_open() try: self._current_board.save_as(filename, overwrite=overwrite) logger.info(f"Board saved as: {filename}") return True except Exception as e: logger.error(f"Failed to save board as {filename}: {e}") return False def get_board_as_string(self) -> Optional[str]: """Get board content as KiCad file format string.""" self.ensure_board_open() try: return self._current_board.get_as_string() except Exception as e: logger.error(f"Failed to get board as string: {e}") return None def refill_zones(self, timeout: float = 30.0) -> bool: """ Refill all zones on the board. Args: timeout: Maximum time to wait for completion Returns: True if successful """ self.ensure_board_open() try: self._current_board.refill_zones(block=True, max_poll_seconds=timeout) logger.info("Zones refilled successfully") return True except Exception as e: logger.error(f"Failed to refill zones: {e}") return False # Analysis operations def get_board_statistics(self) -> Dict[str, Any]: """ Get comprehensive board statistics. Returns: Dictionary with board statistics """ self.ensure_board_open() try: footprints = self.get_footprints() nets = self.get_nets() tracks = self.get_tracks() stats = { "footprint_count": len(footprints), "net_count": len(nets), "track_count": len([t for t in tracks if isinstance(t, Track)]), "via_count": len([t for t in tracks if isinstance(t, Via)]), "board_name": self._current_board.name, } # Component breakdown by reference prefix component_types = {} for fp in footprints: prefix = ''.join(c for c in fp.reference if c.isalpha()) component_types[prefix] = component_types.get(prefix, 0) + 1 stats["component_types"] = component_types return stats except Exception as e: logger.error(f"Failed to get board statistics: {e}") return {} def check_connectivity(self) -> Dict[str, Any]: """ Check board connectivity status. Returns: Dictionary with connectivity information """ self.ensure_board_open() try: nets = self.get_nets() tracks = self.get_tracks() # Count routed vs unrouted nets routed_nets = set() for track in tracks: if hasattr(track, 'net') and track.net: routed_nets.add(track.net.name) total_nets = len([n for n in nets if n.name and n.name != ""]) routed_count = len(routed_nets) unrouted_count = total_nets - routed_count return { "total_nets": total_nets, "routed_nets": routed_count, "unrouted_nets": unrouted_count, "routing_completion": round(routed_count / max(total_nets, 1) * 100, 1), "routed_net_names": list(routed_nets) } except Exception as e: logger.error(f"Failed to check connectivity: {e}") return {} @contextmanager def kicad_ipc_session(project_path: str = None, board_path: str = None): """ Context manager for KiCad IPC sessions. Args: project_path: Optional project file to open board_path: Optional board file to open Usage: with kicad_ipc_session("/path/to/project.kicad_pro") as client: client.move_footprint("R1", Vector2(10, 20)) """ client = KiCadIPCClient() try: if not client.connect(): raise KiCadIPCError("Failed to connect to KiCad IPC server") if project_path: if not client.open_project(project_path): raise KiCadIPCError(f"Failed to open project: {project_path}") if board_path: if not client.open_board(board_path): raise KiCadIPCError(f"Failed to open board: {board_path}") yield client finally: client.disconnect() def check_kicad_availability() -> Dict[str, Any]: """ Check if KiCad IPC API is available and working. Returns: Dictionary with availability status and version info """ try: with kicad_ipc_session() as client: version = client.get_version() return { "available": True, "version": version, "message": f"KiCad IPC API available (version {version})" } except Exception as e: return { "available": False, "version": None, "message": f"KiCad IPC API not available: {e}", "error": str(e) } # Utility functions for common operations def get_project_board_path(project_path: str) -> str: """ Get the board file path from a project file path. Args: project_path: Path to .kicad_pro file Returns: Path to corresponding .kicad_pcb file """ if project_path.endswith('.kicad_pro'): return project_path.replace('.kicad_pro', '.kicad_pcb') else: raise ValueError("Project path must end with .kicad_pro") def format_position(x_mm: float, y_mm: float) -> Vector2: """ Create a Vector2 position from millimeter coordinates. Args: x_mm: X coordinate in millimeters y_mm: Y coordinate in millimeters Returns: Vector2 position """ return Vector2.from_xy_mm(x_mm, y_mm)