GNU Radio Out-of-Tree module providing: - Complete TX chain: PHYEncode → FrameGen → CSSMod - Complete RX chain: CSSDemod → FrameSync → PHYDecode - NETWORKID extraction/encoding (0-255 range) - All SF (7-12) and CR (4/5-4/8) combinations - Loopback tested with 24/24 configurations passing Key features: - Fractional SFD (2.25 downchirp) handling - Gray encode/decode with proper inverse operations - gr-lora_sdr compatible decode modes - GRC block definitions and example flowgraphs - Comprehensive documentation Discovered RYLR998 sync word mapping: sync_bin_1 = (NETWORKID >> 4) * 8 sync_bin_2 = (NETWORKID & 0x0F) * 8
244 lines
8.0 KiB
Python
244 lines
8.0 KiB
Python
"""LoRa frame generator block.
|
||
|
||
Generates complete LoRa frames from encoded symbol bins:
|
||
[Preamble] + [Sync Word] + [SFD] + [Data Symbols]
|
||
|
||
The sync word encodes the NETWORKID as two chirps.
|
||
"""
|
||
|
||
import numpy as np
|
||
from numpy.typing import NDArray
|
||
from dataclasses import dataclass
|
||
from typing import Optional
|
||
|
||
from .networkid import networkid_to_sync_word
|
||
|
||
|
||
@dataclass
class FrameGenConfig:
    """Parameter bundle describing how frames are generated.

    Attributes:
        sf: Spreading factor.
        sample_rate: Output sample rate.
        bw: LoRa bandwidth.
        preamble_len: Number of preamble symbols.
        networkid: RYLR998 NETWORKID (0-255).
    """

    sf: int = 9
    sample_rate: float = 125e3
    bw: float = 125e3
    preamble_len: int = 8
    networkid: int = 18
