gr-mcp/examples/lora_rx_mcp.py
Ryan Malloy 61471b7280 fix: tune lora_rx_mcp for Docker runtime compatibility
Adjust RTL-SDR bandwidth, IF gain, DC offset and IQ balance modes
for better performance. Use compatible message_debug constructor.
2026-01-31 10:01:48 -07:00

146 lines
4.6 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# SPDX-License-Identifier: GPL-3.0
#
# GNU Radio Python Flow Graph
# Title: LoRa SDR Receiver (MCP-built)
# Author: gr-mcp
# Description: LoRa receiver built entirely through MCP tools. Matches Heltec V3 beacon: 915MHz, SF7, BW125k.
# GNU Radio version: 3.10.12.0
from gnuradio import blocks, gr
from gnuradio import filter
from gnuradio.filter import firdes
from gnuradio import gr
from gnuradio.fft import window
import sys
import signal
from argparse import ArgumentParser
from gnuradio.eng_arg import eng_float, intx
from gnuradio import eng_notation
from xmlrpc.server import SimpleXMLRPCServer
import threading
import gnuradio.lora_sdr as lora_sdr
import osmosdr
import time
class lora_rx_mcp(gr.top_block):
    """Top block for a LoRa receiver fed by an osmosdr source.

    Signal path built in ``__init__``::

        osmosdr.source -> filter.fir_filter_ccf -> lora_sdr.lora_sdr_lora_rx

    The FIR filter is created with a first argument of 2 (its decimation
    factor) and low-pass taps; the receiver is accordingly given half of
    ``samp_rate``. The receiver's ``out`` message port is connected to the
    ``print`` port of a ``blocks.message_debug``.

    Constructing the block also starts a ``SimpleXMLRPCServer`` bound to
    ``('localhost', 8080)`` in a daemon thread with this instance
    registered, so its public methods become callable over XML-RPC.
    """

    def __init__(self):
        """Create the variables, the XML-RPC server and all blocks, then wire them."""
        gr.top_block.__init__(self, "LoRa SDR Receiver (MCP-built)", catch_exceptions=True)
        # Created unset; this class never sets it itself.
        self.flowgraph_started = threading.Event()

        ##################################################
        # Variables
        ##################################################
        samp_rate = 1000000
        self.samp_rate = samp_rate
        lora_sf = 7
        self.lora_sf = lora_sf
        lora_bw = 125000
        self.lora_bw = lora_bw
        center_freq = 915e6
        self.center_freq = center_freq

        ##################################################
        # Blocks
        ##################################################
        # The XML-RPC server begins serving before the blocks below exist.
        rpc_server = SimpleXMLRPCServer(('localhost', 8080), allow_none=True)
        self.xmlrpc_server_0 = rpc_server
        rpc_server.register_instance(self)
        rpc_thread = threading.Thread(target=rpc_server.serve_forever, daemon=True)
        self.xmlrpc_server_0_thread = rpc_thread
        rpc_thread.start()

        source_args = "numchan=" + str(1) + " " + 'rtl=0'
        sdr = osmosdr.source(args=source_args)
        self.osmosdr_source_0 = sdr
        # NOTE(review): the trailing 0 in the calls below is presumably the
        # channel index -- confirm against the gr-osmosdr API.
        sdr.set_time_unknown_pps(osmosdr.time_spec_t())
        sdr.set_sample_rate(samp_rate)
        sdr.set_center_freq(center_freq, 0)
        sdr.set_freq_corr(0, 0)
        # NOTE(review): mode 2 is presumably "automatic" for both DC offset
        # and IQ balance -- confirm against the gr-osmosdr API.
        sdr.set_dc_offset_mode(2, 0)
        sdr.set_iq_balance_mode(2, 0)
        sdr.set_gain_mode(False, 0)
        sdr.set_gain(40, 0)
        sdr.set_if_gain(20, 0)
        sdr.set_bb_gain(20, 0)
        sdr.set_antenna('', 0)
        sdr.set_bandwidth(0, 0)

        initial_taps = firdes.low_pass(
            1,
            samp_rate,
            200e3,
            50e3,
            window.WIN_HAMMING,
            6.76,
        )
        self.low_pass_filter_0 = filter.fir_filter_ccf(2, initial_taps)

        # The receiver gets half the source rate, matching the filter's
        # first argument of 2.
        self.lora_rx_0 = lora_sdr.lora_sdr_lora_rx(
            bw=lora_bw,
            cr=1,
            has_crc=True,
            impl_head=False,
            pay_len=255,
            samp_rate=(int(samp_rate / 2)),
            sf=lora_sf,
            sync_word=[0x12],
            soft_decoding=True,
            ldro_mode=2,
            print_rx=[True, True],
        )
        self.blocks_message_debug_0 = blocks.message_debug(True, gr.log_levels.info)

        ##################################################
        # Connections
        ##################################################
        self.msg_connect((self.lora_rx_0, 'out'), (self.blocks_message_debug_0, 'print'))
        self.connect((self.low_pass_filter_0, 0), (self.lora_rx_0, 0))
        self.connect((self.osmosdr_source_0, 0), (self.low_pass_filter_0, 0))

    def get_samp_rate(self):
        """Return the stored sample rate."""
        return self.samp_rate

    def set_samp_rate(self, samp_rate):
        """Store ``samp_rate``, then recompute the filter taps and retune the source rate.

        The LoRa receiver block is not reconfigured by this method.
        """
        self.samp_rate = samp_rate
        new_taps = firdes.low_pass(1, self.samp_rate, 200e3, 50e3, window.WIN_HAMMING, 6.76)
        self.low_pass_filter_0.set_taps(new_taps)
        self.osmosdr_source_0.set_sample_rate(self.samp_rate)

    def get_lora_sf(self):
        """Return the stored spreading factor."""
        return self.lora_sf

    def set_lora_sf(self, lora_sf):
        """Store ``lora_sf``; no block is updated by this method."""
        self.lora_sf = lora_sf

    def get_lora_bw(self):
        """Return the stored LoRa bandwidth."""
        return self.lora_bw

    def set_lora_bw(self, lora_bw):
        """Store ``lora_bw``; no block is updated by this method."""
        self.lora_bw = lora_bw

    def get_center_freq(self):
        """Return the stored center frequency."""
        return self.center_freq

    def set_center_freq(self, center_freq):
        """Store ``center_freq`` and apply it to the osmosdr source."""
        self.center_freq = center_freq
        self.osmosdr_source_0.set_center_freq(self.center_freq, 0)
def main(top_block_cls=lora_rx_mcp, options=None):
    """Run the flow graph until Enter is pressed or SIGINT/SIGTERM arrives.

    Args:
        top_block_cls: Called with no arguments to build the top block.
        options: Accepted but not used by this function.

    If standard input reaches end-of-file, the prompt raises ``EOFError``,
    which is ignored, so the flow graph is stopped immediately afterwards.
    """
    tb = top_block_cls()

    def _halt():
        # Stop the flow graph and wait for it to finish.
        tb.stop()
        tb.wait()

    def _on_signal(sig=None, frame=None):
        _halt()
        sys.exit(0)

    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, _on_signal)

    tb.start()
    # Signal that start() has returned.
    tb.flowgraph_started.set()

    try:
        input('Press Enter to quit: ')
    except EOFError:
        # No usable stdin: fall through to shutdown.
        pass

    _halt()
# Script entry point: run the receiver only when executed directly.
if __name__ == "__main__":
    main()