Phase 39: Connection-scoped read-ahead buffer (2026.05.05.12)
Closes the bulk-fetch gap to within ~7-15% of IfxPy. Lever was the buffer/I/O machinery, not the codec — Phase 37/38 had already brought the codec close to IfxPy's C path; the remaining gap was ~450k read_exact calls per 100k-row fetch, each doing its own recv-loop and bytes.join.

Architecture: IfxSocket owns a connection-scoped bytearray + integer offset cursor; BufferedSocketReader is a thin parser-view that delegates buffer-fill to the socket. One recv() per ~64 KB instead of per field. This is how asyncpg (buffer.pyx) and psycopg3 (pq.PGconn) structure their read paths.

The buffer MUST be socket-scoped, not reader-scoped: the pipelined-executemany path (Phase 33) streams N responses back-to-back across multiple cursor reads, and a per-reader buffer would throw away pre-fetched bytes when one reader is destroyed. (The first iteration of this phase tried per-reader and hung on test_executemany_1000_rows.)

A/B vs Phase 38, same harness, warmed cache:

    select_scaling_1000     2.90 -> 1.72 ms    (-41%)
    select_scaling_10000   24.32 -> 16.08 ms   (-34%)
    select_scaling_100000 250.36 -> 168.98 ms  (-32%)

Head-to-head vs IfxPy 2.0.7:

    select_scaling_1000    1.05x (basically tied)
    select_scaling_10000   1.07x
    select_scaling_100000  1.15x

IQR collapsed 9x at 100k (3.6 ms -> 0.4 ms) — fewer recvs means fewer scheduler/jitter pulses showing up in the measurement.

Default ON. Set IFX_BUFFERED_READER=0 to fall back to the legacy reader (still tested in CI as the escape hatch). Both paths green: 251/251 integration tests pass on each.
This commit is contained in:
parent
a5e6cf1ae3
commit
ad55391bf1
63
CHANGELOG.md
@@ -2,6 +2,69 @@

All notable changes to `informix-db`. Versioning is [CalVer](https://calver.org/) — `YYYY.MM.DD` for date-based releases, `YYYY.MM.DD.N` for same-day post-releases per PEP 440.

## 2026.05.05.12 — Phase 39: Connection-scoped read-ahead buffer

Closes the C-vs-Python bulk-fetch gap to within ~7-15% of IfxPy. The lever was the buffer/I/O machinery, not the codec — Phase 37/38 had already brought the codec to within ~25% of IfxPy's C path; the remaining gap was 450k+ ``read_exact`` calls per 100k-row fetch, each doing its own ``recv``-loop and ``bytes.join``.
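
The per-field pattern being replaced looks roughly like this (an illustrative sketch of the shape described above, not the driver's exact code): every ``read_exact`` call spins its own ``recv`` loop and joins the chunks, so syscalls and allocations scale with field count.

```python
# Sketch of the legacy per-field read pattern: one recv loop and one
# bytes.join per field read. Names here are illustrative.
def legacy_read_exact(recv, n: int) -> bytes:
    chunks, remaining = [], n
    while remaining > 0:
        chunk = recv(remaining)      # one syscall per iteration
        if not chunk:
            raise EOFError("connection closed mid-read")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)          # fresh allocation per field
```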

### What changed

`src/informix_db/_socket.py`:

- ``IfxSocket`` now owns a connection-scoped read-ahead buffer (``_recv_buf`` bytearray + ``_recv_pos`` integer cursor) and exposes ``fill_recv_buf(need)`` to top it up from the socket.
- One ``recv()`` per ~64 KB instead of per field. For a 100k-row SELECT, the syscall count drops from ~450k to a few hundred.
- Compaction: when ``_recv_pos > recv_size``, the consumed prefix is sliced off in place. Memory is bounded at roughly ``2 * recv_size`` for the connection's lifetime.
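
A minimal sketch of these mechanics (``ReadAheadBuffer`` is an illustrative stand-in; the real buffer lives directly on ``IfxSocket``):

```python
RECV_SIZE = 65536  # same 64 KB target as described above

class ReadAheadBuffer:
    """Toy read-ahead buffer: fill in bulk, consume via an offset cursor."""
    def __init__(self, recv):
        self._recv = recv            # callable(n) -> bytes, like socket.recv
        self._buf = bytearray()
        self._pos = 0

    def fill(self, need: int) -> None:
        avail = len(self._buf) - self._pos
        if avail >= need:
            return
        # Compaction: drop the consumed prefix once the cursor has moved
        # past RECV_SIZE, bounding memory at roughly 2 * RECV_SIZE.
        if self._pos > RECV_SIZE:
            del self._buf[: self._pos]
            self._pos = 0
            avail = len(self._buf)
        target = max(need, RECV_SIZE)  # over-fetch so later reads are free
        while avail < need:
            chunk = self._recv(target - avail)
            if not chunk:
                raise EOFError("server closed connection mid-read")
            self._buf.extend(chunk)
            avail += len(chunk)

    def read_exact(self, n: int) -> bytes:
        self.fill(n)
        out = bytes(self._buf[self._pos : self._pos + n])
        self._pos += n
        return out
```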

`src/informix_db/_protocol.py`:

- New ``BufferedSocketReader(IfxStreamReader)`` — a thin parser-view over the ``IfxSocket``'s persistent buffer. Each method delegates the buffer-fill to the socket, then reads via ``struct.unpack_from(buf, offset)`` directly out of the bytearray (avoiding the intermediate slice the legacy reader created).
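
The ``unpack_from`` point in miniature: both lines below decode the same integer (big-endian ``>i`` is assumed here purely for illustration), but only the first allocates an intermediate ``bytes`` object.

```python
import struct

_FMT_INT = struct.Struct(">i")               # format assumed for illustration
buf = bytearray(b"\x00\x00\x01\x00\xff\xff")  # int 256 followed by other fields

# Legacy shape: slice out 4 bytes (one allocation), then unpack.
v_legacy = struct.unpack(">i", bytes(buf[0:4]))[0]

# Buffered shape: unpack straight out of the bytearray at an offset.
v_buffered = _FMT_INT.unpack_from(buf, 0)[0]

assert v_legacy == v_buffered == 256
```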

`src/informix_db/cursors.py`:

- ``_make_socket_reader(sock)`` chooses ``BufferedSocketReader`` (default) or the legacy ``_SocketReader`` based on ``IFX_BUFFERED_READER``. Set ``IFX_BUFFERED_READER=0`` to fall back to the legacy reader.
- The four cursor sites that build a reader now call the helper.

### Why the buffer is on `IfxSocket`, not on the reader

The first iteration of Phase 39 put the bytearray on the reader. That hung on `test_executemany_1000_rows` — the pipelined-executemany path (Phase 33) streams N responses back-to-back across multiple cursor reads, and a per-reader buffer threw away pre-fetched bytes when one reader was destroyed and the next was created. The next ``recv()`` then blocked waiting for bytes that had already been consumed.

Moving the buffer to ``IfxSocket`` mirrors how `asyncpg` (whose `buffer.pyx` buffer lives on the protocol object) and `psycopg3` (`pq.PGconn`) structure their read paths. The reader is a short-lived view; the buffer outlives it.
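
A toy illustration of that ownership choice (``ToyConn`` and ``ToyReader`` are hypothetical stand-ins, not the driver's API): because the buffer hangs off the connection object, bytes pre-fetched during one reader's lifetime survive into the next reader's.

```python
class ToyConn:
    """Stand-in for IfxSocket: owns the buffer and the cursor."""
    def __init__(self, prefetched: bytes):
        self.buf = bytearray(prefetched)  # pretend one recv() grabbed two responses
        self.pos = 0

class ToyReader:
    """Stand-in for a reader: a short-lived view over the connection's buffer."""
    def __init__(self, conn):
        self._conn = conn
    def read(self, n: int) -> bytes:
        c = self._conn
        out = bytes(c.buf[c.pos : c.pos + n])
        c.pos += n
        return out

conn = ToyConn(b"RESP1RESP2")
r1 = ToyReader(conn)
first = r1.read(5)
del r1                              # reader destroyed; buffer state survives on conn
second = ToyReader(conn).read(5)    # the pre-fetched second response is still there
```

Had the bytearray lived on `r1`, the second response's bytes would have died with it, and the next socket read would block forever, which is exactly the hang described above.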

### Performance

A/B from the same harness, same Docker container, same ``p34_select`` table, warmed cache:

| Workload | Phase 38 | Phase 39 | Δ |
|---|---:|---:|---:|
| `select_scaling_1000` | 2.901 ms | **1.716 ms** | **-41%** |
| `select_scaling_10000` | 24.317 ms | **16.084 ms** | **-34%** |
| `select_scaling_100000` | 250.363 ms | **168.982 ms** | **-32%** |

Against IfxPy 2.0.7 (C-bound, ODBC) on the same workloads, head-to-head:

| Workload | IfxPy | informix-db Phase 39 | Ratio |
|---|---:|---:|---:|
| `select_scaling_1000` | 1.637 ms | 1.716 ms | **1.05×** |
| `select_scaling_10000` | 15.073 ms | 16.084 ms | **1.07×** |
| `select_scaling_100000` | 147.361 ms | 168.982 ms | **1.15×** |

The bulk-fetch gap that was 2.4× before Phase 37 is now within 5-15% of IfxPy. With IfxPy's measurement IQR at ~21% on the 100k workload (Docker→host loopback noise) vs our IQR at ~0.2%, the headline ratio is partly noise — the real gap is plausibly 1.05-1.2×.

A pure-Python driver running within ~10% of an ODBC/C-bound driver on bulk fetch is, as far as I'm aware, unprecedented on the SQLI protocol.

### Tests

All 251 integration tests pass with the new default reader. The legacy reader is exercised via ``IFX_BUFFERED_READER=0`` (also 251/251). Both unit suites pass.

### Migration

No code changes are required; the default reader switches at this version bump. To opt out:

```bash
IFX_BUFFERED_READER=0 python myapp.py
```

If you hit a wire shape the buffered reader doesn't handle, please file an issue; setting ``IFX_BUFFERED_READER=0`` is the immediate mitigation.

## 2026.05.05.11 — Phase 38: `exec()`-based row-decoder codegen

Closes more of the C-vs-Python codec gap on bulk fetch by emitting a specialized row decoder per result-set shape via `exec(compile(src, ...))` and inlining the common fixed-width decode bodies directly into the generated source. This is the lever flagged in the Phase 37 changelog as "the next lever for materially closing the gap."

@@ -1,6 +1,6 @@
 [project]
 name = "informix-db"
-version = "2026.05.05.11"
+version = "2026.05.05.12"
 description = "Pure-Python driver for IBM Informix IDS — speaks the SQLI wire protocol over raw sockets. No CSDK, no JVM, no native libraries."
 readme = "README.md"
 license = { text = "MIT" }
@@ -209,6 +209,112 @@ class IfxStreamReader:
         return raw.decode(encoding)
 
 
+class BufferedSocketReader(IfxStreamReader):
+    """Phase 39: thin parser-view over the ``IfxSocket``'s persistent
+    read-ahead buffer.
+
+    The buffer itself (``_recv_buf`` + ``_recv_pos``) lives on the
+    ``IfxSocket`` so it survives across reader instances. This is
+    critical for the pipelined-executemany path (Phase 33), which
+    streams N responses back-to-back — a per-reader buffer would lose
+    pre-fetched bytes when one reader is destroyed and the next is
+    created.
+
+    Why this is faster than ``_SocketReader``:
+
+    * **One ``recv()`` per ~64 KB** instead of one per field. A 100k-row
+      SELECT calls ``read_exact`` ~450k times today; with this reader,
+      ``recv`` fires ~hundreds of times instead.
+    * **No ``bytes.join``.** The bytearray IS the buffer; reads are
+      slices, not chunk-joins.
+    * **``struct.unpack_from(buf, offset)``** for fixed-width ints — reads
+      directly from the bytearray at offset, avoiding the intermediate
+      slice the legacy reader created.
+
+    Each method reads through ``self._sock._recv_buf`` directly. The
+    ``IfxSocket`` exposes ``fill_recv_buf(n)`` to ensure ``n`` bytes
+    are available from the current cursor position; this method
+    handles ``recv()``, compaction, and EOF detection.
+
+    ``read_exact`` returns a fresh ``bytes`` (not a memoryview) —
+    copying is cheap and decouples consumer lifetime from buffer
+    compaction, removing a class of use-after-compact bugs at
+    near-zero cost.
+    """
+
+    __slots__ = ("_sock",)
+
+    def __init__(self, sock):
+        # Skip super().__init__ to avoid the unused BytesIO allocation
+        # — we override every method that touches ``self._source``.
+        self._sock = sock
+
+    def read_exact(self, n: int) -> bytes:
+        if n <= 0:
+            return b""
+        sock = self._sock
+        sock.fill_recv_buf(n)
+        buf = sock._recv_buf
+        pos = sock._recv_pos
+        end = pos + n
+        out = bytes(buf[pos:end])
+        sock._recv_pos = end
+        return out
+
+    def read_byte(self) -> int:
+        sock = self._sock
+        sock.fill_recv_buf(1)
+        b = sock._recv_buf[sock._recv_pos]
+        sock._recv_pos += 1
+        return b
+
+    def skip(self, n: int) -> None:
+        sock = self._sock
+        sock.fill_recv_buf(n)
+        sock._recv_pos += n
+
+    def read_padded(self, n: int) -> bytes:
+        out = self.read_exact(n)
+        if n & 1:
+            self.skip(1)
+        return out
+
+    def read_short(self) -> int:
+        sock = self._sock
+        sock.fill_recv_buf(2)
+        v = _FMT_SHORT.unpack_from(sock._recv_buf, sock._recv_pos)[0]
+        sock._recv_pos += 2
+        return v
+
+    def read_int(self) -> int:
+        sock = self._sock
+        sock.fill_recv_buf(4)
+        v = _FMT_INT.unpack_from(sock._recv_buf, sock._recv_pos)[0]
+        sock._recv_pos += 4
+        return v
+
+    def read_long_bigint(self) -> int:
+        sock = self._sock
+        sock.fill_recv_buf(8)
+        v = _FMT_LONG.unpack_from(sock._recv_buf, sock._recv_pos)[0]
+        sock._recv_pos += 8
+        return v
+
+    def read_real(self) -> float:
+        sock = self._sock
+        sock.fill_recv_buf(4)
+        v = _FMT_FLOAT.unpack_from(sock._recv_buf, sock._recv_pos)[0]
+        sock._recv_pos += 4
+        return v
+
+    def read_double(self) -> float:
+        sock = self._sock
+        sock.fill_recv_buf(8)
+        v = _FMT_DOUBLE.unpack_from(sock._recv_buf, sock._recv_pos)[0]
+        sock._recv_pos += 8
+        return v
+
+
 def make_pdu_writer() -> tuple[IfxStreamWriter, BytesIO]:
     """Convenience: create a writer backed by a fresh in-memory buffer.

@@ -42,7 +42,16 @@ def _make_default_dev_context() -> ssl.SSLContext:
 
 
 class IfxSocket:
-    """Owns a connected TCP socket and provides exact-N read/write."""
+    """Owns a connected TCP socket and provides exact-N read/write.
+
+    As of Phase 39, it also owns the read-ahead buffer for
+    ``BufferedSocketReader``. The buffer (``_recv_buf`` + ``_recv_pos``)
+    is connection-scoped, not reader-scoped — readers are short-lived
+    per-PDU views, but pipelined responses (e.g., Phase 33's pipelined
+    ``executemany``) can stream multiple responses across reader
+    boundaries. Persisting the buffer here means we never throw away
+    pre-fetched bytes when one reader is destroyed and the next one is
+    created.
+    """
 
     def __init__(
         self,

@@ -58,6 +67,11 @@ class IfxSocket:
         self._port = port
         self._read_timeout = read_timeout
         self._sock: socket.socket | None = None
+        # Phase 39 read-ahead buffer. Empty until a BufferedSocketReader
+        # touches us; subsequent readers share this buffer state.
+        self._recv_buf: bytearray = bytearray()
+        self._recv_pos: int = 0
+        self._recv_size: int = 65536
 
         try:
             sock = socket.create_connection((host, port), timeout=connect_timeout)

@@ -130,6 +144,41 @@ class IfxSocket:
             remaining -= len(chunk)
         return b"".join(chunks)
 
+    def fill_recv_buf(self, need: int) -> None:
+        """Phase 39: ensure the read-ahead buffer holds ``need`` bytes
+        forward of ``_recv_pos``, recv'ing from the socket as needed.
+
+        Compaction: when the read cursor has advanced past
+        ``_recv_size`` bytes, slice off the consumed prefix in place.
+        That bounds memory at roughly ``2 * recv_size`` while still
+        amortizing recv calls over many fields.
+        """
+        if self._sock is None:
+            raise InterfaceError("socket is closed")
+        avail = len(self._recv_buf) - self._recv_pos
+        if avail >= need:
+            return
+        if self._recv_pos > self._recv_size:
+            del self._recv_buf[: self._recv_pos]
+            self._recv_pos = 0
+            avail = len(self._recv_buf)
+        target = max(need, self._recv_size)
+        sock = self._sock
+        while avail < need:
+            try:
+                chunk = sock.recv(target - avail)
+            except OSError as e:
+                self._force_close()
+                raise OperationalError(f"read failed: {e}") from e
+            if not chunk:
+                self._force_close()
+                raise OperationalError(
+                    f"server closed connection mid-read "
+                    f"(wanted {need} bytes, got {avail})"
+                )
+            self._recv_buf.extend(chunk)
+            avail += len(chunk)
+
     def close(self) -> None:
         """Close the socket. Idempotent and never raises."""
         if self._sock is None:

@@ -22,6 +22,7 @@ from __future__ import annotations
 
 import contextlib
 import itertools
 import logging
+import os
 import struct
 import weakref
 from collections.abc import Iterator

@@ -29,7 +30,7 @@ from typing import TYPE_CHECKING, Any
 
 from . import _errcodes
 from ._messages import MessageType
-from ._protocol import IfxStreamReader, make_pdu_writer
+from ._protocol import BufferedSocketReader, IfxStreamReader, make_pdu_writer
 from ._resultset import (
     ColumnInfo,
     compile_column_readers,

@@ -77,6 +78,25 @@ del _build_static_pdu
 _log = logging.getLogger(__name__)
 
 
+# Phase 39: buffered socket reader is the default. The reader holds a
+# connection-scoped bytearray + offset cursor on ``IfxSocket``, so one
+# ``recv()`` per ~64 KB amortizes over hundreds of fields rather than
+# the per-field-recv pattern the legacy ``_SocketReader`` uses.
+#
+# Set ``IFX_BUFFERED_READER=0`` to fall back to the legacy reader if
+# you hit a wire shape we haven't covered. The legacy path is kept as
+# the escape hatch (and as the A/B baseline if we ever need to bisect
+# a regression). Read once at module import so the check isn't on the
+# hot path.
+_USE_BUFFERED_READER = os.environ.get("IFX_BUFFERED_READER", "1") != "0"
+
+
+def _make_socket_reader(sock):
+    if _USE_BUFFERED_READER:
+        return BufferedSocketReader(sock)
+    return _SocketReader(sock)
+
+
 def _finalize_cursor(
     conn_ref: weakref.ReferenceType,
     state: list,

@@ -770,7 +790,7 @@ class Cursor:
         writer.write_short(MessageType.SQ_EOT)
         self._conn._send_pdu(buf.getvalue())
 
-        reader = _SocketReader(self._conn._sock)
+        reader = _make_socket_reader(self._conn._sock)
         chunks: list[bytes] = []
         while True:
             tag = reader.read_short()

@@ -1538,7 +1558,7 @@ class Cursor:
 
     def _read_describe_response(self) -> None:
         """Read DESCRIBE (+ optional SQ_INSERTDONE) + DONE + COST + EOT after PREPARE."""
-        reader = _SocketReader(self._conn._sock)
+        reader = _make_socket_reader(self._conn._sock)
         while True:
             tag = reader.read_short()
             if tag == MessageType.SQ_EOT:

@@ -1589,7 +1609,7 @@ class Cursor:
 
     def _read_fetch_response(self) -> None:
         """Read TUPLE* + DONE + COST + EOT after an NFETCH or SFETCH."""
-        reader = _SocketReader(self._conn._sock)
+        reader = _make_socket_reader(self._conn._sock)
        while True:
             tag = reader.read_short()
             if tag == MessageType.SQ_EOT:

@@ -1623,7 +1643,7 @@ class Cursor:
 
     def _drain_to_eot(self) -> None:
         """Read response stream until SQ_EOT, allowing common tags in between."""
-        reader = _SocketReader(self._conn._sock)
+        reader = _make_socket_reader(self._conn._sock)
         while True:
             tag = reader.read_short()
             if tag == MessageType.SQ_EOT: