From ba649d2a6e628769f73e8cc2dd1fa17d1f3bca06 Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Tue, 10 Feb 2026 23:05:35 -0700 Subject: [PATCH] Add stability, power, optimization, batch, and schematic generation tools Phase 3 features bringing the server to 27 tools: - Stepped/multi-run .raw file parsing (.step, .mc, .temp) - Stability analysis (gain/phase margin from AC loop gain) - Power analysis (average, RMS, efficiency, power factor) - Safe waveform expression evaluator (recursive-descent parser) - Component value optimizer (binary search + coordinate descent) - Batch simulation: parameter sweep, temperature sweep, Monte Carlo - .asc schematic generation from templates (RC filter, divider, inverting amp) - Touchstone .s1p/.s2p/.snp S-parameter file parsing - 7 new netlist templates (diff amp, common emitter, buck, LDO, oscillator, H-bridge) - Full ruff lint and format compliance across all modules --- src/mcp_ltspice/asc_generator.py | 329 ++++++++++++ src/mcp_ltspice/batch.py | 324 ++++++++++++ src/mcp_ltspice/config.py | 7 +- src/mcp_ltspice/diff.py | 37 +- src/mcp_ltspice/drc.py | 45 +- src/mcp_ltspice/log_parser.py | 6 +- src/mcp_ltspice/models.py | 58 ++- src/mcp_ltspice/netlist.py | 393 +++++++++++++-- src/mcp_ltspice/optimizer.py | 809 ++++++++++++++++++++++++++++++ src/mcp_ltspice/power_analysis.py | 196 ++++++++ src/mcp_ltspice/raw_parser.py | 121 ++++- src/mcp_ltspice/runner.py | 51 +- src/mcp_ltspice/schematic.py | 48 +- src/mcp_ltspice/server.py | 570 +++++++++++++++++++-- src/mcp_ltspice/stability.py | 167 ++++++ src/mcp_ltspice/touchstone.py | 254 ++++++++++ src/mcp_ltspice/waveform_expr.py | 299 +++++++++++ src/mcp_ltspice/waveform_math.py | 26 +- 18 files changed, 3493 insertions(+), 247 deletions(-) create mode 100644 src/mcp_ltspice/asc_generator.py create mode 100644 src/mcp_ltspice/batch.py create mode 100644 src/mcp_ltspice/optimizer.py create mode 100644 src/mcp_ltspice/power_analysis.py create mode 100644 src/mcp_ltspice/stability.py create mode 100644 src/mcp_ltspice/touchstone.py create mode 100644 src/mcp_ltspice/waveform_expr.py diff --git a/src/mcp_ltspice/asc_generator.py b/src/mcp_ltspice/asc_generator.py new file mode 100644 index 0000000..7317ab6 --- /dev/null +++ b/src/mcp_ltspice/asc_generator.py @@ -0,0 +1,329 @@ +"""Programmatic LTspice .asc schematic file generation. + +Generates graphical schematics (the .asc format LTspice uses for its GUI), +not just text netlists. Placed components snap to an 80-pixel grid and +auto-wired with horizontal left-to-right signal flow. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from pathlib import Path + +# LTspice grid spacing -- all coordinates should be multiples of this +GRID = 80 + + +@dataclass +class _WireEntry: + x1: int + y1: int + x2: int + y2: int + + +@dataclass +class _SymbolEntry: + symbol: str + name: str + value: str + x: int + y: int + rotation: int # 0, 90, 180, 270 + windows: list[str] = field(default_factory=list) + + +@dataclass +class _FlagEntry: + x: int + y: int + name: str + + +@dataclass +class _TextEntry: + x: int + y: int + content: str + + +class AscSchematic: + """Builder for LTspice .asc schematic files. + + All ``add_*`` methods return ``self`` for chaining:: + + sch = (AscSchematic() + .add_component("res", "R1", "1k", 160, 176, rotation=90) + .add_wire(80, 176, 160, 176) + .add_ground(80, 256)) + """ + + def __init__(self, sheet_w: int = 880, sheet_h: int = 680) -> None: + self._sheet_w = sheet_w + self._sheet_h = sheet_h + self._wires: list[_WireEntry] = [] + self._symbols: list[_SymbolEntry] = [] + self._flags: list[_FlagEntry] = [] + self._texts: list[_TextEntry] = [] + + # -- builder methods ----------------------------------------------------- + + def add_component( + self, + symbol: str, + name: str, + value: str, + x: int, + y: int, + rotation: int = 0, + ) -> AscSchematic: + """Place a component symbol. + + Args: + symbol: LTspice symbol name (``res``, ``cap``, ``voltage``, etc.) + name: Instance name (``R1``, ``C1``, ``V1``, ...) + value: Component value (``1k``, ``100n``, ``AC 1``, ...) + x: X coordinate (should be on 80-pixel grid) + y: Y coordinate + rotation: 0, 90, 180, or 270 degrees + """ + windows: list[str] = [] + # For resistors and inductors placed at R90, shift the WINDOW lines + # so labels sit neatly above/below the body + if rotation == 90 and symbol in ("res", "ind", "ind2"): + windows = [ + "WINDOW 0 0 56 VBottom 2", + "WINDOW 3 32 56 VTop 2", + ] + self._symbols.append(_SymbolEntry(symbol, name, value, x, y, rotation, windows)) + return self + + def add_wire(self, x1: int, y1: int, x2: int, y2: int) -> AscSchematic: + """Add a wire segment between two points.""" + self._wires.append(_WireEntry(x1, y1, x2, y2)) + return self + + def add_ground(self, x: int, y: int) -> AscSchematic: + """Place a ground flag (net name ``0``).""" + self._flags.append(_FlagEntry(x, y, "0")) + return self + + def add_net_label(self, name: str, x: int, y: int) -> AscSchematic: + """Place a named net label (e.g., ``out``, ``vdd``).""" + self._flags.append(_FlagEntry(x, y, name)) + return self + + def add_directive(self, text: str, x: int, y: int) -> AscSchematic: + """Add a SPICE directive (rendered with ``!`` prefix).""" + self._texts.append(_TextEntry(x, y, text)) + return self + + # -- output -------------------------------------------------------------- + + def render(self) -> str: + """Render the schematic to an ``.asc`` format string.""" + lines: list[str] = [] + lines.append("Version 4") + lines.append(f"SHEET 1 {self._sheet_w} {self._sheet_h}") + + for w in self._wires: + lines.append(f"WIRE {w.x1} {w.y1} {w.x2} {w.y2}") + + for f in self._flags: + lines.append(f"FLAG {f.x} {f.y} {f.name}") + + for s in self._symbols: + lines.append(f"SYMBOL {s.symbol} {s.x} {s.y} R{s.rotation}") + for win in s.windows: + lines.append(win) + lines.append(f"SYMATTR InstName {s.name}") + lines.append(f"SYMATTR Value {s.value}") + + for t in self._texts: + lines.append(f"TEXT {t.x} {t.y} Left 2 !{t.content}") + + return "\n".join(lines) + "\n" + + def save(self, path: Path | str) -> Path: + """Write the schematic to an ``.asc`` file on disk.""" + path = Path(path) + path.write_text(self.render(), encoding="utf-8") + return path + + +# ============================================================================ +# Layout helper functions -- auto-placed, ready-to-simulate schematics +# ============================================================================ + + +def generate_rc_lowpass(r: str = "1k", c: str = "100n") -> AscSchematic: + """Generate an RC lowpass filter schematic with AC analysis. + + Signal flow (left to right):: + + V1 --[R1]-- out --+ + | + [C1] + | + GND + + Args: + r: Resistor value (e.g., ``"1k"``, ``"4.7k"``) + c: Capacitor value (e.g., ``"100n"``, ``"10p"``) + + Returns: + An ``AscSchematic`` ready to ``.save()`` or ``.render()``. + """ + # Grid positions (all multiples of GRID=80) + # V1 at x=80, R1 from 160..240, C1 at 304, out label at 304 + sch = AscSchematic() + + # Wires: V1_p -> R1_in, R1_out -> C1_top + sch.add_wire(80, 176, 160, 176) + sch.add_wire(240, 176, 304, 176) + + # Vertical wire for cap bottom to ground + sch.add_wire(304, 256, 304, 256) + + # Components + sch.add_component("res", "R1", r, 160, 176, rotation=90) + sch.add_component("cap", "C1", c, 288, 192, rotation=0) + sch.add_component("voltage", "V1", "AC 1", 80, 176, rotation=0) + + # Ground flags (V1 bottom and C1 bottom share ground) + sch.add_ground(80, 256) + sch.add_ground(304, 256) + + # Output net label + sch.add_net_label("out", 304, 176) + + # Simulation directive + sch.add_directive(".ac dec 100 1 1meg", 80, 312) + + return sch + + +def generate_voltage_divider( + r1: str = "10k", + r2: str = "10k", + vin: str = "5", +) -> AscSchematic: + """Generate a voltage divider schematic with operating-point analysis. + + Topology:: + + V1 --[R1]-- out --[R2]-- GND + + Args: + r1: Top resistor value + r2: Bottom resistor value + vin: DC input voltage + + Returns: + An ``AscSchematic`` ready to ``.save()`` or ``.render()``. + """ + sch = AscSchematic() + + # Layout: V1 on left, R1 horizontal, junction = "out", R2 vertical to gnd + # V1 at (80, 176) + # R1 from (160, 176) to (240, 176) -- horizontal (R90) + # R2 from (320, 192) to (320, 272) -- vertical (R0) + + # Wires + sch.add_wire(80, 176, 160, 176) # V1_p to R1_left + sch.add_wire(240, 176, 320, 176) # R1_right to R2_top / out node + + # Components + sch.add_component("res", "R1", r1, 160, 176, rotation=90) + sch.add_component("res", "R2", r2, 304, 192, rotation=0) + sch.add_component("voltage", "V1", vin, 80, 176, rotation=0) + + # Ground + sch.add_ground(80, 256) + sch.add_ground(320, 272) + + # Net label + sch.add_net_label("out", 320, 176) + + # Directive + sch.add_directive(".op", 80, 312) + + return sch + + +def generate_inverting_amp( + rin: str = "10k", + rf: str = "100k", + opamp_model: str = "UniversalOpamp2", +) -> AscSchematic: + """Generate an inverting op-amp amplifier schematic. + + Topology:: + + V1 --[Rin]--> inv(-) --[Rf]--> out + non-inv(+) --> GND + Supply: Vpos=+15V, Vneg=-15V + + The gain is ``-Rf/Rin``. + + Args: + rin: Input resistor value + rf: Feedback resistor value + opamp_model: Op-amp symbol name (default ``UniversalOpamp2``, + the built-in ideal op-amp that needs no external model file) + + Returns: + An ``AscSchematic`` ready to ``.save()`` or ``.render()``. + """ + sch = AscSchematic(sheet_w=1200, sheet_h=880) + + # Coordinate plan (all on 80-grid): + # V1 source at (80, 320) + # Rin horizontal from (192, 320) rotation=90 + # Opamp at (400, 320) -- inv input top-left, non-inv bottom-left, out right + # Rf from (400, 240) across top to output, rotation=90 + # Output net at (560, 320) + # Vpos at (480, 160), Vneg at (480, 480) + + # -- wires --------------------------------------------------------------- + # V1_p to Rin left + sch.add_wire(80, 320, 192, 320) + # Rin right to opamp inv input + sch.add_wire(272, 320, 400, 320) + # Opamp inv to Rf left (vertical up to Rf row, then horizontal) + sch.add_wire(400, 320, 400, 240) + sch.add_wire(400, 240, 416, 240) + # Rf right to output + sch.add_wire(496, 240, 560, 240) + # Output down to opamp output level + sch.add_wire(560, 240, 560, 336) + # Opamp output to out node + sch.add_wire(496, 336, 560, 336) + # Opamp non-inv to ground + sch.add_wire(400, 352, 400, 400) + # Supply wires + sch.add_wire(448, 288, 448, 240) + sch.add_wire(448, 240, 480, 240) + sch.add_wire(448, 368, 448, 416) + sch.add_wire(448, 416, 480, 416) + + # -- components ---------------------------------------------------------- + sch.add_component("voltage", "V1", "AC 1", 80, 320, rotation=0) + sch.add_component("res", "Rin", rin, 192, 320, rotation=90) + sch.add_component("res", "Rf", rf, 416, 240, rotation=90) + sch.add_component(f"Opamps\\\\{opamp_model}", "U1", opamp_model, 400, 304, rotation=0) + sch.add_component("voltage", "Vpos", "15", 480, 160, rotation=0) + sch.add_component("voltage", "Vneg", "15", 480, 416, rotation=0) + + # -- ground & labels ----------------------------------------------------- + sch.add_ground(80, 400) + sch.add_ground(400, 400) + sch.add_ground(480, 240) + sch.add_ground(480, 496) + sch.add_net_label("out", 560, 336) + sch.add_net_label("inv", 400, 320) + + # -- directives ---------------------------------------------------------- + sch.add_directive(".ac dec 100 1 1meg", 80, 544) + + return sch diff --git a/src/mcp_ltspice/batch.py b/src/mcp_ltspice/batch.py new file mode 100644 index 0000000..0fa77e4 --- /dev/null +++ b/src/mcp_ltspice/batch.py @@ -0,0 +1,324 @@ +"""Batch simulation runner for parameter sweeps and Monte Carlo analysis. + +Runs multiple LTspice simulations sequentially (LTspice under Wine is +single-instance) and collects results into a single BatchResult. +""" + +from __future__ import annotations + +import random +import re +import tempfile +import time +from dataclasses import dataclass, field +from pathlib import Path + +from .runner import SimulationResult, run_netlist + + +@dataclass +class BatchResult: + """Aggregated results from a batch of simulations.""" + + results: list[SimulationResult] = field(default_factory=list) + parameter_values: list[dict] = field(default_factory=list) + total_elapsed: float = 0.0 + + @property + def success_count(self) -> int: + return sum(1 for r in self.results if r.success) + + @property + def failure_count(self) -> int: + return sum(1 for r in self.results if not r.success) + + +async def run_batch( + netlists: list[tuple[str, str]], + timeout: float = 300, +) -> BatchResult: + """Run a list of netlists sequentially and collect results. + + Args: + netlists: List of ``(name, netlist_text)`` pairs. Each netlist + is written to a temp ``.cir`` file and simulated. + timeout: Per-simulation timeout in seconds. + + Returns: + BatchResult with one SimulationResult per netlist. + """ + batch = BatchResult() + start = time.monotonic() + + for name, text in netlists: + safe = _safe_filename(name) + work_dir = Path(tempfile.mkdtemp(prefix=f"ltbatch_{safe}_")) + cir_path = work_dir / f"{safe}.cir" + cir_path.write_text(text, encoding="utf-8") + + result = await run_netlist(cir_path, timeout=timeout, work_dir=work_dir) + batch.results.append(result) + batch.parameter_values.append({"name": name}) + + batch.total_elapsed = time.monotonic() - start + return batch + + +async def run_parameter_sweep( + netlist_template: str, + param_name: str, + values: list[float], + timeout: float = 300, +) -> BatchResult: + """Sweep a single parameter across a range of values. + + The *netlist_template* should contain a ``.param`` directive for + *param_name* whose value will be replaced for each run. If no + ``.param`` line is found, one is inserted before the ``.end`` + directive. + + Args: + netlist_template: Netlist text with a ``.param {param_name}=...`` line. + param_name: Parameter to sweep (e.g., ``"Rval"``). + values: List of numeric values to substitute. + timeout: Per-simulation timeout in seconds. + + Returns: + BatchResult indexed by parameter value. + """ + batch = BatchResult() + start = time.monotonic() + + param_re = re.compile( + rf"(\.param\s+{re.escape(param_name)}\s*=\s*)(\S+)", + re.IGNORECASE, + ) + + for val in values: + val_str = _format_value(val) + + if param_re.search(netlist_template): + text = param_re.sub(rf"\g<1>{val_str}", netlist_template) + else: + # Insert a .param line before .end + text = _insert_before_end(netlist_template, f".param {param_name}={val_str}") + + safe = f"sweep_{param_name}_{val_str}" + work_dir = Path(tempfile.mkdtemp(prefix=f"ltbatch_{_safe_filename(safe)}_")) + cir_path = work_dir / f"{_safe_filename(safe)}.cir" + cir_path.write_text(text, encoding="utf-8") + + result = await run_netlist(cir_path, timeout=timeout, work_dir=work_dir) + batch.results.append(result) + batch.parameter_values.append({param_name: val}) + + batch.total_elapsed = time.monotonic() - start + return batch + + +async def run_temperature_sweep( + netlist_text: str, + temperatures: list[float], + timeout: float = 300, +) -> BatchResult: + """Run the same netlist at different temperatures. + + A ``.temp`` directive is added (or replaced) for each run. + + Args: + netlist_text: Base netlist text. + temperatures: List of temperatures in degrees C. + timeout: Per-simulation timeout in seconds. + + Returns: + BatchResult indexed by temperature. + """ + batch = BatchResult() + start = time.monotonic() + + temp_re = re.compile(r"\.temp\s+\S+", re.IGNORECASE) + + for temp in temperatures: + temp_str = _format_value(temp) + + if temp_re.search(netlist_text): + text = temp_re.sub(f".temp {temp_str}", netlist_text) + else: + text = _insert_before_end(netlist_text, f".temp {temp_str}") + + safe = f"temp_{temp_str}" + work_dir = Path(tempfile.mkdtemp(prefix=f"ltbatch_{_safe_filename(safe)}_")) + cir_path = work_dir / f"{_safe_filename(safe)}.cir" + cir_path.write_text(text, encoding="utf-8") + + result = await run_netlist(cir_path, timeout=timeout, work_dir=work_dir) + batch.results.append(result) + batch.parameter_values.append({"temperature": temp}) + + batch.total_elapsed = time.monotonic() - start + return batch + + +async def run_monte_carlo( + netlist_text: str, + n_runs: int, + tolerances: dict[str, float], + timeout: float = 300, + seed: int | None = None, +) -> BatchResult: + """Monte Carlo analysis with component tolerances. + + For each run, every component listed in *tolerances* has its value + randomly varied using a normal distribution truncated to +/-3 sigma, + where sigma equals the tolerance fraction. + + Args: + netlist_text: Base netlist text. + n_runs: Number of Monte Carlo iterations. + tolerances: Mapping of component name to tolerance fraction + (e.g., ``{"R1": 0.05}`` for 5%). + timeout: Per-simulation timeout in seconds. + seed: Optional RNG seed for reproducibility. + + Returns: + BatchResult with per-run component values in ``parameter_values``. + """ + rng = random.Random(seed) + batch = BatchResult() + start = time.monotonic() + + # Pre-extract nominal values from the netlist + nominals = _extract_component_values(netlist_text, list(tolerances.keys())) + + for run_idx in range(n_runs): + text = netlist_text + params: dict[str, float] = {} + + for comp_name, tol_frac in tolerances.items(): + nominal = nominals.get(comp_name) + if nominal is None: + continue + + # Normal distribution, clamp to +/-3sigma + sigma = nominal * tol_frac + deviation = rng.gauss(0, sigma) + deviation = max(-3 * sigma, min(3 * sigma, deviation)) + varied = nominal + deviation + params[comp_name] = varied + + # Replace the component value in the netlist text + text = _replace_component_value(text, comp_name, _format_value(varied)) + + safe = f"mc_{run_idx:04d}" + work_dir = Path(tempfile.mkdtemp(prefix=f"ltbatch_{safe}_")) + cir_path = work_dir / f"{safe}.cir" + cir_path.write_text(text, encoding="utf-8") + + result = await run_netlist(cir_path, timeout=timeout, work_dir=work_dir) + batch.results.append(result) + batch.parameter_values.append(params) + + batch.total_elapsed = time.monotonic() - start + return batch + + +# ============================================================================ +# Internal helpers +# ============================================================================ + +# SPICE engineering suffixes +_SUFFIX_MAP = { + "T": 1e12, + "G": 1e9, + "MEG": 1e6, + "K": 1e3, + "M": 1e-3, + "U": 1e-6, + "N": 1e-9, + "P": 1e-12, + "F": 1e-15, +} + + +def _parse_spice_value(s: str) -> float | None: + """Parse a SPICE value string like ``10k``, ``100n``, ``4.7meg`` to float.""" + s = s.strip().upper() + if not s: + return None + + # Try plain float first + try: + return float(s) + except ValueError: + pass + + # Try suffixed form + for suffix, mult in sorted(_SUFFIX_MAP.items(), key=lambda x: -len(x[0])): + if s.endswith(suffix): + num_part = s[: -len(suffix)] + try: + return float(num_part) * mult + except ValueError: + continue + + return None + + +def _format_value(v: float) -> str: + """Format a float for SPICE, using engineering suffixes where tidy.""" + if v == 0: + return "0" + + abs_v = abs(v) + # Walk the suffix table from large to small + for suffix, mult in sorted(_SUFFIX_MAP.items(), key=lambda x: -x[1]): + if abs_v >= mult * 0.999: + scaled = v / mult + formatted = f"{scaled:g}{suffix.lower()}" + return formatted + + return f"{v:g}" + + +def _safe_filename(name: str) -> str: + """Turn an arbitrary name into a safe filename fragment.""" + return re.sub(r"[^\w\-.]", "_", name)[:60] + + +def _insert_before_end(netlist: str, directive: str) -> str: + """Insert a directive line just before ``.end``.""" + end_re = re.compile(r"^(\.end\b)", re.IGNORECASE | re.MULTILINE) + if end_re.search(netlist): + return end_re.sub(f"{directive}\n\\1", netlist) + # No .end found -- append + return netlist.rstrip() + f"\n{directive}\n.end\n" + + +def _extract_component_values(netlist: str, names: list[str]) -> dict[str, float]: + """Extract numeric component values from a netlist by instance name. + + Looks for lines like ``R1 in out 10k`` and parses the value token. + """ + result: dict[str, float] = {} + + for name in names: + pattern = re.compile( + rf"^\s*{re.escape(name)}\s+\S+\s+\S+\s+(\S+)", + re.IGNORECASE | re.MULTILINE, + ) + match = pattern.search(netlist) + if match: + parsed = _parse_spice_value(match.group(1)) + if parsed is not None: + result[name] = parsed + + return result + + +def _replace_component_value(netlist: str, comp_name: str, new_value: str) -> str: + """Replace a component's value token in the netlist text.""" + pattern = re.compile( + rf"^(\s*{re.escape(comp_name)}\s+\S+\s+\S+\s+)\S+", + re.IGNORECASE | re.MULTILINE, + ) + return pattern.sub(rf"\g<1>{new_value}", netlist, count=1) diff --git a/src/mcp_ltspice/config.py b/src/mcp_ltspice/config.py index b8a8d5b..bb684f6 100644 --- a/src/mcp_ltspice/config.py +++ b/src/mcp_ltspice/config.py @@ -4,10 +4,9 @@ import os from pathlib import Path # LTspice installation paths -LTSPICE_DIR = Path(os.environ.get( - "LTSPICE_DIR", - Path.home() / "claude" / "ltspice" / "extracted" / "ltspice" -)) +LTSPICE_DIR = Path( + os.environ.get("LTSPICE_DIR", Path.home() / "claude" / "ltspice" / "extracted" / "ltspice") +) LTSPICE_EXE = LTSPICE_DIR / "LTspice.exe" LTSPICE_LIB = LTSPICE_DIR / "lib" diff --git a/src/mcp_ltspice/diff.py b/src/mcp_ltspice/diff.py index 08fcd2d..5789737 100644 --- a/src/mcp_ltspice/diff.py +++ b/src/mcp_ltspice/diff.py @@ -3,12 +3,13 @@ from dataclasses import dataclass, field from pathlib import Path -from .schematic import parse_schematic, Schematic, Component +from .schematic import Component, Schematic, parse_schematic @dataclass class ComponentChange: """A change to a component.""" + name: str change_type: str # "added", "removed", "modified" symbol: str = "" @@ -22,6 +23,7 @@ class ComponentChange: @dataclass class DirectiveChange: """A change to a SPICE directive.""" + change_type: str # "added", "removed", "modified" old_text: str | None = None new_text: str | None = None @@ -30,6 +32,7 @@ class DirectiveChange: @dataclass class SchematicDiff: """Complete diff between two schematics.""" + component_changes: list[ComponentChange] = field(default_factory=list) directive_changes: list[DirectiveChange] = field(default_factory=list) nets_added: list[str] = field(default_factory=list) @@ -61,9 +64,7 @@ class SchematicDiff: modified = [c for c in self.component_changes if c.change_type == "modified"] if modified: - lines.append( - f"{len(modified)} component{'s' if len(modified) != 1 else ''} modified:" - ) + lines.append(f"{len(modified)} component{'s' if len(modified) != 1 else ''} modified:") for c in modified: parts: list[str] = [] if c.old_value != c.new_value: @@ -223,9 +224,7 @@ def _component_map(schematic: Schematic) -> dict[str, Component]: return {comp.name: comp for comp in schematic.components} -def _diff_components( - schema_a: Schematic, schema_b: Schematic -) -> list[ComponentChange]: +def _diff_components(schema_a: Schematic, schema_b: Schematic) -> list[ComponentChange]: """Compare components between two schematics.""" map_a = _component_map(schema_a) map_b = _component_map(schema_b) @@ -269,9 +268,7 @@ def _diff_components( moved = (comp_a.x, comp_a.y) != (comp_b.x, comp_b.y) value_changed = comp_a.value != comp_b.value attrs_changed = comp_a.attributes != comp_b.attributes - rotation_changed = ( - comp_a.rotation != comp_b.rotation or comp_a.mirror != comp_b.mirror - ) + rotation_changed = comp_a.rotation != comp_b.rotation or comp_a.mirror != comp_b.mirror if moved or value_changed or attrs_changed or rotation_changed: changes.append( @@ -290,9 +287,7 @@ def _diff_components( return changes -def _diff_directives( - schema_a: Schematic, schema_b: Schematic -) -> list[DirectiveChange]: +def _diff_directives(schema_a: Schematic, schema_b: Schematic) -> list[DirectiveChange]: """Compare SPICE directives between two schematics.""" directives_a = [t.content for t in schema_a.texts if t.type == "spice"] directives_b = [t.content for t in schema_b.texts if t.type == "spice"] @@ -308,15 +303,11 @@ def _diff_directives( # Removed directives for key in sorted(keys_a - keys_b): - changes.append( - DirectiveChange(change_type="removed", old_text=norm_a[key]) - ) + changes.append(DirectiveChange(change_type="removed", old_text=norm_a[key])) # Added directives for key in sorted(keys_b - keys_a): - changes.append( - DirectiveChange(change_type="added", new_text=norm_b[key]) - ) + changes.append(DirectiveChange(change_type="added", new_text=norm_b[key])) # For modified detection: directives that share a command keyword but differ. # We match by the first token (e.g., ".tran", ".ac") to detect modifications @@ -367,9 +358,7 @@ def _diff_directives( return final_changes -def _diff_nets( - schema_a: Schematic, schema_b: Schematic -) -> tuple[list[str], list[str]]: +def _diff_nets(schema_a: Schematic, schema_b: Schematic) -> tuple[list[str], list[str]]: """Compare net flags between two schematics. Returns: @@ -383,9 +372,7 @@ def _diff_nets( return added, removed -def _diff_wires( - schema_a: Schematic, schema_b: Schematic -) -> tuple[int, int]: +def _diff_wires(schema_a: Schematic, schema_b: Schematic) -> tuple[int, int]: """Compare wires between two schematics using set operations. Returns: diff --git a/src/mcp_ltspice/drc.py b/src/mcp_ltspice/drc.py index e0959f0..b7de026 100644 --- a/src/mcp_ltspice/drc.py +++ b/src/mcp_ltspice/drc.py @@ -63,10 +63,7 @@ class DRCResult: parts.append(f"{info_count} info") status = "FAILED" if err_count else "passed with warnings" - return ( - f"DRC {status}: {self.checks_run} checks run, " - f"{', '.join(parts)}." - ) + return f"DRC {status}: {self.checks_run} checks run, {', '.join(parts)}." def to_dict(self) -> dict: """Convert to JSON-serializable dict.""" @@ -122,8 +119,7 @@ def _check_ground(sch: Schematic, result: DRCResult) -> None: rule="NO_GROUND", severity=Severity.ERROR, message=( - "No ground node found. Every circuit needs at least " - "one ground (0) connection." + "No ground node found. Every circuit needs at least one ground (0) connection." ), ) ) @@ -173,18 +169,13 @@ def _check_simulation_directive(sch: Schematic, result: DRCResult) -> None: result.checks_run += 1 directives = sch.get_spice_directives() sim_types = [".tran", ".ac", ".dc", ".op", ".noise", ".tf"] - has_sim = any( - any(d.lower().startswith(s) for s in sim_types) for d in directives - ) + has_sim = any(any(d.lower().startswith(s) for s in sim_types) for d in directives) if not has_sim: result.violations.append( DRCViolation( rule="NO_SIM_DIRECTIVE", severity=Severity.ERROR, - message=( - "No simulation directive found. " - "Add .tran, .ac, .dc, .op, etc." - ), + message=("No simulation directive found. Add .tran, .ac, .dc, .op, etc."), ) ) @@ -233,11 +224,7 @@ def _check_voltage_source_loops(sch: Schematic, result: DRCResult) -> None: # offset along the component axis. Standard pin spacing is 64 units # vertically for a non-rotated voltage source (pin+ at top, pin- at # bottom relative to the symbol origin). - voltage_sources = [ - c - for c in sch.components - if "voltage" in c.symbol.lower() - ] + voltage_sources = [c for c in sch.components if "voltage" in c.symbol.lower()] if len(voltage_sources) < 2: return @@ -246,21 +233,21 @@ def _check_voltage_source_loops(sch: Schematic, result: DRCResult) -> None: # Default (R0): positive pin at (x, y-16), negative at (x, y+16) # We use a coarse offset; the exact value depends on the symbol but # 16 is a common half-pin-spacing in LTspice grid units. - PIN_OFFSET = 16 + pin_offset = 16 def _pin_positions(comp): """Return approximate (positive_pin, negative_pin) coordinates.""" x, y = comp.x, comp.y rot = comp.rotation if rot == 0: - return (x, y - PIN_OFFSET), (x, y + PIN_OFFSET) + return (x, y - pin_offset), (x, y + pin_offset) elif rot == 90: - return (x + PIN_OFFSET, y), (x - PIN_OFFSET, y) + return (x + pin_offset, y), (x - pin_offset, y) elif rot == 180: - return (x, y + PIN_OFFSET), (x, y - PIN_OFFSET) + return (x, y + pin_offset), (x, y - pin_offset) elif rot == 270: - return (x - PIN_OFFSET, y), (x + PIN_OFFSET, y) - return (x, y - PIN_OFFSET), (x, y + PIN_OFFSET) + return (x - pin_offset, y), (x + pin_offset, y) + return (x, y - pin_offset), (x, y + pin_offset) def _nearest_net(pin: tuple[int, int]) -> tuple[int, int]: """Find the nearest wire/flag point to a pin and return its net root. @@ -350,9 +337,7 @@ def _check_component_values(sch: Schematic, result: DRCResult) -> None: DRCViolation( rule="MISSING_VALUE", severity=Severity.WARNING, - message=( - f"{matched_type} '{comp.name}' has no value set." - ), + message=(f"{matched_type} '{comp.name}' has no value set."), component=comp.name, location=(comp.x, comp.y), ) @@ -393,7 +378,7 @@ def _check_unconnected_components(sch: Schematic, result: DRCResult) -> None: """ result.checks_run += 1 - PROXIMITY = 16 # LTspice grid spacing + proximity = 16 # LTspice grid spacing # Collect all wire endpoints into a set for fast lookup wire_points: set[tuple[int, int]] = set() @@ -412,14 +397,14 @@ def _check_unconnected_components(sch: Schematic, result: DRCResult) -> None: if not comp.name: continue - # Check if any connection point is within PROXIMITY of the + # Check if any connection point is within proximity of the # component origin. We scan a small bounding box rather than # iterating all points. connected = False for pt in all_connection_points: dx = abs(pt[0] - comp.x) dy = abs(pt[1] - comp.y) - if dx <= PROXIMITY and dy <= PROXIMITY: + if dx <= proximity and dy <= proximity: connected = True break diff --git a/src/mcp_ltspice/log_parser.py b/src/mcp_ltspice/log_parser.py index 8a34cef..e37d751 100644 --- a/src/mcp_ltspice/log_parser.py +++ b/src/mcp_ltspice/log_parser.py @@ -1,8 +1,8 @@ """Parse LTspice simulation log files.""" +import re from dataclasses import dataclass, field from pathlib import Path -import re @dataclass @@ -124,9 +124,7 @@ def parse_log(path: Path | str) -> SimulationLog: # Measurement: failed. m = _MEAS_FAILED_RE.match(stripped) if m: - log.measurements.append( - Measurement(name=m.group("name"), value=None, failed=True) - ) + log.measurements.append(Measurement(name=m.group("name"), value=None, failed=True)) continue # Measurement: numeric value. diff --git a/src/mcp_ltspice/models.py b/src/mcp_ltspice/models.py index 122e826..dfe8774 100644 --- a/src/mcp_ltspice/models.py +++ b/src/mcp_ltspice/models.py @@ -1,18 +1,24 @@ """Search and parse LTspice SPICE model libraries.""" +import re from dataclasses import dataclass, field from pathlib import Path -import re from .config import LTSPICE_LIB # Known SPICE model types and their categories -_DISCRETE_TYPES = frozenset({ - "NPN", "PNP", # BJTs - "NMOS", "PMOS", "VDMOS", # MOSFETs - "D", # Diodes - "NJF", "PJF", # JFETs -}) +_DISCRETE_TYPES = frozenset( + { + "NPN", + "PNP", # BJTs + "NMOS", + "PMOS", + "VDMOS", # MOSFETs + "D", # Diodes + "NJF", + "PJF", # JFETs + } +) # Module-level cache _cache: tuple[list["SpiceModel"], list["SpiceSubcircuit"]] | None = None @@ -21,8 +27,9 @@ _cache: tuple[list["SpiceModel"], list["SpiceSubcircuit"]] | None = None @dataclass class SpiceModel: """A .model definition.""" - name: str # e.g., "2N2222" - type: str # e.g., "NPN", "D", "NMOS", "PMOS", "PNP" + + name: str # e.g., "2N2222" + type: str # e.g., "NPN", "D", "NMOS", "PMOS", "PNP" parameters: dict[str, str] = field(default_factory=dict) source_file: str = "" @@ -30,7 +37,8 @@ class SpiceModel: @dataclass class SpiceSubcircuit: """A .subckt definition.""" - name: str # e.g., "LT1001" + + name: str # e.g., "LT1001" pins: list[str] = field(default_factory=list) pin_names: list[str] = field(default_factory=list) # From comments description: str = "" @@ -145,19 +153,19 @@ def _read_file_text(path: Path) -> str: # Regex patterns compiled once _MODEL_RE = re.compile( - r"^\s*\.model\s+" # .model keyword - r"(\S+)\s+" # model name - r"(?:ako:\S+\s+)?" # optional ako:reference - r"(\w+)" # type (NPN, D, VDMOS, etc.) - r"(?:\s*\(([^)]*)\))?" # optional (params) - r"(.*)", # trailing params outside parens + r"^\s*\.model\s+" # .model keyword + r"(\S+)\s+" # model name + r"(?:ako:\S+\s+)?" # optional ako:reference + r"(\w+)" # type (NPN, D, VDMOS, etc.) + r"(?:\s*\(([^)]*)\))?" # optional (params) + r"(.*)", # trailing params outside parens re.IGNORECASE, ) _SUBCKT_RE = re.compile( r"^\s*\.subckt\s+" - r"(\S+)" # subcircuit name - r"((?:\s+\S+)*)", # pins (space-separated) + r"(\S+)" # subcircuit name + r"((?:\s+\S+)*)", # pins (space-separated) re.IGNORECASE, ) @@ -247,12 +255,14 @@ def _scan_lib_file(path: Path) -> tuple[list[SpiceModel], list[SpiceSubcircuit]] param_str = (model_match.group(3) or "") + " " + (model_match.group(4) or "") params = _parse_params(param_str) - models.append(SpiceModel( - name=name, - type=mtype, - parameters=params, - source_file=source, - )) + models.append( + SpiceModel( + name=name, + type=mtype, + parameters=params, + source_file=source, + ) + ) continue # Check for .subckt diff --git a/src/mcp_ltspice/netlist.py b/src/mcp_ltspice/netlist.py index a1e91ab..f0051db 100644 --- a/src/mcp_ltspice/netlist.py +++ b/src/mcp_ltspice/netlist.py @@ -35,22 +35,14 @@ class Netlist: # -- Passive components --------------------------------------------------- - def add_resistor( - self, name: str, node_p: str, node_n: str, value: str - ) -> "Netlist": + def add_resistor(self, name: str, node_p: str, node_n: str, value: str) -> "Netlist": """Add a resistor. Example: add_resistor('R1', 'in', 'out', '10k')""" - self.components.append( - NetlistComponent(name=name, nodes=[node_p, node_n], value=value) - ) + self.components.append(NetlistComponent(name=name, nodes=[node_p, node_n], value=value)) return self - def add_capacitor( - self, name: str, node_p: str, node_n: str, value: str - ) -> "Netlist": + def add_capacitor(self, name: str, node_p: str, node_n: str, value: str) -> "Netlist": """Add a capacitor.""" - self.components.append( - NetlistComponent(name=name, nodes=[node_p, node_n], value=value) - ) + self.components.append(NetlistComponent(name=name, nodes=[node_p, node_n], value=value)) return self def add_inductor( @@ -64,9 +56,7 @@ class Netlist: """Add an inductor with optional series resistance (Rser).""" params = f"Rser={series_resistance}" if series_resistance else "" self.components.append( - NetlistComponent( - name=name, nodes=[node_p, node_n], value=value, params=params - ) + NetlistComponent(name=name, nodes=[node_p, node_n], value=value, params=params) ) return self @@ -94,9 +84,7 @@ class Netlist: sin: (Voffset, Vamp, Freq, Td, Theta, Phi) """ value = self._build_source_value(dc=dc, ac=ac, pulse=pulse, sin=sin) - self.components.append( - NetlistComponent(name=name, nodes=[node_p, node_n], value=value) - ) + self.components.append(NetlistComponent(name=name, nodes=[node_p, node_n], value=value)) return self def add_current_source( @@ -109,20 +97,14 @@ class Netlist: ) -> "Netlist": """Add a current source.""" value = self._build_source_value(dc=dc, ac=ac) - self.components.append( - NetlistComponent(name=name, nodes=[node_p, node_n], value=value) - ) + self.components.append(NetlistComponent(name=name, nodes=[node_p, node_n], value=value)) return self # -- Semiconductors ------------------------------------------------------- - def add_diode( - self, name: str, anode: str, cathode: str, model: str - ) -> "Netlist": + def add_diode(self, name: str, anode: str, cathode: str, model: str) -> "Netlist": """Add a diode. Example: add_diode('D1', 'a', 'k', '1N4148')""" - self.components.append( - NetlistComponent(name=name, nodes=[anode, cathode], value=model) - ) + self.components.append(NetlistComponent(name=name, nodes=[anode, cathode], value=model)) return self def add_mosfet( @@ -134,14 +116,14 @@ class Netlist: body: str, model: str, w: str | None = None, - l: str | None = None, + length: str | None = None, ) -> "Netlist": """Add a MOSFET.""" params_parts: list[str] = [] if w: params_parts.append(f"W={w}") - if l: - params_parts.append(f"L={l}") + if length: + params_parts.append(f"L={length}") params = " ".join(params_parts) self.components.append( NetlistComponent( @@ -153,14 +135,10 @@ class Netlist: ) return self - def add_bjt( - self, name: str, collector: str, base: str, emitter: str, model: str - ) -> "Netlist": + def add_bjt(self, name: str, collector: str, base: str, emitter: str, model: str) -> "Netlist": """Add a BJT transistor.""" self.components.append( - NetlistComponent( - name=name, nodes=[collector, base, emitter], value=model - ) + NetlistComponent(name=name, nodes=[collector, base, emitter], value=model) ) return self @@ -182,31 +160,21 @@ class Netlist: X """ self.components.append( - NetlistComponent( - name=name, nodes=[inp, inn, vpos, vneg, out], value=model - ) + NetlistComponent(name=name, nodes=[inp, inn, vpos, vneg, out], value=model) ) return self - def add_subcircuit( - self, name: str, nodes: list[str], model: str - ) -> "Netlist": + def add_subcircuit(self, name: str, nodes: list[str], model: str) -> "Netlist": """Add a generic subcircuit instance.""" - self.components.append( - NetlistComponent(name=name, nodes=list(nodes), value=model) - ) + self.components.append(NetlistComponent(name=name, nodes=list(nodes), value=model)) return self # -- Generic component ---------------------------------------------------- - def add_component( - self, name: str, nodes: list[str], value: str, params: str = "" - ) -> "Netlist": + def add_component(self, name: str, nodes: list[str], value: str, params: str = "") -> "Netlist": """Add any component with explicit nodes.""" self.components.append( - NetlistComponent( - name=name, nodes=list(nodes), value=value, params=params - ) + NetlistComponent(name=name, nodes=list(nodes), value=value, params=params) ) return self @@ -381,3 +349,326 @@ def inverting_amplifier( .add_opamp("X1", "0", "inv", "out", "vdd", "vss", opamp_model) .add_directive(".ac dec 100 1 1meg") ) + + +def non_inverting_amplifier( + r_in: str = "10k", + r_f: str = "100k", + opamp_model: str = "LT1001", +) -> Netlist: + """Create a non-inverting op-amp amplifier. + + Topology: + V1 --> non-inv(+) + inv(-) --[R_in]--> GND + inv(-) --[R_f]--> out + Supply: +/-15V + Gain = 1 + R_f / R_in + """ + return ( + Netlist("Non-Inverting Amplifier") + .add_comment(f"Gain = 1 + {r_f}/{r_in}") + .add_lib(opamp_model) + .add_voltage_source("V1", "in", "0", ac="1") + .add_voltage_source("Vpos", "vdd", "0", dc="15") + .add_voltage_source("Vneg", "0", "vss", dc="15") + .add_resistor("Rin", "inv", "0", r_in) + .add_resistor("Rf", "inv", "out", r_f) + .add_opamp("X1", "in", "inv", "out", "vdd", "vss", opamp_model) + .add_directive(".ac dec 100 1 1meg") + ) + + +def differential_amplifier( + r1: str = "10k", + r2: str = "10k", + r3: str = "10k", + r4: str = "10k", + opamp_model: str = "LT1001", +) -> Netlist: + """Create a classic differential amplifier. + + Topology: + V1 --[R1]--> inv(-) --[R2]--> out + V2 --[R3]--> non-inv(+) + non-inv(+) --[R4]--> GND + Supply: +/-15V + Vout = (R2/R1) * (V2 - V1) when R2/R1 = R4/R3 + """ + return ( + Netlist("Differential Amplifier") + .add_comment(f"Vout = ({r2}/{r1}) * (V2 - V1) when R2/R1 = R4/R3") + .add_lib(opamp_model) + .add_voltage_source("V1", "in1", "0", ac="1") + .add_voltage_source("V2", "in2", "0", ac="1") + .add_voltage_source("Vpos", "vdd", "0", dc="15") + .add_voltage_source("Vneg", "0", "vss", dc="15") + .add_resistor("R1", "in1", "inv", r1) + .add_resistor("R2", "inv", "out", r2) + .add_resistor("R3", "in2", "noninv", r3) + .add_resistor("R4", "noninv", "0", r4) + .add_opamp("X1", "noninv", "inv", "out", "vdd", "vss", opamp_model) + .add_directive(".ac dec 100 1 1meg") + ) + + +def common_emitter_amplifier( + rc: str = "2.2k", + rb1: str = "56k", + rb2: str = "12k", + re: str = "1k", + cc1: str = "10u", + cc2: str = "10u", + ce: str = "47u", + vcc: str = "12", + bjt_model: str = "2N2222", +) -> Netlist: + """Create a common-emitter amplifier with voltage divider bias. + + Topology: + VCC --[RC]--> collector --> [CC2] --> out + VCC --[RB1]--> base + base --[RB2]--> GND + emitter --[RE]--> GND + emitter --[CE]--> GND (bypass cap) + in --[CC1]--> base (input coupling cap) + """ + return ( + Netlist("Common Emitter Amplifier") + .add_comment("Voltage divider bias with emitter bypass cap") + .add_lib(bjt_model) + .add_voltage_source("Vcc", "vcc", "0", dc=vcc) + .add_voltage_source("Vin", "in", "0", ac="1", sin=("0", "10m", "1k")) + .add_resistor("RC", "vcc", "collector", rc) + .add_resistor("RB1", "vcc", "base", rb1) + .add_resistor("RB2", "base", "0", rb2) + .add_resistor("RE", "emitter", "0", re) + .add_capacitor("CC1", "in", "base", cc1) + .add_capacitor("CC2", "collector", "out", cc2) + .add_capacitor("CE", "emitter", "0", ce) + .add_bjt("Q1", "collector", "base", "emitter", bjt_model) + .add_directive(".tran 5m") + .add_directive(".ac dec 100 10 10meg") + ) + + +def buck_converter( + ind: str = "10u", + c_out: str = "100u", + r_load: str = "10", + v_in: str = "12", + duty_cycle: float = 0.5, + freq: str = "100k", + mosfet_model: str = "IRF540N", + diode_model: str = "1N5819", +) -> Netlist: + """Create a buck (step-down) converter. + + Topology: + V_in --> MOSFET (high-side switch) --> sw node + sw --> [L] --> out + sw --> Diode (cathode) --> GND (freewheeling) + out --> [C_out] --> GND + out --> [R_load] --> GND + Gate driven by PULSE source at specified frequency and duty cycle. + """ + # Compute timing from duty cycle and frequency + # freq string may use SPICE suffixes; keep it as-is for the netlist. + # For the PULSE source we need numeric period and on-time. + freq_hz = _parse_spice_value(freq) + period = 1.0 / freq_hz + t_on = period * duty_cycle + t_rise = period * 0.01 # 1% of period + t_fall = t_rise + + return ( + Netlist("Buck Converter") + .add_comment(f"Duty cycle = {duty_cycle:.0%}, Fsw = {freq}") + .add_lib(mosfet_model) + .add_lib(diode_model) + .add_voltage_source("Vin", "vin", "0", dc=v_in) + .add_voltage_source( + "Vgate", + "gate", + "0", + pulse=( + "0", + v_in, + "0", + f"{t_rise:.4g}", + f"{t_fall:.4g}", + f"{t_on:.4g}", + f"{period:.4g}", + ), + ) + .add_mosfet("M1", "vin", "gate", "sw", "sw", mosfet_model) + .add_diode("D1", "0", "sw", diode_model) + .add_inductor("L1", "sw", "out", ind) + .add_capacitor("Cout", "out", "0", c_out) + .add_resistor("Rload", "out", "0", r_load) + .add_directive(f".tran {period * 200:.4g}") + ) + + +def ldo_regulator( + opamp_model: str = "LT1001", + r1: str = "10k", + r2: str = "10k", + pass_transistor: str = "IRF9540N", + v_in: str = "8", + v_ref: str = "2.5", +) -> Netlist: + """Create a simple LDO voltage regulator. + + Topology: + V_in --> PMOS pass transistor (source) --> out (drain) + Error amp: non-inv(+) = V_ref, inv(-) = feedback + Feedback divider: out --[R1]--> fb --[R2]--> GND + Error amp output drives PMOS gate + Vout = V_ref * (1 + R1/R2) + """ + return ( + Netlist("LDO Regulator") + .add_comment(f"Vout = {v_ref} * (1 + {r1}/{r2})") + .add_lib(opamp_model) + .add_lib(pass_transistor) + .add_voltage_source("Vin", "vin", "0", dc=v_in) + .add_voltage_source("Vref", "vref", "0", dc=v_ref) + .add_voltage_source("Vpos", "vdd", "0", dc=v_in) + .add_voltage_source("Vneg", "0", "vss", dc=v_in) + .add_mosfet("M1", "out", "gate", "vin", "vin", pass_transistor) + .add_opamp("X1", "vref", "fb", "gate", "vdd", "vss", opamp_model) + .add_resistor("R1", "out", "fb", r1) + .add_resistor("R2", "fb", "0", r2) + .add_resistor("Rload", "out", "0", "100") + .add_capacitor("Cout", "out", "0", "10u") + .add_directive(".tran 10m") + ) + + +def colpitts_oscillator( + ind: str = "1u", + c1: str = "100p", + c2: str = "100p", + rb: str = "47k", + rc: str = "1k", + re: str = "470", + vcc: str = "12", + bjt_model: str = "2N2222", +) -> Netlist: + """Create a Colpitts oscillator. + + Topology: + VCC --[RC]--> collector + collector --[L]--> tank + tank --[C1]--> base + tank --[C2]--> GND + base --[RB]--> VCC (bias) + emitter --[RE]--> GND + Output taken at the collector. + + The oscillation frequency is approximately: + f = 1 / (2*pi*sqrt(L * C1*C2/(C1+C2))) + """ + return ( + Netlist("Colpitts Oscillator") + .add_comment("f ~ 1 / (2*pi*sqrt(L * Cseries))") + .add_lib(bjt_model) + .add_voltage_source("Vcc", "vcc", "0", dc=vcc) + .add_resistor("RC", "vcc", "collector", rc) + .add_resistor("RB", "vcc", "base", rb) + .add_resistor("RE", "emitter", "0", re) + .add_inductor("L1", "collector", "tank", ind) + .add_capacitor("C1", "tank", "base", c1) + .add_capacitor("C2", "tank", "0", c2) + .add_bjt("Q1", "collector", "base", "emitter", bjt_model) + .add_directive(".tran 100u") + .add_directive(".ic V(collector)=6") + ) + + +def h_bridge( + v_supply: str = "12", + r_load: str = "10", + mosfet_model: str = "IRF540N", +) -> Netlist: + """Create an H-bridge motor driver. + + Topology: + V+ --[M1 (high-side A)]--> outA --[M3 (low-side A)]--> GND + V+ --[M2 (high-side B)]--> outB --[M4 (low-side B)]--> GND + R_load between outA and outB. + M1 & M4 driven together (forward), M2 & M3 driven together (reverse). + Gate signals are complementary PULSE sources with dead time. + """ + # Drive at 1 kHz with 50% duty, small dead time to prevent shoot-through + period = "1m" + t_on = "450u" + t_dead = "25u" + + return ( + Netlist("H-Bridge Motor Driver") + .add_comment("Complementary gate drives with dead time") + .add_lib(mosfet_model) + .add_voltage_source("Vsupply", "vcc", "0", dc=v_supply) + # Forward drive: M1 (high-A) and M4 (low-B) on first half + .add_voltage_source( + "Vg_fwd", + "gate_fwd", + "0", + pulse=("0", v_supply, t_dead, "10n", "10n", t_on, period), + ) + # Reverse drive: M2 (high-B) and M3 (low-A) on second half + .add_voltage_source( + "Vg_rev", + "gate_rev", + "0", + pulse=("0", v_supply, "525u", "10n", "10n", t_on, period), + ) + # High-side A + .add_mosfet("M1", "vcc", "gate_fwd", "outA", "outA", mosfet_model) + # High-side B + .add_mosfet("M2", "vcc", "gate_rev", "outB", "outB", mosfet_model) + # Low-side A + .add_mosfet("M3", "outA", "gate_rev", "0", "0", mosfet_model) + # Low-side B + .add_mosfet("M4", "outB", "gate_fwd", "0", "0", mosfet_model) + .add_resistor("Rload", "outA", "outB", r_load) + .add_directive(".tran 5m") + ) + + +def _parse_spice_value(value: str) -> float: + """Convert a SPICE-style value string to a float. + + Handles common suffixes: T, G, meg, k, m, u, n, p, f. + """ + suffixes = { + "T": 1e12, + "G": 1e9, + "MEG": 1e6, + "K": 1e3, + "M": 1e-3, + "U": 1e-6, + "N": 1e-9, + "P": 1e-12, + "F": 1e-15, + } + value = value.strip() + # Try plain float first + try: + return float(value) + except ValueError: + pass + + # Try suffixes (longest match first to catch "meg" before "m") + upper = value.upper() + for suffix, mult in sorted(suffixes.items(), key=lambda x: -len(x[0])): + if upper.endswith(suffix): + num_part = value[: len(value) - len(suffix)] + try: + return float(num_part) * mult + except ValueError: + continue + + raise ValueError(f"Cannot parse SPICE value: {value!r}") diff --git a/src/mcp_ltspice/optimizer.py b/src/mcp_ltspice/optimizer.py new file mode 100644 index 0000000..185deff --- /dev/null +++ b/src/mcp_ltspice/optimizer.py @@ -0,0 +1,809 @@ +"""Component value optimizer for iterative circuit tuning. + +Adjusts circuit parameters toward target specifications by running +real LTspice simulations in a loop. Supports single-component binary +search and multi-component coordinate descent. +""" + +import logging +import math +import tempfile +from dataclasses import dataclass, field +from pathlib import Path + +import numpy as np + +from .raw_parser import RawFile +from .runner import run_netlist +from .waveform_math import ( + compute_bandwidth, + compute_peak_to_peak, + compute_rms, + compute_settling_time, +) + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Standard component value series +# --------------------------------------------------------------------------- + +# E12: 12 values per decade (10% tolerance) +_E12 = [1.0, 1.2, 1.5, 1.8, 2.2, 2.7, 3.3, 3.9, 4.7, 5.6, 6.8, 8.2] + +# E24: 24 values per decade (5% tolerance) +_E24 = [ + 1.0, + 1.1, + 1.2, + 1.3, + 1.5, + 1.6, + 1.8, + 2.0, + 2.2, + 2.4, + 2.7, + 3.0, + 3.3, + 3.6, + 3.9, + 4.3, + 4.7, + 5.1, + 5.6, + 6.2, + 6.8, + 7.5, + 8.2, + 9.1, +] + +# E96: 96 values per decade (1% tolerance) +_E96 = [ + 1.00, + 1.02, + 1.05, + 1.07, + 1.10, + 1.13, + 1.15, + 1.18, + 1.21, + 1.24, + 1.27, + 1.30, + 1.33, + 1.37, + 1.40, + 1.43, + 1.47, + 1.50, + 1.54, + 1.58, + 1.62, + 1.65, + 1.69, + 1.74, + 1.78, + 1.82, + 1.87, + 1.91, + 1.96, + 2.00, + 2.05, + 2.10, + 2.15, + 2.21, + 2.26, + 2.32, + 2.37, + 2.43, + 2.49, + 2.55, + 2.61, + 2.67, + 2.74, + 2.80, + 2.87, + 2.94, + 3.01, + 3.09, + 3.16, + 3.24, + 3.32, + 3.40, + 3.48, + 3.57, + 3.65, + 3.74, + 3.83, + 3.92, + 4.02, + 4.12, + 4.22, + 4.32, + 4.42, + 4.53, + 4.64, + 4.75, + 4.87, + 4.99, + 5.11, + 5.23, + 5.36, + 5.49, + 5.62, + 5.76, + 5.90, + 6.04, + 6.19, + 6.34, + 6.49, + 6.65, + 6.81, + 6.98, + 7.15, + 7.32, + 7.50, + 7.68, + 7.87, + 8.06, + 8.25, + 8.45, + 8.66, + 8.87, + 9.09, + 9.31, + 9.53, + 9.76, +] + +_SERIES = { + "E12": _E12, + "E24": _E24, + "E96": _E96, +} + +# Engineering prefixes: exponent -> suffix +_ENG_PREFIXES = { + -15: "f", + -12: "p", + -9: "n", + -6: "u", + -3: "m", + 0: "", + 3: "k", + 6: "M", + 9: "G", + 12: "T", +} + + +# --------------------------------------------------------------------------- +# Data classes +# --------------------------------------------------------------------------- + + +@dataclass +class OptimizationTarget: + """A single performance target to optimize toward.""" + + signal_name: str # e.g. "V(out)" + metric: str # "bandwidth_hz", "rms", "peak_to_peak", "settling_time", + # "gain_db", "phase_margin_deg" + target_value: float + weight: float = 1.0 + + +@dataclass +class ComponentRange: + """Allowed range for a component value during optimization.""" + + component_name: str # e.g. "R1" + min_value: float + max_value: float + preferred_series: str | None = None # "E12", "E24", "E96" + + +@dataclass +class OptimizationResult: + """Outcome of an optimization run.""" + + best_values: dict[str, float] + best_cost: float + iterations: int + history: list[dict] = field(default_factory=list) + targets_met: dict[str, bool] = field(default_factory=dict) + final_metrics: dict[str, float] = field(default_factory=dict) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def snap_to_preferred(value: float, series: str = "E12") -> float: + """Snap a value to the nearest standard component value. + + Works across decades -- e.g. 4800 snaps to 4.7k (E12). + + Args: + value: Raw component value in base units (ohms, farads, etc.) + series: Preferred series name ("E12", "E24", "E96") + + Returns: + Nearest standard value from the requested series. + """ + if value <= 0: + return _SERIES.get(series, _E12)[0] + + base_values = _SERIES.get(series, _E12) + + decade = math.floor(math.log10(value)) + mantissa = value / (10**decade) + + best = base_values[0] + best_ratio = abs(math.log10(mantissa / best)) + + for bv in base_values[1:]: + ratio = abs(math.log10(mantissa / bv)) + if ratio < best_ratio: + best = bv + best_ratio = ratio + + # Also check the decade above (mantissa might round up past 9.x) + for bv in base_values[:2]: + candidate = bv * 10 + ratio = abs(math.log10(mantissa / candidate)) + if ratio < best_ratio: + best = candidate + best_ratio = ratio + + return best * (10**decade) + + +def format_engineering(value: float) -> str: + """Format a float in engineering notation with SI suffix. + + Examples: + 10000 -> "10k" + 0.0001 -> "100u" + 4700 -> "4.7k" + 0.0000033 -> "3.3u" + 1.5 -> "1.5" + """ + if value == 0: + return "0" + + sign = "-" if value < 0 else "" + value = abs(value) + + exp = math.floor(math.log10(value)) + # Round to nearest multiple of 3 (downward) + eng_exp = (exp // 3) * 3 + + # Clamp to known prefixes + eng_exp = max(-15, min(12, eng_exp)) + + mantissa = value / (10**eng_exp) + + suffix = _ENG_PREFIXES.get(eng_exp, f"e{eng_exp}") + + # Format mantissa: strip trailing zeros but keep at least one digit + if mantissa == int(mantissa) and mantissa < 1000: + formatted = f"{sign}{int(mantissa)}{suffix}" + else: + formatted = f"{sign}{mantissa:.3g}{suffix}" + + return formatted + + +# --------------------------------------------------------------------------- +# Metric extraction +# --------------------------------------------------------------------------- + + +def _extract_metric( + raw_data: RawFile, + target: OptimizationTarget, +) -> float | None: + """Compute a single metric from simulation results. + + Returns the measured value, or None if the signal/data is unavailable. + """ + signal = raw_data.get_variable(target.signal_name) + if signal is None: + return None + + metric = target.metric + + if metric == "rms": + return compute_rms(signal) + + if metric == "peak_to_peak": + result = compute_peak_to_peak(signal) + return result["peak_to_peak"] + + if metric == "settling_time": + time = raw_data.get_time() + if time is None: + return None + if np.iscomplexobj(time): + time = time.real + if np.iscomplexobj(signal): + signal = np.abs(signal) + result = compute_settling_time(time, signal) + return result["settling_time"] + + if metric == "bandwidth_hz": + freq = raw_data.get_frequency() + if freq is None: + return None + # Convert complex magnitude to dB + mag_db = np.where( + np.abs(signal) > 0, + 20.0 * np.log10(np.abs(signal)), + -200.0, + ) + result = compute_bandwidth(np.real(freq), np.real(mag_db)) + return result["bandwidth_hz"] + + if metric == "gain_db": + # Peak gain in dB (for AC analysis signals) + mag = np.abs(signal) + peak = float(np.max(mag)) + if peak > 0: + return 20.0 * math.log10(peak) + return -200.0 + + if metric == "phase_margin_deg": + freq = raw_data.get_frequency() + if freq is None: + return None + # Phase margin: phase at the frequency where |gain| crosses 0 dB + mag = np.abs(signal) + mag_db = np.where(mag > 0, 20.0 * np.log10(mag), -200.0) + phase_deg = np.degrees(np.angle(signal)) + + # Find 0 dB crossing (gain crossover) + for i in range(len(mag_db) - 1): + if mag_db[i] >= 0 and mag_db[i + 1] < 0: + # Interpolate phase at 0 dB crossing + dm = mag_db[i + 1] - mag_db[i] + if abs(dm) < 1e-30: + phase_at_xover = float(phase_deg[i]) + else: + frac = (0.0 - mag_db[i]) / dm + phase_at_xover = float(phase_deg[i] + frac * (phase_deg[i + 1] - phase_deg[i])) + # Phase margin = 180 + phase (since we want distance from -180) + return 180.0 + phase_at_xover + + # No 0 dB crossing found -- gain never reaches unity + return None + + return None + + +def _compute_cost( + raw_data: RawFile, + targets: list[OptimizationTarget], +) -> tuple[float, dict[str, float]]: + """Evaluate weighted cost across all targets. + + Cost for each target: weight * |measured - target| / |target| + Uses |target| normalization so different units are comparable. + + Returns: + (total_cost, metrics_dict) where metrics_dict maps + "signal_name.metric" -> measured_value + """ + total_cost = 0.0 + metrics: dict[str, float] = {} + + for t in targets: + measured = _extract_metric(raw_data, t) + key = f"{t.signal_name}.{t.metric}" + + if measured is None: + # Signal not found -- heavy penalty + total_cost += t.weight * 1e6 + metrics[key] = float("nan") + continue + + metrics[key] = measured + + if abs(t.target_value) > 1e-30: + cost = t.weight * abs(measured - t.target_value) / abs(t.target_value) + else: + # Target is ~0: use absolute error + cost = t.weight * abs(measured) + + total_cost += cost + + return total_cost, metrics + + +# --------------------------------------------------------------------------- +# Core optimizer +# --------------------------------------------------------------------------- + + +async def _run_with_values( + netlist_template: str, + values: dict[str, float], + work_dir: Path, +) -> RawFile | None: + """Substitute values into template, write .cir, simulate, return parsed data.""" + text = netlist_template + for name, val in values.items(): + text = text.replace(f"{{{name}}}", format_engineering(val)) + + cir_path = work_dir / "opt_iter.cir" + cir_path.write_text(text) + + result = await run_netlist(cir_path, work_dir=work_dir) + + if not result.success or result.raw_data is None: + logger.warning("Simulation failed: %s", result.error) + return None + + return result.raw_data + + +async def _binary_search_single( + netlist_template: str, + target: OptimizationTarget, + comp: ComponentRange, + max_iterations: int, + work_dir: Path, +) -> OptimizationResult: + """Optimize a single component via binary search. + + Assumes the metric is monotonic (or at least locally monotonic) + with respect to the component value. Evaluates the midpoint, + then narrows the half that moves the metric toward the target. + """ + lo = comp.min_value + hi = comp.max_value + history: list[dict] = [] + best_values = {comp.component_name: (lo + hi) / 2} + best_cost = float("inf") + best_metrics: dict[str, float] = {} + + for iteration in range(max_iterations): + mid = (lo + hi) / 2 + values = {comp.component_name: mid} + + raw_data = await _run_with_values(netlist_template, values, work_dir) + if raw_data is None: + history.append( + { + "iteration": iteration, + "values": {comp.component_name: mid}, + "cost": float("inf"), + "error": "simulation_failed", + } + ) + # Narrow toward the other half on failure + hi = mid + continue + + cost, metrics = _compute_cost(raw_data, [target]) + measured = metrics.get(f"{target.signal_name}.{target.metric}") + + history.append( + { + "iteration": iteration, + "values": {comp.component_name: mid}, + "cost": cost, + "metrics": metrics, + } + ) + + if cost < best_cost: + best_cost = cost + best_values = {comp.component_name: mid} + best_metrics = metrics + + # Decide which half to keep + if measured is not None and not math.isnan(measured): + if measured < target.target_value: + # Need larger metric -- which direction depends on monotonicity. + # Test: does increasing component value increase or decrease metric? + # We probe a point slightly above mid to determine direction. + probe = mid * 1.1 + if probe > hi: + probe = mid * 0.9 + probe_is_lower = True + else: + probe_is_lower = False + + probe_data = await _run_with_values( + netlist_template, {comp.component_name: probe}, work_dir + ) + if probe_data is not None: + probe_measured = _extract_metric(probe_data, target) + if probe_measured is not None: + if probe_is_lower: + # We probed lower. If metric went up, metric increases + # with decreasing value => go lower. + if probe_measured > measured: + hi = mid + else: + lo = mid + else: + # We probed higher. If metric went up, go higher. + if probe_measured > measured: + lo = mid + else: + hi = mid + else: + hi = mid + else: + hi = mid + elif measured > target.target_value: + # Need smaller metric -- mirror logic + probe = mid * 1.1 + if probe > hi: + probe = mid * 0.9 + probe_is_lower = True + else: + probe_is_lower = False + + probe_data = await _run_with_values( + netlist_template, {comp.component_name: probe}, work_dir + ) + if probe_data is not None: + probe_measured = _extract_metric(probe_data, target) + if probe_measured is not None: + if probe_is_lower: + if probe_measured < measured: + hi = mid + else: + lo = mid + else: + if probe_measured < measured: + lo = mid + else: + hi = mid + else: + lo = mid + else: + lo = mid + else: + # Exact match + break + + # Converged if search range is tight enough + if hi - lo < lo * 0.001: + break + + # Snap to preferred series if requested + final_val = best_values[comp.component_name] + if comp.preferred_series: + final_val = snap_to_preferred(final_val, comp.preferred_series) + best_values[comp.component_name] = final_val + + target_key = f"{target.signal_name}.{target.metric}" + met = best_metrics.get(target_key) + tolerance = abs(target.target_value * 0.05) if abs(target.target_value) > 0 else 0.05 + targets_met = { + target_key: met is not None and abs(met - target.target_value) <= tolerance, + } + + return OptimizationResult( + best_values=best_values, + best_cost=best_cost, + iterations=len(history), + history=history, + targets_met=targets_met, + final_metrics=best_metrics, + ) + + +async def _coordinate_descent( + netlist_template: str, + targets: list[OptimizationTarget], + component_ranges: list[ComponentRange], + max_iterations: int, + work_dir: Path, +) -> OptimizationResult: + """Optimize multiple components via coordinate descent. + + Cycles through components, running a bounded golden-section search + on each one while holding the others fixed. Repeats until the + overall cost stops improving or we exhaust the iteration budget. + """ + # Start at geometric midpoint of each range + current_values: dict[str, float] = {} + for cr in component_ranges: + current_values[cr.component_name] = math.sqrt(cr.min_value * cr.max_value) + + history: list[dict] = [] + best_cost = float("inf") + best_values = dict(current_values) + best_metrics: dict[str, float] = {} + iteration = 0 + n_comps = len(component_ranges) + + # Golden ratio for search + phi = (1 + math.sqrt(5)) / 2 + resphi = 2 - phi # ~0.382 + + while iteration < max_iterations: + improved_this_cycle = False + + for cr in component_ranges: + if iteration >= max_iterations: + break + + lo = cr.min_value + hi = cr.max_value + + # Golden-section search for this component + # Allocate a few iterations per component per cycle + iters_per_comp = max(2, (max_iterations - iteration) // n_comps) + + a = lo + b = hi + x1 = a + resphi * (b - a) + x2 = b - resphi * (b - a) + + # Evaluate x1 + trial_1 = dict(current_values) + trial_1[cr.component_name] = x1 + raw_1 = await _run_with_values(netlist_template, trial_1, work_dir) + cost_1 = float("inf") + metrics_1: dict[str, float] = {} + if raw_1 is not None: + cost_1, metrics_1 = _compute_cost(raw_1, targets) + iteration += 1 + + # Evaluate x2 + trial_2 = dict(current_values) + trial_2[cr.component_name] = x2 + raw_2 = await _run_with_values(netlist_template, trial_2, work_dir) + cost_2 = float("inf") + metrics_2: dict[str, float] = {} + if raw_2 is not None: + cost_2, metrics_2 = _compute_cost(raw_2, targets) + iteration += 1 + + for _ in range(iters_per_comp - 2): + if iteration >= max_iterations: + break + + if cost_1 < cost_2: + b = x2 + x2 = x1 + cost_2 = cost_1 + metrics_2 = metrics_1 + x1 = a + resphi * (b - a) + trial = dict(current_values) + trial[cr.component_name] = x1 + raw = await _run_with_values(netlist_template, trial, work_dir) + if raw is not None: + cost_1, metrics_1 = _compute_cost(raw, targets) + else: + cost_1 = float("inf") + metrics_1 = {} + else: + a = x1 + x1 = x2 + cost_1 = cost_2 + metrics_1 = metrics_2 + x2 = b - resphi * (b - a) + trial = dict(current_values) + trial[cr.component_name] = x2 + raw = await _run_with_values(netlist_template, trial, work_dir) + if raw is not None: + cost_2, metrics_2 = _compute_cost(raw, targets) + else: + cost_2 = float("inf") + metrics_2 = {} + + iteration += 1 + + # Pick the better of the final two + if cost_1 <= cost_2: + chosen_val = x1 + chosen_cost = cost_1 + chosen_metrics = metrics_1 + else: + chosen_val = x2 + chosen_cost = cost_2 + chosen_metrics = metrics_2 + + # Snap to preferred series + if cr.preferred_series: + chosen_val = snap_to_preferred(chosen_val, cr.preferred_series) + + current_values[cr.component_name] = chosen_val + + history.append( + { + "iteration": iteration, + "optimized_component": cr.component_name, + "values": dict(current_values), + "cost": chosen_cost, + "metrics": chosen_metrics, + } + ) + + if chosen_cost < best_cost: + best_cost = chosen_cost + best_values = dict(current_values) + best_metrics = chosen_metrics + improved_this_cycle = True + + if not improved_this_cycle: + break + + # Determine which targets are met (within 5%) + targets_met: dict[str, bool] = {} + for t in targets: + key = f"{t.signal_name}.{t.metric}" + measured = best_metrics.get(key) + if measured is None or math.isnan(measured): + targets_met[key] = False + else: + tolerance = abs(t.target_value * 0.05) if abs(t.target_value) > 0 else 0.05 + targets_met[key] = abs(measured - t.target_value) <= tolerance + + return OptimizationResult( + best_values=best_values, + best_cost=best_cost, + iterations=iteration, + history=history, + targets_met=targets_met, + final_metrics=best_metrics, + ) + + +async def optimize_component_values( + netlist_template: str, + targets: list[OptimizationTarget], + component_ranges: list[ComponentRange], + max_iterations: int = 20, + method: str = "binary_search", +) -> OptimizationResult: + """Iteratively adjust component values toward target specifications. + + Creates temporary .cir files, runs real LTspice simulations via Wine, + and evaluates a cost function against the targets after each run. + + Args: + netlist_template: Netlist text with ``{ComponentName}`` placeholders + (e.g. ``{R1}``, ``{C1}``) that get substituted each iteration. + targets: Performance targets to optimize toward. + component_ranges: Allowed ranges for each tunable component. + max_iterations: Maximum simulation iterations (default 20). + method: ``"binary_search"`` for single-component optimization, + ``"coordinate_descent"`` for multi-component. When + ``"binary_search"`` is requested with multiple components, + coordinate descent is used automatically. + + Returns: + OptimizationResult with best values found, cost history, + and whether each target was met. + """ + work_dir = Path(tempfile.mkdtemp(prefix="ltspice_opt_")) + + if len(component_ranges) == 1 and len(targets) == 1 and method == "binary_search": + return await _binary_search_single( + netlist_template, + targets[0], + component_ranges[0], + max_iterations, + work_dir, + ) + + return await _coordinate_descent( + netlist_template, + targets, + component_ranges, + max_iterations, + work_dir, + ) diff --git a/src/mcp_ltspice/power_analysis.py b/src/mcp_ltspice/power_analysis.py new file mode 100644 index 0000000..ce9bb79 --- /dev/null +++ b/src/mcp_ltspice/power_analysis.py @@ -0,0 +1,196 @@ +"""Power and efficiency calculations from simulation data.""" + +import numpy as np + +# np.trapz was renamed to np.trapezoid in numpy 2.0 +_trapz = getattr(np, "trapezoid", getattr(np, "trapz", None)) + + +def compute_instantaneous_power(voltage: np.ndarray, current: np.ndarray) -> np.ndarray: + """Element-wise power: P(t) = V(t) * I(t). + + Args: + voltage: Voltage waveform array + current: Current waveform array + + Returns: + Instantaneous power array (same length as inputs) + """ + return np.real(voltage) * np.real(current) + + +def compute_average_power(time: np.ndarray, voltage: np.ndarray, current: np.ndarray) -> float: + """Time-averaged power: P_avg = (1/T) * integral(V*I dt). + + Uses trapezoidal integration over the full time span. + + Args: + time: Time array in seconds + voltage: Voltage waveform array + current: Current waveform array + + Returns: + Average power in watts + """ + t = np.real(time) + duration = t[-1] - t[0] + + if len(t) < 2 or duration <= 0: + return 0.0 + + p_inst = np.real(voltage) * np.real(current) + return float(_trapz(p_inst, t) / duration) + + +def compute_efficiency( + time: np.ndarray, + input_voltage: np.ndarray, + input_current: np.ndarray, + output_voltage: np.ndarray, + output_current: np.ndarray, +) -> dict: + """Compute power conversion efficiency. + + Args: + time: Time array in seconds + input_voltage: Input voltage waveform + input_current: Input current waveform + output_voltage: Output voltage waveform + output_current: Output current waveform + + Returns: + Dict with efficiency_percent, input_power_watts, + output_power_watts, power_dissipated_watts + """ + p_in = compute_average_power(time, input_voltage, input_current) + p_out = compute_average_power(time, output_voltage, output_current) + p_dissipated = p_in - p_out + + if abs(p_in) < 1e-15: + efficiency = 0.0 + else: + efficiency = (p_out / p_in) * 100.0 + + return { + "efficiency_percent": efficiency, + "input_power_watts": p_in, + "output_power_watts": p_out, + "power_dissipated_watts": p_dissipated, + } + + +def compute_power_spectrum( + time: np.ndarray, + voltage: np.ndarray, + current: np.ndarray, + max_harmonics: int = 20, +) -> dict: + """FFT of instantaneous power to identify ripple frequencies. + + Args: + time: Time array in seconds + voltage: Voltage waveform array + current: Current waveform array + max_harmonics: Maximum number of frequency bins to return + + Returns: + Dict with frequencies, power_magnitudes, dominant_freq, dc_power + """ + t = np.real(time) + + if len(t) < 2: + return { + "frequencies": [], + "power_magnitudes": [], + "dominant_freq": 0.0, + "dc_power": 0.0, + } + + dt = (t[-1] - t[0]) / (len(t) - 1) + if dt <= 0: + return { + "frequencies": [], + "power_magnitudes": [], + "dominant_freq": 0.0, + "dc_power": float(np.mean(np.real(voltage) * np.real(current))), + } + + p_inst = np.real(voltage) * np.real(current) + n = len(p_inst) + + spectrum = np.fft.rfft(p_inst) + freqs = np.fft.rfftfreq(n, d=dt) + magnitudes = np.abs(spectrum) * 2.0 / n + magnitudes[0] /= 2.0 # DC component correction + + dc_power = float(magnitudes[0]) + + # Dominant AC frequency (largest non-DC bin) + if len(magnitudes) > 1: + dominant_idx = int(np.argmax(magnitudes[1:])) + 1 + dominant_freq = float(freqs[dominant_idx]) + else: + dominant_freq = 0.0 + + # Trim to requested harmonics (plus DC) + limit = min(max_harmonics + 1, len(freqs)) + freqs = freqs[:limit] + magnitudes = magnitudes[:limit] + + return { + "frequencies": freqs.tolist(), + "power_magnitudes": magnitudes.tolist(), + "dominant_freq": dominant_freq, + "dc_power": dc_power, + } + + +def compute_power_metrics(time: np.ndarray, voltage: np.ndarray, current: np.ndarray) -> dict: + """Comprehensive power report for a voltage/current pair. + + Power factor here is defined as the ratio of real (average) power + to apparent power (Vrms * Irms). + + Args: + time: Time array in seconds + voltage: Voltage waveform array + current: Current waveform array + + Returns: + Dict with avg_power, rms_power, peak_power, min_power, power_factor + """ + v = np.real(voltage) + i = np.real(current) + p_inst = v * i + + if len(p_inst) == 0: + return { + "avg_power": 0.0, + "rms_power": 0.0, + "peak_power": 0.0, + "min_power": 0.0, + "power_factor": 0.0, + } + + avg_power = compute_average_power(time, voltage, current) + rms_power = float(np.sqrt(np.mean(p_inst**2))) + peak_power = float(np.max(p_inst)) + min_power = float(np.min(p_inst)) + + # Power factor = avg power / apparent power (Vrms * Irms) + v_rms = float(np.sqrt(np.mean(v**2))) + i_rms = float(np.sqrt(np.mean(i**2))) + apparent = v_rms * i_rms + + if apparent < 1e-15: + power_factor = 0.0 + else: + power_factor = avg_power / apparent + + return { + "avg_power": avg_power, + "rms_power": rms_power, + "peak_power": peak_power, + "min_power": min_power, + "power_factor": power_factor, + } diff --git a/src/mcp_ltspice/raw_parser.py b/src/mcp_ltspice/raw_parser.py index 3e5e8d2..f9b8a27 100644 --- a/src/mcp_ltspice/raw_parser.py +++ b/src/mcp_ltspice/raw_parser.py @@ -15,6 +15,7 @@ import numpy as np @dataclass class Variable: """A variable (signal) in the raw file.""" + index: int name: str type: str # e.g., "voltage", "current", "time", "frequency" @@ -23,6 +24,7 @@ class Variable: @dataclass class RawFile: """Parsed LTspice .raw file.""" + title: str date: str plotname: str @@ -30,22 +32,65 @@ class RawFile: variables: list[Variable] points: int data: np.ndarray # Shape: (n_variables, n_points) + n_runs: int = 1 # Number of runs (>1 for .step/.mc/.temp) + run_boundaries: list[int] | None = None # Start index of each run - def get_variable(self, name: str) -> np.ndarray | None: - """Get data for a variable by name (case-insensitive partial match).""" + def get_variable(self, name: str, run: int | None = None) -> np.ndarray | None: + """Get data for a variable by name (case-insensitive partial match). + + Args: + name: Signal name (partial match OK) + run: If stepped data, return only this run (0-indexed). None = all data. + """ name_lower = name.lower() for var in self.variables: if name_lower in var.name.lower(): - return self.data[var.index] + arr = self.data[var.index] + if run is not None and self.run_boundaries: + start, end = self._run_slice(run) + return arr[start:end] + return arr return None - def get_time(self) -> np.ndarray | None: + def get_time(self, run: int | None = None) -> np.ndarray | None: """Get the time axis (for transient analysis).""" - return self.get_variable("time") + return self.get_variable("time", run=run) - def get_frequency(self) -> np.ndarray | None: + def get_frequency(self, run: int | None = None) -> np.ndarray | None: """Get the frequency axis (for AC analysis).""" - return self.get_variable("frequency") + return self.get_variable("frequency", run=run) + + def get_run_data(self, run: int) -> "RawFile": + """Extract a single run as a new RawFile (for stepped simulations).""" + if not self.run_boundaries or run >= self.n_runs: + return self + start, end = self._run_slice(run) + return RawFile( + title=self.title, + date=self.date, + plotname=self.plotname, + flags=self.flags, + variables=self.variables, + points=end - start, + data=self.data[:, start:end].copy(), + n_runs=1, + run_boundaries=None, + ) + + @property + def is_stepped(self) -> bool: + return self.n_runs > 1 + + def _run_slice(self, run: int) -> tuple[int, int]: + """Get (start, end) indices for a given run.""" + if not self.run_boundaries: + return 0, self.points + start = self.run_boundaries[run] + if run + 1 < len(self.run_boundaries): + end = self.run_boundaries[run + 1] + else: + end = self.data.shape[1] + return start, end def parse_raw_file(path: Path | str) -> RawFile: @@ -68,7 +113,7 @@ def parse_raw_file(path: Path | str) -> RawFile: # Detect encoding: UTF-16 LE (Windows) vs ASCII # UTF-16 LE has null bytes between characters - is_utf16 = content[1:2] == b'\x00' and content[3:4] == b'\x00' + is_utf16 = content[1:2] == b"\x00" and content[3:4] == b"\x00" if is_utf16: # UTF-16 LE encoding - decode header portion, find Binary marker @@ -106,7 +151,6 @@ def parse_raw_file(path: Path | str) -> RawFile: points = 0 in_variables = False - var_count = 0 for line in header_lines: line = line.strip() @@ -120,12 +164,17 @@ def parse_raw_file(path: Path | str) -> RawFile: elif line.startswith("Flags:"): flags = line[6:].strip().split() elif line.startswith("No. Variables:"): - var_count = int(line[14:].strip()) + pass # Parsed from Variables section instead elif line.startswith("No. Points:"): points = int(line[11:].strip()) elif line.startswith("Variables:"): in_variables = True - elif in_variables and line and not line.startswith("Binary") and not line.startswith("Values"): + elif ( + in_variables + and line + and not line.startswith("Binary") + and not line.startswith("Values") + ): # Parse variable line: "index name type" parts = line.split() if len(parts) >= 3: @@ -143,27 +192,36 @@ def parse_raw_file(path: Path | str) -> RawFile: is_complex = "complex" in flags is_stepped = "stepped" in flags - is_forward = "forward" in flags # FastAccess format + # "forward" flag indicates FastAccess format (unused for now) if is_binary: data = _parse_binary_data(data_bytes, n_vars, points, is_complex) else: data = _parse_ascii_data(data_bytes.decode("utf-8"), n_vars, points, is_complex) + # Detect run boundaries for stepped simulations + n_runs = 1 + run_boundaries = None + if is_stepped and data.shape[1] > 1: + run_boundaries = _detect_run_boundaries(data[0]) + n_runs = len(run_boundaries) + + actual_points = data.shape[1] + return RawFile( title=title, date=date, plotname=plotname, flags=flags, variables=variables, - points=points, + points=actual_points, data=data, + n_runs=n_runs, + run_boundaries=run_boundaries, ) -def _parse_binary_data( - data: bytes, n_vars: int, points: int, is_complex: bool -) -> np.ndarray: +def _parse_binary_data(data: bytes, n_vars: int, points: int, is_complex: bool) -> np.ndarray: """Parse binary data section. LTspice binary formats: @@ -178,7 +236,7 @@ def _parse_binary_data( actual_points = len(data) // bytes_per_point # Read as flat complex128 array and reshape - flat = np.frombuffer(data[:actual_points * bytes_per_point], dtype=np.complex128) + flat = np.frombuffer(data[: actual_points * bytes_per_point], dtype=np.complex128) return flat.reshape(actual_points, n_vars).T.copy() # Real data - detect format from data size @@ -197,12 +255,12 @@ def _parse_binary_data( for p in range(actual_points): # Time as double - result[0, p] = struct.unpack(" np.ndarray: +def _parse_ascii_data(data: str, n_vars: int, points: int, is_complex: bool) -> np.ndarray: """Parse ASCII data section.""" lines = data.strip().split("\n") @@ -258,3 +314,22 @@ def _parse_ascii_data( continue return result + + +def _detect_run_boundaries(x_axis: np.ndarray) -> list[int]: + """Detect run boundaries in stepped simulation data. + + In stepped data, the independent variable (time or frequency) resets + at the start of each new run. We detect this as a point where the + value decreases (time goes backwards) or resets to near-zero. + """ + x = np.real(x_axis) if np.iscomplexobj(x_axis) else x_axis + + boundaries = [0] # First run always starts at index 0 + + for i in range(1, len(x)): + # Detect reset: value drops back to near the starting value + if x[i] <= x[0] and x[i] < x[i - 1]: + boundaries.append(i) + + return boundaries diff --git a/src/mcp_ltspice/runner.py b/src/mcp_ltspice/runner.py index faabce0..f0a1071 100644 --- a/src/mcp_ltspice/runner.py +++ b/src/mcp_ltspice/runner.py @@ -19,6 +19,7 @@ from .raw_parser import RawFile, parse_raw_file @dataclass class SimulationResult: """Result of a simulation run.""" + success: bool raw_file: Path | None log_file: Path | None @@ -47,6 +48,7 @@ async def run_simulation( SimulationResult with status and data """ import time + start_time = time.monotonic() # Validate installation @@ -121,10 +123,9 @@ async def run_simulation( try: stdout_bytes, stderr_bytes = await asyncio.wait_for( - process.communicate(), - timeout=timeout + process.communicate(), timeout=timeout ) - except asyncio.TimeoutError: + except TimeoutError: process.kill() await process.wait() return SimulationResult( @@ -226,21 +227,33 @@ async def run_netlist( SimulationResult with status and data """ import time + start_time = time.monotonic() ok, msg = validate_installation() if not ok: return SimulationResult( - success=False, raw_file=None, log_file=None, raw_data=None, - error=msg, stdout="", stderr="", elapsed_seconds=0, + success=False, + raw_file=None, + log_file=None, + raw_data=None, + error=msg, + stdout="", + stderr="", + elapsed_seconds=0, ) netlist_path = Path(netlist_path).resolve() if not netlist_path.exists(): return SimulationResult( - success=False, raw_file=None, log_file=None, raw_data=None, + success=False, + raw_file=None, + log_file=None, + raw_data=None, error=f"Netlist not found: {netlist_path}", - stdout="", stderr="", elapsed_seconds=0, + stdout="", + stderr="", + elapsed_seconds=0, ) temp_dir = None @@ -277,16 +290,19 @@ async def run_netlist( try: stdout_bytes, stderr_bytes = await asyncio.wait_for( - process.communicate(), - timeout=timeout + process.communicate(), timeout=timeout ) - except asyncio.TimeoutError: + except TimeoutError: process.kill() await process.wait() return SimulationResult( - success=False, raw_file=None, log_file=None, raw_data=None, + success=False, + raw_file=None, + log_file=None, + raw_data=None, error=f"Simulation timed out after {timeout} seconds", - stdout="", stderr="", + stdout="", + stderr="", elapsed_seconds=time.monotonic() - start_time, ) @@ -308,8 +324,11 @@ async def run_netlist( success=False, raw_file=None, log_file=log_file if log_file.exists() else None, - raw_data=None, error=error_msg, - stdout=stdout, stderr=stderr, elapsed_seconds=elapsed, + raw_data=None, + error=error_msg, + stdout=stdout, + stderr=stderr, + elapsed_seconds=elapsed, ) raw_data = None @@ -326,7 +345,9 @@ async def run_netlist( log_file=log_file if log_file.exists() else None, raw_data=raw_data, error=parse_error, - stdout=stdout, stderr=stderr, elapsed_seconds=elapsed, + stdout=stdout, + stderr=stderr, + elapsed_seconds=elapsed, ) finally: pass # Keep temp files for debugging diff --git a/src/mcp_ltspice/schematic.py b/src/mcp_ltspice/schematic.py index 1b40e8f..db888a4 100644 --- a/src/mcp_ltspice/schematic.py +++ b/src/mcp_ltspice/schematic.py @@ -11,6 +11,7 @@ from pathlib import Path @dataclass class Component: """A component instance in the schematic.""" + name: str # Instance name (e.g., R1, C1, M1) symbol: str # Symbol name (e.g., res, cap, nmos) x: int @@ -33,6 +34,7 @@ class Component: @dataclass class Wire: """A wire connection.""" + x1: int y1: int x2: int @@ -42,6 +44,7 @@ class Wire: @dataclass class Text: """Text annotation or SPICE directive.""" + x: int y: int content: str @@ -51,6 +54,7 @@ class Text: @dataclass class Flag: """A net flag/label.""" + x: int y: int name: str @@ -60,6 +64,7 @@ class Flag: @dataclass class Schematic: """A parsed LTspice schematic.""" + version: int = 4 sheet: tuple[int, int, int, int] = (1, 1, 0, 0) components: list[Component] = field(default_factory=list) @@ -113,27 +118,23 @@ def parse_schematic(path: Path | str) -> Schematic: elif line.startswith("SHEET"): parts = line.split() if len(parts) >= 5: - schematic.sheet = ( - int(parts[1]), int(parts[2]), - int(parts[3]), int(parts[4]) - ) + schematic.sheet = (int(parts[1]), int(parts[2]), int(parts[3]), int(parts[4])) elif line.startswith("WIRE"): parts = line.split() if len(parts) >= 5: - schematic.wires.append(Wire( - int(parts[1]), int(parts[2]), - int(parts[3]), int(parts[4]) - )) + schematic.wires.append( + Wire(int(parts[1]), int(parts[2]), int(parts[3]), int(parts[4])) + ) elif line.startswith("FLAG"): parts = line.split() if len(parts) >= 4: - schematic.flags.append(Flag( - int(parts[1]), int(parts[2]), - parts[3], - parts[4] if len(parts) > 4 else "0" - )) + schematic.flags.append( + Flag( + int(parts[1]), int(parts[2]), parts[3], parts[4] if len(parts) > 4 else "0" + ) + ) elif line.startswith("SYMBOL"): # Save previous component before starting a new one @@ -159,11 +160,7 @@ def parse_schematic(path: Path | str) -> Schematic: rotation = int(rot_str[1:]) current_component = Component( - name="", - symbol=symbol, - x=x, y=y, - rotation=rotation, - mirror=mirror + name="", symbol=symbol, x=x, y=y, rotation=rotation, mirror=mirror ) else: current_component = None @@ -184,8 +181,7 @@ def parse_schematic(path: Path | str) -> Schematic: match = re.match(r"TEXT\s+(-?\d+)\s+(-?\d+)\s+(\w+)\s+(\d+)\s*(.*)", line) if match: x, y = int(match.group(1)), int(match.group(2)) - align = match.group(3) - size = int(match.group(4)) + # groups 3 (align) and 4 (size) are parsed but not stored content = match.group(5) if match.group(5) else "" # Check for multi-line text (continuation with \n or actual newlines) @@ -219,7 +215,9 @@ def write_schematic(schematic: Schematic, path: Path | str) -> None: lines = [] lines.append(f"Version {schematic.version}") - lines.append(f"SHEET {schematic.sheet[0]} {schematic.sheet[1]} {schematic.sheet[2]} {schematic.sheet[3]}") + lines.append( + f"SHEET {schematic.sheet[0]} {schematic.sheet[1]} {schematic.sheet[2]} {schematic.sheet[3]}" + ) # Write wires for wire in schematic.wires: @@ -251,10 +249,7 @@ def write_schematic(schematic: Schematic, path: Path | str) -> None: def modify_component_value( - path: Path | str, - component_name: str, - new_value: str, - output_path: Path | str | None = None + path: Path | str, component_name: str, new_value: str, output_path: Path | str | None = None ) -> Schematic: """Modify a component's value in a schematic. @@ -276,8 +271,7 @@ def modify_component_value( if not comp: available = [c.name for c in schematic.components] raise ValueError( - f"Component '{component_name}' not found. " - f"Available components: {', '.join(available)}" + f"Component '{component_name}' not found. Available components: {', '.join(available)}" ) comp.value = new_value diff --git a/src/mcp_ltspice/server.py b/src/mcp_ltspice/server.py index 86f1483..377c64d 100644 --- a/src/mcp_ltspice/server.py +++ b/src/mcp_ltspice/server.py @@ -20,6 +20,20 @@ import numpy as np from fastmcp import FastMCP from . import __version__ +from .asc_generator import ( + generate_inverting_amp, +) +from .asc_generator import ( + generate_rc_lowpass as generate_rc_lowpass_asc, +) +from .asc_generator import ( + generate_voltage_divider as generate_voltage_divider_asc, +) +from .batch import ( + run_monte_carlo, + run_parameter_sweep, + run_temperature_sweep, +) from .config import ( LTSPICE_EXAMPLES, LTSPICE_LIB, @@ -29,14 +43,25 @@ from .diff import diff_schematics as _diff_schematics from .drc import run_drc as _run_drc from .log_parser import parse_log from .models import ( - get_model_details as _get_model_details, search_models as _search_models, +) +from .models import ( search_subcircuits as _search_subcircuits, ) from .netlist import Netlist +from .optimizer import ( + ComponentRange, + OptimizationTarget, + format_engineering, + optimize_component_values, +) +from .power_analysis import compute_efficiency, compute_power_metrics from .raw_parser import parse_raw_file from .runner import run_netlist, run_simulation from .schematic import modify_component_value, parse_schematic +from .stability import compute_stability_metrics +from .touchstone import parse_touchstone, s_param_to_db +from .waveform_expr import WaveformCalculator from .waveform_math import ( compute_bandwidth, compute_fft, @@ -47,7 +72,6 @@ from .waveform_math import ( compute_thd, ) - mcp = FastMCP( name="mcp-ltspice", instructions=""" @@ -65,6 +89,14 @@ mcp = FastMCP( - Run design rule checks before simulation - Compare schematics to see what changed - Export waveform data to CSV + - Measure stability (gain/phase margins from AC loop gain) + - Compute power and efficiency from voltage/current waveforms + - Evaluate waveform math expressions (V*I, gain, dB, etc.) + - Optimize component values to hit target specs automatically + - Generate .asc schematic files (graphical format) + - Run parameter sweeps, temperature sweeps, and Monte Carlo analysis + - Parse Touchstone (.s2p) S-parameter files + - Use circuit templates: buck converter, LDO, diff amp, oscillator, H-bridge LTspice runs via Wine on Linux. Simulations execute in batch mode and results are parsed from binary .raw files. @@ -106,8 +138,7 @@ async def simulate( if result.raw_data: response["variables"] = [ - {"name": v.name, "type": v.type} - for v in result.raw_data.variables + {"name": v.name, "type": v.type} for v in result.raw_data.variables ] response["points"] = result.raw_data.points response["plotname"] = result.raw_data.plotname @@ -148,8 +179,7 @@ async def simulate_netlist( if result.raw_data: response["variables"] = [ - {"name": v.name, "type": v.type} - for v in result.raw_data.variables + {"name": v.name, "type": v.type} for v in result.raw_data.variables ] response["points"] = result.raw_data.points response["raw_file"] = str(result.raw_file) if result.raw_file else None @@ -219,13 +249,9 @@ def get_waveform( if np.iscomplexobj(sampled): result["signals"][name] = { "magnitude_db": [ - 20 * math.log10(abs(x)) if abs(x) > 0 else -200 - for x in sampled - ], - "phase_degrees": [ - math.degrees(math.atan2(x.imag, x.real)) - for x in sampled + 20 * math.log10(abs(x)) if abs(x) > 0 else -200 for x in sampled ], + "phase_degrees": [math.degrees(math.atan2(x.imag, x.real)) for x in sampled], } else: result["signals"][name] = {"values": sampled.tolist()} @@ -272,8 +298,10 @@ def analyze_waveform( signal = raw.get_variable(signal_name) if signal is None: - return {"error": f"Signal '{signal_name}' not found. Available: " - f"{[v.name for v in raw.variables]}"} + return { + "error": f"Signal '{signal_name}' not found. Available: " + f"{[v.name for v in raw.variables]}" + } # Use real parts for time-domain analysis if np.iscomplexobj(time): @@ -293,7 +321,8 @@ def analyze_waveform( elif analysis == "settling_time": if time is not None: results["settling_time"] = compute_settling_time( - time, signal, + time, + signal, final_value=settling_final_value, tolerance_percent=settling_tolerance_pct, ) @@ -301,7 +330,8 @@ def analyze_waveform( elif analysis == "rise_time": if time is not None: results["rise_time"] = compute_rise_time( - time, signal, + time, + signal, low_pct=rise_low_pct, high_pct=rise_high_pct, ) @@ -309,14 +339,16 @@ def analyze_waveform( elif analysis == "fft": if time is not None: results["fft"] = compute_fft( - time, signal, + time, + signal, max_harmonics=fft_max_harmonics, ) elif analysis == "thd": if time is not None: results["thd"] = compute_thd( - time, signal, + time, + signal, n_harmonics=thd_n_harmonics, ) @@ -350,10 +382,7 @@ def measure_bandwidth( return {"error": f"Signal '{signal_name}' not found"} # Convert complex signal to magnitude in dB - mag_db = np.array([ - 20 * math.log10(abs(x)) if abs(x) > 0 else -200 - for x in signal - ]) + mag_db = np.array([20 * math.log10(abs(x)) if abs(x) > 0 else -200 for x in signal]) return compute_bandwidth(freq.real, mag_db, ref_db=ref_db) @@ -384,7 +413,9 @@ def export_csv( # Select signals if signal_names is None: - signal_names = [v.name for v in raw.variables if v.name not in (x_name, "time", "frequency")] + signal_names = [ + v.name for v in raw.variables if v.name not in (x_name, "time", "frequency") + ] # Downsample total = raw.points @@ -445,6 +476,480 @@ def export_csv( } +# ============================================================================ +# STABILITY ANALYSIS TOOLS +# ============================================================================ + + +@mcp.tool() +def analyze_stability( + raw_file_path: str, + signal_name: str, +) -> dict: + """Measure gain margin and phase margin from AC loop gain data. + + Computes Bode plot (magnitude + phase) and finds the crossover + frequencies where gain = 0 dB and phase = -180 degrees. + + Args: + raw_file_path: Path to .raw file from AC simulation + signal_name: Loop gain signal, e.g. "V(out)" or "V(loop_gain)" + """ + raw = parse_raw_file(raw_file_path) + freq = raw.get_frequency() + signal = raw.get_variable(signal_name) + + if freq is None: + return {"error": "Not an AC analysis - no frequency data found"} + if signal is None: + return { + "error": f"Signal '{signal_name}' not found. Available: " + f"{[v.name for v in raw.variables]}" + } + + return compute_stability_metrics(freq.real, signal) + + +# ============================================================================ +# POWER ANALYSIS TOOLS +# ============================================================================ + + +@mcp.tool() +def analyze_power( + raw_file_path: str, + voltage_signal: str, + current_signal: str, +) -> dict: + """Compute power metrics from voltage and current waveforms. + + Returns average power, RMS power, peak power, and power factor. + + Args: + raw_file_path: Path to .raw file from transient simulation + voltage_signal: Voltage signal name, e.g. "V(out)" + current_signal: Current signal name, e.g. "I(R1)" + """ + raw = parse_raw_file(raw_file_path) + time = raw.get_time() + voltage = raw.get_variable(voltage_signal) + current = raw.get_variable(current_signal) + + if time is None: + return {"error": "Not a transient analysis - no time data found"} + if voltage is None: + return {"error": f"Voltage signal '{voltage_signal}' not found"} + if current is None: + return {"error": f"Current signal '{current_signal}' not found"} + + return compute_power_metrics(time, voltage, current) + + +@mcp.tool() +def compute_efficiency_tool( + raw_file_path: str, + input_voltage_signal: str, + input_current_signal: str, + output_voltage_signal: str, + output_current_signal: str, +) -> dict: + """Compute power conversion efficiency. + + Compares input power to output power for regulators, converters, etc. + + Args: + raw_file_path: Path to .raw file from transient simulation + input_voltage_signal: Input voltage, e.g. "V(vin)" + input_current_signal: Input current, e.g. "I(Vin)" + output_voltage_signal: Output voltage, e.g. "V(out)" + output_current_signal: Output current, e.g. "I(Rload)" + """ + raw = parse_raw_file(raw_file_path) + time = raw.get_time() + if time is None: + return {"error": "Not a transient analysis"} + + v_in = raw.get_variable(input_voltage_signal) + i_in = raw.get_variable(input_current_signal) + v_out = raw.get_variable(output_voltage_signal) + i_out = raw.get_variable(output_current_signal) + + for name, sig in [ + (input_voltage_signal, v_in), + (input_current_signal, i_in), + (output_voltage_signal, v_out), + (output_current_signal, i_out), + ]: + if sig is None: + return {"error": f"Signal '{name}' not found"} + + return compute_efficiency(time, v_in, i_in, v_out, i_out) + + +# ============================================================================ +# WAVEFORM EXPRESSION TOOLS +# ============================================================================ + + +@mcp.tool() +def evaluate_waveform_expression( + raw_file_path: str, + expression: str, + max_points: int = 1000, +) -> dict: + """Evaluate a math expression on simulation waveforms. + + Supports: +, -, *, /, abs(), sqrt(), log10(), dB() + Signal names reference variables from the .raw file. + + Examples: + "V(out) * I(R1)" - instantaneous power + "V(out) / V(in)" - voltage gain + "dB(V(out))" - magnitude in dB + + Args: + raw_file_path: Path to .raw file + expression: Math expression using signal names + max_points: Maximum data points to return + """ + raw = parse_raw_file(raw_file_path) + calc = WaveformCalculator(raw) + + try: + result = calc.calc(expression) + except ValueError as e: + return {"error": str(e), "available_signals": calc.available_signals()} + + # Get x-axis + x_axis = raw.get_time() + x_name = "time" + if x_axis is None: + x_axis = raw.get_frequency() + x_name = "frequency" + + total = len(result) + step = max(1, total // max_points) + + response = { + "expression": expression, + "total_points": total, + "returned_points": len(result[::step]), + } + + if x_axis is not None: + sampled_x = x_axis[::step] + response["x_axis_name"] = x_name + response["x_axis_data"] = ( + sampled_x.real.tolist() if np.iscomplexobj(sampled_x) else sampled_x.tolist() + ) + + response["values"] = result[::step].tolist() + response["available_signals"] = calc.available_signals() + + return response + + +# ============================================================================ +# OPTIMIZER TOOLS +# ============================================================================ + + +@mcp.tool() +async def optimize_circuit( + netlist_template: str, + targets: list[dict], + component_ranges: list[dict], + max_iterations: int = 20, +) -> dict: + """Automatically optimize component values to hit target specifications. + + Runs real LTspice simulations in a loop, adjusting component values + using binary search (single component) or coordinate descent (multiple). + + Args: + netlist_template: Netlist text with {ComponentName} placeholders + (e.g., {R1}, {C1}) that get substituted each iteration. + targets: List of target specs, each with: + - signal_name: Signal to measure (e.g., "V(out)") + - metric: One of "bandwidth_hz", "rms", "peak_to_peak", + "settling_time", "gain_db", "phase_margin_deg" + - target_value: Desired value + - weight: Importance weight (default 1.0) + component_ranges: List of tunable components, each with: + - component_name: Name matching {placeholder} (e.g., "R1") + - min_value: Minimum value in base units + - max_value: Maximum value in base units + - preferred_series: Optional "E12", "E24", or "E96" for snapping + max_iterations: Max simulation iterations (default 20) + """ + opt_targets = [ + OptimizationTarget( + signal_name=t["signal_name"], + metric=t["metric"], + target_value=t["target_value"], + weight=t.get("weight", 1.0), + ) + for t in targets + ] + + opt_ranges = [ + ComponentRange( + component_name=r["component_name"], + min_value=r["min_value"], + max_value=r["max_value"], + preferred_series=r.get("preferred_series"), + ) + for r in component_ranges + ] + + result = await optimize_component_values( + netlist_template, + opt_targets, + opt_ranges, + max_iterations, + ) + + return { + "best_values": {k: format_engineering(v) for k, v in result.best_values.items()}, + "best_values_raw": result.best_values, + "best_cost": result.best_cost, + "iterations": result.iterations, + "targets_met": result.targets_met, + "final_metrics": result.final_metrics, + "history_length": len(result.history), + } + + +# ============================================================================ +# BATCH SIMULATION TOOLS +# ============================================================================ + + +@mcp.tool() +async def parameter_sweep( + netlist_text: str, + param_name: str, + start: float, + stop: float, + num_points: int = 10, + timeout_seconds: float = 300, +) -> dict: + """Sweep a parameter across a range of values. + + Runs multiple simulations, substituting the parameter value each time. + The netlist should contain a .param directive for the parameter. + + Args: + netlist_text: Netlist with .param directive + param_name: Parameter to sweep (e.g., "Rval") + start: Start value + stop: Stop value + num_points: Number of sweep points + timeout_seconds: Per-simulation timeout + """ + values = np.linspace(start, stop, num_points).tolist() + result = await run_parameter_sweep( + netlist_text, + param_name, + values, + timeout=timeout_seconds, + ) + + return { + "success_count": result.success_count, + "failure_count": result.failure_count, + "total_elapsed": result.total_elapsed, + "parameter_values": result.parameter_values, + "raw_files": [str(r.raw_file) if r.raw_file else None for r in result.results], + } + + +@mcp.tool() +async def temperature_sweep( + netlist_text: str, + temperatures: list[float], + timeout_seconds: float = 300, +) -> dict: + """Run simulations at different temperatures. + + Args: + netlist_text: Netlist text + temperatures: List of temperatures in degrees C + timeout_seconds: Per-simulation timeout + """ + result = await run_temperature_sweep( + netlist_text, + temperatures, + timeout=timeout_seconds, + ) + + return { + "success_count": result.success_count, + "failure_count": result.failure_count, + "total_elapsed": result.total_elapsed, + "parameter_values": result.parameter_values, + "raw_files": [str(r.raw_file) if r.raw_file else None for r in result.results], + } + + +@mcp.tool() +async def monte_carlo( + netlist_text: str, + n_runs: int, + tolerances: dict[str, float], + timeout_seconds: float = 300, + seed: int | None = None, +) -> dict: + """Run Monte Carlo analysis with component tolerances. + + Randomly varies component values within tolerance using a normal + distribution, then runs simulations for each variant. + + Args: + netlist_text: Netlist text + n_runs: Number of Monte Carlo iterations + tolerances: Component tolerances, e.g. {"R1": 0.05} for 5% + timeout_seconds: Per-simulation timeout + seed: Optional RNG seed for reproducibility + """ + result = await run_monte_carlo( + netlist_text, + n_runs, + tolerances, + timeout=timeout_seconds, + seed=seed, + ) + + return { + "success_count": result.success_count, + "failure_count": result.failure_count, + "total_elapsed": result.total_elapsed, + "parameter_values": result.parameter_values, + "raw_files": [str(r.raw_file) if r.raw_file else None for r in result.results], + } + + +# ============================================================================ +# SCHEMATIC GENERATION TOOLS +# ============================================================================ + + +@mcp.tool() +def generate_schematic( + template: str, + output_path: str | None = None, + r: str | None = None, + c: str | None = None, + r1: str | None = None, + r2: str | None = None, + vin: str | None = None, + rin: str | None = None, + rf: str | None = None, + opamp_model: str | None = None, +) -> dict: + """Generate an LTspice .asc schematic file from a template. + + Available templates and their parameters: + - "rc_lowpass": r (resistor, default "1k"), c (capacitor, default "100n") + - "voltage_divider": r1 (top, default "10k"), r2 (bottom, default "10k"), + vin (input voltage, default "5") + - "inverting_amp": rin (input R, default "10k"), rf (feedback R, + default "100k"), opamp_model (default "UniversalOpamp2") + + Args: + template: Template name + output_path: Where to save (None = auto in /tmp) + r: Resistor value (rc_lowpass) + c: Capacitor value (rc_lowpass) + r1: Top resistor (voltage_divider) + r2: Bottom resistor (voltage_divider) + vin: Input voltage (voltage_divider) + rin: Input resistor (inverting_amp) + rf: Feedback resistor (inverting_amp) + opamp_model: Op-amp model name (inverting_amp) + """ + if template == "rc_lowpass": + params: dict[str, str] = {} + if r is not None: + params["r"] = r + if c is not None: + params["c"] = c + sch = generate_rc_lowpass_asc(**params) + elif template == "voltage_divider": + params = {} + if r1 is not None: + params["r1"] = r1 + if r2 is not None: + params["r2"] = r2 + if vin is not None: + params["vin"] = vin + sch = generate_voltage_divider_asc(**params) + elif template == "inverting_amp": + params = {} + if rin is not None: + params["rin"] = rin + if rf is not None: + params["rf"] = rf + if opamp_model is not None: + params["opamp_model"] = opamp_model + sch = generate_inverting_amp(**params) + else: + return { + "error": f"Unknown template '{template}'. " + f"Available: rc_lowpass, voltage_divider, inverting_amp" + } + + if output_path is None: + output_path = str(Path(tempfile.gettempdir()) / f"{template}.asc") + + saved = sch.save(output_path) + return { + "success": True, + "output_path": str(saved), + "schematic_preview": sch.render()[:500], + } + + +# ============================================================================ +# TOUCHSTONE / S-PARAMETER TOOLS +# ============================================================================ + + +@mcp.tool() +def read_touchstone(file_path: str) -> dict: + """Parse a Touchstone (.s1p, .s2p, .snp) S-parameter file. + + Returns S-parameter data, frequency points, and port information. + + Args: + file_path: Path to Touchstone file + """ + try: + data = parse_touchstone(file_path) + except (ValueError, FileNotFoundError) as e: + return {"error": str(e)} + + # Convert S-parameter data to a more digestible format + s_params = {} + for i in range(data.n_ports): + for j in range(data.n_ports): + key = f"S{i + 1}{j + 1}" + s_data = data.data[:, i, j] + s_params[key] = { + "magnitude_db": s_param_to_db(s_data).tolist(), + } + + return { + "filename": data.filename, + "n_ports": data.n_ports, + "n_frequencies": len(data.frequencies), + "freq_range_hz": [float(data.frequencies[0]), float(data.frequencies[-1])], + "reference_impedance": data.reference_impedance, + "s_parameters": s_params, + "comments": data.comments[:5], # First 5 comment lines + } + + # ============================================================================ # SCHEMATIC TOOLS # ============================================================================ @@ -497,7 +1002,10 @@ def edit_component( """ try: sch = modify_component_value( - schematic_path, component_name, new_value, output_path, + schematic_path, + component_name, + new_value, + output_path, ) comp = sch.get_component(component_name) return { @@ -720,12 +1228,14 @@ def get_symbol_info(symbol_path: str) -> dict: if line.startswith("PIN"): parts = line.split() if len(parts) >= 5: - info["pins"].append({ - "x": int(parts[1]), - "y": int(parts[2]), - "justification": parts[3], - "rotation": parts[4] if len(parts) > 4 else "0", - }) + info["pins"].append( + { + "x": int(parts[1]), + "y": int(parts[2]), + "justification": parts[3], + "rotation": parts[4] if len(parts) > 4 else "0", + } + ) elif line.startswith("PINATTR PinName"): pin_name = line.split(None, 2)[2] if len(line.split()) > 2 else "" diff --git a/src/mcp_ltspice/stability.py b/src/mcp_ltspice/stability.py new file mode 100644 index 0000000..d703b5c --- /dev/null +++ b/src/mcp_ltspice/stability.py @@ -0,0 +1,167 @@ +"""Stability analysis for AC simulation loop gain data (gain/phase margins).""" + +import numpy as np + + +def _interp_crossing(x: np.ndarray, y: np.ndarray, threshold: float) -> list[float]: + """Find all x values where y crosses threshold, using linear interpolation.""" + crossings = [] + for i in range(len(y) - 1): + if (y[i] - threshold) * (y[i + 1] - threshold) < 0: + dy = y[i + 1] - y[i] + if abs(dy) < 1e-30: + crossings.append(float(x[i])) + else: + frac = (threshold - y[i]) / dy + crossings.append(float(x[i] + frac * (x[i + 1] - x[i]))) + return crossings + + +def _interp_y_at_crossing( + x: np.ndarray, y: np.ndarray, ref: np.ndarray, threshold: float +) -> list[tuple[float, float]]: + """Find interpolated (x, y) pairs where ref crosses threshold.""" + results = [] + for i in range(len(ref) - 1): + if (ref[i] - threshold) * (ref[i + 1] - threshold) < 0: + dref = ref[i + 1] - ref[i] + if abs(dref) < 1e-30: + results.append((float(x[i]), float(y[i]))) + else: + frac = (threshold - ref[i]) / dref + xi = float(x[i] + frac * (x[i + 1] - x[i])) + yi = float(y[i] + frac * (y[i + 1] - y[i])) + results.append((xi, yi)) + return results + + +def compute_gain_margin(frequency: np.ndarray, loop_gain_complex: np.ndarray) -> dict: + """Compute gain margin from loop gain T(s). + + Args: + frequency: Frequency array in Hz (real-valued) + loop_gain_complex: Complex loop gain T(jw) + + Returns: + Dict with gain_margin_db, phase_crossover_freq_hz, is_stable + """ + if len(frequency) < 2 or len(loop_gain_complex) < 2: + return { + "gain_margin_db": None, + "phase_crossover_freq_hz": None, + "is_stable": None, + } + + freq = np.real(frequency).astype(np.float64) + mag_db = 20.0 * np.log10(np.maximum(np.abs(loop_gain_complex), 1e-30)) + phase_deg = np.degrees(np.unwrap(np.angle(loop_gain_complex))) + + # Phase crossover: where phase crosses -180 degrees + # Find magnitude at that crossing via interpolation + hits = _interp_y_at_crossing(freq, mag_db, phase_deg, -180.0) + + if not hits: + # No phase crossover => infinite gain margin (stable for any gain) + return { + "gain_margin_db": float("inf"), + "phase_crossover_freq_hz": None, + "is_stable": True, + } + + # Use the first phase crossover + crossover_freq, mag_at_crossover = hits[0] + gain_margin = -mag_at_crossover + + return { + "gain_margin_db": float(gain_margin), + "phase_crossover_freq_hz": float(crossover_freq), + "is_stable": gain_margin > 0, + } + + +def compute_phase_margin(frequency: np.ndarray, loop_gain_complex: np.ndarray) -> dict: + """Compute phase margin from loop gain T(s). + + Args: + frequency: Frequency array in Hz (real-valued) + loop_gain_complex: Complex loop gain T(jw) + + Returns: + Dict with phase_margin_deg, gain_crossover_freq_hz, is_stable + """ + if len(frequency) < 2 or len(loop_gain_complex) < 2: + return { + "phase_margin_deg": None, + "gain_crossover_freq_hz": None, + "is_stable": None, + } + + freq = np.real(frequency).astype(np.float64) + mag_db = 20.0 * np.log10(np.maximum(np.abs(loop_gain_complex), 1e-30)) + phase_deg = np.degrees(np.unwrap(np.angle(loop_gain_complex))) + + # Gain crossover: where magnitude crosses 0 dB + # Find phase at that crossing via interpolation + hits = _interp_y_at_crossing(freq, phase_deg, mag_db, 0.0) + + if not hits: + # No gain crossover. If gain is always below 0 dB, system is stable. + is_stable = bool(np.all(mag_db < 0)) + return { + "phase_margin_deg": float("inf") if is_stable else None, + "gain_crossover_freq_hz": None, + "is_stable": is_stable, + } + + # Use the first gain crossover + crossover_freq, phase_at_crossover = hits[0] + phase_margin = 180.0 + phase_at_crossover + + return { + "phase_margin_deg": float(phase_margin), + "gain_crossover_freq_hz": float(crossover_freq), + "is_stable": phase_margin > 0, + } + + +def compute_stability_metrics(frequency: np.ndarray, loop_gain_complex: np.ndarray) -> dict: + """Compute comprehensive stability metrics including Bode plot data. + + Args: + frequency: Frequency array in Hz (real-valued) + loop_gain_complex: Complex loop gain T(jw) + + Returns: + Dict with gain_margin, phase_margin, bode data, crossover + frequencies, and overall stability assessment + """ + if len(frequency) < 2 or len(loop_gain_complex) < 2: + return { + "gain_margin": compute_gain_margin(frequency, loop_gain_complex), + "phase_margin": compute_phase_margin(frequency, loop_gain_complex), + "bode": {"frequency_hz": [], "magnitude_db": [], "phase_deg": []}, + "is_stable": None, + } + + freq = np.real(frequency).astype(np.float64) + mag_db = 20.0 * np.log10(np.maximum(np.abs(loop_gain_complex), 1e-30)) + phase_deg = np.degrees(np.unwrap(np.angle(loop_gain_complex))) + + gm = compute_gain_margin(frequency, loop_gain_complex) + pm = compute_phase_margin(frequency, loop_gain_complex) + + # Overall stability: both margins must be positive (when defined) + gm_ok = gm["is_stable"] is not False + pm_ok = pm["is_stable"] is not False + is_stable = gm_ok and pm_ok + + return { + "gain_margin": gm, + "phase_margin": pm, + "bode": { + "frequency_hz": freq.tolist(), + "magnitude_db": mag_db.tolist(), + "phase_deg": phase_deg.tolist(), + }, + "is_stable": is_stable, + } diff --git a/src/mcp_ltspice/touchstone.py b/src/mcp_ltspice/touchstone.py new file mode 100644 index 0000000..d5caea0 --- /dev/null +++ b/src/mcp_ltspice/touchstone.py @@ -0,0 +1,254 @@ +"""Parse Touchstone (.s1p, .s2p, .snp) S-parameter files for LTspice.""" + +import re +from dataclasses import dataclass, field +from pathlib import Path + +import numpy as np + +# Frequency unit multipliers to Hz +_FREQ_MULTIPLIERS: dict[str, float] = { + "HZ": 1.0, + "KHZ": 1e3, + "MHZ": 1e6, + "GHZ": 1e9, +} + + +@dataclass +class TouchstoneData: + """Parsed contents of a Touchstone file. + + All frequencies are stored in Hz regardless of the original file's + unit. The ``data`` array holds complex-valued parameters with shape + ``(n_freq, n_ports, n_ports)``. + """ + + filename: str + n_ports: int + freq_unit: str + parameter_type: str # S, Y, Z, H, G + format_type: str # MA, DB, RI + reference_impedance: float + frequencies: np.ndarray # shape (n_freq,), always in Hz + data: np.ndarray # shape (n_freq, n_ports, n_ports), complex128 + comments: list[str] = field(default_factory=list) + + +def _to_complex(v1: float, v2: float, fmt: str) -> complex: + """Convert a value pair to complex according to the format type.""" + if fmt == "RI": + return complex(v1, v2) + elif fmt == "MA": + mag = v1 + angle_rad = np.deg2rad(v2) + return complex(mag * np.cos(angle_rad), mag * np.sin(angle_rad)) + elif fmt == "DB": + mag = 10.0 ** (v1 / 20.0) + angle_rad = np.deg2rad(v2) + return complex(mag * np.cos(angle_rad), mag * np.sin(angle_rad)) + else: + raise ValueError(f"Unknown format type: {fmt!r}") + + +def _detect_ports(path: Path) -> int: + """Detect port count from the file extension (.sp).""" + suffix = path.suffix.lower() + m = re.match(r"\.s(\d+)p$", suffix) + if not m: + raise ValueError( + f"Cannot determine port count from extension {suffix!r}. " + "Expected .s1p, .s2p, .s3p, .s4p, etc." + ) + return int(m.group(1)) + + +def parse_touchstone(path: str | Path) -> TouchstoneData: + """Parse a Touchstone file into a TouchstoneData object. + + Handles .s1p through .s4p (and beyond), all three format types + (MA, DB, RI), all frequency units, and continuation lines used + by files with more than two ports. + + Args: + path: Path to the Touchstone file. + + Returns: + A TouchstoneData with frequencies converted to Hz and + parameters stored as complex128. + """ + path = Path(path) + n_ports = _detect_ports(path) + + # Defaults per Touchstone spec + freq_unit = "GHZ" + param_type = "S" + fmt = "MA" + ref_impedance = 50.0 + + comments: list[str] = [] + data_lines: list[str] = [] + option_found = False + + with path.open() as fh: + for raw_line in fh: + line = raw_line.strip() + + # Comment lines + if line.startswith("!"): + comments.append(line[1:].strip()) + continue + + # Option line (only the first one is used) + if line.startswith("#"): + if not option_found: + option_found = True + tokens = line[1:].split() + # Parse tokens case-insensitively + i = 0 + while i < len(tokens): + tok = tokens[i].upper() + if tok in _FREQ_MULTIPLIERS: + freq_unit = tok + elif tok in ("S", "Y", "Z", "H", "G"): + param_type = tok + elif tok in ("MA", "DB", "RI"): + fmt = tok + elif tok == "R" and i + 1 < len(tokens): + i += 1 + ref_impedance = float(tokens[i]) + i += 1 + continue + + # Skip blank lines + if not line: + continue + + # Inline comments after data (some files use !) + if "!" in line: + line = line[: line.index("!")] + + data_lines.append(line) + + # Compute the expected number of value pairs per frequency point. + # Each frequency point has n_ports * n_ports parameters, each + # consisting of two floats. + values_per_freq = n_ports * n_ports * 2 # pairs * 2 values each + + # Flatten all data tokens + all_tokens: list[float] = [] + for dl in data_lines: + all_tokens.extend(float(t) for t in dl.split()) + + # Each frequency row starts with the frequency value, followed by + # values_per_freq data values. For n_ports <= 2, everything fits + # on one line. For n_ports > 2, continuation lines are used and + # don't repeat the frequency. + stride = 1 + values_per_freq # freq + data + if len(all_tokens) % stride != 0: + raise ValueError( + f"Token count {len(all_tokens)} is not a multiple of " + f"expected stride {stride} for a {n_ports}-port file." + ) + + n_freq = len(all_tokens) // stride + freq_mult = _FREQ_MULTIPLIERS[freq_unit] + + frequencies = np.empty(n_freq, dtype=np.float64) + data = np.empty((n_freq, n_ports, n_ports), dtype=np.complex128) + + for k in range(n_freq): + offset = k * stride + frequencies[k] = all_tokens[offset] * freq_mult + + # Data values come in pairs: (v1, v2) per parameter + idx = offset + 1 + for row in range(n_ports): + for col in range(n_ports): + v1 = all_tokens[idx] + v2 = all_tokens[idx + 1] + data[k, row, col] = _to_complex(v1, v2, fmt) + idx += 2 + + return TouchstoneData( + filename=path.name, + n_ports=n_ports, + freq_unit=freq_unit, + parameter_type=param_type, + format_type=fmt, + reference_impedance=ref_impedance, + frequencies=frequencies, + data=data, + comments=comments, + ) + + +def get_s_parameter(data: TouchstoneData, i: int, j: int) -> tuple[np.ndarray, np.ndarray]: + """Extract a single S-parameter across all frequencies. + + Args: + data: Parsed Touchstone data. + i: Row index (1-based, as in S(i,j)). + j: Column index (1-based). + + Returns: + Tuple of (frequencies_hz, complex_values) where both are 1-D + numpy arrays. + """ + if i < 1 or i > data.n_ports: + raise IndexError(f"Row index {i} out of range for {data.n_ports}-port data") + if j < 1 or j > data.n_ports: + raise IndexError(f"Column index {j} out of range for {data.n_ports}-port data") + return data.frequencies.copy(), data.data[:, i - 1, j - 1].copy() + + +def s_param_to_db(complex_values: np.ndarray) -> np.ndarray: + """Convert complex S-parameter values to decibels. + + Computes 20 * log10(|S|), flooring magnitudes at -300 dB to avoid + log-of-zero warnings. + + Args: + complex_values: Array of complex S-parameter values. + + Returns: + Magnitude in dB as a real-valued numpy array. + """ + magnitude = np.abs(complex_values) + return 20.0 * np.log10(np.maximum(magnitude, 1e-15)) + + +def generate_ltspice_subcircuit(touchstone_data: TouchstoneData, name: str) -> str: + """Generate an LTspice-compatible subcircuit wrapping S-parameter data. + + LTspice can reference Touchstone files from within a subcircuit + using the ``.net`` directive. This function produces a ``.sub`` + file body that instantiates the S-parameter block. + + Args: + touchstone_data: Parsed Touchstone data. + name: Subcircuit name (used in .SUBCKT and filename references). + + Returns: + A string containing the complete subcircuit definition. + """ + td = touchstone_data + n = td.n_ports + + # Build port list: port1, port2, ..., portN, plus a reference node + port_names = [f"port{k}" for k in range(1, n + 1)] + port_list = " ".join(port_names) + + lines: list[str] = [] + lines.append(f"* LTspice subcircuit for {td.filename}") + lines.append(f"* {n}-port {td.parameter_type}-parameters, Z0={td.reference_impedance} Ohm") + lines.append( + f"* Frequency range: {td.frequencies[0]:.6g} Hz " + f"to {td.frequencies[-1]:.6g} Hz " + f"({len(td.frequencies)} points)" + ) + lines.append(f".SUBCKT {name} {port_list} ref") + lines.append(f".net {td.filename} {port_list} ref") + lines.append(f".ends {name}") + + return "\n".join(lines) + "\n" diff --git a/src/mcp_ltspice/waveform_expr.py b/src/mcp_ltspice/waveform_expr.py new file mode 100644 index 0000000..24e4437 --- /dev/null +++ b/src/mcp_ltspice/waveform_expr.py @@ -0,0 +1,299 @@ +"""Evaluate mathematical expressions on waveform data. + +Supports expressions like "V(out) * I(R1)", "V(out) / V(in)", +"20*log10(abs(V(out)))", etc. Uses a safe recursive-descent parser +instead of Python's eval/exec. +""" + +from __future__ import annotations + +import re +from dataclasses import dataclass +from enum import Enum, auto + +import numpy as np + +from .raw_parser import RawFile + +# -- Tokenizer --------------------------------------------------------------- + + +class _TokenType(Enum): + NUMBER = auto() + SIGNAL = auto() # e.g., V(out), I(R1), time + FUNC = auto() # abs, sqrt, log10, dB + PLUS = auto() + MINUS = auto() + STAR = auto() + SLASH = auto() + LPAREN = auto() + RPAREN = auto() + EOF = auto() + + +@dataclass +class _Token: + type: _TokenType + value: str + + +# Functions we support (case-insensitive lookup) +_FUNCTIONS = {"abs", "sqrt", "log10", "db"} + +# Regex pieces for the tokenizer +_NUMBER_RE = re.compile(r"[0-9]*\.?[0-9]+(?:[eE][+-]?[0-9]+)?") + +# Signal names: either V(...), I(...) style or bare identifiers like "time". +# Handles nested parens for things like V(N001) and I(R1). +_SIGNAL_RE = re.compile(r"[A-Za-z_]\w*\([^)]*\)") +_IDENT_RE = re.compile(r"[A-Za-z_]\w*") + + +def _tokenize(expr: str) -> list[_Token]: + """Convert an expression string into a list of tokens.""" + tokens: list[_Token] = [] + i = 0 + s = expr.strip() + + while i < len(s): + # Skip whitespace + if s[i].isspace(): + i += 1 + continue + + # Operators / parens + if s[i] == "+": + tokens.append(_Token(_TokenType.PLUS, "+")) + i += 1 + elif s[i] == "-": + tokens.append(_Token(_TokenType.MINUS, "-")) + i += 1 + elif s[i] == "*": + tokens.append(_Token(_TokenType.STAR, "*")) + i += 1 + elif s[i] == "/": + tokens.append(_Token(_TokenType.SLASH, "/")) + i += 1 + elif s[i] == "(": + tokens.append(_Token(_TokenType.LPAREN, "(")) + i += 1 + elif s[i] == ")": + tokens.append(_Token(_TokenType.RPAREN, ")")) + i += 1 + + # Number literal + elif s[i].isdigit() or (s[i] == "." and i + 1 < len(s) and s[i + 1].isdigit()): + m = _NUMBER_RE.match(s, i) + if m: + tokens.append(_Token(_TokenType.NUMBER, m.group())) + i = m.end() + else: + raise ValueError(f"Invalid number at position {i}: {s[i:]!r}") + + # Identifier: could be a function, signal like V(out), or bare name + elif s[i].isalpha() or s[i] == "_": + # Try signal pattern first: name(...) + m = _SIGNAL_RE.match(s, i) + if m: + full = m.group() + # Check if the part before '(' is a known function + paren_pos = full.index("(") + prefix = full[:paren_pos].lower() + if prefix in _FUNCTIONS: + # It's a function call -- emit FUNC token, then let + # the parser handle the parenthesized argument + tokens.append(_Token(_TokenType.FUNC, prefix)) + i += paren_pos # parser will see '(' next + else: + # It's a signal name like V(out) or I(R1) + tokens.append(_Token(_TokenType.SIGNAL, full)) + i = m.end() + else: + # Bare identifier (e.g., "time", or a function without parens) + m = _IDENT_RE.match(s, i) + if m: + name = m.group() + if name.lower() in _FUNCTIONS: + tokens.append(_Token(_TokenType.FUNC, name.lower())) + else: + tokens.append(_Token(_TokenType.SIGNAL, name)) + i = m.end() + else: + raise ValueError(f"Unexpected character at position {i}: {s[i:]!r}") + else: + raise ValueError(f"Unexpected character at position {i}: {s[i:]!r}") + + tokens.append(_Token(_TokenType.EOF, "")) + return tokens + + +# -- Recursive-descent parser / evaluator ------------------------------------ +# +# Grammar: +# expr -> term (('+' | '-') term)* +# term -> unary (('*' | '/') unary)* +# unary -> '-' unary | primary +# primary -> NUMBER | SIGNAL | FUNC '(' expr ')' | '(' expr ')' + + +class _Parser: + """Recursive-descent expression evaluator over numpy arrays.""" + + def __init__(self, tokens: list[_Token], variables: dict[str, np.ndarray]): + self.tokens = tokens + self.variables = variables + self.pos = 0 + + def _peek(self) -> _Token: + return self.tokens[self.pos] + + def _advance(self) -> _Token: + tok = self.tokens[self.pos] + self.pos += 1 + return tok + + def _expect(self, ttype: _TokenType) -> _Token: + tok = self._advance() + if tok.type != ttype: + raise ValueError(f"Expected {ttype.name}, got {tok.type.name} ({tok.value!r})") + return tok + + def parse(self) -> np.ndarray: + result = self._expr() + if self._peek().type != _TokenType.EOF: + raise ValueError(f"Unexpected token after expression: {self._peek().value!r}") + return np.real(result) if np.iscomplexobj(result) else result + + def _expr(self) -> np.ndarray: + left = self._term() + while self._peek().type in (_TokenType.PLUS, _TokenType.MINUS): + op = self._advance() + right = self._term() + if op.type == _TokenType.PLUS: + left = left + right + else: + left = left - right + return left + + def _term(self) -> np.ndarray: + left = self._unary() + while self._peek().type in (_TokenType.STAR, _TokenType.SLASH): + op = self._advance() + right = self._unary() + if op.type == _TokenType.STAR: + left = left * right + else: + # Avoid division by zero -- replace zeros with tiny value + safe = np.where(np.abs(right) < 1e-30, 1e-30, right) + left = left / safe + return left + + def _unary(self) -> np.ndarray: + if self._peek().type == _TokenType.MINUS: + self._advance() + return -self._unary() + return self._primary() + + def _primary(self) -> np.ndarray: + tok = self._peek() + + if tok.type == _TokenType.NUMBER: + self._advance() + return np.float64(tok.value) + + if tok.type == _TokenType.SIGNAL: + self._advance() + return self._resolve_signal(tok.value) + + if tok.type == _TokenType.FUNC: + self._advance() + self._expect(_TokenType.LPAREN) + arg = self._expr() + self._expect(_TokenType.RPAREN) + return self._apply_func(tok.value, arg) + + if tok.type == _TokenType.LPAREN: + self._advance() + result = self._expr() + self._expect(_TokenType.RPAREN) + return result + + raise ValueError(f"Unexpected token: {tok.type.name} ({tok.value!r})") + + def _resolve_signal(self, name: str) -> np.ndarray: + """Look up a signal by name, trying exact match then case-insensitive.""" + if name in self.variables: + return np.real(self.variables[name]) + + name_lower = name.lower() + for key, val in self.variables.items(): + if key.lower() == name_lower: + return np.real(val) + + available = ", ".join(sorted(self.variables.keys())) + raise ValueError(f"Unknown signal {name!r}. Available: {available}") + + @staticmethod + def _apply_func(name: str, arg: np.ndarray) -> np.ndarray: + """Apply a built-in function to a numpy array.""" + a = np.real(arg) + + if name == "abs": + return np.abs(a) + if name == "sqrt": + return np.sqrt(np.maximum(a, 0.0)) + if name == "log10": + return np.log10(np.maximum(np.abs(a), 1e-30)) + if name == "db": + return 20.0 * np.log10(np.maximum(np.abs(a), 1e-30)) + + raise ValueError(f"Unknown function: {name!r}") + + +# -- Public API --------------------------------------------------------------- + + +def evaluate_expression(expression: str, variables: dict[str, np.ndarray]) -> np.ndarray: + """Evaluate a mathematical expression over waveform data. + + Uses a safe recursive-descent parser -- no eval() or exec(). + + Args: + expression: Math expression, e.g. "V(out) * I(R1)" or "dB(V(out))" + variables: Dict mapping signal names to numpy arrays + + Returns: + Resulting numpy array + + Raises: + ValueError: On parse errors or unknown signals + """ + tokens = _tokenize(expression) + parser = _Parser(tokens, variables) + return parser.parse() + + +class WaveformCalculator: + """Evaluate expressions against all signals in a RawFile.""" + + def __init__(self, raw_file: RawFile): + self._raw = raw_file + self._variables: dict[str, np.ndarray] = {} + + for var in raw_file.variables: + self._variables[var.name] = raw_file.data[var.index] + + def calc(self, expression: str) -> np.ndarray: + """Evaluate an expression against the loaded signals. + + Args: + expression: Math expression referencing signal names from the raw file + + Returns: + Resulting numpy array + """ + return evaluate_expression(expression, self._variables) + + def available_signals(self) -> list[str]: + """List all signal names available for expressions.""" + return sorted(self._variables.keys()) diff --git a/src/mcp_ltspice/waveform_math.py b/src/mcp_ltspice/waveform_math.py index d242201..f8f2b92 100644 --- a/src/mcp_ltspice/waveform_math.py +++ b/src/mcp_ltspice/waveform_math.py @@ -3,9 +3,7 @@ import numpy as np -def compute_fft( - time: np.ndarray, signal: np.ndarray, max_harmonics: int = 50 -) -> dict: +def compute_fft(time: np.ndarray, signal: np.ndarray, max_harmonics: int = 50) -> dict: """Compute FFT of a time-domain signal. Args: @@ -73,9 +71,7 @@ def compute_fft( } -def compute_thd( - time: np.ndarray, signal: np.ndarray, n_harmonics: int = 10 -) -> dict: +def compute_thd(time: np.ndarray, signal: np.ndarray, n_harmonics: int = 10) -> dict: """Compute Total Harmonic Distortion. THD is the ratio of harmonic content to the fundamental, expressed @@ -147,14 +143,16 @@ def compute_thd( idx = int(np.argmin(np.abs(freqs - target_freq))) mag = float(magnitudes[idx]) mag_db = 20.0 * np.log10(max(mag, 1e-15)) - harmonic_sum_sq += mag ** 2 + harmonic_sum_sq += mag**2 - harmonics.append({ - "harmonic": h, - "frequency": float(freqs[idx]), - "magnitude": mag, - "magnitude_db": mag_db, - }) + harmonics.append( + { + "harmonic": h, + "frequency": float(freqs[idx]), + "magnitude": mag, + "magnitude_db": mag_db, + } + ) thd_percent = (np.sqrt(harmonic_sum_sq) / fundamental_mag) * 100.0 @@ -179,7 +177,7 @@ def compute_rms(signal: np.ndarray) -> float: return 0.0 real_signal = np.real(signal) - return float(np.sqrt(np.mean(real_signal ** 2))) + return float(np.sqrt(np.mean(real_signal**2))) def compute_peak_to_peak(signal: np.ndarray) -> dict: