informix-db/src/informix_db/connections.py
Ryan Malloy 92c4fdbcbf Phase 3: DDL + DML + commit/rollback wire machinery
Cursor.execute now branches on the DESCRIBE response's nfields:
  - nfields > 0 → SELECT path (cursor lifecycle: CURNAME+NFETCH+...)
  - nfields == 0 → DDL/DML path (just SQ_EXECUTE then SQ_RELEASE)
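
The branch can be sketched on its own. This is a minimal illustration, not the driver's code: `DescribeResult`, `dispatch_execute` and the two path callables are hypothetical stand-ins for the real cursor internals.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class DescribeResult:
    """Hypothetical stand-in for the parsed DESCRIBE response."""
    nfields: int


def dispatch_execute(
    describe: DescribeResult,
    run_select: Callable[[], str],
    run_dml: Callable[[], str],
) -> str:
    """Choose the execution path from the DESCRIBE response's nfields."""
    if describe.nfields > 0:
        # Result columns exist: full cursor lifecycle (CURNAME+NFETCH+...).
        return run_select()
    # nfields == 0: DDL/DML, so just SQ_EXECUTE then SQ_RELEASE.
    return run_dml()


print(dispatch_execute(DescribeResult(nfields=2), lambda: "select", lambda: "dml"))  # select
print(dispatch_execute(DescribeResult(nfields=0), lambda: "select", lambda: "dml"))  # dml
```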

Examples that work end-to-end against the dev container:

  cur.execute('CREATE TEMP TABLE t (id INTEGER, name VARCHAR(50))')
  cur.execute("INSERT INTO t VALUES (1, 'hello')")  # rowcount=1
  cur.execute("UPDATE t SET name = 'new' WHERE id = 1")
  cur.execute('DELETE FROM t WHERE id = 1')

Plus a full mixed cycle: CREATE → 5 INSERTs → SELECT WHERE → DELETE WHERE →
SELECT (see tests/test_dml.py::test_full_dml_cycle_in_one_connection).

Three protocol findings from this push, documented in DECISION_LOG.md:

1. SQ_INSERTDONE (=94) is METADATA, not proof of execution. For
   literal-value INSERTs it arrives in BOTH the DESCRIBE response
   (PREPARE phase) AND the EXECUTE response. The PREPARE-phase
   SQ_INSERTDONE carries the serial values that WILL be assigned IF you
   execute; the EXECUTE-phase SQ_INSERTDONE confirms execution. My
   initial assumption, that a PREPARE-phase INSERTDONE means the INSERT
   already ran, was wrong: skipping SQ_EXECUTE meant the row never
   persisted (a follow-up SELECT returned []). Lesson: a response that
   looks like an optimization may not be one; always verify with a
   follow-up SELECT.

2. SQ_INSERTDONE wire format: 18 bytes (a 10-byte longint serial8 plus
   an 8-byte bigint bigserial), per IfxSqli.receiveInsertDone line 2347.
   We read and discard it for now; Phase 5+ will surface it as
   Cursor.lastrowid.

3. Transactions: commit() and rollback() are 4-byte PDUs, a 2-byte
   message tag followed by a 2-byte SQ_EOT (=12): SQ_CMMTWORK=19 for
   commit, SQ_RBWORK=20 for rollback. The server responds with
   SQ_DONE+SQ_EOT in logged databases, or SQ_ERR sqlcode=-255 ("Not in
   transaction") in unlogged databases like sysmaster. The wire
   machinery is implemented; full transaction testing needs a logged DB
   (use ``stores_demo`` from the dev image).
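
The following sketch makes the byte counts above concrete. It builds the commit/rollback PDUs with `struct` and splits an 18-byte SQ_INSERTDONE payload. The helper names are hypothetical. The sketch assumes network byte order, as the driver's own `struct` calls use. It treats the 10-byte serial8 longint as opaque and assumes the trailing bigserial is a big-endian signed 64-bit integer.

```python
import struct

SQ_CMMTWORK = 19
SQ_RBWORK = 20
SQ_EOT = 12


def build_txn_pdu(tag: int) -> bytes:
    """[short tag][short SQ_EOT]: two big-endian shorts, 4 bytes in total."""
    return struct.pack("!hh", tag, SQ_EOT)


def split_insertdone(payload: bytes) -> tuple[bytes, int]:
    """Split an 18-byte SQ_INSERTDONE payload into (serial8 bytes, bigserial).

    The 10-byte serial8 longint is returned undecoded; reading the last
    8 bytes as a big-endian signed int64 is an assumption.
    """
    if len(payload) != 18:
        raise ValueError(f"expected 18 bytes, got {len(payload)}")
    (bigserial,) = struct.unpack("!q", payload[10:])
    return payload[:10], bigserial


print(build_txn_pdu(SQ_CMMTWORK).hex())  # 0013000c
print(build_txn_pdu(SQ_RBWORK).hex())    # 0014000c
```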

Module changes:
  src/informix_db/cursors.py:
    - execute() branches on nfields (SELECT path vs DDL/DML path)
    - new _execute_dml() does just EXECUTE + RELEASE
    - new _build_execute_pdu() emits the 8-byte SQ_ID(EXECUTE)+EOT
    - _read_describe_response() and _drain_to_eot() handle SQ_INSERTDONE
  src/informix_db/connections.py:
    - commit() / rollback() now functional — send the SQ_CMMTWORK /
      SQ_RBWORK PDU and drain the response
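
The drain loop that commit()/rollback() rely on is easier to reason about without a socket. This simplified sketch runs over an in-memory buffer and handles only the tags whose values appear in this document: SQ_EOT=12, SQ_COST=55 and SQ_PROTOCOLS=126. SQ_DONE and SQ_ERR are left out because their tag values are not given here. `read_exact` and `drain_to_eot` are hypothetical standalone names, not the driver's methods.

```python
import io
import struct

SQ_EOT = 12
SQ_COST = 55
SQ_PROTOCOLS = 126


def read_exact(stream: io.BytesIO, n: int) -> bytes:
    data = stream.read(n)
    if len(data) != n:
        raise EOFError(f"wanted {n} bytes, got {len(data)}")
    return data


def drain_to_eot(stream: io.BytesIO) -> list[int]:
    """Consume tagged messages until SQ_EOT; return the tags seen (sketch only)."""
    seen = []
    while True:
        (tag,) = struct.unpack("!h", read_exact(stream, 2))
        if tag == SQ_EOT:
            return seen
        seen.append(tag)
        if tag == SQ_PROTOCOLS:
            (length,) = struct.unpack("!h", read_exact(stream, 2))
            read_exact(stream, length + (length & 1))  # payload + even-alignment pad
        elif tag == SQ_COST:
            read_exact(stream, 8)  # [int cost1][int cost2]
        else:
            raise ValueError(f"unexpected tag {tag}")


wire = (
    struct.pack("!hh", SQ_PROTOCOLS, 3) + b"abc" + b"\x00"  # odd length, so one pad byte
    + struct.pack("!hii", SQ_COST, 1, 2)
    + struct.pack("!h", SQ_EOT)
)
print(drain_to_eot(io.BytesIO(wire)))  # [126, 55]
```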

Tests: 40 unit + 24 integration (6 new DML tests) = 64 total, all
green, ruff clean. New tests cover:
  - CREATE TEMP TABLE
  - INSERT (rowcount=1, persists, SELECT shows it)
  - UPDATE WHERE (specific row changed)
  - DELETE WHERE (specific row removed)
  - Full mixed cycle (CREATE + 5 INSERTs + SELECT + DELETE + SELECT)
  - commit() in unlogged DB raises OperationalError sqlcode=-255

Captured wire artifacts kept for future debugging:
  docs/CAPTURES/16-py-insert-literal.socat.log
  docs/CAPTURES/17-py-insert-select.socat.log
2026-05-04 08:02:48 -06:00


"""SQLI connection management — login PDU assembly, send, parse, close.
This is the Phase 1 minimum: open socket, send the binary login PDU,
parse the server response, expose ``close()`` (which sends ``SQ_EXIT``).
``cursor()`` is a stub — it lands in Phase 2.
The login PDU layout is documented byte-by-byte in
``docs/PROTOCOL_NOTES.md`` §3 and validated against the captured JDBC
reference in ``docs/CAPTURES/01-connect-only.socat.log``.
"""

from __future__ import annotations

import os
import socket as socket_mod
import struct
import threading
from io import BytesIO
from pathlib import Path

from . import _auth
from ._messages import (
    APPL_ID,
    APPL_TYPE,
    ASF_XCONNECT,
    CLIENT_SERIAL,
    CLIENT_VERSION,
    FLOAT_TYPE,
    NET_TLITCP,
    PROT_SQLIOL,
    UTYPE_INTERNET,
    LoginMarker,
    MessageType,
    SLHeader,
    StmtOptions,
)
from ._protocol import IfxStreamReader, IfxStreamWriter, ProtocolError, make_pdu_writer
from ._socket import IfxSocket
from .cursors import Cursor
from .exceptions import InterfaceError, OperationalError

# Default capability bits the JDBC reference sends. Validated against
# 01-connect-only.socat.log via the PDU diff in tests/test_pdu_match.py:
#   Cap_1 = 0x0000013c = 316, which appears to be
#           (capability_class << 8) | protocol_version,
#           where protocol_version = 0x3c = PF_PROT_SQLI_0600 (=60)
#   Cap_2 = 0
#   Cap_3 = 0
# Server echoes these back in DecodeAscBinary. The dev image is permissive
# and accepts other values too, but matching JDBC's reference protects us
# against subtle compatibility issues with stricter server configurations.
_DEFAULT_CAP_1 = 0x0000013C
_DEFAULT_CAP_2 = 0
_DEFAULT_CAP_3 = 0

# Default environment variables sent in the login PDU (SQ_ASCENV section).
# These match what the JDBC driver sends for a vanilla en_US.8859-1
# connection. Anything missing makes the server fall back to defaults.
_DEFAULT_ENV: dict[str, str] = {
    "DBPATH": ".",
    "CLIENT_LOCALE": "en_US.8859-1",
    "CLNT_PAM_CAPABLE": "1",
    "DBDATE": "Y4MD-",
    "IFX_UPDDESC": "1",
    "NODEFDAC": "no",
}


class Connection:
    """A SQLI session. Owns one TCP socket and the post-login state.

    Constructed via :func:`informix_db.connect`, not directly.
    """

    def __init__(
        self,
        host: str,
        port: int,
        user: str,
        password: str | None,
        database: str | None,
        server: str,
        *,
        connect_timeout: float | None = None,
        read_timeout: float | None = None,
        keepalive: bool = False,
        client_locale: str = "en_US.8859-1",
        env: dict[str, str] | None = None,
        autocommit: bool = False,  # honored from Phase 3 onward
    ):
        self._host = host
        self._port = port
        self._user = user
        self._database = database
        self._server = server
        self._client_locale = client_locale
        self._autocommit = autocommit
        self._closed = False
        self._lock = threading.Lock()

        # Build the env-var dict sent in the login PDU.
        self._env = dict(_DEFAULT_ENV)
        self._env["CLIENT_LOCALE"] = client_locale
        if env:
            self._env.update(env)

        self._sock = IfxSocket(
            host,
            port,
            connect_timeout=connect_timeout,
            read_timeout=read_timeout,
            keepalive=keepalive,
        )
        try:
            # The login PDU's database field is BROKEN: passing a db name
            # there makes the server reject subsequent SQ_DBOPEN with
            # sqlcode=-759. JDBC always sends NULL in the login PDU's database
            # slot and then opens the db via SQ_DBOPEN in the post-login init.
            # We do the same. The actual database opens in `_init_session`.
            login_pdu = self._build_login_pdu(user, password, login_database=None)
            self._sock.write_all(login_pdu)
            self._parse_login_response()

            # Post-login session init: protocol negotiation + (optional) DBOPEN.
            # Without this, the server silently drops PREPAREs and rejects DBOPEN;
            # see PROTOCOL_NOTES.md §6c for the discovery story.
            self._init_session()
        except Exception:
            self._sock.close()
            self._closed = True
            raise

    # -- public API surface (PEP 249-shaped, minimal in Phase 1) -----------

    @property
    def closed(self) -> bool:
        return self._closed

    def cursor(self) -> Cursor:
        """Return a new Cursor for executing SQL on this connection."""
        if self._closed:
            raise InterfaceError("connection is closed")
        return Cursor(self)

    def _send_pdu(self, pdu: bytes) -> None:
        """Send an assembled PDU. Used by Cursor."""
        if self._closed:
            raise InterfaceError("connection is closed")
        self._sock.write_all(pdu)

    def commit(self) -> None:
        """Commit the current transaction (SQ_CMMTWORK).

        In an unlogged database such as ``sysmaster`` the server answers
        with SQ_ERR sqlcode=-255 ("Not in transaction"), raised here as
        ``OperationalError``.
        """
        if self._closed:
            raise InterfaceError("connection is closed")
        # PDU: [short SQ_CMMTWORK=19][short SQ_EOT=12]
        self._sock.write_all(struct.pack("!hh", MessageType.SQ_CMMTWORK, MessageType.SQ_EOT))
        self._drain_to_eot()

    def rollback(self) -> None:
        """Roll back the current transaction (SQ_RBWORK).

        Like ``commit()``, raises ``OperationalError`` (sqlcode -255) in an
        unlogged database.
        """
        if self._closed:
            raise InterfaceError("connection is closed")
        # PDU: [short SQ_RBWORK=20][short SQ_EOT=12]
        self._sock.write_all(struct.pack("!hh", MessageType.SQ_RBWORK, MessageType.SQ_EOT))
        self._drain_to_eot()

    def close(self) -> None:
        """Send SQ_EXIT and tear down the socket. Idempotent."""
        with self._lock:
            if self._closed:
                return
            self._closed = True
            try:
                self._send_exit()
            finally:
                self._sock.close()

    def __enter__(self) -> Connection:
        return self

    def __exit__(self, *_exc: object) -> None:
        self.close()

    # -- post-login session init ------------------------------------------

    def _init_session(self) -> None:
        """Run the post-login session init dance.

        After login the server is in a 'connected but not initialized'
        state. Before any SELECT/DML works, it needs:

        1. ``SQ_PROTOCOLS``: feature-bitmap negotiation (the server
           ignores PREPAREs until this completes)
        2. ``SQ_INFO`` with ``INFO_ENV``: session env vars, replayed
           verbatim from the JDBC capture
        3. ``SQ_DBOPEN``: explicit database open (the login PDU's database
           field is advisory only; without DBOPEN the server returns
           sqlcode -759 on queries)

        We skip JDBC's additional SQ_ID(env vars) step for now; it does
        not appear strictly required for the basic SELECT path. Phase 2.x
        can re-add it if needed.
        """
        # Step 1: SQ_PROTOCOLS (feature-bitmap negotiation).
        # The 8-byte protocols mask is the JDBC reference value from
        # docs/CAPTURES/02-select-1.socat.log; we replay it verbatim
        # since the bits are opaque (server-recognized features).
        protocols_mask = bytes.fromhex("fffc7ffc3c8caa97")
        self._send_protocols(protocols_mask)
        self._drain_to_eot()

        # Step 2: SQ_INFO with INFO_ENV subtype + session env vars.
        # The actual on-wire format (from JDBC's sendEnv at IfxSqli.java
        # line 2990) is:
        #   [short SQ_INFO=81][short INFO_ENV=6][short totLen]
        #   [short LongNameLen][short LongValueLen]
        #   [for each env var: writeChar(name); writeChar(value)]
        #   [short 0][short 0]      # INFO_DONE markers
        #   [short SQ_EOT=12]
        # where each writeChar emits [short length][bytes][optional pad].
        #
        # We replay JDBC's exact 48-byte PDU verbatim. Decoded structure:
        #   00 51 00 06 00 26           SQ_INFO + INFO_ENV + totLen=38
        #   00 0c 00 04                 LongNameLen=12, LongValueLen=4
        #   00 06 "DBTEMP"              nameLen=6, "DBTEMP" (even, no pad)
        #   00 04 "/tmp"                valueLen=4, "/tmp" (even, no pad)
        #   00 0b "SUBQCACHESZ" 00      nameLen=11, name + 1-byte pad
        #   00 02 "10"                  valueLen=2, "10" (even, no pad)
        #   00 00 00 00                 INFO_DONE markers (two short 0s)
        #   00 0c                       SQ_EOT
        # Hex extracted directly from docs/CAPTURES/02-select-1.socat.log.
        self._sock.write_all(
            bytes.fromhex(
                "005100060026000c00040006444254454d5000042f746d70"
                "000b535542514341434845535a000002313000000000000c"
            )
        )
        self._drain_to_eot()

        # Step 3: SQ_DBOPEN, if the user requested a specific database.
        if self._database is not None:
            self._send_dbopen(self._database)
            self._drain_to_eot()

    def _send_protocols(self, protocols: bytes) -> None:
        """Emit a SQ_PROTOCOLS PDU per ``IfxSqli.sendProtocols``.

        Layout: ``[short SQ_PROTOCOLS=126][short payloadLen][bytes payload pad-even][short SQ_EOT]``
        """
        writer, buf = make_pdu_writer()
        writer.write_short(MessageType.SQ_PROTOCOLS)
        writer.write_short(len(protocols))
        writer.write_padded(protocols)
        writer.write_short(MessageType.SQ_EOT)
        self._sock.write_all(buf.getvalue())

    def _send_dbopen(self, database: str) -> None:
        """Emit a SQ_DBOPEN PDU per JDBC's executeOpenDatabase.

        Layout (from capture analysis):
        ``[short SQ_DBOPEN=36][short nameLen][bytes name][byte 0 if odd-len pad][short mode=0][short SQ_EOT]``
        """
        writer, buf = make_pdu_writer()
        writer.write_short(MessageType.SQ_DBOPEN)
        name_bytes = database.encode("iso-8859-1")
        writer.write_short(len(name_bytes))
        writer.write_padded(name_bytes)  # writes bytes + nul if odd
        writer.write_short(0)  # mode = 0 (default: read/write access)
        writer.write_short(MessageType.SQ_EOT)
        self._sock.write_all(buf.getvalue())

    def _drain_to_eot(self) -> None:
        """Read response messages until SQ_EOT, dispatching on tag.

        Used after each session-init step and after commit()/rollback().
        Raises ``OperationalError`` on SQ_ERR. Payloads we don't need are
        skipped after a best-effort length decode: the SQ_PROTOCOLS reply
        has its own length-prefixed format, and SQ_DONE carries
        warnings/rowcount/rowid/serial.
        """
        while True:
            tag = struct.unpack("!h", self._sock.read_exact(2))[0]
            if tag == MessageType.SQ_EOT:
                return
            elif tag == MessageType.SQ_PROTOCOLS:
                # ``[short payloadLen][bytes payload][byte 0 if odd-len pad]``
                # Then the loop continues and consumes the next tag (usually SQ_EOT).
                payload_len = struct.unpack("!h", self._sock.read_exact(2))[0]
                if payload_len > 0:
                    self._sock.read_exact(payload_len)
                    if payload_len & 1:
                        self._sock.read_exact(1)  # writePadded's even-alignment pad
            elif tag == MessageType.SQ_DONE:
                # [short warnings][int rows][int rowid][int serial]
                self._sock.read_exact(2 + 4 + 4 + 4)
            elif tag == 55:  # SQ_COST: server appends cost info; ignore
                # [int cost1][int cost2]
                self._sock.read_exact(4 + 4)
            elif tag == MessageType.SQ_ERR:
                self._raise_sq_err()
            else:
                # Unknown tag: fail loudly so we notice.
                raise OperationalError(
                    f"unexpected wire tag while draining response: 0x{tag:04x} ({tag})"
                )

    def _raise_sq_err(self) -> None:
        """Decode a SQ_ERR payload and raise OperationalError.

        Per ``IfxSqli.receiveError``:
        ``[short sqlcode][short isamcode][int statementOffset][...]``
        """
        sqlcode = struct.unpack("!h", self._sock.read_exact(2))[0]
        isamcode = struct.unpack("!h", self._sock.read_exact(2))[0]
        offset = struct.unpack("!i", self._sock.read_exact(4))[0]  # noqa: F841
        # Drain any remaining error payload (varies by sqlcode) until SQ_EOT.
        # Best-effort: read shorts and discard until we hit 0x000c.
        try:
            while True:
                next_tag = struct.unpack("!h", self._sock.read_exact(2))[0]
                if next_tag == MessageType.SQ_EOT:
                    break
        except OperationalError:
            pass
        raise OperationalError(f"server returned SQ_ERR sqlcode={sqlcode} isamcode={isamcode}")

    # -- login PDU assembly ------------------------------------------------

    def _build_login_pdu(
        self, user: str, password: str | None, *, login_database: str | None = None
    ) -> bytes:
        """Assemble the full client→server login PDU.

        Returns the SLheader (6 bytes) prepended to the PFheader payload.
        Layout per PROTOCOL_NOTES.md §3.
        """
        # Build the PFheader (variable-size body).
        pf_writer, pf_buf = make_pdu_writer()
        self._write_pf_payload(pf_writer, user, password, login_database=login_database)
        pf_bytes = pf_buf.getvalue()

        # Prepend the SLheader (6 bytes: total length, type, attr, opts).
        sl_writer, sl_buf = make_pdu_writer()
        sl_writer.write_short(len(pf_bytes) + 6)  # total PDU size incl. header
        sl_writer.write_byte(SLHeader.SLTYPE_CONREQ)  # 1 = connection request
        sl_writer.write_byte(SLHeader.PF_PROT_SQLI_0600)  # 60 = protocol version
        sl_writer.write_short(0)  # slOptions (0 in vanilla connect)
        return sl_buf.getvalue() + pf_bytes

    def _write_pf_payload(
        self,
        w: IfxStreamWriter,
        user: str,
        password: str | None,
        *,
        login_database: str | None = None,
    ) -> None:
        """Write the PFheader (binary login body), per PROTOCOL_NOTES §3b."""
        # Association markers
        w.write_short(LoginMarker.SQ_ASSOC)
        w.write_short(LoginMarker.SQ_ASCBINARY)
        w.write_int(61)  # observed magic (probably PF_PROT_SQLI_WITH_CSS)

        # Float-type identifier
        w.write_short(len(FLOAT_TYPE) + 1)
        w.write_bytes(FLOAT_TYPE)
        w.write_byte(0)

        # Binary parameters block start
        w.write_short(LoginMarker.SQ_ASCBPARMS)
        w.write_bytes(APPL_TYPE)  # 12 bytes "sqlexec\0\0\0\0\0", no length prefix

        # Client version (hardcoded "9.280")
        w.write_short(len(CLIENT_VERSION) + 1)
        w.write_bytes(CLIENT_VERSION)
        w.write_byte(0)

        # Client serial (hardcoded "RDS#R000000")
        w.write_short(len(CLIENT_SERIAL) + 1)
        w.write_bytes(CLIENT_SERIAL)
        w.write_byte(0)

        # Application ID
        w.write_short(len(APPL_ID) + 1)
        w.write_bytes(APPL_ID)
        w.write_byte(0)

        # Three negotiated capability ints (Cap_1, Cap_2, Cap_3)
        w.write_int(_DEFAULT_CAP_1)
        w.write_int(_DEFAULT_CAP_2)
        w.write_int(_DEFAULT_CAP_3)

        # ?? section: short 1 (purpose unknown; observed in capture)
        w.write_short(1)

        # Username + password, delegated to the auth handler
        _auth.write_plain_password(w, user, password)

        # Protocol & network identifiers
        w.write_bytes(PROT_SQLIOL)  # "ol\0\0\0\0\0\0"
        w.write_int(61)  # observed magic
        w.write_bytes(NET_TLITCP)  # "tlitcp\0\0"
        w.write_int(UTYPE_INTERNET)  # 1

        # Init-request marker block
        w.write_short(LoginMarker.SQ_ASCINITREQ)
        w.write_short(ASF_XCONNECT)
        w.write_int(StmtOptions.ASF_AMBIG_SEOL)  # 3 in vanilla connect

        # Server name
        w.write_string_with_nul(self._server)

        # Database: always None in the login PDU, per the JDBC behavior
        # documented in __init__. The user-supplied database opens via
        # SQ_DBOPEN in `_init_session`.
        if login_database is None:
            w.write_short(0)
        else:
            w.write_string_with_nul(login_database)

        # 4 reserved/empty option slots (8 bytes total)
        for _ in range(4):
            w.write_short(0)

        # Environment vars
        w.write_short(LoginMarker.SQ_ASCENV)
        w.write_short(len(self._env))
        for name, value in self._env.items():
            w.write_string_with_nul(name)
            w.write_string_with_nul(value)

        # Process info
        w.write_short(LoginMarker.SQ_ASCPINFO)
        w.write_int(0)  # reserved
        w.write_int(os.getpid() & 0x7FFFFFFF)
        # threading.get_ident() often exceeds signed 32-bit (on 64-bit
        # platforms it is typically a pointer-sized value); the JDBC reference
        # catches NumberFormatException and falls back to 0. We do the same;
        # the field is diagnostic only.
        tid = threading.get_ident()
        w.write_int(tid if 0 <= tid <= 0x7FFFFFFF else 0)
        hostname = socket_mod.gethostname() or "unknown"
        w.write_string_with_nul(hostname)
        w.write_short(0)  # reserved
        cwd = str(Path.cwd()) or ""
        w.write_string_with_nul(cwd)

        # AppName section (SQ_ASCMISC_60)
        appname = f"informix-db@pid{os.getpid()}"
        w.write_short(LoginMarker.SQ_ASCMISC_60)
        w.write_short(10 + len(appname) + 1)
        w.write_int(0)
        w.write_int(0)
        w.write_string_with_nul(appname)

        # End-of-PDU marker
        w.write_short(LoginMarker.SQ_ASCEOT)

    # -- response parsing -------------------------------------------------

    def _parse_login_response(self) -> None:
        """Read and parse the server's login response.

        Server returns either an SLTYPE_CONACC (success, with server
        version + capabilities) or an SLTYPE_CONREJ (rejection, with
        error block). We decode just enough to distinguish success from
        failure for Phase 1; the full response decode (server version,
        capabilities, etc.) lands as it becomes useful.
        """
        # First two bytes: total response length (including this field)
        length_bytes = self._sock.read_exact(2)
        total_length = struct.unpack("!h", length_bytes)[0]
        if total_length < 6:
            raise ProtocolError(f"login response too short: {total_length} bytes")

        # Read the rest of the SLheader + payload
        rest = self._sock.read_exact(total_length - 2)
        reader = IfxStreamReader(BytesIO(rest))
        sl_type = reader.read_byte()
        # sl_attribute and sl_options are read only to keep the stream aligned.
        sl_attribute = reader.read_byte()  # noqa: F841
        sl_options = reader.read_short()  # noqa: F841

        if sl_type == SLHeader.SLTYPE_CONREJ:
            # Rejection: pull out the error message if we can
            self._raise_from_rejection(reader)
        elif sl_type == SLHeader.SLTYPE_REDIRECT:
            raise OperationalError(
                "server sent a connection redirect; this driver doesn't "
                "follow redirects yet (Phase 6+)"
            )
        elif sl_type != SLHeader.SLTYPE_CONACC:
            raise ProtocolError(f"unknown SLType in login response: {sl_type}")
        # SLTYPE_CONACC: connection accepted. We don't (yet) decode the
        # full server-side metadata. Phase 1 just needs to know "we got in".

    def _raise_from_rejection(self, reader: IfxStreamReader) -> None:
        """Best-effort decode of the connection-rejection error block.

        Per PROTOCOL_NOTES.md §3c-d. For now this only checks the leading
        markers and raises a generic OperationalError; extracting the
        SQLCODE and message is deferred to Phase 5. A ProtocolError raised
        while reading is re-raised as OperationalError.
        """
        try:
            # Skip the SQ_ASSOC + SQ_ASCBINARY markers and the int 61 magic
            reader.skip(2 + 2 + 4)
            # Then there's a length-prefixed block we skip
            sub_length = reader.read_short()
            reader.skip(sub_length)
            # Then SQ_ASCBPARMS marker
            marker = reader.read_short()
            if marker != LoginMarker.SQ_ASCBPARMS:
                raise OperationalError("server rejected the connection (no decodable error block)")
            # Skip 12 bytes of fixed-position metadata, then the version
            # string, serial, applid, capabilities. We don't need any of
            # that on the failure path, so we just bail out with a clear
            # message. Phase 5 expands this to actually find the SQ_ASCINITRESP
            # block and pull svcError/osError/Warnings/errMsg.
            raise OperationalError("server rejected the connection")
        except ProtocolError as e:
            raise OperationalError(f"server rejected the connection: {e}") from e

    # -- disconnection ----------------------------------------------------

    def _send_exit(self) -> None:
        """Send the bare ``[short SQ_EXIT=56]`` disconnect message.

        Per PROTOCOL_NOTES.md §8. Server echoes back ``SQ_EXIT`` or
        ``SQ_EOT``; we read and discard. Errors are swallowed because
        we're already tearing down.
        """
        try:
            self._sock.write_all(struct.pack("!h", MessageType.SQ_EXIT))
            # Read the ack. The server may interleave one or more SQ_XACTSTAT
            # records, so loop until we see SQ_EXIT or SQ_EOT.
            for _ in range(8):  # bounded loop just in case
                tag = struct.unpack("!h", self._sock.read_exact(2))[0]
                if tag in (MessageType.SQ_EXIT, MessageType.SQ_EOT):
                    return
                # SQ_XACTSTAT (99) carries 6 bytes of status info we don't need.
                if tag == 99:
                    self._sock.read_exact(6)
                    continue
                # Unknown ack; bail out, we're closing anyway.
                return
        except (OperationalError, InterfaceError, OSError, ProtocolError):
            # Already closing; nothing to do but suppress.
            return
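
The hand-decoded layout in `_init_session`'s comments can be checked mechanically. Below is a minimal decoder sketch. It assumes the writeChar framing described there: a short length, the bytes, then one pad byte if the length is odd. It also treats the two trailing zero shorts (INFO_DONE) as an empty name/value pair that ends the list. `read_char` and `decode_info_env` are hypothetical helpers, not part of the driver.

```python
import struct

# The 48-byte SQ_INFO/INFO_ENV PDU that _init_session replays, copied verbatim.
CAPTURED = bytes.fromhex(
    "005100060026000c00040006444254454d5000042f746d70"
    "000b535542514341434845535a000002313000000000000c"
)


def read_char(buf: bytes, pos: int) -> tuple[str, int]:
    """Decode one writeChar field: [short len][bytes][1 pad byte if len is odd]."""
    (n,) = struct.unpack_from("!h", buf, pos)
    pos += 2
    text = buf[pos:pos + n].decode("iso-8859-1")
    return text, pos + n + (n & 1)


def decode_info_env(buf: bytes) -> dict:
    """Split the PDU into its header shorts, env pairs, and trailing tag."""
    sq_info, info_env, tot_len, name_len, value_len = struct.unpack_from("!5h", buf, 0)
    pos = 10
    env = {}
    while True:
        name, pos = read_char(buf, pos)
        value, pos = read_char(buf, pos)
        if not name:  # the two zero shorts (INFO_DONE) read as an empty pair
            break
        env[name] = value
    (eot,) = struct.unpack_from("!h", buf, pos)
    return {
        "header": (sq_info, info_env, tot_len, name_len, value_len),
        "env": env,
        "eot": eot,
        "consumed": pos + 2,
    }


print(decode_info_env(CAPTURED)["env"])  # {'DBTEMP': '/tmp', 'SUBQCACHESZ': '10'}
```

The sketch reads totLen (38) but does not recompute it, because the layout notes do not spell out exactly which bytes it counts.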