"""Type codecs: IDS wire bytes ↔ Python values.

Decoders cover the Phase 2 MVP type set (SMALLINT, INT, BIGINT, SMFLOAT,
FLOAT, CHAR, VARCHAR, BOOL, DATE) plus SERIAL/BIGSERIAL, NCHAR/NVCHAR,
LVARCHAR, DECIMAL and MONEY. Encoders for parameter binding (Phase 4)
handle ``int``, ``float``, ``str``, ``bool`` and ``None``; a ``Decimal``
encoder exists in this module but ``encode_param`` refuses ``Decimal``
values for now.

Decoder dispatch: ``decode(type_code, raw_bytes) → python value`` looks up
the codec in ``DECODERS`` keyed by ``IfxType`` (after stripping high-bit
flags via ``_types.base_type``).

NULL handling: the fixed-width numeric, DATE, VARCHAR and DECIMAL decoders
map their per-type NULL sentinel to ``None``. The CHAR and BOOL decoders
perform no sentinel check, so NULL for those types presumably has to be
signaled by the row decoder (not visible in this module — confirm there).

For DATE we use the Informix epoch (1899-12-31). The raw bytes are a 4-byte
big-endian signed int representing day count.
"""

from __future__ import annotations

import datetime
import decimal
import struct
from collections.abc import Callable

from ._types import IfxType, base_type

# Informix DATE epoch — day 0 is December 31, 1899 (per Informix convention).
_INFORMIX_DATE_EPOCH = datetime.date(1899, 12, 31)

# Signature shared by every decoder registered in ``DECODERS``:
# raw column bytes in, decoded Python value out.
DecoderFn = Callable[[bytes], object]

# Informix uses sentinel values for NULL per type — see DECISION_LOG.md
# entry on null sentinel discovery (2026-05-04).
# The integer sentinels are written as unsigned bit patterns. The decoders
# below unpack as *signed* integers, so they compare against the negated
# constant (e.g. ``-_INT_NULL`` is the signed reading of 0x80000000).
_INT_NULL = 0x80000000  # INT_MIN
_SMALLINT_NULL = 0x8000  # SHORT_MIN
_BIGINT_NULL = 0x8000000000000000  # LONG_MIN
_REAL_NULL = b"\xff" * 4
_DOUBLE_NULL = b"\xff" * 8
_DATE_NULL = 0x80000000


def _decode_smallint(raw: bytes) -> int | None:
    """Decode a 2-byte big-endian signed SMALLINT; the minimum value means NULL."""
    (number,) = struct.unpack("!h", raw)
    if number == -_SMALLINT_NULL:
        return None
    return number


def _decode_int(raw: bytes) -> int | None:
    """Decode a 4-byte big-endian signed INT; the minimum value means NULL."""
    (number,) = struct.unpack("!i", raw)
    if number == -_INT_NULL:
        return None
    return number


def _decode_bigint(raw: bytes) -> int | None:
    """Decode an 8-byte big-endian signed BIGINT; the minimum value means NULL."""
    (number,) = struct.unpack("!q", raw)
    if number == -_BIGINT_NULL:
        return None
    return number


def _decode_smfloat(raw: bytes) -> float | None:
    """Decode a 4-byte big-endian IEEE float; four 0xFF bytes mean NULL."""
    return None if raw == _REAL_NULL else struct.unpack("!f", raw)[0]


def _decode_float(raw: bytes) -> float | None:
    """Decode an 8-byte big-endian IEEE double; eight 0xFF bytes mean NULL."""
    return None if raw == _DOUBLE_NULL else struct.unpack("!d", raw)[0]


def _decode_char(raw: bytes) -> str:
    """Decode CHAR content as ISO-8859-1 after dropping trailing padding.

    CHAR is space-padded to its declared length; trailing spaces and nul
    bytes are both removed before decoding.
    """
    unpadded = raw.rstrip(b" \x00")
    return unpadded.decode("iso-8859-1")


def _decode_varchar(raw: bytes) -> str | None:
    """Decode VARCHAR content (length prefix already removed by the caller).

    A payload that is exactly one nul byte (``\\x00``) is treated as the
    NULL marker and yields ``None``. Any other payload has trailing nul
    bytes removed and is decoded as ISO-8859-1.
    """
    if raw == b"\x00":
        return None
    trimmed = raw.rstrip(b"\x00")
    return trimmed.decode("iso-8859-1")


def _decode_bool(raw: bytes) -> bool:
    """Decode a BOOLEAN from its first byte.

    ``'t'``, ``'T'`` and the byte value 1 are true; every other value is
    false. Raises ``ValueError`` when the payload is empty.
    """
    if not raw:
        raise ValueError("empty BOOL payload")
    true_markers = {ord("t"), ord("T"), 1}
    return raw[0] in true_markers


