Closes the unbounded-leak gap on long-lived pooled connections that Phase 28's cursor finalizer left as future work. When the finalizer can't acquire the wire lock (cross-thread GC during another thread's op), instead of leaking and logging, it enqueues the cleanup PDUs to a per-connection deferred queue. The next normal operation drains the queue under the wire lock, completing the cleanup atomically before the new op.

What changed:

connections.py:
* Connection._pending_cleanup: list[bytes] + Connection._cleanup_lock (separate from _wire_lock - tiny critical section for list mutation only, allows enqueue without waiting for an in-flight wire op)
* _enqueue_cleanup(pdus): thread-safe append, callable from any thread (including finalizers without lock ownership)
* _drain_pending_cleanup(): pop-the-list + send-each-PDU. Caller must hold _wire_lock. Force-closes on wire desync (same doctrine as _raise_sq_err)
* _send_pdu opportunistically drains the queue before sending. Cost is one length-check when the queue is empty (the common case)

cursors.py:
* _finalize_cursor enqueues [_CLOSE_PDU, _RELEASE_PDU] instead of leaking when the lock is busy. WARNING demoted to DEBUG since the leak no longer accumulates.

Lock-order discipline: _cleanup_lock is held only for list extend/pop; _wire_lock is held for the actual wire I/O. Never grab _cleanup_lock while holding _wire_lock - the drain pops-and-clears under _cleanup_lock, then iterates under _wire_lock (which the caller holds).

Two new regression tests:
* test_enqueue_cleanup_drains_on_next_send_pdu - verifies the queue mechanism end-to-end
* test_pending_cleanup_thread_safe_enqueue - 8x50 concurrent enqueues, no race-loss

72 unit + 231 integration + 28 benchmark = 331 tests; ruff clean.

Hamilton audit punch list status: 0 critical, 0 high, 3 medium remaining (login errors, _send_exit cleanup, pool acquire re-entrance) - all Phase 30 scope.
686 lines
24 KiB
Python
686 lines
24 KiB
Python
"""Phase 15 integration tests — connection pool.
|
|
|
|
Covers acquire/release, lazy/eager growth, timeout on exhaustion,
|
|
broken-connection eviction, health-check on acquire, multi-thread
|
|
safety, and clean shutdown.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import contextlib
|
|
import struct
|
|
import threading
|
|
import time
|
|
|
|
import pytest
|
|
|
|
import informix_db
|
|
from informix_db import aio
|
|
from tests.conftest import ConnParams
|
|
|
|
pytestmark = pytest.mark.integration
|
|
|
|
|
|
def _make_pool(
    params: ConnParams, *, min_size: int = 0, max_size: int = 4, **kw
) -> informix_db.ConnectionPool:
    """Create a pool against the server described by ``params``.

    ``min_size`` and ``max_size`` are forwarded unchanged; any additional
    keyword arguments (for example ``acquire_timeout``) are passed straight
    through to ``informix_db.create_pool``.
    """
    # Connection coordinates are copied field by field from the fixture.
    endpoint = {
        "host": params.host,
        "port": params.port,
        "user": params.user,
        "password": params.password,
        "database": params.database,
        "server": params.server,
    }
    return informix_db.create_pool(
        **endpoint, min_size=min_size, max_size=max_size, **kw
    )
|
|
|
|
|
|
# -------- API + lifecycle --------
|
|
|
|
|
|
def test_pool_starts_with_min_size_connections(
    conn_params: ConnParams,
) -> None:
    """A pool built with ``min_size=2`` reports two connections, both idle."""
    # closing() guarantees pool.close() runs even when an assertion fails.
    with contextlib.closing(
        _make_pool(conn_params, min_size=2, max_size=4)
    ) as pool:
        assert pool.size == 2
        assert pool.idle_count == 2
|
|
|
|
|
|
def test_pool_grows_lazily_to_max_size(conn_params: ConnParams) -> None:
    """An empty pool opens connections on demand, up to ``max_size``."""
    with contextlib.closing(
        _make_pool(conn_params, min_size=0, max_size=3)
    ) as pool:
        assert pool.size == 0

        first = pool.acquire()
        assert pool.size == 1

        second = pool.acquire()
        third = pool.acquire()
        assert pool.size == 3

        # Hand everything back; all three should then be counted as idle.
        for checked_out in (first, second, third):
            pool.release(checked_out)
        assert pool.idle_count == 3
|
|
|
|
|
|
def test_pool_context_manager_releases(conn_params: ConnParams) -> None:
    """``with pool.connection()`` checks a connection out and back in."""
    with contextlib.closing(_make_pool(conn_params, max_size=2)) as pool:
        with pool.connection() as conn:
            # While checked out, nothing is idle.
            assert pool.idle_count == 0
            cursor = conn.cursor()
            cursor.execute("SELECT 1 FROM systables WHERE tabid = 1")
            assert cursor.fetchone() == (1,)

        # Leaving the block returned the connection to the pool.
        assert pool.idle_count == 1
|
|
|
|
|
|
def test_pool_reuses_connections(conn_params: ConnParams) -> None:
    """Two back-to-back checkouts hand out the same Connection object."""
    with contextlib.closing(_make_pool(conn_params, max_size=2)) as pool:
        seen_ids = []
        for _ in range(2):
            with pool.connection() as conn:
                seen_ids.append(id(conn))

        # Same identity both times means the object was recycled.
        assert seen_ids[0] == seen_ids[1]
|
|
|
|
|
|
# -------- Exhaustion + timeout --------
|
|
|
|
|
|
def test_pool_acquire_times_out_when_full(conn_params: ConnParams) -> None:
    """With the only slot taken, acquire waits then raises PoolTimeoutError."""
    with contextlib.closing(
        _make_pool(conn_params, max_size=1, acquire_timeout=0.3)
    ) as pool:
        held = pool.acquire()

        began = time.monotonic()
        with pytest.raises(informix_db.PoolTimeoutError, match="max_size=1"):
            pool.acquire()
        waited = time.monotonic() - began

        # Roughly the configured 0.3 s: not instant, not unbounded.
        assert 0.25 < waited < 1.0

        pool.release(held)
|
|
|
|
|
|
def test_pool_acquire_timeout_override(conn_params: ConnParams) -> None:
    """A ``timeout`` passed to ``acquire`` wins over the pool-wide default."""
    with contextlib.closing(
        _make_pool(conn_params, max_size=1, acquire_timeout=10.0)
    ) as pool:
        held = pool.acquire()

        began = time.monotonic()
        with pytest.raises(informix_db.PoolTimeoutError):
            pool.acquire(timeout=0.2)
        waited = time.monotonic() - began

        # Well short of the 10 s default, so the override was honoured.
        assert waited < 1.0

        pool.release(held)
|
|
|
|
|
|
def test_pool_release_unblocks_waiter(conn_params: ConnParams) -> None:
    """A thread blocked in ``acquire`` wakes up once a connection is released."""
    with contextlib.closing(
        _make_pool(conn_params, max_size=1, acquire_timeout=2.0)
    ) as pool:
        held = pool.acquire()

        # Filled in by the background thread once its acquire() returns.
        handed_over: list[informix_db.Connection] = []

        def blocked_acquire() -> None:
            handed_over.append(pool.acquire())

        waiter_thread = threading.Thread(target=blocked_acquire, daemon=True)
        waiter_thread.start()

        # Give the thread time to reach the blocking acquire().
        time.sleep(0.1)
        assert not handed_over  # still waiting

        # Freeing the only connection should let the waiter through.
        pool.release(held)
        waiter_thread.join(timeout=1.0)
        assert len(handed_over) == 1

        pool.release(handed_over[0])
|
|
|
|
|
|
# -------- Broken connection eviction --------
|
|
|
|
|
|
def test_broken_connection_evicted(conn_params: ConnParams) -> None:
    """``release(..., broken=True)`` drops the connection from the pool."""
    with contextlib.closing(_make_pool(conn_params, max_size=2)) as pool:
        doomed = pool.acquire()
        assert pool.size == 1

        pool.release(doomed, broken=True)

        # The pool no longer counts it, neither as open nor as idle.
        assert pool.size == 0
        assert pool.idle_count == 0
|
|
|
|
|
|
def test_with_block_on_operational_error_evicts(
    conn_params: ConnParams,
) -> None:
    """An OperationalError escaping ``pool.connection()`` evicts the connection."""
    with contextlib.closing(_make_pool(conn_params, max_size=2)) as pool:
        with pytest.raises(informix_db.OperationalError):
            with pool.connection():
                raise informix_db.OperationalError("simulated failure")

        # The connection was not returned to the pool.
        assert pool.size == 0
|
|
|
|
|
|
def test_with_block_on_other_error_returns_to_pool(
    conn_params: ConnParams,
) -> None:
    """A non-connection exception (here ``ValueError``) does not evict."""
    with contextlib.closing(_make_pool(conn_params, max_size=2)) as pool:
        with pytest.raises(ValueError):
            with pool.connection() as _conn:
                raise ValueError("application bug, not connection")

        # The connection went back to the idle list.
        assert pool.idle_count == 1
|
|
|
|
|
|
# -------- Health check on acquire --------
|
|
|
|
|
|
def test_dead_connection_silently_replaced(
    conn_params: ConnParams,
) -> None:
    """A connection closed while idle is not handed out in a dead state."""
    with contextlib.closing(_make_pool(conn_params, max_size=2)) as pool:
        original = pool.acquire()
        pool.release(original)

        # Kill the connection behind the pool's back while it sits idle.
        original.close()

        # The following acquire must yield something usable.
        replacement = pool.acquire()
        assert replacement is not original or not original.closed

        # Prove usability with a real round-trip.
        cursor = replacement.cursor()
        cursor.execute("SELECT 1 FROM systables WHERE tabid = 1")
        assert cursor.fetchone() == (1,)

        pool.release(replacement)
|
|
|
|
|
|
# -------- Shutdown --------
|
|
|
|
|
|
def test_pool_close_drains_idle(conn_params: ConnParams) -> None:
    """After ``close()`` the pool is empty and ``acquire`` is refused."""
    drained = _make_pool(conn_params, min_size=2)
    assert drained.idle_count == 2

    drained.close()
    assert drained.size == 0

    # A closed pool must not hand out connections.
    with pytest.raises(informix_db.PoolClosedError):
        drained.acquire()
|
|
|
|
|
|
def test_pool_close_idempotent(conn_params: ConnParams) -> None:
    """Calling ``close()`` twice in a row does not raise."""
    pool = _make_pool(conn_params, max_size=1)
    for _ in range(2):
        # The second pass is the one under test: it must be a no-op.
        pool.close()
|
|
|
|
|
|
def test_pool_as_context_manager(conn_params: ConnParams) -> None:
    """Using the pool itself in a ``with`` statement closes it on exit."""
    with _make_pool(conn_params, min_size=1) as pool:
        with pool.connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT 1 FROM systables WHERE tabid = 1")
            assert cursor.fetchone() == (1,)

    # Once the outer block has exited, the pool refuses further checkouts.
    with pytest.raises(informix_db.PoolClosedError):
        pool.acquire()
|
|
|
|
|
|
# -------- Multi-threaded safety --------
|
|
|
|
|
|
def test_pool_thread_safe_concurrent_acquires(
    conn_params: ConnParams,
) -> None:
    """Eight threads sharing a four-slot pool all finish with correct results."""
    with contextlib.closing(
        _make_pool(conn_params, max_size=4, acquire_timeout=5.0)
    ) as pool:
        collected: list[int] = []
        collected_guard = threading.Lock()

        def run_queries() -> None:
            # Each worker performs three independent checkout/query cycles.
            for _ in range(3):
                with pool.connection() as conn:
                    cursor = conn.cursor()
                    cursor.execute("SELECT 1 FROM systables WHERE tabid = 1")
                    (value,) = cursor.fetchone()
                with collected_guard:
                    collected.append(value)

        workers = [threading.Thread(target=run_queries) for _ in range(8)]
        for thread in workers:
            thread.start()
        for thread in workers:
            thread.join(timeout=10.0)
            assert not thread.is_alive()

        # 8 workers x 3 queries each = 24 results, every one equal to 1.
        assert len(collected) == 24
        assert all(item == 1 for item in collected)

        # The pool never grew beyond its configured ceiling.
        assert pool.size <= 4
|
|
|
|
|
|
# -------- Phase 26: pool rollback-on-release (CRITICAL data-correctness bug) --------
|
|
|
|
|
|
def test_uncommitted_writes_invisible_to_next_acquirer(
    logged_db_params: ConnParams,
) -> None:
    """Regression guard for the dirty-pool-checkout bug (Phase 26).

    Before Phase 26:
        Request A acquired a connection, INSERTed without committing and
        released it, leaving the server transaction open. Request B then
        acquired the SAME connection (``max_size=1`` forces reuse) and
        its first SELECT saw A's uncommitted row because it was running
        inside A's transaction. If B committed, A's writes became
        permanent; if B failed before committing, A's writes were
        silently rolled back.

    The original author notes this has the same shape as psycopg2's
    pre-2.5 dirty-pool bug.

    Since Phase 26, ``pool.release()`` rolls back any open transaction
    before the connection is put back on ``_idle``, so A's uncommitted
    writes are gone before B ever sees the connection.
    """
    table = "p26_dirty_pool"
    # min_size == max_size == 1 forces requests A and B onto one connection.
    pool = _make_pool(logged_db_params, min_size=1, max_size=1)
    with contextlib.closing(pool):
        # Setup: start from a fresh table and commit so the CREATE lands.
        with pool.connection() as setup:
            cursor = setup.cursor()
            with contextlib.suppress(Exception):
                cursor.execute(f"DROP TABLE {table}")
            cursor.execute(f"CREATE TABLE {table} (id INT, label VARCHAR(64))")
            setup.commit()

        # Request A: write a row, never commit, hand the connection back.
        a_conn = pool.acquire()
        try:
            cursor = a_conn.cursor()
            cursor.execute(
                f"INSERT INTO {table} VALUES (?, ?)", (1, "A's dirty write")
            )
            # Inside its own transaction A must be able to see the row.
            cursor.execute(f"SELECT COUNT(*) FROM {table}")
            assert cursor.fetchone() == (1,), "A should see its own write pre-release"
            assert a_conn._in_transaction, "A's connection should be in_transaction"
        finally:
            # Deliberately no commit: this is the case under test.
            pool.release(a_conn)

        # Request B: gets the very same connection back. ``_in_transaction``
        # is not checked after acquire because, per the original author,
        # the pool's ``_is_alive`` probe runs a SELECT that opens a fresh
        # transaction under autocommit=False. The row count is the ground
        # truth: without the Phase 26 rollback B would be running inside
        # A's leftover transaction and would see A's row.
        b_conn = pool.acquire()
        try:
            assert b_conn is a_conn, "max_size=1 must yield the same connection"
            cursor = b_conn.cursor()
            cursor.execute(f"SELECT COUNT(*) FROM {table}")
            (count,) = cursor.fetchone()
            assert count == 0, (
                f"B sees {count} rows — A's uncommitted writes leaked across "
                "the pool checkout boundary. Phase 26 fix did not apply."
            )
        finally:
            pool.release(b_conn)

        # Best-effort removal of the scratch table.
        with pool.connection() as cleanup:
            cursor = cleanup.cursor()
            with contextlib.suppress(Exception):
                cursor.execute(f"DROP TABLE {table}")
            cleanup.commit()
|
|
|
|
|
|
def test_committed_writes_survive_pool_checkout(
    logged_db_params: ConnParams,
) -> None:
    """Companion to the dirty-checkout test: COMMITTED rows must persist.

    Protects against over-correcting in Phase 26. If the rollback on
    release also discarded work that was already committed (for instance
    through an extra BEGIN+ROLLBACK round-trip), the fix would itself be
    a data-loss bug. The test fails if a rollback runs when it should
    not.
    """
    table = "p26_committed"
    # A single-slot pool, so every checkout below uses the same connection.
    pool = _make_pool(logged_db_params, min_size=1, max_size=1)
    with contextlib.closing(pool):
        with pool.connection() as setup:
            cursor = setup.cursor()
            with contextlib.suppress(Exception):
                cursor.execute(f"DROP TABLE {table}")
            cursor.execute(f"CREATE TABLE {table} (id INT)")
            setup.commit()

        # Request A: insert, commit, then release via the context manager.
        with pool.connection() as a_conn:
            cursor = a_conn.cursor()
            cursor.execute(f"INSERT INTO {table} VALUES (?)", (42,))
            a_conn.commit()
            assert not a_conn._in_transaction

        # Request B: the committed row has to be visible.
        with pool.connection() as b_conn:
            cursor = b_conn.cursor()
            cursor.execute(f"SELECT id FROM {table}")
            assert cursor.fetchone() == (42,), (
                "Committed write disappeared — Phase 26 rollback ran when "
                "it shouldn't have."
            )

        # Best-effort removal of the scratch table.
        with pool.connection() as cleanup:
            cursor = cleanup.cursor()
            with contextlib.suppress(Exception):
                cursor.execute(f"DROP TABLE {table}")
            cleanup.commit()
|
|
|
|
|
|
# -------- Phase 27: wire-lock thread-safety + async cancellation eviction --------
|
|
|
|
|
|
def test_concurrent_threads_on_one_connection_dont_interleave_pdus(
    conn_params: ConnParams,
) -> None:
    """Wire-lock regression test (Phase 27).

    PEP 249 Threadsafety=1 says threads should not share a connection,
    yet the async layer effectively does so when a cancelled task's
    worker keeps running. This test checks that the wire lock
    serializes access: two threads issuing SELECTs concurrently on one
    Connection must each get correct rows, not corrupted wire state.

    Without the lock the two threads' PDU bytes would interleave on the
    socket, and at least one query would return wrong data, raise
    ``ProtocolError``, or hang.
    """
    conn = informix_db.connect(
        host=conn_params.host,
        port=conn_params.port,
        user=conn_params.user,
        password=conn_params.password,
        database=conn_params.database,
        server=conn_params.server,
        autocommit=True,
    )
    with contextlib.closing(conn):
        values: list[int] = []
        failures: list[Exception] = []
        shared_guard = threading.Lock()

        def worker(query_id: int) -> None:
            try:
                for _ in range(20):
                    cursor = conn.cursor()
                    cursor.execute(
                        "SELECT FIRST 1 tabid FROM systables WHERE tabid = ?",
                        (query_id,),
                    )
                    (fetched,) = cursor.fetchone()
                    cursor.close()
                    with shared_guard:
                        values.append(fetched)
            except Exception as exc:
                # Captured here so the main thread can report it.
                with shared_guard:
                    failures.append(exc)

        # Two threads, each running 20 queries with its own expected value.
        workers = [
            threading.Thread(target=worker, args=(query_id,))
            for query_id in (1, 2)
        ]
        for thread in workers:
            thread.start()
        for thread in workers:
            thread.join(timeout=30.0)
        for number, thread in enumerate(workers, start=1):
            assert not thread.is_alive(), f"thread {number} hung — wire lock failed"

        assert failures == [], (
            f"Threads errored out — likely PDU interleaving: {failures!r}"
        )

        # 20 queries per worker gives 40 values; each value must equal the
        # query_id used by the thread that fetched it.
        assert values.count(1) == 20
        assert values.count(2) == 20
|
|
|
|
|
|
async def test_async_wait_for_cancellation_evicts_connection(
    conn_params: ConnParams,
) -> None:
    """Async-cancellation regression test (Phase 27).

    Prior to Phase 27 a cancelled awaitable left its connection on the
    pool's idle list while a worker might still be writing to the
    socket. The intended behaviour now is that cancellation is treated
    as ``broken=True`` and the pool evicts the connection instead of
    recycling it.
    """
    pool = await aio.create_pool(
        host=conn_params.host,
        port=conn_params.port,
        user=conn_params.user,
        password=conn_params.password,
        database=conn_params.database,
        server=conn_params.server,
        min_size=0,
        max_size=2,
    )

    async def slow_query() -> None:
        """Run a deliberately heavy query that the 1 ms timeout interrupts."""
        async with pool.connection() as conn:
            cursor = await conn.cursor()
            # Intended to take >100ms on the dev image: systables joined
            # with itself a few times.
            await cursor.execute(
                "SELECT COUNT(*) FROM systables a, systables b, "
                "systables c WHERE a.tabid > 0"
            )
            await cursor.fetchone()
            await cursor.close()

    try:
        # Grow the pool to one connection so there is something to evict.
        async with pool.connection() as warmup_conn:
            warmup_cursor = await warmup_conn.cursor()
            await warmup_cursor.execute("SELECT 1 FROM systables WHERE tabid = 1")
            await warmup_cursor.fetchone()
            await warmup_cursor.close()
        size_before = pool.size
        assert size_before == 1, f"expected 1 connection, got {size_before}"

        # pytest.raises (not contextlib.suppress) makes the test fail when
        # the timeout never fires; otherwise a fast runner could finish the
        # query in time and silently skip the cancellation path.
        with pytest.raises(asyncio.TimeoutError):
            await asyncio.wait_for(slow_query(), timeout=0.001)

        # Give the release triggered by the cancellation time to finish.
        await asyncio.sleep(0.5)

        # NOTE(review): with size_before == 1 this ``<=`` check also passes
        # when the connection is recycled rather than evicted (size stays
        # at 1). Consider a strict ``<`` once it is confirmed that the pool
        # always evicts here, including when the cancel lands mid-acquire.
        assert pool.size <= size_before, (
            f"Connection wasn't evicted on cancellation; pool.size={pool.size} "
            f"(expected ≤ {size_before}). The cancelled connection rejoined "
            "the idle list — Phase 27 fix did not apply."
        )
    finally:
        await pool.close()
|
|
|
|
|
|
# -------- Phase 29: deferred-cleanup queue --------
|
|
|
|
|
|
def test_enqueue_cleanup_drains_on_next_send_pdu(
    conn_params: ConnParams,
) -> None:
    """Phase 29 regression test: queued cleanup PDUs must get drained.

    Models the case where a cursor finalizer could not take the wire
    lock and queued its CLOSE+RELEASE PDUs instead. The design intent is
    that the next ``_send_pdu`` drains the queue *before* sending its
    own PDU, all under the wire lock, so the wire stays consistent.

    Firing a finalizer in a controlled way is awkward, so the test
    injects a PDU through ``_enqueue_cleanup`` and checks that the queue
    is empty after a drain.

    NOTE(review): despite the test name, the drain is invoked directly
    via ``_drain_pending_cleanup``; the opportunistic drain inside
    ``_send_pdu`` is not exercised here.
    """
    from informix_db._messages import MessageType

    conn = informix_db.connect(
        host=conn_params.host,
        port=conn_params.port,
        user=conn_params.user,
        password=conn_params.password,
        database=conn_params.database,
        server=conn_params.server,
        autocommit=True,
    )
    try:
        # A fresh connection starts with nothing queued.
        assert conn._pending_cleanup == []

        # To stay independent of server error semantics the test queues a
        # bare SQ_EOT tag and asserts only on the queue bookkeeping, not
        # on whatever the server answers.
        queued = [struct.pack("!h", MessageType.SQ_EOT)]
        with conn._wire_lock:
            conn._enqueue_cleanup(queued)
            assert conn._pending_cleanup == queued

            # Drain directly while holding the wire lock. The drain may
            # raise (or force-close the connection) on an unexpected
            # server response; the queue must be emptied regardless.
            with contextlib.suppress(Exception):
                conn._drain_pending_cleanup()

            assert conn._pending_cleanup == [], (
                "Phase 29 fix did not apply: pending cleanup PDU was "
                "not consumed by _drain_pending_cleanup."
            )
    finally:
        # The connection may already be force-closed; ignore close errors.
        with contextlib.suppress(Exception):
            conn.close()
|
|
|
|
|
|
def test_pending_cleanup_thread_safe_enqueue(
    conn_params: ConnParams,
) -> None:
    """Concurrent ``_enqueue_cleanup`` calls must not lose list entries.

    Eight threads each enqueue fifty single-PDU batches; afterwards the
    pending list must hold exactly 400 entries. The stated purpose is to
    confirm that ``_cleanup_lock`` serializes the list mutation under
    contention.
    """
    conn = informix_db.connect(
        host=conn_params.host,
        port=conn_params.port,
        user=conn_params.user,
        password=conn_params.password,
        database=conn_params.database,
        server=conn_params.server,
        autocommit=True,
    )
    try:
        thread_count = 8
        enqueues_each = 50
        # Labelled SQ_EOT by the original author; never sent, because the
        # queue is reset before close().
        marker = b"\x00\x0c"

        def enqueue_many() -> None:
            for _ in range(enqueues_each):
                conn._enqueue_cleanup([marker])

        workers = [
            threading.Thread(target=enqueue_many) for _ in range(thread_count)
        ]
        for thread in workers:
            thread.start()
        for thread in workers:
            thread.join(timeout=10.0)
            assert not thread.is_alive()

        # Every single enqueue has to be present: no race-loss.
        expected = thread_count * enqueues_each
        actual = len(conn._pending_cleanup)
        assert actual == expected, (
            f"Concurrent enqueue lost entries: got "
            f"{actual}, expected {expected}"
        )
    finally:
        # Reset the queue by hand so close() does not try to send the
        # injected marker PDUs.
        conn._pending_cleanup = []
        conn.close()
|