Ryan Malloy 4ae38fed59 Rebuild on FastMCP 3 with src-layout and kicad-sch-api integration
Migrate from FastMCP 2.14.5 to 3.1.0 with complete architectural
overhaul. Adopt src-layout packaging, lazy config functions to
eliminate .env race condition, and decorator-based tool registration.

Consolidate 14 tool modules into 8 focused modules (33 tools total).
Add 9 new schematic tools via kicad-sch-api for creating and
manipulating .kicad_sch files. Drop pandas dependency (BOM uses
stdlib csv). Remove ~17k lines of stubs, over-engineering, and
dead code.

All checks pass: ruff clean, mypy 0 errors, 17/17 tests green.
2026-03-03 18:26:54 -07:00

301 lines
9.1 KiB
Python

"""
PCB manipulation tools via KiCad IPC API.
Provides direct board-level operations -- moving and rotating
components, querying board statistics and connectivity, and refilling
copper zones -- all through a live connection to a running KiCad
instance.
"""
import logging
from typing import Any
from mckicad.server import mcp
from mckicad.utils.file_utils import get_project_files
from mckicad.utils.ipc_client import (
check_kicad_availability,
format_position,
kicad_ipc_session,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Shared pre-flight helper
# ---------------------------------------------------------------------------
def _get_board_path(project_path: str) -> tuple[str | None, dict[str, Any] | None]:
"""Resolve project_path to a .kicad_pcb path.
Returns (board_path, None) on success or (None, error_dict) on
failure.
"""
files = get_project_files(project_path)
if "pcb" not in files:
return None, {
"success": False,
"error": "PCB file not found in project",
}
ipc_status = check_kicad_availability()
if not ipc_status["available"]:
return None, {
"success": False,
"error": f"KiCad IPC not available: {ipc_status['message']}",
}
return files["pcb"], None
# ---------------------------------------------------------------------------
# Tools
# ---------------------------------------------------------------------------
@mcp.tool()
def move_component(
project_path: str,
reference: str,
x_mm: float,
y_mm: float,
) -> dict[str, Any]:
"""Move a component to a new absolute position on the PCB.
The move is wrapped in a KiCad undo transaction so it can be
reversed inside KiCad with Ctrl-Z.
Args:
project_path: Path to the KiCad project (.kicad_pro) or its
parent directory.
reference: Reference designator of the component to move
(e.g. "R1", "U3", "C12").
x_mm: Target X coordinate in millimetres.
y_mm: Target Y coordinate in millimetres.
Returns:
Dictionary confirming success and the new position, or an error
message.
"""
try:
board_path, err = _get_board_path(project_path)
if err or not board_path:
return err or {"success": False, "error": "Could not resolve board path"}
with kicad_ipc_session(board_path=board_path) as client:
position = format_position(x_mm, y_mm)
success = client.move_footprint(reference, position)
if not success:
return {
"success": False,
"error": f"Failed to move component '{reference}' -- "
"check that the reference exists on the board",
}
return {
"success": True,
"reference": reference,
"new_position": {"x_mm": x_mm, "y_mm": y_mm},
"project_path": project_path,
}
except Exception as e:
logger.error(f"Error moving component {reference}: {e}")
return {
"success": False,
"error": str(e),
"reference": reference,
"project_path": project_path,
}
@mcp.tool()
def rotate_component(
project_path: str,
reference: str,
angle_degrees: float,
) -> dict[str, Any]:
"""Set a component's rotation angle on the PCB.
The angle is absolute (not additive). For example, passing 90.0
sets the component to 90 degrees regardless of its current
orientation. The operation is wrapped in a KiCad undo transaction.
Args:
project_path: Path to the KiCad project (.kicad_pro) or its
parent directory.
reference: Reference designator (e.g. "R1", "U3").
angle_degrees: Target rotation in degrees (0 -- 360).
Returns:
Dictionary confirming success and the applied angle, or an
error message.
"""
try:
board_path, err = _get_board_path(project_path)
if err or not board_path:
return err or {"success": False, "error": "Could not resolve board path"}
with kicad_ipc_session(board_path=board_path) as client:
success = client.rotate_footprint(reference, angle_degrees)
if not success:
return {
"success": False,
"error": f"Failed to rotate component '{reference}' -- "
"check that the reference exists on the board",
}
return {
"success": True,
"reference": reference,
"angle_degrees": angle_degrees,
"project_path": project_path,
}
except Exception as e:
logger.error(f"Error rotating component {reference}: {e}")
return {
"success": False,
"error": str(e),
"reference": reference,
"project_path": project_path,
}
@mcp.tool()
def get_board_statistics(project_path: str) -> dict[str, Any]:
"""Retrieve high-level board statistics from a live KiCad instance.
Returns counts of footprints, nets, tracks, and vias, plus a
breakdown of component types by reference-designator prefix
(e.g. R, C, U).
Args:
project_path: Path to the KiCad project (.kicad_pro) or its
parent directory.
Returns:
Dictionary with board statistics or an error message.
"""
try:
board_path, err = _get_board_path(project_path)
if err or not board_path:
return err or {"success": False, "error": "Could not resolve board path"}
with kicad_ipc_session(board_path=board_path) as client:
stats = client.get_board_statistics()
if not stats:
return {
"success": False,
"error": "Failed to retrieve board statistics",
}
return {
"success": True,
"project_path": project_path,
"board_path": board_path,
"statistics": stats,
}
except Exception as e:
logger.error(f"Error getting board statistics: {e}")
return {
"success": False,
"error": str(e),
"project_path": project_path,
}
@mcp.tool()
def check_connectivity(project_path: str) -> dict[str, Any]:
"""Check the routing connectivity status of the PCB.
Reports total nets, how many are routed vs unrouted, the overall
routing-completion percentage, and the names of routed nets.
Args:
project_path: Path to the KiCad project (.kicad_pro) or its
parent directory.
Returns:
Dictionary with connectivity status or an error message.
"""
try:
board_path, err = _get_board_path(project_path)
if err or not board_path:
return err or {"success": False, "error": "Could not resolve board path"}
with kicad_ipc_session(board_path=board_path) as client:
connectivity = client.check_connectivity()
if not connectivity:
return {
"success": False,
"error": "Failed to check connectivity",
}
return {
"success": True,
"project_path": project_path,
"board_path": board_path,
"connectivity": connectivity,
}
except Exception as e:
logger.error(f"Error checking connectivity: {e}")
return {
"success": False,
"error": str(e),
"project_path": project_path,
}
@mcp.tool()
def refill_zones(project_path: str) -> dict[str, Any]:
"""Refill all copper zones on the PCB.
Triggers a full zone refill in KiCad, which recomputes copper
fills for every zone on the board. This is useful after component
moves, routing changes, or design-rule updates. The call blocks
until the refill completes (up to 30 s timeout).
Args:
project_path: Path to the KiCad project (.kicad_pro) or its
parent directory.
Returns:
Dictionary confirming success or an error message.
"""
try:
board_path, err = _get_board_path(project_path)
if err or not board_path:
return err or {"success": False, "error": "Could not resolve board path"}
with kicad_ipc_session(board_path=board_path) as client:
success = client.refill_zones()
if not success:
return {
"success": False,
"error": "Zone refill failed -- check KiCad for details",
}
return {
"success": True,
"project_path": project_path,
"board_path": board_path,
"message": "All zones refilled successfully",
}
except Exception as e:
logger.error(f"Error refilling zones: {e}")
return {
"success": False,
"error": str(e),
"project_path": project_path,
}