"""SSU2 payload block definitions and parsing. Ported from net.i2p.router.transport.udp.SSU2Payload. SSU2 payloads consist of a sequence of blocks, each with: - 1 byte: block type - 2 bytes: block length (big-endian) - N bytes: block data """ from __future__ import annotations import enum import struct from collections.abc import Sequence from dataclasses import dataclass, field BLOCK_HEADER_SIZE = 3 # type(1) + length(2) class SSU2BlockType(enum.IntEnum): """All SSU2 payload block types.""" DATETIME = 0 OPTIONS = 1 ROUTER_INFO = 2 I2NP = 3 FIRST_FRAGMENT = 4 FOLLOW_ON_FRAGMENT = 5 ACK = 6 ADDRESS = 7 INTRO_KEY = 8 RELAY_TAG_REQUEST = 9 RELAY_TAG = 10 NEW_TOKEN = 11 PATH_CHALLENGE = 12 PATH_RESPONSE = 13 FIRST_PACKET_NUMBER = 14 CONGESTION = 15 PADDING = 254 TERMINATION = 255 # --------------------------------------------------------------------------- # Base # --------------------------------------------------------------------------- @dataclass class SSU2PayloadBlock: """Base class for all SSU2 payload blocks.""" block_type: SSU2BlockType def to_bytes(self) -> bytes: """Serialize block data (without header).""" raise NotImplementedError def to_block(self) -> bytes: """Serialize complete block with type+length header.""" data = self.to_bytes() return struct.pack("!BH", self.block_type, len(data)) + data @classmethod def from_bytes(cls, data: bytes) -> SSU2PayloadBlock: """Deserialize block data (without header). Subclasses override.""" raise NotImplementedError # --------------------------------------------------------------------------- # Concrete block types # --------------------------------------------------------------------------- @dataclass class DateTimeBlock(SSU2PayloadBlock): """Current time as seconds since epoch.""" timestamp: int = 0 # uint32 block_type: SSU2BlockType = field(default=SSU2BlockType.DATETIME, repr=False) def to_bytes(self) -> bytes: return struct.pack("!I", self.timestamp) @classmethod def from_bytes(cls, data: bytes) -> DateTimeBlock: (ts,) = struct.unpack("!I", data[:4]) return cls(timestamp=ts) @dataclass class OptionsBlock(SSU2PayloadBlock): """Session options negotiation.""" options: bytes = b"" block_type: SSU2BlockType = field(default=SSU2BlockType.OPTIONS, repr=False) def to_bytes(self) -> bytes: return self.options @classmethod def from_bytes(cls, data: bytes) -> OptionsBlock: return cls(options=data) @dataclass class RouterInfoBlock(SSU2PayloadBlock): """RouterInfo payload (possibly gzip-compressed).""" flag: int = 0 # 1 byte: 0=uncompressed, 1=gzipped, 2=flood request router_info_data: bytes = b"" block_type: SSU2BlockType = field(default=SSU2BlockType.ROUTER_INFO, repr=False) def to_bytes(self) -> bytes: return struct.pack("B", self.flag) + self.router_info_data @classmethod def from_bytes(cls, data: bytes) -> RouterInfoBlock: flag = data[0] return cls(flag=flag, router_info_data=data[1:]) @dataclass class I2NPBlock(SSU2PayloadBlock): """Complete I2NP message (fits in one block).""" i2np_data: bytes = b"" block_type: SSU2BlockType = field(default=SSU2BlockType.I2NP, repr=False) def to_bytes(self) -> bytes: return self.i2np_data @classmethod def from_bytes(cls, data: bytes) -> I2NPBlock: return cls(i2np_data=data) @dataclass class FirstFragmentBlock(SSU2PayloadBlock): """First fragment of a large I2NP message.""" msg_id: int = 0 # uint32 total_fragments: int = 0 # uint8 (fragment info byte) fragment_data: bytes = b"" block_type: SSU2BlockType = field(default=SSU2BlockType.FIRST_FRAGMENT, repr=False) def to_bytes(self) -> bytes: return struct.pack("!IB", self.msg_id, self.total_fragments) + self.fragment_data @classmethod def from_bytes(cls, data: bytes) -> FirstFragmentBlock: msg_id, total = struct.unpack("!IB", data[:5]) return cls(msg_id=msg_id, total_fragments=total, fragment_data=data[5:]) @dataclass class FollowOnFragmentBlock(SSU2PayloadBlock): """Subsequent fragment of a large I2NP message.""" msg_id: int = 0 # uint32 fragment_num: int = 0 # uint8 (lower 7 bits) is_last: bool = False # high bit of fragment info byte fragment_data: bytes = b"" block_type: SSU2BlockType = field(default=SSU2BlockType.FOLLOW_ON_FRAGMENT, repr=False) def to_bytes(self) -> bytes: info = self.fragment_num & 0x7F if self.is_last: info |= 0x80 return struct.pack("!IB", self.msg_id, info) + self.fragment_data @classmethod def from_bytes(cls, data: bytes) -> FollowOnFragmentBlock: msg_id, info = struct.unpack("!IB", data[:5]) is_last = bool(info & 0x80) fragment_num = info & 0x7F return cls(msg_id=msg_id, fragment_num=fragment_num, is_last=is_last, fragment_data=data[5:]) @dataclass class AckBlock(SSU2PayloadBlock): """ACK block with through-number and ACK ranges.""" ack_through: int = 0 # uint32: highest acked packet number ack_count: int = 0 # uint8: number of ack ranges ranges: list[tuple[int, int]] = field(default_factory=list) block_type: SSU2BlockType = field(default=SSU2BlockType.ACK, repr=False) def to_bytes(self) -> bytes: buf = struct.pack("!IB", self.ack_through, self.ack_count) for acked, nacked in self.ranges: buf += struct.pack("BB", acked, nacked) return buf @classmethod def from_bytes(cls, data: bytes) -> AckBlock: through, count = struct.unpack("!IB", data[:5]) ranges: list[tuple[int, int]] = [] offset = 5 for _ in range(count): acked = data[offset] nacked = data[offset + 1] ranges.append((acked, nacked)) offset += 2 return cls(ack_through=through, ack_count=count, ranges=ranges) @dataclass class AddressBlock(SSU2PayloadBlock): """Remote peer's observed address.""" ip_address: bytes = b"" # 4 or 16 bytes port: int = 0 block_type: SSU2BlockType = field(default=SSU2BlockType.ADDRESS, repr=False) def to_bytes(self) -> bytes: return self.ip_address + struct.pack("!H", self.port) @classmethod def from_bytes(cls, data: bytes) -> AddressBlock: port = struct.unpack("!H", data[-2:])[0] ip = data[:-2] return cls(ip_address=ip, port=port) @dataclass class IntroKeyBlock(SSU2PayloadBlock): """Introduction key for relay.""" intro_key: bytes = b"" # 32 bytes block_type: SSU2BlockType = field(default=SSU2BlockType.INTRO_KEY, repr=False) def to_bytes(self) -> bytes: return self.intro_key @classmethod def from_bytes(cls, data: bytes) -> IntroKeyBlock: return cls(intro_key=data) @dataclass class RelayTagRequestBlock(SSU2PayloadBlock): """Request for a relay tag (empty body).""" block_type: SSU2BlockType = field(default=SSU2BlockType.RELAY_TAG_REQUEST, repr=False) def to_bytes(self) -> bytes: return b"" @classmethod def from_bytes(cls, data: bytes) -> RelayTagRequestBlock: return cls() @dataclass class RelayTagBlock(SSU2PayloadBlock): """Relay tag assignment.""" relay_tag: int = 0 # uint32 block_type: SSU2BlockType = field(default=SSU2BlockType.RELAY_TAG, repr=False) def to_bytes(self) -> bytes: return struct.pack("!I", self.relay_tag) @classmethod def from_bytes(cls, data: bytes) -> RelayTagBlock: (tag,) = struct.unpack("!I", data[:4]) return cls(relay_tag=tag) @dataclass class NewTokenBlock(SSU2PayloadBlock): """New session token for future connections.""" expires: int = 0 # uint32: seconds since epoch token: int = 0 # uint64 block_type: SSU2BlockType = field(default=SSU2BlockType.NEW_TOKEN, repr=False) def to_bytes(self) -> bytes: return struct.pack("!IQ", self.expires, self.token) @classmethod def from_bytes(cls, data: bytes) -> NewTokenBlock: expires, token = struct.unpack("!IQ", data[:12]) return cls(expires=expires, token=token) @dataclass class PathChallengeBlock(SSU2PayloadBlock): """Path validation challenge.""" challenge_data: bytes = b"" # 8 bytes block_type: SSU2BlockType = field(default=SSU2BlockType.PATH_CHALLENGE, repr=False) def to_bytes(self) -> bytes: return self.challenge_data @classmethod def from_bytes(cls, data: bytes) -> PathChallengeBlock: return cls(challenge_data=data) @dataclass class PathResponseBlock(SSU2PayloadBlock): """Path validation response.""" response_data: bytes = b"" # 8 bytes block_type: SSU2BlockType = field(default=SSU2BlockType.PATH_RESPONSE, repr=False) def to_bytes(self) -> bytes: return self.response_data @classmethod def from_bytes(cls, data: bytes) -> PathResponseBlock: return cls(response_data=data) @dataclass class FirstPacketNumberBlock(SSU2PayloadBlock): """First packet number for the data phase.""" packet_number: int = 0 # uint32 block_type: SSU2BlockType = field(default=SSU2BlockType.FIRST_PACKET_NUMBER, repr=False) def to_bytes(self) -> bytes: return struct.pack("!I", self.packet_number) @classmethod def from_bytes(cls, data: bytes) -> FirstPacketNumberBlock: (pn,) = struct.unpack("!I", data[:4]) return cls(packet_number=pn) @dataclass class CongestionBlock(SSU2PayloadBlock): """Congestion notification.""" congestion_data: bytes = b"" block_type: SSU2BlockType = field(default=SSU2BlockType.CONGESTION, repr=False) def to_bytes(self) -> bytes: return self.congestion_data @classmethod def from_bytes(cls, data: bytes) -> CongestionBlock: return cls(congestion_data=data) @dataclass class PaddingBlock(SSU2PayloadBlock): """Random padding.""" padding: bytes = b"" block_type: SSU2BlockType = field(default=SSU2BlockType.PADDING, repr=False) def to_bytes(self) -> bytes: return self.padding @classmethod def from_bytes(cls, data: bytes) -> PaddingBlock: return cls(padding=data) @dataclass class TerminationBlock(SSU2PayloadBlock): """Session termination.""" reason: int = 0 # uint8 valid_frames_received: int = 0 # uint64 block_type: SSU2BlockType = field(default=SSU2BlockType.TERMINATION, repr=False) def to_bytes(self) -> bytes: return struct.pack("!BQ", self.reason, self.valid_frames_received) @classmethod def from_bytes(cls, data: bytes) -> TerminationBlock: reason, frames = struct.unpack("!BQ", data[:9]) return cls(reason=reason, valid_frames_received=frames) # --------------------------------------------------------------------------- # Block type -> class dispatch table # --------------------------------------------------------------------------- _BLOCK_CLASSES: dict[SSU2BlockType, type[SSU2PayloadBlock]] = { SSU2BlockType.DATETIME: DateTimeBlock, SSU2BlockType.OPTIONS: OptionsBlock, SSU2BlockType.ROUTER_INFO: RouterInfoBlock, SSU2BlockType.I2NP: I2NPBlock, SSU2BlockType.FIRST_FRAGMENT: FirstFragmentBlock, SSU2BlockType.FOLLOW_ON_FRAGMENT: FollowOnFragmentBlock, SSU2BlockType.ACK: AckBlock, SSU2BlockType.ADDRESS: AddressBlock, SSU2BlockType.INTRO_KEY: IntroKeyBlock, SSU2BlockType.RELAY_TAG_REQUEST: RelayTagRequestBlock, SSU2BlockType.RELAY_TAG: RelayTagBlock, SSU2BlockType.NEW_TOKEN: NewTokenBlock, SSU2BlockType.PATH_CHALLENGE: PathChallengeBlock, SSU2BlockType.PATH_RESPONSE: PathResponseBlock, SSU2BlockType.FIRST_PACKET_NUMBER: FirstPacketNumberBlock, SSU2BlockType.CONGESTION: CongestionBlock, SSU2BlockType.PADDING: PaddingBlock, SSU2BlockType.TERMINATION: TerminationBlock, } # --------------------------------------------------------------------------- # Payload-level parse / build # --------------------------------------------------------------------------- def parse_payload(data: bytes) -> list[SSU2PayloadBlock]: """Parse a sequence of SSU2 blocks from payload bytes.""" blocks: list[SSU2PayloadBlock] = [] offset = 0 while offset < len(data): if offset + BLOCK_HEADER_SIZE > len(data): raise ValueError(f"Incomplete block header at offset {offset}") btype = data[offset] length = struct.unpack("!H", data[offset + 1:offset + 3])[0] offset += BLOCK_HEADER_SIZE if offset + length > len(data): raise ValueError( f"Block at offset {offset - BLOCK_HEADER_SIZE} declares " f"length {length} but only {len(data) - offset} bytes remain" ) block_data = data[offset:offset + length] offset += length try: block_type = SSU2BlockType(btype) except ValueError: # Unknown block type -- skip continue cls = _BLOCK_CLASSES.get(block_type) if cls is not None: blocks.append(cls.from_bytes(block_data)) # else: recognized enum but no class -- skip return blocks def build_payload(blocks: Sequence[SSU2PayloadBlock]) -> bytes: """Serialize a list of blocks into payload bytes.""" return b"".join(block.to_block() for block in blocks)