#!/usr/bin/env python3 """Debug FrameSync step by step.""" import sys from pathlib import Path import numpy as np sys.path.insert(0, str(Path(__file__).parent.parent / "python")) from rylr998 import Channelizer, FrameSync from rylr998.frame_sync import FrameSyncState # Params INPUT_SAMPLE_RATE = 2e6 CENTER_FREQ = 915e6 CHANNEL_FREQ = 915e6 CHANNEL_BW = 125e3 SF = 9 # Load and channelize capture_path = Path(__file__).parent.parent.parent / "gnuradio" / "logs" / "capture_multi.raw" iq_raw = np.fromfile(capture_path, dtype=np.complex64) print(f"Loaded {len(iq_raw):,} samples") ch = Channelizer(INPUT_SAMPLE_RATE, CHANNEL_BW, CENTER_FREQ, CHANNEL_FREQ) iq_ch = ch.channelize(iq_raw) print(f"Channelized to {len(iq_ch):,} samples") # Extract region around frame (4.2 - 4.7 seconds) start_sample = int(4.2 * CHANNEL_BW) end_sample = int(4.7 * CHANNEL_BW) iq_slice = iq_ch[start_sample:end_sample] print(f"Extracted {len(iq_slice):,} samples ({len(iq_slice)/CHANNEL_BW:.1f}s)") # Create FrameSync sync = FrameSync(sf=SF, sample_rate=CHANNEL_BW, bw=CHANNEL_BW) sps = sync.sps N = sync.N print(f"\nFrameSync: SF={SF}, N={N}, sps={sps}") print("=" * 70) # Process symbol by symbol with state tracking n_symbols = len(iq_slice) // sps print(f"Processing {n_symbols} symbols...\n") for i in range(min(n_symbols, 80)): sym_samples = iq_slice[i*sps:(i+1)*sps] prev_state = sync._state.name peak_bin, peak_mag = sync._dechirp_and_peak(sym_samples) # Check what _is_preamble_chirp would return is_pre = sync._is_preamble_chirp(peak_bin, peak_mag) # Check downchirp is_dc, dc_mag = sync._is_downchirp(sym_samples) # Process result = sync.process_symbol(sym_samples) new_state = sync._state.name # Build status line status = f"sym {i:3d}: bin={peak_bin:3d} snr={peak_mag:5.1f}x" status += f" isPre={is_pre} isDC={is_dc}" status += f" state: {prev_state:10s} → {new_state:10s}" # Add extra info based on state if new_state == 'PREAMBLE': status += f" (count={sync._preamble_count}, cfo={sync._cfo_estimate:.1f})" elif new_state == 'SYNC_WORD': status += f" (sync_bins={sync._sync_bins})" elif new_state == 'SFD': status += f" (sfd_count={sync._sfd_count})" elif new_state == 'DATA': status += f" (data_len={len(sync._data_bins)})" print(status) if result is not None: print(f"\n*** FRAME DETECTED ***") print(f" networkid = {result.networkid}") print(f" cfo_bin = {result.cfo_bin}") print(f" data_symbols = {len(result.data_symbols)}") break # Try sync_from_samples directly print("\n" + "=" * 70) print("Trying sync_from_samples on the same data slice...") sync.reset() result = sync.sync_from_samples(iq_slice, max_data_symbols=50) print(f"Result found: {result.found}") print(f" networkid: {result.networkid}") print(f" cfo_bin: {result.cfo_bin}") print(f" preamble_count: {result.preamble_count}") print(f" data_symbols: {len(result.data_symbols)}") print(f" sync_word_raw: {result.sync_word_raw}")