# A Python port of the Invisible Internet Project (I2P).
# Source listing at branch main: 357 lines, 11 kB.
1"""Tests for Job, TimedJob, JobQueue, and built-in router jobs. 2 3TDD — these tests are written before the implementation. 4Ported from net.i2p.router.JobQueue / JobImpl / TimedEvent concepts. 5""" 6 7from __future__ import annotations 8 9import time 10from unittest.mock import MagicMock 11 12import pytest 13 14from i2p_router.job import Job, TimedJob 15from i2p_router.job_queue import JobQueue 16from i2p_router.router_jobs import ( 17 TunnelBuildJob, 18 TunnelRefreshJob, 19 DatabaseSearchJob, 20 PeerProfileUpdateJob, 21) 22 23 24# --------------------------------------------------------------------------- 25# Concrete test doubles 26# --------------------------------------------------------------------------- 27 28class _SimpleJob(Job): 29 """Minimal concrete Job for testing.""" 30 31 def __init__(self, name: str = "test-job", priority: int = 5): 32 super().__init__(name=name, priority=priority) 33 self.executed = False 34 self.exec_context = None 35 36 def execute(self, context) -> None: 37 self.executed = True 38 self.exec_context = context 39 40 41class _CountingTimedJob(TimedJob): 42 """TimedJob that counts how many times it ran.""" 43 44 def __init__(self, name: str = "timed-test", priority: int = 5, 45 interval_seconds: float = 1.0): 46 super().__init__(name=name, priority=priority, 47 interval_seconds=interval_seconds) 48 self.run_count = 0 49 50 def run(self, context) -> None: 51 self.run_count += 1 52 53 54# --------------------------------------------------------------------------- 55# Job basics 56# --------------------------------------------------------------------------- 57 58class TestJob: 59 """Job ABC basics — name, priority, comparison.""" 60 61 def test_name(self): 62 j = _SimpleJob(name="alpha") 63 assert j.get_name() == "alpha" 64 65 def test_priority(self): 66 j = _SimpleJob(priority=3) 67 assert j.get_priority() == 3 68 69 def test_execute(self): 70 ctx = object() 71 j = _SimpleJob() 72 j.execute(ctx) 73 assert j.executed is True 74 assert 
j.exec_context is ctx 75 76 def test_has_unique_id(self): 77 a = _SimpleJob() 78 b = _SimpleJob() 79 assert a.job_id != b.job_id 80 81 def test_ordering_by_scheduled_at(self): 82 """Jobs with earlier scheduled_at sort first.""" 83 a = _SimpleJob(priority=5) 84 b = _SimpleJob(priority=5) 85 a.scheduled_at = 1.0 86 b.scheduled_at = 2.0 87 assert a < b 88 89 def test_ordering_by_priority_when_same_time(self): 90 """Lower priority number = higher priority = sorts first.""" 91 a = _SimpleJob(priority=1) 92 b = _SimpleJob(priority=5) 93 a.scheduled_at = 1.0 94 b.scheduled_at = 1.0 95 assert a < b 96 97 def test_ordering_by_id_tiebreak(self): 98 """Deterministic ordering when time and priority are equal.""" 99 a = _SimpleJob(priority=5) 100 b = _SimpleJob(priority=5) 101 a.scheduled_at = 1.0 102 b.scheduled_at = 1.0 103 # One of them must sort before the other, no equality 104 assert (a < b) != (b < a) 105 106 107# --------------------------------------------------------------------------- 108# TimedJob 109# --------------------------------------------------------------------------- 110 111class TestTimedJob: 112 """TimedJob re-schedules itself after execution.""" 113 114 def test_run_called(self): 115 tj = _CountingTimedJob(interval_seconds=10.0) 116 queue = JobQueue() 117 queue.schedule(tj, delay_seconds=0) 118 queue.execute_ready_jobs(context=None) 119 assert tj.run_count == 1 120 121 def test_reschedules_after_execute(self): 122 tj = _CountingTimedJob(interval_seconds=10.0) 123 queue = JobQueue() 124 queue.schedule(tj, delay_seconds=0) 125 queue.execute_ready_jobs(context=None) 126 # After execution the timed job should have re-added itself 127 assert queue.pending_count() == 1 128 129 def test_rescheduled_time_is_in_future(self): 130 tj = _CountingTimedJob(interval_seconds=5.0) 131 queue = JobQueue() 132 queue.schedule(tj, delay_seconds=0) 133 before = time.monotonic() 134 queue.execute_ready_jobs(context=None) 135 # The re-scheduled time should be >= now + interval 
136 assert tj.scheduled_at >= before + 5.0 137 138 def test_interval_attribute(self): 139 tj = _CountingTimedJob(interval_seconds=30.0) 140 assert tj.interval_seconds == 30.0 141 142 143# --------------------------------------------------------------------------- 144# JobQueue — scheduling and execution 145# --------------------------------------------------------------------------- 146 147class TestJobQueueScheduling: 148 """schedule(), pending_count(), and basic queue behavior.""" 149 150 def test_empty_queue(self): 151 q = JobQueue() 152 assert q.pending_count() == 0 153 154 def test_schedule_one(self): 155 q = JobQueue() 156 q.schedule(_SimpleJob()) 157 assert q.pending_count() == 1 158 159 def test_schedule_multiple(self): 160 q = JobQueue() 161 for i in range(5): 162 q.schedule(_SimpleJob(name=f"j{i}")) 163 assert q.pending_count() == 5 164 165 def test_schedule_with_delay(self): 166 q = JobQueue() 167 j = _SimpleJob() 168 q.schedule(j, delay_seconds=100) 169 # Job is scheduled but not ready 170 executed = q.execute_ready_jobs(context=None) 171 assert executed == 0 172 assert j.executed is False 173 174 def test_execute_ready_picks_ready_only(self): 175 q = JobQueue() 176 ready = _SimpleJob(name="ready") 177 delayed = _SimpleJob(name="delayed") 178 q.schedule(ready, delay_seconds=0) 179 q.schedule(delayed, delay_seconds=999) 180 executed = q.execute_ready_jobs(context=None) 181 assert executed == 1 182 assert ready.executed is True 183 assert delayed.executed is False 184 185 def test_execute_ready_returns_count(self): 186 q = JobQueue() 187 for _ in range(3): 188 q.schedule(_SimpleJob(), delay_seconds=0) 189 assert q.execute_ready_jobs(context=None) == 3 190 191 def test_executed_jobs_removed_from_queue(self): 192 q = JobQueue() 193 q.schedule(_SimpleJob(), delay_seconds=0) 194 q.execute_ready_jobs(context=None) 195 assert q.pending_count() == 0 196 197 198class TestJobQueuePriority: 199 """Higher priority jobs (lower number) execute first.""" 200 201 def 
test_priority_ordering(self): 202 q = JobQueue() 203 order = [] 204 205 class _TrackingJob(Job): 206 def __init__(self, name, priority): 207 super().__init__(name=name, priority=priority) 208 209 def execute(self, context): 210 order.append(self.get_name()) 211 212 q.schedule(_TrackingJob("low", priority=10), delay_seconds=0) 213 q.schedule(_TrackingJob("high", priority=1), delay_seconds=0) 214 q.schedule(_TrackingJob("medium", priority=5), delay_seconds=0) 215 216 q.execute_ready_jobs(context=None) 217 assert order == ["high", "medium", "low"] 218 219 220class TestJobQueueCancellation: 221 """cancel() removes a job from the queue.""" 222 223 def test_cancel_returns_true(self): 224 q = JobQueue() 225 j = _SimpleJob() 226 q.schedule(j) 227 assert q.cancel(j) is True 228 229 def test_cancel_removes_from_queue(self): 230 q = JobQueue() 231 j = _SimpleJob() 232 q.schedule(j) 233 q.cancel(j) 234 assert q.pending_count() == 0 235 236 def test_cancel_nonexistent_returns_false(self): 237 q = JobQueue() 238 j = _SimpleJob() 239 assert q.cancel(j) is False 240 241 def test_cancelled_job_not_executed(self): 242 q = JobQueue() 243 j = _SimpleJob() 244 q.schedule(j, delay_seconds=0) 245 q.cancel(j) 246 q.execute_ready_jobs(context=None) 247 assert j.executed is False 248 249 250class TestJobQueueShutdown: 251 """shutdown() clears all pending jobs.""" 252 253 def test_shutdown_clears_queue(self): 254 q = JobQueue() 255 for _ in range(5): 256 q.schedule(_SimpleJob()) 257 q.shutdown() 258 assert q.pending_count() == 0 259 260 def test_shutdown_prevents_execution(self): 261 q = JobQueue() 262 j = _SimpleJob() 263 q.schedule(j, delay_seconds=0) 264 q.shutdown() 265 executed = q.execute_ready_jobs(context=None) 266 assert executed == 0 267 assert j.executed is False 268 269 def test_schedule_after_shutdown_raises(self): 270 q = JobQueue() 271 q.shutdown() 272 with pytest.raises(RuntimeError): 273 q.schedule(_SimpleJob()) 274 275 276# 
--------------------------------------------------------------------------- 277# Built-in router jobs 278# --------------------------------------------------------------------------- 279 280class TestTunnelBuildJob: 281 """TunnelBuildJob is a TimedJob with 60s interval.""" 282 283 def test_is_timed_job(self): 284 j = TunnelBuildJob() 285 assert isinstance(j, TimedJob) 286 287 def test_interval(self): 288 j = TunnelBuildJob() 289 assert j.interval_seconds == 60.0 290 291 def test_name(self): 292 j = TunnelBuildJob() 293 assert "tunnel" in j.get_name().lower() 294 assert "build" in j.get_name().lower() 295 296 def test_execute_runs_without_error(self): 297 j = TunnelBuildJob() 298 j.run(context=MagicMock()) # should not raise 299 300 301class TestTunnelRefreshJob: 302 """TunnelRefreshJob is a TimedJob with 30s interval.""" 303 304 def test_is_timed_job(self): 305 j = TunnelRefreshJob() 306 assert isinstance(j, TimedJob) 307 308 def test_interval(self): 309 j = TunnelRefreshJob() 310 assert j.interval_seconds == 30.0 311 312 def test_name(self): 313 j = TunnelRefreshJob() 314 assert "tunnel" in j.get_name().lower() 315 assert "refresh" in j.get_name().lower() 316 317 def test_execute_runs_without_error(self): 318 j = TunnelRefreshJob() 319 j.run(context=MagicMock()) 320 321 322class TestDatabaseSearchJob: 323 """DatabaseSearchJob is a one-shot Job (not TimedJob).""" 324 325 def test_is_job_not_timed(self): 326 j = DatabaseSearchJob() 327 assert isinstance(j, Job) 328 assert not isinstance(j, TimedJob) 329 330 def test_name(self): 331 j = DatabaseSearchJob() 332 assert "database" in j.get_name().lower() 333 assert "search" in j.get_name().lower() 334 335 def test_execute_runs_without_error(self): 336 j = DatabaseSearchJob() 337 j.execute(context=MagicMock()) 338 339 340class TestPeerProfileUpdateJob: 341 """PeerProfileUpdateJob is a TimedJob with 120s interval.""" 342 343 def test_is_timed_job(self): 344 j = PeerProfileUpdateJob() 345 assert isinstance(j, TimedJob) 346 
347 def test_interval(self): 348 j = PeerProfileUpdateJob() 349 assert j.interval_seconds == 120.0 350 351 def test_name(self): 352 j = PeerProfileUpdateJob() 353 assert "peer" in j.get_name().lower() 354 355 def test_execute_runs_without_error(self): 356 j = PeerProfileUpdateJob() 357 j.run(context=MagicMock())