A Python port of the Invisible Internet Project (I2P)
at main 180 lines 5.5 kB view raw
1"""NTCP2 payload block codec. 2 3Implements encoding and decoding of NTCP2 payload blocks as defined in 4the I2P NTCP2 specification. Each block consists of a 1-byte type, 52-byte big-endian length, and variable-length data. Multiple blocks 6are concatenated within a single decrypted frame payload. 7 8Also provides handshake option encoders/decoders for msg1 and msg2. 9""" 10 11import os 12import struct 13from dataclasses import dataclass 14 15# Block type constants 16BLOCK_DATETIME: int = 0 17BLOCK_OPTIONS: int = 1 18BLOCK_ROUTERINFO: int = 2 19BLOCK_I2NP: int = 3 20BLOCK_TERMINATION: int = 4 21BLOCK_PADDING: int = 254 22 23 24@dataclass 25class NTCP2Block: 26 """A single NTCP2 payload block.""" 27 28 block_type: int 29 data: bytes 30 31 32def encode_blocks(blocks: list[NTCP2Block]) -> bytes: 33 """Encode a list of NTCP2 blocks into concatenated wire bytes. 34 35 Each block: 1 byte type + 2 bytes big-endian length + data. 36 """ 37 parts = [] 38 for block in blocks: 39 parts.append(struct.pack("!BH", block.block_type, len(block.data))) 40 parts.append(block.data) 41 return b"".join(parts) 42 43 44def decode_blocks(data: bytes) -> list[NTCP2Block]: 45 """Decode concatenated wire bytes into a list of NTCP2 blocks. 46 47 Reads type (1 byte), length (2 bytes BE), data (length bytes), 48 repeating until all data is consumed. 49 """ 50 blocks = [] 51 offset = 0 52 while offset < len(data): 53 if offset + 3 > len(data): 54 raise ValueError(f"Incomplete block header at offset {offset}") 55 block_type = data[offset] 56 length = struct.unpack("!H", data[offset + 1:offset + 3])[0] 57 offset += 3 58 if offset + length > len(data): 59 raise ValueError( 60 f"Block at offset {offset - 3} declares length {length} " 61 f"but only {len(data) - offset} bytes remain" 62 ) 63 block_data = data[offset:offset + length] 64 offset += length 65 blocks.append(NTCP2Block(block_type=block_type, data=block_data)) 66 return blocks 67 68 69# --- Handshake option encoders/decoders --- 70 71def encode_msg1_options( 72 network_id: int, version: int, padlen1: int, msg3p2len: int, timestamp: int 73) -> bytes: 74 """Encode msg1 options into 16 bytes. 75 76 Layout: 77 0 1 network_id 78 1 1 version 79 2-3 2 padlen1 (BE) 80 4-5 2 msg3p2len (BE) 81 6-7 2 reserved (0) 82 8-11 4 timestamp seconds (BE) 83 12-15 4 reserved (0) 84 """ 85 return struct.pack( 86 "!BBHHHI I", 87 network_id, version, padlen1, msg3p2len, 0, timestamp, 0 88 ) 89 90 91def decode_msg1_options(data: bytes) -> dict: 92 """Decode 16 bytes of msg1 options into a dict.""" 93 if len(data) != 16: 94 raise ValueError(f"msg1 options must be 16 bytes, got {len(data)}") 95 network_id, version, padlen1, msg3p2len, _, timestamp, _ = struct.unpack( 96 "!BBHHHI I", data 97 ) 98 return { 99 "network_id": network_id, 100 "version": version, 101 "padlen1": padlen1, 102 "msg3p2len": msg3p2len, 103 "timestamp": timestamp, 104 } 105 106 107def encode_msg2_options(padlen2: int, timestamp: int) -> bytes: 108 """Encode msg2 options into 16 bytes. 109 110 Layout: 111 0-1 2 reserved (0) 112 2-3 2 padlen2 (BE) 113 4-7 4 reserved (0) 114 8-11 4 timestamp seconds (BE) 115 12-15 4 reserved (0) 116 """ 117 return struct.pack("!HHI I I", 0, padlen2, 0, timestamp, 0) 118 119 120def decode_msg2_options(data: bytes) -> dict: 121 """Decode 16 bytes of msg2 options into a dict.""" 122 if len(data) != 16: 123 raise ValueError(f"msg2 options must be 16 bytes, got {len(data)}") 124 _, padlen2, _, timestamp, _ = struct.unpack("!HHI I I", data) 125 return { 126 "padlen2": padlen2, 127 "timestamp": timestamp, 128 } 129 130 131# --- Helper constructors --- 132 133def datetime_block(timestamp_seconds: int) -> NTCP2Block: 134 """Create a DateTime block (type 0) with a 4-byte BE timestamp.""" 135 return NTCP2Block( 136 block_type=BLOCK_DATETIME, 137 data=struct.pack("!I", timestamp_seconds), 138 ) 139 140 141def i2np_block(message_bytes: bytes) -> NTCP2Block: 142 """Create an I2NP message block (type 3).""" 143 return NTCP2Block(block_type=BLOCK_I2NP, data=message_bytes) 144 145 146def padding_block(size: int) -> NTCP2Block: 147 """Create a padding block (type 254) with `size` random bytes.""" 148 return NTCP2Block(block_type=BLOCK_PADDING, data=os.urandom(size)) 149 150 151def termination_block(frames_received: int, reason: int) -> NTCP2Block: 152 """Create a termination block (type 4). 153 154 Data: 8 bytes LE frames_received + 1 byte reason code. 155 """ 156 data = struct.pack("<Q", frames_received) + struct.pack("B", reason) 157 return NTCP2Block(block_type=BLOCK_TERMINATION, data=data) 158 159 160def options_block(data: bytes = b"") -> NTCP2Block: 161 """Create an Options block (type 1). 162 163 For msg3, the Options block carries 12 bytes of padding/delay 164 negotiation parameters. If no data is provided, 12 zero bytes 165 are used (accepting defaults). 166 """ 167 if not data: 168 data = b"\x00" * 12 # 12 bytes: min/max padding, dummy, delay defaults 169 return NTCP2Block(block_type=BLOCK_OPTIONS, data=data) 170 171 172def router_info_block(ri_bytes: bytes, flag: int = 0) -> NTCP2Block: 173 """Create a RouterInfo block (type 2). 174 175 Data: 1 flag byte + RouterInfo bytes. 176 """ 177 return NTCP2Block( 178 block_type=BLOCK_ROUTERINFO, 179 data=struct.pack("B", flag) + ri_bytes, 180 )