"""Phase 10 integration tests — smart-LOB BLOB read via SQ_FILE / lotofile.

Smart-LOB BLOBs return only a 72-byte locator in SELECT row data (Phase 9).
Phase 10 lets users retrieve the actual bytes via the
``lotofile(blob_col, '/tmp/X', 'client')`` SQL function — the server
orchestrates a ``SQ_FILE`` (98) protocol round-trip where the bytes flow
back over the wire and we intercept them in memory.

The following APIs are exercised here:

- ``cursor.execute("SELECT lotofile(...) FROM ...")`` followed by
  ``cursor.blob_files[filename]`` for the low-level path.
- ``cursor.read_blob_column(sql, params)`` convenience wrapper.

Test data is seeded via the JDBC reference client because writing
smart-LOBs from our driver still requires the deferred
SQ_FPROUTINE + SQ_LODATA stack.
"""

from __future__ import annotations

import base64
import contextlib
import functools
import shutil
import subprocess
from collections.abc import Iterator
from pathlib import Path

import pytest

import informix_db
from tests.conftest import ConnParams

pytestmark = pytest.mark.integration

# Locations used by the JDBC seeding helper (relative paths are resolved
# against the directory pytest is started from).
_JDBC_JAR = "build/ifxjdbc.jar"
_CLASS_DIR = "build/"
_SEED_SOURCE_PATH = Path("/tmp/p10/tests/reference/SeedBlob.java")

# Payload stored by the ``small_blob`` fixture and expected back by the tests.
_SMALL_PAYLOAD = b"hello phase 10 lotofile"

# Java source of the seeding helper. It drops/recreates ``args[0]`` as
# ``(id INT, data BLOB)`` and inserts one row ``(1, <base64-decoded args[1]>)``.
# NOTE(review): the JDBC URL and credentials are hard-coded here rather than
# taken from ``ConnParams`` — confirm they match the ``logged_db_params``
# fixture in every environment these tests run in.
_SEED_BLOB_JAVA = (
    'package tests.reference;\n'
    'import java.sql.*;\n'
    'import java.io.*;\n'
    'import java.util.Base64;\n'
    'public class SeedBlob {\n'
    ' public static void main(String[] args) throws Exception {\n'
    ' String table = args[0];\n'
    ' byte[] payload = Base64.getDecoder().decode(args[1]);\n'
    ' Class.forName("com.informix.jdbc.IfxDriver");\n'
    ' try (Connection c = DriverManager.getConnection(\n'
    ' "jdbc:informix-sqli://127.0.0.1:9088/testdb:INFORMIXSERVER=informix",\n'
    ' "informix", "in4mix")) {\n'
    ' c.setAutoCommit(true);\n'
    ' try (Statement s = c.createStatement()) {\n'
    ' try { s.execute("DROP TABLE " + table); } catch (Exception e) {}\n'
    ' s.execute("CREATE TABLE " + table + " (id INT, data BLOB)");\n'
    ' }\n'
    ' try (PreparedStatement ps = c.prepareStatement(\n'
    ' "INSERT INTO " + table + " VALUES (1, ?)")) {\n'
    ' ps.setBytes(1, payload);\n'
    ' ps.executeUpdate();\n'
    ' }\n'
    ' }\n'
    ' }\n'
    '}\n'
)


def _connect(params: ConnParams) -> informix_db.Connection:
    """Open an autocommit connection to the database described by *params*."""
    return informix_db.connect(
        host=params.host,
        port=params.port,
        user=params.user,
        password=params.password,
        database=params.database,
        server=params.server,
        connect_timeout=10.0,
        read_timeout=10.0,
        autocommit=True,
    )


def _java_available() -> bool:
    """Return True when the JDBC seeding helper can be compiled and run.

    ``_seed_blob`` invokes both ``javac`` and ``java``, so both must be on
    ``PATH``; checking only ``java`` would turn a JRE-only host into a hard
    error instead of a skip. The Informix JDBC jar must also be present.
    """
    return (
        shutil.which("java") is not None
        and shutil.which("javac") is not None
        and Path(_JDBC_JAR).exists()
    )


def _run_tool(argv: list[str]) -> None:
    """Run an external command, surfacing its stderr if it fails.

    Raises:
        RuntimeError: the command exited non-zero. The message carries the
            captured stderr, which ``CalledProcessError`` alone would hide
            from the pytest report.
    """
    try:
        subprocess.run(argv, check=True, capture_output=True)
    except subprocess.CalledProcessError as err:
        stderr = err.stderr.decode(errors="replace") if err.stderr else ""
        raise RuntimeError(
            f"{argv[0]} exited with status {err.returncode}:\n{stderr}"
        ) from err


@functools.lru_cache(maxsize=None)
def _compile_seed_helper() -> None:
    """Write and compile ``SeedBlob.java`` once per test process.

    ``lru_cache`` does not cache exceptions, so a failed compile is retried
    on the next call.
    """
    Path("build/tests/reference").mkdir(parents=True, exist_ok=True)
    _SEED_SOURCE_PATH.parent.mkdir(parents=True, exist_ok=True)
    _SEED_SOURCE_PATH.write_text(_SEED_BLOB_JAVA)
    _run_tool(
        ["javac", "-cp", _JDBC_JAR, "-d", _CLASS_DIR, str(_SEED_SOURCE_PATH)]
    )


