diff --git a/src/omni_pca/client.py b/src/omni_pca/client.py index 0a60a85..b83511e 100644 --- a/src/omni_pca/client.py +++ b/src/omni_pca/client.py @@ -17,11 +17,12 @@ from __future__ import annotations import asyncio import contextlib import struct -from collections.abc import Awaitable, Callable +from collections.abc import Awaitable, Callable, Sequence from enum import IntEnum from types import TracebackType from typing import Self +from .commands import Command, CommandFailedError, SecurityCommandResponse from .connection import ( ConnectionError as OmniConnectionError, ) @@ -31,13 +32,23 @@ from .connection import ( ) from .message import Message from .models import ( + OBJECT_TYPE_TO_STATUS, AreaProperties, + AreaStatus, + FanMode, + HoldMode, + HvacMode, PropertiesReply, + SecurityMode, + StatusReply, SystemInformation, SystemStatus, UnitProperties, ZoneProperties, ) +from .models import ( + ObjectType as ModelObjectType, +) from .opcodes import OmniLink2MessageType @@ -70,6 +81,23 @@ _PROPERTIES_PARSERS: dict[ObjectType, type[PropertiesReply]] = { } +# Per-object-type record sizes for a basic Status (opcode 35) reply, where +# (unlike ExtendedStatus) there is no per-record length byte and the size +# is hard-coded in the wire format. Source: clsOL2MsgStatus.cs:13-27. +_STATUS_RECORD_SIZES: dict[int, int] = { + 1: 4, # enuObjectType.Zone — number(2) + status + loop + 2: 5, # enuObjectType.Unit — number(2) + state + time(2) + 5: 6, # enuObjectType.Area — number(2) + mode + alarms + entry + exit + 6: 9, # enuObjectType.Thermostat — number(2) + status + 6 bytes (status..hold) + 7: 3, # enuObjectType.Message — number(2) + status + 8: 6, # enuObjectType.Auxillary — number(2) + output + temp + low + high + 10: 6, # enuObjectType.AudioZone — number(2) + power + source + volume + mute + 11: 4, # enuObjectType.Expansion — number(2) + status + battery + 13: 5, # enuObjectType.UserSetting — number(2) + type + value(2) + 15: 5, # enuObjectType.AccessControlLock — number(2) + status + duration(2) +} + + class OmniClient: """High-level async Omni-Link II client. @@ -169,6 +197,475 @@ class OmniClient: self._expect(reply, OmniLink2MessageType.Properties) return parser.parse(reply.payload) + # ---- commands -------------------------------------------------------- + + async def execute_command( + self, + command: Command, + parameter1: int = 0, + parameter2: int = 0, + ) -> None: + """Send a generic Command (opcode 20). + + Most state-change operations on lights, scenes, zones, thermostats, + scenes, audio zones, etc. flow through here. The panel acks with + an :attr:`OmniLink2MessageType.Ack`; specific resulting state must + be re-polled (or you can subscribe to the unsolicited push stream + to see the corresponding ExtendedStatus push the panel emits). + + Wire opcode: 20 (Command). + Wire payload (4 bytes, from clsOL2MsgCommand.cs:5-57): + [0] command byte (this enum value) + [1] parameter1 (single byte; brightness, mode, code index, ...) + [2] parameter2 high byte (BE u16) + [3] parameter2 low byte (object number for nearly every command) + + Reference: clsOL2MsgCommand.cs. + """ + if not 0 <= parameter1 <= 0xFF: + raise ValueError(f"parameter1 must fit in a byte: {parameter1}") + if not 0 <= parameter2 <= 0xFFFF: + raise ValueError(f"parameter2 must fit in u16: {parameter2}") + payload = struct.pack( + ">BBH", int(command), parameter1 & 0xFF, parameter2 & 0xFFFF + ) + reply = await self._conn.request(OmniLink2MessageType.Command, payload) + if reply.opcode == OmniLink2MessageType.Nak: + raise CommandFailedError( + f"panel NAK'd Command {command.name} " + f"(p1={parameter1}, p2={parameter2})" + ) + if reply.opcode != OmniLink2MessageType.Ack: + raise CommandFailedError( + f"unexpected reply to Command {command.name}: opcode={reply.opcode}" + ) + + async def execute_security_command( + self, + area: int, + mode: SecurityMode, + code: int, + ) -> AreaStatus | None: + """Arm or disarm a security area. + + The panel validates the code against its enabled-codes list for + that area; on failure it returns an + :attr:`OmniLink2MessageType.ExecuteSecurityCommandResponse` whose + ``payload[0]`` is one of the :class:`SecurityCommandResponse` + values. On success the panel may either return an Ack or push an + ExtendedStatus update for the affected area; we surface only the + response code (success/failure) and return ``None`` for the + success path because the synchronous reply does not carry a full + :class:`AreaStatus` record. Re-poll via :meth:`get_object_status` + if you need the post-command state. + + Wire opcode: 74 (ExecuteSecurityCommand). + Wire payload (6 bytes, from clsOL2MsgExecuteSecurityCommand.cs:5-90): + [0] area number (1-based) + [1] security mode byte (raw enuSecurityMode 0..7) + [2] code digit 1 (thousands) + [3] code digit 2 (hundreds) + [4] code digit 3 (tens) + [5] code digit 4 (ones) + + Raises: + ValueError: ``area`` not 1..255 or ``code`` not 0..9999. + CommandFailedError: panel Nak'd the request, or the + ExecuteSecurityCommandResponse status byte is non-zero. + The structured failure code is exposed on + ``CommandFailedError.failure_code``. + + Reference: clsOL2MsgExecuteSecurityCommand.cs, + clsOL2MsgExecuteSecurityCommandResponse.cs. + """ + if not 1 <= area <= 0xFF: + raise ValueError(f"area out of range: {area}") + if not 0 <= code <= 9999: + raise ValueError(f"code out of range (0000-9999): {code}") + # Match the C# digit-packing exactly (clsOL2MsgExecuteSecurityCommand.cs:36-41). + d1 = (code // 1000) % 10 + d2 = (code // 100) % 10 + d3 = (code // 10) % 10 + d4 = code % 10 + payload = bytes([area & 0xFF, int(mode) & 0xFF, d1, d2, d3, d4]) + reply = await self._conn.request( + OmniLink2MessageType.ExecuteSecurityCommand, payload + ) + if reply.opcode == OmniLink2MessageType.Nak: + raise CommandFailedError( + f"panel NAK'd ExecuteSecurityCommand " + f"(area={area}, mode={mode.name})" + ) + if reply.opcode == OmniLink2MessageType.ExecuteSecurityCommandResponse: + if not reply.payload: + raise CommandFailedError( + "ExecuteSecurityCommandResponse with empty payload" + ) + status = reply.payload[0] + if status != SecurityCommandResponse.SUCCESS: + try: + label = SecurityCommandResponse(status).name + except ValueError: + label = f"unknown({status})" + raise CommandFailedError( + f"ExecuteSecurityCommand failed: {label}", + failure_code=status, + ) + return None + if reply.opcode == OmniLink2MessageType.Ack: + return None + raise CommandFailedError( + f"unexpected reply to ExecuteSecurityCommand: opcode={reply.opcode}" + ) + + async def acknowledge_alerts(self) -> None: + """Acknowledge all outstanding alerts/troubles on the panel. + + Wire opcode: 60 (AcknowledgeAlerts). No payload, panel acks. + + Reference: enuOmniLink2MessageType.AcknowledgeAlerts. + """ + reply = await self._conn.request(OmniLink2MessageType.AcknowledgeAlerts) + if reply.opcode == OmniLink2MessageType.Nak: + raise CommandFailedError("panel NAK'd AcknowledgeAlerts") + if reply.opcode != OmniLink2MessageType.Ack: + raise CommandFailedError( + f"unexpected reply to AcknowledgeAlerts: opcode={reply.opcode}" + ) + + async def get_object_status( + self, + object_type: ModelObjectType, + start: int, + end: int | None = None, + ) -> Sequence[StatusReply]: + """Request basic Status (opcode 34/35) for a range of objects. + + ``end=None`` requests just the single object at ``start``. Returns + a list of the appropriate ``*Status`` dataclass instances, parsed + from each fixed-size record in the reply. + + Unlike :meth:`get_extended_status`, the basic Status reply has NO + per-record ``object_length`` byte — record sizes are hard-coded + per object type (see ``clsOL2MsgStatus.cs:13-27``). + + Wire opcode: 34 (RequestStatus) -> 35 (Status). + RequestStatus payload (5 bytes, clsOL2MsgRequestStatus.cs:5-41): + [0] object type (enuObjectType) + [1..2] starting number (BE u16) + [3..4] ending number (BE u16) + + Status reply payload layout (clsOL2MsgStatus.cs): + [0] object type + [1..] N records of size :data:`_STATUS_RECORD_SIZES[object_type]` + + Reference: clsOL2MsgRequestStatus.cs, clsOL2MsgStatus.cs. + """ + return await self._fetch_status_range( + object_type=object_type, + start=start, + end=end, + request_opcode=OmniLink2MessageType.RequestStatus, + reply_opcode=OmniLink2MessageType.Status, + header_bytes=1, # just object_type + record_sizes=_STATUS_RECORD_SIZES, + ) + + async def get_extended_status( + self, + object_type: ModelObjectType, + start: int, + end: int | None = None, + ) -> Sequence[StatusReply]: + """Request ExtendedStatus (opcode 58/59) for a range of objects. + + For Thermostats, AuxSensors, dimmable Units, and most other types + this carries more fields (current temperature, setpoints, + brightness level, etc.) than the basic Status reply. + + Unlike basic Status, the ExtendedStatus reply has an explicit + ``object_length`` byte at ``payload[1]`` so the record size doesn't + have to be hard-coded — we use it as-is. + + Wire opcode: 58 (RequestExtendedStatus) -> 59 (ExtendedStatus). + RequestExtendedStatus payload (5 bytes, clsOL2MsgRequestExtendedStatus.cs:5-41): + [0] object type + [1..2] starting number (BE u16) + [3..4] ending number (BE u16) + + ExtendedStatus reply payload layout (clsOL2MsgExtendedStatus.cs): + [0] object type + [1] object length (per-record byte count) + [2..] N records of ``object_length`` bytes + + Reference: clsOL2MsgRequestExtendedStatus.cs, clsOL2MsgExtendedStatus.cs. + """ + return await self._fetch_status_range( + object_type=object_type, + start=start, + end=end, + request_opcode=OmniLink2MessageType.RequestExtendedStatus, + reply_opcode=OmniLink2MessageType.ExtendedStatus, + header_bytes=2, # object_type + object_length + record_sizes=None, # take from payload[1] + ) + + # ---- thin command wrappers ------------------------------------------ + + async def turn_unit_on(self, index: int) -> None: + """Turn a unit (light, relay, scene) ON. + + Wire opcode: 20 (Command), command byte = ``Command.UNIT_ON`` (1). + Reference: enuUnitCommand.On (line 6). + """ + await self.execute_command(Command.UNIT_ON, parameter2=index) + + async def turn_unit_off(self, index: int) -> None: + """Turn a unit OFF. + + Wire opcode: 20 (Command), command byte = ``Command.UNIT_OFF`` (0). + Reference: enuUnitCommand.Off (line 5). + """ + await self.execute_command(Command.UNIT_OFF, parameter2=index) + + async def set_unit_level(self, index: int, percent: int) -> None: + """Set a dimmable unit's brightness to ``percent`` (0..100). + + Wire opcode: 20 (Command), command byte = ``Command.UNIT_LEVEL`` (9), + parameter1 = percent. + Reference: enuUnitCommand.Level (line 15). + """ + if not 0 <= percent <= 100: + raise ValueError(f"percent must be 0..100: {percent}") + await self.execute_command( + Command.UNIT_LEVEL, parameter1=percent, parameter2=index + ) + + async def bypass_zone(self, index: int, code: int = 0) -> None: + """Bypass a zone (1-based). + + Wire opcode: 20 (Command), command byte = ``Command.BYPASS_ZONE`` (4), + parameter1 = user code index (0 = installer/no-code path), + parameter2 = zone number. + + Reference: enuUnitCommand.Bypass (line 10). + """ + await self.execute_command( + Command.BYPASS_ZONE, parameter1=code, parameter2=index + ) + + async def restore_zone(self, index: int, code: int = 0) -> None: + """Restore a previously-bypassed zone. + + Wire opcode: 20 (Command), command byte = ``Command.RESTORE_ZONE`` (5), + parameter1 = user code index, parameter2 = zone number. + + Reference: enuUnitCommand.Restore (line 11). + """ + await self.execute_command( + Command.RESTORE_ZONE, parameter1=code, parameter2=index + ) + + async def set_thermostat_system_mode( + self, index: int, mode: HvacMode + ) -> None: + """Change the thermostat's system mode (Off/Heat/Cool/Auto/EmHeat). + + Wire opcode: 20 (Command), command byte = + ``Command.SET_THERMOSTAT_SYSTEM_MODE`` (68), + parameter1 = mode value, parameter2 = thermostat number. + + Reference: enuUnitCommand.Mode (line 73). + """ + await self.execute_command( + Command.SET_THERMOSTAT_SYSTEM_MODE, + parameter1=int(mode), + parameter2=index, + ) + + async def set_thermostat_fan_mode( + self, index: int, mode: FanMode + ) -> None: + """Change the thermostat's fan mode (Auto/On/Cycle). + + Wire opcode: 20 (Command), command byte = + ``Command.SET_THERMOSTAT_FAN_MODE`` (69). + Reference: enuUnitCommand.Fan (line 74). + """ + await self.execute_command( + Command.SET_THERMOSTAT_FAN_MODE, + parameter1=int(mode), + parameter2=index, + ) + + async def set_thermostat_hold_mode( + self, index: int, mode: HoldMode + ) -> None: + """Change the thermostat's hold mode (Off/Hold/Vacation). + + Wire opcode: 20 (Command), command byte = + ``Command.SET_THERMOSTAT_HOLD_MODE`` (70). + Reference: enuUnitCommand.Hold (line 75). + """ + await self.execute_command( + Command.SET_THERMOSTAT_HOLD_MODE, + parameter1=int(mode), + parameter2=index, + ) + + async def set_thermostat_heat_setpoint_raw( + self, index: int, raw: int + ) -> None: + """Set the heat setpoint, in Omni's raw temperature byte units. + + Convert from C/F at the call site (see + :func:`omni_pca.models.omni_temp_to_celsius` / + :func:`omni_pca.models.omni_temp_to_fahrenheit` for the inverse) - + this layer is deliberately transport-shaped. + + Wire opcode: 20 (Command), command byte = + ``Command.SET_THERMOSTAT_HEAT_SETPOINT`` (66). + Reference: enuUnitCommand.SetLowSetPt (line 71). + """ + if not 0 <= raw <= 0xFF: + raise ValueError(f"raw setpoint must be a byte: {raw}") + await self.execute_command( + Command.SET_THERMOSTAT_HEAT_SETPOINT, + parameter1=raw, + parameter2=index, + ) + + async def set_thermostat_cool_setpoint_raw( + self, index: int, raw: int + ) -> None: + """Set the cool setpoint, in Omni's raw temperature byte units. + + Wire opcode: 20 (Command), command byte = + ``Command.SET_THERMOSTAT_COOL_SETPOINT`` (67). + Reference: enuUnitCommand.SetHighSetPt (line 72). + """ + if not 0 <= raw <= 0xFF: + raise ValueError(f"raw setpoint must be a byte: {raw}") + await self.execute_command( + Command.SET_THERMOSTAT_COOL_SETPOINT, + parameter1=raw, + parameter2=index, + ) + + async def execute_button(self, index: int) -> None: + """Run the program assigned to a button. + + Wire opcode: 20 (Command), command byte = ``Command.EXECUTE_BUTTON`` (7). + Reference: enuUnitCommand.Button (line 13). + """ + await self.execute_command(Command.EXECUTE_BUTTON, parameter2=index) + + async def execute_program(self, index: int) -> None: + """Run a stored program by index (1-based). + + Wire opcode: 20 (Command), command byte = ``Command.EXECUTE_PROGRAM`` (104). + Note: enuUnitCommand calls this ``UserSetting`` historically — we + rename for clarity since "execute program" matches the user-facing + verb in the owner manual. + + Reference: enuUnitCommand.UserSetting (line 98). + """ + await self.execute_command(Command.EXECUTE_PROGRAM, parameter2=index) + + async def show_message(self, index: int, beep: bool = True) -> None: + """Display a stored message on the panel's keypad. + + Wire opcode: 20 (Command), command byte = ``Command.SHOW_MESSAGE_WITH_BEEP`` + (80) when ``beep=True`` or ``Command.SHOW_MESSAGE_NO_BEEP`` (86) otherwise. + + Reference: enuUnitCommand.ShowMsgWBeep (line 81), + enuUnitCommand.ShowMsgNoBeep (line 87). + """ + cmd = ( + Command.SHOW_MESSAGE_WITH_BEEP + if beep + else Command.SHOW_MESSAGE_NO_BEEP + ) + await self.execute_command(cmd, parameter2=index) + + async def clear_message(self, index: int) -> None: + """Clear a previously-shown message. + + Wire opcode: 20 (Command), command byte = ``Command.CLEAR_MESSAGE`` (82). + Reference: enuUnitCommand.ClearMsg (line 83). + """ + await self.execute_command(Command.CLEAR_MESSAGE, parameter2=index) + + # ---- helpers (status) ----------------------------------------------- + + async def _fetch_status_range( + self, + *, + object_type: ModelObjectType, + start: int, + end: int | None, + request_opcode: OmniLink2MessageType, + reply_opcode: OmniLink2MessageType, + header_bytes: int, + record_sizes: dict[int, int] | None, + ) -> Sequence[StatusReply]: + if not 0 <= start <= 0xFFFF: + raise ValueError(f"start out of range: {start}") + end_n = start if end is None else end + if not 0 <= end_n <= 0xFFFF: + raise ValueError(f"end out of range: {end_n}") + if end_n < start: + raise ValueError(f"end ({end_n}) must be >= start ({start})") + + parser = OBJECT_TYPE_TO_STATUS.get(int(object_type)) + if parser is None: + raise NotImplementedError( + f"no status parser for object type {object_type.name}" + ) + + payload = struct.pack(">BHH", int(object_type), start, end_n) + reply = await self._conn.request(request_opcode, payload) + if reply.opcode == OmniLink2MessageType.EOD: + return [] + if reply.opcode == OmniLink2MessageType.Nak: + raise CommandFailedError( + f"panel NAK'd {request_opcode.name} for " + f"{object_type.name}#{start}..{end_n}" + ) + self._expect(reply, reply_opcode) + body = reply.payload + if len(body) < header_bytes: + raise OmniConnectionError( + f"{reply_opcode.name} payload too short: {len(body)}" + ) + if body[0] != int(object_type): + raise OmniConnectionError( + f"{reply_opcode.name} object type mismatch: " + f"sent {int(object_type)}, got {body[0]}" + ) + if record_sizes is None: + # ExtendedStatus carries the per-record size at payload[1]. + record_size = body[1] + records_start = 2 + else: + record_size = record_sizes.get(int(object_type), 0) + if record_size == 0: + raise NotImplementedError( + f"no Status record size for {object_type.name}" + ) + records_start = 1 + records_buf = body[records_start:] + if record_size == 0: + return [] + out: list[StatusReply] = [] + for off in range(0, len(records_buf), record_size): + chunk = records_buf[off : off + record_size] + if len(chunk) < record_size: + # Trailing partial record: ignore (panel may pad). + break + out.append(parser.parse(chunk)) + return out + async def list_zone_names(self) -> dict[int, str]: """Walk all zones, returning ``{index: name}`` for those with a name set.""" return await self._walk_named_objects( diff --git a/src/omni_pca/commands.py b/src/omni_pca/commands.py new file mode 100644 index 0000000..282ec21 --- /dev/null +++ b/src/omni_pca/commands.py @@ -0,0 +1,211 @@ +"""Command (opcode 20) and ExecuteSecurityCommand (opcode 74) primitives. + +This module pins down the exact byte values the panel expects in the +*first byte* of a Command (opcode 20) payload, plus the failure-mode +exception used by the typed methods on :class:`omni_pca.client.OmniClient`. + +Naming note: there is no standalone ``enuCommand`` enum in HAI_Shared — +the C# code uses :class:`enuUnitCommand` (file +``decompiled/project/HAI_Shared/enuUnitCommand.cs``) for *every* Command +opcode regardless of object type, even though the name suggests it's +unit-only. We mirror that single enum here under the cleaner name +:class:`Command`. Every member cites the line in ``enuUnitCommand.cs`` +where its byte value is defined. + +The rich Command (opcode 20) wire format (from +``clsOL2MsgCommand.cs:5-57``) is: + + payload[0] = command byte (this enum) + payload[1] = parameter1 (single byte, e.g. brightness, mode value) + payload[2] = parameter2 high byte (BE u16) + payload[3] = parameter2 low byte + +Parameter2 is almost always the **object number** (unit#, zone#, +thermostat#, message#, button#, scene#). Parameter1 carries whatever the +specific command needs (level, mode, set-point, etc.) — see the per- +member doc-comments below for the mapping. + +The ExecuteSecurityCommand (opcode 74) wire format (from +``clsOL2MsgExecuteSecurityCommand.cs:5-90``) is: + + payload[0] = area number (1-based) + payload[1] = security mode byte (enuSecurityMode, raw 0-7) + payload[2] = code digit 1 (thousands place, 0-9) + payload[3] = code digit 2 (hundreds place, 0-9) + payload[4] = code digit 3 (tens place, 0-9) + payload[5] = code digit 4 (ones place, 0-9) + +The reply (ExecuteSecurityCommandResponse, opcode 75) carries a single +status byte at ``payload[0]`` whose values are listed in +``enuSecurityCommnadResponse.cs`` — :class:`SecurityCommandResponse` +mirrors that enum. +""" + +from __future__ import annotations + +from enum import IntEnum + +from .connection import ProtocolError + + +class Command(IntEnum): + """OMNI command codes used as ``payload[0]`` of a Command (opcode 20). + + Every member's value is sourced from + ``decompiled/project/HAI_Shared/enuUnitCommand.cs``; the trailing + line-number reference points to the exact definition. + """ + + # ---- unit / lighting ------------------------------------------------ + UNIT_OFF = 0 # enuUnitCommand.Off, line 5 + UNIT_ON = 1 # enuUnitCommand.On, line 6 + ALL_OFF = 2 # enuUnitCommand.AllOff, line 7 (alias: All=2 line 8) + ALL_ON = 3 # enuUnitCommand.AllOn, line 9 + BYPASS_ZONE = 4 # enuUnitCommand.Bypass, line 10 + RESTORE_ZONE = 5 # enuUnitCommand.Restore, line 11 + RESTORE_ALL_ZONES = 6 # enuUnitCommand.RestoreAll, line 12 + EXECUTE_BUTTON = 7 # enuUnitCommand.Button, line 13 + ENERGY = 8 # enuUnitCommand.Energy, line 14 + UNIT_LEVEL = 9 # enuUnitCommand.Level, line 15 (param1 = 0..100 %) + UNIT_DECREMENT_COUNTER = 10 # enuUnitCommand.Dec, line 16 + UNIT_INCREMENT_COUNTER = 11 # enuUnitCommand.Inc, line 17 + UNIT_SET_COUNTER = 12 # enuUnitCommand.Set, line 18 + UNIT_RAMP = 13 # enuUnitCommand.Ramp, line 19 + COMPOSE_SCENE = 14 # enuUnitCommand.Compose, line 20 + UPB_STATUS_REQUEST = 15 # enuUnitCommand.UPBStatus, line 21 + DIM_STEP = 16 # enuUnitCommand.Dim, line 22 (param1 = step) + DIM_1 = 17 # enuUnitCommand.Dim1, line 23 + DIM_2 = 18 # enuUnitCommand.Dim2, line 24 + DIM_3 = 19 # enuUnitCommand.Dim3, line 25 + DIM_4 = 20 # enuUnitCommand.Dim4, line 26 + DIM_5 = 21 # enuUnitCommand.Dim5, line 27 + DIM_6 = 22 # enuUnitCommand.Dim6, line 28 + DIM_7 = 23 # enuUnitCommand.Dim7, line 29 + DIM_8 = 24 # enuUnitCommand.Dim8, line 30 + DIM_9 = 25 # enuUnitCommand.Dim9, line 31 + UPB_BLINK = 26 # enuUnitCommand.UPBBlink, line 32 + UPB_BLINK_OFF = 27 # enuUnitCommand.UPBBlinkOff, line 33 + UPB_LINK_OFF = 28 # enuUnitCommand.UPBLinkOff, line 34 + UPB_LINK_ON = 29 # enuUnitCommand.UPBLinkOn, line 35 + UPB_LINK_SET = 30 # enuUnitCommand.UPBLinkSet, line 36 + UPB_LINK_FADE_STOP = 31 # enuUnitCommand.UPBLinkFadeStop, line 37 + BRIGHT_STEP = 32 # enuUnitCommand.Bright, line 38 (param1 = step) + BRIGHT_1 = 33 # enuUnitCommand.Bright1, line 39 + BRIGHT_2 = 34 # enuUnitCommand.Bright2, line 40 + BRIGHT_3 = 35 # enuUnitCommand.Bright3, line 41 + BRIGHT_4 = 36 # enuUnitCommand.Bright4, line 42 + BRIGHT_5 = 37 # enuUnitCommand.Bright5, line 43 + BRIGHT_6 = 38 # enuUnitCommand.Bright6, line 44 + BRIGHT_7 = 39 # enuUnitCommand.Bright7, line 45 + BRIGHT_8 = 40 # enuUnitCommand.Bright8, line 46 + BRIGHT_9 = 41 # enuUnitCommand.Bright9, line 47 + CENTRALITE_SCENE_OFF = 42 # enuUnitCommand.CentraLiteSceneOff, line 48 + CENTRALITE_SCENE_ON = 43 # enuUnitCommand.CentraLiteSceneOn, line 49 + UPB_LED_OFF = 44 # enuUnitCommand.UPBLEDOff, line 50 + UPB_LED_ON = 45 # enuUnitCommand.UPBLEDOn, line 51 + RADIO_RA_PHANTOM_OFF = 46 # enuUnitCommand.RadioRAPhantomOff, line 52 + RADIO_RA_PHANTOM_ON = 47 # enuUnitCommand.RadioRAPhantomOn, line 53 + + # ---- security (alternative path; preferred path is opcode 74) ------ + # When sent through a Command (opcode 20), parameter1 carries the user + # code index (1-based) and parameter2 carries the area number. The + # panel honours these only if the code is enabled for the area. + SECURITY_OFF = 48 # enuUnitCommand.SecurityOff, line 55 (alias Security=48 line 54) + SECURITY_DAY = 49 # enuUnitCommand.SecurityDay, line 56 + SECURITY_NIGHT = 50 # enuUnitCommand.SecurityNight, line 57 + SECURITY_AWAY = 51 # enuUnitCommand.SecurityAway, line 58 + SECURITY_VACATION = 52 # enuUnitCommand.SecurityVac, line 59 + SECURITY_DAY_INSTANT = 53 # enuUnitCommand.SecurityDyi, line 60 + SECURITY_NIGHT_DELAYED = 54 # enuUnitCommand.SecurityNtd, line 61 + SECURITY_ANY_CHANGE = 55 # enuUnitCommand.SecurityAny, line 62 + SECURITY_ARMING_DAY = 57 # enuUnitCommand.SecurityArmingDay, line 63 + SECURITY_ARMING_NIGHT = 58 # enuUnitCommand.SecurityArmingNight, line 64 + SECURITY_ARMING_AWAY = 59 # enuUnitCommand.SecurityArmingAway, line 65 + SECURITY_ARMING_VACATION = 60 # enuUnitCommand.SecurityArmingVacation, line 66 + SECURITY_ARMING_DAY_INSTANT = 61 # enuUnitCommand.SecurityArmingDayInst, line 67 + SECURITY_ARMING_NIGHT_DELAYED = 62 # enuUnitCommand.SecurityArmingNightDelay, line 68 + + # ---- energy (HMS) -------------------------------------------------- + ENERGY_OFF = 64 # enuUnitCommand.Eof, line 69 + ENERGY_ON = 65 # enuUnitCommand.Eon, line 70 + + # ---- thermostat --------------------------------------------------- + SET_THERMOSTAT_HEAT_SETPOINT = 66 # enuUnitCommand.SetLowSetPt, line 71 + SET_THERMOSTAT_COOL_SETPOINT = 67 # enuUnitCommand.SetHighSetPt, line 72 + SET_THERMOSTAT_SYSTEM_MODE = 68 # enuUnitCommand.Mode, line 73 + SET_THERMOSTAT_FAN_MODE = 69 # enuUnitCommand.Fan, line 74 + SET_THERMOSTAT_HOLD_MODE = 70 # enuUnitCommand.Hold, line 75 + THERMOSTAT_INC_DEC_LO = 71 # enuUnitCommand.IncDecLo, line 76 + THERMOSTAT_INC_DEC_HI = 72 # enuUnitCommand.IncDecHi, line 77 + SET_THERMOSTAT_HUMIDIFY_SETPOINT = 73 # enuUnitCommand.SetHumidifySetPt, line 78 + SET_THERMOSTAT_DEHUMIDIFY_SETPOINT = 74 # enuUnitCommand.SetDeHumidifySetPt, line 79 + + # ---- panel display messages --------------------------------------- + SHOW_MESSAGE_WITH_BEEP = 80 # enuUnitCommand.ShowMsgWBeep, line 81 (alias FirstMsgCmd=80 line 80) + LOG_MESSAGE = 81 # enuUnitCommand.LogMsg, line 82 + CLEAR_MESSAGE = 82 # enuUnitCommand.ClearMsg, line 83 + SAY_MESSAGE = 83 # enuUnitCommand.SayMsg, line 84 + PHONE_MESSAGE = 84 # enuUnitCommand.PhoneMsg, line 85 + SEND_MESSAGE = 85 # enuUnitCommand.SendMsg, line 86 + SHOW_MESSAGE_NO_BEEP = 86 # enuUnitCommand.ShowMsgNoBeep, line 87 + EMAIL_MESSAGE = 87 # enuUnitCommand.EMailMsg, line 88 (alias LastMsgCmd=87 line 89) + + # ---- scenes / misc ----------------------------------------------- + SCENE_OFF = 96 # enuUnitCommand.SceneOff, line 90 + SCENE_ON = 97 # enuUnitCommand.SceneOn, line 91 + SCENE_SET = 98 # enuUnitCommand.SceneSet, line 92 + TOGGLE = 99 # enuUnitCommand.Toggle, line 93 + SHOW_VIDEO = 100 # enuUnitCommand.ShowVideo, line 94 + TIMED_LEVEL = 101 # enuUnitCommand.TimedLevel, line 95 + CONSOLE_BEEP = 102 # enuUnitCommand.ConsoleBeep, line 96 + BEEP = 103 # enuUnitCommand.Beep, line 97 + EXECUTE_PROGRAM = 104 # enuUnitCommand.UserSetting, line 98 + LOCK = 105 # enuUnitCommand.Lock, line 99 + UNLOCK = 106 # enuUnitCommand.Unlock, line 100 + LUTRON_HOMEWORKS_KEYPAD = 107 # enuUnitCommand.LutronHomeWorksKeypadButtonPress, line 101 + CLIPSAL_C_BUS_SCENE = 108 # enuUnitCommand.Clipsal_C_Bus_Scene, line 102 + RADIO_RA2_PHANTOM = 109 # enuUnitCommand.RadioRA2Phantom, line 103 + STOP = 110 # enuUnitCommand.Stop, line 104 + + # ---- audio -------------------------------------------------------- + AUDIO_ZONE = 112 # enuUnitCommand.AudioZone, line 105 + AUDIO_VOLUME = 113 # enuUnitCommand.AudioVolume, line 106 + AUDIO_SOURCE = 114 # enuUnitCommand.AudioSource, line 107 + AUDIO_KEY_PRESS = 115 # enuUnitCommand.AudioKeyPress, line 108 + + +class SecurityCommandResponse(IntEnum): + """Status byte returned in an ExecuteSecurityCommandResponse (opcode 75). + + Source: ``decompiled/project/HAI_Shared/enuSecurityCommnadResponse.cs`` + (typo in the C# enum name preserved here for grep parity). + """ + + SUCCESS = 0 # line 5 + INVALID_CODE = 1 # line 6 + INVALID_SECURITY_MODE = 2 # line 7 + INVALID_AREA = 3 # line 8 + ZONES_NOT_READY = 4 # line 9 + INSTALLER_RESTORE_NEEDED = 5 # line 10 + CODE_LOCKED_OUT = 6 # line 11 + INVALID = 0xFF # line 12 + + +class CommandFailedError(ProtocolError): + """A command opcode was Nak'd by the panel, or returned a structured + failure code (e.g. the Security command response carries one of the + :class:`SecurityCommandResponse` values). + + The ``failure_code`` attribute is set when the panel returned an + ExecuteSecurityCommandResponse with a non-zero status byte; it's + ``None`` for plain Nak replies that carry no further detail. + """ + + def __init__( + self, + message: str, + *, + failure_code: int | None = None, + ) -> None: + super().__init__(message) + self.failure_code = failure_code diff --git a/src/omni_pca/events.py b/src/omni_pca/events.py new file mode 100644 index 0000000..d3d5a6d --- /dev/null +++ b/src/omni_pca/events.py @@ -0,0 +1,860 @@ +"""Typed system-event objects for Omni-Link II push notifications. + +The panel batches state-change notifications into a single ``SystemEvents`` +message (v2 opcode 55). Each batched event is a single 16-bit big-endian +word in the message payload — the message envelope is just a sequence of +those words. The 16-bit value is *category-encoded*: the high bits pick +the event family (zone, unit, arming, alarm, AC, battery, …) and the +remaining bits carry per-family fields (zone index, area, alarm type, +unit number, security mode, etc.). + +Pipeline: + + raw bytes -> Message (opcode 55) + Message -> parse_events(message) -> list[SystemEvent] + SystemEvent -> ZoneStateChanged | UnitStateChanged | ArmingChanged | … + +A single SystemEvents message can carry multiple events, so the public +parse entry points always return a *list*. ``EventStream`` flattens that +list across an underlying ``OmniConnection.unsolicited()`` iterator so +consumers can iterate one typed event at a time. + +References (decompiled C# source): + clsOLMsgSystemEvents.cs — message envelope + per-event word read + enuOmniLink2MessageType.cs:60 — SystemEvents = 55 (v2 opcode) + enuEventType.cs — category enum (the values used here) + enuAlarmType.cs — alarm subtype byte + enuSecurityMode.cs — security mode byte (used by arming) + clsText.cs:1585-1690 (GetEventCategory) + — bit-mask classifier we mirror below + clsText.cs:1693-1911 (GetEventText) + — per-category sub-field extraction +""" + +from __future__ import annotations + +import asyncio +from dataclasses import dataclass, field +from enum import IntEnum +from typing import ClassVar + +from .message import Message +from .models import SecurityMode +from .opcodes import OmniLink2MessageType + +# -------------------------------------------------------------------------- +# Numeric tags / enums +# -------------------------------------------------------------------------- + + +class EventType(IntEnum): + """Symbolic identifiers for the event subclasses we expose. + + These are *not* the raw 16-bit on-the-wire codes — those are densely + packed bit fields. We assign each typed-event subclass a stable small + integer so that ``SystemEvent.event_type`` is a single discriminator + and ``EVENT_REGISTRY`` can dispatch on it. + + Reference: enuEventType.cs (the C# equivalent that drives clsText + .GetEventCategory). The order/values below intentionally mirror the + classification order in clsText.cs:1585-1690 so a future maintainer + reading both files side-by-side sees the same shape. + """ + + USER_MACRO_BUTTON = 0 # clsText.cs:1587-1590 + PRO_LINK_MESSAGE = 1 # clsText.cs:1591-1594 + CENTRALITE_SWITCH = 2 # clsText.cs:1595-1598 + ALARM_ACTIVATED = 3 # clsText.cs:1599-1602, 1738-1750 + ALARM_CLEARED = 4 # synthesized: ALARM word with alarm_type=0 + ZONE_STATE_CHANGED = 5 # clsText.cs:1603-1606, 1751-1756 + UNIT_STATE_CHANGED = 6 # clsText.cs:1607-1610, 1757-1765 + X10_CODE = 7 # clsText.cs:1615-1618 + ALL_ON_OFF = 8 # clsText.cs:1643-1646 + PHONE_LINE_DEAD = 9 # clsText.cs:1649,1853-1857 + PHONE_LINE_RING = 10 # clsText.cs:1651,1858-1859 + PHONE_LINE_OFF_HOOK = 11 # clsText.cs:1653,1860-1861 + PHONE_LINE_ON_HOOK = 12 # clsText.cs:1655,1862-1863 + AC_LOST = 13 # clsText.cs:1657,1866-1870 + AC_RESTORED = 14 # clsText.cs:1659,1871-1872 + BATTERY_LOW = 15 # clsText.cs:1661,1875-1879 + BATTERY_RESTORED = 16 # clsText.cs:1663,1880-1881 + DCM_TROUBLE = 17 # clsText.cs:1665,1884-1888 + DCM_OK = 18 # clsText.cs:1667,1889-1890 + ENERGY_COST_LOW = 19 # clsText.cs:1669,1893-1897 + ENERGY_COST_MID = 20 # clsText.cs:1671,1898-1899 + ENERGY_COST_HIGH = 21 # clsText.cs:1673,1900-1901 + ENERGY_COST_CRITICAL = 22 # clsText.cs:1675,1902-1903 + CAMERA = 23 # clsText.cs:1677-1683,1906-1907 + ACCESS_READER = 24 # clsText.cs:1684-1688,1908-1909 + UPB_LINK = 25 # clsText.cs:1635-1638,1795-1810 + ARMING_CHANGED = 26 # clsText.cs:1689,2140-2217 (catch-all) + UNKNOWN = 0xFF # parser couldn't classify + + +class AlarmKind(IntEnum): + """Alarm subtype byte (enuAlarmType.cs).""" + + ANY = 0 # enuAlarmType.cs:5 + BURGLARY = 1 # enuAlarmType.cs:6 + FIRE = 2 # enuAlarmType.cs:7 + GAS = 3 # enuAlarmType.cs:8 + AUX = 4 # enuAlarmType.cs:9 + FREEZE = 5 # enuAlarmType.cs:10 + WATER = 6 # enuAlarmType.cs:11 + DURESS = 7 # enuAlarmType.cs:12 + TEMPERATURE = 8 # enuAlarmType.cs:13 + CONFIRMED_BURGLARY = 9 # enuAlarmType.cs:14 + + +class UpbLinkAction(IntEnum): + """UPB link sub-action, the upper byte of a UPB-LINK event word. + + The C# code maps these via ``enuButtonType`` — UPBLinkOff/On/Set/ + FadeStop — and the enum values are picked so they line up with the + on-the-wire upper byte (clsText.cs:1801-1808 + enuEventType.cs:14-19, + where UPB_LINK_OFF=64512, UPB_LINK_ON=64768, UPB_LINK_SET=65024, + UPB_LINK_FADE_STOP=65280 — i.e. high-byte = 0xFC, 0xFD, 0xFE, 0xFF). + """ + + OFF = 0xFC + ON = 0xFD + SET = 0xFE + FADE_STOP = 0xFF + + +# -------------------------------------------------------------------------- +# Wire-format helpers +# -------------------------------------------------------------------------- + + +def _ensure_system_events(message: Message) -> bytes: + """Validate that ``message`` is a v2 SystemEvents reply, return its + payload bytes (everything after the opcode). + + Reference: clsOLMsgSystemEvents.cs (entire file) — the message body + is just ``[opcode][word1_hi][word1_lo][word2_hi][word2_lo]…``. + """ + if message.opcode != int(OmniLink2MessageType.SystemEvents): + raise ValueError( + "not a SystemEvents message: opcode " + f"{message.opcode} (expected {int(OmniLink2MessageType.SystemEvents)})" + ) + payload = message.payload + if len(payload) % 2 != 0: + # The C# count formula is ``(MessageLength - 1) / 2`` and silently + # truncates a trailing odd byte. We do the same — never raise. + payload = payload[: len(payload) - 1] + return payload + + +def _iter_event_words(payload: bytes) -> list[int]: + """Split a SystemEvents payload into 16-bit BE words. + + Reference: clsOLMsgSystemEvents.cs:15-18 (SystemEvent(index) accessor). + """ + return [(payload[i] << 8) | payload[i + 1] for i in range(0, len(payload), 2)] + + +# -------------------------------------------------------------------------- +# Base + concrete event classes +# -------------------------------------------------------------------------- + + +@dataclass(frozen=True, slots=True) +class SystemEvent: + """Base class for every typed system-event object. + + Subclasses override ``EVENT_TYPE`` (the discriminator) and may add + extra attributes carrying decoded fields. The original 16-bit word + is always preserved as ``raw_word`` so callers can fall back to the + unstructured value for diagnostics. + """ + + event_type: EventType + raw_word: int + EVENT_TYPE: ClassVar[EventType | None] = None + + @classmethod + def parse(cls, message: Message) -> list[SystemEvent]: + """Decode every event word in a v2 SystemEvents message. + + Returns a list because a single message can batch multiple events + (clsOLMsgSystemEvents.SystemEventsCount() — one count value, but + the protocol allows ``count`` to be > 1). + """ + return parse_events(message) + + +@dataclass(frozen=True, slots=True) +class UserMacroButton(SystemEvent): + """A user macro button (1-255) was triggered. + + Wire layout: ``(word & 0xFF00) == 0`` → button index in low byte. + Reference: clsText.cs:1587-1590, 1697-1698. + """ + + EVENT_TYPE: ClassVar[EventType] = EventType.USER_MACRO_BUTTON + button_index: int = 0 + + +@dataclass(frozen=True, slots=True) +class ProLinkMessage(SystemEvent): + """A Pro-Link message (256..383) was received. + + Wire layout: ``(word & 0xFF80) == 0x100`` → message index in low 7 bits. + Reference: clsText.cs:1591-1594, 1699-1700. + """ + + EVENT_TYPE: ClassVar[EventType] = EventType.PRO_LINK_MESSAGE + message_index: int = 0 + + +@dataclass(frozen=True, slots=True) +class CentraLiteSwitch(SystemEvent): + """A CentraLite/Aegis scene-keypad button was pressed. + + Wire layout: ``(word & 0xFF80) == 0x180`` → switch sub-index in low 7 bits. + Reference: clsText.cs:1595-1598, 1701-1736. + """ + + EVENT_TYPE: ClassVar[EventType] = EventType.CENTRALITE_SWITCH + switch_index: int = 0 + + +@dataclass(frozen=True, slots=True) +class AlarmActivated(SystemEvent): + """A real alarm condition was triggered. + + Wire layout: ``(word & 0xFF00) == 0x200`` (the ALARM family). + - bits 4-7 of low byte (``(word & 0xF0) >> 4``) → enuAlarmType + - bits 0-3 of low byte (``word & 0xF``) → area index + (0 means "system-wide alarm, no specific area") + Reference: clsText.cs:1599-1602, 1738-1750. + """ + + EVENT_TYPE: ClassVar[EventType] = EventType.ALARM_ACTIVATED + area_index: int = 0 + alarm_type: int = 0 # AlarmKind value + + +@dataclass(frozen=True, slots=True) +class AlarmCleared(SystemEvent): + """An alarm condition was cleared. + + Synthesized — the wire word is in the ALARM family but with the + alarm-type nibble equal to ``AlarmKind.ANY`` (0). The C# code does + not have a separate cleared category; it simply formats the word as + an "Any" alarm. We split it out so home-automation callers can react + to "alarm went away" without rebuilding the bitfield themselves. + + Reference: clsText.cs:1738-1750 (the ``a`` variable can be 0). + """ + + EVENT_TYPE: ClassVar[EventType] = EventType.ALARM_CLEARED + area_index: int = 0 + + +@dataclass(frozen=True, slots=True) +class ZoneStateChanged(SystemEvent): + """A security zone changed state (open/close, secure/not-ready). + + Wire layout: ``(word & 0xFC00) == 0x400`` (ZONE_STATE_CHANGE family). + - low byte → zone index 1..255 + - bit 9 (``(word >> 8) & 0x02``) → 1 = not-ready/open, 0 = secure + Reference: clsText.cs:1603-1606, 1751-1756. + """ + + EVENT_TYPE: ClassVar[EventType] = EventType.ZONE_STATE_CHANGED + zone_index: int = 0 + new_state: int = 0 # 0=secure, 1=not-ready/open + + @property + def is_open(self) -> bool: + return self.new_state != 0 + + @property + def is_secure(self) -> bool: + return self.new_state == 0 + + +@dataclass(frozen=True, slots=True) +class UnitStateChanged(SystemEvent): + """A controllable unit (light/output) changed state. + + Wire layout: ``(word & 0xFC00) == 0x800`` (UNIT_STATE_CHANGE family). + - unit index = ``((word >> 8) & 1) * 256 + (word & 0xFF)`` + (so bit 8 lifts the index above 255) + - bit 9 (``(word >> 8) & 0x02``) → 1 = on, 0 = off + Reference: clsText.cs:1607-1610, 1757-1765. + """ + + EVENT_TYPE: ClassVar[EventType] = EventType.UNIT_STATE_CHANGED + unit_index: int = 0 + new_state: int = 0 # 0=off, 1=on + + @property + def is_on(self) -> bool: + return self.new_state != 0 + + +@dataclass(frozen=True, slots=True) +class X10CodeReceived(SystemEvent): + """An X-10 house/unit code was seen on the powerline. + + Wire layout: ``(word & 0xFC00) == 0xC00``. + - house letter A..P = chr(65 + ((word & 0xFF) >> 4)) + - unit number 1..16 = (word & 0xF) + 1 + - on/off (bit 9) = ``((word >> 8) & 2) == 2`` → On + - "all units" (bit 8) = ``(word & 0x100) != 0`` + Reference: clsText.cs:1615-1618, 1785-1793. + """ + + EVENT_TYPE: ClassVar[EventType] = EventType.X10_CODE + house_code: str = "A" + unit_number: int = 1 + is_on: bool = False + all_units: bool = False + + +@dataclass(frozen=True, slots=True) +class AllOnOff(SystemEvent): + """An "All On" or "All Off" command was issued. + + Wire layout: ``(word & 0xFFE0) == 0x3E0`` (=992 family). + - bit 4 (``(word & 0x10) >> 4``) → 0 = all off, 1 = all on + - low 4 bits (``word & 0xF``) → area (0 = system-wide) + Reference: clsText.cs:1643-1646, 1835-1851. + """ + + EVENT_TYPE: ClassVar[EventType] = EventType.ALL_ON_OFF + area_index: int = 0 + on: bool = False + + +@dataclass(frozen=True, slots=True) +class PhoneLineDead(SystemEvent): + """Phone line went dead (word == 768). + + Reference: clsText.cs:1649, 1853-1857.""" + + EVENT_TYPE: ClassVar[EventType] = EventType.PHONE_LINE_DEAD + + +@dataclass(frozen=True, slots=True) +class PhoneLineRinging(SystemEvent): + """Phone is ringing (word == 769). + + Reference: clsText.cs:1651, 1858-1859.""" + + EVENT_TYPE: ClassVar[EventType] = EventType.PHONE_LINE_RING + + +@dataclass(frozen=True, slots=True) +class PhoneLineOffHook(SystemEvent): + """Panel went off-hook to dial out (word == 770). + + Reference: clsText.cs:1653, 1860-1861.""" + + EVENT_TYPE: ClassVar[EventType] = EventType.PHONE_LINE_OFF_HOOK + + +@dataclass(frozen=True, slots=True) +class PhoneLineOnHook(SystemEvent): + """Panel hung up (word == 771). + + Reference: clsText.cs:1655, 1862-1863.""" + + EVENT_TYPE: ClassVar[EventType] = EventType.PHONE_LINE_ON_HOOK + + +@dataclass(frozen=True, slots=True) +class AcLost(SystemEvent): + """Mains AC was lost (word == 772). + + Reference: clsText.cs:1657, 1866-1870.""" + + EVENT_TYPE: ClassVar[EventType] = EventType.AC_LOST + + +@dataclass(frozen=True, slots=True) +class AcRestored(SystemEvent): + """Mains AC came back (word == 773). + + Reference: clsText.cs:1659, 1871-1872.""" + + EVENT_TYPE: ClassVar[EventType] = EventType.AC_RESTORED + + +@dataclass(frozen=True, slots=True) +class BatteryLow(SystemEvent): + """Backup battery is low (word == 774). + + Reference: clsText.cs:1661, 1875-1879.""" + + EVENT_TYPE: ClassVar[EventType] = EventType.BATTERY_LOW + + +@dataclass(frozen=True, slots=True) +class BatteryRestored(SystemEvent): + """Backup battery is OK again (word == 775). + + Reference: clsText.cs:1663, 1880-1881.""" + + EVENT_TYPE: ClassVar[EventType] = EventType.BATTERY_RESTORED + + +@dataclass(frozen=True, slots=True) +class DcmTrouble(SystemEvent): + """Digital communicator failure (word == 776). + + Reference: clsText.cs:1665, 1884-1888.""" + + EVENT_TYPE: ClassVar[EventType] = EventType.DCM_TROUBLE + + +@dataclass(frozen=True, slots=True) +class DcmOk(SystemEvent): + """Digital communicator OK (word == 777). + + Reference: clsText.cs:1667, 1889-1890.""" + + EVENT_TYPE: ClassVar[EventType] = EventType.DCM_OK + + +@dataclass(frozen=True, slots=True) +class EnergyCostChanged(SystemEvent): + """Real-time energy-cost band changed (words 778..781). + + The discriminator on the dataclass tells you which band; ``raw_word`` + keeps the original number in case a future firmware adds more. + Reference: clsText.cs:1669-1676, 1893-1903. + """ + + EVENT_TYPE: ClassVar[EventType] = EventType.ENERGY_COST_LOW # placeholder, overridden by parse + cost_level: int = 0 # 0=low, 1=mid, 2=high, 3=critical + + +@dataclass(frozen=True, slots=True) +class CameraTrigger(SystemEvent): + """A camera input fired (words 782..787). + + Wire layout: ``camera_index = word - 781`` → 1..6. + Reference: clsText.cs:1677-1683, 1906-1907. + """ + + EVENT_TYPE: ClassVar[EventType] = EventType.CAMERA + camera_index: int = 1 + + +@dataclass(frozen=True, slots=True) +class AccessReaderEvent(SystemEvent): + """An access-control reader emitted an event (words 976..991). + + Wire layout: ``reader_index = (word & 0xF) + 1``. + Reference: clsText.cs:1684-1688, 1908-1909. + """ + + EVENT_TYPE: ClassVar[EventType] = EventType.ACCESS_READER + reader_index: int = 1 + + +@dataclass(frozen=True, slots=True) +class UpbLinkEvent(SystemEvent): + """A UPB link command was sent (words 0xFC00..0xFFFF). + + Wire layout: ``(word & 0xFC00) == 0xFC00``. + - upper byte → enuButtonType: UPBLinkOff(0xFC), UPBLinkOn(0xFD), + UPBLinkSet(0xFE), UPBLinkFadeStop(0xFF) + - lower byte → link index 1..255 + Reference: clsText.cs:1635-1638, 1795-1810. + """ + + EVENT_TYPE: ClassVar[EventType] = EventType.UPB_LINK + link_index: int = 0 + action: int = 0 # UpbLinkAction value (raw upper byte) + + +@dataclass(frozen=True, slots=True) +class ArmingChanged(SystemEvent): + """An area's security mode changed (catch-all SECURITY_MODE_CHANGE). + + Wire layout (clsText.cs:2155-2217 — this is the default branch of + ``GetButtonText``, which is what GetEventCategory routes to when the + event word doesn't match any other family): + + - bits 12-14 (``(word >> 12) & 7``) → enuSecurityMode value + - bits 8-11 (``(word >> 8) & 0xF``) → area index (0 = system) + - low byte (``word & 0xFF``) → user/code index that + triggered the change (0 = unknown / panel-initiated) + - bit 15 (``(word >> 8) & 0x80``) → "Set" vs. "Arm" verb, + surfaced as ``is_set_command`` + - bit 11 (``(word >> 8) & 0x40``)... reserved-ish; left as raw + Reference: clsText.cs:1689 (catch-all) + 2140-2217 (decoder). + """ + + EVENT_TYPE: ClassVar[EventType] = EventType.ARMING_CHANGED + area_index: int = 0 + new_mode: int = 0 # SecurityMode value + user_index: int = 0 + is_set_command: bool = False # True for SET (panic/Lumina), False for ARM + + @property + def mode_name(self) -> str: + try: + return SecurityMode(self.new_mode).name + except ValueError: + return f"Unknown({self.new_mode})" + + +@dataclass(frozen=True, slots=True) +class UnknownEvent(SystemEvent): + """Catch-all so an unrecognised event word never crashes the iterator. + + The event type byte was parseable but didn't match any family in + clsText.GetEventCategory's classification — likely a future-firmware + code we haven't mapped yet. + """ + + EVENT_TYPE: ClassVar[EventType] = EventType.UNKNOWN + + +# -------------------------------------------------------------------------- +# Per-word classifier (mirrors clsText.GetEventCategory) +# -------------------------------------------------------------------------- + + +def _classify(word: int) -> SystemEvent: + """Decode a single 16-bit event word into the appropriate subclass. + + Mirrors ``clsText.GetEventCategory`` (clsText.cs:1585-1690) and the + per-family field-extraction in ``GetEventText`` (clsText.cs:1693-1911). + The classification order matters — exact-match cases (PHONE_, AC_, + BATTERY_, …) are inspected before the wide SECURITY_MODE_CHANGE + catch-all, exactly as the C# does. + """ + # USER_MACRO_BUTTON: high byte == 0 + if (word & 0xFF00) == 0x0000: + return UserMacroButton( + event_type=EventType.USER_MACRO_BUTTON, + raw_word=word, + button_index=word & 0xFF, + ) + + # PRO_LINK_MESSAGE: high 9 bits == 0x100 + if (word & 0xFF80) == 0x0100: + return ProLinkMessage( + event_type=EventType.PRO_LINK_MESSAGE, + raw_word=word, + message_index=word & 0x7F, + ) + + # CENTRALITE_SWITCH: high 9 bits == 0x180 + if (word & 0xFF80) == 0x0180: + return CentraLiteSwitch( + event_type=EventType.CENTRALITE_SWITCH, + raw_word=word, + switch_index=word & 0x7F, + ) + + # ALARM (activated/cleared): top byte == 0x02 + if (word & 0xFF00) == 0x0200: + alarm_type = (word & 0xF0) >> 4 + area = word & 0x0F + if alarm_type == int(AlarmKind.ANY): + # Per clsText.cs:1738-1750, the "ANY" subtype is what the panel + # emits when an alarm is being cleared — it formats it as + # "Any alarm cleared" string. Surface as a distinct subclass. + return AlarmCleared( + event_type=EventType.ALARM_CLEARED, + raw_word=word, + area_index=area, + ) + return AlarmActivated( + event_type=EventType.ALARM_ACTIVATED, + raw_word=word, + area_index=area, + alarm_type=alarm_type, + ) + + # ZONE_STATE_CHANGE: top 6 bits == 0x4 + if (word & 0xFC00) == 0x0400: + return ZoneStateChanged( + event_type=EventType.ZONE_STATE_CHANGED, + raw_word=word, + zone_index=word & 0xFF, + new_state=1 if ((word >> 8) & 0x02) == 0x02 else 0, + ) + + # UNIT_STATE_CHANGE: top 6 bits == 0x8 + if (word & 0xFC00) == 0x0800: + unit_index = ((word >> 8) & 0x01) * 256 + (word & 0xFF) + return UnitStateChanged( + event_type=EventType.UNIT_STATE_CHANGED, + raw_word=word, + unit_index=unit_index, + new_state=1 if ((word >> 8) & 0x02) == 0x02 else 0, + ) + + # X-10 code: top 6 bits == 0xC + if (word & 0xFC00) == 0x0C00: + return X10CodeReceived( + event_type=EventType.X10_CODE, + raw_word=word, + house_code=chr(65 + ((word & 0xFF) >> 4)), + unit_number=(word & 0x0F) + 1, + is_on=((word >> 8) & 0x02) == 0x02, + all_units=(word & 0x0100) != 0, + ) + + # ALL_ON_OFF: top 11 bits == 992 (0x3E0) — covers 992..1023, but + # we leave 1024+ for ZONE which has already been handled above. + if (word & 0xFFE0) == 0x03E0: + return AllOnOff( + event_type=EventType.ALL_ON_OFF, + raw_word=word, + area_index=word & 0x0F, + on=(word & 0x10) != 0, + ) + + # Exact-match singletons (PHONE_, AC_, BATTERY_, DCM_, ENERGY, CAMERA, + # ACCESS_READER) come before the SECURITY catch-all. + if word == 768: + return PhoneLineDead(event_type=EventType.PHONE_LINE_DEAD, raw_word=word) + if word == 769: + return PhoneLineRinging(event_type=EventType.PHONE_LINE_RING, raw_word=word) + if word == 770: + return PhoneLineOffHook(event_type=EventType.PHONE_LINE_OFF_HOOK, raw_word=word) + if word == 771: + return PhoneLineOnHook(event_type=EventType.PHONE_LINE_ON_HOOK, raw_word=word) + if word == 772: + return AcLost(event_type=EventType.AC_LOST, raw_word=word) + if word == 773: + return AcRestored(event_type=EventType.AC_RESTORED, raw_word=word) + if word == 774: + return BatteryLow(event_type=EventType.BATTERY_LOW, raw_word=word) + if word == 775: + return BatteryRestored(event_type=EventType.BATTERY_RESTORED, raw_word=word) + if word == 776: + return DcmTrouble(event_type=EventType.DCM_TROUBLE, raw_word=word) + if word == 777: + return DcmOk(event_type=EventType.DCM_OK, raw_word=word) + if 778 <= word <= 781: + level = word - 778 + return EnergyCostChanged( + event_type=EventType(EventType.ENERGY_COST_LOW + level), + raw_word=word, + cost_level=level, + ) + if 782 <= word <= 787: + return CameraTrigger( + event_type=EventType.CAMERA, + raw_word=word, + camera_index=word - 781, + ) + if 976 <= word <= 991: + return AccessReaderEvent( + event_type=EventType.ACCESS_READER, + raw_word=word, + reader_index=(word & 0x0F) + 1, + ) + + # UPB_LINK: top 6 bits == 0xFC (covers 0xFC00..0xFFFF). + # The C# code peels off the 0xFD00 (UPB_LINK_ON) sub-family first to + # check whether the unit is actually an HLC or Z-Wave room controller + # (clsText.cs:1619-1633). We don't have access to the panel's unit + # type cache here, so we always classify these as UpbLinkEvent — the + # caller can refine using the unit_index if they care. + if (word & 0xFC00) == 0xFC00: + upper = (word >> 8) & 0xFF + return UpbLinkEvent( + event_type=EventType.UPB_LINK, + raw_word=word, + link_index=word & 0xFF, + action=upper, + ) + + # SECURITY_MODE_CHANGE catch-all (clsText.cs:1689). This is the + # default branch in GetEventCategory: anything that didn't match any + # of the families above lands here. The 16-bit layout is: + # bits 12-14 → SecurityMode (0..7) + # bits 8-11 → area index (0 = system / no specific area) + # bit 15 → "Set" vs. "Arm" verb (Lumina vs. Omni semantics) + # low byte → user/code index that triggered the change + if (word >> 8) & 0xF0: + # Plausible arming change: the high nibble of the high byte is + # non-zero (carries either the Set bit or the area+mode bits). + return ArmingChanged( + event_type=EventType.ARMING_CHANGED, + raw_word=word, + area_index=(word >> 8) & 0x0F, + new_mode=(word >> 12) & 0x07, + user_index=word & 0xFF, + is_set_command=((word >> 8) & 0x80) == 0x80, + ) + + return UnknownEvent(event_type=EventType.UNKNOWN, raw_word=word) + + +# -------------------------------------------------------------------------- +# Public parse entry points +# -------------------------------------------------------------------------- + + +def parse_events(message: Message) -> list[SystemEvent]: + """Decode a v2 ``SystemEvents`` (opcode 55) message into typed events. + + The panel batches multiple state changes into a single message, so + the return type is always a list — even for messages that carry just + one event. Empty SystemEvents messages return an empty list rather + than raising. + + Reference: clsOLMsgSystemEvents.cs:10-18 (SystemEventsCount + per- + word accessor). + """ + payload = _ensure_system_events(message) + return [_classify(w) for w in _iter_event_words(payload)] + + +# -------------------------------------------------------------------------- +# Registry — discriminator → subclass, useful for callers doing isinstance +# routing or generating documentation. +# -------------------------------------------------------------------------- + +EVENT_REGISTRY: dict[int, type[SystemEvent]] = { + int(EventType.USER_MACRO_BUTTON): UserMacroButton, + int(EventType.PRO_LINK_MESSAGE): ProLinkMessage, + int(EventType.CENTRALITE_SWITCH): CentraLiteSwitch, + int(EventType.ALARM_ACTIVATED): AlarmActivated, + int(EventType.ALARM_CLEARED): AlarmCleared, + int(EventType.ZONE_STATE_CHANGED): ZoneStateChanged, + int(EventType.UNIT_STATE_CHANGED): UnitStateChanged, + int(EventType.X10_CODE): X10CodeReceived, + int(EventType.ALL_ON_OFF): AllOnOff, + int(EventType.PHONE_LINE_DEAD): PhoneLineDead, + int(EventType.PHONE_LINE_RING): PhoneLineRinging, + int(EventType.PHONE_LINE_OFF_HOOK): PhoneLineOffHook, + int(EventType.PHONE_LINE_ON_HOOK): PhoneLineOnHook, + int(EventType.AC_LOST): AcLost, + int(EventType.AC_RESTORED): AcRestored, + int(EventType.BATTERY_LOW): BatteryLow, + int(EventType.BATTERY_RESTORED): BatteryRestored, + int(EventType.DCM_TROUBLE): DcmTrouble, + int(EventType.DCM_OK): DcmOk, + int(EventType.ENERGY_COST_LOW): EnergyCostChanged, + int(EventType.ENERGY_COST_MID): EnergyCostChanged, + int(EventType.ENERGY_COST_HIGH): EnergyCostChanged, + int(EventType.ENERGY_COST_CRITICAL): EnergyCostChanged, + int(EventType.CAMERA): CameraTrigger, + int(EventType.ACCESS_READER): AccessReaderEvent, + int(EventType.UPB_LINK): UpbLinkEvent, + int(EventType.ARMING_CHANGED): ArmingChanged, + int(EventType.UNKNOWN): UnknownEvent, +} + + +# -------------------------------------------------------------------------- +# Helper: queue-backed iterator for tests + library consumers +# -------------------------------------------------------------------------- + + +def _has_unsolicited(obj: object) -> bool: + """True if ``obj`` quacks like an OmniConnection (has ``unsolicited()``). + + We avoid importing OmniConnection at runtime to keep the dependency + purely a type hint, so EventStream stays usable with any object that + exposes the same async-iterator contract (real connection, mock, or + in-memory queue wrapper used in tests). + """ + return callable(getattr(obj, "unsolicited", None)) + + +@dataclass +class EventStream: + """Async iterator over typed ``SystemEvent`` objects. + + Wraps any object with an ``unsolicited() -> AsyncIterator[Message]`` + method (typically an :class:`OmniConnection`). Filters out non- + SystemEvents messages, parses each SystemEvents message into a list + of typed events, and yields them one at a time. A single inbound + message that batches three events therefore produces three iterator + steps — callers don't have to know about batching. + + Usage:: + + async for event in EventStream(conn): + match event: + case ZoneStateChanged() if event.is_open: + print(f"zone {event.zone_index} opened") + case ArmingChanged(): + print(f"area {event.area_index} -> {event.mode_name}") + """ + + source: object # OmniConnection or duck-typed equivalent + _buffer: list[SystemEvent] = field(default_factory=list) + + def __post_init__(self) -> None: + if not _has_unsolicited(self.source): + raise TypeError( + "EventStream source must expose an unsolicited() method " + f"(got {type(self.source).__name__})" + ) + + def __aiter__(self) -> EventStream: + return self + + async def __anext__(self) -> SystemEvent: + # Drain buffered events from the previous batched message first. + while not self._buffer: + try: + # ``unsolicited()`` returns a fresh async generator each + # call on the real connection, but tests pass us a queue + # wrapper that returns a long-lived iterator. Either way + # we want one message at a time, so we manually advance. + if not hasattr(self, "_iter") or self._iter is None: # type: ignore[has-type] + self._iter = self.source.unsolicited().__aiter__() # type: ignore[attr-defined] + msg = await self._iter.__anext__() + except StopAsyncIteration: + raise + except asyncio.CancelledError: + raise + if msg.opcode != int(OmniLink2MessageType.SystemEvents): + # Non-event message (Status, Ack, …) — silently ignore. + continue + self._buffer = parse_events(msg) + return self._buffer.pop(0) + + +__all__ = [ + "EVENT_REGISTRY", + "AcLost", + "AcRestored", + "AccessReaderEvent", + "AlarmActivated", + "AlarmCleared", + "AlarmKind", + "AllOnOff", + "ArmingChanged", + "BatteryLow", + "BatteryRestored", + "CameraTrigger", + "CentraLiteSwitch", + "DcmOk", + "DcmTrouble", + "EnergyCostChanged", + "EventStream", + "EventType", + "PhoneLineDead", + "PhoneLineOffHook", + "PhoneLineOnHook", + "PhoneLineRinging", + "ProLinkMessage", + "SystemEvent", + "UnitStateChanged", + "UnknownEvent", + "UpbLinkAction", + "UpbLinkEvent", + "UserMacroButton", + "X10CodeReceived", + "ZoneStateChanged", + "parse_events", +] diff --git a/tests/test_commands.py b/tests/test_commands.py new file mode 100644 index 0000000..439a194 --- /dev/null +++ b/tests/test_commands.py @@ -0,0 +1,519 @@ +"""Unit tests for command opcodes and status range queries. + +The tests use a captured-payload approach: we monkey-patch +``OmniClient._conn.request`` so it records the (opcode, payload) pair +that the client would emit and returns whatever canned ``Message`` +the test wants. No network involved — just round-trip the bytes. + +Conventions: + * Each test pins the exact wire bytes the client should produce, so + that any future refactor that rearranges the payload layout is + caught immediately. + * Where a single command can be expressed as a high-level helper + (``turn_unit_on``) we still verify the underlying ``Command`` + enum value and the param1/param2 byte placement, not just the + success path. +""" + +from __future__ import annotations + +import struct +from collections.abc import Callable +from dataclasses import dataclass + +import pytest + +from omni_pca.client import OmniClient +from omni_pca.commands import Command, CommandFailedError, SecurityCommandResponse +from omni_pca.message import Message, encode_v2 +from omni_pca.models import ( + AreaStatus, + FanMode, + HoldMode, + HvacMode, + ObjectType, + SecurityMode, + ThermostatStatus, + UnitStatus, + ZoneStatus, +) +from omni_pca.opcodes import OmniLink2MessageType + +# -------------------------------------------------------------------------- +# Test scaffolding: a stub OmniClient that captures requests instead of +# sending them. We bypass __init__ (which builds an OmniConnection) by +# using object.__new__ + manually setting the _conn attr to our stub. +# -------------------------------------------------------------------------- + + +@dataclass +class _RecordedRequest: + opcode: int + payload: bytes + + +class _StubConn: + """Stand-in for OmniConnection; .request() captures + returns a canned reply.""" + + def __init__( + self, + reply_factory: Callable[[int, bytes], Message] | None = None, + ) -> None: + self.calls: list[_RecordedRequest] = [] + self._reply_factory = reply_factory or self._default_ack + + @staticmethod + def _default_ack(_opcode: int, _payload: bytes) -> Message: + return encode_v2(OmniLink2MessageType.Ack) + + async def request( + self, + opcode: OmniLink2MessageType | int, + payload: bytes = b"", + timeout: float | None = None, + ) -> Message: + del timeout # mirror the OmniConnection.request signature; unused here + op_int = int(opcode) + self.calls.append(_RecordedRequest(opcode=op_int, payload=bytes(payload))) + return self._reply_factory(op_int, bytes(payload)) + + +def _make_client(stub: _StubConn) -> OmniClient: + """Build an OmniClient with a stubbed connection (no socket, no handshake).""" + client = object.__new__(OmniClient) + client._conn = stub # type: ignore[attr-defined] + client._subscriber_task = None # type: ignore[attr-defined] + return client + + +# -------------------------------------------------------------------------- +# Command enum value pins. These guard against accidental renumbering. +# -------------------------------------------------------------------------- + + +def test_command_enum_pins_unit_values() -> None: + assert Command.UNIT_OFF == 0 + assert Command.UNIT_ON == 1 + assert Command.UNIT_LEVEL == 9 + assert Command.BYPASS_ZONE == 4 + assert Command.RESTORE_ZONE == 5 + + +def test_command_enum_pins_thermostat_values() -> None: + # enuUnitCommand.SetLowSetPt (line 71) → 66 (heat) + assert Command.SET_THERMOSTAT_HEAT_SETPOINT == 66 + # enuUnitCommand.SetHighSetPt (line 72) → 67 (cool) + assert Command.SET_THERMOSTAT_COOL_SETPOINT == 67 + # enuUnitCommand.Mode/Fan/Hold (lines 73/74/75) + assert Command.SET_THERMOSTAT_SYSTEM_MODE == 68 + assert Command.SET_THERMOSTAT_FAN_MODE == 69 + assert Command.SET_THERMOSTAT_HOLD_MODE == 70 + + +def test_command_enum_pins_message_and_program_values() -> None: + assert Command.SHOW_MESSAGE_WITH_BEEP == 80 + assert Command.LOG_MESSAGE == 81 + assert Command.CLEAR_MESSAGE == 82 + assert Command.SHOW_MESSAGE_NO_BEEP == 86 + assert Command.EXECUTE_BUTTON == 7 + assert Command.EXECUTE_PROGRAM == 104 + + +def test_security_command_response_enum_pins() -> None: + assert SecurityCommandResponse.SUCCESS == 0 + assert SecurityCommandResponse.INVALID_CODE == 1 + assert SecurityCommandResponse.INVALID_AREA == 3 + assert SecurityCommandResponse.CODE_LOCKED_OUT == 6 + + +# -------------------------------------------------------------------------- +# execute_command() — generic Command opcode (20) +# -------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_execute_command_payload_layout() -> None: + stub = _StubConn() + client = _make_client(stub) + # Pretend we're flipping unit #257 ON (parameter2 needs both bytes). + await client.execute_command(Command.UNIT_ON, parameter1=0, parameter2=257) + + assert len(stub.calls) == 1 + call = stub.calls[0] + assert call.opcode == int(OmniLink2MessageType.Command) + # Command (1) + p1 (1 byte) + p2 (BE u16) = 4 bytes + assert call.payload == bytes([1, 0, 0x01, 0x01]) + + +@pytest.mark.asyncio +async def test_execute_command_packs_param1_byte_and_param2_be_u16() -> None: + stub = _StubConn() + client = _make_client(stub) + await client.execute_command( + Command.UNIT_LEVEL, parameter1=75, parameter2=0xABCD + ) + payload = stub.calls[0].payload + # [cmd=9, p1=75, p2_hi=0xAB, p2_lo=0xCD] + assert payload == bytes([9, 75, 0xAB, 0xCD]) + + +@pytest.mark.asyncio +async def test_execute_command_validates_param_ranges() -> None: + stub = _StubConn() + client = _make_client(stub) + with pytest.raises(ValueError, match="parameter1"): + await client.execute_command(Command.UNIT_ON, parameter1=256) + with pytest.raises(ValueError, match="parameter2"): + await client.execute_command(Command.UNIT_ON, parameter2=0x10000) + # No request emitted on validation failure. + assert stub.calls == [] + + +@pytest.mark.asyncio +async def test_execute_command_raises_on_nak() -> None: + def nak_reply(_op: int, _pl: bytes) -> Message: + return encode_v2(OmniLink2MessageType.Nak) + + stub = _StubConn(reply_factory=nak_reply) + client = _make_client(stub) + with pytest.raises(CommandFailedError, match="NAK"): + await client.execute_command(Command.UNIT_ON, parameter2=1) + + +# -------------------------------------------------------------------------- +# Convenience wrappers over execute_command +# -------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_turn_unit_on_off_emits_correct_command_byte() -> None: + stub = _StubConn() + client = _make_client(stub) + await client.turn_unit_on(5) + await client.turn_unit_off(5) + assert stub.calls[0].payload == bytes([1, 0, 0, 5]) # UNIT_ON + assert stub.calls[1].payload == bytes([0, 0, 0, 5]) # UNIT_OFF + + +@pytest.mark.asyncio +async def test_set_unit_level_validates_range_and_emits_level() -> None: + stub = _StubConn() + client = _make_client(stub) + await client.set_unit_level(3, 50) + assert stub.calls[0].payload == bytes([9, 50, 0, 3]) + with pytest.raises(ValueError, match=r"0\.\.100"): + await client.set_unit_level(3, 101) + + +@pytest.mark.asyncio +async def test_bypass_and_restore_zone_emit_correct_payload() -> None: + stub = _StubConn() + client = _make_client(stub) + await client.bypass_zone(12, code=2) + await client.restore_zone(12, code=2) + assert stub.calls[0].payload == bytes([4, 2, 0, 12]) # BYPASS_ZONE + assert stub.calls[1].payload == bytes([5, 2, 0, 12]) # RESTORE_ZONE + + +@pytest.mark.asyncio +async def test_set_thermostat_modes_emit_correct_payloads() -> None: + stub = _StubConn() + client = _make_client(stub) + await client.set_thermostat_system_mode(2, HvacMode.COOL) + await client.set_thermostat_fan_mode(2, FanMode.ON) + await client.set_thermostat_hold_mode(2, HoldMode.HOLD) + assert stub.calls[0].payload == bytes([68, int(HvacMode.COOL), 0, 2]) + assert stub.calls[1].payload == bytes([69, int(FanMode.ON), 0, 2]) + assert stub.calls[2].payload == bytes([70, int(HoldMode.HOLD), 0, 2]) + + +@pytest.mark.asyncio +async def test_set_thermostat_setpoints_use_raw_byte() -> None: + stub = _StubConn() + client = _make_client(stub) + await client.set_thermostat_heat_setpoint_raw(1, 140) # ~70 °F + await client.set_thermostat_cool_setpoint_raw(1, 160) # ~80 °F + assert stub.calls[0].payload == bytes([66, 140, 0, 1]) + assert stub.calls[1].payload == bytes([67, 160, 0, 1]) + + +@pytest.mark.asyncio +async def test_button_program_message_helpers() -> None: + stub = _StubConn() + client = _make_client(stub) + await client.execute_button(4) + await client.execute_program(7) + await client.show_message(2, beep=True) + await client.show_message(2, beep=False) + await client.clear_message(2) + assert stub.calls[0].payload == bytes([7, 0, 0, 4]) # EXECUTE_BUTTON + assert stub.calls[1].payload == bytes([104, 0, 0, 7]) # EXECUTE_PROGRAM + assert stub.calls[2].payload == bytes([80, 0, 0, 2]) # SHOW_MSG_BEEP + assert stub.calls[3].payload == bytes([86, 0, 0, 2]) # SHOW_MSG_NOBEEP + assert stub.calls[4].payload == bytes([82, 0, 0, 2]) # CLEAR_MESSAGE + + +# -------------------------------------------------------------------------- +# execute_security_command (opcode 74) +# -------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_execute_security_command_payload_encoding_away_1234() -> None: + """The C# code packs the 4-digit code as four separate digit bytes.""" + stub = _StubConn( + reply_factory=lambda _op, _pl: encode_v2( + OmniLink2MessageType.ExecuteSecurityCommandResponse, + bytes([SecurityCommandResponse.SUCCESS]), + ) + ) + client = _make_client(stub) + result = await client.execute_security_command( + area=1, mode=SecurityMode.AWAY, code=1234 + ) + assert result is None + payload = stub.calls[0].payload + # area, mode, d1, d2, d3, d4 + assert payload == bytes([1, int(SecurityMode.AWAY), 1, 2, 3, 4]) + assert stub.calls[0].opcode == int( + OmniLink2MessageType.ExecuteSecurityCommand + ) + + +@pytest.mark.asyncio +async def test_execute_security_command_pads_short_codes_with_zeros() -> None: + """Code 7 → digits 0,0,0,7 (matches the C# arithmetic).""" + stub = _StubConn( + reply_factory=lambda _op, _pl: encode_v2(OmniLink2MessageType.Ack) + ) + client = _make_client(stub) + await client.execute_security_command( + area=2, mode=SecurityMode.OFF, code=7 + ) + assert stub.calls[0].payload == bytes([2, 0, 0, 0, 0, 7]) + + +@pytest.mark.asyncio +async def test_execute_security_command_failure_raises_with_code() -> None: + def reply(_op: int, _pl: bytes) -> Message: + return encode_v2( + OmniLink2MessageType.ExecuteSecurityCommandResponse, + bytes([SecurityCommandResponse.INVALID_CODE]), + ) + + stub = _StubConn(reply_factory=reply) + client = _make_client(stub) + with pytest.raises(CommandFailedError) as ei: + await client.execute_security_command( + area=1, mode=SecurityMode.AWAY, code=9999 + ) + assert ei.value.failure_code == int(SecurityCommandResponse.INVALID_CODE) + assert "INVALID_CODE" in str(ei.value) + + +@pytest.mark.asyncio +async def test_execute_security_command_validates_inputs() -> None: + stub = _StubConn() + client = _make_client(stub) + with pytest.raises(ValueError, match="area"): + await client.execute_security_command( + area=0, mode=SecurityMode.AWAY, code=1234 + ) + with pytest.raises(ValueError, match="code"): + await client.execute_security_command( + area=1, mode=SecurityMode.AWAY, code=10000 + ) + assert stub.calls == [] + + +# -------------------------------------------------------------------------- +# acknowledge_alerts (opcode 60) +# -------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_acknowledge_alerts_sends_no_payload_and_expects_ack() -> None: + stub = _StubConn() + client = _make_client(stub) + await client.acknowledge_alerts() + assert stub.calls[0].opcode == int(OmniLink2MessageType.AcknowledgeAlerts) + assert stub.calls[0].payload == b"" + + +# -------------------------------------------------------------------------- +# get_object_status (opcode 34/35) — request payload + reply parsing +# -------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_get_object_status_builds_request_payload() -> None: + """RequestStatus is [object_type, start_hi, start_lo, end_hi, end_lo].""" + captured: list[bytes] = [] + + def reply(_op: int, payload: bytes) -> Message: + captured.append(payload) + # Reply with one zone record: number=3, status=0x10, loop=200. + body = bytes([int(ObjectType.ZONE), 0, 3, 0x10, 200]) + return encode_v2(OmniLink2MessageType.Status, body) + + stub = _StubConn(reply_factory=reply) + client = _make_client(stub) + zones = await client.get_object_status(ObjectType.ZONE, 3) + assert captured[0] == struct.pack(">BHH", int(ObjectType.ZONE), 3, 3) + assert len(zones) == 1 + z = zones[0] + assert isinstance(z, ZoneStatus) + assert z.index == 3 + assert z.raw_status == 0x10 + assert z.loop == 200 + + +@pytest.mark.asyncio +async def test_get_object_status_parses_multiple_unit_records() -> None: + """Unit records are 5 bytes each (clsOL2MsgStatus.cs:17).""" + + def reply(_op: int, _pl: bytes) -> Message: + # object_type byte + two 5-byte unit records. + records = ( + bytes([0, 1, 1, 0, 0]) # unit 1, state=1 (On) + + bytes([0, 2, 100, 0, 0]) # unit 2, state=100 (level 0%) + ) + return encode_v2( + OmniLink2MessageType.Status, + bytes([int(ObjectType.UNIT)]) + records, + ) + + stub = _StubConn(reply_factory=reply) + client = _make_client(stub) + units = await client.get_object_status(ObjectType.UNIT, 1, 2) + assert len(units) == 2 + assert all(isinstance(u, UnitStatus) for u in units) + u1, u2 = units + assert u1.index == 1 + assert u1.state == 1 + assert u2.index == 2 + assert u2.state == 100 + + +@pytest.mark.asyncio +async def test_get_object_status_returns_empty_on_eod() -> None: + stub = _StubConn( + reply_factory=lambda _op, _pl: encode_v2(OmniLink2MessageType.EOD) + ) + client = _make_client(stub) + out = await client.get_object_status(ObjectType.AREA, 99) + assert out == [] + + +@pytest.mark.asyncio +async def test_get_object_status_raises_on_nak() -> None: + stub = _StubConn( + reply_factory=lambda _op, _pl: encode_v2(OmniLink2MessageType.Nak) + ) + client = _make_client(stub) + with pytest.raises(CommandFailedError, match="NAK"): + await client.get_object_status(ObjectType.ZONE, 1) + + +# -------------------------------------------------------------------------- +# get_extended_status (opcode 58/59) — header has object_length byte +# -------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_get_extended_status_request_layout_matches_spec() -> None: + captured: list[bytes] = [] + + def reply(_op: int, payload: bytes) -> Message: + captured.append(payload) + # Reply: object_type, object_length=4, then one zone record (4 bytes). + body = bytes([int(ObjectType.ZONE), 4]) + bytes([0, 5, 0x00, 100]) + return encode_v2(OmniLink2MessageType.ExtendedStatus, body) + + stub = _StubConn(reply_factory=reply) + client = _make_client(stub) + zones = await client.get_extended_status(ObjectType.ZONE, 5, 5) + assert captured[0] == struct.pack(">BHH", int(ObjectType.ZONE), 5, 5) + assert stub.calls[0].opcode == int( + OmniLink2MessageType.RequestExtendedStatus + ) + assert len(zones) == 1 + assert zones[0].index == 5 # type: ignore[union-attr] + + +@pytest.mark.asyncio +async def test_get_extended_status_uses_object_length_byte_for_record_size() -> None: + """ExtendedStatus thermostat record is 14 bytes (clsOL2MsgExtendedStatus.cs:138-235).""" + record = bytes( + [ + 0, 1, # number = 1 + 0, # status + 140, # current temp raw + 120, 160, # heat / cool setpoints + int(HvacMode.AUTO), + int(FanMode.AUTO), + int(HoldMode.OFF), + 150, # humidity raw + 120, 160, # humidify / dehumidify setpoints + 130, # outdoor temp raw + 1, # H or C status + ] + ) + assert len(record) == 14 + + def reply(_op: int, _pl: bytes) -> Message: + body = bytes([int(ObjectType.THERMOSTAT), 14]) + record + return encode_v2(OmniLink2MessageType.ExtendedStatus, body) + + stub = _StubConn(reply_factory=reply) + client = _make_client(stub) + out = await client.get_extended_status(ObjectType.THERMOSTAT, 1) + assert len(out) == 1 + t = out[0] + assert isinstance(t, ThermostatStatus) + assert t.index == 1 + assert t.temperature_raw == 140 + assert t.system_mode == int(HvacMode.AUTO) + + +@pytest.mark.asyncio +async def test_get_extended_status_returns_empty_on_eod() -> None: + stub = _StubConn( + reply_factory=lambda _op, _pl: encode_v2(OmniLink2MessageType.EOD) + ) + client = _make_client(stub) + out = await client.get_extended_status(ObjectType.AREA, 1, 8) + assert out == [] + + +@pytest.mark.asyncio +async def test_get_extended_status_area_record_parses_to_areastatus() -> None: + """Area ExtendedStatus record is 6 bytes + (clsOL2MsgExtendedStatus.cs:75-118): number(2) + mode + alarms + entry + + exit (matches our AreaStatus.parse). + """ + + def reply(_op: int, _pl: bytes) -> Message: + # area 1, mode AWAY, alarms 0, entry 0, exit 30 + record = bytes([0, 1, int(SecurityMode.AWAY), 0, 0, 30]) + body = bytes([int(ObjectType.AREA), 6]) + record + return encode_v2(OmniLink2MessageType.ExtendedStatus, body) + + stub = _StubConn(reply_factory=reply) + client = _make_client(stub) + out = await client.get_extended_status(ObjectType.AREA, 1) + assert len(out) == 1 + a = out[0] + assert isinstance(a, AreaStatus) + assert a.index == 1 + assert a.mode == int(SecurityMode.AWAY) + assert a.exit_timer_secs == 30 + + +@pytest.mark.asyncio +async def test_get_object_status_validates_range() -> None: + stub = _StubConn() + client = _make_client(stub) + with pytest.raises(ValueError, match="end"): + await client.get_object_status(ObjectType.ZONE, 5, 3) + assert stub.calls == [] diff --git a/tests/test_events.py b/tests/test_events.py new file mode 100644 index 0000000..027f3ea --- /dev/null +++ b/tests/test_events.py @@ -0,0 +1,405 @@ +"""Tests for ``omni_pca.events`` — typed system-event parsing. + +The test data is hand-built — each helper constructs a synthetic v2 +``SystemEvents`` (opcode 55) ``Message`` containing one or more 16-bit +event words encoded in the panel's wire layout. Cross-reference the +bit-mask comments below against ``clsText.GetEventCategory`` +(clsText.cs:1585-1690) and ``GetEventText`` (clsText.cs:1693-1911). +""" + +from __future__ import annotations + +import asyncio + +import pytest + +from omni_pca.events import ( + EVENT_REGISTRY, + AccessReaderEvent, + AcLost, + AcRestored, + AlarmActivated, + AlarmCleared, + AlarmKind, + AllOnOff, + ArmingChanged, + BatteryLow, + BatteryRestored, + CameraTrigger, + DcmOk, + DcmTrouble, + EnergyCostChanged, + EventStream, + EventType, + PhoneLineDead, + PhoneLineOffHook, + PhoneLineOnHook, + PhoneLineRinging, + SystemEvent, + UnitStateChanged, + UnknownEvent, + UpbLinkAction, + UpbLinkEvent, + UserMacroButton, + X10CodeReceived, + ZoneStateChanged, + parse_events, +) +from omni_pca.message import START_CHAR_V2, Message +from omni_pca.opcodes import OmniLink2MessageType + +# -------------------------------------------------------------------------- +# helpers +# -------------------------------------------------------------------------- + + +def _make_events_message(*words: int) -> Message: + """Build a v2 SystemEvents (opcode 55) Message containing the given + 16-bit event words, encoded big-endian on the wire.""" + payload = bytearray() + for w in words: + payload.append((w >> 8) & 0xFF) + payload.append(w & 0xFF) + data = bytes([int(OmniLink2MessageType.SystemEvents)]) + bytes(payload) + return Message(start_char=START_CHAR_V2, data=data) + + +# -------------------------------------------------------------------------- +# Per-subclass parse tests — one event per message +# -------------------------------------------------------------------------- + + +def test_parse_user_macro_button() -> None: + msg = _make_events_message(0x0042) + events = parse_events(msg) + assert len(events) == 1 + ev = events[0] + assert isinstance(ev, UserMacroButton) + assert ev.button_index == 0x42 + assert ev.event_type is EventType.USER_MACRO_BUTTON + assert ev.raw_word == 0x0042 + + +def test_parse_alarm_activated_burglary_area_3() -> None: + # ALARM family = 0x0200; (alarm_type=Burglary=1) << 4 | area=3 = 0x13 + word = 0x0200 | (int(AlarmKind.BURGLARY) << 4) | 0x03 + [ev] = parse_events(_make_events_message(word)) + assert isinstance(ev, AlarmActivated) + assert ev.alarm_type == AlarmKind.BURGLARY + assert ev.area_index == 3 + + +def test_parse_alarm_cleared_when_alarm_type_zero() -> None: + # ALARM family with alarm_type=ANY(0): we surface as a cleared event. + word = 0x0200 | (int(AlarmKind.ANY) << 4) | 0x05 + [ev] = parse_events(_make_events_message(word)) + assert isinstance(ev, AlarmCleared) + assert ev.area_index == 5 + + +def test_parse_zone_state_changed_open() -> None: + # ZONE family = 0x0400; bit 9 (0x0200) set ⇒ not-ready/open; zone 17. + word = 0x0400 | 0x0200 | 17 + [ev] = parse_events(_make_events_message(word)) + assert isinstance(ev, ZoneStateChanged) + assert ev.zone_index == 17 + assert ev.is_open + assert not ev.is_secure + assert ev.new_state == 1 + + +def test_parse_zone_state_changed_secure() -> None: + word = 0x0400 | 23 # bit 9 clear ⇒ secure + [ev] = parse_events(_make_events_message(word)) + assert isinstance(ev, ZoneStateChanged) + assert ev.zone_index == 23 + assert ev.is_secure + + +def test_parse_unit_state_changed_high_index_on() -> None: + # UNIT family = 0x0800; index 300 = bit 8 set + low byte 44; bit 9 = on. + # 300 = 256 + 44; bit 8 of high byte == ((1<<8))=0x100; OR bit 9 (0x200). + word = 0x0800 | 0x0200 | 0x0100 | 44 # high-byte bit 0 (=0x100) + bit 1 (=0x200) + [ev] = parse_events(_make_events_message(word)) + assert isinstance(ev, UnitStateChanged) + assert ev.unit_index == 300 + assert ev.is_on + assert ev.new_state == 1 + + +def test_parse_unit_state_changed_low_index_off() -> None: + word = 0x0800 | 7 # bit 9 clear, no index extension + [ev] = parse_events(_make_events_message(word)) + assert isinstance(ev, UnitStateChanged) + assert ev.unit_index == 7 + assert not ev.is_on + + +def test_parse_x10_code_received() -> None: + # X-10 family = 0x0C00; house 'B' (=1<<4 in low byte high nibble), + # unit 5 (=4 in low nibble, +1), bit 9 ⇒ on. + word = 0x0C00 | 0x0200 | (1 << 4) | 4 + [ev] = parse_events(_make_events_message(word)) + assert isinstance(ev, X10CodeReceived) + assert ev.house_code == "B" + assert ev.unit_number == 5 + assert ev.is_on + assert not ev.all_units + + +def test_parse_all_on_off_area_2_on() -> None: + # ALL_ON_OFF family = 0x03E0; area 2 in low nibble; on bit (0x10) set. + word = 0x03E0 | 0x10 | 2 + [ev] = parse_events(_make_events_message(word)) + assert isinstance(ev, AllOnOff) + assert ev.area_index == 2 + assert ev.on + + +def test_parse_phone_singletons() -> None: + cases: list[tuple[int, type]] = [ + (768, PhoneLineDead), + (769, PhoneLineRinging), + (770, PhoneLineOffHook), + (771, PhoneLineOnHook), + ] + for word, klass in cases: + [ev] = parse_events(_make_events_message(word)) + assert isinstance(ev, klass), (word, type(ev)) + assert ev.raw_word == word + + +def test_parse_ac_battery_dcm_singletons() -> None: + [ac_off] = parse_events(_make_events_message(772)) + [ac_on] = parse_events(_make_events_message(773)) + [batt_low] = parse_events(_make_events_message(774)) + [batt_ok] = parse_events(_make_events_message(775)) + [dcm_bad] = parse_events(_make_events_message(776)) + [dcm_ok] = parse_events(_make_events_message(777)) + assert isinstance(ac_off, AcLost) + assert isinstance(ac_on, AcRestored) + assert isinstance(batt_low, BatteryLow) + assert isinstance(batt_ok, BatteryRestored) + assert isinstance(dcm_bad, DcmTrouble) + assert isinstance(dcm_ok, DcmOk) + + +def test_parse_energy_cost_levels() -> None: + for word, level in [(778, 0), (779, 1), (780, 2), (781, 3)]: + [ev] = parse_events(_make_events_message(word)) + assert isinstance(ev, EnergyCostChanged) + assert ev.cost_level == level + + +def test_parse_camera_trigger() -> None: + [ev] = parse_events(_make_events_message(785)) # 785 - 781 = 4 → camera 4 + assert isinstance(ev, CameraTrigger) + assert ev.camera_index == 4 + + +def test_parse_access_reader_event() -> None: + # 976..991: reader_index = (word & 0xF) + 1 + [ev] = parse_events(_make_events_message(978)) + assert isinstance(ev, AccessReaderEvent) + assert ev.reader_index == ((978 & 0xF) + 1) + + +def test_parse_upb_link_actions() -> None: + # UPB_LINK family = 0xFC00; upper byte selects action. + actions = [ + (UpbLinkAction.OFF, 0xFC), + (UpbLinkAction.ON, 0xFD), + (UpbLinkAction.SET, 0xFE), + (UpbLinkAction.FADE_STOP, 0xFF), + ] + for action, upper in actions: + word = (upper << 8) | 12 # link index 12 + [ev] = parse_events(_make_events_message(word)) + assert isinstance(ev, UpbLinkEvent), (action, ev) + assert ev.link_index == 12 + assert ev.action == int(action) + + +def test_parse_arming_changed_user_5_area_2_away() -> None: + # SECURITY_MODE_CHANGE catch-all: + # bits 12-14 = SecurityMode (3 = AWAY) + # bits 8-11 = area (2) + # low byte = user/code (5) + word = (3 << 12) | (2 << 8) | 5 + [ev] = parse_events(_make_events_message(word)) + assert isinstance(ev, ArmingChanged) + assert ev.area_index == 2 + assert ev.new_mode == 3 + assert ev.user_index == 5 + assert ev.mode_name == "AWAY" + + +def test_parse_arming_changed_set_command_bit() -> None: + # Same as above but with the "Set" verb bit (bit 15) set. + word = (1 << 12) | (1 << 15) | (1 << 8) | 9 + [ev] = parse_events(_make_events_message(word)) + assert isinstance(ev, ArmingChanged) + assert ev.is_set_command + assert ev.user_index == 9 + + +def test_unknown_event_returned_for_unmapped_word() -> None: + # Any value in the gap between the special singletons and the SECURITY + # catch-all that ALSO has zero in the high nibble of the high byte + # falls through to UnknownEvent. word=900 is in the gap (after CAMERA + # range, before ACCESS_READER) and (900 >> 8) & 0xF0 = 0 → unknown. + [ev] = parse_events(_make_events_message(900)) + assert isinstance(ev, UnknownEvent) + assert ev.event_type is EventType.UNKNOWN + assert ev.raw_word == 900 + + +# -------------------------------------------------------------------------- +# Multi-event-per-packet (the panel batches into a single SystemEvents msg) +# -------------------------------------------------------------------------- + + +def test_parse_three_events_in_one_message() -> None: + msg = _make_events_message( + 0x0400 | 0x0200 | 5, # zone 5 opened + (3 << 12) | (1 << 8) | 7, # area 1 → AWAY by user 7 + 773, # AC restored + ) + events = parse_events(msg) + assert len(events) == 3 + z, a, ac = events + assert isinstance(z, ZoneStateChanged) + assert z.zone_index == 5 + assert z.is_open + assert isinstance(a, ArmingChanged) + assert a.area_index == 1 + assert a.new_mode == 3 + assert isinstance(ac, AcRestored) + + +def test_empty_system_events_message_returns_empty_list() -> None: + msg = _make_events_message() # zero event words + assert parse_events(msg) == [] + + +def test_odd_trailing_byte_is_silently_truncated() -> None: + """The C# count is ``(MessageLength - 1) / 2`` — a trailing odd byte + is dropped, not raised. We mirror that to stay tolerant of messages + where the panel has appended a stray byte (seen on some firmwares).""" + data = bytes([int(OmniLink2MessageType.SystemEvents)]) + b"\x00\x42\x77" + msg = Message(start_char=START_CHAR_V2, data=data) + events = parse_events(msg) + assert len(events) == 1 + assert isinstance(events[0], UserMacroButton) + assert events[0].button_index == 0x42 + + +def test_parse_rejects_wrong_opcode() -> None: + # opcode 25 = SystemStatus, not SystemEvents. + msg = Message( + start_char=START_CHAR_V2, + data=bytes([int(OmniLink2MessageType.SystemStatus)]) + b"\x00", + ) + with pytest.raises(ValueError, match="not a SystemEvents message"): + parse_events(msg) + + +def test_classmethod_parse_matches_function() -> None: + msg = _make_events_message(0x0042, 773) + via_classmethod = SystemEvent.parse(msg) + via_function = parse_events(msg) + assert [type(e) for e in via_classmethod] == [type(e) for e in via_function] + assert [e.raw_word for e in via_classmethod] == [e.raw_word for e in via_function] + + +# -------------------------------------------------------------------------- +# Registry sanity check +# -------------------------------------------------------------------------- + + +def test_event_registry_covers_every_eventtype_value() -> None: + """Every non-UNKNOWN EventType value should map to a concrete class.""" + for et in EventType: + assert int(et) in EVENT_REGISTRY, f"missing registry entry for {et!r}" + + +# -------------------------------------------------------------------------- +# EventStream — async iterator over an underlying connection-like source +# -------------------------------------------------------------------------- + + +class _FakeConnection: + """Minimal stand-in for OmniConnection used in the EventStream tests. + + Exposes the same ``unsolicited() -> AsyncIterator[Message]`` contract + backed by an in-memory queue so the test harness can drive the stream + deterministically without touching real I/O. + """ + + def __init__(self) -> None: + self.queue: asyncio.Queue[Message] = asyncio.Queue() + self._closed = False + + def push(self, msg: Message) -> None: + self.queue.put_nowait(msg) + + def close(self) -> None: + self._closed = True + + def unsolicited(self): + async def _gen(): + while True: + if self._closed and self.queue.empty(): + return + msg = await self.queue.get() + yield msg + return _gen() + + +@pytest.mark.asyncio +async def test_event_stream_yields_one_typed_event_per_step() -> None: + conn = _FakeConnection() + # Three events spread across two messages — confirms both flattening + # and cross-message iteration work. + conn.push(_make_events_message(0x0042, 773)) # 2 events + conn.push(_make_events_message(0x0400 | 0x0200 | 9)) # 1 event + conn.close() + + stream = EventStream(source=conn) + seen: list[SystemEvent] = [] + async for ev in stream: + seen.append(ev) + if len(seen) == 3: + break + + assert isinstance(seen[0], UserMacroButton) + assert seen[0].button_index == 0x42 + assert isinstance(seen[1], AcRestored) + assert isinstance(seen[2], ZoneStateChanged) + assert seen[2].zone_index == 9 + assert seen[2].is_open + + +@pytest.mark.asyncio +async def test_event_stream_skips_non_event_messages() -> None: + """Status replies, Acks, etc. that show up on the unsolicited + channel must be silently dropped — only opcode 55 produces events.""" + conn = _FakeConnection() + # Inject a SystemStatus reply (opcode 25) — should be filtered out. + other = Message( + start_char=START_CHAR_V2, + data=bytes([int(OmniLink2MessageType.SystemStatus)]) + b"\x00" * 14, + ) + conn.push(other) + conn.push(_make_events_message(0x0042)) + conn.close() + + stream = EventStream(source=conn) + ev = await stream.__anext__() + assert isinstance(ev, UserMacroButton) + assert ev.button_index == 0x42 + + +def test_event_stream_rejects_non_connection_source() -> None: + with pytest.raises(TypeError, match="unsolicited"): + EventStream(source=object()) # type: ignore[arg-type]