# A Python port of the Invisible Internet Project (I2P)
"""End-to-end integration tests for the I2P Python port.

These tests exercise multiple subsystems together through the RouterContext,
verifying that crypto, tunnel, NetDB, transport, and streaming components
integrate correctly.
"""
7
import os
import struct
import time

import pytest

from i2p_router.core import RouterContext
from i2p_crypto.session_key_manager import SessionKeyManager
from i2p_crypto.garlic_crypto import GarlicEncryptor, GarlicDecryptor
from i2p_crypto.aes import AESEngine
from i2p_tunnel.crypto import (
    TunnelLayerEncryptor,
    TunnelLayerDecryptor,
    OutboundTunnelEncryptor,
)
from i2p_tunnel.data_handler import TunnelCryptoRegistry, TunnelDataHandler
from i2p_transport.ntcp2_handshake import NTCP2Handshake, NTCP2FrameCodec
from i2p_transport.ntcp2 import NTCP2Frame, FrameType
from i2p_crypto.x25519 import X25519DH
from i2p_netdb.operations import SearchOperation, StoreOperation
from i2p_streaming.stream_io import MessageInputStream, MessageOutputStream, StreamSession
from i2p_data.garlic import (
    GarlicMessage,
    GarlicClove,
    DeliveryInstructions,
    DeliveryType,
)
35
36
37def _pad16(data: bytes) -> bytes:
38 """Pad data to a multiple of 16 bytes with zero bytes."""
39 remainder = len(data) % 16
40 if remainder == 0 and len(data) > 0:
41 return data
42 pad_len = 16 - remainder
43 return data + b"\x00" * pad_len
44
45
def _build_garlic_plaintext(message_data: bytes) -> bytes:
    """Build a GarlicMessage plaintext containing one clove with the given data.

    The clove uses LOCAL delivery instructions, clove id 1, and an
    expiration 600,000 ms after the current wall-clock time. The serialized
    message is zero-padded with ``_pad16``.

    Returns bytes whose length is a multiple of 16.
    """
    di = DeliveryInstructions(DeliveryType.LOCAL)
    clove = GarlicClove(
        delivery_instructions=di,
        message_data=message_data,
        clove_id=1,
        # Milliseconds since the epoch, 600,000 ms in the future.
        expiration=int(time.time() * 1000) + 600_000,
    )
    garlic_msg = GarlicMessage([clove])
    raw = garlic_msg.to_bytes()
    return _pad16(raw)
61
62
class TestGarlicEncryptDecryptThroughRouter:
    """Test 1: Garlic encrypt/decrypt through RouterContext."""

    def test_garlic_encrypt_decrypt_through_router(self):
        """Encrypt a one-clove garlic message and decrypt it via the router."""
        router = RouterContext()

        # Create a session (generates session key + tags)
        dest_hash = os.urandom(32)
        session_key, tags = router.session_key_mgr.create_session(dest_hash)

        # Build garlic plaintext with a clove containing our test data
        test_payload = b"hello from garlic clove"
        garlic_plaintext = _build_garlic_plaintext(test_payload)

        # Pick a session tag and encrypt
        session_tag = tags[0]
        encrypted_payload = GarlicEncryptor.encrypt(
            garlic_plaintext, session_key, session_tag
        )

        # Feed through router's inbound handler (type 11 = GARLIC)
        result = router.process_inbound(11, encrypted_payload)

        # The garlic handler returns a list of clove message_data payloads
        assert isinstance(result, list)
        assert len(result) == 1
        assert result[0] == test_payload
90
91
class TestTunnelDataMultiHop:
    """Test 2: Multi-hop tunnel encryption and decryption."""

    def test_tunnel_data_multi_hop(self):
        """Peel two layers by hand, then let the router peel the endpoint layer."""
        # One distinct (layer_key, iv_key) pair per hop:
        # index 0 = gateway, 1 = intermediate, 2 = endpoint.
        hop_keys = [(os.urandom(32), os.urandom(32)) for _ in range(3)]

        # The endpoint is hop 2 (last hop)
        endpoint_layer_key, endpoint_iv_key = hop_keys[2]

        # Create a RouterContext for the endpoint
        router = RouterContext()
        endpoint_tunnel_id = 42
        router.register_tunnel(
            endpoint_tunnel_id,
            endpoint_layer_key,
            endpoint_iv_key,
            is_endpoint=True,
        )

        # Original plaintext (must be multiple of 16)
        original_plaintext = _pad16(b"tunnel payload data for endpoint delivery")

        # Encrypt with OutboundTunnelEncryptor (applies layers in reverse order)
        fully_encrypted = OutboundTunnelEncryptor.encrypt(original_plaintext, hop_keys)

        # Simulate intermediate hops peeling layers
        # Hop 0 (gateway) decrypts its layer
        after_hop0 = TunnelLayerDecryptor.decrypt_layer(
            fully_encrypted, hop_keys[0][0], hop_keys[0][1]
        )
        # Hop 1 (intermediate) decrypts its layer
        after_hop1 = TunnelLayerDecryptor.decrypt_layer(
            after_hop0, hop_keys[1][0], hop_keys[1][1]
        )

        # At the endpoint, call router.process_tunnel_data()
        result = router.process_tunnel_data(endpoint_tunnel_id, after_hop1)

        assert result["action"] == "deliver"
        assert result["data"] == original_plaintext
138
139
class TestTunnelBuildAndRegisterLifecycle:
    """Test 3: Register a tunnel and verify lifecycle."""

    def test_tunnel_build_and_register_lifecycle(self):
        """A registered endpoint tunnel delivers data and shows up in status."""
        router = RouterContext()

        tunnel_id = 100
        layer_key = os.urandom(32)
        iv_key = os.urandom(32)

        # Register the tunnel
        router.register_tunnel(tunnel_id, layer_key, iv_key, is_endpoint=True)

        # Verify process_tunnel_data works on that tunnel
        plaintext = _pad16(b"lifecycle test data")
        # Encrypt a single layer (since only one hop registered)
        encrypted = TunnelLayerEncryptor.encrypt_layer(plaintext, layer_key, iv_key)
        result = router.process_tunnel_data(tunnel_id, encrypted)

        assert result["action"] == "deliver"
        assert result["data"] == plaintext

        # Check get_status() shows the tunnel
        status = router.get_status()
        assert status["tunnels_registered"] == 1

    def test_unknown_tunnel_returns_unknown(self):
        """Data for an unregistered tunnel id is reported as unknown."""
        router = RouterContext()

        data = _pad16(b"data for unknown tunnel")
        result = router.process_tunnel_data(999, data)

        assert result["action"] == "unknown"
        assert result["tunnel_id"] == 999
174
175
class TestNetDBStoreThenLookupViaInbound:
    """Test 4: Store and lookup NetDB entries via inbound message processing."""

    def test_netdb_store_then_lookup_via_inbound(self):
        """An entry stored directly is returned by an inbound lookup message."""
        router = RouterContext()

        # Store an entry via store_netdb_entry()
        key = os.urandom(32)
        data = b"router info data for testing netdb store and lookup"

        router.store_netdb_entry(key, data)

        # Lookup the entry via process_inbound(2, key_payload) (DATABASE_LOOKUP type)
        # The handler extracts the first 32 bytes as the key
        result = router.process_inbound(2, key)

        assert result == data

    def test_netdb_lookup_missing_returns_none(self):
        """An inbound lookup for a key that was never stored yields None."""
        router = RouterContext()

        missing_key = os.urandom(32)
        result = router.process_inbound(2, missing_key)

        assert result is None

    def test_netdb_store_via_inbound(self):
        """An inbound store message makes the entry visible to direct lookup."""
        router = RouterContext()

        key = os.urandom(32)
        data = b"stored via inbound message"

        # DATABASE_STORE is type 1; payload = key(32) + data
        store_payload = key + data
        router.process_inbound(1, store_payload)

        # Verify it was stored by looking it up
        result = router.lookup_netdb_entry(key)
        assert result == data
