gr-mcp/examples/lora_scanner.py
Ryan Malloy e8b3600e60 feat: add LoRa SDR receiver with Docker runtime infrastructure
LoRa receiver flowgraph built programmatically via gr-mcp:
- osmosdr_source → low_pass_filter → lora_rx → message_debug
- XML-RPC server for runtime variable control (samp_rate,
  center_freq) with introspection enabled
- Qt frequency sink for spectrum visualization

Docker infrastructure:
- gnuradio-lora: gr-lora_sdr OOT module from EPFL (chirp spread spectrum)
- gnuradio-lora-runtime: combined runtime with Xvfb + gr-lora_sdr
- Compose file, entrypoint, and launch script for LoRa receiver

Also includes:
- lora_scanner.py: multi-SF LoRa scanner example
- lora_infrastructure_test.py: hardware-free pipeline validation
  (signal_source → throttle → null_sink + xmlrpc variable control)
- Integration tests for LoRa scanner flowgraph construction

End-to-end pipeline validated: launch_flowgraph → connect_to_container →
list_variables → get/set_variable all working through Docker + XML-RPC.
2026-01-30 13:55:40 -07:00

733 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""LoRa Band Scanner — scan 902928 MHz US ISM band for LoRa activity.
Scanning uses rtl_power to sweep the band and detect RF activity.
Decoding builds a gr-lora_sdr receiver flowgraph programmatically using
the same GRC Platform API that gr-mcp uses, compiles it with grcc, and
controls it at runtime via XML-RPC for live parameter changes.
gr-lora_sdr: https://github.com/tapparelj/gr-lora_sdr
"""
import argparse
import csv
import io
import json
import math
import shutil
import signal
import subprocess
import sys
import tempfile
import time
import xmlrpc.client
from collections import defaultdict
from pathlib import Path
# --- Phase A: Band scanning (rtl_power sweep) ---
def run_lora_scan(gain: int = 20) -> str:
"""Execute rtl_power for a single sweep of the US ISM 902928 MHz band.
Uses 50 kHz bins (finer than 125 kHz LoRa channel BW) for better
resolution. Integration time is 2 seconds to catch bursty LoRa packets.
"""
cmd = [
"rtl_power",
"-f", "902M:928M:50k",
"-g", str(gain),
"-i", "2", # 2s integration (LoRa is bursty, needs longer dwell)
"-1", # single-shot
"-", # stdout
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
except FileNotFoundError:
print("Error: rtl_power not found. Install rtl-sdr tools.", file=sys.stderr)
sys.exit(1)
except subprocess.TimeoutExpired:
print("Error: rtl_power timed out after 60 seconds.", file=sys.stderr)
sys.exit(1)
if result.returncode != 0:
stderr = result.stderr.strip()
print(f"Error: rtl_power exited with code {result.returncode}", file=sys.stderr)
if stderr:
print(stderr, file=sys.stderr)
sys.exit(1)
return result.stdout
def parse_lora_scan(csv_data: str) -> list[tuple[float, float]]:
"""Parse rtl_power CSV output into (frequency_mhz, power_dbm) pairs.
rtl_power CSV format per row:
date, time, freq_low_hz, freq_high_hz, bin_step_hz, num_samples, dBm, dBm, ...
Each row covers a frequency range with multiple FFT bins. We compute the
center frequency of each bin and pair it with its power reading.
"""
readings: list[tuple[float, float]] = []
reader = csv.reader(io.StringIO(csv_data))
for row in reader:
if len(row) < 7:
continue
try:
freq_low = float(row[2].strip())
freq_high = float(row[3].strip())
bin_step = float(row[4].strip())
power_values = [float(v.strip()) for v in row[6:] if v.strip()]
except (ValueError, IndexError):
continue
for i, power in enumerate(power_values):
freq_hz = freq_low + (i * bin_step) + (bin_step / 2)
freq_mhz = freq_hz / 1e6
readings.append((freq_mhz, power))
return readings
def aggregate_lora_channels(
readings: list[tuple[float, float]], channel_bw_khz: int = 125
) -> list[dict]:
"""Aggregate raw FFT bins into LoRa-width channels.
LoRa typically uses 125 kHz bandwidth per channel. We snap each reading
to the nearest channel grid and take the max power across all bins in
that channel (peak represents the carrier/chirp).
"""
channel_step_mhz = channel_bw_khz / 1000.0 # 0.125 MHz
channel_bins: dict[float, list[float]] = defaultdict(list)
for freq_mhz, power in readings:
# Snap to nearest channel center
channel = round(round(freq_mhz / channel_step_mhz) * channel_step_mhz, 3)
if 902.0 <= channel <= 928.0:
channel_bins[channel].append(power)
channels = []
for freq in sorted(channel_bins):
powers = channel_bins[freq]
max_power = max(powers)
channels.append({"freq_mhz": freq, "power_dbm": max_power})
return channels
def detect_lora_activity(
channels: list[dict], threshold_db: float = 8.0
) -> tuple[list[dict], float]:
"""Find channels with activity above the noise floor.
LoRa signals are bursty and spread-spectrum, so they appear closer to
the noise floor than narrowband FM. We use a lower default threshold
(8 dB vs 10 dB for FM).
Returns (active_channels_sorted_by_power, noise_floor_dbm).
"""
if not channels:
return [], -99.0
powers = sorted(ch["power_dbm"] for ch in channels)
noise_floor = powers[len(powers) // 2] # median
active = []
for ch in channels:
snr = ch["power_dbm"] - noise_floor
if snr >= threshold_db:
active.append({**ch, "snr_db": round(snr, 1)})
active.sort(key=lambda s: s["power_dbm"], reverse=True)
return active, noise_floor
def display_lora_results(
active_channels: list[dict],
noise_floor: float,
all_channels: list[dict] | None = None,
show_all: bool = False,
):
"""Print a formatted table of LoRa band scan results."""
term_width = shutil.get_terminal_size((80, 24)).columns
bar_max = max(32, term_width - 48)
items = all_channels if (show_all and all_channels) else active_channels
if not items:
print("No LoRa activity detected.")
return
powers = [ch["power_dbm"] for ch in items]
p_min = noise_floor
p_max = max(powers)
p_range = p_max - p_min if p_max != p_min else 1.0
header = "LoRa Band Scan \u2014 902 to 928 MHz (US ISM)"
print()
print(f" {header}")
print(f" {'' * (len(header) + 2)}")
print(f" {'#':>3} {'Channel':<14} {'Power':<10} Activity")
print(f" {'' * 3} {'' * 14} {'' * 9} {'' * bar_max}")
block_chars = " \u2581\u2582\u2583\u2584\u2585\u2586\u2587\u2588"
for i, ch in enumerate(items, 1):
freq = ch["freq_mhz"]
power = ch["power_dbm"]
norm = max(0.0, min(1.0, (power - p_min) / p_range))
bar_len = norm * bar_max
full_blocks = int(bar_len)
frac = bar_len - full_blocks
frac_char = block_chars[int(frac * 8)] if frac > 0.05 else ""
bar = "\u2588" * full_blocks + frac_char
if "snr_db" in ch and ch["snr_db"] >= 10:
bar = f"\033[32m{bar}\033[0m" # green
elif "snr_db" in ch:
bar = f"\033[33m{bar}\033[0m" # yellow
elif show_all:
bar = f"\033[2m{bar}\033[0m" # dim
label = f"{freq:>7.3f} MHz"
print(f" {i:>3} {label:<14} {power:>7.1f} dBm {bar}")
print(f" {'' * (len(header) + 2)}")
print(
f" Noise floor: {noise_floor:.1f} dBm | "
f"Active channels: {len(active_channels)}"
)
print()
def save_json(active_channels: list[dict], noise_floor: float, path: str):
"""Write scan results to a JSON file."""
data = {
"band": "LoRa ISM",
"range_mhz": [902.0, 928.0],
"noise_floor_dbm": round(noise_floor, 1),
"active_channel_count": len(active_channels),
"channels": [
{
"freq_mhz": s["freq_mhz"],
"power_dbm": round(s["power_dbm"], 1),
"snr_db": s["snr_db"],
}
for s in active_channels
],
}
Path(path).write_text(json.dumps(data, indent=2) + "\n")
print(f"Results saved to {path}")
def pick_channel(active_channels: list[dict]) -> float | None:
"""Interactive channel picker. Returns frequency in MHz or None to quit."""
if not active_channels:
print("No active channels to choose from.")
return None
try:
choice = input(" Tune to channel # (or q to quit): ").strip()
except (EOFError, KeyboardInterrupt):
print()
return None
if choice.lower() in ("q", "quit", ""):
return None
try:
idx = int(choice) - 1
if 0 <= idx < len(active_channels):
return active_channels[idx]["freq_mhz"]
print(f" Pick 1\u2013{len(active_channels)}.")
except ValueError:
try:
freq = float(choice)
if 902.0 <= freq <= 928.0:
return freq
print(" Frequency must be 902\u2013928 MHz.")
except ValueError:
print(" Enter a channel number or frequency.")
return pick_channel(active_channels)
# --- Phase B: LoRa packet receiver (gr-lora_sdr) ---
XMLRPC_PORT = 8091
def build_lora_receiver(
freq_mhz: float = 915.0,
sf: int = 7,
bw: int = 125000,
cr: int = 1,
gain: int = 20,
) -> Path:
"""Build a gr-lora_sdr receiver flowgraph programmatically.
Creates all blocks, sets parameters, connects the full LoRa decode
chain, saves to .grc, and compiles with grcc. Uses soft decoding for
~2-3 dB better sensitivity than hard decisions.
Signal chain:
RTL-SDR (1 Msps) -> frame_sync -> fft_demod -> gray_mapping ->
deinterleaver -> hamming_dec -> header_decoder -> dewhitening -> crc_verif
The header_decoder feeds frame_info back to frame_sync for adaptive
reception (a feedback loop unusual in GNU Radio flowgraphs).
XML-RPC exposes: freq, sf, bw, cr, gain (all settable at runtime)
"""
try:
from gnuradio import gr
from gnuradio.grc.core.platform import Platform
except ImportError:
print("Error: GNU Radio not found. Install gnuradio.", file=sys.stderr)
sys.exit(1)
platform = Platform(
version=gr.version(),
version_parts=(gr.major_version(), gr.api_version(), gr.minor_version()),
prefs=gr.prefs(),
)
platform.build_library()
# Verify gr-lora_sdr blocks are available
block_keys = list(platform.blocks.keys())
lora_blocks = [k for k in block_keys if "lora" in k.lower()]
if not lora_blocks:
print(
"Error: gr-lora_sdr blocks not found. Install gr-lora_sdr OOT module.",
file=sys.stderr,
)
sys.exit(1)
fg = platform.make_flow_graph()
# Configure options block
options = next(b for b in fg.blocks if b.key == "options")
options.params["id"].set_value("lora_receiver")
options.params["title"].set_value("LoRa Receiver")
options.params["generate_options"].set_value("no_gui")
options.params["run_options"].set_value("run")
# --- Variables (all exposed via XML-RPC) ---
samp_rate_var = fg.new_block("variable")
samp_rate_var.params["id"].set_value("samp_rate")
samp_rate_var.params["value"].set_value("int(1e6)")
freq_var = fg.new_block("variable")
freq_var.params["id"].set_value("freq")
freq_var.params["value"].set_value(f"{freq_mhz}e6")
sf_var = fg.new_block("variable")
sf_var.params["id"].set_value("sf")
sf_var.params["value"].set_value(str(sf))
bw_var = fg.new_block("variable")
bw_var.params["id"].set_value("bw")
bw_var.params["value"].set_value(str(bw))
cr_var = fg.new_block("variable")
cr_var.params["id"].set_value("cr")
cr_var.params["value"].set_value(str(cr))
gain_var = fg.new_block("variable")
gain_var.params["id"].set_value("gain")
gain_var.params["value"].set_value(str(gain))
# --- XML-RPC server for runtime parameter control ---
xmlrpc = fg.new_block("xmlrpc_server")
xmlrpc.params["id"].set_value("xmlrpc_server_0")
xmlrpc.params["addr"].set_value("0.0.0.0")
xmlrpc.params["port"].set_value(str(XMLRPC_PORT))
# --- RTL-SDR source ---
source = fg.new_block("osmosdr_source")
source.params["id"].set_value("osmosdr_source_0")
source.params["sample_rate"].set_value("samp_rate")
source.params["freq0"].set_value("freq")
source.params["gain0"].set_value("gain")
source.params["if_gain0"].set_value("20")
source.params["bb_gain0"].set_value("20")
source.params["args"].set_value('"rtl=0"')
# --- gr-lora_sdr decode chain ---
# frame_sync: preamble detection, STO/CFO correction
frame_sync = fg.new_block("lora_sdr_frame_sync")
frame_sync.params["id"].set_value("lora_sdr_frame_sync_0")
frame_sync.params["center_freq"].set_value("freq")
frame_sync.params["bandwidth"].set_value("bw")
frame_sync.params["sf"].set_value("sf")
frame_sync.params["impl_head"].set_value("False") # explicit header
frame_sync.params["os_factor"].set_value("4")
frame_sync.params["show_log_port"].set_value("True")
# fft_demod: chirp demodulation via FFT (soft output)
fft_demod = fg.new_block("lora_sdr_fft_demod")
fft_demod.params["id"].set_value("lora_sdr_fft_demod_0")
fft_demod.params["soft_decoding"].set_value("True")
fft_demod.params["max_log_approx"].set_value("False")
# gray_mapping: Gray code demapping (soft)
gray_map = fg.new_block("lora_sdr_gray_mapping")
gray_map.params["id"].set_value("lora_sdr_gray_mapping_0")
gray_map.params["soft_decoding"].set_value("True")
# deinterleaver: diagonal deinterleaver (soft)
deinterleaver = fg.new_block("lora_sdr_deinterleaver")
deinterleaver.params["id"].set_value("lora_sdr_deinterleaver_0")
deinterleaver.params["soft_decoding"].set_value("True")
# hamming_dec: Hamming FEC decoder (soft input -> hard output)
hamming = fg.new_block("lora_sdr_hamming_dec")
hamming.params["id"].set_value("lora_sdr_hamming_dec_0")
hamming.params["soft_decoding"].set_value("True")
# header_decoder: extract header fields, feed frame_info back to frame_sync
header_dec = fg.new_block("lora_sdr_header_decoder")
header_dec.params["id"].set_value("lora_sdr_header_decoder_0")
header_dec.params["impl_head"].set_value("False")
header_dec.params["cr"].set_value("cr")
header_dec.params["pay_len"].set_value("255")
header_dec.params["has_crc"].set_value("True")
header_dec.params["ldro"].set_value("2") # auto low-data-rate optimize
# dewhitening: XOR with LoRa whitening sequence
dewhiten = fg.new_block("lora_sdr_dewhitening")
dewhiten.params["id"].set_value("lora_sdr_dewhitening_0")
# crc_verif: CRC check and payload output
crc = fg.new_block("lora_sdr_crc_verif")
crc.params["id"].set_value("lora_sdr_crc_verif_0")
crc.params["print_rx_msg"].set_value("True")
crc.params["output_crc_check"].set_value("True")
# --- Connect signal chain ---
# RTL-SDR -> frame_sync
fg.connect(source.sources[0], frame_sync.sinks[0])
# frame_sync -> fft_demod -> gray_mapping -> deinterleaver -> hamming_dec
fg.connect(frame_sync.sources[0], fft_demod.sinks[0])
fg.connect(fft_demod.sources[0], gray_map.sinks[0])
fg.connect(gray_map.sources[0], deinterleaver.sinks[0])
fg.connect(deinterleaver.sources[0], hamming.sinks[0])
# hamming_dec -> header_decoder -> dewhitening -> crc_verif
fg.connect(hamming.sources[0], header_dec.sinks[0])
fg.connect(header_dec.sources[0], dewhiten.sinks[0])
fg.connect(dewhiten.sources[0], crc.sinks[0])
# Feedback: header_decoder frame_info (source) -> frame_sync frame_info (sink)
# This is a message port connection — header_decoder sends decoded
# SF/CR/payload length back to frame_sync for adaptive reception.
# header_decoder.sources[1] = frame_info (message)
# frame_sync.sinks[1] = frame_info (message)
_connect_frame_info_feedback(fg, header_dec, frame_sync)
# --- Save and generate ---
# gr-lora_sdr's soft decoding changes port types at runtime (short→float64)
# which GRC's static validator flags as mismatches. The types are actually
# correct at runtime — soft_decoding=True uses float paths throughout.
# Use platform.Generator directly instead of grcc (which refuses is_valid=False).
work_dir = Path(tempfile.mkdtemp(prefix="lora_receiver_"))
grc_path = work_dir / "lora_receiver.grc"
platform.save_flow_graph(str(grc_path), fg)
fg.rewrite()
generator = platform.Generator(fg, str(work_dir))
generator.write()
py_path = work_dir / "lora_receiver.py"
if not py_path.exists():
print("Error: code generation produced no Python output.", file=sys.stderr)
sys.exit(1)
return py_path
def _connect_frame_info_feedback(fg, header_dec, frame_sync):
"""Connect frame_info message port from header_decoder back to frame_sync.
gr-lora_sdr uses message ports for the feedback loop where the header
decoder sends decoded SF/CR/payload length back to frame_sync. This
allows adaptive reception of packets with different parameters.
Port layout (from block introspection):
header_decoder sources: [0]=byte (data), [1]=frame_info (message)
frame_sync sinks: [0]=complex (signal), [1]=frame_info (message)
"""
# header_decoder.sources[1] is the frame_info message output
# frame_sync.sinks[1] is the frame_info message input
if len(header_dec.sources) > 1 and len(frame_sync.sinks) > 1:
fg.connect(header_dec.sources[1], frame_sync.sinks[1])
# --- Runtime control ---
def wait_for_xmlrpc(url: str, timeout: float = 10.0) -> xmlrpc.client.ServerProxy:
"""Wait for the XML-RPC server to become reachable."""
proxy = xmlrpc.client.ServerProxy(url)
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
try:
proxy.get_freq()
return proxy
except ConnectionRefusedError:
time.sleep(0.3)
except Exception:
return proxy
print("Error: flowgraph XML-RPC server did not start.", file=sys.stderr)
sys.exit(1)
def format_lora_params(proxy: xmlrpc.client.ServerProxy) -> str:
"""Format current LoRa parameters for display."""
try:
freq = proxy.get_freq() / 1e6
sf = int(proxy.get_sf())
bw_khz = proxy.get_bw() / 1000
cr = int(proxy.get_cr())
return f"{freq:.3f} MHz SF{sf} BW {bw_khz:.0f} kHz CR 4/{4+cr}"
except Exception:
return "(parameters unavailable)"
def tune_lora(
freq_mhz: float,
sf: int = 7,
bw: int = 125000,
cr: int = 1,
gain: int = 20,
):
"""Launch a gr-lora_sdr receiver and control via XML-RPC.
Builds the flowgraph programmatically, launches it as a subprocess,
connects to its XML-RPC server, and provides an interactive control
loop for changing LoRa parameters at runtime.
"""
print(f"\n Building LoRa receiver for {freq_mhz:.3f} MHz (SF{sf})...")
py_path = build_lora_receiver(freq_mhz, sf=sf, bw=bw, cr=cr, gain=gain)
url = f"http://localhost:{XMLRPC_PORT}"
print(f" Launching flowgraph ({py_path.name})...")
fg_proc = subprocess.Popen(
[sys.executable, str(py_path)],
stderr=subprocess.DEVNULL,
)
proxy = wait_for_xmlrpc(url)
time.sleep(0.5)
params = format_lora_params(proxy)
print(f" Listening: {params}")
print()
print(" Commands:")
print(" freq <MHz> — change frequency (e.g. 'freq 915.0')")
print(" sf <N> — change spreading factor (7-12)")
print(" bw <Hz> — change bandwidth (e.g. 'bw 250000')")
print(" cr <N> — change coding rate (1-4)")
print(" status — show current parameters")
print(" q — quit")
print()
try:
while fg_proc.poll() is None:
try:
cmd = input(" lora> ").strip()
except EOFError:
break
if not cmd or cmd.lower() in ("q", "quit"):
break
if cmd.lower() in ("s", "status"):
print(f" {format_lora_params(proxy)}")
continue
parts = cmd.split(maxsplit=1)
if len(parts) != 2:
print(" Usage: freq|sf|bw|cr <value>, status, q")
continue
param, value_str = parts[0].lower(), parts[1]
try:
if param == "freq":
new_freq = float(value_str)
if 902.0 <= new_freq <= 928.0:
proxy.set_freq(new_freq * 1e6)
time.sleep(0.3)
print(f" {format_lora_params(proxy)}")
else:
print(" Frequency must be 902\u2013928 MHz.")
elif param == "sf":
new_sf = int(value_str)
if 7 <= new_sf <= 12:
proxy.set_sf(new_sf)
time.sleep(0.3)
print(f" {format_lora_params(proxy)}")
else:
print(" Spreading factor must be 7\u201312.")
elif param == "bw":
new_bw = int(value_str)
if new_bw in (7800, 10400, 15600, 20800, 31250,
41700, 62500, 125000, 250000, 500000):
proxy.set_bw(new_bw)
time.sleep(0.3)
print(f" {format_lora_params(proxy)}")
else:
print(" Valid BWs: 7800 10400 15600 20800 31250 "
"41700 62500 125000 250000 500000")
elif param == "cr":
new_cr = int(value_str)
if 1 <= new_cr <= 4:
proxy.set_cr(new_cr)
time.sleep(0.3)
print(f" {format_lora_params(proxy)}")
else:
print(" Coding rate must be 1\u20134.")
elif param == "gain":
new_gain = int(value_str)
proxy.set_gain(new_gain)
time.sleep(0.3)
print(f" Gain set to {new_gain} dB")
else:
print(" Unknown param. Use: freq, sf, bw, cr, gain")
except ValueError:
print(f" Invalid value: {value_str}")
except Exception as e:
print(f" XML-RPC error: {e}")
except KeyboardInterrupt:
pass
print("\n Stopping flowgraph...")
if fg_proc.poll() is None:
fg_proc.send_signal(signal.SIGTERM)
try:
fg_proc.wait(timeout=5)
except subprocess.TimeoutExpired:
fg_proc.kill()
# --- CLI entry point ---
def main():
parser = argparse.ArgumentParser(
description="Scan the US ISM band (902-928 MHz) for LoRa activity."
)
parser.add_argument(
"--threshold",
type=float,
default=8.0,
help="Minimum dB above noise floor to flag as active (default: 8)",
)
parser.add_argument(
"--gain",
type=int,
default=20,
help="RF tuner gain in dB (default: 20, higher for 915 MHz)",
)
parser.add_argument(
"--json",
metavar="FILE",
help="Save scan results to JSON file",
)
parser.add_argument(
"--all",
action="store_true",
dest="show_all",
help="Show all channels, not just active ones",
)
parser.add_argument(
"--listen",
type=float,
metavar="FREQ",
help="Listen on specific frequency (MHz) without scanning first",
)
parser.add_argument(
"--tune",
action="store_true",
help="Pick an active channel to listen on after scanning",
)
parser.add_argument(
"--sf",
type=int,
default=7,
choices=range(7, 13),
help="LoRa spreading factor (default: 7)",
)
parser.add_argument(
"--bw",
type=int,
default=125000,
help="LoRa bandwidth in Hz (default: 125000)",
)
parser.add_argument(
"--cr",
type=int,
default=1,
choices=range(1, 5),
help="LoRa coding rate 1-4 (default: 1, meaning 4/5)",
)
args = parser.parse_args()
# Direct listen mode — skip scanning
if args.listen is not None:
tune_lora(
args.listen,
sf=args.sf,
bw=args.bw,
cr=args.cr,
gain=args.gain,
)
return
# Scan mode
print("Scanning LoRa band (902\u2013928 MHz)...", flush=True)
raw = run_lora_scan(gain=args.gain)
readings = parse_lora_scan(raw)
if not readings:
print("No data received from rtl_power.", file=sys.stderr)
sys.exit(1)
channels = aggregate_lora_channels(readings)
active, noise_floor = detect_lora_activity(
channels, threshold_db=args.threshold
)
display_lora_results(
active,
noise_floor,
all_channels=channels,
show_all=args.show_all,
)
if args.json:
save_json(active, noise_floor, args.json)
if args.tune:
freq = pick_channel(active)
if freq:
tune_lora(
freq,
sf=args.sf,
bw=args.bw,
cr=args.cr,
gain=args.gain,
)
if __name__ == "__main__":
main()