gr-rylr998/examples/test_sync_debug.py
Ryan Malloy ec0dfedc50 Add sub-symbol timing recovery to FrameSync
Implement precision timing recovery functions:
- _refine_symbol_boundary(): Scans at 1/32-symbol resolution to find
  exact chirp boundary by maximizing dechirped SNR
- _find_sfd_boundary(): FFT-based correlation with downchirp template
  to find exact data start position

Bug fixes:
- Fix _is_downchirp() false positives by comparing both correlations
- Fix _estimate_cfo() to return values in [0, N) range

The improved sync_from_samples() now produces bins identical to the
reference lora_decode_gpu decoder.
2026-02-05 14:25:20 -07:00

99 lines
3.0 KiB
Python

#!/usr/bin/env python3
"""Debug FrameSync step by step."""
import sys
from pathlib import Path
import numpy as np
sys.path.insert(0, str(Path(__file__).parent.parent / "python"))
from rylr998 import Channelizer, FrameSync
from rylr998.frame_sync import FrameSyncState
# Params
INPUT_SAMPLE_RATE = 2e6
CENTER_FREQ = 915e6
CHANNEL_FREQ = 915e6
CHANNEL_BW = 125e3
SF = 9
# Load and channelize
capture_path = Path(__file__).parent.parent.parent / "gnuradio" / "logs" / "capture_multi.raw"
iq_raw = np.fromfile(capture_path, dtype=np.complex64)
print(f"Loaded {len(iq_raw):,} samples")
ch = Channelizer(INPUT_SAMPLE_RATE, CHANNEL_BW, CENTER_FREQ, CHANNEL_FREQ)
iq_ch = ch.channelize(iq_raw)
print(f"Channelized to {len(iq_ch):,} samples")
# Extract region around frame (4.2 - 4.7 seconds)
start_sample = int(4.2 * CHANNEL_BW)
end_sample = int(4.7 * CHANNEL_BW)
iq_slice = iq_ch[start_sample:end_sample]
print(f"Extracted {len(iq_slice):,} samples ({len(iq_slice)/CHANNEL_BW:.1f}s)")
# Create FrameSync
sync = FrameSync(sf=SF, sample_rate=CHANNEL_BW, bw=CHANNEL_BW)
sps = sync.sps
N = sync.N
print(f"\nFrameSync: SF={SF}, N={N}, sps={sps}")
print("=" * 70)
# Process symbol by symbol with state tracking
n_symbols = len(iq_slice) // sps
print(f"Processing {n_symbols} symbols...\n")
for i in range(min(n_symbols, 80)):
sym_samples = iq_slice[i*sps:(i+1)*sps]
prev_state = sync._state.name
peak_bin, peak_mag = sync._dechirp_and_peak(sym_samples)
# Check what _is_preamble_chirp would return
is_pre = sync._is_preamble_chirp(peak_bin, peak_mag)
# Check downchirp
is_dc, dc_mag = sync._is_downchirp(sym_samples)
# Process
result = sync.process_symbol(sym_samples)
new_state = sync._state.name
# Build status line
status = f"sym {i:3d}: bin={peak_bin:3d} snr={peak_mag:5.1f}x"
status += f" isPre={is_pre} isDC={is_dc}"
status += f" state: {prev_state:10s}{new_state:10s}"
# Add extra info based on state
if new_state == 'PREAMBLE':
status += f" (count={sync._preamble_count}, cfo={sync._cfo_estimate:.1f})"
elif new_state == 'SYNC_WORD':
status += f" (sync_bins={sync._sync_bins})"
elif new_state == 'SFD':
status += f" (sfd_count={sync._sfd_count})"
elif new_state == 'DATA':
status += f" (data_len={len(sync._data_bins)})"
print(status)
if result is not None:
print(f"\n*** FRAME DETECTED ***")
print(f" networkid = {result.networkid}")
print(f" cfo_bin = {result.cfo_bin}")
print(f" data_symbols = {len(result.data_symbols)}")
break
# Try sync_from_samples directly
print("\n" + "=" * 70)
print("Trying sync_from_samples on the same data slice...")
sync.reset()
result = sync.sync_from_samples(iq_slice, max_data_symbols=50)
print(f"Result found: {result.found}")
print(f" networkid: {result.networkid}")
print(f" cfo_bin: {result.cfo_bin}")
print(f" preamble_count: {result.preamble_count}")
print(f" data_symbols: {len(result.data_symbols)}")
print(f" sync_word_raw: {result.sync_word_raw}")