Ryan Malloy ad55391bf1 Phase 39: Connection-scoped read-ahead buffer (2026.05.05.12)
Closes the bulk-fetch gap to within ~7-15% of IfxPy. Lever was the
buffer/I/O machinery, not the codec — Phase 37/38 had already brought
the codec close to IfxPy's C path; the remaining gap was ~450k
read_exact calls per 100k-row fetch, each doing its own recv-loop
and bytes.join.

Architecture: IfxSocket owns a connection-scoped bytearray + integer
offset cursor; BufferedSocketReader is a thin parser-view that delegates
buffer-fill to the socket. One recv() per ~64 KB instead of per field.
This is how asyncpg (buffer.pyx) and psycopg3 (pq.PGconn) structure
their read paths.

The buffer MUST be socket-scoped, not reader-scoped: the pipelined-
executemany path (Phase 33) streams N responses back-to-back across
multiple cursor reads, and a per-reader buffer would throw away
pre-fetched bytes when one reader is destroyed. (The first iteration
of this phase tried per-reader and hung on test_executemany_1000_rows.)

A/B vs Phase 38, same harness, warmed cache:
  select_scaling_1000     2.90 ->  1.72 ms  (-41%)
  select_scaling_10000   24.32 -> 16.08 ms  (-34%)
  select_scaling_100000 250.36 -> 168.98 ms (-32%)

Head-to-head vs IfxPy 2.0.7:
  select_scaling_1000     1.05x   (basically tied)
  select_scaling_10000    1.07x
  select_scaling_100000   1.15x

IQR collapsed 9x at 100k (3.6 ms -> 0.4 ms) — fewer recvs means fewer
scheduler/jitter pulses showing up in the measurement.

Default ON. Set IFX_BUFFERED_READER=0 to fall back to the legacy reader
(still tested in CI as the escape hatch). Both paths green: 251/251
integration tests pass on each.
2026-05-08 01:40:54 -06:00

1767 lines
75 KiB
Python

"""DB-API 2.0 Cursor — SELECT execution and row iteration.
Phase 2 implements the full JDBC cursor lifecycle for parameterless SELECTs:
C → PREPARE + NDESCRIBE + WANTDONE (one PDU)
C ← DESCRIBE (column metadata) + DONE + COST + EOT
C → SQ_ID(CURNAME) + SQ_ID(NFETCH 4096) (one PDU)
C ← TUPLE* + DONE + COST + EOT (rows + completion)
C → SQ_ID(NFETCH 4096) (drain to end)
C ← DONE + COST + EOT (no more rows)
C → SQ_ID(CLOSE) (close cursor)
C ← EOT
C → SQ_ID(RELEASE) (release statement)
C ← EOT
Parameter binding (SQ_BIND inserted between PREPARE and CURNAME) lands
in Phase 4.
"""
from __future__ import annotations
import contextlib
import itertools
import logging
import os
import re
import struct
import weakref

from collections.abc import Iterator
from typing import TYPE_CHECKING, Any
from . import _errcodes
from ._messages import MessageType
from ._protocol import BufferedSocketReader, IfxStreamReader, make_pdu_writer
from ._resultset import (
ColumnInfo,
compile_column_readers,
compile_row_decoder,
parse_describe,
parse_tuple_payload,
)
from .converters import encode_param
from .exceptions import (
DatabaseError,
InterfaceError,
NotSupportedError,
ProgrammingError,
)
if TYPE_CHECKING:
from .connections import Connection
# Process-wide cursor name counter — appended to a "_ifxc" prefix to mimic
# JDBC's auto-generated names.
_cursor_counter = itertools.count(1)
_NUMERIC_PLACEHOLDER_RE = __import__("re").compile(r":(\d+)")
# Phase 28: pre-built CLOSE and RELEASE PDU bytes for cursor finalizers.
# These are stateless — every cursor sends the same SQ_ID(CLOSE) +
# SQ_ID(RELEASE) sequence to free server-side resources. Building them
# once at module load lets the GC-time finalizer (which can run on any
# thread, with no Python state guarantees) avoid touching cursor methods.
def _build_static_pdu(op: int) -> bytes:
    """Serialize a standalone ``SQ_ID(op)`` + ``SQ_EOT`` PDU."""
    w, sink = make_pdu_writer()
    w.write_short(MessageType.SQ_ID)
    w.write_int(op)
    w.write_short(MessageType.SQ_EOT)
    return sink.getvalue()


_CLOSE_PDU = _build_static_pdu(MessageType.SQ_CLOSE)
_RELEASE_PDU = _build_static_pdu(MessageType.SQ_RELEASE)
del _build_static_pdu  # one-shot helper; keep the module namespace clean

_log = logging.getLogger(__name__)
# Phase 39: buffered socket reader is the default. The reader holds a
# connection-scoped bytearray + offset cursor on ``IfxSocket``, so one
# ``recv()`` per ~64 KB amortizes over hundreds of fields rather than
# the per-field-recv pattern the legacy ``_SocketReader`` uses.
#
# Set ``IFX_BUFFERED_READER=0`` to fall back to the legacy reader if
# you hit a wire shape we haven't covered. The legacy path is kept as
# the escape hatch (and as the A/B baseline if we ever need to bisect
# a regression). Read once at module import so the check isn't on the
# hot path.
_USE_BUFFERED_READER = os.environ.get("IFX_BUFFERED_READER", "1") != "0"


def _make_socket_reader(sock):
    """Return the reader for *sock*, honoring the buffered-reader toggle."""
    if not _USE_BUFFERED_READER:
        # Legacy per-field-recv reader (escape hatch / A/B baseline).
        return _SocketReader(sock)
    return BufferedSocketReader(sock)
def _finalize_cursor(
    conn_ref: weakref.ReferenceType,
    state: list,
) -> None:
    """Best-effort cleanup of server-side cursor resources at GC time.

    Phase 28: if a Cursor is garbage-collected without a prior explicit
    ``close()``, this finalizer attempts to send CLOSE + RELEASE so the
    server doesn't leak the prepared statement / scrollable cursor handle.

    **Crucial constraint**: this can run on ANY thread (cyclic GC,
    weakref callback delivery, etc.). It MUST NOT block on the wire
    lock — doing so could deadlock against a thread that's mid-operation
    on the same connection. We therefore try a non-blocking acquire
    and give up if the lock is held; the server-side resource leaks
    until session close, but that's a soft failure (resource limits)
    rather than a hard one (deadlock).

    ``state`` is a single-element list ``[bool]`` mutated by the cursor
    to signal whether it has server-side resources outstanding. Using
    a list (not the cursor object itself) keeps the finalizer's closure
    weak — the cursor remains GC'd-able.
    """
    # NOTE(review): local import — presumably avoids an import cycle with
    # ._protocol at module load; confirm before hoisting to the top.
    from ._protocol import ProtocolError

    if not state[0]:
        return  # nothing to release
    conn = conn_ref()
    if conn is None or conn.closed:
        # Connection already gone — the server reclaims cursor resources
        # at session end, so there's nothing left for us to do.
        return
    if not conn._wire_lock.acquire(blocking=False):
        # Another thread is mid-operation on this connection. Don't
        # deadlock; instead, hand the cleanup bytes to the connection's
        # deferred-cleanup queue. The next ``_send_pdu`` on this
        # connection will drain the queue and send these PDUs at the
        # right moment (under the wire lock). Closes the unbounded-leak
        # gap on long-lived pooled connections.
        conn._enqueue_cleanup([_CLOSE_PDU, _RELEASE_PDU])
        _log.debug(
            "cursor finalizer: wire lock busy; enqueued CLOSE+RELEASE "
            "for deferred cleanup on conn %s",
            id(conn),
        )
        return
    try:
        try:
            conn._send_pdu(_CLOSE_PDU)
            conn._drain_to_eot()
            conn._send_pdu(_RELEASE_PDU)
            conn._drain_to_eot()
        except (ProtocolError, OSError) as exc:
            # Wire desync during cleanup — same doctrine as
            # ``_raise_sq_err``: the wire is unrecoverable, force-close
            # the connection. Asymmetric handling of the same failure
            # mode would be a Hamilton smell.
            _log.warning(
                "cursor finalizer: wire desync during cleanup; "
                "force-closing connection: %r",
                exc,
            )
            conn._closed = True
            with contextlib.suppress(Exception):
                conn._sock.close()
    except InterfaceError:
        # Connection was closed by another thread between our
        # ``conn.closed`` check above and the actual write. No-op:
        # the resource we wanted to release is also gone.
        pass
    finally:
        conn._wire_lock.release()
def _rewrite_numeric_to_qmark(sql: str) -> str:
    """Translate ``:1`` / ``:2`` (paramstyle="numeric") markers into ``?``.

    The wire protocol understands only ``?`` placeholders natively, so
    the numeric style exposed in the public API (Informix ESQL/C
    convention) is rewritten before the statement is sent. The
    substitution is purely textual — string literals and comments are
    not parsed, so a literal ``:1`` inside quotes would be rewritten
    too. A proper SQL tokenizer is future work (Phase 5).
    """
    rewritten = _NUMERIC_PLACEHOLDER_RE.sub("?", sql)
    return rewritten
