gr-mcp/examples/multi_sf_lora_rx.py
Ryan Malloy 8800d35fd4 add Starlight docs site, LoRa examples, and clean up .gitignore
- Starlight docs: 28 pages covering getting started, guides, tool
  reference, concepts (architecture, dynamic tools, runtime comms)
- LoRa examples: channel scanner, quality analyzer, multi-SF receiver
  with both .grc and .py forms, plus ADSB+LoRa combo test
- .gitignore: exclude generated artifacts (*_patched_*.py, *.wav,
  docs build cache, tests/scratch/)
- Add .mcp.json for local MCP server config
- Sync uv.lock with date-based version
2026-02-24 09:34:50 -07:00

156 lines
5.7 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# SPDX-License-Identifier: GPL-3.0
#
# GNU Radio Python Flow Graph
# Title: Multi-SF LoRa Receiver
# Author: gr-mcp
# Description: Simultaneous LoRa decoder for SF7-SF10 (one receiver per spreading factor; SF11 and SF12 are not instantiated in this flow graph). 915MHz, BW125k.
# GNU Radio version: 3.10.12.0
from gnuradio import blocks, gr
from gnuradio import filter
from gnuradio.filter import firdes
from gnuradio import gr
from gnuradio.fft import window
import sys
import signal
from argparse import ArgumentParser
from gnuradio.eng_arg import eng_float, intx
from gnuradio import eng_notation
from xmlrpc.server import SimpleXMLRPCServer
import threading
import gnuradio.lora_sdr as lora_sdr
import osmosdr
import time
class multi_sf_lora_rx(gr.top_block):
    """Flow graph that feeds one osmosdr source into four parallel LoRa receivers.

    Signal path: osmosdr source -> decimate-by-2 low-pass FIR filter -> four
    ``lora_sdr_lora_rx`` blocks configured for SF7, SF8, SF9 and SF10.  The
    ``out`` message port of every receiver is wired to the ``print`` port of
    a single shared ``message_debug`` block.

    The instance is registered with a ``SimpleXMLRPCServer`` bound to
    localhost:8080 and served from a daemon thread (presumably so the
    ``get_*``/``set_*`` accessors below can be driven remotely).
    """

    def __init__(self):
        """Create the variables, the XML-RPC server, all blocks and connections."""
        super().__init__("Multi-SF LoRa Receiver", catch_exceptions=True)
        # Set by main() once the flow graph has been started.
        self.flowgraph_started = threading.Event()

        # ------------------------------------------------------------
        # Variables
        # ------------------------------------------------------------
        samp_rate, rf_gain, lora_bw, center_freq = 1000000, 40, 125000, 915e6
        self.samp_rate = samp_rate
        self.rf_gain = rf_gain
        self.lora_bw = lora_bw
        self.center_freq = center_freq

        # ------------------------------------------------------------
        # Blocks
        # ------------------------------------------------------------
        # NOTE: the server starts answering requests before the blocks
        # below exist.
        rpc_server = SimpleXMLRPCServer(('localhost', 8080), allow_none=True)
        self.xmlrpc_server_0 = rpc_server
        rpc_server.register_instance(self)
        self.xmlrpc_server_0_thread = threading.Thread(
            target=rpc_server.serve_forever,
            daemon=True,
        )
        self.xmlrpc_server_0_thread.start()

        # Single-channel osmosdr source opened with device args "rtl=0".
        tuner = self.osmosdr_source_0 = osmosdr.source(
            args=" ".join(["numchan=" + str(1), 'rtl=0'])
        )
        tuner.set_time_unknown_pps(osmosdr.time_spec_t())
        tuner.set_sample_rate(samp_rate)
        tuner.set_center_freq(center_freq, 0)
        tuner.set_freq_corr(0, 0)
        tuner.set_dc_offset_mode(2, 0)
        tuner.set_iq_balance_mode(2, 0)
        tuner.set_gain_mode(False, 0)
        tuner.set_gain(rf_gain, 0)
        tuner.set_if_gain(20, 0)
        tuner.set_bb_gain(20, 0)
        tuner.set_antenna('', 0)
        tuner.set_bandwidth(0, 0)

        # Low-pass FIR that also decimates; the receivers below are told
        # the post-decimation rate.
        decimation = 2
        lpf_taps = firdes.low_pass(
            1, samp_rate, 200e3, 50e3, window.WIN_HAMMING, 6.76)
        self.low_pass_filter_0 = filter.fir_filter_ccf(decimation, lpf_taps)

        rx_rate = int(samp_rate / decimation)

        def build_receiver(spreading_factor):
            # All receivers share every setting except the spreading factor.
            # List arguments are created fresh for each receiver.
            return lora_sdr.lora_sdr_lora_rx(
                bw=lora_bw,
                cr=1,
                has_crc=True,
                impl_head=False,
                pay_len=255,
                samp_rate=rx_rate,
                sf=spreading_factor,
                sync_word=[0x12],
                soft_decoding=True,
                ldro_mode=2,
                print_rx=[True, True],
            )

        # Receivers are constructed highest spreading factor first.
        (self.lora_rx_3,
         self.lora_rx_2,
         self.lora_rx_1,
         self.lora_rx_0) = [build_receiver(sf) for sf in (10, 9, 8, 7)]

        self.blocks_message_debug_0 = blocks.message_debug(True, gr.log_levels.info)

        # ------------------------------------------------------------
        # Connections
        # ------------------------------------------------------------
        receivers = (
            self.lora_rx_0,
            self.lora_rx_1,
            self.lora_rx_2,
            self.lora_rx_3,
        )
        # Every receiver reports to the same message_debug block.
        for rx in receivers:
            self.msg_connect((rx, 'out'), (self.blocks_message_debug_0, 'print'))
        # The filtered stream fans out to all receivers.
        for rx in receivers:
            self.connect((self.low_pass_filter_0, 0), (rx, 0))
        self.connect((self.osmosdr_source_0, 0), (self.low_pass_filter_0, 0))

    def get_samp_rate(self):
        """Return the stored source sample rate."""
        return self.samp_rate

    def set_samp_rate(self, samp_rate):
        """Store a new sample rate, then update the filter taps and the source.

        The LoRa receivers are not touched here; they keep the sample rate
        they were constructed with.
        """
        self.samp_rate = samp_rate
        new_taps = firdes.low_pass(
            1, self.samp_rate, 200e3, 50e3, window.WIN_HAMMING, 6.76)
        self.low_pass_filter_0.set_taps(new_taps)
        self.osmosdr_source_0.set_sample_rate(self.samp_rate)

    def get_rf_gain(self):
        """Return the stored RF gain."""
        return self.rf_gain

    def set_rf_gain(self, rf_gain):
        """Store a new RF gain and apply it to channel 0 of the source."""
        self.rf_gain = rf_gain
        self.osmosdr_source_0.set_gain(self.rf_gain, 0)

    def get_lora_bw(self):
        """Return the stored LoRa bandwidth."""
        return self.lora_bw

    def set_lora_bw(self, lora_bw):
        """Store a new LoRa bandwidth value.

        Only the attribute is updated; no block is reconfigured.
        """
        self.lora_bw = lora_bw

    def get_center_freq(self):
        """Return the stored centre frequency."""
        return self.center_freq

    def set_center_freq(self, center_freq):
        """Store a new centre frequency and retune channel 0 of the source."""
        self.center_freq = center_freq
        self.osmosdr_source_0.set_center_freq(self.center_freq, 0)
def main(top_block_cls=multi_sf_lora_rx, options=None):
    """Run the flow graph until Enter is pressed or a termination signal arrives.

    Args:
        top_block_cls: Class instantiated (with no arguments) to obtain the
            flow graph.
        options: Accepted but not used by this function.
    """
    flowgraph = top_block_cls()

    def halt():
        # Stop the flow graph and block until it has finished.
        flowgraph.stop()
        flowgraph.wait()

    def sig_handler(sig=None, frame=None):
        halt()
        sys.exit(0)

    # SIGINT and SIGTERM both go through the same shutdown path.
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, sig_handler)

    flowgraph.start()
    flowgraph.flowgraph_started.set()

    try:
        input('Press Enter to quit: ')
    except EOFError:
        # End of input on stdin is treated like a key press: fall through
        # to the shutdown below.
        pass

    halt()
# Script entry point: only start the receiver when executed directly,
# not when this module is imported.
if __name__ == '__main__':
    main()