- Starlight docs: 28 pages covering getting started, guides, tool reference, concepts (architecture, dynamic tools, runtime comms)
- LoRa examples: channel scanner, quality analyzer, multi-SF receiver with both .grc and .py forms, plus ADSB+LoRa combo test
- .gitignore: exclude generated artifacts (*_patched_*.py, *.wav, docs build cache, tests/scratch/)
- Add .mcp.json for local MCP server config
- Sync uv.lock with date-based version
101 lines
3.2 KiB
Python
101 lines
3.2 KiB
Python
#!/usr/bin/env python3
|
|
"""Minimal flowgraph that uses blocks from both adsb and lora_sdr.
|
|
|
|
Verifies combo Docker images contain working blocks from multiple
|
|
OOT modules. Runs for 5 seconds then exits cleanly.
|
|
"""
|
|
|
|
import signal
|
|
import sys
|
|
import time
|
|
from xmlrpc.server import SimpleXMLRPCServer
|
|
import threading
|
|
|
|
from gnuradio import gr, blocks, analog
|
|
|
|
# Import OOT modules to verify they're available
|
|
from gnuradio import adsb as gr_adsb
|
|
from gnuradio import lora_sdr as gr_lora_sdr
|
|
|
|
|
|
class combo_test(gr.top_block):
    """Top block pairing an ADS-B demodulator chain with a LoRa TX chain.

    Two independent chains are wired up:

    * Gaussian float noise -> throttle -> ADS-B demod -> null sink
    * LoRa transmitter -> throttle -> null sink

    An XML-RPC server with this instance registered on it is served from
    a daemon thread started at the end of construction.
    """

    def __init__(self):
        gr.top_block.__init__(self, "Combo Test: ADS-B + LoRa SDR")

        # ---- Variables ------------------------------------------------
        samp_rate = 2e6
        self.samp_rate = samp_rate

        # ---- ADS-B: noise(float) -> throttle -> demod -> null ---------
        self.noise_adsb = analog.noise_source_f(analog.GR_GAUSSIAN, 0.01, 0)
        self.throttle_adsb = blocks.throttle(gr.sizeof_float, samp_rate, True)
        self.adsb_demod = gr_adsb.demod(samp_rate)
        self.null_adsb = blocks.null_sink(gr.sizeof_float)

        adsb_chain = (
            self.noise_adsb,
            self.throttle_adsb,
            self.adsb_demod,
            self.null_adsb,
        )
        self.connect(*adsb_chain)

        # ---- LoRa: tx -> throttle -> null -----------------------------
        lora_samp_rate = 125000
        lora_tx_settings = {
            "samp_rate": lora_samp_rate,
            "bw": 125000,
            "sf": 7,
            "impl_head": True,
            "cr": 1,
            "has_crc": True,
            "ldro_mode": 2,
            "frame_zero_padd": 128,
        }
        self.lora_tx = gr_lora_sdr.lora_sdr_lora_tx(**lora_tx_settings)
        self.throttle_lora = blocks.throttle(
            gr.sizeof_gr_complex, lora_samp_rate, True
        )
        self.null_lora = blocks.null_sink(gr.sizeof_gr_complex)

        lora_chain = (self.lora_tx, self.throttle_lora, self.null_lora)
        self.connect(*lora_chain)

        # ---- XML-RPC for runtime control ------------------------------
        self.xmlrpc_port = 8080
        listen_address = ("0.0.0.0", self.xmlrpc_port)
        self.xmlrpc_server = SimpleXMLRPCServer(
            listen_address,
            allow_none=True,
            logRequests=False,
        )
        self.xmlrpc_server.register_instance(self)

        # Daemon thread: the server loop must not keep the process alive
        # once main() returns.
        rpc_thread = threading.Thread(
            target=self.xmlrpc_server.serve_forever,
            daemon=True,
        )
        rpc_thread.start()
|
|
|
|
|
|
def main():
    """Run the ADS-B + LoRa combo flowgraph for five seconds, then stop it.

    Builds ``combo_test``, prints a short status banner, starts the
    flowgraph, sleeps for 5 seconds, and then stops the flowgraph and
    waits for it to finish. SIGTERM and SIGINT stop the flowgraph and
    exit the process with status 0.
    """
    tb = combo_test()

    def sig_handler(sig, frame):
        # Shut the flowgraph down before leaving the process.
        tb.stop()
        tb.wait()
        sys.exit(0)

    print("[combo_test] Starting flowgraph with ADS-B + LoRa SDR blocks")
    print(f"[combo_test] XML-RPC on port {tb.xmlrpc_port}")
    print(f"[combo_test] ADS-B demod: {type(tb.adsb_demod).__name__}")
    print(f"[combo_test] LoRa TX: {type(tb.lora_tx).__name__}")
    sys.stdout.flush()

    # Install the handlers before the flowgraph starts, so a signal that
    # arrives right after start-up still goes through sig_handler instead
    # of the default action (which would skip tb.stop()/tb.wait()).
    signal.signal(signal.SIGTERM, sig_handler)
    signal.signal(signal.SIGINT, sig_handler)

    tb.start()

    # Run for a bit then exit
    time.sleep(5)
    print("[combo_test] Flowgraph ran successfully for 5s, stopping")
    sys.stdout.flush()
    tb.stop()
    tb.wait()
|
|
|
|
|
|
# Script entry point: run the combo smoke test only when executed directly.
if __name__ == "__main__":
    main()
|