diff --git a/docs/DECISION_LOG.md b/docs/DECISION_LOG.md index 469c2c5..1e64f0d 100644 --- a/docs/DECISION_LOG.md +++ b/docs/DECISION_LOG.md @@ -719,6 +719,85 @@ Estimating Phase 10 at ~2x the protocol surface of Phase 8. --- +## 2026-05-04 — Phase 10: smart-LOB BLOB read via SQ_FILE / lotofile + +**Status**: active +**Decision**: Implemented BLOB read end-to-end via the **`SQ_FILE` (98) protocol** rather than the heavier `SQ_FPROUTINE` (103) + `SQ_LODATA` (97) stack that the earlier Phase 9 entry estimated as 2x Phase 8. The actual implementation came in much smaller because we leveraged a server-side SQL function (`lotofile`) that orchestrates the byte transfer, with our driver acting as a remote filesystem. + +### The strategic pivot + +Initial estimate for Phase 10 was: implement `SQ_FPROUTINE` (RPC fast-path with UDT parameter marshaling) + `SQ_LODATA` (chunked transfer to/from open file descriptors). Both are big new wire-protocol surfaces. + +Then I discovered that `SELECT ifx_lo_open(blob_col, 4) FROM tbl` works as **regular SQL** — the server reads the locator from the column itself and passes it to the function, returning the file descriptor as an INT result. No client-side UDT marshaling needed. But that was a partial win — we'd still need `SQ_LODATA` for actually transferring the bytes after the open. + +Then I tried `SELECT lotofile(blob_col, '/path', 'client') FROM tbl` — and the server responded with `unexpected tag in FETCH response: 0x0062`. That tag is **`SQ_FILE`** — a *separate* protocol I hadn't recognized as relevant. Reading the JDBC source: `SQ_FILE` is the "remote filesystem" protocol where the server tells the client to act as a file server (open a path, accept these chunks, close). The bytes flow back to us automatically. + +The key insight: **`lotofile(...)` is a server-side function that orchestrates the entire transfer in one SQL statement**. The client doesn't need to do `ifx_lo_open` → `ifx_lo_read` → `ifx_lo_close`. Just write the SQL, intercept the `SQ_FILE` messages, return the bytes. Maybe 1/3 the protocol surface I'd planned. + +### Wire protocol — SQ_FILE (98) + +The server sends `SQ_FILE` messages with sub-types (per `IfxSqli.receiveSQFILE` line 4980): +- **0 (open)**: `[short fnameLen][padded fname][int mode][int flags][int offset][short SQ_EOT]`. Client opens the named file. We respond with `[short SQ_EOT]`. +- **3 (write to client)**: stream of `[short SQ_FILE_WRITE=107][short bufSize][padded data]` chunks, terminated by `SQ_EOT`. We respond with `[short 107][int totalBytesWritten][short SQ_EOT]`. +- **1 (close)**: `[short SQ_EOT]`. We respond with `[short SQ_EOT]`. +- (2 = read-from-client / `filetoblob` path; not implemented this phase.) + +Our implementation buffers writes in memory (`bytearray`) keyed by the requested filename; the bytes never touch disk. Users retrieve via `cursor.blob_files[filename]`. + +### Implementation: in-memory file emulation + +```python +# In cursor state: +self.blob_files: dict[str, bytes] = {} # filename -> assembled bytes +self._sqfile_current_name: str | None = None +self._sqfile_current_buf: bytearray | None = None + +# In _read_fetch_response, when tag == 98: +self._handle_sq_file(reader) +``` + +The handler dispatches by optype: open creates a fresh buffer, write extends it, close seals it into `blob_files`. + +### Bonus discovery: UDTVAR(lvarchar) row decoding + +`SELECT lotofile(...)` returns its result as **UDTVAR (type 40) with extended_name="lvarchar"** — not as plain LVARCHAR. 
The wire format is `[byte indicator][int length][bytes]` (vs. plain LVARCHAR's `[int length][bytes]`). Added a row-decoder branch that handles this — needed to surface the actual filename string instead of raw locator bytes. + +### High-level helper: `cursor.read_blob_column` + +For the common case "give me the bytes of column X from row matching Y", added a convenience method that wraps the user's SQL with `lotofile(...)` and returns the assembled bytes: + +```python +data: bytes = cur.read_blob_column( + "SELECT data FROM photos WHERE id = ?", (42,) +) +``` + +Naive SQL splitter that handles the common shape (single column, FROM clause). Power users can drop down to manual `lotofile` + `cur.blob_files[name]`. + +### Test coverage + +6 integration tests in `tests/test_smart_lob_read.py`: +- Low-level `lotofile` + `blob_files` lookup +- 30KB BLOB across multiple SQ_FILE_WRITE chunks +- High-level `read_blob_column` simple case +- `read_blob_column` returns `None` when no rows match +- High-level helper for 30KB BLOB +- `read_blob_column` validation (rejects non-SELECT and FROM-less SQL) + +Total project tests: **64 unit + 117 integration = 181 tests**. + +### What's still deferred (Phase 11+) + +- **Smart-LOB write**: `INSERT INTO tbl VALUES (?, ?)` with a `bytes` BLOB parameter still requires the full `SQ_FPROUTINE` + `SQ_LODATA` stack to invoke `ifx_lo_create` + write chunks. There's no `lotofromfile_client(bytes)` SQL function with the same shape as `lotofile`. +- **`BlobLocator.read(connection)`**: an OO API would be nice but requires reverse-mapping a locator back to its source — which the `SQ_FPROUTINE` path does naturally, but the `lotofile` path does not. +- **`filetoblob` path**: server-as-reader (SQ_FILE optype 2) — for streaming files from client to server. + +### Lesson + +**Don't estimate protocol-implementation cost from JDBC's class hierarchy alone.** JDBC's `IfxSmBlob` class is 600+ lines and looks like a massive surface, but the actual *wire-level* read path can be reduced to a single SQL function (`lotofile`) plus one new tag handler (`SQ_FILE`). When estimating, look at the wire trace, not the client SDK abstractions. The wire is often simpler than the SDK suggests. + +--- + ## (template — copy below this line for new entries) ``` diff --git a/src/informix_db/_resultset.py b/src/informix_db/_resultset.py index 4b7fe59..49b985d 100644 --- a/src/informix_db/_resultset.py +++ b/src/informix_db/_resultset.py @@ -289,8 +289,7 @@ def parse_tuple_payload( # these as UDTFIXED (type 41) with extended_id 10 (BLOB) or 11 # (CLOB) and encoded_length = 72 (locator size). The 72 bytes # we read here are an opaque server-side reference, NOT the - # actual data. To fetch bytes, the client must call ``ifx_lo_open`` - # via SQ_FPROUTINE then SQ_LODATA(LO_READ) — deferred to Phase 10. + # actual data. Phase 10 lets users fetch via lotofile + SQ_FILE. if base == int(IfxType.UDTFIXED) and col.extended_id in (10, 11): from .converters import BlobLocator, ClobLocator width = col.encoded_length @@ -300,6 +299,30 @@ def parse_tuple_payload( values.append(cls(raw=bytes(raw))) continue + # UDTVAR (type 40) with extended_name="lvarchar": this is what + # functions like ``lotofile`` return — a length-prefixed string + # wrapped as a UDT. The wire format adds a 1-byte indicator + # prefix BEFORE the LVARCHAR ``[int len][bytes]``. Empirically + # verified against ``SELECT lotofile(...)`` row data — the + # leading ``00`` is null indicator (0=not null, 1=null per UDT + # convention). 
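+        # Illustrative decode (hypothetical bytes, not a captured trace):
+        # a returned path "/tmp/blob.1" would arrive on the wire as
+        #   00                                 indicator: 0 = not null
+        #   00 00 00 0b                        length = 11
+        #   2f 74 6d 70 2f 62 6c 6f 62 2e 31   "/tmp/blob.1" (ISO-8859-1)
+        #   xx                                 one pad byte (length is odd)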
+ if base == int(IfxType.UDTVAR) and col.extended_name == "lvarchar": + indicator = payload[offset] + offset += 1 + if indicator == 1: + values.append(None) + continue + length = int.from_bytes( + payload[offset:offset + 4], "big", signed=True + ) + offset += 4 + raw = payload[offset:offset + length] + offset += length + if length & 1: + offset += 1 + values.append(raw.decode("iso-8859-1")) + continue + # Fixed-width types width = FIXED_WIDTHS.get(base) if width is None: diff --git a/src/informix_db/cursors.py b/src/informix_db/cursors.py index 3249503..60aa5d1 100644 --- a/src/informix_db/cursors.py +++ b/src/informix_db/cursors.py @@ -93,6 +93,15 @@ class Cursor: # from. Empirically the server accepts 0 here even when a real # ID was assigned, so this is best-effort tracking. self._statement_id: int = 0 + # Phase 10: smart-LOB read via ``lotofile(col, path, 'client')``. + # The server orchestrates a SQ_FILE (98) protocol where it tells + # us to "open file X, write these bytes, close". We emulate the + # filesystem in memory — the bytes are buffered keyed by the + # filename the server requested. Users can retrieve them via + # ``cursor.blob_files[path]`` after the SELECT completes. + self.blob_files: dict[str, bytes] = {} + self._sqfile_current_name: str | None = None + self._sqfile_current_buf: bytearray | None = None # -- PEP 249 attributes ------------------------------------------------ @@ -250,6 +259,153 @@ class Cursor: new_rows.append(tuple(row_list)) self._rows = new_rows + def read_blob_column( + self, + select_blob_sql: str, + params: tuple = (), + *, + sentinel: str = "/tmp/_informix_db_blob", + ) -> bytes | None: + """Fetch the bytes of a single smart-LOB BLOB column. + + Wraps the user's SQL with ``lotofile(, sentinel, 'client')``, + runs the query, and returns the bytes assembled from the SQ_FILE + stream. Returns ``None`` if the row's BLOB value is NULL or if + no rows match. + + Phase 10 implements this via the SQ_FILE protocol — the server + writes the BLOB content to a "file" on the client side and + we intercept those writes into an in-memory buffer. This avoids + implementing the heavier SQ_FPROUTINE + SQ_LODATA stack. + + Caveat: ``select_blob_sql`` must be a SELECT statement returning + exactly one BLOB-typed column. For multi-row reads, call this + method per row (e.g., add a ``WHERE`` clause to scope to one row). + For more general use, run the wrapped SQL yourself and inspect + ``self.blob_files``. + + Example:: + + data = cur.read_blob_column( + "SELECT data FROM photos WHERE id = ?", (42,) + ) + """ + # Find the column expression — drop "SELECT " prefix + sql = select_blob_sql.strip() + upper = sql.upper() + if not upper.startswith("SELECT"): + raise ProgrammingError( + "read_blob_column requires a SELECT statement" + ) + rest = sql[6:].lstrip() # everything after SELECT + # Find the FROM keyword (whitespace-bounded). Naive split — works + # for the common case where the user is selecting a single column. 
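+        # For example (matching the docstring above), the wrapped SQL for
+        # "SELECT data FROM photos WHERE id = ?" becomes
+        # "SELECT lotofile(data, '/tmp/_informix_db_blob', 'client')
+        #  FROM photos WHERE id = ?".
+        # Known limitation of the naive split: a string literal or subquery
+        # containing " FROM " ahead of the real FROM clause would cause the
+        # split to land at the wrong point.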
+ from_idx = rest.upper().find(" FROM ") + if from_idx < 0: + raise ProgrammingError( + "read_blob_column requires a FROM clause" + ) + col_expr = rest[:from_idx].strip() + tail = rest[from_idx:] + wrapped_sql = ( + f"SELECT lotofile({col_expr}, '{sentinel}', 'client'){tail}" + ) + # Reset blob_files so we only see the one we just fetched + self.blob_files = {} + self.execute(wrapped_sql, params) + row = self.fetchone() + if row is None: + return None + # Server returns a generated filename based on the sentinel + if not self.blob_files: + return None + # If user only fetched one row, the dict has one entry + return next(iter(self.blob_files.values())) + + def _handle_sq_file(self, reader: IfxStreamReader) -> None: + """Process an SQ_FILE (98) message from the server. + + The server treats us as a remote filesystem: it tells us to + "open file X, write these bytes, close". We emulate the + filesystem in memory — chunks land in ``self._sqfile_current_buf`` + keyed by ``self._sqfile_current_name``, then sealed into + ``self.blob_files`` on close. + + Sub-types (per ``IfxSqli.receiveSQFILE`` line 4980): + - 0 (open): ``[short fnameLen][bytes fname][int mode][int flags] + [int offset][short SQ_EOT]``. We acknowledge with + ``[short SQ_EOT]``. + - 3 (write to client): ``[short SQ_FILE_WRITE=107][short bufSize] + [padded data]`` repeated, terminated by ``SQ_EOT``. We + respond with ``[short 107][int totalSize][short SQ_EOT]``. + - 1 (close): ``[short SQ_EOT]``. We respond with ``[short SQ_EOT]``. + - 2 (read from client): unimplemented (would be filetoblob path). + """ + optype = reader.read_short() + if optype == 0: # open + name_len = reader.read_short() + fname_bytes = reader.read_exact(name_len) + if name_len & 1: + reader.read_exact(1) # readPadded pad + fname = fname_bytes.decode("iso-8859-1") + reader.read_int() # mode (ignored — we're in-memory) + reader.read_int() # flags (ignored) + reader.read_int() # offset (ignored — start at 0) + tail = reader.read_short() + if tail != MessageType.SQ_EOT: + raise DatabaseError( + f"SQ_FILE open: expected SQ_EOT, got 0x{tail:04x}" + ) + self._sqfile_current_name = fname + self._sqfile_current_buf = bytearray() + # Acknowledge with bare SQ_EOT (mirrors JDBC's flip() flush) + self._conn._send_pdu( + struct.pack("!h", MessageType.SQ_EOT) + ) + elif optype == 3: # write to client + total = 0 + while True: + chunk_tag = reader.read_short() + if chunk_tag == MessageType.SQ_EOT: + break + if chunk_tag != MessageType.SQ_FILE_WRITE: # 107 + raise DatabaseError( + f"SQ_FILE write: unexpected tag 0x{chunk_tag:04x}" + ) + buf_size = reader.read_short() + data = reader.read_exact(buf_size) + if buf_size & 1: + reader.read_exact(1) # writePadded pad + if self._sqfile_current_buf is not None: + self._sqfile_current_buf.extend(data) + total += buf_size + # Respond with [short 107][int totalSize][short SQ_EOT] + self._conn._send_pdu( + struct.pack( + "!hih", MessageType.SQ_FILE_WRITE, total, MessageType.SQ_EOT + ) + ) + elif optype == 1: # close + tail = reader.read_short() + if tail != MessageType.SQ_EOT: + raise DatabaseError( + f"SQ_FILE close: expected SQ_EOT, got 0x{tail:04x}" + ) + if self._sqfile_current_name is not None and self._sqfile_current_buf is not None: + self.blob_files[self._sqfile_current_name] = bytes( + self._sqfile_current_buf + ) + self._sqfile_current_name = None + self._sqfile_current_buf = None + self._conn._send_pdu( + struct.pack("!h", MessageType.SQ_EOT) + ) + else: + raise DatabaseError( + f"SQ_FILE: unsupported optype {optype} (0=open, 
1=close, " + f"2=read-from-client, 3=write-to-client; 2 is unimplemented)" + ) + def _fetch_blob(self, descriptor: bytes) -> bytes: """Send SQ_FETCHBLOB and read the SQ_BLOB stream until terminator.""" writer, buf = make_pdu_writer() @@ -690,6 +846,8 @@ class Cursor: reader.read_int() elif tag == MessageType.SQ_XACTSTAT: reader.read_exact(2 + 2 + 2) + elif tag == 98: # SQ_FILE — server orchestrates a file transfer + self._handle_sq_file(reader) elif tag == MessageType.SQ_ERR: self._raise_sq_err(reader) else: diff --git a/tests/test_smart_lob_read.py b/tests/test_smart_lob_read.py new file mode 100644 index 0000000..70f7e8e --- /dev/null +++ b/tests/test_smart_lob_read.py @@ -0,0 +1,227 @@ +"""Phase 10 integration tests — smart-LOB BLOB read via SQ_FILE / lotofile. + +Smart-LOB BLOBs return only a 72-byte locator in SELECT row data (Phase 9). +Phase 10 lets users retrieve the actual bytes via the +``lotofile(blob_col, '/tmp/X', 'client')`` SQL function — the server +orchestrates a ``SQ_FILE`` (98) protocol round-trip where the bytes +flow back over the wire and we intercept them in memory. + +Three APIs are exposed: + - ``cursor.execute("SELECT lotofile(...) FROM ...")`` followed by + ``cursor.blob_files[filename]`` for the low-level path. + - ``cursor.read_blob_column(sql, params)`` convenience wrapper. + +Test data is seeded via the JDBC reference client because writing +smart-LOBs from our driver still requires the deferred SQ_FPROUTINE ++ SQ_LODATA stack. +""" + +from __future__ import annotations + +import contextlib +import shutil +import subprocess +from collections.abc import Iterator +from pathlib import Path + +import pytest + +import informix_db +from tests.conftest import ConnParams + +pytestmark = pytest.mark.integration + + +def _connect(params: ConnParams) -> informix_db.Connection: + return informix_db.connect( + host=params.host, + port=params.port, + user=params.user, + password=params.password, + database=params.database, + server=params.server, + connect_timeout=10.0, + read_timeout=10.0, + autocommit=True, + ) + + +def _java_available() -> bool: + return ( + shutil.which("java") is not None + and Path("build/ifxjdbc.jar").exists() + ) + + +def _seed_blob(table: str, payload: bytes) -> None: + """Use JDBC to seed a BLOB row (since smart-LOB write needs Phase 11).""" + helper_dir = Path("build/tests/reference") + helper_dir.mkdir(parents=True, exist_ok=True) + src_path = Path("/tmp/p10/tests/reference/SeedBlob.java") + src_path.parent.mkdir(parents=True, exist_ok=True) + src_path.write_text( + 'package tests.reference;\n' + 'import java.sql.*;\n' + 'import java.io.*;\n' + 'import java.util.Base64;\n' + 'public class SeedBlob {\n' + ' public static void main(String[] args) throws Exception {\n' + ' String table = args[0];\n' + ' byte[] payload = Base64.getDecoder().decode(args[1]);\n' + ' Class.forName("com.informix.jdbc.IfxDriver");\n' + ' try (Connection c = DriverManager.getConnection(\n' + ' "jdbc:informix-sqli://127.0.0.1:9088/testdb:INFORMIXSERVER=informix",\n' + ' "informix", "in4mix")) {\n' + ' c.setAutoCommit(true);\n' + ' try (Statement s = c.createStatement()) {\n' + ' try { s.execute("DROP TABLE " + table); } catch (Exception e) {}\n' + ' s.execute("CREATE TABLE " + table + " (id INT, data BLOB)");\n' + ' }\n' + ' try (PreparedStatement ps = c.prepareStatement(\n' + ' "INSERT INTO " + table + " VALUES (1, ?)")) {\n' + ' ps.setBytes(1, payload);\n' + ' ps.executeUpdate();\n' + ' }\n' + ' }\n' + ' }\n' + '}\n' + ) + subprocess.run( + ["javac", "-cp", 
"build/ifxjdbc.jar", "-d", "build/", str(src_path)], + check=True, capture_output=True, + ) + import base64 + subprocess.run( + [ + "java", "-cp", "build/ifxjdbc.jar:build/", + "tests.reference.SeedBlob", + table, base64.b64encode(payload).decode(), + ], + check=True, capture_output=True, + ) + + +@pytest.fixture +def small_blob(logged_db_params: ConnParams) -> Iterator[str]: + """A BLOB table seeded with a small payload.""" + if not _java_available(): + pytest.skip("JDBC reference client unavailable") + table = "p10_small" + payload = b"hello phase 10 lotofile" + _seed_blob(table, payload) + try: + yield table + finally: + with _connect(logged_db_params) as conn: + cur = conn.cursor() + with contextlib.suppress(Exception): + cur.execute(f"DROP TABLE {table}") + + +@pytest.fixture +def big_blob(logged_db_params: ConnParams) -> Iterator[tuple[str, bytes]]: + """A BLOB table seeded with a multi-chunk (>1KB) payload.""" + if not _java_available(): + pytest.skip("JDBC reference client unavailable") + table = "p10_big" + # 30000 bytes — spans many SQ_FILE_WRITE chunks + payload = (b"X" * 10_000) + (b"Y" * 10_000) + (b"Z" * 10_000) + _seed_blob(table, payload) + try: + yield table, payload + finally: + with _connect(logged_db_params) as conn: + cur = conn.cursor() + with contextlib.suppress(Exception): + cur.execute(f"DROP TABLE {table}") + + +# -------- Low-level lotofile + blob_files -------- + + +def test_lotofile_low_level( + logged_db_params: ConnParams, small_blob: str +) -> None: + """``SELECT lotofile(...)`` populates ``cursor.blob_files``.""" + with _connect(logged_db_params) as conn: + cur = conn.cursor() + cur.execute( + f"SELECT lotofile(data, '/tmp/test_low', 'client') " + f"FROM {small_blob}" + ) + # The result column is the actual filename the server used + # (with a generated suffix). The bytes were captured via SQ_FILE. 
+ (fname,) = cur.fetchone() + assert fname.startswith("/tmp/test_low.") + assert fname in cur.blob_files + assert cur.blob_files[fname] == b"hello phase 10 lotofile" + + +def test_lotofile_multichunk( + logged_db_params: ConnParams, big_blob: tuple[str, bytes] +) -> None: + """30KB BLOB read across multiple SQ_FILE_WRITE chunks.""" + table, payload = big_blob + with _connect(logged_db_params) as conn: + cur = conn.cursor() + cur.execute( + f"SELECT lotofile(data, '/tmp/test_big', 'client') FROM {table}" + ) + cur.fetchone() + captured = next(iter(cur.blob_files.values())) + assert len(captured) == 30_000 + assert captured == payload + + +# -------- High-level read_blob_column helper -------- + + +def test_read_blob_column_simple( + logged_db_params: ConnParams, small_blob: str +) -> None: + """``cursor.read_blob_column`` returns the BLOB bytes directly.""" + with _connect(logged_db_params) as conn: + cur = conn.cursor() + data = cur.read_blob_column( + f"SELECT data FROM {small_blob} WHERE id = ?", (1,) + ) + assert data == b"hello phase 10 lotofile" + + +def test_read_blob_column_no_match( + logged_db_params: ConnParams, small_blob: str +) -> None: + """When the WHERE clause matches no rows, returns None.""" + with _connect(logged_db_params) as conn: + cur = conn.cursor() + data = cur.read_blob_column( + f"SELECT data FROM {small_blob} WHERE id = ?", (9999,) + ) + assert data is None + + +def test_read_blob_column_big( + logged_db_params: ConnParams, big_blob: tuple[str, bytes] +) -> None: + """30KB BLOB via the convenience helper.""" + table, payload = big_blob + with _connect(logged_db_params) as conn: + cur = conn.cursor() + data = cur.read_blob_column( + f"SELECT data FROM {table} WHERE id = ?", (1,) + ) + assert data == payload + + +def test_read_blob_column_validates_select( + logged_db_params: ConnParams, small_blob: str +) -> None: + """``read_blob_column`` rejects non-SELECT statements.""" + with _connect(logged_db_params) as conn: + cur = conn.cursor() + with pytest.raises(informix_db.ProgrammingError, match="SELECT"): + cur.read_blob_column( + f"DELETE FROM {small_blob}", () + ) + with pytest.raises(informix_db.ProgrammingError, match="FROM"): + cur.read_blob_column("SELECT 1", ())