Compare commits

...

4 Commits

Author SHA1 Message Date
d91561a6d2 OmniConnection.close: send ClientSessionTerminated to free panel slot
Found via live testing against the user's Omni Pro II (firmware 2.12)
on UDP. Without this, the panel holds the session slot (it's
single-client by design) and rejects new sessions from us with
ControllerCannotStartNewSession (packet type 0x07) until its idle
timeout fires (60s+ in our testing).

src/omni_pca/connection.py:
  close() now sends Packet(type=ClientSessionTerminated) before
  tearing down the socket, but only if we got past CONNECTING (no
  point sending termination if we never had a session). For TCP we
  drain the writer briefly so the byte hits the wire before FIN.
  For UDP the sendto is fire-and-forget. Wrapped in try/except so
  close() stays idempotent.

Live-validation findings (real Omni Pro II, firmware 2.12):
  ✓ UDP handshake works end-to-end. The four-packet exchange
    (NewSession ack / SecureSession ack) round-trips cleanly,
    confirming the two non-public protocol quirks (session key
    XOR mix + per-block whitening) are correctly implemented.
  ✗ Post-handshake encrypted messages get no reply from this
    firmware (tried v1 OmniLinkMessage and v2 OmniLink2Message;
    both arrive at the panel — verified via tcpdump — and the
    panel sends nothing back). Suspected: firmware 2.12 either
    requires an explicit Login first or has a different opcode
    set than PC Access 3.17 documents. Needs more RE work.

The handshake-quirks validation is the headline win — we've now
proven that part of the protocol against real hardware, which no
public Omni-Link client has done. Post-handshake message dispatch
is the next investigation.

357 tests still pass.
2026-05-10 21:24:09 -06:00
81725b4dbf HA config_flow: transport dropdown (TCP/UDP) for the new UDP path
custom_components/omni_pca/const.py:
  + CONF_TRANSPORT, TRANSPORT_TCP, TRANSPORT_UDP, DEFAULT_TRANSPORT='tcp'

custom_components/omni_pca/config_flow.py:
  + 'transport' field in _USER_SCHEMA with vol.In([tcp, udp]),
    default tcp (so existing flows are unchanged)
  + transport stored in entry.data on create
  + reauth carries the existing transport over from entry.data
  + _probe() takes transport=, propagates to OmniClient

custom_components/omni_pca/coordinator.py:
  + transport= constructor arg, defaults to 'tcp'
  + _ensure_connected passes transport= through to OmniClient

custom_components/omni_pca/__init__.py:
  + reads transport from entry.data (default tcp), passes to coordinator

Backward-compat: existing config entries without a transport key fall
through to 'tcp', identical to current behavior. New entries get the
choice at the config-flow form. The reauth step preserves the existing
transport so users don't have to re-pick it.

