Compare commits

..

No commits in common. "main" and "v2026.5.10" have entirely different histories.

9 changed files with 50 additions and 674 deletions

View File

@ -4,9 +4,6 @@ Async Python client for HAI/Leviton Omni-Link II home automation panels — Omni
Includes a Home Assistant custom component (`custom_components/omni_pca/`). Includes a Home Assistant custom component (`custom_components/omni_pca/`).
**Project home:** <https://git.supported.systems/warehack.ing/omni-pca>
**Documentation:** <https://hai-omni-pro-ii.warehack.ing/>
## Status ## Status
**Alpha.** Built from a full reverse-engineering of HAI's PC Access 3.17 (the Windows installer/programmer app). The protocol layer captures two non-public quirks that public Omni-Link clients miss: **Alpha.** Built from a full reverse-engineering of HAI's PC Access 3.17 (the Windows installer/programmer app). The protocol layer captures two non-public quirks that public Omni-Link clients miss:
@ -14,27 +11,14 @@ Includes a Home Assistant custom component (`custom_components/omni_pca/`).
1. **Session key is not the ControllerKey.** Last 5 bytes are XORed with a controller-supplied SessionID nonce. 1. **Session key is not the ControllerKey.** Last 5 bytes are XORed with a controller-supplied SessionID nonce.
2. **Per-block XOR pre-whitening before AES.** First two bytes of every 16-byte block are XORed with the packet's sequence number. 2. **Per-block XOR pre-whitening before AES.** First two bytes of every 16-byte block are XORed with the packet's sequence number.
The full byte-level protocol spec lives at <https://hai-omni-pro-ii.warehack.ing/reference/protocol/>. See [`docs/PROTOCOL.md`](docs/PROTOCOL.md) for the full byte-level spec.
## Install
The library isn't on PyPI yet (pending), so install directly from the Gitea release:
```bash
# Pinned to a specific release (recommended)
pip install "omni-pca @ git+https://git.supported.systems/warehack.ing/omni-pca.git@v2026.5.10"
# Or the wheel from the release page
pip install https://git.supported.systems/warehack.ing/omni-pca/releases/download/v2026.5.10/omni_pca-2026.5.10-py3-none-any.whl
# Or with uv
uv add "omni-pca @ git+https://git.supported.systems/warehack.ing/omni-pca.git@v2026.5.10"
```
Once published to PyPI, the canonical install will be `pip install omni-pca`.
## Quick start (library) ## Quick start (library)
```bash
uv add omni-pca
```
```python ```python
import asyncio import asyncio
from omni_pca import OmniClient from omni_pca import OmniClient
@ -45,71 +29,44 @@ async def main():
port=4369, port=4369,
controller_key=bytes.fromhex("6ba7b4e9b4656de3cd7edd4c650cdb09"), controller_key=bytes.fromhex("6ba7b4e9b4656de3cd7edd4c650cdb09"),
) as panel: ) as panel:
info = await panel.get_system_information() info = await panel.get_system_info()
print(info.model_name, info.firmware_version) print(info.model_name, info.firmware_version)
asyncio.run(main()) asyncio.run(main())
``` ```
For the panel walkthrough — connect, list zones, react to push events — see the [tutorial](https://hai-omni-pro-ii.warehack.ing/tutorials/first-script/).
## Quick start (Home Assistant) ## Quick start (Home Assistant)
```bash Copy `custom_components/omni_pca/` into your HA `config/custom_components/`, restart HA, then add the integration via Settings → Devices & Services. You'll need:
# Manual install — works on every HA flavour
cd /path/to/your/homeassistant/config/
mkdir -p custom_components
cd custom_components
git clone https://git.supported.systems/warehack.ing/omni-pca tmp-omni
cp -r tmp-omni/custom_components/omni_pca .
rm -rf tmp-omni
```
Restart HA, then add the integration via **Settings → Devices & Services**. You'll need:
- Panel IP / hostname - Panel IP / hostname
- TCP port (default 4369) - TCP port (default 4369)
- ControllerKey as 32 hex chars - ControllerKey as 32 hex chars
Get the ControllerKey from your `.pca` file using the bundled CLI: Get the ControllerKey from your `.pca` file using the included parser:
```bash ```bash
omni-pca decode-pca '/path/to/Your.pca' --field controller_key uvx --from omni-pca omni-pca decode-pca path/to/Your.pca --field controller_key
``` ```
The integration creates one HA device per panel plus typed entities for every named object on the controller: `alarm_control_panel` for areas, `light` for units, `binary_sensor` + `switch` for zones (state + bypass), `climate` for thermostats, `sensor` for analog zones and panel telemetry, `button` for panel macros, and `event` for the typed push-notification stream. See [`custom_components/omni_pca/README.md`](custom_components/omni_pca/README.md) for the full entity + service catalog, or the [HA install how-to](https://hai-omni-pro-ii.warehack.ing/how-to/install-in-home-assistant/) for the step-by-step. The integration creates one HA device per panel plus typed entities for every named object on the controller: `alarm_control_panel` for areas, `light` for units, `binary_sensor`/`switch` for zones (state + bypass), `climate` for thermostats, `sensor` for analog zones and panel telemetry, `button` for panel macros, and `event` for the typed push-notification stream. See [`custom_components/omni_pca/README.md`](custom_components/omni_pca/README.md) for the entity table and service list.
## Without a panel — mock controller ## Without a panel — mock controller
The library ships a stateful `MockPanel` that emulates the controller side of the protocol over real TCP. Useful for offline development, integration tests, and demos: For testing, the library ships a minimal Omni controller emulator:
```python ```python
from omni_pca.mock_panel import MockPanel from omni_pca.mock_panel import MockPanel
async with MockPanel(controller_key=...).serve(port=14369): async with MockPanel(controller_key=...).serve(port=14369):
# Connect a real OmniClient to localhost:14369 — full handshake + AES # connect a real OmniClient to localhost:14369 — works end-to-end
... ...
``` ```
The local dev stack (`dev/docker-compose.yml`) packages a real Home Assistant container and the mock panel side-by-side so you can click through the integration without touching real hardware. See [the dev-stack tutorial](https://hai-omni-pro-ii.warehack.ing/tutorials/dev-stack/).
## Tests
```bash
uv sync --group ha
uv run pytest -q
```
351 tests across the protocol primitives, the mock panel, the OmniClient ↔ MockPanel end-to-end roundtrip, and an in-process Home Assistant harness driving the integration via the real config flow + service calls.
## Versioning ## Versioning
Date-based ([CalVer](https://calver.org/)): `YYYY.M.D`. Bumped on backwards-incompatible changes. See [`CHANGELOG.md`](CHANGELOG.md). Date-based ([CalVer](https://calver.org/)): `YYYY.M.D`. Bumped on backwards-incompatible changes.
## License
MIT. See [`LICENSE`](LICENSE).
## Acknowledgments ## Acknowledgments
This client is independent and not affiliated with Leviton or HAI. Protocol details derived from clean-room analysis of the publicly-distributed PC Access installer. The reverse-engineering arc is documented at <https://hai-omni-pro-ii.warehack.ing/journey/>. This client is independent and not affiliated with Leviton or HAI. Protocol details derived from clean-room analysis of the publicly-distributed PC Access installer.

View File

@ -14,13 +14,7 @@ from typing import TYPE_CHECKING
from homeassistant.const import CONF_HOST, CONF_PORT, Platform from homeassistant.const import CONF_HOST, CONF_PORT, Platform
from homeassistant.exceptions import ConfigEntryNotReady from homeassistant.exceptions import ConfigEntryNotReady
from .const import ( from .const import CONF_CONTROLLER_KEY, DOMAIN, LOGGER
CONF_CONTROLLER_KEY,
CONF_TRANSPORT,
DEFAULT_TRANSPORT,
DOMAIN,
LOGGER,
)
from .coordinator import OmniDataUpdateCoordinator from .coordinator import OmniDataUpdateCoordinator
from .services import async_setup_services, async_unload_services from .services import async_setup_services, async_unload_services
@ -56,14 +50,12 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
LOGGER.error("stored controller key for %s is corrupt: %s", entry.title, err) LOGGER.error("stored controller key for %s is corrupt: %s", entry.title, err)
return False return False
transport: str = entry.data.get(CONF_TRANSPORT, DEFAULT_TRANSPORT)
coordinator = OmniDataUpdateCoordinator( coordinator = OmniDataUpdateCoordinator(
hass, hass,
entry, entry,
host=host, host=host,
port=port, port=port,
controller_key=controller_key, controller_key=controller_key,
transport=transport,
) )
try: try:

View File

@ -20,14 +20,10 @@ from omni_pca.connection import (
from .const import ( from .const import (
CONF_CONTROLLER_KEY, CONF_CONTROLLER_KEY,
CONF_TRANSPORT,
CONTROLLER_KEY_HEX_LEN, CONTROLLER_KEY_HEX_LEN,
DEFAULT_PORT, DEFAULT_PORT,
DEFAULT_TRANSPORT,
DOMAIN, DOMAIN,
LOGGER, LOGGER,
TRANSPORT_TCP,
TRANSPORT_UDP,
) )
@ -64,12 +60,6 @@ _USER_SCHEMA = vol.Schema(
vol.Coerce(int), vol.Range(min=1, max=65535) vol.Coerce(int), vol.Range(min=1, max=65535)
), ),
vol.Required(CONF_CONTROLLER_KEY): str, vol.Required(CONF_CONTROLLER_KEY): str,
# Most modern firmware uses TCP; some installers configure
# Network_UDP. PC Access stores the choice as
# enuPreferredNetworkProtocol in the .pca config.
vol.Required(CONF_TRANSPORT, default=DEFAULT_TRANSPORT): vol.In(
[TRANSPORT_TCP, TRANSPORT_UDP]
),
} }
) )
@ -89,7 +79,6 @@ class OmniConfigFlow(ConfigFlow, domain=DOMAIN):
if user_input is not None: if user_input is not None:
host: str = user_input[CONF_HOST].strip() host: str = user_input[CONF_HOST].strip()
port: int = user_input[CONF_PORT] port: int = user_input[CONF_PORT]
transport: str = user_input.get(CONF_TRANSPORT, DEFAULT_TRANSPORT)
unique_id = f"{host}:{port}" unique_id = f"{host}:{port}"
await self.async_set_unique_id(unique_id) await self.async_set_unique_id(unique_id)
@ -101,7 +90,7 @@ class OmniConfigFlow(ConfigFlow, domain=DOMAIN):
LOGGER.debug("controller key rejected: %s", err) LOGGER.debug("controller key rejected: %s", err)
errors[CONF_CONTROLLER_KEY] = "invalid_key" errors[CONF_CONTROLLER_KEY] = "invalid_key"
else: else:
title, error = await self._probe(host, port, key, transport) title, error = await self._probe(host, port, key)
if error is not None: if error is not None:
errors["base"] = error errors["base"] = error
else: else:
@ -111,7 +100,6 @@ class OmniConfigFlow(ConfigFlow, domain=DOMAIN):
CONF_HOST: host, CONF_HOST: host,
CONF_PORT: port, CONF_PORT: port,
CONF_CONTROLLER_KEY: key.hex(), CONF_CONTROLLER_KEY: key.hex(),
CONF_TRANSPORT: transport,
}, },
) )
@ -133,9 +121,6 @@ class OmniConfigFlow(ConfigFlow, domain=DOMAIN):
assert self._reauth_entry_data is not None assert self._reauth_entry_data is not None
host: str = self._reauth_entry_data[CONF_HOST] host: str = self._reauth_entry_data[CONF_HOST]
port: int = self._reauth_entry_data[CONF_PORT] port: int = self._reauth_entry_data[CONF_PORT]
transport: str = self._reauth_entry_data.get(
CONF_TRANSPORT, DEFAULT_TRANSPORT
)
errors: dict[str, str] = {} errors: dict[str, str] = {}
if user_input is not None: if user_input is not None:
@ -144,7 +129,7 @@ class OmniConfigFlow(ConfigFlow, domain=DOMAIN):
except InvalidControllerKey: except InvalidControllerKey:
errors[CONF_CONTROLLER_KEY] = "invalid_key" errors[CONF_CONTROLLER_KEY] = "invalid_key"
else: else:
_, error = await self._probe(host, port, key, transport) _, error = await self._probe(host, port, key)
if error is not None: if error is not None:
errors["base"] = error errors["base"] = error
else: else:
@ -162,20 +147,11 @@ class OmniConfigFlow(ConfigFlow, domain=DOMAIN):
# ---- helpers --------------------------------------------------------- # ---- helpers ---------------------------------------------------------
async def _probe( async def _probe(
self, self, host: str, port: int, key: bytes
host: str,
port: int,
key: bytes,
transport: str = DEFAULT_TRANSPORT,
) -> tuple[str | None, str | None]: ) -> tuple[str | None, str | None]:
"""Try to connect once. Returns (title, error_code).""" """Try to connect once. Returns (title, error_code)."""
try: try:
async with OmniClient( async with OmniClient(host, port=port, controller_key=key) as client:
host,
port=port,
controller_key=key,
transport=transport, # type: ignore[arg-type]
) as client:
info = await client.get_system_information() info = await client.get_system_information()
except (HandshakeError, InvalidEncryptionKeyError): except (HandshakeError, InvalidEncryptionKeyError):
return None, "invalid_auth" return None, "invalid_auth"

