#!/usr/bin/env python3
# -*- coding: utf-8 -*-

#
# SPDX-License-Identifier: GPL-3.0
#
# GNU Radio Python Flow Graph
# Title: Multi-SF LoRa Receiver
# Author: gr-mcp
# Description: Simultaneous LoRa decoder for SF7-SF12. 915MHz, BW125k.
# GNU Radio version: 3.10.12.0

from gnuradio import blocks, gr
from gnuradio import filter
from gnuradio.filter import firdes
from gnuradio import gr
from gnuradio.fft import window
import sys
import signal
from argparse import ArgumentParser
from gnuradio.eng_arg import eng_float, intx
from gnuradio import eng_notation
from xmlrpc.server import SimpleXMLRPCServer
import threading
import gnuradio.lora_sdr as lora_sdr
import osmosdr
import time


# Decimation applied by the channel filter. The LoRa receivers are built for
# samp_rate / CHANNEL_DECIMATION, so both uses must share this one value.
CHANNEL_DECIMATION = 2

# Low-pass design parameters, shared by the constructor and set_samp_rate()
# so the initial taps and the re-designed taps cannot drift apart.
CHANNEL_CUTOFF_HZ = 200e3
CHANNEL_TRANSITION_HZ = 50e3

# Spreading factors instantiated when the caller does not ask for others.
# NOTE(review): the file header advertises SF7-SF12, but the original flow
# graph only instantiated these four; the default keeps that behaviour.
DEFAULT_SPREADING_FACTORS = (7, 8, 9, 10)

# Address the XML-RPC control server binds to.
XMLRPC_ADDRESS = ('localhost', 8080)


