A Python port of the Invisible Internet Project (I2P)
1"""EventDispatcher — event notification system.
2
3Ported from net.i2p.util.EventDispatcher and EventDispatcherImpl.
4"""
5
6import threading
7from typing import Any, Optional, Set
8
9
class EventDispatcher:
    """Event dispatching with notification chains.

    Allows objects to receive and notify data events
    (string -> object associations). Each dispatcher stores the most recent
    value per event name and forwards every notification it accepts to the
    dispatchers attached to it.

    Note: a value of ``None`` is stored as-is, so ``get_event_value`` and
    ``wait_event_value`` cannot distinguish "event delivered with None"
    from "event never delivered / timed out".
    """

    def __init__(self) -> None:
        # Latest value received for each event name.
        self._events: dict[str, Any] = {}
        # Guards _events, _conditions and _attached. Plain (non-reentrant) lock.
        self._lock = threading.Lock()
        # Per-event-name conditions, created lazily by waiters; all share _lock.
        self._conditions: dict[str, threading.Condition] = {}
        # Dispatchers that receive a copy of every notification.
        self._attached: list["EventDispatcher"] = []
        # When True, notify_event drops incoming events.
        self._ignored = False

    def attach_event_dispatcher(self, other: "EventDispatcher") -> None:
        """Start forwarding notifications to *other* (no-op if already attached)."""
        with self._lock:
            if other not in self._attached:
                self._attached.append(other)

    def detach_event_dispatcher(self, other: "EventDispatcher") -> None:
        """Stop forwarding notifications to *other* (no-op if not attached)."""
        with self._lock:
            if other in self._attached:
                self._attached.remove(other)

    def notify_event(self, event: str, args: Any = None) -> None:
        """Deliver an event with optional data.

        Records ``args`` as the value of ``event``, wakes any threads waiting
        on that event, and forwards the notification to attached dispatchers.
        Does nothing while events are being ignored.

        Args:
            event: event name
            args: value to associate with the event
        """
        if self._ignored:
            return
        with self._lock:
            self._events[event] = args
            cond = self._conditions.get(event)
            if cond is not None:
                cond.notify_all()
            # Snapshot under the lock so concurrent attach/detach cannot
            # mutate the list while it is being iterated.
            targets = list(self._attached)
        # Forward outside the lock: _lock is not reentrant, so a listener
        # that calls back into this dispatcher would otherwise deadlock.
        for dispatcher in targets:
            dispatcher.notify_event(event, args)

    def get_event_value(self, name: str) -> Optional[Any]:
        """Get the value associated with an event, or None if never received."""
        with self._lock:
            return self._events.get(name)

    def get_events(self) -> Set[str]:
        """Get all event names that have been received (as a new set)."""
        with self._lock:
            return set(self._events)

    def ignore_events(self) -> None:
        """Drop all subsequent notifications until un_ignore_events is called."""
        self._ignored = True

    def un_ignore_events(self) -> None:
        """Resume accepting notifications."""
        self._ignored = False

    def wait_event_value(self, name: str, timeout: Optional[float] = None) -> Optional[Any]:
        """Wait until the given event has received a value.

        Args:
            name: event name to wait for
            timeout: max seconds to wait in total, None for indefinite

        Returns:
            The event value, or None if timed out.
        """
        with self._lock:
            if name in self._events:
                return self._events[name]
            cond = self._conditions.get(name)
            if cond is None:
                cond = threading.Condition(self._lock)
                self._conditions[name] = cond
            # The condition wraps self._lock, which is already held here;
            # wait_for releases it while blocked and re-acquires on wakeup.
            # Unlike a manual ``while: cond.wait(timeout)`` loop, wait_for
            # applies ``timeout`` to the whole wait rather than restarting
            # it after every wakeup.
            if not cond.wait_for(lambda: name in self._events, timeout):
                return None  # Timed out
            return self._events[name]