diff --git a/examples/decode_capture.py b/examples/decode_capture.py new file mode 100644 index 0000000..c5d16df --- /dev/null +++ b/examples/decode_capture.py @@ -0,0 +1,210 @@ +#!/usr/bin/env python3 +"""Decode RYLR998 LoRa frames from wideband SDR captures. + +This script processes real BladeRF captures at 2 MHz sample rate, +channelizes to 125 kHz, and decodes LoRa frames. + +Usage: + python decode_capture.py + python decode_capture.py --list # List available captures +""" + +import argparse +import sys +from pathlib import Path + +import numpy as np + +# Add parent to path for local development +sys.path.insert(0, str(Path(__file__).parent.parent / "python")) + +from rylr998 import ( + Channelizer, + FrameSync, + PHYDecode, + sync_word_to_networkid, +) + + +# Capture parameters (BladeRF defaults) +INPUT_SAMPLE_RATE = 2e6 # 2 MHz capture rate +CENTER_FREQ = 915e6 # Center frequency +CHANNEL_FREQ = 915e6 # LoRa channel frequency +CHANNEL_BW = 125e3 # LoRa bandwidth + +# LoRa parameters (RYLR998 defaults) +SF = 9 +CR = 1 + + +def decode_capture( + capture_path: Path, + sf: int = SF, + verbose: bool = True, +) -> list: + """Decode LoRa frames from a wideband capture file. + + Args: + capture_path: Path to .raw capture file (complex64) + sf: Spreading factor + verbose: Print progress + + Returns: + List of decoded frames + """ + # Load capture + if verbose: + print(f"Loading {capture_path.name}...") + iq_raw = np.fromfile(capture_path, dtype=np.complex64) + + if len(iq_raw) == 0: + print(" ERROR: Empty file") + return [] + + # Check for NaN/Inf + if np.any(~np.isfinite(iq_raw)): + nan_count = np.sum(~np.isfinite(iq_raw)) + print(f" WARNING: {nan_count} NaN/Inf values ({100*nan_count/len(iq_raw):.1f}%)") + # Replace NaN with zeros + iq_raw = np.nan_to_num(iq_raw, nan=0.0, posinf=0.0, neginf=0.0) + + duration_s = len(iq_raw) / INPUT_SAMPLE_RATE + if verbose: + print(f" Loaded {len(iq_raw):,} samples ({duration_s:.1f}s at {INPUT_SAMPLE_RATE/1e6:.1f} MHz)") + + # Channelize to LoRa bandwidth + if verbose: + print(f"Channelizing to {CHANNEL_BW/1e3:.0f} kHz...") + channelizer = Channelizer( + input_sample_rate=INPUT_SAMPLE_RATE, + channel_bw=CHANNEL_BW, + center_freq=CENTER_FREQ, + channel_freq=CHANNEL_FREQ, + ) + iq_ch = channelizer.channelize(iq_raw) + if verbose: + print(f" Channelized: {len(iq_ch):,} samples ({channelizer})") + + # Frame synchronization + if verbose: + print(f"Searching for LoRa frames (SF{sf})...") + + sync = FrameSync(sf=sf) + frames = [] + + # Process in chunks (1 second at a time) + chunk_size = int(CHANNEL_BW) # 1 second of channelized data + n_chunks = (len(iq_ch) + chunk_size - 1) // chunk_size + + decoder = PHYDecode(sf=sf) + + for i in range(n_chunks): + start = i * chunk_size + end = min((i + 1) * chunk_size, len(iq_ch)) + chunk = iq_ch[start:end] + + result = sync.sync_from_samples(chunk) + if result.found: + # Extract NETWORKID + nid = result.networkid + + if verbose: + print(f"\n Frame detected at chunk {i} ({start/CHANNEL_BW:.1f}s):") + print(f" CFO: {result.cfo_bin:.2f} bins") + print(f" Sync word raw: {result.sync_word_raw}") + print(f" NETWORKID: {nid}") + print(f" Data symbols: {len(result.data_symbols)}") + + # Decode + cfo_int = int(round(result.cfo_bin)) + frame = decoder.decode( + result.data_symbols, + cfo_bin=cfo_int, + use_grlora_gray=True, # Real captures use gr-lora_sdr convention + soft_decoding=False, # Real captures need -1 offset + ) + + if verbose: + print(f" Header OK: {frame.header_ok}") + print(f" Payload len: {frame.payload_length}") + print(f" CRC OK: {frame.crc_ok}") + if frame.payload: + try: + text = frame.payload.decode('utf-8', errors='replace') + print(f" Payload: {repr(text)}") + except Exception: + print(f" Payload (hex): {frame.payload.hex()}") + + frames.append({ + 'time': start / CHANNEL_BW, + 'networkid': nid, + 'cfo_bin': result.cfo_bin, + 'frame': frame, + }) + + # Reset sync for next frame + sync.reset() + + if verbose: + print(f"\n{'='*60}") + print(f"Total frames found: {len(frames)}") + + return frames + + +def list_captures(): + """List available capture files.""" + logs_dir = Path(__file__).parent.parent.parent / "gnuradio" / "logs" + if not logs_dir.exists(): + print(f"Logs directory not found: {logs_dir}") + return + + print(f"Capture files in {logs_dir}:") + print("-" * 60) + + for raw_file in sorted(logs_dir.glob("*.raw")): + size_mb = raw_file.stat().st_size / 1e6 + n_samples = raw_file.stat().st_size // 8 # complex64 = 8 bytes + duration_s = n_samples / INPUT_SAMPLE_RATE + print(f" {raw_file.name:<45} {size_mb:>7.1f} MB ({duration_s:.1f}s)") + + +def main(): + parser = argparse.ArgumentParser(description="Decode LoRa frames from SDR captures") + parser.add_argument("capture", nargs="?", help="Capture file path") + parser.add_argument("--list", action="store_true", help="List available captures") + parser.add_argument("--sf", type=int, default=SF, help=f"Spreading factor (default: {SF})") + parser.add_argument("--quiet", "-q", action="store_true", help="Less verbose output") + args = parser.parse_args() + + if args.list: + list_captures() + return + + if not args.capture: + parser.print_help() + print("\n\nTip: Use --list to see available capture files") + return + + capture_path = Path(args.capture) + if not capture_path.exists(): + # Try looking in the logs directory + logs_dir = Path(__file__).parent.parent.parent / "gnuradio" / "logs" + alt_path = logs_dir / args.capture + if alt_path.exists(): + capture_path = alt_path + else: + print(f"ERROR: File not found: {capture_path}") + sys.exit(1) + + frames = decode_capture(capture_path, sf=args.sf, verbose=not args.quiet) + + if not frames: + print("\nNo frames decoded. Try:") + print(" - Different SF (--sf 7 through --sf 12)") + print(" - Check capture frequency matches RYLR998 setting") + print(" - Ensure RYLR998 is transmitting") + + +if __name__ == "__main__": + main() diff --git a/python/rylr998/__init__.py b/python/rylr998/__init__.py index f32d02b..1c96b7b 100644 --- a/python/rylr998/__init__.py +++ b/python/rylr998/__init__.py @@ -26,6 +26,7 @@ from .phy_decode import PHYDecode, LoRaFrame from .phy_encode import PHYEncode from .frame_sync import FrameSync from .frame_gen import FrameGen +from .channelizer import Channelizer, channelize __version__ = "0.1.0" __all__ = [ @@ -39,4 +40,6 @@ __all__ = [ "FrameSync", "FrameGen", "LoRaFrame", + "Channelizer", + "channelize", ] diff --git a/python/rylr998/channelizer.py b/python/rylr998/channelizer.py new file mode 100644 index 0000000..90a5390 --- /dev/null +++ b/python/rylr998/channelizer.py @@ -0,0 +1,179 @@ +"""Channelizer: frequency translation and decimation for wideband captures. + +When captures are taken at a higher sample rate than the LoRa bandwidth (e.g., +2 MHz capture of a 125 kHz LoRa signal), the signal must be: + 1. Frequency-shifted to baseband + 2. Low-pass filtered (anti-aliasing) + 3. Decimated to match the LoRa bandwidth + +This module provides the channelization step needed before FrameSync. +""" + +import numpy as np +from dataclasses import dataclass + + +@dataclass +class ChannelizerConfig: + """Configuration for channelizer.""" + input_sample_rate: float # Hz - rate of incoming samples + channel_bw: float # Hz - LoRa bandwidth (typically 125e3) + center_freq: float = 0.0 # Hz - center frequency of capture + channel_freq: float = 0.0 # Hz - frequency of LoRa channel + n_taps: int = 0 # FIR filter taps (0 = auto-calculate) + + +class Channelizer: + """Frequency translation and decimation for wideband captures. + + Shifts a LoRa channel to baseband with proper anti-alias filtering. + Uses a windowed-sinc FIR filter instead of a moving average. + + Example: + >>> ch = Channelizer(input_sample_rate=2e6, channel_bw=125e3) + >>> baseband = ch.channelize(wideband_iq) + >>> # baseband is now at 125 kHz sample rate + """ + + def __init__( + self, + input_sample_rate: float, + channel_bw: float, + center_freq: float = 0.0, + channel_freq: float = 0.0, + n_taps: int = 0, + ): + """Initialize channelizer. + + Args: + input_sample_rate: Sample rate of input data (Hz) + channel_bw: LoRa bandwidth / output sample rate (Hz) + center_freq: Center frequency of capture (Hz) + channel_freq: Frequency of LoRa channel (Hz) + n_taps: FIR filter taps (0 = auto: 4 * decimation + 1) + """ + self.input_sample_rate = input_sample_rate + self.channel_bw = channel_bw + self.center_freq = center_freq + self.channel_freq = channel_freq + + # Calculate decimation factor + self.decim = max(1, int(input_sample_rate / channel_bw)) + + # FIR filter design + if n_taps <= 0: + n_taps = self.decim * 4 + 1 # odd length for type-I linear phase + self.n_taps = n_taps + + # Normalized cutoff (Nyquist = 1.0) + cutoff = channel_bw / input_sample_rate + + # Design the filter + self._fir = self._design_lowpass(n_taps, cutoff) + + # Precompute frequency offset + self._freq_offset = channel_freq - center_freq + + def _design_lowpass(self, n_taps: int, cutoff: float) -> np.ndarray: + """Design windowed-sinc lowpass filter. + + Args: + n_taps: Number of filter taps (should be odd) + cutoff: Normalized cutoff frequency (0 to 1, where 1 = Nyquist) + + Returns: + FIR filter coefficients + """ + try: + from scipy.signal import firwin + return firwin(n_taps, cutoff).astype(np.float32) + except ImportError: + # Fallback: simple windowed sinc if scipy not available + n = np.arange(n_taps) - (n_taps - 1) / 2 + # Avoid division by zero + with np.errstate(divide='ignore', invalid='ignore'): + h = np.where(n == 0, 2 * cutoff, + np.sin(2 * np.pi * cutoff * n) / (np.pi * n)) + # Hamming window + window = 0.54 - 0.46 * np.cos(2 * np.pi * np.arange(n_taps) / (n_taps - 1)) + fir = (h * window).astype(np.float32) + return fir / np.sum(fir) # Normalize + + def channelize( + self, + iq_data: np.ndarray, + freq_offset: float | None = None, + ) -> np.ndarray: + """Channelize wideband IQ data to baseband at channel bandwidth. + + Args: + iq_data: Complex IQ samples at input_sample_rate + freq_offset: Override frequency offset (Hz). If None, uses + channel_freq - center_freq from init. + + Returns: + Complex IQ samples at channel_bw sample rate + """ + if freq_offset is None: + freq_offset = self._freq_offset + + n = len(iq_data) + + # Step 1: Frequency shift to baseband + if abs(freq_offset) > 0.01: # Only shift if significant offset + t = np.arange(n, dtype=np.float64) / self.input_sample_rate + shifted = iq_data * np.exp(-1j * 2 * np.pi * freq_offset * t).astype(np.complex64) + else: + shifted = iq_data.astype(np.complex64) + + # Step 2 & 3: Filter and decimate + if self.decim <= 1: + return shifted + + # Convolve with FIR filter + filtered = np.convolve(shifted, self._fir, mode="same") + + # Decimate + return filtered[::self.decim] + + @property + def output_sample_rate(self) -> float: + """Output sample rate after channelization.""" + return self.input_sample_rate / self.decim + + def __repr__(self) -> str: + return (f"Channelizer(in={self.input_sample_rate/1e3:.0f}kHz, " + f"out={self.output_sample_rate/1e3:.0f}kHz, " + f"decim={self.decim}, taps={self.n_taps})") + + +def channelize( + iq_data: np.ndarray, + input_sample_rate: float, + center_freq: float, + channel_freq: float, + channel_bw: float, +) -> np.ndarray: + """Convenience function to channelize wideband data. + + Args: + iq_data: Complex IQ samples at input_sample_rate + input_sample_rate: Sample rate of input data (Hz) + center_freq: Center frequency of capture (Hz) + channel_freq: Frequency of LoRa channel (Hz) + channel_bw: LoRa bandwidth (Hz) + + Returns: + Complex IQ samples at channel_bw sample rate + + Example: + >>> # 2 MHz capture, LoRa at 915 MHz, 125 kHz bandwidth + >>> baseband = channelize(iq, 2e6, 915e6, 915e6, 125e3) + """ + ch = Channelizer( + input_sample_rate=input_sample_rate, + channel_bw=channel_bw, + center_freq=center_freq, + channel_freq=channel_freq, + ) + return ch.channelize(iq_data) diff --git a/python/rylr998/frame_sync.py b/python/rylr998/frame_sync.py index 680df6e..8002743 100644 --- a/python/rylr998/frame_sync.py +++ b/python/rylr998/frame_sync.py @@ -125,10 +125,12 @@ class FrameSync: Preamble chirps should have: - Strong FFT peak (high SNR) - - Bin very close to 0 (or consistent with CFO) + - Bin consistent with previous preamble chirps (if any) - The tolerance must be tight enough to distinguish preamble (bin ~0) - from sync word symbols which can be as low as 8 for RYLR998. + Real SDR captures can have significant CFO (carrier frequency offset), + so the preamble bin can appear anywhere in 0..N-1, not just near 0. + The key insight is that preamble chirps have the SAME bin value + (modulo small noise) for many consecutive symbols. """ if peak_mag < 3.0: # Minimum SNR threshold return False @@ -137,14 +139,16 @@ class FrameSync: if self._preamble_count > 0: expected_bin = int(round(self._cfo_estimate)) % self.N # Tight tolerance: must be within 3 bins of expected - # This distinguishes preamble (bin ~0) from sync word (bin >= 8) + # This distinguishes preamble (bin ~0+CFO) from sync word (bin >= 8+CFO) tolerance = 3 distance = min(abs(peak_bin - expected_bin), self.N - abs(peak_bin - expected_bin)) return distance <= tolerance else: - # First preamble chirp - accept bins close to 0 or N-1 (CFO) - return peak_bin < 4 or peak_bin > self.N - 4 + # First preamble chirp - accept ANY strong signal + # Real captures have arbitrary CFO, so preamble can appear at any bin + # We'll validate by checking if subsequent chirps have the same bin + return True def _is_downchirp(self, samples: NDArray[np.complex64]) -> tuple[bool, float]: """Detect if samples contain a downchirp (SFD).