Add analysis, netlist builder, model search, DRC, and diff tools

New modules:
- log_parser: Extract .meas results and errors from sim logs
- waveform_math: FFT, THD, RMS, settling time, rise time, bandwidth
- netlist: Programmatic SPICE netlist builder with templates
- models: Search 2800+ SPICE models and subcircuits in library
- diff: Compare two schematics for component/topology changes
- drc: Design rule checks (ground, floating nodes, missing values)

Server now has 18 tools, 3 resources, and 3 guided prompts.
This commit is contained in:
Ryan Malloy 2026-02-10 13:59:26 -07:00
parent a77874c972
commit b31ff1cbe4
7 changed files with 2868 additions and 116 deletions

430
src/mcp_ltspice/diff.py Normal file
View File

@ -0,0 +1,430 @@
"""Compare two LTspice schematics and produce a structured diff."""
from dataclasses import dataclass, field
from pathlib import Path
from .schematic import parse_schematic, Schematic, Component
@dataclass
class ComponentChange:
    """A change to a single component between two schematics.

    One instance describes an added, removed, or modified component,
    carrying both the before ("old") and after ("new") state so callers
    can render either side of the diff.
    """

    name: str  # Component instance name, e.g. "R1"
    change_type: str  # "added", "removed", "modified"
    symbol: str = ""  # Symbol name, e.g. "res" or "voltage"
    old_value: str | None = None  # Value before the change (None when added)
    new_value: str | None = None  # Value after the change (None when removed)
    old_attributes: dict[str, str] = field(default_factory=dict)  # Attributes before
    new_attributes: dict[str, str] = field(default_factory=dict)  # Attributes after
    moved: bool = False  # Position changed
@dataclass
class DirectiveChange:
    """A change to a SPICE directive (e.g. ".tran 1m")."""

    change_type: str  # "added", "removed", "modified"
    old_text: str | None = None  # Directive text before (None when added)
    new_text: str | None = None  # Directive text after (None when removed)
@dataclass
class SchematicDiff:
    """Complete diff between two schematics.

    Aggregates component, directive, net, and wire changes, and can render
    itself as a human-readable summary or a JSON-serializable dict.
    """

    component_changes: list[ComponentChange] = field(default_factory=list)
    directive_changes: list[DirectiveChange] = field(default_factory=list)
    nets_added: list[str] = field(default_factory=list)
    nets_removed: list[str] = field(default_factory=list)
    wires_added: int = 0  # Count only; individual wires are not itemized
    wires_removed: int = 0

    @property
    def has_changes(self) -> bool:
        """True when any kind of difference was recorded."""
        return bool(
            self.component_changes
            or self.directive_changes
            or self.nets_added
            or self.nets_removed
            or self.wires_added
            or self.wires_removed
        )

    def summary(self) -> str:
        """Human-readable summary of changes.

        Output order: modified components, added components, removed
        components, directive changes, net changes, wire counts.
        """
        if not self.has_changes:
            return "No changes detected."
        lines: list[str] = []
        modified = [c for c in self.component_changes if c.change_type == "modified"]
        added = [c for c in self.component_changes if c.change_type == "added"]
        removed = [c for c in self.component_changes if c.change_type == "removed"]

        if modified:
            lines.append(
                f"{len(modified)} component{'s' if len(modified) != 1 else ''} modified:"
            )
            for c in modified:
                parts: list[str] = []
                if c.old_value != c.new_value:
                    parts.append(f"{c.old_value} -> {c.new_value}")
                if c.moved:
                    parts.append("moved")
                # Attribute changes beyond Value are summarized separately.
                attr_diff = _attr_diff_summary(c.old_attributes, c.new_attributes)
                if attr_diff:
                    parts.append(attr_diff)
                detail = ", ".join(parts) if parts else "attributes changed"
                lines.append(f"  {c.name}: {detail}")

        def _emit_group(label: str, comps: list[ComponentChange], use_new: bool) -> None:
            # A single change is emitted on one line; multiple changes get a
            # header followed by indented items.  This replaces the previous
            # append-then-insert construction, which duplicated the
            # formatting logic and was easy to get wrong.
            if not comps:
                return
            def _val(c: ComponentChange) -> str:
                value = c.new_value if use_new else c.old_value
                return f" = {value}" if value else ""
            if len(comps) == 1:
                c = comps[0]
                lines.append(f"1 component {label}: {c.name} ({c.symbol}){_val(c)}")
            else:
                lines.append(f"{len(comps)} components {label}:")
                for c in comps:
                    lines.append(f"  {c.name} ({c.symbol}){_val(c)}")

        _emit_group("added", added, use_new=True)
        _emit_group("removed", removed, use_new=False)

        # Directive changes: modified, then added, then removed.
        for d in [x for x in self.directive_changes if x.change_type == "modified"]:
            lines.append(f"1 directive changed: {d.old_text} -> {d.new_text}")
        for d in [x for x in self.directive_changes if x.change_type == "added"]:
            lines.append(f"1 directive added: {d.new_text}")
        for d in [x for x in self.directive_changes if x.change_type == "removed"]:
            lines.append(f"1 directive removed: {d.old_text}")

        # Net changes
        if self.nets_added:
            n = len(self.nets_added)
            lines.append(
                f"{n} net{'s' if n != 1 else ''} added: {', '.join(self.nets_added)}"
            )
        if self.nets_removed:
            n = len(self.nets_removed)
            lines.append(
                f"{n} net{'s' if n != 1 else ''} removed: {', '.join(self.nets_removed)}"
            )

        # Wire changes (counts only) share one line.
        wire_parts: list[str] = []
        if self.wires_added:
            wire_parts.append(
                f"{self.wires_added} wire{'s' if self.wires_added != 1 else ''} added"
            )
        if self.wires_removed:
            wire_parts.append(
                f"{self.wires_removed} wire{'s' if self.wires_removed != 1 else ''} removed"
            )
        if wire_parts:
            lines.append(", ".join(wire_parts))
        return "\n".join(lines)

    def to_dict(self) -> dict:
        """Convert to JSON-serializable dict (includes the text summary)."""
        return {
            "has_changes": self.has_changes,
            "component_changes": [
                {
                    "name": c.name,
                    "change_type": c.change_type,
                    "symbol": c.symbol,
                    "old_value": c.old_value,
                    "new_value": c.new_value,
                    "old_attributes": c.old_attributes,
                    "new_attributes": c.new_attributes,
                    "moved": c.moved,
                }
                for c in self.component_changes
            ],
            "directive_changes": [
                {
                    "change_type": d.change_type,
                    "old_text": d.old_text,
                    "new_text": d.new_text,
                }
                for d in self.directive_changes
            ],
            "nets_added": self.nets_added,
            "nets_removed": self.nets_removed,
            "wires_added": self.wires_added,
            "wires_removed": self.wires_removed,
            "summary": self.summary(),
        }
def _attr_diff_summary(old: dict[str, str], new: dict[str, str]) -> str:
"""Summarize attribute differences, excluding Value (handled separately)."""
changes: list[str] = []
all_keys = set(old) | set(new)
# Skip Value since it's reported on its own
all_keys.discard("Value")
all_keys.discard("Value2")
for key in sorted(all_keys):
old_val = old.get(key)
new_val = new.get(key)
if old_val != new_val:
if old_val is None:
changes.append(f"+{key}={new_val}")
elif new_val is None:
changes.append(f"-{key}={old_val}")
else:
changes.append(f"{key}: {old_val} -> {new_val}")
return "; ".join(changes)
def _normalize_directive(text: str) -> str:
"""Normalize whitespace in a SPICE directive for comparison."""
return " ".join(text.split())
def _wire_set(schematic: Schematic) -> set[tuple[int, int, int, int]]:
    """Convert wires to a set of coordinate tuples for comparison.

    Each wire is stored in a canonical form (smaller point first) so that
    reversed wires compare equal.
    """
    canonical: set[tuple[int, int, int, int]] = set()
    for wire in schematic.wires:
        a = (wire.x1, wire.y1)
        b = (wire.x2, wire.y2)
        # Order endpoints so a reversed wire yields the identical tuple.
        lo, hi = (a, b) if a <= b else (b, a)
        canonical.add(lo + hi)
    return canonical
def _component_map(schematic: Schematic) -> dict[str, Component]:
    """Build a map of component instance name -> Component."""
    mapping: dict[str, Component] = {}
    for comp in schematic.components:
        # Later duplicates overwrite earlier ones, matching dict-comprehension
        # semantics.
        mapping[comp.name] = comp
    return mapping
def _diff_components(
    schema_a: Schematic, schema_b: Schematic
) -> list[ComponentChange]:
    """Compare components between two schematics.

    Components are matched by instance name.  Output order: removals,
    additions, then modifications, each sorted by name.
    """
    before = _component_map(schema_a)
    after = _component_map(schema_b)
    changes: list[ComponentChange] = []

    # Present only in A: removed.
    for name in sorted(set(before) - set(after)):
        c = before[name]
        changes.append(
            ComponentChange(
                name=name,
                change_type="removed",
                symbol=c.symbol,
                old_value=c.value,
                old_attributes=dict(c.attributes),
            )
        )

    # Present only in B: added.
    for name in sorted(set(after) - set(before)):
        c = after[name]
        changes.append(
            ComponentChange(
                name=name,
                change_type="added",
                symbol=c.symbol,
                new_value=c.value,
                new_attributes=dict(c.attributes),
            )
        )

    # Present in both: detect value/attribute/position/orientation edits.
    for name in sorted(set(before) & set(after)):
        old = before[name]
        new = after[name]
        moved = (old.x, old.y) != (new.x, new.y)
        differs = (
            moved
            or old.value != new.value
            or old.attributes != new.attributes
            or old.rotation != new.rotation
            or old.mirror != new.mirror
        )
        if differs:
            changes.append(
                ComponentChange(
                    name=name,
                    change_type="modified",
                    symbol=old.symbol,
                    old_value=old.value,
                    new_value=new.value,
                    old_attributes=dict(old.attributes),
                    new_attributes=dict(new.attributes),
                    moved=moved,
                )
            )
    return changes
def _diff_directives(
    schema_a: Schematic, schema_b: Schematic
) -> list[DirectiveChange]:
    """Compare SPICE directives between two schematics.

    Directives present only in one schematic are reported as added or
    removed.  A removed and an added directive that share the same command
    keyword (first token, e.g. ".tran") are collapsed into one "modified"
    change instead of an add/remove pair.

    Returns:
        Unmatched removals first, then unmatched additions, then modified
        pairs (in removal order).
    """
    directives_a = [t.content for t in schema_a.texts if t.type == "spice"]
    directives_b = [t.content for t in schema_b.texts if t.type == "spice"]
    # Normalize whitespace for comparison; the dict maps normalized text
    # back to the original for display.
    norm_a = {_normalize_directive(d): d for d in directives_a}
    norm_b = {_normalize_directive(d): d for d in directives_b}
    removed_texts = [norm_a[k] for k in sorted(set(norm_a) - set(norm_b))]
    added_texts = [norm_b[k] for k in sorted(set(norm_b) - set(norm_a))]

    def _command(text: str | None) -> str:
        # First token of a directive, lowercased (e.g. ".tran", ".ac").
        return text.split()[0].lower() if text else ""

    # Pair each removed directive with the first still-unmatched added
    # directive sharing its command keyword.
    #
    # BUG FIX: pairs are now recorded explicitly (ri -> ai).  The previous
    # implementation only recorded membership sets and re-searched `added`
    # when rebuilding, so two removed directives with the same command both
    # resolved to the SAME first matched added entry -- duplicating one
    # new_text and dropping the other.
    pairs: dict[int, int] = {}
    matched_added: set[int] = set()
    for ri, old_text in enumerate(removed_texts):
        old_cmd = _command(old_text)
        if not old_cmd:
            continue
        for ai, new_text in enumerate(added_texts):
            if ai in matched_added:
                continue
            if _command(new_text) == old_cmd:
                pairs[ri] = ai
                matched_added.add(ai)
                break

    final_changes: list[DirectiveChange] = []
    for ri, old_text in enumerate(removed_texts):
        if ri not in pairs:
            final_changes.append(
                DirectiveChange(change_type="removed", old_text=old_text)
            )
    for ai, new_text in enumerate(added_texts):
        if ai not in matched_added:
            final_changes.append(
                DirectiveChange(change_type="added", new_text=new_text)
            )
    for ri in sorted(pairs):
        final_changes.append(
            DirectiveChange(
                change_type="modified",
                old_text=removed_texts[ri],
                new_text=added_texts[pairs[ri]],
            )
        )
    return final_changes
