"""Tunnel building types — build records, reply records, tunnel pools.""" import random import struct import time from i2p_data.tunnel import TunnelId class BuildRecord: """Tunnel build request record (cleartext portion). Fixed size: 222 bytes (4+32+4+32+32+32+32+16+1+...). """ SIZE = 222 def __init__(self, receive_tunnel_id: int, our_ident: bytes, next_tunnel_id: int, next_ident: bytes, layer_key: bytes, iv_key: bytes, reply_key: bytes, reply_iv: bytes, is_gateway: bool, is_endpoint: bool): self.receive_tunnel_id = receive_tunnel_id self.our_ident = our_ident self.next_tunnel_id = next_tunnel_id self.next_ident = next_ident self.layer_key = layer_key self.iv_key = iv_key self.reply_key = reply_key self.reply_iv = reply_iv self.is_gateway = is_gateway self.is_endpoint = is_endpoint def to_bytes(self) -> bytes: flags = 0 if self.is_gateway: flags |= 0x01 if self.is_endpoint: flags |= 0x02 return (struct.pack("!I", self.receive_tunnel_id) + self.our_ident + struct.pack("!I", self.next_tunnel_id) + self.next_ident + self.layer_key + self.iv_key + self.reply_key + self.reply_iv + struct.pack("!B", flags) + b"\x00" * (self.SIZE - 4 - 32 - 4 - 32 - 32 - 32 - 32 - 16 - 1)) @classmethod def from_bytes(cls, data: bytes) -> "BuildRecord": off = 0 recv_tid = struct.unpack("!I", data[off:off + 4])[0]; off += 4 our_ident = data[off:off + 32]; off += 32 next_tid = struct.unpack("!I", data[off:off + 4])[0]; off += 4 next_ident = data[off:off + 32]; off += 32 layer_key = data[off:off + 32]; off += 32 iv_key = data[off:off + 32]; off += 32 reply_key = data[off:off + 32]; off += 32 reply_iv = data[off:off + 16]; off += 16 flags = data[off] return cls(recv_tid, our_ident, next_tid, next_ident, layer_key, iv_key, reply_key, reply_iv, is_gateway=bool(flags & 0x01), is_endpoint=bool(flags & 0x02)) class BuildReplyRecord: """Tunnel build reply record: status(1) + reply_data(495) = 496 bytes.""" SIZE = 496 def __init__(self, status: int, reply_data: bytes): self.status = status self.reply_data = reply_data def is_accepted(self) -> bool: return self.status == 0 def to_bytes(self) -> bytes: return struct.pack("!B", self.status) + self.reply_data @classmethod def from_bytes(cls, data: bytes) -> "BuildReplyRecord": return cls(data[0], data[1:cls.SIZE]) class TunnelEntry: """An entry in a tunnel pool.""" def __init__(self, tunnel_id: TunnelId, gateway: bytes, length: int, creation_time: int, expiration: int): self.tunnel_id = tunnel_id self.gateway = gateway self.length = length self.creation_time = creation_time self.expiration = expiration def is_expired(self, now_ms: int | None = None) -> bool: if now_ms is None: now_ms = int(time.time() * 1000) return now_ms >= self.expiration class TunnelPool: """Pool of tunnels (inbound or outbound).""" def __init__(self, name: str): self.name = name self._tunnels: list[TunnelEntry] = [] def tunnel_count(self) -> int: return len(self._tunnels) def add(self, entry: TunnelEntry): self._tunnels.append(entry) def get(self, tunnel_id: TunnelId) -> TunnelEntry | None: for t in self._tunnels: if t.tunnel_id == tunnel_id: return t return None def remove_expired(self, now_ms: int | None = None): self._tunnels = [t for t in self._tunnels if not t.is_expired(now_ms)] def select_random(self) -> TunnelEntry | None: if not self._tunnels: return None return random.choice(self._tunnels)