"""Diagnostic: connect to a single peer, dump all details.""" import asyncio import base64 import hashlib import logging import random import sys import time sys.path.insert(0, "src") from i2p_data.router import RouterInfo from i2p_data.keys_and_cert import KeysAndCert from i2p_netdb.reseed import ReseedClient from i2p_router.identity import ( RouterKeyBundle, create_full_router_identity, ) from i2p_router.peer_connector import extract_ntcp2_address from i2p_transport.ntcp2_real_server import NTCP2RealConnector from i2p_transport.ntcp2_blocks import BLOCK_TERMINATION, decode_blocks logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(levelname)s %(name)s %(message)s") logger = logging.getLogger("diag") async def main(): # 1. Generate keys bundle = RouterKeyBundle.generate() identity, ri = create_full_router_identity(bundle, "0.0.0.0", 9000) our_ri_bytes = ri.to_bytes() logger.info("=== OUR ROUTERINFO ===") logger.info("RI total bytes: %d", len(our_ri_bytes)) logger.info("Identity bytes: %d", len(identity.to_bytes())) logger.info("Signable bytes: %d", len(ri._signable_bytes())) logger.info("Signature bytes: %d", len(ri.signature)) logger.info("Self-verify: %s", ri.verify()) logger.info("Options: %s", ri.options) logger.info("Addresses: %s", [(a.transport, a.options) for a in ri.addresses]) # Roundtrip check ri2 = RouterInfo.from_bytes(our_ri_bytes) signable1 = ri._signable_bytes() signable2 = ri2._signable_bytes() logger.info("Roundtrip signable match: %s", signable1 == signable2) logger.info("Roundtrip verify: %s", ri2.verify()) # Show identity hex id_bytes = identity.to_bytes() logger.info("Identity cert: %s", id_bytes[384:].hex()) logger.info("Signing pub: %s", bundle.signing_public.hex()) logger.info("Enc pub (X25519): %s", bundle.ntcp2_public.hex()) # Show full RI hex logger.info("Full RI hex:\n%s", our_ri_bytes.hex()) # 2. Reseed to get peers logger.info("=== RESEEDING ===") client = ReseedClient(target_count=10, min_servers=1, timeout=15) ri_list = await client.reseed() logger.info("Got %d RIs from reseed", len(ri_list)) # 3. Try to connect to peers connector_keypair = (bundle.ntcp2_private, bundle.ntcp2_public) peer_ri_hash_for_connect = hashlib.sha256(identity.to_bytes()).digest() random.shuffle(ri_list) attempts = 0 for peer_bytes in ri_list: if attempts >= 5: break try: peer_ri = RouterInfo.from_bytes(peer_bytes) params = extract_ntcp2_address(peer_ri) if params is None: continue host, port, peer_static_pub, peer_iv = params peer_identity_bytes = peer_ri.identity.to_bytes() peer_hash = hashlib.sha256(peer_identity_bytes).digest() attempts += 1 logger.info("=== CONNECTING TO %s:%d (hash=%s...) ===", host, port, peer_hash[:4].hex()) logger.info("Peer static pub: %s", peer_static_pub.hex()) logger.info("Peer IV: %s", peer_iv.hex()) logger.info("Peer RI hash: %s", peer_hash.hex()) logger.info("Peer RI valid: %s", peer_ri.verify()) logger.info("Peer options: %s", peer_ri.options) connector = NTCP2RealConnector() try: conn = await asyncio.wait_for( connector.connect( host=host, port=port, our_static_key=connector_keypair, our_ri_bytes=our_ri_bytes, peer_static_pub=peer_static_pub, peer_ri_hash=peer_hash, peer_iv=peer_iv, ), timeout=15, ) logger.info("CONNECTED! Reading first frame...") try: blocks = await asyncio.wait_for(conn.recv_frame(), timeout=5) for block in blocks: logger.info("Block type=%d len=%d data=%s", block.block_type, len(block.data), block.data.hex()) if block.block_type == BLOCK_TERMINATION: reason = block.data[-1] if block.data else -1 logger.info("TERMINATION reason=%d", reason) except asyncio.TimeoutError: logger.info("No frame received within 5s (peer might be OK!)") except Exception as e: logger.info("Error reading frame: %s", e) await conn.close() except asyncio.TimeoutError: logger.info("Connection timed out") except Exception as e: logger.info("Connection failed: %s", e, exc_info=True) except Exception as e: logger.debug("Skip peer: %s", e) continue logger.info("=== DONE ===") if __name__ == "__main__": asyncio.run(main())