gr-rylr998/python/rylr998/css_demod.py
Ryan Malloy c839d225a8 Initial release: complete LoRa TX/RX for RYLR998 modems
GNU Radio Out-of-Tree module providing:
- Complete TX chain: PHYEncode → FrameGen → CSSMod
- Complete RX chain: CSSDemod → FrameSync → PHYDecode
- NETWORKID extraction/encoding (0-255 range)
- All SF (7-12) and CR (4/5-4/8) combinations
- Loopback tested with 24/24 configurations passing

Key features:
- Fractional SFD (2.25 downchirp) handling
- Gray encode/decode with proper inverse operations
- gr-lora_sdr compatible decode modes
- GRC block definitions and example flowgraphs
- Comprehensive documentation

Discovered RYLR998 sync word mapping:
  sync_bin_1 = (NETWORKID >> 4) * 8
  sync_bin_2 = (NETWORKID & 0x0F) * 8
2026-02-05 13:38:07 -07:00

273 lines
9.3 KiB
Python

"""CSS (Chirp Spread Spectrum) demodulator block.
Performs FFT-based peak detection on dechirped LoRa symbols to extract
the encoded bin values. Each symbol's bin encodes SF bits of information.
The demodulation process:
1. Multiply received signal by reference downchirp (dechirp)
2. FFT to convert frequency ramp to single tone
3. Find peak bin location
"""
import numpy as np
from numpy.typing import NDArray
from dataclasses import dataclass
from typing import Iterator
@dataclass
class CSSDemodConfig:
"""Configuration for CSS demodulator."""
sf: int = 9 # Spreading factor (7-12)
sample_rate: float = 250e3 # Input sample rate (Hz)
bw: float = 125e3 # LoRa bandwidth (Hz)
sps: int | None = None # Samples per symbol (computed if None)
def __post_init__(self):
if not 7 <= self.sf <= 12:
raise ValueError(f"SF must be 7-12, got {self.sf}")
N = 1 << self.sf
if self.sps is None:
# Compute samples per symbol from sample rate and bandwidth
self.sps = int(N * self.sample_rate / self.bw)
class CSSDemod:
"""FFT-based CSS demodulator for LoRa signals.
Takes complex IQ samples and outputs demodulated bin values (integers).
Each output bin represents one LoRa symbol encoding SF bits.
"""
def __init__(self, sf: int = 9, sample_rate: float = 250e3,
bw: float = 125e3):
"""Initialize CSS demodulator.
Args:
sf: Spreading factor (7-12)
sample_rate: Input sample rate in Hz
bw: LoRa signal bandwidth in Hz
"""
self.config = CSSDemodConfig(sf=sf, sample_rate=sample_rate, bw=bw)
self.N = 1 << sf # Number of bins
self.sps = self.config.sps
# Generate reference downchirp for dechirping
self._downchirp = self._generate_downchirp()
# For fractional rate conversion if sample_rate != bw
self._resample_needed = abs(sample_rate - bw) > 1.0
def _generate_downchirp(self) -> NDArray[np.complex64]:
"""Generate reference downchirp at the operating sample rate."""
N = self.N
sps = self.sps
n = np.arange(sps)
# Downchirp: frequency decreases linearly
# Phase = -π * k * t² / T where T = symbol period
# At sample rate fs with sps samples: t = n/fs, T = N/bw
phase = -2 * np.pi * (n * n / (2 * sps))
return np.exp(1j * phase).astype(np.complex64)
def _generate_upchirp(self, f_start: int = 0) -> NDArray[np.complex64]:
"""Generate upchirp starting at frequency bin f_start."""
N = self.N
sps = self.sps
n = np.arange(sps)
# Upchirp starting at bin f_start
# Frequency ramps from f_start to f_start+N (wrapping at N)
phase = 2 * np.pi * ((f_start * n / sps) + (n * n / (2 * sps)))
return np.exp(1j * phase).astype(np.complex64)
def demod_symbol(self, samples: NDArray[np.complex64]) -> int:
"""Demodulate a single symbol worth of samples.
Args:
samples: Complex IQ samples (length = sps)
Returns:
Demodulated bin value (0 to N-1)
"""
if len(samples) < self.sps:
raise ValueError(f"Need {self.sps} samples, got {len(samples)}")
# Dechirp by multiplying with reference downchirp
dechirped = samples[:self.sps] * self._downchirp
# FFT and find peak
# Use N-point FFT (or sps-point if different)
if self.sps == self.N:
spectrum = np.abs(np.fft.fft(dechirped)) ** 2
else:
# Interpolate to N bins using zero-padding
spectrum = np.abs(np.fft.fft(dechirped, n=self.N)) ** 2
return int(np.argmax(spectrum))
def demod_symbols(self, samples: NDArray[np.complex64],
offset: int = 0) -> list[int]:
"""Demodulate multiple symbols from a sample stream.
Args:
samples: Complex IQ samples
offset: Starting sample offset
Returns:
List of demodulated bin values
"""
bins = []
pos = offset
while pos + self.sps <= len(samples):
symbol_samples = samples[pos:pos + self.sps]
bins.append(self.demod_symbol(symbol_samples))
pos += self.sps
return bins
def demod_with_fine_timing(self, samples: NDArray[np.complex64],
offset: int = 0,
search_range: int = 10) -> tuple[list[int], int]:
"""Demodulate with fine timing search for optimal symbol boundary.
Searches around the initial offset for the timing that produces
the strongest FFT peaks, improving demodulation reliability.
Args:
samples: Complex IQ samples
offset: Initial sample offset
search_range: Samples to search around offset (±search_range)
Returns:
Tuple of (bin_values, best_offset)
"""
best_bins = []
best_offset = offset
best_metric = 0.0
for try_offset in range(max(0, offset - search_range),
min(len(samples), offset + search_range)):
bins = []
metric = 0.0
pos = try_offset
# Demodulate up to 3 symbols to evaluate timing
for _ in range(min(3, (len(samples) - pos) // self.sps)):
if pos + self.sps > len(samples):
break
symbol_samples = samples[pos:pos + self.sps]
dechirped = symbol_samples * self._downchirp
spectrum = np.abs(np.fft.fft(dechirped, n=self.N)) ** 2
peak_bin = int(np.argmax(spectrum))
peak_val = spectrum[peak_bin]
metric += peak_val
bins.append(peak_bin)
pos += self.sps
if metric > best_metric:
best_metric = metric
best_offset = try_offset
best_bins = bins
# Demodulate full stream at best offset
return self.demod_symbols(samples, best_offset), best_offset
def estimate_cfo(self, preamble_samples: NDArray[np.complex64],
n_symbols: int = 4) -> float:
"""Estimate carrier frequency offset from preamble.
Uses the first n_symbols of the preamble to estimate the
frequency offset as a fractional bin value.
Args:
preamble_samples: Samples containing preamble
n_symbols: Number of preamble symbols to average
Returns:
CFO estimate in bins (fractional)
"""
bins = []
for i in range(n_symbols):
start = i * self.sps
if start + self.sps > len(preamble_samples):
break
symbol = preamble_samples[start:start + self.sps]
dechirped = symbol * self._downchirp
# Use parabolic interpolation for fractional bin
spectrum = np.abs(np.fft.fft(dechirped, n=self.N))
peak = int(np.argmax(spectrum))
# Parabolic interpolation around peak
if 0 < peak < self.N - 1:
y0 = spectrum[peak - 1]
y1 = spectrum[peak]
y2 = spectrum[peak + 1]
delta = 0.5 * (y0 - y2) / (y0 - 2 * y1 + y2 + 1e-10)
bins.append(peak + delta)
else:
bins.append(float(peak))
return np.mean(bins) if bins else 0.0
def css_demod(sf: int = 9, sample_rate: float = 250e3) -> CSSDemod:
"""Factory function for GNU Radio compatibility.
Args:
sf: Spreading factor
sample_rate: Sample rate in Hz
Returns:
CSSDemod instance
"""
return CSSDemod(sf=sf, sample_rate=sample_rate)
if __name__ == "__main__":
print("CSS Demodulator Test")
print("=" * 50)
# Create test modulator and demodulator
sf = 9
N = 1 << sf # 512
fs = 125e3
demod = CSSDemod(sf=sf, sample_rate=fs, bw=fs)
# Generate test chirps
def upchirp(f_start: int) -> np.ndarray:
n = np.arange(N)
phase = 2 * np.pi * ((f_start * n / N) + (n * n / (2 * N)))
return np.exp(1j * phase).astype(np.complex64)
# Test demodulation of known bins
test_bins = [0, 1, 100, 255, 511]
print(f"\nDemodulating test symbols (SF{sf}, N={N}):")
for test_bin in test_bins:
chirp = upchirp(test_bin)
result = demod.demod_symbol(chirp)
status = "" if result == test_bin else ""
print(f" bin={test_bin:3d} → demod={result:3d} {status}")
# Test multi-symbol demodulation
test_sequence = [10, 20, 30, 40, 50]
multi_iq = np.concatenate([upchirp(b) for b in test_sequence])
results = demod.demod_symbols(multi_iq)
print(f"\nMulti-symbol test: {test_sequence}{results}")
assert results == test_sequence, "Multi-symbol demod failed!"
print("✓ Multi-symbol demodulation OK")
# Test with noise
snr_db = 10
signal = np.concatenate([upchirp(b) for b in test_sequence])
noise_power = np.mean(np.abs(signal) ** 2) / (10 ** (snr_db / 10))
noise = np.sqrt(noise_power / 2) * (
np.random.randn(len(signal)) + 1j * np.random.randn(len(signal))
).astype(np.complex64)
noisy = signal + noise
noisy_results = demod.demod_symbols(noisy)
print(f"\nWith {snr_db}dB SNR: {test_sequence}{noisy_results}")