357 tests pass; ruff clean across src/ tests/ custom_components/.
HA integration tests don't need updating because they don't pass
transport= explicitly (default tcp matches the mock's default).
2026-05-10 21:15:56 -06:00
7f82dbbbfa UDP transport: parallel codepath in OmniConnection + MockPanel
The C# decompile shows enuOmniLinkConnectionType has both Network_TCP=4
and Network_UDP=3 (clsOmniLinkConnection.cs uses udpSend/tcpSend
parallel paths), and clsHAC carries an enuPreferredNetworkProtocol
{TCP, UDP} per-installation byte. User reports their panel is
configured for UDP. The TCP-only assumption was too narrow.

Wire format is identical: same Packet/Message framing, same handshake,
same per-block whitening, same opcodes, same port. Only differences:
* UDP is connectionless; each datagram = one Packet (no stream framing)
* UDP needs explicit retry-on-timeout for reliability

src/omni_pca/connection.py:
- New constructor args: transport: Literal['tcp','udp']='tcp',
  udp_retry_count: int = 3
- connect()/close() branch on transport — TCP keeps the existing
  asyncio.open_connection + StreamReader/Writer + reader_task path;
  UDP uses asyncio.get_running_loop().create_datagram_endpoint with
  remote_addr= so transport.sendto(data) works without per-datagram
  addrs. The reader_task is TCP-only.
- _write_packet branches between writer.write and udp_transport.sendto
- request() loops up to (1 + udp_retry_count) attempts on UDP, retrying
  on RequestTimeoutError; TCP gets a single attempt (existing behavior)
- New _OmniDatagramProtocol that decodes each datagram into a Packet
  and delegates to the shared _dispatch (which already knows how to
  route handshake / solicited / unsolicited)

src/omni_pca/mock_panel.py:
- serve(transport='tcp'|'udp') public arg; defaults preserve existing
  TCP behavior. Internally splits into _serve_tcp / _serve_udp.
- New _MockServerDatagramProtocol that mirrors _handle_client for UDP.
  Tracks one active client by addr (single-session, matches Omni's
  single-client constraint). Reuses the panel's existing _dispatch_v2,
  _reply_*, _build_* helpers — the dispatch logic is unchanged, only
  the transport framing differs.
- New _schedule_udp_push for synthesized SystemEvents (seq=0) push
  to the active client's addr after state mutations.

src/omni_pca/client.py:
- OmniClient gains transport= and udp_retry_count= kwargs that pass
  through to OmniConnection. Default is 'tcp' so existing callers
  are unaffected.

tests/test_e2e_udp.py — 6 e2e tests:
- handshake roundtrip
- get_system_information
- arm area with right code
- arm with wrong code -> CommandFailedError
- turn unit on -> push UnitStateChanged event
- wrong ControllerKey -> HandshakeError

All run under 0.2s. Combined with the existing TCP suite: 357 tests
pass (was 351), ruff clean across src/ tests/.

The HA integration's config_flow still defaults to TCP; users on UDP
panels can manually set transport= via the OmniClient init path. A
follow-up commit will add transport to the HA config flow as a
dropdown option.
2026-05-10 20:42:43 -06:00
5f6404a7e0 README: Gitea install URLs + docs site links + accurate quick starts
- Move install instructions from PyPI-only to Gitea-release-first
  (pip from git+https or direct wheel URL); note PyPI as 'pending'
- Add Project home + Documentation links at top
- Fix quick-start API name (get_system_info -> get_system_information,
  matching the actual library)
- Replace HA quick-start with the manual-clone path that works today
  (HACS support pending PyPI publish)
- Cross-link tutorials (first-script, dev-stack) and how-tos
  (install-in-home-assistant) from hai-omni-pro-ii.warehack.ing
- Add Tests section showing how to run the suite
- Add License + JOURNEY link in acknowledgements
2026-05-10 17:53:56 -06:00
9 changed files with 672 additions and 48 deletions

View File

@ -4,6 +4,9 @@ Async Python client for HAI/Leviton Omni-Link II home automation panels — Omni
Includes a Home Assistant custom component (`custom_components/omni_pca/`).
**Project home:** <https://git.supported.systems/warehack.ing/omni-pca>
**Documentation:** <https://hai-omni-pro-ii.warehack.ing/>
## Status
**Alpha.** Built from a full reverse-engineering of HAI's PC Access 3.17 (the Windows installer/programmer app). The protocol layer captures two non-public quirks that public Omni-Link clients miss:
@ -11,14 +14,27 @@ Includes a Home Assistant custom component (`custom_components/omni_pca/`).
1. **Session key is not the ControllerKey.** Last 5 bytes are XORed with a controller-supplied SessionID nonce.
2. **Per-block XOR pre-whitening before AES.** First two bytes of every 16-byte block are XORed with the packet's sequence number.
See [`docs/PROTOCOL.md`](docs/PROTOCOL.md) for the full byte-level spec.
The full byte-level protocol spec lives at <https://hai-omni-pro-ii.warehack.ing/reference/protocol/>.
## Quick start (library)
## Install
The library isn't on PyPI yet (pending), so install directly from the Gitea release:
```bash
uv add omni-pca
# Pinned to a specific release (recommended)
pip install "omni-pca @ git+https://git.supported.systems/warehack.ing/omni-pca.git@v2026.5.10"
# Or the wheel from the release page
pip install https://git.supported.systems/warehack.ing/omni-pca/releases/download/v2026.5.10/omni_pca-2026.5.10-py3-none-any.whl
# Or with uv
uv add "omni-pca @ git+https://git.supported.systems/warehack.ing/omni-pca.git@v2026.5.10"
```
Once published to PyPI, the canonical install will be `pip install omni-pca`.
## Quick start (library)
```python
import asyncio
from omni_pca import OmniClient
@ -29,44 +45,71 @@ async def main():
port=4369,
controller_key=bytes.fromhex("6ba7b4e9b4656de3cd7edd4c650cdb09"),
) as panel:
info = await panel.get_system_info()
info = await panel.get_system_information()
print(info.model_name, info.firmware_version)
asyncio.run(main())
```
For the panel walkthrough — connect, list zones, react to push events — see the [tutorial](https://hai-omni-pro-ii.warehack.ing/tutorials/first-script/).
## Quick start (Home Assistant)
Copy `custom_components/omni_pca/` into your HA `config/custom_components/`, restart HA, then add the integration via Settings → Devices & Services. You'll need:
```bash
# Manual install — works on every HA flavour
cd /path/to/your/homeassistant/config/
mkdir -p custom_components
cd custom_components
git clone https://git.supported.systems/warehack.ing/omni-pca tmp-omni
cp -r tmp-omni/custom_components/omni_pca .
rm -rf tmp-omni
```
Restart HA, then add the integration via **Settings → Devices & Services**. You'll need:
- Panel IP / hostname
- TCP port (default 4369)
- ControllerKey as 32 hex chars
Get the ControllerKey from your `.pca` file using the included parser:
Get the ControllerKey from your `.pca` file using the bundled CLI:
```bash
uvx --from omni-pca omni-pca decode-pca path/to/Your.pca --field controller_key
omni-pca decode-pca '/path/to/Your.pca' --field controller_key
```
The integration creates one HA device per panel plus typed entities for every named object on the controller: `alarm_control_panel` for areas, `light` for units, `binary_sensor`/`switch` for zones (state + bypass), `climate` for thermostats, `sensor` for analog zones and panel telemetry, `button` for panel macros, and `event` for the typed push-notification stream. See [`custom_components/omni_pca/README.md`](custom_components/omni_pca/README.md) for the entity table and service list.
The integration creates one HA device per panel plus typed entities for every named object on the controller: `alarm_control_panel` for areas, `light` for units, `binary_sensor` + `switch` for zones (state + bypass), `climate` for thermostats, `sensor` for analog zones and panel telemetry, `button` for panel macros, and `event` for the typed push-notification stream. See [`custom_components/omni_pca/README.md`](custom_components/omni_pca/README.md) for the full entity + service catalog, or the [HA install how-to](https://hai-omni-pro-ii.warehack.ing/how-to/install-in-home-assistant/) for the step-by-step.
## Without a panel — mock controller
For testing, the library ships a minimal Omni controller emulator:
The library ships a stateful `MockPanel` that emulates the controller side of the protocol over real TCP. Useful for offline development, integration tests, and demos:
```python
from omni_pca.mock_panel import MockPanel
async with MockPanel(controller_key=...).serve(port=14369):
# connect a real OmniClient to localhost:14369 — works end-to-end
# Connect a real OmniClient to localhost:14369 — full handshake + AES
...
```
The local dev stack (`dev/docker-compose.yml`) packages a real Home Assistant container and the mock panel side-by-side so you can click through the integration without touching real hardware. See [the dev-stack tutorial](https://hai-omni-pro-ii.warehack.ing/tutorials/dev-stack/).
## Tests
```bash
uv sync --group ha
uv run pytest -q
```
351 tests across the protocol primitives, the mock panel, the OmniClient ↔ MockPanel end-to-end roundtrip, and an in-process Home Assistant harness driving the integration via the real config flow + service calls.
## Versioning
Date-based ([CalVer](https://calver.org/)): `YYYY.M.D`. Bumped on backwards-incompatible changes.
Date-based ([CalVer](https://calver.org/)): `YYYY.M.D`. Bumped on backwards-incompatible changes. See [`CHANGELOG.md`](CHANGELOG.md).
## License
MIT. See [`LICENSE`](LICENSE).
## Acknowledgments
This client is independent and not affiliated with Leviton or HAI. Protocol details derived from clean-room analysis of the publicly-distributed PC Access installer.
This client is independent and not affiliated with Leviton or HAI. Protocol details derived from clean-room analysis of the publicly-distributed PC Access installer. The reverse-engineering arc is documented at <https://hai-omni-pro-ii.warehack.ing/journey/>.

View File

@ -14,7 +14,13 @@ from typing import TYPE_CHECKING
from homeassistant.const import CONF_HOST, CONF_PORT, Platform
from homeassistant.exceptions import ConfigEntryNotReady
from .const import CONF_CONTROLLER_KEY, DOMAIN, LOGGER
from .const import (
CONF_CONTROLLER_KEY,
CONF_TRANSPORT,
DEFAULT_TRANSPORT,
DOMAIN,
LOGGER,
)
from .coordinator import OmniDataUpdateCoordinator
from .services import async_setup_services, async_unload_services
@ -50,12 +56,14 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
LOGGER.error("stored controller key for %s is corrupt: %s", entry.title, err)
return False
transport: str = entry.data.get(CONF_TRANSPORT, DEFAULT_TRANSPORT)
coordinator = OmniDataUpdateCoordinator(
hass,
entry,
host=host,
port=port,
controller_key=controller_key,
transport=transport,
)
try:

View File

@ -20,10 +20,14 @@ from omni_pca.connection import (
from .const import (
CONF_CONTROLLER_KEY,
CONF_TRANSPORT,
CONTROLLER_KEY_HEX_LEN,
DEFAULT_PORT,
DEFAULT_TRANSPORT,
DOMAIN,
LOGGER,
TRANSPORT_TCP,
TRANSPORT_UDP,
)
@ -60,6 +64,12 @@ _USER_SCHEMA = vol.Schema(
vol.Coerce(int), vol.Range(min=1, max=65535)
),
vol.Required(CONF_CONTROLLER_KEY): str,
# Most modern firmware uses TCP; some installers configure
# Network_UDP. PC Access stores the choice as
# enuPreferredNetworkProtocol in the .pca config.
vol.Required(CONF_TRANSPORT, default=DEFAULT_TRANSPORT): vol.In(
[TRANSPORT_TCP, TRANSPORT_UDP]
),
}
)
@ -79,6 +89,7 @@ class OmniConfigFlow(ConfigFlow, domain=DOMAIN):
if user_input is not None:
host: str = user_input[CONF_HOST].strip()
port: int = user_input[CONF_PORT]
transport: str = user_input.get(CONF_TRANSPORT, DEFAULT_TRANSPORT)
unique_id = f"{host}:{port}"
await self.async_set_unique_id(unique_id)
@ -90,7 +101,7 @@ class OmniConfigFlow(ConfigFlow, domain=DOMAIN):
LOGGER.debug("controller key rejected: %s", err)
errors[CONF_CONTROLLER_KEY] = "invalid_key"
else:
title, error = await self._probe(host, port, key)
title, error = await self._probe(host, port, key, transport)
if error is not None:
errors["base"] = error
else:
@ -100,6 +111,7 @@ class OmniConfigFlow(ConfigFlow, domain=DOMAIN):
CONF_HOST: host,
CONF_PORT: port,
CONF_CONTROLLER_KEY: key.hex(),
CONF_TRANSPORT: transport,
},
)
@ -121,6 +133,9 @@ class OmniConfigFlow(ConfigFlow, domain=DOMAIN):
assert self._reauth_entry_data is not None
host: str = self._reauth_entry_data[CONF_HOST]
port: int = self._reauth_entry_data[CONF_PORT]
transport: str = self._reauth_entry_data.get(
CONF_TRANSPORT, DEFAULT_TRANSPORT
)
errors: dict[str, str] = {}
if user_input is not None:
@ -129,7 +144,7 @@ class OmniConfigFlow(ConfigFlow, domain=DOMAIN):
except InvalidControllerKey:
errors[CONF_CONTROLLER_KEY] = "invalid_key"
else:
_, error = await self._probe(host, port, key)
_, error = await self._probe(host, port, key, transport)
if error is not None:
errors["base"] = error
else:
@ -147,11 +162,20 @@ class OmniConfigFlow(ConfigFlow, domain=DOMAIN):
# ---- helpers ---------------------------------------------------------
async def _probe(
self, host: str, port: int, key: bytes
self,
host: str,
port: int,
key: bytes,
transport: str = DEFAULT_TRANSPORT,
) -> tuple[str | None, str | None]:
"""Try to connect once. Returns (title, error_code)."""
try:
async with OmniClient(host, port=port, controller_key=key) as client:
async with OmniClient(
host,
port=port,
controller_key=key,
transport=transport, # type: ignore[arg-type]
) as client:
info = await client.get_system_information()
except (HandshakeError, InvalidEncryptionKeyError):
return None, "invalid_auth"

View File

@ -12,6 +12,11 @@ DEFAULT_PORT: Final = 4369
DEFAULT_TIMEOUT: Final = 5.0
CONF_CONTROLLER_KEY: Final = "controller_key"
CONF_TRANSPORT: Final = "transport"
TRANSPORT_TCP: Final = "tcp"
TRANSPORT_UDP: Final = "udp"
DEFAULT_TRANSPORT: Final = TRANSPORT_TCP
MANUFACTURER: Final = "HAI / Leviton"

View File

@ -137,6 +137,7 @@ class OmniDataUpdateCoordinator(DataUpdateCoordinator[OmniData]):
host: str,
port: int,
controller_key: bytes,
transport: str = "tcp",
) -> None:
super().__init__(
hass,
@ -148,6 +149,7 @@ class OmniDataUpdateCoordinator(DataUpdateCoordinator[OmniData]):
self._host = host
self._port = port
self._controller_key = controller_key
self._transport = transport
self._client: OmniClient | None = None
self._discovery_done = False
self._discovered: OmniData | None = None
@ -236,6 +238,7 @@ class OmniDataUpdateCoordinator(DataUpdateCoordinator[OmniData]):
self._host,
port=self._port,
controller_key=self._controller_key,
transport=self._transport, # type: ignore[arg-type]
)
# Drive __aenter__ manually so the client survives across update
# cycles; we close it explicitly on shutdown / failure.

View File

@ -20,7 +20,7 @@ import struct
from collections.abc import AsyncIterator, Awaitable, Callable, Sequence
from enum import IntEnum
from types import TracebackType
from typing import TYPE_CHECKING, Self
from typing import TYPE_CHECKING, Literal, Self
from .commands import Command, CommandFailedError, SecurityCommandResponse
@ -120,12 +120,20 @@ class OmniClient:
*,
controller_key: bytes,
timeout: float = 5.0,
transport: Literal["tcp", "udp"] = "tcp",
udp_retry_count: int = 3,
) -> None:
"""``transport='udp'`` if your panel is configured for the
``Network_UDP`` connection type (some firmware versions and the
default for many installs). ``udp_retry_count`` is ignored on TCP.
"""
self._conn = OmniConnection(
host=host,
port=port,
controller_key=controller_key,
timeout=timeout,
transport=transport,
udp_retry_count=udp_retry_count,
)
self._subscriber_task: asyncio.Task[None] | None = None

View File

@ -26,6 +26,7 @@ import logging
from collections.abc import AsyncIterator
from enum import IntEnum
from types import TracebackType
from typing import Literal
from .crypto import (
BLOCK_SIZE,
@ -104,18 +105,30 @@ class OmniConnection:
port: int = _DEFAULT_PORT,
controller_key: bytes = b"",
timeout: float = 5.0,
transport: Literal["tcp", "udp"] = "tcp",
udp_retry_count: int = 3,
) -> None:
if len(controller_key) != 16:
raise ValueError(
f"controller_key must be 16 bytes, got {len(controller_key)}"
)
if transport not in ("tcp", "udp"):
raise ValueError(f"transport must be 'tcp' or 'udp', got {transport!r}")
self._host = host
self._port = port
self._controller_key = bytes(controller_key)
self._default_timeout = timeout
self._transport_kind: Literal["tcp", "udp"] = transport
self._udp_retry_count = max(0, udp_retry_count)
# TCP transport state
self._reader: asyncio.StreamReader | None = None
self._writer: asyncio.StreamWriter | None = None
# UDP transport state (asyncio.DatagramTransport + our protocol)
self._udp_transport: asyncio.DatagramTransport | None = None
self._udp_protocol: _OmniDatagramProtocol | None = None
self._state = ConnectionState.DISCONNECTED
self._session_id: bytes | None = None
@ -167,22 +180,41 @@ class OmniConnection:
await self.close()
async def connect(self) -> None:
"""Open the TCP socket and run the 4-step secure-session handshake."""
"""Open the socket and run the 4-step secure-session handshake.
Transport is set by the ``transport=`` constructor arg. TCP opens
a stream socket; UDP opens a datagram endpoint. Either way, the
handshake (and everything else) speaks the same Packet/Message
format and crypto.
"""
if self._state is not ConnectionState.DISCONNECTED:
raise ConnectionError(f"already connecting/connected (state={self._state})")
self._state = ConnectionState.CONNECTING
try:
if self._transport_kind == "tcp":
self._reader, self._writer = await asyncio.wait_for(
asyncio.open_connection(self._host, self._port),
timeout=self._default_timeout,
)
except (TimeoutError, OSError) as exc:
self._state = ConnectionState.DISCONNECTED
raise ConnectionError(f"failed to open TCP socket: {exc}") from exc
self._reader_task = asyncio.create_task(
self._read_loop(), name=f"omni-conn-reader-{self._host}"
)
else:
# UDP: connectionless. We "connect" the datagram socket to
# the panel so we can reject stray datagrams from elsewhere
# and use plain `transport.sendto(data)`.
loop = asyncio.get_running_loop()
self._udp_transport, self._udp_protocol = (
await loop.create_datagram_endpoint(
lambda: _OmniDatagramProtocol(self),
remote_addr=(self._host, self._port),
)
)
except (TimeoutError, OSError) as exc:
self._state = ConnectionState.DISCONNECTED
raise ConnectionError(
f"failed to open {self._transport_kind.upper()} socket: {exc}"
) from exc
try:
await self._do_handshake()
@ -195,8 +227,36 @@ class OmniConnection:
if self._closed:
return
self._closed = True
previous_state = self._state
self._state = ConnectionState.DISCONNECTED
# Politely tell the controller we're done — Omni is single-client,
# and on UDP it has no other way to know we've gone (TCP gets a
# FIN; UDP just sees datagrams stop). Without this, the panel
# holds the session slot until its idle timeout and rejects new
# connections from us with ControllerCannotStartNewSession.
if previous_state in (
ConnectionState.NEW_SESSION,
ConnectionState.SECURE,
ConnectionState.ONLINE,
):
try:
term_seq = self._claim_seq()
term = Packet(
seq=term_seq,
type=PacketType.ClientSessionTerminated,
data=b"",
)
self._write_packet(term)
# Best-effort flush so the byte hits the wire before we
# tear down the socket. UDP is fire-and-forget; TCP needs
# a tick for the writer to drain.
if self._writer is not None:
with contextlib.suppress(Exception):
await self._writer.drain()
except Exception as exc: # noqa: BLE001 - close() must be idempotent
_log.debug("close: failed to send ClientSessionTerminated: %s", exc)
# Cancel anyone still waiting for a reply.
for fut in self._pending.values():
if not fut.done():
@ -212,6 +272,12 @@ class OmniConnection:
self._writer = None
self._reader = None
if self._udp_transport is not None:
with contextlib.suppress(OSError):
self._udp_transport.close()
self._udp_transport = None
self._udp_protocol = None
if self._reader_task is not None and not self._reader_task.done():
self._reader_task.cancel()
with contextlib.suppress(asyncio.CancelledError, Exception):
@ -238,17 +304,39 @@ class OmniConnection:
f"cannot send request, connection state={self._state.name}"
)
message = encode_v2(opcode, payload)
per_attempt_timeout = timeout if timeout is not None else self._default_timeout
# UDP needs explicit retries since datagram delivery is best-effort.
# TCP gets reliable delivery for free; we still keep retry_count for
# API uniformity but it defaults to 0 effectively.
max_attempts = (
1 + self._udp_retry_count if self._transport_kind == "udp" else 1
)
last_exc: Exception | None = None
for attempt in range(1, max_attempts + 1):
seq, fut = self._send_encrypted(message)
try:
reply_packet = await asyncio.wait_for(
fut, timeout if timeout is not None else self._default_timeout
)
reply_packet = await asyncio.wait_for(fut, per_attempt_timeout)
except TimeoutError as exc:
last_exc = exc
self._pending.pop(seq, None)
if attempt < max_attempts:
_log.debug(
"udp retry %d/%d on opcode=%d seq=%d",
attempt,
max_attempts,
int(opcode),
seq,
)
continue
raise RequestTimeoutError(
f"no reply for opcode={int(opcode)} seq={seq}"
) from exc
f"no reply for opcode={int(opcode)} "
f"after {max_attempts} attempt(s)"
) from last_exc
return self._decode_inner(reply_packet)
# Loop exit without return only via re-raised timeout above.
raise RequestTimeoutError(
f"request loop exited without reply for opcode={int(opcode)}"
)
def unsolicited(self) -> AsyncIterator[Message]:
"""Async iterator over unsolicited inbound messages (seq=0)."""
@ -380,17 +468,23 @@ class OmniConnection:
return seq, fut
def _write_packet(self, pkt: Packet, *, encrypted: bool = False) -> None:
if self._writer is None:
raise ConnectionError("transport not open")
wire = pkt.encode()
_log.debug(
"TX seq=%d type=%s len=%d encrypted=%s",
"TX[%s] seq=%d type=%s len=%d encrypted=%s",
self._transport_kind,
pkt.seq,
pkt.type.name,
len(pkt.data),
encrypted,
)
if self._transport_kind == "tcp":
if self._writer is None:
raise ConnectionError("transport not open")
self._writer.write(wire)
else:
if self._udp_transport is None:
raise ConnectionError("transport not open")
self._udp_transport.sendto(wire)
def _decode_inner(self, pkt: Packet) -> Message:
"""Decrypt + parse the inner ``Message`` from an OmniLink2Message packet."""
@ -596,3 +690,46 @@ class OmniConnection:
return
if not fut.done():
fut.set_result(pkt)
# --------------------------------------------------------------------------
# UDP transport
# --------------------------------------------------------------------------
class _OmniDatagramProtocol(asyncio.DatagramProtocol):
"""asyncio.DatagramProtocol bound to a single OmniConnection.
Each datagram received on a UDP Omni socket *is* one complete Packet
(no stream framing that is the whole reason UDP is useful here).
We just decode it and hand it to the connection's dispatcher.
"""
def __init__(self, conn: OmniConnection) -> None:
self._conn = conn
def connection_made(self, transport: asyncio.BaseTransport) -> None:
# transport is a DatagramTransport in this codepath.
pass
def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None:
# Each datagram is a complete Packet — no stream framing.
# The TCP _dispatch already handles handshake routing, solicited
# replies, and unsolicited push, so we just delegate.
try:
pkt = Packet.decode(data)
except Exception as exc:
_log.warning("dropping malformed UDP datagram: %s", exc)
return
try:
self._conn._dispatch(pkt)
except Exception:
_log.exception("UDP dispatch crashed for seq=%d", pkt.seq)
def error_received(self, exc: Exception) -> None:
_log.warning("UDP socket error: %s", exc)
def connection_lost(self, exc: Exception | None) -> None:
if exc is not None:
_log.warning("UDP transport lost: %s", exc)

View File

@ -333,21 +333,41 @@ class MockPanel:
@asynccontextmanager
async def serve(
self, host: str = "127.0.0.1", port: int = 0
self,
host: str = "127.0.0.1",
port: int = 0,
transport: str = "tcp",
) -> AsyncIterator[tuple[str, int]]:
"""Start listening; yield ``(host, actual_port)``; tear down on exit.
``transport='tcp'`` (default) opens a stream server matching modern
PC Access. ``transport='udp'`` opens a datagram socket the panel
path used by some firmware versions and by the Omni-Link II
``Network_UDP`` configuration. Same wire format either way.
"""
if transport == "tcp":
async with self._serve_tcp(host, port) as bound:
yield bound
elif transport == "udp":
async with self._serve_udp(host, port) as bound:
yield bound
else:
raise ValueError(f"transport must be 'tcp' or 'udp', got {transport!r}")
@asynccontextmanager
async def _serve_tcp(
self, host: str, port: int
) -> AsyncIterator[tuple[str, int]]:
"""Start listening; yield ``(host, actual_port)``; tear down on exit."""
server = await asyncio.start_server(self._handle_client, host=host, port=port)
sockets = server.sockets or ()
if not sockets: # pragma: no cover -- start_server always populates this
raise RuntimeError("asyncio.start_server returned no sockets")
bound_host, bound_port = sockets[0].getsockname()[:2]
_log.debug("mock panel listening on %s:%d", bound_host, bound_port)
_log.debug("mock panel (tcp) listening on %s:%d", bound_host, bound_port)
try:
async with server:
yield bound_host, bound_port
finally:
# Cancel any in-flight push tasks so the test event loop
# tears down cleanly.
for t in list(self._push_tasks):
if not t.done():
t.cancel()
@ -356,6 +376,27 @@ class MockPanel:
with contextlib.suppress(Exception): # pragma: no cover
await server.wait_closed()
@asynccontextmanager
async def _serve_udp(
self, host: str, port: int
) -> AsyncIterator[tuple[str, int]]:
loop = asyncio.get_running_loop()
transport, _protocol = await loop.create_datagram_endpoint(
lambda: _MockServerDatagramProtocol(self),
local_addr=(host, port),
)
sockname = transport.get_extra_info("sockname")
bound_host, bound_port = sockname[0], sockname[1]
_log.debug("mock panel (udp) listening on %s:%d", bound_host, bound_port)
try:
yield bound_host, bound_port
finally:
for t in list(self._push_tasks):
if not t.done():
t.cancel()
self._push_tasks.clear()
transport.close()
# -------- connection handling --------
async def _handle_client(
@ -1226,3 +1267,206 @@ async def _read_exact(reader: asyncio.StreamReader, n: int) -> bytes | None:
return await reader.readexactly(n)
except asyncio.IncompleteReadError:
return None
# --------------------------------------------------------------------------
# UDP server protocol
# --------------------------------------------------------------------------
class _MockServerDatagramProtocol(asyncio.DatagramProtocol):
"""Datagram-side implementation of the mock panel.
Tracks a single active client (the most-recent peer to issue a
``ClientRequestNewSession``) and routes its session state. The same
reply-builder helpers (``_reply_system_information``,
``_build_zone_properties``, etc.) on the parent ``MockPanel`` are
reused for both transports the only thing UDP-specific here is
framing (each datagram is one complete Packet) and the absence of
per-block stream reads.
"""
def __init__(self, panel: MockPanel) -> None:
self._panel = panel
self._transport: asyncio.DatagramTransport | None = None
self._client_addr: tuple[str, int] | None = None
self._session_id: bytes | None = None
self._session_key: bytes | None = None
def connection_made(self, transport: asyncio.BaseTransport) -> None:
self._transport = transport # type: ignore[assignment]
def connection_lost(self, exc: Exception | None) -> None:
if exc is not None:
_log.warning("mock panel (udp) transport lost: %s", exc)
def error_received(self, exc: Exception) -> None:
_log.warning("mock panel (udp) socket error: %s", exc)
def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None:
# Each datagram is a complete Packet. Spawn a task so we can do
# async work (drain push-event tasks, etc.) without blocking the
# protocol callback. We track the task so the GC doesn't drop the
# reference mid-flight (RUF006).
task = asyncio.create_task(
self._handle_datagram(data, addr),
name=f"mock-panel-udp-rx-{addr[0]}-{addr[1]}",
)
self._panel._push_tasks.add(task)
task.add_done_callback(self._panel._push_tasks.discard)
async def _handle_datagram(self, data: bytes, addr: tuple[str, int]) -> None:
try:
pkt = Packet.decode(data)
except Exception as exc:
_log.warning("mock panel (udp) malformed datagram from %s: %s", addr, exc)
return
try:
await self._dispatch_packet(pkt, addr)
except Exception:
_log.exception("mock panel (udp) crashed on packet seq=%d", pkt.seq)
def _send(self, pkt: Packet, addr: tuple[str, int]) -> None:
if self._transport is None:
return
self._transport.sendto(pkt.encode(), addr)
async def _dispatch_packet(
self, pkt: Packet, addr: tuple[str, int]
) -> None:
if pkt.type is PacketType.ClientRequestNewSession:
session_id = self._panel._session_id_provider()
self._session_id = session_id
self._session_key = derive_session_key(
self._panel._controller_key, session_id
)
self._client_addr = addr
payload = bytes([_PROTO_HI, _PROTO_LO]) + session_id
self._send(
Packet(
seq=pkt.seq,
type=PacketType.ControllerAckNewSession,
data=payload,
),
addr,
)
return
if pkt.type is PacketType.ClientRequestSecureSession:
if self._session_key is None or self._session_id is None:
_log.debug("mock panel (udp) secure-session before NewSession")
return
try:
plaintext = decrypt_message_payload(
pkt.data, pkt.seq, self._session_key
)
except Exception:
_log.debug("mock panel (udp) failed to decrypt secure-session")
return
if not plaintext.startswith(self._session_id):
self._send(
Packet(
seq=pkt.seq,
type=PacketType.ControllerSessionTerminated,
data=b"",
),
addr,
)
return
ciphertext_out = encrypt_message_payload(
self._session_id, pkt.seq, self._session_key
)
self._send(
Packet(
seq=pkt.seq,
type=PacketType.ControllerAckSecureSession,
data=ciphertext_out,
),
addr,
)
self._panel._session_count += 1
return
if pkt.type is PacketType.ClientSessionTerminated:
self._session_id = None
self._session_key = None
self._client_addr = None
return
if pkt.type is PacketType.OmniLink2Message:
if self._session_key is None:
_log.debug("mock panel (udp) encrypted message before secure session")
return
await self._handle_encrypted_udp(pkt, addr)
return
_log.debug("mock panel (udp) unhandled packet type %s", pkt.type.name)
async def _handle_encrypted_udp(
self, pkt: Packet, addr: tuple[str, int]
) -> None:
assert self._session_key is not None
try:
plaintext = decrypt_message_payload(
pkt.data, pkt.seq, self._session_key
)
except Exception:
_log.debug("mock panel (udp) failed to decrypt v2 message")
return
try:
inner = Message.decode(plaintext)
except (MessageCrcError, MessageFormatError):
await self._send_reply(pkt.seq, _build_nak(0), addr)
return
opcode = inner.opcode
self._panel._last_request_opcode = opcode
# _dispatch_v2 is the same opcode-dispatch table the TCP path uses,
# returning (reply_message, push_event_words) so we can write the
# synchronous reply first then schedule an unsolicited push.
reply, push_words = self._panel._dispatch_v2(opcode, inner.payload)
await self._send_reply(pkt.seq, reply, addr)
if push_words:
self._schedule_udp_push(push_words, addr)
def _schedule_udp_push(
self, words: tuple[int, ...], addr: tuple[str, int]
) -> None:
"""Fire-and-forget unsolicited SystemEvents push to ``addr``."""
assert self._session_key is not None
session_key = self._session_key
async def _push() -> None:
await asyncio.sleep(_PUSH_DELAY)
try:
msg = _build_system_events_message(words)
plaintext = msg.encode()
ciphertext = encrypt_message_payload(plaintext, 0, session_key)
pkt = Packet(
seq=0,
type=PacketType.OmniLink2Message,
data=ciphertext,
)
self._send(pkt, addr)
except (ConnectionError, asyncio.CancelledError):
pass
except Exception: # pragma: no cover - diagnostic only
_log.exception("mock panel (udp) failed to push event")
task = asyncio.create_task(_push(), name="mock-panel-udp-event-push")
self._panel._push_tasks.add(task)
task.add_done_callback(self._panel._push_tasks.discard)
async def _send_reply(
self, client_seq: int, message: Message, addr: tuple[str, int]
) -> None:
assert self._session_key is not None
plaintext = message.encode()
ciphertext = encrypt_message_payload(plaintext, client_seq, self._session_key)
pkt = Packet(
seq=client_seq,
type=PacketType.OmniLink2Message,
data=ciphertext,
)
self._send(pkt, addr)

152
tests/test_e2e_udp.py Normal file
View File

@ -0,0 +1,152 @@
"""End-to-end: OmniClient ↔ MockPanel over UDP.
Mirrors test_e2e_client_mock.py but with ``transport='udp'`` on both
sides. The protocol/encryption/handshake bytes are identical to TCP;
this proves only the transport layer change is sound.
"""
from __future__ import annotations
import asyncio
import secrets
import pytest
from omni_pca.client import ObjectType, OmniClient
from omni_pca.commands import CommandFailedError
from omni_pca.connection import ConnectionState, HandshakeError, OmniConnection
from omni_pca.events import UnitStateChanged
from omni_pca.mock_panel import (
MockAreaState,
MockButtonState,
MockPanel,
MockState,
MockThermostatState,
MockUnitState,
MockZoneState,
)
from omni_pca.models import (
AreaStatus,
SecurityMode,
)
from omni_pca.opcodes import OmniLink2MessageType
CONTROLLER_KEY = bytes.fromhex("6ba7b4e9b4656de3cd7edd4c650cdb09")
def _populated_state() -> MockState:
return MockState(
zones={1: MockZoneState(name="FRONT_DOOR")},
units={1: MockUnitState(name="LIVING_LAMP")},
areas={1: MockAreaState(name="MAIN")},
thermostats={1: MockThermostatState(name="LIVING")},
buttons={1: MockButtonState(name="GOOD_MORNING")},
user_codes={1: 1234},
)
async def test_udp_handshake_roundtrip() -> None:
panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state())
async with (
panel.serve(transport="udp") as (host, port),
OmniConnection(
host=host,
port=port,
controller_key=CONTROLLER_KEY,
transport="udp",
timeout=2.0,
) as conn,
):
assert conn.state is ConnectionState.ONLINE
assert panel.session_count == 1
async def test_udp_get_system_information() -> None:
panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state())
async with (
panel.serve(transport="udp") as (host, port),
OmniConnection(
host=host,
port=port,
controller_key=CONTROLLER_KEY,
transport="udp",
timeout=2.0,
) as conn,
):
reply = await conn.request(OmniLink2MessageType.RequestSystemInformation)
assert reply.opcode == int(OmniLink2MessageType.SystemInformation)
# First payload byte is the model byte.
assert reply.payload[0] == 16 # OMNI_PRO_II
async def test_udp_arm_area_with_correct_code() -> None:
panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state())
async with (
panel.serve(transport="udp") as (host, port),
OmniClient(
host=host,
port=port,
controller_key=CONTROLLER_KEY,
transport="udp",
timeout=2.0,
) as client,
):
await client.execute_security_command(
area=1, mode=SecurityMode.AWAY, code=1234,
)
statuses = await client.get_object_status(ObjectType.AREA, 1)
assert len(statuses) == 1
area = statuses[0]
assert isinstance(area, AreaStatus)
assert area.mode == int(SecurityMode.AWAY)
async def test_udp_arm_with_wrong_code_raises() -> None:
panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state())
async with panel.serve(transport="udp") as (host, port):
with pytest.raises(CommandFailedError):
async with OmniClient(
host=host,
port=port,
controller_key=CONTROLLER_KEY,
transport="udp",
timeout=2.0,
) as client:
await client.execute_security_command(
area=1, mode=SecurityMode.AWAY, code=9999,
)
async def test_udp_unit_on_pushes_state_changed_event() -> None:
panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state())
async with (
panel.serve(transport="udp") as (host, port),
OmniClient(
host=host,
port=port,
controller_key=CONTROLLER_KEY,
transport="udp",
timeout=2.0,
) as client,
):
events = client.events()
await client.turn_unit_on(1)
ev = await asyncio.wait_for(events.__anext__(), timeout=1.0)
assert isinstance(ev, UnitStateChanged)
assert ev.unit_index == 1
assert ev.is_on is True
async def test_udp_wrong_key_fails_handshake() -> None:
panel = MockPanel(controller_key=CONTROLLER_KEY)
wrong_key = secrets.token_bytes(16)
async with panel.serve(transport="udp") as (host, port):
with pytest.raises(HandshakeError):
async with OmniConnection(
host=host,
port=port,
controller_key=wrong_key,
transport="udp",
timeout=2.0,
):
pass