"""Tests for Job, TimedJob, JobQueue, and built-in router jobs. TDD — these tests are written before the implementation. Ported from net.i2p.router.JobQueue / JobImpl / TimedEvent concepts. """ from __future__ import annotations import time from unittest.mock import MagicMock import pytest from i2p_router.job import Job, TimedJob from i2p_router.job_queue import JobQueue from i2p_router.router_jobs import ( TunnelBuildJob, TunnelRefreshJob, DatabaseSearchJob, PeerProfileUpdateJob, ) # --------------------------------------------------------------------------- # Concrete test doubles # --------------------------------------------------------------------------- class _SimpleJob(Job): """Minimal concrete Job for testing.""" def __init__(self, name: str = "test-job", priority: int = 5): super().__init__(name=name, priority=priority) self.executed = False self.exec_context = None def execute(self, context) -> None: self.executed = True self.exec_context = context class _CountingTimedJob(TimedJob): """TimedJob that counts how many times it ran.""" def __init__(self, name: str = "timed-test", priority: int = 5, interval_seconds: float = 1.0): super().__init__(name=name, priority=priority, interval_seconds=interval_seconds) self.run_count = 0 def run(self, context) -> None: self.run_count += 1 # --------------------------------------------------------------------------- # Job basics # --------------------------------------------------------------------------- class TestJob: """Job ABC basics — name, priority, comparison.""" def test_name(self): j = _SimpleJob(name="alpha") assert j.get_name() == "alpha" def test_priority(self): j = _SimpleJob(priority=3) assert j.get_priority() == 3 def test_execute(self): ctx = object() j = _SimpleJob() j.execute(ctx) assert j.executed is True assert j.exec_context is ctx def test_has_unique_id(self): a = _SimpleJob() b = _SimpleJob() assert a.job_id != b.job_id def test_ordering_by_scheduled_at(self): """Jobs with earlier scheduled_at sort first.""" a = _SimpleJob(priority=5) b = _SimpleJob(priority=5) a.scheduled_at = 1.0 b.scheduled_at = 2.0 assert a < b def test_ordering_by_priority_when_same_time(self): """Lower priority number = higher priority = sorts first.""" a = _SimpleJob(priority=1) b = _SimpleJob(priority=5) a.scheduled_at = 1.0 b.scheduled_at = 1.0 assert a < b def test_ordering_by_id_tiebreak(self): """Deterministic ordering when time and priority are equal.""" a = _SimpleJob(priority=5) b = _SimpleJob(priority=5) a.scheduled_at = 1.0 b.scheduled_at = 1.0 # One of them must sort before the other, no equality assert (a < b) != (b < a) # --------------------------------------------------------------------------- # TimedJob # --------------------------------------------------------------------------- class TestTimedJob: """TimedJob re-schedules itself after execution.""" def test_run_called(self): tj = _CountingTimedJob(interval_seconds=10.0) queue = JobQueue() queue.schedule(tj, delay_seconds=0) queue.execute_ready_jobs(context=None) assert tj.run_count == 1 def test_reschedules_after_execute(self): tj = _CountingTimedJob(interval_seconds=10.0) queue = JobQueue() queue.schedule(tj, delay_seconds=0) queue.execute_ready_jobs(context=None) # After execution the timed job should have re-added itself assert queue.pending_count() == 1 def test_rescheduled_time_is_in_future(self): tj = _CountingTimedJob(interval_seconds=5.0) queue = JobQueue() queue.schedule(tj, delay_seconds=0) before = time.monotonic() queue.execute_ready_jobs(context=None) # The re-scheduled time should be >= now + interval assert tj.scheduled_at >= before + 5.0 def test_interval_attribute(self): tj = _CountingTimedJob(interval_seconds=30.0) assert tj.interval_seconds == 30.0 # --------------------------------------------------------------------------- # JobQueue — scheduling and execution # --------------------------------------------------------------------------- class TestJobQueueScheduling: """schedule(), pending_count(), and basic queue behavior.""" def test_empty_queue(self): q = JobQueue() assert q.pending_count() == 0 def test_schedule_one(self): q = JobQueue() q.schedule(_SimpleJob()) assert q.pending_count() == 1 def test_schedule_multiple(self): q = JobQueue() for i in range(5): q.schedule(_SimpleJob(name=f"j{i}")) assert q.pending_count() == 5 def test_schedule_with_delay(self): q = JobQueue() j = _SimpleJob() q.schedule(j, delay_seconds=100) # Job is scheduled but not ready executed = q.execute_ready_jobs(context=None) assert executed == 0 assert j.executed is False def test_execute_ready_picks_ready_only(self): q = JobQueue() ready = _SimpleJob(name="ready") delayed = _SimpleJob(name="delayed") q.schedule(ready, delay_seconds=0) q.schedule(delayed, delay_seconds=999) executed = q.execute_ready_jobs(context=None) assert executed == 1 assert ready.executed is True assert delayed.executed is False def test_execute_ready_returns_count(self): q = JobQueue() for _ in range(3): q.schedule(_SimpleJob(), delay_seconds=0) assert q.execute_ready_jobs(context=None) == 3 def test_executed_jobs_removed_from_queue(self): q = JobQueue() q.schedule(_SimpleJob(), delay_seconds=0) q.execute_ready_jobs(context=None) assert q.pending_count() == 0 class TestJobQueuePriority: """Higher priority jobs (lower number) execute first.""" def test_priority_ordering(self): q = JobQueue() order = [] class _TrackingJob(Job): def __init__(self, name, priority): super().__init__(name=name, priority=priority) def execute(self, context): order.append(self.get_name()) q.schedule(_TrackingJob("low", priority=10), delay_seconds=0) q.schedule(_TrackingJob("high", priority=1), delay_seconds=0) q.schedule(_TrackingJob("medium", priority=5), delay_seconds=0) q.execute_ready_jobs(context=None) assert order == ["high", "medium", "low"] class TestJobQueueCancellation: """cancel() removes a job from the queue.""" def test_cancel_returns_true(self): q = JobQueue() j = _SimpleJob() q.schedule(j) assert q.cancel(j) is True def test_cancel_removes_from_queue(self): q = JobQueue() j = _SimpleJob() q.schedule(j) q.cancel(j) assert q.pending_count() == 0 def test_cancel_nonexistent_returns_false(self): q = JobQueue() j = _SimpleJob() assert q.cancel(j) is False def test_cancelled_job_not_executed(self): q = JobQueue() j = _SimpleJob() q.schedule(j, delay_seconds=0) q.cancel(j) q.execute_ready_jobs(context=None) assert j.executed is False class TestJobQueueShutdown: """shutdown() clears all pending jobs.""" def test_shutdown_clears_queue(self): q = JobQueue() for _ in range(5): q.schedule(_SimpleJob()) q.shutdown() assert q.pending_count() == 0 def test_shutdown_prevents_execution(self): q = JobQueue() j = _SimpleJob() q.schedule(j, delay_seconds=0) q.shutdown() executed = q.execute_ready_jobs(context=None) assert executed == 0 assert j.executed is False def test_schedule_after_shutdown_raises(self): q = JobQueue() q.shutdown() with pytest.raises(RuntimeError): q.schedule(_SimpleJob()) # --------------------------------------------------------------------------- # Built-in router jobs # --------------------------------------------------------------------------- class TestTunnelBuildJob: """TunnelBuildJob is a TimedJob with 60s interval.""" def test_is_timed_job(self): j = TunnelBuildJob() assert isinstance(j, TimedJob) def test_interval(self): j = TunnelBuildJob() assert j.interval_seconds == 60.0 def test_name(self): j = TunnelBuildJob() assert "tunnel" in j.get_name().lower() assert "build" in j.get_name().lower() def test_execute_runs_without_error(self): j = TunnelBuildJob() j.run(context=MagicMock()) # should not raise class TestTunnelRefreshJob: """TunnelRefreshJob is a TimedJob with 30s interval.""" def test_is_timed_job(self): j = TunnelRefreshJob() assert isinstance(j, TimedJob) def test_interval(self): j = TunnelRefreshJob() assert j.interval_seconds == 30.0 def test_name(self): j = TunnelRefreshJob() assert "tunnel" in j.get_name().lower() assert "refresh" in j.get_name().lower() def test_execute_runs_without_error(self): j = TunnelRefreshJob() j.run(context=MagicMock()) class TestDatabaseSearchJob: """DatabaseSearchJob is a one-shot Job (not TimedJob).""" def test_is_job_not_timed(self): j = DatabaseSearchJob() assert isinstance(j, Job) assert not isinstance(j, TimedJob) def test_name(self): j = DatabaseSearchJob() assert "database" in j.get_name().lower() assert "search" in j.get_name().lower() def test_execute_runs_without_error(self): j = DatabaseSearchJob() j.execute(context=MagicMock()) class TestPeerProfileUpdateJob: """PeerProfileUpdateJob is a TimedJob with 120s interval.""" def test_is_timed_job(self): j = PeerProfileUpdateJob() assert isinstance(j, TimedJob) def test_interval(self): j = PeerProfileUpdateJob() assert j.interval_seconds == 120.0 def test_name(self): j = PeerProfileUpdateJob() assert "peer" in j.get_name().lower() def test_execute_runs_without_error(self): j = PeerProfileUpdateJob() j.run(context=MagicMock())