Add SCO modulator, external audio input, and demo scripts

- sco_mod: 9-channel FM subcarrier oscillator modulator (inverse of sco_demod),
  with round-trip tests proving voltage recovery across all channels
- fm_voice_subcarrier_mod: add audio_input parameter to accept external float
  streams (e.g., Apollo mission voice recordings) instead of internal test tone
- loopback_demo.py: streaming TX->RX round-trip with CLI for noise/voice/frames
- agc_loopback_demo.py: full Virtual AGC integration via TCP bridge
This commit is contained in:
Ryan Malloy 2026-02-22 13:01:48 -07:00
parent 493c21c511
commit cd3a8cc6be
9 changed files with 845 additions and 27 deletions

View File

@ -0,0 +1,186 @@
#!/usr/bin/env python3
"""
Apollo AGC Integration Demo -- full communications loop with Virtual AGC.
Demonstrates the complete Apollo unified S-band communications path:
yaAGC (emulator)
| DNTM1/DNTM2 telemetry via TCP
v
agc_bridge
| PDU message
v
downlink_decoder --> print decoded telemetry
^
| PDU frames
usb_downlink_receiver (RX chain)
^
| complex baseband
usb_signal_source (TX chain)
And the uplink path:
DSKY commands
|
uplink_encoder
| (channel, value) pairs
v
agc_bridge --> yaAGC (INLINK channel 045)
Prerequisites:
1. Install Virtual AGC: https://www.ibiblio.org/apollo/
2. Start yaAGC with a mission (e.g., Luminary099 for Apollo 11 LM):
$ yaAGC --core=Luminary099.bin --port=19697
3. Optionally start yaDSKY2 for visual display:
$ yaDSKY2 --port=19698
Usage:
uv run python examples/agc_loopback_demo.py
uv run python examples/agc_loopback_demo.py --host 192.168.1.100
uv run python examples/agc_loopback_demo.py --port 19697
uv run python examples/agc_loopback_demo.py --send-v16n36 # request time display
"""
import argparse
import sys
import time
from apollo.agc_bridge import AGCBridgeClient
from apollo.constants import (
AGC_CH_DNTM1,
AGC_CH_DNTM2,
AGC_CH_OUTLINK,
AGC_PORT_BASE,
)
from apollo.downlink_decoder import DownlinkEngine
from apollo.uplink_encoder import UplinkEncoder
def main():
parser = argparse.ArgumentParser(
description="Apollo AGC integration demo -- connect to yaAGC emulator"
)
parser.add_argument("--host", default="localhost", help="yaAGC host (default: localhost)")
parser.add_argument("--port", type=int, default=AGC_PORT_BASE,
help="yaAGC port (default: 19697)")
parser.add_argument("--duration", type=float, default=10.0, help="Run duration in seconds")
parser.add_argument("--send-v16n36", action="store_true",
help="Send V16N36E (display time) to AGC")
args = parser.parse_args()
print("=" * 60)
print("Apollo AGC Integration Demo")
print("=" * 60)
print(f" Target: {args.host}:{args.port}")
print(f" Duration: {args.duration} seconds")
print()
# Downlink decoder accumulates telemetry words
decoder = DownlinkEngine()
packet_count = 0
telemetry_words = 0
def on_packet(channel: int, value: int):
nonlocal packet_count, telemetry_words
packet_count += 1
if channel in (AGC_CH_DNTM1, AGC_CH_DNTM2):
telemetry_words += 1
decoder.feed_agc_word(channel, value)
elif channel == AGC_CH_OUTLINK:
print(f" OUTLINK: ch={channel:03o} val={value:05o} ({value})")
def on_status(state: str):
print(f" Connection: {state}")
# Connect to yaAGC
client = AGCBridgeClient(
host=args.host,
port=args.port,
channel_filter=None, # accept all channels for this demo
on_packet=on_packet,
on_status=on_status,
)
print(f"Connecting to yaAGC at {args.host}:{args.port}...")
client.start()
# Wait for connection
for _ in range(20): # 10 seconds max
if client.connected:
break
time.sleep(0.5)
if not client.connected:
print()
print("Could not connect to yaAGC.")
print()
print("Make sure yaAGC is running:")
print(f" yaAGC --core=Luminary099.bin --port={args.port}")
print()
print("Or try a different host/port:")
print(" python examples/agc_loopback_demo.py --host <ip> --port <port>")
client.stop()
sys.exit(1)
print()
# Optionally send a DSKY command
if args.send_v16n36:
print("Sending V16N36E (display time)...")
encoder = UplinkEncoder()
pairs = encoder.encode_verb_noun(verb=16, noun=36)
for channel, value in pairs:
client.send(channel, value)
time.sleep(0.1) # pace for UPRUPT processing
print(f" Sent {len(pairs)} uplink words")
print()
# Collect telemetry for the specified duration
print(f"Collecting telemetry for {args.duration} seconds...")
print("-" * 60)
start_time = time.time()
last_snapshot_count = 0
try:
while time.time() - start_time < args.duration:
time.sleep(0.5)
# Check for new telemetry snapshots
snapshots = decoder._completed_snapshots
if len(snapshots) > last_snapshot_count:
for snap in snapshots[last_snapshot_count:]:
list_type = snap.get("list_type_id", "?")
list_name = snap.get("list_name", "Unknown")
n_words = snap.get("word_count", 0)
print(f" Telemetry snapshot: {list_name} "
f"(type {list_type}), {n_words} words")
# Show first few words
words = snap.get("words", [])
for i, val in enumerate(words[:5]):
print(f" [{i:03d}] = {val:05o} ({val})")
if len(words) > 5:
print(f" ... ({len(words) - 5} more words)")
last_snapshot_count = len(snapshots)
except KeyboardInterrupt:
print()
print("Interrupted.")
print("-" * 60)
print()
print("Summary:")
print(f" Total packets received: {packet_count}")
print(f" Telemetry words: {telemetry_words}")
print(f" Telemetry snapshots: {len(decoder._completed_snapshots)}")
print(f" Duration: {time.time() - start_time:.1f} seconds")
client.stop()
print()
print("Done.")
if __name__ == "__main__":
main()

