#!/usr/bin/env python3
"""Test PHYDecode using symbols extracted by the existing decoder.

The script loads a raw IQ capture, channelizes it, lets the existing
``GPULoraDecoder`` detect symbols and preambles, and then decodes the first
three detected frames twice: once with ``lora_phy.decode_frame_grlora`` and
once with ``rylr998.PHYDecode``.  The two payloads are compared and a
MATCH / MISMATCH line is printed per frame.

Everything runs at import time; there are no command-line arguments.
"""

import sys
from pathlib import Path

import numpy as np

# The project packages are not installed; extend sys.path relative to this
# file before importing them.
# NOTE(review): presumably rylr998 lives under ../python and the lora_*
# modules under ../../gnuradio - confirm against the repository layout.
sys.path.insert(0, str(Path(__file__).parent.parent / "python"))
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "gnuradio"))

from rylr998 import Channelizer, PHYDecode
from lora_decode_gpu import GPULoraDecoder, channelize
from lora_phy import decode_frame_grlora, sync_word_to_networkid


def _print_frame(frame):
    """Print the header fields and payload of a decoded frame.

    Only the attributes ``header_ok``, ``payload_length``, ``coding_rate``,
    ``has_crc``, ``crc_ok`` and ``payload`` are read, so the result of either
    decoder can be passed in.

    A non-empty payload is shown as text when it is valid UTF-8 and as a hex
    string otherwise.  Decoding is strict on purpose: with
    ``errors='replace'`` the decode can never fail, which would make the hex
    fallback unreachable.
    """
    print(f" Header OK: {frame.header_ok}")
    print(f" Payload len: {frame.payload_length}")
    print(f" CR: {frame.coding_rate}")
    print(f" CRC: {frame.has_crc} / OK={frame.crc_ok}")

    if not frame.payload:
        return

    try:
        text = frame.payload.decode("utf-8")
    except UnicodeDecodeError:
        # Binary payload: hex is more useful than replacement characters.
        print(f" Payload hex: {frame.payload.hex()}")
    else:
        print(f" Payload: {repr(text)}")


# Params
SF = 9
BW = 125e3

# Load capture
capture_path = (
    Path(__file__).parent.parent.parent / "gnuradio" / "logs" / "capture_multi.raw"
)
iq_raw = np.fromfile(capture_path, dtype=np.complex64)
print(f"Loaded {len(iq_raw):,} samples")

# Channelize using existing function
# NOTE(review): the positional arguments look like (samples, input sample
# rate, center frequency, channel frequency, bandwidth) - confirm against
# the channelize() signature.
iq_ch = channelize(iq_raw, 2e6, 915e6, 915e6, BW)
print(f"Channelized to {len(iq_ch):,} samples")

# Create existing decoder
dec = GPULoraDecoder(sf=SF, bw=BW, sample_rate=BW, use_gpu=False)

# Detect symbols
print("\nDetecting symbols with existing decoder...")
symbols, snrs = dec.batch_detect_symbols(iq_ch)
print(f"Got {len(symbols)} symbols")

# Find preambles
preambles = dec.find_preambles(symbols, snrs)
print(f"Found {len(preambles)} preambles")

# Only the first three detected preambles are examined.
for i, (start_idx, length, avg_snr) in enumerate(preambles[:3]):
    print(f"\n{'='*70}")
    print(f"Frame {i+1}: preamble at symbol {start_idx}, len={length}, SNR={avg_snr:.1f}dB")
    print("="*70)

    # Use existing decoder's demodulate_payload
    preamble_bin = int(symbols[start_idx])
    # Convert the preamble position from a symbol index to a sample offset.
    preamble_start_samples = start_idx * dec.samples_per_symbol

    result = dec.demodulate_payload(
        iq_ch,
        preamble_start_samples,
        length,
        preamble_snr=avg_snr,
        preamble_bin=preamble_bin,
    )

    if result is None:
        print(" demodulate_payload returned None")
        continue

    data_bins, cfo_bin = result
    print(f" CFO bin (preamble bin): {cfo_bin}")
    print(f" Data bins: {len(data_bins)} symbols")
    print(f" First 15 data bins: {list(data_bins[:15])}")

    # Decode with existing lora_phy
    print("\n Decoding with existing lora_phy.decode_frame_grlora:")
    frame = decode_frame_grlora(list(data_bins), sf=SF, cfo_bin=int(cfo_bin))
    _print_frame(frame)

    # Decode with our PHYDecode.  Each decoder gets its own list copy of the
    # bins, and a fresh PHYDecode is built per frame, as in the original.
    print("\n Decoding with gr-rylr998 PHYDecode:")
    our_decoder = PHYDecode(sf=SF)
    our_frame = our_decoder.decode(
        list(data_bins),
        cfo_bin=int(cfo_bin),
        use_grlora_gray=True,
        soft_decoding=False,
    )
    _print_frame(our_frame)

    # Check if they match (payload bytes only; header fields are not compared)
    if frame.payload == our_frame.payload:
        print("\n ✓ MATCH: Both decoders produced identical output!")
    else:
        print("\n ✗ MISMATCH: Decoders produced different output")