A Python port of the Invisible Internet Project (I2P)
1"""SSU2 payload block definitions and parsing.
2
3Ported from net.i2p.router.transport.udp.SSU2Payload.
4
5SSU2 payloads consist of a sequence of blocks, each with:
6 - 1 byte: block type
7 - 2 bytes: block length (big-endian)
8 - N bytes: block data
9"""
10
11from __future__ import annotations
12
13import enum
14import struct
15from collections.abc import Sequence
16from dataclasses import dataclass, field
17
18BLOCK_HEADER_SIZE = 3 # type(1) + length(2)
19
20
21class SSU2BlockType(enum.IntEnum):
22 """All SSU2 payload block types."""
23 DATETIME = 0
24 OPTIONS = 1
25 ROUTER_INFO = 2
26 I2NP = 3
27 FIRST_FRAGMENT = 4
28 FOLLOW_ON_FRAGMENT = 5
29 ACK = 6
30 ADDRESS = 7
31 INTRO_KEY = 8
32 RELAY_TAG_REQUEST = 9
33 RELAY_TAG = 10
34 NEW_TOKEN = 11
35 PATH_CHALLENGE = 12
36 PATH_RESPONSE = 13
37 FIRST_PACKET_NUMBER = 14
38 CONGESTION = 15
39 PADDING = 254
40 TERMINATION = 255
41
42
43# ---------------------------------------------------------------------------
44# Base
45# ---------------------------------------------------------------------------
46
47@dataclass
48class SSU2PayloadBlock:
49 """Base class for all SSU2 payload blocks."""
50 block_type: SSU2BlockType
51
52 def to_bytes(self) -> bytes:
53 """Serialize block data (without header)."""
54 raise NotImplementedError
55
56 def to_block(self) -> bytes:
57 """Serialize complete block with type+length header."""
58 data = self.to_bytes()
59 return struct.pack("!BH", self.block_type, len(data)) + data
60
61 @classmethod
62 def from_bytes(cls, data: bytes) -> SSU2PayloadBlock:
63 """Deserialize block data (without header). Subclasses override."""
64 raise NotImplementedError
65
66
67# ---------------------------------------------------------------------------
68# Concrete block types
69# ---------------------------------------------------------------------------
70
71@dataclass
72class DateTimeBlock(SSU2PayloadBlock):
73 """Current time as seconds since epoch."""
74 timestamp: int = 0 # uint32
75 block_type: SSU2BlockType = field(default=SSU2BlockType.DATETIME, repr=False)
76
77 def to_bytes(self) -> bytes:
78 return struct.pack("!I", self.timestamp)
79
80 @classmethod
81 def from_bytes(cls, data: bytes) -> DateTimeBlock:
82 (ts,) = struct.unpack("!I", data[:4])
83 return cls(timestamp=ts)
84
85
86@dataclass
87class OptionsBlock(SSU2PayloadBlock):
88 """Session options negotiation."""
89 options: bytes = b""
90 block_type: SSU2BlockType = field(default=SSU2BlockType.OPTIONS, repr=False)
91
92 def to_bytes(self) -> bytes:
93 return self.options
94
95 @classmethod
96 def from_bytes(cls, data: bytes) -> OptionsBlock:
97 return cls(options=data)
98
99
100@dataclass
101class RouterInfoBlock(SSU2PayloadBlock):
102 """RouterInfo payload (possibly gzip-compressed)."""
103 flag: int = 0 # 1 byte: 0=uncompressed, 1=gzipped, 2=flood request
104 router_info_data: bytes = b""
105 block_type: SSU2BlockType = field(default=SSU2BlockType.ROUTER_INFO, repr=False)
106
107 def to_bytes(self) -> bytes:
108 return struct.pack("B", self.flag) + self.router_info_data
109
110 @classmethod
111 def from_bytes(cls, data: bytes) -> RouterInfoBlock:
112 flag = data[0]
113 return cls(flag=flag, router_info_data=data[1:])
114
115
116@dataclass
117class I2NPBlock(SSU2PayloadBlock):
118 """Complete I2NP message (fits in one block)."""
119 i2np_data: bytes = b""
120 block_type: SSU2BlockType = field(default=SSU2BlockType.I2NP, repr=False)
121
122 def to_bytes(self) -> bytes:
123 return self.i2np_data
124
125 @classmethod
126 def from_bytes(cls, data: bytes) -> I2NPBlock:
127 return cls(i2np_data=data)
128
129
130@dataclass
131class FirstFragmentBlock(SSU2PayloadBlock):
132 """First fragment of a large I2NP message."""
133 msg_id: int = 0 # uint32
134 total_fragments: int = 0 # uint8 (fragment info byte)
135 fragment_data: bytes = b""
136 block_type: SSU2BlockType = field(default=SSU2BlockType.FIRST_FRAGMENT, repr=False)
137
138 def to_bytes(self) -> bytes:
139 return struct.pack("!IB", self.msg_id, self.total_fragments) + self.fragment_data
140
141 @classmethod
142 def from_bytes(cls, data: bytes) -> FirstFragmentBlock:
143 msg_id, total = struct.unpack("!IB", data[:5])
144 return cls(msg_id=msg_id, total_fragments=total, fragment_data=data[5:])
145
146
147@dataclass
148class FollowOnFragmentBlock(SSU2PayloadBlock):
149 """Subsequent fragment of a large I2NP message."""
150 msg_id: int = 0 # uint32
151 fragment_num: int = 0 # uint8 (lower 7 bits)
152 is_last: bool = False # high bit of fragment info byte
153 fragment_data: bytes = b""
154 block_type: SSU2BlockType = field(default=SSU2BlockType.FOLLOW_ON_FRAGMENT, repr=False)
155
156 def to_bytes(self) -> bytes:
157 info = self.fragment_num & 0x7F
158 if self.is_last:
159 info |= 0x80
160 return struct.pack("!IB", self.msg_id, info) + self.fragment_data
161
162 @classmethod
163 def from_bytes(cls, data: bytes) -> FollowOnFragmentBlock:
164 msg_id, info = struct.unpack("!IB", data[:5])
165 is_last = bool(info & 0x80)
166 fragment_num = info & 0x7F
167 return cls(msg_id=msg_id, fragment_num=fragment_num,
168 is_last=is_last, fragment_data=data[5:])
169
170
171@dataclass
172class AckBlock(SSU2PayloadBlock):
173 """ACK block with through-number and ACK ranges."""
174 ack_through: int = 0 # uint32: highest acked packet number
175 ack_count: int = 0 # uint8: number of ack ranges
176 ranges: list[tuple[int, int]] = field(default_factory=list)
177 block_type: SSU2BlockType = field(default=SSU2BlockType.ACK, repr=False)
178
179 def to_bytes(self) -> bytes:
180 buf = struct.pack("!IB", self.ack_through, self.ack_count)
181 for acked, nacked in self.ranges:
182 buf += struct.pack("BB", acked, nacked)
183 return buf
184
185 @classmethod
186 def from_bytes(cls, data: bytes) -> AckBlock:
187 through, count = struct.unpack("!IB", data[:5])
188 ranges: list[tuple[int, int]] = []
189 offset = 5
190 for _ in range(count):
191 acked = data[offset]
192 nacked = data[offset + 1]
193 ranges.append((acked, nacked))
194 offset += 2
195 return cls(ack_through=through, ack_count=count, ranges=ranges)
196
197
198@dataclass
199class AddressBlock(SSU2PayloadBlock):
200 """Remote peer's observed address."""
201 ip_address: bytes = b"" # 4 or 16 bytes
202 port: int = 0
203 block_type: SSU2BlockType = field(default=SSU2BlockType.ADDRESS, repr=False)
204
205 def to_bytes(self) -> bytes:
206 return self.ip_address + struct.pack("!H", self.port)
207
208 @classmethod
209 def from_bytes(cls, data: bytes) -> AddressBlock:
210 port = struct.unpack("!H", data[-2:])[0]
211 ip = data[:-2]
212 return cls(ip_address=ip, port=port)
213
214
215@dataclass
216class IntroKeyBlock(SSU2PayloadBlock):
217 """Introduction key for relay."""
218 intro_key: bytes = b"" # 32 bytes
219 block_type: SSU2BlockType = field(default=SSU2BlockType.INTRO_KEY, repr=False)
220
221 def to_bytes(self) -> bytes:
222 return self.intro_key
223
224 @classmethod
225 def from_bytes(cls, data: bytes) -> IntroKeyBlock:
226 return cls(intro_key=data)
227
228
229@dataclass
230class RelayTagRequestBlock(SSU2PayloadBlock):
231 """Request for a relay tag (empty body)."""
232 block_type: SSU2BlockType = field(default=SSU2BlockType.RELAY_TAG_REQUEST, repr=False)
233
234 def to_bytes(self) -> bytes:
235 return b""
236
237 @classmethod
238 def from_bytes(cls, data: bytes) -> RelayTagRequestBlock:
239 return cls()
240
241
242@dataclass
243class RelayTagBlock(SSU2PayloadBlock):
244 """Relay tag assignment."""
245 relay_tag: int = 0 # uint32
246 block_type: SSU2BlockType = field(default=SSU2BlockType.RELAY_TAG, repr=False)
247
248 def to_bytes(self) -> bytes:
249 return struct.pack("!I", self.relay_tag)
250
251 @classmethod
252 def from_bytes(cls, data: bytes) -> RelayTagBlock:
253 (tag,) = struct.unpack("!I", data[:4])
254 return cls(relay_tag=tag)
255
256
257@dataclass
258class NewTokenBlock(SSU2PayloadBlock):
259 """New session token for future connections."""
260 expires: int = 0 # uint32: seconds since epoch
261 token: int = 0 # uint64
262 block_type: SSU2BlockType = field(default=SSU2BlockType.NEW_TOKEN, repr=False)
263
264 def to_bytes(self) -> bytes:
265 return struct.pack("!IQ", self.expires, self.token)
266
267 @classmethod
268 def from_bytes(cls, data: bytes) -> NewTokenBlock:
269 expires, token = struct.unpack("!IQ", data[:12])
270 return cls(expires=expires, token=token)
271
272
273@dataclass
274class PathChallengeBlock(SSU2PayloadBlock):
275 """Path validation challenge."""
276 challenge_data: bytes = b"" # 8 bytes
277 block_type: SSU2BlockType = field(default=SSU2BlockType.PATH_CHALLENGE, repr=False)
278
279 def to_bytes(self) -> bytes:
280 return self.challenge_data
281
282 @classmethod
283 def from_bytes(cls, data: bytes) -> PathChallengeBlock:
284 return cls(challenge_data=data)
285
286
287@dataclass
288class PathResponseBlock(SSU2PayloadBlock):
289 """Path validation response."""
290 response_data: bytes = b"" # 8 bytes
291 block_type: SSU2BlockType = field(default=SSU2BlockType.PATH_RESPONSE, repr=False)
292
293 def to_bytes(self) -> bytes:
294 return self.response_data
295
296 @classmethod
297 def from_bytes(cls, data: bytes) -> PathResponseBlock:
298 return cls(response_data=data)
299
300
301@dataclass
302class FirstPacketNumberBlock(SSU2PayloadBlock):
303 """First packet number for the data phase."""
304 packet_number: int = 0 # uint32
305 block_type: SSU2BlockType = field(default=SSU2BlockType.FIRST_PACKET_NUMBER, repr=False)
306
307 def to_bytes(self) -> bytes:
308 return struct.pack("!I", self.packet_number)
309
310 @classmethod
311 def from_bytes(cls, data: bytes) -> FirstPacketNumberBlock:
312 (pn,) = struct.unpack("!I", data[:4])
313 return cls(packet_number=pn)
314
315
316@dataclass
317class CongestionBlock(SSU2PayloadBlock):
318 """Congestion notification."""
319 congestion_data: bytes = b""
320 block_type: SSU2BlockType = field(default=SSU2BlockType.CONGESTION, repr=False)
321
322 def to_bytes(self) -> bytes:
323 return self.congestion_data
324
325 @classmethod
326 def from_bytes(cls, data: bytes) -> CongestionBlock:
327 return cls(congestion_data=data)
328
329
330@dataclass
331class PaddingBlock(SSU2PayloadBlock):
332 """Random padding."""
333 padding: bytes = b""
334 block_type: SSU2BlockType = field(default=SSU2BlockType.PADDING, repr=False)
335
336 def to_bytes(self) -> bytes:
337 return self.padding
338
339 @classmethod
340 def from_bytes(cls, data: bytes) -> PaddingBlock:
341 return cls(padding=data)
342
343
344@dataclass
345class TerminationBlock(SSU2PayloadBlock):
346 """Session termination."""
347 reason: int = 0 # uint8
348 valid_frames_received: int = 0 # uint64
349 block_type: SSU2BlockType = field(default=SSU2BlockType.TERMINATION, repr=False)
350
351 def to_bytes(self) -> bytes:
352 return struct.pack("!BQ", self.reason, self.valid_frames_received)
353
354 @classmethod
355 def from_bytes(cls, data: bytes) -> TerminationBlock:
356 reason, frames = struct.unpack("!BQ", data[:9])
357 return cls(reason=reason, valid_frames_received=frames)
358
359
360# ---------------------------------------------------------------------------
361# Block type -> class dispatch table
362# ---------------------------------------------------------------------------
363
364_BLOCK_CLASSES: dict[SSU2BlockType, type[SSU2PayloadBlock]] = {
365 SSU2BlockType.DATETIME: DateTimeBlock,
366 SSU2BlockType.OPTIONS: OptionsBlock,
367 SSU2BlockType.ROUTER_INFO: RouterInfoBlock,
368 SSU2BlockType.I2NP: I2NPBlock,
369 SSU2BlockType.FIRST_FRAGMENT: FirstFragmentBlock,
370 SSU2BlockType.FOLLOW_ON_FRAGMENT: FollowOnFragmentBlock,
371 SSU2BlockType.ACK: AckBlock,
372 SSU2BlockType.ADDRESS: AddressBlock,
373 SSU2BlockType.INTRO_KEY: IntroKeyBlock,
374 SSU2BlockType.RELAY_TAG_REQUEST: RelayTagRequestBlock,
375 SSU2BlockType.RELAY_TAG: RelayTagBlock,
376 SSU2BlockType.NEW_TOKEN: NewTokenBlock,
377 SSU2BlockType.PATH_CHALLENGE: PathChallengeBlock,
378 SSU2BlockType.PATH_RESPONSE: PathResponseBlock,
379 SSU2BlockType.FIRST_PACKET_NUMBER: FirstPacketNumberBlock,
380 SSU2BlockType.CONGESTION: CongestionBlock,
381 SSU2BlockType.PADDING: PaddingBlock,
382 SSU2BlockType.TERMINATION: TerminationBlock,
383}
384
385
386# ---------------------------------------------------------------------------
387# Payload-level parse / build
388# ---------------------------------------------------------------------------
389
390def parse_payload(data: bytes) -> list[SSU2PayloadBlock]:
391 """Parse a sequence of SSU2 blocks from payload bytes."""
392 blocks: list[SSU2PayloadBlock] = []
393 offset = 0
394 while offset < len(data):
395 if offset + BLOCK_HEADER_SIZE > len(data):
396 raise ValueError(f"Incomplete block header at offset {offset}")
397 btype = data[offset]
398 length = struct.unpack("!H", data[offset + 1:offset + 3])[0]
399 offset += BLOCK_HEADER_SIZE
400 if offset + length > len(data):
401 raise ValueError(
402 f"Block at offset {offset - BLOCK_HEADER_SIZE} declares "
403 f"length {length} but only {len(data) - offset} bytes remain"
404 )
405 block_data = data[offset:offset + length]
406 offset += length
407
408 try:
409 block_type = SSU2BlockType(btype)
410 except ValueError:
411 # Unknown block type -- skip
412 continue
413
414 cls = _BLOCK_CLASSES.get(block_type)
415 if cls is not None:
416 blocks.append(cls.from_bytes(block_data))
417 # else: recognized enum but no class -- skip
418
419 return blocks
420
421
422def build_payload(blocks: Sequence[SSU2PayloadBlock]) -> bytes:
423 """Serialize a list of blocks into payload bytes."""
424 return b"".join(block.to_block() for block in blocks)