"""Tests for Apollo PCM frame synchronizer.""" import numpy as np from apollo.constants import ( PCM_HIGH_BIT_RATE, PCM_HIGH_WORDS_PER_FRAME, PCM_LOW_BIT_RATE, PCM_LOW_WORDS_PER_FRAME, PCM_SYNC_WORD_LENGTH, PCM_WORD_LENGTH, ) from apollo.pcm_frame_sync import ( STATE_LOCKED, STATE_SEARCH, STATE_VERIFY, FrameSyncEngine, _bits_to_bytes, _hamming_distance, ) from apollo.usb_signal_gen import generate_pcm_frame def _make_frame_bits(frame_id: int = 1, odd: bool = False, data: bytes | None = None): """Helper: generate a complete frame as a bit list.""" return generate_pcm_frame(frame_id=frame_id, odd=odd, data=data) def _make_multi_frame_bits(n_frames: int = 5, data: bytes | None = None) -> list[int]: """Helper: generate N consecutive frames concatenated as a bit stream.""" all_bits = [] for i in range(n_frames): fid = (i % 50) + 1 odd = (fid % 2) == 1 all_bits.extend(_make_frame_bits(frame_id=fid, odd=odd, data=data)) return all_bits class TestHammingDistance: """Unit tests for the Hamming distance helper.""" def test_identical(self): assert _hamming_distance([1, 0, 1, 0], [1, 0, 1, 0]) == 0 def test_all_different(self): assert _hamming_distance([1, 1, 1, 1], [0, 0, 0, 0]) == 4 def test_one_error(self): assert _hamming_distance([1, 0, 1, 0], [1, 0, 0, 0]) == 1 class TestBitsToBytes: """Unit tests for bit-to-byte packing.""" def test_single_byte(self): assert _bits_to_bytes([1, 0, 1, 0, 1, 0, 1, 0]) == bytes([0xAA]) def test_two_bytes(self): bits = [1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1] assert _bits_to_bytes(bits) == bytes([0xF0, 0x0F]) def test_zero_byte(self): assert _bits_to_bytes([0, 0, 0, 0, 0, 0, 0, 0]) == bytes([0x00]) def test_ff_byte(self): assert _bits_to_bytes([1, 1, 1, 1, 1, 1, 1, 1]) == bytes([0xFF]) class TestSyncAcquisitionFromRandomOffset: """Test that the engine can find sync from an arbitrary bit offset.""" def test_acquire_with_no_offset(self): """Frame starting at bit 0 should be acquired.""" engine = FrameSyncEngine(bit_rate=PCM_HIGH_BIT_RATE, max_bit_errors=3) bits = _make_multi_frame_bits(n_frames=4) frames = engine.process_bits(bits) assert len(frames) >= 1, "Should acquire at least one frame" def test_acquire_with_random_prefix(self): """Random bits before first sync should be skipped.""" engine = FrameSyncEngine(bit_rate=PCM_HIGH_BIT_RATE, max_bit_errors=3) np.random.seed(77) garbage = list(np.random.randint(0, 2, size=200)) frame_bits = _make_multi_frame_bits(n_frames=4) bits = garbage + frame_bits frames = engine.process_bits(bits) assert len(frames) >= 1, "Should find sync after random prefix" def test_acquire_with_large_offset(self): """Even with a large garbage prefix, sync should be found.""" engine = FrameSyncEngine(bit_rate=PCM_HIGH_BIT_RATE, max_bit_errors=3) np.random.seed(88) garbage = list(np.random.randint(0, 2, size=2000)) frame_bits = _make_multi_frame_bits(n_frames=5) bits = garbage + frame_bits frames = engine.process_bits(bits) assert len(frames) >= 1 class TestComplementOnOdd: """Verify that the engine handles odd-frame core complementing.""" def test_even_frame_detected(self): """Even frame (normal core) should be detected.""" engine = FrameSyncEngine(bit_rate=PCM_HIGH_BIT_RATE, max_bit_errors=0) bits = _make_frame_bits(frame_id=2, odd=False) # Need enough frames to get through VERIFY bits2 = _make_frame_bits(frame_id=3, odd=True) bits3 = _make_frame_bits(frame_id=4, odd=False) bits4 = _make_frame_bits(frame_id=5, odd=True) frames = engine.process_bits(bits + bits2 + bits3 + bits4) assert len(frames) >= 1 # First frame should be even assert frames[0]["odd_frame"] is False def test_odd_frame_detected(self): """Odd frame (complemented core) should be detected.""" engine = FrameSyncEngine(bit_rate=PCM_HIGH_BIT_RATE, max_bit_errors=0) bits = _make_frame_bits(frame_id=1, odd=True) bits2 = _make_frame_bits(frame_id=2, odd=False) bits3 = _make_frame_bits(frame_id=3, odd=True) bits4 = _make_frame_bits(frame_id=4, odd=False) frames = engine.process_bits(bits + bits2 + bits3 + bits4) assert len(frames) >= 1 assert frames[0]["odd_frame"] is True def test_alternating_odd_even(self): """Multiple consecutive frames should alternate odd/even detection.""" engine = FrameSyncEngine(bit_rate=PCM_HIGH_BIT_RATE, max_bit_errors=0) all_bits = [] for i in range(6): fid = i + 1 odd = (fid % 2) == 1 all_bits.extend(_make_frame_bits(frame_id=fid, odd=odd)) frames = engine.process_bits(all_bits) assert len(frames) >= 3 for frame in frames: fid = frame["frame_id"] expected_odd = (fid % 2) == 1 assert frame["odd_frame"] == expected_odd, ( f"Frame {fid}: expected odd={expected_odd}, got {frame['odd_frame']}" ) class TestStateMachineTransitions: """Test SEARCH -> VERIFY -> LOCKED transitions.""" def test_starts_in_search(self): engine = FrameSyncEngine() assert engine.state == STATE_SEARCH def test_moves_to_verify_on_first_match(self): """First sync match should transition to VERIFY.""" engine = FrameSyncEngine(bit_rate=PCM_HIGH_BIT_RATE, max_bit_errors=3) bits = _make_frame_bits(frame_id=1, odd=True) # Process just the sync word to trigger SEARCH -> VERIFY engine.process_bits(bits[:PCM_SYNC_WORD_LENGTH]) assert engine.state == STATE_VERIFY def test_reaches_locked_after_verify(self): """After verify_count consecutive hits, should reach LOCKED.""" engine = FrameSyncEngine( bit_rate=PCM_HIGH_BIT_RATE, max_bit_errors=3, verify_count=2, ) all_bits = _make_multi_frame_bits(n_frames=5) engine.process_bits(all_bits) assert engine.state == STATE_LOCKED def test_drops_to_search_on_consecutive_misses(self): """Corrupting sync words should eventually drop back to SEARCH.""" engine = FrameSyncEngine( bit_rate=PCM_HIGH_BIT_RATE, max_bit_errors=0, # strict matching miss_limit=2, verify_count=2, ) # First, establish lock with clean frames clean = _make_multi_frame_bits(n_frames=5) engine.process_bits(clean) assert engine.state == STATE_LOCKED # Now feed frames with completely corrupted sync words frame_len = PCM_HIGH_WORDS_PER_FRAME * PCM_WORD_LENGTH for _ in range(3): np.random.seed(42) bad_frame = list(np.random.randint(0, 2, size=frame_len)) engine.process_bits(bad_frame) assert engine.state == STATE_SEARCH class TestMaxBitErrors: """Test Hamming distance threshold for sync detection.""" def test_exact_match_required(self): """With max_bit_errors=0, only exact sync matches should work.""" engine = FrameSyncEngine(bit_rate=PCM_HIGH_BIT_RATE, max_bit_errors=0) bits = _make_multi_frame_bits(n_frames=4) frames = engine.process_bits(bits) assert len(frames) >= 1 # All frames should have full confidence for f in frames: assert f["sync_confidence"] == PCM_SYNC_WORD_LENGTH def test_tolerates_bit_errors(self): """With max_bit_errors=3, frames with up to 3 flipped sync bits should work.""" engine = FrameSyncEngine(bit_rate=PCM_HIGH_BIT_RATE, max_bit_errors=3) # Generate a clean frame and flip 2 bits in the sync word bits = _make_frame_bits(frame_id=2, odd=False) bits[5] ^= 1 # flip bit 5 bits[10] ^= 1 # flip bit 10 # Append more clean frames so the engine can VERIFY/LOCK bits2 = _make_frame_bits(frame_id=3, odd=True) bits3 = _make_frame_bits(frame_id=4, odd=False) bits4 = _make_frame_bits(frame_id=5, odd=True) frames = engine.process_bits(bits + bits2 + bits3 + bits4) assert len(frames) >= 1 def test_rejects_too_many_errors(self): """With max_bit_errors=0, a single flipped sync bit should prevent match.""" engine = FrameSyncEngine(bit_rate=PCM_HIGH_BIT_RATE, max_bit_errors=0) # Generate frames with 1 corrupted sync bit each all_bits = [] for i in range(4): fid = i + 1 odd = (fid % 2) == 1 frame = _make_frame_bits(frame_id=fid, odd=odd) frame[3] ^= 1 # flip one bit in sync all_bits.extend(frame) frames = engine.process_bits(all_bits) # With strict matching and corrupted syncs, should get no frames assert len(frames) == 0 class TestKnownPayloadRoundtrip: """Test that payload data survives the frame sync extraction.""" def test_payload_recovery(self): """Known payload should be recoverable from the output frame.""" np.random.seed(42) payload = bytes(np.random.randint(0, 256, size=124, dtype=np.uint8)) engine = FrameSyncEngine(bit_rate=PCM_HIGH_BIT_RATE, max_bit_errors=3) # Generate 4 frames with the same payload to allow lock acquisition all_bits = [] for i in range(4): fid = i + 1 odd = (fid % 2) == 1 all_bits.extend(_make_frame_bits(frame_id=fid, odd=odd, data=payload)) frames = engine.process_bits(all_bits) assert len(frames) >= 1 # Check that the payload portion (bytes 4 onward) of at least one frame matches found_match = False for f in frames: frame_bytes = f["frame_bytes"] # Words 5-128 are bytes 4-127 (0-indexed) recovered_payload = frame_bytes[4:128] if recovered_payload == payload: found_match = True break assert found_match, "Payload not recovered correctly from any emitted frame" def test_frame_id_in_output(self): """Output metadata should contain the correct frame ID.""" engine = FrameSyncEngine(bit_rate=PCM_HIGH_BIT_RATE, max_bit_errors=3) all_bits = _make_multi_frame_bits(n_frames=5) frames = engine.process_bits(all_bits) assert len(frames) >= 1 for f in frames: assert 1 <= f["frame_id"] <= 50 class TestLowRateFrames: """Test with 200-word low-rate frames.""" def test_low_rate_frame_length(self): """Low-rate engine should expect 200-word frames.""" engine = FrameSyncEngine(bit_rate=PCM_LOW_BIT_RATE, max_bit_errors=3) assert engine.bits_per_frame == PCM_LOW_WORDS_PER_FRAME * PCM_WORD_LENGTH def test_low_rate_acquisition(self): """Should acquire low-rate frames (200 words each).""" engine = FrameSyncEngine(bit_rate=PCM_LOW_BIT_RATE, max_bit_errors=3) all_bits = [] for _i in range(4): frame = generate_pcm_frame( frame_id=1, odd=True, words_per_frame=PCM_LOW_WORDS_PER_FRAME, ) all_bits.extend(frame) frames = engine.process_bits(all_bits) assert len(frames) >= 1 # Frame should be 200 bytes for f in frames: assert len(f["frame_bytes"]) == PCM_LOW_WORDS_PER_FRAME