A Python port of the Invisible Internet Project (I2P)
1"""Tests for NTCP2 listener, connector, and connection manager."""
2
3import asyncio
4import struct
5import unittest
6
7from i2p_crypto.x25519 import X25519DH
8from i2p_transport.ntcp2 import NTCP2Frame, FrameType
9from i2p_transport.ntcp2_connection import NTCP2Connection
10from i2p_transport.ntcp2_server import (
11 ConnectionManager,
12 NTCP2Listener,
13 NTCP2Connector,
14)
15
16
17# ---------------------------------------------------------------------------
18# Mock connection for ConnectionManager tests (no asyncio needed)
19# ---------------------------------------------------------------------------
20
21class MockWriter:
22 """Minimal mock for writer.close() / is_closing()."""
23
24 def __init__(self):
25 self.closed = False
26
27 def close(self):
28 self.closed = True
29
30 def is_closing(self):
31 return self.closed
32
33
34def _make_mock_connection(remote_hash: bytes = b"") -> NTCP2Connection:
35 """Create an NTCP2Connection with mock streams (no real I/O)."""
36 from i2p_crypto.noise import CipherState
37
38 cs = CipherState(b"\x00" * 32)
39 conn = NTCP2Connection(
40 reader=None,
41 writer=MockWriter(),
42 cipher_send=cs,
43 cipher_recv=CipherState(b"\x01" * 32),
44 remote_hash=remote_hash,
45 )
46 return conn
47
48
49# ===========================================================================
50# ConnectionManager tests — synchronous
51# ===========================================================================
52
53class TestConnectionManager(unittest.TestCase):
54 """Unit tests for ConnectionManager."""
55
56 def test_add_and_get(self):
57 mgr = ConnectionManager()
58 peer = b"\xaa" * 32
59 conn = _make_mock_connection(peer)
60 mgr.add(peer, conn)
61 self.assertIs(mgr.get(peer), conn)
62
63 def test_get_unknown_returns_none(self):
64 mgr = ConnectionManager()
65 self.assertIsNone(mgr.get(b"\xbb" * 32))
66
67 def test_remove(self):
68 mgr = ConnectionManager()
69 peer = b"\xcc" * 32
70 conn = _make_mock_connection(peer)
71 mgr.add(peer, conn)
72 mgr.remove(peer)
73 self.assertIsNone(mgr.get(peer))
74
75 def test_remove_nonexistent_is_noop(self):
76 mgr = ConnectionManager()
77 mgr.remove(b"\xdd" * 32) # should not raise
78
79 def test_active_count(self):
80 mgr = ConnectionManager()
81 self.assertEqual(mgr.active_count(), 0)
82 mgr.add(b"\x01" * 32, _make_mock_connection())
83 mgr.add(b"\x02" * 32, _make_mock_connection())
84 self.assertEqual(mgr.active_count(), 2)
85 mgr.remove(b"\x01" * 32)
86 self.assertEqual(mgr.active_count(), 1)
87
88 def test_all_peer_hashes(self):
89 mgr = ConnectionManager()
90 h1 = b"\x01" * 32
91 h2 = b"\x02" * 32
92 mgr.add(h1, _make_mock_connection())
93 mgr.add(h2, _make_mock_connection())
94 hashes = mgr.all_peer_hashes()
95 self.assertEqual(len(hashes), 2)
96 self.assertIn(h1, hashes)
97 self.assertIn(h2, hashes)
98
99 def test_close_all(self):
100 mgr = ConnectionManager()
101 conns = []
102 for i in range(3):
103 c = _make_mock_connection()
104 mgr.add(bytes([i]) * 32, c)
105 conns.append(c)
106
107 mgr.close_all()
108
109 # All writers should be closed
110 for c in conns:
111 self.assertTrue(c._writer.closed)
112 # Manager should be empty
113 self.assertEqual(mgr.active_count(), 0)
114
115
116# ===========================================================================
117# NTCP2Listener + NTCP2Connector integration tests — asyncio
118# ===========================================================================
119
120class TestListenerConnectorIntegration(unittest.TestCase):
121 """Integration tests that run a real TCP listener and connector."""
122
123 def test_listener_connector_handshake(self):
124 """Start listener, connect with connector, verify handshake completes."""
125 asyncio.run(self._test_handshake())
126
127 async def _test_handshake(self):
128 listener_static = X25519DH.generate_keypair()
129 connector_static = X25519DH.generate_keypair()
130
131 connections = []
132
133 async def on_conn(conn):
134 connections.append(conn)
135
136 listener = NTCP2Listener(
137 "127.0.0.1", 0, listener_static, on_connection=on_conn
138 )
139 server = await listener.start()
140 port = server.sockets[0].getsockname()[1]
141
142 connector = NTCP2Connector()
143 conn = await connector.connect(
144 "127.0.0.1", port, connector_static, listener_static[1]
145 )
146
147 # Give the listener a moment to finish handling
148 await asyncio.sleep(0.2)
149
150 # Handshake completed — connector got a connection
151 self.assertIsInstance(conn, NTCP2Connection)
152 self.assertTrue(conn.is_alive())
153
154 # Listener side received the connection via callback
155 self.assertEqual(len(connections), 1)
156 self.assertIsInstance(connections[0], NTCP2Connection)
157 self.assertTrue(connections[0].is_alive())
158
159 # Close connections before server (Python 3.12+ wait_closed blocks otherwise)
160 conn._writer.close()
161 connections[0]._writer.close()
162 server.close()
163 await server.wait_closed()
164
165 def test_frame_exchange_after_handshake(self):
166 """After handshake, send a frame from connector and receive on listener side."""
167 asyncio.run(self._test_frame_exchange())
168
169 async def _test_frame_exchange(self):
170 listener_static = X25519DH.generate_keypair()
171 connector_static = X25519DH.generate_keypair()
172
173 connections = []
174
175 async def on_conn(conn):
176 connections.append(conn)
177
178 listener = NTCP2Listener(
179 "127.0.0.1", 0, listener_static, on_connection=on_conn
180 )
181 server = await listener.start()
182 port = server.sockets[0].getsockname()[1]
183
184 connector = NTCP2Connector()
185 client_conn = await connector.connect(
186 "127.0.0.1", port, connector_static, listener_static[1]
187 )
188
189 await asyncio.sleep(0.2)
190 server_conn = connections[0]
191
192 # Send a frame from client to server
193 test_payload = b"hello from connector"
194 frame = NTCP2Frame(FrameType.I2NP, test_payload)
195 await client_conn.send_frame(frame)
196
197 # Server receives it
198 received = await server_conn.recv_frame()
199 self.assertEqual(received.frame_type, FrameType.I2NP)
200 self.assertEqual(received.payload, test_payload)
201
202 # Send a frame from server to client
203 reply_payload = b"hello from listener"
204 reply_frame = NTCP2Frame(FrameType.I2NP, reply_payload)
205 await server_conn.send_frame(reply_frame)
206
207 received_reply = await client_conn.recv_frame()
208 self.assertEqual(received_reply.frame_type, FrameType.I2NP)
209 self.assertEqual(received_reply.payload, reply_payload)
210
211 # Close connections before server (Python 3.12+ wait_closed blocks otherwise)
212 client_conn._writer.close()
213 server_conn._writer.close()
214 server.close()
215 await server.wait_closed()
216
217 def test_multiple_connections(self):
218 """Listener accepts multiple concurrent connections."""
219 asyncio.run(self._test_multiple_connections())
220
221 async def _test_multiple_connections(self):
222 listener_static = X25519DH.generate_keypair()
223
224 connections = []
225
226 async def on_conn(conn):
227 connections.append(conn)
228
229 listener = NTCP2Listener(
230 "127.0.0.1", 0, listener_static, on_connection=on_conn
231 )
232 server = await listener.start()
233 port = server.sockets[0].getsockname()[1]
234
235 # Connect three clients
236 client_conns = []
237 for _ in range(3):
238 cs = X25519DH.generate_keypair()
239 connector = NTCP2Connector()
240 conn = await connector.connect(
241 "127.0.0.1", port, cs, listener_static[1]
242 )
243 client_conns.append(conn)
244
245 await asyncio.sleep(0.3)
246
247 self.assertEqual(len(connections), 3)
248 for c in client_conns:
249 self.assertTrue(c.is_alive())
250
251 # Close connections before server (Python 3.12+ wait_closed blocks otherwise)
252 for c in client_conns:
253 c._writer.close()
254 for c in connections:
255 c._writer.close()
256 server.close()
257 await server.wait_closed()
258
259 def test_connector_without_listener_raises(self):
260 """Connecting to a port with no listener should raise."""
261 asyncio.run(self._test_connector_no_listener())
262
263 async def _test_connector_no_listener(self):
264 connector = NTCP2Connector()
265 cs = X25519DH.generate_keypair()
266 with self.assertRaises(ConnectionRefusedError):
267 await connector.connect(
268 "127.0.0.1", 19999, cs, b"\x00" * 32
269 )
270
271
272if __name__ == "__main__":
273 unittest.main()