GNU Radio Out-of-Tree module providing:
- Complete TX chain: PHYEncode → FrameGen → CSSMod
- Complete RX chain: CSSDemod → FrameSync → PHYDecode
- NETWORKID extraction/encoding (0-255 range)
- All SF (7-12) and CR (4/5-4/8) combinations
- Loopback tested with 24/24 configurations passing

Key features:
- Fractional SFD (2.25 downchirps) handling
- Gray encode/decode with proper inverse operations
- gr-lora_sdr compatible decode modes
- GRC block definitions and example flowgraphs
- Comprehensive documentation

Discovered RYLR998 sync word mapping:
  sync_bin_1 = (NETWORKID >> 4) * 8
  sync_bin_2 = (NETWORKID & 0x0F) * 8
228 lines
6.9 KiB
Python
228 lines
6.9 KiB
Python
"""CSS (Chirp Spread Spectrum) modulator block.

Generates LoRa chirp signals from integer bin values. Each bin value
(0 to N-1) encodes SF bits of information into the starting frequency
of an upchirp.
"""
|
||
|
||
import numpy as np
|
||
from numpy.typing import NDArray
|
||
from dataclasses import dataclass
|
||
|
||
|
||
@dataclass
|
||
class CSSModConfig:
|
||
"""Configuration for CSS modulator."""
|
||
sf: int = 9 # Spreading factor (7-12)
|
||
sample_rate: float = 125e3 # Output sample rate (Hz)
|
||
bw: float = 125e3 # LoRa bandwidth (Hz)
|
||
sps: int | None = None # Samples per symbol (computed if None)
|
||
|
||
def __post_init__(self):
|
||
if not 7 <= self.sf <= 12:
|
||
raise ValueError(f"SF must be 7-12, got {self.sf}")
|
||
N = 1 << self.sf
|
||
if self.sps is None:
|
||
self.sps = int(N * self.sample_rate / self.bw)
|
||
|
||
|
||
class CSSMod:
|
||
"""CSS modulator for LoRa signals.
|
||
|
||
Takes integer bin values and outputs complex IQ chirp samples.
|
||
"""
|
||
|
||
def __init__(self, sf: int = 9, sample_rate: float = 125e3,
|
||
bw: float = 125e3):
|
||
"""Initialize CSS modulator.
|
||
|
||
Args:
|
||
sf: Spreading factor (7-12)
|
||
sample_rate: Output sample rate in Hz
|
||
bw: LoRa signal bandwidth in Hz
|
||
"""
|
||
self.config = CSSModConfig(sf=sf, sample_rate=sample_rate, bw=bw)
|
||
self.N = 1 << sf
|
||
self.sps = self.config.sps
|
||
self.sf = sf
|
||
self.bw = bw
|
||
self.sample_rate = sample_rate
|
||
|
||
# Precompute base chirps for efficiency
|
||
self._upchirp_base = self._generate_upchirp(0)
|
||
self._downchirp_base = np.conj(self._upchirp_base)
|
||
|
||
def _generate_upchirp(self, f_start: int) -> NDArray[np.complex64]:
|
||
"""Generate an upchirp starting at frequency bin f_start.
|
||
|
||
The chirp sweeps from f_start to f_start+N (wrapping at N),
|
||
covering the full bandwidth over one symbol period.
|
||
|
||
Args:
|
||
f_start: Starting frequency bin (0 to N-1)
|
||
|
||
Returns:
|
||
Complex64 samples for one symbol
|
||
"""
|
||
N = self.N
|
||
sps = self.sps
|
||
n = np.arange(sps)
|
||
|
||
# Phase integral of linear frequency ramp
|
||
# f(t) = f_start + t * BW / T where T = symbol period
|
||
# At sample k: t = k / fs, phase = 2π ∫ f(t) dt
|
||
phase = 2 * np.pi * ((f_start * n / sps) + (n * n / (2 * sps)))
|
||
return np.exp(1j * phase).astype(np.complex64)
|
||
|
||
def upchirp(self, f_start: int = 0) -> NDArray[np.complex64]:
|
||
"""Generate one upchirp symbol.
|
||
|
||
Args:
|
||
f_start: Starting frequency bin (0 to N-1)
|
||
|
||
Returns:
|
||
Complex64 samples
|
||
"""
|
||
if f_start == 0:
|
||
return self._upchirp_base.copy()
|
||
return self._generate_upchirp(f_start % self.N)
|
||
|
||
def downchirp(self) -> NDArray[np.complex64]:
|
||
"""Generate one downchirp symbol (frequency decreases).
|
||
|
||
Returns:
|
||
Complex64 samples
|
||
"""
|
||
return self._downchirp_base.copy()
|
||
|
||
def mod_symbol(self, bin_value: int) -> NDArray[np.complex64]:
|
||
"""Modulate a single bin value into a chirp.
|
||
|
||
Args:
|
||
bin_value: Frequency bin (0 to N-1)
|
||
|
||
Returns:
|
||
Complex64 chirp samples
|
||
"""
|
||
return self.upchirp(bin_value % self.N)
|
||
|
||
def mod_symbols(self, bins: list[int]) -> NDArray[np.complex64]:
|
||
"""Modulate multiple bin values into a continuous signal.
|
||
|
||
Args:
|
||
bins: List of frequency bins
|
||
|
||
Returns:
|
||
Complex64 samples (len = len(bins) * sps)
|
||
"""
|
||
if not bins:
|
||
return np.array([], dtype=np.complex64)
|
||
|
||
samples = np.empty(len(bins) * self.sps, dtype=np.complex64)
|
||
for i, b in enumerate(bins):
|
||
samples[i * self.sps:(i + 1) * self.sps] = self.upchirp(b)
|
||
return samples
|
||
|
||
def generate_preamble(self, length: int = 8) -> NDArray[np.complex64]:
|
||
"""Generate preamble consisting of upchirps at bin 0.
|
||
|
||
Args:
|
||
length: Number of preamble symbols
|
||
|
||
Returns:
|
||
Complex64 preamble samples
|
||
"""
|
||
return np.tile(self._upchirp_base, length)
|
||
|
||
def generate_sync_word(self, sync_byte: int) -> NDArray[np.complex64]:
|
||
"""Generate sync word symbols from a sync word byte.
|
||
|
||
The sync word byte is split into nibbles, each scaled to a bin:
|
||
- First symbol: high nibble × (N/16)
|
||
- Second symbol: low nibble × (N/16)
|
||
|
||
Args:
|
||
sync_byte: Sync word byte (0-255), e.g., 0x12 for private
|
||
|
||
Returns:
|
||
Complex64 samples for 2 symbols
|
||
"""
|
||
hi_nibble = (sync_byte >> 4) & 0x0F
|
||
lo_nibble = sync_byte & 0x0F
|
||
scale = self.N >> 4 # N / 16
|
||
|
||
s1 = self.upchirp(hi_nibble * scale)
|
||
s2 = self.upchirp(lo_nibble * scale)
|
||
return np.concatenate([s1, s2])
|
||
|
||
def generate_sfd(self) -> NDArray[np.complex64]:
|
||
"""Generate Start Frame Delimiter (2.25 downchirps).
|
||
|
||
Returns:
|
||
Complex64 SFD samples
|
||
"""
|
||
dc = self._downchirp_base
|
||
quarter = dc[:self.sps // 4]
|
||
return np.concatenate([dc, dc, quarter])
|
||
|
||
|
||
def css_mod(sf: int = 9, sample_rate: float = 125e3,
            bw: float = 125e3) -> CSSMod:
    """Factory function for GNU Radio compatibility.

    Args:
        sf: Spreading factor
        sample_rate: Sample rate in Hz
        bw: LoRa signal bandwidth in Hz. Defaults to 125 kHz, the same
            default ``CSSMod`` uses, so existing two-argument calls behave
            as before.

    Returns:
        CSSMod instance
    """
    return CSSMod(sf=sf, sample_rate=sample_rate, bw=bw)
|
||
|
||
|
||
if __name__ == "__main__":
    # Self-test: print waveform sizes, check the dechirp peak, then run a
    # modulate/demodulate round trip through the sibling demodulator.
    print("CSS Modulator Test")
    print("=" * 50)

    sf = 9
    N = 1 << sf
    fs = 125e3

    mod = CSSMod(sf=sf, sample_rate=fs)

    # Generate and display stats
    uc = mod.upchirp(0)
    dc = mod.downchirp()
    preamble = mod.generate_preamble(8)
    sync = mod.generate_sync_word(0x12)
    sfd = mod.generate_sfd()

    print(f"\nSF{sf} (N={N}) at {fs/1e3:.0f} kHz:")
    print(f"  Samples per symbol: {mod.sps}")
    print(f"  Upchirp length: {len(uc)} samples")
    print(f"  Downchirp length: {len(dc)} samples")
    print(f"  Preamble (8 sym): {len(preamble)} samples")
    print(f"  Sync word (2 sym): {len(sync)} samples")
    print(f"  SFD (2.25 sym): {len(sfd)} samples")

    # Verify chirp properties
    print("\nChirp verification:")
    # Dechirp should produce a tone at bin 0
    dechirped = uc * np.conj(uc)
    spectrum = np.abs(np.fft.fft(dechirped))
    peak = np.argmax(spectrum)
    print(f"  Upchirp × conj(upchirp) peak at bin {peak} (expect 0)")

    # Modulate a sequence and verify
    test_bins = [0, 100, 200, 300, 400]
    signal = mod.mod_symbols(test_bins)
    print(f"\nModulated {len(test_bins)} symbols: {len(signal)} samples")

    # Demodulate to verify
    try:
        from .css_demod import CSSDemod
    except ImportError:
        # Executed directly as a script, so there is no parent package for
        # the relative import; fall back to the sibling module on sys.path.
        from css_demod import CSSDemod
    demod = CSSDemod(sf=sf, sample_rate=fs)
    recovered = demod.demod_symbols(signal)
    print(f"  Input bins:  {test_bins}")
    print(f"  Recovered:   {recovered}")
    # Compare as a list so the check also works if the demodulator returns a
    # NumPy array, and raise explicitly so it still runs under ``python -O``.
    if list(recovered) != test_bins:
        raise AssertionError("Round-trip failed!")
    print("✓ Modulator/demodulator round-trip OK")
|