diff --git a/examples/fm_scanner.py b/examples/fm_scanner.py new file mode 100755 index 0000000..9600dff --- /dev/null +++ b/examples/fm_scanner.py @@ -0,0 +1,266 @@ +#!/usr/bin/env python3 +"""FM Band Scanner — scan 87.5–108.0 MHz using rtl_power, rank stations by signal strength.""" + +import argparse +import csv +import io +import json +import shutil +import subprocess +import sys +from collections import defaultdict +from pathlib import Path + + +def run_rtl_power(gain: int = 10) -> str: + """Execute rtl_power for a single FM band sweep and return raw CSV output. + + The scan covers 87.5–108.0 MHz in 200 kHz bins (US FM channel spacing) + with 1-second integration time per sweep segment. + """ + cmd = [ + "rtl_power", + "-f", "87.5M:108M:200k", + "-g", str(gain), + "-i", "1", + "-1", # single-shot + "-", # stdout + ] + try: + result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) + except FileNotFoundError: + print("Error: rtl_power not found. Install rtl-sdr tools.", file=sys.stderr) + sys.exit(1) + except subprocess.TimeoutExpired: + print("Error: rtl_power timed out after 30 seconds.", file=sys.stderr) + sys.exit(1) + + if result.returncode != 0: + stderr = result.stderr.strip() + print(f"Error: rtl_power exited with code {result.returncode}", file=sys.stderr) + if stderr: + print(stderr, file=sys.stderr) + sys.exit(1) + + return result.stdout + + +def parse_scan(csv_data: str) -> list[tuple[float, float]]: + """Parse rtl_power CSV output into (frequency_mhz, power_dbm) pairs. + + rtl_power CSV format per row: + date, time, freq_low_hz, freq_high_hz, bin_step_hz, num_samples, dBm, dBm, ... + + Each row covers a frequency range with multiple FFT bins. We compute the + center frequency of each bin and pair it with its power reading. + """ + readings: list[tuple[float, float]] = [] + + reader = csv.reader(io.StringIO(csv_data)) + for row in reader: + if len(row) < 7: + continue + try: + freq_low = float(row[2].strip()) + freq_high = float(row[3].strip()) + bin_step = float(row[4].strip()) + # num_samples = int(row[5].strip()) # not needed + power_values = [float(v.strip()) for v in row[6:] if v.strip()] + except (ValueError, IndexError): + continue + + # Map each FFT bin to its center frequency + for i, power in enumerate(power_values): + freq_hz = freq_low + (i * bin_step) + (bin_step / 2) + freq_mhz = freq_hz / 1e6 + readings.append((freq_mhz, power)) + + return readings + + +def aggregate_channels(readings: list[tuple[float, float]]) -> list[dict]: + """Aggregate raw FFT bins into 200 kHz FM channels. + + FM stations in the US are spaced at odd multiples of 100 kHz + (87.9, 88.1, 88.3, ..., 107.9). Each occupies ~200 kHz bandwidth. + We snap each reading to the nearest standard channel and take the + max power across all bins in that channel. + """ + channel_bins: dict[float, list[float]] = defaultdict(list) + + for freq_mhz, power in readings: + # Snap to nearest 0.2 MHz FM channel (87.5, 87.7, 87.9, ...) + channel = round(round(freq_mhz / 0.2) * 0.2, 1) + if 87.5 <= channel <= 108.0: + channel_bins[channel].append(power) + + channels = [] + for freq in sorted(channel_bins): + powers = channel_bins[freq] + # Use max power — peak represents the carrier + max_power = max(powers) + channels.append({"freq_mhz": freq, "power_dbm": max_power}) + + return channels + + +def detect_stations( + channels: list[dict], threshold_db: float = 10.0 +) -> tuple[list[dict], float]: + """Find stations that rise above the noise floor. + + The noise floor is estimated as the median power across all channels. + A channel is flagged as a station if its power exceeds + noise_floor + threshold_db. + + Returns (stations_sorted_by_power, noise_floor_dbm). + """ + if not channels: + return [], -99.0 + + powers = sorted(ch["power_dbm"] for ch in channels) + noise_floor = powers[len(powers) // 2] # median + + stations = [] + for ch in channels: + snr = ch["power_dbm"] - noise_floor + if snr >= threshold_db: + stations.append({**ch, "snr_db": round(snr, 1)}) + + stations.sort(key=lambda s: s["power_dbm"], reverse=True) + return stations, noise_floor + + +def display_results( + stations: list[dict], + noise_floor: float, + all_channels: list[dict] | None = None, + show_all: bool = False, +): + """Print a formatted table of scan results to the terminal.""" + term_width = shutil.get_terminal_size((80, 24)).columns + bar_max = max(32, term_width - 42) + + items = all_channels if (show_all and all_channels) else stations + if not items: + print("No stations detected.") + return + + # Bar scaling: use noise floor as baseline so every station gets a visible bar + powers = [ch["power_dbm"] for ch in items] + p_min = noise_floor + p_max = max(powers) + p_range = p_max - p_min if p_max != p_min else 1.0 + + header = "FM Band Scan \u2014 87.5 to 108.0 MHz" + print() + print(f" {header}") + print(f" {'═' * (len(header) + 2)}") + print(f" {'#':>3} {'Frequency':<12} {'Power':<10} Signal") + print(f" {'─' * 3} {'─' * 12} {'─' * 9} {'─' * bar_max}") + + block_chars = " \u2581\u2582\u2583\u2584\u2585\u2586\u2587\u2588" + + for i, ch in enumerate(items, 1): + freq = ch["freq_mhz"] + power = ch["power_dbm"] + # Normalize to [0, 1] + norm = max(0.0, min(1.0, (power - p_min) / p_range)) + bar_len = norm * bar_max + full_blocks = int(bar_len) + frac = bar_len - full_blocks + frac_char = block_chars[int(frac * 8)] if frac > 0.05 else "" + bar = "\u2588" * full_blocks + frac_char + + # Color: green for strong, yellow for mid, dim for weak + if "snr_db" in ch and ch["snr_db"] >= 10: + bar = f"\033[32m{bar}\033[0m" # green + elif "snr_db" in ch: + bar = f"\033[33m{bar}\033[0m" # yellow + elif show_all: + bar = f"\033[2m{bar}\033[0m" # dim + + label = f"{freq:>5.1f} MHz" + print(f" {i:>3} {label:<12} {power:>7.1f} dBm {bar}") + + print(f" {'═' * (len(header) + 2)}") + print( + f" Noise floor: {noise_floor:.1f} dBm | " + f"Stations found: {len(stations)}" + ) + print() + + +def save_json(stations: list[dict], noise_floor: float, path: str): + """Write scan results to a JSON file.""" + data = { + "band": "FM", + "range_mhz": [87.5, 108.0], + "noise_floor_dbm": round(noise_floor, 1), + "station_count": len(stations), + "stations": [ + { + "freq_mhz": s["freq_mhz"], + "power_dbm": round(s["power_dbm"], 1), + "snr_db": s["snr_db"], + } + for s in stations + ], + } + Path(path).write_text(json.dumps(data, indent=2) + "\n") + print(f"Results saved to {path}") + + +def main(): + parser = argparse.ArgumentParser( + description="Scan the US FM band and rank stations by signal strength." + ) + parser.add_argument( + "--threshold", + type=float, + default=10.0, + help="Minimum dB above noise floor to flag as station (default: 10)", + ) + parser.add_argument( + "--gain", + type=int, + default=10, + help="RF tuner gain in dB (default: 10)", + ) + parser.add_argument( + "--json", + metavar="FILE", + help="Save results to JSON file", + ) + parser.add_argument( + "--all", + action="store_true", + dest="show_all", + help="Show all channels, not just detected stations", + ) + args = parser.parse_args() + + print("Scanning FM band (87.5–108.0 MHz)...", flush=True) + raw = run_rtl_power(gain=args.gain) + + readings = parse_scan(raw) + if not readings: + print("No data received from rtl_power.", file=sys.stderr) + sys.exit(1) + + channels = aggregate_channels(readings) + stations, noise_floor = detect_stations(channels, threshold_db=args.threshold) + + display_results( + stations, + noise_floor, + all_channels=channels, + show_all=args.show_all, + ) + + if args.json: + save_json(stations, noise_floor, args.json) + + +if __name__ == "__main__": + main()