"""Phase 11 integration tests — smart-LOB BLOB/CLOB write via SQ_FILE / filetoblob. Phase 10 implemented BLOB *read* by leveraging ``lotofile(...)`` and intercepting the resulting ``SQ_FILE`` (98) protocol. Phase 11 mirrors that pattern in the *write* direction: the user calls ``filetoblob('/sentinel', 'client')`` (or ``filetoclob``) with bytes pre-registered in ``cursor.virtual_files``. The server's read-from- client SQ_FILE optype=2 messages drive our handler to stream the registered bytes up. The high-level API is ``cursor.write_blob_column(sql, blob_data, params)`` which uses a ``BLOB_PLACEHOLDER`` token in the SQL. This is the symmetric counterpart of Phase 10's ``read_blob_column`` and the missing piece that makes the smart-LOB read+write loop complete entirely in pure Python — no JDBC needed for fixture seeding. """ from __future__ import annotations import contextlib from collections.abc import Iterator import pytest import informix_db from tests.conftest import ConnParams pytestmark = pytest.mark.integration def _connect(params: ConnParams) -> informix_db.Connection: return informix_db.connect( host=params.host, port=params.port, user=params.user, password=params.password, database=params.database, server=params.server, connect_timeout=10.0, read_timeout=10.0, autocommit=True, ) @pytest.fixture def blob_table(logged_db_params: ConnParams) -> Iterator[str]: """A fresh BLOB table per test, dropped on teardown.""" table = "t_p11_blob" with _connect(logged_db_params) as conn: cur = conn.cursor() with contextlib.suppress(Exception): cur.execute(f"DROP TABLE {table}") try: cur.execute(f"CREATE TABLE {table} (id INT, data BLOB)") except informix_db.Error as e: pytest.skip(f"sbspace unavailable ({e!r})") try: yield table finally: with _connect(logged_db_params) as conn: cur = conn.cursor() with contextlib.suppress(Exception): cur.execute(f"DROP TABLE {table}") @pytest.fixture def clob_table(logged_db_params: ConnParams) -> Iterator[str]: """A fresh CLOB table per test.""" table = "t_p11_clob" with _connect(logged_db_params) as conn: cur = conn.cursor() with contextlib.suppress(Exception): cur.execute(f"DROP TABLE {table}") try: cur.execute(f"CREATE TABLE {table} (id INT, txt CLOB)") except informix_db.Error as e: pytest.skip(f"sbspace unavailable ({e!r})") try: yield table finally: with _connect(logged_db_params) as conn: cur = conn.cursor() with contextlib.suppress(Exception): cur.execute(f"DROP TABLE {table}") # -------- BLOB write+read round-trip -------- def test_write_blob_round_trip_short( logged_db_params: ConnParams, blob_table: str ) -> None: """Short payload — single SQ_FILE_READ chunk.""" payload = b"hello phase 11 blob write" with _connect(logged_db_params) as conn: cur = conn.cursor() cur.write_blob_column( f"INSERT INTO {blob_table} VALUES (?, BLOB_PLACEHOLDER)", payload, (1,), ) got = cur.read_blob_column( f"SELECT data FROM {blob_table} WHERE id = ?", (1,) ) assert got == payload def test_write_blob_round_trip_multichunk( logged_db_params: ConnParams, blob_table: str ) -> None: """50KB payload — spans many SQ_FILE_READ chunks (32KB cap each).""" payload = bytes(range(256)) * 200 # 51200 bytes with _connect(logged_db_params) as conn: cur = conn.cursor() cur.write_blob_column( f"INSERT INTO {blob_table} VALUES (?, BLOB_PLACEHOLDER)", payload, (1,), ) got = cur.read_blob_column( f"SELECT data FROM {blob_table} WHERE id = ?", (1,) ) assert got == payload assert len(got) == 51200 def test_write_blob_empty( logged_db_params: ConnParams, blob_table: str ) -> None: """Empty bytes round-trip cleanly.""" with _connect(logged_db_params) as conn: cur = conn.cursor() cur.write_blob_column( f"INSERT INTO {blob_table} VALUES (?, BLOB_PLACEHOLDER)", b"", (1,), ) got = cur.read_blob_column( f"SELECT data FROM {blob_table} WHERE id = ?", (1,) ) assert got == b"" def test_write_blob_binary_safe( logged_db_params: ConnParams, blob_table: str ) -> None: """All-byte-values payload — no encoding artifacts.""" payload = bytes(range(256)) * 4 # 1024 bytes covering all values with _connect(logged_db_params) as conn: cur = conn.cursor() cur.write_blob_column( f"INSERT INTO {blob_table} VALUES (?, BLOB_PLACEHOLDER)", payload, (1,), ) got = cur.read_blob_column( f"SELECT data FROM {blob_table} WHERE id = ?", (1,) ) assert got == payload def test_write_blob_update( logged_db_params: ConnParams, blob_table: str ) -> None: """UPDATE with BLOB column replaces the prior value.""" with _connect(logged_db_params) as conn: cur = conn.cursor() cur.write_blob_column( f"INSERT INTO {blob_table} VALUES (?, BLOB_PLACEHOLDER)", b"original", (1,), ) cur.write_blob_column( f"UPDATE {blob_table} SET data = BLOB_PLACEHOLDER WHERE id = ?", b"replacement", (1,), ) got = cur.read_blob_column( f"SELECT data FROM {blob_table} WHERE id = ?", (1,) ) assert got == b"replacement" def test_write_blob_multiple_rows( logged_db_params: ConnParams, blob_table: str ) -> None: """Distinct INSERTs round-trip independently.""" rows = [ (1, b"first row"), (2, b"second row blob"), (3, b"third"), ] with _connect(logged_db_params) as conn: cur = conn.cursor() for rid, payload in rows: cur.write_blob_column( f"INSERT INTO {blob_table} VALUES (?, BLOB_PLACEHOLDER)", payload, (rid,), ) for rid, expected in rows: got = cur.read_blob_column( f"SELECT data FROM {blob_table} WHERE id = ?", (rid,) ) assert got == expected # -------- CLOB -------- def test_write_clob_round_trip( logged_db_params: ConnParams, clob_table: str ) -> None: """``clob=True`` routes through ``filetoclob`` (not ``filetoblob``).""" text = "Lorem ipsum dolor sit amet, café résumé".encode("iso-8859-1") with _connect(logged_db_params) as conn: cur = conn.cursor() cur.write_blob_column( f"INSERT INTO {clob_table} VALUES (?, BLOB_PLACEHOLDER)", text, (1,), clob=True, ) got = cur.read_blob_column( f"SELECT txt FROM {clob_table} WHERE id = ?", (1,) ) assert got == text # -------- Helper validation -------- def test_write_blob_column_requires_placeholder( logged_db_params: ConnParams, blob_table: str ) -> None: """SQL without ``BLOB_PLACEHOLDER`` is rejected.""" with _connect(logged_db_params) as conn: cur = conn.cursor() with pytest.raises( informix_db.ProgrammingError, match="BLOB_PLACEHOLDER" ): cur.write_blob_column( f"INSERT INTO {blob_table} VALUES (1, NULL)", b"data", (), ) def test_virtual_files_cleared_after_call( logged_db_params: ConnParams, blob_table: str ) -> None: """``virtual_files`` doesn't leak the registered bytes between calls.""" with _connect(logged_db_params) as conn: cur = conn.cursor() cur.write_blob_column( f"INSERT INTO {blob_table} VALUES (?, BLOB_PLACEHOLDER)", b"some data", (1,), ) # The default sentinel should have been removed assert "/tmp/_informix_db_blob_in" not in cur.virtual_files