informix-db/tests/test_blob.py
Ryan Malloy 52259f0152 Phase 8: BYTE/TEXT bind+read via SQ_BBIND/SQ_BLOB/SQ_FETCHBLOB
Implements end-to-end round-trip for BYTE (type 11) and TEXT (type 12)
columns. Python bytes/bytearray map to BYTE; str is auto-encoded as
ISO-8859-1 for TEXT.

Wire protocol — write side:
* SQ_BIND payload carries a 56-byte blob descriptor with size at offset
  [16..19] (per IfxBlob.toIfx). NULL is byte 39=1.
* After all per-param blocks, SQ_BBIND (41) declares blob count, then
  chunked SQ_BLOB (39) messages stream the actual bytes (max 1024
  bytes/chunk per JDBC), terminated by zero-length SQ_BLOB.
* Then SQ_EXECUTE proceeds normally.

Wire protocol — read side:
* SQ_TUPLE returns only the 56-byte descriptor; actual bytes live in
  the blobspace.
* For each BYTE/TEXT column in each row, send SQ_FETCHBLOB with the
  descriptor and read SQ_BLOB chunks until zero-length terminator.
* The locator is only valid while the cursor is open — must dereference
  BEFORE sending CLOSE. Doing it after returns -602 (Cannot open blob).

Server-side prerequisites (one-time setup):
1. blobspace: onspaces -c -b blobspace1 -p /path -o 0 -s 50000
2. logged DB: CREATE DATABASE testdb WITH LOG
3. config + archive:
     onmode -wm LTAPEDEV=/dev/null
     onmode -wm TAPEDEV=/dev/null
     onmode -l
     ontape -s -L 0 -t /dev/null

Without #3, JDBC fails identically to our driver with "BLOB pages can't
be allocated from a chunk until chunk add is logged". This identical
failure was the diagnostic confirmation that our protocol bytes were
correct — receiving the same server response as JDBC demonstrated
byte-for-byte parity.

Tests: 9 integration tests in tests/test_blob.py — single-chunk,
multi-chunk (5120 bytes), NULL, multi-row, binary-safe, TEXT roundtrip,
ISO-8859-1, NULL TEXT, mixed columns. Plus the Phase 4
test_unsupported_param_type_raises was updated since bytes is no longer
the canonical unsupported type — switched to a custom class.

Total: 53 unit + 107 integration = 160 tests.

The smart-LOB family (BLOB/CLOB) is a separate state-machine extension
deferred to Phase 9 — it uses IfxLocator + LO_OPEN/LO_READ session
protocol against sbspace, not the BBIND/BLOB stream.
2026-05-04 13:13:55 -06:00

241 lines
7.6 KiB
Python

"""Phase 8 integration tests — BYTE/TEXT round-trip via SQ_BBIND/SQ_BLOB.
BYTE/TEXT use a multi-PDU wire protocol: the SQ_BIND payload carries a
56-byte blob descriptor (with size at offset [16..19]); the actual bytes
travel via SQ_BBIND + chunked SQ_BLOB messages after SQ_BIND. On read,
the SQ_TUPLE payload returns only the descriptor; the client must
explicitly fetch the bytes via SQ_FETCHBLOB while the cursor is still
open (locator invalidated by CLOSE).
Server-side requirements (preconfigured in the dev container by Phase 7
setup): blobspace1 + sbspace1 + a logged database (testdb). The blobspace
also requires a level-0 archive before allocating pages — done via
``ontape -s -L 0 -t /dev/null`` once.
"""
from __future__ import annotations
import contextlib
from collections.abc import Iterator
import pytest
import informix_db
from tests.conftest import ConnParams
pytestmark = pytest.mark.integration
def _connect(params: ConnParams) -> informix_db.Connection:
    """Open a fresh autocommit connection to the logged test database.

    Timeouts are pinned to 10 seconds so a hung blob fetch fails the
    test quickly instead of stalling the whole suite.
    """
    kwargs = {
        "host": params.host,
        "port": params.port,
        "user": params.user,
        "password": params.password,
        "database": params.database,
        "server": params.server,
        "connect_timeout": 10.0,
        "read_timeout": 10.0,
        "autocommit": True,
    }
    return informix_db.connect(**kwargs)
def _blob_table(
    params: ConnParams, table: str, coltype: str
) -> Iterator[str]:
    """Shared fixture body: create a permanent blob table, drop on teardown.

    Creates ``table`` with an ``id INT`` column and a ``data`` column of
    ``coltype`` (``BYTE`` or ``TEXT``) stored in blobspace1, yields the
    table name to the test, and drops the table afterwards.

    Skips the test (rather than erroring) when the CREATE fails —
    typically because blobspace1 is missing; see the Phase 7 setup notes.
    """
    with _connect(params) as conn:
        cur = conn.cursor()
        # Best-effort cleanup of leftovers from a previously aborted run.
        with contextlib.suppress(Exception):
            cur.execute(f"DROP TABLE {table}")
        try:
            cur.execute(
                f"CREATE TABLE {table} (id INT, data {coltype} IN blobspace1)"
            )
        except informix_db.Error as e:
            pytest.skip(
                f"blobspace1 unavailable ({e!r}); set up per Phase 7"
            )
    try:
        yield table
    finally:
        # Teardown on a fresh connection so a test that broke its own
        # connection cannot leak the table into the next test.
        with _connect(params) as conn:
            cur = conn.cursor()
            with contextlib.suppress(Exception):
                cur.execute(f"DROP TABLE {table}")