def _decode_date(raw: bytes) -> datetime.date | None:
    """Decode a DATE: 4-byte big-endian signed day count from 1899-12-31.

    The bit pattern 0x80000000 (signed minimum) is the NULL marker.
    """
    (day_count,) = struct.unpack("!i", raw)
    if day_count == -_DATE_NULL:
        return None
    return _INFORMIX_DATE_EPOCH + datetime.timedelta(days=day_count)


def _undo_base100_complement(pairs: list[int]) -> list[int]:
    """Reverse the asymmetric base-100 complement used for negative values.

    Walking right to left: trailing zeros stay zero, the first non-zero
    pair is subtracted from 100, and every pair to its left is subtracted
    from 99 (per Decimal.decComplement, line 447). Returns a new list.
    """
    restored = list(pairs)
    minuend = 100
    for idx in reversed(range(len(restored))):
        pair = restored[idx]
        if pair == 0 and minuend == 100:
            continue
        restored[idx] = minuend - pair
        minuend = 99
    return restored


def _decode_decimal(raw: bytes) -> decimal.Decimal | None:
    """Decode IDS DECIMAL/MONEY: base-100 packed digits behind a header byte.

    Wire format (per ``com.informix.lang.Decimal.init``, line 374):

      byte[0]: sign and biased exponent
        - bit 7: 1 = positive, 0 = negative
        - bits 0-6: biased exponent; actual exponent = biased - 64, counted
          in BASE-100 digits before the decimal point. For negative values
          these bits are stored XOR'd with 0x7F (per IfxToJavaDecimal.init,
          line 386).
      byte[1..]: one base-100 digit (two decimal digits) per byte. Negative
        values store the digits in the asymmetric base-100 complement that
        ``_undo_base100_complement`` reverses.

    Returns ``None`` when the payload is shorter than two bytes or when the
    first two bytes are both zero (the NULL marker). Trailing zeros in the
    fractional part are dropped from the resulting ``Decimal``.
    """
    if len(raw) < 2:
        return None
    head = raw[0]
    if head == 0 and raw[1] == 0:
        return None

    positive = bool(head & 0x80)
    low_bits = head & 0x7F
    if not positive:
        low_bits ^= 0x7F
    # Number of base-10 digits that sit left of the decimal point: the
    # base-100 exponent counts digit pairs, hence the factor of two.
    point = (low_bits - 64) * 2

    pairs = list(raw[1:])
    if not positive:
        pairs = _undo_base100_complement(pairs)
    text = "".join(f"{pair:02d}" for pair in pairs)
    sign = "" if positive else "-"

    if point < 0:
        # Value is below 0.01: the fraction starts with -point zeros.
        fraction = ("0" * -point + text).rstrip("0")
        if not fraction:
            return decimal.Decimal(0)
        return decimal.Decimal(f"{sign}0.{fraction}")

    if point >= len(text):
        # Every stored digit is integral; right-pad with zeros up to the point.
        return decimal.Decimal(f"{sign}{text.ljust(point, '0')}")

    whole = text[:point] or "0"
    fraction = text[point:].rstrip("0")
    literal = f"{sign}{whole}.{fraction}" if fraction else f"{sign}{whole}"
    return decimal.Decimal(literal)


# Wire byte length per fixed-width type. Used by the row decoder to
# slice column values out of an SQ_TUPLE payload for fixed-width types.
# Variable-width types (CHAR, VARCHAR, DECIMAL, etc.) are length-prefixed
# on the wire and don't appear in this table.
FIXED_WIDTHS: dict[int, int] = {
    IfxType.SMALLINT: 2,
    IfxType.INT: 4,
    IfxType.SERIAL: 4,
    IfxType.SMFLOAT: 4,
    IfxType.FLOAT: 8,
    IfxType.BIGINT: 8,
    IfxType.BIGSERIAL: 8,
    IfxType.DATE: 4,
    IfxType.BOOL: 1,
}

# Decoder dispatch table. Beyond the Phase 2 MVP set it already registers
# DECIMAL, MONEY and LVARCHAR; Phase 6+ adds DATETIME, INTERVAL,
# BYTE/TEXT, BLOB/CLOB, ROW, COLLECTION.
DECODERS: dict[int, DecoderFn] = {
    IfxType.SMALLINT: _decode_smallint,
    IfxType.INT: _decode_int,
    IfxType.SERIAL: _decode_int,
    IfxType.BIGINT: _decode_bigint,
    IfxType.BIGSERIAL: _decode_bigint,
    IfxType.SMFLOAT: _decode_smfloat,
    IfxType.FLOAT: _decode_float,
    IfxType.CHAR: _decode_char,
    IfxType.VARCHAR: _decode_varchar,
    IfxType.NCHAR: _decode_char,
    IfxType.NVCHAR: _decode_varchar,
    IfxType.LVARCHAR: _decode_varchar,
    IfxType.BOOL: _decode_bool,
    IfxType.DATE: _decode_date,
    IfxType.DECIMAL: _decode_decimal,
    IfxType.MONEY: _decode_decimal,  # MONEY is DECIMAL with implied scale
}