View File

@ -12,11 +12,6 @@ DEFAULT_PORT: Final = 4369
DEFAULT_TIMEOUT: Final = 5.0 DEFAULT_TIMEOUT: Final = 5.0
CONF_CONTROLLER_KEY: Final = "controller_key" CONF_CONTROLLER_KEY: Final = "controller_key"
CONF_TRANSPORT: Final = "transport"
TRANSPORT_TCP: Final = "tcp"
TRANSPORT_UDP: Final = "udp"
DEFAULT_TRANSPORT: Final = TRANSPORT_TCP
MANUFACTURER: Final = "HAI / Leviton" MANUFACTURER: Final = "HAI / Leviton"

View File

@ -137,7 +137,6 @@ class OmniDataUpdateCoordinator(DataUpdateCoordinator[OmniData]):
host: str, host: str,
port: int, port: int,
controller_key: bytes, controller_key: bytes,
transport: str = "tcp",
) -> None: ) -> None:
super().__init__( super().__init__(
hass, hass,
@ -149,7 +148,6 @@ class OmniDataUpdateCoordinator(DataUpdateCoordinator[OmniData]):
self._host = host self._host = host
self._port = port self._port = port
self._controller_key = controller_key self._controller_key = controller_key
self._transport = transport
self._client: OmniClient | None = None self._client: OmniClient | None = None
self._discovery_done = False self._discovery_done = False
self._discovered: OmniData | None = None self._discovered: OmniData | None = None
@ -238,7 +236,6 @@ class OmniDataUpdateCoordinator(DataUpdateCoordinator[OmniData]):
self._host, self._host,
port=self._port, port=self._port,
controller_key=self._controller_key, controller_key=self._controller_key,
transport=self._transport, # type: ignore[arg-type]
) )
# Drive __aenter__ manually so the client survives across update # Drive __aenter__ manually so the client survives across update
# cycles; we close it explicitly on shutdown / failure. # cycles; we close it explicitly on shutdown / failure.

