"""Job and TimedJob — abstract scheduled tasks for the I2P router. Ported from net.i2p.router.JobImpl and net.i2p.util.SimpleTimer / TimedEvent. Jobs are the unit of work in the router's job queue. Each job has a name, a priority (lower number = higher priority), and a scheduled execution time expressed as a `time.monotonic()` value. Jobs are comparable so they can live in a heapq ordered by (scheduled_at, priority, job_id). """ from __future__ import annotations import itertools import time from abc import ABC, abstractmethod from typing import Any _id_counter = itertools.count() class Job(ABC): """Abstract scheduled task. Parameters ---------- name: Human-readable job name (used in logging). priority: Numeric priority. Lower number = higher priority. """ def __init__(self, name: str = "job", priority: int = 5) -> None: self.name: str = name self.priority: int = priority self.scheduled_at: float = 0.0 self.job_id: int = next(_id_counter) self._cancelled: bool = False # -- Public accessors --------------------------------------------------- def get_name(self) -> str: return self.name def get_priority(self) -> int: return self.priority # -- Comparison (for heapq) --------------------------------------------- def _sort_key(self) -> tuple[float, int, int]: return (self.scheduled_at, self.priority, self.job_id) def __lt__(self, other: Job) -> bool: return self._sort_key() < other._sort_key() def __le__(self, other: Job) -> bool: return self._sort_key() <= other._sort_key() def __gt__(self, other: Job) -> bool: return self._sort_key() > other._sort_key() def __ge__(self, other: Job) -> bool: return self._sort_key() >= other._sort_key() def __eq__(self, other: object) -> bool: if not isinstance(other, Job): return NotImplemented return self.job_id == other.job_id def __hash__(self) -> int: return hash(self.job_id) # -- Abstract ----------------------------------------------------------- @abstractmethod def execute(self, context: Any) -> None: """Execute this job. *context* is typically a RouterContext.""" ... class TimedJob(Job): """Job that re-schedules itself at a fixed interval after each execution. Subclasses implement :meth:`run` instead of :meth:`execute`. Parameters ---------- interval_seconds: Delay between successive executions. """ def __init__( self, name: str = "timed-job", priority: int = 5, interval_seconds: float = 60.0, ) -> None: super().__init__(name=name, priority=priority) self.interval_seconds: float = interval_seconds self._queue: JobQueue | None = None # set by JobQueue on schedule def execute(self, context: Any) -> None: self.run(context) # Re-schedule on the same queue if self._queue is not None and self._queue._running: self.scheduled_at = time.monotonic() + self.interval_seconds self._queue._push(self) @abstractmethod def run(self, context: Any) -> None: """Subclass work goes here.""" ... # Avoid circular import — JobQueue is only needed at runtime for TimedJob # re-scheduling. The _queue attribute is set by JobQueue.schedule() and # typed via TYPE_CHECKING below. from typing import TYPE_CHECKING if TYPE_CHECKING: from i2p_router.job_queue import JobQueue