A Python port of the Invisible Internet Project (I2P)
"""Podman pod integration tests for NTCP2 network I/O.

Spins up a podman pod with two Python containers that perform a real
NTCP2 handshake over TCP, exchange encrypted frames, and verify the results.

Requirements:
  - podman available on the host
  - python:3.12-slim image available (it will be pulled if it is missing)
  - These tests are slow (~30s) due to container startup + pip install
"""
11
12import json
13import os
14import subprocess
15import tempfile
16import time
17import unittest
18
19
def _run(cmd, **kwargs):
    """Run a shell command and return its stripped stdout.

    Args:
        cmd: Command line, executed through the shell.
        **kwargs: Extra keyword arguments forwarded to ``subprocess.run``.
            ``timeout`` defaults to 120 seconds and may be overridden.

    Returns:
        The command's stdout with surrounding whitespace removed.

    Raises:
        RuntimeError: If the command exits with a non-zero status; the
            message carries the command, stdout and stderr.
        subprocess.TimeoutExpired: If the command outlives the timeout.
    """
    # Default rather than hard-code the timeout: passing it both explicitly
    # and via **kwargs would raise TypeError for a duplicate keyword.
    kwargs.setdefault("timeout", 120)
    result = subprocess.run(
        cmd, shell=True, capture_output=True, text=True, **kwargs
    )
    if result.returncode != 0:
        raise RuntimeError(
            f"Command failed: {cmd}\nstdout: {result.stdout}\nstderr: {result.stderr}"
        )
    return result.stdout.strip()
30
31
def _run_quiet(cmd, **kwargs):
    """Run a shell command on a best-effort basis, ignoring every failure.

    Non-zero exit codes, timeouts and failures to launch the process are
    all swallowed, so this is safe to call from cleanup paths.

    Args:
        cmd: Command line, executed through the shell.
        **kwargs: Extra keyword arguments forwarded to ``subprocess.run``.
            ``timeout`` defaults to 60 seconds and may be overridden.
    """
    kwargs.setdefault("timeout", 60)
    try:
        subprocess.run(cmd, shell=True, capture_output=True, text=True, **kwargs)
    except (subprocess.SubprocessError, OSError):
        # TimeoutExpired (a SubprocessError) or a launch failure such as a
        # missing cwd: cleanup must not abort because of it.
        pass
35
36
# Name of the podman pod used by every test; setUp() creates it and
# tearDown() removes it.
POD_NAME = "i2p-ntcp2-test"
# TCP port the listener binds to and the connector dials inside the pod.
LISTENER_PORT = 6000
# Two levels above this file's directory; bind-mounted into containers at /app.
APP_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))

