src/omni_pca/commands.py — Command IntEnum (64 values, sourced from
enuUnitCommand.cs which is the canonical 'enuCommand' despite the misleading
name) + SecurityCommandResponse + CommandFailedError exception. Notable
discovery: enuUnitCommand.UserSetting (104) is actually EXECUTE_PROGRAM;
renamed for clarity, C# alias documented inline.
src/omni_pca/client.py — 18 new methods on OmniClient:
Core: execute_command, execute_security_command, acknowledge_alerts,
get_object_status, get_extended_status
Wrappers: turn_unit_on/off, set_unit_level, bypass_zone, restore_zone,
set_thermostat_{system,fan,hold}_mode,
set_thermostat_{heat,cool}_setpoint_raw,
execute_button, execute_program, show_message, clear_message
All command methods raise CommandFailedError on Nak.
src/omni_pca/events.py — typed SystemEvents (opcode 55) decoder.
- EventType IntEnum (28 dispatch tags)
- 26 SystemEvent subclasses + UnknownEvent catch-all
Includes: ZoneStateChanged, UnitStateChanged, ArmingChanged,
AlarmActivated/Cleared, AcLost/Restored, BatteryLow/Restored,
PhoneLine{Off,On,Dead,Restored}, UserMacroButton, ProLinkMessage,
CentraLiteSwitch, X10CodeReceived, AllOnOff, DcmTrouble/Ok,
EnergyCostChanged, CameraTrigger, AccessReaderEvent, UpbLinkEvent
- SystemEvents packets carry MULTIPLE events; public API is
parse_events(message) -> list[SystemEvent], plus SystemEvent.parse()
- EventStream helper that flattens batches across messages
- Wiring of OmniClient.events() left for next pass
55 new tests across both files. 194 pass, 2 pre-existing skips. Ruff clean.
520 lines
18 KiB
Python
520 lines
18 KiB
Python
"""Unit tests for command opcodes and status range queries.
|
|
|
|
The tests use a captured-payload approach: we monkey-patch
|
|
``OmniClient._conn.request`` so it records the (opcode, payload) pair
|
|
that the client would emit and returns whatever canned ``Message``
|
|
the test wants. No network involved — just round-trip the bytes.
|
|
|
|
Conventions:
|
|
* Each test pins the exact wire bytes the client should produce, so
|
|
that any future refactor that rearranges the payload layout is
|
|
caught immediately.
|
|
* Where a single command can be expressed as a high-level helper
|
|
(``turn_unit_on``) we still verify the underlying ``Command``
|
|
enum value and the param1/param2 byte placement, not just the
|
|
success path.
|
|
"""
|
|
|
|
from __future__ import annotations

import struct
from collections.abc import Callable
from dataclasses import dataclass

import pytest

from omni_pca.client import OmniClient
from omni_pca.commands import Command, CommandFailedError, SecurityCommandResponse
from omni_pca.message import Message, encode_v2
from omni_pca.models import (
    AreaStatus,
    FanMode,
    HoldMode,
    HvacMode,
    ObjectType,
    SecurityMode,
    ThermostatStatus,
    UnitStatus,
    ZoneStatus,
)
from omni_pca.opcodes import OmniLink2MessageType
|
|
|
|
# --------------------------------------------------------------------------
# Test scaffolding: a stub OmniClient that captures requests instead of
# sending them. We bypass __init__ (which builds an OmniConnection) by
# using object.__new__ + manually setting the _conn attr to our stub.
# --------------------------------------------------------------------------
|
|
|
|
|
|
@dataclass
class _RecordedRequest:
    """One request captured by the stub connection instead of being sent."""

    # Opcode of the request, already coerced to a plain int by the recorder.
    opcode: int
    # Payload bytes that accompanied the opcode.
    payload: bytes