View File

@ -20,7 +20,7 @@ import struct
from collections.abc import AsyncIterator, Awaitable, Callable, Sequence from collections.abc import AsyncIterator, Awaitable, Callable, Sequence
from enum import IntEnum from enum import IntEnum
from types import TracebackType from types import TracebackType
from typing import TYPE_CHECKING, Literal, Self from typing import TYPE_CHECKING, Self
from .commands import Command, CommandFailedError, SecurityCommandResponse from .commands import Command, CommandFailedError, SecurityCommandResponse
@ -120,20 +120,12 @@ class OmniClient:
*, *,
controller_key: bytes, controller_key: bytes,
timeout: float = 5.0, timeout: float = 5.0,
transport: Literal["tcp", "udp"] = "tcp",
udp_retry_count: int = 3,
) -> None: ) -> None:
"""``transport='udp'`` if your panel is configured for the
``Network_UDP`` connection type (some firmware versions and the
default for many installs). ``udp_retry_count`` is ignored on TCP.
"""
self._conn = OmniConnection( self._conn = OmniConnection(
host=host, host=host,
port=port, port=port,
controller_key=controller_key, controller_key=controller_key,
timeout=timeout, timeout=timeout,
transport=transport,
udp_retry_count=udp_retry_count,
) )
self._subscriber_task: asyncio.Task[None] | None = None self._subscriber_task: asyncio.Task[None] | None = None

