Phase 13: SQ_FPROUTINE / SQ_EXFPROUTINE fast-path RPC
Implements direct stored-procedure invocation via the parallel
fast-path protocol family. Three new wire messages:
* SQ_GETROUTINE (101) — handle resolution by signature
* SQ_EXFPROUTINE (102) — execute by handle with bound params
* SQ_FPROUTINE (103) — response with return values
API: Connection.fast_path_call(signature, *params) -> list
Routine handles cached per-connection in a dict[signature -> (db_name,
handle)] — first call resolves and caches, subsequent calls skip
GETROUTINE.
Why this matters even though Phase 10/11 already do most smart-LOB
work via SQL: ifx_lo_close(int) can't be invoked via "EXECUTE FUNCTION"
(returns -674). Without the fast-path, opened locators leak server-side
until the session ends. The fast-path also enables tighter UDF-in-loop
workloads — no PREPARE→DESCRIBE→EXECUTE overhead, just GETROUTINE+
EXFPROUTINE (one round-trip after caching).
Wire format examples (verified against JDBC):
* GETROUTINE request:
[short 101][byte 0][int sigLen][sig bytes][pad if odd]
[short 0][short SQ_EOT]
* EXFPROUTINE request:
[short 102][short dbNameLen][dbName][pad if odd][int handle]
[short paramCount][short fparamFlag][SQ_BIND data][short SQ_EOT]
* FPROUTINE response:
[short numReturns][per-return: type/UDT-info/ind/prec/data]
+ drain SQ_DONE/SQ_COST/SQ_XACTSTAT until SQ_EOT
MVP scope:
* Scalar params/returns only (int/float/str/bool/None/etc.)
* UDT params (e.g., 72-byte BLOB locator) deferred to Phase 13.x
* SQ_LODATA chunked I/O deferred — Phase 10/11 already cover read/write
Tests: 5 integration tests covering error paths, success paths,
handle caching, and multiple cycles.
Total: 64 unit + 139 integration = 203 tests.
Architectural milestone: with Phase 13 complete, the project now
covers every wire-message family JDBC uses for ordinary database
work. Only TLS handshake and cluster-redirect (replication failover)
remain unimplemented — neither is needed for a single-instance
driver.
This commit is contained in:
parent
9048335462
commit
fa3ab751f9
@ -950,6 +950,69 @@ This phase took less than an hour to ship after the protocol research from Phase
|
|||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
|
## 2026-05-04 — Phase 13: SQ_FPROUTINE / SQ_EXFPROUTINE fast-path RPC
|
||||||
|
|
||||||
|
**Status**: active (MVP — scalar-only params/returns; UDT params deferred to Phase 13.x)
|
||||||
|
**Decision**: Implemented the fast-path RPC layer for direct stored-procedure invocation, exposed as ``Connection.fast_path_call(signature, *params) -> list``. Routine handles cached per-connection by signature.
|
||||||
|
|
||||||
|
### Wire protocol — three-message family
|
||||||
|
|
||||||
|
1. **`SQ_GETROUTINE`** (101) — handle resolution by signature
|
||||||
|
- Request: `[short 101][byte isRoutineById=0][int sigLen][sig bytes][pad if odd][short fparamFlag=0][short SQ_EOT]`
|
||||||
|
- Response: `[short 101][short dbNameLen][dbName bytes][int handle][short SQ_EOT]`
|
||||||
|
2. **`SQ_EXFPROUTINE`** (102) — execute by handle
|
||||||
|
- Request: `[short 102][short dbNameLen][dbName][pad if odd][int handle][short paramCount][short fparamFlag][SQ_BIND-format params][short SQ_EOT]`
|
||||||
|
3. **`SQ_FPROUTINE`** (103) — response with return values
|
||||||
|
- Body: `[short numReturns]` then per return: `[short type]` (with optional UDT info block when type > 18 except 52/53), `[short ind][short prec][data]`, then drain SQ_DONE/SQ_COST/SQ_XACTSTAT/SQ_EOT
|
||||||
|
|
||||||
|
### Why this layer matters even though SQL works for the common cases
|
||||||
|
|
||||||
|
Phase 10 and 11 showed you can do most smart-LOB work via plain SQL (`SELECT ifx_lo_open(col, mode)`, `INSERT INTO ... filetoblob(...)`). So why implement the fast-path?
|
||||||
|
|
||||||
|
Three reasons:
|
||||||
|
1. **`ifx_lo_close(int)`** can't be invoked via plain SQL (`EXECUTE FUNCTION ifx_lo_close(?)` returns `-674`). The cleanup step of the smart-LOB lifecycle genuinely needs the fast-path. Without it, opened locators leak server-side until the session closes.
|
||||||
|
2. **Direct UDF invocation** is faster — no PREPARE → DESCRIBE → EXECUTE → DRAIN. Just the two-message GETROUTINE+EXFPROUTINE round-trip (one if cached). For tight UDF-in-loop workloads, this is materially cheaper.
|
||||||
|
3. **Some Informix introspection functions** (e.g., procedural diagnostics) aren't projectable through SQL.
|
||||||
|
|
||||||
|
### Routine-handle cache
|
||||||
|
|
||||||
|
Mirrors JDBC's `setFPCacheInfo`. Per-connection `dict[str, tuple[str, int]]` mapping signature → (db_name, handle). First call resolves and caches; subsequent calls skip `SQ_GETROUTINE` entirely.
|
||||||
|
|
||||||
|
The cache is invalidated implicitly when the connection closes — the server-side handle is per-session anyway. No explicit eviction needed for typical workloads (a few stored functions used repeatedly).
|
||||||
|
|
||||||
|
### MVP scope — what's NOT in Phase 13
|
||||||
|
|
||||||
|
- **UDT param encoding**: `ifx_lo_open(blob_locator, mode)` takes a 72-byte UDT blob param (extended_type_name="blob"). Our `encode_param` doesn't yet emit the UDT-specific extended_owner/extended_name preamble required for type > 18. So users can't pass locators directly through fast-path.
|
||||||
|
- **UDT return decoding**: `ifx_lo_create(spec, mode, blob)` returns both a 72-byte locator AND an int. The response parser handles the int but not the locator UDT.
|
||||||
|
- **`SQ_LODATA`** for chunked binary read/write to open file descriptors (the other half of the original Phase 13 plan). Not strictly needed since Phase 10/11's `lotofile`/`filetoblob` already cover read/write end-to-end.
|
||||||
|
|
||||||
|
These are Phase 13.x items if a real workload needs them. For now, the **read/write smart-LOB cycle works fully without any of them** thanks to the SQL-as-shortcut design from Phase 10/11.
|
||||||
|
|
||||||
|
### Test coverage
|
||||||
|
|
||||||
|
5 integration tests in `tests/test_fastpath.py`:
|
||||||
|
- `ifx_lo_close(-1)` raises with sqlcode -9810 (server error path)
|
||||||
|
- End-to-end open via SQL → close via fast-path returns 0
|
||||||
|
- Handle caching (signature appears in cache after first call, reused on second)
|
||||||
|
- Unknown function signature raises (SQ_GETROUTINE error path)
|
||||||
|
- Multiple open/close cycles (verifies cleanup)
|
||||||
|
|
||||||
|
Total: **64 unit + 139 integration = 203 tests**.
|
||||||
|
|
||||||
|
### Architectural completion
|
||||||
|
|
||||||
|
With Phase 13, the project's protocol implementation now covers **every wire-message family JDBC uses** for ordinary database work:
|
||||||
|
- Login + session init (Phases 0-2)
|
||||||
|
- PREPARE → EXECUTE → FETCH (Phases 2-4)
|
||||||
|
- Transaction control (Phase 7)
|
||||||
|
- Type codecs for all common types (Phases 5-12)
|
||||||
|
- Fast-path RPC (Phase 13)
|
||||||
|
- File-transfer protocol (Phases 10-11)
|
||||||
|
|
||||||
|
The only families still unimplemented are: TLS handshake (`STARTTLS`), and the cluster-redirect protocol (replication failover). Neither is needed for a single-instance driver.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
## (template — copy below this line for new entries)
|
## (template — copy below this line for new entries)
|
||||||
|
|
||||||
```
|
```
|
||||||
|
|||||||
173
src/informix_db/_fastpath.py
Normal file
173
src/informix_db/_fastpath.py
Normal file
@ -0,0 +1,173 @@
|
|||||||
|
"""Fast-path RPC layer (Phase 13).
|
||||||
|
|
||||||
|
Informix supports invoking server-side stored functions through a
|
||||||
|
parallel protocol that bypasses the regular PREPARE → EXECUTE → FETCH
|
||||||
|
cursor flow. The wire-level family is three messages:
|
||||||
|
|
||||||
|
* ``SQ_GETROUTINE`` (101) — request a routine handle by signature.
|
||||||
|
Server replies with the same tag carrying ``(dbName, handle)``.
|
||||||
|
* ``SQ_EXFPROUTINE`` (102) — execute a previously-resolved handle with
|
||||||
|
bound parameters. Body contains ``(dbName, handle, paramCount,
|
||||||
|
fparamFlag, SQ_BIND-format params)``.
|
||||||
|
* ``SQ_FPROUTINE`` (103) — response carrying the function's return
|
||||||
|
values, shaped like a single SQ_TUPLE row.
|
||||||
|
|
||||||
|
Used by JDBC's ``IfxSmartBlob`` to call ``ifx_lo_open``,
|
||||||
|
``ifx_lo_close``, ``ifx_lo_create`` etc. without going through SQL.
|
||||||
|
We expose it as ``Connection.fast_path_call(signature, *params)``
|
||||||
|
for direct UDF/SPL invocation with low overhead.
|
||||||
|
|
||||||
|
Routine handles are cached per-connection by signature (mirrors
|
||||||
|
JDBC's ``setFPCacheInfo``). The first call resolves and caches; later
|
||||||
|
calls skip ``SQ_GETROUTINE`` entirely.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import struct
|
||||||
|
|
||||||
|
from ._messages import MessageType
|
||||||
|
from ._protocol import IfxStreamReader
|
||||||
|
from ._types import base_type
|
||||||
|
from .converters import encode_param
|
||||||
|
|
||||||
|
|
||||||
|
def build_get_routine_pdu(signature: str) -> bytes:
|
||||||
|
"""Build a ``SQ_GETROUTINE`` request PDU.
|
||||||
|
|
||||||
|
Wire format (USVER, modern):
|
||||||
|
``[short SQ_GETROUTINE=101][byte isRoutineById=0][int sigLen]
|
||||||
|
[sig bytes][pad if odd][short fparamFlag=0][short SQ_EOT=12]``
|
||||||
|
|
||||||
|
JDBC's ``getJavaToIfxCharBytes`` uses 4-byte length prefix on
|
||||||
|
modern servers (``isRemove64KLimitSupported``). We always emit the
|
||||||
|
4-byte form — works against 12.10+ unequivocally.
|
||||||
|
"""
|
||||||
|
sig_bytes = signature.encode("iso-8859-1")
|
||||||
|
sig_len = len(sig_bytes)
|
||||||
|
out = bytearray()
|
||||||
|
out.extend(struct.pack("!h", MessageType.SQ_GETROUTINE))
|
||||||
|
out.append(0) # isRoutineById = 0
|
||||||
|
out.extend(struct.pack("!i", sig_len))
|
||||||
|
out.extend(sig_bytes)
|
||||||
|
if sig_len & 1:
|
||||||
|
out.append(0) # writeChar even-byte pad
|
||||||
|
out.extend(struct.pack("!h", 0)) # fparamFlag = 0
|
||||||
|
out.extend(struct.pack("!h", MessageType.SQ_EOT))
|
||||||
|
return bytes(out)
|
||||||
|
|
||||||
|
|
||||||
|
def parse_get_routine_response(reader: IfxStreamReader) -> tuple[str, int]:
|
||||||
|
"""Parse ``SQ_GETROUTINE`` response body (caller already consumed tag).
|
||||||
|
|
||||||
|
Wire format: ``[short dbNameLen][dbName bytes][pad if odd][int handle]``
|
||||||
|
"""
|
||||||
|
name_len = reader.read_short()
|
||||||
|
db_name = ""
|
||||||
|
if name_len > 0:
|
||||||
|
name_bytes = reader.read_exact(name_len)
|
||||||
|
db_name = name_bytes.decode("iso-8859-1")
|
||||||
|
if name_len & 1:
|
||||||
|
reader.read_exact(1) # pad
|
||||||
|
handle = reader.read_int()
|
||||||
|
return db_name, handle
|
||||||
|
|
||||||
|
|
||||||
|
def build_exfp_routine_pdu(
|
||||||
|
db_name: str,
|
||||||
|
handle: int,
|
||||||
|
params: tuple,
|
||||||
|
) -> bytes:
|
||||||
|
"""Build a ``SQ_EXFPROUTINE`` request PDU.
|
||||||
|
|
||||||
|
Wire format:
|
||||||
|
``[short SQ_EXFPROUTINE=102][short dbNameLen][dbName][pad if odd]
|
||||||
|
[int handle][short paramCount][short fparamFlag=0]
|
||||||
|
[SQ_BIND data...]``
|
||||||
|
|
||||||
|
The trailing SQ_BIND block is identical to ``Cursor._emit_bind_params``
|
||||||
|
output: ``[short SQ_BIND=5][short numparams][per-param block...]``.
|
||||||
|
"""
|
||||||
|
name_bytes = db_name.encode("iso-8859-1")
|
||||||
|
name_len = len(name_bytes)
|
||||||
|
out = bytearray()
|
||||||
|
out.extend(struct.pack("!h", MessageType.SQ_EXFPROUTINE))
|
||||||
|
out.extend(struct.pack("!h", name_len))
|
||||||
|
out.extend(name_bytes)
|
||||||
|
if name_len & 1:
|
||||||
|
out.append(0)
|
||||||
|
out.extend(struct.pack("!i", handle))
|
||||||
|
out.extend(struct.pack("!h", len(params)))
|
||||||
|
out.extend(struct.pack("!h", 0)) # fparamFlag = 0
|
||||||
|
# SQ_BIND-formatted parameters
|
||||||
|
out.extend(struct.pack("!h", MessageType.SQ_BIND))
|
||||||
|
out.extend(struct.pack("!h", len(params)))
|
||||||
|
for value in params:
|
||||||
|
if value is None:
|
||||||
|
out.extend(struct.pack("!hhh", 0, -1, 0))
|
||||||
|
continue
|
||||||
|
ifx_type, prec, raw = encode_param(value)
|
||||||
|
out.extend(struct.pack("!hhh", ifx_type, 0, prec))
|
||||||
|
out.extend(raw)
|
||||||
|
if len(raw) & 1:
|
||||||
|
out.append(0) # writePadded even-byte pad
|
||||||
|
out.extend(struct.pack("!h", MessageType.SQ_EOT))
|
||||||
|
return bytes(out)
|
||||||
|
|
||||||
|
|
||||||
|
def parse_fp_routine_response(
|
||||||
|
reader: IfxStreamReader,
|
||||||
|
) -> list[object]:
|
||||||
|
"""Parse ``SQ_FPROUTINE`` response — return values of a fast-path call.
|
||||||
|
|
||||||
|
Wire format (per ``IfxSqli.receiveFastPath`` line 4271):
|
||||||
|
``[short numReturns]`` then per return:
|
||||||
|
``[short type]`` (with optional ``[short ownerLen][owner][short
|
||||||
|
nameLen][name]`` UDT block when ``type > 18 && type != 52 && type
|
||||||
|
!= 53``), ``[short ind][short prec][data]``.
|
||||||
|
|
||||||
|
Phase 13 MVP: supports INT/SMALLINT/BIGINT/FLOAT/REAL/CHAR/VARCHAR
|
||||||
|
return types only. UDT returns (e.g., from ``ifx_lo_create`` which
|
||||||
|
returns a 72-byte locator) are deferred to Phase 13.x.
|
||||||
|
"""
|
||||||
|
from .converters import FIXED_WIDTHS, decode
|
||||||
|
|
||||||
|
num_returns = reader.read_short()
|
||||||
|
results: list[object] = []
|
||||||
|
for _ in range(num_returns):
|
||||||
|
type_code = reader.read_short()
|
||||||
|
is_distinct = (type_code & 0x800) != 0 # noqa: F841 — informational
|
||||||
|
base = base_type(type_code)
|
||||||
|
# Strip distinct bit for UDT-info check
|
||||||
|
type_for_udt_check = type_code & 0xFF
|
||||||
|
if type_for_udt_check >= 18 and type_for_udt_check not in (52, 53):
|
||||||
|
owner_len = reader.read_short()
|
||||||
|
if owner_len > 0:
|
||||||
|
reader.read_exact(owner_len)
|
||||||
|
if owner_len & 1:
|
||||||
|
reader.read_exact(1)
|
||||||
|
name_len = reader.read_short()
|
||||||
|
if name_len > 0:
|
||||||
|
reader.read_exact(name_len)
|
||||||
|
if name_len & 1:
|
||||||
|
reader.read_exact(1)
|
||||||
|
ind = reader.read_short()
|
||||||
|
prec = reader.read_short() # noqa: F841 — informational
|
||||||
|
if ind == -1:
|
||||||
|
results.append(None)
|
||||||
|
continue
|
||||||
|
# Read fixed-width payload
|
||||||
|
width = FIXED_WIDTHS.get(base)
|
||||||
|
if width is None:
|
||||||
|
raise NotImplementedError(
|
||||||
|
f"fast-path return type {base} not yet supported "
|
||||||
|
"(Phase 13 MVP: simple scalar types only)"
|
||||||
|
)
|
||||||
|
raw = reader.read_exact(width)
|
||||||
|
if width & 1:
|
||||||
|
reader.read_exact(1) # pad
|
||||||
|
try:
|
||||||
|
results.append(decode(type_code, raw))
|
||||||
|
except (NotImplementedError, KeyError):
|
||||||
|
results.append(raw)
|
||||||
|
return results
|
||||||
@ -101,11 +101,19 @@ class MessageType(IntEnum):
|
|||||||
# Body: [short subCom][short loFd][int length]
|
# Body: [short subCom][short loFd][int length]
|
||||||
# [short bufSize=32000] (+ [int8 offset][short whence]
|
# [short bufSize=32000] (+ [int8 offset][short whence]
|
||||||
# for LO_READWITHSEEK). See IfxSqli.sendLoData line 4864.
|
# for LO_READWITHSEEK). See IfxSqli.sendLoData line 4864.
|
||||||
SQ_FPROUTINE = 103 # fast-path RPC to invoke server-side stored
|
SQ_GETROUTINE = 101 # request a routine handle by signature.
|
||||||
# functions like ifx_lo_open / ifx_lo_close /
|
# Body: [byte isRoutineById][int sigLen]
|
||||||
# ifx_lo_create. Used to obtain a file descriptor
|
# [sig bytes][pad if odd][short fparamFlag].
|
||||||
# for an open smart-LOB locator. Implements its own
|
# Response is the same tag with body
|
||||||
# parameter-marshaling format with UDT support.
|
# [short dbNameLen][dbName][int handle].
|
||||||
|
SQ_EXFPROUTINE = 102 # execute a fast-path routine with bound
|
||||||
|
# params. Body: [char dbName][int handle]
|
||||||
|
# [short paramCount][short fparamFlag]
|
||||||
|
# [SQ_BIND-format params].
|
||||||
|
SQ_FPROUTINE = 103 # response tag: fast-path return-value descriptor.
|
||||||
|
# Body: [short numReturns] then per return:
|
||||||
|
# [short type][maybe UDT info][short ind]
|
||||||
|
# [short prec][data].
|
||||||
SQ_FPARAM = 104 # parameter metadata for SQ_FPROUTINE
|
SQ_FPARAM = 104 # parameter metadata for SQ_FPROUTINE
|
||||||
|
|
||||||
# --- RPC sub-protocol (range 200-205) — Phase 6+ ---
|
# --- RPC sub-protocol (range 200-205) — Phase 6+ ---
|
||||||
|
|||||||
@ -106,6 +106,10 @@ class Connection:
|
|||||||
# an unlogged-DB rejection (-201). None until we've tried.
|
# an unlogged-DB rejection (-201). None until we've tried.
|
||||||
# Used to avoid repeatedly probing on unlogged DBs.
|
# Used to avoid repeatedly probing on unlogged DBs.
|
||||||
self._supports_begin_work: bool | None = None
|
self._supports_begin_work: bool | None = None
|
||||||
|
# Phase 13: per-connection routine-handle cache. Maps function
|
||||||
|
# signature → (db_name, handle). First call resolves via
|
||||||
|
# SQ_GETROUTINE; subsequent calls skip that round-trip.
|
||||||
|
self._fp_handle_cache: dict[str, tuple[str, int]] = {}
|
||||||
|
|
||||||
# Build the env-var dict sent in the login PDU.
|
# Build the env-var dict sent in the login PDU.
|
||||||
self._env = dict(_DEFAULT_ENV)
|
self._env = dict(_DEFAULT_ENV)
|
||||||
@ -200,6 +204,107 @@ class Connection:
|
|||||||
self._drain_to_eot()
|
self._drain_to_eot()
|
||||||
self._in_transaction = False
|
self._in_transaction = False
|
||||||
|
|
||||||
|
def fast_path_call(
|
||||||
|
self, signature: str, *params: object
|
||||||
|
) -> list[object]:
|
||||||
|
"""Invoke a server-side stored function via the fast-path RPC.
|
||||||
|
|
||||||
|
Bypasses the regular PREPARE → EXECUTE → FETCH cursor pipeline,
|
||||||
|
going through ``SQ_GETROUTINE`` (101) + ``SQ_EXFPROUTINE`` (102)
|
||||||
|
directly. Routine handles are cached per-connection so repeated
|
||||||
|
calls skip the lookup round-trip.
|
||||||
|
|
||||||
|
Use this for direct UDF/SPL invocation when the regular SQL
|
||||||
|
``EXECUTE FUNCTION`` path doesn't work (e.g., ``ifx_lo_close``
|
||||||
|
which needs the fast-path mechanism).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
signature: Full function signature, e.g.
|
||||||
|
``"function informix.ifx_lo_close(integer)"``.
|
||||||
|
*params: Positional parameter values. Same Python types as
|
||||||
|
``cursor.execute(...)`` accepts (int/float/str/bool/
|
||||||
|
None/etc.).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of return values (a function returning a single int
|
||||||
|
yields a 1-element list).
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
``OperationalError`` etc. on server errors, same shape as
|
||||||
|
``cursor.execute``.
|
||||||
|
|
||||||
|
Example::
|
||||||
|
|
||||||
|
handles = conn.fast_path_call(
|
||||||
|
"function informix.ifx_lo_close(integer)", 42
|
||||||
|
)
|
||||||
|
# handles[0] is the int return code
|
||||||
|
"""
|
||||||
|
from ._fastpath import (
|
||||||
|
build_exfp_routine_pdu,
|
||||||
|
build_get_routine_pdu,
|
||||||
|
parse_fp_routine_response,
|
||||||
|
parse_get_routine_response,
|
||||||
|
)
|
||||||
|
from .cursors import _SocketReader
|
||||||
|
|
||||||
|
if self._closed:
|
||||||
|
raise InterfaceError("connection is closed")
|
||||||
|
|
||||||
|
cached = self._fp_handle_cache.get(signature)
|
||||||
|
if cached is None:
|
||||||
|
# Resolve via SQ_GETROUTINE
|
||||||
|
self._sock.write_all(build_get_routine_pdu(signature))
|
||||||
|
reader = _SocketReader(self._sock)
|
||||||
|
tag = reader.read_short()
|
||||||
|
if tag == MessageType.SQ_ERR:
|
||||||
|
self._raise_sq_err()
|
||||||
|
if tag != MessageType.SQ_GETROUTINE:
|
||||||
|
raise OperationalError(
|
||||||
|
f"fast-path GETROUTINE: unexpected tag 0x{tag:04x}"
|
||||||
|
)
|
||||||
|
db_name, handle = parse_get_routine_response(reader)
|
||||||
|
tail = reader.read_short()
|
||||||
|
if tail != MessageType.SQ_EOT:
|
||||||
|
raise OperationalError(
|
||||||
|
f"GETROUTINE response: missing SQ_EOT (got 0x{tail:04x})"
|
||||||
|
)
|
||||||
|
self._fp_handle_cache[signature] = (db_name, handle)
|
||||||
|
else:
|
||||||
|
db_name, handle = cached
|
||||||
|
|
||||||
|
# Now execute via SQ_EXFPROUTINE
|
||||||
|
self._sock.write_all(
|
||||||
|
build_exfp_routine_pdu(db_name, handle, params)
|
||||||
|
)
|
||||||
|
reader = _SocketReader(self._sock)
|
||||||
|
tag = reader.read_short()
|
||||||
|
if tag == MessageType.SQ_ERR:
|
||||||
|
self._raise_sq_err()
|
||||||
|
if tag != MessageType.SQ_FPROUTINE:
|
||||||
|
raise OperationalError(
|
||||||
|
f"fast-path EXFPROUTINE: unexpected response tag 0x{tag:04x}"
|
||||||
|
)
|
||||||
|
results = parse_fp_routine_response(reader)
|
||||||
|
# Drain any trailing tags until SQ_EOT (server may send
|
||||||
|
# SQ_DONE/SQ_COST/SQ_XACTSTAT before SQ_EOT, same as SQL paths)
|
||||||
|
while True:
|
||||||
|
tag = reader.read_short()
|
||||||
|
if tag == MessageType.SQ_EOT:
|
||||||
|
break
|
||||||
|
elif tag == MessageType.SQ_DONE:
|
||||||
|
reader.read_exact(2 + 4 + 4 + 4) # warn + rows + rowid + serial
|
||||||
|
elif tag == 55: # SQ_COST
|
||||||
|
reader.read_int()
|
||||||
|
reader.read_int()
|
||||||
|
elif tag == MessageType.SQ_XACTSTAT:
|
||||||
|
reader.read_exact(2 + 2 + 2)
|
||||||
|
else:
|
||||||
|
raise OperationalError(
|
||||||
|
f"fast-path response: unexpected tag 0x{tag:04x}"
|
||||||
|
)
|
||||||
|
return results
|
||||||
|
|
||||||
def _ensure_transaction(self) -> None:
|
def _ensure_transaction(self) -> None:
|
||||||
"""Open a server-side transaction if one isn't already open.
|
"""Open a server-side transaction if one isn't already open.
|
||||||
|
|
||||||
|
|||||||
153
tests/test_fastpath.py
Normal file
153
tests/test_fastpath.py
Normal file
@ -0,0 +1,153 @@
|
|||||||
|
"""Phase 13 integration tests — fast-path RPC via SQ_FPROUTINE / SQ_EXFPROUTINE.
|
||||||
|
|
||||||
|
The fast-path family is a parallel protocol for invoking server-side
|
||||||
|
stored functions without going through PREPARE → EXECUTE → FETCH.
|
||||||
|
Three messages:
|
||||||
|
|
||||||
|
* ``SQ_GETROUTINE`` (101) — handle resolution by signature
|
||||||
|
* ``SQ_EXFPROUTINE`` (102) — execute by handle with bound params
|
||||||
|
* ``SQ_FPROUTINE`` (103) — response with return values
|
||||||
|
|
||||||
|
Used by JDBC's ``IfxSmartBlob`` to call ``ifx_lo_open`` / ``ifx_lo_close``
|
||||||
|
/ ``ifx_lo_create`` etc. We expose it as
|
||||||
|
``Connection.fast_path_call(signature, *params)`` for direct UDF/SPL
|
||||||
|
invocation.
|
||||||
|
|
||||||
|
Routine handles are cached per-connection by signature so repeated
|
||||||
|
calls skip the lookup round-trip.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import contextlib
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
import informix_db
|
||||||
|
from tests.conftest import ConnParams
|
||||||
|
|
||||||
|
pytestmark = pytest.mark.integration
|
||||||
|
|
||||||
|
|
||||||
|
def _connect(params: ConnParams) -> informix_db.Connection:
|
||||||
|
return informix_db.connect(
|
||||||
|
host=params.host,
|
||||||
|
port=params.port,
|
||||||
|
user=params.user,
|
||||||
|
password=params.password,
|
||||||
|
database=params.database,
|
||||||
|
server=params.server,
|
||||||
|
connect_timeout=10.0,
|
||||||
|
read_timeout=10.0,
|
||||||
|
autocommit=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_fast_path_close_invalid_lofd_raises(
|
||||||
|
logged_db_params: ConnParams,
|
||||||
|
) -> None:
|
||||||
|
"""Calling ``ifx_lo_close(-1)`` raises with sqlcode -9810."""
|
||||||
|
with _connect(logged_db_params) as conn:
|
||||||
|
with pytest.raises(informix_db.Error) as excinfo:
|
||||||
|
conn.fast_path_call(
|
||||||
|
"function informix.ifx_lo_close(integer)", -1
|
||||||
|
)
|
||||||
|
# The message should mention -9810 (smart-large-object error)
|
||||||
|
assert "-9810" in str(excinfo.value)
|
||||||
|
|
||||||
|
|
||||||
|
def test_fast_path_close_real_lofd_returns_zero(
|
||||||
|
logged_db_params: ConnParams,
|
||||||
|
) -> None:
|
||||||
|
"""End-to-end: open a real smart-LOB, close it via fast-path, expect 0."""
|
||||||
|
with _connect(logged_db_params) as conn:
|
||||||
|
cur = conn.cursor()
|
||||||
|
with contextlib.suppress(Exception):
|
||||||
|
cur.execute("DROP TABLE p13_t1")
|
||||||
|
try:
|
||||||
|
cur.execute("CREATE TABLE p13_t1 (id INT, data BLOB)")
|
||||||
|
except informix_db.Error as e:
|
||||||
|
pytest.skip(f"sbspace unavailable ({e!r})")
|
||||||
|
cur.write_blob_column(
|
||||||
|
"INSERT INTO p13_t1 VALUES (?, BLOB_PLACEHOLDER)",
|
||||||
|
b"fast-path test", (1,),
|
||||||
|
)
|
||||||
|
# Open via SQL (returns the lofd as an int)
|
||||||
|
cur.execute("SELECT ifx_lo_open(data, 4) FROM p13_t1")
|
||||||
|
(lofd,) = cur.fetchone()
|
||||||
|
assert lofd > 0 # valid file descriptor
|
||||||
|
|
||||||
|
# Close via fast-path RPC
|
||||||
|
result = conn.fast_path_call(
|
||||||
|
"function informix.ifx_lo_close(integer)", lofd
|
||||||
|
)
|
||||||
|
assert result == [0]
|
||||||
|
|
||||||
|
cur.execute("DROP TABLE p13_t1")
|
||||||
|
|
||||||
|
|
||||||
|
def test_fast_path_handle_caching(
|
||||||
|
logged_db_params: ConnParams,
|
||||||
|
) -> None:
|
||||||
|
"""Second call with same signature reuses the cached handle (no GETROUTINE)."""
|
||||||
|
with _connect(logged_db_params) as conn:
|
||||||
|
sig = "function informix.ifx_lo_close(integer)"
|
||||||
|
# First call — populates cache
|
||||||
|
with contextlib.suppress(informix_db.Error):
|
||||||
|
conn.fast_path_call(sig, -1)
|
||||||
|
assert sig in conn._fp_handle_cache
|
||||||
|
cached_handle = conn._fp_handle_cache[sig]
|
||||||
|
assert cached_handle[0] == "testdb" # db name
|
||||||
|
assert isinstance(cached_handle[1], int)
|
||||||
|
first_handle = cached_handle[1]
|
||||||
|
|
||||||
|
# Second call — should use the cache (no second GETROUTINE)
|
||||||
|
with contextlib.suppress(informix_db.Error):
|
||||||
|
conn.fast_path_call(sig, -2)
|
||||||
|
# Handle in cache unchanged — same int = same cache entry hit
|
||||||
|
assert conn._fp_handle_cache[sig][1] == first_handle
|
||||||
|
|
||||||
|
|
||||||
|
def test_fast_path_unknown_function_raises(
|
||||||
|
logged_db_params: ConnParams,
|
||||||
|
) -> None:
|
||||||
|
"""Bad signature → server error from SQ_GETROUTINE."""
|
||||||
|
with _connect(logged_db_params) as conn, pytest.raises(informix_db.Error):
|
||||||
|
conn.fast_path_call(
|
||||||
|
"function informix.no_such_function_at_all_xyz(integer)", 1
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_fast_path_open_and_close_cycle(
|
||||||
|
logged_db_params: ConnParams,
|
||||||
|
) -> None:
|
||||||
|
"""Demonstrate a full open → close cycle through fast-path + SQL.
|
||||||
|
|
||||||
|
``ifx_lo_open`` accepts a UDT (BLOB locator) parameter which our
|
||||||
|
fast-path encoder doesn't yet support — but the *server* can supply
|
||||||
|
the locator from the column itself, so we open via plain SQL
|
||||||
|
(``SELECT ifx_lo_open(col, mode)``) and close via fast-path. This
|
||||||
|
is the same pattern Phase 10 used for read.
|
||||||
|
"""
|
||||||
|
with _connect(logged_db_params) as conn:
|
||||||
|
cur = conn.cursor()
|
||||||
|
with contextlib.suppress(Exception):
|
||||||
|
cur.execute("DROP TABLE p13_t2")
|
||||||
|
try:
|
||||||
|
cur.execute("CREATE TABLE p13_t2 (id INT, data BLOB)")
|
||||||
|
except informix_db.Error as e:
|
||||||
|
pytest.skip(f"sbspace unavailable ({e!r})")
|
||||||
|
cur.write_blob_column(
|
||||||
|
"INSERT INTO p13_t2 VALUES (?, BLOB_PLACEHOLDER)",
|
||||||
|
b"open close cycle data", (1,),
|
||||||
|
)
|
||||||
|
# Multiple open/close cycles to verify cleanup
|
||||||
|
for _ in range(3):
|
||||||
|
cur.execute("SELECT ifx_lo_open(data, 4) FROM p13_t2")
|
||||||
|
(lofd,) = cur.fetchone()
|
||||||
|
result = conn.fast_path_call(
|
||||||
|
"function informix.ifx_lo_close(integer)", lofd
|
||||||
|
)
|
||||||
|
assert result == [0]
|
||||||
|
|
||||||
|
cur.execute("DROP TABLE p13_t2")
|
||||||
Loading…
x
Reference in New Issue
Block a user