133
examples/loopback_demo.py Normal file
View File

@ -0,0 +1,133 @@
#!/usr/bin/env python3
"""
Apollo USB Loopback Demo -- streaming TX -> RX round-trip.
Demonstrates the full gr-apollo block chain using GNU Radio streaming blocks:
TX: pcm_frame_source -> nrz_encoder -> bpsk_subcarrier_mod -> pm_mod
RX: pm_demod -> subcarrier_extract -> bpsk_demod -> pcm_frame_sync -> pcm_demux
All wrapped in the convenience blocks:
usb_signal_source -> usb_downlink_receiver
Prints decoded frames as they arrive, including sync word analysis.
Usage:
uv run python examples/loopback_demo.py
uv run python examples/loopback_demo.py --voice # include voice subcarrier
uv run python examples/loopback_demo.py --snr 20 # add noise at 20 dB SNR
uv run python examples/loopback_demo.py --frames 20 # generate 20 frames
"""
import argparse
import sys
import pmt
from gnuradio import blocks, gr
from apollo.constants import (
PCM_HIGH_BIT_RATE,
PCM_HIGH_WORDS_PER_FRAME,
PCM_WORD_LENGTH,
SAMPLE_RATE_BASEBAND,
)
from apollo.pcm_demux import DemuxEngine
from apollo.usb_downlink_receiver import usb_downlink_receiver
from apollo.usb_signal_source import usb_signal_source
def main():
parser = argparse.ArgumentParser(description="Apollo USB loopback demo")
parser.add_argument("--frames", type=int, default=10, help="Number of frames to generate")
parser.add_argument("--snr", type=float, default=None, help="SNR in dB (None = no noise)")
parser.add_argument("--voice", action="store_true", help="Enable voice subcarrier")
args = parser.parse_args()
bits_per_frame = PCM_HIGH_WORDS_PER_FRAME * PCM_WORD_LENGTH
samples_per_frame = int(bits_per_frame * SAMPLE_RATE_BASEBAND / PCM_HIGH_BIT_RATE)
n_samples = args.frames * samples_per_frame
print("=" * 60)
print("Apollo USB Loopback Demo")
print("=" * 60)
print(f" Frames to transmit: {args.frames}")
print(f" Samples per frame: {samples_per_frame:,}")
print(f" Total samples: {n_samples:,}")
print(f" Duration: {n_samples / SAMPLE_RATE_BASEBAND:.3f} s")
print(f" SNR: {'clean (no noise)' if args.snr is None else f'{args.snr} dB'}")
print(f" Voice subcarrier: {'enabled' if args.voice else 'disabled'}")
print()
# Build the flowgraph
print("Building flowgraph...")
tb = gr.top_block()
tx = usb_signal_source(
sample_rate=SAMPLE_RATE_BASEBAND,
bit_rate=PCM_HIGH_BIT_RATE,
snr_db=args.snr,
voice_enabled=args.voice,
)
head = blocks.head(gr.sizeof_gr_complex, n_samples)
rx = usb_downlink_receiver(
sample_rate=SAMPLE_RATE_BASEBAND,
bit_rate=PCM_HIGH_BIT_RATE,
output_format="raw",
)
snk = blocks.message_debug()
tb.connect(tx, head, rx)
tb.msg_connect(rx, "frames", snk, "store")
print("Running flowgraph (TX -> RX)...")
print()
tb.run()
n_recovered = snk.num_messages()
print(f"Recovered {n_recovered} frames from {args.frames} transmitted")
print()
if n_recovered == 0:
print("No frames recovered. PLL may need more settling time.")
print("Try increasing --frames to give the receiver more data.")
sys.exit(1)
# Decode and display each recovered frame
demux = DemuxEngine(output_format="raw")
print("-" * 60)
for i in range(n_recovered):
msg = snk.get_message(i)
payload = pmt.cdr(msg) if pmt.is_pair(msg) else msg
frame_bytes = bytes(pmt.u8vector_elements(payload))
result = demux.process_frame(frame_bytes)
sync = result.get("sync", {})
frame_id = sync.get("frame_id", 0)
parity = "odd" if (frame_id % 2 == 1) else "even"
words = result.get("words", [])
n_words = len(words)
# Show first few data words as hex
word_preview = " ".join(
f"{w['raw_value']:02X}" for w in words[:8]
)
print(
f" Frame {i + 1:3d}: "
f"ID={frame_id:>2} ({parity:4s}), "
f"sync=0x{sync.get('word', 0):08X}, "
f"{n_words} words "
f"[{word_preview} ...]"
)
print("-" * 60)
print()
print(f"Recovery rate: {n_recovered}/{args.frames} "
f"({100 * n_recovered / args.frames:.0f}%)")
if __name__ == "__main__":
main()

