"""Phase 19 integration tests — connection resilience under fault injection. Tests what happens when the network drops, the server crashes, or the socket is forcibly torn down mid-conversation. Each test uses one of two fault-injection mechanisms: 1. **Socket close from client side** — ``conn._sock._sock.close()`` simulates the OS forcibly closing the local end (e.g., kernel socket-buffer overflow, signal handler). 2. **Controlled TCP proxy** (:class:`ControlledProxy` in ``_proxy.py``) sits between the client and Informix; ``proxy.kill()`` severs the connection with TCP RST, mimicking a router drop or server crash. Both produce the same client-observable failure: the next I/O operation raises ``OperationalError``. Verifying these paths catches several classes of bugs: - Hangs (waiting forever on a dead socket) - Silent data corruption (treating EOF as a valid tuple) - Double-fault (raising one error, then a different error on cleanup) - Pool poisoning (returning a broken connection to the pool) """ from __future__ import annotations import asyncio import contextlib import time import pytest import informix_db from tests._proxy import ControlledProxy from tests.conftest import ConnParams pytestmark = pytest.mark.integration def _connect_via_proxy( proxy: ControlledProxy, params: ConnParams, **overrides ) -> informix_db.Connection: kwargs = { "host": "127.0.0.1", "port": proxy.port, "user": params.user, "password": params.password, "database": params.database, "server": params.server, "connect_timeout": 5.0, "read_timeout": 5.0, } kwargs.update(overrides) return informix_db.connect(**kwargs) def _connect_direct(params: ConnParams, **overrides) -> informix_db.Connection: kwargs = { "host": params.host, "port": params.port, "user": params.user, "password": params.password, "database": params.database, "server": params.server, "connect_timeout": 5.0, "read_timeout": 5.0, } kwargs.update(overrides) return informix_db.connect(**kwargs) # -------- Network-drop scenarios via ControlledProxy -------- def test_network_drop_mid_select_raises_operational_error( conn_params: ConnParams, ) -> None: """Killing the proxy mid-query yields a clean ``OperationalError``.""" proxy = ControlledProxy(conn_params.host, conn_params.port) proxy.start() try: conn = _connect_via_proxy(proxy, conn_params) cur = conn.cursor() # Drop the connection BEFORE issuing the query proxy.kill() # Next I/O must raise (not hang, not silently produce empty # result set, not corrupt state) with pytest.raises(informix_db.OperationalError): cur.execute("SELECT FIRST 1 tabname FROM systables") finally: proxy.close() def test_network_drop_after_describe_before_fetch( conn_params: ConnParams, ) -> None: """Drop AFTER describe phase but before NFETCH — execute should raise.""" proxy = ControlledProxy(conn_params.host, conn_params.port) proxy.start() try: conn = _connect_via_proxy(proxy, conn_params) cur = conn.cursor() # Establish the connection works first cur.execute("SELECT 1 FROM systables WHERE tabid = 1") assert cur.fetchone() == (1,) # Now sever and verify the next query fails proxy.kill() with pytest.raises(informix_db.OperationalError): cur.execute("SELECT 2 FROM systables WHERE tabid = 1") finally: proxy.close() def test_network_drop_during_fetch_iteration( conn_params: ConnParams, ) -> None: """Drop between fetches inside an open cursor. For non-scrollable cursors (default), all rows are materialized during ``execute()`` so subsequent ``fetchone`` calls don't do I/O — they read from the local buffer. 

    The drop is detected on the *next* cursor lifecycle operation
    (close/release), but the in-memory rows are still readable. We test that
    subsequent execute raises rather than silently returning stale data.
    """
    proxy = ControlledProxy(conn_params.host, conn_params.port)
    proxy.start()
    try:
        conn = _connect_via_proxy(proxy, conn_params)
        cur = conn.cursor()
        cur.execute("SELECT FIRST 5 tabid FROM systables ORDER BY tabid")

        # Materialized; we still have the rows
        first = cur.fetchone()
        assert first is not None

        # Now sever the connection
        proxy.kill()

        # Continued reads from already-materialized buffer succeed
        more = cur.fetchall()
        assert len(more) == 4

        # But a fresh execute over the dead socket fails
        with pytest.raises(informix_db.OperationalError):
            cur.execute("SELECT 1 FROM systables WHERE tabid = 1")
    finally:
        proxy.close()


# -------- Forcible local socket close --------


def test_local_socket_close_then_query(conn_params: ConnParams) -> None:
    """Forcibly close the underlying socket; next query raises cleanly."""
    with _connect_direct(conn_params) as conn:
        # Yank the rug
        with contextlib.suppress(OSError):
            conn._sock._sock.close()

        cur = conn.cursor()
        with pytest.raises(informix_db.OperationalError):
            cur.execute("SELECT 1 FROM systables WHERE tabid = 1")


def test_io_error_marks_connection_unusable(conn_params: ConnParams) -> None:
    """After a transport failure, the connection's socket reports closed."""
    conn = _connect_direct(conn_params)
    try:
        with contextlib.suppress(OSError):
            conn._sock._sock.close()

        cur = conn.cursor()
        with contextlib.suppress(informix_db.Error):
            cur.execute("SELECT 1 FROM systables WHERE tabid = 1")

        # The IfxSocket's _force_close should have run
        assert conn._sock.closed
    finally:
        with contextlib.suppress(Exception):
            conn.close()


