#!/usr/bin/env python3
# -*- coding: utf-8 -*-

#
# SPDX-License-Identifier: GPL-3.0
#
# GNU Radio Python Flow Graph
# Title: LoRa Signal Quality Analyzer
# Author: gr-mcp
# Description: LoRa decoder with IQ recording and real-time signal power measurement.
# GNU Radio version: 3.10.12.0

from gnuradio import blocks
from gnuradio import blocks, gr
from gnuradio import filter
from gnuradio.filter import firdes
from gnuradio import gr
from gnuradio.fft import window
# functools is referenced by the signal-power probe thread (functools.partial).
import functools
import sys
import signal
from argparse import ArgumentParser
from gnuradio.eng_arg import eng_float, intx
from gnuradio import eng_notation
from xmlrpc.server import SimpleXMLRPCServer
import threading
import gnuradio.lora_sdr as lora_sdr
import osmosdr
import time


class lora_quality_analyzer(gr.top_block):
    """Flow graph that decodes LoRa frames and reports a signal power estimate.

    Samples from an osmosdr source pass through a low-pass FIR filter and are
    then fanned out to three consumers:

    * ``lora_rx_0``, whose decoded messages are printed by a message debug
      block;
    * a power chain (``complex_to_mag_squared`` -> ``moving_average_ff`` ->
      ``nlog10_ff`` -> ``probe_signal_f``) whose latest value is polled into
      ``signal_power_db`` by a background thread;
    * a selector whose output 0 feeds a file sink (``iq_file``) and whose
      output 1 feeds a null sink. ``recording_selector`` picks the active
      output.

    The instance registers itself with an XML-RPC server on localhost:8080,
    which is served from a daemon thread started in ``__init__``.
    """

    def __init__(self):
        """Create all blocks, start the helper threads and wire the graph."""
        gr.top_block.__init__(self, "LoRa Signal Quality Analyzer", catch_exceptions=True)
        # Set by main() after tb.start(); the probe thread waits on it so that
        # it does not poll before the flow graph is running.
        self.flowgraph_started = threading.Event()

        ##################################################
        # Variables
        ##################################################
        self.signal_power_db = signal_power_db = 0
        self.samp_rate = samp_rate = 1000000
        self.rf_gain = rf_gain = 40
        # Selector output index: 0 -> file sink, 1 -> null sink (see the
        # connections below).
        self.recording_selector = recording_selector = 1
        self.lora_sf = lora_sf = 7
        self.lora_bw = lora_bw = 125000
        self.iq_file = iq_file = "/tmp/iq_capture.cf32"
        self.center_freq = center_freq = 915e6

        ##################################################
        # Blocks
        ##################################################

        self.blocks_probe_signal_x_0 = blocks.probe_signal_f()

        # NOTE(review): register_instance(self) hands the whole top block to
        # the XML-RPC server, not just the get_*/set_* accessors. The server
        # binds to localhost only.
        self.xmlrpc_server_0 = SimpleXMLRPCServer(('localhost', 8080), allow_none=True)
        self.xmlrpc_server_0.register_instance(self)
        self.xmlrpc_server_0_thread = threading.Thread(target=self.xmlrpc_server_0.serve_forever)
        self.xmlrpc_server_0_thread.daemon = True
        self.xmlrpc_server_0_thread.start()

        def _signal_power_db_probe():
            """Copy the probe block's level into signal_power_db twice a second."""
            self.flowgraph_started.wait()
            while True:
                val = self.blocks_probe_signal_x_0.level()
                try:
                    try:
                        # Taken only when the instance has a `doc` attribute;
                        # nothing in this file sets one, so the fallback in
                        # the except branch is the path normally used.
                        self.doc.add_next_tick_callback(functools.partial(self.set_signal_power_db,val))
                    except AttributeError:
                        self.set_signal_power_db(val)
                except AttributeError:
                    pass
                time.sleep(1.0 / (2))

        _signal_power_db_thread = threading.Thread(target=_signal_power_db_probe)
        _signal_power_db_thread.daemon = True
        _signal_power_db_thread.start()

        self.osmosdr_source_0 = osmosdr.source(
            args="numchan=" + str(1) + " " + 'rtl=0'
        )
        self.osmosdr_source_0.set_time_unknown_pps(osmosdr.time_spec_t())
        self.osmosdr_source_0.set_sample_rate(samp_rate)
        self.osmosdr_source_0.set_center_freq(center_freq, 0)
        self.osmosdr_source_0.set_freq_corr(0, 0)
        self.osmosdr_source_0.set_dc_offset_mode(2, 0)
        self.osmosdr_source_0.set_iq_balance_mode(2, 0)
        self.osmosdr_source_0.set_gain_mode(False, 0)
        self.osmosdr_source_0.set_gain(rf_gain, 0)
        self.osmosdr_source_0.set_if_gain(20, 0)
        self.osmosdr_source_0.set_bb_gain(20, 0)
        self.osmosdr_source_0.set_antenna('', 0)
        self.osmosdr_source_0.set_bandwidth(0, 0)

        # First argument is the decimation factor; lora_rx_0 below is given
        # int(samp_rate/2) to match.
        self.low_pass_filter_0 = filter.fir_filter_ccf(
            2,
            firdes.low_pass(
                1,
                samp_rate,
                200e3,
                50e3,
                window.WIN_HAMMING,
                6.76))

        self.lora_rx_0 = lora_sdr.lora_sdr_lora_rx(
            bw=lora_bw,
            cr=1,
            has_crc=True,
            impl_head=False,
            pay_len=255,
            samp_rate=(int(samp_rate/2)),
            sf=lora_sf,
            sync_word=[0x12],
            soft_decoding=True,
            ldro_mode=2,
            print_rx=[True,True])

        self.blocks_selector_0 = blocks.selector(gr.sizeof_gr_complex*1,0,recording_selector)
        self.blocks_selector_0.set_enabled(True)
        self.blocks_null_sink_0 = blocks.null_sink(gr.sizeof_gr_complex*1)
        self.blocks_nlog10_ff_0 = blocks.nlog10_ff(10, 1, 0)
        self.blocks_moving_average_xx_0 = blocks.moving_average_ff(10000, (1.0/10000), 4000, 1)
        self.blocks_message_debug_0 = blocks.message_debug(True, gr.log_levels.info)
        self.blocks_file_sink_0 = blocks.file_sink(gr.sizeof_gr_complex*1, iq_file, False)
        self.blocks_file_sink_0.set_unbuffered(False)
        self.blocks_complex_to_mag_squared_0 = blocks.complex_to_mag_squared(1)

        ##################################################
        # Connections
        ##################################################
        self.msg_connect((self.lora_rx_0, 'out'), (self.blocks_message_debug_0, 'print'))
        self.connect((self.blocks_complex_to_mag_squared_0, 0), (self.blocks_moving_average_xx_0, 0))
        self.connect((self.blocks_moving_average_xx_0, 0), (self.blocks_nlog10_ff_0, 0))
        self.connect((self.blocks_nlog10_ff_0, 0), (self.blocks_probe_signal_x_0, 0))
        self.connect((self.blocks_selector_0, 0), (self.blocks_file_sink_0, 0))
        self.connect((self.blocks_selector_0, 1), (self.blocks_null_sink_0, 0))
        self.connect((self.low_pass_filter_0, 0), (self.blocks_complex_to_mag_squared_0, 0))
        self.connect((self.low_pass_filter_0, 0), (self.blocks_selector_0, 0))
        self.connect((self.low_pass_filter_0, 0), (self.lora_rx_0, 0))
        self.connect((self.osmosdr_source_0, 0), (self.low_pass_filter_0, 0))

    def get_signal_power_db(self):
        """Return the value most recently stored by set_signal_power_db."""
        return self.signal_power_db

    def set_signal_power_db(self, signal_power_db):
        """Store a new signal power reading (called by the probe thread)."""
        self.signal_power_db = signal_power_db

    def get_samp_rate(self):
        """Return the stored source sample rate."""
        return self.samp_rate

    def set_samp_rate(self, samp_rate):
        """Store the sample rate, redesign the low-pass taps and retune the source.

        NOTE(review): lora_rx_0 was constructed with int(samp_rate/2) and is
        not updated here.
        """
        self.samp_rate = samp_rate
        self.low_pass_filter_0.set_taps(firdes.low_pass(1, self.samp_rate, 200e3, 50e3, window.WIN_HAMMING, 6.76))
        self.osmosdr_source_0.set_sample_rate(self.samp_rate)

    def get_rf_gain(self):
        """Return the stored RF gain."""
        return self.rf_gain

    def set_rf_gain(self, rf_gain):
        """Store the RF gain and apply it to channel 0 of the source."""
        self.rf_gain = rf_gain
        self.osmosdr_source_0.set_gain(self.rf_gain, 0)

    def get_recording_selector(self):
        """Return the stored selector output index."""
        return self.recording_selector

    def set_recording_selector(self, recording_selector):
        """Store the selector output index and switch the selector to it.

        Output 0 is connected to the file sink and output 1 to the null sink.
        """
        self.recording_selector = recording_selector
        self.blocks_selector_0.set_output_index(self.recording_selector)

    def get_lora_sf(self):
        """Return the stored LoRa spreading factor."""
        return self.lora_sf

    def set_lora_sf(self, lora_sf):
        """Store the spreading factor; lora_rx_0 is not reconfigured."""
        self.lora_sf = lora_sf

    def get_lora_bw(self):
        """Return the stored LoRa bandwidth."""
        return self.lora_bw

    def set_lora_bw(self, lora_bw):
        """Store the bandwidth; lora_rx_0 is not reconfigured."""
        self.lora_bw = lora_bw

    def get_iq_file(self):
        """Return the stored IQ capture file path."""
        return self.iq_file

    def set_iq_file(self, iq_file):
        """Store the capture path and have the file sink open it."""
        self.iq_file = iq_file
        self.blocks_file_sink_0.open(self.iq_file)

    def get_center_freq(self):
        """Return the stored center frequency."""
        return self.center_freq

    def set_center_freq(self, center_freq):
        """Store the center frequency and tune channel 0 of the source to it."""
        self.center_freq = center_freq
        self.osmosdr_source_0.set_center_freq(self.center_freq, 0)


def main(top_block_cls=lora_quality_analyzer, options=None):
    """Run the flow graph until Enter is pressed or SIGINT/SIGTERM arrives.

    ``options`` is accepted but not used.
    """
    tb = top_block_cls()

    def sig_handler(sig=None, frame=None):
        """Stop the flow graph, wait for it to finish and exit with status 0."""
        tb.stop()
        tb.wait()

        sys.exit(0)

    signal.signal(signal.SIGINT, sig_handler)
    signal.signal(signal.SIGTERM, sig_handler)

    tb.start()
    # Release the probe thread now that the flow graph is running.
    tb.flowgraph_started.set()

    try:
        input('Press Enter to quit: ')
    except EOFError:
        # No stdin available; fall through to shutdown.
        pass
    tb.stop()
    tb.wait()


if __name__ == '__main__':
    main()