"""Peer connection management — address extraction, connection pooling, and NTCP2 connector. Provides utilities for connecting to I2P peers via the real NTCP2 transport: - extract_ntcp2_address(): parse NTCP2 params from a RouterInfo - ConnectionPool: track active connections by peer hash - PeerConnector: initiate NTCP2 handshakes to remote peers """ from __future__ import annotations import asyncio import hashlib import logging from dataclasses import dataclass from i2p_data.data_helper import from_base64 from i2p_transport.ntcp2_real_server import NTCP2RealConnector logger = logging.getLogger(__name__) def extract_ntcp2_address(router_info) -> tuple[str, int, bytes, bytes] | None: """Extract NTCP2 connection params from a RouterInfo. Returns (host, port, static_key_32bytes, iv_16bytes) or None. Looks for address with transport="NTCP2" and options containing "s" and "i". """ for addr in router_info.addresses: if addr.transport != "NTCP2": continue opts = addr.options s_b64 = opts.get("s") i_b64 = opts.get("i") if s_b64 is None or i_b64 is None: continue host = opts.get("host") port_str = opts.get("port") if host is None or port_str is None: continue try: static_key = from_base64(s_b64) iv = from_base64(i_b64) port = int(port_str) except (ValueError, Exception): continue return (host, port, static_key, iv) return None class ConnectionPool: """Tracks active NTCP2 connections by peer hash.""" def __init__(self, max_connections: int = 50): self._connections: dict[bytes, object] = {} self._max = max_connections def add(self, peer_hash: bytes, connection) -> bool: """Add connection. Returns False if at capacity.""" if len(self._connections) >= self._max: return False self._connections[peer_hash] = connection return True def get(self, peer_hash: bytes): """Get connection by peer hash, or None.""" return self._connections.get(peer_hash) def remove(self, peer_hash: bytes) -> None: """Remove a connection by peer hash. No-op if not present.""" self._connections.pop(peer_hash, None) def is_connected(self, peer_hash: bytes) -> bool: """Return True if peer_hash has an active connection.""" return peer_hash in self._connections def get_all_peer_hashes(self) -> list[bytes]: """Return all connected peer hashes.""" return list(self._connections.keys()) @property def active_count(self) -> int: """Number of active connections.""" return len(self._connections) class PeerConnector: """Connects to I2P peers via NTCP2. Extracts NTCP2 address from RouterInfo, initiates handshake, and returns established connection. """ def __init__( self, our_static_keypair: tuple[bytes, bytes], our_iv: bytes, our_ri_bytes: bytes | None = None, ): self._our_static = our_static_keypair self._our_iv = our_iv self._our_ri_bytes = our_ri_bytes or b"" def set_our_router_info(self, ri_bytes: bytes) -> None: """Set our own serialized RouterInfo to send in msg3.""" self._our_ri_bytes = ri_bytes async def connect(self, router_info) -> object | None: """Connect to a peer using NTCP2. 1. Extract NTCP2 address from RouterInfo 2. Create NTCP2RealConnector 3. Perform handshake (sends OUR RouterInfo in msg3) 4. Return connection (or None on failure) """ params = extract_ntcp2_address(router_info) if params is None: logger.debug("No NTCP2 address found in RouterInfo") return None host, port, peer_static_pub, peer_iv = params # Compute the SHA-256 hash of the peer's RouterIdentity for handshake obfuscation. # Java: _bobHash = new SessionKey(peer.calculateHash().getData()) # calculateHash() = SHA-256 of the RouterIdentity bytes (NOT the full RouterInfo) identity_bytes = router_info.identity.to_bytes() peer_ri_hash = hashlib.sha256(identity_bytes).digest() try: connector = NTCP2RealConnector() conn = await connector.connect( host=host, port=port, our_static_key=self._our_static, our_ri_bytes=self._our_ri_bytes, peer_static_pub=peer_static_pub, peer_ri_hash=peer_ri_hash, peer_iv=peer_iv, ) logger.info("Connected to peer at %s:%d", host, port) return conn except Exception: logger.exception("Failed to connect to peer at %s:%d", host, port) return None