215
216
class TestNTCP2HandshakeThenFrameExchange:
    """Test 5: Full NTCP2 Noise_XK handshake followed by frame exchange."""

    def test_ntcp2_handshake_then_frame_exchange(self):
        """Run the three-message handshake, then exchange one frame each way."""
        # Generate static keypairs for initiator and responder
        init_priv, init_pub = X25519DH.generate_keypair()
        resp_priv, resp_pub = X25519DH.generate_keypair()

        # Create handshake objects
        # Initiator knows responder's static public key
        initiator = NTCP2Handshake(
            our_static=(init_priv, init_pub),
            peer_static_pub=resp_pub,
            initiator=True,
        )
        # Responder does not know initiator's static key ahead of time
        responder = NTCP2Handshake(
            our_static=(resp_priv, resp_pub),
            peer_static_pub=None,
            initiator=False,
        )

        # Message 1: initiator -> responder
        msg1 = initiator.create_message_1(b"")
        # Message 2: responder processes msg1, produces msg2
        msg2 = responder.process_message_1(msg1)
        # Message 3: initiator processes msg2, produces msg3
        msg3 = initiator.process_message_2(msg2)
        # Responder processes msg3
        responder.process_message_3(msg3)

        # Both sides should be complete
        assert initiator.is_complete()
        assert responder.is_complete()

        # Responder should have learned initiator's static key
        assert responder.remote_static_key() == init_pub

        # Get cipher states
        init_send, init_recv = initiator.split()
        resp_send, resp_recv = responder.split()

        # Create a frame codec
        codec = NTCP2FrameCodec()

        # Test: initiator sends a frame, responder decrypts it
        test_payload = b"hello from initiator to responder"
        frame = NTCP2Frame(FrameType.I2NP, test_payload)

        encrypted = codec.encrypt_frame(init_send, frame)
        decrypted_frame = codec.decrypt_frame(resp_recv, encrypted)

        assert decrypted_frame.frame_type == FrameType.I2NP
        assert decrypted_frame.payload == test_payload

        # Test: responder sends a frame, initiator decrypts it
        reply_payload = b"reply from responder to initiator"
        reply_frame = NTCP2Frame(FrameType.I2NP, reply_payload)

        encrypted_reply = codec.encrypt_frame(resp_send, reply_frame)
        decrypted_reply = codec.decrypt_frame(init_recv, encrypted_reply)

        assert decrypted_reply.frame_type == FrameType.I2NP
        assert decrypted_reply.payload == reply_payload

    def test_ntcp2_multiple_frames_maintain_nonce(self):
        """Verify that multiple frames can be sent/received with nonce advancement."""
        init_priv, init_pub = X25519DH.generate_keypair()
        resp_priv, resp_pub = X25519DH.generate_keypair()

        initiator = NTCP2Handshake(
            our_static=(init_priv, init_pub),
            peer_static_pub=resp_pub,
            initiator=True,
        )
        # Unlike the test above, this one omits peer_static_pub and the
        # message-1 payload, relying on the callee's defaults.
        responder = NTCP2Handshake(
            our_static=(resp_priv, resp_pub),
            initiator=False,
        )

        msg1 = initiator.create_message_1()
        msg2 = responder.process_message_1(msg1)
        msg3 = initiator.process_message_2(msg2)
        responder.process_message_3(msg3)

        # Only the initiator -> responder direction is exercised here, so
        # the other half of each split() result is discarded.
        init_send, _ = initiator.split()
        _, resp_recv = responder.split()
        codec = NTCP2FrameCodec()

        # Send multiple frames in sequence
        for i in range(5):
            payload = f"message number {i}".encode()
            frame = NTCP2Frame(FrameType.I2NP, payload)
            encrypted = codec.encrypt_frame(init_send, frame)
            decrypted = codec.decrypt_frame(resp_recv, encrypted)
            assert decrypted.payload == payload
