# A Python port of the Invisible Internet Project (I2P)
# at main 213 lines 7.5 kB view raw
"""Tunnel build orchestration — executor and manager.

Ported from:
    net.i2p.router.tunnel.pool.BuildExecutor
    net.i2p.router.tunnel.pool.TunnelPool

Provides TunnelBuildExecutor for constructing tunnel build messages
and processing build replies, and TunnelManager for maintaining
inbound/outbound tunnel pools.
"""

from __future__ import annotations

import os
import random
import time

from i2p_data.i2np_tunnel import TunnelBuildMessage, TunnelBuildReplyMessage
from i2p_data.tunnel import TunnelId
from i2p_tunnel.builder import BuildRecord, BuildReplyRecord, TunnelEntry
from i2p_tunnel.crypto import BuildRecordEncryptor, BuildReplyDecryptor


# Default tunnel lifetime: 10 minutes (600,000 ms), matching I2P Java.
DEFAULT_TUNNEL_LIFETIME_MS = 600_000

# Size in bytes of the identity values placed in build records and entries.
_IDENT_SIZE = 32

# Reply records are decrypted in whole 16-byte blocks.
_CIPHER_BLOCK_SIZE = 16

# Largest tunnel id value generated locally (ids are drawn from 1..2**32 - 1).
_MAX_TUNNEL_ID = 2**32 - 1


class TunnelBuildExecutor:
    """Build tunnel request messages and process replies.

    For each hop in the tunnel, a BuildRecord is created with random
    cryptographic keys, encrypted with the hop's ElGamal public key,
    and assembled into a TunnelBuildMessage (always 8 record slots,
    unused slots filled with random data).
    """

    def build_tunnel(
        self,
        hops: list,
        public_keys: list[bytes],
        is_inbound: bool,
    ) -> TunnelBuildMessage:
        """Create a TunnelBuildMessage for the given hops.

        Parameters
        ----------
        hops:
            List of HopConfig objects describing each hop in order
            (gateway to endpoint).
        public_keys:
            List of 256-byte ElGamal public keys, one per hop, in the
            same order as *hops*.
        is_inbound:
            True for inbound tunnels, False for outbound.  Accepted for
            interface compatibility; this implementation does not
            currently consult it.

        Returns
        -------
        TunnelBuildMessage
            Message containing 8 encrypted records (hop records +
            random padding records).

        Raises
        ------
        ValueError
            If there are more hops than record slots in a build
            message, or fewer public keys than hops.
        """
        num_hops = len(hops)
        max_records = TunnelBuildMessage.NUM_RECORDS

        # Fail fast with a clear message instead of emitting an oversized
        # message or hitting an IndexError halfway through the loop.
        if num_hops > max_records:
            raise ValueError(
                f"too many hops: {num_hops} (a build message holds at most "
                f"{max_records} records)"
            )
        if len(public_keys) < num_hops:
            raise ValueError(
                f"expected {num_hops} public keys (one per hop), "
                f"got {len(public_keys)}"
            )

        records: list[bytes] = []
        last_index = num_hops - 1

        for i, hop in enumerate(hops):
            is_endpoint = i == last_index

            # The endpoint has no successor, so its next tunnel id is 0;
            # every other hop forwards into the next hop's receive tunnel.
            if is_endpoint:
                next_tunnel_id = 0
            else:
                next_tunnel_id = int(hops[i + 1].receive_tunnel_id)

            build_rec = BuildRecord(
                receive_tunnel_id=int(hop.receive_tunnel_id),
                # Identity fields are random placeholders for now; real
                # peer hashes are not wired in by this implementation.
                our_ident=os.urandom(_IDENT_SIZE),
                next_tunnel_id=next_tunnel_id,
                next_ident=os.urandom(_IDENT_SIZE),
                layer_key=hop.layer_key,
                iv_key=hop.iv_key,
                reply_key=hop.reply_key,
                reply_iv=hop.reply_iv,
                is_gateway=(i == 0),
                is_endpoint=is_endpoint,
            )

            records.append(
                BuildRecordEncryptor.encrypt_record(
                    build_rec.to_bytes(), public_keys[i]
                )
            )

        # Fill the remaining slots with random data of record size so the
        # message always carries the full number of records.
        records.extend(
            os.urandom(TunnelBuildMessage.RECORD_SIZE)
            for _ in range(max_records - len(records))
        )

        # NOTE: records are deliberately NOT shuffled.  Hop i stays in
        # slot i, which is what process_reply() relies on.  Shuffling
        # would require tracking each hop's slot index separately.
        return TunnelBuildMessage(records)

    def process_reply(
        self,
        reply_msg: TunnelBuildReplyMessage,
        hop_configs: list,
    ) -> TunnelEntry | None:
        """Process a TunnelBuildReplyMessage.

        Decrypt each hop's reply record using its reply_key / reply_iv
        and check whether all hops accepted.

        Parameters
        ----------
        reply_msg:
            The reply message received from the network.
        hop_configs:
            The original HopConfig list (same order as build_tunnel).

        Returns
        -------
        TunnelEntry | None
            A TunnelEntry if all hops accepted; None if any hop
            rejected or the reply carries no record for some hop.
        """
        # Only the leading part of each slot is decrypted: the reply
        # record size rounded up to a whole cipher block.  This is the
        # same for every hop, so compute it once.
        reply_size = BuildReplyRecord.SIZE
        decrypt_len = reply_size + (-reply_size) % _CIPHER_BLOCK_SIZE

        records = reply_msg.records
        for i, hop in enumerate(hop_configs):
            try:
                encrypted_record = records[i]
            except IndexError:
                # The reply comes from the network; a message without a
                # record for this hop cannot confirm acceptance, so the
                # build is treated as failed rather than crashing.
                return None

            decrypted = BuildReplyDecryptor.decrypt_reply(
                encrypted_record[:decrypt_len], hop.reply_key, hop.reply_iv
            )
            if not BuildReplyRecord.from_bytes(decrypted).is_accepted():
                return None

        # All hops accepted — create a TunnelEntry.
        now_ms = int(time.time() * 1000)
        if hop_configs:
            tunnel_id_val = int(hop_configs[0].receive_tunnel_id)
        else:
            # No hops: pick a fresh id.  SystemRandom draws from the OS
            # entropy source, consistent with the os.urandom() use
            # elsewhere in this module, instead of the predictable
            # default Mersenne Twister generator.
            tunnel_id_val = random.SystemRandom().randint(1, _MAX_TUNNEL_ID)

        return TunnelEntry(
            tunnel_id=TunnelId(tunnel_id_val),
            gateway=os.urandom(_IDENT_SIZE),  # placeholder gateway identity
            length=len(hop_configs),
            creation_time=now_ms,
            expiration=now_ms + DEFAULT_TUNNEL_LIFETIME_MS,
        )


