A Python port of the Invisible Internet Project (I2P)
at main 138 lines 4.2 kB view raw
"""I2NP tunnel message types — TunnelData, TunnelGateway, TunnelBuild variants."""

import struct

from i2p_data.i2np import I2NPMessage


def _split_records(body: bytes, offset: int, count: int, size: int, kind: str) -> list[bytes]:
    """Slice ``count`` consecutive ``size``-byte records out of ``body``.

    Args:
        body: Raw message body.
        offset: Index of the first record byte within ``body``.
        count: Number of records to extract.
        size: Length of each record in bytes.
        kind: Message class name, used only in the error text.

    Returns:
        A list of ``count`` byte strings, each exactly ``size`` bytes long.

    Raises:
        ValueError: If ``body`` is too short to hold every record. Without
            this check, slicing would silently produce short or empty records.
    """
    needed = offset + count * size
    if len(body) < needed:
        raise ValueError(
            f"{kind} body truncated: need {needed} bytes "
            f"for {count} records, got {len(body)}"
        )
    # Bytes past ``needed`` are ignored, so over-long bodies still parse.
    return [body[start:start + size] for start in range(offset, needed, size)]


class TunnelDataMessage(I2NPMessage):
    """Type 18: tunnel_id(4) + data(1024 bytes).

    NOTE(review): ``RECORD_SIZE`` documents the expected payload length but
    neither the constructor nor the parser enforces it; callers are trusted to
    supply correctly sized data.
    """

    TYPE = 18
    RECORD_SIZE = 1024

    def __init__(self, tunnel_id: int, data: bytes):
        """Store the tunnel id and payload.

        Args:
            tunnel_id: Value serialised as an unsigned 32-bit big-endian int.
            data: Payload bytes appended verbatim after the tunnel id.
        """
        self.tunnel_id = tunnel_id
        self.data = data
        # Header fields start at zero; presumably filled in by the
        # I2NPMessage framing code — confirm against the base class.
        self._header_msg_id = 0
        self._header_expiration = 0

    def body_bytes(self) -> bytes:
        """Return the wire body: 4-byte big-endian tunnel id, then the data."""
        return struct.pack("!I", self.tunnel_id) + self.data

    @classmethod
    def _from_body(cls, body: bytes) -> "TunnelDataMessage":
        """Parse a message from ``body``.

        Raises:
            struct.error: If ``body`` is shorter than the 4-byte tunnel id.
        """
        (tunnel_id,) = struct.unpack_from("!I", body)
        return cls(tunnel_id, body[4:])


class TunnelGatewayMessage(I2NPMessage):
    """Type 19: tunnel_id(4) + length(2) + data."""

    TYPE = 19

    def __init__(self, tunnel_id: int, data: bytes):
        """Store the tunnel id and payload.

        Args:
            tunnel_id: Value serialised as an unsigned 32-bit big-endian int.
            data: Payload; its length is serialised as an unsigned 16-bit int.
        """
        self.tunnel_id = tunnel_id
        self.data = data
        # Header fields start at zero; presumably filled in by the
        # I2NPMessage framing code — confirm against the base class.
        self._header_msg_id = 0
        self._header_expiration = 0

    def body_bytes(self) -> bytes:
        """Return the wire body: tunnel id, 2-byte payload length, payload.

        Raises:
            struct.error: If the payload is longer than 65535 bytes and so
                does not fit the 16-bit length field.
        """
        return struct.pack("!IH", self.tunnel_id, len(self.data)) + self.data

    @classmethod
    def _from_body(cls, body: bytes) -> "TunnelGatewayMessage":
        """Parse a message from ``body``.

        Bytes after the declared payload length are ignored.

        Raises:
            struct.error: If ``body`` is shorter than the 6-byte header.
            ValueError: If the header declares more payload than ``body`` holds.
        """
        tunnel_id, length = struct.unpack_from("!IH", body)
        payload = body[6:6 + length]
        # A short slice means the declared length overruns the buffer; reject
        # it rather than hand back a silently truncated payload.
        if len(payload) != length:
            raise ValueError(
                f"TunnelGatewayMessage body truncated: header declares {length} "
                f"data bytes, got {len(payload)}"
            )
        return cls(tunnel_id, payload)


