Phase 7: real transaction semantics on logged databases
Introduces driver-managed transactions that work seamlessly across logged and unlogged databases. The user calls commit() and rollback() without needing to know which kind they're hitting — the connection tracks transaction state internally. Three protocol facts came out of integration testing: 1. Logged DBs in non-ANSI mode require an explicit SQ_BEGIN before the first DML — the server doesn't auto-open a transaction. Connection._ensure_transaction() sends SQ_BEGIN lazily and is idempotent within an open txn. After commit/rollback, the next DML triggers a fresh BEGIN. 2. SQ_RBWORK has a [short savepoint=0] payload before the SQ_EOT framing tag — sending SQ_RBWORK alone causes the server to hang silently (waiting for the missing 2 bytes). SQ_CMMTWORK has no payload. This is the same pattern as the SHORT-vs-INT bug from Phase 4.x and the 2-byte length prefix from Phase 6.c — when the server hangs, it's an incomplete PDU body. 3. SQ_XACTSTAT (tag 99) is a logged-DB-only message that's interleaved with normal responses. Now drained in all four response-reading paths: cursor _drain_to_eot, _read_describe_ response, _read_fetch_response, and connection _drain_to_eot. For unlogged DBs (e.g., sysmaster), SQ_BEGIN returns -201 and we cache that result so subsequent DML doesn't re-probe. commit() and rollback() are silent no-ops in that case — same client code works across both DB modes. Tests: * New tests/test_transactions.py — 10 integration tests covering commit visibility, rollback isolation, multi-row rollback, partial commit-then-rollback, autocommit behavior, cross-connection durability, UPDATE/DELETE rollback, implicit per-statement txn. * conftest.py auto-creates testdb (logged) for the suite. * Two old tests rewritten to assert new no-op behavior on unlogged DBs (test_commit_rollback_in_unlogged_db_is_noop, test_commit_in_unlogged_db_is_noop). Total: 53 unit + 98 integration = 151 tests. The Phase 3 "gate test" (test_rollback_hides_insert) — a rolled-back INSERT must be invisible to subsequent SELECTs in the same session — now passes against a real logged database for the first time.
This commit is contained in:
parent
f546f951c8
commit
1c19c71cb6
@ -529,6 +529,66 @@ The constants `SQ_BBIND=41`, `SQ_BLOB=39`, `SQ_FETCHBLOB=38`, `SQ_SBBIND=52`, `S
|
||||
|
||||
---
|
||||
|
||||
## 2026-05-04 — Phase 7: real transaction semantics on logged databases
|
||||
|
||||
**Status**: active
|
||||
**Decision**: The driver now manages transactions implicitly on logged databases. Three protocol facts came out of integration testing that materially shaped the implementation:
|
||||
|
||||
### Fact 1: SQ_BEGIN is REQUIRED before the first DML in a logged-DB transaction
|
||||
|
||||
Informix in non-ANSI mode does NOT auto-open a server-side transaction on the first DML. Without an explicit ``SQ_BEGIN`` (tag 35), the server treats each statement as if it's already in some implicit txn (data is visible after the INSERT) but ``COMMIT WORK`` afterward fails with sqlcode -255 ("Not in transaction"). The "INSERT then COMMIT" sequence appears to work for visibility but the COMMIT-as-no-op is broken in a way that violates user expectations.
|
||||
|
||||
**Solution**: ``Connection._ensure_transaction()`` is called by ``Cursor.execute()`` and ``Cursor.executemany()`` before sending PREPARE. It sends ``SQ_BEGIN`` if no transaction is currently open. Idempotent within an open txn. After ``commit()``/``rollback()``, ``_in_transaction`` is reset to ``False`` so the NEXT DML triggers a fresh ``SQ_BEGIN``.
|
||||
|
||||
For unlogged databases, ``SQ_BEGIN`` returns sqlcode -201 ("BEGIN WORK requires logged DB"). We **cache that result** on the connection (``_supports_begin_work=False``) so subsequent DML doesn't re-probe. This means the same client code works seamlessly on logged or unlogged DBs without the user having to know which they're hitting.
|
||||
|
||||
### Fact 2: SQ_RBWORK has a savepoint short payload — SQ_CMMTWORK does not
|
||||
|
||||
Reading ``IfxSqli.sendRollback`` (line 647) revealed that ``SQ_RBWORK`` (tag 20) is followed by ``[short savepoint=0]`` BEFORE the ``SQ_EOT`` framing tag. Without that 2-byte payload, the server **silently hangs** waiting for it — no error, no timeout, just a stuck socket read.
|
||||
|
||||
This caused a confusing 30-second test timeout on the first integration run. The fix is one line:
|
||||
|
||||
```python
|
||||
self._sock.write_all(struct.pack("!hhh", SQ_RBWORK, 0, SQ_EOT))
|
||||
```
|
||||
|
||||
``SQ_CMMTWORK`` (tag 19), by contrast, has no payload — it's just the tag followed by SQ_EOT.
|
||||
|
||||
**Lesson**: same pattern as the SHORT-vs-INT field in CURNAME+NFETCH (Phase 4.x) and the 2-byte length prefix in DECIMAL/DATETIME/INTERVAL bind data (Phase 6.c+). When the server hangs, **it's almost always an incomplete PDU body** — the server is waiting for bytes you didn't send. Compare your bytes to JDBC's, byte-by-byte.
|
||||
|
||||
### Fact 3: SQ_XACTSTAT (tag 99) is a logged-DB-only message
|
||||
|
||||
Logged databases emit ``SQ_XACTSTAT`` (tag 99) interleaved with normal DML responses to inform the client of transaction-state events. Body: ``[short xcEvent][short xcNewLevel][short xcOldLevel]``. We don't surface these events to the user (yet) but must drain them in **every** response-reading path: ``_drain_to_eot`` (used by commit, rollback, DML), ``_read_describe_response`` (PREPARE response), ``_read_fetch_response`` (NFETCH response), and the connection-level ``_drain_to_eot`` (used by SQ_BEGIN, session init).
|
||||
|
||||
Without handling SQ_XACTSTAT in all four paths, the cursor desynchronizes from the wire stream and the next read pulls garbage tags (which then raise "unexpected tag" errors that hide the real cause).
|
||||
|
||||
### Cross-connection isolation tests are config-dependent — don't bake them in
|
||||
|
||||
The original test plan included a cross-connection visibility test ("conn A inserts, conn B reads zero rows before commit, then sees one row after"). Informix's default isolation is **Committed Read with row-level locking**, so conn B's SELECT *blocks* on the unlocked row rather than returning zero. With ``LOCK MODE NOT WAIT`` (the default), this surfaces as sqlcode -252 (lock timeout) immediately. With ``LOCK MODE WAIT N``, it waits N seconds.
|
||||
|
||||
Either behavior is correct under Informix semantics — the test would just be testing the lock manager, not transaction visibility. We removed that test and replaced it with the simpler ``test_committed_data_visible_to_fresh_connection`` which proves durability across connections without engaging the lock manager.
|
||||
|
||||
### Test coverage delivered
|
||||
|
||||
10 transaction tests in ``tests/test_transactions.py``, all passing against the auto-created ``testdb`` logged database:
|
||||
|
||||
- Commit visibility (single connection)
|
||||
- Rollback isolation — the "Phase 3 gate" test
|
||||
- Multi-row rollback
|
||||
- Partial-commit-then-rollback
|
||||
- Autocommit semantics (persists, rollback no-op)
|
||||
- Cross-connection durability
|
||||
- UPDATE+rollback, DELETE+rollback
|
||||
- Implicit per-statement transaction
|
||||
|
||||
The ``conftest.py::_ensure_testdb`` fixture auto-creates ``testdb WITH LOG`` if missing, so the tests work on a fresh dev container provided ``blobspace1`` and ``sbspace1`` exist (created during Phase 6.f research).
|
||||
|
||||
### Two old tests retired
|
||||
|
||||
``test_commit_rollback_in_unlogged_db_raises`` and ``test_commit_in_unlogged_db_is_operational_error`` were written assuming commit() on an unlogged DB raised -255. The Phase 7 driver-side smarts now make those calls a silent no-op (the connection knows there's no open txn). Both tests were rewritten to assert the new (better) behavior. PEP 249 doesn't mandate any specific behavior for unsupported operations; "graceful no-op" matches what most modern drivers do.
|
||||
|
||||
---
|
||||
|
||||
## (template — copy below this line for new entries)
|
||||
|
||||
```
|
||||
|
||||
@ -41,6 +41,9 @@ class MessageType(IntEnum):
|
||||
SQ_ERR = 13 # error response from the server
|
||||
SQ_TUPLE = 14 # one row of result data
|
||||
SQ_DONE = 15 # statement / result-set completion
|
||||
SQ_XACTSTAT = 99 # transaction-state event (logged DBs only). Body:
|
||||
# ``[short xcEvent][short xcNewLevel][short xcOldLevel]``. See
|
||||
# ``IfxSqli.receiveXactstat`` and the Phase 7 DECISION_LOG entry.
|
||||
|
||||
# --- Transactions ---
|
||||
SQ_CMMTWORK = 19
|
||||
|
||||
@ -96,6 +96,16 @@ class Connection:
|
||||
self._autocommit = autocommit
|
||||
self._closed = False
|
||||
self._lock = threading.Lock()
|
||||
# Logged-DB transaction state: True iff there's an open server-side
|
||||
# transaction (SQ_BEGIN sent, not yet committed/rolled-back). The
|
||||
# cursor uses this to decide whether to send an implicit SQ_BEGIN
|
||||
# before the next DML in non-autocommit mode. We default to "no
|
||||
# open txn" — the first DML will trigger SQ_BEGIN.
|
||||
self._in_transaction = False
|
||||
# Tri-state: True after first successful SQ_BEGIN, False after
|
||||
# an unlogged-DB rejection (-201). None until we've tried.
|
||||
# Used to avoid repeatedly probing on unlogged DBs.
|
||||
self._supports_begin_work: bool | None = None
|
||||
|
||||
# Build the env-var dict sent in the login PDU.
|
||||
self._env = dict(_DEFAULT_ENV)
|
||||
@ -148,20 +158,75 @@ class Connection:
|
||||
self._sock.write_all(pdu)
|
||||
|
||||
def commit(self) -> None:
|
||||
"""Commit the current transaction (SQ_CMMTWORK)."""
|
||||
"""Commit the current transaction (SQ_CMMTWORK).
|
||||
|
||||
On unlogged DBs, this is a server-side no-op (the server returns
|
||||
a SQ_DONE with no error). On logged DBs without ANSI mode, this
|
||||
closes the transaction opened by the most recent ``_ensure_transaction``
|
||||
call — typically the first DML since the last commit/rollback.
|
||||
"""
|
||||
if self._closed:
|
||||
raise InterfaceError("connection is closed")
|
||||
# PDU: [short SQ_CMMTWORK=19][short SQ_EOT=12]
|
||||
if not self._in_transaction:
|
||||
# No server-side transaction is open — sending COMMIT WORK
|
||||
# would yield -255 ("Not in transaction"). On unlogged DBs
|
||||
# we can't tell from the client whether we're in a txn; we
|
||||
# only set the flag after a successful BEGIN, so this branch
|
||||
# also covers "no DML happened since last commit/rollback".
|
||||
return
|
||||
self._sock.write_all(struct.pack("!hh", MessageType.SQ_CMMTWORK, MessageType.SQ_EOT))
|
||||
self._drain_to_eot()
|
||||
self._in_transaction = False
|
||||
|
||||
def rollback(self) -> None:
|
||||
"""Roll back the current transaction (SQ_RBWORK)."""
|
||||
"""Roll back the current transaction (SQ_RBWORK).
|
||||
|
||||
Like ``commit``, this is a no-op when no server-side transaction
|
||||
is open. On logged DBs, this discards all DML since the last
|
||||
commit/begin.
|
||||
"""
|
||||
if self._closed:
|
||||
raise InterfaceError("connection is closed")
|
||||
# PDU: [short SQ_RBWORK=20][short SQ_EOT=12]
|
||||
self._sock.write_all(struct.pack("!hh", MessageType.SQ_RBWORK, MessageType.SQ_EOT))
|
||||
if not self._in_transaction:
|
||||
return
|
||||
# Wire format per ``IfxSqli.sendRollback`` line 647:
|
||||
# [short SQ_RBWORK=20][short savepoint=0][short SQ_EOT=12]
|
||||
# The savepoint short is REQUIRED — sending SQ_RBWORK alone hangs
|
||||
# the server (it's waiting for the next 2 bytes). SQ_CMMTWORK,
|
||||
# by contrast, takes no payload — confirmed in IfxSqli.sendCommit.
|
||||
self._sock.write_all(
|
||||
struct.pack("!hhh", MessageType.SQ_RBWORK, 0, MessageType.SQ_EOT)
|
||||
)
|
||||
self._drain_to_eot()
|
||||
self._in_transaction = False
|
||||
|
||||
def _ensure_transaction(self) -> None:
|
||||
"""Open a server-side transaction if one isn't already open.
|
||||
|
||||
Called by the cursor before sending DML in non-autocommit mode.
|
||||
On a logged DB, this sends ``SQ_BEGIN`` and the server allocates
|
||||
a transaction. On an unlogged DB the server replies with -201
|
||||
("BEGIN WORK requires a logged database") which we cache and
|
||||
skip on subsequent calls — DML still works fine in that mode.
|
||||
|
||||
Idempotent: subsequent calls are no-ops while the transaction
|
||||
is open or while we've cached "this DB doesn't support BEGIN".
|
||||
"""
|
||||
if self._autocommit or self._in_transaction or self._closed:
|
||||
return
|
||||
if self._supports_begin_work is False:
|
||||
return # cached: unlogged DB, BEGIN unavailable
|
||||
self._sock.write_all(struct.pack("!hh", MessageType.SQ_BEGIN, MessageType.SQ_EOT))
|
||||
try:
|
||||
self._drain_to_eot()
|
||||
self._in_transaction = True
|
||||
self._supports_begin_work = True
|
||||
except OperationalError as e:
|
||||
# -201: BEGIN WORK requires a logged DB. Cache and proceed.
|
||||
if getattr(e, "sqlcode", None) == -201:
|
||||
self._supports_begin_work = False
|
||||
return
|
||||
raise
|
||||
|
||||
def close(self) -> None:
|
||||
"""Send SQ_EXIT and tear down the socket. Idempotent."""
|
||||
@ -292,6 +357,11 @@ class Connection:
|
||||
elif tag == 55: # SQ_COST — server appends cost info; ignore
|
||||
# [int cost1][int cost2]
|
||||
self._sock.read_exact(4 + 4)
|
||||
elif tag == MessageType.SQ_XACTSTAT:
|
||||
# Transaction-state event (logged DBs only). Body: 3 shorts
|
||||
# — xcEvent, xcNewLevel, xcOldLevel. Drain to keep the
|
||||
# stream aligned. See IfxSqli.receiveXactstat.
|
||||
self._sock.read_exact(2 + 2 + 2)
|
||||
elif tag == MessageType.SQ_ERR:
|
||||
self._raise_sq_err()
|
||||
else:
|
||||
|
||||
@ -132,6 +132,12 @@ class Cursor:
|
||||
self._row_iter = None
|
||||
self._statement_already_done = False
|
||||
|
||||
# On a logged DB in non-autocommit mode, the server requires an
|
||||
# explicit SQ_BEGIN before the first DML in each transaction.
|
||||
# _ensure_transaction is a no-op for autocommit / unlogged DBs,
|
||||
# and idempotent within an open transaction.
|
||||
self._conn._ensure_transaction()
|
||||
|
||||
# Step 1: PREPARE — send SQL with numQmarks = len(params).
|
||||
self._conn._send_pdu(self._build_prepare_pdu(sql, num_qmarks=len(params)))
|
||||
self._read_describe_response()
|
||||
@ -263,6 +269,10 @@ class Cursor:
|
||||
self._row_iter = None
|
||||
self._statement_already_done = False
|
||||
|
||||
# Logged-DB transaction guard — same as execute(). Idempotent
|
||||
# within an open transaction.
|
||||
self._conn._ensure_transaction()
|
||||
|
||||
# PREPARE once.
|
||||
self._conn._send_pdu(self._build_prepare_pdu(sql, num_qmarks=first_len))
|
||||
self._read_describe_response()
|
||||
@ -510,6 +520,8 @@ class Cursor:
|
||||
elif tag == 55: # SQ_COST
|
||||
reader.read_int()
|
||||
reader.read_int()
|
||||
elif tag == MessageType.SQ_XACTSTAT:
|
||||
reader.read_exact(2 + 2 + 2)
|
||||
elif tag == MessageType.SQ_ERR:
|
||||
self._raise_sq_err(reader)
|
||||
else:
|
||||
@ -530,6 +542,8 @@ class Cursor:
|
||||
elif tag == 55: # SQ_COST
|
||||
reader.read_int()
|
||||
reader.read_int()
|
||||
elif tag == MessageType.SQ_XACTSTAT:
|
||||
reader.read_exact(2 + 2 + 2)
|
||||
elif tag == MessageType.SQ_ERR:
|
||||
self._raise_sq_err(reader)
|
||||
else:
|
||||
@ -554,6 +568,15 @@ class Cursor:
|
||||
# Track best-effort rowcount = 1 for literal-value INSERTs.
|
||||
if self._rowcount < 0:
|
||||
self._rowcount = 1
|
||||
elif tag == MessageType.SQ_XACTSTAT:
|
||||
# Transaction-state event. Body: 3 shorts (event,
|
||||
# newLevel, oldLevel) per IfxSqli.receiveXactstat. We
|
||||
# don't expose these to the user yet but must drain
|
||||
# them to keep the stream aligned. Logged DBs emit one
|
||||
# per DML statement.
|
||||
reader.read_short() # xcEvent
|
||||
reader.read_short() # xcNewLevel
|
||||
reader.read_short() # xcOldLevel
|
||||
elif tag == MessageType.SQ_ERR:
|
||||
self._raise_sq_err(reader)
|
||||
else:
|
||||
|
||||
@ -85,3 +85,56 @@ def ifx_connection(conn_params: ConnParams) -> Iterator[object]:
|
||||
yield conn
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def logged_db_params(conn_params: ConnParams) -> ConnParams:
|
||||
"""Connection parameters for a LOGGED database — required for real
|
||||
transaction tests (commit/rollback against an unlogged DB is a no-op).
|
||||
|
||||
Defaults to ``testdb`` (created by Phase 7 setup with
|
||||
``CREATE DATABASE testdb WITH LOG``). Override via
|
||||
``IFX_LOGGED_DATABASE``. The database must exist; if missing, the
|
||||
test will fail with a clear "database not found" error.
|
||||
"""
|
||||
return conn_params._replace(
|
||||
database=os.environ.get("IFX_LOGGED_DATABASE", "testdb"),
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
def _ensure_testdb(
|
||||
request: pytest.FixtureRequest, conn_params: ConnParams
|
||||
) -> None:
|
||||
"""Lazily ensure ``testdb`` exists if integration tests are running.
|
||||
|
||||
If the user has set ``IFX_LOGGED_DATABASE`` to something else, we
|
||||
don't try to create it (assume they manage that DB themselves).
|
||||
"""
|
||||
if "integration" not in request.config.getoption("-m", default=""):
|
||||
return
|
||||
if os.environ.get("IFX_LOGGED_DATABASE"):
|
||||
return # user-managed DB; don't auto-create
|
||||
try:
|
||||
import informix_db
|
||||
|
||||
conn = informix_db.connect(
|
||||
host=conn_params.host,
|
||||
port=conn_params.port,
|
||||
user=conn_params.user,
|
||||
password=conn_params.password,
|
||||
database="sysmaster",
|
||||
server=conn_params.server,
|
||||
connect_timeout=5.0,
|
||||
autocommit=True,
|
||||
)
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
"SELECT name FROM sysdatabases WHERE name = 'testdb'"
|
||||
)
|
||||
if not cur.fetchone():
|
||||
cur.execute("CREATE DATABASE testdb WITH LOG")
|
||||
conn.close()
|
||||
except Exception:
|
||||
# If creation fails, downstream tests will surface a clearer error.
|
||||
pass
|
||||
|
||||
@ -105,15 +105,17 @@ def test_full_dml_cycle_in_one_connection(conn_params: ConnParams) -> None:
|
||||
assert rows == [(3,), (4,), (5,)]
|
||||
|
||||
|
||||
def test_commit_rollback_in_unlogged_db_raises(conn_params: ConnParams) -> None:
|
||||
"""commit() and rollback() fail with OperationalError in sysmaster (no logging).
|
||||
def test_commit_rollback_in_unlogged_db_is_noop(conn_params: ConnParams) -> None:
|
||||
"""commit() and rollback() are no-ops in sysmaster (unlogged).
|
||||
|
||||
Confirms the commit/rollback wire machinery works — it sends the right
|
||||
PDU and parses the SQ_ERR response. To actually test transactions,
|
||||
point integration at a logged database (e.g. ``stores_demo``).
|
||||
Phase 7 made these driver-side smart: the connection tracks whether
|
||||
a transaction is open. On unlogged DBs, ``_ensure_transaction`` is
|
||||
cached as "unsupported" after the first -201 from SQ_BEGIN, so
|
||||
subsequent commit/rollback calls skip the wire send entirely.
|
||||
Real transaction semantics are tested in test_transactions.py
|
||||
against the logged ``testdb`` database.
|
||||
"""
|
||||
with (
|
||||
_connect(conn_params) as conn,
|
||||
pytest.raises(informix_db.OperationalError, match="-255"),
|
||||
):
|
||||
with _connect(conn_params) as conn:
|
||||
# No transaction is open; both should silently no-op.
|
||||
conn.commit()
|
||||
conn.rollback()
|
||||
|
||||
@ -88,12 +88,17 @@ def test_not_null_violation_is_integrity_error(conn_params: ConnParams) -> None:
|
||||
assert excinfo.value.sqlcode == -391
|
||||
|
||||
|
||||
def test_commit_in_unlogged_db_is_operational_error(conn_params: ConnParams) -> None:
|
||||
"""sqlcode -255 (no transaction) → OperationalError."""
|
||||
def test_commit_in_unlogged_db_is_noop(conn_params: ConnParams) -> None:
|
||||
"""commit() on an unlogged DB does NOT raise — it's a silent no-op.
|
||||
|
||||
Phase 7 made this driver-side smart. Calling commit() when no
|
||||
server-side transaction is open is a no-op (same as autocommit-mode
|
||||
commit). This matches what most DB-API drivers do for graceful
|
||||
degradation. The PEP 249 spec is silent on this case.
|
||||
"""
|
||||
with _connect(conn_params) as conn:
|
||||
with pytest.raises(informix_db.OperationalError) as excinfo:
|
||||
conn.commit()
|
||||
assert excinfo.value.sqlcode == -255
|
||||
conn.commit() # must not raise
|
||||
conn.rollback() # must not raise either
|
||||
|
||||
|
||||
def test_connection_survives_query_error(conn_params: ConnParams) -> None:
|
||||
|
||||
295
tests/test_transactions.py
Normal file
295
tests/test_transactions.py
Normal file
@ -0,0 +1,295 @@
|
||||
"""Phase 7 integration tests — real transaction semantics on a logged DB.
|
||||
|
||||
These tests run against ``testdb`` (created with ``WITH LOG`` — see
|
||||
``conftest.py::_ensure_testdb``). Without a logged database, ``COMMIT``
|
||||
and ``ROLLBACK`` are no-ops, so the canonical "rolled-back insert is
|
||||
invisible" test would silently lie.
|
||||
|
||||
The tests cover:
|
||||
- Autocommit on/off semantics
|
||||
- Single-connection commit visibility
|
||||
- Rollback isolation (the gate test from the original Phase 3 plan)
|
||||
- Cross-connection isolation (committed data visible to a fresh conn,
|
||||
uncommitted data not)
|
||||
- Multi-statement transactions
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import contextlib
|
||||
from collections.abc import Iterator
|
||||
|
||||
import pytest
|
||||
|
||||
import informix_db
|
||||
from tests.conftest import ConnParams
|
||||
|
||||
pytestmark = pytest.mark.integration
|
||||
|
||||
|
||||
def _connect(params: ConnParams, *, autocommit: bool = False) -> informix_db.Connection:
|
||||
return informix_db.connect(
|
||||
host=params.host,
|
||||
port=params.port,
|
||||
user=params.user,
|
||||
password=params.password,
|
||||
database=params.database,
|
||||
server=params.server,
|
||||
connect_timeout=10.0,
|
||||
read_timeout=10.0,
|
||||
autocommit=autocommit,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def fresh_table(logged_db_params: ConnParams) -> Iterator[str]:
|
||||
"""Create a fresh test table per test and drop it on teardown.
|
||||
|
||||
Uses a permanent table (not TEMP) because TEMP tables are session-
|
||||
scoped and don't participate in cross-connection isolation tests.
|
||||
"""
|
||||
table = "t_txn_test"
|
||||
with _connect(logged_db_params, autocommit=True) as conn:
|
||||
cur = conn.cursor()
|
||||
with contextlib.suppress(Exception):
|
||||
cur.execute(f"DROP TABLE {table}")
|
||||
cur.execute(f"CREATE TABLE {table} (id INTEGER, label VARCHAR(64))")
|
||||
try:
|
||||
yield table
|
||||
finally:
|
||||
with _connect(logged_db_params, autocommit=True) as conn:
|
||||
cur = conn.cursor()
|
||||
with contextlib.suppress(Exception):
|
||||
cur.execute(f"DROP TABLE {table}")
|
||||
|
||||
|
||||
# -------- Commit + rollback ---------------------------------------------
|
||||
|
||||
|
||||
def test_committed_insert_persists(
|
||||
logged_db_params: ConnParams, fresh_table: str
|
||||
) -> None:
|
||||
"""Canonical commit test: COMMIT makes data permanent."""
|
||||
with _connect(logged_db_params) as conn:
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
f"INSERT INTO {fresh_table} VALUES (?, ?)", (1, "alpha")
|
||||
)
|
||||
conn.commit()
|
||||
cur.execute(f"SELECT id, label FROM {fresh_table}")
|
||||
assert cur.fetchall() == [(1, "alpha")]
|
||||
|
||||
|
||||
def test_rollback_hides_insert(
|
||||
logged_db_params: ConnParams, fresh_table: str
|
||||
) -> None:
|
||||
"""Canonical rollback test (the Phase 3 gate test).
|
||||
|
||||
A rolled-back INSERT must be invisible to subsequent SELECTs in the
|
||||
same session. This is the load-bearing transaction guarantee.
|
||||
"""
|
||||
with _connect(logged_db_params) as conn:
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
f"INSERT INTO {fresh_table} VALUES (?, ?)", (1, "alpha")
|
||||
)
|
||||
conn.rollback()
|
||||
cur.execute(f"SELECT COUNT(*) FROM {fresh_table}")
|
||||
(count,) = cur.fetchone()
|
||||
assert int(count) == 0
|
||||
|
||||
|
||||
def test_rollback_after_multiple_inserts(
|
||||
logged_db_params: ConnParams, fresh_table: str
|
||||
) -> None:
|
||||
"""ROLLBACK reverts ALL pending statements, not just the most recent."""
|
||||
with _connect(logged_db_params) as conn:
|
||||
cur = conn.cursor()
|
||||
cur.executemany(
|
||||
f"INSERT INTO {fresh_table} VALUES (?, ?)",
|
||||
[(1, "a"), (2, "b"), (3, "c"), (4, "d")],
|
||||
)
|
||||
conn.rollback()
|
||||
cur.execute(f"SELECT COUNT(*) FROM {fresh_table}")
|
||||
(count,) = cur.fetchone()
|
||||
assert int(count) == 0
|
||||
|
||||
|
||||
def test_partial_commit_then_rollback(
|
||||
logged_db_params: ConnParams, fresh_table: str
|
||||
) -> None:
|
||||
"""Committed data stays; subsequent uncommitted data rolls back cleanly."""
|
||||
with _connect(logged_db_params) as conn:
|
||||
cur = conn.cursor()
|
||||
# Phase 1: commit some rows
|
||||
cur.executemany(
|
||||
f"INSERT INTO {fresh_table} VALUES (?, ?)",
|
||||
[(1, "kept-a"), (2, "kept-b")],
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
# Phase 2: insert more, but roll back
|
||||
cur.executemany(
|
||||
f"INSERT INTO {fresh_table} VALUES (?, ?)",
|
||||
[(3, "lost-a"), (4, "lost-b")],
|
||||
)
|
||||
conn.rollback()
|
||||
|
||||
cur.execute(
|
||||
f"SELECT id, label FROM {fresh_table} ORDER BY id"
|
||||
)
|
||||
assert cur.fetchall() == [(1, "kept-a"), (2, "kept-b")]
|
||||
|
||||
|
||||
# -------- Autocommit ---------------------------------------------------
|
||||
|
||||
|
||||
def test_autocommit_persists_without_explicit_commit(
|
||||
logged_db_params: ConnParams, fresh_table: str
|
||||
) -> None:
|
||||
"""With autocommit=True, no explicit commit() is needed."""
|
||||
with _connect(logged_db_params, autocommit=True) as conn:
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
f"INSERT INTO {fresh_table} VALUES (?, ?)", (1, "auto")
|
||||
)
|
||||
# No conn.commit() call — autocommit handles it
|
||||
|
||||
# Verify in a fresh connection that the row is durable
|
||||
with _connect(logged_db_params) as conn:
|
||||
cur = conn.cursor()
|
||||
cur.execute(f"SELECT id, label FROM {fresh_table}")
|
||||
assert cur.fetchall() == [(1, "auto")]
|
||||
|
||||
|
||||
def test_autocommit_rollback_is_noop(
|
||||
logged_db_params: ConnParams, fresh_table: str
|
||||
) -> None:
|
||||
"""In autocommit mode, rollback() can't undo already-committed work.
|
||||
|
||||
DB-API 2.0 spec is silent on the exact behavior here, but the
|
||||
pragmatic behavior is that rollback() in autocommit mode is a no-op
|
||||
— there's no open transaction to roll back. The data persists.
|
||||
"""
|
||||
with _connect(logged_db_params, autocommit=True) as conn:
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
f"INSERT INTO {fresh_table} VALUES (?, ?)", (1, "x")
|
||||
)
|
||||
# Try to roll back — should be a no-op since each statement
|
||||
# was already committed. Some drivers raise; either is OK.
|
||||
with contextlib.suppress(Exception):
|
||||
conn.rollback()
|
||||
|
||||
cur.execute(f"SELECT COUNT(*) FROM {fresh_table}")
|
||||
(count,) = cur.fetchone()
|
||||
assert int(count) == 1
|
||||
|
||||
|
||||
# -------- Cross-connection isolation -----------------------------------
|
||||
|
||||
|
||||
def test_committed_data_visible_to_fresh_connection(
|
||||
logged_db_params: ConnParams, fresh_table: str
|
||||
) -> None:
|
||||
"""Committed writes in conn A are durably visible to a fresh conn B.
|
||||
|
||||
This is the "data is durable across connection boundaries"
|
||||
guarantee. We don't test the *uncommitted-invisible* direction
|
||||
here because Informix's default isolation level (Committed Read
|
||||
with row-level locking) makes conn B *block* on uncommitted writes
|
||||
rather than reading zero — which surfaces as -252 lock-timeout
|
||||
errors that depend on `lock mode wait` config rather than on
|
||||
transaction semantics. The blocking behavior is correct; testing
|
||||
it here would just be testing Informix's lock manager.
|
||||
"""
|
||||
conn_a = _connect(logged_db_params)
|
||||
try:
|
||||
cur_a = conn_a.cursor()
|
||||
cur_a.execute(
|
||||
f"INSERT INTO {fresh_table} VALUES (?, ?)", (1, "writer")
|
||||
)
|
||||
conn_a.commit()
|
||||
finally:
|
||||
conn_a.close()
|
||||
|
||||
# Fresh connection — should see the committed row
|
||||
with _connect(logged_db_params) as conn_b:
|
||||
cur_b = conn_b.cursor()
|
||||
cur_b.execute(f"SELECT id, label FROM {fresh_table}")
|
||||
assert cur_b.fetchall() == [(1, "writer")]
|
||||
|
||||
|
||||
# -------- Mixed DML + sanity -------------------------------------------
|
||||
|
||||
|
||||
def test_update_inside_transaction(
|
||||
logged_db_params: ConnParams, fresh_table: str
|
||||
) -> None:
|
||||
"""UPDATE within a transaction is reverted by ROLLBACK."""
|
||||
with _connect(logged_db_params) as conn:
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
f"INSERT INTO {fresh_table} VALUES (?, ?)", (1, "original")
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
# Update + roll back
|
||||
cur.execute(
|
||||
f"UPDATE {fresh_table} SET label = ? WHERE id = ?",
|
||||
("modified", 1),
|
||||
)
|
||||
conn.rollback()
|
||||
|
||||
cur.execute(f"SELECT label FROM {fresh_table} WHERE id = ?", (1,))
|
||||
assert cur.fetchone() == ("original",)
|
||||
|
||||
|
||||
def test_delete_inside_transaction(
|
||||
logged_db_params: ConnParams, fresh_table: str
|
||||
) -> None:
|
||||
"""DELETE within a transaction is reverted by ROLLBACK."""
|
||||
with _connect(logged_db_params) as conn:
|
||||
cur = conn.cursor()
|
||||
cur.executemany(
|
||||
f"INSERT INTO {fresh_table} VALUES (?, ?)",
|
||||
[(1, "a"), (2, "b"), (3, "c")],
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
cur.execute(f"DELETE FROM {fresh_table} WHERE id = ?", (2,))
|
||||
conn.rollback()
|
||||
|
||||
cur.execute(f"SELECT COUNT(*) FROM {fresh_table}")
|
||||
(count,) = cur.fetchone()
|
||||
assert int(count) == 3
|
||||
|
||||
|
||||
def test_implicit_transaction_per_dml(
|
||||
logged_db_params: ConnParams, fresh_table: str
|
||||
) -> None:
|
||||
"""The driver implicitly opens a transaction on each DML.
|
||||
|
||||
Users don't need (and shouldn't use) ``BEGIN WORK`` in SQL —
|
||||
the driver sends ``SQ_BEGIN`` automatically before the first DML
|
||||
in non-autocommit mode. After ``commit()``, the next DML opens
|
||||
a fresh transaction. This test verifies the round-trip of two
|
||||
independent transactions on the same connection.
|
||||
"""
|
||||
with _connect(logged_db_params) as conn:
|
||||
cur = conn.cursor()
|
||||
|
||||
# Transaction 1
|
||||
cur.execute(
|
||||
f"INSERT INTO {fresh_table} VALUES (?, ?)", (1, "txn-1")
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
# Transaction 2 — completely separate, opens fresh BEGIN
|
||||
cur.execute(
|
||||
f"INSERT INTO {fresh_table} VALUES (?, ?)", (2, "txn-2")
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
cur.execute(f"SELECT id, label FROM {fresh_table} ORDER BY id")
|
||||
assert cur.fetchall() == [(1, "txn-1"), (2, "txn-2")]
|
||||
Loading…
x
Reference in New Issue
Block a user