gr-mcp/examples/lora_infrastructure_test.py
Ryan Malloy e8b3600e60 feat: add LoRa SDR receiver with Docker runtime infrastructure
LoRa receiver flowgraph built programmatically via gr-mcp:
- osmosdr_source → low_pass_filter → lora_rx → message_debug
- XML-RPC server for runtime variable control (samp_rate,
  center_freq) with introspection enabled
- Qt frequency sink for spectrum visualization

Docker infrastructure:
- gnuradio-lora: gr-lora_sdr OOT module from EPFL (chirp spread spectrum)
- gnuradio-lora-runtime: combined runtime with Xvfb + gr-lora_sdr
- Compose file, entrypoint, and launch script for LoRa receiver

Also includes:
- lora_scanner.py: multi-SF LoRa scanner example
- lora_infrastructure_test.py: hardware-free pipeline validation
  (signal_source → throttle → null_sink + xmlrpc variable control)
- Integration tests for LoRa scanner flowgraph construction

End-to-end pipeline validated: launch_flowgraph → connect_to_container →
list_variables → get/set_variable all working through Docker + XML-RPC.
2026-01-30 13:55:40 -07:00

118 lines
3.6 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# SPDX-License-Identifier: GPL-3.0
#
# GNU Radio Python Flow Graph
# Title: LoRa Infrastructure Test
# Description: Validates the gr-mcp runtime pipeline (Docker + XML-RPC)
# without requiring SDR hardware.
# GNU Radio version: 3.10.12.0
from gnuradio import analog, blocks, gr
from xmlrpc.server import SimpleXMLRPCServer
import signal
import sys
import threading
import time
class lora_infra_test(gr.top_block):
"""Minimal flowgraph for testing gr-mcp runtime infrastructure.
Signal chain: sig_source → throttle → null_sink
Variables exposed via XML-RPC: samp_rate, center_freq, lora_sf, lora_bw
"""
def __init__(self):
gr.top_block.__init__(self, "LoRa Infrastructure Test", catch_exceptions=True)
##################################################
# Variables (same as LoRa receiver for API compatibility)
##################################################
self.samp_rate = samp_rate = 1000000
self.center_freq = center_freq = 915e6
self.lora_sf = lora_sf = 7
self.lora_bw = lora_bw = 125000
##################################################
# Blocks
##################################################
self.analog_sig_source_0 = analog.sig_source_c(
samp_rate, analog.GR_COS_WAVE, 1000, 1, 0, 0
)
self.blocks_throttle_0 = blocks.throttle(gr.sizeof_gr_complex, samp_rate, True)
self.blocks_null_sink_0 = blocks.null_sink(gr.sizeof_gr_complex)
# XML-RPC server for runtime variable control
self.xmlrpc_server_0 = SimpleXMLRPCServer(("0.0.0.0", 8080), allow_none=True)
self.xmlrpc_server_0.register_introspection_functions()
self.xmlrpc_server_0.register_instance(self)
self.xmlrpc_server_0_thread = threading.Thread(target=self.xmlrpc_server_0.serve_forever)
self.xmlrpc_server_0_thread.daemon = True
self.xmlrpc_server_0_thread.start()
##################################################
# Connections
##################################################
self.connect((self.analog_sig_source_0, 0), (self.blocks_throttle_0, 0))
self.connect((self.blocks_throttle_0, 0), (self.blocks_null_sink_0, 0))
def get_samp_rate(self):
return self.samp_rate
def set_samp_rate(self, samp_rate):
self.samp_rate = samp_rate
self.analog_sig_source_0.set_sampling_freq(self.samp_rate)
self.blocks_throttle_0.set_sample_rate(self.samp_rate)
def get_center_freq(self):
return self.center_freq
def set_center_freq(self, center_freq):
self.center_freq = center_freq
def get_lora_sf(self):
return self.lora_sf
def set_lora_sf(self, lora_sf):
self.lora_sf = lora_sf
def get_lora_bw(self):
return self.lora_bw
def set_lora_bw(self, lora_bw):
self.lora_bw = lora_bw
def main(top_block_cls=lora_infra_test, options=None):
tb = top_block_cls()
tb.start()
print("Flowgraph started, XML-RPC on 0.0.0.0:8080", flush=True)
print(f"Variables: samp_rate={tb.samp_rate}, center_freq={tb.center_freq}, "
f"lora_sf={tb.lora_sf}, lora_bw={tb.lora_bw}", flush=True)
def sig_handler(sig=None, frame=None):
tb.stop()
tb.wait()
sys.exit(0)
signal.signal(signal.SIGINT, sig_handler)
signal.signal(signal.SIGTERM, sig_handler)
# Keep alive
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
pass
tb.stop()
tb.wait()
if __name__ == "__main__":
main()