# A Python port of the Invisible Internet Project (I2P).
# Source listing: branch main, 360 lines, 12 kB (raw view).
"""Tests for NTCP2 real data-phase transport with SipHash-obfuscated frame lengths."""

import asyncio
import struct
import time

import pytest

from i2p_crypto.noise import CipherState
from i2p_crypto.siphash import SipHashRatchet
from i2p_transport.ntcp2_blocks import (
    BLOCK_DATETIME,
    BLOCK_I2NP,
    BLOCK_PADDING,
    BLOCK_TERMINATION,
    NTCP2Block,
    decode_blocks,
    encode_blocks,
)
from i2p_transport.ntcp2_real_connection import NTCP2RealConnection


# ---------------------------------------------------------------------------
# Mock reader / writer
# ---------------------------------------------------------------------------

class MockWriter:
    """In-memory stand-in for a stream writer.

    Everything passed to ``write`` is appended to ``data`` so a test can
    inspect the exact bytes that were put on the wire.
    """

    def __init__(self) -> None:
        self.data = bytearray()
        self._closed = False

    def write(self, data: bytes) -> None:
        """Append *data* to the captured wire bytes."""
        self.data.extend(data)

    async def drain(self) -> None:
        """No-op: nothing is buffered outside ``data``."""

    def close(self) -> None:
        """Mark the writer as closed."""
        self._closed = True

    async def wait_closed(self) -> None:
        """No-op: ``close`` takes effect immediately."""

    def is_closing(self) -> bool:
        """Return True once ``close`` has been called."""
        return self._closed


class MockReader:
    """In-memory stand-in for a stream reader backed by a byte buffer."""

    def __init__(self, data: bytes = b"") -> None:
        self._data = bytearray(data)
        self._pos = 0

    def feed(self, data: bytes) -> None:
        """Append *data* to the bytes still available for reading."""
        self._data.extend(data)

    async def readexactly(self, n: int) -> bytes:
        """Return the next *n* bytes and advance the read position.

        Raises:
            asyncio.IncompleteReadError: fewer than *n* bytes remain. The
                unread remainder is attached as ``partial`` and the read
                position is left unchanged.
        """
        end = self._pos + n
        if end > len(self._data):
            raise asyncio.IncompleteReadError(
                bytes(self._data[self._pos:]), n
            )
        chunk = bytes(self._data[self._pos:end])
        self._pos = end
        return chunk


# ---------------------------------------------------------------------------
# Helpers to create matched cipher + siphash pairs
# ---------------------------------------------------------------------------

def _make_cipher_pair(key: bytes = b"\x01" * 32):
    """Return (cipher_send, cipher_recv) sharing the same key.

    *key* defaults to the fixed 32-byte key this helper has always used, so
    existing zero-argument calls are unaffected.
    """
    return CipherState(key), CipherState(key)


def _make_siphash_pair():
    """Return (siphash_send, siphash_recv) with identical state."""
    k0, k1, iv = 0x0706050403020100, 0x0F0E0D0C0B0A0908, 0
    return SipHashRatchet(k0, k1, iv), SipHashRatchet(k0, k1, iv)


def _make_sender(key: bytes):
    """Return (connection, writer) for a send-only connection keyed by *key*.

    Only the send-side cipher and SipHash ratchet are supplied; the
    receive-side slots are ``None``. The writer is returned so the test can
    read back the captured wire bytes.
    """
    writer = MockWriter()
    cipher_send, _ = _make_cipher_pair(key)
    siphash_send, _ = _make_siphash_pair()
    conn = NTCP2RealConnection(
        MockReader(), writer, cipher_send, None, siphash_send, None
    )
    return conn, writer


def _make_receiver(key: bytes, wire: bytes):
    """Return a receive-only connection keyed by *key* that reads from *wire*.

    Only the receive-side cipher and SipHash ratchet are supplied; the
    send-side slots are ``None``.
    """
    _, cipher_recv = _make_cipher_pair(key)
    _, siphash_recv = _make_siphash_pair()
    return NTCP2RealConnection(
        MockReader(wire), MockWriter(), None, cipher_recv, None, siphash_recv
    )


# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------

class TestConstruction:
    def test_construction_stores_attributes(self):
        reader, writer = MockReader(), MockWriter()
        cs, cr = _make_cipher_pair()
        ss, sr = _make_siphash_pair()
        conn = NTCP2RealConnection(reader, writer, cs, cr, ss, sr, b"\xaa" * 32)
        assert conn.remote_hash == b"\xaa" * 32

    def test_construction_default_remote_hash(self):
        reader, writer = MockReader(), MockWriter()
        cs, cr = _make_cipher_pair()
        ss, sr = _make_siphash_pair()
        conn = NTCP2RealConnection(reader, writer, cs, cr, ss, sr)
        assert conn.remote_hash == b""


class TestSendRecvRoundtrip:
    def test_single_frame_roundtrip(self):
        """send_frame -> wire bytes -> recv_frame produces same blocks."""
        async def _run():
            key = b"\x11" * 32
            conn_send, writer = _make_sender(key)

            blocks = [NTCP2Block(BLOCK_I2NP, b"hello i2np")]
            await conn_send.send_frame(blocks)

            wire = bytes(writer.data)
            assert len(wire) > 2

            conn_recv = _make_receiver(key, wire)
            got = await conn_recv.recv_frame()

            assert len(got) == 1
            assert got[0].block_type == BLOCK_I2NP
            assert got[0].data == b"hello i2np"

        asyncio.run(_run())

    def test_frame_format_structure(self):
        """Wire format: [2 bytes obfuscated length][encrypted payload + 16 MAC]."""
        async def _run():
            conn, writer = _make_sender(b"\x22" * 32)

            blocks = [NTCP2Block(BLOCK_I2NP, b"x" * 10)]
            await conn.send_frame(blocks)

            wire = bytes(writer.data)
            encrypted_payload = wire[2:]

            plaintext = encode_blocks(blocks)
            assert len(encrypted_payload) == len(plaintext) + 16

        asyncio.run(_run())


