A Python port of the Invisible Internet Project (I2P)
at main 300 lines 10 kB view raw
1"""Tests for NTCP2 payload block codec. 2 3Covers encode/decode roundtrips, helper constructors, options encoding, 4edge cases, and block ordering validation. 5""" 6 7import struct 8import pytest 9 10from i2p_transport.ntcp2_blocks import ( 11 BLOCK_DATETIME, 12 BLOCK_OPTIONS, 13 BLOCK_ROUTERINFO, 14 BLOCK_I2NP, 15 BLOCK_TERMINATION, 16 BLOCK_PADDING, 17 NTCP2Block, 18 encode_blocks, 19 decode_blocks, 20 encode_msg1_options, 21 decode_msg1_options, 22 encode_msg2_options, 23 decode_msg2_options, 24 datetime_block, 25 i2np_block, 26 padding_block, 27 termination_block, 28 router_info_block, 29) 30 31 32# --- Block type constants --- 33 34class TestConstants: 35 def test_block_type_values(self): 36 assert BLOCK_DATETIME == 0 37 assert BLOCK_OPTIONS == 1 38 assert BLOCK_ROUTERINFO == 2 39 assert BLOCK_I2NP == 3 40 assert BLOCK_TERMINATION == 4 41 assert BLOCK_PADDING == 254 42 43 44# --- NTCP2Block dataclass --- 45 46class TestNTCP2Block: 47 def test_create_block(self): 48 block = NTCP2Block(block_type=3, data=b"\x01\x02\x03") 49 assert block.block_type == 3 50 assert block.data == b"\x01\x02\x03" 51 52 def test_create_empty_data_block(self): 53 block = NTCP2Block(block_type=0, data=b"") 54 assert block.block_type == 0 55 assert block.data == b"" 56 57 58# --- encode_blocks / decode_blocks roundtrip --- 59 60class TestEncodeDecodeBlocks: 61 def test_single_block_roundtrip(self): 62 original = [NTCP2Block(block_type=BLOCK_I2NP, data=b"\xaa\xbb\xcc")] 63 encoded = encode_blocks(original) 64 # 1 byte type + 2 bytes length + 3 bytes data = 6 bytes 65 assert len(encoded) == 6 66 decoded = decode_blocks(encoded) 67 assert len(decoded) == 1 68 assert decoded[0].block_type == BLOCK_I2NP 69 assert decoded[0].data == b"\xaa\xbb\xcc" 70 71 def test_multiple_blocks_roundtrip(self): 72 blocks = [ 73 NTCP2Block(block_type=BLOCK_DATETIME, data=b"\x00\x00\x00\x01"), 74 NTCP2Block(block_type=BLOCK_I2NP, data=b"\xde\xad"), 75 NTCP2Block(block_type=BLOCK_PADDING, data=b"\x00" * 10), 76 ] 77 encoded = encode_blocks(blocks) 78 decoded = decode_blocks(encoded) 79 assert len(decoded) == 3 80 for orig, dec in zip(blocks, decoded): 81 assert orig.block_type == dec.block_type 82 assert orig.data == dec.data 83 84 def test_empty_data_block_roundtrip(self): 85 blocks = [NTCP2Block(block_type=BLOCK_I2NP, data=b"")] 86 encoded = encode_blocks(blocks) 87 # 1 type + 2 length + 0 data = 3 bytes 88 assert len(encoded) == 3 89 decoded = decode_blocks(encoded) 90 assert len(decoded) == 1 91 assert decoded[0].data == b"" 92 93 def test_encode_wire_format(self): 94 """Verify the exact binary layout: type(1) + length(2 BE) + data.""" 95 block = NTCP2Block(block_type=0x03, data=b"\x01\x02") 96 encoded = encode_blocks([block]) 97 assert encoded[0:1] == b"\x03" # type byte 98 assert encoded[1:3] == b"\x00\x02" # length = 2, big-endian 99 assert encoded[3:5] == b"\x01\x02" # data 100 101 def test_empty_input_returns_empty_list(self): 102 assert decode_blocks(b"") == [] 103 104 def test_no_blocks_encodes_to_empty(self): 105 assert encode_blocks([]) == b"" 106 107 108# --- Helper constructors --- 109 110class TestDatetimeBlock: 111 def test_datetime_block_format(self): 112 ts = 1700000000 113 block = datetime_block(ts) 114 assert block.block_type == BLOCK_DATETIME 115 assert len(block.data) == 4 116 assert struct.unpack("!I", block.data)[0] == ts 117 118 def test_datetime_block_roundtrip(self): 119 ts = 0 120 block = datetime_block(ts) 121 encoded = encode_blocks([block]) 122 decoded = decode_blocks(encoded) 123 assert struct.unpack("!I", decoded[0].data)[0] == ts 124 125 126class TestI2NPBlock: 127 def test_i2np_block(self): 128 msg = b"\x11\x22\x33\x44\x55" 129 block = i2np_block(msg) 130 assert block.block_type == BLOCK_I2NP 131 assert block.data == msg 132 133 def test_zero_length_i2np_block(self): 134 block = i2np_block(b"") 135 assert block.block_type == BLOCK_I2NP 136 assert block.data == b"" 137 encoded = encode_blocks([block]) 138 decoded = decode_blocks(encoded) 139 assert decoded[0].block_type == BLOCK_I2NP 140 assert decoded[0].data == b"" 141 142 143class TestPaddingBlock: 144 def test_padding_block_size(self): 145 block = padding_block(32) 146 assert block.block_type == BLOCK_PADDING 147 assert len(block.data) == 32 148 149 def test_padding_block_zero_size(self): 150 block = padding_block(0) 151 assert block.block_type == BLOCK_PADDING 152 assert len(block.data) == 0 153 154 def test_padding_block_random_bytes(self): 155 """Two padding blocks of the same size should (almost certainly) differ.""" 156 b1 = padding_block(64) 157 b2 = padding_block(64) 158 # Extremely unlikely to be equal for 64 random bytes 159 assert b1.data != b2.data 160 161 162class TestTerminationBlock: 163 def test_termination_block_format(self): 164 block = termination_block(frames_received=12345, reason=1) 165 assert block.block_type == BLOCK_TERMINATION 166 assert len(block.data) == 9 167 # 8 bytes LE frames_received + 1 byte reason 168 frames = struct.unpack("<Q", block.data[:8])[0] 169 assert frames == 12345 170 assert block.data[8] == 1 171 172 def test_termination_block_large_count(self): 173 block = termination_block(frames_received=2**48, reason=0) 174 frames = struct.unpack("<Q", block.data[:8])[0] 175 assert frames == 2**48 176 177 def test_termination_block_roundtrip(self): 178 block = termination_block(frames_received=999, reason=5) 179 encoded = encode_blocks([block]) 180 decoded = decode_blocks(encoded) 181 assert decoded[0].block_type == BLOCK_TERMINATION 182 frames = struct.unpack("<Q", decoded[0].data[:8])[0] 183 assert frames == 999 184 assert decoded[0].data[8] == 5 185 186 187class TestRouterInfoBlock: 188 def test_router_info_block_default_flag(self): 189 ri_data = b"\xaa\xbb\xcc\xdd" 190 block = router_info_block(ri_data) 191 assert block.block_type == BLOCK_ROUTERINFO 192 assert block.data[0] == 0 # default flag 193 assert block.data[1:] == ri_data 194 195 def test_router_info_block_with_flag(self): 196 ri_data = b"\x01\x02" 197 block = router_info_block(ri_data, flag=1) 198 assert block.data[0] == 1 199 assert block.data[1:] == ri_data 200 201 def test_router_info_block_roundtrip(self): 202 ri_data = b"\xff" * 100 203 block = router_info_block(ri_data, flag=2) 204 encoded = encode_blocks([block]) 205 decoded = decode_blocks(encoded) 206 assert decoded[0].block_type == BLOCK_ROUTERINFO 207 assert decoded[0].data[0] == 2 208 assert decoded[0].data[1:] == ri_data 209 210 211# --- Options encode/decode --- 212 213class TestMsg1Options: 214 def test_encode_msg1_options_size(self): 215 data = encode_msg1_options( 216 network_id=2, version=2, padlen1=64, msg3p2len=512, timestamp=1700000000 217 ) 218 assert len(data) == 16 219 220 def test_msg1_options_roundtrip(self): 221 opts = encode_msg1_options( 222 network_id=2, version=2, padlen1=128, msg3p2len=1024, timestamp=1700000000 223 ) 224 decoded = decode_msg1_options(opts) 225 assert decoded["network_id"] == 2 226 assert decoded["version"] == 2 227 assert decoded["padlen1"] == 128 228 assert decoded["msg3p2len"] == 1024 229 assert decoded["timestamp"] == 1700000000 230 231 def test_msg1_options_reserved_zero(self): 232 data = encode_msg1_options( 233 network_id=2, version=2, padlen1=0, msg3p2len=0, timestamp=0 234 ) 235 # bytes 6-7 reserved 236 assert data[6:8] == b"\x00\x00" 237 # bytes 12-15 reserved 238 assert data[12:16] == b"\x00\x00\x00\x00" 239 240 def test_msg1_options_field_layout(self): 241 data = encode_msg1_options( 242 network_id=0x02, version=0x02, padlen1=0x0100, msg3p2len=0x0200, timestamp=0x65000000 243 ) 244 assert data[0] == 0x02 # network_id 245 assert data[1] == 0x02 # version 246 assert struct.unpack("!H", data[2:4])[0] == 0x0100 # padlen1 247 assert struct.unpack("!H", data[4:6])[0] == 0x0200 # msg3p2len 248 assert struct.unpack("!I", data[8:12])[0] == 0x65000000 # timestamp 249 250 251class TestMsg2Options: 252 def test_encode_msg2_options_size(self): 253 data = encode_msg2_options(padlen2=32, timestamp=1700000000) 254 assert len(data) == 16 255 256 def test_msg2_options_roundtrip(self): 257 data = encode_msg2_options(padlen2=256, timestamp=1700000000) 258 decoded = decode_msg2_options(data) 259 assert decoded["padlen2"] == 256 260 assert decoded["timestamp"] == 1700000000 261 262 def test_msg2_options_reserved_zero(self): 263 data = encode_msg2_options(padlen2=0, timestamp=0) 264 assert data[0:2] == b"\x00\x00" # reserved 265 assert data[4:8] == b"\x00\x00\x00\x00" # reserved 266 assert data[12:16] == b"\x00\x00\x00\x00" # reserved 267 268 def test_msg2_options_field_layout(self): 269 data = encode_msg2_options(padlen2=0x0300, timestamp=0xAABBCCDD) 270 assert struct.unpack("!H", data[2:4])[0] == 0x0300 271 assert struct.unpack("!I", data[8:12])[0] == 0xAABBCCDD 272 273 274# --- Block ordering --- 275 276class TestBlockOrdering: 277 def test_padding_last_in_payload(self): 278 """Padding must be the last block in a well-formed payload.""" 279 blocks = [ 280 datetime_block(1700000000), 281 i2np_block(b"\x01\x02\x03"), 282 padding_block(16), 283 ] 284 encoded = encode_blocks(blocks) 285 decoded = decode_blocks(encoded) 286 assert decoded[-1].block_type == BLOCK_PADDING 287 288 def test_multiple_i2np_blocks(self): 289 blocks = [ 290 i2np_block(b"\x01"), 291 i2np_block(b"\x02"), 292 i2np_block(b"\x03"), 293 ] 294 encoded = encode_blocks(blocks) 295 decoded = decode_blocks(encoded) 296 assert len(decoded) == 3 297 assert all(b.block_type == BLOCK_I2NP for b in decoded) 298 assert decoded[0].data == b"\x01" 299 assert decoded[1].data == b"\x02" 300 assert decoded[2].data == b"\x03"