#!/usr/bin/env python3 """FM Band Scanner — scan 87.5–108.0 MHz using rtl_power, rank stations by signal strength.""" import argparse import csv import io import json import signal import shutil import subprocess import sys from collections import defaultdict from pathlib import Path def run_rtl_power(gain: int = 10) -> str: """Execute rtl_power for a single FM band sweep and return raw CSV output. The scan covers 87.5–108.0 MHz in 200 kHz bins (US FM channel spacing) with 1-second integration time per sweep segment. """ cmd = [ "rtl_power", "-f", "87.5M:108M:200k", "-g", str(gain), "-i", "1", "-1", # single-shot "-", # stdout ] try: result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) except FileNotFoundError: print("Error: rtl_power not found. Install rtl-sdr tools.", file=sys.stderr) sys.exit(1) except subprocess.TimeoutExpired: print("Error: rtl_power timed out after 30 seconds.", file=sys.stderr) sys.exit(1) if result.returncode != 0: stderr = result.stderr.strip() print(f"Error: rtl_power exited with code {result.returncode}", file=sys.stderr) if stderr: print(stderr, file=sys.stderr) sys.exit(1) return result.stdout def parse_scan(csv_data: str) -> list[tuple[float, float]]: """Parse rtl_power CSV output into (frequency_mhz, power_dbm) pairs. rtl_power CSV format per row: date, time, freq_low_hz, freq_high_hz, bin_step_hz, num_samples, dBm, dBm, ... Each row covers a frequency range with multiple FFT bins. We compute the center frequency of each bin and pair it with its power reading. """ readings: list[tuple[float, float]] = [] reader = csv.reader(io.StringIO(csv_data)) for row in reader: if len(row) < 7: continue try: freq_low = float(row[2].strip()) freq_high = float(row[3].strip()) bin_step = float(row[4].strip()) # num_samples = int(row[5].strip()) # not needed power_values = [float(v.strip()) for v in row[6:] if v.strip()] except (ValueError, IndexError): continue # Map each FFT bin to its center frequency for i, power in enumerate(power_values): freq_hz = freq_low + (i * bin_step) + (bin_step / 2) freq_mhz = freq_hz / 1e6 readings.append((freq_mhz, power)) return readings def aggregate_channels(readings: list[tuple[float, float]]) -> list[dict]: """Aggregate raw FFT bins into 200 kHz FM channels. FM stations in the US are spaced at odd multiples of 100 kHz (87.9, 88.1, 88.3, ..., 107.9). Each occupies ~200 kHz bandwidth. We snap each reading to the nearest standard channel and take the max power across all bins in that channel. """ channel_bins: dict[float, list[float]] = defaultdict(list) for freq_mhz, power in readings: # Snap to nearest 0.2 MHz FM channel (87.5, 87.7, 87.9, ...) channel = round(round(freq_mhz / 0.2) * 0.2, 1) if 87.5 <= channel <= 108.0: channel_bins[channel].append(power) channels = [] for freq in sorted(channel_bins): powers = channel_bins[freq] # Use max power — peak represents the carrier max_power = max(powers) channels.append({"freq_mhz": freq, "power_dbm": max_power}) return channels def detect_stations( channels: list[dict], threshold_db: float = 10.0 ) -> tuple[list[dict], float]: """Find stations that rise above the noise floor. The noise floor is estimated as the median power across all channels. A channel is flagged as a station if its power exceeds noise_floor + threshold_db. Returns (stations_sorted_by_power, noise_floor_dbm). """ if not channels: return [], -99.0 powers = sorted(ch["power_dbm"] for ch in channels) noise_floor = powers[len(powers) // 2] # median stations = [] for ch in channels: snr = ch["power_dbm"] - noise_floor if snr >= threshold_db: stations.append({**ch, "snr_db": round(snr, 1)}) stations.sort(key=lambda s: s["power_dbm"], reverse=True) return stations, noise_floor def display_results( stations: list[dict], noise_floor: float, all_channels: list[dict] | None = None, show_all: bool = False, ): """Print a formatted table of scan results to the terminal.""" term_width = shutil.get_terminal_size((80, 24)).columns bar_max = max(32, term_width - 42) items = all_channels if (show_all and all_channels) else stations if not items: print("No stations detected.") return # Bar scaling: use noise floor as baseline so every station gets a visible bar powers = [ch["power_dbm"] for ch in items] p_min = noise_floor p_max = max(powers) p_range = p_max - p_min if p_max != p_min else 1.0 header = "FM Band Scan \u2014 87.5 to 108.0 MHz" print() print(f" {header}") print(f" {'═' * (len(header) + 2)}") print(f" {'#':>3} {'Frequency':<12} {'Power':<10} Signal") print(f" {'─' * 3} {'─' * 12} {'─' * 9} {'─' * bar_max}") block_chars = " \u2581\u2582\u2583\u2584\u2585\u2586\u2587\u2588" for i, ch in enumerate(items, 1): freq = ch["freq_mhz"] power = ch["power_dbm"] # Normalize to [0, 1] norm = max(0.0, min(1.0, (power - p_min) / p_range)) bar_len = norm * bar_max full_blocks = int(bar_len) frac = bar_len - full_blocks frac_char = block_chars[int(frac * 8)] if frac > 0.05 else "" bar = "\u2588" * full_blocks + frac_char # Color: green for strong, yellow for mid, dim for weak if "snr_db" in ch and ch["snr_db"] >= 10: bar = f"\033[32m{bar}\033[0m" # green elif "snr_db" in ch: bar = f"\033[33m{bar}\033[0m" # yellow elif show_all: bar = f"\033[2m{bar}\033[0m" # dim label = f"{freq:>5.1f} MHz" print(f" {i:>3} {label:<12} {power:>7.1f} dBm {bar}") print(f" {'═' * (len(header) + 2)}") print( f" Noise floor: {noise_floor:.1f} dBm | " f"Stations found: {len(stations)}" ) print() def save_json(stations: list[dict], noise_floor: float, path: str): """Write scan results to a JSON file.""" data = { "band": "FM", "range_mhz": [87.5, 108.0], "noise_floor_dbm": round(noise_floor, 1), "station_count": len(stations), "stations": [ { "freq_mhz": s["freq_mhz"], "power_dbm": round(s["power_dbm"], 1), "snr_db": s["snr_db"], } for s in stations ], } Path(path).write_text(json.dumps(data, indent=2) + "\n") print(f"Results saved to {path}") def pick_station(stations: list[dict]) -> float | None: """Interactive station picker. Returns frequency in MHz or None to quit.""" if not stations: print("No stations to choose from.") return None try: choice = input(" Tune to station # (or q to quit): ").strip() except (EOFError, KeyboardInterrupt): print() return None if choice.lower() in ("q", "quit", ""): return None try: idx = int(choice) - 1 if 0 <= idx < len(stations): return stations[idx]["freq_mhz"] print(f" Pick 1–{len(stations)}.") except ValueError: # Maybe they typed a frequency directly try: freq = float(choice) if 87.5 <= freq <= 108.0: return freq print(" Frequency must be 87.5–108.0 MHz.") except ValueError: print(" Enter a station number or frequency.") return pick_station(stations) def tune_station(freq_mhz: float, gain: int = 10): """Tune to an FM station using rtl_fm piped to aplay. rtl_fm demodulates wideband FM, resamples to 48 kHz mono, and pipes raw PCM to ALSA's aplay for real-time audio output. """ freq_hz = int(freq_mhz * 1e6) print(f"\n Tuning to {freq_mhz:.1f} MHz — Ctrl+C to stop\n") rtl_cmd = [ "rtl_fm", "-f", str(freq_hz), "-M", "wbfm", # wideband FM demodulation "-s", "200k", # sample rate (200 kHz captures full FM channel) "-r", "48k", # resample output to 48 kHz "-g", str(gain), "-", # output to stdout ] play_cmd = [ "aplay", "-r", "48000", # 48 kHz sample rate "-f", "S16_LE", # signed 16-bit little-endian PCM "-t", "raw", # raw format (no WAV header) "-c", "1", # mono "-q", # quiet (no progress output) ] rtl_proc = None play_proc = None try: rtl_proc = subprocess.Popen(rtl_cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) play_proc = subprocess.Popen(play_cmd, stdin=rtl_proc.stdout, stderr=subprocess.DEVNULL) # Allow rtl_proc to receive SIGPIPE if play_proc exits rtl_proc.stdout.close() play_proc.wait() except KeyboardInterrupt: print("\n Stopped.") except FileNotFoundError as e: print(f" Error: {e.filename} not found.", file=sys.stderr) finally: for proc in (play_proc, rtl_proc): if proc and proc.poll() is None: proc.send_signal(signal.SIGTERM) proc.wait(timeout=3) def main(): parser = argparse.ArgumentParser( description="Scan the US FM band and rank stations by signal strength." ) parser.add_argument( "--threshold", type=float, default=10.0, help="Minimum dB above noise floor to flag as station (default: 10)", ) parser.add_argument( "--gain", type=int, default=10, help="RF tuner gain in dB (default: 10)", ) parser.add_argument( "--json", metavar="FILE", help="Save results to JSON file", ) parser.add_argument( "--all", action="store_true", dest="show_all", help="Show all channels, not just detected stations", ) parser.add_argument( "--tune", nargs="?", const="pick", metavar="FREQ", help="Tune to a station after scanning (optionally specify frequency in MHz)", ) args = parser.parse_args() print("Scanning FM band (87.5–108.0 MHz)...", flush=True) raw = run_rtl_power(gain=args.gain) readings = parse_scan(raw) if not readings: print("No data received from rtl_power.", file=sys.stderr) sys.exit(1) channels = aggregate_channels(readings) stations, noise_floor = detect_stations(channels, threshold_db=args.threshold) display_results( stations, noise_floor, all_channels=channels, show_all=args.show_all, ) if args.json: save_json(stations, noise_floor, args.json) if args.tune is not None: if args.tune == "pick": freq = pick_station(stations) else: freq = float(args.tune) if freq: tune_station(freq, gain=args.gain) if __name__ == "__main__": main()