diff --git a/docs/DECISION_LOG.md b/docs/DECISION_LOG.md
index 074f6f2..7b070ba 100644
--- a/docs/DECISION_LOG.md
+++ b/docs/DECISION_LOG.md
@@ -1075,6 +1075,65 @@ The remaining backlog (async, pooling) is **library-design** work, not protocol
 ---
 
+## 2026-05-04 — Phase 15: connection pool
+
+**Status**: active
+**Decision**: Added a thread-safe `informix_db.ConnectionPool` with min/max sizing, lazy growth, idle recycling, and per-acquire health-check. Construct via `informix_db.create_pool(...)`.
+
+```python
+import informix_db
+
+pool = informix_db.create_pool(
+    host="...", user="informix", password="...",
+    min_size=1, max_size=10, acquire_timeout=5.0,
+)
+
+with pool.connection() as conn:
+    cur = conn.cursor()
+    cur.execute("SELECT ...")
+    rows = cur.fetchall()
+# conn returned to pool
+
+pool.close()
+```
+
+### Design choices
+
+**Lazy growth from `min_size`**. The pool pre-opens `min_size` connections on construction (defaults to 0). Beyond that, new connections are minted on demand up to `max_size`. This matches what FastAPI / Flask apps want: pay nothing on startup if the workload is light, but burst up to `max_size` under load (see the wiring sketch at the end of this entry).
+
+**Health-check on acquire, not on release**. Before handing a connection to a caller, the pool sends a trivial `SELECT 1 FROM systables WHERE tabid=1` round-trip. A dead connection (server-side timeout, network drop) is silently dropped and a fresh one minted. The cost is one extra round-trip per acquire — typically <1ms on the same network — and what it buys is that callers never see a stale-connection error.
+
+The alternative (check on *release*) misses the point: the idle window between release and the next acquire is when connections actually die. Network drops, server-side idle timeouts, and OOM-kills happen *between* uses, not during them.
+
+**Eviction on `OperationalError` / `InterfaceError` only**. The `with pool.connection()` context manager evicts the connection on connection-related exceptions but *retains* it on application-level errors (e.g., `ValueError` from user code, or `IntegrityError` from a constraint violation). This avoids the "every constraint violation evicts a healthy connection" pitfall some pools have.
+
+**Releasing the lock during `connect()`**. The slow part of pool growth is the actual TCP/TLS handshake + login PDU exchange — easily 50-100ms even on the same host. Holding the pool lock during this would serialize all growth. Instead, `acquire()` increments `_total` (under the lock), releases the lock, opens the connection, then re-acquires to return it. Other threads can grow / acquire idles concurrently. A careful try/except/else around the lock release ensures that an exception during `connect()` decrements `_total` and notifies a waiter before propagating.
+
+### Why I went with `threading.Condition` instead of `asyncio.Queue` or similar
+
+The pool is sync-only. Async support is a separate phase that needs an entire `informix_db.aio` module — making the pool dual-API now would couple two unrelated concerns. The sync-pool implementation is ~250 lines; an async pool will reuse the lifecycle logic but needs different waiting primitives (there is no clean way to share them).
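+
+To make the intended deployment concrete, here is a sketch of how the pool slots into a FastAPI app. The wiring (route, `customers` table, connection settings) is illustrative, and the `?` placeholder assumes our qmark paramstyle; `create_pool` / `pool.connection()` / `close()` are the real Phase 15 API. Note the endpoint is a plain `def`: FastAPI runs those in a worker thread pool, which is exactly the multi-threaded pattern this pool is built for.
+
+```python
+from contextlib import asynccontextmanager
+
+from fastapi import FastAPI
+
+import informix_db
+
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    # One pool per process: created at startup, drained at shutdown.
+    app.state.pool = informix_db.create_pool(
+        host="db.example.com", user="informix", password="...",
+        min_size=1, max_size=10, acquire_timeout=5.0,
+    )
+    yield
+    app.state.pool.close()
+
+
+app = FastAPI(lifespan=lifespan)
+
+
+@app.get("/customers/{cust_id}")
+def get_customer(cust_id: int):  # sync def: runs in a worker thread
+    with app.state.pool.connection() as conn:
+        cur = conn.cursor()
+        # Hypothetical table; "?" assumes qmark paramstyle.
+        cur.execute("SELECT name FROM customers WHERE id = ?", (cust_id,))
+        row = cur.fetchone()
+    return {"id": cust_id, "name": row[0] if row else None}
+```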
+
+### Test coverage: 15 integration tests
+
+`tests/test_pool.py`:
+- API & lifecycle: `min_size` pre-opens, lazy growth, context-manager release, LIFO reuse
+- Exhaustion: timeout when full, per-acquire timeout override, release unblocks waiters
+- Eviction: explicit `broken=True`, auto-evict on `OperationalError`, retain on application errors
+- Health-check: dead idle connection silently replaced
+- Shutdown: `close()` drains idles, idempotent close, `with pool: ...` context manager
+- Multi-thread safety: 8 workers × 3 queries each, no leaks, no double-use
+
+### What's now in the library-design layer
+
+With the pool, the project covers the three things a typical Python web/API workload needs from a database driver:
+1. PEP 249 surface (Connection, Cursor, types) — Phases 0-12
+2. TLS transport — Phase 14
+3. Connection pool — this phase
+
+Remaining backlog is just `informix_db.aio` (async) — a more substantial refactor since it requires factoring the I/O layer behind a transport abstraction.
+
+---
+
 ## (template — copy below this line for new entries)
 
 ```
diff --git a/src/informix_db/__init__.py b/src/informix_db/__init__.py
index 3c2d3b9..7625af1 100644
--- a/src/informix_db/__init__.py
+++ b/src/informix_db/__init__.py
@@ -43,6 +43,12 @@ from .exceptions import (
     ProgrammingError,
     Warning,
 )
+from .pool import (
+    ConnectionPool,
+    PoolClosedError,
+    PoolTimeoutError,
+    create_pool,
+)
 
 # PEP 249 module-level globals
 apilevel = "2.0"
@@ -60,6 +66,7 @@ __all__ = [
     "ClobLocator",
     "CollectionValue",
     "Connection",
+    "ConnectionPool",
     "DataError",
     "DatabaseError",
     "Error",
@@ -69,12 +76,15 @@
     "IntervalYM",
     "NotSupportedError",
     "OperationalError",
+    "PoolClosedError",
+    "PoolTimeoutError",
     "ProgrammingError",
     "RowValue",
     "Warning",
     "__version__",
     "apilevel",
     "connect",
+    "create_pool",
     "paramstyle",
     "threadsafety",
 ]
diff --git a/src/informix_db/pool.py b/src/informix_db/pool.py
new file mode 100644
index 0000000..701d43e
--- /dev/null
+++ b/src/informix_db/pool.py
@@ -0,0 +1,295 @@
+"""Connection pool for ``informix-db`` (Phase 15).
+
+A simple thread-safe pool with min/max sizing, lazy growth, idle
+recycling, and per-acquire health-check. Designed for web/API
+workloads where each request acquires a connection briefly and
+returns it.
+
+Example::
+
+    import informix_db
+
+    pool = informix_db.create_pool(
+        host="...", port=9088, user="informix", password="...",
+        database="mydb",
+        min_size=1, max_size=10, acquire_timeout=5.0,
+    )
+
+    with pool.connection() as conn:
+        cur = conn.cursor()
+        cur.execute("SELECT ...")
+        rows = cur.fetchall()
+    # conn returned to pool
+
+    pool.close()  # drain on shutdown
+
+Design notes:
+
+* **Lazy growth**: starts with ``min_size`` connections (0 by default).
+  New connections are created on demand up to ``max_size``.
+* **Health-check on acquire**: each idle connection is verified before
+  being yielded by executing a quick ``SELECT 1``. Dead connections
+  are silently replaced.
+* **Eviction on errors**: if the caller's ``with`` block raises a
+  connection-related exception, the connection is closed and
+  *not* returned to the pool — preventing poison-connection bugs.
+* **Thread-safe**: a single ``threading.Condition`` guards the pool
+  state and signals waiters when a connection returns.
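+* **Manual checkout**: :meth:`ConnectionPool.acquire` and
+  :meth:`ConnectionPool.release` are public for callers that can't
+  use the context manager. A sketch of what ``connection()`` does
+  internally (``broken=True`` evicts)::
+
+      conn = pool.acquire(timeout=2.0)
+      try:
+          ...  # use conn
+      except (informix_db.OperationalError, informix_db.InterfaceError):
+          pool.release(conn, broken=True)
+          raise
+      else:
+          pool.release(conn)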
+""" + +from __future__ import annotations + +import contextlib +import threading +import time +from collections.abc import Iterator +from typing import Any + +from .connections import Connection +from .exceptions import ( + InterfaceError, + OperationalError, +) + + +class PoolClosedError(InterfaceError): + """Pool was closed before/during acquire.""" + + +class PoolTimeoutError(OperationalError): + """Acquire blocked beyond ``acquire_timeout`` waiting for a free connection.""" + + +class ConnectionPool: + """Thread-safe connection pool. Construct via :func:`create_pool`.""" + + def __init__( + self, + *, + min_size: int = 0, + max_size: int = 10, + acquire_timeout: float | None = 30.0, + connect_kwargs: dict[str, Any] | None = None, + ): + if min_size < 0 or max_size < 1 or min_size > max_size: + raise ValueError( + f"invalid pool sizing: min={min_size} max={max_size}" + ) + self._min_size = min_size + self._max_size = max_size + self._acquire_timeout = acquire_timeout + self._connect_kwargs = dict(connect_kwargs or {}) + self._lock = threading.Condition() + self._idle: list[Connection] = [] + self._total: int = 0 # idle + in-use + self._closed: bool = False + + # Eagerly open ``min_size`` connections on construction so the + # first acquire doesn't pay the connect latency. + for _ in range(min_size): + self._idle.append(self._make_connection()) + self._total += 1 + + # -- public API -------------------------------------------------------- + + @property + def min_size(self) -> int: + return self._min_size + + @property + def max_size(self) -> int: + return self._max_size + + @property + def size(self) -> int: + """Total connections (idle + checked-out) currently owned.""" + with self._lock: + return self._total + + @property + def idle_count(self) -> int: + with self._lock: + return len(self._idle) + + def acquire(self, timeout: float | None = None) -> Connection: + """Check out a connection from the pool. + + Blocks up to ``timeout`` seconds (or ``acquire_timeout`` if + unset) waiting for a free connection if the pool is at + ``max_size``. Raises :class:`PoolTimeoutError` on timeout. + """ + deadline = None + if timeout is None: + timeout = self._acquire_timeout + if timeout is not None: + deadline = time.monotonic() + timeout + + with self._lock: + while True: + if self._closed: + raise PoolClosedError("connection pool is closed") + # Reuse an idle connection if available + if self._idle: + conn = self._idle.pop() + if self._is_alive(conn): + return conn + # Dead connection — drop and try again (also + # decrements total since this slot is freed) + self._total -= 1 + self._safe_close(conn) + continue + # Grow if we have room + if self._total < self._max_size: + self._total += 1 + # Release the lock during connect (slow op) so + # other threads can also grow / acquire idles. + self._lock.release() + try: + try: + conn = self._make_connection() + except Exception: + self._lock.acquire() + self._total -= 1 + self._lock.notify() + raise + finally: + if not self._lock._is_owned(): + self._lock.acquire() + return conn + # At max — wait for a free connection + remaining = None + if deadline is not None: + remaining = deadline - time.monotonic() + if remaining <= 0: + raise PoolTimeoutError( + f"could not acquire connection within {timeout}s " + f"(pool at max_size={self._max_size})" + ) + self._lock.wait(timeout=remaining) + + def release(self, conn: Connection, *, broken: bool = False) -> None: + """Return a connection to the pool. 
+
+        Pass ``broken=True`` to evict it (e.g., after a connection-
+        related exception). Broken connections are closed and the
+        slot is freed for a new connection.
+        """
+        with self._lock:
+            if broken or self._closed or conn.closed:
+                self._total -= 1
+                self._safe_close(conn)
+                self._lock.notify()
+                return
+            self._idle.append(conn)
+            self._lock.notify()
+
+    @contextlib.contextmanager
+    def connection(
+        self, timeout: float | None = None
+    ) -> Iterator[Connection]:
+        """Context-manager wrapper around acquire/release.
+
+        On a connection-related exception (:class:`OperationalError` /
+        :class:`InterfaceError`) inside the ``with`` block, the
+        connection is evicted (``broken=True``) — a subsequent caller
+        would otherwise inherit possibly-corrupt session state. Other
+        exceptions return the connection to the pool intact.
+        """
+        conn = self.acquire(timeout=timeout)
+        broken = False
+        try:
+            yield conn
+        except (OperationalError, InterfaceError):
+            broken = True
+            raise
+        finally:
+            self.release(conn, broken=broken)
+
+    def close(self) -> None:
+        """Drain the pool — closes idle connections and rejects new acquires.
+
+        Connections currently in use are left to the user; their
+        eventual ``release`` will close them rather than recycle them.
+        """
+        with self._lock:
+            if self._closed:
+                return
+            self._closed = True
+            idles = self._idle
+            self._idle = []
+            for conn in idles:
+                self._safe_close(conn)
+                self._total -= 1
+            self._lock.notify_all()
+
+    def __enter__(self) -> ConnectionPool:
+        return self
+
+    def __exit__(self, *_exc: object) -> None:
+        self.close()
+
+    # -- internals ---------------------------------------------------------
+
+    def _make_connection(self) -> Connection:
+        """Open a fresh connection using the stashed connect kwargs."""
+        from . import connect  # local import to avoid circular
+
+        return connect(**self._connect_kwargs)
+
+    def _is_alive(self, conn: Connection) -> bool:
+        """Cheap health probe.
+
+        Sends a trivial ``SELECT 1`` cursor cycle — if it raises,
+        the connection is dead. The cursor is closed afterward.
+        """
+        if conn.closed:
+            return False
+        try:
+            cur = conn.cursor()
+            try:
+                cur.execute("SELECT 1 FROM systables WHERE tabid = 1")
+                cur.fetchone()
+            finally:
+                cur.close()
+        except Exception:
+            return False
+        return True
+
+    @staticmethod
+    def _safe_close(conn: Connection) -> None:
+        with contextlib.suppress(Exception):
+            conn.close()
+
+
+def create_pool(
+    *,
+    host: str,
+    port: int = 9088,
+    user: str,
+    password: str | None,
+    database: str | None = None,
+    server: str = "informix",
+    min_size: int = 0,
+    max_size: int = 10,
+    acquire_timeout: float | None = 30.0,
+    **connect_kwargs: Any,
+) -> ConnectionPool:
+    """Create a :class:`ConnectionPool` with the given connect parameters.
+
+    All parameters except ``min_size`` / ``max_size`` / ``acquire_timeout``
+    are forwarded verbatim to :func:`informix_db.connect` for each
+    new connection the pool opens.
+    """
+    cfg: dict[str, Any] = {
+        "host": host,
+        "port": port,
+        "user": user,
+        "password": password,
+        "database": database,
+        "server": server,
+        **connect_kwargs,
+    }
+    return ConnectionPool(
+        min_size=min_size,
+        max_size=max_size,
+        acquire_timeout=acquire_timeout,
+        connect_kwargs=cfg,
+    )
diff --git a/tests/test_pool.py b/tests/test_pool.py
new file mode 100644
index 0000000..36d9c36
--- /dev/null
+++ b/tests/test_pool.py
@@ -0,0 +1,289 @@
+"""Phase 15 integration tests — connection pool.
+
+Covers acquire/release, lazy/eager growth, timeout on exhaustion,
+broken-connection eviction, health-check on acquire, multi-thread
+safety, and clean shutdown.
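+
+These are integration tests (``pytest.mark.integration``): they
+expect a reachable Informix server, supplied via the ``conn_params``
+fixture from ``tests/conftest``.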
+""" + +from __future__ import annotations + +import threading +import time + +import pytest + +import informix_db +from tests.conftest import ConnParams + +pytestmark = pytest.mark.integration + + +def _make_pool( + params: ConnParams, *, min_size: int = 0, max_size: int = 4, **kw +) -> informix_db.ConnectionPool: + return informix_db.create_pool( + host=params.host, + port=params.port, + user=params.user, + password=params.password, + database=params.database, + server=params.server, + min_size=min_size, + max_size=max_size, + **kw, + ) + + +# -------- API + lifecycle -------- + + +def test_pool_starts_with_min_size_connections( + conn_params: ConnParams, +) -> None: + """``min_size`` connections are pre-opened on construction.""" + pool = _make_pool(conn_params, min_size=2, max_size=4) + try: + assert pool.size == 2 + assert pool.idle_count == 2 + finally: + pool.close() + + +def test_pool_grows_lazily_to_max_size(conn_params: ConnParams) -> None: + """Starts at 0, grows on demand up to ``max_size``.""" + pool = _make_pool(conn_params, min_size=0, max_size=3) + try: + assert pool.size == 0 + c1 = pool.acquire() + assert pool.size == 1 + c2 = pool.acquire() + c3 = pool.acquire() + assert pool.size == 3 + for c in (c1, c2, c3): + pool.release(c) + assert pool.idle_count == 3 + finally: + pool.close() + + +def test_pool_context_manager_releases(conn_params: ConnParams) -> None: + """``with pool.connection()`` checks out and returns automatically.""" + pool = _make_pool(conn_params, max_size=2) + try: + with pool.connection() as conn: + assert pool.idle_count == 0 + cur = conn.cursor() + cur.execute("SELECT 1 FROM systables WHERE tabid = 1") + assert cur.fetchone() == (1,) + # Released back into the pool + assert pool.idle_count == 1 + finally: + pool.close() + + +def test_pool_reuses_connections(conn_params: ConnParams) -> None: + """Sequential acquires return the SAME underlying connection (LIFO).""" + pool = _make_pool(conn_params, max_size=2) + try: + with pool.connection() as conn1: + id1 = id(conn1) + with pool.connection() as conn2: + id2 = id(conn2) + assert id1 == id2 # same Connection object reused + finally: + pool.close() + + +# -------- Exhaustion + timeout -------- + + +def test_pool_acquire_times_out_when_full(conn_params: ConnParams) -> None: + """Beyond max_size, acquire blocks then raises PoolTimeoutError.""" + pool = _make_pool(conn_params, max_size=1, acquire_timeout=0.3) + try: + c1 = pool.acquire() + start = time.monotonic() + with pytest.raises(informix_db.PoolTimeoutError, match="max_size=1"): + pool.acquire() + elapsed = time.monotonic() - start + assert 0.25 < elapsed < 1.0 # honors the timeout + pool.release(c1) + finally: + pool.close() + + +def test_pool_acquire_timeout_override(conn_params: ConnParams) -> None: + """Per-acquire ``timeout`` overrides the pool default.""" + pool = _make_pool(conn_params, max_size=1, acquire_timeout=10.0) + try: + c1 = pool.acquire() + start = time.monotonic() + with pytest.raises(informix_db.PoolTimeoutError): + pool.acquire(timeout=0.2) + assert time.monotonic() - start < 1.0 # didn't wait the 10s default + pool.release(c1) + finally: + pool.close() + + +def test_pool_release_unblocks_waiter(conn_params: ConnParams) -> None: + """When a connection is released, a blocked acquire returns.""" + pool = _make_pool(conn_params, max_size=1, acquire_timeout=2.0) + try: + c1 = pool.acquire() + + # Start a thread that will block on acquire + result: list[informix_db.Connection] = [] + + def waiter() -> None: + 
+            result.append(pool.acquire())
+
+        t = threading.Thread(target=waiter, daemon=True)
+        t.start()
+        time.sleep(0.1)  # let waiter block
+        assert not result  # still waiting
+
+        # Releasing c1 should unblock the waiter
+        pool.release(c1)
+        t.join(timeout=1.0)
+        assert len(result) == 1
+        pool.release(result[0])
+    finally:
+        pool.close()
+
+
+# -------- Broken connection eviction --------
+
+
+def test_broken_connection_evicted(conn_params: ConnParams) -> None:
+    """Releasing with broken=True closes the connection and frees the slot."""
+    pool = _make_pool(conn_params, max_size=2)
+    try:
+        c1 = pool.acquire()
+        assert pool.size == 1
+        pool.release(c1, broken=True)
+        # Slot freed, conn closed
+        assert pool.size == 0
+        assert pool.idle_count == 0
+    finally:
+        pool.close()
+
+
+def test_with_block_on_operational_error_evicts(
+    conn_params: ConnParams,
+) -> None:
+    """Exception in ``with pool.connection()`` evicts the connection."""
+    pool = _make_pool(conn_params, max_size=2)
+    try:
+        with pytest.raises(informix_db.OperationalError), pool.connection():
+            raise informix_db.OperationalError("simulated failure")
+        # Slot freed
+        assert pool.size == 0
+    finally:
+        pool.close()
+
+
+def test_with_block_on_other_error_returns_to_pool(
+    conn_params: ConnParams,
+) -> None:
+    """Non-connection-related exceptions DON'T evict (data errors stay)."""
+    pool = _make_pool(conn_params, max_size=2)
+    try:
+        with pytest.raises(ValueError), pool.connection() as _conn:
+            raise ValueError("application bug, not connection")
+        # Connection retained
+        assert pool.idle_count == 1
+    finally:
+        pool.close()
+
+
+# -------- Health check on acquire --------
+
+
+def test_dead_connection_silently_replaced(
+    conn_params: ConnParams,
+) -> None:
+    """An idle connection that died is dropped and a fresh one minted."""
+    pool = _make_pool(conn_params, max_size=2)
+    try:
+        c1 = pool.acquire()
+        pool.release(c1)
+        # Forcibly break the idle connection from the outside
+        c1.close()
+        # Next acquire should silently replace it
+        c2 = pool.acquire()
+        assert c2 is not c1  # the dead c1 was dropped, not handed back
+        # Verify it's actually usable
+        cur = c2.cursor()
+        cur.execute("SELECT 1 FROM systables WHERE tabid = 1")
+        assert cur.fetchone() == (1,)
+        pool.release(c2)
+    finally:
+        pool.close()
+
+
+# -------- Shutdown --------
+
+
+def test_pool_close_drains_idle(conn_params: ConnParams) -> None:
+    """``close()`` closes all idle connections and rejects new acquires."""
+    pool = _make_pool(conn_params, min_size=2)
+    assert pool.idle_count == 2
+    pool.close()
+    assert pool.size == 0
+    with pytest.raises(informix_db.PoolClosedError):
+        pool.acquire()
+
+
+def test_pool_close_idempotent(conn_params: ConnParams) -> None:
+    """``close()`` may be called multiple times."""
+    pool = _make_pool(conn_params, max_size=1)
+    pool.close()
+    pool.close()  # must not raise
+
+
+def test_pool_as_context_manager(conn_params: ConnParams) -> None:
+    """``with pool: ...`` closes on exit."""
+    with _make_pool(conn_params, min_size=1) as pool, pool.connection() as conn:
+        cur = conn.cursor()
+        cur.execute("SELECT 1 FROM systables WHERE tabid = 1")
+        assert cur.fetchone() == (1,)
+    # After exit
+    with pytest.raises(informix_db.PoolClosedError):
+        pool.acquire()
+
+
+# -------- Multi-threaded safety --------
+
+
+def test_pool_thread_safe_concurrent_acquires(
+    conn_params: ConnParams,
+) -> None:
+    """Multiple threads sharing a pool don't deadlock or double-use."""
+    pool = _make_pool(conn_params, max_size=4, acquire_timeout=5.0)
+    try:
+        results: list[int] = []
+        results_lock = threading.Lock()
+
+        def worker() -> None:
+            for _ in range(3):
+                with pool.connection() as conn:
+                    cur = conn.cursor()
+                    cur.execute("SELECT 1 FROM systables WHERE tabid = 1")
+                    (val,) = cur.fetchone()
+                    with results_lock:
+                        results.append(val)
+
+        threads = [threading.Thread(target=worker) for _ in range(8)]
+        for t in threads:
+            t.start()
+        for t in threads:
+            t.join(timeout=10.0)
+            assert not t.is_alive()
+        # 8 workers x 3 queries each = 24 results, all = 1
+        assert len(results) == 24
+        assert all(r == 1 for r in results)
+        # Pool didn't leak: at most max_size connections
+        assert pool.size <= 4
+    finally:
+        pool.close()