"""JobQueue — priority queue for scheduled router jobs. Ported from net.i2p.router.JobQueue. Uses a binary heap (`heapq`) keyed on (scheduled_at, priority, job_id) so that the earliest, highest-priority job is always at the front. """ from __future__ import annotations import heapq import logging import time from typing import Any from i2p_router.job import Job, TimedJob log = logging.getLogger(__name__) class JobQueue: """Priority queue for scheduled router jobs. Jobs are ordered by their ``scheduled_at`` timestamp (ascending), then by priority (lower number first), then by ``job_id`` for deterministic tie-breaking. """ def __init__(self) -> None: self._jobs: list[Job] = [] self._running: bool = True # -- Internal ----------------------------------------------------------- def _push(self, job: Job) -> None: """Push a job onto the heap (internal, no state checks).""" job._cancelled = False heapq.heappush(self._jobs, job) # -- Public API --------------------------------------------------------- def schedule(self, job: Job, delay_seconds: float = 0) -> None: """Add *job* to the queue, to be executed after *delay_seconds*. Parameters ---------- job: The job to schedule. delay_seconds: Seconds from now until the job becomes eligible for execution. Defaults to 0 (immediately eligible). Raises ------ RuntimeError If the queue has been shut down. """ if not self._running: raise RuntimeError("JobQueue has been shut down") job.scheduled_at = time.monotonic() + delay_seconds # Let TimedJob know which queue it belongs to so it can re-schedule. if isinstance(job, TimedJob): job._queue = self self._push(job) log.debug("Scheduled job %s at %.3f", job.get_name(), job.scheduled_at) def cancel(self, job: Job) -> bool: """Cancel a pending job. The job is marked cancelled and lazily removed during :meth:`execute_ready_jobs`. Returns ------- bool True if the job was found and cancelled, False otherwise. """ if job in self._jobs: job._cancelled = True # Eagerly rebuild the heap without the cancelled job self._jobs = [j for j in self._jobs if not j._cancelled] heapq.heapify(self._jobs) return True return False def execute_ready_jobs(self, context: Any) -> int: """Execute all jobs whose ``scheduled_at`` <= now. Jobs are popped in priority order. Each job's ``execute(context)`` is called synchronously. Parameters ---------- context: Passed through to each job's ``execute`` method (typically a RouterContext). Returns ------- int Number of jobs executed. """ if not self._running: return 0 now = time.monotonic() # Collect all ready jobs first, then sort by priority so that # among jobs whose scheduled_at <= now, higher priority runs first. ready: list[Job] = [] while self._jobs and self._jobs[0].scheduled_at <= now: job = heapq.heappop(self._jobs) if not job._cancelled: ready.append(job) # Sort by (priority, job_id) — lower priority number first ready.sort(key=lambda j: (j.priority, j.job_id)) for job in ready: log.debug("Executing job %s (priority=%d)", job.get_name(), job.get_priority()) job.execute(context) return len(ready) def pending_count(self) -> int: """Return the number of jobs currently in the queue.""" return len(self._jobs) def shutdown(self) -> None: """Clear the queue and prevent further scheduling.""" self._running = False self._jobs.clear() log.info("JobQueue shut down")