|
|
|
|
|
|
class _StubConn:
    """Stand-in for OmniConnection; .request() captures + returns a canned reply."""

    def __init__(
        self,
        reply_factory: Callable[[int, bytes], Message] | None = None,
    ) -> None:
        """Create the stub.

        Args:
            reply_factory: Called with ``(opcode, payload)`` for every
                request; its return value is handed back to the caller.
                When omitted, every request is answered by
                ``_default_ack``.
        """
        # Every request seen so far, in call order.
        self.calls: list[_RecordedRequest] = []
        self._reply_factory = reply_factory or self._default_ack

    @staticmethod
    def _default_ack(_opcode: int, _payload: bytes) -> Message:
        """Answer any request with an ``Ack`` message, ignoring the inputs."""
        return encode_v2(OmniLink2MessageType.Ack)

    async def request(
        self,
        opcode: OmniLink2MessageType | int,
        payload: bytes = b"",
        timeout: float | None = None,
    ) -> Message:
        """Record ``(opcode, payload)`` and return the reply factory's result.

        The opcode is normalised to ``int`` and the payload to ``bytes``
        before being stored and before being passed to the factory.
        """
        del timeout  # mirror the OmniConnection.request signature; unused here
        op_int = int(opcode)
        self.calls.append(_RecordedRequest(opcode=op_int, payload=bytes(payload)))
        return self._reply_factory(op_int, bytes(payload))
|
|
|
|
|
|
def _make_client(stub: _StubConn) -> OmniClient:
    """Build an OmniClient with a stubbed connection (no socket, no handshake)."""
    # object.__new__ skips OmniClient.__init__, so the instance only carries
    # the two attributes assigned below.
    client = object.__new__(OmniClient)
    client._conn = stub  # type: ignore[attr-defined]
    client._subscriber_task = None  # type: ignore[attr-defined]
    return client
|
|
|
|
|
|
# --------------------------------------------------------------------------
# Command enum value pins. These guard against accidental renumbering.
# --------------------------------------------------------------------------
|
|
|
|
|
|
def test_command_enum_pins_unit_values() -> None:
    """Pin the numeric values of the unit and zone bypass/restore commands."""
    assert Command.UNIT_OFF == 0
    assert Command.UNIT_ON == 1
    assert Command.UNIT_LEVEL == 9
    assert Command.BYPASS_ZONE == 4
    assert Command.RESTORE_ZONE == 5
|
|
|
|
|
|
def test_command_enum_pins_thermostat_values() -> None:
    """Pin the numeric values of the thermostat setpoint and mode commands."""
    # enuUnitCommand.SetLowSetPt (line 71) → 66 (heat)
    assert Command.SET_THERMOSTAT_HEAT_SETPOINT == 66
    # enuUnitCommand.SetHighSetPt (line 72) → 67 (cool)
    assert Command.SET_THERMOSTAT_COOL_SETPOINT == 67
    # enuUnitCommand.Mode/Fan/Hold (lines 73/74/75)
    assert Command.SET_THERMOSTAT_SYSTEM_MODE == 68
    assert Command.SET_THERMOSTAT_FAN_MODE == 69
    assert Command.SET_THERMOSTAT_HOLD_MODE == 70
|
|
|
|
|
|
def test_command_enum_pins_message_and_program_values() -> None:
    """Pin the numeric values of the message, button and program commands."""
    assert Command.SHOW_MESSAGE_WITH_BEEP == 80
    assert Command.LOG_MESSAGE == 81
    assert Command.CLEAR_MESSAGE == 82
    assert Command.SHOW_MESSAGE_NO_BEEP == 86
    assert Command.EXECUTE_BUTTON == 7
    assert Command.EXECUTE_PROGRAM == 104
|
|
|
|
|
|
def test_security_command_response_enum_pins() -> None:
    """Pin the numeric values of the security-command response codes."""
    assert SecurityCommandResponse.SUCCESS == 0
    assert SecurityCommandResponse.INVALID_CODE == 1
    assert SecurityCommandResponse.INVALID_AREA == 3
    assert SecurityCommandResponse.CODE_LOCKED_OUT == 6
