Phase 8: BYTE/TEXT bind+read via SQ_BBIND/SQ_BLOB/SQ_FETCHBLOB
Implements end-to-end round-trip for BYTE (type 11) and TEXT (type 12)
columns. Python bytes/bytearray map to BYTE; str is auto-encoded as
ISO-8859-1 for TEXT.
Wire protocol — write side:
* SQ_BIND payload carries a 56-byte blob descriptor with size at offset
[16..19] (per IfxBlob.toIfx). NULL is byte 39=1.
* After all per-param blocks, SQ_BBIND (41) declares blob count, then
chunked SQ_BLOB (39) messages stream the actual bytes (max 1024
bytes/chunk per JDBC), terminated by zero-length SQ_BLOB.
* Then SQ_EXECUTE proceeds normally.
Wire protocol — read side:
* SQ_TUPLE returns only the 56-byte descriptor; actual bytes live in
the blobspace.
* For each BYTE/TEXT column in each row, send SQ_FETCHBLOB with the
descriptor and read SQ_BLOB chunks until zero-length terminator.
* The locator is only valid while the cursor is open — must dereference
BEFORE sending CLOSE. Doing it after returns -602 (Cannot open blob).
Server-side prerequisites (one-time setup):
1. blobspace: onspaces -c -b blobspace1 -p /path -o 0 -s 50000
2. logged DB: CREATE DATABASE testdb WITH LOG
3. config + archive:
onmode -wm LTAPEDEV=/dev/null
onmode -wm TAPEDEV=/dev/null
onmode -l
ontape -s -L 0 -t /dev/null
Without #3, JDBC fails identically to our driver with "BLOB pages can't
be allocated from a chunk until chunk add is logged". This identical
failure was the diagnostic confirmation that our protocol bytes were
correct — same server response = byte-for-byte parity.
Tests: 9 integration tests in tests/test_blob.py — single-chunk,
multi-chunk (5120 bytes), NULL, multi-row, binary-safe, TEXT roundtrip,
ISO-8859-1, NULL TEXT, mixed columns. Plus the Phase 4
test_unsupported_param_type_raises was updated since bytes is no longer
the canonical unsupported type — switched to a custom class.
Total: 53 unit + 107 integration = 160 tests.
The smart-LOB family (BLOB/CLOB) is a separate state-machine extension
deferred to Phase 9 — it uses IfxLocator + LO_OPEN/LO_READ session
protocol against sbspace, not the BBIND/BLOB stream.
parent 1c19c71cb6 · commit 52259f0152
@@ -589,6 +589,76 @@ The ``conftest.py::_ensure_testdb`` fixture auto-creates ``testdb WITH LOG`` if

---
## 2026-05-04 — Phase 8: BYTE / TEXT bind+read (the SQ_BBIND/SQ_BLOB protocol)

**Status**: active

**Decision**: BYTE (type 11) and TEXT (type 12) round-trip end-to-end. Python `bytes`/`bytearray` map to BYTE; `str` is auto-encoded as ISO-8859-1 for TEXT (matching the server's default codeset). NULL is signaled by byte 39 of the descriptor.
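The bytes/str mapping above can be sketched as a tiny dispatch. This is a minimal sketch: `classify` and the bare type-code constants are illustrative names, not the driver's API; the real path goes through `encode_param`.

```python
# Informix type codes for the two simple-large-object column types.
IFX_BYTE, IFX_TEXT = 11, 12

def classify(value: object) -> tuple[int, bytes]:
    """Map a Python value to (wire type code, payload bytes):
    bytes/bytearray -> BYTE (11); str -> TEXT (12) as ISO-8859-1."""
    if isinstance(value, (bytes, bytearray)):
        return IFX_BYTE, bytes(value)
    if isinstance(value, str):
        return IFX_TEXT, value.encode("iso-8859-1")
    raise TypeError(f"not a blob-capable type: {type(value).__name__}")
```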
### Wire protocol — write side

A BYTE/TEXT param uses **two** PDU sections within the same SQ_BIND envelope:

1. **Inline placeholder** (per `IfxBlob.toIfx` line 162): a 56-byte blob descriptor with **only** the size at offset [16..19] as a 4-byte big-endian int. All other bytes are zero. (For NULL, byte 39 is set to 1.)

2. **SQ_BBIND stream** (per `IfxSqli.sendBlob` line 3328): after all per-param SQ_BIND blocks, emit `[short SQ_BBIND=41][short blob_count]`, then for each blob param stream chunked SQ_BLOB messages: `[short SQ_BLOB=39][short chunk_len][padded data]` (max 1024 bytes/chunk per JDBC's `sendStreamBlob`), ending with a zero-length terminator `[short SQ_BLOB=39][short 0]`.

Then SQ_EXECUTE proceeds normally.
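The two sections can be sketched as standalone framing helpers. A minimal sketch assuming big-endian shorts; `blob_descriptor` and `blob_stream` are illustrative names, not the driver's API:

```python
import struct

SQ_BBIND, SQ_BLOB = 41, 39
CHUNK = 1024  # JDBC's sendStreamBlob chunk size

def blob_descriptor(size: int, null: bool = False) -> bytes:
    """56-byte inline placeholder: size at [16:20] as a BE int, byte 39 = null flag."""
    d = bytearray(56)
    d[16:20] = struct.pack(">i", size)
    if null:
        d[39] = 1
    return bytes(d)

def blob_stream(payloads: list[bytes]) -> bytes:
    """[SQ_BBIND][count], then per blob: SQ_BLOB chunks + zero-length terminator."""
    out = bytearray(struct.pack(">hh", SQ_BBIND, len(payloads)))
    for blob in payloads:
        for off in range(0, len(blob), CHUNK):
            chunk = blob[off:off + CHUNK]
            out += struct.pack(">hh", SQ_BLOB, len(chunk))
            out += chunk + (b"\x00" if len(chunk) % 2 else b"")  # writePadded: even align
        out += struct.pack(">hh", SQ_BLOB, 0)  # end-of-blob marker
    return bytes(out)
```

A 5120-byte payload therefore frames as five full 1024-byte chunks plus the terminator, matching the multi-chunk test below.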
### Wire protocol — read side

The SQ_TUPLE payload returns only the 56-byte descriptor for BYTE/TEXT columns — the actual bytes live in the blobspace. The client must explicitly fetch via SQ_FETCHBLOB (per `IfxSqli.sendFetchBlob` line 3716):

```
[short SQ_ID=4][int 38=SQ_FETCHBLOB][padded 56-byte descriptor][short SQ_EOT]
```

The server replies with one or more SQ_BLOB chunks ending with a zero-length terminator. The descriptor's locator is **only valid while the cursor is open** — the dereferencing must happen between the final NFETCH and CLOSE. Doing it after CLOSE returns -602 (Cannot open blob) with ISAM -101.
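The chunk reassembly can be sketched over an already-buffered reply. A simplified parser under the framing above; the real read loop also drains SQ_DONE/SQ_COST/SQ_XACTSTAT messages interleaved with the chunks:

```python
import struct

SQ_BLOB = 39

def read_blob_chunks(buf: bytes) -> bytes:
    """Reassemble a FETCHBLOB reply: SQ_BLOB chunks until the zero-length
    terminator, honoring writePadded even-byte alignment."""
    chunks, pos = [], 0
    while pos + 4 <= len(buf):
        tag, length = struct.unpack_from(">hh", buf, pos)
        pos += 4
        if tag != SQ_BLOB or length == 0:
            break  # terminator (or trailing SQ_DONE/SQ_EOT in real traffic)
        chunks.append(buf[pos:pos + length])
        pos += length + (length & 1)  # skip pad byte after an odd-length chunk
    return b"".join(chunks)
```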
### Server-side prerequisites

The IBM dev container needs three things, in this order, before BYTE/TEXT works at all:

1. **A blobspace**: `onspaces -c -b blobspace1 -p /path -o 0 -s 50000`

2. **A logged database**: `CREATE DATABASE testdb WITH LOG` (BYTE/TEXT rejected in unlogged DBs with sqlcode -617)

3. **Config + level-0 archive to allow chunk page allocation**:

```bash
onmode -wm LTAPEDEV=/dev/null
onmode -wm TAPEDEV=/dev/null
onmode -l                      # advance logical log
ontape -s -L 0 -t /dev/null    # level-0 archive
```

Without the archive, JDBC fails identically to our driver with "Cannot close blob — BLOB pages can't be allocated from a chunk until chunk add is logged" (ISAM -169). **This was the unblocker that confirmed our protocol implementation was correct** — when JDBC and our driver fail identically against the same broken server config, you've got byte-for-byte protocol parity. Then fix the server.
### Architectural note: single-PDU codecs vs. this one

Phase 6.a/c/e (DECIMAL/DATETIME/INTERVAL) shipped fast because each type was a single-PDU codec — encode bytes, send inline. BYTE/TEXT required **state-machine surgery**:

- The bind builder now knows about "blob-aware" params and queues them for a separate stream after the per-param block.

- The cursor's SELECT lifecycle now does an SQ_FETCHBLOB round-trip per blob column per row before sending CLOSE.

- The dereferencing is a separate read loop that handles its own SQ_DONE/SQ_COST/SQ_XACTSTAT interleaving.

The smart-LOB family (BLOB type 102, CLOB type 101) is a **further** state-machine extension — they use `IfxLocator` references against sbspace and require an LO_OPEN/LO_READ/LO_WRITE/LO_CLOSE session protocol entirely separate from BBIND/BLOB. That's deferred to Phase 9.
### Test coverage delivered

9 integration tests in `tests/test_blob.py`:

- `test_byte_roundtrip_short` — single-chunk payload
- `test_byte_roundtrip_multichunk` — 5120 bytes (5 chunks at 1024 each)
- `test_byte_null` — null descriptor (byte 39=1) → Python None
- `test_byte_multi_row` — three rows, each with its own SQ_FETCHBLOB
- `test_byte_binary_safe` — preserves null bytes, high bytes, etc.
- `test_text_roundtrip` — TEXT column, str returned (decoded)
- `test_text_with_unicode_iso8859` — extended-Latin chars round-trip
- `test_text_null`
- `test_byte_alongside_other_types` — BYTE column mixed with INT

Plus the Phase 4 `test_unsupported_param_type_raises` was updated — `bytes` is no longer the canonical "unsupported" sentinel, since we now support it. It was switched to a custom Python class for that role.
### The "JDBC fails identically" debugging discovery

When the first round of integration tests failed with sqlcode -603, I built a Java `byte-cycle` scenario in `tests/reference/RefClient.java` that uses `PreparedStatement.setBytes()` against the same server. JDBC failed with the **exact same error** ("Cannot close blob — chunk add is logged"). That was the diagnostic moment: our protocol bytes were correct; the server config was wrong. After the level-0 archive, both JDBC and our driver succeeded.

This is the third instance of the "compare against JDBC at the byte level" diagnostic pattern paying off (after the SHORT-vs-INT bug from Phase 4.x and the 2-byte length prefix from Phase 6.c). Worth promoting to a debugging recipe: **when our driver fails and you suspect a protocol error, replicate the operation through `RefClient`. Same error = server/config issue. Different error = our bug.**
---
## (template — copy below this line for new entries)

```
```diff
@@ -696,6 +696,25 @@ def _encode_intervalym(value: IntervalYM) -> EncodedParam:
     return (int(IfxType.INTERVAL), qual, raw)
 
 
+def _encode_bytes(value: bytes) -> EncodedParam:
+    """Encode ``bytes`` as a BYTE blob descriptor (type=11).
+
+    BYTE/TEXT have a two-PDU bind protocol (per Phase 6.f research):
+    the SQ_BIND payload for the param is a 56-byte descriptor with
+    only one populated field — the blob size at offset [16:20] as a
+    4-byte BE int. The actual byte content is streamed separately
+    via SQ_BBIND + SQ_BLOB chunks AFTER the SQ_BIND envelope.
+
+    The encoder returns the descriptor as the "raw" bytes; the
+    cursor's bind builder is responsible for noticing the BYTE type
+    code and queuing the value's content for the SQ_BBIND stream.
+    """
+    descriptor = bytearray(56)
+    # Bytes [16..19] = size (BE int). Per IfxBlob.toIfx line 162.
+    descriptor[16:20] = len(value).to_bytes(4, "big", signed=True)
+    return (int(IfxType.BYTE), 0, bytes(descriptor))
+
+
 def _encode_decimal(value: decimal.Decimal) -> EncodedParam:
     """Encode a Python ``decimal.Decimal`` as IDS DECIMAL (type=5).
```
```diff
@@ -793,10 +812,12 @@ def encode_param(value: object) -> EncodedParam:
         return _encode_timedelta(value)
     if isinstance(value, IntervalYM):
         return _encode_intervalym(value)
+    if isinstance(value, (bytes, bytearray)):
+        return _encode_bytes(bytes(value))
     raise NotImplementedError(
         f"parameter binding for {type(value).__name__} not yet supported "
         f"(supports: int, float, str, bool, None, date, datetime, "
-        f"timedelta, Decimal, IntervalYM)"
+        f"timedelta, Decimal, IntervalYM, bytes)"
     )
```
```diff
@@ -88,6 +88,11 @@ class Cursor:
         # Informix optimizes literal-value INSERTs by executing during
         # PREPARE. In that case we skip SQ_EXECUTE and go straight to RELEASE.
         self._statement_already_done = False
+        # Statement ID from the DESCRIBE response. Used by SQ_FETCHBLOB
+        # to identify which prepared statement the blob descriptor came
+        # from. Empirically the server accepts 0 here even when a real
+        # ID was assigned, so this is best-effort tracking.
+        self._statement_id: int = 0
 
         # -- PEP 249 attributes ------------------------------------------------
```
```diff
@@ -180,7 +185,13 @@ class Cursor:
         self._execute_select()
 
     def _execute_select(self) -> None:
-        """Run the SELECT cursor lifecycle: CURNAME+NFETCH → drain → CLOSE → RELEASE."""
+        """Run the SELECT cursor lifecycle: CURNAME+NFETCH → drain → CLOSE → RELEASE.
+
+        For BYTE/TEXT columns: the SQ_TUPLE payload only contains 56-byte
+        blob descriptors; the actual bytes live in the blobspace and must
+        be retrieved via ``SQ_FETCHBLOB`` round-trips **while the cursor
+        is still open**. The locator is invalidated by CLOSE.
+        """
         cursor_name = _generate_cursor_name()
         self._conn._send_pdu(self._build_curname_nfetch_pdu(cursor_name))
         self._read_fetch_response()
```
```diff
@@ -190,11 +201,93 @@ class Cursor:
         self._conn._send_pdu(self._build_nfetch_pdu())
         self._read_fetch_response()
 
+        # Dereference BYTE/TEXT blob descriptors BEFORE CLOSE — the
+        # locators are only valid while the cursor is open. No-op when
+        # no BYTE/TEXT columns are present.
+        self._dereference_blob_columns()
+
         self._conn._send_pdu(self._build_close_pdu())
         self._drain_to_eot()
         self._conn._send_pdu(self._build_release_pdu())
         self._drain_to_eot()
 
+    def _dereference_blob_columns(self) -> None:
+        """Replace 56-byte BYTE/TEXT descriptors in ``self._rows`` with the
+        actual blob bytes via SQ_FETCHBLOB round-trips. No-op when no
+        BYTE/TEXT columns are present.
+
+        Per ``IfxSqli.sendFetchBlob`` (line 3716): the request is
+        ``[SQ_ID][stmt_id][SQ_FETCHBLOB=38][padded 56-byte descriptor][SQ_EOT]``
+        and the response is one or more ``SQ_BLOB`` (39) chunks ending
+        with a zero-length terminator.
+        """
+        from ._types import IfxType, base_type
+
+        blob_indices = [
+            (i, base_type(c.type_code))
+            for i, c in enumerate(self._columns)
+            if base_type(c.type_code) in (int(IfxType.BYTE), int(IfxType.TEXT))
+        ]
+        if not blob_indices:
+            return
+
+        new_rows: list[tuple] = []
+        for row in self._rows:
+            row_list = list(row)
+            for idx, type_code in blob_indices:
+                descriptor = row_list[idx]
+                if not isinstance(descriptor, (bytes, bytearray)) or len(descriptor) != 56:
+                    continue
+                # Byte 39 = null indicator (per IfxBlob.toIfxTuple)
+                if descriptor[39] == 1:
+                    row_list[idx] = None
+                    continue
+                blob_bytes = self._fetch_blob(bytes(descriptor))
+                if type_code == int(IfxType.TEXT):
+                    row_list[idx] = blob_bytes.decode("iso-8859-1")
+                else:
+                    row_list[idx] = blob_bytes
+            new_rows.append(tuple(row_list))
+        self._rows = new_rows
+
+    def _fetch_blob(self, descriptor: bytes) -> bytes:
+        """Send SQ_FETCHBLOB and read the SQ_BLOB stream until terminator."""
+        writer, buf = make_pdu_writer()
+        writer.write_short(MessageType.SQ_ID)
+        writer.write_int(MessageType.SQ_FETCHBLOB)  # 38
+        writer.write_padded(descriptor)
+        writer.write_short(MessageType.SQ_EOT)
+        self._conn._send_pdu(buf.getvalue())
+
+        reader = _SocketReader(self._conn._sock)
+        chunks: list[bytes] = []
+        while True:
+            tag = reader.read_short()
+            if tag == MessageType.SQ_BLOB:
+                length = reader.read_short()
+                if length == 0:
+                    continue  # zero-length marks end-of-blob, but stream
+                              # may still have SQ_DONE/EOT after; keep reading
+                chunks.append(reader.read_exact(length))
+                if length & 1:
+                    reader.read_exact(1)  # writePadded even-byte align
+            elif tag == MessageType.SQ_EOT:
+                break
+            elif tag == MessageType.SQ_DONE:
+                self._consume_done(reader)
+            elif tag == 55:  # SQ_COST
+                reader.read_int()
+                reader.read_int()
+            elif tag == MessageType.SQ_XACTSTAT:
+                reader.read_exact(2 + 2 + 2)
+            elif tag == MessageType.SQ_ERR:
+                self._raise_sq_err(reader)
+            else:
+                raise DatabaseError(
+                    f"unexpected tag in FETCHBLOB response: 0x{tag:04x}"
+                )
+        return b"".join(chunks)
+
     def _execute_dml_with_params(self, params: tuple) -> None:
         """DML with bound parameters: SQ_BIND + SQ_EXECUTE → SQ_RELEASE.
```
```diff
@@ -365,6 +458,73 @@ class Cursor:
         writer.write_short(MessageType.SQ_EOT)
         return buf.getvalue()
 
+    def _emit_bind_params(self, writer: object, params: tuple) -> list[bytes]:
+        """Emit SQ_BIND per-param blocks. Returns the list of blob bytes
+        (in order, NULL params skipped) that need streaming via SQ_BBIND.
+
+        Routes through ``encode_param`` for each non-None param, then
+        either:
+        - Writes the raw bytes inline (normal types)
+        - Writes the 56-byte blob descriptor inline AND queues the
+          real content for the post-SQ_BIND blob stream (BYTE/TEXT)
+        """
+        from ._types import IfxType  # local import to avoid cycle
+
+        blob_payloads: list[bytes] = []
+        for value in params:
+            if value is None:
+                writer.write_short(0)
+                writer.write_short(-1)
+                writer.write_short(0)
+                continue
+            ifx_type, prec, raw = encode_param(value)
+            writer.write_short(ifx_type)
+            writer.write_short(0)  # indicator = 0 (non-null)
+            writer.write_short(prec)
+            writer.write_padded(raw)
+            if ifx_type in (int(IfxType.BYTE), int(IfxType.TEXT)):
+                # The encoder put a 56-byte descriptor inline; queue the
+                # actual bytes for the SQ_BBIND/SQ_BLOB stream.
+                # ``bytes`` and ``bytearray`` flow through here; ``str``
+                # for TEXT is converted to bytes per ``CLIENT_LOCALE``.
+                payload = (
+                    value.encode("iso-8859-1")
+                    if isinstance(value, str)
+                    else bytes(value)
+                )
+                blob_payloads.append(payload)
+        return blob_payloads
+
+    def _emit_blob_stream(
+        self, writer: object, blob_payloads: list[bytes]
+    ) -> None:
+        """Emit SQ_BBIND + SQ_BLOB chunks + zero-length terminator.
+
+        Wire layout per ``IfxSqli.sendBlob`` line 3328 + ``sendStreamBlob``
+        line 3482:
+            [short SQ_BBIND=41][short blob_count]
+            for each blob:
+                while bytes remain:
+                    [short SQ_BLOB=39][short chunk_len][padded data]
+                [short SQ_BLOB=39][short 0]   # end-of-blob
+        """
+        if not blob_payloads:
+            return
+        writer.write_short(MessageType.SQ_BBIND)
+        writer.write_short(len(blob_payloads))
+        chunk_size = 1024  # JDBC's sendStreamBlob hardcodes this
+        for blob in blob_payloads:
+            offset = 0
+            while offset < len(blob):
+                chunk = blob[offset : offset + chunk_size]
+                writer.write_short(MessageType.SQ_BLOB)
+                writer.write_short(len(chunk))
+                writer.write_padded(chunk)
+                offset += len(chunk)
+            # Zero-length terminator marks end of THIS blob
+            writer.write_short(MessageType.SQ_BLOB)
+            writer.write_short(0)
+
     def _build_bind_only_pdu(self, params: tuple) -> bytes:
         """SQ_BIND with parameter values + SQ_EOT (no SQ_EXECUTE).
```
```diff
@@ -376,28 +536,23 @@ class Cursor:
         writer.write_short(MessageType.SQ_ID)
         writer.write_int(MessageType.SQ_BIND)
         writer.write_short(len(params))
-        for value in params:
-            if value is None:
-                writer.write_short(0)
-                writer.write_short(-1)
-                writer.write_short(0)
-            else:
-                ifx_type, prec, raw = encode_param(value)
-                writer.write_short(ifx_type)
-                writer.write_short(0)
-                writer.write_short(prec)
-                writer.write_padded(raw)
+        blob_payloads = self._emit_bind_params(writer, params)
+        self._emit_blob_stream(writer, blob_payloads)
         writer.write_short(MessageType.SQ_EOT)
         return buf.getvalue()
 
     def _build_bind_execute_pdu(self, params: tuple) -> bytes:
-        """SQ_BIND with parameter values + SQ_EXECUTE + SQ_EOT.
+        """SQ_BIND + (optional SQ_BBIND blob stream) + SQ_EXECUTE + SQ_EOT.
 
-        From the JDBC capture (msg[29] in 02-dml-cycle.socat.log):
+        From the JDBC capture (msg[29] in 02-dml-cycle.socat.log) plus
+        the BYTE/TEXT extension found in IfxSqli.sendBind line 998+:
 
             [short SQ_ID=4][int 5=SQ_BIND][short numparams]
             for each param:
                 [short type][short indicator][short prec]
                 writePadded(data)   # data + 0-pad if odd-len
+            -- if any blob params --
+            [short SQ_BBIND=41][short blob_count]
+            for each blob: SQ_BLOB chunks ending with [SQ_BLOB][0]
             [short SQ_EXECUTE=7]
             [short SQ_EOT]
         """
```
```diff
@@ -405,18 +560,8 @@ class Cursor:
         writer.write_short(MessageType.SQ_ID)
         writer.write_int(MessageType.SQ_BIND)  # action = 5
         writer.write_short(len(params))
-        for value in params:
-            if value is None:
-                # NULL: type=0, indicator=-1, prec=0, no data
-                writer.write_short(0)
-                writer.write_short(-1)
-                writer.write_short(0)
-            else:
-                ifx_type, prec, raw = encode_param(value)
-                writer.write_short(ifx_type)
-                writer.write_short(0)  # indicator = 0 (normal)
-                writer.write_short(prec)
-                writer.write_padded(raw)
+        blob_payloads = self._emit_bind_params(writer, params)
+        self._emit_blob_stream(writer, blob_payloads)
         writer.write_short(MessageType.SQ_EXECUTE)  # 7
         writer.write_short(MessageType.SQ_EOT)
         return buf.getvalue()
```
```diff
@@ -503,7 +648,8 @@ class Cursor:
         if tag == MessageType.SQ_EOT:
             return
         elif tag == MessageType.SQ_DESCRIBE:
-            self._columns, _ = parse_describe(reader)
+            self._columns, meta = parse_describe(reader)
+            self._statement_id = meta.get("statement_id", 0)
             self._description = (
                 [c.to_description_tuple() for c in self._columns] if self._columns else None
             )
```
```diff
@@ -43,6 +43,7 @@ public class RefClient {
             case "connect-only": runConnectOnly(); break;
             case "select-1": runSelect1(); break;
             case "dml-cycle": runDmlCycle(); break;
+            case "byte-cycle": runByteCycle(); break;
             case "all":
                 runConnectOnly();
                 runSelect1();
```
```diff
@@ -151,4 +152,45 @@ public class RefClient {
         // Temp table dropped automatically on disconnect; no DROP needed.
     }
 
+    // -------------------------------------------------------------------
+    // Scenario D: BYTE column write+read cycle. Requires:
+    //   - logged DB (env IFX_DATABASE=testdb)
+    //   - a blobspace named "blobspace1" already created
+    // Set IFX_DATABASE=testdb before running.
+    // -------------------------------------------------------------------
+    static void runByteCycle() throws SQLException {
+        log("=== byte-cycle ===");
+        String table = "byte_" + Long.toHexString(System.nanoTime());
+        try (Connection c = DriverManager.getConnection(url(), USER, PASSWORD)) {
+            c.setAutoCommit(true);
+
+            try (Statement s = c.createStatement()) {
+                log("CREATE TABLE %s (id INT, data BYTE IN blobspace1)", table);
+                s.execute("CREATE TABLE " + table + " (id INT, data BYTE IN blobspace1)");
+            }
+
+            byte[] payload = "hello bytes from JDBC".getBytes();
+            try (PreparedStatement ps = c.prepareStatement(
+                    "INSERT INTO " + table + " VALUES (?, ?)")) {
+                ps.setInt(1, 1);
+                ps.setBytes(2, payload);
+                int n = ps.executeUpdate();
+                log("INSERT rowcount=%d (sent %d bytes)", n, payload.length);
+            }
+
+            try (Statement s = c.createStatement();
+                 ResultSet rs = s.executeQuery("SELECT id, data FROM " + table)) {
+                while (rs.next()) {
+                    byte[] got = rs.getBytes(2);
+                    log("  row: id=%d data.len=%d data=%s", rs.getInt(1),
+                        got.length, new String(got));
+                }
+            }
+
+            try (Statement s = c.createStatement()) {
+                s.execute("DROP TABLE " + table);
+            }
+        }
+    }
 }
```
tests/test_blob.py — 240 lines (new file)

@@ -0,0 +1,240 @@
|
"""Phase 8 integration tests — BYTE/TEXT round-trip via SQ_BBIND/SQ_BLOB.
|
||||||
|
|
||||||
|
BYTE/TEXT use a multi-PDU wire protocol: the SQ_BIND payload carries a
|
||||||
|
56-byte blob descriptor (with size at offset [16..19]); the actual bytes
|
||||||
|
travel via SQ_BBIND + chunked SQ_BLOB messages after SQ_BIND. On read,
|
||||||
|
the SQ_TUPLE payload returns only the descriptor; the client must
|
||||||
|
explicitly fetch the bytes via SQ_FETCHBLOB while the cursor is still
|
||||||
|
open (locator invalidated by CLOSE).
|
||||||
|
|
||||||
|
Server-side requirements (preconfigured in the dev container by Phase 7
|
||||||
|
setup): blobspace1 + sbspace1 + a logged database (testdb). The blobspace
|
||||||
|
also requires a level-0 archive before allocating pages — done via
|
||||||
|
``ontape -s -L 0 -t /dev/null`` once.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import contextlib
|
||||||
|
from collections.abc import Iterator
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
import informix_db
|
||||||
|
from tests.conftest import ConnParams
|
||||||
|
|
||||||
|
pytestmark = pytest.mark.integration
|
||||||
|
|
||||||
|
|
||||||
|
def _connect(params: ConnParams) -> informix_db.Connection:
|
||||||
|
return informix_db.connect(
|
||||||
|
host=params.host,
|
||||||
|
port=params.port,
|
||||||
|
user=params.user,
|
||||||
|
password=params.password,
|
||||||
|
database=params.database,
|
||||||
|
server=params.server,
|
||||||
|
connect_timeout=10.0,
|
||||||
|
read_timeout=10.0,
|
||||||
|
autocommit=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def byte_table(logged_db_params: ConnParams) -> Iterator[str]:
|
||||||
|
"""Create a fresh permanent BYTE table per test, drop on teardown."""
|
||||||
|
table = "t_blob_byte"
|
||||||
|
with _connect(logged_db_params) as conn:
|
||||||
|
cur = conn.cursor()
|
||||||
|
with contextlib.suppress(Exception):
|
||||||
|
cur.execute(f"DROP TABLE {table}")
|
||||||
|
try:
|
||||||
|
cur.execute(
|
||||||
|
f"CREATE TABLE {table} (id INT, data BYTE IN blobspace1)"
|
||||||
|
)
|
||||||
|
except informix_db.Error as e:
|
||||||
|
pytest.skip(
|
||||||
|
f"blobspace1 unavailable ({e!r}); set up per Phase 7"
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
yield table
|
||||||
|
finally:
|
||||||
|
with _connect(logged_db_params) as conn:
|
||||||
|
cur = conn.cursor()
|
||||||
|
with contextlib.suppress(Exception):
|
||||||
|
cur.execute(f"DROP TABLE {table}")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def text_table(logged_db_params: ConnParams) -> Iterator[str]:
|
||||||
|
"""Create a fresh permanent TEXT table per test, drop on teardown."""
|
||||||
|
table = "t_blob_text"
|
||||||
|
with _connect(logged_db_params) as conn:
|
||||||
|
cur = conn.cursor()
|
||||||
|
with contextlib.suppress(Exception):
|
||||||
|
cur.execute(f"DROP TABLE {table}")
|
||||||
|
try:
|
||||||
|
cur.execute(
|
||||||
|
f"CREATE TABLE {table} (id INT, data TEXT IN blobspace1)"
|
||||||
|
)
|
||||||
|
except informix_db.Error as e:
|
||||||
|
pytest.skip(
|
||||||
|
f"blobspace1 unavailable ({e!r}); set up per Phase 7"
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
yield table
|
||||||
|
finally:
|
||||||
|
with _connect(logged_db_params) as conn:
|
||||||
|
cur = conn.cursor()
|
||||||
|
with contextlib.suppress(Exception):
|
||||||
|
cur.execute(f"DROP TABLE {table}")
|
||||||
|
|
||||||
|
|
||||||
|
# -------- BYTE round-trip --------
|
||||||
|
|
||||||
|
|
||||||
|
def test_byte_roundtrip_short(
|
||||||
|
logged_db_params: ConnParams, byte_table: str
|
||||||
|
) -> None:
|
||||||
|
"""Short BYTE payload (<1024 bytes, single SQ_BLOB chunk)."""
|
||||||
|
with _connect(logged_db_params) as conn:
|
||||||
|
cur = conn.cursor()
|
||||||
|
cur.execute(
|
||||||
|
f"INSERT INTO {byte_table} VALUES (?, ?)",
|
||||||
|
(1, b"hello bytes round trip"),
|
||||||
|
)
|
||||||
|
cur.execute(f"SELECT id, data FROM {byte_table}")
|
||||||
|
assert cur.fetchall() == [(1, b"hello bytes round trip")]
|
||||||
|
|
||||||
|
|
||||||
|
def test_byte_roundtrip_multichunk(
|
||||||
|
logged_db_params: ConnParams, byte_table: str
|
||||||
|
) -> None:
|
||||||
|
"""Larger BYTE payload spanning multiple SQ_BLOB chunks (>1024 bytes)."""
|
||||||
|
payload = bytes(range(256)) * 20 # 5120 bytes
|
||||||
|
with _connect(logged_db_params) as conn:
|
||||||
|
cur = conn.cursor()
|
||||||
|
cur.execute(
|
||||||
|
f"INSERT INTO {byte_table} VALUES (?, ?)", (1, payload)
|
||||||
|
)
|
||||||
|
cur.execute(f"SELECT data FROM {byte_table}")
|
||||||
|
(got,) = cur.fetchone()
|
||||||
|
assert got == payload
|
||||||
|
assert len(got) == 5120
|
||||||
|
|
||||||
|
|
||||||
|
def test_byte_null(
|
||||||
|
logged_db_params: ConnParams, byte_table: str
|
||||||
|
) -> None:
|
||||||
|
"""NULL BYTE column: byte 39 of descriptor=1 → Python None."""
|
||||||
|
with _connect(logged_db_params) as conn:
|
||||||
|
cur = conn.cursor()
|
||||||
|
cur.execute(
|
||||||
|
f"INSERT INTO {byte_table} VALUES (?, NULL)", (1,)
|
||||||
|
)
|
||||||
|
cur.execute(f"SELECT id, data FROM {byte_table}")
|
||||||
|
assert cur.fetchall() == [(1, None)]
|
||||||
|
|
||||||
|
|
||||||
|
def test_byte_multi_row(
    logged_db_params: ConnParams, byte_table: str
) -> None:
    """Multiple rows with BYTE columns — each gets its own SQ_FETCHBLOB."""
    with _connect(logged_db_params) as conn:
        cur = conn.cursor()
        cur.executemany(
            f"INSERT INTO {byte_table} VALUES (?, ?)",
            [
                (1, b"first row data"),
                (2, b"second row content"),
                (3, b"third"),
            ],
        )
        cur.execute(f"SELECT id, data FROM {byte_table} ORDER BY id")
        assert cur.fetchall() == [
            (1, b"first row data"),
            (2, b"second row content"),
            (3, b"third"),
        ]


def test_byte_binary_safe(
    logged_db_params: ConnParams, byte_table: str
) -> None:
    """BYTE preserves arbitrary binary data including nulls and high bytes."""
    payload = bytes([0, 1, 255, 0, 254, 128, 0]) + b"\x00\x00\x00\xff"
    with _connect(logged_db_params) as conn:
        cur = conn.cursor()
        cur.execute(f"INSERT INTO {byte_table} VALUES (?, ?)", (1, payload))
        cur.execute(f"SELECT data FROM {byte_table}")
        (got,) = cur.fetchone()
        assert got == payload


# -------- TEXT round-trip --------


def test_text_roundtrip(
    logged_db_params: ConnParams, text_table: str
) -> None:
    """TEXT column round-trip: bytes in, str out (decoded as iso-8859-1)."""
    with _connect(logged_db_params) as conn:
        cur = conn.cursor()
        cur.execute(
            f"INSERT INTO {text_table} VALUES (?, ?)",
            (1, b"this is some text content"),
        )
        cur.execute(f"SELECT id, data FROM {text_table}")
        assert cur.fetchall() == [(1, "this is some text content")]


def test_text_with_unicode_iso8859(
    logged_db_params: ConnParams, text_table: str
) -> None:
    """ISO-8859-1 characters preserved through the TEXT pipeline."""
    payload = "café résumé naïve".encode("iso-8859-1")
    with _connect(logged_db_params) as conn:
        cur = conn.cursor()
        cur.execute(f"INSERT INTO {text_table} VALUES (?, ?)", (1, payload))
        cur.execute(f"SELECT data FROM {text_table}")
        (got,) = cur.fetchone()
        assert got == "café résumé naïve"


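Per the phase notes, `str` parameters are auto-encoded as ISO-8859-1 for TEXT on the way in, and TEXT bytes are decoded back to `str` on fetch. A standalone sketch of that policy; the helper names are illustrative, not the driver's API:

```python
# Sketch of the TEXT encoding policy from the phase notes: str values are
# encoded as ISO-8859-1 before streaming, and raw TEXT bytes are decoded
# back to str on fetch. Helper names are hypothetical.
def encode_text_param(value: str) -> bytes:
    # Raises UnicodeEncodeError for characters outside Latin-1 (e.g. "€").
    return value.encode("iso-8859-1")


def decode_text_column(raw: bytes) -> str:
    # ISO-8859-1 decoding is total: every byte maps to a code point,
    # so arbitrary TEXT bytes always decode without error.
    return raw.decode("iso-8859-1")
```

This is why the test above round-trips "café résumé naïve" losslessly: every character in it sits inside the Latin-1 range.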
def test_text_null(
    logged_db_params: ConnParams, text_table: str
) -> None:
    """NULL TEXT column → Python None."""
    with _connect(logged_db_params) as conn:
        cur = conn.cursor()
        cur.execute(f"INSERT INTO {text_table} VALUES (?, NULL)", (1,))
        cur.execute(f"SELECT data FROM {text_table}")
        assert cur.fetchone() == (None,)


# -------- Mixed columns --------


def test_byte_alongside_other_types(
    logged_db_params: ConnParams, byte_table: str
) -> None:
    """A row with BYTE + INT columns — descriptor is in tuple, blob fetched separately."""
    with _connect(logged_db_params) as conn:
        cur = conn.cursor()
        cur.executemany(
            f"INSERT INTO {byte_table} VALUES (?, ?)",
            [(42, b"hello"), (99, b"world")],
        )
        cur.execute(f"SELECT id, data FROM {byte_table} ORDER BY id")
        rows = cur.fetchall()
        assert rows == [(42, b"hello"), (99, b"world")]


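These tests all depend on the read-side ordering constraint from the protocol notes: each BYTE/TEXT locator in an SQ_TUPLE must be dereferenced via SQ_FETCHBLOB while the cursor is still open, since doing it after CLOSE returns -602. A sketch of the per-locator reassembly loop; the send/recv callables are hypothetical stand-ins for the driver's internals:

```python
# Sketch of the read-side loop from the protocol notes: SQ_TUPLE carries
# only the 56-byte descriptor, so each BYTE/TEXT value is pulled with
# SQ_FETCHBLOB and reassembled from SQ_BLOB chunks until the zero-length
# terminator. The two callables below are hypothetical stand-ins.
from collections.abc import Callable


def read_blob(
    descriptor: bytes,
    send_fetchblob: Callable[[bytes], None],
    recv_blob_chunk: Callable[[], bytes],
) -> bytes:
    """Dereference one locator: stream chunks until the empty terminator."""
    send_fetchblob(descriptor)  # must happen BEFORE the cursor is closed
    parts = []
    while chunk := recv_blob_chunk():  # b"" (zero-length chunk) ends the loop
        parts.append(chunk)
    return b"".join(parts)
```

Running this once per BYTE/TEXT column per row, before CLOSE is sent, is exactly the sequencing the multi-row test exercises.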
@@ -93,12 +93,19 @@ def test_delete_with_param(conn_params: ConnParams) -> None:


 def test_unsupported_param_type_raises(conn_params: ConnParams) -> None:
-    """Phase 4 supports int/float/str/bool/None; other types raise."""
+    """Driver accepts a known set of Python types; other types raise.
+
+    Phase 8 added ``bytes``/``bytearray``, so the canonical "unsupported"
+    sentinel is now an arbitrary Python class with no encoder dispatch.
+    """
+
+    class CustomType:
+        pass
     with _connect(conn_params) as conn:
         cur = conn.cursor()
         cur.execute("CREATE TEMP TABLE t_p_f (id INTEGER)")
-        with pytest.raises(NotImplementedError, match="bytes"):
-            cur.execute("INSERT INTO t_p_f VALUES (?)", (b"raw bytes",))
+        with pytest.raises(NotImplementedError, match="CustomType"):
+            cur.execute("INSERT INTO t_p_f VALUES (?)", (CustomType(),))


 def test_parameterized_select_int(conn_params: ConnParams) -> None: