Add wire auditing, bulk wire removal, and net-to-pin verification tools

Refactors _build_connectivity() into a two-layer state builder so the
union-find internals (pin_at, label_at, wire_segments) are accessible
to new analysis tools without duplicating the 200-line connectivity engine.

New tools:
- audit_wiring: trace all wires connected to a component, report per-pin
  net membership with wire segment coordinates and connected pins
- remove_wires_by_criteria: bulk-remove wires by coordinate filters
  (y, x, min/max ranges, tolerance) with dry_run preview support
- verify_connectivity: compare actual wiring against an expected
  net-to-pin mapping, report matches/mismatches/missing nets

New sexp_parser utilities:
- parse_wire_segments: extract (wire ...) blocks with start/end/uuid
- remove_sexp_blocks_by_uuid: atomically remove blocks by UUID set
This commit is contained in:
Ryan Malloy 2026-03-04 23:12:13 -07:00
parent e88f75f567
commit 61ed7b3efe
6 changed files with 1043 additions and 9 deletions

View File

@ -23,6 +23,9 @@ from mckicad.utils.sexp_parser import (
parse_lib_symbol_pins,
transform_pin_to_schematic,
)
from mckicad.utils.sexp_parser import (
parse_wire_segments as parse_wire_segments_sexp,
)
logger = logging.getLogger(__name__)
@ -111,15 +114,19 @@ def _default_output_path(schematic_path: str, filename: str) -> str:
# ---------------------------------------------------------------------------
def _build_connectivity(
def _build_connectivity_state(
sch: Any, schematic_path: str | None = None
) -> tuple[dict[str, list[dict[str, str]]], list[dict[str, str]]]:
"""Build a net connectivity graph by walking wires, pin positions, and labels.
) -> dict[str, Any]:
"""Build the full connectivity state by walking wires, pin positions, and labels.
This is the inner engine that returns all intermediate data structures.
Use :func:`_build_connectivity` for the simpler ``(net_graph, unconnected)``
return signature.
kicad-sch-api does not auto-compute nets on loaded schematics. This
function reconstructs the connectivity by:
1. Collecting all wire start/end coordinates
1. Collecting all wire start/end coordinates (and wire segment metadata)
2. Mapping component pin positions to coordinates
3. Mapping label positions to coordinates (local, hierarchical, global)
4. Using union-find to group touching points into nets
@ -127,12 +134,20 @@ def _build_connectivity(
When *schematic_path* is provided, ``(global_label ...)`` entries are
also extracted directly from the raw file via :mod:`sexp_parser`, since
kicad-sch-api does not expose them.
kicad-sch-api does not expose them. Wire UUIDs are also parsed from
the raw file for segment tracking.
Returns:
(net_graph, unconnected_pins) where net_graph maps net names to
lists of {reference, pin} dicts, and unconnected_pins lists
pins not connected to any wire or label.
Dict with keys:
- ``find``: union-find lookup function
- ``parent``: raw union-find parent map
- ``pin_at``: coord list of {reference, pin} dicts
- ``label_at``: coord net name string
- ``wire_segments``: list of {start, end, uuid} dicts
- ``wire_coords``: set of all wire endpoint coordinates
- ``all_pins``: list of (reference, pin_num, coord) tuples
- ``net_graph``: net_name list of {reference, pin} dicts
- ``unconnected_pins``: list of {reference, pin} dicts
"""
# -- Union-Find --
_parent: dict[tuple[float, float], tuple[float, float]] = {}
@ -164,6 +179,17 @@ def _build_connectivity(
# --- 1. Process wires — each wire connects its start and end ---
wire_coords: set[tuple[float, float]] = set()
wire_segments: list[dict[str, Any]] = []
# Build a UUID lookup from raw file if available (kicad-sch-api
# doesn't reliably expose wire UUIDs)
sexp_wire_lookup: dict[tuple[tuple[float, float], tuple[float, float]], str] = {}
if schematic_path:
for sw in parse_wire_segments_sexp(schematic_path):
s_key = _rc(sw["start"]["x"], sw["start"]["y"])
e_key = _rc(sw["end"]["x"], sw["end"]["y"])
sexp_wire_lookup[(s_key, e_key)] = sw["uuid"]
for wire in getattr(sch, "wires", []):
start = getattr(wire, "start", None)
end = getattr(wire, "end", None)
@ -177,6 +203,16 @@ def _build_connectivity(
wire_coords.add(s)
wire_coords.add(e)
# Track wire segment metadata for audit tool
wire_uuid = getattr(wire, "uuid", None)
if wire_uuid is None:
wire_uuid = sexp_wire_lookup.get((s, e))
wire_segments.append({
"start": {"x": s[0], "y": s[1]},
"end": {"x": e[0], "y": e[1]},
"uuid": str(wire_uuid) if wire_uuid else None,
})
# --- 2. Map component pin positions to coordinates ---
pin_at: dict[tuple[float, float], list[dict[str, str]]] = {}
all_pins: list[tuple[str, str, tuple[float, float]]] = []
@ -333,7 +369,29 @@ def _build_connectivity(
if coord not in wire_coords and coord not in label_at:
unconnected.append({"reference": ref, "pin": pin_num})
return net_graph, unconnected
return {
"find": _find,
"parent": _parent,
"pin_at": pin_at,
"label_at": label_at,
"wire_segments": wire_segments,
"wire_coords": wire_coords,
"all_pins": all_pins,
"net_graph": net_graph,
"unconnected_pins": unconnected,
}
def _build_connectivity(
sch: Any, schematic_path: str | None = None
) -> tuple[dict[str, list[dict[str, str]]], list[dict[str, str]]]:
"""Build a net connectivity graph by walking wires, pin positions, and labels.
Thin wrapper around :func:`_build_connectivity_state` that returns only
the ``(net_graph, unconnected_pins)`` tuple for backward compatibility.
"""
state = _build_connectivity_state(sch, schematic_path)
return state["net_graph"], state["unconnected_pins"]
# ---------------------------------------------------------------------------
@ -1078,3 +1136,273 @@ def export_schematic_pdf(
except Exception as exc:
logger.error("Schematic PDF export failed: %s", exc, exc_info=True)
return {"success": False, "error": str(exc), "schematic_path": schematic_path}
@mcp.tool()
def audit_wiring(schematic_path: str, reference: str) -> dict[str, Any]:
"""Trace all wires connected to a component and report per-pin net membership.
For each pin on the specified component, reports the net it belongs to,
the wire segments in that net, and other pins sharing the same net.
Essential for debugging wiring errors especially overlapping wire
segments that create unintended shorts.
Args:
schematic_path: Path to a .kicad_sch file.
reference: Component reference designator (e.g. ``U8``, ``R1``).
Returns:
Dictionary with ``reference``, ``pin_nets`` list (per-pin net info
with wire segments and connected pins), and ``detail_file`` when
the result exceeds the inline threshold.
"""
err = _require_sch_api()
if err:
return err
verr = _validate_schematic_path(schematic_path)
if verr:
return verr
if not reference:
return {"success": False, "error": "reference must be a non-empty string"}
schematic_path = _expand(schematic_path)
try:
sch = _ksa_load(schematic_path)
state = _build_connectivity_state(sch, schematic_path)
find_fn = state["find"]
pin_at = state["pin_at"]
wire_segments = state["wire_segments"]
net_graph = state["net_graph"]
# Build reverse lookup: coord root → net name
root_to_net: dict[tuple[float, float], str] = {}
for n_name, n_pins in net_graph.items():
for p in n_pins:
# Find coordinate for this pin
for coord, pin_list in pin_at.items():
for pe in pin_list:
if pe["reference"] == p["reference"] and pe["pin"] == p["pin"]:
root = find_fn(coord)
root_to_net[root] = n_name
# Build wire segments by net root (each wire's endpoints share a root)
root_to_wires: dict[tuple[float, float], list[dict[str, Any]]] = {}
for ws in wire_segments:
s_coord = (round(ws["start"]["x"], 2), round(ws["start"]["y"], 2))
root = find_fn(s_coord)
root_to_wires.setdefault(root, []).append(ws)
# Find all pins for the target component
target_pins: list[tuple[str, tuple[float, float]]] = []
for coord, pin_list in pin_at.items():
for pe in pin_list:
if pe["reference"] == reference:
target_pins.append((pe["pin"], coord))
if not target_pins:
return {
"success": False,
"error": f"Component '{reference}' has no pins in the connectivity graph",
"schematic_path": schematic_path,
}
# Build per-pin audit report
pin_nets: list[dict[str, Any]] = []
for pin_num, coord in target_pins:
root = find_fn(coord)
net_name = root_to_net.get(root, "unconnected")
wires = root_to_wires.get(root, [])
# Collect other pins on the same net
connected_pins: list[dict[str, str]] = []
if net_name in net_graph:
for p in net_graph[net_name]:
if not (p["reference"] == reference and p["pin"] == pin_num):
connected_pins.append(p)
pin_nets.append({
"pin": pin_num,
"net": net_name,
"wire_count": len(wires),
"wires": wires,
"connected_pins": connected_pins,
})
logger.info(
"audit_wiring for %s: %d pins, %d total wire segments",
reference, len(pin_nets),
sum(pn["wire_count"] for pn in pin_nets),
)
result: dict[str, Any] = {
"success": True,
"reference": reference,
"pin_count": len(pin_nets),
"engine": _get_schematic_engine(),
"schematic_path": schematic_path,
}
if len(pin_nets) > INLINE_RESULT_THRESHOLD:
detail_path = write_detail_file(
schematic_path, f"audit_{reference}.json", pin_nets,
)
result["detail_file"] = detail_path
result["pin_nets_preview"] = pin_nets[:INLINE_RESULT_THRESHOLD]
else:
result["pin_nets"] = pin_nets
return result
except Exception as exc:
logger.error("audit_wiring failed for %s: %s", reference, exc, exc_info=True)
return {"success": False, "error": str(exc), "schematic_path": schematic_path}
@mcp.tool()
def verify_connectivity(
schematic_path: str,
expected: dict[str, list[list[str]]],
) -> dict[str, Any]:
"""Compare actual schematic wiring against an expected net-to-pin mapping.
For each expected net, checks whether the correct pins are connected,
flags pins wired to the wrong net, and identifies missing or extra
connections. Use this after bulk edits or PDF-extracted schematics to
verify that wiring matches the intended netlist.
Args:
schematic_path: Path to a .kicad_sch file.
expected: Maps net names to lists of ``[reference, pin]`` pairs.
Example: ``{"ESP_EN": [["U8", "103"]], "GND": [["U8", "1"], ["C1", "2"]]}``
Returns:
Dictionary with ``verified`` count, ``failed`` count, and ``results``
list with per-net status (``match``, ``mismatch``, ``missing_net``).
"""
err = _require_sch_api()
if err:
return err
verr = _validate_schematic_path(schematic_path)
if verr:
return verr
if not expected:
return {"success": False, "error": "expected mapping must be non-empty"}
schematic_path = _expand(schematic_path)
try:
sch = _ksa_load(schematic_path)
state = _build_connectivity_state(sch, schematic_path)
net_graph = state["net_graph"]
# Build reverse lookup: (reference, pin) → actual net name
pin_to_actual_net: dict[tuple[str, str], str] = {}
for n_name, n_pins in net_graph.items():
for p in n_pins:
pin_to_actual_net[(p["reference"], p["pin"])] = n_name
results: list[dict[str, Any]] = []
verified = 0
failed = 0
for net_name, expected_pins in expected.items():
# Validate expected_pins format
pin_pairs = []
for pair in expected_pins:
if not isinstance(pair, list) or len(pair) != 2:
results.append({
"net": net_name,
"status": "invalid_format",
"error": f"Expected [reference, pin] pair, got: {pair}",
})
failed += 1
continue
pin_pairs.append((str(pair[0]), str(pair[1])))
if not pin_pairs:
continue
# Check if net exists in actual graph
actual_pins_on_net: list[dict[str, str]] = net_graph.get(net_name, [])
# Check each expected pin
all_match = True
missing_pins: list[list[str]] = []
wrong_net_pins: list[dict[str, Any]] = []
extra_pins: list[list[str]] = []
expected_pin_set: set[tuple[str, str]] = set()
for ref, pin in pin_pairs:
expected_pin_set.add((ref, pin))
actual_net = pin_to_actual_net.get((ref, pin))
if actual_net is None:
missing_pins.append([ref, pin])
all_match = False
elif actual_net != net_name:
wrong_net_pins.append({
"pin": [ref, pin],
"expected_net": net_name,
"actual_net": actual_net,
})
all_match = False
# Find extra pins on this net (in actual but not expected)
for p in actual_pins_on_net:
key = (p["reference"], p["pin"])
if key not in expected_pin_set:
extra_pins.append([p["reference"], p["pin"]])
if all_match and not extra_pins:
results.append({"net": net_name, "status": "match"})
verified += 1
elif not actual_pins_on_net and not wrong_net_pins:
entry: dict[str, Any] = {
"net": net_name,
"status": "missing_net",
"expected_pins": [list(p) for p in pin_pairs],
}
if missing_pins:
entry["missing_pins"] = missing_pins
results.append(entry)
failed += 1
else:
entry = {
"net": net_name,
"status": "mismatch",
"expected_pins": [list(p) for p in pin_pairs],
"actual_pins": [[p["reference"], p["pin"]] for p in actual_pins_on_net],
}
if missing_pins:
entry["missing_pins"] = missing_pins
if wrong_net_pins:
entry["wrong_net"] = wrong_net_pins
if extra_pins:
entry["extra_pins"] = extra_pins
results.append(entry)
failed += 1
logger.info(
"verify_connectivity: %d verified, %d failed out of %d nets",
verified, failed, len(expected),
)
return {
"success": True,
"verified": verified,
"failed": failed,
"total": len(expected),
"results": results,
"engine": _get_schematic_engine(),
"schematic_path": schematic_path,
}
except Exception as exc:
logger.error("verify_connectivity failed: %s", exc, exc_info=True)
return {"success": False, "error": str(exc), "schematic_path": schematic_path}

View File

@ -4,6 +4,9 @@ Schematic editing tools for modifying existing elements in KiCad schematics.
Complements the creation-oriented tools in schematic.py with modification,
removal, and annotation capabilities. Uses the same kicad-sch-api engine
and follows the same swap-point pattern for future kipy IPC support.
Also includes s-expression-direct tools (e.g. ``remove_wires_by_criteria``)
that bypass kicad-sch-api for operations it doesn't support well.
"""
import logging
@ -11,6 +14,10 @@ import os
from typing import Any
from mckicad.server import mcp
from mckicad.utils.sexp_parser import (
parse_wire_segments,
remove_sexp_blocks_by_uuid,
)
logger = logging.getLogger(__name__)
@ -578,3 +585,134 @@ def backup_schematic(schematic_path: str) -> dict[str, Any]:
except Exception as e:
logger.error("Failed to back up %s: %s", schematic_path, e)
return {"success": False, "error": str(e), "schematic_path": schematic_path}
@mcp.tool()
def remove_wires_by_criteria(
schematic_path: str,
y: float | None = None,
x: float | None = None,
min_x: float | None = None,
max_x: float | None = None,
min_y: float | None = None,
max_y: float | None = None,
tolerance: float = 0.01,
dry_run: bool = False,
) -> dict[str, Any]:
"""Remove wire segments matching coordinate criteria from a KiCad schematic.
Uses direct s-expression manipulation to surgically remove wires
by their position essential for cleaning up bulk wiring errors
like overlapping horizontal/vertical segments that create shorts.
All specified criteria must match for a wire to be selected.
At least one criterion is required (won't delete all wires).
Args:
schematic_path: Path to an existing .kicad_sch file.
y: Match horizontal wires where both endpoints have Y within
``tolerance`` of this value.
x: Match vertical wires where both endpoints have X within
``tolerance`` of this value.
min_x: Both endpoints' X values must be >= this value (minus tolerance).
max_x: Both endpoints' X values must be <= this value (plus tolerance).
min_y: Both endpoints' Y values must be >= this value (minus tolerance).
max_y: Both endpoints' Y values must be <= this value (plus tolerance).
tolerance: Coordinate comparison tolerance (default 0.01).
dry_run: If True, return matching wires without removing them.
Returns:
Dictionary with ``matched_count``, ``removed_count`` (0 if dry_run),
and ``matched_wires`` preview list.
"""
verr = _validate_schematic_path(schematic_path)
if verr:
return verr
schematic_path = _expand(schematic_path)
# Require at least one filtering criterion
has_criteria = any(v is not None for v in (y, x, min_x, max_x, min_y, max_y))
if not has_criteria:
return {
"success": False,
"error": "At least one coordinate criterion (y, x, min_x, max_x, min_y, max_y) is required",
}
try:
all_wires = parse_wire_segments(schematic_path)
if not all_wires:
return {
"success": True,
"matched_count": 0,
"removed_count": 0,
"note": "No wire segments found in schematic",
"schematic_path": schematic_path,
}
# Filter wires by criteria
matched: list[dict[str, Any]] = []
for wire in all_wires:
sx, sy = wire["start"]["x"], wire["start"]["y"]
ex, ey = wire["end"]["x"], wire["end"]["y"]
if y is not None and (abs(sy - y) > tolerance or abs(ey - y) > tolerance):
continue
if x is not None and (abs(sx - x) > tolerance or abs(ex - x) > tolerance):
continue
if min_x is not None and min(sx, ex) < min_x - tolerance:
continue
if max_x is not None and max(sx, ex) > max_x + tolerance:
continue
if min_y is not None and min(sy, ey) < min_y - tolerance:
continue
if max_y is not None and max(sy, ey) > max_y + tolerance:
continue
matched.append(wire)
if dry_run or not matched:
return {
"success": True,
"dry_run": dry_run,
"matched_count": len(matched),
"removed_count": 0,
"matched_wires": matched[:50], # cap preview
"schematic_path": schematic_path,
}
# Remove matched wires via s-expression manipulation
uuids_to_remove = {w["uuid"] for w in matched if w.get("uuid")}
if not uuids_to_remove:
return {
"success": False,
"error": "Matched wires have no UUIDs — cannot remove (file may be malformed)",
"matched_count": len(matched),
"schematic_path": schematic_path,
}
removed_count = remove_sexp_blocks_by_uuid(schematic_path, uuids_to_remove)
logger.info(
"remove_wires_by_criteria: removed %d/%d matched wires from %s",
removed_count, len(matched), schematic_path,
)
return {
"success": True,
"dry_run": False,
"matched_count": len(matched),
"removed_count": removed_count,
"matched_wires": matched[:50],
"schematic_path": schematic_path,
}
except Exception as e:
logger.error("remove_wires_by_criteria failed: %s", e, exc_info=True)
return {"success": False, "error": str(e), "schematic_path": schematic_path}

View File

@ -569,6 +569,142 @@ def _suppress_os_error():
return contextlib.suppress(OSError)
# ---------------------------------------------------------------------------
# Wire segment extraction and removal
# ---------------------------------------------------------------------------
# Match (wire ...) blocks: extract start/end coordinates and UUID.
# Format:
# (wire (pts (xy X1 Y1) (xy X2 Y2))
# (stroke ...)
# (uuid "UUID")
# )
_WIRE_BLOCK_RE = re.compile(
r'\(wire\s+'
r'\(pts\s+'
r'\(xy\s+([\d.e+-]+)\s+([\d.e+-]+)\)\s*' # start x, y
r'\(xy\s+([\d.e+-]+)\s+([\d.e+-]+)\)\s*' # end x, y
r'\)' # close pts
r'.*?' # stroke etc (non-greedy)
r'\(uuid\s+"([^"]+)"\)', # uuid
re.DOTALL,
)
def parse_wire_segments(filepath: str) -> list[dict[str, Any]]:
"""Extract all ``(wire ...)`` blocks from a ``.kicad_sch`` file.
Returns a list of dicts with ``start`` (x, y), ``end`` (x, y), and
``uuid`` for each wire segment found.
"""
try:
with open(filepath, encoding="utf-8") as f:
content = f.read()
except Exception:
return []
wires: list[dict[str, Any]] = []
for match in _WIRE_BLOCK_RE.finditer(content):
wires.append({
"start": {"x": float(match.group(1)), "y": float(match.group(2))},
"end": {"x": float(match.group(3)), "y": float(match.group(4))},
"uuid": match.group(5),
})
return wires
def remove_sexp_blocks_by_uuid(filepath: str, uuids: set[str]) -> int:
"""Remove ``(wire ...)`` or other top-level s-expression blocks whose UUID
is in the given set.
Uses bracket-counting to find each ``(wire`` block, checks for a UUID
match, and rebuilds the file without matched blocks. Performs an atomic
write (temp file + ``os.replace()``) to avoid corruption.
Args:
filepath: Path to an existing ``.kicad_sch`` file.
uuids: Set of UUID strings to match for removal.
Returns:
Number of blocks actually removed.
"""
if not uuids:
return 0
with open(filepath, encoding="utf-8") as f:
content = f.read()
# Find all (wire ...) block spans and mark those matching a target UUID
removals: list[tuple[int, int]] = [] # (start, end) byte offsets
search_start = 0
while True:
idx = content.find("(wire ", search_start)
if idx == -1:
idx = content.find("(wire\n", search_start)
if idx == -1:
idx = content.find("(wire\t", search_start)
if idx == -1:
break
# Bracket-count to find the end of this (wire ...) block
depth = 0
block_end = idx
for i in range(idx, len(content)):
if content[i] == "(":
depth += 1
elif content[i] == ")":
depth -= 1
if depth == 0:
block_end = i + 1
break
block_text = content[idx:block_end]
# Check if this block's UUID matches
uuid_match = re.search(r'\(uuid\s+"([^"]+)"\)', block_text)
if uuid_match and uuid_match.group(1) in uuids:
# Include any leading whitespace/newline on the same line
trim_start = idx
while trim_start > 0 and content[trim_start - 1] in (" ", "\t"):
trim_start -= 1
# Include trailing newline if present
trim_end = block_end
if trim_end < len(content) and content[trim_end] == "\n":
trim_end += 1
removals.append((trim_start, trim_end))
search_start = block_end
if not removals:
return 0
# Rebuild content without removed blocks
parts: list[str] = []
prev_end = 0
for start, end in removals:
parts.append(content[prev_end:start])
prev_end = end
parts.append(content[prev_end:])
new_content = "".join(parts)
# Atomic write
fd, tmp_path = tempfile.mkstemp(
dir=os.path.dirname(filepath) or ".",
suffix=".kicad_sch.tmp",
)
try:
with os.fdopen(fd, "w", encoding="utf-8") as tmp_f:
tmp_f.write(new_content)
os.replace(tmp_path, filepath)
except BaseException:
with _suppress_os_error():
os.unlink(tmp_path)
raise
return len(removals)
# ---------------------------------------------------------------------------
# Shared pin resolution — "try API, fall back to sexp" pattern
# ---------------------------------------------------------------------------

View File

@ -1,6 +1,7 @@
"""Tests for schematic analysis tools."""
import pytest
from tests.conftest import requires_sch_api
@ -137,3 +138,125 @@ class TestExportValidation:
)
assert result["success"] is False
assert "Unsupported" in result.get("error", "")
@requires_sch_api
@pytest.mark.unit
class TestAuditWiring:
"""Tests for the audit_wiring tool."""
def test_audit_existing_component(self, populated_schematic):
from mckicad.tools.schematic_analysis import audit_wiring
result = audit_wiring(
schematic_path=populated_schematic,
reference="R1",
)
assert result["success"] is True
assert result["reference"] == "R1"
assert "pin_nets" in result or "pin_nets_preview" in result
assert result.get("pin_count", 0) > 0
def test_audit_nonexistent_component(self, populated_schematic):
from mckicad.tools.schematic_analysis import audit_wiring
result = audit_wiring(
schematic_path=populated_schematic,
reference="Z99",
)
assert result["success"] is False
def test_audit_invalid_path(self):
from mckicad.tools.schematic_analysis import audit_wiring
result = audit_wiring(
schematic_path="/tmp/nonexistent.kicad_sch",
reference="R1",
)
assert result["success"] is False
def test_audit_empty_reference(self):
from mckicad.tools.schematic_analysis import audit_wiring
result = audit_wiring(
schematic_path="/tmp/nonexistent.kicad_sch",
reference="",
)
assert result["success"] is False
def test_audit_pin_net_structure(self, populated_schematic):
from mckicad.tools.schematic_analysis import audit_wiring
result = audit_wiring(
schematic_path=populated_schematic,
reference="R1",
)
if result["success"]:
pin_nets = result.get("pin_nets", result.get("pin_nets_preview", []))
for pn in pin_nets:
assert "pin" in pn
assert "net" in pn
assert "wire_count" in pn
assert "wires" in pn
assert "connected_pins" in pn
@requires_sch_api
@pytest.mark.unit
class TestVerifyConnectivity:
"""Tests for the verify_connectivity tool."""
def test_verify_with_matching_net(self, populated_schematic):
from mckicad.tools.schematic_analysis import (
analyze_connectivity,
verify_connectivity,
)
# First get actual connectivity to build a valid expected map
conn = analyze_connectivity(schematic_path=populated_schematic)
assert conn["success"] is True
# Try to verify with an empty expected — should fail validation
result = verify_connectivity(
schematic_path=populated_schematic,
expected={},
)
assert result["success"] is False
def test_verify_missing_net(self, populated_schematic):
from mckicad.tools.schematic_analysis import verify_connectivity
result = verify_connectivity(
schematic_path=populated_schematic,
expected={"NONEXISTENT_NET": [["U99", "1"]]},
)
assert result["success"] is True
assert result["failed"] >= 1
# Should report as missing_net or missing pin
statuses = {r["status"] for r in result["results"]}
assert statuses & {"missing_net", "mismatch"}
def test_verify_invalid_path(self):
from mckicad.tools.schematic_analysis import verify_connectivity
result = verify_connectivity(
schematic_path="/tmp/nonexistent.kicad_sch",
expected={"NET": [["R1", "1"]]},
)
assert result["success"] is False
def test_verify_result_structure(self, populated_schematic):
from mckicad.tools.schematic_analysis import verify_connectivity
result = verify_connectivity(
schematic_path=populated_schematic,
expected={"TEST_NET": [["R1", "1"]]},
)
assert result["success"] is True
assert "verified" in result
assert "failed" in result
assert "total" in result
assert "results" in result
for r in result["results"]:
assert "net" in r
assert "status" in r

View File

@ -1,6 +1,7 @@
"""Tests for schematic editing tools."""
import os
import tempfile
import pytest
@ -174,3 +175,172 @@ class TestAnnotationTools:
y=300,
)
assert result["success"] is True
# Minimal schematic with wire segments for testing remove_wires_by_criteria.
# Does NOT require kicad-sch-api — works on raw s-expression files.
_WIRES_SCHEMATIC = """\
(kicad_sch
(version 20231120)
(generator "eeschema")
(uuid "test-root")
(paper "A4")
(lib_symbols
)
(wire (pts (xy 148.59 194.31) (xy 156.21 194.31))
(stroke (width 0) (type default))
(uuid "hw-1")
)
(wire (pts (xy 156.21 194.31) (xy 163.83 194.31))
(stroke (width 0) (type default))
(uuid "hw-2")
)
(wire (pts (xy 163.83 194.31) (xy 171.45 194.31))
(stroke (width 0) (type default))
(uuid "hw-3")
)
(wire (pts (xy 100.0 100.0) (xy 100.0 130.0))
(stroke (width 0) (type default))
(uuid "vw-1")
)
(wire (pts (xy 200.0 50.0) (xy 250.0 80.0))
(stroke (width 0) (type default))
(uuid "dw-1")
)
)
"""
@pytest.fixture
def wires_schematic():
"""Write a schematic with wires to a temp file for testing."""
with tempfile.NamedTemporaryFile(
mode="w", suffix=".kicad_sch", delete=False, encoding="utf-8",
) as f:
f.write(_WIRES_SCHEMATIC)
path = f.name
yield path
os.unlink(path)
@pytest.mark.unit
class TestRemoveWiresByCriteria:
"""Tests for the remove_wires_by_criteria tool."""
def test_dry_run_horizontal(self, wires_schematic):
from mckicad.tools.schematic_edit import remove_wires_by_criteria
result = remove_wires_by_criteria(
schematic_path=wires_schematic,
y=194.31,
dry_run=True,
)
assert result["success"] is True
assert result["dry_run"] is True
assert result["matched_count"] == 3
assert result["removed_count"] == 0
def test_remove_horizontal_wires(self, wires_schematic):
from mckicad.tools.schematic_edit import remove_wires_by_criteria
result = remove_wires_by_criteria(
schematic_path=wires_schematic,
y=194.31,
)
assert result["success"] is True
assert result["removed_count"] == 3
# Verify they're gone
from mckicad.utils.sexp_parser import parse_wire_segments
remaining = parse_wire_segments(wires_schematic)
remaining_uuids = {w["uuid"] for w in remaining}
assert "hw-1" not in remaining_uuids
assert "hw-2" not in remaining_uuids
assert "hw-3" not in remaining_uuids
assert "vw-1" in remaining_uuids
def test_remove_with_x_range(self, wires_schematic):
from mckicad.tools.schematic_edit import remove_wires_by_criteria
result = remove_wires_by_criteria(
schematic_path=wires_schematic,
y=194.31,
min_x=148.0,
max_x=164.0,
)
assert result["success"] is True
assert result["matched_count"] == 2 # hw-1 and hw-2 only
def test_remove_vertical_wire(self, wires_schematic):
from mckicad.tools.schematic_edit import remove_wires_by_criteria
result = remove_wires_by_criteria(
schematic_path=wires_schematic,
x=100.0,
)
assert result["success"] is True
assert result["matched_count"] == 1
assert result["removed_count"] == 1
def test_no_criteria_fails(self, wires_schematic):
from mckicad.tools.schematic_edit import remove_wires_by_criteria
result = remove_wires_by_criteria(
schematic_path=wires_schematic,
)
assert result["success"] is False
assert "criterion" in result["error"].lower()
def test_no_matches(self, wires_schematic):
from mckicad.tools.schematic_edit import remove_wires_by_criteria
result = remove_wires_by_criteria(
schematic_path=wires_schematic,
y=999.0,
)
assert result["success"] is True
assert result["matched_count"] == 0
def test_invalid_path(self):
from mckicad.tools.schematic_edit import remove_wires_by_criteria
result = remove_wires_by_criteria(
schematic_path="/tmp/nonexistent.kicad_sch",
y=100.0,
)
assert result["success"] is False
def test_min_max_y_range(self, wires_schematic):
from mckicad.tools.schematic_edit import remove_wires_by_criteria
result = remove_wires_by_criteria(
schematic_path=wires_schematic,
min_y=99.0,
max_y=131.0,
dry_run=True,
)
assert result["success"] is True
# vw-1 (100→130) fits, horizontal wires at 194.31 do not
assert result["matched_count"] == 1
def test_tolerance(self, wires_schematic):
from mckicad.tools.schematic_edit import remove_wires_by_criteria
# With tight tolerance, exact match
result = remove_wires_by_criteria(
schematic_path=wires_schematic,
y=194.31,
tolerance=0.001,
dry_run=True,
)
assert result["matched_count"] == 3
# Offset slightly beyond tolerance
result = remove_wires_by_criteria(
schematic_path=wires_schematic,
y=194.35,
tolerance=0.01,
dry_run=True,
)
assert result["matched_count"] == 0

