#!/usr/bin/env python3
"""
Apollo AGC Integration Demo -- full communications loop with Virtual AGC.

Demonstrates the complete Apollo unified S-band communications path:

    yaAGC (emulator)
        |  DNTM1/DNTM2 telemetry via TCP
        v
    agc_bridge
        |  PDU message
        v
    downlink_decoder --> print decoded telemetry
        ^
        |  PDU frames
    usb_downlink_receiver (RX chain)
        ^
        |  complex baseband
    usb_signal_source (TX chain)

And the uplink path:

    DSKY commands
        |
    uplink_encoder
        |  (channel, value) pairs
        v
    agc_bridge --> yaAGC (INLINK channel 045)

Prerequisites:
    1. Install Virtual AGC: https://www.ibiblio.org/apollo/
    2. Start yaAGC with a mission (e.g., Luminary099 for Apollo 11 LM):
       $ yaAGC --core=Luminary099.bin --port=19697
    3. Optionally start yaDSKY2 for visual display:
       $ yaDSKY2 --port=19698

Usage:
    uv run python examples/agc_loopback_demo.py
    uv run python examples/agc_loopback_demo.py --host 192.168.1.100
    uv run python examples/agc_loopback_demo.py --port 19697
    uv run python examples/agc_loopback_demo.py --send-v16n36  # request time display
"""

import argparse
import sys
import time

from apollo.agc_bridge import AGCBridgeClient
from apollo.constants import (
    AGC_CH_DNTM1,
    AGC_CH_DNTM2,
    AGC_CH_OUTLINK,
    AGC_PORT_BASE,
)
from apollo.downlink_decoder import DownlinkEngine
from apollo.uplink_encoder import UplinkEncoder

# Polling cadence shared by the connection wait and the telemetry loop.
_POLL_INTERVAL_S = 0.5
# 20 polls * 0.5 s = 10 seconds max before giving up on the connection.
_CONNECT_ATTEMPTS = 20
# Number of leading words printed for each telemetry snapshot.
_PREVIEW_WORDS = 5


def _wait_for_connection(client, attempts=_CONNECT_ATTEMPTS,
                         interval=_POLL_INTERVAL_S):
    """Poll ``client.connected`` until it is truthy or the attempts run out.

    Returns:
        True if the client reported a connection, False otherwise.
    """
    for _ in range(attempts):
        if client.connected:
            return True
        time.sleep(interval)
    return bool(client.connected)


def _print_snapshot(snap):
    """Print a one-line header and the first few words of one snapshot.

    ``snap`` is read with ``.get()`` for the keys ``list_type_id``,
    ``list_name``, ``word_count`` and ``words``; missing keys fall back to
    placeholder values so a partial snapshot still prints.
    """
    list_type = snap.get("list_type_id", "?")
    list_name = snap.get("list_name", "Unknown")
    n_words = snap.get("word_count", 0)
    print(f" Telemetry snapshot: {list_name} "
          f"(type {list_type}), {n_words} words")

    # Show first few words (octal, then decimal)
    words = snap.get("words", [])
    for i, val in enumerate(words[:_PREVIEW_WORDS]):
        print(f" [{i:03d}] = {val:05o} ({val})")
    if len(words) > _PREVIEW_WORDS:
        print(f" ... ({len(words) - _PREVIEW_WORDS} more words)")