class TunnelManager:
    """Manage pools of inbound and outbound tunnels.

    Tracks active tunnels against configurable targets and provides
    methods to query pool status and remove expired entries.
    """

    def __init__(
        self,
        target_inbound: int = 3,
        target_outbound: int = 3,
    ):
        """Create empty pools with the given target sizes.

        Parameters
        ----------
        target_inbound:
            Number of inbound tunnels the pool aims to hold.
        target_outbound:
            Number of outbound tunnels the pool aims to hold.
        """
        self._target_inbound = target_inbound
        self._target_outbound = target_outbound
        self._inbound: list[TunnelEntry] = []
        self._outbound: list[TunnelEntry] = []

    def add_tunnel(self, entry: TunnelEntry, is_inbound: bool) -> None:
        """Add a tunnel to the appropriate pool."""
        if is_inbound:
            self._inbound.append(entry)
        else:
            self._outbound.append(entry)

    def remove_expired(self, now_ms: int) -> None:
        """Remove tunnels that have passed their expiration time.

        Expiry is decided by each entry's ``is_expired(now_ms)``.
        """
        self._inbound = [t for t in self._inbound if not t.is_expired(now_ms)]
        self._outbound = [t for t in self._outbound if not t.is_expired(now_ms)]

    def inbound_count(self) -> int:
        """Return the number of inbound tunnels currently in the pool."""
        return len(self._inbound)

    def outbound_count(self) -> int:
        """Return the number of outbound tunnels currently in the pool."""
        return len(self._outbound)

    def needs_more_inbound(self) -> bool:
        """True if inbound pool is below target."""
        return len(self._inbound) < self._target_inbound

    def needs_more_outbound(self) -> bool:
        """True if outbound pool is below target."""
        return len(self._outbound) < self._target_outbound

    def get_inbound_tunnels(self) -> list[TunnelEntry]:
        """Return a copy of the inbound tunnel list."""
        return list(self._inbound)

    def get_outbound_tunnels(self) -> list[TunnelEntry]:
        """Return a copy of the outbound tunnel list."""
        return list(self._outbound)