A Python port of the Invisible Internet Project (I2P)
1"""Tests for the real NTCP2 listener and connector with AES-CBC handshake
2and SipHash-obfuscated transport.
3
4TDD tests -- written before the implementation in
5src/i2p_transport/ntcp2_real_server.py.
6"""
7
8import asyncio
9import hashlib
10import os
11
12from i2p_crypto.x25519 import X25519DH
13from i2p_transport.ntcp2_blocks import BLOCK_I2NP, BLOCK_ROUTERINFO
14from i2p_transport.ntcp2_real_connection import NTCP2RealConnection
15from i2p_transport.ntcp2_real_server import NTCP2RealListener, NTCP2RealConnector
16
17
18# ---------------------------------------------------------------------------
19# helpers
20# ---------------------------------------------------------------------------
21
22def _make_keypair():
23 return X25519DH.generate_keypair()
24
25
26def _dummy_ri(size: int = 64) -> bytes:
27 return os.urandom(size)
28
29
30# ---------------------------------------------------------------------------
31# Test 1: NTCP2RealListener construction
32# ---------------------------------------------------------------------------
33
34class TestListenerConstruction:
35 def test_basic_construction(self):
36 static = _make_keypair()
37 ri_hash = os.urandom(32)
38 iv = os.urandom(16)
39
40 listener = NTCP2RealListener(
41 host="127.0.0.1",
42 port=0,
43 our_static_key=static,
44 our_ri_hash=ri_hash,
45 our_iv=iv,
46 on_connection=None,
47 )
48 assert listener is not None
49
50 def test_construction_with_callback(self):
51 static = _make_keypair()
52 ri_hash = os.urandom(32)
53 iv = os.urandom(16)
54
55 async def cb(conn):
56 pass
57
58 listener = NTCP2RealListener(
59 host="127.0.0.1",
60 port=0,
61 our_static_key=static,
62 our_ri_hash=ri_hash,
63 our_iv=iv,
64 on_connection=cb,
65 )
66 assert listener is not None
67
68
69# ---------------------------------------------------------------------------
70# Test 2: NTCP2RealConnector construction
71# ---------------------------------------------------------------------------
72
73class TestConnectorConstruction:
74 def test_basic_construction(self):
75 connector = NTCP2RealConnector()
76 assert connector is not None
77
78
79# ---------------------------------------------------------------------------
80# Test 3: Localhost integration — full real TCP roundtrip
81# ---------------------------------------------------------------------------
82
83class TestLocalhostIntegration:
84 def test_full_roundtrip(self):
85 """Start listener, connect, handshake, exchange I2NP, verify."""
86
87 async def _run():
88 # Responder (Bob) identity
89 bob_static = _make_keypair()
90 bob_ri_bytes = _dummy_ri(128)
91 bob_ri_hash = hashlib.sha256(bob_ri_bytes).digest()
92 bob_iv = os.urandom(16)
93
94 # Initiator (Alice) identity
95 alice_static = _make_keypair()
96 alice_ri_bytes = _dummy_ri(128)
97
98 # Track connections received by listener
99 connections = []
100 conn_event = asyncio.Event()
101
102 async def on_conn(conn):
103 connections.append(conn)
104 conn_event.set()
105
106 listener = NTCP2RealListener(
107 host="127.0.0.1",
108 port=0,
109 our_static_key=bob_static,
110 our_ri_hash=bob_ri_hash,
111 our_iv=bob_iv,
112 on_connection=on_conn,
113 )
114 server = await listener.start()
115 addr = server.sockets[0].getsockname()
116 port = addr[1]
117
118 try:
119 # Alice connects to Bob
120 connector = NTCP2RealConnector()
121 alice_conn = await connector.connect(
122 host="127.0.0.1",
123 port=port,
124 our_static_key=alice_static,
125 our_ri_bytes=alice_ri_bytes,
126 peer_static_pub=bob_static[1],
127 peer_ri_hash=bob_ri_hash,
128 peer_iv=bob_iv,
129 )
130
131 # Wait for Bob's side to complete handshake
132 await asyncio.wait_for(conn_event.wait(), timeout=5.0)
133 assert len(connections) == 1
134 bob_conn = connections[0]
135
136 # Both connections should be alive
137 assert alice_conn.is_alive()
138 assert bob_conn.is_alive()
139
140 # Alice sends I2NP message to Bob
141 test_msg = b"Hello from Alice via real NTCP2!"
142 await alice_conn.send_i2np(test_msg)
143
144 # Bob receives it
145 blocks = await asyncio.wait_for(bob_conn.recv_frame(), timeout=5.0)
146 i2np_blocks = [b for b in blocks if b.block_type == BLOCK_I2NP]
147 assert len(i2np_blocks) == 1
148 assert i2np_blocks[0].data == test_msg
149
150 # Bob replies
151 reply_msg = b"Hello from Bob via real NTCP2!"
152 await bob_conn.send_i2np(reply_msg)
153
154 # Alice receives reply
155 blocks2 = await asyncio.wait_for(alice_conn.recv_frame(), timeout=5.0)
156 i2np_blocks2 = [b for b in blocks2 if b.block_type == BLOCK_I2NP]
157 assert len(i2np_blocks2) == 1
158 assert i2np_blocks2[0].data == reply_msg
159
160 finally:
161 # Close connections before server to avoid Python 3.12+ wait_closed hang
162 alice_conn._writer.close()
163 if connections:
164 connections[0]._writer.close()
165 listener.close()
166
167 asyncio.run(_run())
168
169
170# ---------------------------------------------------------------------------
171# Test 4: Handshake produces correct transport keys
172# ---------------------------------------------------------------------------
173
174class TestTransportKeys:
175 def test_both_sides_have_working_cipher_and_siphash(self):
176 """After handshake, both sides can encrypt/decrypt with SipHash framing."""
177
178 async def _run():
179 bob_static = _make_keypair()
180 bob_ri_bytes = _dummy_ri(128)
181 bob_ri_hash = hashlib.sha256(bob_ri_bytes).digest()
182 bob_iv = os.urandom(16)
183
184 alice_static = _make_keypair()
185 alice_ri_bytes = _dummy_ri(128)
186
187 connections = []
188 conn_event = asyncio.Event()
189
190 async def on_conn(conn):
191 connections.append(conn)
192 conn_event.set()
193
194 listener = NTCP2RealListener(
195 host="127.0.0.1",
196 port=0,
197 our_static_key=bob_static,
198 our_ri_hash=bob_ri_hash,
199 our_iv=bob_iv,
200 on_connection=on_conn,
201 )
202 server = await listener.start()
203 port = server.sockets[0].getsockname()[1]
204
205 try:
206 connector = NTCP2RealConnector()
207 alice_conn = await connector.connect(
208 host="127.0.0.1",
209 port=port,
210 our_static_key=alice_static,
211 our_ri_bytes=alice_ri_bytes,
212 peer_static_pub=bob_static[1],
213 peer_ri_hash=bob_ri_hash,
214 peer_iv=bob_iv,
215 )
216 await asyncio.wait_for(conn_event.wait(), timeout=5.0)
217 bob_conn = connections[0]
218
219 # Verify both are NTCP2RealConnection instances
220 assert isinstance(alice_conn, NTCP2RealConnection)
221 assert isinstance(bob_conn, NTCP2RealConnection)
222
223 # Send multiple messages in sequence to verify nonce incrementing
224 for i in range(3):
225 msg = f"message-{i}".encode()
226 await alice_conn.send_i2np(msg)
227 blocks = await asyncio.wait_for(bob_conn.recv_frame(), timeout=5.0)
228 i2np = [b for b in blocks if b.block_type == BLOCK_I2NP]
229 assert i2np[0].data == msg
230
231 finally:
232 alice_conn._writer.close()
233 if connections:
234 connections[0]._writer.close()
235 listener.close()
236
237 asyncio.run(_run())
238
239
240# ---------------------------------------------------------------------------
241# Test 5: Multiple connections
242# ---------------------------------------------------------------------------
243
244class TestMultipleConnections:
245 def test_listener_accepts_multiple_sequential_connections(self):
246 """Listener can accept multiple connections sequentially."""
247
248 async def _run():
249 bob_static = _make_keypair()
250 bob_ri_bytes = _dummy_ri(128)
251 bob_ri_hash = hashlib.sha256(bob_ri_bytes).digest()
252 bob_iv = os.urandom(16)
253
254 connections = []
255 conn_events = [asyncio.Event(), asyncio.Event()]
256 conn_idx = 0
257
258 async def on_conn(conn):
259 nonlocal conn_idx
260 connections.append(conn)
261 if conn_idx < len(conn_events):
262 conn_events[conn_idx].set()
263 conn_idx += 1
264
265 listener = NTCP2RealListener(
266 host="127.0.0.1",
267 port=0,
268 our_static_key=bob_static,
269 our_ri_hash=bob_ri_hash,
270 our_iv=bob_iv,
271 on_connection=on_conn,
272 )
273 server = await listener.start()
274 port = server.sockets[0].getsockname()[1]
275
276 try:
277 connector = NTCP2RealConnector()
278
279 # First connection
280 alice1_static = _make_keypair()
281 alice1_ri = _dummy_ri(64)
282 conn1 = await connector.connect(
283 host="127.0.0.1",
284 port=port,
285 our_static_key=alice1_static,
286 our_ri_bytes=alice1_ri,
287 peer_static_pub=bob_static[1],
288 peer_ri_hash=bob_ri_hash,
289 peer_iv=bob_iv,
290 )
291 await asyncio.wait_for(conn_events[0].wait(), timeout=5.0)
292
293 # Second connection
294 alice2_static = _make_keypair()
295 alice2_ri = _dummy_ri(64)
296 conn2 = await connector.connect(
297 host="127.0.0.1",
298 port=port,
299 our_static_key=alice2_static,
300 our_ri_bytes=alice2_ri,
301 peer_static_pub=bob_static[1],
302 peer_ri_hash=bob_ri_hash,
303 peer_iv=bob_iv,
304 )
305 await asyncio.wait_for(conn_events[1].wait(), timeout=5.0)
306
307 assert len(connections) == 2
308
309 # Verify both are independent — send on each, receive on each
310 await conn1.send_i2np(b"from-conn1")
311 blocks1 = await asyncio.wait_for(
312 connections[0].recv_frame(), timeout=5.0
313 )
314 i2np1 = [b for b in blocks1 if b.block_type == BLOCK_I2NP]
315 assert i2np1[0].data == b"from-conn1"
316
317 await conn2.send_i2np(b"from-conn2")
318 blocks2 = await asyncio.wait_for(
319 connections[1].recv_frame(), timeout=5.0
320 )
321 i2np2 = [b for b in blocks2 if b.block_type == BLOCK_I2NP]
322 assert i2np2[0].data == b"from-conn2"
323
324 finally:
325 conn1._writer.close()
326 conn2._writer.close()
327 for c in connections:
328 c._writer.close()
329 listener.close()
330
331 asyncio.run(_run())