Merge feature/netlist-import: add import_netlist tool

Adds KiCad XML (.net) and CSV/TSV netlist import with auto-format
detection. Output nets dict is directly compatible with
verify_connectivity's expected parameter.
This commit is contained in:
Ryan Malloy 2026-03-05 13:43:34 -07:00
commit 1f06ef8b7f
3 changed files with 582 additions and 0 deletions

View File

@ -52,6 +52,7 @@ from mckicad.tools import ( # noqa: E402, F401
bom,
drc,
export,
netlist,
pcb,
power_symbols,
project,

View File

@ -0,0 +1,295 @@
"""Netlist import tools for the mckicad MCP server.
Parses external netlist files into the component-pin-net graph that
``verify_connectivity`` and ``apply_batch`` consume. Supports KiCad
XML (.net) and CSV/TSV formats.
"""
import csv
import io
import json
import logging
import os
from typing import Any
import xml.etree.ElementTree as ET
from mckicad.server import mcp
from mckicad.utils.file_utils import write_detail_file
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Format-specific parsers
# ---------------------------------------------------------------------------
def _parse_kicad_xml(content: str) -> dict[str, Any]:
"""Parse a KiCad XML netlist (.net file) into nets/components dicts.
KiCad XML structure::
<export version="D">
<components>
<comp ref="R1"><value>1K</value>...</comp>
</components>
<nets>
<net code="1" name="GND">
<node ref="R2" pin="1"/>
</net>
</nets>
</export>
"""
root = ET.fromstring(content)
# Parse components
components: dict[str, dict[str, str]] = {}
comp_section = root.find("components")
if comp_section is not None:
for comp in comp_section.findall("comp"):
ref = comp.get("ref", "")
if not ref:
continue
value_el = comp.find("value")
footprint_el = comp.find("footprint")
libsource = comp.find("libsource")
components[ref] = {
"value": value_el.text if value_el is not None and value_el.text else "",
"footprint": footprint_el.text if footprint_el is not None and footprint_el.text else "",
"lib": libsource.get("lib", "") if libsource is not None else "",
"part": libsource.get("part", "") if libsource is not None else "",
}
# Parse nets
nets: dict[str, list[list[str]]] = {}
connection_count = 0
nets_section = root.find("nets")
if nets_section is not None:
for net_el in nets_section.findall("net"):
net_name = net_el.get("name", "")
if not net_name:
continue
pins: list[list[str]] = []
for node in net_el.findall("node"):
ref = node.get("ref", "")
pin = node.get("pin", "")
if ref and pin:
pins.append([ref, pin])
connection_count += 1
if pins:
nets[net_name] = pins
return {
"nets": nets,
"components": components,
"statistics": {
"net_count": len(nets),
"component_count": len(components),
"connection_count": connection_count,
},
}
def _parse_csv(content: str) -> dict[str, Any]:
"""Parse a CSV/TSV netlist into nets/components dicts.
Expects columns that include at least ``Reference``, ``Pin``, and ``Net``.
Column names are matched case-insensitively. Both comma and tab
delimiters are auto-detected.
"""
# Auto-detect delimiter
delimiter = "\t" if "\t" in content.split("\n", 1)[0] else ","
reader = csv.DictReader(io.StringIO(content), delimiter=delimiter)
if reader.fieldnames is None:
raise ValueError("CSV file has no header row")
# Normalise header names to lowercase for flexible matching
field_map: dict[str, str] = {}
for f in reader.fieldnames:
fl = f.strip().lower()
field_map[fl] = f
# Find the columns we need
ref_col = field_map.get("reference") or field_map.get("ref") or field_map.get("designator")
pin_col = field_map.get("pin") or field_map.get("pin_number") or field_map.get("pin number")
net_col = field_map.get("net") or field_map.get("net_name") or field_map.get("net name")
if not ref_col:
raise ValueError("CSV missing 'Reference' (or 'Ref', 'Designator') column")
if not pin_col:
raise ValueError("CSV missing 'Pin' (or 'Pin_Number', 'Pin Number') column")
if not net_col:
raise ValueError("CSV missing 'Net' (or 'Net_Name', 'Net Name') column")
nets: dict[str, list[list[str]]] = {}
components: set[str] = set()
connection_count = 0
for row in reader:
ref = (row.get(ref_col) or "").strip()
pin = (row.get(pin_col) or "").strip()
net_name = (row.get(net_col) or "").strip()
if not ref or not pin or not net_name:
continue
nets.setdefault(net_name, []).append([ref, pin])
components.add(ref)
connection_count += 1
# Return components as a simple dict (no value/footprint info from CSV)
comp_dict: dict[str, dict[str, str]] = {ref: {} for ref in sorted(components)}
return {
"nets": nets,
"components": comp_dict,
"statistics": {
"net_count": len(nets),
"component_count": len(components),
"connection_count": connection_count,
},
}
def _detect_format(content: str, file_path: str) -> str:
"""Auto-detect netlist format from file extension and content."""
ext = os.path.splitext(file_path)[1].lower()
# Extension-based detection
if ext == ".net":
return "kicad_xml"
if ext in (".csv", ".tsv"):
return "csv"
if ext == ".xml":
return "kicad_xml"
# Content-based detection
stripped = content.lstrip()
if stripped.startswith("<?xml") or stripped.startswith("<export"):
return "kicad_xml"
# Check for CSV header pattern
first_line = stripped.split("\n", 1)[0].lower()
if any(kw in first_line for kw in ("reference", "designator", "ref,")):
return "csv"
return "unknown"
# ---------------------------------------------------------------------------
# MCP tool
# ---------------------------------------------------------------------------
@mcp.tool()
def import_netlist(
source_path: str,
format: str = "auto",
output_path: str | None = None,
) -> dict[str, Any]:
"""Import a netlist file and extract the component-pin-net graph.
Parses an external netlist into a structured JSON representation
suitable for ``verify_connectivity`` (as the ``expected`` parameter)
or for generating ``apply_batch`` wiring instructions.
Supported formats:
- ``kicad_xml`` KiCad intermediate netlist (.net XML). Most reliable
source, exported via ``kicad-cli sch export netlist``.
- ``csv`` CSV or TSV with Reference, Pin, and Net columns. Common
export from EDA tools (Altium, Eagle, etc.).
- ``auto`` detect format from file extension and content (default).
Args:
source_path: Path to the netlist file to import.
format: Netlist format (``auto``, ``kicad_xml``, ``csv``).
output_path: Optional path to write the parsed result as JSON.
Defaults to ``.mckicad/imported_netlist.json`` next to
the source file.
Returns:
Dictionary with ``nets`` (net_name [[ref, pin], ...]),
``components`` (ref metadata dict), and ``statistics``.
The ``nets`` dict is directly compatible with
``verify_connectivity``'s ``expected`` parameter.
"""
source_path = os.path.abspath(os.path.expanduser(source_path))
if not os.path.isfile(source_path):
return {
"success": False,
"error": f"Source file not found: {source_path}",
}
allowed_formats = ("auto", "kicad_xml", "csv")
fmt = format.lower().strip()
if fmt not in allowed_formats:
return {
"success": False,
"error": f"Unsupported format: {format}. Allowed: {', '.join(allowed_formats)}",
}
try:
with open(source_path) as f:
content = f.read()
except Exception as e:
return {"success": False, "error": f"Failed to read file: {e}"}
if not content.strip():
return {"success": False, "error": "Source file is empty"}
# Detect format if auto
if fmt == "auto":
fmt = _detect_format(content, source_path)
if fmt == "unknown":
return {
"success": False,
"error": (
"Could not auto-detect netlist format. "
"Specify format='kicad_xml' or format='csv' explicitly."
),
}
# Parse
try:
if fmt == "kicad_xml":
parsed = _parse_kicad_xml(content)
elif fmt == "csv":
parsed = _parse_csv(content)
else:
return {"success": False, "error": f"Parser not implemented for format: {fmt}"}
except ET.ParseError as e:
return {"success": False, "error": f"XML parse error: {e}"}
except ValueError as e:
return {"success": False, "error": str(e)}
except Exception as e:
logger.error("Netlist import failed: %s", e, exc_info=True)
return {"success": False, "error": f"Parse error: {e}"}
stats = parsed["statistics"]
logger.info(
"Imported netlist from %s (%s): %d nets, %d components, %d connections",
os.path.basename(source_path), fmt,
stats["net_count"], stats["component_count"], stats["connection_count"],
)
result: dict[str, Any] = {
"success": True,
"format_detected": fmt,
"source_path": source_path,
**parsed,
}
# Write output JSON
if output_path:
out = os.path.abspath(os.path.expanduser(output_path))
os.makedirs(os.path.dirname(out) or ".", exist_ok=True)
with open(out, "w") as f:
json.dump(parsed, f, indent=2)
else:
out = write_detail_file(
source_path, "imported_netlist.json", parsed,
)
result["output_path"] = out
return result

286
tests/test_netlist.py Normal file
View File

@ -0,0 +1,286 @@
"""Tests for netlist import tools."""
import json
import os
import textwrap
import pytest
@pytest.mark.unit
class TestParseKicadXml:
"""Tests for KiCad XML netlist parsing."""
def test_parse_basic_netlist(self, tmp_output_dir):
from mckicad.tools.netlist import import_netlist
xml_content = textwrap.dedent("""\
<?xml version="1.0" encoding="UTF-8"?>
<export version="D">
<components>
<comp ref="R1">
<value>10k</value>
<footprint>Resistor_SMD:R_0402</footprint>
<libsource lib="Device" part="R"/>
</comp>
<comp ref="R2">
<value>4.7k</value>
<footprint>Resistor_SMD:R_0402</footprint>
<libsource lib="Device" part="R"/>
</comp>
<comp ref="C1">
<value>100nF</value>
<footprint>Capacitor_SMD:C_0402</footprint>
<libsource lib="Device" part="C"/>
</comp>
</components>
<nets>
<net code="1" name="GND">
<node ref="R1" pin="2"/>
<node ref="C1" pin="2"/>
</net>
<net code="2" name="VCC">
<node ref="R1" pin="1"/>
<node ref="R2" pin="1"/>
</net>
<net code="3" name="SIG">
<node ref="R2" pin="2"/>
<node ref="C1" pin="1"/>
</net>
</nets>
</export>
""")
path = os.path.join(tmp_output_dir, "test.net")
with open(path, "w") as f:
f.write(xml_content)
result = import_netlist(source_path=path)
assert result["success"] is True
assert result["format_detected"] == "kicad_xml"
assert result["statistics"]["net_count"] == 3
assert result["statistics"]["component_count"] == 3
assert result["statistics"]["connection_count"] == 6
# Check nets structure
nets = result["nets"]
assert "GND" in nets
assert ["R1", "2"] in nets["GND"]
assert ["C1", "2"] in nets["GND"]
assert "VCC" in nets
assert len(nets["VCC"]) == 2
# Check components
assert "R1" in result["components"]
assert result["components"]["R1"]["value"] == "10k"
assert result["components"]["R1"]["lib"] == "Device"
def test_auto_detects_xml_format(self, tmp_output_dir):
from mckicad.tools.netlist import import_netlist
xml = '<export version="D"><components/><nets/></export>'
path = os.path.join(tmp_output_dir, "test.xml")
with open(path, "w") as f:
f.write(xml)
result = import_netlist(source_path=path, format="auto")
assert result["success"] is True
assert result["format_detected"] == "kicad_xml"
def test_malformed_xml_returns_error(self, tmp_output_dir):
from mckicad.tools.netlist import import_netlist
path = os.path.join(tmp_output_dir, "bad.net")
with open(path, "w") as f:
f.write("<export><broken>")
result = import_netlist(source_path=path)
assert result["success"] is False
assert "XML parse error" in result["error"]
def test_verify_connectivity_compatible(self, tmp_output_dir):
"""The nets dict is directly usable as verify_connectivity expected param."""
from mckicad.tools.netlist import import_netlist
xml = textwrap.dedent("""\
<export version="D">
<components>
<comp ref="U1"><value>IC</value></comp>
</components>
<nets>
<net code="1" name="NET1">
<node ref="U1" pin="1"/>
<node ref="R1" pin="2"/>
</net>
</nets>
</export>
""")
path = os.path.join(tmp_output_dir, "compat.net")
with open(path, "w") as f:
f.write(xml)
result = import_netlist(source_path=path)
assert result["success"] is True
# verify_connectivity expects: {"NET1": [["U1", "1"], ["R1", "2"]]}
nets = result["nets"]
assert "NET1" in nets
assert isinstance(nets["NET1"], list)
for pin_pair in nets["NET1"]:
assert isinstance(pin_pair, list)
assert len(pin_pair) == 2
@pytest.mark.unit
class TestParseCsv:
"""Tests for CSV/TSV netlist parsing."""
def test_parse_csv_netlist(self, tmp_output_dir):
from mckicad.tools.netlist import import_netlist
csv_content = textwrap.dedent("""\
Reference,Pin,Net
R1,1,VCC
R1,2,GND
C1,1,VCC
C1,2,GND
""")
path = os.path.join(tmp_output_dir, "test.csv")
with open(path, "w") as f:
f.write(csv_content)
result = import_netlist(source_path=path)
assert result["success"] is True
assert result["format_detected"] == "csv"
assert result["statistics"]["net_count"] == 2
assert result["statistics"]["component_count"] == 2
assert result["statistics"]["connection_count"] == 4
assert ["R1", "1"] in result["nets"]["VCC"]
assert ["C1", "2"] in result["nets"]["GND"]
def test_parse_tsv_netlist(self, tmp_output_dir):
from mckicad.tools.netlist import import_netlist
tsv_content = "Reference\tPin\tNet\nR1\t1\tVCC\nR1\t2\tGND\n"
path = os.path.join(tmp_output_dir, "test.tsv")
with open(path, "w") as f:
f.write(tsv_content)
result = import_netlist(source_path=path, format="csv")
assert result["success"] is True
assert result["statistics"]["connection_count"] == 2
def test_csv_missing_columns(self, tmp_output_dir):
from mckicad.tools.netlist import import_netlist
csv_content = "Name,Value\nR1,10k\n"
path = os.path.join(tmp_output_dir, "bad.csv")
with open(path, "w") as f:
f.write(csv_content)
result = import_netlist(source_path=path, format="csv")
assert result["success"] is False
assert "Reference" in result["error"]
def test_csv_alternative_column_names(self, tmp_output_dir):
from mckicad.tools.netlist import import_netlist
csv_content = "Designator,Pin Number,Net Name\nU1,3,SDA\nU1,4,SCL\n"
path = os.path.join(tmp_output_dir, "alt.csv")
with open(path, "w") as f:
f.write(csv_content)
result = import_netlist(source_path=path, format="csv")
assert result["success"] is True
assert "SDA" in result["nets"]
assert "SCL" in result["nets"]
@pytest.mark.unit
class TestImportNetlistErrors:
"""Tests for error handling in import_netlist."""
def test_nonexistent_file(self):
from mckicad.tools.netlist import import_netlist
result = import_netlist(source_path="/tmp/nonexistent_12345.net")
assert result["success"] is False
assert "not found" in result["error"]
def test_empty_file(self, tmp_output_dir):
from mckicad.tools.netlist import import_netlist
path = os.path.join(tmp_output_dir, "empty.net")
with open(path, "w") as f:
f.write("")
result = import_netlist(source_path=path)
assert result["success"] is False
assert "empty" in result["error"]
def test_unsupported_format(self, tmp_output_dir):
from mckicad.tools.netlist import import_netlist
path = os.path.join(tmp_output_dir, "test.net")
with open(path, "w") as f:
f.write("some content")
result = import_netlist(source_path=path, format="spice")
assert result["success"] is False
assert "Unsupported" in result["error"]
def test_unknown_format_auto(self, tmp_output_dir):
from mckicad.tools.netlist import import_netlist
path = os.path.join(tmp_output_dir, "mystery.dat")
with open(path, "w") as f:
f.write("random binary garbage 12345")
result = import_netlist(source_path=path)
assert result["success"] is False
assert "auto-detect" in result["error"]
@pytest.mark.unit
class TestOutputFile:
"""Tests for output file writing."""
def test_writes_output_json(self, tmp_output_dir):
from mckicad.tools.netlist import import_netlist
xml = textwrap.dedent("""\
<export version="D">
<components><comp ref="R1"><value>1k</value></comp></components>
<nets>
<net code="1" name="GND"><node ref="R1" pin="2"/></net>
</nets>
</export>
""")
src = os.path.join(tmp_output_dir, "test.net")
with open(src, "w") as f:
f.write(xml)
out = os.path.join(tmp_output_dir, "output.json")
result = import_netlist(source_path=src, output_path=out)
assert result["success"] is True
assert result["output_path"] == out
assert os.path.isfile(out)
with open(out) as f:
data = json.load(f)
assert "nets" in data
assert "GND" in data["nets"]
def test_default_output_path(self, tmp_output_dir):
from mckicad.tools.netlist import import_netlist
xml = '<export version="D"><components/><nets/></export>'
src = os.path.join(tmp_output_dir, "test.net")
with open(src, "w") as f:
f.write(xml)
result = import_netlist(source_path=src)
assert result["success"] is True
assert "output_path" in result
assert os.path.isfile(result["output_path"])