omni-pca/tests/test_client.py
Ryan Malloy 1901d6ec87 Async client + mock panel + e2e roundtrip
src/omni_pca/connection.py — low-level OmniConnection
- 4-step secure-session handshake (NewSession, SecureSession)
- Per-direction monotonic seq with 0xFFFF -> 1 wraparound (skips 0)
- TCP framing: read first 16-byte block, decrypt, learn length, read rest
- Reader task dispatches solicited replies to Future, unsolicited to queue
- Custom exceptions: HandshakeError, InvalidEncryptionKeyError, ProtocolError,
  RequestTimeoutError

src/omni_pca/models.py — typed response objects
- SystemInformation (with model_name lookup), SystemStatus, ZoneProperties,
  UnitProperties, AreaProperties — all frozen+slots dataclasses with
  .parse(payload) classmethods

src/omni_pca/client.py — high-level OmniClient
- get_system_information / get_system_status / get_object_properties
- list_{zone,unit,area}_names walks via RequestProperties rel=1
- subscribe(callback) for unsolicited messages

src/omni_pca/mock_panel.py — async TCP server emulating an Omni Pro II
- Full handshake (controller side), seedable MockState
- Implements RequestSystemInformation, RequestSystemStatus,
  RequestProperties (Zone/Unit/Area, both absolute and rel=1 iteration
  with EOD termination); Nak for everything else
- 'omni-pca mock-panel' CLI subcommand

tests/ — 85 passed, 1 skip (live fixture)
- 23 unit tests for connection/models/client (canned-server fixtures)
- 7 unit tests for mock panel (raw protocol drive)
- 6 e2e tests: real OmniClient over real TCP to real MockPanel,
  proves handshake + AES + whitening + sequencing all agree
2026-05-10 13:02:49 -06:00

180 lines
5.7 KiB
Python

"""Unit tests for omni_pca.client — typed request methods.
The fixture is a tiny in-process asyncio server that completes the
handshake then serves whichever opcode the test wants. No mock_panel
dependency.
"""
from __future__ import annotations
import asyncio
import contextlib
import struct
from collections.abc import Awaitable, Callable
import pytest
from omni_pca.client import ObjectType, OmniClient
from omni_pca.crypto import (
decrypt_message_payload,
encrypt_message_payload,
)
from omni_pca.message import Message, encode_v2
from omni_pca.opcodes import OmniLink2MessageType, PacketType
from .test_connection import ( # reuse handshake helpers
CONTROLLER_KEY,
SESSION_ID,
_do_full_handshake,
_pack_header,
_start_server,
)
def _name_field(name: str, width: int) -> bytes:
encoded = name.encode("latin-1")
return encoded + b"\x00" * (width - len(encoded))
def _build_system_information_payload() -> bytes:
return bytes([16, 2, 12, 1]) + _name_field("415-555-1212", 24)
def _build_zone_properties_payload(index: int, name: str) -> bytes:
return (
bytes([1])
+ struct.pack(">H", index)
+ bytes([0, 0, 0, 1, 0])
+ _name_field(name, 15)
)
async def _read_one_request(
reader: asyncio.StreamReader, session_key: bytes
) -> tuple[int, Message]:
"""Read one OmniLink2Message packet from the client; return (seq, inner Message)."""
header = await reader.readexactly(4)
seq = (header[0] << 8) | header[1]
type_byte = header[2]
assert type_byte == int(PacketType.OmniLink2Message)
first = await reader.readexactly(16)
plain_first = decrypt_message_payload(first, seq, session_key)
msg_len = plain_first[1]
remaining_inner = msg_len + 4 - 16
if remaining_inner <= 0:
extra = 0
else:
pad = (-remaining_inner) % 16
extra = remaining_inner + pad
rest = await reader.readexactly(extra) if extra else b""
full_ct = first + rest
full_plain = decrypt_message_payload(full_ct, seq, session_key)
inner = Message.decode(full_plain)
return seq, inner
def _send_reply(
writer: asyncio.StreamWriter,
seq: int,
opcode: OmniLink2MessageType,
payload: bytes,
session_key: bytes,
) -> None:
inner = encode_v2(opcode, payload)
ct = encrypt_message_payload(inner.encode(), seq, session_key)
writer.write(_pack_header(seq, int(PacketType.OmniLink2Message)) + ct)
async def _serve_one_reply(
handler_replies: dict[int, tuple[OmniLink2MessageType, bytes]],
) -> Callable[[asyncio.StreamReader, asyncio.StreamWriter], Awaitable[None]]:
"""Build a handler that does the handshake then replies once per opcode received."""
async def handler(r: asyncio.StreamReader, w: asyncio.StreamWriter) -> None:
try:
sk = await _do_full_handshake(r, w)
for _ in range(len(handler_replies)):
seq, inner = await _read_one_request(r, sk)
opcode = inner.opcode
if opcode not in handler_replies:
return
reply_op, reply_payload = handler_replies[opcode]
_send_reply(w, seq, reply_op, reply_payload, sk)
await w.drain()
with contextlib.suppress(TimeoutError):
await asyncio.wait_for(asyncio.Event().wait(), timeout=2.0)
finally:
w.close()
return handler
@pytest.mark.asyncio
async def test_client_get_system_information_round_trip() -> None:
handler = await _serve_one_reply(
{
int(OmniLink2MessageType.RequestSystemInformation): (
OmniLink2MessageType.SystemInformation,
_build_system_information_payload(),
)
}
)
server, host, port = await _start_server(handler)
try:
async with OmniClient(host=host, port=port, controller_key=CONTROLLER_KEY) as c:
info = await c.get_system_information()
assert info.model_byte == 16
assert info.model_name == "Omni Pro II"
assert info.firmware_version == "2.12r1"
assert info.local_phone == "415-555-1212"
finally:
server.close()
await server.wait_closed()
@pytest.mark.asyncio
async def test_client_get_zone_properties_round_trip() -> None:
handler = await _serve_one_reply(
{
int(OmniLink2MessageType.RequestProperties): (
OmniLink2MessageType.Properties,
_build_zone_properties_payload(7, "Front Door"),
)
}
)
server, host, port = await _start_server(handler)
try:
async with OmniClient(host=host, port=port, controller_key=CONTROLLER_KEY) as c:
zone = await c.get_object_properties(ObjectType.ZONE, 7)
assert zone.index == 7
assert zone.name == "Front Door"
finally:
server.close()
await server.wait_closed()
@pytest.mark.asyncio
async def test_client_get_object_properties_eod_raises_value_error() -> None:
"""A ``EOD`` reply means the panel has no object at that index."""
handler = await _serve_one_reply(
{
int(OmniLink2MessageType.RequestProperties): (
OmniLink2MessageType.EOD,
b"",
)
}
)
server, host, port = await _start_server(handler)
try:
async with OmniClient(host=host, port=port, controller_key=CONTROLLER_KEY) as c:
with pytest.raises(ValueError, match="no ZONE"):
await c.get_object_properties(ObjectType.ZONE, 999)
finally:
server.close()
await server.wait_closed()
# Keep `SESSION_ID` reachable so ruff doesn't complain about unused
# imports — it's used implicitly by `_do_full_handshake`.
assert SESSION_ID