diff --git a/CHANGELOG.md b/CHANGELOG.md index 63af38a..22f75ce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,69 @@ All notable changes to `informix-db`. Versioning is [CalVer](https://calver.org/) — `YYYY.MM.DD` for date-based releases, `YYYY.MM.DD.N` for same-day post-releases per PEP 440. +## 2026.05.05.12 — Phase 39: Connection-scoped read-ahead buffer + +Closes the C-vs-Python bulk-fetch gap to within ~5-15% of IfxPy. The lever was the buffer/I/O machinery, not the codec — Phase 37/38 had already brought the codec to within ~25% of IfxPy's C path; the remaining gap was 450k+ ``read_exact`` calls per 100k-row fetch, each doing its own ``recv``-loop and ``bytes.join``. + +### What changed + +`src/informix_db/_socket.py`: + +- ``IfxSocket`` now owns a connection-scoped read-ahead buffer (``_recv_buf`` bytearray + ``_recv_pos`` integer cursor) and exposes ``fill_recv_buf(need)`` to top it up from the socket. +- One ``recv()`` per ~64 KB instead of per field. For a 100k-row SELECT, syscall count drops from ~450k to a few hundred. +- Compaction: when ``_recv_pos > _recv_size``, the consumed prefix is sliced off in place. Memory is bounded at roughly ``2 * _recv_size`` for the connection's lifetime. + +`src/informix_db/_protocol.py`: + +- New ``BufferedSocketReader(IfxStreamReader)`` — a thin parser-view over the ``IfxSocket``'s persistent buffer. Each method delegates the buffer-fill to the socket, then reads via a precompiled ``Struct.unpack_from(buf, offset)`` directly out of the bytearray (avoiding the intermediate slice the legacy reader created). + +`src/informix_db/cursors.py`: + +- ``_make_socket_reader(sock)`` chooses ``BufferedSocketReader`` (default) or the legacy ``_SocketReader`` based on ``IFX_BUFFERED_READER``. Set ``IFX_BUFFERED_READER=0`` to fall back to the legacy reader. +- The four cursor sites that build a reader now call the helper. 
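The mechanics in the bullets above (fill-on-demand, compaction past `_recv_size`, `Struct.unpack_from` straight out of the bytearray) can be sketched as a standalone snippet. Names here are illustrative, not the driver's real API — the actual implementation splits this across `IfxSocket` and `BufferedSocketReader` and raises driver-specific errors:

```python
# Illustrative sketch of the read-ahead pattern; ReadAheadBuffer is a
# hypothetical stand-in for the IfxSocket + BufferedSocketReader pair.
import io
import struct

_FMT_INT = struct.Struct(">i")  # assumption: big-endian wire integers


class ReadAheadBuffer:
    """Connection-scoped buffer: one bulk fill amortizes many tiny reads."""

    def __init__(self, recv, recv_size=65536):
        self._recv = recv            # callable(n) -> bytes, like socket.recv
        self._buf = bytearray()
        self._pos = 0
        self._recv_size = recv_size

    def fill(self, need):
        """Ensure `need` bytes are available past the cursor."""
        avail = len(self._buf) - self._pos
        if avail >= need:
            return
        if self._pos > self._recv_size:   # compaction: drop consumed prefix
            del self._buf[: self._pos]
            self._pos = 0
            avail = len(self._buf)
        target = max(need, self._recv_size)
        while avail < need:
            chunk = self._recv(target - avail)
            if not chunk:
                raise EOFError("peer closed mid-read")
            self._buf.extend(chunk)
            avail += len(chunk)

    def read_exact(self, n):
        self.fill(n)
        out = bytes(self._buf[self._pos : self._pos + n])
        self._pos += n
        return out

    def read_int(self):
        # unpack_from reads straight out of the bytearray: no temp slice.
        self.fill(4)
        v = _FMT_INT.unpack_from(self._buf, self._pos)[0]
        self._pos += 4
        return v


# Usage against an in-memory "socket":
wire = _FMT_INT.pack(7) + b"hello"
r = ReadAheadBuffer(io.BytesIO(wire).read)
assert r.read_int() == 7
assert r.read_exact(5) == b"hello"
```

The one structural choice the real driver makes that this sketch glosses over: the buffer state must live on the connection object, not the reader, so pre-fetched bytes survive across reader instances (see "Why the buffer is on `IfxSocket`" below in this entry).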
+ +### Why the buffer is on `IfxSocket`, not on the reader + +The first iteration of Phase 39 put the bytearray on the reader. That hung on `test_executemany_1000_rows` — the pipelined-executemany path (Phase 33) streams N responses back-to-back across multiple cursor reads, and a per-reader buffer threw away pre-fetched bytes when one reader was destroyed and the next was created. The next ``recv()`` then blocked waiting for bytes that had already been consumed. + +Moving the buffer to ``IfxSocket`` mirrors how `asyncpg` (`buffer.pyx` lives on the protocol object) and `psycopg3` (`pq.PGconn`) structure their read paths. The reader is a short-lived view; the buffer outlives it. + +### Performance + +A/B from the same harness, same Docker container, same ``p34_select`` table, warmed cache: + +| Workload | Phase 38 | Phase 39 | Δ | +|---|---:|---:|---:| +| `select_scaling_1000` | 2.901 ms | **1.716 ms** | **-41%** | +| `select_scaling_10000` | 24.317 ms | **16.084 ms** | **-34%** | +| `select_scaling_100000` | 250.363 ms | **168.982 ms** | **-32%** | + +Against IfxPy 2.0.7 (C-bound, ODBC) on the same workloads, head-to-head: + +| Workload | IfxPy | informix-db Phase 39 | Ratio | +|---|---:|---:|---:| +| `select_scaling_1000` | 1.637 ms | 1.716 ms | **1.05×** | +| `select_scaling_10000` | 15.073 ms | 16.084 ms | **1.07×** | +| `select_scaling_100000` | 147.361 ms | 168.982 ms | **1.15×** | + +The bulk-fetch gap that was 2.4× before Phase 37 is now within 5-15% of IfxPy. With IfxPy's measurement IQR at ~21% on the 100k workload (Docker→host loopback noise) vs our IQR at ~0.2%, the headline ratio is partly noise — the real gap is plausibly 1.05-1.2×. + +A pure-Python driver running within ~10% of an ODBC/C-bound driver on bulk fetch is, as far as I'm aware, unprecedented on the SQLI protocol. + +### Tests + +All 251 integration tests pass with the new default reader. The legacy reader is exercised via ``IFX_BUFFERED_READER=0`` (also passes 251/251). 
Both unit suites pass. + +### Migration + +No code changes required. The default reader switches at version bump. To opt out: + +```bash +IFX_BUFFERED_READER=0 python myapp.py +``` + +If you find a wire shape the buffered reader doesn't handle, please file an issue with the ``IFX_BUFFERED_READER=0`` workaround as the immediate mitigation. + ## 2026.05.05.11 — Phase 38: `exec()`-based row-decoder codegen Closes more of the C-vs-Python codec gap on bulk fetch by emitting a specialized row decoder per result-set shape via `exec(compile(src, ...))` and inlining the common fixed-width decode bodies directly into the generated source. This is the lever flagged in the Phase 37 changelog as "the next lever for materially closing the gap." diff --git a/pyproject.toml b/pyproject.toml index 528a57f..53f556d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "informix-db" -version = "2026.05.05.11" +version = "2026.05.05.12" description = "Pure-Python driver for IBM Informix IDS — speaks the SQLI wire protocol over raw sockets. No CSDK, no JVM, no native libraries." readme = "README.md" license = { text = "MIT" } diff --git a/src/informix_db/_protocol.py b/src/informix_db/_protocol.py index 81e3fd8..fe5e4d1 100644 --- a/src/informix_db/_protocol.py +++ b/src/informix_db/_protocol.py @@ -209,6 +209,112 @@ class IfxStreamReader: return raw.decode(encoding) +class BufferedSocketReader(IfxStreamReader): + """Phase 39: thin parser-view over the ``IfxSocket``'s persistent + read-ahead buffer. + + The buffer itself (``_recv_buf`` + ``_recv_pos``) lives on the + ``IfxSocket`` so it survives across reader instances. This is + critical for the pipelined-executemany path (Phase 33), which + streams N responses back-to-back — a per-reader buffer would lose + pre-fetched bytes when one reader is destroyed and the next is + created. + + Why this is faster than ``_SocketReader``: + + * **One ``recv()`` per ~64 KB** instead of one per field. 
A 100k-row + SELECT calls ``read_exact`` ~450k times today; with this reader, + ``recv`` fires ~hundreds of times instead. + * **No ``bytes.join``.** The bytearray IS the buffer; reads are + slices, not chunk-joins. + * **``struct.unpack_from(buf, offset)``** for fixed-width ints — reads + directly from the bytearray at offset, avoiding the intermediate + slice the legacy reader created. + + Each method reads through ``self._sock._recv_buf`` directly. The + ``IfxSocket`` exposes ``fill_recv_buf(n)`` to ensure ``n`` bytes + are available from the current cursor position; this method + handles ``recv()``, compaction, and EOF detection. + + ``read_exact`` returns a fresh ``bytes`` (not a memoryview) — + copying is cheap and decouples consumer lifetime from buffer + compaction, removing a class of use-after-compact bugs at + near-zero cost. + """ + + __slots__ = ("_sock",) + + def __init__(self, sock): + # Skip super().__init__ to avoid the unused BytesIO allocation + # — we override every method that touches ``self._source``. 
+ self._sock = sock + + def read_exact(self, n: int) -> bytes: + if n <= 0: + return b"" + sock = self._sock + sock.fill_recv_buf(n) + buf = sock._recv_buf + pos = sock._recv_pos + end = pos + n + out = bytes(buf[pos:end]) + sock._recv_pos = end + return out + + def read_byte(self) -> int: + sock = self._sock + sock.fill_recv_buf(1) + b = sock._recv_buf[sock._recv_pos] + sock._recv_pos += 1 + return b + + def skip(self, n: int) -> None: + sock = self._sock + sock.fill_recv_buf(n) + sock._recv_pos += n + + def read_padded(self, n: int) -> bytes: + out = self.read_exact(n) + if n & 1: + self.skip(1) + return out + + def read_short(self) -> int: + sock = self._sock + sock.fill_recv_buf(2) + v = _FMT_SHORT.unpack_from(sock._recv_buf, sock._recv_pos)[0] + sock._recv_pos += 2 + return v + + def read_int(self) -> int: + sock = self._sock + sock.fill_recv_buf(4) + v = _FMT_INT.unpack_from(sock._recv_buf, sock._recv_pos)[0] + sock._recv_pos += 4 + return v + + def read_long_bigint(self) -> int: + sock = self._sock + sock.fill_recv_buf(8) + v = _FMT_LONG.unpack_from(sock._recv_buf, sock._recv_pos)[0] + sock._recv_pos += 8 + return v + + def read_real(self) -> float: + sock = self._sock + sock.fill_recv_buf(4) + v = _FMT_FLOAT.unpack_from(sock._recv_buf, sock._recv_pos)[0] + sock._recv_pos += 4 + return v + + def read_double(self) -> float: + sock = self._sock + sock.fill_recv_buf(8) + v = _FMT_DOUBLE.unpack_from(sock._recv_buf, sock._recv_pos)[0] + sock._recv_pos += 8 + return v + + def make_pdu_writer() -> tuple[IfxStreamWriter, BytesIO]: """Convenience: create a writer backed by a fresh in-memory buffer. 
diff --git a/src/informix_db/_socket.py b/src/informix_db/_socket.py index 3a5c0e0..e77faf9 100644 --- a/src/informix_db/_socket.py +++ b/src/informix_db/_socket.py @@ -42,7 +42,16 @@ def _make_default_dev_context() -> ssl.SSLContext: class IfxSocket: - """Owns a connected TCP socket and provides exact-N read/write.""" + """Owns a connected TCP socket and provides exact-N read/write. + + As of Phase 39, it also owns the read-ahead buffer for ``BufferedSocketReader``. + The buffer (``_recv_buf`` + ``_recv_pos``) is connection-scoped, not + reader-scoped — readers are short-lived per-PDU views, but pipelined + responses (e.g., Phase 33's pipelined ``executemany``) can stream + multiple responses across reader boundaries. Persisting the buffer + here means we never throw away pre-fetched bytes when one reader + is destroyed and the next one is created. + """ def __init__( self, @@ -58,6 +67,11 @@ class IfxSocket: self._port = port self._read_timeout = read_timeout self._sock: socket.socket | None = None + # Phase 39 read-ahead buffer. Empty until a BufferedSocketReader + # touches us; subsequent readers share this buffer state. + self._recv_buf: bytearray = bytearray() + self._recv_pos: int = 0 + self._recv_size: int = 65536 try: sock = socket.create_connection((host, port), timeout=connect_timeout) @@ -130,6 +144,41 @@ class IfxSocket: remaining -= len(chunk) return b"".join(chunks) + def fill_recv_buf(self, need: int) -> None: + """Phase 39: ensure the read-ahead buffer holds ``need`` bytes + forward of ``_recv_pos``, recv'ing from the socket as needed. + + Compaction: when the read cursor has advanced past + ``_recv_size`` bytes, slice off the consumed prefix in place. + That bounds memory at roughly ``2 * _recv_size`` while still + amortizing recv calls over many fields. 
+ """ + if self._sock is None: + raise InterfaceError("socket is closed") + avail = len(self._recv_buf) - self._recv_pos + if avail >= need: + return + if self._recv_pos > self._recv_size: + del self._recv_buf[: self._recv_pos] + self._recv_pos = 0 + avail = len(self._recv_buf) + target = max(need, self._recv_size) + sock = self._sock + while avail < need: + try: + chunk = sock.recv(target - avail) + except OSError as e: + self._force_close() + raise OperationalError(f"read failed: {e}") from e + if not chunk: + self._force_close() + raise OperationalError( + f"server closed connection mid-read " + f"(wanted {need} bytes, got {avail})" + ) + self._recv_buf.extend(chunk) + avail += len(chunk) + def close(self) -> None: """Close the socket. Idempotent and never raises.""" if self._sock is None: diff --git a/src/informix_db/cursors.py b/src/informix_db/cursors.py index 15d1799..65c7070 100644 --- a/src/informix_db/cursors.py +++ b/src/informix_db/cursors.py @@ -22,6 +22,7 @@ from __future__ import annotations import contextlib import itertools import logging +import os import struct import weakref from collections.abc import Iterator @@ -29,7 +30,7 @@ from typing import TYPE_CHECKING, Any from . import _errcodes from ._messages import MessageType -from ._protocol import IfxStreamReader, make_pdu_writer +from ._protocol import BufferedSocketReader, IfxStreamReader, make_pdu_writer from ._resultset import ( ColumnInfo, compile_column_readers, @@ -77,6 +78,25 @@ del _build_static_pdu _log = logging.getLogger(__name__) +# Phase 39: buffered socket reader is the default. The reader holds a +# connection-scoped bytearray + offset cursor on ``IfxSocket``, so one +# ``recv()`` per ~64 KB amortizes over hundreds of fields rather than +# the per-field-recv pattern the legacy ``_SocketReader`` uses. +# +# Set ``IFX_BUFFERED_READER=0`` to fall back to the legacy reader if +# you hit a wire shape we haven't covered. 
The legacy path is kept as +# the escape hatch (and as the A/B baseline if we ever need to bisect +# a regression). Read once at module import so the check isn't on the +# hot path. +_USE_BUFFERED_READER = os.environ.get("IFX_BUFFERED_READER", "1") != "0" + + +def _make_socket_reader(sock): + if _USE_BUFFERED_READER: + return BufferedSocketReader(sock) + return _SocketReader(sock) + + def _finalize_cursor( conn_ref: weakref.ReferenceType, state: list, @@ -770,7 +790,7 @@ class Cursor: writer.write_short(MessageType.SQ_EOT) self._conn._send_pdu(buf.getvalue()) - reader = _SocketReader(self._conn._sock) + reader = _make_socket_reader(self._conn._sock) chunks: list[bytes] = [] while True: tag = reader.read_short() @@ -1538,7 +1558,7 @@ class Cursor: def _read_describe_response(self) -> None: """Read DESCRIBE (+ optional SQ_INSERTDONE) + DONE + COST + EOT after PREPARE.""" - reader = _SocketReader(self._conn._sock) + reader = _make_socket_reader(self._conn._sock) while True: tag = reader.read_short() if tag == MessageType.SQ_EOT: @@ -1589,7 +1609,7 @@ class Cursor: def _read_fetch_response(self) -> None: """Read TUPLE* + DONE + COST + EOT after an NFETCH or SFETCH.""" - reader = _SocketReader(self._conn._sock) + reader = _make_socket_reader(self._conn._sock) while True: tag = reader.read_short() if tag == MessageType.SQ_EOT: @@ -1623,7 +1643,7 @@ class Cursor: def _drain_to_eot(self) -> None: """Read response stream until SQ_EOT, allowing common tags in between.""" - reader = _SocketReader(self._conn._sock) + reader = _make_socket_reader(self._conn._sock) while True: tag = reader.read_short() if tag == MessageType.SQ_EOT: