gr-apollo/tests/test_signal_gen.py
Ryan Malloy 0ee7ff0ad7 Implement full Apollo USB downlink decoder chain
Complete signal processing pipeline from complex baseband to decoded
PCM telemetry, verified against the 1965 NAA Study Guide (A-624):

Core demod (Phase 1):
  - PM demodulator with carrier PLL recovery
  - 1.024 MHz subcarrier extractor (bandpass + downconvert)
  - BPSK demodulator with Costas loop + symbol sync
  - Convenience hier_block2 combining subcarrier + BPSK

PCM frame processing (Phase 2):
  - 32-bit frame sync with Hamming distance correlator
  - SEARCH/VERIFY/LOCKED state machine, complement-on-odd handling
  - Frame demultiplexer (128-word, A/D voltage scaling)
  - AGC downlink decoder (15-bit word reassembly from DNTM1/DNTM2)

Voice and analog (Phase 3):
  - 1.25 MHz FM voice subcarrier demod to 8 kHz audio
  - SCO demodulator for 9 analog sensor channels (14.5-165 kHz)

Virtual AGC integration (Phase 4):
  - TCP bridge client with auto-reconnect and channel filtering
  - DSKY uplink encoder (VERB/NOUN/DATA command sequences)

Top-level receiver (Phase 5):
  - usb_downlink_receiver hier_block2: one block, complex in, PDUs out
  - 14 GRC block YAML definitions for GNU Radio Companion
  - Example scripts for signal analysis and full-chain demo

Infrastructure:
  - constants.py with all timing/frequency/frame parameters
  - protocol.py for sync word + AGC packet encode/decode
  - Synthetic USB signal generator for testing
  - 222 tests passing, ruff lint clean
2026-02-20 13:18:42 -07:00

