"""I2PThread — Thread wrapper with error handling.

Ported from net.i2p.util.I2PThread.
"""

import sys
import threading
import traceback
from typing import Callable, Optional, Protocol, Set


class OOMEventListener(Protocol):
    """Structural interface for objects notified of a MemoryError in a thread."""

    def out_of_memory(self, error: MemoryError) -> None:
        """Handle a ``MemoryError`` that escaped an ``I2PThread`` target."""
        ...


class I2PThread(threading.Thread):
    """Thread with error logging and OOM notification.

    Sets daemon=True by default (matches Java's I2PAppThread behavior).
    Catches and logs unexpected exceptions.

    The listener registry is shared by every ``I2PThread`` (it lives on the
    class) and is guarded by ``_listeners_lock``.
    """

    # Process-wide registry of OOM listeners; guarded by _listeners_lock.
    _listeners: Set[OOMEventListener] = set()
    # Plain (non-reentrant) lock: it must never be held while running
    # listener code, or a listener touching the registry would deadlock.
    _listeners_lock = threading.Lock()

    def __init__(
        self,
        target: Optional[Callable] = None,
        name: Optional[str] = None,
        daemon: bool = True,
        **kwargs,
    ) -> None:
        """Create the thread.

        Args:
            target: Callable invoked by ``run``; ``None`` makes ``run`` a no-op.
            name: Thread name; ``None`` lets ``threading.Thread`` pick one.
            daemon: Daemon flag, ``True`` unless overridden.
            **kwargs: Forwarded unchanged to ``threading.Thread.__init__``.
        """
        super().__init__(target=target, name=name, daemon=daemon, **kwargs)

    def run(self) -> None:
        """Run the target, containing any exception it raises.

        A ``MemoryError`` is dispatched to the registered OOM listeners; any
        other ``Exception`` is reported on stderr with its traceback. Neither
        propagates out of the thread.
        """
        try:
            super().run()
        except MemoryError as e:
            self._fire_oom(e)
        except Exception:
            print(f"Thread terminated unexpectedly: {self.name}", file=sys.stderr)
            traceback.print_exc()

    def _fire_oom(self, error: MemoryError) -> None:
        """Notify every registered listener of ``error``.

        The registry is snapshotted under the lock and the callbacks are
        invoked after the lock is released, so a listener may safely call
        ``add_oom_listener`` / ``remove_oom_listener`` (e.g. to unregister
        itself) without deadlocking on the non-reentrant lock, and a slow
        listener does not block other threads from (un)registering.
        """
        with self._listeners_lock:
            snapshot = tuple(self._listeners)
        for listener in snapshot:
            try:
                listener.out_of_memory(error)
            except Exception:
                # Deliberate best-effort: one failing listener must not
                # prevent the remaining listeners from being notified.
                pass

    @classmethod
    def add_oom_listener(cls, listener: OOMEventListener) -> None:
        """Register ``listener``; adding it a second time has no effect."""
        with cls._listeners_lock:
            cls._listeners.add(listener)

    @classmethod
    def remove_oom_listener(cls, listener: OOMEventListener) -> None:
        """Unregister ``listener``; unknown listeners are ignored."""
        with cls._listeners_lock:
            cls._listeners.discard(listener)