"""EventDispatcher — event notification system.

Ported from net.i2p.util.EventDispatcher and EventDispatcherImpl.
"""

import threading
from typing import Any, Optional, Set


class EventDispatcher:
    """Event dispatching with notification chains.

    Allows objects to receive and notify data events
    (string -> object associations). Every delivered event is stored under
    its name, wakes any thread waiting for that name, and is forwarded to
    all attached dispatchers.
    """

    def __init__(self) -> None:
        # Last value delivered for each event name.
        self._events: dict[str, Any] = {}
        # Guards _events, _conditions and _attached.
        self._lock = threading.Lock()
        # One condition per awaited event name; all share self._lock.
        self._conditions: dict[str, threading.Condition] = {}
        # Dispatchers that receive a copy of every event delivered here.
        self._attached: list["EventDispatcher"] = []
        # When True, notify_event() drops events without recording them.
        self._ignored = False

    def attach_event_dispatcher(self, other: "EventDispatcher") -> None:
        """Forward future events to *other* (no-op if already attached)."""
        with self._lock:
            if other not in self._attached:
                self._attached.append(other)

    def detach_event_dispatcher(self, other: "EventDispatcher") -> None:
        """Stop forwarding events to *other* (no-op if not attached)."""
        with self._lock:
            if other in self._attached:
                self._attached.remove(other)

    def notify_event(self, event: str, args: Any = None) -> None:
        """Deliver an event with optional data.

        Records ``args`` under ``event``, wakes every thread blocked in
        ``wait_event_value(event)``, then forwards the event to each
        attached dispatcher. Does nothing while events are being ignored.

        Args:
            event: event name
            args: value to associate with the event
        """
        if self._ignored:
            return
        with self._lock:
            self._events[event] = args
            cond = self._conditions.get(event)
            if cond is not None:
                cond.notify_all()
            # Snapshot so forwarding happens without holding our lock: an
            # attached dispatcher may call back into this object, and
            # attach/detach may run concurrently with the iteration.
            targets = list(self._attached)
        for dispatcher in targets:
            dispatcher.notify_event(event, args)

    def get_event_value(self, name: str) -> Optional[Any]:
        """Get the value associated with an event (None if not received)."""
        with self._lock:
            return self._events.get(name)

    def get_events(self) -> Set[str]:
        """Get all event names that have been received."""
        with self._lock:
            return set(self._events)

    def ignore_events(self) -> None:
        """Make subsequent notify_event() calls no-ops."""
        self._ignored = True

    def un_ignore_events(self) -> None:
        """Resume recording and forwarding events."""
        self._ignored = False

    def wait_event_value(self, name: str, timeout: Optional[float] = None) -> Optional[Any]:
        """Wait until the given event has received a value.

        Args:
            name: event name to wait for
            timeout: max seconds to wait, None for indefinite

        Returns:
            The event value, or None if timed out. An event whose stored
            value is None is indistinguishable from a timeout here.
        """
        # The check and the wait share one critical section. Every per-event
        # condition wraps self._lock, so holding the lock is what
        # Condition.wait_for() requires.
        with self._lock:
            if name in self._events:
                return self._events[name]
            cond = self._conditions.get(name)
            if cond is None:
                cond = threading.Condition(self._lock)
                self._conditions[name] = cond
            # wait_for() re-checks the predicate after each wakeup and
            # counts down the remaining time, so the total wait never
            # exceeds ``timeout``.
            if not cond.wait_for(lambda: name in self._events, timeout):
                return None  # Timed out
            return self._events[name]