"""Tests for the voice subcarrier demodulator block.""" import numpy as np import pytest try: from gnuradio import blocks, gr HAS_GNURADIO = True except ImportError: HAS_GNURADIO = False from apollo.constants import ( SAMPLE_RATE_BASEBAND, VOICE_SUBCARRIER_HZ, ) from apollo.usb_signal_gen import generate_fm_voice_subcarrier pytestmark = pytest.mark.skipif(not HAS_GNURADIO, reason="GNU Radio not installed") class TestVoiceDemodInstantiation: """Test block creation and parameter handling.""" def test_default_parameters(self): """Block should instantiate with default parameters.""" from apollo.voice_subcarrier_demod import voice_subcarrier_demod demod = voice_subcarrier_demod() assert demod is not None def test_custom_sample_rate(self): """Block should accept a custom sample rate.""" from apollo.voice_subcarrier_demod import voice_subcarrier_demod demod = voice_subcarrier_demod(sample_rate=10_240_000, audio_rate=16000) assert demod is not None assert demod.output_sample_rate == 16000 def test_output_sample_rate_property(self): """Output sample rate should match the requested audio rate.""" from apollo.voice_subcarrier_demod import voice_subcarrier_demod demod = voice_subcarrier_demod(audio_rate=8000) assert demod.output_sample_rate == 8000.0 class TestVoiceDemodFunctional: """Functional tests with synthetic FM voice signals.""" def test_fm_voice_produces_output(self): """An FM voice signal at 1.25 MHz should produce non-trivial audio output.""" from apollo.voice_subcarrier_demod import voice_subcarrier_demod tb = gr.top_block() sample_rate = SAMPLE_RATE_BASEBAND # Generate a 1.25 MHz FM subcarrier with 1 kHz tone, enough for # several audio cycles to pass through the 300-3000 Hz BPF. # At 8 kHz output, we need at least a few ms of signal. # 200ms of input gives ~1600 output samples at 8 kHz. n_samples = int(sample_rate * 0.2) voice_signal = generate_fm_voice_subcarrier( n_samples=n_samples, sample_rate=sample_rate, tone_freq=1000.0, ) src = blocks.vector_source_f(voice_signal.tolist()) demod = voice_subcarrier_demod(sample_rate=sample_rate, audio_rate=8000) snk = blocks.vector_sink_f() tb.connect(src, demod, snk) tb.run() output = np.array(snk.data()) assert len(output) > 0, "Demodulator produced no output" # After filter settling, there should be energy in the output. # Skip the first 25% for filter transients. settled = output[len(output) // 4 :] if len(settled) > 10: rms = np.sqrt(np.mean(settled**2)) assert rms > 1e-6, f"Output RMS too low: {rms} -- no audio recovered" def test_1khz_tone_spectral_peak(self): """A 1 kHz FM tone should produce audio with energy near 1 kHz.""" from apollo.voice_subcarrier_demod import voice_subcarrier_demod tb = gr.top_block() sample_rate = SAMPLE_RATE_BASEBAND audio_rate = 8000 tone_freq = 1000.0 # 500ms of signal for decent frequency resolution n_samples = int(sample_rate * 0.5) voice_signal = generate_fm_voice_subcarrier( n_samples=n_samples, sample_rate=sample_rate, tone_freq=tone_freq, ) src = blocks.vector_source_f(voice_signal.tolist()) demod = voice_subcarrier_demod(sample_rate=sample_rate, audio_rate=audio_rate) snk = blocks.vector_sink_f() tb.connect(src, demod, snk) tb.run() output = np.array(snk.data()) assert len(output) > 100, f"Too few output samples: {len(output)}" # Skip transients, use the last 75% settled = output[len(output) // 4 :] if len(settled) < 64: pytest.skip("Not enough settled samples for spectral analysis") # FFT to find the dominant frequency fft_vals = np.abs(np.fft.rfft(settled)) freqs = np.fft.rfftfreq(len(settled), d=1.0 / audio_rate) # Find peak frequency (skip DC bin) peak_idx = np.argmax(fft_vals[1:]) + 1 peak_freq = freqs[peak_idx] # The recovered tone should be within 200 Hz of 1 kHz assert abs(peak_freq - tone_freq) < 200, ( f"Peak frequency {peak_freq:.1f} Hz is not near {tone_freq} Hz" ) def test_no_output_on_silence(self): """A constant (unmodulated) carrier should produce near-zero audio.""" from apollo.voice_subcarrier_demod import voice_subcarrier_demod tb = gr.top_block() sample_rate = SAMPLE_RATE_BASEBAND # Unmodulated 1.25 MHz carrier (no FM deviation) n_samples = int(sample_rate * 0.1) t = np.arange(n_samples, dtype=np.float64) / sample_rate carrier = np.cos(2.0 * np.pi * VOICE_SUBCARRIER_HZ * t).astype(np.float32) src = blocks.vector_source_f(carrier.tolist()) demod = voice_subcarrier_demod(sample_rate=sample_rate, audio_rate=8000) snk = blocks.vector_sink_f() tb.connect(src, demod, snk) tb.run() output = np.array(snk.data()) if len(output) > 20: settled = output[len(output) // 4 :] if len(settled) > 0: rms = np.sqrt(np.mean(settled**2)) # Unmodulated carrier -> near-zero audio (just noise floor) # Be generous with the threshold since filter transients exist assert rms < 1.0, f"Unmodulated carrier produced too much audio: RMS={rms}"