class multi_sf_lora_rx(gr.top_block):
    """Top block feeding one SDR source into several parallel LoRa receivers.

    Signal path: osmosdr source -> decimating low-pass filter -> one
    ``lora_sdr_lora_rx`` per spreading factor. Every receiver's ``out``
    message port is connected to a single ``message_debug`` ``print`` port.

    The instance registers itself with an XML-RPC server started on a daemon
    thread in the constructor, so its public methods (the ``get_*``/``set_*``
    accessors among them) are callable remotely.

    Args:
        spreading_factors: Iterable of spreading factors, one receiver is
            built per entry. Defaults to SF7-SF10, matching the original flow
            graph. Receivers are exposed as ``self.lora_receivers`` and, for
            backward compatibility, as ``self.lora_rx_<index>``.

    Raises:
        ValueError: If ``spreading_factors`` is empty (the filter output
            would be left unconnected).
    """

    def __init__(self, spreading_factors=DEFAULT_SPREADING_FACTORS):
        gr.top_block.__init__(self, "Multi-SF LoRa Receiver", catch_exceptions=True)
        self.flowgraph_started = threading.Event()

        self.spreading_factors = tuple(spreading_factors)
        if not self.spreading_factors:
            raise ValueError("spreading_factors must contain at least one value")

        ##################################################
        # Variables
        ##################################################
        self.samp_rate = samp_rate = 1000000
        self.rf_gain = rf_gain = 40
        self.lora_bw = lora_bw = 125000
        self.center_freq = center_freq = 915e6

        ##################################################
        # Blocks
        ##################################################

        # Control server: serves this object's methods on a daemon thread so
        # it never blocks interpreter shutdown.
        self.xmlrpc_server_0 = SimpleXMLRPCServer(XMLRPC_ADDRESS, allow_none=True)
        self.xmlrpc_server_0.register_instance(self)
        self.xmlrpc_server_0_thread = threading.Thread(target=self.xmlrpc_server_0.serve_forever)
        self.xmlrpc_server_0_thread.daemon = True
        self.xmlrpc_server_0_thread.start()

        self.osmosdr_source_0 = osmosdr.source(
            args="numchan=" + str(1) + " " + 'rtl=0'
        )
        self.osmosdr_source_0.set_time_unknown_pps(osmosdr.time_spec_t())
        self.osmosdr_source_0.set_sample_rate(samp_rate)
        self.osmosdr_source_0.set_center_freq(center_freq, 0)
        self.osmosdr_source_0.set_freq_corr(0, 0)
        self.osmosdr_source_0.set_dc_offset_mode(2, 0)
        self.osmosdr_source_0.set_iq_balance_mode(2, 0)
        self.osmosdr_source_0.set_gain_mode(False, 0)
        self.osmosdr_source_0.set_gain(rf_gain, 0)
        self.osmosdr_source_0.set_if_gain(20, 0)
        self.osmosdr_source_0.set_bb_gain(20, 0)
        self.osmosdr_source_0.set_antenna('', 0)
        self.osmosdr_source_0.set_bandwidth(0, 0)

        self.low_pass_filter_0 = filter.fir_filter_ccf(
            CHANNEL_DECIMATION,
            self._channel_filter_taps())

        # One receiver per requested spreading factor, all fed from the same
        # filtered stream.
        self.lora_receivers = [self._make_lora_rx(sf) for sf in self.spreading_factors]
        for index, receiver in enumerate(self.lora_receivers):
            # Keep the generated attribute names (lora_rx_0, lora_rx_1, ...)
            # available for existing callers.
            setattr(self, f'lora_rx_{index}', receiver)

        self.blocks_message_debug_0 = blocks.message_debug(True, gr.log_levels.info)

        ##################################################
        # Connections
        ##################################################
        for receiver in self.lora_receivers:
            self.msg_connect((receiver, 'out'), (self.blocks_message_debug_0, 'print'))
        for receiver in self.lora_receivers:
            self.connect((self.low_pass_filter_0, 0), (receiver, 0))
        self.connect((self.osmosdr_source_0, 0), (self.low_pass_filter_0, 0))

    def _channel_filter_taps(self):
        """Design the channel low-pass taps for the current ``self.samp_rate``."""
        return firdes.low_pass(
            1,
            self.samp_rate,
            CHANNEL_CUTOFF_HZ,
            CHANNEL_TRANSITION_HZ,
            window.WIN_HAMMING,
            6.76)

    def _make_lora_rx(self, sf):
        """Build one LoRa receiver for spreading factor ``sf``.

        All other receiver settings are identical across spreading factors;
        the receiver sample rate is the post-decimation rate.
        """
        return lora_sdr.lora_sdr_lora_rx(
            bw=self.lora_bw,
            cr=1,
            has_crc=True,
            impl_head=False,
            pay_len=255,
            samp_rate=int(self.samp_rate / CHANNEL_DECIMATION),
            sf=sf,
            sync_word=[0x12],
            soft_decoding=True,
            ldro_mode=2,
            print_rx=[True, True])

    def get_samp_rate(self):
        """Return the source sample rate last stored on this object."""
        return self.samp_rate

    def set_samp_rate(self, samp_rate):
        """Store ``samp_rate``, redesign the filter taps and retune the source.

        NOTE(review): the LoRa receivers were constructed with the sample
        rate in effect at build time and are not updated here, so changing
        the rate at runtime leaves them configured for the old rate.
        """
        self.samp_rate = samp_rate
        self.low_pass_filter_0.set_taps(self._channel_filter_taps())
        self.osmosdr_source_0.set_sample_rate(self.samp_rate)

    def get_rf_gain(self):
        """Return the RF gain last stored on this object."""
        return self.rf_gain

    def set_rf_gain(self, rf_gain):
        """Store ``rf_gain`` and apply it to source channel 0."""
        self.rf_gain = rf_gain
        self.osmosdr_source_0.set_gain(self.rf_gain, 0)

    def get_lora_bw(self):
        """Return the LoRa bandwidth last stored on this object."""
        return self.lora_bw

    def set_lora_bw(self, lora_bw):
        """Store ``lora_bw``.

        NOTE(review): only the attribute is updated; the receivers that were
        already built keep the bandwidth they were constructed with.
        """
        self.lora_bw = lora_bw

    def get_center_freq(self):
        """Return the center frequency last stored on this object."""
        return self.center_freq

    def set_center_freq(self, center_freq):
        """Store ``center_freq`` and retune source channel 0 to it."""
        self.center_freq = center_freq
        self.osmosdr_source_0.set_center_freq(self.center_freq, 0)


def main(top_block_cls=multi_sf_lora_rx, options=None):
    """Build and run the flow graph until Enter is pressed or a signal arrives.

    Args:
        top_block_cls: Top-block class to instantiate (called with no
            arguments).
        options: Unused; kept so the signature stays compatible with
            existing callers.
    """
    tb = top_block_cls()

    def sig_handler(sig=None, frame=None):
        # Stop the flow graph cleanly before exiting on SIGINT/SIGTERM.
        tb.stop()
        tb.wait()

        sys.exit(0)

    signal.signal(signal.SIGINT, sig_handler)
    signal.signal(signal.SIGTERM, sig_handler)

    tb.start()
    tb.flowgraph_started.set()

    try:
        input('Press Enter to quit: ')
    except EOFError:
        # No stdin attached (e.g. run detached): fall through to shutdown.
        pass
    tb.stop()
    tb.wait()


if __name__ == '__main__':
    main()