A Python port of the Invisible Internet Project (I2P)
at main 93 lines 3.0 kB view raw
1"""Tunnel data types — TunnelId, HopConfig, TunnelInfo.""" 2 3import struct 4import time 5 6 7class TunnelId: 8 """4-byte unsigned integer tunnel identifier.""" 9 10 def __init__(self, tunnel_id: int): 11 if tunnel_id < 0 or tunnel_id >= 2**32: 12 raise ValueError(f"TunnelId must be 0..2^32-1, got {tunnel_id}") 13 self._id = tunnel_id 14 15 def to_bytes(self) -> bytes: 16 return struct.pack("!I", self._id) 17 18 @classmethod 19 def from_bytes(cls, data: bytes) -> "TunnelId": 20 if len(data) < 4: 21 raise ValueError(f"TunnelId requires 4 bytes, got {len(data)}") 22 return cls(struct.unpack("!I", data[:4])[0]) 23 24 @classmethod 25 def from_stream(cls, stream) -> "TunnelId": 26 data = stream.read(4) 27 if len(data) < 4: 28 raise ValueError("TunnelId requires 4 bytes") 29 return cls.from_bytes(data) 30 31 def is_zero(self) -> bool: 32 return self._id == 0 33 34 def __int__(self) -> int: 35 return self._id 36 37 def __eq__(self, other) -> bool: 38 if not isinstance(other, TunnelId): 39 return NotImplemented 40 return self._id == other._id 41 42 def __hash__(self) -> int: 43 return hash(self._id) 44 45 def __repr__(self) -> str: 46 return f"TunnelId({self._id})" 47 48 49class HopConfig: 50 """Configuration for one hop in a tunnel.""" 51 52 def __init__(self, receive_tunnel_id: TunnelId, send_tunnel_id: TunnelId, 53 receive_key: bytes, send_key: bytes, iv_key: bytes, 54 reply_key: bytes, reply_iv: bytes, layer_key: bytes): 55 self.receive_tunnel_id = receive_tunnel_id 56 self.send_tunnel_id = send_tunnel_id 57 self.receive_key = receive_key 58 self.send_key = send_key 59 self.iv_key = iv_key 60 self.reply_key = reply_key 61 self.reply_iv = reply_iv 62 self.layer_key = layer_key 63 64 65class TunnelInfo: 66 """Metadata about a built tunnel.""" 67 68 def __init__(self, tunnel_id: TunnelId, gateway: bytes, length: int, 69 creation_time: int, expiration: int): 70 if len(gateway) != 32: 71 raise ValueError(f"Gateway hash must be 32 bytes, got {len(gateway)}") 72 self.tunnel_id = tunnel_id 73 self.gateway = gateway 74 self.length = length 75 self.creation_time = creation_time 76 self.expiration = expiration 77 78 def is_expired(self, now_ms: int | None = None) -> bool: 79 if now_ms is None: 80 now_ms = int(time.time() * 1000) 81 return now_ms >= self.expiration 82 83 def __eq__(self, other) -> bool: 84 if not isinstance(other, TunnelInfo): 85 return NotImplemented 86 return (self.tunnel_id == other.tunnel_id and 87 self.gateway == other.gateway and 88 self.length == other.length and 89 self.creation_time == other.creation_time and 90 self.expiration == other.expiration) 91 92 def __repr__(self) -> str: 93 return f"TunnelInfo(id={self.tunnel_id}, length={self.length})"