"""Phase 33 integration tests — pipelined ``executemany`` correctness. The pipelined executemany sends all N BIND+EXECUTE PDUs to the wire before draining any response. Hamilton's review of Phase 33 flagged C1: this assumes the server sends *exactly* N responses for N pipelined PDUs even when one row fails. If the server cuts the response stream short on first error, the drain loop would block reading bytes that never arrive — the connection would deadlock on the next read. These tests verify the wire-alignment assumption holds: 1. Constraint violation at row 500 of 1000 — happy-failure case. 2. Wire-alignment recovery — connection is still usable after the error (proving the RELEASE drain succeeded and we read all the remaining error responses). 3. Subsequent operations on the same connection work — proves no stray bytes on the wire. """ from __future__ import annotations import contextlib from collections.abc import Iterator import pytest import informix_db from tests.conftest import ConnParams pytestmark = pytest.mark.integration @pytest.fixture def constraint_table(logged_db_params: ConnParams) -> Iterator[str]: """Table with a UNIQUE constraint on ``id`` so we can force a constraint violation at a known row. """ table = "p33_constraint" conn = informix_db.connect( host=logged_db_params.host, port=logged_db_params.port, user=logged_db_params.user, password=logged_db_params.password, database=logged_db_params.database, server=logged_db_params.server, autocommit=True, ) cur = conn.cursor() with contextlib.suppress(Exception): cur.execute(f"DROP TABLE {table}") cur.execute( f"CREATE TABLE {table} (id INT NOT NULL PRIMARY KEY, name VARCHAR(64))" ) conn.close() try: yield table finally: conn = informix_db.connect( host=logged_db_params.host, port=logged_db_params.port, user=logged_db_params.user, password=logged_db_params.password, database=logged_db_params.database, server=logged_db_params.server, autocommit=True, ) cur = conn.cursor() with contextlib.suppress(Exception): cur.execute(f"DROP TABLE {table}") conn.close() def test_pipelined_executemany_mid_batch_constraint_violation( logged_db_params: ConnParams, constraint_table: str ) -> None: """C1 (Hamilton): force a constraint violation at row 500 of 1000; verify the pipeline drains cleanly and the connection is usable afterward. This is the test that validates Phase 33's wire-alignment assumption. If Informix sends fewer than 1000 responses for 1000 pipelined PDUs after the row-500 failure, this test will hang on the drain loop's read (eventually timing out, but the test will fail loudly either way). """ conn = informix_db.connect( host=logged_db_params.host, port=logged_db_params.port, user=logged_db_params.user, password=logged_db_params.password, database=logged_db_params.database, server=logged_db_params.server, autocommit=False, read_timeout=30.0, # if the wire desyncs, fail loudly within 30s ) try: # Pre-seed row 500 so the executemany's row-500 INSERT will # violate the UNIQUE constraint. cur = conn.cursor() cur.execute( f"INSERT INTO {constraint_table} VALUES (?, ?)", (500, "pre-existing"), ) conn.commit() # Now executemany 1000 rows; row 500 will collide rows = [(i, f"row_{i}") for i in range(1000)] with pytest.raises(informix_db.IntegrityError) as exc_info: cur.executemany( f"INSERT INTO {constraint_table} VALUES (?, ?)", rows ) # The error message should identify which row failed in the batch err_msg = str(exc_info.value) assert "row 500" in err_msg or "500" in err_msg, ( f"error message should identify the failed row index: {err_msg}" ) # Whatever the transaction state, rolling back is the correct # response to a failed batch. conn.rollback() # The connection MUST be usable after the failed batch. # If the wire is desynced, this query will block or fail # with a ProtocolError. The test passing here proves the # pipeline drained cleanly. cur = conn.cursor() cur.execute(f"SELECT COUNT(*) FROM {constraint_table}") (count,) = cur.fetchone() # After rollback, only the pre-seeded row 500 remains assert count == 1, ( f"expected only the pre-seeded row to remain, got {count} " "(transaction didn't roll back cleanly?)" ) finally: conn.close() def test_pipelined_executemany_first_row_fails( logged_db_params: ConnParams, constraint_table: str ) -> None: """Edge case: failure on the FIRST row of the pipeline. Tests that the drain loop correctly handles "every response after this is an error" without falling apart on the very first response.""" conn = informix_db.connect( host=logged_db_params.host, port=logged_db_params.port, user=logged_db_params.user, password=logged_db_params.password, database=logged_db_params.database, server=logged_db_params.server, autocommit=False, read_timeout=30.0, ) try: cur = conn.cursor() cur.execute( f"INSERT INTO {constraint_table} VALUES (?, ?)", (0, "seeded") ) conn.commit() rows = [(i, f"row_{i}") for i in range(100)] with pytest.raises(informix_db.IntegrityError): cur.executemany( f"INSERT INTO {constraint_table} VALUES (?, ?)", rows ) conn.rollback() cur = conn.cursor() cur.execute(f"SELECT COUNT(*) FROM {constraint_table}") (count,) = cur.fetchone() assert count == 1 finally: conn.close() def test_pipelined_executemany_last_row_fails( logged_db_params: ConnParams, constraint_table: str ) -> None: """Edge case: failure on the LAST row of the pipeline. Tests that we don't accidentally short-circuit the drain when we see the "expected" rowcount before the actual error response arrives.""" conn = informix_db.connect( host=logged_db_params.host, port=logged_db_params.port, user=logged_db_params.user, password=logged_db_params.password, database=logged_db_params.database, server=logged_db_params.server, autocommit=False, read_timeout=30.0, ) try: cur = conn.cursor() cur.execute( f"INSERT INTO {constraint_table} VALUES (?, ?)", (99, "seeded-last"), ) conn.commit() rows = [(i, f"row_{i}") for i in range(100)] with pytest.raises(informix_db.IntegrityError): cur.executemany( f"INSERT INTO {constraint_table} VALUES (?, ?)", rows ) conn.rollback() cur = conn.cursor() cur.execute(f"SELECT COUNT(*) FROM {constraint_table}") (count,) = cur.fetchone() assert count == 1 finally: conn.close()