View File

@ -26,7 +26,6 @@ import logging
from collections.abc import AsyncIterator from collections.abc import AsyncIterator
from enum import IntEnum from enum import IntEnum
from types import TracebackType from types import TracebackType
from typing import Literal
from .crypto import ( from .crypto import (
BLOCK_SIZE, BLOCK_SIZE,
@ -105,30 +104,18 @@ class OmniConnection:
port: int = _DEFAULT_PORT, port: int = _DEFAULT_PORT,
controller_key: bytes = b"", controller_key: bytes = b"",
timeout: float = 5.0, timeout: float = 5.0,
transport: Literal["tcp", "udp"] = "tcp",
udp_retry_count: int = 3,
) -> None: ) -> None:
if len(controller_key) != 16: if len(controller_key) != 16:
raise ValueError( raise ValueError(
f"controller_key must be 16 bytes, got {len(controller_key)}" f"controller_key must be 16 bytes, got {len(controller_key)}"
) )
if transport not in ("tcp", "udp"):
raise ValueError(f"transport must be 'tcp' or 'udp', got {transport!r}")
self._host = host self._host = host
self._port = port self._port = port
self._controller_key = bytes(controller_key) self._controller_key = bytes(controller_key)
self._default_timeout = timeout self._default_timeout = timeout
self._transport_kind: Literal["tcp", "udp"] = transport
self._udp_retry_count = max(0, udp_retry_count)
# TCP transport state
self._reader: asyncio.StreamReader | None = None self._reader: asyncio.StreamReader | None = None
self._writer: asyncio.StreamWriter | None = None self._writer: asyncio.StreamWriter | None = None
# UDP transport state (asyncio.DatagramTransport + our protocol)
self._udp_transport: asyncio.DatagramTransport | None = None
self._udp_protocol: _OmniDatagramProtocol | None = None
self._state = ConnectionState.DISCONNECTED self._state = ConnectionState.DISCONNECTED
self._session_id: bytes | None = None self._session_id: bytes | None = None
@ -180,41 +167,22 @@ class OmniConnection:
await self.close() await self.close()
async def connect(self) -> None: async def connect(self) -> None:
"""Open the socket and run the 4-step secure-session handshake. """Open the TCP socket and run the 4-step secure-session handshake."""
Transport is set by the ``transport=`` constructor arg. TCP opens
a stream socket; UDP opens a datagram endpoint. Either way, the
handshake (and everything else) speaks the same Packet/Message
format and crypto.
"""
if self._state is not ConnectionState.DISCONNECTED: if self._state is not ConnectionState.DISCONNECTED:
raise ConnectionError(f"already connecting/connected (state={self._state})") raise ConnectionError(f"already connecting/connected (state={self._state})")
self._state = ConnectionState.CONNECTING self._state = ConnectionState.CONNECTING
try: try:
if self._transport_kind == "tcp": self._reader, self._writer = await asyncio.wait_for(
self._reader, self._writer = await asyncio.wait_for( asyncio.open_connection(self._host, self._port),
asyncio.open_connection(self._host, self._port), timeout=self._default_timeout,
timeout=self._default_timeout, )
)
self._reader_task = asyncio.create_task(
self._read_loop(), name=f"omni-conn-reader-{self._host}"
)
else:
# UDP: connectionless. We "connect" the datagram socket to
# the panel so we can reject stray datagrams from elsewhere
# and use plain `transport.sendto(data)`.
loop = asyncio.get_running_loop()
self._udp_transport, self._udp_protocol = (
await loop.create_datagram_endpoint(
lambda: _OmniDatagramProtocol(self),
remote_addr=(self._host, self._port),
)
)
except (TimeoutError, OSError) as exc: except (TimeoutError, OSError) as exc:
self._state = ConnectionState.DISCONNECTED self._state = ConnectionState.DISCONNECTED
raise ConnectionError( raise ConnectionError(f"failed to open TCP socket: {exc}") from exc
f"failed to open {self._transport_kind.upper()} socket: {exc}"
) from exc self._reader_task = asyncio.create_task(
self._read_loop(), name=f"omni-conn-reader-{self._host}"
)
try: try:
await self._do_handshake() await self._do_handshake()
@ -227,36 +195,8 @@ class OmniConnection:
if self._closed: if self._closed:
return return
self._closed = True self._closed = True
previous_state = self._state
self._state = ConnectionState.DISCONNECTED self._state = ConnectionState.DISCONNECTED
# Politely tell the controller we're done — Omni is single-client,
# and on UDP it has no other way to know we've gone (TCP gets a
# FIN; UDP just sees datagrams stop). Without this, the panel
# holds the session slot until its idle timeout and rejects new
# connections from us with ControllerCannotStartNewSession.
if previous_state in (
ConnectionState.NEW_SESSION,
ConnectionState.SECURE,
ConnectionState.ONLINE,
):
try:
term_seq = self._claim_seq()
term = Packet(
seq=term_seq,
type=PacketType.ClientSessionTerminated,
data=b"",
)
self._write_packet(term)
# Best-effort flush so the byte hits the wire before we
# tear down the socket. UDP is fire-and-forget; TCP needs
# a tick for the writer to drain.
if self._writer is not None:
with contextlib.suppress(Exception):
await self._writer.drain()
except Exception as exc: # noqa: BLE001 - close() must be idempotent
_log.debug("close: failed to send ClientSessionTerminated: %s", exc)
# Cancel anyone still waiting for a reply. # Cancel anyone still waiting for a reply.
for fut in self._pending.values(): for fut in self._pending.values():
if not fut.done(): if not fut.done():
@ -272,12 +212,6 @@ class OmniConnection:
self._writer = None self._writer = None
self._reader = None self._reader = None
if self._udp_transport is not None:
with contextlib.suppress(OSError):
self._udp_transport.close()
self._udp_transport = None
self._udp_protocol = None
if self._reader_task is not None and not self._reader_task.done(): if self._reader_task is not None and not self._reader_task.done():
self._reader_task.cancel() self._reader_task.cancel()
with contextlib.suppress(asyncio.CancelledError, Exception): with contextlib.suppress(asyncio.CancelledError, Exception):
@ -304,39 +238,17 @@ class OmniConnection:
f"cannot send request, connection state={self._state.name}" f"cannot send request, connection state={self._state.name}"
) )
message = encode_v2(opcode, payload) message = encode_v2(opcode, payload)
per_attempt_timeout = timeout if timeout is not None else self._default_timeout seq, fut = self._send_encrypted(message)
# UDP needs explicit retries since datagram delivery is best-effort. try:
# TCP gets reliable delivery for free; we still keep retry_count for reply_packet = await asyncio.wait_for(
# API uniformity but it defaults to 0 effectively. fut, timeout if timeout is not None else self._default_timeout
max_attempts = ( )
1 + self._udp_retry_count if self._transport_kind == "udp" else 1 except TimeoutError as exc:
) self._pending.pop(seq, None)
last_exc: Exception | None = None raise RequestTimeoutError(
for attempt in range(1, max_attempts + 1): f"no reply for opcode={int(opcode)} seq={seq}"
seq, fut = self._send_encrypted(message) ) from exc
try: return self._decode_inner(reply_packet)
reply_packet = await asyncio.wait_for(fut, per_attempt_timeout)
except TimeoutError as exc:
last_exc = exc
self._pending.pop(seq, None)
if attempt < max_attempts:
_log.debug(
"udp retry %d/%d on opcode=%d seq=%d",
attempt,
max_attempts,
int(opcode),
seq,
)
continue
raise RequestTimeoutError(
f"no reply for opcode={int(opcode)} "
f"after {max_attempts} attempt(s)"
) from last_exc
return self._decode_inner(reply_packet)
# Loop exit without return only via re-raised timeout above.
raise RequestTimeoutError(
f"request loop exited without reply for opcode={int(opcode)}"
)
def unsolicited(self) -> AsyncIterator[Message]: def unsolicited(self) -> AsyncIterator[Message]:
"""Async iterator over unsolicited inbound messages (seq=0).""" """Async iterator over unsolicited inbound messages (seq=0)."""
@ -468,23 +380,17 @@ class OmniConnection:
return seq, fut return seq, fut
def _write_packet(self, pkt: Packet, *, encrypted: bool = False) -> None: def _write_packet(self, pkt: Packet, *, encrypted: bool = False) -> None:
if self._writer is None:
raise ConnectionError("transport not open")
wire = pkt.encode() wire = pkt.encode()
_log.debug( _log.debug(
"TX[%s] seq=%d type=%s len=%d encrypted=%s", "TX seq=%d type=%s len=%d encrypted=%s",
self._transport_kind,
pkt.seq, pkt.seq,
pkt.type.name, pkt.type.name,
len(pkt.data), len(pkt.data),
encrypted, encrypted,
) )
if self._transport_kind == "tcp": self._writer.write(wire)
if self._writer is None:
raise ConnectionError("transport not open")
self._writer.write(wire)
else:
if self._udp_transport is None:
raise ConnectionError("transport not open")
self._udp_transport.sendto(wire)
def _decode_inner(self, pkt: Packet) -> Message: def _decode_inner(self, pkt: Packet) -> Message:
"""Decrypt + parse the inner ``Message`` from an OmniLink2Message packet.""" """Decrypt + parse the inner ``Message`` from an OmniLink2Message packet."""
@ -690,46 +596,3 @@ class OmniConnection:
return return
if not fut.done(): if not fut.done():
fut.set_result(pkt) fut.set_result(pkt)
# --------------------------------------------------------------------------
# UDP transport
# --------------------------------------------------------------------------
class _OmniDatagramProtocol(asyncio.DatagramProtocol):
"""asyncio.DatagramProtocol bound to a single OmniConnection.
Each datagram received on a UDP Omni socket *is* one complete Packet
(no stream framing that is the whole reason UDP is useful here).
We just decode it and hand it to the connection's dispatcher.
"""
def __init__(self, conn: OmniConnection) -> None:
self._conn = conn
def connection_made(self, transport: asyncio.BaseTransport) -> None:
# transport is a DatagramTransport in this codepath.
pass
def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None:
# Each datagram is a complete Packet — no stream framing.
# The TCP _dispatch already handles handshake routing, solicited
# replies, and unsolicited push, so we just delegate.
try:
pkt = Packet.decode(data)
except Exception as exc:
_log.warning("dropping malformed UDP datagram: %s", exc)
return
try:
self._conn._dispatch(pkt)
except Exception:
_log.exception("UDP dispatch crashed for seq=%d", pkt.seq)
def error_received(self, exc: Exception) -> None:
_log.warning("UDP socket error: %s", exc)
def connection_lost(self, exc: Exception | None) -> None:
if exc is not None:
_log.warning("UDP transport lost: %s", exc)