def _generate_cursor_name() -> str:
    """Mint a unique JDBC-style cursor name.

    The format is "_ifxc" plus a zero-padded 13-digit counter — 18
    characters total, matching the names the JDBC driver auto-generates
    so the server treats this client identically.
    """
    seq = next(_cursor_counter)
    return "_ifxc" + str(seq).zfill(13)
class Cursor:
    """PEP 249 Cursor over a SQLI session."""

    # PEP 249 ``arraysize``: default row-batch size for fetchmany();
    # class-level default of 1 per the spec, overridable per instance.
    arraysize: int = 1
def __init__(
    self, connection: Connection, *, scrollable: bool = False
):
    """Bind a new cursor to *connection*.

    :param connection: owning connection; all wire I/O goes through it.
    :param scrollable: when True, open a server-side scrollable cursor
        (Phase 18) instead of materializing all rows on ``execute()``.
    """
    self._conn = connection
    self._closed = False
    # Phase 18: scrollable=True opens a server-side scrollable
    # cursor that doesn't materialize all rows up-front. Each
    # scroll method sends SQ_SFETCH (tag 23) per call.
    # scrollable=False (default): existing in-memory model from
    # Phase 17 — execute() materializes all rows; scroll is index
    # manipulation. Two-mode cursor; the same surface API works
    # for both.
    self._scrollable = scrollable
    # PEP 249 description / rowcount backing fields.
    self._description: list[tuple] | None = None
    self._columns: list[ColumnInfo] = []
    self._column_readers: list[tuple] | None = None  # Phase 37
    self._row_decoder = None  # Phase 38 codegen'd row decoder
    self._rowcount: int = -1
    self._rows: list[tuple] = []
    # Phase 17: index-based row access enables scroll cursors. The
    # cursor materializes all rows on execute() (non-scrollable),
    # then ``fetchone`` / ``scroll`` / ``fetch_*`` move ``_row_index``
    # through them. For scrollable cursors, ``_row_index`` instead
    # tracks the *server-side* position (1-indexed for SQ_SFETCH).
    # Default position is "before first row" (-1) so the first
    # ``fetchone()`` returns row 1.
    self._row_index: int = -1
    # Phase 18: tracks whether a server-side scrollable cursor is
    # still open. cur.close() sends CLOSE + RELEASE when True.
    self._server_cursor_open: bool = False
    # Phase 18: cached row count for scrollable cursors. Discovered
    # lazily by SFETCH(LAST). Used so fetch_last() / negative
    # absolute indexes can compute the target.
    self._scroll_total_rows: int | None = None
    # Phase 18: most-recent SQ_TUPID value from the server. The
    # server sends this with every scrollable-cursor SFETCH
    # response carrying the 1-indexed row position of the row it
    # just delivered. Captures the source of truth for "what row
    # did we get" — vital for SFETCH(LAST) where the response is
    # ``[TUPLE][TUPID]`` with no SQ_DONE / rowcount payload.
    self._last_tupid: int | None = None
    # Set if the DESCRIBE response already includes SQ_INSERTDONE —
    # Informix optimizes literal-value INSERTs by executing during
    # PREPARE. In that case we skip SQ_EXECUTE and go straight to RELEASE.
    self._statement_already_done = False
    # Statement ID from the DESCRIBE response. Used by SQ_FETCHBLOB
    # to identify which prepared statement the blob descriptor came
    # from. Empirically the server accepts 0 here even when a real
    # ID was assigned, so this is best-effort tracking.
    self._statement_id: int = 0
    # Phase 10: smart-LOB read via ``lotofile(col, path, 'client')``.
    # The server orchestrates a SQ_FILE (98) protocol where it tells
    # us to "open file X, write these bytes, close". We emulate the
    # filesystem in memory — the bytes are buffered keyed by the
    # filename the server requested. Users can retrieve them via
    # ``cursor.blob_files[path]`` after the SELECT completes.
    self.blob_files: dict[str, bytes] = {}
    self._sqfile_current_name: str | None = None
    self._sqfile_current_buf: bytearray | None = None
    # Phase 11: smart-LOB write via ``filetoblob(path, 'client', ...)``.
    # The server tells us "open file X for reading, send me chunks".
    # Users register the bytes they want uploaded keyed by filename
    # in ``virtual_files``; our SQ_FILE handler streams them on
    # request. After the INSERT completes, the registry can be
    # cleared (or kept for batched uploads).
    self.virtual_files: dict[str, bytes] = {}
    self._sqfile_read_source: bytes | None = None
    self._sqfile_read_offset: int = 0
    # Phase 28: register a finalizer that releases server-side
    # cursor resources at GC time if the user forgot to call
    # ``close()``. The state-list pattern keeps the closure weak
    # — the cursor itself isn't captured, just a list whose value
    # the cursor mutates. See ``_finalize_cursor`` for details.
    #
    # Thread safety: ``state[0] = True`` / ``state[0] = False`` and
    # ``if state[0]`` are single bytecode ops in CPython; the GIL
    # makes them atomic, no torn reads. PyPy has the same property.
    # Free-threaded CPython (PEP 703, opt-in via ``--disable-gil``)
    # is where this could regress — swap to ``threading.Event``
    # if/when targeting that runtime.
    self._finalizer_state: list = [False]
    self._finalizer = weakref.finalize(
        self,
        _finalize_cursor,
        weakref.ref(connection),
        self._finalizer_state,
    )
# -- PEP 249 attributes ------------------------------------------------
@property
def description(self) -> list[tuple] | None:
    """Column metadata from the last DESCRIBE; None before any SELECT."""
    return self._description

@property
def rowcount(self) -> int:
    """Row count of the most recent statement; -1 when none has run."""
    return self._rowcount

@property
def closed(self) -> bool:
    """True once this cursor has been closed."""
    return self._closed
# -- PEP 249 methods ---------------------------------------------------
def execute(self, operation: str, parameters: Any = None) -> None:
    """Execute one SQL statement, optionally with bound parameters.

    ``parameters`` must be a positional sequence (tuple/list) matching
    the ``?`` or ``:N`` placeholders in ``operation``; dict (named)
    parameters raise :class:`NotSupportedError`.

    The entire wire round-trip runs under ``self._conn._wire_lock``
    (Phase 27), so two threads sharing one connection cannot interleave
    PDU bytes — the second blocks until the first completes.
    """
    self._check_open()
    if isinstance(parameters, dict):
        raise NotSupportedError("named parameters not yet supported (use positional)")
    # Normalize to a tuple for positional indexing during bind.
    params: tuple = () if parameters is None else tuple(parameters)
    # paramstyle="numeric": rewrite :1 / :2 ... to the native ? markers.
    sql = _rewrite_numeric_to_qmark(operation) if params else operation
    with self._conn._wire_lock:
        self._execute_under_wire_lock(sql, params)
def _execute_under_wire_lock(self, sql: str, params: tuple) -> None:
    """Wire-bound body of ``execute``. Caller MUST hold ``_wire_lock``."""
    # Reset previous-execute state so a failed or partial run can't
    # leak stale metadata/rows into this one.
    self._description = None
    self._columns = []
    self._column_readers = None  # Phase 37
    self._row_decoder = None  # Phase 38
    self._rowcount = -1
    self._rows = []
    self._row_index = -1  # before-first-row
    self._statement_already_done = False
    # On a logged DB in non-autocommit mode, the server requires an
    # explicit SQ_BEGIN before the first DML in each transaction.
    # _ensure_transaction is a no-op for autocommit / unlogged DBs,
    # and idempotent within an open transaction.
    self._conn._ensure_transaction()
    # Step 1: PREPARE — send SQL with numQmarks = len(params).
    self._conn._send_pdu(self._build_prepare_pdu(sql, num_qmarks=len(params)))
    self._read_describe_response()
    # Branch on the SQL keyword. We can't use ``self._columns`` /
    # ``nfields`` here because a parameterized INSERT also returns
    # a non-empty DESCRIBE (server describes the would-be inserted
    # row's columns). The SQL-keyword heuristic is what JDBC effectively
    # does too via its IfxStatement / IfxPreparedStatement subclassing.
    first_word = sql.lstrip().split(None, 1)[0].upper() if sql.strip() else ""
    is_select = first_word == "SELECT"
    if is_select:
        if params:
            self._execute_select_with_params(params)
        else:
            self._execute_select()
    elif params:
        self._execute_dml_with_params(params)
    else:
        self._execute_dml()
    # SELECT path: position cursor before the first row so the next
    # ``fetchone()`` returns ``rows[0]``. DML paths leave _row_index
    # at -1 too (no rows to iterate).
    # (No-op now — the reset above already set _row_index = -1 — but
    # left explicit for symmetry with prior _row_iter logic.)
    if self._description is not None:
        self._row_index = -1
