A Python port of the Invisible Internet Project (I2P)
at main 325 lines 12 kB view raw
1"""Podman pod integration tests for NTCP2 network I/O. 2 3Spins up a podman pod with two Python containers that perform a real 4NTCP2 handshake over TCP, exchange encrypted frames, and verify results. 5 6Requirements: 7 - podman available on the host 8 - python:3.12-slim image available (will be pulled if not) 9 - This test is slow (~30s) due to container startup + pip install 10""" 11 12import json 13import os 14import subprocess 15import tempfile 16import time 17import unittest 18 19 20def _run(cmd, **kwargs): 21 """Run a command and return stdout, raising on failure.""" 22 result = subprocess.run( 23 cmd, shell=True, capture_output=True, text=True, timeout=120, **kwargs 24 ) 25 if result.returncode != 0: 26 raise RuntimeError( 27 f"Command failed: {cmd}\nstdout: {result.stdout}\nstderr: {result.stderr}" 28 ) 29 return result.stdout.strip() 30 31 32def _run_quiet(cmd, **kwargs): 33 """Run a command, ignoring errors.""" 34 subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=60, **kwargs) 35 36 37POD_NAME = "i2p-ntcp2-test" 38LISTENER_PORT = 6000 39APP_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) 40 41# pip install is cached across tests if the container survives, 42# but each test creates a fresh pod so we install each time. 43PIP_CMD = "pip install -q cryptography 2>/dev/null" 44 45 46class TestNTCP2PodmanIntegration(unittest.TestCase): 47 """Integration tests using a podman pod with two Python containers.""" 48 49 @classmethod 50 def setUpClass(cls): 51 """Check that podman is available.""" 52 try: 53 _run("podman --version") 54 except (FileNotFoundError, RuntimeError): 55 raise unittest.SkipTest("podman not available") 56 57 def setUp(self): 58 """Create a shared tmpdir and a podman pod.""" 59 self._shared_dir = tempfile.mkdtemp(prefix="i2p-ntcp2-test-") 60 os.chmod(self._shared_dir, 0o777) 61 # Clean up any leftover pod 62 _run_quiet(f"podman pod rm -f {POD_NAME}") 63 _run(f"podman pod create --name {POD_NAME}") 64 65 def tearDown(self): 66 """Remove the pod and shared dir.""" 67 _run_quiet(f"podman pod rm -f {POD_NAME}") 68 # Clean up shared dir 69 import shutil 70 shutil.rmtree(self._shared_dir, ignore_errors=True) 71 72 def _wait_for_file(self, path, timeout=60): 73 """Wait for a file to appear and have content.""" 74 deadline = time.time() + timeout 75 while time.time() < deadline: 76 if os.path.exists(path) and os.path.getsize(path) > 0: 77 try: 78 with open(path) as f: 79 return json.load(f) 80 except (json.JSONDecodeError, IOError): 81 pass 82 time.sleep(0.5) 83 # Try to get container logs for debugging 84 for name in ["router-a", "router-b"]: 85 logs = subprocess.run( 86 f"podman logs {name}", shell=True, capture_output=True, text=True 87 ) 88 print(f"\n--- {name} logs ---\n{logs.stdout}\n{logs.stderr}") 89 raise TimeoutError(f"File {path} not ready after {timeout}s") 90 91 def test_two_routers_handshake_and_frame_exchange(self): 92 """Two Python routers in a pod perform NTCP2 handshake and exchange frames.""" 93 shared = self._shared_dir 94 95 # Start router-a (listener) 96 _run( 97 f"podman run -d --pod {POD_NAME} --name router-a " 98 f"-v {APP_DIR}:/app:Z -v {shared}:/shared:Z " 99 f"python:3.12-slim bash -c '" 100 f"{PIP_CMD} && PYTHONPATH=/app/src python /app/scripts/router_listener.py " 101 f"--port {LISTENER_PORT} " 102 f"--key-file /shared/listener_key.bin " 103 f"--result-file /shared/listener_result.json'" 104 ) 105 106 # Start router-b (connector) — connects to localhost in same pod 107 _run( 108 f"podman run -d --pod {POD_NAME} --name router-b " 109 f"-v {APP_DIR}:/app:Z -v {shared}:/shared:Z " 110 f"python:3.12-slim bash -c '" 111 f"{PIP_CMD} && PYTHONPATH=/app/src python /app/scripts/router_connector.py " 112 f"--host 127.0.0.1 --port {LISTENER_PORT} " 113 f"--key-file /shared/listener_key.bin " 114 f"--result-file /shared/connector_result.json'" 115 ) 116 117 # Wait for both results 118 listener_result = self._wait_for_file(os.path.join(shared, "listener_result.json")) 119 connector_result = self._wait_for_file(os.path.join(shared, "connector_result.json")) 120 121 # Verify listener 122 self.assertEqual(listener_result["status"], "ok", f"Listener failed: {listener_result}") 123 self.assertEqual(listener_result["handshake"], "complete") 124 self.assertEqual(listener_result["received_payload"], "hello from connector") 125 self.assertTrue(listener_result["sent_reply"]) 126 127 # Verify connector 128 self.assertEqual(connector_result["status"], "ok", f"Connector failed: {connector_result}") 129 self.assertEqual(connector_result["handshake"], "complete") 130 self.assertEqual(connector_result["received_payload"], "hello from listener") 131 self.assertTrue(connector_result["sent_frame"]) 132 133 def test_router_info_exchange(self): 134 """After handshake, both sides exchange ROUTER_INFO frames.""" 135 shared = self._shared_dir 136 137 # For this test, we use a combined script that sends RouterInfo frames 138 # Write a small inline script that does handshake + RouterInfo exchange 139 script = ''' 140import asyncio 141import json 142import os 143import struct 144import sys 145import time 146import traceback 147 148sys.path.insert(0, "/app/src") 149 150from i2p_crypto.x25519 import X25519DH 151from i2p_transport.ntcp2 import NTCP2Frame, FrameType 152from i2p_transport.ntcp2_connection import NTCP2Connection 153from i2p_transport.ntcp2_handshake import NTCP2Handshake 154 155 156async def _send_hs(writer, msg): 157 writer.write(struct.pack("!H", len(msg)) + msg) 158 await writer.drain() 159 160async def _recv_hs(reader): 161 lb = await reader.readexactly(2) 162 length = struct.unpack("!H", lb)[0] 163 return await reader.readexactly(length) 164 165 166ROLE = os.environ["ROLE"] # "listener" or "connector" 167PORT = int(os.environ.get("PORT", "6000")) 168SHARED = "/shared" 169 170 171async def run_listener(): 172 static = X25519DH.generate_keypair() 173 with open(f"{SHARED}/ri_listener_key.bin", "wb") as f: 174 f.write(static[1]) 175 176 done = asyncio.Event() 177 result = {"status": "error"} 178 holder = [None] 179 180 async def handle(reader, writer): 181 try: 182 hs = NTCP2Handshake(our_static=static, peer_static_pub=None, initiator=False) 183 msg1 = await asyncio.wait_for(_recv_hs(reader), timeout=10) 184 msg2 = hs.process_message_1(msg1) 185 await _send_hs(writer, msg2) 186 msg3 = await asyncio.wait_for(_recv_hs(reader), timeout=10) 187 hs.process_message_3(msg3) 188 sc, rc = hs.split() 189 conn = NTCP2Connection(reader=reader, writer=writer, 190 cipher_send=sc, cipher_recv=rc, 191 remote_hash=hs.remote_static_key() or b"") 192 holder[0] = conn 193 done.set() 194 except Exception as e: 195 print(f"Error: {e}", flush=True) 196 writer.close() 197 198 server = await asyncio.start_server(handle, "0.0.0.0", PORT) 199 print(f"RI listener on port {PORT}", flush=True) 200 201 await asyncio.wait_for(done.wait(), timeout=30) 202 conn = holder[0] 203 204 # Send our "RouterInfo" (simulated as ROUTER_INFO frame type) 205 our_ri = json.dumps({"router": "listener", "key": static[1].hex()}).encode() 206 await conn.send_frame(NTCP2Frame(FrameType.ROUTER_INFO, our_ri)) 207 208 # Receive peer's RouterInfo 209 peer_frame = await asyncio.wait_for(conn.recv_frame(), timeout=10) 210 211 result = { 212 "status": "ok", 213 "sent_ri": True, 214 "received_ri_type": peer_frame.frame_type.value, 215 "received_ri_payload": peer_frame.payload.decode("utf-8", errors="replace"), 216 } 217 conn._writer.close() 218 server.close() 219 return result 220 221 222async def run_connector(): 223 deadline = time.time() + 30 224 while time.time() < deadline: 225 kf = f"{SHARED}/ri_listener_key.bin" 226 if os.path.exists(kf) and os.path.getsize(kf) == 32: 227 break 228 await asyncio.sleep(0.2) 229 230 with open(f"{SHARED}/ri_listener_key.bin", "rb") as f: 231 peer_pub = f.read() 232 233 static = X25519DH.generate_keypair() 234 reader, writer = await asyncio.wait_for( 235 asyncio.open_connection("127.0.0.1", PORT), timeout=10) 236 237 hs = NTCP2Handshake(our_static=static, peer_static_pub=peer_pub, initiator=True) 238 msg1 = hs.create_message_1() 239 await _send_hs(writer, msg1) 240 msg2 = await asyncio.wait_for(_recv_hs(reader), timeout=10) 241 msg3 = hs.process_message_2(msg2) 242 await _send_hs(writer, msg3) 243 sc, rc = hs.split() 244 conn = NTCP2Connection(reader=reader, writer=writer, 245 cipher_send=sc, cipher_recv=rc, remote_hash=peer_pub) 246 247 # Send our RouterInfo 248 our_ri = json.dumps({"router": "connector", "key": static[1].hex()}).encode() 249 await conn.send_frame(NTCP2Frame(FrameType.ROUTER_INFO, our_ri)) 250 251 # Receive peer's RouterInfo 252 peer_frame = await asyncio.wait_for(conn.recv_frame(), timeout=10) 253 254 result = { 255 "status": "ok", 256 "sent_ri": True, 257 "received_ri_type": peer_frame.frame_type.value, 258 "received_ri_payload": peer_frame.payload.decode("utf-8", errors="replace"), 259 } 260 conn._writer.close() 261 return result 262 263 264async def main(): 265 try: 266 if ROLE == "listener": 267 result = await run_listener() 268 else: 269 result = await run_connector() 270 except Exception as e: 271 result = {"status": "error", "error": str(e)} 272 traceback.print_exc() 273 with open(f"{SHARED}/ri_{ROLE}_result.json", "w") as f: 274 json.dump(result, f) 275 print(f"Result: {result}", flush=True) 276 277asyncio.run(main()) 278''' 279 script_path = os.path.join(shared, "ri_exchange.py") 280 with open(script_path, "w") as f: 281 f.write(script) 282 283 # Start listener 284 _run( 285 f"podman run -d --pod {POD_NAME} --name ri-listener " 286 f"-v {APP_DIR}:/app:Z -v {shared}:/shared:Z " 287 f"-e ROLE=listener -e PORT={LISTENER_PORT} " 288 f"python:3.12-slim bash -c '" 289 f"{PIP_CMD} && python /shared/ri_exchange.py'" 290 ) 291 292 # Start connector 293 _run( 294 f"podman run -d --pod {POD_NAME} --name ri-connector " 295 f"-v {APP_DIR}:/app:Z -v {shared}:/shared:Z " 296 f"-e ROLE=connector -e PORT={LISTENER_PORT} " 297 f"python:3.12-slim bash -c '" 298 f"{PIP_CMD} && python /shared/ri_exchange.py'" 299 ) 300 301 # Wait for results 302 listener_result = self._wait_for_file(os.path.join(shared, "ri_listener_result.json")) 303 connector_result = self._wait_for_file(os.path.join(shared, "ri_connector_result.json")) 304 305 # Verify both exchanged RouterInfo frames 306 self.assertEqual(listener_result["status"], "ok", f"Listener failed: {listener_result}") 307 self.assertTrue(listener_result["sent_ri"]) 308 self.assertEqual(listener_result["received_ri_type"], FrameType.ROUTER_INFO.value) 309 # Verify the payload contains the connector's router info 310 ri_data = json.loads(listener_result["received_ri_payload"]) 311 self.assertEqual(ri_data["router"], "connector") 312 313 self.assertEqual(connector_result["status"], "ok", f"Connector failed: {connector_result}") 314 self.assertTrue(connector_result["sent_ri"]) 315 self.assertEqual(connector_result["received_ri_type"], FrameType.ROUTER_INFO.value) 316 ri_data = json.loads(connector_result["received_ri_payload"]) 317 self.assertEqual(ri_data["router"], "listener") 318 319 320# Need FrameType for assertions 321from i2p_transport.ntcp2 import FrameType 322 323 324if __name__ == "__main__": 325 unittest.main()