def _diff_nets(
    schema_a: Schematic, schema_b: Schematic
) -> tuple[list[str], list[str]]:
    """Compare net flags between two schematics.

    Returns:
        (nets_added, nets_removed), each sorted alphabetically.
    """
    before = {flag.name for flag in schema_a.flags}
    after = {flag.name for flag in schema_b.flags}
    return sorted(after - before), sorted(before - after)
def _diff_wires(
    schema_a: Schematic, schema_b: Schematic
) -> tuple[int, int]:
    """Compare wires between two schematics using set operations.

    Returns:
        (wires_added, wires_removed) as counts; wires are compared in the
        canonical endpoint order produced by _wire_set.
    """
    before = _wire_set(schema_a)
    after = _wire_set(schema_b)
    return len(after - before), len(before - after)
def diff_schematics(
    path_a: Path | str,
    path_b: Path | str,
) -> SchematicDiff:
    """Compare two schematics and return differences.

    Args:
        path_a: Path to the "before" schematic
        path_b: Path to the "after" schematic

    Returns:
        SchematicDiff with all changes
    """
    schema_a = parse_schematic(path_a)
    schema_b = parse_schematic(path_b)
    # Each helper diffs one independent aspect of the two schematics.
    component_changes = _diff_components(schema_a, schema_b)
    directive_changes = _diff_directives(schema_a, schema_b)
    nets_added, nets_removed = _diff_nets(schema_a, schema_b)
    wires_added, wires_removed = _diff_wires(schema_a, schema_b)
    return SchematicDiff(
        component_changes=component_changes,
        directive_changes=directive_changes,
        nets_added=nets_added,
        nets_removed=nets_removed,
        wires_added=wires_added,
        wires_removed=wires_removed,
    )

439
src/mcp_ltspice/drc.py Normal file
View File

@ -0,0 +1,439 @@
"""Design Rule Checks for LTspice schematics."""
from collections import defaultdict
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from .schematic import Schematic, parse_schematic
class Severity(Enum):
    """Severity level of a DRC violation."""

    ERROR = "error"  # Will likely cause simulation failure
    WARNING = "warning"  # May cause unexpected results
    INFO = "info"  # Suggestion for improvement
@dataclass
class DRCViolation:
    """A single design rule violation."""

    rule: str  # Short rule identifier, e.g. "NO_GROUND"
    severity: Severity
    message: str  # Human-readable description
    component: str | None = None  # Related component name
    location: tuple[int, int] | None = None  # (x, y) if applicable
@dataclass
class DRCResult:
    """Results of a design rule check."""

    violations: list[DRCViolation] = field(default_factory=list)
    checks_run: int = 0  # Number of individual rule checks executed

    @property
    def errors(self) -> list[DRCViolation]:
        """Violations severe enough to likely break simulation."""
        return [v for v in self.violations if v.severity == Severity.ERROR]

    @property
    def warnings(self) -> list[DRCViolation]:
        """Violations that may cause unexpected results."""
        return [v for v in self.violations if v.severity == Severity.WARNING]

    @property
    def passed(self) -> bool:
        """True when no error-level violations were found."""
        return not self.errors

    def summary(self) -> str:
        """Human-readable summary."""
        n_err = len(self.errors)
        n_warn = len(self.warnings)
        # INFO count is derived rather than filtered directly.
        n_info = len(self.violations) - n_err - n_warn
        if not self.violations:
            return f"DRC passed: {self.checks_run} checks run, no violations found."
        pieces: list[str] = []
        if n_err:
            pieces.append(f"{n_err} error{'s' if n_err != 1 else ''}")
        if n_warn:
            pieces.append(f"{n_warn} warning{'s' if n_warn != 1 else ''}")
        if n_info:
            pieces.append(f"{n_info} info")
        verdict = "FAILED" if n_err else "passed with warnings"
        return (
            f"DRC {verdict}: {self.checks_run} checks run, "
            f"{', '.join(pieces)}."
        )

    def to_dict(self) -> dict:
        """Convert to JSON-serializable dict."""
        return {
            "passed": self.passed,
            "checks_run": self.checks_run,
            "summary": self.summary(),
            "error_count": len(self.errors),
            "warning_count": len(self.warnings),
            "violations": [
                {
                    "rule": v.rule,
                    "severity": v.severity.value,
                    "message": v.message,
                    "component": v.component,
                    "location": list(v.location) if v.location else None,
                }
                for v in self.violations
            ],
        }
def run_drc(schematic_path: Path | str) -> DRCResult:
    """Run all design rule checks on a schematic.

    Args:
        schematic_path: Path to .asc file

    Returns:
        DRCResult with all violations found
    """
    sch = parse_schematic(schematic_path)
    result = DRCResult()
    # Run every check in a fixed order; the order determines the order in
    # which violations are reported.
    checks = (
        _check_ground,
        _check_floating_nodes,
        _check_simulation_directive,
        _check_voltage_source_loops,
        _check_component_values,
        _check_duplicate_names,
        _check_unconnected_components,
    )
    for check in checks:
        check(sch, result)
    return result
def _check_ground(sch: Schematic, result: DRCResult) -> None:
    """Check that at least one ground (node '0') exists."""
    result.checks_run += 1
    # Ground in LTspice is the net flag named "0".
    for flag in sch.flags:
        if flag.name == "0":
            return
    result.violations.append(
        DRCViolation(
            rule="NO_GROUND",
            severity=Severity.ERROR,
            message=(
                "No ground node found. Every circuit needs at least "
                "one ground (0) connection."
            ),
        )
    )
def _check_floating_nodes(sch: Schematic, result: DRCResult) -> None:
    """Check for nodes with only one wire connection (likely floating).

    A node is a unique (x, y) point where wires meet or flags are placed.
    A point touched by exactly one wire endpoint, with no flag and no
    component origin on it, is probably floating.  This is approximate --
    without component pin locations only obvious cases can be flagged.
    """
    result.checks_run += 1
    # Tally how many wire endpoints land on each point.
    endpoint_hits: dict[tuple[int, int], int] = defaultdict(int)
    for wire in sch.wires:
        for pt in ((wire.x1, wire.y1), (wire.x2, wire.y2)):
            endpoint_hits[pt] += 1
    # Points anchored by a flag or a component origin are not floating.
    anchored = {(f.x, f.y) for f in sch.flags} | {(c.x, c.y) for c in sch.components}
    for point, hits in endpoint_hits.items():
        if hits != 1 or point in anchored:
            continue
        result.violations.append(
            DRCViolation(
                rule="FLOATING_NODE",
                severity=Severity.WARNING,
                message=(
                    f"Possible floating node at ({point[0]}, {point[1]}). "
                    f"Wire endpoint has only one connection."
                ),
                location=point,
            )
        )
def _check_simulation_directive(sch: Schematic, result: DRCResult) -> None:
    """Check that at least one simulation directive exists."""
    result.checks_run += 1
    # str.startswith accepts a tuple of prefixes, replacing the nested any().
    sim_prefixes = (".tran", ".ac", ".dc", ".op", ".noise", ".tf")
    for directive in sch.get_spice_directives():
        if directive.lower().startswith(sim_prefixes):
            return
    result.violations.append(
        DRCViolation(
            rule="NO_SIM_DIRECTIVE",
            severity=Severity.ERROR,
            message=(
                "No simulation directive found. "
                "Add .tran, .ac, .dc, .op, etc."
            ),
        )
    )
def _check_voltage_source_loops(sch: Schematic, result: DRCResult) -> None:
    """Check for voltage sources connected in parallel (short circuit).

    This is a simplified check -- look for voltage sources that share both
    pin nodes via wire connectivity. Two voltage sources whose positions
    connect to the same pair of nets would create a loop or conflict.
    """
    result.checks_run += 1
    # Build a union-find structure from wire connectivity so we can
    # determine which points belong to the same electrical net.
    parent: dict[tuple[int, int], tuple[int, int]] = {}

    def find(p: tuple[int, int]) -> tuple[int, int]:
        # Union-find "find" with path halving; unseen points become their
        # own root on first lookup.
        if p not in parent:
            parent[p] = p
        while parent[p] != p:
            parent[p] = parent[parent[p]]
            p = parent[p]
        return p

    def union(a: tuple[int, int], b: tuple[int, int]) -> None:
        # Merge the nets containing points a and b.
        ra, rb = find(a), find(b)
        if ra != rb:
            parent[ra] = rb

    # Each wire merges its two endpoints into the same net
    for wire in sch.wires:
        union((wire.x1, wire.y1), (wire.x2, wire.y2))
    # Also merge flag locations (named nets connect distant points with
    # the same name)
    flag_groups: dict[str, list[tuple[int, int]]] = defaultdict(list)
    for flag in sch.flags:
        flag_groups[flag.name].append((flag.x, flag.y))
    for pts in flag_groups.values():
        for pt in pts[1:]:
            union(pts[0], pt)
    # Find voltage sources and approximate their pin positions.
    # LTspice voltage sources have pins at the component origin and
    # offset along the component axis. Standard pin spacing is 64 units
    # vertically for a non-rotated voltage source (pin+ at top, pin- at
    # bottom relative to the symbol origin).
    voltage_sources = [
        c
        for c in sch.components
        if "voltage" in c.symbol.lower()
    ]
    if len(voltage_sources) < 2:
        return
    # Estimate pin positions per voltage source based on rotation.
    # Default (R0): positive pin at (x, y-16), negative at (x, y+16)
    # We use a coarse offset; the exact value depends on the symbol but
    # 16 is a common half-pin-spacing in LTspice grid units.
    PIN_OFFSET = 16

    def _pin_positions(comp):
        """Return approximate (positive_pin, negative_pin) coordinates."""
        x, y = comp.x, comp.y
        rot = comp.rotation
        if rot == 0:
            return (x, y - PIN_OFFSET), (x, y + PIN_OFFSET)
        elif rot == 90:
            return (x + PIN_OFFSET, y), (x - PIN_OFFSET, y)
        elif rot == 180:
            return (x, y + PIN_OFFSET), (x, y - PIN_OFFSET)
        elif rot == 270:
            return (x - PIN_OFFSET, y), (x + PIN_OFFSET, y)
        # Unknown rotation value: fall back to the R0 orientation.
        return (x, y - PIN_OFFSET), (x, y + PIN_OFFSET)

    def _nearest_net(pin: tuple[int, int]) -> tuple[int, int]:
        """Find the nearest wire/flag point to a pin and return its net root.

        If the pin is directly on a known point, use it. Otherwise search
        within a small radius for the closest connected point.
        """
        if pin in parent:
            return find(pin)
        # Search nearby points (LTspice grid snap is typically 16 units)
        best = None
        best_dist = float("inf")
        for pt in parent:
            dx = pin[0] - pt[0]
            dy = pin[1] - pt[1]
            dist = dx * dx + dy * dy
            if dist < best_dist:
                best_dist = dist
                best = pt
        # Accept only points within 32 units (two grid steps), squared.
        if best is not None and best_dist <= 32 * 32:
            return find(best)
        return pin  # isolated -- return pin itself as its own net

    # For each pair of voltage sources, check if they share both nets
    for i in range(len(voltage_sources)):
        pin_a_pos, pin_a_neg = _pin_positions(voltage_sources[i])
        net_a_pos = _nearest_net(pin_a_pos)
        net_a_neg = _nearest_net(pin_a_neg)
        for j in range(i + 1, len(voltage_sources)):
            pin_b_pos, pin_b_neg = _pin_positions(voltage_sources[j])
            net_b_pos = _nearest_net(pin_b_pos)
            net_b_neg = _nearest_net(pin_b_neg)
            # Parallel if both nets match (in either polarity)
            parallel = (net_a_pos == net_b_pos and net_a_neg == net_b_neg) or (
                net_a_pos == net_b_neg and net_a_neg == net_b_pos
            )
            if parallel:
                name_i = voltage_sources[i].name
                name_j = voltage_sources[j].name
                result.violations.append(
                    DRCViolation(
                        rule="VSOURCE_LOOP",
                        severity=Severity.ERROR,
                        message=(
                            f"Voltage sources '{name_i}' and '{name_j}' "
                            f"appear to be connected in parallel, which "
                            f"creates a short circuit / voltage conflict."
                        ),
                        component=name_i,
                    )
                )
