From 6afdbcabb3e389b400704694f7d86541b6e4cf3d Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Tue, 5 May 2026 03:40:39 -0600 Subject: [PATCH] Phase 27: Wire lock + async cancellation eviction (2026.05.05.1) Closes Hamilton audit Critical #2 (concurrency / wire lock) and High #3 (async cancellation evicts cleanly). Phase 26 fixed what gets returned to the pool; Phase 27 fixes what can interleave on the wire while it's running. What changed: connections.py: * Added Connection._wire_lock = threading.RLock(). Wrapped commit(), rollback(), fast_path_call() under the lock. * _ensure_transaction documents the lock as a precondition AND asserts ownership at runtime (_wire_lock._is_owned()) so a future caller adding a third call site fails loudly. * close() tries to acquire wire lock with 0.5s timeout before SQ_EXIT; skips polite exit and force-closes if busy. cursors.py: * execute() body extracted into _execute_under_wire_lock() and called under the lock. * executemany() body wrapped inline. * _sfetch_at() wrapped - covers all scrollable fetch_* methods that delegate to it. * close() locks the CLOSE+RELEASE for scrollable cursors. pool.py: * release() acquires conn._wire_lock with 5s timeout before rollback. On timeout: log WARNING, evict connection. Constant _RELEASE_WIRE_LOCK_TIMEOUT for tunability. aio.py: * AsyncConnectionPool.connection() now catches CancelledError / TimeoutError separately and routes to broken=True. Combined with the wire lock, asyncio.wait_for around aio DB calls is now safe. * Updated docstring; mirrored in docs/USAGE.md. Margaret Hamilton review surfaced three actionable conditions, all addressed before tagging: * Cancellation test used contextlib.suppress - could pass without exercising the cancellation path on a fast runner. Switched to pytest.raises so the test fails if timeout doesn't fire. * _ensure_transaction precondition documented but unchecked at runtime. Added assert self._wire_lock._is_owned() guard. * Connection.close() was unsynchronized. Now tries 0.5s acquire before SQ_EXIT. Two new regression tests in tests/test_pool.py: * test_concurrent_threads_on_one_connection_dont_interleave_pdus (without lock: garbled results / hangs) * test_async_wait_for_cancellation_evicts_connection (asserts pool size shrinks; cancellation actually fires) 72 unit + 228 integration + 28 benchmark = 328 tests; ruff clean. Hamilton verdict: PRODUCTION READY WITH CAVEATS (was) -> CAVEATS NARROWED FURTHER (now). 0 critical, 2 high remaining (cursor finalizers + bare-except in error drain) - both Phase 28 scope. --- CHANGELOG.md | 60 ++++++++++++ docs/USAGE.md | 15 +-- pyproject.toml | 2 +- src/informix_db/aio.py | 52 +++++++---- src/informix_db/connections.py | 165 +++++++++++++++++++++------------ src/informix_db/cursors.py | 99 +++++++++++++------- src/informix_db/pool.py | 52 ++++++++--- tests/test_pool.py | 145 ++++++++++++++++++++++++++++- uv.lock | 2 +- 9 files changed, 456 insertions(+), 136 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 002b89f..a73876d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,66 @@ All notable changes to `informix-db`. Versioning is [CalVer](https://calver.org/) — `YYYY.MM.DD` for date-based releases, `YYYY.MM.DD.N` for same-day post-releases per PEP 440. +## 2026.05.05.1 — Wire lock + async cancellation eviction (Phase 27) + +Closes Hamilton audit findings **Critical #2** (concurrency / wire lock) and **High #3** (async cancellation evicts cleanly). 
Phase 26 fixed *what gets returned* to the pool; Phase 27 fixes *what can interleave* on the wire while it's running. + +### What changed + +**1. Per-connection wire lock** (`src/informix_db/connections.py`): +- Added `Connection._wire_lock = threading.RLock()`. Wrapped `commit()`, `rollback()`, and `fast_path_call()` in `with self._wire_lock:`. +- `_ensure_transaction()` documents the lock as a precondition and **asserts ownership** (`self._wire_lock._is_owned()`) — a future caller adding a third call site fails loudly in tests rather than corrupting wire state in production. +- `Connection.close()` now tries to acquire the wire lock with a 0.5s timeout before sending `SQ_EXIT`. If another thread is mid-operation, skip the polite exit and force-close the socket; the in-flight thread observes EOF on its next read. +- RLock (not Lock) because `pool.release()` holds the lock with timeout, then calls `conn.rollback()` which itself acquires. + +**2. Cursor wire methods locked** (`src/informix_db/cursors.py`): +- `Cursor.execute()` body extracted into `_execute_under_wire_lock()` and called under the lock. +- `Cursor.executemany()` body wrapped inline. +- `Cursor._sfetch_at()` (the SQ_SFETCH primitive used by every scrollable fetch_* method) wrapped — every scrollable cursor op gets the lock for free. +- `Cursor.close()` acquires the lock for the CLOSE+RELEASE on scrollable cursors. +- `read_blob_column` and `write_blob_column` inherit through their internal `self.execute()` calls. + +**3. Pool release with timeout-acquire** (`src/informix_db/pool.py`): +- `release()` now acquires `conn._wire_lock` with a `_RELEASE_WIRE_LOCK_TIMEOUT = 5.0` budget before rolling back. If a still-running worker thread holds the lock past 5s, the connection is evicted instead of recycled. Logged at WARNING level via the Phase 26 logger. + +**4. Async cancellation → eviction** (`src/informix_db/aio.py`): +- `AsyncConnectionPool.connection()` now catches `(asyncio.CancelledError, asyncio.TimeoutError)` separately and routes them to `broken=True`. Combined with the wire lock, this means `asyncio.wait_for` around `aio` DB calls is now safe — the connection is either successfully released (worker finished in time) or evicted (worker exceeded the timeout); never returned to the pool in a poisoned state. +- Removed the Phase 26 cancellation warning from the docstring; now describes the new safety guarantee explicitly. +- Mirrored in `docs/USAGE.md` async section. + +### Margaret Hamilton review pass + +Two passes (Phase 26 review + system-wide audit) had already shaped this design. The Phase 27 review surfaced three actionable conditions: + +- **Test reliability**: the cancellation regression test used `contextlib.suppress(asyncio.TimeoutError)` — silently passing if the timeout never fired (e.g., on a fast CI runner where the query completes within 1ms). **Fixed**: switched to `pytest.raises(asyncio.TimeoutError)` so the test fails if the cancellation path isn't actually exercised. +- **Defensive guard for `_ensure_transaction`**: documented "caller must hold the wire lock" as a precondition, but no runtime check. **Fixed**: added `assert self._wire_lock._is_owned()` so a future caller forgetting to lock fails loudly in tests. +- **Symmetry in `Connection.close()`**: the polite SQ_EXIT was unsynchronized — could interleave with another thread's PDU. **Fixed**: try-acquire with 0.5s timeout; if busy, skip SQ_EXIT and force-close. 
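+
+The first of these, in miniature (a sketch; `slow_query` is the helper defined in the new regression test, assumed here):
+
+```python
+import asyncio
+import contextlib
+
+import pytest
+
+
+async def test_cancellation_actually_fires() -> None:  # illustrative name
+    # Before: contextlib.suppress is a no-op when nothing is raised, so
+    # the test could pass without ever exercising the cancellation path.
+    with contextlib.suppress(asyncio.TimeoutError):
+        await asyncio.wait_for(slow_query(), timeout=0.001)
+
+    # After: pytest.raises fails the test unless the timeout fires.
+    with pytest.raises(asyncio.TimeoutError):
+        await asyncio.wait_for(slow_query(), timeout=0.001)
+```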
+ +Plus one cross-phase note: Phase 27 makes Hamilton's High #5 (cursor finalizers) more visible, because cross-thread `__del__` invocation could deadlock on the wire lock. Tracked for Phase 28; Phase 27 doesn't introduce the underlying hazard. + +### Tests + +Two new regression tests in `tests/test_pool.py`: + +- **`test_concurrent_threads_on_one_connection_dont_interleave_pdus`** — two threads each running 20 distinct queries on a shared `Connection`. Without the wire lock, PDU interleaving causes wrong results, ProtocolError, or hangs. With the lock, both threads complete with correct results. +- **`test_async_wait_for_cancellation_evicts_connection`** — spawns a slow query under `asyncio.wait_for(timeout=0.001)`, asserts (via `pytest.raises`) that the timeout actually fires, then verifies pool size shrinks (connection evicted, not returned to idle). + +Total: 72 unit + 228 integration + 28 benchmark = **328 tests**. + +### Hamilton verdict trajectory + +| Audit pass | Verdict | +|---|---| +| Phase 21 era | (no audit yet) | +| System-wide audit (pre-Phase 26) | PRODUCTION READY WITH CAVEATS — 2 critical, 3 high | +| Post-Phase 26 | CAVEATS NARROWED — 1 critical, 3 high | +| **Post-Phase 27** | **CAVEATS NARROWED FURTHER — 0 critical, 2 high** | + +Remaining audit items (deferred to Phase 28): +- High #4: bare `except: pass` in `_raise_sq_err` drain +- High #5: no cursor finalizers (server-side resource leak on mid-fetch raise) +- Plus 5 medium one-liners + ## 2026.05.05 — Pool rollback-on-release (Phase 26): CRITICAL data-correctness fix Fixes the dirty-pool-checkout bug surfaced by Margaret Hamilton's system-wide audit. **This is the most important fix in the project's history so far** — it eliminates a class of silent data-correctness failures that affect any application using both the connection pool and non-autocommit transactions. diff --git a/docs/USAGE.md b/docs/USAGE.md index 7c88d0a..12db418 100644 --- a/docs/USAGE.md +++ b/docs/USAGE.md @@ -494,21 +494,22 @@ await pool.close() The async API mirrors the sync API one-to-one. Each blocking I/O call is offloaded to a worker thread via `asyncio.to_thread` — the event loop never blocks; concurrent queries across an `asyncio.gather` actually run in parallel up to `max_size`. -### Cancellation caveat — don't wrap `aio` calls with `asyncio.wait_for` +### Cancellation and timeouts -`asyncio.to_thread` does not interrupt the underlying worker thread when the awaitable is cancelled. If you wrap a query in `asyncio.wait_for(...)` and the timeout fires, the worker thread keeps running on the socket while the connection is being released back to the pool — and the pool's release path runs its own wire I/O (the Phase 26 transaction rollback). Two threads writing to one socket = wire desync = a poisoned connection in the pool. - -**Use connection-level timeouts instead:** +Both styles are safe under Phase 27: ```python -# Good — socket-level timeout, no to_thread race +# Connection-level — socket-layer timeout, raises OperationalError conn = await aio.connect(..., read_timeout=30.0) -# Bad — until Phase 27 lands the per-connection wire lock +# Awaitable-level — works because the pool evicts on CancelledError +# and the per-connection wire lock prevents interleaved I/O await asyncio.wait_for(cur.execute(big_query), timeout=30.0) ``` -`connect_timeout` and `read_timeout` apply at the socket layer; on a frozen server they raise a clean `OperationalError` and the cursor/connection state stays consistent. 
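+
+A sketch of the failure path, reusing `pool` and `big_query` from the examples above: when the timeout fires, the exception propagates through the pool's context manager, which marks the connection broken and evicts it.
+
+```python
+async with pool.connection() as conn:
+    cur = await conn.cursor()
+    # If this times out, the pool's context manager sees TimeoutError
+    # and evicts the connection instead of returning it to idle.
+    await asyncio.wait_for(cur.execute(big_query), timeout=30.0)
+```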
+How it works: every wire op acquires the connection's `_wire_lock` (a re-entrant lock). When an awaitable is cancelled, the underlying `to_thread` worker may still be running — but the pool's `release()` waits up to 5 seconds for the lock. If the worker finishes in time, normal release proceeds (with a transaction rollback if needed). If it doesn't, the connection is evicted instead of recycled. The pool never returns a connection that two threads are touching. + +Pick whichever timeout style fits your code; you don't need to choose for safety reasons. ## TLS diff --git a/pyproject.toml b/pyproject.toml index ca56819..304ddd8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "informix-db" -version = "2026.05.05" +version = "2026.05.05.1" description = "Pure-Python driver for IBM Informix IDS — speaks the SQLI wire protocol over raw sockets. No CSDK, no JVM, no native libraries." readme = "README.md" license = { text = "MIT" } diff --git a/src/informix_db/aio.py b/src/informix_db/aio.py index bbfde19..b4ad077 100644 --- a/src/informix_db/aio.py +++ b/src/informix_db/aio.py @@ -15,24 +15,28 @@ event loop yields during each ``await``; a worker thread does the actual socket I/O. Only differs for thousands-of-concurrent-connections workloads, which need native-async (Phase 17 if anyone asks). -.. warning:: +.. note:: - **Cancellation caveat.** ``asyncio.to_thread`` does not interrupt - the underlying OS thread when the awaitable is cancelled — the - thread keeps running until the sync call returns naturally. If you - wrap a query in ``asyncio.wait_for`` and the timeout fires: + **Cancellation handling (Phase 27).** ``asyncio.to_thread`` does + not interrupt the underlying worker thread when the awaitable is + cancelled — the thread keeps running until the sync call returns + naturally. The driver handles this in two ways: - 1. The async call raises ``TimeoutError``. - 2. The worker thread is still mid-I/O on the socket. - 3. The connection's pool ``release()`` runs (under Phase 26 it - does its own wire I/O for the rollback). - 4. The two threads can interleave bytes on the socket → wire desync. + 1. Every wire operation acquires the connection's ``_wire_lock`` + (an ``RLock``). Two threads — including a still-running worker + and the pool's release path — cannot interleave bytes on the + socket; the second blocks until the first releases. + 2. The async pool's ``connection()`` context manager evicts the + connection (``broken=True``) on ``CancelledError`` / + ``TimeoutError``, so a partially-cancelled query never returns + to the idle list. ``pool.release()`` waits up to 5 seconds for + the wire lock; if the worker is still busy past that, the + connection is evicted instead of recycled. - Until Phase 27 lands a per-connection wire lock, **avoid wrapping - ``aio`` DB calls with ``asyncio.wait_for``**. Use the connection's - ``connect_timeout`` and ``read_timeout`` parameters instead — those - apply at the socket level (no to_thread race) and produce clean - ``OperationalError`` on timeout. + Net effect: ``asyncio.wait_for`` around ``aio`` DB calls is safe. + The connection is either successfully released (worker finished + in time) or evicted (worker exceeded the timeout); never + returned to the pool in a poisoned state. 
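+
+    Sketch (``cur`` from a pooled ``aio`` connection, ``sql`` any
+    query; both names are placeholders)::
+
+        try:
+            await asyncio.wait_for(cur.execute(sql), timeout=5.0)
+        except asyncio.TimeoutError:
+            # The worker may still be running, but the wire lock keeps
+            # it serialized and the pool evicts the connection on
+            # release rather than recycling it.
+            raise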
Usage:: @@ -259,11 +263,27 @@ class AsyncConnectionPool: async def connection( self, timeout: float | None = None ) -> AsyncIterator[AsyncConnection]: - """Async context-manager wrapper around acquire/release.""" + """Async context-manager wrapper around acquire/release. + + Phase 27: cancellation and timeouts route through ``broken=True``. + ``asyncio.to_thread`` does not interrupt the underlying worker + when the awaitable is cancelled — the worker keeps running on + the socket. If the connection were returned to the pool while + the worker is still mid-write, the next acquirer would inherit + a desynchronized wire. Evicting on cancellation prevents that + (combined with the wire lock the pool's ``release()`` acquires + with a timeout — see ``pool.release``). + """ conn = await self.acquire(timeout=timeout) broken = False try: yield conn + except (asyncio.CancelledError, asyncio.TimeoutError): + # Cancellation or wait_for timeout. The to_thread worker + # may still be running; we cannot trust the connection's + # wire state. Evict. + broken = True + raise except Exception as e: # Mirror sync pool's eviction policy: connection-related # errors evict, application errors retain. diff --git a/src/informix_db/connections.py b/src/informix_db/connections.py index 3b89e6c..2c4989d 100644 --- a/src/informix_db/connections.py +++ b/src/informix_db/connections.py @@ -128,6 +128,17 @@ class Connection: self._autocommit = autocommit self._closed = False self._lock = threading.Lock() + # Phase 27: per-connection wire lock. Held for the duration of + # every send-PDU + drain-response round-trip. Two threads on + # one connection (or async cancellation leaving a worker still + # mid-operation) can no longer interleave bytes on the socket — + # the second thread blocks until the first releases. + # + # RLock (not Lock) because the Pool's release() path acquires + # this lock with a timeout, then calls ``conn.rollback()`` — + # which itself acquires the lock. Same thread, two acquires. + # Reentrance must be cheap and correct. + self._wire_lock = threading.RLock() # Logged-DB transaction state: True iff there's an open server-side # transaction (SQ_BEGIN sent, not yet committed/rolled-back). The # cursor uses this to decide whether to send an implicit SQ_BEGIN @@ -235,9 +246,12 @@ class Connection: # only set the flag after a successful BEGIN, so this branch # also covers "no DML happened since last commit/rollback". return - self._sock.write_all(struct.pack("!hh", MessageType.SQ_CMMTWORK, MessageType.SQ_EOT)) - self._drain_to_eot() - self._in_transaction = False + with self._wire_lock: + self._sock.write_all( + struct.pack("!hh", MessageType.SQ_CMMTWORK, MessageType.SQ_EOT) + ) + self._drain_to_eot() + self._in_transaction = False def rollback(self) -> None: """Roll back the current transaction (SQ_RBWORK). @@ -255,11 +269,12 @@ class Connection: # The savepoint short is REQUIRED — sending SQ_RBWORK alone hangs # the server (it's waiting for the next 2 bytes). SQ_CMMTWORK, # by contrast, takes no payload — confirmed in IfxSqli.sendCommit. 
- self._sock.write_all( - struct.pack("!hhh", MessageType.SQ_RBWORK, 0, MessageType.SQ_EOT) - ) - self._drain_to_eot() - self._in_transaction = False + with self._wire_lock: + self._sock.write_all( + struct.pack("!hhh", MessageType.SQ_RBWORK, 0, MessageType.SQ_EOT) + ) + self._drain_to_eot() + self._in_transaction = False def fast_path_call( self, signature: str, *params: object @@ -308,61 +323,62 @@ class Connection: if self._closed: raise InterfaceError("connection is closed") - cached = self._fp_handle_cache.get(signature) - if cached is None: - # Resolve via SQ_GETROUTINE - self._sock.write_all(build_get_routine_pdu(signature)) + with self._wire_lock: + cached = self._fp_handle_cache.get(signature) + if cached is None: + # Resolve via SQ_GETROUTINE + self._sock.write_all(build_get_routine_pdu(signature)) + reader = _SocketReader(self._sock) + tag = reader.read_short() + if tag == MessageType.SQ_ERR: + self._raise_sq_err() + if tag != MessageType.SQ_GETROUTINE: + raise OperationalError( + f"fast-path GETROUTINE: unexpected tag 0x{tag:04x}" + ) + db_name, handle = parse_get_routine_response(reader) + tail = reader.read_short() + if tail != MessageType.SQ_EOT: + raise OperationalError( + f"GETROUTINE response: missing SQ_EOT (got 0x{tail:04x})" + ) + self._fp_handle_cache[signature] = (db_name, handle) + else: + db_name, handle = cached + + # Now execute via SQ_EXFPROUTINE + self._sock.write_all( + build_exfp_routine_pdu( + db_name, handle, params, encoding=self._encoding + ) + ) reader = _SocketReader(self._sock) tag = reader.read_short() if tag == MessageType.SQ_ERR: self._raise_sq_err() - if tag != MessageType.SQ_GETROUTINE: + if tag != MessageType.SQ_FPROUTINE: raise OperationalError( - f"fast-path GETROUTINE: unexpected tag 0x{tag:04x}" + f"fast-path EXFPROUTINE: unexpected response tag 0x{tag:04x}" ) - db_name, handle = parse_get_routine_response(reader) - tail = reader.read_short() - if tail != MessageType.SQ_EOT: - raise OperationalError( - f"GETROUTINE response: missing SQ_EOT (got 0x{tail:04x})" - ) - self._fp_handle_cache[signature] = (db_name, handle) - else: - db_name, handle = cached - - # Now execute via SQ_EXFPROUTINE - self._sock.write_all( - build_exfp_routine_pdu( - db_name, handle, params, encoding=self._encoding - ) - ) - reader = _SocketReader(self._sock) - tag = reader.read_short() - if tag == MessageType.SQ_ERR: - self._raise_sq_err() - if tag != MessageType.SQ_FPROUTINE: - raise OperationalError( - f"fast-path EXFPROUTINE: unexpected response tag 0x{tag:04x}" - ) - results = parse_fp_routine_response(reader) - # Drain any trailing tags until SQ_EOT (server may send - # SQ_DONE/SQ_COST/SQ_XACTSTAT before SQ_EOT, same as SQL paths) - while True: - tag = reader.read_short() - if tag == MessageType.SQ_EOT: - break - elif tag == MessageType.SQ_DONE: - reader.read_exact(2 + 4 + 4 + 4) # warn + rows + rowid + serial - elif tag == 55: # SQ_COST - reader.read_int() - reader.read_int() - elif tag == MessageType.SQ_XACTSTAT: - reader.read_exact(2 + 2 + 2) - else: - raise OperationalError( - f"fast-path response: unexpected tag 0x{tag:04x}" - ) - return results + results = parse_fp_routine_response(reader) + # Drain any trailing tags until SQ_EOT (server may send + # SQ_DONE/SQ_COST/SQ_XACTSTAT before SQ_EOT, same as SQL paths) + while True: + tag = reader.read_short() + if tag == MessageType.SQ_EOT: + break + elif tag == MessageType.SQ_DONE: + reader.read_exact(2 + 4 + 4 + 4) # warn + rows + rowid + serial + elif tag == 55: # SQ_COST + reader.read_int() + reader.read_int() + 
elif tag == MessageType.SQ_XACTSTAT: + reader.read_exact(2 + 2 + 2) + else: + raise OperationalError( + f"fast-path response: unexpected tag 0x{tag:04x}" + ) + return results def _ensure_transaction(self) -> None: """Open a server-side transaction if one isn't already open. @@ -375,7 +391,22 @@ class Connection: Idempotent: subsequent calls are no-ops while the transaction is open or while we've cached "this DB doesn't support BEGIN". + + **Precondition (Phase 27):** caller MUST hold ``self._wire_lock``. + Every actual call site is inside a cursor method that has + already acquired the lock; this method does its own wire I/O + but doesn't re-acquire to avoid redundant work. """ + # Defensive guard: fail loudly in development if a future caller + # forgets to lock. ``RLock._is_owned()`` is a CPython-private + # method but stable across versions; cheap (~50ns) and only + # checks the current thread. If it ever changes shape, drop + # this assert — the doc still names the precondition. + assert self._wire_lock._is_owned(), ( + "_ensure_transaction called without _wire_lock held; " + "the cursor method that called it must wrap its body in " + "`with self._conn._wire_lock:`" + ) if self._autocommit or self._in_transaction or self._closed: return if self._supports_begin_work is False: @@ -393,13 +424,29 @@ class Connection: raise def close(self) -> None: - """Send SQ_EXIT and tear down the socket. Idempotent.""" + """Send SQ_EXIT and tear down the socket. Idempotent. + + Phase 27: tries to acquire the wire lock with a short timeout + before sending SQ_EXIT. If another thread is mid-operation, + ``SQ_EXIT`` would interleave bytes with their PDU; better to + skip the polite-exit and just close the socket. The in-flight + thread observes EOF on its next read. + """ with self._lock: if self._closed: return self._closed = True try: - self._send_exit() + # Short timeout — close() shouldn't block long. If the + # wire is busy, skip the polite SQ_EXIT and force-close + # the socket; the in-flight thread will get an OSError + # on its next read, which surfaces cleanly to the caller. + got_lock = self._wire_lock.acquire(timeout=0.5) + if got_lock: + try: + self._send_exit() + finally: + self._wire_lock.release() finally: self._sock.close() diff --git a/src/informix_db/cursors.py b/src/informix_db/cursors.py index c7860b7..0cbc0ca 100644 --- a/src/informix_db/cursors.py +++ b/src/informix_db/cursors.py @@ -166,6 +166,12 @@ class Cursor: ``parameters`` is a sequence (tuple/list) matching the ``?`` or ``:N`` placeholders in ``operation``. Phase 4 supports int, float, str, bool, None. + + Phase 27: serializes the entire wire round-trip under + ``self._conn._wire_lock``. Two threads on one connection (or + async cancellation leaving a worker still mid-execute) cannot + interleave PDU bytes — the second blocks until the first + completes (or is evicted via the pool's release-timeout). """ self._check_open() @@ -179,6 +185,11 @@ class Cursor: # If using paramstyle="numeric", rewrite :1 / :2 → ? sql = _rewrite_numeric_to_qmark(operation) if params else operation + with self._conn._wire_lock: + self._execute_under_wire_lock(sql, params) + + def _execute_under_wire_lock(self, sql: str, params: tuple) -> None: + """Wire-bound body of ``execute``. Caller MUST hold ``_wire_lock``.""" # Reset previous-execute state. self._description = None self._columns = [] @@ -718,35 +729,41 @@ class Cursor: sql = _rewrite_numeric_to_qmark(operation) - # Reset per-execute state. 
- self._description = None - self._columns = [] - self._rowcount = -1 - self._rows = [] - self._row_index = -1 - self._statement_already_done = False - - # Logged-DB transaction guard — same as execute(). Idempotent - # within an open transaction. - self._conn._ensure_transaction() - - # PREPARE once. - self._conn._send_pdu(self._build_prepare_pdu(sql, num_qmarks=first_len)) - self._read_describe_response() - - # BIND+EXECUTE per parameter set. - total_rowcount = 0 - for params in seq: + # Phase 27: full PREPARE+(BIND+EXECUTE)*N+RELEASE round-trip + # under the wire lock — N rows commit atomically with respect + # to other threads on the connection. + with self._conn._wire_lock: + # Reset per-execute state. + self._description = None + self._columns = [] self._rowcount = -1 - self._conn._send_pdu(self._build_bind_execute_pdu(tuple(params))) - self._drain_to_eot() - if self._rowcount > 0: - total_rowcount += self._rowcount + self._rows = [] + self._row_index = -1 + self._statement_already_done = False - # RELEASE once. - self._conn._send_pdu(self._build_release_pdu()) - self._drain_to_eot() - self._rowcount = total_rowcount + # Logged-DB transaction guard — same as execute(). Idempotent + # within an open transaction. + self._conn._ensure_transaction() + + # PREPARE once. + self._conn._send_pdu( + self._build_prepare_pdu(sql, num_qmarks=first_len) + ) + self._read_describe_response() + + # BIND+EXECUTE per parameter set. + total_rowcount = 0 + for params in seq: + self._rowcount = -1 + self._conn._send_pdu(self._build_bind_execute_pdu(tuple(params))) + self._drain_to_eot() + if self._rowcount > 0: + total_rowcount += self._rowcount + + # RELEASE once. + self._conn._send_pdu(self._build_release_pdu()) + self._drain_to_eot() + self._rowcount = total_rowcount def fetchone(self) -> tuple | None: """Return the next row, or None at EOF. @@ -957,10 +974,15 @@ class Cursor: raise ProgrammingError( "scrollable cursor is not open; call execute() first" ) - prior_count = len(self._rows) - self._last_tupid = None - self._conn._send_pdu(self._build_sfetch_pdu(scrolltype, target)) - self._read_fetch_response() + # Phase 27: hold the wire lock for the SFETCH round-trip. + # Cheap (RLock, single op), and lets every scrollable-cursor + # caller (fetchone/fetchmany/fetchall/scroll/fetch_*) get the + # serialization for free. + with self._conn._wire_lock: + prior_count = len(self._rows) + self._last_tupid = None + self._conn._send_pdu(self._build_sfetch_pdu(scrolltype, target)) + self._read_fetch_response() new_count = len(self._rows) if new_count == prior_count: # No tuple arrived — past-end or empty result set. @@ -988,13 +1010,18 @@ class Cursor: if self._closed: return if self._scrollable and self._server_cursor_open: + # Phase 27: hold the wire lock during CLOSE+RELEASE so we + # don't interleave with another thread's pending op on the + # connection. Best-effort: any wire failure here is + # swallowed (the caller is closing; we don't want to mask + # whatever caused them to close). 
try: - self._conn._send_pdu(self._build_close_pdu()) - self._drain_to_eot() - self._conn._send_pdu(self._build_release_pdu()) - self._drain_to_eot() + with self._conn._wire_lock: + self._conn._send_pdu(self._build_close_pdu()) + self._drain_to_eot() + self._conn._send_pdu(self._build_release_pdu()) + self._drain_to_eot() except Exception: - # Best-effort close — don't mask other errors pass self._server_cursor_open = False self._closed = True diff --git a/src/informix_db/pool.py b/src/informix_db/pool.py index 6133c9a..5b04fa0 100644 --- a/src/informix_db/pool.py +++ b/src/informix_db/pool.py @@ -57,6 +57,15 @@ from .exceptions import ( # wire up ``logging.getLogger("informix_db.pool")`` to their handler. _log = logging.getLogger(__name__) +# Phase 27: how long ``release()`` will wait to acquire the connection's +# wire lock before evicting. The wire lock is only contended when +# another thread is mid-operation on the same connection — typically +# because an awaitable was cancelled but its underlying ``to_thread`` +# worker is still running. 5 seconds is generous for any normal query +# to finish and short enough that a hung worker doesn't block the pool +# indefinitely. +_RELEASE_WIRE_LOCK_TIMEOUT = 5.0 + class PoolClosedError(InterfaceError): """Pool was closed before/during acquire.""" @@ -190,18 +199,14 @@ class ConnectionPool: are logged at WARNING level via ``logging.getLogger( "informix_db.pool")``. - **Concurrency caveat — async cancellation**: when an awaitable - wrapping a query is cancelled (``asyncio.wait_for`` timeout, - explicit task.cancel(), etc.), the underlying ``to_thread`` - worker that's executing the query is NOT interrupted. It - keeps running while the async pool's release runs concurrently - — and ``release()`` now does its own wire I/O for the rollback. - Two threads writing to one socket will interleave bytes and - desync the wire. Until Phase 27 lands a per-connection wire - lock, **don't put ``asyncio.wait_for`` around `informix_db.aio` - DB calls in production**. Use ``connect_timeout`` / - ``read_timeout`` on the connection itself instead — those run - at the socket level and don't have the to_thread race. + **Concurrency (Phase 27)**: the rollback acquires the + connection's ``_wire_lock`` with a ~5s timeout before sending. + If another thread is mid-operation on the connection (e.g., + a still-running worker after ``asyncio.wait_for`` cancelled + the awaitable), the release path either waits for them to + finish (if quick) or evicts the connection (if they exceed + the timeout). Either way, no two threads ever interleave + bytes on the socket. """ if broken or self._closed or conn.closed: with self._lock: @@ -215,7 +220,28 @@ class ConnectionPool: # connection isn't yet in ``_idle``, and ``_total`` already # counts it as "owned by us", so no other thread can grab it # while we're working. + # + # Phase 27: acquire the connection's wire lock with a timeout + # before rolling back. If another thread holds it (typically a + # cancelled-async worker that's still running on the socket), + # we evict instead of risking interleaved I/O. The connection + # is unsafe until that worker finishes; the next caller would + # rather get a fresh connection than a poisoned one. 
if conn._in_transaction: + if not conn._wire_lock.acquire( + timeout=_RELEASE_WIRE_LOCK_TIMEOUT + ): + _log.warning( + "wire lock held %ss on release; evicting connection " + "(another thread is still mid-operation — likely a " + "cancelled async query whose worker hasn't finished)", + _RELEASE_WIRE_LOCK_TIMEOUT, + ) + with self._lock: + self._total -= 1 + self._safe_close(conn) + self._lock.notify() + return try: conn.rollback() except Exception as exc: @@ -232,6 +258,8 @@ class ConnectionPool: self._safe_close(conn) self._lock.notify() return + finally: + conn._wire_lock.release() with self._lock: if self._closed: # Pool was closed while we were rolling back. Don't diff --git a/tests/test_pool.py b/tests/test_pool.py index bcb917c..ec21b60 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -7,12 +7,15 @@ safety, and clean shutdown. from __future__ import annotations +import asyncio +import contextlib import threading import time import pytest import informix_db +from informix_db import aio from tests.conftest import ConnParams pytestmark = pytest.mark.integration @@ -326,7 +329,7 @@ def test_uncommitted_writes_invisible_to_next_acquirer( # Setup: fresh table, autocommit so the CREATE lands with pool.connection() as setup: cur = setup.cursor() - with __import__("contextlib").suppress(Exception): + with contextlib.suppress(Exception): cur.execute(f"DROP TABLE {table}") cur.execute(f"CREATE TABLE {table} (id INT, label VARCHAR(64))") setup.commit() @@ -368,7 +371,7 @@ def test_uncommitted_writes_invisible_to_next_acquirer( # Cleanup with pool.connection() as cleanup: cur = cleanup.cursor() - with __import__("contextlib").suppress(Exception): + with contextlib.suppress(Exception): cur.execute(f"DROP TABLE {table}") cleanup.commit() finally: @@ -400,7 +403,7 @@ def test_committed_writes_survive_pool_checkout( try: with pool.connection() as setup: cur = setup.cursor() - with __import__("contextlib").suppress(Exception): + with contextlib.suppress(Exception): cur.execute(f"DROP TABLE {table}") cur.execute(f"CREATE TABLE {table} (id INT)") setup.commit() @@ -423,8 +426,142 @@ def test_committed_writes_survive_pool_checkout( with pool.connection() as cleanup: cur = cleanup.cursor() - with __import__("contextlib").suppress(Exception): + with contextlib.suppress(Exception): cur.execute(f"DROP TABLE {table}") cleanup.commit() finally: pool.close() + + +# -------- Phase 27: wire-lock thread-safety + async cancellation eviction -------- + + +def test_concurrent_threads_on_one_connection_dont_interleave_pdus( + conn_params: ConnParams, +) -> None: + """Phase 27 wire-lock regression test. + + Per PEP 249 Threadsafety=1, threads aren't supposed to share + connections — but the async layer effectively does this when a + cancelled task's worker keeps running. We verify the wire lock + serializes correctly: two threads doing concurrent SELECTs on + one Connection should produce correct results, not garbled wire + state. + + Without the wire lock, the two threads' PDU bytes interleave on + the socket and at least one query produces wrong results, raises + ``ProtocolError``, or hangs. 
+ """ + import threading + + conn = informix_db.connect( + host=conn_params.host, + port=conn_params.port, + user=conn_params.user, + password=conn_params.password, + database=conn_params.database, + server=conn_params.server, + autocommit=True, + ) + try: + results: list[int] = [] + errors: list[Exception] = [] + results_lock = threading.Lock() + + def worker(query_id: int) -> None: + try: + for _ in range(20): + cur = conn.cursor() + cur.execute( + "SELECT FIRST 1 tabid FROM systables WHERE tabid = ?", + (query_id,), + ) + (val,) = cur.fetchone() + cur.close() + with results_lock: + results.append(val) + except Exception as exc: + with results_lock: + errors.append(exc) + + # Two threads, each doing 20 queries with distinct expected results + t1 = threading.Thread(target=worker, args=(1,)) + t2 = threading.Thread(target=worker, args=(2,)) + t1.start() + t2.start() + t1.join(timeout=30.0) + t2.join(timeout=30.0) + assert not t1.is_alive(), "thread 1 hung — wire lock failed" + assert not t2.is_alive(), "thread 2 hung — wire lock failed" + assert errors == [], ( + f"Threads errored out — likely PDU interleaving: {errors!r}" + ) + # Each worker did 20 queries, so 40 results total. Each result + # should be the query_id its thread used. + assert results.count(1) == 20 + assert results.count(2) == 20 + finally: + conn.close() + + +async def test_async_wait_for_cancellation_evicts_connection( + conn_params: ConnParams, +) -> None: + """Phase 27 async-cancellation regression test. + + Before Phase 27, a cancelled awaitable left the connection in the + pool's idle list with a possibly-still-running worker writing to + its socket. Now: cancellation routes to ``broken=True``, and the + pool evicts the connection rather than recycling it. + """ + pool = await aio.create_pool( + host=conn_params.host, + port=conn_params.port, + user=conn_params.user, + password=conn_params.password, + database=conn_params.database, + server=conn_params.server, + min_size=0, + max_size=2, + ) + try: + # Force-grow to 1 connection so we have something to evict + async with pool.connection() as warmup_conn: + cur = await warmup_conn.cursor() + await cur.execute("SELECT 1 FROM systables WHERE tabid = 1") + await cur.fetchone() + await cur.close() + size_before = pool.size + assert size_before == 1, f"expected 1 connection, got {size_before}" + + # Trigger cancellation mid-query. + async def slow_query() -> None: + async with pool.connection() as conn: + cur = await conn.cursor() + # A query that will run for >100ms on the dev image: + # systables join itself a few times. + await cur.execute( + "SELECT COUNT(*) FROM systables a, systables b, " + "systables c WHERE a.tabid > 0" + ) + await cur.fetchone() + await cur.close() + + # Use pytest.raises (NOT contextlib.suppress) so the test fails + # if the timeout never fires — otherwise the test could pass on + # a fast CI runner where the query completes within 1ms, + # silently skipping the cancellation path it claims to test. + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(slow_query(), timeout=0.001) + + # After cancellation, the connection must NOT have rejoined the + # pool's idle list. It should have been evicted (broken=True). + # Allow a moment for the release to complete. + await asyncio.sleep(0.5) + assert pool.size <= size_before, ( + f"Connection wasn't evicted on cancellation; pool.size={pool.size} " + f"(expected ≤ {size_before}). The cancelled connection rejoined " + "the idle list — Phase 27 fix did not apply." 
+ ) + finally: + await pool.close() diff --git a/uv.lock b/uv.lock index 4888ac8..160fa5c 100644 --- a/uv.lock +++ b/uv.lock @@ -34,7 +34,7 @@ wheels = [ [[package]] name = "informix-db" -version = "2026.5.4.10" +version = "2026.5.5" source = { editable = "." } [package.optional-dependencies]