def _execute_select_with_params(self, params: tuple) -> None:
    """Parameterized SELECT: SQ_BIND → CURNAME+NFETCH → drain → CLOSE+RELEASE.

    Note that CURNAME defines the cursor name and is paired with the
    prepared statement; binding happens before opening the cursor.
    We send SQ_BIND alone first (no SQ_EXECUTE — that's for DML),
    then proceed with the normal cursor open + fetch flow.

    Mirrors :meth:`_execute_dml_with_params` cleanup: a client-side
    failure during bind-build (e.g., a DataError for a string that
    can't fit the connection's codec) releases the prepared
    statement before propagating.
    """
    # Send SQ_BIND alone (without SQ_EXECUTE chained — for SELECT,
    # opening the cursor is what executes the prepared query).
    try:
        pdu = self._build_bind_only_pdu(params)
    except Exception:
        # Bind-build failed client-side; best-effort release of the
        # server-side prepared statement before re-raising.
        with contextlib.suppress(Exception):
            self._conn._send_pdu(self._build_release_pdu())
            self._drain_to_eot()
        raise
    self._conn._send_pdu(pdu)
    self._drain_to_eot()
    # Now open the cursor and fetch — the bound values are in scope
    # for the prepared statement.
    self._execute_select()
def _execute_select(self) -> None:
    """Open the cursor and run the SELECT fetch lifecycle.

    Non-scrollable (default): CURNAME+NFETCH, then repeated NFETCH
    until a response adds no new rows, then dereference BYTE/TEXT blob
    locators (only valid while the cursor is open), then CLOSE and
    RELEASE.

    Scrollable (Phase 18): open with SQ_SCROLL and return immediately —
    no rows are prefetched; ``fetchone`` / scroll methods issue
    SQ_SFETCH per call and the server-side cursor stays open.
    """
    cursor_name = _generate_cursor_name()
    if self._scrollable:
        self._conn._send_pdu(
            self._build_curname_scroll_open_pdu(cursor_name)
        )
        self._drain_to_eot()
        self._server_cursor_open = True
        self._finalizer_state[0] = True  # arm the GC-time fallback
        self._scroll_total_rows = None
        return  # cursor stays live for SQ_SFETCH; no CLOSE here
    # Phase 35: loop until a fetch response yields zero new tuples.
    # A fixed number of NFETCHes silently truncates any result set
    # that doesn't fit in the first server batches.
    self._conn._send_pdu(self._build_curname_nfetch_pdu(cursor_name))
    while True:
        count_before = len(self._rows)
        self._read_fetch_response()
        if len(self._rows) == count_before:
            break  # empty batch — result set exhausted
        self._conn._send_pdu(self._build_nfetch_pdu())
    # BYTE/TEXT locators are invalidated by CLOSE — resolve them first.
    # No-op when the result has no BYTE/TEXT columns.
    self._dereference_blob_columns()
    self._conn._send_pdu(self._build_close_pdu())
    self._drain_to_eot()
    self._conn._send_pdu(self._build_release_pdu())
    self._drain_to_eot()
def _dereference_blob_columns(self) -> None:
    """Swap 56-byte BYTE/TEXT descriptors in ``self._rows`` for real bytes.

    Issues one SQ_FETCHBLOB round-trip per non-null descriptor (see
    ``IfxSqli.sendFetchBlob``, line 3716). TEXT values are decoded with
    the connection's encoding; BYTE values stay as ``bytes``. No-op
    when the result set has no BYTE/TEXT columns.
    """
    from ._types import IfxType

    # ColumnInfo.type_code is base-typed by construction
    # (see parse_describe / INVARIANT comment) — no base_type() needed.
    text_code = int(IfxType.TEXT)
    lob_codes = (int(IfxType.BYTE), text_code)
    targets = [
        (col_idx, col.type_code)
        for col_idx, col in enumerate(self._columns)
        if col.type_code in lob_codes
    ]
    if not targets:
        return
    resolved: list[tuple] = []
    for row in self._rows:
        values = list(row)
        for col_idx, code in targets:
            desc = values[col_idx]
            if not isinstance(desc, (bytes, bytearray)) or len(desc) != 56:
                continue
            # Byte 39 = null indicator (per IfxBlob.toIfxTuple)
            if desc[39] == 1:
                values[col_idx] = None
                continue
            data = self._fetch_blob(bytes(desc))
            if code == text_code:
                values[col_idx] = data.decode(self._conn.encoding)
            else:
                values[col_idx] = data
        resolved.append(tuple(values))
    self._rows = resolved
def _lookup_virtual_file(self, fname: str) -> bytes | None:
"""Look up bytes registered for a given filename.
Tries exact match first, then falls back to prefix match. The
server occasionally rewrites paths (e.g., adds a unique suffix
for ``lotofile`` output, but seemingly NOT for ``filetoblob``
input — kept conservative for safety).
"""
if fname in self.virtual_files:
return self.virtual_files[fname]
# Try prefix match (e.g., user registered '/sentinel' and server
# opens '/sentinel.SUFFIX').
for k, v in self.virtual_files.items():
if fname.startswith(k):
return v
return None
def _send_sqfile_read_response(self, payload: bytes, buf_size: int) -> None:
    """Send the SQ_FILE optype=2 response: client→server file read.

    Per ``IfxSqli.receiveSQFILE`` case 2 (line 5103+):
    ``[short SQ_FILE_READ=106][int total][short 106][short chunkLen]
    [padded data]...[short SQ_EOT]`` — ``buf_size`` is the chunk cap.
    """
    if buf_size <= 0:
        # Defensive: server asked for zero-sized chunks; use a sane cap.
        buf_size = 32_000
    # Header: tag + total byte count being sent.
    parts = [struct.pack("!hi", MessageType.SQ_FILE_READ, len(payload))]
    # Chunked body, each chunk padded to an even byte boundary.
    for off in range(0, len(payload), buf_size):
        chunk = payload[off : off + buf_size]
        parts.append(struct.pack("!hh", MessageType.SQ_FILE_READ, len(chunk)))
        parts.append(chunk)
        if len(chunk) & 1:
            parts.append(b"\x00")  # writePadded pad
    # Trailing SQ_EOT (per JDBC's flip()).
    parts.append(struct.pack("!h", MessageType.SQ_EOT))
    self._conn._send_pdu(b"".join(parts))
def write_blob_column(
    self,
    sql: str,
    blob_data: bytes,
    params: tuple = (),
    *,
    sentinel: str = "/tmp/_informix_db_blob_in",
    clob: bool = False,
) -> None:
    """Insert/update a smart-LOB BLOB or CLOB column with *blob_data*.

    The SQL must contain a single ``BLOB_PLACEHOLDER`` token exactly
    where the LOB value belongs (a ``VALUES`` item or ``SET col = ...``
    RHS). It is replaced with ``filetoblob('<sentinel>', 'client')``
    (``filetoclob`` when ``clob=True``), and the bytes are registered
    under ``<sentinel>`` in ``virtual_files`` so the SQ_FILE optype-2
    read-from-client path streams them to the server (Phase 11 — this
    avoids the heavier ``ifx_lo_create`` + ``SQ_LODATA`` stack).

    Example::

        cur.write_blob_column(
            "INSERT INTO photos VALUES (?, BLOB_PLACEHOLDER)",
            jpeg_bytes,
            (42,),
        )

    Raises :class:`ProgrammingError` unless the token appears exactly
    once — a silent ``str.replace`` on stray occurrences (comments,
    literals) would otherwise corrupt the statement (Phase 28).
    """
    occurrences = sql.count("BLOB_PLACEHOLDER")
    if occurrences == 0:
        raise ProgrammingError(
            "write_blob_column SQL must include a BLOB_PLACEHOLDER token "
            "where the BLOB/CLOB value goes"
        )
    if occurrences > 1:
        raise ProgrammingError(
            f"write_blob_column SQL contains BLOB_PLACEHOLDER {occurrences} "
            "times — must appear exactly once. If your SQL legitimately "
            "needs the literal string elsewhere (e.g., in a comment), "
            "construct the filetoblob/filetoclob call yourself and use "
            "regular execute() with virtual_files registered manually."
        )
    func_name = "filetoclob" if clob else "filetoblob"
    rewritten = sql.replace(
        "BLOB_PLACEHOLDER", f"{func_name}('{sentinel}', 'client')"
    )
    self.virtual_files[sentinel] = blob_data
    try:
        self.execute(rewritten, params)
    finally:
        # Always unregister so stale bytes can't leak into a later call.
        self.virtual_files.pop(sentinel, None)
def read_blob_column(
    self,
    select_blob_sql: str,
    params: tuple = (),
    *,
    sentinel: str = "/tmp/_informix_db_blob",
) -> bytes | None:
    """Fetch the bytes of a single smart-LOB BLOB column.

    Rewrites the SELECT so its column expression is wrapped in
    ``lotofile(<col>, '<sentinel>', 'client')``, executes it, and
    returns the bytes captured from the resulting SQ_FILE stream
    (Phase 10 — the server "writes a file" that we buffer in memory
    via ``blob_files``). Returns ``None`` when no row matches or the
    value is NULL.

    Constraints: *select_blob_sql* must be a SELECT of exactly one
    BLOB-typed column with a FROM clause; for multi-row reads, call
    once per row (scope with WHERE), or run the wrapped SQL yourself
    and inspect ``self.blob_files``.

    Example::

        data = cur.read_blob_column(
            "SELECT data FROM photos WHERE id = ?", (42,)
        )
    """
    stripped = select_blob_sql.strip()
    if not stripped.upper().startswith("SELECT"):
        raise ProgrammingError(
            "read_blob_column requires a SELECT statement"
        )
    after_select = stripped[6:].lstrip()  # drop the "SELECT" prefix
    # Naive whitespace-bounded FROM search — fine for the supported
    # single-column case.
    from_pos = after_select.upper().find(" FROM ")
    if from_pos < 0:
        raise ProgrammingError(
            "read_blob_column requires a FROM clause"
        )
    column_expr = after_select[:from_pos].strip()
    remainder = after_select[from_pos:]
    # Reset blob_files so only the value fetched here is visible.
    self.blob_files = {}
    self.execute(
        f"SELECT lotofile({column_expr}, '{sentinel}', 'client'){remainder}",
        params,
    )
    if self.fetchone() is None:
        return None
    # The server generates the actual filename from the sentinel; with
    # a single row there is exactly one entry.
    if not self.blob_files:
        return None
    return next(iter(self.blob_files.values()))
def _handle_sq_file(self, reader: IfxStreamReader) -> None:
    """Process an SQ_FILE (98) message from the server.

    The server treats us as a remote filesystem: it tells us to
    "open file X, write these bytes, close". We emulate the
    filesystem in memory — chunks land in ``self._sqfile_current_buf``
    keyed by ``self._sqfile_current_name``, then sealed into
    ``self.blob_files`` on close.

    Sub-types (per ``IfxSqli.receiveSQFILE`` line 4980):
    - 0 (open): ``[short fnameLen][bytes fname][int mode][int flags]
      [int offset][short SQ_EOT]``. We acknowledge with
      ``[short SQ_EOT]``.
    - 3 (write to client): ``[short SQ_FILE_WRITE=107][short bufSize]
      [padded data]`` repeated, terminated by ``SQ_EOT``. We
      respond with ``[short 107][int totalSize][short SQ_EOT]``.
    - 1 (close): ``[short SQ_EOT]``. We respond with ``[short SQ_EOT]``.
    - 2 (read from client): the filetoblob path — streams bytes that
      were registered in ``virtual_files`` back to the server.
    """
    optype = reader.read_short()
    if optype == 0:  # open
        name_len = reader.read_short()
        fname_bytes = reader.read_exact(name_len)
        if name_len & 1:
            reader.read_exact(1)  # readPadded pad
        fname = fname_bytes.decode("iso-8859-1")
        reader.read_int()  # mode (ignored — we're in-memory)
        reader.read_int()  # flags (ignored)
        reader.read_int()  # offset (ignored — start at 0)
        tail = reader.read_short()
        if tail != MessageType.SQ_EOT:
            raise DatabaseError(
                f"SQ_FILE open: expected SQ_EOT, got 0x{tail:04x}"
            )
        # Start a fresh in-memory "file" for server→client writes.
        self._sqfile_current_name = fname
        self._sqfile_current_buf = bytearray()
        # If the user pre-registered bytes for this filename (or any
        # close-enough match — server may add a suffix), prepare the
        # read source for an upcoming optype=2.
        self._sqfile_read_source = self._lookup_virtual_file(fname)
        self._sqfile_read_offset = 0
        # Acknowledge with bare SQ_EOT (mirrors JDBC's flip() flush)
        self._conn._send_pdu(
            struct.pack("!h", MessageType.SQ_EOT)
        )
    elif optype == 2:  # read from client (filetoblob path)
        buf_size = reader.read_short() & 0xFFFF
        read_amount = reader.read_int()  # signed int; -1 = read all
        tail = reader.read_short()
        if tail != MessageType.SQ_EOT:
            raise DatabaseError(
                f"SQ_FILE read-from-client: expected SQ_EOT, got 0x{tail:04x}"
            )
        if self._sqfile_read_source is None:
            # No virtual file registered — server expects a real file
            # but we're in-memory only. Send a zero-byte response.
            self._send_sqfile_read_response(b"", buf_size)
        else:
            start = self._sqfile_read_offset
            if read_amount < 0:
                payload = self._sqfile_read_source[start:]
            else:
                payload = self._sqfile_read_source[start : start + read_amount]
            # Advance the cursor so successive reads stream sequentially.
            self._sqfile_read_offset += len(payload)
            self._send_sqfile_read_response(payload, buf_size)
    elif optype == 3:  # write to client
        total = 0
        while True:
            chunk_tag = reader.read_short()
            if chunk_tag == MessageType.SQ_EOT:
                break
            if chunk_tag != MessageType.SQ_FILE_WRITE:  # 107
                raise DatabaseError(
                    f"SQ_FILE write: unexpected tag 0x{chunk_tag:04x}"
                )
            buf_size = reader.read_short()
            data = reader.read_exact(buf_size)
            if buf_size & 1:
                reader.read_exact(1)  # writePadded pad
            if self._sqfile_current_buf is not None:
                self._sqfile_current_buf.extend(data)
            total += buf_size
        # Respond with [short 107][int totalSize][short SQ_EOT]
        self._conn._send_pdu(
            struct.pack(
                "!hih", MessageType.SQ_FILE_WRITE, total, MessageType.SQ_EOT
            )
        )
    elif optype == 1:  # close
        tail = reader.read_short()
        if tail != MessageType.SQ_EOT:
            raise DatabaseError(
                f"SQ_FILE close: expected SQ_EOT, got 0x{tail:04x}"
            )
        # Seal the buffered bytes into blob_files under the open name.
        if self._sqfile_current_name is not None and self._sqfile_current_buf is not None:
            self.blob_files[self._sqfile_current_name] = bytes(
                self._sqfile_current_buf
            )
        self._sqfile_current_name = None
        self._sqfile_current_buf = None
        self._conn._send_pdu(
            struct.pack("!h", MessageType.SQ_EOT)
        )
    else:
        raise DatabaseError(
            f"SQ_FILE: unsupported optype {optype} (0=open, 1=close, "
            f"2=read-from-client, 3=write-to-client; 2 is unimplemented)"
        )
def _fetch_blob(self, descriptor: bytes) -> bytes:
    """Send SQ_FETCHBLOB and read the SQ_BLOB stream until terminator.

    *descriptor* is the 56-byte locator pulled from the tuple payload;
    the returned bytes are the concatenated SQ_BLOB chunks. Must be
    called while the originating cursor is still open (locators are
    invalidated by CLOSE — see ``_dereference_blob_columns``).
    """
    writer, buf = make_pdu_writer()
    writer.write_short(MessageType.SQ_ID)
    writer.write_int(MessageType.SQ_FETCHBLOB)  # 38
    writer.write_padded(descriptor)
    writer.write_short(MessageType.SQ_EOT)
    self._conn._send_pdu(buf.getvalue())
    reader = _make_socket_reader(self._conn._sock)
    chunks: list[bytes] = []
    while True:
        tag = reader.read_short()
        if tag == MessageType.SQ_BLOB:
            length = reader.read_short()
            if length == 0:
                continue  # zero-length marks end-of-blob, but stream
                # may still have SQ_DONE/EOT after; keep reading
            chunks.append(reader.read_exact(length))
            if length & 1:
                reader.read_exact(1)  # writePadded even-byte align
        elif tag == MessageType.SQ_EOT:
            break  # end of response
        elif tag == MessageType.SQ_DONE:
            self._consume_done(reader)
        elif tag == 55:  # SQ_COST — two ints we don't use
            reader.read_int()
            reader.read_int()
        elif tag == MessageType.SQ_XACTSTAT:
            reader.read_exact(2 + 2 + 2)
        elif tag == MessageType.SQ_ERR:
            self._raise_sq_err(reader)
        else:
            raise DatabaseError(
                f"unexpected tag in FETCHBLOB response: 0x{tag:04x}"
            )
    return b"".join(chunks)
def _execute_dml_with_params(self, params: tuple) -> None:
    """DML with bound parameters: SQ_BIND + SQ_EXECUTE → SQ_RELEASE.

    Per JDBC's sendExecute path for prepared statements (line 1108
    of IfxSqli): build a single PDU containing SQ_BIND with all
    parameter values followed by SQ_EXECUTE.

    If parameter encoding raises (e.g., :class:`DataError` for a
    non-representable string), the prepared statement is still
    allocated on the server. Send the SQ_RELEASE before propagating
    — otherwise the next ``execute()`` finds a half-state connection.
    """
    try:
        pdu = self._build_bind_execute_pdu(params)
    except Exception:
        # Best-effort release of the orphaned prepared statement;
        # failures here must not mask the original encoding error.
        with contextlib.suppress(Exception):
            self._conn._send_pdu(self._build_release_pdu())
            self._drain_to_eot()
        raise
    self._conn._send_pdu(pdu)
    self._drain_to_eot()
    self._conn._send_pdu(self._build_release_pdu())
    self._drain_to_eot()