|
||
|
||
|
||
class FrameGen:
    """Frame generator for LoRa signals.

    Prepends preamble, sync word, and SFD to data symbol bins,
    then modulates everything into IQ samples.

    Attributes:
        config: FrameGenConfig holding the constructor parameters.
        sf: Spreading factor.
        N: Number of bins per symbol (``1 << sf``).
        sps: Samples per symbol, ``int(N * sample_rate / bw)``.
        networkid: NETWORKID currently encoded into the sync word.
    """

    # RYLR998 sync-word bins are nibble * 8 (a fixed scale, not N/16 like
    # standard LoRa).
    _SYNC_SCALE = 8

    def __init__(self, sf: int = 9, sample_rate: float = 125e3,
                 bw: float = 125e3, preamble_len: int = 8,
                 networkid: int = 18):
        """Initialize frame generator.

        Args:
            sf: Spreading factor (7-12)
            sample_rate: Output sample rate in Hz
            bw: LoRa signal bandwidth in Hz
            preamble_len: Number of preamble upchirps
            networkid: RYLR998 NETWORKID for sync word (0-255)

        Raises:
            ValueError: If networkid is outside 0-255.
        """
        # Apply the same range check as set_networkid() so an out-of-range
        # value is rejected instead of being silently masked to a nibble pair.
        self._check_networkid(networkid)

        self.config = FrameGenConfig(
            sf=sf, sample_rate=sample_rate, bw=bw,
            preamble_len=preamble_len, networkid=networkid
        )
        self.sf = sf
        self.N = 1 << sf
        self.sps = int(self.N * sample_rate / bw)
        self.networkid = networkid

        # Precompute base chirps; the downchirp is the conjugate of the
        # bin-0 upchirp.
        self._upchirp_base = self._generate_upchirp(0)
        self._downchirp_base = np.conj(self._upchirp_base)

    @staticmethod
    def _check_networkid(networkid: int) -> None:
        """Raise ValueError unless networkid lies in 0-255."""
        if not 0 <= networkid <= 255:
            raise ValueError(f"NETWORKID must be 0-255, got {networkid}")

    def _sync_bins(self) -> list[int]:
        """Return the two sync-word bins for the current NETWORKID."""
        hi_nibble = (self.networkid >> 4) & 0x0F
        lo_nibble = self.networkid & 0x0F
        return [hi_nibble * self._SYNC_SCALE, lo_nibble * self._SYNC_SCALE]

    def _generate_upchirp(self, f_start: int) -> NDArray[np.complex64]:
        """Generate upchirp starting at frequency bin f_start.

        Produces ``self.sps`` samples of ``exp(j * phase)`` where the phase
        has a linear term (``f_start * n / sps``) and a quadratic term
        (``n^2 / (2 * sps)``).

        NOTE(review): the quadratic term raises the instantaneous frequency
        by 1/sps cycles per sample, i.e. it sweeps one full sample-rate span
        per symbol. That matches the LoRa bandwidth only when
        sample_rate == bw - confirm the intended behavior for oversampled
        output.

        Args:
            f_start: Starting frequency bin of the chirp.

        Returns:
            Complex samples for one symbol.
        """
        n = np.arange(self.sps)
        phase = 2 * np.pi * ((f_start * n / self.sps) + (n * n / (2 * self.sps)))
        return np.exp(1j * phase).astype(np.complex64)

    def set_networkid(self, networkid: int):
        """Update NETWORKID for subsequent frames.

        Args:
            networkid: New NETWORKID (0-255)

        Raises:
            ValueError: If networkid is outside 0-255.
        """
        self._check_networkid(networkid)
        self.networkid = networkid
        self.config.networkid = networkid

    def generate_preamble(self) -> NDArray[np.complex64]:
        """Generate preamble (upchirps at bin 0).

        Returns:
            Complex samples for preamble
        """
        return np.tile(self._upchirp_base, self.config.preamble_len)

    def generate_sync_word(self) -> NDArray[np.complex64]:
        """Generate sync word from NETWORKID.

        The RYLR998 sync word encoding (verified from real captures):
        - First chirp: high nibble x 8
        - Second chirp: low nibble x 8

        This uses a fixed scale of 8 (not N/16 like standard LoRa).

        Returns:
            Complex samples for sync word (2 symbols)
        """
        return np.concatenate(
            [self._generate_upchirp(sync_bin) for sync_bin in self._sync_bins()]
        )

    def generate_sfd(self) -> NDArray[np.complex64]:
        """Generate Start Frame Delimiter (2.25 downchirps).

        Returns:
            Complex samples for SFD
        """
        # The fractional part is the first quarter of a downchirp.
        quarter = self._downchirp_base[:self.sps // 4]
        return np.concatenate([
            self._downchirp_base,
            self._downchirp_base,
            quarter,
        ])

    def modulate_bins(self, bins: list[int]) -> NDArray[np.complex64]:
        """Modulate bin values into chirp samples.

        Args:
            bins: List of symbol bin values (0 to N-1); values outside that
                range are wrapped modulo N.

        Returns:
            Complex samples (``len(bins) * sps`` of them)
        """
        samples = np.empty(len(bins) * self.sps, dtype=np.complex64)
        for i, b in enumerate(bins):
            samples[i * self.sps:(i + 1) * self.sps] = self._generate_upchirp(b % self.N)
        return samples

    def generate_frame(self, data_bins: list[int]) -> NDArray[np.complex64]:
        """Generate complete LoRa frame from data symbol bins.

        Args:
            data_bins: Encoded data symbol bin values (from PHY encoder)

        Returns:
            Complete IQ frame samples
        """
        parts = [
            self.generate_preamble(),
            self.generate_sync_word(),
            self.generate_sfd(),
            self.modulate_bins(data_bins),
        ]
        return np.concatenate(parts)

    def generate_frame_bins(self, data_bins: list[int]) -> list[int]:
        """Generate complete frame as bin values (without IQ modulation).

        Useful for testing the symbol-level chain before CSS modulation.

        Args:
            data_bins: Encoded data symbol bin values

        Returns:
            Complete frame as bin values (preamble + sync + data)
            Note: SFD is not representable as bins (downchirps)
        """
        # Preamble bins (all 0)
        preamble_bins = [0] * self.config.preamble_len

        # Note: SFD is downchirps, not representable as upchirp bins.
        # Caller must handle SFD separately in modulation.
        return preamble_bins + self._sync_bins() + list(data_bins)

    def frame_duration_ms(self, n_data_symbols: int) -> float:
        """Calculate frame duration in milliseconds.

        Args:
            n_data_symbols: Number of data symbols

        Returns:
            Frame duration in ms
        """
        # Preamble + sync (2) + SFD (2.25) + data
        total_symbols = self.config.preamble_len + 2 + 2.25 + n_data_symbols
        symbol_time = self.N / self.config.bw
        return total_symbols * symbol_time * 1000
|
||
|
||
|
||
def frame_gen(sf: int = 9, sample_rate: float = 125e3,
              preamble_len: int = 8, networkid: int = 18,
              bw: float = 125e3) -> FrameGen:
    """Factory function for GNU Radio compatibility.

    Args:
        sf: Spreading factor
        sample_rate: Output sample rate in Hz
        preamble_len: Number of preamble upchirps
        networkid: RYLR998 NETWORKID for sync word (0-255)
        bw: LoRa signal bandwidth in Hz. Placed last so existing positional
            callers keep working; defaults to the FrameGen default.

    Returns:
        A FrameGen built from the given parameters.
    """
    return FrameGen(sf=sf, sample_rate=sample_rate, bw=bw,
                    preamble_len=preamble_len, networkid=networkid)
|
||
|
||
|
||
if __name__ == "__main__":
    # Self-test: prints the frame component sizes and checks the sync-word
    # bins against the fixed RYLR998 scale of 8 used by FrameGen.
    print("Frame Generator Test")
    print("=" * 50)

    sf = 9
    fs = 125e3

    gen = FrameGen(sf=sf, sample_rate=fs, networkid=18)
    # Symbol counts are derived from samples-per-symbol, which equals
    # 1 << sf only when the sample rate matches the bandwidth.
    sps = gen.sps
    n_preamble = gen.config.preamble_len

    # Test components
    preamble = gen.generate_preamble()
    sync = gen.generate_sync_word()
    sfd = gen.generate_sfd()

    print(f"\nSF{sf} frame components:")
    print(f" Preamble: {len(preamble)} samples ({len(preamble)/sps:.1f} symbols)")
    print(f" Sync word: {len(sync)} samples ({len(sync)/sps:.1f} symbols)")
    print(f" SFD: {len(sfd)} samples ({len(sfd)/sps:.2f} symbols)")

    # Test complete frame
    data_bins = [100, 200, 300, 400, 500]
    frame = gen.generate_frame(data_bins)

    print(f"\nComplete frame with {len(data_bins)} data symbols:")
    print(f" Total samples: {len(frame)}")
    print(f" Duration: {gen.frame_duration_ms(len(data_bins)):.2f} ms")

    # Verify sync word bins: each nibble maps to nibble * 8, so
    # 0x34 (LoRaWAN public) -> [24, 32] and 0x12 (NETWORKID=18) -> [8, 16].
    mismatches = []
    for netid in (0x34, 0x12):
        gen.set_networkid(netid)
        # Skip the preamble; the next two bins are the sync word.
        sync_bins = gen.generate_frame_bins([])[n_preamble:n_preamble + 2]
        expected = [(netid >> 4) * 8, (netid & 0x0F) * 8]
        print(f"\nNETWORKID=0x{netid:02X} sync bins: {sync_bins}")
        print(f" Expected: {expected}")
        if sync_bins != expected:
            mismatches.append(netid)

    # Only report success when every check actually passed.
    if mismatches:
        raise SystemExit(
            "\n✗ Frame generator sync word mismatch for NETWORKID(s): "
            + ", ".join(f"0x{m:02X}" for m in mismatches)
        )

    print("\n✓ Frame generator OK")
|