|
|
|
|
|
|
# --------------------------------------------------------------------------
# execute_command() — generic Command opcode (20)
# --------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_execute_command_payload_layout() -> None:
    """execute_command emits exactly one Command request with a 4-byte payload."""
    stub = _StubConn()
    client = _make_client(stub)
    # Pretend we're flipping unit #257 ON (parameter2 needs both bytes).
    await client.execute_command(Command.UNIT_ON, parameter1=0, parameter2=257)

    assert len(stub.calls) == 1
    call = stub.calls[0]
    assert call.opcode == int(OmniLink2MessageType.Command)
    # Command (1) + p1 (1 byte) + p2 (BE u16) = 4 bytes
    assert call.payload == bytes([1, 0, 0x01, 0x01])
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_execute_command_packs_param1_byte_and_param2_be_u16() -> None:
    """parameter1 occupies one byte; parameter2 is packed high byte first."""
    stub = _StubConn()
    client = _make_client(stub)
    await client.execute_command(
        Command.UNIT_LEVEL, parameter1=75, parameter2=0xABCD
    )
    payload = stub.calls[0].payload
    # [cmd=9, p1=75, p2_hi=0xAB, p2_lo=0xCD]
    assert payload == bytes([9, 75, 0xAB, 0xCD])
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_execute_command_validates_param_ranges() -> None:
    """Out-of-range parameters raise ValueError before anything is sent."""
    stub = _StubConn()
    client = _make_client(stub)
    with pytest.raises(ValueError, match="parameter1"):
        await client.execute_command(Command.UNIT_ON, parameter1=256)
    with pytest.raises(ValueError, match="parameter2"):
        await client.execute_command(Command.UNIT_ON, parameter2=0x10000)
    # No request emitted on validation failure.
    assert stub.calls == []
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_execute_command_raises_on_nak() -> None:
    """A Nak reply to execute_command surfaces as CommandFailedError."""

    def nak_reply(_op: int, _pl: bytes) -> Message:
        return encode_v2(OmniLink2MessageType.Nak)

    stub = _StubConn(reply_factory=nak_reply)
    client = _make_client(stub)
    with pytest.raises(CommandFailedError, match="NAK"):
        await client.execute_command(Command.UNIT_ON, parameter2=1)
|
|
|
|
|
|
# --------------------------------------------------------------------------
# Convenience wrappers over execute_command
# --------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_turn_unit_on_off_emits_correct_command_byte() -> None:
    """turn_unit_on / turn_unit_off differ only in the leading command byte."""
    stub = _StubConn()
    client = _make_client(stub)
    await client.turn_unit_on(5)
    await client.turn_unit_off(5)
    assert stub.calls[0].payload == bytes([1, 0, 0, 5])  # UNIT_ON
    assert stub.calls[1].payload == bytes([0, 0, 0, 5])  # UNIT_OFF
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_set_unit_level_validates_range_and_emits_level() -> None:
    """set_unit_level puts the level in the second byte and rejects 101."""
    stub = _StubConn()
    client = _make_client(stub)
    await client.set_unit_level(3, 50)
    assert stub.calls[0].payload == bytes([9, 50, 0, 3])
    with pytest.raises(ValueError, match=r"0\.\.100"):
        await client.set_unit_level(3, 101)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_bypass_and_restore_zone_emit_correct_payload() -> None:
    """bypass_zone / restore_zone carry ``code`` in byte 1 and the zone last."""
    stub = _StubConn()
    client = _make_client(stub)
    await client.bypass_zone(12, code=2)
    await client.restore_zone(12, code=2)
    assert stub.calls[0].payload == bytes([4, 2, 0, 12])  # BYPASS_ZONE
    assert stub.calls[1].payload == bytes([5, 2, 0, 12])  # RESTORE_ZONE
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_set_thermostat_modes_emit_correct_payloads() -> None:
    """System/fan/hold mode setters send the mode value as the second byte."""
    stub = _StubConn()
    client = _make_client(stub)
    await client.set_thermostat_system_mode(2, HvacMode.COOL)
    await client.set_thermostat_fan_mode(2, FanMode.ON)
    await client.set_thermostat_hold_mode(2, HoldMode.HOLD)
    assert stub.calls[0].payload == bytes([68, int(HvacMode.COOL), 0, 2])
    assert stub.calls[1].payload == bytes([69, int(FanMode.ON), 0, 2])
    assert stub.calls[2].payload == bytes([70, int(HoldMode.HOLD), 0, 2])
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_set_thermostat_setpoints_use_raw_byte() -> None:
    """The *_raw setpoint helpers pass the given byte through unconverted."""
    stub = _StubConn()
    client = _make_client(stub)
    # NOTE(review): the original comments labelled 140 / 160 as "~70 °F" /
    # "~80 °F"; confirm against the Omni raw-temperature scale before
    # relying on those figures. The test only checks the raw byte.
    await client.set_thermostat_heat_setpoint_raw(1, 140)
    await client.set_thermostat_cool_setpoint_raw(1, 160)
    assert stub.calls[0].payload == bytes([66, 140, 0, 1])
    assert stub.calls[1].payload == bytes([67, 160, 0, 1])
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_button_program_message_helpers() -> None:
    """Button, program and message helpers each map to their command byte."""
    stub = _StubConn()
    client = _make_client(stub)
    await client.execute_button(4)
    await client.execute_program(7)
    await client.show_message(2, beep=True)
    await client.show_message(2, beep=False)
    await client.clear_message(2)
    assert stub.calls[0].payload == bytes([7, 0, 0, 4])  # EXECUTE_BUTTON
    assert stub.calls[1].payload == bytes([104, 0, 0, 7])  # EXECUTE_PROGRAM
    assert stub.calls[2].payload == bytes([80, 0, 0, 2])  # SHOW_MSG_BEEP
    assert stub.calls[3].payload == bytes([86, 0, 0, 2])  # SHOW_MSG_NOBEEP
    assert stub.calls[4].payload == bytes([82, 0, 0, 2])  # CLEAR_MESSAGE
