A Python port of the Invisible Internet Project (I2P)
1"""NTCP2 payload block codec.
2
3Implements encoding and decoding of NTCP2 payload blocks as defined in
4the I2P NTCP2 specification. Each block consists of a 1-byte type,
52-byte big-endian length, and variable-length data. Multiple blocks
6are concatenated within a single decrypted frame payload.
7
8Also provides handshake option encoders/decoders for msg1 and msg2.
9"""
10
11import os
12import struct
13from dataclasses import dataclass
14
15# Block type constants
16BLOCK_DATETIME: int = 0
17BLOCK_OPTIONS: int = 1
18BLOCK_ROUTERINFO: int = 2
19BLOCK_I2NP: int = 3
20BLOCK_TERMINATION: int = 4
21BLOCK_PADDING: int = 254
22
23
24@dataclass
25class NTCP2Block:
26 """A single NTCP2 payload block."""
27
28 block_type: int
29 data: bytes
30
31
32def encode_blocks(blocks: list[NTCP2Block]) -> bytes:
33 """Encode a list of NTCP2 blocks into concatenated wire bytes.
34
35 Each block: 1 byte type + 2 bytes big-endian length + data.
36 """
37 parts = []
38 for block in blocks:
39 parts.append(struct.pack("!BH", block.block_type, len(block.data)))
40 parts.append(block.data)
41 return b"".join(parts)
42
43
44def decode_blocks(data: bytes) -> list[NTCP2Block]:
45 """Decode concatenated wire bytes into a list of NTCP2 blocks.
46
47 Reads type (1 byte), length (2 bytes BE), data (length bytes),
48 repeating until all data is consumed.
49 """
50 blocks = []
51 offset = 0
52 while offset < len(data):
53 if offset + 3 > len(data):
54 raise ValueError(f"Incomplete block header at offset {offset}")
55 block_type = data[offset]
56 length = struct.unpack("!H", data[offset + 1:offset + 3])[0]
57 offset += 3
58 if offset + length > len(data):
59 raise ValueError(
60 f"Block at offset {offset - 3} declares length {length} "
61 f"but only {len(data) - offset} bytes remain"
62 )
63 block_data = data[offset:offset + length]
64 offset += length
65 blocks.append(NTCP2Block(block_type=block_type, data=block_data))
66 return blocks
67
68
69# --- Handshake option encoders/decoders ---
70
71def encode_msg1_options(
72 network_id: int, version: int, padlen1: int, msg3p2len: int, timestamp: int
73) -> bytes:
74 """Encode msg1 options into 16 bytes.
75
76 Layout:
77 0 1 network_id
78 1 1 version
79 2-3 2 padlen1 (BE)
80 4-5 2 msg3p2len (BE)
81 6-7 2 reserved (0)
82 8-11 4 timestamp seconds (BE)
83 12-15 4 reserved (0)
84 """
85 return struct.pack(
86 "!BBHHHI I",
87 network_id, version, padlen1, msg3p2len, 0, timestamp, 0
88 )
89
90
91def decode_msg1_options(data: bytes) -> dict:
92 """Decode 16 bytes of msg1 options into a dict."""
93 if len(data) != 16:
94 raise ValueError(f"msg1 options must be 16 bytes, got {len(data)}")
95 network_id, version, padlen1, msg3p2len, _, timestamp, _ = struct.unpack(
96 "!BBHHHI I", data
97 )
98 return {
99 "network_id": network_id,
100 "version": version,
101 "padlen1": padlen1,
102 "msg3p2len": msg3p2len,
103 "timestamp": timestamp,
104 }
105
106
107def encode_msg2_options(padlen2: int, timestamp: int) -> bytes:
108 """Encode msg2 options into 16 bytes.
109
110 Layout:
111 0-1 2 reserved (0)
112 2-3 2 padlen2 (BE)
113 4-7 4 reserved (0)
114 8-11 4 timestamp seconds (BE)
115 12-15 4 reserved (0)
116 """
117 return struct.pack("!HHI I I", 0, padlen2, 0, timestamp, 0)
118
119
120def decode_msg2_options(data: bytes) -> dict:
121 """Decode 16 bytes of msg2 options into a dict."""
122 if len(data) != 16:
123 raise ValueError(f"msg2 options must be 16 bytes, got {len(data)}")
124 _, padlen2, _, timestamp, _ = struct.unpack("!HHI I I", data)
125 return {
126 "padlen2": padlen2,
127 "timestamp": timestamp,
128 }
129
130
131# --- Helper constructors ---
132
133def datetime_block(timestamp_seconds: int) -> NTCP2Block:
134 """Create a DateTime block (type 0) with a 4-byte BE timestamp."""
135 return NTCP2Block(
136 block_type=BLOCK_DATETIME,
137 data=struct.pack("!I", timestamp_seconds),
138 )
139
140
141def i2np_block(message_bytes: bytes) -> NTCP2Block:
142 """Create an I2NP message block (type 3)."""
143 return NTCP2Block(block_type=BLOCK_I2NP, data=message_bytes)
144
145
146def padding_block(size: int) -> NTCP2Block:
147 """Create a padding block (type 254) with `size` random bytes."""
148 return NTCP2Block(block_type=BLOCK_PADDING, data=os.urandom(size))
149
150
151def termination_block(frames_received: int, reason: int) -> NTCP2Block:
152 """Create a termination block (type 4).
153
154 Data: 8 bytes LE frames_received + 1 byte reason code.
155 """
156 data = struct.pack("<Q", frames_received) + struct.pack("B", reason)
157 return NTCP2Block(block_type=BLOCK_TERMINATION, data=data)
158
159
160def options_block(data: bytes = b"") -> NTCP2Block:
161 """Create an Options block (type 1).
162
163 For msg3, the Options block carries 12 bytes of padding/delay
164 negotiation parameters. If no data is provided, 12 zero bytes
165 are used (accepting defaults).
166 """
167 if not data:
168 data = b"\x00" * 12 # 12 bytes: min/max padding, dummy, delay defaults
169 return NTCP2Block(block_type=BLOCK_OPTIONS, data=data)
170
171
172def router_info_block(ri_bytes: bytes, flag: int = 0) -> NTCP2Block:
173 """Create a RouterInfo block (type 2).
174
175 Data: 1 flag byte + RouterInfo bytes.
176 """
177 return NTCP2Block(
178 block_type=BLOCK_ROUTERINFO,
179 data=struct.pack("B", flag) + ri_bytes,
180 )