- sco_mod: 9-channel FM subcarrier oscillator modulator (inverse of sco_demod), with round-trip tests proving voltage recovery across all channels
- fm_voice_subcarrier_mod: add audio_input parameter to accept external float streams (e.g., Apollo mission voice recordings) instead of the internal test tone
- loopback_demo.py: streaming TX->RX round-trip with CLI for noise/voice/frames
- agc_loopback_demo.py: full Virtual AGC integration via TCP bridge
248 lines
9.0 KiB
Python
248 lines
9.0 KiB
Python
"""Tests for the FM voice subcarrier modulator block."""
|
|
|
|
import numpy as np
|
|
import pytest
|
|
|
|
# GNU Radio is an optional dependency: remember whether it imported so the
# tests below can be skipped when it is missing.
try:
    from gnuradio import blocks, gr
except ImportError:
    HAS_GNURADIO = False
else:
    HAS_GNURADIO = True

from apollo.constants import SAMPLE_RATE_BASEBAND

# Module-level mark: applies the skip condition to every test in this file.
pytestmark = pytest.mark.skipif(not HAS_GNURADIO, reason="GNU Radio not installed")
|
|
|
|
|
|
class TestFmVoiceModInstantiation:
    """Construction checks: defaults, a custom tone frequency, and properties."""

    def test_default_parameters(self):
        """Constructing the block with no arguments should yield an object."""
        from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod

        block = fm_voice_subcarrier_mod()
        assert block is not None

    def test_custom_tone_freq(self):
        """A 2 kHz tone_freq should be accepted and the block should emit samples."""
        from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod

        expected_len = 10240

        # modulator (source) -> head (limits the run length) -> vector sink
        flowgraph = gr.top_block()
        modulator = fm_voice_subcarrier_mod(
            sample_rate=SAMPLE_RATE_BASEBAND,
            tone_freq=2000.0,
        )
        limiter = blocks.head(gr.sizeof_float, expected_len)
        sink = blocks.vector_sink_f()
        flowgraph.connect(modulator, limiter, sink)
        flowgraph.run()

        samples = np.array(sink.data())
        assert len(samples) == expected_len
        assert np.any(samples != 0), "Output is all zeros with tone_freq=2000"

    def test_properties(self):
        """Each constructor argument should be readable back as an attribute."""
        from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod

        expected = {
            "tone_freq": 1500.0,
            "subcarrier_freq": 1_000_000,
            "fm_deviation": 20_000,
        }
        block = fm_voice_subcarrier_mod(**expected)
        for attr_name, value in expected.items():
            assert getattr(block, attr_name) == value
|
|
|
|
|
|
class TestFmVoiceModFunctional:
    """Functional tests that run the block as a source and analyse its output."""

    @staticmethod
    def _run_source(n_samples):
        """Run the modulator in source mode and return the captured samples.

        Builds ``modulator -> head -> vector sink`` at ``SAMPLE_RATE_BASEBAND``,
        runs the flowgraph to completion, and returns the sink contents as a
        NumPy array. ``head`` is configured to pass ``n_samples`` items.
        """
        from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod

        tb = gr.top_block()
        src = fm_voice_subcarrier_mod(sample_rate=SAMPLE_RATE_BASEBAND)
        head = blocks.head(gr.sizeof_float, n_samples)
        snk = blocks.vector_sink_f()
        tb.connect(src, head, snk)
        tb.run()
        return np.array(snk.data())

    def test_produces_output(self):
        """Source block should produce the requested number of non-zero samples."""
        n_samples = 51200
        data = self._run_source(n_samples)

        assert len(data) == n_samples, f"Expected {n_samples} samples, got {len(data)}"
        assert np.any(data != 0), "Output is all zeros"

    def test_output_is_float(self):
        """Output samples should be real-valued floats."""
        n_samples = 1024
        data = self._run_source(n_samples)

        # An empty capture still converts to a float64 array, so without this
        # length check the dtype assertion below could pass on no output at all.
        assert len(data) == n_samples, f"Expected {n_samples} samples, got {len(data)}"
        assert data.dtype in (np.float32, np.float64), (
            f"Expected float output, got {data.dtype}"
        )

    def test_spectral_energy_at_subcarrier(self):
        """More than 10% of the total energy should lie in the 1.2-1.3 MHz band."""
        sample_rate = SAMPLE_RATE_BASEBAND
        n_samples = 51200  # ~10 ms

        data = self._run_source(n_samples)
        power = np.abs(np.fft.fft(data)) ** 2
        freqs = np.fft.fftfreq(len(data), 1.0 / sample_rate)

        # Energy in the 1.2-1.3 MHz band (subcarrier +/- deviation margin),
        # counting both the positive- and negative-frequency halves.
        voice_mask = (np.abs(freqs) > 1_200_000) & (np.abs(freqs) < 1_300_000)

        # Compare summed energy, not per-bin means: with means, a spectrally
        # flat output has equal band and overall averages and would pass the
        # 10% threshold even though nothing is concentrated at the subcarrier.
        voice_power = np.sum(power[voice_mask])
        total_power = np.sum(power)

        assert voice_power > total_power * 0.1, (
            f"Subcarrier band power ({voice_power:.1f}) is less than 10% of "
            f"total power ({total_power:.1f})"
        )

    def test_output_bounded(self):
        """Output amplitude should stay bounded (not blow up)."""
        n_samples = 51200
        data = self._run_source(n_samples)

        peak = np.max(np.abs(data))
        # FM on a cosine carrier: peak should be around 1.0, certainly < 2.0
        assert peak < 2.0, f"Output peak amplitude {peak:.3f} exceeds expected bound"
        assert peak > 0.1, f"Output peak amplitude {peak:.3f} is suspiciously low"
