informix-db/tests/test_protocol.py
Ryan Malloy 0b13acb13d Phase 30: Final hardening pass (2026.05.05.4)
Closes the last 3 medium-severity items from Hamilton's system-wide
audit. **0 critical, 0 high, 0 medium remaining.**

What changed:

pool.py:
* Pool acquire() growth path: restructured to remove _lock._is_owned()
  (CPython-private API) usage. Two explicit re-acquires (success path
  + exception path) replace the older try/finally + private check.

connections.py:
* _raise_from_rejection now extracts the server's human-readable
  error string from the rejection payload and surfaces it in the
  OperationalError. Wrong-password vs wrong-database now produce
  distinguishable errors. New helper _extract_server_error_text
  finds the longest printable-ASCII run (8-256 chars). Falls back
  to a hex preview when no string is found.
* _send_exit: broadened catch from (OperationalError, InterfaceError,
  OSError, ProtocolError) to bare Exception. Best-effort by
  definition; the socket FD is freed by close()'s finally clause via
  _socket.IfxSocket.close (idempotent, never-raising). Prevents
  unexpected errors from escaping close() and leaving partial state.

5 new unit tests in test_protocol.py for _extract_server_error_text:
finds-longest-run, picks-longest-of-multiple, too-short-returns-None,
empty-handled, caps-at-256.

77 unit + 231 integration + 28 benchmark = 336 tests; ruff clean.

Hamilton audit punch list final state: every actionable finding
addressed. No CRITICAL, no HIGH, no MEDIUM remaining.

  Pre-Phase-26: 2 critical, 3 high, 5 medium
  Post-Phase-30: 0 critical, 0 high, 0 medium - PRODUCTION READY
2026-05-05 10:52:39 -06:00

228 lines
8.1 KiB
Python

"""Unit tests for ``_protocol.py`` framing primitives.
These run without a database — they assert that the encoders produce
exactly the bytes IBM's JDBC reference would produce, and that the
decoders round-trip cleanly. The byte expectations come from the
captured login PDU in ``docs/CAPTURES/01-connect-only.socat.log``.
"""
from __future__ import annotations
from io import BytesIO
import pytest
from informix_db._protocol import IfxStreamReader, IfxStreamWriter, ProtocolError
def make_writer() -> tuple[IfxStreamWriter, BytesIO]:
    """Create an ``IfxStreamWriter`` backed by a fresh in-memory buffer.

    Returns:
        A ``(writer, buffer)`` pair: the writer, and the ``BytesIO`` it was
        constructed on so callers can inspect the buffer afterwards.
    """
    sink = BytesIO()
    writer = IfxStreamWriter(sink)
    return writer, sink
# ---------------------------------------------------------------------------
# Endianness
# ---------------------------------------------------------------------------
class TestEndianness:
    """Multi-byte integers must be emitted big-endian (network byte order)."""

    def test_short_is_big_endian(self) -> None:
        """A 16-bit value is written most-significant byte first."""
        writer, sink = make_writer()
        writer.write_short(0x1234)
        expected = b"\x12\x34"
        assert sink.getvalue() == expected

    def test_int_is_big_endian(self) -> None:
        """A 32-bit value is written most-significant byte first."""
        writer, sink = make_writer()
        writer.write_int(0x12345678)
        expected = b"\x12\x34\x56\x78"
        assert sink.getvalue() == expected

    def test_long_bigint_is_big_endian(self) -> None:
        """A 64-bit value is written most-significant byte first."""
        writer, sink = make_writer()
        writer.write_long_bigint(0x123456789ABCDEF0)
        expected = b"\x12\x34\x56\x78\x9a\xbc\xde\xf0"
        assert sink.getvalue() == expected

    def test_observed_capability_int_matches(self) -> None:
        """The Cap_2 value 0x3c000000 from the captured PDU encodes byte-for-byte."""
        writer, sink = make_writer()
        writer.write_int(0x3C000000)
        expected = b"\x3c\x00\x00\x00"
        assert sink.getvalue() == expected
