diff --git a/pyproject.toml b/pyproject.toml index e5c3ad1..05c202b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "mcltspice" -version = "2026.02.14.1" +version = "2026.03.05" description = "MCP server for LTspice circuit simulation automation" readme = "README.md" requires-python = ">=3.11" diff --git a/src/mcltspice/server.py b/src/mcltspice/server.py index b0a4bf9..acfb7c3 100644 --- a/src/mcltspice/server.py +++ b/src/mcltspice/server.py @@ -174,6 +174,83 @@ mcp = FastMCP( ) +# ============================================================================ +# OUTPUT HELPERS +# ============================================================================ + + +def _round_sig(x: float, sig: int = 6) -> float: + """Round to N significant figures for compact JSON output.""" + if x == 0 or not math.isfinite(x): + return float(x) + digits = sig - 1 - int(math.floor(math.log10(abs(x)))) + return round(x, digits) + + +def _compact_list( + values, decimal_places: int | None = None, sig_figs: int | None = None +) -> list[float]: + """Round a list of floats for compact JSON output. + + Full float64 precision (~17 digits) causes massive JSON bloat. + Rounding to appropriate precision cuts output size 50-60%. + """ + if decimal_places is not None: + return [round(float(v), decimal_places) for v in values] + if sig_figs is not None: + return [_round_sig(float(v), sig_figs) for v in values] + return [float(v) for v in values] + + +def _downsample_indices( + n_total: int, + max_points: int, + signals: list[np.ndarray] | None = None, +) -> np.ndarray: + """Compute indices for downsampling, preserving peaks in AC data. + + For AC data (when complex signal arrays are provided), uses + peak-preserving bucket selection: divides the data into max_points + buckets and keeps the point in each bucket whose magnitude deviates + most from the bucket mean. This ensures narrow resonance peaks + (e.g. high-Q bandpass filters) aren't lost by blind stride sampling. + + For transient data (no signals provided), uses simple stride. + """ + if n_total <= max_points: + return np.arange(n_total) + + if signals is None or len(signals) == 0: + step = max(1, n_total // max_points) + return np.arange(0, n_total, step) + + # Pre-compute magnitude_dB for each signal + mag_db_list = [] + for sig in signals: + mag = np.abs(sig) + with np.errstate(divide="ignore"): + db = np.where(mag > 0, 20 * np.log10(mag), -200.0) + mag_db_list.append(db) + + # Bucket-based peak-preserving selection + indices = [] + bucket_size = n_total / max_points + + for i in range(max_points): + start = int(i * bucket_size) + end = min(int((i + 1) * bucket_size), n_total) + if start >= end: + continue + # Combined importance: sum of |deviation from bucket mean| across signals + combined = np.zeros(end - start) + for db in mag_db_list: + bucket = db[start:end] + combined += np.abs(bucket - np.mean(bucket)) + indices.append(start + int(np.argmax(combined))) + + return np.array(indices) + + # ============================================================================ # SIMULATION TOOLS # ============================================================================ @@ -275,6 +352,8 @@ def get_waveform( signal_names: list[str], max_points: int = 1000, run: int | None = None, + x_min: float | None = None, + x_max: float | None = None, ) -> dict: """Extract waveform data from a .raw simulation results file. @@ -284,11 +363,19 @@ def get_waveform( For stepped simulations (.step, .mc, .temp), specify `run` (1-based) to extract a single run's data. Omit `run` to get all data combined. + Use x_min/x_max to zoom into a frequency or time range of interest + without needing excessive max_points for the full sweep. + + AC data uses peak-preserving downsampling that keeps resonance peaks + and notches visible even at low point counts. + Args: raw_file_path: Path to .raw file from simulation signal_names: Signal names to extract, e.g. ["V(out)", "I(R1)"] max_points: Maximum data points (downsampled if needed) run: Run number (1-based) for stepped simulations (None = all data) + x_min: Minimum x-axis value (frequency Hz or time s) to include + x_max: Maximum x-axis value (frequency Hz or time s) to include """ raw = parse_raw_file(raw_file_path) @@ -306,40 +393,72 @@ def get_waveform( x_axis = raw.get_frequency() x_name = "frequency" - total_points = len(x_axis) if x_axis is not None else raw.points - step = max(1, total_points // max_points) + if x_axis is None: + return {"error": "No time or frequency axis found in raw file"} + + x_real = x_axis.real if np.iscomplexobj(x_axis) else x_axis + is_complex = np.iscomplexobj(raw.data) + + # Apply x-axis range filter + mask = np.ones(len(x_real), dtype=bool) + if x_min is not None: + mask &= x_real >= x_min + if x_max is not None: + mask &= x_real <= x_max + filtered_indices = np.where(mask)[0] + + if len(filtered_indices) == 0: + return { + "error": f"No data points in {x_name} range [{x_min}, {x_max}]", + "total_points": len(x_real), + f"{x_name}_range": [float(x_real[0]), float(x_real[-1])], + } + + # Collect signal data for peak-preserving downsampling (AC only) + ac_signals = [] + if is_complex: + for name in signal_names: + data = raw.get_variable(name) + if data is not None: + ac_signals.append(data[filtered_indices]) + + # Downsample: peak-preserving for AC, stride for transient + ds_indices = _downsample_indices( + len(filtered_indices), + max_points, + signals=ac_signals if ac_signals else None, + ) + sample_indices = filtered_indices[ds_indices] result = { "x_axis_name": x_name, - "x_axis_data": [], + "x_axis_data": _compact_list(x_real[sample_indices], sig_figs=6), "signals": {}, - "total_points": total_points, - "returned_points": 0, + "total_points": len(x_real), + "returned_points": len(sample_indices), "is_stepped": raw.is_stepped, "n_runs": raw.n_runs, } - if x_axis is not None: - sampled = x_axis[::step] - if np.iscomplexobj(sampled): - result["x_axis_data"] = sampled.real.tolist() - else: - result["x_axis_data"] = sampled.tolist() - result["returned_points"] = len(result["x_axis_data"]) - for name in signal_names: data = raw.get_variable(name) if data is not None: - sampled = data[::step] + sampled = data[sample_indices] if np.iscomplexobj(sampled): result["signals"][name] = { - "magnitude_db": [ - 20 * math.log10(abs(x)) if abs(x) > 0 else -200 for x in sampled - ], - "phase_degrees": [math.degrees(math.atan2(x.imag, x.real)) for x in sampled], + "magnitude_db": _compact_list( + [20 * math.log10(abs(x)) if abs(x) > 0 else -200 for x in sampled], + decimal_places=2, + ), + "phase_degrees": _compact_list( + [math.degrees(math.atan2(x.imag, x.real)) for x in sampled], + decimal_places=2, + ), } else: - result["signals"][name] = {"values": sampled.tolist()} + result["signals"][name] = { + "values": _compact_list(sampled, sig_figs=6) + } return result @@ -919,6 +1038,8 @@ def evaluate_waveform_expression( raw_file_path: str, expression: str, max_points: int = 1000, + x_min: float | None = None, + x_max: float | None = None, ) -> dict: """Evaluate a math expression on simulation waveforms. @@ -934,6 +1055,8 @@ def evaluate_waveform_expression( raw_file_path: Path to .raw file expression: Math expression using signal names max_points: Maximum data points to return + x_min: Minimum x-axis value (frequency Hz or time s) to include + x_max: Maximum x-axis value (frequency Hz or time s) to include """ raw = parse_raw_file(raw_file_path) calc = WaveformCalculator(raw) @@ -951,22 +1074,36 @@ def evaluate_waveform_expression( x_name = "frequency" total = len(result) - step = max(1, total // max_points) + + # Apply x-axis range filter + if x_axis is not None and (x_min is not None or x_max is not None): + x_real = x_axis.real if np.iscomplexobj(x_axis) else x_axis + mask = np.ones(len(x_real), dtype=bool) + if x_min is not None: + mask &= x_real >= x_min + if x_max is not None: + mask &= x_real <= x_max + filtered = np.where(mask)[0] + if len(filtered) == 0: + return {"error": f"No data points in {x_name} range [{x_min}, {x_max}]"} + else: + filtered = np.arange(total) + + step = max(1, len(filtered) // max_points) + sample_indices = filtered[::step] response = { "expression": expression, "total_points": total, - "returned_points": len(result[::step]), + "returned_points": len(sample_indices), } if x_axis is not None: - sampled_x = x_axis[::step] + x_real = x_axis.real if np.iscomplexobj(x_axis) else x_axis response["x_axis_name"] = x_name - response["x_axis_data"] = ( - sampled_x.real.tolist() if np.iscomplexobj(sampled_x) else sampled_x.tolist() - ) + response["x_axis_data"] = _compact_list(x_real[sample_indices], sig_figs=6) - response["values"] = result[::step].tolist() + response["values"] = _compact_list(result[sample_indices], sig_figs=6) response["available_signals"] = calc.available_signals() return response