A Python port of the Invisible Internet Project (I2P)
1"""JobQueue — priority queue for scheduled router jobs.
2
3Ported from net.i2p.router.JobQueue.
4
5Uses a binary heap (`heapq`) keyed on (scheduled_at, priority, job_id)
6so that the earliest, highest-priority job is always at the front.
7"""
8
9from __future__ import annotations
10
11import heapq
12import logging
13import time
14from typing import Any
15
16from i2p_router.job import Job, TimedJob
17
log = logging.getLogger(__name__)


class JobQueue:
    """Priority queue for scheduled router jobs.

    Pending jobs live in a binary heap.  The heap order comes from the
    jobs' own comparison operators (defined in ``i2p_router.job``, not
    here; expected to be ``scheduled_at`` ascending, then priority, then
    ``job_id``).

    Jobs that become due in the same :meth:`execute_ready_jobs` pass are
    run in ``(priority, job_id)`` order, lower priority number first.
    """

    def __init__(self) -> None:
        # Heap of pending jobs; only mutated through heapq or re-heapified.
        self._jobs: list[Job] = []
        # Cleared by shutdown(); gates schedule() and execute_ready_jobs().
        self._running: bool = True

    # -- Internal -----------------------------------------------------------

    def _push(self, job: Job) -> None:
        """Push a job onto the heap (internal, no state checks).

        The job's ``_cancelled`` flag is reset so that a job which was
        cancelled earlier can be queued again.
        """
        job._cancelled = False
        heapq.heappush(self._jobs, job)

    # -- Public API ---------------------------------------------------------

    def schedule(self, job: Job, delay_seconds: float = 0) -> None:
        """Add *job* to the queue, to be executed after *delay_seconds*.

        Parameters
        ----------
        job:
            The job to schedule.  Its ``scheduled_at`` attribute is
            overwritten with the new due time (``time.monotonic()`` based).
        delay_seconds:
            Seconds from now until the job becomes eligible for execution.
            Defaults to 0 (immediately eligible).

        Raises
        ------
        RuntimeError
            If the queue has been shut down.

        Notes
        -----
        A job that is still pending is not detected here: its
        ``scheduled_at`` is rewritten in place and a second heap entry is
        added.  NOTE(review): callers should :meth:`cancel` a pending job
        before scheduling it again -- confirm whether de-duplication is
        wanted at this level.
        """
        if not self._running:
            raise RuntimeError("JobQueue has been shut down")

        job.scheduled_at = time.monotonic() + delay_seconds

        # Let TimedJob know which queue it belongs to so it can re-schedule.
        if isinstance(job, TimedJob):
            job._queue = self

        self._push(job)
        log.debug("Scheduled job %s at %.3f", job.get_name(), job.scheduled_at)

    def cancel(self, job: Job) -> bool:
        """Cancel a pending job.

        The job is marked cancelled and the heap is rebuilt immediately
        without it, so :meth:`pending_count` reflects the removal right
        away.  This costs O(n) in the number of pending jobs.

        Only jobs currently on the heap can be cancelled; a job that has
        already been popped for execution is not found.

        Returns
        -------
        bool
            True if the job was found and cancelled, False otherwise.
        """
        if job not in self._jobs:
            return False

        job._cancelled = True
        # Dropping an arbitrary element breaks the heap shape, so filter
        # and re-heapify rather than removing in place.
        self._jobs = [j for j in self._jobs if not j._cancelled]
        heapq.heapify(self._jobs)
        return True

    def execute_ready_jobs(self, context: Any) -> int:
        """Execute all jobs whose ``scheduled_at`` <= now.

        Due jobs are drained from the heap first and then run
        synchronously in ``(priority, job_id)`` order.  Jobs scheduled
        while the batch is running wait for the next call.

        If a job raises, the exception propagates to the caller, but the
        jobs of this batch that have not run yet are put back on the queue
        instead of being lost.  If a job shuts the queue down, the rest of
        the batch is dropped, matching what :meth:`shutdown` does to
        pending jobs.

        Parameters
        ----------
        context:
            Passed through to each job's ``execute`` method (typically a
            RouterContext).

        Returns
        -------
        int
            Number of jobs executed.
        """
        if not self._running:
            return 0

        now = time.monotonic()

        # Collect every due job before running any of them, so that a job
        # re-scheduling itself with no delay cannot starve this loop.
        ready: list[Job] = []
        while self._jobs and self._jobs[0].scheduled_at <= now:
            job = heapq.heappop(self._jobs)
            if not job._cancelled:
                ready.append(job)

        # Sort by (priority, job_id) — lower priority number first
        ready.sort(key=lambda j: (j.priority, j.job_id))

        executed = 0
        for index, job in enumerate(ready):
            if not self._running:
                # An earlier job in this batch shut the queue down.
                break
            try:
                log.debug(
                    "Executing job %s (priority=%d)",
                    job.get_name(),
                    job.get_priority(),
                )
                job.execute(context)
            except BaseException:
                # The remaining jobs were already popped; re-queue them so
                # one failing job does not silently discard the others.
                if self._running:
                    for waiting in ready[index + 1:]:
                        heapq.heappush(self._jobs, waiting)
                raise
            executed += 1

        return executed

    def pending_count(self) -> int:
        """Return the number of jobs currently in the queue."""
        return len(self._jobs)

    def shutdown(self) -> None:
        """Clear the queue and prevent further scheduling.

        Afterwards :meth:`schedule` raises ``RuntimeError`` and
        :meth:`execute_ready_jobs` returns 0 without running anything.
        """
        self._running = False
        self._jobs.clear()
        log.info("JobQueue shut down")