"""RouterContext — central coordinator that owns all subsystem instances. Ported from net.i2p.router.RouterContext. The RouterContext wires together crypto, messaging, tunnel, and NetDB subsystems, providing a single entry point for inbound/outbound message processing, tunnel management, and network database operations. """ from __future__ import annotations import os from i2p_crypto.session_key_manager import SessionKeyManager from i2p_crypto.garlic_crypto import GarlicEncryptor, GarlicDecryptor from i2p_data.message_router import ( InboundMessageHandler, OutboundMessageRouter, MessageDispatcher, ) from i2p_data.garlic_handler import GarlicMessageHandler from i2p_netdb.datastore import DataStore from i2p_netdb.netdb_handler import NetDBHandler from i2p_tunnel.data_handler import TunnelCryptoRegistry, TunnelDataHandler from i2p_tunnel.build_executor import TunnelManager class RouterContext: """Central coordinator that owns all subsystem instances and wires them together. Parameters ---------- router_hash: 32-byte router identity hash. If None, a random 32-byte hash is generated. """ def __init__(self, router_hash: bytes | None = None) -> None: self.router_hash = router_hash if router_hash is not None else os.urandom(32) # Crypto subsystems self.session_key_mgr = SessionKeyManager() self.garlic_encryptor = GarlicEncryptor() self.garlic_decryptor = GarlicDecryptor() # Message routing subsystems self.inbound_handler = InboundMessageHandler() self.outbound_router = OutboundMessageRouter() self.message_dispatcher = MessageDispatcher( self.inbound_handler, self.outbound_router ) # NetDB subsystems self.datastore = DataStore() self.netdb_handler = NetDBHandler(self.datastore) # Tunnel subsystems self.crypto_registry = TunnelCryptoRegistry() self.tunnel_data_handler = TunnelDataHandler(self.crypto_registry) self.tunnel_manager = TunnelManager() # Garlic message handler self.garlic_handler = GarlicMessageHandler( self.session_key_mgr, self.garlic_decryptor ) # Wire all handlers together self._wire_handlers() def _wire_handlers(self) -> None: """Register message-type handlers on the inbound handler.""" # GARLIC (type 11) self.inbound_handler.register(11, self.garlic_handler.handle) # DATABASE_STORE (type 1) self.inbound_handler.register(1, self._handle_db_store) # DATABASE_LOOKUP (type 2) self.inbound_handler.register(2, self._handle_db_lookup) # TUNNEL_DATA (type 18) self.inbound_handler.register(18, self._handle_tunnel_data) def _handle_db_store(self, payload: bytes) -> bool: """Adapter: extract key(32) + data from payload and delegate to netdb_handler.""" key = payload[:32] data = payload[32:] return self.netdb_handler.handle_store(key, data) def _handle_db_lookup(self, payload: bytes): """Adapter: extract key(32) from payload and delegate to netdb_handler.""" key = payload[:32] return self.netdb_handler.handle_lookup(key) def _handle_tunnel_data(self, payload: bytes) -> dict: """Adapter: extract tunnel_id(4) + data from payload and delegate to tunnel_data_handler.""" tunnel_id = int.from_bytes(payload[:4], "big") data = payload[4:] return self.tunnel_data_handler.handle_inbound(tunnel_id, data) def process_inbound(self, message_type: int, payload: bytes): """Dispatch an inbound I2NP message to the appropriate handler. Parameters ---------- message_type: I2NP message type code. payload: Raw message payload bytes. Returns ------- The result from the registered handler, or None if no handler is registered. """ return self.message_dispatcher.dispatch_inbound(message_type, payload) def route_outbound(self, delivery_type: int, payload: bytes, **kwargs) -> dict: """Route an outbound message based on delivery type. Parameters ---------- delivery_type: Delivery type code (0=LOCAL, 1=ROUTER, 2=TUNNEL, 3=DESTINATION). payload: Message payload bytes. **kwargs: Additional routing parameters (router_hash, tunnel_id, gateway, destination). Returns ------- dict Routing descriptor with type and payload fields. """ return self.message_dispatcher.dispatch_outbound(delivery_type, payload, **kwargs) def store_netdb_entry(self, key: bytes, data: bytes) -> bool: """Store a key-value entry in the network database. Parameters ---------- key: 32-byte entry key. data: Entry data bytes. Returns ------- bool True on success. """ return self.netdb_handler.handle_store(key, data) def lookup_netdb_entry(self, key: bytes) -> bytes | None: """Look up an entry in the network database. Parameters ---------- key: 32-byte entry key. Returns ------- bytes or None The entry data, or None if not found. """ return self.netdb_handler.handle_lookup(key) def register_tunnel( self, tunnel_id: int, layer_key: bytes, iv_key: bytes, is_endpoint: bool = False, ) -> None: """Register a tunnel in the crypto registry. Parameters ---------- tunnel_id: Numeric tunnel identifier. layer_key: 32-byte AES layer key. iv_key: Key for IV derivation. is_endpoint: True if this router is the tunnel endpoint. """ self.crypto_registry.register(tunnel_id, layer_key, iv_key, is_endpoint) def process_tunnel_data(self, tunnel_id: int, encrypted_data: bytes) -> dict: """Decrypt one layer of inbound tunnel data and decide routing. Parameters ---------- tunnel_id: The tunnel ID this data arrived on. encrypted_data: Encrypted tunnel data (multiple of 16 bytes). Returns ------- dict Routing decision: deliver, forward, or unknown. """ return self.tunnel_data_handler.handle_inbound(tunnel_id, encrypted_data) def get_status(self) -> dict: """Return a status snapshot of the router. Returns ------- dict Status dictionary with router_hash, tunnels_registered, netdb_entries, inbound_count, and outbound_count. """ return { "router_hash": self.router_hash.hex(), "tunnels_registered": len(self.crypto_registry.registered_tunnels()), "netdb_entries": self.datastore.count(), "inbound_count": self.tunnel_manager.inbound_count(), "outbound_count": self.tunnel_manager.outbound_count(), }