#!/usr/bin/env python3 # -*- coding: utf-8 -*- # # SPDX-License-Identifier: GPL-3.0 # # GNU Radio Python Flow Graph # Title: LoRa Channel Scanner # Author: gr-mcp # Description: Scans US ISM 915MHz band for LoRa activity. Retune via XML-RPC. # GNU Radio version: 3.10.12.0 from gnuradio import blocks, gr from gnuradio import filter from gnuradio.filter import firdes from gnuradio import gr from gnuradio.fft import window import sys import signal from argparse import ArgumentParser from gnuradio.eng_arg import eng_float, intx from gnuradio import eng_notation from xmlrpc.server import SimpleXMLRPCServer import threading import gnuradio.lora_sdr as lora_sdr import osmosdr import time class lora_channel_scanner(gr.top_block): def __init__(self): gr.top_block.__init__(self, "LoRa Channel Scanner", catch_exceptions=True) self.flowgraph_started = threading.Event() ################################################## # Variables ################################################## self.samp_rate = samp_rate = 1000000 self.rf_gain = rf_gain = 40 self.lora_sf = lora_sf = 7 self.lora_cr = lora_cr = 1 self.lora_bw = lora_bw = 125000 self.channel_index = channel_index = 0 self.center_freq = center_freq = 915e6 ################################################## # Blocks ################################################## self.xmlrpc_server_0 = SimpleXMLRPCServer(('localhost', 8080), allow_none=True) self.xmlrpc_server_0.register_instance(self) self.xmlrpc_server_0_thread = threading.Thread(target=self.xmlrpc_server_0.serve_forever) self.xmlrpc_server_0_thread.daemon = True self.xmlrpc_server_0_thread.start() self.osmosdr_source_0 = osmosdr.source( args="numchan=" + str(1) + " " + 'rtl=0' ) self.osmosdr_source_0.set_time_unknown_pps(osmosdr.time_spec_t()) self.osmosdr_source_0.set_sample_rate(samp_rate) self.osmosdr_source_0.set_center_freq(center_freq, 0) self.osmosdr_source_0.set_freq_corr(0, 0) self.osmosdr_source_0.set_dc_offset_mode(2, 0) self.osmosdr_source_0.set_iq_balance_mode(2, 0) self.osmosdr_source_0.set_gain_mode(False, 0) self.osmosdr_source_0.set_gain(rf_gain, 0) self.osmosdr_source_0.set_if_gain(20, 0) self.osmosdr_source_0.set_bb_gain(20, 0) self.osmosdr_source_0.set_antenna('', 0) self.osmosdr_source_0.set_bandwidth(0, 0) self.low_pass_filter_0 = filter.fir_filter_ccf( 2, firdes.low_pass( 1, samp_rate, 200e3, 50e3, window.WIN_HAMMING, 6.76)) self.lora_rx_0 = lora_sdr.lora_sdr_lora_rx( bw=lora_bw, cr=1, has_crc=True, impl_head=False, pay_len=255, samp_rate=(int(samp_rate/2)), sf=lora_sf, sync_word=[0x12], soft_decoding=True, ldro_mode=2, print_rx=[True,True]) self.blocks_message_debug_0 = blocks.message_debug(True, gr.log_levels.info) ################################################## # Connections ################################################## self.msg_connect((self.lora_rx_0, 'out'), (self.blocks_message_debug_0, 'print')) self.connect((self.low_pass_filter_0, 0), (self.lora_rx_0, 0)) self.connect((self.osmosdr_source_0, 0), (self.low_pass_filter_0, 0)) def get_samp_rate(self): return self.samp_rate def set_samp_rate(self, samp_rate): self.samp_rate = samp_rate self.low_pass_filter_0.set_taps(firdes.low_pass(1, self.samp_rate, 200e3, 50e3, window.WIN_HAMMING, 6.76)) self.osmosdr_source_0.set_sample_rate(self.samp_rate) def get_rf_gain(self): return self.rf_gain def set_rf_gain(self, rf_gain): self.rf_gain = rf_gain self.osmosdr_source_0.set_gain(self.rf_gain, 0) def get_lora_sf(self): return self.lora_sf def set_lora_sf(self, lora_sf): self.lora_sf = lora_sf def get_lora_cr(self): return self.lora_cr def set_lora_cr(self, lora_cr): self.lora_cr = lora_cr def get_lora_bw(self): return self.lora_bw def set_lora_bw(self, lora_bw): self.lora_bw = lora_bw def get_channel_index(self): return self.channel_index def set_channel_index(self, channel_index): self.channel_index = channel_index def get_center_freq(self): return self.center_freq def set_center_freq(self, center_freq): self.center_freq = center_freq self.osmosdr_source_0.set_center_freq(self.center_freq, 0) def main(top_block_cls=lora_channel_scanner, options=None): tb = top_block_cls() def sig_handler(sig=None, frame=None): tb.stop() tb.wait() sys.exit(0) signal.signal(signal.SIGINT, sig_handler) signal.signal(signal.SIGTERM, sig_handler) tb.start() tb.flowgraph_started.set() try: input('Press Enter to quit: ') except EOFError: pass tb.stop() tb.wait() if __name__ == '__main__': main()