|
|
|
|
|
|
# --------------------------------------------------------------------------
# execute_security_command (opcode 74)
# --------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_execute_security_command_payload_encoding_away_1234() -> None:
    """The C# code packs the 4-digit code as four separate digit bytes."""
    stub = _StubConn(
        reply_factory=lambda _op, _pl: encode_v2(
            OmniLink2MessageType.ExecuteSecurityCommandResponse,
            bytes([SecurityCommandResponse.SUCCESS]),
        )
    )
    client = _make_client(stub)
    result = await client.execute_security_command(
        area=1, mode=SecurityMode.AWAY, code=1234
    )
    # A SUCCESS response yields no return value.
    assert result is None
    payload = stub.calls[0].payload
    # area, mode, d1, d2, d3, d4
    assert payload == bytes([1, int(SecurityMode.AWAY), 1, 2, 3, 4])
    assert stub.calls[0].opcode == int(
        OmniLink2MessageType.ExecuteSecurityCommand
    )
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_execute_security_command_pads_short_codes_with_zeros() -> None:
    """Code 7 → digits 0,0,0,7 (matches the C# arithmetic)."""
    stub = _StubConn(
        reply_factory=lambda _op, _pl: encode_v2(OmniLink2MessageType.Ack)
    )
    client = _make_client(stub)
    await client.execute_security_command(
        area=2, mode=SecurityMode.OFF, code=7
    )
    # The second byte is the mode; the expected 0 there is SecurityMode.OFF
    # as far as this assertion is concerned.
    assert stub.calls[0].payload == bytes([2, 0, 0, 0, 0, 7])
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_execute_security_command_failure_raises_with_code() -> None:
    """A non-success response raises CommandFailedError carrying the code."""

    def reply(_op: int, _pl: bytes) -> Message:
        return encode_v2(
            OmniLink2MessageType.ExecuteSecurityCommandResponse,
            bytes([SecurityCommandResponse.INVALID_CODE]),
        )

    stub = _StubConn(reply_factory=reply)
    client = _make_client(stub)
    with pytest.raises(CommandFailedError) as ei:
        await client.execute_security_command(
            area=1, mode=SecurityMode.AWAY, code=9999
        )
    # Both the numeric code and its symbolic name must be exposed.
    assert ei.value.failure_code == int(SecurityCommandResponse.INVALID_CODE)
    assert "INVALID_CODE" in str(ei.value)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_execute_security_command_validates_inputs() -> None:
    """Area 0 and a five-digit code are rejected before anything is sent."""
    stub = _StubConn()
    client = _make_client(stub)
    with pytest.raises(ValueError, match="area"):
        await client.execute_security_command(
            area=0, mode=SecurityMode.AWAY, code=1234
        )
    with pytest.raises(ValueError, match="code"):
        await client.execute_security_command(
            area=1, mode=SecurityMode.AWAY, code=10000
        )
    assert stub.calls == []
