A Python port of the Invisible Internet Project (I2P)
at main 287 lines 9.4 kB view raw
1"""Tests for NTCP2 asyncio connection layer.""" 2 3import asyncio 4import os 5import struct 6 7import pytest 8 9from i2p_crypto.noise import CipherState, SymmetricState 10from i2p_transport.ntcp2 import NTCP2Frame, FrameType 11 12 13# --------------------------------------------------------------------------- 14# Mock streams 15# --------------------------------------------------------------------------- 16 17class MockStreamReader: 18 def __init__(self): 19 self._buffer = bytearray() 20 21 def feed(self, data: bytes): 22 self._buffer.extend(data) 23 24 async def readexactly(self, n: int) -> bytes: 25 while len(self._buffer) < n: 26 await asyncio.sleep(0.001) 27 result = bytes(self._buffer[:n]) 28 del self._buffer[:n] 29 return result 30 31 32class MockStreamWriter: 33 def __init__(self): 34 self.data = bytearray() 35 self._closing = False 36 37 def write(self, data: bytes): 38 self.data.extend(data) 39 40 async def drain(self): 41 pass 42 43 def close(self): 44 self._closing = True 45 46 async def wait_closed(self): 47 pass 48 49 def is_closing(self) -> bool: 50 return self._closing 51 52 53# --------------------------------------------------------------------------- 54# Helpers 55# --------------------------------------------------------------------------- 56 57def _make_cipher_pair() -> tuple[CipherState, CipherState]: 58 """Create a matched send/recv cipher pair from a shared key.""" 59 key = os.urandom(32) 60 return CipherState(key), CipherState(key) 61 62 63def _make_connection(reader=None, writer=None, cipher_send=None, cipher_recv=None): 64 from i2p_transport.ntcp2_connection import NTCP2Connection 65 if reader is None: 66 reader = MockStreamReader() 67 if writer is None: 68 writer = MockStreamWriter() 69 if cipher_send is None and cipher_recv is None: 70 cipher_send, cipher_recv = _make_cipher_pair() 71 if cipher_send is None: 72 cipher_send = CipherState(os.urandom(32)) 73 if cipher_recv is None: 74 cipher_recv = CipherState(os.urandom(32)) 75 return NTCP2Connection(reader, writer, cipher_send, cipher_recv) 76 77 78# --------------------------------------------------------------------------- 79# Tests 80# --------------------------------------------------------------------------- 81 82class TestSendFrame: 83 def test_send_frame_encrypts_and_writes_length_prefixed_data(self): 84 asyncio.run(self._test()) 85 86 async def _test(self): 87 send_cipher, recv_cipher = _make_cipher_pair() 88 writer = MockStreamWriter() 89 conn = _make_connection(writer=writer, cipher_send=send_cipher, cipher_recv=recv_cipher) 90 91 frame = NTCP2Frame(FrameType.DATA, b"hello world") 92 await conn.send_frame(frame) 93 94 # Output should be 4-byte length prefix + encrypted data 95 raw = bytes(writer.data) 96 assert len(raw) > 4 97 length = struct.unpack("!I", raw[:4])[0] 98 encrypted_data = raw[4:] 99 assert len(encrypted_data) == length 100 101 # Decrypt with a fresh cipher (same key, nonce=0) 102 decrypt_cipher = CipherState(recv_cipher._key) 103 decrypt_cipher.set_nonce(0) 104 plaintext = decrypt_cipher.decrypt_with_ad(b"", encrypted_data) 105 106 # Should match the frame bytes 107 assert plaintext == frame.to_bytes() 108 109 110class TestRecvFrame: 111 def test_recv_frame_reads_and_decrypts(self): 112 asyncio.run(self._test()) 113 114 async def _test(self): 115 key = os.urandom(32) 116 reader = MockStreamReader() 117 118 # Encrypt a frame manually with a cipher at nonce 0 119 frame = NTCP2Frame(FrameType.DATA, b"test payload") 120 frame_bytes = frame.to_bytes() 121 encrypt_cipher = CipherState(key) 122 encrypted = encrypt_cipher.encrypt_with_ad(b"", frame_bytes) 123 124 # Feed length-prefixed encrypted data into reader 125 reader.feed(struct.pack("!I", len(encrypted)) + encrypted) 126 127 # The connection's recv cipher uses the same key, starts at nonce 0 128 recv_cipher = CipherState(key) 129 conn = _make_connection(reader=reader, cipher_recv=recv_cipher) 130 result = await conn.recv_frame() 131 132 assert result.frame_type == FrameType.DATA 133 assert result.payload == b"test payload" 134 135 136class TestRoundtrip: 137 def test_send_recv_roundtrip(self): 138 asyncio.run(self._test()) 139 140 async def _test(self): 141 """Two connections wired back-to-back: A sends, B receives.""" 142 # Simulate a completed handshake — split gives (c1, c2) 143 # Initiator sends with c1, responder decrypts with c1 144 # Responder sends with c2, initiator decrypts with c2 145 key_material = os.urandom(32) 146 ss = SymmetricState(b"Noise_IK_25519_ChaChaPoly_SHA256") 147 ss.mix_key(key_material) 148 c1, c2 = ss.split() 149 150 # Make another SymmetricState with same operations to get identical keys 151 ss2 = SymmetricState(b"Noise_IK_25519_ChaChaPoly_SHA256") 152 ss2.mix_key(key_material) 153 c1_copy, c2_copy = ss2.split() 154 155 # A's writer -> B's reader 156 a_writer = MockStreamWriter() 157 b_reader = MockStreamReader() 158 159 # A sends with c1, B receives with c1_copy 160 from i2p_transport.ntcp2_connection import NTCP2Connection 161 conn_a = NTCP2Connection(MockStreamReader(), a_writer, c1, c2) 162 conn_b = NTCP2Connection(b_reader, MockStreamWriter(), c2_copy, c1_copy) 163 164 # A sends a frame 165 original = NTCP2Frame(FrameType.ROUTER_INFO, os.urandom(128)) 166 await conn_a.send_frame(original) 167 168 # Wire A's output to B's input 169 b_reader.feed(bytes(a_writer.data)) 170 171 # B receives 172 received = await conn_b.recv_frame() 173 assert received.frame_type == original.frame_type 174 assert received.payload == original.payload 175 176 177class TestSendI2NP: 178 def test_send_i2np_wraps_in_i2np_frame(self): 179 asyncio.run(self._test()) 180 181 async def _test(self): 182 send_cipher, recv_cipher = _make_cipher_pair() 183 writer = MockStreamWriter() 184 conn = _make_connection(writer=writer, cipher_send=send_cipher, cipher_recv=recv_cipher) 185 186 msg_type = 11 # DatabaseStore 187 payload = b"some i2np payload data" 188 await conn.send_i2np(msg_type, payload) 189 190 # Decrypt the written data 191 raw = bytes(writer.data) 192 length = struct.unpack("!I", raw[:4])[0] 193 encrypted = raw[4:] 194 195 decrypt_cipher = CipherState(recv_cipher._key) 196 decrypt_cipher.set_nonce(0) 197 plaintext = decrypt_cipher.decrypt_with_ad(b"", encrypted) 198 199 # Parse the frame 200 frame = NTCP2Frame.from_bytes(plaintext) 201 assert frame.frame_type == FrameType.I2NP 202 203 # The I2NP inner payload: type(1) + length(2) + payload 204 inner_type, inner_len = struct.unpack("!BH", frame.payload[:3]) 205 inner_payload = frame.payload[3:] 206 assert inner_type == msg_type 207 assert inner_len == len(payload) 208 assert inner_payload == payload 209 210 211class TestClose: 212 def test_close_sends_termination_frame(self): 213 asyncio.run(self._test()) 214 215 async def _test(self): 216 send_cipher, recv_cipher = _make_cipher_pair() 217 writer = MockStreamWriter() 218 conn = _make_connection(writer=writer, cipher_send=send_cipher, cipher_recv=recv_cipher) 219 220 await conn.close() 221 222 # Writer should be closed 223 assert writer.is_closing() 224 225 # Should have written a termination frame 226 raw = bytes(writer.data) 227 assert len(raw) > 4 228 229 length = struct.unpack("!I", raw[:4])[0] 230 encrypted = raw[4:] 231 232 decrypt_cipher = CipherState(recv_cipher._key) 233 decrypt_cipher.set_nonce(0) 234 plaintext = decrypt_cipher.decrypt_with_ad(b"", encrypted) 235 236 frame = NTCP2Frame.from_bytes(plaintext) 237 assert frame.frame_type == FrameType.TERMINATION 238 239 240class TestIsAlive: 241 def test_is_alive_true_when_open(self): 242 conn = _make_connection() 243 assert conn.is_alive() is True 244 245 def test_is_alive_false_after_close(self): 246 asyncio.run(self._test()) 247 248 async def _test(self): 249 conn = _make_connection() 250 await conn.close() 251 assert conn.is_alive() is False 252 253 254class TestMultipleFrames: 255 def test_multiple_frames_nonce_advances(self): 256 asyncio.run(self._test()) 257 258 async def _test(self): 259 """Send multiple frames; nonces must advance so decryption works.""" 260 key = os.urandom(32) 261 send_cipher = CipherState(key) 262 recv_cipher = CipherState(key) 263 264 a_writer = MockStreamWriter() 265 b_reader = MockStreamReader() 266 267 from i2p_transport.ntcp2_connection import NTCP2Connection 268 conn_a = NTCP2Connection(MockStreamReader(), a_writer, send_cipher, CipherState(os.urandom(32))) 269 conn_b = NTCP2Connection(b_reader, MockStreamWriter(), CipherState(os.urandom(32)), recv_cipher) 270 271 frames = [ 272 NTCP2Frame(FrameType.DATA, b"frame-0"), 273 NTCP2Frame(FrameType.DATA, b"frame-1"), 274 NTCP2Frame(FrameType.DATA, b"frame-2"), 275 ] 276 277 for f in frames: 278 await conn_a.send_frame(f) 279 280 # Wire all output 281 b_reader.feed(bytes(a_writer.data)) 282 283 # Receive all 284 for i, expected in enumerate(frames): 285 received = await conn_b.recv_frame() 286 assert received.frame_type == expected.frame_type 287 assert received.payload == expected.payload, f"Frame {i} payload mismatch"