From 7d4839855169674993b67c196ee21a71743a18c2 Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Tue, 24 Feb 2026 10:18:42 -0700 Subject: [PATCH] Add FM downlink mode: carrier blocks, convenience wrappers, and loopback demo FM mode now has the same three-layer architecture as PM mode: - fm_mod/fm_demod for carrier-level FM modulation - fm_signal_source/fm_downlink_receiver convenience wrappers - fm_loopback_demo.py verifying round-trip SCO voltage recovery Includes GRC YAML for all 4 blocks and doc updates across blocks reference, SCO guide, and signal architecture pages. --- .../docs/explanation/signal-architecture.mdx | 56 ++++++ .../content/docs/guides/sco-modulation.mdx | 59 ++++++ docs/src/content/docs/reference/blocks.mdx | 185 ++++++++++++++++++ examples/fm_loopback_demo.py | 149 ++++++++++++++ grc/apollo_fm_demod.block.yml | 48 +++++ grc/apollo_fm_downlink_receiver.block.yml | 63 ++++++ grc/apollo_fm_mod.block.yml | 45 +++++ grc/apollo_fm_signal_source.block.yml | 60 ++++++ src/apollo/__init__.py | 4 + src/apollo/constants.py | 1 + src/apollo/fm_demod.py | 72 +++++++ src/apollo/fm_downlink_receiver.py | 94 +++++++++ src/apollo/fm_mod.py | 58 ++++++ src/apollo/fm_signal_source.py | 135 +++++++++++++ 14 files changed, 1029 insertions(+) create mode 100644 examples/fm_loopback_demo.py create mode 100644 grc/apollo_fm_demod.block.yml create mode 100644 grc/apollo_fm_downlink_receiver.block.yml create mode 100644 grc/apollo_fm_mod.block.yml create mode 100644 grc/apollo_fm_signal_source.block.yml create mode 100644 src/apollo/fm_demod.py create mode 100644 src/apollo/fm_downlink_receiver.py create mode 100644 src/apollo/fm_mod.py create mode 100644 src/apollo/fm_signal_source.py diff --git a/docs/src/content/docs/explanation/signal-architecture.mdx b/docs/src/content/docs/explanation/signal-architecture.mdx index f780b7f..781c711 100644 --- a/docs/src/content/docs/explanation/signal-architecture.mdx +++ b/docs/src/content/docs/explanation/signal-architecture.mdx @@ -226,3 +226,59 @@ The `usb_signal_source` hierarchical block wires the entire TX chain together as + +## FM downlink mode + +During pre-launch checkout and certain test configurations, the Apollo USB system switches from PM mode to wideband FM. In this mode, the PCM and voice subcarriers are replaced by 9 Subcarrier Oscillators (SCOs) that encode analog sensor voltages as FM tones: + +```mermaid +graph TD + A["Sensor 1\n0-5V DC"] -->|"SCO 1\n14.5 kHz"| G["Composite\nSCO Signal"] + B["Sensor 5\n0-5V DC"] -->|"SCO 5\n52.5 kHz"| G + C["Sensor 9\n0-5V DC"] -->|"SCO 9\n165 kHz"| G + + G -->|"FM\n500 kHz dev"| H["2287.5 MHz\nRF Carrier"] + + H --> I["Transmitted\nFM Downlink"] + + style A fill:#2d5016,stroke:#4a8c2a + style B fill:#2d5016,stroke:#4a8c2a + style C fill:#2d5016,stroke:#4a8c2a + style H fill:#1a3a5c,stroke:#3a7abd + style I fill:#1a3a5c,stroke:#3a7abd +``` + +The key differences from PM mode: + +| Property | PM Mode | FM Mode | +|----------|---------|---------| +| Carrier modulation | Phase (0.133 rad peak) | Frequency (wideband) | +| Data format | Digital (PCM frames) | Analog (voltage-to-frequency) | +| Subcarriers | 1.024 MHz BPSK + 1.25 MHz FM | 9 SCO tones (14.5-165 kHz) | +| Demodulation | Phase extraction | Frequency extraction | + +### gr-apollo FM block decomposition + +The FM mode blocks mirror the PM mode three-layer architecture: + +```mermaid +graph LR + subgraph "Layer 1: Carrier" + A["fm_mod"] --> B["fm_demod"] + end + subgraph "Layer 2: Subcarrier" + C["sco_mod\n(per channel)"] --> D["sco_demod\n(per channel)"] + end + subgraph "Layer 3: Convenience" + E["fm_signal_source"] --> F["fm_downlink_receiver"] + end + + style A fill:#1a3a5c,stroke:#3a7abd,color:#fff + style B fill:#1a3a5c,stroke:#3a7abd,color:#fff + style C fill:#5c3a1a,stroke:#bd7a3a,color:#fff + style D fill:#5c3a1a,stroke:#bd7a3a,color:#fff + style E fill:#2d5016,stroke:#4a8c2a,color:#fff + style F fill:#2d5016,stroke:#4a8c2a,color:#fff +``` + +The `fm_signal_source` and `fm_downlink_receiver` convenience blocks wire the full chain together, just as `usb_signal_source` and `usb_downlink_receiver` do for PM mode. The FM receiver uses streaming float outputs (one per SCO channel) rather than PDU messages, since SCO telemetry is continuous analog data. diff --git a/docs/src/content/docs/guides/sco-modulation.mdx b/docs/src/content/docs/guides/sco-modulation.mdx index 4db7e60..715e110 100644 --- a/docs/src/content/docs/guides/sco-modulation.mdx +++ b/docs/src/content/docs/guides/sco-modulation.mdx @@ -318,3 +318,62 @@ The first few milliseconds of demodulator output may show transient behavior as href="/reference/constants/#subcarrier-oscillators-fm-mode" /> + +## Convenience Wrappers + +For quick FM downlink testing, the `fm_signal_source` and `fm_downlink_receiver` blocks wrap the entire SCO modulation/demodulation chain into single blocks: + +```python +from gnuradio import blocks, gr + +from apollo.constants import SAMPLE_RATE_BASEBAND +from apollo.fm_downlink_receiver import fm_downlink_receiver +from apollo.fm_signal_source import fm_signal_source + +tb = gr.top_block() + +# TX: 3 SCO channels at known voltages +tx = fm_signal_source( + channels=[1, 5, 9], + test_voltages={1: 1.0, 5: 2.5, 9: 4.0}, +) + +head = blocks.head(gr.sizeof_gr_complex, int(SAMPLE_RATE_BASEBAND * 0.2)) # 200 ms +rx = fm_downlink_receiver(channels=[1, 5, 9]) + +tb.connect(tx, head, rx) + +# One sink per channel +import numpy as np +sinks = [] +for idx in range(3): + snk = blocks.vector_sink_f() + tb.connect((rx, idx), snk) + sinks.append(snk) + +tb.run() + +for idx, ch in enumerate([1, 5, 9]): + data = np.array(sinks[idx].data()) + settled = data[len(data) // 5:] # skip first 20% for settling + print(f"SCO {ch}: recovered {np.mean(settled):.3f} V") +``` + +For a complete round-trip demo with error analysis, run: + +```bash +uv run python examples/fm_loopback_demo.py --channels 1 5 9 --snr 30 +``` + + + + + diff --git a/docs/src/content/docs/reference/blocks.mdx b/docs/src/content/docs/reference/blocks.mdx index 633b3b8..35be406 100644 --- a/docs/src/content/docs/reference/blocks.mdx +++ b/docs/src/content/docs/reference/blocks.mdx @@ -657,6 +657,51 @@ The voltage mapping is linear: demod output of -1.0 maps to 0V, 0.0 maps to 2.5V --- +### FM Downlink + +### `fm_demod` + +**Module:** `apollo.fm_demod` +**Type:** `gr.hier_block2` +**Purpose:** Extract frequency modulation from complex baseband using a carrier-tracking PLL and quadrature demodulation. + +```python +from apollo.fm_demod import fm_demod + +blk = fm_demod(carrier_pll_bw=0.02, fm_deviation_hz=500_000) +``` + +#### I/O Signature + +| Port | Direction | Type | Description | +|------|-----------|------|-------------| +| `in0` | Input | `complex` | Complex baseband IQ samples | +| `out0` | Output | `float` | Demodulated composite signal (normalized: +/-1.0 at full deviation) | + +#### Constructor Parameters + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `carrier_pll_bw` | `float` | `0.02` | PLL loop bandwidth in rad/sample | +| `fm_deviation_hz` | `float` | `500000` | Expected max FM deviation in Hz (sets demod gain) | +| `sample_rate` | `float` | `5120000` | Input sample rate in Hz | + +#### Runtime Methods + +| Method | Signature | Description | +|--------|-----------|-------------| +| `get_carrier_pll_bw` | `() -> float` | Read current PLL loop bandwidth | +| `set_carrier_pll_bw` | `(bw: float) -> None` | Update PLL loop bandwidth at runtime | +| `get_fm_deviation` | `() -> float` | Read configured FM deviation | + +#### Internal Chain + +`input -> pll_carriertracking_cc -> quadrature_demod_cf(gain) -> output` + +where `gain = sample_rate / (2*pi*fm_deviation_hz)`. + +--- + ## AGC Integration ### `AGCBridgeClient` / `agc_bridge` @@ -1226,6 +1271,47 @@ blk = sco_mod(sco_number=5, sample_rate=5_120_000) --- +### `fm_mod` + +**Module:** `apollo.fm_mod` +**Type:** `gr.hier_block2` +**Purpose:** Apply frequency modulation to produce complex baseband. Used in FM downlink mode. + +```python +from apollo.fm_mod import fm_mod + +blk = fm_mod(fm_deviation_hz=500_000, sample_rate=5_120_000) +``` + +#### I/O Signature + +| Port | Direction | Type | Description | +|------|-----------|------|-------------| +| `in0` | Input | `float` | Composite modulating signal (sum of SCO subcarriers) | +| `out0` | Output | `complex` | FM complex baseband | + +#### Constructor Parameters + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `fm_deviation_hz` | `float` | `500000` | Max frequency deviation in Hz (`FM_CARRIER_DEVIATION_HZ`) | +| `sample_rate` | `float` | `5120000` | Sample rate in Hz | + +#### Runtime Methods + +| Method | Signature | Description | +|--------|-----------|-------------| +| `get_fm_deviation` | `() -> float` | Read current FM deviation | +| `set_fm_deviation` | `(hz: float) -> None` | Update FM deviation at runtime | + +#### Internal Chain + +`input -> multiply_const_ff(1.0) -> frequency_modulator_fc(sensitivity) -> output` + +where `sensitivity = 2*pi*fm_deviation_hz/sample_rate`. + +--- + ### `usb_signal_source` **Module:** `apollo.usb_signal_source` @@ -1282,3 +1368,102 @@ The internal blocks are exposed as instance attributes for runtime inspection or | `self.adder` | `add_ff` | Subcarrier summer (when `voice_enabled`) | | `self.pm` | `pm_mod` | Phase modulator | | `self.noise` | `noise_source_c` | AWGN source (when `snr_db` is set) | + +--- + +### `fm_signal_source` + +**Module:** `apollo.fm_signal_source` +**Type:** `gr.hier_block2` +**Purpose:** Complete Apollo FM downlink transmit chain -- generates SCO channels at test voltages, sums them, and applies FM carrier modulation. + +```python +from apollo.fm_signal_source import fm_signal_source + +blk = fm_signal_source(channels=[1, 5, 9], test_voltages={1: 1.0, 5: 2.5, 9: 4.0}, snr_db=30) +``` + +#### I/O Signature + +| Port | Direction | Type | Description | +|------|-----------|------|-------------| +| (none) | Input | (none) | Source block -- no streaming input | +| `out0` | Output | `complex` | FM-modulated complex baseband at `sample_rate` | + +#### Constructor Parameters + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `channels` | `list[int]` | `[1, 5, 9]` | SCO channel numbers to generate (1-9) | +| `test_voltages` | `dict[int, float]` | `{ch: 2.5 for each}` | DC voltage per channel (0.0-5.0 V) | +| `sample_rate` | `float` | `5120000` | Baseband sample rate in Hz | +| `fm_deviation_hz` | `float` | `500000` | Carrier FM deviation in Hz | +| `snr_db` | `float \| None` | `None` | Add AWGN at this SNR. `None` = no noise | + +#### Properties + +| Property | Type | Description | +|----------|------|-------------| +| `channels` | `list[int]` | SCO channel numbers being generated | +| `test_voltages` | `dict[int, float]` | Current voltage per channel | +| `fm_deviation_hz` | `float` | Carrier FM deviation in Hz | + +#### Internal Signal Chain + +``` +dc_source(v1) -> sco_mod(ch1) -+-> add_ff -> fm_mod -> [+AWGN] -> output +dc_source(v2) -> sco_mod(ch2) -+ +dc_source(vN) -> sco_mod(chN) -+ +``` + +--- + +### `fm_downlink_receiver` + +**Module:** `apollo.fm_downlink_receiver` +**Type:** `gr.hier_block2` +**Purpose:** Complete Apollo FM downlink receiver -- complex baseband input to recovered SCO voltages on streaming float outputs. + +```python +from apollo.fm_downlink_receiver import fm_downlink_receiver + +blk = fm_downlink_receiver(channels=[1, 5, 9]) +``` + +#### I/O Signature + +| Port | Direction | Type | Description | +|------|-----------|------|-------------| +| `in0` | Input | `complex` (streaming) | Baseband IQ samples at `sample_rate` | +| `out0..N-1` | Output | `float` (streaming) | Recovered 0-5V sensor voltage per SCO channel | + +Output port ordering matches the `channels` list: output 0 = `channels[0]`, etc. + +#### Constructor Parameters + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `channels` | `list[int]` | `[1, 5, 9]` | SCO channel numbers to decode (1-9) | +| `sample_rate` | `float` | `5120000` | Input sample rate in Hz | +| `carrier_pll_bw` | `float` | `0.02` | FM carrier recovery loop bandwidth (rad/sample) | +| `fm_deviation_hz` | `float` | `500000` | Expected carrier FM deviation in Hz | + +#### Properties + +| Property | Type | Description | +|----------|------|-------------| +| `channels` | `list[int]` | SCO channel numbers being decoded | + +#### Methods + +| Method | Signature | Description | +|--------|-----------|-------------| +| `get_sco_demod` | `(channel: int) -> sco_demod` | Access a specific SCO demodulator for runtime inspection | + +#### Internal Signal Chain + +``` +complex in -> fm_demod -> sco_demod(ch1) -> output[0] + -> sco_demod(ch2) -> output[1] + -> sco_demod(chN) -> output[N-1] +``` diff --git a/examples/fm_loopback_demo.py b/examples/fm_loopback_demo.py new file mode 100644 index 0000000..7c67b72 --- /dev/null +++ b/examples/fm_loopback_demo.py @@ -0,0 +1,149 @@ +#!/usr/bin/env python3 +""" +Apollo FM Downlink Loopback Demo -- SCO round-trip verification. + +Demonstrates the FM downlink block chain using GNU Radio streaming blocks: + + TX: dc_sources -> sco_mods -> add -> fm_mod + RX: fm_demod -> sco_demods -> recovered voltages + +All wrapped in the convenience blocks: + fm_signal_source -> fm_downlink_receiver + +Generates FM signal with SCO channels at known DC voltages, demodulates, +and compares recovered vs input voltages. + +Usage: + uv run python examples/fm_loopback_demo.py + uv run python examples/fm_loopback_demo.py --channels 1 5 9 + uv run python examples/fm_loopback_demo.py --snr 30 + uv run python examples/fm_loopback_demo.py --samples 1024000 +""" + +import argparse +import sys + +import numpy as np +from gnuradio import blocks, gr + +from apollo.constants import SAMPLE_RATE_BASEBAND, SCO_FREQUENCIES +from apollo.fm_downlink_receiver import fm_downlink_receiver +from apollo.fm_signal_source import fm_signal_source + + +# Default test voltages: spread across the 0-5V range +DEFAULT_VOLTAGES = {1: 1.0, 5: 2.5, 9: 4.0} + + +def main(): + parser = argparse.ArgumentParser(description="Apollo FM downlink loopback demo") + parser.add_argument( + "--channels", type=int, nargs="+", default=[1, 5, 9], + help="SCO channel numbers to test (default: 1 5 9)", + ) + parser.add_argument( + "--snr", type=float, default=None, + help="SNR in dB (default: no noise)", + ) + parser.add_argument( + "--samples", type=int, default=10 * 102400, + help="Number of samples to process (default: 1024000)", + ) + args = parser.parse_args() + + channels = args.channels + n_samples = args.samples + + # Assign test voltages: spread evenly across 0-5V range + if set(channels) == {1, 5, 9}: + test_voltages = dict(DEFAULT_VOLTAGES) + else: + step = 4.0 / max(1, len(channels) - 1) if len(channels) > 1 else 0 + test_voltages = {ch: 0.5 + i * step for i, ch in enumerate(channels)} + + print("=" * 60) + print("Apollo FM Downlink Loopback Demo") + print("=" * 60) + print(f" Channels: {channels}") + print(f" Input voltages:") + for ch in channels: + v = test_voltages[ch] + freq = SCO_FREQUENCIES[ch] + print(f" SCO {ch} ({freq:,} Hz): {v:.2f} V") + print(f" Samples: {n_samples:,}") + print(f" Duration: {n_samples / SAMPLE_RATE_BASEBAND:.3f} s") + print(f" SNR: {'clean (no noise)' if args.snr is None else f'{args.snr} dB'}") + print() + + # Build the flowgraph + print("Building flowgraph...") + tb = gr.top_block() + + tx = fm_signal_source( + channels=channels, + test_voltages=test_voltages, + snr_db=args.snr, + ) + head = blocks.head(gr.sizeof_gr_complex, n_samples) + rx = fm_downlink_receiver(channels=channels) + + tb.connect(tx, head, rx) + + # One vector sink per output channel + sinks = [] + for idx in range(len(channels)): + snk = blocks.vector_sink_f() + tb.connect((rx, idx), snk) + sinks.append(snk) + + print("Running flowgraph (TX -> RX)...") + print() + tb.run() + + # Analyze results + print("-" * 60) + print(f" {'Channel':>10} {'Freq':>10} {'Input':>8} {'Recovered':>10} {'Error':>8}") + print("-" * 60) + + max_error = 0.0 + for idx, ch in enumerate(channels): + data = np.array(sinks[idx].data()) + if len(data) == 0: + print(f" SCO {ch:>2} {SCO_FREQUENCIES[ch]:>8,} Hz {test_voltages[ch]:>6.2f} V {'NO DATA':>10} {'N/A':>8}") + continue + + # Skip first 20% for filter settling + settle = len(data) // 5 + settled = data[settle:] + + if len(settled) == 0: + mean_v = np.mean(data) + else: + mean_v = np.mean(settled) + + error = abs(test_voltages[ch] - mean_v) + max_error = max(max_error, error) + + print( + f" SCO {ch:>2} {SCO_FREQUENCIES[ch]:>8,} Hz " + f"{test_voltages[ch]:>6.2f} V {mean_v:>8.3f} V {error:>6.3f} V" + ) + + print("-" * 60) + print() + + if max_error > 0.5: + print(f"Max error: {max_error:.3f} V -- EXCESSIVE (> 0.5V)") + print("PLL may need more settling time. Try increasing --samples.") + sys.exit(1) + elif max_error > 0.1: + print(f"Max error: {max_error:.3f} V -- MODERATE") + print("Consider increasing --samples or --snr for better accuracy.") + else: + print(f"Max error: {max_error:.3f} V -- GOOD") + + print("FM loopback complete.") + + +if __name__ == "__main__": + main() diff --git a/grc/apollo_fm_demod.block.yml b/grc/apollo_fm_demod.block.yml new file mode 100644 index 0000000..8dd3194 --- /dev/null +++ b/grc/apollo_fm_demod.block.yml @@ -0,0 +1,48 @@ +id: apollo_fm_demod +label: Apollo FM Demod +category: '[Apollo USB]' +flags: [python] + +parameters: +- id: carrier_pll_bw + label: Carrier PLL Bandwidth + dtype: real + default: '0.02' +- id: fm_deviation_hz + label: FM Deviation (Hz) + dtype: real + default: '500000' +- id: sample_rate + label: Sample Rate + dtype: real + default: '5120000' + +inputs: +- label: in + domain: stream + dtype: complex + +outputs: +- label: out + domain: stream + dtype: float + +templates: + imports: from apollo.fm_demod import fm_demod + make: apollo.fm_demod.fm_demod(carrier_pll_bw=${carrier_pll_bw}, fm_deviation_hz=${fm_deviation_hz}, sample_rate=${sample_rate}) + +documentation: |- + Apollo FM Demodulator + + Extracts frequency modulation from complex baseband signal. + Uses a carrier tracking PLL followed by quadrature demodulation + to recover the instantaneous frequency (composite SCO signal). + + Output is normalized so +/- full FM deviation maps to +/- 1.0. + + Parameters: + carrier_pll_bw: PLL loop bandwidth in rad/sample (default 0.02) + fm_deviation_hz: Expected max FM deviation in Hz (default 500 kHz) + sample_rate: Input sample rate in Hz (default 5.12 MHz) + +file_format: 1 diff --git a/grc/apollo_fm_downlink_receiver.block.yml b/grc/apollo_fm_downlink_receiver.block.yml new file mode 100644 index 0000000..9dfc9e9 --- /dev/null +++ b/grc/apollo_fm_downlink_receiver.block.yml @@ -0,0 +1,63 @@ +id: apollo_fm_downlink_receiver +label: Apollo FM Downlink Receiver +category: '[Apollo USB]' +flags: [python] + +parameters: +- id: channels + label: SCO Channels + dtype: raw + default: '[1, 5, 9]' +- id: fm_deviation_hz + label: FM Deviation (Hz) + dtype: real + default: '500000' +- id: sample_rate + label: Sample Rate + dtype: float + default: '5120000' +- id: carrier_pll_bw + label: Carrier PLL BW + dtype: float + default: '0.02' + +inputs: +- label: in + domain: stream + dtype: complex + +outputs: +- label: ch${n} + domain: stream + dtype: float + multiplicity: ${ len(channels) } + +templates: + imports: from apollo.fm_downlink_receiver import fm_downlink_receiver + make: >- + apollo.fm_downlink_receiver.fm_downlink_receiver( + channels=${channels}, + sample_rate=${sample_rate}, + carrier_pll_bw=${carrier_pll_bw}, + fm_deviation_hz=${fm_deviation_hz}) + +documentation: |- + Apollo FM Downlink Receiver -- complete FM demod chain in one block. + + Demodulates an FM-modulated complex baseband signal and recovers + individual SCO analog telemetry channels. Each output port provides + a recovered 0-5V sensor voltage for the corresponding SCO channel. + + Output port ordering matches the channels list: + output 0 = channels[0], output 1 = channels[1], etc. + + Used in FM downlink mode (pre-launch checkout), not PM mode. + This is the receive-side counterpart to the FM Signal Source. + + Parameters: + channels: List of SCO channel numbers to decode (1-9) + fm_deviation_hz: Expected carrier FM deviation in Hz (default 500 kHz) + sample_rate: Baseband sample rate (default 5.12 MHz) + carrier_pll_bw: FM carrier recovery loop bandwidth + +file_format: 1 diff --git a/grc/apollo_fm_mod.block.yml b/grc/apollo_fm_mod.block.yml new file mode 100644 index 0000000..20c1aa9 --- /dev/null +++ b/grc/apollo_fm_mod.block.yml @@ -0,0 +1,45 @@ +id: apollo_fm_mod +label: Apollo FM Mod +category: '[Apollo USB]' +flags: [python] + +parameters: +- id: fm_deviation_hz + label: FM Deviation (Hz) + dtype: real + default: '500000' +- id: sample_rate + label: Sample Rate + dtype: real + default: '5120000' + +inputs: +- label: in + domain: stream + dtype: float + +outputs: +- label: out + domain: stream + dtype: complex + +templates: + imports: from apollo.fm_mod import fm_mod + make: apollo.fm_mod.fm_mod(fm_deviation_hz=${fm_deviation_hz}, sample_rate=${sample_rate}) + +documentation: |- + Apollo FM Modulator + + Applies frequency modulation to produce complex baseband signal. + Takes a composite modulating signal (sum of SCO subcarriers) and outputs + an FM complex baseband where instantaneous frequency is proportional to + the input amplitude. + + Used in FM downlink mode (pre-launch checkout), not PM mode. + This is the transmit-side counterpart to Apollo FM Demod. + + Parameters: + fm_deviation_hz: Max frequency deviation in Hz (default 500 kHz) + sample_rate: Sample rate in Hz (default 5.12 MHz) + +file_format: 1 diff --git a/grc/apollo_fm_signal_source.block.yml b/grc/apollo_fm_signal_source.block.yml new file mode 100644 index 0000000..d1c8f57 --- /dev/null +++ b/grc/apollo_fm_signal_source.block.yml @@ -0,0 +1,60 @@ +id: apollo_fm_signal_source +label: Apollo FM Signal Source +category: '[Apollo USB]' +flags: [python] + +parameters: +- id: channels + label: SCO Channels + dtype: raw + default: '[1, 5, 9]' +- id: test_voltages + label: Test Voltages (dict) + dtype: raw + default: 'None' +- id: fm_deviation_hz + label: FM Deviation (Hz) + dtype: real + default: '500000' +- id: sample_rate + label: Sample Rate (Hz) + dtype: float + default: '5120000' +- id: snr_db + label: SNR (dB) + dtype: raw + default: 'None' + +outputs: +- label: out + domain: stream + dtype: complex + +templates: + imports: from apollo.fm_signal_source import fm_signal_source + make: >- + apollo.fm_signal_source.fm_signal_source( + channels=${channels}, + test_voltages=${test_voltages}, + sample_rate=${sample_rate}, + fm_deviation_hz=${fm_deviation_hz}, + snr_db=${snr_db}) + +documentation: |- + Apollo FM Signal Source -- complete FM transmit chain in one block. + + Generates an FM-modulated complex baseband signal containing + SCO analog telemetry channels. Each channel encodes a DC test + voltage as an FM subcarrier tone (14.5 kHz to 165 kHz). + + Used in FM downlink mode (pre-launch checkout), not PM mode. + This is the transmit-side counterpart to the FM Downlink Receiver. + + Parameters: + channels: List of SCO channel numbers to generate (1-9) + test_voltages: Dict mapping channel -> DC voltage (default 2.5V) + fm_deviation_hz: Carrier FM deviation in Hz (default 500 kHz) + sample_rate: Output sample rate (default 5.12 MHz) + snr_db: Add AWGN noise at this SNR (None = no noise) + +file_format: 1 diff --git a/src/apollo/__init__.py b/src/apollo/__init__.py index e274c27..3801ddc 100644 --- a/src/apollo/__init__.py +++ b/src/apollo/__init__.py @@ -26,6 +26,7 @@ from apollo.usb_signal_gen import generate_usb_baseband as generate_usb_baseband try: from apollo.bpsk_demod import bpsk_demod as bpsk_demod from apollo.bpsk_subcarrier_demod import bpsk_subcarrier_demod as bpsk_subcarrier_demod + from apollo.fm_demod import fm_demod as fm_demod from apollo.pm_demod import pm_demod as pm_demod from apollo.sco_demod import sco_demod as sco_demod from apollo.subcarrier_extract import subcarrier_extract as subcarrier_extract @@ -36,6 +37,7 @@ except ImportError: # GNU Radio transmit-side blocks try: from apollo.bpsk_subcarrier_mod import bpsk_subcarrier_mod as bpsk_subcarrier_mod + from apollo.fm_mod import fm_mod as fm_mod from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod as fm_voice_subcarrier_mod from apollo.nrz_encoder import nrz_encoder as nrz_encoder from apollo.pcm_frame_source import pcm_frame_source as pcm_frame_source @@ -48,6 +50,8 @@ except ImportError: try: from apollo.agc_bridge import agc_bridge as agc_bridge from apollo.downlink_decoder import downlink_decoder as downlink_decoder + from apollo.fm_downlink_receiver import fm_downlink_receiver as fm_downlink_receiver + from apollo.fm_signal_source import fm_signal_source as fm_signal_source from apollo.pcm_demux import pcm_demux as pcm_demux from apollo.pcm_frame_sync import pcm_frame_sync as pcm_frame_sync from apollo.uplink_encoder import uplink_encoder as uplink_encoder diff --git a/src/apollo/constants.py b/src/apollo/constants.py index 40105ef..17d955d 100644 --- a/src/apollo/constants.py +++ b/src/apollo/constants.py @@ -20,6 +20,7 @@ PM_PEAK_DEVIATION_RAD = 0.133 # 7.6 degrees peak phase deviation PM_SENSITIVITY_RAD_PER_V = 0.033 # at 1 kHz FM_VCO_SENSITIVITY_HZ_PER_V = 1_500_000 # 1.5 MHz peak / V peak FM_MODULATION_BW_HZ = 1_500_000 # 5 Hz to 1.5 MHz +FM_CARRIER_DEVIATION_HZ = 500_000 # default max FM deviation for carrier # --------------------------------------------------------------------------- # Subcarrier Frequencies (IMPL_SPEC section 4.2) diff --git a/src/apollo/fm_demod.py b/src/apollo/fm_demod.py new file mode 100644 index 0000000..fb5579b --- /dev/null +++ b/src/apollo/fm_demod.py @@ -0,0 +1,72 @@ +""" +Apollo FM Demodulator -- extracts frequency modulation from complex baseband. + +The receive-side counterpart to fm_mod. Recovers the composite modulating +signal (sum of SCO subcarriers) from an FM complex baseband input by tracking +the carrier with a PLL and extracting instantaneous frequency. + +The key difference from pm_demod: PM extracts instantaneous *phase* via +complex_to_arg, while FM extracts instantaneous *frequency* via +quadrature_demod_cf (which computes the phase derivative). + +The quadrature demod gain is chosen so that +/- full deviation maps to ++/- 1.0 at the output. This normalized output feeds directly into the +sco_demod blocks for individual channel recovery. + +Reference: IMPLEMENTATION_SPEC.md section 2.3 +""" + +import math + +from gnuradio import analog, gr + +from apollo.constants import FM_CARRIER_DEVIATION_HZ, SAMPLE_RATE_BASEBAND + + +class fm_demod(gr.hier_block2): + """FM demodulator with carrier recovery. + + Inputs: + complex baseband (FM-modulated signal) + + Outputs: + float -- demodulated composite signal containing all SCO subcarriers + """ + + def __init__( + self, + carrier_pll_bw: float = 0.02, + fm_deviation_hz: float = FM_CARRIER_DEVIATION_HZ, + sample_rate: float = SAMPLE_RATE_BASEBAND, + ): + gr.hier_block2.__init__( + self, + "apollo_fm_demod", + gr.io_signature(1, 1, gr.sizeof_gr_complex), + gr.io_signature(1, 1, gr.sizeof_float), + ) + + self._fm_deviation_hz = fm_deviation_hz + self._sample_rate = sample_rate + + # Carrier tracking PLL -- same as pm_demod + max_freq = carrier_pll_bw * 2.0 + min_freq = -max_freq + self.pll = analog.pll_carriertracking_cc(carrier_pll_bw, max_freq, min_freq) + + # Quadrature demod: extract instantaneous frequency + # Gain normalizes output so +/- fm_deviation_hz maps to +/- 1.0 + fm_gain = sample_rate / (2.0 * math.pi * fm_deviation_hz) + self.fm_demod = analog.quadrature_demod_cf(fm_gain) + + # Connect: input -> PLL -> quadrature demod -> output + self.connect(self, self.pll, self.fm_demod, self) + + def get_carrier_pll_bw(self) -> float: + return self.pll.get_loop_bandwidth() + + def set_carrier_pll_bw(self, bw: float): + self.pll.set_loop_bandwidth(bw) + + def get_fm_deviation(self) -> float: + return self._fm_deviation_hz diff --git a/src/apollo/fm_downlink_receiver.py b/src/apollo/fm_downlink_receiver.py new file mode 100644 index 0000000..81ba9c7 --- /dev/null +++ b/src/apollo/fm_downlink_receiver.py @@ -0,0 +1,94 @@ +""" +Apollo FM Downlink Receiver -- top-level hierarchical block for FM mode. + +Combines FM carrier demodulation with per-channel SCO demodulation: + complex baseband -> fm_demod -> sco_demod(ch1) -> output[0] + -> sco_demod(ch2) -> output[1] + -> sco_demod(chN) -> output[N-1] + +Input: complex baseband samples at 5.12 MHz +Output: N streaming float outputs, one per SCO channel (recovered 0-5V voltage) + +Unlike usb_downlink_receiver (which outputs PDU messages), this block uses +streaming float outputs because SCO telemetry is continuous analog data, +not discrete frames. + +For finer control over individual channel parameters, use fm_demod and +sco_demod blocks directly. + +Reference: IMPLEMENTATION_SPEC.md sections 2.3, 4.3 +""" + +from gnuradio import gr + +from apollo.constants import FM_CARRIER_DEVIATION_HZ, SAMPLE_RATE_BASEBAND, SCO_FREQUENCIES +from apollo.fm_demod import fm_demod +from apollo.sco_demod import sco_demod + + +class fm_downlink_receiver(gr.hier_block2): + """Apollo FM downlink receiver -- complex baseband to recovered SCO voltages. + + Inputs: + complex -- baseband IQ samples at sample_rate (default 5.12 MHz) + + Outputs: + float[0..N-1] -- recovered sensor voltage per SCO channel (0.0 to 5.0 V) + Output ordering matches the channels list: output 0 = channels[0], etc. + """ + + def __init__( + self, + channels: list[int] | None = None, + sample_rate: float = SAMPLE_RATE_BASEBAND, + carrier_pll_bw: float = 0.02, + fm_deviation_hz: float = FM_CARRIER_DEVIATION_HZ, + ): + if channels is None: + channels = [1, 5, 9] + + n_channels = len(channels) + + gr.hier_block2.__init__( + self, + "apollo_fm_downlink_receiver", + gr.io_signature(1, 1, gr.sizeof_gr_complex), + gr.io_signature(n_channels, n_channels, gr.sizeof_float), + ) + + self._channels = list(channels) + self._sample_rate = sample_rate + + # Validate channels + for ch in self._channels: + if ch not in SCO_FREQUENCIES: + raise ValueError( + f"SCO channel {ch} invalid. Valid: {sorted(SCO_FREQUENCIES.keys())}" + ) + + # Stage 1: FM carrier demodulator + self.fm = fm_demod( + carrier_pll_bw=carrier_pll_bw, + fm_deviation_hz=fm_deviation_hz, + sample_rate=sample_rate, + ) + + self.connect(self, self.fm) + + # Stage 2: Per-channel SCO demodulators + self._sco_demods = {} + for idx, ch in enumerate(self._channels): + demod = sco_demod(sco_number=ch, sample_rate=sample_rate) + self._sco_demods[ch] = demod + + # fm_demod output -> sco_demod -> hier output[idx] + self.connect(self.fm, demod, (self, idx)) + + @property + def channels(self) -> list[int]: + """SCO channel numbers being decoded.""" + return list(self._channels) + + def get_sco_demod(self, channel: int) -> sco_demod: + """Access a specific SCO demodulator for runtime inspection.""" + return self._sco_demods[channel] diff --git a/src/apollo/fm_mod.py b/src/apollo/fm_mod.py new file mode 100644 index 0000000..ac8faba --- /dev/null +++ b/src/apollo/fm_mod.py @@ -0,0 +1,58 @@ +""" +Apollo FM Modulator -- applies frequency modulation to produce complex baseband. + +The transmit-side counterpart to fm_demod. Takes a composite modulating signal +(sum of SCO subcarriers) and produces FM complex baseband where the +instantaneous frequency is proportional to the input amplitude. + +In FM downlink mode, the spacecraft uses wideband FM instead of the narrow PM +used for normal PCM/voice operations. The SCO composite signal frequency-modulates +the carrier with much higher deviation than PM mode. + +Reference: IMPLEMENTATION_SPEC.md section 2.3 +""" + +import math + +from gnuradio import analog, blocks, gr + +from apollo.constants import FM_CARRIER_DEVIATION_HZ, SAMPLE_RATE_BASEBAND + + +class fm_mod(gr.hier_block2): + """FM modulator: float input -> FM complex baseband output.""" + + def __init__( + self, + fm_deviation_hz: float = FM_CARRIER_DEVIATION_HZ, + sample_rate: float = SAMPLE_RATE_BASEBAND, + ): + gr.hier_block2.__init__( + self, + "apollo_fm_mod", + gr.io_signature(1, 1, gr.sizeof_float), + gr.io_signature(1, 1, gr.sizeof_gr_complex), + ) + + self._fm_deviation_hz = fm_deviation_hz + self._sample_rate = sample_rate + + # Runtime-adjustable gain (unity default; mirrors pm_mod pattern) + self.gain = blocks.multiply_const_ff(1.0) + + # FM modulate: sensitivity encodes full deviation + sensitivity = 2.0 * math.pi * fm_deviation_hz / sample_rate + self.modulator = analog.frequency_modulator_fc(sensitivity) + + # Connect: input -> gain -> fm_mod -> output + self.connect(self, self.gain, self.modulator, self) + + def get_fm_deviation(self) -> float: + """Return current FM deviation in Hz.""" + return self._fm_deviation_hz + + def set_fm_deviation(self, hz: float): + """Update FM deviation at runtime by rescaling modulator sensitivity.""" + self._fm_deviation_hz = hz + sensitivity = 2.0 * math.pi * hz / self._sample_rate + self.modulator.set_sensitivity(sensitivity) diff --git a/src/apollo/fm_signal_source.py b/src/apollo/fm_signal_source.py new file mode 100644 index 0000000..d28e098 --- /dev/null +++ b/src/apollo/fm_signal_source.py @@ -0,0 +1,135 @@ +""" +Apollo FM Downlink Signal Source -- complete FM transmit chain in one block. + +The transmit-side counterpart to fm_downlink_receiver. Wires together the +full FM modulation chain for SCO analog telemetry: + + dc_source(v1) -> sco_mod(ch1) -+-> add_ff -> fm_mod -> [+AWGN] -> complex out + dc_source(v2) -> sco_mod(ch2) -+ + dc_source(vN) -> sco_mod(chN) -+ + +In FM downlink mode (used for pre-launch checkout), the spacecraft replaces +the PCM/voice subcarrier system with 9 Subcarrier Oscillators (SCOs) that +encode analog sensor voltages as FM tones. These SCO tones are summed and +frequency-modulate the RF carrier. + +For finer control, use the individual sco_mod and fm_mod blocks directly. + +Reference: IMPLEMENTATION_SPEC.md sections 2.3, 4.3 +""" + +import math + +from gnuradio import analog, blocks, gr + +from apollo.constants import FM_CARRIER_DEVIATION_HZ, SAMPLE_RATE_BASEBAND, SCO_FREQUENCIES +from apollo.fm_mod import fm_mod +from apollo.sco_mod import sco_mod + + +class fm_signal_source(gr.hier_block2): + """Apollo FM downlink signal source -- complex baseband output. + + Outputs: + complex -- FM-modulated baseband at sample_rate (default 5.12 MHz) + + Generates DC test voltages for each configured SCO channel, modulates + them onto their respective subcarrier tones, sums the composite, and + applies wideband FM to produce complex baseband. + + Optional AWGN noise can be added by setting snr_db to a finite value. + """ + + def __init__( + self, + channels: list[int] | None = None, + test_voltages: dict[int, float] | None = None, + sample_rate: float = SAMPLE_RATE_BASEBAND, + fm_deviation_hz: float = FM_CARRIER_DEVIATION_HZ, + snr_db: float | None = None, + ): + if channels is None: + channels = [1, 5, 9] + + if test_voltages is None: + test_voltages = {ch: 2.5 for ch in channels} + + gr.hier_block2.__init__( + self, + "apollo_fm_signal_source", + gr.io_signature(0, 0, 0), # source -- no input + gr.io_signature(1, 1, gr.sizeof_gr_complex), + ) + + self._channels = list(channels) + self._test_voltages = dict(test_voltages) + self._fm_deviation_hz = fm_deviation_hz + self._sample_rate = sample_rate + + # Validate channels + for ch in self._channels: + if ch not in SCO_FREQUENCIES: + raise ValueError( + f"SCO channel {ch} invalid. Valid: {sorted(SCO_FREQUENCIES.keys())}" + ) + + # --- Build SCO modulation chains --- + + self._dc_sources = {} + self._sco_mods = {} + n_channels = len(self._channels) + + self.adder = blocks.add_ff(1) + + for idx, ch in enumerate(self._channels): + voltage = self._test_voltages.get(ch, 2.5) + + # DC source at the test voltage + dc = analog.sig_source_f( + sample_rate, analog.GR_CONST_WAVE, 0, voltage, 0, + ) + self._dc_sources[ch] = dc + + # SCO modulator for this channel + mod = sco_mod(sco_number=ch, sample_rate=sample_rate) + self._sco_mods[ch] = mod + + # Connect: dc -> sco_mod -> adder port idx + self.connect(dc, mod, (self.adder, idx)) + + # --- FM carrier modulation --- + + self.fm = fm_mod(fm_deviation_hz=fm_deviation_hz, sample_rate=sample_rate) + self.connect(self.adder, self.fm) + + # --- Optional AWGN --- + + if snr_db is not None: + noise_power = 1.0 / (10.0 ** (snr_db / 10.0)) + noise_amplitude = math.sqrt(noise_power / 2.0) + + self.noise = analog.noise_source_c( + analog.GR_GAUSSIAN, noise_amplitude, 0, + ) + self.sum_noise = blocks.add_cc(1) + + self.connect(self.fm, (self.sum_noise, 0)) + self.connect(self.noise, (self.sum_noise, 1)) + self.connect(self.sum_noise, self) + else: + self.connect(self.fm, self) + + @property + def channels(self) -> list[int]: + """SCO channel numbers being generated.""" + return list(self._channels) + + @property + def test_voltages(self) -> dict[int, float]: + """Current test voltage per channel.""" + return dict(self._test_voltages) + + @property + def fm_deviation_hz(self) -> float: + """Carrier FM deviation in Hz.""" + return self._fm_deviation_hz