Optional TLS via the ``tls`` parameter on connect() and IfxSocket. Three modes: tls=False (default) — plain TCP, current behavior unchanged tls=True — TLS w/ verification disabled (dev / self-signed) tls=ssl.SSLContext — caller-supplied context (production) Plus tls_server_hostname for SNI / cert verification. Architectural choice: Informix uses dedicated TLS-enabled listener ports (configured in server's sqlhosts), NOT STARTTLS upgrade. The SSL handshake runs immediately after TCP connect with no protocol- level negotiation. Wrapping happens inside IfxSocket.__init__ so the rest of the protocol layer (login PDU, SQ_BIND, fast-path, file transfer) is fully unaware of whether TLS is in use. Why tls=True defaults to insecure: most Informix dev installations use self-signed certs. tls=True produces a context with check_hostname=False and verify_mode=CERT_NONE. Minimum protocol is still TLSv1.2 (per ssl.PROTOCOL_TLS_CLIENT). Production users are expected to pass ssl.SSLContext explicitly. Tests: 5 unit tests in test_tls.py * tls=True dev context properties * default context uses TLSv1.2+ * real handshake against in-process TLS echo server (proves wrap_socket works end-to-end) * custom SSLContext honored verbatim * tls=True against non-TLS port raises OperationalError clearly Test certs are generated via openssl CLI subprocess instead of adding cryptography as a dev dep (saves ~5MB transitive deps for one phase). Total: 69 unit + 139 integration = 208 tests. Architectural milestone: with Phase 14 complete, the driver now implements EVERYTHING in the SQLI wire-protocol family that a Python application needs. Remaining backlog (async, pooling) is library- design work, not protocol work.
186 lines
5.8 KiB
Python
186 lines
5.8 KiB
Python
"""Phase 14 unit tests — TLS layer for IfxSocket.
|
|
|
|
Informix uses dedicated TLS-enabled listener ports rather than
|
|
STARTTLS-style upgrade. The SSL handshake runs immediately after
|
|
TCP connect, transparent to the SQLI protocol layer.
|
|
|
|
These tests exercise the wrapping logic against a local Python
|
|
TLS echo server — they don't need an actual TLS-enabled Informix
|
|
listener. The integration tests for "plain TCP works" and
|
|
"connecting tls=True to a non-TLS port fails" live alongside the
|
|
other smoke tests.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import socket
|
|
import ssl
|
|
import threading
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from informix_db._socket import IfxSocket, _make_default_dev_context
|
|
|
|
# -------- API surface --------
|
|
|
|
|
|
def test_default_dev_context_has_no_verification() -> None:
|
|
"""``tls=True`` produces a context safe for self-signed dev servers."""
|
|
ctx = _make_default_dev_context()
|
|
assert ctx.check_hostname is False
|
|
assert ctx.verify_mode == ssl.CERT_NONE
|
|
|
|
|
|
def test_dev_context_uses_modern_protocol() -> None:
|
|
"""The default context excludes TLSv1.0/1.1 (Python's SSLContext defaults)."""
|
|
ctx = _make_default_dev_context()
|
|
# ssl.PROTOCOL_TLS_CLIENT prevents downgrade attacks; minimum TLS 1.2
|
|
assert ctx.minimum_version >= ssl.TLSVersion.TLSv1_2
|
|
|
|
|
|
# -------- End-to-end wrapping: local TLS echo server --------
|
|
|
|
|
|
def _make_self_signed_cert(tmp_path: Path) -> tuple[Path, Path]:
|
|
"""Generate an ephemeral self-signed cert via the ``openssl`` CLI.
|
|
|
|
Skips the test if ``openssl`` isn't on PATH. Avoids adding
|
|
``cryptography`` as a hard dev dep for one corner-case test.
|
|
"""
|
|
import shutil
|
|
import subprocess
|
|
|
|
if not shutil.which("openssl"):
|
|
pytest.skip("openssl CLI not available; needed to generate test cert")
|
|
|
|
cert_path = tmp_path / "cert.pem"
|
|
key_path = tmp_path / "key.pem"
|
|
subprocess.run(
|
|
[
|
|
"openssl", "req", "-x509", "-newkey", "rsa:2048",
|
|
"-keyout", str(key_path), "-out", str(cert_path),
|
|
"-days", "1", "-nodes",
|
|
"-subj", "/CN=localhost",
|
|
"-addext", "subjectAltName=DNS:localhost",
|
|
],
|
|
check=True, capture_output=True,
|
|
)
|
|
return cert_path, key_path
|
|
|
|
|
|
def _start_tls_echo_server(
|
|
cert_path: Path, key_path: Path
|
|
) -> tuple[int, threading.Thread, threading.Event]:
|
|
"""Spin up a one-shot TLS echo server on an ephemeral port.
|
|
|
|
Returns ``(port, thread, ready_event)``. The thread accepts ONE
|
|
connection, echoes back any bytes sent, and exits.
|
|
"""
|
|
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
|
|
ctx.load_cert_chain(certfile=str(cert_path), keyfile=str(key_path))
|
|
|
|
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
listener.bind(("127.0.0.1", 0))
|
|
listener.listen(1)
|
|
port = listener.getsockname()[1]
|
|
ready = threading.Event()
|
|
|
|
def serve() -> None:
|
|
ready.set()
|
|
try:
|
|
client, _ = listener.accept()
|
|
with ctx.wrap_socket(client, server_side=True) as sslsock:
|
|
while True:
|
|
data = sslsock.recv(4096)
|
|
if not data:
|
|
break
|
|
sslsock.sendall(data)
|
|
except Exception:
|
|
pass
|
|
finally:
|
|
listener.close()
|
|
|
|
t = threading.Thread(target=serve, daemon=True)
|
|
t.start()
|
|
return port, t, ready
|
|
|
|
|
|
def test_tls_handshake_succeeds_against_real_tls_server(tmp_path: Path) -> None:
|
|
"""``IfxSocket(tls=True)`` completes a TLS handshake with a self-signed server."""
|
|
cert_path, key_path = _make_self_signed_cert(tmp_path)
|
|
port, _t, ready = _start_tls_echo_server(cert_path, key_path)
|
|
ready.wait(timeout=2.0)
|
|
|
|
sock = IfxSocket(
|
|
"127.0.0.1", port,
|
|
connect_timeout=2.0,
|
|
read_timeout=2.0,
|
|
tls=True,
|
|
)
|
|
try:
|
|
# Echo round-trip proves the encrypted channel works
|
|
sock.write_all(b"phase 14 tls test")
|
|
echoed = sock.read_exact(len(b"phase 14 tls test"))
|
|
assert echoed == b"phase 14 tls test"
|
|
finally:
|
|
sock.close()
|
|
|
|
|
|
def test_tls_handshake_with_custom_context(tmp_path: Path) -> None:
|
|
"""Caller-supplied ``ssl.SSLContext`` is honored verbatim."""
|
|
cert_path, key_path = _make_self_signed_cert(tmp_path)
|
|
port, _t, ready = _start_tls_echo_server(cert_path, key_path)
|
|
ready.wait(timeout=2.0)
|
|
|
|
# Caller can supply a context with custom CA verification
|
|
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
|
|
ctx.check_hostname = True
|
|
ctx.verify_mode = ssl.CERT_REQUIRED
|
|
ctx.load_verify_locations(cafile=str(cert_path))
|
|
|
|
sock = IfxSocket(
|
|
"127.0.0.1", port,
|
|
connect_timeout=2.0,
|
|
read_timeout=2.0,
|
|
tls=ctx,
|
|
tls_server_hostname="localhost",
|
|
)
|
|
try:
|
|
sock.write_all(b"verified")
|
|
assert sock.read_exact(8) == b"verified"
|
|
finally:
|
|
sock.close()
|
|
|
|
|
|
def test_tls_against_plain_socket_raises_operational_error() -> None:
|
|
"""``tls=True`` against a non-TLS port produces a clear error."""
|
|
# Use a one-shot plain TCP server that hangs up immediately
|
|
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
listener.bind(("127.0.0.1", 0))
|
|
listener.listen(1)
|
|
port = listener.getsockname()[1]
|
|
|
|
accepted: list[socket.socket] = []
|
|
|
|
def serve() -> None:
|
|
client, _ = listener.accept()
|
|
accepted.append(client)
|
|
client.close() # drop the connection — looks like garbage to TLS
|
|
|
|
t = threading.Thread(target=serve, daemon=True)
|
|
t.start()
|
|
|
|
try:
|
|
from informix_db.exceptions import OperationalError
|
|
with pytest.raises(OperationalError, match="TLS handshake failed"):
|
|
IfxSocket(
|
|
"127.0.0.1", port,
|
|
connect_timeout=2.0,
|
|
read_timeout=2.0,
|
|
tls=True,
|
|
)
|
|
finally:
|
|
listener.close()
|
|
t.join(timeout=2.0)
|