A Python port of the Invisible Internet Project (I2P)
1"""I2PThread — Thread wrapper with error handling.
2
3Ported from net.i2p.util.I2PThread.
4"""
5
6import sys
7import threading
8import traceback
9from typing import Callable, Optional, Protocol, Set
10
11
12class OOMEventListener(Protocol):
13 def out_of_memory(self, error: MemoryError) -> None: ...
14
15
16class I2PThread(threading.Thread):
17 """Thread with error logging and OOM notification.
18
19 Sets daemon=True by default (matches Java's I2PAppThread behavior).
20 Catches and logs unexpected exceptions.
21 """
22
23 _listeners: Set[OOMEventListener] = set()
24 _listeners_lock = threading.Lock()
25
26 def __init__(self, target: Optional[Callable] = None,
27 name: Optional[str] = None,
28 daemon: bool = True,
29 **kwargs) -> None:
30 super().__init__(target=target, name=name, daemon=daemon, **kwargs)
31
32 def run(self) -> None:
33 try:
34 super().run()
35 except MemoryError as e:
36 self._fire_oom(e)
37 except Exception:
38 print(f"Thread terminated unexpectedly: {self.name}", file=sys.stderr)
39 traceback.print_exc()
40
41 def _fire_oom(self, error: MemoryError) -> None:
42 with self._listeners_lock:
43 for listener in self._listeners:
44 try:
45 listener.out_of_memory(error)
46 except Exception:
47 pass
48
49 @classmethod
50 def add_oom_listener(cls, listener: OOMEventListener) -> None:
51 with cls._listeners_lock:
52 cls._listeners.add(listener)
53
54 @classmethod
55 def remove_oom_listener(cls, listener: OOMEventListener) -> None:
56 with cls._listeners_lock:
57 cls._listeners.discard(listener)