"""Phase 34 — scaling benchmarks. The existing benchmarks measure single-shape workloads (1k-row SELECT, 1k-row executemany). These add the scaling axes: 1. **executemany at 1k / 10k / 100k rows** in a transaction. Phase 33's pipelining eliminates per-row RTT; this test confirms the speedup scales linearly with N. 2. **SELECT at 1k / 10k / 100k rows**. Tests parse_tuple_payload throughput at real-world scale. Could surface codec slowdown, memory issues, or GC-pause amplification. 3. **Wide-row SELECT** (5 / 20 / 50 columns x 1k rows). More columns = more decode calls per row. Different cost shape than row-count scaling. 4. **Type-mix SELECT**: realistic application workload with INT + VARCHAR + DECIMAL + DATE + FLOAT in one query. Tests the codec dispatch under a representative mix of decoders. Each benchmark is parametrized; pytest-benchmark groups them so we get one row per scale point. """ from __future__ import annotations import contextlib from collections.abc import Iterator import pytest import informix_db from tests.conftest import ConnParams pytestmark = [pytest.mark.benchmark, pytest.mark.integration] # Module-level scaling sizes EXECUTEMANY_SIZES = [1_000, 10_000, 100_000] SELECT_SIZES = [1_000, 10_000, 100_000] WIDTH_COLUMNS = [5, 20, 50] @pytest.fixture(scope="module") def txn_conn(conn_params: ConnParams) -> Iterator[informix_db.Connection]: """Logged-DB connection with autocommit=False for in-transaction bulk-insert benchmarks.""" conn = informix_db.connect( host=conn_params.host, port=conn_params.port, user=conn_params.user, password=conn_params.password, database="testdb", server=conn_params.server, autocommit=False, ) try: yield conn finally: conn.close() # ---------------------------------------------------------------------------- # Bulk-insert scaling # ---------------------------------------------------------------------------- @pytest.mark.parametrize("n_rows", EXECUTEMANY_SIZES) def test_executemany_scaling( benchmark, txn_conn: informix_db.Connection, n_rows: int ) -> None: """executemany(N) in a single transaction. Pipelined Phase 33 design sends all N PDUs then drains all N responses — should scale roughly linearly with N at very low per-row cost. Round counts shrink as N grows so each scale point completes in similar wall time: 1k rows → 10 rounds (~110 ms each = 1.1 s) 10k rows → 5 rounds (~1.1 s each = 5.5 s) 100k rows → 3 rounds (~11 s each = 33 s) """ rounds_for = {1_000: 10, 10_000: 5, 100_000: 3} table = f"p34_em_{n_rows}" cur = txn_conn.cursor() with contextlib.suppress(informix_db.Error): cur.execute(f"DROP TABLE {table}") cur.execute(f"CREATE TABLE {table} (id INT, name VARCHAR(64), value FLOAT)") txn_conn.commit() counter = [0] def run() -> None: counter[0] += 1 base = counter[0] * n_rows rows = [(base + i, f"row_{base + i}", float(base + i)) for i in range(n_rows)] cur = txn_conn.cursor() cur.executemany(f"INSERT INTO {table} VALUES (?, ?, ?)", rows) cur.close() txn_conn.commit() try: benchmark.pedantic(run, rounds=rounds_for[n_rows], iterations=1) finally: with contextlib.suppress(informix_db.Error): cur = txn_conn.cursor() cur.execute(f"DROP TABLE {table}") txn_conn.commit() # ---------------------------------------------------------------------------- # SELECT-scaling # ---------------------------------------------------------------------------- @pytest.fixture(scope="module") def scaling_select_table(conn_params: ConnParams) -> Iterator[str]: """Pre-populated 100k-row table for SELECT scaling. Built once per module run; benchmarks select FIRST N rows. Uses its OWN connection (not the shared txn_conn) so its transaction state can't be polluted by other tests' executemany work. Earlier attempts to share txn_conn produced silent population failures (200 rows instead of 100k) likely from cursor-state leakage across pipelined batches in the same transaction. """ table = "p34_select" setup_conn = informix_db.connect( host=conn_params.host, port=conn_params.port, user=conn_params.user, password=conn_params.password, database="testdb", server=conn_params.server, autocommit=False, ) cur = setup_conn.cursor() with contextlib.suppress(informix_db.Error): cur.execute(f"DROP TABLE {table}") setup_conn.commit() cur.execute( f"CREATE TABLE {table} (" f" id INT, name VARCHAR(64), counter INT," f" value FLOAT, label VARCHAR(32))" ) setup_conn.commit() # Insert in 10k chunks, committing after each so a failure mid-loop # surfaces instead of silently dropping rows. chunk = 10_000 for base in range(0, 100_000, chunk): rows = [ (base + i, f"name_{base + i:06d}", (base + i) * 7, float(base + i) * 1.5, f"L{(base + i) % 100:02d}") for i in range(chunk) ] cur.executemany( f"INSERT INTO {table} VALUES (?, ?, ?, ?, ?)", rows ) setup_conn.commit() # Verify population — fail loud if the multi-chunk insert dropped rows. cur.execute(f"SELECT COUNT(*) FROM {table}") (count,) = cur.fetchone() assert count == 100_000, ( f"fixture failed: {table} has {count} rows, expected 100000" ) try: yield table finally: with contextlib.suppress(informix_db.Error): cur = setup_conn.cursor() cur.execute(f"DROP TABLE {table}") setup_conn.commit() setup_conn.close() @pytest.fixture(scope="module") def select_read_conn( conn_params: ConnParams, ) -> Iterator[informix_db.Connection]: """Dedicated read connection for SELECT scaling tests. Sharing ``txn_conn`` across read and write tests caused a transaction-isolation bug: ``txn_conn`` would have an open read-snapshot from before the fixture's writes committed, so SELECTs through it only saw 200 rows instead of 100k. A separate read-side connection that's never been in a transaction sees the committed state correctly. """ conn = informix_db.connect( host=conn_params.host, port=conn_params.port, user=conn_params.user, password=conn_params.password, database="testdb", server=conn_params.server, autocommit=True, # read-only — no transaction state to worry about ) try: yield conn finally: conn.close() @pytest.mark.parametrize("n_rows", SELECT_SIZES) def test_select_scaling( benchmark, select_read_conn: informix_db.Connection, scaling_select_table: str, n_rows: int, ) -> None: """SELECT FIRST N from a pre-populated 100k-row table. Tests parse_tuple_payload throughput at production scale. Per-row cost should stay roughly constant across N — if the per-row median grows with N, something's wrong (memory pressure, GC, codec degradation). """ rounds_for = {1_000: 10, 10_000: 5, 100_000: 3} cur = select_read_conn.cursor() cur.execute(f"SELECT COUNT(*) FROM {scaling_select_table}") (count,) = cur.fetchone() cur.close() assert count >= n_rows, ( f"{scaling_select_table} has only {count} rows; " f"can't benchmark SELECT FIRST {n_rows}" ) def run() -> int: cur = select_read_conn.cursor() cur.execute(f"SELECT FIRST {n_rows} * FROM {scaling_select_table}") rows = cur.fetchall() cur.close() assert len(rows) == n_rows, ( f"SELECT FIRST {n_rows} returned {len(rows)} rows" ) return len(rows) benchmark.pedantic(run, rounds=rounds_for[n_rows], iterations=1) # ---------------------------------------------------------------------------- # Wide-row scaling # ---------------------------------------------------------------------------- @pytest.mark.parametrize("n_cols", WIDTH_COLUMNS) def test_wide_row_select( benchmark, txn_conn: informix_db.Connection, n_cols: int ) -> None: """SELECT 1000 rows of width N columns. Tests the codec dispatch under different per-row column-count loads. parse_tuple_payload runs its dispatch loop N x 1000 times; doubling the column count should roughly double the per-row decode cost. """ table = f"p34_wide_{n_cols}" cur = txn_conn.cursor() with contextlib.suppress(informix_db.Error): cur.execute(f"DROP TABLE {table}") # Mix of types: id (int), col0..N-2 (int) col_defs = ", ".join([f"c{i} INT" for i in range(n_cols)]) cur.execute(f"CREATE TABLE {table} ({col_defs})") txn_conn.commit() rows = [tuple(j * 7 + i for j in range(n_cols)) for i in range(1000)] placeholders = ", ".join(["?"] * n_cols) cur.executemany( f"INSERT INTO {table} VALUES ({placeholders})", rows ) txn_conn.commit() def run() -> int: cur = txn_conn.cursor() cur.execute(f"SELECT * FROM {table}") rows = cur.fetchall() cur.close() return len(rows) try: benchmark.pedantic(run, rounds=10, iterations=1) finally: with contextlib.suppress(informix_db.Error): cur = txn_conn.cursor() cur.execute(f"DROP TABLE {table}") txn_conn.commit() # ---------------------------------------------------------------------------- # Type-mix workload — realistic application shape # ---------------------------------------------------------------------------- @pytest.fixture(scope="module") def type_mix_table(txn_conn: informix_db.Connection) -> Iterator[str]: """1000 rows mixing INT + VARCHAR + DECIMAL + DATE + FLOAT — representative of a typical business-data row shape.""" import datetime import decimal table = "p34_typemix" cur = txn_conn.cursor() with contextlib.suppress(informix_db.Error): cur.execute(f"DROP TABLE {table}") cur.execute( f"CREATE TABLE {table} (" f" id INT, name VARCHAR(64)," f" amount DECIMAL(12,2), event_date DATE, ratio FLOAT," f" tag SMALLINT)" ) txn_conn.commit() base_date = datetime.date(2024, 1, 1) rows = [ ( i, f"event_{i:05d}", decimal.Decimal(f"{i * 1.5:.2f}"), base_date + datetime.timedelta(days=i % 365), float(i) * 0.001, i % 100, ) for i in range(1000) ] cur.executemany( f"INSERT INTO {table} VALUES (?, ?, ?, ?, ?, ?)", rows ) txn_conn.commit() try: yield table finally: with contextlib.suppress(informix_db.Error): cur = txn_conn.cursor() cur.execute(f"DROP TABLE {table}") txn_conn.commit() def test_select_type_mix_1000_rows( benchmark, txn_conn: informix_db.Connection, type_mix_table: str, ) -> None: """1000-row SELECT with INT/VARCHAR/DECIMAL/DATE/FLOAT/SMALLINT columns — exercises 6 different decoders per row. Compared to test_select_bench_table_all (which is mostly INT + VARCHAR), this exercises the full decoder dispatch including the DECIMAL BCD parser and DATE epoch math. """ def run() -> int: cur = txn_conn.cursor() cur.execute(f"SELECT * FROM {type_mix_table}") rows = cur.fetchall() cur.close() return len(rows) benchmark.pedantic(run, rounds=10, iterations=1)