View File

@ -15,6 +15,8 @@ from mckicad.utils.sexp_parser import (
parse_global_labels,
parse_lib_file_symbol_pins,
parse_lib_symbol_pins,
parse_wire_segments,
remove_sexp_blocks_by_uuid,
transform_pin_to_schematic,
)
@ -720,3 +722,140 @@ class TestSymLibTableParsing:
)
assert result is not None
assert result.endswith("MyProject.kicad_sym")
# ---------------------------------------------------------------------------
# Wire segment parsing and removal tests
# ---------------------------------------------------------------------------
SAMPLE_SCHEMATIC_WITH_WIRES = """\
(kicad_sch
(version 20231120)
(generator "eeschema")
(uuid "root-uuid")
(paper "A4")
(lib_symbols
)
(wire (pts (xy 148.59 194.31) (xy 156.21 194.31))
(stroke (width 0) (type default))
(uuid "wire-1")
)
(wire (pts (xy 156.21 194.31) (xy 163.83 194.31))
(stroke (width 0) (type default))
(uuid "wire-2")
)
(wire (pts (xy 100.0 100.0) (xy 100.0 120.0))
(stroke (width 0) (type default))
(uuid "wire-3")
)
(wire (pts (xy 200.5 50.25) (xy 210.75 50.25))
(stroke (width 0) (type default))
(uuid "wire-4")
)
)
"""
@pytest.fixture
def schematic_with_wires(tmp_path):
"""Write a schematic with wire segments to a temp file."""
filepath = tmp_path / "wired.kicad_sch"
filepath.write_text(SAMPLE_SCHEMATIC_WITH_WIRES)
return str(filepath)
class TestParseWireSegments:
def test_finds_all_wires(self, schematic_with_wires):
wires = parse_wire_segments(schematic_with_wires)
assert len(wires) == 4
def test_extracts_coordinates(self, schematic_with_wires):
wires = parse_wire_segments(schematic_with_wires)
w1 = next(w for w in wires if w["uuid"] == "wire-1")
assert w1["start"]["x"] == pytest.approx(148.59)
assert w1["start"]["y"] == pytest.approx(194.31)
assert w1["end"]["x"] == pytest.approx(156.21)
assert w1["end"]["y"] == pytest.approx(194.31)
def test_extracts_uuids(self, schematic_with_wires):
wires = parse_wire_segments(schematic_with_wires)
uuids = {w["uuid"] for w in wires}
assert uuids == {"wire-1", "wire-2", "wire-3", "wire-4"}
def test_vertical_wire(self, schematic_with_wires):
wires = parse_wire_segments(schematic_with_wires)
w3 = next(w for w in wires if w["uuid"] == "wire-3")
assert w3["start"]["x"] == pytest.approx(100.0)
assert w3["start"]["y"] == pytest.approx(100.0)
assert w3["end"]["x"] == pytest.approx(100.0)
assert w3["end"]["y"] == pytest.approx(120.0)
def test_nonexistent_file_returns_empty(self):
wires = parse_wire_segments("/nonexistent/path.kicad_sch")
assert wires == []
def test_schematic_without_wires(self, tmp_path):
filepath = tmp_path / "empty.kicad_sch"
filepath.write_text("(kicad_sch\n (version 20231120)\n)\n")
wires = parse_wire_segments(str(filepath))
assert wires == []
class TestRemoveSexpBlocksByUuid:
def test_remove_single_wire(self, schematic_with_wires):
removed = remove_sexp_blocks_by_uuid(schematic_with_wires, {"wire-1"})
assert removed == 1
# Verify wire is gone
remaining = parse_wire_segments(schematic_with_wires)
remaining_uuids = {w["uuid"] for w in remaining}
assert "wire-1" not in remaining_uuids
assert len(remaining) == 3
def test_remove_multiple_wires(self, schematic_with_wires):
removed = remove_sexp_blocks_by_uuid(
schematic_with_wires, {"wire-1", "wire-2"},
)
assert removed == 2
remaining = parse_wire_segments(schematic_with_wires)
remaining_uuids = {w["uuid"] for w in remaining}
assert remaining_uuids == {"wire-3", "wire-4"}
def test_remove_all_wires(self, schematic_with_wires):
removed = remove_sexp_blocks_by_uuid(
schematic_with_wires, {"wire-1", "wire-2", "wire-3", "wire-4"},
)
assert removed == 4
remaining = parse_wire_segments(schematic_with_wires)
assert remaining == []
# File should still be valid
with open(schematic_with_wires) as f:
content = f.read()
assert content.strip().startswith("(kicad_sch")
assert content.strip().endswith(")")
def test_remove_nonexistent_uuid(self, schematic_with_wires):
removed = remove_sexp_blocks_by_uuid(
schematic_with_wires, {"nonexistent-uuid"},
)
assert removed == 0
remaining = parse_wire_segments(schematic_with_wires)
assert len(remaining) == 4
def test_empty_uuid_set(self, schematic_with_wires):
removed = remove_sexp_blocks_by_uuid(schematic_with_wires, set())
assert removed == 0
def test_preserves_other_content(self, schematic_with_wires):
remove_sexp_blocks_by_uuid(schematic_with_wires, {"wire-1"})
with open(schematic_with_wires) as f:
content = f.read()
assert "(version 20231120)" in content
assert '(uuid "root-uuid")' in content
assert "(lib_symbols" in content