Implement precision timing recovery functions: - _refine_symbol_boundary(): Scans at 1/32-symbol resolution to find exact chirp boundary by maximizing dechirped SNR - _find_sfd_boundary(): FFT-based correlation with downchirp template to find exact data start position Bug fixes: - Fix _is_downchirp() false positives by comparing both correlations - Fix _estimate_cfo() to return values in [0, N) range The improved sync_from_samples() now produces bins identical to the reference lora_decode_gpu decoder.
122 lines
3.6 KiB
Python
122 lines
3.6 KiB
Python
#!/usr/bin/env python3
|
|
"""Test full decode chain on a known frame position."""
|
|
|
|
import sys
|
|
from pathlib import Path
|
|
import numpy as np
|
|
|
|
sys.path.insert(0, str(Path(__file__).parent.parent / "python"))
|
|
|
|
from rylr998 import Channelizer, FrameSync, PHYDecode
|
|
|
|
# Params
|
|
INPUT_SAMPLE_RATE = 2e6
|
|
CENTER_FREQ = 915e6
|
|
CHANNEL_FREQ = 915e6
|
|
CHANNEL_BW = 125e3
|
|
SF = 9
|
|
|
|
# Load and channelize
|
|
capture_path = Path(__file__).parent.parent.parent / "gnuradio" / "logs" / "capture_multi.raw"
|
|
iq_raw = np.fromfile(capture_path, dtype=np.complex64)
|
|
print(f"Loaded {len(iq_raw):,} samples")
|
|
|
|
ch = Channelizer(INPUT_SAMPLE_RATE, CHANNEL_BW, CENTER_FREQ, CHANNEL_FREQ)
|
|
iq_ch = ch.channelize(iq_raw)
|
|
print(f"Channelized to {len(iq_ch):,} samples")
|
|
|
|
# Extract region around first frame (4.0 - 5.0 seconds)
|
|
start_sample = int(4.0 * CHANNEL_BW)
|
|
end_sample = int(5.0 * CHANNEL_BW)
|
|
iq_slice = iq_ch[start_sample:end_sample]
|
|
print(f"Extracted {len(iq_slice):,} samples ({len(iq_slice)/CHANNEL_BW:.1f}s)")
|
|
|
|
# Sync
|
|
sync = FrameSync(sf=SF, sample_rate=CHANNEL_BW, bw=CHANNEL_BW)
|
|
result = sync.sync_from_samples(iq_slice, max_data_symbols=100)
|
|
|
|
print(f"\nSync result:")
|
|
print(f" found: {result.found}")
|
|
print(f" networkid: {result.networkid}")
|
|
print(f" cfo_bin: {result.cfo_bin}")
|
|
print(f" preamble_count: {result.preamble_count}")
|
|
print(f" data_symbols: {len(result.data_symbols)}")
|
|
print(f" sync_word_raw: {result.sync_word_raw}")
|
|
print(f" First 20 data bins: {result.data_symbols[:20]}")
|
|
|
|
if not result.found:
|
|
print("No frame found!")
|
|
sys.exit(1)
|
|
|
|
# Decode - try both modes
|
|
decoder = PHYDecode(sf=SF)
|
|
|
|
print("\n" + "=" * 70)
|
|
print("Trying PHY decode with use_grlora_gray=True, soft_decoding=False")
|
|
print("=" * 70)
|
|
|
|
cfo_int = int(round(result.cfo_bin))
|
|
frame = decoder.decode(
|
|
result.data_symbols,
|
|
cfo_bin=cfo_int,
|
|
use_grlora_gray=True,
|
|
soft_decoding=False,
|
|
)
|
|
|
|
print(f"Header OK: {frame.header_ok}")
|
|
print(f"Payload len: {frame.payload_length}")
|
|
print(f"Coding rate: {frame.coding_rate}")
|
|
print(f"Has CRC: {frame.has_crc}")
|
|
print(f"CRC OK: {frame.crc_ok}")
|
|
print(f"Errors corrected: {frame.errors_corrected}")
|
|
if frame.payload:
|
|
print(f"Payload hex: {frame.payload.hex()}")
|
|
try:
|
|
print(f"Payload text: {frame.payload.decode('utf-8', errors='replace')}")
|
|
except:
|
|
pass
|
|
|
|
print("\n" + "=" * 70)
|
|
print("Trying PHY decode with use_grlora_gray=False, soft_decoding=True")
|
|
print("=" * 70)
|
|
|
|
frame2 = decoder.decode(
|
|
result.data_symbols,
|
|
cfo_bin=cfo_int,
|
|
use_grlora_gray=False,
|
|
soft_decoding=True,
|
|
)
|
|
|
|
print(f"Header OK: {frame2.header_ok}")
|
|
print(f"Payload len: {frame2.payload_length}")
|
|
print(f"Coding rate: {frame2.coding_rate}")
|
|
print(f"Has CRC: {frame2.has_crc}")
|
|
print(f"CRC OK: {frame2.crc_ok}")
|
|
if frame2.payload:
|
|
print(f"Payload hex: {frame2.payload.hex()}")
|
|
try:
|
|
print(f"Payload text: {frame2.payload.decode('utf-8', errors='replace')}")
|
|
except:
|
|
pass
|
|
|
|
# Also try the existing decoder for comparison
|
|
print("\n" + "=" * 70)
|
|
print("Comparing with existing lora_phy decoder...")
|
|
print("=" * 70)
|
|
|
|
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "gnuradio"))
|
|
from lora_phy import decode_frame_grlora
|
|
|
|
frame3 = decode_frame_grlora(result.data_symbols, sf=SF, cfo_bin=cfo_int)
|
|
print(f"Header OK: {frame3.header_ok}")
|
|
print(f"Payload len: {frame3.payload_length}")
|
|
print(f"Coding rate: {frame3.coding_rate}")
|
|
print(f"Has CRC: {frame3.has_crc}")
|
|
print(f"CRC OK: {frame3.crc_ok}")
|
|
if frame3.payload:
|
|
print(f"Payload hex: {frame3.payload.hex()}")
|
|
try:
|
|
print(f"Payload text: {frame3.payload.decode('utf-8', errors='replace')}")
|
|
except:
|
|
pass
|