def _check_component_values(sch: Schematic, result: DRCResult) -> None:
    """Check that components have values where expected.

    Resistors, capacitors, inductors, and voltage/current sources should
    have values.  Parameter expressions such as "{R1}" are non-empty and
    therefore count as valid values.  (The original code had a dead
    `elif ... pass` branch for that case -- removed, since any non-empty
    value already passes.)
    """
    result.checks_run += 1
    # Map symbol substrings to human-readable type names
    value_required = {
        "res": "Resistor",
        "cap": "Capacitor",
        "ind": "Inductor",
        "voltage": "Voltage source",
        "current": "Current source",
    }
    for comp in sch.components:
        symbol_lower = comp.symbol.lower()
        # First matching substring wins, mirroring the original dict order.
        matched_type = next(
            (
                label
                for pattern, label in value_required.items()
                if pattern in symbol_lower
            ),
            None,
        )
        if matched_type is None:
            continue
        if not comp.value or not comp.value.strip():
            result.violations.append(
                DRCViolation(
                    rule="MISSING_VALUE",
                    severity=Severity.WARNING,
                    message=(
                        f"{matched_type} '{comp.name}' has no value set."
                    ),
                    component=comp.name,
                    location=(comp.x, comp.y),
                )
            )
def _check_duplicate_names(sch: Schematic, result: DRCResult) -> None:
    """Check for duplicate component instance names.

    Each occurrence after the first is reported as its own violation.
    """
    result.checks_run += 1
    # A set is the idiomatic membership structure here; the original used a
    # dict[str, bool] with always-True values.
    seen: set[str] = set()
    for comp in sch.components:
        if not comp.name:
            continue
        if comp.name in seen:
            result.violations.append(
                DRCViolation(
                    rule="DUPLICATE_NAME",
                    severity=Severity.ERROR,
                    message=f"Duplicate component name '{comp.name}'.",
                    component=comp.name,
                    location=(comp.x, comp.y),
                )
            )
        else:
            seen.add(comp.name)
def _check_unconnected_components(sch: Schematic, result: DRCResult) -> None:
    """Check for components that don't seem to be connected to anything.

    A component at (x, y) should have wire endpoints near its pins.  This
    is approximate without knowing exact pin positions: a component is
    flagged when no wire endpoint or flag lies within PROXIMITY (one
    LTspice grid step, 16 units) of its origin on both axes (a square
    Chebyshev window).

    NOTE: the original comment claimed a bounding-box scan "rather than
    iterating all points", but the code iterates every connection point per
    component (O(components x points)).  Schematics are small enough that
    the linear scan is fine; the comment is corrected here.
    """
    result.checks_run += 1
    PROXIMITY = 16  # LTspice grid spacing

    # All points where an electrical connection can exist: wire endpoints
    # plus net-flag positions.
    connection_points: set[tuple[int, int]] = set()
    for wire in sch.wires:
        connection_points.add((wire.x1, wire.y1))
        connection_points.add((wire.x2, wire.y2))
    for flag in sch.flags:
        connection_points.add((flag.x, flag.y))

    for comp in sch.components:
        if not comp.name:
            continue
        connected = any(
            abs(pt[0] - comp.x) <= PROXIMITY and abs(pt[1] - comp.y) <= PROXIMITY
            for pt in connection_points
        )
        if not connected:
            result.violations.append(
                DRCViolation(
                    rule="UNCONNECTED_COMPONENT",
                    severity=Severity.WARNING,
                    message=(
                        f"Component '{comp.name}' ({comp.symbol}) at "
                        f"({comp.x}, {comp.y}) has no nearby wire "
                        f"connections."
                    ),
                    component=comp.name,
                    location=(comp.x, comp.y),
                )
            )

View File

@ -0,0 +1,171 @@
"""Parse LTspice simulation log files."""
from dataclasses import dataclass, field
from pathlib import Path
import re
@dataclass
class Measurement:
"""A .meas result from the log."""
name: str
value: float | None # None if FAILED
failed: bool = False
@dataclass
class SimulationLog:
"""Parsed contents of an LTspice .log file."""
measurements: list[Measurement] = field(default_factory=list)
errors: list[str] = field(default_factory=list)
warnings: list[str] = field(default_factory=list)
elapsed_time: float | None = None
n_equations: int | None = None
n_steps: int | None = None
raw_text: str = ""
def get_measurement(self, name: str) -> Measurement | None:
"""Get a measurement by name (case-insensitive)."""
name_lower = name.lower()
for m in self.measurements:
if m.name.lower() == name_lower:
return m
return None
def get_all_measurements(self) -> dict[str, float | None]:
"""Return dict of measurement name -> value."""
return {m.name: m.value for m in self.measurements}
# Patterns for measurement results.
# LTspice emits results in several formats depending on version and OS:
# name=1.23456e-006 (no spaces around '=')
# name = 1.23456e-006 (spaces around '=')
# name: 1.23456e-006 (colon separator)
# name: FAILED (measurement could not be computed)
# name=FAILED
_MEAS_VALUE_RE = re.compile(
r"^(?P<name>\S+?)\s*[=:]\s*(?P<value>[+-]?\d+(?:\.\d+)?(?:e[+-]?\d+)?)\s*$",
re.IGNORECASE,
)
_MEAS_FAILED_RE = re.compile(
r"^(?P<name>\S+?)\s*[=:]?\s*FAILED\s*$",
re.IGNORECASE,
)
# Simulation statistics patterns.
_ELAPSED_TIME_RE = re.compile(
r"Total elapsed time:\s*(?P<seconds>[+-]?\d+(?:\.\d+)?)\s*seconds?",
re.IGNORECASE,
)
_N_EQUATIONS_RE = re.compile(
r"N-of-equations:\s*(?P<n>\d+)",
re.IGNORECASE,
)
_N_STEPS_RE = re.compile(
r"N-of-steps:\s*(?P<n>\d+)",
re.IGNORECASE,
)
# Lines starting with ".meas" are directive echoes, not results -- skip them.
_MEAS_DIRECTIVE_RE = re.compile(r"^\s*\.meas\s", re.IGNORECASE)
def _is_error_line(line: str) -> bool:
"""Return True if the line reports an error."""
return bool(re.search(r"\bError\b", line, re.IGNORECASE))
def _is_warning_line(line: str) -> bool:
"""Return True if the line reports a warning."""
return bool(re.search(r"\bWarning\b", line, re.IGNORECASE))
def parse_log(path: Path | str) -> SimulationLog:
"""Parse an LTspice .log file.
Reads the file at *path* and extracts measurement results, errors,
warnings, and basic simulation statistics. The parser is intentionally
lenient -- unknown lines are silently ignored so that it works across
different LTspice versions and simulation types (transient, AC, DC, etc.).
"""
path = Path(path)
# LTspice log files may be encoded as UTF-8 or Latin-1 depending on the
# platform. Try UTF-8 first, fall back to Latin-1 which never raises.
for encoding in ("utf-8", "latin-1"):
try:
raw_text = path.read_text(encoding=encoding)
break
except UnicodeDecodeError:
continue
else:
raw_text = path.read_text(encoding="latin-1", errors="replace")
log = SimulationLog(raw_text=raw_text)
for line in raw_text.splitlines():
stripped = line.strip()
if not stripped:
continue
# Skip echoed .meas directives -- they are not results.
if _MEAS_DIRECTIVE_RE.match(stripped):
continue
# Errors and warnings.
if _is_error_line(stripped):
log.errors.append(stripped)
if _is_warning_line(stripped):
log.warnings.append(stripped)
# Measurement: failed.
m = _MEAS_FAILED_RE.match(stripped)
if m:
log.measurements.append(
Measurement(name=m.group("name"), value=None, failed=True)
)
continue
# Measurement: numeric value.
m = _MEAS_VALUE_RE.match(stripped)
if m:
try:
value = float(m.group("value"))
except ValueError:
value = None
log.measurements.append(
Measurement(name=m.group("name"), value=value, failed=(value is None))
)
continue
# Elapsed time.
m = _ELAPSED_TIME_RE.search(stripped)
if m:
try:
log.elapsed_time = float(m.group("seconds"))
except ValueError:
pass
continue
# Number of equations.
m = _N_EQUATIONS_RE.search(stripped)
if m:
try:
log.n_equations = int(m.group("n"))
except ValueError:
pass
continue
# Number of steps / iterations.
m = _N_STEPS_RE.search(stripped)
if m:
try:
log.n_steps = int(m.group("n"))
except ValueError:
pass
continue
return log

355
src/mcp_ltspice/models.py Normal file
View File

@ -0,0 +1,355 @@
"""Search and parse LTspice SPICE model libraries."""
from dataclasses import dataclass, field
from pathlib import Path
import re
from .config import LTSPICE_LIB
# Known SPICE model types and their categories
# NOTE(review): presumably used to categorize search results as discrete
# devices -- the consuming code is outside this view; confirm.
_DISCRETE_TYPES = frozenset({
    "NPN", "PNP",  # BJTs
    "NMOS", "PMOS", "VDMOS",  # MOSFETs
    "D",  # Diodes
    "NJF", "PJF",  # JFETs
})

# Module-level cache
# Holds (models, subcircuits) once the library tree has been scanned, so
# repeated searches avoid re-reading every file.  (assumes it is populated
# by _scan_all_libraries(), which is defined outside this view -- verify.)
_cache: tuple[list["SpiceModel"], list["SpiceSubcircuit"]] | None = None
@dataclass
class SpiceModel:
    """A .model definition."""

    name: str  # e.g., "2N2222"
    type: str  # e.g., "NPN", "D", "NMOS", "PMOS", "PNP"
    parameters: dict[str, str] = field(default_factory=dict)  # param name -> raw value text
    source_file: str = ""  # Library file the model was found in
@dataclass
class SpiceSubcircuit:
    """A .subckt definition."""

    name: str  # e.g., "LT1001"
    pins: list[str] = field(default_factory=list)  # Node names from the .subckt line
    pin_names: list[str] = field(default_factory=list)  # From comments
    description: str = ""  # Free-text description, if one was found
    source_file: str = ""  # Library file the subcircuit was found in
    n_components: int = 0  # Count of component lines inside the subcircuit
def search_models(
    search: str | None = None,
    model_type: str | None = None,
    limit: int = 50,
) -> list[SpiceModel]:
    """Search for .model definitions in the library.

    Args:
        search: Case-insensitive substring to match against model names
        model_type: Filter by type: NPN, PNP, NMOS, PMOS, D, etc.
        limit: Maximum results

    Returns:
        List of matching SpiceModel objects
    """
    all_models, _ = _scan_all_libraries()
    want_name = search.upper() if search else None
    want_type = model_type.upper() if model_type else None
    hits: list[SpiceModel] = []
    for model in all_models:
        type_ok = want_type is None or model.type.upper() == want_type
        name_ok = want_name is None or want_name in model.name.upper()
        if type_ok and name_ok:
            hits.append(model)
            # Stop as soon as the requested number of results is reached.
            if len(hits) >= limit:
                break
    return hits
def search_subcircuits(
    search: str | None = None,
    limit: int = 50,
) -> list[SpiceSubcircuit]:
    """Search for .subckt definitions in the library.

    Args:
        search: Case-insensitive substring to match against subcircuit names
        limit: Maximum results

    Returns:
        List of matching SpiceSubcircuit objects
    """
    _, all_subcircuits = _scan_all_libraries()
    want_name = search.upper() if search else None
    hits: list[SpiceSubcircuit] = []
    for subckt in all_subcircuits:
        if want_name is not None and want_name not in subckt.name.upper():
            continue
        hits.append(subckt)
        # Stop as soon as the requested number of results is reached.
        if len(hits) >= limit:
            break
    return hits
