Phase 15: connection pool

Thread-safe connection pool with min/max sizing, lazy growth,
idle recycling, and per-acquire health-check.

API:
  pool = informix_db.create_pool(host=..., min_size=1, max_size=10)
  with pool.connection() as conn:
      ...
  pool.close()

Design choices:

* Lazy growth from min_size — pre-opens min_size on construction,
  grows to max_size on demand. Pay-nothing startup with burst capacity.

* Health-check on acquire, not release. Sends a trivial SELECT 1
  round-trip before yielding. Dead idle connections (server-side
  timeout, network drop) are silently replaced. The cost is ~1ms
  per acquire; what it buys is that users never see a stale-
  connection error. Check-on-release is wrong because idle time
  is when connections actually die.

* Eviction on OperationalError/InterfaceError only. The "with
  pool.connection()" context manager retains the connection on
  application-level errors (ValueError, IntegrityError, etc.).
  Avoids the "every constraint violation evicts a healthy connection"
  pitfall.

* Releases the pool lock during connect() — the slow handshake
  (50-100ms) doesn't serialize other threads' acquires.

Tests: 15 integration tests in test_pool.py covering:
* API & lifecycle (pre-open, lazy growth, context-manager, LIFO)
* Exhaustion (timeout when full, per-acquire override, unblock-on-release)
* Eviction (explicit broken, auto on OperationalError, retain on
  application errors)
* Health-check (dead idle silently replaced)
* Shutdown (close drains, idempotent, context-manager)
* Multi-thread safety (8 workers × 3 queries each, no leaks)

Total: 69 unit + 154 integration = 223 tests.

With Phase 14 (TLS) and Phase 15 (pool), the project covers the
three things a typical Python web/API workload needs from a
database driver: PEP 249 surface, TLS transport, connection pool.
Only async (informix_db.aio) remains in the backlog.
This commit is contained in:
Ryan Malloy 2026-05-04 14:50:27 -06:00
parent 345838fe2d
commit 5e26b34564
4 changed files with 653 additions and 0 deletions


@@ -1075,6 +1075,65 @@ The remaining backlog (async, pooling) is **library-design** work, not protocol
---
## 2026-05-04 — Phase 15: connection pool
**Status**: active
**Decision**: Added a thread-safe `informix_db.ConnectionPool` with min/max sizing, lazy growth, idle recycling, and per-acquire health-check. Construct via `informix_db.create_pool(...)`.
```python
import informix_db

pool = informix_db.create_pool(
    host="...", user="informix", password="...",
    min_size=1, max_size=10, acquire_timeout=5.0,
)
with pool.connection() as conn:
    cur = conn.cursor()
    cur.execute("SELECT ...")
    rows = cur.fetchall()
# conn returned to pool
pool.close()
```
### Design choices
**Lazy growth from `min_size`**. The pool pre-opens `min_size` connections on construction (defaults to 0). Beyond that, new connections are minted on-demand up to `max_size`. This matches what FastAPI / Flask apps want: pay nothing on startup if the workload is light, but burst up to `max_size` under load.
**Health-check on acquire, not on release**. Before returning a connection to a caller, the pool sends a trivial `SELECT 1 FROM systables WHERE tabid=1` round-trip. Dead connections (server-side timeout, network drop) are silently dropped and a fresh one is minted. The cost is one round-trip per acquire, typically <1ms on the same network; in exchange, users never see a stale-connection error.
The alternative (check on *release*) is wrong for the obvious reason: idle time between release and the next acquire is when connections actually die. Network drops, server-side idle timeouts, and OOM-kills happen *between* uses, not during them.
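The timing argument can be demonstrated with a toy model. `FakeConn` and `TinyPool` below are illustrative stand-ins, not the driver's API: the connection dies *after* release, so the acquire-time probe is the only check that can catch it.

```python
class FakeConn:
    """Stand-in for a driver connection (illustrative only)."""
    def __init__(self) -> None:
        self.dead = False

    def ping(self) -> None:
        # Models the SELECT 1 round-trip: raises if the server dropped us.
        if self.dead:
            raise ConnectionError("server closed the socket while idle")


class TinyPool:
    """Check-on-acquire: validate just before handing the conn out."""
    def __init__(self) -> None:
        self._idle: list[FakeConn] = []

    def release(self, conn: FakeConn) -> None:
        self._idle.append(conn)  # no check here: conn is alive *now* anyway

    def acquire(self) -> FakeConn:
        while self._idle:
            conn = self._idle.pop()
            try:
                conn.ping()  # probed at the moment it matters
                return conn
            except ConnectionError:
                continue  # died while idle: drop it, try the next
        return FakeConn()  # mint a fresh one


pool = TinyPool()
c1 = pool.acquire()
pool.release(c1)       # a release-time check would have passed here...
c1.dead = True         # ...then the server's idle-timeout strikes
c2 = pool.acquire()    # acquire-time probe drops c1, mints a live conn
assert c2 is not c1 and not c2.dead
```

A release-time check would have blessed `c1` moments before it died; the acquire-time probe is positioned after the dangerous idle window.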
**Eviction on `OperationalError` / `InterfaceError` only**. The `with pool.connection()` context manager evicts the connection on connection-related exceptions but *retains* it on application-level errors (e.g., `ValueError` from user code, or `IntegrityError` from a constraint violation). This avoids the "every constraint violation evicts a healthy connection" pitfall some pools have.
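The selective-eviction policy boils down to which exception types flip `broken`. A minimal standalone sketch, where `DemoPool` and the two exception classes are hypothetical stand-ins for the real pool and the `informix_db` exception hierarchy:

```python
import contextlib

class OperationalError(Exception):   # stand-in for informix_db.OperationalError
    pass

class IntegrityError(Exception):     # stand-in: an application-level DB error
    pass

class DemoPool:
    def __init__(self) -> None:
        self.retained = 0  # connections returned healthy
        self.evicted = 0   # connections closed and dropped

    def release(self, conn: object, *, broken: bool = False) -> None:
        if broken:
            self.evicted += 1
        else:
            self.retained += 1

    @contextlib.contextmanager
    def connection(self):
        conn = object()
        broken = False
        try:
            yield conn
        except OperationalError:
            broken = True  # connection-level failure: evict on release
            raise
        finally:
            self.release(conn, broken=broken)

pool = DemoPool()
try:
    with pool.connection():
        raise IntegrityError("duplicate key")    # application-level
except IntegrityError:
    pass
try:
    with pool.connection():
        raise OperationalError("socket reset")   # connection-level
except OperationalError:
    pass
assert (pool.retained, pool.evicted) == (1, 1)
```

The constraint violation propagates to the caller but leaves the (healthy) connection in the pool; only the socket-level failure costs a slot.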
**Releasing the lock during `connect()`**. The slow part of pool growth is the actual TCP/TLS handshake + login PDU exchange — easily 50-100ms even on the same host. Holding the pool lock during this would serialize all growth. Instead, the `acquire()` method increments `_total` (under the lock), releases the lock, opens the connection, then re-acquires to return it. Other threads can grow / acquire idles concurrently. There's a careful try/finally around the lock release because exceptions during `connect()` need to decrement `_total` and notify waiters.
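The reserve-then-connect accounting can be sketched in isolation. Everything here (`slow_connect`, `MAX_SIZE`, the module-level `total`) is a hypothetical reduction that only shows the slot bookkeeping around the lock release:

```python
import threading
import time

lock = threading.Condition()
total = 0        # slots reserved (idle + in-use), guarded by `lock`
MAX_SIZE = 4

def slow_connect():
    time.sleep(0.1)  # stands in for the TCP/TLS handshake + login PDUs
    return object()

def grow():
    """Reserve a slot under the lock, connect lock-free, undo on failure."""
    global total
    with lock:
        if total >= MAX_SIZE:
            return None
        total += 1  # reserve the slot while holding the lock
    try:
        return slow_connect()  # the slow handshake runs without the lock
    except Exception:
        with lock:
            total -= 1     # failed connect: free the reserved slot
            lock.notify()  # wake a waiter that may now grow instead
        raise

start = time.monotonic()
threads = [threading.Thread(target=grow) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.monotonic() - start

assert total == 4
assert elapsed < 0.3  # four handshakes overlapped; serialized would be >= 0.4s
```

Incrementing `total` before connecting is what makes this safe: the slot is claimed atomically, so concurrent growers can never overshoot `MAX_SIZE` even though the handshakes themselves run unlocked.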
### Why I went with `threading.Condition` instead of `asyncio.Queue` or similar
The pool is sync-only. Async support is a separate phase that needs an entire `informix_db.aio` module — making the pool dual-API now would couple two unrelated concerns. The sync-pool implementation is ~250 lines; an async-pool will reuse the lifecycle logic but needs different waiting primitives (no real way to share).
### Test coverage: 15 integration tests
`tests/test_pool.py`:
- API & lifecycle: `min_size` pre-opens, lazy growth, context-manager release, LIFO reuse
- Exhaustion: timeout when full, per-acquire timeout override, release unblocks waiters
- Eviction: explicit `broken=True`, auto-evict on `OperationalError`, retain on application errors
- Health-check: dead idle connection silently replaced
- Shutdown: `close()` drains idles, idempotent close, `with pool: ...` context manager
- Multi-thread safety: 8 workers × 3 queries each, no leaks, no double-use
### What's now in the library-design layer
With the pool, the project covers the three things a typical Python web/API workload needs from a database driver:
1. PEP 249 surface (Connection, Cursor, types) — Phases 0-12
2. TLS transport — Phase 14
3. Connection pool — this phase
Remaining backlog is just `informix_db.aio` (async) — a more substantial refactor since it requires factoring the I/O layer behind a transport abstraction.
---
## (template — copy below this line for new entries)
```


@@ -43,6 +43,12 @@ from .exceptions import (
    ProgrammingError,
    Warning,
)
from .pool import (
    ConnectionPool,
    PoolClosedError,
    PoolTimeoutError,
    create_pool,
)
# PEP 249 module-level globals
apilevel = "2.0"
@@ -60,6 +66,7 @@ __all__ = [
    "ClobLocator",
    "CollectionValue",
    "Connection",
    "ConnectionPool",
    "DataError",
    "DatabaseError",
    "Error",
@@ -69,12 +76,15 @@ __all__ = [
    "IntervalYM",
    "NotSupportedError",
    "OperationalError",
    "PoolClosedError",
    "PoolTimeoutError",
    "ProgrammingError",
    "RowValue",
    "Warning",
    "__version__",
    "apilevel",
    "connect",
    "create_pool",
    "paramstyle",
    "threadsafety",
]

src/informix_db/pool.py Normal file

@@ -0,0 +1,295 @@
"""Connection pool for ``informix-db`` (Phase 15).
A simple thread-safe pool with min/max sizing, lazy growth, idle
recycling, and per-acquire health-check. Designed for Web/API
workloads where each request acquires a connection briefly and
returns it.
Example::

    import informix_db

    pool = informix_db.create_pool(
        host="...", port=9088, user="informix", password="...",
        database="mydb",
        min_size=1, max_size=10, acquire_timeout=5.0,
    )
    with pool.connection() as conn:
        cur = conn.cursor()
        cur.execute("SELECT ...")
        rows = cur.fetchall()
    # conn returned to pool
    pool.close()  # drain on shutdown

Design notes:

* **Lazy growth**: starts with ``min_size`` connections (0 by default).
  New connections are created on-demand up to ``max_size``.
* **Health-check on acquire**: each idle connection is verified before
  being yielded by sending a quick ``SELECT 1`` cursor execution.
  Dead connections are silently replaced.
* **Eviction on errors**: if the caller's ``with`` block raises a
  connection-related exception, the connection is closed and
  *not* returned to the pool, preventing poison-connection bugs.
* **Thread-safe**: a single ``threading.Condition`` guards the pool
  state and signals waiters when a connection returns.
"""
from __future__ import annotations

import contextlib
import threading
import time
from collections.abc import Iterator
from typing import Any

from .connections import Connection
from .exceptions import (
    InterfaceError,
    OperationalError,
)
class PoolClosedError(InterfaceError):
    """Pool was closed before/during acquire."""


class PoolTimeoutError(OperationalError):
    """Acquire blocked beyond ``acquire_timeout`` waiting for a free connection."""
class ConnectionPool:
    """Thread-safe connection pool. Construct via :func:`create_pool`."""

    def __init__(
        self,
        *,
        min_size: int = 0,
        max_size: int = 10,
        acquire_timeout: float | None = 30.0,
        connect_kwargs: dict[str, Any] | None = None,
    ):
        if min_size < 0 or max_size < 1 or min_size > max_size:
            raise ValueError(
                f"invalid pool sizing: min={min_size} max={max_size}"
            )
        self._min_size = min_size
        self._max_size = max_size
        self._acquire_timeout = acquire_timeout
        self._connect_kwargs = dict(connect_kwargs or {})
        self._lock = threading.Condition()
        self._idle: list[Connection] = []
        self._total: int = 0  # idle + in-use
        self._closed: bool = False
        # Eagerly open ``min_size`` connections on construction so the
        # first acquire doesn't pay the connect latency.
        for _ in range(min_size):
            self._idle.append(self._make_connection())
            self._total += 1
    # -- public API --------------------------------------------------------

    @property
    def min_size(self) -> int:
        return self._min_size

    @property
    def max_size(self) -> int:
        return self._max_size

    @property
    def size(self) -> int:
        """Total connections (idle + checked-out) currently owned."""
        with self._lock:
            return self._total

    @property
    def idle_count(self) -> int:
        with self._lock:
            return len(self._idle)
    def acquire(self, timeout: float | None = None) -> Connection:
        """Check out a connection from the pool.

        Blocks up to ``timeout`` seconds (or ``acquire_timeout`` if
        unset) waiting for a free connection if the pool is at
        ``max_size``. Raises :class:`PoolTimeoutError` on timeout.
        """
        deadline = None
        if timeout is None:
            timeout = self._acquire_timeout
        if timeout is not None:
            deadline = time.monotonic() + timeout
        with self._lock:
            while True:
                if self._closed:
                    raise PoolClosedError("connection pool is closed")
                # Reuse an idle connection if available
                if self._idle:
                    conn = self._idle.pop()
                    if self._is_alive(conn):
                        return conn
                    # Dead connection — drop and try again (also
                    # decrements total since this slot is freed)
                    self._total -= 1
                    self._safe_close(conn)
                    continue
                # Grow if we have room
                if self._total < self._max_size:
                    self._total += 1
                    # Release the lock during connect (slow op) so
                    # other threads can also grow / acquire idles.
                    self._lock.release()
                    reacquired = False
                    try:
                        conn = self._make_connection()
                    except Exception:
                        # Failed connect: give the slot back and wake a
                        # waiter that may now be able to grow instead.
                        self._lock.acquire()
                        reacquired = True
                        self._total -= 1
                        self._lock.notify()
                        raise
                    finally:
                        if not reacquired:
                            self._lock.acquire()
                    return conn
                # At max — wait for a free connection
                remaining = None
                if deadline is not None:
                    remaining = deadline - time.monotonic()
                    if remaining <= 0:
                        raise PoolTimeoutError(
                            f"could not acquire connection within {timeout}s "
                            f"(pool at max_size={self._max_size})"
                        )
                self._lock.wait(timeout=remaining)
    def release(self, conn: Connection, *, broken: bool = False) -> None:
        """Return a connection to the pool.

        Pass ``broken=True`` to evict it (e.g., after a connection-
        related exception). Broken connections are closed and the
        slot is freed for a new connection.
        """
        with self._lock:
            if broken or self._closed or conn.closed:
                self._total -= 1
                self._safe_close(conn)
                self._lock.notify()
                return
            self._idle.append(conn)
            self._lock.notify()
    @contextlib.contextmanager
    def connection(
        self, timeout: float | None = None
    ) -> Iterator[Connection]:
        """Context-manager wrapper around acquire/release.

        If the ``with`` block raises a connection-related exception
        (:class:`OperationalError` / :class:`InterfaceError`), the
        connection is evicted (``broken=True``) for safety: a
        subsequent caller would otherwise inherit possibly-corrupt
        session state. Other exceptions return it to the pool.
        """
        conn = self.acquire(timeout=timeout)
        broken = False
        try:
            yield conn
        except (OperationalError, InterfaceError):
            broken = True
            raise
        finally:
            self.release(conn, broken=broken)
    def close(self) -> None:
        """Drain the pool — closes idle connections and rejects new acquires.

        Connections currently in use are left to the user; their
        eventual ``release`` will close them rather than recycling.
        """
        with self._lock:
            if self._closed:
                return
            self._closed = True
            idles = self._idle
            self._idle = []
            for conn in idles:
                self._safe_close(conn)
                self._total -= 1
            self._lock.notify_all()

    def __enter__(self) -> ConnectionPool:
        return self

    def __exit__(self, *_exc: object) -> None:
        self.close()
    # -- internals ---------------------------------------------------------

    def _make_connection(self) -> Connection:
        """Open a fresh connection using the stashed connect kwargs."""
        from . import connect  # local import to avoid circular

        return connect(**self._connect_kwargs)

    def _is_alive(self, conn: Connection) -> bool:
        """Cheap health probe.

        Sends a trivial ``SELECT 1`` cursor cycle; if it raises,
        the connection is dead. The cursor is closed afterward.
        """
        if conn.closed:
            return False
        try:
            cur = conn.cursor()
            try:
                cur.execute("SELECT 1 FROM systables WHERE tabid = 1")
                cur.fetchone()
            finally:
                cur.close()
        except Exception:
            return False
        return True

    @staticmethod
    def _safe_close(conn: Connection) -> None:
        with contextlib.suppress(Exception):
            conn.close()
def create_pool(
    *,
    host: str,
    port: int = 9088,
    user: str,
    password: str | None,
    database: str | None = None,
    server: str = "informix",
    min_size: int = 0,
    max_size: int = 10,
    acquire_timeout: float | None = 30.0,
    **connect_kwargs: Any,
) -> ConnectionPool:
    """Create a :class:`ConnectionPool` with the given connect parameters.

    All parameters except ``min_size`` / ``max_size`` / ``acquire_timeout``
    are forwarded verbatim to :func:`informix_db.connect` for each
    new connection the pool opens.
    """
    cfg: dict[str, Any] = {
        "host": host,
        "port": port,
        "user": user,
        "password": password,
        "database": database,
        "server": server,
        **connect_kwargs,
    }
    return ConnectionPool(
        min_size=min_size,
        max_size=max_size,
        acquire_timeout=acquire_timeout,
        connect_kwargs=cfg,
    )

tests/test_pool.py Normal file

@@ -0,0 +1,289 @@
"""Phase 15 integration tests — connection pool.

Covers acquire/release, lazy/eager growth, timeout on exhaustion,
broken-connection eviction, health-check on acquire, multi-thread
safety, and clean shutdown.
"""
from __future__ import annotations

import threading
import time

import pytest

import informix_db
from tests.conftest import ConnParams

pytestmark = pytest.mark.integration


def _make_pool(
    params: ConnParams, *, min_size: int = 0, max_size: int = 4, **kw
) -> informix_db.ConnectionPool:
    return informix_db.create_pool(
        host=params.host,
        port=params.port,
        user=params.user,
        password=params.password,
        database=params.database,
        server=params.server,
        min_size=min_size,
        max_size=max_size,
        **kw,
    )
# -------- API + lifecycle --------


def test_pool_starts_with_min_size_connections(
    conn_params: ConnParams,
) -> None:
    """``min_size`` connections are pre-opened on construction."""
    pool = _make_pool(conn_params, min_size=2, max_size=4)
    try:
        assert pool.size == 2
        assert pool.idle_count == 2
    finally:
        pool.close()


def test_pool_grows_lazily_to_max_size(conn_params: ConnParams) -> None:
    """Starts at 0, grows on demand up to ``max_size``."""
    pool = _make_pool(conn_params, min_size=0, max_size=3)
    try:
        assert pool.size == 0
        c1 = pool.acquire()
        assert pool.size == 1
        c2 = pool.acquire()
        c3 = pool.acquire()
        assert pool.size == 3
        for c in (c1, c2, c3):
            pool.release(c)
        assert pool.idle_count == 3
    finally:
        pool.close()


def test_pool_context_manager_releases(conn_params: ConnParams) -> None:
    """``with pool.connection()`` checks out and returns automatically."""
    pool = _make_pool(conn_params, max_size=2)
    try:
        with pool.connection() as conn:
            assert pool.idle_count == 0
            cur = conn.cursor()
            cur.execute("SELECT 1 FROM systables WHERE tabid = 1")
            assert cur.fetchone() == (1,)
        # Released back into the pool
        assert pool.idle_count == 1
    finally:
        pool.close()


def test_pool_reuses_connections(conn_params: ConnParams) -> None:
    """Sequential acquires return the SAME underlying connection (LIFO)."""
    pool = _make_pool(conn_params, max_size=2)
    try:
        with pool.connection() as conn1:
            id1 = id(conn1)
        with pool.connection() as conn2:
            id2 = id(conn2)
        assert id1 == id2  # same Connection object reused
    finally:
        pool.close()
# -------- Exhaustion + timeout --------


def test_pool_acquire_times_out_when_full(conn_params: ConnParams) -> None:
    """Beyond max_size, acquire blocks then raises PoolTimeoutError."""
    pool = _make_pool(conn_params, max_size=1, acquire_timeout=0.3)
    try:
        c1 = pool.acquire()
        start = time.monotonic()
        with pytest.raises(informix_db.PoolTimeoutError, match="max_size=1"):
            pool.acquire()
        elapsed = time.monotonic() - start
        assert 0.25 < elapsed < 1.0  # honors the timeout
        pool.release(c1)
    finally:
        pool.close()


def test_pool_acquire_timeout_override(conn_params: ConnParams) -> None:
    """Per-acquire ``timeout`` overrides the pool default."""
    pool = _make_pool(conn_params, max_size=1, acquire_timeout=10.0)
    try:
        c1 = pool.acquire()
        start = time.monotonic()
        with pytest.raises(informix_db.PoolTimeoutError):
            pool.acquire(timeout=0.2)
        assert time.monotonic() - start < 1.0  # didn't wait the 10s default
        pool.release(c1)
    finally:
        pool.close()


def test_pool_release_unblocks_waiter(conn_params: ConnParams) -> None:
    """When a connection is released, a blocked acquire returns."""
    pool = _make_pool(conn_params, max_size=1, acquire_timeout=2.0)
    try:
        c1 = pool.acquire()
        # Start a thread that will block on acquire
        result: list[informix_db.Connection] = []

        def waiter() -> None:
            result.append(pool.acquire())

        t = threading.Thread(target=waiter, daemon=True)
        t.start()
        time.sleep(0.1)  # let waiter block
        assert not result  # still waiting
        # Releasing c1 should unblock the waiter
        pool.release(c1)
        t.join(timeout=1.0)
        assert len(result) == 1
        pool.release(result[0])
    finally:
        pool.close()
# -------- Broken connection eviction --------


def test_broken_connection_evicted(conn_params: ConnParams) -> None:
    """Releasing with broken=True closes the connection and frees the slot."""
    pool = _make_pool(conn_params, max_size=2)
    try:
        c1 = pool.acquire()
        assert pool.size == 1
        pool.release(c1, broken=True)
        # Slot freed, conn closed
        assert pool.size == 0
        assert pool.idle_count == 0
    finally:
        pool.close()


def test_with_block_on_operational_error_evicts(
    conn_params: ConnParams,
) -> None:
    """Exception in ``with pool.connection()`` evicts the connection."""
    pool = _make_pool(conn_params, max_size=2)
    try:
        with pytest.raises(informix_db.OperationalError), pool.connection():
            raise informix_db.OperationalError("simulated failure")
        # Slot freed
        assert pool.size == 0
    finally:
        pool.close()


def test_with_block_on_other_error_returns_to_pool(
    conn_params: ConnParams,
) -> None:
    """Non-connection-related exceptions DON'T evict (data errors stay)."""
    pool = _make_pool(conn_params, max_size=2)
    try:
        with pytest.raises(ValueError), pool.connection() as _conn:
            raise ValueError("application bug, not connection")
        # Connection retained
        assert pool.idle_count == 1
    finally:
        pool.close()
# -------- Health check on acquire --------


def test_dead_connection_silently_replaced(
    conn_params: ConnParams,
) -> None:
    """An idle connection that died is dropped and a fresh one minted."""
    pool = _make_pool(conn_params, max_size=2)
    try:
        c1 = pool.acquire()
        pool.release(c1)
        # Forcibly break the idle connection from the outside
        c1.close()
        # Next acquire should silently replace it
        c2 = pool.acquire()
        assert c2 is not c1  # the dead idle was dropped, not reused
        # Verify it's actually usable
        cur = c2.cursor()
        cur.execute("SELECT 1 FROM systables WHERE tabid = 1")
        assert cur.fetchone() == (1,)
        pool.release(c2)
    finally:
        pool.close()
# -------- Shutdown --------


def test_pool_close_drains_idle(conn_params: ConnParams) -> None:
    """``close()`` closes all idle connections and rejects new acquires."""
    pool = _make_pool(conn_params, min_size=2)
    assert pool.idle_count == 2
    pool.close()
    assert pool.size == 0
    with pytest.raises(informix_db.PoolClosedError):
        pool.acquire()


def test_pool_close_idempotent(conn_params: ConnParams) -> None:
    """``close()`` may be called multiple times."""
    pool = _make_pool(conn_params, max_size=1)
    pool.close()
    pool.close()  # must not raise


def test_pool_as_context_manager(conn_params: ConnParams) -> None:
    """``with pool: ...`` closes on exit."""
    with _make_pool(conn_params, min_size=1) as pool, pool.connection() as conn:
        cur = conn.cursor()
        cur.execute("SELECT 1 FROM systables WHERE tabid = 1")
        assert cur.fetchone() == (1,)
    # After exit
    with pytest.raises(informix_db.PoolClosedError):
        pool.acquire()
# -------- Multi-threaded safety --------


def test_pool_thread_safe_concurrent_acquires(
    conn_params: ConnParams,
) -> None:
    """Multiple threads sharing a pool don't deadlock or double-use."""
    pool = _make_pool(conn_params, max_size=4, acquire_timeout=5.0)
    try:
        results: list[int] = []
        results_lock = threading.Lock()

        def worker() -> None:
            for _ in range(3):
                with pool.connection() as conn:
                    cur = conn.cursor()
                    cur.execute("SELECT 1 FROM systables WHERE tabid = 1")
                    (val,) = cur.fetchone()
                    with results_lock:
                        results.append(val)

        threads = [threading.Thread(target=worker) for _ in range(8)]
        for t in threads:
            t.start()
        for t in threads:
            t.join(timeout=10.0)
            assert not t.is_alive()
        # 8 workers x 3 queries each = 24 results, all = 1
        assert len(results) == 24
        assert all(r == 1 for r in results)
        # Pool didn't leak: at most max_size connections
        assert pool.size <= 4
    finally:
        pool.close()