|
|
|
|
|
|
# --------------------------------------------------------------------------
# acknowledge_alerts (opcode 60)
# --------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_acknowledge_alerts_sends_no_payload_and_expects_ack() -> None:
    """acknowledge_alerts sends the AcknowledgeAlerts opcode with no payload."""
    # The default stub reply is an Ack, so the call is expected to succeed.
    stub = _StubConn()
    client = _make_client(stub)
    await client.acknowledge_alerts()
    assert stub.calls[0].opcode == int(OmniLink2MessageType.AcknowledgeAlerts)
    assert stub.calls[0].payload == b""
|
|
|
|
|
|
# --------------------------------------------------------------------------
# get_object_status (opcode 34/35) — request payload + reply parsing
# --------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_object_status_builds_request_payload() -> None:
    """RequestStatus is [object_type, start_hi, start_lo, end_hi, end_lo]."""
    captured: list[bytes] = []

    def reply(_op: int, payload: bytes) -> Message:
        captured.append(payload)
        # Reply with one zone record: number=3, status=0x10, loop=200.
        body = bytes([int(ObjectType.ZONE), 0, 3, 0x10, 200])
        return encode_v2(OmniLink2MessageType.Status, body)

    stub = _StubConn(reply_factory=reply)
    client = _make_client(stub)
    # Only a start index is given; the expected request repeats it as the end.
    zones = await client.get_object_status(ObjectType.ZONE, 3)
    assert captured[0] == struct.pack(">BHH", int(ObjectType.ZONE), 3, 3)
    assert len(zones) == 1
    z = zones[0]
    assert isinstance(z, ZoneStatus)
    assert z.index == 3
    assert z.raw_status == 0x10
    assert z.loop == 200
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_object_status_parses_multiple_unit_records() -> None:
    """Unit records are 5 bytes each (clsOL2MsgStatus.cs:17)."""

    def reply(_op: int, _pl: bytes) -> Message:
        # object_type byte + two 5-byte unit records.
        records = (
            bytes([0, 1, 1, 0, 0])  # unit 1, state=1 (On)
            + bytes([0, 2, 100, 0, 0])  # unit 2, state=100 (level 0%)
        )
        return encode_v2(
            OmniLink2MessageType.Status,
            bytes([int(ObjectType.UNIT)]) + records,
        )

    stub = _StubConn(reply_factory=reply)
    client = _make_client(stub)
    units = await client.get_object_status(ObjectType.UNIT, 1, 2)
    assert len(units) == 2
    assert all(isinstance(u, UnitStatus) for u in units)
    u1, u2 = units
    assert u1.index == 1
    assert u1.state == 1
    assert u2.index == 2
    assert u2.state == 100
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_object_status_returns_empty_on_eod() -> None:
    """An EOD reply to get_object_status yields an empty list."""
    stub = _StubConn(
        reply_factory=lambda _op, _pl: encode_v2(OmniLink2MessageType.EOD)
    )
    client = _make_client(stub)
    out = await client.get_object_status(ObjectType.AREA, 99)
    assert out == []
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_object_status_raises_on_nak() -> None:
    """A Nak reply to get_object_status surfaces as CommandFailedError."""
    stub = _StubConn(
        reply_factory=lambda _op, _pl: encode_v2(OmniLink2MessageType.Nak)
    )
    client = _make_client(stub)
    with pytest.raises(CommandFailedError, match="NAK"):
        await client.get_object_status(ObjectType.ZONE, 1)
