cursor.execute("SELECT 1 FROM systables WHERE tabid = 1")
cursor.fetchone() == (1,)
To my knowledge, this is the first time a pure-Python implementation
has read data from Informix without wrapping IBM's CSDK or JDBC.
Three breakthroughs in this commit:
1. Login PDU's database field is BROKEN. Passing a database name there
makes the server reject subsequent SQ_DBOPEN with sqlcode -759
("database not available"). JDBC always sends NULL in the login
PDU's database slot — we now do the same. The user-supplied database
opens via SQ_DBOPEN in _init_session.
2. Post-login session init dance: SQ_PROTOCOLS (8-byte feature mask
replayed verbatim from JDBC) → SQ_INFO with INFO_ENV + env vars
(48-byte PDU replayed verbatim — DBTEMP=/tmp, SUBQCACHESZ=10) →
SQ_DBOPEN. Without all three steps in this exact order, the server
silently ignores SELECTs.
3. SQ_DESCRIBE per-column block has 10 fields per column (not the
simple "name + type" my best-effort parser assumed): fieldIndex,
columnStartPos, columnType, columnExtendedId, ownerName,
extendedName, reference, alignment, sourceType, encodedLength.
The string table at the end is offset-indexed (fieldIndex points
into it), which is how JDBC handles disambiguation.
Cursor lifecycle implementation in cursors.py mirrors JDBC exactly:
PREPARE+NDESCRIBE+WANTDONE → DESCRIBE+DONE+COST+EOT
CURNAME+NFETCH(4096) → TUPLE*+DONE+COST+EOT
NFETCH(4096) → DONE+COST+EOT (drain)
CLOSE → EOT
RELEASE → EOT
Five round trips per SELECT — same as JDBC.
Module changes:
src/informix_db/connections.py — added _init_session(), _send_protocols(),
_send_dbopen(), _drain_to_eot(), _raise_sq_err(); login PDU now
forces database=None always; SQ_INFO PDU replayed verbatim from
JDBC capture (offsets-indexed env-var format too gnarly to derive
in MVP).
src/informix_db/cursors.py — full rewrite: real PDU builders for
PREPARE/CURNAME+NFETCH/NFETCH/CLOSE/RELEASE; tag-dispatched
response readers; cursor-name generator matching JDBC's "_ifxc"
convention.
src/informix_db/_resultset.py — proper SQ_DESCRIBE parser per
JDBC's receiveDescribe (USVER mode); offset-indexed string table
with name lookup by fieldIndex; ColumnInfo dataclass with raw
type-code preserved for null-flag extraction.
src/informix_db/_messages.py — added SQ_NDESCRIBE=22, SQ_WANTDONE=49.
Test coverage: 40 unit + 15 integration tests (7 smoke + 8 new SELECT)
= 55 total, all green, ruff clean. New tests cover:
- SELECT 1 returns (1,)
- cursor.description shape per PEP 249
- Multi-row INT SELECT
- Multi-column mixed types (INT + FLOAT)
- Iterator protocol (for row in cursor)
- fetchmany(n)
- Re-executing on same cursor resets state
- Two cursors on one connection (sequential)
Known gap: VARCHAR row decoding doesn't yet handle the variable-width
on-wire encoding correctly. Phase 2.x will address this — for now a
``NotImplementedError`` surfaces the raw bytes in the row tuple.
123 lines · 4.3 KiB · Python
"""Phase 2 integration tests — SELECT execution end-to-end.
|
|
|
|
Marked ``integration`` so the default ``pytest`` invocation skips them.
|
|
Run with ``pytest -m integration`` after ``docker compose up``.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import pytest
|
|
|
|
import informix_db
|
|
from tests.conftest import ConnParams
|
|
|
|
pytestmark = pytest.mark.integration
|
|
|
|
|
|
def _connect(
    conn_params: ConnParams,
    *,
    connect_timeout: float = 10.0,
    read_timeout: float = 10.0,
) -> informix_db.Connection:
    """Open a connection to the test server described by *conn_params*.

    The timeouts were previously hard-coded; they are now keyword-only
    parameters with the same defaults, so existing callers are unaffected
    while slow-environment tests can opt into longer waits.

    Args:
        conn_params: Host/port/credentials fixture from ``tests.conftest``.
        connect_timeout: Seconds to wait for the TCP connect + handshake.
        read_timeout: Seconds to wait for each server response.

    Returns:
        An open :class:`informix_db.Connection` (caller is responsible for
        closing it — all tests use it as a context manager).
    """
    return informix_db.connect(
        host=conn_params.host,
        port=conn_params.port,
        user=conn_params.user,
        password=conn_params.password,
        database=conn_params.database,
        server=conn_params.server,
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
    )
|
|
|
|
|
|
def test_select_1_returns_one_tuple(conn_params: ConnParams) -> None:
    """Phase 2 milestone check: ``SELECT 1`` yields exactly one ``(1,)`` row."""
    with _connect(conn_params) as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT 1 FROM systables WHERE tabid = 1")
        row = cursor.fetchone()
        assert row == (1,)
        # The result set is exhausted after the single row.
        assert cursor.fetchone() is None
|
|
|
|
|
|
def test_select_1_description_shape(conn_params: ConnParams) -> None:
    """``cursor.description`` exposes one PEP 249-compliant 7-tuple column."""
    with _connect(conn_params) as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT 1 FROM systables WHERE tabid = 1")
        description = cursor.description
        assert description is not None
        assert len(description) == 1
        column = description[0]
        assert len(column) == 7
        # PEP 249 order: name, type_code, display_size, internal_size,
        # precision, scale, null_ok — only the first four are pinned here.
        name, type_code, display_size, internal_size, _precision, _scale, _null_ok = column
        assert name == "(constant)"
        assert type_code == 2  # IfxType.INT
        assert display_size == internal_size == 4
|
|
|
|
|
|
def test_select_multi_row_int(conn_params: ConnParams) -> None:
    """A multi-row INT SELECT comes back as a list of singleton tuples."""
    with _connect(conn_params) as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT FIRST 5 tabid FROM systables ORDER BY tabid")
        fetched = cursor.fetchall()
        expected = [(tabid,) for tabid in range(1, 6)]
        assert len(fetched) == 5
        assert fetched == expected
        assert cursor.rowcount == 5
|
|
|
|
|
|
def test_select_multi_column_mixed_types(conn_params: ConnParams) -> None:
    """Mixed-type columns decode to the right Python types (INT + FLOAT)."""
    with _connect(conn_params) as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT FIRST 3 tabid, nrows FROM systables ORDER BY tabid")
        result = cursor.fetchall()
        assert len(result) == 3
        # Every row decodes as (int, float).
        assert all(
            isinstance(tabid, int) and isinstance(nrows, float)
            for tabid, nrows in result
        )
        column_names = [entry[0] for entry in cursor.description]
        assert column_names == ["tabid", "nrows"]
|
|
|
|
|
|
def test_iterator_protocol(conn_params: ConnParams) -> None:
    """A cursor is iterable: ``for row in cursor`` yields every row in order."""
    with _connect(conn_params) as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT FIRST 3 tabid FROM systables ORDER BY tabid")
        collected = [row for row in cursor]
        assert collected == [(1,), (2,), (3,)]
|
|
|
|
|
|
def test_fetchmany(conn_params: ConnParams) -> None:
    """``fetchmany(n)`` hands back at most n rows; the remainder stays queued."""
    with _connect(conn_params) as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT FIRST 5 tabid FROM systables ORDER BY tabid")
        head = cursor.fetchmany(2)
        assert head == [(1,), (2,)]
        # fetchall then drains everything fetchmany left behind.
        tail = cursor.fetchall()
        assert tail == [(3,), (4,), (5,)]
|
|
|
|
|
|
def test_two_executes_on_same_cursor(conn_params: ConnParams) -> None:
    """A second ``execute`` on the same cursor discards prior state cleanly."""
    with _connect(conn_params) as conn:
        cursor = conn.cursor()

        cursor.execute("SELECT 1 FROM systables WHERE tabid = 1")
        assert cursor.fetchone() == (1,)

        cursor.execute("SELECT 2 FROM systables WHERE tabid = 1")
        assert cursor.fetchone() == (2,)
|
|
|
|
|
|
def test_two_cursors_on_same_connection(conn_params: ConnParams) -> None:
    """One connection supports sequential cursors (Phase 4 may parallel-ize)."""
    with _connect(conn_params) as conn:
        first_cursor = conn.cursor()
        first_cursor.execute("SELECT 1 FROM systables WHERE tabid = 1")
        assert first_cursor.fetchone() == (1,)
        first_cursor.close()

        # A fresh cursor after the first one is closed works independently.
        second_cursor = conn.cursor()
        second_cursor.execute("SELECT FIRST 2 tabid FROM systables ORDER BY tabid")
        assert second_cursor.fetchall() == [(1,), (2,)]
        second_cursor.close()
|