# Shell snippet run at the start of every container to install `cryptography`.
# Each test creates a fresh pod with fresh containers, so nothing installed
# by an earlier test survives and the install is repeated every time.
# NOTE: stderr is discarded, so a failed install leaves no pip output in
# `podman logs`.
PIP_CMD = "pip install -q cryptography 2>/dev/null"
44
45
class TestNTCP2PodmanIntegration(unittest.TestCase):
    """Integration tests using a podman pod with two Python containers.

    Every test gets a fresh pod named ``POD_NAME`` and a fresh host temp
    directory that is bind-mounted into each container at ``/shared``.
    Containers report their outcome by writing a JSON file into that
    directory; the test polls for the file and asserts on its contents.
    """

    # Script written to the shared directory and run in both containers by
    # test_router_info_exchange; the ROLE environment variable selects the
    # listener or connector branch. The listener writes its public key file
    # before it starts listening, and the connector dials as soon as that
    # file holds 32 bytes.
    # NOTE(review): that leaves a small window in which the connector could
    # dial before the server socket is bound -- confirm whether a retry is
    # needed.
    _RI_EXCHANGE_SCRIPT = '''
import asyncio
import json
import os
import struct
import sys
import time
import traceback

sys.path.insert(0, "/app/src")

from i2p_crypto.x25519 import X25519DH
from i2p_transport.ntcp2 import NTCP2Frame, FrameType
from i2p_transport.ntcp2_connection import NTCP2Connection
from i2p_transport.ntcp2_handshake import NTCP2Handshake


async def _send_hs(writer, msg):
    writer.write(struct.pack("!H", len(msg)) + msg)
    await writer.drain()

async def _recv_hs(reader):
    lb = await reader.readexactly(2)
    length = struct.unpack("!H", lb)[0]
    return await reader.readexactly(length)


ROLE = os.environ["ROLE"] # "listener" or "connector"
PORT = int(os.environ.get("PORT", "6000"))
SHARED = "/shared"


async def run_listener():
    static = X25519DH.generate_keypair()
    with open(f"{SHARED}/ri_listener_key.bin", "wb") as f:
        f.write(static[1])

    done = asyncio.Event()
    result = {"status": "error"}
    holder = [None]

    async def handle(reader, writer):
        try:
            hs = NTCP2Handshake(our_static=static, peer_static_pub=None, initiator=False)
            msg1 = await asyncio.wait_for(_recv_hs(reader), timeout=10)
            msg2 = hs.process_message_1(msg1)
            await _send_hs(writer, msg2)
            msg3 = await asyncio.wait_for(_recv_hs(reader), timeout=10)
            hs.process_message_3(msg3)
            sc, rc = hs.split()
            conn = NTCP2Connection(reader=reader, writer=writer,
                                   cipher_send=sc, cipher_recv=rc,
                                   remote_hash=hs.remote_static_key() or b"")
            holder[0] = conn
            done.set()
        except Exception as e:
            print(f"Error: {e}", flush=True)
            writer.close()

    server = await asyncio.start_server(handle, "0.0.0.0", PORT)
    print(f"RI listener on port {PORT}", flush=True)

    await asyncio.wait_for(done.wait(), timeout=30)
    conn = holder[0]

    # Send our "RouterInfo" (simulated as ROUTER_INFO frame type)
    our_ri = json.dumps({"router": "listener", "key": static[1].hex()}).encode()
    await conn.send_frame(NTCP2Frame(FrameType.ROUTER_INFO, our_ri))

    # Receive peer's RouterInfo
    peer_frame = await asyncio.wait_for(conn.recv_frame(), timeout=10)

    result = {
        "status": "ok",
        "sent_ri": True,
        "received_ri_type": peer_frame.frame_type.value,
        "received_ri_payload": peer_frame.payload.decode("utf-8", errors="replace"),
    }
    conn._writer.close()
    server.close()
    return result


async def run_connector():
    deadline = time.time() + 30
    while time.time() < deadline:
        kf = f"{SHARED}/ri_listener_key.bin"
        if os.path.exists(kf) and os.path.getsize(kf) == 32:
            break
        await asyncio.sleep(0.2)

    with open(f"{SHARED}/ri_listener_key.bin", "rb") as f:
        peer_pub = f.read()

    static = X25519DH.generate_keypair()
    reader, writer = await asyncio.wait_for(
        asyncio.open_connection("127.0.0.1", PORT), timeout=10)

    hs = NTCP2Handshake(our_static=static, peer_static_pub=peer_pub, initiator=True)
    msg1 = hs.create_message_1()
    await _send_hs(writer, msg1)
    msg2 = await asyncio.wait_for(_recv_hs(reader), timeout=10)
    msg3 = hs.process_message_2(msg2)
    await _send_hs(writer, msg3)
    sc, rc = hs.split()
    conn = NTCP2Connection(reader=reader, writer=writer,
                           cipher_send=sc, cipher_recv=rc, remote_hash=peer_pub)

    # Send our RouterInfo
    our_ri = json.dumps({"router": "connector", "key": static[1].hex()}).encode()
    await conn.send_frame(NTCP2Frame(FrameType.ROUTER_INFO, our_ri))

    # Receive peer's RouterInfo
    peer_frame = await asyncio.wait_for(conn.recv_frame(), timeout=10)

    result = {
        "status": "ok",
        "sent_ri": True,
        "received_ri_type": peer_frame.frame_type.value,
        "received_ri_payload": peer_frame.payload.decode("utf-8", errors="replace"),
    }
    conn._writer.close()
    return result


async def main():
    try:
        if ROLE == "listener":
            result = await run_listener()
        else:
            result = await run_connector()
    except Exception as e:
        result = {"status": "error", "error": str(e)}
        traceback.print_exc()
    with open(f"{SHARED}/ri_{ROLE}_result.json", "w") as f:
        json.dump(result, f)
    print(f"Result: {result}", flush=True)

asyncio.run(main())
'''

    @classmethod
    def setUpClass(cls):
        """Skip the whole class unless ``podman --version`` succeeds."""
        try:
            _run("podman --version")
        except (FileNotFoundError, RuntimeError):
            raise unittest.SkipTest("podman not available")

    def setUp(self):
        """Create the per-test shared directory and a fresh podman pod."""
        self._shared_dir = tempfile.mkdtemp(prefix="i2p-ntcp2-test-")
        # unittest does not call tearDown() when setUp() raises, so register
        # the directory removal immediately; a failing `podman pod create`
        # would otherwise leak the temp dir.
        self.addCleanup(self._remove_shared_dir)
        # World-writable; presumably so container processes can write their
        # result files regardless of UID mapping -- TODO confirm.
        os.chmod(self._shared_dir, 0o777)
        # Clean up any pod left over from an earlier, interrupted run.
        _run_quiet(f"podman pod rm -f {POD_NAME}")
        _run(f"podman pod create --name {POD_NAME}")

    def tearDown(self):
        """Remove the pod and the shared directory."""
        _run_quiet(f"podman pod rm -f {POD_NAME}")
        self._remove_shared_dir()

    def _remove_shared_dir(self):
        """Delete the shared directory; safe to call more than once."""
        import shutil

        shutil.rmtree(self._shared_dir, ignore_errors=True)

    def _start_container(self, name, command, env=None):
        """Start a detached ``python:3.12-slim`` container inside the test pod.

        The container mounts ``APP_DIR`` at ``/app`` and the per-test shared
        directory at ``/shared``, then runs ``PIP_CMD`` followed by *command*
        under ``bash -c``.

        Args:
            name: Container name (also used for ``podman logs`` on timeout).
            command: Shell command executed in the container after ``PIP_CMD``.
            env: Optional mapping of environment variables passed via ``-e``.

        Raises:
            RuntimeError: If ``podman run`` exits with a non-zero status.
        """
        import shlex  # local import: only this helper needs it

        quote = shlex.quote
        # Quote every host-derived value so a checkout or temp path that
        # contains spaces or shell metacharacters cannot split the command.
        env_flags = "".join(
            "-e " + quote(f"{key}={value}") + " " for key, value in (env or {}).items()
        )
        app_mount = quote(f"{APP_DIR}:/app:Z")
        shared_mount = quote(f"{self._shared_dir}:/shared:Z")
        container_script = quote(f"{PIP_CMD} && {command}")
        _run(
            f"podman run -d --pod {POD_NAME} --name {name} "
            f"-v {app_mount} -v {shared_mount} "
            f"{env_flags}"
            f"python:3.12-slim bash -c {container_script}"
        )

    def _wait_for_file(self, path, timeout=60, containers=("router-a", "router-b")):
        """Poll until *path* holds parseable JSON and return the decoded value.

        Args:
            path: Host path of the result file a container is expected to write.
            timeout: Seconds to keep polling before giving up.
            containers: Names of the containers whose ``podman logs`` output
                is printed for debugging when the wait times out.

        Returns:
            The object decoded from the JSON file.

        Raises:
            TimeoutError: If no parseable JSON appeared within *timeout* seconds.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                with open(path) as f:
                    return json.load(f)
            except (OSError, json.JSONDecodeError):
                # Missing, empty or half-written file: poll again.
                time.sleep(0.5)
        # Dump container logs so the failure can be diagnosed.
        for name in containers:
            try:
                logs = subprocess.run(
                    f"podman logs {name}",
                    shell=True,
                    capture_output=True,
                    text=True,
                    timeout=30,
                )
            except (subprocess.SubprocessError, OSError) as exc:
                # Never let log collection mask the TimeoutError below.
                print(f"\n--- {name} logs unavailable: {exc} ---")
                continue
            print(f"\n--- {name} logs ---\n{logs.stdout}\n{logs.stderr}")
        raise TimeoutError(f"File {path} not ready after {timeout}s")

    def test_two_routers_handshake_and_frame_exchange(self):
        """Two Python routers in a pod perform NTCP2 handshake and exchange frames."""
        shared = self._shared_dir

        # router-a listens on LISTENER_PORT.
        self._start_container(
            "router-a",
            "PYTHONPATH=/app/src python /app/scripts/router_listener.py "
            f"--port {LISTENER_PORT} "
            "--key-file /shared/listener_key.bin "
            "--result-file /shared/listener_result.json",
        )

        # router-b connects to localhost in the same pod.
        self._start_container(
            "router-b",
            "PYTHONPATH=/app/src python /app/scripts/router_connector.py "
            f"--host 127.0.0.1 --port {LISTENER_PORT} "
            "--key-file /shared/listener_key.bin "
            "--result-file /shared/connector_result.json",
        )

        # Wait for both results
        listener_result = self._wait_for_file(os.path.join(shared, "listener_result.json"))
        connector_result = self._wait_for_file(os.path.join(shared, "connector_result.json"))

        # Verify listener
        self.assertEqual(listener_result["status"], "ok", f"Listener failed: {listener_result}")
        self.assertEqual(listener_result["handshake"], "complete")
        self.assertEqual(listener_result["received_payload"], "hello from connector")
        self.assertTrue(listener_result["sent_reply"])

        # Verify connector
        self.assertEqual(connector_result["status"], "ok", f"Connector failed: {connector_result}")
        self.assertEqual(connector_result["handshake"], "complete")
        self.assertEqual(connector_result["received_payload"], "hello from listener")
        self.assertTrue(connector_result["sent_frame"])

    def test_router_info_exchange(self):
        """After handshake, both sides exchange ROUTER_INFO frames."""
        shared = self._shared_dir
        ri_containers = ("ri-listener", "ri-connector")

        # Both containers run the same script; ROLE picks the branch.
        script_path = os.path.join(shared, "ri_exchange.py")
        with open(script_path, "w") as f:
            f.write(self._RI_EXCHANGE_SCRIPT)

        for name, role in zip(ri_containers, ("listener", "connector")):
            self._start_container(
                name,
                "python /shared/ri_exchange.py",
                env={"ROLE": role, "PORT": LISTENER_PORT},
            )

        # Wait for results; on timeout, dump the logs of *these* containers.
        listener_result = self._wait_for_file(
            os.path.join(shared, "ri_listener_result.json"), containers=ri_containers
        )
        connector_result = self._wait_for_file(
            os.path.join(shared, "ri_connector_result.json"), containers=ri_containers
        )

        # Verify both exchanged RouterInfo frames
        self.assertEqual(listener_result["status"], "ok", f"Listener failed: {listener_result}")
        self.assertTrue(listener_result["sent_ri"])
        self.assertEqual(listener_result["received_ri_type"], FrameType.ROUTER_INFO.value)
        # Verify the payload contains the connector's router info
        ri_data = json.loads(listener_result["received_ri_payload"])
        self.assertEqual(ri_data["router"], "connector")

        self.assertEqual(connector_result["status"], "ok", f"Connector failed: {connector_result}")
        self.assertTrue(connector_result["sent_ri"])
        self.assertEqual(connector_result["received_ri_type"], FrameType.ROUTER_INFO.value)
        ri_data = json.loads(connector_result["received_ri_payload"])
        self.assertEqual(ri_data["router"], "listener")
318
319
320# Need FrameType for assertions
321from i2p_transport.ntcp2 import FrameType
322
323
# Allow running this file directly as a script; unittest.main() discovers
# and runs the test cases defined in this module.
if __name__ == "__main__":
    unittest.main()