# ---------------------------------------------------------------------------
# Padding
# ---------------------------------------------------------------------------
class TestPadding:
    """``write_padded`` must leave the output at an even byte length."""

    def test_even_length_no_padding(self) -> None:
        """An even-length payload is written unchanged."""
        writer, sink = make_writer()
        payload = b"\x01\x02"
        writer.write_padded(payload)
        assert sink.getvalue() == b"\x01\x02"

    def test_odd_length_pads_with_nul(self) -> None:
        """An odd-length payload gains exactly one trailing NUL byte."""
        writer, sink = make_writer()
        payload = b"\x01\x02\x03"
        writer.write_padded(payload)
        assert sink.getvalue() == b"\x01\x02\x03\x00"

    def test_empty_no_padding(self) -> None:
        """An empty payload (length zero, already even) writes nothing."""
        writer, sink = make_writer()
        payload = b""
        writer.write_padded(payload)
        assert sink.getvalue() == b""
# ---------------------------------------------------------------------------
# Length-prefixed strings
# ---------------------------------------------------------------------------
class TestStringEncoding:
    """``write_string_with_nul`` emits ``[short len+1][bytes][nul]``."""

    def test_simple_ascii(self) -> None:
        """Username 'informix' from the captured login: length 9 (8 chars + nul)."""
        writer, sink = make_writer()
        writer.write_string_with_nul("informix")
        expected = b"\x00\x09informix\x00"
        assert sink.getvalue() == expected

    def test_password_in4mix_matches_capture(self) -> None:
        """Password 'in4mix' from the captured login: length 7 (6 chars + nul)."""
        writer, sink = make_writer()
        writer.write_string_with_nul("in4mix")
        expected = b"\x00\x07in4mix\x00"
        assert sink.getvalue() == expected

    def test_empty_string(self) -> None:
        """An empty string still carries its terminator."""
        writer, sink = make_writer()
        writer.write_string_with_nul("")
        # Length prefix is 1 because only the nul terminator follows it.
        expected = b"\x00\x01\x00"
        assert sink.getvalue() == expected
# ---------------------------------------------------------------------------
# Reader round-trips
# ---------------------------------------------------------------------------
class TestRoundTrip:
    """Whatever an encoder writes, the matching decoder must read back unchanged."""

    @staticmethod
    def _reader_over(sink: BytesIO) -> IfxStreamReader:
        """Wrap the bytes written so far in a fresh reader positioned at the start."""
        encoded = sink.getvalue()
        return IfxStreamReader(BytesIO(encoded))

    @pytest.mark.parametrize("value", [0, 1, -1, 32767, -32768, 0x1234])
    def test_short_round_trip(self, value: int) -> None:
        """16-bit values, including both signed extremes, survive a round trip."""
        writer, sink = make_writer()
        writer.write_short(value)
        decoded = self._reader_over(sink).read_short()
        assert decoded == value

    @pytest.mark.parametrize("value", [0, 1, -1, 0x7FFFFFFF, -0x80000000, 0x3C000000])
    def test_int_round_trip(self, value: int) -> None:
        """32-bit values, including both signed extremes, survive a round trip."""
        writer, sink = make_writer()
        writer.write_int(value)
        decoded = self._reader_over(sink).read_int()
        assert decoded == value

    @pytest.mark.parametrize("value", [0, 1, -1, 0x7FFFFFFFFFFFFFFF, -0x8000000000000000])
    def test_long_bigint_round_trip(self, value: int) -> None:
        """64-bit values, including both signed extremes, survive a round trip."""
        writer, sink = make_writer()
        writer.write_long_bigint(value)
        decoded = self._reader_over(sink).read_long_bigint()
        assert decoded == value

    @pytest.mark.parametrize("text", ["", "informix", "in4mix", "hello world"])
    def test_string_with_nul_round_trip(self, text: str) -> None:
        """Nul-terminated strings, including the empty string, survive a round trip."""
        writer, sink = make_writer()
        writer.write_string_with_nul(text)
        decoded = self._reader_over(sink).read_string_with_nul()
        assert decoded == text
