"""SSU2 transport protocol types. Ported from net.i2p.router.transport.udp.SSU2Header and related classes. """ import enum import os import struct from i2p_crypto.chacha20 import ChaCha20 # --------------------------------------------------------------------------- # Header / packet constants # --------------------------------------------------------------------------- SHORT_HEADER_SIZE = 16 # dest_conn_id(8) + pkt_num(4) + type(1) + flags(3) LONG_HEADER_SIZE = 32 # short + version(1) + net_id(1) + src_conn_id(8) + token(8) SESSION_HEADER_SIZE = 64 # long(32) + ephemeral_key(32) MAC_LEN = 16 # ChaCha20-Poly1305 tag KEY_LEN = 32 # X25519 key size HEADER_PROT_SAMPLE_LEN = 24 # Bytes needed for header protection sampling PROTOCOL_VERSION = 2 class SSU2State(enum.Enum): UNKNOWN = 0 TOKEN_REQUEST_SENT = 1 SESSION_REQUEST_SENT = 2 SESSION_CONFIRMED = 3 ESTABLISHED = 4 TERMINATED = 5 class BlockType(enum.IntEnum): DATA = 0 ACK = 1 ADDRESS = 2 RELAY_REQUEST = 3 RELAY_RESPONSE = 4 RELAY_INTRO = 5 PEER_TEST = 6 I2NP = 7 FIRST_FRAGMENT = 8 FOLLOW_ON_FRAGMENT = 9 PADDING = 254 TERMINATION = 255 class SSU2Header: """SSU2 packet header: dest_conn_id(8) + pkt_num(4) + type(1) + version(1) + net_id(2).""" SIZE = 16 def __init__(self, dest_conn_id: int, pkt_num: int, header_type: int = 0, version: int = 2, net_id: int = 2): self.dest_conn_id = dest_conn_id self.pkt_num = pkt_num self.header_type = header_type self.version = version self.net_id = net_id def to_bytes(self) -> bytes: return struct.pack("!QIBBH", self.dest_conn_id, self.pkt_num, self.header_type, self.version, self.net_id) @classmethod def from_bytes(cls, data: bytes) -> "SSU2Header": dest_conn_id, pkt_num, header_type, version, net_id = struct.unpack( "!QIBBH", data[:cls.SIZE]) return cls(dest_conn_id, pkt_num, header_type, version, net_id) class SSU2Block: """SSU2 block: type(1) + length(2) + payload.""" def __init__(self, block_type: BlockType, payload: bytes): self.block_type = block_type self.payload = payload def to_bytes(self) -> bytes: return struct.pack("!BH", self.block_type, len(self.payload)) + self.payload @classmethod def from_bytes(cls, data: bytes) -> "SSU2Block": block_type, length = struct.unpack("!BH", data[:3]) return cls(BlockType(block_type), data[3:3 + length]) @classmethod def from_stream(cls, stream) -> "SSU2Block": header = stream.read(3) block_type, length = struct.unpack("!BH", header) payload = stream.read(length) return cls(BlockType(block_type), payload) class SSU2SessionState: """SSU2 session state machine.""" def __init__(self): self.state = SSU2State.UNKNOWN self.peer_test_active = False self.path_challenge_pending = False self._path_nonce: bytes | None = None def send_token_request(self): self.state = SSU2State.TOKEN_REQUEST_SENT def send_session_request(self): self.state = SSU2State.SESSION_REQUEST_SENT def receive_session_created(self): if self.state != SSU2State.SESSION_REQUEST_SENT: raise RuntimeError(f"Cannot receive session created from {self.state}") self.state = SSU2State.SESSION_CONFIRMED def confirm(self): if self.state != SSU2State.SESSION_CONFIRMED: raise RuntimeError(f"Cannot confirm from {self.state}") self.state = SSU2State.ESTABLISHED def terminate(self): self.state = SSU2State.TERMINATED def start_peer_test(self): self.peer_test_active = True def start_path_validation(self) -> bytes: self._path_nonce = os.urandom(8) self.path_challenge_pending = True return self._path_nonce # --------------------------------------------------------------------------- # Header protection # --------------------------------------------------------------------------- class SSU2HeaderProtection: """ChaCha20-based header protection. Ported from net.i2p.router.transport.udp.SSU2Header. The first 8 bytes of the header are XORed with a ChaCha20 keystream derived from a key and a nonce sampled from the encrypted payload. This prevents on-path observers from reading connection IDs. """ def __init__(self, header_key1: bytes, header_key2: bytes): """Two keys: one for short headers, one for long/handshake headers. Args: header_key1: 32-byte key used for short header protection and the first region of long headers. header_key2: 32-byte key used for the second region of long headers. """ if len(header_key1) < KEY_LEN: raise ValueError(f"header_key1 must be at least {KEY_LEN} bytes") if len(header_key2) < KEY_LEN: raise ValueError(f"header_key2 must be at least {KEY_LEN} bytes") self._key1 = header_key1[:KEY_LEN] self._key2 = header_key2[:KEY_LEN] @staticmethod def _xor_region(packet: bytearray, offset: int, length: int, keystream: bytes) -> None: """XOR a region of *packet* in-place with *keystream*.""" for i in range(length): packet[offset + i] ^= keystream[i] def encrypt_short_header(self, packet: bytearray) -> None: """Encrypt short header in-place. Nonce is sampled from packet[SHORT_HEADER_SIZE:SHORT_HEADER_SIZE+12] (the first 12 bytes of the encrypted body). Keystream = ChaCha20(key1, nonce, counter=0)[:8] XOR first 8 bytes of header with keystream. """ nonce = bytes(packet[SHORT_HEADER_SIZE:SHORT_HEADER_SIZE + 12]) # Use counter=0 by passing a 16-byte nonce with 4-byte zero prefix full_nonce = b"\x00\x00\x00\x00" + nonce keystream = ChaCha20.encrypt(self._key1, full_nonce, b"\x00" * 8) self._xor_region(packet, 0, 8, keystream) def decrypt_short_header(self, packet: bytearray) -> None: """Decrypt short header in-place (same operation as encrypt — XOR is symmetric).""" self.encrypt_short_header(packet) def encrypt_long_header(self, packet: bytearray) -> None: """Encrypt long header in-place. For long headers two regions are protected: - bytes [0:8] XORed with keystream from key1 - bytes [12:20] XORed with keystream from key2 Nonce sampled from packet[LONG_HEADER_SIZE:LONG_HEADER_SIZE+12]. """ nonce = bytes(packet[LONG_HEADER_SIZE:LONG_HEADER_SIZE + 12]) full_nonce = b"\x00\x00\x00\x00" + nonce ks1 = ChaCha20.encrypt(self._key1, full_nonce, b"\x00" * 8) self._xor_region(packet, 0, 8, ks1) ks2 = ChaCha20.encrypt(self._key2, full_nonce, b"\x00" * 8) self._xor_region(packet, 12, 8, ks2) def decrypt_long_header(self, packet: bytearray) -> None: """Decrypt long header in-place (same operation as encrypt).""" self.encrypt_long_header(packet)