View File

@ -20,6 +20,19 @@ parameters:
label: Test Tone Frequency (Hz)
dtype: real
default: '1000'
- id: audio_input
label: Audio Source
dtype: bool
default: 'False'
options: ['True', 'False']
option_labels: ['External Input', 'Internal Test Tone']
inputs:
- label: audio
domain: stream
dtype: float
optional: true
hide: ${ 'all' if not audio_input else 'none' }
outputs:
- label: out
@ -33,17 +46,21 @@ templates:
sample_rate=${sample_rate},
subcarrier_freq=${subcarrier_freq},
fm_deviation=${fm_deviation},
tone_freq=${tone_freq})
tone_freq=${tone_freq},
audio_input=${audio_input})
documentation: |-
Apollo FM Voice Subcarrier Modulator
Source block that generates a 1.25 MHz FM subcarrier with a sinusoidal test
tone. Transmit-side counterpart to the voice subcarrier demodulator.
Generates a 1.25 MHz FM subcarrier for the voice channel. Can operate in
two modes:
The signal chain is: test tone -> FM modulator -> upconvert to 1.25 MHz
-> real-valued output. With default parameters this produces a signal
matching the Apollo USB downlink voice subcarrier (+/-29 kHz deviation).
Internal mode (default): Uses a built-in sine test tone as the audio source.
This is useful for testing and signal validation.
External mode (audio_input=True): Accepts an external float audio stream as
input. Use this to modulate actual Apollo mission voice recordings or live
audio onto the 1.25 MHz FM subcarrier with +/-29 kHz deviation.
On the real spacecraft, voice audio (300-3000 Hz) drives an FM VCO at
113 kHz, which is mixed with the 512 kHz master clock and doubled to
@ -53,6 +70,7 @@ documentation: |-
sample_rate: Output sample rate in Hz (default 5.12 MHz)
subcarrier_freq: FM subcarrier center frequency in Hz (default 1.25 MHz)
fm_deviation: FM deviation in Hz (default +/-29 kHz)
tone_freq: Internal test tone frequency in Hz (default 1 kHz)
tone_freq: Internal test tone frequency in Hz (default 1 kHz, ignored when audio_input=True)
audio_input: When True, accept external audio via float input port
file_format: 1

