"""Controlled TCP proxy for fault-injection testing.

Spins up a one-shot proxy in a thread that forwards bytes between the
test client and the real Informix server. The test can call
:meth:`ControlledProxy.kill` at any moment to simulate a network drop
or server crash mid-conversation.

Usage::

    proxy = ControlledProxy("127.0.0.1", 9088)
    proxy.start()
    conn = informix_db.connect(host="127.0.0.1", port=proxy.port, ...)
    cur = conn.cursor()
    cur.execute(...)
    proxy.kill()  # simulated network drop
    cur.fetchone()  # should raise OperationalError
    proxy.close()
"""

from __future__ import annotations

import contextlib
import socket
import struct
import threading

# SO_LINGER payload (l_onoff=1, l_linger=0): close() aborts with RST, not FIN.
_LINGER_RST = struct.pack("ii", 1, 0)

# Applies to the upstream connect only; it is cleared once connected.
_CONNECT_TIMEOUT = 5.0

# Maximum number of bytes forwarded per recv() call.
_BUFFER_SIZE = 8192


class ControlledProxy:
    """A TCP forwarder we can kill at will.

    Listens on an ephemeral port on 127.0.0.1 and forwards bytes to/from
    the upstream Informix server. Exactly one client connection is
    accepted per :meth:`start`. Forwarding runs in two daemon threads
    (one per direction).

    ``kill()`` closes both sockets, simulating a network drop. Idempotent.

    Can be used as a context manager: entering calls :meth:`start`,
    leaving calls :meth:`close`.
    """

    def __init__(self, upstream_host: str, upstream_port: int):
        """Bind the listening socket and record the upstream address.

        Args:
            upstream_host: Host of the real server to forward to.
            upstream_port: Port of the real server to forward to.

        Raises:
            OSError: If the listener cannot be bound or put into listening
                mode. The listener socket is closed before re-raising.
        """
        self.upstream_host = upstream_host
        self.upstream_port = upstream_port
        self._listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self._listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # Port 0 lets the OS pick a free ephemeral port.
            self._listener.bind(("127.0.0.1", 0))
            self._listener.listen(1)
        except OSError:
            self._listener.close()
            raise
        self.port = self._listener.getsockname()[1]
        self._client: socket.socket | None = None
        self._upstream: socket.socket | None = None
        self._threads: list[threading.Thread] = []
        self._killed = False
        # Serialises kill() against the accept thread publishing its sockets
        # and against the pumps propagating an end-of-stream.
        self._lock = threading.Lock()

    def start(self) -> None:
        """Begin accepting on a daemon thread (returns immediately)."""
        accept_thread = threading.Thread(
            target=self._accept_and_forward, daemon=True
        )
        accept_thread.start()

    def _accept_and_forward(self) -> None:
        """Accept one client, connect upstream, and start both pumps.

        Failures are not raised: this runs on a background thread, and the
        caller notices them through its own connection failing.
        """
        try:
            client, _ = self._listener.accept()
        except OSError:
            return  # listener was closed before a client connected

        try:
            upstream = socket.create_connection(
                (self.upstream_host, self.upstream_port),
                timeout=_CONNECT_TIMEOUT,
            )
        except OSError:
            # Upstream unreachable: drop the client so its failure is visible
            # rather than leaving the accepted socket to garbage collection.
            with contextlib.suppress(OSError):
                client.close()
            return

        # create_connection() leaves its timeout on the socket. Without this
        # an idle server makes recv() time out, and the pump would swallow
        # that as OSError and silently stop forwarding.
        upstream.settimeout(None)

        pumps = [
            threading.Thread(
                target=self._pump, args=(client, upstream), daemon=True
            ),
            threading.Thread(
                target=self._pump, args=(upstream, client), daemon=True
            ),
        ]

        with self._lock:
            killed_during_setup = self._killed
            if not killed_during_setup:
                self._client = client
                self._upstream = upstream
                self._threads.extend(pumps)

        if killed_during_setup:
            # kill()/close() ran while we were connecting and found nothing
            # to close, so these sockets must be torn down here.
            self._abort(client)
            self._abort(upstream)
            return

        for pump in pumps:
            pump.start()

    def _pump(self, src: socket.socket, dst: socket.socket) -> None:
        """Forward bytes from *src* to *dst* until EOF, error, or kill.

        A clean end-of-stream from *src* is passed on by shutting down the
        write side of *dst*, unless the proxy has been killed (in which
        case the peer must see an abort, not an orderly close).
        """
        try:
            while not self._killed:
                data = src.recv(_BUFFER_SIZE)
                if not data:
                    # Take the lock so this cannot interleave with kill().
                    with self._lock:
                        if not self._killed:
                            with contextlib.suppress(OSError):
                                dst.shutdown(socket.SHUT_WR)
                    break
                dst.sendall(data)
        except OSError:
            pass  # socket reset or closed underneath us: stop forwarding

    @staticmethod
    def _abort(sock: socket.socket) -> None:
        """Close *sock* abortively (RST), waking any thread blocked on it."""
        # Closing a descriptor does not reliably wake a recv() blocked in
        # another thread. Shutting down the read side does, and it puts
        # nothing on the wire, so the peer still sees only the RST.
        with contextlib.suppress(OSError):
            sock.shutdown(socket.SHUT_RD)
        # Force RST instead of FIN: SO_LINGER=0
        with contextlib.suppress(OSError):
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, _LINGER_RST)
        with contextlib.suppress(OSError):
            sock.close()

    def kill(self) -> None:
        """Sever the connection. Mimics network failure / server crash.

        Closes both sockets *brutally* (SO_LINGER=0 for RST instead of
        FIN) so the client sees a connection-aborted error, not a clean
        EOF. Safe to call repeatedly and before any client has connected.
        """
        with self._lock:
            self._killed = True
            doomed = (self._client, self._upstream)
            self._client = None
            self._upstream = None
        for sock in doomed:
            if sock is not None:
                self._abort(sock)

    def close(self) -> None:
        """Final cleanup — closes everything including the listener."""
        self.kill()
        # Best effort: wake an accept() still blocked on the listener. Some
        # platforms reject shutdown() on a listening socket; that is fine.
        with contextlib.suppress(OSError):
            self._listener.shutdown(socket.SHUT_RDWR)
        with contextlib.suppress(OSError):
            self._listener.close()

    def __enter__(self) -> ControlledProxy:
        self.start()
        return self

    def __exit__(self, *_exc: object) -> None:
        self.close()