156 lines
5.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Tests for the USB signal generator (pure numpy, no GNU Radio needed)."""
import numpy as np
from apollo.constants import (
PCM_HIGH_BIT_RATE,
PCM_HIGH_WORDS_PER_FRAME,
PCM_WORD_LENGTH,
SAMPLE_RATE_BASEBAND,
)
from apollo.usb_signal_gen import (
generate_bpsk_subcarrier,
generate_nrz_waveform,
generate_pcm_frame,
generate_usb_baseband,
)
class TestPCMFrameGeneration:
    """Checks on the bit sequences returned by ``generate_pcm_frame``."""

    def test_frame_length_high_rate(self):
        """A 128-word frame must contain 128 * 8 bits."""
        frame = generate_pcm_frame(frame_id=1, words_per_frame=128)
        assert len(frame) == 128 * 8

    def test_frame_length_low_rate(self):
        """A 200-word frame must contain 200 * 8 bits."""
        frame = generate_pcm_frame(frame_id=1, words_per_frame=200)
        assert len(frame) == 200 * 8

    def test_frame_starts_with_sync(self):
        """The leading 32 bits (the sync word region) must be binary valued.

        Only the 0/1 value range is checked here; the bits are not compared
        against a reference sync pattern.
        """
        sync_region = generate_pcm_frame(frame_id=1)[:32]
        for bit in sync_region:
            assert bit in (0, 1)

    def test_known_payload(self):
        """A supplied payload must appear, MSB first, right after the sync word."""
        payload = bytes((0xAA, 0x55))
        frame = generate_pcm_frame(frame_id=1, data=payload, words_per_frame=128)
        # 0xAA -> 10101010, 0x55 -> 01010101
        expected = [int(ch) for ch in "10101010" "01010101"]
        # Bits 32..47 are the first two payload bytes (bits 0..31 are the sync word).
        assert frame[32:48] == expected

    def test_different_frame_ids(self):
        """Frames with different IDs must not share the same leading 32 bits."""
        sync_a = generate_pcm_frame(frame_id=1)[:32]
        sync_b = generate_pcm_frame(frame_id=2)[:32]
        # The original author notes the frame ID occupies the last 6 sync bits,
        # so at least that field is expected to differ.
        assert sync_a != sync_b
class TestNRZWaveform:
    """Checks on the length, levels and dtype of ``generate_nrz_waveform`` output."""

    def test_output_length(self):
        """Four bits at 1000 / 100 = 10 samples per bit must give 40 samples."""
        samples = generate_nrz_waveform([1, 0, 1, 0], bit_rate=100, sample_rate=1000)
        assert len(samples) == 40

    def test_nrz_levels(self):
        """A 1 bit must map to +1.0 and a 0 bit to -1.0."""
        samples = generate_nrz_waveform([1, 0], bit_rate=100, sample_rate=1000)
        # 10 samples per bit: the first ten belong to the 1 bit, the rest to the 0 bit.
        first_bit, remainder = samples[:10], samples[10:]
        assert np.all(first_bit == 1.0)
        assert np.all(remainder == -1.0)

    def test_dtype(self):
        """The waveform must be returned as float32."""
        samples = generate_nrz_waveform([1, 0, 1], bit_rate=100, sample_rate=1000)
        assert samples.dtype == np.float32
class TestBPSKSubcarrier:
    """Checks on the length and amplitude of ``generate_bpsk_subcarrier`` output."""

    def test_output_length(self):
        """The modulated output must have one sample per input sample."""
        levels = np.array([1.0, -1.0, 1.0], dtype=np.float32)
        modulated = generate_bpsk_subcarrier(levels, 1000.0, 10000.0)
        assert len(modulated) == len(levels)

    def test_amplitude(self):
        """The modulated signal must not exceed unit amplitude."""
        levels = np.ones(1000, dtype=np.float32)
        modulated = generate_bpsk_subcarrier(levels, 1_024_000, SAMPLE_RATE_BASEBAND)
        peak = np.max(np.abs(modulated))
        # Small tolerance above 1.0 for float32 rounding.
        assert peak <= 1.001
class TestUSBBaseband:
    """Test complete baseband signal generation."""

    # Number of leading samples analysed by the spectral tests.
    _FFT_SIZE = 50_000

    @staticmethod
    def _frame_sample_count():
        """Return the expected number of baseband samples in one high-rate frame."""
        frame_bits = PCM_HIGH_WORDS_PER_FRAME * PCM_WORD_LENGTH
        return int(frame_bits * SAMPLE_RATE_BASEBAND / PCM_HIGH_BIT_RATE)

    @classmethod
    def _band_power(cls, samples, low_hz, high_hz):
        """Return ``(band_power, total_power)`` as mean |FFT|^2 values.

        Only the first ``_FFT_SIZE`` samples are analysed.  ``band_power`` is
        averaged over bins whose absolute frequency lies strictly between
        ``low_hz`` and ``high_hz``; ``total_power`` is averaged over all bins.
        """
        window = samples[: cls._FFT_SIZE]
        power = np.abs(np.fft.fft(window)) ** 2
        # Derive the bin frequencies from the actual window length so the mask
        # always lines up with the spectrum.
        freqs = np.fft.fftfreq(len(window), 1.0 / SAMPLE_RATE_BASEBAND)
        in_band = (np.abs(freqs) > low_hz) & (np.abs(freqs) < high_hz)
        return np.mean(power[in_band]), np.mean(power)

    def test_output_is_complex(self):
        """The generated baseband signal must be complex64."""
        signal, _ = generate_usb_baseband(frames=1)
        assert signal.dtype == np.complex64

    def test_single_frame_duration(self):
        """One frame must span bits-per-frame * sample rate / bit rate samples.

        Per the original author: 1024 bits at 51.2 kbps is 0.02 s, i.e. 102400
        samples (which implies a 5.12 MHz baseband rate).
        """
        signal, _ = generate_usb_baseband(frames=1)
        assert len(signal) == self._frame_sample_count()

    def test_multi_frame(self):
        """Three frames must give three bit lists and three frames of samples."""
        signal, bits = generate_usb_baseband(frames=3)
        assert len(bits) == 3
        assert len(signal) == 3 * self._frame_sample_count()

    def test_pm_envelope(self):
        """PM signal should have roughly constant envelope."""
        signal, _ = generate_usb_baseband(frames=1, snr_db=None)
        envelope = np.abs(signal)
        assert np.std(envelope) < 0.01  # near-constant for PM

    def test_noise_addition(self):
        """Requesting a finite SNR must make the envelope less constant.

        This does not measure the achieved SNR; it only checks that the noisy
        signal's envelope varies more than the noise-free one.
        """
        signal_clean, _ = generate_usb_baseband(frames=1, snr_db=None)
        signal_noisy, _ = generate_usb_baseband(frames=1, snr_db=10.0)
        assert np.std(np.abs(signal_noisy)) > np.std(np.abs(signal_clean))

    def test_voice_subcarrier(self):
        """Enabling voice must add energy around 1.25 MHz.

        Spectral leakage and modulation sidebands put some power in this band
        even without voice, so a bare ``> 0`` check would pass whether or not
        the voice subcarrier is generated.  Comparing against a voice-disabled
        signal verifies that the flag actually has an effect.
        """
        with_voice, _ = generate_usb_baseband(frames=2, voice_enabled=True, snr_db=None)
        without_voice, _ = generate_usb_baseband(frames=2, voice_enabled=False, snr_db=None)
        voice_on_power, _ = self._band_power(with_voice, 1_200_000, 1_300_000)
        voice_off_power, _ = self._band_power(without_voice, 1_200_000, 1_300_000)
        assert voice_on_power > voice_off_power, (
            "voice_enabled=True should raise the power near 1.25 MHz"
        )

    def test_frame_bits_returned(self):
        """Should return the bit patterns for each frame."""
        _, bits = generate_usb_baseband(frames=3)
        assert len(bits) == 3
        for frame_bits in bits:
            assert len(frame_bits) == PCM_HIGH_WORDS_PER_FRAME * PCM_WORD_LENGTH

    def test_spectral_content_pcm_subcarrier(self):
        """The demodulated phase must show energy around 1.024 MHz.

        The mean per-bin power in the 0.95-1.1 MHz band must be at least 1% of
        the mean per-bin power over the whole spectrum.
        """
        signal, _ = generate_usb_baseband(frames=2)
        # PM demod equivalent: extract phase
        phase = np.angle(signal)
        pcm_power, total_power = self._band_power(phase, 950_000, 1_100_000)
        assert pcm_power > total_power * 0.01