"""Enhanced diagnostic: connect to peers with detailed byte-level debugging.""" import asyncio import base64 import hashlib import logging import random import struct import sys import time sys.path.insert(0, "src") from i2p_data.router import RouterInfo from i2p_netdb.reseed import ReseedClient from i2p_router.identity import ( RouterKeyBundle, create_full_router_identity, ) from i2p_router.peer_connector import extract_ntcp2_address from i2p_transport.ntcp2_real_handshake import NTCP2RealHandshake from i2p_transport.ntcp2_blocks import BLOCK_TERMINATION, BLOCK_ROUTERINFO, decode_blocks logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(levelname)s %(name)s %(message)s") logger = logging.getLogger("diag2") async def raw_connect(host, port, our_static_key, our_ri_bytes, peer_static_pub, peer_ri_hash, peer_iv): """Manual NTCP2 handshake with raw byte debugging at every step.""" reader, writer = await asyncio.open_connection(host, port) try: hs = NTCP2RealHandshake( our_static=our_static_key, peer_static_pub=peer_static_pub, peer_ri_hash=peer_ri_hash, peer_iv=peer_iv, initiator=True, ) # --- MSG1 --- msg1 = hs.create_session_request(padding_len=0, router_info=our_ri_bytes) logger.info("MSG1: %d bytes, msg3p2len=%d", len(msg1), hs._msg3p2len) writer.write(msg1) await writer.drain() # --- MSG2 header (64 bytes) --- msg2_header = await asyncio.wait_for(reader.readexactly(64), timeout=10) logger.info("MSG2 header: %d bytes", len(msg2_header)) padlen2 = hs.process_session_created_header(msg2_header) logger.info("MSG2 padlen2=%d", padlen2) if padlen2 > 0: msg2_padding = await reader.readexactly(padlen2) else: msg2_padding = b"" hs.process_session_created_padding(msg2_padding) logger.info("MSG2 processed OK") # --- MSG3 --- msg3 = hs.create_session_confirmed(router_info=our_ri_bytes) logger.info("MSG3: %d bytes (part1=48, part2=%d)", len(msg3), len(msg3) - 48) logger.info("MSG3 expected part2=%d, actual part2=%d", hs._msg3p2len, len(msg3) - 48) writer.write(msg3) await writer.drain() logger.info("MSG3 sent, draining...") # --- POST-HANDSHAKE: try raw read --- # Give peer a moment to process msg3 await asyncio.sleep(0.5) # Try to read raw bytes to see what (if anything) comes back try: raw_data = await asyncio.wait_for(reader.read(4096), timeout=5) if raw_data: logger.info("RAW POST-HANDSHAKE: got %d bytes: %s", len(raw_data), raw_data[:64].hex()) # Try SipHash deobfuscation keys = hs.split() if len(raw_data) >= 2: obf_len = raw_data[:2] frame_len = keys.recv_siphash.deobfuscate_length(obf_len) logger.info("SipHash deobfuscated length: %d", frame_len) if frame_len <= len(raw_data) - 2: encrypted = raw_data[2:2 + frame_len] try: plaintext = keys.recv_cipher.decrypt_with_ad(b"", encrypted) blocks = decode_blocks(plaintext) for block in blocks: logger.info("Block type=%d len=%d", block.block_type, len(block.data)) if block.block_type == BLOCK_TERMINATION: reason = block.data[-1] if block.data else -1 logger.info("TERMINATION reason=%d", reason) except Exception as e: logger.info("Decrypt failed: %s", e) else: logger.info("Need %d more bytes for frame", frame_len - (len(raw_data) - 2)) else: logger.info("RAW POST-HANDSHAKE: 0 bytes (peer closed connection)") except asyncio.TimeoutError: logger.info("RAW POST-HANDSHAKE: timeout (peer sent nothing in 5s)") writer.close() await writer.wait_closed() except Exception as e: logger.error("Connection failed: %s", e, exc_info=True) writer.close() async def main(): # Generate keys bundle = RouterKeyBundle.generate() identity, ri = create_full_router_identity(bundle, "0.0.0.0", 9000) our_ri_bytes = ri.to_bytes() logger.info("=== OUR ROUTERINFO ===") logger.info("RI total bytes: %d", len(our_ri_bytes)) logger.info("Self-verify: %s", ri.verify()) logger.info("Options: %s", ri.options) # Roundtrip check ri2 = RouterInfo.from_bytes(our_ri_bytes) signable1 = ri._signable_bytes() signable2 = ri2._signable_bytes() logger.info("Roundtrip signable match: %s", signable1 == signable2) logger.info("Roundtrip verify: %s", ri2.verify()) logger.info("Roundtrip to_bytes match: %s", our_ri_bytes == ri2.to_bytes()) # Show the RI block as it would appear in msg3 from i2p_transport.ntcp2_blocks import router_info_block, options_block, padding_block, encode_blocks blocks = [ router_info_block(our_ri_bytes), options_block(), padding_block(0), ] block_payload = encode_blocks(blocks) logger.info("MSG3 block payload size: %d", len(block_payload)) logger.info("MSG3 expected p2 len (payload+16): %d", len(block_payload) + 16) # Parse the block payload back to verify it's valid parsed_blocks = decode_blocks(block_payload) logger.info("Parsed %d blocks from msg3 payload", len(parsed_blocks)) for b in parsed_blocks: logger.info(" Block type=%d datalen=%d", b.block_type, len(b.data)) # Verify the RI from the block matches ri_block = parsed_blocks[0] ri_flag = ri_block.data[0] ri_data = ri_block.data[1:] logger.info("RI block flag=%d, ri_data len=%d", ri_flag, len(ri_data)) ri3 = RouterInfo.from_bytes(ri_data) logger.info("RI from msg3 block verify: %s", ri3.verify()) logger.info("RI from msg3 to_bytes match: %s", ri_data == ri3.to_bytes()) # Reseed logger.info("=== RESEEDING ===") client = ReseedClient(target_count=10, min_servers=1, timeout=15) ri_list = await client.reseed() logger.info("Got %d RIs from reseed", len(ri_list)) # Check what enc_types the peers use for peer_bytes in ri_list[:5]: try: peer_ri = RouterInfo.from_bytes(peer_bytes) cert = peer_ri.identity.certificate from i2p_data.certificate import KeyCertificate if isinstance(cert, KeyCertificate): enc_type = cert.get_enc_type() sig_type = cert.get_sig_type() logger.info("Peer cert: enc_type=%s sig_type=%s version=%s", enc_type, sig_type, peer_ri.options.get("router.version", "?")) except Exception as e: logger.debug("Skip peer cert check: %s", e) # Try to connect to peers connector_keypair = (bundle.ntcp2_private, bundle.ntcp2_public) random.shuffle(ri_list) attempts = 0 for peer_bytes in ri_list: if attempts >= 5: break try: peer_ri = RouterInfo.from_bytes(peer_bytes) params = extract_ntcp2_address(peer_ri) if params is None: continue host, port, peer_static_pub, peer_iv = params # Fix 15-byte IVs by right-padding with 0x00 if len(peer_iv) < 16: logger.info("Fixing short IV: %d bytes -> 16 bytes", len(peer_iv)) peer_iv = peer_iv + b"\x00" * (16 - len(peer_iv)) elif len(peer_iv) > 16: logger.info("Truncating long IV: %d bytes -> 16 bytes", len(peer_iv)) peer_iv = peer_iv[:16] peer_identity_bytes = peer_ri.identity.to_bytes() peer_hash = hashlib.sha256(peer_identity_bytes).digest() attempts += 1 logger.info("=== CONNECTING #%d TO %s:%d (hash=%s...) ===", attempts, host, port, peer_hash[:4].hex()) logger.info("Peer version=%s caps=%s", peer_ri.options.get("router.version", "?"), peer_ri.options.get("caps", "?")) logger.info("Peer static pub: %s", peer_static_pub.hex()) logger.info("Peer IV (%d bytes): %s", len(peer_iv), peer_iv.hex()) logger.info("Peer RI valid: %s", peer_ri.verify()) # Check peer's enc_type from i2p_data.certificate import KeyCertificate cert = peer_ri.identity.certificate if isinstance(cert, KeyCertificate): logger.info("Peer enc_type=%s sig_type=%s", cert.get_enc_type(), cert.get_sig_type()) await raw_connect( host=host, port=port, our_static_key=connector_keypair, our_ri_bytes=our_ri_bytes, peer_static_pub=peer_static_pub, peer_ri_hash=peer_hash, peer_iv=peer_iv, ) except Exception as e: logger.debug("Skip peer: %s", e) continue logger.info("=== DONE ===") if __name__ == "__main__": asyncio.run(main())