"""Tunnel build orchestration — executor and manager. Ported from: net.i2p.router.tunnel.pool.BuildExecutor net.i2p.router.tunnel.pool.TunnelPool Provides TunnelBuildExecutor for constructing tunnel build messages and processing build replies, and TunnelManager for maintaining inbound/outbound tunnel pools. """ from __future__ import annotations import os import random import time from i2p_data.i2np_tunnel import TunnelBuildMessage, TunnelBuildReplyMessage from i2p_data.tunnel import TunnelId from i2p_tunnel.builder import BuildRecord, BuildReplyRecord, TunnelEntry from i2p_tunnel.crypto import BuildRecordEncryptor, BuildReplyDecryptor # Default tunnel lifetime: 10 minutes (600,000 ms), matching I2P Java. DEFAULT_TUNNEL_LIFETIME_MS = 600_000 class TunnelBuildExecutor: """Build tunnel request messages and process replies. For each hop in the tunnel, a BuildRecord is created with random cryptographic keys, encrypted with the hop's ElGamal public key, and assembled into a TunnelBuildMessage (always 8 record slots, unused slots filled with random data). """ def build_tunnel( self, hops: list, public_keys: list[bytes], is_inbound: bool, ) -> TunnelBuildMessage: """Create a TunnelBuildMessage for the given hops. Parameters ---------- hops: List of HopConfig objects describing each hop in order (gateway to endpoint). public_keys: List of 256-byte ElGamal public keys, one per hop, in the same order as *hops*. is_inbound: True for inbound tunnels, False for outbound. Returns ------- TunnelBuildMessage Message containing 8 encrypted records (hop records + random padding records). """ num_hops = len(hops) records: list[bytes] = [] for i, hop in enumerate(hops): is_gateway = (i == 0) is_endpoint = (i == num_hops - 1) # Determine next hop identity (32 bytes) and tunnel id. if is_endpoint: next_ident = os.urandom(32) next_tunnel_id = 0 else: next_hop = hops[i + 1] next_ident = os.urandom(32) # peer hash placeholder next_tunnel_id = int(next_hop.receive_tunnel_id) build_rec = BuildRecord( receive_tunnel_id=int(hop.receive_tunnel_id), our_ident=os.urandom(32), next_tunnel_id=next_tunnel_id, next_ident=next_ident, layer_key=hop.layer_key, iv_key=hop.iv_key, reply_key=hop.reply_key, reply_iv=hop.reply_iv, is_gateway=is_gateway, is_endpoint=is_endpoint, ) record_bytes = build_rec.to_bytes() encrypted = BuildRecordEncryptor.encrypt_record( record_bytes, public_keys[i] ) records.append(encrypted) # Fill remaining slots (up to 8) with random 528-byte records. while len(records) < TunnelBuildMessage.NUM_RECORDS: records.append(os.urandom(TunnelBuildMessage.RECORD_SIZE)) # Shuffle so hop positions are not trivially identifiable. # (In a full implementation, the hop index would be tracked # separately. For now we do not shuffle to keep reply # processing straightforward.) return TunnelBuildMessage(records) def process_reply( self, reply_msg: TunnelBuildReplyMessage, hop_configs: list, ) -> TunnelEntry | None: """Process a TunnelBuildReplyMessage. Decrypt each hop's reply record using its reply_key / reply_iv and check whether all hops accepted (status == 0). Parameters ---------- reply_msg: The reply message received from the network. hop_configs: The original HopConfig list (same order as build_tunnel). Returns ------- TunnelEntry | None A TunnelEntry if all hops accepted; None if any rejected. """ for i, hop in enumerate(hop_configs): encrypted_record = reply_msg.records[i] # The encrypted portion is the first part; strip trailing # zero-padding that was added to reach 528 bytes. # BuildReplyRecord.SIZE is 496; padded to next 16-byte # boundary = 496 (already multiple of 16). reply_size = BuildReplyRecord.SIZE pad_len = (16 - reply_size % 16) % 16 decrypt_len = reply_size + pad_len encrypted_data = encrypted_record[:decrypt_len] decrypted = BuildReplyDecryptor.decrypt_reply( encrypted_data, hop.reply_key, hop.reply_iv ) # First byte is the status. reply_rec = BuildReplyRecord.from_bytes(decrypted) if not reply_rec.is_accepted(): return None # All hops accepted — create a TunnelEntry. now_ms = int(time.time() * 1000) tunnel_id_val = int(hop_configs[0].receive_tunnel_id) if hop_configs else random.randint(1, 2**32 - 1) return TunnelEntry( tunnel_id=TunnelId(tunnel_id_val), gateway=os.urandom(32), length=len(hop_configs), creation_time=now_ms, expiration=now_ms + DEFAULT_TUNNEL_LIFETIME_MS, ) class TunnelManager: """Manage pools of inbound and outbound tunnels. Tracks active tunnels against configurable targets and provides methods to query pool status and remove expired entries. """ def __init__( self, target_inbound: int = 3, target_outbound: int = 3, ): self._target_inbound = target_inbound self._target_outbound = target_outbound self._inbound: list[TunnelEntry] = [] self._outbound: list[TunnelEntry] = [] def add_tunnel(self, entry: TunnelEntry, is_inbound: bool) -> None: """Add a tunnel to the appropriate pool.""" if is_inbound: self._inbound.append(entry) else: self._outbound.append(entry) def remove_expired(self, now_ms: int) -> None: """Remove tunnels that have passed their expiration time.""" self._inbound = [t for t in self._inbound if not t.is_expired(now_ms)] self._outbound = [t for t in self._outbound if not t.is_expired(now_ms)] def inbound_count(self) -> int: """Return the number of active inbound tunnels.""" return len(self._inbound) def outbound_count(self) -> int: """Return the number of active outbound tunnels.""" return len(self._outbound) def needs_more_inbound(self) -> bool: """True if inbound pool is below target.""" return len(self._inbound) < self._target_inbound def needs_more_outbound(self) -> bool: """True if outbound pool is below target.""" return len(self._outbound) < self._target_outbound def get_inbound_tunnels(self) -> list[TunnelEntry]: """Return a copy of the inbound tunnel list.""" return list(self._inbound) def get_outbound_tunnels(self) -> list[TunnelEntry]: """Return a copy of the outbound tunnel list.""" return list(self._outbound)