A Python port of the Invisible Internet Project (I2P)
1"""ReplyHandler — match DeliveryStatusMessages to pending ACKs.
2
3Ported from net.i2p.router.message.OutboundClientMessageOneShotJob
4(reply selector and delivery tracking logic).
5"""
6
7from __future__ import annotations
8
9import logging
10from dataclasses import dataclass, field
11from typing import Callable
12
13logger = logging.getLogger(__name__)
14
15
16@dataclass
17class ReplyContext:
18 """State associated with a pending ACK."""
19 token: int
20 message_id: int
21 from_hash: bytes
22 to_hash: bytes
23 sent_ms: int
24 timeout_ms: int = 30_000
25 on_success: Callable | None = None
26 on_timeout: Callable | None = None
27
28
29class ReplySelector:
30 """Matches incoming DeliveryStatusMessages to pending outbound ACKs.
31
32 Each pending message is registered with a unique token.
33 When a DeliveryStatusMessage arrives with a matching token,
34 the entry is consumed (one-shot match).
35 """
36
37 def __init__(self) -> None:
38 self._pending: dict[int, ReplyContext] = {}
39
40 def register(self, token: int, context: ReplyContext) -> None:
41 """Register a pending reply expectation."""
42 self._pending[token] = context
43
44 def match(self, msg_id: int) -> ReplyContext | None:
45 """Try to match an incoming message ID to a pending context.
46
47 Consumes the entry on match (one-shot).
48 Returns None if no match.
49 """
50 ctx = self._pending.pop(msg_id, None)
51 return ctx
52
53 def expire_old(self, now_ms: int) -> list[ReplyContext]:
54 """Remove and return all timed-out entries.
55
56 Fires on_timeout callback for each expired entry.
57 """
58 expired = []
59 to_remove = []
60 for token, ctx in self._pending.items():
61 if (now_ms - ctx.sent_ms) > ctx.timeout_ms:
62 to_remove.append(token)
63 expired.append(ctx)
64
65 for token in to_remove:
66 del self._pending[token]
67
68 for ctx in expired:
69 if ctx.on_timeout:
70 try:
71 ctx.on_timeout()
72 except Exception:
73 logger.warning("Error in timeout callback for token %d", ctx.token)
74
75 return expired
76
77 @property
78 def pending_count(self) -> int:
79 return len(self._pending)
80
81
82class DeliveryStatusTracker:
83 """Higher-level delivery tracking using ReplySelector.
84
85 Provides a simpler interface for registering pending messages
86 and processing incoming status updates.
87 """
88
89 def __init__(self) -> None:
90 self._selector = ReplySelector()
91
92 def register_pending(self, context: ReplyContext) -> None:
93 """Register a pending delivery."""
94 self._selector.register(context.token, context)
95
96 def on_status_received(self, msg_id: int) -> ReplyContext | None:
97 """Process an incoming DeliveryStatusMessage.
98
99 Returns the matched context (fires on_success callback),
100 or None if no match.
101 """
102 ctx = self._selector.match(msg_id)
103 if ctx is not None and ctx.on_success:
104 try:
105 ctx.on_success()
106 except Exception:
107 logger.warning("Error in success callback for token %d", ctx.token)
108 return ctx
109
110 def check_timeouts(self, now_ms: int) -> list[ReplyContext]:
111 """Check for and handle timed-out deliveries."""
112 return self._selector.expire_old(now_ms)
113
114 @property
115 def pending_count(self) -> int:
116 return self._selector.pending_count