"""I2CP message handler — dispatch and request/reply correlation."""

from collections import defaultdict
from typing import Callable


class MessageHandler:
    """Dispatches I2CP messages to registered handlers by type code."""

    def __init__(self) -> None:
        # Maps a message type code to its handlers, in registration order.
        self._handlers: dict[int, list[Callable]] = defaultdict(list)

    def register(self, msg_type: int, handler: Callable) -> None:
        """Append *handler* to the handlers invoked for *msg_type*.

        The same handler may be registered more than once; it is then
        called once per registration.
        """
        self._handlers[msg_type].append(handler)

    def dispatch(self, message) -> None:
        """Call every handler registered for ``message.TYPE``.

        Handlers run in registration order, each receiving *message* as
        its only argument. An exception raised by a handler propagates to
        the caller, and the remaining handlers are not invoked. A message
        whose type has no handlers is ignored.
        """
        # ``.get`` avoids creating an empty defaultdict entry for unknown
        # types. The tuple snapshot lets a handler call ``register`` for
        # this same type without mutating the sequence being iterated; a
        # handler added during dispatch first runs on the next message.
        for handler in tuple(self._handlers.get(message.TYPE, ())):
            handler(message)


class ReplyFuture:
    """Simple future for request/reply correlation."""

    def __init__(self) -> None:
        self.done = False       # True once set_result() has been called
        self.cancelled = False  # set by RequestReplyManager on cancellation
        self.result = None      # reply payload; None until a result is set

    def set_result(self, result) -> None:
        """Store *result* and mark the future as done."""
        # Assign the payload before raising the flag so an observer that
        # sees ``done`` also sees the result.
        self.result = result
        self.done = True


class RequestReplyManager:
    """Correlate I2CP requests with replies by message ID."""

    def __init__(self) -> None:
        # Futures still awaiting a reply, keyed by message ID.
        self._pending: dict[int, ReplyFuture] = {}

    def register_request(self, msg_id: int) -> ReplyFuture:
        """Create, track, and return a future for request *msg_id*.

        If *msg_id* is already pending, the new future replaces the old
        one and the old future is marked cancelled: it is no longer
        tracked, so no reply could ever complete it.
        """
        superseded = self._pending.get(msg_id)
        if superseded is not None:
            superseded.cancelled = True
        future = ReplyFuture()
        self._pending[msg_id] = future
        return future

    def handle_reply(self, msg_id: int, result) -> None:
        """Complete the pending future for *msg_id* with *result*.

        The future is removed from the pending set. Replies for unknown,
        already-answered, or cancelled message IDs are ignored.
        """
        future = self._pending.pop(msg_id, None)
        if future is not None:
            future.set_result(result)

    def cancel(self, msg_id: int) -> None:
        """Stop tracking *msg_id* and mark its future as cancelled.

        Does nothing when *msg_id* is not pending.
        """
        future = self._pending.pop(msg_id, None)
        if future is not None:
            future.cancelled = True