"""Full NTCP2 connection test: handshake + data exchange.

Reseeds a batch of peer RouterInfos, attempts outbound NTCP2 connections to a
few of them and logs the first frames each peer sends after the handshake.
"""

import asyncio
import base64
import hashlib
import logging
import random
import struct
import sys
import time

# Must run before the project imports below so they resolve from ./src.
sys.path.insert(0, "src")

from i2p_data.router import RouterInfo
from i2p_netdb.reseed import ReseedClient
from i2p_router.identity import (
    RouterKeyBundle,
    create_full_router_identity,
)
from i2p_router.peer_connector import extract_ntcp2_address
from i2p_transport.ntcp2_real_server import NTCP2RealConnector
from i2p_transport.ntcp2_blocks import (
    BLOCK_DATETIME,
    BLOCK_I2NP,
    BLOCK_ROUTERINFO,
    BLOCK_TERMINATION,
    BLOCK_PADDING,
    BLOCK_OPTIONS,
)

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s")
logger = logging.getLogger("full")

BLOCK_NAMES = {
    BLOCK_DATETIME: "DateTime",
    BLOCK_I2NP: "I2NP",
    BLOCK_ROUTERINFO: "RouterInfo",
    BLOCK_TERMINATION: "Termination",
    BLOCK_PADDING: "Padding",
    BLOCK_OPTIONS: "Options",
}

# Limits for one test run.
MAX_CONNECTIONS = 3      # stop after this many successful handshakes
MAX_ATTEMPTS = 10        # ...or after this many connection attempts
MAX_FRAMES = 5           # frames to read from each connected peer
CONNECT_TIMEOUT = 10     # seconds allowed for the handshake
FRAME_TIMEOUT = 10       # seconds allowed for each frame read

# NTCP2 Termination payload: 8-byte valid-frames counter, 1-byte reason,
# then optional additional data -- so the reason sits at offset 8.
# NOTE(review): assumes block.data is the payload without the 3-byte block
# header, as the RouterInfo/DateTime handling below also assumes.
_TERMINATION_REASON_OFFSET = 8


def _termination_reason(data) -> int:
    """Return the reason code of a Termination block payload, or -1 if empty.

    Payloads too short to carry the frame counter fall back to their last
    byte, which is what this script reported before.
    """
    if len(data) > _TERMINATION_REASON_OFFSET:
        return data[_TERMINATION_REASON_OFFSET]
    return data[-1] if data else -1


def _log_block(frame_no: int, block) -> None:
    """Log one received block plus a type-specific detail line."""
    data = block.data
    kind = block.block_type
    name = BLOCK_NAMES.get(kind, f"Unknown({kind})")
    logger.info(" Frame %d: %s (%d bytes)", frame_no, name, len(data))

    if kind == BLOCK_TERMINATION:
        logger.info(" -> Termination reason=%d", _termination_reason(data))
    elif kind == BLOCK_I2NP and len(data) >= 11:
        logger.info(" -> I2NP msg_type=%d", data[0])
    elif kind == BLOCK_ROUTERINFO and len(data) > 1:
        # First byte is a flag; the RouterInfo itself follows it.
        try:
            peer_ri = RouterInfo.from_bytes(data[1:])
            logger.info(" -> RI: %d addrs, v%s, verify=%s",
                        len(peer_ri.addresses),
                        peer_ri.options.get("router.version", "?"),
                        peer_ri.verify())
        except Exception as e:
            logger.info(" -> RI parse error: %s", e)
    elif kind == BLOCK_DATETIME:
        if len(data) >= 4:
            (ts,) = struct.unpack("!I", data[:4])
            logger.info(" -> DateTime: %d", ts)
        else:
            # A short block must not abort reading the remaining frames.
            logger.info(" -> DateTime: truncated (%d bytes)", len(data))


async def _read_frames(conn) -> None:
    """Read and log up to MAX_FRAMES frames from an established connection."""
    frames_read = 0
    try:
        while frames_read < MAX_FRAMES:
            blocks = await asyncio.wait_for(conn.recv_frame(), timeout=FRAME_TIMEOUT)
            frames_read += 1
            for block in blocks:
                _log_block(frames_read, block)
    except asyncio.TimeoutError:
        logger.info(" No more frames within timeout (read %d frames)", frames_read)
    except Exception as e:
        logger.info(" Error reading frame: %s", e)


async def _connect_and_dump(host, port, peer_static_pub, peer_iv, peer_hash,
                            keypair, our_ri_bytes) -> bool:
    """Handshake with one peer and log its first frames.

    Returns True if the handshake completed, False if it timed out or failed.
    The connection is always closed once it has been established.
    """
    try:
        connector = NTCP2RealConnector()
        conn = await asyncio.wait_for(
            connector.connect(
                host=host,
                port=port,
                our_static_key=keypair,
                our_ri_bytes=our_ri_bytes,
                peer_static_pub=peer_static_pub,
                peer_ri_hash=peer_hash,
                peer_iv=peer_iv,
            ),
            timeout=CONNECT_TIMEOUT,
        )
    except asyncio.TimeoutError:
        logger.info("Connection timed out")
        return False
    except Exception as e:
        logger.info("Connection failed: %s", e)
        return False

    logger.info("CONNECTED to %s:%d!", host, port)
    try:
        await _read_frames(conn)
    finally:
        # Best-effort close; runs even if the task is cancelled mid-read.
        try:
            await conn.close()
        except Exception as e:
            logger.debug("Error closing connection to %s:%d: %s", host, port, e)
    return True


async def main():
    """Reseed, then try NTCP2 handshakes against random peers and log results."""
    bundle = RouterKeyBundle.generate()
    _identity, ri = create_full_router_identity(bundle, "0.0.0.0", 9000)
    our_ri_bytes = ri.to_bytes()
    logger.info("Our RI: %d bytes, verify=%s", len(our_ri_bytes), ri.verify())

    # Reseed
    client = ReseedClient(target_count=20, min_servers=1, timeout=15)
    ri_list = await client.reseed()
    logger.info("Got %d peer RIs from reseed", len(ri_list))

    connector_keypair = (bundle.ntcp2_private, bundle.ntcp2_public)

    random.shuffle(ri_list)
    connected = 0
    attempts = 0

    for peer_bytes in ri_list:
        if connected >= MAX_CONNECTIONS or attempts >= MAX_ATTEMPTS:
            break

        # Peers that cannot be parsed or lack a usable NTCP2 address are
        # skipped without counting as an attempt.
        try:
            peer_ri = RouterInfo.from_bytes(peer_bytes)
            params = extract_ntcp2_address(peer_ri)
            if params is None:
                continue
            host, port, peer_static_pub, peer_iv = params
            if len(peer_static_pub) != 32 or len(peer_iv) != 16:
                continue
            peer_hash = hashlib.sha256(peer_ri.identity.to_bytes()).digest()
            version = peer_ri.options.get("router.version", "?")
            caps = peer_ri.options.get("caps", "?")
        except Exception as e:
            logger.debug("Skipping unusable peer RouterInfo: %s", e)
            continue

        attempts += 1
        logger.info("Connecting to %s:%d (v%s, caps=%s)...", host, port, version, caps)

        if await _connect_and_dump(host, port, peer_static_pub, peer_iv,
                                   peer_hash, connector_keypair, our_ri_bytes):
            connected += 1

    logger.info("=== DONE: %d/%d connections successful ===", connected, attempts)


if __name__ == "__main__":
    asyncio.run(main())