informix-db/tests/test_interval_unit.py
Ryan Malloy 4dafbf8ce9 Phase 6.d: INTERVAL decoding (both qualifier families)
Implements row-decoding for IDS INTERVAL, the last common temporal type.
The qualifier short bisects the type at the wire level: start_TU >= DAY
maps to datetime.timedelta (day-fraction), start_TU <= MONTH maps to a
new informix_db.IntervalYM (year-month).

Wire format mirrors DATETIME exactly — `[head byte][digit pairs in
base-100]`, with the qualifier dictating field interpretation. The
fraction-to-nanoseconds scaling (`scale_exp = 18 - end_TU`, forced odd)
is the JDBC pattern from `Decimal.fromIfxToArray`.

IntervalYM is a frozen dataclass holding signed total months, with
`years` and `remainder_months` as derived properties. Matches JDBC's
`IntervalYM.months` shape rather than a (years, months) tuple — avoids
ambiguity around what "negative" means for a multi-field tuple.

Tests: 13 unit (synthetic byte streams covering all decoder branches)
+ 9 integration (real Informix queries spanning DAY TO SECOND, HOUR TO
SECOND, YEAR TO MONTH, negatives, NULL, and mixed-family rows).

Total test count: 53 unit + 82 integration = 135.

Encoder for INTERVAL parameter binding is deferred to a later phase
(same arc as DECIMAL/DATETIME — decode lands first).
2026-05-04 12:22:07 -06:00

198 lines
6.9 KiB
Python

