From 61ed7b3efeb2891dbbafc1345f86b6df85d720a2 Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Wed, 4 Mar 2026 23:12:13 -0700 Subject: [PATCH] Add wire auditing, bulk wire removal, and net-to-pin verification tools Refactors _build_connectivity() into a two-layer state builder so the union-find internals (pin_at, label_at, wire_segments) are accessible to new analysis tools without duplicating the 200-line connectivity engine. New tools: - audit_wiring: trace all wires connected to a component, report per-pin net membership with wire segment coordinates and connected pins - remove_wires_by_criteria: bulk-remove wires by coordinate filters (y, x, min/max ranges, tolerance) with dry_run preview support - verify_connectivity: compare actual wiring against an expected net-to-pin mapping, report matches/mismatches/missing nets New sexp_parser utilities: - parse_wire_segments: extract (wire ...) blocks with start/end/uuid - remove_sexp_blocks_by_uuid: atomically remove blocks by UUID set --- src/mckicad/tools/schematic_analysis.py | 346 +++++++++++++++++++++++- src/mckicad/tools/schematic_edit.py | 138 ++++++++++ src/mckicad/utils/sexp_parser.py | 136 ++++++++++ tests/test_schematic_analysis.py | 123 +++++++++ tests/test_schematic_edit.py | 170 ++++++++++++ tests/test_sexp_parser.py | 139 ++++++++++ 6 files changed, 1043 insertions(+), 9 deletions(-) diff --git a/src/mckicad/tools/schematic_analysis.py b/src/mckicad/tools/schematic_analysis.py index b69cfb7..ca91a80 100644 --- a/src/mckicad/tools/schematic_analysis.py +++ b/src/mckicad/tools/schematic_analysis.py @@ -23,6 +23,9 @@ from mckicad.utils.sexp_parser import ( parse_lib_symbol_pins, transform_pin_to_schematic, ) +from mckicad.utils.sexp_parser import ( + parse_wire_segments as parse_wire_segments_sexp, +) logger = logging.getLogger(__name__) @@ -111,15 +114,19 @@ def _default_output_path(schematic_path: str, filename: str) -> str: # --------------------------------------------------------------------------- -def _build_connectivity( +def _build_connectivity_state( sch: Any, schematic_path: str | None = None -) -> tuple[dict[str, list[dict[str, str]]], list[dict[str, str]]]: - """Build a net connectivity graph by walking wires, pin positions, and labels. +) -> dict[str, Any]: + """Build the full connectivity state by walking wires, pin positions, and labels. + + This is the inner engine that returns all intermediate data structures. + Use :func:`_build_connectivity` for the simpler ``(net_graph, unconnected)`` + return signature. kicad-sch-api does not auto-compute nets on loaded schematics. This function reconstructs the connectivity by: - 1. Collecting all wire start/end coordinates + 1. Collecting all wire start/end coordinates (and wire segment metadata) 2. Mapping component pin positions to coordinates 3. Mapping label positions to coordinates (local, hierarchical, global) 4. Using union-find to group touching points into nets @@ -127,12 +134,20 @@ def _build_connectivity( When *schematic_path* is provided, ``(global_label ...)`` entries are also extracted directly from the raw file via :mod:`sexp_parser`, since - kicad-sch-api does not expose them. + kicad-sch-api does not expose them. Wire UUIDs are also parsed from + the raw file for segment tracking. Returns: - (net_graph, unconnected_pins) where net_graph maps net names to - lists of {reference, pin} dicts, and unconnected_pins lists - pins not connected to any wire or label. + Dict with keys: + - ``find``: union-find lookup function + - ``parent``: raw union-find parent map + - ``pin_at``: coord → list of {reference, pin} dicts + - ``label_at``: coord → net name string + - ``wire_segments``: list of {start, end, uuid} dicts + - ``wire_coords``: set of all wire endpoint coordinates + - ``all_pins``: list of (reference, pin_num, coord) tuples + - ``net_graph``: net_name → list of {reference, pin} dicts + - ``unconnected_pins``: list of {reference, pin} dicts """ # -- Union-Find -- _parent: dict[tuple[float, float], tuple[float, float]] = {} @@ -164,6 +179,17 @@ def _build_connectivity( # --- 1. Process wires — each wire connects its start and end --- wire_coords: set[tuple[float, float]] = set() + wire_segments: list[dict[str, Any]] = [] + + # Build a UUID lookup from raw file if available (kicad-sch-api + # doesn't reliably expose wire UUIDs) + sexp_wire_lookup: dict[tuple[tuple[float, float], tuple[float, float]], str] = {} + if schematic_path: + for sw in parse_wire_segments_sexp(schematic_path): + s_key = _rc(sw["start"]["x"], sw["start"]["y"]) + e_key = _rc(sw["end"]["x"], sw["end"]["y"]) + sexp_wire_lookup[(s_key, e_key)] = sw["uuid"] + for wire in getattr(sch, "wires", []): start = getattr(wire, "start", None) end = getattr(wire, "end", None) @@ -177,6 +203,16 @@ def _build_connectivity( wire_coords.add(s) wire_coords.add(e) + # Track wire segment metadata for audit tool + wire_uuid = getattr(wire, "uuid", None) + if wire_uuid is None: + wire_uuid = sexp_wire_lookup.get((s, e)) + wire_segments.append({ + "start": {"x": s[0], "y": s[1]}, + "end": {"x": e[0], "y": e[1]}, + "uuid": str(wire_uuid) if wire_uuid else None, + }) + # --- 2. Map component pin positions to coordinates --- pin_at: dict[tuple[float, float], list[dict[str, str]]] = {} all_pins: list[tuple[str, str, tuple[float, float]]] = [] @@ -333,7 +369,29 @@ def _build_connectivity( if coord not in wire_coords and coord not in label_at: unconnected.append({"reference": ref, "pin": pin_num}) - return net_graph, unconnected + return { + "find": _find, + "parent": _parent, + "pin_at": pin_at, + "label_at": label_at, + "wire_segments": wire_segments, + "wire_coords": wire_coords, + "all_pins": all_pins, + "net_graph": net_graph, + "unconnected_pins": unconnected, + } + + +def _build_connectivity( + sch: Any, schematic_path: str | None = None +) -> tuple[dict[str, list[dict[str, str]]], list[dict[str, str]]]: + """Build a net connectivity graph by walking wires, pin positions, and labels. + + Thin wrapper around :func:`_build_connectivity_state` that returns only + the ``(net_graph, unconnected_pins)`` tuple for backward compatibility. + """ + state = _build_connectivity_state(sch, schematic_path) + return state["net_graph"], state["unconnected_pins"] # --------------------------------------------------------------------------- @@ -1078,3 +1136,273 @@ def export_schematic_pdf( except Exception as exc: logger.error("Schematic PDF export failed: %s", exc, exc_info=True) return {"success": False, "error": str(exc), "schematic_path": schematic_path} + + +@mcp.tool() +def audit_wiring(schematic_path: str, reference: str) -> dict[str, Any]: + """Trace all wires connected to a component and report per-pin net membership. + + For each pin on the specified component, reports the net it belongs to, + the wire segments in that net, and other pins sharing the same net. + Essential for debugging wiring errors — especially overlapping wire + segments that create unintended shorts. + + Args: + schematic_path: Path to a .kicad_sch file. + reference: Component reference designator (e.g. ``U8``, ``R1``). + + Returns: + Dictionary with ``reference``, ``pin_nets`` list (per-pin net info + with wire segments and connected pins), and ``detail_file`` when + the result exceeds the inline threshold. + """ + err = _require_sch_api() + if err: + return err + + verr = _validate_schematic_path(schematic_path) + if verr: + return verr + + if not reference: + return {"success": False, "error": "reference must be a non-empty string"} + + schematic_path = _expand(schematic_path) + + try: + sch = _ksa_load(schematic_path) + state = _build_connectivity_state(sch, schematic_path) + + find_fn = state["find"] + pin_at = state["pin_at"] + wire_segments = state["wire_segments"] + net_graph = state["net_graph"] + + # Build reverse lookup: coord root → net name + root_to_net: dict[tuple[float, float], str] = {} + for n_name, n_pins in net_graph.items(): + for p in n_pins: + # Find coordinate for this pin + for coord, pin_list in pin_at.items(): + for pe in pin_list: + if pe["reference"] == p["reference"] and pe["pin"] == p["pin"]: + root = find_fn(coord) + root_to_net[root] = n_name + + # Build wire segments by net root (each wire's endpoints share a root) + root_to_wires: dict[tuple[float, float], list[dict[str, Any]]] = {} + for ws in wire_segments: + s_coord = (round(ws["start"]["x"], 2), round(ws["start"]["y"], 2)) + root = find_fn(s_coord) + root_to_wires.setdefault(root, []).append(ws) + + # Find all pins for the target component + target_pins: list[tuple[str, tuple[float, float]]] = [] + for coord, pin_list in pin_at.items(): + for pe in pin_list: + if pe["reference"] == reference: + target_pins.append((pe["pin"], coord)) + + if not target_pins: + return { + "success": False, + "error": f"Component '{reference}' has no pins in the connectivity graph", + "schematic_path": schematic_path, + } + + # Build per-pin audit report + pin_nets: list[dict[str, Any]] = [] + for pin_num, coord in target_pins: + root = find_fn(coord) + net_name = root_to_net.get(root, "unconnected") + wires = root_to_wires.get(root, []) + + # Collect other pins on the same net + connected_pins: list[dict[str, str]] = [] + if net_name in net_graph: + for p in net_graph[net_name]: + if not (p["reference"] == reference and p["pin"] == pin_num): + connected_pins.append(p) + + pin_nets.append({ + "pin": pin_num, + "net": net_name, + "wire_count": len(wires), + "wires": wires, + "connected_pins": connected_pins, + }) + + logger.info( + "audit_wiring for %s: %d pins, %d total wire segments", + reference, len(pin_nets), + sum(pn["wire_count"] for pn in pin_nets), + ) + + result: dict[str, Any] = { + "success": True, + "reference": reference, + "pin_count": len(pin_nets), + "engine": _get_schematic_engine(), + "schematic_path": schematic_path, + } + + if len(pin_nets) > INLINE_RESULT_THRESHOLD: + detail_path = write_detail_file( + schematic_path, f"audit_{reference}.json", pin_nets, + ) + result["detail_file"] = detail_path + result["pin_nets_preview"] = pin_nets[:INLINE_RESULT_THRESHOLD] + else: + result["pin_nets"] = pin_nets + + return result + + except Exception as exc: + logger.error("audit_wiring failed for %s: %s", reference, exc, exc_info=True) + return {"success": False, "error": str(exc), "schematic_path": schematic_path} + + +@mcp.tool() +def verify_connectivity( + schematic_path: str, + expected: dict[str, list[list[str]]], +) -> dict[str, Any]: + """Compare actual schematic wiring against an expected net-to-pin mapping. + + For each expected net, checks whether the correct pins are connected, + flags pins wired to the wrong net, and identifies missing or extra + connections. Use this after bulk edits or PDF-extracted schematics to + verify that wiring matches the intended netlist. + + Args: + schematic_path: Path to a .kicad_sch file. + expected: Maps net names to lists of ``[reference, pin]`` pairs. + Example: ``{"ESP_EN": [["U8", "103"]], "GND": [["U8", "1"], ["C1", "2"]]}`` + + Returns: + Dictionary with ``verified`` count, ``failed`` count, and ``results`` + list with per-net status (``match``, ``mismatch``, ``missing_net``). + """ + err = _require_sch_api() + if err: + return err + + verr = _validate_schematic_path(schematic_path) + if verr: + return verr + + if not expected: + return {"success": False, "error": "expected mapping must be non-empty"} + + schematic_path = _expand(schematic_path) + + try: + sch = _ksa_load(schematic_path) + state = _build_connectivity_state(sch, schematic_path) + net_graph = state["net_graph"] + + # Build reverse lookup: (reference, pin) → actual net name + pin_to_actual_net: dict[tuple[str, str], str] = {} + for n_name, n_pins in net_graph.items(): + for p in n_pins: + pin_to_actual_net[(p["reference"], p["pin"])] = n_name + + results: list[dict[str, Any]] = [] + verified = 0 + failed = 0 + + for net_name, expected_pins in expected.items(): + # Validate expected_pins format + pin_pairs = [] + for pair in expected_pins: + if not isinstance(pair, list) or len(pair) != 2: + results.append({ + "net": net_name, + "status": "invalid_format", + "error": f"Expected [reference, pin] pair, got: {pair}", + }) + failed += 1 + continue + pin_pairs.append((str(pair[0]), str(pair[1]))) + + if not pin_pairs: + continue + + # Check if net exists in actual graph + actual_pins_on_net: list[dict[str, str]] = net_graph.get(net_name, []) + + # Check each expected pin + all_match = True + missing_pins: list[list[str]] = [] + wrong_net_pins: list[dict[str, Any]] = [] + extra_pins: list[list[str]] = [] + + expected_pin_set: set[tuple[str, str]] = set() + for ref, pin in pin_pairs: + expected_pin_set.add((ref, pin)) + actual_net = pin_to_actual_net.get((ref, pin)) + + if actual_net is None: + missing_pins.append([ref, pin]) + all_match = False + elif actual_net != net_name: + wrong_net_pins.append({ + "pin": [ref, pin], + "expected_net": net_name, + "actual_net": actual_net, + }) + all_match = False + + # Find extra pins on this net (in actual but not expected) + for p in actual_pins_on_net: + key = (p["reference"], p["pin"]) + if key not in expected_pin_set: + extra_pins.append([p["reference"], p["pin"]]) + + if all_match and not extra_pins: + results.append({"net": net_name, "status": "match"}) + verified += 1 + elif not actual_pins_on_net and not wrong_net_pins: + entry: dict[str, Any] = { + "net": net_name, + "status": "missing_net", + "expected_pins": [list(p) for p in pin_pairs], + } + if missing_pins: + entry["missing_pins"] = missing_pins + results.append(entry) + failed += 1 + else: + entry = { + "net": net_name, + "status": "mismatch", + "expected_pins": [list(p) for p in pin_pairs], + "actual_pins": [[p["reference"], p["pin"]] for p in actual_pins_on_net], + } + if missing_pins: + entry["missing_pins"] = missing_pins + if wrong_net_pins: + entry["wrong_net"] = wrong_net_pins + if extra_pins: + entry["extra_pins"] = extra_pins + results.append(entry) + failed += 1 + + logger.info( + "verify_connectivity: %d verified, %d failed out of %d nets", + verified, failed, len(expected), + ) + + return { + "success": True, + "verified": verified, + "failed": failed, + "total": len(expected), + "results": results, + "engine": _get_schematic_engine(), + "schematic_path": schematic_path, + } + + except Exception as exc: + logger.error("verify_connectivity failed: %s", exc, exc_info=True) + return {"success": False, "error": str(exc), "schematic_path": schematic_path} diff --git a/src/mckicad/tools/schematic_edit.py b/src/mckicad/tools/schematic_edit.py index e51374d..120d041 100644 --- a/src/mckicad/tools/schematic_edit.py +++ b/src/mckicad/tools/schematic_edit.py @@ -4,6 +4,9 @@ Schematic editing tools for modifying existing elements in KiCad schematics. Complements the creation-oriented tools in schematic.py with modification, removal, and annotation capabilities. Uses the same kicad-sch-api engine and follows the same swap-point pattern for future kipy IPC support. + +Also includes s-expression-direct tools (e.g. ``remove_wires_by_criteria``) +that bypass kicad-sch-api for operations it doesn't support well. """ import logging @@ -11,6 +14,10 @@ import os from typing import Any from mckicad.server import mcp +from mckicad.utils.sexp_parser import ( + parse_wire_segments, + remove_sexp_blocks_by_uuid, +) logger = logging.getLogger(__name__) @@ -578,3 +585,134 @@ def backup_schematic(schematic_path: str) -> dict[str, Any]: except Exception as e: logger.error("Failed to back up %s: %s", schematic_path, e) return {"success": False, "error": str(e), "schematic_path": schematic_path} + + +@mcp.tool() +def remove_wires_by_criteria( + schematic_path: str, + y: float | None = None, + x: float | None = None, + min_x: float | None = None, + max_x: float | None = None, + min_y: float | None = None, + max_y: float | None = None, + tolerance: float = 0.01, + dry_run: bool = False, +) -> dict[str, Any]: + """Remove wire segments matching coordinate criteria from a KiCad schematic. + + Uses direct s-expression manipulation to surgically remove wires + by their position — essential for cleaning up bulk wiring errors + like overlapping horizontal/vertical segments that create shorts. + + All specified criteria must match for a wire to be selected. + At least one criterion is required (won't delete all wires). + + Args: + schematic_path: Path to an existing .kicad_sch file. + y: Match horizontal wires where both endpoints have Y within + ``tolerance`` of this value. + x: Match vertical wires where both endpoints have X within + ``tolerance`` of this value. + min_x: Both endpoints' X values must be >= this value (minus tolerance). + max_x: Both endpoints' X values must be <= this value (plus tolerance). + min_y: Both endpoints' Y values must be >= this value (minus tolerance). + max_y: Both endpoints' Y values must be <= this value (plus tolerance). + tolerance: Coordinate comparison tolerance (default 0.01). + dry_run: If True, return matching wires without removing them. + + Returns: + Dictionary with ``matched_count``, ``removed_count`` (0 if dry_run), + and ``matched_wires`` preview list. + """ + verr = _validate_schematic_path(schematic_path) + if verr: + return verr + + schematic_path = _expand(schematic_path) + + # Require at least one filtering criterion + has_criteria = any(v is not None for v in (y, x, min_x, max_x, min_y, max_y)) + if not has_criteria: + return { + "success": False, + "error": "At least one coordinate criterion (y, x, min_x, max_x, min_y, max_y) is required", + } + + try: + all_wires = parse_wire_segments(schematic_path) + + if not all_wires: + return { + "success": True, + "matched_count": 0, + "removed_count": 0, + "note": "No wire segments found in schematic", + "schematic_path": schematic_path, + } + + # Filter wires by criteria + matched: list[dict[str, Any]] = [] + for wire in all_wires: + sx, sy = wire["start"]["x"], wire["start"]["y"] + ex, ey = wire["end"]["x"], wire["end"]["y"] + + if y is not None and (abs(sy - y) > tolerance or abs(ey - y) > tolerance): + continue + + if x is not None and (abs(sx - x) > tolerance or abs(ex - x) > tolerance): + continue + + if min_x is not None and min(sx, ex) < min_x - tolerance: + continue + + if max_x is not None and max(sx, ex) > max_x + tolerance: + continue + + if min_y is not None and min(sy, ey) < min_y - tolerance: + continue + + if max_y is not None and max(sy, ey) > max_y + tolerance: + continue + + matched.append(wire) + + if dry_run or not matched: + return { + "success": True, + "dry_run": dry_run, + "matched_count": len(matched), + "removed_count": 0, + "matched_wires": matched[:50], # cap preview + "schematic_path": schematic_path, + } + + # Remove matched wires via s-expression manipulation + uuids_to_remove = {w["uuid"] for w in matched if w.get("uuid")} + if not uuids_to_remove: + return { + "success": False, + "error": "Matched wires have no UUIDs — cannot remove (file may be malformed)", + "matched_count": len(matched), + "schematic_path": schematic_path, + } + + removed_count = remove_sexp_blocks_by_uuid(schematic_path, uuids_to_remove) + + logger.info( + "remove_wires_by_criteria: removed %d/%d matched wires from %s", + removed_count, len(matched), schematic_path, + ) + + return { + "success": True, + "dry_run": False, + "matched_count": len(matched), + "removed_count": removed_count, + "matched_wires": matched[:50], + "schematic_path": schematic_path, + } + + except Exception as e: + logger.error("remove_wires_by_criteria failed: %s", e, exc_info=True) + return {"success": False, "error": str(e), "schematic_path": schematic_path} diff --git a/src/mckicad/utils/sexp_parser.py b/src/mckicad/utils/sexp_parser.py index e7aea56..6da8bee 100644 --- a/src/mckicad/utils/sexp_parser.py +++ b/src/mckicad/utils/sexp_parser.py @@ -569,6 +569,142 @@ def _suppress_os_error(): return contextlib.suppress(OSError) +# --------------------------------------------------------------------------- +# Wire segment extraction and removal +# --------------------------------------------------------------------------- + +# Match (wire ...) blocks: extract start/end coordinates and UUID. +# Format: +# (wire (pts (xy X1 Y1) (xy X2 Y2)) +# (stroke ...) +# (uuid "UUID") +# ) +_WIRE_BLOCK_RE = re.compile( + r'\(wire\s+' + r'\(pts\s+' + r'\(xy\s+([\d.e+-]+)\s+([\d.e+-]+)\)\s*' # start x, y + r'\(xy\s+([\d.e+-]+)\s+([\d.e+-]+)\)\s*' # end x, y + r'\)' # close pts + r'.*?' # stroke etc (non-greedy) + r'\(uuid\s+"([^"]+)"\)', # uuid + re.DOTALL, +) + + +def parse_wire_segments(filepath: str) -> list[dict[str, Any]]: + """Extract all ``(wire ...)`` blocks from a ``.kicad_sch`` file. + + Returns a list of dicts with ``start`` (x, y), ``end`` (x, y), and + ``uuid`` for each wire segment found. + """ + try: + with open(filepath, encoding="utf-8") as f: + content = f.read() + except Exception: + return [] + + wires: list[dict[str, Any]] = [] + for match in _WIRE_BLOCK_RE.finditer(content): + wires.append({ + "start": {"x": float(match.group(1)), "y": float(match.group(2))}, + "end": {"x": float(match.group(3)), "y": float(match.group(4))}, + "uuid": match.group(5), + }) + return wires + + +def remove_sexp_blocks_by_uuid(filepath: str, uuids: set[str]) -> int: + """Remove ``(wire ...)`` or other top-level s-expression blocks whose UUID + is in the given set. + + Uses bracket-counting to find each ``(wire`` block, checks for a UUID + match, and rebuilds the file without matched blocks. Performs an atomic + write (temp file + ``os.replace()``) to avoid corruption. + + Args: + filepath: Path to an existing ``.kicad_sch`` file. + uuids: Set of UUID strings to match for removal. + + Returns: + Number of blocks actually removed. + """ + if not uuids: + return 0 + + with open(filepath, encoding="utf-8") as f: + content = f.read() + + # Find all (wire ...) block spans and mark those matching a target UUID + removals: list[tuple[int, int]] = [] # (start, end) byte offsets + search_start = 0 + + while True: + idx = content.find("(wire ", search_start) + if idx == -1: + idx = content.find("(wire\n", search_start) + if idx == -1: + idx = content.find("(wire\t", search_start) + if idx == -1: + break + + # Bracket-count to find the end of this (wire ...) block + depth = 0 + block_end = idx + for i in range(idx, len(content)): + if content[i] == "(": + depth += 1 + elif content[i] == ")": + depth -= 1 + if depth == 0: + block_end = i + 1 + break + + block_text = content[idx:block_end] + + # Check if this block's UUID matches + uuid_match = re.search(r'\(uuid\s+"([^"]+)"\)', block_text) + if uuid_match and uuid_match.group(1) in uuids: + # Include any leading whitespace/newline on the same line + trim_start = idx + while trim_start > 0 and content[trim_start - 1] in (" ", "\t"): + trim_start -= 1 + # Include trailing newline if present + trim_end = block_end + if trim_end < len(content) and content[trim_end] == "\n": + trim_end += 1 + removals.append((trim_start, trim_end)) + + search_start = block_end + + if not removals: + return 0 + + # Rebuild content without removed blocks + parts: list[str] = [] + prev_end = 0 + for start, end in removals: + parts.append(content[prev_end:start]) + prev_end = end + parts.append(content[prev_end:]) + new_content = "".join(parts) + + # Atomic write + fd, tmp_path = tempfile.mkstemp( + dir=os.path.dirname(filepath) or ".", + suffix=".kicad_sch.tmp", + ) + try: + with os.fdopen(fd, "w", encoding="utf-8") as tmp_f: + tmp_f.write(new_content) + os.replace(tmp_path, filepath) + except BaseException: + with _suppress_os_error(): + os.unlink(tmp_path) + raise + + return len(removals) + + # --------------------------------------------------------------------------- # Shared pin resolution — "try API, fall back to sexp" pattern # --------------------------------------------------------------------------- diff --git a/tests/test_schematic_analysis.py b/tests/test_schematic_analysis.py index 9042e54..a638b96 100644 --- a/tests/test_schematic_analysis.py +++ b/tests/test_schematic_analysis.py @@ -1,6 +1,7 @@ """Tests for schematic analysis tools.""" import pytest + from tests.conftest import requires_sch_api @@ -137,3 +138,125 @@ class TestExportValidation: ) assert result["success"] is False assert "Unsupported" in result.get("error", "") + + +@requires_sch_api +@pytest.mark.unit +class TestAuditWiring: + """Tests for the audit_wiring tool.""" + + def test_audit_existing_component(self, populated_schematic): + from mckicad.tools.schematic_analysis import audit_wiring + + result = audit_wiring( + schematic_path=populated_schematic, + reference="R1", + ) + assert result["success"] is True + assert result["reference"] == "R1" + assert "pin_nets" in result or "pin_nets_preview" in result + assert result.get("pin_count", 0) > 0 + + def test_audit_nonexistent_component(self, populated_schematic): + from mckicad.tools.schematic_analysis import audit_wiring + + result = audit_wiring( + schematic_path=populated_schematic, + reference="Z99", + ) + assert result["success"] is False + + def test_audit_invalid_path(self): + from mckicad.tools.schematic_analysis import audit_wiring + + result = audit_wiring( + schematic_path="/tmp/nonexistent.kicad_sch", + reference="R1", + ) + assert result["success"] is False + + def test_audit_empty_reference(self): + from mckicad.tools.schematic_analysis import audit_wiring + + result = audit_wiring( + schematic_path="/tmp/nonexistent.kicad_sch", + reference="", + ) + assert result["success"] is False + + def test_audit_pin_net_structure(self, populated_schematic): + from mckicad.tools.schematic_analysis import audit_wiring + + result = audit_wiring( + schematic_path=populated_schematic, + reference="R1", + ) + if result["success"]: + pin_nets = result.get("pin_nets", result.get("pin_nets_preview", [])) + for pn in pin_nets: + assert "pin" in pn + assert "net" in pn + assert "wire_count" in pn + assert "wires" in pn + assert "connected_pins" in pn + + +@requires_sch_api +@pytest.mark.unit +class TestVerifyConnectivity: + """Tests for the verify_connectivity tool.""" + + def test_verify_with_matching_net(self, populated_schematic): + from mckicad.tools.schematic_analysis import ( + analyze_connectivity, + verify_connectivity, + ) + + # First get actual connectivity to build a valid expected map + conn = analyze_connectivity(schematic_path=populated_schematic) + assert conn["success"] is True + + # Try to verify with an empty expected — should fail validation + result = verify_connectivity( + schematic_path=populated_schematic, + expected={}, + ) + assert result["success"] is False + + def test_verify_missing_net(self, populated_schematic): + from mckicad.tools.schematic_analysis import verify_connectivity + + result = verify_connectivity( + schematic_path=populated_schematic, + expected={"NONEXISTENT_NET": [["U99", "1"]]}, + ) + assert result["success"] is True + assert result["failed"] >= 1 + # Should report as missing_net or missing pin + statuses = {r["status"] for r in result["results"]} + assert statuses & {"missing_net", "mismatch"} + + def test_verify_invalid_path(self): + from mckicad.tools.schematic_analysis import verify_connectivity + + result = verify_connectivity( + schematic_path="/tmp/nonexistent.kicad_sch", + expected={"NET": [["R1", "1"]]}, + ) + assert result["success"] is False + + def test_verify_result_structure(self, populated_schematic): + from mckicad.tools.schematic_analysis import verify_connectivity + + result = verify_connectivity( + schematic_path=populated_schematic, + expected={"TEST_NET": [["R1", "1"]]}, + ) + assert result["success"] is True + assert "verified" in result + assert "failed" in result + assert "total" in result + assert "results" in result + for r in result["results"]: + assert "net" in r + assert "status" in r diff --git a/tests/test_schematic_edit.py b/tests/test_schematic_edit.py index 2dad2a3..510067c 100644 --- a/tests/test_schematic_edit.py +++ b/tests/test_schematic_edit.py @@ -1,6 +1,7 @@ """Tests for schematic editing tools.""" import os +import tempfile import pytest @@ -174,3 +175,172 @@ class TestAnnotationTools: y=300, ) assert result["success"] is True + + +# Minimal schematic with wire segments for testing remove_wires_by_criteria. +# Does NOT require kicad-sch-api — works on raw s-expression files. +_WIRES_SCHEMATIC = """\ +(kicad_sch + (version 20231120) + (generator "eeschema") + (uuid "test-root") + (paper "A4") + (lib_symbols + ) + (wire (pts (xy 148.59 194.31) (xy 156.21 194.31)) + (stroke (width 0) (type default)) + (uuid "hw-1") + ) + (wire (pts (xy 156.21 194.31) (xy 163.83 194.31)) + (stroke (width 0) (type default)) + (uuid "hw-2") + ) + (wire (pts (xy 163.83 194.31) (xy 171.45 194.31)) + (stroke (width 0) (type default)) + (uuid "hw-3") + ) + (wire (pts (xy 100.0 100.0) (xy 100.0 130.0)) + (stroke (width 0) (type default)) + (uuid "vw-1") + ) + (wire (pts (xy 200.0 50.0) (xy 250.0 80.0)) + (stroke (width 0) (type default)) + (uuid "dw-1") + ) +) +""" + + +@pytest.fixture +def wires_schematic(): + """Write a schematic with wires to a temp file for testing.""" + with tempfile.NamedTemporaryFile( + mode="w", suffix=".kicad_sch", delete=False, encoding="utf-8", + ) as f: + f.write(_WIRES_SCHEMATIC) + path = f.name + yield path + os.unlink(path) + + +@pytest.mark.unit +class TestRemoveWiresByCriteria: + """Tests for the remove_wires_by_criteria tool.""" + + def test_dry_run_horizontal(self, wires_schematic): + from mckicad.tools.schematic_edit import remove_wires_by_criteria + + result = remove_wires_by_criteria( + schematic_path=wires_schematic, + y=194.31, + dry_run=True, + ) + assert result["success"] is True + assert result["dry_run"] is True + assert result["matched_count"] == 3 + assert result["removed_count"] == 0 + + def test_remove_horizontal_wires(self, wires_schematic): + from mckicad.tools.schematic_edit import remove_wires_by_criteria + + result = remove_wires_by_criteria( + schematic_path=wires_schematic, + y=194.31, + ) + assert result["success"] is True + assert result["removed_count"] == 3 + + # Verify they're gone + from mckicad.utils.sexp_parser import parse_wire_segments + + remaining = parse_wire_segments(wires_schematic) + remaining_uuids = {w["uuid"] for w in remaining} + assert "hw-1" not in remaining_uuids + assert "hw-2" not in remaining_uuids + assert "hw-3" not in remaining_uuids + assert "vw-1" in remaining_uuids + + def test_remove_with_x_range(self, wires_schematic): + from mckicad.tools.schematic_edit import remove_wires_by_criteria + + result = remove_wires_by_criteria( + schematic_path=wires_schematic, + y=194.31, + min_x=148.0, + max_x=164.0, + ) + assert result["success"] is True + assert result["matched_count"] == 2 # hw-1 and hw-2 only + + def test_remove_vertical_wire(self, wires_schematic): + from mckicad.tools.schematic_edit import remove_wires_by_criteria + + result = remove_wires_by_criteria( + schematic_path=wires_schematic, + x=100.0, + ) + assert result["success"] is True + assert result["matched_count"] == 1 + assert result["removed_count"] == 1 + + def test_no_criteria_fails(self, wires_schematic): + from mckicad.tools.schematic_edit import remove_wires_by_criteria + + result = remove_wires_by_criteria( + schematic_path=wires_schematic, + ) + assert result["success"] is False + assert "criterion" in result["error"].lower() + + def test_no_matches(self, wires_schematic): + from mckicad.tools.schematic_edit import remove_wires_by_criteria + + result = remove_wires_by_criteria( + schematic_path=wires_schematic, + y=999.0, + ) + assert result["success"] is True + assert result["matched_count"] == 0 + + def test_invalid_path(self): + from mckicad.tools.schematic_edit import remove_wires_by_criteria + + result = remove_wires_by_criteria( + schematic_path="/tmp/nonexistent.kicad_sch", + y=100.0, + ) + assert result["success"] is False + + def test_min_max_y_range(self, wires_schematic): + from mckicad.tools.schematic_edit import remove_wires_by_criteria + + result = remove_wires_by_criteria( + schematic_path=wires_schematic, + min_y=99.0, + max_y=131.0, + dry_run=True, + ) + assert result["success"] is True + # vw-1 (100→130) fits, horizontal wires at 194.31 do not + assert result["matched_count"] == 1 + + def test_tolerance(self, wires_schematic): + from mckicad.tools.schematic_edit import remove_wires_by_criteria + + # With tight tolerance, exact match + result = remove_wires_by_criteria( + schematic_path=wires_schematic, + y=194.31, + tolerance=0.001, + dry_run=True, + ) + assert result["matched_count"] == 3 + + # Offset slightly beyond tolerance + result = remove_wires_by_criteria( + schematic_path=wires_schematic, + y=194.35, + tolerance=0.01, + dry_run=True, + ) + assert result["matched_count"] == 0 diff --git a/tests/test_sexp_parser.py b/tests/test_sexp_parser.py index f4a6ed4..3471880 100644 --- a/tests/test_sexp_parser.py +++ b/tests/test_sexp_parser.py @@ -15,6 +15,8 @@ from mckicad.utils.sexp_parser import ( parse_global_labels, parse_lib_file_symbol_pins, parse_lib_symbol_pins, + parse_wire_segments, + remove_sexp_blocks_by_uuid, transform_pin_to_schematic, ) @@ -720,3 +722,140 @@ class TestSymLibTableParsing: ) assert result is not None assert result.endswith("MyProject.kicad_sym") + + +# --------------------------------------------------------------------------- +# Wire segment parsing and removal tests +# --------------------------------------------------------------------------- + +SAMPLE_SCHEMATIC_WITH_WIRES = """\ +(kicad_sch + (version 20231120) + (generator "eeschema") + (uuid "root-uuid") + (paper "A4") + (lib_symbols + ) + (wire (pts (xy 148.59 194.31) (xy 156.21 194.31)) + (stroke (width 0) (type default)) + (uuid "wire-1") + ) + (wire (pts (xy 156.21 194.31) (xy 163.83 194.31)) + (stroke (width 0) (type default)) + (uuid "wire-2") + ) + (wire (pts (xy 100.0 100.0) (xy 100.0 120.0)) + (stroke (width 0) (type default)) + (uuid "wire-3") + ) + (wire (pts (xy 200.5 50.25) (xy 210.75 50.25)) + (stroke (width 0) (type default)) + (uuid "wire-4") + ) +) +""" + + +@pytest.fixture +def schematic_with_wires(tmp_path): + """Write a schematic with wire segments to a temp file.""" + filepath = tmp_path / "wired.kicad_sch" + filepath.write_text(SAMPLE_SCHEMATIC_WITH_WIRES) + return str(filepath) + + +class TestParseWireSegments: + def test_finds_all_wires(self, schematic_with_wires): + wires = parse_wire_segments(schematic_with_wires) + assert len(wires) == 4 + + def test_extracts_coordinates(self, schematic_with_wires): + wires = parse_wire_segments(schematic_with_wires) + w1 = next(w for w in wires if w["uuid"] == "wire-1") + assert w1["start"]["x"] == pytest.approx(148.59) + assert w1["start"]["y"] == pytest.approx(194.31) + assert w1["end"]["x"] == pytest.approx(156.21) + assert w1["end"]["y"] == pytest.approx(194.31) + + def test_extracts_uuids(self, schematic_with_wires): + wires = parse_wire_segments(schematic_with_wires) + uuids = {w["uuid"] for w in wires} + assert uuids == {"wire-1", "wire-2", "wire-3", "wire-4"} + + def test_vertical_wire(self, schematic_with_wires): + wires = parse_wire_segments(schematic_with_wires) + w3 = next(w for w in wires if w["uuid"] == "wire-3") + assert w3["start"]["x"] == pytest.approx(100.0) + assert w3["start"]["y"] == pytest.approx(100.0) + assert w3["end"]["x"] == pytest.approx(100.0) + assert w3["end"]["y"] == pytest.approx(120.0) + + def test_nonexistent_file_returns_empty(self): + wires = parse_wire_segments("/nonexistent/path.kicad_sch") + assert wires == [] + + def test_schematic_without_wires(self, tmp_path): + filepath = tmp_path / "empty.kicad_sch" + filepath.write_text("(kicad_sch\n (version 20231120)\n)\n") + wires = parse_wire_segments(str(filepath)) + assert wires == [] + + +class TestRemoveSexpBlocksByUuid: + def test_remove_single_wire(self, schematic_with_wires): + removed = remove_sexp_blocks_by_uuid(schematic_with_wires, {"wire-1"}) + assert removed == 1 + + # Verify wire is gone + remaining = parse_wire_segments(schematic_with_wires) + remaining_uuids = {w["uuid"] for w in remaining} + assert "wire-1" not in remaining_uuids + assert len(remaining) == 3 + + def test_remove_multiple_wires(self, schematic_with_wires): + removed = remove_sexp_blocks_by_uuid( + schematic_with_wires, {"wire-1", "wire-2"}, + ) + assert removed == 2 + + remaining = parse_wire_segments(schematic_with_wires) + remaining_uuids = {w["uuid"] for w in remaining} + assert remaining_uuids == {"wire-3", "wire-4"} + + def test_remove_all_wires(self, schematic_with_wires): + removed = remove_sexp_blocks_by_uuid( + schematic_with_wires, {"wire-1", "wire-2", "wire-3", "wire-4"}, + ) + assert removed == 4 + + remaining = parse_wire_segments(schematic_with_wires) + assert remaining == [] + + # File should still be valid + with open(schematic_with_wires) as f: + content = f.read() + assert content.strip().startswith("(kicad_sch") + assert content.strip().endswith(")") + + def test_remove_nonexistent_uuid(self, schematic_with_wires): + removed = remove_sexp_blocks_by_uuid( + schematic_with_wires, {"nonexistent-uuid"}, + ) + assert removed == 0 + + remaining = parse_wire_segments(schematic_with_wires) + assert len(remaining) == 4 + + def test_empty_uuid_set(self, schematic_with_wires): + removed = remove_sexp_blocks_by_uuid(schematic_with_wires, set()) + assert removed == 0 + + def test_preserves_other_content(self, schematic_with_wires): + remove_sexp_blocks_by_uuid(schematic_with_wires, {"wire-1"}) + + with open(schematic_with_wires) as f: + content = f.read() + + assert "(version 20231120)" in content + assert '(uuid "root-uuid")' in content + assert "(lib_symbols" in content