def decode(type_code: int, raw: bytes) -> object:
    """Decode ``raw`` bytes for the given IDS type code into a Python value.

    The high-bit flags (NOTNULLABLE etc.) are stripped with ``base_type``
    before the decoder is looked up in ``DECODERS``.

    Args:
        type_code: IDS type code, possibly carrying high-bit flags.
        raw: The column's wire bytes, passed unchanged to the decoder.

    Returns:
        Whatever the registered decoder returns for ``raw``; several
        decoders return ``None`` for their NULL sentinel.

    Raises:
        NotImplementedError: No decoder is registered for the base type.
            (Not ``KeyError`` — the lookup uses ``dict.get``.)
    """
    base = base_type(type_code)
    decoder = DECODERS.get(base)
    if decoder is None:
        # The type list in this message is the original Phase 2 MVP set; it
        # is kept verbatim so callers matching on the text keep working.
        raise NotImplementedError(
            f"decoder for IDS type code {base} not yet implemented "
            f"(Phase 2 MVP supports: SMALLINT, INT, BIGINT, REAL, FLOAT, "
            f"CHAR, VARCHAR, BOOL, DATE)"
        )
    return decoder(raw)


# ---------------------------------------------------------------------------
# Encoders for parameter binding (Phase 4)
# ---------------------------------------------------------------------------
# Returns ``(type_code, prec_short, raw_bytes)`` per parameter.
# Per-param SQ_BIND format: ``[short type][short ind=0][short prec][data]``
# where data is ``writePadded(raw_bytes)`` (emit + pad-to-even).
#
# JDBC's IfxSqli.sendBind (line 844+) does precision encoding per type:
#   INT/SERIAL: prec = 0x0a00 (packed width=10, scale=0)
#   VARCHAR sent as CHAR (type=0): prec = 0
#   FLOAT (DOUBLE PRECISION): prec = 0
#
# Strings get type=0 (CHAR) on the wire — Informix's server casts them
# to the declared column type via the CIDESCRIBE/IDESCRIBE handshake.
# ``(ifx_type_code, precision_short, raw_bytes)`` for one bound parameter.
EncodedParam = tuple[int, int, bytes]


def _encode_int(value: int) -> EncodedParam:
    """Encode a Python int as Informix INTEGER (type=2, 4 bytes BE).

    Raises ``OverflowError`` (from ``int.to_bytes``) when ``value`` does not
    fit in a signed 32-bit integer.
    """
    return (2, 0x0A00, value.to_bytes(4, "big", signed=True))


def _encode_bigint(value: int) -> EncodedParam:
    """Encode a Python int as Informix BIGINT (type=52, 8 bytes BE).

    Raises ``OverflowError`` (from ``int.to_bytes``) when ``value`` does not
    fit in a signed 64-bit integer.
    """
    return (52, 0x1300, value.to_bytes(8, "big", signed=True))


def _encode_str(value: str) -> EncodedParam:
    """Encode a Python str as Informix CHAR (type=0, length-prefixed).

    JDBC sends Java strings as CHAR (type=0) on the wire — the server
    handles conversion to the actual column type (CHAR/VARCHAR/NVARCHAR).
    Format: ``[short length][bytes]`` (writePadded adds even-byte pad).

    The text is encoded as ISO-8859-1, so characters outside that charset
    raise ``UnicodeEncodeError``; an encoded length above 0xFFFF raises
    ``OverflowError`` because the length prefix is two bytes.
    """
    encoded = value.encode("iso-8859-1")
    raw = len(encoded).to_bytes(2, "big") + encoded
    return (0, 0, raw)


def _encode_float(value: float) -> EncodedParam:
    """Encode a Python float as Informix FLOAT (type=3, 8-byte IEEE 754)."""
    return (3, 0, struct.pack("!d", value))


def _encode_bool(value: bool) -> EncodedParam:
    """Encode a Python bool as Informix BOOLEAN (type=45, 1 byte: 1 or 0)."""
    return (45, 0, b"\x01" if value else b"\x00")


