""" KiCad IPC Client Utility Provides a clean interface to the KiCad IPC API for real-time design manipulation. This module wraps the kicad-python library to provide MCP-specific functionality and error handling for automated design operations. """ from contextlib import contextmanager import logging from typing import Any from kipy import KiCad from kipy.board import Board from kipy.board_types import FootprintInstance, Net, Track, Via from kipy.geometry import Vector2 from kipy.project import Project logger = logging.getLogger(__name__) class KiCadIPCError(Exception): """Custom exception for KiCad IPC operations.""" pass class KiCadIPCClient: """ High-level client for KiCad IPC API operations. Provides a convenient interface for common operations needed by the MCP server, including project management, component placement, routing, and file operations. """ def __init__(self, host: str = "localhost", port: int = 5555): """ Initialize the KiCad IPC client. Args: host: KiCad IPC server host (default: localhost) port: KiCad IPC server port (default: 5555) """ self.host = host self.port = port self._kicad: KiCad | None = None self._current_project: Project | None = None self._current_board: Board | None = None def connect(self) -> bool: """ Connect to KiCad IPC server. Returns: True if connection successful, False otherwise """ try: self._kicad = KiCad() version = self._kicad.get_version() logger.info(f"Connected to KiCad {version}") return True except Exception as e: logger.error(f"Failed to connect to KiCad IPC server: {e}") self._kicad = None return False def disconnect(self): """Disconnect from KiCad IPC server.""" if self._kicad: try: self._kicad.close() except Exception as e: logger.warning(f"Error during disconnect: {e}") finally: self._kicad = None self._current_project = None self._current_board = None @property def is_connected(self) -> bool: """Check if connected to KiCad.""" return self._kicad is not None def ensure_connected(self): """Ensure connection to KiCad, raise exception if not connected.""" if not self.is_connected: raise KiCadIPCError("Not connected to KiCad IPC server. Call connect() first.") def get_version(self) -> str: """Get KiCad version.""" self.ensure_connected() return self._kicad.get_version() def open_project(self, project_path: str) -> bool: """ Open a KiCad project. Args: project_path: Path to .kicad_pro file Returns: True if project opened successfully """ self.ensure_connected() try: self._current_project = self._kicad.open_project(project_path) logger.info(f"Opened project: {project_path}") return True except Exception as e: logger.error(f"Failed to open project {project_path}: {e}") return False def open_board(self, board_path: str) -> bool: """ Open a KiCad board. Args: board_path: Path to .kicad_pcb file Returns: True if board opened successfully """ self.ensure_connected() try: self._current_board = self._kicad.open_board(board_path) logger.info(f"Opened board: {board_path}") return True except Exception as e: logger.error(f"Failed to open board {board_path}: {e}") return False @property def current_project(self) -> Project | None: """Get current project.""" return self._current_project @property def current_board(self) -> Board | None: """Get current board.""" return self._current_board def ensure_board_open(self): """Ensure a board is open, raise exception if not.""" if not self._current_board: raise KiCadIPCError("No board is currently open. Call open_board() first.") @contextmanager def commit_transaction(self, message: str = "MCP operation"): """ Context manager for grouping operations into a single commit. Args: message: Commit message for undo history """ self.ensure_board_open() commit = self._current_board.begin_commit() try: yield self._current_board.push_commit(commit, message) except Exception: self._current_board.drop_commit(commit) raise # Component and footprint operations def get_footprints(self) -> list[FootprintInstance]: """Get all footprints on the current board.""" self.ensure_board_open() return list(self._current_board.get_footprints()) def get_footprint_by_reference(self, reference: str) -> FootprintInstance | None: """ Get footprint by reference designator. Args: reference: Component reference (e.g., "R1", "U3") Returns: FootprintInstance if found, None otherwise """ footprints = self.get_footprints() for fp in footprints: if fp.reference == reference: return fp return None def move_footprint(self, reference: str, position: Vector2) -> bool: """ Move a footprint to a new position. Args: reference: Component reference position: New position (Vector2) Returns: True if successful """ self.ensure_board_open() try: footprint = self.get_footprint_by_reference(reference) if not footprint: logger.error(f"Footprint {reference} not found") return False with self.commit_transaction(f"Move {reference} to {position}"): footprint.position = position self._current_board.update_items(footprint) logger.info(f"Moved {reference} to {position}") return True except Exception as e: logger.error(f"Failed to move footprint {reference}: {e}") return False def rotate_footprint(self, reference: str, angle_degrees: float) -> bool: """ Rotate a footprint. Args: reference: Component reference angle_degrees: Rotation angle in degrees Returns: True if successful """ self.ensure_board_open() try: footprint = self.get_footprint_by_reference(reference) if not footprint: logger.error(f"Footprint {reference} not found") return False with self.commit_transaction(f"Rotate {reference} by {angle_degrees}°"): footprint.rotation = angle_degrees self._current_board.update_items(footprint) logger.info(f"Rotated {reference} by {angle_degrees}°") return True except Exception as e: logger.error(f"Failed to rotate footprint {reference}: {e}") return False # Net and routing operations def get_nets(self) -> list[Net]: """Get all nets on the current board.""" self.ensure_board_open() return list(self._current_board.get_nets()) def get_net_by_name(self, name: str) -> Net | None: """ Get net by name. Args: name: Net name Returns: Net if found, None otherwise """ nets = self.get_nets() for net in nets: if net.name == name: return net return None def get_tracks(self) -> list[Track | Via]: """Get all tracks and vias on the current board.""" self.ensure_board_open() tracks = list(self._current_board.get_tracks()) vias = list(self._current_board.get_vias()) return tracks + vias def delete_tracks_by_net(self, net_name: str) -> bool: """ Delete all tracks for a specific net. Args: net_name: Name of the net to clear Returns: True if successful """ self.ensure_board_open() try: net = self.get_net_by_name(net_name) if not net: logger.warning(f"Net {net_name} not found") return False tracks_to_delete = [] for track in self.get_tracks(): if hasattr(track, 'net') and track.net == net: tracks_to_delete.append(track) if tracks_to_delete: with self.commit_transaction(f"Delete tracks for net {net_name}"): self._current_board.remove_items(tracks_to_delete) logger.info(f"Deleted {len(tracks_to_delete)} tracks for net {net_name}") return True except Exception as e: logger.error(f"Failed to delete tracks for net {net_name}: {e}") return False # Board operations def save_board(self) -> bool: """Save the current board.""" self.ensure_board_open() try: self._current_board.save() logger.info("Board saved successfully") return True except Exception as e: logger.error(f"Failed to save board: {e}") return False def save_board_as(self, filename: str, overwrite: bool = False) -> bool: """ Save the current board to a new file. Args: filename: Target filename overwrite: Whether to overwrite existing file Returns: True if successful """ self.ensure_board_open() try: self._current_board.save_as(filename, overwrite=overwrite) logger.info(f"Board saved as: {filename}") return True except Exception as e: logger.error(f"Failed to save board as {filename}: {e}") return False def get_board_as_string(self) -> str | None: """Get board content as KiCad file format string.""" self.ensure_board_open() try: return self._current_board.get_as_string() except Exception as e: logger.error(f"Failed to get board as string: {e}") return None def refill_zones(self, timeout: float = 30.0) -> bool: """ Refill all zones on the board. Args: timeout: Maximum time to wait for completion Returns: True if successful """ self.ensure_board_open() try: self._current_board.refill_zones(block=True, max_poll_seconds=timeout) logger.info("Zones refilled successfully") return True except Exception as e: logger.error(f"Failed to refill zones: {e}") return False # Analysis operations def get_board_statistics(self) -> dict[str, Any]: """ Get comprehensive board statistics. Returns: Dictionary with board statistics """ self.ensure_board_open() try: footprints = self.get_footprints() nets = self.get_nets() tracks = self.get_tracks() stats = { "footprint_count": len(footprints), "net_count": len(nets), "track_count": len([t for t in tracks if isinstance(t, Track)]), "via_count": len([t for t in tracks if isinstance(t, Via)]), "board_name": self._current_board.name, } # Component breakdown by reference prefix component_types = {} for fp in footprints: prefix = ''.join(c for c in fp.reference if c.isalpha()) component_types[prefix] = component_types.get(prefix, 0) + 1 stats["component_types"] = component_types return stats except Exception as e: logger.error(f"Failed to get board statistics: {e}") return {} def check_connectivity(self) -> dict[str, Any]: """ Check board connectivity status. Returns: Dictionary with connectivity information """ self.ensure_board_open() try: nets = self.get_nets() tracks = self.get_tracks() # Count routed vs unrouted nets routed_nets = set() for track in tracks: if hasattr(track, 'net') and track.net: routed_nets.add(track.net.name) total_nets = len([n for n in nets if n.name and n.name != ""]) routed_count = len(routed_nets) unrouted_count = total_nets - routed_count return { "total_nets": total_nets, "routed_nets": routed_count, "unrouted_nets": unrouted_count, "routing_completion": round(routed_count / max(total_nets, 1) * 100, 1), "routed_net_names": list(routed_nets) } except Exception as e: logger.error(f"Failed to check connectivity: {e}") return {} @contextmanager def kicad_ipc_session(project_path: str = None, board_path: str = None): """ Context manager for KiCad IPC sessions. Args: project_path: Optional project file to open board_path: Optional board file to open Usage: with kicad_ipc_session("/path/to/project.kicad_pro") as client: client.move_footprint("R1", Vector2(10, 20)) """ client = KiCadIPCClient() try: if not client.connect(): raise KiCadIPCError("Failed to connect to KiCad IPC server") if project_path: if not client.open_project(project_path): raise KiCadIPCError(f"Failed to open project: {project_path}") if board_path: if not client.open_board(board_path): raise KiCadIPCError(f"Failed to open board: {board_path}") yield client finally: client.disconnect() def check_kicad_availability() -> dict[str, Any]: """ Check if KiCad IPC API is available and working. Returns: Dictionary with availability status and version info """ try: with kicad_ipc_session() as client: version = client.get_version() return { "available": True, "version": version, "message": f"KiCad IPC API available (version {version})" } except Exception as e: return { "available": False, "version": None, "message": f"KiCad IPC API not available: {e}", "error": str(e) } # Utility functions for common operations def get_project_board_path(project_path: str) -> str: """ Get the board file path from a project file path. Args: project_path: Path to .kicad_pro file Returns: Path to corresponding .kicad_pcb file """ if project_path.endswith('.kicad_pro'): return project_path.replace('.kicad_pro', '.kicad_pcb') else: raise ValueError("Project path must end with .kicad_pro") def format_position(x_mm: float, y_mm: float) -> Vector2: """ Create a Vector2 position from millimeter coordinates. Args: x_mm: X coordinate in millimeters y_mm: Y coordinate in millimeters Returns: Vector2 position """ return Vector2.from_xy_mm(x_mm, y_mm)