"""I2NP message routing — inbound dispatch, outbound routing, and unified dispatcher.""" from __future__ import annotations import struct from typing import Any, Callable class InboundMessageHandler: """Dispatches inbound I2NP messages to registered handlers by message type.""" # I2NP message type constants DATABASE_STORE = 1 DATABASE_LOOKUP = 2 DELIVERY_STATUS = 10 GARLIC = 11 TUNNEL_DATA = 18 TUNNEL_GATEWAY = 19 def __init__(self) -> None: self._handlers: dict[int, Callable[..., Any]] = {} self._store: dict[bytes, bytes] = {} # Register default handlers self.register(self.DATABASE_STORE, self.on_database_store) self.register(self.DATABASE_LOOKUP, self.on_database_lookup) self.register(self.DELIVERY_STATUS, self.on_delivery_status) self.register(self.GARLIC, self.on_garlic) self.register(self.TUNNEL_DATA, self.on_tunnel_data) self.register(self.TUNNEL_GATEWAY, self.on_tunnel_data) # same extraction logic def register(self, message_type: int, handler: Callable[..., Any]) -> None: """Register a handler for a given I2NP message type.""" self._handlers[message_type] = handler def handle(self, message_type: int, payload: bytes): """Dispatch to the registered handler. Returns None if no handler is registered.""" handler = self._handlers.get(message_type) if handler is None: return None return handler(payload) def on_database_store(self, payload: bytes) -> None: """Store an entry keyed by the first 32 bytes of payload.""" key = payload[:32] value = payload[32:] self._store[key] = value def on_database_lookup(self, payload: bytes): """Look up an entry by the first 32 bytes of payload. Returns the entry or None.""" key = payload[:32] return self._store.get(key) def on_garlic(self, payload: bytes) -> list[bytes]: """Placeholder: return the payload as a single clove.""" return [payload] def on_delivery_status(self, payload: bytes) -> int: """Extract and return the 4-byte message ID from the start of the payload.""" return int.from_bytes(payload[:4], "big") def on_tunnel_data(self, payload: bytes) -> tuple[int, bytes]: """Extract tunnel_id (first 4 bytes) and the remaining data.""" tunnel_id = int.from_bytes(payload[:4], "big") data = payload[4:] return (tunnel_id, data) class OutboundMessageRouter: """Routes outbound messages based on delivery type.""" LOCAL = 0 ROUTER = 1 TUNNEL = 2 DESTINATION = 3 def __init__(self) -> None: pass def route(self, delivery_type: int, payload: bytes, **kwargs) -> dict: """Route a message based on delivery type. Returns a routing descriptor dict.""" if delivery_type == self.LOCAL: return {"type": "local", "payload": payload} elif delivery_type == self.ROUTER: return { "type": "router", "router_hash": kwargs["router_hash"], "payload": payload, } elif delivery_type == self.TUNNEL: return { "type": "tunnel", "tunnel_id": kwargs["tunnel_id"], "gateway": kwargs["gateway"], "payload": payload, } elif delivery_type == self.DESTINATION: return { "type": "destination", "destination": kwargs["destination"], "payload": payload, } else: raise ValueError(f"Unknown delivery type: {delivery_type}") class MessageDispatcher: """Unified dispatcher that delegates to inbound handler and outbound router.""" def __init__(self, inbound: InboundMessageHandler, outbound: OutboundMessageRouter) -> None: self._inbound = inbound self._outbound = outbound def dispatch_inbound(self, message_type: int, payload: bytes): """Delegate to the inbound message handler.""" return self._inbound.handle(message_type, payload) def dispatch_outbound(self, delivery_type: int, payload: bytes, **kwargs) -> dict: """Delegate to the outbound message router.""" return self._outbound.route(delivery_type, payload, **kwargs) def register_inbound_handler(self, message_type: int, handler: Callable[..., Any]) -> None: """Register a handler on the inbound handler.""" self._inbound.register(message_type, handler)