"""SimpleTimer2 — scheduled task executor. Ported from net.i2p.util.SimpleTimer2. Uses Python's concurrent.futures.ThreadPoolExecutor + threading.Timer. """ import threading import time from concurrent.futures import ThreadPoolExecutor from typing import Callable, Optional class SimpleTimer2: """Scheduled task executor using a thread pool. Replacement for Java's SimpleScheduler and SimpleTimer2. """ _instance: Optional["SimpleTimer2"] = None _lock = threading.Lock() def __init__(self, name: str = "SimpleTimer2", max_workers: int = 4) -> None: self._name = name self._executor = ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix=name) self._timers: list[threading.Timer] = [] self._timers_lock = threading.Lock() self._shutdown = False @classmethod def get_instance(cls) -> "SimpleTimer2": if cls._instance is None: with cls._lock: if cls._instance is None: cls._instance = SimpleTimer2() return cls._instance def add_event(self, task: Callable[[], None], delay_ms: int) -> threading.Timer: """Schedule a one-shot task after delay_ms milliseconds.""" if self._shutdown: raise RuntimeError("Timer is shut down") delay_sec = delay_ms / 1000.0 def _run(): try: task() except Exception as e: import sys print(f"Timer task failed: {e}", file=sys.stderr) finally: with self._timers_lock: if t in self._timers: self._timers.remove(t) t = threading.Timer(delay_sec, lambda: self._executor.submit(_run)) t.daemon = True with self._timers_lock: self._timers.append(t) t.start() return t def add_periodic_event(self, task: Callable[[], None], period_ms: int, initial_delay_ms: int = 0) -> "PeriodicEvent": """Schedule a recurring task.""" event = PeriodicEvent(self, task, period_ms, initial_delay_ms) event.start() return event def cancel(self, timer: threading.Timer) -> None: """Cancel a scheduled event.""" timer.cancel() with self._timers_lock: if timer in self._timers: self._timers.remove(timer) def shutdown(self) -> None: """Stop all pending events and shut down the executor.""" self._shutdown = True with self._timers_lock: for t in self._timers: t.cancel() self._timers.clear() self._executor.shutdown(wait=False) class PeriodicEvent: """A recurring scheduled event.""" def __init__(self, timer: SimpleTimer2, task: Callable[[], None], period_ms: int, initial_delay_ms: int = 0) -> None: self._timer = timer self._task = task self._period_sec = period_ms / 1000.0 self._initial_delay_sec = initial_delay_ms / 1000.0 self._cancelled = False self._current_timer: Optional[threading.Timer] = None def start(self) -> None: delay = self._initial_delay_sec if self._initial_delay_sec > 0 else self._period_sec self._schedule(delay) def _schedule(self, delay: float) -> None: if self._cancelled: return t = threading.Timer(delay, self._run) t.daemon = True self._current_timer = t t.start() def _run(self) -> None: if self._cancelled: return try: self._task() except Exception as e: import sys print(f"Periodic task failed: {e}", file=sys.stderr) self._schedule(self._period_sec) def cancel(self) -> None: self._cancelled = True if self._current_timer: self._current_timer.cancel()