gr-apollo/src/apollo/fm_signal_source.py
Ryan Malloy 7d48398551 Add FM downlink mode: carrier blocks, convenience wrappers, and loopback demo
FM mode now has the same three-layer architecture as PM mode:
- fm_mod/fm_demod for carrier-level FM modulation
- fm_signal_source/fm_downlink_receiver convenience wrappers
- fm_loopback_demo.py verifying round-trip SCO voltage recovery

Includes GRC YAML for all 4 blocks and doc updates across
blocks reference, SCO guide, and signal architecture pages.
2026-02-24 10:18:42 -07:00

136 lines
4.3 KiB
Python

"""
Apollo FM Downlink Signal Source -- complete FM transmit chain in one block.
The transmit-side counterpart to fm_downlink_receiver. Wires together the
full FM modulation chain for SCO analog telemetry:
dc_source(v1) -> sco_mod(ch1) -+-> add_ff -> fm_mod -> [+AWGN] -> complex out
dc_source(v2) -> sco_mod(ch2) -+
dc_source(vN) -> sco_mod(chN) -+
In FM downlink mode (used for pre-launch checkout), the spacecraft replaces
the PCM/voice subcarrier system with 9 Subcarrier Oscillators (SCOs) that
encode analog sensor voltages as FM tones. These SCO tones are summed, and
the composite signal frequency-modulates the RF carrier.
For finer control, use the individual sco_mod and fm_mod blocks directly.
Reference: IMPLEMENTATION_SPEC.md sections 2.3, 4.3
"""
import math
from gnuradio import analog, blocks, gr
from apollo.constants import FM_CARRIER_DEVIATION_HZ, SAMPLE_RATE_BASEBAND, SCO_FREQUENCIES
from apollo.fm_mod import fm_mod
from apollo.sco_mod import sco_mod
class fm_signal_source(gr.hier_block2):
    """Apollo FM downlink signal source -- complex baseband output.

    Outputs:
        complex -- FM-modulated baseband at sample_rate (default 5.12 MHz)

    Generates DC test voltages for each configured SCO channel, modulates
    them onto their respective subcarrier tones, sums the composite, and
    applies wideband FM to produce complex baseband.

    Optional AWGN noise can be added by setting snr_db to a finite value.

    Args:
        channels: SCO channel numbers to generate. Every entry must be a key
            of SCO_FREQUENCIES and the list must not be empty. Defaults to
            channels 1, 5 and 9. Duplicates are not rejected; a repeated
            channel gets its own chain, but the per-channel bookkeeping
            dicts only remember the last one.
        test_voltages: Mapping of channel number to the DC voltage fed into
            that channel's SCO modulator. Configured channels missing from
            the mapping use 2.5.
        sample_rate: Sample rate handed to every source and modulator.
        fm_deviation_hz: Carrier deviation handed to fm_mod.
        snr_db: None (or +inf) disables noise. A finite value adds complex
            Gaussian noise of total power 1 / 10**(snr_db / 10), i.e. the
            SNR is referenced to a signal power of 1.0 (presumably the
            power of the fm_mod output -- confirm against fm_mod).

    Raises:
        ValueError: If channels is empty, contains a number that is not in
            SCO_FREQUENCIES, or if snr_db is NaN or -inf.
    """

    # Channels generated when the caller does not choose any.
    _DEFAULT_CHANNELS = (1, 5, 9)
    # Voltage used for any configured channel without an explicit entry.
    _DEFAULT_TEST_VOLTAGE = 2.5

    def __init__(
        self,
        channels: list[int] | None = None,
        test_voltages: dict[int, float] | None = None,
        sample_rate: float = SAMPLE_RATE_BASEBAND,
        fm_deviation_hz: float = FM_CARRIER_DEVIATION_HZ,
        snr_db: float | None = None,
    ):
        # --- Validate everything before any GNU Radio object is created ---
        channels = list(self._DEFAULT_CHANNELS if channels is None else channels)
        if not channels:
            # An adder with no connected inputs would only fail later, when
            # the flowgraph is validated; fail here with a clear message.
            raise ValueError("At least one SCO channel is required")
        for ch in channels:
            if ch not in SCO_FREQUENCIES:
                raise ValueError(
                    f"SCO channel {ch} invalid. Valid: {sorted(SCO_FREQUENCIES.keys())}"
                )

        # +inf dB means zero noise power, so the noise branch is skipped
        # instead of wiring in a zero-amplitude source.
        add_noise = snr_db is not None and snr_db != math.inf
        if add_noise and not math.isfinite(snr_db):
            raise ValueError(
                f"snr_db must be a finite number, +inf, or None; got {snr_db!r}"
            )

        gr.hier_block2.__init__(
            self,
            "apollo_fm_signal_source",
            gr.io_signature(0, 0, 0),  # source -- no input
            gr.io_signature(1, 1, gr.sizeof_gr_complex),
        )

        self._channels = channels
        # Copy so later changes to the caller's dict do not leak in.
        self._test_voltages = {} if test_voltages is None else dict(test_voltages)
        self._fm_deviation_hz = fm_deviation_hz
        self._sample_rate = sample_rate

        # --- Build SCO modulation chains ---
        self._dc_sources = {}
        self._sco_mods = {}
        self.adder = blocks.add_ff(1)

        for port, ch in enumerate(self._channels):
            # Record the default for unlisted channels so that the
            # test_voltages property reports what is actually generated.
            voltage = self._test_voltages.setdefault(ch, self._DEFAULT_TEST_VOLTAGE)

            # Constant-wave source whose amplitude is the test voltage.
            dc = analog.sig_source_f(
                sample_rate, analog.GR_CONST_WAVE, 0, voltage, 0,
            )
            # SCO modulator for this channel
            mod = sco_mod(sco_number=ch, sample_rate=sample_rate)

            self._dc_sources[ch] = dc
            self._sco_mods[ch] = mod

            # dc -> sco_mod -> next free adder input
            self.connect(dc, mod, (self.adder, port))

        # --- FM carrier modulation ---
        self.fm = fm_mod(fm_deviation_hz=fm_deviation_hz, sample_rate=sample_rate)
        self.connect(self.adder, self.fm)

        # --- Optional AWGN ---
        if add_noise:
            noise_power = 1.0 / (10.0 ** (snr_db / 10.0))
            # Half of the noise power per quadrature component (assumes the
            # noise_source_c amplitude is the per-component standard
            # deviation -- confirm against the GNU Radio documentation).
            noise_amplitude = math.sqrt(noise_power / 2.0)
            self.noise = analog.noise_source_c(
                analog.GR_GAUSSIAN, noise_amplitude, 0,
            )
            self.sum_noise = blocks.add_cc(1)
            self.connect(self.fm, (self.sum_noise, 0))
            self.connect(self.noise, (self.sum_noise, 1))
            self.connect(self.sum_noise, self)
        else:
            self.connect(self.fm, self)

    @property
    def channels(self) -> list[int]:
        """SCO channel numbers being generated (returned as a copy)."""
        return list(self._channels)

    @property
    def test_voltages(self) -> dict[int, float]:
        """Test voltage per channel (returned as a copy).

        Holds an entry for every generated channel, plus any other entries
        the caller supplied at construction time.
        """
        return dict(self._test_voltages)

    @property
    def fm_deviation_hz(self) -> float:
        """Carrier FM deviation in Hz."""
        return self._fm_deviation_hz

    @property
    def sample_rate(self) -> float:
        """Sample rate the chain was built with."""
        return self._sample_rate