Implements end-to-end round-trip for BYTE (type 11) and TEXT (type 12)
columns. Python bytes/bytearray map to BYTE; str is auto-encoded as
ISO-8859-1 for TEXT.
Wire protocol — write side:
* SQ_BIND payload carries a 56-byte blob descriptor with size at offset
[16..19] (per IfxBlob.toIfx). NULL is byte 39=1.
* After all per-param blocks, SQ_BBIND (41) declares blob count, then
chunked SQ_BLOB (39) messages stream the actual bytes (max 1024
bytes/chunk per JDBC), terminated by zero-length SQ_BLOB.
* Then SQ_EXECUTE proceeds normally.
Wire protocol — read side:
* SQ_TUPLE returns only the 56-byte descriptor; actual bytes live in
the blobspace.
* For each BYTE/TEXT column in each row, send SQ_FETCHBLOB with the
descriptor and read SQ_BLOB chunks until zero-length terminator.
* The locator is only valid while the cursor is open — must dereference
BEFORE sending CLOSE. Doing it after returns -602 (Cannot open blob).
Server-side prerequisites (one-time setup):
1. blobspace: onspaces -c -b blobspace1 -p /path -o 0 -s 50000
2. logged DB: CREATE DATABASE testdb WITH LOG
3. config + archive:
onmode -wm LTAPEDEV=/dev/null
onmode -wm TAPEDEV=/dev/null
onmode -l
ontape -s -L 0 -t /dev/null
Without #3, JDBC fails identically to our driver with "BLOB pages can't
be allocated from a chunk until chunk add is logged". That identical
failure served as the diagnostic cross-check: hitting the same server-side
error as the reference JDBC driver indicates our request bytes are
interpreted the same way up to that point (strong evidence of, though not
strict proof of, byte-for-byte parity).
Tests: 9 integration tests in tests/test_blob.py — single-chunk,
multi-chunk (5120 bytes), NULL, multi-row, binary-safe, TEXT roundtrip,
ISO-8859-1, NULL TEXT, mixed columns. Plus the Phase 4
test_unsupported_param_type_raises was updated since bytes is no longer
the canonical unsupported type — switched to a custom class.
Total: 53 unit + 107 integration = 160 tests.
The smart-LOB family (BLOB/CLOB) is a separate state-machine extension
deferred to Phase 9 — it uses IfxLocator + LO_OPEN/LO_READ session
protocol against sbspace, not the BBIND/BLOB stream.
241 lines
7.6 KiB
Python
"""Phase 8 integration tests — BYTE/TEXT round-trip via SQ_BBIND/SQ_BLOB.

BYTE/TEXT use a multi-PDU wire protocol: the SQ_BIND payload carries a
56-byte blob descriptor (with size at offset [16..19]); the actual bytes
travel via SQ_BBIND + chunked SQ_BLOB messages after SQ_BIND. On read,
the SQ_TUPLE payload returns only the descriptor; the client must
explicitly fetch the bytes via SQ_FETCHBLOB while the cursor is still
open (the locator is invalidated by CLOSE).

Server-side requirements (preconfigured in the dev container by Phase 7
setup): blobspace1 + sbspace1 + a logged database (testdb). The blobspace
also requires a level-0 archive before allocating pages — done via
``ontape -s -L 0 -t /dev/null`` once.
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import contextlib
|
|
from collections.abc import Iterator
|
|
|
|
import pytest
|
|
|
|
import informix_db
|
|
from tests.conftest import ConnParams
|
|
|
|
# Apply the ``integration`` marker to every test in this module.
pytestmark = pytest.mark.integration
|
|
|
|
|
|
def _connect(params: ConnParams) -> informix_db.Connection:
    """Open a short-lived autocommit connection from the given test params."""
    options = {
        "host": params.host,
        "port": params.port,
        "user": params.user,
        "password": params.password,
        "database": params.database,
        "server": params.server,
        "connect_timeout": 10.0,
        "read_timeout": 10.0,
        "autocommit": True,
    }
    return informix_db.connect(**options)
|
|
|
|
|
|
@pytest.fixture
def byte_table(logged_db_params: ConnParams) -> Iterator[str]:
    """Yield the name of a fresh permanent BYTE table; drop it on teardown."""
    name = "t_blob_byte"
    ddl = f"CREATE TABLE {name} (id INT, data BYTE IN blobspace1)"
    with _connect(logged_db_params) as conn:
        cursor = conn.cursor()
        # Best-effort cleanup of any leftover table from a crashed run.
        with contextlib.suppress(Exception):
            cursor.execute(f"DROP TABLE {name}")
        try:
            cursor.execute(ddl)
        except informix_db.Error as e:
            # No blobspace means every test here would fail for the same
            # environmental reason — skip instead of reporting failures.
            pytest.skip(f"blobspace1 unavailable ({e!r}); set up per Phase 7")
    try:
        yield name
    finally:
        # Teardown uses its own connection; the setup connection is closed.
        with _connect(logged_db_params) as conn:
            cursor = conn.cursor()
            with contextlib.suppress(Exception):
                cursor.execute(f"DROP TABLE {name}")
|
|
|
|
|
|
@pytest.fixture
def text_table(logged_db_params: ConnParams) -> Iterator[str]:
    """Yield the name of a fresh permanent TEXT table; drop it on teardown."""
    name = "t_blob_text"
    ddl = f"CREATE TABLE {name} (id INT, data TEXT IN blobspace1)"
    with _connect(logged_db_params) as conn:
        cursor = conn.cursor()
        # Best-effort cleanup of any leftover table from a crashed run.
        with contextlib.suppress(Exception):
            cursor.execute(f"DROP TABLE {name}")
        try:
            cursor.execute(ddl)
        except informix_db.Error as e:
            # No blobspace means every test here would fail for the same
            # environmental reason — skip instead of reporting failures.
            pytest.skip(f"blobspace1 unavailable ({e!r}); set up per Phase 7")
    try:
        yield name
    finally:
        # Teardown uses its own connection; the setup connection is closed.
        with _connect(logged_db_params) as conn:
            cursor = conn.cursor()
            with contextlib.suppress(Exception):
                cursor.execute(f"DROP TABLE {name}")
|
|
|
|
|
|
# -------- BYTE round-trip --------
|
|
|
|
|
|
def test_byte_roundtrip_short(
    logged_db_params: ConnParams, byte_table: str
) -> None:
    """A BYTE payload under 1024 bytes travels in a single SQ_BLOB chunk."""
    payload = b"hello bytes round trip"
    with _connect(logged_db_params) as conn:
        cur = conn.cursor()
        cur.execute(f"INSERT INTO {byte_table} VALUES (?, ?)", (1, payload))
        cur.execute(f"SELECT id, data FROM {byte_table}")
        assert cur.fetchall() == [(1, payload)]
|
|
|
|
|
|
def test_byte_roundtrip_multichunk(
    logged_db_params: ConnParams, byte_table: str
) -> None:
    """A 5120-byte BYTE payload spans multiple SQ_BLOB chunks (>1024 bytes)."""
    payload = bytes(range(256)) * 20  # 5120 bytes — forces several chunks
    with _connect(logged_db_params) as conn:
        cur = conn.cursor()
        cur.execute(f"INSERT INTO {byte_table} VALUES (?, ?)", (1, payload))
        cur.execute(f"SELECT data FROM {byte_table}")
        row = cur.fetchone()
        assert row == (payload,)
        assert len(row[0]) == 5120
|
|
|
|
|
|
def test_byte_null(logged_db_params: ConnParams, byte_table: str) -> None:
    """NULL BYTE column (descriptor byte 39 = 1) surfaces as Python None."""
    with _connect(logged_db_params) as conn:
        cur = conn.cursor()
        cur.execute(f"INSERT INTO {byte_table} VALUES (?, NULL)", (1,))
        cur.execute(f"SELECT id, data FROM {byte_table}")
        rows = cur.fetchall()
        assert rows == [(1, None)]
|
|
|
|
|
|
def test_byte_multi_row(logged_db_params: ConnParams, byte_table: str) -> None:
    """Multiple BYTE rows — each cell is dereferenced via its own SQ_FETCHBLOB."""
    rows = [
        (1, b"first row data"),
        (2, b"second row content"),
        (3, b"third"),
    ]
    with _connect(logged_db_params) as conn:
        cur = conn.cursor()
        cur.executemany(f"INSERT INTO {byte_table} VALUES (?, ?)", rows)
        cur.execute(f"SELECT id, data FROM {byte_table} ORDER BY id")
        assert cur.fetchall() == rows
|
|
|
|
|
|
def test_byte_binary_safe(
    logged_db_params: ConnParams, byte_table: str
) -> None:
    """BYTE is binary-safe: embedded NULs and high bytes survive unchanged."""
    payload = bytes([0, 1, 255, 0, 254, 128, 0]) + b"\x00\x00\x00\xff"
    with _connect(logged_db_params) as conn:
        cur = conn.cursor()
        cur.execute(f"INSERT INTO {byte_table} VALUES (?, ?)", (1, payload))
        cur.execute(f"SELECT data FROM {byte_table}")
        (fetched,) = cur.fetchone()
        assert fetched == payload
|
|
|
|
|
|
# -------- TEXT round-trip --------
|
|
|
|
|
|
def test_text_roundtrip(logged_db_params: ConnParams, text_table: str) -> None:
    """TEXT accepts bytes on insert and comes back as str (ISO-8859-1)."""
    with _connect(logged_db_params) as conn:
        cur = conn.cursor()
        cur.execute(
            f"INSERT INTO {text_table} VALUES (?, ?)",
            (1, b"this is some text content"),
        )
        cur.execute(f"SELECT id, data FROM {text_table}")
        assert cur.fetchall() == [(1, "this is some text content")]
|
|
|
|
|
|
def test_text_with_unicode_iso8859(
    logged_db_params: ConnParams, text_table: str
) -> None:
    """Accented ISO-8859-1 characters survive the TEXT round trip."""
    text = "café résumé naïve"
    with _connect(logged_db_params) as conn:
        cur = conn.cursor()
        cur.execute(
            f"INSERT INTO {text_table} VALUES (?, ?)",
            (1, text.encode("iso-8859-1")),
        )
        cur.execute(f"SELECT data FROM {text_table}")
        assert cur.fetchone() == (text,)
|
|
|
|
|
|
def test_text_null(logged_db_params: ConnParams, text_table: str) -> None:
    """NULL TEXT column surfaces as Python None."""
    with _connect(logged_db_params) as conn:
        cur = conn.cursor()
        cur.execute(f"INSERT INTO {text_table} VALUES (?, NULL)", (1,))
        cur.execute(f"SELECT data FROM {text_table}")
        assert cur.fetchone() == (None,)
|
|
|
|
|
|
# -------- Mixed columns --------
|
|
|
|
|
|
def test_byte_alongside_other_types(
    logged_db_params: ConnParams, byte_table: str
) -> None:
    """INT and BYTE columns coexist in one row; the blob is fetched per cell."""
    expected = [(42, b"hello"), (99, b"world")]
    with _connect(logged_db_params) as conn:
        cur = conn.cursor()
        cur.executemany(f"INSERT INTO {byte_table} VALUES (?, ?)", expected)
        cur.execute(f"SELECT id, data FROM {byte_table} ORDER BY id")
        assert cur.fetchall() == expected
|