"""Tests for Apollo protocol utilities — sync words and AGC packets.""" import pytest from apollo.constants import ( AGC_CH_DNTM1, AGC_CH_INLINK, AGC_CH_OUTLINK, DEFAULT_SYNC_A, DEFAULT_SYNC_B, DEFAULT_SYNC_CORE, ) from apollo.protocol import ( adc_to_voltage, bits_to_sync_word, form_io_packet, generate_sync_word, parse_io_packet, parse_sync_word, sync_word_to_bits, voltage_to_adc, ) class TestSyncWordGeneration: """Test 32-bit PCM frame sync word generation and parsing.""" def test_roundtrip(self): """Generate → parse → verify all fields match.""" word = generate_sync_word(frame_id=1) fields = parse_sync_word(word) assert fields["a_bits"] == DEFAULT_SYNC_A assert fields["core"] == DEFAULT_SYNC_CORE assert fields["b_bits"] == DEFAULT_SYNC_B assert fields["frame_id"] == 1 def test_frame_id_range(self): """All valid frame IDs (1-50) should roundtrip.""" for fid in range(1, 51): word = generate_sync_word(frame_id=fid) fields = parse_sync_word(word) assert fields["frame_id"] == fid def test_invalid_frame_id(self): with pytest.raises(ValueError): generate_sync_word(frame_id=0) with pytest.raises(ValueError): generate_sync_word(frame_id=51) def test_odd_frame_complements_core(self): """Odd frames should have complemented core.""" even = generate_sync_word(frame_id=2, odd=False) odd = generate_sync_word(frame_id=1, odd=True) even_fields = parse_sync_word(even) odd_fields = parse_sync_word(odd) # Core should be bitwise complement (15 bits) assert (even_fields["core"] ^ odd_fields["core"]) == 0x7FFF def test_word_is_32_bits(self): word = generate_sync_word(frame_id=25) assert 0 <= word < (1 << 32) def test_bits_roundtrip(self): """word → bits → word should be identity.""" word = generate_sync_word(frame_id=42) bits = sync_word_to_bits(word) assert len(bits) == 32 assert all(b in (0, 1) for b in bits) recovered = bits_to_sync_word(bits) assert recovered == word def test_bits_msb_first(self): """Bit 0 in the list should be the MSB of the word.""" word = generate_sync_word(frame_id=1) bits = sync_word_to_bits(word) # MSB is bit 31 assert bits[0] == (word >> 31) & 1 class TestAGCPacketProtocol: """Test Virtual AGC socket protocol encode/decode.""" def test_roundtrip_basic(self): """Encode → decode should preserve channel and value.""" packet = form_io_packet(channel=AGC_CH_OUTLINK, value=12345) ch, val, u = parse_io_packet(packet) assert ch == AGC_CH_OUTLINK assert val == 12345 def test_roundtrip_all_telecom_channels(self): for ch in [AGC_CH_INLINK, AGC_CH_OUTLINK, AGC_CH_DNTM1]: for val in [0, 1, 0x3FFF, 0x7FFF]: packet = form_io_packet(channel=ch, value=val) got_ch, got_val, _ = parse_io_packet(packet) assert got_ch == ch, f"Channel mismatch: {got_ch} != {ch}" assert got_val == val, f"Value mismatch: {got_val} != {val}" def test_packet_length(self): packet = form_io_packet(channel=0, value=0) assert len(packet) == 4 def test_signature_bits(self): """Verify the 2-bit signatures in each byte.""" packet = form_io_packet(channel=100, value=5000) assert (packet[0] & 0xC0) == 0x00 assert (packet[1] & 0xC0) == 0x40 assert (packet[2] & 0xC0) == 0x80 assert (packet[3] & 0xC0) == 0xC0 def test_invalid_packet_length(self): with pytest.raises(ValueError): parse_io_packet(b"\x00\x40\x80") def test_invalid_signature(self): with pytest.raises(ValueError): parse_io_packet(b"\xFF\x40\x80\xC0") def test_zero_values(self): packet = form_io_packet(channel=0, value=0) ch, val, _ = parse_io_packet(packet) assert ch == 0 assert val == 0 def test_max_values(self): packet = form_io_packet(channel=0x1FF, value=0x7FFF) ch, val, _ = parse_io_packet(packet) assert ch == 0x1FF assert val == 0x7FFF class TestADCConversion: """Test A/D converter code ↔ voltage conversion (IMPL_SPEC section 5.3).""" def test_zero_code(self): """Code 1 = 0V.""" assert adc_to_voltage(1) == 0.0 def test_fullscale_code(self): """Code 254 = 4.98V.""" assert abs(adc_to_voltage(254) - 4.98) < 0.001 def test_overflow_code(self): """Code 255 = >5V (clamped to 5.0).""" assert adc_to_voltage(255) == 5.0 def test_midscale(self): """Midscale should be roughly 2.5V.""" mid_code = 128 voltage = adc_to_voltage(mid_code) assert abs(voltage - 2.5) < 0.1 # within 100mV def test_voltage_roundtrip(self): """voltage_to_adc(adc_to_voltage(code)) ≈ code for valid range.""" for code in [1, 50, 127, 200, 254]: v = adc_to_voltage(code) recovered = voltage_to_adc(v) assert abs(recovered - code) <= 1, f"Code {code}: {v}V → {recovered}" def test_low_level_scaling(self): """Low-level inputs use ×125 gain: 0-40 mV → 0-5V internal.""" # 40 mV at low-level = 40 * 125 = 5000 mV = 5V internal → code 254 v = adc_to_voltage(254, low_level=True) assert abs(v - 0.03984) < 0.001 # 4.98V / 125 ≈ 0.03984V def test_step_size(self): """Step size should be ~19.7 mV per LSB.""" v1 = adc_to_voltage(100) v2 = adc_to_voltage(101) step_mv = (v2 - v1) * 1000 assert abs(step_mv - 19.7) < 0.1