A Python port of the Invisible Internet Project (I2P)
1"""SimpleTimer2 — scheduled task executor.
2
3Ported from net.i2p.util.SimpleTimer2.
4Uses Python's concurrent.futures.ThreadPoolExecutor + threading.Timer.
5"""
6
7import threading
8import time
9from concurrent.futures import ThreadPoolExecutor
10from typing import Callable, Optional
11
12
13class SimpleTimer2:
14 """Scheduled task executor using a thread pool.
15
16 Replacement for Java's SimpleScheduler and SimpleTimer2.
17 """
18
19 _instance: Optional["SimpleTimer2"] = None
20 _lock = threading.Lock()
21
22 def __init__(self, name: str = "SimpleTimer2", max_workers: int = 4) -> None:
23 self._name = name
24 self._executor = ThreadPoolExecutor(max_workers=max_workers,
25 thread_name_prefix=name)
26 self._timers: list[threading.Timer] = []
27 self._timers_lock = threading.Lock()
28 self._shutdown = False
29
30 @classmethod
31 def get_instance(cls) -> "SimpleTimer2":
32 if cls._instance is None:
33 with cls._lock:
34 if cls._instance is None:
35 cls._instance = SimpleTimer2()
36 return cls._instance
37
38 def add_event(self, task: Callable[[], None], delay_ms: int) -> threading.Timer:
39 """Schedule a one-shot task after delay_ms milliseconds."""
40 if self._shutdown:
41 raise RuntimeError("Timer is shut down")
42 delay_sec = delay_ms / 1000.0
43
44 def _run():
45 try:
46 task()
47 except Exception as e:
48 import sys
49 print(f"Timer task failed: {e}", file=sys.stderr)
50 finally:
51 with self._timers_lock:
52 if t in self._timers:
53 self._timers.remove(t)
54
55 t = threading.Timer(delay_sec, lambda: self._executor.submit(_run))
56 t.daemon = True
57 with self._timers_lock:
58 self._timers.append(t)
59 t.start()
60 return t
61
62 def add_periodic_event(self, task: Callable[[], None],
63 period_ms: int,
64 initial_delay_ms: int = 0) -> "PeriodicEvent":
65 """Schedule a recurring task."""
66 event = PeriodicEvent(self, task, period_ms, initial_delay_ms)
67 event.start()
68 return event
69
70 def cancel(self, timer: threading.Timer) -> None:
71 """Cancel a scheduled event."""
72 timer.cancel()
73 with self._timers_lock:
74 if timer in self._timers:
75 self._timers.remove(timer)
76
77 def shutdown(self) -> None:
78 """Stop all pending events and shut down the executor."""
79 self._shutdown = True
80 with self._timers_lock:
81 for t in self._timers:
82 t.cancel()
83 self._timers.clear()
84 self._executor.shutdown(wait=False)
85
86
87class PeriodicEvent:
88 """A recurring scheduled event."""
89
90 def __init__(self, timer: SimpleTimer2, task: Callable[[], None],
91 period_ms: int, initial_delay_ms: int = 0) -> None:
92 self._timer = timer
93 self._task = task
94 self._period_sec = period_ms / 1000.0
95 self._initial_delay_sec = initial_delay_ms / 1000.0
96 self._cancelled = False
97 self._current_timer: Optional[threading.Timer] = None
98
99 def start(self) -> None:
100 delay = self._initial_delay_sec if self._initial_delay_sec > 0 else self._period_sec
101 self._schedule(delay)
102
103 def _schedule(self, delay: float) -> None:
104 if self._cancelled:
105 return
106 t = threading.Timer(delay, self._run)
107 t.daemon = True
108 self._current_timer = t
109 t.start()
110
111 def _run(self) -> None:
112 if self._cancelled:
113 return
114 try:
115 self._task()
116 except Exception as e:
117 import sys
118 print(f"Periodic task failed: {e}", file=sys.stderr)
119 self._schedule(self._period_sec)
120
121 def cancel(self) -> None:
122 self._cancelled = True
123 if self._current_timer:
124 self._current_timer.cancel()