@pytest.fixture
def byte_table(logged_db_params: ConnParams) -> Iterator[str]:
    """Create a fresh permanent BYTE table per test, drop on teardown."""
    yield from _blob_table(logged_db_params, "t_blob_byte", "BYTE")


@pytest.fixture
def text_table(logged_db_params: ConnParams) -> Iterator[str]:
    """Create a fresh permanent TEXT table per test, drop on teardown."""
    yield from _blob_table(logged_db_params, "t_blob_text", "TEXT")
# -------- BYTE round-trip --------
def test_byte_roundtrip_short(
    logged_db_params: ConnParams, byte_table: str
) -> None:
    """Short BYTE payload (<1024 bytes, single SQ_BLOB chunk)."""
    payload = b"hello bytes round trip"
    with _connect(logged_db_params) as conn:
        cur = conn.cursor()
        cur.execute(
            f"INSERT INTO {byte_table} VALUES (?, ?)", (1, payload)
        )
        cur.execute(f"SELECT id, data FROM {byte_table}")
        rows = cur.fetchall()
        assert rows == [(1, payload)]
def test_byte_roundtrip_multichunk(
    logged_db_params: ConnParams, byte_table: str
) -> None:
    """Larger BYTE payload spanning multiple SQ_BLOB chunks (>1024 bytes)."""
    blob = bytes(range(256)) * 20  # 5120 bytes
    with _connect(logged_db_params) as conn:
        cur = conn.cursor()
        cur.execute(f"INSERT INTO {byte_table} VALUES (?, ?)", (1, blob))
        cur.execute(f"SELECT data FROM {byte_table}")
        row = cur.fetchone()
        assert len(row[0]) == 5120
        assert row[0] == blob
def test_byte_null(
    logged_db_params: ConnParams, byte_table: str
) -> None:
    """NULL BYTE column: byte 39 of descriptor=1 → Python None."""
    with _connect(logged_db_params) as conn:
        cur = conn.cursor()
        cur.execute(f"INSERT INTO {byte_table} VALUES (?, NULL)", (1,))
        cur.execute(f"SELECT id, data FROM {byte_table}")
        rows = cur.fetchall()
        assert rows == [(1, None)]
def test_byte_multi_row(
    logged_db_params: ConnParams, byte_table: str
) -> None:
    """Multiple rows with BYTE columns — each gets its own SQ_FETCHBLOB."""
    expected = [
        (1, b"first row data"),
        (2, b"second row content"),
        (3, b"third"),
    ]
    with _connect(logged_db_params) as conn:
        cur = conn.cursor()
        cur.executemany(
            f"INSERT INTO {byte_table} VALUES (?, ?)", expected
        )
        cur.execute(
            f"SELECT id, data FROM {byte_table} ORDER BY id"
        )
        assert cur.fetchall() == expected
def test_byte_binary_safe(
    logged_db_params: ConnParams, byte_table: str
) -> None:
    """BYTE preserves arbitrary binary data including nulls and high bytes."""
    blob = bytes([0, 1, 255, 0, 254, 128, 0]) + b"\x00\x00\x00\xff"
    with _connect(logged_db_params) as conn:
        cur = conn.cursor()
        cur.execute(f"INSERT INTO {byte_table} VALUES (?, ?)", (1, blob))
        cur.execute(f"SELECT data FROM {byte_table}")
        row = cur.fetchone()
        assert row[0] == blob
# -------- TEXT round-trip --------
def test_text_roundtrip(
    logged_db_params: ConnParams, text_table: str
) -> None:
    """TEXT column round-trip: bytes in, str out (decoded as iso-8859-1)."""
    with _connect(logged_db_params) as conn:
        cur = conn.cursor()
        cur.execute(
            f"INSERT INTO {text_table} VALUES (?, ?)",
            (1, b"this is some text content"),
        )
        cur.execute(f"SELECT id, data FROM {text_table}")
        rows = cur.fetchall()
        assert rows == [(1, "this is some text content")]
def test_text_with_unicode_iso8859(
    logged_db_params: ConnParams, text_table: str
) -> None:
    """ISO-8859-1 characters preserved through the TEXT pipeline."""
    original = "café résumé naïve"
    with _connect(logged_db_params) as conn:
        cur = conn.cursor()
        cur.execute(
            f"INSERT INTO {text_table} VALUES (?, ?)",
            (1, original.encode("iso-8859-1")),
        )
        cur.execute(f"SELECT data FROM {text_table}")
        row = cur.fetchone()
        assert row[0] == original
def test_text_null(
    logged_db_params: ConnParams, text_table: str
) -> None:
    """NULL TEXT column → Python None."""
    with _connect(logged_db_params) as conn:
        cur = conn.cursor()
        cur.execute(f"INSERT INTO {text_table} VALUES (?, NULL)", (1,))
        cur.execute(f"SELECT data FROM {text_table}")
        row = cur.fetchone()
        assert row == (None,)
# -------- Mixed columns --------
def test_byte_alongside_other_types(
    logged_db_params: ConnParams, byte_table: str
) -> None:
    """A row with BYTE + INT columns — descriptor is in tuple, blob fetched separately."""
    data = [(42, b"hello"), (99, b"world")]
    with _connect(logged_db_params) as conn:
        cur = conn.cursor()
        cur.executemany(
            f"INSERT INTO {byte_table} VALUES (?, ?)", data
        )
        cur.execute(
            f"SELECT id, data FROM {byte_table} ORDER BY id"
        )
        assert cur.fetchall() == data