|
|
|
|
|
|
class TestFmVoiceModExternalAudio:
    """Tests for external audio input mode (``audio_input=True``)."""

    @staticmethod
    def _sine_1khz(n_samples):
        """Return ``n_samples`` of a 1 kHz float32 sine at ``SAMPLE_RATE_BASEBAND``."""
        t = np.arange(n_samples, dtype=np.float32) / SAMPLE_RATE_BASEBAND
        return np.sin(2 * np.pi * 1000 * t).astype(np.float32)

    @staticmethod
    def _modulate(audio):
        """Feed ``audio`` through the modulator and return the captured output.

        Builds ``vector source -> modulator (audio_input=True) -> vector sink``
        at ``SAMPLE_RATE_BASEBAND``, runs the flowgraph to completion, and
        returns the sink contents as a NumPy array.
        """
        from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod

        tb = gr.top_block()
        src = blocks.vector_source_f(audio.tolist())
        mod = fm_voice_subcarrier_mod(sample_rate=SAMPLE_RATE_BASEBAND, audio_input=True)
        snk = blocks.vector_sink_f()
        tb.connect(src, mod, snk)
        tb.run()
        return np.array(snk.data())

    def test_default_is_source(self):
        """Default mode should be source (no input, backward compatible)."""
        from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod

        mod = fm_voice_subcarrier_mod()
        assert not mod.audio_input

    def test_external_audio_instantiation(self):
        """Block with audio_input=True should instantiate."""
        from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod

        mod = fm_voice_subcarrier_mod(audio_input=True)
        assert mod is not None

    def test_external_audio_property(self):
        """audio_input property should reflect constructor arg."""
        from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod

        mod_ext = fm_voice_subcarrier_mod(audio_input=True)
        assert mod_ext.audio_input is True

        mod_int = fm_voice_subcarrier_mod(audio_input=False)
        assert mod_int.audio_input is False

    def test_external_audio_produces_output(self):
        """Feed a 1 kHz sine wave into external input, verify output."""
        n_samples = 51200
        data = self._modulate(self._sine_1khz(n_samples))

        assert len(data) == n_samples, f"Expected {n_samples} samples, got {len(data)}"
        assert np.any(data != 0), "Output is all zeros with external audio input"

    def test_external_audio_spectral_energy(self):
        """Feed audio, verify more than 10% of total energy is in 1.2-1.3 MHz."""
        sample_rate = SAMPLE_RATE_BASEBAND
        n_samples = 51200

        data = self._modulate(self._sine_1khz(n_samples))
        power = np.abs(np.fft.fft(data)) ** 2
        freqs = np.fft.fftfreq(len(data), 1.0 / sample_rate)

        # Energy in the 1.2-1.3 MHz band (subcarrier +/- deviation margin),
        # counting both the positive- and negative-frequency halves.
        voice_mask = (np.abs(freqs) > 1_200_000) & (np.abs(freqs) < 1_300_000)

        # Compare summed energy, not per-bin means: with means, a spectrally
        # flat output has equal band and overall averages and would pass the
        # 10% threshold even though nothing is concentrated at the subcarrier.
        voice_power = np.sum(power[voice_mask])
        total_power = np.sum(power)

        assert voice_power > total_power * 0.1, (
            f"Subcarrier band power ({voice_power:.1f}) is less than 10% of "
            f"total power ({total_power:.1f})"
        )

    def test_external_audio_silence(self):
        """Feed zeros (silence), verify output still present (carrier only)."""
        n_samples = 51200
        silence = np.zeros(n_samples, dtype=np.float32)

        data = self._modulate(silence)

        assert len(data) == n_samples, f"Expected {n_samples} samples, got {len(data)}"
        # With silence input, FM deviation is zero so the output is an
        # unmodulated carrier at the subcarrier frequency -- still non-zero.
        assert np.any(data != 0), "Output is all zeros with silence input"
        peak = np.max(np.abs(data))
        assert peak > 0.1, f"Carrier amplitude {peak:.3f} is suspiciously low"
|