informix-db/tests/test_pool.py
Ryan Malloy 5c4a7a57f1 Phase 26: Pool rollback-on-release - CRITICAL data-correctness fix (2026.05.05)
Fixes the dirty-pool-checkout bug surfaced by Margaret Hamilton's
system-wide audit (Critical #1).

The bug: ConnectionPool.release() returned connections with open
server-side transactions still active. Request A's uncommitted
INSERTs would be inherited by Request B reusing the same connection -
B's commit would land A's writes permanently; B's rollback would
silently lose them. Same shape as psycopg2's pre-2.5 dirty-pool bug.

The fix: pool.release() now rolls back any open transaction before
returning the connection to the idle list. The rollback runs OUTSIDE
the pool lock since it's a wire round-trip - the connection is
already off the idle list and counted in _total, so no other thread
can grab it during the rollback window. If the rollback itself fails
(dead socket, etc.), the connection is evicted rather than recycled.

Async path covered automatically: AsyncConnectionPool.release()
delegates to the sync pool's release via _to_thread.

Margaret Hamilton review pass surfaced two findings, both addressed:
* Silent rollback failure: added a WARNING log via logging.getLogger
  ("informix_db.pool") so evictions are debuggable. First logger in
  the project.
* Async cancellation race: the fix doesn't introduce the
  asyncio.wait_for race (Critical #2, deferred to Phase 27), but it
  adds a code path that can trigger it. Documented loudly in
  pool.release() docstring, aio.py module docstring, and USAGE.md
  async section. Recommendation: use read_timeout on the connection
  instead of asyncio.wait_for until Phase 27 lands.

Two new regression tests in tests/test_pool.py:
* test_uncommitted_writes_invisible_to_next_acquirer (the bug)
* test_committed_writes_survive_pool_checkout (no over-correction)

Verified the regression test catches the bug: stashed the fix, ran
the test - it fails with "B sees 1 rows - leaked across pool
checkout boundary" - confirming it tests the real failure mode.

Total tests: 72 unit + 226 integration + 28 benchmark = 326.

Deferred to Phase 27 per Hamilton audit:
* Critical #2 (concurrency / per-connection wire lock)
* High #3 (async cancellation routes to broken=True)
* High #4 (bare except in _raise_sq_err drain)
* High #5 (no cursor finalizers - server-side resource leaks)
2026-05-05 03:22:18 -06:00

431 lines
14 KiB
Python

"""Phase 15 integration tests — connection pool.
Covers acquire/release, lazy/eager growth, timeout on exhaustion,
broken-connection eviction, health-check on acquire, multi-thread
safety, and clean shutdown.
"""
from __future__ import annotations

import contextlib
import threading
import time

import pytest

import informix_db
from tests.conftest import ConnParams
pytestmark = pytest.mark.integration
def _make_pool(
    params: ConnParams, *, min_size: int = 0, max_size: int = 4, **kw
) -> informix_db.ConnectionPool:
    """Build a test pool from *params*, forwarding any extra pool options."""
    base_kwargs = {
        "host": params.host,
        "port": params.port,
        "user": params.user,
        "password": params.password,
        "database": params.database,
        "server": params.server,
        "min_size": min_size,
        "max_size": max_size,
    }
    # Double-splat keeps the original semantics: a duplicate key in **kw
    # still raises TypeError rather than silently overriding.
    return informix_db.create_pool(**base_kwargs, **kw)
# -------- API + lifecycle --------
def test_pool_starts_with_min_size_connections(
    conn_params: ConnParams,
) -> None:
    """Construction eagerly opens exactly ``min_size`` connections."""
    pool = _make_pool(conn_params, min_size=2, max_size=4)
    try:
        # Both counters reflect the two pre-opened, currently-idle conns.
        assert pool.size == 2
        assert pool.idle_count == 2
    finally:
        pool.close()
def test_pool_grows_lazily_to_max_size(conn_params: ConnParams) -> None:
    """A ``min_size=0`` pool opens nothing up front and grows on demand."""
    pool = _make_pool(conn_params, min_size=0, max_size=3)
    try:
        assert pool.size == 0  # nothing pre-opened
        checked_out = [pool.acquire()]
        assert pool.size == 1  # first demand minted one conn
        checked_out.append(pool.acquire())
        checked_out.append(pool.acquire())
        assert pool.size == 3  # grew to the cap, no further
        for conn in checked_out:
            pool.release(conn)
        assert pool.idle_count == 3  # all three returned to idle
    finally:
        pool.close()
def test_pool_context_manager_releases(conn_params: ConnParams) -> None:
    """``pool.connection()`` hands out a conn and returns it on exit."""
    pool = _make_pool(conn_params, max_size=2)
    try:
        with pool.connection() as conn:
            assert pool.idle_count == 0  # checked out, not idle
            cursor = conn.cursor()
            cursor.execute("SELECT 1 FROM systables WHERE tabid = 1")
            row = cursor.fetchone()
            assert row == (1,)
        # Leaving the with-block must put the connection back on idle.
        assert pool.idle_count == 1
    finally:
        pool.close()
def test_pool_reuses_connections(conn_params: ConnParams) -> None:
    """Back-to-back checkouts hand back the SAME Connection object (LIFO)."""
    pool = _make_pool(conn_params, max_size=2)
    try:
        with pool.connection() as first:
            first_ident = id(first)
        with pool.connection() as second:
            second_ident = id(second)
        # LIFO recycling: the second checkout is the first conn, reused.
        assert first_ident == second_ident
    finally:
        pool.close()
# -------- Exhaustion + timeout --------
def test_pool_acquire_times_out_when_full(conn_params: ConnParams) -> None:
    """With every slot checked out, ``acquire()`` waits, then times out."""
    pool = _make_pool(conn_params, max_size=1, acquire_timeout=0.3)
    try:
        held = pool.acquire()  # occupy the only slot
        started = time.monotonic()
        with pytest.raises(informix_db.PoolTimeoutError, match="max_size=1"):
            pool.acquire()
        waited = time.monotonic() - started
        # Must block for roughly acquire_timeout: no early return, no hang.
        assert 0.25 < waited < 1.0
        pool.release(held)
    finally:
        pool.close()
def test_pool_acquire_timeout_override(conn_params: ConnParams) -> None:
    """An explicit ``acquire(timeout=...)`` wins over the pool default."""
    pool = _make_pool(conn_params, max_size=1, acquire_timeout=10.0)
    try:
        held = pool.acquire()
        started = time.monotonic()
        with pytest.raises(informix_db.PoolTimeoutError):
            pool.acquire(timeout=0.2)
        # Returned well under the 10-second default -> override applied.
        assert time.monotonic() - started < 1.0
        pool.release(held)
    finally:
        pool.close()
def test_pool_release_unblocks_waiter(conn_params: ConnParams) -> None:
    """A blocked ``acquire()`` in another thread wakes up on ``release()``."""
    pool = _make_pool(conn_params, max_size=1, acquire_timeout=2.0)
    try:
        held = pool.acquire()
        acquired: list[informix_db.Connection] = []

        def blocked_acquire() -> None:
            acquired.append(pool.acquire())

        waiter_thread = threading.Thread(target=blocked_acquire, daemon=True)
        waiter_thread.start()
        time.sleep(0.1)  # give the thread time to park inside acquire()
        assert not acquired  # still blocked — the only slot is held
        pool.release(held)  # hand the slot over to the waiter
        waiter_thread.join(timeout=1.0)
        assert len(acquired) == 1
        pool.release(acquired[0])
    finally:
        pool.close()
# -------- Broken connection eviction --------
def test_broken_connection_evicted(conn_params: ConnParams) -> None:
    """``release(broken=True)`` drops the connection instead of recycling."""
    pool = _make_pool(conn_params, max_size=2)
    try:
        conn = pool.acquire()
        assert pool.size == 1
        pool.release(conn, broken=True)
        # Eviction frees the slot entirely: nothing counted, nothing idle.
        assert pool.size == 0
        assert pool.idle_count == 0
    finally:
        pool.close()
def test_with_block_on_operational_error_evicts(
    conn_params: ConnParams,
) -> None:
    """An OperationalError escaping the with-block evicts the connection."""
    pool = _make_pool(conn_params, max_size=2)
    try:
        with pytest.raises(informix_db.OperationalError), pool.connection():
            raise informix_db.OperationalError("simulated failure")
        # The suspect connection must not linger in the pool.
        assert pool.size == 0
    finally:
        pool.close()
def test_with_block_on_other_error_returns_to_pool(
    conn_params: ConnParams,
) -> None:
    """Application-level exceptions keep the connection in the pool."""
    pool = _make_pool(conn_params, max_size=2)
    try:
        with pytest.raises(ValueError), pool.connection() as _conn:
            raise ValueError("application bug, not connection")
        # A ValueError says nothing about connection health -> recycle it.
        assert pool.idle_count == 1
    finally:
        pool.close()
# -------- Health check on acquire --------
def test_dead_connection_silently_replaced(
    conn_params: ConnParams,
) -> None:
    """A dead idle connection is transparently swapped for a working one."""
    pool = _make_pool(conn_params, max_size=2)
    try:
        first = pool.acquire()
        pool.release(first)
        first.close()  # kill the idle connection behind the pool's back
        replacement = pool.acquire()
        # Accept either recovery strategy: a brand-new object was minted,
        # or the pool revived the same object (in which case it is open).
        assert replacement is not first or not first.closed
        # Whatever we got must actually talk to the server.
        cursor = replacement.cursor()
        cursor.execute("SELECT 1 FROM systables WHERE tabid = 1")
        assert cursor.fetchone() == (1,)
        pool.release(replacement)
    finally:
        pool.close()
# -------- Shutdown --------
def test_pool_close_drains_idle(conn_params: ConnParams) -> None:
    """Closing the pool tears down idle conns and blocks new acquires."""
    pool = _make_pool(conn_params, min_size=2)
    assert pool.idle_count == 2  # min_size conns were pre-opened
    pool.close()
    assert pool.size == 0  # everything torn down
    # A closed pool refuses to hand out connections.
    with pytest.raises(informix_db.PoolClosedError):
        pool.acquire()
def test_pool_close_idempotent(conn_params: ConnParams) -> None:
    """Repeated ``close()`` calls are harmless no-ops."""
    pool = _make_pool(conn_params, max_size=1)
    for _ in range(2):
        pool.close()  # the second call must not raise
def test_pool_as_context_manager(conn_params: ConnParams) -> None:
    """The pool itself is a context manager that closes on exit."""
    with _make_pool(conn_params, min_size=1) as pool, pool.connection() as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT 1 FROM systables WHERE tabid = 1")
        row = cursor.fetchone()
        assert row == (1,)
    # Leaving the with-block closed the pool: acquires must now fail.
    with pytest.raises(informix_db.PoolClosedError):
        pool.acquire()
# -------- Multi-threaded safety --------
def test_pool_thread_safe_concurrent_acquires(
    conn_params: ConnParams,
) -> None:
    """Eight threads hammering one pool neither deadlock nor leak slots."""
    pool = _make_pool(conn_params, max_size=4, acquire_timeout=5.0)
    try:
        collected: list[int] = []
        guard = threading.Lock()

        def worker() -> None:
            # Each worker checks out / queries / returns three times.
            for _ in range(3):
                with pool.connection() as conn:
                    cur = conn.cursor()
                    cur.execute("SELECT 1 FROM systables WHERE tabid = 1")
                    (value,) = cur.fetchone()
                    with guard:
                        collected.append(value)

        threads = [threading.Thread(target=worker) for _ in range(8)]
        for t in threads:
            t.start()
        for t in threads:
            t.join(timeout=10.0)
            assert not t.is_alive()  # no deadlocked workers
        # 8 workers x 3 queries each = 24 results, every SELECT returned 1.
        assert len(collected) == 24
        assert all(value == 1 for value in collected)
        # Pool never exceeded its cap — no leaked slots.
        assert pool.size <= 4
    finally:
        pool.close()
# -------- Phase 26: pool rollback-on-release (CRITICAL data-correctness bug) --------
def test_uncommitted_writes_invisible_to_next_acquirer(
    logged_db_params: ConnParams,
) -> None:
    """Critical regression test for the dirty-pool-checkout bug.
    Pre-Phase-26 behavior:
        Request A acquires → INSERTs (no commit) → releases. Server
        transaction stays open. Request B acquires the SAME connection
        (max_size=1 forces reuse) → its first SELECT sees A's
        uncommitted row (because it's running inside A's transaction).
        Worse: if B then commits, A's writes land permanently. If B
        errors before commit, A's writes silently roll back.
    This is the same shape as psycopg2's pre-2.5 dirty-pool bug.
    Post-Phase-26: pool.release() rolls back any open transaction
    before adding the connection to ``_idle``. A's uncommitted
    writes are gone before B ever sees the connection.
    """
    pool = informix_db.create_pool(
        host=logged_db_params.host,
        port=logged_db_params.port,
        user=logged_db_params.user,
        password=logged_db_params.password,
        database=logged_db_params.database,
        server=logged_db_params.server,
        min_size=1,
        max_size=1,  # forces A and B to share the connection
    )
    table = "p26_dirty_pool"
    try:
        # Setup: fresh table, committed so the CREATE lands.
        with pool.connection() as setup:
            cur = setup.cursor()
            with contextlib.suppress(Exception):
                cur.execute(f"DROP TABLE {table}")
            cur.execute(f"CREATE TABLE {table} (id INT, label VARCHAR(64))")
            setup.commit()
        # Request A: insert without committing, then release.
        a_conn = pool.acquire()
        try:
            cur = a_conn.cursor()
            cur.execute(
                f"INSERT INTO {table} VALUES (?, ?)", (1, "A's dirty write")
            )
            # Confirm A sees its own write inside its own transaction.
            cur.execute(f"SELECT COUNT(*) FROM {table}")
            assert cur.fetchone() == (1,), "A should see its own write pre-release"
            assert a_conn._in_transaction, "A's connection should be in_transaction"
        finally:
            pool.release(a_conn)  # NO commit — this is the critical case
        # Request B: acquire the same connection (max_size=1 guarantees reuse).
        # Note: we don't assert on ``_in_transaction`` after acquire — the
        # pool's ``_is_alive`` health probe runs SELECT 1 which opens a
        # fresh transaction under autocommit=False. The data-correctness
        # check (the COUNT below) is the actual ground truth: if Phase 26
        # didn't apply, A's uncommitted row would still be visible because
        # B would be running INSIDE A's leftover transaction.
        b_conn = pool.acquire()
        try:
            assert b_conn is a_conn, "max_size=1 must yield the same connection"
            cur = b_conn.cursor()
            cur.execute(f"SELECT COUNT(*) FROM {table}")
            (count,) = cur.fetchone()
            assert count == 0, (
                f"B sees {count} rows — A's uncommitted writes leaked across "
                "the pool checkout boundary. Phase 26 fix did not apply."
            )
        finally:
            pool.release(b_conn)
    finally:
        # Cleanup runs even when an assertion above fails, so a red run
        # can't leave the scratch table behind and poison the next run.
        with contextlib.suppress(Exception):
            with pool.connection() as cleanup:
                cur = cleanup.cursor()
                cur.execute(f"DROP TABLE {table}")
                cleanup.commit()
        pool.close()
def test_committed_writes_survive_pool_checkout(
    logged_db_params: ConnParams,
) -> None:
    """Counterpart to the previous test: COMMITTED writes must persist.
    This guards against the obvious over-correction — if Phase 26's
    rollback also somehow nukes already-committed work (e.g., via a
    second BEGIN+ROLLBACK round-trip), the bug fix would itself be
    a data-loss bug. This test fails if rollback runs when it
    shouldn't.
    """
    pool = informix_db.create_pool(
        host=logged_db_params.host,
        port=logged_db_params.port,
        user=logged_db_params.user,
        password=logged_db_params.password,
        database=logged_db_params.database,
        server=logged_db_params.server,
        min_size=1,
        max_size=1,
    )
    table = "p26_committed"
    try:
        # Setup: fresh table, committed so the CREATE lands.
        with pool.connection() as setup:
            cur = setup.cursor()
            with contextlib.suppress(Exception):
                cur.execute(f"DROP TABLE {table}")
            cur.execute(f"CREATE TABLE {table} (id INT)")
            setup.commit()
        # Request A: insert + commit + release.
        with pool.connection() as a_conn:
            cur = a_conn.cursor()
            cur.execute(f"INSERT INTO {table} VALUES (?)", (42,))
            a_conn.commit()
            assert not a_conn._in_transaction
        # Request B: should see the committed row.
        with pool.connection() as b_conn:
            cur = b_conn.cursor()
            cur.execute(f"SELECT id FROM {table}")
            assert cur.fetchone() == (42,), (
                "Committed write disappeared — Phase 26 rollback ran when "
                "it shouldn't have."
            )
    finally:
        # Cleanup runs even when an assertion above fails, so a red run
        # can't leave the scratch table behind and poison the next run.
        with contextlib.suppress(Exception):
            with pool.connection() as cleanup:
                cur = cleanup.cursor()
                cur.execute(f"DROP TABLE {table}")
                cleanup.commit()
        pool.close()