Complete signal processing pipeline from complex baseband to decoded PCM telemetry, verified against the 1965 NAA Study Guide (A-624):

Core demod (Phase 1):
- PM demodulator with carrier PLL recovery
- 1.024 MHz subcarrier extractor (bandpass + downconvert)
- BPSK demodulator with Costas loop + symbol sync
- Convenience hier_block2 combining subcarrier + BPSK

PCM frame processing (Phase 2):
- 32-bit frame sync with Hamming distance correlator
- SEARCH/VERIFY/LOCKED state machine, complement-on-odd handling
- Frame demultiplexer (128-word, A/D voltage scaling)
- AGC downlink decoder (15-bit word reassembly from DNTM1/DNTM2)

Voice and analog (Phase 3):
- 1.25 MHz FM voice subcarrier demod to 8 kHz audio
- SCO demodulator for 9 analog sensor channels (14.5-165 kHz)

Virtual AGC integration (Phase 4):
- TCP bridge client with auto-reconnect and channel filtering
- DSKY uplink encoder (VERB/NOUN/DATA command sequences)

Top-level receiver (Phase 5):
- usb_downlink_receiver hier_block2: one block, complex in, PDUs out
- 14 GRC block YAML definitions for GNU Radio Companion
- Example scripts for signal analysis and full-chain demo

Infrastructure:
- constants.py with all timing/frequency/frame parameters
- protocol.py for sync word + AGC packet encode/decode
- Synthetic USB signal generator for testing
- 222 tests passing, ruff lint clean
156 lines
5.9 KiB
Python
156 lines
5.9 KiB
Python
"""Tests for the USB signal generator (pure numpy, no GNU Radio needed)."""
|
||
|
||
import numpy as np
|
||
|
||
from apollo.constants import (
|
||
PCM_HIGH_BIT_RATE,
|
||
PCM_HIGH_WORDS_PER_FRAME,
|
||
PCM_WORD_LENGTH,
|
||
SAMPLE_RATE_BASEBAND,
|
||
)
|
||
from apollo.usb_signal_gen import (
|
||
generate_bpsk_subcarrier,
|
||
generate_nrz_waveform,
|
||
generate_pcm_frame,
|
||
generate_usb_baseband,
|
||
)
|
||
|
||
|
||
class TestPCMFrameGeneration:
    """Checks on the bit sequences returned by ``generate_pcm_frame``."""

    def test_frame_length_high_rate(self):
        """A 128-word frame is 128 * 8 bits long."""
        frame = generate_pcm_frame(frame_id=1, words_per_frame=128)
        assert len(frame) == 128 * 8

    def test_frame_length_low_rate(self):
        """A 200-word frame is 200 * 8 bits long."""
        frame = generate_pcm_frame(frame_id=1, words_per_frame=200)
        assert len(frame) == 200 * 8

    def test_frame_starts_with_sync(self):
        """The leading 32-bit sync field holds only binary values.

        NOTE(review): this verifies that each of the first 32 entries is 0
        or 1; it does not compare them against a reference sync pattern.
        """
        frame = generate_pcm_frame(frame_id=1)
        sync_field = frame[:32]
        assert all(bit in (0, 1) for bit in sync_field)

    def test_known_payload(self):
        """Supplied payload bytes appear MSB-first right after the sync field."""
        payload = bytes([0xAA, 0x55])  # 10101010, 01010101
        frame = generate_pcm_frame(frame_id=1, data=payload, words_per_frame=128)
        # The payload occupies bits 32..47, directly behind the 32 sync bits.
        payload_bits = frame[32:48]
        # Unpack each payload byte into its eight bits, most significant first.
        expected = [
            (byte >> shift) & 1
            for byte in payload
            for shift in range(7, -1, -1)
        ]
        assert payload_bits == expected

    def test_different_frame_ids(self):
        """Frames generated with different IDs differ within the first 32 bits."""
        sync_for_id_1 = generate_pcm_frame(frame_id=1)[:32]
        sync_for_id_2 = generate_pcm_frame(frame_id=2)[:32]
        # At minimum, the frame ID field (last 6 bits of sync) differs.
        assert sync_for_id_1 != sync_for_id_2
|
||
|
||
|
||
class TestNRZWaveform:
    """Checks on the sample arrays returned by ``generate_nrz_waveform``."""

    def test_output_length(self):
        """Four bits at 10 samples per bit give 40 samples."""
        samples = generate_nrz_waveform([1, 0, 1, 0], bit_rate=100, sample_rate=1000)
        assert len(samples) == 40  # 4 bits x 10 samples/bit

    def test_nrz_levels(self):
        """Bit 1 maps to +1.0 and bit 0 maps to -1.0."""
        samples = generate_nrz_waveform([1, 0], bit_rate=100, sample_rate=1000)
        # 1000 / 100 = 10 samples per bit: first bit in [:10], second after.
        first_bit, second_bit = samples[:10], samples[10:]
        assert np.all(first_bit == 1.0)
        assert np.all(second_bit == -1.0)

    def test_dtype(self):
        """The waveform is returned as float32."""
        samples = generate_nrz_waveform([1, 0, 1], bit_rate=100, sample_rate=1000)
        assert samples.dtype == np.float32
