# A Python port of the Invisible Internet Project (I2P)
1"""Tests for Job, TimedJob, JobQueue, and built-in router jobs.
2
3TDD — these tests are written before the implementation.
4Ported from net.i2p.router.JobQueue / JobImpl / TimedEvent concepts.
5"""
6
7from __future__ import annotations
8
9import time
10from unittest.mock import MagicMock
11
12import pytest
13
14from i2p_router.job import Job, TimedJob
15from i2p_router.job_queue import JobQueue
16from i2p_router.router_jobs import (
17 TunnelBuildJob,
18 TunnelRefreshJob,
19 DatabaseSearchJob,
20 PeerProfileUpdateJob,
21)
22
23
24# ---------------------------------------------------------------------------
25# Concrete test doubles
26# ---------------------------------------------------------------------------
27
class _SimpleJob(Job):
    """Bare-bones Job subclass that records whether and how it was executed.

    After ``execute`` is called, ``executed`` is True and ``exec_context``
    holds the context object that was passed in.
    """

    def __init__(self, name: str = "test-job", priority: int = 5):
        super().__init__(name=name, priority=priority)
        # Nothing has run yet: no execution flag, no captured context.
        self.executed, self.exec_context = False, None

    def execute(self, context) -> None:
        """Flag the job as executed and remember the supplied context."""
        self.executed, self.exec_context = True, context
39
40
class _CountingTimedJob(TimedJob):
    """TimedJob double whose ``run`` only tallies its calls in ``run_count``."""

    def __init__(self, name: str = "timed-test", priority: int = 5,
                 interval_seconds: float = 1.0):
        super().__init__(
            name=name,
            priority=priority,
            interval_seconds=interval_seconds,
        )
        # Incremented once per run() call.
        self.run_count = 0

    def run(self, context) -> None:
        """Bump the invocation tally; the context argument is not used."""
        self.run_count = self.run_count + 1
52
53
54# ---------------------------------------------------------------------------
55# Job basics
56# ---------------------------------------------------------------------------
57
class TestJob:
    """Core Job contract: accessors, execution, identity and sort order."""

    def test_name(self):
        """get_name() returns the name given at construction."""
        job = _SimpleJob(name="alpha")
        assert job.get_name() == "alpha"

    def test_priority(self):
        """get_priority() returns the priority given at construction."""
        job = _SimpleJob(priority=3)
        assert job.get_priority() == 3

    def test_execute(self):
        """execute() hands the exact context object through to the job."""
        marker = object()
        job = _SimpleJob()
        job.execute(marker)
        assert job.executed is True
        assert job.exec_context is marker

    def test_has_unique_id(self):
        """Two separately constructed jobs get different job_id values."""
        first = _SimpleJob()
        second = _SimpleJob()
        assert first.job_id != second.job_id

    def test_ordering_by_scheduled_at(self):
        """With equal priority, the job scheduled earlier compares as smaller."""
        earlier = _SimpleJob(priority=5)
        later = _SimpleJob(priority=5)
        earlier.scheduled_at = 1.0
        later.scheduled_at = 2.0
        assert earlier < later

    def test_ordering_by_priority_when_same_time(self):
        """With equal scheduled_at, the lower priority number compares as smaller."""
        urgent = _SimpleJob(priority=1)
        routine = _SimpleJob(priority=5)
        urgent.scheduled_at = 1.0
        routine.scheduled_at = 1.0
        assert urgent < routine

    def test_ordering_by_id_tiebreak(self):
        """Equal time and priority must still yield a strict order."""
        first = _SimpleJob(priority=5)
        second = _SimpleJob(priority=5)
        first.scheduled_at = 1.0
        second.scheduled_at = 1.0
        # Exactly one direction of the comparison may hold; never both, never neither.
        first_wins = first < second
        second_wins = second < first
        assert first_wins != second_wins
105
106
107# ---------------------------------------------------------------------------
108# TimedJob
109# ---------------------------------------------------------------------------
110
class TestTimedJob:
    """A TimedJob runs via the queue and is queued again afterwards."""

    def test_run_called(self):
        """One pass over a due timed job invokes run() exactly once."""
        timed = _CountingTimedJob(interval_seconds=10.0)
        jobs = JobQueue()
        jobs.schedule(timed, delay_seconds=0)
        jobs.execute_ready_jobs(context=None)
        assert timed.run_count == 1

    def test_reschedules_after_execute(self):
        """After running, the timed job is pending in the queue again."""
        timed = _CountingTimedJob(interval_seconds=10.0)
        jobs = JobQueue()
        jobs.schedule(timed, delay_seconds=0)
        jobs.execute_ready_jobs(context=None)
        # The queue should hold exactly the re-added timed job.
        assert jobs.pending_count() == 1

    def test_rescheduled_time_is_in_future(self):
        """The new scheduled_at lies at least one interval after the run started."""
        interval = 5.0
        timed = _CountingTimedJob(interval_seconds=interval)
        jobs = JobQueue()
        jobs.schedule(timed, delay_seconds=0)
        # Timestamp taken just before execution so it is a lower bound on "now".
        started = time.monotonic()
        jobs.execute_ready_jobs(context=None)
        assert timed.scheduled_at >= started + interval

    def test_interval_attribute(self):
        """interval_seconds exposes the value given at construction."""
        timed = _CountingTimedJob(interval_seconds=30.0)
        assert timed.interval_seconds == 30.0
