A Python port of the Invisible Internet Project (I2P)
1"""I2CP message handler — dispatch and request/reply correlation."""
2
3from collections import defaultdict
4from typing import Callable
5
6
class MessageHandler:
    """Dispatches I2CP messages to registered handlers by type code.

    Several handlers may be registered for one type code; they are invoked
    in registration order.
    """

    def __init__(self):
        # type code -> handlers, kept in registration order.
        self._handlers: dict[int, list[Callable]] = defaultdict(list)

    def register(self, msg_type: int, handler: Callable) -> None:
        """Add *handler* to the callbacks invoked for *msg_type*.

        Registering the same callable twice makes it run twice per dispatch.
        """
        self._handlers[msg_type].append(handler)

    def dispatch(self, message) -> None:
        """Call every handler registered for ``message.TYPE``.

        Handlers receive *message* as their only argument.  An exception
        raised by a handler propagates to the caller and the remaining
        handlers are skipped.  A message whose type has no handlers is
        ignored.

        Handlers registered while a dispatch is in progress take effect
        from the next dispatch onward.
        """
        # .get() instead of indexing so an unknown type does not make the
        # defaultdict create an empty entry.  tuple() takes a snapshot: a
        # handler may call register() for this same type, and iterating the
        # live list would pick up the new entry immediately (or never
        # finish if a handler keeps re-registering).
        for handler in tuple(self._handlers.get(message.TYPE, ())):
            handler(message)
19
20
class ReplyFuture:
    """Simple future for request/reply correlation.

    Holds three plain attributes: ``done``, ``cancelled`` and ``result``.
    This class only ever sets ``cancelled`` to its initial ``False``; any
    later change is made by whoever owns the future.
    """

    def __init__(self):
        # A new future is pending: no payload yet, neither flag raised.
        self.result = None
        self.done = self.cancelled = False

    def set_result(self, result):
        """Store *result* and mark the future as done."""
        # Targets are assigned left to right, so the payload is stored
        # before the done flag flips.
        self.result, self.done = result, True
32
33
34class RequestReplyManager:
35 """Correlate I2CP requests with replies by message ID."""
36
37 def __init__(self):
38 self._pending: dict[int, ReplyFuture] = {}
39
40 def register_request(self, msg_id: int) -> ReplyFuture:
41 future = ReplyFuture()
42 self._pending[msg_id] = future
43 return future
44
45 def handle_reply(self, msg_id: int, result):
46 future = self._pending.pop(msg_id, None)
47 if future is not None:
48 future.set_result(result)
49
50 def cancel(self, msg_id: int):
51 future = self._pending.pop(msg_id, None)
52 if future is not None:
53 future.cancelled = True