|
||
|
||
|
||
class TestBPSKSubcarrier:
    """Checks on the output of ``generate_bpsk_subcarrier``."""

    def test_output_length(self):
        """The modulated output has one sample per input NRZ sample."""
        nrz_samples = np.array([1.0, -1.0, 1.0], dtype=np.float32)
        modulated = generate_bpsk_subcarrier(nrz_samples, 1000.0, 10000.0)
        assert len(modulated) == 3

    def test_amplitude(self):
        """Peak magnitude stays at or below 1.0, with a 0.001 tolerance."""
        nrz_samples = np.ones(1000, dtype=np.float32)
        modulated = generate_bpsk_subcarrier(nrz_samples, 1_024_000, SAMPLE_RATE_BASEBAND)
        peak = np.max(np.abs(modulated))
        assert peak <= 1.001
|
||
|
||
|
||
class TestUSBBaseband:
    """Checks on the complete baseband signal from ``generate_usb_baseband``."""

    # Number of leading samples fed to the FFT in the spectral tests.
    _FFT_LEN = 50000

    @staticmethod
    def _samples_per_frame():
        """Return the expected sample count of one high-rate PCM frame."""
        bits_per_frame = PCM_HIGH_WORDS_PER_FRAME * PCM_WORD_LENGTH
        return int(bits_per_frame * SAMPLE_RATE_BASEBAND / PCM_HIGH_BIT_RATE)

    @classmethod
    def _power_spectrum(cls, samples):
        """Return ``(freqs, power)`` for the first ``_FFT_LEN`` samples.

        ``power`` is the squared FFT magnitude per bin and ``freqs`` holds the
        matching bin frequencies for a sample rate of SAMPLE_RATE_BASEBAND.
        """
        spectrum = np.fft.fft(samples[: cls._FFT_LEN])
        freqs = np.fft.fftfreq(cls._FFT_LEN, 1.0 / SAMPLE_RATE_BASEBAND)
        return freqs, np.abs(spectrum) ** 2

    def test_output_is_complex(self):
        """The generated signal is complex64."""
        iq, _ = generate_usb_baseband(frames=1)
        assert iq.dtype == np.complex64

    def test_single_frame_duration(self):
        """1 frame at 51.2 kbps = 1024 bits → 1024/51200 = 0.02s → 102400 samples."""
        iq, _ = generate_usb_baseband(frames=1)
        assert len(iq) == self._samples_per_frame()

    def test_multi_frame(self):
        """Three frames give three bit lists and three frames' worth of samples."""
        iq, frame_bits = generate_usb_baseband(frames=3)
        assert len(frame_bits) == 3
        assert len(iq) == 3 * self._samples_per_frame()

    def test_pm_envelope(self):
        """Without noise, the PM signal has a near-constant envelope."""
        iq, _ = generate_usb_baseband(frames=1, snr_db=None)
        magnitude = np.abs(iq)
        assert np.std(magnitude) < 0.01  # near-constant for PM

    def test_noise_addition(self):
        """Requesting an SNR makes the envelope vary more than the clean signal.

        NOTE(review): only the envelope spread is compared; the achieved SNR
        itself is not measured.
        """
        clean, _ = generate_usb_baseband(frames=1, snr_db=None)
        noisy, _ = generate_usb_baseband(frames=1, snr_db=10.0)
        clean_spread = np.std(np.abs(clean))
        noisy_spread = np.std(np.abs(noisy))
        assert noisy_spread > clean_spread

    def test_voice_subcarrier(self):
        """With voice enabled, the 1.2-1.3 MHz band carries non-zero power."""
        iq, _ = generate_usb_baseband(frames=2, voice_enabled=True)
        freqs, power = self._power_spectrum(iq)
        # Select bins around the 1.25 MHz voice subcarrier (both signs).
        near_voice = (np.abs(freqs) > 1_200_000) & (np.abs(freqs) < 1_300_000)
        voice_power = np.mean(power[near_voice])
        assert voice_power > 0

    def test_frame_bits_returned(self):
        """One full-length bit pattern is returned per generated frame."""
        _, frame_bits = generate_usb_baseband(frames=3)
        assert len(frame_bits) == 3
        bits_per_frame = PCM_HIGH_WORDS_PER_FRAME * PCM_WORD_LENGTH
        assert all(len(one_frame) == bits_per_frame for one_frame in frame_bits)

    def test_spectral_content_pcm_subcarrier(self):
        """The demodulated phase shows energy around the 1.024 MHz PCM subcarrier."""
        iq, _ = generate_usb_baseband(frames=2)
        # PM demod equivalent: the instantaneous phase of the complex signal.
        freqs, power = self._power_spectrum(np.angle(iq))
        # Select bins around the 1.024 MHz PCM subcarrier (both signs).
        near_pcm = (np.abs(freqs) > 950_000) & (np.abs(freqs) < 1_100_000)
        pcm_power = np.mean(power[near_pcm])
        total_power = np.mean(power)
        # Mean per-bin power in the PCM band must exceed 1% of the mean
        # per-bin power across the whole spectrum.
        assert pcm_power > total_power * 0.01
|