diff --git a/examples/audio/apollo11_crew.wav b/examples/audio/apollo11_crew.wav new file mode 100644 index 0000000..86b8ab8 Binary files /dev/null and b/examples/audio/apollo11_crew.wav differ diff --git a/examples/voice_subcarrier_demo.py b/examples/voice_subcarrier_demo.py new file mode 100644 index 0000000..27090c7 --- /dev/null +++ b/examples/voice_subcarrier_demo.py @@ -0,0 +1,160 @@ +#!/usr/bin/env python3 +""" +Apollo Voice Subcarrier Demo -- modulate real audio onto 1.25 MHz FM subcarrier. + +Takes an audio file (e.g., actual Apollo mission crew recordings), modulates +it onto the 1.25 MHz FM voice subcarrier with +/-29 kHz deviation, then +demodulates it back to audio. This is exactly what the spacecraft's +Pre-Modulation Processor and the ground station receiver did. + +Signal path: + audio file (8 kHz) + -> upsample to 5.12 MHz + -> fm_voice_subcarrier_mod (audio_input=True) + [audio -> FM mod -> upconvert to 1.25 MHz -> float subcarrier] + -> voice_subcarrier_demod + [BPF 1.25 MHz -> FM discriminator -> BPF 300-3000 Hz -> decimate to 8 kHz] + -> recovered audio (8 kHz WAV) + +Usage: + uv run python examples/voice_subcarrier_demo.py examples/audio/apollo11_crew.wav + uv run python examples/voice_subcarrier_demo.py input.wav --output recovered.wav + uv run python examples/voice_subcarrier_demo.py input.wav --play +""" + +import argparse +import time +from math import gcd + +import numpy as np +from gnuradio import blocks, gr +from scipy.io import wavfile +from scipy.signal import resample_poly + +from apollo.constants import SAMPLE_RATE_BASEBAND +from apollo.fm_voice_subcarrier_mod import fm_voice_subcarrier_mod +from apollo.voice_subcarrier_demod import voice_subcarrier_demod + + +def main(): + parser = argparse.ArgumentParser( + description="Modulate audio onto Apollo 1.25 MHz FM voice subcarrier" + ) + parser.add_argument("input", help="Input WAV file (any sample rate)") + parser.add_argument("--output", "-o", default=None, + help="Output WAV file (default: _recovered.wav)") + parser.add_argument("--play", action="store_true", + help="Play recovered audio with aplay") + parser.add_argument("--sample-rate", type=float, default=SAMPLE_RATE_BASEBAND, + help="Baseband sample rate (default: 5.12 MHz)") + args = parser.parse_args() + + if args.output is None: + stem = args.input.rsplit(".", 1)[0] + args.output = f"{stem}_recovered.wav" + + # Load input audio + print("=" * 60) + print("Apollo Voice Subcarrier Demo") + print("=" * 60) + print() + + input_rate, audio_data = wavfile.read(args.input) + if audio_data.ndim > 1: + audio_data = audio_data[:, 0] # mono + + # Normalize to [-1, 1] float + if audio_data.dtype == np.int16: + audio_float = audio_data.astype(np.float32) / 32768.0 + elif audio_data.dtype == np.int32: + audio_float = audio_data.astype(np.float32) / 2147483648.0 + else: + audio_float = audio_data.astype(np.float32) + + duration = len(audio_float) / input_rate + print(f" Input: {args.input}") + print(f" Sample rate: {input_rate} Hz") + print(f" Duration: {duration:.2f} s") + print(f" Samples: {len(audio_float):,}") + print() + + # Upsample to baseband rate + sample_rate = int(args.sample_rate) + audio_rate = 8000 # output rate from voice demod + + # Resample input to 8 kHz first (the standard Apollo voice bandwidth) + if input_rate != audio_rate: + g = gcd(audio_rate, input_rate) + audio_float = resample_poly(audio_float, audio_rate // g, input_rate // g) + audio_float = audio_float.astype(np.float32) + print(f" Resampled to {audio_rate} Hz: {len(audio_float):,} samples") + + # Upsample from 8 kHz to baseband (5.12 MHz) + # Factor: 5120000 / 8000 = 640 + g = gcd(sample_rate, audio_rate) + up = sample_rate // g + down = audio_rate // g + print(f" Upsampling {audio_rate} Hz -> {sample_rate / 1e6:.2f} MHz " + f"(ratio {up}:{down})...") + + t0 = time.time() + upsampled = resample_poly(audio_float, up, down) + upsampled = upsampled.astype(np.float32) + t_resample = time.time() - t0 + print(f" Upsampled: {len(upsampled):,} samples ({t_resample:.1f}s)") + print() + + # Build GNU Radio flowgraph: voice mod -> voice demod + print("Building flowgraph: FM mod (1.25 MHz) -> FM demod...") + tb = gr.top_block() + + src = blocks.vector_source_f(upsampled.tolist()) + voice_mod = fm_voice_subcarrier_mod( + sample_rate=sample_rate, + audio_input=True, + ) + voice_demod = voice_subcarrier_demod( + sample_rate=sample_rate, + audio_rate=audio_rate, + ) + snk = blocks.vector_sink_f() + + tb.connect(src, voice_mod, voice_demod, snk) + + print("Running flowgraph...") + t0 = time.time() + tb.run() + t_run = time.time() - t0 + + recovered = np.array(snk.data(), dtype=np.float32) + print(f" Processed in {t_run:.1f}s") + print(f" Recovered: {len(recovered):,} samples at {audio_rate} Hz") + print(f" Duration: {len(recovered) / audio_rate:.2f} s") + print() + + # Normalize recovered audio for WAV output + peak = np.max(np.abs(recovered)) + if peak > 0: + recovered = recovered / peak * 0.9 # normalize to 90% to avoid clipping + + # Save as 16-bit WAV + recovered_int16 = (recovered * 32767).astype(np.int16) + wavfile.write(args.output, audio_rate, recovered_int16) + print(f" Saved: {args.output}") + print(f" Peak amplitude: {peak:.4f}") + print() + + # Play if requested + if args.play: + import subprocess + print("Playing recovered audio...") + subprocess.run(["aplay", args.output], check=False) + else: + print(f"Play with: aplay {args.output}") + + print() + print("Done.") + + +if __name__ == "__main__": + main()