Fix waveform output bloat -- compact rounding, peak-preserving downsampling, range filter

get_waveform with 3 AC signals at 700 points produced ~92K chars of JSON,
exceeding MCP client token limits. Three fixes:

- Round output values (dB to 2dp, phase to 2dp, freq/time to 6 sig figs),
  cutting JSON size ~63% (97K -> 36K chars for the same data)
- Peak-preserving downsampling for AC data: bucket-based selection keeps
  the point with the largest magnitude deviation per bucket, so narrow
  resonance peaks (high-Q filters) aren't lost by blind stride sampling
- Add x_min/x_max parameters to get_waveform and evaluate_waveform_expression
  for zooming into a frequency/time range without excessive global point counts
This commit is contained in:
Ryan Malloy 2026-03-05 14:37:52 -07:00
parent 6acf08f80d
commit 902058f851
2 changed files with 164 additions and 27 deletions

View File

@ -1,6 +1,6 @@
[project]
name = "mcltspice"
version = "2026.02.14.1"
version = "2026.03.05"
description = "MCP server for LTspice circuit simulation automation"
readme = "README.md"
requires-python = ">=3.11"

View File

@ -174,6 +174,83 @@ mcp = FastMCP(
)
# ============================================================================
# OUTPUT HELPERS
# ============================================================================
def _round_sig(x: float, sig: int = 6) -> float:
"""Round to N significant figures for compact JSON output."""
if x == 0 or not math.isfinite(x):
return float(x)
digits = sig - 1 - int(math.floor(math.log10(abs(x))))
return round(x, digits)
def _compact_list(
values, decimal_places: int | None = None, sig_figs: int | None = None
) -> list[float]:
"""Round a list of floats for compact JSON output.
Full float64 precision (~17 digits) causes massive JSON bloat.
Rounding to appropriate precision cuts output size 50-60%.
"""
if decimal_places is not None:
return [round(float(v), decimal_places) for v in values]
if sig_figs is not None:
return [_round_sig(float(v), sig_figs) for v in values]
return [float(v) for v in values]
def _downsample_indices(
n_total: int,
max_points: int,
signals: list[np.ndarray] | None = None,
) -> np.ndarray:
"""Compute indices for downsampling, preserving peaks in AC data.
For AC data (when complex signal arrays are provided), uses
peak-preserving bucket selection: divides the data into max_points
buckets and keeps the point in each bucket whose magnitude deviates
most from the bucket mean. This ensures narrow resonance peaks
(e.g. high-Q bandpass filters) aren't lost by blind stride sampling.
For transient data (no signals provided), uses simple stride.
"""
if n_total <= max_points:
return np.arange(n_total)
if signals is None or len(signals) == 0:
step = max(1, n_total // max_points)
return np.arange(0, n_total, step)
# Pre-compute magnitude_dB for each signal
mag_db_list = []
for sig in signals:
mag = np.abs(sig)
with np.errstate(divide="ignore"):
db = np.where(mag > 0, 20 * np.log10(mag), -200.0)
mag_db_list.append(db)
# Bucket-based peak-preserving selection
indices = []
bucket_size = n_total / max_points
for i in range(max_points):
start = int(i * bucket_size)
end = min(int((i + 1) * bucket_size), n_total)
if start >= end:
continue
# Combined importance: sum of |deviation from bucket mean| across signals
combined = np.zeros(end - start)
for db in mag_db_list:
bucket = db[start:end]
combined += np.abs(bucket - np.mean(bucket))
indices.append(start + int(np.argmax(combined)))
return np.array(indices)
# ============================================================================
# SIMULATION TOOLS
# ============================================================================
@ -275,6 +352,8 @@ def get_waveform(
signal_names: list[str],
max_points: int = 1000,
run: int | None = None,
x_min: float | None = None,
x_max: float | None = None,
) -> dict:
"""Extract waveform data from a .raw simulation results file.
@ -284,11 +363,19 @@ def get_waveform(
For stepped simulations (.step, .mc, .temp), specify `run` (1-based)
to extract a single run's data. Omit `run` to get all data combined.
Use x_min/x_max to zoom into a frequency or time range of interest
without needing excessive max_points for the full sweep.
AC data uses peak-preserving downsampling that keeps resonance peaks
and notches visible even at low point counts.
Args:
raw_file_path: Path to .raw file from simulation
signal_names: Signal names to extract, e.g. ["V(out)", "I(R1)"]
max_points: Maximum data points (downsampled if needed)
run: Run number (1-based) for stepped simulations (None = all data)
x_min: Minimum x-axis value (frequency Hz or time s) to include
x_max: Maximum x-axis value (frequency Hz or time s) to include
"""
raw = parse_raw_file(raw_file_path)
@ -306,40 +393,72 @@ def get_waveform(
x_axis = raw.get_frequency()
x_name = "frequency"
total_points = len(x_axis) if x_axis is not None else raw.points
step = max(1, total_points // max_points)
if x_axis is None:
return {"error": "No time or frequency axis found in raw file"}
x_real = x_axis.real if np.iscomplexobj(x_axis) else x_axis
is_complex = np.iscomplexobj(raw.data)
# Apply x-axis range filter
mask = np.ones(len(x_real), dtype=bool)
if x_min is not None:
mask &= x_real >= x_min
if x_max is not None:
mask &= x_real <= x_max
filtered_indices = np.where(mask)[0]
if len(filtered_indices) == 0:
return {
"error": f"No data points in {x_name} range [{x_min}, {x_max}]",
"total_points": len(x_real),
f"{x_name}_range": [float(x_real[0]), float(x_real[-1])],
}
# Collect signal data for peak-preserving downsampling (AC only)
ac_signals = []
if is_complex:
for name in signal_names:
data = raw.get_variable(name)
if data is not None:
ac_signals.append(data[filtered_indices])
# Downsample: peak-preserving for AC, stride for transient
ds_indices = _downsample_indices(
len(filtered_indices),
max_points,
signals=ac_signals if ac_signals else None,
)
sample_indices = filtered_indices[ds_indices]
result = {
"x_axis_name": x_name,
"x_axis_data": [],
"x_axis_data": _compact_list(x_real[sample_indices], sig_figs=6),
"signals": {},
"total_points": total_points,
"returned_points": 0,
"total_points": len(x_real),
"returned_points": len(sample_indices),
"is_stepped": raw.is_stepped,
"n_runs": raw.n_runs,
}
if x_axis is not None:
sampled = x_axis[::step]
if np.iscomplexobj(sampled):
result["x_axis_data"] = sampled.real.tolist()
else:
result["x_axis_data"] = sampled.tolist()
result["returned_points"] = len(result["x_axis_data"])
for name in signal_names:
data = raw.get_variable(name)
if data is not None:
sampled = data[::step]
sampled = data[sample_indices]
if np.iscomplexobj(sampled):
result["signals"][name] = {
"magnitude_db": [
20 * math.log10(abs(x)) if abs(x) > 0 else -200 for x in sampled
],
"phase_degrees": [math.degrees(math.atan2(x.imag, x.real)) for x in sampled],
"magnitude_db": _compact_list(
[20 * math.log10(abs(x)) if abs(x) > 0 else -200 for x in sampled],
decimal_places=2,
),
"phase_degrees": _compact_list(
[math.degrees(math.atan2(x.imag, x.real)) for x in sampled],
decimal_places=2,
),
}
else:
result["signals"][name] = {"values": sampled.tolist()}
result["signals"][name] = {
"values": _compact_list(sampled, sig_figs=6)
}
return result
@ -919,6 +1038,8 @@ def evaluate_waveform_expression(
raw_file_path: str,
expression: str,
max_points: int = 1000,
x_min: float | None = None,
x_max: float | None = None,
) -> dict:
"""Evaluate a math expression on simulation waveforms.
@ -934,6 +1055,8 @@ def evaluate_waveform_expression(
raw_file_path: Path to .raw file
expression: Math expression using signal names
max_points: Maximum data points to return
x_min: Minimum x-axis value (frequency Hz or time s) to include
x_max: Maximum x-axis value (frequency Hz or time s) to include
"""
raw = parse_raw_file(raw_file_path)
calc = WaveformCalculator(raw)
@ -951,22 +1074,36 @@ def evaluate_waveform_expression(
x_name = "frequency"
total = len(result)
step = max(1, total // max_points)
# Apply x-axis range filter
if x_axis is not None and (x_min is not None or x_max is not None):
x_real = x_axis.real if np.iscomplexobj(x_axis) else x_axis
mask = np.ones(len(x_real), dtype=bool)
if x_min is not None:
mask &= x_real >= x_min
if x_max is not None:
mask &= x_real <= x_max
filtered = np.where(mask)[0]
if len(filtered) == 0:
return {"error": f"No data points in {x_name} range [{x_min}, {x_max}]"}
else:
filtered = np.arange(total)
step = max(1, len(filtered) // max_points)
sample_indices = filtered[::step]
response = {
"expression": expression,
"total_points": total,
"returned_points": len(result[::step]),
"returned_points": len(sample_indices),
}
if x_axis is not None:
sampled_x = x_axis[::step]
x_real = x_axis.real if np.iscomplexobj(x_axis) else x_axis
response["x_axis_name"] = x_name
response["x_axis_data"] = (
sampled_x.real.tolist() if np.iscomplexobj(sampled_x) else sampled_x.tolist()
)
response["x_axis_data"] = _compact_list(x_real[sample_indices], sig_figs=6)
response["values"] = result[::step].tolist()
response["values"] = _compact_list(result[sample_indices], sig_figs=6)
response["available_signals"] = calc.available_signals()
return response