A Python port of the Invisible Internet Project (I2P)
at main 124 lines 4.0 kB view raw
1"""SimpleTimer2 — scheduled task executor. 2 3Ported from net.i2p.util.SimpleTimer2. 4Uses Python's concurrent.futures.ThreadPoolExecutor + threading.Timer. 5""" 6 7import threading 8import time 9from concurrent.futures import ThreadPoolExecutor 10from typing import Callable, Optional 11 12 13class SimpleTimer2: 14 """Scheduled task executor using a thread pool. 15 16 Replacement for Java's SimpleScheduler and SimpleTimer2. 17 """ 18 19 _instance: Optional["SimpleTimer2"] = None 20 _lock = threading.Lock() 21 22 def __init__(self, name: str = "SimpleTimer2", max_workers: int = 4) -> None: 23 self._name = name 24 self._executor = ThreadPoolExecutor(max_workers=max_workers, 25 thread_name_prefix=name) 26 self._timers: list[threading.Timer] = [] 27 self._timers_lock = threading.Lock() 28 self._shutdown = False 29 30 @classmethod 31 def get_instance(cls) -> "SimpleTimer2": 32 if cls._instance is None: 33 with cls._lock: 34 if cls._instance is None: 35 cls._instance = SimpleTimer2() 36 return cls._instance 37 38 def add_event(self, task: Callable[[], None], delay_ms: int) -> threading.Timer: 39 """Schedule a one-shot task after delay_ms milliseconds.""" 40 if self._shutdown: 41 raise RuntimeError("Timer is shut down") 42 delay_sec = delay_ms / 1000.0 43 44 def _run(): 45 try: 46 task() 47 except Exception as e: 48 import sys 49 print(f"Timer task failed: {e}", file=sys.stderr) 50 finally: 51 with self._timers_lock: 52 if t in self._timers: 53 self._timers.remove(t) 54 55 t = threading.Timer(delay_sec, lambda: self._executor.submit(_run)) 56 t.daemon = True 57 with self._timers_lock: 58 self._timers.append(t) 59 t.start() 60 return t 61 62 def add_periodic_event(self, task: Callable[[], None], 63 period_ms: int, 64 initial_delay_ms: int = 0) -> "PeriodicEvent": 65 """Schedule a recurring task.""" 66 event = PeriodicEvent(self, task, period_ms, initial_delay_ms) 67 event.start() 68 return event 69 70 def cancel(self, timer: threading.Timer) -> None: 71 """Cancel a scheduled event.""" 72 timer.cancel() 73 with self._timers_lock: 74 if timer in self._timers: 75 self._timers.remove(timer) 76 77 def shutdown(self) -> None: 78 """Stop all pending events and shut down the executor.""" 79 self._shutdown = True 80 with self._timers_lock: 81 for t in self._timers: 82 t.cancel() 83 self._timers.clear() 84 self._executor.shutdown(wait=False) 85 86 87class PeriodicEvent: 88 """A recurring scheduled event.""" 89 90 def __init__(self, timer: SimpleTimer2, task: Callable[[], None], 91 period_ms: int, initial_delay_ms: int = 0) -> None: 92 self._timer = timer 93 self._task = task 94 self._period_sec = period_ms / 1000.0 95 self._initial_delay_sec = initial_delay_ms / 1000.0 96 self._cancelled = False 97 self._current_timer: Optional[threading.Timer] = None 98 99 def start(self) -> None: 100 delay = self._initial_delay_sec if self._initial_delay_sec > 0 else self._period_sec 101 self._schedule(delay) 102 103 def _schedule(self, delay: float) -> None: 104 if self._cancelled: 105 return 106 t = threading.Timer(delay, self._run) 107 t.daemon = True 108 self._current_timer = t 109 t.start() 110 111 def _run(self) -> None: 112 if self._cancelled: 113 return 114 try: 115 self._task() 116 except Exception as e: 117 import sys 118 print(f"Periodic task failed: {e}", file=sys.stderr) 119 self._schedule(self._period_sec) 120 121 def cancel(self) -> None: 122 self._cancelled = True 123 if self._current_timer: 124 self._current_timer.cancel()