Add stability, power, optimization, batch, and schematic generation tools

Phase 3 features bringing the server to 27 tools:
- Stepped/multi-run .raw file parsing (.step, .mc, .temp)
- Stability analysis (gain/phase margin from AC loop gain)
- Power analysis (average, RMS, efficiency, power factor)
- Safe waveform expression evaluator (recursive-descent parser)
- Component value optimizer (binary search + coordinate descent)
- Batch simulation: parameter sweep, temperature sweep, Monte Carlo
- .asc schematic generation from templates (RC filter, divider, inverting amp)
- Touchstone .s1p/.s2p/.snp S-parameter file parsing
- 7 new netlist templates (non-inverting amp, diff amp, common emitter, buck, LDO, Colpitts oscillator, H-bridge)
- Full ruff lint and format compliance across all modules
Ryan Malloy 2026-02-10 23:05:35 -07:00
parent b31ff1cbe4
commit ba649d2a6e
18 changed files with 3493 additions and 247 deletions


@@ -0,0 +1,329 @@
"""Programmatic LTspice .asc schematic file generation.
Generates graphical schematics (the .asc format LTspice uses for its GUI),
not just text netlists. Placed components snap to an 80-pixel grid and are
auto-wired with horizontal left-to-right signal flow.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from pathlib import Path
# LTspice grid spacing -- all coordinates should be multiples of this
GRID = 80
@dataclass
class _WireEntry:
x1: int
y1: int
x2: int
y2: int
@dataclass
class _SymbolEntry:
symbol: str
name: str
value: str
x: int
y: int
rotation: int # 0, 90, 180, 270
windows: list[str] = field(default_factory=list)
@dataclass
class _FlagEntry:
x: int
y: int
name: str
@dataclass
class _TextEntry:
x: int
y: int
content: str
class AscSchematic:
"""Builder for LTspice .asc schematic files.
All ``add_*`` methods return ``self`` for chaining::
sch = (AscSchematic()
.add_component("res", "R1", "1k", 160, 176, rotation=90)
.add_wire(80, 176, 160, 176)
.add_ground(80, 256))
"""
def __init__(self, sheet_w: int = 880, sheet_h: int = 680) -> None:
self._sheet_w = sheet_w
self._sheet_h = sheet_h
self._wires: list[_WireEntry] = []
self._symbols: list[_SymbolEntry] = []
self._flags: list[_FlagEntry] = []
self._texts: list[_TextEntry] = []
# -- builder methods -----------------------------------------------------
def add_component(
self,
symbol: str,
name: str,
value: str,
x: int,
y: int,
rotation: int = 0,
) -> AscSchematic:
"""Place a component symbol.
Args:
symbol: LTspice symbol name (``res``, ``cap``, ``voltage``, etc.)
name: Instance name (``R1``, ``C1``, ``V1``, ...)
value: Component value (``1k``, ``100n``, ``AC 1``, ...)
x: X coordinate (should be on 80-pixel grid)
y: Y coordinate
rotation: 0, 90, 180, or 270 degrees
"""
windows: list[str] = []
# For resistors and inductors placed at R90, shift the WINDOW lines
# so labels sit neatly above/below the body
if rotation == 90 and symbol in ("res", "ind", "ind2"):
windows = [
"WINDOW 0 0 56 VBottom 2",
"WINDOW 3 32 56 VTop 2",
]
self._symbols.append(_SymbolEntry(symbol, name, value, x, y, rotation, windows))
return self
def add_wire(self, x1: int, y1: int, x2: int, y2: int) -> AscSchematic:
"""Add a wire segment between two points."""
self._wires.append(_WireEntry(x1, y1, x2, y2))
return self
def add_ground(self, x: int, y: int) -> AscSchematic:
"""Place a ground flag (net name ``0``)."""
self._flags.append(_FlagEntry(x, y, "0"))
return self
def add_net_label(self, name: str, x: int, y: int) -> AscSchematic:
"""Place a named net label (e.g., ``out``, ``vdd``)."""
self._flags.append(_FlagEntry(x, y, name))
return self
def add_directive(self, text: str, x: int, y: int) -> AscSchematic:
"""Add a SPICE directive (rendered with ``!`` prefix)."""
self._texts.append(_TextEntry(x, y, text))
return self
# -- output --------------------------------------------------------------
def render(self) -> str:
"""Render the schematic to an ``.asc`` format string."""
lines: list[str] = []
lines.append("Version 4")
lines.append(f"SHEET 1 {self._sheet_w} {self._sheet_h}")
for w in self._wires:
lines.append(f"WIRE {w.x1} {w.y1} {w.x2} {w.y2}")
for f in self._flags:
lines.append(f"FLAG {f.x} {f.y} {f.name}")
for s in self._symbols:
lines.append(f"SYMBOL {s.symbol} {s.x} {s.y} R{s.rotation}")
for win in s.windows:
lines.append(win)
lines.append(f"SYMATTR InstName {s.name}")
lines.append(f"SYMATTR Value {s.value}")
for t in self._texts:
lines.append(f"TEXT {t.x} {t.y} Left 2 !{t.content}")
return "\n".join(lines) + "\n"
def save(self, path: Path | str) -> Path:
"""Write the schematic to an ``.asc`` file on disk."""
path = Path(path)
path.write_text(self.render(), encoding="utf-8")
return path
# ============================================================================
# Layout helper functions -- auto-placed, ready-to-simulate schematics
# ============================================================================
def generate_rc_lowpass(r: str = "1k", c: str = "100n") -> AscSchematic:
"""Generate an RC lowpass filter schematic with AC analysis.
Signal flow (left to right)::
V1 --[R1]-- out --+
|
[C1]
|
GND
Args:
r: Resistor value (e.g., ``"1k"``, ``"4.7k"``)
c: Capacitor value (e.g., ``"100n"``, ``"10p"``)
Returns:
An ``AscSchematic`` ready to ``.save()`` or ``.render()``.
"""
# Grid positions (all multiples of GRID=80)
# V1 at x=80, R1 from 160..240, C1 at 304, out label at 304
sch = AscSchematic()
# Wires: V1_p -> R1_in, R1_out -> C1_top
sch.add_wire(80, 176, 160, 176)
sch.add_wire(240, 176, 304, 176)
    # Short vertical wire connecting the out node down to C1's top pin
    sch.add_wire(304, 176, 304, 192)
# Components
sch.add_component("res", "R1", r, 160, 176, rotation=90)
sch.add_component("cap", "C1", c, 288, 192, rotation=0)
sch.add_component("voltage", "V1", "AC 1", 80, 176, rotation=0)
# Ground flags (V1 bottom and C1 bottom share ground)
sch.add_ground(80, 256)
sch.add_ground(304, 256)
# Output net label
sch.add_net_label("out", 304, 176)
# Simulation directive
sch.add_directive(".ac dec 100 1 1meg", 80, 312)
return sch
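# Minimal usage sketch (illustrative only): render() emits the .asc text
# directly, so the helper above can be checked or saved without touching
# LTspice. The values here are arbitrary examples.
def _example_rc_lowpass() -> str:
    sch = generate_rc_lowpass(r="4.7k", c="10n")
    return sch.render()  # begins with "Version 4\nSHEET 1 880 680"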
def generate_voltage_divider(
r1: str = "10k",
r2: str = "10k",
vin: str = "5",
) -> AscSchematic:
"""Generate a voltage divider schematic with operating-point analysis.
Topology::
V1 --[R1]-- out --[R2]-- GND
Args:
r1: Top resistor value
r2: Bottom resistor value
vin: DC input voltage
Returns:
An ``AscSchematic`` ready to ``.save()`` or ``.render()``.
"""
sch = AscSchematic()
# Layout: V1 on left, R1 horizontal, junction = "out", R2 vertical to gnd
# V1 at (80, 176)
# R1 from (160, 176) to (240, 176) -- horizontal (R90)
# R2 from (320, 192) to (320, 272) -- vertical (R0)
# Wires
sch.add_wire(80, 176, 160, 176) # V1_p to R1_left
sch.add_wire(240, 176, 320, 176) # R1_right to R2_top / out node
# Components
sch.add_component("res", "R1", r1, 160, 176, rotation=90)
sch.add_component("res", "R2", r2, 304, 192, rotation=0)
sch.add_component("voltage", "V1", vin, 80, 176, rotation=0)
# Ground
sch.add_ground(80, 256)
sch.add_ground(320, 272)
# Net label
sch.add_net_label("out", 320, 176)
# Directive
sch.add_directive(".op", 80, 312)
return sch
def generate_inverting_amp(
rin: str = "10k",
rf: str = "100k",
opamp_model: str = "UniversalOpamp2",
) -> AscSchematic:
"""Generate an inverting op-amp amplifier schematic.
Topology::
V1 --[Rin]--> inv(-) --[Rf]--> out
non-inv(+) --> GND
Supply: Vpos=+15V, Vneg=-15V
The gain is ``-Rf/Rin``.
Args:
rin: Input resistor value
rf: Feedback resistor value
opamp_model: Op-amp symbol name (default ``UniversalOpamp2``,
the built-in ideal op-amp that needs no external model file)
Returns:
An ``AscSchematic`` ready to ``.save()`` or ``.render()``.
"""
sch = AscSchematic(sheet_w=1200, sheet_h=880)
# Coordinate plan (all on 80-grid):
# V1 source at (80, 320)
# Rin horizontal from (192, 320) rotation=90
# Opamp at (400, 320) -- inv input top-left, non-inv bottom-left, out right
# Rf from (400, 240) across top to output, rotation=90
# Output net at (560, 320)
# Vpos at (480, 160), Vneg at (480, 480)
# -- wires ---------------------------------------------------------------
# V1_p to Rin left
sch.add_wire(80, 320, 192, 320)
# Rin right to opamp inv input
sch.add_wire(272, 320, 400, 320)
# Opamp inv to Rf left (vertical up to Rf row, then horizontal)
sch.add_wire(400, 320, 400, 240)
sch.add_wire(400, 240, 416, 240)
# Rf right to output
sch.add_wire(496, 240, 560, 240)
# Output down to opamp output level
sch.add_wire(560, 240, 560, 336)
# Opamp output to out node
sch.add_wire(496, 336, 560, 336)
# Opamp non-inv to ground
sch.add_wire(400, 352, 400, 400)
# Supply wires
sch.add_wire(448, 288, 448, 240)
sch.add_wire(448, 240, 480, 240)
sch.add_wire(448, 368, 448, 416)
sch.add_wire(448, 416, 480, 416)
# -- components ----------------------------------------------------------
sch.add_component("voltage", "V1", "AC 1", 80, 320, rotation=0)
sch.add_component("res", "Rin", rin, 192, 320, rotation=90)
sch.add_component("res", "Rf", rf, 416, 240, rotation=90)
sch.add_component(f"Opamps\\\\{opamp_model}", "U1", opamp_model, 400, 304, rotation=0)
sch.add_component("voltage", "Vpos", "15", 480, 160, rotation=0)
sch.add_component("voltage", "Vneg", "15", 480, 416, rotation=0)
# -- ground & labels -----------------------------------------------------
sch.add_ground(80, 400)
sch.add_ground(400, 400)
sch.add_ground(480, 240)
sch.add_ground(480, 496)
sch.add_net_label("out", 560, 336)
sch.add_net_label("inv", 400, 320)
# -- directives ----------------------------------------------------------
sch.add_directive(".ac dec 100 1 1meg", 80, 544)
return sch

src/mcp_ltspice/batch.py

@@ -0,0 +1,324 @@
"""Batch simulation runner for parameter sweeps and Monte Carlo analysis.
Runs multiple LTspice simulations sequentially (LTspice under Wine is
single-instance) and collects results into a single BatchResult.
"""
from __future__ import annotations
import random
import re
import tempfile
import time
from dataclasses import dataclass, field
from pathlib import Path
from .runner import SimulationResult, run_netlist
@dataclass
class BatchResult:
"""Aggregated results from a batch of simulations."""
results: list[SimulationResult] = field(default_factory=list)
parameter_values: list[dict] = field(default_factory=list)
total_elapsed: float = 0.0
@property
def success_count(self) -> int:
return sum(1 for r in self.results if r.success)
@property
def failure_count(self) -> int:
return sum(1 for r in self.results if not r.success)
async def run_batch(
netlists: list[tuple[str, str]],
timeout: float = 300,
) -> BatchResult:
"""Run a list of netlists sequentially and collect results.
Args:
netlists: List of ``(name, netlist_text)`` pairs. Each netlist
is written to a temp ``.cir`` file and simulated.
timeout: Per-simulation timeout in seconds.
Returns:
BatchResult with one SimulationResult per netlist.
"""
batch = BatchResult()
start = time.monotonic()
for name, text in netlists:
safe = _safe_filename(name)
work_dir = Path(tempfile.mkdtemp(prefix=f"ltbatch_{safe}_"))
cir_path = work_dir / f"{safe}.cir"
cir_path.write_text(text, encoding="utf-8")
result = await run_netlist(cir_path, timeout=timeout, work_dir=work_dir)
batch.results.append(result)
batch.parameter_values.append({"name": name})
batch.total_elapsed = time.monotonic() - start
return batch
async def run_parameter_sweep(
netlist_template: str,
param_name: str,
values: list[float],
timeout: float = 300,
) -> BatchResult:
"""Sweep a single parameter across a range of values.
The *netlist_template* should contain a ``.param`` directive for
*param_name* whose value will be replaced for each run. If no
``.param`` line is found, one is inserted before the ``.end``
directive.
Args:
netlist_template: Netlist text with a ``.param {param_name}=...`` line.
param_name: Parameter to sweep (e.g., ``"Rval"``).
values: List of numeric values to substitute.
timeout: Per-simulation timeout in seconds.
Returns:
BatchResult indexed by parameter value.
"""
batch = BatchResult()
start = time.monotonic()
param_re = re.compile(
rf"(\.param\s+{re.escape(param_name)}\s*=\s*)(\S+)",
re.IGNORECASE,
)
for val in values:
val_str = _format_value(val)
if param_re.search(netlist_template):
text = param_re.sub(rf"\g<1>{val_str}", netlist_template)
else:
# Insert a .param line before .end
text = _insert_before_end(netlist_template, f".param {param_name}={val_str}")
safe = f"sweep_{param_name}_{val_str}"
work_dir = Path(tempfile.mkdtemp(prefix=f"ltbatch_{_safe_filename(safe)}_"))
cir_path = work_dir / f"{_safe_filename(safe)}.cir"
cir_path.write_text(text, encoding="utf-8")
result = await run_netlist(cir_path, timeout=timeout, work_dir=work_dir)
batch.results.append(result)
batch.parameter_values.append({param_name: val})
batch.total_elapsed = time.monotonic() - start
return batch
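# Usage sketch (illustrative): sweep a {Rval}-parameterized resistor across
# three decades. The divider netlist here is only an example; any netlist
# containing a ".param Rval=..." line works the same way.
async def _example_parameter_sweep() -> BatchResult:
    template = (
        "* resistive divider\n"
        "V1 in 0 5\n"
        "R1 in out {Rval}\n"
        "R2 out 0 10k\n"
        ".param Rval=10k\n"
        ".op\n"
        ".end\n"
    )
    return await run_parameter_sweep(template, "Rval", [1e3, 1e4, 1e5])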
async def run_temperature_sweep(
netlist_text: str,
temperatures: list[float],
timeout: float = 300,
) -> BatchResult:
"""Run the same netlist at different temperatures.
A ``.temp`` directive is added (or replaced) for each run.
Args:
netlist_text: Base netlist text.
temperatures: List of temperatures in degrees C.
timeout: Per-simulation timeout in seconds.
Returns:
BatchResult indexed by temperature.
"""
batch = BatchResult()
start = time.monotonic()
temp_re = re.compile(r"\.temp\s+\S+", re.IGNORECASE)
for temp in temperatures:
temp_str = _format_value(temp)
if temp_re.search(netlist_text):
text = temp_re.sub(f".temp {temp_str}", netlist_text)
else:
text = _insert_before_end(netlist_text, f".temp {temp_str}")
safe = f"temp_{temp_str}"
work_dir = Path(tempfile.mkdtemp(prefix=f"ltbatch_{_safe_filename(safe)}_"))
cir_path = work_dir / f"{_safe_filename(safe)}.cir"
cir_path.write_text(text, encoding="utf-8")
result = await run_netlist(cir_path, timeout=timeout, work_dir=work_dir)
batch.results.append(result)
batch.parameter_values.append({"temperature": temp})
batch.total_elapsed = time.monotonic() - start
return batch
async def run_monte_carlo(
netlist_text: str,
n_runs: int,
tolerances: dict[str, float],
timeout: float = 300,
seed: int | None = None,
) -> BatchResult:
"""Monte Carlo analysis with component tolerances.
For each run, every component listed in *tolerances* has its value
    randomly varied using a normal distribution clamped to +/-3 sigma,
    where sigma is the tolerance fraction times the nominal value.
Args:
netlist_text: Base netlist text.
n_runs: Number of Monte Carlo iterations.
tolerances: Mapping of component name to tolerance fraction
(e.g., ``{"R1": 0.05}`` for 5%).
timeout: Per-simulation timeout in seconds.
seed: Optional RNG seed for reproducibility.
Returns:
BatchResult with per-run component values in ``parameter_values``.
"""
rng = random.Random(seed)
batch = BatchResult()
start = time.monotonic()
# Pre-extract nominal values from the netlist
nominals = _extract_component_values(netlist_text, list(tolerances.keys()))
for run_idx in range(n_runs):
text = netlist_text
params: dict[str, float] = {}
for comp_name, tol_frac in tolerances.items():
nominal = nominals.get(comp_name)
if nominal is None:
continue
# Normal distribution, clamp to +/-3sigma
sigma = nominal * tol_frac
deviation = rng.gauss(0, sigma)
deviation = max(-3 * sigma, min(3 * sigma, deviation))
varied = nominal + deviation
params[comp_name] = varied
# Replace the component value in the netlist text
text = _replace_component_value(text, comp_name, _format_value(varied))
safe = f"mc_{run_idx:04d}"
work_dir = Path(tempfile.mkdtemp(prefix=f"ltbatch_{safe}_"))
cir_path = work_dir / f"{safe}.cir"
cir_path.write_text(text, encoding="utf-8")
result = await run_netlist(cir_path, timeout=timeout, work_dir=work_dir)
batch.results.append(result)
batch.parameter_values.append(params)
batch.total_elapsed = time.monotonic() - start
return batch
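# Usage sketch (illustrative): 200 runs with 5% resistors and a 10% capacitor,
# seeded for reproducibility. The component names are assumptions and must
# match instance names present in the supplied netlist.
async def _example_monte_carlo(netlist_text: str) -> BatchResult:
    return await run_monte_carlo(
        netlist_text,
        n_runs=200,
        tolerances={"R1": 0.05, "R2": 0.05, "C1": 0.10},
        seed=42,
    )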
# ============================================================================
# Internal helpers
# ============================================================================
# SPICE engineering suffixes
_SUFFIX_MAP = {
"T": 1e12,
"G": 1e9,
"MEG": 1e6,
"K": 1e3,
"M": 1e-3,
"U": 1e-6,
"N": 1e-9,
"P": 1e-12,
"F": 1e-15,
}
def _parse_spice_value(s: str) -> float | None:
"""Parse a SPICE value string like ``10k``, ``100n``, ``4.7meg`` to float."""
s = s.strip().upper()
if not s:
return None
# Try plain float first
try:
return float(s)
except ValueError:
pass
# Try suffixed form
for suffix, mult in sorted(_SUFFIX_MAP.items(), key=lambda x: -len(x[0])):
if s.endswith(suffix):
num_part = s[: -len(suffix)]
try:
return float(num_part) * mult
except ValueError:
continue
return None
def _format_value(v: float) -> str:
    """Format a float for SPICE, using engineering suffixes where tidy.

    Examples: 4700 -> "4.7k", 2.5e-8 -> "25n", 25 -> "25".
    """
    if v == 0:
        return "0"
    abs_v = abs(v)
    # Values in [1, 1000) need no suffix at all (avoids e.g. 25 -> "25000m")
    if 1 <= abs_v < 1000:
        return f"{v:g}"
    # Walk the suffix table from large to small
    for suffix, mult in sorted(_SUFFIX_MAP.items(), key=lambda x: -x[1]):
        if abs_v >= mult * 0.999:
            return f"{v / mult:g}{suffix.lower()}"
    return f"{v:g}"
def _safe_filename(name: str) -> str:
"""Turn an arbitrary name into a safe filename fragment."""
return re.sub(r"[^\w\-.]", "_", name)[:60]
def _insert_before_end(netlist: str, directive: str) -> str:
"""Insert a directive line just before ``.end``."""
end_re = re.compile(r"^(\.end\b)", re.IGNORECASE | re.MULTILINE)
if end_re.search(netlist):
return end_re.sub(f"{directive}\n\\1", netlist)
# No .end found -- append
return netlist.rstrip() + f"\n{directive}\n.end\n"
def _extract_component_values(netlist: str, names: list[str]) -> dict[str, float]:
"""Extract numeric component values from a netlist by instance name.
Looks for lines like ``R1 in out 10k`` and parses the value token.
"""
result: dict[str, float] = {}
for name in names:
pattern = re.compile(
rf"^\s*{re.escape(name)}\s+\S+\s+\S+\s+(\S+)",
re.IGNORECASE | re.MULTILINE,
)
match = pattern.search(netlist)
if match:
parsed = _parse_spice_value(match.group(1))
if parsed is not None:
result[name] = parsed
return result
def _replace_component_value(netlist: str, comp_name: str, new_value: str) -> str:
"""Replace a component's value token in the netlist text."""
pattern = re.compile(
rf"^(\s*{re.escape(comp_name)}\s+\S+\s+\S+\s+)\S+",
re.IGNORECASE | re.MULTILINE,
)
return pattern.sub(rf"\g<1>{new_value}", netlist, count=1)
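# For example, _replace_component_value("R1 in out 10k", "R1", "12k") rewrites
# that line to "R1 in out 12k" and leaves every other line of the netlist alone.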


@@ -4,10 +4,9 @@ import os
 from pathlib import Path
 
 # LTspice installation paths
-LTSPICE_DIR = Path(os.environ.get(
-    "LTSPICE_DIR",
-    Path.home() / "claude" / "ltspice" / "extracted" / "ltspice"
-))
+LTSPICE_DIR = Path(
+    os.environ.get("LTSPICE_DIR", Path.home() / "claude" / "ltspice" / "extracted" / "ltspice")
+)
 LTSPICE_EXE = LTSPICE_DIR / "LTspice.exe"
 LTSPICE_LIB = LTSPICE_DIR / "lib"


@@ -3,12 +3,13 @@
 from dataclasses import dataclass, field
 from pathlib import Path
 
-from .schematic import parse_schematic, Schematic, Component
+from .schematic import Component, Schematic, parse_schematic
 
 
 @dataclass
 class ComponentChange:
     """A change to a component."""
+
     name: str
     change_type: str  # "added", "removed", "modified"
     symbol: str = ""
@@ -22,6 +23,7 @@ class ComponentChange:
 @dataclass
 class DirectiveChange:
     """A change to a SPICE directive."""
+
     change_type: str  # "added", "removed", "modified"
     old_text: str | None = None
     new_text: str | None = None
@@ -30,6 +32,7 @@ class DirectiveChange:
 @dataclass
 class SchematicDiff:
     """Complete diff between two schematics."""
+
     component_changes: list[ComponentChange] = field(default_factory=list)
     directive_changes: list[DirectiveChange] = field(default_factory=list)
     nets_added: list[str] = field(default_factory=list)
@@ -61,9 +64,7 @@
         modified = [c for c in self.component_changes if c.change_type == "modified"]
         if modified:
-            lines.append(
-                f"{len(modified)} component{'s' if len(modified) != 1 else ''} modified:"
-            )
+            lines.append(f"{len(modified)} component{'s' if len(modified) != 1 else ''} modified:")
             for c in modified:
                 parts: list[str] = []
                 if c.old_value != c.new_value:
@@ -223,9 +224,7 @@ def _component_map(schematic: Schematic) -> dict[str, Component]:
     return {comp.name: comp for comp in schematic.components}
 
 
-def _diff_components(
-    schema_a: Schematic, schema_b: Schematic
-) -> list[ComponentChange]:
+def _diff_components(schema_a: Schematic, schema_b: Schematic) -> list[ComponentChange]:
     """Compare components between two schematics."""
     map_a = _component_map(schema_a)
     map_b = _component_map(schema_b)
@@ -269,9 +268,7 @@ def _diff_components(
         moved = (comp_a.x, comp_a.y) != (comp_b.x, comp_b.y)
         value_changed = comp_a.value != comp_b.value
         attrs_changed = comp_a.attributes != comp_b.attributes
-        rotation_changed = (
-            comp_a.rotation != comp_b.rotation or comp_a.mirror != comp_b.mirror
-        )
+        rotation_changed = comp_a.rotation != comp_b.rotation or comp_a.mirror != comp_b.mirror
 
         if moved or value_changed or attrs_changed or rotation_changed:
             changes.append(
@@ -290,9 +287,7 @@ def _diff_components(
     return changes
 
 
-def _diff_directives(
-    schema_a: Schematic, schema_b: Schematic
-) -> list[DirectiveChange]:
+def _diff_directives(schema_a: Schematic, schema_b: Schematic) -> list[DirectiveChange]:
     """Compare SPICE directives between two schematics."""
     directives_a = [t.content for t in schema_a.texts if t.type == "spice"]
     directives_b = [t.content for t in schema_b.texts if t.type == "spice"]
@@ -308,15 +303,11 @@ def _diff_directives(
     # Removed directives
     for key in sorted(keys_a - keys_b):
-        changes.append(
-            DirectiveChange(change_type="removed", old_text=norm_a[key])
-        )
+        changes.append(DirectiveChange(change_type="removed", old_text=norm_a[key]))
 
     # Added directives
     for key in sorted(keys_b - keys_a):
-        changes.append(
-            DirectiveChange(change_type="added", new_text=norm_b[key])
-        )
+        changes.append(DirectiveChange(change_type="added", new_text=norm_b[key]))
 
     # For modified detection: directives that share a command keyword but differ.
     # We match by the first token (e.g., ".tran", ".ac") to detect modifications
@@ -367,9 +358,7 @@ def _diff_directives(
     return final_changes
 
 
-def _diff_nets(
-    schema_a: Schematic, schema_b: Schematic
-) -> tuple[list[str], list[str]]:
+def _diff_nets(schema_a: Schematic, schema_b: Schematic) -> tuple[list[str], list[str]]:
     """Compare net flags between two schematics.
 
     Returns:
@@ -383,9 +372,7 @@ def _diff_nets(
     return added, removed
 
 
-def _diff_wires(
-    schema_a: Schematic, schema_b: Schematic
-) -> tuple[int, int]:
+def _diff_wires(schema_a: Schematic, schema_b: Schematic) -> tuple[int, int]:
     """Compare wires between two schematics using set operations.
 
     Returns:


@@ -63,10 +63,7 @@ class DRCResult:
             parts.append(f"{info_count} info")
 
         status = "FAILED" if err_count else "passed with warnings"
-        return (
-            f"DRC {status}: {self.checks_run} checks run, "
-            f"{', '.join(parts)}."
-        )
+        return f"DRC {status}: {self.checks_run} checks run, {', '.join(parts)}."
 
     def to_dict(self) -> dict:
         """Convert to JSON-serializable dict."""
@@ -122,8 +119,7 @@ def _check_ground(sch: Schematic, result: DRCResult) -> None:
                 rule="NO_GROUND",
                 severity=Severity.ERROR,
                 message=(
-                    "No ground node found. Every circuit needs at least "
-                    "one ground (0) connection."
+                    "No ground node found. Every circuit needs at least one ground (0) connection."
                 ),
             )
         )
@@ -173,18 +169,13 @@ def _check_simulation_directive(sch: Schematic, result: DRCResult) -> None:
     result.checks_run += 1
     directives = sch.get_spice_directives()
     sim_types = [".tran", ".ac", ".dc", ".op", ".noise", ".tf"]
-    has_sim = any(
-        any(d.lower().startswith(s) for s in sim_types) for d in directives
-    )
+    has_sim = any(any(d.lower().startswith(s) for s in sim_types) for d in directives)
 
     if not has_sim:
         result.violations.append(
             DRCViolation(
                 rule="NO_SIM_DIRECTIVE",
                 severity=Severity.ERROR,
-                message=(
-                    "No simulation directive found. "
-                    "Add .tran, .ac, .dc, .op, etc."
-                ),
+                message=("No simulation directive found. Add .tran, .ac, .dc, .op, etc."),
             )
         )
@@ -233,11 +224,7 @@ def _check_voltage_source_loops(sch: Schematic, result: DRCResult) -> None:
     # offset along the component axis. Standard pin spacing is 64 units
     # vertically for a non-rotated voltage source (pin+ at top, pin- at
     # bottom relative to the symbol origin).
-    voltage_sources = [
-        c
-        for c in sch.components
-        if "voltage" in c.symbol.lower()
-    ]
+    voltage_sources = [c for c in sch.components if "voltage" in c.symbol.lower()]
 
     if len(voltage_sources) < 2:
         return
@@ -246,21 +233,21 @@ def _check_voltage_source_loops(sch: Schematic, result: DRCResult) -> None:
     # Default (R0): positive pin at (x, y-16), negative at (x, y+16)
     # We use a coarse offset; the exact value depends on the symbol but
     # 16 is a common half-pin-spacing in LTspice grid units.
-    PIN_OFFSET = 16
+    pin_offset = 16
 
     def _pin_positions(comp):
         """Return approximate (positive_pin, negative_pin) coordinates."""
         x, y = comp.x, comp.y
         rot = comp.rotation
         if rot == 0:
-            return (x, y - PIN_OFFSET), (x, y + PIN_OFFSET)
+            return (x, y - pin_offset), (x, y + pin_offset)
         elif rot == 90:
-            return (x + PIN_OFFSET, y), (x - PIN_OFFSET, y)
+            return (x + pin_offset, y), (x - pin_offset, y)
         elif rot == 180:
-            return (x, y + PIN_OFFSET), (x, y - PIN_OFFSET)
+            return (x, y + pin_offset), (x, y - pin_offset)
         elif rot == 270:
-            return (x - PIN_OFFSET, y), (x + PIN_OFFSET, y)
+            return (x - pin_offset, y), (x + pin_offset, y)
-        return (x, y - PIN_OFFSET), (x, y + PIN_OFFSET)
+        return (x, y - pin_offset), (x, y + pin_offset)
 
     def _nearest_net(pin: tuple[int, int]) -> tuple[int, int]:
         """Find the nearest wire/flag point to a pin and return its net root.
@@ -350,9 +337,7 @@ def _check_component_values(sch: Schematic, result: DRCResult) -> None:
                 DRCViolation(
                     rule="MISSING_VALUE",
                     severity=Severity.WARNING,
-                    message=(
-                        f"{matched_type} '{comp.name}' has no value set."
-                    ),
+                    message=(f"{matched_type} '{comp.name}' has no value set."),
                     component=comp.name,
                     location=(comp.x, comp.y),
                 )
@@ -393,7 +378,7 @@ def _check_unconnected_components(sch: Schematic, result: DRCResult) -> None:
     """
     result.checks_run += 1
 
-    PROXIMITY = 16  # LTspice grid spacing
+    proximity = 16  # LTspice grid spacing
 
     # Collect all wire endpoints into a set for fast lookup
     wire_points: set[tuple[int, int]] = set()
@@ -412,14 +397,14 @@ def _check_unconnected_components(sch: Schematic, result: DRCResult) -> None:
         if not comp.name:
            continue
 
-        # Check if any connection point is within PROXIMITY of the
+        # Check if any connection point is within proximity of the
        # component origin. We scan a small bounding box rather than
        # iterating all points.
        connected = False
        for pt in all_connection_points:
            dx = abs(pt[0] - comp.x)
            dy = abs(pt[1] - comp.y)
-            if dx <= PROXIMITY and dy <= PROXIMITY:
+            if dx <= proximity and dy <= proximity:
                connected = True
                break


@@ -1,8 +1,8 @@
 """Parse LTspice simulation log files."""
 
+import re
 from dataclasses import dataclass, field
 from pathlib import Path
-import re
 
 
 @dataclass
@@ -124,9 +124,7 @@ def parse_log(path: Path | str) -> SimulationLog:
             # Measurement: failed.
             m = _MEAS_FAILED_RE.match(stripped)
             if m:
-                log.measurements.append(
-                    Measurement(name=m.group("name"), value=None, failed=True)
-                )
+                log.measurements.append(Measurement(name=m.group("name"), value=None, failed=True))
                 continue
 
             # Measurement: numeric value.


@@ -1,18 +1,24 @@
 """Search and parse LTspice SPICE model libraries."""
 
+import re
 from dataclasses import dataclass, field
 from pathlib import Path
-import re
 
 from .config import LTSPICE_LIB
 
 # Known SPICE model types and their categories
-_DISCRETE_TYPES = frozenset({
-    "NPN", "PNP",  # BJTs
-    "NMOS", "PMOS", "VDMOS",  # MOSFETs
-    "D",  # Diodes
-    "NJF", "PJF",  # JFETs
-})
+_DISCRETE_TYPES = frozenset(
+    {
+        "NPN",
+        "PNP",  # BJTs
+        "NMOS",
+        "PMOS",
+        "VDMOS",  # MOSFETs
+        "D",  # Diodes
+        "NJF",
+        "PJF",  # JFETs
+    }
+)
 
 # Module-level cache
 _cache: tuple[list["SpiceModel"], list["SpiceSubcircuit"]] | None = None
@@ -21,8 +27,9 @@ _cache: tuple[list["SpiceModel"], list["SpiceSubcircuit"]] | None = None
 @dataclass
 class SpiceModel:
     """A .model definition."""
+
     name: str  # e.g., "2N2222"
     type: str  # e.g., "NPN", "D", "NMOS", "PMOS", "PNP"
     parameters: dict[str, str] = field(default_factory=dict)
     source_file: str = ""
@@ -30,7 +37,8 @@ class SpiceModel:
 @dataclass
 class SpiceSubcircuit:
     """A .subckt definition."""
+
     name: str  # e.g., "LT1001"
     pins: list[str] = field(default_factory=list)
     pin_names: list[str] = field(default_factory=list)  # From comments
     description: str = ""
@@ -145,19 +153,19 @@ def _read_file_text(path: Path) -> str:
 
 # Regex patterns compiled once
 _MODEL_RE = re.compile(
     r"^\s*\.model\s+"  # .model keyword
     r"(\S+)\s+"  # model name
     r"(?:ako:\S+\s+)?"  # optional ako:reference
     r"(\w+)"  # type (NPN, D, VDMOS, etc.)
     r"(?:\s*\(([^)]*)\))?"  # optional (params)
     r"(.*)",  # trailing params outside parens
     re.IGNORECASE,
 )
 
 _SUBCKT_RE = re.compile(
     r"^\s*\.subckt\s+"
     r"(\S+)"  # subcircuit name
     r"((?:\s+\S+)*)",  # pins (space-separated)
     re.IGNORECASE,
 )
@@ -247,12 +255,14 @@ def _scan_lib_file(path: Path) -> tuple[list[SpiceModel], list[SpiceSubcircuit]]
             param_str = (model_match.group(3) or "") + " " + (model_match.group(4) or "")
             params = _parse_params(param_str)
 
-            models.append(SpiceModel(
-                name=name,
-                type=mtype,
-                parameters=params,
-                source_file=source,
-            ))
+            models.append(
+                SpiceModel(
+                    name=name,
+                    type=mtype,
+                    parameters=params,
+                    source_file=source,
+                )
+            )
             continue
 
         # Check for .subckt


@@ -35,22 +35,14 @@ class Netlist:
 
     # -- Passive components ---------------------------------------------------
 
-    def add_resistor(
-        self, name: str, node_p: str, node_n: str, value: str
-    ) -> "Netlist":
+    def add_resistor(self, name: str, node_p: str, node_n: str, value: str) -> "Netlist":
         """Add a resistor. Example: add_resistor('R1', 'in', 'out', '10k')"""
-        self.components.append(
-            NetlistComponent(name=name, nodes=[node_p, node_n], value=value)
-        )
+        self.components.append(NetlistComponent(name=name, nodes=[node_p, node_n], value=value))
         return self
 
-    def add_capacitor(
-        self, name: str, node_p: str, node_n: str, value: str
-    ) -> "Netlist":
+    def add_capacitor(self, name: str, node_p: str, node_n: str, value: str) -> "Netlist":
         """Add a capacitor."""
-        self.components.append(
-            NetlistComponent(name=name, nodes=[node_p, node_n], value=value)
-        )
+        self.components.append(NetlistComponent(name=name, nodes=[node_p, node_n], value=value))
         return self
 
     def add_inductor(
@@ -64,9 +56,7 @@ class Netlist:
         """Add an inductor with optional series resistance (Rser)."""
         params = f"Rser={series_resistance}" if series_resistance else ""
         self.components.append(
-            NetlistComponent(
-                name=name, nodes=[node_p, node_n], value=value, params=params
-            )
+            NetlistComponent(name=name, nodes=[node_p, node_n], value=value, params=params)
         )
         return self
 
@@ -94,9 +84,7 @@ class Netlist:
             sin: (Voffset, Vamp, Freq, Td, Theta, Phi)
         """
         value = self._build_source_value(dc=dc, ac=ac, pulse=pulse, sin=sin)
-        self.components.append(
-            NetlistComponent(name=name, nodes=[node_p, node_n], value=value)
-        )
+        self.components.append(NetlistComponent(name=name, nodes=[node_p, node_n], value=value))
         return self
 
     def add_current_source(
@@ -109,20 +97,14 @@ class Netlist:
     ) -> "Netlist":
         """Add a current source."""
         value = self._build_source_value(dc=dc, ac=ac)
-        self.components.append(
-            NetlistComponent(name=name, nodes=[node_p, node_n], value=value)
-        )
+        self.components.append(NetlistComponent(name=name, nodes=[node_p, node_n], value=value))
         return self
 
     # -- Semiconductors -------------------------------------------------------
 
-    def add_diode(
-        self, name: str, anode: str, cathode: str, model: str
-    ) -> "Netlist":
+    def add_diode(self, name: str, anode: str, cathode: str, model: str) -> "Netlist":
         """Add a diode. Example: add_diode('D1', 'a', 'k', '1N4148')"""
-        self.components.append(
-            NetlistComponent(name=name, nodes=[anode, cathode], value=model)
-        )
+        self.components.append(NetlistComponent(name=name, nodes=[anode, cathode], value=model))
         return self
 
     def add_mosfet(
@@ -134,14 +116,14 @@ class Netlist:
         body: str,
         model: str,
         w: str | None = None,
-        l: str | None = None,
+        length: str | None = None,
     ) -> "Netlist":
         """Add a MOSFET."""
         params_parts: list[str] = []
         if w:
             params_parts.append(f"W={w}")
-        if l:
-            params_parts.append(f"L={l}")
+        if length:
+            params_parts.append(f"L={length}")
         params = " ".join(params_parts)
         self.components.append(
             NetlistComponent(
@@ -153,14 +135,10 @@ class Netlist:
         )
         return self
 
-    def add_bjt(
-        self, name: str, collector: str, base: str, emitter: str, model: str
-    ) -> "Netlist":
+    def add_bjt(self, name: str, collector: str, base: str, emitter: str, model: str) -> "Netlist":
         """Add a BJT transistor."""
         self.components.append(
-            NetlistComponent(
-                name=name, nodes=[collector, base, emitter], value=model
-            )
+            NetlistComponent(name=name, nodes=[collector, base, emitter], value=model)
         )
         return self
 
@@ -182,31 +160,21 @@ class Netlist:
             X<name> <inp> <inn> <vpos> <vneg> <out> <model>
         """
         self.components.append(
-            NetlistComponent(
-                name=name, nodes=[inp, inn, vpos, vneg, out], value=model
-            )
+            NetlistComponent(name=name, nodes=[inp, inn, vpos, vneg, out], value=model)
        )
         return self
 
-    def add_subcircuit(
-        self, name: str, nodes: list[str], model: str
-    ) -> "Netlist":
+    def add_subcircuit(self, name: str, nodes: list[str], model: str) -> "Netlist":
         """Add a generic subcircuit instance."""
-        self.components.append(
-            NetlistComponent(name=name, nodes=list(nodes), value=model)
-        )
+        self.components.append(NetlistComponent(name=name, nodes=list(nodes), value=model))
         return self
 
     # -- Generic component ----------------------------------------------------
 
-    def add_component(
-        self, name: str, nodes: list[str], value: str, params: str = ""
-    ) -> "Netlist":
+    def add_component(self, name: str, nodes: list[str], value: str, params: str = "") -> "Netlist":
         """Add any component with explicit nodes."""
         self.components.append(
-            NetlistComponent(
-                name=name, nodes=list(nodes), value=value, params=params
-            )
+            NetlistComponent(name=name, nodes=list(nodes), value=value, params=params)
         )
         return self
 
@@ -381,3 +349,326 @@ def inverting_amplifier(
         .add_opamp("X1", "0", "inv", "out", "vdd", "vss", opamp_model)
         .add_directive(".ac dec 100 1 1meg")
     )
def non_inverting_amplifier(
r_in: str = "10k",
r_f: str = "100k",
opamp_model: str = "LT1001",
) -> Netlist:
"""Create a non-inverting op-amp amplifier.
Topology:
V1 --> non-inv(+)
inv(-) --[R_in]--> GND
inv(-) --[R_f]--> out
Supply: +/-15V
Gain = 1 + R_f / R_in
"""
return (
Netlist("Non-Inverting Amplifier")
.add_comment(f"Gain = 1 + {r_f}/{r_in}")
.add_lib(opamp_model)
.add_voltage_source("V1", "in", "0", ac="1")
.add_voltage_source("Vpos", "vdd", "0", dc="15")
.add_voltage_source("Vneg", "0", "vss", dc="15")
.add_resistor("Rin", "inv", "0", r_in)
.add_resistor("Rf", "inv", "out", r_f)
.add_opamp("X1", "in", "inv", "out", "vdd", "vss", opamp_model)
.add_directive(".ac dec 100 1 1meg")
)
def differential_amplifier(
r1: str = "10k",
r2: str = "10k",
r3: str = "10k",
r4: str = "10k",
opamp_model: str = "LT1001",
) -> Netlist:
"""Create a classic differential amplifier.
Topology:
V1 --[R1]--> inv(-) --[R2]--> out
V2 --[R3]--> non-inv(+)
non-inv(+) --[R4]--> GND
Supply: +/-15V
Vout = (R2/R1) * (V2 - V1) when R2/R1 = R4/R3
"""
return (
Netlist("Differential Amplifier")
.add_comment(f"Vout = ({r2}/{r1}) * (V2 - V1) when R2/R1 = R4/R3")
.add_lib(opamp_model)
.add_voltage_source("V1", "in1", "0", ac="1")
.add_voltage_source("V2", "in2", "0", ac="1")
.add_voltage_source("Vpos", "vdd", "0", dc="15")
.add_voltage_source("Vneg", "0", "vss", dc="15")
.add_resistor("R1", "in1", "inv", r1)
.add_resistor("R2", "inv", "out", r2)
.add_resistor("R3", "in2", "noninv", r3)
.add_resistor("R4", "noninv", "0", r4)
.add_opamp("X1", "noninv", "inv", "out", "vdd", "vss", opamp_model)
.add_directive(".ac dec 100 1 1meg")
)
def common_emitter_amplifier(
rc: str = "2.2k",
rb1: str = "56k",
rb2: str = "12k",
re: str = "1k",
cc1: str = "10u",
cc2: str = "10u",
ce: str = "47u",
vcc: str = "12",
bjt_model: str = "2N2222",
) -> Netlist:
"""Create a common-emitter amplifier with voltage divider bias.
Topology:
VCC --[RC]--> collector --> [CC2] --> out
VCC --[RB1]--> base
base --[RB2]--> GND
emitter --[RE]--> GND
emitter --[CE]--> GND (bypass cap)
in --[CC1]--> base (input coupling cap)
"""
return (
Netlist("Common Emitter Amplifier")
.add_comment("Voltage divider bias with emitter bypass cap")
.add_lib(bjt_model)
.add_voltage_source("Vcc", "vcc", "0", dc=vcc)
.add_voltage_source("Vin", "in", "0", ac="1", sin=("0", "10m", "1k"))
.add_resistor("RC", "vcc", "collector", rc)
.add_resistor("RB1", "vcc", "base", rb1)
.add_resistor("RB2", "base", "0", rb2)
.add_resistor("RE", "emitter", "0", re)
.add_capacitor("CC1", "in", "base", cc1)
.add_capacitor("CC2", "collector", "out", cc2)
.add_capacitor("CE", "emitter", "0", ce)
.add_bjt("Q1", "collector", "base", "emitter", bjt_model)
.add_directive(".tran 5m")
.add_directive(".ac dec 100 10 10meg")
)
def buck_converter(
ind: str = "10u",
c_out: str = "100u",
r_load: str = "10",
v_in: str = "12",
duty_cycle: float = 0.5,
freq: str = "100k",
mosfet_model: str = "IRF540N",
diode_model: str = "1N5819",
) -> Netlist:
"""Create a buck (step-down) converter.
Topology:
V_in --> MOSFET (high-side switch) --> sw node
sw --> [L] --> out
sw --> Diode (cathode) --> GND (freewheeling)
out --> [C_out] --> GND
out --> [R_load] --> GND
Gate driven by PULSE source at specified frequency and duty cycle.
"""
# Compute timing from duty cycle and frequency
# freq string may use SPICE suffixes; keep it as-is for the netlist.
# For the PULSE source we need numeric period and on-time.
freq_hz = _parse_spice_value(freq)
period = 1.0 / freq_hz
t_on = period * duty_cycle
t_rise = period * 0.01 # 1% of period
t_fall = t_rise
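    # Worked example: freq="100k" and duty_cycle=0.5 give period = 10us,
    # t_on = 5us, and t_rise = t_fall = 100ns.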
return (
Netlist("Buck Converter")
.add_comment(f"Duty cycle = {duty_cycle:.0%}, Fsw = {freq}")
.add_lib(mosfet_model)
.add_lib(diode_model)
.add_voltage_source("Vin", "vin", "0", dc=v_in)
.add_voltage_source(
"Vgate",
"gate",
"0",
pulse=(
"0",
v_in,
"0",
f"{t_rise:.4g}",
f"{t_fall:.4g}",
f"{t_on:.4g}",
f"{period:.4g}",
),
)
.add_mosfet("M1", "vin", "gate", "sw", "sw", mosfet_model)
.add_diode("D1", "0", "sw", diode_model)
.add_inductor("L1", "sw", "out", ind)
.add_capacitor("Cout", "out", "0", c_out)
.add_resistor("Rload", "out", "0", r_load)
.add_directive(f".tran {period * 200:.4g}")
)
def ldo_regulator(
opamp_model: str = "LT1001",
r1: str = "10k",
r2: str = "10k",
pass_transistor: str = "IRF9540N",
v_in: str = "8",
v_ref: str = "2.5",
) -> Netlist:
"""Create a simple LDO voltage regulator.
Topology:
V_in --> PMOS pass transistor (source) --> out (drain)
Error amp: non-inv(+) = V_ref, inv(-) = feedback
Feedback divider: out --[R1]--> fb --[R2]--> GND
Error amp output drives PMOS gate
Vout = V_ref * (1 + R1/R2)
"""
return (
Netlist("LDO Regulator")
.add_comment(f"Vout = {v_ref} * (1 + {r1}/{r2})")
.add_lib(opamp_model)
.add_lib(pass_transistor)
.add_voltage_source("Vin", "vin", "0", dc=v_in)
.add_voltage_source("Vref", "vref", "0", dc=v_ref)
.add_voltage_source("Vpos", "vdd", "0", dc=v_in)
.add_voltage_source("Vneg", "0", "vss", dc=v_in)
.add_mosfet("M1", "out", "gate", "vin", "vin", pass_transistor)
.add_opamp("X1", "vref", "fb", "gate", "vdd", "vss", opamp_model)
.add_resistor("R1", "out", "fb", r1)
.add_resistor("R2", "fb", "0", r2)
.add_resistor("Rload", "out", "0", "100")
.add_capacitor("Cout", "out", "0", "10u")
.add_directive(".tran 10m")
)
def colpitts_oscillator(
ind: str = "1u",
c1: str = "100p",
c2: str = "100p",
rb: str = "47k",
rc: str = "1k",
re: str = "470",
vcc: str = "12",
bjt_model: str = "2N2222",
) -> Netlist:
"""Create a Colpitts oscillator.
Topology:
VCC --[RC]--> collector
collector --[L]--> tank
tank --[C1]--> base
tank --[C2]--> GND
base --[RB]--> VCC (bias)
emitter --[RE]--> GND
Output taken at the collector.
The oscillation frequency is approximately:
f = 1 / (2*pi*sqrt(L * C1*C2/(C1+C2)))
"""
return (
Netlist("Colpitts Oscillator")
.add_comment("f ~ 1 / (2*pi*sqrt(L * Cseries))")
.add_lib(bjt_model)
.add_voltage_source("Vcc", "vcc", "0", dc=vcc)
.add_resistor("RC", "vcc", "collector", rc)
.add_resistor("RB", "vcc", "base", rb)
.add_resistor("RE", "emitter", "0", re)
.add_inductor("L1", "collector", "tank", ind)
.add_capacitor("C1", "tank", "base", c1)
.add_capacitor("C2", "tank", "0", c2)
.add_bjt("Q1", "collector", "base", "emitter", bjt_model)
.add_directive(".tran 100u")
.add_directive(".ic V(collector)=6")
)
def h_bridge(
v_supply: str = "12",
r_load: str = "10",
mosfet_model: str = "IRF540N",
) -> Netlist:
"""Create an H-bridge motor driver.
Topology:
V+ --[M1 (high-side A)]--> outA --[M3 (low-side A)]--> GND
V+ --[M2 (high-side B)]--> outB --[M4 (low-side B)]--> GND
R_load between outA and outB.
M1 & M4 driven together (forward), M2 & M3 driven together (reverse).
Gate signals are complementary PULSE sources with dead time.
"""
    # Drive at 1 kHz with ~45% on-time per side and a small dead time to
    # prevent shoot-through
period = "1m"
t_on = "450u"
t_dead = "25u"
return (
Netlist("H-Bridge Motor Driver")
.add_comment("Complementary gate drives with dead time")
.add_lib(mosfet_model)
.add_voltage_source("Vsupply", "vcc", "0", dc=v_supply)
# Forward drive: M1 (high-A) and M4 (low-B) on first half
.add_voltage_source(
"Vg_fwd",
"gate_fwd",
"0",
pulse=("0", v_supply, t_dead, "10n", "10n", t_on, period),
)
# Reverse drive: M2 (high-B) and M3 (low-A) on second half
.add_voltage_source(
"Vg_rev",
"gate_rev",
"0",
pulse=("0", v_supply, "525u", "10n", "10n", t_on, period),
)
# High-side A
.add_mosfet("M1", "vcc", "gate_fwd", "outA", "outA", mosfet_model)
# High-side B
.add_mosfet("M2", "vcc", "gate_rev", "outB", "outB", mosfet_model)
# Low-side A
.add_mosfet("M3", "outA", "gate_rev", "0", "0", mosfet_model)
# Low-side B
.add_mosfet("M4", "outB", "gate_fwd", "0", "0", mosfet_model)
.add_resistor("Rload", "outA", "outB", r_load)
.add_directive(".tran 5m")
)
def _parse_spice_value(value: str) -> float:
"""Convert a SPICE-style value string to a float.
Handles common suffixes: T, G, meg, k, m, u, n, p, f.
"""
suffixes = {
"T": 1e12,
"G": 1e9,
"MEG": 1e6,
"K": 1e3,
"M": 1e-3,
"U": 1e-6,
"N": 1e-9,
"P": 1e-12,
"F": 1e-15,
}
value = value.strip()
# Try plain float first
try:
return float(value)
except ValueError:
pass
# Try suffixes (longest match first to catch "meg" before "m")
upper = value.upper()
for suffix, mult in sorted(suffixes.items(), key=lambda x: -len(x[0])):
if upper.endswith(suffix):
num_part = value[: len(value) - len(suffix)]
try:
return float(num_part) * mult
except ValueError:
continue
raise ValueError(f"Cannot parse SPICE value: {value!r}")
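# Usage sketch (illustrative): every template above returns a plain Netlist
# builder, so a generated circuit can be inspected or extended further before
# it is written out and simulated.
def _example_templates() -> int:
    buck = buck_converter(duty_cycle=0.3, freq="200k")
    bridge = h_bridge(v_supply="24")
    return len(buck.components) + len(bridge.components)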


@@ -0,0 +1,809 @@
"""Component value optimizer for iterative circuit tuning.
Adjusts circuit parameters toward target specifications by running
real LTspice simulations in a loop. Supports single-component binary
search and multi-component coordinate descent.
"""
import logging
import math
import tempfile
from dataclasses import dataclass, field
from pathlib import Path
import numpy as np
from .raw_parser import RawFile
from .runner import run_netlist
from .waveform_math import (
compute_bandwidth,
compute_peak_to_peak,
compute_rms,
compute_settling_time,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Standard component value series
# ---------------------------------------------------------------------------
# E12: 12 values per decade (10% tolerance)
_E12 = [1.0, 1.2, 1.5, 1.8, 2.2, 2.7, 3.3, 3.9, 4.7, 5.6, 6.8, 8.2]
# E24: 24 values per decade (5% tolerance)
_E24 = [
1.0,
1.1,
1.2,
1.3,
1.5,
1.6,
1.8,
2.0,
2.2,
2.4,
2.7,
3.0,
3.3,
3.6,
3.9,
4.3,
4.7,
5.1,
5.6,
6.2,
6.8,
7.5,
8.2,
9.1,
]
# E96: 96 values per decade (1% tolerance)
_E96 = [
1.00,
1.02,
1.05,
1.07,
1.10,
1.13,
1.15,
1.18,
1.21,
1.24,
1.27,
1.30,
1.33,
1.37,
1.40,
1.43,
1.47,
1.50,
1.54,
1.58,
1.62,
1.65,
1.69,
1.74,
1.78,
1.82,
1.87,
1.91,
1.96,
2.00,
2.05,
2.10,
2.15,
2.21,
2.26,
2.32,
2.37,
2.43,
2.49,
2.55,
2.61,
2.67,
2.74,
2.80,
2.87,
2.94,
3.01,
3.09,
3.16,
3.24,
3.32,
3.40,
3.48,
3.57,
3.65,
3.74,
3.83,
3.92,
4.02,
4.12,
4.22,
4.32,
4.42,
4.53,
4.64,
4.75,
4.87,
4.99,
5.11,
5.23,
5.36,
5.49,
5.62,
5.76,
5.90,
6.04,
6.19,
6.34,
6.49,
6.65,
6.81,
6.98,
7.15,
7.32,
7.50,
7.68,
7.87,
8.06,
8.25,
8.45,
8.66,
8.87,
9.09,
9.31,
9.53,
9.76,
]
_SERIES = {
"E12": _E12,
"E24": _E24,
"E96": _E96,
}
# Engineering prefixes: exponent -> suffix
_ENG_PREFIXES = {
-15: "f",
-12: "p",
-9: "n",
-6: "u",
-3: "m",
0: "",
3: "k",
6: "M",
9: "G",
12: "T",
}
# ---------------------------------------------------------------------------
# Data classes
# ---------------------------------------------------------------------------
@dataclass
class OptimizationTarget:
"""A single performance target to optimize toward."""
signal_name: str # e.g. "V(out)"
metric: str # "bandwidth_hz", "rms", "peak_to_peak", "settling_time",
# "gain_db", "phase_margin_deg"
target_value: float
weight: float = 1.0
@dataclass
class ComponentRange:
"""Allowed range for a component value during optimization."""
component_name: str # e.g. "R1"
min_value: float
max_value: float
preferred_series: str | None = None # "E12", "E24", "E96"
@dataclass
class OptimizationResult:
"""Outcome of an optimization run."""
best_values: dict[str, float]
best_cost: float
iterations: int
history: list[dict] = field(default_factory=list)
targets_met: dict[str, bool] = field(default_factory=dict)
final_metrics: dict[str, float] = field(default_factory=dict)
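# Usage sketch (illustrative): target a 1 kHz bandwidth on V(out) while letting
# R1 move between 1k and 100k, snapped to the E24 series at the end.
_EXAMPLE_TARGET = OptimizationTarget(
    signal_name="V(out)", metric="bandwidth_hz", target_value=1e3
)
_EXAMPLE_RANGE = ComponentRange(
    component_name="R1", min_value=1e3, max_value=1e5, preferred_series="E24"
)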
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def snap_to_preferred(value: float, series: str = "E12") -> float:
"""Snap a value to the nearest standard component value.
Works across decades -- e.g. 4800 snaps to 4.7k (E12).
Args:
value: Raw component value in base units (ohms, farads, etc.)
series: Preferred series name ("E12", "E24", "E96")
Returns:
Nearest standard value from the requested series.
"""
if value <= 0:
return _SERIES.get(series, _E12)[0]
base_values = _SERIES.get(series, _E12)
decade = math.floor(math.log10(value))
mantissa = value / (10**decade)
best = base_values[0]
best_ratio = abs(math.log10(mantissa / best))
for bv in base_values[1:]:
ratio = abs(math.log10(mantissa / bv))
if ratio < best_ratio:
best = bv
best_ratio = ratio
# Also check the decade above (mantissa might round up past 9.x)
for bv in base_values[:2]:
candidate = bv * 10
ratio = abs(math.log10(mantissa / candidate))
if ratio < best_ratio:
best = candidate
best_ratio = ratio
return best * (10**decade)
def format_engineering(value: float) -> str:
"""Format a float in engineering notation with SI suffix.
Examples:
10000 -> "10k"
0.0001 -> "100u"
4700 -> "4.7k"
0.0000033 -> "3.3u"
1.5 -> "1.5"
"""
if value == 0:
return "0"
sign = "-" if value < 0 else ""
value = abs(value)
exp = math.floor(math.log10(value))
# Round to nearest multiple of 3 (downward)
eng_exp = (exp // 3) * 3
# Clamp to known prefixes
eng_exp = max(-15, min(12, eng_exp))
mantissa = value / (10**eng_exp)
suffix = _ENG_PREFIXES.get(eng_exp, f"e{eng_exp}")
# Format mantissa: strip trailing zeros but keep at least one digit
if mantissa == int(mantissa) and mantissa < 1000:
formatted = f"{sign}{int(mantissa)}{suffix}"
else:
formatted = f"{sign}{mantissa:.3g}{suffix}"
return formatted
# ---------------------------------------------------------------------------
# Metric extraction
# ---------------------------------------------------------------------------
def _extract_metric(
raw_data: RawFile,
target: OptimizationTarget,
) -> float | None:
"""Compute a single metric from simulation results.
Returns the measured value, or None if the signal/data is unavailable.
"""
signal = raw_data.get_variable(target.signal_name)
if signal is None:
return None
metric = target.metric
if metric == "rms":
return compute_rms(signal)
if metric == "peak_to_peak":
result = compute_peak_to_peak(signal)
return result["peak_to_peak"]
if metric == "settling_time":
time = raw_data.get_time()
if time is None:
return None
if np.iscomplexobj(time):
time = time.real
if np.iscomplexobj(signal):
signal = np.abs(signal)
result = compute_settling_time(time, signal)
return result["settling_time"]
if metric == "bandwidth_hz":
freq = raw_data.get_frequency()
if freq is None:
return None
# Convert complex magnitude to dB
mag_db = np.where(
np.abs(signal) > 0,
20.0 * np.log10(np.abs(signal)),
-200.0,
)
result = compute_bandwidth(np.real(freq), np.real(mag_db))
return result["bandwidth_hz"]
if metric == "gain_db":
# Peak gain in dB (for AC analysis signals)
mag = np.abs(signal)
peak = float(np.max(mag))
if peak > 0:
return 20.0 * math.log10(peak)
return -200.0
if metric == "phase_margin_deg":
freq = raw_data.get_frequency()
if freq is None:
return None
# Phase margin: phase at the frequency where |gain| crosses 0 dB
mag = np.abs(signal)
mag_db = np.where(mag > 0, 20.0 * np.log10(mag), -200.0)
phase_deg = np.degrees(np.angle(signal))
# Find 0 dB crossing (gain crossover)
for i in range(len(mag_db) - 1):
if mag_db[i] >= 0 and mag_db[i + 1] < 0:
# Interpolate phase at 0 dB crossing
dm = mag_db[i + 1] - mag_db[i]
if abs(dm) < 1e-30:
phase_at_xover = float(phase_deg[i])
else:
frac = (0.0 - mag_db[i]) / dm
phase_at_xover = float(phase_deg[i] + frac * (phase_deg[i + 1] - phase_deg[i]))
# Phase margin = 180 + phase (since we want distance from -180)
return 180.0 + phase_at_xover
# No 0 dB crossing found -- gain never reaches unity
return None
return None
def _compute_cost(
raw_data: RawFile,
targets: list[OptimizationTarget],
) -> tuple[float, dict[str, float]]:
"""Evaluate weighted cost across all targets.
Cost for each target: weight * |measured - target| / |target|
Uses |target| normalization so different units are comparable.
Returns:
(total_cost, metrics_dict) where metrics_dict maps
"signal_name.metric" -> measured_value
"""
total_cost = 0.0
metrics: dict[str, float] = {}
for t in targets:
measured = _extract_metric(raw_data, t)
key = f"{t.signal_name}.{t.metric}"
if measured is None:
# Signal not found -- heavy penalty
total_cost += t.weight * 1e6
metrics[key] = float("nan")
continue
metrics[key] = measured
if abs(t.target_value) > 1e-30:
cost = t.weight * abs(measured - t.target_value) / abs(t.target_value)
else:
# Target is ~0: use absolute error
cost = t.weight * abs(measured)
total_cost += cost
return total_cost, metrics
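# Worked example of the normalization above: with a 1 kHz bandwidth target,
# a measured 1.2 kHz, and weight 1.0, the contribution is |1200 - 1000| / 1000 = 0.2.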
# ---------------------------------------------------------------------------
# Core optimizer
# ---------------------------------------------------------------------------
async def _run_with_values(
netlist_template: str,
values: dict[str, float],
work_dir: Path,
) -> RawFile | None:
"""Substitute values into template, write .cir, simulate, return parsed data."""
text = netlist_template
for name, val in values.items():
text = text.replace(f"{{{name}}}", format_engineering(val))
cir_path = work_dir / "opt_iter.cir"
cir_path.write_text(text)
result = await run_netlist(cir_path, work_dir=work_dir)
if not result.success or result.raw_data is None:
logger.warning("Simulation failed: %s", result.error)
return None
return result.raw_data
async def _binary_search_single(
netlist_template: str,
target: OptimizationTarget,
comp: ComponentRange,
max_iterations: int,
work_dir: Path,
) -> OptimizationResult:
"""Optimize a single component via binary search.
Assumes the metric is monotonic (or at least locally monotonic)
with respect to the component value. Evaluates the midpoint,
then narrows the half that moves the metric toward the target.
"""
lo = comp.min_value
hi = comp.max_value
history: list[dict] = []
best_values = {comp.component_name: (lo + hi) / 2}
best_cost = float("inf")
best_metrics: dict[str, float] = {}
for iteration in range(max_iterations):
mid = (lo + hi) / 2
values = {comp.component_name: mid}
raw_data = await _run_with_values(netlist_template, values, work_dir)
if raw_data is None:
history.append(
{
"iteration": iteration,
"values": {comp.component_name: mid},
"cost": float("inf"),
"error": "simulation_failed",
}
)
            # Simulation failed at the midpoint -- assume the upper half is the
            # problematic region and narrow the search downward before retrying
            hi = mid
continue
cost, metrics = _compute_cost(raw_data, [target])
measured = metrics.get(f"{target.signal_name}.{target.metric}")
history.append(
{
"iteration": iteration,
"values": {comp.component_name: mid},
"cost": cost,
"metrics": metrics,
}
)
if cost < best_cost:
best_cost = cost
best_values = {comp.component_name: mid}
best_metrics = metrics
# Decide which half to keep
if measured is not None and not math.isnan(measured):
if measured < target.target_value:
# Need larger metric -- which direction depends on monotonicity.
# Test: does increasing component value increase or decrease metric?
# We probe a point slightly above mid to determine direction.
probe = mid * 1.1
if probe > hi:
probe = mid * 0.9
probe_is_lower = True
else:
probe_is_lower = False
probe_data = await _run_with_values(
netlist_template, {comp.component_name: probe}, work_dir
)
if probe_data is not None:
probe_measured = _extract_metric(probe_data, target)
if probe_measured is not None:
if probe_is_lower:
# We probed lower. If metric went up, metric increases
# with decreasing value => go lower.
if probe_measured > measured:
hi = mid
else:
lo = mid
else:
# We probed higher. If metric went up, go higher.
if probe_measured > measured:
lo = mid
else:
hi = mid
else:
hi = mid
else:
hi = mid
elif measured > target.target_value:
# Need smaller metric -- mirror logic
probe = mid * 1.1
if probe > hi:
probe = mid * 0.9
probe_is_lower = True
else:
probe_is_lower = False
probe_data = await _run_with_values(
netlist_template, {comp.component_name: probe}, work_dir
)
if probe_data is not None:
probe_measured = _extract_metric(probe_data, target)
if probe_measured is not None:
if probe_is_lower:
if probe_measured < measured:
hi = mid
else:
lo = mid
else:
if probe_measured < measured:
lo = mid
else:
hi = mid
else:
lo = mid
else:
lo = mid
else:
# Exact match
break
# Converged if search range is tight enough
if hi - lo < lo * 0.001:
break
# Snap to preferred series if requested
final_val = best_values[comp.component_name]
if comp.preferred_series:
final_val = snap_to_preferred(final_val, comp.preferred_series)
best_values[comp.component_name] = final_val
target_key = f"{target.signal_name}.{target.metric}"
met = best_metrics.get(target_key)
tolerance = abs(target.target_value * 0.05) if abs(target.target_value) > 0 else 0.05
targets_met = {
target_key: met is not None and abs(met - target.target_value) <= tolerance,
}
return OptimizationResult(
best_values=best_values,
best_cost=best_cost,
iterations=len(history),
history=history,
targets_met=targets_met,
final_metrics=best_metrics,
)
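# Direction-probe illustration for the loop above (hypothetical numbers): if the
# midpoint R1 = 10k measures a bandwidth of 800 Hz against a 1 kHz target, and
# the probe at 11k measures 730 Hz, the metric falls as R1 rises, so the search
# keeps the lower half (hi = mid) to push the bandwidth upward.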
async def _coordinate_descent(
netlist_template: str,
targets: list[OptimizationTarget],
component_ranges: list[ComponentRange],
max_iterations: int,
work_dir: Path,
) -> OptimizationResult:
"""Optimize multiple components via coordinate descent.
Cycles through components, running a bounded golden-section search
on each one while holding the others fixed. Repeats until the
overall cost stops improving or we exhaust the iteration budget.
"""
# Start at geometric midpoint of each range
current_values: dict[str, float] = {}
for cr in component_ranges:
current_values[cr.component_name] = math.sqrt(cr.min_value * cr.max_value)
history: list[dict] = []
best_cost = float("inf")
best_values = dict(current_values)
best_metrics: dict[str, float] = {}
iteration = 0
n_comps = len(component_ranges)
# Golden ratio for search
phi = (1 + math.sqrt(5)) / 2
resphi = 2 - phi # ~0.382
while iteration < max_iterations:
improved_this_cycle = False
for cr in component_ranges:
if iteration >= max_iterations:
break
lo = cr.min_value
hi = cr.max_value
# Golden-section search for this component
# Allocate a few iterations per component per cycle
iters_per_comp = max(2, (max_iterations - iteration) // n_comps)
a = lo
b = hi
x1 = a + resphi * (b - a)
x2 = b - resphi * (b - a)
# Evaluate x1
trial_1 = dict(current_values)
trial_1[cr.component_name] = x1
raw_1 = await _run_with_values(netlist_template, trial_1, work_dir)
cost_1 = float("inf")
metrics_1: dict[str, float] = {}
if raw_1 is not None:
cost_1, metrics_1 = _compute_cost(raw_1, targets)
iteration += 1
# Evaluate x2
trial_2 = dict(current_values)
trial_2[cr.component_name] = x2
raw_2 = await _run_with_values(netlist_template, trial_2, work_dir)
cost_2 = float("inf")
metrics_2: dict[str, float] = {}
if raw_2 is not None:
cost_2, metrics_2 = _compute_cost(raw_2, targets)
iteration += 1
for _ in range(iters_per_comp - 2):
if iteration >= max_iterations:
break
if cost_1 < cost_2:
b = x2
x2 = x1
cost_2 = cost_1
metrics_2 = metrics_1
x1 = a + resphi * (b - a)
trial = dict(current_values)
trial[cr.component_name] = x1
raw = await _run_with_values(netlist_template, trial, work_dir)
if raw is not None:
cost_1, metrics_1 = _compute_cost(raw, targets)
else:
cost_1 = float("inf")
metrics_1 = {}
else:
a = x1
x1 = x2
cost_1 = cost_2
metrics_1 = metrics_2
x2 = b - resphi * (b - a)
trial = dict(current_values)
trial[cr.component_name] = x2
raw = await _run_with_values(netlist_template, trial, work_dir)
if raw is not None:
cost_2, metrics_2 = _compute_cost(raw, targets)
else:
cost_2 = float("inf")
metrics_2 = {}
iteration += 1
# Pick the better of the final two
if cost_1 <= cost_2:
chosen_val = x1
chosen_cost = cost_1
chosen_metrics = metrics_1
else:
chosen_val = x2
chosen_cost = cost_2
chosen_metrics = metrics_2
# Snap to preferred series
if cr.preferred_series:
chosen_val = snap_to_preferred(chosen_val, cr.preferred_series)
current_values[cr.component_name] = chosen_val
history.append(
{
"iteration": iteration,
"optimized_component": cr.component_name,
"values": dict(current_values),
"cost": chosen_cost,
"metrics": chosen_metrics,
}
)
if chosen_cost < best_cost:
best_cost = chosen_cost
best_values = dict(current_values)
best_metrics = chosen_metrics
improved_this_cycle = True
if not improved_this_cycle:
break
# Determine which targets are met (within 5%)
targets_met: dict[str, bool] = {}
for t in targets:
key = f"{t.signal_name}.{t.metric}"
measured = best_metrics.get(key)
if measured is None or math.isnan(measured):
targets_met[key] = False
else:
tolerance = abs(t.target_value * 0.05) if abs(t.target_value) > 0 else 0.05
targets_met[key] = abs(measured - t.target_value) <= tolerance
return OptimizationResult(
best_values=best_values,
best_cost=best_cost,
iterations=iteration,
history=history,
targets_met=targets_met,
final_metrics=best_metrics,
)
async def optimize_component_values(
netlist_template: str,
targets: list[OptimizationTarget],
component_ranges: list[ComponentRange],
max_iterations: int = 20,
method: str = "binary_search",
) -> OptimizationResult:
"""Iteratively adjust component values toward target specifications.
Creates temporary .cir files, runs real LTspice simulations via Wine,
and evaluates a cost function against the targets after each run.
Args:
netlist_template: Netlist text with ``{ComponentName}`` placeholders
(e.g. ``{R1}``, ``{C1}``) that get substituted each iteration.
targets: Performance targets to optimize toward.
component_ranges: Allowed ranges for each tunable component.
max_iterations: Maximum simulation iterations (default 20).
method: ``"binary_search"`` for single-component optimization,
``"coordinate_descent"`` for multi-component. When
``"binary_search"`` is requested with multiple components,
coordinate descent is used automatically.
Returns:
OptimizationResult with best values found, cost history,
and whether each target was met.
"""
work_dir = Path(tempfile.mkdtemp(prefix="ltspice_opt_"))
if len(component_ranges) == 1 and len(targets) == 1 and method == "binary_search":
return await _binary_search_single(
netlist_template,
targets[0],
component_ranges[0],
max_iterations,
work_dir,
)
return await _coordinate_descent(
netlist_template,
targets,
component_ranges,
max_iterations,
work_dir,
)
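# Usage sketch (hypothetical netlist and values; the placeholder syntax and the
# OptimizationTarget/ComponentRange field names are the ones used in this module):
#
#     template = """* RC low-pass
#     V1 in 0 AC 1
#     R1 in out {R1}
#     C1 out 0 100n
#     .ac dec 50 10 1Meg
#     .end
#     """
#     result = await optimize_component_values(
#         template,
#         targets=[OptimizationTarget(signal_name="V(out)", metric="bandwidth_hz", target_value=1e3)],
#         component_ranges=[ComponentRange(component_name="R1", min_value=100.0,
#                                          max_value=100e3, preferred_series="E24")],
#     )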

View File

@@ -0,0 +1,196 @@
"""Power and efficiency calculations from simulation data."""
import numpy as np
# np.trapz was renamed to np.trapezoid in numpy 2.0
_trapz = getattr(np, "trapezoid", getattr(np, "trapz", None))
def compute_instantaneous_power(voltage: np.ndarray, current: np.ndarray) -> np.ndarray:
"""Element-wise power: P(t) = V(t) * I(t).
Args:
voltage: Voltage waveform array
current: Current waveform array
Returns:
Instantaneous power array (same length as inputs)
"""
return np.real(voltage) * np.real(current)
def compute_average_power(time: np.ndarray, voltage: np.ndarray, current: np.ndarray) -> float:
"""Time-averaged power: P_avg = (1/T) * integral(V*I dt).
Uses trapezoidal integration over the full time span.
Args:
time: Time array in seconds
voltage: Voltage waveform array
current: Current waveform array
Returns:
Average power in watts
"""
    t = np.real(time)
    if len(t) < 2:
        return 0.0
    duration = t[-1] - t[0]
    if duration <= 0:
        return 0.0
p_inst = np.real(voltage) * np.real(current)
return float(_trapz(p_inst, t) / duration)
def compute_efficiency(
time: np.ndarray,
input_voltage: np.ndarray,
input_current: np.ndarray,
output_voltage: np.ndarray,
output_current: np.ndarray,
) -> dict:
"""Compute power conversion efficiency.
Args:
time: Time array in seconds
input_voltage: Input voltage waveform
input_current: Input current waveform
output_voltage: Output voltage waveform
output_current: Output current waveform
Returns:
Dict with efficiency_percent, input_power_watts,
output_power_watts, power_dissipated_watts
"""
p_in = compute_average_power(time, input_voltage, input_current)
p_out = compute_average_power(time, output_voltage, output_current)
p_dissipated = p_in - p_out
if abs(p_in) < 1e-15:
efficiency = 0.0
else:
efficiency = (p_out / p_in) * 100.0
return {
"efficiency_percent": efficiency,
"input_power_watts": p_in,
"output_power_watts": p_out,
"power_dissipated_watts": p_dissipated,
}
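# Quick numeric check (assuming steady DC waveforms): a 12 V / 1 A input feeding
# a 5 V / 2 A output gives p_in = 12 W and p_out = 10 W, so efficiency_percent
# comes out near 83.3 and power_dissipated_watts is 2.0.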
def compute_power_spectrum(
time: np.ndarray,
voltage: np.ndarray,
current: np.ndarray,
max_harmonics: int = 20,
) -> dict:
"""FFT of instantaneous power to identify ripple frequencies.
Args:
time: Time array in seconds
voltage: Voltage waveform array
current: Current waveform array
max_harmonics: Maximum number of frequency bins to return
Returns:
Dict with frequencies, power_magnitudes, dominant_freq, dc_power
"""
t = np.real(time)
if len(t) < 2:
return {
"frequencies": [],
"power_magnitudes": [],
"dominant_freq": 0.0,
"dc_power": 0.0,
}
dt = (t[-1] - t[0]) / (len(t) - 1)
if dt <= 0:
return {
"frequencies": [],
"power_magnitudes": [],
"dominant_freq": 0.0,
"dc_power": float(np.mean(np.real(voltage) * np.real(current))),
}
p_inst = np.real(voltage) * np.real(current)
n = len(p_inst)
spectrum = np.fft.rfft(p_inst)
freqs = np.fft.rfftfreq(n, d=dt)
magnitudes = np.abs(spectrum) * 2.0 / n
magnitudes[0] /= 2.0 # DC component correction
dc_power = float(magnitudes[0])
# Dominant AC frequency (largest non-DC bin)
if len(magnitudes) > 1:
dominant_idx = int(np.argmax(magnitudes[1:])) + 1
dominant_freq = float(freqs[dominant_idx])
else:
dominant_freq = 0.0
# Trim to requested harmonics (plus DC)
limit = min(max_harmonics + 1, len(freqs))
freqs = freqs[:limit]
magnitudes = magnitudes[:limit]
return {
"frequencies": freqs.tolist(),
"power_magnitudes": magnitudes.tolist(),
"dominant_freq": dominant_freq,
"dc_power": dc_power,
}
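# Example reading of the result (hypothetical buck-converter waveforms sampled
# over several switching periods): dc_power reports the average power, while
# dominant_freq lands on the switching frequency (e.g. ~100 kHz) because the
# largest non-DC bin of the power FFT sits at the ripple fundamental.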
def compute_power_metrics(time: np.ndarray, voltage: np.ndarray, current: np.ndarray) -> dict:
"""Comprehensive power report for a voltage/current pair.
Power factor here is defined as the ratio of real (average) power
to apparent power (Vrms * Irms).
Args:
time: Time array in seconds
voltage: Voltage waveform array
current: Current waveform array
Returns:
Dict with avg_power, rms_power, peak_power, min_power, power_factor
"""
v = np.real(voltage)
i = np.real(current)
p_inst = v * i
if len(p_inst) == 0:
return {
"avg_power": 0.0,
"rms_power": 0.0,
"peak_power": 0.0,
"min_power": 0.0,
"power_factor": 0.0,
}
avg_power = compute_average_power(time, voltage, current)
rms_power = float(np.sqrt(np.mean(p_inst**2)))
peak_power = float(np.max(p_inst))
min_power = float(np.min(p_inst))
# Power factor = avg power / apparent power (Vrms * Irms)
v_rms = float(np.sqrt(np.mean(v**2)))
i_rms = float(np.sqrt(np.mean(i**2)))
apparent = v_rms * i_rms
if apparent < 1e-15:
power_factor = 0.0
else:
power_factor = avg_power / apparent
return {
"avg_power": avg_power,
"rms_power": rms_power,
"peak_power": peak_power,
"min_power": min_power,
"power_factor": power_factor,
}
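# Worked power-factor example: for v(t) = 10*sin(wt) and i(t) = 2*sin(wt - 60deg),
# avg_power = 0.5 * 10 * 2 * cos(60deg) = 5 W while Vrms * Irms ~= 7.07 * 1.41 ~= 10 VA,
# so power_factor ~= 0.5 -- i.e. cos(phi) for sinusoidal waveforms, as expected.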

View File

@@ -15,6 +15,7 @@ import numpy as np
@dataclass
class Variable:
    """A variable (signal) in the raw file."""
+
    index: int
    name: str
    type: str  # e.g., "voltage", "current", "time", "frequency"
@@ -23,6 +24,7 @@ class Variable:
@dataclass
class RawFile:
    """Parsed LTspice .raw file."""
+
    title: str
    date: str
    plotname: str
@@ -30,22 +32,65 @@ class RawFile:
    variables: list[Variable]
    points: int
    data: np.ndarray  # Shape: (n_variables, n_points)
+    n_runs: int = 1  # Number of runs (>1 for .step/.mc/.temp)
+    run_boundaries: list[int] | None = None  # Start index of each run

-    def get_variable(self, name: str) -> np.ndarray | None:
-        """Get data for a variable by name (case-insensitive partial match)."""
+    def get_variable(self, name: str, run: int | None = None) -> np.ndarray | None:
+        """Get data for a variable by name (case-insensitive partial match).
+
+        Args:
+            name: Signal name (partial match OK)
+            run: If stepped data, return only this run (0-indexed). None = all data.
+        """
        name_lower = name.lower()
        for var in self.variables:
            if name_lower in var.name.lower():
-                return self.data[var.index]
+                arr = self.data[var.index]
+                if run is not None and self.run_boundaries:
+                    start, end = self._run_slice(run)
+                    return arr[start:end]
+                return arr
        return None

-    def get_time(self) -> np.ndarray | None:
+    def get_time(self, run: int | None = None) -> np.ndarray | None:
        """Get the time axis (for transient analysis)."""
-        return self.get_variable("time")
+        return self.get_variable("time", run=run)

-    def get_frequency(self) -> np.ndarray | None:
+    def get_frequency(self, run: int | None = None) -> np.ndarray | None:
        """Get the frequency axis (for AC analysis)."""
-        return self.get_variable("frequency")
+        return self.get_variable("frequency", run=run)
+
+    def get_run_data(self, run: int) -> "RawFile":
+        """Extract a single run as a new RawFile (for stepped simulations)."""
+        if not self.run_boundaries or run >= self.n_runs:
+            return self
+        start, end = self._run_slice(run)
+        return RawFile(
+            title=self.title,
+            date=self.date,
+            plotname=self.plotname,
+            flags=self.flags,
+            variables=self.variables,
+            points=end - start,
+            data=self.data[:, start:end].copy(),
+            n_runs=1,
+            run_boundaries=None,
+        )
+
+    @property
+    def is_stepped(self) -> bool:
+        return self.n_runs > 1
+
+    def _run_slice(self, run: int) -> tuple[int, int]:
+        """Get (start, end) indices for a given run."""
+        if not self.run_boundaries:
+            return 0, self.points
+        start = self.run_boundaries[run]
+        if run + 1 < len(self.run_boundaries):
+            end = self.run_boundaries[run + 1]
+        else:
+            end = self.data.shape[1]
+        return start, end

def parse_raw_file(path: Path | str) -> RawFile:
@@ -68,7 +113,7 @@ def parse_raw_file(path: Path | str) -> RawFile:
    # Detect encoding: UTF-16 LE (Windows) vs ASCII
    # UTF-16 LE has null bytes between characters
-    is_utf16 = content[1:2] == b'\x00' and content[3:4] == b'\x00'
+    is_utf16 = content[1:2] == b"\x00" and content[3:4] == b"\x00"

    if is_utf16:
        # UTF-16 LE encoding - decode header portion, find Binary marker
@@ -106,7 +151,6 @@ def parse_raw_file(path: Path | str) -> RawFile:
    points = 0
    in_variables = False
-    var_count = 0

    for line in header_lines:
        line = line.strip()
@@ -120,12 +164,17 @@ def parse_raw_file(path: Path | str) -> RawFile:
        elif line.startswith("Flags:"):
            flags = line[6:].strip().split()
        elif line.startswith("No. Variables:"):
-            var_count = int(line[14:].strip())
+            pass  # Parsed from Variables section instead
        elif line.startswith("No. Points:"):
            points = int(line[11:].strip())
        elif line.startswith("Variables:"):
            in_variables = True
-        elif in_variables and line and not line.startswith("Binary") and not line.startswith("Values"):
+        elif (
+            in_variables
+            and line
+            and not line.startswith("Binary")
+            and not line.startswith("Values")
+        ):
            # Parse variable line: "index name type"
            parts = line.split()
            if len(parts) >= 3:
@@ -143,27 +192,36 @@ def parse_raw_file(path: Path | str) -> RawFile:
    is_complex = "complex" in flags
    is_stepped = "stepped" in flags
-    is_forward = "forward" in flags  # FastAccess format
+    # "forward" flag indicates FastAccess format (unused for now)

    if is_binary:
        data = _parse_binary_data(data_bytes, n_vars, points, is_complex)
    else:
        data = _parse_ascii_data(data_bytes.decode("utf-8"), n_vars, points, is_complex)

+    # Detect run boundaries for stepped simulations
+    n_runs = 1
+    run_boundaries = None
+    if is_stepped and data.shape[1] > 1:
+        run_boundaries = _detect_run_boundaries(data[0])
+        n_runs = len(run_boundaries)
+
+    actual_points = data.shape[1]
+
    return RawFile(
        title=title,
        date=date,
        plotname=plotname,
        flags=flags,
        variables=variables,
-        points=points,
+        points=actual_points,
        data=data,
+        n_runs=n_runs,
+        run_boundaries=run_boundaries,
    )

-def _parse_binary_data(
-    data: bytes, n_vars: int, points: int, is_complex: bool
-) -> np.ndarray:
+def _parse_binary_data(data: bytes, n_vars: int, points: int, is_complex: bool) -> np.ndarray:
    """Parse binary data section.

    LTspice binary formats:
@@ -178,7 +236,7 @@ def _parse_binary_data(
        actual_points = len(data) // bytes_per_point
        # Read as flat complex128 array and reshape
-        flat = np.frombuffer(data[:actual_points * bytes_per_point], dtype=np.complex128)
+        flat = np.frombuffer(data[: actual_points * bytes_per_point], dtype=np.complex128)
        return flat.reshape(actual_points, n_vars).T.copy()

    # Real data - detect format from data size
@@ -197,12 +255,12 @@ def _parse_binary_data(
        for p in range(actual_points):
            # Time as double
-            result[0, p] = struct.unpack("<d", data[offset:offset + 8])[0]
+            result[0, p] = struct.unpack("<d", data[offset : offset + 8])[0]
            offset += 8

            # Other variables as float32
            for v in range(1, n_vars):
-                result[v, p] = struct.unpack("<f", data[offset:offset + 4])[0]
+                result[v, p] = struct.unpack("<f", data[offset : offset + 4])[0]
                offset += 4

        return result
@@ -210,14 +268,12 @@ def _parse_binary_data(
    # All double format (older LTspice or specific settings)
    actual_points = len(data) // bytes_per_point_double
-    flat = np.frombuffer(data[:actual_points * bytes_per_point_double], dtype=np.float64)
+    flat = np.frombuffer(data[: actual_points * bytes_per_point_double], dtype=np.float64)
    # LTspice stores point-by-point
    return flat.reshape(actual_points, n_vars).T.copy()

-def _parse_ascii_data(
-    data: str, n_vars: int, points: int, is_complex: bool
-) -> np.ndarray:
+def _parse_ascii_data(data: str, n_vars: int, points: int, is_complex: bool) -> np.ndarray:
    """Parse ASCII data section."""
    lines = data.strip().split("\n")
@@ -258,3 +314,22 @@
            continue

    return result
def _detect_run_boundaries(x_axis: np.ndarray) -> list[int]:
"""Detect run boundaries in stepped simulation data.
In stepped data, the independent variable (time or frequency) resets
at the start of each new run. We detect this as a point where the
value decreases (time goes backwards) or resets to near-zero.
"""
x = np.real(x_axis) if np.iscomplexobj(x_axis) else x_axis
boundaries = [0] # First run always starts at index 0
for i in range(1, len(x)):
# Detect reset: value drops back to near the starting value
if x[i] <= x[0] and x[i] < x[i - 1]:
boundaries.append(i)
return boundaries
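# Example: a stepped .tran whose time axis reads
# [0, 1, 2, 3, 0, 1, 2, 3, 0, 1, 2, 3] (in ms) resets twice, so the detector
# above returns boundaries [0, 4, 8] and the parser reports n_runs = 3.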

View File

@@ -19,6 +19,7 @@ from .raw_parser import RawFile, parse_raw_file
@dataclass
class SimulationResult:
    """Result of a simulation run."""
+
    success: bool
    raw_file: Path | None
    log_file: Path | None
@@ -47,6 +48,7 @@ async def run_simulation(
        SimulationResult with status and data
    """
    import time
+
    start_time = time.monotonic()

    # Validate installation
@@ -121,10 +123,9 @@ async def run_simulation(
    try:
        stdout_bytes, stderr_bytes = await asyncio.wait_for(
-            process.communicate(),
-            timeout=timeout
+            process.communicate(), timeout=timeout
        )
-    except asyncio.TimeoutError:
+    except TimeoutError:
        process.kill()
        await process.wait()
        return SimulationResult(
@@ -226,21 +227,33 @@ async def run_netlist(
        SimulationResult with status and data
    """
    import time
+
    start_time = time.monotonic()

    ok, msg = validate_installation()
    if not ok:
        return SimulationResult(
-            success=False, raw_file=None, log_file=None, raw_data=None,
-            error=msg, stdout="", stderr="", elapsed_seconds=0,
+            success=False,
+            raw_file=None,
+            log_file=None,
+            raw_data=None,
+            error=msg,
+            stdout="",
+            stderr="",
+            elapsed_seconds=0,
        )

    netlist_path = Path(netlist_path).resolve()
    if not netlist_path.exists():
        return SimulationResult(
-            success=False, raw_file=None, log_file=None, raw_data=None,
+            success=False,
+            raw_file=None,
+            log_file=None,
+            raw_data=None,
            error=f"Netlist not found: {netlist_path}",
-            stdout="", stderr="", elapsed_seconds=0,
+            stdout="",
+            stderr="",
+            elapsed_seconds=0,
        )

    temp_dir = None
@@ -277,16 +290,19 @@ async def run_netlist(
        try:
            stdout_bytes, stderr_bytes = await asyncio.wait_for(
-                process.communicate(),
-                timeout=timeout
+                process.communicate(), timeout=timeout
            )
-        except asyncio.TimeoutError:
+        except TimeoutError:
            process.kill()
            await process.wait()
            return SimulationResult(
-                success=False, raw_file=None, log_file=None, raw_data=None,
+                success=False,
+                raw_file=None,
+                log_file=None,
+                raw_data=None,
                error=f"Simulation timed out after {timeout} seconds",
-                stdout="", stderr="",
+                stdout="",
+                stderr="",
                elapsed_seconds=time.monotonic() - start_time,
            )
@@ -308,8 +324,11 @@ async def run_netlist(
                success=False,
                raw_file=None,
                log_file=log_file if log_file.exists() else None,
-                raw_data=None, error=error_msg,
-                stdout=stdout, stderr=stderr, elapsed_seconds=elapsed,
+                raw_data=None,
+                error=error_msg,
+                stdout=stdout,
+                stderr=stderr,
+                elapsed_seconds=elapsed,
            )

        raw_data = None
@@ -326,7 +345,9 @@ async def run_netlist(
            log_file=log_file if log_file.exists() else None,
            raw_data=raw_data,
            error=parse_error,
-            stdout=stdout, stderr=stderr, elapsed_seconds=elapsed,
+            stdout=stdout,
+            stderr=stderr,
+            elapsed_seconds=elapsed,
        )
    finally:
        pass  # Keep temp files for debugging

View File

@@ -11,6 +11,7 @@ from pathlib import Path
@dataclass
class Component:
    """A component instance in the schematic."""
+
    name: str  # Instance name (e.g., R1, C1, M1)
    symbol: str  # Symbol name (e.g., res, cap, nmos)
    x: int
@@ -33,6 +34,7 @@ class Component:
@dataclass
class Wire:
    """A wire connection."""
+
    x1: int
    y1: int
    x2: int
@@ -42,6 +44,7 @@ class Wire:
@dataclass
class Text:
    """Text annotation or SPICE directive."""
+
    x: int
    y: int
    content: str
@@ -51,6 +54,7 @@ class Text:
@dataclass
class Flag:
    """A net flag/label."""
+
    x: int
    y: int
    name: str
@@ -60,6 +64,7 @@ class Flag:
@dataclass
class Schematic:
    """A parsed LTspice schematic."""
+
    version: int = 4
    sheet: tuple[int, int, int, int] = (1, 1, 0, 0)
    components: list[Component] = field(default_factory=list)
@@ -113,27 +118,23 @@ def parse_schematic(path: Path | str) -> Schematic:
        elif line.startswith("SHEET"):
            parts = line.split()
            if len(parts) >= 5:
-                schematic.sheet = (
-                    int(parts[1]), int(parts[2]),
-                    int(parts[3]), int(parts[4])
-                )
+                schematic.sheet = (int(parts[1]), int(parts[2]), int(parts[3]), int(parts[4]))

        elif line.startswith("WIRE"):
            parts = line.split()
            if len(parts) >= 5:
-                schematic.wires.append(Wire(
-                    int(parts[1]), int(parts[2]),
-                    int(parts[3]), int(parts[4])
-                ))
+                schematic.wires.append(
+                    Wire(int(parts[1]), int(parts[2]), int(parts[3]), int(parts[4]))
+                )

        elif line.startswith("FLAG"):
            parts = line.split()
            if len(parts) >= 4:
-                schematic.flags.append(Flag(
-                    int(parts[1]), int(parts[2]),
-                    parts[3],
-                    parts[4] if len(parts) > 4 else "0"
-                ))
+                schematic.flags.append(
+                    Flag(
+                        int(parts[1]), int(parts[2]), parts[3], parts[4] if len(parts) > 4 else "0"
+                    )
+                )

        elif line.startswith("SYMBOL"):
            # Save previous component before starting a new one
@@ -159,11 +160,7 @@ def parse_schematic(path: Path | str) -> Schematic:
                rotation = int(rot_str[1:])

            current_component = Component(
-                name="",
-                symbol=symbol,
-                x=x, y=y,
-                rotation=rotation,
-                mirror=mirror
+                name="", symbol=symbol, x=x, y=y, rotation=rotation, mirror=mirror
            )
        else:
            current_component = None
@@ -184,8 +181,7 @@ def parse_schematic(path: Path | str) -> Schematic:
        match = re.match(r"TEXT\s+(-?\d+)\s+(-?\d+)\s+(\w+)\s+(\d+)\s*(.*)", line)
        if match:
            x, y = int(match.group(1)), int(match.group(2))
-            align = match.group(3)
-            size = int(match.group(4))
+            # groups 3 (align) and 4 (size) are parsed but not stored
            content = match.group(5) if match.group(5) else ""

            # Check for multi-line text (continuation with \n or actual newlines)
@@ -219,7 +215,9 @@ def write_schematic(schematic: Schematic, path: Path | str) -> None:
    lines = []
    lines.append(f"Version {schematic.version}")
-    lines.append(f"SHEET {schematic.sheet[0]} {schematic.sheet[1]} {schematic.sheet[2]} {schematic.sheet[3]}")
+    lines.append(
+        f"SHEET {schematic.sheet[0]} {schematic.sheet[1]} {schematic.sheet[2]} {schematic.sheet[3]}"
+    )

    # Write wires
    for wire in schematic.wires:
@@ -251,10 +249,7 @@ def write_schematic(schematic: Schematic, path: Path | str) -> None:
def modify_component_value(
-    path: Path | str,
-    component_name: str,
-    new_value: str,
-    output_path: Path | str | None = None
+    path: Path | str, component_name: str, new_value: str, output_path: Path | str | None = None
) -> Schematic:
    """Modify a component's value in a schematic.
@@ -276,8 +271,7 @@ def modify_component_value(
    if not comp:
        available = [c.name for c in schematic.components]
        raise ValueError(
-            f"Component '{component_name}' not found. "
-            f"Available components: {', '.join(available)}"
+            f"Component '{component_name}' not found. Available components: {', '.join(available)}"
        )

    comp.value = new_value

View File

@@ -20,6 +20,20 @@ import numpy as np
from fastmcp import FastMCP

from . import __version__
+from .asc_generator import (
+    generate_inverting_amp,
+)
+from .asc_generator import (
+    generate_rc_lowpass as generate_rc_lowpass_asc,
+)
+from .asc_generator import (
+    generate_voltage_divider as generate_voltage_divider_asc,
+)
+from .batch import (
+    run_monte_carlo,
+    run_parameter_sweep,
+    run_temperature_sweep,
+)
from .config import (
    LTSPICE_EXAMPLES,
    LTSPICE_LIB,
@@ -29,14 +43,25 @@ from .diff import diff_schematics as _diff_schematics
from .drc import run_drc as _run_drc
from .log_parser import parse_log
from .models import (
-    get_model_details as _get_model_details,
    search_models as _search_models,
+)
+from .models import (
    search_subcircuits as _search_subcircuits,
)
from .netlist import Netlist
+from .optimizer import (
+    ComponentRange,
+    OptimizationTarget,
+    format_engineering,
+    optimize_component_values,
+)
+from .power_analysis import compute_efficiency, compute_power_metrics
from .raw_parser import parse_raw_file
from .runner import run_netlist, run_simulation
from .schematic import modify_component_value, parse_schematic
+from .stability import compute_stability_metrics
+from .touchstone import parse_touchstone, s_param_to_db
+from .waveform_expr import WaveformCalculator
from .waveform_math import (
    compute_bandwidth,
    compute_fft,
@@ -47,7 +72,6 @@ from .waveform_math import (
    compute_thd,
)

-
mcp = FastMCP(
    name="mcp-ltspice",
    instructions="""
@@ -65,6 +89,14 @@ mcp = FastMCP(
- Run design rule checks before simulation
- Compare schematics to see what changed
- Export waveform data to CSV
+- Measure stability (gain/phase margins from AC loop gain)
+- Compute power and efficiency from voltage/current waveforms
+- Evaluate waveform math expressions (V*I, gain, dB, etc.)
+- Optimize component values to hit target specs automatically
+- Generate .asc schematic files (graphical format)
+- Run parameter sweeps, temperature sweeps, and Monte Carlo analysis
+- Parse Touchstone (.s2p) S-parameter files
+- Use circuit templates: buck converter, LDO, diff amp, oscillator, H-bridge

LTspice runs via Wine on Linux. Simulations execute in batch mode
and results are parsed from binary .raw files.
@@ -106,8 +138,7 @@ async def simulate(
    if result.raw_data:
        response["variables"] = [
-            {"name": v.name, "type": v.type}
-            for v in result.raw_data.variables
+            {"name": v.name, "type": v.type} for v in result.raw_data.variables
        ]
        response["points"] = result.raw_data.points
        response["plotname"] = result.raw_data.plotname
@@ -148,8 +179,7 @@ async def simulate_netlist(
    if result.raw_data:
        response["variables"] = [
-            {"name": v.name, "type": v.type}
-            for v in result.raw_data.variables
+            {"name": v.name, "type": v.type} for v in result.raw_data.variables
        ]
        response["points"] = result.raw_data.points
        response["raw_file"] = str(result.raw_file) if result.raw_file else None
@@ -219,13 +249,9 @@ def get_waveform(
        if np.iscomplexobj(sampled):
            result["signals"][name] = {
                "magnitude_db": [
-                    20 * math.log10(abs(x)) if abs(x) > 0 else -200
-                    for x in sampled
-                ],
-                "phase_degrees": [
-                    math.degrees(math.atan2(x.imag, x.real))
-                    for x in sampled
-                ],
+                    20 * math.log10(abs(x)) if abs(x) > 0 else -200 for x in sampled
+                ],
+                "phase_degrees": [math.degrees(math.atan2(x.imag, x.real)) for x in sampled],
            }
        else:
            result["signals"][name] = {"values": sampled.tolist()}
@@ -272,8 +298,10 @@ def analyze_waveform(
    signal = raw.get_variable(signal_name)
    if signal is None:
-        return {"error": f"Signal '{signal_name}' not found. Available: "
-                f"{[v.name for v in raw.variables]}"}
+        return {
+            "error": f"Signal '{signal_name}' not found. Available: "
+            f"{[v.name for v in raw.variables]}"
+        }

    # Use real parts for time-domain analysis
    if np.iscomplexobj(time):
@@ -293,7 +321,8 @@ def analyze_waveform(
    elif analysis == "settling_time":
        if time is not None:
            results["settling_time"] = compute_settling_time(
-                time, signal,
+                time,
+                signal,
                final_value=settling_final_value,
                tolerance_percent=settling_tolerance_pct,
            )
@@ -301,7 +330,8 @@ def analyze_waveform(
    elif analysis == "rise_time":
        if time is not None:
            results["rise_time"] = compute_rise_time(
-                time, signal,
+                time,
+                signal,
                low_pct=rise_low_pct,
                high_pct=rise_high_pct,
            )
@@ -309,14 +339,16 @@ def analyze_waveform(
    elif analysis == "fft":
        if time is not None:
            results["fft"] = compute_fft(
-                time, signal,
+                time,
+                signal,
                max_harmonics=fft_max_harmonics,
            )

    elif analysis == "thd":
        if time is not None:
            results["thd"] = compute_thd(
-                time, signal,
+                time,
+                signal,
                n_harmonics=thd_n_harmonics,
            )
@@ -350,10 +382,7 @@ def measure_bandwidth(
        return {"error": f"Signal '{signal_name}' not found"}

    # Convert complex signal to magnitude in dB
-    mag_db = np.array([
-        20 * math.log10(abs(x)) if abs(x) > 0 else -200
-        for x in signal
-    ])
+    mag_db = np.array([20 * math.log10(abs(x)) if abs(x) > 0 else -200 for x in signal])

    return compute_bandwidth(freq.real, mag_db, ref_db=ref_db)
@@ -384,7 +413,9 @@ def export_csv(
    # Select signals
    if signal_names is None:
-        signal_names = [v.name for v in raw.variables if v.name not in (x_name, "time", "frequency")]
+        signal_names = [
+            v.name for v in raw.variables if v.name not in (x_name, "time", "frequency")
+        ]

    # Downsample
    total = raw.points
@@ -445,6 +476,480 @@
    }
# ============================================================================
# STABILITY ANALYSIS TOOLS
# ============================================================================
@mcp.tool()
def analyze_stability(
raw_file_path: str,
signal_name: str,
) -> dict:
"""Measure gain margin and phase margin from AC loop gain data.
Computes Bode plot (magnitude + phase) and finds the crossover
frequencies where gain = 0 dB and phase = -180 degrees.
Args:
raw_file_path: Path to .raw file from AC simulation
signal_name: Loop gain signal, e.g. "V(out)" or "V(loop_gain)"
"""
raw = parse_raw_file(raw_file_path)
freq = raw.get_frequency()
signal = raw.get_variable(signal_name)
if freq is None:
return {"error": "Not an AC analysis - no frequency data found"}
if signal is None:
return {
"error": f"Signal '{signal_name}' not found. Available: "
f"{[v.name for v in raw.variables]}"
}
return compute_stability_metrics(freq.real, signal)
# ============================================================================
# POWER ANALYSIS TOOLS
# ============================================================================
@mcp.tool()
def analyze_power(
raw_file_path: str,
voltage_signal: str,
current_signal: str,
) -> dict:
"""Compute power metrics from voltage and current waveforms.
Returns average power, RMS power, peak power, and power factor.
Args:
raw_file_path: Path to .raw file from transient simulation
voltage_signal: Voltage signal name, e.g. "V(out)"
current_signal: Current signal name, e.g. "I(R1)"
"""
raw = parse_raw_file(raw_file_path)
time = raw.get_time()
voltage = raw.get_variable(voltage_signal)
current = raw.get_variable(current_signal)
if time is None:
return {"error": "Not a transient analysis - no time data found"}
if voltage is None:
return {"error": f"Voltage signal '{voltage_signal}' not found"}
if current is None:
return {"error": f"Current signal '{current_signal}' not found"}
return compute_power_metrics(time, voltage, current)
@mcp.tool()
def compute_efficiency_tool(
raw_file_path: str,
input_voltage_signal: str,
input_current_signal: str,
output_voltage_signal: str,
output_current_signal: str,
) -> dict:
"""Compute power conversion efficiency.
Compares input power to output power for regulators, converters, etc.
Args:
raw_file_path: Path to .raw file from transient simulation
input_voltage_signal: Input voltage, e.g. "V(vin)"
input_current_signal: Input current, e.g. "I(Vin)"
output_voltage_signal: Output voltage, e.g. "V(out)"
output_current_signal: Output current, e.g. "I(Rload)"
"""
raw = parse_raw_file(raw_file_path)
time = raw.get_time()
if time is None:
return {"error": "Not a transient analysis"}
v_in = raw.get_variable(input_voltage_signal)
i_in = raw.get_variable(input_current_signal)
v_out = raw.get_variable(output_voltage_signal)
i_out = raw.get_variable(output_current_signal)
for name, sig in [
(input_voltage_signal, v_in),
(input_current_signal, i_in),
(output_voltage_signal, v_out),
(output_current_signal, i_out),
]:
if sig is None:
return {"error": f"Signal '{name}' not found"}
return compute_efficiency(time, v_in, i_in, v_out, i_out)
# ============================================================================
# WAVEFORM EXPRESSION TOOLS
# ============================================================================
@mcp.tool()
def evaluate_waveform_expression(
raw_file_path: str,
expression: str,
max_points: int = 1000,
) -> dict:
"""Evaluate a math expression on simulation waveforms.
Supports: +, -, *, /, abs(), sqrt(), log10(), dB()
Signal names reference variables from the .raw file.
Examples:
"V(out) * I(R1)" - instantaneous power
"V(out) / V(in)" - voltage gain
"dB(V(out))" - magnitude in dB
Args:
raw_file_path: Path to .raw file
expression: Math expression using signal names
max_points: Maximum data points to return
"""
raw = parse_raw_file(raw_file_path)
calc = WaveformCalculator(raw)
try:
result = calc.calc(expression)
except ValueError as e:
return {"error": str(e), "available_signals": calc.available_signals()}
# Get x-axis
x_axis = raw.get_time()
x_name = "time"
if x_axis is None:
x_axis = raw.get_frequency()
x_name = "frequency"
total = len(result)
step = max(1, total // max_points)
response = {
"expression": expression,
"total_points": total,
"returned_points": len(result[::step]),
}
if x_axis is not None:
sampled_x = x_axis[::step]
response["x_axis_name"] = x_name
response["x_axis_data"] = (
sampled_x.real.tolist() if np.iscomplexobj(sampled_x) else sampled_x.tolist()
)
response["values"] = result[::step].tolist()
response["available_signals"] = calc.available_signals()
return response
# ============================================================================
# OPTIMIZER TOOLS
# ============================================================================
@mcp.tool()
async def optimize_circuit(
netlist_template: str,
targets: list[dict],
component_ranges: list[dict],
max_iterations: int = 20,
) -> dict:
"""Automatically optimize component values to hit target specifications.
Runs real LTspice simulations in a loop, adjusting component values
using binary search (single component) or coordinate descent (multiple).
Args:
netlist_template: Netlist text with {ComponentName} placeholders
(e.g., {R1}, {C1}) that get substituted each iteration.
targets: List of target specs, each with:
- signal_name: Signal to measure (e.g., "V(out)")
- metric: One of "bandwidth_hz", "rms", "peak_to_peak",
"settling_time", "gain_db", "phase_margin_deg"
- target_value: Desired value
- weight: Importance weight (default 1.0)
component_ranges: List of tunable components, each with:
- component_name: Name matching {placeholder} (e.g., "R1")
- min_value: Minimum value in base units
- max_value: Maximum value in base units
- preferred_series: Optional "E12", "E24", or "E96" for snapping
max_iterations: Max simulation iterations (default 20)
"""
opt_targets = [
OptimizationTarget(
signal_name=t["signal_name"],
metric=t["metric"],
target_value=t["target_value"],
weight=t.get("weight", 1.0),
)
for t in targets
]
opt_ranges = [
ComponentRange(
component_name=r["component_name"],
min_value=r["min_value"],
max_value=r["max_value"],
preferred_series=r.get("preferred_series"),
)
for r in component_ranges
]
result = await optimize_component_values(
netlist_template,
opt_targets,
opt_ranges,
max_iterations,
)
return {
"best_values": {k: format_engineering(v) for k, v in result.best_values.items()},
"best_values_raw": result.best_values,
"best_cost": result.best_cost,
"iterations": result.iterations,
"targets_met": result.targets_met,
"final_metrics": result.final_metrics,
"history_length": len(result.history),
}
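# Example tool payload (hypothetical values; the keys match the docstring above):
#
#     targets = [{"signal_name": "V(out)", "metric": "bandwidth_hz",
#                 "target_value": 1000.0, "weight": 1.0}]
#     component_ranges = [{"component_name": "R1", "min_value": 100.0,
#                          "max_value": 100000.0, "preferred_series": "E24"}]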
# ============================================================================
# BATCH SIMULATION TOOLS
# ============================================================================
@mcp.tool()
async def parameter_sweep(
netlist_text: str,
param_name: str,
start: float,
stop: float,
num_points: int = 10,
timeout_seconds: float = 300,
) -> dict:
"""Sweep a parameter across a range of values.
Runs multiple simulations, substituting the parameter value each time.
The netlist should contain a .param directive for the parameter.
Args:
netlist_text: Netlist with .param directive
param_name: Parameter to sweep (e.g., "Rval")
start: Start value
stop: Stop value
num_points: Number of sweep points
timeout_seconds: Per-simulation timeout
"""
values = np.linspace(start, stop, num_points).tolist()
result = await run_parameter_sweep(
netlist_text,
param_name,
values,
timeout=timeout_seconds,
)
return {
"success_count": result.success_count,
"failure_count": result.failure_count,
"total_elapsed": result.total_elapsed,
"parameter_values": result.parameter_values,
"raw_files": [str(r.raw_file) if r.raw_file else None for r in result.results],
}
@mcp.tool()
async def temperature_sweep(
netlist_text: str,
temperatures: list[float],
timeout_seconds: float = 300,
) -> dict:
"""Run simulations at different temperatures.
Args:
netlist_text: Netlist text
temperatures: List of temperatures in degrees C
timeout_seconds: Per-simulation timeout
"""
result = await run_temperature_sweep(
netlist_text,
temperatures,
timeout=timeout_seconds,
)
return {
"success_count": result.success_count,
"failure_count": result.failure_count,
"total_elapsed": result.total_elapsed,
"parameter_values": result.parameter_values,
"raw_files": [str(r.raw_file) if r.raw_file else None for r in result.results],
}
@mcp.tool()
async def monte_carlo(
netlist_text: str,
n_runs: int,
tolerances: dict[str, float],
timeout_seconds: float = 300,
seed: int | None = None,
) -> dict:
"""Run Monte Carlo analysis with component tolerances.
Randomly varies component values within tolerance using a normal
distribution, then runs simulations for each variant.
Args:
netlist_text: Netlist text
n_runs: Number of Monte Carlo iterations
tolerances: Component tolerances, e.g. {"R1": 0.05} for 5%
timeout_seconds: Per-simulation timeout
seed: Optional RNG seed for reproducibility
"""
result = await run_monte_carlo(
netlist_text,
n_runs,
tolerances,
timeout=timeout_seconds,
seed=seed,
)
return {
"success_count": result.success_count,
"failure_count": result.failure_count,
"total_elapsed": result.total_elapsed,
"parameter_values": result.parameter_values,
"raw_files": [str(r.raw_file) if r.raw_file else None for r in result.results],
}
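# Example call (hypothetical netlist): 100 runs with 5% resistors and 10%
# capacitors would pass n_runs=100 and tolerances={"R1": 0.05, "C1": 0.10},
# plus seed=42 when a reproducible spread is needed.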
# ============================================================================
# SCHEMATIC GENERATION TOOLS
# ============================================================================
@mcp.tool()
def generate_schematic(
template: str,
output_path: str | None = None,
r: str | None = None,
c: str | None = None,
r1: str | None = None,
r2: str | None = None,
vin: str | None = None,
rin: str | None = None,
rf: str | None = None,
opamp_model: str | None = None,
) -> dict:
"""Generate an LTspice .asc schematic file from a template.
Available templates and their parameters:
- "rc_lowpass": r (resistor, default "1k"), c (capacitor, default "100n")
- "voltage_divider": r1 (top, default "10k"), r2 (bottom, default "10k"),
vin (input voltage, default "5")
- "inverting_amp": rin (input R, default "10k"), rf (feedback R,
default "100k"), opamp_model (default "UniversalOpamp2")
Args:
template: Template name
output_path: Where to save (None = auto in /tmp)
r: Resistor value (rc_lowpass)
c: Capacitor value (rc_lowpass)
r1: Top resistor (voltage_divider)
r2: Bottom resistor (voltage_divider)
vin: Input voltage (voltage_divider)
rin: Input resistor (inverting_amp)
rf: Feedback resistor (inverting_amp)
opamp_model: Op-amp model name (inverting_amp)
"""
if template == "rc_lowpass":
params: dict[str, str] = {}
if r is not None:
params["r"] = r
if c is not None:
params["c"] = c
sch = generate_rc_lowpass_asc(**params)
elif template == "voltage_divider":
params = {}
if r1 is not None:
params["r1"] = r1
if r2 is not None:
params["r2"] = r2
if vin is not None:
params["vin"] = vin
sch = generate_voltage_divider_asc(**params)
elif template == "inverting_amp":
params = {}
if rin is not None:
params["rin"] = rin
if rf is not None:
params["rf"] = rf
if opamp_model is not None:
params["opamp_model"] = opamp_model
sch = generate_inverting_amp(**params)
else:
return {
"error": f"Unknown template '{template}'. "
f"Available: rc_lowpass, voltage_divider, inverting_amp"
}
if output_path is None:
output_path = str(Path(tempfile.gettempdir()) / f"{template}.asc")
saved = sch.save(output_path)
return {
"success": True,
"output_path": str(saved),
"schematic_preview": sch.render()[:500],
}
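# Example invocation (hypothetical values): generate_schematic("rc_lowpass",
# r="4.7k", c="22n") writes rc_lowpass.asc to the platform temp directory and
# returns the saved path plus the first 500 characters of the rendered file.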
# ============================================================================
# TOUCHSTONE / S-PARAMETER TOOLS
# ============================================================================
@mcp.tool()
def read_touchstone(file_path: str) -> dict:
"""Parse a Touchstone (.s1p, .s2p, .snp) S-parameter file.
Returns S-parameter data, frequency points, and port information.
Args:
file_path: Path to Touchstone file
"""
try:
data = parse_touchstone(file_path)
except (ValueError, FileNotFoundError) as e:
return {"error": str(e)}
# Convert S-parameter data to a more digestible format
s_params = {}
for i in range(data.n_ports):
for j in range(data.n_ports):
key = f"S{i + 1}{j + 1}"
s_data = data.data[:, i, j]
s_params[key] = {
"magnitude_db": s_param_to_db(s_data).tolist(),
}
return {
"filename": data.filename,
"n_ports": data.n_ports,
"n_frequencies": len(data.frequencies),
"freq_range_hz": [float(data.frequencies[0]), float(data.frequencies[-1])],
"reference_impedance": data.reference_impedance,
"s_parameters": s_params,
"comments": data.comments[:5], # First 5 comment lines
}
# ============================================================================
# SCHEMATIC TOOLS
# ============================================================================
@@ -497,7 +1002,10 @@ def edit_component(
    """
    try:
        sch = modify_component_value(
-            schematic_path, component_name, new_value, output_path,
+            schematic_path,
+            component_name,
+            new_value,
+            output_path,
        )
        comp = sch.get_component(component_name)
        return {
@@ -720,12 +1228,14 @@ def get_symbol_info(symbol_path: str) -> dict:
        if line.startswith("PIN"):
            parts = line.split()
            if len(parts) >= 5:
-                info["pins"].append({
-                    "x": int(parts[1]),
-                    "y": int(parts[2]),
-                    "justification": parts[3],
-                    "rotation": parts[4] if len(parts) > 4 else "0",
-                })
+                info["pins"].append(
+                    {
+                        "x": int(parts[1]),
+                        "y": int(parts[2]),
+                        "justification": parts[3],
+                        "rotation": parts[4] if len(parts) > 4 else "0",
+                    }
+                )

        elif line.startswith("PINATTR PinName"):
            pin_name = line.split(None, 2)[2] if len(line.split()) > 2 else ""

View File

@@ -0,0 +1,167 @@
"""Stability analysis for AC simulation loop gain data (gain/phase margins)."""
import numpy as np
def _interp_crossing(x: np.ndarray, y: np.ndarray, threshold: float) -> list[float]:
"""Find all x values where y crosses threshold, using linear interpolation."""
crossings = []
for i in range(len(y) - 1):
if (y[i] - threshold) * (y[i + 1] - threshold) < 0:
dy = y[i + 1] - y[i]
if abs(dy) < 1e-30:
crossings.append(float(x[i]))
else:
frac = (threshold - y[i]) / dy
crossings.append(float(x[i] + frac * (x[i + 1] - x[i])))
return crossings
def _interp_y_at_crossing(
x: np.ndarray, y: np.ndarray, ref: np.ndarray, threshold: float
) -> list[tuple[float, float]]:
"""Find interpolated (x, y) pairs where ref crosses threshold."""
results = []
for i in range(len(ref) - 1):
if (ref[i] - threshold) * (ref[i + 1] - threshold) < 0:
dref = ref[i + 1] - ref[i]
if abs(dref) < 1e-30:
results.append((float(x[i]), float(y[i])))
else:
frac = (threshold - ref[i]) / dref
xi = float(x[i] + frac * (x[i + 1] - x[i]))
yi = float(y[i] + frac * (y[i + 1] - y[i]))
results.append((xi, yi))
return results
def compute_gain_margin(frequency: np.ndarray, loop_gain_complex: np.ndarray) -> dict:
"""Compute gain margin from loop gain T(s).
Args:
frequency: Frequency array in Hz (real-valued)
loop_gain_complex: Complex loop gain T(jw)
Returns:
Dict with gain_margin_db, phase_crossover_freq_hz, is_stable
"""
if len(frequency) < 2 or len(loop_gain_complex) < 2:
return {
"gain_margin_db": None,
"phase_crossover_freq_hz": None,
"is_stable": None,
}
freq = np.real(frequency).astype(np.float64)
mag_db = 20.0 * np.log10(np.maximum(np.abs(loop_gain_complex), 1e-30))
phase_deg = np.degrees(np.unwrap(np.angle(loop_gain_complex)))
# Phase crossover: where phase crosses -180 degrees
# Find magnitude at that crossing via interpolation
hits = _interp_y_at_crossing(freq, mag_db, phase_deg, -180.0)
if not hits:
# No phase crossover => infinite gain margin (stable for any gain)
return {
"gain_margin_db": float("inf"),
"phase_crossover_freq_hz": None,
"is_stable": True,
}
# Use the first phase crossover
crossover_freq, mag_at_crossover = hits[0]
gain_margin = -mag_at_crossover
return {
"gain_margin_db": float(gain_margin),
"phase_crossover_freq_hz": float(crossover_freq),
"is_stable": gain_margin > 0,
}
def compute_phase_margin(frequency: np.ndarray, loop_gain_complex: np.ndarray) -> dict:
"""Compute phase margin from loop gain T(s).
Args:
frequency: Frequency array in Hz (real-valued)
loop_gain_complex: Complex loop gain T(jw)
Returns:
Dict with phase_margin_deg, gain_crossover_freq_hz, is_stable
"""
if len(frequency) < 2 or len(loop_gain_complex) < 2:
return {
"phase_margin_deg": None,
"gain_crossover_freq_hz": None,
"is_stable": None,
}
freq = np.real(frequency).astype(np.float64)
mag_db = 20.0 * np.log10(np.maximum(np.abs(loop_gain_complex), 1e-30))
phase_deg = np.degrees(np.unwrap(np.angle(loop_gain_complex)))
# Gain crossover: where magnitude crosses 0 dB
# Find phase at that crossing via interpolation
hits = _interp_y_at_crossing(freq, phase_deg, mag_db, 0.0)
if not hits:
# No gain crossover. If gain is always below 0 dB, system is stable.
is_stable = bool(np.all(mag_db < 0))
return {
"phase_margin_deg": float("inf") if is_stable else None,
"gain_crossover_freq_hz": None,
"is_stable": is_stable,
}
# Use the first gain crossover
crossover_freq, phase_at_crossover = hits[0]
phase_margin = 180.0 + phase_at_crossover
return {
"phase_margin_deg": float(phase_margin),
"gain_crossover_freq_hz": float(crossover_freq),
"is_stable": phase_margin > 0,
}
def compute_stability_metrics(frequency: np.ndarray, loop_gain_complex: np.ndarray) -> dict:
"""Compute comprehensive stability metrics including Bode plot data.
Args:
frequency: Frequency array in Hz (real-valued)
loop_gain_complex: Complex loop gain T(jw)
Returns:
Dict with gain_margin, phase_margin, bode data, crossover
frequencies, and overall stability assessment
"""
if len(frequency) < 2 or len(loop_gain_complex) < 2:
return {
"gain_margin": compute_gain_margin(frequency, loop_gain_complex),
"phase_margin": compute_phase_margin(frequency, loop_gain_complex),
"bode": {"frequency_hz": [], "magnitude_db": [], "phase_deg": []},
"is_stable": None,
}
freq = np.real(frequency).astype(np.float64)
mag_db = 20.0 * np.log10(np.maximum(np.abs(loop_gain_complex), 1e-30))
phase_deg = np.degrees(np.unwrap(np.angle(loop_gain_complex)))
gm = compute_gain_margin(frequency, loop_gain_complex)
pm = compute_phase_margin(frequency, loop_gain_complex)
# Overall stability: both margins must be positive (when defined)
gm_ok = gm["is_stable"] is not False
pm_ok = pm["is_stable"] is not False
is_stable = gm_ok and pm_ok
return {
"gain_margin": gm,
"phase_margin": pm,
"bode": {
"frequency_hz": freq.tolist(),
"magnitude_db": mag_db.tolist(),
"phase_deg": phase_deg.tolist(),
},
"is_stable": is_stable,
}
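# Sketch of an end-to-end check (assumes `freq` and `loop_T` were extracted
# from a parsed AC-analysis .raw file elsewhere; names are placeholders):
#
#     metrics = compute_stability_metrics(freq, loop_T)
#     print(metrics["gain_margin"]["gain_margin_db"],
#           metrics["phase_margin"]["phase_margin_deg"],
#           metrics["is_stable"])
#     # metrics["bode"] holds frequency/magnitude/phase lists ready for plotting.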

View File

@@ -0,0 +1,254 @@
"""Parse Touchstone (.s1p, .s2p, .snp) S-parameter files for LTspice."""
import re
from dataclasses import dataclass, field
from pathlib import Path
import numpy as np
# Frequency unit multipliers to Hz
_FREQ_MULTIPLIERS: dict[str, float] = {
"HZ": 1.0,
"KHZ": 1e3,
"MHZ": 1e6,
"GHZ": 1e9,
}
@dataclass
class TouchstoneData:
"""Parsed contents of a Touchstone file.
All frequencies are stored in Hz regardless of the original file's
unit. The ``data`` array holds complex-valued parameters with shape
``(n_freq, n_ports, n_ports)``.
"""
filename: str
n_ports: int
freq_unit: str
parameter_type: str # S, Y, Z, H, G
format_type: str # MA, DB, RI
reference_impedance: float
frequencies: np.ndarray # shape (n_freq,), always in Hz
data: np.ndarray # shape (n_freq, n_ports, n_ports), complex128
comments: list[str] = field(default_factory=list)
def _to_complex(v1: float, v2: float, fmt: str) -> complex:
"""Convert a value pair to complex according to the format type."""
if fmt == "RI":
return complex(v1, v2)
elif fmt == "MA":
mag = v1
angle_rad = np.deg2rad(v2)
return complex(mag * np.cos(angle_rad), mag * np.sin(angle_rad))
elif fmt == "DB":
mag = 10.0 ** (v1 / 20.0)
angle_rad = np.deg2rad(v2)
return complex(mag * np.cos(angle_rad), mag * np.sin(angle_rad))
else:
raise ValueError(f"Unknown format type: {fmt!r}")
def _detect_ports(path: Path) -> int:
"""Detect port count from the file extension (.s<n>p)."""
suffix = path.suffix.lower()
m = re.match(r"\.s(\d+)p$", suffix)
if not m:
raise ValueError(
f"Cannot determine port count from extension {suffix!r}. "
"Expected .s1p, .s2p, .s3p, .s4p, etc."
)
return int(m.group(1))
def parse_touchstone(path: str | Path) -> TouchstoneData:
"""Parse a Touchstone file into a TouchstoneData object.
Handles .s1p through .s4p (and beyond), all three format types
(MA, DB, RI), all frequency units, and continuation lines used
by files with more than two ports.
Args:
path: Path to the Touchstone file.
Returns:
A TouchstoneData with frequencies converted to Hz and
parameters stored as complex128.
"""
path = Path(path)
n_ports = _detect_ports(path)
# Defaults per Touchstone spec
freq_unit = "GHZ"
param_type = "S"
fmt = "MA"
ref_impedance = 50.0
comments: list[str] = []
data_lines: list[str] = []
option_found = False
with path.open() as fh:
for raw_line in fh:
line = raw_line.strip()
# Comment lines
if line.startswith("!"):
comments.append(line[1:].strip())
continue
# Option line (only the first one is used)
if line.startswith("#"):
if not option_found:
option_found = True
tokens = line[1:].split()
# Parse tokens case-insensitively
i = 0
while i < len(tokens):
tok = tokens[i].upper()
if tok in _FREQ_MULTIPLIERS:
freq_unit = tok
elif tok in ("S", "Y", "Z", "H", "G"):
param_type = tok
elif tok in ("MA", "DB", "RI"):
fmt = tok
elif tok == "R" and i + 1 < len(tokens):
i += 1
ref_impedance = float(tokens[i])
i += 1
continue
# Skip blank lines
if not line:
continue
# Inline comments after data (some files use !)
if "!" in line:
line = line[: line.index("!")]
data_lines.append(line)
    # Compute the expected number of data values per frequency point.
    # Each frequency point has n_ports * n_ports parameters, each
    # consisting of two floats.
    values_per_freq = n_ports * n_ports * 2  # parameters * 2 values each
# Flatten all data tokens
all_tokens: list[float] = []
for dl in data_lines:
all_tokens.extend(float(t) for t in dl.split())
# Each frequency row starts with the frequency value, followed by
# values_per_freq data values. For n_ports <= 2, everything fits
# on one line. For n_ports > 2, continuation lines are used and
# don't repeat the frequency.
stride = 1 + values_per_freq # freq + data
if len(all_tokens) % stride != 0:
raise ValueError(
f"Token count {len(all_tokens)} is not a multiple of "
f"expected stride {stride} for a {n_ports}-port file."
)
n_freq = len(all_tokens) // stride
freq_mult = _FREQ_MULTIPLIERS[freq_unit]
frequencies = np.empty(n_freq, dtype=np.float64)
data = np.empty((n_freq, n_ports, n_ports), dtype=np.complex128)
for k in range(n_freq):
offset = k * stride
frequencies[k] = all_tokens[offset] * freq_mult
        # Data values come in pairs: (v1, v2) per parameter. Touchstone
        # ordering quirk: 2-port files list S11 S21 S12 S22 (column-major),
        # while 1-port and 3+-port files are row-major.
        idx = offset + 1
        if n_ports == 2:
            order = [(0, 0), (1, 0), (0, 1), (1, 1)]
        else:
            order = [(r, c) for r in range(n_ports) for c in range(n_ports)]
        for row, col in order:
            v1 = all_tokens[idx]
            v2 = all_tokens[idx + 1]
            data[k, row, col] = _to_complex(v1, v2, fmt)
            idx += 2
return TouchstoneData(
filename=path.name,
n_ports=n_ports,
freq_unit=freq_unit,
parameter_type=param_type,
format_type=fmt,
reference_impedance=ref_impedance,
frequencies=frequencies,
data=data,
comments=comments,
)
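# Illustrative usage (the file name is hypothetical):
#
#     ts = parse_touchstone("amplifier.s2p")
#     print(ts.n_ports, ts.format_type, ts.reference_impedance)
#     print(ts.frequencies[0], ts.frequencies[-1])  # always in Hz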
def get_s_parameter(data: TouchstoneData, i: int, j: int) -> tuple[np.ndarray, np.ndarray]:
"""Extract a single S-parameter across all frequencies.
Args:
data: Parsed Touchstone data.
i: Row index (1-based, as in S(i,j)).
j: Column index (1-based).
Returns:
Tuple of (frequencies_hz, complex_values) where both are 1-D
numpy arrays.
"""
if i < 1 or i > data.n_ports:
raise IndexError(f"Row index {i} out of range for {data.n_ports}-port data")
if j < 1 or j > data.n_ports:
raise IndexError(f"Column index {j} out of range for {data.n_ports}-port data")
return data.frequencies.copy(), data.data[:, i - 1, j - 1].copy()
def s_param_to_db(complex_values: np.ndarray) -> np.ndarray:
"""Convert complex S-parameter values to decibels.
Computes 20 * log10(|S|), flooring magnitudes at -300 dB to avoid
log-of-zero warnings.
Args:
complex_values: Array of complex S-parameter values.
Returns:
Magnitude in dB as a real-valued numpy array.
"""
magnitude = np.abs(complex_values)
return 20.0 * np.log10(np.maximum(magnitude, 1e-15))
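# Illustrative usage: pull S21 from a parsed 2-port file and convert it to dB
# (`ts` is assumed to come from parse_touchstone above):
#
#     freq_hz, s21 = get_s_parameter(ts, 2, 1)
#     s21_db = s_param_to_db(s21)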
def generate_ltspice_subcircuit(touchstone_data: TouchstoneData, name: str) -> str:
"""Generate an LTspice-compatible subcircuit wrapping S-parameter data.
LTspice can reference Touchstone files from within a subcircuit
using the ``.net`` directive. This function produces a ``.sub``
file body that instantiates the S-parameter block.
Args:
touchstone_data: Parsed Touchstone data.
name: Subcircuit name (used in .SUBCKT and filename references).
Returns:
A string containing the complete subcircuit definition.
"""
td = touchstone_data
n = td.n_ports
# Build port list: port1, port2, ..., portN, plus a reference node
port_names = [f"port{k}" for k in range(1, n + 1)]
port_list = " ".join(port_names)
lines: list[str] = []
lines.append(f"* LTspice subcircuit for {td.filename}")
lines.append(f"* {n}-port {td.parameter_type}-parameters, Z0={td.reference_impedance} Ohm")
lines.append(
f"* Frequency range: {td.frequencies[0]:.6g} Hz "
f"to {td.frequencies[-1]:.6g} Hz "
f"({len(td.frequencies)} points)"
)
lines.append(f".SUBCKT {name} {port_list} ref")
lines.append(f".net {td.filename} {port_list} ref")
lines.append(f".ends {name}")
return "\n".join(lines) + "\n"

View File

@@ -0,0 +1,299 @@
"""Evaluate mathematical expressions on waveform data.
Supports expressions like "V(out) * I(R1)", "V(out) / V(in)",
"20*log10(abs(V(out)))", etc. Uses a safe recursive-descent parser
instead of Python's eval/exec.
"""
from __future__ import annotations
import re
from dataclasses import dataclass
from enum import Enum, auto
import numpy as np
from .raw_parser import RawFile
# -- Tokenizer ---------------------------------------------------------------
class _TokenType(Enum):
NUMBER = auto()
SIGNAL = auto() # e.g., V(out), I(R1), time
FUNC = auto() # abs, sqrt, log10, dB
PLUS = auto()
MINUS = auto()
STAR = auto()
SLASH = auto()
LPAREN = auto()
RPAREN = auto()
EOF = auto()
@dataclass
class _Token:
type: _TokenType
value: str
# Functions we support (case-insensitive lookup)
_FUNCTIONS = {"abs", "sqrt", "log10", "db"}
# Regex pieces for the tokenizer
_NUMBER_RE = re.compile(r"[0-9]*\.?[0-9]+(?:[eE][+-]?[0-9]+)?")
# Signal names: either V(...), I(...) style or bare identifiers like "time".
# The argument extends to the first closing paren, e.g. V(N001) or I(R1).
_SIGNAL_RE = re.compile(r"[A-Za-z_]\w*\([^)]*\)")
_IDENT_RE = re.compile(r"[A-Za-z_]\w*")
def _tokenize(expr: str) -> list[_Token]:
"""Convert an expression string into a list of tokens."""
tokens: list[_Token] = []
i = 0
s = expr.strip()
while i < len(s):
# Skip whitespace
if s[i].isspace():
i += 1
continue
# Operators / parens
if s[i] == "+":
tokens.append(_Token(_TokenType.PLUS, "+"))
i += 1
elif s[i] == "-":
tokens.append(_Token(_TokenType.MINUS, "-"))
i += 1
elif s[i] == "*":
tokens.append(_Token(_TokenType.STAR, "*"))
i += 1
elif s[i] == "/":
tokens.append(_Token(_TokenType.SLASH, "/"))
i += 1
elif s[i] == "(":
tokens.append(_Token(_TokenType.LPAREN, "("))
i += 1
elif s[i] == ")":
tokens.append(_Token(_TokenType.RPAREN, ")"))
i += 1
# Number literal
elif s[i].isdigit() or (s[i] == "." and i + 1 < len(s) and s[i + 1].isdigit()):
m = _NUMBER_RE.match(s, i)
if m:
tokens.append(_Token(_TokenType.NUMBER, m.group()))
i = m.end()
else:
raise ValueError(f"Invalid number at position {i}: {s[i:]!r}")
# Identifier: could be a function, signal like V(out), or bare name
elif s[i].isalpha() or s[i] == "_":
# Try signal pattern first: name(...)
m = _SIGNAL_RE.match(s, i)
if m:
full = m.group()
# Check if the part before '(' is a known function
paren_pos = full.index("(")
prefix = full[:paren_pos].lower()
if prefix in _FUNCTIONS:
# It's a function call -- emit FUNC token, then let
# the parser handle the parenthesized argument
tokens.append(_Token(_TokenType.FUNC, prefix))
i += paren_pos # parser will see '(' next
else:
# It's a signal name like V(out) or I(R1)
tokens.append(_Token(_TokenType.SIGNAL, full))
i = m.end()
else:
# Bare identifier (e.g., "time", or a function without parens)
m = _IDENT_RE.match(s, i)
if m:
name = m.group()
if name.lower() in _FUNCTIONS:
tokens.append(_Token(_TokenType.FUNC, name.lower()))
else:
tokens.append(_Token(_TokenType.SIGNAL, name))
i = m.end()
else:
raise ValueError(f"Unexpected character at position {i}: {s[i:]!r}")
else:
raise ValueError(f"Unexpected character at position {i}: {s[i:]!r}")
tokens.append(_Token(_TokenType.EOF, ""))
return tokens
# -- Recursive-descent parser / evaluator ------------------------------------
#
# Grammar:
# expr -> term (('+' | '-') term)*
# term -> unary (('*' | '/') unary)*
# unary -> '-' unary | primary
# primary -> NUMBER | SIGNAL | FUNC '(' expr ')' | '(' expr ')'
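# Worked example of the grammar: the input "dB(V(out)) - 3" tokenizes to
#   FUNC('db') LPAREN SIGNAL('V(out)') RPAREN MINUS NUMBER('3') EOF
# and evaluates as the dB of V(out) with 3 subtracted element-wise.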
class _Parser:
"""Recursive-descent expression evaluator over numpy arrays."""
def __init__(self, tokens: list[_Token], variables: dict[str, np.ndarray]):
self.tokens = tokens
self.variables = variables
self.pos = 0
def _peek(self) -> _Token:
return self.tokens[self.pos]
def _advance(self) -> _Token:
tok = self.tokens[self.pos]
self.pos += 1
return tok
def _expect(self, ttype: _TokenType) -> _Token:
tok = self._advance()
if tok.type != ttype:
raise ValueError(f"Expected {ttype.name}, got {tok.type.name} ({tok.value!r})")
return tok
def parse(self) -> np.ndarray:
result = self._expr()
if self._peek().type != _TokenType.EOF:
raise ValueError(f"Unexpected token after expression: {self._peek().value!r}")
return np.real(result) if np.iscomplexobj(result) else result
def _expr(self) -> np.ndarray:
left = self._term()
while self._peek().type in (_TokenType.PLUS, _TokenType.MINUS):
op = self._advance()
right = self._term()
if op.type == _TokenType.PLUS:
left = left + right
else:
left = left - right
return left
def _term(self) -> np.ndarray:
left = self._unary()
while self._peek().type in (_TokenType.STAR, _TokenType.SLASH):
op = self._advance()
right = self._unary()
if op.type == _TokenType.STAR:
left = left * right
else:
# Avoid division by zero -- replace zeros with tiny value
safe = np.where(np.abs(right) < 1e-30, 1e-30, right)
left = left / safe
return left
def _unary(self) -> np.ndarray:
if self._peek().type == _TokenType.MINUS:
self._advance()
return -self._unary()
return self._primary()
def _primary(self) -> np.ndarray:
tok = self._peek()
if tok.type == _TokenType.NUMBER:
self._advance()
return np.float64(tok.value)
if tok.type == _TokenType.SIGNAL:
self._advance()
return self._resolve_signal(tok.value)
if tok.type == _TokenType.FUNC:
self._advance()
self._expect(_TokenType.LPAREN)
arg = self._expr()
self._expect(_TokenType.RPAREN)
return self._apply_func(tok.value, arg)
if tok.type == _TokenType.LPAREN:
self._advance()
result = self._expr()
self._expect(_TokenType.RPAREN)
return result
raise ValueError(f"Unexpected token: {tok.type.name} ({tok.value!r})")
def _resolve_signal(self, name: str) -> np.ndarray:
"""Look up a signal by name, trying exact match then case-insensitive."""
if name in self.variables:
return np.real(self.variables[name])
name_lower = name.lower()
for key, val in self.variables.items():
if key.lower() == name_lower:
return np.real(val)
available = ", ".join(sorted(self.variables.keys()))
raise ValueError(f"Unknown signal {name!r}. Available: {available}")
@staticmethod
def _apply_func(name: str, arg: np.ndarray) -> np.ndarray:
"""Apply a built-in function to a numpy array."""
a = np.real(arg)
if name == "abs":
return np.abs(a)
if name == "sqrt":
return np.sqrt(np.maximum(a, 0.0))
if name == "log10":
return np.log10(np.maximum(np.abs(a), 1e-30))
if name == "db":
return 20.0 * np.log10(np.maximum(np.abs(a), 1e-30))
raise ValueError(f"Unknown function: {name!r}")
# -- Public API ---------------------------------------------------------------
def evaluate_expression(expression: str, variables: dict[str, np.ndarray]) -> np.ndarray:
"""Evaluate a mathematical expression over waveform data.
Uses a safe recursive-descent parser -- no eval() or exec().
Args:
expression: Math expression, e.g. "V(out) * I(R1)" or "dB(V(out))"
variables: Dict mapping signal names to numpy arrays
Returns:
Resulting numpy array
Raises:
ValueError: On parse errors or unknown signals
"""
tokens = _tokenize(expression)
parser = _Parser(tokens, variables)
return parser.parse()
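# Illustrative usage with plain numpy arrays (no .raw file required):
#
#     v_out = np.array([1.0, 2.0, 4.0])
#     i_r1 = np.array([0.1, 0.2, 0.4])
#     power = evaluate_expression("V(out) * I(R1)", {"V(out)": v_out, "I(R1)": i_r1})
#     # -> array([0.1, 0.4, 1.6])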
class WaveformCalculator:
"""Evaluate expressions against all signals in a RawFile."""
def __init__(self, raw_file: RawFile):
self._raw = raw_file
self._variables: dict[str, np.ndarray] = {}
for var in raw_file.variables:
self._variables[var.name] = raw_file.data[var.index]
def calc(self, expression: str) -> np.ndarray:
"""Evaluate an expression against the loaded signals.
Args:
expression: Math expression referencing signal names from the raw file
Returns:
Resulting numpy array
"""
return evaluate_expression(expression, self._variables)
def available_signals(self) -> list[str]:
"""List all signal names available for expressions."""
return sorted(self._variables.keys())
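# Illustrative usage (the `parse_raw_file` call below stands in for whatever
# entry point raw_parser exposes; it is a placeholder, not a real name):
#
#     raw = parse_raw_file("sim.raw")
#     calc = WaveformCalculator(raw)
#     print(calc.available_signals())
#     inst_power = calc.calc("V(out) * I(R1)")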

View File

@@ -3,9 +3,7 @@
 import numpy as np


-def compute_fft(
-    time: np.ndarray, signal: np.ndarray, max_harmonics: int = 50
-) -> dict:
+def compute_fft(time: np.ndarray, signal: np.ndarray, max_harmonics: int = 50) -> dict:
     """Compute FFT of a time-domain signal.

     Args:
@@ -73,9 +71,7 @@ def compute_fft(
     }


-def compute_thd(
-    time: np.ndarray, signal: np.ndarray, n_harmonics: int = 10
-) -> dict:
+def compute_thd(time: np.ndarray, signal: np.ndarray, n_harmonics: int = 10) -> dict:
     """Compute Total Harmonic Distortion.

     THD is the ratio of harmonic content to the fundamental, expressed
@@ -147,14 +143,16 @@ def compute_thd(
         idx = int(np.argmin(np.abs(freqs - target_freq)))
         mag = float(magnitudes[idx])
         mag_db = 20.0 * np.log10(max(mag, 1e-15))
-        harmonic_sum_sq += mag ** 2
-        harmonics.append({
-            "harmonic": h,
-            "frequency": float(freqs[idx]),
-            "magnitude": mag,
-            "magnitude_db": mag_db,
-        })
+        harmonic_sum_sq += mag**2
+        harmonics.append(
+            {
+                "harmonic": h,
+                "frequency": float(freqs[idx]),
+                "magnitude": mag,
+                "magnitude_db": mag_db,
+            }
+        )

     thd_percent = (np.sqrt(harmonic_sum_sq) / fundamental_mag) * 100.0

@@ -179,7 +177,7 @@ def compute_rms(signal: np.ndarray) -> float:
         return 0.0

     real_signal = np.real(signal)
-    return float(np.sqrt(np.mean(real_signal ** 2)))
+    return float(np.sqrt(np.mean(real_signal**2)))


 def compute_peak_to_peak(signal: np.ndarray) -> dict: