A Python port of the Invisible Internet Project (I2P)
1"""Tests for NTCP2 asyncio connection layer."""
2
3import asyncio
4import os
5import struct
6
7import pytest
8
9from i2p_crypto.noise import CipherState, SymmetricState
10from i2p_transport.ntcp2 import NTCP2Frame, FrameType
11
12
13# ---------------------------------------------------------------------------
14# Mock streams
15# ---------------------------------------------------------------------------
16
17class MockStreamReader:
18 def __init__(self):
19 self._buffer = bytearray()
20
21 def feed(self, data: bytes):
22 self._buffer.extend(data)
23
24 async def readexactly(self, n: int) -> bytes:
25 while len(self._buffer) < n:
26 await asyncio.sleep(0.001)
27 result = bytes(self._buffer[:n])
28 del self._buffer[:n]
29 return result
30
31
32class MockStreamWriter:
33 def __init__(self):
34 self.data = bytearray()
35 self._closing = False
36
37 def write(self, data: bytes):
38 self.data.extend(data)
39
40 async def drain(self):
41 pass
42
43 def close(self):
44 self._closing = True
45
46 async def wait_closed(self):
47 pass
48
49 def is_closing(self) -> bool:
50 return self._closing
51
52
53# ---------------------------------------------------------------------------
54# Helpers
55# ---------------------------------------------------------------------------
56
57def _make_cipher_pair() -> tuple[CipherState, CipherState]:
58 """Create a matched send/recv cipher pair from a shared key."""
59 key = os.urandom(32)
60 return CipherState(key), CipherState(key)
61
62
63def _make_connection(reader=None, writer=None, cipher_send=None, cipher_recv=None):
64 from i2p_transport.ntcp2_connection import NTCP2Connection
65 if reader is None:
66 reader = MockStreamReader()
67 if writer is None:
68 writer = MockStreamWriter()
69 if cipher_send is None and cipher_recv is None:
70 cipher_send, cipher_recv = _make_cipher_pair()
71 if cipher_send is None:
72 cipher_send = CipherState(os.urandom(32))
73 if cipher_recv is None:
74 cipher_recv = CipherState(os.urandom(32))
75 return NTCP2Connection(reader, writer, cipher_send, cipher_recv)
76
77
78# ---------------------------------------------------------------------------
79# Tests
80# ---------------------------------------------------------------------------
81
82class TestSendFrame:
83 def test_send_frame_encrypts_and_writes_length_prefixed_data(self):
84 asyncio.run(self._test())
85
86 async def _test(self):
87 send_cipher, recv_cipher = _make_cipher_pair()
88 writer = MockStreamWriter()
89 conn = _make_connection(writer=writer, cipher_send=send_cipher, cipher_recv=recv_cipher)
90
91 frame = NTCP2Frame(FrameType.DATA, b"hello world")
92 await conn.send_frame(frame)
93
94 # Output should be 4-byte length prefix + encrypted data
95 raw = bytes(writer.data)
96 assert len(raw) > 4
97 length = struct.unpack("!I", raw[:4])[0]
98 encrypted_data = raw[4:]
99 assert len(encrypted_data) == length
100
101 # Decrypt with a fresh cipher (same key, nonce=0)
102 decrypt_cipher = CipherState(recv_cipher._key)
103 decrypt_cipher.set_nonce(0)
104 plaintext = decrypt_cipher.decrypt_with_ad(b"", encrypted_data)
105
106 # Should match the frame bytes
107 assert plaintext == frame.to_bytes()
108
109
110class TestRecvFrame:
111 def test_recv_frame_reads_and_decrypts(self):
112 asyncio.run(self._test())
113
114 async def _test(self):
115 key = os.urandom(32)
116 reader = MockStreamReader()
117
118 # Encrypt a frame manually with a cipher at nonce 0
119 frame = NTCP2Frame(FrameType.DATA, b"test payload")
120 frame_bytes = frame.to_bytes()
121 encrypt_cipher = CipherState(key)
122 encrypted = encrypt_cipher.encrypt_with_ad(b"", frame_bytes)
123
124 # Feed length-prefixed encrypted data into reader
125 reader.feed(struct.pack("!I", len(encrypted)) + encrypted)
126
127 # The connection's recv cipher uses the same key, starts at nonce 0
128 recv_cipher = CipherState(key)
129 conn = _make_connection(reader=reader, cipher_recv=recv_cipher)
130 result = await conn.recv_frame()
131
132 assert result.frame_type == FrameType.DATA
133 assert result.payload == b"test payload"
134
135
136class TestRoundtrip:
137 def test_send_recv_roundtrip(self):
138 asyncio.run(self._test())
139
140 async def _test(self):
141 """Two connections wired back-to-back: A sends, B receives."""
142 # Simulate a completed handshake — split gives (c1, c2)
143 # Initiator sends with c1, responder decrypts with c1
144 # Responder sends with c2, initiator decrypts with c2
145 key_material = os.urandom(32)
146 ss = SymmetricState(b"Noise_IK_25519_ChaChaPoly_SHA256")
147 ss.mix_key(key_material)
148 c1, c2 = ss.split()
149
150 # Make another SymmetricState with same operations to get identical keys
151 ss2 = SymmetricState(b"Noise_IK_25519_ChaChaPoly_SHA256")
152 ss2.mix_key(key_material)
153 c1_copy, c2_copy = ss2.split()
154
155 # A's writer -> B's reader
156 a_writer = MockStreamWriter()
157 b_reader = MockStreamReader()
158
159 # A sends with c1, B receives with c1_copy
160 from i2p_transport.ntcp2_connection import NTCP2Connection
161 conn_a = NTCP2Connection(MockStreamReader(), a_writer, c1, c2)
162 conn_b = NTCP2Connection(b_reader, MockStreamWriter(), c2_copy, c1_copy)
163
164 # A sends a frame
165 original = NTCP2Frame(FrameType.ROUTER_INFO, os.urandom(128))
166 await conn_a.send_frame(original)
167
168 # Wire A's output to B's input
169 b_reader.feed(bytes(a_writer.data))
170
171 # B receives
172 received = await conn_b.recv_frame()
173 assert received.frame_type == original.frame_type
174 assert received.payload == original.payload
175
176
177class TestSendI2NP:
178 def test_send_i2np_wraps_in_i2np_frame(self):
179 asyncio.run(self._test())
180
181 async def _test(self):
182 send_cipher, recv_cipher = _make_cipher_pair()
183 writer = MockStreamWriter()
184 conn = _make_connection(writer=writer, cipher_send=send_cipher, cipher_recv=recv_cipher)
185
186 msg_type = 11 # DatabaseStore
187 payload = b"some i2np payload data"
188 await conn.send_i2np(msg_type, payload)
189
190 # Decrypt the written data
191 raw = bytes(writer.data)
192 length = struct.unpack("!I", raw[:4])[0]
193 encrypted = raw[4:]
194
195 decrypt_cipher = CipherState(recv_cipher._key)
196 decrypt_cipher.set_nonce(0)
197 plaintext = decrypt_cipher.decrypt_with_ad(b"", encrypted)
198
199 # Parse the frame
200 frame = NTCP2Frame.from_bytes(plaintext)
201 assert frame.frame_type == FrameType.I2NP
202
203 # The I2NP inner payload: type(1) + length(2) + payload
204 inner_type, inner_len = struct.unpack("!BH", frame.payload[:3])
205 inner_payload = frame.payload[3:]
206 assert inner_type == msg_type
207 assert inner_len == len(payload)
208 assert inner_payload == payload
209
210
211class TestClose:
212 def test_close_sends_termination_frame(self):
213 asyncio.run(self._test())
214
215 async def _test(self):
216 send_cipher, recv_cipher = _make_cipher_pair()
217 writer = MockStreamWriter()
218 conn = _make_connection(writer=writer, cipher_send=send_cipher, cipher_recv=recv_cipher)
219
220 await conn.close()
221
222 # Writer should be closed
223 assert writer.is_closing()
224
225 # Should have written a termination frame
226 raw = bytes(writer.data)
227 assert len(raw) > 4
228
229 length = struct.unpack("!I", raw[:4])[0]
230 encrypted = raw[4:]
231
232 decrypt_cipher = CipherState(recv_cipher._key)
233 decrypt_cipher.set_nonce(0)
234 plaintext = decrypt_cipher.decrypt_with_ad(b"", encrypted)
235
236 frame = NTCP2Frame.from_bytes(plaintext)
237 assert frame.frame_type == FrameType.TERMINATION
238
239
240class TestIsAlive:
241 def test_is_alive_true_when_open(self):
242 conn = _make_connection()
243 assert conn.is_alive() is True
244
245 def test_is_alive_false_after_close(self):
246 asyncio.run(self._test())
247
248 async def _test(self):
249 conn = _make_connection()
250 await conn.close()
251 assert conn.is_alive() is False
252
253
254class TestMultipleFrames:
255 def test_multiple_frames_nonce_advances(self):
256 asyncio.run(self._test())
257
258 async def _test(self):
259 """Send multiple frames; nonces must advance so decryption works."""
260 key = os.urandom(32)
261 send_cipher = CipherState(key)
262 recv_cipher = CipherState(key)
263
264 a_writer = MockStreamWriter()
265 b_reader = MockStreamReader()
266
267 from i2p_transport.ntcp2_connection import NTCP2Connection
268 conn_a = NTCP2Connection(MockStreamReader(), a_writer, send_cipher, CipherState(os.urandom(32)))
269 conn_b = NTCP2Connection(b_reader, MockStreamWriter(), CipherState(os.urandom(32)), recv_cipher)
270
271 frames = [
272 NTCP2Frame(FrameType.DATA, b"frame-0"),
273 NTCP2Frame(FrameType.DATA, b"frame-1"),
274 NTCP2Frame(FrameType.DATA, b"frame-2"),
275 ]
276
277 for f in frames:
278 await conn_a.send_frame(f)
279
280 # Wire all output
281 b_reader.feed(bytes(a_writer.data))
282
283 # Receive all
284 for i, expected in enumerate(frames):
285 received = await conn_b.recv_frame()
286 assert received.frame_type == expected.frame_type
287 assert received.payload == expected.payload, f"Frame {i} payload mismatch"