From 7f82dbbbfa46909a236a68c0a34f9a6a75e483c3 Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Sun, 10 May 2026 20:42:43 -0600 Subject: [PATCH] UDP transport: parallel codepath in OmniConnection + MockPanel MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The C# decompile shows enuOmniLinkConnectionType has both Network_TCP=4 and Network_UDP=3 (clsOmniLinkConnection.cs uses udpSend/tcpSend parallel paths), and clsHAC carries an enuPreferredNetworkProtocol {TCP, UDP} per-installation byte. User reports their panel is configured for UDP. The TCP-only assumption was too narrow. Wire format is identical: same Packet/Message framing, same handshake, same per-block whitening, same opcodes, same port. Only differences: * UDP is connectionless; each datagram = one Packet (no stream framing) * UDP needs explicit retry-on-timeout for reliability src/omni_pca/connection.py: - New constructor args: transport: Literal['tcp','udp']='tcp', udp_retry_count: int = 3 - connect()/close() branch on transport — TCP keeps the existing asyncio.open_connection + StreamReader/Writer + reader_task path; UDP uses asyncio.get_running_loop().create_datagram_endpoint with remote_addr= so transport.sendto(data) works without per-datagram addrs. The reader_task is TCP-only. - _write_packet branches between writer.write and udp_transport.sendto - request() loops up to (1 + udp_retry_count) attempts on UDP, retrying on RequestTimeoutError; TCP gets a single attempt (existing behavior) - New _OmniDatagramProtocol that decodes each datagram into a Packet and delegates to the shared _dispatch (which already knows how to route handshake / solicited / unsolicited) src/omni_pca/mock_panel.py: - serve(transport='tcp'|'udp') public arg; defaults preserve existing TCP behavior. Internally splits into _serve_tcp / _serve_udp. - New _MockServerDatagramProtocol that mirrors _handle_client for UDP. Tracks one active client by addr (single-session, matches Omni's single-client constraint). Reuses the panel's existing _dispatch_v2, _reply_*, _build_* helpers — the dispatch logic is unchanged, only the transport framing differs. - New _schedule_udp_push for synthesized SystemEvents (seq=0) push to the active client's addr after state mutations. src/omni_pca/client.py: - OmniClient gains transport= and udp_retry_count= kwargs that pass through to OmniConnection. Default is 'tcp' so existing callers are unaffected. tests/test_e2e_udp.py — 6 e2e tests: - handshake roundtrip - get_system_information - arm area with right code - arm with wrong code -> CommandFailedError - turn unit on -> push UnitStateChanged event - wrong ControllerKey -> HandshakeError All run under 0.2s. Combined with the existing TCP suite: 357 tests pass (was 351), ruff clean across src/ tests/. The HA integration's config_flow still defaults to TCP; users on UDP panels can manually set transport= via the OmniClient init path. A follow-up commit will add transport to the HA config flow as a dropdown option. --- src/omni_pca/client.py | 10 +- src/omni_pca/connection.py | 159 +++++++++++++++++++---- src/omni_pca/mock_panel.py | 254 ++++++++++++++++++++++++++++++++++++- tests/test_e2e_udp.py | 152 ++++++++++++++++++++++ 4 files changed, 544 insertions(+), 31 deletions(-) create mode 100644 tests/test_e2e_udp.py diff --git a/src/omni_pca/client.py b/src/omni_pca/client.py index 77a9904..934da45 100644 --- a/src/omni_pca/client.py +++ b/src/omni_pca/client.py @@ -20,7 +20,7 @@ import struct from collections.abc import AsyncIterator, Awaitable, Callable, Sequence from enum import IntEnum from types import TracebackType -from typing import TYPE_CHECKING, Self +from typing import TYPE_CHECKING, Literal, Self from .commands import Command, CommandFailedError, SecurityCommandResponse @@ -120,12 +120,20 @@ class OmniClient: *, controller_key: bytes, timeout: float = 5.0, + transport: Literal["tcp", "udp"] = "tcp", + udp_retry_count: int = 3, ) -> None: + """``transport='udp'`` if your panel is configured for the + ``Network_UDP`` connection type (some firmware versions and the + default for many installs). ``udp_retry_count`` is ignored on TCP. + """ self._conn = OmniConnection( host=host, port=port, controller_key=controller_key, timeout=timeout, + transport=transport, + udp_retry_count=udp_retry_count, ) self._subscriber_task: asyncio.Task[None] | None = None diff --git a/src/omni_pca/connection.py b/src/omni_pca/connection.py index 5f59301..5e89ce5 100644 --- a/src/omni_pca/connection.py +++ b/src/omni_pca/connection.py @@ -26,6 +26,7 @@ import logging from collections.abc import AsyncIterator from enum import IntEnum from types import TracebackType +from typing import Literal from .crypto import ( BLOCK_SIZE, @@ -104,18 +105,30 @@ class OmniConnection: port: int = _DEFAULT_PORT, controller_key: bytes = b"", timeout: float = 5.0, + transport: Literal["tcp", "udp"] = "tcp", + udp_retry_count: int = 3, ) -> None: if len(controller_key) != 16: raise ValueError( f"controller_key must be 16 bytes, got {len(controller_key)}" ) + if transport not in ("tcp", "udp"): + raise ValueError(f"transport must be 'tcp' or 'udp', got {transport!r}") self._host = host self._port = port self._controller_key = bytes(controller_key) self._default_timeout = timeout + self._transport_kind: Literal["tcp", "udp"] = transport + self._udp_retry_count = max(0, udp_retry_count) + # TCP transport state self._reader: asyncio.StreamReader | None = None self._writer: asyncio.StreamWriter | None = None + + # UDP transport state (asyncio.DatagramTransport + our protocol) + self._udp_transport: asyncio.DatagramTransport | None = None + self._udp_protocol: _OmniDatagramProtocol | None = None + self._state = ConnectionState.DISCONNECTED self._session_id: bytes | None = None @@ -167,22 +180,41 @@ class OmniConnection: await self.close() async def connect(self) -> None: - """Open the TCP socket and run the 4-step secure-session handshake.""" + """Open the socket and run the 4-step secure-session handshake. + + Transport is set by the ``transport=`` constructor arg. TCP opens + a stream socket; UDP opens a datagram endpoint. Either way, the + handshake (and everything else) speaks the same Packet/Message + format and crypto. + """ if self._state is not ConnectionState.DISCONNECTED: raise ConnectionError(f"already connecting/connected (state={self._state})") self._state = ConnectionState.CONNECTING try: - self._reader, self._writer = await asyncio.wait_for( - asyncio.open_connection(self._host, self._port), - timeout=self._default_timeout, - ) + if self._transport_kind == "tcp": + self._reader, self._writer = await asyncio.wait_for( + asyncio.open_connection(self._host, self._port), + timeout=self._default_timeout, + ) + self._reader_task = asyncio.create_task( + self._read_loop(), name=f"omni-conn-reader-{self._host}" + ) + else: + # UDP: connectionless. We "connect" the datagram socket to + # the panel so we can reject stray datagrams from elsewhere + # and use plain `transport.sendto(data)`. + loop = asyncio.get_running_loop() + self._udp_transport, self._udp_protocol = ( + await loop.create_datagram_endpoint( + lambda: _OmniDatagramProtocol(self), + remote_addr=(self._host, self._port), + ) + ) except (TimeoutError, OSError) as exc: self._state = ConnectionState.DISCONNECTED - raise ConnectionError(f"failed to open TCP socket: {exc}") from exc - - self._reader_task = asyncio.create_task( - self._read_loop(), name=f"omni-conn-reader-{self._host}" - ) + raise ConnectionError( + f"failed to open {self._transport_kind.upper()} socket: {exc}" + ) from exc try: await self._do_handshake() @@ -212,6 +244,12 @@ class OmniConnection: self._writer = None self._reader = None + if self._udp_transport is not None: + with contextlib.suppress(OSError): + self._udp_transport.close() + self._udp_transport = None + self._udp_protocol = None + if self._reader_task is not None and not self._reader_task.done(): self._reader_task.cancel() with contextlib.suppress(asyncio.CancelledError, Exception): @@ -238,17 +276,39 @@ class OmniConnection: f"cannot send request, connection state={self._state.name}" ) message = encode_v2(opcode, payload) - seq, fut = self._send_encrypted(message) - try: - reply_packet = await asyncio.wait_for( - fut, timeout if timeout is not None else self._default_timeout - ) - except TimeoutError as exc: - self._pending.pop(seq, None) - raise RequestTimeoutError( - f"no reply for opcode={int(opcode)} seq={seq}" - ) from exc - return self._decode_inner(reply_packet) + per_attempt_timeout = timeout if timeout is not None else self._default_timeout + # UDP needs explicit retries since datagram delivery is best-effort. + # TCP gets reliable delivery for free; we still keep retry_count for + # API uniformity but it defaults to 0 effectively. + max_attempts = ( + 1 + self._udp_retry_count if self._transport_kind == "udp" else 1 + ) + last_exc: Exception | None = None + for attempt in range(1, max_attempts + 1): + seq, fut = self._send_encrypted(message) + try: + reply_packet = await asyncio.wait_for(fut, per_attempt_timeout) + except TimeoutError as exc: + last_exc = exc + self._pending.pop(seq, None) + if attempt < max_attempts: + _log.debug( + "udp retry %d/%d on opcode=%d seq=%d", + attempt, + max_attempts, + int(opcode), + seq, + ) + continue + raise RequestTimeoutError( + f"no reply for opcode={int(opcode)} " + f"after {max_attempts} attempt(s)" + ) from last_exc + return self._decode_inner(reply_packet) + # Loop exit without return only via re-raised timeout above. + raise RequestTimeoutError( + f"request loop exited without reply for opcode={int(opcode)}" + ) def unsolicited(self) -> AsyncIterator[Message]: """Async iterator over unsolicited inbound messages (seq=0).""" @@ -380,17 +440,23 @@ class OmniConnection: return seq, fut def _write_packet(self, pkt: Packet, *, encrypted: bool = False) -> None: - if self._writer is None: - raise ConnectionError("transport not open") wire = pkt.encode() _log.debug( - "TX seq=%d type=%s len=%d encrypted=%s", + "TX[%s] seq=%d type=%s len=%d encrypted=%s", + self._transport_kind, pkt.seq, pkt.type.name, len(pkt.data), encrypted, ) - self._writer.write(wire) + if self._transport_kind == "tcp": + if self._writer is None: + raise ConnectionError("transport not open") + self._writer.write(wire) + else: + if self._udp_transport is None: + raise ConnectionError("transport not open") + self._udp_transport.sendto(wire) def _decode_inner(self, pkt: Packet) -> Message: """Decrypt + parse the inner ``Message`` from an OmniLink2Message packet.""" @@ -596,3 +662,46 @@ class OmniConnection: return if not fut.done(): fut.set_result(pkt) + + +# -------------------------------------------------------------------------- +# UDP transport +# -------------------------------------------------------------------------- + + +class _OmniDatagramProtocol(asyncio.DatagramProtocol): + """asyncio.DatagramProtocol bound to a single OmniConnection. + + Each datagram received on a UDP Omni socket *is* one complete Packet + (no stream framing — that is the whole reason UDP is useful here). + We just decode it and hand it to the connection's dispatcher. + """ + + def __init__(self, conn: OmniConnection) -> None: + self._conn = conn + + def connection_made(self, transport: asyncio.BaseTransport) -> None: + # transport is a DatagramTransport in this codepath. + pass + + def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None: + # Each datagram is a complete Packet — no stream framing. + # The TCP _dispatch already handles handshake routing, solicited + # replies, and unsolicited push, so we just delegate. + try: + pkt = Packet.decode(data) + except Exception as exc: + _log.warning("dropping malformed UDP datagram: %s", exc) + return + try: + self._conn._dispatch(pkt) + except Exception: + _log.exception("UDP dispatch crashed for seq=%d", pkt.seq) + + def error_received(self, exc: Exception) -> None: + _log.warning("UDP socket error: %s", exc) + + def connection_lost(self, exc: Exception | None) -> None: + if exc is not None: + _log.warning("UDP transport lost: %s", exc) + diff --git a/src/omni_pca/mock_panel.py b/src/omni_pca/mock_panel.py index 6a42b79..491bd0f 100644 --- a/src/omni_pca/mock_panel.py +++ b/src/omni_pca/mock_panel.py @@ -333,21 +333,41 @@ class MockPanel: @asynccontextmanager async def serve( - self, host: str = "127.0.0.1", port: int = 0 + self, + host: str = "127.0.0.1", + port: int = 0, + transport: str = "tcp", + ) -> AsyncIterator[tuple[str, int]]: + """Start listening; yield ``(host, actual_port)``; tear down on exit. + + ``transport='tcp'`` (default) opens a stream server matching modern + PC Access. ``transport='udp'`` opens a datagram socket — the panel + path used by some firmware versions and by the Omni-Link II + ``Network_UDP`` configuration. Same wire format either way. + """ + if transport == "tcp": + async with self._serve_tcp(host, port) as bound: + yield bound + elif transport == "udp": + async with self._serve_udp(host, port) as bound: + yield bound + else: + raise ValueError(f"transport must be 'tcp' or 'udp', got {transport!r}") + + @asynccontextmanager + async def _serve_tcp( + self, host: str, port: int ) -> AsyncIterator[tuple[str, int]]: - """Start listening; yield ``(host, actual_port)``; tear down on exit.""" server = await asyncio.start_server(self._handle_client, host=host, port=port) sockets = server.sockets or () if not sockets: # pragma: no cover -- start_server always populates this raise RuntimeError("asyncio.start_server returned no sockets") bound_host, bound_port = sockets[0].getsockname()[:2] - _log.debug("mock panel listening on %s:%d", bound_host, bound_port) + _log.debug("mock panel (tcp) listening on %s:%d", bound_host, bound_port) try: async with server: yield bound_host, bound_port finally: - # Cancel any in-flight push tasks so the test event loop - # tears down cleanly. for t in list(self._push_tasks): if not t.done(): t.cancel() @@ -356,6 +376,27 @@ class MockPanel: with contextlib.suppress(Exception): # pragma: no cover await server.wait_closed() + @asynccontextmanager + async def _serve_udp( + self, host: str, port: int + ) -> AsyncIterator[tuple[str, int]]: + loop = asyncio.get_running_loop() + transport, _protocol = await loop.create_datagram_endpoint( + lambda: _MockServerDatagramProtocol(self), + local_addr=(host, port), + ) + sockname = transport.get_extra_info("sockname") + bound_host, bound_port = sockname[0], sockname[1] + _log.debug("mock panel (udp) listening on %s:%d", bound_host, bound_port) + try: + yield bound_host, bound_port + finally: + for t in list(self._push_tasks): + if not t.done(): + t.cancel() + self._push_tasks.clear() + transport.close() + # -------- connection handling -------- async def _handle_client( @@ -1226,3 +1267,206 @@ async def _read_exact(reader: asyncio.StreamReader, n: int) -> bytes | None: return await reader.readexactly(n) except asyncio.IncompleteReadError: return None + + +# -------------------------------------------------------------------------- +# UDP server protocol +# -------------------------------------------------------------------------- + + +class _MockServerDatagramProtocol(asyncio.DatagramProtocol): + """Datagram-side implementation of the mock panel. + + Tracks a single active client (the most-recent peer to issue a + ``ClientRequestNewSession``) and routes its session state. The same + reply-builder helpers (``_reply_system_information``, + ``_build_zone_properties``, etc.) on the parent ``MockPanel`` are + reused for both transports — the only thing UDP-specific here is + framing (each datagram is one complete Packet) and the absence of + per-block stream reads. + """ + + def __init__(self, panel: MockPanel) -> None: + self._panel = panel + self._transport: asyncio.DatagramTransport | None = None + self._client_addr: tuple[str, int] | None = None + self._session_id: bytes | None = None + self._session_key: bytes | None = None + + def connection_made(self, transport: asyncio.BaseTransport) -> None: + self._transport = transport # type: ignore[assignment] + + def connection_lost(self, exc: Exception | None) -> None: + if exc is not None: + _log.warning("mock panel (udp) transport lost: %s", exc) + + def error_received(self, exc: Exception) -> None: + _log.warning("mock panel (udp) socket error: %s", exc) + + def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None: + # Each datagram is a complete Packet. Spawn a task so we can do + # async work (drain push-event tasks, etc.) without blocking the + # protocol callback. We track the task so the GC doesn't drop the + # reference mid-flight (RUF006). + task = asyncio.create_task( + self._handle_datagram(data, addr), + name=f"mock-panel-udp-rx-{addr[0]}-{addr[1]}", + ) + self._panel._push_tasks.add(task) + task.add_done_callback(self._panel._push_tasks.discard) + + async def _handle_datagram(self, data: bytes, addr: tuple[str, int]) -> None: + try: + pkt = Packet.decode(data) + except Exception as exc: + _log.warning("mock panel (udp) malformed datagram from %s: %s", addr, exc) + return + + try: + await self._dispatch_packet(pkt, addr) + except Exception: + _log.exception("mock panel (udp) crashed on packet seq=%d", pkt.seq) + + def _send(self, pkt: Packet, addr: tuple[str, int]) -> None: + if self._transport is None: + return + self._transport.sendto(pkt.encode(), addr) + + async def _dispatch_packet( + self, pkt: Packet, addr: tuple[str, int] + ) -> None: + if pkt.type is PacketType.ClientRequestNewSession: + session_id = self._panel._session_id_provider() + self._session_id = session_id + self._session_key = derive_session_key( + self._panel._controller_key, session_id + ) + self._client_addr = addr + payload = bytes([_PROTO_HI, _PROTO_LO]) + session_id + self._send( + Packet( + seq=pkt.seq, + type=PacketType.ControllerAckNewSession, + data=payload, + ), + addr, + ) + return + + if pkt.type is PacketType.ClientRequestSecureSession: + if self._session_key is None or self._session_id is None: + _log.debug("mock panel (udp) secure-session before NewSession") + return + try: + plaintext = decrypt_message_payload( + pkt.data, pkt.seq, self._session_key + ) + except Exception: + _log.debug("mock panel (udp) failed to decrypt secure-session") + return + if not plaintext.startswith(self._session_id): + self._send( + Packet( + seq=pkt.seq, + type=PacketType.ControllerSessionTerminated, + data=b"", + ), + addr, + ) + return + ciphertext_out = encrypt_message_payload( + self._session_id, pkt.seq, self._session_key + ) + self._send( + Packet( + seq=pkt.seq, + type=PacketType.ControllerAckSecureSession, + data=ciphertext_out, + ), + addr, + ) + self._panel._session_count += 1 + return + + if pkt.type is PacketType.ClientSessionTerminated: + self._session_id = None + self._session_key = None + self._client_addr = None + return + + if pkt.type is PacketType.OmniLink2Message: + if self._session_key is None: + _log.debug("mock panel (udp) encrypted message before secure session") + return + await self._handle_encrypted_udp(pkt, addr) + return + + _log.debug("mock panel (udp) unhandled packet type %s", pkt.type.name) + + async def _handle_encrypted_udp( + self, pkt: Packet, addr: tuple[str, int] + ) -> None: + assert self._session_key is not None + try: + plaintext = decrypt_message_payload( + pkt.data, pkt.seq, self._session_key + ) + except Exception: + _log.debug("mock panel (udp) failed to decrypt v2 message") + return + try: + inner = Message.decode(plaintext) + except (MessageCrcError, MessageFormatError): + await self._send_reply(pkt.seq, _build_nak(0), addr) + return + + opcode = inner.opcode + self._panel._last_request_opcode = opcode + # _dispatch_v2 is the same opcode-dispatch table the TCP path uses, + # returning (reply_message, push_event_words) so we can write the + # synchronous reply first then schedule an unsolicited push. + reply, push_words = self._panel._dispatch_v2(opcode, inner.payload) + await self._send_reply(pkt.seq, reply, addr) + if push_words: + self._schedule_udp_push(push_words, addr) + + def _schedule_udp_push( + self, words: tuple[int, ...], addr: tuple[str, int] + ) -> None: + """Fire-and-forget unsolicited SystemEvents push to ``addr``.""" + assert self._session_key is not None + session_key = self._session_key + + async def _push() -> None: + await asyncio.sleep(_PUSH_DELAY) + try: + msg = _build_system_events_message(words) + plaintext = msg.encode() + ciphertext = encrypt_message_payload(plaintext, 0, session_key) + pkt = Packet( + seq=0, + type=PacketType.OmniLink2Message, + data=ciphertext, + ) + self._send(pkt, addr) + except (ConnectionError, asyncio.CancelledError): + pass + except Exception: # pragma: no cover - diagnostic only + _log.exception("mock panel (udp) failed to push event") + + task = asyncio.create_task(_push(), name="mock-panel-udp-event-push") + self._panel._push_tasks.add(task) + task.add_done_callback(self._panel._push_tasks.discard) + + async def _send_reply( + self, client_seq: int, message: Message, addr: tuple[str, int] + ) -> None: + assert self._session_key is not None + plaintext = message.encode() + ciphertext = encrypt_message_payload(plaintext, client_seq, self._session_key) + pkt = Packet( + seq=client_seq, + type=PacketType.OmniLink2Message, + data=ciphertext, + ) + self._send(pkt, addr) diff --git a/tests/test_e2e_udp.py b/tests/test_e2e_udp.py new file mode 100644 index 0000000..e8b9e89 --- /dev/null +++ b/tests/test_e2e_udp.py @@ -0,0 +1,152 @@ +"""End-to-end: OmniClient ↔ MockPanel over UDP. + +Mirrors test_e2e_client_mock.py but with ``transport='udp'`` on both +sides. The protocol/encryption/handshake bytes are identical to TCP; +this proves only the transport layer change is sound. +""" + +from __future__ import annotations + +import asyncio +import secrets + +import pytest + +from omni_pca.client import ObjectType, OmniClient +from omni_pca.commands import CommandFailedError +from omni_pca.connection import ConnectionState, HandshakeError, OmniConnection +from omni_pca.events import UnitStateChanged +from omni_pca.mock_panel import ( + MockAreaState, + MockButtonState, + MockPanel, + MockState, + MockThermostatState, + MockUnitState, + MockZoneState, +) +from omni_pca.models import ( + AreaStatus, + SecurityMode, +) +from omni_pca.opcodes import OmniLink2MessageType + +CONTROLLER_KEY = bytes.fromhex("6ba7b4e9b4656de3cd7edd4c650cdb09") + + +def _populated_state() -> MockState: + return MockState( + zones={1: MockZoneState(name="FRONT_DOOR")}, + units={1: MockUnitState(name="LIVING_LAMP")}, + areas={1: MockAreaState(name="MAIN")}, + thermostats={1: MockThermostatState(name="LIVING")}, + buttons={1: MockButtonState(name="GOOD_MORNING")}, + user_codes={1: 1234}, + ) + + +async def test_udp_handshake_roundtrip() -> None: + panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state()) + async with ( + panel.serve(transport="udp") as (host, port), + OmniConnection( + host=host, + port=port, + controller_key=CONTROLLER_KEY, + transport="udp", + timeout=2.0, + ) as conn, + ): + assert conn.state is ConnectionState.ONLINE + assert panel.session_count == 1 + + +async def test_udp_get_system_information() -> None: + panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state()) + async with ( + panel.serve(transport="udp") as (host, port), + OmniConnection( + host=host, + port=port, + controller_key=CONTROLLER_KEY, + transport="udp", + timeout=2.0, + ) as conn, + ): + reply = await conn.request(OmniLink2MessageType.RequestSystemInformation) + assert reply.opcode == int(OmniLink2MessageType.SystemInformation) + # First payload byte is the model byte. + assert reply.payload[0] == 16 # OMNI_PRO_II + + +async def test_udp_arm_area_with_correct_code() -> None: + panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state()) + async with ( + panel.serve(transport="udp") as (host, port), + OmniClient( + host=host, + port=port, + controller_key=CONTROLLER_KEY, + transport="udp", + timeout=2.0, + ) as client, + ): + await client.execute_security_command( + area=1, mode=SecurityMode.AWAY, code=1234, + ) + statuses = await client.get_object_status(ObjectType.AREA, 1) + assert len(statuses) == 1 + area = statuses[0] + assert isinstance(area, AreaStatus) + assert area.mode == int(SecurityMode.AWAY) + + +async def test_udp_arm_with_wrong_code_raises() -> None: + panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state()) + async with panel.serve(transport="udp") as (host, port): + with pytest.raises(CommandFailedError): + async with OmniClient( + host=host, + port=port, + controller_key=CONTROLLER_KEY, + transport="udp", + timeout=2.0, + ) as client: + await client.execute_security_command( + area=1, mode=SecurityMode.AWAY, code=9999, + ) + + +async def test_udp_unit_on_pushes_state_changed_event() -> None: + panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state()) + async with ( + panel.serve(transport="udp") as (host, port), + OmniClient( + host=host, + port=port, + controller_key=CONTROLLER_KEY, + transport="udp", + timeout=2.0, + ) as client, + ): + events = client.events() + await client.turn_unit_on(1) + ev = await asyncio.wait_for(events.__anext__(), timeout=1.0) + assert isinstance(ev, UnitStateChanged) + assert ev.unit_index == 1 + assert ev.is_on is True + + +async def test_udp_wrong_key_fails_handshake() -> None: + panel = MockPanel(controller_key=CONTROLLER_KEY) + wrong_key = secrets.token_bytes(16) + async with panel.serve(transport="udp") as (host, port): + with pytest.raises(HandshakeError): + async with OmniConnection( + host=host, + port=port, + controller_key=wrong_key, + transport="udp", + timeout=2.0, + ): + pass