#!/usr/bin/env python3 """ Enhanced signal analysis for carrier detection and characterization. Goes beyond the basic detect_peaks() in skywalker_lib by using robust noise floor estimation (median + MAD), peak width measurement at -3dB, peak merging within estimated carrier bandwidth, and carrier classification. """ import math import statistics def adaptive_noise_floor(powers: list) -> tuple: """ Robust noise floor estimation using median + MAD. The median is insensitive to strong carriers in the sweep, and the Median Absolute Deviation (MAD) provides a robust spread measure that won't be pulled by a few dominant peaks. Returns (noise_floor_db, mad_db). """ if not powers: return (0.0, 0.0) median_power = statistics.median(powers) deviations = [abs(p - median_power) for p in powers] mad = statistics.median(deviations) if deviations else 0.0 # The noise floor sits at the median -- most bins are noise in a # typical satellite IF sweep. MAD gives us an idea of how "bumpy" # the noise is, useful for setting adaptive thresholds. return (median_power, mad) def detect_peaks_enhanced(freqs: list, powers: list, threshold_db: float = 6.0) -> list: """ Enhanced peak detection with width estimation and merging. Returns list of dicts, each containing: freq - center frequency in MHz power - peak power in dB (relative) index - index into freqs/powers arrays width_mhz - estimated carrier bandwidth at -3dB prominence_db - peak power above noise floor Steps: 1. Compute adaptive noise floor (median + MAD) 2. Find local maxima above noise_floor + threshold_db 3. Estimate -3dB width around each peak 4. Merge peaks whose -3dB extents overlap (same carrier) """ if len(powers) < 3 or len(freqs) != len(powers): return [] noise_floor, mad = adaptive_noise_floor(powers) # Effective threshold: user threshold, but never below 3x MAD to # avoid chasing noise ripples. effective_threshold = max(threshold_db, 3.0 * mad) if mad > 0 else threshold_db min_power = noise_floor + effective_threshold # Step 1: find raw local maxima raw_peaks = [] for i in range(1, len(powers) - 1): if powers[i] > powers[i - 1] and powers[i] > powers[i + 1]: if powers[i] >= min_power: raw_peaks.append(i) # Also check endpoints if they are strong if len(powers) >= 2: if powers[0] > powers[1] and powers[0] >= min_power: raw_peaks.insert(0, 0) if powers[-1] > powers[-2] and powers[-1] >= min_power: raw_peaks.append(len(powers) - 1) if not raw_peaks: return [] # Step 2: measure width and build peak dicts peaks = [] for idx in raw_peaks: bw = estimate_carrier_bw(freqs, powers, idx) prominence = powers[idx] - noise_floor peaks.append({ "freq": freqs[idx], "power": powers[idx], "index": idx, "width_mhz": bw, "prominence_db": prominence, }) # Step 3: merge overlapping peaks (keep the stronger one) merged = _merge_peaks(peaks) return merged def _merge_peaks(peaks: list) -> list: """ Merge peaks whose -3dB extents overlap. When two peaks are closer together than the sum of their half-widths they likely belong to the same carrier. Keep the stronger peak and take the wider bandwidth. """ if len(peaks) <= 1: return peaks # Sort by frequency peaks = sorted(peaks, key=lambda p: p["freq"]) merged = [peaks[0]] for peak in peaks[1:]: prev = merged[-1] # Half-widths prev_upper = prev["freq"] + prev["width_mhz"] / 2 peak_lower = peak["freq"] - peak["width_mhz"] / 2 if peak_lower <= prev_upper: # Overlap: keep the stronger peak, widen the bandwidth if peak["power"] > prev["power"]: wider = max(prev["width_mhz"], peak["width_mhz"], (peak["freq"] + peak["width_mhz"] / 2) - (prev["freq"] - prev["width_mhz"] / 2)) peak["width_mhz"] = wider merged[-1] = peak else: wider = max(prev["width_mhz"], peak["width_mhz"], (peak["freq"] + peak["width_mhz"] / 2) - (prev["freq"] - prev["width_mhz"] / 2)) prev["width_mhz"] = wider else: merged.append(peak) return merged def estimate_carrier_bw(freqs: list, powers: list, peak_idx: int) -> float: """ Estimate carrier bandwidth by walking from peak until power drops 3 dB below the peak value (the -3dB bandwidth). Walks left and right from the peak index, interpolating between adjacent frequency bins when the -3dB crossing falls between them. Returns estimated bandwidth in MHz. Minimum return is one frequency step width (avoids zero-width artifacts on single-bin peaks). """ if peak_idx < 0 or peak_idx >= len(powers): return 0.0 peak_power = powers[peak_idx] cutoff = peak_power - 3.0 # Minimum step size for fallback if len(freqs) >= 2: step = abs(freqs[1] - freqs[0]) else: return 0.0 # Walk left left_freq = freqs[peak_idx] for i in range(peak_idx - 1, -1, -1): if powers[i] <= cutoff: # Interpolate between bin i and bin i+1 if powers[i + 1] != powers[i]: frac = (cutoff - powers[i]) / (powers[i + 1] - powers[i]) else: frac = 0.5 left_freq = freqs[i] + frac * (freqs[i + 1] - freqs[i]) break left_freq = freqs[i] # Walk right right_freq = freqs[peak_idx] for i in range(peak_idx + 1, len(powers)): if powers[i] <= cutoff: if powers[i - 1] != powers[i]: frac = (cutoff - powers[i]) / (powers[i - 1] - powers[i]) else: frac = 0.5 right_freq = freqs[i] - frac * (freqs[i] - freqs[i - 1]) break right_freq = freqs[i] bw = right_freq - left_freq return max(bw, step) def classify_carrier(bw_mhz: float, power_db: float) -> dict: """ Classify a detected carrier based on bandwidth and power. Uses empirical ranges for DVB-S symbol rates: SR (Msps) is roughly BW (MHz) / 1.35 for QPSK with roll-off 0.35. Returns dict with: estimated_sr_range - (min_sps, max_sps) tuple likely_modulation - list of plausible modulation names signal_quality - 'strong', 'moderate', or 'weak' """ # Roll-off factor for DVB-S is typically 0.35, so BW ~ SR * 1.35. # Allow some tolerance on both sides. sr_center = bw_mhz / 1.35 # Msps sr_min = int(max(256_000, (bw_mhz / 1.5) * 1_000_000)) sr_max = int(min(30_000_000, (bw_mhz / 1.2) * 1_000_000)) if sr_max < sr_min: sr_max = sr_min # Guess modulation based on bandwidth likely_mods = [] if bw_mhz < 3.0: # Narrow carrier: low-SR data channels, SCPC, DCII split likely_mods = ["qpsk", "dcii-i", "dcii-q", "dss"] elif bw_mhz < 10.0: # Medium: typical SCPC, small MCPC likely_mods = ["qpsk", "turbo-qpsk", "dcii-combo"] elif bw_mhz < 20.0: # Wide: MCPC transponders likely_mods = ["qpsk", "turbo-qpsk", "turbo-8psk"] else: # Very wide: full transponder, high-SR likely_mods = ["qpsk", "turbo-qpsk", "turbo-8psk", "dcii-combo"] # Signal quality heuristic (relative power, device-dependent) if power_db > -10.0: quality = "strong" elif power_db > -25.0: quality = "moderate" else: quality = "weak" return { "estimated_sr_range": (sr_min, sr_max), "likely_modulation": likely_mods, "signal_quality": quality, }