A Python port of the Invisible Internet Project (I2P)
1"""Tunnel build orchestration — executor and manager.
2
3Ported from:
4 net.i2p.router.tunnel.pool.BuildExecutor
5 net.i2p.router.tunnel.pool.TunnelPool
6
7Provides TunnelBuildExecutor for constructing tunnel build messages
8and processing build replies, and TunnelManager for maintaining
9inbound/outbound tunnel pools.
10"""
11
12from __future__ import annotations
13
14import os
15import random
16import time
17
18from i2p_data.i2np_tunnel import TunnelBuildMessage, TunnelBuildReplyMessage
19from i2p_data.tunnel import TunnelId
20from i2p_tunnel.builder import BuildRecord, BuildReplyRecord, TunnelEntry
21from i2p_tunnel.crypto import BuildRecordEncryptor, BuildReplyDecryptor
22
23
24# Default tunnel lifetime: 10 minutes (600,000 ms), matching I2P Java.
25DEFAULT_TUNNEL_LIFETIME_MS = 600_000
26
27
28class TunnelBuildExecutor:
29 """Build tunnel request messages and process replies.
30
31 For each hop in the tunnel, a BuildRecord is created with random
32 cryptographic keys, encrypted with the hop's ElGamal public key,
33 and assembled into a TunnelBuildMessage (always 8 record slots,
34 unused slots filled with random data).
35 """
36
37 def build_tunnel(
38 self,
39 hops: list,
40 public_keys: list[bytes],
41 is_inbound: bool,
42 ) -> TunnelBuildMessage:
43 """Create a TunnelBuildMessage for the given hops.
44
45 Parameters
46 ----------
47 hops:
48 List of HopConfig objects describing each hop in order
49 (gateway to endpoint).
50 public_keys:
51 List of 256-byte ElGamal public keys, one per hop, in the
52 same order as *hops*.
53 is_inbound:
54 True for inbound tunnels, False for outbound.
55
56 Returns
57 -------
58 TunnelBuildMessage
59 Message containing 8 encrypted records (hop records +
60 random padding records).
61 """
62 num_hops = len(hops)
63 records: list[bytes] = []
64
65 for i, hop in enumerate(hops):
66 is_gateway = (i == 0)
67 is_endpoint = (i == num_hops - 1)
68
69 # Determine next hop identity (32 bytes) and tunnel id.
70 if is_endpoint:
71 next_ident = os.urandom(32)
72 next_tunnel_id = 0
73 else:
74 next_hop = hops[i + 1]
75 next_ident = os.urandom(32) # peer hash placeholder
76 next_tunnel_id = int(next_hop.receive_tunnel_id)
77
78 build_rec = BuildRecord(
79 receive_tunnel_id=int(hop.receive_tunnel_id),
80 our_ident=os.urandom(32),
81 next_tunnel_id=next_tunnel_id,
82 next_ident=next_ident,
83 layer_key=hop.layer_key,
84 iv_key=hop.iv_key,
85 reply_key=hop.reply_key,
86 reply_iv=hop.reply_iv,
87 is_gateway=is_gateway,
88 is_endpoint=is_endpoint,
89 )
90
91 record_bytes = build_rec.to_bytes()
92 encrypted = BuildRecordEncryptor.encrypt_record(
93 record_bytes, public_keys[i]
94 )
95 records.append(encrypted)
96
97 # Fill remaining slots (up to 8) with random 528-byte records.
98 while len(records) < TunnelBuildMessage.NUM_RECORDS:
99 records.append(os.urandom(TunnelBuildMessage.RECORD_SIZE))
100
101 # Shuffle so hop positions are not trivially identifiable.
102 # (In a full implementation, the hop index would be tracked
103 # separately. For now we do not shuffle to keep reply
104 # processing straightforward.)
105
106 return TunnelBuildMessage(records)
107
108 def process_reply(
109 self,
110 reply_msg: TunnelBuildReplyMessage,
111 hop_configs: list,
112 ) -> TunnelEntry | None:
113 """Process a TunnelBuildReplyMessage.
114
115 Decrypt each hop's reply record using its reply_key / reply_iv
116 and check whether all hops accepted (status == 0).
117
118 Parameters
119 ----------
120 reply_msg:
121 The reply message received from the network.
122 hop_configs:
123 The original HopConfig list (same order as build_tunnel).
124
125 Returns
126 -------
127 TunnelEntry | None
128 A TunnelEntry if all hops accepted; None if any rejected.
129 """
130 for i, hop in enumerate(hop_configs):
131 encrypted_record = reply_msg.records[i]
132 # The encrypted portion is the first part; strip trailing
133 # zero-padding that was added to reach 528 bytes.
134 # BuildReplyRecord.SIZE is 496; padded to next 16-byte
135 # boundary = 496 (already multiple of 16).
136 reply_size = BuildReplyRecord.SIZE
137 pad_len = (16 - reply_size % 16) % 16
138 decrypt_len = reply_size + pad_len
139 encrypted_data = encrypted_record[:decrypt_len]
140
141 decrypted = BuildReplyDecryptor.decrypt_reply(
142 encrypted_data, hop.reply_key, hop.reply_iv
143 )
144
145 # First byte is the status.
146 reply_rec = BuildReplyRecord.from_bytes(decrypted)
147 if not reply_rec.is_accepted():
148 return None
149
150 # All hops accepted — create a TunnelEntry.
151 now_ms = int(time.time() * 1000)
152 tunnel_id_val = int(hop_configs[0].receive_tunnel_id) if hop_configs else random.randint(1, 2**32 - 1)
153 return TunnelEntry(
154 tunnel_id=TunnelId(tunnel_id_val),
155 gateway=os.urandom(32),
156 length=len(hop_configs),
157 creation_time=now_ms,
158 expiration=now_ms + DEFAULT_TUNNEL_LIFETIME_MS,
159 )
160
161
162class TunnelManager:
163 """Manage pools of inbound and outbound tunnels.
164
165 Tracks active tunnels against configurable targets and provides
166 methods to query pool status and remove expired entries.
167 """
168
169 def __init__(
170 self,
171 target_inbound: int = 3,
172 target_outbound: int = 3,
173 ):
174 self._target_inbound = target_inbound
175 self._target_outbound = target_outbound
176 self._inbound: list[TunnelEntry] = []
177 self._outbound: list[TunnelEntry] = []
178
179 def add_tunnel(self, entry: TunnelEntry, is_inbound: bool) -> None:
180 """Add a tunnel to the appropriate pool."""
181 if is_inbound:
182 self._inbound.append(entry)
183 else:
184 self._outbound.append(entry)
185
186 def remove_expired(self, now_ms: int) -> None:
187 """Remove tunnels that have passed their expiration time."""
188 self._inbound = [t for t in self._inbound if not t.is_expired(now_ms)]
189 self._outbound = [t for t in self._outbound if not t.is_expired(now_ms)]
190
191 def inbound_count(self) -> int:
192 """Return the number of active inbound tunnels."""
193 return len(self._inbound)
194
195 def outbound_count(self) -> int:
196 """Return the number of active outbound tunnels."""
197 return len(self._outbound)
198
199 def needs_more_inbound(self) -> bool:
200 """True if inbound pool is below target."""
201 return len(self._inbound) < self._target_inbound
202
203 def needs_more_outbound(self) -> bool:
204 """True if outbound pool is below target."""
205 return len(self._outbound) < self._target_outbound
206
207 def get_inbound_tunnels(self) -> list[TunnelEntry]:
208 """Return a copy of the inbound tunnel list."""
209 return list(self._inbound)
210
211 def get_outbound_tunnels(self) -> list[TunnelEntry]:
212 """Return a copy of the outbound tunnel list."""
213 return list(self._outbound)