A Python port of the Invisible Internet Project (I2P)
1"""Job and TimedJob — abstract scheduled tasks for the I2P router.
2
3Ported from net.i2p.router.JobImpl and net.i2p.util.SimpleTimer / TimedEvent.
4
5Jobs are the unit of work in the router's job queue. Each job has a name,
6a priority (lower number = higher priority), and a scheduled execution time
7expressed as a `time.monotonic()` value. Jobs are comparable so they can
8live in a heapq ordered by (scheduled_at, priority, job_id).
9"""
10
11from __future__ import annotations
12
13import itertools
14import time
15from abc import ABC, abstractmethod
16from typing import Any
17
18
# Module-wide source of job ids: every Job constructed in this module takes
# the next integer from this counter, starting at 0.
_id_counter = itertools.count(start=0)
20
21
class Job(ABC):
    """Abstract scheduled task.

    Jobs are ordered by the tuple ``(scheduled_at, priority, job_id)`` so
    they can be stored directly in a ``heapq``. Equality and hashing use
    ``job_id`` alone. Comparing a job with anything that is not a ``Job``
    returns ``NotImplemented``, so ``job < 5`` raises ``TypeError`` rather
    than failing inside the comparison with ``AttributeError``.

    Parameters
    ----------
    name:
        Human-readable job name (used in logging).
    priority:
        Numeric priority. Lower number = higher priority.
    """

    def __init__(self, name: str = "job", priority: int = 5) -> None:
        self.name: str = name
        self.priority: int = priority
        # Starts at 0.0; whoever schedules the job overwrites it.
        self.scheduled_at: float = 0.0
        # Taken from the module-level counter; used as the final ordering
        # tie-breaker and as the identity for == / hash().
        self.job_id: int = next(_id_counter)
        # Cancellation flag; nothing in this class reads it.
        self._cancelled: bool = False

    # -- Public accessors ---------------------------------------------------

    def get_name(self) -> str:
        """Return the job's name."""
        return self.name

    def get_priority(self) -> int:
        """Return the job's numeric priority (lower = more urgent)."""
        return self.priority

    # -- Comparison (for heapq) ---------------------------------------------

    def _sort_key(self) -> tuple[float, int, int]:
        """Return the ``(scheduled_at, priority, job_id)`` ordering key."""
        return (self.scheduled_at, self.priority, self.job_id)

    def __lt__(self, other: object) -> bool:
        if not isinstance(other, Job):
            return NotImplemented
        return self._sort_key() < other._sort_key()

    def __le__(self, other: object) -> bool:
        if not isinstance(other, Job):
            return NotImplemented
        return self._sort_key() <= other._sort_key()

    def __gt__(self, other: object) -> bool:
        if not isinstance(other, Job):
            return NotImplemented
        return self._sort_key() > other._sort_key()

    def __ge__(self, other: object) -> bool:
        if not isinstance(other, Job):
            return NotImplemented
        return self._sort_key() >= other._sort_key()

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Job):
            return NotImplemented
        return self.job_id == other.job_id

    def __hash__(self) -> int:
        # Must agree with __eq__, which compares job_id only.
        return hash(self.job_id)

    # -- Abstract -----------------------------------------------------------

    @abstractmethod
    def execute(self, context: Any) -> None:
        """Execute this job. *context* is typically a RouterContext."""
        ...
79
80
class TimedJob(Job):
    """Recurring job: runs, then puts itself back on its queue.

    Concrete subclasses provide :meth:`run`; :meth:`execute` is the wrapper
    that calls it and handles re-scheduling.

    Parameters
    ----------
    name:
        Human-readable job name, forwarded to :class:`Job`.
    priority:
        Numeric priority, forwarded to :class:`Job`.
    interval_seconds:
        Delay between successive executions.
    """

    def __init__(
        self,
        name: str = "timed-job",
        priority: int = 5,
        interval_seconds: float = 60.0,
    ) -> None:
        super().__init__(name=name, priority=priority)
        self.interval_seconds: float = interval_seconds
        # Owning queue; stays None until a JobQueue assigns itself here.
        self._queue: JobQueue | None = None

    def execute(self, context: Any) -> None:
        """Run the job once and, if its queue is still running, re-queue it.

        An exception raised by :meth:`run` propagates to the caller and the
        job is not re-queued.
        """
        self.run(context)

        owner = self._queue
        if owner is None or not owner._running:
            # No queue attached, or the queue has stopped: do not re-arm.
            return

        # The next deadline is measured from the end of this run.
        self.scheduled_at = time.monotonic() + self.interval_seconds
        owner._push(self)

    @abstractmethod
    def run(self, context: Any) -> None:
        """Do the periodic work; implemented by subclasses."""
        ...
113
114
# Avoid a circular import — this module needs JobQueue only as a type
# annotation for TimedJob._queue, so it is imported under TYPE_CHECKING below
# and never at runtime. The _queue attribute is set by JobQueue.schedule().
118from typing import TYPE_CHECKING
119
120if TYPE_CHECKING:
121 from i2p_router.job_queue import JobQueue