"""Phase 10 integration tests — smart-LOB BLOB read via SQ_FILE / lotofile. Smart-LOB BLOBs return only a 72-byte locator in SELECT row data (Phase 9). Phase 10 lets users retrieve the actual bytes via the ``lotofile(blob_col, '/tmp/X', 'client')`` SQL function — the server orchestrates a ``SQ_FILE`` (98) protocol round-trip where the bytes flow back over the wire and we intercept them in memory. Three APIs are exposed: - ``cursor.execute("SELECT lotofile(...) FROM ...")`` followed by ``cursor.blob_files[filename]`` for the low-level path. - ``cursor.read_blob_column(sql, params)`` convenience wrapper. After Phase 11, smart-LOB writes also work in pure Python — so test data is now seeded via our own driver instead of the JDBC reference client. The full read+write loop is end-to-end pure Python. """ from __future__ import annotations import contextlib from collections.abc import Iterator import pytest import informix_db from tests.conftest import ConnParams pytestmark = pytest.mark.integration def _connect(params: ConnParams) -> informix_db.Connection: return informix_db.connect( host=params.host, port=params.port, user=params.user, password=params.password, database=params.database, server=params.server, connect_timeout=10.0, read_timeout=10.0, autocommit=True, ) @pytest.fixture def small_blob(logged_db_params: ConnParams) -> Iterator[str]: """A BLOB table seeded (via Phase 11 write) with a small payload.""" table = "p10_small" payload = b"hello phase 10 lotofile" with _connect(logged_db_params) as conn: cur = conn.cursor() with contextlib.suppress(Exception): cur.execute(f"DROP TABLE {table}") try: cur.execute(f"CREATE TABLE {table} (id INT, data BLOB)") except informix_db.Error as e: pytest.skip(f"sbspace unavailable ({e!r})") cur.write_blob_column( f"INSERT INTO {table} VALUES (?, BLOB_PLACEHOLDER)", payload, (1,), ) try: yield table finally: with _connect(logged_db_params) as conn: cur = conn.cursor() with contextlib.suppress(Exception): cur.execute(f"DROP TABLE {table}") @pytest.fixture def big_blob(logged_db_params: ConnParams) -> Iterator[tuple[str, bytes]]: """A BLOB table seeded with a multi-chunk (>1KB) payload.""" table = "p10_big" # 30000 bytes — spans many SQ_FILE_WRITE chunks payload = (b"X" * 10_000) + (b"Y" * 10_000) + (b"Z" * 10_000) with _connect(logged_db_params) as conn: cur = conn.cursor() with contextlib.suppress(Exception): cur.execute(f"DROP TABLE {table}") try: cur.execute(f"CREATE TABLE {table} (id INT, data BLOB)") except informix_db.Error as e: pytest.skip(f"sbspace unavailable ({e!r})") cur.write_blob_column( f"INSERT INTO {table} VALUES (?, BLOB_PLACEHOLDER)", payload, (1,), ) try: yield table, payload finally: with _connect(logged_db_params) as conn: cur = conn.cursor() with contextlib.suppress(Exception): cur.execute(f"DROP TABLE {table}") # -------- Low-level lotofile + blob_files -------- def test_lotofile_low_level( logged_db_params: ConnParams, small_blob: str ) -> None: """``SELECT lotofile(...)`` populates ``cursor.blob_files``.""" with _connect(logged_db_params) as conn: cur = conn.cursor() cur.execute( f"SELECT lotofile(data, '/tmp/test_low', 'client') " f"FROM {small_blob}" ) # The result column is the actual filename the server used # (with a generated suffix). The bytes were captured via SQ_FILE. (fname,) = cur.fetchone() assert fname.startswith("/tmp/test_low.") assert fname in cur.blob_files assert cur.blob_files[fname] == b"hello phase 10 lotofile" def test_lotofile_multichunk( logged_db_params: ConnParams, big_blob: tuple[str, bytes] ) -> None: """30KB BLOB read across multiple SQ_FILE_WRITE chunks.""" table, payload = big_blob with _connect(logged_db_params) as conn: cur = conn.cursor() cur.execute( f"SELECT lotofile(data, '/tmp/test_big', 'client') FROM {table}" ) cur.fetchone() captured = next(iter(cur.blob_files.values())) assert len(captured) == 30_000 assert captured == payload # -------- High-level read_blob_column helper -------- def test_read_blob_column_simple( logged_db_params: ConnParams, small_blob: str ) -> None: """``cursor.read_blob_column`` returns the BLOB bytes directly.""" with _connect(logged_db_params) as conn: cur = conn.cursor() data = cur.read_blob_column( f"SELECT data FROM {small_blob} WHERE id = ?", (1,) ) assert data == b"hello phase 10 lotofile" def test_read_blob_column_no_match( logged_db_params: ConnParams, small_blob: str ) -> None: """When the WHERE clause matches no rows, returns None.""" with _connect(logged_db_params) as conn: cur = conn.cursor() data = cur.read_blob_column( f"SELECT data FROM {small_blob} WHERE id = ?", (9999,) ) assert data is None def test_read_blob_column_big( logged_db_params: ConnParams, big_blob: tuple[str, bytes] ) -> None: """30KB BLOB via the convenience helper.""" table, payload = big_blob with _connect(logged_db_params) as conn: cur = conn.cursor() data = cur.read_blob_column( f"SELECT data FROM {table} WHERE id = ?", (1,) ) assert data == payload def test_read_blob_column_validates_select( logged_db_params: ConnParams, small_blob: str ) -> None: """``read_blob_column`` rejects non-SELECT statements.""" with _connect(logged_db_params) as conn: cur = conn.cursor() with pytest.raises(informix_db.ProgrammingError, match="SELECT"): cur.read_blob_column( f"DELETE FROM {small_blob}", () ) with pytest.raises(informix_db.ProgrammingError, match="FROM"): cur.read_blob_column("SELECT 1", ())