"""Async event history and waiting system for ESP32 events.""" from __future__ import annotations import asyncio from collections.abc import Callable from dataclasses import dataclass from .protocol import Event @dataclass class _EventWaiter: """A pending wait condition with its associated future.""" match_fn: Callable[[Event], bool] future: asyncio.Future[Event] class EventQueue: """Accumulates ESP32 events in a bounded history and allows callers to wait for specific events. Thread-safe for push operations via an asyncio lock. All async methods must be called from the same event loop. """ def __init__(self, max_events: int = 1000) -> None: self._events: list[Event] = [] self._max_events: int = max_events self._waiters: list[_EventWaiter] = [] self._lock: asyncio.Lock = asyncio.Lock() def push(self, event: Event) -> None: """Append an event to history and resolve any matching waiters. Trims oldest events when the history exceeds *max_events*. """ self._events.append(event) # Trim oldest if over capacity overflow = len(self._events) - self._max_events if overflow > 0: del self._events[:overflow] # Resolve matching waiters (iterate a copy so removal is safe) resolved: list[_EventWaiter] = [] for waiter in self._waiters: if waiter.future.done(): resolved.append(waiter) continue try: if waiter.match_fn(event): waiter.future.set_result(event) resolved.append(waiter) except Exception: # Bad match function — don't let it break the queue resolved.append(waiter) for waiter in resolved: try: self._waiters.remove(waiter) except ValueError: pass async def wait_for( self, event_name: str | None = None, match: Callable[[Event], bool] | None = None, timeout: float = 30.0, ) -> Event: """Wait for an event matching the given criteria. Args: event_name: If provided, match events where ``event.event == event_name``. match: Optional custom predicate. If both *event_name* and *match* are given they are combined with AND logic. timeout: Seconds to wait before raising ``asyncio.TimeoutError``. Returns: The first matching ``Event``. Raises: asyncio.TimeoutError: If no matching event arrives within *timeout*. ValueError: If neither *event_name* nor *match* is provided. """ if event_name is None and match is None: raise ValueError("at least one of event_name or match must be provided") match_fn = _build_match_fn(event_name, match) # Check existing history (most recent first) for an immediate match async with self._lock: for event in reversed(self._events): try: if match_fn(event): return event except Exception: continue # No existing match — register a waiter loop = asyncio.get_running_loop() future: asyncio.Future[Event] = loop.create_future() waiter = _EventWaiter(match_fn=match_fn, future=future) async with self._lock: self._waiters.append(waiter) try: return await asyncio.wait_for(future, timeout=timeout) except TimeoutError: # Clean up the waiter on timeout try: self._waiters.remove(waiter) except ValueError: pass raise def get_events( self, event_name: str | None = None, limit: int = 50, since_ts: int | None = None, ) -> list[Event]: """Return recent events from history, optionally filtered. Args: event_name: Only include events with this name. limit: Maximum number of events to return. since_ts: Only include events with ``ts >= since_ts`` (millisecond timestamp). Returns: A list of matching events, most recent last, capped at *limit*. """ filtered: list[Event] = [] for event in self._events: if event_name is not None and event.event != event_name: continue if since_ts is not None and (event.ts is None or event.ts < since_ts): continue filtered.append(event) # Return the most recent `limit` entries if len(filtered) > limit: return filtered[-limit:] return filtered def clear(self) -> None: """Clear the event history. Active waiters are not cancelled.""" self._events.clear() def __len__(self) -> int: """Return the number of stored events.""" return len(self._events) def _build_match_fn( event_name: str | None, match: Callable[[Event], bool] | None, ) -> Callable[[Event], bool]: """Combine event_name and custom match predicate into a single callable.""" if event_name is not None and match is not None: return lambda e: e.event == event_name and match(e) if event_name is not None: return lambda e: e.event == event_name # match is guaranteed non-None by the caller's validation assert match is not None return match