"""Tier 2 benchmarks — observability and concurrency. The existing benchmark suite measures *single-call latency* under low contention. This file adds three benchmark categories that verify claims the driver makes elsewhere but doesn't currently prove: 1. **Memory-growth during streaming fetch** — USAGE.md claims iterator- based fetch keeps memory flat. We verify by sampling RSS every 1000 rows during a 100k-row iteration; the slope must be near-zero. 2. **Latency percentiles** — most benchmarks report mean/median, but for SLO-bound applications p95/p99/max matter more. We hammer ``SELECT 1`` with 1000 round-trips and report the full distribution. 3. **Concurrent pool throughput** — the pool is supposed to give per-request fairness and aggregate throughput scaling. We verify with N=2/4/8 worker threads each running 100 queries. These benchmarks have a different shape than ``test_*_perf.py``: instead of timing one operation per round and reporting median, they run the entire workload once per round and assert the *shape* of the result. The output goes to stdout for inspection rather than into the pytest-benchmark JSON archive (which doesn't model multi-dimensional results well). """ from __future__ import annotations import gc import statistics import threading import time import pytest import informix_db from tests.conftest import ConnParams pytestmark = [pytest.mark.benchmark, pytest.mark.integration] def test_streaming_fetch_memory_profile( bench_conn: informix_db.Connection, bench_table: str ) -> None: """Document the memory profile of iterator-based fetch. The current cursor materializes the full result set on ``execute()`` (Phase 17 in-memory model), so memory IS expected to grow proportional to row count. This test: 1. Records the actual growth shape so it's visible in CI output. 2. Provides a regression baseline — if growth ever exceeds 100 MB for a 1k-row table, something is leaking. 3. Is the future regression test for a streaming/server-cursor mode that maintains constant memory. """ import resource cur = bench_conn.cursor() cur.execute(f"SELECT * FROM {bench_table}") def rss_kb() -> int: return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss samples: list[tuple[int, int]] = [] rows_seen = 0 initial_rss = rss_kb() samples.append((0, initial_rss)) for _ in cur: rows_seen += 1 if rows_seen % 100 == 0: samples.append((rows_seen, rss_kb())) cur.close() gc.collect() final_rss = rss_kb() print(f"\nstreaming_fetch memory profile ({rows_seen} rows from {bench_table}):") for rows, rss in samples[::2]: # every other sample to keep output short print(f" rows={rows:>5} rss={rss:>8} KB (Δ={rss - initial_rss:+} KB)") print(f" final={final_rss} KB after gc.collect()") # Regression wall: 100 MB growth for 1k rows would mean we're # leaking ~100 KB/row. Realistic in-memory cost is ~500 bytes/row, # so growth should be well under 1 MB. growth_kb = final_rss - initial_rss assert growth_kb < 100_000, ( f"streaming fetch grew RSS by {growth_kb} KB for {rows_seen} rows " f"— cursor is leaking or holding references it shouldn't" ) def test_select_1_latency_percentiles( bench_conn: informix_db.Connection, ) -> None: """Run ``SELECT 1`` 1000 times; report p50/p90/p95/p99/max. Mean alone is misleading for latency-sensitive applications — the tail (p95/p99) is what actually breaks SLOs. A 200 us mean with a 5 ms p99 means 1% of requests are 25x slower than typical. No assertions: the test exists to surface the distribution shape so a regression that worsens the tail without moving the mean becomes visible to a human reviewer. """ timings: list[float] = [] # Warmup for _ in range(20): cur = bench_conn.cursor() cur.execute("SELECT 1 FROM systables WHERE tabid = 1") cur.fetchone() cur.close() # Measure for _ in range(1000): t0 = time.perf_counter() cur = bench_conn.cursor() cur.execute("SELECT 1 FROM systables WHERE tabid = 1") cur.fetchone() cur.close() timings.append(time.perf_counter() - t0) timings.sort() def at(p: float) -> float: return timings[int(p * len(timings))] p50 = at(0.50) p90 = at(0.90) p95 = at(0.95) p99 = at(0.99) p_max = timings[-1] print("\nSELECT 1 latency distribution (n=1000):") print(f" p50 = {p50 * 1e6:>8.1f} µs") print(f" p90 = {p90 * 1e6:>8.1f} µs") print(f" p95 = {p95 * 1e6:>8.1f} µs") print(f" p99 = {p99 * 1e6:>8.1f} µs") print(f" max = {p_max * 1e6:>8.1f} µs") print(f" mean = {statistics.mean(timings) * 1e6:>8.1f} µs") print(f" ratio p99/p50 = {p99 / p50:.2f}x") # Sanity check: p50 should be sub-millisecond on loopback. If # this fails, something is wrong with the test environment, not # the driver. assert p50 < 0.001, f"p50 latency {p50 * 1e6:.0f} µs > 1 ms — env issue" @pytest.mark.parametrize("n_threads", [2, 4, 8]) def test_concurrent_pool_throughput( conn_params: ConnParams, n_threads: int ) -> None: """N worker threads each run M queries through a shared pool. Reports aggregate queries/sec and per-thread mean latency. Verifies the pool actually parallelizes work (aggregate throughput should scale roughly linearly with N up to the wire's saturation point). """ QUERIES_PER_WORKER = 100 pool = informix_db.create_pool( host=conn_params.host, port=conn_params.port, user=conn_params.user, password=conn_params.password, database=conn_params.database, server=conn_params.server, autocommit=True, min_size=n_threads, max_size=n_threads, ) try: per_thread_timings: dict[int, list[float]] = {i: [] for i in range(n_threads)} barrier = threading.Barrier(n_threads + 1) # +1 for main thread def worker(tid: int) -> None: barrier.wait() # synchronize start for _ in range(QUERIES_PER_WORKER): t0 = time.perf_counter() with pool.connection() as conn: cur = conn.cursor() cur.execute("SELECT 1 FROM systables WHERE tabid = 1") cur.fetchone() cur.close() per_thread_timings[tid].append(time.perf_counter() - t0) threads = [ threading.Thread(target=worker, args=(i,)) for i in range(n_threads) ] for t in threads: t.start() barrier.wait() # release all workers simultaneously wall_start = time.perf_counter() for t in threads: t.join(timeout=60.0) assert not t.is_alive() wall_total = time.perf_counter() - wall_start total_queries = n_threads * QUERIES_PER_WORKER agg_qps = total_queries / wall_total all_timings = [t for ts in per_thread_timings.values() for t in ts] all_timings.sort() median_per_call = all_timings[len(all_timings) // 2] # Per-thread fairness check: each thread's count should equal # QUERIES_PER_WORKER (all completed) for tid, ts in per_thread_timings.items(): assert len(ts) == QUERIES_PER_WORKER, ( f"thread {tid} only completed {len(ts)}/{QUERIES_PER_WORKER}" ) print(f"\nconcurrent pool throughput (N={n_threads} threads):") print(f" total queries = {total_queries}") print(f" wall time = {wall_total * 1000:.1f} ms") print(f" aggregate QPS = {agg_qps:.1f}") print(f" median per-call = {median_per_call * 1e6:.1f} µs") print(f" per-thread fairness: all {n_threads} completed all " f"{QUERIES_PER_WORKER} queries") finally: pool.close()