def get_model_details(name: str) -> SpiceModel | SpiceSubcircuit | None:
    """Get detailed information about a specific model or subcircuit.

    Searches all library files for an exact, case-insensitive name match.
    Models are searched before subcircuits.
    """
    all_models, all_subcircuits = _scan_all_libraries()
    target = name.upper()
    for candidate in (*all_models, *all_subcircuits):
        if candidate.name.upper() == target:
            return candidate
    return None
def _read_file_text(path: Path) -> str:
"""Read a library file, handling both UTF-16-LE and ASCII/Latin-1 encodings.
LTspice stores some files (especially .bjt, .mos, .jft, etc.) as
UTF-16-LE without a BOM. Others are plain ASCII or Latin-1.
Binary/encrypted .sub files will raise on decode; callers handle that.
"""
raw = path.read_bytes()
if not raw:
return ""
# Detect UTF-16-LE: check if every other byte (odd positions) is 0x00
# for the first several bytes. This is a strong indicator of UTF-16-LE
# ASCII text without a BOM.
if len(raw) >= 20:
sample = raw[:40]
null_positions = sum(1 for i in range(1, len(sample), 2) if sample[i] == 0)
total_pairs = len(sample) // 2
if total_pairs > 0 and null_positions / total_pairs > 0.7:
return raw.decode("utf-16-le", errors="replace")
# Fall back to latin-1 which never fails (maps bytes 1:1 to codepoints)
return raw.decode("latin-1")
# Regex patterns compiled once
# Matches a full ".model" line after continuation-joining, capturing the
# name, type, parenthesized params, and any trailing params.
_MODEL_RE = re.compile(
    r"^\s*\.model\s+"  # .model keyword
    r"(\S+)\s+"  # model name
    r"(?:ako:\S+\s+)?"  # optional ako:reference
    r"(\w+)"  # type (NPN, D, VDMOS, etc.)
    r"(?:\s*\(([^)]*)\))?"  # optional (params)
    r"(.*)",  # trailing params outside parens
    re.IGNORECASE,
)
# Matches a ".subckt" header line, capturing the name and pin list.
_SUBCKT_RE = re.compile(
    r"^\s*\.subckt\s+"
    r"(\S+)"  # subcircuit name
    r"((?:\s+\S+)*)",  # pins (space-separated)
    re.IGNORECASE,
)
# Matches the ".ends" terminator of a subcircuit definition.
_ENDS_RE = re.compile(r"^\s*\.ends\b", re.IGNORECASE)
# Matches comment lines of the form "* Pin <n>: <description>".
_PIN_COMMENT_RE = re.compile(
    r"^\s*\*\s*[Pp]in\s+\S+\s*[:=]?\s*(.*)",
)
def _parse_params(param_str: str) -> dict[str, str]:
"""Parse SPICE parameter string into a dict.
Handles formats like: IS=14.34f BF=200 NF=1 VAF=74.03
"""
params: dict[str, str] = {}
if not param_str:
return params
for match in re.finditer(r"(\w+)\s*=\s*(\S+)", param_str):
params[match.group(1)] = match.group(2)
return params
def _scan_lib_file(path: Path) -> tuple[list[SpiceModel], list[SpiceSubcircuit]]:
    """Scan a single library file for model and subcircuit definitions.

    Handles multi-line continuation (lines starting with +).
    Silently skips binary/encrypted files that can't be decoded.

    Args:
        path: Library file to scan (.lib/.sub/.mod/.bjt/...).

    Returns:
        (models, subcircuits) found in the file; both empty on decode
        failure or empty file.
    """
    models: list[SpiceModel] = []
    subcircuits: list[SpiceSubcircuit] = []
    try:
        text = _read_file_text(path)
    except Exception:
        # Binary or encrypted library: skip silently rather than abort the scan.
        return models, subcircuits
    if not text:
        return models, subcircuits
    # Join continuation lines (+ at start of line continues previous line)
    # Work through lines, merging continuations
    raw_lines = text.splitlines()
    lines: list[str] = []
    for line in raw_lines:
        stripped = line.strip()
        if stripped.startswith("+") and lines:
            # Continuation: append to previous line (strip the +)
            lines[-1] = lines[-1] + " " + stripped[1:].strip()
        else:
            lines.append(line)
    # Report paths relative to the LTspice lib root when possible.
    source = str(path.relative_to(LTSPICE_LIB)) if _is_under(path, LTSPICE_LIB) else path.name
    # State tracking for subcircuit parsing
    in_subckt = False  # True while between .subckt and its .ends
    current_subckt: SpiceSubcircuit | None = None
    component_count = 0  # component lines seen inside the current subckt
    pin_comments: list[str] = []  # "* Pin ..." comments inside the current subckt
    pre_subckt_comments: list[str] = []  # comments preceding a .subckt (description candidates)
    for line in lines:
        stripped = line.strip()
        # Track comments that might describe pins (before or after .subckt)
        if stripped.startswith("*"):
            if in_subckt:
                pin_match = _PIN_COMMENT_RE.match(stripped)
                if pin_match and current_subckt is not None:
                    pin_comments.append(pin_match.group(1).strip())
            else:
                pre_subckt_comments.append(stripped)
            continue
        if not stripped:
            if not in_subckt:
                # Blank line outside a subckt: earlier comments no longer
                # describe whatever definition comes next.
                pre_subckt_comments.clear()
            continue
        # Check for .model
        model_match = _MODEL_RE.match(stripped)
        if model_match:
            name = model_match.group(1)
            mtype = model_match.group(2).upper()
            # Params may appear inside parens (group 3) and/or after them (group 4).
            param_str = (model_match.group(3) or "") + " " + (model_match.group(4) or "")
            params = _parse_params(param_str)
            models.append(SpiceModel(
                name=name,
                type=mtype,
                parameters=params,
                source_file=source,
            ))
            continue
        # Check for .subckt (nested subcircuit definitions are not tracked)
        subckt_match = _SUBCKT_RE.match(stripped)
        if subckt_match and not in_subckt:
            name = subckt_match.group(1)
            pin_str = subckt_match.group(2).strip()
            pins = pin_str.split() if pin_str else []
            # Extract description from preceding comments
            description = ""
            for comment in pre_subckt_comments:
                cleaned = comment.lstrip("* ").strip()
                # Skip copyright banners and trivially short comment lines.
                if cleaned and not cleaned.startswith("Copyright") and len(cleaned) > 3:
                    description = cleaned
                    break
            current_subckt = SpiceSubcircuit(
                name=name,
                pins=pins,
                description=description,
                source_file=source,
            )
            in_subckt = True
            component_count = 0
            pin_comments.clear()
            pre_subckt_comments.clear()
            continue
        # Check for .ends -- finalize and emit the current subcircuit
        if _ENDS_RE.match(stripped):
            if current_subckt is not None:
                current_subckt.n_components = component_count
                current_subckt.pin_names = pin_comments[:]
                subcircuits.append(current_subckt)
            in_subckt = False
            current_subckt = None
            pin_comments.clear()
            continue
        # Count component lines inside a subcircuit
        if in_subckt and stripped and not stripped.startswith("."):
            # Component lines typically start with a letter (R, C, L, M, Q, D, etc.)
            if stripped[0].isalpha():
                component_count += 1
    return models, subcircuits
def _is_under(path: Path, parent: Path) -> bool:
"""Check if path is under parent directory."""
try:
path.relative_to(parent)
return True
except ValueError:
return False
def _collect_lib_files() -> list[Path]:
    """Collect all scannable library files from known directories.

    Looks in lib/cmp/ (flat) for component model files and lib/sub/
    (recursive, to include Contrib) for subcircuit files. Returns the
    matching files sorted by path.
    """
    wanted = {".lib", ".sub", ".mod", ".bjt", ".dio", ".mos", ".jft"}
    found: list[Path] = []
    cmp_dir = LTSPICE_LIB / "cmp"
    if cmp_dir.is_dir():
        found.extend(
            entry for entry in cmp_dir.iterdir()
            if entry.is_file() and entry.suffix.lower() in wanted
        )
    sub_dir = LTSPICE_LIB / "sub"
    if sub_dir.is_dir():
        found.extend(
            entry for entry in sub_dir.rglob("*")
            if entry.is_file() and entry.suffix.lower() in wanted
        )
    return sorted(found)
def _scan_all_libraries() -> tuple[list[SpiceModel], list[SpiceSubcircuit]]:
    """Scan every library file once; later calls return the cached result."""
    global _cache
    if _cache is None:
        models: list[SpiceModel] = []
        subcircuits: list[SpiceSubcircuit] = []
        for lib_path in _collect_lib_files():
            file_models, file_subs = _scan_lib_file(lib_path)
            models.extend(file_models)
            subcircuits.extend(file_subs)
        # Sort case-insensitively for a stable, predictable ordering.
        models.sort(key=lambda entry: entry.name.upper())
        subcircuits.sort(key=lambda entry: entry.name.upper())
        _cache = (models, subcircuits)
    return _cache

383
src/mcp_ltspice/netlist.py Normal file
View File

