examples: add real-time signal strength display to FM scanner

Add probe_avg_mag_sqrd_c block that taps the LPF output to measure
filtered RF power. A variable_function_probe polls it and exposes
signal_level via XML-RPC.

Display features:
- Signal strength in dB shown on tune and on 's' command
- Color-coded bar: green (strong), yellow (medium), red (weak)
- Updates after each frequency change

Signal chain now includes measurement tap:
  source → LPF → demod → audio
             ↓
        probe → var_function_probe ("signal_level")
This commit is contained in:
Ryan Malloy 2026-01-29 03:23:59 -07:00
parent b4c6efec6e
commit f6c9e465c5

View File

@ -265,6 +265,10 @@ def build_fm_receiver(freq_mhz: float, gain: int = 10) -> Path:
Signal chain:
RTL-SDR (2.4 MHz) LPF (decim 5) WBFM Demod (decim 10) Audio (48 kHz)
probe_avg_mag_sqrd variable_function_probe ("signal_level")
XML-RPC exposes: get_freq/set_freq, get_signal_level
"""
# Late import to avoid dependency when just scanning (no --tune)
try:
@ -342,11 +346,31 @@ def build_fm_receiver(freq_mhz: float, gain: int = 10) -> Path:
xmlrpc.params["addr"].set_value("0.0.0.0")
xmlrpc.params["port"].set_value(str(XMLRPC_PORT))
# Signal strength probe — measures average power of filtered RF signal
# Taps off the LPF output (complex) and computes running avg magnitude²
probe = fg.new_block("analog_probe_avg_mag_sqrd_x")
probe.params["id"].set_value("signal_probe")
probe.params["type"].set_value("complex")
probe.params["threshold"].set_value("-60") # dB threshold (for unmuted flag)
probe.params["alpha"].set_value("0.001") # Smoothing: smaller = smoother
# Variable function probe — polls signal_probe.level() and exposes as variable
# This creates get_signal_level() / set_signal_level() XML-RPC methods
level_var = fg.new_block("variable_function_probe")
level_var.params["id"].set_value("signal_level")
level_var.params["block_id"].set_value("signal_probe")
level_var.params["function_name"].set_value("level")
level_var.params["function_args"].set_value("")
level_var.params["poll_rate"].set_value("10") # 10 Hz update rate
level_var.params["value"].set_value("0")
# Connect signal chain
# source:0 → lpf:0
fg.connect(source.sources[0], lpf.sinks[0])
# lpf:0 → wfm:0
# lpf:0 → wfm:0 (audio path)
fg.connect(lpf.sources[0], wfm.sinks[0])
# lpf:0 → probe:0 (measurement tap — same signal, parallel path)
fg.connect(lpf.sources[0], probe.sinks[0])
# wfm:0 → audio:0
fg.connect(wfm.sources[0], audio.sinks[0])
@ -389,13 +413,41 @@ def wait_for_xmlrpc(url: str, timeout: float = 10.0) -> xmlrpc.client.ServerProx
sys.exit(1)
def mag_squared_to_dbm(level: float) -> float:
"""Convert magnitude² (power) to dBm.
The probe outputs average |signal|². To get dBm we use:
dBm = 10 * log10(level) + 30 (assuming 1 mW reference)
For relative measurements, we just use 10*log10(level) as dB.
"""
import math
if level <= 0:
return -100.0 # Floor for display
return 10 * math.log10(level)
def format_signal_bar(db: float, width: int = 30) -> str:
"""Format a signal strength bar for terminal display."""
# Map dB to bar: -80 dB = empty, -20 dB = full
norm = max(0.0, min(1.0, (db + 80) / 60))
filled = int(norm * width)
bar = "" * filled + "" * (width - filled)
# Color: green if strong (> -40), yellow if medium, red if weak
if db > -40:
return f"\033[32m{bar}\033[0m" # green
elif db > -60:
return f"\033[33m{bar}\033[0m" # yellow
return f"\033[31m{bar}\033[0m" # red
def tune_station(freq_mhz: float, gain: int = 10):
"""Launch a GNU Radio FM receiver and tune via XML-RPC.
Builds a flowgraph programmatically using the GRC Platform API (the same
approach gr-mcp uses), compiles it with grcc, launches the Python flowgraph
as a subprocess, and connects to its XML-RPC server for live frequency
control.
control. Shows real-time signal strength.
"""
print(f"\n Building FM receiver for {freq_mhz:.1f} MHz...")
py_path = build_fm_receiver(freq_mhz, gain)
@ -409,26 +461,45 @@ def tune_station(freq_mhz: float, gain: int = 10):
)
proxy = wait_for_xmlrpc(url)
current = proxy.get_freq()
print(f" Receiving {current / 1e6:.1f} MHz — enter frequency to retune, q to quit\n")
# Wait a moment for signal probe to stabilize
time.sleep(0.5)
def show_status():
"""Display current frequency and signal strength."""
freq = proxy.get_freq() / 1e6
try:
level = proxy.get_signal_level()
db = mag_squared_to_dbm(level)
bar = format_signal_bar(db)
print(f" {freq:>5.1f} MHz {db:>6.1f} dB {bar}")
except Exception:
print(f" {freq:>5.1f} MHz (signal level unavailable)")
show_status()
print("\n Commands: frequency (MHz), 's' for signal, 'q' to quit\n")
try:
while fg_proc.poll() is None:
try:
cmd = input(" freq> ").strip()
cmd = input(" > ").strip()
except EOFError:
break
if cmd.lower() in ("q", "quit", ""):
break
if cmd.lower() == "s":
show_status()
continue
try:
new_freq = float(cmd)
if 87.5 <= new_freq <= 108.0:
proxy.set_freq(new_freq * 1e6)
print(f" Tuned to {new_freq:.1f} MHz")
time.sleep(0.3) # Let signal stabilize
show_status()
else:
print(" Frequency must be 87.5108.0 MHz.")
except ValueError:
print(" Enter a frequency (MHz) or q to quit.")
print(" Enter frequency (MHz), 's' for signal, 'q' to quit.")
except KeyboardInterrupt:
pass