A Python port of the Invisible Internet Project (I2P)
at main 53 lines 1.5 kB view raw
1"""I2CP message handler — dispatch and request/reply correlation.""" 2 3from collections import defaultdict 4from typing import Callable 5 6 7class MessageHandler: 8 """Dispatches I2CP messages to registered handlers by type code.""" 9 10 def __init__(self): 11 self._handlers: dict[int, list[Callable]] = defaultdict(list) 12 13 def register(self, msg_type: int, handler: Callable): 14 self._handlers[msg_type].append(handler) 15 16 def dispatch(self, message): 17 for handler in self._handlers.get(message.TYPE, []): 18 handler(message) 19 20 21class ReplyFuture: 22 """Simple future for request/reply correlation.""" 23 24 def __init__(self): 25 self.done = False 26 self.cancelled = False 27 self.result = None 28 29 def set_result(self, result): 30 self.result = result 31 self.done = True 32 33 34class RequestReplyManager: 35 """Correlate I2CP requests with replies by message ID.""" 36 37 def __init__(self): 38 self._pending: dict[int, ReplyFuture] = {} 39 40 def register_request(self, msg_id: int) -> ReplyFuture: 41 future = ReplyFuture() 42 self._pending[msg_id] = future 43 return future 44 45 def handle_reply(self, msg_id: int, result): 46 future = self._pending.pop(msg_id, None) 47 if future is not None: 48 future.set_result(result) 49 50 def cancel(self, msg_id: int): 51 future = self._pending.pop(msg_id, None) 52 if future is not None: 53 future.cancelled = True