A Python port of the Invisible Internet Project (I2P)
1"""I2NP tunnel message types — TunnelData, TunnelGateway, TunnelBuild variants."""
2
3import struct
4
5from i2p_data.i2np import I2NPMessage
6
7
class TunnelDataMessage(I2NPMessage):
    """I2NP message type 18: a 4-byte tunnel ID followed by the tunnel data.

    The body is the tunnel ID packed as a big-endian unsigned 32-bit integer,
    followed by the data bytes exactly as supplied. Neither direction checks
    the length of the data.
    """

    TYPE = 18
    # Matches the 1024-byte data size named in the original docstring; not
    # referenced anywhere inside this class.
    RECORD_SIZE = 1024

    def __init__(self, tunnel_id: int, data: bytes):
        """Store the tunnel ID and data; header fields start at zero."""
        # NOTE(review): presumably read by the I2NPMessage base class when the
        # message header is produced -- confirm against i2p_data.i2np.
        self._header_msg_id = 0
        self._header_expiration = 0
        self.tunnel_id = tunnel_id
        self.data = data

    def body_bytes(self) -> bytes:
        """Return the packed tunnel ID immediately followed by the data."""
        tunnel_id_field = struct.pack("!I", self.tunnel_id)
        return tunnel_id_field + self.data

    @classmethod
    def _from_body(cls, body: bytes) -> "TunnelDataMessage":
        """Build a message from ``body``: 4-byte tunnel ID, the rest is data.

        Raises:
            struct.error: if ``body`` holds fewer than 4 bytes.
        """
        id_field, payload = body[:4], body[4:]
        (tunnel_id,) = struct.unpack("!I", id_field)
        return cls(tunnel_id, payload)
27
28
class TunnelGatewayMessage(I2NPMessage):
    """Type 19: tunnel_id(4) + length(2) + data.

    The body is the tunnel ID (big-endian unsigned 32-bit), the data length
    (big-endian unsigned 16-bit), then the data itself.
    """

    TYPE = 19

    def __init__(self, tunnel_id: int, data: bytes):
        """Store the tunnel ID and payload; header fields start at zero."""
        self.tunnel_id = tunnel_id
        self.data = data
        # NOTE(review): presumably read by the I2NPMessage base class when the
        # message header is produced -- confirm against i2p_data.i2np.
        self._header_msg_id = 0
        self._header_expiration = 0

    def body_bytes(self) -> bytes:
        """Return tunnel ID and data length packed ahead of the data.

        Raises:
            struct.error: if the data length does not fit the 16-bit length
                field or the tunnel ID does not fit 32 bits.
        """
        return struct.pack("!IH", self.tunnel_id, len(self.data)) + self.data

    @classmethod
    def _from_body(cls, body: bytes) -> "TunnelGatewayMessage":
        """Parse a message body, ignoring any bytes after the declared data.

        Raises:
            struct.error: if ``body`` holds fewer than the 6 header bytes.
            ValueError: if ``body`` holds fewer data bytes than the length
                field declares.
        """
        tunnel_id, length = struct.unpack("!IH", body[:6])
        data = body[6:6 + length]
        # Slicing never fails on a short buffer, so a truncated payload has to
        # be detected explicitly instead of being returned as if complete.
        if len(data) != length:
            raise ValueError(
                f"TunnelGatewayMessage declares {length} data bytes, got {len(data)}"
            )
        return cls(tunnel_id, data)
47
48
class TunnelBuildMessage(I2NPMessage):
    """Type 21: 8 records of 528 bytes each."""

    TYPE = 21
    NUM_RECORDS = 8
    RECORD_SIZE = 528

    def __init__(self, records: list[bytes]):
        """Store the build records; header fields start at zero.

        Only the number of records is checked here, not the size of each one.

        Raises:
            ValueError: if ``records`` does not hold exactly ``NUM_RECORDS``
                entries.
        """
        if len(records) != self.NUM_RECORDS:
            raise ValueError(f"TunnelBuildMessage requires exactly {self.NUM_RECORDS} records, got {len(records)}")
        self.records = records
        # NOTE(review): presumably read by the I2NPMessage base class when the
        # message header is produced -- confirm against i2p_data.i2np.
        self._header_msg_id = 0
        self._header_expiration = 0

    def body_bytes(self) -> bytes:
        """Return all records concatenated in order."""
        return b"".join(self.records)

    @classmethod
    def _from_body(cls, body: bytes) -> "TunnelBuildMessage":
        """Split ``body`` into ``NUM_RECORDS`` records of ``RECORD_SIZE`` bytes.

        Bytes beyond the last record are ignored.

        Raises:
            ValueError: if ``body`` is too short to hold every record.
        """
        expected = cls.NUM_RECORDS * cls.RECORD_SIZE
        # Slicing past the end yields short or empty records without raising,
        # so a truncated body must be rejected up front.
        if len(body) < expected:
            raise ValueError(
                f"TunnelBuildMessage body requires {expected} bytes, got {len(body)}"
            )
        records = [
            body[i * cls.RECORD_SIZE:(i + 1) * cls.RECORD_SIZE]
            for i in range(cls.NUM_RECORDS)
        ]
        return cls(records)