def main():
    """Connect to yaAGC, optionally uplink V16N36E, and print telemetry.

    Parses the command line, starts an ``AGCBridgeClient``, waits up to ten
    seconds for the connection, then prints telemetry snapshots for
    ``--duration`` seconds (or until Ctrl-C) followed by a summary.

    Exits with status 1 if the connection cannot be established. The client
    is stopped on every exit path, including errors and interrupts.
    """
    parser = argparse.ArgumentParser(
        description="Apollo AGC integration demo -- connect to yaAGC emulator"
    )
    parser.add_argument("--host", default="localhost",
                        help="yaAGC host (default: localhost)")
    parser.add_argument("--port", type=int, default=AGC_PORT_BASE,
                        help="yaAGC port (default: 19697)")
    parser.add_argument("--duration", type=float, default=10.0,
                        help="Run duration in seconds")
    parser.add_argument("--send-v16n36", action="store_true",
                        help="Send V16N36E (display time) to AGC")
    args = parser.parse_args()

    print("=" * 60)
    print("Apollo AGC Integration Demo")
    print("=" * 60)
    print(f" Target: {args.host}:{args.port}")
    print(f" Duration: {args.duration} seconds")
    print()

    # Downlink decoder accumulates telemetry words
    decoder = DownlinkEngine()
    packet_count = 0
    telemetry_words = 0

    def on_packet(channel: int, value: int):
        """Count every packet; route telemetry words to the decoder."""
        nonlocal packet_count, telemetry_words
        packet_count += 1
        if channel in (AGC_CH_DNTM1, AGC_CH_DNTM2):
            telemetry_words += 1
            decoder.feed_agc_word(channel, value)
        elif channel == AGC_CH_OUTLINK:
            print(f" OUTLINK: ch={channel:03o} val={value:05o} ({value})")

    def on_status(state: str):
        """Echo connection state changes reported by the client."""
        print(f" Connection: {state}")

    # Connect to yaAGC
    client = AGCBridgeClient(
        host=args.host,
        port=args.port,
        channel_filter=None,  # accept all channels for this demo
        on_packet=on_packet,
        on_status=on_status,
    )

    print(f"Connecting to yaAGC at {args.host}:{args.port}...")
    client.start()

    # Everything after start() runs under try/finally so the client is
    # stopped even when we bail out early or an unexpected error occurs.
    try:
        if not _wait_for_connection(client):
            print()
            print("Could not connect to yaAGC.")
            print()
            print("Make sure yaAGC is running:")
            print(f" yaAGC --core=Luminary099.bin --port={args.port}")
            print()
            print("Or try a different host/port:")
            print(" python examples/agc_loopback_demo.py --host HOST --port PORT")
            sys.exit(1)

        print()

        # Optionally send a DSKY command
        if args.send_v16n36:
            print("Sending V16N36E (display time)...")
            encoder = UplinkEncoder()
            pairs = encoder.encode_verb_noun(verb=16, noun=36)
            for channel, value in pairs:
                client.send(channel, value)
                time.sleep(0.1)  # pace for UPRUPT processing
            print(f" Sent {len(pairs)} uplink words")
            print()

        # Collect telemetry for the specified duration
        print(f"Collecting telemetry for {args.duration} seconds...")
        print("-" * 60)

        # Monotonic clock: immune to wall-clock adjustments during the run.
        start_time = time.monotonic()
        shown = 0  # number of snapshots already printed

        try:
            while True:
                remaining = args.duration - (time.monotonic() - start_time)
                if remaining <= 0:
                    break
                # Never sleep past the requested duration.
                time.sleep(min(_POLL_INTERVAL_S, remaining))

                # Check for new telemetry snapshots. The slice is taken once
                # and the counter advanced by its length, so a snapshot
                # appended while we are printing is picked up next pass
                # instead of being skipped.
                # NOTE(review): this reads a private attribute of
                # DownlinkEngine; switch to a public accessor if one exists.
                fresh = decoder._completed_snapshots[shown:]
                for snap in fresh:
                    _print_snapshot(snap)
                shown += len(fresh)
        except KeyboardInterrupt:
            print()
            print("Interrupted.")

        print("-" * 60)
        print()
        print("Summary:")
        print(f" Total packets received: {packet_count}")
        print(f" Telemetry words: {telemetry_words}")
        print(f" Telemetry snapshots: {len(decoder._completed_snapshots)}")
        print(f" Duration: {time.monotonic() - start_time:.1f} seconds")
    finally:
        client.stop()

    print()
    print("Done.")


if __name__ == "__main__":
    main()