diff --git a/src/mcp_ltspice/diff.py b/src/mcp_ltspice/diff.py new file mode 100644 index 0000000..08fcd2d --- /dev/null +++ b/src/mcp_ltspice/diff.py @@ -0,0 +1,430 @@ +"""Compare two LTspice schematics and produce a structured diff.""" + +from dataclasses import dataclass, field +from pathlib import Path + +from .schematic import parse_schematic, Schematic, Component + + +@dataclass +class ComponentChange: + """A change to a component.""" + name: str + change_type: str # "added", "removed", "modified" + symbol: str = "" + old_value: str | None = None + new_value: str | None = None + old_attributes: dict[str, str] = field(default_factory=dict) + new_attributes: dict[str, str] = field(default_factory=dict) + moved: bool = False # Position changed + + +@dataclass +class DirectiveChange: + """A change to a SPICE directive.""" + change_type: str # "added", "removed", "modified" + old_text: str | None = None + new_text: str | None = None + + +@dataclass +class SchematicDiff: + """Complete diff between two schematics.""" + component_changes: list[ComponentChange] = field(default_factory=list) + directive_changes: list[DirectiveChange] = field(default_factory=list) + nets_added: list[str] = field(default_factory=list) + nets_removed: list[str] = field(default_factory=list) + wires_added: int = 0 + wires_removed: int = 0 + + @property + def has_changes(self) -> bool: + return bool( + self.component_changes + or self.directive_changes + or self.nets_added + or self.nets_removed + or self.wires_added + or self.wires_removed + ) + + def summary(self) -> str: + """Human-readable summary of changes.""" + lines: list[str] = [] + + if not self.has_changes: + return "No changes detected." + + # Group component changes by type + added = [c for c in self.component_changes if c.change_type == "added"] + removed = [c for c in self.component_changes if c.change_type == "removed"] + modified = [c for c in self.component_changes if c.change_type == "modified"] + + if modified: + lines.append( + f"{len(modified)} component{'s' if len(modified) != 1 else ''} modified:" + ) + for c in modified: + parts: list[str] = [] + if c.old_value != c.new_value: + parts.append(f"{c.old_value} -> {c.new_value}") + if c.moved: + parts.append("moved") + # Check for attribute changes beyond value + attr_diff = _attr_diff_summary(c.old_attributes, c.new_attributes) + if attr_diff: + parts.append(attr_diff) + detail = ", ".join(parts) if parts else "attributes changed" + lines.append(f" {c.name}: {detail}") + + if added: + for c in added: + val = f" = {c.new_value}" if c.new_value else "" + lines.append( + f"1 component added: {c.name} ({c.symbol}){val}" + if len(added) == 1 + else f" {c.name} ({c.symbol}){val}" + ) + if len(added) > 1: + lines.insert( + len(lines) - len(added), + f"{len(added)} components added:", + ) + + if removed: + for c in removed: + val = f" = {c.old_value}" if c.old_value else "" + lines.append( + f"1 component removed: {c.name} ({c.symbol}){val}" + if len(removed) == 1 + else f" {c.name} ({c.symbol}){val}" + ) + if len(removed) > 1: + lines.insert( + len(lines) - len(removed), + f"{len(removed)} components removed:", + ) + + # Directive changes + dir_added = [d for d in self.directive_changes if d.change_type == "added"] + dir_removed = [d for d in self.directive_changes if d.change_type == "removed"] + dir_modified = [d for d in self.directive_changes if d.change_type == "modified"] + + for d in dir_modified: + lines.append(f"1 directive changed: {d.old_text} -> {d.new_text}") + for d in dir_added: + lines.append(f"1 directive added: {d.new_text}") + for d in dir_removed: + lines.append(f"1 directive removed: {d.old_text}") + + # Net changes + if self.nets_added: + lines.append( + f"{len(self.nets_added)} net{'s' if len(self.nets_added) != 1 else ''} " + f"added: {', '.join(self.nets_added)}" + ) + if self.nets_removed: + lines.append( + f"{len(self.nets_removed)} net{'s' if len(self.nets_removed) != 1 else ''} " + f"removed: {', '.join(self.nets_removed)}" + ) + + # Wire changes + wire_parts: list[str] = [] + if self.wires_added: + wire_parts.append( + f"{self.wires_added} wire{'s' if self.wires_added != 1 else ''} added" + ) + if self.wires_removed: + wire_parts.append( + f"{self.wires_removed} wire{'s' if self.wires_removed != 1 else ''} removed" + ) + if wire_parts: + lines.append(", ".join(wire_parts)) + + return "\n".join(lines) + + def to_dict(self) -> dict: + """Convert to JSON-serializable dict.""" + return { + "has_changes": self.has_changes, + "component_changes": [ + { + "name": c.name, + "change_type": c.change_type, + "symbol": c.symbol, + "old_value": c.old_value, + "new_value": c.new_value, + "old_attributes": c.old_attributes, + "new_attributes": c.new_attributes, + "moved": c.moved, + } + for c in self.component_changes + ], + "directive_changes": [ + { + "change_type": d.change_type, + "old_text": d.old_text, + "new_text": d.new_text, + } + for d in self.directive_changes + ], + "nets_added": self.nets_added, + "nets_removed": self.nets_removed, + "wires_added": self.wires_added, + "wires_removed": self.wires_removed, + "summary": self.summary(), + } + + +def _attr_diff_summary(old: dict[str, str], new: dict[str, str]) -> str: + """Summarize attribute differences, excluding Value (handled separately).""" + changes: list[str] = [] + all_keys = set(old) | set(new) + # Skip Value since it's reported on its own + all_keys.discard("Value") + all_keys.discard("Value2") + for key in sorted(all_keys): + old_val = old.get(key) + new_val = new.get(key) + if old_val != new_val: + if old_val is None: + changes.append(f"+{key}={new_val}") + elif new_val is None: + changes.append(f"-{key}={old_val}") + else: + changes.append(f"{key}: {old_val} -> {new_val}") + return "; ".join(changes) + + +def _normalize_directive(text: str) -> str: + """Normalize whitespace in a SPICE directive for comparison.""" + return " ".join(text.split()) + + +def _wire_set(schematic: Schematic) -> set[tuple[int, int, int, int]]: + """Convert wires to a set of coordinate tuples for comparison. + + Each wire is stored in a canonical form (smaller point first) so that + reversed wires compare equal. + """ + result: set[tuple[int, int, int, int]] = set() + for w in schematic.wires: + # Canonical ordering: sort by (x, y) so direction doesn't matter + if (w.x1, w.y1) <= (w.x2, w.y2): + result.add((w.x1, w.y1, w.x2, w.y2)) + else: + result.add((w.x2, w.y2, w.x1, w.y1)) + return result + + +def _component_map(schematic: Schematic) -> dict[str, Component]: + """Build a map of component instance name -> Component.""" + return {comp.name: comp for comp in schematic.components} + + +def _diff_components( + schema_a: Schematic, schema_b: Schematic +) -> list[ComponentChange]: + """Compare components between two schematics.""" + map_a = _component_map(schema_a) + map_b = _component_map(schema_b) + + names_a = set(map_a) + names_b = set(map_b) + + changes: list[ComponentChange] = [] + + # Removed components (in A but not B) + for name in sorted(names_a - names_b): + comp = map_a[name] + changes.append( + ComponentChange( + name=name, + change_type="removed", + symbol=comp.symbol, + old_value=comp.value, + old_attributes=dict(comp.attributes), + ) + ) + + # Added components (in B but not A) + for name in sorted(names_b - names_a): + comp = map_b[name] + changes.append( + ComponentChange( + name=name, + change_type="added", + symbol=comp.symbol, + new_value=comp.value, + new_attributes=dict(comp.attributes), + ) + ) + + # Potentially modified components (in both) + for name in sorted(names_a & names_b): + comp_a = map_a[name] + comp_b = map_b[name] + + moved = (comp_a.x, comp_a.y) != (comp_b.x, comp_b.y) + value_changed = comp_a.value != comp_b.value + attrs_changed = comp_a.attributes != comp_b.attributes + rotation_changed = ( + comp_a.rotation != comp_b.rotation or comp_a.mirror != comp_b.mirror + ) + + if moved or value_changed or attrs_changed or rotation_changed: + changes.append( + ComponentChange( + name=name, + change_type="modified", + symbol=comp_a.symbol, + old_value=comp_a.value, + new_value=comp_b.value, + old_attributes=dict(comp_a.attributes), + new_attributes=dict(comp_b.attributes), + moved=moved, + ) + ) + + return changes + + +def _diff_directives( + schema_a: Schematic, schema_b: Schematic +) -> list[DirectiveChange]: + """Compare SPICE directives between two schematics.""" + directives_a = [t.content for t in schema_a.texts if t.type == "spice"] + directives_b = [t.content for t in schema_b.texts if t.type == "spice"] + + # Normalize for comparison but keep originals for display + norm_a = {_normalize_directive(d): d for d in directives_a} + norm_b = {_normalize_directive(d): d for d in directives_b} + + keys_a = set(norm_a) + keys_b = set(norm_b) + + changes: list[DirectiveChange] = [] + + # Removed directives + for key in sorted(keys_a - keys_b): + changes.append( + DirectiveChange(change_type="removed", old_text=norm_a[key]) + ) + + # Added directives + for key in sorted(keys_b - keys_a): + changes.append( + DirectiveChange(change_type="added", new_text=norm_b[key]) + ) + + # For modified detection: directives that share a command keyword but differ. + # We match by the first token (e.g., ".tran", ".ac") to detect modifications + # vs pure add/remove pairs. + removed = [c for c in changes if c.change_type == "removed"] + added = [c for c in changes if c.change_type == "added"] + + matched_removed: set[int] = set() + matched_added: set[int] = set() + + for ri, rc in enumerate(removed): + old_cmd = (rc.old_text or "").split()[0].lower() if rc.old_text else "" + for ai, ac in enumerate(added): + if ai in matched_added: + continue + new_cmd = (ac.new_text or "").split()[0].lower() if ac.new_text else "" + if old_cmd and old_cmd == new_cmd: + matched_removed.add(ri) + matched_added.add(ai) + break + + # Rebuild the list: unmatched stay as-is, matched pairs become "modified" + final_changes: list[DirectiveChange] = [] + for ri, rc in enumerate(removed): + if ri not in matched_removed: + final_changes.append(rc) + + for ai, ac in enumerate(added): + if ai not in matched_added: + final_changes.append(ac) + + for ri in sorted(matched_removed): + rc = removed[ri] + # Find its matched added entry + old_cmd = (rc.old_text or "").split()[0].lower() + for ai, ac in enumerate(added): + new_cmd = (ac.new_text or "").split()[0].lower() if ac.new_text else "" + if ai in matched_added and old_cmd == new_cmd: + final_changes.append( + DirectiveChange( + change_type="modified", + old_text=rc.old_text, + new_text=ac.new_text, + ) + ) + break + + return final_changes + + +def _diff_nets( + schema_a: Schematic, schema_b: Schematic +) -> tuple[list[str], list[str]]: + """Compare net flags between two schematics. + + Returns: + (nets_added, nets_removed) + """ + names_a = {f.name for f in schema_a.flags} + names_b = {f.name for f in schema_b.flags} + + added = sorted(names_b - names_a) + removed = sorted(names_a - names_b) + return added, removed + + +def _diff_wires( + schema_a: Schematic, schema_b: Schematic +) -> tuple[int, int]: + """Compare wires between two schematics using set operations. + + Returns: + (wires_added, wires_removed) + """ + set_a = _wire_set(schema_a) + set_b = _wire_set(schema_b) + + added = len(set_b - set_a) + removed = len(set_a - set_b) + return added, removed + + +def diff_schematics( + path_a: Path | str, + path_b: Path | str, +) -> SchematicDiff: + """Compare two schematics and return differences. + + Args: + path_a: Path to the "before" schematic + path_b: Path to the "after" schematic + + Returns: + SchematicDiff with all changes + """ + schema_a = parse_schematic(path_a) + schema_b = parse_schematic(path_b) + + component_changes = _diff_components(schema_a, schema_b) + directive_changes = _diff_directives(schema_a, schema_b) + nets_added, nets_removed = _diff_nets(schema_a, schema_b) + wires_added, wires_removed = _diff_wires(schema_a, schema_b) + + return SchematicDiff( + component_changes=component_changes, + directive_changes=directive_changes, + nets_added=nets_added, + nets_removed=nets_removed, + wires_added=wires_added, + wires_removed=wires_removed, + ) diff --git a/src/mcp_ltspice/drc.py b/src/mcp_ltspice/drc.py new file mode 100644 index 0000000..e0959f0 --- /dev/null +++ b/src/mcp_ltspice/drc.py @@ -0,0 +1,439 @@ +"""Design Rule Checks for LTspice schematics.""" + +from collections import defaultdict +from dataclasses import dataclass, field +from enum import Enum +from pathlib import Path + +from .schematic import Schematic, parse_schematic + + +class Severity(Enum): + ERROR = "error" # Will likely cause simulation failure + WARNING = "warning" # May cause unexpected results + INFO = "info" # Suggestion for improvement + + +@dataclass +class DRCViolation: + """A single design rule violation.""" + + rule: str # Short rule identifier + severity: Severity + message: str # Human-readable description + component: str | None = None # Related component name + location: tuple[int, int] | None = None # (x, y) if applicable + + +@dataclass +class DRCResult: + """Results of a design rule check.""" + + violations: list[DRCViolation] = field(default_factory=list) + checks_run: int = 0 + + @property + def errors(self) -> list[DRCViolation]: + return [v for v in self.violations if v.severity == Severity.ERROR] + + @property + def warnings(self) -> list[DRCViolation]: + return [v for v in self.violations if v.severity == Severity.WARNING] + + @property + def passed(self) -> bool: + return len(self.errors) == 0 + + def summary(self) -> str: + """Human-readable summary.""" + total = len(self.violations) + err_count = len(self.errors) + warn_count = len(self.warnings) + info_count = total - err_count - warn_count + + if total == 0: + return f"DRC passed: {self.checks_run} checks run, no violations found." + + parts = [] + if err_count: + parts.append(f"{err_count} error{'s' if err_count != 1 else ''}") + if warn_count: + parts.append(f"{warn_count} warning{'s' if warn_count != 1 else ''}") + if info_count: + parts.append(f"{info_count} info") + + status = "FAILED" if err_count else "passed with warnings" + return ( + f"DRC {status}: {self.checks_run} checks run, " + f"{', '.join(parts)}." + ) + + def to_dict(self) -> dict: + """Convert to JSON-serializable dict.""" + return { + "passed": self.passed, + "checks_run": self.checks_run, + "summary": self.summary(), + "error_count": len(self.errors), + "warning_count": len(self.warnings), + "violations": [ + { + "rule": v.rule, + "severity": v.severity.value, + "message": v.message, + "component": v.component, + "location": list(v.location) if v.location else None, + } + for v in self.violations + ], + } + + +def run_drc(schematic_path: Path | str) -> DRCResult: + """Run all design rule checks on a schematic. + + Args: + schematic_path: Path to .asc file + + Returns: + DRCResult with all violations found + """ + sch = parse_schematic(schematic_path) + result = DRCResult() + + _check_ground(sch, result) + _check_floating_nodes(sch, result) + _check_simulation_directive(sch, result) + _check_voltage_source_loops(sch, result) + _check_component_values(sch, result) + _check_duplicate_names(sch, result) + _check_unconnected_components(sch, result) + + return result + + +def _check_ground(sch: Schematic, result: DRCResult) -> None: + """Check that at least one ground (node '0') exists.""" + result.checks_run += 1 + has_ground = any(f.name == "0" for f in sch.flags) + if not has_ground: + result.violations.append( + DRCViolation( + rule="NO_GROUND", + severity=Severity.ERROR, + message=( + "No ground node found. Every circuit needs at least " + "one ground (0) connection." + ), + ) + ) + + +def _check_floating_nodes(sch: Schematic, result: DRCResult) -> None: + """Check for nodes with only one wire connection (likely floating). + + Build a connectivity map from wires and flags. + A node is a unique (x,y) point where wires meet or flags are placed. + If a point only has one wire endpoint and no flag, it might be floating. + + This is approximate -- we cannot fully determine connectivity without + knowing component pin locations, but we can flag obvious cases. + """ + result.checks_run += 1 + + # Count how many wire endpoints touch each point + point_count: dict[tuple[int, int], int] = defaultdict(int) + for wire in sch.wires: + point_count[(wire.x1, wire.y1)] += 1 + point_count[(wire.x2, wire.y2)] += 1 + + # Flags also represent connections at a point + flag_points = {(f.x, f.y) for f in sch.flags} + + # Component positions also represent connection points (approximately) + component_points = {(c.x, c.y) for c in sch.components} + + for point, count in point_count.items(): + if count == 1 and point not in flag_points and point not in component_points: + result.violations.append( + DRCViolation( + rule="FLOATING_NODE", + severity=Severity.WARNING, + message=( + f"Possible floating node at ({point[0]}, {point[1]}). " + f"Wire endpoint has only one connection." + ), + location=point, + ) + ) + + +def _check_simulation_directive(sch: Schematic, result: DRCResult) -> None: + """Check that at least one simulation directive exists.""" + result.checks_run += 1 + directives = sch.get_spice_directives() + sim_types = [".tran", ".ac", ".dc", ".op", ".noise", ".tf"] + has_sim = any( + any(d.lower().startswith(s) for s in sim_types) for d in directives + ) + if not has_sim: + result.violations.append( + DRCViolation( + rule="NO_SIM_DIRECTIVE", + severity=Severity.ERROR, + message=( + "No simulation directive found. " + "Add .tran, .ac, .dc, .op, etc." + ), + ) + ) + + +def _check_voltage_source_loops(sch: Schematic, result: DRCResult) -> None: + """Check for voltage sources connected in parallel (short circuit). + + This is a simplified check -- look for voltage sources that share both + pin nodes via wire connectivity. Two voltage sources whose positions + connect to the same pair of nets would create a loop or conflict. + """ + result.checks_run += 1 + + # Build a union-find structure from wire connectivity so we can + # determine which points belong to the same electrical net. + parent: dict[tuple[int, int], tuple[int, int]] = {} + + def find(p: tuple[int, int]) -> tuple[int, int]: + if p not in parent: + parent[p] = p + while parent[p] != p: + parent[p] = parent[parent[p]] + p = parent[p] + return p + + def union(a: tuple[int, int], b: tuple[int, int]) -> None: + ra, rb = find(a), find(b) + if ra != rb: + parent[ra] = rb + + # Each wire merges its two endpoints into the same net + for wire in sch.wires: + union((wire.x1, wire.y1), (wire.x2, wire.y2)) + + # Also merge flag locations (named nets connect distant points with + # the same name) + flag_groups: dict[str, list[tuple[int, int]]] = defaultdict(list) + for flag in sch.flags: + flag_groups[flag.name].append((flag.x, flag.y)) + for pts in flag_groups.values(): + for pt in pts[1:]: + union(pts[0], pt) + + # Find voltage sources and approximate their pin positions. + # LTspice voltage sources have pins at the component origin and + # offset along the component axis. Standard pin spacing is 64 units + # vertically for a non-rotated voltage source (pin+ at top, pin- at + # bottom relative to the symbol origin). + voltage_sources = [ + c + for c in sch.components + if "voltage" in c.symbol.lower() + ] + + if len(voltage_sources) < 2: + return + + # Estimate pin positions per voltage source based on rotation. + # Default (R0): positive pin at (x, y-16), negative at (x, y+16) + # We use a coarse offset; the exact value depends on the symbol but + # 16 is a common half-pin-spacing in LTspice grid units. + PIN_OFFSET = 16 + + def _pin_positions(comp): + """Return approximate (positive_pin, negative_pin) coordinates.""" + x, y = comp.x, comp.y + rot = comp.rotation + if rot == 0: + return (x, y - PIN_OFFSET), (x, y + PIN_OFFSET) + elif rot == 90: + return (x + PIN_OFFSET, y), (x - PIN_OFFSET, y) + elif rot == 180: + return (x, y + PIN_OFFSET), (x, y - PIN_OFFSET) + elif rot == 270: + return (x - PIN_OFFSET, y), (x + PIN_OFFSET, y) + return (x, y - PIN_OFFSET), (x, y + PIN_OFFSET) + + def _nearest_net(pin: tuple[int, int]) -> tuple[int, int]: + """Find the nearest wire/flag point to a pin and return its net root. + + If the pin is directly on a known point, use it. Otherwise search + within a small radius for the closest connected point. + """ + if pin in parent: + return find(pin) + # Search nearby points (LTspice grid snap is typically 16 units) + best = None + best_dist = float("inf") + for pt in parent: + dx = pin[0] - pt[0] + dy = pin[1] - pt[1] + dist = dx * dx + dy * dy + if dist < best_dist: + best_dist = dist + best = pt + if best is not None and best_dist <= 32 * 32: + return find(best) + return pin # isolated -- return pin itself as its own net + + # For each pair of voltage sources, check if they share both nets + for i in range(len(voltage_sources)): + pin_a_pos, pin_a_neg = _pin_positions(voltage_sources[i]) + net_a_pos = _nearest_net(pin_a_pos) + net_a_neg = _nearest_net(pin_a_neg) + for j in range(i + 1, len(voltage_sources)): + pin_b_pos, pin_b_neg = _pin_positions(voltage_sources[j]) + net_b_pos = _nearest_net(pin_b_pos) + net_b_neg = _nearest_net(pin_b_neg) + + # Parallel if both nets match (in either polarity) + parallel = (net_a_pos == net_b_pos and net_a_neg == net_b_neg) or ( + net_a_pos == net_b_neg and net_a_neg == net_b_pos + ) + if parallel: + name_i = voltage_sources[i].name + name_j = voltage_sources[j].name + result.violations.append( + DRCViolation( + rule="VSOURCE_LOOP", + severity=Severity.ERROR, + message=( + f"Voltage sources '{name_i}' and '{name_j}' " + f"appear to be connected in parallel, which " + f"creates a short circuit / voltage conflict." + ), + component=name_i, + ) + ) + + +def _check_component_values(sch: Schematic, result: DRCResult) -> None: + """Check that components have values where expected. + + Resistors, capacitors, inductors should have values. + Voltage/current sources should have values. + Skip if the value looks like a parameter expression (e.g., "{R1}"). + """ + result.checks_run += 1 + + # Map symbol substrings to human-readable type names + value_required = { + "res": "Resistor", + "cap": "Capacitor", + "ind": "Inductor", + "voltage": "Voltage source", + "current": "Current source", + } + + for comp in sch.components: + symbol_lower = comp.symbol.lower() + matched_type = None + for pattern, label in value_required.items(): + if pattern in symbol_lower: + matched_type = label + break + + if matched_type is None: + continue + + val = comp.value + if not val or not val.strip(): + result.violations.append( + DRCViolation( + rule="MISSING_VALUE", + severity=Severity.WARNING, + message=( + f"{matched_type} '{comp.name}' has no value set." + ), + component=comp.name, + location=(comp.x, comp.y), + ) + ) + elif val.strip().startswith("{") and val.strip().endswith("}"): + # Parameter expression -- valid, skip + pass + + +def _check_duplicate_names(sch: Schematic, result: DRCResult) -> None: + """Check for duplicate component instance names.""" + result.checks_run += 1 + seen: dict[str, bool] = {} + for comp in sch.components: + if not comp.name: + continue + if comp.name in seen: + result.violations.append( + DRCViolation( + rule="DUPLICATE_NAME", + severity=Severity.ERROR, + message=f"Duplicate component name '{comp.name}'.", + component=comp.name, + location=(comp.x, comp.y), + ) + ) + else: + seen[comp.name] = True + + +def _check_unconnected_components(sch: Schematic, result: DRCResult) -> None: + """Check for components that don't seem to be connected to anything. + + A component at (x, y) should have wire endpoints near its pins. + This is approximate without knowing exact pin positions, so we check + whether any wire endpoint falls within a reasonable distance (16 + units -- one LTspice grid step) of the component's origin. + """ + result.checks_run += 1 + + PROXIMITY = 16 # LTspice grid spacing + + # Collect all wire endpoints into a set for fast lookup + wire_points: set[tuple[int, int]] = set() + for wire in sch.wires: + wire_points.add((wire.x1, wire.y1)) + wire_points.add((wire.x2, wire.y2)) + + # Also include flag positions (flags connect to nets) + flag_points: set[tuple[int, int]] = set() + for flag in sch.flags: + flag_points.add((flag.x, flag.y)) + + all_connection_points = wire_points | flag_points + + for comp in sch.components: + if not comp.name: + continue + + # Check if any connection point is within PROXIMITY of the + # component origin. We scan a small bounding box rather than + # iterating all points. + connected = False + for pt in all_connection_points: + dx = abs(pt[0] - comp.x) + dy = abs(pt[1] - comp.y) + if dx <= PROXIMITY and dy <= PROXIMITY: + connected = True + break + + if not connected: + result.violations.append( + DRCViolation( + rule="UNCONNECTED_COMPONENT", + severity=Severity.WARNING, + message=( + f"Component '{comp.name}' ({comp.symbol}) at " + f"({comp.x}, {comp.y}) has no nearby wire " + f"connections." + ), + component=comp.name, + location=(comp.x, comp.y), + ) + ) diff --git a/src/mcp_ltspice/log_parser.py b/src/mcp_ltspice/log_parser.py new file mode 100644 index 0000000..8a34cef --- /dev/null +++ b/src/mcp_ltspice/log_parser.py @@ -0,0 +1,171 @@ +"""Parse LTspice simulation log files.""" + +from dataclasses import dataclass, field +from pathlib import Path +import re + + +@dataclass +class Measurement: + """A .meas result from the log.""" + + name: str + value: float | None # None if FAILED + failed: bool = False + + +@dataclass +class SimulationLog: + """Parsed contents of an LTspice .log file.""" + + measurements: list[Measurement] = field(default_factory=list) + errors: list[str] = field(default_factory=list) + warnings: list[str] = field(default_factory=list) + elapsed_time: float | None = None + n_equations: int | None = None + n_steps: int | None = None + raw_text: str = "" + + def get_measurement(self, name: str) -> Measurement | None: + """Get a measurement by name (case-insensitive).""" + name_lower = name.lower() + for m in self.measurements: + if m.name.lower() == name_lower: + return m + return None + + def get_all_measurements(self) -> dict[str, float | None]: + """Return dict of measurement name -> value.""" + return {m.name: m.value for m in self.measurements} + + +# Patterns for measurement results. +# LTspice emits results in several formats depending on version and OS: +# name=1.23456e-006 (no spaces around '=') +# name = 1.23456e-006 (spaces around '=') +# name: 1.23456e-006 (colon separator) +# name: FAILED (measurement could not be computed) +# name=FAILED +_MEAS_VALUE_RE = re.compile( + r"^(?P\S+?)\s*[=:]\s*(?P[+-]?\d+(?:\.\d+)?(?:e[+-]?\d+)?)\s*$", + re.IGNORECASE, +) +_MEAS_FAILED_RE = re.compile( + r"^(?P\S+?)\s*[=:]?\s*FAILED\s*$", + re.IGNORECASE, +) + +# Simulation statistics patterns. +_ELAPSED_TIME_RE = re.compile( + r"Total elapsed time:\s*(?P[+-]?\d+(?:\.\d+)?)\s*seconds?", + re.IGNORECASE, +) +_N_EQUATIONS_RE = re.compile( + r"N-of-equations:\s*(?P\d+)", + re.IGNORECASE, +) +_N_STEPS_RE = re.compile( + r"N-of-steps:\s*(?P\d+)", + re.IGNORECASE, +) + +# Lines starting with ".meas" are directive echoes, not results -- skip them. +_MEAS_DIRECTIVE_RE = re.compile(r"^\s*\.meas\s", re.IGNORECASE) + + +def _is_error_line(line: str) -> bool: + """Return True if the line reports an error.""" + return bool(re.search(r"\bError\b", line, re.IGNORECASE)) + + +def _is_warning_line(line: str) -> bool: + """Return True if the line reports a warning.""" + return bool(re.search(r"\bWarning\b", line, re.IGNORECASE)) + + +def parse_log(path: Path | str) -> SimulationLog: + """Parse an LTspice .log file. + + Reads the file at *path* and extracts measurement results, errors, + warnings, and basic simulation statistics. The parser is intentionally + lenient -- unknown lines are silently ignored so that it works across + different LTspice versions and simulation types (transient, AC, DC, etc.). + """ + path = Path(path) + + # LTspice log files may be encoded as UTF-8 or Latin-1 depending on the + # platform. Try UTF-8 first, fall back to Latin-1 which never raises. + for encoding in ("utf-8", "latin-1"): + try: + raw_text = path.read_text(encoding=encoding) + break + except UnicodeDecodeError: + continue + else: + raw_text = path.read_text(encoding="latin-1", errors="replace") + + log = SimulationLog(raw_text=raw_text) + + for line in raw_text.splitlines(): + stripped = line.strip() + if not stripped: + continue + + # Skip echoed .meas directives -- they are not results. + if _MEAS_DIRECTIVE_RE.match(stripped): + continue + + # Errors and warnings. + if _is_error_line(stripped): + log.errors.append(stripped) + if _is_warning_line(stripped): + log.warnings.append(stripped) + + # Measurement: failed. + m = _MEAS_FAILED_RE.match(stripped) + if m: + log.measurements.append( + Measurement(name=m.group("name"), value=None, failed=True) + ) + continue + + # Measurement: numeric value. + m = _MEAS_VALUE_RE.match(stripped) + if m: + try: + value = float(m.group("value")) + except ValueError: + value = None + log.measurements.append( + Measurement(name=m.group("name"), value=value, failed=(value is None)) + ) + continue + + # Elapsed time. + m = _ELAPSED_TIME_RE.search(stripped) + if m: + try: + log.elapsed_time = float(m.group("seconds")) + except ValueError: + pass + continue + + # Number of equations. + m = _N_EQUATIONS_RE.search(stripped) + if m: + try: + log.n_equations = int(m.group("n")) + except ValueError: + pass + continue + + # Number of steps / iterations. + m = _N_STEPS_RE.search(stripped) + if m: + try: + log.n_steps = int(m.group("n")) + except ValueError: + pass + continue + + return log diff --git a/src/mcp_ltspice/models.py b/src/mcp_ltspice/models.py new file mode 100644 index 0000000..122e826 --- /dev/null +++ b/src/mcp_ltspice/models.py @@ -0,0 +1,355 @@ +"""Search and parse LTspice SPICE model libraries.""" + +from dataclasses import dataclass, field +from pathlib import Path +import re + +from .config import LTSPICE_LIB + +# Known SPICE model types and their categories +_DISCRETE_TYPES = frozenset({ + "NPN", "PNP", # BJTs + "NMOS", "PMOS", "VDMOS", # MOSFETs + "D", # Diodes + "NJF", "PJF", # JFETs +}) + +# Module-level cache +_cache: tuple[list["SpiceModel"], list["SpiceSubcircuit"]] | None = None + + +@dataclass +class SpiceModel: + """A .model definition.""" + name: str # e.g., "2N2222" + type: str # e.g., "NPN", "D", "NMOS", "PMOS", "PNP" + parameters: dict[str, str] = field(default_factory=dict) + source_file: str = "" + + +@dataclass +class SpiceSubcircuit: + """A .subckt definition.""" + name: str # e.g., "LT1001" + pins: list[str] = field(default_factory=list) + pin_names: list[str] = field(default_factory=list) # From comments + description: str = "" + source_file: str = "" + n_components: int = 0 + + +def search_models( + search: str | None = None, + model_type: str | None = None, + limit: int = 50, +) -> list[SpiceModel]: + """Search for .model definitions in the library. + + Args: + search: Search term for model name (case-insensitive) + model_type: Filter by type: NPN, PNP, NMOS, PMOS, D, etc. + limit: Maximum results + + Returns: + List of matching SpiceModel objects + """ + all_models, _ = _scan_all_libraries() + + results: list[SpiceModel] = [] + search_upper = search.upper() if search else None + type_upper = model_type.upper() if model_type else None + + for model in all_models: + if type_upper and model.type.upper() != type_upper: + continue + if search_upper and search_upper not in model.name.upper(): + continue + results.append(model) + if len(results) >= limit: + break + + return results + + +def search_subcircuits( + search: str | None = None, + limit: int = 50, +) -> list[SpiceSubcircuit]: + """Search for .subckt definitions in the library. + + Args: + search: Search term for subcircuit name + limit: Maximum results + + Returns: + List of matching SpiceSubcircuit objects + """ + _, all_subcircuits = _scan_all_libraries() + + results: list[SpiceSubcircuit] = [] + search_upper = search.upper() if search else None + + for subckt in all_subcircuits: + if search_upper and search_upper not in subckt.name.upper(): + continue + results.append(subckt) + if len(results) >= limit: + break + + return results + + +def get_model_details(name: str) -> SpiceModel | SpiceSubcircuit | None: + """Get detailed information about a specific model or subcircuit. + + Searches all library files for exact match (case-insensitive). + """ + all_models, all_subcircuits = _scan_all_libraries() + name_upper = name.upper() + + for model in all_models: + if model.name.upper() == name_upper: + return model + + for subckt in all_subcircuits: + if subckt.name.upper() == name_upper: + return subckt + + return None + + +def _read_file_text(path: Path) -> str: + """Read a library file, handling both UTF-16-LE and ASCII/Latin-1 encodings. + + LTspice stores some files (especially .bjt, .mos, .jft, etc.) as + UTF-16-LE without a BOM. Others are plain ASCII or Latin-1. + Binary/encrypted .sub files will raise on decode; callers handle that. + """ + raw = path.read_bytes() + if not raw: + return "" + + # Detect UTF-16-LE: check if every other byte (odd positions) is 0x00 + # for the first several bytes. This is a strong indicator of UTF-16-LE + # ASCII text without a BOM. + if len(raw) >= 20: + sample = raw[:40] + null_positions = sum(1 for i in range(1, len(sample), 2) if sample[i] == 0) + total_pairs = len(sample) // 2 + if total_pairs > 0 and null_positions / total_pairs > 0.7: + return raw.decode("utf-16-le", errors="replace") + + # Fall back to latin-1 which never fails (maps bytes 1:1 to codepoints) + return raw.decode("latin-1") + + +# Regex patterns compiled once +_MODEL_RE = re.compile( + r"^\s*\.model\s+" # .model keyword + r"(\S+)\s+" # model name + r"(?:ako:\S+\s+)?" # optional ako:reference + r"(\w+)" # type (NPN, D, VDMOS, etc.) + r"(?:\s*\(([^)]*)\))?" # optional (params) + r"(.*)", # trailing params outside parens + re.IGNORECASE, +) + +_SUBCKT_RE = re.compile( + r"^\s*\.subckt\s+" + r"(\S+)" # subcircuit name + r"((?:\s+\S+)*)", # pins (space-separated) + re.IGNORECASE, +) + +_ENDS_RE = re.compile(r"^\s*\.ends\b", re.IGNORECASE) + +_PIN_COMMENT_RE = re.compile( + r"^\s*\*\s*[Pp]in\s+\S+\s*[:=]?\s*(.*)", +) + + +def _parse_params(param_str: str) -> dict[str, str]: + """Parse SPICE parameter string into a dict. + + Handles formats like: IS=14.34f BF=200 NF=1 VAF=74.03 + """ + params: dict[str, str] = {} + if not param_str: + return params + + for match in re.finditer(r"(\w+)\s*=\s*(\S+)", param_str): + params[match.group(1)] = match.group(2) + + return params + + +def _scan_lib_file(path: Path) -> tuple[list[SpiceModel], list[SpiceSubcircuit]]: + """Scan a single library file for model and subcircuit definitions. + + Handles multi-line continuation (lines starting with +). + Silently skips binary/encrypted files that can't be decoded. + """ + models: list[SpiceModel] = [] + subcircuits: list[SpiceSubcircuit] = [] + + try: + text = _read_file_text(path) + except Exception: + return models, subcircuits + + if not text: + return models, subcircuits + + # Join continuation lines (+ at start of line continues previous line) + # Work through lines, merging continuations + raw_lines = text.splitlines() + lines: list[str] = [] + for line in raw_lines: + stripped = line.strip() + if stripped.startswith("+") and lines: + # Continuation: append to previous line (strip the +) + lines[-1] = lines[-1] + " " + stripped[1:].strip() + else: + lines.append(line) + + source = str(path.relative_to(LTSPICE_LIB)) if _is_under(path, LTSPICE_LIB) else path.name + + # State tracking for subcircuit parsing + in_subckt = False + current_subckt: SpiceSubcircuit | None = None + component_count = 0 + pin_comments: list[str] = [] + pre_subckt_comments: list[str] = [] + + for line in lines: + stripped = line.strip() + + # Track comments that might describe pins (before or after .subckt) + if stripped.startswith("*"): + if in_subckt: + pin_match = _PIN_COMMENT_RE.match(stripped) + if pin_match and current_subckt is not None: + pin_comments.append(pin_match.group(1).strip()) + else: + pre_subckt_comments.append(stripped) + continue + + if not stripped: + if not in_subckt: + pre_subckt_comments.clear() + continue + + # Check for .model + model_match = _MODEL_RE.match(stripped) + if model_match: + name = model_match.group(1) + mtype = model_match.group(2).upper() + param_str = (model_match.group(3) or "") + " " + (model_match.group(4) or "") + params = _parse_params(param_str) + + models.append(SpiceModel( + name=name, + type=mtype, + parameters=params, + source_file=source, + )) + continue + + # Check for .subckt + subckt_match = _SUBCKT_RE.match(stripped) + if subckt_match and not in_subckt: + name = subckt_match.group(1) + pin_str = subckt_match.group(2).strip() + pins = pin_str.split() if pin_str else [] + + # Extract description from preceding comments + description = "" + for comment in pre_subckt_comments: + cleaned = comment.lstrip("* ").strip() + if cleaned and not cleaned.startswith("Copyright") and len(cleaned) > 3: + description = cleaned + break + + current_subckt = SpiceSubcircuit( + name=name, + pins=pins, + description=description, + source_file=source, + ) + in_subckt = True + component_count = 0 + pin_comments.clear() + pre_subckt_comments.clear() + continue + + # Check for .ends + if _ENDS_RE.match(stripped): + if current_subckt is not None: + current_subckt.n_components = component_count + current_subckt.pin_names = pin_comments[:] + subcircuits.append(current_subckt) + in_subckt = False + current_subckt = None + pin_comments.clear() + continue + + # Count component lines inside a subcircuit + if in_subckt and stripped and not stripped.startswith("."): + # Component lines typically start with a letter (R, C, L, M, Q, D, etc.) + if stripped[0].isalpha(): + component_count += 1 + + return models, subcircuits + + +def _is_under(path: Path, parent: Path) -> bool: + """Check if path is under parent directory.""" + try: + path.relative_to(parent) + return True + except ValueError: + return False + + +def _collect_lib_files() -> list[Path]: + """Collect all scannable library files from known directories.""" + files: list[Path] = [] + extensions = {".lib", ".sub", ".mod", ".bjt", ".dio", ".mos", ".jft"} + + # Scan lib/cmp/ for component model files + cmp_dir = LTSPICE_LIB / "cmp" + if cmp_dir.is_dir(): + for f in cmp_dir.iterdir(): + if f.is_file() and f.suffix.lower() in extensions: + files.append(f) + + # Scan lib/sub/ for subcircuit files (recursive to include Contrib) + sub_dir = LTSPICE_LIB / "sub" + if sub_dir.is_dir(): + for f in sub_dir.rglob("*"): + if f.is_file() and f.suffix.lower() in extensions: + files.append(f) + + return sorted(files) + + +def _scan_all_libraries() -> tuple[list[SpiceModel], list[SpiceSubcircuit]]: + """Scan all library files. Results are cached after first call.""" + global _cache + if _cache is not None: + return _cache + + all_models: list[SpiceModel] = [] + all_subcircuits: list[SpiceSubcircuit] = [] + + for path in _collect_lib_files(): + models, subcircuits = _scan_lib_file(path) + all_models.extend(models) + all_subcircuits.extend(subcircuits) + + # Sort for consistent ordering + all_models.sort(key=lambda m: m.name.upper()) + all_subcircuits.sort(key=lambda s: s.name.upper()) + + _cache = (all_models, all_subcircuits) + return _cache diff --git a/src/mcp_ltspice/netlist.py b/src/mcp_ltspice/netlist.py new file mode 100644 index 0000000..a1e91ab --- /dev/null +++ b/src/mcp_ltspice/netlist.py @@ -0,0 +1,383 @@ +"""Programmatic SPICE netlist generation for LTspice.""" + +from dataclasses import dataclass, field +from pathlib import Path + + +@dataclass +class NetlistComponent: + """A component in the netlist.""" + + name: str # R1, C1, V1, M1, X1, etc. + nodes: list[str] # Connected node names + value: str # Value or model name + params: str = "" # Additional parameters + + +@dataclass +class Netlist: + """A SPICE netlist that can be saved as a .cir file. + + Supports a builder pattern -- all add_* methods return self for chaining: + + netlist = (Netlist("My Circuit") + .add_resistor("R1", "in", "out", "10k") + .add_capacitor("C1", "out", "0", "100n") + .add_voltage_source("V1", "in", "0", ac="1") + .add_directive(".ac dec 100 1 1meg")) + """ + + title: str = "LTspice Simulation" + components: list[NetlistComponent] = field(default_factory=list) + directives: list[str] = field(default_factory=list) + comments: list[str] = field(default_factory=list) + includes: list[str] = field(default_factory=list) + + # -- Passive components --------------------------------------------------- + + def add_resistor( + self, name: str, node_p: str, node_n: str, value: str + ) -> "Netlist": + """Add a resistor. Example: add_resistor('R1', 'in', 'out', '10k')""" + self.components.append( + NetlistComponent(name=name, nodes=[node_p, node_n], value=value) + ) + return self + + def add_capacitor( + self, name: str, node_p: str, node_n: str, value: str + ) -> "Netlist": + """Add a capacitor.""" + self.components.append( + NetlistComponent(name=name, nodes=[node_p, node_n], value=value) + ) + return self + + def add_inductor( + self, + name: str, + node_p: str, + node_n: str, + value: str, + series_resistance: str | None = None, + ) -> "Netlist": + """Add an inductor with optional series resistance (Rser).""" + params = f"Rser={series_resistance}" if series_resistance else "" + self.components.append( + NetlistComponent( + name=name, nodes=[node_p, node_n], value=value, params=params + ) + ) + return self + + # -- Sources -------------------------------------------------------------- + + def add_voltage_source( + self, + name: str, + node_p: str, + node_n: str, + dc: str | None = None, + ac: str | None = None, + pulse: tuple | None = None, + sin: tuple | None = None, + ) -> "Netlist": + """Add a voltage source. + + Args: + name: Source name (V1, V2, etc.) + node_p: Positive node + node_n: Negative node + dc: DC value (e.g., "5") + ac: AC magnitude (e.g., "1") + pulse: (Vinitial, Von, Tdelay, Trise, Tfall, Ton, Tperiod) + sin: (Voffset, Vamp, Freq, Td, Theta, Phi) + """ + value = self._build_source_value(dc=dc, ac=ac, pulse=pulse, sin=sin) + self.components.append( + NetlistComponent(name=name, nodes=[node_p, node_n], value=value) + ) + return self + + def add_current_source( + self, + name: str, + node_p: str, + node_n: str, + dc: str | None = None, + ac: str | None = None, + ) -> "Netlist": + """Add a current source.""" + value = self._build_source_value(dc=dc, ac=ac) + self.components.append( + NetlistComponent(name=name, nodes=[node_p, node_n], value=value) + ) + return self + + # -- Semiconductors ------------------------------------------------------- + + def add_diode( + self, name: str, anode: str, cathode: str, model: str + ) -> "Netlist": + """Add a diode. Example: add_diode('D1', 'a', 'k', '1N4148')""" + self.components.append( + NetlistComponent(name=name, nodes=[anode, cathode], value=model) + ) + return self + + def add_mosfet( + self, + name: str, + drain: str, + gate: str, + source: str, + body: str, + model: str, + w: str | None = None, + l: str | None = None, + ) -> "Netlist": + """Add a MOSFET.""" + params_parts: list[str] = [] + if w: + params_parts.append(f"W={w}") + if l: + params_parts.append(f"L={l}") + params = " ".join(params_parts) + self.components.append( + NetlistComponent( + name=name, + nodes=[drain, gate, source, body], + value=model, + params=params, + ) + ) + return self + + def add_bjt( + self, name: str, collector: str, base: str, emitter: str, model: str + ) -> "Netlist": + """Add a BJT transistor.""" + self.components.append( + NetlistComponent( + name=name, nodes=[collector, base, emitter], value=model + ) + ) + return self + + # -- Subcircuits ---------------------------------------------------------- + + def add_opamp( + self, + name: str, + inp: str, + inn: str, + out: str, + vpos: str, + vneg: str, + model: str, + ) -> "Netlist": + """Add an op-amp subcircuit instance. + + Pin order follows the LTspice convention: + X + """ + self.components.append( + NetlistComponent( + name=name, nodes=[inp, inn, vpos, vneg, out], value=model + ) + ) + return self + + def add_subcircuit( + self, name: str, nodes: list[str], model: str + ) -> "Netlist": + """Add a generic subcircuit instance.""" + self.components.append( + NetlistComponent(name=name, nodes=list(nodes), value=model) + ) + return self + + # -- Generic component ---------------------------------------------------- + + def add_component( + self, name: str, nodes: list[str], value: str, params: str = "" + ) -> "Netlist": + """Add any component with explicit nodes.""" + self.components.append( + NetlistComponent( + name=name, nodes=list(nodes), value=value, params=params + ) + ) + return self + + # -- Directives ----------------------------------------------------------- + + def add_directive(self, directive: str) -> "Netlist": + """Add a SPICE directive (e.g., '.tran 10m', '.ac dec 100 1 1meg').""" + self.directives.append(directive) + return self + + def add_meas(self, analysis: str, name: str, expression: str) -> "Netlist": + """Add a .meas directive. + + Example: + add_meas('tran', 'rise_time', + 'TRIG V(out) VAL=0.1 RISE=1 TARG V(out) VAL=0.9 RISE=1') + """ + self.directives.append(f".meas {analysis} {name} {expression}") + return self + + def add_param(self, name: str, value: str) -> "Netlist": + """Add a .param directive.""" + self.directives.append(f".param {name}={value}") + return self + + def add_include(self, path: str) -> "Netlist": + """Add a .include directive for library files.""" + self.includes.append(f".include {path}") + return self + + def add_lib(self, path: str) -> "Netlist": + """Add a .lib directive.""" + self.includes.append(f".lib {path}") + return self + + def add_comment(self, text: str) -> "Netlist": + """Add a comment line.""" + self.comments.append(text) + return self + + # -- Rendering / I/O ------------------------------------------------------ + + def render(self) -> str: + """Render the netlist to a SPICE string.""" + lines = [f"* {self.title}"] + + for comment in self.comments: + lines.append(f"* {comment}") + + for inc in self.includes: + lines.append(inc) # Already formatted as .include or .lib + + lines.append("") # blank separator + + for comp in self.components: + line = f"{comp.name} {' '.join(comp.nodes)} {comp.value}" + if comp.params: + line += f" {comp.params}" + lines.append(line) + + lines.append("") + + for directive in self.directives: + lines.append(directive) + + lines.append(".backanno") + lines.append(".end") + + return "\n".join(lines) + "\n" + + def save(self, path: Path | str) -> Path: + """Save netlist to a .cir file.""" + path = Path(path) + path.write_text(self.render()) + return path + + # -- Internal helpers ----------------------------------------------------- + + @staticmethod + def _build_source_value( + dc: str | None = None, + ac: str | None = None, + pulse: tuple | None = None, + sin: tuple | None = None, + ) -> str: + """Build the value string for a voltage/current source.""" + parts: list[str] = [] + if dc is not None: + parts.append(dc) + if ac is not None: + parts.append(f"AC {ac}") + if pulse is not None: + params_str = " ".join(str(p) for p in pulse) + parts.append(f"PULSE({params_str})") + if sin is not None: + params_str = " ".join(str(p) for p in sin) + parts.append(f"SIN({params_str})") + return " ".join(parts) if parts else "0" + + +# --------------------------------------------------------------------------- +# Convenience functions for common circuit topologies +# --------------------------------------------------------------------------- + + +def voltage_divider( + v_in: str = "5", + r1: str = "10k", + r2: str = "10k", + sim_type: str = "op", +) -> Netlist: + """Create a voltage divider circuit. + + Args: + v_in: Input voltage (DC). + r1: Top resistor value. + r2: Bottom resistor value. + sim_type: Simulation directive -- "op" for operating point, + or a full directive string like ".tran 10m". + """ + netlist = ( + Netlist("Voltage Divider") + .add_voltage_source("V1", "in", "0", dc=v_in) + .add_resistor("R1", "in", "out", r1) + .add_resistor("R2", "out", "0", r2) + ) + directive = sim_type if sim_type.startswith(".") else f".{sim_type}" + netlist.add_directive(directive) + return netlist + + +def rc_lowpass( + r: str = "1k", + c: str = "100n", + f_start: str = "1", + f_stop: str = "1meg", +) -> Netlist: + """Create an RC lowpass filter with AC analysis. + + The circuit is driven by a 1V AC source and outputs at node 'out'. + """ + return ( + Netlist("RC Lowpass Filter") + .add_voltage_source("V1", "in", "0", ac="1") + .add_resistor("R1", "in", "out", r) + .add_capacitor("C1", "out", "0", c) + .add_directive(f".ac dec 100 {f_start} {f_stop}") + ) + + +def inverting_amplifier( + r_in: str = "10k", + r_f: str = "100k", + opamp_model: str = "LT1001", +) -> Netlist: + """Create an inverting op-amp amplifier. + + Topology: + V1 --[R_in]--> inv(-) --[R_f]--> out + non-inv(+) --> GND + Supply: +/-15V + """ + return ( + Netlist("Inverting Amplifier") + .add_comment(f"Gain = -{r_f}/{r_in}") + .add_lib(opamp_model) + .add_voltage_source("V1", "in", "0", ac="1") + .add_voltage_source("Vpos", "vdd", "0", dc="15") + .add_voltage_source("Vneg", "0", "vss", dc="15") + .add_resistor("Rin", "in", "inv", r_in) + .add_resistor("Rf", "inv", "out", r_f) + .add_opamp("X1", "0", "inv", "out", "vdd", "vss", opamp_model) + .add_directive(".ac dec 100 1 1meg") + ) diff --git a/src/mcp_ltspice/server.py b/src/mcp_ltspice/server.py index 2622662..86f1483 100644 --- a/src/mcp_ltspice/server.py +++ b/src/mcp_ltspice/server.py @@ -1,13 +1,19 @@ """FastMCP server for LTspice circuit simulation automation. This server provides tools for: -- Running SPICE simulations -- Extracting waveform data +- Running SPICE simulations on schematics and netlists +- Extracting and analyzing waveform data +- Creating circuits programmatically - Modifying schematic components -- Browsing component libraries and examples +- Browsing component libraries, models, and examples +- Design rule checks and circuit comparison """ +import csv +import io import json +import math +import tempfile from pathlib import Path import numpy as np @@ -19,14 +25,29 @@ from .config import ( LTSPICE_LIB, validate_installation, ) +from .diff import diff_schematics as _diff_schematics +from .drc import run_drc as _run_drc +from .log_parser import parse_log +from .models import ( + get_model_details as _get_model_details, + search_models as _search_models, + search_subcircuits as _search_subcircuits, +) +from .netlist import Netlist from .raw_parser import parse_raw_file from .runner import run_netlist, run_simulation -from .schematic import ( - modify_component_value, - parse_schematic, +from .schematic import modify_component_value, parse_schematic +from .waveform_math import ( + compute_bandwidth, + compute_fft, + compute_peak_to_peak, + compute_rise_time, + compute_rms, + compute_settling_time, + compute_thd, ) -# Initialize FastMCP server + mcp = FastMCP( name="mcp-ltspice", instructions=""" @@ -35,9 +56,15 @@ mcp = FastMCP( Use this server to: - Run SPICE simulations on .asc schematics or .cir netlists - Extract waveform data (voltages, currents) from simulation results + - Analyze signals: FFT, THD, RMS, bandwidth, settling time + - Create circuits from scratch using the netlist builder - Modify component values in schematics programmatically - Browse LTspice's component library (6500+ symbols) + - Search 2800+ SPICE models and subcircuits - Access example circuits (4000+ examples) + - Run design rule checks before simulation + - Compare schematics to see what changed + - Export waveform data to CSV LTspice runs via Wine on Linux. Simulations execute in batch mode and results are parsed from binary .raw files. @@ -46,7 +73,7 @@ mcp = FastMCP( # ============================================================================ -# TOOLS +# SIMULATION TOOLS # ============================================================================ @@ -57,20 +84,13 @@ async def simulate( ) -> dict: """Run an LTspice simulation on a schematic file. - This runs LTspice in batch mode, which executes any simulation - directives (.tran, .ac, .dc, .op, etc.) in the schematic. + Executes any simulation directives (.tran, .ac, .dc, .op, etc.) + found in the schematic. Returns available signal names and + the path to the .raw file for waveform extraction. Args: schematic_path: Absolute path to .asc schematic file timeout_seconds: Maximum time to wait for simulation (default 5 min) - - Returns: - dict with: - - success: bool - - elapsed_seconds: simulation time - - variables: list of signal names available - - points: number of data points - - error: error message if failed """ result = await run_simulation( schematic_path, @@ -93,6 +113,13 @@ async def simulate( response["plotname"] = result.raw_data.plotname response["raw_file"] = str(result.raw_file) if result.raw_file else None + if result.log_file and result.log_file.exists(): + log = parse_log(result.log_file) + if log.measurements: + response["measurements"] = log.get_all_measurements() + if log.errors: + response["log_errors"] = log.errors + return response @@ -101,17 +128,11 @@ async def simulate_netlist( netlist_path: str, timeout_seconds: float = 300, ) -> dict: - """Run an LTspice simulation on a netlist file. - - Use this for .cir or .net SPICE netlist files instead of - schematic .asc files. + """Run an LTspice simulation on a netlist file (.cir or .net). Args: netlist_path: Absolute path to .cir or .net netlist file timeout_seconds: Maximum time to wait for simulation - - Returns: - dict with simulation results (same as simulate) """ result = await run_netlist( netlist_path, @@ -133,9 +154,21 @@ async def simulate_netlist( response["points"] = result.raw_data.points response["raw_file"] = str(result.raw_file) if result.raw_file else None + if result.log_file and result.log_file.exists(): + log = parse_log(result.log_file) + if log.measurements: + response["measurements"] = log.get_all_measurements() + if log.errors: + response["log_errors"] = log.errors + return response +# ============================================================================ +# WAVEFORM & ANALYSIS TOOLS +# ============================================================================ + + @mcp.tool() def get_waveform( raw_file_path: str, @@ -144,31 +177,22 @@ def get_waveform( ) -> dict: """Extract waveform data from a .raw simulation results file. - After running a simulation, use this to get the actual data values. - For transient analysis, includes time axis. For AC, includes frequency. + For transient analysis, returns time + voltage/current values. + For AC analysis, returns frequency + magnitude(dB)/phase(degrees). Args: raw_file_path: Path to .raw file from simulation - signal_names: List of signal names to extract (partial match OK) - e.g., ["V(out)", "I(R1)"] or just ["out", "R1"] - max_points: Maximum data points to return (downsampled if needed) - - Returns: - dict with: - - time_or_frequency: the x-axis data - - signals: dict mapping signal name to data array - - units: dict mapping signal name to unit type + signal_names: Signal names to extract, e.g. ["V(out)", "I(R1)"] + max_points: Maximum data points (downsampled if needed) """ raw = parse_raw_file(raw_file_path) - # Get x-axis (time or frequency) x_axis = raw.get_time() x_name = "time" if x_axis is None: x_axis = raw.get_frequency() x_name = "frequency" - # Downsample if needed total_points = len(x_axis) if x_axis is not None else raw.points step = max(1, total_points // max_points) @@ -182,21 +206,17 @@ def get_waveform( if x_axis is not None: sampled = x_axis[::step] - # For frequency domain, take real part (imag is 0) if np.iscomplexobj(sampled): result["x_axis_data"] = sampled.real.tolist() else: result["x_axis_data"] = sampled.tolist() result["returned_points"] = len(result["x_axis_data"]) - # Extract requested signals for name in signal_names: data = raw.get_variable(name) if data is not None: sampled = data[::step] - # Handle complex data (AC analysis) if np.iscomplexobj(sampled): - import math result["signals"][name] = { "magnitude_db": [ 20 * math.log10(abs(x)) if abs(x) > 0 else -200 @@ -213,6 +233,223 @@ def get_waveform( return result +@mcp.tool() +def analyze_waveform( + raw_file_path: str, + signal_name: str, + analyses: list[str], + settling_tolerance_pct: float = 2.0, + settling_final_value: float | None = None, + rise_low_pct: float = 10.0, + rise_high_pct: float = 90.0, + fft_max_harmonics: int = 50, + thd_n_harmonics: int = 10, +) -> dict: + """Analyze a signal from simulation results. + + Run one or more analyses on a waveform. Available analyses: + - "rms": Root mean square value + - "peak_to_peak": Min, max, peak-to-peak swing, mean + - "settling_time": Time to settle within tolerance of final value + - "rise_time": 10%-90% rise time (configurable) + - "fft": Frequency spectrum via FFT + - "thd": Total Harmonic Distortion + + Args: + raw_file_path: Path to .raw file + signal_name: Signal to analyze, e.g. "V(out)" + analyses: List of analysis types to run + settling_tolerance_pct: Tolerance for settling time (default 2%) + settling_final_value: Target value (None = use last sample) + rise_low_pct: Low threshold for rise time (default 10%) + rise_high_pct: High threshold for rise time (default 90%) + fft_max_harmonics: Max harmonics to return in FFT + thd_n_harmonics: Number of harmonics for THD calculation + """ + raw = parse_raw_file(raw_file_path) + + time = raw.get_time() + signal = raw.get_variable(signal_name) + + if signal is None: + return {"error": f"Signal '{signal_name}' not found. Available: " + f"{[v.name for v in raw.variables]}"} + + # Use real parts for time-domain analysis + if np.iscomplexobj(time): + time = time.real + if np.iscomplexobj(signal): + signal = np.abs(signal) + + results = {"signal": signal_name} + + for analysis in analyses: + if analysis == "rms": + results["rms"] = compute_rms(signal) + + elif analysis == "peak_to_peak": + results["peak_to_peak"] = compute_peak_to_peak(signal) + + elif analysis == "settling_time": + if time is not None: + results["settling_time"] = compute_settling_time( + time, signal, + final_value=settling_final_value, + tolerance_percent=settling_tolerance_pct, + ) + + elif analysis == "rise_time": + if time is not None: + results["rise_time"] = compute_rise_time( + time, signal, + low_pct=rise_low_pct, + high_pct=rise_high_pct, + ) + + elif analysis == "fft": + if time is not None: + results["fft"] = compute_fft( + time, signal, + max_harmonics=fft_max_harmonics, + ) + + elif analysis == "thd": + if time is not None: + results["thd"] = compute_thd( + time, signal, + n_harmonics=thd_n_harmonics, + ) + + return results + + +@mcp.tool() +def measure_bandwidth( + raw_file_path: str, + signal_name: str, + ref_db: float | None = None, +) -> dict: + """Measure -3dB bandwidth from an AC analysis result. + + Computes the frequency range where the signal is within 3dB + of its peak (or a specified reference level). + + Args: + raw_file_path: Path to .raw file from AC simulation + signal_name: Signal to measure, e.g. "V(out)" + ref_db: Reference level in dB (None = use peak) + """ + raw = parse_raw_file(raw_file_path) + + freq = raw.get_frequency() + signal = raw.get_variable(signal_name) + + if freq is None: + return {"error": "Not an AC analysis - no frequency data found"} + if signal is None: + return {"error": f"Signal '{signal_name}' not found"} + + # Convert complex signal to magnitude in dB + mag_db = np.array([ + 20 * math.log10(abs(x)) if abs(x) > 0 else -200 + for x in signal + ]) + + return compute_bandwidth(freq.real, mag_db, ref_db=ref_db) + + +@mcp.tool() +def export_csv( + raw_file_path: str, + signal_names: list[str] | None = None, + output_path: str | None = None, + max_points: int = 10000, +) -> dict: + """Export simulation waveform data to CSV format. + + Args: + raw_file_path: Path to .raw file + signal_names: Signals to export (None = all) + output_path: Where to save CSV (None = auto-generate in /tmp) + max_points: Maximum rows to export + """ + raw = parse_raw_file(raw_file_path) + + # Determine x-axis + x_axis = raw.get_time() + x_name = "time" + if x_axis is None: + x_axis = raw.get_frequency() + x_name = "frequency" + + # Select signals + if signal_names is None: + signal_names = [v.name for v in raw.variables if v.name not in (x_name, "time", "frequency")] + + # Downsample + total = raw.points + step = max(1, total // max_points) + + # Build CSV + buf = io.StringIO() + writer = csv.writer(buf) + + # Header + if x_axis is not None and np.iscomplexobj(x_axis): + headers = [x_name] + else: + headers = [x_name] + + for name in signal_names: + data = raw.get_variable(name) + if data is not None: + if np.iscomplexobj(data): + headers.extend([f"{name}_magnitude_db", f"{name}_phase_deg"]) + else: + headers.append(name) + + writer.writerow(headers) + + # Data rows + indices = range(0, total, step) + for i in indices: + row = [] + if x_axis is not None: + row.append(x_axis[i].real if np.iscomplexobj(x_axis) else x_axis[i]) + + for name in signal_names: + data = raw.get_variable(name) + if data is not None: + if np.iscomplexobj(data): + val = data[i] + row.append(20 * math.log10(abs(val)) if abs(val) > 0 else -200) + row.append(math.degrees(math.atan2(val.imag, val.real))) + else: + row.append(data[i]) + + writer.writerow(row) + + csv_content = buf.getvalue() + + # Save to file + if output_path is None: + raw_name = Path(raw_file_path).stem + output_path = str(Path(tempfile.gettempdir()) / f"{raw_name}.csv") + + Path(output_path).write_text(csv_content) + + return { + "output_path": output_path, + "rows": len(indices), + "columns": headers, + } + + +# ============================================================================ +# SCHEMATIC TOOLS +# ============================================================================ + + @mcp.tool() def read_schematic(schematic_path: str) -> dict: """Read and parse an LTspice schematic file. @@ -221,12 +458,6 @@ def read_schematic(schematic_path: str) -> dict: Args: schematic_path: Path to .asc schematic file - - Returns: - dict with: - - components: list of {name, symbol, value, x, y} - - nets: list of net/flag names - - directives: list of SPICE directive strings """ sch = parse_schematic(schematic_path) @@ -258,26 +489,16 @@ def edit_component( ) -> dict: """Modify a component's value in a schematic. - Use this to change resistor values, capacitor values, etc. - programmatically before running a simulation. - Args: schematic_path: Path to .asc schematic file component_name: Instance name like "R1", "C2", "M1" new_value: New value string, e.g., "10k", "100n", "2N7000" - output_path: Where to save modified schematic (None = overwrite) - - Returns: - dict with success status and component details + output_path: Where to save (None = overwrite original) """ try: sch = modify_component_value( - schematic_path, - component_name, - new_value, - output_path, + schematic_path, component_name, new_value, output_path, ) - comp = sch.get_component(component_name) return { "success": True, @@ -287,10 +508,114 @@ def edit_component( "symbol": comp.symbol if comp else None, } except ValueError as e: - return { - "success": False, - "error": str(e), - } + return {"success": False, "error": str(e)} + + +@mcp.tool() +def diff_schematics( + schematic_a: str, + schematic_b: str, +) -> dict: + """Compare two schematics and show what changed. + + Reports component additions, removals, value changes, + directive changes, and wire/net topology differences. + + Args: + schematic_a: Path to "before" .asc file + schematic_b: Path to "after" .asc file + """ + diff = _diff_schematics(schematic_a, schematic_b) + return diff.to_dict() + + +@mcp.tool() +def run_drc(schematic_path: str) -> dict: + """Run design rule checks on a schematic. + + Checks for common issues: + - Missing ground connection + - Floating nodes + - Missing simulation directive + - Voltage source loops + - Missing component values + - Duplicate component names + - Unconnected components + + Args: + schematic_path: Path to .asc schematic file + """ + result = _run_drc(schematic_path) + return result.to_dict() + + +# ============================================================================ +# NETLIST BUILDER TOOLS +# ============================================================================ + + +@mcp.tool() +def create_netlist( + title: str, + components: list[dict], + directives: list[str], + output_path: str | None = None, +) -> dict: + """Create a SPICE netlist programmatically and save to a .cir file. + + Build circuits from scratch without needing a graphical schematic. + The created .cir file can be simulated with simulate_netlist. + + Args: + title: Circuit title/description + components: List of component dicts, each with: + - name: Component name (R1, C1, V1, M1, X1, etc.) + - nodes: List of node names (use "0" for ground) + - value: Value or model name + - params: Optional extra parameters string + directives: List of SPICE directives, e.g.: + [".tran 10m", ".ac dec 100 1 1meg", + ".meas tran vmax MAX V(out)"] + output_path: Where to save .cir file (None = auto in /tmp) + + Example components: + [ + {"name": "V1", "nodes": ["in", "0"], "value": "AC 1"}, + {"name": "R1", "nodes": ["in", "out"], "value": "10k"}, + {"name": "C1", "nodes": ["out", "0"], "value": "100n"} + ] + """ + nl = Netlist(title=title) + + for comp in components: + nl.add_component( + name=comp["name"], + nodes=comp["nodes"], + value=comp["value"], + params=comp.get("params", ""), + ) + + for directive in directives: + nl.add_directive(directive) + + # Determine output path + if output_path is None: + safe_title = "".join(c if c.isalnum() else "_" for c in title)[:30] + output_path = str(Path(tempfile.gettempdir()) / f"{safe_title}.cir") + + saved = nl.save(output_path) + + return { + "success": True, + "output_path": str(saved), + "netlist_preview": nl.render(), + "component_count": len(nl.components), + } + + +# ============================================================================ +# LIBRARY & MODEL TOOLS +# ============================================================================ @mcp.tool() @@ -301,17 +626,10 @@ def list_symbols( ) -> dict: """List available component symbols from LTspice library. - Symbols define the graphical representation and pins of components. - Args: - category: Filter by category folder (e.g., "Opamps", "Comparators") + category: Filter by category (e.g., "Opamps", "Comparators") search: Search term for symbol name (case-insensitive) limit: Maximum results to return - - Returns: - dict with: - - symbols: list of {name, category, path} - - total_count: total matching symbols """ symbols = [] sym_dir = LTSPICE_LIB / "sym" @@ -324,27 +642,16 @@ def list_symbols( cat = str(rel_path.parent) if rel_path.parent != Path(".") else "misc" name = asy_file.stem - # Apply filters if category and cat.lower() != category.lower(): continue if search and search.lower() not in name.lower(): continue - symbols.append({ - "name": name, - "category": cat, - "path": str(asy_file), - }) + symbols.append({"name": name, "category": cat, "path": str(asy_file)}) - # Sort by name symbols.sort(key=lambda x: x["name"].lower()) total = len(symbols) - - return { - "symbols": symbols[:limit], - "total_count": total, - "returned_count": min(limit, total), - } + return {"symbols": symbols[:limit], "total_count": total, "returned_count": min(limit, total)} @mcp.tool() @@ -355,15 +662,10 @@ def list_examples( ) -> dict: """List example circuits from LTspice examples library. - Great for learning or as starting points for new designs. - Args: category: Filter by category folder search: Search term for example name limit: Maximum results to return - - Returns: - dict with list of example schematics """ examples = [] @@ -380,20 +682,11 @@ def list_examples( if search and search.lower() not in name.lower(): continue - examples.append({ - "name": name, - "category": cat, - "path": str(asc_file), - }) + examples.append({"name": name, "category": cat, "path": str(asc_file)}) examples.sort(key=lambda x: x["name"].lower()) total = len(examples) - - return { - "examples": examples[:limit], - "total_count": total, - "returned_count": min(limit, total), - } + return {"examples": examples[:limit], "total_count": total, "returned_count": min(limit, total)} @mcp.tool() @@ -404,9 +697,6 @@ def get_symbol_info(symbol_path: str) -> dict: Args: symbol_path: Path to .asy symbol file - - Returns: - dict with symbol details including pins and default attributes """ path = Path(symbol_path) if not path.exists(): @@ -448,7 +738,6 @@ def get_symbol_info(symbol_path: str) -> dict: attr_name = parts[1] attr_value = parts[2] info["attributes"][attr_name] = attr_value - if attr_name == "Description": info["description"] = attr_value elif attr_name == "Prefix": @@ -460,14 +749,67 @@ def get_symbol_info(symbol_path: str) -> dict: @mcp.tool() -def check_installation() -> dict: - """Verify LTspice and Wine are properly installed. +def search_spice_models( + search: str | None = None, + model_type: str | None = None, + limit: int = 50, +) -> dict: + """Search for SPICE .model definitions in the library. - Returns: - dict with installation status and paths + Finds transistors, diodes, and other discrete devices. + + Args: + search: Search term for model name (case-insensitive) + model_type: Filter by type: NPN, PNP, NMOS, PMOS, D, NJF, PJF + limit: Maximum results """ - ok, msg = validate_installation() + models = _search_models(search=search, model_type=model_type, limit=limit) + return { + "models": [ + { + "name": m.name, + "type": m.type, + "source_file": m.source_file, + "parameters": m.parameters, + } + for m in models + ], + "total_count": len(models), + } + +@mcp.tool() +def search_spice_subcircuits( + search: str | None = None, + limit: int = 50, +) -> dict: + """Search for SPICE .subckt definitions (op-amps, ICs, etc.). + + Args: + search: Search term for subcircuit name + limit: Maximum results + """ + subs = _search_subcircuits(search=search, limit=limit) + return { + "subcircuits": [ + { + "name": s.name, + "pins": s.pins, + "pin_names": s.pin_names, + "description": s.description, + "source_file": s.source_file, + "n_components": s.n_components, + } + for s in subs + ], + "total_count": len(subs), + } + + +@mcp.tool() +def check_installation() -> dict: + """Verify LTspice and Wine are properly installed.""" + ok, msg = validate_installation() from .config import LTSPICE_DIR, LTSPICE_EXE, WINE_PREFIX return { @@ -492,14 +834,14 @@ def check_installation() -> dict: @mcp.resource("ltspice://symbols") def resource_symbols() -> str: - """List of all available LTspice symbols organized by category.""" + """All available LTspice symbols organized by category.""" result = list_symbols(limit=10000) return json.dumps(result, indent=2) @mcp.resource("ltspice://examples") def resource_examples() -> str: - """List of all LTspice example circuits.""" + """All LTspice example circuits.""" result = list_examples(limit=10000) return json.dumps(result, indent=2) @@ -510,6 +852,122 @@ def resource_status() -> str: return json.dumps(check_installation(), indent=2) +# ============================================================================ +# PROMPTS +# ============================================================================ + + +@mcp.prompt() +def design_filter( + filter_type: str = "lowpass", + topology: str = "rc", + cutoff_freq: str = "1kHz", +) -> str: + """Guide through designing and simulating a filter circuit. + + Args: + filter_type: lowpass, highpass, bandpass, or notch + topology: rc (1st order), rlc (2nd order), or sallen-key (active) + cutoff_freq: Target cutoff frequency with units + """ + return f"""Design a {filter_type} filter with these requirements: +- Topology: {topology} +- Cutoff frequency: {cutoff_freq} + +Workflow: +1. Use create_netlist to build the circuit +2. Add .ac analysis directive for frequency sweep +3. Add .meas directive for -3dB bandwidth +4. Simulate with simulate_netlist +5. Use measure_bandwidth to verify cutoff frequency +6. Use get_waveform to inspect the frequency response +7. Adjust component values with create_netlist if needed + +Tips: +- For RC lowpass: f_c = 1/(2*pi*R*C) +- For 2nd order: Q controls peaking, Butterworth Q=0.707 +- Use search_spice_models to find op-amp models for active filters +""" + + +@mcp.prompt() +def analyze_power_supply(schematic_path: str = "") -> str: + """Guide through analyzing a power supply circuit. + + Args: + schematic_path: Path to the power supply schematic + """ + path_instruction = ( + f"The schematic is at: {schematic_path}" + if schematic_path + else "First, identify or create the power supply schematic." + ) + + return f"""Analyze a power supply circuit for key performance metrics. + +{path_instruction} + +Workflow: +1. Use read_schematic to understand the circuit topology +2. Use run_drc to check for design issues +3. Simulate with .tran analysis (include load step if applicable) +4. Use analyze_waveform with these analyses: + - "peak_to_peak" on output for ripple measurement + - "settling_time" for transient response + - "fft" on output to identify noise frequencies +5. If AC analysis available, use measure_bandwidth for loop gain + +Key metrics to extract: +- Output voltage regulation (DC accuracy) +- Ripple voltage (peak-to-peak on output) +- Load transient response (settling time after step) +- Efficiency (input power vs output power) +""" + + +@mcp.prompt() +def debug_circuit(schematic_path: str = "") -> str: + """Guide through debugging a circuit that isn't working. + + Args: + schematic_path: Path to the problematic schematic + """ + path_instruction = ( + f"The schematic is at: {schematic_path}" + if schematic_path + else "First, identify the schematic file." + ) + + return f"""Systematic approach to debugging a circuit. + +{path_instruction} + +Step 1 - Validate the schematic: +- Use run_drc to catch obvious issues (missing ground, floating nodes) +- Use read_schematic to review component values and connections + +Step 2 - Check simulation setup: +- Verify simulation directives are correct +- Check that models/subcircuits are available (search_spice_models) + +Step 3 - Run and analyze: +- Simulate the circuit +- Use get_waveform to inspect key node voltages +- Compare expected vs actual values at each stage + +Step 4 - Isolate the problem: +- Use edit_component to simplify (replace active devices with ideal) +- Use diff_schematics to track what changes fixed the issue +- Re-simulate after each change + +Common issues: +- Wrong node connections (check wire endpoints) +- Missing bias voltages or ground +- Component values off by orders of magnitude +- Wrong model (check with search_spice_models) +""" + + # ============================================================================ # ENTRY POINT # ============================================================================ @@ -517,15 +975,14 @@ def resource_status() -> str: def main(): """Run the MCP server.""" - print(f"🔌 mcp-ltspice v{__version__}") + print(f"\U0001f50c mcp-ltspice v{__version__}") print(" LTspice circuit simulation automation") - # Quick validation ok, msg = validate_installation() if ok: - print(f" ✓ {msg}") + print(f" \u2713 {msg}") else: - print(f" ⚠ {msg}") + print(f" \u26a0 {msg}") mcp.run() diff --git a/src/mcp_ltspice/waveform_math.py b/src/mcp_ltspice/waveform_math.py new file mode 100644 index 0000000..d242201 --- /dev/null +++ b/src/mcp_ltspice/waveform_math.py @@ -0,0 +1,517 @@ +"""Waveform analysis and signal processing for simulation data.""" + +import numpy as np + + +def compute_fft( + time: np.ndarray, signal: np.ndarray, max_harmonics: int = 50 +) -> dict: + """Compute FFT of a time-domain signal. + + Args: + time: Time array in seconds (must be monotonically increasing) + signal: Signal amplitude array (same length as time) + max_harmonics: Maximum number of frequency bins to return + + Returns: + Dict with frequencies, magnitudes, magnitudes_db, + fundamental_freq, and dc_offset + """ + if len(time) < 2 or len(signal) < 2: + return { + "frequencies": [], + "magnitudes": [], + "magnitudes_db": [], + "fundamental_freq": 0.0, + "dc_offset": 0.0, + } + + n = len(signal) + dt = (time[-1] - time[0]) / (n - 1) + + if dt <= 0: + return { + "frequencies": [], + "magnitudes": [], + "magnitudes_db": [], + "fundamental_freq": 0.0, + "dc_offset": float(np.mean(np.real(signal))), + } + + # Use real FFT for real-valued signals + spectrum = np.fft.rfft(np.real(signal)) + freqs = np.fft.rfftfreq(n, d=dt) + magnitudes = np.abs(spectrum) * 2.0 / n + + # DC component doesn't get the 2x factor + magnitudes[0] /= 2.0 + + dc_offset = float(magnitudes[0]) + + # Find fundamental: largest magnitude excluding DC + if len(magnitudes) > 1: + fund_idx = int(np.argmax(magnitudes[1:])) + 1 + fundamental_freq = float(freqs[fund_idx]) + else: + fund_idx = 0 + fundamental_freq = 0.0 + + # Trim to max_harmonics (plus DC bin) + limit = min(max_harmonics + 1, len(freqs)) + freqs = freqs[:limit] + magnitudes = magnitudes[:limit] + + # dB conversion with floor to avoid log(0) + magnitudes_db = 20.0 * np.log10(np.maximum(magnitudes, 1e-15)) + + return { + "frequencies": freqs.tolist(), + "magnitudes": magnitudes.tolist(), + "magnitudes_db": magnitudes_db.tolist(), + "fundamental_freq": fundamental_freq, + "dc_offset": dc_offset, + } + + +def compute_thd( + time: np.ndarray, signal: np.ndarray, n_harmonics: int = 10 +) -> dict: + """Compute Total Harmonic Distortion. + + THD is the ratio of harmonic content to the fundamental, expressed + as a percentage: sqrt(sum(V_n^2 for n>=2)) / V_1 * 100 + + Args: + time: Time array in seconds + signal: Signal amplitude array + n_harmonics: Number of harmonics to include (2nd through nth) + + Returns: + Dict with thd_percent, fundamental_freq, fundamental_magnitude, + and harmonics list + """ + if len(time) < 2 or len(signal) < 2: + return { + "thd_percent": 0.0, + "fundamental_freq": 0.0, + "fundamental_magnitude": 0.0, + "harmonics": [], + } + + n = len(signal) + dt = (time[-1] - time[0]) / (n - 1) + + if dt <= 0: + return { + "thd_percent": 0.0, + "fundamental_freq": 0.0, + "fundamental_magnitude": 0.0, + "harmonics": [], + } + + spectrum = np.fft.rfft(np.real(signal)) + freqs = np.fft.rfftfreq(n, d=dt) + magnitudes = np.abs(spectrum) * 2.0 / n + magnitudes[0] /= 2.0 # DC correction + + # Fundamental = largest non-DC peak + if len(magnitudes) <= 1: + return { + "thd_percent": 0.0, + "fundamental_freq": 0.0, + "fundamental_magnitude": 0.0, + "harmonics": [], + } + + fund_idx = int(np.argmax(magnitudes[1:])) + 1 + fundamental_freq = float(freqs[fund_idx]) + fundamental_mag = float(magnitudes[fund_idx]) + + if fundamental_mag < 1e-15: + return { + "thd_percent": 0.0, + "fundamental_freq": fundamental_freq, + "fundamental_magnitude": fundamental_mag, + "harmonics": [], + } + + # Collect harmonics by finding the bin closest to each integer multiple + harmonics = [] + harmonic_sum_sq = 0.0 + + for h in range(2, n_harmonics + 2): + target_freq = fundamental_freq * h + if target_freq > freqs[-1]: + break + + idx = int(np.argmin(np.abs(freqs - target_freq))) + mag = float(magnitudes[idx]) + mag_db = 20.0 * np.log10(max(mag, 1e-15)) + harmonic_sum_sq += mag ** 2 + + harmonics.append({ + "harmonic": h, + "frequency": float(freqs[idx]), + "magnitude": mag, + "magnitude_db": mag_db, + }) + + thd_percent = (np.sqrt(harmonic_sum_sq) / fundamental_mag) * 100.0 + + return { + "thd_percent": float(thd_percent), + "fundamental_freq": fundamental_freq, + "fundamental_magnitude": fundamental_mag, + "harmonics": harmonics, + } + + +def compute_rms(signal: np.ndarray) -> float: + """Compute RMS value of a signal. + + Args: + signal: Signal amplitude array (real or complex) + + Returns: + RMS value as a float + """ + if len(signal) == 0: + return 0.0 + + real_signal = np.real(signal) + return float(np.sqrt(np.mean(real_signal ** 2))) + + +def compute_peak_to_peak(signal: np.ndarray) -> dict: + """Compute peak-to-peak metrics. + + Args: + signal: Signal amplitude array + + Returns: + Dict with peak_to_peak, max, min, and mean values + """ + if len(signal) == 0: + return { + "peak_to_peak": 0.0, + "max": 0.0, + "min": 0.0, + "mean": 0.0, + } + + real_signal = np.real(signal) + sig_max = float(np.max(real_signal)) + sig_min = float(np.min(real_signal)) + + return { + "peak_to_peak": sig_max - sig_min, + "max": sig_max, + "min": sig_min, + "mean": float(np.mean(real_signal)), + } + + +def compute_settling_time( + time: np.ndarray, + signal: np.ndarray, + final_value: float | None = None, + tolerance_percent: float = 2.0, +) -> dict: + """Compute settling time. + + Searches backwards from the end of the signal to find the last + point where the signal was outside the tolerance band around the + final value. Settling time is measured from time[0] to that crossing. + + Args: + time: Time array in seconds + signal: Signal amplitude array + final_value: Target value. If None, uses the last sample. + tolerance_percent: Allowed deviation as a percentage of final_value + (or absolute if final_value is near zero) + + Returns: + Dict with settling_time, final_value, tolerance, and settled flag + """ + if len(time) < 2 or len(signal) < 2: + return { + "settling_time": 0.0, + "final_value": 0.0, + "tolerance": 0.0, + "settled": False, + } + + real_signal = np.real(signal) + + if final_value is None: + final_value = float(real_signal[-1]) + + # Tolerance band: percentage of final_value, but use absolute + # tolerance if final_value is near zero to avoid a degenerate band + if abs(final_value) > 1e-12: + tolerance = abs(final_value) * tolerance_percent / 100.0 + else: + # Fall back to percentage of signal range + sig_range = float(np.max(real_signal) - np.min(real_signal)) + tolerance = sig_range * tolerance_percent / 100.0 if sig_range > 0 else 1e-12 + + # Walk backwards to find where signal last left the tolerance band + outside = np.abs(real_signal - final_value) > tolerance + + if not np.any(outside): + # Signal was always within tolerance + return { + "settling_time": 0.0, + "final_value": final_value, + "tolerance": tolerance, + "settled": True, + } + + # Find the last index that was outside the band + last_outside = int(np.max(np.nonzero(outside)[0])) + + if last_outside >= len(time) - 1: + # Never settled within the captured data + return { + "settling_time": float(time[-1] - time[0]), + "final_value": final_value, + "tolerance": tolerance, + "settled": False, + } + + # Settling time = time from start to the first sample inside the band + # after the last excursion + settling_time = float(time[last_outside + 1] - time[0]) + + return { + "settling_time": settling_time, + "final_value": final_value, + "tolerance": tolerance, + "settled": True, + } + + +def compute_rise_time( + time: np.ndarray, + signal: np.ndarray, + low_pct: float = 10, + high_pct: float = 90, +) -> dict: + """Compute rise time between two percentage thresholds. + + Uses linear interpolation between samples for sub-sample accuracy. + Thresholds are computed relative to the signal's min-to-max swing. + + Args: + time: Time array in seconds + signal: Signal amplitude array + low_pct: Lower threshold as percentage of swing (default 10%) + high_pct: Upper threshold as percentage of swing (default 90%) + + Returns: + Dict with rise_time, low_threshold, high_threshold, + low_time, and high_time + """ + if len(time) < 2 or len(signal) < 2: + return { + "rise_time": 0.0, + "low_threshold": 0.0, + "high_threshold": 0.0, + "low_time": 0.0, + "high_time": 0.0, + } + + real_signal = np.real(signal) + sig_min = float(np.min(real_signal)) + sig_max = float(np.max(real_signal)) + swing = sig_max - sig_min + + if swing < 1e-15: + return { + "rise_time": 0.0, + "low_threshold": sig_min, + "high_threshold": sig_max, + "low_time": float(time[0]), + "high_time": float(time[0]), + } + + low_thresh = sig_min + swing * (low_pct / 100.0) + high_thresh = sig_min + swing * (high_pct / 100.0) + + low_time = _interpolate_crossing(time, real_signal, low_thresh, rising=True) + high_time = _interpolate_crossing(time, real_signal, high_thresh, rising=True) + + if low_time is None or high_time is None: + return { + "rise_time": 0.0, + "low_threshold": low_thresh, + "high_threshold": high_thresh, + "low_time": float(low_time) if low_time is not None else None, + "high_time": float(high_time) if high_time is not None else None, + } + + return { + "rise_time": high_time - low_time, + "low_threshold": low_thresh, + "high_threshold": high_thresh, + "low_time": low_time, + "high_time": high_time, + } + + +def _interpolate_crossing( + time: np.ndarray, + signal: np.ndarray, + threshold: float, + rising: bool = True, +) -> float | None: + """Find the first time a signal crosses a threshold, with interpolation. + + Args: + time: Time array + signal: Signal array + threshold: Value to cross + rising: If True, look for low-to-high crossing + + Returns: + Interpolated time of crossing, or None if no crossing found + """ + for i in range(len(signal) - 1): + if rising: + crosses = signal[i] <= threshold < signal[i + 1] + else: + crosses = signal[i] >= threshold > signal[i + 1] + + if crosses: + # Linear interpolation between samples + dv = signal[i + 1] - signal[i] + if abs(dv) < 1e-30: + return float(time[i]) + frac = (threshold - signal[i]) / dv + return float(time[i] + frac * (time[i + 1] - time[i])) + + return None + + +def compute_bandwidth( + frequency: np.ndarray, + magnitude_db: np.ndarray, + ref_db: float | None = None, +) -> dict: + """Compute -3dB bandwidth from frequency response data. + + Interpolates between data points to find the exact frequencies + where the response crosses the -3dB level relative to the reference. + + Args: + frequency: Frequency array in Hz (may be complex; .real is used) + magnitude_db: Magnitude in dB + ref_db: Reference level in dB. Defaults to the peak of magnitude_db. + + Returns: + Dict with bandwidth_hz, f_low, f_high, ref_db, and type + """ + if len(frequency) < 2 or len(magnitude_db) < 2: + return { + "bandwidth_hz": 0.0, + "f_low": None, + "f_high": None, + "ref_db": 0.0, + "type": "unknown", + } + + freq = np.real(frequency).astype(np.float64) + mag = np.real(magnitude_db).astype(np.float64) + + # Sort by frequency (in case data isn't ordered) + sort_idx = np.argsort(freq) + freq = freq[sort_idx] + mag = mag[sort_idx] + + # Strip any negative frequencies + positive_mask = freq >= 0 + freq = freq[positive_mask] + mag = mag[positive_mask] + + if len(freq) < 2: + return { + "bandwidth_hz": 0.0, + "f_low": None, + "f_high": None, + "ref_db": 0.0, + "type": "unknown", + } + + if ref_db is None: + ref_db = float(np.max(mag)) + + cutoff = ref_db - 3.0 + + # Find all -3dB crossings by checking where magnitude crosses the cutoff + above = mag >= cutoff + crossings = [] + + for i in range(len(mag) - 1): + if above[i] != above[i + 1]: + # Interpolate the exact crossing frequency + dm = mag[i + 1] - mag[i] + if abs(dm) < 1e-30: + f_cross = float(freq[i]) + else: + frac = (cutoff - mag[i]) / dm + f_cross = float(freq[i] + frac * (freq[i + 1] - freq[i])) + crossings.append(f_cross) + + if not crossings: + # No crossing found - check if entirely above or below + if np.all(above): + return { + "bandwidth_hz": float(freq[-1] - freq[0]), + "f_low": None, + "f_high": None, + "ref_db": ref_db, + "type": "unknown", + } + return { + "bandwidth_hz": 0.0, + "f_low": None, + "f_high": None, + "ref_db": ref_db, + "type": "unknown", + } + + # Classify response shape + peak_idx = int(np.argmax(mag)) + + if len(crossings) == 1: + f_cross = crossings[0] + if peak_idx < len(freq) // 2: + # Peak is at low end => lowpass + return { + "bandwidth_hz": f_cross, + "f_low": None, + "f_high": f_cross, + "ref_db": ref_db, + "type": "lowpass", + } + else: + # Peak is at high end => highpass + return { + "bandwidth_hz": float(freq[-1]) - f_cross, + "f_low": f_cross, + "f_high": None, + "ref_db": ref_db, + "type": "highpass", + } + + # Two or more crossings => bandpass (use first and last) + f_low = crossings[0] + f_high = crossings[-1] + + return { + "bandwidth_hz": f_high - f_low, + "f_low": f_low, + "f_high": f_high, + "ref_db": ref_db, + "type": "bandpass", + }