View File

@ -0,0 +1,58 @@
id: apollo_sco_mod
label: Apollo SCO Mod
category: '[Apollo USB]'
flags: [python]
parameters:
- id: sco_number
label: SCO Channel (1-9)
dtype: int
default: '1'
- id: sample_rate
label: Sample Rate (Hz)
dtype: real
default: '5120000'
inputs:
- label: in
domain: stream
dtype: float
outputs:
- label: out
domain: stream
dtype: float
templates:
imports: from apollo.sco_mod import sco_mod
make: >-
apollo.sco_mod.sco_mod(
sco_number=${sco_number},
sample_rate=${sample_rate})
documentation: |-
Apollo Subcarrier Oscillator (SCO) Modulator
Generates FM subcarrier oscillator signals for analog telemetry. Takes a
0-5V sensor voltage input and produces an FM subcarrier tone at the
selected channel frequency with +/-7.5% deviation.
Transmit-side counterpart to the SCO Demodulator.
SCO Channels:
1: 14,500 Hz 4: 40,000 Hz 7: 95,000 Hz
2: 22,000 Hz 5: 52,500 Hz 8: 125,000 Hz
3: 30,000 Hz 6: 70,000 Hz 9: 165,000 Hz
Voltage mapping (linear):
0V -> center - 7.5% (low frequency)
2.5V -> center (nominal)
5V -> center + 7.5% (high frequency)
Only used in FM downlink mode.
Parameters:
sco_number: SCO channel number (1-9)
sample_rate: Sample rate in Hz (default 5.12 MHz)
file_format: 1

View File

@ -40,6 +40,7 @@ try:
from apollo.nrz_encoder import nrz_encoder as nrz_encoder
from apollo.pcm_frame_source import pcm_frame_source as pcm_frame_source
from apollo.pm_mod import pm_mod as pm_mod
from apollo.sco_mod import sco_mod as sco_mod
except ImportError:
pass # GNU Radio not available — transmit-side GR blocks won't be importable

View File