# ---------------------------------------------------------------------------
# Failure modes
# ---------------------------------------------------------------------------
class TestFailureModes:
    """Truncated or malformed input must surface as ``ProtocolError``."""

    def test_short_eof_raises(self) -> None:
        """Reading a short from a 1-byte stream (2 needed) reports EOF."""
        truncated = BytesIO(b"\x12")
        reader = IfxStreamReader(truncated)
        with pytest.raises(ProtocolError, match="unexpected EOF"):
            reader.read_short()

    def test_int_eof_raises(self) -> None:
        """Reading an int from a 3-byte stream (4 needed) reports EOF."""
        truncated = BytesIO(b"\x00\x00\x00")
        reader = IfxStreamReader(truncated)
        with pytest.raises(ProtocolError, match="unexpected EOF"):
            reader.read_int()

    def test_negative_string_length_rejected(self) -> None:
        """A length prefix with the high bit set is rejected up front."""
        # 0xFFFF parses as a negative signed short, so the reader should fail fast.
        malformed = BytesIO(b"\xff\xff")
        reader = IfxStreamReader(malformed)
        with pytest.raises(ProtocolError, match="negative string length"):
            reader.read_string_with_nul()
# -------- Phase 30: server-error-text extraction (login rejection diagnostics) --------
def test_extract_server_error_text_finds_longest_printable_run() -> None:
    """Phase 30: the readable message is pulled out of an opaque rejection payload.

    ``_extract_server_error_text`` returns the longest printable-ASCII run
    whose length is between 8 and 256 characters. Login-rejection errors
    use it to tell wrong-password, wrong-database and version-mismatch
    apart without a full structured decode of the SLheader rejection block.
    """
    from informix_db.connections import _extract_server_error_text

    # Shape of a typical rejection: binary header bytes, the message text,
    # then trailing NUL bytes.
    header = b"\x00\x01\x00\x02\x00\x00\x00\x10"
    message = b"incorrect password supplied"
    trailer = b"\x00\x00\x00"
    payload = header + message + trailer

    extracted = _extract_server_error_text(payload)
    assert extracted == "incorrect password supplied"
def test_extract_server_error_text_picks_longest_run() -> None:
    """With several printable runs in the payload, the longest one is returned."""
    from informix_db.connections import _extract_server_error_text

    segments = (
        b"short\x00",
        b"\x01\x02",
        b"this is the longer error message we want\x00",
        b"\x03",
        b"medium length text\x00",
    )
    payload = b"".join(segments)

    extracted = _extract_server_error_text(payload)
    assert extracted == "this is the longer error message we want"
def test_extract_server_error_text_returns_none_when_too_short() -> None:
    """Printable runs shorter than 8 chars are ignored, so garbage never matches."""
    from informix_db.connections import _extract_server_error_text

    # Printable runs here are "abc", "def" and "ghij" - every one under 8 chars.
    payload = b"\x00\x01abc\x02\x03def\x00ghij"
    extracted = _extract_server_error_text(payload)
    assert extracted is None
def test_extract_server_error_text_handles_empty_payload() -> None:
    """An empty payload yields ``None`` rather than raising."""
    from informix_db.connections import _extract_server_error_text

    extracted = _extract_server_error_text(b"")
    assert extracted is None
def test_extract_server_error_text_caps_run_at_256() -> None:
    """A printable run longer than 256 chars is rejected.

    Such a run is more likely a binary block misread as text than a
    real server message.
    """
    from informix_db.connections import _extract_server_error_text

    oversized = b"a" * 300
    extracted = _extract_server_error_text(oversized)
    assert extracted is None