A Python port of the Invisible Internet Project (I2P)
at main 136 lines 4.1 kB view raw
1"""JobQueue — priority queue for scheduled router jobs. 2 3Ported from net.i2p.router.JobQueue. 4 5Uses a binary heap (`heapq`) keyed on (scheduled_at, priority, job_id) 6so that the earliest, highest-priority job is always at the front. 7""" 8 9from __future__ import annotations 10 11import heapq 12import logging 13import time 14from typing import Any 15 16from i2p_router.job import Job, TimedJob 17 18log = logging.getLogger(__name__) 19 20 21class JobQueue: 22 """Priority queue for scheduled router jobs. 23 24 Jobs are ordered by their ``scheduled_at`` timestamp (ascending), then 25 by priority (lower number first), then by ``job_id`` for deterministic 26 tie-breaking. 27 """ 28 29 def __init__(self) -> None: 30 self._jobs: list[Job] = [] 31 self._running: bool = True 32 33 # -- Internal ----------------------------------------------------------- 34 35 def _push(self, job: Job) -> None: 36 """Push a job onto the heap (internal, no state checks).""" 37 job._cancelled = False 38 heapq.heappush(self._jobs, job) 39 40 # -- Public API --------------------------------------------------------- 41 42 def schedule(self, job: Job, delay_seconds: float = 0) -> None: 43 """Add *job* to the queue, to be executed after *delay_seconds*. 44 45 Parameters 46 ---------- 47 job: 48 The job to schedule. 49 delay_seconds: 50 Seconds from now until the job becomes eligible for execution. 51 Defaults to 0 (immediately eligible). 52 53 Raises 54 ------ 55 RuntimeError 56 If the queue has been shut down. 57 """ 58 if not self._running: 59 raise RuntimeError("JobQueue has been shut down") 60 61 job.scheduled_at = time.monotonic() + delay_seconds 62 63 # Let TimedJob know which queue it belongs to so it can re-schedule. 64 if isinstance(job, TimedJob): 65 job._queue = self 66 67 self._push(job) 68 log.debug("Scheduled job %s at %.3f", job.get_name(), job.scheduled_at) 69 70 def cancel(self, job: Job) -> bool: 71 """Cancel a pending job. 72 73 The job is marked cancelled and lazily removed during 74 :meth:`execute_ready_jobs`. 75 76 Returns 77 ------- 78 bool 79 True if the job was found and cancelled, False otherwise. 80 """ 81 if job in self._jobs: 82 job._cancelled = True 83 # Eagerly rebuild the heap without the cancelled job 84 self._jobs = [j for j in self._jobs if not j._cancelled] 85 heapq.heapify(self._jobs) 86 return True 87 return False 88 89 def execute_ready_jobs(self, context: Any) -> int: 90 """Execute all jobs whose ``scheduled_at`` <= now. 91 92 Jobs are popped in priority order. Each job's ``execute(context)`` 93 is called synchronously. 94 95 Parameters 96 ---------- 97 context: 98 Passed through to each job's ``execute`` method (typically a 99 RouterContext). 100 101 Returns 102 ------- 103 int 104 Number of jobs executed. 105 """ 106 if not self._running: 107 return 0 108 109 now = time.monotonic() 110 111 # Collect all ready jobs first, then sort by priority so that 112 # among jobs whose scheduled_at <= now, higher priority runs first. 113 ready: list[Job] = [] 114 while self._jobs and self._jobs[0].scheduled_at <= now: 115 job = heapq.heappop(self._jobs) 116 if not job._cancelled: 117 ready.append(job) 118 119 # Sort by (priority, job_id) — lower priority number first 120 ready.sort(key=lambda j: (j.priority, j.job_id)) 121 122 for job in ready: 123 log.debug("Executing job %s (priority=%d)", job.get_name(), job.get_priority()) 124 job.execute(context) 125 126 return len(ready) 127 128 def pending_count(self) -> int: 129 """Return the number of jobs currently in the queue.""" 130 return len(self._jobs) 131 132 def shutdown(self) -> None: 133 """Clear the queue and prevent further scheduling.""" 134 self._running = False 135 self._jobs.clear() 136 log.info("JobQueue shut down")