# -------- Pool eviction on connection failure --------


def test_pool_evicts_connection_after_proxy_kill(
    conn_params: ConnParams,
) -> None:
    """A connection that died inside a pooled ``with`` block is NOT returned."""
    proxy = ControlledProxy(conn_params.host, conn_params.port)
    proxy.start()
    try:
        pool = informix_db.create_pool(
            host="127.0.0.1",
            port=proxy.port,
            user=conn_params.user,
            password=conn_params.password,
            database=conn_params.database,
            server=conn_params.server,
            min_size=0,
            max_size=2,
        )
        try:
            # Acquire one, kill the proxy mid-use
            with (
                pytest.raises(informix_db.OperationalError),
                pool.connection() as conn,
            ):
                cur = conn.cursor()
                cur.execute("SELECT 1 FROM systables WHERE tabid = 1")
                cur.fetchone()

                # Sever; next query inside the with-block will fail
                proxy.kill()
                cur.execute("SELECT 2 FROM systables WHERE tabid = 1")

            # Pool should have evicted: zero connections owned now
            assert pool.size == 0
        finally:
            pool.close()
    finally:
        proxy.close()


def test_pool_revives_after_all_idles_died(
    conn_params: ConnParams,
) -> None:
    """If all idle connections are dead, acquire silently mints fresh ones."""
    pool = informix_db.create_pool(
        host=conn_params.host,
        port=conn_params.port,
        user=conn_params.user,
        password=conn_params.password,
        database=conn_params.database,
        server=conn_params.server,
        min_size=2,
        max_size=2,
    )
    try:
        assert pool.idle_count == 2

        # Forcibly kill both idle sockets
        for c in pool._idle:
            with contextlib.suppress(OSError):
                c._sock._sock.close()

        # The next acquire should detect dead connections via health
        # check, drop them, and mint a fresh one.
        with pool.connection() as conn:
            cur = conn.cursor()
            cur.execute("SELECT 1 FROM systables WHERE tabid = 1")
            assert cur.fetchone() == (1,)
    finally:
        pool.close()


# -------- Async cancellation --------


async def test_async_cancellation_during_execute(
    conn_params: ConnParams,
) -> None:
    """Cancelling a coroutine mid-await leaves the pool in a sane state.

    Uses ``asyncio.wait_for`` with an unrealistically short timeout so the
    worker thread is still running ``cur.execute()`` when the asyncio side
    gives up. The thread keeps going until I/O completes, but the awaiting
    coroutine sees ``TimeoutError``. The connection itself ends up in an
    ambiguous state — Phase 16's pool-eviction policy kicks in: subsequent
    users get fresh connections.
    """
    from informix_db import aio

    pool = await aio.create_pool(
        host=conn_params.host,
        port=conn_params.port,
        user=conn_params.user,
        password=conn_params.password,
        database=conn_params.database,
        server=conn_params.server,
        min_size=0,
        max_size=2,
        acquire_timeout=2.0,
    )
    try:
        # The cancellation behavior we want to verify: even if a query
        # is interrupted, the pool stays healthy and subsequent queries
        # work. We use a short timeout that may or may not fire (depends
        # on local network speed); we assert the *post-condition*, not
        # which path was taken.
        async def worker() -> int | None:
            async with pool.connection() as conn:
                cur = await conn.cursor()
                await cur.execute(
                    "SELECT FIRST 1 tabid FROM systables WHERE tabid = 1"
                )
                row = await cur.fetchone()
                return row[0] if row else None

        # Best-effort cancel attempt
        with contextlib.suppress(asyncio.TimeoutError):
            await asyncio.wait_for(worker(), timeout=0.001)

        # Pool should still be usable for fresh queries
        async with pool.connection() as conn:
            cur = await conn.cursor()
            await cur.execute("SELECT 1 FROM systables WHERE tabid = 1")
            assert (await cur.fetchone()) == (1,)
    finally:
        await pool.close()


