informix-db/tests/test_executemany_pipeline.py
Ryan Malloy 362ecb3d63 Phase 33: Pipelined executemany - 2.85x faster bulk insert (2026.05.05.6)
The serial-loop executemany paid one wire round-trip per row (~30us/
row on loopback). It was the one benchmark where IfxPy beat us in
the comparison work: we were 10% slower at executemany(1000) in a txn.

Phase 33 pipelines the BIND+EXECUTE PDUs: build all N PDUs, send
them back-to-back, then drain all N responses. Eliminates per-row
RTT entirely.
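
The shape of the change can be sketched with a toy transport (all
names below — FakeTransport, serial_executemany, pipelined_executemany —
are illustrative, not the driver's real internals):

```python
class FakeTransport:
    """Loopback stand-in for the wire: send() queues a PDU, recv()
    returns one queued response. Real code would block on a socket,
    paying one RTT per recv()."""

    def __init__(self) -> None:
        self._queue: list[bytes] = []

    def send(self, pdu: bytes) -> None:
        self._queue.append(pdu)

    def recv(self) -> bytes:
        return b"OK:" + self._queue.pop(0)


def serial_executemany(transport, pdus):
    """Pre-Phase-33 shape: one send, one blocking recv per row,
    so the per-row RTT is paid N times."""
    responses = []
    for pdu in pdus:
        transport.send(pdu)
        responses.append(transport.recv())  # blocks one RTT per row
    return responses


def pipelined_executemany(transport, pdus):
    """Phase-33 shape: burst all N PDUs out back-to-back, then
    drain all N responses. The per-row RTT collapses into one
    pipeline fill and one drain."""
    for pdu in pdus:
        transport.send(pdu)                 # queue everything first
    return [transport.recv() for _ in pdus]  # then drain all N
```

Both paths return the same responses; only the interleaving of sends
and receives — and therefore the round-trip count — differs.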

Performance impact:
* executemany(1000) in txn:   31.3 ms -> 11.0 ms (2.85x faster)
* executemany(100) autocommit: 173 ms -> 154 ms (11% faster)
* executemany(1000) autocommit: 1740 ms -> 1590 ms (9% faster)

(Autocommit gets smaller wins because server-side log flushes
dominate - Phase 21.1's "autocommit cliff".)

The IfxPy comparison flipped: from 10% slower to 2.05x faster on bulk
inserts. We now win all 5 head-to-head benchmarks against the C-bound
driver.

The Margaret Hamilton review surfaced one CRITICAL concern (C1): the
pipeline assumes Informix sends N responses for N pipelined PDUs
even when one fails. If the server cut the stream short, the drain
loop would deadlock on the next read.
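
The drain-side invariant can be sketched like this (a hedged sketch
with made-up names; the real driver reads PDUs under `read_timeout`):
read exactly N responses, remember the first error, and raise only
after the wire is fully drained, with the row index prepended.

```python
def drain_pipeline(recv_response, n_rows):
    """Drain exactly n_rows responses. Remember the FIRST failure but
    keep reading, so the wire stays aligned (the C1 assumption: the
    server sends one response per pipelined PDU even after an error).
    recv_response() returns None for success or an error message."""
    first_error = None
    for i in range(n_rows):
        resp = recv_response()  # real code reads under read_timeout,
                                # which is the tripwire if the server
                                # cut the response stream short
        if resp is not None and first_error is None:
            first_error = (i, resp)
    if first_error is not None:
        row, err = first_error
        # M1-style: prepend the row index rather than reformat the
        # server's message.
        raise RuntimeError(f"row {row}: {err}")
```

If the server ever sent fewer than N responses, the loop would block
in `recv_response()` — which is exactly what the three integration
tests below rule out.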

Verified by 3 new integration tests in tests/test_executemany_pipeline.py:
* test_pipelined_executemany_mid_batch_constraint_violation (row 500/1000)
* test_pipelined_executemany_first_row_fails (row 0/100)
* test_pipelined_executemany_last_row_fails (row 99/100)

All three confirm Informix sends N responses, the wire stays aligned,
and the connection is usable afterward.

Plus 4 lower-priority fixes Hamilton recommended:
* H1: documented _raise_sq_err self-drains-SQ_EOT invariant + tripwire
* H2: docstring warning about O(N) lock duration; chunk for huge batches
* M1: prepend row-index to exception message rather than reformat
* M2: documented sendall-no-timeout caveat on hostile networks
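
The H2 chunking advice could be applied caller-side roughly like this
(a sketch; `chunked_executemany`, `FakeCursor`, and the 10k default
are made up for illustration, not part of the driver's API):

```python
class FakeCursor:
    """Stand-in that records batch sizes; a real DB-API cursor
    would go here."""

    def __init__(self) -> None:
        self.batches: list[int] = []

    def executemany(self, sql: str, rows: list) -> None:
        self.batches.append(len(rows))


def chunked_executemany(cursor, sql, rows, chunk_size=10_000):
    """Pipeline a huge batch in fixed-size chunks so the connection
    lock (held for the whole O(N) pipeline) is bounded per call."""
    for start in range(0, len(rows), chunk_size):
        cursor.executemany(sql, rows[start:start + chunk_size])
```

The trade-off is a few extra drains per batch in exchange for a
bounded lock-hold window on shared connections.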

77 unit + 239 integration + 33 benchmark = 349 tests; ruff clean.

Note: Phase 32 (Tier 1+2 benchmarks) was tagged without bumping
pyproject.toml's version string. .5 was git-tag-only; .6 is the next
published version increment.
2026-05-05 12:26:15 -06:00


"""Phase 33 integration tests — pipelined ``executemany`` correctness.
The pipelined executemany sends all N BIND+EXECUTE PDUs to the wire
before draining any response. Hamilton's review of Phase 33 flagged
C1: this assumes the server sends *exactly* N responses for N
pipelined PDUs even when one row fails. If the server cuts the
response stream short on first error, the drain loop would block
reading bytes that never arrive — the connection would deadlock on
the next read.
These tests verify the wire-alignment assumption holds:
1. Constraint violation at row 500 of 1000 — happy-failure case.
2. Wire-alignment recovery — connection is still usable after the
error (proving the RELEASE drain succeeded and we read all the
remaining error responses).
3. Subsequent operations on the same connection work — proves no
stray bytes on the wire.
"""

from __future__ import annotations

import contextlib
from collections.abc import Iterator

import pytest

import informix_db
from tests.conftest import ConnParams

pytestmark = pytest.mark.integration


@pytest.fixture
def constraint_table(logged_db_params: ConnParams) -> Iterator[str]:
    """Table with a UNIQUE constraint on ``id`` so we can force a
    constraint violation at a known row.
    """
    table = "p33_constraint"
    conn = informix_db.connect(
        host=logged_db_params.host,
        port=logged_db_params.port,
        user=logged_db_params.user,
        password=logged_db_params.password,
        database=logged_db_params.database,
        server=logged_db_params.server,
        autocommit=True,
    )
    cur = conn.cursor()
    with contextlib.suppress(Exception):
        cur.execute(f"DROP TABLE {table}")
    cur.execute(
        f"CREATE TABLE {table} (id INT NOT NULL PRIMARY KEY, name VARCHAR(64))"
    )
    conn.close()
    try:
        yield table
    finally:
        conn = informix_db.connect(
            host=logged_db_params.host,
            port=logged_db_params.port,
            user=logged_db_params.user,
            password=logged_db_params.password,
            database=logged_db_params.database,
            server=logged_db_params.server,
            autocommit=True,
        )
        cur = conn.cursor()
        with contextlib.suppress(Exception):
            cur.execute(f"DROP TABLE {table}")
        conn.close()


def test_pipelined_executemany_mid_batch_constraint_violation(
    logged_db_params: ConnParams, constraint_table: str
) -> None:
    """C1 (Hamilton): force a constraint violation at row 500 of 1000;
    verify the pipeline drains cleanly and the connection is usable
    afterward.

    This is the test that validates Phase 33's wire-alignment
    assumption. If Informix sends fewer than 1000 responses for 1000
    pipelined PDUs after the row-500 failure, this test will hang on
    the drain loop's read (eventually timing out, but the test will
    fail loudly either way).
    """
    conn = informix_db.connect(
        host=logged_db_params.host,
        port=logged_db_params.port,
        user=logged_db_params.user,
        password=logged_db_params.password,
        database=logged_db_params.database,
        server=logged_db_params.server,
        autocommit=False,
        read_timeout=30.0,  # if the wire desyncs, fail loudly within 30s
    )
    try:
        # Pre-seed row 500 so the executemany's row-500 INSERT will
        # violate the UNIQUE constraint.
        cur = conn.cursor()
        cur.execute(
            f"INSERT INTO {constraint_table} VALUES (?, ?)",
            (500, "pre-existing"),
        )
        conn.commit()

        # Now executemany 1000 rows; row 500 will collide.
        rows = [(i, f"row_{i}") for i in range(1000)]
        with pytest.raises(informix_db.IntegrityError) as exc_info:
            cur.executemany(
                f"INSERT INTO {constraint_table} VALUES (?, ?)", rows
            )

        # The error message should identify which row failed in the batch.
        err_msg = str(exc_info.value)
        assert "row 500" in err_msg or "500" in err_msg, (
            f"error message should identify the failed row index: {err_msg}"
        )

        # Whatever the transaction state, rolling back is the correct
        # response to a failed batch.
        conn.rollback()

        # The connection MUST be usable after the failed batch.
        # If the wire is desynced, this query will block or fail
        # with a ProtocolError. The test passing here proves the
        # pipeline drained cleanly.
        cur = conn.cursor()
        cur.execute(f"SELECT COUNT(*) FROM {constraint_table}")
        (count,) = cur.fetchone()

        # After rollback, only the pre-seeded row 500 remains.
        assert count == 1, (
            f"expected only the pre-seeded row to remain, got {count} "
            "(transaction didn't roll back cleanly?)"
        )
    finally:
        conn.close()


def test_pipelined_executemany_first_row_fails(
    logged_db_params: ConnParams, constraint_table: str
) -> None:
    """Edge case: failure on the FIRST row of the pipeline. Tests that
    the drain loop correctly handles "every response after this is an
    error" without falling apart on the very first response."""
    conn = informix_db.connect(
        host=logged_db_params.host,
        port=logged_db_params.port,
        user=logged_db_params.user,
        password=logged_db_params.password,
        database=logged_db_params.database,
        server=logged_db_params.server,
        autocommit=False,
        read_timeout=30.0,
    )
    try:
        cur = conn.cursor()
        cur.execute(
            f"INSERT INTO {constraint_table} VALUES (?, ?)", (0, "seeded")
        )
        conn.commit()

        rows = [(i, f"row_{i}") for i in range(100)]
        with pytest.raises(informix_db.IntegrityError):
            cur.executemany(
                f"INSERT INTO {constraint_table} VALUES (?, ?)", rows
            )
        conn.rollback()

        cur = conn.cursor()
        cur.execute(f"SELECT COUNT(*) FROM {constraint_table}")
        (count,) = cur.fetchone()
        assert count == 1
    finally:
        conn.close()


def test_pipelined_executemany_last_row_fails(
    logged_db_params: ConnParams, constraint_table: str
) -> None:
    """Edge case: failure on the LAST row of the pipeline. Tests that
    we don't accidentally short-circuit the drain when we see the
    "expected" rowcount before the actual error response arrives."""
    conn = informix_db.connect(
        host=logged_db_params.host,
        port=logged_db_params.port,
        user=logged_db_params.user,
        password=logged_db_params.password,
        database=logged_db_params.database,
        server=logged_db_params.server,
        autocommit=False,
        read_timeout=30.0,
    )
    try:
        cur = conn.cursor()
        cur.execute(
            f"INSERT INTO {constraint_table} VALUES (?, ?)",
            (99, "seeded-last"),
        )
        conn.commit()

        rows = [(i, f"row_{i}") for i in range(100)]
        with pytest.raises(informix_db.IntegrityError):
            cur.executemany(
                f"INSERT INTO {constraint_table} VALUES (?, ?)", rows
            )
        conn.rollback()

        cur = conn.cursor()
        cur.execute(f"SELECT COUNT(*) FROM {constraint_table}")
        (count,) = cur.fetchone()
        assert count == 1
    finally:
        conn.close()