Some checks are pending
CI / Lint and Format (push) Waiting to run
CI / Test Python 3.11 on macos-latest (push) Waiting to run
CI / Test Python 3.12 on macos-latest (push) Waiting to run
CI / Test Python 3.13 on macos-latest (push) Waiting to run
CI / Test Python 3.10 on ubuntu-latest (push) Waiting to run
CI / Test Python 3.11 on ubuntu-latest (push) Waiting to run
CI / Test Python 3.12 on ubuntu-latest (push) Waiting to run
CI / Test Python 3.13 on ubuntu-latest (push) Waiting to run
CI / Security Scan (push) Waiting to run
CI / Build Package (push) Blocked by required conditions
Blanket exclude_points in clamp_stub_length() skipped same-component obstacles regardless of direction, allowing stubs to bridge through adjacent pins (R2 +3V3/SDA). Moved exclusion to batch.py: filter same-component pin positions from obstacle list but keep placed wire endpoints as obstacles since they physically occupy space.
935 lines
35 KiB
Python
935 lines
35 KiB
Python
"""
|
|
Batch operations tool for the mckicad MCP server.
|
|
|
|
Applies multiple schematic modifications (components, power symbols, wires,
|
|
labels, no-connects) atomically from a JSON file. All operations share a
|
|
single load-save cycle for performance and consistency.
|
|
|
|
Batch JSON files live in the ``.mckicad/`` sidecar directory by default.
|
|
The schema supports five operation types processed in dependency order:
|
|
components -> power_symbols -> wires -> labels -> no_connects.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
from typing import Any
|
|
|
|
from mckicad.config import BATCH_LIMITS, LABEL_DEFAULTS
|
|
from mckicad.server import mcp
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Map user-facing direction names to the pin schematic rotation that
|
|
# produces that label placement direction (label goes opposite to body).
|
|
_DIRECTION_TO_ROTATION: dict[str, float] = {
|
|
"left": 0, # body right → label left
|
|
"right": 180, # body left → label right
|
|
"up": 90, # body down → label up
|
|
"down": 270, # body up → label down
|
|
}
|
|
|
|
_HAS_SCH_API = False
|
|
|
|
try:
|
|
from kicad_sch_api import load_schematic as _ksa_load
|
|
|
|
_HAS_SCH_API = True
|
|
except ImportError:
|
|
logger.warning(
|
|
"kicad-sch-api not installed — batch tools will return helpful errors. "
|
|
"Install with: uv add kicad-sch-api"
|
|
)
|
|
|
|
|
|
def _require_sch_api() -> dict[str, Any] | None:
|
|
if not _HAS_SCH_API:
|
|
return {
|
|
"success": False,
|
|
"error": "kicad-sch-api is not installed. Install it with: uv add kicad-sch-api",
|
|
"engine": "none",
|
|
}
|
|
return None
|
|
|
|
|
|
def _get_schematic_engine() -> str:
|
|
return "kicad-sch-api" if _HAS_SCH_API else "none"
|
|
|
|
|
|
def _validate_schematic_path(path: str, must_exist: bool = True) -> dict[str, Any] | None:
|
|
if not path:
|
|
return {"success": False, "error": "Schematic path must be a non-empty string"}
|
|
expanded = os.path.expanduser(path)
|
|
if not expanded.endswith(".kicad_sch"):
|
|
return {"success": False, "error": f"Path must end with .kicad_sch, got: {path}"}
|
|
if must_exist and not os.path.isfile(expanded):
|
|
return {"success": False, "error": f"Schematic file not found: {expanded}"}
|
|
return None
|
|
|
|
|
|
def _expand(path: str) -> str:
|
|
return os.path.abspath(os.path.expanduser(path))
|
|
|
|
|
|
def _resolve_batch_file(batch_file: str, schematic_path: str) -> str:
|
|
"""Resolve batch file path, checking .mckicad/ directory if relative."""
|
|
expanded = os.path.expanduser(batch_file)
|
|
if os.path.isabs(expanded):
|
|
return expanded
|
|
|
|
# Try relative to .mckicad/ sidecar directory
|
|
sch_dir = os.path.dirname(os.path.abspath(schematic_path))
|
|
mckicad_path = os.path.join(sch_dir, ".mckicad", expanded)
|
|
if os.path.isfile(mckicad_path):
|
|
return mckicad_path
|
|
|
|
# Try relative to schematic directory
|
|
sch_relative = os.path.join(sch_dir, expanded)
|
|
if os.path.isfile(sch_relative):
|
|
return sch_relative
|
|
|
|
return mckicad_path # Return .mckicad path for error reporting
|
|
|
|
|
|
def _validate_batch_data(data: dict[str, Any], sch: Any) -> list[str]:
|
|
"""Validate batch JSON data before applying. Returns list of error strings."""
|
|
errors: list[str] = []
|
|
|
|
# Check for unknown keys
|
|
valid_keys = {"components", "power_symbols", "wires", "labels", "no_connects", "label_connections"}
|
|
unknown = set(data.keys()) - valid_keys
|
|
if unknown:
|
|
errors.append(f"Unknown batch keys: {', '.join(sorted(unknown))}")
|
|
|
|
components = data.get("components", [])
|
|
power_symbols = data.get("power_symbols", [])
|
|
wires = data.get("wires", [])
|
|
labels = data.get("labels", [])
|
|
no_connects = data.get("no_connects", [])
|
|
label_connections = data.get("label_connections", [])
|
|
lc_count = sum(len(lc.get("connections", [])) for lc in label_connections if isinstance(lc, dict))
|
|
|
|
# Check limits
|
|
if len(components) > BATCH_LIMITS["max_components"]:
|
|
errors.append(
|
|
f"Too many components: {len(components)} "
|
|
f"(max {BATCH_LIMITS['max_components']})"
|
|
)
|
|
if len(wires) > BATCH_LIMITS["max_wires"]:
|
|
errors.append(
|
|
f"Too many wires: {len(wires)} (max {BATCH_LIMITS['max_wires']})"
|
|
)
|
|
if len(labels) + lc_count > BATCH_LIMITS["max_labels"]:
|
|
errors.append(
|
|
f"Too many labels (including label_connections): {len(labels) + lc_count} "
|
|
f"(max {BATCH_LIMITS['max_labels']})"
|
|
)
|
|
|
|
total = len(components) + len(power_symbols) + len(wires) + len(labels) + len(no_connects) + lc_count
|
|
if total > BATCH_LIMITS["max_total_operations"]:
|
|
errors.append(
|
|
f"Too many total operations: {total} "
|
|
f"(max {BATCH_LIMITS['max_total_operations']})"
|
|
)
|
|
|
|
if total == 0:
|
|
errors.append("Batch file contains no operations")
|
|
|
|
# Validate component entries
|
|
# Track refs declared in this batch for wire pin-reference validation
|
|
batch_refs: set[str] = set()
|
|
existing_refs: set[str] = set()
|
|
for comp in sch.components:
|
|
ref = getattr(comp, "reference", None)
|
|
if ref:
|
|
existing_refs.add(ref)
|
|
|
|
for i, comp in enumerate(components):
|
|
if not isinstance(comp, dict):
|
|
errors.append(f"components[{i}]: must be a dict")
|
|
continue
|
|
if "lib_id" not in comp:
|
|
errors.append(f"components[{i}]: missing required field 'lib_id'")
|
|
if "x" not in comp or "y" not in comp:
|
|
errors.append(f"components[{i}]: missing required fields 'x' and 'y'")
|
|
ref = comp.get("reference")
|
|
if ref:
|
|
batch_refs.add(ref)
|
|
|
|
# Validate power symbol entries
|
|
for i, ps in enumerate(power_symbols):
|
|
if not isinstance(ps, dict):
|
|
errors.append(f"power_symbols[{i}]: must be a dict")
|
|
continue
|
|
if "net" not in ps:
|
|
errors.append(f"power_symbols[{i}]: missing required field 'net'")
|
|
if "pin_ref" not in ps:
|
|
errors.append(f"power_symbols[{i}]: missing required field 'pin_ref'")
|
|
if "pin_number" not in ps:
|
|
errors.append(f"power_symbols[{i}]: missing required field 'pin_number'")
|
|
|
|
pin_ref = ps.get("pin_ref", "")
|
|
if pin_ref and pin_ref not in existing_refs and pin_ref not in batch_refs:
|
|
errors.append(
|
|
f"power_symbols[{i}]: pin_ref '{pin_ref}' not found in schematic "
|
|
f"or batch components"
|
|
)
|
|
|
|
# Validate wire entries
|
|
for i, wire in enumerate(wires):
|
|
if not isinstance(wire, dict):
|
|
errors.append(f"wires[{i}]: must be a dict")
|
|
continue
|
|
|
|
has_coords = all(k in wire for k in ("start_x", "start_y", "end_x", "end_y"))
|
|
has_refs = all(k in wire for k in ("from_ref", "from_pin", "to_ref", "to_pin"))
|
|
|
|
if not has_coords and not has_refs:
|
|
errors.append(
|
|
f"wires[{i}]: must have either coordinate fields "
|
|
f"(start_x/start_y/end_x/end_y) or pin-reference fields "
|
|
f"(from_ref/from_pin/to_ref/to_pin)"
|
|
)
|
|
|
|
if has_refs:
|
|
for ref_key in ("from_ref", "to_ref"):
|
|
ref = wire.get(ref_key, "")
|
|
if ref and ref not in existing_refs and ref not in batch_refs:
|
|
errors.append(
|
|
f"wires[{i}]: {ref_key} '{ref}' not found in schematic "
|
|
f"or batch components"
|
|
)
|
|
|
|
# Validate label entries (either coordinate or pin-reference placement)
|
|
for i, label in enumerate(labels):
|
|
if not isinstance(label, dict):
|
|
errors.append(f"labels[{i}]: must be a dict")
|
|
continue
|
|
if "text" not in label:
|
|
errors.append(f"labels[{i}]: missing required field 'text'")
|
|
|
|
has_coords = "x" in label and "y" in label
|
|
has_pin_ref = "pin_ref" in label and "pin_number" in label
|
|
|
|
if not has_coords and not has_pin_ref:
|
|
errors.append(
|
|
f"labels[{i}]: must have either coordinate fields "
|
|
f"(x/y) or pin-reference fields (pin_ref/pin_number)"
|
|
)
|
|
|
|
if has_pin_ref:
|
|
pin_ref = label.get("pin_ref", "")
|
|
if pin_ref and pin_ref not in existing_refs and pin_ref not in batch_refs:
|
|
errors.append(
|
|
f"labels[{i}]: pin_ref '{pin_ref}' not found in schematic "
|
|
f"or batch components"
|
|
)
|
|
|
|
# Validate label_connections entries
|
|
for i, lc in enumerate(label_connections):
|
|
if not isinstance(lc, dict):
|
|
errors.append(f"label_connections[{i}]: must be a dict")
|
|
continue
|
|
if "net" not in lc:
|
|
errors.append(f"label_connections[{i}]: missing required field 'net'")
|
|
connections = lc.get("connections", [])
|
|
if not connections:
|
|
errors.append(f"label_connections[{i}]: missing or empty 'connections' list")
|
|
for j, conn in enumerate(connections):
|
|
if not isinstance(conn, dict):
|
|
errors.append(f"label_connections[{i}].connections[{j}]: must be a dict")
|
|
continue
|
|
if "ref" not in conn or "pin" not in conn:
|
|
errors.append(
|
|
f"label_connections[{i}].connections[{j}]: "
|
|
f"missing required fields 'ref' and 'pin'"
|
|
)
|
|
ref = conn.get("ref", "")
|
|
if ref and ref not in existing_refs and ref not in batch_refs:
|
|
errors.append(
|
|
f"label_connections[{i}].connections[{j}]: "
|
|
f"ref '{ref}' not found in schematic or batch components"
|
|
)
|
|
|
|
# Validate no-connect entries (coordinate or pin-reference placement)
|
|
for i, nc in enumerate(no_connects):
|
|
if not isinstance(nc, dict):
|
|
errors.append(f"no_connects[{i}]: must be a dict")
|
|
continue
|
|
|
|
has_coords = "x" in nc and "y" in nc
|
|
has_pin_ref = "pin_ref" in nc and "pin_number" in nc
|
|
|
|
if not has_coords and not has_pin_ref:
|
|
errors.append(
|
|
f"no_connects[{i}]: must have either coordinate fields "
|
|
f"(x/y) or pin-reference fields (pin_ref/pin_number)"
|
|
)
|
|
|
|
if has_pin_ref:
|
|
pin_ref = nc.get("pin_ref", "")
|
|
if pin_ref and pin_ref not in existing_refs and pin_ref not in batch_refs:
|
|
errors.append(
|
|
f"no_connects[{i}]: pin_ref '{pin_ref}' not found in schematic "
|
|
f"or batch components"
|
|
)
|
|
|
|
return errors
|
|
|
|
|
|
def _register_project_libraries(
|
|
data: dict[str, Any], schematic_path: str,
|
|
) -> list[str]:
|
|
"""Register project-local symbol libraries with kicad-sch-api's cache.
|
|
|
|
Scans the batch data for library names not already known to the global
|
|
``SymbolLibraryCache``, locates the ``.kicad_sym`` files via mckicad's
|
|
own library search (which reads ``sym-lib-table``), and registers them
|
|
so that ``components.add()`` can resolve the lib_id.
|
|
|
|
Returns list of newly registered library names.
|
|
"""
|
|
try:
|
|
from kicad_sch_api.library.cache import get_symbol_cache
|
|
except ImportError:
|
|
return []
|
|
|
|
from mckicad.utils.sexp_parser import _find_library_file
|
|
|
|
cache = get_symbol_cache()
|
|
|
|
# Collect unique library names from batch components
|
|
lib_names: set[str] = set()
|
|
for comp in data.get("components", []):
|
|
lib_id = comp.get("lib_id", "")
|
|
if ":" in lib_id:
|
|
lib_names.add(lib_id.split(":")[0])
|
|
|
|
registered: list[str] = []
|
|
for lib_name in lib_names:
|
|
# Skip if already indexed
|
|
if lib_name in cache._library_index:
|
|
continue
|
|
|
|
lib_path = _find_library_file(schematic_path, lib_name)
|
|
if lib_path and cache.add_library_path(lib_path):
|
|
logger.info("Registered project-local library: %s -> %s", lib_name, lib_path)
|
|
registered.append(lib_name)
|
|
elif lib_path is None:
|
|
logger.debug("Library '%s' not found in project paths", lib_name)
|
|
|
|
return registered
|
|
|
|
|
|
|
|
def _apply_batch_operations(
|
|
sch: Any, data: dict[str, Any], schematic_path: str,
|
|
) -> dict[str, Any]:
|
|
"""Apply all batch operations to a loaded schematic. Returns summary.
|
|
|
|
Labels are returned as pending sexp strings (``_pending_label_sexps``)
|
|
that must be inserted into the file *after* ``sch.save()``, because
|
|
kicad-sch-api's serializer drops global labels and raises TypeError
|
|
on local labels.
|
|
|
|
Args:
|
|
sch: A kicad-sch-api SchematicDocument instance.
|
|
data: Parsed batch JSON data.
|
|
schematic_path: Path to the .kicad_sch file (needed for sexp
|
|
pin fallback and label insertion).
|
|
"""
|
|
from mckicad.patterns._geometry import add_power_symbol_to_pin
|
|
from mckicad.utils.sexp_parser import (
|
|
WireSegment,
|
|
clamp_stub_length,
|
|
compute_label_placement,
|
|
generate_global_label_sexp,
|
|
generate_label_sexp,
|
|
generate_wire_sexp,
|
|
resolve_label_collision,
|
|
resolve_pin_position,
|
|
resolve_pin_position_and_orientation,
|
|
resolve_wire_collision,
|
|
)
|
|
|
|
placed_components: list[str] = []
|
|
placed_power: list[dict[str, Any]] = []
|
|
placed_wires: list[str] = []
|
|
placed_labels: list[str] = []
|
|
placed_no_connects = 0
|
|
pending_label_sexps: list[str] = []
|
|
occupied_positions: dict[tuple[float, float], str] = {}
|
|
placed_wire_segments: list[WireSegment] = []
|
|
collisions_resolved = 0
|
|
wire_collisions_resolved = 0
|
|
|
|
# 0. Paper size (if specified in batch)
|
|
batch_paper = data.get("paper_size")
|
|
if batch_paper:
|
|
try:
|
|
sch.set_paper_size(batch_paper)
|
|
except Exception:
|
|
logger.warning("Could not set paper_size to '%s'", batch_paper)
|
|
|
|
# 1. Components (multi-unit supported natively by kicad-sch-api)
|
|
for comp in data.get("components", []):
|
|
ref = comp.get("reference")
|
|
unit = comp.get("unit", 1)
|
|
|
|
sch.components.add(
|
|
lib_id=comp["lib_id"],
|
|
reference=ref,
|
|
value=comp.get("value", ""),
|
|
position=(comp["x"], comp["y"]),
|
|
unit=unit,
|
|
)
|
|
|
|
# Apply optional rotation
|
|
if "rotation" in comp and ref:
|
|
placed = sch.components.get(ref)
|
|
if placed:
|
|
placed.rotate(comp["rotation"])
|
|
placed_components.append(ref or comp["lib_id"])
|
|
|
|
# 2. Power symbols (with sexp pin fallback)
|
|
for ps in data.get("power_symbols", []):
|
|
pin_pos_tuple = resolve_pin_position(
|
|
sch, schematic_path, ps["pin_ref"], ps["pin_number"],
|
|
)
|
|
if pin_pos_tuple is None:
|
|
logger.warning(
|
|
"Skipping power symbol: pin %s.%s not found",
|
|
ps["pin_ref"],
|
|
ps["pin_number"],
|
|
)
|
|
continue
|
|
result = add_power_symbol_to_pin(
|
|
sch=sch,
|
|
pin_position=pin_pos_tuple,
|
|
net=ps["net"],
|
|
lib_id=ps.get("lib_id"),
|
|
stub_length=ps.get("stub_length"),
|
|
)
|
|
placed_power.append(result)
|
|
|
|
# 3. Wires
|
|
for wire in data.get("wires", []):
|
|
if "from_ref" in wire:
|
|
wire_id = sch.add_wire_between_pins(
|
|
component1_ref=wire["from_ref"],
|
|
pin1_number=wire["from_pin"],
|
|
component2_ref=wire["to_ref"],
|
|
pin2_number=wire["to_pin"],
|
|
)
|
|
else:
|
|
wire_id = sch.add_wire(
|
|
start=(wire["start_x"], wire["start_y"]),
|
|
end=(wire["end_x"], wire["end_y"]),
|
|
)
|
|
placed_wires.append(str(wire_id))
|
|
|
|
# Pre-scan: collect all pin positions referenced by labels, power
|
|
# symbols, and no-connects as obstacles for stub length clamping.
|
|
# Runs AFTER components/power/wires are placed so all refs exist.
|
|
# Also builds ref_to_pins so we can exclude same-component pins
|
|
# from collision checks (they can't cause external shorts).
|
|
obstacle_points: list[tuple[float, float]] = []
|
|
ref_to_pins: dict[str, list[tuple[float, float]]] = {}
|
|
_pin_cache: dict[tuple[str, str], dict[str, Any] | None] = {}
|
|
|
|
def _cached_resolve(ref: str, pin: str) -> dict[str, Any] | None:
|
|
key = (ref, pin)
|
|
if key not in _pin_cache:
|
|
_pin_cache[key] = resolve_pin_position_and_orientation(
|
|
sch, schematic_path, ref, pin,
|
|
)
|
|
return _pin_cache[key]
|
|
|
|
def _register_obstacle(ref: str, info: dict[str, Any]) -> None:
|
|
pt = (info["x"], info["y"])
|
|
obstacle_points.append(pt)
|
|
ref_to_pins.setdefault(ref, []).append(pt)
|
|
|
|
for _label in data.get("labels", []):
|
|
if "pin_ref" in _label:
|
|
_info = _cached_resolve(_label["pin_ref"], str(_label["pin_number"]))
|
|
if _info:
|
|
_register_obstacle(_label["pin_ref"], _info)
|
|
for _lc in data.get("label_connections", []):
|
|
for _conn in _lc.get("connections", []):
|
|
_info = _cached_resolve(_conn["ref"], str(_conn["pin"]))
|
|
if _info:
|
|
_register_obstacle(_conn["ref"], _info)
|
|
for _ps in data.get("power_symbols", []):
|
|
if "pin_ref" in _ps:
|
|
_info = _cached_resolve(_ps["pin_ref"], str(_ps["pin_number"]))
|
|
if _info:
|
|
_register_obstacle(_ps["pin_ref"], _info)
|
|
for _nc in data.get("no_connects", []):
|
|
if "pin_ref" in _nc:
|
|
_info = _cached_resolve(_nc["pin_ref"], str(_nc["pin_number"]))
|
|
if _info:
|
|
_register_obstacle(_nc["pin_ref"], _info)
|
|
|
|
# 4. Labels — generate sexp strings for post-save insertion
|
|
for label in data.get("labels", []):
|
|
is_global = label.get("global", False)
|
|
rotation = label.get("rotation", 0)
|
|
|
|
if "pin_ref" in label:
|
|
# Pin-referenced label: resolve position from component pin
|
|
pin_info = _cached_resolve(
|
|
label["pin_ref"], str(label["pin_number"]),
|
|
)
|
|
if pin_info is None:
|
|
logger.warning(
|
|
"Skipping pin-ref label '%s': pin %s.%s not found",
|
|
label["text"], label["pin_ref"], label["pin_number"],
|
|
)
|
|
continue
|
|
label_stub = label.get("stub_length", LABEL_DEFAULTS["stub_length"])
|
|
|
|
# Direction override: map user direction to schematic rotation
|
|
direction = label.get("direction")
|
|
if direction:
|
|
dir_rot = _DIRECTION_TO_ROTATION.get(direction)
|
|
if dir_rot is not None:
|
|
pin_info = dict(pin_info, schematic_rotation=dir_rot)
|
|
|
|
# Clamp stub to avoid bridging adjacent pins.
|
|
# Filter same-component pin positions from obstacle_points
|
|
# (they can't cause external shorts), but keep wire endpoints
|
|
# unfiltered since placed stubs physically occupy space.
|
|
same_comp = {
|
|
(round(p[0], 2), round(p[1], 2))
|
|
for p in ref_to_pins.get(label["pin_ref"], [])
|
|
}
|
|
filtered_obstacles = [
|
|
pt for pt in obstacle_points
|
|
if (round(pt[0], 2), round(pt[1], 2)) not in same_comp
|
|
]
|
|
stub_obstacles = filtered_obstacles + [
|
|
pt for s in placed_wire_segments for pt in (s[0], s[1])
|
|
]
|
|
label_stub = clamp_stub_length(
|
|
pin_info["x"], pin_info["y"],
|
|
pin_info["schematic_rotation"],
|
|
label_stub, stub_obstacles,
|
|
)
|
|
|
|
placement = compute_label_placement(
|
|
pin_info["x"], pin_info["y"],
|
|
pin_info["schematic_rotation"],
|
|
stub_length=label_stub,
|
|
)
|
|
lx, ly = placement["label_x"], placement["label_y"]
|
|
rotation = placement["label_rotation"]
|
|
|
|
# Resolve collisions before generating sexp
|
|
new_x, new_y = resolve_label_collision(
|
|
lx, ly, rotation, label["text"], occupied_positions,
|
|
)
|
|
if (new_x, new_y) != (lx, ly):
|
|
collisions_resolved += 1
|
|
lx, ly = new_x, new_y
|
|
|
|
if is_global:
|
|
sexp = generate_global_label_sexp(
|
|
text=label["text"], x=lx, y=ly, rotation=rotation,
|
|
shape=label.get("shape", "bidirectional"),
|
|
)
|
|
else:
|
|
sexp = generate_label_sexp(
|
|
text=label["text"], x=lx, y=ly, rotation=rotation,
|
|
)
|
|
pending_label_sexps.append(sexp)
|
|
|
|
# Wire stub from pin to (possibly shifted) label — with
|
|
# wire-level collision detection for collinear overlaps
|
|
w_sx, w_sy, w_lx, w_ly = resolve_wire_collision(
|
|
placement["stub_start_x"], placement["stub_start_y"],
|
|
lx, ly, rotation, label["text"], placed_wire_segments,
|
|
)
|
|
if (w_sx, w_sy, w_lx, w_ly) != (
|
|
placement["stub_start_x"], placement["stub_start_y"],
|
|
lx, ly,
|
|
):
|
|
wire_collisions_resolved += 1
|
|
lx, ly = w_lx, w_ly
|
|
wire_sexp = generate_wire_sexp(w_sx, w_sy, w_lx, w_ly)
|
|
pending_label_sexps.append(wire_sexp)
|
|
else:
|
|
# Coordinate-based label (original path)
|
|
if is_global:
|
|
sexp = generate_global_label_sexp(
|
|
text=label["text"],
|
|
x=label["x"],
|
|
y=label["y"],
|
|
rotation=rotation,
|
|
shape=label.get("shape", "bidirectional"),
|
|
)
|
|
else:
|
|
sexp = generate_label_sexp(
|
|
text=label["text"],
|
|
x=label["x"],
|
|
y=label["y"],
|
|
rotation=rotation,
|
|
)
|
|
pending_label_sexps.append(sexp)
|
|
|
|
placed_labels.append(label["text"])
|
|
|
|
# 4b. Label connections — pin-ref labels sharing a net name
|
|
for lc in data.get("label_connections", []):
|
|
net = lc["net"]
|
|
is_global = lc.get("global", False)
|
|
shape = lc.get("shape", "bidirectional")
|
|
stub_len = lc.get("stub_length", LABEL_DEFAULTS["stub_length"])
|
|
|
|
for conn in lc.get("connections", []):
|
|
pin_info = _cached_resolve(conn["ref"], str(conn["pin"]))
|
|
if pin_info is None:
|
|
logger.warning(
|
|
"Skipping label_connection '%s': pin %s.%s not found",
|
|
net, conn["ref"], conn["pin"],
|
|
)
|
|
continue
|
|
|
|
# Per-connection stub_length and direction overrides
|
|
conn_stub = conn.get("stub_length", stub_len)
|
|
|
|
direction = conn.get("direction")
|
|
if direction:
|
|
dir_rot = _DIRECTION_TO_ROTATION.get(direction)
|
|
if dir_rot is not None:
|
|
pin_info = dict(pin_info, schematic_rotation=dir_rot)
|
|
|
|
# Clamp stub to avoid bridging adjacent pins.
|
|
# Filter same-component pin positions from obstacle_points
|
|
# (they can't cause external shorts), but keep wire endpoints
|
|
# unfiltered since placed stubs physically occupy space.
|
|
same_comp = {
|
|
(round(p[0], 2), round(p[1], 2))
|
|
for p in ref_to_pins.get(conn["ref"], [])
|
|
}
|
|
filtered_obstacles = [
|
|
pt for pt in obstacle_points
|
|
if (round(pt[0], 2), round(pt[1], 2)) not in same_comp
|
|
]
|
|
stub_obstacles = filtered_obstacles + [
|
|
pt for s in placed_wire_segments for pt in (s[0], s[1])
|
|
]
|
|
conn_stub = clamp_stub_length(
|
|
pin_info["x"], pin_info["y"],
|
|
pin_info["schematic_rotation"],
|
|
conn_stub, stub_obstacles,
|
|
)
|
|
|
|
placement = compute_label_placement(
|
|
pin_info["x"], pin_info["y"],
|
|
pin_info["schematic_rotation"],
|
|
stub_length=conn_stub,
|
|
)
|
|
lx, ly = placement["label_x"], placement["label_y"]
|
|
rotation = placement["label_rotation"]
|
|
|
|
# Resolve collisions before generating sexp
|
|
new_x, new_y = resolve_label_collision(
|
|
lx, ly, rotation, net, occupied_positions,
|
|
)
|
|
if (new_x, new_y) != (lx, ly):
|
|
collisions_resolved += 1
|
|
lx, ly = new_x, new_y
|
|
|
|
if is_global:
|
|
sexp = generate_global_label_sexp(
|
|
text=net, x=lx, y=ly, rotation=rotation, shape=shape,
|
|
)
|
|
else:
|
|
sexp = generate_label_sexp(
|
|
text=net, x=lx, y=ly, rotation=rotation,
|
|
)
|
|
pending_label_sexps.append(sexp)
|
|
|
|
# Wire stub from pin to (possibly shifted) label — with
|
|
# wire-level collision detection for collinear overlaps
|
|
w_sx, w_sy, w_lx, w_ly = resolve_wire_collision(
|
|
placement["stub_start_x"], placement["stub_start_y"],
|
|
lx, ly, rotation, net, placed_wire_segments,
|
|
)
|
|
if (w_sx, w_sy, w_lx, w_ly) != (
|
|
placement["stub_start_x"], placement["stub_start_y"],
|
|
lx, ly,
|
|
):
|
|
wire_collisions_resolved += 1
|
|
lx, ly = w_lx, w_ly
|
|
wire_sexp = generate_wire_sexp(w_sx, w_sy, w_lx, w_ly)
|
|
pending_label_sexps.append(wire_sexp)
|
|
placed_labels.append(net)
|
|
|
|
# 5. No-connects (coordinate or pin-referenced)
|
|
for nc in data.get("no_connects", []):
|
|
if "pin_ref" in nc:
|
|
pin_pos = resolve_pin_position(
|
|
sch, schematic_path, nc["pin_ref"], str(nc["pin_number"]),
|
|
)
|
|
if pin_pos is None:
|
|
logger.warning(
|
|
"Skipping no-connect: pin %s.%s not found",
|
|
nc["pin_ref"], nc["pin_number"],
|
|
)
|
|
continue
|
|
sch.no_connects.add(position=pin_pos)
|
|
else:
|
|
sch.no_connects.add(position=(nc["x"], nc["y"]))
|
|
placed_no_connects += 1
|
|
|
|
return {
|
|
"components_placed": len(placed_components),
|
|
"component_refs": placed_components,
|
|
"power_symbols_placed": len(placed_power),
|
|
"power_details": placed_power,
|
|
"wires_placed": len(placed_wires),
|
|
"wire_ids": placed_wires,
|
|
"labels_placed": len(placed_labels),
|
|
"label_ids": placed_labels,
|
|
"no_connects_placed": placed_no_connects,
|
|
"collisions_resolved": collisions_resolved,
|
|
"wire_collisions_resolved": wire_collisions_resolved,
|
|
"total_operations": (
|
|
len(placed_components)
|
|
+ len(placed_power)
|
|
+ len(placed_wires)
|
|
+ len(placed_labels)
|
|
+ placed_no_connects
|
|
),
|
|
"_pending_label_sexps": pending_label_sexps,
|
|
}
|
|
|
|
|
|
@mcp.tool()
|
|
def apply_batch(
|
|
schematic_path: str,
|
|
batch_file: str,
|
|
dry_run: bool = False,
|
|
parent_uuid: str | None = None,
|
|
sheet_uuid: str | None = None,
|
|
) -> dict[str, Any]:
|
|
"""Apply a batch of schematic modifications from a JSON file.
|
|
|
|
Reads a JSON file containing components, power symbols, wires, labels,
|
|
and no-connect flags, then applies them all in a single atomic operation.
|
|
All changes share one load-save cycle for performance and consistency.
|
|
|
|
Operations are processed in dependency order: components first (so
|
|
pin-reference wires can find them), then power symbols, wires, labels,
|
|
and finally no-connects.
|
|
|
|
Use ``dry_run=True`` to validate the batch file without modifying the
|
|
schematic -- returns validation results and a preview of what would be
|
|
applied.
|
|
|
|
Batch files can be placed in the ``.mckicad/`` directory next to the
|
|
schematic for clean organization.
|
|
|
|
**Hierarchy context:** For sub-sheets in a hierarchical design, pass
|
|
``parent_uuid`` and ``sheet_uuid`` (both returned by
|
|
``add_hierarchical_sheet``). This sets the instance path so that
|
|
kicad-cli correctly resolves power symbol nets during netlist export.
|
|
Without hierarchy context, power symbols (GND, +3V3, etc.) are silently
|
|
dropped from the netlist.
|
|
|
|
**Batch JSON schema:**
|
|
|
|
.. code-block:: json
|
|
|
|
{
|
|
"components": [
|
|
{"lib_id": "Device:C", "reference": "C1", "value": "100nF",
|
|
"x": 100, "y": 200, "rotation": 0}
|
|
],
|
|
"power_symbols": [
|
|
{"net": "GND", "pin_ref": "C1", "pin_number": "2"}
|
|
],
|
|
"wires": [
|
|
{"start_x": 100, "start_y": 200, "end_x": 200, "end_y": 200},
|
|
{"from_ref": "R1", "from_pin": "1", "to_ref": "R2", "to_pin": "2"}
|
|
],
|
|
"labels": [
|
|
{"text": "SPI_CLK", "x": 150, "y": 100, "global": false},
|
|
{"text": "GPIO5", "pin_ref": "U8", "pin_number": "15", "global": true}
|
|
],
|
|
"label_connections": [
|
|
{
|
|
"net": "BOOT_MODE", "global": true,
|
|
"connections": [
|
|
{"ref": "U8", "pin": "48"},
|
|
{"ref": "R42", "pin": "1"}
|
|
]
|
|
}
|
|
],
|
|
"no_connects": [
|
|
{"x": 300, "y": 300},
|
|
{"pin_ref": "U8", "pin_number": "33"}
|
|
]
|
|
}
|
|
|
|
Labels accept either ``{x, y}`` for coordinate placement or
|
|
``{pin_ref, pin_number}`` for pin-referenced placement (with automatic
|
|
wire stub). ``label_connections`` place the same net label on multiple
|
|
pins simultaneously.
|
|
|
|
Args:
|
|
schematic_path: Path to an existing .kicad_sch file.
|
|
batch_file: Path to the batch JSON file. Relative paths are resolved
|
|
against the ``.mckicad/`` directory first, then the schematic's
|
|
directory.
|
|
dry_run: When True, validates the batch without applying changes.
|
|
Returns validation results and operation preview.
|
|
parent_uuid: UUID of the parent (root) schematic. Required together
|
|
with ``sheet_uuid`` for hierarchical designs. Returned by
|
|
``add_hierarchical_sheet`` as ``parent_uuid``.
|
|
sheet_uuid: UUID of the sheet entry in the parent schematic. Required
|
|
together with ``parent_uuid`` for hierarchical designs. Returned
|
|
by ``add_hierarchical_sheet`` as ``sheet_uuid``.
|
|
|
|
Returns:
|
|
Dictionary with ``success``, counts of each operation type applied,
|
|
and wire/label IDs for the created elements.
|
|
"""
|
|
err = _require_sch_api()
|
|
if err:
|
|
return err
|
|
|
|
schematic_path = _expand(schematic_path)
|
|
verr = _validate_schematic_path(schematic_path)
|
|
if verr:
|
|
return verr
|
|
|
|
# Resolve and load batch file
|
|
resolved_path = _resolve_batch_file(batch_file, schematic_path)
|
|
if not os.path.isfile(resolved_path):
|
|
return {
|
|
"success": False,
|
|
"error": f"Batch file not found: {resolved_path}",
|
|
"searched_paths": [
|
|
resolved_path,
|
|
os.path.join(os.path.dirname(schematic_path), batch_file),
|
|
],
|
|
}
|
|
|
|
try:
|
|
with open(resolved_path) as f:
|
|
data = json.load(f)
|
|
except json.JSONDecodeError as e:
|
|
return {
|
|
"success": False,
|
|
"error": f"Invalid JSON in batch file: {e}",
|
|
"batch_file": resolved_path,
|
|
}
|
|
|
|
if not isinstance(data, dict):
|
|
return {
|
|
"success": False,
|
|
"error": "Batch file must contain a JSON object at the top level",
|
|
"batch_file": resolved_path,
|
|
}
|
|
|
|
try:
|
|
sch = _ksa_load(schematic_path)
|
|
|
|
# Register project-local symbol libraries with the cache so
|
|
# components.add() can resolve non-standard lib_ids.
|
|
_register_project_libraries(data, schematic_path)
|
|
|
|
# Set hierarchy context for sub-sheets so kicad-cli resolves
|
|
# power symbol nets correctly during netlist export.
|
|
if parent_uuid and sheet_uuid:
|
|
sch.set_hierarchy_context(parent_uuid, sheet_uuid)
|
|
logger.info(
|
|
"Set hierarchy context: parent=%s, sheet=%s",
|
|
parent_uuid, sheet_uuid,
|
|
)
|
|
|
|
# Validation pass
|
|
validation_errors = _validate_batch_data(data, sch)
|
|
if validation_errors:
|
|
return {
|
|
"success": False,
|
|
"error": "Batch validation failed",
|
|
"validation_errors": validation_errors,
|
|
"batch_file": resolved_path,
|
|
"schematic_path": schematic_path,
|
|
}
|
|
|
|
# Preview for dry run
|
|
if dry_run:
|
|
lc_count = sum(
|
|
len(lc.get("connections", []))
|
|
for lc in data.get("label_connections", [])
|
|
)
|
|
return {
|
|
"success": True,
|
|
"dry_run": True,
|
|
"preview": {
|
|
"components": len(data.get("components", [])),
|
|
"power_symbols": len(data.get("power_symbols", [])),
|
|
"wires": len(data.get("wires", [])),
|
|
"labels": len(data.get("labels", [])) + lc_count,
|
|
"label_connections": lc_count,
|
|
"no_connects": len(data.get("no_connects", [])),
|
|
},
|
|
"validation": "passed",
|
|
"batch_file": resolved_path,
|
|
"schematic_path": schematic_path,
|
|
"engine": _get_schematic_engine(),
|
|
}
|
|
|
|
# Apply all operations
|
|
summary = _apply_batch_operations(sch, data, schematic_path)
|
|
|
|
# Single save (components, power symbols, wires, no-connects)
|
|
sch.save(schematic_path)
|
|
|
|
# Fix kicad-sch-api's mis-serialization of (property private ...)
|
|
# keywords before any further file modifications.
|
|
from mckicad.utils.sexp_parser import (
|
|
fix_property_private_keywords,
|
|
insert_sexp_before_close,
|
|
)
|
|
|
|
private_fixes = fix_property_private_keywords(schematic_path)
|
|
if private_fixes:
|
|
summary["property_private_fixes"] = private_fixes
|
|
|
|
# Insert labels via sexp AFTER save — kicad-sch-api's serializer
|
|
# drops global labels and raises TypeError on local labels.
|
|
pending_sexps = summary.pop("_pending_label_sexps", [])
|
|
if pending_sexps:
|
|
combined_sexp = "".join(pending_sexps)
|
|
insert_sexp_before_close(schematic_path, combined_sexp)
|
|
|
|
logger.info(
|
|
"Batch applied: %d operations from %s to %s",
|
|
summary["total_operations"],
|
|
resolved_path,
|
|
schematic_path,
|
|
)
|
|
|
|
return {
|
|
"success": True,
|
|
**summary,
|
|
"batch_file": resolved_path,
|
|
"schematic_path": schematic_path,
|
|
"engine": _get_schematic_engine(),
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error("Batch apply failed for %s: %s", schematic_path, e)
|
|
return {
|
|
"success": False,
|
|
"error": str(e),
|
|
"batch_file": resolved_path,
|
|
"schematic_path": schematic_path,
|
|
}
|