"""Tests for ReplySelector and DeliveryStatusTracker.""" from i2p_router.reply_handler import ReplySelector, ReplyContext, DeliveryStatusTracker def _make_ctx(token: int, sent_ms: int = 1000, timeout_ms: int = 30_000) -> ReplyContext: return ReplyContext( token=token, message_id=token, from_hash=b"\x01" * 32, to_hash=b"\x02" * 32, sent_ms=sent_ms, timeout_ms=timeout_ms, ) def test_register_and_match(): sel = ReplySelector() ctx = _make_ctx(42) sel.register(42, ctx) assert sel.match(42) is ctx def test_unknown_token(): sel = ReplySelector() assert sel.match(999) is None def test_match_consumes_entry(): sel = ReplySelector() sel.register(42, _make_ctx(42)) assert sel.match(42) is not None assert sel.match(42) is None # consumed def test_expire_old_fires_timeout(): fired = [] ctx = _make_ctx(1, sent_ms=1000, timeout_ms=5000) ctx.on_timeout = lambda: fired.append(True) sel = ReplySelector() sel.register(1, ctx) expired = sel.expire_old(now_ms=7000) assert len(expired) == 1 assert len(fired) == 1 def test_expire_old_keeps_fresh(): sel = ReplySelector() sel.register(1, _make_ctx(1, sent_ms=1000, timeout_ms=30_000)) expired = sel.expire_old(now_ms=2000) assert len(expired) == 0 assert sel.pending_count == 1 def test_tracker_full_flow(): tracker = DeliveryStatusTracker() success_fired = [] ctx = _make_ctx(10, sent_ms=1000) ctx.on_success = lambda: success_fired.append(True) tracker.register_pending(ctx) assert tracker.pending_count == 1 result = tracker.on_status_received(10) assert result is ctx assert len(success_fired) == 1 assert tracker.pending_count == 0 def test_tracker_timeout(): tracker = DeliveryStatusTracker() timeout_fired = [] ctx = _make_ctx(20, sent_ms=1000, timeout_ms=5000) ctx.on_timeout = lambda: timeout_fired.append(True) tracker.register_pending(ctx) expired = tracker.check_timeouts(now_ms=7000) assert len(expired) == 1 assert len(timeout_fired) == 1