- Starlight docs: 28 pages covering getting started, guides, tool reference, concepts (architecture, dynamic tools, runtime comms) - LoRa examples: channel scanner, quality analyzer, multi-SF receiver with both .grc and .py forms, plus ADSB+LoRa combo test - .gitignore: exclude generated artifacts (*_patched_*.py, *.wav, docs build cache, tests/scratch/) - Add .mcp.json for local MCP server config - Sync uv.lock with date-based version
156 lines
5.7 KiB
Python
Executable File
156 lines
5.7 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
#
|
|
# SPDX-License-Identifier: GPL-3.0
|
|
#
|
|
# GNU Radio Python Flow Graph
|
|
# Title: Multi-SF LoRa Receiver
|
|
# Author: gr-mcp
|
|
# Description: Simultaneous LoRa decoder for SF7-SF12. 915MHz, BW125k.
|
|
# GNU Radio version: 3.10.12.0
|
|
|
|
from gnuradio import blocks, gr
|
|
from gnuradio import filter
|
|
from gnuradio.filter import firdes
|
|
from gnuradio import gr
|
|
from gnuradio.fft import window
|
|
import sys
|
|
import signal
|
|
from argparse import ArgumentParser
|
|
from gnuradio.eng_arg import eng_float, intx
|
|
from gnuradio import eng_notation
|
|
from xmlrpc.server import SimpleXMLRPCServer
|
|
import threading
|
|
import gnuradio.lora_sdr as lora_sdr
|
|
import osmosdr
|
|
import time
|
|
|
|
|
|
|
|
|
|
class multi_sf_lora_rx(gr.top_block):
|
|
|
|
def __init__(self):
|
|
gr.top_block.__init__(self, "Multi-SF LoRa Receiver", catch_exceptions=True)
|
|
self.flowgraph_started = threading.Event()
|
|
|
|
##################################################
|
|
# Variables
|
|
##################################################
|
|
self.samp_rate = samp_rate = 1000000
|
|
self.rf_gain = rf_gain = 40
|
|
self.lora_bw = lora_bw = 125000
|
|
self.center_freq = center_freq = 915e6
|
|
|
|
##################################################
|
|
# Blocks
|
|
##################################################
|
|
|
|
self.xmlrpc_server_0 = SimpleXMLRPCServer(('localhost', 8080), allow_none=True)
|
|
self.xmlrpc_server_0.register_instance(self)
|
|
self.xmlrpc_server_0_thread = threading.Thread(target=self.xmlrpc_server_0.serve_forever)
|
|
self.xmlrpc_server_0_thread.daemon = True
|
|
self.xmlrpc_server_0_thread.start()
|
|
self.osmosdr_source_0 = osmosdr.source(
|
|
args="numchan=" + str(1) + " " + 'rtl=0'
|
|
)
|
|
self.osmosdr_source_0.set_time_unknown_pps(osmosdr.time_spec_t())
|
|
self.osmosdr_source_0.set_sample_rate(samp_rate)
|
|
self.osmosdr_source_0.set_center_freq(center_freq, 0)
|
|
self.osmosdr_source_0.set_freq_corr(0, 0)
|
|
self.osmosdr_source_0.set_dc_offset_mode(2, 0)
|
|
self.osmosdr_source_0.set_iq_balance_mode(2, 0)
|
|
self.osmosdr_source_0.set_gain_mode(False, 0)
|
|
self.osmosdr_source_0.set_gain(rf_gain, 0)
|
|
self.osmosdr_source_0.set_if_gain(20, 0)
|
|
self.osmosdr_source_0.set_bb_gain(20, 0)
|
|
self.osmosdr_source_0.set_antenna('', 0)
|
|
self.osmosdr_source_0.set_bandwidth(0, 0)
|
|
self.low_pass_filter_0 = filter.fir_filter_ccf(
|
|
2,
|
|
firdes.low_pass(
|
|
1,
|
|
samp_rate,
|
|
200e3,
|
|
50e3,
|
|
window.WIN_HAMMING,
|
|
6.76))
|
|
self.lora_rx_3 = lora_sdr.lora_sdr_lora_rx( bw=lora_bw, cr=1, has_crc=True, impl_head=False, pay_len=255, samp_rate=(int(samp_rate/2)), sf=10, sync_word=[0x12], soft_decoding=True, ldro_mode=2, print_rx=[True,True])
|
|
self.lora_rx_2 = lora_sdr.lora_sdr_lora_rx( bw=lora_bw, cr=1, has_crc=True, impl_head=False, pay_len=255, samp_rate=(int(samp_rate/2)), sf=9, sync_word=[0x12], soft_decoding=True, ldro_mode=2, print_rx=[True,True])
|
|
self.lora_rx_1 = lora_sdr.lora_sdr_lora_rx( bw=lora_bw, cr=1, has_crc=True, impl_head=False, pay_len=255, samp_rate=(int(samp_rate/2)), sf=8, sync_word=[0x12], soft_decoding=True, ldro_mode=2, print_rx=[True,True])
|
|
self.lora_rx_0 = lora_sdr.lora_sdr_lora_rx( bw=lora_bw, cr=1, has_crc=True, impl_head=False, pay_len=255, samp_rate=(int(samp_rate/2)), sf=7, sync_word=[0x12], soft_decoding=True, ldro_mode=2, print_rx=[True,True])
|
|
self.blocks_message_debug_0 = blocks.message_debug(True, gr.log_levels.info)
|
|
|
|
|
|
##################################################
|
|
# Connections
|
|
##################################################
|
|
self.msg_connect((self.lora_rx_0, 'out'), (self.blocks_message_debug_0, 'print'))
|
|
self.msg_connect((self.lora_rx_1, 'out'), (self.blocks_message_debug_0, 'print'))
|
|
self.msg_connect((self.lora_rx_2, 'out'), (self.blocks_message_debug_0, 'print'))
|
|
self.msg_connect((self.lora_rx_3, 'out'), (self.blocks_message_debug_0, 'print'))
|
|
self.connect((self.low_pass_filter_0, 0), (self.lora_rx_0, 0))
|
|
self.connect((self.low_pass_filter_0, 0), (self.lora_rx_1, 0))
|
|
self.connect((self.low_pass_filter_0, 0), (self.lora_rx_2, 0))
|
|
self.connect((self.low_pass_filter_0, 0), (self.lora_rx_3, 0))
|
|
self.connect((self.osmosdr_source_0, 0), (self.low_pass_filter_0, 0))
|
|
|
|
|
|
def get_samp_rate(self):
|
|
return self.samp_rate
|
|
|
|
def set_samp_rate(self, samp_rate):
|
|
self.samp_rate = samp_rate
|
|
self.low_pass_filter_0.set_taps(firdes.low_pass(1, self.samp_rate, 200e3, 50e3, window.WIN_HAMMING, 6.76))
|
|
self.osmosdr_source_0.set_sample_rate(self.samp_rate)
|
|
|
|
def get_rf_gain(self):
|
|
return self.rf_gain
|
|
|
|
def set_rf_gain(self, rf_gain):
|
|
self.rf_gain = rf_gain
|
|
self.osmosdr_source_0.set_gain(self.rf_gain, 0)
|
|
|
|
def get_lora_bw(self):
|
|
return self.lora_bw
|
|
|
|
def set_lora_bw(self, lora_bw):
|
|
self.lora_bw = lora_bw
|
|
|
|
def get_center_freq(self):
|
|
return self.center_freq
|
|
|
|
def set_center_freq(self, center_freq):
|
|
self.center_freq = center_freq
|
|
self.osmosdr_source_0.set_center_freq(self.center_freq, 0)
|
|
|
|
|
|
|
|
|
|
def main(top_block_cls=multi_sf_lora_rx, options=None):
|
|
tb = top_block_cls()
|
|
|
|
def sig_handler(sig=None, frame=None):
|
|
tb.stop()
|
|
tb.wait()
|
|
|
|
sys.exit(0)
|
|
|
|
signal.signal(signal.SIGINT, sig_handler)
|
|
signal.signal(signal.SIGTERM, sig_handler)
|
|
|
|
tb.start()
|
|
tb.flowgraph_started.set()
|
|
|
|
try:
|
|
input('Press Enter to quit: ')
|
|
except EOFError:
|
|
pass
|
|
tb.stop()
|
|
tb.wait()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|