"""CSS (Chirp Spread Spectrum) demodulator block. Performs FFT-based peak detection on dechirped LoRa symbols to extract the encoded bin values. Each symbol's bin encodes SF bits of information. The demodulation process: 1. Multiply received signal by reference downchirp (dechirp) 2. FFT to convert frequency ramp to single tone 3. Find peak bin location """ import numpy as np from numpy.typing import NDArray from dataclasses import dataclass from typing import Iterator @dataclass class CSSDemodConfig: """Configuration for CSS demodulator.""" sf: int = 9 # Spreading factor (7-12) sample_rate: float = 250e3 # Input sample rate (Hz) bw: float = 125e3 # LoRa bandwidth (Hz) sps: int | None = None # Samples per symbol (computed if None) def __post_init__(self): if not 7 <= self.sf <= 12: raise ValueError(f"SF must be 7-12, got {self.sf}") N = 1 << self.sf if self.sps is None: # Compute samples per symbol from sample rate and bandwidth self.sps = int(N * self.sample_rate / self.bw) class CSSDemod: """FFT-based CSS demodulator for LoRa signals. Takes complex IQ samples and outputs demodulated bin values (integers). Each output bin represents one LoRa symbol encoding SF bits. """ def __init__(self, sf: int = 9, sample_rate: float = 250e3, bw: float = 125e3): """Initialize CSS demodulator. Args: sf: Spreading factor (7-12) sample_rate: Input sample rate in Hz bw: LoRa signal bandwidth in Hz """ self.config = CSSDemodConfig(sf=sf, sample_rate=sample_rate, bw=bw) self.N = 1 << sf # Number of bins self.sps = self.config.sps # Generate reference downchirp for dechirping self._downchirp = self._generate_downchirp() # For fractional rate conversion if sample_rate != bw self._resample_needed = abs(sample_rate - bw) > 1.0 def _generate_downchirp(self) -> NDArray[np.complex64]: """Generate reference downchirp at the operating sample rate.""" N = self.N sps = self.sps n = np.arange(sps) # Downchirp: frequency decreases linearly # Phase = -π * k * t² / T where T = symbol period # At sample rate fs with sps samples: t = n/fs, T = N/bw phase = -2 * np.pi * (n * n / (2 * sps)) return np.exp(1j * phase).astype(np.complex64) def _generate_upchirp(self, f_start: int = 0) -> NDArray[np.complex64]: """Generate upchirp starting at frequency bin f_start.""" N = self.N sps = self.sps n = np.arange(sps) # Upchirp starting at bin f_start # Frequency ramps from f_start to f_start+N (wrapping at N) phase = 2 * np.pi * ((f_start * n / sps) + (n * n / (2 * sps))) return np.exp(1j * phase).astype(np.complex64) def demod_symbol(self, samples: NDArray[np.complex64]) -> int: """Demodulate a single symbol worth of samples. Args: samples: Complex IQ samples (length = sps) Returns: Demodulated bin value (0 to N-1) """ if len(samples) < self.sps: raise ValueError(f"Need {self.sps} samples, got {len(samples)}") # Dechirp by multiplying with reference downchirp dechirped = samples[:self.sps] * self._downchirp # FFT and find peak # Use N-point FFT (or sps-point if different) if self.sps == self.N: spectrum = np.abs(np.fft.fft(dechirped)) ** 2 else: # Interpolate to N bins using zero-padding spectrum = np.abs(np.fft.fft(dechirped, n=self.N)) ** 2 return int(np.argmax(spectrum)) def demod_symbols(self, samples: NDArray[np.complex64], offset: int = 0) -> list[int]: """Demodulate multiple symbols from a sample stream. Args: samples: Complex IQ samples offset: Starting sample offset Returns: List of demodulated bin values """ bins = [] pos = offset while pos + self.sps <= len(samples): symbol_samples = samples[pos:pos + self.sps] bins.append(self.demod_symbol(symbol_samples)) pos += self.sps return bins def demod_with_fine_timing(self, samples: NDArray[np.complex64], offset: int = 0, search_range: int = 10) -> tuple[list[int], int]: """Demodulate with fine timing search for optimal symbol boundary. Searches around the initial offset for the timing that produces the strongest FFT peaks, improving demodulation reliability. Args: samples: Complex IQ samples offset: Initial sample offset search_range: Samples to search around offset (±search_range) Returns: Tuple of (bin_values, best_offset) """ best_bins = [] best_offset = offset best_metric = 0.0 for try_offset in range(max(0, offset - search_range), min(len(samples), offset + search_range)): bins = [] metric = 0.0 pos = try_offset # Demodulate up to 3 symbols to evaluate timing for _ in range(min(3, (len(samples) - pos) // self.sps)): if pos + self.sps > len(samples): break symbol_samples = samples[pos:pos + self.sps] dechirped = symbol_samples * self._downchirp spectrum = np.abs(np.fft.fft(dechirped, n=self.N)) ** 2 peak_bin = int(np.argmax(spectrum)) peak_val = spectrum[peak_bin] metric += peak_val bins.append(peak_bin) pos += self.sps if metric > best_metric: best_metric = metric best_offset = try_offset best_bins = bins # Demodulate full stream at best offset return self.demod_symbols(samples, best_offset), best_offset def estimate_cfo(self, preamble_samples: NDArray[np.complex64], n_symbols: int = 4) -> float: """Estimate carrier frequency offset from preamble. Uses the first n_symbols of the preamble to estimate the frequency offset as a fractional bin value. Args: preamble_samples: Samples containing preamble n_symbols: Number of preamble symbols to average Returns: CFO estimate in bins (fractional) """ bins = [] for i in range(n_symbols): start = i * self.sps if start + self.sps > len(preamble_samples): break symbol = preamble_samples[start:start + self.sps] dechirped = symbol * self._downchirp # Use parabolic interpolation for fractional bin spectrum = np.abs(np.fft.fft(dechirped, n=self.N)) peak = int(np.argmax(spectrum)) # Parabolic interpolation around peak if 0 < peak < self.N - 1: y0 = spectrum[peak - 1] y1 = spectrum[peak] y2 = spectrum[peak + 1] delta = 0.5 * (y0 - y2) / (y0 - 2 * y1 + y2 + 1e-10) bins.append(peak + delta) else: bins.append(float(peak)) return np.mean(bins) if bins else 0.0 def css_demod(sf: int = 9, sample_rate: float = 250e3) -> CSSDemod: """Factory function for GNU Radio compatibility. Args: sf: Spreading factor sample_rate: Sample rate in Hz Returns: CSSDemod instance """ return CSSDemod(sf=sf, sample_rate=sample_rate) if __name__ == "__main__": print("CSS Demodulator Test") print("=" * 50) # Create test modulator and demodulator sf = 9 N = 1 << sf # 512 fs = 125e3 demod = CSSDemod(sf=sf, sample_rate=fs, bw=fs) # Generate test chirps def upchirp(f_start: int) -> np.ndarray: n = np.arange(N) phase = 2 * np.pi * ((f_start * n / N) + (n * n / (2 * N))) return np.exp(1j * phase).astype(np.complex64) # Test demodulation of known bins test_bins = [0, 1, 100, 255, 511] print(f"\nDemodulating test symbols (SF{sf}, N={N}):") for test_bin in test_bins: chirp = upchirp(test_bin) result = demod.demod_symbol(chirp) status = "✓" if result == test_bin else "✗" print(f" bin={test_bin:3d} → demod={result:3d} {status}") # Test multi-symbol demodulation test_sequence = [10, 20, 30, 40, 50] multi_iq = np.concatenate([upchirp(b) for b in test_sequence]) results = demod.demod_symbols(multi_iq) print(f"\nMulti-symbol test: {test_sequence} → {results}") assert results == test_sequence, "Multi-symbol demod failed!" print("✓ Multi-symbol demodulation OK") # Test with noise snr_db = 10 signal = np.concatenate([upchirp(b) for b in test_sequence]) noise_power = np.mean(np.abs(signal) ** 2) / (10 ** (snr_db / 10)) noise = np.sqrt(noise_power / 2) * ( np.random.randn(len(signal)) + 1j * np.random.randn(len(signal)) ).astype(np.complex64) noisy = signal + noise noisy_results = demod.demod_symbols(noisy) print(f"\nWith {snr_db}dB SNR: {test_sequence} → {noisy_results}")