informix-db/tests/test_pool.py
Ryan Malloy 6afdbcabb3 Phase 27: Wire lock + async cancellation eviction (2026.05.05.1)
Closes Hamilton audit Critical #2 (concurrency / wire lock) and
High #3 (async cancellation evicts cleanly). Phase 26 fixed what
gets returned to the pool; Phase 27 fixes what can interleave on
the wire while a connection is checked out.

What changed:

connections.py:
* Added Connection._wire_lock = threading.RLock(). Wrapped commit(),
  rollback(), fast_path_call() under the lock (sketch after this list).
* _ensure_transaction documents the lock as a precondition AND
  asserts ownership at runtime (_wire_lock._is_owned()) so a future
  caller adding a third call site fails loudly.
* close() tries to acquire wire lock with 0.5s timeout before
  SQ_EXIT; skips polite exit and force-closes if busy.
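
Roughly the shape of the locking, as a sketch for reviewers. Only
_wire_lock, _ensure_transaction() and close() are names from this
change; the protocol helpers are placeholders:

    import threading

    class Connection:
        def __init__(self) -> None:
            # RLock, not Lock: commit()/rollback() re-enter via
            # _ensure_transaction() while already holding it.
            self._wire_lock = threading.RLock()

        def commit(self) -> None:
            with self._wire_lock:            # one wire exchange at a time
                self._ensure_transaction()
                self._do_commit_roundtrip()  # placeholder for the real exchange

        def _ensure_transaction(self) -> None:
            # Precondition: caller holds the wire lock. Checked at runtime so
            # a future third call site that forgets the lock fails loudly.
            assert self._wire_lock._is_owned(), "wire lock must be held"

        def close(self) -> None:
            # Polite exit only if the wire is quiet; otherwise skip it and
            # just drop the socket.
            if self._wire_lock.acquire(timeout=0.5):
                try:
                    self._do_exit_roundtrip()  # placeholder for SQ_EXIT
                finally:
                    self._wire_lock.release()
            self._close_socket()               # placeholder

        # Placeholders standing in for the real protocol helpers.
        def _do_commit_roundtrip(self) -> None: ...
        def _do_exit_roundtrip(self) -> None: ...
        def _close_socket(self) -> None: ...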

cursors.py:
* execute() body extracted into _execute_under_wire_lock() and
  called under the lock.
* executemany() body wrapped inline.
* _sfetch_at() wrapped - covers all scrollable fetch_* methods
  that delegate to it.
* close() locks the CLOSE+RELEASE for scrollable cursors.

pool.py:
* release() acquires conn._wire_lock with 5s timeout before rollback.
  On timeout: log WARNING, evict connection. Constant
  _RELEASE_WIRE_LOCK_TIMEOUT for tunability (sketch below).
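
Release path, roughly (a sketch: the idle-list, eviction helper and
logger names are placeholders; release(), conn._wire_lock and the
timeout constant are the real pieces):

    import logging

    logger = logging.getLogger(__name__)

    _RELEASE_WIRE_LOCK_TIMEOUT = 5.0  # seconds

    def release(self, conn, broken: bool = False) -> None:
        if not broken:
            # A worker abandoned by a cancelled caller may still be
            # mid-exchange on this connection; don't stack a ROLLBACK on
            # top of its traffic.
            if conn._wire_lock.acquire(timeout=_RELEASE_WIRE_LOCK_TIMEOUT):
                try:
                    conn.rollback()  # Phase 26: never recycle an open transaction
                finally:
                    conn._wire_lock.release()
            else:
                logger.warning(
                    "wire lock still held after %.1fs; evicting connection",
                    _RELEASE_WIRE_LOCK_TIMEOUT,
                )
                broken = True
        if broken:
            self._evict(conn)        # placeholder: close it and free the slot
        else:
            self._idle.append(conn)  # placeholder idle list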

aio.py:
* AsyncConnectionPool.connection() now catches CancelledError /
  TimeoutError separately and routes to broken=True (sketch below).
  Combined with the wire lock, asyncio.wait_for around aio DB calls
  is now safe.
* Updated docstring; mirrored in docs/USAGE.md.
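
The aio routing, roughly (the wrapper shape is a sketch; the change
itself is the except clause feeding broken=True on cancellation):

    import asyncio
    import contextlib

    @contextlib.asynccontextmanager
    async def connection(self):  # sketch; the real method does more bookkeeping
        conn = await self.acquire()
        broken = False
        try:
            yield conn
        except (asyncio.CancelledError, asyncio.TimeoutError):
            # The worker behind the cancelled await may still be writing to
            # this connection's socket; route it to eviction, never back to
            # the idle list. (OperationalError already took the broken path.)
            broken = True
            raise
        finally:
            await self.release(conn, broken=broken)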

Margaret Hamilton review surfaced three actionable conditions, all
addressed before tagging:
* Cancellation test used contextlib.suppress - could pass without
  exercising the cancellation path on a fast runner. Switched to
  pytest.raises so the test fails if timeout doesn't fire.
* _ensure_transaction precondition documented but unchecked at
  runtime. Added assert self._wire_lock._is_owned() guard.
* Connection.close() was unsynchronized. Now tries 0.5s acquire
  before SQ_EXIT.

Two new regression tests in tests/test_pool.py:
* test_concurrent_threads_on_one_connection_dont_interleave_pdus
  (without lock: garbled results / hangs)
* test_async_wait_for_cancellation_evicts_connection
  (asserts pool size shrinks; cancellation actually fires)

72 unit + 228 integration + 28 benchmark = 328 tests; ruff clean.

Hamilton verdict: PRODUCTION READY WITH CAVEATS (was) -> CAVEATS
NARROWED FURTHER (now). 0 critical, 2 high remaining (cursor
finalizers + bare-except in error drain) - both Phase 28 scope.
2026-05-05 03:40:39 -06:00

"""Phase 15 integration tests — connection pool.
Covers acquire/release, lazy/eager growth, timeout on exhaustion,
broken-connection eviction, health-check on acquire, multi-thread
safety, and clean shutdown.
"""
from __future__ import annotations
import asyncio
import contextlib
import threading
import time
import pytest
import informix_db
from informix_db import aio
from tests.conftest import ConnParams
pytestmark = pytest.mark.integration
def _make_pool(
params: ConnParams, *, min_size: int = 0, max_size: int = 4, **kw
) -> informix_db.ConnectionPool:
return informix_db.create_pool(
host=params.host,
port=params.port,
user=params.user,
password=params.password,
database=params.database,
server=params.server,
min_size=min_size,
max_size=max_size,
**kw,
)
# -------- API + lifecycle --------


def test_pool_starts_with_min_size_connections(
    conn_params: ConnParams,
) -> None:
    """``min_size`` connections are pre-opened on construction."""
    pool = _make_pool(conn_params, min_size=2, max_size=4)
    try:
        assert pool.size == 2
        assert pool.idle_count == 2
    finally:
        pool.close()


def test_pool_grows_lazily_to_max_size(conn_params: ConnParams) -> None:
    """Starts at 0, grows on demand up to ``max_size``."""
    pool = _make_pool(conn_params, min_size=0, max_size=3)
    try:
        assert pool.size == 0
        c1 = pool.acquire()
        assert pool.size == 1
        c2 = pool.acquire()
        c3 = pool.acquire()
        assert pool.size == 3
        for c in (c1, c2, c3):
            pool.release(c)
        assert pool.idle_count == 3
    finally:
        pool.close()


def test_pool_context_manager_releases(conn_params: ConnParams) -> None:
    """``with pool.connection()`` checks out and returns automatically."""
    pool = _make_pool(conn_params, max_size=2)
    try:
        with pool.connection() as conn:
            assert pool.idle_count == 0
            cur = conn.cursor()
            cur.execute("SELECT 1 FROM systables WHERE tabid = 1")
            assert cur.fetchone() == (1,)
        # Released back into the pool
        assert pool.idle_count == 1
    finally:
        pool.close()


def test_pool_reuses_connections(conn_params: ConnParams) -> None:
    """Sequential acquires return the SAME underlying connection (LIFO)."""
    pool = _make_pool(conn_params, max_size=2)
    try:
        with pool.connection() as conn1:
            id1 = id(conn1)
        with pool.connection() as conn2:
            id2 = id(conn2)
        assert id1 == id2  # same Connection object reused
    finally:
        pool.close()


# -------- Exhaustion + timeout --------


def test_pool_acquire_times_out_when_full(conn_params: ConnParams) -> None:
    """Beyond max_size, acquire blocks then raises PoolTimeoutError."""
    pool = _make_pool(conn_params, max_size=1, acquire_timeout=0.3)
    try:
        c1 = pool.acquire()
        start = time.monotonic()
        with pytest.raises(informix_db.PoolTimeoutError, match="max_size=1"):
            pool.acquire()
        elapsed = time.monotonic() - start
        assert 0.25 < elapsed < 1.0  # honors the timeout
        pool.release(c1)
    finally:
        pool.close()


def test_pool_acquire_timeout_override(conn_params: ConnParams) -> None:
    """Per-acquire ``timeout`` overrides the pool default."""
    pool = _make_pool(conn_params, max_size=1, acquire_timeout=10.0)
    try:
        c1 = pool.acquire()
        start = time.monotonic()
        with pytest.raises(informix_db.PoolTimeoutError):
            pool.acquire(timeout=0.2)
        assert time.monotonic() - start < 1.0  # didn't wait the 10s default
        pool.release(c1)
    finally:
        pool.close()


def test_pool_release_unblocks_waiter(conn_params: ConnParams) -> None:
    """When a connection is released, a blocked acquire returns."""
    pool = _make_pool(conn_params, max_size=1, acquire_timeout=2.0)
    try:
        c1 = pool.acquire()
        # Start a thread that will block on acquire
        result: list[informix_db.Connection] = []

        def waiter() -> None:
            result.append(pool.acquire())

        t = threading.Thread(target=waiter, daemon=True)
        t.start()
        time.sleep(0.1)  # let waiter block
        assert not result  # still waiting
        # Releasing c1 should unblock the waiter
        pool.release(c1)
        t.join(timeout=1.0)
        assert len(result) == 1
        pool.release(result[0])
    finally:
        pool.close()


# -------- Broken connection eviction --------


def test_broken_connection_evicted(conn_params: ConnParams) -> None:
    """Releasing with broken=True closes the connection and frees the slot."""
    pool = _make_pool(conn_params, max_size=2)
    try:
        c1 = pool.acquire()
        assert pool.size == 1
        pool.release(c1, broken=True)
        # Slot freed, conn closed
        assert pool.size == 0
        assert pool.idle_count == 0
    finally:
        pool.close()


def test_with_block_on_operational_error_evicts(
    conn_params: ConnParams,
) -> None:
    """Exception in ``with pool.connection()`` evicts the connection."""
    pool = _make_pool(conn_params, max_size=2)
    try:
        with pytest.raises(informix_db.OperationalError), pool.connection():
            raise informix_db.OperationalError("simulated failure")
        # Slot freed
        assert pool.size == 0
    finally:
        pool.close()


def test_with_block_on_other_error_returns_to_pool(
    conn_params: ConnParams,
) -> None:
    """Non-connection-related exceptions DON'T evict (data errors stay)."""
    pool = _make_pool(conn_params, max_size=2)
    try:
        with pytest.raises(ValueError), pool.connection() as _conn:
            raise ValueError("application bug, not connection")
        # Connection retained
        assert pool.idle_count == 1
    finally:
        pool.close()


# -------- Health check on acquire --------


def test_dead_connection_silently_replaced(
    conn_params: ConnParams,
) -> None:
    """An idle connection that died is dropped and a fresh one minted."""
    pool = _make_pool(conn_params, max_size=2)
    try:
        c1 = pool.acquire()
        pool.release(c1)
        # Forcibly break the idle connection from the outside
        c1.close()
        # Next acquire should silently replace it
        c2 = pool.acquire()
        assert c2 is not c1 or not c1.closed
        # Verify it's actually usable
        cur = c2.cursor()
        cur.execute("SELECT 1 FROM systables WHERE tabid = 1")
        assert cur.fetchone() == (1,)
        pool.release(c2)
    finally:
        pool.close()


# -------- Shutdown --------


def test_pool_close_drains_idle(conn_params: ConnParams) -> None:
    """``close()`` closes all idle connections and rejects new acquires."""
    pool = _make_pool(conn_params, min_size=2)
    assert pool.idle_count == 2
    pool.close()
    assert pool.size == 0
    with pytest.raises(informix_db.PoolClosedError):
        pool.acquire()


def test_pool_close_idempotent(conn_params: ConnParams) -> None:
    """``close()`` may be called multiple times."""
    pool = _make_pool(conn_params, max_size=1)
    pool.close()
    pool.close()  # must not raise


def test_pool_as_context_manager(conn_params: ConnParams) -> None:
    """``with pool: ...`` closes on exit."""
    with _make_pool(conn_params, min_size=1) as pool, pool.connection() as conn:
        cur = conn.cursor()
        cur.execute("SELECT 1 FROM systables WHERE tabid = 1")
        assert cur.fetchone() == (1,)
    # After exit
    with pytest.raises(informix_db.PoolClosedError):
        pool.acquire()


# -------- Multi-threaded safety --------


def test_pool_thread_safe_concurrent_acquires(
    conn_params: ConnParams,
) -> None:
    """Multiple threads sharing a pool don't deadlock or double-use."""
    pool = _make_pool(conn_params, max_size=4, acquire_timeout=5.0)
    try:
        results: list[int] = []
        results_lock = threading.Lock()

        def worker() -> None:
            for _ in range(3):
                with pool.connection() as conn:
                    cur = conn.cursor()
                    cur.execute("SELECT 1 FROM systables WHERE tabid = 1")
                    (val,) = cur.fetchone()
                    with results_lock:
                        results.append(val)

        threads = [threading.Thread(target=worker) for _ in range(8)]
        for t in threads:
            t.start()
        for t in threads:
            t.join(timeout=10.0)
            assert not t.is_alive()
        # 8 workers x 3 queries each = 24 results, all = 1
        assert len(results) == 24
        assert all(r == 1 for r in results)
        # Pool didn't leak: at most max_size connections
        assert pool.size <= 4
    finally:
        pool.close()


# -------- Phase 26: pool rollback-on-release (CRITICAL data-correctness bug) --------


def test_uncommitted_writes_invisible_to_next_acquirer(
    logged_db_params: ConnParams,
) -> None:
    """Critical regression test for the dirty-pool-checkout bug.

    Pre-Phase-26 behavior:
    Request A acquires → INSERTs (no commit) → releases. Server
    transaction stays open. Request B acquires the SAME connection
    (max_size=1 forces reuse) → its first SELECT sees A's
    uncommitted row (because it's running inside A's transaction).
    Worse: if B then commits, A's writes land permanently. If B
    errors before commit, A's writes silently roll back.

    This is the same shape as psycopg2's pre-2.5 dirty-pool bug.

    Post-Phase-26: pool.release() rolls back any open transaction
    before adding the connection to ``_idle``. A's uncommitted
    writes are gone before B ever sees the connection.
    """
    pool = informix_db.create_pool(
        host=logged_db_params.host,
        port=logged_db_params.port,
        user=logged_db_params.user,
        password=logged_db_params.password,
        database=logged_db_params.database,
        server=logged_db_params.server,
        min_size=1,
        max_size=1,  # forces A and B to share the connection
    )
    table = "p26_dirty_pool"
    try:
        # Setup: fresh table, autocommit so the CREATE lands
        with pool.connection() as setup:
            cur = setup.cursor()
            with contextlib.suppress(Exception):
                cur.execute(f"DROP TABLE {table}")
            cur.execute(f"CREATE TABLE {table} (id INT, label VARCHAR(64))")
            setup.commit()
        # Request A: insert without committing, then release
        a_conn = pool.acquire()
        try:
            cur = a_conn.cursor()
            cur.execute(
                f"INSERT INTO {table} VALUES (?, ?)", (1, "A's dirty write")
            )
            # Confirm A sees its own write inside its own transaction
            cur.execute(f"SELECT COUNT(*) FROM {table}")
            assert cur.fetchone() == (1,), "A should see its own write pre-release"
            assert a_conn._in_transaction, "A's connection should be in_transaction"
        finally:
            pool.release(a_conn)  # NO commit — this is the critical case
        # Request B: acquire the same connection (max_size=1 guarantees reuse).
        # Note: we don't assert on ``_in_transaction`` after acquire — the
        # pool's ``_is_alive`` health probe runs SELECT 1 which opens a
        # fresh transaction under autocommit=False. The data-correctness
        # check (the COUNT below) is the actual ground truth: if Phase 26
        # didn't apply, A's uncommitted row would still be visible because
        # B would be running INSIDE A's leftover transaction.
        b_conn = pool.acquire()
        try:
            assert b_conn is a_conn, "max_size=1 must yield the same connection"
            cur = b_conn.cursor()
            cur.execute(f"SELECT COUNT(*) FROM {table}")
            (count,) = cur.fetchone()
            assert count == 0, (
                f"B sees {count} rows — A's uncommitted writes leaked across "
                "the pool checkout boundary. Phase 26 fix did not apply."
            )
        finally:
            pool.release(b_conn)
        # Cleanup
        with pool.connection() as cleanup:
            cur = cleanup.cursor()
            with contextlib.suppress(Exception):
                cur.execute(f"DROP TABLE {table}")
            cleanup.commit()
    finally:
        pool.close()


def test_committed_writes_survive_pool_checkout(
    logged_db_params: ConnParams,
) -> None:
    """Counterpart to the previous test: COMMITTED writes must persist.

    This guards against the obvious over-correction — if Phase 26's
    rollback also somehow nukes already-committed work (e.g., via a
    second BEGIN+ROLLBACK round-trip), the bug fix would itself be
    a data-loss bug. This test fails if rollback runs when it
    shouldn't.
    """
    pool = informix_db.create_pool(
        host=logged_db_params.host,
        port=logged_db_params.port,
        user=logged_db_params.user,
        password=logged_db_params.password,
        database=logged_db_params.database,
        server=logged_db_params.server,
        min_size=1,
        max_size=1,
    )
    table = "p26_committed"
    try:
        with pool.connection() as setup:
            cur = setup.cursor()
            with contextlib.suppress(Exception):
                cur.execute(f"DROP TABLE {table}")
            cur.execute(f"CREATE TABLE {table} (id INT)")
            setup.commit()
        # Request A: insert + commit + release
        with pool.connection() as a_conn:
            cur = a_conn.cursor()
            cur.execute(f"INSERT INTO {table} VALUES (?)", (42,))
            a_conn.commit()
            assert not a_conn._in_transaction
        # Request B: should see the committed row
        with pool.connection() as b_conn:
            cur = b_conn.cursor()
            cur.execute(f"SELECT id FROM {table}")
            assert cur.fetchone() == (42,), (
                "Committed write disappeared — Phase 26 rollback ran when "
                "it shouldn't have."
            )
        with pool.connection() as cleanup:
            cur = cleanup.cursor()
            with contextlib.suppress(Exception):
                cur.execute(f"DROP TABLE {table}")
            cleanup.commit()
    finally:
        pool.close()


# -------- Phase 27: wire-lock thread-safety + async cancellation eviction --------


def test_concurrent_threads_on_one_connection_dont_interleave_pdus(
    conn_params: ConnParams,
) -> None:
    """Phase 27 wire-lock regression test.

    Per PEP 249 Threadsafety=1, threads aren't supposed to share
    connections — but the async layer effectively does this when a
    cancelled task's worker keeps running. We verify the wire lock
    serializes correctly: two threads doing concurrent SELECTs on
    one Connection should produce correct results, not garbled wire
    state.

    Without the wire lock, the two threads' PDU bytes interleave on
    the socket and at least one query produces wrong results, raises
    ``ProtocolError``, or hangs.
    """
    conn = informix_db.connect(
        host=conn_params.host,
        port=conn_params.port,
        user=conn_params.user,
        password=conn_params.password,
        database=conn_params.database,
        server=conn_params.server,
        autocommit=True,
    )
    try:
        results: list[int] = []
        errors: list[Exception] = []
        results_lock = threading.Lock()

        def worker(query_id: int) -> None:
            try:
                for _ in range(20):
                    cur = conn.cursor()
                    cur.execute(
                        "SELECT FIRST 1 tabid FROM systables WHERE tabid = ?",
                        (query_id,),
                    )
                    (val,) = cur.fetchone()
                    cur.close()
                    with results_lock:
                        results.append(val)
            except Exception as exc:
                with results_lock:
                    errors.append(exc)

        # Two threads, each doing 20 queries with distinct expected results
        t1 = threading.Thread(target=worker, args=(1,))
        t2 = threading.Thread(target=worker, args=(2,))
        t1.start()
        t2.start()
        t1.join(timeout=30.0)
        t2.join(timeout=30.0)
        assert not t1.is_alive(), "thread 1 hung — wire lock failed"
        assert not t2.is_alive(), "thread 2 hung — wire lock failed"
        assert errors == [], (
            f"Threads errored out — likely PDU interleaving: {errors!r}"
        )
        # Each worker did 20 queries, so 40 results total. Each result
        # should be the query_id its thread used.
        assert results.count(1) == 20
        assert results.count(2) == 20
    finally:
        conn.close()


async def test_async_wait_for_cancellation_evicts_connection(
    conn_params: ConnParams,
) -> None:
    """Phase 27 async-cancellation regression test.

    Before Phase 27, a cancelled awaitable left the connection in the
    pool's idle list with a possibly-still-running worker writing to
    its socket. Now: cancellation routes to ``broken=True``, and the
    pool evicts the connection rather than recycling it.
    """
    pool = await aio.create_pool(
        host=conn_params.host,
        port=conn_params.port,
        user=conn_params.user,
        password=conn_params.password,
        database=conn_params.database,
        server=conn_params.server,
        min_size=0,
        max_size=2,
    )
    try:
        # Force-grow to 1 connection so we have something to evict
        async with pool.connection() as warmup_conn:
            cur = await warmup_conn.cursor()
            await cur.execute("SELECT 1 FROM systables WHERE tabid = 1")
            await cur.fetchone()
            await cur.close()
        size_before = pool.size
        assert size_before == 1, f"expected 1 connection, got {size_before}"

        # Trigger cancellation mid-query.
        async def slow_query() -> None:
            async with pool.connection() as conn:
                cur = await conn.cursor()
                # A query that will run for >100ms on the dev image:
                # systables joined to itself a few times.
                await cur.execute(
                    "SELECT COUNT(*) FROM systables a, systables b, "
                    "systables c WHERE a.tabid > 0"
                )
                await cur.fetchone()
                await cur.close()

        # Use pytest.raises (NOT contextlib.suppress) so the test fails
        # if the timeout never fires — otherwise the test could pass on
        # a fast CI runner where the query completes within 1ms,
        # silently skipping the cancellation path it claims to test.
        with pytest.raises(asyncio.TimeoutError):
            await asyncio.wait_for(slow_query(), timeout=0.001)
        # After cancellation, the connection must NOT have rejoined the
        # pool's idle list. It should have been evicted (broken=True).
        # Allow a moment for the release to complete.
        await asyncio.sleep(0.5)
        assert pool.size < size_before, (
            f"Connection wasn't evicted on cancellation; pool.size={pool.size} "
            f"(expected < {size_before}). The cancelled connection rejoined "
            "the idle list — Phase 27 fix did not apply."
        )
    finally:
        await pool.close()