"""DB-API 2.0 Cursor — SELECT execution and row iteration. Phase 2 implements the full JDBC cursor lifecycle for parameterless SELECTs: C → PREPARE + NDESCRIBE + WANTDONE (one PDU) C ← DESCRIBE (column metadata) + DONE + COST + EOT C → SQ_ID(CURNAME) + SQ_ID(NFETCH 4096) (one PDU) C ← TUPLE* + DONE + COST + EOT (rows + completion) C → SQ_ID(NFETCH 4096) (drain to end) C ← DONE + COST + EOT (no more rows) C → SQ_ID(CLOSE) (close cursor) C ← EOT C → SQ_ID(RELEASE) (release statement) C ← EOT Parameter binding (SQ_BIND inserted between PREPARE and CURNAME) lands in Phase 4. """ from __future__ import annotations import itertools import struct from collections.abc import Iterator from typing import TYPE_CHECKING, Any from ._messages import MessageType from ._protocol import IfxStreamReader, make_pdu_writer from ._resultset import ColumnInfo, parse_describe, parse_tuple_payload from .converters import encode_param from .exceptions import ( DatabaseError, InterfaceError, NotSupportedError, ProgrammingError, ) if TYPE_CHECKING: from .connections import Connection # Process-wide cursor name counter — appended to a "_ifxc" prefix to mimic # JDBC's auto-generated names. _cursor_counter = itertools.count(1) _NUMERIC_PLACEHOLDER_RE = __import__("re").compile(r":(\d+)") def _rewrite_numeric_to_qmark(sql: str) -> str: """Convert ``:1`` / ``:2`` placeholders (paramstyle="numeric") to ``?``. Informix's wire protocol uses ``?`` natively. Since we expose ``paramstyle="numeric"`` in the public API (matches Informix ESQL/C convention), we rewrite before sending. Trivial cases only — strings and comments are NOT escaped, so SQL containing literal ``:1`` inside string literals will be wrongly substituted. Phase 5 can add a proper SQL tokenizer. """ return _NUMERIC_PLACEHOLDER_RE.sub("?", sql) def _generate_cursor_name() -> str: """Generate a unique cursor name per the JDBC convention. JDBC names are "_ifxc" + zero-padded counter, total 18 characters. We replicate the format so the server treats us identically. """ n = next(_cursor_counter) return f"_ifxc{n:013d}" # 5-char prefix + 13 digits = 18 chars class Cursor: """PEP 249 Cursor over a SQLI session.""" arraysize: int = 1 def __init__(self, connection: Connection): self._conn = connection self._closed = False self._description: list[tuple] | None = None self._columns: list[ColumnInfo] = [] self._rowcount: int = -1 self._rows: list[tuple] = [] self._row_iter: Iterator[tuple] | None = None # Set if the DESCRIBE response already includes SQ_INSERTDONE — # Informix optimizes literal-value INSERTs by executing during # PREPARE. In that case we skip SQ_EXECUTE and go straight to RELEASE. self._statement_already_done = False # -- PEP 249 attributes ------------------------------------------------ @property def description(self) -> list[tuple] | None: return self._description @property def rowcount(self) -> int: return self._rowcount @property def closed(self) -> bool: return self._closed # -- PEP 249 methods --------------------------------------------------- def execute(self, operation: str, parameters: Any = None) -> None: """Execute a single SQL statement, optionally with bound parameters. ``parameters`` is a sequence (tuple/list) matching the ``?`` or ``:N`` placeholders in ``operation``. Phase 4 supports int, float, str, bool, None. """ self._check_open() # Normalize parameters to a tuple for indexing. params: tuple = () if parameters is not None: if isinstance(parameters, dict): raise NotSupportedError("named parameters not yet supported (use positional)") params = tuple(parameters) # If using paramstyle="numeric", rewrite :1 / :2 → ? sql = _rewrite_numeric_to_qmark(operation) if params else operation # Reset previous-execute state. self._description = None self._columns = [] self._rowcount = -1 self._rows = [] self._row_iter = None self._statement_already_done = False # Step 1: PREPARE — send SQL with numQmarks = len(params). self._conn._send_pdu(self._build_prepare_pdu(sql, num_qmarks=len(params))) self._read_describe_response() # Branch on the SQL keyword. We can't use ``self._columns`` / # ``nfields`` here because a parameterized INSERT also returns # a non-empty DESCRIBE (server describes the would-be inserted # row's columns). The SQL-keyword heuristic is what JDBC effectively # does too via its IfxStatement / IfxPreparedStatement subclassing. first_word = sql.lstrip().split(None, 1)[0].upper() if sql.strip() else "" is_select = first_word == "SELECT" if is_select: if params: self._execute_select_with_params(params) else: self._execute_select() elif params: self._execute_dml_with_params(params) else: self._execute_dml() if self._description is not None: self._row_iter = iter(self._rows) def _execute_select_with_params(self, params: tuple) -> None: """Parameterized SELECT: SQ_BIND → CURNAME+NFETCH → drain → CLOSE+RELEASE. Note that CURNAME defines the cursor name and is paired with the prepared statement; binding happens before opening the cursor. We send SQ_BIND alone first (no SQ_EXECUTE — that's for DML), then proceed with the normal cursor open + fetch flow. """ # Send SQ_BIND alone (without SQ_EXECUTE chained — for SELECT, # opening the cursor is what executes the prepared query). self._conn._send_pdu(self._build_bind_only_pdu(params)) self._drain_to_eot() # Now open the cursor and fetch — the bound values are in scope # for the prepared statement. self._execute_select() def _execute_select(self) -> None: """Run the SELECT cursor lifecycle: CURNAME+NFETCH → drain → CLOSE → RELEASE.""" cursor_name = _generate_cursor_name() self._conn._send_pdu(self._build_curname_nfetch_pdu(cursor_name)) self._read_fetch_response() # Drain — fetch again to confirm no more rows. # (JDBC always does this; the second fetch returns DONE only.) self._conn._send_pdu(self._build_nfetch_pdu()) self._read_fetch_response() self._conn._send_pdu(self._build_close_pdu()) self._drain_to_eot() self._conn._send_pdu(self._build_release_pdu()) self._drain_to_eot() def _execute_dml_with_params(self, params: tuple) -> None: """DML with bound parameters: SQ_BIND + SQ_EXECUTE → SQ_RELEASE. Per JDBC's sendExecute path for prepared statements (line 1108 of IfxSqli): build a single PDU containing SQ_BIND with all parameter values followed by SQ_EXECUTE. """ self._conn._send_pdu(self._build_bind_execute_pdu(params)) self._drain_to_eot() self._conn._send_pdu(self._build_release_pdu()) self._drain_to_eot() def _execute_dml(self) -> None: """Run the DDL/DML path: SQ_EXECUTE → SQ_RELEASE. For statements that don't return rows (CREATE, INSERT, UPDATE, DELETE, DROP), the server's DESCRIBE response has ``nfields=0``. We don't open a cursor — just execute the prepared statement and release it. Per JDBC's executeExecute path for non-prepared statements (line 1075 of IfxSqli.sendExecute). Note: when the DESCRIBE response includes SQ_INSERTDONE for a literal-value INSERT, that's METADATA about the would-be insert (auto-generated serial values), NOT the actual execution. We still need SQ_EXECUTE to make the row persist. Lesson: don't let the optimization-looking response confuse you. """ self._conn._send_pdu(self._build_execute_pdu()) self._drain_to_eot() # reads DONE + COST + EOT, populates rowcount self._conn._send_pdu(self._build_release_pdu()) self._drain_to_eot() def executemany(self, operation: str, seq_of_parameters: Any) -> None: """Execute the same SQL once per parameter set. Per PEP 249. Common case is batched INSERT. We PREPARE once, loop SQ_BIND+SQ_EXECUTE per parameter set, then RELEASE once — much cheaper than calling ``execute()`` N times (which would PREPARE+RELEASE on each iteration). Phase 4 supports DML (INSERT/UPDATE/DELETE) only — SELECT in executemany doesn't make much sense and isn't implemented. """ self._check_open() seq = list(seq_of_parameters) if not seq: self._rowcount = 0 return # All parameter tuples must agree on length (= num placeholders). first_len = len(seq[0]) for i, p in enumerate(seq): if len(p) != first_len: raise ProgrammingError( f"executemany: parameter set [{i}] has {len(p)} values, " f"expected {first_len} (matching set [0])" ) # Detect SELECT — not supported in executemany. first_word = operation.lstrip().split(None, 1)[0].upper() if operation.strip() else "" if first_word == "SELECT": raise NotSupportedError("executemany on SELECT is not supported") sql = _rewrite_numeric_to_qmark(operation) # Reset per-execute state. self._description = None self._columns = [] self._rowcount = -1 self._rows = [] self._row_iter = None self._statement_already_done = False # PREPARE once. self._conn._send_pdu(self._build_prepare_pdu(sql, num_qmarks=first_len)) self._read_describe_response() # BIND+EXECUTE per parameter set. total_rowcount = 0 for params in seq: self._rowcount = -1 self._conn._send_pdu(self._build_bind_execute_pdu(tuple(params))) self._drain_to_eot() if self._rowcount > 0: total_rowcount += self._rowcount # RELEASE once. self._conn._send_pdu(self._build_release_pdu()) self._drain_to_eot() self._rowcount = total_rowcount def fetchone(self) -> tuple | None: self._check_open() if self._row_iter is None: return None return next(self._row_iter, None) def fetchmany(self, size: int | None = None) -> list[tuple]: self._check_open() n = size if size is not None else self.arraysize out: list[tuple] = [] for _ in range(n): row = self.fetchone() if row is None: break out.append(row) return out def fetchall(self) -> list[tuple]: self._check_open() if self._row_iter is None: return [] out = list(self._row_iter) self._row_iter = iter([]) return out def close(self) -> None: self._closed = True self._row_iter = None def __iter__(self) -> Iterator[tuple]: return self def __next__(self) -> tuple: row = self.fetchone() if row is None: raise StopIteration return row def __enter__(self) -> Cursor: return self def __exit__(self, *_exc: object) -> None: self.close() # -- internals --------------------------------------------------------- def _check_open(self) -> None: if self._closed: raise InterfaceError("cursor is closed") if self._conn.closed: raise InterfaceError("connection is closed") # -- PDU builders ----------------------------------------------------- def _build_prepare_pdu(self, sql: str, num_qmarks: int = 0) -> bytes: """SQ_PREPARE + SQ_NDESCRIBE + SQ_WANTDONE + SQ_EOT. Per IfxSqli.sendPrepare. SQL uses 4-byte length prefix on modern servers (isRemove64KLimitSupported), with even-byte alignment pad. ``num_qmarks`` is the count of ``?`` placeholders in the SQL. """ writer, buf = make_pdu_writer() writer.write_short(MessageType.SQ_PREPARE) writer.write_short(num_qmarks) sql_bytes = sql.encode("iso-8859-1") writer.write_int(len(sql_bytes)) writer.write_bytes(sql_bytes) if (4 + len(sql_bytes)) & 1: writer.write_byte(0) # writeChar pad writer.write_short(MessageType.SQ_NDESCRIBE) writer.write_short(MessageType.SQ_WANTDONE) writer.write_short(MessageType.SQ_EOT) return buf.getvalue() def _build_bind_only_pdu(self, params: tuple) -> bytes: """SQ_BIND with parameter values + SQ_EOT (no SQ_EXECUTE). Used for parameterized SELECT — the cursor open (CURNAME+NFETCH) is what triggers query execution; SQ_BIND just binds the values in scope for the prepared statement. """ writer, buf = make_pdu_writer() writer.write_short(MessageType.SQ_ID) writer.write_int(MessageType.SQ_BIND) writer.write_short(len(params)) for value in params: if value is None: writer.write_short(0) writer.write_short(-1) writer.write_short(0) else: ifx_type, prec, raw = encode_param(value) writer.write_short(ifx_type) writer.write_short(0) writer.write_short(prec) writer.write_padded(raw) writer.write_short(MessageType.SQ_EOT) return buf.getvalue() def _build_bind_execute_pdu(self, params: tuple) -> bytes: """SQ_BIND with parameter values + SQ_EXECUTE + SQ_EOT. From the JDBC capture (msg[29] in 02-dml-cycle.socat.log): [short SQ_ID=4][int 5=SQ_BIND][short numparams] for each param: [short type][short indicator][short prec] writePadded(data) # data + 0-pad if odd-len [short SQ_EXECUTE=7] [short SQ_EOT] """ writer, buf = make_pdu_writer() writer.write_short(MessageType.SQ_ID) writer.write_int(MessageType.SQ_BIND) # action = 5 writer.write_short(len(params)) for value in params: if value is None: # NULL: type=0, indicator=-1, prec=0, no data writer.write_short(0) writer.write_short(-1) writer.write_short(0) else: ifx_type, prec, raw = encode_param(value) writer.write_short(ifx_type) writer.write_short(0) # indicator = 0 (normal) writer.write_short(prec) writer.write_padded(raw) writer.write_short(MessageType.SQ_EXECUTE) # 7 writer.write_short(MessageType.SQ_EOT) return buf.getvalue() def _build_curname_nfetch_pdu(self, cursor_name: str) -> bytes: """SQ_ID(CURNAME) + SQ_ID(NFETCH 4096) chained. From the JDBC capture (msg[21]): [short SQ_ID=4][int 3][short nameLen][bytes name][short 6] [short SQ_ID=4][int 9][int 4096][int 0] [short SQ_EOT] The trailing ``[short 6]`` after the cursor name is opaque (cursor type / scrollability flag from JDBC's ``sendCursorName``); we replay JDBC's value verbatim. """ writer, buf = make_pdu_writer() # CURNAME writer.write_short(MessageType.SQ_ID) writer.write_int(MessageType.SQ_CURNAME) # action = 3 name_bytes = cursor_name.encode("ascii") writer.write_short(len(name_bytes)) writer.write_bytes(name_bytes) if len(name_bytes) & 1: writer.write_byte(0) writer.write_short(6) # cursor-type flag from JDBC # NFETCH (note: trailing field is a SHORT, not an int — # caught by byte-diff against JDBC's 42-byte reference PDU, # see docs/CAPTURES/14-py-varchar-fail.socat.log analysis) writer.write_short(MessageType.SQ_ID) writer.write_int(MessageType.SQ_NFETCH) # action = 9 writer.write_int(4096) # max bytes per fetch writer.write_short(0) # reserved short (NOT int) writer.write_short(MessageType.SQ_EOT) return buf.getvalue() def _build_nfetch_pdu(self) -> bytes: """SQ_ID(NFETCH 4096) + SQ_EOT — used to drain remaining rows.""" writer, buf = make_pdu_writer() writer.write_short(MessageType.SQ_ID) writer.write_int(MessageType.SQ_NFETCH) writer.write_int(4096) writer.write_short(0) # reserved short (matches JDBC, not int) writer.write_short(MessageType.SQ_EOT) return buf.getvalue() def _build_execute_pdu(self) -> bytes: """SQ_ID(EXECUTE=7) + SQ_EOT — runs the most-recently-prepared statement. From JDBC capture msg[21] in 02-dml-cycle.socat.log: 8 bytes, ``00 04 00 00 00 07 00 0c``. """ writer, buf = make_pdu_writer() writer.write_short(MessageType.SQ_ID) writer.write_int(MessageType.SQ_EXECUTE) # action = 7 writer.write_short(MessageType.SQ_EOT) return buf.getvalue() def _build_close_pdu(self) -> bytes: """SQ_ID(CLOSE) + SQ_EOT.""" writer, buf = make_pdu_writer() writer.write_short(MessageType.SQ_ID) writer.write_int(MessageType.SQ_CLOSE) # 10 writer.write_short(MessageType.SQ_EOT) return buf.getvalue() def _build_release_pdu(self) -> bytes: """SQ_ID(RELEASE) + SQ_EOT.""" writer, buf = make_pdu_writer() writer.write_short(MessageType.SQ_ID) writer.write_int(MessageType.SQ_RELEASE) # 11 writer.write_short(MessageType.SQ_EOT) return buf.getvalue() # -- response readers ------------------------------------------------- def _read_describe_response(self) -> None: """Read DESCRIBE (+ optional SQ_INSERTDONE) + DONE + COST + EOT after PREPARE.""" reader = _SocketReader(self._conn._sock) while True: tag = reader.read_short() if tag == MessageType.SQ_EOT: return elif tag == MessageType.SQ_DESCRIBE: self._columns, _ = parse_describe(reader) self._description = ( [c.to_description_tuple() for c in self._columns] if self._columns else None ) elif tag == 94: # SQ_INSERTDONE — Informix optimization: literal # INSERT executed during PREPARE. Payload is: # readLongInt (10 bytes) — serial8 inserted # readLongBigint (8 bytes) — bigserial inserted (modern servers) # See IfxSqli.receiveInsertDone (line 2347). reader.read_exact(10 + 8) self._statement_already_done = True self._rowcount = 1 # best-effort; literal INSERT = 1 row elif tag == MessageType.SQ_DONE: self._consume_done(reader) elif tag == 55: # SQ_COST reader.read_int() reader.read_int() elif tag == MessageType.SQ_ERR: self._raise_sq_err(reader) else: raise DatabaseError(f"unexpected tag in DESCRIBE response: 0x{tag:04x}") def _read_fetch_response(self) -> None: """Read TUPLE* + DONE + COST + EOT after an NFETCH.""" reader = _SocketReader(self._conn._sock) while True: tag = reader.read_short() if tag == MessageType.SQ_EOT: return elif tag == MessageType.SQ_TUPLE: row = parse_tuple_payload(reader, self._columns) self._rows.append(row) elif tag == MessageType.SQ_DONE: self._consume_done(reader) elif tag == 55: # SQ_COST reader.read_int() reader.read_int() elif tag == MessageType.SQ_ERR: self._raise_sq_err(reader) else: raise DatabaseError(f"unexpected tag in FETCH response: 0x{tag:04x}") def _drain_to_eot(self) -> None: """Read response stream until SQ_EOT, allowing common tags in between.""" reader = _SocketReader(self._conn._sock) while True: tag = reader.read_short() if tag == MessageType.SQ_EOT: return elif tag == MessageType.SQ_DONE: self._consume_done(reader) elif tag == 55: # SQ_COST reader.read_int() reader.read_int() elif tag == 94: # SQ_INSERTDONE # serial8 (10 bytes) + bigserial (8 bytes) reader.read_exact(10 + 8) # If the server sent INSERTDONE, the row was inserted. # Track best-effort rowcount = 1 for literal-value INSERTs. if self._rowcount < 0: self._rowcount = 1 elif tag == MessageType.SQ_ERR: self._raise_sq_err(reader) else: raise DatabaseError(f"unexpected tag while draining: 0x{tag:04x}") def _consume_done(self, reader: IfxStreamReader) -> None: """SQ_DONE: [short warnings][int rowsAffected][int rowid][int serial].""" reader.read_short() # warnings rows = reader.read_int() reader.read_int() # rowid reader.read_int() # serial if rows >= 0: self._rowcount = rows def _raise_sq_err(self, reader: IfxStreamReader) -> None: """Decode SQ_ERR per IfxSqli.receiveError and raise.""" sqlcode = reader.read_short() isamcode = reader.read_short() reader.read_int() # offset into statement # Drain remaining error bytes until SQ_EOT. try: while True: t = reader.read_short() if t == MessageType.SQ_EOT: break except Exception: pass raise ProgrammingError(f"server returned SQ_ERR sqlcode={sqlcode} isamcode={isamcode}") class _SocketReader(IfxStreamReader): """``IfxStreamReader`` backed by an ``IfxSocket`` — pulls bytes from the wire on demand.""" def __init__(self, sock): self._sock = sock from io import BytesIO super().__init__(BytesIO(b"")) def read_exact(self, n: int) -> bytes: return self._sock.read_exact(n) def read_short(self) -> int: return struct.unpack("!h", self.read_exact(2))[0] def read_int(self) -> int: return struct.unpack("!i", self.read_exact(4))[0]