"""Tests for the BPSK subcarrier modulator block.""" import numpy as np import pytest try: from gnuradio import blocks, gr HAS_GNURADIO = True except ImportError: HAS_GNURADIO = False from apollo.constants import PCM_SUBCARRIER_HZ, SAMPLE_RATE_BASEBAND pytestmark = pytest.mark.skipif(not HAS_GNURADIO, reason="GNU Radio not installed") class TestBPSKSubcarrierMod: """Test BPSK subcarrier modulation with synthetic NRZ inputs.""" def test_block_instantiation(self): """Block should instantiate with default parameters.""" from apollo.bpsk_subcarrier_mod import bpsk_subcarrier_mod mod = bpsk_subcarrier_mod() assert mod is not None assert mod.subcarrier_freq == PCM_SUBCARRIER_HZ assert mod.sample_rate == SAMPLE_RATE_BASEBAND def test_constant_positive_input(self): """All +1.0 input should produce a pure cosine at 1.024 MHz.""" from apollo.bpsk_subcarrier_mod import bpsk_subcarrier_mod sample_rate = SAMPLE_RATE_BASEBAND n_samples = 51200 tb = gr.top_block() src = blocks.vector_source_f([1.0] * n_samples) mod = bpsk_subcarrier_mod( subcarrier_freq=PCM_SUBCARRIER_HZ, sample_rate=sample_rate, ) snk = blocks.vector_sink_f() tb.connect(src, mod, snk) tb.run() data = np.array(snk.data()) assert len(data) == n_samples # FFT: spectral energy should concentrate at 1.024 MHz fft_vals = np.fft.fft(data) freqs = np.fft.fftfreq(len(data), 1.0 / sample_rate) pcm_mask = (np.abs(freqs) > 950_000) & (np.abs(freqs) < 1_100_000) pcm_power = np.mean(np.abs(fft_vals[pcm_mask]) ** 2) total_power = np.mean(np.abs(fft_vals) ** 2) assert pcm_power > total_power * 0.1, ( f"PCM band power ({pcm_power:.1f}) is less than 10% of " f"total power ({total_power:.1f})" ) def test_constant_negative_input(self): """All -1.0 input should produce -cos (inverted cosine) at 1.024 MHz.""" from apollo.bpsk_subcarrier_mod import bpsk_subcarrier_mod sample_rate = SAMPLE_RATE_BASEBAND n_samples = 51200 tb = gr.top_block() src = blocks.vector_source_f([-1.0] * n_samples) mod = bpsk_subcarrier_mod( subcarrier_freq=PCM_SUBCARRIER_HZ, sample_rate=sample_rate, ) snk = blocks.vector_sink_f() tb.connect(src, mod, snk) tb.run() data = np.array(snk.data()) assert len(data) == n_samples # Inverted cosine still has energy at 1.024 MHz fft_vals = np.fft.fft(data) freqs = np.fft.fftfreq(len(data), 1.0 / sample_rate) pcm_mask = (np.abs(freqs) > 950_000) & (np.abs(freqs) < 1_100_000) pcm_power = np.mean(np.abs(fft_vals[pcm_mask]) ** 2) total_power = np.mean(np.abs(fft_vals) ** 2) assert pcm_power > total_power * 0.1, ( f"PCM band power ({pcm_power:.1f}) is less than 10% of " f"total power ({total_power:.1f})" ) def test_alternating_input_spectrum(self): """Alternating +1/-1 NRZ should still have spectral peak near subcarrier.""" from apollo.bpsk_subcarrier_mod import bpsk_subcarrier_mod sample_rate = SAMPLE_RATE_BASEBAND # Samples per bit at high rate: 5_120_000 / 51_200 = 100 samples_per_bit = int(sample_rate / 51_200) n_bits = 512 n_samples = n_bits * samples_per_bit # Build alternating NRZ: +1 for 100 samples, -1 for 100, ... nrz = [] for i in range(n_bits): val = 1.0 if i % 2 == 0 else -1.0 nrz.extend([val] * samples_per_bit) tb = gr.top_block() src = blocks.vector_source_f(nrz) mod = bpsk_subcarrier_mod( subcarrier_freq=PCM_SUBCARRIER_HZ, sample_rate=sample_rate, ) snk = blocks.vector_sink_f() tb.connect(src, mod, snk) tb.run() data = np.array(snk.data()) assert len(data) == n_samples # BPSK with alternating data spreads energy around subcarrier +/- bit_rate, # but the band near 1.024 MHz should still carry significant power fft_vals = np.fft.fft(data) freqs = np.fft.fftfreq(len(data), 1.0 / sample_rate) pcm_mask = (np.abs(freqs) > 950_000) & (np.abs(freqs) < 1_100_000) pcm_power = np.mean(np.abs(fft_vals[pcm_mask]) ** 2) total_power = np.mean(np.abs(fft_vals) ** 2) assert pcm_power > total_power * 0.1, ( f"PCM band power ({pcm_power:.1f}) is less than 10% of " f"total power ({total_power:.1f})" ) def test_amplitude_bounded(self): """Output amplitude should be <= 1.0 (product of +/-1 and cos).""" from apollo.bpsk_subcarrier_mod import bpsk_subcarrier_mod sample_rate = SAMPLE_RATE_BASEBAND n_samples = 51200 tb = gr.top_block() src = blocks.vector_source_f([1.0] * n_samples) mod = bpsk_subcarrier_mod( subcarrier_freq=PCM_SUBCARRIER_HZ, sample_rate=sample_rate, ) snk = blocks.vector_sink_f() tb.connect(src, mod, snk) tb.run() data = np.array(snk.data()) peak = np.max(np.abs(data)) # cos has peak 1.0, NRZ is +/-1.0, product peak should be ~1.0 assert peak <= 1.0 + 1e-6, ( f"Output peak amplitude {peak:.6f} exceeds 1.0" ) assert peak > 0.9, ( f"Output peak amplitude {peak:.6f} is suspiciously low" ) def test_custom_subcarrier_freq(self): """Custom subcarrier frequency should shift spectral peak.""" from apollo.bpsk_subcarrier_mod import bpsk_subcarrier_mod sample_rate = SAMPLE_RATE_BASEBAND n_samples = 51200 custom_freq = 500_000 # 500 kHz tb = gr.top_block() src = blocks.vector_source_f([1.0] * n_samples) mod = bpsk_subcarrier_mod( subcarrier_freq=custom_freq, sample_rate=sample_rate, ) snk = blocks.vector_sink_f() tb.connect(src, mod, snk) tb.run() data = np.array(snk.data()) fft_vals = np.fft.fft(data) freqs = np.fft.fftfreq(len(data), 1.0 / sample_rate) # Energy should be near 500 kHz, not 1.024 MHz custom_mask = (np.abs(freqs) > 450_000) & (np.abs(freqs) < 550_000) custom_power = np.mean(np.abs(fft_vals[custom_mask]) ** 2) total_power = np.mean(np.abs(fft_vals) ** 2) assert custom_power > total_power * 0.1, ( f"Custom freq band power ({custom_power:.1f}) is less than 10% of " f"total power ({total_power:.1f})" )