diff --git a/examples/fm_scanner.py b/examples/fm_scanner.py index 494d655..cb7cbf8 100755 --- a/examples/fm_scanner.py +++ b/examples/fm_scanner.py @@ -265,6 +265,10 @@ def build_fm_receiver(freq_mhz: float, gain: int = 10) -> Path: Signal chain: RTL-SDR (2.4 MHz) → LPF (decim 5) → WBFM Demod (decim 10) → Audio (48 kHz) + ↓ + probe_avg_mag_sqrd → variable_function_probe ("signal_level") + + XML-RPC exposes: get_freq/set_freq, get_signal_level """ # Late import to avoid dependency when just scanning (no --tune) try: @@ -342,11 +346,31 @@ def build_fm_receiver(freq_mhz: float, gain: int = 10) -> Path: xmlrpc.params["addr"].set_value("0.0.0.0") xmlrpc.params["port"].set_value(str(XMLRPC_PORT)) + # Signal strength probe — measures average power of filtered RF signal + # Taps off the LPF output (complex) and computes running avg magnitude² + probe = fg.new_block("analog_probe_avg_mag_sqrd_x") + probe.params["id"].set_value("signal_probe") + probe.params["type"].set_value("complex") + probe.params["threshold"].set_value("-60") # dB threshold (for unmuted flag) + probe.params["alpha"].set_value("0.001") # Smoothing: smaller = smoother + + # Variable function probe — polls signal_probe.level() and exposes as variable + # This creates get_signal_level() / set_signal_level() XML-RPC methods + level_var = fg.new_block("variable_function_probe") + level_var.params["id"].set_value("signal_level") + level_var.params["block_id"].set_value("signal_probe") + level_var.params["function_name"].set_value("level") + level_var.params["function_args"].set_value("") + level_var.params["poll_rate"].set_value("10") # 10 Hz update rate + level_var.params["value"].set_value("0") + # Connect signal chain # source:0 → lpf:0 fg.connect(source.sources[0], lpf.sinks[0]) - # lpf:0 → wfm:0 + # lpf:0 → wfm:0 (audio path) fg.connect(lpf.sources[0], wfm.sinks[0]) + # lpf:0 → probe:0 (measurement tap — same signal, parallel path) + fg.connect(lpf.sources[0], probe.sinks[0]) # wfm:0 → audio:0 fg.connect(wfm.sources[0], audio.sinks[0]) @@ -389,13 +413,41 @@ def wait_for_xmlrpc(url: str, timeout: float = 10.0) -> xmlrpc.client.ServerProx sys.exit(1) +def mag_squared_to_dbm(level: float) -> float: + """Convert magnitude² (power) to dBm. + + The probe outputs average |signal|². To get dBm we use: + dBm = 10 * log10(level) + 30 (assuming 1 mW reference) + + For relative measurements, we just use 10*log10(level) as dB. + """ + import math + if level <= 0: + return -100.0 # Floor for display + return 10 * math.log10(level) + + +def format_signal_bar(db: float, width: int = 30) -> str: + """Format a signal strength bar for terminal display.""" + # Map dB to bar: -80 dB = empty, -20 dB = full + norm = max(0.0, min(1.0, (db + 80) / 60)) + filled = int(norm * width) + bar = "█" * filled + "░" * (width - filled) + # Color: green if strong (> -40), yellow if medium, red if weak + if db > -40: + return f"\033[32m{bar}\033[0m" # green + elif db > -60: + return f"\033[33m{bar}\033[0m" # yellow + return f"\033[31m{bar}\033[0m" # red + + def tune_station(freq_mhz: float, gain: int = 10): """Launch a GNU Radio FM receiver and tune via XML-RPC. Builds a flowgraph programmatically using the GRC Platform API (the same approach gr-mcp uses), compiles it with grcc, launches the Python flowgraph as a subprocess, and connects to its XML-RPC server for live frequency - control. + control. Shows real-time signal strength. """ print(f"\n Building FM receiver for {freq_mhz:.1f} MHz...") py_path = build_fm_receiver(freq_mhz, gain) @@ -409,26 +461,45 @@ def tune_station(freq_mhz: float, gain: int = 10): ) proxy = wait_for_xmlrpc(url) - current = proxy.get_freq() - print(f" Receiving {current / 1e6:.1f} MHz — enter frequency to retune, q to quit\n") + + # Wait a moment for signal probe to stabilize + time.sleep(0.5) + + def show_status(): + """Display current frequency and signal strength.""" + freq = proxy.get_freq() / 1e6 + try: + level = proxy.get_signal_level() + db = mag_squared_to_dbm(level) + bar = format_signal_bar(db) + print(f" {freq:>5.1f} MHz {db:>6.1f} dB {bar}") + except Exception: + print(f" {freq:>5.1f} MHz (signal level unavailable)") + + show_status() + print("\n Commands: frequency (MHz), 's' for signal, 'q' to quit\n") try: while fg_proc.poll() is None: try: - cmd = input(" freq> ").strip() + cmd = input(" > ").strip() except EOFError: break if cmd.lower() in ("q", "quit", ""): break + if cmd.lower() == "s": + show_status() + continue try: new_freq = float(cmd) if 87.5 <= new_freq <= 108.0: proxy.set_freq(new_freq * 1e6) - print(f" Tuned to {new_freq:.1f} MHz") + time.sleep(0.3) # Let signal stabilize + show_status() else: print(" Frequency must be 87.5–108.0 MHz.") except ValueError: - print(" Enter a frequency (MHz) or q to quit.") + print(" Enter frequency (MHz), 's' for signal, 'q' to quit.") except KeyboardInterrupt: pass