def _encode_decimal(value: decimal.Decimal) -> EncodedParam:
    """Encode a Python ``decimal.Decimal`` as IDS DECIMAL (type=5).

    Inverse of ``_decode_decimal``: produce a base-100 encoding made of a
    ``[sign+exponent]`` header byte followed by digit pairs. Mirrors
    ``Decimal.javaToIfx`` (line 457).

    Args:
        value: A finite ``Decimal``.

    Returns:
        ``(5, (precision << 8) | scale, raw_bytes)`` where precision is the
        number of digits needed to write the value in DECIMAL(p, s) form
        (integer digits + scale) and scale is the number of digits after
        the decimal point.

    Raises:
        ValueError: ``value`` is NaN/Infinity, or its base-100 exponent does
            not fit in the 7-bit biased exponent field.
    """
    if not value.is_finite():
        # as_tuple() reports a string exponent for NaN/Infinity, which would
        # otherwise surface as an unrelated TypeError further down.
        raise ValueError(
            f"cannot encode non-finite Decimal {value!r} as IDS DECIMAL"
        )

    sign, digits, exp = value.as_tuple()

    # Precision/scale are derived from the value *before* any padding:
    # padding digits exist only to align the base-100 packing and must not
    # inflate the precision, and precision can never be smaller than scale.
    scale = max(0, -exp)
    integer_digits = max(len(digits) + exp, 0)
    precision = max(integer_digits + scale, 1)

    n_digits = len(digits)
    # ``exp`` from as_tuple is the position of the least significant digit;
    # this is the number of base-10 digits BEFORE the decimal point.
    base10_exp = n_digits + exp

    if base10_exp % 2 != 0:
        # Prepend a zero so the decimal point falls on a base-100 boundary.
        digits = (0, *digits)
        base10_exp += 1
        n_digits += 1
    if n_digits % 2 != 0:
        # Append a zero so the digits can be grouped into pairs.
        digits = (*digits, 0)
        n_digits += 1

    base100_exp = base10_exp // 2  # exponent in base-100 digits
    biased_exp = base100_exp + 64
    if not 0 <= biased_exp <= 0x7F:
        # Masking with 0x7F would silently wrap the exponent.
        raise ValueError(
            f"Decimal exponent out of range for IDS DECIMAL: {value!r}"
        )

    # Pack each pair of decimal digits into one base-100 byte.
    digit_pairs = bytes(
        tens * 10 + ones for tens, ones in zip(digits[::2], digits[1::2])
    )

    if sign == 0:
        head_byte = biased_exp | 0x80
        out_digits = digit_pairs
    else:
        # Asymmetric base-100 complement (mirror of the decoder): from the
        # right, trailing zeros stay zero, the first non-zero pair becomes
        # 100 - d and every pair to its left becomes 99 - d.
        complemented = bytearray(digit_pairs)
        sub_from = 100
        for i in reversed(range(len(complemented))):
            if complemented[i] == 0 and sub_from == 100:
                continue
            complemented[i] = sub_from - complemented[i]
            sub_from = 99
        # Negative: head byte is biased_exp ^ 0x7F (high bit stays 0).
        head_byte = biased_exp ^ 0x7F
        out_digits = bytes(complemented)

    raw = bytes([head_byte]) + out_digits
    # Precision short for DECIMAL: packed (precision << 8) | scale.
    prec_short = (precision << 8) | (scale & 0xFF)
    return (5, prec_short, raw)


def encode_param(value: object) -> EncodedParam:
    """Pick an encoder based on the Python value's type.

    Returns ``(ifx_type, precision_short, raw_bytes)`` for the parameter.
    Returns ``(0, 0, b"")`` and the caller must use indicator=-1 for None.

    Raises:
        NotImplementedError: ``value`` is a ``Decimal`` (binding disabled)
            or any type other than int, float, str, bool or None.
        OverflowError: ``value`` is an int outside the signed 64-bit range.
    """
    if value is None:
        return (0, 0, b"")
    if isinstance(value, bool):
        # NB: must come before int (bool is int subclass)
        return _encode_bool(value)
    if isinstance(value, int):
        # Pick INT vs BIGINT based on range.
        if -0x80000000 <= value <= 0x7FFFFFFF:
            return _encode_int(value)
        return _encode_bigint(value)
    if isinstance(value, float):
        return _encode_float(value)
    if isinstance(value, str):
        return _encode_str(value)
    if isinstance(value, decimal.Decimal):
        # _encode_decimal exists, but the server rejected the bytes an
        # earlier revision produced (precision packing was the suspect).
        # Binding stays disabled until the encoding is verified against a
        # live server. Workaround: cast Decimal to float at the call site
        # if you need to bind.
        raise NotImplementedError(
            "Decimal parameter binding is Phase 6.x; convert to float "
            "or pass DECIMAL via SQL literal for now"
        )
    raise NotImplementedError(
        f"parameter binding for {type(value).__name__} not yet supported "
        f"(Phase 4 MVP: int, float, str, bool, None)"
    )


# Phase 6+ adds: bytes/Bytes, datetime.date, datetime.datetime, Decimal,
# datetime.timedelta (INTERVAL), bytearray (BYTE), large strings (LVARCHAR).
# Placeholder registry — nothing in this module populates or reads it yet.
ENCODERS: dict[int, Callable[[object], bytes]] = {}