"""Podman pod integration tests for NTCP2 network I/O. Spins up a podman pod with two Python containers that perform a real NTCP2 handshake over TCP, exchange encrypted frames, and verify results. Requirements: - podman available on the host - python:3.12-slim image available (will be pulled if not) - This test is slow (~30s) due to container startup + pip install """ import json import os import subprocess import tempfile import time import unittest def _run(cmd, **kwargs): """Run a command and return stdout, raising on failure.""" result = subprocess.run( cmd, shell=True, capture_output=True, text=True, timeout=120, **kwargs ) if result.returncode != 0: raise RuntimeError( f"Command failed: {cmd}\nstdout: {result.stdout}\nstderr: {result.stderr}" ) return result.stdout.strip() def _run_quiet(cmd, **kwargs): """Run a command, ignoring errors.""" subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=60, **kwargs) POD_NAME = "i2p-ntcp2-test" LISTENER_PORT = 6000 APP_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) # pip install is cached across tests if the container survives, # but each test creates a fresh pod so we install each time. PIP_CMD = "pip install -q cryptography 2>/dev/null" class TestNTCP2PodmanIntegration(unittest.TestCase): """Integration tests using a podman pod with two Python containers.""" @classmethod def setUpClass(cls): """Check that podman is available.""" try: _run("podman --version") except (FileNotFoundError, RuntimeError): raise unittest.SkipTest("podman not available") def setUp(self): """Create a shared tmpdir and a podman pod.""" self._shared_dir = tempfile.mkdtemp(prefix="i2p-ntcp2-test-") os.chmod(self._shared_dir, 0o777) # Clean up any leftover pod _run_quiet(f"podman pod rm -f {POD_NAME}") _run(f"podman pod create --name {POD_NAME}") def tearDown(self): """Remove the pod and shared dir.""" _run_quiet(f"podman pod rm -f {POD_NAME}") # Clean up shared dir import shutil shutil.rmtree(self._shared_dir, ignore_errors=True) def _wait_for_file(self, path, timeout=60): """Wait for a file to appear and have content.""" deadline = time.time() + timeout while time.time() < deadline: if os.path.exists(path) and os.path.getsize(path) > 0: try: with open(path) as f: return json.load(f) except (json.JSONDecodeError, IOError): pass time.sleep(0.5) # Try to get container logs for debugging for name in ["router-a", "router-b"]: logs = subprocess.run( f"podman logs {name}", shell=True, capture_output=True, text=True ) print(f"\n--- {name} logs ---\n{logs.stdout}\n{logs.stderr}") raise TimeoutError(f"File {path} not ready after {timeout}s") def test_two_routers_handshake_and_frame_exchange(self): """Two Python routers in a pod perform NTCP2 handshake and exchange frames.""" shared = self._shared_dir # Start router-a (listener) _run( f"podman run -d --pod {POD_NAME} --name router-a " f"-v {APP_DIR}:/app:Z -v {shared}:/shared:Z " f"python:3.12-slim bash -c '" f"{PIP_CMD} && PYTHONPATH=/app/src python /app/scripts/router_listener.py " f"--port {LISTENER_PORT} " f"--key-file /shared/listener_key.bin " f"--result-file /shared/listener_result.json'" ) # Start router-b (connector) — connects to localhost in same pod _run( f"podman run -d --pod {POD_NAME} --name router-b " f"-v {APP_DIR}:/app:Z -v {shared}:/shared:Z " f"python:3.12-slim bash -c '" f"{PIP_CMD} && PYTHONPATH=/app/src python /app/scripts/router_connector.py " f"--host 127.0.0.1 --port {LISTENER_PORT} " f"--key-file /shared/listener_key.bin " f"--result-file /shared/connector_result.json'" ) # Wait for both results listener_result = self._wait_for_file(os.path.join(shared, "listener_result.json")) connector_result = self._wait_for_file(os.path.join(shared, "connector_result.json")) # Verify listener self.assertEqual(listener_result["status"], "ok", f"Listener failed: {listener_result}") self.assertEqual(listener_result["handshake"], "complete") self.assertEqual(listener_result["received_payload"], "hello from connector") self.assertTrue(listener_result["sent_reply"]) # Verify connector self.assertEqual(connector_result["status"], "ok", f"Connector failed: {connector_result}") self.assertEqual(connector_result["handshake"], "complete") self.assertEqual(connector_result["received_payload"], "hello from listener") self.assertTrue(connector_result["sent_frame"]) def test_router_info_exchange(self): """After handshake, both sides exchange ROUTER_INFO frames.""" shared = self._shared_dir # For this test, we use a combined script that sends RouterInfo frames # Write a small inline script that does handshake + RouterInfo exchange script = ''' import asyncio import json import os import struct import sys import time import traceback sys.path.insert(0, "/app/src") from i2p_crypto.x25519 import X25519DH from i2p_transport.ntcp2 import NTCP2Frame, FrameType from i2p_transport.ntcp2_connection import NTCP2Connection from i2p_transport.ntcp2_handshake import NTCP2Handshake async def _send_hs(writer, msg): writer.write(struct.pack("!H", len(msg)) + msg) await writer.drain() async def _recv_hs(reader): lb = await reader.readexactly(2) length = struct.unpack("!H", lb)[0] return await reader.readexactly(length) ROLE = os.environ["ROLE"] # "listener" or "connector" PORT = int(os.environ.get("PORT", "6000")) SHARED = "/shared" async def run_listener(): static = X25519DH.generate_keypair() with open(f"{SHARED}/ri_listener_key.bin", "wb") as f: f.write(static[1]) done = asyncio.Event() result = {"status": "error"} holder = [None] async def handle(reader, writer): try: hs = NTCP2Handshake(our_static=static, peer_static_pub=None, initiator=False) msg1 = await asyncio.wait_for(_recv_hs(reader), timeout=10) msg2 = hs.process_message_1(msg1) await _send_hs(writer, msg2) msg3 = await asyncio.wait_for(_recv_hs(reader), timeout=10) hs.process_message_3(msg3) sc, rc = hs.split() conn = NTCP2Connection(reader=reader, writer=writer, cipher_send=sc, cipher_recv=rc, remote_hash=hs.remote_static_key() or b"") holder[0] = conn done.set() except Exception as e: print(f"Error: {e}", flush=True) writer.close() server = await asyncio.start_server(handle, "0.0.0.0", PORT) print(f"RI listener on port {PORT}", flush=True) await asyncio.wait_for(done.wait(), timeout=30) conn = holder[0] # Send our "RouterInfo" (simulated as ROUTER_INFO frame type) our_ri = json.dumps({"router": "listener", "key": static[1].hex()}).encode() await conn.send_frame(NTCP2Frame(FrameType.ROUTER_INFO, our_ri)) # Receive peer's RouterInfo peer_frame = await asyncio.wait_for(conn.recv_frame(), timeout=10) result = { "status": "ok", "sent_ri": True, "received_ri_type": peer_frame.frame_type.value, "received_ri_payload": peer_frame.payload.decode("utf-8", errors="replace"), } conn._writer.close() server.close() return result async def run_connector(): deadline = time.time() + 30 while time.time() < deadline: kf = f"{SHARED}/ri_listener_key.bin" if os.path.exists(kf) and os.path.getsize(kf) == 32: break await asyncio.sleep(0.2) with open(f"{SHARED}/ri_listener_key.bin", "rb") as f: peer_pub = f.read() static = X25519DH.generate_keypair() reader, writer = await asyncio.wait_for( asyncio.open_connection("127.0.0.1", PORT), timeout=10) hs = NTCP2Handshake(our_static=static, peer_static_pub=peer_pub, initiator=True) msg1 = hs.create_message_1() await _send_hs(writer, msg1) msg2 = await asyncio.wait_for(_recv_hs(reader), timeout=10) msg3 = hs.process_message_2(msg2) await _send_hs(writer, msg3) sc, rc = hs.split() conn = NTCP2Connection(reader=reader, writer=writer, cipher_send=sc, cipher_recv=rc, remote_hash=peer_pub) # Send our RouterInfo our_ri = json.dumps({"router": "connector", "key": static[1].hex()}).encode() await conn.send_frame(NTCP2Frame(FrameType.ROUTER_INFO, our_ri)) # Receive peer's RouterInfo peer_frame = await asyncio.wait_for(conn.recv_frame(), timeout=10) result = { "status": "ok", "sent_ri": True, "received_ri_type": peer_frame.frame_type.value, "received_ri_payload": peer_frame.payload.decode("utf-8", errors="replace"), } conn._writer.close() return result async def main(): try: if ROLE == "listener": result = await run_listener() else: result = await run_connector() except Exception as e: result = {"status": "error", "error": str(e)} traceback.print_exc() with open(f"{SHARED}/ri_{ROLE}_result.json", "w") as f: json.dump(result, f) print(f"Result: {result}", flush=True) asyncio.run(main()) ''' script_path = os.path.join(shared, "ri_exchange.py") with open(script_path, "w") as f: f.write(script) # Start listener _run( f"podman run -d --pod {POD_NAME} --name ri-listener " f"-v {APP_DIR}:/app:Z -v {shared}:/shared:Z " f"-e ROLE=listener -e PORT={LISTENER_PORT} " f"python:3.12-slim bash -c '" f"{PIP_CMD} && python /shared/ri_exchange.py'" ) # Start connector _run( f"podman run -d --pod {POD_NAME} --name ri-connector " f"-v {APP_DIR}:/app:Z -v {shared}:/shared:Z " f"-e ROLE=connector -e PORT={LISTENER_PORT} " f"python:3.12-slim bash -c '" f"{PIP_CMD} && python /shared/ri_exchange.py'" ) # Wait for results listener_result = self._wait_for_file(os.path.join(shared, "ri_listener_result.json")) connector_result = self._wait_for_file(os.path.join(shared, "ri_connector_result.json")) # Verify both exchanged RouterInfo frames self.assertEqual(listener_result["status"], "ok", f"Listener failed: {listener_result}") self.assertTrue(listener_result["sent_ri"]) self.assertEqual(listener_result["received_ri_type"], FrameType.ROUTER_INFO.value) # Verify the payload contains the connector's router info ri_data = json.loads(listener_result["received_ri_payload"]) self.assertEqual(ri_data["router"], "connector") self.assertEqual(connector_result["status"], "ok", f"Connector failed: {connector_result}") self.assertTrue(connector_result["sent_ri"]) self.assertEqual(connector_result["received_ri_type"], FrameType.ROUTER_INFO.value) ri_data = json.loads(connector_result["received_ri_payload"]) self.assertEqual(ri_data["router"], "listener") # Need FrameType for assertions from i2p_transport.ntcp2 import FrameType if __name__ == "__main__": unittest.main()