"""LoRa frame generator block. Generates complete LoRa frames from encoded symbol bins: [Preamble] + [Sync Word] + [SFD] + [Data Symbols] The sync word encodes the NETWORKID as two chirps. """ import numpy as np from numpy.typing import NDArray from dataclasses import dataclass from typing import Optional from .networkid import networkid_to_sync_word @dataclass class FrameGenConfig: """Configuration for frame generator.""" sf: int = 9 # Spreading factor sample_rate: float = 125e3 # Output sample rate bw: float = 125e3 # LoRa bandwidth preamble_len: int = 8 # Number of preamble symbols networkid: int = 18 # RYLR998 NETWORKID (0-255) class FrameGen: """Frame generator for LoRa signals. Prepends preamble, sync word, and SFD to data symbol bins, then modulates everything into IQ samples. """ def __init__(self, sf: int = 9, sample_rate: float = 125e3, bw: float = 125e3, preamble_len: int = 8, networkid: int = 18): """Initialize frame generator. Args: sf: Spreading factor (7-12) sample_rate: Output sample rate in Hz bw: LoRa signal bandwidth in Hz preamble_len: Number of preamble upchirps networkid: RYLR998 NETWORKID for sync word (0-255) """ self.config = FrameGenConfig( sf=sf, sample_rate=sample_rate, bw=bw, preamble_len=preamble_len, networkid=networkid ) self.sf = sf self.N = 1 << sf self.sps = int(self.N * sample_rate / bw) self.networkid = networkid # Precompute base chirps self._upchirp_base = self._generate_upchirp(0) self._downchirp_base = np.conj(self._upchirp_base) def _generate_upchirp(self, f_start: int) -> NDArray[np.complex64]: """Generate upchirp starting at frequency bin f_start.""" n = np.arange(self.sps) phase = 2 * np.pi * ((f_start * n / self.sps) + (n * n / (2 * self.sps))) return np.exp(1j * phase).astype(np.complex64) def set_networkid(self, networkid: int): """Update NETWORKID for subsequent frames. Args: networkid: New NETWORKID (0-255) """ if not 0 <= networkid <= 255: raise ValueError(f"NETWORKID must be 0-255, got {networkid}") self.networkid = networkid self.config.networkid = networkid def generate_preamble(self) -> NDArray[np.complex64]: """Generate preamble (upchirps at bin 0). Returns: Complex samples for preamble """ return np.tile(self._upchirp_base, self.config.preamble_len) def generate_sync_word(self) -> NDArray[np.complex64]: """Generate sync word from NETWORKID. The RYLR998 sync word encoding (verified from real captures): - First chirp: high nibble Ɨ 8 - Second chirp: low nibble Ɨ 8 This uses a fixed scale of 8 (not N/16 like standard LoRa). Returns: Complex samples for sync word (2 symbols) """ hi_nibble = (self.networkid >> 4) & 0x0F lo_nibble = self.networkid & 0x0F # RYLR998 uses fixed scale of 8 (verified from SDR captures) scale = 8 s1 = self._generate_upchirp(hi_nibble * scale) s2 = self._generate_upchirp(lo_nibble * scale) return np.concatenate([s1, s2]) def generate_sfd(self) -> NDArray[np.complex64]: """Generate Start Frame Delimiter (2.25 downchirps). Returns: Complex samples for SFD """ quarter = self._downchirp_base[:self.sps // 4] return np.concatenate([ self._downchirp_base, self._downchirp_base, quarter ]) def modulate_bins(self, bins: list[int]) -> NDArray[np.complex64]: """Modulate bin values into chirp samples. Args: bins: List of symbol bin values (0 to N-1) Returns: Complex samples """ samples = np.empty(len(bins) * self.sps, dtype=np.complex64) for i, b in enumerate(bins): samples[i * self.sps:(i + 1) * self.sps] = self._generate_upchirp(b % self.N) return samples def generate_frame(self, data_bins: list[int]) -> NDArray[np.complex64]: """Generate complete LoRa frame from data symbol bins. Args: data_bins: Encoded data symbol bin values (from PHY encoder) Returns: Complete IQ frame samples """ parts = [ self.generate_preamble(), self.generate_sync_word(), self.generate_sfd(), self.modulate_bins(data_bins), ] return np.concatenate(parts) def generate_frame_bins(self, data_bins: list[int]) -> list[int]: """Generate complete frame as bin values (without IQ modulation). Useful for testing the symbol-level chain before CSS modulation. Args: data_bins: Encoded data symbol bin values Returns: Complete frame as bin values (preamble + sync + data) Note: SFD is not representable as bins (downchirps) """ # Preamble bins (all 0) preamble_bins = [0] * self.config.preamble_len # Sync word bins (RYLR998 uses fixed scale of 8) hi_nibble = (self.networkid >> 4) & 0x0F lo_nibble = self.networkid & 0x0F scale = 8 # RYLR998 fixed scale sync_bins = [hi_nibble * scale, lo_nibble * scale] # Note: SFD is downchirps, not representable as upchirp bins # Caller must handle SFD separately in modulation return preamble_bins + sync_bins + list(data_bins) def frame_duration_ms(self, n_data_symbols: int) -> float: """Calculate frame duration in milliseconds. Args: n_data_symbols: Number of data symbols Returns: Frame duration in ms """ # Preamble + sync (2) + SFD (2.25) + data total_symbols = self.config.preamble_len + 2 + 2.25 + n_data_symbols symbol_time = self.N / self.config.bw return total_symbols * symbol_time * 1000 def frame_gen(sf: int = 9, sample_rate: float = 125e3, preamble_len: int = 8, networkid: int = 18) -> FrameGen: """Factory function for GNU Radio compatibility.""" return FrameGen(sf=sf, sample_rate=sample_rate, preamble_len=preamble_len, networkid=networkid) if __name__ == "__main__": print("Frame Generator Test") print("=" * 50) sf = 9 N = 1 << sf fs = 125e3 gen = FrameGen(sf=sf, sample_rate=fs, networkid=18) # Test components preamble = gen.generate_preamble() sync = gen.generate_sync_word() sfd = gen.generate_sfd() print(f"\nSF{sf} frame components:") print(f" Preamble: {len(preamble)} samples ({len(preamble)/N:.1f} symbols)") print(f" Sync word: {len(sync)} samples ({len(sync)/N:.1f} symbols)") print(f" SFD: {len(sfd)} samples ({len(sfd)/N:.2f} symbols)") # Test complete frame data_bins = [100, 200, 300, 400, 500] frame = gen.generate_frame(data_bins) print(f"\nComplete frame with {len(data_bins)} data symbols:") print(f" Total samples: {len(frame)}") print(f" Duration: {gen.frame_duration_ms(len(data_bins)):.2f} ms") # Verify sync word bins gen.set_networkid(0x34) # LoRaWAN public sync_bins = gen.generate_frame_bins([])[:10] # Just preamble + sync print(f"\nNETWORKID=0x34 sync bins: {sync_bins[8:10]}") # Skip preamble # Should be [48, 64] for 0x34 = nibbles 3, 4 → bins 3*32, 4*32 expected_hi = 3 * (N // 16) # 96 expected_lo = 4 * (N // 16) # 128 print(f" Expected: [{expected_hi}, {expected_lo}]") # Verify NETWORKID=18 (0x12) gen.set_networkid(0x12) sync_bins = gen.generate_frame_bins([])[:10] print(f"\nNETWORKID=0x12 sync bins: {sync_bins[8:10]}") expected_hi = 1 * (N // 16) # 32 expected_lo = 2 * (N // 16) # 64 print(f" Expected: [{expected_hi}, {expected_lo}]") print("\nāœ“ Frame generator OK")