"""Tests for NTCP2 asyncio connection layer.""" import asyncio import os import struct import pytest from i2p_crypto.noise import CipherState, SymmetricState from i2p_transport.ntcp2 import NTCP2Frame, FrameType # --------------------------------------------------------------------------- # Mock streams # --------------------------------------------------------------------------- class MockStreamReader: def __init__(self): self._buffer = bytearray() def feed(self, data: bytes): self._buffer.extend(data) async def readexactly(self, n: int) -> bytes: while len(self._buffer) < n: await asyncio.sleep(0.001) result = bytes(self._buffer[:n]) del self._buffer[:n] return result class MockStreamWriter: def __init__(self): self.data = bytearray() self._closing = False def write(self, data: bytes): self.data.extend(data) async def drain(self): pass def close(self): self._closing = True async def wait_closed(self): pass def is_closing(self) -> bool: return self._closing # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_cipher_pair() -> tuple[CipherState, CipherState]: """Create a matched send/recv cipher pair from a shared key.""" key = os.urandom(32) return CipherState(key), CipherState(key) def _make_connection(reader=None, writer=None, cipher_send=None, cipher_recv=None): from i2p_transport.ntcp2_connection import NTCP2Connection if reader is None: reader = MockStreamReader() if writer is None: writer = MockStreamWriter() if cipher_send is None and cipher_recv is None: cipher_send, cipher_recv = _make_cipher_pair() if cipher_send is None: cipher_send = CipherState(os.urandom(32)) if cipher_recv is None: cipher_recv = CipherState(os.urandom(32)) return NTCP2Connection(reader, writer, cipher_send, cipher_recv) # --------------------------------------------------------------------------- # Tests # --------------------------------------------------------------------------- class TestSendFrame: def test_send_frame_encrypts_and_writes_length_prefixed_data(self): asyncio.run(self._test()) async def _test(self): send_cipher, recv_cipher = _make_cipher_pair() writer = MockStreamWriter() conn = _make_connection(writer=writer, cipher_send=send_cipher, cipher_recv=recv_cipher) frame = NTCP2Frame(FrameType.DATA, b"hello world") await conn.send_frame(frame) # Output should be 4-byte length prefix + encrypted data raw = bytes(writer.data) assert len(raw) > 4 length = struct.unpack("!I", raw[:4])[0] encrypted_data = raw[4:] assert len(encrypted_data) == length # Decrypt with a fresh cipher (same key, nonce=0) decrypt_cipher = CipherState(recv_cipher._key) decrypt_cipher.set_nonce(0) plaintext = decrypt_cipher.decrypt_with_ad(b"", encrypted_data) # Should match the frame bytes assert plaintext == frame.to_bytes() class TestRecvFrame: def test_recv_frame_reads_and_decrypts(self): asyncio.run(self._test()) async def _test(self): key = os.urandom(32) reader = MockStreamReader() # Encrypt a frame manually with a cipher at nonce 0 frame = NTCP2Frame(FrameType.DATA, b"test payload") frame_bytes = frame.to_bytes() encrypt_cipher = CipherState(key) encrypted = encrypt_cipher.encrypt_with_ad(b"", frame_bytes) # Feed length-prefixed encrypted data into reader reader.feed(struct.pack("!I", len(encrypted)) + encrypted) # The connection's recv cipher uses the same key, starts at nonce 0 recv_cipher = CipherState(key) conn = _make_connection(reader=reader, cipher_recv=recv_cipher) result = await conn.recv_frame() assert result.frame_type == FrameType.DATA assert result.payload == b"test payload" class TestRoundtrip: def test_send_recv_roundtrip(self): asyncio.run(self._test()) async def _test(self): """Two connections wired back-to-back: A sends, B receives.""" # Simulate a completed handshake — split gives (c1, c2) # Initiator sends with c1, responder decrypts with c1 # Responder sends with c2, initiator decrypts with c2 key_material = os.urandom(32) ss = SymmetricState(b"Noise_IK_25519_ChaChaPoly_SHA256") ss.mix_key(key_material) c1, c2 = ss.split() # Make another SymmetricState with same operations to get identical keys ss2 = SymmetricState(b"Noise_IK_25519_ChaChaPoly_SHA256") ss2.mix_key(key_material) c1_copy, c2_copy = ss2.split() # A's writer -> B's reader a_writer = MockStreamWriter() b_reader = MockStreamReader() # A sends with c1, B receives with c1_copy from i2p_transport.ntcp2_connection import NTCP2Connection conn_a = NTCP2Connection(MockStreamReader(), a_writer, c1, c2) conn_b = NTCP2Connection(b_reader, MockStreamWriter(), c2_copy, c1_copy) # A sends a frame original = NTCP2Frame(FrameType.ROUTER_INFO, os.urandom(128)) await conn_a.send_frame(original) # Wire A's output to B's input b_reader.feed(bytes(a_writer.data)) # B receives received = await conn_b.recv_frame() assert received.frame_type == original.frame_type assert received.payload == original.payload class TestSendI2NP: def test_send_i2np_wraps_in_i2np_frame(self): asyncio.run(self._test()) async def _test(self): send_cipher, recv_cipher = _make_cipher_pair() writer = MockStreamWriter() conn = _make_connection(writer=writer, cipher_send=send_cipher, cipher_recv=recv_cipher) msg_type = 11 # DatabaseStore payload = b"some i2np payload data" await conn.send_i2np(msg_type, payload) # Decrypt the written data raw = bytes(writer.data) length = struct.unpack("!I", raw[:4])[0] encrypted = raw[4:] decrypt_cipher = CipherState(recv_cipher._key) decrypt_cipher.set_nonce(0) plaintext = decrypt_cipher.decrypt_with_ad(b"", encrypted) # Parse the frame frame = NTCP2Frame.from_bytes(plaintext) assert frame.frame_type == FrameType.I2NP # The I2NP inner payload: type(1) + length(2) + payload inner_type, inner_len = struct.unpack("!BH", frame.payload[:3]) inner_payload = frame.payload[3:] assert inner_type == msg_type assert inner_len == len(payload) assert inner_payload == payload class TestClose: def test_close_sends_termination_frame(self): asyncio.run(self._test()) async def _test(self): send_cipher, recv_cipher = _make_cipher_pair() writer = MockStreamWriter() conn = _make_connection(writer=writer, cipher_send=send_cipher, cipher_recv=recv_cipher) await conn.close() # Writer should be closed assert writer.is_closing() # Should have written a termination frame raw = bytes(writer.data) assert len(raw) > 4 length = struct.unpack("!I", raw[:4])[0] encrypted = raw[4:] decrypt_cipher = CipherState(recv_cipher._key) decrypt_cipher.set_nonce(0) plaintext = decrypt_cipher.decrypt_with_ad(b"", encrypted) frame = NTCP2Frame.from_bytes(plaintext) assert frame.frame_type == FrameType.TERMINATION class TestIsAlive: def test_is_alive_true_when_open(self): conn = _make_connection() assert conn.is_alive() is True def test_is_alive_false_after_close(self): asyncio.run(self._test()) async def _test(self): conn = _make_connection() await conn.close() assert conn.is_alive() is False class TestMultipleFrames: def test_multiple_frames_nonce_advances(self): asyncio.run(self._test()) async def _test(self): """Send multiple frames; nonces must advance so decryption works.""" key = os.urandom(32) send_cipher = CipherState(key) recv_cipher = CipherState(key) a_writer = MockStreamWriter() b_reader = MockStreamReader() from i2p_transport.ntcp2_connection import NTCP2Connection conn_a = NTCP2Connection(MockStreamReader(), a_writer, send_cipher, CipherState(os.urandom(32))) conn_b = NTCP2Connection(b_reader, MockStreamWriter(), CipherState(os.urandom(32)), recv_cipher) frames = [ NTCP2Frame(FrameType.DATA, b"frame-0"), NTCP2Frame(FrameType.DATA, b"frame-1"), NTCP2Frame(FrameType.DATA, b"frame-2"), ] for f in frames: await conn_a.send_frame(f) # Wire all output b_reader.feed(bytes(a_writer.data)) # Receive all for i, expected in enumerate(frames): received = await conn_b.recv_frame() assert received.frame_type == expected.frame_type assert received.payload == expected.payload, f"Frame {i} payload mismatch"