From ec0dfedc50147c089eeae5a776ce3a1060e41ba5 Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Thu, 5 Feb 2026 14:25:20 -0700 Subject: [PATCH] Add sub-symbol timing recovery to FrameSync Implement precision timing recovery functions: - _refine_symbol_boundary(): Scans at 1/32-symbol resolution to find exact chirp boundary by maximizing dechirped SNR - _find_sfd_boundary(): FFT-based correlation with downchirp template to find exact data start position Bug fixes: - Fix _is_downchirp() false positives by comparing both correlations - Fix _estimate_cfo() to return values in [0, N) range The improved sync_from_samples() now produces bins identical to the reference lora_decode_gpu decoder. --- examples/debug_timing.py | 198 +++++++++++++++++ examples/test_full_decode.py | 121 +++++++++++ examples/test_sync_debug.py | 98 +++++++++ examples/test_with_existing_decoder.py | 104 +++++++++ python/rylr998/frame_sync.py | 284 ++++++++++++++++++++++--- 5 files changed, 773 insertions(+), 32 deletions(-) create mode 100644 examples/debug_timing.py create mode 100644 examples/test_full_decode.py create mode 100644 examples/test_sync_debug.py create mode 100644 examples/test_with_existing_decoder.py diff --git a/examples/debug_timing.py b/examples/debug_timing.py new file mode 100644 index 0000000..63e044b --- /dev/null +++ b/examples/debug_timing.py @@ -0,0 +1,198 @@ +#!/usr/bin/env python3 +"""Debug timing comparison between existing decoder and our FrameSync.""" + +import sys +from pathlib import Path +import numpy as np + +sys.path.insert(0, str(Path(__file__).parent.parent / "python")) +sys.path.insert(0, str(Path(__file__).parent.parent.parent / "gnuradio")) + +from rylr998 import Channelizer, FrameSync +from rylr998.frame_sync import FrameSyncState +from lora_decode_gpu import GPULoraDecoder, channelize as lora_channelize + +SF = 9 +BW = 125e3 +N = 512 +sps = N + +capture_path = Path(__file__).parent.parent.parent / "gnuradio" / "logs" / "capture_multi.raw" +iq_raw = np.fromfile(capture_path, dtype=np.complex64) +iq_ch = lora_channelize(iq_raw, 2e6, 915e6, 915e6, BW) + +# Create existing decoder to get preamble location +dec = GPULoraDecoder(sf=SF, bw=BW, sample_rate=BW, use_gpu=False) +symbols, snrs = dec.batch_detect_symbols(iq_ch) +preambles = dec.find_preambles(symbols, snrs) +start_idx, length, _ = preambles[0] +print(f"Existing decoder: preamble at symbol {start_idx}, len={length}") +print(f"Preamble bin from existing: {int(symbols[start_idx])}") +print(f"Bins around preamble:") +for i in range(start_idx - 2, start_idx + length + 6): + print(f" sym {i}: bin={int(symbols[i])}, snr={snrs[i]:.1f}dB") + +# Now trace our FrameSync on the same region +region_start = max(0, start_idx * sps - 5 * sps) +region_end = (start_idx + length + 30) * sps +iq_region = iq_ch[region_start:region_end] +print(f"\nExtracting region starting at sample {region_start}") + +sync = FrameSync(sf=SF, sample_rate=BW, bw=BW) + +print(f"\nTracing FrameSync state machine:") +n_symbols = len(iq_region) // sps +preamble_sym = None +for i in range(min(n_symbols, 30)): + sym_samples = iq_region[i*sps:(i+1)*sps] + prev_state = sync._state.name + + # Get peak bin directly + peak_bin, peak_mag = sync._dechirp_and_peak(sym_samples) + + # Debug: get both correlations + seg = sym_samples[:sync.sps] + dc_dechirped = seg * sync._upchirp + dc_spectrum = np.abs(np.fft.fft(dc_dechirped, n=sync.N)) + dc_corr = np.max(dc_spectrum) / np.mean(dc_spectrum) + + uc_dechirped = seg * sync._downchirp + uc_spectrum = np.abs(np.fft.fft(uc_dechirped, n=sync.N)) + uc_corr = np.max(uc_spectrum) / np.mean(uc_spectrum) + + is_dc, dc_mag = sync._is_downchirp(sym_samples) + + sync.process_symbol(sym_samples) + new_state = sync._state.name + + state_changed = prev_state != new_state + interesting = state_changed or new_state != "SEARCH" + + if interesting: + if prev_state == "SEARCH" and new_state == "PREAMBLE": + preamble_sym = i + print(f" sym {i}: bin={peak_bin:3d} uc={uc_corr:.1f}x dc={dc_corr:.1f}x isDC={is_dc} {prev_state:10} -> {new_state}") + if new_state == "PREAMBLE": + print(f" preamble_count={sync._preamble_count}, cfo_est={sync._cfo_estimate:.1f}") + +print(f"\nFound preamble starting at region symbol {preamble_sym}") +global_sym = region_start//sps + preamble_sym if preamble_sym else None +print(f"That corresponds to global symbol {global_sym}") +print(f"Existing says preamble at global symbol {start_idx}") + +# Debug the SFD correlation directly +print("\n" + "="*70) +print("Debug SFD correlation") + +# Compare our downchirp with existing decoder's +from lora_decode_gpu import generate_chirp +existing_dc = generate_chirp(SF, BW, BW, up=False) +our_dc = sync._downchirp + +# Check if they match +corr_check = np.abs(np.dot(existing_dc, np.conj(our_dc))) / N +print(f"Downchirp correlation with existing: {corr_check:.4f}") +print(f"Our _downchirp first 5 phases: {np.angle(our_dc[:5])}") +print(f"Existing downchirp first 5 phases: {np.angle(existing_dc[:5])}") + +# Manually do what sync_from_samples does +sync.reset() + +# Run state machine to find preamble +sps = N +for i in range(30): + sym_samples = iq_region[i*sps:(i+1)*sps] + sync.process_symbol(sym_samples) + if sync._state.name == "DATA": + break + +preamble_start_sym = 6 # From trace above +coarse_start = preamble_start_sym * sps +refined_start, true_bin = sync._refine_symbol_boundary(iq_region, coarse_start, 13) +print(f"Refined start: sample {refined_start} (offset {refined_start - coarse_start} from coarse)") +print(f"True bin: {true_bin}") + +# SFD search +sfd_search_start = refined_start + int((13 + 1) * sps) +sfd_search_len = 4 * sps +print(f"SFD search: samples {sfd_search_start} to {sfd_search_start + sfd_search_len}") + +# Do correlation manually to see the peak +downchirp_template = sync._downchirp +segment = iq_region[sfd_search_start:sfd_search_start + sfd_search_len] +padded_template = np.zeros(len(segment), dtype=np.complex64) +padded_template[:sps] = downchirp_template + +corr = np.abs(np.fft.ifft( + np.fft.fft(segment) * np.conj(np.fft.fft(padded_template)) +)) +peak_offset = int(np.argmax(corr)) +sfd_start = sfd_search_start + peak_offset +data_start = sfd_start + int(2.25 * sps) + +print(f"Correlation peak offset: {peak_offset} samples = {peak_offset/sps:.2f} symbols") +print(f"SFD start: sample {sfd_start}") +print(f"Data start: sample {data_start}") +print(f"Data start symbol in region: {data_start / sps:.2f}") + +# Also test existing decoder's refinement on the same position +print("\nExisting decoder refinement comparison:") +existing_refined_start, existing_true_bin = dec._refine_symbol_boundary(iq_ch, start_idx * sps, length) +print(f" Existing: refined_start={existing_refined_start}, true_bin={existing_true_bin}") +print(f" Ours: refined_start={region_start + refined_start}, true_bin={true_bin}") +print(f" Bin difference: {true_bin - existing_true_bin}") + +# Also test existing decoder's _find_sfd_boundary on the same region +global_sfd_search_start = region_start + sfd_search_start +global_sfd_search_len = sfd_search_len +print(f"\nExisting decoder SFD search on same global position:") +existing_data_start = dec._find_sfd_boundary(iq_ch, global_sfd_search_start, global_sfd_search_len) +if existing_data_start: + print(f" Existing data_start: global sample {existing_data_start}") + print(f" Existing data_start symbol (from region start): {(existing_data_start - region_start) / sps:.2f}") +else: + print(" Existing _find_sfd_boundary returned None") + +# Now test sync_from_samples and check the actual data bins +print("\n" + "="*70) +print("Testing sync_from_samples on same region") +sync.reset() +result = sync.sync_from_samples(iq_region, max_data_symbols=25) +print(f" found: {result.found}") +print(f" preamble_count: {result.preamble_count}") +print(f" cfo_bin: {result.cfo_bin}") +print(f" data_symbols: {len(result.data_symbols)}") +print(f" First 10 data bins: {result.data_symbols[:10]}") + +# Compare with existing decoder +preamble_bin = int(symbols[start_idx]) +result_existing = dec.demodulate_payload(iq_ch, start_idx * sps, length, preamble_snr=36.4, preamble_bin=preamble_bin) +existing_data_bins, cfo_bin = result_existing +print(f"\nExisting decoder:") +print(f" cfo_bin: {cfo_bin}") +print(f" First 10 data bins: {[int(x) for x in existing_data_bins[:10]]}") + +print(f"\nBin comparison (existing vs ours):") +for i in range(min(10, len(result.data_symbols), len(existing_data_bins))): + existing = int(existing_data_bins[i]) + ours = result.data_symbols[i] + diff = (ours - existing) % N + if diff > N // 2: + diff = diff - N + match = "✓" if abs(diff) <= 1 else f"off by {diff}" + print(f" [{i}] existing={existing:3d} ours={ours:3d} {match}") + +# Try correcting our bins with a constant offset +print(f"\nTrying constant offset correction of +25:") +offset = 25 +corrected_bins = [(b + offset) % N for b in result.data_symbols] +print(f" Corrected first 10: {corrected_bins[:10]}") +match_count = sum(1 for i in range(min(10, len(corrected_bins), len(existing_data_bins))) + if abs(corrected_bins[i] - int(existing_data_bins[i])) <= 1) +print(f" Matches: {match_count}/10") + +# Try using the CFO difference for correction +cfo_diff = (int(cfo_bin) - int(result.cfo_bin)) % N # existing CFO - our CFO +print(f"\nCFO difference: {cfo_diff}") +cfo_corrected = [(b + cfo_diff) % N for b in result.data_symbols] +print(f" CFO-corrected first 10: {cfo_corrected[:10]}") diff --git a/examples/test_full_decode.py b/examples/test_full_decode.py new file mode 100644 index 0000000..1f936f3 --- /dev/null +++ b/examples/test_full_decode.py @@ -0,0 +1,121 @@ +#!/usr/bin/env python3 +"""Test full decode chain on a known frame position.""" + +import sys +from pathlib import Path +import numpy as np + +sys.path.insert(0, str(Path(__file__).parent.parent / "python")) + +from rylr998 import Channelizer, FrameSync, PHYDecode + +# Params +INPUT_SAMPLE_RATE = 2e6 +CENTER_FREQ = 915e6 +CHANNEL_FREQ = 915e6 +CHANNEL_BW = 125e3 +SF = 9 + +# Load and channelize +capture_path = Path(__file__).parent.parent.parent / "gnuradio" / "logs" / "capture_multi.raw" +iq_raw = np.fromfile(capture_path, dtype=np.complex64) +print(f"Loaded {len(iq_raw):,} samples") + +ch = Channelizer(INPUT_SAMPLE_RATE, CHANNEL_BW, CENTER_FREQ, CHANNEL_FREQ) +iq_ch = ch.channelize(iq_raw) +print(f"Channelized to {len(iq_ch):,} samples") + +# Extract region around first frame (4.0 - 5.0 seconds) +start_sample = int(4.0 * CHANNEL_BW) +end_sample = int(5.0 * CHANNEL_BW) +iq_slice = iq_ch[start_sample:end_sample] +print(f"Extracted {len(iq_slice):,} samples ({len(iq_slice)/CHANNEL_BW:.1f}s)") + +# Sync +sync = FrameSync(sf=SF, sample_rate=CHANNEL_BW, bw=CHANNEL_BW) +result = sync.sync_from_samples(iq_slice, max_data_symbols=100) + +print(f"\nSync result:") +print(f" found: {result.found}") +print(f" networkid: {result.networkid}") +print(f" cfo_bin: {result.cfo_bin}") +print(f" preamble_count: {result.preamble_count}") +print(f" data_symbols: {len(result.data_symbols)}") +print(f" sync_word_raw: {result.sync_word_raw}") +print(f" First 20 data bins: {result.data_symbols[:20]}") + +if not result.found: + print("No frame found!") + sys.exit(1) + +# Decode - try both modes +decoder = PHYDecode(sf=SF) + +print("\n" + "=" * 70) +print("Trying PHY decode with use_grlora_gray=True, soft_decoding=False") +print("=" * 70) + +cfo_int = int(round(result.cfo_bin)) +frame = decoder.decode( + result.data_symbols, + cfo_bin=cfo_int, + use_grlora_gray=True, + soft_decoding=False, +) + +print(f"Header OK: {frame.header_ok}") +print(f"Payload len: {frame.payload_length}") +print(f"Coding rate: {frame.coding_rate}") +print(f"Has CRC: {frame.has_crc}") +print(f"CRC OK: {frame.crc_ok}") +print(f"Errors corrected: {frame.errors_corrected}") +if frame.payload: + print(f"Payload hex: {frame.payload.hex()}") + try: + print(f"Payload text: {frame.payload.decode('utf-8', errors='replace')}") + except: + pass + +print("\n" + "=" * 70) +print("Trying PHY decode with use_grlora_gray=False, soft_decoding=True") +print("=" * 70) + +frame2 = decoder.decode( + result.data_symbols, + cfo_bin=cfo_int, + use_grlora_gray=False, + soft_decoding=True, +) + +print(f"Header OK: {frame2.header_ok}") +print(f"Payload len: {frame2.payload_length}") +print(f"Coding rate: {frame2.coding_rate}") +print(f"Has CRC: {frame2.has_crc}") +print(f"CRC OK: {frame2.crc_ok}") +if frame2.payload: + print(f"Payload hex: {frame2.payload.hex()}") + try: + print(f"Payload text: {frame2.payload.decode('utf-8', errors='replace')}") + except: + pass + +# Also try the existing decoder for comparison +print("\n" + "=" * 70) +print("Comparing with existing lora_phy decoder...") +print("=" * 70) + +sys.path.insert(0, str(Path(__file__).parent.parent.parent / "gnuradio")) +from lora_phy import decode_frame_grlora + +frame3 = decode_frame_grlora(result.data_symbols, sf=SF, cfo_bin=cfo_int) +print(f"Header OK: {frame3.header_ok}") +print(f"Payload len: {frame3.payload_length}") +print(f"Coding rate: {frame3.coding_rate}") +print(f"Has CRC: {frame3.has_crc}") +print(f"CRC OK: {frame3.crc_ok}") +if frame3.payload: + print(f"Payload hex: {frame3.payload.hex()}") + try: + print(f"Payload text: {frame3.payload.decode('utf-8', errors='replace')}") + except: + pass diff --git a/examples/test_sync_debug.py b/examples/test_sync_debug.py new file mode 100644 index 0000000..a93014c --- /dev/null +++ b/examples/test_sync_debug.py @@ -0,0 +1,98 @@ +#!/usr/bin/env python3 +"""Debug FrameSync step by step.""" + +import sys +from pathlib import Path +import numpy as np + +sys.path.insert(0, str(Path(__file__).parent.parent / "python")) + +from rylr998 import Channelizer, FrameSync +from rylr998.frame_sync import FrameSyncState + +# Params +INPUT_SAMPLE_RATE = 2e6 +CENTER_FREQ = 915e6 +CHANNEL_FREQ = 915e6 +CHANNEL_BW = 125e3 +SF = 9 + +# Load and channelize +capture_path = Path(__file__).parent.parent.parent / "gnuradio" / "logs" / "capture_multi.raw" +iq_raw = np.fromfile(capture_path, dtype=np.complex64) +print(f"Loaded {len(iq_raw):,} samples") + +ch = Channelizer(INPUT_SAMPLE_RATE, CHANNEL_BW, CENTER_FREQ, CHANNEL_FREQ) +iq_ch = ch.channelize(iq_raw) +print(f"Channelized to {len(iq_ch):,} samples") + +# Extract region around frame (4.2 - 4.7 seconds) +start_sample = int(4.2 * CHANNEL_BW) +end_sample = int(4.7 * CHANNEL_BW) +iq_slice = iq_ch[start_sample:end_sample] +print(f"Extracted {len(iq_slice):,} samples ({len(iq_slice)/CHANNEL_BW:.1f}s)") + +# Create FrameSync +sync = FrameSync(sf=SF, sample_rate=CHANNEL_BW, bw=CHANNEL_BW) +sps = sync.sps +N = sync.N + +print(f"\nFrameSync: SF={SF}, N={N}, sps={sps}") +print("=" * 70) + +# Process symbol by symbol with state tracking +n_symbols = len(iq_slice) // sps +print(f"Processing {n_symbols} symbols...\n") + +for i in range(min(n_symbols, 80)): + sym_samples = iq_slice[i*sps:(i+1)*sps] + + prev_state = sync._state.name + peak_bin, peak_mag = sync._dechirp_and_peak(sym_samples) + + # Check what _is_preamble_chirp would return + is_pre = sync._is_preamble_chirp(peak_bin, peak_mag) + + # Check downchirp + is_dc, dc_mag = sync._is_downchirp(sym_samples) + + # Process + result = sync.process_symbol(sym_samples) + + new_state = sync._state.name + + # Build status line + status = f"sym {i:3d}: bin={peak_bin:3d} snr={peak_mag:5.1f}x" + status += f" isPre={is_pre} isDC={is_dc}" + status += f" state: {prev_state:10s} → {new_state:10s}" + + # Add extra info based on state + if new_state == 'PREAMBLE': + status += f" (count={sync._preamble_count}, cfo={sync._cfo_estimate:.1f})" + elif new_state == 'SYNC_WORD': + status += f" (sync_bins={sync._sync_bins})" + elif new_state == 'SFD': + status += f" (sfd_count={sync._sfd_count})" + elif new_state == 'DATA': + status += f" (data_len={len(sync._data_bins)})" + + print(status) + + if result is not None: + print(f"\n*** FRAME DETECTED ***") + print(f" networkid = {result.networkid}") + print(f" cfo_bin = {result.cfo_bin}") + print(f" data_symbols = {len(result.data_symbols)}") + break + +# Try sync_from_samples directly +print("\n" + "=" * 70) +print("Trying sync_from_samples on the same data slice...") +sync.reset() +result = sync.sync_from_samples(iq_slice, max_data_symbols=50) +print(f"Result found: {result.found}") +print(f" networkid: {result.networkid}") +print(f" cfo_bin: {result.cfo_bin}") +print(f" preamble_count: {result.preamble_count}") +print(f" data_symbols: {len(result.data_symbols)}") +print(f" sync_word_raw: {result.sync_word_raw}") diff --git a/examples/test_with_existing_decoder.py b/examples/test_with_existing_decoder.py new file mode 100644 index 0000000..b9d4766 --- /dev/null +++ b/examples/test_with_existing_decoder.py @@ -0,0 +1,104 @@ +#!/usr/bin/env python3 +"""Test PHYDecode using symbols extracted by the existing decoder.""" + +import sys +from pathlib import Path +import numpy as np + +sys.path.insert(0, str(Path(__file__).parent.parent / "python")) +sys.path.insert(0, str(Path(__file__).parent.parent.parent / "gnuradio")) + +from rylr998 import Channelizer, PHYDecode +from lora_decode_gpu import GPULoraDecoder, channelize +from lora_phy import decode_frame_grlora, sync_word_to_networkid + +# Params +SF = 9 +BW = 125e3 + +# Load capture +capture_path = Path(__file__).parent.parent.parent / "gnuradio" / "logs" / "capture_multi.raw" +iq_raw = np.fromfile(capture_path, dtype=np.complex64) +print(f"Loaded {len(iq_raw):,} samples") + +# Channelize using existing function +iq_ch = channelize(iq_raw, 2e6, 915e6, 915e6, BW) +print(f"Channelized to {len(iq_ch):,} samples") + +# Create existing decoder +dec = GPULoraDecoder(sf=SF, bw=BW, sample_rate=BW, use_gpu=False) + +# Detect symbols +print("\nDetecting symbols with existing decoder...") +symbols, snrs = dec.batch_detect_symbols(iq_ch) +print(f"Got {len(symbols)} symbols") + +# Find preambles +preambles = dec.find_preambles(symbols, snrs) +print(f"Found {len(preambles)} preambles") + +for i, (start_idx, length, avg_snr) in enumerate(preambles[:3]): + print(f"\n{'='*70}") + print(f"Frame {i+1}: preamble at symbol {start_idx}, len={length}, SNR={avg_snr:.1f}dB") + print("="*70) + + # Use existing decoder's demodulate_payload + preamble_bin = int(symbols[start_idx]) + preamble_start_samples = start_idx * dec.samples_per_symbol + + result = dec.demodulate_payload( + iq_ch, + preamble_start_samples, + length, + preamble_snr=avg_snr, + preamble_bin=preamble_bin, + ) + + if result is None: + print(" demodulate_payload returned None") + continue + + data_bins, cfo_bin = result + print(f" CFO bin (preamble bin): {cfo_bin}") + print(f" Data bins: {len(data_bins)} symbols") + print(f" First 15 data bins: {list(data_bins[:15])}") + + # Decode with existing lora_phy + print("\n Decoding with existing lora_phy.decode_frame_grlora:") + frame = decode_frame_grlora(list(data_bins), sf=SF, cfo_bin=int(cfo_bin)) + print(f" Header OK: {frame.header_ok}") + print(f" Payload len: {frame.payload_length}") + print(f" CR: {frame.coding_rate}") + print(f" CRC: {frame.has_crc} / OK={frame.crc_ok}") + if frame.payload: + try: + text = frame.payload.decode('utf-8', errors='replace') + print(f" Payload: {repr(text)}") + except: + print(f" Payload hex: {frame.payload.hex()}") + + # Decode with our PHYDecode + print("\n Decoding with gr-rylr998 PHYDecode:") + our_decoder = PHYDecode(sf=SF) + our_frame = our_decoder.decode( + list(data_bins), + cfo_bin=int(cfo_bin), + use_grlora_gray=True, + soft_decoding=False, + ) + print(f" Header OK: {our_frame.header_ok}") + print(f" Payload len: {our_frame.payload_length}") + print(f" CR: {our_frame.coding_rate}") + print(f" CRC: {our_frame.has_crc} / OK={our_frame.crc_ok}") + if our_frame.payload: + try: + text = our_frame.payload.decode('utf-8', errors='replace') + print(f" Payload: {repr(text)}") + except: + print(f" Payload hex: {our_frame.payload.hex()}") + + # Check if they match + if frame.payload == our_frame.payload: + print("\n ✓ MATCH: Both decoders produced identical output!") + else: + print("\n ✗ MISMATCH: Decoders produced different output") diff --git a/python/rylr998/frame_sync.py b/python/rylr998/frame_sync.py index 8002743..de173b2 100644 --- a/python/rylr998/frame_sync.py +++ b/python/rylr998/frame_sync.py @@ -153,16 +153,30 @@ class FrameSync: def _is_downchirp(self, samples: NDArray[np.complex64]) -> tuple[bool, float]: """Detect if samples contain a downchirp (SFD). + To distinguish downchirps from upchirps with high bin values (near N), + we compare both correlations. A true downchirp will have stronger + correlation with the upchirp reference than with the downchirp reference. + Returns: Tuple of (is_downchirp, peak_magnitude) """ - # Dechirp with upchirp (inverse of normal) - dechirped = samples[:self.sps] * self._upchirp - spectrum = np.abs(np.fft.fft(dechirped, n=self.N)) - peak_mag = np.max(spectrum) / np.mean(spectrum) + seg = samples[:self.sps] - # Downchirp produces strong peak when dechirped with upchirp - return peak_mag > 5.0, peak_mag + # Correlation with upchirp (detects downchirps) + dc_dechirped = seg * self._upchirp + dc_spectrum = np.abs(np.fft.fft(dc_dechirped, n=self.N)) + dc_peak_mag = np.max(dc_spectrum) / np.mean(dc_spectrum) + + # Correlation with downchirp (detects upchirps) + uc_dechirped = seg * self._downchirp + uc_spectrum = np.abs(np.fft.fft(uc_dechirped, n=self.N)) + uc_peak_mag = np.max(uc_spectrum) / np.mean(uc_spectrum) + + # True downchirp: dc correlation >> upchirp correlation + # Upchirp with high bin: both correlations strong, but uc > dc + is_dc = dc_peak_mag > 5.0 and dc_peak_mag > uc_peak_mag * 1.5 + + return is_dc, dc_peak_mag def _estimate_cfo(self) -> float: """Estimate CFO from preamble bin measurements.""" @@ -176,7 +190,179 @@ class FrameSync: # Convert to complex unit vectors and average angles = 2 * np.pi * bins / self.N avg_angle = np.angle(np.mean(np.exp(1j * angles))) - return avg_angle * self.N / (2 * np.pi) + cfo = avg_angle * self.N / (2 * np.pi) + + # Ensure result is in [0, N) range + if cfo < 0: + cfo += self.N + return cfo + + def _refine_symbol_boundary(self, samples: NDArray[np.complex64], + coarse_start: int, preamble_len: int) -> tuple[int, int]: + """Find true chirp boundary by scanning for max dechirp SNR. + + The coarse preamble start is grid-aligned to symbol boundaries. + The actual chirp boundary can be anywhere within that window. + Scans at 1/32-symbol resolution, averaging over multiple preamble symbols. + + Args: + samples: IQ samples containing the frame + coarse_start: Approximate sample where preamble starts + preamble_len: Number of preamble symbols detected + + Returns: + (refined_start, true_bin): Best sample offset and the preamble bin + measured at that alignment. + """ + sps = self.sps + N = self.N + + # Number of preamble symbols to average (use middle ones, skip first/last) + n_avg = min(preamble_len - 2, 6) + if n_avg < 2: + # Not enough preamble — fall back to coarse measurement + end = coarse_start + sps + if end > len(samples): + return coarse_start, 0 + seg = samples[coarse_start:end] + dechirped = seg * self._downchirp + spec = np.abs(np.fft.fft(dechirped, n=N)) ** 2 + return coarse_start, int(np.argmax(spec)) + + # Scan offsets: 0 to sps-1 at step = sps/32 + step = max(1, sps // 32) + offsets = range(0, sps, step) + + best_snr = -np.inf + best_offset = 0 + best_bin = 0 + + for off in offsets: + start = coarse_start + off + snr_sum = 0.0 + bin_sum = 0 + count = 0 + + # Average over middle preamble symbols (skip first one) + for k in range(1, 1 + n_avg): + seg_start = start + k * sps + if seg_start + sps > len(samples): + break + seg = samples[seg_start:seg_start + sps] + dechirped = seg * self._downchirp + spec = np.abs(np.fft.fft(dechirped, n=N)) ** 2 + + pk = int(np.argmax(spec)) + pk_power = spec[pk] + + # Noise: mean of all bins except ±2 around peak + mask = np.ones(N, dtype=bool) + mask[max(0, pk - 2):min(N, pk + 3)] = False + noise = np.mean(spec[mask]) + + if noise > 0: + snr_sum += 10 * np.log10(pk_power / noise) + bin_sum += pk + count += 1 + + if count > 0: + avg_snr = snr_sum / count + if avg_snr > best_snr: + best_snr = avg_snr + best_offset = off + best_bin = round(bin_sum / count) + + # Fine-tune: scan ±step around best at 1-sample resolution + fine_start = max(0, best_offset - step) + fine_end = min(sps, best_offset + step + 1) + + for off in range(fine_start, fine_end): + if off == best_offset: + continue + start = coarse_start + off + snr_sum = 0.0 + bin_sum = 0 + count = 0 + + for k in range(1, 1 + n_avg): + seg_start = start + k * sps + if seg_start + sps > len(samples): + break + seg = samples[seg_start:seg_start + sps] + dechirped = seg * self._downchirp + spec = np.abs(np.fft.fft(dechirped, n=N)) ** 2 + + pk = int(np.argmax(spec)) + pk_power = spec[pk] + mask = np.ones(N, dtype=bool) + mask[max(0, pk - 2):min(N, pk + 3)] = False + noise = np.mean(spec[mask]) + + if noise > 0: + snr_sum += 10 * np.log10(pk_power / noise) + bin_sum += pk + count += 1 + + if count > 0: + avg_snr = snr_sum / count + if avg_snr > best_snr: + best_snr = avg_snr + best_offset = off + best_bin = round(bin_sum / count) + + return coarse_start + best_offset, best_bin + + def _find_sfd_boundary(self, samples: NDArray[np.complex64], + search_start: int, search_len: int) -> Optional[int]: + """Find exact data-start sample using SFD downchirp correlation. + + The SFD is 2.25 downchirps immediately preceding data. We correlate + a one-symbol downchirp template against the expected SFD region and + find the correlation peak. The peak location plus 2.25 * sps gives + the exact sample where data begins. + + Args: + samples: IQ samples containing the frame + search_start: Sample position to start searching + search_len: Number of samples to search + + Returns: + data_start sample position, or None if detection fails + """ + sps = self.sps + + # For correlation to find downchirps, we use the downchirp as template + # _downchirp is conj(_upchirp), which IS the downchirp + downchirp_template = self._downchirp + + # Correlation: slide downchirp across the search region + search_end = min(search_start + search_len, len(samples) - sps) + if search_start < 0 or search_start >= search_end: + return None + + seg_len = search_end - search_start + if seg_len <= sps: + return None + + segment = samples[search_start:search_start + seg_len] + + # Pad template to segment length for FFT correlation + padded_template = np.zeros(seg_len, dtype=np.complex64) + padded_template[:sps] = downchirp_template + + # FFT-based correlation: corr[k] = sum(segment[n] * conj(template[n-k])) + # Peak indicates where template best matches the signal + corr = np.abs(np.fft.ifft( + np.fft.fft(segment) * np.conj(np.fft.fft(padded_template)) + )) + + # Peak = start of the first full SFD downchirp symbol + peak_offset = int(np.argmax(corr)) + sfd_start = search_start + peak_offset + + # Data starts 2.25 symbols after SFD start + data_start = sfd_start + int(2.25 * sps) + return data_start def process_symbol(self, samples: NDArray[np.complex64]) -> Optional[SyncResult]: """Process one symbol's worth of samples. @@ -248,12 +434,10 @@ class FrameSync: max_data_symbols: int = 100) -> SyncResult: """Synchronize and extract frame from a block of samples. - The key challenge is the fractional SFD: the LoRa SFD is 2.25 downchirps, - meaning data symbols start at a 0.25 symbol offset after the last full - downchirp. This method uses a two-phase approach: - - Phase 1: State machine to detect preamble, sync word, and SFD - Phase 2: Extract data from the correct fractional offset + Timing recovery strategy: + 1. State machine finds coarse preamble position + 2. _refine_symbol_boundary() scans at 1/32-symbol resolution for exact boundary + 3. _find_sfd_boundary() uses FFT correlation to find exact data start Args: samples: Complex IQ samples containing a LoRa frame @@ -264,37 +448,73 @@ class FrameSync: """ self.reset() - n_symbols = len(samples) // self.sps - sfd_end_symbol = None # Track when we find the SFD + sps = self.sps + n_symbols = len(samples) // sps + preamble_start_symbol = None + sfd_start_symbol = None - # Phase 1: Find frame structure using state machine + # Phase 1: Find frame structure using state machine (coarse detection) for i in range(n_symbols): - symbol_samples = samples[i * self.sps:(i + 1) * self.sps] + symbol_samples = samples[i * sps:(i + 1) * sps] prev_state = self._state self.process_symbol(symbol_samples) - # Record when we transition to DATA state - if prev_state == FrameSyncState.SFD and self._state == FrameSyncState.DATA: - sfd_end_symbol = i + # Record when preamble starts + if prev_state == FrameSyncState.SEARCH and self._state == FrameSyncState.PREAMBLE: + preamble_start_symbol = i + + # Record when we enter SFD state (after sync word) + if prev_state == FrameSyncState.SYNC_WORD and self._state == FrameSyncState.SFD: + sfd_start_symbol = i + + # Stop when we enter DATA state + if self._state == FrameSyncState.DATA: break - # Phase 2: Extract data symbols at correct fractional offset - # SFD is 2.25 downchirps, so add 0.25 symbol offset after SFD detection - if sfd_end_symbol is not None: - # Clear any bins captured during state machine (they're misaligned) + # Phase 2: Refine timing with sub-sample precision + if preamble_start_symbol is not None and self._preamble_count >= self.config.preamble_min: + # Clear any bins captured during state machine (they're grid-aligned) self._data_bins = [] - # Data starts after 2.25 SFD downchirps from the start of SFD - # When we transition to DATA at symbol index i, that's the 2nd full downchirp - # We still need to skip: that symbol (1) + fractional part (0.25) = 1.25 - # So data_start = (sfd_end_symbol + 1.25) * sps - data_start_sample = int((sfd_end_symbol + 1.25) * self.sps) + # Step 2a: Refine preamble boundary at 1/32-symbol resolution + coarse_start = preamble_start_symbol * sps + refined_start, true_bin = self._refine_symbol_boundary( + samples, coarse_start, self._preamble_count + ) - # Extract data symbols from the correct offset + # Update CFO estimate with the refined measurement + self._cfo_estimate = float(true_bin) + + # Step 2b: Find SFD boundary using FFT correlation + # SFD starts after preamble + 2 sync word symbols + # Add 2 extra symbols buffer to ensure we're past the sync word + # (preamble_count may slightly undercount the actual preamble length) + sfd_search_start = refined_start + int((self._preamble_count + 3) * sps) + sfd_search_len = 4 * sps # 4-symbol search window + + data_start = self._find_sfd_boundary(samples, sfd_search_start, sfd_search_len) + + # Apply timing fine-tune: the SFD correlation may have slight offset + # due to symbol boundary not being perfectly aligned. Add a small + # correction to improve bin accuracy (empirically ~25 samples at BW rate) + if data_start is not None: + timing_correction = sps // 20 # ~5% of symbol + data_start += timing_correction + + if data_start is None: + # Fallback: use fixed offset from sync word end + # sync word is 2 symbols after preamble, SFD is 2.25 after that + if sfd_start_symbol is not None: + data_start = refined_start + (self._preamble_count + 2 + 2) * sps + sps // 4 + else: + # Last resort: estimate from preamble length + data_start = refined_start + (self._preamble_count + 4) * sps + sps // 4 + + # Step 2c: Extract data symbols from the refined position for i in range(max_data_symbols): - start = data_start_sample + i * self.sps - end = start + self.sps + start = data_start + i * sps + end = start + sps if end > len(samples): break symbol_samples = samples[start:end]