"""Phase 14 unit tests — TLS layer for IfxSocket.

Informix uses dedicated TLS-enabled listener ports rather than
STARTTLS-style upgrade. The SSL handshake runs immediately after TCP
connect, transparent to the SQLI protocol layer.

These tests exercise the wrapping logic against a local Python TLS echo
server — they don't need an actual TLS-enabled Informix listener. The
integration tests for "plain TCP works" and "connecting tls=True to a
non-TLS port fails" live alongside the other smoke tests.
"""

from __future__ import annotations

import socket
import ssl
import threading
from pathlib import Path

import pytest

from informix_db._socket import IfxSocket, _make_default_dev_context


# -------- API surface --------


def test_default_dev_context_has_no_verification() -> None:
    """``tls=True`` produces a context safe for self-signed dev servers."""
    ctx = _make_default_dev_context()
    assert ctx.check_hostname is False
    assert ctx.verify_mode == ssl.CERT_NONE


def test_dev_context_uses_modern_protocol() -> None:
    """The default context excludes TLSv1.0/1.1 (Python's SSLContext defaults)."""
    ctx = _make_default_dev_context()
    # ssl.PROTOCOL_TLS_CLIENT prevents downgrade attacks; minimum TLS 1.2
    assert ctx.minimum_version >= ssl.TLSVersion.TLSv1_2


# -------- End-to-end wrapping: local TLS echo server --------


def _make_self_signed_cert(tmp_path: Path) -> tuple[Path, Path]:
    """Generate an ephemeral self-signed cert via the ``openssl`` CLI.

    Skips the test if ``openssl`` isn't on PATH. Avoids adding
    ``cryptography`` as a hard dev dep for one corner-case test.

    Args:
        tmp_path: Directory that receives ``cert.pem`` and ``key.pem``.

    Returns:
        ``(cert_path, key_path)`` for the freshly written PEM files.

    Raises:
        subprocess.CalledProcessError: If ``openssl`` exits non-zero.
    """
    import shutil
    import subprocess

    if not shutil.which("openssl"):
        pytest.skip("openssl CLI not available; needed to generate test cert")

    cert_path = tmp_path / "cert.pem"
    key_path = tmp_path / "key.pem"
    subprocess.run(
        [
            "openssl", "req", "-x509", "-newkey", "rsa:2048",
            "-keyout", str(key_path),
            "-out", str(cert_path),
            "-days", "1",
            "-nodes",  # leave the private key unencrypted so no passphrase prompt
            "-subj", "/CN=localhost",
            # SAN entry so hostname verification against "localhost" can pass.
            "-addext", "subjectAltName=DNS:localhost",
        ],
        check=True,
        capture_output=True,
    )
    return cert_path, key_path


def _start_tls_echo_server(
    cert_path: Path, key_path: Path
) -> tuple[int, threading.Thread, threading.Event]:
    """Spin up a one-shot TLS echo server on an ephemeral port.

    Returns ``(port, thread, ready_event)``. The thread accepts ONE
    connection, echoes back any bytes sent, and exits.

    Args:
        cert_path: PEM certificate presented by the server.
        key_path: PEM private key matching ``cert_path``.
    """
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ctx.load_cert_chain(certfile=str(cert_path), keyfile=str(key_path))

    # Bind and listen before the thread starts, so the port already accepts
    # connections by the time this function returns.
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(("127.0.0.1", 0))  # port 0 -> OS picks a free port
    listener.listen(1)
    port = listener.getsockname()[1]
    ready = threading.Event()

    def serve() -> None:
        ready.set()
        try:
            client, _ = listener.accept()
            with ctx.wrap_socket(client, server_side=True) as sslsock:
                while True:
                    data = sslsock.recv(4096)
                    if not data:
                        break
                    sslsock.sendall(data)
        except OSError:
            # Best-effort server: handshake failures (ssl.SSLError is an
            # OSError subclass) and connection resets are expected when the
            # client misbehaves or hangs up. Anything else is a real bug in
            # this helper and should surface rather than be swallowed.
            pass
        finally:
            listener.close()

    t = threading.Thread(target=serve, daemon=True)
    t.start()
    return port, t, ready


def test_tls_handshake_succeeds_against_real_tls_server(tmp_path: Path) -> None:
    """``IfxSocket(tls=True)`` completes a TLS handshake with a self-signed server."""
    cert_path, key_path = _make_self_signed_cert(tmp_path)
    port, server_thread, ready = _start_tls_echo_server(cert_path, key_path)
    assert ready.wait(timeout=2.0), "TLS echo server thread did not start"

    payload = b"phase 14 tls test"
    sock = IfxSocket(
        "127.0.0.1",
        port,
        connect_timeout=2.0,
        read_timeout=2.0,
        tls=True,
    )
    try:
        # Echo round-trip proves the encrypted channel works
        sock.write_all(payload)
        echoed = sock.read_exact(len(payload))
        assert echoed == payload
    finally:
        sock.close()
        # Bounded join so a stuck server cannot hang the test run.
        server_thread.join(timeout=2.0)


def test_tls_handshake_with_custom_context(tmp_path: Path) -> None:
    """Caller-supplied ``ssl.SSLContext`` is honored verbatim."""
    cert_path, key_path = _make_self_signed_cert(tmp_path)
    port, server_thread, ready = _start_tls_echo_server(cert_path, key_path)
    assert ready.wait(timeout=2.0), "TLS echo server thread did not start"

    # Caller can supply a context with custom CA verification
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ctx.check_hostname = True
    ctx.verify_mode = ssl.CERT_REQUIRED
    # The self-signed cert doubles as its own trust anchor.
    ctx.load_verify_locations(cafile=str(cert_path))

    sock = IfxSocket(
        "127.0.0.1",
        port,
        connect_timeout=2.0,
        read_timeout=2.0,
        tls=ctx,
        # The cert is issued for "localhost", not the IP we connect to.
        tls_server_hostname="localhost",
    )
    try:
        sock.write_all(b"verified")
        assert sock.read_exact(8) == b"verified"
    finally:
        sock.close()
        # Bounded join so a stuck server cannot hang the test run.
        server_thread.join(timeout=2.0)


def test_tls_against_plain_socket_raises_operational_error() -> None:
    """``tls=True`` against a non-TLS port produces a clear error."""
    # Use a one-shot plain TCP server that hangs up immediately
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(("127.0.0.1", 0))
    listener.listen(1)
    port = listener.getsockname()[1]

    def serve() -> None:
        client, _ = listener.accept()
        client.close()  # drop the connection — looks like garbage to TLS

    t = threading.Thread(target=serve, daemon=True)
    t.start()

    try:
        from informix_db.exceptions import OperationalError

        with pytest.raises(OperationalError, match="TLS handshake failed"):
            IfxSocket(
                "127.0.0.1",
                port,
                connect_timeout=2.0,
                read_timeout=2.0,
                tls=True,
            )
    finally:
        listener.close()
        t.join(timeout=2.0)