71
72
class TunnelBuildReplyMessage(I2NPMessage):
    """Type 22: 8 records of 528 bytes each."""

    TYPE = 22
    NUM_RECORDS = 8
    RECORD_SIZE = 528

    def __init__(self, records: list[bytes]):
        """Store the reply records; header fields start at zero.

        Only the number of records is checked here, not the size of each one.

        Raises:
            ValueError: if ``records`` does not hold exactly ``NUM_RECORDS``
                entries.
        """
        if len(records) != self.NUM_RECORDS:
            raise ValueError(f"TunnelBuildReplyMessage requires exactly {self.NUM_RECORDS} records")
        self.records = records
        # NOTE(review): presumably read by the I2NPMessage base class when the
        # message header is produced -- confirm against i2p_data.i2np.
        self._header_msg_id = 0
        self._header_expiration = 0

    def body_bytes(self) -> bytes:
        """Return all records concatenated in order."""
        return b"".join(self.records)

    @classmethod
    def _from_body(cls, body: bytes) -> "TunnelBuildReplyMessage":
        """Split ``body`` into ``NUM_RECORDS`` records of ``RECORD_SIZE`` bytes.

        Bytes beyond the last record are ignored.

        Raises:
            ValueError: if ``body`` is too short to hold every record.
        """
        expected = cls.NUM_RECORDS * cls.RECORD_SIZE
        # Slicing past the end yields short or empty records without raising,
        # so a truncated body must be rejected up front.
        if len(body) < expected:
            raise ValueError(
                f"TunnelBuildReplyMessage body requires {expected} bytes, got {len(body)}"
            )
        records = [
            body[i * cls.RECORD_SIZE:(i + 1) * cls.RECORD_SIZE]
            for i in range(cls.NUM_RECORDS)
        ]
        return cls(records)
95
96
class VariableTunnelBuildMessage(I2NPMessage):
    """Type 23: count(1) + N records of 528 bytes."""

    TYPE = 23
    RECORD_SIZE = 528

    def __init__(self, records: list[bytes]):
        """Store the build records; header fields start at zero.

        Neither the number of records nor their sizes are checked here.
        """
        self.records = records
        # NOTE(review): presumably read by the I2NPMessage base class when the
        # message header is produced -- confirm against i2p_data.i2np.
        self._header_msg_id = 0
        self._header_expiration = 0

    def body_bytes(self) -> bytes:
        """Return a one-byte record count followed by the concatenated records.

        Raises:
            struct.error: if the record count does not fit in one byte.
        """
        return struct.pack("!B", len(self.records)) + b"".join(self.records)

    @classmethod
    def _from_body(cls, body: bytes) -> "VariableTunnelBuildMessage":
        """Read the count byte, then split out that many records.

        Bytes beyond the last declared record are ignored.

        Raises:
            IndexError: if ``body`` is empty.
            ValueError: if ``body`` is too short for the declared count.
        """
        count = body[0]
        expected = 1 + count * cls.RECORD_SIZE
        # Slicing past the end yields short or empty records without raising,
        # so a body shorter than the count byte promises must be rejected.
        if len(body) < expected:
            raise ValueError(
                f"VariableTunnelBuildMessage declares {count} records "
                f"({expected} bytes), got {len(body)} bytes"
            )
        records = [
            body[1 + i * cls.RECORD_SIZE:1 + (i + 1) * cls.RECORD_SIZE]
            for i in range(count)
        ]
        return cls(records)
117
118
class VariableTunnelBuildReplyMessage(I2NPMessage):
    """Type 24: count(1) + N records of 528 bytes."""

    TYPE = 24
    RECORD_SIZE = 528

    def __init__(self, records: list[bytes]):
        """Store the reply records; header fields start at zero.

        Neither the number of records nor their sizes are checked here.
        """
        self.records = records
        # NOTE(review): presumably read by the I2NPMessage base class when the
        # message header is produced -- confirm against i2p_data.i2np.
        self._header_msg_id = 0
        self._header_expiration = 0

    def body_bytes(self) -> bytes:
        """Return a one-byte record count followed by the concatenated records.

        Raises:
            struct.error: if the record count does not fit in one byte.
        """
        return struct.pack("!B", len(self.records)) + b"".join(self.records)

    @classmethod
    def _from_body(cls, body: bytes) -> "VariableTunnelBuildReplyMessage":
        """Read the count byte, then split out that many records.

        Bytes beyond the last declared record are ignored.

        Raises:
            IndexError: if ``body`` is empty.
            ValueError: if ``body`` is too short for the declared count.
        """
        count = body[0]
        expected = 1 + count * cls.RECORD_SIZE
        # Slicing past the end yields short or empty records without raising,
        # so a body shorter than the count byte promises must be rejected.
        if len(body) < expected:
            raise ValueError(
                f"VariableTunnelBuildReplyMessage declares {count} records "
                f"({expected} bytes), got {len(body)} bytes"
            )
        records = [
            body[1 + i * cls.RECORD_SIZE:1 + (i + 1) * cls.RECORD_SIZE]
            for i in range(count)
        ]
        return cls(records)
138 return cls(records)