313
314
class TestStreamingSessionDataFlow:
    """Test 6: Streaming session data flow."""

    def test_streaming_session_data_flow(self):
        """Send data from an established session and read it back intact."""
        # Create a StreamSession and connect
        session = StreamSession(local_id=1000)
        # connect() returns a pair; neither element is needed here.
        _seq, _syn_data = session.connect()
        assert session.state == "SYN_SENT"

        # Simulate receiving SYN-ACK
        remote_id = 2000
        session.receive_syn_ack(remote_id)
        assert session.state == "ESTABLISHED"

        # Send data
        test_data = b"streaming data payload for integration test"
        packets = session.send(test_data)
        assert len(packets) > 0

        # Feed the sent packets into a MessageInputStream (simulating receiver)
        input_stream = MessageInputStream()
        for seq_num, chunk in packets:
            input_stream.receive_packet(seq_num, chunk)

        # Read the data back
        readable = input_stream.readable_bytes()
        assert readable == len(test_data)

        received = input_stream.read(readable)
        assert received == test_data

    def test_streaming_session_bidirectional(self):
        """Test bidirectional data flow between two sessions."""
        # Session A (initiator)
        session_a = StreamSession(local_id=100)
        _, syn_data = session_a.connect()

        # Session B (responder)
        session_b = StreamSession(local_id=200)
        remote_id_a = struct.unpack("!I", syn_data)[0]
        _, syn_ack_data = session_b.accept(remote_id_a)

        # Complete handshake
        # The unpack is kept because it raises struct.error unless the
        # SYN-ACK is exactly two unsigned 32-bit integers; the parsed value
        # itself is not used.
        # NOTE(review): the second field presumably carries B's id -- confirm,
        # and consider asserting it equals session_b.local_id.
        _remote_id_b = struct.unpack("!II", syn_ack_data)[1]
        session_a.receive_syn_ack(session_b.local_id)

        assert session_a.state == "ESTABLISHED"
        assert session_b.state == "ESTABLISHED"

        # A sends to B
        data_a = b"hello from A"
        packets_a = session_a.send(data_a)
        for seq_num, chunk in packets_a:
            session_b.receive(seq_num, chunk)
        assert session_b.read(len(data_a)) == data_a

        # B sends to A
        data_b = b"hello from B"
        packets_b = session_b.send(data_b)
        for seq_num, chunk in packets_b:
            session_a.receive(seq_num, chunk)
        assert session_a.read(len(data_b)) == data_b

    def test_streaming_large_data_chunked(self):
        """Test that large data is properly chunked and reassembled."""
        session = StreamSession(local_id=3000)
        session.connect()
        session.receive_syn_ack(4000)

        # Send data larger than max_packet_size (default 1024)
        large_data = os.urandom(3000)
        packets = session.send(large_data)

        # Should be chunked into multiple packets
        assert len(packets) > 1

        # Reassemble on receive side
        input_stream = MessageInputStream()
        for seq_num, chunk in packets:
            input_stream.receive_packet(seq_num, chunk)

        received = input_stream.read(input_stream.readable_bytes())
        assert received == large_data
