"""Phase 8 integration tests — BYTE/TEXT round-trip via SQ_BBIND/SQ_BLOB. BYTE/TEXT use a multi-PDU wire protocol: the SQ_BIND payload carries a 56-byte blob descriptor (with size at offset [16..19]); the actual bytes travel via SQ_BBIND + chunked SQ_BLOB messages after SQ_BIND. On read, the SQ_TUPLE payload returns only the descriptor; the client must explicitly fetch the bytes via SQ_FETCHBLOB while the cursor is still open (locator invalidated by CLOSE). Server-side requirements (preconfigured in the dev container by Phase 7 setup): blobspace1 + sbspace1 + a logged database (testdb). The blobspace also requires a level-0 archive before allocating pages — done via ``ontape -s -L 0 -t /dev/null`` once. """ from __future__ import annotations import contextlib from collections.abc import Iterator import pytest import informix_db from tests.conftest import ConnParams pytestmark = pytest.mark.integration def _connect(params: ConnParams) -> informix_db.Connection: return informix_db.connect( host=params.host, port=params.port, user=params.user, password=params.password, database=params.database, server=params.server, connect_timeout=10.0, read_timeout=10.0, autocommit=True, ) @pytest.fixture def byte_table(logged_db_params: ConnParams) -> Iterator[str]: """Create a fresh permanent BYTE table per test, drop on teardown.""" table = "t_blob_byte" with _connect(logged_db_params) as conn: cur = conn.cursor() with contextlib.suppress(Exception): cur.execute(f"DROP TABLE {table}") try: cur.execute( f"CREATE TABLE {table} (id INT, data BYTE IN blobspace1)" ) except informix_db.Error as e: pytest.skip( f"blobspace1 unavailable ({e!r}); set up per Phase 7" ) try: yield table finally: with _connect(logged_db_params) as conn: cur = conn.cursor() with contextlib.suppress(Exception): cur.execute(f"DROP TABLE {table}") @pytest.fixture def text_table(logged_db_params: ConnParams) -> Iterator[str]: """Create a fresh permanent TEXT table per test, drop on teardown.""" table = "t_blob_text" with _connect(logged_db_params) as conn: cur = conn.cursor() with contextlib.suppress(Exception): cur.execute(f"DROP TABLE {table}") try: cur.execute( f"CREATE TABLE {table} (id INT, data TEXT IN blobspace1)" ) except informix_db.Error as e: pytest.skip( f"blobspace1 unavailable ({e!r}); set up per Phase 7" ) try: yield table finally: with _connect(logged_db_params) as conn: cur = conn.cursor() with contextlib.suppress(Exception): cur.execute(f"DROP TABLE {table}") # -------- BYTE round-trip -------- def test_byte_roundtrip_short( logged_db_params: ConnParams, byte_table: str ) -> None: """Short BYTE payload (<1024 bytes, single SQ_BLOB chunk).""" with _connect(logged_db_params) as conn: cur = conn.cursor() cur.execute( f"INSERT INTO {byte_table} VALUES (?, ?)", (1, b"hello bytes round trip"), ) cur.execute(f"SELECT id, data FROM {byte_table}") assert cur.fetchall() == [(1, b"hello bytes round trip")] def test_byte_roundtrip_multichunk( logged_db_params: ConnParams, byte_table: str ) -> None: """Larger BYTE payload spanning multiple SQ_BLOB chunks (>1024 bytes).""" payload = bytes(range(256)) * 20 # 5120 bytes with _connect(logged_db_params) as conn: cur = conn.cursor() cur.execute( f"INSERT INTO {byte_table} VALUES (?, ?)", (1, payload) ) cur.execute(f"SELECT data FROM {byte_table}") (got,) = cur.fetchone() assert got == payload assert len(got) == 5120 def test_byte_null( logged_db_params: ConnParams, byte_table: str ) -> None: """NULL BYTE column: byte 39 of descriptor=1 → Python None.""" with _connect(logged_db_params) as conn: cur = conn.cursor() cur.execute( f"INSERT INTO {byte_table} VALUES (?, NULL)", (1,) ) cur.execute(f"SELECT id, data FROM {byte_table}") assert cur.fetchall() == [(1, None)] def test_byte_multi_row( logged_db_params: ConnParams, byte_table: str ) -> None: """Multiple rows with BYTE columns — each gets its own SQ_FETCHBLOB.""" with _connect(logged_db_params) as conn: cur = conn.cursor() cur.executemany( f"INSERT INTO {byte_table} VALUES (?, ?)", [ (1, b"first row data"), (2, b"second row content"), (3, b"third"), ], ) cur.execute( f"SELECT id, data FROM {byte_table} ORDER BY id" ) assert cur.fetchall() == [ (1, b"first row data"), (2, b"second row content"), (3, b"third"), ] def test_byte_binary_safe( logged_db_params: ConnParams, byte_table: str ) -> None: """BYTE preserves arbitrary binary data including nulls and high bytes.""" payload = bytes([0, 1, 255, 0, 254, 128, 0]) + b"\x00\x00\x00\xff" with _connect(logged_db_params) as conn: cur = conn.cursor() cur.execute( f"INSERT INTO {byte_table} VALUES (?, ?)", (1, payload) ) cur.execute(f"SELECT data FROM {byte_table}") (got,) = cur.fetchone() assert got == payload # -------- TEXT round-trip -------- def test_text_roundtrip( logged_db_params: ConnParams, text_table: str ) -> None: """TEXT column round-trip: bytes in, str out (decoded as iso-8859-1).""" with _connect(logged_db_params) as conn: cur = conn.cursor() cur.execute( f"INSERT INTO {text_table} VALUES (?, ?)", (1, b"this is some text content"), ) cur.execute(f"SELECT id, data FROM {text_table}") assert cur.fetchall() == [(1, "this is some text content")] def test_text_with_unicode_iso8859( logged_db_params: ConnParams, text_table: str ) -> None: """ISO-8859-1 characters preserved through the TEXT pipeline.""" payload = "café résumé naïve".encode("iso-8859-1") with _connect(logged_db_params) as conn: cur = conn.cursor() cur.execute( f"INSERT INTO {text_table} VALUES (?, ?)", (1, payload) ) cur.execute(f"SELECT data FROM {text_table}") (got,) = cur.fetchone() assert got == "café résumé naïve" def test_text_null( logged_db_params: ConnParams, text_table: str ) -> None: """NULL TEXT column → Python None.""" with _connect(logged_db_params) as conn: cur = conn.cursor() cur.execute( f"INSERT INTO {text_table} VALUES (?, NULL)", (1,) ) cur.execute(f"SELECT data FROM {text_table}") assert cur.fetchone() == (None,) # -------- Mixed columns -------- def test_byte_alongside_other_types( logged_db_params: ConnParams, byte_table: str ) -> None: """A row with BYTE + INT columns — descriptor is in tuple, blob fetched separately.""" with _connect(logged_db_params) as conn: cur = conn.cursor() cur.executemany( f"INSERT INTO {byte_table} VALUES (?, ?)", [(42, b"hello"), (99, b"world")], ) cur.execute( f"SELECT id, data FROM {byte_table} ORDER BY id" ) rows = cur.fetchall() assert rows == [(42, b"hello"), (99, b"world")]