# -------- Cursor reuse after error --------


def test_cursor_can_be_reused_after_sql_error(
    conn_params: ConnParams,
) -> None:
    """After a SQL-level error, the cursor remains usable for fresh queries."""
    with _connect_direct(conn_params) as conn:
        cur = conn.cursor()

        with pytest.raises(informix_db.ProgrammingError):
            cur.execute("SELECT * FROM no_such_table_zzz")

        # Same cursor, fresh query — must work
        cur.execute("SELECT 1 FROM systables WHERE tabid = 1")
        assert cur.fetchone() == (1,)


def test_connection_survives_cursor_close_after_error(
    conn_params: ConnParams,
) -> None:
    """Closing a cursor after an error doesn't poison the connection."""
    with _connect_direct(conn_params) as conn:
        cur = conn.cursor()
        with pytest.raises(informix_db.ProgrammingError):
            cur.execute("SELECT * FROM no_such_table_zzz")
        cur.close()

        # Brand-new cursor on the same connection
        cur2 = conn.cursor()
        cur2.execute("SELECT 1 FROM systables WHERE tabid = 1")
        assert cur2.fetchone() == (1,)


# -------- Stress / timing --------


def test_pool_sustained_load_no_leaks(conn_params: ConnParams) -> None:
    """Open + close 50 connections via the pool; ``size`` doesn't grow unboundedly.

    Catches the obvious leak class: each acquire/release minting a new
    connection without recycling. Doesn't catch slow leaks (would need
    tracemalloc for that), but is a sanity baseline.
""" pool = informix_db.create_pool( host=conn_params.host, port=conn_params.port, user=conn_params.user, password=conn_params.password, database=conn_params.database, server=conn_params.server, min_size=0, max_size=4, ) try: for _ in range(50): with pool.connection() as conn: cur = conn.cursor() cur.execute("SELECT 1 FROM systables WHERE tabid = 1") cur.fetchone() # Pool should have at most max_size connections owned assert pool.size <= 4 finally: pool.close() def test_read_timeout_fires(conn_params: ConnParams) -> None: """A connection with ``read_timeout`` set raises on a hung server. Set up via the proxy: connect, then kill the proxy *without* a TCP RST so the read silently waits. The configured ``read_timeout`` should fire and produce a clear error rather than hanging forever. """ proxy = ControlledProxy(conn_params.host, conn_params.port) proxy.start() try: conn = _connect_via_proxy(proxy, conn_params, read_timeout=1.0) cur = conn.cursor() cur.execute("SELECT 1 FROM systables WHERE tabid = 1") cur.fetchone() # Soft-kill the upstream side WITHOUT triggering RST; reads will # block forever (or until timeout). We do this by closing the # listener, then severing only the upstream socket gracefully — # the client-side socket sits there with no incoming data. if proxy._upstream is not None: with contextlib.suppress(OSError): proxy._upstream.shutdown(2) # SHUT_RDWR proxy._upstream.close() # Mark the proxy as killed so its pump threads exit proxy._killed = True start = time.monotonic() with pytest.raises(informix_db.OperationalError): cur.execute("SELECT 2 FROM systables WHERE tabid = 1") elapsed = time.monotonic() - start # Should fire within ~2x the timeout, not hang forever assert elapsed < 5.0 finally: proxy.close()