"""NTCP2 payload block codec. Implements encoding and decoding of NTCP2 payload blocks as defined in the I2P NTCP2 specification. Each block consists of a 1-byte type, 2-byte big-endian length, and variable-length data. Multiple blocks are concatenated within a single decrypted frame payload. Also provides handshake option encoders/decoders for msg1 and msg2. """ import os import struct from dataclasses import dataclass # Block type constants BLOCK_DATETIME: int = 0 BLOCK_OPTIONS: int = 1 BLOCK_ROUTERINFO: int = 2 BLOCK_I2NP: int = 3 BLOCK_TERMINATION: int = 4 BLOCK_PADDING: int = 254 @dataclass class NTCP2Block: """A single NTCP2 payload block.""" block_type: int data: bytes def encode_blocks(blocks: list[NTCP2Block]) -> bytes: """Encode a list of NTCP2 blocks into concatenated wire bytes. Each block: 1 byte type + 2 bytes big-endian length + data. """ parts = [] for block in blocks: parts.append(struct.pack("!BH", block.block_type, len(block.data))) parts.append(block.data) return b"".join(parts) def decode_blocks(data: bytes) -> list[NTCP2Block]: """Decode concatenated wire bytes into a list of NTCP2 blocks. Reads type (1 byte), length (2 bytes BE), data (length bytes), repeating until all data is consumed. """ blocks = [] offset = 0 while offset < len(data): if offset + 3 > len(data): raise ValueError(f"Incomplete block header at offset {offset}") block_type = data[offset] length = struct.unpack("!H", data[offset + 1:offset + 3])[0] offset += 3 if offset + length > len(data): raise ValueError( f"Block at offset {offset - 3} declares length {length} " f"but only {len(data) - offset} bytes remain" ) block_data = data[offset:offset + length] offset += length blocks.append(NTCP2Block(block_type=block_type, data=block_data)) return blocks # --- Handshake option encoders/decoders --- def encode_msg1_options( network_id: int, version: int, padlen1: int, msg3p2len: int, timestamp: int ) -> bytes: """Encode msg1 options into 16 bytes. Layout: 0 1 network_id 1 1 version 2-3 2 padlen1 (BE) 4-5 2 msg3p2len (BE) 6-7 2 reserved (0) 8-11 4 timestamp seconds (BE) 12-15 4 reserved (0) """ return struct.pack( "!BBHHHI I", network_id, version, padlen1, msg3p2len, 0, timestamp, 0 ) def decode_msg1_options(data: bytes) -> dict: """Decode 16 bytes of msg1 options into a dict.""" if len(data) != 16: raise ValueError(f"msg1 options must be 16 bytes, got {len(data)}") network_id, version, padlen1, msg3p2len, _, timestamp, _ = struct.unpack( "!BBHHHI I", data ) return { "network_id": network_id, "version": version, "padlen1": padlen1, "msg3p2len": msg3p2len, "timestamp": timestamp, } def encode_msg2_options(padlen2: int, timestamp: int) -> bytes: """Encode msg2 options into 16 bytes. Layout: 0-1 2 reserved (0) 2-3 2 padlen2 (BE) 4-7 4 reserved (0) 8-11 4 timestamp seconds (BE) 12-15 4 reserved (0) """ return struct.pack("!HHI I I", 0, padlen2, 0, timestamp, 0) def decode_msg2_options(data: bytes) -> dict: """Decode 16 bytes of msg2 options into a dict.""" if len(data) != 16: raise ValueError(f"msg2 options must be 16 bytes, got {len(data)}") _, padlen2, _, timestamp, _ = struct.unpack("!HHI I I", data) return { "padlen2": padlen2, "timestamp": timestamp, } # --- Helper constructors --- def datetime_block(timestamp_seconds: int) -> NTCP2Block: """Create a DateTime block (type 0) with a 4-byte BE timestamp.""" return NTCP2Block( block_type=BLOCK_DATETIME, data=struct.pack("!I", timestamp_seconds), ) def i2np_block(message_bytes: bytes) -> NTCP2Block: """Create an I2NP message block (type 3).""" return NTCP2Block(block_type=BLOCK_I2NP, data=message_bytes) def padding_block(size: int) -> NTCP2Block: """Create a padding block (type 254) with `size` random bytes.""" return NTCP2Block(block_type=BLOCK_PADDING, data=os.urandom(size)) def termination_block(frames_received: int, reason: int) -> NTCP2Block: """Create a termination block (type 4). Data: 8 bytes LE frames_received + 1 byte reason code. """ data = struct.pack(" NTCP2Block: """Create an Options block (type 1). For msg3, the Options block carries 12 bytes of padding/delay negotiation parameters. If no data is provided, 12 zero bytes are used (accepting defaults). """ if not data: data = b"\x00" * 12 # 12 bytes: min/max padding, dummy, delay defaults return NTCP2Block(block_type=BLOCK_OPTIONS, data=data) def router_info_block(ri_bytes: bytes, flag: int = 0) -> NTCP2Block: """Create a RouterInfo block (type 2). Data: 1 flag byte + RouterInfo bytes. """ return NTCP2Block( block_type=BLOCK_ROUTERINFO, data=struct.pack("B", flag) + ri_bytes, )