"""Phase 6.d unit tests — INTERVAL decoder against synthetic byte streams.
These don't require the Informix container; they exercise the codec layer
directly with hand-constructed wire bytes per the JDBC reference encoding.
"""
from __future__ import annotations
import datetime
from informix_db import IntervalYM
from informix_db.converters import _decode_interval
def _qual(total_len: int, start_tu: int, end_tu: int) -> int:
"""Pack qualifier short: ``(total_len << 8) | (start_tu << 4) | end_tu``."""
return (total_len << 8) | (start_tu << 4) | end_tu
def _head(positive: bool, dec_exp: int) -> int:
"""Encode the head byte: ``(sign << 7) | (biased_exp & 0x7F)``."""
biased = (dec_exp + 64) & 0x7F
return (biased | 0x80) if positive else (biased ^ 0x7F)
def _encode_digits(positive: bool, digits: list[int]) -> bytes:
"""Apply the asymmetric base-100 complement for negative values."""
if positive:
return bytes(digits)
out = list(digits)
sub_from = 100
for i in range(len(out) - 1, -1, -1):
if out[i] == 0 and sub_from == 100:
continue
out[i] = sub_from - out[i]
sub_from = 99
return bytes(out)
# -------- Day-fraction (IntervalDF) ----------
def test_day_to_second_simple() -> None:
"""INTERVAL DAY(2) TO SECOND with value 3 days, 4:05:06."""
qual = _qual(2 + (10 - 4), 4, 10) # total_len=8, start=DAY(4), end=SECOND(10)
# Fields: DAY=03, HOUR=04, MIN=05, SEC=06 → 4 base-100 digit-pair bytes
# dec_exp = bexpon = (8 + 10 - 10 + 1)/2 = 9/2 = 4
head = _head(True, 4)
body = _encode_digits(True, [3, 4, 5, 6])
raw = bytes([head]) + body
result = _decode_interval(raw, qual)
assert result == datetime.timedelta(days=3, hours=4, minutes=5, seconds=6)
def test_hour_to_second_basic() -> None:
"""INTERVAL HOUR(2) TO SECOND with value 12:34:56."""
qual = _qual(2 + (10 - 6), 6, 10) # total_len=6, start=HOUR(6), end=SECOND(10)
# dec_exp = bexpon = (6 + 10 - 10 + 1)//2 = 3
head = _head(True, 3)
body = _encode_digits(True, [12, 34, 56])
raw = bytes([head]) + body
result = _decode_interval(raw, qual)
assert result == datetime.timedelta(hours=12, minutes=34, seconds=56)
def test_negative_day_to_second() -> None:
"""Negative INTERVAL DAY TO SECOND — exercises 9's-complement decode."""
qual = _qual(2 + (10 - 4), 4, 10)
head = _head(False, 4)
body = _encode_digits(False, [3, 4, 5, 6])
raw = bytes([head]) + body
result = _decode_interval(raw, qual)
assert result == datetime.timedelta(
days=-3, hours=-4, minutes=-5, seconds=-6
)
def test_day_with_long_first_field() -> None:
"""INTERVAL DAY(5) TO SECOND — odd-length first field (5 digits = 3 bytes)."""
qual = _qual(5 + (10 - 4), 4, 10) # total_len=11
# Fields: DAY=12345 → as 3 bytes: [01, 23, 45], then HOUR=00, MIN=00, SEC=00
# dec_exp = bexpon = (11 + 10 - 10 + 1)/2 = 6
head = _head(True, 6)
body = _encode_digits(True, [1, 23, 45, 0, 0, 0])
raw = bytes([head]) + body
result = _decode_interval(raw, qual)
assert result == datetime.timedelta(days=12345)
def test_minute_to_second() -> None:
"""INTERVAL MINUTE(2) TO SECOND."""
qual = _qual(2 + (10 - 8), 8, 10) # total_len=4
# Fields: MIN=23, SEC=45 → 2 bytes
# dec_exp = bexpon = (4 + 10 - 10 + 1)/2 = 5/2 = 2
head = _head(True, 2)
body = _encode_digits(True, [23, 45])
raw = bytes([head]) + body
result = _decode_interval(raw, qual)
assert result == datetime.timedelta(minutes=23, seconds=45)
def test_day_to_fraction_3() -> None:
"""INTERVAL DAY(2) TO FRACTION(3): 1 day, 0:00:00.500"""
qual = _qual(2 + (13 - 4), 4, 13) # total_len=11, FRAC3=13
# Fields: DAY=01, HOUR=00, MIN=00, SEC=00, FRAC bytes = [50, 00] (.500 → 500 /
# 100 = 5 then 0; wait — 0.500 nanos=500000000, dec_exp scaling per encoder:
# scale_exp = 18 - 13 = 5 (odd, leave). nans/(10^6) = 500. Pack into 2 bytes
# via (3 - tmpInt) loop: tmpInt=3 → mod=10, b=500/10=50, nans%=10=0; tmpInt=1
# special: nans*=10=0, b=0/1=0. So [50, 0].
# dec_exp = bexpon = (11 + 10 - 13 + 1)/2 = 9/2 = 4
head = _head(True, 4)
body = _encode_digits(True, [1, 0, 0, 0, 50, 0])
raw = bytes([head]) + body
result = _decode_interval(raw, qual)
assert result == datetime.timedelta(days=1, microseconds=500_000)
def test_null_interval() -> None:
"""NULL INTERVAL: head byte 0x00, first digit byte 0x00 → None."""
qual = _qual(8, 4, 10)
raw = bytes([0, 0, 0, 0, 0])
assert _decode_interval(raw, qual) is None
# -------- Year-month (IntervalYM) ----------
def test_year_to_month_basic() -> None:
"""INTERVAL YEAR(4) TO MONTH: 5 years, 3 months."""
qual = _qual(4 + (2 - 0), 0, 2) # total_len=6, start=YEAR(0), end=MONTH(2)
# Encoder: years=5, months=3. lfprec=4 (years gets 4 digits). Packed even:
# b[0]=0 (years/100), b[1]=5 (years%100), b[2]=3 (months).
# dec_exp = bexpon = (6 + 10 - 2 + 1)/2 = 15/2 = 7
head = _head(True, 7)
body = _encode_digits(True, [0, 5, 3])
raw = bytes([head]) + body
result = _decode_interval(raw, qual)
assert result == IntervalYM(months=5 * 12 + 3)
def test_year_to_month_negative() -> None:
qual = _qual(6, 0, 2)
head = _head(False, 7)
body = _encode_digits(False, [0, 5, 3])
raw = bytes([head]) + body
result = _decode_interval(raw, qual)
assert result == IntervalYM(months=-(5 * 12 + 3))
def test_year_only() -> None:
"""INTERVAL YEAR(4)."""
qual = _qual(4, 0, 0) # start=YEAR, end=YEAR
# Fields: YEAR=2026 → 2 bytes [20, 26]. lfprec=4, even.
# dec_exp = bexpon = (4 + 10 - 0 + 1)/2 = 15/2 = 7
head = _head(True, 7)
body = _encode_digits(True, [20, 26])
raw = bytes([head]) + body
result = _decode_interval(raw, qual)
assert result == IntervalYM(months=2026 * 12)
def test_month_only() -> None:
"""INTERVAL MONTH(3) — 365 months."""
qual = _qual(3, 2, 2) # start=MONTH, end=MONTH; total_len=3
# Fields: MONTH=365 → odd lfprec=3: b[0]=3 (high), b[1]=65. So 2 bytes.
# dec_exp = bexpon = (3 + 10 - 2 + 1)/2 = 12/2 = 6
head = _head(True, 6)
body = _encode_digits(True, [3, 65])
raw = bytes([head]) + body
result = _decode_interval(raw, qual)
assert result == IntervalYM(months=365)
# -------- IntervalYM convenience properties ----------
def test_intervalym_str_and_decomposition() -> None:
iv = IntervalYM(months=5 * 12 + 3)
assert iv.years == 5
assert iv.remainder_months == 3
assert str(iv) == "5-03"
neg = IntervalYM(months=-(5 * 12 + 3))
assert neg.years == -5
assert neg.remainder_months == -3
assert str(neg) == "-5-03"
def test_intervalym_equality_and_hashing() -> None:
a = IntervalYM(months=15)
b = IntervalYM(months=15)
c = IntervalYM(months=14)
assert a == b
assert a != c
# frozen + slots → hashable
assert hash(a) == hash(b)
assert {a, b, c} == {a, c}