def _seed_blob(table: str, payload: bytes) -> None:
    """Use JDBC to seed a BLOB row (since smart-LOB write needs Phase 11).

    Recreates *table* as ``(id INT, data BLOB)`` holding a single row with
    ``id = 1`` and ``data = payload``.

    Raises:
        RuntimeError: ``javac`` or ``java`` exited with a non-zero status.
    """
    _compile_seed_helper()
    _run_tool(
        [
            "java",
            "-cp",
            f"{_JDBC_JAR}:{_CLASS_DIR}",
            "tests.reference.SeedBlob",
            table,
            # Base64 keeps arbitrary bytes safe as a command-line argument.
            base64.b64encode(payload).decode(),
        ]
    )


def _drop_table(params: ConnParams, table: str) -> None:
    """Best-effort ``DROP TABLE`` used by fixture teardown."""
    with _connect(params) as conn:
        cur = conn.cursor()
        # Teardown must not mask the test outcome, so a failed drop is ignored.
        with contextlib.suppress(Exception):
            cur.execute(f"DROP TABLE {table}")


@pytest.fixture
def small_blob(logged_db_params: ConnParams) -> Iterator[str]:
    """A BLOB table seeded with a small payload; yields the table name."""
    if not _java_available():
        pytest.skip("JDBC reference client unavailable")
    table = "p10_small"
    _seed_blob(table, _SMALL_PAYLOAD)
    try:
        yield table
    finally:
        _drop_table(logged_db_params, table)


@pytest.fixture
def big_blob(logged_db_params: ConnParams) -> Iterator[tuple[str, bytes]]:
    """A BLOB table seeded with a multi-chunk (>1KB) payload.

    Yields ``(table_name, payload)`` so tests can compare the bytes read back.
    """
    if not _java_available():
        pytest.skip("JDBC reference client unavailable")
    table = "p10_big"
    # 30000 bytes — spans many SQ_FILE_WRITE chunks
    payload = (b"X" * 10_000) + (b"Y" * 10_000) + (b"Z" * 10_000)
    _seed_blob(table, payload)
    try:
        yield table, payload
    finally:
        _drop_table(logged_db_params, table)


# -------- Low-level lotofile + blob_files --------


def test_lotofile_low_level(
    logged_db_params: ConnParams, small_blob: str
) -> None:
    """``SELECT lotofile(...)`` populates ``cursor.blob_files``."""
    with _connect(logged_db_params) as conn:
        cur = conn.cursor()
        cur.execute(
            f"SELECT lotofile(data, '/tmp/test_low', 'client') "
            f"FROM {small_blob}"
        )
        # The result column is the actual filename the server used
        # (with a generated suffix). The bytes were captured via SQ_FILE.
        (fname,) = cur.fetchone()
        assert fname.startswith("/tmp/test_low.")
        assert fname in cur.blob_files
        assert cur.blob_files[fname] == _SMALL_PAYLOAD


def test_lotofile_multichunk(
    logged_db_params: ConnParams, big_blob: tuple[str, bytes]
) -> None:
    """30KB BLOB read across multiple SQ_FILE_WRITE chunks."""
    table, payload = big_blob
    with _connect(logged_db_params) as conn:
        cur = conn.cursor()
        cur.execute(
            f"SELECT lotofile(data, '/tmp/test_big', 'client') FROM {table}"
        )
        cur.fetchone()
        # Fail with a readable message instead of a bare StopIteration.
        assert cur.blob_files, "lotofile captured no data in cursor.blob_files"
        captured = next(iter(cur.blob_files.values()))
        assert len(captured) == 30_000
        assert captured == payload


# -------- High-level read_blob_column helper --------


def test_read_blob_column_simple(
    logged_db_params: ConnParams, small_blob: str
) -> None:
    """``cursor.read_blob_column`` returns the BLOB bytes directly."""
    with _connect(logged_db_params) as conn:
        cur = conn.cursor()
        data = cur.read_blob_column(
            f"SELECT data FROM {small_blob} WHERE id = ?", (1,)
        )
        assert data == _SMALL_PAYLOAD


def test_read_blob_column_no_match(
    logged_db_params: ConnParams, small_blob: str
) -> None:
    """When the WHERE clause matches no rows, returns None."""
    with _connect(logged_db_params) as conn:
        cur = conn.cursor()
        data = cur.read_blob_column(
            f"SELECT data FROM {small_blob} WHERE id = ?", (9999,)
        )
        assert data is None


def test_read_blob_column_big(
    logged_db_params: ConnParams, big_blob: tuple[str, bytes]
) -> None:
    """30KB BLOB via the convenience helper."""
    table, payload = big_blob
    with _connect(logged_db_params) as conn:
        cur = conn.cursor()
        data = cur.read_blob_column(
            f"SELECT data FROM {table} WHERE id = ?", (1,)
        )
        assert data == payload


def test_read_blob_column_validates_select(
    logged_db_params: ConnParams, small_blob: str
) -> None:
    """``read_blob_column`` rejects non-SELECT statements."""
    with _connect(logged_db_params) as conn:
        cur = conn.cursor()
        with pytest.raises(informix_db.ProgrammingError, match="SELECT"):
            cur.read_blob_column(
                f"DELETE FROM {small_blob}", ()
            )
        with pytest.raises(informix_db.ProgrammingError, match="FROM"):
            cur.read_blob_column("SELECT 1", ())