A Python port of the Invisible Internet Project (I2P)
at main 57 lines 1.7 kB view raw
1"""I2PThread — Thread wrapper with error handling. 2 3Ported from net.i2p.util.I2PThread. 4""" 5 6import sys 7import threading 8import traceback 9from typing import Callable, Optional, Protocol, Set 10 11 12class OOMEventListener(Protocol): 13 def out_of_memory(self, error: MemoryError) -> None: ... 14 15 16class I2PThread(threading.Thread): 17 """Thread with error logging and OOM notification. 18 19 Sets daemon=True by default (matches Java's I2PAppThread behavior). 20 Catches and logs unexpected exceptions. 21 """ 22 23 _listeners: Set[OOMEventListener] = set() 24 _listeners_lock = threading.Lock() 25 26 def __init__(self, target: Optional[Callable] = None, 27 name: Optional[str] = None, 28 daemon: bool = True, 29 **kwargs) -> None: 30 super().__init__(target=target, name=name, daemon=daemon, **kwargs) 31 32 def run(self) -> None: 33 try: 34 super().run() 35 except MemoryError as e: 36 self._fire_oom(e) 37 except Exception: 38 print(f"Thread terminated unexpectedly: {self.name}", file=sys.stderr) 39 traceback.print_exc() 40 41 def _fire_oom(self, error: MemoryError) -> None: 42 with self._listeners_lock: 43 for listener in self._listeners: 44 try: 45 listener.out_of_memory(error) 46 except Exception: 47 pass 48 49 @classmethod 50 def add_oom_listener(cls, listener: OOMEventListener) -> None: 51 with cls._listeners_lock: 52 cls._listeners.add(listener) 53 54 @classmethod 55 def remove_oom_listener(cls, listener: OOMEventListener) -> None: 56 with cls._listeners_lock: 57 cls._listeners.discard(listener)