Compare commits
4 Commits
v2026.5.10
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| d91561a6d2 | |||
| 81725b4dbf | |||
| 7f82dbbbfa | |||
| 5f6404a7e0 |
67
README.md
67
README.md
@ -4,6 +4,9 @@ Async Python client for HAI/Leviton Omni-Link II home automation panels — Omni
|
|||||||
|
|
||||||
Includes a Home Assistant custom component (`custom_components/omni_pca/`).
|
Includes a Home Assistant custom component (`custom_components/omni_pca/`).
|
||||||
|
|
||||||
|
**Project home:** <https://git.supported.systems/warehack.ing/omni-pca>
|
||||||
|
**Documentation:** <https://hai-omni-pro-ii.warehack.ing/>
|
||||||
|
|
||||||
## Status
|
## Status
|
||||||
|
|
||||||
**Alpha.** Built from a full reverse-engineering of HAI's PC Access 3.17 (the Windows installer/programmer app). The protocol layer captures two non-public quirks that public Omni-Link clients miss:
|
**Alpha.** Built from a full reverse-engineering of HAI's PC Access 3.17 (the Windows installer/programmer app). The protocol layer captures two non-public quirks that public Omni-Link clients miss:
|
||||||
@ -11,14 +14,27 @@ Includes a Home Assistant custom component (`custom_components/omni_pca/`).
|
|||||||
1. **Session key is not the ControllerKey.** Last 5 bytes are XORed with a controller-supplied SessionID nonce.
|
1. **Session key is not the ControllerKey.** Last 5 bytes are XORed with a controller-supplied SessionID nonce.
|
||||||
2. **Per-block XOR pre-whitening before AES.** First two bytes of every 16-byte block are XORed with the packet's sequence number.
|
2. **Per-block XOR pre-whitening before AES.** First two bytes of every 16-byte block are XORed with the packet's sequence number.
|
||||||
|
|
||||||
See [`docs/PROTOCOL.md`](docs/PROTOCOL.md) for the full byte-level spec.
|
The full byte-level protocol spec lives at <https://hai-omni-pro-ii.warehack.ing/reference/protocol/>.
|
||||||
|
|
||||||
## Quick start (library)
|
## Install
|
||||||
|
|
||||||
|
The library isn't on PyPI yet (pending), so install directly from the Gitea release:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
uv add omni-pca
|
# Pinned to a specific release (recommended)
|
||||||
|
pip install "omni-pca @ git+https://git.supported.systems/warehack.ing/omni-pca.git@v2026.5.10"
|
||||||
|
|
||||||
|
# Or the wheel from the release page
|
||||||
|
pip install https://git.supported.systems/warehack.ing/omni-pca/releases/download/v2026.5.10/omni_pca-2026.5.10-py3-none-any.whl
|
||||||
|
|
||||||
|
# Or with uv
|
||||||
|
uv add "omni-pca @ git+https://git.supported.systems/warehack.ing/omni-pca.git@v2026.5.10"
|
||||||
```
|
```
|
||||||
|
|
||||||
|
Once published to PyPI, the canonical install will be `pip install omni-pca`.
|
||||||
|
|
||||||
|
## Quick start (library)
|
||||||
|
|
||||||
```python
|
```python
|
||||||
import asyncio
|
import asyncio
|
||||||
from omni_pca import OmniClient
|
from omni_pca import OmniClient
|
||||||
@ -29,44 +45,71 @@ async def main():
|
|||||||
port=4369,
|
port=4369,
|
||||||
controller_key=bytes.fromhex("6ba7b4e9b4656de3cd7edd4c650cdb09"),
|
controller_key=bytes.fromhex("6ba7b4e9b4656de3cd7edd4c650cdb09"),
|
||||||
) as panel:
|
) as panel:
|
||||||
info = await panel.get_system_info()
|
info = await panel.get_system_information()
|
||||||
print(info.model_name, info.firmware_version)
|
print(info.model_name, info.firmware_version)
|
||||||
|
|
||||||
asyncio.run(main())
|
asyncio.run(main())
|
||||||
```
|
```
|
||||||
|
|
||||||
|
For the panel walkthrough — connect, list zones, react to push events — see the [tutorial](https://hai-omni-pro-ii.warehack.ing/tutorials/first-script/).
|
||||||
|
|
||||||
## Quick start (Home Assistant)
|
## Quick start (Home Assistant)
|
||||||
|
|
||||||
Copy `custom_components/omni_pca/` into your HA `config/custom_components/`, restart HA, then add the integration via Settings → Devices & Services. You'll need:
|
```bash
|
||||||
|
# Manual install — works on every HA flavour
|
||||||
|
cd /path/to/your/homeassistant/config/
|
||||||
|
mkdir -p custom_components
|
||||||
|
cd custom_components
|
||||||
|
git clone https://git.supported.systems/warehack.ing/omni-pca tmp-omni
|
||||||
|
cp -r tmp-omni/custom_components/omni_pca .
|
||||||
|
rm -rf tmp-omni
|
||||||
|
```
|
||||||
|
|
||||||
|
Restart HA, then add the integration via **Settings → Devices & Services**. You'll need:
|
||||||
|
|
||||||
- Panel IP / hostname
|
- Panel IP / hostname
|
||||||
- TCP port (default 4369)
|
- TCP port (default 4369)
|
||||||
- ControllerKey as 32 hex chars
|
- ControllerKey as 32 hex chars
|
||||||
|
|
||||||
Get the ControllerKey from your `.pca` file using the included parser:
|
Get the ControllerKey from your `.pca` file using the bundled CLI:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
uvx --from omni-pca omni-pca decode-pca path/to/Your.pca --field controller_key
|
omni-pca decode-pca '/path/to/Your.pca' --field controller_key
|
||||||
```
|
```
|
||||||
|
|
||||||
The integration creates one HA device per panel plus typed entities for every named object on the controller: `alarm_control_panel` for areas, `light` for units, `binary_sensor`/`switch` for zones (state + bypass), `climate` for thermostats, `sensor` for analog zones and panel telemetry, `button` for panel macros, and `event` for the typed push-notification stream. See [`custom_components/omni_pca/README.md`](custom_components/omni_pca/README.md) for the entity table and service list.
|
The integration creates one HA device per panel plus typed entities for every named object on the controller: `alarm_control_panel` for areas, `light` for units, `binary_sensor` + `switch` for zones (state + bypass), `climate` for thermostats, `sensor` for analog zones and panel telemetry, `button` for panel macros, and `event` for the typed push-notification stream. See [`custom_components/omni_pca/README.md`](custom_components/omni_pca/README.md) for the full entity + service catalog, or the [HA install how-to](https://hai-omni-pro-ii.warehack.ing/how-to/install-in-home-assistant/) for the step-by-step.
|
||||||
|
|
||||||
## Without a panel — mock controller
|
## Without a panel — mock controller
|
||||||
|
|
||||||
For testing, the library ships a minimal Omni controller emulator:
|
The library ships a stateful `MockPanel` that emulates the controller side of the protocol over real TCP. Useful for offline development, integration tests, and demos:
|
||||||
|
|
||||||
```python
|
```python
|
||||||
from omni_pca.mock_panel import MockPanel
|
from omni_pca.mock_panel import MockPanel
|
||||||
|
|
||||||
async with MockPanel(controller_key=...).serve(port=14369):
|
async with MockPanel(controller_key=...).serve(port=14369):
|
||||||
# connect a real OmniClient to localhost:14369 — works end-to-end
|
# Connect a real OmniClient to localhost:14369 — full handshake + AES
|
||||||
...
|
...
|
||||||
```
|
```
|
||||||
|
|
||||||
|
The local dev stack (`dev/docker-compose.yml`) packages a real Home Assistant container and the mock panel side-by-side so you can click through the integration without touching real hardware. See [the dev-stack tutorial](https://hai-omni-pro-ii.warehack.ing/tutorials/dev-stack/).
|
||||||
|
|
||||||
|
## Tests
|
||||||
|
|
||||||
|
```bash
|
||||||
|
uv sync --group ha
|
||||||
|
uv run pytest -q
|
||||||
|
```
|
||||||
|
|
||||||
|
351 tests across the protocol primitives, the mock panel, the OmniClient ↔ MockPanel end-to-end roundtrip, and an in-process Home Assistant harness driving the integration via the real config flow + service calls.
|
||||||
|
|
||||||
## Versioning
|
## Versioning
|
||||||
|
|
||||||
Date-based ([CalVer](https://calver.org/)): `YYYY.M.D`. Bumped on backwards-incompatible changes.
|
Date-based ([CalVer](https://calver.org/)): `YYYY.M.D`. Bumped on backwards-incompatible changes. See [`CHANGELOG.md`](CHANGELOG.md).
|
||||||
|
|
||||||
|
## License
|
||||||
|
|
||||||
|
MIT. See [`LICENSE`](LICENSE).
|
||||||
|
|
||||||
## Acknowledgments
|
## Acknowledgments
|
||||||
|
|
||||||
This client is independent and not affiliated with Leviton or HAI. Protocol details derived from clean-room analysis of the publicly-distributed PC Access installer.
|
This client is independent and not affiliated with Leviton or HAI. Protocol details derived from clean-room analysis of the publicly-distributed PC Access installer. The reverse-engineering arc is documented at <https://hai-omni-pro-ii.warehack.ing/journey/>.
|
||||||
|
|||||||
@ -14,7 +14,13 @@ from typing import TYPE_CHECKING
|
|||||||
from homeassistant.const import CONF_HOST, CONF_PORT, Platform
|
from homeassistant.const import CONF_HOST, CONF_PORT, Platform
|
||||||
from homeassistant.exceptions import ConfigEntryNotReady
|
from homeassistant.exceptions import ConfigEntryNotReady
|
||||||
|
|
||||||
from .const import CONF_CONTROLLER_KEY, DOMAIN, LOGGER
|
from .const import (
|
||||||
|
CONF_CONTROLLER_KEY,
|
||||||
|
CONF_TRANSPORT,
|
||||||
|
DEFAULT_TRANSPORT,
|
||||||
|
DOMAIN,
|
||||||
|
LOGGER,
|
||||||
|
)
|
||||||
from .coordinator import OmniDataUpdateCoordinator
|
from .coordinator import OmniDataUpdateCoordinator
|
||||||
from .services import async_setup_services, async_unload_services
|
from .services import async_setup_services, async_unload_services
|
||||||
|
|
||||||
@ -50,12 +56,14 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
|||||||
LOGGER.error("stored controller key for %s is corrupt: %s", entry.title, err)
|
LOGGER.error("stored controller key for %s is corrupt: %s", entry.title, err)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
transport: str = entry.data.get(CONF_TRANSPORT, DEFAULT_TRANSPORT)
|
||||||
coordinator = OmniDataUpdateCoordinator(
|
coordinator = OmniDataUpdateCoordinator(
|
||||||
hass,
|
hass,
|
||||||
entry,
|
entry,
|
||||||
host=host,
|
host=host,
|
||||||
port=port,
|
port=port,
|
||||||
controller_key=controller_key,
|
controller_key=controller_key,
|
||||||
|
transport=transport,
|
||||||
)
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
|||||||
@ -20,10 +20,14 @@ from omni_pca.connection import (
|
|||||||
|
|
||||||
from .const import (
|
from .const import (
|
||||||
CONF_CONTROLLER_KEY,
|
CONF_CONTROLLER_KEY,
|
||||||
|
CONF_TRANSPORT,
|
||||||
CONTROLLER_KEY_HEX_LEN,
|
CONTROLLER_KEY_HEX_LEN,
|
||||||
DEFAULT_PORT,
|
DEFAULT_PORT,
|
||||||
|
DEFAULT_TRANSPORT,
|
||||||
DOMAIN,
|
DOMAIN,
|
||||||
LOGGER,
|
LOGGER,
|
||||||
|
TRANSPORT_TCP,
|
||||||
|
TRANSPORT_UDP,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@ -60,6 +64,12 @@ _USER_SCHEMA = vol.Schema(
|
|||||||
vol.Coerce(int), vol.Range(min=1, max=65535)
|
vol.Coerce(int), vol.Range(min=1, max=65535)
|
||||||
),
|
),
|
||||||
vol.Required(CONF_CONTROLLER_KEY): str,
|
vol.Required(CONF_CONTROLLER_KEY): str,
|
||||||
|
# Most modern firmware uses TCP; some installers configure
|
||||||
|
# Network_UDP. PC Access stores the choice as
|
||||||
|
# enuPreferredNetworkProtocol in the .pca config.
|
||||||
|
vol.Required(CONF_TRANSPORT, default=DEFAULT_TRANSPORT): vol.In(
|
||||||
|
[TRANSPORT_TCP, TRANSPORT_UDP]
|
||||||
|
),
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -79,6 +89,7 @@ class OmniConfigFlow(ConfigFlow, domain=DOMAIN):
|
|||||||
if user_input is not None:
|
if user_input is not None:
|
||||||
host: str = user_input[CONF_HOST].strip()
|
host: str = user_input[CONF_HOST].strip()
|
||||||
port: int = user_input[CONF_PORT]
|
port: int = user_input[CONF_PORT]
|
||||||
|
transport: str = user_input.get(CONF_TRANSPORT, DEFAULT_TRANSPORT)
|
||||||
unique_id = f"{host}:{port}"
|
unique_id = f"{host}:{port}"
|
||||||
|
|
||||||
await self.async_set_unique_id(unique_id)
|
await self.async_set_unique_id(unique_id)
|
||||||
@ -90,7 +101,7 @@ class OmniConfigFlow(ConfigFlow, domain=DOMAIN):
|
|||||||
LOGGER.debug("controller key rejected: %s", err)
|
LOGGER.debug("controller key rejected: %s", err)
|
||||||
errors[CONF_CONTROLLER_KEY] = "invalid_key"
|
errors[CONF_CONTROLLER_KEY] = "invalid_key"
|
||||||
else:
|
else:
|
||||||
title, error = await self._probe(host, port, key)
|
title, error = await self._probe(host, port, key, transport)
|
||||||
if error is not None:
|
if error is not None:
|
||||||
errors["base"] = error
|
errors["base"] = error
|
||||||
else:
|
else:
|
||||||
@ -100,6 +111,7 @@ class OmniConfigFlow(ConfigFlow, domain=DOMAIN):
|
|||||||
CONF_HOST: host,
|
CONF_HOST: host,
|
||||||
CONF_PORT: port,
|
CONF_PORT: port,
|
||||||
CONF_CONTROLLER_KEY: key.hex(),
|
CONF_CONTROLLER_KEY: key.hex(),
|
||||||
|
CONF_TRANSPORT: transport,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -121,6 +133,9 @@ class OmniConfigFlow(ConfigFlow, domain=DOMAIN):
|
|||||||
assert self._reauth_entry_data is not None
|
assert self._reauth_entry_data is not None
|
||||||
host: str = self._reauth_entry_data[CONF_HOST]
|
host: str = self._reauth_entry_data[CONF_HOST]
|
||||||
port: int = self._reauth_entry_data[CONF_PORT]
|
port: int = self._reauth_entry_data[CONF_PORT]
|
||||||
|
transport: str = self._reauth_entry_data.get(
|
||||||
|
CONF_TRANSPORT, DEFAULT_TRANSPORT
|
||||||
|
)
|
||||||
errors: dict[str, str] = {}
|
errors: dict[str, str] = {}
|
||||||
|
|
||||||
if user_input is not None:
|
if user_input is not None:
|
||||||
@ -129,7 +144,7 @@ class OmniConfigFlow(ConfigFlow, domain=DOMAIN):
|
|||||||
except InvalidControllerKey:
|
except InvalidControllerKey:
|
||||||
errors[CONF_CONTROLLER_KEY] = "invalid_key"
|
errors[CONF_CONTROLLER_KEY] = "invalid_key"
|
||||||
else:
|
else:
|
||||||
_, error = await self._probe(host, port, key)
|
_, error = await self._probe(host, port, key, transport)
|
||||||
if error is not None:
|
if error is not None:
|
||||||
errors["base"] = error
|
errors["base"] = error
|
||||||
else:
|
else:
|
||||||
@ -147,11 +162,20 @@ class OmniConfigFlow(ConfigFlow, domain=DOMAIN):
|
|||||||
# ---- helpers ---------------------------------------------------------
|
# ---- helpers ---------------------------------------------------------
|
||||||
|
|
||||||
async def _probe(
|
async def _probe(
|
||||||
self, host: str, port: int, key: bytes
|
self,
|
||||||
|
host: str,
|
||||||
|
port: int,
|
||||||
|
key: bytes,
|
||||||
|
transport: str = DEFAULT_TRANSPORT,
|
||||||
) -> tuple[str | None, str | None]:
|
) -> tuple[str | None, str | None]:
|
||||||
"""Try to connect once. Returns (title, error_code)."""
|
"""Try to connect once. Returns (title, error_code)."""
|
||||||
try:
|
try:
|
||||||
async with OmniClient(host, port=port, controller_key=key) as client:
|
async with OmniClient(
|
||||||
|
host,
|
||||||
|
port=port,
|
||||||
|
controller_key=key,
|
||||||
|
transport=transport, # type: ignore[arg-type]
|
||||||
|
) as client:
|
||||||
info = await client.get_system_information()
|
info = await client.get_system_information()
|
||||||
except (HandshakeError, InvalidEncryptionKeyError):
|
except (HandshakeError, InvalidEncryptionKeyError):
|
||||||
return None, "invalid_auth"
|
return None, "invalid_auth"
|
||||||
|
|||||||
@ -12,6 +12,11 @@ DEFAULT_PORT: Final = 4369
|
|||||||
DEFAULT_TIMEOUT: Final = 5.0
|
DEFAULT_TIMEOUT: Final = 5.0
|
||||||
|
|
||||||
CONF_CONTROLLER_KEY: Final = "controller_key"
|
CONF_CONTROLLER_KEY: Final = "controller_key"
|
||||||
|
CONF_TRANSPORT: Final = "transport"
|
||||||
|
|
||||||
|
TRANSPORT_TCP: Final = "tcp"
|
||||||
|
TRANSPORT_UDP: Final = "udp"
|
||||||
|
DEFAULT_TRANSPORT: Final = TRANSPORT_TCP
|
||||||
|
|
||||||
MANUFACTURER: Final = "HAI / Leviton"
|
MANUFACTURER: Final = "HAI / Leviton"
|
||||||
|
|
||||||
|
|||||||
@ -137,6 +137,7 @@ class OmniDataUpdateCoordinator(DataUpdateCoordinator[OmniData]):
|
|||||||
host: str,
|
host: str,
|
||||||
port: int,
|
port: int,
|
||||||
controller_key: bytes,
|
controller_key: bytes,
|
||||||
|
transport: str = "tcp",
|
||||||
) -> None:
|
) -> None:
|
||||||
super().__init__(
|
super().__init__(
|
||||||
hass,
|
hass,
|
||||||
@ -148,6 +149,7 @@ class OmniDataUpdateCoordinator(DataUpdateCoordinator[OmniData]):
|
|||||||
self._host = host
|
self._host = host
|
||||||
self._port = port
|
self._port = port
|
||||||
self._controller_key = controller_key
|
self._controller_key = controller_key
|
||||||
|
self._transport = transport
|
||||||
self._client: OmniClient | None = None
|
self._client: OmniClient | None = None
|
||||||
self._discovery_done = False
|
self._discovery_done = False
|
||||||
self._discovered: OmniData | None = None
|
self._discovered: OmniData | None = None
|
||||||
@ -236,6 +238,7 @@ class OmniDataUpdateCoordinator(DataUpdateCoordinator[OmniData]):
|
|||||||
self._host,
|
self._host,
|
||||||
port=self._port,
|
port=self._port,
|
||||||
controller_key=self._controller_key,
|
controller_key=self._controller_key,
|
||||||
|
transport=self._transport, # type: ignore[arg-type]
|
||||||
)
|
)
|
||||||
# Drive __aenter__ manually so the client survives across update
|
# Drive __aenter__ manually so the client survives across update
|
||||||
# cycles; we close it explicitly on shutdown / failure.
|
# cycles; we close it explicitly on shutdown / failure.
|
||||||
|
|||||||
@ -20,7 +20,7 @@ import struct
|
|||||||
from collections.abc import AsyncIterator, Awaitable, Callable, Sequence
|
from collections.abc import AsyncIterator, Awaitable, Callable, Sequence
|
||||||
from enum import IntEnum
|
from enum import IntEnum
|
||||||
from types import TracebackType
|
from types import TracebackType
|
||||||
from typing import TYPE_CHECKING, Self
|
from typing import TYPE_CHECKING, Literal, Self
|
||||||
|
|
||||||
from .commands import Command, CommandFailedError, SecurityCommandResponse
|
from .commands import Command, CommandFailedError, SecurityCommandResponse
|
||||||
|
|
||||||
@ -120,12 +120,20 @@ class OmniClient:
|
|||||||
*,
|
*,
|
||||||
controller_key: bytes,
|
controller_key: bytes,
|
||||||
timeout: float = 5.0,
|
timeout: float = 5.0,
|
||||||
|
transport: Literal["tcp", "udp"] = "tcp",
|
||||||
|
udp_retry_count: int = 3,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
"""``transport='udp'`` if your panel is configured for the
|
||||||
|
``Network_UDP`` connection type (some firmware versions and the
|
||||||
|
default for many installs). ``udp_retry_count`` is ignored on TCP.
|
||||||
|
"""
|
||||||
self._conn = OmniConnection(
|
self._conn = OmniConnection(
|
||||||
host=host,
|
host=host,
|
||||||
port=port,
|
port=port,
|
||||||
controller_key=controller_key,
|
controller_key=controller_key,
|
||||||
timeout=timeout,
|
timeout=timeout,
|
||||||
|
transport=transport,
|
||||||
|
udp_retry_count=udp_retry_count,
|
||||||
)
|
)
|
||||||
self._subscriber_task: asyncio.Task[None] | None = None
|
self._subscriber_task: asyncio.Task[None] | None = None
|
||||||
|
|
||||||
|
|||||||
@ -26,6 +26,7 @@ import logging
|
|||||||
from collections.abc import AsyncIterator
|
from collections.abc import AsyncIterator
|
||||||
from enum import IntEnum
|
from enum import IntEnum
|
||||||
from types import TracebackType
|
from types import TracebackType
|
||||||
|
from typing import Literal
|
||||||
|
|
||||||
from .crypto import (
|
from .crypto import (
|
||||||
BLOCK_SIZE,
|
BLOCK_SIZE,
|
||||||
@ -104,18 +105,30 @@ class OmniConnection:
|
|||||||
port: int = _DEFAULT_PORT,
|
port: int = _DEFAULT_PORT,
|
||||||
controller_key: bytes = b"",
|
controller_key: bytes = b"",
|
||||||
timeout: float = 5.0,
|
timeout: float = 5.0,
|
||||||
|
transport: Literal["tcp", "udp"] = "tcp",
|
||||||
|
udp_retry_count: int = 3,
|
||||||
) -> None:
|
) -> None:
|
||||||
if len(controller_key) != 16:
|
if len(controller_key) != 16:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
f"controller_key must be 16 bytes, got {len(controller_key)}"
|
f"controller_key must be 16 bytes, got {len(controller_key)}"
|
||||||
)
|
)
|
||||||
|
if transport not in ("tcp", "udp"):
|
||||||
|
raise ValueError(f"transport must be 'tcp' or 'udp', got {transport!r}")
|
||||||
self._host = host
|
self._host = host
|
||||||
self._port = port
|
self._port = port
|
||||||
self._controller_key = bytes(controller_key)
|
self._controller_key = bytes(controller_key)
|
||||||
self._default_timeout = timeout
|
self._default_timeout = timeout
|
||||||
|
self._transport_kind: Literal["tcp", "udp"] = transport
|
||||||
|
self._udp_retry_count = max(0, udp_retry_count)
|
||||||
|
|
||||||
|
# TCP transport state
|
||||||
self._reader: asyncio.StreamReader | None = None
|
self._reader: asyncio.StreamReader | None = None
|
||||||
self._writer: asyncio.StreamWriter | None = None
|
self._writer: asyncio.StreamWriter | None = None
|
||||||
|
|
||||||
|
# UDP transport state (asyncio.DatagramTransport + our protocol)
|
||||||
|
self._udp_transport: asyncio.DatagramTransport | None = None
|
||||||
|
self._udp_protocol: _OmniDatagramProtocol | None = None
|
||||||
|
|
||||||
self._state = ConnectionState.DISCONNECTED
|
self._state = ConnectionState.DISCONNECTED
|
||||||
|
|
||||||
self._session_id: bytes | None = None
|
self._session_id: bytes | None = None
|
||||||
@ -167,22 +180,41 @@ class OmniConnection:
|
|||||||
await self.close()
|
await self.close()
|
||||||
|
|
||||||
async def connect(self) -> None:
|
async def connect(self) -> None:
|
||||||
"""Open the TCP socket and run the 4-step secure-session handshake."""
|
"""Open the socket and run the 4-step secure-session handshake.
|
||||||
|
|
||||||
|
Transport is set by the ``transport=`` constructor arg. TCP opens
|
||||||
|
a stream socket; UDP opens a datagram endpoint. Either way, the
|
||||||
|
handshake (and everything else) speaks the same Packet/Message
|
||||||
|
format and crypto.
|
||||||
|
"""
|
||||||
if self._state is not ConnectionState.DISCONNECTED:
|
if self._state is not ConnectionState.DISCONNECTED:
|
||||||
raise ConnectionError(f"already connecting/connected (state={self._state})")
|
raise ConnectionError(f"already connecting/connected (state={self._state})")
|
||||||
self._state = ConnectionState.CONNECTING
|
self._state = ConnectionState.CONNECTING
|
||||||
try:
|
try:
|
||||||
self._reader, self._writer = await asyncio.wait_for(
|
if self._transport_kind == "tcp":
|
||||||
asyncio.open_connection(self._host, self._port),
|
self._reader, self._writer = await asyncio.wait_for(
|
||||||
timeout=self._default_timeout,
|
asyncio.open_connection(self._host, self._port),
|
||||||
)
|
timeout=self._default_timeout,
|
||||||
|
)
|
||||||
|
self._reader_task = asyncio.create_task(
|
||||||
|
self._read_loop(), name=f"omni-conn-reader-{self._host}"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# UDP: connectionless. We "connect" the datagram socket to
|
||||||
|
# the panel so we can reject stray datagrams from elsewhere
|
||||||
|
# and use plain `transport.sendto(data)`.
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
|
self._udp_transport, self._udp_protocol = (
|
||||||
|
await loop.create_datagram_endpoint(
|
||||||
|
lambda: _OmniDatagramProtocol(self),
|
||||||
|
remote_addr=(self._host, self._port),
|
||||||
|
)
|
||||||
|
)
|
||||||
except (TimeoutError, OSError) as exc:
|
except (TimeoutError, OSError) as exc:
|
||||||
self._state = ConnectionState.DISCONNECTED
|
self._state = ConnectionState.DISCONNECTED
|
||||||
raise ConnectionError(f"failed to open TCP socket: {exc}") from exc
|
raise ConnectionError(
|
||||||
|
f"failed to open {self._transport_kind.upper()} socket: {exc}"
|
||||||
self._reader_task = asyncio.create_task(
|
) from exc
|
||||||
self._read_loop(), name=f"omni-conn-reader-{self._host}"
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await self._do_handshake()
|
await self._do_handshake()
|
||||||
@ -195,8 +227,36 @@ class OmniConnection:
|
|||||||
if self._closed:
|
if self._closed:
|
||||||
return
|
return
|
||||||
self._closed = True
|
self._closed = True
|
||||||
|
previous_state = self._state
|
||||||
self._state = ConnectionState.DISCONNECTED
|
self._state = ConnectionState.DISCONNECTED
|
||||||
|
|
||||||
|
# Politely tell the controller we're done — Omni is single-client,
|
||||||
|
# and on UDP it has no other way to know we've gone (TCP gets a
|
||||||
|
# FIN; UDP just sees datagrams stop). Without this, the panel
|
||||||
|
# holds the session slot until its idle timeout and rejects new
|
||||||
|
# connections from us with ControllerCannotStartNewSession.
|
||||||
|
if previous_state in (
|
||||||
|
ConnectionState.NEW_SESSION,
|
||||||
|
ConnectionState.SECURE,
|
||||||
|
ConnectionState.ONLINE,
|
||||||
|
):
|
||||||
|
try:
|
||||||
|
term_seq = self._claim_seq()
|
||||||
|
term = Packet(
|
||||||
|
seq=term_seq,
|
||||||
|
type=PacketType.ClientSessionTerminated,
|
||||||
|
data=b"",
|
||||||
|
)
|
||||||
|
self._write_packet(term)
|
||||||
|
# Best-effort flush so the byte hits the wire before we
|
||||||
|
# tear down the socket. UDP is fire-and-forget; TCP needs
|
||||||
|
# a tick for the writer to drain.
|
||||||
|
if self._writer is not None:
|
||||||
|
with contextlib.suppress(Exception):
|
||||||
|
await self._writer.drain()
|
||||||
|
except Exception as exc: # noqa: BLE001 - close() must be idempotent
|
||||||
|
_log.debug("close: failed to send ClientSessionTerminated: %s", exc)
|
||||||
|
|
||||||
# Cancel anyone still waiting for a reply.
|
# Cancel anyone still waiting for a reply.
|
||||||
for fut in self._pending.values():
|
for fut in self._pending.values():
|
||||||
if not fut.done():
|
if not fut.done():
|
||||||
@ -212,6 +272,12 @@ class OmniConnection:
|
|||||||
self._writer = None
|
self._writer = None
|
||||||
self._reader = None
|
self._reader = None
|
||||||
|
|
||||||
|
if self._udp_transport is not None:
|
||||||
|
with contextlib.suppress(OSError):
|
||||||
|
self._udp_transport.close()
|
||||||
|
self._udp_transport = None
|
||||||
|
self._udp_protocol = None
|
||||||
|
|
||||||
if self._reader_task is not None and not self._reader_task.done():
|
if self._reader_task is not None and not self._reader_task.done():
|
||||||
self._reader_task.cancel()
|
self._reader_task.cancel()
|
||||||
with contextlib.suppress(asyncio.CancelledError, Exception):
|
with contextlib.suppress(asyncio.CancelledError, Exception):
|
||||||
@ -238,17 +304,39 @@ class OmniConnection:
|
|||||||
f"cannot send request, connection state={self._state.name}"
|
f"cannot send request, connection state={self._state.name}"
|
||||||
)
|
)
|
||||||
message = encode_v2(opcode, payload)
|
message = encode_v2(opcode, payload)
|
||||||
seq, fut = self._send_encrypted(message)
|
per_attempt_timeout = timeout if timeout is not None else self._default_timeout
|
||||||
try:
|
# UDP needs explicit retries since datagram delivery is best-effort.
|
||||||
reply_packet = await asyncio.wait_for(
|
# TCP gets reliable delivery for free; we still keep retry_count for
|
||||||
fut, timeout if timeout is not None else self._default_timeout
|
# API uniformity but it defaults to 0 effectively.
|
||||||
)
|
max_attempts = (
|
||||||
except TimeoutError as exc:
|
1 + self._udp_retry_count if self._transport_kind == "udp" else 1
|
||||||
self._pending.pop(seq, None)
|
)
|
||||||
raise RequestTimeoutError(
|
last_exc: Exception | None = None
|
||||||
f"no reply for opcode={int(opcode)} seq={seq}"
|
for attempt in range(1, max_attempts + 1):
|
||||||
) from exc
|
seq, fut = self._send_encrypted(message)
|
||||||
return self._decode_inner(reply_packet)
|
try:
|
||||||
|
reply_packet = await asyncio.wait_for(fut, per_attempt_timeout)
|
||||||
|
except TimeoutError as exc:
|
||||||
|
last_exc = exc
|
||||||
|
self._pending.pop(seq, None)
|
||||||
|
if attempt < max_attempts:
|
||||||
|
_log.debug(
|
||||||
|
"udp retry %d/%d on opcode=%d seq=%d",
|
||||||
|
attempt,
|
||||||
|
max_attempts,
|
||||||
|
int(opcode),
|
||||||
|
seq,
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
raise RequestTimeoutError(
|
||||||
|
f"no reply for opcode={int(opcode)} "
|
||||||
|
f"after {max_attempts} attempt(s)"
|
||||||
|
) from last_exc
|
||||||
|
return self._decode_inner(reply_packet)
|
||||||
|
# Loop exit without return only via re-raised timeout above.
|
||||||
|
raise RequestTimeoutError(
|
||||||
|
f"request loop exited without reply for opcode={int(opcode)}"
|
||||||
|
)
|
||||||
|
|
||||||
def unsolicited(self) -> AsyncIterator[Message]:
|
def unsolicited(self) -> AsyncIterator[Message]:
|
||||||
"""Async iterator over unsolicited inbound messages (seq=0)."""
|
"""Async iterator over unsolicited inbound messages (seq=0)."""
|
||||||
@ -380,17 +468,23 @@ class OmniConnection:
|
|||||||
return seq, fut
|
return seq, fut
|
||||||
|
|
||||||
def _write_packet(self, pkt: Packet, *, encrypted: bool = False) -> None:
|
def _write_packet(self, pkt: Packet, *, encrypted: bool = False) -> None:
|
||||||
if self._writer is None:
|
|
||||||
raise ConnectionError("transport not open")
|
|
||||||
wire = pkt.encode()
|
wire = pkt.encode()
|
||||||
_log.debug(
|
_log.debug(
|
||||||
"TX seq=%d type=%s len=%d encrypted=%s",
|
"TX[%s] seq=%d type=%s len=%d encrypted=%s",
|
||||||
|
self._transport_kind,
|
||||||
pkt.seq,
|
pkt.seq,
|
||||||
pkt.type.name,
|
pkt.type.name,
|
||||||
len(pkt.data),
|
len(pkt.data),
|
||||||
encrypted,
|
encrypted,
|
||||||
)
|
)
|
||||||
self._writer.write(wire)
|
if self._transport_kind == "tcp":
|
||||||
|
if self._writer is None:
|
||||||
|
raise ConnectionError("transport not open")
|
||||||
|
self._writer.write(wire)
|
||||||
|
else:
|
||||||
|
if self._udp_transport is None:
|
||||||
|
raise ConnectionError("transport not open")
|
||||||
|
self._udp_transport.sendto(wire)
|
||||||
|
|
||||||
def _decode_inner(self, pkt: Packet) -> Message:
|
def _decode_inner(self, pkt: Packet) -> Message:
|
||||||
"""Decrypt + parse the inner ``Message`` from an OmniLink2Message packet."""
|
"""Decrypt + parse the inner ``Message`` from an OmniLink2Message packet."""
|
||||||
@ -596,3 +690,46 @@ class OmniConnection:
|
|||||||
return
|
return
|
||||||
if not fut.done():
|
if not fut.done():
|
||||||
fut.set_result(pkt)
|
fut.set_result(pkt)
|
||||||
|
|
||||||
|
|
||||||
|
# --------------------------------------------------------------------------
|
||||||
|
# UDP transport
|
||||||
|
# --------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class _OmniDatagramProtocol(asyncio.DatagramProtocol):
|
||||||
|
"""asyncio.DatagramProtocol bound to a single OmniConnection.
|
||||||
|
|
||||||
|
Each datagram received on a UDP Omni socket *is* one complete Packet
|
||||||
|
(no stream framing — that is the whole reason UDP is useful here).
|
||||||
|
We just decode it and hand it to the connection's dispatcher.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, conn: OmniConnection) -> None:
|
||||||
|
self._conn = conn
|
||||||
|
|
||||||
|
def connection_made(self, transport: asyncio.BaseTransport) -> None:
|
||||||
|
# transport is a DatagramTransport in this codepath.
|
||||||
|
pass
|
||||||
|
|
||||||
|
def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None:
|
||||||
|
# Each datagram is a complete Packet — no stream framing.
|
||||||
|
# The TCP _dispatch already handles handshake routing, solicited
|
||||||
|
# replies, and unsolicited push, so we just delegate.
|
||||||
|
try:
|
||||||
|
pkt = Packet.decode(data)
|
||||||
|
except Exception as exc:
|
||||||
|
_log.warning("dropping malformed UDP datagram: %s", exc)
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
self._conn._dispatch(pkt)
|
||||||
|
except Exception:
|
||||||
|
_log.exception("UDP dispatch crashed for seq=%d", pkt.seq)
|
||||||
|
|
||||||
|
def error_received(self, exc: Exception) -> None:
|
||||||
|
_log.warning("UDP socket error: %s", exc)
|
||||||
|
|
||||||
|
def connection_lost(self, exc: Exception | None) -> None:
|
||||||
|
if exc is not None:
|
||||||
|
_log.warning("UDP transport lost: %s", exc)
|
||||||
|
|
||||||
|
|||||||
@ -333,21 +333,41 @@ class MockPanel:
|
|||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def serve(
|
async def serve(
|
||||||
self, host: str = "127.0.0.1", port: int = 0
|
self,
|
||||||
|
host: str = "127.0.0.1",
|
||||||
|
port: int = 0,
|
||||||
|
transport: str = "tcp",
|
||||||
|
) -> AsyncIterator[tuple[str, int]]:
|
||||||
|
"""Start listening; yield ``(host, actual_port)``; tear down on exit.
|
||||||
|
|
||||||
|
``transport='tcp'`` (default) opens a stream server matching modern
|
||||||
|
PC Access. ``transport='udp'`` opens a datagram socket — the panel
|
||||||
|
path used by some firmware versions and by the Omni-Link II
|
||||||
|
``Network_UDP`` configuration. Same wire format either way.
|
||||||
|
"""
|
||||||
|
if transport == "tcp":
|
||||||
|
async with self._serve_tcp(host, port) as bound:
|
||||||
|
yield bound
|
||||||
|
elif transport == "udp":
|
||||||
|
async with self._serve_udp(host, port) as bound:
|
||||||
|
yield bound
|
||||||
|
else:
|
||||||
|
raise ValueError(f"transport must be 'tcp' or 'udp', got {transport!r}")
|
||||||
|
|
||||||
|
@asynccontextmanager
|
||||||
|
async def _serve_tcp(
|
||||||
|
self, host: str, port: int
|
||||||
) -> AsyncIterator[tuple[str, int]]:
|
) -> AsyncIterator[tuple[str, int]]:
|
||||||
"""Start listening; yield ``(host, actual_port)``; tear down on exit."""
|
|
||||||
server = await asyncio.start_server(self._handle_client, host=host, port=port)
|
server = await asyncio.start_server(self._handle_client, host=host, port=port)
|
||||||
sockets = server.sockets or ()
|
sockets = server.sockets or ()
|
||||||
if not sockets: # pragma: no cover -- start_server always populates this
|
if not sockets: # pragma: no cover -- start_server always populates this
|
||||||
raise RuntimeError("asyncio.start_server returned no sockets")
|
raise RuntimeError("asyncio.start_server returned no sockets")
|
||||||
bound_host, bound_port = sockets[0].getsockname()[:2]
|
bound_host, bound_port = sockets[0].getsockname()[:2]
|
||||||
_log.debug("mock panel listening on %s:%d", bound_host, bound_port)
|
_log.debug("mock panel (tcp) listening on %s:%d", bound_host, bound_port)
|
||||||
try:
|
try:
|
||||||
async with server:
|
async with server:
|
||||||
yield bound_host, bound_port
|
yield bound_host, bound_port
|
||||||
finally:
|
finally:
|
||||||
# Cancel any in-flight push tasks so the test event loop
|
|
||||||
# tears down cleanly.
|
|
||||||
for t in list(self._push_tasks):
|
for t in list(self._push_tasks):
|
||||||
if not t.done():
|
if not t.done():
|
||||||
t.cancel()
|
t.cancel()
|
||||||
@ -356,6 +376,27 @@ class MockPanel:
|
|||||||
with contextlib.suppress(Exception): # pragma: no cover
|
with contextlib.suppress(Exception): # pragma: no cover
|
||||||
await server.wait_closed()
|
await server.wait_closed()
|
||||||
|
|
||||||
|
@asynccontextmanager
|
||||||
|
async def _serve_udp(
|
||||||
|
self, host: str, port: int
|
||||||
|
) -> AsyncIterator[tuple[str, int]]:
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
|
transport, _protocol = await loop.create_datagram_endpoint(
|
||||||
|
lambda: _MockServerDatagramProtocol(self),
|
||||||
|
local_addr=(host, port),
|
||||||
|
)
|
||||||
|
sockname = transport.get_extra_info("sockname")
|
||||||
|
bound_host, bound_port = sockname[0], sockname[1]
|
||||||
|
_log.debug("mock panel (udp) listening on %s:%d", bound_host, bound_port)
|
||||||
|
try:
|
||||||
|
yield bound_host, bound_port
|
||||||
|
finally:
|
||||||
|
for t in list(self._push_tasks):
|
||||||
|
if not t.done():
|
||||||
|
t.cancel()
|
||||||
|
self._push_tasks.clear()
|
||||||
|
transport.close()
|
||||||
|
|
||||||
# -------- connection handling --------
|
# -------- connection handling --------
|
||||||
|
|
||||||
async def _handle_client(
|
async def _handle_client(
|
||||||
@ -1226,3 +1267,206 @@ async def _read_exact(reader: asyncio.StreamReader, n: int) -> bytes | None:
|
|||||||
return await reader.readexactly(n)
|
return await reader.readexactly(n)
|
||||||
except asyncio.IncompleteReadError:
|
except asyncio.IncompleteReadError:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
# --------------------------------------------------------------------------
|
||||||
|
# UDP server protocol
|
||||||
|
# --------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class _MockServerDatagramProtocol(asyncio.DatagramProtocol):
|
||||||
|
"""Datagram-side implementation of the mock panel.
|
||||||
|
|
||||||
|
Tracks a single active client (the most-recent peer to issue a
|
||||||
|
``ClientRequestNewSession``) and routes its session state. The same
|
||||||
|
reply-builder helpers (``_reply_system_information``,
|
||||||
|
``_build_zone_properties``, etc.) on the parent ``MockPanel`` are
|
||||||
|
reused for both transports — the only thing UDP-specific here is
|
||||||
|
framing (each datagram is one complete Packet) and the absence of
|
||||||
|
per-block stream reads.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, panel: MockPanel) -> None:
|
||||||
|
self._panel = panel
|
||||||
|
self._transport: asyncio.DatagramTransport | None = None
|
||||||
|
self._client_addr: tuple[str, int] | None = None
|
||||||
|
self._session_id: bytes | None = None
|
||||||
|
self._session_key: bytes | None = None
|
||||||
|
|
||||||
|
def connection_made(self, transport: asyncio.BaseTransport) -> None:
|
||||||
|
self._transport = transport # type: ignore[assignment]
|
||||||
|
|
||||||
|
def connection_lost(self, exc: Exception | None) -> None:
|
||||||
|
if exc is not None:
|
||||||
|
_log.warning("mock panel (udp) transport lost: %s", exc)
|
||||||
|
|
||||||
|
def error_received(self, exc: Exception) -> None:
|
||||||
|
_log.warning("mock panel (udp) socket error: %s", exc)
|
||||||
|
|
||||||
|
def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None:
|
||||||
|
# Each datagram is a complete Packet. Spawn a task so we can do
|
||||||
|
# async work (drain push-event tasks, etc.) without blocking the
|
||||||
|
# protocol callback. We track the task so the GC doesn't drop the
|
||||||
|
# reference mid-flight (RUF006).
|
||||||
|
task = asyncio.create_task(
|
||||||
|
self._handle_datagram(data, addr),
|
||||||
|
name=f"mock-panel-udp-rx-{addr[0]}-{addr[1]}",
|
||||||
|
)
|
||||||
|
self._panel._push_tasks.add(task)
|
||||||
|
task.add_done_callback(self._panel._push_tasks.discard)
|
||||||
|
|
||||||
|
async def _handle_datagram(self, data: bytes, addr: tuple[str, int]) -> None:
|
||||||
|
try:
|
||||||
|
pkt = Packet.decode(data)
|
||||||
|
except Exception as exc:
|
||||||
|
_log.warning("mock panel (udp) malformed datagram from %s: %s", addr, exc)
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
await self._dispatch_packet(pkt, addr)
|
||||||
|
except Exception:
|
||||||
|
_log.exception("mock panel (udp) crashed on packet seq=%d", pkt.seq)
|
||||||
|
|
||||||
|
def _send(self, pkt: Packet, addr: tuple[str, int]) -> None:
|
||||||
|
if self._transport is None:
|
||||||
|
return
|
||||||
|
self._transport.sendto(pkt.encode(), addr)
|
||||||
|
|
||||||
|
async def _dispatch_packet(
|
||||||
|
self, pkt: Packet, addr: tuple[str, int]
|
||||||
|
) -> None:
|
||||||
|
if pkt.type is PacketType.ClientRequestNewSession:
|
||||||
|
session_id = self._panel._session_id_provider()
|
||||||
|
self._session_id = session_id
|
||||||
|
self._session_key = derive_session_key(
|
||||||
|
self._panel._controller_key, session_id
|
||||||
|
)
|
||||||
|
self._client_addr = addr
|
||||||
|
payload = bytes([_PROTO_HI, _PROTO_LO]) + session_id
|
||||||
|
self._send(
|
||||||
|
Packet(
|
||||||
|
seq=pkt.seq,
|
||||||
|
type=PacketType.ControllerAckNewSession,
|
||||||
|
data=payload,
|
||||||
|
),
|
||||||
|
addr,
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
if pkt.type is PacketType.ClientRequestSecureSession:
|
||||||
|
if self._session_key is None or self._session_id is None:
|
||||||
|
_log.debug("mock panel (udp) secure-session before NewSession")
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
plaintext = decrypt_message_payload(
|
||||||
|
pkt.data, pkt.seq, self._session_key
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
_log.debug("mock panel (udp) failed to decrypt secure-session")
|
||||||
|
return
|
||||||
|
if not plaintext.startswith(self._session_id):
|
||||||
|
self._send(
|
||||||
|
Packet(
|
||||||
|
seq=pkt.seq,
|
||||||
|
type=PacketType.ControllerSessionTerminated,
|
||||||
|
data=b"",
|
||||||
|
),
|
||||||
|
addr,
|
||||||
|
)
|
||||||
|
return
|
||||||
|
ciphertext_out = encrypt_message_payload(
|
||||||
|
self._session_id, pkt.seq, self._session_key
|
||||||
|
)
|
||||||
|
self._send(
|
||||||
|
Packet(
|
||||||
|
seq=pkt.seq,
|
||||||
|
type=PacketType.ControllerAckSecureSession,
|
||||||
|
data=ciphertext_out,
|
||||||
|
),
|
||||||
|
addr,
|
||||||
|
)
|
||||||
|
self._panel._session_count += 1
|
||||||
|
return
|
||||||
|
|
||||||
|
if pkt.type is PacketType.ClientSessionTerminated:
|
||||||
|
self._session_id = None
|
||||||
|
self._session_key = None
|
||||||
|
self._client_addr = None
|
||||||
|
return
|
||||||
|
|
||||||
|
if pkt.type is PacketType.OmniLink2Message:
|
||||||
|
if self._session_key is None:
|
||||||
|
_log.debug("mock panel (udp) encrypted message before secure session")
|
||||||
|
return
|
||||||
|
await self._handle_encrypted_udp(pkt, addr)
|
||||||
|
return
|
||||||
|
|
||||||
|
_log.debug("mock panel (udp) unhandled packet type %s", pkt.type.name)
|
||||||
|
|
||||||
|
async def _handle_encrypted_udp(
|
||||||
|
self, pkt: Packet, addr: tuple[str, int]
|
||||||
|
) -> None:
|
||||||
|
assert self._session_key is not None
|
||||||
|
try:
|
||||||
|
plaintext = decrypt_message_payload(
|
||||||
|
pkt.data, pkt.seq, self._session_key
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
_log.debug("mock panel (udp) failed to decrypt v2 message")
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
inner = Message.decode(plaintext)
|
||||||
|
except (MessageCrcError, MessageFormatError):
|
||||||
|
await self._send_reply(pkt.seq, _build_nak(0), addr)
|
||||||
|
return
|
||||||
|
|
||||||
|
opcode = inner.opcode
|
||||||
|
self._panel._last_request_opcode = opcode
|
||||||
|
# _dispatch_v2 is the same opcode-dispatch table the TCP path uses,
|
||||||
|
# returning (reply_message, push_event_words) so we can write the
|
||||||
|
# synchronous reply first then schedule an unsolicited push.
|
||||||
|
reply, push_words = self._panel._dispatch_v2(opcode, inner.payload)
|
||||||
|
await self._send_reply(pkt.seq, reply, addr)
|
||||||
|
if push_words:
|
||||||
|
self._schedule_udp_push(push_words, addr)
|
||||||
|
|
||||||
|
def _schedule_udp_push(
|
||||||
|
self, words: tuple[int, ...], addr: tuple[str, int]
|
||||||
|
) -> None:
|
||||||
|
"""Fire-and-forget unsolicited SystemEvents push to ``addr``."""
|
||||||
|
assert self._session_key is not None
|
||||||
|
session_key = self._session_key
|
||||||
|
|
||||||
|
async def _push() -> None:
|
||||||
|
await asyncio.sleep(_PUSH_DELAY)
|
||||||
|
try:
|
||||||
|
msg = _build_system_events_message(words)
|
||||||
|
plaintext = msg.encode()
|
||||||
|
ciphertext = encrypt_message_payload(plaintext, 0, session_key)
|
||||||
|
pkt = Packet(
|
||||||
|
seq=0,
|
||||||
|
type=PacketType.OmniLink2Message,
|
||||||
|
data=ciphertext,
|
||||||
|
)
|
||||||
|
self._send(pkt, addr)
|
||||||
|
except (ConnectionError, asyncio.CancelledError):
|
||||||
|
pass
|
||||||
|
except Exception: # pragma: no cover - diagnostic only
|
||||||
|
_log.exception("mock panel (udp) failed to push event")
|
||||||
|
|
||||||
|
task = asyncio.create_task(_push(), name="mock-panel-udp-event-push")
|
||||||
|
self._panel._push_tasks.add(task)
|
||||||
|
task.add_done_callback(self._panel._push_tasks.discard)
|
||||||
|
|
||||||
|
async def _send_reply(
|
||||||
|
self, client_seq: int, message: Message, addr: tuple[str, int]
|
||||||
|
) -> None:
|
||||||
|
assert self._session_key is not None
|
||||||
|
plaintext = message.encode()
|
||||||
|
ciphertext = encrypt_message_payload(plaintext, client_seq, self._session_key)
|
||||||
|
pkt = Packet(
|
||||||
|
seq=client_seq,
|
||||||
|
type=PacketType.OmniLink2Message,
|
||||||
|
data=ciphertext,
|
||||||
|
)
|
||||||
|
self._send(pkt, addr)
|
||||||
|
|||||||
152
tests/test_e2e_udp.py
Normal file
152
tests/test_e2e_udp.py
Normal file
@ -0,0 +1,152 @@
|
|||||||
|
"""End-to-end: OmniClient ↔ MockPanel over UDP.
|
||||||
|
|
||||||
|
Mirrors test_e2e_client_mock.py but with ``transport='udp'`` on both
|
||||||
|
sides. The protocol/encryption/handshake bytes are identical to TCP;
|
||||||
|
this proves only the transport layer change is sound.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import secrets
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from omni_pca.client import ObjectType, OmniClient
|
||||||
|
from omni_pca.commands import CommandFailedError
|
||||||
|
from omni_pca.connection import ConnectionState, HandshakeError, OmniConnection
|
||||||
|
from omni_pca.events import UnitStateChanged
|
||||||
|
from omni_pca.mock_panel import (
|
||||||
|
MockAreaState,
|
||||||
|
MockButtonState,
|
||||||
|
MockPanel,
|
||||||
|
MockState,
|
||||||
|
MockThermostatState,
|
||||||
|
MockUnitState,
|
||||||
|
MockZoneState,
|
||||||
|
)
|
||||||
|
from omni_pca.models import (
|
||||||
|
AreaStatus,
|
||||||
|
SecurityMode,
|
||||||
|
)
|
||||||
|
from omni_pca.opcodes import OmniLink2MessageType
|
||||||
|
|
||||||
|
CONTROLLER_KEY = bytes.fromhex("6ba7b4e9b4656de3cd7edd4c650cdb09")
|
||||||
|
|
||||||
|
|
||||||
|
def _populated_state() -> MockState:
|
||||||
|
return MockState(
|
||||||
|
zones={1: MockZoneState(name="FRONT_DOOR")},
|
||||||
|
units={1: MockUnitState(name="LIVING_LAMP")},
|
||||||
|
areas={1: MockAreaState(name="MAIN")},
|
||||||
|
thermostats={1: MockThermostatState(name="LIVING")},
|
||||||
|
buttons={1: MockButtonState(name="GOOD_MORNING")},
|
||||||
|
user_codes={1: 1234},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def test_udp_handshake_roundtrip() -> None:
|
||||||
|
panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state())
|
||||||
|
async with (
|
||||||
|
panel.serve(transport="udp") as (host, port),
|
||||||
|
OmniConnection(
|
||||||
|
host=host,
|
||||||
|
port=port,
|
||||||
|
controller_key=CONTROLLER_KEY,
|
||||||
|
transport="udp",
|
||||||
|
timeout=2.0,
|
||||||
|
) as conn,
|
||||||
|
):
|
||||||
|
assert conn.state is ConnectionState.ONLINE
|
||||||
|
assert panel.session_count == 1
|
||||||
|
|
||||||
|
|
||||||
|
async def test_udp_get_system_information() -> None:
|
||||||
|
panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state())
|
||||||
|
async with (
|
||||||
|
panel.serve(transport="udp") as (host, port),
|
||||||
|
OmniConnection(
|
||||||
|
host=host,
|
||||||
|
port=port,
|
||||||
|
controller_key=CONTROLLER_KEY,
|
||||||
|
transport="udp",
|
||||||
|
timeout=2.0,
|
||||||
|
) as conn,
|
||||||
|
):
|
||||||
|
reply = await conn.request(OmniLink2MessageType.RequestSystemInformation)
|
||||||
|
assert reply.opcode == int(OmniLink2MessageType.SystemInformation)
|
||||||
|
# First payload byte is the model byte.
|
||||||
|
assert reply.payload[0] == 16 # OMNI_PRO_II
|
||||||
|
|
||||||
|
|
||||||
|
async def test_udp_arm_area_with_correct_code() -> None:
|
||||||
|
panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state())
|
||||||
|
async with (
|
||||||
|
panel.serve(transport="udp") as (host, port),
|
||||||
|
OmniClient(
|
||||||
|
host=host,
|
||||||
|
port=port,
|
||||||
|
controller_key=CONTROLLER_KEY,
|
||||||
|
transport="udp",
|
||||||
|
timeout=2.0,
|
||||||
|
) as client,
|
||||||
|
):
|
||||||
|
await client.execute_security_command(
|
||||||
|
area=1, mode=SecurityMode.AWAY, code=1234,
|
||||||
|
)
|
||||||
|
statuses = await client.get_object_status(ObjectType.AREA, 1)
|
||||||
|
assert len(statuses) == 1
|
||||||
|
area = statuses[0]
|
||||||
|
assert isinstance(area, AreaStatus)
|
||||||
|
assert area.mode == int(SecurityMode.AWAY)
|
||||||
|
|
||||||
|
|
||||||
|
async def test_udp_arm_with_wrong_code_raises() -> None:
|
||||||
|
panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state())
|
||||||
|
async with panel.serve(transport="udp") as (host, port):
|
||||||
|
with pytest.raises(CommandFailedError):
|
||||||
|
async with OmniClient(
|
||||||
|
host=host,
|
||||||
|
port=port,
|
||||||
|
controller_key=CONTROLLER_KEY,
|
||||||
|
transport="udp",
|
||||||
|
timeout=2.0,
|
||||||
|
) as client:
|
||||||
|
await client.execute_security_command(
|
||||||
|
area=1, mode=SecurityMode.AWAY, code=9999,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def test_udp_unit_on_pushes_state_changed_event() -> None:
|
||||||
|
panel = MockPanel(controller_key=CONTROLLER_KEY, state=_populated_state())
|
||||||
|
async with (
|
||||||
|
panel.serve(transport="udp") as (host, port),
|
||||||
|
OmniClient(
|
||||||
|
host=host,
|
||||||
|
port=port,
|
||||||
|
controller_key=CONTROLLER_KEY,
|
||||||
|
transport="udp",
|
||||||
|
timeout=2.0,
|
||||||
|
) as client,
|
||||||
|
):
|
||||||
|
events = client.events()
|
||||||
|
await client.turn_unit_on(1)
|
||||||
|
ev = await asyncio.wait_for(events.__anext__(), timeout=1.0)
|
||||||
|
assert isinstance(ev, UnitStateChanged)
|
||||||
|
assert ev.unit_index == 1
|
||||||
|
assert ev.is_on is True
|
||||||
|
|
||||||
|
|
||||||
|
async def test_udp_wrong_key_fails_handshake() -> None:
|
||||||
|
panel = MockPanel(controller_key=CONTROLLER_KEY)
|
||||||
|
wrong_key = secrets.token_bytes(16)
|
||||||
|
async with panel.serve(transport="udp") as (host, port):
|
||||||
|
with pytest.raises(HandshakeError):
|
||||||
|
async with OmniConnection(
|
||||||
|
host=host,
|
||||||
|
port=port,
|
||||||
|
controller_key=wrong_key,
|
||||||
|
transport="udp",
|
||||||
|
timeout=2.0,
|
||||||
|
):
|
||||||
|
pass
|
||||||
Loading…
x
Reference in New Issue
Block a user