class TestSipHashObfuscation:
    def test_obfuscated_length_differs_from_plain(self):
        """The 2-byte length on the wire should differ from the actual length."""
        async def _run():
            conn, writer = _make_sender(b"\x33" * 32)

            blocks = [NTCP2Block(BLOCK_I2NP, b"test")]
            await conn.send_frame(blocks)

            wire = bytes(writer.data)
            obf_len = struct.unpack("!H", wire[:2])[0]
            actual_len = len(wire) - 2
            assert obf_len != actual_len

        asyncio.run(_run())


class TestMultiFrameExchange:
    def test_nonce_increments_and_siphash_ratchets(self):
        """Multiple frames: each uses next nonce and next SipHash value."""
        async def _run():
            key = b"\x44" * 32
            conn_send, writer = _make_sender(key)

            await conn_send.send_frame([NTCP2Block(BLOCK_I2NP, b"frame1")])
            wire1_len = len(writer.data)
            await conn_send.send_frame([NTCP2Block(BLOCK_I2NP, b"frame2")])
            wire_total = bytes(writer.data)

            wire1 = wire_total[:wire1_len]
            wire2 = wire_total[wire1_len:]

            # The two length prefixes must not be byte-identical on the wire.
            assert wire1[:2] != wire2[:2]

            conn_recv = _make_receiver(key, wire_total)
            got1 = await conn_recv.recv_frame()
            got2 = await conn_recv.recv_frame()

            assert got1[0].data == b"frame1"
            assert got2[0].data == b"frame2"

        asyncio.run(_run())


class TestSendI2NP:
    def test_send_i2np_includes_datetime_and_padding(self):
        """send_i2np wraps message in I2NP block with DateTime and Padding."""
        async def _run():
            key = b"\x55" * 32
            conn_send, writer = _make_sender(key)
            await conn_send.send_i2np(b"i2np-payload-data")

            conn_recv = _make_receiver(key, bytes(writer.data))
            blocks = await conn_recv.recv_frame()

            types = [b.block_type for b in blocks]
            assert BLOCK_DATETIME in types
            assert BLOCK_I2NP in types
            assert BLOCK_PADDING in types

            i2np_blocks = [b for b in blocks if b.block_type == BLOCK_I2NP]
            assert len(i2np_blocks) == 1
            assert i2np_blocks[0].data == b"i2np-payload-data"

            dt_blocks = [b for b in blocks if b.block_type == BLOCK_DATETIME]
            assert len(dt_blocks) == 1
            assert len(dt_blocks[0].data) == 4

        asyncio.run(_run())


class TestBlockBasedPayloads:
    def test_multi_block_frame(self):
        """A frame with DateTime + I2NP + Padding blocks roundtrips."""
        async def _run():
            key = b"\x66" * 32

            blocks = [
                NTCP2Block(BLOCK_DATETIME, struct.pack("!I", 1000000)),
                NTCP2Block(BLOCK_I2NP, b"msg-content"),
                NTCP2Block(BLOCK_PADDING, b"\x00" * 16),
            ]

            conn_send, writer = _make_sender(key)
            await conn_send.send_frame(blocks)

            conn_recv = _make_receiver(key, bytes(writer.data))
            got = await conn_recv.recv_frame()

            assert len(got) == 3
            assert got[0].block_type == BLOCK_DATETIME
            assert got[0].data == struct.pack("!I", 1000000)
            assert got[1].block_type == BLOCK_I2NP
            assert got[1].data == b"msg-content"
            assert got[2].block_type == BLOCK_PADDING
            assert got[2].data == b"\x00" * 16

        asyncio.run(_run())


class TestClose:
    def test_close_full_roundtrip(self):
        """Full roundtrip: send frames, close, verify termination on recv side."""
        async def _run():
            key = b"\x88" * 32
            conn_send, writer = _make_sender(key)
            await conn_send.send_frame([NTCP2Block(BLOCK_I2NP, b"msg1")])
            await conn_send.close()

            conn_recv = _make_receiver(key, bytes(writer.data))

            blocks1 = await conn_recv.recv_frame()
            assert blocks1[0].block_type == BLOCK_I2NP
            assert blocks1[0].data == b"msg1"

            blocks2 = await conn_recv.recv_frame()
            term_blocks = [b for b in blocks2 if b.block_type == BLOCK_TERMINATION]
            assert len(term_blocks) == 1

        asyncio.run(_run())

    def test_close_closes_writer(self):
        """close() should close the underlying writer."""
        async def _run():
            conn, writer = _make_sender(b"\x99" * 32)
            await conn.close()
            # Go through the writer's public accessor rather than its
            # private flag.
            assert writer.is_closing()

        asyncio.run(_run())


class TestIsAlive:
    def test_is_alive_initially_true(self):
        conn, _ = _make_sender(b"\xaa" * 32)
        assert conn.is_alive()

    def test_is_alive_false_after_close(self):
        async def _run():
            conn, _ = _make_sender(b"\xbb" * 32)
            await conn.close()
            assert not conn.is_alive()

        asyncio.run(_run())