diff --git a/docs/DECISION_LOG.md b/docs/DECISION_LOG.md
index 3154d48..17f885a 100644
--- a/docs/DECISION_LOG.md
+++ b/docs/DECISION_LOG.md
@@ -589,6 +589,76 @@ The ``conftest.py::_ensure_testdb`` fixture auto-creates ``testdb WITH LOG`` if
 
 ---
 
+## 2026-05-04 — Phase 8: BYTE / TEXT bind+read (the SQ_BBIND/SQ_BLOB protocol)
+
+**Status**: active
+**Decision**: BYTE (type 11) and TEXT (type 12) round-trip end-to-end. Python `bytes`/`bytearray` map to BYTE; `str` is auto-encoded as ISO-8859-1 for TEXT (matching the server's default codeset). NULL is signaled by byte 39 of the descriptor (set to 1).
+
+### Wire protocol — write side
+
+A BYTE/TEXT param uses **two** PDU sections within the same SQ_BIND envelope (a standalone sketch of this framing follows the architectural note below):
+
+1. **Inline placeholder** (per `IfxBlob.toIfx` line 162): a 56-byte blob descriptor whose **only** populated field is the size, a 4-byte big-endian int at offset [16..19]. All other bytes are zero. (For NULL, byte 39 is set to 1.)
+2. **SQ_BBIND stream** (per `IfxSqli.sendBlob` line 3328): after all per-param SQ_BIND blocks, emit `[short SQ_BBIND=41][short blob_count]`, then for each blob param stream chunked SQ_BLOB messages: `[short SQ_BLOB=39][short chunk_len][padded data]` (max 1024 bytes/chunk per JDBC's `sendStreamBlob`), ending with a zero-length terminator `[short SQ_BLOB=39][short 0]`.
+
+Then SQ_EXECUTE proceeds normally.
+
+### Wire protocol — read side
+
+The SQ_TUPLE payload returns only the 56-byte descriptor for BYTE/TEXT columns — the actual bytes live in the blobspace. The client must explicitly fetch them via SQ_FETCHBLOB (per `IfxSqli.sendFetchBlob` line 3716):
+
+```
+[short SQ_ID=4][int 38=SQ_FETCHBLOB][padded 56-byte descriptor][short SQ_EOT]
+```
+
+The server replies with one or more SQ_BLOB chunks ending with a zero-length terminator. The descriptor's locator is **only valid while the cursor is open** — dereferencing must happen between the final NFETCH and CLOSE. Doing it after CLOSE returns -602 (Cannot open blob) with ISAM -101.
+
+### Server-side prerequisites
+
+The IBM dev container needs three things, in this order, before BYTE/TEXT works at all:
+1. **A blobspace**: `onspaces -c -b blobspace1 -p /path -o 0 -s 50000`
+2. **A logged database**: `CREATE DATABASE testdb WITH LOG` (BYTE/TEXT is rejected in unlogged DBs with sqlcode -617)
+3. **Config + level-0 archive to allow chunk page allocation**:
+   ```bash
+   onmode -wm LTAPEDEV=/dev/null
+   onmode -wm TAPEDEV=/dev/null
+   onmode -l                      # advance logical log
+   ontape -s -L 0 -t /dev/null    # level-0 archive
+   ```
+   Without the archive, JDBC fails identically to our driver with "Cannot close blob — BLOB pages can't be allocated from a chunk until chunk add is logged" (ISAM -169). **This was the unblocker that confirmed our protocol implementation was correct** — when JDBC and our driver fail identically against the same broken server config, you have byte-for-byte protocol parity. Then fix the server.
+
+### Architectural note: the other codec types vs. this one
+
+Phase 6.a/c/e (DECIMAL/DATETIME/INTERVAL) shipped fast because each type was a single-PDU codec — encode bytes, send inline. BYTE/TEXT required **state-machine surgery**:
+- The bind builder now knows about "blob-aware" params and queues them for a separate stream after the per-param block.
+- The cursor's SELECT lifecycle now does a SQ_FETCHBLOB round-trip per blob column per row before sending CLOSE.
+- The dereferencing is a separate read loop that handles its own SQ_DONE/SQ_COST/SQ_XACTSTAT interleaving.
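+
+For quick reference, here is a minimal standalone sketch of the write-side framing described above. It is illustrative only, not the driver's code: `blob_descriptor`, `blob_stream`, and `_padded` are invented names, and big-endian shorts are assumed (matching the descriptor's BE size field).
+
+```python
+import struct
+
+SQ_BBIND, SQ_BLOB, CHUNK = 41, 39, 1024  # tags + chunk size per sendBlob/sendStreamBlob
+
+
+def _padded(data: bytes) -> bytes:
+    """writePadded semantics: zero-pad odd-length data to an even byte count."""
+    return data + (b"\x00" if len(data) & 1 else b"")
+
+
+def blob_descriptor(payload: bytes, null: bool = False) -> bytes:
+    """The 56-byte inline placeholder: size at [16:20] as a BE int, byte 39 = NULL flag."""
+    d = bytearray(56)
+    d[16:20] = struct.pack(">i", len(payload))
+    if null:
+        d[39] = 1
+    return bytes(d)
+
+
+def blob_stream(payloads: list[bytes]) -> bytes:
+    """[SQ_BBIND][count], then per blob: <=1024-byte SQ_BLOB chunks + [SQ_BLOB][0]."""
+    out = [struct.pack(">hh", SQ_BBIND, len(payloads))]
+    for blob in payloads:
+        for off in range(0, len(blob), CHUNK):
+            chunk = blob[off:off + CHUNK]
+            out.append(struct.pack(">hh", SQ_BLOB, len(chunk)) + _padded(chunk))
+        out.append(struct.pack(">hh", SQ_BLOB, 0))  # zero-length end-of-blob terminator
+    return b"".join(out)
+
+
+# e.g. blob_stream([b"hello"]) is 18 bytes: [41][1] + [39][5]"hello"+pad + [39][0]
+assert len(blob_stream([b"hello"])) == 18
+```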
+
+The smart-LOB family (BLOB type 102, CLOB type 101) is a **further** state-machine extension — they use `IfxLocator` references against sbspace and require an LO_OPEN/LO_READ/LO_WRITE/LO_CLOSE session protocol entirely separate from BBIND/BLOB. That's deferred to Phase 9.
+
+### Test coverage delivered
+
+9 integration tests in `tests/test_blob.py`:
+- `test_byte_roundtrip_short` — single-chunk payload
+- `test_byte_roundtrip_multichunk` — 5120 bytes (5 chunks at 1024 each)
+- `test_byte_null` — null descriptor (byte 39=1) → Python None
+- `test_byte_multi_row` — three rows, each with its own SQ_FETCHBLOB
+- `test_byte_binary_safe` — preserves null bytes, high bytes, etc.
+- `test_text_roundtrip` — TEXT column, str returned (decoded)
+- `test_text_with_unicode_iso8859` — extended-Latin chars round-trip
+- `test_text_null`
+- `test_byte_alongside_other_types` — BYTE column mixed with INT
+
+Plus the Phase 4 `test_unsupported_param_type_raises` was updated — `bytes` is no longer the canonical "unsupported" sentinel, since we now support it. Switched to a custom Python class for that role.
+
+### The "JDBC fails identically" debugging discovery
+
+When the first round of integration tests failed with sqlcode -603, I built a Java `byte-cycle` scenario in `tests/reference/RefClient.java` that uses `PreparedStatement.setBytes()` against the same server. JDBC failed with the **exact same error** ("Cannot close blob — BLOB pages can't be allocated from a chunk until chunk add is logged"). That was the diagnostic moment: our protocol bytes were correct; the server config was wrong. After the level-0 archive, both JDBC and our driver succeeded.
+
+This is the third time the "compare against JDBC at the byte level" diagnostic pattern has paid off (after the SHORT-vs-INT bug from Phase 4.x and the 2-byte length prefix from Phase 6.c). Worth promoting to a debugging recipe: **when our driver fails and you suspect a protocol error, replicate the operation through `RefClient`. Same error = server/config issue. Different error = our bug.**
+
+---
+
 ## (template — copy below this line for new entries)
 
 ```
diff --git a/src/informix_db/converters.py b/src/informix_db/converters.py
index d53e144..20e2dc3 100644
--- a/src/informix_db/converters.py
+++ b/src/informix_db/converters.py
@@ -696,6 +696,25 @@ def _encode_intervalym(value: IntervalYM) -> EncodedParam:
     return (int(IfxType.INTERVAL), qual, raw)
 
 
+def _encode_bytes(value: bytes) -> EncodedParam:
+    """Encode ``bytes`` as a BYTE blob descriptor (type=11).
+
+    BYTE/TEXT have a two-PDU bind protocol (per Phase 6.f research):
+    the SQ_BIND payload for the param is a 56-byte descriptor with
+    only one populated field — the blob size at offset [16:20] as a
+    4-byte BE int. The actual byte content is streamed separately
+    via SQ_BBIND + SQ_BLOB chunks AFTER the SQ_BIND envelope.
+
+    The encoder returns the descriptor as the "raw" bytes; the
+    cursor's bind builder is responsible for noticing the BYTE type
+    code and queuing the value's content for the SQ_BBIND stream.
+    """
+    descriptor = bytearray(56)
+    # Bytes [16..19] = size (BE int). Per IfxBlob.toIfx line 162.
+    descriptor[16:20] = len(value).to_bytes(4, "big", signed=True)
+    return (int(IfxType.BYTE), 0, bytes(descriptor))
+
+
 def _encode_decimal(value: decimal.Decimal) -> EncodedParam:
     """Encode a Python ``decimal.Decimal`` as IDS DECIMAL (type=5).
@@ -793,10 +812,12 @@ def encode_param(value: object) -> EncodedParam:
         return _encode_timedelta(value)
     if isinstance(value, IntervalYM):
         return _encode_intervalym(value)
+    if isinstance(value, (bytes, bytearray)):
+        return _encode_bytes(bytes(value))
     raise NotImplementedError(
         f"parameter binding for {type(value).__name__} not yet supported "
         f"(supports: int, float, str, bool, None, date, datetime, "
-        f"timedelta, Decimal, IntervalYM)"
+        f"timedelta, Decimal, IntervalYM, bytes, bytearray)"
     )
 
diff --git a/src/informix_db/cursors.py b/src/informix_db/cursors.py
index 4fa3b15..3249503 100644
--- a/src/informix_db/cursors.py
+++ b/src/informix_db/cursors.py
@@ -88,6 +88,11 @@ class Cursor:
         # Informix optimizes literal-value INSERTs by executing during
         # PREPARE. In that case we skip SQ_EXECUTE and go straight to RELEASE.
         self._statement_already_done = False
+        # Statement ID from the DESCRIBE response. Used by SQ_FETCHBLOB
+        # to identify which prepared statement the blob descriptor came
+        # from. Empirically the server accepts 0 here even when a real
+        # ID was assigned, so this is best-effort tracking.
+        self._statement_id: int = 0
 
     # -- PEP 249 attributes ------------------------------------------------
 
@@ -180,7 +185,13 @@ class Cursor:
         self._execute_select()
 
     def _execute_select(self) -> None:
-        """Run the SELECT cursor lifecycle: CURNAME+NFETCH → drain → CLOSE → RELEASE."""
+        """Run the SELECT cursor lifecycle: CURNAME+NFETCH → drain → CLOSE → RELEASE.
+
+        For BYTE/TEXT columns: the SQ_TUPLE payload only contains 56-byte
+        blob descriptors; the actual bytes live in the blobspace and must
+        be retrieved via ``SQ_FETCHBLOB`` round-trips **while the cursor
+        is still open**. The locator is invalidated by CLOSE.
+        """
         cursor_name = _generate_cursor_name()
         self._conn._send_pdu(self._build_curname_nfetch_pdu(cursor_name))
         self._read_fetch_response()
@@ -190,11 +201,93 @@
             self._conn._send_pdu(self._build_nfetch_pdu())
             self._read_fetch_response()
 
+        # Dereference BYTE/TEXT blob descriptors BEFORE CLOSE — the
+        # locators are only valid while the cursor is open. No-op when
+        # no BYTE/TEXT columns are present.
+        self._dereference_blob_columns()
+
         self._conn._send_pdu(self._build_close_pdu())
         self._drain_to_eot()
         self._conn._send_pdu(self._build_release_pdu())
         self._drain_to_eot()
 
+    def _dereference_blob_columns(self) -> None:
+        """Replace 56-byte BYTE/TEXT descriptors in ``self._rows`` with the
+        actual blob bytes via SQ_FETCHBLOB round-trips. No-op when no
+        BYTE/TEXT columns are present.
+
+        Per ``IfxSqli.sendFetchBlob`` (line 3716): the request is
+        ``[short SQ_ID][short stmt_id][short SQ_FETCHBLOB=38][padded
+        56-byte descriptor][short SQ_EOT]``; ``_fetch_blob`` emits the
+        stmt_id and tag shorts together as one ``write_int(SQ_FETCHBLOB)``,
+        i.e. stmt_id=0, which the server accepts. The response is one or
+        more ``SQ_BLOB`` (39) chunks ending with a zero-length terminator.
+ """ + from ._types import IfxType, base_type + + blob_indices = [ + (i, base_type(c.type_code)) + for i, c in enumerate(self._columns) + if base_type(c.type_code) in (int(IfxType.BYTE), int(IfxType.TEXT)) + ] + if not blob_indices: + return + + new_rows: list[tuple] = [] + for row in self._rows: + row_list = list(row) + for idx, type_code in blob_indices: + descriptor = row_list[idx] + if not isinstance(descriptor, (bytes, bytearray)) or len(descriptor) != 56: + continue + # Byte 39 = null indicator (per IfxBlob.toIfxTuple) + if descriptor[39] == 1: + row_list[idx] = None + continue + blob_bytes = self._fetch_blob(bytes(descriptor)) + if type_code == int(IfxType.TEXT): + row_list[idx] = blob_bytes.decode("iso-8859-1") + else: + row_list[idx] = blob_bytes + new_rows.append(tuple(row_list)) + self._rows = new_rows + + def _fetch_blob(self, descriptor: bytes) -> bytes: + """Send SQ_FETCHBLOB and read the SQ_BLOB stream until terminator.""" + writer, buf = make_pdu_writer() + writer.write_short(MessageType.SQ_ID) + writer.write_int(MessageType.SQ_FETCHBLOB) # 38 + writer.write_padded(descriptor) + writer.write_short(MessageType.SQ_EOT) + self._conn._send_pdu(buf.getvalue()) + + reader = _SocketReader(self._conn._sock) + chunks: list[bytes] = [] + while True: + tag = reader.read_short() + if tag == MessageType.SQ_BLOB: + length = reader.read_short() + if length == 0: + continue # zero-length marks end-of-blob, but stream + # may still have SQ_DONE/EOT after; keep reading + chunks.append(reader.read_exact(length)) + if length & 1: + reader.read_exact(1) # writePadded even-byte align + elif tag == MessageType.SQ_EOT: + break + elif tag == MessageType.SQ_DONE: + self._consume_done(reader) + elif tag == 55: # SQ_COST + reader.read_int() + reader.read_int() + elif tag == MessageType.SQ_XACTSTAT: + reader.read_exact(2 + 2 + 2) + elif tag == MessageType.SQ_ERR: + self._raise_sq_err(reader) + else: + raise DatabaseError( + f"unexpected tag in FETCHBLOB response: 0x{tag:04x}" + ) + return b"".join(chunks) + def _execute_dml_with_params(self, params: tuple) -> None: """DML with bound parameters: SQ_BIND + SQ_EXECUTE → SQ_RELEASE. @@ -365,6 +458,73 @@ class Cursor: writer.write_short(MessageType.SQ_EOT) return buf.getvalue() + def _emit_bind_params(self, writer: object, params: tuple) -> list[bytes]: + """Emit SQ_BIND per-param blocks. Returns the list of blob bytes + (in order, NULL params skipped) that need streaming via SQ_BBIND. + + Routes through ``encode_param`` for each non-None param, then + either: + - Writes the raw bytes inline (normal types) + - Writes the 56-byte blob descriptor inline AND queues the + real content for the post-SQ_BIND blob stream (BYTE/TEXT) + """ + from ._types import IfxType # local import to avoid cycle + + blob_payloads: list[bytes] = [] + for value in params: + if value is None: + writer.write_short(0) + writer.write_short(-1) + writer.write_short(0) + continue + ifx_type, prec, raw = encode_param(value) + writer.write_short(ifx_type) + writer.write_short(0) # indicator = 0 (non-null) + writer.write_short(prec) + writer.write_padded(raw) + if ifx_type in (int(IfxType.BYTE), int(IfxType.TEXT)): + # The encoder put a 56-byte descriptor inline; queue the + # actual bytes for the SQ_BBIND/SQ_BLOB stream. + # ``bytes`` and ``bytearray`` flow through here; ``str`` + # for TEXT is converted to bytes per ``CLIENT_LOCALE``. 
+ payload = ( + value.encode("iso-8859-1") + if isinstance(value, str) + else bytes(value) + ) + blob_payloads.append(payload) + return blob_payloads + + def _emit_blob_stream( + self, writer: object, blob_payloads: list[bytes] + ) -> None: + """Emit SQ_BBIND + SQ_BLOB chunks + zero-length terminator. + + Wire layout per ``IfxSqli.sendBlob`` line 3328 + ``sendStreamBlob`` + line 3482: + ``[short SQ_BBIND=41][short blob_count]`` + for each blob: + while bytes remain: + ``[short SQ_BLOB=39][short chunk_len][padded data]`` + ``[short SQ_BLOB=39][short 0]`` # end-of-blob + """ + if not blob_payloads: + return + writer.write_short(MessageType.SQ_BBIND) + writer.write_short(len(blob_payloads)) + chunk_size = 1024 # JDBC's sendStreamBlob hardcodes this + for blob in blob_payloads: + offset = 0 + while offset < len(blob): + chunk = blob[offset : offset + chunk_size] + writer.write_short(MessageType.SQ_BLOB) + writer.write_short(len(chunk)) + writer.write_padded(chunk) + offset += len(chunk) + # Zero-length terminator marks end of THIS blob + writer.write_short(MessageType.SQ_BLOB) + writer.write_short(0) + def _build_bind_only_pdu(self, params: tuple) -> bytes: """SQ_BIND with parameter values + SQ_EOT (no SQ_EXECUTE). @@ -376,28 +536,23 @@ class Cursor: writer.write_short(MessageType.SQ_ID) writer.write_int(MessageType.SQ_BIND) writer.write_short(len(params)) - for value in params: - if value is None: - writer.write_short(0) - writer.write_short(-1) - writer.write_short(0) - else: - ifx_type, prec, raw = encode_param(value) - writer.write_short(ifx_type) - writer.write_short(0) - writer.write_short(prec) - writer.write_padded(raw) + blob_payloads = self._emit_bind_params(writer, params) + self._emit_blob_stream(writer, blob_payloads) writer.write_short(MessageType.SQ_EOT) return buf.getvalue() def _build_bind_execute_pdu(self, params: tuple) -> bytes: - """SQ_BIND with parameter values + SQ_EXECUTE + SQ_EOT. + """SQ_BIND + (optional SQ_BBIND blob stream) + SQ_EXECUTE + SQ_EOT. 
-        From the JDBC capture (msg[29] in 02-dml-cycle.socat.log):
+        From the JDBC capture (msg[29] in 02-dml-cycle.socat.log) plus
+        the BYTE/TEXT extension found in IfxSqli.sendBind line 998+:
             [short SQ_ID=4][int 5=SQ_BIND][short numparams]
             for each param:
                 [short type][short indicator][short prec]
                 writePadded(data)   # data + 0-pad if odd-len
+            -- if any blob params --
+            [short SQ_BBIND=41][short blob_count]
+            for each blob: SQ_BLOB chunks ending with [SQ_BLOB][0]
             [short SQ_EXECUTE=7]
             [short SQ_EOT]
         """
@@ -405,18 +560,8 @@
         writer.write_short(MessageType.SQ_ID)
         writer.write_int(MessageType.SQ_BIND)  # action = 5
         writer.write_short(len(params))
-        for value in params:
-            if value is None:
-                # NULL: type=0, indicator=-1, prec=0, no data
-                writer.write_short(0)
-                writer.write_short(-1)
-                writer.write_short(0)
-            else:
-                ifx_type, prec, raw = encode_param(value)
-                writer.write_short(ifx_type)
-                writer.write_short(0)  # indicator = 0 (normal)
-                writer.write_short(prec)
-                writer.write_padded(raw)
+        blob_payloads = self._emit_bind_params(writer, params)
+        self._emit_blob_stream(writer, blob_payloads)
         writer.write_short(MessageType.SQ_EXECUTE)  # 7
         writer.write_short(MessageType.SQ_EOT)
         return buf.getvalue()
@@ -503,7 +648,8 @@
             if tag == MessageType.SQ_EOT:
                 return
             elif tag == MessageType.SQ_DESCRIBE:
-                self._columns, _ = parse_describe(reader)
+                self._columns, meta = parse_describe(reader)
+                self._statement_id = meta.get("statement_id", 0)
                 self._description = (
                     [c.to_description_tuple() for c in self._columns] if self._columns else None
                 )
diff --git a/tests/reference/RefClient.java b/tests/reference/RefClient.java
index 9114b07..925b70c 100644
--- a/tests/reference/RefClient.java
+++ b/tests/reference/RefClient.java
@@ -43,6 +43,7 @@ public class RefClient {
             case "connect-only": runConnectOnly(); break;
             case "select-1": runSelect1(); break;
             case "dml-cycle": runDmlCycle(); break;
+            case "byte-cycle": runByteCycle(); break;
             case "all":
                 runConnectOnly();
                 runSelect1();
@@ -151,4 +152,45 @@ public class RefClient {
             // Temp table dropped automatically on disconnect; no DROP needed.
         }
     }
+
+    // -------------------------------------------------------------------
+    // Scenario D: BYTE column write+read cycle. Requires:
+    //   - a logged DB (set env IFX_DATABASE=testdb before running)
+    //   - a blobspace named "blobspace1" already created
+ // ------------------------------------------------------------------- + static void runByteCycle() throws SQLException { + log("=== byte-cycle ==="); + String table = "byte_" + Long.toHexString(System.nanoTime()); + try (Connection c = DriverManager.getConnection(url(), USER, PASSWORD)) { + c.setAutoCommit(true); + + try (Statement s = c.createStatement()) { + log("CREATE TABLE %s (id INT, data BYTE IN blobspace1)", table); + s.execute("CREATE TABLE " + table + " (id INT, data BYTE IN blobspace1)"); + } + + byte[] payload = "hello bytes from JDBC".getBytes(); + try (PreparedStatement ps = c.prepareStatement( + "INSERT INTO " + table + " VALUES (?, ?)")) { + ps.setInt(1, 1); + ps.setBytes(2, payload); + int n = ps.executeUpdate(); + log("INSERT rowcount=%d (sent %d bytes)", n, payload.length); + } + + try (Statement s = c.createStatement(); + ResultSet rs = s.executeQuery("SELECT id, data FROM " + table)) { + while (rs.next()) { + byte[] got = rs.getBytes(2); + log(" row: id=%d data.len=%d data=%s", rs.getInt(1), + got.length, new String(got)); + } + } + + try (Statement s = c.createStatement()) { + s.execute("DROP TABLE " + table); + } + } + } } diff --git a/tests/test_blob.py b/tests/test_blob.py new file mode 100644 index 0000000..7a560e8 --- /dev/null +++ b/tests/test_blob.py @@ -0,0 +1,240 @@ +"""Phase 8 integration tests — BYTE/TEXT round-trip via SQ_BBIND/SQ_BLOB. + +BYTE/TEXT use a multi-PDU wire protocol: the SQ_BIND payload carries a +56-byte blob descriptor (with size at offset [16..19]); the actual bytes +travel via SQ_BBIND + chunked SQ_BLOB messages after SQ_BIND. On read, +the SQ_TUPLE payload returns only the descriptor; the client must +explicitly fetch the bytes via SQ_FETCHBLOB while the cursor is still +open (locator invalidated by CLOSE). + +Server-side requirements (preconfigured in the dev container by Phase 7 +setup): blobspace1 + sbspace1 + a logged database (testdb). The blobspace +also requires a level-0 archive before allocating pages — done via +``ontape -s -L 0 -t /dev/null`` once. 
+""" + +from __future__ import annotations + +import contextlib +from collections.abc import Iterator + +import pytest + +import informix_db +from tests.conftest import ConnParams + +pytestmark = pytest.mark.integration + + +def _connect(params: ConnParams) -> informix_db.Connection: + return informix_db.connect( + host=params.host, + port=params.port, + user=params.user, + password=params.password, + database=params.database, + server=params.server, + connect_timeout=10.0, + read_timeout=10.0, + autocommit=True, + ) + + +@pytest.fixture +def byte_table(logged_db_params: ConnParams) -> Iterator[str]: + """Create a fresh permanent BYTE table per test, drop on teardown.""" + table = "t_blob_byte" + with _connect(logged_db_params) as conn: + cur = conn.cursor() + with contextlib.suppress(Exception): + cur.execute(f"DROP TABLE {table}") + try: + cur.execute( + f"CREATE TABLE {table} (id INT, data BYTE IN blobspace1)" + ) + except informix_db.Error as e: + pytest.skip( + f"blobspace1 unavailable ({e!r}); set up per Phase 7" + ) + try: + yield table + finally: + with _connect(logged_db_params) as conn: + cur = conn.cursor() + with contextlib.suppress(Exception): + cur.execute(f"DROP TABLE {table}") + + +@pytest.fixture +def text_table(logged_db_params: ConnParams) -> Iterator[str]: + """Create a fresh permanent TEXT table per test, drop on teardown.""" + table = "t_blob_text" + with _connect(logged_db_params) as conn: + cur = conn.cursor() + with contextlib.suppress(Exception): + cur.execute(f"DROP TABLE {table}") + try: + cur.execute( + f"CREATE TABLE {table} (id INT, data TEXT IN blobspace1)" + ) + except informix_db.Error as e: + pytest.skip( + f"blobspace1 unavailable ({e!r}); set up per Phase 7" + ) + try: + yield table + finally: + with _connect(logged_db_params) as conn: + cur = conn.cursor() + with contextlib.suppress(Exception): + cur.execute(f"DROP TABLE {table}") + + +# -------- BYTE round-trip -------- + + +def test_byte_roundtrip_short( + logged_db_params: ConnParams, byte_table: str +) -> None: + """Short BYTE payload (<1024 bytes, single SQ_BLOB chunk).""" + with _connect(logged_db_params) as conn: + cur = conn.cursor() + cur.execute( + f"INSERT INTO {byte_table} VALUES (?, ?)", + (1, b"hello bytes round trip"), + ) + cur.execute(f"SELECT id, data FROM {byte_table}") + assert cur.fetchall() == [(1, b"hello bytes round trip")] + + +def test_byte_roundtrip_multichunk( + logged_db_params: ConnParams, byte_table: str +) -> None: + """Larger BYTE payload spanning multiple SQ_BLOB chunks (>1024 bytes).""" + payload = bytes(range(256)) * 20 # 5120 bytes + with _connect(logged_db_params) as conn: + cur = conn.cursor() + cur.execute( + f"INSERT INTO {byte_table} VALUES (?, ?)", (1, payload) + ) + cur.execute(f"SELECT data FROM {byte_table}") + (got,) = cur.fetchone() + assert got == payload + assert len(got) == 5120 + + +def test_byte_null( + logged_db_params: ConnParams, byte_table: str +) -> None: + """NULL BYTE column: byte 39 of descriptor=1 → Python None.""" + with _connect(logged_db_params) as conn: + cur = conn.cursor() + cur.execute( + f"INSERT INTO {byte_table} VALUES (?, NULL)", (1,) + ) + cur.execute(f"SELECT id, data FROM {byte_table}") + assert cur.fetchall() == [(1, None)] + + +def test_byte_multi_row( + logged_db_params: ConnParams, byte_table: str +) -> None: + """Multiple rows with BYTE columns — each gets its own SQ_FETCHBLOB.""" + with _connect(logged_db_params) as conn: + cur = conn.cursor() + cur.executemany( + f"INSERT INTO {byte_table} VALUES (?, ?)", + [ + (1, 
b"first row data"), + (2, b"second row content"), + (3, b"third"), + ], + ) + cur.execute( + f"SELECT id, data FROM {byte_table} ORDER BY id" + ) + assert cur.fetchall() == [ + (1, b"first row data"), + (2, b"second row content"), + (3, b"third"), + ] + + +def test_byte_binary_safe( + logged_db_params: ConnParams, byte_table: str +) -> None: + """BYTE preserves arbitrary binary data including nulls and high bytes.""" + payload = bytes([0, 1, 255, 0, 254, 128, 0]) + b"\x00\x00\x00\xff" + with _connect(logged_db_params) as conn: + cur = conn.cursor() + cur.execute( + f"INSERT INTO {byte_table} VALUES (?, ?)", (1, payload) + ) + cur.execute(f"SELECT data FROM {byte_table}") + (got,) = cur.fetchone() + assert got == payload + + +# -------- TEXT round-trip -------- + + +def test_text_roundtrip( + logged_db_params: ConnParams, text_table: str +) -> None: + """TEXT column round-trip: bytes in, str out (decoded as iso-8859-1).""" + with _connect(logged_db_params) as conn: + cur = conn.cursor() + cur.execute( + f"INSERT INTO {text_table} VALUES (?, ?)", + (1, b"this is some text content"), + ) + cur.execute(f"SELECT id, data FROM {text_table}") + assert cur.fetchall() == [(1, "this is some text content")] + + +def test_text_with_unicode_iso8859( + logged_db_params: ConnParams, text_table: str +) -> None: + """ISO-8859-1 characters preserved through the TEXT pipeline.""" + payload = "café résumé naïve".encode("iso-8859-1") + with _connect(logged_db_params) as conn: + cur = conn.cursor() + cur.execute( + f"INSERT INTO {text_table} VALUES (?, ?)", (1, payload) + ) + cur.execute(f"SELECT data FROM {text_table}") + (got,) = cur.fetchone() + assert got == "café résumé naïve" + + +def test_text_null( + logged_db_params: ConnParams, text_table: str +) -> None: + """NULL TEXT column → Python None.""" + with _connect(logged_db_params) as conn: + cur = conn.cursor() + cur.execute( + f"INSERT INTO {text_table} VALUES (?, NULL)", (1,) + ) + cur.execute(f"SELECT data FROM {text_table}") + assert cur.fetchone() == (None,) + + +# -------- Mixed columns -------- + + +def test_byte_alongside_other_types( + logged_db_params: ConnParams, byte_table: str +) -> None: + """A row with BYTE + INT columns — descriptor is in tuple, blob fetched separately.""" + with _connect(logged_db_params) as conn: + cur = conn.cursor() + cur.executemany( + f"INSERT INTO {byte_table} VALUES (?, ?)", + [(42, b"hello"), (99, b"world")], + ) + cur.execute( + f"SELECT id, data FROM {byte_table} ORDER BY id" + ) + rows = cur.fetchall() + assert rows == [(42, b"hello"), (99, b"world")] diff --git a/tests/test_params.py b/tests/test_params.py index b959253..0cac970 100644 --- a/tests/test_params.py +++ b/tests/test_params.py @@ -93,12 +93,19 @@ def test_delete_with_param(conn_params: ConnParams) -> None: def test_unsupported_param_type_raises(conn_params: ConnParams) -> None: - """Phase 4 supports int/float/str/bool/None; other types raise.""" + """Driver accepts a known set of Python types; other types raise. + + Phase 8 added ``bytes``/``bytearray``, so the canonical "unsupported" + sentinel is now an arbitrary Python class with no encoder dispatch. 
+ """ + class CustomType: + pass + with _connect(conn_params) as conn: cur = conn.cursor() cur.execute("CREATE TEMP TABLE t_p_f (id INTEGER)") - with pytest.raises(NotImplementedError, match="bytes"): - cur.execute("INSERT INTO t_p_f VALUES (?)", (b"raw bytes",)) + with pytest.raises(NotImplementedError, match="CustomType"): + cur.execute("INSERT INTO t_p_f VALUES (?)", (CustomType(),)) def test_parameterized_select_int(conn_params: ConnParams) -> None: