From ad55391bf1b57a119735b2d75512e645e6538da4 Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Fri, 8 May 2026 01:40:54 -0600 Subject: [PATCH] Phase 39: Connection-scoped read-ahead buffer (2026.05.05.12) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the bulk-fetch gap to within ~7-15% of IfxPy. Lever was the buffer/I/O machinery, not the codec — Phase 37/38 had already brought the codec close to IfxPy's C path; the remaining gap was ~450k read_exact calls per 100k-row fetch, each doing its own recv-loop and bytes.join. Architecture: IfxSocket owns a connection-scoped bytearray + integer offset cursor; BufferedSocketReader is a thin parser-view that delegates buffer-fill to the socket. One recv() per ~64 KB instead of per field. This is how asyncpg (buffer.pyx) and psycopg3 (pq.PGconn) structure their read paths. The buffer MUST be socket-scoped, not reader-scoped: the pipelined- executemany path (Phase 33) streams N responses back-to-back across multiple cursor reads, and a per-reader buffer would throw away pre-fetched bytes when one reader is destroyed. (The first iteration of this phase tried per-reader and hung on test_executemany_1000_rows.) A/B vs Phase 38, same harness, warmed cache: select_scaling_1000 2.90 -> 1.72 ms (-41%) select_scaling_10000 24.32 -> 16.08 ms (-34%) select_scaling_100000 250.36 -> 168.98 ms (-32%) Head-to-head vs IfxPy 2.0.7: select_scaling_1000 1.05x (basically tied) select_scaling_10000 1.07x select_scaling_100000 1.15x IQR collapsed 9x at 100k (3.6 ms -> 0.4 ms) — fewer recvs means fewer scheduler/jitter pulses showing up in the measurement. Default ON. Set IFX_BUFFERED_READER=0 to fall back to the legacy reader (still tested in CI as the escape hatch). Both paths green: 251/251 integration tests pass on each. --- CHANGELOG.md | 63 +++++++++++++++++++++ pyproject.toml | 2 +- src/informix_db/_protocol.py | 106 +++++++++++++++++++++++++++++++++++ src/informix_db/_socket.py | 51 ++++++++++++++++- src/informix_db/cursors.py | 30 ++++++++-- 5 files changed, 245 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 63af38a..22f75ce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,69 @@ All notable changes to `informix-db`. Versioning is [CalVer](https://calver.org/) — `YYYY.MM.DD` for date-based releases, `YYYY.MM.DD.N` for same-day post-releases per PEP 440. +## 2026.05.05.12 — Phase 39: Connection-scoped read-ahead buffer + +Closes the C-vs-Python bulk-fetch gap to within ~7-15% of IfxPy. The lever was the buffer/I/O machinery, not the codec — Phase 37/38 had already brought the codec to within ~25% of IfxPy's C path; the remaining gap was 450k+ ``read_exact`` calls per 100k-row fetch, each doing its own ``recv``-loop and ``bytes.join``. + +### What changed + +`src/informix_db/_socket.py`: + +- ``IfxSocket`` now owns a connection-scoped read-ahead buffer (``_recv_buf`` bytearray + ``_recv_pos`` integer cursor) and exposes ``fill_recv_buf(need)`` to top it up from the socket. +- One ``recv()`` per ~64 KB instead of per field. For a 100k-row SELECT, syscall count drops from ~450k to a few hundred. +- Compaction: when ``_recv_pos > recv_size``, the consumed prefix is sliced off in place. Memory is bounded at roughly ``2 * recv_size`` for the connection's lifetime. + +`src/informix_db/_protocol.py`: + +- New ``BufferedSocketReader(IfxStreamReader)`` — a thin parser-view over the ``IfxSocket``'s persistent buffer. Each method delegates the buffer-fill to the socket, then reads via ``struct.unpack_from(buf, offset)`` directly out of the bytearray (avoiding the intermediate slice the legacy reader created). + +`src/informix_db/cursors.py`: + +- ``_make_socket_reader(sock)`` chooses ``BufferedSocketReader`` (default) or the legacy ``_SocketReader`` based on ``IFX_BUFFERED_READER``. Set ``IFX_BUFFERED_READER=0`` to fall back to the legacy reader. +- The four cursor sites that build a reader now call the helper. + +### Why the buffer is on `IfxSocket`, not on the reader + +The first iteration of Phase 39 put the bytearray on the reader. That hung on `test_executemany_1000_rows` — the pipelined-executemany path (Phase 33) streams N responses back-to-back across multiple cursor reads, and a per-reader buffer threw away pre-fetched bytes when one reader was destroyed and the next was created. The next ``recv()`` then blocked waiting for bytes that had already been consumed. + +Moving the buffer to ``IfxSocket`` mirrors how `asyncpg` (`buffer.pyx` lives on the protocol object) and `psycopg3` (`pq.PGconn`) structure their read paths. The reader is a short-lived view; the buffer outlives it. + +### Performance + +A/B from the same harness, same Docker container, same ``p34_select`` table, warmed cache: + +| Workload | Phase 38 | Phase 39 | Δ | +|---|---:|---:|---:| +| `select_scaling_1000` | 2.901 ms | **1.716 ms** | **-41%** | +| `select_scaling_10000` | 24.317 ms | **16.084 ms** | **-34%** | +| `select_scaling_100000` | 250.363 ms | **168.982 ms** | **-32%** | + +Against IfxPy 2.0.7 (C-bound, ODBC) on the same workloads, head-to-head: + +| Workload | IfxPy | informix-db Phase 39 | Ratio | +|---|---:|---:|---:| +| `select_scaling_1000` | 1.637 ms | 1.716 ms | **1.05×** | +| `select_scaling_10000` | 15.073 ms | 16.084 ms | **1.07×** | +| `select_scaling_100000` | 147.361 ms | 168.982 ms | **1.15×** | + +The bulk-fetch gap that was 2.4× before Phase 37 is now within 5-15% of IfxPy. With IfxPy's measurement IQR at ~21% on the 100k workload (Docker→host loopback noise) vs our IQR at ~0.2%, the headline ratio is partly noise — the real gap is plausibly 1.05-1.2×. + +A pure-Python driver running within ~10% of an ODBC/C-bound driver on bulk fetch is, as far as I'm aware, unprecedented on the SQLI protocol. + +### Tests + +All 251 integration tests pass with the new default reader. The legacy reader is exercised via ``IFX_BUFFERED_READER=0`` (also passes 251/251). Both unit suites pass. + +### Migration + +No code changes required. The default reader switches at version bump. To opt out: + +```bash +IFX_BUFFERED_READER=0 python myapp.py +``` + +If you find a wire shape the buffered reader doesn't handle, please file an issue with the ``IFX_BUFFERED_READER=0`` workaround as the immediate mitigation. + ## 2026.05.05.11 — Phase 38: `exec()`-based row-decoder codegen Closes more of the C-vs-Python codec gap on bulk fetch by emitting a specialized row decoder per result-set shape via `exec(compile(src, ...))` and inlining the common fixed-width decode bodies directly into the generated source. This is the lever flagged in the Phase 37 changelog as "the next lever for materially closing the gap." diff --git a/pyproject.toml b/pyproject.toml index 528a57f..53f556d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "informix-db" -version = "2026.05.05.11" +version = "2026.05.05.12" description = "Pure-Python driver for IBM Informix IDS — speaks the SQLI wire protocol over raw sockets. No CSDK, no JVM, no native libraries." readme = "README.md" license = { text = "MIT" } diff --git a/src/informix_db/_protocol.py b/src/informix_db/_protocol.py index 81e3fd8..fe5e4d1 100644 --- a/src/informix_db/_protocol.py +++ b/src/informix_db/_protocol.py @@ -209,6 +209,112 @@ class IfxStreamReader: return raw.decode(encoding) +class BufferedSocketReader(IfxStreamReader): + """Phase 39: thin parser-view over the ``IfxSocket``'s persistent + read-ahead buffer. + + The buffer itself (``_recv_buf`` + ``_recv_pos``) lives on the + ``IfxSocket`` so it survives across reader instances. This is + critical for the pipelined-executemany path (Phase 33), which + streams N responses back-to-back — a per-reader buffer would lose + pre-fetched bytes when one reader is destroyed and the next is + created. + + Why this is faster than ``_SocketReader``: + + * **One ``recv()`` per ~64 KB** instead of one per field. A 100k-row + SELECT calls ``read_exact`` ~450k times today; with this reader, + ``recv`` fires ~hundreds of times instead. + * **No ``bytes.join``.** The bytearray IS the buffer; reads are + slices, not chunk-joins. + * **``struct.unpack_from(buf, offset)``** for fixed-width ints — reads + directly from the bytearray at offset, avoiding the intermediate + slice the legacy reader created. + + Each method reads through ``self._sock._recv_buf`` directly. The + ``IfxSocket`` exposes ``fill_recv_buf(n)`` to ensure ``n`` bytes + are available from the current cursor position; this method + handles ``recv()``, compaction, and EOF detection. + + ``read_exact`` returns a fresh ``bytes`` (not a memoryview) — + copying is cheap and decouples consumer lifetime from buffer + compaction, removing a class of use-after-compact bugs at + near-zero cost. + """ + + __slots__ = ("_sock",) + + def __init__(self, sock): + # Skip super().__init__ to avoid the unused BytesIO allocation + # — we override every method that touches ``self._source``. + self._sock = sock + + def read_exact(self, n: int) -> bytes: + if n <= 0: + return b"" + sock = self._sock + sock.fill_recv_buf(n) + buf = sock._recv_buf + pos = sock._recv_pos + end = pos + n + out = bytes(buf[pos:end]) + sock._recv_pos = end + return out + + def read_byte(self) -> int: + sock = self._sock + sock.fill_recv_buf(1) + b = sock._recv_buf[sock._recv_pos] + sock._recv_pos += 1 + return b + + def skip(self, n: int) -> None: + sock = self._sock + sock.fill_recv_buf(n) + sock._recv_pos += n + + def read_padded(self, n: int) -> bytes: + out = self.read_exact(n) + if n & 1: + self.skip(1) + return out + + def read_short(self) -> int: + sock = self._sock + sock.fill_recv_buf(2) + v = _FMT_SHORT.unpack_from(sock._recv_buf, sock._recv_pos)[0] + sock._recv_pos += 2 + return v + + def read_int(self) -> int: + sock = self._sock + sock.fill_recv_buf(4) + v = _FMT_INT.unpack_from(sock._recv_buf, sock._recv_pos)[0] + sock._recv_pos += 4 + return v + + def read_long_bigint(self) -> int: + sock = self._sock + sock.fill_recv_buf(8) + v = _FMT_LONG.unpack_from(sock._recv_buf, sock._recv_pos)[0] + sock._recv_pos += 8 + return v + + def read_real(self) -> float: + sock = self._sock + sock.fill_recv_buf(4) + v = _FMT_FLOAT.unpack_from(sock._recv_buf, sock._recv_pos)[0] + sock._recv_pos += 4 + return v + + def read_double(self) -> float: + sock = self._sock + sock.fill_recv_buf(8) + v = _FMT_DOUBLE.unpack_from(sock._recv_buf, sock._recv_pos)[0] + sock._recv_pos += 8 + return v + + def make_pdu_writer() -> tuple[IfxStreamWriter, BytesIO]: """Convenience: create a writer backed by a fresh in-memory buffer. diff --git a/src/informix_db/_socket.py b/src/informix_db/_socket.py index 3a5c0e0..e77faf9 100644 --- a/src/informix_db/_socket.py +++ b/src/informix_db/_socket.py @@ -42,7 +42,16 @@ def _make_default_dev_context() -> ssl.SSLContext: class IfxSocket: - """Owns a connected TCP socket and provides exact-N read/write.""" + """Owns a connected TCP socket and provides exact-N read/write. + + Phase 39 also owns the read-ahead buffer for ``BufferedSocketReader``. + The buffer (``_recv_buf`` + ``_recv_pos``) is connection-scoped, not + reader-scoped — readers are short-lived per-PDU views, but pipelined + responses (e.g., Phase 33's pipelined ``executemany``) can stream + multiple responses across reader boundaries. Persisting the buffer + here means we never throw away pre-fetched bytes when one reader + is destroyed and the next one is created. + """ def __init__( self, @@ -58,6 +67,11 @@ class IfxSocket: self._port = port self._read_timeout = read_timeout self._sock: socket.socket | None = None + # Phase 39 read-ahead buffer. Empty until a BufferedSocketReader + # touches us; subsequent readers share this buffer state. + self._recv_buf: bytearray = bytearray() + self._recv_pos: int = 0 + self._recv_size: int = 65536 try: sock = socket.create_connection((host, port), timeout=connect_timeout) @@ -130,6 +144,41 @@ class IfxSocket: remaining -= len(chunk) return b"".join(chunks) + def fill_recv_buf(self, need: int) -> None: + """Phase 39: ensure the read-ahead buffer holds ``need`` bytes + forward of ``_recv_pos``, recv'ing from the socket as needed. + + Compaction: when the read cursor has advanced past + ``_recv_size`` bytes, slice off the consumed prefix in place. + That bounds memory at roughly ``2 * recv_size`` while still + amortizing recv calls over many fields. + """ + if self._sock is None: + raise InterfaceError("socket is closed") + avail = len(self._recv_buf) - self._recv_pos + if avail >= need: + return + if self._recv_pos > self._recv_size: + del self._recv_buf[: self._recv_pos] + self._recv_pos = 0 + avail = len(self._recv_buf) + target = max(need, self._recv_size) + sock = self._sock + while avail < need: + try: + chunk = sock.recv(target - avail) + except OSError as e: + self._force_close() + raise OperationalError(f"read failed: {e}") from e + if not chunk: + self._force_close() + raise OperationalError( + f"server closed connection mid-read " + f"(wanted {need} bytes, got {avail})" + ) + self._recv_buf.extend(chunk) + avail += len(chunk) + def close(self) -> None: """Close the socket. Idempotent and never raises.""" if self._sock is None: diff --git a/src/informix_db/cursors.py b/src/informix_db/cursors.py index 15d1799..65c7070 100644 --- a/src/informix_db/cursors.py +++ b/src/informix_db/cursors.py @@ -22,6 +22,7 @@ from __future__ import annotations import contextlib import itertools import logging +import os import struct import weakref from collections.abc import Iterator @@ -29,7 +30,7 @@ from typing import TYPE_CHECKING, Any from . import _errcodes from ._messages import MessageType -from ._protocol import IfxStreamReader, make_pdu_writer +from ._protocol import BufferedSocketReader, IfxStreamReader, make_pdu_writer from ._resultset import ( ColumnInfo, compile_column_readers, @@ -77,6 +78,25 @@ del _build_static_pdu _log = logging.getLogger(__name__) +# Phase 39: buffered socket reader is the default. The reader holds a +# connection-scoped bytearray + offset cursor on ``IfxSocket``, so one +# ``recv()`` per ~64 KB amortizes over hundreds of fields rather than +# the per-field-recv pattern the legacy ``_SocketReader`` uses. +# +# Set ``IFX_BUFFERED_READER=0`` to fall back to the legacy reader if +# you hit a wire shape we haven't covered. The legacy path is kept as +# the escape hatch (and as the A/B baseline if we ever need to bisect +# a regression). Read once at module import so the check isn't on the +# hot path. +_USE_BUFFERED_READER = os.environ.get("IFX_BUFFERED_READER", "1") != "0" + + +def _make_socket_reader(sock): + if _USE_BUFFERED_READER: + return BufferedSocketReader(sock) + return _SocketReader(sock) + + def _finalize_cursor( conn_ref: weakref.ReferenceType, state: list, @@ -770,7 +790,7 @@ class Cursor: writer.write_short(MessageType.SQ_EOT) self._conn._send_pdu(buf.getvalue()) - reader = _SocketReader(self._conn._sock) + reader = _make_socket_reader(self._conn._sock) chunks: list[bytes] = [] while True: tag = reader.read_short() @@ -1538,7 +1558,7 @@ class Cursor: def _read_describe_response(self) -> None: """Read DESCRIBE (+ optional SQ_INSERTDONE) + DONE + COST + EOT after PREPARE.""" - reader = _SocketReader(self._conn._sock) + reader = _make_socket_reader(self._conn._sock) while True: tag = reader.read_short() if tag == MessageType.SQ_EOT: @@ -1589,7 +1609,7 @@ class Cursor: def _read_fetch_response(self) -> None: """Read TUPLE* + DONE + COST + EOT after an NFETCH or SFETCH.""" - reader = _SocketReader(self._conn._sock) + reader = _make_socket_reader(self._conn._sock) while True: tag = reader.read_short() if tag == MessageType.SQ_EOT: @@ -1623,7 +1643,7 @@ class Cursor: def _drain_to_eot(self) -> None: """Read response stream until SQ_EOT, allowing common tags in between.""" - reader = _SocketReader(self._conn._sock) + reader = _make_socket_reader(self._conn._sock) while True: tag = reader.read_short() if tag == MessageType.SQ_EOT: