From 1b0a77f9567d36138e077b0a464c324c1c3f2772 Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Thu, 5 Mar 2026 13:41:19 -0700 Subject: [PATCH] Add import_netlist tool for KiCad XML and CSV netlist ingestion Parses external netlist files into the component-pin-net graph that verify_connectivity and apply_batch consume. Supports KiCad XML (.net) exported by kicad-cli, and CSV/TSV with flexible column name matching. Auto-detects format from file extension and content. Output is directly compatible with verify_connectivity's expected parameter, closing the loop between "I have a design" and "I can build it in KiCad." Requested by ESP32-P4 project (agent thread message 028). --- src/mckicad/server.py | 1 + src/mckicad/tools/netlist.py | 295 +++++++++++++++++++++++++++++++++++ tests/test_netlist.py | 286 +++++++++++++++++++++++++++++++++ 3 files changed, 582 insertions(+) create mode 100644 src/mckicad/tools/netlist.py create mode 100644 tests/test_netlist.py diff --git a/src/mckicad/server.py b/src/mckicad/server.py index 0f0d53f..e725170 100644 --- a/src/mckicad/server.py +++ b/src/mckicad/server.py @@ -52,6 +52,7 @@ from mckicad.tools import ( # noqa: E402, F401 bom, drc, export, + netlist, pcb, power_symbols, project, diff --git a/src/mckicad/tools/netlist.py b/src/mckicad/tools/netlist.py new file mode 100644 index 0000000..e5dd4af --- /dev/null +++ b/src/mckicad/tools/netlist.py @@ -0,0 +1,295 @@ +"""Netlist import tools for the mckicad MCP server. + +Parses external netlist files into the component-pin-net graph that +``verify_connectivity`` and ``apply_batch`` consume. Supports KiCad +XML (.net) and CSV/TSV formats. +""" + +import csv +import io +import json +import logging +import os +from typing import Any +import xml.etree.ElementTree as ET + +from mckicad.server import mcp +from mckicad.utils.file_utils import write_detail_file + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Format-specific parsers +# --------------------------------------------------------------------------- + + +def _parse_kicad_xml(content: str) -> dict[str, Any]: + """Parse a KiCad XML netlist (.net file) into nets/components dicts. + + KiCad XML structure:: + + + + 1K... + + + + + + + + """ + root = ET.fromstring(content) + + # Parse components + components: dict[str, dict[str, str]] = {} + comp_section = root.find("components") + if comp_section is not None: + for comp in comp_section.findall("comp"): + ref = comp.get("ref", "") + if not ref: + continue + value_el = comp.find("value") + footprint_el = comp.find("footprint") + libsource = comp.find("libsource") + components[ref] = { + "value": value_el.text if value_el is not None and value_el.text else "", + "footprint": footprint_el.text if footprint_el is not None and footprint_el.text else "", + "lib": libsource.get("lib", "") if libsource is not None else "", + "part": libsource.get("part", "") if libsource is not None else "", + } + + # Parse nets + nets: dict[str, list[list[str]]] = {} + connection_count = 0 + nets_section = root.find("nets") + if nets_section is not None: + for net_el in nets_section.findall("net"): + net_name = net_el.get("name", "") + if not net_name: + continue + pins: list[list[str]] = [] + for node in net_el.findall("node"): + ref = node.get("ref", "") + pin = node.get("pin", "") + if ref and pin: + pins.append([ref, pin]) + connection_count += 1 + if pins: + nets[net_name] = pins + + return { + "nets": nets, + "components": components, + "statistics": { + "net_count": len(nets), + "component_count": len(components), + "connection_count": connection_count, + }, + } + + +def _parse_csv(content: str) -> dict[str, Any]: + """Parse a CSV/TSV netlist into nets/components dicts. + + Expects columns that include at least ``Reference``, ``Pin``, and ``Net``. + Column names are matched case-insensitively. Both comma and tab + delimiters are auto-detected. + """ + # Auto-detect delimiter + delimiter = "\t" if "\t" in content.split("\n", 1)[0] else "," + + reader = csv.DictReader(io.StringIO(content), delimiter=delimiter) + if reader.fieldnames is None: + raise ValueError("CSV file has no header row") + + # Normalise header names to lowercase for flexible matching + field_map: dict[str, str] = {} + for f in reader.fieldnames: + fl = f.strip().lower() + field_map[fl] = f + + # Find the columns we need + ref_col = field_map.get("reference") or field_map.get("ref") or field_map.get("designator") + pin_col = field_map.get("pin") or field_map.get("pin_number") or field_map.get("pin number") + net_col = field_map.get("net") or field_map.get("net_name") or field_map.get("net name") + + if not ref_col: + raise ValueError("CSV missing 'Reference' (or 'Ref', 'Designator') column") + if not pin_col: + raise ValueError("CSV missing 'Pin' (or 'Pin_Number', 'Pin Number') column") + if not net_col: + raise ValueError("CSV missing 'Net' (or 'Net_Name', 'Net Name') column") + + nets: dict[str, list[list[str]]] = {} + components: set[str] = set() + connection_count = 0 + + for row in reader: + ref = (row.get(ref_col) or "").strip() + pin = (row.get(pin_col) or "").strip() + net_name = (row.get(net_col) or "").strip() + if not ref or not pin or not net_name: + continue + nets.setdefault(net_name, []).append([ref, pin]) + components.add(ref) + connection_count += 1 + + # Return components as a simple dict (no value/footprint info from CSV) + comp_dict: dict[str, dict[str, str]] = {ref: {} for ref in sorted(components)} + + return { + "nets": nets, + "components": comp_dict, + "statistics": { + "net_count": len(nets), + "component_count": len(components), + "connection_count": connection_count, + }, + } + + +def _detect_format(content: str, file_path: str) -> str: + """Auto-detect netlist format from file extension and content.""" + ext = os.path.splitext(file_path)[1].lower() + + # Extension-based detection + if ext == ".net": + return "kicad_xml" + if ext in (".csv", ".tsv"): + return "csv" + if ext == ".xml": + return "kicad_xml" + + # Content-based detection + stripped = content.lstrip() + if stripped.startswith(" dict[str, Any]: + """Import a netlist file and extract the component-pin-net graph. + + Parses an external netlist into a structured JSON representation + suitable for ``verify_connectivity`` (as the ``expected`` parameter) + or for generating ``apply_batch`` wiring instructions. + + Supported formats: + - ``kicad_xml`` — KiCad intermediate netlist (.net XML). Most reliable + source, exported via ``kicad-cli sch export netlist``. + - ``csv`` — CSV or TSV with Reference, Pin, and Net columns. Common + export from EDA tools (Altium, Eagle, etc.). + - ``auto`` — detect format from file extension and content (default). + + Args: + source_path: Path to the netlist file to import. + format: Netlist format (``auto``, ``kicad_xml``, ``csv``). + output_path: Optional path to write the parsed result as JSON. + Defaults to ``.mckicad/imported_netlist.json`` next to + the source file. + + Returns: + Dictionary with ``nets`` (net_name → [[ref, pin], ...]), + ``components`` (ref → metadata dict), and ``statistics``. + The ``nets`` dict is directly compatible with + ``verify_connectivity``'s ``expected`` parameter. + """ + source_path = os.path.abspath(os.path.expanduser(source_path)) + + if not os.path.isfile(source_path): + return { + "success": False, + "error": f"Source file not found: {source_path}", + } + + allowed_formats = ("auto", "kicad_xml", "csv") + fmt = format.lower().strip() + if fmt not in allowed_formats: + return { + "success": False, + "error": f"Unsupported format: {format}. Allowed: {', '.join(allowed_formats)}", + } + + try: + with open(source_path) as f: + content = f.read() + except Exception as e: + return {"success": False, "error": f"Failed to read file: {e}"} + + if not content.strip(): + return {"success": False, "error": "Source file is empty"} + + # Detect format if auto + if fmt == "auto": + fmt = _detect_format(content, source_path) + if fmt == "unknown": + return { + "success": False, + "error": ( + "Could not auto-detect netlist format. " + "Specify format='kicad_xml' or format='csv' explicitly." + ), + } + + # Parse + try: + if fmt == "kicad_xml": + parsed = _parse_kicad_xml(content) + elif fmt == "csv": + parsed = _parse_csv(content) + else: + return {"success": False, "error": f"Parser not implemented for format: {fmt}"} + except ET.ParseError as e: + return {"success": False, "error": f"XML parse error: {e}"} + except ValueError as e: + return {"success": False, "error": str(e)} + except Exception as e: + logger.error("Netlist import failed: %s", e, exc_info=True) + return {"success": False, "error": f"Parse error: {e}"} + + stats = parsed["statistics"] + logger.info( + "Imported netlist from %s (%s): %d nets, %d components, %d connections", + os.path.basename(source_path), fmt, + stats["net_count"], stats["component_count"], stats["connection_count"], + ) + + result: dict[str, Any] = { + "success": True, + "format_detected": fmt, + "source_path": source_path, + **parsed, + } + + # Write output JSON + if output_path: + out = os.path.abspath(os.path.expanduser(output_path)) + os.makedirs(os.path.dirname(out) or ".", exist_ok=True) + with open(out, "w") as f: + json.dump(parsed, f, indent=2) + else: + out = write_detail_file( + source_path, "imported_netlist.json", parsed, + ) + + result["output_path"] = out + + return result diff --git a/tests/test_netlist.py b/tests/test_netlist.py new file mode 100644 index 0000000..7d21bde --- /dev/null +++ b/tests/test_netlist.py @@ -0,0 +1,286 @@ +"""Tests for netlist import tools.""" + +import json +import os +import textwrap + +import pytest + + +@pytest.mark.unit +class TestParseKicadXml: + """Tests for KiCad XML netlist parsing.""" + + def test_parse_basic_netlist(self, tmp_output_dir): + from mckicad.tools.netlist import import_netlist + + xml_content = textwrap.dedent("""\ + + + + + 10k + Resistor_SMD:R_0402 + + + + 4.7k + Resistor_SMD:R_0402 + + + + 100nF + Capacitor_SMD:C_0402 + + + + + + + + + + + + + + + + + + + """) + path = os.path.join(tmp_output_dir, "test.net") + with open(path, "w") as f: + f.write(xml_content) + + result = import_netlist(source_path=path) + + assert result["success"] is True + assert result["format_detected"] == "kicad_xml" + assert result["statistics"]["net_count"] == 3 + assert result["statistics"]["component_count"] == 3 + assert result["statistics"]["connection_count"] == 6 + + # Check nets structure + nets = result["nets"] + assert "GND" in nets + assert ["R1", "2"] in nets["GND"] + assert ["C1", "2"] in nets["GND"] + assert "VCC" in nets + assert len(nets["VCC"]) == 2 + + # Check components + assert "R1" in result["components"] + assert result["components"]["R1"]["value"] == "10k" + assert result["components"]["R1"]["lib"] == "Device" + + def test_auto_detects_xml_format(self, tmp_output_dir): + from mckicad.tools.netlist import import_netlist + + xml = '' + path = os.path.join(tmp_output_dir, "test.xml") + with open(path, "w") as f: + f.write(xml) + + result = import_netlist(source_path=path, format="auto") + assert result["success"] is True + assert result["format_detected"] == "kicad_xml" + + def test_malformed_xml_returns_error(self, tmp_output_dir): + from mckicad.tools.netlist import import_netlist + + path = os.path.join(tmp_output_dir, "bad.net") + with open(path, "w") as f: + f.write("") + + result = import_netlist(source_path=path) + assert result["success"] is False + assert "XML parse error" in result["error"] + + def test_verify_connectivity_compatible(self, tmp_output_dir): + """The nets dict is directly usable as verify_connectivity expected param.""" + from mckicad.tools.netlist import import_netlist + + xml = textwrap.dedent("""\ + + + IC + + + + + + + + + """) + path = os.path.join(tmp_output_dir, "compat.net") + with open(path, "w") as f: + f.write(xml) + + result = import_netlist(source_path=path) + assert result["success"] is True + + # verify_connectivity expects: {"NET1": [["U1", "1"], ["R1", "2"]]} + nets = result["nets"] + assert "NET1" in nets + assert isinstance(nets["NET1"], list) + for pin_pair in nets["NET1"]: + assert isinstance(pin_pair, list) + assert len(pin_pair) == 2 + + +@pytest.mark.unit +class TestParseCsv: + """Tests for CSV/TSV netlist parsing.""" + + def test_parse_csv_netlist(self, tmp_output_dir): + from mckicad.tools.netlist import import_netlist + + csv_content = textwrap.dedent("""\ + Reference,Pin,Net + R1,1,VCC + R1,2,GND + C1,1,VCC + C1,2,GND + """) + path = os.path.join(tmp_output_dir, "test.csv") + with open(path, "w") as f: + f.write(csv_content) + + result = import_netlist(source_path=path) + assert result["success"] is True + assert result["format_detected"] == "csv" + assert result["statistics"]["net_count"] == 2 + assert result["statistics"]["component_count"] == 2 + assert result["statistics"]["connection_count"] == 4 + + assert ["R1", "1"] in result["nets"]["VCC"] + assert ["C1", "2"] in result["nets"]["GND"] + + def test_parse_tsv_netlist(self, tmp_output_dir): + from mckicad.tools.netlist import import_netlist + + tsv_content = "Reference\tPin\tNet\nR1\t1\tVCC\nR1\t2\tGND\n" + path = os.path.join(tmp_output_dir, "test.tsv") + with open(path, "w") as f: + f.write(tsv_content) + + result = import_netlist(source_path=path, format="csv") + assert result["success"] is True + assert result["statistics"]["connection_count"] == 2 + + def test_csv_missing_columns(self, tmp_output_dir): + from mckicad.tools.netlist import import_netlist + + csv_content = "Name,Value\nR1,10k\n" + path = os.path.join(tmp_output_dir, "bad.csv") + with open(path, "w") as f: + f.write(csv_content) + + result = import_netlist(source_path=path, format="csv") + assert result["success"] is False + assert "Reference" in result["error"] + + def test_csv_alternative_column_names(self, tmp_output_dir): + from mckicad.tools.netlist import import_netlist + + csv_content = "Designator,Pin Number,Net Name\nU1,3,SDA\nU1,4,SCL\n" + path = os.path.join(tmp_output_dir, "alt.csv") + with open(path, "w") as f: + f.write(csv_content) + + result = import_netlist(source_path=path, format="csv") + assert result["success"] is True + assert "SDA" in result["nets"] + assert "SCL" in result["nets"] + + +@pytest.mark.unit +class TestImportNetlistErrors: + """Tests for error handling in import_netlist.""" + + def test_nonexistent_file(self): + from mckicad.tools.netlist import import_netlist + + result = import_netlist(source_path="/tmp/nonexistent_12345.net") + assert result["success"] is False + assert "not found" in result["error"] + + def test_empty_file(self, tmp_output_dir): + from mckicad.tools.netlist import import_netlist + + path = os.path.join(tmp_output_dir, "empty.net") + with open(path, "w") as f: + f.write("") + + result = import_netlist(source_path=path) + assert result["success"] is False + assert "empty" in result["error"] + + def test_unsupported_format(self, tmp_output_dir): + from mckicad.tools.netlist import import_netlist + + path = os.path.join(tmp_output_dir, "test.net") + with open(path, "w") as f: + f.write("some content") + + result = import_netlist(source_path=path, format="spice") + assert result["success"] is False + assert "Unsupported" in result["error"] + + def test_unknown_format_auto(self, tmp_output_dir): + from mckicad.tools.netlist import import_netlist + + path = os.path.join(tmp_output_dir, "mystery.dat") + with open(path, "w") as f: + f.write("random binary garbage 12345") + + result = import_netlist(source_path=path) + assert result["success"] is False + assert "auto-detect" in result["error"] + + +@pytest.mark.unit +class TestOutputFile: + """Tests for output file writing.""" + + def test_writes_output_json(self, tmp_output_dir): + from mckicad.tools.netlist import import_netlist + + xml = textwrap.dedent("""\ + + 1k + + + + + """) + src = os.path.join(tmp_output_dir, "test.net") + with open(src, "w") as f: + f.write(xml) + + out = os.path.join(tmp_output_dir, "output.json") + result = import_netlist(source_path=src, output_path=out) + assert result["success"] is True + assert result["output_path"] == out + assert os.path.isfile(out) + + with open(out) as f: + data = json.load(f) + assert "nets" in data + assert "GND" in data["nets"] + + def test_default_output_path(self, tmp_output_dir): + from mckicad.tools.netlist import import_netlist + + xml = '' + src = os.path.join(tmp_output_dir, "test.net") + with open(src, "w") as f: + f.write(xml) + + result = import_netlist(source_path=src) + assert result["success"] is True + assert "output_path" in result + assert os.path.isfile(result["output_path"])