#!/usr/bin/env python3
"""Loopback test: Encode → Modulate → Demodulate → Decode.

Verifies the complete TX/RX chain in software without hardware.
"""

import argparse
import os
import sys
from typing import Optional

import numpy as np

# Add parent directory to path for imports.
# The script-relative entry is a fallback so the test can also be started from
# a working directory other than the script's own (assumes the package lives in
# ../python relative to this file). The original cwd-relative entry is inserted
# last so it keeps first precedence, exactly as before.
sys.path.insert(
    0,
    os.path.join(os.path.dirname(os.path.abspath(__file__)), os.pardir, 'python'),
)
sys.path.insert(0, '../python')

from rylr998 import (  # noqa: E402  (must follow the sys.path setup above)
    PHYEncode, PHYDecode,
    CSSMod, CSSDemod,
    FrameGen, FrameSync,
    networkid_to_sync_word, sync_word_to_networkid,
)


def loopback_test(payload: bytes, sf: int = 9, cr: int = 1,
                  networkid: int = 18, verbose: bool = True,
                  *, snr_db: float = 20, seed: Optional[int] = None) -> bool:
    """Run complete TX → RX loopback test.

    The payload is encoded, framed, passed through an additive-noise channel,
    then synchronised and decoded again; the result is compared with the input.

    Args:
        payload: Payload bytes to transmit
        sf: Spreading factor (7-12)
        cr: Coding rate (1-4)
        networkid: RYLR998 NETWORKID
        verbose: Print detailed output
        snr_db: Signal-to-noise ratio of the simulated channel, in dB
        seed: Optional seed for the channel noise. ``None`` (default) draws
            from NumPy's global random state, as the test always did.

    Returns:
        True if payload decoded correctly
    """
    N = 1 << sf
    fs = 125e3  # Sample rate = bandwidth for simplicity

    if verbose:
        print(f"\n{'='*60}")
        print(f"Loopback Test: SF{sf} CR4/{cr+4} NETWORKID={networkid}")
        print(f"Payload ({len(payload)}B): {payload!r}")
        print(f"{'='*60}")

    # === TX Chain ===
    if verbose:
        print("\n--- TX Chain ---")

    # PHY Encode: payload → symbol bins
    encoder = PHYEncode(sf=sf, cr=cr, has_crc=True)
    data_bins = encoder.encode(payload)

    if verbose:
        print(f"PHY Encode: {len(payload)} bytes → {len(data_bins)} symbols")
        print(f" First 10 bins: {data_bins[:10]}")

    # Frame Gen: add preamble, sync word, SFD
    frame_gen = FrameGen(sf=sf, sample_rate=fs, networkid=networkid)
    tx_iq = frame_gen.generate_frame(data_bins)

    if verbose:
        print(f"Frame Gen: {len(tx_iq)} samples ({len(tx_iq)/N:.1f} symbols)")
        print(f" Duration: {len(tx_iq)/fs*1000:.2f} ms")

    # === Channel ===
    # Add some noise for realism
    signal_power = np.mean(np.abs(tx_iq) ** 2)
    noise_power = signal_power / (10 ** (snr_db / 10))
    # Both the np.random module and a RandomState instance expose randn(), so
    # the unseeded path keeps using the global state while a seed gives a
    # private, reproducible stream.
    rng = np.random if seed is None else np.random.RandomState(seed)
    n_samples = len(tx_iq)
    # noise_power is split evenly between the real and imaginary parts.
    noise = np.sqrt(noise_power / 2) * (
        rng.randn(n_samples) + 1j * rng.randn(n_samples)
    ).astype(np.complex64)
    rx_iq = tx_iq + noise

    if verbose:
        print(f"\nChannel: Added noise at {snr_db} dB SNR")

    # === RX Chain ===
    if verbose:
        print("\n--- RX Chain ---")

    # Frame Sync: detect preamble, extract NETWORKID, align data
    sync = FrameSync(sf=sf, sample_rate=fs)
    sync_result = sync.sync_from_samples(rx_iq)

    if verbose:
        print("Frame Sync:")
        print(f" Found: {sync_result.found}")
        print(f" NETWORKID: {sync_result.networkid}")
        print(f" CFO: {sync_result.cfo_bin:.2f} bins")
        print(f" Preamble count: {sync_result.preamble_count}")
        print(f" Data symbols: {len(sync_result.data_symbols)}")

    if not sync_result.found:
        print("\nFAIL: Frame sync failed!")
        return False

    # PHY Decode: symbols → payload
    # For loopback with our own TX encoder:
    # - use_grlora_gray=False: use Gray decode (inverse of TX's Gray encode)
    # - soft_decoding=True: no -1 offset (that's for gr-lora_sdr compat)
    decoder = PHYDecode(sf=sf, cr=cr, has_crc=True)
    cfo_int = int(round(sync_result.cfo_bin))
    frame = decoder.decode(
        sync_result.data_symbols,
        cfo_bin=cfo_int,
        use_grlora_gray=False,
        soft_decoding=True,
    )

    if verbose:
        print("\nPHY Decode:")
        print(f" header_ok: {frame.header_ok}")
        print(f" payload_len: {frame.payload_length}")
        print(f" CR: 4/{frame.coding_rate + 4}")
        print(f" has_crc: {frame.has_crc}")
        print(f" crc_ok: {frame.crc_ok}")
        print(f" errors_corrected: {frame.errors_corrected}")
        print(f" payload: {frame.payload!r}")

    # === Verify ===
    errors = []
    warnings = []

    if sync_result.networkid != networkid:
        errors.append(f"NETWORKID mismatch: {sync_result.networkid} != {networkid}")

    if not frame.header_ok:
        # Header checksum is a minor issue - the payload decode still works
        warnings.append("Header checksum failed (minor issue)")

    if frame.crc_ok is not True:
        errors.append(f"CRC check failed: {frame.crc_ok}")

    if frame.payload != payload:
        errors.append("Payload mismatch")

    # The test passes exactly when no hard error was recorded; warnings are
    # reported but do not affect the result.
    ok = not errors

    if verbose:
        print(f"\n{'='*60}")
        if ok:
            print("PASS: Loopback test successful!")
        else:
            print("FAIL: Loopback test failed!")
            for e in errors:
                print(f" - {e}")
        for w in warnings:
            print(f" (warning: {w})")
        print(f"{'='*60}")

    return ok


