"""Channelizer: frequency translation and decimation for wideband captures. When captures are taken at a higher sample rate than the LoRa bandwidth (e.g., 2 MHz capture of a 125 kHz LoRa signal), the signal must be: 1. Frequency-shifted to baseband 2. Low-pass filtered (anti-aliasing) 3. Decimated to match the LoRa bandwidth This module provides the channelization step needed before FrameSync. """ import numpy as np from dataclasses import dataclass @dataclass class ChannelizerConfig: """Configuration for channelizer.""" input_sample_rate: float # Hz - rate of incoming samples channel_bw: float # Hz - LoRa bandwidth (typically 125e3) center_freq: float = 0.0 # Hz - center frequency of capture channel_freq: float = 0.0 # Hz - frequency of LoRa channel n_taps: int = 0 # FIR filter taps (0 = auto-calculate) class Channelizer: """Frequency translation and decimation for wideband captures. Shifts a LoRa channel to baseband with proper anti-alias filtering. Uses a windowed-sinc FIR filter instead of a moving average. Example: >>> ch = Channelizer(input_sample_rate=2e6, channel_bw=125e3) >>> baseband = ch.channelize(wideband_iq) >>> # baseband is now at 125 kHz sample rate """ def __init__( self, input_sample_rate: float, channel_bw: float, center_freq: float = 0.0, channel_freq: float = 0.0, n_taps: int = 0, ): """Initialize channelizer. Args: input_sample_rate: Sample rate of input data (Hz) channel_bw: LoRa bandwidth / output sample rate (Hz) center_freq: Center frequency of capture (Hz) channel_freq: Frequency of LoRa channel (Hz) n_taps: FIR filter taps (0 = auto: 4 * decimation + 1) """ self.input_sample_rate = input_sample_rate self.channel_bw = channel_bw self.center_freq = center_freq self.channel_freq = channel_freq # Calculate decimation factor self.decim = max(1, int(input_sample_rate / channel_bw)) # FIR filter design if n_taps <= 0: n_taps = self.decim * 4 + 1 # odd length for type-I linear phase self.n_taps = n_taps # Normalized cutoff (Nyquist = 1.0) cutoff = channel_bw / input_sample_rate # Design the filter self._fir = self._design_lowpass(n_taps, cutoff) # Precompute frequency offset self._freq_offset = channel_freq - center_freq def _design_lowpass(self, n_taps: int, cutoff: float) -> np.ndarray: """Design windowed-sinc lowpass filter. Args: n_taps: Number of filter taps (should be odd) cutoff: Normalized cutoff frequency (0 to 1, where 1 = Nyquist) Returns: FIR filter coefficients """ try: from scipy.signal import firwin return firwin(n_taps, cutoff).astype(np.float32) except ImportError: # Fallback: simple windowed sinc if scipy not available n = np.arange(n_taps) - (n_taps - 1) / 2 # Avoid division by zero with np.errstate(divide='ignore', invalid='ignore'): h = np.where(n == 0, 2 * cutoff, np.sin(2 * np.pi * cutoff * n) / (np.pi * n)) # Hamming window window = 0.54 - 0.46 * np.cos(2 * np.pi * np.arange(n_taps) / (n_taps - 1)) fir = (h * window).astype(np.float32) return fir / np.sum(fir) # Normalize def channelize( self, iq_data: np.ndarray, freq_offset: float | None = None, ) -> np.ndarray: """Channelize wideband IQ data to baseband at channel bandwidth. Args: iq_data: Complex IQ samples at input_sample_rate freq_offset: Override frequency offset (Hz). If None, uses channel_freq - center_freq from init. Returns: Complex IQ samples at channel_bw sample rate """ if freq_offset is None: freq_offset = self._freq_offset n = len(iq_data) # Step 1: Frequency shift to baseband if abs(freq_offset) > 0.01: # Only shift if significant offset t = np.arange(n, dtype=np.float64) / self.input_sample_rate shifted = iq_data * np.exp(-1j * 2 * np.pi * freq_offset * t).astype(np.complex64) else: shifted = iq_data.astype(np.complex64) # Step 2 & 3: Filter and decimate if self.decim <= 1: return shifted # Convolve with FIR filter filtered = np.convolve(shifted, self._fir, mode="same") # Decimate return filtered[::self.decim] @property def output_sample_rate(self) -> float: """Output sample rate after channelization.""" return self.input_sample_rate / self.decim def __repr__(self) -> str: return (f"Channelizer(in={self.input_sample_rate/1e3:.0f}kHz, " f"out={self.output_sample_rate/1e3:.0f}kHz, " f"decim={self.decim}, taps={self.n_taps})") def channelize( iq_data: np.ndarray, input_sample_rate: float, center_freq: float, channel_freq: float, channel_bw: float, ) -> np.ndarray: """Convenience function to channelize wideband data. Args: iq_data: Complex IQ samples at input_sample_rate input_sample_rate: Sample rate of input data (Hz) center_freq: Center frequency of capture (Hz) channel_freq: Frequency of LoRa channel (Hz) channel_bw: LoRa bandwidth (Hz) Returns: Complex IQ samples at channel_bw sample rate Example: >>> # 2 MHz capture, LoRa at 915 MHz, 125 kHz bandwidth >>> baseband = channelize(iq, 2e6, 915e6, 915e6, 125e3) """ ch = Channelizer( input_sample_rate=input_sample_rate, channel_bw=channel_bw, center_freq=center_freq, channel_freq=channel_freq, ) return ch.channelize(iq_data)