Implement the transmit/generate side as streaming GNU Radio blocks, complementing the existing receive chain. Each block maps to a physical instrument on CuriousMarc's Keysight bench: pcm_frame_source - PCM bit stream generator (sync_block + FrameSourceEngine); nrz_encoder - bits to NRZ waveform (+1/-1) with upsampling; bpsk_subcarrier_mod - NRZ x cos(1.024 MHz) BPSK modulator; fm_voice_subcarrier_mod - 1.25 MHz FM test tone source; pm_mod - phase modulator: exp(j * deviation * input); usb_signal_source - convenience wrapper wiring all blocks together. Includes GRC YAML definitions for all blocks under the [Apollo USB] category, 49 new tests (271 total, all passing), and a loopback test that validates the full TX->RX round trip, including frame recovery with 30 dB AWGN.
114 lines
3.9 KiB
Python
114 lines
3.9 KiB
Python
"""Tests for the USB signal source (complete transmit chain)."""
|
|
|
|
import numpy as np
|
|
import pytest
|
|
|
|
# GNU Radio is an optional dependency: record whether it imported so the
# module can still be collected (and its tests skipped) when it is missing.
try:
    from gnuradio import blocks, gr
except ImportError:
    HAS_GNURADIO = False
else:
    # Only reached when the import above succeeded.
    HAS_GNURADIO = True
|
|
|
|
from apollo.constants import (
|
|
PCM_HIGH_BIT_RATE,
|
|
PCM_HIGH_WORDS_PER_FRAME,
|
|
PCM_WORD_LENGTH,
|
|
SAMPLE_RATE_BASEBAND,
|
|
)
|
|
|
|
# Module-level mark: skip every test in this file when the GNU Radio import
# guard above set HAS_GNURADIO to False.
pytestmark = pytest.mark.skipif(not HAS_GNURADIO, reason="GNU Radio not installed")
|
|
|
|
|
|
class TestUSBSignalSource:
    """Test the convenience transmit wrapper (``usb_signal_source``).

    Each test that needs samples runs the source through a minimal flowgraph
    (source -> head -> vector sink) and inspects the captured complex samples
    with NumPy.
    """

    @staticmethod
    def _band_power(data, f_low, f_high):
        """Return ``(band_power, total_power)`` of the phase spectrum of *data*.

        The phase of the complex samples is extracted with ``np.angle``
        (equivalent to a PM demodulator), transformed with an FFT, and the
        mean squared magnitude is computed twice: over the bins whose
        absolute frequency lies strictly between *f_low* and *f_high* (Hz),
        and over all bins.

        Args:
            data: Complex samples taken at ``SAMPLE_RATE_BASEBAND``.
            f_low: Lower band edge in Hz (exclusive).
            f_high: Upper band edge in Hz (exclusive).

        Returns:
            Tuple ``(band_power, total_power)`` of mean ``|FFT|**2`` values.
        """
        phase = np.angle(data)
        power_spectrum = np.abs(np.fft.fft(phase)) ** 2
        freqs = np.fft.fftfreq(len(phase), 1.0 / SAMPLE_RATE_BASEBAND)

        # Use |f| so both the positive and negative frequency bins count.
        abs_freqs = np.abs(freqs)
        in_band = (abs_freqs > f_low) & (abs_freqs < f_high)
        return np.mean(power_spectrum[in_band]), np.mean(power_spectrum)

    @staticmethod
    def _assert_constant_envelope(data):
        """Assert that every sample of *data* has magnitude 1.0 (atol 1e-4)."""
        # PM output: |exp(j*phi)| = 1.0 always
        np.testing.assert_allclose(np.abs(data), 1.0, atol=1e-4)

    def _get_samples(self, n_samples, **kwargs):
        """Helper: run usb_signal_source and return complex samples.

        Args:
            n_samples: Number of samples the ``head`` block lets through
                before the flowgraph finishes.
            **kwargs: Forwarded unchanged to ``usb_signal_source``.

        Returns:
            ``np.ndarray`` built from the vector sink's captured data.
        """
        # Imported lazily, presumably because the module itself needs
        # GNU Radio and must not be imported when the tests are skipped.
        from apollo.usb_signal_source import usb_signal_source

        tb = gr.top_block()
        src = usb_signal_source(**kwargs)
        head = blocks.head(gr.sizeof_gr_complex, n_samples)
        snk = blocks.vector_sink_c()

        tb.connect(src, head, snk)
        tb.run()

        return np.array(snk.data())

    def test_block_instantiation(self):
        """Block should instantiate with default parameters."""
        from apollo.usb_signal_source import usb_signal_source

        src = usb_signal_source()
        assert src is not None

    def test_produces_complex_output(self):
        """Output should be complex-valued samples."""
        n_samples = 51200  # ~10ms worth
        data = self._get_samples(n_samples)

        assert len(data) == n_samples
        assert np.iscomplexobj(data)

    def test_constant_envelope(self):
        """PM signal without noise should have near-constant envelope."""
        n_samples = 102400  # 1 frame worth
        data = self._get_samples(n_samples, snr_db=None)
        self._assert_constant_envelope(data)

    def test_spectral_content_pcm(self):
        """FFT of demodulated phase should show energy at 1.024 MHz."""
        n_samples = 102400
        data = self._get_samples(n_samples, snr_db=None)

        # Energy near 1.024 MHz must be at least 1% of the mean bin power.
        pcm_power, total_power = self._band_power(data, 950_000, 1_100_000)
        assert pcm_power > total_power * 0.01

    def test_with_voice(self):
        """With voice enabled, output should still be constant envelope."""
        n_samples = 51200
        data = self._get_samples(n_samples, voice_enabled=True, snr_db=None)
        self._assert_constant_envelope(data)

    def test_with_noise(self):
        """With noise, envelope should vary (not constant)."""
        n_samples = 51200
        data = self._get_samples(n_samples, snr_db=10.0)
        envelope = np.abs(data)

        # With noise, std(envelope) should be > 0
        assert np.std(envelope) > 0.01

    def test_voice_spectral_content(self):
        """With voice, phase should contain 1.25 MHz energy.

        A bare ``power > 0`` check passes on any non-zero energy that happens
        to fall in the band, so the band power is compared against a run with
        voice disabled: switching the subcarrier on must raise the energy
        between 1.2 MHz and 1.3 MHz.
        """
        n_samples = 102400
        band = (1_200_000, 1_300_000)  # Energy near 1.25 MHz

        with_voice = self._get_samples(n_samples, voice_enabled=True, snr_db=None)
        without_voice = self._get_samples(n_samples, voice_enabled=False, snr_db=None)

        voice_power, _ = self._band_power(with_voice, *band)
        baseline_power, _ = self._band_power(without_voice, *band)

        # baseline_power >= 0, so this also implies voice_power > 0.
        assert voice_power > baseline_power

    def test_frame_duration(self):
        """One frame at 51.2 kbps should produce the right number of samples."""
        bits_per_frame = PCM_HIGH_WORDS_PER_FRAME * PCM_WORD_LENGTH
        samples_per_frame = int(bits_per_frame * SAMPLE_RATE_BASEBAND / PCM_HIGH_BIT_RATE)

        data = self._get_samples(samples_per_frame)
        assert len(data) == samples_per_frame
|