#!/usr/bin/env python3
"""Minimal flowgraph that uses blocks from both adsb and lora_sdr.

Verifies combo Docker images contain working blocks from multiple OOT modules.
Runs for 5 seconds then exits cleanly.
"""

import signal
import sys
import threading
import time
from xmlrpc.server import SimpleXMLRPCServer

from gnuradio import gr, blocks, analog

# Import OOT modules to verify they're available
from gnuradio import adsb as gr_adsb
from gnuradio import lora_sdr as gr_lora_sdr

# How long main() lets the flowgraph run before stopping it, in seconds.
RUN_SECONDS = 5


class combo_test(gr.top_block):
    """Top block holding one ADS-B chain and one LoRa chain, side by side.

    The two chains are not connected to each other:

    * float Gaussian noise -> throttle -> ``adsb.demod`` -> null sink
    * ``lora_sdr_lora_tx`` -> throttle -> null sink

    Constructing the block also opens an XML-RPC server on ``xmlrpc_port``
    and serves it from a daemon thread.
    """

    def __init__(self):
        gr.top_block.__init__(self, "Combo Test: ADS-B + LoRa SDR")

        ##################################################
        # Variables
        ##################################################
        self.samp_rate = samp_rate = 2e6
        # The LoRa chain uses one rate for the TX sample rate, the TX
        # bandwidth and the throttle; keep it in a single place.
        lora_rate = 125000

        ##################################################
        # ADS-B: noise(float) -> throttle -> demod -> null
        ##################################################
        self.noise_adsb = analog.noise_source_f(analog.GR_GAUSSIAN, 0.01, 0)
        self.throttle_adsb = blocks.throttle(gr.sizeof_float, samp_rate, True)
        self.adsb_demod = gr_adsb.demod(samp_rate)
        self.null_adsb = blocks.null_sink(gr.sizeof_float)

        self.connect(
            self.noise_adsb,
            self.throttle_adsb,
            self.adsb_demod,
            self.null_adsb,
        )

        ##################################################
        # LoRa: tx -> throttle -> null
        ##################################################
        self.lora_tx = gr_lora_sdr.lora_sdr_lora_tx(
            samp_rate=lora_rate,
            bw=lora_rate,
            sf=7,
            impl_head=True,
            cr=1,
            has_crc=True,
            ldro_mode=2,
            frame_zero_padd=128,
        )
        self.throttle_lora = blocks.throttle(
            gr.sizeof_gr_complex, lora_rate, True
        )
        self.null_lora = blocks.null_sink(gr.sizeof_gr_complex)

        self.connect(self.lora_tx, self.throttle_lora, self.null_lora)

        ##################################################
        # XML-RPC for runtime control
        ##################################################
        # NOTE(review): the server binds to all interfaces and registers the
        # whole instance, so every public method is remotely callable.
        # Presumably acceptable inside a test container -- confirm before
        # reusing this outside one.
        self.xmlrpc_port = 8080
        self.xmlrpc_server = SimpleXMLRPCServer(
            ("0.0.0.0", self.xmlrpc_port),
            allow_none=True,
            logRequests=False,
        )
        self.xmlrpc_server.register_instance(self)
        # Daemon thread: the server must not keep the process alive once
        # main() returns.
        threading.Thread(
            target=self.xmlrpc_server.serve_forever,
            daemon=True,
        ).start()


def main():
    """Build the flowgraph, run it for ``RUN_SECONDS``, then stop it.

    Prints a short summary of the instantiated blocks before starting.
    SIGTERM and SIGINT stop the flowgraph and exit with status 0.
    """
    tb = combo_test()

    print("[combo_test] Starting flowgraph with ADS-B + LoRa SDR blocks")
    print(f"[combo_test] XML-RPC on port {tb.xmlrpc_port}")
    print(f"[combo_test] ADS-B demod: {type(tb.adsb_demod).__name__}")
    print(f"[combo_test] LoRa TX: {type(tb.lora_tx).__name__}")
    sys.stdout.flush()

    def sig_handler(sig, frame):
        tb.stop()
        tb.wait()
        sys.exit(0)

    # Install the handlers before starting so that a signal arriving right
    # after start() still goes through the clean stop/wait path instead of
    # the default signal behaviour.
    # NOTE(review): stop()/wait() on a not-yet-started top block is expected
    # to be harmless, as GRC-generated scripts rely on it -- confirm.
    signal.signal(signal.SIGTERM, sig_handler)
    signal.signal(signal.SIGINT, sig_handler)

    tb.start()

    # Run for a bit then exit
    time.sleep(RUN_SECONDS)
    print(
        f"[combo_test] Flowgraph ran successfully for {RUN_SECONDS}s, stopping"
    )
    sys.stdout.flush()
    tb.stop()
    tb.wait()


if __name__ == "__main__":
    main()