Library v1.0 phase B: command opcodes + typed system events

src/omni_pca/commands.py — Command IntEnum (64 values, sourced from
enuUnitCommand.cs which is the canonical 'enuCommand' despite the misleading
name) + SecurityCommandResponse + CommandFailedError exception. Notable
discovery: enuUnitCommand.UserSetting (104) is actually EXECUTE_PROGRAM;
renamed for clarity, C# alias documented inline.

src/omni_pca/client.py — 18 new methods on OmniClient:
  Core: execute_command, execute_security_command, acknowledge_alerts,
        get_object_status, get_extended_status
  Wrappers: turn_unit_on/off, set_unit_level, bypass_zone, restore_zone,
        set_thermostat_{system,fan,hold}_mode,
        set_thermostat_{heat,cool}_setpoint_raw,
        execute_button, execute_program, show_message, clear_message
  All command methods raise CommandFailedError on Nak.

src/omni_pca/events.py — typed SystemEvents (opcode 55) decoder.
- EventType IntEnum (28 dispatch tags)
- 26 SystemEvent subclasses + UnknownEvent catch-all
  Includes: ZoneStateChanged, UnitStateChanged, ArmingChanged,
  AlarmActivated/Cleared, AcLost/Restored, BatteryLow/Restored,
  PhoneLine{Off,On,Dead,Restored}, UserMacroButton, ProLinkMessage,
  CentraLiteSwitch, X10CodeReceived, AllOnOff, DcmTrouble/Ok,
  EnergyCostChanged, CameraTrigger, AccessReaderEvent, UpbLinkEvent
- SystemEvents packets carry MULTIPLE events; public API is
  parse_events(message) -> list[SystemEvent], plus SystemEvent.parse()
- EventStream helper that flattens batches across messages
- Wiring of OmniClient.events() left for next pass

55 new tests across both files. 194 pass, 2 pre-existing skips. Ruff clean.
This commit is contained in:
Ryan Malloy 2026-05-10 14:17:12 -06:00
parent 08974e2ec4
commit 68cf44a585
5 changed files with 2493 additions and 1 deletions

View File

