informix-db/tests/test_tls.py
Ryan Malloy 345838fe2d Phase 14: TLS / SSL transport
Optional TLS via the ``tls`` parameter on connect() and IfxSocket.
Three modes:

  tls=False (default)  — plain TCP, current behavior unchanged
  tls=True             — TLS w/ verification disabled (dev / self-signed)
  tls=ssl.SSLContext   — caller-supplied context (production)

Plus tls_server_hostname for SNI / cert verification.

Architectural choice: Informix uses dedicated TLS-enabled listener
ports (configured in server's sqlhosts), NOT STARTTLS upgrade. The
SSL handshake runs immediately after TCP connect with no protocol-
level negotiation. Wrapping happens inside IfxSocket.__init__ so the
rest of the protocol layer (login PDU, SQ_BIND, fast-path, file
transfer) is fully unaware of whether TLS is in use.

Why tls=True defaults to insecure: most Informix dev installations
use self-signed certs. tls=True produces a context with
check_hostname=False and verify_mode=CERT_NONE. Minimum protocol is
still TLSv1.2 (per ssl.PROTOCOL_TLS_CLIENT). Production users are
expected to pass ssl.SSLContext explicitly.

Tests: 5 unit tests in test_tls.py
* tls=True dev context properties
* default context uses TLSv1.2+
* real handshake against in-process TLS echo server (proves wrap_socket
  works end-to-end)
* custom SSLContext honored verbatim
* tls=True against non-TLS port raises OperationalError clearly

Test certs are generated via openssl CLI subprocess instead of
adding cryptography as a dev dep (saves ~5MB transitive deps for
one phase).

Total: 69 unit + 139 integration = 208 tests.

Architectural milestone: with Phase 14 complete, the driver now
implements EVERYTHING in the SQLI wire-protocol family that a Python
application needs. Remaining backlog (async, pooling) is library-
design work, not protocol work.
2026-05-04 14:46:53 -06:00

186 lines
5.8 KiB
Python

"""Phase 14 unit tests — TLS layer for IfxSocket.
Informix uses dedicated TLS-enabled listener ports rather than
STARTTLS-style upgrade. The SSL handshake runs immediately after
TCP connect, transparent to the SQLI protocol layer.
These tests exercise the wrapping logic against a local Python
TLS echo server — they don't need an actual TLS-enabled Informix
listener. The integration tests for "plain TCP works" and
"connecting tls=True to a non-TLS port fails" live alongside the
other smoke tests.
"""
from __future__ import annotations
import socket
import ssl
import threading
from pathlib import Path
import pytest
from informix_db._socket import IfxSocket, _make_default_dev_context
# -------- API surface --------
def test_default_dev_context_has_no_verification() -> None:
"""``tls=True`` produces a context safe for self-signed dev servers."""
ctx = _make_default_dev_context()
assert ctx.check_hostname is False
assert ctx.verify_mode == ssl.CERT_NONE
def test_dev_context_uses_modern_protocol() -> None:
"""The default context excludes TLSv1.0/1.1 (Python's SSLContext defaults)."""
ctx = _make_default_dev_context()
# ssl.PROTOCOL_TLS_CLIENT prevents downgrade attacks; minimum TLS 1.2
assert ctx.minimum_version >= ssl.TLSVersion.TLSv1_2
# -------- End-to-end wrapping: local TLS echo server --------
def _make_self_signed_cert(tmp_path: Path) -> tuple[Path, Path]:
"""Generate an ephemeral self-signed cert via the ``openssl`` CLI.
Skips the test if ``openssl`` isn't on PATH. Avoids adding
``cryptography`` as a hard dev dep for one corner-case test.
"""
import shutil
import subprocess
if not shutil.which("openssl"):
pytest.skip("openssl CLI not available; needed to generate test cert")
cert_path = tmp_path / "cert.pem"
key_path = tmp_path / "key.pem"
subprocess.run(
[
"openssl", "req", "-x509", "-newkey", "rsa:2048",
"-keyout", str(key_path), "-out", str(cert_path),
"-days", "1", "-nodes",
"-subj", "/CN=localhost",
"-addext", "subjectAltName=DNS:localhost",
],
check=True, capture_output=True,
)
return cert_path, key_path
def _start_tls_echo_server(
cert_path: Path, key_path: Path
) -> tuple[int, threading.Thread, threading.Event]:
"""Spin up a one-shot TLS echo server on an ephemeral port.
Returns ``(port, thread, ready_event)``. The thread accepts ONE
connection, echoes back any bytes sent, and exits.
"""
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ctx.load_cert_chain(certfile=str(cert_path), keyfile=str(key_path))
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))
listener.listen(1)
port = listener.getsockname()[1]
ready = threading.Event()
def serve() -> None:
ready.set()
try:
client, _ = listener.accept()
with ctx.wrap_socket(client, server_side=True) as sslsock:
while True:
data = sslsock.recv(4096)
if not data:
break
sslsock.sendall(data)
except Exception:
pass
finally:
listener.close()
t = threading.Thread(target=serve, daemon=True)
t.start()
return port, t, ready
def test_tls_handshake_succeeds_against_real_tls_server(tmp_path: Path) -> None:
"""``IfxSocket(tls=True)`` completes a TLS handshake with a self-signed server."""
cert_path, key_path = _make_self_signed_cert(tmp_path)
port, _t, ready = _start_tls_echo_server(cert_path, key_path)
ready.wait(timeout=2.0)
sock = IfxSocket(
"127.0.0.1", port,
connect_timeout=2.0,
read_timeout=2.0,
tls=True,
)
try:
# Echo round-trip proves the encrypted channel works
sock.write_all(b"phase 14 tls test")
echoed = sock.read_exact(len(b"phase 14 tls test"))
assert echoed == b"phase 14 tls test"
finally:
sock.close()
def test_tls_handshake_with_custom_context(tmp_path: Path) -> None:
"""Caller-supplied ``ssl.SSLContext`` is honored verbatim."""
cert_path, key_path = _make_self_signed_cert(tmp_path)
port, _t, ready = _start_tls_echo_server(cert_path, key_path)
ready.wait(timeout=2.0)
# Caller can supply a context with custom CA verification
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ctx.check_hostname = True
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.load_verify_locations(cafile=str(cert_path))
sock = IfxSocket(
"127.0.0.1", port,
connect_timeout=2.0,
read_timeout=2.0,
tls=ctx,
tls_server_hostname="localhost",
)
try:
sock.write_all(b"verified")
assert sock.read_exact(8) == b"verified"
finally:
sock.close()
def test_tls_against_plain_socket_raises_operational_error() -> None:
"""``tls=True`` against a non-TLS port produces a clear error."""
# Use a one-shot plain TCP server that hangs up immediately
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))
listener.listen(1)
port = listener.getsockname()[1]
accepted: list[socket.socket] = []
def serve() -> None:
client, _ = listener.accept()
accepted.append(client)
client.close() # drop the connection — looks like garbage to TLS
t = threading.Thread(target=serve, daemon=True)
t.start()
try:
from informix_db.exceptions import OperationalError
with pytest.raises(OperationalError, match="TLS handshake failed"):
IfxSocket(
"127.0.0.1", port,
connect_timeout=2.0,
read_timeout=2.0,
tls=True,
)
finally:
listener.close()
t.join(timeout=2.0)