""" Apollo Uplink Command Encoder — formats ground commands for AGC channel 45 (INLINK). The MSFN ground station sends commands to the spacecraft via the Up-Data Link, which delivers 15-bit words to AGC I/O channel 045 (octal). Each word triggers the UPRUPT interrupt in the flight software. Command encoding follows the DSKY command structure: VERB-NOUN pairs optionally followed by data words. This module translates high-level command descriptions into the (channel, value) pairs expected by the AGC socket protocol. Standalone class: UplinkEncoder — encodes command types into (channel, value) tuples GNU Radio wrapper: uplink_encoder — message port block for use in GRC flowgraphs Reference: IMPLEMENTATION_SPEC.md section 1 (INLINK / UPRUPT) """ import logging from apollo.constants import AGC_CH_INLINK logger = logging.getLogger(__name__) # DSKY key codes (5-bit encoding used by the AGC for keyboard input). # These map to the bit patterns the Up-Data Link sends on channel 045. # Bits 14-10 carry the key code, bits 9-5 carry additional data for DATA words. KEYCODE_VERB = 0o21 # 17 decimal — VERB key KEYCODE_NOUN = 0o37 # 31 decimal — NOUN key KEYCODE_ENTER = 0o34 # 28 decimal — ENTER / PROCEED KEYCODE_RESET = 0o22 # 18 decimal — RESET / KEY RELEASE KEYCODE_CLEAR = 0o36 # 30 decimal — CLEAR # Digit keycodes 0-9 KEYCODE_DIGITS = { 0: 0o20, # 16 decimal 1: 0o01, 2: 0o02, 3: 0o03, 4: 0o04, 5: 0o05, 6: 0o06, 7: 0o07, 8: 0o10, # 8 decimal 9: 0o11, # 9 decimal } KEYCODE_PLUS = 0o32 # 26 decimal — + sign KEYCODE_MINUS = 0o33 # 27 decimal — - sign class UplinkEncoder: """Encodes ground commands into AGC INLINK (channel, value) pairs. Each method returns a list of (channel, value) tuples representing the sequence of words to deliver to AGC channel 045. The AGC processes one word per UPRUPT, so multi-word sequences must be sent with appropriate timing (the bridge handles pacing). Args: channel: AGC I/O channel for uplink data. Default is channel 045 (INLINK). """ def __init__(self, channel: int = AGC_CH_INLINK): self.channel = channel def encode_keycode(self, keycode: int) -> tuple[int, int]: """Encode a single DSKY keycode as a (channel, value) pair. The keycode occupies bits 14-10 of the 15-bit value. Bits 9-0 are zero for simple key presses. """ value = (keycode & 0x1F) << 10 return (self.channel, value) def encode_digit(self, digit: int) -> tuple[int, int]: """Encode a single decimal digit (0-9).""" if digit not in KEYCODE_DIGITS: raise ValueError(f"digit must be 0-9, got {digit}") return self.encode_keycode(KEYCODE_DIGITS[digit]) def encode_verb(self, verb_number: int) -> list[tuple[int, int]]: """Encode a VERB command (e.g., V37 → [VERB, 3, 7]). Args: verb_number: Two-digit verb number (0-99). Returns: List of (channel, value) pairs: VERB key + two digit keys. """ if not 0 <= verb_number <= 99: raise ValueError(f"verb must be 0-99, got {verb_number}") d1 = verb_number // 10 d2 = verb_number % 10 return [ self.encode_keycode(KEYCODE_VERB), self.encode_digit(d1), self.encode_digit(d2), ] def encode_noun(self, noun_number: int) -> list[tuple[int, int]]: """Encode a NOUN selection (e.g., N01 → [NOUN, 0, 1]). Args: noun_number: Two-digit noun number (0-99). Returns: List of (channel, value) pairs: NOUN key + two digit keys. """ if not 0 <= noun_number <= 99: raise ValueError(f"noun must be 0-99, got {noun_number}") d1 = noun_number // 10 d2 = noun_number % 10 return [ self.encode_keycode(KEYCODE_NOUN), self.encode_digit(d1), self.encode_digit(d2), ] def encode_data(self, value: int, signed: bool = True) -> list[tuple[int, int]]: """Encode a 5-digit data entry (e.g., +12345 → [+, 1, 2, 3, 4, 5]). Args: value: Integer data value. If signed, range is -99999 to +99999. If unsigned, range is 0 to 99999. signed: If True, prepend a +/- sign key. Returns: List of (channel, value) pairs for the digit sequence. """ pairs: list[tuple[int, int]] = [] if signed: if value < 0: pairs.append(self.encode_keycode(KEYCODE_MINUS)) value = abs(value) else: pairs.append(self.encode_keycode(KEYCODE_PLUS)) if not 0 <= value <= 99999: raise ValueError(f"data magnitude must be 0-99999, got {value}") digits = f"{value:05d}" for ch in digits: pairs.append(self.encode_digit(int(ch))) return pairs def encode_proceed(self) -> list[tuple[int, int]]: """Encode a PROCEED (ENTER) keystroke.""" return [self.encode_keycode(KEYCODE_ENTER)] def encode_command( self, command_type: str, data: int | None = None ) -> list[tuple[int, int]]: """High-level command encoder dispatching by type. Args: command_type: One of "VERB", "NOUN", "DATA", "PROCEED". data: Required for VERB (verb number), NOUN (noun number), and DATA (integer value). Ignored for PROCEED. Returns: List of (channel, value) pairs. Raises: ValueError: Unknown command type or missing data. """ ct = command_type.upper() if ct == "VERB": if data is None: raise ValueError("VERB requires a verb number") return self.encode_verb(data) elif ct == "NOUN": if data is None: raise ValueError("NOUN requires a noun number") return self.encode_noun(data) elif ct == "DATA": if data is None: raise ValueError("DATA requires a value") return self.encode_data(data) elif ct == "PROCEED": return self.encode_proceed() else: raise ValueError(f"unknown command type: {command_type!r}") def encode_verb_noun(self, verb: int, noun: int) -> list[tuple[int, int]]: """Convenience: encode a full V-N-ENTER sequence. Args: verb: Verb number (0-99). noun: Noun number (0-99). Returns: Sequence: VERB + digits + NOUN + digits + ENTER. """ pairs = self.encode_verb(verb) pairs.extend(self.encode_noun(noun)) pairs.extend(self.encode_proceed()) return pairs # --------------------------------------------------------------------------- # GNU Radio wrapper # --------------------------------------------------------------------------- try: import pmt from gnuradio import gr class uplink_encoder(gr.basic_block): """GNU Radio block encoding DSKY commands for AGC uplink. Message ports: command (input) — PDU with metadata dict containing: "type": string ("VERB", "NOUN", "DATA", "PROCEED") "data": long (optional, depends on type) uplink_words (output) — sequence of PDUs, each containing a single (channel, value) pair for the AGC bridge """ def __init__(self, channel: int = AGC_CH_INLINK): gr.basic_block.__init__( self, name="apollo_uplink_encoder", in_sig=[], out_sig=[] ) self.message_port_register_in(pmt.intern("command")) self.message_port_register_out(pmt.intern("uplink_words")) self.set_msg_handler(pmt.intern("command"), self._handle_command) self._encoder = UplinkEncoder(channel=channel) def _handle_command(self, msg): """Parse a command PDU and emit encoded uplink words.""" if not pmt.is_pair(msg): return meta = pmt.car(msg) if not pmt.is_dict(meta): return cmd_type_pmt = pmt.dict_ref( meta, pmt.intern("type"), pmt.PMT_NIL ) if pmt.is_null(cmd_type_pmt): return cmd_type = pmt.symbol_to_string(cmd_type_pmt) data_pmt = pmt.dict_ref(meta, pmt.intern("data"), pmt.PMT_NIL) data = pmt.to_long(data_pmt) if not pmt.is_null(data_pmt) else None try: pairs = self._encoder.encode_command(cmd_type, data) except ValueError as exc: logger.warning("encode_command failed: %s", exc) return for channel, value in pairs: out_meta = pmt.make_dict() out_meta = pmt.dict_add( out_meta, pmt.intern("channel"), pmt.from_long(channel) ) out_meta = pmt.dict_add( out_meta, pmt.intern("value"), pmt.from_long(value) ) out_data = pmt.cons(pmt.from_long(channel), pmt.from_long(value)) self.message_port_pub( pmt.intern("uplink_words"), pmt.cons(out_meta, out_data) ) except ImportError: pass