|
|
|
|
|
|
# --------------------------------------------------------------------------
# get_extended_status (opcode 58/59) — header has object_length byte
# --------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_extended_status_request_layout_matches_spec() -> None:
    """The extended-status request is packed as ``>BHH`` (type, start, end)."""
    captured: list[bytes] = []

    def reply(_op: int, payload: bytes) -> Message:
        captured.append(payload)
        # Reply: object_type, object_length=4, then one zone record (4 bytes).
        body = bytes([int(ObjectType.ZONE), 4]) + bytes([0, 5, 0x00, 100])
        return encode_v2(OmniLink2MessageType.ExtendedStatus, body)

    stub = _StubConn(reply_factory=reply)
    client = _make_client(stub)
    zones = await client.get_extended_status(ObjectType.ZONE, 5, 5)
    assert captured[0] == struct.pack(">BHH", int(ObjectType.ZONE), 5, 5)
    assert stub.calls[0].opcode == int(
        OmniLink2MessageType.RequestExtendedStatus
    )
    assert len(zones) == 1
    assert zones[0].index == 5  # type: ignore[union-attr]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_extended_status_uses_object_length_byte_for_record_size() -> None:
    """ExtendedStatus thermostat record is 14 bytes (clsOL2MsgExtendedStatus.cs:138-235)."""
    record = bytes(
        [
            0, 1,  # number = 1
            0,  # status
            140,  # current temp raw
            120, 160,  # heat / cool setpoints
            int(HvacMode.AUTO),
            int(FanMode.AUTO),
            int(HoldMode.OFF),
            150,  # humidity raw
            120, 160,  # humidify / dehumidify setpoints
            130,  # outdoor temp raw
            1,  # H or C status
        ]
    )
    # Guard the fixture itself so a miscounted byte fails here, not in parsing.
    assert len(record) == 14

    def reply(_op: int, _pl: bytes) -> Message:
        body = bytes([int(ObjectType.THERMOSTAT), 14]) + record
        return encode_v2(OmniLink2MessageType.ExtendedStatus, body)

    stub = _StubConn(reply_factory=reply)
    client = _make_client(stub)
    out = await client.get_extended_status(ObjectType.THERMOSTAT, 1)
    assert len(out) == 1
    t = out[0]
    assert isinstance(t, ThermostatStatus)
    assert t.index == 1
    assert t.temperature_raw == 140
    assert t.system_mode == int(HvacMode.AUTO)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_extended_status_returns_empty_on_eod() -> None:
    """An EOD reply to get_extended_status yields an empty list."""
    stub = _StubConn(
        reply_factory=lambda _op, _pl: encode_v2(OmniLink2MessageType.EOD)
    )
    client = _make_client(stub)
    out = await client.get_extended_status(ObjectType.AREA, 1, 8)
    assert out == []
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_extended_status_area_record_parses_to_areastatus() -> None:
    """Area ExtendedStatus record is 6 bytes
    (clsOL2MsgExtendedStatus.cs:75-118): number(2) + mode + alarms + entry +
    exit (matches our AreaStatus.parse).
    """

    def reply(_op: int, _pl: bytes) -> Message:
        # area 1, mode AWAY, alarms 0, entry 0, exit 30
        record = bytes([0, 1, int(SecurityMode.AWAY), 0, 0, 30])
        body = bytes([int(ObjectType.AREA), 6]) + record
        return encode_v2(OmniLink2MessageType.ExtendedStatus, body)

    stub = _StubConn(reply_factory=reply)
    client = _make_client(stub)
    out = await client.get_extended_status(ObjectType.AREA, 1)
    assert len(out) == 1
    a = out[0]
    assert isinstance(a, AreaStatus)
    assert a.index == 1
    assert a.mode == int(SecurityMode.AWAY)
    assert a.exit_timer_secs == 30
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_object_status_validates_range() -> None:
    """An end index below the start raises ValueError and sends nothing."""
    stub = _StubConn()
    client = _make_client(stub)
    with pytest.raises(ValueError, match="end"):
        await client.get_object_status(ObjectType.ZONE, 5, 3)
    assert stub.calls == []
|