gr-mcp/examples/combo_test_adsb_lora.py
Ryan Malloy 8800d35fd4 add Starlight docs site, LoRa examples, and clean up .gitignore
- Starlight docs: 28 pages covering getting started, guides, tool
  reference, concepts (architecture, dynamic tools, runtime comms)
- LoRa examples: channel scanner, quality analyzer, multi-SF receiver
  with both .grc and .py forms, plus ADSB+LoRa combo test
- .gitignore: exclude generated artifacts (*_patched_*.py, *.wav,
  docs build cache, tests/scratch/)
- Add .mcp.json for local MCP server config
- Sync uv.lock with date-based version
2026-02-24 09:34:50 -07:00

101 lines
3.2 KiB
Python

#!/usr/bin/env python3
"""Minimal flowgraph that uses blocks from both adsb and lora_sdr.
Verifies combo Docker images contain working blocks from multiple
OOT modules. Runs for 5 seconds then exits cleanly.
"""
import signal
import sys
import time
from xmlrpc.server import SimpleXMLRPCServer
import threading
from gnuradio import gr, blocks, analog
# Import OOT modules to verify they're available
from gnuradio import adsb as gr_adsb
from gnuradio import lora_sdr as gr_lora_sdr
class combo_test(gr.top_block):
def __init__(self):
gr.top_block.__init__(self, "Combo Test: ADS-B + LoRa SDR")
##################################################
# Variables
##################################################
self.samp_rate = samp_rate = 2e6
##################################################
# ADS-B: noise(float) -> throttle -> demod -> null
##################################################
self.noise_adsb = analog.noise_source_f(analog.GR_GAUSSIAN, 0.01, 0)
self.throttle_adsb = blocks.throttle(gr.sizeof_float, samp_rate, True)
self.adsb_demod = gr_adsb.demod(samp_rate)
self.null_adsb = blocks.null_sink(gr.sizeof_float)
self.connect(self.noise_adsb, self.throttle_adsb, self.adsb_demod, self.null_adsb)
##################################################
# LoRa: tx -> throttle -> null
##################################################
self.lora_tx = gr_lora_sdr.lora_sdr_lora_tx(
samp_rate=125000,
bw=125000,
sf=7,
impl_head=True,
cr=1,
has_crc=True,
ldro_mode=2,
frame_zero_padd=128,
)
self.throttle_lora = blocks.throttle(gr.sizeof_gr_complex, 125000, True)
self.null_lora = blocks.null_sink(gr.sizeof_gr_complex)
self.connect(self.lora_tx, self.throttle_lora, self.null_lora)
##################################################
# XML-RPC for runtime control
##################################################
self.xmlrpc_port = 8080
self.xmlrpc_server = SimpleXMLRPCServer(
("0.0.0.0", self.xmlrpc_port),
allow_none=True,
logRequests=False,
)
self.xmlrpc_server.register_instance(self)
threading.Thread(
target=self.xmlrpc_server.serve_forever,
daemon=True,
).start()
def main():
tb = combo_test()
print(f"[combo_test] Starting flowgraph with ADS-B + LoRa SDR blocks")
print(f"[combo_test] XML-RPC on port {tb.xmlrpc_port}")
print(f"[combo_test] ADS-B demod: {type(tb.adsb_demod).__name__}")
print(f"[combo_test] LoRa TX: {type(tb.lora_tx).__name__}")
sys.stdout.flush()
tb.start()
def sig_handler(sig, frame):
tb.stop()
tb.wait()
sys.exit(0)
signal.signal(signal.SIGTERM, sig_handler)
signal.signal(signal.SIGINT, sig_handler)
# Run for a bit then exit
time.sleep(5)
print("[combo_test] Flowgraph ran successfully for 5s, stopping")
sys.stdout.flush()
tb.stop()
tb.wait()
if __name__ == "__main__":
main()