class TunnelBuildMessage(I2NPMessage):
    """Type 21: 8 records of 528 bytes each."""

    TYPE = 21
    NUM_RECORDS = 8
    RECORD_SIZE = 528

    def __init__(self, records: list[bytes]):
        """Store the build records.

        Args:
            records: Exactly ``NUM_RECORDS`` byte strings. Individual record
                lengths are not checked here.

        Raises:
            ValueError: If the number of records is not ``NUM_RECORDS``.
        """
        if len(records) != self.NUM_RECORDS:
            raise ValueError(
                f"TunnelBuildMessage requires exactly {self.NUM_RECORDS} records, "
                f"got {len(records)}"
            )
        self.records = records
        # Header fields start at zero; presumably filled in by the
        # I2NPMessage framing code — confirm against the base class.
        self._header_msg_id = 0
        self._header_expiration = 0

    def body_bytes(self) -> bytes:
        """Return the wire body: all records concatenated in order."""
        return b"".join(self.records)

    @classmethod
    def _from_body(cls, body: bytes) -> "TunnelBuildMessage":
        """Parse ``NUM_RECORDS`` fixed-size records from ``body``.

        Raises:
            ValueError: If ``body`` is shorter than
                ``NUM_RECORDS * RECORD_SIZE`` bytes.
        """
        return cls(_split_records(body, 0, cls.NUM_RECORDS, cls.RECORD_SIZE, cls.__name__))


class TunnelBuildReplyMessage(I2NPMessage):
    """Type 22: 8 records of 528 bytes each."""

    TYPE = 22
    NUM_RECORDS = 8
    RECORD_SIZE = 528

    def __init__(self, records: list[bytes]):
        """Store the reply records.

        Args:
            records: Exactly ``NUM_RECORDS`` byte strings. Individual record
                lengths are not checked here.

        Raises:
            ValueError: If the number of records is not ``NUM_RECORDS``.
        """
        if len(records) != self.NUM_RECORDS:
            # Message now reports the offending count, matching TunnelBuildMessage.
            raise ValueError(
                f"TunnelBuildReplyMessage requires exactly {self.NUM_RECORDS} records, "
                f"got {len(records)}"
            )
        self.records = records
        # Header fields start at zero; presumably filled in by the
        # I2NPMessage framing code — confirm against the base class.
        self._header_msg_id = 0
        self._header_expiration = 0

    def body_bytes(self) -> bytes:
        """Return the wire body: all records concatenated in order."""
        return b"".join(self.records)

    @classmethod
    def _from_body(cls, body: bytes) -> "TunnelBuildReplyMessage":
        """Parse ``NUM_RECORDS`` fixed-size records from ``body``.

        Raises:
            ValueError: If ``body`` is shorter than
                ``NUM_RECORDS * RECORD_SIZE`` bytes.
        """
        return cls(_split_records(body, 0, cls.NUM_RECORDS, cls.RECORD_SIZE, cls.__name__))


class VariableTunnelBuildMessage(I2NPMessage):
    """Type 23: count(1) + N records of 528 bytes."""

    TYPE = 23
    RECORD_SIZE = 528

    def __init__(self, records: list[bytes]):
        """Store the build records.

        Args:
            records: Byte strings to send; neither their number nor their
                individual lengths are checked here.
        """
        self.records = records
        # Header fields start at zero; presumably filled in by the
        # I2NPMessage framing code — confirm against the base class.
        self._header_msg_id = 0
        self._header_expiration = 0

    def body_bytes(self) -> bytes:
        """Return the wire body: 1-byte record count, then the records.

        Raises:
            struct.error: If there are more than 255 records, which does not
                fit the single count byte.
        """
        return struct.pack("!B", len(self.records)) + b"".join(self.records)

    @classmethod
    def _from_body(cls, body: bytes) -> "VariableTunnelBuildMessage":
        """Parse the count byte and that many fixed-size records from ``body``.

        Raises:
            IndexError: If ``body`` is empty (no count byte).
            ValueError: If ``body`` holds fewer records than the count claims.
        """
        count = body[0]
        return cls(_split_records(body, 1, count, cls.RECORD_SIZE, cls.__name__))


class VariableTunnelBuildReplyMessage(I2NPMessage):
    """Type 24: count(1) + N records of 528 bytes."""

    TYPE = 24
    RECORD_SIZE = 528

    def __init__(self, records: list[bytes]):
        """Store the reply records.

        Args:
            records: Byte strings to send; neither their number nor their
                individual lengths are checked here.
        """
        self.records = records
        # Header fields start at zero; presumably filled in by the
        # I2NPMessage framing code — confirm against the base class.
        self._header_msg_id = 0
        self._header_expiration = 0

    def body_bytes(self) -> bytes:
        """Return the wire body: 1-byte record count, then the records.

        Raises:
            struct.error: If there are more than 255 records, which does not
                fit the single count byte.
        """
        return struct.pack("!B", len(self.records)) + b"".join(self.records)

    @classmethod
    def _from_body(cls, body: bytes) -> "VariableTunnelBuildReplyMessage":
        """Parse the count byte and that many fixed-size records from ``body``.

        Raises:
            IndexError: If ``body`` is empty (no count byte).
            ValueError: If ``body`` holds fewer records than the count claims.
        """
        count = body[0]
        return cls(_split_records(body, 1, count, cls.RECORD_SIZE, cls.__name__))