@ -0,0 +1,383 @@
"""Programmatic SPICE netlist generation for LTspice."""
from dataclasses import dataclass, field
from pathlib import Path
@dataclass
class NetlistComponent:
"""A component in the netlist."""
name: str # R1, C1, V1, M1, X1, etc.
nodes: list[str] # Connected node names
value: str # Value or model name
params: str = "" # Additional parameters
@dataclass
class Netlist:
"""A SPICE netlist that can be saved as a .cir file.
Supports a builder pattern -- all add_* methods return self for chaining:
netlist = (Netlist("My Circuit")
.add_resistor("R1", "in", "out", "10k")
.add_capacitor("C1", "out", "0", "100n")
.add_voltage_source("V1", "in", "0", ac="1")
.add_directive(".ac dec 100 1 1meg"))
"""
title: str = "LTspice Simulation"
components: list[NetlistComponent] = field(default_factory=list)
directives: list[str] = field(default_factory=list)
comments: list[str] = field(default_factory=list)
includes: list[str] = field(default_factory=list)
# -- Passive components ---------------------------------------------------
def add_resistor(
self, name: str, node_p: str, node_n: str, value: str
) -> "Netlist":
"""Add a resistor. Example: add_resistor('R1', 'in', 'out', '10k')"""
self.components.append(
NetlistComponent(name=name, nodes=[node_p, node_n], value=value)
)
return self
def add_capacitor(
self, name: str, node_p: str, node_n: str, value: str
) -> "Netlist":
"""Add a capacitor."""
self.components.append(
NetlistComponent(name=name, nodes=[node_p, node_n], value=value)
)
return self
def add_inductor(
self,
name: str,
node_p: str,
node_n: str,
value: str,
series_resistance: str | None = None,
) -> "Netlist":
"""Add an inductor with optional series resistance (Rser)."""
params = f"Rser={series_resistance}" if series_resistance else ""
self.components.append(
NetlistComponent(
name=name, nodes=[node_p, node_n], value=value, params=params
)
)
return self
# -- Sources --------------------------------------------------------------
def add_voltage_source(
self,
name: str,
node_p: str,
node_n: str,
dc: str | None = None,
ac: str | None = None,
pulse: tuple | None = None,
sin: tuple | None = None,
) -> "Netlist":
"""Add a voltage source.
Args:
name: Source name (V1, V2, etc.)
node_p: Positive node
node_n: Negative node
dc: DC value (e.g., "5")
ac: AC magnitude (e.g., "1")
pulse: (Vinitial, Von, Tdelay, Trise, Tfall, Ton, Tperiod)
sin: (Voffset, Vamp, Freq, Td, Theta, Phi)
"""
value = self._build_source_value(dc=dc, ac=ac, pulse=pulse, sin=sin)
self.components.append(
NetlistComponent(name=name, nodes=[node_p, node_n], value=value)
)
return self
def add_current_source(
self,
name: str,
node_p: str,
node_n: str,
dc: str | None = None,
ac: str | None = None,
) -> "Netlist":
"""Add a current source."""
value = self._build_source_value(dc=dc, ac=ac)
self.components.append(
NetlistComponent(name=name, nodes=[node_p, node_n], value=value)
)
return self
# -- Semiconductors -------------------------------------------------------
def add_diode(
self, name: str, anode: str, cathode: str, model: str
) -> "Netlist":
"""Add a diode. Example: add_diode('D1', 'a', 'k', '1N4148')"""
self.components.append(
NetlistComponent(name=name, nodes=[anode, cathode], value=model)
)
return self
def add_mosfet(
self,
name: str,
drain: str,
gate: str,
source: str,
body: str,
model: str,
w: str | None = None,
l: str | None = None,
) -> "Netlist":
"""Add a MOSFET."""
params_parts: list[str] = []
if w:
params_parts.append(f"W={w}")
if l:
params_parts.append(f"L={l}")
params = " ".join(params_parts)
self.components.append(
NetlistComponent(
name=name,
nodes=[drain, gate, source, body],
value=model,
params=params,
)
)
return self
def add_bjt(
self, name: str, collector: str, base: str, emitter: str, model: str
) -> "Netlist":
"""Add a BJT transistor."""
self.components.append(
NetlistComponent(
name=name, nodes=[collector, base, emitter], value=model
)
)
return self
# -- Subcircuits ----------------------------------------------------------
def add_opamp(
self,
name: str,
inp: str,
inn: str,
out: str,
vpos: str,
vneg: str,
model: str,
) -> "Netlist":
"""Add an op-amp subcircuit instance.
Pin order follows the LTspice convention:
X<name> <inp> <inn> <vpos> <vneg> <out> <model>
"""
self.components.append(
NetlistComponent(
name=name, nodes=[inp, inn, vpos, vneg, out], value=model
)
)
return self
def add_subcircuit(
self, name: str, nodes: list[str], model: str
) -> "Netlist":
"""Add a generic subcircuit instance."""
self.components.append(
NetlistComponent(name=name, nodes=list(nodes), value=model)
)
return self
# -- Generic component ----------------------------------------------------
def add_component(
self, name: str, nodes: list[str], value: str, params: str = ""
) -> "Netlist":
"""Add any component with explicit nodes."""
self.components.append(
NetlistComponent(
name=name, nodes=list(nodes), value=value, params=params
)
)
return self
# -- Directives -----------------------------------------------------------
def add_directive(self, directive: str) -> "Netlist":
"""Add a SPICE directive (e.g., '.tran 10m', '.ac dec 100 1 1meg')."""
self.directives.append(directive)
return self
def add_meas(self, analysis: str, name: str, expression: str) -> "Netlist":
"""Add a .meas directive.
Example:
add_meas('tran', 'rise_time',
'TRIG V(out) VAL=0.1 RISE=1 TARG V(out) VAL=0.9 RISE=1')
"""
self.directives.append(f".meas {analysis} {name} {expression}")
return self
def add_param(self, name: str, value: str) -> "Netlist":
"""Add a .param directive."""
self.directives.append(f".param {name}={value}")
return self
def add_include(self, path: str) -> "Netlist":
"""Add a .include directive for library files."""
self.includes.append(f".include {path}")
return self
def add_lib(self, path: str) -> "Netlist":
"""Add a .lib directive."""
self.includes.append(f".lib {path}")
return self
def add_comment(self, text: str) -> "Netlist":
"""Add a comment line."""
self.comments.append(text)
return self
# -- Rendering / I/O ------------------------------------------------------
def render(self) -> str:
"""Render the netlist to a SPICE string."""
lines = [f"* {self.title}"]
for comment in self.comments:
lines.append(f"* {comment}")
for inc in self.includes:
lines.append(inc) # Already formatted as .include or .lib
lines.append("") # blank separator
for comp in self.components:
line = f"{comp.name} {' '.join(comp.nodes)} {comp.value}"
if comp.params:
line += f" {comp.params}"
lines.append(line)
lines.append("")
for directive in self.directives:
lines.append(directive)
lines.append(".backanno")
lines.append(".end")
return "\n".join(lines) + "\n"
def save(self, path: Path | str) -> Path:
"""Save netlist to a .cir file."""
path = Path(path)
path.write_text(self.render())
return path
# -- Internal helpers -----------------------------------------------------
@staticmethod
def _build_source_value(
dc: str | None = None,
ac: str | None = None,
pulse: tuple | None = None,
sin: tuple | None = None,
) -> str:
"""Build the value string for a voltage/current source."""
parts: list[str] = []
if dc is not None:
parts.append(dc)
if ac is not None:
parts.append(f"AC {ac}")
if pulse is not None:
params_str = " ".join(str(p) for p in pulse)
parts.append(f"PULSE({params_str})")
if sin is not None:
params_str = " ".join(str(p) for p in sin)
parts.append(f"SIN({params_str})")
return " ".join(parts) if parts else "0"
# ---------------------------------------------------------------------------
# Convenience functions for common circuit topologies
# ---------------------------------------------------------------------------
def voltage_divider(
    v_in: str = "5",
    r1: str = "10k",
    r2: str = "10k",
    sim_type: str = "op",
) -> Netlist:
    """Build a two-resistor voltage divider driven by a DC source.

    Args:
        v_in: Input voltage (DC).
        r1: Top resistor value.
        r2: Bottom resistor value.
        sim_type: Simulation directive -- "op" for operating point,
            or a full directive string like ".tran 10m".
    """
    circuit = Netlist("Voltage Divider")
    circuit.add_voltage_source("V1", "in", "0", dc=v_in)
    circuit.add_resistor("R1", "in", "out", r1)
    circuit.add_resistor("R2", "out", "0", r2)
    # Accept either a bare analysis name ("op") or a full dot-directive.
    circuit.add_directive(sim_type if sim_type.startswith(".") else f".{sim_type}")
    return circuit
def rc_lowpass(
    r: str = "1k",
    c: str = "100n",
    f_start: str = "1",
    f_stop: str = "1meg",
) -> Netlist:
    """Build an RC lowpass filter with an AC sweep.

    Driven by a 1V AC source; the filtered output is node 'out'.
    """
    circuit = Netlist("RC Lowpass Filter")
    circuit.add_voltage_source("V1", "in", "0", ac="1")
    circuit.add_resistor("R1", "in", "out", r)
    circuit.add_capacitor("C1", "out", "0", c)
    circuit.add_directive(f".ac dec 100 {f_start} {f_stop}")
    return circuit
def inverting_amplifier(
    r_in: str = "10k",
    r_f: str = "100k",
    opamp_model: str = "LT1001",
) -> Netlist:
    """Build an inverting op-amp stage with +/-15V supplies and AC sweep.

    Topology:
        V1 --[R_in]--> inv(-) --[R_f]--> out
        non-inv(+) --> GND
    """
    circuit = Netlist("Inverting Amplifier")
    circuit.add_comment(f"Gain = -{r_f}/{r_in}")
    circuit.add_lib(opamp_model)
    circuit.add_voltage_source("V1", "in", "0", ac="1")
    circuit.add_voltage_source("Vpos", "vdd", "0", dc="15")
    circuit.add_voltage_source("Vneg", "0", "vss", dc="15")
    circuit.add_resistor("Rin", "in", "inv", r_in)
    circuit.add_resistor("Rf", "inv", "out", r_f)
    # Non-inverting input grounded; inverting input is the summing node.
    circuit.add_opamp("X1", "0", "inv", "out", "vdd", "vss", opamp_model)
    circuit.add_directive(".ac dec 100 1 1meg")
    return circuit

View File