@ -17,11 +17,12 @@ from __future__ import annotations
import asyncio import asyncio
import contextlib import contextlib
import struct import struct
from collections.abc import Awaitable, Callable from collections.abc import Awaitable, Callable, Sequence
from enum import IntEnum from enum import IntEnum
from types import TracebackType from types import TracebackType
from typing import Self from typing import Self
from .commands import Command, CommandFailedError, SecurityCommandResponse
from .connection import ( from .connection import (
ConnectionError as OmniConnectionError, ConnectionError as OmniConnectionError,
) )
@ -31,13 +32,23 @@ from .connection import (
) )
from .message import Message from .message import Message
from .models import ( from .models import (
OBJECT_TYPE_TO_STATUS,
AreaProperties, AreaProperties,
AreaStatus,
FanMode,
HoldMode,
HvacMode,
PropertiesReply, PropertiesReply,
SecurityMode,
StatusReply,
SystemInformation, SystemInformation,
SystemStatus, SystemStatus,
UnitProperties, UnitProperties,
ZoneProperties, ZoneProperties,
) )
from .models import (
ObjectType as ModelObjectType,
)
from .opcodes import OmniLink2MessageType from .opcodes import OmniLink2MessageType
@ -70,6 +81,23 @@ _PROPERTIES_PARSERS: dict[ObjectType, type[PropertiesReply]] = {
} }
# Per-object-type record sizes for a basic Status (opcode 35) reply, where
# (unlike ExtendedStatus) there is no per-record length byte and the size
# is hard-coded in the wire format. Source: clsOL2MsgStatus.cs:13-27.
_STATUS_RECORD_SIZES: dict[int, int] = {
1: 4, # enuObjectType.Zone — number(2) + status + loop
2: 5, # enuObjectType.Unit — number(2) + state + time(2)
5: 6, # enuObjectType.Area — number(2) + mode + alarms + entry + exit
6: 9, # enuObjectType.Thermostat — number(2) + status + 6 bytes (status..hold)
7: 3, # enuObjectType.Message — number(2) + status
8: 6, # enuObjectType.Auxillary — number(2) + output + temp + low + high
10: 6, # enuObjectType.AudioZone — number(2) + power + source + volume + mute
11: 4, # enuObjectType.Expansion — number(2) + status + battery
13: 5, # enuObjectType.UserSetting — number(2) + type + value(2)
15: 5, # enuObjectType.AccessControlLock — number(2) + status + duration(2)
}
class OmniClient: class OmniClient:
"""High-level async Omni-Link II client. """High-level async Omni-Link II client.
@ -169,6 +197,475 @@ class OmniClient:
self._expect(reply, OmniLink2MessageType.Properties) self._expect(reply, OmniLink2MessageType.Properties)
return parser.parse(reply.payload) return parser.parse(reply.payload)
# ---- commands --------------------------------------------------------
async def execute_command(
self,
command: Command,
parameter1: int = 0,
parameter2: int = 0,
) -> None:
"""Send a generic Command (opcode 20).
Most state-change operations on lights, scenes, zones, thermostats,
scenes, audio zones, etc. flow through here. The panel acks with
an :attr:`OmniLink2MessageType.Ack`; specific resulting state must
be re-polled (or you can subscribe to the unsolicited push stream
to see the corresponding ExtendedStatus push the panel emits).
Wire opcode: 20 (Command).
Wire payload (4 bytes, from clsOL2MsgCommand.cs:5-57):
[0] command byte (this enum value)
[1] parameter1 (single byte; brightness, mode, code index, ...)
[2] parameter2 high byte (BE u16)
[3] parameter2 low byte (object number for nearly every command)
Reference: clsOL2MsgCommand.cs.
"""
if not 0 <= parameter1 <= 0xFF:
raise ValueError(f"parameter1 must fit in a byte: {parameter1}")
if not 0 <= parameter2 <= 0xFFFF:
raise ValueError(f"parameter2 must fit in u16: {parameter2}")
payload = struct.pack(
">BBH", int(command), parameter1 & 0xFF, parameter2 & 0xFFFF
)
reply = await self._conn.request(OmniLink2MessageType.Command, payload)
if reply.opcode == OmniLink2MessageType.Nak:
raise CommandFailedError(
f"panel NAK'd Command {command.name} "
f"(p1={parameter1}, p2={parameter2})"
)
if reply.opcode != OmniLink2MessageType.Ack:
raise CommandFailedError(
f"unexpected reply to Command {command.name}: opcode={reply.opcode}"
)
async def execute_security_command(
self,
area: int,
mode: SecurityMode,
code: int,
) -> AreaStatus | None:
"""Arm or disarm a security area.
The panel validates the code against its enabled-codes list for
that area; on failure it returns an
:attr:`OmniLink2MessageType.ExecuteSecurityCommandResponse` whose
``payload[0]`` is one of the :class:`SecurityCommandResponse`
values. On success the panel may either return an Ack or push an
ExtendedStatus update for the affected area; we surface only the
response code (success/failure) and return ``None`` for the
success path because the synchronous reply does not carry a full
:class:`AreaStatus` record. Re-poll via :meth:`get_object_status`
if you need the post-command state.
Wire opcode: 74 (ExecuteSecurityCommand).
Wire payload (6 bytes, from clsOL2MsgExecuteSecurityCommand.cs:5-90):
[0] area number (1-based)
[1] security mode byte (raw enuSecurityMode 0..7)
[2] code digit 1 (thousands)
[3] code digit 2 (hundreds)
[4] code digit 3 (tens)
[5] code digit 4 (ones)
Raises:
ValueError: ``area`` not 1..255 or ``code`` not 0..9999.
CommandFailedError: panel Nak'd the request, or the
ExecuteSecurityCommandResponse status byte is non-zero.
The structured failure code is exposed on
``CommandFailedError.failure_code``.
Reference: clsOL2MsgExecuteSecurityCommand.cs,
clsOL2MsgExecuteSecurityCommandResponse.cs.
"""
if not 1 <= area <= 0xFF:
raise ValueError(f"area out of range: {area}")
if not 0 <= code <= 9999:
raise ValueError(f"code out of range (0000-9999): {code}")
# Match the C# digit-packing exactly (clsOL2MsgExecuteSecurityCommand.cs:36-41).
d1 = (code // 1000) % 10
d2 = (code // 100) % 10
d3 = (code // 10) % 10
d4 = code % 10
payload = bytes([area & 0xFF, int(mode) & 0xFF, d1, d2, d3, d4])
reply = await self._conn.request(
OmniLink2MessageType.ExecuteSecurityCommand, payload
)
if reply.opcode == OmniLink2MessageType.Nak:
raise CommandFailedError(
f"panel NAK'd ExecuteSecurityCommand "
f"(area={area}, mode={mode.name})"
)
if reply.opcode == OmniLink2MessageType.ExecuteSecurityCommandResponse:
if not reply.payload:
raise CommandFailedError(
"ExecuteSecurityCommandResponse with empty payload"
)
status = reply.payload[0]
if status != SecurityCommandResponse.SUCCESS:
try:
label = SecurityCommandResponse(status).name
except ValueError:
label = f"unknown({status})"
raise CommandFailedError(
f"ExecuteSecurityCommand failed: {label}",
failure_code=status,
)
return None
if reply.opcode == OmniLink2MessageType.Ack:
return None
raise CommandFailedError(
f"unexpected reply to ExecuteSecurityCommand: opcode={reply.opcode}"
)
async def acknowledge_alerts(self) -> None:
"""Acknowledge all outstanding alerts/troubles on the panel.
Wire opcode: 60 (AcknowledgeAlerts). No payload, panel acks.
Reference: enuOmniLink2MessageType.AcknowledgeAlerts.
"""
reply = await self._conn.request(OmniLink2MessageType.AcknowledgeAlerts)
if reply.opcode == OmniLink2MessageType.Nak:
raise CommandFailedError("panel NAK'd AcknowledgeAlerts")
if reply.opcode != OmniLink2MessageType.Ack:
raise CommandFailedError(
f"unexpected reply to AcknowledgeAlerts: opcode={reply.opcode}"
)
async def get_object_status(
self,
object_type: ModelObjectType,
start: int,
end: int | None = None,
) -> Sequence[StatusReply]:
"""Request basic Status (opcode 34/35) for a range of objects.
``end=None`` requests just the single object at ``start``. Returns
a list of the appropriate ``*Status`` dataclass instances, parsed
from each fixed-size record in the reply.
Unlike :meth:`get_extended_status`, the basic Status reply has NO
per-record ``object_length`` byte record sizes are hard-coded
per object type (see ``clsOL2MsgStatus.cs:13-27``).
Wire opcode: 34 (RequestStatus) -> 35 (Status).
RequestStatus payload (5 bytes, clsOL2MsgRequestStatus.cs:5-41):
[0] object type (enuObjectType)
[1..2] starting number (BE u16)
[3..4] ending number (BE u16)
Status reply payload layout (clsOL2MsgStatus.cs):
[0] object type
[1..] N records of size :data:`_STATUS_RECORD_SIZES[object_type]`
Reference: clsOL2MsgRequestStatus.cs, clsOL2MsgStatus.cs.
"""
return await self._fetch_status_range(
object_type=object_type,
start=start,
end=end,
request_opcode=OmniLink2MessageType.RequestStatus,
reply_opcode=OmniLink2MessageType.Status,
header_bytes=1, # just object_type
record_sizes=_STATUS_RECORD_SIZES,
)
async def get_extended_status(
self,
object_type: ModelObjectType,
start: int,
end: int | None = None,
) -> Sequence[StatusReply]:
"""Request ExtendedStatus (opcode 58/59) for a range of objects.
For Thermostats, AuxSensors, dimmable Units, and most other types
this carries more fields (current temperature, setpoints,
brightness level, etc.) than the basic Status reply.
Unlike basic Status, the ExtendedStatus reply has an explicit
``object_length`` byte at ``payload[1]`` so the record size doesn't
have to be hard-coded we use it as-is.
Wire opcode: 58 (RequestExtendedStatus) -> 59 (ExtendedStatus).
RequestExtendedStatus payload (5 bytes, clsOL2MsgRequestExtendedStatus.cs:5-41):
[0] object type
[1..2] starting number (BE u16)
[3..4] ending number (BE u16)
ExtendedStatus reply payload layout (clsOL2MsgExtendedStatus.cs):
[0] object type
[1] object length (per-record byte count)
[2..] N records of ``object_length`` bytes
Reference: clsOL2MsgRequestExtendedStatus.cs, clsOL2MsgExtendedStatus.cs.
"""
return await self._fetch_status_range(
object_type=object_type,
start=start,
end=end,
request_opcode=OmniLink2MessageType.RequestExtendedStatus,
reply_opcode=OmniLink2MessageType.ExtendedStatus,
header_bytes=2, # object_type + object_length
record_sizes=None, # take from payload[1]
)
# ---- thin command wrappers ------------------------------------------
async def turn_unit_on(self, index: int) -> None:
"""Turn a unit (light, relay, scene) ON.
Wire opcode: 20 (Command), command byte = ``Command.UNIT_ON`` (1).
Reference: enuUnitCommand.On (line 6).
"""
await self.execute_command(Command.UNIT_ON, parameter2=index)
async def turn_unit_off(self, index: int) -> None:
"""Turn a unit OFF.
Wire opcode: 20 (Command), command byte = ``Command.UNIT_OFF`` (0).
Reference: enuUnitCommand.Off (line 5).
"""
await self.execute_command(Command.UNIT_OFF, parameter2=index)
async def set_unit_level(self, index: int, percent: int) -> None:
"""Set a dimmable unit's brightness to ``percent`` (0..100).
Wire opcode: 20 (Command), command byte = ``Command.UNIT_LEVEL`` (9),
parameter1 = percent.
Reference: enuUnitCommand.Level (line 15).
"""
if not 0 <= percent <= 100:
raise ValueError(f"percent must be 0..100: {percent}")
await self.execute_command(
Command.UNIT_LEVEL, parameter1=percent, parameter2=index
)
async def bypass_zone(self, index: int, code: int = 0) -> None:
"""Bypass a zone (1-based).
Wire opcode: 20 (Command), command byte = ``Command.BYPASS_ZONE`` (4),
parameter1 = user code index (0 = installer/no-code path),
parameter2 = zone number.
Reference: enuUnitCommand.Bypass (line 10).
"""
await self.execute_command(
Command.BYPASS_ZONE, parameter1=code, parameter2=index
)
async def restore_zone(self, index: int, code: int = 0) -> None:
"""Restore a previously-bypassed zone.
Wire opcode: 20 (Command), command byte = ``Command.RESTORE_ZONE`` (5),
parameter1 = user code index, parameter2 = zone number.
Reference: enuUnitCommand.Restore (line 11).
"""
await self.execute_command(
Command.RESTORE_ZONE, parameter1=code, parameter2=index
)
async def set_thermostat_system_mode(
self, index: int, mode: HvacMode
) -> None:
"""Change the thermostat's system mode (Off/Heat/Cool/Auto/EmHeat).
Wire opcode: 20 (Command), command byte =
``Command.SET_THERMOSTAT_SYSTEM_MODE`` (68),
parameter1 = mode value, parameter2 = thermostat number.
Reference: enuUnitCommand.Mode (line 73).
"""
await self.execute_command(
Command.SET_THERMOSTAT_SYSTEM_MODE,
parameter1=int(mode),
parameter2=index,
)
async def set_thermostat_fan_mode(
self, index: int, mode: FanMode
) -> None:
"""Change the thermostat's fan mode (Auto/On/Cycle).
Wire opcode: 20 (Command), command byte =
``Command.SET_THERMOSTAT_FAN_MODE`` (69).
Reference: enuUnitCommand.Fan (line 74).
"""
await self.execute_command(
Command.SET_THERMOSTAT_FAN_MODE,
parameter1=int(mode),
parameter2=index,
)
async def set_thermostat_hold_mode(
self, index: int, mode: HoldMode
) -> None:
"""Change the thermostat's hold mode (Off/Hold/Vacation).
Wire opcode: 20 (Command), command byte =
``Command.SET_THERMOSTAT_HOLD_MODE`` (70).
Reference: enuUnitCommand.Hold (line 75).
"""
await self.execute_command(
Command.SET_THERMOSTAT_HOLD_MODE,
parameter1=int(mode),
parameter2=index,
)
async def set_thermostat_heat_setpoint_raw(
self, index: int, raw: int
) -> None:
"""Set the heat setpoint, in Omni's raw temperature byte units.
Convert from C/F at the call site (see
:func:`omni_pca.models.omni_temp_to_celsius` /
:func:`omni_pca.models.omni_temp_to_fahrenheit` for the inverse) -
this layer is deliberately transport-shaped.
Wire opcode: 20 (Command), command byte =
``Command.SET_THERMOSTAT_HEAT_SETPOINT`` (66).
Reference: enuUnitCommand.SetLowSetPt (line 71).
"""
if not 0 <= raw <= 0xFF:
raise ValueError(f"raw setpoint must be a byte: {raw}")
await self.execute_command(
Command.SET_THERMOSTAT_HEAT_SETPOINT,
parameter1=raw,
parameter2=index,
)
async def set_thermostat_cool_setpoint_raw(
self, index: int, raw: int
) -> None:
"""Set the cool setpoint, in Omni's raw temperature byte units.
Wire opcode: 20 (Command), command byte =
``Command.SET_THERMOSTAT_COOL_SETPOINT`` (67).
Reference: enuUnitCommand.SetHighSetPt (line 72).
"""
if not 0 <= raw <= 0xFF:
raise ValueError(f"raw setpoint must be a byte: {raw}")
await self.execute_command(
Command.SET_THERMOSTAT_COOL_SETPOINT,
parameter1=raw,
parameter2=index,
)
async def execute_button(self, index: int) -> None:
"""Run the program assigned to a button.
Wire opcode: 20 (Command), command byte = ``Command.EXECUTE_BUTTON`` (7).
Reference: enuUnitCommand.Button (line 13).
"""
await self.execute_command(Command.EXECUTE_BUTTON, parameter2=index)
async def execute_program(self, index: int) -> None:
"""Run a stored program by index (1-based).
Wire opcode: 20 (Command), command byte = ``Command.EXECUTE_PROGRAM`` (104).
Note: enuUnitCommand calls this ``UserSetting`` historically we
rename for clarity since "execute program" matches the user-facing
verb in the owner manual.
Reference: enuUnitCommand.UserSetting (line 98).
"""
await self.execute_command(Command.EXECUTE_PROGRAM, parameter2=index)
async def show_message(self, index: int, beep: bool = True) -> None:
"""Display a stored message on the panel's keypad.
Wire opcode: 20 (Command), command byte = ``Command.SHOW_MESSAGE_WITH_BEEP``
(80) when ``beep=True`` or ``Command.SHOW_MESSAGE_NO_BEEP`` (86) otherwise.
Reference: enuUnitCommand.ShowMsgWBeep (line 81),
enuUnitCommand.ShowMsgNoBeep (line 87).
"""
cmd = (
Command.SHOW_MESSAGE_WITH_BEEP
if beep
else Command.SHOW_MESSAGE_NO_BEEP
)
await self.execute_command(cmd, parameter2=index)
async def clear_message(self, index: int) -> None:
"""Clear a previously-shown message.
Wire opcode: 20 (Command), command byte = ``Command.CLEAR_MESSAGE`` (82).
Reference: enuUnitCommand.ClearMsg (line 83).
"""
await self.execute_command(Command.CLEAR_MESSAGE, parameter2=index)
# ---- helpers (status) -----------------------------------------------
async def _fetch_status_range(
self,
*,
object_type: ModelObjectType,
start: int,
end: int | None,
request_opcode: OmniLink2MessageType,
reply_opcode: OmniLink2MessageType,
header_bytes: int,
record_sizes: dict[int, int] | None,
) -> Sequence[StatusReply]:
if not 0 <= start <= 0xFFFF:
raise ValueError(f"start out of range: {start}")
end_n = start if end is None else end
if not 0 <= end_n <= 0xFFFF:
raise ValueError(f"end out of range: {end_n}")
if end_n < start:
raise ValueError(f"end ({end_n}) must be >= start ({start})")
parser = OBJECT_TYPE_TO_STATUS.get(int(object_type))
if parser is None:
raise NotImplementedError(
f"no status parser for object type {object_type.name}"
)
payload = struct.pack(">BHH", int(object_type), start, end_n)
reply = await self._conn.request(request_opcode, payload)
if reply.opcode == OmniLink2MessageType.EOD:
return []
if reply.opcode == OmniLink2MessageType.Nak:
raise CommandFailedError(
f"panel NAK'd {request_opcode.name} for "
f"{object_type.name}#{start}..{end_n}"
)
self._expect(reply, reply_opcode)
body = reply.payload
if len(body) < header_bytes:
raise OmniConnectionError(
f"{reply_opcode.name} payload too short: {len(body)}"
)
if body[0] != int(object_type):
raise OmniConnectionError(
f"{reply_opcode.name} object type mismatch: "
f"sent {int(object_type)}, got {body[0]}"
)
if record_sizes is None:
# ExtendedStatus carries the per-record size at payload[1].
record_size = body[1]
records_start = 2
else:
record_size = record_sizes.get(int(object_type), 0)
if record_size == 0:
raise NotImplementedError(
f"no Status record size for {object_type.name}"
)
records_start = 1
records_buf = body[records_start:]
if record_size == 0:
return []
out: list[StatusReply] = []
for off in range(0, len(records_buf), record_size):
chunk = records_buf[off : off + record_size]
if len(chunk) < record_size:
# Trailing partial record: ignore (panel may pad).
break
out.append(parser.parse(chunk))
return out
async def list_zone_names(self) -> dict[int, str]: async def list_zone_names(self) -> dict[int, str]:
"""Walk all zones, returning ``{index: name}`` for those with a name set.""" """Walk all zones, returning ``{index: name}`` for those with a name set."""
return await self._walk_named_objects( return await self._walk_named_objects(

211
src/omni_pca/commands.py Normal file
View File

@ -0,0 +1,211 @@
"""Command (opcode 20) and ExecuteSecurityCommand (opcode 74) primitives.
This module pins down the exact byte values the panel expects in the
*first byte* of a Command (opcode 20) payload, plus the failure-mode
exception used by the typed methods on :class:`omni_pca.client.OmniClient`.
Naming note: there is no standalone ``enuCommand`` enum in HAI_Shared
the C# code uses :class:`enuUnitCommand` (file
``decompiled/project/HAI_Shared/enuUnitCommand.cs``) for *every* Command
opcode regardless of object type, even though the name suggests it's
unit-only. We mirror that single enum here under the cleaner name
:class:`Command`. Every member cites the line in ``enuUnitCommand.cs``
where its byte value is defined.
The rich Command (opcode 20) wire format (from
``clsOL2MsgCommand.cs:5-57``) is:
payload[0] = command byte (this enum)
payload[1] = parameter1 (single byte, e.g. brightness, mode value)
payload[2] = parameter2 high byte (BE u16)
payload[3] = parameter2 low byte
Parameter2 is almost always the **object number** (unit#, zone#,
thermostat#, message#, button#, scene#). Parameter1 carries whatever the
specific command needs (level, mode, set-point, etc.) see the per-
member doc-comments below for the mapping.
The ExecuteSecurityCommand (opcode 74) wire format (from
``clsOL2MsgExecuteSecurityCommand.cs:5-90``) is:
payload[0] = area number (1-based)
payload[1] = security mode byte (enuSecurityMode, raw 0-7)
payload[2] = code digit 1 (thousands place, 0-9)
payload[3] = code digit 2 (hundreds place, 0-9)
payload[4] = code digit 3 (tens place, 0-9)
payload[5] = code digit 4 (ones place, 0-9)
The reply (ExecuteSecurityCommandResponse, opcode 75) carries a single
status byte at ``payload[0]`` whose values are listed in
``enuSecurityCommnadResponse.cs`` :class:`SecurityCommandResponse`
mirrors that enum.
"""
from __future__ import annotations
from enum import IntEnum
from .connection import ProtocolError
class Command(IntEnum):
"""OMNI command codes used as ``payload[0]`` of a Command (opcode 20).
Every member's value is sourced from
``decompiled/project/HAI_Shared/enuUnitCommand.cs``; the trailing
line-number reference points to the exact definition.
"""
# ---- unit / lighting ------------------------------------------------
UNIT_OFF = 0 # enuUnitCommand.Off, line 5
UNIT_ON = 1 # enuUnitCommand.On, line 6
ALL_OFF = 2 # enuUnitCommand.AllOff, line 7 (alias: All=2 line 8)
ALL_ON = 3 # enuUnitCommand.AllOn, line 9
BYPASS_ZONE = 4 # enuUnitCommand.Bypass, line 10
RESTORE_ZONE = 5 # enuUnitCommand.Restore, line 11
RESTORE_ALL_ZONES = 6 # enuUnitCommand.RestoreAll, line 12
EXECUTE_BUTTON = 7 # enuUnitCommand.Button, line 13
ENERGY = 8 # enuUnitCommand.Energy, line 14
UNIT_LEVEL = 9 # enuUnitCommand.Level, line 15 (param1 = 0..100 %)
UNIT_DECREMENT_COUNTER = 10 # enuUnitCommand.Dec, line 16
UNIT_INCREMENT_COUNTER = 11 # enuUnitCommand.Inc, line 17
UNIT_SET_COUNTER = 12 # enuUnitCommand.Set, line 18
UNIT_RAMP = 13 # enuUnitCommand.Ramp, line 19
COMPOSE_SCENE = 14 # enuUnitCommand.Compose, line 20
UPB_STATUS_REQUEST = 15 # enuUnitCommand.UPBStatus, line 21
DIM_STEP = 16 # enuUnitCommand.Dim, line 22 (param1 = step)
DIM_1 = 17 # enuUnitCommand.Dim1, line 23
DIM_2 = 18 # enuUnitCommand.Dim2, line 24
DIM_3 = 19 # enuUnitCommand.Dim3, line 25
DIM_4 = 20 # enuUnitCommand.Dim4, line 26
DIM_5 = 21 # enuUnitCommand.Dim5, line 27
DIM_6 = 22 # enuUnitCommand.Dim6, line 28
DIM_7 = 23 # enuUnitCommand.Dim7, line 29
DIM_8 = 24 # enuUnitCommand.Dim8, line 30
DIM_9 = 25 # enuUnitCommand.Dim9, line 31
UPB_BLINK = 26 # enuUnitCommand.UPBBlink, line 32
UPB_BLINK_OFF = 27 # enuUnitCommand.UPBBlinkOff, line 33
UPB_LINK_OFF = 28 # enuUnitCommand.UPBLinkOff, line 34
UPB_LINK_ON = 29 # enuUnitCommand.UPBLinkOn, line 35
UPB_LINK_SET = 30 # enuUnitCommand.UPBLinkSet, line 36
UPB_LINK_FADE_STOP = 31 # enuUnitCommand.UPBLinkFadeStop, line 37
BRIGHT_STEP = 32 # enuUnitCommand.Bright, line 38 (param1 = step)
BRIGHT_1 = 33 # enuUnitCommand.Bright1, line 39
BRIGHT_2 = 34 # enuUnitCommand.Bright2, line 40
BRIGHT_3 = 35 # enuUnitCommand.Bright3, line 41
BRIGHT_4 = 36 # enuUnitCommand.Bright4, line 42
BRIGHT_5 = 37 # enuUnitCommand.Bright5, line 43
BRIGHT_6 = 38 # enuUnitCommand.Bright6, line 44
BRIGHT_7 = 39 # enuUnitCommand.Bright7, line 45
BRIGHT_8 = 40 # enuUnitCommand.Bright8, line 46
BRIGHT_9 = 41 # enuUnitCommand.Bright9, line 47
CENTRALITE_SCENE_OFF = 42 # enuUnitCommand.CentraLiteSceneOff, line 48
CENTRALITE_SCENE_ON = 43 # enuUnitCommand.CentraLiteSceneOn, line 49
UPB_LED_OFF = 44 # enuUnitCommand.UPBLEDOff, line 50
UPB_LED_ON = 45 # enuUnitCommand.UPBLEDOn, line 51
RADIO_RA_PHANTOM_OFF = 46 # enuUnitCommand.RadioRAPhantomOff, line 52
RADIO_RA_PHANTOM_ON = 47 # enuUnitCommand.RadioRAPhantomOn, line 53
# ---- security (alternative path; preferred path is opcode 74) ------
# When sent through a Command (opcode 20), parameter1 carries the user
# code index (1-based) and parameter2 carries the area number. The
# panel honours these only if the code is enabled for the area.
SECURITY_OFF = 48 # enuUnitCommand.SecurityOff, line 55 (alias Security=48 line 54)
SECURITY_DAY = 49 # enuUnitCommand.SecurityDay, line 56
SECURITY_NIGHT = 50 # enuUnitCommand.SecurityNight, line 57
SECURITY_AWAY = 51 # enuUnitCommand.SecurityAway, line 58
SECURITY_VACATION = 52 # enuUnitCommand.SecurityVac, line 59
SECURITY_DAY_INSTANT = 53 # enuUnitCommand.SecurityDyi, line 60
SECURITY_NIGHT_DELAYED = 54 # enuUnitCommand.SecurityNtd, line 61
SECURITY_ANY_CHANGE = 55 # enuUnitCommand.SecurityAny, line 62
SECURITY_ARMING_DAY = 57 # enuUnitCommand.SecurityArmingDay, line 63
SECURITY_ARMING_NIGHT = 58 # enuUnitCommand.SecurityArmingNight, line 64
SECURITY_ARMING_AWAY = 59 # enuUnitCommand.SecurityArmingAway, line 65
SECURITY_ARMING_VACATION = 60 # enuUnitCommand.SecurityArmingVacation, line 66
SECURITY_ARMING_DAY_INSTANT = 61 # enuUnitCommand.SecurityArmingDayInst, line 67
SECURITY_ARMING_NIGHT_DELAYED = 62 # enuUnitCommand.SecurityArmingNightDelay, line 68
# ---- energy (HMS) --------------------------------------------------
ENERGY_OFF = 64 # enuUnitCommand.Eof, line 69
ENERGY_ON = 65 # enuUnitCommand.Eon, line 70
# ---- thermostat ---------------------------------------------------
SET_THERMOSTAT_HEAT_SETPOINT = 66 # enuUnitCommand.SetLowSetPt, line 71
SET_THERMOSTAT_COOL_SETPOINT = 67 # enuUnitCommand.SetHighSetPt, line 72
SET_THERMOSTAT_SYSTEM_MODE = 68 # enuUnitCommand.Mode, line 73
SET_THERMOSTAT_FAN_MODE = 69 # enuUnitCommand.Fan, line 74
SET_THERMOSTAT_HOLD_MODE = 70 # enuUnitCommand.Hold, line 75
THERMOSTAT_INC_DEC_LO = 71 # enuUnitCommand.IncDecLo, line 76
THERMOSTAT_INC_DEC_HI = 72 # enuUnitCommand.IncDecHi, line 77
SET_THERMOSTAT_HUMIDIFY_SETPOINT = 73 # enuUnitCommand.SetHumidifySetPt, line 78
SET_THERMOSTAT_DEHUMIDIFY_SETPOINT = 74 # enuUnitCommand.SetDeHumidifySetPt, line 79
# ---- panel display messages ---------------------------------------
SHOW_MESSAGE_WITH_BEEP = 80 # enuUnitCommand.ShowMsgWBeep, line 81 (alias FirstMsgCmd=80 line 80)
LOG_MESSAGE = 81 # enuUnitCommand.LogMsg, line 82
CLEAR_MESSAGE = 82 # enuUnitCommand.ClearMsg, line 83
SAY_MESSAGE = 83 # enuUnitCommand.SayMsg, line 84
PHONE_MESSAGE = 84 # enuUnitCommand.PhoneMsg, line 85
SEND_MESSAGE = 85 # enuUnitCommand.SendMsg, line 86
SHOW_MESSAGE_NO_BEEP = 86 # enuUnitCommand.ShowMsgNoBeep, line 87
EMAIL_MESSAGE = 87 # enuUnitCommand.EMailMsg, line 88 (alias LastMsgCmd=87 line 89)
# ---- scenes / misc -----------------------------------------------
SCENE_OFF = 96 # enuUnitCommand.SceneOff, line 90
SCENE_ON = 97 # enuUnitCommand.SceneOn, line 91
SCENE_SET = 98 # enuUnitCommand.SceneSet, line 92
TOGGLE = 99 # enuUnitCommand.Toggle, line 93
SHOW_VIDEO = 100 # enuUnitCommand.ShowVideo, line 94
TIMED_LEVEL = 101 # enuUnitCommand.TimedLevel, line 95
CONSOLE_BEEP = 102 # enuUnitCommand.ConsoleBeep, line 96
BEEP = 103 # enuUnitCommand.Beep, line 97
EXECUTE_PROGRAM = 104 # enuUnitCommand.UserSetting, line 98
LOCK = 105 # enuUnitCommand.Lock, line 99
UNLOCK = 106 # enuUnitCommand.Unlock, line 100
LUTRON_HOMEWORKS_KEYPAD = 107 # enuUnitCommand.LutronHomeWorksKeypadButtonPress, line 101
CLIPSAL_C_BUS_SCENE = 108 # enuUnitCommand.Clipsal_C_Bus_Scene, line 102
RADIO_RA2_PHANTOM = 109 # enuUnitCommand.RadioRA2Phantom, line 103
STOP = 110 # enuUnitCommand.Stop, line 104
# ---- audio --------------------------------------------------------
AUDIO_ZONE = 112 # enuUnitCommand.AudioZone, line 105
AUDIO_VOLUME = 113 # enuUnitCommand.AudioVolume, line 106
AUDIO_SOURCE = 114 # enuUnitCommand.AudioSource, line 107
AUDIO_KEY_PRESS = 115 # enuUnitCommand.AudioKeyPress, line 108
class SecurityCommandResponse(IntEnum):
"""Status byte returned in an ExecuteSecurityCommandResponse (opcode 75).
Source: ``decompiled/project/HAI_Shared/enuSecurityCommnadResponse.cs``
(typo in the C# enum name preserved here for grep parity).
"""
SUCCESS = 0 # line 5
INVALID_CODE = 1 # line 6
INVALID_SECURITY_MODE = 2 # line 7
INVALID_AREA = 3 # line 8
ZONES_NOT_READY = 4 # line 9
INSTALLER_RESTORE_NEEDED = 5 # line 10
CODE_LOCKED_OUT = 6 # line 11
INVALID = 0xFF # line 12
class CommandFailedError(ProtocolError):
"""A command opcode was Nak'd by the panel, or returned a structured
failure code (e.g. the Security command response carries one of the
:class:`SecurityCommandResponse` values).
The ``failure_code`` attribute is set when the panel returned an
ExecuteSecurityCommandResponse with a non-zero status byte; it's
``None`` for plain Nak replies that carry no further detail.
"""
def __init__(
self,
message: str,
*,
failure_code: int | None = None,
) -> None:
super().__init__(message)
self.failure_code = failure_code

860
src/omni_pca/events.py Normal file
View File

@ -0,0 +1,860 @@
"""Typed system-event objects for Omni-Link II push notifications.
The panel batches state-change notifications into a single ``SystemEvents``
message (v2 opcode 55). Each batched event is a single 16-bit big-endian
word in the message payload the message envelope is just a sequence of
those words. The 16-bit value is *category-encoded*: the high bits pick
the event family (zone, unit, arming, alarm, AC, battery, ) and the
remaining bits carry per-family fields (zone index, area, alarm type,
unit number, security mode, etc.).
Pipeline:
raw bytes -> Message (opcode 55)
Message -> parse_events(message) -> list[SystemEvent]
SystemEvent -> ZoneStateChanged | UnitStateChanged | ArmingChanged |
A single SystemEvents message can carry multiple events, so the public
parse entry points always return a *list*. ``EventStream`` flattens that
list across an underlying ``OmniConnection.unsolicited()`` iterator so
consumers can iterate one typed event at a time.
References (decompiled C# source):
clsOLMsgSystemEvents.cs message envelope + per-event word read
enuOmniLink2MessageType.cs:60 SystemEvents = 55 (v2 opcode)
enuEventType.cs category enum (the values used here)
enuAlarmType.cs alarm subtype byte
enuSecurityMode.cs security mode byte (used by arming)
clsText.cs:1585-1690 (GetEventCategory)
bit-mask classifier we mirror below
clsText.cs:1693-1911 (GetEventText)
per-category sub-field extraction
"""
from __future__ import annotations
import asyncio
from dataclasses import dataclass, field
from enum import IntEnum
from typing import ClassVar
from .message import Message
from .models import SecurityMode
from .opcodes import OmniLink2MessageType
# --------------------------------------------------------------------------
# Numeric tags / enums
# --------------------------------------------------------------------------
class EventType(IntEnum):
"""Symbolic identifiers for the event subclasses we expose.
These are *not* the raw 16-bit on-the-wire codes those are densely
packed bit fields. We assign each typed-event subclass a stable small
integer so that ``SystemEvent.event_type`` is a single discriminator
and ``EVENT_REGISTRY`` can dispatch on it.
Reference: enuEventType.cs (the C# equivalent that drives clsText
.GetEventCategory). The order/values below intentionally mirror the
classification order in clsText.cs:1585-1690 so a future maintainer
reading both files side-by-side sees the same shape.
"""
USER_MACRO_BUTTON = 0 # clsText.cs:1587-1590
PRO_LINK_MESSAGE = 1 # clsText.cs:1591-1594
CENTRALITE_SWITCH = 2 # clsText.cs:1595-1598
ALARM_ACTIVATED = 3 # clsText.cs:1599-1602, 1738-1750
ALARM_CLEARED = 4 # synthesized: ALARM word with alarm_type=0
ZONE_STATE_CHANGED = 5 # clsText.cs:1603-1606, 1751-1756
UNIT_STATE_CHANGED = 6 # clsText.cs:1607-1610, 1757-1765
X10_CODE = 7 # clsText.cs:1615-1618
ALL_ON_OFF = 8 # clsText.cs:1643-1646
PHONE_LINE_DEAD = 9 # clsText.cs:1649,1853-1857
PHONE_LINE_RING = 10 # clsText.cs:1651,1858-1859
PHONE_LINE_OFF_HOOK = 11 # clsText.cs:1653,1860-1861
PHONE_LINE_ON_HOOK = 12 # clsText.cs:1655,1862-1863
AC_LOST = 13 # clsText.cs:1657,1866-1870
AC_RESTORED = 14 # clsText.cs:1659,1871-1872
BATTERY_LOW = 15 # clsText.cs:1661,1875-1879
BATTERY_RESTORED = 16 # clsText.cs:1663,1880-1881
DCM_TROUBLE = 17 # clsText.cs:1665,1884-1888
DCM_OK = 18 # clsText.cs:1667,1889-1890
ENERGY_COST_LOW = 19 # clsText.cs:1669,1893-1897
ENERGY_COST_MID = 20 # clsText.cs:1671,1898-1899
ENERGY_COST_HIGH = 21 # clsText.cs:1673,1900-1901
ENERGY_COST_CRITICAL = 22 # clsText.cs:1675,1902-1903
CAMERA = 23 # clsText.cs:1677-1683,1906-1907
ACCESS_READER = 24 # clsText.cs:1684-1688,1908-1909
UPB_LINK = 25 # clsText.cs:1635-1638,1795-1810
ARMING_CHANGED = 26 # clsText.cs:1689,2140-2217 (catch-all)
UNKNOWN = 0xFF # parser couldn't classify
class AlarmKind(IntEnum):
"""Alarm subtype byte (enuAlarmType.cs)."""
ANY = 0 # enuAlarmType.cs:5
BURGLARY = 1 # enuAlarmType.cs:6
FIRE = 2 # enuAlarmType.cs:7
GAS = 3 # enuAlarmType.cs:8
AUX = 4 # enuAlarmType.cs:9
FREEZE = 5 # enuAlarmType.cs:10
WATER = 6 # enuAlarmType.cs:11
DURESS = 7 # enuAlarmType.cs:12
TEMPERATURE = 8 # enuAlarmType.cs:13
CONFIRMED_BURGLARY = 9 # enuAlarmType.cs:14
class UpbLinkAction(IntEnum):
"""UPB link sub-action, the upper byte of a UPB-LINK event word.
The C# code maps these via ``enuButtonType`` — UPBLinkOff/On/Set/
FadeStop and the enum values are picked so they line up with the
on-the-wire upper byte (clsText.cs:1801-1808 + enuEventType.cs:14-19,
where UPB_LINK_OFF=64512, UPB_LINK_ON=64768, UPB_LINK_SET=65024,
UPB_LINK_FADE_STOP=65280 i.e. high-byte = 0xFC, 0xFD, 0xFE, 0xFF).
"""
OFF = 0xFC
ON = 0xFD
SET = 0xFE
FADE_STOP = 0xFF
# --------------------------------------------------------------------------
# Wire-format helpers
# --------------------------------------------------------------------------
def _ensure_system_events(message: Message) -> bytes:
"""Validate that ``message`` is a v2 SystemEvents reply, return its
payload bytes (everything after the opcode).
Reference: clsOLMsgSystemEvents.cs (entire file) the message body
is just ``[opcode][word1_hi][word1_lo][word2_hi][word2_lo]``.
"""
if message.opcode != int(OmniLink2MessageType.SystemEvents):
raise ValueError(
"not a SystemEvents message: opcode "
f"{message.opcode} (expected {int(OmniLink2MessageType.SystemEvents)})"
)
payload = message.payload
if len(payload) % 2 != 0:
# The C# count formula is ``(MessageLength - 1) / 2`` and silently
# truncates a trailing odd byte. We do the same — never raise.
payload = payload[: len(payload) - 1]
return payload
def _iter_event_words(payload: bytes) -> list[int]:
"""Split a SystemEvents payload into 16-bit BE words.
Reference: clsOLMsgSystemEvents.cs:15-18 (SystemEvent(index) accessor).
"""
return [(payload[i] << 8) | payload[i + 1] for i in range(0, len(payload), 2)]
# --------------------------------------------------------------------------
# Base + concrete event classes
# --------------------------------------------------------------------------
@dataclass(frozen=True, slots=True)
class SystemEvent:
"""Base class for every typed system-event object.
Subclasses override ``EVENT_TYPE`` (the discriminator) and may add
extra attributes carrying decoded fields. The original 16-bit word
is always preserved as ``raw_word`` so callers can fall back to the
unstructured value for diagnostics.
"""
event_type: EventType
raw_word: int
EVENT_TYPE: ClassVar[EventType | None] = None
@classmethod
def parse(cls, message: Message) -> list[SystemEvent]:
"""Decode every event word in a v2 SystemEvents message.
Returns a list because a single message can batch multiple events
(clsOLMsgSystemEvents.SystemEventsCount() one count value, but
the protocol allows ``count`` to be > 1).
"""
return parse_events(message)
@dataclass(frozen=True, slots=True)
class UserMacroButton(SystemEvent):
"""A user macro button (1-255) was triggered.
Wire layout: ``(word & 0xFF00) == 0`` button index in low byte.
Reference: clsText.cs:1587-1590, 1697-1698.
"""
EVENT_TYPE: ClassVar[EventType] = EventType.USER_MACRO_BUTTON
button_index: int = 0
@dataclass(frozen=True, slots=True)
class ProLinkMessage(SystemEvent):
"""A Pro-Link message (256..383) was received.
Wire layout: ``(word & 0xFF80) == 0x100`` message index in low 7 bits.
Reference: clsText.cs:1591-1594, 1699-1700.
"""
EVENT_TYPE: ClassVar[EventType] = EventType.PRO_LINK_MESSAGE
message_index: int = 0
@dataclass(frozen=True, slots=True)
class CentraLiteSwitch(SystemEvent):
"""A CentraLite/Aegis scene-keypad button was pressed.
Wire layout: ``(word & 0xFF80) == 0x180`` switch sub-index in low 7 bits.
Reference: clsText.cs:1595-1598, 1701-1736.
"""
EVENT_TYPE: ClassVar[EventType] = EventType.CENTRALITE_SWITCH
switch_index: int = 0
@dataclass(frozen=True, slots=True)
class AlarmActivated(SystemEvent):
"""A real alarm condition was triggered.
Wire layout: ``(word & 0xFF00) == 0x200`` (the ALARM family).
- bits 4-7 of low byte (``(word & 0xF0) >> 4``) enuAlarmType
- bits 0-3 of low byte (``word & 0xF``) area index
(0 means "system-wide alarm, no specific area")
Reference: clsText.cs:1599-1602, 1738-1750.
"""
EVENT_TYPE: ClassVar[EventType] = EventType.ALARM_ACTIVATED
area_index: int = 0
alarm_type: int = 0 # AlarmKind value
@dataclass(frozen=True, slots=True)
class AlarmCleared(SystemEvent):
"""An alarm condition was cleared.
Synthesized the wire word is in the ALARM family but with the
alarm-type nibble equal to ``AlarmKind.ANY`` (0). The C# code does
not have a separate cleared category; it simply formats the word as
an "Any" alarm. We split it out so home-automation callers can react
to "alarm went away" without rebuilding the bitfield themselves.
Reference: clsText.cs:1738-1750 (the ``a`` variable can be 0).
"""
EVENT_TYPE: ClassVar[EventType] = EventType.ALARM_CLEARED
area_index: int = 0
@dataclass(frozen=True, slots=True)
class ZoneStateChanged(SystemEvent):
"""A security zone changed state (open/close, secure/not-ready).
Wire layout: ``(word & 0xFC00) == 0x400`` (ZONE_STATE_CHANGE family).
- low byte zone index 1..255
- bit 9 (``(word >> 8) & 0x02``) 1 = not-ready/open, 0 = secure
Reference: clsText.cs:1603-1606, 1751-1756.
"""
EVENT_TYPE: ClassVar[EventType] = EventType.ZONE_STATE_CHANGED
zone_index: int = 0
new_state: int = 0 # 0=secure, 1=not-ready/open
@property
def is_open(self) -> bool:
return self.new_state != 0
@property
def is_secure(self) -> bool:
return self.new_state == 0
@dataclass(frozen=True, slots=True)
class UnitStateChanged(SystemEvent):
"""A controllable unit (light/output) changed state.
Wire layout: ``(word & 0xFC00) == 0x800`` (UNIT_STATE_CHANGE family).
- unit index = ``((word >> 8) & 1) * 256 + (word & 0xFF)``
(so bit 8 lifts the index above 255)
- bit 9 (``(word >> 8) & 0x02``) 1 = on, 0 = off
Reference: clsText.cs:1607-1610, 1757-1765.
"""
EVENT_TYPE: ClassVar[EventType] = EventType.UNIT_STATE_CHANGED
unit_index: int = 0
new_state: int = 0 # 0=off, 1=on
@property
def is_on(self) -> bool:
return self.new_state != 0
@dataclass(frozen=True, slots=True)
class X10CodeReceived(SystemEvent):
"""An X-10 house/unit code was seen on the powerline.
Wire layout: ``(word & 0xFC00) == 0xC00``.
- house letter A..P = chr(65 + ((word & 0xFF) >> 4))
- unit number 1..16 = (word & 0xF) + 1
- on/off (bit 9) = ``((word >> 8) & 2) == 2`` On
- "all units" (bit 8) = ``(word & 0x100) != 0``
Reference: clsText.cs:1615-1618, 1785-1793.
"""
EVENT_TYPE: ClassVar[EventType] = EventType.X10_CODE
house_code: str = "A"
unit_number: int = 1
is_on: bool = False
all_units: bool = False
@dataclass(frozen=True, slots=True)
class AllOnOff(SystemEvent):
"""An "All On" or "All Off" command was issued.
Wire layout: ``(word & 0xFFE0) == 0x3E0`` (=992 family).
- bit 4 (``(word & 0x10) >> 4``) 0 = all off, 1 = all on
- low 4 bits (``word & 0xF``) area (0 = system-wide)
Reference: clsText.cs:1643-1646, 1835-1851.
"""
EVENT_TYPE: ClassVar[EventType] = EventType.ALL_ON_OFF
area_index: int = 0
on: bool = False
@dataclass(frozen=True, slots=True)
class PhoneLineDead(SystemEvent):
"""Phone line went dead (word == 768).
Reference: clsText.cs:1649, 1853-1857."""
EVENT_TYPE: ClassVar[EventType] = EventType.PHONE_LINE_DEAD
@dataclass(frozen=True, slots=True)
class PhoneLineRinging(SystemEvent):
"""Phone is ringing (word == 769).
Reference: clsText.cs:1651, 1858-1859."""
EVENT_TYPE: ClassVar[EventType] = EventType.PHONE_LINE_RING
@dataclass(frozen=True, slots=True)
class PhoneLineOffHook(SystemEvent):
"""Panel went off-hook to dial out (word == 770).
Reference: clsText.cs:1653, 1860-1861."""
EVENT_TYPE: ClassVar[EventType] = EventType.PHONE_LINE_OFF_HOOK
@dataclass(frozen=True, slots=True)
class PhoneLineOnHook(SystemEvent):
"""Panel hung up (word == 771).
Reference: clsText.cs:1655, 1862-1863."""
EVENT_TYPE: ClassVar[EventType] = EventType.PHONE_LINE_ON_HOOK
@dataclass(frozen=True, slots=True)
class AcLost(SystemEvent):
"""Mains AC was lost (word == 772).
Reference: clsText.cs:1657, 1866-1870."""
EVENT_TYPE: ClassVar[EventType] = EventType.AC_LOST
@dataclass(frozen=True, slots=True)
class AcRestored(SystemEvent):
"""Mains AC came back (word == 773).
Reference: clsText.cs:1659, 1871-1872."""
EVENT_TYPE: ClassVar[EventType] = EventType.AC_RESTORED
@dataclass(frozen=True, slots=True)
class BatteryLow(SystemEvent):
"""Backup battery is low (word == 774).
Reference: clsText.cs:1661, 1875-1879."""
EVENT_TYPE: ClassVar[EventType] = EventType.BATTERY_LOW
@dataclass(frozen=True, slots=True)
class BatteryRestored(SystemEvent):
"""Backup battery is OK again (word == 775).
Reference: clsText.cs:1663, 1880-1881."""
EVENT_TYPE: ClassVar[EventType] = EventType.BATTERY_RESTORED
@dataclass(frozen=True, slots=True)
class DcmTrouble(SystemEvent):
"""Digital communicator failure (word == 776).
Reference: clsText.cs:1665, 1884-1888."""
EVENT_TYPE: ClassVar[EventType] = EventType.DCM_TROUBLE
@dataclass(frozen=True, slots=True)
class DcmOk(SystemEvent):
"""Digital communicator OK (word == 777).
Reference: clsText.cs:1667, 1889-1890."""
EVENT_TYPE: ClassVar[EventType] = EventType.DCM_OK
@dataclass(frozen=True, slots=True)
class EnergyCostChanged(SystemEvent):
"""Real-time energy-cost band changed (words 778..781).
The discriminator on the dataclass tells you which band; ``raw_word``
keeps the original number in case a future firmware adds more.
Reference: clsText.cs:1669-1676, 1893-1903.
"""
EVENT_TYPE: ClassVar[EventType] = EventType.ENERGY_COST_LOW # placeholder, overridden by parse
cost_level: int = 0 # 0=low, 1=mid, 2=high, 3=critical
@dataclass(frozen=True, slots=True)
class CameraTrigger(SystemEvent):
"""A camera input fired (words 782..787).
Wire layout: ``camera_index = word - 781`` 1..6.
Reference: clsText.cs:1677-1683, 1906-1907.
"""
EVENT_TYPE: ClassVar[EventType] = EventType.CAMERA
camera_index: int = 1
@dataclass(frozen=True, slots=True)
class AccessReaderEvent(SystemEvent):
"""An access-control reader emitted an event (words 976..991).
Wire layout: ``reader_index = (word & 0xF) + 1``.
Reference: clsText.cs:1684-1688, 1908-1909.
"""
EVENT_TYPE: ClassVar[EventType] = EventType.ACCESS_READER
reader_index: int = 1
@dataclass(frozen=True, slots=True)
class UpbLinkEvent(SystemEvent):
"""A UPB link command was sent (words 0xFC00..0xFFFF).
Wire layout: ``(word & 0xFC00) == 0xFC00``.
- upper byte enuButtonType: UPBLinkOff(0xFC), UPBLinkOn(0xFD),
UPBLinkSet(0xFE), UPBLinkFadeStop(0xFF)
- lower byte link index 1..255
Reference: clsText.cs:1635-1638, 1795-1810.
"""
EVENT_TYPE: ClassVar[EventType] = EventType.UPB_LINK
link_index: int = 0
action: int = 0 # UpbLinkAction value (raw upper byte)
@dataclass(frozen=True, slots=True)
class ArmingChanged(SystemEvent):
"""An area's security mode changed (catch-all SECURITY_MODE_CHANGE).
Wire layout (clsText.cs:2155-2217 this is the default branch of
``GetButtonText``, which is what GetEventCategory routes to when the
event word doesn't match any other family):
- bits 12-14 (``(word >> 12) & 7``) enuSecurityMode value
- bits 8-11 (``(word >> 8) & 0xF``) area index (0 = system)
- low byte (``word & 0xFF``) user/code index that
triggered the change (0 = unknown / panel-initiated)
- bit 15 (``(word >> 8) & 0x80``) "Set" vs. "Arm" verb,
surfaced as ``is_set_command``
- bit 11 (``(word >> 8) & 0x40``)... reserved-ish; left as raw
Reference: clsText.cs:1689 (catch-all) + 2140-2217 (decoder).
"""
EVENT_TYPE: ClassVar[EventType] = EventType.ARMING_CHANGED
area_index: int = 0
new_mode: int = 0 # SecurityMode value
user_index: int = 0
is_set_command: bool = False # True for SET (panic/Lumina), False for ARM
@property
def mode_name(self) -> str:
try:
return SecurityMode(self.new_mode).name
except ValueError:
return f"Unknown({self.new_mode})"
@dataclass(frozen=True, slots=True)
class UnknownEvent(SystemEvent):
"""Catch-all so an unrecognised event word never crashes the iterator.
The event type byte was parseable but didn't match any family in
clsText.GetEventCategory's classification — likely a future-firmware
code we haven't mapped yet.
"""
EVENT_TYPE: ClassVar[EventType] = EventType.UNKNOWN
# --------------------------------------------------------------------------
# Per-word classifier (mirrors clsText.GetEventCategory)
# --------------------------------------------------------------------------
def _classify(word: int) -> SystemEvent:
"""Decode a single 16-bit event word into the appropriate subclass.
Mirrors ``clsText.GetEventCategory`` (clsText.cs:1585-1690) and the
per-family field-extraction in ``GetEventText`` (clsText.cs:1693-1911).
The classification order matters exact-match cases (PHONE_, AC_,
BATTERY_, ) are inspected before the wide SECURITY_MODE_CHANGE
catch-all, exactly as the C# does.
"""
# USER_MACRO_BUTTON: high byte == 0
if (word & 0xFF00) == 0x0000:
return UserMacroButton(
event_type=EventType.USER_MACRO_BUTTON,
raw_word=word,
button_index=word & 0xFF,
)
# PRO_LINK_MESSAGE: high 9 bits == 0x100
if (word & 0xFF80) == 0x0100:
return ProLinkMessage(
event_type=EventType.PRO_LINK_MESSAGE,
raw_word=word,
message_index=word & 0x7F,
)
# CENTRALITE_SWITCH: high 9 bits == 0x180
if (word & 0xFF80) == 0x0180:
return CentraLiteSwitch(
event_type=EventType.CENTRALITE_SWITCH,
raw_word=word,
switch_index=word & 0x7F,
)
# ALARM (activated/cleared): top byte == 0x02
if (word & 0xFF00) == 0x0200:
alarm_type = (word & 0xF0) >> 4
area = word & 0x0F
if alarm_type == int(AlarmKind.ANY):
# Per clsText.cs:1738-1750, the "ANY" subtype is what the panel
# emits when an alarm is being cleared — it formats it as
# "Any alarm cleared" string. Surface as a distinct subclass.
return AlarmCleared(
event_type=EventType.ALARM_CLEARED,
raw_word=word,
area_index=area,
)
return AlarmActivated(
event_type=EventType.ALARM_ACTIVATED,
raw_word=word,
area_index=area,
alarm_type=alarm_type,
)
# ZONE_STATE_CHANGE: top 6 bits == 0x4
if (word & 0xFC00) == 0x0400:
return ZoneStateChanged(
event_type=EventType.ZONE_STATE_CHANGED,
raw_word=word,
zone_index=word & 0xFF,
new_state=1 if ((word >> 8) & 0x02) == 0x02 else 0,
)
# UNIT_STATE_CHANGE: top 6 bits == 0x8
if (word & 0xFC00) == 0x0800:
unit_index = ((word >> 8) & 0x01) * 256 + (word & 0xFF)
return UnitStateChanged(
event_type=EventType.UNIT_STATE_CHANGED,
raw_word=word,
unit_index=unit_index,
new_state=1 if ((word >> 8) & 0x02) == 0x02 else 0,
)
# X-10 code: top 6 bits == 0xC
if (word & 0xFC00) == 0x0C00:
return X10CodeReceived(
event_type=EventType.X10_CODE,
raw_word=word,
house_code=chr(65 + ((word & 0xFF) >> 4)),
unit_number=(word & 0x0F) + 1,
is_on=((word >> 8) & 0x02) == 0x02,
all_units=(word & 0x0100) != 0,
)
# ALL_ON_OFF: top 11 bits == 992 (0x3E0) — covers 992..1023, but
# we leave 1024+ for ZONE which has already been handled above.
if (word & 0xFFE0) == 0x03E0:
return AllOnOff(
event_type=EventType.ALL_ON_OFF,
raw_word=word,
area_index=word & 0x0F,
on=(word & 0x10) != 0,
)
# Exact-match singletons (PHONE_, AC_, BATTERY_, DCM_, ENERGY, CAMERA,
# ACCESS_READER) come before the SECURITY catch-all.
if word == 768:
return PhoneLineDead(event_type=EventType.PHONE_LINE_DEAD, raw_word=word)
if word == 769:
return PhoneLineRinging(event_type=EventType.PHONE_LINE_RING, raw_word=word)
if word == 770:
return PhoneLineOffHook(event_type=EventType.PHONE_LINE_OFF_HOOK, raw_word=word)
if word == 771:
return PhoneLineOnHook(event_type=EventType.PHONE_LINE_ON_HOOK, raw_word=word)
if word == 772:
return AcLost(event_type=EventType.AC_LOST, raw_word=word)
if word == 773:
return AcRestored(event_type=EventType.AC_RESTORED, raw_word=word)
if word == 774:
return BatteryLow(event_type=EventType.BATTERY_LOW, raw_word=word)
if word == 775:
return BatteryRestored(event_type=EventType.BATTERY_RESTORED, raw_word=word)
if word == 776:
return DcmTrouble(event_type=EventType.DCM_TROUBLE, raw_word=word)
if word == 777:
return DcmOk(event_type=EventType.DCM_OK, raw_word=word)
if 778 <= word <= 781:
level = word - 778
return EnergyCostChanged(
event_type=EventType(EventType.ENERGY_COST_LOW + level),
raw_word=word,
cost_level=level,
)
if 782 <= word <= 787:
return CameraTrigger(
event_type=EventType.CAMERA,
raw_word=word,
camera_index=word - 781,
)
if 976 <= word <= 991:
return AccessReaderEvent(
event_type=EventType.ACCESS_READER,
raw_word=word,
reader_index=(word & 0x0F) + 1,
)
# UPB_LINK: top 6 bits == 0xFC (covers 0xFC00..0xFFFF).
# The C# code peels off the 0xFD00 (UPB_LINK_ON) sub-family first to
# check whether the unit is actually an HLC or Z-Wave room controller
# (clsText.cs:1619-1633). We don't have access to the panel's unit
# type cache here, so we always classify these as UpbLinkEvent — the
# caller can refine using the unit_index if they care.
if (word & 0xFC00) == 0xFC00:
upper = (word >> 8) & 0xFF
return UpbLinkEvent(
event_type=EventType.UPB_LINK,
raw_word=word,
link_index=word & 0xFF,
action=upper,
)
# SECURITY_MODE_CHANGE catch-all (clsText.cs:1689). This is the
# default branch in GetEventCategory: anything that didn't match any
# of the families above lands here. The 16-bit layout is:
# bits 12-14 → SecurityMode (0..7)
# bits 8-11 → area index (0 = system / no specific area)
# bit 15 → "Set" vs. "Arm" verb (Lumina vs. Omni semantics)
# low byte → user/code index that triggered the change
if (word >> 8) & 0xF0:
# Plausible arming change: the high nibble of the high byte is
# non-zero (carries either the Set bit or the area+mode bits).
return ArmingChanged(
event_type=EventType.ARMING_CHANGED,
raw_word=word,
area_index=(word >> 8) & 0x0F,
new_mode=(word >> 12) & 0x07,
user_index=word & 0xFF,
is_set_command=((word >> 8) & 0x80) == 0x80,
)
return UnknownEvent(event_type=EventType.UNKNOWN, raw_word=word)
# --------------------------------------------------------------------------
# Public parse entry points
# --------------------------------------------------------------------------
def parse_events(message: Message) -> list[SystemEvent]:
"""Decode a v2 ``SystemEvents`` (opcode 55) message into typed events.
The panel batches multiple state changes into a single message, so
the return type is always a list even for messages that carry just
one event. Empty SystemEvents messages return an empty list rather
than raising.
Reference: clsOLMsgSystemEvents.cs:10-18 (SystemEventsCount + per-
word accessor).
"""
payload = _ensure_system_events(message)
return [_classify(w) for w in _iter_event_words(payload)]
# --------------------------------------------------------------------------
# Registry — discriminator → subclass, useful for callers doing isinstance
# routing or generating documentation.
# --------------------------------------------------------------------------
EVENT_REGISTRY: dict[int, type[SystemEvent]] = {
int(EventType.USER_MACRO_BUTTON): UserMacroButton,
int(EventType.PRO_LINK_MESSAGE): ProLinkMessage,
int(EventType.CENTRALITE_SWITCH): CentraLiteSwitch,
int(EventType.ALARM_ACTIVATED): AlarmActivated,
int(EventType.ALARM_CLEARED): AlarmCleared,
int(EventType.ZONE_STATE_CHANGED): ZoneStateChanged,
int(EventType.UNIT_STATE_CHANGED): UnitStateChanged,
int(EventType.X10_CODE): X10CodeReceived,
int(EventType.ALL_ON_OFF): AllOnOff,
int(EventType.PHONE_LINE_DEAD): PhoneLineDead,
int(EventType.PHONE_LINE_RING): PhoneLineRinging,
int(EventType.PHONE_LINE_OFF_HOOK): PhoneLineOffHook,
int(EventType.PHONE_LINE_ON_HOOK): PhoneLineOnHook,
int(EventType.AC_LOST): AcLost,
int(EventType.AC_RESTORED): AcRestored,
int(EventType.BATTERY_LOW): BatteryLow,
int(EventType.BATTERY_RESTORED): BatteryRestored,
int(EventType.DCM_TROUBLE): DcmTrouble,
int(EventType.DCM_OK): DcmOk,
int(EventType.ENERGY_COST_LOW): EnergyCostChanged,
int(EventType.ENERGY_COST_MID): EnergyCostChanged,
int(EventType.ENERGY_COST_HIGH): EnergyCostChanged,
int(EventType.ENERGY_COST_CRITICAL): EnergyCostChanged,
int(EventType.CAMERA): CameraTrigger,
int(EventType.ACCESS_READER): AccessReaderEvent,
int(EventType.UPB_LINK): UpbLinkEvent,
int(EventType.ARMING_CHANGED): ArmingChanged,
int(EventType.UNKNOWN): UnknownEvent,
}
# --------------------------------------------------------------------------
# Helper: queue-backed iterator for tests + library consumers
# --------------------------------------------------------------------------
def _has_unsolicited(obj: object) -> bool:
"""True if ``obj`` quacks like an OmniConnection (has ``unsolicited()``).
We avoid importing OmniConnection at runtime to keep the dependency
purely a type hint, so EventStream stays usable with any object that
exposes the same async-iterator contract (real connection, mock, or
in-memory queue wrapper used in tests).
"""
return callable(getattr(obj, "unsolicited", None))
@dataclass
class EventStream:
"""Async iterator over typed ``SystemEvent`` objects.
Wraps any object with an ``unsolicited() -> AsyncIterator[Message]``
method (typically an :class:`OmniConnection`). Filters out non-
SystemEvents messages, parses each SystemEvents message into a list
of typed events, and yields them one at a time. A single inbound
message that batches three events therefore produces three iterator
steps callers don't have to know about batching.
Usage::
async for event in EventStream(conn):
match event:
case ZoneStateChanged() if event.is_open:
print(f"zone {event.zone_index} opened")
case ArmingChanged():
print(f"area {event.area_index} -> {event.mode_name}")
"""
source: object # OmniConnection or duck-typed equivalent
_buffer: list[SystemEvent] = field(default_factory=list)
def __post_init__(self) -> None:
if not _has_unsolicited(self.source):
raise TypeError(
"EventStream source must expose an unsolicited() method "
f"(got {type(self.source).__name__})"
)
def __aiter__(self) -> EventStream:
return self
async def __anext__(self) -> SystemEvent:
# Drain buffered events from the previous batched message first.
while not self._buffer:
try:
# ``unsolicited()`` returns a fresh async generator each
# call on the real connection, but tests pass us a queue
# wrapper that returns a long-lived iterator. Either way
# we want one message at a time, so we manually advance.
if not hasattr(self, "_iter") or self._iter is None: # type: ignore[has-type]
self._iter = self.source.unsolicited().__aiter__() # type: ignore[attr-defined]
msg = await self._iter.__anext__()
except StopAsyncIteration:
raise
except asyncio.CancelledError:
raise
if msg.opcode != int(OmniLink2MessageType.SystemEvents):
# Non-event message (Status, Ack, …) — silently ignore.
continue
self._buffer = parse_events(msg)
return self._buffer.pop(0)
__all__ = [
"EVENT_REGISTRY",
"AcLost",
"AcRestored",
"AccessReaderEvent",
"AlarmActivated",
"AlarmCleared",
"AlarmKind",
"AllOnOff",
"ArmingChanged",
"BatteryLow",
"BatteryRestored",
"CameraTrigger",
"CentraLiteSwitch",
"DcmOk",
"DcmTrouble",
"EnergyCostChanged",
"EventStream",
"EventType",
"PhoneLineDead",
"PhoneLineOffHook",
"PhoneLineOnHook",
"PhoneLineRinging",
"ProLinkMessage",
"SystemEvent",
"UnitStateChanged",
"UnknownEvent",
"UpbLinkAction",
"UpbLinkEvent",
"UserMacroButton",
"X10CodeReceived",
"ZoneStateChanged",
"parse_events",
]

519
tests/test_commands.py Normal file
View File

@ -0,0 +1,519 @@
"""Unit tests for command opcodes and status range queries.
The tests use a captured-payload approach: we monkey-patch
``OmniClient._conn.request`` so it records the (opcode, payload) pair
that the client would emit and returns whatever canned ``Message``
the test wants. No network involved just round-trip the bytes.
Conventions:
* Each test pins the exact wire bytes the client should produce, so
that any future refactor that rearranges the payload layout is
caught immediately.
* Where a single command can be expressed as a high-level helper
(``turn_unit_on``) we still verify the underlying ``Command``
enum value and the param1/param2 byte placement, not just the
success path.
"""
from __future__ import annotations
import struct
from collections.abc import Callable
from dataclasses import dataclass
import pytest
from omni_pca.client import OmniClient
from omni_pca.commands import Command, CommandFailedError, SecurityCommandResponse
from omni_pca.message import Message, encode_v2
from omni_pca.models import (
AreaStatus,
FanMode,
HoldMode,
HvacMode,
ObjectType,
SecurityMode,
ThermostatStatus,
UnitStatus,
ZoneStatus,
)
from omni_pca.opcodes import OmniLink2MessageType
# --------------------------------------------------------------------------
# Test scaffolding: a stub OmniClient that captures requests instead of
# sending them. We bypass __init__ (which builds an OmniConnection) by
# using object.__new__ + manually setting the _conn attr to our stub.
# --------------------------------------------------------------------------
@dataclass
class _RecordedRequest:
opcode: int
payload: bytes
class _StubConn:
"""Stand-in for OmniConnection; .request() captures + returns a canned reply."""
def __init__(
self,
reply_factory: Callable[[int, bytes], Message] | None = None,
) -> None:
self.calls: list[_RecordedRequest] = []
self._reply_factory = reply_factory or self._default_ack
@staticmethod
def _default_ack(_opcode: int, _payload: bytes) -> Message:
return encode_v2(OmniLink2MessageType.Ack)
async def request(
self,
opcode: OmniLink2MessageType | int,
payload: bytes = b"",
timeout: float | None = None,
) -> Message:
del timeout # mirror the OmniConnection.request signature; unused here
op_int = int(opcode)
self.calls.append(_RecordedRequest(opcode=op_int, payload=bytes(payload)))
return self._reply_factory(op_int, bytes(payload))
def _make_client(stub: _StubConn) -> OmniClient:
"""Build an OmniClient with a stubbed connection (no socket, no handshake)."""
client = object.__new__(OmniClient)
client._conn = stub # type: ignore[attr-defined]
client._subscriber_task = None # type: ignore[attr-defined]
return client
# --------------------------------------------------------------------------
# Command enum value pins. These guard against accidental renumbering.
# --------------------------------------------------------------------------
def test_command_enum_pins_unit_values() -> None:
assert Command.UNIT_OFF == 0
assert Command.UNIT_ON == 1
assert Command.UNIT_LEVEL == 9
assert Command.BYPASS_ZONE == 4
assert Command.RESTORE_ZONE == 5
def test_command_enum_pins_thermostat_values() -> None:
# enuUnitCommand.SetLowSetPt (line 71) → 66 (heat)
assert Command.SET_THERMOSTAT_HEAT_SETPOINT == 66
# enuUnitCommand.SetHighSetPt (line 72) → 67 (cool)
assert Command.SET_THERMOSTAT_COOL_SETPOINT == 67
# enuUnitCommand.Mode/Fan/Hold (lines 73/74/75)
assert Command.SET_THERMOSTAT_SYSTEM_MODE == 68
assert Command.SET_THERMOSTAT_FAN_MODE == 69
assert Command.SET_THERMOSTAT_HOLD_MODE == 70
def test_command_enum_pins_message_and_program_values() -> None:
assert Command.SHOW_MESSAGE_WITH_BEEP == 80
assert Command.LOG_MESSAGE == 81
assert Command.CLEAR_MESSAGE == 82
assert Command.SHOW_MESSAGE_NO_BEEP == 86
assert Command.EXECUTE_BUTTON == 7
assert Command.EXECUTE_PROGRAM == 104
def test_security_command_response_enum_pins() -> None:
assert SecurityCommandResponse.SUCCESS == 0
assert SecurityCommandResponse.INVALID_CODE == 1
assert SecurityCommandResponse.INVALID_AREA == 3
assert SecurityCommandResponse.CODE_LOCKED_OUT == 6
# --------------------------------------------------------------------------
# execute_command() — generic Command opcode (20)
# --------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_execute_command_payload_layout() -> None:
stub = _StubConn()
client = _make_client(stub)
# Pretend we're flipping unit #257 ON (parameter2 needs both bytes).
await client.execute_command(Command.UNIT_ON, parameter1=0, parameter2=257)
assert len(stub.calls) == 1
call = stub.calls[0]
assert call.opcode == int(OmniLink2MessageType.Command)
# Command (1) + p1 (1 byte) + p2 (BE u16) = 4 bytes
assert call.payload == bytes([1, 0, 0x01, 0x01])
@pytest.mark.asyncio
async def test_execute_command_packs_param1_byte_and_param2_be_u16() -> None:
stub = _StubConn()
client = _make_client(stub)
await client.execute_command(
Command.UNIT_LEVEL, parameter1=75, parameter2=0xABCD
)
payload = stub.calls[0].payload
# [cmd=9, p1=75, p2_hi=0xAB, p2_lo=0xCD]
assert payload == bytes([9, 75, 0xAB, 0xCD])
@pytest.mark.asyncio
async def test_execute_command_validates_param_ranges() -> None:
stub = _StubConn()
client = _make_client(stub)
with pytest.raises(ValueError, match="parameter1"):
await client.execute_command(Command.UNIT_ON, parameter1=256)
with pytest.raises(ValueError, match="parameter2"):
await client.execute_command(Command.UNIT_ON, parameter2=0x10000)
# No request emitted on validation failure.
assert stub.calls == []
@pytest.mark.asyncio
async def test_execute_command_raises_on_nak() -> None:
def nak_reply(_op: int, _pl: bytes) -> Message:
return encode_v2(OmniLink2MessageType.Nak)
stub = _StubConn(reply_factory=nak_reply)
client = _make_client(stub)
with pytest.raises(CommandFailedError, match="NAK"):
await client.execute_command(Command.UNIT_ON, parameter2=1)
# --------------------------------------------------------------------------
# Convenience wrappers over execute_command
# --------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_turn_unit_on_off_emits_correct_command_byte() -> None:
stub = _StubConn()
client = _make_client(stub)
await client.turn_unit_on(5)
await client.turn_unit_off(5)
assert stub.calls[0].payload == bytes([1, 0, 0, 5]) # UNIT_ON
assert stub.calls[1].payload == bytes([0, 0, 0, 5]) # UNIT_OFF
@pytest.mark.asyncio
async def test_set_unit_level_validates_range_and_emits_level() -> None:
stub = _StubConn()
client = _make_client(stub)
await client.set_unit_level(3, 50)
assert stub.calls[0].payload == bytes([9, 50, 0, 3])
with pytest.raises(ValueError, match=r"0\.\.100"):
await client.set_unit_level(3, 101)
@pytest.mark.asyncio
async def test_bypass_and_restore_zone_emit_correct_payload() -> None:
stub = _StubConn()
client = _make_client(stub)
await client.bypass_zone(12, code=2)
await client.restore_zone(12, code=2)
assert stub.calls[0].payload == bytes([4, 2, 0, 12]) # BYPASS_ZONE
assert stub.calls[1].payload == bytes([5, 2, 0, 12]) # RESTORE_ZONE
@pytest.mark.asyncio
async def test_set_thermostat_modes_emit_correct_payloads() -> None:
stub = _StubConn()
client = _make_client(stub)
await client.set_thermostat_system_mode(2, HvacMode.COOL)
await client.set_thermostat_fan_mode(2, FanMode.ON)
await client.set_thermostat_hold_mode(2, HoldMode.HOLD)
assert stub.calls[0].payload == bytes([68, int(HvacMode.COOL), 0, 2])
assert stub.calls[1].payload == bytes([69, int(FanMode.ON), 0, 2])
assert stub.calls[2].payload == bytes([70, int(HoldMode.HOLD), 0, 2])
@pytest.mark.asyncio
async def test_set_thermostat_setpoints_use_raw_byte() -> None:
stub = _StubConn()
client = _make_client(stub)
await client.set_thermostat_heat_setpoint_raw(1, 140) # ~70 °F
await client.set_thermostat_cool_setpoint_raw(1, 160) # ~80 °F
assert stub.calls[0].payload == bytes([66, 140, 0, 1])
assert stub.calls[1].payload == bytes([67, 160, 0, 1])
@pytest.mark.asyncio
async def test_button_program_message_helpers() -> None:
stub = _StubConn()
client = _make_client(stub)
await client.execute_button(4)
await client.execute_program(7)
await client.show_message(2, beep=True)
await client.show_message(2, beep=False)
await client.clear_message(2)
assert stub.calls[0].payload == bytes([7, 0, 0, 4]) # EXECUTE_BUTTON
assert stub.calls[1].payload == bytes([104, 0, 0, 7]) # EXECUTE_PROGRAM
assert stub.calls[2].payload == bytes([80, 0, 0, 2]) # SHOW_MSG_BEEP
assert stub.calls[3].payload == bytes([86, 0, 0, 2]) # SHOW_MSG_NOBEEP
assert stub.calls[4].payload == bytes([82, 0, 0, 2]) # CLEAR_MESSAGE
# --------------------------------------------------------------------------
# execute_security_command (opcode 74)
# --------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_execute_security_command_payload_encoding_away_1234() -> None:
"""The C# code packs the 4-digit code as four separate digit bytes."""
stub = _StubConn(
reply_factory=lambda _op, _pl: encode_v2(
OmniLink2MessageType.ExecuteSecurityCommandResponse,
bytes([SecurityCommandResponse.SUCCESS]),
)
)
client = _make_client(stub)
result = await client.execute_security_command(
area=1, mode=SecurityMode.AWAY, code=1234
)
assert result is None
payload = stub.calls[0].payload
# area, mode, d1, d2, d3, d4
assert payload == bytes([1, int(SecurityMode.AWAY), 1, 2, 3, 4])
assert stub.calls[0].opcode == int(
OmniLink2MessageType.ExecuteSecurityCommand
)
@pytest.mark.asyncio
async def test_execute_security_command_pads_short_codes_with_zeros() -> None:
"""Code 7 → digits 0,0,0,7 (matches the C# arithmetic)."""
stub = _StubConn(
reply_factory=lambda _op, _pl: encode_v2(OmniLink2MessageType.Ack)
)
client = _make_client(stub)
await client.execute_security_command(
area=2, mode=SecurityMode.OFF, code=7
)
assert stub.calls[0].payload == bytes([2, 0, 0, 0, 0, 7])
@pytest.mark.asyncio
async def test_execute_security_command_failure_raises_with_code() -> None:
def reply(_op: int, _pl: bytes) -> Message:
return encode_v2(
OmniLink2MessageType.ExecuteSecurityCommandResponse,
bytes([SecurityCommandResponse.INVALID_CODE]),
)
stub = _StubConn(reply_factory=reply)
client = _make_client(stub)
with pytest.raises(CommandFailedError) as ei:
await client.execute_security_command(
area=1, mode=SecurityMode.AWAY, code=9999
)
assert ei.value.failure_code == int(SecurityCommandResponse.INVALID_CODE)
assert "INVALID_CODE" in str(ei.value)
@pytest.mark.asyncio
async def test_execute_security_command_validates_inputs() -> None:
stub = _StubConn()
client = _make_client(stub)
with pytest.raises(ValueError, match="area"):
await client.execute_security_command(
area=0, mode=SecurityMode.AWAY, code=1234
)
with pytest.raises(ValueError, match="code"):
await client.execute_security_command(
area=1, mode=SecurityMode.AWAY, code=10000
)
assert stub.calls == []
# --------------------------------------------------------------------------
# acknowledge_alerts (opcode 60)
# --------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_acknowledge_alerts_sends_no_payload_and_expects_ack() -> None:
stub = _StubConn()
client = _make_client(stub)
await client.acknowledge_alerts()
assert stub.calls[0].opcode == int(OmniLink2MessageType.AcknowledgeAlerts)
assert stub.calls[0].payload == b""
# --------------------------------------------------------------------------
# get_object_status (opcode 34/35) — request payload + reply parsing
# --------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_object_status_builds_request_payload() -> None:
"""RequestStatus is [object_type, start_hi, start_lo, end_hi, end_lo]."""
captured: list[bytes] = []
def reply(_op: int, payload: bytes) -> Message:
captured.append(payload)
# Reply with one zone record: number=3, status=0x10, loop=200.
body = bytes([int(ObjectType.ZONE), 0, 3, 0x10, 200])
return encode_v2(OmniLink2MessageType.Status, body)
stub = _StubConn(reply_factory=reply)
client = _make_client(stub)
zones = await client.get_object_status(ObjectType.ZONE, 3)
assert captured[0] == struct.pack(">BHH", int(ObjectType.ZONE), 3, 3)
assert len(zones) == 1
z = zones[0]
assert isinstance(z, ZoneStatus)
assert z.index == 3
assert z.raw_status == 0x10
assert z.loop == 200
@pytest.mark.asyncio
async def test_get_object_status_parses_multiple_unit_records() -> None:
"""Unit records are 5 bytes each (clsOL2MsgStatus.cs:17)."""
def reply(_op: int, _pl: bytes) -> Message:
# object_type byte + two 5-byte unit records.
records = (
bytes([0, 1, 1, 0, 0]) # unit 1, state=1 (On)
+ bytes([0, 2, 100, 0, 0]) # unit 2, state=100 (level 0%)
)
return encode_v2(
OmniLink2MessageType.Status,
bytes([int(ObjectType.UNIT)]) + records,
)
stub = _StubConn(reply_factory=reply)
client = _make_client(stub)
units = await client.get_object_status(ObjectType.UNIT, 1, 2)
assert len(units) == 2
assert all(isinstance(u, UnitStatus) for u in units)
u1, u2 = units
assert u1.index == 1
assert u1.state == 1
assert u2.index == 2
assert u2.state == 100
@pytest.mark.asyncio
async def test_get_object_status_returns_empty_on_eod() -> None:
stub = _StubConn(
reply_factory=lambda _op, _pl: encode_v2(OmniLink2MessageType.EOD)
)
client = _make_client(stub)
out = await client.get_object_status(ObjectType.AREA, 99)
assert out == []
@pytest.mark.asyncio
async def test_get_object_status_raises_on_nak() -> None:
stub = _StubConn(
reply_factory=lambda _op, _pl: encode_v2(OmniLink2MessageType.Nak)
)
client = _make_client(stub)
with pytest.raises(CommandFailedError, match="NAK"):
await client.get_object_status(ObjectType.ZONE, 1)
# --------------------------------------------------------------------------
# get_extended_status (opcode 58/59) — header has object_length byte
# --------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_extended_status_request_layout_matches_spec() -> None:
captured: list[bytes] = []
def reply(_op: int, payload: bytes) -> Message:
captured.append(payload)
# Reply: object_type, object_length=4, then one zone record (4 bytes).
body = bytes([int(ObjectType.ZONE), 4]) + bytes([0, 5, 0x00, 100])
return encode_v2(OmniLink2MessageType.ExtendedStatus, body)
stub = _StubConn(reply_factory=reply)
client = _make_client(stub)
zones = await client.get_extended_status(ObjectType.ZONE, 5, 5)
assert captured[0] == struct.pack(">BHH", int(ObjectType.ZONE), 5, 5)
assert stub.calls[0].opcode == int(
OmniLink2MessageType.RequestExtendedStatus
)
assert len(zones) == 1
assert zones[0].index == 5 # type: ignore[union-attr]
@pytest.mark.asyncio
async def test_get_extended_status_uses_object_length_byte_for_record_size() -> None:
"""ExtendedStatus thermostat record is 14 bytes (clsOL2MsgExtendedStatus.cs:138-235)."""
record = bytes(
[
0, 1, # number = 1
0, # status
140, # current temp raw
120, 160, # heat / cool setpoints
int(HvacMode.AUTO),
int(FanMode.AUTO),
int(HoldMode.OFF),
150, # humidity raw
120, 160, # humidify / dehumidify setpoints
130, # outdoor temp raw
1, # H or C status
]
)
assert len(record) == 14
def reply(_op: int, _pl: bytes) -> Message:
body = bytes([int(ObjectType.THERMOSTAT), 14]) + record
return encode_v2(OmniLink2MessageType.ExtendedStatus, body)
stub = _StubConn(reply_factory=reply)
client = _make_client(stub)
out = await client.get_extended_status(ObjectType.THERMOSTAT, 1)
assert len(out) == 1
t = out[0]
assert isinstance(t, ThermostatStatus)
assert t.index == 1
assert t.temperature_raw == 140
assert t.system_mode == int(HvacMode.AUTO)
@pytest.mark.asyncio
async def test_get_extended_status_returns_empty_on_eod() -> None:
stub = _StubConn(
reply_factory=lambda _op, _pl: encode_v2(OmniLink2MessageType.EOD)
)
client = _make_client(stub)
out = await client.get_extended_status(ObjectType.AREA, 1, 8)
assert out == []
@pytest.mark.asyncio
async def test_get_extended_status_area_record_parses_to_areastatus() -> None:
"""Area ExtendedStatus record is 6 bytes
(clsOL2MsgExtendedStatus.cs:75-118): number(2) + mode + alarms + entry +
exit (matches our AreaStatus.parse).
"""
def reply(_op: int, _pl: bytes) -> Message:
# area 1, mode AWAY, alarms 0, entry 0, exit 30
record = bytes([0, 1, int(SecurityMode.AWAY), 0, 0, 30])
body = bytes([int(ObjectType.AREA), 6]) + record
return encode_v2(OmniLink2MessageType.ExtendedStatus, body)
stub = _StubConn(reply_factory=reply)
client = _make_client(stub)
out = await client.get_extended_status(ObjectType.AREA, 1)
assert len(out) == 1
a = out[0]
assert isinstance(a, AreaStatus)
assert a.index == 1
assert a.mode == int(SecurityMode.AWAY)
assert a.exit_timer_secs == 30
@pytest.mark.asyncio
async def test_get_object_status_validates_range() -> None:
stub = _StubConn()
client = _make_client(stub)
with pytest.raises(ValueError, match="end"):
await client.get_object_status(ObjectType.ZONE, 5, 3)
assert stub.calls == []

405
tests/test_events.py Normal file
View File

@ -0,0 +1,405 @@
"""Tests for ``omni_pca.events`` — typed system-event parsing.
The test data is hand-built each helper constructs a synthetic v2
``SystemEvents`` (opcode 55) ``Message`` containing one or more 16-bit
event words encoded in the panel's wire layout. Cross-reference the
bit-mask comments below against ``clsText.GetEventCategory``
(clsText.cs:1585-1690) and ``GetEventText`` (clsText.cs:1693-1911).
"""
from __future__ import annotations
import asyncio
import pytest
from omni_pca.events import (
EVENT_REGISTRY,
AccessReaderEvent,
AcLost,
AcRestored,
AlarmActivated,
AlarmCleared,
AlarmKind,
AllOnOff,
ArmingChanged,
BatteryLow,
BatteryRestored,
CameraTrigger,
DcmOk,
DcmTrouble,
EnergyCostChanged,
EventStream,
EventType,
PhoneLineDead,
PhoneLineOffHook,
PhoneLineOnHook,
PhoneLineRinging,
SystemEvent,
UnitStateChanged,
UnknownEvent,
UpbLinkAction,
UpbLinkEvent,
UserMacroButton,
X10CodeReceived,
ZoneStateChanged,
parse_events,
)
from omni_pca.message import START_CHAR_V2, Message
from omni_pca.opcodes import OmniLink2MessageType
# --------------------------------------------------------------------------
# helpers
# --------------------------------------------------------------------------
def _make_events_message(*words: int) -> Message:
"""Build a v2 SystemEvents (opcode 55) Message containing the given
16-bit event words, encoded big-endian on the wire."""
payload = bytearray()
for w in words:
payload.append((w >> 8) & 0xFF)
payload.append(w & 0xFF)
data = bytes([int(OmniLink2MessageType.SystemEvents)]) + bytes(payload)
return Message(start_char=START_CHAR_V2, data=data)
# --------------------------------------------------------------------------
# Per-subclass parse tests — one event per message
# --------------------------------------------------------------------------
def test_parse_user_macro_button() -> None:
msg = _make_events_message(0x0042)
events = parse_events(msg)
assert len(events) == 1
ev = events[0]
assert isinstance(ev, UserMacroButton)
assert ev.button_index == 0x42
assert ev.event_type is EventType.USER_MACRO_BUTTON
assert ev.raw_word == 0x0042
def test_parse_alarm_activated_burglary_area_3() -> None:
# ALARM family = 0x0200; (alarm_type=Burglary=1) << 4 | area=3 = 0x13
word = 0x0200 | (int(AlarmKind.BURGLARY) << 4) | 0x03
[ev] = parse_events(_make_events_message(word))
assert isinstance(ev, AlarmActivated)
assert ev.alarm_type == AlarmKind.BURGLARY
assert ev.area_index == 3
def test_parse_alarm_cleared_when_alarm_type_zero() -> None:
# ALARM family with alarm_type=ANY(0): we surface as a cleared event.
word = 0x0200 | (int(AlarmKind.ANY) << 4) | 0x05
[ev] = parse_events(_make_events_message(word))
assert isinstance(ev, AlarmCleared)
assert ev.area_index == 5
def test_parse_zone_state_changed_open() -> None:
# ZONE family = 0x0400; bit 9 (0x0200) set ⇒ not-ready/open; zone 17.
word = 0x0400 | 0x0200 | 17
[ev] = parse_events(_make_events_message(word))
assert isinstance(ev, ZoneStateChanged)
assert ev.zone_index == 17
assert ev.is_open
assert not ev.is_secure
assert ev.new_state == 1
def test_parse_zone_state_changed_secure() -> None:
word = 0x0400 | 23 # bit 9 clear ⇒ secure
[ev] = parse_events(_make_events_message(word))
assert isinstance(ev, ZoneStateChanged)
assert ev.zone_index == 23
assert ev.is_secure
def test_parse_unit_state_changed_high_index_on() -> None:
# UNIT family = 0x0800; index 300 = bit 8 set + low byte 44; bit 9 = on.
# 300 = 256 + 44; bit 8 of high byte == ((1<<8))=0x100; OR bit 9 (0x200).
word = 0x0800 | 0x0200 | 0x0100 | 44 # high-byte bit 0 (=0x100) + bit 1 (=0x200)
[ev] = parse_events(_make_events_message(word))
assert isinstance(ev, UnitStateChanged)
assert ev.unit_index == 300
assert ev.is_on
assert ev.new_state == 1
def test_parse_unit_state_changed_low_index_off() -> None:
word = 0x0800 | 7 # bit 9 clear, no index extension
[ev] = parse_events(_make_events_message(word))
assert isinstance(ev, UnitStateChanged)
assert ev.unit_index == 7
assert not ev.is_on
def test_parse_x10_code_received() -> None:
# X-10 family = 0x0C00; house 'B' (=1<<4 in low byte high nibble),
# unit 5 (=4 in low nibble, +1), bit 9 ⇒ on.
word = 0x0C00 | 0x0200 | (1 << 4) | 4
[ev] = parse_events(_make_events_message(word))
assert isinstance(ev, X10CodeReceived)
assert ev.house_code == "B"
assert ev.unit_number == 5
assert ev.is_on
assert not ev.all_units
def test_parse_all_on_off_area_2_on() -> None:
# ALL_ON_OFF family = 0x03E0; area 2 in low nibble; on bit (0x10) set.
word = 0x03E0 | 0x10 | 2
[ev] = parse_events(_make_events_message(word))
assert isinstance(ev, AllOnOff)
assert ev.area_index == 2
assert ev.on
def test_parse_phone_singletons() -> None:
cases: list[tuple[int, type]] = [
(768, PhoneLineDead),
(769, PhoneLineRinging),
(770, PhoneLineOffHook),
(771, PhoneLineOnHook),
]
for word, klass in cases:
[ev] = parse_events(_make_events_message(word))
assert isinstance(ev, klass), (word, type(ev))
assert ev.raw_word == word
def test_parse_ac_battery_dcm_singletons() -> None:
[ac_off] = parse_events(_make_events_message(772))
[ac_on] = parse_events(_make_events_message(773))
[batt_low] = parse_events(_make_events_message(774))
[batt_ok] = parse_events(_make_events_message(775))
[dcm_bad] = parse_events(_make_events_message(776))
[dcm_ok] = parse_events(_make_events_message(777))
assert isinstance(ac_off, AcLost)
assert isinstance(ac_on, AcRestored)
assert isinstance(batt_low, BatteryLow)
assert isinstance(batt_ok, BatteryRestored)
assert isinstance(dcm_bad, DcmTrouble)
assert isinstance(dcm_ok, DcmOk)
def test_parse_energy_cost_levels() -> None:
for word, level in [(778, 0), (779, 1), (780, 2), (781, 3)]:
[ev] = parse_events(_make_events_message(word))
assert isinstance(ev, EnergyCostChanged)
assert ev.cost_level == level
def test_parse_camera_trigger() -> None:
[ev] = parse_events(_make_events_message(785)) # 785 - 781 = 4 → camera 4
assert isinstance(ev, CameraTrigger)
assert ev.camera_index == 4
def test_parse_access_reader_event() -> None:
# 976..991: reader_index = (word & 0xF) + 1
[ev] = parse_events(_make_events_message(978))
assert isinstance(ev, AccessReaderEvent)
assert ev.reader_index == ((978 & 0xF) + 1)
def test_parse_upb_link_actions() -> None:
# UPB_LINK family = 0xFC00; upper byte selects action.
actions = [
(UpbLinkAction.OFF, 0xFC),
(UpbLinkAction.ON, 0xFD),
(UpbLinkAction.SET, 0xFE),
(UpbLinkAction.FADE_STOP, 0xFF),
]
for action, upper in actions:
word = (upper << 8) | 12 # link index 12
[ev] = parse_events(_make_events_message(word))
assert isinstance(ev, UpbLinkEvent), (action, ev)
assert ev.link_index == 12
assert ev.action == int(action)
def test_parse_arming_changed_user_5_area_2_away() -> None:
# SECURITY_MODE_CHANGE catch-all:
# bits 12-14 = SecurityMode (3 = AWAY)
# bits 8-11 = area (2)
# low byte = user/code (5)
word = (3 << 12) | (2 << 8) | 5
[ev] = parse_events(_make_events_message(word))
assert isinstance(ev, ArmingChanged)
assert ev.area_index == 2
assert ev.new_mode == 3
assert ev.user_index == 5
assert ev.mode_name == "AWAY"
def test_parse_arming_changed_set_command_bit() -> None:
# Same as above but with the "Set" verb bit (bit 15) set.
word = (1 << 12) | (1 << 15) | (1 << 8) | 9
[ev] = parse_events(_make_events_message(word))
assert isinstance(ev, ArmingChanged)
assert ev.is_set_command
assert ev.user_index == 9
def test_unknown_event_returned_for_unmapped_word() -> None:
# Any value in the gap between the special singletons and the SECURITY
# catch-all that ALSO has zero in the high nibble of the high byte
# falls through to UnknownEvent. word=900 is in the gap (after CAMERA
# range, before ACCESS_READER) and (900 >> 8) & 0xF0 = 0 → unknown.
[ev] = parse_events(_make_events_message(900))
assert isinstance(ev, UnknownEvent)
assert ev.event_type is EventType.UNKNOWN
assert ev.raw_word == 900
# --------------------------------------------------------------------------
# Multi-event-per-packet (the panel batches into a single SystemEvents msg)
# --------------------------------------------------------------------------
def test_parse_three_events_in_one_message() -> None:
msg = _make_events_message(
0x0400 | 0x0200 | 5, # zone 5 opened
(3 << 12) | (1 << 8) | 7, # area 1 → AWAY by user 7
773, # AC restored
)
events = parse_events(msg)
assert len(events) == 3
z, a, ac = events
assert isinstance(z, ZoneStateChanged)
assert z.zone_index == 5
assert z.is_open
assert isinstance(a, ArmingChanged)
assert a.area_index == 1
assert a.new_mode == 3
assert isinstance(ac, AcRestored)
def test_empty_system_events_message_returns_empty_list() -> None:
msg = _make_events_message() # zero event words
assert parse_events(msg) == []
def test_odd_trailing_byte_is_silently_truncated() -> None:
"""The C# count is ``(MessageLength - 1) / 2`` — a trailing odd byte
is dropped, not raised. We mirror that to stay tolerant of messages
where the panel has appended a stray byte (seen on some firmwares)."""
data = bytes([int(OmniLink2MessageType.SystemEvents)]) + b"\x00\x42\x77"
msg = Message(start_char=START_CHAR_V2, data=data)
events = parse_events(msg)
assert len(events) == 1
assert isinstance(events[0], UserMacroButton)
assert events[0].button_index == 0x42
def test_parse_rejects_wrong_opcode() -> None:
# opcode 25 = SystemStatus, not SystemEvents.
msg = Message(
start_char=START_CHAR_V2,
data=bytes([int(OmniLink2MessageType.SystemStatus)]) + b"\x00",
)
with pytest.raises(ValueError, match="not a SystemEvents message"):
parse_events(msg)
def test_classmethod_parse_matches_function() -> None:
msg = _make_events_message(0x0042, 773)
via_classmethod = SystemEvent.parse(msg)
via_function = parse_events(msg)
assert [type(e) for e in via_classmethod] == [type(e) for e in via_function]
assert [e.raw_word for e in via_classmethod] == [e.raw_word for e in via_function]
# --------------------------------------------------------------------------
# Registry sanity check
# --------------------------------------------------------------------------
def test_event_registry_covers_every_eventtype_value() -> None:
"""Every non-UNKNOWN EventType value should map to a concrete class."""
for et in EventType:
assert int(et) in EVENT_REGISTRY, f"missing registry entry for {et!r}"
# --------------------------------------------------------------------------
# EventStream — async iterator over an underlying connection-like source
# --------------------------------------------------------------------------
class _FakeConnection:
"""Minimal stand-in for OmniConnection used in the EventStream tests.
Exposes the same ``unsolicited() -> AsyncIterator[Message]`` contract
backed by an in-memory queue so the test harness can drive the stream
deterministically without touching real I/O.
"""
def __init__(self) -> None:
self.queue: asyncio.Queue[Message] = asyncio.Queue()
self._closed = False
def push(self, msg: Message) -> None:
self.queue.put_nowait(msg)
def close(self) -> None:
self._closed = True
def unsolicited(self):
async def _gen():
while True:
if self._closed and self.queue.empty():
return
msg = await self.queue.get()
yield msg
return _gen()
@pytest.mark.asyncio
async def test_event_stream_yields_one_typed_event_per_step() -> None:
conn = _FakeConnection()
# Three events spread across two messages — confirms both flattening
# and cross-message iteration work.
conn.push(_make_events_message(0x0042, 773)) # 2 events
conn.push(_make_events_message(0x0400 | 0x0200 | 9)) # 1 event
conn.close()
stream = EventStream(source=conn)
seen: list[SystemEvent] = []
async for ev in stream:
seen.append(ev)
if len(seen) == 3:
break
assert isinstance(seen[0], UserMacroButton)
assert seen[0].button_index == 0x42
assert isinstance(seen[1], AcRestored)
assert isinstance(seen[2], ZoneStateChanged)
assert seen[2].zone_index == 9
assert seen[2].is_open
@pytest.mark.asyncio
async def test_event_stream_skips_non_event_messages() -> None:
"""Status replies, Acks, etc. that show up on the unsolicited
channel must be silently dropped only opcode 55 produces events."""
conn = _FakeConnection()
# Inject a SystemStatus reply (opcode 25) — should be filtered out.
other = Message(
start_char=START_CHAR_V2,
data=bytes([int(OmniLink2MessageType.SystemStatus)]) + b"\x00" * 14,
)
conn.push(other)
conn.push(_make_events_message(0x0042))
conn.close()
stream = EventStream(source=conn)
ev = await stream.__anext__()
assert isinstance(ev, UserMacroButton)
assert ev.button_index == 0x42
def test_event_stream_rejects_non_connection_source() -> None:
with pytest.raises(TypeError, match="unsolicited"):
EventStream(source=object()) # type: ignore[arg-type]