141
142
143# ---------------------------------------------------------------------------
144# JobQueue — scheduling and execution
145# ---------------------------------------------------------------------------
146
class TestJobQueueScheduling:
    """Behaviour of schedule(), pending_count() and execute_ready_jobs()."""

    def test_empty_queue(self):
        """A freshly constructed queue reports no pending jobs."""
        q = JobQueue()
        assert q.pending_count() == 0

    def test_schedule_one(self):
        """Scheduling a single job makes pending_count() report 1."""
        q = JobQueue()
        q.schedule(_SimpleJob())
        assert q.pending_count() == 1

    def test_schedule_multiple(self):
        """Each scheduled job is counted individually."""
        q = JobQueue()
        for i in range(5):
            q.schedule(_SimpleJob(name=f"j{i}"))
        assert q.pending_count() == 5

    def test_schedule_with_delay(self):
        """A job delayed by 100s is neither run nor dropped by an immediate pass."""
        q = JobQueue()
        j = _SimpleJob()
        q.schedule(j, delay_seconds=100)
        executed = q.execute_ready_jobs(context=None)
        assert executed == 0
        assert j.executed is False
        # Skipping a not-yet-due job must not discard it from the queue.
        assert q.pending_count() == 1

    def test_execute_ready_picks_ready_only(self):
        """Only the due job runs; the delayed one stays queued for later."""
        q = JobQueue()
        ready = _SimpleJob(name="ready")
        delayed = _SimpleJob(name="delayed")
        q.schedule(ready, delay_seconds=0)
        q.schedule(delayed, delay_seconds=999)
        executed = q.execute_ready_jobs(context=None)
        assert executed == 1
        assert ready.executed is True
        assert delayed.executed is False
        # The executed job is gone, the delayed one is still pending.
        assert q.pending_count() == 1

    def test_execute_ready_returns_count(self):
        """execute_ready_jobs() returns how many jobs it ran."""
        q = JobQueue()
        for _ in range(3):
            q.schedule(_SimpleJob(), delay_seconds=0)
        assert q.execute_ready_jobs(context=None) == 3

    def test_executed_jobs_removed_from_queue(self):
        """A one-shot job is no longer pending once it has been executed."""
        q = JobQueue()
        q.schedule(_SimpleJob(), delay_seconds=0)
        q.execute_ready_jobs(context=None)
        assert q.pending_count() == 0
196
197
class TestJobQueuePriority:
    """Among ready jobs, a lower priority number is executed earlier."""

    def test_priority_ordering(self):
        """Jobs scheduled as low/high/medium must execute as high/medium/low."""
        executed_names = []

        class _RecordingJob(Job):
            """Appends its own name to ``executed_names`` when executed."""

            def __init__(self, name, priority):
                super().__init__(name=name, priority=priority)

            def execute(self, context):
                executed_names.append(self.get_name())

        queue = JobQueue()
        # Deliberately submitted out of priority order.
        submissions = (("low", 10), ("high", 1), ("medium", 5))
        for label, prio in submissions:
            queue.schedule(_RecordingJob(label, priority=prio), delay_seconds=0)

        queue.execute_ready_jobs(context=None)
        assert executed_names == ["high", "medium", "low"]
218
219
class TestJobQueueCancellation:
    """Behaviour of cancel() for queued and unknown jobs."""

    def test_cancel_returns_true(self):
        """Cancelling a job that was scheduled returns True."""
        queue = JobQueue()
        job = _SimpleJob()
        queue.schedule(job)
        outcome = queue.cancel(job)
        assert outcome is True

    def test_cancel_removes_from_queue(self):
        """A cancelled job no longer counts as pending."""
        queue = JobQueue()
        job = _SimpleJob()
        queue.schedule(job)
        queue.cancel(job)
        assert queue.pending_count() == 0

    def test_cancel_nonexistent_returns_false(self):
        """Cancelling a job that was never scheduled returns False."""
        queue = JobQueue()
        stranger = _SimpleJob()
        outcome = queue.cancel(stranger)
        assert outcome is False

    def test_cancelled_job_not_executed(self):
        """A due job that was cancelled is not run by the next pass."""
        queue = JobQueue()
        job = _SimpleJob()
        queue.schedule(job, delay_seconds=0)
        queue.cancel(job)
        queue.execute_ready_jobs(context=None)
        assert job.executed is False
