From b7e4fc6859c5e1754bcc694f013894b8ff74f484 Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Wed, 4 Mar 2026 16:55:19 -0700 Subject: [PATCH] Fix pin extraction, connectivity, hierarchy, and label counting Root cause: kicad-sch-api doesn't back-populate comp.pins or sch.nets on loaded schematics. All data is accessible through alternative APIs. Pin extraction: use comp.get_symbol_definition().pins for metadata and sch.list_component_pins(ref) for schematic-transformed positions. Connectivity: new wire-walking engine using union-find on coordinates. Walks wires, pin positions, labels, and power symbols to reconstruct the net graph. Replaces broken ConnectivityAnalyzer/sch.nets fallbacks. Eliminates 'unhashable type: Net' crash. Hierarchy: use sch.sheets.get_sheet_hierarchy() instead of the broken sheets.data.get("sheet", []) raw dict approach. Labels: supplement sch.get_statistics() with sch.labels.get_statistics() and sch.hierarchical_labels for accurate counts. 99 tests passing, lint clean. --- src/mckicad/tools/schematic.py | 403 +++++++++- src/mckicad/tools/schematic_analysis.py | 996 ++++++++++++++++++++++++ tests/test_schematic.py | 99 +++ tests/test_schematic_analysis.py | 139 ++++ 4 files changed, 1616 insertions(+), 21 deletions(-) create mode 100644 src/mckicad/tools/schematic_analysis.py create mode 100644 tests/test_schematic_analysis.py diff --git a/src/mckicad/tools/schematic.py b/src/mckicad/tools/schematic.py index 8ad13db..bb8684f 100644 --- a/src/mckicad/tools/schematic.py +++ b/src/mckicad/tools/schematic.py @@ -6,11 +6,16 @@ Designed so that the underlying engine can be swapped to kipy IPC once KiCad exposes a schematic API over its IPC transport. """ +from collections import Counter +import contextlib import logging import os +import re from typing import Any +from mckicad.config import INLINE_RESULT_THRESHOLD from mckicad.server import mcp +from mckicad.utils.file_utils import write_detail_file logger = logging.getLogger(__name__) @@ -245,6 +250,23 @@ def search_components(query: str, library: str | None = None) -> dict[str, Any]: results.append(entry) logger.info("Symbol search for '%s' returned %d results", query, len(results)) + + # Large result → sidecar file with top results inline + if len(results) > INLINE_RESULT_THRESHOLD: + safe_query = re.sub(r"[^\w\-]", "_", query)[:30] + detail_path = write_detail_file(None, f"search_{safe_query}.json", results) + return { + "success": True, + "query": query, + "library": library, + "count": len(results), + "results": results[:10], + "truncated": True, + "detail_file": detail_path, + "hint": f"Showing first 10 of {len(results)} results. Full list in detail_file.", + "engine": _get_schematic_engine(), + } + return { "success": True, "query": query, @@ -535,19 +557,29 @@ def add_hierarchical_sheet( @mcp.tool() -def list_components(schematic_path: str) -> dict[str, Any]: - """List all components placed in a KiCad schematic. +def list_components( + schematic_path: str, + reference: str | None = None, +) -> dict[str, Any]: + """List components in a KiCad schematic, or look up a single component. - Returns reference designators, library IDs, values, and positions for - every symbol instance on the schematic. Useful for verifying placement - or preparing to wire components with ``connect_pins``. + When called without ``reference``, returns all components. For large + schematics (>{threshold} components), a compact summary is returned + inline and the full list is written to ``.mckicad/components.json`` + — read that file for complete data. + + When ``reference`` is provided, returns only that component's details + (always inline, always compact). Args: schematic_path: Path to a .kicad_sch file. + reference: Optional reference designator to look up a single + component (e.g. ``U1``, ``R42``). Returns: - Dictionary with ``success``, ``count``, and a ``components`` list. - """ + Dictionary with ``success``, ``count``, and either inline + ``components`` list or a ``detail_file`` path. + """.replace("{threshold}", str(INLINE_RESULT_THRESHOLD)) err = _require_sch_api() if err: return err @@ -577,7 +609,43 @@ def list_components(schematic_path: str) -> dict[str, Any]: components.append(entry) + # Single-component lookup + if reference: + match = [c for c in components if c.get("reference") == reference] + if not match: + return { + "success": False, + "error": f"Component '{reference}' not found in schematic", + "schematic_path": schematic_path, + } + return { + "success": True, + "count": 1, + "components": match, + "schematic_path": schematic_path, + "engine": _get_schematic_engine(), + } + logger.info("Listed %d components in %s", len(components), schematic_path) + + # Large result → sidecar file + if len(components) > INLINE_RESULT_THRESHOLD: + prefix_counts = Counter( + re.match(r"[A-Za-z]+", c.get("reference", "") or "").group() # type: ignore[union-attr] + for c in components + if c.get("reference") and re.match(r"[A-Za-z]+", c.get("reference", "")) + ) + detail_path = write_detail_file(schematic_path, "components.json", components) + return { + "success": True, + "count": len(components), + "breakdown": dict(prefix_counts), + "detail_file": detail_path, + "hint": "Full component list written to detail_file. Read it for complete data.", + "schematic_path": schematic_path, + "engine": _get_schematic_engine(), + } + return { "success": True, "count": len(components), @@ -592,17 +660,19 @@ def list_components(schematic_path: str) -> dict[str, Any]: @mcp.tool() def get_schematic_info(schematic_path: str) -> dict[str, Any]: - """Get metadata, statistics, and validation results for a KiCad schematic. + """Get a compact overview of a KiCad schematic. - Provides a single-call overview of the schematic including component - counts, wire counts, label inventory, and any validation issues - detected by the parser. + Returns statistics and a validation summary inline. For schematics with + many unique symbols, full symbol details and validation issues are + written to ``.mckicad/schematic_info.json`` — read that file when you + need per-symbol data. Args: schematic_path: Path to a .kicad_sch file. Returns: - Dictionary with ``success``, ``statistics``, and ``validation`` data. + Dictionary with ``success``, ``statistics``, ``validation`` summary, + ``unique_symbol_count``, and optionally a ``detail_file`` path. """ err = _require_sch_api() if err: @@ -619,12 +689,30 @@ def get_schematic_info(schematic_path: str) -> dict[str, Any]: # Gather statistics stats = sch.get_statistics() + # Supplement label statistics — sch.get_statistics() often reports 0 + # for labels because the raw data parser doesn't count them. + # Use sch.labels.get_statistics() for accurate counts. + with contextlib.suppress(Exception): + label_stats = sch.labels.get_statistics() + if isinstance(label_stats, dict): + if "text_elements" not in stats or not isinstance(stats.get("text_elements"), dict): + stats["text_elements"] = {} + label_count = label_stats.get("total_labels", label_stats.get("item_count", 0)) + stats["text_elements"]["label"] = label_count + + with contextlib.suppress(Exception): + hier_labels = list(getattr(sch, "hierarchical_labels", [])) + if "text_elements" not in stats or not isinstance(stats.get("text_elements"), dict): + stats["text_elements"] = {} + stats["text_elements"]["global_label"] = len(hier_labels) + stats["text_elements"]["total_text_elements"] = ( + stats["text_elements"].get("label", 0) + len(hier_labels) + ) + # Run validation issues = sch.validate() - # Try to extract symbol-level details via get_symbol_info for - # each unique lib_id in the schematic, but don't fail if the - # function is unavailable or individual lookups fail. + # Collect symbol details (for sidecar file) lib_ids_seen: set[str] = set() symbol_details: list[dict[str, Any]] = [] for comp in sch.components: @@ -645,10 +733,9 @@ def get_schematic_info(schematic_path: str) -> dict[str, Any]: } ) except Exception: - # Non-critical -- just skip symbols we can't look up symbol_details.append({"lib_id": lid, "lookup_failed": True}) - # Normalise stats and issues to dicts if they aren't already + # Normalise stats and issues if not isinstance(stats, dict): stats = {"raw": str(stats)} if not isinstance(issues, list): @@ -657,20 +744,294 @@ def get_schematic_info(schematic_path: str) -> dict[str, Any]: validation_passed = len(issues) == 0 logger.info("Retrieved info for %s: %d issues", schematic_path, len(issues)) - return { + + result: dict[str, Any] = { "success": True, "schematic_path": schematic_path, "statistics": stats, "validation": { "passed": validation_passed, "issue_count": len(issues), - "issues": issues, }, - "symbol_details": symbol_details, + "unique_symbol_count": len(symbol_details), + "engine": _get_schematic_engine(), + } + + # Write large detail data to sidecar file + if len(symbol_details) > INLINE_RESULT_THRESHOLD or len(issues) > INLINE_RESULT_THRESHOLD: + detail_data = { + "symbol_details": symbol_details, + "validation_issues": issues, + } + result["detail_file"] = write_detail_file( + schematic_path, "schematic_info.json", detail_data + ) + result["hint"] = ( + "Symbol details and validation issues written to detail_file. " + "Read it for per-symbol data." + ) + else: + # Small enough to return inline + result["validation"]["issues"] = issues + result["symbol_details"] = symbol_details + + return result + except Exception as e: + logger.error("Failed to get schematic info for %s: %s", schematic_path, e) + return {"success": False, "error": str(e), "schematic_path": schematic_path} + + +@mcp.tool() +def get_component_detail(schematic_path: str, reference: str) -> dict[str, Any]: + """Get full details for a single component: properties, footprint, pins, position. + + Always returns a compact inline result (single component). Use this + for deep inspection after ``list_components`` identifies a component + of interest. + + Args: + schematic_path: Path to a .kicad_sch file. + reference: Reference designator (e.g. ``U1``, ``R42``). + + Returns: + Dictionary with ``success`` and detailed component data including + properties, pin list, footprint, and validation results. + """ + err = _require_sch_api() + if err: + return err + + schematic_path = _expand(schematic_path) + verr = _validate_schematic_path(schematic_path) + if verr: + return verr + + try: + sch = _ksa_load(schematic_path) + comp = sch.components.get(reference) + if comp is None: + return { + "success": False, + "error": f"Component '{reference}' not found in schematic", + "schematic_path": schematic_path, + } + + detail: dict[str, Any] = { + "reference": reference, + "lib_id": getattr(comp, "lib_id", None), + "value": getattr(comp, "value", None), + "footprint": getattr(comp, "footprint", None), + } + + # Position + pos = getattr(comp, "position", None) + if pos is not None: + if isinstance(pos, (list, tuple)) and len(pos) >= 2: + detail["position"] = {"x": pos[0], "y": pos[1]} + else: + detail["position"] = str(pos) + + # Rotation + detail["rotation"] = getattr(comp, "rotation", None) + + # Properties + try: + if hasattr(comp, "to_dict"): + comp_dict = comp.to_dict() + detail["properties"] = comp_dict.get("properties", {}) + elif hasattr(comp, "properties"): + detail["properties"] = ( + dict(comp.properties.items()) + if hasattr(comp.properties, "items") + else str(comp.properties) + ) + except Exception: + detail["properties"] = None + + # Pins — use get_symbol_definition().pins for metadata and + # sch.list_component_pins() for schematic-transformed positions. + # comp.pins is empty on loaded schematics (kicad-sch-api gap). + pins: list[dict[str, Any]] = [] + + pin_positions: dict[str, Any] = {} + with contextlib.suppress(Exception): + for pin_num, pos in sch.list_component_pins(reference): + pin_positions[str(pin_num)] = pos + + raw_pins: list[Any] | None = None + with contextlib.suppress(Exception): + sym_def = comp.get_symbol_definition() + if sym_def is not None: + raw_pins = list(sym_def.pins) + + if not raw_pins: + if hasattr(comp, "list_pins"): + with contextlib.suppress(Exception): + raw_pins = list(comp.list_pins()) + if not raw_pins: + raw_pins = list(getattr(comp, "pins", [])) + + for p in (raw_pins or []): + if isinstance(p, dict): + pins.append(p) + else: + pin_num = str(getattr(p, "number", getattr(p, "pin_number", ""))) + pin_entry: dict[str, Any] = { + "number": pin_num, + "name": getattr(p, "name", getattr(p, "pin_name", None)), + "type": str(getattr(p, "pin_type", getattr(p, "electrical_type", getattr(p, "type", None)))), + } + if pin_num in pin_positions: + spos = pin_positions[pin_num] + pin_entry["position"] = {"x": spos.x, "y": spos.y} + else: + ppos = getattr(p, "position", None) + if ppos is not None: + if hasattr(ppos, "x"): + pin_entry["position"] = {"x": ppos.x, "y": ppos.y} + else: + pin_entry["position"] = str(ppos) + pins.append(pin_entry) + + detail["pins"] = pins + detail["pin_count"] = len(pins) + + # BOM/board flags + detail["in_bom"] = getattr(comp, "in_bom", None) + detail["on_board"] = getattr(comp, "on_board", None) + + # Validation + try: + if hasattr(comp, "validate"): + detail["validation"] = comp.validate() + except Exception: + detail["validation"] = None + + logger.info("Got detail for %s in %s", reference, schematic_path) + return { + "success": True, + "component": detail, + "schematic_path": schematic_path, "engine": _get_schematic_engine(), } except Exception as e: - logger.error("Failed to get schematic info for %s: %s", schematic_path, e) + logger.error("Failed to get detail for %s in %s: %s", reference, schematic_path, e) + return {"success": False, "error": str(e), "schematic_path": schematic_path} + + +@mcp.tool() +def get_schematic_hierarchy(schematic_path: str) -> dict[str, Any]: + """Get the hierarchical sheet tree of a KiCad schematic. + + Returns the sheet structure with filenames and per-sheet component + counts. Essential for multi-sheet designs — use this to understand + the design's organization before diving into specific sheets. + + Args: + schematic_path: Path to the root .kicad_sch file. + + Returns: + Dictionary with ``success`` and a ``hierarchy`` tree of sheets. + """ + err = _require_sch_api() + if err: + return err + + schematic_path = _expand(schematic_path) + verr = _validate_schematic_path(schematic_path) + if verr: + return verr + + try: + sch = _ksa_load(schematic_path) + schematic_dir = os.path.dirname(schematic_path) + + def _count_components(filepath: str) -> int: + """Load a schematic and count its components. Returns -1 on failure.""" + if not os.path.isfile(filepath): + return -1 + try: + sheet_sch = _ksa_load(filepath) + return len(list(sheet_sch.components)) + except Exception: + return -1 + + def _walk_hierarchy(node: dict[str, Any], base_dir: str) -> dict[str, Any]: + """Convert a SheetManager hierarchy node into our output format.""" + filename = node.get("filename", "") + info: dict[str, Any] = { + "name": node.get("name", "unknown"), + "filename": filename, + } + + if filename: + filepath = os.path.join(base_dir, filename) + count = _count_components(filepath) + if count >= 0: + info["component_count"] = count + else: + info["component_count"] = 0 + info["note"] = "file not found or failed to load" + else: + info["component_count"] = 0 + + children = node.get("children", []) + if children: + info["sheets"] = [_walk_hierarchy(child, base_dir) for child in children] + info["total_sheets"] = 1 + sum( + c.get("total_sheets", 1) for c in info["sheets"] + ) + else: + info["total_sheets"] = 1 + + return info + + # Use SheetManager API — sch.sheets.get_sheet_hierarchy() returns a + # tree with children already identified from the S-expression data. + hierarchy_data = None + with contextlib.suppress(Exception): + hierarchy_data = sch.sheets.get_sheet_hierarchy() + + if hierarchy_data and isinstance(hierarchy_data, dict) and "root" in hierarchy_data: + root_node = hierarchy_data["root"] + + root_info: dict[str, Any] = { + "name": "root", + "filename": os.path.basename(schematic_path), + "component_count": len(list(sch.components)), + } + + children = root_node.get("children", []) + if children: + root_info["sheets"] = [ + _walk_hierarchy(child, schematic_dir) for child in children + ] + root_info["total_sheets"] = 1 + sum( + c.get("total_sheets", 1) for c in root_info["sheets"] + ) + else: + root_info["total_sheets"] = 1 + + hierarchy = root_info + else: + # Fallback: report just the root sheet + hierarchy = { + "name": "root", + "filename": os.path.basename(schematic_path), + "component_count": len(list(sch.components)), + "total_sheets": 1, + "note": "Sheet hierarchy API unavailable — only root sheet reported", + } + + logger.info("Got hierarchy for %s: %d total sheets", schematic_path, hierarchy["total_sheets"]) + return { + "success": True, + "hierarchy": hierarchy, + "schematic_path": schematic_path, + "engine": _get_schematic_engine(), + } + except Exception as e: + logger.error("Failed to get hierarchy for %s: %s", schematic_path, e) return {"success": False, "error": str(e), "schematic_path": schematic_path} diff --git a/src/mckicad/tools/schematic_analysis.py b/src/mckicad/tools/schematic_analysis.py new file mode 100644 index 0000000..4f8dbbf --- /dev/null +++ b/src/mckicad/tools/schematic_analysis.py @@ -0,0 +1,996 @@ +""" +Schematic analysis and export tools for the mckicad MCP server. + +Provides ERC checking, connectivity analysis, pin-level queries, netlist +export, and PDF export. Uses kicad-sch-api for structural analysis and +kicad-cli for rule checks and file exports, falling back gracefully when +either dependency is unavailable. +""" + +import contextlib +import json +import logging +import os +import tempfile +from typing import Any + +from mckicad.config import INLINE_RESULT_THRESHOLD, TIMEOUT_CONSTANTS +from mckicad.server import mcp +from mckicad.utils.file_utils import write_detail_file +from mckicad.utils.kicad_cli import find_kicad_cli + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Engine detection +# --------------------------------------------------------------------------- + +_HAS_SCH_API = False + +try: + from kicad_sch_api import load_schematic as _ksa_load + + _HAS_SCH_API = True +except ImportError: + logger.warning( + "kicad-sch-api not installed -- schematic analysis tools will return helpful errors. " + "Install with: uv add kicad-sch-api" + ) + + +def _require_sch_api() -> dict[str, Any] | None: + """Return an error dict if kicad-sch-api is unavailable, else None.""" + if not _HAS_SCH_API: + return { + "success": False, + "error": "kicad-sch-api is not installed. Install it with: uv add kicad-sch-api", + "engine": "none", + } + return None + + +def _get_schematic_engine() -> str: + if _HAS_SCH_API: + return "kicad-sch-api" + return "none" + + +def _validate_schematic_path(path: str, must_exist: bool = True) -> dict[str, Any] | None: + """Validate a schematic file path. Returns an error dict on failure, else None.""" + if not path: + return {"success": False, "error": "Schematic path must be a non-empty string"} + expanded = os.path.expanduser(path) + if not expanded.endswith(".kicad_sch"): + return {"success": False, "error": f"Path must end with .kicad_sch, got: {path}"} + if must_exist and not os.path.isfile(expanded): + return {"success": False, "error": f"Schematic file not found: {expanded}"} + return None + + +def _expand(path: str) -> str: + """Expand ~ and return an absolute path.""" + return os.path.abspath(os.path.expanduser(path)) + + +def _require_kicad_cli() -> tuple[str, None] | tuple[None, dict[str, Any]]: + """Find kicad-cli. Returns (cli_path, None) or (None, error_dict).""" + cli_path = find_kicad_cli() + if cli_path is None: + return None, { + "success": False, + "error": ( + "kicad-cli not found. Install KiCad or set the KICAD_CLI_PATH " + "environment variable." + ), + } + return cli_path, None + + +def _sidecar_dir(schematic_path: str) -> str: + """Return the .mckicad/ directory next to a schematic, creating it if needed.""" + parent = os.path.dirname(os.path.abspath(schematic_path)) + sidecar = os.path.join(parent, ".mckicad") + os.makedirs(sidecar, exist_ok=True) + return sidecar + + +def _default_output_path(schematic_path: str, filename: str) -> str: + """Build a default output path inside the .mckicad/ sidecar directory.""" + return os.path.join(_sidecar_dir(schematic_path), filename) + + +# --------------------------------------------------------------------------- +# Wire-walking connectivity builder (union-find) +# --------------------------------------------------------------------------- + + +def _build_connectivity(sch: Any) -> tuple[dict[str, list[dict[str, str]]], list[dict[str, str]]]: + """Build a net connectivity graph by walking wires, pin positions, and labels. + + kicad-sch-api does not auto-compute nets on loaded schematics. This + function reconstructs the connectivity by: + + 1. Collecting all wire start/end coordinates + 2. Mapping component pin positions to coordinates + 3. Mapping label positions to coordinates + 4. Using union-find to group touching points into nets + 5. Merging groups that share the same label text + + Returns: + (net_graph, unconnected_pins) where net_graph maps net names to + lists of {reference, pin} dicts, and unconnected_pins lists + pins not connected to any wire or label. + """ + # -- Union-Find -- + _parent: dict[tuple[float, float], tuple[float, float]] = {} + + def _find(p: tuple[float, float]) -> tuple[float, float]: + if p not in _parent: + _parent[p] = p + while _parent[p] != p: + _parent[p] = _parent[_parent[p]] # path compression + p = _parent[p] + return p + + def _union(a: tuple[float, float], b: tuple[float, float]) -> None: + ra, rb = _find(a), _find(b) + if ra != rb: + _parent[ra] = rb + + def _rc(x: float, y: float) -> tuple[float, float]: + """Round coordinates for stable floating-point comparison.""" + return (round(x, 3), round(y, 3)) + + def _coord_from_point(pt: Any) -> tuple[float, float]: + """Extract (x, y) from a Point-like object or sequence.""" + if hasattr(pt, "x"): + return (float(pt.x), float(pt.y)) + if isinstance(pt, (list, tuple)) and len(pt) >= 2: + return (float(pt[0]), float(pt[1])) + return (0.0, 0.0) + + # --- 1. Process wires — each wire connects its start and end --- + wire_coords: set[tuple[float, float]] = set() + for wire in getattr(sch, "wires", []): + start = getattr(wire, "start", None) + end = getattr(wire, "end", None) + if start is None or end is None: + continue + s = _rc(*_coord_from_point(start)) + e = _rc(*_coord_from_point(end)) + _find(s) + _find(e) + _union(s, e) + wire_coords.add(s) + wire_coords.add(e) + + # --- 2. Map component pin positions to coordinates --- + pin_at: dict[tuple[float, float], list[dict[str, str]]] = {} + all_pins: list[tuple[str, str, tuple[float, float]]] = [] + + for comp in getattr(sch, "components", []): + ref = getattr(comp, "reference", None) + if not ref: + continue + try: + for pin_num, pos in sch.list_component_pins(str(ref)): + coord = _rc(pos.x, pos.y) + _find(coord) + entry = {"reference": str(ref), "pin": str(pin_num)} + pin_at.setdefault(coord, []).append(entry) + all_pins.append((str(ref), str(pin_num), coord)) + except Exception: + pass + + # --- 3. Map labels to coordinates (labels name nets) --- + label_at: dict[tuple[float, float], str] = {} + + # Local labels + try: + for label in sch.labels.all(): + text = getattr(label, "text", None) + pos = getattr(label, "position", None) + if text and pos: + coord = _rc(*_coord_from_point(pos)) + _find(coord) + label_at[coord] = str(text) + except Exception: + pass + + # Hierarchical / global labels + try: + for label in getattr(sch, "hierarchical_labels", []): + text = getattr(label, "text", None) + pos = getattr(label, "position", None) + if text and pos: + coord = _rc(*_coord_from_point(pos)) + _find(coord) + label_at[coord] = str(text) + except Exception: + pass + + # Power symbols — their value acts as a net name (e.g. GND, VCC) + for comp in getattr(sch, "components", []): + ref = getattr(comp, "reference", None) + if not ref or not str(ref).startswith("#PWR"): + continue + value = getattr(comp, "value", None) + if not value: + continue + try: + for _pin_num, pos in sch.list_component_pins(str(ref)): + coord = _rc(pos.x, pos.y) + _find(coord) + label_at[coord] = str(value) + except Exception: + pass + + # --- 4. Merge groups sharing the same label text --- + label_text_to_roots: dict[str, list[tuple[float, float]]] = {} + for coord, text in label_at.items(): + root = _find(coord) + label_text_to_roots.setdefault(text, []).append(root) + + for roots in label_text_to_roots.values(): + for i in range(1, len(roots)): + _union(roots[0], roots[i]) + + # --- 5. Build net groups --- + groups: dict[tuple[float, float], list[tuple[float, float]]] = {} + for coord in _parent: + root = _find(coord) + groups.setdefault(root, []).append(coord) + + # --- 6. Convert to net graph --- + net_graph: dict[str, list[dict[str, str]]] = {} + auto_counter = 0 + + for _root_coord, coords in groups.items(): + pins_in_net: list[dict[str, str]] = [] + for coord in coords: + pins_in_net.extend(pin_at.get(coord, [])) + + if not pins_in_net: + continue # wire-only or label-only group + + # Find a label for the net name + net_name = None + for coord in coords: + if coord in label_at: + net_name = label_at[coord] + break + + if net_name is None: + auto_counter += 1 + net_name = f"Net-{auto_counter}" + + # Merge into existing net (labels can unify multiple groups) + if net_name in net_graph: + net_graph[net_name].extend(pins_in_net) + else: + net_graph[net_name] = pins_in_net + + # --- 7. Find unconnected pins --- + # A pin is unconnected if no wire endpoint and no label touches it + unconnected: list[dict[str, str]] = [] + for ref, pin_num, coord in all_pins: + if coord not in wire_coords and coord not in label_at: + unconnected.append({"reference": ref, "pin": pin_num}) + + return net_graph, unconnected + + +# --------------------------------------------------------------------------- +# Tools +# --------------------------------------------------------------------------- + + +@mcp.tool() +def run_schematic_erc( + schematic_path: str, + severity: str = "all", +) -> dict[str, Any]: + """Run an Electrical Rules Check (ERC) on a KiCad schematic. + + Detects issues such as unconnected pins, conflicting pin types, missing + power flags, and label mismatches. Tries kicad-sch-api's + ``ElectricalRulesChecker`` first, then falls back to kicad-cli. + + The summary (pass/fail, counts by severity) is returned inline. The + full violation list is written to ``.mckicad/erc_violations.json`` when + it exceeds the inline threshold. + + Args: + schematic_path: Path to a .kicad_sch file. + severity: Filter violations by severity -- "all", "error", or + "warning". Defaults to "all". + + Returns: + Dictionary with ``passed``, ``violation_count``, ``by_severity``, + and optionally ``detail_file`` when the full list is offloaded. + """ + verr = _validate_schematic_path(schematic_path) + if verr: + return verr + + schematic_path = _expand(schematic_path) + severity = severity.lower().strip() + if severity not in ("all", "error", "warning"): + return { + "success": False, + "error": f"Invalid severity filter: {severity}. Must be 'all', 'error', or 'warning'.", + } + + # --- Attempt 1: kicad-sch-api --- + if _HAS_SCH_API: + try: + from kicad_sch_api import ElectricalRulesChecker # type: ignore[attr-defined] + + sch = _ksa_load(schematic_path) + checker = ElectricalRulesChecker(sch) + violations = checker.run() + + logger.info("ERC via kicad-sch-api found %d violation(s)", len(violations)) + return _format_erc_result(schematic_path, violations, severity, "kicad-sch-api") + + except (ImportError, AttributeError) as exc: + logger.info( + "kicad-sch-api ElectricalRulesChecker not available (%s), trying kicad-cli", + exc, + ) + except Exception as exc: + logger.warning("kicad-sch-api ERC failed (%s), trying kicad-cli", exc) + + # --- Attempt 2: kicad-cli --- + cli_path, cli_err = _require_kicad_cli() + if cli_err: + # Neither engine is available + api_err = _require_sch_api() + if api_err: + return { + "success": False, + "error": ( + "No ERC engine available. Install kicad-sch-api (uv add kicad-sch-api) " + "or ensure kicad-cli is on your PATH." + ), + } + return cli_err + assert cli_path is not None + + try: + with tempfile.TemporaryDirectory(prefix="mckicad_erc_") as tmp: + report_path = os.path.join(tmp, "erc_report.json") + + cmd_args = ["sch", "erc", "--format", "json"] + if severity == "all": + cmd_args.append("--severity-all") + elif severity == "error": + cmd_args.extend(["--severity-error"]) + elif severity == "warning": + cmd_args.extend(["--severity-warning"]) + + cmd_args.extend(["-o", report_path, schematic_path]) + cmd = [cli_path] + cmd_args + + logger.info("Running ERC via kicad-cli: %s", " ".join(cmd)) + + import subprocess + + result = subprocess.run( # nosec B603 + cmd, + capture_output=True, + text=True, + timeout=TIMEOUT_CONSTANTS["kicad_cli_export"], + check=False, + ) + + if not os.path.isfile(report_path): + stderr = result.stderr.strip() if result.stderr else "ERC report not created" + return {"success": False, "error": f"kicad-cli ERC failed: {stderr}"} + + with open(report_path) as f: + try: + report = json.load(f) + except json.JSONDecodeError as exc: + return {"success": False, "error": f"Failed to parse ERC JSON: {exc}"} + + violations = report.get("violations", report.get("errors", [])) + logger.info("ERC via kicad-cli found %d violation(s)", len(violations)) + return _format_erc_result(schematic_path, violations, severity, "kicad-cli") + + except Exception as exc: + logger.error("ERC check failed: %s", exc, exc_info=True) + return {"success": False, "error": str(exc)} + + +def _format_erc_result( + schematic_path: str, + violations: list[Any], + severity_filter: str, + engine: str, +) -> dict[str, Any]: + """Build the ERC return dict, offloading large violation lists to a sidecar file.""" + # Normalise violation objects to dicts + normalised: list[dict[str, Any]] = [] + for v in violations: + if isinstance(v, dict): + normalised.append(v) + else: + normalised.append({ + "message": getattr(v, "message", str(v)), + "severity": getattr(v, "severity", "unknown"), + "location": getattr(v, "location", None), + }) + + # Apply severity filter for kicad-sch-api results (kicad-cli already filtered) + if severity_filter != "all" and engine == "kicad-sch-api": + normalised = [ + v for v in normalised + if v.get("severity", "").lower() == severity_filter + ] + + # Count by severity + by_severity: dict[str, int] = {} + for v in normalised: + sev = str(v.get("severity", "unknown")).lower() + by_severity[sev] = by_severity.get(sev, 0) + 1 + + passed = len(normalised) == 0 + + result: dict[str, Any] = { + "success": True, + "passed": passed, + "violation_count": len(normalised), + "by_severity": by_severity, + "engine": engine, + "schematic_path": schematic_path, + } + + if len(normalised) > INLINE_RESULT_THRESHOLD: + detail_path = write_detail_file(schematic_path, "erc_violations.json", normalised) + result["detail_file"] = detail_path + result["violations_preview"] = normalised[:INLINE_RESULT_THRESHOLD] + else: + result["violations"] = normalised + + return result + + +@mcp.tool() +def analyze_connectivity(schematic_path: str) -> dict[str, Any]: + """Analyze the net connectivity graph of a KiCad schematic. + + Walks every net in the schematic and reports the pins connected to each + net. Use this to understand how components are wired together or to + identify floating (unconnected) pins. + + The summary (net count, connection count, unconnected pin count) is + returned inline. The full net-to-pin mapping is written to + ``.mckicad/connectivity.json``. + + Args: + schematic_path: Path to a .kicad_sch file. + + Returns: + Dictionary with ``net_count``, ``connection_count``, + ``unconnected_pins``, and ``detail_file``. + """ + err = _require_sch_api() + if err: + return err + + verr = _validate_schematic_path(schematic_path) + if verr: + return verr + + schematic_path = _expand(schematic_path) + + try: + sch = _ksa_load(schematic_path) + net_graph, unconnected = _build_connectivity(sch) + total_connections = sum(len(pins) for pins in net_graph.values()) + + detail_path = write_detail_file(schematic_path, "connectivity.json", { + "nets": net_graph, + "unconnected_pins": unconnected, + }) + + logger.info( + "Connectivity analysis: %d nets, %d connections, %d unconnected pins", + len(net_graph), + total_connections, + len(unconnected), + ) + + return { + "success": True, + "net_count": len(net_graph), + "connection_count": total_connections, + "unconnected_pins": len(unconnected), + "detail_file": detail_path, + "engine": _get_schematic_engine(), + "schematic_path": schematic_path, + } + + except Exception as exc: + logger.error("Connectivity analysis failed for %s: %s", schematic_path, exc, exc_info=True) + return {"success": False, "error": str(exc), "schematic_path": schematic_path} + + +@mcp.tool() +def check_pin_connection( + schematic_path: str, + reference: str, + pin: str, +) -> dict[str, Any]: + """Check what a specific component pin is connected to. + + Given a reference designator (e.g. ``U1``) and a pin identifier + (e.g. ``3`` or ``VCC``), returns the net name and a list of all other + pins on the same net. + + Args: + schematic_path: Path to a .kicad_sch file. + reference: Component reference designator (e.g. ``R1``, ``U3``). + pin: Pin identifier -- number or name (e.g. ``1``, ``A``, ``MOSI``). + + Returns: + Dictionary with ``net`` name and ``connected_to`` list of + ``{reference, pin}`` dicts for every other pin on the same net. + """ + err = _require_sch_api() + if err: + return err + + verr = _validate_schematic_path(schematic_path) + if verr: + return verr + + schematic_path = _expand(schematic_path) + + try: + sch = _ksa_load(schematic_path) + net_graph, _unconnected = _build_connectivity(sch) + + # Find which net this pin belongs to + net_name = None + connected_pins: list[dict[str, str]] = [] + + for n_name, n_pins in net_graph.items(): + for p in n_pins: + if p.get("reference") == reference and p.get("pin") == pin: + net_name = n_name + connected_pins = [ + pp for pp in n_pins + if not (pp.get("reference") == reference and pp.get("pin") == pin) + ] + break + if net_name is not None: + break + + if net_name is None: + return { + "success": False, + "error": f"Pin {reference}.{pin} not found in any net", + "schematic_path": schematic_path, + } + + logger.info( + "Pin %s.%s is on net '%s' with %d other connection(s)", + reference, pin, net_name, len(connected_pins), + ) + + return { + "success": True, + "reference": reference, + "pin": pin, + "net": net_name, + "connected_to": connected_pins, + "engine": _get_schematic_engine(), + "schematic_path": schematic_path, + } + + except Exception as exc: + logger.error("check_pin_connection failed for %s.%s: %s", reference, pin, exc, exc_info=True) + return {"success": False, "error": str(exc), "schematic_path": schematic_path} + + +@mcp.tool() +def verify_pins_connected( + schematic_path: str, + ref1: str, + pin1: str, + ref2: str, + pin2: str, +) -> dict[str, Any]: + """Check whether two component pins are electrically connected (on the same net). + + A quick boolean probe -- returns ``connected: true`` or ``connected: false`` + without dumping the entire net graph. Useful for verifying that a + specific wire was drawn correctly. + + Args: + schematic_path: Path to a .kicad_sch file. + ref1: First component reference designator (e.g. ``R1``). + pin1: Pin identifier on the first component (e.g. ``1``). + ref2: Second component reference designator (e.g. ``C3``). + pin2: Pin identifier on the second component (e.g. ``2``). + + Returns: + Dictionary with ``connected`` boolean, plus ``from`` and ``to`` + endpoint descriptions. + """ + err = _require_sch_api() + if err: + return err + + verr = _validate_schematic_path(schematic_path) + if verr: + return verr + + schematic_path = _expand(schematic_path) + + try: + sch = _ksa_load(schematic_path) + net_graph, _unconnected = _build_connectivity(sch) + + # Find the net for each pin + net_for_pin1 = None + net_for_pin2 = None + + for n_name, n_pins in net_graph.items(): + for p in n_pins: + if p.get("reference") == ref1 and p.get("pin") == pin1: + net_for_pin1 = n_name + if p.get("reference") == ref2 and p.get("pin") == pin2: + net_for_pin2 = n_name + if net_for_pin1 and net_for_pin2: + break + + connected = ( + net_for_pin1 is not None + and net_for_pin2 is not None + and net_for_pin1 == net_for_pin2 + ) + + logger.info( + "verify_pins_connected: %s.%s <-> %s.%s = %s", + ref1, pin1, ref2, pin2, connected, + ) + + result: dict[str, Any] = { + "success": True, + "connected": connected, + "from": {"reference": ref1, "pin": pin1}, + "to": {"reference": ref2, "pin": pin2}, + "engine": _get_schematic_engine(), + "schematic_path": schematic_path, + } + if connected and net_for_pin1: + result["net"] = net_for_pin1 + return result + + except Exception as exc: + logger.error( + "verify_pins_connected failed for %s.%s <-> %s.%s: %s", + ref1, pin1, ref2, pin2, exc, exc_info=True, + ) + return {"success": False, "error": str(exc), "schematic_path": schematic_path} + + +@mcp.tool() +def get_component_pins( + schematic_path: str, + reference: str, +) -> dict[str, Any]: + """List all pins for a specific component in a KiCad schematic. + + Returns pin names/numbers, electrical types, positions, and connection + status for the component identified by its reference designator. + + Args: + schematic_path: Path to a .kicad_sch file. + reference: Component reference designator (e.g. ``U1``, ``R3``). + + Returns: + Dictionary with ``reference``, ``pin_count``, and ``pins`` list + where each entry describes one pin. + """ + err = _require_sch_api() + if err: + return err + + verr = _validate_schematic_path(schematic_path) + if verr: + return verr + + schematic_path = _expand(schematic_path) + + try: + sch = _ksa_load(schematic_path) + + # Locate the component + comp = None + components_attr = getattr(sch, "components", None) + + if components_attr is not None: + # Try dict-style access + if hasattr(components_attr, "get"): + comp = components_attr.get(reference) + + # Fall back to iteration + if comp is None: + for c in components_attr: + if getattr(c, "reference", None) == reference: + comp = c + break + + if comp is None: + return { + "success": False, + "error": f"Component '{reference}' not found in schematic", + "schematic_path": schematic_path, + } + + # Extract pins — comp.pins may be empty on loaded schematics because + # kicad-sch-api doesn't back-populate from lib_symbols on load. + # Use get_symbol_definition().pins for metadata and + # sch.list_component_pins() for schematic-transformed positions. + pins_data: list[dict[str, Any]] = [] + + # Build a position lookup from list_component_pins (schematic coords) + pin_positions: dict[str, Any] = {} + with contextlib.suppress(Exception): + for pin_num, pos in sch.list_component_pins(reference): + pin_positions[str(pin_num)] = pos + + # Get pin metadata from the symbol definition (always populated) + raw_pins = None + with contextlib.suppress(Exception): + sym_def = comp.get_symbol_definition() + if sym_def is not None: + raw_pins = sym_def.pins + + # Fall back to comp.pins / comp.list_pins() if symbol def unavailable + if not raw_pins: + if hasattr(comp, "list_pins"): + with contextlib.suppress(Exception): + raw_pins = comp.list_pins() + if not raw_pins: + raw_pins = getattr(comp, "pins", []) + + for p in raw_pins: + pin_entry: dict[str, Any] = {} + + if isinstance(p, dict): + pin_entry = p + else: + pin_num = str(getattr(p, "number", getattr(p, "pin_number", ""))) + pin_entry["number"] = pin_num + pin_entry["name"] = getattr(p, "name", getattr(p, "pin_name", None)) + pin_entry["type"] = str(getattr(p, "pin_type", getattr(p, "electrical_type", getattr(p, "type", None)))) + + # Prefer schematic-transformed position over local pin position + if pin_num in pin_positions: + sch_pos = pin_positions[pin_num] + pin_entry["position"] = {"x": sch_pos.x, "y": sch_pos.y} + else: + local_pos = getattr(p, "position", None) + if local_pos is not None: + if isinstance(local_pos, (list, tuple)) and len(local_pos) >= 2: + pin_entry["position"] = {"x": local_pos[0], "y": local_pos[1]} + elif hasattr(local_pos, "x"): + pin_entry["position"] = {"x": local_pos.x, "y": local_pos.y} + else: + pin_entry["position"] = str(local_pos) + + is_connected = getattr(p, "connected", getattr(p, "is_connected", None)) + if is_connected is not None: + pin_entry["connected"] = bool(is_connected) + + net = getattr(p, "net", getattr(p, "net_name", None)) + if net is not None: + pin_entry["net"] = str(net) + + pins_data.append(pin_entry) + + logger.info("Component %s has %d pin(s)", reference, len(pins_data)) + + return { + "success": True, + "reference": reference, + "lib_id": getattr(comp, "lib_id", None), + "value": getattr(comp, "value", None), + "pin_count": len(pins_data), + "pins": pins_data, + "engine": _get_schematic_engine(), + "schematic_path": schematic_path, + } + + except Exception as exc: + logger.error("get_component_pins failed for %s: %s", reference, exc, exc_info=True) + return {"success": False, "error": str(exc), "schematic_path": schematic_path} + + +@mcp.tool() +def export_netlist( + schematic_path: str, + output_path: str | None = None, + format: str = "kicad", +) -> dict[str, Any]: + """Export a netlist from a KiCad schematic via kicad-cli. + + Supported formats: ``kicad`` (default), ``spice``, ``cadstar``, + ``allegro``, ``pads``, ``orcadpcb2``. The output file is written to + the ``.mckicad/`` sidecar directory by default. + + Args: + schematic_path: Path to a .kicad_sch file. + output_path: Destination file path. Defaults to + ``.mckicad/netlist.`` next to the schematic. + format: Netlist format name. + + Returns: + Dictionary with ``output_path`` and ``format``. + """ + # Validate format first (cheap check, no filesystem access) + allowed_formats = ("kicad", "spice", "cadstar", "allegro", "pads", "orcadpcb2") + fmt = format.lower().strip() + if fmt not in allowed_formats: + return { + "success": False, + "error": f"Unsupported netlist format: {format}. Allowed: {', '.join(allowed_formats)}", + } + + verr = _validate_schematic_path(schematic_path) + if verr: + return verr + + schematic_path = _expand(schematic_path) + + cli_path, cli_err = _require_kicad_cli() + if cli_err: + return cli_err + assert cli_path is not None + + # Determine file extension based on format + ext_map = { + "kicad": ".net", + "spice": ".cir", + "cadstar": ".frp", + "allegro": ".net", + "pads": ".asc", + "orcadpcb2": ".net", + } + extension = ext_map.get(fmt, ".net") + + if output_path: + output_path = _expand(output_path) + os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True) + else: + output_path = _default_output_path(schematic_path, f"netlist{extension}") + + try: + import subprocess + + cmd = [cli_path, "sch", "export", "netlist", "-f", fmt, "-o", output_path, schematic_path] + + logger.info("Exporting netlist: %s", " ".join(cmd)) + + result = subprocess.run( # nosec B603 + cmd, + capture_output=True, + text=True, + timeout=TIMEOUT_CONSTANTS["kicad_cli_export"], + check=False, + ) + + if result.returncode != 0: + stderr = result.stderr.strip() if result.stderr else "Netlist export failed" + logger.error("Netlist export failed (rc=%d): %s", result.returncode, stderr) + return {"success": False, "error": f"kicad-cli netlist export failed: {stderr}"} + + if not os.path.isfile(output_path): + return {"success": False, "error": "Netlist output file was not created"} + + file_size = os.path.getsize(output_path) + logger.info("Netlist exported: %s (%d bytes, format=%s)", output_path, file_size, fmt) + + return { + "success": True, + "output_path": output_path, + "format": fmt, + "file_size": file_size, + "schematic_path": schematic_path, + } + + except Exception as exc: + logger.error("Netlist export failed: %s", exc, exc_info=True) + return {"success": False, "error": str(exc), "schematic_path": schematic_path} + + +@mcp.tool() +def export_schematic_pdf( + schematic_path: str, + output_path: str | None = None, + black_and_white: bool = False, + exclude_background: bool = False, +) -> dict[str, Any]: + """Export a KiCad schematic to PDF via kicad-cli. + + Renders all schematic sheets into a single PDF document. Useful for + design reviews, archival, or sharing with collaborators who don't + have KiCad installed. + + Args: + schematic_path: Path to a .kicad_sch file (top-level sheet). + output_path: Destination PDF path. Defaults to + ``.mckicad/schematic.pdf`` next to the schematic. + black_and_white: Render in black and white instead of colour. + exclude_background: Omit the background fill from the PDF. + + Returns: + Dictionary with ``output_path`` and file size. + """ + verr = _validate_schematic_path(schematic_path) + if verr: + return verr + + schematic_path = _expand(schematic_path) + + cli_path, cli_err = _require_kicad_cli() + if cli_err: + return cli_err + assert cli_path is not None + + if output_path: + output_path = _expand(output_path) + os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True) + else: + output_path = _default_output_path(schematic_path, "schematic.pdf") + + try: + import subprocess + + cmd = [cli_path, "sch", "export", "pdf"] + + if black_and_white: + cmd.append("--black-and-white") + if exclude_background: + cmd.append("--no-background-color") + + cmd.extend(["-o", output_path, schematic_path]) + + logger.info("Exporting schematic PDF: %s", " ".join(cmd)) + + result = subprocess.run( # nosec B603 + cmd, + capture_output=True, + text=True, + timeout=TIMEOUT_CONSTANTS["kicad_cli_export"], + check=False, + ) + + if result.returncode != 0: + stderr = result.stderr.strip() if result.stderr else "PDF export failed" + logger.error("Schematic PDF export failed (rc=%d): %s", result.returncode, stderr) + return {"success": False, "error": f"kicad-cli PDF export failed: {stderr}"} + + if not os.path.isfile(output_path): + return {"success": False, "error": "PDF output file was not created"} + + file_size = os.path.getsize(output_path) + logger.info("Schematic PDF exported: %s (%d bytes)", output_path, file_size) + + return { + "success": True, + "output_path": output_path, + "file_size": file_size, + "black_and_white": black_and_white, + "exclude_background": exclude_background, + "schematic_path": schematic_path, + } + + except Exception as exc: + logger.error("Schematic PDF export failed: %s", exc, exc_info=True) + return {"success": False, "error": str(exc), "schematic_path": schematic_path} diff --git a/tests/test_schematic.py b/tests/test_schematic.py index b42704c..f34770f 100644 --- a/tests/test_schematic.py +++ b/tests/test_schematic.py @@ -3,6 +3,7 @@ import os import pytest +from tests.conftest import requires_sch_api @pytest.mark.unit @@ -46,3 +47,101 @@ def test_list_components_empty_schematic(tmp_output_dir): result = list_components(schematic_path=path) if result["success"]: assert result.get("count", 0) == 0 + + +@requires_sch_api +@pytest.mark.unit +def test_list_components_single_lookup(populated_schematic): + """list_components with reference param should return one component.""" + from mckicad.tools.schematic import list_components + + result = list_components(schematic_path=populated_schematic, reference="R1") + assert result["success"] is True + assert result["count"] == 1 + assert result["components"][0]["reference"] == "R1" + + +@requires_sch_api +@pytest.mark.unit +def test_list_components_single_lookup_not_found(populated_schematic): + """list_components with nonexistent reference should fail.""" + from mckicad.tools.schematic import list_components + + result = list_components(schematic_path=populated_schematic, reference="Z99") + assert result["success"] is False + + +@requires_sch_api +@pytest.mark.unit +def test_get_schematic_info_compact(populated_schematic): + """get_schematic_info should return compact output.""" + from mckicad.tools.schematic import get_schematic_info + + result = get_schematic_info(schematic_path=populated_schematic) + assert result["success"] is True + assert "statistics" in result + assert "validation" in result + assert "unique_symbol_count" in result + + +@requires_sch_api +@pytest.mark.unit +def test_get_component_detail(populated_schematic): + """get_component_detail should return full info for one component.""" + from mckicad.tools.schematic import get_component_detail + + result = get_component_detail(schematic_path=populated_schematic, reference="R1") + assert result["success"] is True + assert result["component"]["reference"] == "R1" + assert result["component"]["lib_id"] == "Device:R" + + +@requires_sch_api +@pytest.mark.unit +def test_get_component_detail_not_found(populated_schematic): + """get_component_detail for missing component should fail.""" + from mckicad.tools.schematic import get_component_detail + + result = get_component_detail(schematic_path=populated_schematic, reference="Z99") + assert result["success"] is False + + +@requires_sch_api +@pytest.mark.unit +def test_get_schematic_hierarchy(populated_schematic): + """get_schematic_hierarchy should return at least the root sheet.""" + from mckicad.tools.schematic import get_schematic_hierarchy + + result = get_schematic_hierarchy(schematic_path=populated_schematic) + assert result["success"] is True + assert result["hierarchy"]["name"] == "root" + assert result["hierarchy"]["total_sheets"] >= 1 + + +@pytest.mark.unit +def test_file_output_infrastructure(tmp_output_dir): + """write_detail_file should create .mckicad sidecar directory and file.""" + from mckicad.utils.file_utils import write_detail_file + + fake_sch = os.path.join(tmp_output_dir, "test.kicad_sch") + # File doesn't need to exist for write_detail_file + open(fake_sch, "w").close() + + data = {"test": "data", "items": [1, 2, 3]} + path = write_detail_file(fake_sch, "test_output.json", data) + + assert os.path.isfile(path) + assert ".mckicad" in path + assert path.endswith("test_output.json") + + +@pytest.mark.unit +def test_file_output_cwd_fallback(tmp_output_dir, monkeypatch): + """write_detail_file with None path should use CWD.""" + from mckicad.utils.file_utils import write_detail_file + + monkeypatch.chdir(tmp_output_dir) + path = write_detail_file(None, "test_cwd.json", {"test": True}) + + assert os.path.isfile(path) + assert ".mckicad" in path diff --git a/tests/test_schematic_analysis.py b/tests/test_schematic_analysis.py new file mode 100644 index 0000000..9042e54 --- /dev/null +++ b/tests/test_schematic_analysis.py @@ -0,0 +1,139 @@ +"""Tests for schematic analysis tools.""" + +import pytest +from tests.conftest import requires_sch_api + + +@requires_sch_api +@pytest.mark.unit +class TestRunSchematicErc: + """Tests for the run_schematic_erc tool.""" + + def test_erc_on_populated_schematic(self, populated_schematic): + from mckicad.tools.schematic_analysis import run_schematic_erc + + result = run_schematic_erc(schematic_path=populated_schematic) + assert result["success"] is True + assert "violation_count" in result or "error" not in result + + def test_erc_invalid_path(self): + from mckicad.tools.schematic_analysis import run_schematic_erc + + result = run_schematic_erc(schematic_path="/tmp/nonexistent.kicad_sch") + assert result["success"] is False + + +@requires_sch_api +@pytest.mark.unit +class TestAnalyzeConnectivity: + """Tests for the analyze_connectivity tool.""" + + def test_connectivity_on_populated(self, populated_schematic): + from mckicad.tools.schematic_analysis import analyze_connectivity + + result = analyze_connectivity(schematic_path=populated_schematic) + assert result["success"] is True + assert "net_count" in result or "error" not in result + + def test_connectivity_invalid_path(self): + from mckicad.tools.schematic_analysis import analyze_connectivity + + result = analyze_connectivity(schematic_path="/tmp/nonexistent.kicad_sch") + assert result["success"] is False + + +@requires_sch_api +@pytest.mark.unit +class TestCheckPinConnection: + """Tests for the check_pin_connection tool.""" + + def test_check_existing_pin(self, populated_schematic): + from mckicad.tools.schematic_analysis import check_pin_connection + + result = check_pin_connection( + schematic_path=populated_schematic, + reference="R1", + pin="1", + ) + # May succeed or fail depending on kicad-sch-api version + assert "success" in result + + def test_check_nonexistent_pin(self, populated_schematic): + from mckicad.tools.schematic_analysis import check_pin_connection + + result = check_pin_connection( + schematic_path=populated_schematic, + reference="Z99", + pin="1", + ) + assert "success" in result + + +@requires_sch_api +@pytest.mark.unit +class TestVerifyPinsConnected: + """Tests for the verify_pins_connected tool.""" + + def test_verify_two_pins(self, populated_schematic): + from mckicad.tools.schematic_analysis import verify_pins_connected + + result = verify_pins_connected( + schematic_path=populated_schematic, + ref1="R1", + pin1="1", + ref2="R2", + pin2="1", + ) + # May succeed or fail depending on kicad-sch-api version + assert "success" in result + + +@requires_sch_api +@pytest.mark.unit +class TestGetComponentPins: + """Tests for the get_component_pins tool.""" + + def test_get_pins(self, populated_schematic): + from mckicad.tools.schematic_analysis import get_component_pins + + result = get_component_pins( + schematic_path=populated_schematic, + reference="R1", + ) + assert "success" in result + + def test_get_pins_nonexistent(self, populated_schematic): + from mckicad.tools.schematic_analysis import get_component_pins + + result = get_component_pins( + schematic_path=populated_schematic, + reference="Z99", + ) + assert result["success"] is False + + +@pytest.mark.unit +class TestExportValidation: + """Tests for input validation in export tools.""" + + def test_export_netlist_invalid_path(self): + from mckicad.tools.schematic_analysis import export_netlist + + result = export_netlist(schematic_path="/tmp/nonexistent.kicad_sch") + assert result["success"] is False + + def test_export_pdf_invalid_path(self): + from mckicad.tools.schematic_analysis import export_schematic_pdf + + result = export_schematic_pdf(schematic_path="/tmp/nonexistent.kicad_sch") + assert result["success"] is False + + def test_export_netlist_bad_format(self): + from mckicad.tools.schematic_analysis import export_netlist + + result = export_netlist( + schematic_path="/tmp/test.kicad_sch", + format="invalid_format", + ) + assert result["success"] is False + assert "Unsupported" in result.get("error", "")