gr-mcp/examples/lora_quality_analyzer.py
Ryan Malloy 8800d35fd4 add Starlight docs site, LoRa examples, and clean up .gitignore
- Starlight docs: 28 pages covering getting started, guides, tool
  reference, concepts (architecture, dynamic tools, runtime comms)
- LoRa examples: channel scanner, quality analyzer, multi-SF receiver
  with both .grc and .py forms, plus ADSB+LoRa combo test
- .gitignore: exclude generated artifacts (*_patched_*.py, *.wav,
  docs build cache, tests/scratch/)
- Add .mcp.json for local MCP server config
- Sync uv.lock with date-based version
2026-02-24 09:34:50 -07:00

210 lines
7.4 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# SPDX-License-Identifier: GPL-3.0
#
# GNU Radio Python Flow Graph
# Title: LoRa Signal Quality Analyzer
# Author: gr-mcp
# Description: LoRa decoder with IQ recording and real-time signal power measurement.
# GNU Radio version: 3.10.12.0
import functools
import signal
import sys
import threading
import time
from argparse import ArgumentParser
from xmlrpc.server import SimpleXMLRPCServer

import osmosdr
from gnuradio import blocks
from gnuradio import blocks, gr
from gnuradio import filter
from gnuradio.filter import firdes
from gnuradio import gr
from gnuradio.fft import window
from gnuradio.eng_arg import eng_float, intx
from gnuradio import eng_notation
import gnuradio.lora_sdr as lora_sdr
class lora_quality_analyzer(gr.top_block):
    """LoRa receiver flow graph with IQ capture and a polled power estimate.

    Signal path, as wired in ``__init__``: the osmosdr source feeds a
    decimate-by-2 low-pass filter whose output fans out to

    * the ``lora_sdr`` receiver, whose ``out`` message port is connected to
      a message-debug block's ``print`` port;
    * a selector that routes samples to a file sink (output 0) or to a null
      sink (output 1);
    * a magnitude-squared -> moving-average -> ``nlog10_ff(10, 1, 0)`` chain
      ending in a probe, which a daemon thread polls twice per second and
      stores in ``signal_power_db``.

    The instance is registered with an XML-RPC server bound to
    ``localhost:8080``; the ``get_*``/``set_*`` methods below are the
    intended remote control surface.
    """

    def __init__(self):
        gr.top_block.__init__(self, "LoRa Signal Quality Analyzer", catch_exceptions=True)
        # The probe thread blocks on this event; the launcher is expected to
        # set it once the flow graph has been started.
        self.flowgraph_started = threading.Event()

        ##################################################
        # Variables
        ##################################################
        self.signal_power_db = signal_power_db = 0
        self.samp_rate = samp_rate = 1000000
        self.rf_gain = rf_gain = 40
        # Selector output index: 0 -> file sink, 1 -> null sink.
        self.recording_selector = recording_selector = 1
        self.lora_sf = lora_sf = 7
        self.lora_bw = lora_bw = 125000
        self.iq_file = iq_file = "/tmp/iq_capture.cf32"
        self.center_freq = center_freq = 915e6

        ##################################################
        # Blocks
        ##################################################
        self.blocks_probe_signal_x_0 = blocks.probe_signal_f()

        # Create and register the XML-RPC server up front so a port conflict
        # surfaces before the remaining blocks are constructed, but do NOT
        # start serving yet: the setters dereference blocks that are only
        # created further down, and an early RPC would leave the stored
        # variable and the block configuration out of sync.
        self.xmlrpc_server_0 = SimpleXMLRPCServer(('localhost', 8080), allow_none=True)
        self.xmlrpc_server_0.register_instance(self)
        self.xmlrpc_server_0_thread = threading.Thread(target=self.xmlrpc_server_0.serve_forever)
        self.xmlrpc_server_0_thread.daemon = True

        def _signal_power_db_probe():
            # Poll the probe block at 2 Hz once the flow graph is running.
            self.flowgraph_started.wait()
            while True:
                val = self.blocks_probe_signal_x_0.level()
                try:
                    try:
                        # Only taken when the instance has a ``doc`` attribute
                        # offering add_next_tick_callback; this class does not
                        # define one, so the fallback below normally runs.
                        self.doc.add_next_tick_callback(
                            functools.partial(self.set_signal_power_db, val))
                    except AttributeError:
                        self.set_signal_power_db(val)
                except AttributeError:
                    pass
                time.sleep(1.0 / (2))

        _signal_power_db_thread = threading.Thread(target=_signal_power_db_probe)
        _signal_power_db_thread.daemon = True
        _signal_power_db_thread.start()

        self.osmosdr_source_0 = osmosdr.source(
            args="numchan=" + str(1) + " " + 'rtl=0'
        )
        self.osmosdr_source_0.set_time_unknown_pps(osmosdr.time_spec_t())
        self.osmosdr_source_0.set_sample_rate(samp_rate)
        self.osmosdr_source_0.set_center_freq(center_freq, 0)
        self.osmosdr_source_0.set_freq_corr(0, 0)
        self.osmosdr_source_0.set_dc_offset_mode(2, 0)
        self.osmosdr_source_0.set_iq_balance_mode(2, 0)
        self.osmosdr_source_0.set_gain_mode(False, 0)
        self.osmosdr_source_0.set_gain(rf_gain, 0)
        self.osmosdr_source_0.set_if_gain(20, 0)
        self.osmosdr_source_0.set_bb_gain(20, 0)
        self.osmosdr_source_0.set_antenna('', 0)
        self.osmosdr_source_0.set_bandwidth(0, 0)

        # Decimate by 2; the LoRa receiver below is told samp_rate / 2 to match.
        self.low_pass_filter_0 = filter.fir_filter_ccf(
            2,
            firdes.low_pass(
                1,
                samp_rate,
                200e3,
                50e3,
                window.WIN_HAMMING,
                6.76))
        self.lora_rx_0 = lora_sdr.lora_sdr_lora_rx(
            bw=lora_bw,
            cr=1,
            has_crc=True,
            impl_head=False,
            pay_len=255,
            samp_rate=(int(samp_rate/2)),
            sf=lora_sf,
            sync_word=[0x12],
            soft_decoding=True,
            ldro_mode=2,
            print_rx=[True,True])
        self.blocks_selector_0 = blocks.selector(gr.sizeof_gr_complex*1,0,recording_selector)
        self.blocks_selector_0.set_enabled(True)
        self.blocks_null_sink_0 = blocks.null_sink(gr.sizeof_gr_complex*1)
        self.blocks_nlog10_ff_0 = blocks.nlog10_ff(10, 1, 0)
        self.blocks_moving_average_xx_0 = blocks.moving_average_ff(10000, (1.0/10000), 4000, 1)
        self.blocks_message_debug_0 = blocks.message_debug(True, gr.log_levels.info)
        self.blocks_file_sink_0 = blocks.file_sink(gr.sizeof_gr_complex*1, iq_file, False)
        self.blocks_file_sink_0.set_unbuffered(False)
        self.blocks_complex_to_mag_squared_0 = blocks.complex_to_mag_squared(1)

        ##################################################
        # Connections
        ##################################################
        self.msg_connect((self.lora_rx_0, 'out'), (self.blocks_message_debug_0, 'print'))
        self.connect((self.blocks_complex_to_mag_squared_0, 0), (self.blocks_moving_average_xx_0, 0))
        self.connect((self.blocks_moving_average_xx_0, 0), (self.blocks_nlog10_ff_0, 0))
        self.connect((self.blocks_nlog10_ff_0, 0), (self.blocks_probe_signal_x_0, 0))
        self.connect((self.blocks_selector_0, 0), (self.blocks_file_sink_0, 0))
        self.connect((self.blocks_selector_0, 1), (self.blocks_null_sink_0, 0))
        self.connect((self.low_pass_filter_0, 0), (self.blocks_complex_to_mag_squared_0, 0))
        self.connect((self.low_pass_filter_0, 0), (self.blocks_selector_0, 0))
        self.connect((self.low_pass_filter_0, 0), (self.lora_rx_0, 0))
        self.connect((self.osmosdr_source_0, 0), (self.low_pass_filter_0, 0))

        # Every block a setter touches now exists; begin answering RPCs.
        self.xmlrpc_server_0_thread.start()

    def get_signal_power_db(self):
        """Return the most recent value stored by the probe thread."""
        return self.signal_power_db

    def set_signal_power_db(self, signal_power_db):
        """Store a new power reading (called by the probe thread)."""
        self.signal_power_db = signal_power_db

    def get_samp_rate(self):
        """Return the stored source sample rate."""
        return self.samp_rate

    def set_samp_rate(self, samp_rate):
        """Store the sample rate, rebuild the low-pass taps and retune the source.

        The LoRa receiver is not touched here; it keeps the ``samp_rate``
        it was constructed with.
        """
        self.samp_rate = samp_rate
        self.low_pass_filter_0.set_taps(firdes.low_pass(1, self.samp_rate, 200e3, 50e3, window.WIN_HAMMING, 6.76))
        self.osmosdr_source_0.set_sample_rate(self.samp_rate)

    def get_rf_gain(self):
        """Return the stored RF gain."""
        return self.rf_gain

    def set_rf_gain(self, rf_gain):
        """Store the RF gain and apply it to channel 0 of the source."""
        self.rf_gain = rf_gain
        self.osmosdr_source_0.set_gain(self.rf_gain, 0)

    def get_recording_selector(self):
        """Return the selector output index (0 = file sink, 1 = null sink)."""
        return self.recording_selector

    def set_recording_selector(self, recording_selector):
        """Route filtered samples to the file sink (0) or the null sink (1)."""
        self.recording_selector = recording_selector
        self.blocks_selector_0.set_output_index(self.recording_selector)

    def get_lora_sf(self):
        """Return the stored spreading factor."""
        return self.lora_sf

    def set_lora_sf(self, lora_sf):
        """Store the spreading factor; no block is reconfigured by this call."""
        self.lora_sf = lora_sf

    def get_lora_bw(self):
        """Return the stored LoRa bandwidth."""
        return self.lora_bw

    def set_lora_bw(self, lora_bw):
        """Store the LoRa bandwidth; no block is reconfigured by this call."""
        self.lora_bw = lora_bw

    def get_iq_file(self):
        """Return the stored IQ capture path."""
        return self.iq_file

    def set_iq_file(self, iq_file):
        """Store the capture path and ask the file sink to open it."""
        self.iq_file = iq_file
        self.blocks_file_sink_0.open(self.iq_file)

    def get_center_freq(self):
        """Return the stored centre frequency."""
        return self.center_freq

    def set_center_freq(self, center_freq):
        """Store the centre frequency and retune channel 0 of the source."""
        self.center_freq = center_freq
        self.osmosdr_source_0.set_center_freq(self.center_freq, 0)
def main(top_block_cls=lora_quality_analyzer, options=None):
    """Build and run the flow graph until the operator or a signal ends it.

    Instantiates ``top_block_cls``, installs SIGINT/SIGTERM handlers that
    stop the graph and exit with status 0, starts the graph, sets its
    ``flowgraph_started`` event, and then blocks on an Enter prompt. The
    ``options`` argument is accepted but not used.
    """
    tb = top_block_cls()

    def _halt():
        # Shared shutdown sequence: request stop, then wait for it.
        tb.stop()
        tb.wait()

    def sig_handler(sig=None, frame=None):
        _halt()
        sys.exit(0)

    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, sig_handler)

    tb.start()
    # Release anything waiting for the graph to be running.
    tb.flowgraph_started.set()

    try:
        input('Press Enter to quit: ')
    except EOFError:
        # No more input available; fall through to the normal shutdown.
        pass

    _halt()


if __name__ == '__main__':
    main()