def test_all_configurations():
    """Test various SF/CR combinations.

    Runs a non-verbose loopback for every SF in 7..12 and CR in 1..4 with a
    fixed payload and prints one status line per combination.

    Returns:
        True if every combination passed
    """
    print("\n" + "="*60)
    print("Testing all SF/CR configurations")
    print("="*60)

    results = []
    test_payload = b"Test1234"

    for sf in range(7, 13):
        for cr in range(1, 5):
            ok = loopback_test(test_payload, sf=sf, cr=cr, verbose=False)
            results.append((sf, cr, ok))
            status = "✓" if ok else "✗"
            print(f" SF{sf} CR4/{cr+4}: {status}")

    n_pass = sum(1 for _, _, ok in results if ok)
    print(f"\nSummary: {n_pass}/{len(results)} passed")

    return n_pass == len(results)


def test_networkid_range():
    """Test NETWORKID encoding/decoding across full range.

    Converts every NETWORKID in 0..255 to its sync word and back, printing
    each value that does not round-trip.

    Returns:
        True if all 256 values round-trip correctly
    """
    print("\n" + "="*60)
    print("Testing NETWORKID range (0-255)")
    print("="*60)

    errors = 0
    for nid in range(256):
        deltas = networkid_to_sync_word(nid)
        recovered = sync_word_to_networkid(deltas)
        if recovered != nid:
            print(f" FAIL: NID={nid} → {deltas} → {recovered}")
            errors += 1

    if errors == 0:
        print(" ✓ All 256 NETWORKIDs round-trip correctly")
    else:
        print(f" {errors} failures")

    return errors == 0


def main(argv=None) -> int:
    """Command-line entry point.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        Process exit status: 0 if every executed test passed, 1 otherwise.
    """
    parser = argparse.ArgumentParser(description="gr-rylr998 loopback test")
    parser.add_argument("--payload", type=str, default="Hello, LoRa!",
                        help="Payload string to test")
    parser.add_argument("--sf", type=int, default=9,
                        help="Spreading factor")
    parser.add_argument("--cr", type=int, default=1,
                        help="Coding rate (1-4)")
    parser.add_argument("--networkid", type=int, default=18,
                        help="NETWORKID")
    parser.add_argument("--snr", type=float, default=20,
                        help="Channel SNR in dB for the single loopback test")
    parser.add_argument("--seed", type=int, default=None,
                        help="Seed for the channel noise (single loopback test)")
    parser.add_argument("--all", action="store_true",
                        help="Test all SF/CR combinations")

    args = parser.parse_args(argv)

    # Run NETWORKID range test; its result counts towards the exit status so
    # a broken sync-word mapping cannot hide behind a passing loopback.
    nid_ok = test_networkid_range()

    if args.all:
        success = test_all_configurations()
    else:
        success = loopback_test(
            args.payload.encode(),
            sf=args.sf,
            cr=args.cr,
            networkid=args.networkid,
            snr_db=args.snr,
            seed=args.seed,
        )

    return 0 if (nid_ok and success) else 1


if __name__ == "__main__":
    sys.exit(main())