def _execute_dml(self) -> None:
    """Run the DDL/DML path: SQ_EXECUTE → SQ_RELEASE.

    For statements that don't return rows (CREATE, INSERT, UPDATE,
    DELETE, DROP), the server's DESCRIBE response has ``nfields=0``.
    We don't open a cursor — just execute the prepared statement and
    release it. Per JDBC's executeExecute path for non-prepared
    statements (line 1075 of IfxSqli.sendExecute).

    Note: when the DESCRIBE response includes SQ_INSERTDONE for a
    literal-value INSERT, that's METADATA about the would-be insert
    (auto-generated serial values), NOT the actual execution. We
    still need SQ_EXECUTE to make the row persist. Lesson: don't
    let the optimization-looking response confuse you.
    """
    self._conn._send_pdu(self._build_execute_pdu())
    self._drain_to_eot()  # reads DONE + COST + EOT, populates rowcount
    self._conn._send_pdu(self._build_release_pdu())
    self._drain_to_eot()
    def executemany(self, operation: str, seq_of_parameters: Any) -> None:
        """Execute the same SQL once per parameter set.
        Per PEP 249. Common case is batched INSERT. We PREPARE once,
        send N SQ_BIND+SQ_EXECUTE PDUs in a pipelined batch, then drain
        N responses, then RELEASE once. Phase 33 introduces the pipeline
        — earlier serial-loop implementations paid one wire round-trip
        per row (~30 us/row on loopback x N rows = the dominant cost
        for any sizeable batch).
        Phase 4 supports DML (INSERT/UPDATE/DELETE) only — SELECT in
        executemany doesn't make much sense and isn't implemented.
        **Pipelining safety** (Phase 33):
        * The Phase 27 wire lock holds for the whole executemany, so
          the entire send-batch + drain-batch is atomic against other
          threads on the connection.
        * TCP send buffer (~16-256 KB) easily fits 1000 PDUs (~80-200
          KB worst case); response packets are tiny (~10 bytes per
          OK), so the server's send buffer can't fill before we drain.
          Note: ``sendall`` doesn't honor a write timeout reliably on
          all kernels — a wedged peer could block until TCP keepalive
          fires (default ~2 hours). For hostile-network deployments,
          set ``keepalive=True`` on connect.
        * On the first error mid-drain, remaining responses are
          drained silently (they're SQ_ERR replies for rows that the
          aborted transaction couldn't commit anyway). Wire alignment
          is verified by ``test_executemany_pipeline.py`` — Informix
          does send N responses for N pipelined PDUs even when one
          fails. If a future Informix version changes that behavior,
          those tests fail loudly.
        * **Lock duration scales O(N) with batch size.** For very
          large batches (>10000 rows), other threads waiting on this
          connection will block proportionally. Prefer chunking into
          multiple ``executemany`` calls of 1000-10000 rows so other
          threads aren't starved.
        """
        self._check_open()
        # Materialize once — we iterate twice (length validation, then
        # PDU build), and a one-shot generator would be exhausted.
        seq = list(seq_of_parameters)
        if not seq:
            self._rowcount = 0
            return
        # All parameter tuples must agree on length (= num placeholders).
        first_len = len(seq[0])
        for i, p in enumerate(seq):
            if len(p) != first_len:
                raise ProgrammingError(
                    f"executemany: parameter set [{i}] has {len(p)} values, "
                    f"expected {first_len} (matching set [0])"
                )
        # Detect SELECT — not supported in executemany.
        first_word = operation.lstrip().split(None, 1)[0].upper() if operation.strip() else ""
        if first_word == "SELECT":
            raise NotSupportedError("executemany on SELECT is not supported")
        sql = _rewrite_numeric_to_qmark(operation)
        # Phase 27: full PREPARE+(BIND+EXECUTE)*N+RELEASE round-trip
        # under the wire lock — N rows commit atomically with respect
        # to other threads on the connection.
        with self._conn._wire_lock:
            # Reset per-execute state.
            self._description = None
            self._columns = []
            self._column_readers = None  # Phase 37
            self._rowcount = -1
            self._rows = []
            self._row_index = -1
            self._statement_already_done = False
            # Logged-DB transaction guard — same as execute(). Idempotent
            # within an open transaction.
            self._conn._ensure_transaction()
            # PREPARE once.
            self._conn._send_pdu(
                self._build_prepare_pdu(sql, num_qmarks=first_len)
            )
            self._read_describe_response()
            # Phase 33: pipeline — build all BIND+EXECUTE PDUs first
            # (Python work, no I/O), then send them back-to-back, then
            # drain all responses. Eliminates the per-row round-trip
            # the older serial loop paid.
            pdus = [
                self._build_bind_execute_pdu(tuple(p)) for p in seq
            ]
            for pdu in pdus:
                self._conn._send_pdu(pdu)
            # Drain N responses. The first error is captured but we
            # still drain the rest (they're SQ_ERRs for the aborted
            # transaction's queued rows) so the wire stays consistent.
            #
            # Wire-framing invariant: each response — whether SQ_DONE
            # for a successful row or SQ_ERR for a failed one — ends
            # with its own SQ_EOT. ``_raise_sq_err`` self-drains the
            # SQ_ERR's trailing SQ_EOT (see connections.py:_raise_sq_err
            # drain loop). So calling ``_drain_to_eot`` exactly N times
            # consumes exactly the responses for N PDUs, regardless of
            # how many succeeded vs. failed. If ``_raise_sq_err`` is
            # ever refactored to leave its trailing EOT for the caller,
            # this loop silently desyncs — the test
            # ``test_executemany_pipeline.py`` is the tripwire.
            total_rowcount = 0
            first_error: Exception | None = None
            first_error_row: int | None = None
            for i in range(len(pdus)):
                # Reset so this iteration's DONE (via _consume_done) is
                # observable as "this row's count" and not a stale value.
                self._rowcount = -1
                try:
                    self._drain_to_eot()
                except Exception as exc:
                    if first_error is None:
                        first_error = exc
                        first_error_row = i
                    continue
                if self._rowcount > 0:
                    total_rowcount += self._rowcount
            # RELEASE once.
            self._conn._send_pdu(self._build_release_pdu())
            self._drain_to_eot()
            if first_error is not None:
                # Annotate which row in the batch first failed by
                # PREPENDING to the existing message — preserves the
                # ``[<sqlcode>] <text>`` prefix that string-scraping
                # callers may rely on, and keeps the exception class
                # + structured fields (.sqlcode, .isamcode, .near).
                if first_error.args:
                    first_error.args = (
                        f"executemany row {first_error_row}/{len(pdus)}: "
                        f"{first_error.args[0]}",
                        *first_error.args[1:],
                    )
                raise first_error
            self._rowcount = total_rowcount
