A Python port of the Invisible Internet Project (I2P)
at main 82 lines 2.7 kB view raw
1"""EventDispatcher — event notification system. 2 3Ported from net.i2p.util.EventDispatcher and EventDispatcherImpl. 4""" 5 6import threading 7from typing import Any, Optional, Set 8 9 10class EventDispatcher: 11 """Event dispatching with notification chains. 12 13 Allows objects to receive and notify data events 14 (string -> object associations). 15 """ 16 17 def __init__(self) -> None: 18 self._events: dict[str, Any] = {} 19 self._lock = threading.Lock() 20 self._conditions: dict[str, threading.Condition] = {} 21 self._attached: list["EventDispatcher"] = [] 22 self._ignored = False 23 24 def attach_event_dispatcher(self, other: "EventDispatcher") -> None: 25 with self._lock: 26 if other not in self._attached: 27 self._attached.append(other) 28 29 def detach_event_dispatcher(self, other: "EventDispatcher") -> None: 30 with self._lock: 31 if other in self._attached: 32 self._attached.remove(other) 33 34 def notify_event(self, event: str, args: Any = None) -> None: 35 """Deliver an event with optional data.""" 36 if self._ignored: 37 return 38 with self._lock: 39 self._events[event] = args 40 if event in self._conditions: 41 self._conditions[event].notify_all() 42 for dispatcher in self._attached: 43 dispatcher.notify_event(event, args) 44 45 def get_event_value(self, name: str) -> Optional[Any]: 46 """Get the value associated with an event.""" 47 with self._lock: 48 return self._events.get(name) 49 50 def get_events(self) -> Set[str]: 51 """Get all event names that have been received.""" 52 with self._lock: 53 return set(self._events.keys()) 54 55 def ignore_events(self) -> None: 56 self._ignored = True 57 58 def un_ignore_events(self) -> None: 59 self._ignored = False 60 61 def wait_event_value(self, name: str, timeout: Optional[float] = None) -> Optional[Any]: 62 """Wait until the given event has received a value. 63 64 Args: 65 name: event name to wait for 66 timeout: max seconds to wait, None for indefinite 67 68 Returns: 69 The event value, or None if timed out. 70 """ 71 with self._lock: 72 if name in self._events: 73 return self._events[name] 74 if name not in self._conditions: 75 self._conditions[name] = threading.Condition(self._lock) 76 cond = self._conditions[name] 77 78 with cond: 79 while name not in self._events: 80 if not cond.wait(timeout): 81 return None # Timed out 82 return self._events.get(name)