@ -1,13 +1,19 @@
"""FastMCP server for LTspice circuit simulation automation.
This server provides tools for:
- Running SPICE simulations
- Extracting waveform data
- Running SPICE simulations on schematics and netlists
- Extracting and analyzing waveform data
- Creating circuits programmatically
- Modifying schematic components
- Browsing component libraries and examples
- Browsing component libraries, models, and examples
- Design rule checks and circuit comparison
"""
import csv
import io
import json
import math
import tempfile
from pathlib import Path
import numpy as np
@ -19,14 +25,29 @@ from .config import (
LTSPICE_LIB,
validate_installation,
)
from .diff import diff_schematics as _diff_schematics
from .drc import run_drc as _run_drc
from .log_parser import parse_log
from .models import (
get_model_details as _get_model_details,
search_models as _search_models,
search_subcircuits as _search_subcircuits,
)
from .netlist import Netlist
from .raw_parser import parse_raw_file
from .runner import run_netlist, run_simulation
from .schematic import (
modify_component_value,
parse_schematic,
from .schematic import modify_component_value, parse_schematic
from .waveform_math import (
compute_bandwidth,
compute_fft,
compute_peak_to_peak,
compute_rise_time,
compute_rms,
compute_settling_time,
compute_thd,
)
# Initialize FastMCP server
mcp = FastMCP(
name="mcp-ltspice",
instructions="""
@ -35,9 +56,15 @@ mcp = FastMCP(
Use this server to:
- Run SPICE simulations on .asc schematics or .cir netlists
- Extract waveform data (voltages, currents) from simulation results
- Analyze signals: FFT, THD, RMS, bandwidth, settling time
- Create circuits from scratch using the netlist builder
- Modify component values in schematics programmatically
- Browse LTspice's component library (6500+ symbols)
- Search 2800+ SPICE models and subcircuits
- Access example circuits (4000+ examples)
- Run design rule checks before simulation
- Compare schematics to see what changed
- Export waveform data to CSV
LTspice runs via Wine on Linux. Simulations execute in batch mode
and results are parsed from binary .raw files.
@ -46,7 +73,7 @@ mcp = FastMCP(
# ============================================================================
# TOOLS
# SIMULATION TOOLS
# ============================================================================
@ -57,20 +84,13 @@ async def simulate(
) -> dict:
"""Run an LTspice simulation on a schematic file.
This runs LTspice in batch mode, which executes any simulation
directives (.tran, .ac, .dc, .op, etc.) in the schematic.
Executes any simulation directives (.tran, .ac, .dc, .op, etc.)
found in the schematic. Returns available signal names and
the path to the .raw file for waveform extraction.
Args:
schematic_path: Absolute path to .asc schematic file
timeout_seconds: Maximum time to wait for simulation (default 5 min)
Returns:
dict with:
- success: bool
- elapsed_seconds: simulation time
- variables: list of signal names available
- points: number of data points
- error: error message if failed
"""
result = await run_simulation(
schematic_path,
@ -93,6 +113,13 @@ async def simulate(
response["plotname"] = result.raw_data.plotname
response["raw_file"] = str(result.raw_file) if result.raw_file else None
if result.log_file and result.log_file.exists():
log = parse_log(result.log_file)
if log.measurements:
response["measurements"] = log.get_all_measurements()
if log.errors:
response["log_errors"] = log.errors
return response
@ -101,17 +128,11 @@ async def simulate_netlist(
netlist_path: str,
timeout_seconds: float = 300,
) -> dict:
"""Run an LTspice simulation on a netlist file.
Use this for .cir or .net SPICE netlist files instead of
schematic .asc files.
"""Run an LTspice simulation on a netlist file (.cir or .net).
Args:
netlist_path: Absolute path to .cir or .net netlist file
timeout_seconds: Maximum time to wait for simulation
Returns:
dict with simulation results (same as simulate)
"""
result = await run_netlist(
netlist_path,
@ -133,9 +154,21 @@ async def simulate_netlist(
response["points"] = result.raw_data.points
response["raw_file"] = str(result.raw_file) if result.raw_file else None
if result.log_file and result.log_file.exists():
log = parse_log(result.log_file)
if log.measurements:
response["measurements"] = log.get_all_measurements()
if log.errors:
response["log_errors"] = log.errors
return response
# ============================================================================
# WAVEFORM & ANALYSIS TOOLS
# ============================================================================
@mcp.tool()
def get_waveform(
raw_file_path: str,
@ -144,31 +177,22 @@ def get_waveform(
) -> dict:
"""Extract waveform data from a .raw simulation results file.
After running a simulation, use this to get the actual data values.
For transient analysis, includes time axis. For AC, includes frequency.
For transient analysis, returns time + voltage/current values.
For AC analysis, returns frequency + magnitude(dB)/phase(degrees).
Args:
raw_file_path: Path to .raw file from simulation
signal_names: List of signal names to extract (partial match OK)
e.g., ["V(out)", "I(R1)"] or just ["out", "R1"]
max_points: Maximum data points to return (downsampled if needed)
Returns:
dict with:
- time_or_frequency: the x-axis data
- signals: dict mapping signal name to data array
- units: dict mapping signal name to unit type
signal_names: Signal names to extract, e.g. ["V(out)", "I(R1)"]
max_points: Maximum data points (downsampled if needed)
"""
raw = parse_raw_file(raw_file_path)
# Get x-axis (time or frequency)
x_axis = raw.get_time()
x_name = "time"
if x_axis is None:
x_axis = raw.get_frequency()
x_name = "frequency"
# Downsample if needed
total_points = len(x_axis) if x_axis is not None else raw.points
step = max(1, total_points // max_points)
@ -182,21 +206,17 @@ def get_waveform(
if x_axis is not None:
sampled = x_axis[::step]
# For frequency domain, take real part (imag is 0)
if np.iscomplexobj(sampled):
result["x_axis_data"] = sampled.real.tolist()
else:
result["x_axis_data"] = sampled.tolist()
result["returned_points"] = len(result["x_axis_data"])
# Extract requested signals
for name in signal_names:
data = raw.get_variable(name)
if data is not None:
sampled = data[::step]
# Handle complex data (AC analysis)
if np.iscomplexobj(sampled):
import math
result["signals"][name] = {
"magnitude_db": [
20 * math.log10(abs(x)) if abs(x) > 0 else -200
@ -213,6 +233,223 @@ def get_waveform(
return result
@mcp.tool()
def analyze_waveform(
    raw_file_path: str,
    signal_name: str,
    analyses: list[str],
    settling_tolerance_pct: float = 2.0,
    settling_final_value: float | None = None,
    rise_low_pct: float = 10.0,
    rise_high_pct: float = 90.0,
    fft_max_harmonics: int = 50,
    thd_n_harmonics: int = 10,
) -> dict:
    """Analyze a signal from simulation results.

    Run one or more analyses on a waveform. Available analyses:
    - "rms": Root mean square value
    - "peak_to_peak": Min, max, peak-to-peak swing, mean
    - "settling_time": Time to settle within tolerance of final value
    - "rise_time": 10%-90% rise time (configurable)
    - "fft": Frequency spectrum via FFT
    - "thd": Total Harmonic Distortion

    Unknown analysis names are ignored; time-dependent analyses are
    skipped when the .raw file has no time axis.

    Args:
        raw_file_path: Path to .raw file
        signal_name: Signal to analyze, e.g. "V(out)"
        analyses: List of analysis types to run
        settling_tolerance_pct: Tolerance for settling time (default 2%)
        settling_final_value: Target value (None = use last sample)
        rise_low_pct: Low threshold for rise time (default 10%)
        rise_high_pct: High threshold for rise time (default 90%)
        fft_max_harmonics: Max harmonics to return in FFT
        thd_n_harmonics: Number of harmonics for THD calculation
    """
    raw = parse_raw_file(raw_file_path)
    t_axis = raw.get_time()
    trace = raw.get_variable(signal_name)
    if trace is None:
        return {"error": f"Signal '{signal_name}' not found. Available: "
                f"{[v.name for v in raw.variables]}"}

    # Time-domain math needs real values: drop the imaginary part of the
    # time axis, take the magnitude of complex (AC) signals.
    if np.iscomplexobj(t_axis):
        t_axis = t_axis.real
    if np.iscomplexobj(trace):
        trace = np.abs(trace)

    # Dispatch tables: analyses that need only the signal vs. those that
    # also need the time axis.
    signal_only = {
        "rms": lambda: compute_rms(trace),
        "peak_to_peak": lambda: compute_peak_to_peak(trace),
    }
    needs_time = {
        "settling_time": lambda: compute_settling_time(
            t_axis, trace,
            final_value=settling_final_value,
            tolerance_percent=settling_tolerance_pct,
        ),
        "rise_time": lambda: compute_rise_time(
            t_axis, trace,
            low_pct=rise_low_pct,
            high_pct=rise_high_pct,
        ),
        "fft": lambda: compute_fft(
            t_axis, trace,
            max_harmonics=fft_max_harmonics,
        ),
        "thd": lambda: compute_thd(
            t_axis, trace,
            n_harmonics=thd_n_harmonics,
        ),
    }

    results = {"signal": signal_name}
    for analysis in analyses:
        if analysis in signal_only:
            results[analysis] = signal_only[analysis]()
        elif analysis in needs_time and t_axis is not None:
            results[analysis] = needs_time[analysis]()
    return results
@mcp.tool()
def measure_bandwidth(
    raw_file_path: str,
    signal_name: str,
    ref_db: float | None = None,
) -> dict:
    """Measure -3dB bandwidth from an AC analysis result.

    Finds the frequency range where the signal stays within 3dB of its
    peak (or of an explicitly supplied reference level).

    Args:
        raw_file_path: Path to .raw file from AC simulation
        signal_name: Signal to measure, e.g. "V(out)"
        ref_db: Reference level in dB (None = use peak)
    """
    raw = parse_raw_file(raw_file_path)
    freq = raw.get_frequency()
    trace = raw.get_variable(signal_name)
    if freq is None:
        return {"error": "Not an AC analysis - no frequency data found"}
    if trace is None:
        return {"error": f"Signal '{signal_name}' not found"}

    # Complex response -> magnitude in dB; clamp zero magnitude to -200 dB
    # so log10 never sees zero.
    db_values = []
    for point in trace:
        mag = abs(point)
        db_values.append(20 * math.log10(mag) if mag > 0 else -200)
    return compute_bandwidth(freq.real, np.array(db_values), ref_db=ref_db)
@mcp.tool()
def export_csv(
    raw_file_path: str,
    signal_names: list[str] | None = None,
    output_path: str | None = None,
    max_points: int = 10000,
) -> dict:
    """Export simulation waveform data to CSV format.

    Complex (AC analysis) signals are written as two columns each:
    magnitude in dB and phase in degrees.

    Args:
        raw_file_path: Path to .raw file
        signal_names: Signals to export (None = all)
        output_path: Where to save CSV (None = auto-generate in /tmp)
        max_points: Maximum rows to export
    """
    raw = parse_raw_file(raw_file_path)

    # Determine x-axis (time for .tran, frequency for .ac)
    x_axis = raw.get_time()
    x_name = "time"
    if x_axis is None:
        x_axis = raw.get_frequency()
        x_name = "frequency"
    x_is_complex = x_axis is not None and np.iscomplexobj(x_axis)

    # Default to every variable except the x-axis itself
    if signal_names is None:
        signal_names = [
            v.name for v in raw.variables
            if v.name not in (x_name, "time", "frequency")
        ]

    # Downsample so we emit at most max_points rows
    total = raw.points
    step = max(1, total // max_points)

    # Hoist variable lookups out of the row loop (previously each row
    # re-fetched every signal: O(rows * signals) lookups)
    series: list[tuple[str, object, bool]] = []
    for name in signal_names:
        data = raw.get_variable(name)
        if data is not None:
            series.append((name, data, bool(np.iscomplexobj(data))))

    # Build the header. The x column is only emitted when an x-axis
    # exists, keeping header and row widths consistent (previously the
    # header always contained the x column even when rows omitted it).
    headers: list[str] = []
    if x_axis is not None:
        headers.append(x_name)
    for name, _data, is_complex in series:
        if is_complex:
            headers.extend([f"{name}_magnitude_db", f"{name}_phase_deg"])
        else:
            headers.append(name)

    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(headers)

    indices = range(0, total, step)
    for i in indices:
        row = []
        if x_axis is not None:
            row.append(x_axis[i].real if x_is_complex else x_axis[i])
        for _name, data, is_complex in series:
            if is_complex:
                val = data[i]
                mag = abs(val)
                # Clamp zero magnitude to -200 dB to avoid log10(0)
                row.append(20 * math.log10(mag) if mag > 0 else -200)
                row.append(math.degrees(math.atan2(val.imag, val.real)))
            else:
                row.append(data[i])
        writer.writerow(row)

    # Save to file (auto-name in the temp dir when no path was given)
    if output_path is None:
        raw_name = Path(raw_file_path).stem
        output_path = str(Path(tempfile.gettempdir()) / f"{raw_name}.csv")
    Path(output_path).write_text(buf.getvalue())

    return {
        "output_path": output_path,
        "rows": len(indices),
        "columns": headers,
    }
# ============================================================================
# SCHEMATIC TOOLS
# ============================================================================
@mcp.tool()
def read_schematic(schematic_path: str) -> dict:
"""Read and parse an LTspice schematic file.
@ -221,12 +458,6 @@ def read_schematic(schematic_path: str) -> dict:
Args:
schematic_path: Path to .asc schematic file
Returns:
dict with:
- components: list of {name, symbol, value, x, y}
- nets: list of net/flag names
- directives: list of SPICE directive strings
"""
sch = parse_schematic(schematic_path)
@ -258,26 +489,16 @@ def edit_component(
) -> dict:
"""Modify a component's value in a schematic.
Use this to change resistor values, capacitor values, etc.
programmatically before running a simulation.
Args:
schematic_path: Path to .asc schematic file
component_name: Instance name like "R1", "C2", "M1"
new_value: New value string, e.g., "10k", "100n", "2N7000"
output_path: Where to save modified schematic (None = overwrite)
Returns:
dict with success status and component details
output_path: Where to save (None = overwrite original)
"""
try:
sch = modify_component_value(
schematic_path,
component_name,
new_value,
output_path,
schematic_path, component_name, new_value, output_path,
)
comp = sch.get_component(component_name)
return {
"success": True,
@ -287,12 +508,116 @@ def edit_component(
"symbol": comp.symbol if comp else None,
}
except ValueError as e:
return {"success": False, "error": str(e)}
@mcp.tool()
def diff_schematics(
    schematic_a: str,
    schematic_b: str,
) -> dict:
    """Compare two schematics and show what changed.

    Reports component additions, removals, value changes, directive
    changes, and wire/net topology differences.

    Args:
        schematic_a: Path to "before" .asc file
        schematic_b: Path to "after" .asc file
    """
    return _diff_schematics(schematic_a, schematic_b).to_dict()
@mcp.tool()
def run_drc(schematic_path: str) -> dict:
    """Run design rule checks on a schematic.

    Checks for common issues:
    - Missing ground connection
    - Floating nodes
    - Missing simulation directive
    - Voltage source loops
    - Missing component values
    - Duplicate component names
    - Unconnected components

    Args:
        schematic_path: Path to .asc schematic file
    """
    return _run_drc(schematic_path).to_dict()
# ============================================================================
# NETLIST BUILDER TOOLS
# ============================================================================
@mcp.tool()
def create_netlist(
    title: str,
    components: list[dict],
    directives: list[str],
    output_path: str | None = None,
) -> dict:
    """Create a SPICE netlist programmatically and save to a .cir file.

    Build circuits from scratch without needing a graphical schematic.
    The created .cir file can be simulated with simulate_netlist.

    Args:
        title: Circuit title/description
        components: List of component dicts, each with:
            - name: Component name (R1, C1, V1, M1, X1, etc.)
            - nodes: List of node names (use "0" for ground)
            - value: Value or model name
            - params: Optional extra parameters string
        directives: List of SPICE directives, e.g.:
            [".tran 10m", ".ac dec 100 1 1meg",
             ".meas tran vmax MAX V(out)"]
        output_path: Where to save .cir file (None = auto in /tmp)

    Example components:
        [
            {"name": "V1", "nodes": ["in", "0"], "value": "AC 1"},
            {"name": "R1", "nodes": ["in", "out"], "value": "10k"},
            {"name": "C1", "nodes": ["out", "0"], "value": "100n"}
        ]
    """
    nl = Netlist(title=title)
    for comp in components:
        nl.add_component(
            name=comp["name"],
            nodes=comp["nodes"],
            value=comp["value"],
            params=comp.get("params", ""),
        )
    for directive in directives:
        nl.add_directive(directive)

    # Determine output path: derive a filesystem-safe name from the title
    if output_path is None:
        safe_title = "".join(c if c.isalnum() else "_" for c in title)[:30]
        output_path = str(Path(tempfile.gettempdir()) / f"{safe_title}.cir")
    saved = nl.save(output_path)

    # Bug fix: the return dict previously contained stray
    # '"success": False, "error": str(e)' entries left over from an
    # unrelated except block -- 'e' was undefined here, so the tool
    # raised NameError on every successful call.
    return {
        "success": True,
        "output_path": str(saved),
        "netlist_preview": nl.render(),
        "component_count": len(nl.components),
    }
# ============================================================================
# LIBRARY & MODEL TOOLS
# ============================================================================
@mcp.tool()
def list_symbols(
category: str | None = None,
@ -301,17 +626,10 @@ def list_symbols(
) -> dict:
"""List available component symbols from LTspice library.
Symbols define the graphical representation and pins of components.
Args:
category: Filter by category folder (e.g., "Opamps", "Comparators")
category: Filter by category (e.g., "Opamps", "Comparators")
search: Search term for symbol name (case-insensitive)
limit: Maximum results to return
Returns:
dict with:
- symbols: list of {name, category, path}
- total_count: total matching symbols
"""
symbols = []
sym_dir = LTSPICE_LIB / "sym"
@ -324,27 +642,16 @@ def list_symbols(
cat = str(rel_path.parent) if rel_path.parent != Path(".") else "misc"
name = asy_file.stem
# Apply filters
if category and cat.lower() != category.lower():
continue
if search and search.lower() not in name.lower():
continue
symbols.append({
"name": name,
"category": cat,
"path": str(asy_file),
})
symbols.append({"name": name, "category": cat, "path": str(asy_file)})
# Sort by name
symbols.sort(key=lambda x: x["name"].lower())
total = len(symbols)
return {
"symbols": symbols[:limit],
"total_count": total,
"returned_count": min(limit, total),
}
return {"symbols": symbols[:limit], "total_count": total, "returned_count": min(limit, total)}
@mcp.tool()
@ -355,15 +662,10 @@ def list_examples(
) -> dict:
"""List example circuits from LTspice examples library.
Great for learning or as starting points for new designs.
Args:
category: Filter by category folder
search: Search term for example name
limit: Maximum results to return
Returns:
dict with list of example schematics
"""
examples = []
@ -380,20 +682,11 @@ def list_examples(
if search and search.lower() not in name.lower():
continue
examples.append({
"name": name,
"category": cat,
"path": str(asc_file),
})
examples.append({"name": name, "category": cat, "path": str(asc_file)})
examples.sort(key=lambda x: x["name"].lower())
total = len(examples)
return {
"examples": examples[:limit],
"total_count": total,
"returned_count": min(limit, total),
}
return {"examples": examples[:limit], "total_count": total, "returned_count": min(limit, total)}
@mcp.tool()
@ -404,9 +697,6 @@ def get_symbol_info(symbol_path: str) -> dict:
Args:
symbol_path: Path to .asy symbol file
Returns:
dict with symbol details including pins and default attributes
"""
path = Path(symbol_path)
if not path.exists():
@ -448,7 +738,6 @@ def get_symbol_info(symbol_path: str) -> dict:
attr_name = parts[1]
attr_value = parts[2]
info["attributes"][attr_name] = attr_value
if attr_name == "Description":
info["description"] = attr_value
elif attr_name == "Prefix":
@ -460,14 +749,67 @@ def get_symbol_info(symbol_path: str) -> dict:
@mcp.tool()
def check_installation() -> dict:
"""Verify LTspice and Wine are properly installed.
def search_spice_models(
search: str | None = None,
model_type: str | None = None,
limit: int = 50,
) -> dict:
"""Search for SPICE .model definitions in the library.
Returns:
dict with installation status and paths
Finds transistors, diodes, and other discrete devices.
Args:
search: Search term for model name (case-insensitive)
model_type: Filter by type: NPN, PNP, NMOS, PMOS, D, NJF, PJF
limit: Maximum results
"""
ok, msg = validate_installation()
models = _search_models(search=search, model_type=model_type, limit=limit)
return {
"models": [
{
"name": m.name,
"type": m.type,
"source_file": m.source_file,
"parameters": m.parameters,
}
for m in models
],
"total_count": len(models),
}
@mcp.tool()
def search_spice_subcircuits(
    search: str | None = None,
    limit: int = 50,
) -> dict:
    """Search for SPICE .subckt definitions (op-amps, ICs, etc.).
    Args:
        search: Search term for subcircuit name
        limit: Maximum results
    """
    matches = _search_subcircuits(search=search, limit=limit)
    # Flatten each subcircuit record into a plain JSON-serializable dict
    entries = []
    for sub in matches:
        entries.append({
            "name": sub.name,
            "pins": sub.pins,
            "pin_names": sub.pin_names,
            "description": sub.description,
            "source_file": sub.source_file,
            "n_components": sub.n_components,
        })
    return {"subcircuits": entries, "total_count": len(matches)}
@mcp.tool()
def check_installation() -> dict:
"""Verify LTspice and Wine are properly installed."""
ok, msg = validate_installation()
from .config import LTSPICE_DIR, LTSPICE_EXE, WINE_PREFIX
return {
@ -492,14 +834,14 @@ def check_installation() -> dict:
@mcp.resource("ltspice://symbols")
def resource_symbols() -> str:
"""List of all available LTspice symbols organized by category."""
"""All available LTspice symbols organized by category."""
result = list_symbols(limit=10000)
return json.dumps(result, indent=2)
@mcp.resource("ltspice://examples")
def resource_examples() -> str:
"""List of all LTspice example circuits."""
"""All LTspice example circuits."""
result = list_examples(limit=10000)
return json.dumps(result, indent=2)
@ -510,6 +852,122 @@ def resource_status() -> str:
return json.dumps(check_installation(), indent=2)
# ============================================================================
# PROMPTS
# ============================================================================
@mcp.prompt()
def design_filter(
    filter_type: str = "lowpass",
    topology: str = "rc",
    cutoff_freq: str = "1kHz",
) -> str:
    """Guide through designing and simulating a filter circuit.
    Args:
        filter_type: lowpass, highpass, bandpass, or notch
        topology: rc (1st order), rlc (2nd order), or sallen-key (active)
        cutoff_freq: Target cutoff frequency with units
    """
    # The returned text is delivered to the client as prompt guidance;
    # the argument values are interpolated verbatim (no validation here).
    return f"""Design a {filter_type} filter with these requirements:
- Topology: {topology}
- Cutoff frequency: {cutoff_freq}
Workflow:
1. Use create_netlist to build the circuit
2. Add .ac analysis directive for frequency sweep
3. Add .meas directive for -3dB bandwidth
4. Simulate with simulate_netlist
5. Use measure_bandwidth to verify cutoff frequency
6. Use get_waveform to inspect the frequency response
7. Adjust component values with create_netlist if needed
Tips:
- For RC lowpass: f_c = 1/(2*pi*R*C)
- For 2nd order: Q controls peaking, Butterworth Q=0.707
- Use search_spice_models to find op-amp models for active filters
"""
@mcp.prompt()
def analyze_power_supply(schematic_path: str = "") -> str:
    """Guide through analyzing a power supply circuit.
    Args:
        schematic_path: Path to the power supply schematic
    """
    # An empty schematic_path is allowed: the prompt then instructs the
    # model to locate or create the schematic itself.
    path_instruction = (
        f"The schematic is at: {schematic_path}"
        if schematic_path
        else "First, identify or create the power supply schematic."
    )
    return f"""Analyze a power supply circuit for key performance metrics.
{path_instruction}
Workflow:
1. Use read_schematic to understand the circuit topology
2. Use run_drc to check for design issues
3. Simulate with .tran analysis (include load step if applicable)
4. Use analyze_waveform with these analyses:
   - "peak_to_peak" on output for ripple measurement
   - "settling_time" for transient response
   - "fft" on output to identify noise frequencies
5. If AC analysis available, use measure_bandwidth for loop gain
Key metrics to extract:
- Output voltage regulation (DC accuracy)
- Ripple voltage (peak-to-peak on output)
- Load transient response (settling time after step)
- Efficiency (input power vs output power)
"""
@mcp.prompt()
def debug_circuit(schematic_path: str = "") -> str:
    """Guide through debugging a circuit that isn't working.
    Args:
        schematic_path: Path to the problematic schematic
    """
    # An empty schematic_path is allowed: the prompt then instructs the
    # model to identify the schematic file first.
    path_instruction = (
        f"The schematic is at: {schematic_path}"
        if schematic_path
        else "First, identify the schematic file."
    )
    return f"""Systematic approach to debugging a circuit.
{path_instruction}
Step 1 - Validate the schematic:
- Use run_drc to catch obvious issues (missing ground, floating nodes)
- Use read_schematic to review component values and connections
Step 2 - Check simulation setup:
- Verify simulation directives are correct
- Check that models/subcircuits are available (search_spice_models)
Step 3 - Run and analyze:
- Simulate the circuit
- Use get_waveform to inspect key node voltages
- Compare expected vs actual values at each stage
Step 4 - Isolate the problem:
- Use edit_component to simplify (replace active devices with ideal)
- Use diff_schematics to track what changes fixed the issue
- Re-simulate after each change
Common issues:
- Wrong node connections (check wire endpoints)
- Missing bias voltages or ground
- Component values off by orders of magnitude
- Wrong model (check with search_spice_models)
"""
# ============================================================================
# ENTRY POINT
# ============================================================================
@ -517,15 +975,14 @@ def resource_status() -> str:
def main():
"""Run the MCP server."""
print(f"🔌 mcp-ltspice v{__version__}")
print(f"\U0001f50c mcp-ltspice v{__version__}")
print(" LTspice circuit simulation automation")
# Quick validation
ok, msg = validate_installation()
if ok:
print(f" {msg}")
print(f" \u2713 {msg}")
else:
print(f" {msg}")
print(f" \u26a0 {msg}")
mcp.run()

View File

@ -0,0 +1,517 @@
"""Waveform analysis and signal processing for simulation data."""
import numpy as np
def compute_fft(
    time: np.ndarray, signal: np.ndarray, max_harmonics: int = 50
) -> dict:
    """Compute the one-sided amplitude spectrum of a time-domain signal.

    Args:
        time: Time array in seconds (must be monotonically increasing)
        signal: Signal amplitude array (same length as time)
        max_harmonics: Maximum number of frequency bins to return

    Returns:
        Dict with:
        - frequencies: bin frequencies in Hz (DC bin first)
        - magnitudes: one-sided amplitudes per bin
        - magnitudes_db: amplitudes in dB (floored at -300 dB)
        - fundamental_freq: frequency of the largest non-DC bin
        - dc_offset: signed mean of the signal
    """
    if len(time) < 2 or len(signal) < 2:
        return {
            "frequencies": [],
            "magnitudes": [],
            "magnitudes_db": [],
            "fundamental_freq": 0.0,
            "dc_offset": 0.0,
        }
    n = len(signal)
    # Assumes uniform sampling; dt is the average step over the record
    dt = (time[-1] - time[0]) / (n - 1)
    if dt <= 0:
        return {
            "frequencies": [],
            "magnitudes": [],
            "magnitudes_db": [],
            "fundamental_freq": 0.0,
            "dc_offset": float(np.mean(np.real(signal))),
        }
    # Use real FFT for real-valued signals, scaled to peak amplitude
    spectrum = np.fft.rfft(np.real(signal))
    freqs = np.fft.rfftfreq(n, d=dt)
    magnitudes = np.abs(spectrum) * 2.0 / n
    # DC component doesn't get the 2x one-sided factor
    magnitudes[0] /= 2.0
    # Bug fix: report the *signed* DC level. Taking it from the magnitude
    # bin (|mean|) silently dropped the sign and disagreed with the
    # dt <= 0 branch above, which returns the signed mean.
    dc_offset = float(np.mean(np.real(signal)))
    # Fundamental: largest magnitude excluding the DC bin
    if len(magnitudes) > 1:
        fund_idx = int(np.argmax(magnitudes[1:])) + 1
        fundamental_freq = float(freqs[fund_idx])
    else:
        fundamental_freq = 0.0
    # Trim to max_harmonics bins (plus the DC bin)
    limit = min(max_harmonics + 1, len(freqs))
    freqs = freqs[:limit]
    magnitudes = magnitudes[:limit]
    # dB conversion with a floor to avoid log(0)
    magnitudes_db = 20.0 * np.log10(np.maximum(magnitudes, 1e-15))
    return {
        "frequencies": freqs.tolist(),
        "magnitudes": magnitudes.tolist(),
        "magnitudes_db": magnitudes_db.tolist(),
        "fundamental_freq": fundamental_freq,
        "dc_offset": dc_offset,
    }
def compute_thd(
    time: np.ndarray, signal: np.ndarray, n_harmonics: int = 10
) -> dict:
    """Compute Total Harmonic Distortion of a time-domain signal.

    THD = sqrt(sum of squared harmonic amplitudes) / fundamental * 100,
    summed over ``n_harmonics`` harmonics starting at the 2nd.

    Args:
        time: Time array in seconds
        signal: Signal amplitude array
        n_harmonics: How many harmonics above the fundamental to include

    Returns:
        Dict with thd_percent, fundamental_freq, fundamental_magnitude,
        and a harmonics list
    """
    def _empty(freq: float = 0.0, mag: float = 0.0) -> dict:
        return {
            "thd_percent": 0.0,
            "fundamental_freq": freq,
            "fundamental_magnitude": mag,
            "harmonics": [],
        }

    n = len(signal)
    if n < 2 or len(time) < 2:
        return _empty()
    dt = (time[-1] - time[0]) / (n - 1)
    if dt <= 0:
        return _empty()
    # One-sided amplitude spectrum
    amps = np.abs(np.fft.rfft(np.real(signal))) * 2.0 / n
    amps[0] /= 2.0  # one-sided scaling doesn't apply to the DC bin
    freqs = np.fft.rfftfreq(n, d=dt)
    if len(amps) <= 1:
        return _empty()
    # Fundamental = largest non-DC peak
    peak = int(np.argmax(amps[1:])) + 1
    f0 = float(freqs[peak])
    v1 = float(amps[peak])
    if v1 < 1e-15:
        return _empty(freq=f0, mag=v1)
    # Accumulate each harmonic from the bin nearest its ideal frequency
    harmonics = []
    power = 0.0
    for order in range(2, n_harmonics + 2):
        f_target = f0 * order
        if f_target > freqs[-1]:
            break
        bin_idx = int(np.argmin(np.abs(freqs - f_target)))
        amp = float(amps[bin_idx])
        power += amp * amp
        harmonics.append({
            "harmonic": order,
            "frequency": float(freqs[bin_idx]),
            "magnitude": amp,
            "magnitude_db": 20.0 * np.log10(max(amp, 1e-15)),
        })
    return {
        "thd_percent": float(np.sqrt(power) / v1 * 100.0),
        "fundamental_freq": f0,
        "fundamental_magnitude": v1,
        "harmonics": harmonics,
    }
def compute_rms(signal: np.ndarray) -> float:
    """Return the root-mean-square value of a signal.

    Complex input is reduced to its real part; an empty array yields 0.0.

    Args:
        signal: Signal amplitude array (real or complex)

    Returns:
        RMS value as a float
    """
    if not len(signal):
        return 0.0
    values = np.real(signal)
    return float(np.sqrt(np.mean(np.square(values))))
def compute_peak_to_peak(signal: np.ndarray) -> dict:
    """Summarize the amplitude extremes of a signal.

    Args:
        signal: Signal amplitude array (real part is used)

    Returns:
        Dict with peak_to_peak, max, min, and mean values
    """
    if len(signal) == 0:
        return {"peak_to_peak": 0.0, "max": 0.0, "min": 0.0, "mean": 0.0}
    values = np.real(signal)
    hi = float(values.max())
    lo = float(values.min())
    return {
        "peak_to_peak": hi - lo,
        "max": hi,
        "min": lo,
        "mean": float(values.mean()),
    }
def compute_settling_time(
time: np.ndarray,
signal: np.ndarray,
final_value: float | None = None,
tolerance_percent: float = 2.0,
) -> dict:
"""Compute settling time.
Searches backwards from the end of the signal to find the last
point where the signal was outside the tolerance band around the
final value. Settling time is measured from time[0] to that crossing.
Args:
time: Time array in seconds
signal: Signal amplitude array
final_value: Target value. If None, uses the last sample.
tolerance_percent: Allowed deviation as a percentage of final_value
(or absolute if final_value is near zero)
Returns:
Dict with settling_time, final_value, tolerance, and settled flag
"""
if len(time) < 2 or len(signal) < 2:
return {
"settling_time": 0.0,
"final_value": 0.0,
"tolerance": 0.0,
"settled": False,
}
real_signal = np.real(signal)
if final_value is None:
final_value = float(real_signal[-1])
# Tolerance band: percentage of final_value, but use absolute
# tolerance if final_value is near zero to avoid a degenerate band
if abs(final_value) > 1e-12:
tolerance = abs(final_value) * tolerance_percent / 100.0
else:
# Fall back to percentage of signal range
sig_range = float(np.max(real_signal) - np.min(real_signal))
tolerance = sig_range * tolerance_percent / 100.0 if sig_range > 0 else 1e-12
# Walk backwards to find where signal last left the tolerance band
outside = np.abs(real_signal - final_value) > tolerance
if not np.any(outside):
# Signal was always within tolerance
return {
"settling_time": 0.0,
"final_value": final_value,
"tolerance": tolerance,
"settled": True,
}
# Find the last index that was outside the band
last_outside = int(np.max(np.nonzero(outside)[0]))
if last_outside >= len(time) - 1:
# Never settled within the captured data
return {
"settling_time": float(time[-1] - time[0]),
"final_value": final_value,
"tolerance": tolerance,
"settled": False,
}
# Settling time = time from start to the first sample inside the band
# after the last excursion
settling_time = float(time[last_outside + 1] - time[0])
return {
"settling_time": settling_time,
"final_value": final_value,
"tolerance": tolerance,
"settled": True,
}
def compute_rise_time(
time: np.ndarray,
signal: np.ndarray,
low_pct: float = 10,
high_pct: float = 90,
) -> dict:
"""Compute rise time between two percentage thresholds.
Uses linear interpolation between samples for sub-sample accuracy.
Thresholds are computed relative to the signal's min-to-max swing.
Args:
time: Time array in seconds
signal: Signal amplitude array
low_pct: Lower threshold as percentage of swing (default 10%)
high_pct: Upper threshold as percentage of swing (default 90%)
Returns:
Dict with rise_time, low_threshold, high_threshold,
low_time, and high_time
"""
if len(time) < 2 or len(signal) < 2:
return {
"rise_time": 0.0,
"low_threshold": 0.0,
"high_threshold": 0.0,
"low_time": 0.0,
"high_time": 0.0,
}
real_signal = np.real(signal)
sig_min = float(np.min(real_signal))
sig_max = float(np.max(real_signal))
swing = sig_max - sig_min
if swing < 1e-15:
return {
"rise_time": 0.0,
"low_threshold": sig_min,
"high_threshold": sig_max,
"low_time": float(time[0]),
"high_time": float(time[0]),
}
low_thresh = sig_min + swing * (low_pct / 100.0)
high_thresh = sig_min + swing * (high_pct / 100.0)
low_time = _interpolate_crossing(time, real_signal, low_thresh, rising=True)
high_time = _interpolate_crossing(time, real_signal, high_thresh, rising=True)
if low_time is None or high_time is None:
return {
"rise_time": 0.0,
"low_threshold": low_thresh,
"high_threshold": high_thresh,
"low_time": float(low_time) if low_time is not None else None,
"high_time": float(high_time) if high_time is not None else None,
}
return {
"rise_time": high_time - low_time,
"low_threshold": low_thresh,
"high_threshold": high_thresh,
"low_time": low_time,
"high_time": high_time,
}
def _interpolate_crossing(
time: np.ndarray,
signal: np.ndarray,
threshold: float,
rising: bool = True,
) -> float | None:
"""Find the first time a signal crosses a threshold, with interpolation.
Args:
time: Time array
signal: Signal array
threshold: Value to cross
rising: If True, look for low-to-high crossing
Returns:
Interpolated time of crossing, or None if no crossing found
"""
for i in range(len(signal) - 1):
if rising:
crosses = signal[i] <= threshold < signal[i + 1]
else:
crosses = signal[i] >= threshold > signal[i + 1]
if crosses:
# Linear interpolation between samples
dv = signal[i + 1] - signal[i]
if abs(dv) < 1e-30:
return float(time[i])
frac = (threshold - signal[i]) / dv
return float(time[i] + frac * (time[i + 1] - time[i]))
return None
def compute_bandwidth(
frequency: np.ndarray,
magnitude_db: np.ndarray,
ref_db: float | None = None,
) -> dict:
"""Compute -3dB bandwidth from frequency response data.
Interpolates between data points to find the exact frequencies
where the response crosses the -3dB level relative to the reference.
Args:
frequency: Frequency array in Hz (may be complex; .real is used)
magnitude_db: Magnitude in dB
ref_db: Reference level in dB. Defaults to the peak of magnitude_db.
Returns:
Dict with bandwidth_hz, f_low, f_high, ref_db, and type
"""
if len(frequency) < 2 or len(magnitude_db) < 2:
return {
"bandwidth_hz": 0.0,
"f_low": None,
"f_high": None,
"ref_db": 0.0,
"type": "unknown",
}
freq = np.real(frequency).astype(np.float64)
mag = np.real(magnitude_db).astype(np.float64)
# Sort by frequency (in case data isn't ordered)
sort_idx = np.argsort(freq)
freq = freq[sort_idx]
mag = mag[sort_idx]
# Strip any negative frequencies
positive_mask = freq >= 0
freq = freq[positive_mask]
mag = mag[positive_mask]
if len(freq) < 2:
return {
"bandwidth_hz": 0.0,
"f_low": None,
"f_high": None,
"ref_db": 0.0,
"type": "unknown",
}
if ref_db is None:
ref_db = float(np.max(mag))
cutoff = ref_db - 3.0
# Find all -3dB crossings by checking where magnitude crosses the cutoff
above = mag >= cutoff
crossings = []
for i in range(len(mag) - 1):
if above[i] != above[i + 1]:
# Interpolate the exact crossing frequency
dm = mag[i + 1] - mag[i]
if abs(dm) < 1e-30:
f_cross = float(freq[i])
else:
frac = (cutoff - mag[i]) / dm
f_cross = float(freq[i] + frac * (freq[i + 1] - freq[i]))
crossings.append(f_cross)
if not crossings:
# No crossing found - check if entirely above or below
if np.all(above):
return {
"bandwidth_hz": float(freq[-1] - freq[0]),
"f_low": None,
"f_high": None,
"ref_db": ref_db,
"type": "unknown",
}
return {
"bandwidth_hz": 0.0,
"f_low": None,
"f_high": None,
"ref_db": ref_db,
"type": "unknown",
}
# Classify response shape
peak_idx = int(np.argmax(mag))
if len(crossings) == 1:
f_cross = crossings[0]
if peak_idx < len(freq) // 2:
# Peak is at low end => lowpass
return {
"bandwidth_hz": f_cross,
"f_low": None,
"f_high": f_cross,
"ref_db": ref_db,
"type": "lowpass",
}
else:
# Peak is at high end => highpass
return {
"bandwidth_hz": float(freq[-1]) - f_cross,
"f_low": f_cross,
"f_high": None,
"ref_db": ref_db,
"type": "highpass",
}
# Two or more crossings => bandpass (use first and last)
f_low = crossings[0]
f_high = crossings[-1]
return {
"bandwidth_hz": f_high - f_low,
"f_low": f_low,
"f_high": f_high,
"ref_db": ref_db,
"type": "bandpass",
}