A Python port of the Invisible Internet Project (I2P)
at main 72 lines 2.1 kB view raw
"""Tests for ReplySelector and DeliveryStatusTracker."""

from i2p_router.reply_handler import ReplySelector, ReplyContext, DeliveryStatusTracker


def _make_ctx(token: int, sent_ms: int = 1000, timeout_ms: int = 30_000) -> ReplyContext:
    """Build a ReplyContext fixture for the given token.

    The message id is set equal to the token, and the from/to hashes are
    fixed 32-byte placeholders (0x01 and 0x02 repeated).

    Args:
        token: Value used for both ``token`` and ``message_id``.
        sent_ms: Send timestamp passed through to the context.
        timeout_ms: Timeout passed through to the context.

    Returns:
        A freshly constructed ReplyContext.
    """
    return ReplyContext(
        token=token, message_id=token, from_hash=b"\x01" * 32,
        to_hash=b"\x02" * 32, sent_ms=sent_ms, timeout_ms=timeout_ms,
    )


def test_register_and_match():
    """match() returns the very same context object that was registered."""
    sel = ReplySelector()
    ctx = _make_ctx(42)
    sel.register(42, ctx)
    assert sel.match(42) is ctx


def test_unknown_token():
    """match() yields None for a token that was never registered."""
    sel = ReplySelector()
    assert sel.match(999) is None


def test_match_consumes_entry():
    """A second match() on the same token yields None."""
    sel = ReplySelector()
    sel.register(42, _make_ctx(42))
    assert sel.match(42) is not None
    assert sel.match(42) is None  # consumed


def test_expire_old_fires_timeout():
    """expire_old() reports the stale entry and invokes its on_timeout once."""
    fired = []
    ctx = _make_ctx(1, sent_ms=1000, timeout_ms=5000)
    ctx.on_timeout = lambda: fired.append(True)
    sel = ReplySelector()
    sel.register(1, ctx)
    # NOTE(review): presumably the deadline is sent_ms + timeout_ms (6000),
    # which now_ms=7000 exceeds -- confirm against ReplySelector.expire_old.
    expired = sel.expire_old(now_ms=7000)
    assert len(expired) == 1
    assert len(fired) == 1


def test_expire_old_keeps_fresh():
    """expire_old() leaves an entry pending when queried at now_ms=2000."""
    sel = ReplySelector()
    sel.register(1, _make_ctx(1, sent_ms=1000, timeout_ms=30_000))
    expired = sel.expire_old(now_ms=2000)
    assert len(expired) == 0
    assert sel.pending_count == 1


def test_tracker_full_flow():
    """Register, receive status: context returned, on_success fired, none pending."""
    tracker = DeliveryStatusTracker()
    success_fired = []
    ctx = _make_ctx(10, sent_ms=1000)
    ctx.on_success = lambda: success_fired.append(True)
    tracker.register_pending(ctx)
    assert tracker.pending_count == 1
    result = tracker.on_status_received(10)
    assert result is ctx
    assert len(success_fired) == 1
    assert tracker.pending_count == 0


def test_tracker_timeout():
    """check_timeouts() reports the stale context and fires on_timeout once."""
    tracker = DeliveryStatusTracker()
    timeout_fired = []
    ctx = _make_ctx(20, sent_ms=1000, timeout_ms=5000)
    ctx.on_timeout = lambda: timeout_fired.append(True)
    tracker.register_pending(ctx)
    expired = tracker.check_timeouts(now_ms=7000)
    assert len(expired) == 1
    assert len(timeout_fired) == 1