Fix pin extraction, connectivity, hierarchy, and label counting
Root cause: kicad-sch-api doesn't back-populate comp.pins or sch.nets
on loaded schematics. All data is accessible through alternative APIs.
Pin extraction: use comp.get_symbol_definition().pins for metadata and
sch.list_component_pins(ref) for schematic-transformed positions.
Connectivity: new wire-walking engine using union-find on coordinates.
Walks wires, pin positions, labels, and power symbols to reconstruct
the net graph. Replaces broken ConnectivityAnalyzer/sch.nets fallbacks.
Eliminates 'unhashable type: Net' crash.
Hierarchy: use sch.sheets.get_sheet_hierarchy() instead of the broken
sheets.data.get("sheet", []) raw dict approach.
Labels: supplement sch.get_statistics() with sch.labels.get_statistics()
and sch.hierarchical_labels for accurate counts.
99 tests passing, lint clean.
This commit is contained in:
parent
ce65035a17
commit
b7e4fc6859
@ -6,11 +6,16 @@ Designed so that the underlying engine can be swapped to kipy IPC once KiCad
|
|||||||
exposes a schematic API over its IPC transport.
|
exposes a schematic API over its IPC transport.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
from collections import Counter
|
||||||
|
import contextlib
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
|
import re
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
|
from mckicad.config import INLINE_RESULT_THRESHOLD
|
||||||
from mckicad.server import mcp
|
from mckicad.server import mcp
|
||||||
|
from mckicad.utils.file_utils import write_detail_file
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -245,6 +250,23 @@ def search_components(query: str, library: str | None = None) -> dict[str, Any]:
|
|||||||
results.append(entry)
|
results.append(entry)
|
||||||
|
|
||||||
logger.info("Symbol search for '%s' returned %d results", query, len(results))
|
logger.info("Symbol search for '%s' returned %d results", query, len(results))
|
||||||
|
|
||||||
|
# Large result → sidecar file with top results inline
|
||||||
|
if len(results) > INLINE_RESULT_THRESHOLD:
|
||||||
|
safe_query = re.sub(r"[^\w\-]", "_", query)[:30]
|
||||||
|
detail_path = write_detail_file(None, f"search_{safe_query}.json", results)
|
||||||
|
return {
|
||||||
|
"success": True,
|
||||||
|
"query": query,
|
||||||
|
"library": library,
|
||||||
|
"count": len(results),
|
||||||
|
"results": results[:10],
|
||||||
|
"truncated": True,
|
||||||
|
"detail_file": detail_path,
|
||||||
|
"hint": f"Showing first 10 of {len(results)} results. Full list in detail_file.",
|
||||||
|
"engine": _get_schematic_engine(),
|
||||||
|
}
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"success": True,
|
"success": True,
|
||||||
"query": query,
|
"query": query,
|
||||||
@ -535,19 +557,29 @@ def add_hierarchical_sheet(
|
|||||||
|
|
||||||
|
|
||||||
@mcp.tool()
|
@mcp.tool()
|
||||||
def list_components(schematic_path: str) -> dict[str, Any]:
|
def list_components(
|
||||||
"""List all components placed in a KiCad schematic.
|
schematic_path: str,
|
||||||
|
reference: str | None = None,
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""List components in a KiCad schematic, or look up a single component.
|
||||||
|
|
||||||
Returns reference designators, library IDs, values, and positions for
|
When called without ``reference``, returns all components. For large
|
||||||
every symbol instance on the schematic. Useful for verifying placement
|
schematics (>{threshold} components), a compact summary is returned
|
||||||
or preparing to wire components with ``connect_pins``.
|
inline and the full list is written to ``.mckicad/components.json``
|
||||||
|
— read that file for complete data.
|
||||||
|
|
||||||
|
When ``reference`` is provided, returns only that component's details
|
||||||
|
(always inline, always compact).
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
schematic_path: Path to a .kicad_sch file.
|
schematic_path: Path to a .kicad_sch file.
|
||||||
|
reference: Optional reference designator to look up a single
|
||||||
|
component (e.g. ``U1``, ``R42``).
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Dictionary with ``success``, ``count``, and a ``components`` list.
|
Dictionary with ``success``, ``count``, and either inline
|
||||||
"""
|
``components`` list or a ``detail_file`` path.
|
||||||
|
""".replace("{threshold}", str(INLINE_RESULT_THRESHOLD))
|
||||||
err = _require_sch_api()
|
err = _require_sch_api()
|
||||||
if err:
|
if err:
|
||||||
return err
|
return err
|
||||||
@ -577,7 +609,43 @@ def list_components(schematic_path: str) -> dict[str, Any]:
|
|||||||
|
|
||||||
components.append(entry)
|
components.append(entry)
|
||||||
|
|
||||||
|
# Single-component lookup
|
||||||
|
if reference:
|
||||||
|
match = [c for c in components if c.get("reference") == reference]
|
||||||
|
if not match:
|
||||||
|
return {
|
||||||
|
"success": False,
|
||||||
|
"error": f"Component '{reference}' not found in schematic",
|
||||||
|
"schematic_path": schematic_path,
|
||||||
|
}
|
||||||
|
return {
|
||||||
|
"success": True,
|
||||||
|
"count": 1,
|
||||||
|
"components": match,
|
||||||
|
"schematic_path": schematic_path,
|
||||||
|
"engine": _get_schematic_engine(),
|
||||||
|
}
|
||||||
|
|
||||||
logger.info("Listed %d components in %s", len(components), schematic_path)
|
logger.info("Listed %d components in %s", len(components), schematic_path)
|
||||||
|
|
||||||
|
# Large result → sidecar file
|
||||||
|
if len(components) > INLINE_RESULT_THRESHOLD:
|
||||||
|
prefix_counts = Counter(
|
||||||
|
re.match(r"[A-Za-z]+", c.get("reference", "") or "").group() # type: ignore[union-attr]
|
||||||
|
for c in components
|
||||||
|
if c.get("reference") and re.match(r"[A-Za-z]+", c.get("reference", ""))
|
||||||
|
)
|
||||||
|
detail_path = write_detail_file(schematic_path, "components.json", components)
|
||||||
|
return {
|
||||||
|
"success": True,
|
||||||
|
"count": len(components),
|
||||||
|
"breakdown": dict(prefix_counts),
|
||||||
|
"detail_file": detail_path,
|
||||||
|
"hint": "Full component list written to detail_file. Read it for complete data.",
|
||||||
|
"schematic_path": schematic_path,
|
||||||
|
"engine": _get_schematic_engine(),
|
||||||
|
}
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"success": True,
|
"success": True,
|
||||||
"count": len(components),
|
"count": len(components),
|
||||||
@ -592,17 +660,19 @@ def list_components(schematic_path: str) -> dict[str, Any]:
|
|||||||
|
|
||||||
@mcp.tool()
|
@mcp.tool()
|
||||||
def get_schematic_info(schematic_path: str) -> dict[str, Any]:
|
def get_schematic_info(schematic_path: str) -> dict[str, Any]:
|
||||||
"""Get metadata, statistics, and validation results for a KiCad schematic.
|
"""Get a compact overview of a KiCad schematic.
|
||||||
|
|
||||||
Provides a single-call overview of the schematic including component
|
Returns statistics and a validation summary inline. For schematics with
|
||||||
counts, wire counts, label inventory, and any validation issues
|
many unique symbols, full symbol details and validation issues are
|
||||||
detected by the parser.
|
written to ``.mckicad/schematic_info.json`` — read that file when you
|
||||||
|
need per-symbol data.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
schematic_path: Path to a .kicad_sch file.
|
schematic_path: Path to a .kicad_sch file.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Dictionary with ``success``, ``statistics``, and ``validation`` data.
|
Dictionary with ``success``, ``statistics``, ``validation`` summary,
|
||||||
|
``unique_symbol_count``, and optionally a ``detail_file`` path.
|
||||||
"""
|
"""
|
||||||
err = _require_sch_api()
|
err = _require_sch_api()
|
||||||
if err:
|
if err:
|
||||||
@ -619,12 +689,30 @@ def get_schematic_info(schematic_path: str) -> dict[str, Any]:
|
|||||||
# Gather statistics
|
# Gather statistics
|
||||||
stats = sch.get_statistics()
|
stats = sch.get_statistics()
|
||||||
|
|
||||||
|
# Supplement label statistics — sch.get_statistics() often reports 0
|
||||||
|
# for labels because the raw data parser doesn't count them.
|
||||||
|
# Use sch.labels.get_statistics() for accurate counts.
|
||||||
|
with contextlib.suppress(Exception):
|
||||||
|
label_stats = sch.labels.get_statistics()
|
||||||
|
if isinstance(label_stats, dict):
|
||||||
|
if "text_elements" not in stats or not isinstance(stats.get("text_elements"), dict):
|
||||||
|
stats["text_elements"] = {}
|
||||||
|
label_count = label_stats.get("total_labels", label_stats.get("item_count", 0))
|
||||||
|
stats["text_elements"]["label"] = label_count
|
||||||
|
|
||||||
|
with contextlib.suppress(Exception):
|
||||||
|
hier_labels = list(getattr(sch, "hierarchical_labels", []))
|
||||||
|
if "text_elements" not in stats or not isinstance(stats.get("text_elements"), dict):
|
||||||
|
stats["text_elements"] = {}
|
||||||
|
stats["text_elements"]["global_label"] = len(hier_labels)
|
||||||
|
stats["text_elements"]["total_text_elements"] = (
|
||||||
|
stats["text_elements"].get("label", 0) + len(hier_labels)
|
||||||
|
)
|
||||||
|
|
||||||
# Run validation
|
# Run validation
|
||||||
issues = sch.validate()
|
issues = sch.validate()
|
||||||
|
|
||||||
# Try to extract symbol-level details via get_symbol_info for
|
# Collect symbol details (for sidecar file)
|
||||||
# each unique lib_id in the schematic, but don't fail if the
|
|
||||||
# function is unavailable or individual lookups fail.
|
|
||||||
lib_ids_seen: set[str] = set()
|
lib_ids_seen: set[str] = set()
|
||||||
symbol_details: list[dict[str, Any]] = []
|
symbol_details: list[dict[str, Any]] = []
|
||||||
for comp in sch.components:
|
for comp in sch.components:
|
||||||
@ -645,10 +733,9 @@ def get_schematic_info(schematic_path: str) -> dict[str, Any]:
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
# Non-critical -- just skip symbols we can't look up
|
|
||||||
symbol_details.append({"lib_id": lid, "lookup_failed": True})
|
symbol_details.append({"lib_id": lid, "lookup_failed": True})
|
||||||
|
|
||||||
# Normalise stats and issues to dicts if they aren't already
|
# Normalise stats and issues
|
||||||
if not isinstance(stats, dict):
|
if not isinstance(stats, dict):
|
||||||
stats = {"raw": str(stats)}
|
stats = {"raw": str(stats)}
|
||||||
if not isinstance(issues, list):
|
if not isinstance(issues, list):
|
||||||
@ -657,20 +744,294 @@ def get_schematic_info(schematic_path: str) -> dict[str, Any]:
|
|||||||
validation_passed = len(issues) == 0
|
validation_passed = len(issues) == 0
|
||||||
|
|
||||||
logger.info("Retrieved info for %s: %d issues", schematic_path, len(issues))
|
logger.info("Retrieved info for %s: %d issues", schematic_path, len(issues))
|
||||||
return {
|
|
||||||
|
result: dict[str, Any] = {
|
||||||
"success": True,
|
"success": True,
|
||||||
"schematic_path": schematic_path,
|
"schematic_path": schematic_path,
|
||||||
"statistics": stats,
|
"statistics": stats,
|
||||||
"validation": {
|
"validation": {
|
||||||
"passed": validation_passed,
|
"passed": validation_passed,
|
||||||
"issue_count": len(issues),
|
"issue_count": len(issues),
|
||||||
"issues": issues,
|
|
||||||
},
|
},
|
||||||
"symbol_details": symbol_details,
|
"unique_symbol_count": len(symbol_details),
|
||||||
|
"engine": _get_schematic_engine(),
|
||||||
|
}
|
||||||
|
|
||||||
|
# Write large detail data to sidecar file
|
||||||
|
if len(symbol_details) > INLINE_RESULT_THRESHOLD or len(issues) > INLINE_RESULT_THRESHOLD:
|
||||||
|
detail_data = {
|
||||||
|
"symbol_details": symbol_details,
|
||||||
|
"validation_issues": issues,
|
||||||
|
}
|
||||||
|
result["detail_file"] = write_detail_file(
|
||||||
|
schematic_path, "schematic_info.json", detail_data
|
||||||
|
)
|
||||||
|
result["hint"] = (
|
||||||
|
"Symbol details and validation issues written to detail_file. "
|
||||||
|
"Read it for per-symbol data."
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# Small enough to return inline
|
||||||
|
result["validation"]["issues"] = issues
|
||||||
|
result["symbol_details"] = symbol_details
|
||||||
|
|
||||||
|
return result
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Failed to get schematic info for %s: %s", schematic_path, e)
|
||||||
|
return {"success": False, "error": str(e), "schematic_path": schematic_path}
|
||||||
|
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
def get_component_detail(schematic_path: str, reference: str) -> dict[str, Any]:
|
||||||
|
"""Get full details for a single component: properties, footprint, pins, position.
|
||||||
|
|
||||||
|
Always returns a compact inline result (single component). Use this
|
||||||
|
for deep inspection after ``list_components`` identifies a component
|
||||||
|
of interest.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
schematic_path: Path to a .kicad_sch file.
|
||||||
|
reference: Reference designator (e.g. ``U1``, ``R42``).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary with ``success`` and detailed component data including
|
||||||
|
properties, pin list, footprint, and validation results.
|
||||||
|
"""
|
||||||
|
err = _require_sch_api()
|
||||||
|
if err:
|
||||||
|
return err
|
||||||
|
|
||||||
|
schematic_path = _expand(schematic_path)
|
||||||
|
verr = _validate_schematic_path(schematic_path)
|
||||||
|
if verr:
|
||||||
|
return verr
|
||||||
|
|
||||||
|
try:
|
||||||
|
sch = _ksa_load(schematic_path)
|
||||||
|
comp = sch.components.get(reference)
|
||||||
|
if comp is None:
|
||||||
|
return {
|
||||||
|
"success": False,
|
||||||
|
"error": f"Component '{reference}' not found in schematic",
|
||||||
|
"schematic_path": schematic_path,
|
||||||
|
}
|
||||||
|
|
||||||
|
detail: dict[str, Any] = {
|
||||||
|
"reference": reference,
|
||||||
|
"lib_id": getattr(comp, "lib_id", None),
|
||||||
|
"value": getattr(comp, "value", None),
|
||||||
|
"footprint": getattr(comp, "footprint", None),
|
||||||
|
}
|
||||||
|
|
||||||
|
# Position
|
||||||
|
pos = getattr(comp, "position", None)
|
||||||
|
if pos is not None:
|
||||||
|
if isinstance(pos, (list, tuple)) and len(pos) >= 2:
|
||||||
|
detail["position"] = {"x": pos[0], "y": pos[1]}
|
||||||
|
else:
|
||||||
|
detail["position"] = str(pos)
|
||||||
|
|
||||||
|
# Rotation
|
||||||
|
detail["rotation"] = getattr(comp, "rotation", None)
|
||||||
|
|
||||||
|
# Properties
|
||||||
|
try:
|
||||||
|
if hasattr(comp, "to_dict"):
|
||||||
|
comp_dict = comp.to_dict()
|
||||||
|
detail["properties"] = comp_dict.get("properties", {})
|
||||||
|
elif hasattr(comp, "properties"):
|
||||||
|
detail["properties"] = (
|
||||||
|
dict(comp.properties.items())
|
||||||
|
if hasattr(comp.properties, "items")
|
||||||
|
else str(comp.properties)
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
detail["properties"] = None
|
||||||
|
|
||||||
|
# Pins — use get_symbol_definition().pins for metadata and
|
||||||
|
# sch.list_component_pins() for schematic-transformed positions.
|
||||||
|
# comp.pins is empty on loaded schematics (kicad-sch-api gap).
|
||||||
|
pins: list[dict[str, Any]] = []
|
||||||
|
|
||||||
|
pin_positions: dict[str, Any] = {}
|
||||||
|
with contextlib.suppress(Exception):
|
||||||
|
for pin_num, pos in sch.list_component_pins(reference):
|
||||||
|
pin_positions[str(pin_num)] = pos
|
||||||
|
|
||||||
|
raw_pins: list[Any] | None = None
|
||||||
|
with contextlib.suppress(Exception):
|
||||||
|
sym_def = comp.get_symbol_definition()
|
||||||
|
if sym_def is not None:
|
||||||
|
raw_pins = list(sym_def.pins)
|
||||||
|
|
||||||
|
if not raw_pins:
|
||||||
|
if hasattr(comp, "list_pins"):
|
||||||
|
with contextlib.suppress(Exception):
|
||||||
|
raw_pins = list(comp.list_pins())
|
||||||
|
if not raw_pins:
|
||||||
|
raw_pins = list(getattr(comp, "pins", []))
|
||||||
|
|
||||||
|
for p in (raw_pins or []):
|
||||||
|
if isinstance(p, dict):
|
||||||
|
pins.append(p)
|
||||||
|
else:
|
||||||
|
pin_num = str(getattr(p, "number", getattr(p, "pin_number", "")))
|
||||||
|
pin_entry: dict[str, Any] = {
|
||||||
|
"number": pin_num,
|
||||||
|
"name": getattr(p, "name", getattr(p, "pin_name", None)),
|
||||||
|
"type": str(getattr(p, "pin_type", getattr(p, "electrical_type", getattr(p, "type", None)))),
|
||||||
|
}
|
||||||
|
if pin_num in pin_positions:
|
||||||
|
spos = pin_positions[pin_num]
|
||||||
|
pin_entry["position"] = {"x": spos.x, "y": spos.y}
|
||||||
|
else:
|
||||||
|
ppos = getattr(p, "position", None)
|
||||||
|
if ppos is not None:
|
||||||
|
if hasattr(ppos, "x"):
|
||||||
|
pin_entry["position"] = {"x": ppos.x, "y": ppos.y}
|
||||||
|
else:
|
||||||
|
pin_entry["position"] = str(ppos)
|
||||||
|
pins.append(pin_entry)
|
||||||
|
|
||||||
|
detail["pins"] = pins
|
||||||
|
detail["pin_count"] = len(pins)
|
||||||
|
|
||||||
|
# BOM/board flags
|
||||||
|
detail["in_bom"] = getattr(comp, "in_bom", None)
|
||||||
|
detail["on_board"] = getattr(comp, "on_board", None)
|
||||||
|
|
||||||
|
# Validation
|
||||||
|
try:
|
||||||
|
if hasattr(comp, "validate"):
|
||||||
|
detail["validation"] = comp.validate()
|
||||||
|
except Exception:
|
||||||
|
detail["validation"] = None
|
||||||
|
|
||||||
|
logger.info("Got detail for %s in %s", reference, schematic_path)
|
||||||
|
return {
|
||||||
|
"success": True,
|
||||||
|
"component": detail,
|
||||||
|
"schematic_path": schematic_path,
|
||||||
"engine": _get_schematic_engine(),
|
"engine": _get_schematic_engine(),
|
||||||
}
|
}
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error("Failed to get schematic info for %s: %s", schematic_path, e)
|
logger.error("Failed to get detail for %s in %s: %s", reference, schematic_path, e)
|
||||||
|
return {"success": False, "error": str(e), "schematic_path": schematic_path}
|
||||||
|
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
def get_schematic_hierarchy(schematic_path: str) -> dict[str, Any]:
|
||||||
|
"""Get the hierarchical sheet tree of a KiCad schematic.
|
||||||
|
|
||||||
|
Returns the sheet structure with filenames and per-sheet component
|
||||||
|
counts. Essential for multi-sheet designs — use this to understand
|
||||||
|
the design's organization before diving into specific sheets.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
schematic_path: Path to the root .kicad_sch file.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary with ``success`` and a ``hierarchy`` tree of sheets.
|
||||||
|
"""
|
||||||
|
err = _require_sch_api()
|
||||||
|
if err:
|
||||||
|
return err
|
||||||
|
|
||||||
|
schematic_path = _expand(schematic_path)
|
||||||
|
verr = _validate_schematic_path(schematic_path)
|
||||||
|
if verr:
|
||||||
|
return verr
|
||||||
|
|
||||||
|
try:
|
||||||
|
sch = _ksa_load(schematic_path)
|
||||||
|
schematic_dir = os.path.dirname(schematic_path)
|
||||||
|
|
||||||
|
def _count_components(filepath: str) -> int:
|
||||||
|
"""Load a schematic and count its components. Returns -1 on failure."""
|
||||||
|
if not os.path.isfile(filepath):
|
||||||
|
return -1
|
||||||
|
try:
|
||||||
|
sheet_sch = _ksa_load(filepath)
|
||||||
|
return len(list(sheet_sch.components))
|
||||||
|
except Exception:
|
||||||
|
return -1
|
||||||
|
|
||||||
|
def _walk_hierarchy(node: dict[str, Any], base_dir: str) -> dict[str, Any]:
|
||||||
|
"""Convert a SheetManager hierarchy node into our output format."""
|
||||||
|
filename = node.get("filename", "")
|
||||||
|
info: dict[str, Any] = {
|
||||||
|
"name": node.get("name", "unknown"),
|
||||||
|
"filename": filename,
|
||||||
|
}
|
||||||
|
|
||||||
|
if filename:
|
||||||
|
filepath = os.path.join(base_dir, filename)
|
||||||
|
count = _count_components(filepath)
|
||||||
|
if count >= 0:
|
||||||
|
info["component_count"] = count
|
||||||
|
else:
|
||||||
|
info["component_count"] = 0
|
||||||
|
info["note"] = "file not found or failed to load"
|
||||||
|
else:
|
||||||
|
info["component_count"] = 0
|
||||||
|
|
||||||
|
children = node.get("children", [])
|
||||||
|
if children:
|
||||||
|
info["sheets"] = [_walk_hierarchy(child, base_dir) for child in children]
|
||||||
|
info["total_sheets"] = 1 + sum(
|
||||||
|
c.get("total_sheets", 1) for c in info["sheets"]
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
info["total_sheets"] = 1
|
||||||
|
|
||||||
|
return info
|
||||||
|
|
||||||
|
# Use SheetManager API — sch.sheets.get_sheet_hierarchy() returns a
|
||||||
|
# tree with children already identified from the S-expression data.
|
||||||
|
hierarchy_data = None
|
||||||
|
with contextlib.suppress(Exception):
|
||||||
|
hierarchy_data = sch.sheets.get_sheet_hierarchy()
|
||||||
|
|
||||||
|
if hierarchy_data and isinstance(hierarchy_data, dict) and "root" in hierarchy_data:
|
||||||
|
root_node = hierarchy_data["root"]
|
||||||
|
|
||||||
|
root_info: dict[str, Any] = {
|
||||||
|
"name": "root",
|
||||||
|
"filename": os.path.basename(schematic_path),
|
||||||
|
"component_count": len(list(sch.components)),
|
||||||
|
}
|
||||||
|
|
||||||
|
children = root_node.get("children", [])
|
||||||
|
if children:
|
||||||
|
root_info["sheets"] = [
|
||||||
|
_walk_hierarchy(child, schematic_dir) for child in children
|
||||||
|
]
|
||||||
|
root_info["total_sheets"] = 1 + sum(
|
||||||
|
c.get("total_sheets", 1) for c in root_info["sheets"]
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
root_info["total_sheets"] = 1
|
||||||
|
|
||||||
|
hierarchy = root_info
|
||||||
|
else:
|
||||||
|
# Fallback: report just the root sheet
|
||||||
|
hierarchy = {
|
||||||
|
"name": "root",
|
||||||
|
"filename": os.path.basename(schematic_path),
|
||||||
|
"component_count": len(list(sch.components)),
|
||||||
|
"total_sheets": 1,
|
||||||
|
"note": "Sheet hierarchy API unavailable — only root sheet reported",
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info("Got hierarchy for %s: %d total sheets", schematic_path, hierarchy["total_sheets"])
|
||||||
|
return {
|
||||||
|
"success": True,
|
||||||
|
"hierarchy": hierarchy,
|
||||||
|
"schematic_path": schematic_path,
|
||||||
|
"engine": _get_schematic_engine(),
|
||||||
|
}
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Failed to get hierarchy for %s: %s", schematic_path, e)
|
||||||
return {"success": False, "error": str(e), "schematic_path": schematic_path}
|
return {"success": False, "error": str(e), "schematic_path": schematic_path}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
996
src/mckicad/tools/schematic_analysis.py
Normal file
996
src/mckicad/tools/schematic_analysis.py
Normal file
@ -0,0 +1,996 @@
|
|||||||
|
"""
|
||||||
|
Schematic analysis and export tools for the mckicad MCP server.
|
||||||
|
|
||||||
|
Provides ERC checking, connectivity analysis, pin-level queries, netlist
|
||||||
|
export, and PDF export. Uses kicad-sch-api for structural analysis and
|
||||||
|
kicad-cli for rule checks and file exports, falling back gracefully when
|
||||||
|
either dependency is unavailable.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import contextlib
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from mckicad.config import INLINE_RESULT_THRESHOLD, TIMEOUT_CONSTANTS
|
||||||
|
from mckicad.server import mcp
|
||||||
|
from mckicad.utils.file_utils import write_detail_file
|
||||||
|
from mckicad.utils.kicad_cli import find_kicad_cli
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Engine detection
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
_HAS_SCH_API = False
|
||||||
|
|
||||||
|
try:
|
||||||
|
from kicad_sch_api import load_schematic as _ksa_load
|
||||||
|
|
||||||
|
_HAS_SCH_API = True
|
||||||
|
except ImportError:
|
||||||
|
logger.warning(
|
||||||
|
"kicad-sch-api not installed -- schematic analysis tools will return helpful errors. "
|
||||||
|
"Install with: uv add kicad-sch-api"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _require_sch_api() -> dict[str, Any] | None:
|
||||||
|
"""Return an error dict if kicad-sch-api is unavailable, else None."""
|
||||||
|
if not _HAS_SCH_API:
|
||||||
|
return {
|
||||||
|
"success": False,
|
||||||
|
"error": "kicad-sch-api is not installed. Install it with: uv add kicad-sch-api",
|
||||||
|
"engine": "none",
|
||||||
|
}
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _get_schematic_engine() -> str:
|
||||||
|
if _HAS_SCH_API:
|
||||||
|
return "kicad-sch-api"
|
||||||
|
return "none"
|
||||||
|
|
||||||
|
|
||||||
|
def _validate_schematic_path(path: str, must_exist: bool = True) -> dict[str, Any] | None:
|
||||||
|
"""Validate a schematic file path. Returns an error dict on failure, else None."""
|
||||||
|
if not path:
|
||||||
|
return {"success": False, "error": "Schematic path must be a non-empty string"}
|
||||||
|
expanded = os.path.expanduser(path)
|
||||||
|
if not expanded.endswith(".kicad_sch"):
|
||||||
|
return {"success": False, "error": f"Path must end with .kicad_sch, got: {path}"}
|
||||||
|
if must_exist and not os.path.isfile(expanded):
|
||||||
|
return {"success": False, "error": f"Schematic file not found: {expanded}"}
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _expand(path: str) -> str:
|
||||||
|
"""Expand ~ and return an absolute path."""
|
||||||
|
return os.path.abspath(os.path.expanduser(path))
|
||||||
|
|
||||||
|
|
||||||
|
def _require_kicad_cli() -> tuple[str, None] | tuple[None, dict[str, Any]]:
|
||||||
|
"""Find kicad-cli. Returns (cli_path, None) or (None, error_dict)."""
|
||||||
|
cli_path = find_kicad_cli()
|
||||||
|
if cli_path is None:
|
||||||
|
return None, {
|
||||||
|
"success": False,
|
||||||
|
"error": (
|
||||||
|
"kicad-cli not found. Install KiCad or set the KICAD_CLI_PATH "
|
||||||
|
"environment variable."
|
||||||
|
),
|
||||||
|
}
|
||||||
|
return cli_path, None
|
||||||
|
|
||||||
|
|
||||||
|
def _sidecar_dir(schematic_path: str) -> str:
|
||||||
|
"""Return the .mckicad/ directory next to a schematic, creating it if needed."""
|
||||||
|
parent = os.path.dirname(os.path.abspath(schematic_path))
|
||||||
|
sidecar = os.path.join(parent, ".mckicad")
|
||||||
|
os.makedirs(sidecar, exist_ok=True)
|
||||||
|
return sidecar
|
||||||
|
|
||||||
|
|
||||||
|
def _default_output_path(schematic_path: str, filename: str) -> str:
|
||||||
|
"""Build a default output path inside the .mckicad/ sidecar directory."""
|
||||||
|
return os.path.join(_sidecar_dir(schematic_path), filename)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Wire-walking connectivity builder (union-find)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def _build_connectivity(sch: Any) -> tuple[dict[str, list[dict[str, str]]], list[dict[str, str]]]:
|
||||||
|
"""Build a net connectivity graph by walking wires, pin positions, and labels.
|
||||||
|
|
||||||
|
kicad-sch-api does not auto-compute nets on loaded schematics. This
|
||||||
|
function reconstructs the connectivity by:
|
||||||
|
|
||||||
|
1. Collecting all wire start/end coordinates
|
||||||
|
2. Mapping component pin positions to coordinates
|
||||||
|
3. Mapping label positions to coordinates
|
||||||
|
4. Using union-find to group touching points into nets
|
||||||
|
5. Merging groups that share the same label text
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
(net_graph, unconnected_pins) where net_graph maps net names to
|
||||||
|
lists of {reference, pin} dicts, and unconnected_pins lists
|
||||||
|
pins not connected to any wire or label.
|
||||||
|
"""
|
||||||
|
# -- Union-Find --
|
||||||
|
_parent: dict[tuple[float, float], tuple[float, float]] = {}
|
||||||
|
|
||||||
|
def _find(p: tuple[float, float]) -> tuple[float, float]:
|
||||||
|
if p not in _parent:
|
||||||
|
_parent[p] = p
|
||||||
|
while _parent[p] != p:
|
||||||
|
_parent[p] = _parent[_parent[p]] # path compression
|
||||||
|
p = _parent[p]
|
||||||
|
return p
|
||||||
|
|
||||||
|
def _union(a: tuple[float, float], b: tuple[float, float]) -> None:
|
||||||
|
ra, rb = _find(a), _find(b)
|
||||||
|
if ra != rb:
|
||||||
|
_parent[ra] = rb
|
||||||
|
|
||||||
|
def _rc(x: float, y: float) -> tuple[float, float]:
|
||||||
|
"""Round coordinates for stable floating-point comparison."""
|
||||||
|
return (round(x, 3), round(y, 3))
|
||||||
|
|
||||||
|
def _coord_from_point(pt: Any) -> tuple[float, float]:
|
||||||
|
"""Extract (x, y) from a Point-like object or sequence."""
|
||||||
|
if hasattr(pt, "x"):
|
||||||
|
return (float(pt.x), float(pt.y))
|
||||||
|
if isinstance(pt, (list, tuple)) and len(pt) >= 2:
|
||||||
|
return (float(pt[0]), float(pt[1]))
|
||||||
|
return (0.0, 0.0)
|
||||||
|
|
||||||
|
# --- 1. Process wires — each wire connects its start and end ---
|
||||||
|
wire_coords: set[tuple[float, float]] = set()
|
||||||
|
for wire in getattr(sch, "wires", []):
|
||||||
|
start = getattr(wire, "start", None)
|
||||||
|
end = getattr(wire, "end", None)
|
||||||
|
if start is None or end is None:
|
||||||
|
continue
|
||||||
|
s = _rc(*_coord_from_point(start))
|
||||||
|
e = _rc(*_coord_from_point(end))
|
||||||
|
_find(s)
|
||||||
|
_find(e)
|
||||||
|
_union(s, e)
|
||||||
|
wire_coords.add(s)
|
||||||
|
wire_coords.add(e)
|
||||||
|
|
||||||
|
# --- 2. Map component pin positions to coordinates ---
|
||||||
|
pin_at: dict[tuple[float, float], list[dict[str, str]]] = {}
|
||||||
|
all_pins: list[tuple[str, str, tuple[float, float]]] = []
|
||||||
|
|
||||||
|
for comp in getattr(sch, "components", []):
|
||||||
|
ref = getattr(comp, "reference", None)
|
||||||
|
if not ref:
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
for pin_num, pos in sch.list_component_pins(str(ref)):
|
||||||
|
coord = _rc(pos.x, pos.y)
|
||||||
|
_find(coord)
|
||||||
|
entry = {"reference": str(ref), "pin": str(pin_num)}
|
||||||
|
pin_at.setdefault(coord, []).append(entry)
|
||||||
|
all_pins.append((str(ref), str(pin_num), coord))
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# --- 3. Map labels to coordinates (labels name nets) ---
|
||||||
|
label_at: dict[tuple[float, float], str] = {}
|
||||||
|
|
||||||
|
# Local labels
|
||||||
|
try:
|
||||||
|
for label in sch.labels.all():
|
||||||
|
text = getattr(label, "text", None)
|
||||||
|
pos = getattr(label, "position", None)
|
||||||
|
if text and pos:
|
||||||
|
coord = _rc(*_coord_from_point(pos))
|
||||||
|
_find(coord)
|
||||||
|
label_at[coord] = str(text)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Hierarchical / global labels
|
||||||
|
try:
|
||||||
|
for label in getattr(sch, "hierarchical_labels", []):
|
||||||
|
text = getattr(label, "text", None)
|
||||||
|
pos = getattr(label, "position", None)
|
||||||
|
if text and pos:
|
||||||
|
coord = _rc(*_coord_from_point(pos))
|
||||||
|
_find(coord)
|
||||||
|
label_at[coord] = str(text)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Power symbols — their value acts as a net name (e.g. GND, VCC)
|
||||||
|
for comp in getattr(sch, "components", []):
|
||||||
|
ref = getattr(comp, "reference", None)
|
||||||
|
if not ref or not str(ref).startswith("#PWR"):
|
||||||
|
continue
|
||||||
|
value = getattr(comp, "value", None)
|
||||||
|
if not value:
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
for _pin_num, pos in sch.list_component_pins(str(ref)):
|
||||||
|
coord = _rc(pos.x, pos.y)
|
||||||
|
_find(coord)
|
||||||
|
label_at[coord] = str(value)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# --- 4. Merge groups sharing the same label text ---
|
||||||
|
label_text_to_roots: dict[str, list[tuple[float, float]]] = {}
|
||||||
|
for coord, text in label_at.items():
|
||||||
|
root = _find(coord)
|
||||||
|
label_text_to_roots.setdefault(text, []).append(root)
|
||||||
|
|
||||||
|
for roots in label_text_to_roots.values():
|
||||||
|
for i in range(1, len(roots)):
|
||||||
|
_union(roots[0], roots[i])
|
||||||
|
|
||||||
|
# --- 5. Build net groups ---
|
||||||
|
groups: dict[tuple[float, float], list[tuple[float, float]]] = {}
|
||||||
|
for coord in _parent:
|
||||||
|
root = _find(coord)
|
||||||
|
groups.setdefault(root, []).append(coord)
|
||||||
|
|
||||||
|
# --- 6. Convert to net graph ---
|
||||||
|
net_graph: dict[str, list[dict[str, str]]] = {}
|
||||||
|
auto_counter = 0
|
||||||
|
|
||||||
|
for _root_coord, coords in groups.items():
|
||||||
|
pins_in_net: list[dict[str, str]] = []
|
||||||
|
for coord in coords:
|
||||||
|
pins_in_net.extend(pin_at.get(coord, []))
|
||||||
|
|
||||||
|
if not pins_in_net:
|
||||||
|
continue # wire-only or label-only group
|
||||||
|
|
||||||
|
# Find a label for the net name
|
||||||
|
net_name = None
|
||||||
|
for coord in coords:
|
||||||
|
if coord in label_at:
|
||||||
|
net_name = label_at[coord]
|
||||||
|
break
|
||||||
|
|
||||||
|
if net_name is None:
|
||||||
|
auto_counter += 1
|
||||||
|
net_name = f"Net-{auto_counter}"
|
||||||
|
|
||||||
|
# Merge into existing net (labels can unify multiple groups)
|
||||||
|
if net_name in net_graph:
|
||||||
|
net_graph[net_name].extend(pins_in_net)
|
||||||
|
else:
|
||||||
|
net_graph[net_name] = pins_in_net
|
||||||
|
|
||||||
|
# --- 7. Find unconnected pins ---
|
||||||
|
# A pin is unconnected if no wire endpoint and no label touches it
|
||||||
|
unconnected: list[dict[str, str]] = []
|
||||||
|
for ref, pin_num, coord in all_pins:
|
||||||
|
if coord not in wire_coords and coord not in label_at:
|
||||||
|
unconnected.append({"reference": ref, "pin": pin_num})
|
||||||
|
|
||||||
|
return net_graph, unconnected
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Tools
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
def run_schematic_erc(
|
||||||
|
schematic_path: str,
|
||||||
|
severity: str = "all",
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""Run an Electrical Rules Check (ERC) on a KiCad schematic.
|
||||||
|
|
||||||
|
Detects issues such as unconnected pins, conflicting pin types, missing
|
||||||
|
power flags, and label mismatches. Tries kicad-sch-api's
|
||||||
|
``ElectricalRulesChecker`` first, then falls back to kicad-cli.
|
||||||
|
|
||||||
|
The summary (pass/fail, counts by severity) is returned inline. The
|
||||||
|
full violation list is written to ``.mckicad/erc_violations.json`` when
|
||||||
|
it exceeds the inline threshold.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
schematic_path: Path to a .kicad_sch file.
|
||||||
|
severity: Filter violations by severity -- "all", "error", or
|
||||||
|
"warning". Defaults to "all".
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary with ``passed``, ``violation_count``, ``by_severity``,
|
||||||
|
and optionally ``detail_file`` when the full list is offloaded.
|
||||||
|
"""
|
||||||
|
verr = _validate_schematic_path(schematic_path)
|
||||||
|
if verr:
|
||||||
|
return verr
|
||||||
|
|
||||||
|
schematic_path = _expand(schematic_path)
|
||||||
|
severity = severity.lower().strip()
|
||||||
|
if severity not in ("all", "error", "warning"):
|
||||||
|
return {
|
||||||
|
"success": False,
|
||||||
|
"error": f"Invalid severity filter: {severity}. Must be 'all', 'error', or 'warning'.",
|
||||||
|
}
|
||||||
|
|
||||||
|
# --- Attempt 1: kicad-sch-api ---
|
||||||
|
if _HAS_SCH_API:
|
||||||
|
try:
|
||||||
|
from kicad_sch_api import ElectricalRulesChecker # type: ignore[attr-defined]
|
||||||
|
|
||||||
|
sch = _ksa_load(schematic_path)
|
||||||
|
checker = ElectricalRulesChecker(sch)
|
||||||
|
violations = checker.run()
|
||||||
|
|
||||||
|
logger.info("ERC via kicad-sch-api found %d violation(s)", len(violations))
|
||||||
|
return _format_erc_result(schematic_path, violations, severity, "kicad-sch-api")
|
||||||
|
|
||||||
|
except (ImportError, AttributeError) as exc:
|
||||||
|
logger.info(
|
||||||
|
"kicad-sch-api ElectricalRulesChecker not available (%s), trying kicad-cli",
|
||||||
|
exc,
|
||||||
|
)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("kicad-sch-api ERC failed (%s), trying kicad-cli", exc)
|
||||||
|
|
||||||
|
# --- Attempt 2: kicad-cli ---
|
||||||
|
cli_path, cli_err = _require_kicad_cli()
|
||||||
|
if cli_err:
|
||||||
|
# Neither engine is available
|
||||||
|
api_err = _require_sch_api()
|
||||||
|
if api_err:
|
||||||
|
return {
|
||||||
|
"success": False,
|
||||||
|
"error": (
|
||||||
|
"No ERC engine available. Install kicad-sch-api (uv add kicad-sch-api) "
|
||||||
|
"or ensure kicad-cli is on your PATH."
|
||||||
|
),
|
||||||
|
}
|
||||||
|
return cli_err
|
||||||
|
assert cli_path is not None
|
||||||
|
|
||||||
|
try:
|
||||||
|
with tempfile.TemporaryDirectory(prefix="mckicad_erc_") as tmp:
|
||||||
|
report_path = os.path.join(tmp, "erc_report.json")
|
||||||
|
|
||||||
|
cmd_args = ["sch", "erc", "--format", "json"]
|
||||||
|
if severity == "all":
|
||||||
|
cmd_args.append("--severity-all")
|
||||||
|
elif severity == "error":
|
||||||
|
cmd_args.extend(["--severity-error"])
|
||||||
|
elif severity == "warning":
|
||||||
|
cmd_args.extend(["--severity-warning"])
|
||||||
|
|
||||||
|
cmd_args.extend(["-o", report_path, schematic_path])
|
||||||
|
cmd = [cli_path] + cmd_args
|
||||||
|
|
||||||
|
logger.info("Running ERC via kicad-cli: %s", " ".join(cmd))
|
||||||
|
|
||||||
|
import subprocess
|
||||||
|
|
||||||
|
result = subprocess.run( # nosec B603
|
||||||
|
cmd,
|
||||||
|
capture_output=True,
|
||||||
|
text=True,
|
||||||
|
timeout=TIMEOUT_CONSTANTS["kicad_cli_export"],
|
||||||
|
check=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
if not os.path.isfile(report_path):
|
||||||
|
stderr = result.stderr.strip() if result.stderr else "ERC report not created"
|
||||||
|
return {"success": False, "error": f"kicad-cli ERC failed: {stderr}"}
|
||||||
|
|
||||||
|
with open(report_path) as f:
|
||||||
|
try:
|
||||||
|
report = json.load(f)
|
||||||
|
except json.JSONDecodeError as exc:
|
||||||
|
return {"success": False, "error": f"Failed to parse ERC JSON: {exc}"}
|
||||||
|
|
||||||
|
violations = report.get("violations", report.get("errors", []))
|
||||||
|
logger.info("ERC via kicad-cli found %d violation(s)", len(violations))
|
||||||
|
return _format_erc_result(schematic_path, violations, severity, "kicad-cli")
|
||||||
|
|
||||||
|
except Exception as exc:
|
||||||
|
logger.error("ERC check failed: %s", exc, exc_info=True)
|
||||||
|
return {"success": False, "error": str(exc)}
|
||||||
|
|
||||||
|
|
||||||
|
def _format_erc_result(
|
||||||
|
schematic_path: str,
|
||||||
|
violations: list[Any],
|
||||||
|
severity_filter: str,
|
||||||
|
engine: str,
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""Build the ERC return dict, offloading large violation lists to a sidecar file."""
|
||||||
|
# Normalise violation objects to dicts
|
||||||
|
normalised: list[dict[str, Any]] = []
|
||||||
|
for v in violations:
|
||||||
|
if isinstance(v, dict):
|
||||||
|
normalised.append(v)
|
||||||
|
else:
|
||||||
|
normalised.append({
|
||||||
|
"message": getattr(v, "message", str(v)),
|
||||||
|
"severity": getattr(v, "severity", "unknown"),
|
||||||
|
"location": getattr(v, "location", None),
|
||||||
|
})
|
||||||
|
|
||||||
|
# Apply severity filter for kicad-sch-api results (kicad-cli already filtered)
|
||||||
|
if severity_filter != "all" and engine == "kicad-sch-api":
|
||||||
|
normalised = [
|
||||||
|
v for v in normalised
|
||||||
|
if v.get("severity", "").lower() == severity_filter
|
||||||
|
]
|
||||||
|
|
||||||
|
# Count by severity
|
||||||
|
by_severity: dict[str, int] = {}
|
||||||
|
for v in normalised:
|
||||||
|
sev = str(v.get("severity", "unknown")).lower()
|
||||||
|
by_severity[sev] = by_severity.get(sev, 0) + 1
|
||||||
|
|
||||||
|
passed = len(normalised) == 0
|
||||||
|
|
||||||
|
result: dict[str, Any] = {
|
||||||
|
"success": True,
|
||||||
|
"passed": passed,
|
||||||
|
"violation_count": len(normalised),
|
||||||
|
"by_severity": by_severity,
|
||||||
|
"engine": engine,
|
||||||
|
"schematic_path": schematic_path,
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(normalised) > INLINE_RESULT_THRESHOLD:
|
||||||
|
detail_path = write_detail_file(schematic_path, "erc_violations.json", normalised)
|
||||||
|
result["detail_file"] = detail_path
|
||||||
|
result["violations_preview"] = normalised[:INLINE_RESULT_THRESHOLD]
|
||||||
|
else:
|
||||||
|
result["violations"] = normalised
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
def analyze_connectivity(schematic_path: str) -> dict[str, Any]:
|
||||||
|
"""Analyze the net connectivity graph of a KiCad schematic.
|
||||||
|
|
||||||
|
Walks every net in the schematic and reports the pins connected to each
|
||||||
|
net. Use this to understand how components are wired together or to
|
||||||
|
identify floating (unconnected) pins.
|
||||||
|
|
||||||
|
The summary (net count, connection count, unconnected pin count) is
|
||||||
|
returned inline. The full net-to-pin mapping is written to
|
||||||
|
``.mckicad/connectivity.json``.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
schematic_path: Path to a .kicad_sch file.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary with ``net_count``, ``connection_count``,
|
||||||
|
``unconnected_pins``, and ``detail_file``.
|
||||||
|
"""
|
||||||
|
err = _require_sch_api()
|
||||||
|
if err:
|
||||||
|
return err
|
||||||
|
|
||||||
|
verr = _validate_schematic_path(schematic_path)
|
||||||
|
if verr:
|
||||||
|
return verr
|
||||||
|
|
||||||
|
schematic_path = _expand(schematic_path)
|
||||||
|
|
||||||
|
try:
|
||||||
|
sch = _ksa_load(schematic_path)
|
||||||
|
net_graph, unconnected = _build_connectivity(sch)
|
||||||
|
total_connections = sum(len(pins) for pins in net_graph.values())
|
||||||
|
|
||||||
|
detail_path = write_detail_file(schematic_path, "connectivity.json", {
|
||||||
|
"nets": net_graph,
|
||||||
|
"unconnected_pins": unconnected,
|
||||||
|
})
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"Connectivity analysis: %d nets, %d connections, %d unconnected pins",
|
||||||
|
len(net_graph),
|
||||||
|
total_connections,
|
||||||
|
len(unconnected),
|
||||||
|
)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"success": True,
|
||||||
|
"net_count": len(net_graph),
|
||||||
|
"connection_count": total_connections,
|
||||||
|
"unconnected_pins": len(unconnected),
|
||||||
|
"detail_file": detail_path,
|
||||||
|
"engine": _get_schematic_engine(),
|
||||||
|
"schematic_path": schematic_path,
|
||||||
|
}
|
||||||
|
|
||||||
|
except Exception as exc:
|
||||||
|
logger.error("Connectivity analysis failed for %s: %s", schematic_path, exc, exc_info=True)
|
||||||
|
return {"success": False, "error": str(exc), "schematic_path": schematic_path}
|
||||||
|
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
def check_pin_connection(
|
||||||
|
schematic_path: str,
|
||||||
|
reference: str,
|
||||||
|
pin: str,
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""Check what a specific component pin is connected to.
|
||||||
|
|
||||||
|
Given a reference designator (e.g. ``U1``) and a pin identifier
|
||||||
|
(e.g. ``3`` or ``VCC``), returns the net name and a list of all other
|
||||||
|
pins on the same net.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
schematic_path: Path to a .kicad_sch file.
|
||||||
|
reference: Component reference designator (e.g. ``R1``, ``U3``).
|
||||||
|
pin: Pin identifier -- number or name (e.g. ``1``, ``A``, ``MOSI``).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary with ``net`` name and ``connected_to`` list of
|
||||||
|
``{reference, pin}`` dicts for every other pin on the same net.
|
||||||
|
"""
|
||||||
|
err = _require_sch_api()
|
||||||
|
if err:
|
||||||
|
return err
|
||||||
|
|
||||||
|
verr = _validate_schematic_path(schematic_path)
|
||||||
|
if verr:
|
||||||
|
return verr
|
||||||
|
|
||||||
|
schematic_path = _expand(schematic_path)
|
||||||
|
|
||||||
|
try:
|
||||||
|
sch = _ksa_load(schematic_path)
|
||||||
|
net_graph, _unconnected = _build_connectivity(sch)
|
||||||
|
|
||||||
|
# Find which net this pin belongs to
|
||||||
|
net_name = None
|
||||||
|
connected_pins: list[dict[str, str]] = []
|
||||||
|
|
||||||
|
for n_name, n_pins in net_graph.items():
|
||||||
|
for p in n_pins:
|
||||||
|
if p.get("reference") == reference and p.get("pin") == pin:
|
||||||
|
net_name = n_name
|
||||||
|
connected_pins = [
|
||||||
|
pp for pp in n_pins
|
||||||
|
if not (pp.get("reference") == reference and pp.get("pin") == pin)
|
||||||
|
]
|
||||||
|
break
|
||||||
|
if net_name is not None:
|
||||||
|
break
|
||||||
|
|
||||||
|
if net_name is None:
|
||||||
|
return {
|
||||||
|
"success": False,
|
||||||
|
"error": f"Pin {reference}.{pin} not found in any net",
|
||||||
|
"schematic_path": schematic_path,
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"Pin %s.%s is on net '%s' with %d other connection(s)",
|
||||||
|
reference, pin, net_name, len(connected_pins),
|
||||||
|
)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"success": True,
|
||||||
|
"reference": reference,
|
||||||
|
"pin": pin,
|
||||||
|
"net": net_name,
|
||||||
|
"connected_to": connected_pins,
|
||||||
|
"engine": _get_schematic_engine(),
|
||||||
|
"schematic_path": schematic_path,
|
||||||
|
}
|
||||||
|
|
||||||
|
except Exception as exc:
|
||||||
|
logger.error("check_pin_connection failed for %s.%s: %s", reference, pin, exc, exc_info=True)
|
||||||
|
return {"success": False, "error": str(exc), "schematic_path": schematic_path}
|
||||||
|
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
def verify_pins_connected(
|
||||||
|
schematic_path: str,
|
||||||
|
ref1: str,
|
||||||
|
pin1: str,
|
||||||
|
ref2: str,
|
||||||
|
pin2: str,
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""Check whether two component pins are electrically connected (on the same net).
|
||||||
|
|
||||||
|
A quick boolean probe -- returns ``connected: true`` or ``connected: false``
|
||||||
|
without dumping the entire net graph. Useful for verifying that a
|
||||||
|
specific wire was drawn correctly.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
schematic_path: Path to a .kicad_sch file.
|
||||||
|
ref1: First component reference designator (e.g. ``R1``).
|
||||||
|
pin1: Pin identifier on the first component (e.g. ``1``).
|
||||||
|
ref2: Second component reference designator (e.g. ``C3``).
|
||||||
|
pin2: Pin identifier on the second component (e.g. ``2``).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary with ``connected`` boolean, plus ``from`` and ``to``
|
||||||
|
endpoint descriptions.
|
||||||
|
"""
|
||||||
|
err = _require_sch_api()
|
||||||
|
if err:
|
||||||
|
return err
|
||||||
|
|
||||||
|
verr = _validate_schematic_path(schematic_path)
|
||||||
|
if verr:
|
||||||
|
return verr
|
||||||
|
|
||||||
|
schematic_path = _expand(schematic_path)
|
||||||
|
|
||||||
|
try:
|
||||||
|
sch = _ksa_load(schematic_path)
|
||||||
|
net_graph, _unconnected = _build_connectivity(sch)
|
||||||
|
|
||||||
|
# Find the net for each pin
|
||||||
|
net_for_pin1 = None
|
||||||
|
net_for_pin2 = None
|
||||||
|
|
||||||
|
for n_name, n_pins in net_graph.items():
|
||||||
|
for p in n_pins:
|
||||||
|
if p.get("reference") == ref1 and p.get("pin") == pin1:
|
||||||
|
net_for_pin1 = n_name
|
||||||
|
if p.get("reference") == ref2 and p.get("pin") == pin2:
|
||||||
|
net_for_pin2 = n_name
|
||||||
|
if net_for_pin1 and net_for_pin2:
|
||||||
|
break
|
||||||
|
|
||||||
|
connected = (
|
||||||
|
net_for_pin1 is not None
|
||||||
|
and net_for_pin2 is not None
|
||||||
|
and net_for_pin1 == net_for_pin2
|
||||||
|
)
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"verify_pins_connected: %s.%s <-> %s.%s = %s",
|
||||||
|
ref1, pin1, ref2, pin2, connected,
|
||||||
|
)
|
||||||
|
|
||||||
|
result: dict[str, Any] = {
|
||||||
|
"success": True,
|
||||||
|
"connected": connected,
|
||||||
|
"from": {"reference": ref1, "pin": pin1},
|
||||||
|
"to": {"reference": ref2, "pin": pin2},
|
||||||
|
"engine": _get_schematic_engine(),
|
||||||
|
"schematic_path": schematic_path,
|
||||||
|
}
|
||||||
|
if connected and net_for_pin1:
|
||||||
|
result["net"] = net_for_pin1
|
||||||
|
return result
|
||||||
|
|
||||||
|
except Exception as exc:
|
||||||
|
logger.error(
|
||||||
|
"verify_pins_connected failed for %s.%s <-> %s.%s: %s",
|
||||||
|
ref1, pin1, ref2, pin2, exc, exc_info=True,
|
||||||
|
)
|
||||||
|
return {"success": False, "error": str(exc), "schematic_path": schematic_path}
|
||||||
|
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
def get_component_pins(
|
||||||
|
schematic_path: str,
|
||||||
|
reference: str,
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""List all pins for a specific component in a KiCad schematic.
|
||||||
|
|
||||||
|
Returns pin names/numbers, electrical types, positions, and connection
|
||||||
|
status for the component identified by its reference designator.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
schematic_path: Path to a .kicad_sch file.
|
||||||
|
reference: Component reference designator (e.g. ``U1``, ``R3``).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary with ``reference``, ``pin_count``, and ``pins`` list
|
||||||
|
where each entry describes one pin.
|
||||||
|
"""
|
||||||
|
err = _require_sch_api()
|
||||||
|
if err:
|
||||||
|
return err
|
||||||
|
|
||||||
|
verr = _validate_schematic_path(schematic_path)
|
||||||
|
if verr:
|
||||||
|
return verr
|
||||||
|
|
||||||
|
schematic_path = _expand(schematic_path)
|
||||||
|
|
||||||
|
try:
|
||||||
|
sch = _ksa_load(schematic_path)
|
||||||
|
|
||||||
|
# Locate the component
|
||||||
|
comp = None
|
||||||
|
components_attr = getattr(sch, "components", None)
|
||||||
|
|
||||||
|
if components_attr is not None:
|
||||||
|
# Try dict-style access
|
||||||
|
if hasattr(components_attr, "get"):
|
||||||
|
comp = components_attr.get(reference)
|
||||||
|
|
||||||
|
# Fall back to iteration
|
||||||
|
if comp is None:
|
||||||
|
for c in components_attr:
|
||||||
|
if getattr(c, "reference", None) == reference:
|
||||||
|
comp = c
|
||||||
|
break
|
||||||
|
|
||||||
|
if comp is None:
|
||||||
|
return {
|
||||||
|
"success": False,
|
||||||
|
"error": f"Component '{reference}' not found in schematic",
|
||||||
|
"schematic_path": schematic_path,
|
||||||
|
}
|
||||||
|
|
||||||
|
# Extract pins — comp.pins may be empty on loaded schematics because
|
||||||
|
# kicad-sch-api doesn't back-populate from lib_symbols on load.
|
||||||
|
# Use get_symbol_definition().pins for metadata and
|
||||||
|
# sch.list_component_pins() for schematic-transformed positions.
|
||||||
|
pins_data: list[dict[str, Any]] = []
|
||||||
|
|
||||||
|
# Build a position lookup from list_component_pins (schematic coords)
|
||||||
|
pin_positions: dict[str, Any] = {}
|
||||||
|
with contextlib.suppress(Exception):
|
||||||
|
for pin_num, pos in sch.list_component_pins(reference):
|
||||||
|
pin_positions[str(pin_num)] = pos
|
||||||
|
|
||||||
|
# Get pin metadata from the symbol definition (always populated)
|
||||||
|
raw_pins = None
|
||||||
|
with contextlib.suppress(Exception):
|
||||||
|
sym_def = comp.get_symbol_definition()
|
||||||
|
if sym_def is not None:
|
||||||
|
raw_pins = sym_def.pins
|
||||||
|
|
||||||
|
# Fall back to comp.pins / comp.list_pins() if symbol def unavailable
|
||||||
|
if not raw_pins:
|
||||||
|
if hasattr(comp, "list_pins"):
|
||||||
|
with contextlib.suppress(Exception):
|
||||||
|
raw_pins = comp.list_pins()
|
||||||
|
if not raw_pins:
|
||||||
|
raw_pins = getattr(comp, "pins", [])
|
||||||
|
|
||||||
|
for p in raw_pins:
|
||||||
|
pin_entry: dict[str, Any] = {}
|
||||||
|
|
||||||
|
if isinstance(p, dict):
|
||||||
|
pin_entry = p
|
||||||
|
else:
|
||||||
|
pin_num = str(getattr(p, "number", getattr(p, "pin_number", "")))
|
||||||
|
pin_entry["number"] = pin_num
|
||||||
|
pin_entry["name"] = getattr(p, "name", getattr(p, "pin_name", None))
|
||||||
|
pin_entry["type"] = str(getattr(p, "pin_type", getattr(p, "electrical_type", getattr(p, "type", None))))
|
||||||
|
|
||||||
|
# Prefer schematic-transformed position over local pin position
|
||||||
|
if pin_num in pin_positions:
|
||||||
|
sch_pos = pin_positions[pin_num]
|
||||||
|
pin_entry["position"] = {"x": sch_pos.x, "y": sch_pos.y}
|
||||||
|
else:
|
||||||
|
local_pos = getattr(p, "position", None)
|
||||||
|
if local_pos is not None:
|
||||||
|
if isinstance(local_pos, (list, tuple)) and len(local_pos) >= 2:
|
||||||
|
pin_entry["position"] = {"x": local_pos[0], "y": local_pos[1]}
|
||||||
|
elif hasattr(local_pos, "x"):
|
||||||
|
pin_entry["position"] = {"x": local_pos.x, "y": local_pos.y}
|
||||||
|
else:
|
||||||
|
pin_entry["position"] = str(local_pos)
|
||||||
|
|
||||||
|
is_connected = getattr(p, "connected", getattr(p, "is_connected", None))
|
||||||
|
if is_connected is not None:
|
||||||
|
pin_entry["connected"] = bool(is_connected)
|
||||||
|
|
||||||
|
net = getattr(p, "net", getattr(p, "net_name", None))
|
||||||
|
if net is not None:
|
||||||
|
pin_entry["net"] = str(net)
|
||||||
|
|
||||||
|
pins_data.append(pin_entry)
|
||||||
|
|
||||||
|
logger.info("Component %s has %d pin(s)", reference, len(pins_data))
|
||||||
|
|
||||||
|
return {
|
||||||
|
"success": True,
|
||||||
|
"reference": reference,
|
||||||
|
"lib_id": getattr(comp, "lib_id", None),
|
||||||
|
"value": getattr(comp, "value", None),
|
||||||
|
"pin_count": len(pins_data),
|
||||||
|
"pins": pins_data,
|
||||||
|
"engine": _get_schematic_engine(),
|
||||||
|
"schematic_path": schematic_path,
|
||||||
|
}
|
||||||
|
|
||||||
|
except Exception as exc:
|
||||||
|
logger.error("get_component_pins failed for %s: %s", reference, exc, exc_info=True)
|
||||||
|
return {"success": False, "error": str(exc), "schematic_path": schematic_path}
|
||||||
|
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
def export_netlist(
|
||||||
|
schematic_path: str,
|
||||||
|
output_path: str | None = None,
|
||||||
|
format: str = "kicad",
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""Export a netlist from a KiCad schematic via kicad-cli.
|
||||||
|
|
||||||
|
Supported formats: ``kicad`` (default), ``spice``, ``cadstar``,
|
||||||
|
``allegro``, ``pads``, ``orcadpcb2``. The output file is written to
|
||||||
|
the ``.mckicad/`` sidecar directory by default.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
schematic_path: Path to a .kicad_sch file.
|
||||||
|
output_path: Destination file path. Defaults to
|
||||||
|
``.mckicad/netlist.<ext>`` next to the schematic.
|
||||||
|
format: Netlist format name.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary with ``output_path`` and ``format``.
|
||||||
|
"""
|
||||||
|
# Validate format first (cheap check, no filesystem access)
|
||||||
|
allowed_formats = ("kicad", "spice", "cadstar", "allegro", "pads", "orcadpcb2")
|
||||||
|
fmt = format.lower().strip()
|
||||||
|
if fmt not in allowed_formats:
|
||||||
|
return {
|
||||||
|
"success": False,
|
||||||
|
"error": f"Unsupported netlist format: {format}. Allowed: {', '.join(allowed_formats)}",
|
||||||
|
}
|
||||||
|
|
||||||
|
verr = _validate_schematic_path(schematic_path)
|
||||||
|
if verr:
|
||||||
|
return verr
|
||||||
|
|
||||||
|
schematic_path = _expand(schematic_path)
|
||||||
|
|
||||||
|
cli_path, cli_err = _require_kicad_cli()
|
||||||
|
if cli_err:
|
||||||
|
return cli_err
|
||||||
|
assert cli_path is not None
|
||||||
|
|
||||||
|
# Determine file extension based on format
|
||||||
|
ext_map = {
|
||||||
|
"kicad": ".net",
|
||||||
|
"spice": ".cir",
|
||||||
|
"cadstar": ".frp",
|
||||||
|
"allegro": ".net",
|
||||||
|
"pads": ".asc",
|
||||||
|
"orcadpcb2": ".net",
|
||||||
|
}
|
||||||
|
extension = ext_map.get(fmt, ".net")
|
||||||
|
|
||||||
|
if output_path:
|
||||||
|
output_path = _expand(output_path)
|
||||||
|
os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
|
||||||
|
else:
|
||||||
|
output_path = _default_output_path(schematic_path, f"netlist{extension}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
import subprocess
|
||||||
|
|
||||||
|
cmd = [cli_path, "sch", "export", "netlist", "-f", fmt, "-o", output_path, schematic_path]
|
||||||
|
|
||||||
|
logger.info("Exporting netlist: %s", " ".join(cmd))
|
||||||
|
|
||||||
|
result = subprocess.run( # nosec B603
|
||||||
|
cmd,
|
||||||
|
capture_output=True,
|
||||||
|
text=True,
|
||||||
|
timeout=TIMEOUT_CONSTANTS["kicad_cli_export"],
|
||||||
|
check=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
if result.returncode != 0:
|
||||||
|
stderr = result.stderr.strip() if result.stderr else "Netlist export failed"
|
||||||
|
logger.error("Netlist export failed (rc=%d): %s", result.returncode, stderr)
|
||||||
|
return {"success": False, "error": f"kicad-cli netlist export failed: {stderr}"}
|
||||||
|
|
||||||
|
if not os.path.isfile(output_path):
|
||||||
|
return {"success": False, "error": "Netlist output file was not created"}
|
||||||
|
|
||||||
|
file_size = os.path.getsize(output_path)
|
||||||
|
logger.info("Netlist exported: %s (%d bytes, format=%s)", output_path, file_size, fmt)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"success": True,
|
||||||
|
"output_path": output_path,
|
||||||
|
"format": fmt,
|
||||||
|
"file_size": file_size,
|
||||||
|
"schematic_path": schematic_path,
|
||||||
|
}
|
||||||
|
|
||||||
|
except Exception as exc:
|
||||||
|
logger.error("Netlist export failed: %s", exc, exc_info=True)
|
||||||
|
return {"success": False, "error": str(exc), "schematic_path": schematic_path}
|
||||||
|
|
||||||
|
|
||||||
|
@mcp.tool()
|
||||||
|
def export_schematic_pdf(
|
||||||
|
schematic_path: str,
|
||||||
|
output_path: str | None = None,
|
||||||
|
black_and_white: bool = False,
|
||||||
|
exclude_background: bool = False,
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""Export a KiCad schematic to PDF via kicad-cli.
|
||||||
|
|
||||||
|
Renders all schematic sheets into a single PDF document. Useful for
|
||||||
|
design reviews, archival, or sharing with collaborators who don't
|
||||||
|
have KiCad installed.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
schematic_path: Path to a .kicad_sch file (top-level sheet).
|
||||||
|
output_path: Destination PDF path. Defaults to
|
||||||
|
``.mckicad/schematic.pdf`` next to the schematic.
|
||||||
|
black_and_white: Render in black and white instead of colour.
|
||||||
|
exclude_background: Omit the background fill from the PDF.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Dictionary with ``output_path`` and file size.
|
||||||
|
"""
|
||||||
|
verr = _validate_schematic_path(schematic_path)
|
||||||
|
if verr:
|
||||||
|
return verr
|
||||||
|
|
||||||
|
schematic_path = _expand(schematic_path)
|
||||||
|
|
||||||
|
cli_path, cli_err = _require_kicad_cli()
|
||||||
|
if cli_err:
|
||||||
|
return cli_err
|
||||||
|
assert cli_path is not None
|
||||||
|
|
||||||
|
if output_path:
|
||||||
|
output_path = _expand(output_path)
|
||||||
|
os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
|
||||||
|
else:
|
||||||
|
output_path = _default_output_path(schematic_path, "schematic.pdf")
|
||||||
|
|
||||||
|
try:
|
||||||
|
import subprocess
|
||||||
|
|
||||||
|
cmd = [cli_path, "sch", "export", "pdf"]
|
||||||
|
|
||||||
|
if black_and_white:
|
||||||
|
cmd.append("--black-and-white")
|
||||||
|
if exclude_background:
|
||||||
|
cmd.append("--no-background-color")
|
||||||
|
|
||||||
|
cmd.extend(["-o", output_path, schematic_path])
|
||||||
|
|
||||||
|
logger.info("Exporting schematic PDF: %s", " ".join(cmd))
|
||||||
|
|
||||||
|
result = subprocess.run( # nosec B603
|
||||||
|
cmd,
|
||||||
|
capture_output=True,
|
||||||
|
text=True,
|
||||||
|
timeout=TIMEOUT_CONSTANTS["kicad_cli_export"],
|
||||||
|
check=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
if result.returncode != 0:
|
||||||
|
stderr = result.stderr.strip() if result.stderr else "PDF export failed"
|
||||||
|
logger.error("Schematic PDF export failed (rc=%d): %s", result.returncode, stderr)
|
||||||
|
return {"success": False, "error": f"kicad-cli PDF export failed: {stderr}"}
|
||||||
|
|
||||||
|
if not os.path.isfile(output_path):
|
||||||
|
return {"success": False, "error": "PDF output file was not created"}
|
||||||
|
|
||||||
|
file_size = os.path.getsize(output_path)
|
||||||
|
logger.info("Schematic PDF exported: %s (%d bytes)", output_path, file_size)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"success": True,
|
||||||
|
"output_path": output_path,
|
||||||
|
"file_size": file_size,
|
||||||
|
"black_and_white": black_and_white,
|
||||||
|
"exclude_background": exclude_background,
|
||||||
|
"schematic_path": schematic_path,
|
||||||
|
}
|
||||||
|
|
||||||
|
except Exception as exc:
|
||||||
|
logger.error("Schematic PDF export failed: %s", exc, exc_info=True)
|
||||||
|
return {"success": False, "error": str(exc), "schematic_path": schematic_path}
|
||||||
@ -3,6 +3,7 @@
|
|||||||
import os
|
import os
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
from tests.conftest import requires_sch_api
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.unit
|
@pytest.mark.unit
|
||||||
@ -46,3 +47,101 @@ def test_list_components_empty_schematic(tmp_output_dir):
|
|||||||
result = list_components(schematic_path=path)
|
result = list_components(schematic_path=path)
|
||||||
if result["success"]:
|
if result["success"]:
|
||||||
assert result.get("count", 0) == 0
|
assert result.get("count", 0) == 0
|
||||||
|
|
||||||
|
|
||||||
|
@requires_sch_api
|
||||||
|
@pytest.mark.unit
|
||||||
|
def test_list_components_single_lookup(populated_schematic):
|
||||||
|
"""list_components with reference param should return one component."""
|
||||||
|
from mckicad.tools.schematic import list_components
|
||||||
|
|
||||||
|
result = list_components(schematic_path=populated_schematic, reference="R1")
|
||||||
|
assert result["success"] is True
|
||||||
|
assert result["count"] == 1
|
||||||
|
assert result["components"][0]["reference"] == "R1"
|
||||||
|
|
||||||
|
|
||||||
|
@requires_sch_api
|
||||||
|
@pytest.mark.unit
|
||||||
|
def test_list_components_single_lookup_not_found(populated_schematic):
|
||||||
|
"""list_components with nonexistent reference should fail."""
|
||||||
|
from mckicad.tools.schematic import list_components
|
||||||
|
|
||||||
|
result = list_components(schematic_path=populated_schematic, reference="Z99")
|
||||||
|
assert result["success"] is False
|
||||||
|
|
||||||
|
|
||||||
|
@requires_sch_api
|
||||||
|
@pytest.mark.unit
|
||||||
|
def test_get_schematic_info_compact(populated_schematic):
|
||||||
|
"""get_schematic_info should return compact output."""
|
||||||
|
from mckicad.tools.schematic import get_schematic_info
|
||||||
|
|
||||||
|
result = get_schematic_info(schematic_path=populated_schematic)
|
||||||
|
assert result["success"] is True
|
||||||
|
assert "statistics" in result
|
||||||
|
assert "validation" in result
|
||||||
|
assert "unique_symbol_count" in result
|
||||||
|
|
||||||
|
|
||||||
|
@requires_sch_api
|
||||||
|
@pytest.mark.unit
|
||||||
|
def test_get_component_detail(populated_schematic):
|
||||||
|
"""get_component_detail should return full info for one component."""
|
||||||
|
from mckicad.tools.schematic import get_component_detail
|
||||||
|
|
||||||
|
result = get_component_detail(schematic_path=populated_schematic, reference="R1")
|
||||||
|
assert result["success"] is True
|
||||||
|
assert result["component"]["reference"] == "R1"
|
||||||
|
assert result["component"]["lib_id"] == "Device:R"
|
||||||
|
|
||||||
|
|
||||||
|
@requires_sch_api
|
||||||
|
@pytest.mark.unit
|
||||||
|
def test_get_component_detail_not_found(populated_schematic):
|
||||||
|
"""get_component_detail for missing component should fail."""
|
||||||
|
from mckicad.tools.schematic import get_component_detail
|
||||||
|
|
||||||
|
result = get_component_detail(schematic_path=populated_schematic, reference="Z99")
|
||||||
|
assert result["success"] is False
|
||||||
|
|
||||||
|
|
||||||
|
@requires_sch_api
|
||||||
|
@pytest.mark.unit
|
||||||
|
def test_get_schematic_hierarchy(populated_schematic):
|
||||||
|
"""get_schematic_hierarchy should return at least the root sheet."""
|
||||||
|
from mckicad.tools.schematic import get_schematic_hierarchy
|
||||||
|
|
||||||
|
result = get_schematic_hierarchy(schematic_path=populated_schematic)
|
||||||
|
assert result["success"] is True
|
||||||
|
assert result["hierarchy"]["name"] == "root"
|
||||||
|
assert result["hierarchy"]["total_sheets"] >= 1
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.unit
|
||||||
|
def test_file_output_infrastructure(tmp_output_dir):
|
||||||
|
"""write_detail_file should create .mckicad sidecar directory and file."""
|
||||||
|
from mckicad.utils.file_utils import write_detail_file
|
||||||
|
|
||||||
|
fake_sch = os.path.join(tmp_output_dir, "test.kicad_sch")
|
||||||
|
# File doesn't need to exist for write_detail_file
|
||||||
|
open(fake_sch, "w").close()
|
||||||
|
|
||||||
|
data = {"test": "data", "items": [1, 2, 3]}
|
||||||
|
path = write_detail_file(fake_sch, "test_output.json", data)
|
||||||
|
|
||||||
|
assert os.path.isfile(path)
|
||||||
|
assert ".mckicad" in path
|
||||||
|
assert path.endswith("test_output.json")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.unit
|
||||||
|
def test_file_output_cwd_fallback(tmp_output_dir, monkeypatch):
|
||||||
|
"""write_detail_file with None path should use CWD."""
|
||||||
|
from mckicad.utils.file_utils import write_detail_file
|
||||||
|
|
||||||
|
monkeypatch.chdir(tmp_output_dir)
|
||||||
|
path = write_detail_file(None, "test_cwd.json", {"test": True})
|
||||||
|
|
||||||
|
assert os.path.isfile(path)
|
||||||
|
assert ".mckicad" in path
|
||||||
|
|||||||
139
tests/test_schematic_analysis.py
Normal file
139
tests/test_schematic_analysis.py
Normal file
@ -0,0 +1,139 @@
|
|||||||
|
"""Tests for schematic analysis tools."""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from tests.conftest import requires_sch_api
|
||||||
|
|
||||||
|
|
||||||
|
@requires_sch_api
|
||||||
|
@pytest.mark.unit
|
||||||
|
class TestRunSchematicErc:
|
||||||
|
"""Tests for the run_schematic_erc tool."""
|
||||||
|
|
||||||
|
def test_erc_on_populated_schematic(self, populated_schematic):
|
||||||
|
from mckicad.tools.schematic_analysis import run_schematic_erc
|
||||||
|
|
||||||
|
result = run_schematic_erc(schematic_path=populated_schematic)
|
||||||
|
assert result["success"] is True
|
||||||
|
assert "violation_count" in result or "error" not in result
|
||||||
|
|
||||||
|
def test_erc_invalid_path(self):
|
||||||
|
from mckicad.tools.schematic_analysis import run_schematic_erc
|
||||||
|
|
||||||
|
result = run_schematic_erc(schematic_path="/tmp/nonexistent.kicad_sch")
|
||||||
|
assert result["success"] is False
|
||||||
|
|
||||||
|
|
||||||
|
@requires_sch_api
|
||||||
|
@pytest.mark.unit
|
||||||
|
class TestAnalyzeConnectivity:
|
||||||
|
"""Tests for the analyze_connectivity tool."""
|
||||||
|
|
||||||
|
def test_connectivity_on_populated(self, populated_schematic):
|
||||||
|
from mckicad.tools.schematic_analysis import analyze_connectivity
|
||||||
|
|
||||||
|
result = analyze_connectivity(schematic_path=populated_schematic)
|
||||||
|
assert result["success"] is True
|
||||||
|
assert "net_count" in result or "error" not in result
|
||||||
|
|
||||||
|
def test_connectivity_invalid_path(self):
|
||||||
|
from mckicad.tools.schematic_analysis import analyze_connectivity
|
||||||
|
|
||||||
|
result = analyze_connectivity(schematic_path="/tmp/nonexistent.kicad_sch")
|
||||||
|
assert result["success"] is False
|
||||||
|
|
||||||
|
|
||||||
|
@requires_sch_api
|
||||||
|
@pytest.mark.unit
|
||||||
|
class TestCheckPinConnection:
|
||||||
|
"""Tests for the check_pin_connection tool."""
|
||||||
|
|
||||||
|
def test_check_existing_pin(self, populated_schematic):
|
||||||
|
from mckicad.tools.schematic_analysis import check_pin_connection
|
||||||
|
|
||||||
|
result = check_pin_connection(
|
||||||
|
schematic_path=populated_schematic,
|
||||||
|
reference="R1",
|
||||||
|
pin="1",
|
||||||
|
)
|
||||||
|
# May succeed or fail depending on kicad-sch-api version
|
||||||
|
assert "success" in result
|
||||||
|
|
||||||
|
def test_check_nonexistent_pin(self, populated_schematic):
|
||||||
|
from mckicad.tools.schematic_analysis import check_pin_connection
|
||||||
|
|
||||||
|
result = check_pin_connection(
|
||||||
|
schematic_path=populated_schematic,
|
||||||
|
reference="Z99",
|
||||||
|
pin="1",
|
||||||
|
)
|
||||||
|
assert "success" in result
|
||||||
|
|
||||||
|
|
||||||
|
@requires_sch_api
|
||||||
|
@pytest.mark.unit
|
||||||
|
class TestVerifyPinsConnected:
|
||||||
|
"""Tests for the verify_pins_connected tool."""
|
||||||
|
|
||||||
|
def test_verify_two_pins(self, populated_schematic):
|
||||||
|
from mckicad.tools.schematic_analysis import verify_pins_connected
|
||||||
|
|
||||||
|
result = verify_pins_connected(
|
||||||
|
schematic_path=populated_schematic,
|
||||||
|
ref1="R1",
|
||||||
|
pin1="1",
|
||||||
|
ref2="R2",
|
||||||
|
pin2="1",
|
||||||
|
)
|
||||||
|
# May succeed or fail depending on kicad-sch-api version
|
||||||
|
assert "success" in result
|
||||||
|
|
||||||
|
|
||||||
|
@requires_sch_api
|
||||||
|
@pytest.mark.unit
|
||||||
|
class TestGetComponentPins:
|
||||||
|
"""Tests for the get_component_pins tool."""
|
||||||
|
|
||||||
|
def test_get_pins(self, populated_schematic):
|
||||||
|
from mckicad.tools.schematic_analysis import get_component_pins
|
||||||
|
|
||||||
|
result = get_component_pins(
|
||||||
|
schematic_path=populated_schematic,
|
||||||
|
reference="R1",
|
||||||
|
)
|
||||||
|
assert "success" in result
|
||||||
|
|
||||||
|
def test_get_pins_nonexistent(self, populated_schematic):
|
||||||
|
from mckicad.tools.schematic_analysis import get_component_pins
|
||||||
|
|
||||||
|
result = get_component_pins(
|
||||||
|
schematic_path=populated_schematic,
|
||||||
|
reference="Z99",
|
||||||
|
)
|
||||||
|
assert result["success"] is False
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.unit
|
||||||
|
class TestExportValidation:
|
||||||
|
"""Tests for input validation in export tools."""
|
||||||
|
|
||||||
|
def test_export_netlist_invalid_path(self):
|
||||||
|
from mckicad.tools.schematic_analysis import export_netlist
|
||||||
|
|
||||||
|
result = export_netlist(schematic_path="/tmp/nonexistent.kicad_sch")
|
||||||
|
assert result["success"] is False
|
||||||
|
|
||||||
|
def test_export_pdf_invalid_path(self):
|
||||||
|
from mckicad.tools.schematic_analysis import export_schematic_pdf
|
||||||
|
|
||||||
|
result = export_schematic_pdf(schematic_path="/tmp/nonexistent.kicad_sch")
|
||||||
|
assert result["success"] is False
|
||||||
|
|
||||||
|
def test_export_netlist_bad_format(self):
|
||||||
|
from mckicad.tools.schematic_analysis import export_netlist
|
||||||
|
|
||||||
|
result = export_netlist(
|
||||||
|
schematic_path="/tmp/test.kicad_sch",
|
||||||
|
format="invalid_format",
|
||||||
|
)
|
||||||
|
assert result["success"] is False
|
||||||
|
assert "Unsupported" in result.get("error", "")
|
||||||
Loading…
x
Reference in New Issue
Block a user