def fetchone(self) -> tuple | None:
"""Return the next row, or None at EOF.
Non-scrollable: returns ``self._rows[_row_index + 1]`` from the
materialized result set and advances the index.
Scrollable: sends ``SQ_SFETCH(ABSOLUTE, current+1)`` to the
server. We use scrolltype=6 with a computed target rather than
scrolltype=1 because JDBC's ``IfxResultSet.next()`` does the
same — target=0 with scrolltype=1 is interpreted by the server
as "scan to last", not "next sequential".
"""
self._check_open()
if self._scrollable:
target = self._row_index + 2 # current is 0-indexed; SFETCH wants 1-indexed (current+1) +1 for "next"
return self._sfetch_at(scrolltype=6, target=target)
if self._description is None or not self._rows:
return None
nxt = self._row_index + 1
if nxt >= len(self._rows):
self._row_index = len(self._rows) # past-last
return None
self._row_index = nxt
return self._rows[nxt]
def fetchmany(self, size: int | None = None) -> list[tuple]:
self._check_open()
n = size if size is not None else self.arraysize
out: list[tuple] = []
for _ in range(n):
row = self.fetchone()
if row is None:
break
out.append(row)
return out
def fetchall(self) -> list[tuple]:
"""Return all remaining rows from the current position to the end.
Non-scrollable: slice from the materialized result set.
Scrollable: sequentially SFETCH(NEXT) until EOF — N round-trips
for N rows. For huge result sets, prefer indexed access via
``fetch_absolute`` if you don't actually need every row.
"""
self._check_open()
if self._scrollable:
out: list[tuple] = []
while (row := self.fetchone()) is not None:
out.append(row)
return out
if self._description is None or not self._rows:
return []
start = self._row_index + 1
out = self._rows[start:]
self._row_index = len(self._rows)
return list(out)
# -- Phase 17/18: scroll cursor API -----------------------------------
def scroll(self, value: int, mode: str = "relative") -> None:
"""Move the cursor position. PEP 249-compatible.
``mode='relative'`` (default): move ``value`` rows forward
(negative = backward). ``mode='absolute'``: jump to row ``value``
(0-indexed; the next ``fetchone()`` returns the row at ``value``).
Raises :class:`IndexError` if the target falls outside the result
set (per PEP 249). For non-scrollable cursors, this is enforced
eagerly using the materialized result-set length. For scrollable
cursors, only out-of-range NEGATIVE positions raise immediately
— positions past the end are detected lazily on the next fetch
(returns None).
"""
self._check_open()
if self._description is None:
raise ProgrammingError("no result set; call execute() first")
if mode == "relative":
target = self._row_index + value
elif mode == "absolute":
target = value - 1 # absolute(0) = before first; absolute(N) = at row N-1
else:
raise ProgrammingError(
f"scroll mode must be 'relative' or 'absolute', got {mode!r}"
)
if self._scrollable:
if target < -1:
raise IndexError(
f"scroll target out of range: position {target}"
)
self._row_index = target
return
if target < -1 or target >= len(self._rows):
raise IndexError(
f"scroll target out of range: position {target} "
f"vs. result set of {len(self._rows)} rows"
)
self._row_index = target
def fetch_first(self) -> tuple | None:
"""Reset to before-first then fetch row 0 / SFETCH(ABSOLUTE, 1)."""
self._check_open()
if self._scrollable:
self._row_index = -1 # before-first
return self._sfetch_at(scrolltype=6, target=1)
self._row_index = -1
return self.fetchone()
def fetch_last(self) -> tuple | None:
"""Position at and return the last row (None if empty)."""
self._check_open()
if self._scrollable:
# SFETCH(LAST=4) returns the last row and tells us the count.
return self._sfetch_at(scrolltype=4, target=0, is_last_probe=True)
if not self._rows:
return None
self._row_index = len(self._rows) - 1
return self._rows[self._row_index]
def fetch_prior(self) -> tuple | None:
"""Move backward one row and return it (None if before-first)."""
self._check_open()
if self._scrollable:
prev = self._row_index - 1 if self._row_index >= 0 else -1
if prev < 0:
self._row_index = -1
return None
return self._sfetch_at(scrolltype=6, target=prev + 1)
prev = self._row_index - 1
if prev < 0:
self._row_index = -1
return None
self._row_index = prev
return self._rows[prev]
def fetch_absolute(self, n: int) -> tuple | None:
"""Position at row ``n`` (0-indexed) and return it.
Negative ``n`` indexes from the end (Python-style):
``fetch_absolute(-1)`` returns the last row. For scrollable
cursors, negative indexes need the row count, which is
discovered (cached) via a one-time ``SFETCH(LAST)`` probe.
"""
self._check_open()
if self._scrollable:
if n < 0:
# Need total row count for negative indexing — cache it.
if self._scroll_total_rows is None:
saved = self._row_index
self._sfetch_at(scrolltype=4, target=0, is_last_probe=True)
self._row_index = saved # restore
if self._scroll_total_rows is None:
return None # empty
n = self._scroll_total_rows + n
if n < 0:
return None
return self._sfetch_at(scrolltype=6, target=n + 1)
if not self._rows:
return None
if n < 0:
n = len(self._rows) + n
if n < 0 or n >= len(self._rows):
return None
self._row_index = n
return self._rows[n]
def fetch_relative(self, n: int) -> tuple | None:
"""Move ``n`` rows from the current position and return that row.
``n=1`` is equivalent to ``fetchone``, ``n=-1`` to ``fetch_prior``.
Returns None if the target falls outside the result set.
"""
self._check_open()
if self._scrollable:
target = self._row_index + n
if target < 0:
return None
return self._sfetch_at(scrolltype=6, target=target + 1)
if not self._rows:
return None
target = self._row_index + n
if target < 0 or target >= len(self._rows):
return None
self._row_index = target
return self._rows[target]
@property
def rownumber(self) -> int | None:
"""Current 0-indexed row position, or None if no result set / before-first."""
if self._description is None or self._row_index < 0:
return None
return self._row_index
def _sfetch_at(
self, scrolltype: int, target: int, *, is_last_probe: bool = False
) -> tuple | None:
"""Send SQ_SFETCH and parse the single-tuple response.
``scrolltype``: 1=NEXT, 4=LAST (probes for end-of-cursor and
returns the last row), 6=ABSOLUTE (target is 1-indexed row).
Side-effects:
- Updates ``self._row_index`` to reflect the new position
(from the server's authoritative ``SQ_TUPID`` response).
- Caches ``self._scroll_total_rows`` after a LAST probe.
- Returns the row tuple, or None if the target is past-end.
"""
if not self._server_cursor_open:
raise ProgrammingError(
"scrollable cursor is not open; call execute() first"
)
# Phase 27: hold the wire lock for the SFETCH round-trip.
# Cheap (RLock, single op), and lets every scrollable-cursor
# caller (fetchone/fetchmany/fetchall/scroll/fetch_*) get the
# serialization for free.
with self._conn._wire_lock:
prior_count = len(self._rows)
self._last_tupid = None
self._conn._send_pdu(self._build_sfetch_pdu(scrolltype, target))
self._read_fetch_response()
new_count = len(self._rows)
if new_count == prior_count:
# No tuple arrived — past-end or empty result set.
# Don't move _row_index forward speculatively; let the
# caller observe the None return.
return None
row = self._rows[-1]
# Update position from the server's TUPID (authoritative).
# SQ_TUPID arrives in every scrollable-cursor response and
# carries the 1-indexed row position the server delivered.
if self._last_tupid is not None:
self._row_index = self._last_tupid - 1 # → 0-indexed
if scrolltype == 4 or is_last_probe:
# SFETCH(LAST) — TUPID == total row count
self._scroll_total_rows = self._last_tupid
return row
def close(self) -> None:
"""Close the cursor.
Non-scrollable: idempotent local cleanup.
Scrollable: sends ``SQ_CLOSE`` + ``SQ_RELEASE`` to free the
server-side cursor before marking the local cursor closed.
"""
if self._closed:
return
if self._scrollable and self._server_cursor_open:
# Phase 27: hold the wire lock during CLOSE+RELEASE so we
# don't interleave with another thread's pending op on the
# connection. Best-effort: any wire failure here is
# swallowed (the caller is closing; we don't want to mask
# whatever caused them to close).
try:
with self._conn._wire_lock:
self._conn._send_pdu(self._build_close_pdu())
self._drain_to_eot()
self._conn._send_pdu(self._build_release_pdu())
self._drain_to_eot()
except Exception:
pass
self._server_cursor_open = False
# Phase 28: explicit close has handled the server-side resources
# (or tried to). Disarm the finalizer so it doesn't fire later
# for nothing — and clear the state flag as a belt-and-suspenders
# measure in case detach() somehow misses (e.g., already-running
# callback on another thread).
self._finalizer_state[0] = False
self._finalizer.detach()
self._closed = True
self._row_index = len(self._rows) # mark exhausted
def __iter__(self) -> Iterator[tuple]:
return self
def __next__(self) -> tuple:
row = self.fetchone()
if row is None:
raise StopIteration
return row
def __enter__(self) -> Cursor:
return self
def __exit__(self, *_exc: object) -> None:
self.close()
# -- internals ---------------------------------------------------------
def _check_open(self) -> None:
if self._closed:
raise InterfaceError("cursor is closed")
if self._conn.closed:
raise InterfaceError("connection is closed")
# -- PDU builders -----------------------------------------------------
def _build_prepare_pdu(self, sql: str, num_qmarks: int = 0) -> bytes:
"""SQ_PREPARE + SQ_NDESCRIBE + SQ_WANTDONE + SQ_EOT.
Per IfxSqli.sendPrepare. SQL uses 4-byte length prefix on modern
servers (isRemove64KLimitSupported), with even-byte alignment pad.
``num_qmarks`` is the count of ``?`` placeholders in the SQL.
"""
writer, buf = make_pdu_writer()
writer.write_short(MessageType.SQ_PREPARE)
writer.write_short(num_qmarks)
sql_bytes = sql.encode("iso-8859-1")
writer.write_int(len(sql_bytes))
writer.write_bytes(sql_bytes)
if (4 + len(sql_bytes)) & 1:
writer.write_byte(0) # writeChar pad
writer.write_short(MessageType.SQ_NDESCRIBE)
writer.write_short(MessageType.SQ_WANTDONE)
writer.write_short(MessageType.SQ_EOT)
return buf.getvalue()
def _emit_bind_params(self, writer: object, params: tuple) -> list[bytes]:
"""Emit SQ_BIND per-param blocks. Returns the list of blob bytes
(in order, NULL params skipped) that need streaming via SQ_BBIND.
Routes through ``encode_param`` for each non-None param, then
either:
- Writes the raw bytes inline (normal types)
- Writes the 56-byte blob descriptor inline AND queues the
real content for the post-SQ_BIND blob stream (BYTE/TEXT)
"""
from ._types import IfxType # local import to avoid cycle
blob_payloads: list[bytes] = []
for value in params:
if value is None:
writer.write_short(0)
writer.write_short(-1)
writer.write_short(0)
continue
ifx_type, prec, raw = encode_param(value, encoding=self._conn.encoding)
writer.write_short(ifx_type)
writer.write_short(0) # indicator = 0 (non-null)
writer.write_short(prec)
writer.write_padded(raw)
if ifx_type in (int(IfxType.BYTE), int(IfxType.TEXT)):
# The encoder put a 56-byte descriptor inline; queue the
# actual bytes for the SQ_BBIND/SQ_BLOB stream.
# ``bytes`` and ``bytearray`` flow through here; ``str``
# for TEXT is converted to bytes per ``CLIENT_LOCALE``.
payload = (
value.encode(self._conn.encoding)
if isinstance(value, str)
else bytes(value)
)
blob_payloads.append(payload)
return blob_payloads
def _emit_blob_stream(
self, writer: object, blob_payloads: list[bytes]
) -> None:
"""Emit SQ_BBIND + SQ_BLOB chunks + zero-length terminator.
Wire layout per ``IfxSqli.sendBlob`` line 3328 + ``sendStreamBlob``
line 3482:
``[short SQ_BBIND=41][short blob_count]``
for each blob:
while bytes remain:
``[short SQ_BLOB=39][short chunk_len][padded data]``
``[short SQ_BLOB=39][short 0]`` # end-of-blob
"""
if not blob_payloads:
return
writer.write_short(MessageType.SQ_BBIND)
writer.write_short(len(blob_payloads))
chunk_size = 1024 # JDBC's sendStreamBlob hardcodes this
for blob in blob_payloads:
offset = 0
while offset < len(blob):
chunk = blob[offset : offset + chunk_size]
writer.write_short(MessageType.SQ_BLOB)
writer.write_short(len(chunk))
writer.write_padded(chunk)
offset += len(chunk)
# Zero-length terminator marks end of THIS blob
writer.write_short(MessageType.SQ_BLOB)
writer.write_short(0)
def _build_bind_only_pdu(self, params: tuple) -> bytes:
"""SQ_BIND with parameter values + SQ_EOT (no SQ_EXECUTE).
Used for parameterized SELECT — the cursor open (CURNAME+NFETCH)
is what triggers query execution; SQ_BIND just binds the values
in scope for the prepared statement.
"""
writer, buf = make_pdu_writer()
writer.write_short(MessageType.SQ_ID)
writer.write_int(MessageType.SQ_BIND)
writer.write_short(len(params))
blob_payloads = self._emit_bind_params(writer, params)
self._emit_blob_stream(writer, blob_payloads)
writer.write_short(MessageType.SQ_EOT)
return buf.getvalue()
def _build_bind_execute_pdu(self, params: tuple) -> bytes:
"""SQ_BIND + (optional SQ_BBIND blob stream) + SQ_EXECUTE + SQ_EOT.
From the JDBC capture (msg[29] in 02-dml-cycle.socat.log) plus
the BYTE/TEXT extension found in IfxSqli.sendBind line 998+:
[short SQ_ID=4][int 5=SQ_BIND][short numparams]
for each param:
[short type][short indicator][short prec]
writePadded(data) # data + 0-pad if odd-len
-- if any blob params --
[short SQ_BBIND=41][short blob_count]
for each blob: SQ_BLOB chunks ending with [SQ_BLOB][0]
[short SQ_EXECUTE=7]
[short SQ_EOT]
"""
writer, buf = make_pdu_writer()
writer.write_short(MessageType.SQ_ID)
writer.write_int(MessageType.SQ_BIND) # action = 5
writer.write_short(len(params))
blob_payloads = self._emit_bind_params(writer, params)
self._emit_blob_stream(writer, blob_payloads)
writer.write_short(MessageType.SQ_EXECUTE) # 7
writer.write_short(MessageType.SQ_EOT)
return buf.getvalue()
def _build_curname_nfetch_pdu(self, cursor_name: str) -> bytes:
"""SQ_ID(CURNAME) + SQ_ID(NFETCH 4096) chained.
From the JDBC capture (msg[21]):
[short SQ_ID=4][int 3][short nameLen][bytes name][short 6]
[short SQ_ID=4][int 9][int 4096][int 0]
[short SQ_EOT]
The trailing ``[short 6]`` after the cursor name is the
``SQ_OPEN`` action — JDBC chains ``CURNAME → OPEN → NFETCH``
in one PDU.
"""
writer, buf = make_pdu_writer()
# CURNAME
writer.write_short(MessageType.SQ_ID)
writer.write_int(MessageType.SQ_CURNAME) # action = 3
name_bytes = cursor_name.encode("ascii")
writer.write_short(len(name_bytes))
writer.write_bytes(name_bytes)
if len(name_bytes) & 1:
writer.write_byte(0)
writer.write_short(MessageType.SQ_OPEN) # 6
# NFETCH (note: trailing field is a SHORT, not an int —
# caught by byte-diff against JDBC's 42-byte reference PDU,
# see docs/CAPTURES/14-py-varchar-fail.socat.log analysis)
writer.write_short(MessageType.SQ_ID)
writer.write_int(MessageType.SQ_NFETCH) # action = 9
writer.write_int(4096) # max bytes per fetch
writer.write_short(0) # reserved short (NOT int)
writer.write_short(MessageType.SQ_EOT)
return buf.getvalue()
def _build_curname_scroll_open_pdu(self, cursor_name: str) -> bytes:
"""Open a scrollable cursor: SQ_CURNAME + SQ_SCROLL + SQ_OPEN.
Per JDBC's ``sendCursorOpen`` line 1413+: when
``ResultSet.TYPE_SCROLL_INSENSITIVE`` is set, JDBC emits
``SQ_SCROLL=24`` immediately before ``SQ_OPEN=6``. The server
treats subsequent fetches as scrollable (random-access via
``SQ_SFETCH``) instead of forward-only.
Phase 18: we don't chain an NFETCH here — scrollable cursors
do per-call ``SQ_SFETCH`` instead.
"""
writer, buf = make_pdu_writer()
writer.write_short(MessageType.SQ_ID)
writer.write_int(MessageType.SQ_CURNAME)
name_bytes = cursor_name.encode("ascii")
writer.write_short(len(name_bytes))
writer.write_bytes(name_bytes)
if len(name_bytes) & 1:
writer.write_byte(0)
writer.write_short(MessageType.SQ_SCROLL) # 24 — mark as scrollable
writer.write_short(MessageType.SQ_OPEN) # 6
writer.write_short(MessageType.SQ_EOT)
return buf.getvalue()
def _build_sfetch_pdu(self, scrolltype: int, target: int) -> bytes:
"""SQ_SFETCH (scroll-fetch) PDU.
Wire format verified against JDBC capture
(``tests/reference/ScrollProbe`` against the dev container):
``[short SQ_ID=4][int SQ_SFETCH=23]``
``[short scrolltype]`` (1=NEXT, 4=LAST, 6=ABSOLUTE)
``[int target_row]`` (1-indexed for scrolltype=6)
``[int bufSize=4096]``
``[short SQ_EOT]``
The action code follows the standard ``[short SQ_ID][int action]``
framing of other commands (SQ_BIND, SQ_EXECUTE, etc.). The
cursor being scrolled is implicit: it's the most-recently-named
cursor on this connection. ``sendStatementID`` is a no-op here
because we don't track a separate ``statementType``.
Initial draft used SHORT for ``bufSize`` and it caused the
server to silently hang — same diagnostic pattern as the
SHORT-vs-INT trap from Phase 4.x's CURNAME+NFETCH PDU.
"""
writer, buf = make_pdu_writer()
writer.write_short(MessageType.SQ_ID)
writer.write_int(MessageType.SQ_SFETCH) # 23
writer.write_short(scrolltype)
writer.write_int(target)
writer.write_int(4096) # tuple buffer size — INT, not SHORT
writer.write_short(MessageType.SQ_EOT)
return buf.getvalue()
def _build_nfetch_pdu(self) -> bytes:
"""SQ_ID(NFETCH 4096) + SQ_EOT — used to drain remaining rows."""
writer, buf = make_pdu_writer()
writer.write_short(MessageType.SQ_ID)
writer.write_int(MessageType.SQ_NFETCH)
writer.write_int(4096)
writer.write_short(0) # reserved short (matches JDBC, not int)
writer.write_short(MessageType.SQ_EOT)
return buf.getvalue()
def _build_execute_pdu(self) -> bytes:
"""SQ_ID(EXECUTE=7) + SQ_EOT — runs the most-recently-prepared statement.
From JDBC capture msg[21] in 02-dml-cycle.socat.log: 8 bytes,
``00 04 00 00 00 07 00 0c``.
"""
writer, buf = make_pdu_writer()
writer.write_short(MessageType.SQ_ID)
writer.write_int(MessageType.SQ_EXECUTE) # action = 7
writer.write_short(MessageType.SQ_EOT)
return buf.getvalue()
def _build_close_pdu(self) -> bytes:
"""SQ_ID(CLOSE) + SQ_EOT."""
writer, buf = make_pdu_writer()
writer.write_short(MessageType.SQ_ID)
writer.write_int(MessageType.SQ_CLOSE) # 10
writer.write_short(MessageType.SQ_EOT)
return buf.getvalue()
def _build_release_pdu(self) -> bytes:
"""SQ_ID(RELEASE) + SQ_EOT."""
writer, buf = make_pdu_writer()
writer.write_short(MessageType.SQ_ID)
writer.write_int(MessageType.SQ_RELEASE) # 11
writer.write_short(MessageType.SQ_EOT)
return buf.getvalue()
# -- response readers -------------------------------------------------
def _read_describe_response(self) -> None:
"""Read DESCRIBE (+ optional SQ_INSERTDONE) + DONE + COST + EOT after PREPARE."""
reader = _make_socket_reader(self._conn._sock)
while True:
tag = reader.read_short()
if tag == MessageType.SQ_EOT:
return
elif tag == MessageType.SQ_DESCRIBE:
self._columns, meta = parse_describe(reader)
self._statement_id = meta.get("statement_id", 0)
self._description = (
[c.to_description_tuple() for c in self._columns] if self._columns else None
)
# Phase 37: pre-compile per-column reader strategy. The hot
# row-decode loop in parse_tuple_payload uses this to avoid
# re-running per-row dispatch decisions that depend only
# on column metadata.
if self._columns:
self._column_readers = compile_column_readers(self._columns)
# Phase 38: take it one step further — codegen a
# specialized row decoder for THIS column shape.
# Eliminates the per-column iteration overhead of
# the readers loop. ``None`` if codegen can't
# handle the shape; parse_tuple_payload then
# falls back to the readers-list dispatch.
self._row_decoder = compile_row_decoder(
self._column_readers, self._columns
)
else:
self._column_readers = None
self._row_decoder = None
elif tag == 94: # SQ_INSERTDONE — Informix optimization: literal
# INSERT executed during PREPARE. Payload is:
# readLongInt (10 bytes) — serial8 inserted
# readLongBigint (8 bytes) — bigserial inserted (modern servers)
# See IfxSqli.receiveInsertDone (line 2347).
reader.read_exact(10 + 8)
self._statement_already_done = True
self._rowcount = 1 # best-effort; literal INSERT = 1 row
elif tag == MessageType.SQ_DONE:
self._consume_done(reader)
elif tag == 55: # SQ_COST
reader.read_int()
reader.read_int()
elif tag == MessageType.SQ_XACTSTAT:
reader.read_exact(2 + 2 + 2)
elif tag == MessageType.SQ_ERR:
self._raise_sq_err(reader)
else:
raise DatabaseError(f"unexpected tag in DESCRIBE response: 0x{tag:04x}")
def _read_fetch_response(self) -> None:
"""Read TUPLE* + DONE + COST + EOT after an NFETCH or SFETCH."""
reader = _make_socket_reader(self._conn._sock)
while True:
tag = reader.read_short()
if tag == MessageType.SQ_EOT:
return
elif tag == MessageType.SQ_TUPLE:
row = parse_tuple_payload(
reader,
self._columns,
encoding=self._conn.encoding,
readers=self._column_readers,
row_decoder=self._row_decoder,
)
self._rows.append(row)
elif tag == MessageType.SQ_DONE:
self._consume_done(reader)
elif tag == 55: # SQ_COST
reader.read_int()
reader.read_int()
elif tag == MessageType.SQ_XACTSTAT:
reader.read_exact(2 + 2 + 2)
elif tag == MessageType.SQ_TUPID:
# Phase 18: scrollable-cursor SFETCH responses include
# the 1-indexed row position. Capture for state-update.
self._last_tupid = reader.read_int()
elif tag == 98: # SQ_FILE — server orchestrates a file transfer
self._handle_sq_file(reader)
elif tag == MessageType.SQ_ERR:
self._raise_sq_err(reader)
else:
raise DatabaseError(f"unexpected tag in FETCH response: 0x{tag:04x}")
def _drain_to_eot(self) -> None:
"""Read response stream until SQ_EOT, allowing common tags in between."""
reader = _make_socket_reader(self._conn._sock)
while True:
tag = reader.read_short()
if tag == MessageType.SQ_EOT:
return
elif tag == MessageType.SQ_DONE:
self._consume_done(reader)
elif tag == 55: # SQ_COST
reader.read_int()
reader.read_int()
elif tag == 94: # SQ_INSERTDONE
# serial8 (10 bytes) + bigserial (8 bytes)
reader.read_exact(10 + 8)
# If the server sent INSERTDONE, the row was inserted.
# Track best-effort rowcount = 1 for literal-value INSERTs.
if self._rowcount < 0:
self._rowcount = 1
elif tag == MessageType.SQ_XACTSTAT:
# Transaction-state event. Body: 3 shorts (event,
# newLevel, oldLevel) per IfxSqli.receiveXactstat. We
# don't expose these to the user yet but must drain
# them to keep the stream aligned. Logged DBs emit one
# per DML statement.
reader.read_short() # xcEvent
reader.read_short() # xcNewLevel
reader.read_short() # xcOldLevel
elif tag == 98: # SQ_FILE — server orchestrates file transfer
# mid-DML (e.g., INSERT ... filetoblob('X', 'client'))
self._handle_sq_file(reader)
elif tag == MessageType.SQ_ERR:
self._raise_sq_err(reader)
else:
raise DatabaseError(f"unexpected tag while draining: 0x{tag:04x}")
def _consume_done(self, reader: IfxStreamReader) -> None:
"""SQ_DONE: [short warnings][int rowsAffected][int rowid][int serial]."""
reader.read_short() # warnings
rows = reader.read_int()
reader.read_int() # rowid
reader.read_int() # serial
if rows >= 0:
self._rowcount = rows
def _raise_sq_err(self, reader: IfxStreamReader) -> None:
"""Decode SQ_ERR and raise the appropriate PEP 249 exception class.
Wire format (per IfxSqli.receiveError, line 2717):
[short sqlcode][short isamcode][int offset]
[short near_token_len][bytes name][optional pad][short SQ_EOT]
The "near" token is the object name where the error occurred
(e.g. table or column name for "not found" errors). Empty for
most syntax errors.
"""
from ._protocol import ProtocolError
sqlcode = reader.read_short()
isamcode = reader.read_short()
offset = reader.read_int()
near_token = ""
# Phase 28: catch specific (truncation / socket) errors during
# near-token parse — leave near_token empty if it can't be
# decoded; the user still gets the right exception class with
# sqlcode. Other exception types (programming errors, etc.)
# propagate so they aren't masked.
try:
name_len = reader.read_short()
if name_len > 0:
raw = reader.read_exact(name_len)
if name_len & 1:
reader.read_exact(1) # pad to even
near_token = raw.rstrip(b"\x00").decode("iso-8859-1", errors="replace")
except (ProtocolError, OSError):
pass
# Drain remaining bytes until SQ_EOT. Phase 28: a (ProtocolError,
# OSError) during drain means the wire is in an unknown state —
# force-close the connection so subsequent operations don't
# inherit the desync. Previously bare ``except: pass`` masked
# this and let a poisoned connection survive.
try:
while True:
t = reader.read_short()
if t == MessageType.SQ_EOT:
break
except (ProtocolError, OSError):
with contextlib.suppress(Exception):
self._conn.close()
text = _errcodes.text_for(sqlcode)
exc_class = _errcodes.exception_for(sqlcode)
msg = f"[{sqlcode}] {text} (near {near_token!r})" if near_token else f"[{sqlcode}] {text}"
if isamcode != 0:
msg += f" [ISAM {isamcode}]"
if offset > 0:
msg += f" (offset {offset})"
exc = exc_class(msg)
# Attach structured fields for programmatic inspection.
exc.sqlcode = sqlcode
exc.isamcode = isamcode
exc.offset = offset
exc.near = near_token
raise exc
class _SocketReader(IfxStreamReader):
    """``IfxStreamReader`` that sources its bytes from an ``IfxSocket`` on demand."""
    def __init__(self, sock):
        self._sock = sock
        from io import BytesIO
        # Hand the base class an empty stream — every read below is
        # delegated straight to the socket instead.
        super().__init__(BytesIO(b""))
    def read_exact(self, n: int) -> bytes:
        return self._sock.read_exact(n)
    def read_short(self) -> int:
        # Big-endian signed 16-bit — equivalent to struct "!h".
        return int.from_bytes(self.read_exact(2), "big", signed=True)
    def read_int(self) -> int:
        # Big-endian signed 32-bit — equivalent to struct "!i".
        return int.from_bytes(self.read_exact(4), "big", signed=True)