248
249
class TestJobQueueShutdown:
    """Behaviour of the queue once shutdown() has been called."""

    def test_shutdown_clears_queue(self):
        """shutdown() leaves no pending jobs behind."""
        queue = JobQueue()
        for _ in range(5):
            queue.schedule(_SimpleJob())
        queue.shutdown()
        assert queue.pending_count() == 0

    def test_shutdown_prevents_execution(self):
        """A due job scheduled before shutdown() is not run afterwards."""
        queue = JobQueue()
        job = _SimpleJob()
        queue.schedule(job, delay_seconds=0)
        queue.shutdown()
        ran = queue.execute_ready_jobs(context=None)
        assert ran == 0
        assert job.executed is False

    def test_schedule_after_shutdown_raises(self):
        """schedule() on a shut-down queue raises RuntimeError."""
        queue = JobQueue()
        queue.shutdown()
        late_job = _SimpleJob()
        with pytest.raises(RuntimeError):
            queue.schedule(late_job)
274
275
276# ---------------------------------------------------------------------------
277# Built-in router jobs
278# ---------------------------------------------------------------------------
279
class TestTunnelBuildJob:
    """Expectations for TunnelBuildJob: a TimedJob with interval_seconds == 60.0."""

    def test_is_timed_job(self):
        """A default-constructed instance is a TimedJob."""
        job = TunnelBuildJob()
        assert isinstance(job, TimedJob)

    def test_interval(self):
        """The default interval is 60 seconds."""
        job = TunnelBuildJob()
        assert job.interval_seconds == 60.0

    def test_name(self):
        """The job name mentions both "tunnel" and "build", case-insensitively."""
        job = TunnelBuildJob()
        lowered = job.get_name().lower()
        assert "tunnel" in lowered
        assert "build" in lowered

    def test_execute_runs_without_error(self):
        """run() completes against a MagicMock context without raising."""
        job = TunnelBuildJob()
        fake_context = MagicMock()
        job.run(context=fake_context)
299
300
class TestTunnelRefreshJob:
    """Expectations for TunnelRefreshJob: a TimedJob with interval_seconds == 30.0."""

    def test_is_timed_job(self):
        """A default-constructed instance is a TimedJob."""
        job = TunnelRefreshJob()
        assert isinstance(job, TimedJob)

    def test_interval(self):
        """The default interval is 30 seconds."""
        job = TunnelRefreshJob()
        assert job.interval_seconds == 30.0

    def test_name(self):
        """The job name mentions both "tunnel" and "refresh", case-insensitively."""
        job = TunnelRefreshJob()
        lowered = job.get_name().lower()
        assert "tunnel" in lowered
        assert "refresh" in lowered

    def test_execute_runs_without_error(self):
        """run() completes against a MagicMock context without raising."""
        job = TunnelRefreshJob()
        fake_context = MagicMock()
        job.run(context=fake_context)
320
321
class TestDatabaseSearchJob:
    """Expectations for DatabaseSearchJob: a plain Job, not a TimedJob."""

    def test_is_job_not_timed(self):
        """The instance is a Job but must not be a TimedJob."""
        job = DatabaseSearchJob()
        assert isinstance(job, Job)
        assert not isinstance(job, TimedJob)

    def test_name(self):
        """The job name mentions both "database" and "search", case-insensitively."""
        job = DatabaseSearchJob()
        lowered = job.get_name().lower()
        assert "database" in lowered
        assert "search" in lowered

    def test_execute_runs_without_error(self):
        """execute() completes against a MagicMock context without raising."""
        job = DatabaseSearchJob()
        fake_context = MagicMock()
        job.execute(context=fake_context)
338
339
class TestPeerProfileUpdateJob:
    """Expectations for PeerProfileUpdateJob: a TimedJob with interval_seconds == 120.0."""

    def test_is_timed_job(self):
        """A default-constructed instance is a TimedJob."""
        job = PeerProfileUpdateJob()
        assert isinstance(job, TimedJob)

    def test_interval(self):
        """The default interval is 120 seconds."""
        job = PeerProfileUpdateJob()
        assert job.interval_seconds == 120.0

    def test_name(self):
        """The job name mentions "peer", case-insensitively."""
        job = PeerProfileUpdateJob()
        lowered = job.get_name().lower()
        assert "peer" in lowered

    def test_execute_runs_without_error(self):
        """run() completes against a MagicMock context without raising."""
        job = PeerProfileUpdateJob()
        fake_context = MagicMock()
        job.run(context=fake_context)