View File

@ -333,41 +333,21 @@ class MockPanel:
@asynccontextmanager @asynccontextmanager
async def serve( async def serve(
self, self, host: str = "127.0.0.1", port: int = 0
host: str = "127.0.0.1",
port: int = 0,
transport: str = "tcp",
) -> AsyncIterator[tuple[str, int]]:
"""Start listening; yield ``(host, actual_port)``; tear down on exit.
``transport='tcp'`` (default) opens a stream server matching modern
PC Access. ``transport='udp'`` opens a datagram socket the panel
path used by some firmware versions and by the Omni-Link II
``Network_UDP`` configuration. Same wire format either way.
"""
if transport == "tcp":
async with self._serve_tcp(host, port) as bound:
yield bound
elif transport == "udp":
async with self._serve_udp(host, port) as bound:
yield bound
else:
raise ValueError(f"transport must be 'tcp' or 'udp', got {transport!r}")
@asynccontextmanager
async def _serve_tcp(
self, host: str, port: int
) -> AsyncIterator[tuple[str, int]]: ) -> AsyncIterator[tuple[str, int]]:
"""Start listening; yield ``(host, actual_port)``; tear down on exit."""
server = await asyncio.start_server(self._handle_client, host=host, port=port) server = await asyncio.start_server(self._handle_client, host=host, port=port)
sockets = server.sockets or () sockets = server.sockets or ()
if not sockets: # pragma: no cover -- start_server always populates this if not sockets: # pragma: no cover -- start_server always populates this
raise RuntimeError("asyncio.start_server returned no sockets") raise RuntimeError("asyncio.start_server returned no sockets")
bound_host, bound_port = sockets[0].getsockname()[:2] bound_host, bound_port = sockets[0].getsockname()[:2]
_log.debug("mock panel (tcp) listening on %s:%d", bound_host, bound_port) _log.debug("mock panel listening on %s:%d", bound_host, bound_port)
try: try:
async with server: async with server:
yield bound_host, bound_port yield bound_host, bound_port
finally: finally:
# Cancel any in-flight push tasks so the test event loop
# tears down cleanly.
for t in list(self._push_tasks): for t in list(self._push_tasks):
if not t.done(): if not t.done():
t.cancel() t.cancel()
@ -376,27 +356,6 @@ class MockPanel:
with contextlib.suppress(Exception): # pragma: no cover with contextlib.suppress(Exception): # pragma: no cover
await server.wait_closed() await server.wait_closed()
@asynccontextmanager
async def _serve_udp(
self, host: str, port: int
) -> AsyncIterator[tuple[str, int]]:
loop = asyncio.get_running_loop()
transport, _protocol = await loop.create_datagram_endpoint(
lambda: _MockServerDatagramProtocol(self),
local_addr=(host, port),
)
sockname = transport.get_extra_info("sockname")
bound_host, bound_port = sockname[0], sockname[1]
_log.debug("mock panel (udp) listening on %s:%d", bound_host, bound_port)
try:
yield bound_host, bound_port
finally:
for t in list(self._push_tasks):
if not t.done():
t.cancel()
self._push_tasks.clear()
transport.close()
# -------- connection handling -------- # -------- connection handling --------
async def _handle_client( async def _handle_client(
@ -1267,206 +1226,3 @@ async def _read_exact(reader: asyncio.StreamReader, n: int) -> bytes | None:
return await reader.readexactly(n) return await reader.readexactly(n)
except asyncio.IncompleteReadError: except asyncio.IncompleteReadError:
return None return None
# --------------------------------------------------------------------------
# UDP server protocol
# --------------------------------------------------------------------------
class _MockServerDatagramProtocol(asyncio.DatagramProtocol):
"""Datagram-side implementation of the mock panel.
Tracks a single active client (the most-recent peer to issue a
``ClientRequestNewSession``) and routes its session state. The same
reply-builder helpers (``_reply_system_information``,
``_build_zone_properties``, etc.) on the parent ``MockPanel`` are
reused for both transports the only thing UDP-specific here is
framing (each datagram is one complete Packet) and the absence of
per-block stream reads.
"""
def __init__(self, panel: MockPanel) -> None:
self._panel = panel
self._transport: asyncio.DatagramTransport | None = None
self._client_addr: tuple[str, int] | None = None
self._session_id: bytes | None = None
self._session_key: bytes | None = None
def connection_made(self, transport: asyncio.BaseTransport) -> None:
self._transport = transport # type: ignore[assignment]
def connection_lost(self, exc: Exception | None) -> None:
if exc is not None:
_log.warning("mock panel (udp) transport lost: %s", exc)
def error_received(self, exc: Exception) -> None:
_log.warning("mock panel (udp) socket error: %s", exc)
def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None:
# Each datagram is a complete Packet. Spawn a task so we can do
# async work (drain push-event tasks, etc.) without blocking the
# protocol callback. We track the task so the GC doesn't drop the
# reference mid-flight (RUF006).
task = asyncio.create_task(
self._handle_datagram(data, addr),
name=f"mock-panel-udp-rx-{addr[0]}-{addr[1]}",
)
self._panel._push_tasks.add(task)
task.add_done_callback(self._panel._push_tasks.discard)
async def _handle_datagram(self, data: bytes, addr: tuple[str, int]) -> None:
try:
pkt = Packet.decode(data)
except Exception as exc:
_log.warning("mock panel (udp) malformed datagram from %s: %s", addr, exc)
return
try:
await self._dispatch_packet(pkt, addr)
except Exception:
_log.exception("mock panel (udp) crashed on packet seq=%d", pkt.seq)
def _send(self, pkt: Packet, addr: tuple[str, int]) -> None:
if self._transport is None:
return
self._transport.sendto(pkt.encode(), addr)
async def _dispatch_packet(
self, pkt: Packet, addr: tuple[str, int]
) -> None:
if pkt.type is PacketType.ClientRequestNewSession:
session_id = self._panel._session_id_provider()
self._session_id = session_id
self._session_key = derive_session_key(
self._panel._controller_key, session_id
)
self._client_addr = addr
payload = bytes([_PROTO_HI, _PROTO_LO]) + session_id
self._send(
Packet(
seq=pkt.seq,
type=PacketType.ControllerAckNewSession,
data=payload,
),
addr,
)
return
if pkt.type is PacketType.ClientRequestSecureSession:
if self._session_key is None or self._session_id is None:
_log.debug("mock panel (udp) secure-session before NewSession")
return
try:
plaintext = decrypt_message_payload(
pkt.data, pkt.seq, self._session_key
)
except Exception:
_log.debug("mock panel (udp) failed to decrypt secure-session")
return
if not plaintext.startswith(self._session_id):
self._send(
Packet(
seq=pkt.seq,
type=PacketType.ControllerSessionTerminated,
data=b"",
),
addr,
)
return
ciphertext_out = encrypt_message_payload(
self._session_id, pkt.seq, self._session_key
)
self._send(
Packet(
seq=pkt.seq,
type=PacketType.ControllerAckSecureSession,
data=ciphertext_out,
),
addr,
)
self._panel._session_count += 1
return
if pkt.type is PacketType.ClientSessionTerminated:
self._session_id = None
self._session_key = None
self._client_addr = None
return
if pkt.type is PacketType.OmniLink2Message:
if self._session_key is None:
_log.debug("mock panel (udp) encrypted message before secure session")
return
await self._handle_encrypted_udp(pkt, addr)
return
_log.debug("mock panel (udp) unhandled packet type %s", pkt.type.name)
async def _handle_encrypted_udp(
self, pkt: Packet, addr: tuple[str, int]
) -> None:
assert self._session_key is not None
try:
plaintext = decrypt_message_payload(
pkt.data, pkt.seq, self._session_key
)
except Exception:
_log.debug("mock panel (udp) failed to decrypt v2 message")
return
try:
inner = Message.decode(plaintext)
except (MessageCrcError, MessageFormatError):
await self._send_reply(pkt.seq, _build_nak(0), addr)
return
opcode = inner.opcode
self._panel._last_request_opcode = opcode
# _dispatch_v2 is the same opcode-dispatch table the TCP path uses,
# returning (reply_message, push_event_words) so we can write the
# synchronous reply first then schedule an unsolicited push.
reply, push_words = self._panel._dispatch_v2(opcode, inner.payload)
await self._send_reply(pkt.seq, reply, addr)
if push_words:
self._schedule_udp_push(push_words, addr)
def _schedule_udp_push(
self, words: tuple[int, ...], addr: tuple[str, int]
) -> None:
"""Fire-and-forget unsolicited SystemEvents push to ``addr``."""
assert self._session_key is not None
session_key = self._session_key
async def _push() -> None:
await asyncio.sleep(_PUSH_DELAY)
try:
msg = _build_system_events_message(words)
plaintext = msg.encode()
ciphertext = encrypt_message_payload(plaintext, 0, session_key)
pkt = Packet(
seq=0,
type=PacketType.OmniLink2Message,
data=ciphertext,
)
self._send(pkt, addr)
except (ConnectionError, asyncio.CancelledError):
pass
except Exception: # pragma: no cover - diagnostic only
_log.exception("mock panel (udp) failed to push event")
task = asyncio.create_task(_push(), name="mock-panel-udp-event-push")
self._panel._push_tasks.add(task)
task.add_done_callback(self._panel._push_tasks.discard)
async def _send_reply(
self, client_seq: int, message: Message, addr: tuple[str, int]
) -> None:
assert self._session_key is not None
plaintext = message.encode()
ciphertext = encrypt_message_payload(plaintext, client_seq, self._session_key)
pkt = Packet(
seq=client_seq,
type=PacketType.OmniLink2Message,
data=ciphertext,
)
self._send(pkt, addr)

View File

@ -1,152 +0,0 @@
"""End-to-end: OmniClient ↔ MockPanel over UDP.
Mirrors test_e2e_client_mock.py but with ``transport='udp'`` on both
sides. The protocol/encryption/handshake bytes are identical to TCP;
this proves only the transport layer change is sound.
"""
from __future__ import annotations
import asyncio
import secrets
import pytest
from omni_pca.client import ObjectType, OmniClient
from omni_pca.commands import CommandFailedError
from omni_pca.connection import ConnectionState, HandshakeError, OmniConnection
from omni_pca.events import UnitStateChanged
from omni_pca.mock_panel import (
MockAreaState,
MockButtonState,
MockPanel,
MockState,
MockThermostatState,
MockUnitState,
MockZoneState,
)
from omni_pca.models import (
AreaStatus,
SecurityMode,
)
from omni_pca.opcodes import OmniLink2MessageType
CONTROLLER_KEY = bytes.fromhex("6ba7b4e9b4656de3cd7edd4c650cdb09")
def _populated_state() -> MockState:
return MockState(
zones={1: MockZoneState(name="FRONT_DOOR")},
units={1: MockUnitState(name="LIVING_LAMP")},
areas={1: MockAreaState(name="MAIN")},
thermostats={1: MockThermostatState(name="LIVING")},
buttons={1: MockButtonState(name="GOOD_MORNING")},
user_codes={1: 1234},
)
async def test_udp_handshake_roundtrip() -> None:
panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state())
async with (
panel.serve(transport="udp") as (host, port),
OmniConnection(
host=host,
port=port,
controller_key=CONTROLLER_KEY,
transport="udp",
timeout=2.0,
) as conn,
):
assert conn.state is ConnectionState.ONLINE
assert panel.session_count == 1
async def test_udp_get_system_information() -> None:
panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state())
async with (
panel.serve(transport="udp") as (host, port),
OmniConnection(
host=host,
port=port,
controller_key=CONTROLLER_KEY,
transport="udp",
timeout=2.0,
) as conn,
):
reply = await conn.request(OmniLink2MessageType.RequestSystemInformation)
assert reply.opcode == int(OmniLink2MessageType.SystemInformation)
# First payload byte is the model byte.
assert reply.payload[0] == 16 # OMNI_PRO_II
async def test_udp_arm_area_with_correct_code() -> None:
panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state())
async with (
panel.serve(transport="udp") as (host, port),
OmniClient(
host=host,
port=port,
controller_key=CONTROLLER_KEY,
transport="udp",
timeout=2.0,
) as client,
):
await client.execute_security_command(
area=1, mode=SecurityMode.AWAY, code=1234,
)
statuses = await client.get_object_status(ObjectType.AREA, 1)
assert len(statuses) == 1
area = statuses[0]
assert isinstance(area, AreaStatus)
assert area.mode == int(SecurityMode.AWAY)
async def test_udp_arm_with_wrong_code_raises() -> None:
panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state())
async with panel.serve(transport="udp") as (host, port):
with pytest.raises(CommandFailedError):
async with OmniClient(
host=host,
port=port,
controller_key=CONTROLLER_KEY,
transport="udp",
timeout=2.0,
) as client:
await client.execute_security_command(
area=1, mode=SecurityMode.AWAY, code=9999,
)
async def test_udp_unit_on_pushes_state_changed_event() -> None:
panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state())
async with (
panel.serve(transport="udp") as (host, port),
OmniClient(
host=host,
port=port,
controller_key=CONTROLLER_KEY,
transport="udp",
timeout=2.0,
) as client,
):
events = client.events()
await client.turn_unit_on(1)
ev = await asyncio.wait_for(events.__anext__(), timeout=1.0)
assert isinstance(ev, UnitStateChanged)
assert ev.unit_index == 1
assert ev.is_on is True
async def test_udp_wrong_key_fails_handshake() -> None:
panel = MockPanel(controller_key=CONTROLLER_KEY)
wrong_key = secrets.token_bytes(16)
async with panel.serve(transport="udp") as (host, port):
with pytest.raises(HandshakeError):
async with OmniConnection(
host=host,
port=port,
controller_key=wrong_key,
transport="udp",
timeout=2.0,
):
pass