@ -1,16 +1,18 @@
"""
Apollo FM Voice Subcarrier Modulator -- generates 1.25 MHz FM test tone.
Apollo FM Voice Subcarrier Modulator -- 1.25 MHz FM with internal tone or external audio.
Transmit-side counterpart to voice_subcarrier_demod. Generates a sinusoidal
test tone, FM-modulates it onto a 1.25 MHz subcarrier, and outputs the
real-valued subcarrier signal.
Transmit-side counterpart to voice_subcarrier_demod. FM-modulates audio onto a
1.25 MHz subcarrier and outputs the real-valued subcarrier signal.
Two modes of operation:
- Internal test tone (default): generates a sine wave for testing.
- External audio input: accepts a float stream (e.g., Apollo mission voice
recordings) and modulates it onto the subcarrier.
On the real spacecraft, voice audio (300-3000 Hz) drives an FM VCO at 113 kHz,
which is mixed with the 512 kHz master clock and doubled to produce the
1.25 MHz FM subcarrier with +/-29 kHz deviation.
For testing, an internal sine-wave test tone replaces live audio.
Reference: IMPLEMENTATION_SPEC.md section 4.2
"""
@ -26,10 +28,17 @@ from apollo.constants import (
class fm_voice_subcarrier_mod(gr.hier_block2):
"""Source block: FM-modulated 1.25 MHz voice subcarrier with test tone.
"""FM-modulated voice subcarrier (1.25 MHz) with internal test tone or external audio.
Outputs:
float -- real-valued FM subcarrier at subcarrier_freq
Inputs (when audio_input=True):
float -- external audio signal (e.g., mission voice recordings)
When audio_input=False (default), an internal sine test tone is used.
When audio_input=True, the block accepts an external float stream -- for
example, actual Apollo mission crew voice recordings.
"""
def __init__(
@ -38,23 +47,23 @@ class fm_voice_subcarrier_mod(gr.hier_block2):
subcarrier_freq: float = VOICE_SUBCARRIER_HZ,
fm_deviation: float = VOICE_FM_DEVIATION_HZ,
tone_freq: float = 1000.0,
audio_input: bool = False,
):
# Choose input signature based on mode
in_sig = gr.io_signature(1, 1, gr.sizeof_float) if audio_input else gr.io_signature(0, 0, 0)
gr.hier_block2.__init__(
self,
"apollo_fm_voice_subcarrier_mod",
gr.io_signature(0, 0, 0), # source -- no input
gr.io_signature(1, 1, gr.sizeof_float), # float output
in_sig,
gr.io_signature(1, 1, gr.sizeof_float),
)
self._sample_rate = sample_rate
self._tone_freq = tone_freq
self._subcarrier_freq = subcarrier_freq
self._fm_deviation = fm_deviation
# Audio test tone generator
self.tone = analog.sig_source_f(
sample_rate, analog.GR_SIN_WAVE, tone_freq, 1.0, 0,
)
self._audio_input = audio_input
# FM modulator: sensitivity = 2*pi*deviation/sample_rate rad/sample
# With unit-amplitude sine input this gives +/-fm_deviation Hz.
@ -72,8 +81,16 @@ class fm_voice_subcarrier_mod(gr.hier_block2):
# Extract real part for float output
self.to_real = blocks.complex_to_real(1)
# Connect: tone -> FM mod -> mixer(x LO) -> real
if audio_input:
# External audio input -> FM mod
self.connect(self, self.fm_mod, (self.mixer, 0))
else:
# Internal test tone -> FM mod
self.tone = analog.sig_source_f(
sample_rate, analog.GR_SIN_WAVE, tone_freq, 1.0, 0,
)
self.connect(self.tone, self.fm_mod, (self.mixer, 0))
self.connect(self.lo, (self.mixer, 1))
self.connect(self.mixer, self.to_real, self)
@ -91,3 +108,8 @@ class fm_voice_subcarrier_mod(gr.hier_block2):
def fm_deviation(self) -> float:
"""FM deviation in Hz."""
return self._fm_deviation
@property
def audio_input(self) -> bool:
"""Whether the block accepts external audio input."""
return self._audio_input

124
src/apollo/sco_mod.py Normal file
View File

@ -0,0 +1,124 @@
"""
Apollo Subcarrier Oscillator (SCO) Modulator FM analog telemetry.
In FM downlink mode, the Pre-Modulation Processor generates 9 subcarrier
oscillators (SCOs) that encode analog sensor voltages (0-5V DC) as frequency
deviations of +/-7.5% around each channel's center frequency.
This is the transmit-side counterpart to sco_demod.py. It takes a 0-5V sensor
voltage input and produces an FM subcarrier tone at the configured SCO channel
frequency.
Transmitter side (this block):
0-5V input -> subtract 2.5V -> scale to +/-1.0
-> FM modulator -> upconvert to center_freq
-> extract real part -> float output
The mapping is linear:
0V input -> center_freq - 7.5% = low frequency
2.5V input -> center_freq (nominal)
5V input -> center_freq + 7.5% = high frequency
Reference: IMPLEMENTATION_SPEC.md section 4.3
"""
import math
from gnuradio import analog, blocks, gr
from apollo.constants import (
SAMPLE_RATE_BASEBAND,
SCO_DEVIATION_PERCENT,
SCO_FREQUENCIES,
SCO_INPUT_RANGE_V,
)
class sco_mod(gr.hier_block2):
"""Modulate a 0-5V sensor voltage onto an FM subcarrier oscillator tone.
Only used in FM downlink mode.
Inputs:
float -- sensor voltage (0.0 to 5.0 V)
Outputs:
float -- FM subcarrier tone at the selected SCO channel frequency
"""
def __init__(
self,
sco_number: int = 1,
sample_rate: float = SAMPLE_RATE_BASEBAND,
):
gr.hier_block2.__init__(
self,
"apollo_sco_mod",
gr.io_signature(1, 1, gr.sizeof_float),
gr.io_signature(1, 1, gr.sizeof_float),
)
if sco_number not in SCO_FREQUENCIES:
raise ValueError(
f"SCO number must be 1-9, got {sco_number}. "
f"Valid channels: {sorted(SCO_FREQUENCIES.keys())}"
)
self._sco_number = sco_number
self._sample_rate = sample_rate
center_freq = SCO_FREQUENCIES[sco_number]
self._center_freq = center_freq
# Frequency deviation in Hz: +/-7.5% of center
deviation_hz = center_freq * (SCO_DEVIATION_PERCENT / 100.0)
self._deviation_hz = deviation_hz
# Voltage range parameters
v_min, v_max = SCO_INPUT_RANGE_V
v_range = v_max - v_min # 5.0
v_mid = (v_max + v_min) / 2.0 # 2.5
# Stage 1: Offset — subtract midpoint so 2.5V becomes 0
self.offset = blocks.add_const_ff(-v_mid)
# Stage 2: Scale — map +/-2.5V to +/-1.0
self.scale = blocks.multiply_const_ff(1.0 / (v_range / 2.0))
# Stage 3: FM modulator — +/-1.0 input produces +/-deviation_hz
fm_sensitivity = 2.0 * math.pi * deviation_hz / sample_rate
self.fm_mod = analog.frequency_modulator_fc(fm_sensitivity)
# Stage 4: Local oscillator at center_freq for upconversion
self.lo = analog.sig_source_c(
sample_rate, analog.GR_COS_WAVE, center_freq, 1.0, 0,
)
# Stage 5: Mixer — shift baseband FM signal up to center_freq
self.mixer = blocks.multiply_cc(1)
# Stage 6: Extract real part for float output
self.to_real = blocks.complex_to_real(1)
# Connect the chain:
# input -> offset -> scale -> fm_mod -> (mixer, 0)
# lo -> (mixer, 1)
# mixer -> to_real -> output
self.connect(self, self.offset, self.scale, self.fm_mod, (self.mixer, 0))
self.connect(self.lo, (self.mixer, 1))
self.connect(self.mixer, self.to_real, self)
@property
def center_freq(self) -> float:
"""Center frequency of this SCO channel in Hz."""
return self._center_freq
@property
def deviation_hz(self) -> float:
"""FM deviation in Hz (+/- from center)."""
return self._deviation_hz
@property
def sco_number(self) -> int:
"""SCO channel number (1-9)."""
return self._sco_number

View File

@ -10,10 +10,7 @@ try:
except ImportError:
HAS_GNURADIO = False
from apollo.constants import (
SAMPLE_RATE_BASEBAND,
VOICE_SUBCARRIER_HZ,
)
from apollo.constants import SAMPLE_RATE_BASEBAND
pytestmark = pytest.mark.skipif(not HAS_GNURADIO, reason="GNU Radio not installed")
@ -147,3 +144,104 @@ class TestFmVoiceModFunctional:
# FM on a cosine carrier: peak should be around 1.0, certainly < 2.0
assert peak < 2.0, f"Output peak amplitude {peak:.3f} exceeds expected bound"
assert peak > 0.1, f"Output peak amplitude {peak:.3f} is suspiciously low"
class TestFmVoiceModExternalAudio:
"""Tests for external audio input mode."""
def test_default_is_source(self):
"""Default mode should be source (no input, backward compatible)."""
from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod
mod = fm_voice_subcarrier_mod()
assert not mod.audio_input
def test_external_audio_instantiation(self):
"""Block with audio_input=True should instantiate."""
from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod
mod = fm_voice_subcarrier_mod(audio_input=True)
assert mod is not None
def test_external_audio_property(self):
"""audio_input property should reflect constructor arg."""
from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod
mod_ext = fm_voice_subcarrier_mod(audio_input=True)
assert mod_ext.audio_input is True
mod_int = fm_voice_subcarrier_mod(audio_input=False)
assert mod_int.audio_input is False
def test_external_audio_produces_output(self):
"""Feed a 1 kHz sine wave into external input, verify output."""
from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod
sample_rate = SAMPLE_RATE_BASEBAND
n_samples = 51200
t = np.arange(n_samples, dtype=np.float32) / sample_rate
audio = np.sin(2 * np.pi * 1000 * t).astype(np.float32)
tb = gr.top_block()
src = blocks.vector_source_f(audio.tolist())
mod = fm_voice_subcarrier_mod(sample_rate=sample_rate, audio_input=True)
snk = blocks.vector_sink_f()
tb.connect(src, mod, snk)
tb.run()
data = np.array(snk.data())
assert len(data) == n_samples, f"Expected {n_samples} samples, got {len(data)}"
assert np.any(data != 0), "Output is all zeros with external audio input"
def test_external_audio_spectral_energy(self):
"""Feed audio, verify spectral energy near 1.25 MHz subcarrier."""
from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod
sample_rate = SAMPLE_RATE_BASEBAND
n_samples = 51200
t = np.arange(n_samples, dtype=np.float32) / sample_rate
audio = np.sin(2 * np.pi * 1000 * t).astype(np.float32)
tb = gr.top_block()
src = blocks.vector_source_f(audio.tolist())
mod = fm_voice_subcarrier_mod(sample_rate=sample_rate, audio_input=True)
snk = blocks.vector_sink_f()
tb.connect(src, mod, snk)
tb.run()
data = np.array(snk.data())
fft_vals = np.fft.fft(data)
freqs = np.fft.fftfreq(len(data), 1.0 / sample_rate)
# Energy in the 1.2-1.3 MHz band (subcarrier +/- deviation margin)
voice_mask = (np.abs(freqs) > 1_200_000) & (np.abs(freqs) < 1_300_000)
voice_power = np.mean(np.abs(fft_vals[voice_mask]) ** 2)
total_power = np.mean(np.abs(fft_vals) ** 2)
assert voice_power > total_power * 0.1, (
f"Subcarrier band power ({voice_power:.1f}) is less than 10% of "
f"total power ({total_power:.1f})"
)
def test_external_audio_silence(self):
"""Feed zeros (silence), verify output still present (carrier only)."""
from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod
sample_rate = SAMPLE_RATE_BASEBAND
n_samples = 51200
silence = np.zeros(n_samples, dtype=np.float32)
tb = gr.top_block()
src = blocks.vector_source_f(silence.tolist())
mod = fm_voice_subcarrier_mod(sample_rate=sample_rate, audio_input=True)
snk = blocks.vector_sink_f()
tb.connect(src, mod, snk)
tb.run()
data = np.array(snk.data())
assert len(data) == n_samples, f"Expected {n_samples} samples, got {len(data)}"
# With silence input, FM deviation is zero so the output is an
# unmodulated carrier at the subcarrier frequency -- still non-zero.
assert np.any(data != 0), "Output is all zeros with silence input"
peak = np.max(np.abs(data))
assert peak > 0.1, f"Carrier amplitude {peak:.3f} is suspiciously low"

178
tests/test_sco_mod.py Normal file
View File

@ -0,0 +1,178 @@
"""Tests for the SCO (Subcarrier Oscillator) modulator block."""
import numpy as np
import pytest
try:
from gnuradio import blocks, gr
HAS_GNURADIO = True
except ImportError:
HAS_GNURADIO = False
from apollo.constants import (
SAMPLE_RATE_BASEBAND,
SCO_FREQUENCIES,
)
pytestmark = pytest.mark.skipif(not HAS_GNURADIO, reason="GNU Radio not installed")
class TestSCOModInstantiation:
"""Test block creation and parameter validation."""
def test_all_channels(self):
"""Should instantiate for each valid SCO channel (1-9)."""
from apollo.sco_mod import sco_mod
for ch in range(1, 10):
mod = sco_mod(sco_number=ch)
assert mod is not None
assert mod.center_freq == SCO_FREQUENCIES[ch]
def test_invalid_channel_zero(self):
"""Channel 0 should raise ValueError."""
from apollo.sco_mod import sco_mod
with pytest.raises(ValueError, match="SCO number must be 1-9"):
sco_mod(sco_number=0)
def test_invalid_channel_ten(self):
"""Channel 10 should raise ValueError."""
from apollo.sco_mod import sco_mod
with pytest.raises(ValueError, match="SCO number must be 1-9"):
sco_mod(sco_number=10)
def test_deviation_property(self):
"""Deviation should be 7.5% of center frequency."""
from apollo.sco_mod import sco_mod
for ch in range(1, 10):
mod = sco_mod(sco_number=ch)
expected = SCO_FREQUENCIES[ch] * 0.075
assert abs(mod.deviation_hz - expected) < 0.01
def test_custom_sample_rate(self):
"""Should accept a custom sample rate."""
from apollo.sco_mod import sco_mod
mod = sco_mod(sco_number=1, sample_rate=10_240_000)
assert mod is not None
class TestSCOModFunctional:
"""Functional tests with constant-voltage inputs."""
def _get_output(self, sco_number, voltage, n_samples=None,
sample_rate=SAMPLE_RATE_BASEBAND):
"""Feed constant voltage through sco_mod and return output samples."""
from apollo.sco_mod import sco_mod
if n_samples is None:
n_samples = int(sample_rate * 0.1) # 100ms
tb = gr.top_block()
src = blocks.vector_source_f([voltage] * n_samples)
mod = sco_mod(sco_number=sco_number, sample_rate=sample_rate)
snk = blocks.vector_sink_f()
tb.connect(src, mod, snk)
tb.run()
return np.array(snk.data())
def test_midscale_produces_center_freq(self):
"""Feed 2.5V DC, verify spectral peak near center frequency."""
sco_ch = 5 # 52,500 Hz
sample_rate = SAMPLE_RATE_BASEBAND
output = self._get_output(sco_ch, voltage=2.5, sample_rate=sample_rate)
assert len(output) > 0, "Modulator produced no output"
# Find dominant frequency via FFT
spectrum = np.abs(np.fft.rfft(output))
freqs = np.fft.rfftfreq(len(output), d=1.0 / sample_rate)
peak_idx = np.argmax(spectrum[1:]) + 1 # skip DC
peak_freq = freqs[peak_idx]
expected = SCO_FREQUENCIES[sco_ch]
tolerance = expected * 0.02 # 2% tolerance
assert abs(peak_freq - expected) < tolerance, (
f"SCO ch{sco_ch} at 2.5V: peak at {peak_freq:.0f} Hz, "
f"expected {expected} Hz +/- {tolerance:.0f} Hz"
)
def test_produces_output(self):
"""Feed 2.5V, verify non-zero output."""
output = self._get_output(sco_number=5, voltage=2.5)
assert len(output) > 0, "Modulator produced no output"
assert np.any(output != 0.0), "Output is all zeros"
def test_output_bounded(self):
"""Peak amplitude should be reasonable (< 2.0, > 0.1)."""
output = self._get_output(sco_number=5, voltage=2.5)
peak = np.max(np.abs(output))
assert peak > 0.1, f"Output too small: peak amplitude {peak:.4f}"
assert peak < 2.0, f"Output too large: peak amplitude {peak:.4f}"
def test_all_channels_produce_output(self):
"""All 9 channels should produce non-zero output with 2.5V input."""
for ch in range(1, 10):
output = self._get_output(sco_number=ch, voltage=2.5)
assert len(output) > 0, f"SCO ch{ch} produced no output"
assert np.any(output != 0.0), f"SCO ch{ch} output is all zeros"
class TestSCOModDemodRoundtrip:
"""Round-trip tests: sco_mod -> sco_demod should recover the input voltage."""
def _roundtrip(self, sco_number, voltage, n_samples=None,
sample_rate=SAMPLE_RATE_BASEBAND):
"""Feed voltage through sco_mod -> sco_demod, return demod output."""
from apollo.sco_demod import sco_demod
from apollo.sco_mod import sco_mod
if n_samples is None:
n_samples = int(sample_rate * 0.2) # 200ms for settling
tb = gr.top_block()
src = blocks.vector_source_f([voltage] * n_samples)
mod = sco_mod(sco_number=sco_number, sample_rate=sample_rate)
demod = sco_demod(sco_number=sco_number, sample_rate=sample_rate)
snk = blocks.vector_sink_f()
tb.connect(src, mod, demod, snk)
tb.run()
return np.array(snk.data())
def test_roundtrip_midscale(self):
"""sco_mod(2.5V) -> sco_demod should recover ~2.5V."""
sco_ch = 5 # 52,500 Hz
output = self._roundtrip(sco_ch, voltage=2.5)
assert len(output) > 0, "Round-trip produced no output"
# Skip first 50% for filter settling
settled = output[len(output) // 2:]
if len(settled) > 10:
mean_v = np.mean(settled)
assert 1.5 < mean_v < 3.5, (
f"SCO ch{sco_ch} round-trip at 2.5V: mean output {mean_v:.2f}V, "
f"expected near 2.5V"
)
def test_roundtrip_monotonic(self):
"""Feed 0V, 2.5V, 5V through mod->demod; output should be monotonic."""
sco_ch = 6 # 70,000 Hz
voltages = [0.0, 2.5, 5.0]
means = []
for v_in in voltages:
output = self._roundtrip(sco_ch, voltage=v_in)
settled = output[len(output) // 2:]
mean_v = np.mean(settled) if len(settled) > 10 else float("nan")
means.append(mean_v)
assert means[0] < means[1] < means[2], (
f"Non-monotonic round-trip: "
f"V_in={voltages}, V_out={[f'{v:.2f}' for v in means]}"
)