A Python port of the Invisible Internet Project (I2P)
at main 121 lines 3.6 kB view raw
"""Job and TimedJob — abstract scheduled tasks for the I2P router.

Ported from net.i2p.router.JobImpl and net.i2p.util.SimpleTimer / TimedEvent.

Jobs are the unit of work in the router's job queue. Each job has a name,
a priority (lower number = higher priority), and a scheduled execution time
expressed as a `time.monotonic()` value. Jobs are comparable so they can
live in a heapq ordered by (scheduled_at, priority, job_id).
"""

from __future__ import annotations

import itertools
import time
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    # JobQueue is referenced only in annotations.  Importing it under
    # TYPE_CHECKING keeps this module free of a runtime (circular) import of
    # the queue module.
    from i2p_router.job_queue import JobQueue


# Module-wide source of job ids; every Job instance takes the next value.
_id_counter = itertools.count()


class Job(ABC):
    """Abstract scheduled task.

    Parameters
    ----------
    name:
        Human-readable job name (used in logging).
    priority:
        Numeric priority. Lower number = higher priority.

    Attributes
    ----------
    scheduled_at:
        Execution time used as the first component of the sort key.
        Initialised to ``0.0``.
    job_id:
        Value drawn from the module-wide counter at construction time.
        Used for equality, hashing and as the final ordering tie-breaker.
    """

    def __init__(self, name: str = "job", priority: int = 5) -> None:
        self.name: str = name
        self.priority: int = priority
        self.scheduled_at: float = 0.0
        self.job_id: int = next(_id_counter)
        # Initialised here but never read in this module.
        self._cancelled: bool = False

    # -- Public accessors ---------------------------------------------------

    def get_name(self) -> str:
        """Return the job's name."""
        return self.name

    def get_priority(self) -> int:
        """Return the job's numeric priority (lower = higher priority)."""
        return self.priority

    # -- Comparison (for heapq) ---------------------------------------------

    def _sort_key(self) -> tuple[float, int, int]:
        """Return the ordering key ``(scheduled_at, priority, job_id)``."""
        return (self.scheduled_at, self.priority, self.job_id)

    # Each ordering method returns NotImplemented for non-Job operands so
    # that Python raises its standard TypeError, mirroring __eq__, rather
    # than leaking an AttributeError from the missing ``_sort_key``.

    def __lt__(self, other: object) -> bool:
        if not isinstance(other, Job):
            return NotImplemented
        return self._sort_key() < other._sort_key()

    def __le__(self, other: object) -> bool:
        if not isinstance(other, Job):
            return NotImplemented
        return self._sort_key() <= other._sort_key()

    def __gt__(self, other: object) -> bool:
        if not isinstance(other, Job):
            return NotImplemented
        return self._sort_key() > other._sort_key()

    def __ge__(self, other: object) -> bool:
        if not isinstance(other, Job):
            return NotImplemented
        return self._sort_key() >= other._sort_key()

    def __eq__(self, other: object) -> bool:
        """Two jobs are equal when they carry the same ``job_id``."""
        if not isinstance(other, Job):
            return NotImplemented
        return self.job_id == other.job_id

    def __hash__(self) -> int:
        """Hash on ``job_id`` so hashing agrees with ``__eq__``."""
        return hash(self.job_id)

    # -- Abstract -----------------------------------------------------------

    @abstractmethod
    def execute(self, context: Any) -> None:
        """Execute this job. *context* is typically a RouterContext."""
        ...


class TimedJob(Job):
    """Job that re-schedules itself at a fixed interval after each execution.

    Subclasses implement :meth:`run` instead of :meth:`execute`.

    Parameters
    ----------
    name:
        Human-readable job name, forwarded to :class:`Job`.
    priority:
        Numeric priority, forwarded to :class:`Job`.
    interval_seconds:
        Delay between successive executions.
    """

    def __init__(
        self,
        name: str = "timed-job",
        priority: int = 5,
        interval_seconds: float = 60.0,
    ) -> None:
        super().__init__(name=name, priority=priority)
        self.interval_seconds: float = interval_seconds
        # Starts as None; per the original author's note it is assigned by
        # JobQueue when the job is scheduled (that code is not in this file).
        self._queue: JobQueue | None = None

    def execute(self, context: Any) -> None:
        """Run the job once, then push it back onto its queue.

        :meth:`run` is called first; any exception it raises propagates and
        the job is not re-scheduled.  Afterwards, if a queue is attached and
        its ``_running`` flag is truthy, ``scheduled_at`` is set to
        ``time.monotonic() + interval_seconds`` and the job is handed to the
        queue's ``_push``.
        """
        self.run(context)
        queue = self._queue
        # NOTE(review): ``_cancelled`` is not consulted before re-pushing;
        # presumably the queue discards cancelled jobs — confirm in JobQueue.
        if queue is not None and queue._running:
            self.scheduled_at = time.monotonic() + self.interval_seconds
            queue._push(self)

    @abstractmethod
    def run(self, context: Any) -> None:
        """Subclass work goes here."""
        ...