A Python port of the Invisible Internet Project (I2P)
1"""Tests for ReplySelector and DeliveryStatusTracker."""
2
3from i2p_router.reply_handler import ReplySelector, ReplyContext, DeliveryStatusTracker
4
5
6def _make_ctx(token: int, sent_ms: int = 1000, timeout_ms: int = 30_000) -> ReplyContext:
7 return ReplyContext(
8 token=token, message_id=token, from_hash=b"\x01" * 32,
9 to_hash=b"\x02" * 32, sent_ms=sent_ms, timeout_ms=timeout_ms,
10 )
11
12
13def test_register_and_match():
14 sel = ReplySelector()
15 ctx = _make_ctx(42)
16 sel.register(42, ctx)
17 assert sel.match(42) is ctx
18
19
20def test_unknown_token():
21 sel = ReplySelector()
22 assert sel.match(999) is None
23
24
25def test_match_consumes_entry():
26 sel = ReplySelector()
27 sel.register(42, _make_ctx(42))
28 assert sel.match(42) is not None
29 assert sel.match(42) is None # consumed
30
31
32def test_expire_old_fires_timeout():
33 fired = []
34 ctx = _make_ctx(1, sent_ms=1000, timeout_ms=5000)
35 ctx.on_timeout = lambda: fired.append(True)
36 sel = ReplySelector()
37 sel.register(1, ctx)
38 expired = sel.expire_old(now_ms=7000)
39 assert len(expired) == 1
40 assert len(fired) == 1
41
42
43def test_expire_old_keeps_fresh():
44 sel = ReplySelector()
45 sel.register(1, _make_ctx(1, sent_ms=1000, timeout_ms=30_000))
46 expired = sel.expire_old(now_ms=2000)
47 assert len(expired) == 0
48 assert sel.pending_count == 1
49
50
51def test_tracker_full_flow():
52 tracker = DeliveryStatusTracker()
53 success_fired = []
54 ctx = _make_ctx(10, sent_ms=1000)
55 ctx.on_success = lambda: success_fired.append(True)
56 tracker.register_pending(ctx)
57 assert tracker.pending_count == 1
58 result = tracker.on_status_received(10)
59 assert result is ctx
60 assert len(success_fired) == 1
61 assert tracker.pending_count == 0
62
63
64def test_tracker_timeout():
65 tracker = DeliveryStatusTracker()
66 timeout_fired = []
67 ctx = _make_ctx(20, sent_ms=1000, timeout_ms=5000)
68 ctx.on_timeout = lambda: timeout_fired.append(True)
69 tracker.register_pending(ctx)
70 expired = tracker.check_timeouts(now_ms=7000)
71 assert len(expired) == 1
72 assert len(timeout_fired) == 1