gr-mcp/examples/lora_channel_scanner.py
Ryan Malloy 8800d35fd4 add Starlight docs site, LoRa examples, and clean up .gitignore
- Starlight docs: 28 pages covering getting started, guides, tool
  reference, concepts (architecture, dynamic tools, runtime comms)
- LoRa examples: channel scanner, quality analyzer, multi-SF receiver
  with both .grc and .py forms, plus ADSB+LoRa combo test
- .gitignore: exclude generated artifacts (*_patched_*.py, *.wav,
  docs build cache, tests/scratch/)
- Add .mcp.json for local MCP server config
- Sync uv.lock with date-based version
2026-02-24 09:34:50 -07:00

168 lines
5.1 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# SPDX-License-Identifier: GPL-3.0
#
# GNU Radio Python Flow Graph
# Title: LoRa Channel Scanner
# Author: gr-mcp
# Description: Scans US ISM 915MHz band for LoRa activity. Retune via XML-RPC.
# GNU Radio version: 3.10.12.0
from gnuradio import blocks, gr
from gnuradio import filter
from gnuradio.filter import firdes
from gnuradio import gr
from gnuradio.fft import window
import sys
import signal
from argparse import ArgumentParser
from gnuradio.eng_arg import eng_float, intx
from gnuradio import eng_notation
from xmlrpc.server import SimpleXMLRPCServer
import threading
import gnuradio.lora_sdr as lora_sdr
import osmosdr
import time
class lora_channel_scanner(gr.top_block):
def __init__(self):
gr.top_block.__init__(self, "LoRa Channel Scanner", catch_exceptions=True)
self.flowgraph_started = threading.Event()
##################################################
# Variables
##################################################
self.samp_rate = samp_rate = 1000000
self.rf_gain = rf_gain = 40
self.lora_sf = lora_sf = 7
self.lora_cr = lora_cr = 1
self.lora_bw = lora_bw = 125000
self.channel_index = channel_index = 0
self.center_freq = center_freq = 915e6
##################################################
# Blocks
##################################################
self.xmlrpc_server_0 = SimpleXMLRPCServer(('localhost', 8080), allow_none=True)
self.xmlrpc_server_0.register_instance(self)
self.xmlrpc_server_0_thread = threading.Thread(target=self.xmlrpc_server_0.serve_forever)
self.xmlrpc_server_0_thread.daemon = True
self.xmlrpc_server_0_thread.start()
self.osmosdr_source_0 = osmosdr.source(
args="numchan=" + str(1) + " " + 'rtl=0'
)
self.osmosdr_source_0.set_time_unknown_pps(osmosdr.time_spec_t())
self.osmosdr_source_0.set_sample_rate(samp_rate)
self.osmosdr_source_0.set_center_freq(center_freq, 0)
self.osmosdr_source_0.set_freq_corr(0, 0)
self.osmosdr_source_0.set_dc_offset_mode(2, 0)
self.osmosdr_source_0.set_iq_balance_mode(2, 0)
self.osmosdr_source_0.set_gain_mode(False, 0)
self.osmosdr_source_0.set_gain(rf_gain, 0)
self.osmosdr_source_0.set_if_gain(20, 0)
self.osmosdr_source_0.set_bb_gain(20, 0)
self.osmosdr_source_0.set_antenna('', 0)
self.osmosdr_source_0.set_bandwidth(0, 0)
self.low_pass_filter_0 = filter.fir_filter_ccf(
2,
firdes.low_pass(
1,
samp_rate,
200e3,
50e3,
window.WIN_HAMMING,
6.76))
self.lora_rx_0 = lora_sdr.lora_sdr_lora_rx( bw=lora_bw, cr=1, has_crc=True, impl_head=False, pay_len=255, samp_rate=(int(samp_rate/2)), sf=lora_sf, sync_word=[0x12], soft_decoding=True, ldro_mode=2, print_rx=[True,True])
self.blocks_message_debug_0 = blocks.message_debug(True, gr.log_levels.info)
##################################################
# Connections
##################################################
self.msg_connect((self.lora_rx_0, 'out'), (self.blocks_message_debug_0, 'print'))
self.connect((self.low_pass_filter_0, 0), (self.lora_rx_0, 0))
self.connect((self.osmosdr_source_0, 0), (self.low_pass_filter_0, 0))
def get_samp_rate(self):
return self.samp_rate
def set_samp_rate(self, samp_rate):
self.samp_rate = samp_rate
self.low_pass_filter_0.set_taps(firdes.low_pass(1, self.samp_rate, 200e3, 50e3, window.WIN_HAMMING, 6.76))
self.osmosdr_source_0.set_sample_rate(self.samp_rate)
def get_rf_gain(self):
return self.rf_gain
def set_rf_gain(self, rf_gain):
self.rf_gain = rf_gain
self.osmosdr_source_0.set_gain(self.rf_gain, 0)
def get_lora_sf(self):
return self.lora_sf
def set_lora_sf(self, lora_sf):
self.lora_sf = lora_sf
def get_lora_cr(self):
return self.lora_cr
def set_lora_cr(self, lora_cr):
self.lora_cr = lora_cr
def get_lora_bw(self):
return self.lora_bw
def set_lora_bw(self, lora_bw):
self.lora_bw = lora_bw
def get_channel_index(self):
return self.channel_index
def set_channel_index(self, channel_index):
self.channel_index = channel_index
def get_center_freq(self):
return self.center_freq
def set_center_freq(self, center_freq):
self.center_freq = center_freq
self.osmosdr_source_0.set_center_freq(self.center_freq, 0)
def main(top_block_cls=lora_channel_scanner, options=None):
tb = top_block_cls()
def sig_handler(sig=None, frame=None):
tb.stop()
tb.wait()
sys.exit(0)
signal.signal(signal.SIGINT, sig_handler)
signal.signal(signal.SIGTERM, sig_handler)
tb.start()
tb.flowgraph_started.set()
try:
input('Press Enter to quit: ')
except EOFError:
pass
tb.stop()
tb.wait()
if __name__ == '__main__':
main()