diff --git a/src/omni_pca/client.py b/src/omni_pca/client.py index b83511e..77a9904 100644 --- a/src/omni_pca/client.py +++ b/src/omni_pca/client.py @@ -17,12 +17,15 @@ from __future__ import annotations import asyncio import contextlib import struct -from collections.abc import Awaitable, Callable, Sequence +from collections.abc import AsyncIterator, Awaitable, Callable, Sequence from enum import IntEnum from types import TracebackType -from typing import Self +from typing import TYPE_CHECKING, Self from .commands import Command, CommandFailedError, SecurityCommandResponse + +if TYPE_CHECKING: + from .events import SystemEvent from .connection import ( ConnectionError as OmniConnectionError, ) @@ -715,6 +718,27 @@ class OmniClient: _runner(), name="omni-client-subscriber" ) + def events(self) -> AsyncIterator[SystemEvent]: + """Async iterator over typed :class:`SystemEvent` push notifications. + + Built on top of :meth:`OmniConnection.unsolicited` and + :class:`omni_pca.events.EventStream`. Filters out non-SystemEvents + unsolicited messages, parses each SystemEvents (opcode 55) message + into one or more typed events, and yields them one at a time. + + Usage:: + + async for event in client.events(): + match event: + case ZoneStateChanged() if event.is_open: + ... + case ArmingChanged(): + ... + """ + from .events import EventStream + + return EventStream(self._conn).__aiter__() + # ---- helpers --------------------------------------------------------- @staticmethod diff --git a/src/omni_pca/mock_panel.py b/src/omni_pca/mock_panel.py index 8d92b38..086ffb5 100644 --- a/src/omni_pca/mock_panel.py +++ b/src/omni_pca/mock_panel.py @@ -11,7 +11,13 @@ Coverage today: * Full secure-session handshake (NewSession / SecureSession ack pair) * ``RequestSystemInformation`` (22) -> ``SystemInformation`` (23) * ``RequestSystemStatus`` (24) -> ``SystemStatus`` (25) -* ``RequestProperties`` (32) -> ``Properties`` (33) for Zone + Unit +* ``RequestProperties`` (32) -> ``Properties`` (33) for Zone + Unit + Area +* ``Command`` (20) -> ``Ack`` (1) / ``Nak`` (2), with state mutation +* ``ExecuteSecurityCommand`` (74) -> ``Ack`` (1) (or Nak on bad code), with state +* ``RequestStatus`` (34) -> ``Status`` (35) for Zone/Unit/Area/Thermostat +* ``RequestExtendedStatus`` (58) -> ``ExtendedStatus`` (59) for the same set +* ``AcknowledgeAlerts`` (60) -> ``Ack`` (1) +* Synthesized push of ``SystemEvents`` (55, seq=0) when state mutates * Any other v2 opcode -> ``Nak`` (2) with the request's opcode * CRC failures on the inner message -> ``Nak`` * Graceful ``ClientSessionTerminated`` close @@ -21,6 +27,10 @@ References: clsOmniLinkConnection.cs:1688-1921 (TCP listener / ack flow) clsOL2MsgSystemInformation.cs / clsOL2MsgSystemStatus.cs clsOL2MsgRequestProperties.cs / clsOL2MsgProperties.cs + clsOL2MsgCommand.cs / clsOL2MsgExecuteSecurityCommand.cs + clsOL2MsgRequestStatus.cs / clsOL2MsgStatus.cs + clsOL2MsgRequestExtendedStatus.cs / clsOL2MsgExtendedStatus.cs + clsOLMsgSystemEvents.cs """ from __future__ import annotations @@ -33,6 +43,7 @@ from collections.abc import AsyncIterator, Callable from contextlib import asynccontextmanager from dataclasses import dataclass, field +from .commands import Command from .crypto import ( BLOCK_SIZE, decrypt_message_payload, @@ -49,6 +60,7 @@ _log = logging.getLogger(__name__) _OBJ_ZONE = 1 _OBJ_UNIT = 2 _OBJ_AREA = 5 +_OBJ_THERMOSTAT = 6 # Inner-message size constants (model OMNI_PRO_II) _ZONE_NAME_LEN = 15 @@ -56,6 +68,26 @@ _UNIT_NAME_LEN = 12 _AREA_NAME_LEN = 12 _PHONE_LEN = 24 +# Per-object-type record sizes for the basic Status (opcode 35) reply. +# Source: clsOL2MsgStatus.cs:13-27 — sizes hard-coded per object type, no +# per-record length byte. +_STATUS_RECORD_SIZES: dict[int, int] = { + _OBJ_ZONE: 4, # number(2) + status + loop + _OBJ_UNIT: 5, # number(2) + state + time(2) + _OBJ_AREA: 6, # number(2) + mode + alarms + entry + exit + _OBJ_THERMOSTAT: 9, # number(2) + status + 6 bytes +} + +# Per-object-type ExtendedStatus (opcode 59) record sizes. The reply carries +# this byte at payload[1] (object_length); we use these to build the reply. +# Source: clsOL2MsgExtendedStatus.cs (per-object body offsets). +_EXTENDED_STATUS_RECORD_SIZES: dict[int, int] = { + _OBJ_ZONE: 4, # number(2) + status + loop + _OBJ_UNIT: 5, # number(2) + state + time(2) — ZigBeePower optional + _OBJ_AREA: 6, # number(2) + mode + alarms + entry + exit + _OBJ_THERMOSTAT: 14, # number(2) + status + temp + heat + cool + sys + fan + hold + humidity + h_set + dh_set + outdoor + horc +} + # Wire format for the controller-side ack of NewSession is two literal # protocol-version bytes followed by the 5-byte SessionID. _PROTO_HI = 0x00 @@ -63,10 +95,94 @@ _PROTO_LO = 0x01 _SESSION_ID_BYTES = 5 +# Small delay before pushing a synthesized SystemEvents so the request future +# resolves first. Kept tiny; tests use asyncio.wait_for with their own timeout. +_PUSH_DELAY = 0.005 + + +# -------------------------------------------------------------------------- +# Per-object state dataclasses +# -------------------------------------------------------------------------- + + +@dataclass +class MockUnitState: + """One programmable unit (light / output / scene).""" + + name: str = "" + state: int = 0 # 0=off, 1=on, 100..200=brightness percent (raw Omni) + time_remaining: int = 0 + + +@dataclass +class MockAreaState: + """One programmable security area.""" + + name: str = "" + mode: int = 0 # SecurityMode value (Off=0, Day=1, Night=2, Away=3, ...) + last_user: int = 0 + entry_timer: int = 0 + exit_timer: int = 0 + alarms: int = 0 + + +@dataclass +class MockZoneState: + """One programmable security zone.""" + + name: str = "" + current_state: int = 0 # 0=secure, 1=not-ready, 2=trouble, 3=tamper + latched_state: int = 0 # 0=secure, 4=tripped, 8=reset (raw bits 2-3) + arming_state: int = 0 # 0=disarmed, 16=armed, 32=bypassed, 48=auto-bypassed + is_bypassed: bool = False + loop: int = 0 # analog loop reading + + @property + def status_byte(self) -> int: + """Compose the on-the-wire status byte from the sub-fields. + + Encoding mirrors clsZone.cs:385 / clsText.cs:3110: + bits 0-1 → current_state (0..3) + bits 2-3 → latched_state (0/4/8) + bits 4-5 → arming_state (0/16/32/48) + is_bypassed forces the arming bits to BYPASSED (0x20) regardless of + the underlying arming_state value. + """ + val = (self.current_state & 0x03) | (self.latched_state & 0x0C) + if self.is_bypassed: + val |= 0x20 + else: + val |= self.arming_state & 0x30 + return val & 0xFF + + +@dataclass +class MockThermostatState: + """One programmable thermostat. Defaults are sane Omni Pro II values.""" + + name: str = "" + temperature_raw: int = 168 # ~76°F on Omni linear scale + heat_setpoint_raw: int = 144 # ~62°F + cool_setpoint_raw: int = 184 # ~80°F + system_mode: int = 0 # HvacMode: 0=Off, 1=Heat, 2=Cool, 3=Auto, 4=EmHeat + fan_mode: int = 0 # FanMode: 0=Auto, 1=On, 2=Cycle + hold_mode: int = 0 # HoldMode: 0=Off, 1=Hold, 2=Vacation + humidity_raw: int = 0 + humidify_setpoint_raw: int = 0 + dehumidify_setpoint_raw: int = 0 + outdoor_temperature_raw: int = 0 + horc_status: int = 0 + status: int = 1 # 1 = communicating with the panel + @dataclass class MockState: - """Programmable panel state. Defaults mimic an Omni Pro II out of the box.""" + """Programmable panel state. Defaults mimic an Omni Pro II out of the box. + + Backward compat: callers may pass ``zones={1: "FRONT DOOR"}`` (a plain + ``dict[int, str]``) and the constructor will auto-promote the strings + into the appropriate ``Mock*State`` instance. + """ model_byte: int = 16 # OMNI_PRO_II firmware_major: int = 2 @@ -74,10 +190,18 @@ class MockState: firmware_revision: int = 1 local_phone: str = "" - # Names by 1-based index (matches Omni's user-facing numbering). - zones: dict[int, str] = field(default_factory=dict) - units: dict[int, str] = field(default_factory=dict) - areas: dict[int, str] = field(default_factory=dict) + # Per-object state machines, by 1-based index. Values may be passed as + # plain strings (interpreted as the object's name) or as the matching + # ``Mock*State`` dataclass instance. + zones: dict[int, MockZoneState] = field(default_factory=dict) + units: dict[int, MockUnitState] = field(default_factory=dict) + areas: dict[int, MockAreaState] = field(default_factory=dict) + thermostats: dict[int, MockThermostatState] = field(default_factory=dict) + + # User-code table for ExecuteSecurityCommand validation. + # Mapping is ``{code_index: 4-digit pin}``; the panel returns the + # matched code_index in the area's last_user field on success. + user_codes: dict[int, int] = field(default_factory=dict) # SystemStatus snapshot. Defaults: time set, battery good, no alarms. time_set: bool = True @@ -95,14 +219,50 @@ class MockState: sunset_minute: int = 45 battery: int = 200 # 0-255 — typical "good" value + def __post_init__(self) -> None: + """Promote bare-string values into per-type state dataclasses. + + This keeps the existing ``MockState(zones={1: "FRONT DOOR"})`` + call sites working unchanged, while letting new code pass full + ``MockZoneState`` / ``MockUnitState`` / etc. records. + """ + self.zones = _promote_dict(self.zones, MockZoneState) + self.units = _promote_dict(self.units, MockUnitState) + self.areas = _promote_dict(self.areas, MockAreaState) + self.thermostats = _promote_dict(self.thermostats, MockThermostatState) + + # ---- name-bytes helpers (kept for back-compat with old callers) ----- + def zone_name_bytes(self, idx: int) -> bytes: - return _name_bytes(self.zones.get(idx, ""), _ZONE_NAME_LEN) + z = self.zones.get(idx) + return _name_bytes(z.name if z else "", _ZONE_NAME_LEN) def unit_name_bytes(self, idx: int) -> bytes: - return _name_bytes(self.units.get(idx, ""), _UNIT_NAME_LEN) + u = self.units.get(idx) + return _name_bytes(u.name if u else "", _UNIT_NAME_LEN) def area_name_bytes(self, idx: int) -> bytes: - return _name_bytes(self.areas.get(idx, ""), _AREA_NAME_LEN) + a = self.areas.get(idx) + return _name_bytes(a.name if a else "", _AREA_NAME_LEN) + + +def _promote_dict( + raw: dict[int, object], + dataclass_cls: type, +) -> dict[int, object]: + """Walk a ``{int: str | DataclassInstance}`` dict, wrapping bare strings. + + Bare strings become ``dataclass_cls(name=string)``. Anything that is + already an instance of ``dataclass_cls`` (or anything else) passes + through untouched. + """ + out: dict[int, object] = {} + for k, v in raw.items(): + if isinstance(v, str): + out[k] = dataclass_cls(name=v) + else: + out[k] = v + return out def _name_bytes(name: str, width: int) -> bytes: @@ -133,6 +293,11 @@ class MockPanel: self._session_count = 0 self._last_request_opcode: int | None = None self._busy = asyncio.Lock() # serialise concurrent connection attempts + # Per-connection state captured on _handle_client; used by the + # synthesized-event push helper when state mutates. + self._active_writer: asyncio.StreamWriter | None = None + self._active_session_key: bytes | None = None + self._push_tasks: set[asyncio.Task[None]] = set() # -------- public observables (handy in tests) -------- @@ -161,6 +326,12 @@ class MockPanel: async with server: yield bound_host, bound_port finally: + # Cancel any in-flight push tasks so the test event loop + # tears down cleanly. + for t in list(self._push_tasks): + if not t.done(): + t.cancel() + self._push_tasks.clear() server.close() with contextlib.suppress(Exception): # pragma: no cover await server.wait_closed() @@ -201,6 +372,9 @@ class MockPanel: ) if not handled: break + # Make session info available to push helpers. + self._active_writer = writer + self._active_session_key = session_key elif pkt_type is PacketType.ClientSessionTerminated: _log.debug("mock panel: client requested teardown") @@ -222,6 +396,8 @@ class MockPanel: except (asyncio.IncompleteReadError, ConnectionError): _log.debug("mock panel: client connection ended unexpectedly") finally: + self._active_writer = None + self._active_session_key = None writer.close() with contextlib.suppress(Exception): # pragma: no cover await writer.wait_closed() @@ -335,18 +511,38 @@ class MockPanel: _log.debug("mock panel: dispatch opcode=%s payload=%d bytes", opcode_name, len(inner.payload)) - reply = self._dispatch_v2(opcode, inner.payload) + reply, push_words = self._dispatch_v2(opcode, inner.payload) await self._send_v2_reply(client_seq, reply, session_key, writer) + if push_words: + self._schedule_event_push(push_words, session_key, writer) return True - def _dispatch_v2(self, opcode: int, payload: bytes) -> Message: + def _dispatch_v2( + self, opcode: int, payload: bytes + ) -> tuple[Message, tuple[int, ...]]: + """Dispatch a single decoded request and return (reply, push_event_words). + + ``push_event_words`` is a (possibly empty) tuple of 16-bit event + words to push as an unsolicited SystemEvents (opcode 55) frame + AFTER the synchronous reply has been written. + """ if opcode == OmniLink2MessageType.RequestSystemInformation: - return self._reply_system_information() + return self._reply_system_information(), () if opcode == OmniLink2MessageType.RequestSystemStatus: - return self._reply_system_status() + return self._reply_system_status(), () if opcode == OmniLink2MessageType.RequestProperties: - return self._reply_properties(payload) - return _build_nak(opcode) + return self._reply_properties(payload), () + if opcode == OmniLink2MessageType.Command: + return self._handle_command(payload) + if opcode == OmniLink2MessageType.ExecuteSecurityCommand: + return self._handle_execute_security_command(payload) + if opcode == OmniLink2MessageType.RequestStatus: + return self._reply_status(payload), () + if opcode == OmniLink2MessageType.RequestExtendedStatus: + return self._reply_extended_status(payload), () + if opcode == OmniLink2MessageType.AcknowledgeAlerts: + return _build_ack(), () + return _build_nak(opcode), () # -------- reply builders (byte-exact per clsOL2Msg*.cs) -------- @@ -424,13 +620,15 @@ class MockPanel: return self._build_area_properties(target) return _build_nak(OmniLink2MessageType.RequestProperties) - def _object_store(self, obj_type: int) -> dict[int, str] | None: + def _object_store(self, obj_type: int) -> dict[int, object] | None: if obj_type == _OBJ_ZONE: - return self.state.zones + return self.state.zones # type: ignore[return-value] if obj_type == _OBJ_UNIT: - return self.state.units + return self.state.units # type: ignore[return-value] if obj_type == _OBJ_AREA: - return self.state.areas + return self.state.areas # type: ignore[return-value] + if obj_type == _OBJ_THERMOSTAT: + return self.state.thermostats # type: ignore[return-value] return None def _build_zone_properties(self, index: int) -> Message: @@ -439,14 +637,15 @@ class MockPanel: # [4]=Status, [5]=Loop, [6]=Type, [7]=Area, [8]=Options, # [9..23]=Name (15 bytes) # encode_v2 prepends the opcode, so we emit body = Data[1..23]. + zone = self.state.zones.get(index) body = ( bytes( [ _OBJ_ZONE, (index >> 8) & 0xFF, index & 0xFF, - 0, # Status: closed/secure - 0, # Loop + zone.status_byte if zone else 0, + zone.loop if zone else 0, 0, # Type: EntryExit 1, # Area: default to area 1 0, # Options @@ -461,15 +660,16 @@ class MockPanel: # [0]=opcode, [1]=ObjectType, [2..3]=ObjectNumber, # [4]=UnitStatus, [5..6]=UnitTime, [7]=UnitType, # [8..19]=Name (12), [20]=reserved, [21]=UnitAreas + unit = self.state.units.get(index) body = ( bytes( [ _OBJ_UNIT, (index >> 8) & 0xFF, index & 0xFF, - 0, # UnitStatus: off - 0, - 0, # UnitTime + unit.state if unit else 0, + (unit.time_remaining >> 8) & 0xFF if unit else 0, + unit.time_remaining & 0xFF if unit else 0, 1, # UnitType: Standard ] ) @@ -484,16 +684,17 @@ class MockPanel: # [4]=AreaMode, [5]=AreaAlarms, [6]=EntryTimer, [7]=ExitTimer, # [8]=Enabled, [9]=ExitDelay, [10]=EntryDelay, # [11..22]=Name (12 bytes) + area = self.state.areas.get(index) body = ( bytes( [ _OBJ_AREA, (index >> 8) & 0xFF, index & 0xFF, - 0, # AreaMode: Off - 0, # AreaAlarms - 0, # EntryTimer - 0, # ExitTimer + area.mode if area else 0, + area.alarms if area else 0, + area.entry_timer if area else 0, + area.exit_timer if area else 0, 1, # Enabled 60, # ExitDelay (s) 30, # EntryDelay (s) @@ -503,7 +704,250 @@ class MockPanel: ) return encode_v2(OmniLink2MessageType.Properties, body) - # -------- low-level reply send -------- + # -------- Status (opcode 34/35) and ExtendedStatus (opcode 58/59) -------- + + def _reply_status(self, payload: bytes) -> Message: + """Build a Status (opcode 35) reply for a RequestStatus (opcode 34). + + RequestStatus payload (5 bytes, clsOL2MsgRequestStatus.cs): + [0] object type + [1..2] starting number (BE u16) + [3..4] ending number (BE u16) + + Status reply payload layout (clsOL2MsgStatus.cs): + [0] object type + [1..] N records of size :data:`_STATUS_RECORD_SIZES[object_type]` + """ + if len(payload) < 5: + return _build_nak(OmniLink2MessageType.RequestStatus) + obj_type = payload[0] + start = (payload[1] << 8) | payload[2] + end = (payload[3] << 8) | payload[4] + store = self._object_store(obj_type) + if store is None or obj_type not in _STATUS_RECORD_SIZES: + return _build_nak(OmniLink2MessageType.RequestStatus) + body = bytearray([obj_type]) + for idx in range(start, end + 1): + obj = store.get(idx) + if obj is None: + continue + body.extend(_status_record(obj_type, idx, obj)) + if len(body) == 1: + # No matching objects in range — return EOD per protocol. + return encode_v2(OmniLink2MessageType.EOD, b"") + return encode_v2(OmniLink2MessageType.Status, bytes(body)) + + def _reply_extended_status(self, payload: bytes) -> Message: + """Build an ExtendedStatus (opcode 59) reply for opcode 58. + + ExtendedStatus reply payload layout (clsOL2MsgExtendedStatus.cs): + [0] object type + [1] object length (per-record byte count) + [2..] N records of ``object_length`` bytes + """ + if len(payload) < 5: + return _build_nak(OmniLink2MessageType.RequestExtendedStatus) + obj_type = payload[0] + start = (payload[1] << 8) | payload[2] + end = (payload[3] << 8) | payload[4] + store = self._object_store(obj_type) + record_size = _EXTENDED_STATUS_RECORD_SIZES.get(obj_type, 0) + if store is None or record_size == 0: + return _build_nak(OmniLink2MessageType.RequestExtendedStatus) + body = bytearray([obj_type, record_size]) + any_records = False + for idx in range(start, end + 1): + obj = store.get(idx) + if obj is None: + continue + body.extend(_extended_status_record(obj_type, idx, obj)) + any_records = True + if not any_records: + return encode_v2(OmniLink2MessageType.EOD, b"") + return encode_v2(OmniLink2MessageType.ExtendedStatus, bytes(body)) + + # -------- Command (opcode 20) -------- + + def _handle_command(self, payload: bytes) -> tuple[Message, tuple[int, ...]]: + """Apply a Command (opcode 20) and return (reply, push_event_words). + + Command payload (4 bytes, clsOL2MsgCommand.cs after stripping opcode): + [0] command byte (enuUnitCommand) + [1] parameter1 (single byte; brightness, mode, code index, ...) + [2] parameter2 high byte (BE u16) + [3] parameter2 low byte (object number for nearly every command) + """ + if len(payload) < 4: + return _build_nak(OmniLink2MessageType.Command), () + cmd_byte = payload[0] + param1 = payload[1] + param2 = (payload[2] << 8) | payload[3] + try: + cmd = Command(cmd_byte) + except ValueError: + _log.debug("mock panel: unknown command byte %d", cmd_byte) + return _build_nak(OmniLink2MessageType.Command), () + + push: tuple[int, ...] = () + + if cmd == Command.UNIT_OFF: + unit = self._ensure_unit(param2) + unit.state = 0 + unit.time_remaining = 0 + push = (_unit_state_changed_word(param2, 0),) + elif cmd == Command.UNIT_ON: + unit = self._ensure_unit(param2) + unit.state = 1 + unit.time_remaining = 0 + push = (_unit_state_changed_word(param2, 1),) + elif cmd == Command.UNIT_LEVEL: + # Per enuUnitCommand.Level (line 15): param1 = 0..100 percent. + # Encoded into the state byte as 100..200. + if not 0 <= param1 <= 100: + return _build_nak(OmniLink2MessageType.Command), () + unit = self._ensure_unit(param2) + unit.state = 100 + param1 + unit.time_remaining = 0 + push = (_unit_state_changed_word(param2, 1 if param1 > 0 else 0),) + elif cmd == Command.BYPASS_ZONE: + zone = self._ensure_zone(param2) + zone.is_bypassed = True + push = (_zone_state_changed_word(param2, 1),) + elif cmd == Command.RESTORE_ZONE: + zone = self._ensure_zone(param2) + zone.is_bypassed = False + push = (_zone_state_changed_word(param2, 0),) + elif cmd == Command.SET_THERMOSTAT_HEAT_SETPOINT: + tstat = self._ensure_thermostat(param2) + tstat.heat_setpoint_raw = param1 + elif cmd == Command.SET_THERMOSTAT_COOL_SETPOINT: + tstat = self._ensure_thermostat(param2) + tstat.cool_setpoint_raw = param1 + elif cmd == Command.SET_THERMOSTAT_SYSTEM_MODE: + tstat = self._ensure_thermostat(param2) + tstat.system_mode = param1 + elif cmd == Command.SET_THERMOSTAT_FAN_MODE: + tstat = self._ensure_thermostat(param2) + tstat.fan_mode = param1 + elif cmd == Command.SET_THERMOSTAT_HOLD_MODE: + tstat = self._ensure_thermostat(param2) + tstat.hold_mode = param1 + else: + # Acknowledge but don't model: EXECUTE_BUTTON, EXECUTE_PROGRAM, + # SHOW_MESSAGE_*, CLEAR_MESSAGE, scenes, audio, energy, ... + _log.info( + "mock panel: command %s (byte=%d, p1=%d, p2=%d) acknowledged " + "with no state effect", + cmd.name, cmd_byte, param1, param2, + ) + + return _build_ack(), push + + # -------- ExecuteSecurityCommand (opcode 74) -------- + + def _handle_execute_security_command( + self, payload: bytes + ) -> tuple[Message, tuple[int, ...]]: + """Validate the user code, mutate area state, push an ArmingChanged event. + + Payload (6 bytes, clsOL2MsgExecuteSecurityCommand.cs after stripping opcode): + [0] area number (1-based) + [1] security mode (raw enuSecurityMode 0..7) + [2..5] code digits (thousands, hundreds, tens, ones) + + Implementation choice: on success we return a plain Ack (opcode 1) + rather than ExecuteSecurityCommandResponse (opcode 75) — the Omni + firmware varies and the client treats both as success. On bad-code + we return Nak (the simplest panel behaviour); the client raises + :class:`CommandFailedError` either way. + """ + if len(payload) < 6: + return _build_nak(OmniLink2MessageType.ExecuteSecurityCommand), () + area_idx = payload[0] + mode = payload[1] + code = ( + payload[2] * 1000 + payload[3] * 100 + payload[4] * 10 + payload[5] + ) + + # Find a matching code in user_codes. The matched code_index is + # what the panel records as the "last user" for the area. + matched_user = None + for user_idx, pin in self.state.user_codes.items(): + if pin == code: + matched_user = user_idx + break + if matched_user is None: + _log.debug("mock panel: ExecuteSecurityCommand bad code %04d", code) + return _build_nak(OmniLink2MessageType.ExecuteSecurityCommand), () + + area = self._ensure_area(area_idx) + area.mode = mode + area.last_user = matched_user + + push = (_arming_changed_word(area_idx, mode, matched_user),) + return _build_ack(), push + + # -------- per-object ensure helpers -------- + + def _ensure_unit(self, idx: int) -> MockUnitState: + unit = self.state.units.get(idx) + if unit is None: + unit = MockUnitState() + self.state.units[idx] = unit + return unit + + def _ensure_zone(self, idx: int) -> MockZoneState: + zone = self.state.zones.get(idx) + if zone is None: + zone = MockZoneState() + self.state.zones[idx] = zone + return zone + + def _ensure_area(self, idx: int) -> MockAreaState: + area = self.state.areas.get(idx) + if area is None: + area = MockAreaState() + self.state.areas[idx] = area + return area + + def _ensure_thermostat(self, idx: int) -> MockThermostatState: + tstat = self.state.thermostats.get(idx) + if tstat is None: + tstat = MockThermostatState() + self.state.thermostats[idx] = tstat + return tstat + + # -------- low-level reply send + push helpers -------- + + def _schedule_event_push( + self, + event_words: tuple[int, ...], + session_key: bytes, + writer: asyncio.StreamWriter, + ) -> None: + """Fire-and-forget: push a SystemEvents (opcode 55) frame after a tiny delay. + + The delay lets the synchronous reply hit the client first so the + request future resolves before the unsolicited event arrives. Tests + that wait on ``client.events()`` use ``asyncio.wait_for`` with their + own timeout to fail fast if the push never arrives. + """ + + async def _push() -> None: + try: + await asyncio.sleep(_PUSH_DELAY) + msg = _build_system_events_message(event_words) + # Push goes out with seq=0 so the client routes it to the + # unsolicited queue (clsOmniLinkConnection.cs:1847-1854). + await self._send_v2_reply(0, msg, session_key, writer) + except (ConnectionError, asyncio.CancelledError): + pass + except Exception: # pragma: no cover - diagnostic only + _log.exception("mock panel: failed to push synthesized event") + + task = asyncio.create_task(_push(), name="mock-panel-event-push") + self._push_tasks.add(task) + task.add_done_callback(self._push_tasks.discard) async def _send_v2_reply( self, @@ -519,6 +963,174 @@ class MockPanel: await writer.drain() +# -------------------------------------------------------------------------- +# Status / ExtendedStatus per-record builders +# -------------------------------------------------------------------------- + + +def _status_record(obj_type: int, idx: int, obj: object) -> bytes: + """Build one record of a basic Status (opcode 35) reply for ``obj_type``.""" + if obj_type == _OBJ_ZONE: + z = obj # type: ignore[assignment] + assert isinstance(z, MockZoneState) + return bytes([(idx >> 8) & 0xFF, idx & 0xFF, z.status_byte, z.loop]) + if obj_type == _OBJ_UNIT: + u = obj # type: ignore[assignment] + assert isinstance(u, MockUnitState) + return bytes( + [ + (idx >> 8) & 0xFF, + idx & 0xFF, + u.state & 0xFF, + (u.time_remaining >> 8) & 0xFF, + u.time_remaining & 0xFF, + ] + ) + if obj_type == _OBJ_AREA: + a = obj # type: ignore[assignment] + assert isinstance(a, MockAreaState) + return bytes( + [ + (idx >> 8) & 0xFF, + idx & 0xFF, + a.mode & 0xFF, + a.alarms & 0xFF, + a.entry_timer & 0xFF, + a.exit_timer & 0xFF, + ] + ) + if obj_type == _OBJ_THERMOSTAT: + t = obj # type: ignore[assignment] + assert isinstance(t, MockThermostatState) + return bytes( + [ + (idx >> 8) & 0xFF, + idx & 0xFF, + t.status & 0xFF, + t.temperature_raw & 0xFF, + t.heat_setpoint_raw & 0xFF, + t.cool_setpoint_raw & 0xFF, + t.system_mode & 0xFF, + t.fan_mode & 0xFF, + t.hold_mode & 0xFF, + ] + ) + raise AssertionError(f"unhandled object type {obj_type}") + + +def _extended_status_record(obj_type: int, idx: int, obj: object) -> bytes: + """Build one record of an ExtendedStatus (opcode 59) reply for ``obj_type``. + + The basic-status records are byte-compatible with the extended-status + records for Zone, Unit, and Area (the ExtendedStatus reply just adds + the per-record length byte at payload[1]). Thermostat is the only type + where the extended record is wider — it adds humidity/outdoor/horc + fields at the end. + """ + if obj_type in (_OBJ_ZONE, _OBJ_UNIT, _OBJ_AREA): + return _status_record(obj_type, idx, obj) + if obj_type == _OBJ_THERMOSTAT: + t = obj # type: ignore[assignment] + assert isinstance(t, MockThermostatState) + return bytes( + [ + (idx >> 8) & 0xFF, + idx & 0xFF, + t.status & 0xFF, + t.temperature_raw & 0xFF, + t.heat_setpoint_raw & 0xFF, + t.cool_setpoint_raw & 0xFF, + t.system_mode & 0xFF, + t.fan_mode & 0xFF, + t.hold_mode & 0xFF, + t.humidity_raw & 0xFF, + t.humidify_setpoint_raw & 0xFF, + t.dehumidify_setpoint_raw & 0xFF, + t.outdoor_temperature_raw & 0xFF, + t.horc_status & 0xFF, + ] + ) + raise AssertionError(f"unhandled object type {obj_type}") + + +# -------------------------------------------------------------------------- +# SystemEvents (opcode 55) — synthesized push frames +# -------------------------------------------------------------------------- + + +def _build_system_events_message(words: tuple[int, ...]) -> Message: + """Pack one or more 16-bit event words into a v2 SystemEvents Message. + + Each word is encoded big-endian. Reference: clsOLMsgSystemEvents.cs. + """ + body = bytearray() + for w in words: + body.append((w >> 8) & 0xFF) + body.append(w & 0xFF) + return encode_v2(OmniLink2MessageType.SystemEvents, bytes(body)) + + +def _zone_state_changed_word(zone_index: int, new_state: int) -> int: + """Encode a ZONE_STATE_CHANGE (top 6 bits == 0x4) event word. + + Layout (matches events._classify): + bits 10-15: family marker (0x0400) + bit 9 : new_state (0=secure, 1=open) + low byte : zone index 1..255 + """ + word = 0x0400 | (zone_index & 0xFF) + if new_state: + word |= 0x0200 + return word & 0xFFFF + + +def _unit_state_changed_word(unit_index: int, new_state: int) -> int: + """Encode a UNIT_STATE_CHANGE (top 6 bits == 0x8) event word. + + Layout: + bits 10-15: family marker (0x0800) + bit 9 : new_state (0=off, 1=on) + bit 8 : unit_index >= 256 high bit + low byte : unit index low 8 bits + """ + word = 0x0800 | (unit_index & 0xFF) + if unit_index >= 256: + word |= 0x0100 + if new_state: + word |= 0x0200 + return word & 0xFFFF + + +def _arming_changed_word(area_index: int, new_mode: int, user_index: int) -> int: + """Encode a SECURITY_MODE_CHANGE catch-all event word. + + Layout (mirrors events._classify catch-all branch and clsText.cs:2155-2217): + bits 12-14: SecurityMode (0..7) + bits 8-11 : area index (0 = system / no specific area) + low byte : user/code index that triggered the change (0 = panel) + + NOTE: the classifier in :func:`omni_pca.events._classify` only routes + a word to ArmingChanged when ``(word >> 8) & 0xF0`` is non-zero. Our + encoding satisfies that as long as ``new_mode`` is at least 1 (the + SecurityMode high nibble of the high byte is non-zero). For Off (0) + the test seeds a non-zero mode — Disarm (mode=Off) flowing through + the same path would round-trip as an UnknownEvent, which matches + real-panel behaviour where Off is pushed as a different event family. + """ + word = ((new_mode & 0x07) << 12) | ((area_index & 0x0F) << 8) | (user_index & 0xFF) + return word & 0xFFFF + + +# -------------------------------------------------------------------------- +# Stock reply / NAK builders +# -------------------------------------------------------------------------- + + +def _build_ack() -> Message: + """Build a v2 Ack (opcode 1) with no payload.""" + return encode_v2(OmniLink2MessageType.Ack, b"") + + def _build_nak(in_reply_to_opcode: int) -> Message: """Build a v2 Nak. Payload is a single byte echoing the opcode being negged. diff --git a/tests/test_e2e_client_mock.py b/tests/test_e2e_client_mock.py index cecd2f5..5d20aa2 100644 --- a/tests/test_e2e_client_mock.py +++ b/tests/test_e2e_client_mock.py @@ -7,14 +7,36 @@ session-key derivation, or per-block whitening disagree, the handshake fails. from __future__ import annotations +import asyncio import secrets import pytest from omni_pca.client import ObjectType, OmniClient +from omni_pca.commands import CommandFailedError from omni_pca.connection import HandshakeError -from omni_pca.mock_panel import MockPanel, MockState -from omni_pca.models import AreaProperties, UnitProperties, ZoneProperties +from omni_pca.events import ArmingChanged, UnitStateChanged +from omni_pca.mock_panel import ( + MockAreaState, + MockPanel, + MockState, + MockThermostatState, + MockUnitState, + MockZoneState, +) +from omni_pca.models import ( + AreaProperties, + AreaStatus, + SecurityMode, + ThermostatStatus, + UnitProperties, + UnitStatus, + ZoneProperties, + ZoneStatus, +) +from omni_pca.models import ( + ObjectType as ModelObjectType, +) CONTROLLER_KEY = bytes.fromhex("6ba7b4e9b4656de3cd7edd4c650cdb09") @@ -99,3 +121,162 @@ async def test_e2e_wrong_key_fails_with_handshake_error() -> None: with pytest.raises(HandshakeError): async with OmniClient(host=host, port=port, controller_key=wrong_key) as cli: await cli.get_system_information() + + +# -------------------------------------------------------------------------- +# New surface: typed commands + status + event push +# -------------------------------------------------------------------------- + + +def _state_with_area_and_codes() -> MockState: + """Common fixture: one area with one valid user-code mapping.""" + return MockState( + areas={1: MockAreaState(name="Main")}, + user_codes={1: 1234}, + ) + + +async def test_e2e_arm_area() -> None: + panel = MockPanel(controller_key=CONTROLLER_KEY, state=_state_with_area_and_codes()) + async with ( + panel.serve() as (host, port), + OmniClient(host=host, port=port, controller_key=CONTROLLER_KEY) as cli, + ): + await cli.execute_security_command( + area=1, mode=SecurityMode.AWAY, code=1234 + ) + statuses = await cli.get_object_status(ModelObjectType.AREA, 1) + assert len(statuses) == 1 + area = statuses[0] + assert isinstance(area, AreaStatus) + assert area.index == 1 + assert area.mode == int(SecurityMode.AWAY) + assert area.mode_name == "AWAY" + + +async def test_e2e_arm_with_wrong_code_raises() -> None: + panel = MockPanel(controller_key=CONTROLLER_KEY, state=_state_with_area_and_codes()) + async with ( + panel.serve() as (host, port), + OmniClient(host=host, port=port, controller_key=CONTROLLER_KEY) as cli, + ): + with pytest.raises(CommandFailedError): + await cli.execute_security_command( + area=1, mode=SecurityMode.AWAY, code=9999 + ) + + +async def test_e2e_turn_unit_on_off() -> None: + state = MockState(units={1: MockUnitState(name="Lamp")}) + panel = MockPanel(controller_key=CONTROLLER_KEY, state=state) + async with ( + panel.serve() as (host, port), + OmniClient(host=host, port=port, controller_key=CONTROLLER_KEY) as cli, + ): + await cli.turn_unit_on(1) + statuses = await cli.get_object_status(ModelObjectType.UNIT, 1) + assert len(statuses) == 1 + unit = statuses[0] + assert isinstance(unit, UnitStatus) + assert unit.state == 1 + assert unit.is_on is True + + await cli.turn_unit_off(1) + statuses = await cli.get_object_status(ModelObjectType.UNIT, 1) + assert statuses[0].state == 0 + assert statuses[0].is_on is False + + +async def test_e2e_set_unit_level() -> None: + state = MockState(units={1: MockUnitState(name="Dimmer")}) + panel = MockPanel(controller_key=CONTROLLER_KEY, state=state) + async with ( + panel.serve() as (host, port), + OmniClient(host=host, port=port, controller_key=CONTROLLER_KEY) as cli, + ): + await cli.set_unit_level(1, 60) + statuses = await cli.get_extended_status(ModelObjectType.UNIT, 1) + assert len(statuses) == 1 + unit = statuses[0] + assert isinstance(unit, UnitStatus) + # state byte 100..200 encodes brightness percent (state - 100). + assert unit.state == 160 + assert unit.brightness == 60 + + +async def test_e2e_bypass_restore_zone() -> None: + state = MockState(zones={1: MockZoneState(name="Front Door")}) + panel = MockPanel(controller_key=CONTROLLER_KEY, state=state) + async with ( + panel.serve() as (host, port), + OmniClient(host=host, port=port, controller_key=CONTROLLER_KEY) as cli, + ): + # Initially not bypassed. + statuses = await cli.get_object_status(ModelObjectType.ZONE, 1) + assert isinstance(statuses[0], ZoneStatus) + assert statuses[0].is_bypassed is False + + await cli.bypass_zone(1) + statuses = await cli.get_object_status(ModelObjectType.ZONE, 1) + assert statuses[0].is_bypassed is True + + await cli.restore_zone(1) + statuses = await cli.get_object_status(ModelObjectType.ZONE, 1) + assert statuses[0].is_bypassed is False + + +async def test_e2e_set_thermostat_heat_setpoint() -> None: + state = MockState(thermostats={1: MockThermostatState(name="Living")}) + panel = MockPanel(controller_key=CONTROLLER_KEY, state=state) + async with ( + panel.serve() as (host, port), + OmniClient(host=host, port=port, controller_key=CONTROLLER_KEY) as cli, + ): + await cli.set_thermostat_heat_setpoint_raw(1, 150) + statuses = await cli.get_extended_status(ModelObjectType.THERMOSTAT, 1) + assert len(statuses) == 1 + tstat = statuses[0] + assert isinstance(tstat, ThermostatStatus) + assert tstat.heat_setpoint_raw == 150 + + +async def test_e2e_arm_pushes_arming_changed_event() -> None: + panel = MockPanel(controller_key=CONTROLLER_KEY, state=_state_with_area_and_codes()) + async with ( + panel.serve() as (host, port), + OmniClient(host=host, port=port, controller_key=CONTROLLER_KEY) as cli, + ): + events = cli.events() + await cli.execute_security_command( + area=1, mode=SecurityMode.AWAY, code=1234 + ) + ev = await asyncio.wait_for(events.__anext__(), timeout=1.0) + assert isinstance(ev, ArmingChanged) + assert ev.area_index == 1 + assert ev.new_mode == int(SecurityMode.AWAY) + assert ev.user_index == 1 + + +async def test_e2e_unit_command_pushes_unit_state_changed_event() -> None: + state = MockState(units={1: MockUnitState(name="Lamp")}) + panel = MockPanel(controller_key=CONTROLLER_KEY, state=state) + async with ( + panel.serve() as (host, port), + OmniClient(host=host, port=port, controller_key=CONTROLLER_KEY) as cli, + ): + events = cli.events() + await cli.turn_unit_on(1) + ev = await asyncio.wait_for(events.__anext__(), timeout=1.0) + assert isinstance(ev, UnitStateChanged) + assert ev.unit_index == 1 + assert ev.is_on is True + + +async def test_e2e_acknowledge_alerts() -> None: + panel = MockPanel(controller_key=CONTROLLER_KEY) + async with ( + panel.serve() as (host, port), + OmniClient(host=host, port=port, controller_key=CONTROLLER_KEY) as cli, + ): + # Should complete without raising. + await cli.acknowledge_alerts()