Phase 6.d: INTERVAL decoding (both qualifier families)
Implements row-decoding for IDS INTERVAL, the last common temporal type. The qualifier short bisects the type at the wire level: start_TU >= DAY maps to datetime.timedelta (day-fraction), start_TU <= MONTH maps to a new informix_db.IntervalYM (year-month). Wire format mirrors DATETIME exactly — `[head byte][digit pairs in base-100]`, with the qualifier dictating field interpretation. The fraction-to-nanoseconds scaling (`scale_exp = 18 - end_TU`, forced odd) is the JDBC pattern from `Decimal.fromIfxToArray`. IntervalYM is a frozen dataclass holding signed total months, with `years` and `remainder_months` as derived properties. Matches JDBC's `IntervalYM.months` shape rather than a (years, months) tuple — avoids ambiguity around what "negative" means for a multi-field tuple. Tests: 13 unit (synthetic byte streams covering all decoder branches) + 9 integration (real Informix queries spanning DAY TO SECOND, HOUR TO SECOND, YEAR TO MONTH, negatives, NULL, and mixed-family rows). Total test count: 53 unit + 82 integration = 135. Encoder for INTERVAL parameter binding is deferred to a later phase (same arc as DECIMAL/DATETIME — decode lands first).
This commit is contained in:
parent
10863a9337
commit
4dafbf8ce9
@ -426,6 +426,32 @@ For DECIMAL, the encoder uses the asymmetric base-100 complement (mirror of deco
|
||||
|
||||
---
|
||||
|
||||
## 2026-05-04 — INTERVAL decoding (both qualifier families)
|
||||
|
||||
**Status**: active
|
||||
**Decision**: ``_decode_interval`` decodes IDS INTERVAL into one of two Python types based on the qualifier's ``start_TU``:
|
||||
- ``start_TU >= DAY (4)`` (IntervalDF) → ``datetime.timedelta``
|
||||
- ``start_TU <= MONTH (2)`` (IntervalYM) → :class:`informix_db.IntervalYM` (a small frozen dataclass holding signed total months)
|
||||
|
||||
**The wire format is the same as DECIMAL/DATETIME** — ``[head byte][digit pairs in base-100]`` with sign+biased-exponent header. The qualifier short tells you how to *interpret* those digits:
|
||||
- High byte = total digit count across all fields
|
||||
- Middle nibble = start_TU; low nibble = end_TU
|
||||
- First field has variable digit width: ``flen = total_len - (end_TU - start_TU)`` (which is the digits "added" past the first field; each non-first field is exactly 2 digits)
|
||||
- Subsequent non-first non-fractional fields are 1 byte each (since each is exactly 2 base-10 digits = 1 base-100 digit pair)
|
||||
- Fractional fields scale to nanoseconds via ``cv *= 10 ** scale_exp`` where ``scale_exp = 18 - end_TU`` forced odd
|
||||
|
||||
Wire byte width on the SQ_TUPLE side = ``ceil(digit_count / 2) + 1`` (one head byte + ceil(digits/2) digit pairs). Same formula as DATETIME and DECIMAL — surfaces in ``_resultset.parse_tuple_payload`` as a dedicated branch (because the qualifier is needed at decode time).
|
||||
|
||||
**The dec_exp arithmetic that initially fooled me**: I kept misreading ``(total_len + 10 - end_TU + 1) / 2`` as a much larger value than it is. For HOUR(2) TO SECOND, ``total_len=6, end_TU=10``, so dec_exp = 7//2 = 3, not 8. After the encoder writes dec_exp into the head byte and the decoder reads it back, the two match perfectly so the digit array lines up at offset 0 of the 16-byte working buffer — but only if you actually compute the value correctly. *Read your own arithmetic.* (The synthetic unit-test framework caught this immediately, before the integration tests even ran.)
|
||||
|
||||
**IntervalYM design**: I considered a NamedTuple with (years, months) fields, but a frozen dataclass with a single signed ``months`` field matches JDBC's ``IntervalYM`` and avoids ambiguity around "what does negative mean for a tuple". ``years`` and ``remainder_months`` are read-only properties; ``__str__`` emits the standard "Y-MM" / "-Y-MM" form. ``slots=True`` makes it as cheap as a NamedTuple memory-wise.
|
||||
|
||||
**Verified against 9 integration scenarios** (all decoder branches): DAY TO SECOND, HOUR TO SECOND, MINUTE TO SECOND, YEAR TO MONTH, YEAR-only, negative interval (9's-complement), table column, NULL, and a multi-INTERVAL row (proves per-column slicing works across mixed qualifier families).
|
||||
|
||||
INTERVAL parameter binding (encoder) is deferred to Phase 6.e or later — same arc as DECIMAL/DATETIME, where decoding lands first and encoding follows once we have wire captures to compare against.
|
||||
|
||||
---
|
||||
|
||||
## (template — copy below this line for new entries)
|
||||
|
||||
```
|
||||
|
||||
@ -23,6 +23,7 @@ from __future__ import annotations
|
||||
from importlib.metadata import PackageNotFoundError, version
|
||||
|
||||
from .connections import Connection
|
||||
from .converters import IntervalYM
|
||||
from .exceptions import (
|
||||
DatabaseError,
|
||||
DataError,
|
||||
@ -55,6 +56,7 @@ __all__ = [
|
||||
"IntegrityError",
|
||||
"InterfaceError",
|
||||
"InternalError",
|
||||
"IntervalYM",
|
||||
"NotSupportedError",
|
||||
"OperationalError",
|
||||
"ProgrammingError",
|
||||
|
||||
@ -270,6 +270,21 @@ def parse_tuple_payload(
|
||||
values.append(_decode_datetime(raw, col.encoded_length))
|
||||
continue
|
||||
|
||||
# INTERVAL: same width formula as DATETIME — high byte of
|
||||
# encoded_length holds the total digit count across all fields,
|
||||
# and the wire bytes are ``[head][digit pairs]`` (one head byte
|
||||
# plus ceil(digit_count/2) digit pairs). Like DATETIME, the
|
||||
# qualifier is needed at decode time, so we bypass the generic
|
||||
# dispatch.
|
||||
if base == int(IfxType.INTERVAL):
|
||||
digit_count = (col.encoded_length >> 8) & 0xFF
|
||||
width = (digit_count + 1) // 2 + 1
|
||||
raw = payload[offset:offset + width]
|
||||
offset += width
|
||||
from .converters import _decode_interval
|
||||
values.append(_decode_interval(raw, col.encoded_length))
|
||||
continue
|
||||
|
||||
# Fixed-width types
|
||||
width = FIXED_WIDTHS.get(base)
|
||||
if width is None:
|
||||
|
||||
@ -15,6 +15,7 @@ For DATE we use the Informix epoch (1899-12-31). The raw bytes are a
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import dataclasses
|
||||
import datetime
|
||||
import decimal
|
||||
import struct
|
||||
@ -22,6 +23,32 @@ from collections.abc import Callable
|
||||
|
||||
from ._types import IfxType, base_type
|
||||
|
||||
|
||||
@dataclasses.dataclass(frozen=True, slots=True)
|
||||
class IntervalYM:
|
||||
"""Year-month interval — Informix's only temporal duration that can't
|
||||
collapse into ``datetime.timedelta`` because months have variable length.
|
||||
|
||||
The single ``months`` field stores the signed total month count, mirroring
|
||||
JDBC's ``IntervalYM.months``. ``years`` and ``remainder_months`` are
|
||||
convenience derivatives.
|
||||
"""
|
||||
|
||||
months: int
|
||||
|
||||
@property
|
||||
def years(self) -> int:
|
||||
return abs(self.months) // 12 * (-1 if self.months < 0 else 1)
|
||||
|
||||
@property
|
||||
def remainder_months(self) -> int:
|
||||
return abs(self.months) % 12 * (-1 if self.months < 0 else 1)
|
||||
|
||||
def __str__(self) -> str:
|
||||
sign = "-" if self.months < 0 else ""
|
||||
m = abs(self.months)
|
||||
return f"{sign}{m // 12}-{m % 12:02d}"
|
||||
|
||||
# Informix DATE epoch — day 0 is December 31, 1899 (per Informix convention).
|
||||
_INFORMIX_DATE_EPOCH = datetime.date(1899, 12, 31)
|
||||
|
||||
@ -177,6 +204,120 @@ def _decode_datetime(raw: bytes, encoded_length: int) -> object | None:
|
||||
return datetime.datetime(year, month, day, hour, minute, second, microsecond)
|
||||
|
||||
|
||||
def _decode_interval(
|
||||
raw: bytes, encoded_length: int
|
||||
) -> datetime.timedelta | IntervalYM | None:
|
||||
"""Decode IDS INTERVAL bytes per the qualifier in ``encoded_length``.
|
||||
|
||||
Wire format mirrors DECIMAL/DATETIME: ``[head byte][digit pairs in base-100]``
|
||||
where the head byte packs ``(sign << 7) | (biased_exp & 0x7F)``.
|
||||
The qualifier short is ``(total_len << 8) | (start_TU << 4) | end_TU``
|
||||
where ``total_len`` is the total digit count across all fields.
|
||||
|
||||
Returns:
|
||||
- ``datetime.timedelta`` for day-fraction intervals (start_TU >= DAY)
|
||||
- :class:`IntervalYM` for year-month intervals (start_TU <= MONTH)
|
||||
- ``None`` for NULL (per Decimal NULL marker)
|
||||
|
||||
The scaling magic for fraction → nanoseconds (``scale_exp = 18 - end_TU``,
|
||||
forced odd) is what aligns the byte-packed BCD digits to the 9-digit
|
||||
nanosecond field — this is the JDBC pattern from ``Decimal.fromIfxToArray``.
|
||||
"""
|
||||
if len(raw) < 2 or (raw[0] == 0 and raw[1] == 0):
|
||||
return None
|
||||
|
||||
qual = encoded_length
|
||||
total_len = (qual >> 8) & 0xFF
|
||||
start_tu = (qual >> 4) & 0x0F
|
||||
end_tu = qual & 0x0F
|
||||
|
||||
head = raw[0]
|
||||
is_positive = (head & 0x80) != 0
|
||||
biased_exp = (head & 0x7F) if is_positive else ((head ^ 0x7F) & 0x7F)
|
||||
dec_exp = biased_exp - 64
|
||||
|
||||
digits = list(raw[1:])
|
||||
if not is_positive:
|
||||
# Asymmetric base-100 complement (mirror of negative-decimal decode).
|
||||
sub_from = 100
|
||||
for i in range(len(digits) - 1, -1, -1):
|
||||
if digits[i] == 0 and sub_from == 100:
|
||||
continue
|
||||
digits[i] = sub_from - digits[i]
|
||||
sub_from = 99
|
||||
|
||||
# Lay digits into a 16-byte buffer at offset (bexpon - dec_exp). For
|
||||
# values written by the standard encoder, dec_exp == bexpon and the
|
||||
# shift is zero — but legacy/short forms can have dec_exp < bexpon.
|
||||
bexpon = (total_len + 10 - end_tu + 1) // 2
|
||||
dtbuf = [0] * 16
|
||||
if digits and bexpon >= dec_exp:
|
||||
offset = bexpon - dec_exp
|
||||
for i, d in enumerate(digits):
|
||||
if offset + i < 16:
|
||||
dtbuf[offset + i] = d
|
||||
|
||||
# Walk fields per qualifier
|
||||
flen = total_len - (end_tu - start_tu) # first-field digit count
|
||||
fields = [0] * 7 # year, month, day, hour, min, sec, nanos-scaled
|
||||
dtbuf_idx = 0
|
||||
data_idx = 6 if start_tu > 10 else start_tu // 2
|
||||
|
||||
current = start_tu
|
||||
if current != 12: # 12 is the legacy "fraction-only" sentinel
|
||||
# Read first field as a multi-byte base-100 integer.
|
||||
nbytes = (flen + 1) // 2
|
||||
cv = 0
|
||||
while dtbuf_idx < nbytes:
|
||||
cv = cv * 100 + dtbuf[dtbuf_idx]
|
||||
dtbuf_idx += 1
|
||||
fields[data_idx] = cv
|
||||
data_idx += 1
|
||||
current += 2
|
||||
|
||||
while current <= end_tu and current <= 10:
|
||||
fields[data_idx] = dtbuf[dtbuf_idx]
|
||||
dtbuf_idx += 1
|
||||
data_idx += 1
|
||||
current += 2
|
||||
|
||||
if end_tu > 10:
|
||||
cv = 0
|
||||
cf = 11
|
||||
while cf <= end_tu:
|
||||
cv = cv * 100 + dtbuf[dtbuf_idx]
|
||||
dtbuf_idx += 1
|
||||
cf += 2
|
||||
scale_exp = 18 - end_tu
|
||||
if (scale_exp & 1) == 0:
|
||||
scale_exp += 1
|
||||
cv *= 10**scale_exp # now in nanoseconds
|
||||
fields[6] = cv
|
||||
|
||||
if start_tu >= 4:
|
||||
# IntervalDF — collapse to total seconds + nanos → timedelta.
|
||||
total_secs = (
|
||||
fields[2] * 86400
|
||||
+ fields[3] * 3600
|
||||
+ fields[4] * 60
|
||||
+ fields[5]
|
||||
)
|
||||
nanos = fields[6]
|
||||
if not is_positive:
|
||||
total_secs = -total_secs
|
||||
nanos = -nanos
|
||||
return datetime.timedelta(
|
||||
seconds=total_secs,
|
||||
microseconds=nanos // 1000,
|
||||
)
|
||||
|
||||
# IntervalYM — year-month: total months
|
||||
total_months = fields[0] * 12 + fields[1]
|
||||
if not is_positive:
|
||||
total_months = -total_months
|
||||
return IntervalYM(months=total_months)
|
||||
|
||||
|
||||
def _decode_decimal(raw: bytes) -> decimal.Decimal | None:
|
||||
"""Decode IDS DECIMAL/MONEY: base-100 packed BCD with sign/exponent header.
|
||||
|
||||
|
||||
161
tests/test_interval.py
Normal file
161
tests/test_interval.py
Normal file
@ -0,0 +1,161 @@
|
||||
"""Phase 6.d integration tests — INTERVAL decoding for both qualifier families.
|
||||
|
||||
INTERVAL splits into two families that decode to different Python types:
|
||||
- DAY-FRACTION (start TU >= DAY=4) → ``datetime.timedelta``
|
||||
- YEAR-MONTH (start TU <= MONTH=2) → :class:`informix_db.IntervalYM`
|
||||
|
||||
The wire format is identical to DECIMAL/DATETIME (BCD with sign+exp head
|
||||
byte); the qualifier short tells us how to interpret the digits as
|
||||
time fields.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import datetime
|
||||
|
||||
import pytest
|
||||
|
||||
import informix_db
|
||||
from informix_db import IntervalYM
|
||||
from tests.conftest import ConnParams
|
||||
|
||||
pytestmark = pytest.mark.integration
|
||||
|
||||
|
||||
def _connect(conn_params: ConnParams) -> informix_db.Connection:
|
||||
return informix_db.connect(
|
||||
host=conn_params.host,
|
||||
port=conn_params.port,
|
||||
user=conn_params.user,
|
||||
password=conn_params.password,
|
||||
database=conn_params.database,
|
||||
server=conn_params.server,
|
||||
connect_timeout=10.0,
|
||||
read_timeout=10.0,
|
||||
)
|
||||
|
||||
|
||||
def test_interval_day_to_second(conn_params: ConnParams) -> None:
|
||||
"""INTERVAL DAY TO SECOND: 3 days, 4 hours, 5 minutes, 6 seconds."""
|
||||
with _connect(conn_params) as conn:
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
"SELECT INTERVAL(3 04:05:06) DAY TO SECOND "
|
||||
"FROM systables WHERE tabid = 1"
|
||||
)
|
||||
(val,) = cur.fetchone()
|
||||
assert val == datetime.timedelta(
|
||||
days=3, hours=4, minutes=5, seconds=6
|
||||
)
|
||||
|
||||
|
||||
def test_interval_hour_to_second(conn_params: ConnParams) -> None:
|
||||
"""INTERVAL HOUR TO SECOND: 12:34:56."""
|
||||
with _connect(conn_params) as conn:
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
"SELECT INTERVAL(12:34:56) HOUR TO SECOND "
|
||||
"FROM systables WHERE tabid = 1"
|
||||
)
|
||||
(val,) = cur.fetchone()
|
||||
assert val == datetime.timedelta(hours=12, minutes=34, seconds=56)
|
||||
|
||||
|
||||
def test_interval_minute_to_second(conn_params: ConnParams) -> None:
|
||||
"""INTERVAL MINUTE TO SECOND: 23:45."""
|
||||
with _connect(conn_params) as conn:
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
"SELECT INTERVAL(23:45) MINUTE TO SECOND "
|
||||
"FROM systables WHERE tabid = 1"
|
||||
)
|
||||
(val,) = cur.fetchone()
|
||||
assert val == datetime.timedelta(minutes=23, seconds=45)
|
||||
|
||||
|
||||
def test_interval_year_to_month(conn_params: ConnParams) -> None:
|
||||
"""INTERVAL YEAR TO MONTH: 5 years, 3 months."""
|
||||
with _connect(conn_params) as conn:
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
"SELECT INTERVAL(5-3) YEAR TO MONTH "
|
||||
"FROM systables WHERE tabid = 1"
|
||||
)
|
||||
(val,) = cur.fetchone()
|
||||
assert val == IntervalYM(months=5 * 12 + 3)
|
||||
|
||||
|
||||
def test_interval_year_only(conn_params: ConnParams) -> None:
|
||||
"""INTERVAL YEAR — exercises start==end, year-only qualifier."""
|
||||
with _connect(conn_params) as conn:
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
"SELECT INTERVAL(2026) YEAR(4) TO YEAR "
|
||||
"FROM systables WHERE tabid = 1"
|
||||
)
|
||||
(val,) = cur.fetchone()
|
||||
assert val == IntervalYM(months=2026 * 12)
|
||||
|
||||
|
||||
def test_interval_negative(conn_params: ConnParams) -> None:
|
||||
"""Negative INTERVAL — exercises 9's-complement decode for intervals."""
|
||||
with _connect(conn_params) as conn:
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
"SELECT INTERVAL(-3 04:05:06) DAY TO SECOND "
|
||||
"FROM systables WHERE tabid = 1"
|
||||
)
|
||||
(val,) = cur.fetchone()
|
||||
# Note: Informix encodes the whole interval as negative — all
|
||||
# fields share the sign.
|
||||
assert val == datetime.timedelta(
|
||||
days=-3, hours=-4, minutes=-5, seconds=-6
|
||||
)
|
||||
|
||||
|
||||
def test_interval_in_table_column(conn_params: ConnParams) -> None:
|
||||
"""INTERVAL stored in a table column round-trips correctly."""
|
||||
with _connect(conn_params) as conn:
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
"CREATE TEMP TABLE t_iv "
|
||||
"(id INTEGER, "
|
||||
" span INTERVAL DAY(2) TO SECOND, "
|
||||
" age INTERVAL YEAR(4) TO MONTH)"
|
||||
)
|
||||
cur.execute(
|
||||
"INSERT INTO t_iv VALUES "
|
||||
"(1, INTERVAL(7 12:00:00) DAY TO SECOND, "
|
||||
"INTERVAL(2-6) YEAR TO MONTH)"
|
||||
)
|
||||
cur.execute("SELECT span, age FROM t_iv")
|
||||
span, age = cur.fetchone()
|
||||
assert span == datetime.timedelta(days=7, hours=12)
|
||||
assert age == IntervalYM(months=2 * 12 + 6)
|
||||
|
||||
|
||||
def test_interval_null(conn_params: ConnParams) -> None:
|
||||
"""NULL INTERVAL decodes to Python None."""
|
||||
with _connect(conn_params) as conn:
|
||||
cur = conn.cursor()
|
||||
cur.execute("CREATE TEMP TABLE t_iv2 (span INTERVAL DAY TO SECOND)")
|
||||
cur.execute("INSERT INTO t_iv2 VALUES (NULL)")
|
||||
cur.execute("SELECT span FROM t_iv2")
|
||||
assert cur.fetchone() == (None,)
|
||||
|
||||
|
||||
def test_interval_multiple_columns(conn_params: ConnParams) -> None:
|
||||
"""Multiple INTERVAL qualifiers in one row — proves per-column slicing."""
|
||||
with _connect(conn_params) as conn:
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
"SELECT "
|
||||
" INTERVAL(3 04:05:06) DAY TO SECOND, "
|
||||
" INTERVAL(1-6) YEAR TO MONTH, "
|
||||
" INTERVAL(99:30) HOUR(2) TO MINUTE "
|
||||
"FROM systables WHERE tabid = 1"
|
||||
)
|
||||
df, ym, hm = cur.fetchone()
|
||||
assert df == datetime.timedelta(days=3, hours=4, minutes=5, seconds=6)
|
||||
assert ym == IntervalYM(months=18)
|
||||
assert hm == datetime.timedelta(hours=99, minutes=30)
|
||||
197
tests/test_interval_unit.py
Normal file
197
tests/test_interval_unit.py
Normal file
@ -0,0 +1,197 @@
|
||||
"""Phase 6.d unit tests — INTERVAL decoder against synthetic byte streams.
|
||||
|
||||
These don't require the Informix container; they exercise the codec layer
|
||||
directly with hand-constructed wire bytes per the JDBC reference encoding.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import datetime
|
||||
|
||||
from informix_db import IntervalYM
|
||||
from informix_db.converters import _decode_interval
|
||||
|
||||
|
||||
def _qual(total_len: int, start_tu: int, end_tu: int) -> int:
|
||||
"""Pack qualifier short: ``(total_len << 8) | (start_tu << 4) | end_tu``."""
|
||||
return (total_len << 8) | (start_tu << 4) | end_tu
|
||||
|
||||
|
||||
def _head(positive: bool, dec_exp: int) -> int:
|
||||
"""Encode the head byte: ``(sign << 7) | (biased_exp & 0x7F)``."""
|
||||
biased = (dec_exp + 64) & 0x7F
|
||||
return (biased | 0x80) if positive else (biased ^ 0x7F)
|
||||
|
||||
|
||||
def _encode_digits(positive: bool, digits: list[int]) -> bytes:
|
||||
"""Apply the asymmetric base-100 complement for negative values."""
|
||||
if positive:
|
||||
return bytes(digits)
|
||||
out = list(digits)
|
||||
sub_from = 100
|
||||
for i in range(len(out) - 1, -1, -1):
|
||||
if out[i] == 0 and sub_from == 100:
|
||||
continue
|
||||
out[i] = sub_from - out[i]
|
||||
sub_from = 99
|
||||
return bytes(out)
|
||||
|
||||
|
||||
# -------- Day-fraction (IntervalDF) ----------
|
||||
|
||||
|
||||
def test_day_to_second_simple() -> None:
|
||||
"""INTERVAL DAY(2) TO SECOND with value 3 days, 4:05:06."""
|
||||
qual = _qual(2 + (10 - 4), 4, 10) # total_len=8, start=DAY(4), end=SECOND(10)
|
||||
# Fields: DAY=03, HOUR=04, MIN=05, SEC=06 → 4 base-100 digit-pair bytes
|
||||
# dec_exp = bexpon = (8 + 10 - 10 + 1)/2 = 9/2 = 4
|
||||
head = _head(True, 4)
|
||||
body = _encode_digits(True, [3, 4, 5, 6])
|
||||
raw = bytes([head]) + body
|
||||
result = _decode_interval(raw, qual)
|
||||
assert result == datetime.timedelta(days=3, hours=4, minutes=5, seconds=6)
|
||||
|
||||
|
||||
def test_hour_to_second_basic() -> None:
|
||||
"""INTERVAL HOUR(2) TO SECOND with value 12:34:56."""
|
||||
qual = _qual(2 + (10 - 6), 6, 10) # total_len=6, start=HOUR(6), end=SECOND(10)
|
||||
# dec_exp = bexpon = (6 + 10 - 10 + 1)//2 = 3
|
||||
head = _head(True, 3)
|
||||
body = _encode_digits(True, [12, 34, 56])
|
||||
raw = bytes([head]) + body
|
||||
result = _decode_interval(raw, qual)
|
||||
assert result == datetime.timedelta(hours=12, minutes=34, seconds=56)
|
||||
|
||||
|
||||
def test_negative_day_to_second() -> None:
|
||||
"""Negative INTERVAL DAY TO SECOND — exercises 9's-complement decode."""
|
||||
qual = _qual(2 + (10 - 4), 4, 10)
|
||||
head = _head(False, 4)
|
||||
body = _encode_digits(False, [3, 4, 5, 6])
|
||||
raw = bytes([head]) + body
|
||||
result = _decode_interval(raw, qual)
|
||||
assert result == datetime.timedelta(
|
||||
days=-3, hours=-4, minutes=-5, seconds=-6
|
||||
)
|
||||
|
||||
|
||||
def test_day_with_long_first_field() -> None:
|
||||
"""INTERVAL DAY(5) TO SECOND — odd-length first field (5 digits = 3 bytes)."""
|
||||
qual = _qual(5 + (10 - 4), 4, 10) # total_len=11
|
||||
# Fields: DAY=12345 → as 3 bytes: [01, 23, 45], then HOUR=00, MIN=00, SEC=00
|
||||
# dec_exp = bexpon = (11 + 10 - 10 + 1)/2 = 6
|
||||
head = _head(True, 6)
|
||||
body = _encode_digits(True, [1, 23, 45, 0, 0, 0])
|
||||
raw = bytes([head]) + body
|
||||
result = _decode_interval(raw, qual)
|
||||
assert result == datetime.timedelta(days=12345)
|
||||
|
||||
|
||||
def test_minute_to_second() -> None:
|
||||
"""INTERVAL MINUTE(2) TO SECOND."""
|
||||
qual = _qual(2 + (10 - 8), 8, 10) # total_len=4
|
||||
# Fields: MIN=23, SEC=45 → 2 bytes
|
||||
# dec_exp = bexpon = (4 + 10 - 10 + 1)/2 = 5/2 = 2
|
||||
head = _head(True, 2)
|
||||
body = _encode_digits(True, [23, 45])
|
||||
raw = bytes([head]) + body
|
||||
result = _decode_interval(raw, qual)
|
||||
assert result == datetime.timedelta(minutes=23, seconds=45)
|
||||
|
||||
|
||||
def test_day_to_fraction_3() -> None:
|
||||
"""INTERVAL DAY(2) TO FRACTION(3): 1 day, 0:00:00.500"""
|
||||
qual = _qual(2 + (13 - 4), 4, 13) # total_len=11, FRAC3=13
|
||||
# Fields: DAY=01, HOUR=00, MIN=00, SEC=00, FRAC bytes = [50, 00] (.500 → 500 /
|
||||
# 100 = 5 then 0; wait — 0.500 nanos=500000000, dec_exp scaling per encoder:
|
||||
# scale_exp = 18 - 13 = 5 (odd, leave). nans/(10^6) = 500. Pack into 2 bytes
|
||||
# via (3 - tmpInt) loop: tmpInt=3 → mod=10, b=500/10=50, nans%=10=0; tmpInt=1
|
||||
# special: nans*=10=0, b=0/1=0. So [50, 0].
|
||||
# dec_exp = bexpon = (11 + 10 - 13 + 1)/2 = 9/2 = 4
|
||||
head = _head(True, 4)
|
||||
body = _encode_digits(True, [1, 0, 0, 0, 50, 0])
|
||||
raw = bytes([head]) + body
|
||||
result = _decode_interval(raw, qual)
|
||||
assert result == datetime.timedelta(days=1, microseconds=500_000)
|
||||
|
||||
|
||||
def test_null_interval() -> None:
|
||||
"""NULL INTERVAL: head byte 0x00, first digit byte 0x00 → None."""
|
||||
qual = _qual(8, 4, 10)
|
||||
raw = bytes([0, 0, 0, 0, 0])
|
||||
assert _decode_interval(raw, qual) is None
|
||||
|
||||
|
||||
# -------- Year-month (IntervalYM) ----------
|
||||
|
||||
|
||||
def test_year_to_month_basic() -> None:
|
||||
"""INTERVAL YEAR(4) TO MONTH: 5 years, 3 months."""
|
||||
qual = _qual(4 + (2 - 0), 0, 2) # total_len=6, start=YEAR(0), end=MONTH(2)
|
||||
# Encoder: years=5, months=3. lfprec=4 (years gets 4 digits). Packed even:
|
||||
# b[0]=0 (years/100), b[1]=5 (years%100), b[2]=3 (months).
|
||||
# dec_exp = bexpon = (6 + 10 - 2 + 1)/2 = 15/2 = 7
|
||||
head = _head(True, 7)
|
||||
body = _encode_digits(True, [0, 5, 3])
|
||||
raw = bytes([head]) + body
|
||||
result = _decode_interval(raw, qual)
|
||||
assert result == IntervalYM(months=5 * 12 + 3)
|
||||
|
||||
|
||||
def test_year_to_month_negative() -> None:
|
||||
qual = _qual(6, 0, 2)
|
||||
head = _head(False, 7)
|
||||
body = _encode_digits(False, [0, 5, 3])
|
||||
raw = bytes([head]) + body
|
||||
result = _decode_interval(raw, qual)
|
||||
assert result == IntervalYM(months=-(5 * 12 + 3))
|
||||
|
||||
|
||||
def test_year_only() -> None:
|
||||
"""INTERVAL YEAR(4)."""
|
||||
qual = _qual(4, 0, 0) # start=YEAR, end=YEAR
|
||||
# Fields: YEAR=2026 → 2 bytes [20, 26]. lfprec=4, even.
|
||||
# dec_exp = bexpon = (4 + 10 - 0 + 1)/2 = 15/2 = 7
|
||||
head = _head(True, 7)
|
||||
body = _encode_digits(True, [20, 26])
|
||||
raw = bytes([head]) + body
|
||||
result = _decode_interval(raw, qual)
|
||||
assert result == IntervalYM(months=2026 * 12)
|
||||
|
||||
|
||||
def test_month_only() -> None:
|
||||
"""INTERVAL MONTH(3) — 365 months."""
|
||||
qual = _qual(3, 2, 2) # start=MONTH, end=MONTH; total_len=3
|
||||
# Fields: MONTH=365 → odd lfprec=3: b[0]=3 (high), b[1]=65. So 2 bytes.
|
||||
# dec_exp = bexpon = (3 + 10 - 2 + 1)/2 = 12/2 = 6
|
||||
head = _head(True, 6)
|
||||
body = _encode_digits(True, [3, 65])
|
||||
raw = bytes([head]) + body
|
||||
result = _decode_interval(raw, qual)
|
||||
assert result == IntervalYM(months=365)
|
||||
|
||||
|
||||
# -------- IntervalYM convenience properties ----------
|
||||
|
||||
|
||||
def test_intervalym_str_and_decomposition() -> None:
|
||||
iv = IntervalYM(months=5 * 12 + 3)
|
||||
assert iv.years == 5
|
||||
assert iv.remainder_months == 3
|
||||
assert str(iv) == "5-03"
|
||||
|
||||
neg = IntervalYM(months=-(5 * 12 + 3))
|
||||
assert neg.years == -5
|
||||
assert neg.remainder_months == -3
|
||||
assert str(neg) == "-5-03"
|
||||
|
||||
|
||||
def test_intervalym_equality_and_hashing() -> None:
|
||||
a = IntervalYM(months=15)
|
||||
b = IntervalYM(months=15)
|
||||
c = IntervalYM(months=14)
|
||||
assert a == b
|
||||
assert a != c
|
||||
# frozen + slots → hashable
|
||||
assert hash(a) == hash(b)
|
||||
assert {a, b, c} == {a, c}
|
||||
Loading…
x
Reference in New Issue
Block a user