398
399
class TestFullMessageFlowNetDBStoreAndSearch:
    """Test 7: Full message flow with NetDB store and search operations."""

    def test_full_message_flow_netdb_store_and_search(self):
        """A search stays open after a miss and completes once data is found."""
        router = RouterContext()

        # Store several entries in the NetDB
        entries = {}
        for i in range(5):
            key = os.urandom(32)
            data = f"router info entry {i}".encode()
            router.store_netdb_entry(key, data)
            entries[key] = data

        # Pick a target to search for
        target_key = list(entries.keys())[2]
        target_data = entries[target_key]

        # Create known peers (random hashes standing in for remote routers)
        peer1 = os.urandom(32)
        peer2 = os.urandom(32)
        peer3 = os.urandom(32)
        known_peers = {
            peer1: b"peer1_info",
            peer2: b"peer2_info",
            peer3: b"peer3_info",
        }

        # Start a search via the netdb_handler
        search_id = router.netdb_handler.start_search(
            target_key, known_peers=known_peers
        )

        # Feed a reply from peer1: didn't find it, but knows a closer peer.
        # The return value is not inspected by this test.
        closer_peer = os.urandom(32)
        router.netdb_handler.on_search_reply(
            search_id,
            peer_hash=peer1,
            found_data=None,
            closer_peers={closer_peer: b"closer_peer_info"},
        )

        assert not router.netdb_handler.is_search_complete(search_id)

        # Feed a reply from peer2: found the target data
        router.netdb_handler.on_search_reply(
            search_id,
            peer_hash=peer2,
            found_data=target_data,
        )

        assert router.netdb_handler.is_search_complete(search_id)
        assert router.netdb_handler.get_search_result(search_id) == target_data

    def test_search_exhausts_peers_without_finding(self):
        """Test search completing by exhausting all peers."""
        router = RouterContext()

        target_key = os.urandom(32)
        peer1 = os.urandom(32)
        peer2 = os.urandom(32)
        known_peers = {peer1: b"p1", peer2: b"p2"}

        search_id = router.netdb_handler.start_search(
            target_key, known_peers=known_peers
        )

        # Both peers reply without finding the target and without new peers
        router.netdb_handler.on_search_reply(search_id, peer1, None)
        router.netdb_handler.on_search_reply(search_id, peer2, None)

        assert router.netdb_handler.is_search_complete(search_id)
        assert router.netdb_handler.get_search_result(search_id) is None
473
474
class TestRouterContextStatus:
    """Test 8: RouterContext status reflects all state correctly."""

    def test_router_context_status(self):
        """Status counters start at zero and track tunnels and NetDB entries."""
        router = RouterContext()

        # Initial status
        status = router.get_status()
        assert status["tunnels_registered"] == 0
        assert status["netdb_entries"] == 0
        assert status["inbound_count"] == 0
        assert status["outbound_count"] == 0
        assert len(status["router_hash"]) == 64  # 32 bytes hex-encoded

        # Register tunnels
        for tid in [10, 20, 30]:
            router.register_tunnel(tid, os.urandom(32), os.urandom(32))

        status = router.get_status()
        assert status["tunnels_registered"] == 3

        # Store NetDB entries
        for _ in range(5):
            router.store_netdb_entry(os.urandom(32), b"test entry data")

        status = router.get_status()
        assert status["netdb_entries"] == 5

        # Status should still reflect all registered tunnels
        assert status["tunnels_registered"] == 3

    def test_router_context_custom_hash(self):
        """Verify that a custom router_hash is preserved."""
        custom_hash = os.urandom(32)
        router = RouterContext(router_hash=custom_hash)

        status = router.get_status()
        assert status["router_hash"] == custom_hash.hex()

    def test_router_context_all_subsystems_wired(self):
        """Verify that all subsystems are accessible and properly wired."""
        router = RouterContext()

        # Session key manager works
        dest = os.urandom(32)
        key, tags = router.session_key_mgr.create_session(dest)
        assert len(key) == 32
        assert len(tags) == 20

        # Garlic encryptor/decryptor are accessible
        assert router.garlic_encryptor is not None
        assert router.garlic_decryptor is not None

        # Tunnel subsystems work
        router.register_tunnel(1, os.urandom(32), os.urandom(32))
        assert router.get_status()["tunnels_registered"] == 1

        # NetDB subsystem works
        k = os.urandom(32)
        router.store_netdb_entry(k, b"data")
        assert router.lookup_netdb_entry(k) == b"data"

        # Outbound routing works
        result = router.route_outbound(0, b"local payload")
        assert result["type"] == "local"
        assert result["payload"] == b"local payload"