"""Router bootstrap orchestrator — startup sequence and main event loop. Orchestrates the full I2P router startup: 1. Load or generate router identity (Ed25519 + X25519 keys) 2. Start NTCP2 listener 3. Reseed from HTTPS servers if netdb is sparse 4. Parse RouterInfos, store in NetDB 5. Connect to initial peers 6. Exchange RouterInfo via DatabaseStore I2NP messages 7. Run the async main event loop Ported from net.i2p.router.Router bootstrap sequence. """ from __future__ import annotations import asyncio import hashlib import logging import os import random import resource import struct import time from dataclasses import dataclass from typing import TYPE_CHECKING if TYPE_CHECKING: from i2p_transport.ntcp2_real_server import NTCP2RealListener from i2p_data.i2np_codec import ( MSG_TYPE_DATABASE_STORE, encode_i2np_short, ) from i2p_data.router import RouterIdentity, RouterInfo from i2p_netdb.datastore import DataStore, EntryType, NetDBEntry from i2p_netdb.reseed import ReseedClient from i2p_router.config import RouterConfig from i2p_router.core import RouterContext from i2p_router.identity import ( RouterKeyBundle, build_router_identity, build_router_info, create_full_router_identity, ) from i2p_router.peer_connector import ConnectionPool, PeerConnector, extract_ntcp2_address from i2p_router.health import start_health_server from i2p_router.maintenance import ( keepalive_loop, reconnect_loop, routerinfo_broadcast_loop, netdb_eviction_loop, ) from i2p_netdb.sqlite_netdb import SqliteNetDB logger = logging.getLogger(__name__) # Minimum netdb entries before we skip reseed _MIN_NETDB_FOR_RESEED = 50 # Target number of initial peer connections _INITIAL_PEER_COUNT = 5 # Timeout for individual peer connection attempts (seconds) _PEER_CONNECT_TIMEOUT = 15 def build_database_store_i2np(router_hash: bytes, ri_bytes: bytes) -> bytes: """Build an I2NP DatabaseStore message with short header. The DatabaseStore payload format: - key (32 bytes): the router hash - type (1 byte): 0 = RouterInfo - reply_token (4 bytes): 0 (no reply requested) - data: the RouterInfo bytes Returns the full I2NP short-header message. """ # DatabaseStore payload: key(32) + type(1) + reply_token(4) + data db_store_payload = router_hash + struct.pack("!BI", 0, 0) + ri_bytes msg_id = int.from_bytes(os.urandom(4), "big") expiration = int(time.time()) + 60 # 60s from now return encode_i2np_short( MSG_TYPE_DATABASE_STORE, msg_id, expiration, db_store_payload, ) class MessageReader: """Reads I2NP blocks from NTCP2 frames and dispatches to RouterContext. Parses the I2NP short header (type, msg_id, expiration, size, payload) and forwards to the RouterContext's inbound message handler. """ def __init__(self, context: RouterContext) -> None: self._context = context def handle_i2np_block(self, i2np_data: bytes, bootstrap: RouterBootstrap | None = None) -> None: """Parse an I2NP short-header message and dispatch it. Args: i2np_data: Raw I2NP message with short header (11+ bytes). bootstrap: Optional bootstrap instance to increment counters. """ if len(i2np_data) < 11: logger.warning("I2NP block too short: %d bytes", len(i2np_data)) return msg_type, msg_id, expiration, size = struct.unpack_from("!BIiH", i2np_data) payload = i2np_data[11:11 + size] if bootstrap is not None: bootstrap._i2np_messages_received += 1 logger.debug( "I2NP message: type=%d id=%d size=%d", msg_type, msg_id, size, ) self._context.process_inbound(msg_type, payload) class RouterBootstrap: """Orchestrates the full I2P router startup sequence. Manages the lifecycle from key generation through peer discovery, connection establishment, and the main event loop. """ def __init__(self, config: RouterConfig) -> None: self._config = config self._state = "stopped" self._key_bundle: RouterKeyBundle | None = None self._our_identity: RouterIdentity | None = None self._our_router_info: RouterInfo | None = None self._context: RouterContext | None = None self._conn_pool = ConnectionPool(max_connections=50) self._peer_connector: PeerConnector | None = None self._reseed_client = ReseedClient( reseed_urls=config.reseed_urls or None, ) self._listener: NTCP2RealListener | None = None self._message_reader: MessageReader | None = None self._main_loop_task: asyncio.Task | None = None self._started_at: float | None = None self._health_server: asyncio.Server | None = None self._sqlite_netdb: SqliteNetDB | None = None self._maintenance_tasks: list[asyncio.Task] = [] # Counters for observability self._peers_attempted = 0 self._peers_successful = 0 self._peers_failed = 0 self._i2np_messages_received = 0 self._i2np_messages_sent = 0 @property def config(self) -> RouterConfig: return self._config @property def state(self) -> str: return self._state @property def peer_count(self) -> int: return self._conn_pool.active_count async def start(self) -> None: """Run the full bootstrap sequence. 1. Load or generate identity 2. Start NTCP2 listener 3. Reseed if needed 4. Connect to initial peers 5. Exchange RouterInfo 6. Start main loop """ if self._state == "running": return self._state = "starting" self._started_at = time.monotonic() try: self._state = "loading_identity" await self._load_or_generate_identity() self._state = "starting_listener" await self._start_listener() # Start health server EARLY so we have visibility during reseed self._state = "starting_health" await self._start_health_server() self._state = "loading_netdb_cache" self._load_netdb_from_disk() self._state = "reseeding" await self._reseed_if_needed() self._persist_netdb_to_disk() self._state = "connecting_peers" await self._connect_initial_peers() self._state = "exchanging_routerinfo" await self._exchange_router_info() self._start_maintenance_tasks() self._state = "running" logger.info("Router bootstrap complete, state=running") except Exception: logger.exception("Bootstrap failed in state=%s", self._state) self._state = "failed" raise async def _load_or_generate_identity(self) -> None: """Load existing keys or generate new ones. Creates RouterContext.""" data_dir = os.path.expanduser(self._config.data_dir) os.makedirs(data_dir, exist_ok=True) key_path = os.path.join(data_dir, "router.keys.json") bundle = RouterKeyBundle.load(key_path) if bundle is None: logger.info("No existing identity found, generating new keys") bundle = RouterKeyBundle.generate() bundle.save(key_path) logger.info("New identity saved to %s", key_path) else: logger.info("Loaded existing identity from %s", key_path) self._key_bundle = bundle # Build RouterIdentity and signed RouterInfo identity, ri = create_full_router_identity( bundle, self._config.listen_host, self._config.listen_port, ) self._our_identity = identity self._our_router_info = ri # Create RouterContext with our identity hash ri_hash = hashlib.sha256(identity.to_bytes()).digest() self._context = RouterContext(router_hash=ri_hash) # Create PeerConnector with our own RouterInfo for msg3 self._peer_connector = PeerConnector( our_static_keypair=(bundle.ntcp2_private, bundle.ntcp2_public), our_iv=bundle.ntcp2_iv, our_ri_bytes=ri.to_bytes(), ) # Create MessageReader self._message_reader = MessageReader(self._context) async def _start_listener(self) -> None: """Start the NTCP2 listener for inbound connections.""" if self._key_bundle is None or self._our_router_info is None or self._our_identity is None: raise RuntimeError("Identity not loaded") from i2p_transport.ntcp2_real_server import NTCP2RealListener # AES key for inbound obfuscation = SHA-256 of our RouterIdentity bytes ri_hash = hashlib.sha256(self._our_identity.to_bytes()).digest() self._listener = NTCP2RealListener( host=self._config.listen_host, port=self._config.listen_port, our_static_key=(self._key_bundle.ntcp2_private, self._key_bundle.ntcp2_public), our_ri_hash=ri_hash, our_iv=self._key_bundle.ntcp2_iv, on_connection=self._handle_inbound_connection, ) server = await self._listener.start() actual_port = server.sockets[0].getsockname()[1] if server.sockets else 0 logger.info("NTCP2 listener started on %s:%d", self._config.listen_host, actual_port) async def _handle_inbound_connection(self, conn) -> None: """Handle a new inbound NTCP2 connection.""" peer_hash = conn.remote_hash if not self._conn_pool.add(peer_hash, conn): logger.warning("Connection pool full, rejecting inbound peer") await conn.close() return logger.info("Accepted inbound peer connection (hash=%s...)", peer_hash[:4].hex()) # Start reading messages from this connection asyncio.create_task(self._read_peer_messages(peer_hash, conn)) async def _reseed_if_needed(self) -> int: """Fetch RouterInfos from reseed servers if netdb is sparse. Returns the number of new RouterInfos stored. """ if self._context is None: return 0 current_count = self._context.datastore.count() if current_count >= _MIN_NETDB_FOR_RESEED: logger.info("NetDB has %d entries, skipping reseed", current_count) return 0 logger.info("NetDB has %d entries (<%d), reseeding...", current_count, _MIN_NETDB_FOR_RESEED) try: ri_bytes_list = await self._reseed_client.reseed() except Exception: logger.exception("Reseed failed") return 0 stored = 0 for ri_bytes in ri_bytes_list: try: ri = RouterInfo.from_bytes(ri_bytes) ri_hash = hashlib.sha256(ri.identity.to_bytes()).digest() entry = NetDBEntry( key=ri_hash, entry_type=EntryType.ROUTER_INFO, data=ri_bytes, received_ms=int(time.time() * 1000), ) self._context.datastore.put(entry) stored += 1 except Exception: logger.debug("Failed to parse RouterInfo from reseed", exc_info=True) continue logger.info("Stored %d/%d RouterInfos from reseed", stored, len(ri_bytes_list)) return stored async def _connect_initial_peers(self) -> int: """Connect to peers from the netdb. Returns the number of successful connections. """ if self._context is None or self._peer_connector is None: return 0 # Get all known RouterInfos from datastore, shuffle for diversity all_entries = self._context.datastore.get_all() random.shuffle(all_entries) connected = 0 attempts = 0 max_attempts = _INITIAL_PEER_COUNT * 6 # Try up to 30 peers for entry in all_entries: if connected >= _INITIAL_PEER_COUNT or attempts >= max_attempts: break if entry.entry_type != EntryType.ROUTER_INFO: continue try: ri = RouterInfo.from_bytes(entry.data) params = extract_ntcp2_address(ri) if params is None: continue # Skip if already connected peer_hash = entry.key if self._conn_pool.is_connected(peer_hash): continue attempts += 1 self._peers_attempted += 1 host, port_num = params[0], params[1] logger.info( "Attempting peer %d: %s:%d (hash=%s...)", attempts, host, port_num, peer_hash[:4].hex(), ) conn = await asyncio.wait_for( self._peer_connector.connect(ri), timeout=_PEER_CONNECT_TIMEOUT, ) if conn is not None: self._conn_pool.add(peer_hash, conn) connected += 1 self._peers_successful += 1 logger.info( "Connected to peer %d/%d: %s:%d (hash=%s...)", connected, _INITIAL_PEER_COUNT, host, port_num, peer_hash[:4].hex(), ) # Start reading messages asyncio.create_task(self._read_peer_messages(peer_hash, conn)) except asyncio.TimeoutError: self._peers_failed += 1 logger.info( "Peer connection timed out after %ds: %s...", _PEER_CONNECT_TIMEOUT, entry.key[:4].hex(), ) continue except Exception as e: self._peers_failed += 1 logger.info( "Failed to connect to peer %s...: %s", entry.key[:4].hex(), e, ) logger.debug("Peer connect traceback", exc_info=True) continue logger.info( "Initial peer connections: %d successful, %d failed, %d attempted", connected, attempts - connected, attempts, ) return connected async def _exchange_router_info(self) -> None: """Send our RouterInfo to all connected peers via DatabaseStore.""" if self._our_router_info is None or self._our_identity is None: return our_ri_bytes = self._our_router_info.to_bytes() our_hash = hashlib.sha256(self._our_identity.to_bytes()).digest() db_store_msg = build_database_store_i2np(our_hash, our_ri_bytes) peer_hashes = self._conn_pool.get_all_peer_hashes() for peer_hash in peer_hashes: conn = self._conn_pool.get(peer_hash) if conn is None: continue try: await conn.send_i2np(db_store_msg) self._i2np_messages_sent += 1 logger.debug("Sent DatabaseStore to peer %s...", peer_hash[:4].hex()) except Exception as e: logger.info("Failed to send DatabaseStore to peer %s...: %s", peer_hash[:4].hex(), e) self._conn_pool.remove(peer_hash) def _load_netdb_from_disk(self) -> None: """Load RouterInfo entries from SQLite into the in-memory DataStore.""" data_dir = os.path.expanduser(self._config.data_dir) self._sqlite_netdb = SqliteNetDB(data_dir) entries = self._sqlite_netdb.load_all() if not entries: logger.info("No cached NetDB entries on disk") return assert self._context is not None stored = 0 for key, data in entries: entry = NetDBEntry( key=key, entry_type=EntryType.ROUTER_INFO, data=data, received_ms=int(time.time() * 1000), ) self._context.datastore.put(entry) stored += 1 logger.info("Loaded %d cached NetDB entries from disk", stored) def _persist_netdb_to_disk(self) -> None: """Persist current in-memory NetDB entries to SQLite.""" if self._sqlite_netdb is None or self._context is None: return count = 0 for entry in self._context.datastore.get_all(): if entry.entry_type == EntryType.ROUTER_INFO: self._sqlite_netdb.store(entry.key, entry.data) count += 1 logger.info("Persisted %d NetDB entries to disk", count) async def _start_health_server(self) -> None: """Start the HTTP health endpoint.""" self._health_server = await start_health_server( get_status_fn=self.get_status, host=self._config.listen_host, port=self._config.health_port, ) def _start_maintenance_tasks(self) -> None: """Spawn background maintenance loops.""" assert self._our_router_info is not None and self._our_identity is not None assert self._context is not None # Build the DatabaseStore message for broadcast our_ri_bytes = self._our_router_info.to_bytes() our_hash = hashlib.sha256(self._our_identity.to_bytes()).digest() db_store_msg = build_database_store_i2np(our_hash, our_ri_bytes) self._maintenance_tasks = [ asyncio.create_task( keepalive_loop(self._conn_pool, interval=60.0), name="keepalive", ), asyncio.create_task( reconnect_loop( self._conn_pool, self._context.datastore, self._peer_connector, target_peers=_INITIAL_PEER_COUNT, interval=30.0, ), name="reconnect", ), asyncio.create_task( routerinfo_broadcast_loop( self._conn_pool, db_store_msg, interval=1800.0, ), name="broadcast", ), asyncio.create_task( netdb_eviction_loop( self._context.datastore, sqlite_netdb=self._sqlite_netdb, interval=600.0, max_age_ms=86_400_000, ), name="eviction", ), ] logger.info("Started %d maintenance tasks", len(self._maintenance_tasks)) async def _read_peer_messages(self, peer_hash: bytes, conn) -> None: """Continuously read frames from a peer connection. Dispatches I2NP blocks to the MessageReader. Removes the peer from the pool on disconnect. """ from i2p_transport.ntcp2_blocks import BLOCK_I2NP, BLOCK_TERMINATION try: while conn.is_alive(): blocks = await conn.recv_frame() for block in blocks: if block.block_type == BLOCK_I2NP and self._message_reader: self._message_reader.handle_i2np_block(block.data, bootstrap=self) elif block.block_type == BLOCK_TERMINATION: reason = block.data[-1] if block.data else -1 logger.info( "Peer %s... sent termination (reason=%d, data=%s)", peer_hash[:4].hex(), reason, block.data.hex(), ) return except Exception as e: logger.info("Peer %s... disconnected: %s", peer_hash[:4].hex(), e) logger.debug("Peer %s... disconnect traceback", peer_hash[:4].hex(), exc_info=True) finally: self._conn_pool.remove(peer_hash) async def run_main_loop(self, duration_seconds: float = 0) -> None: """Run the main event loop. If duration_seconds > 0, runs for that duration then stops. If 0, runs until shutdown() is called. """ if self._state != "running": await self.start() logger.info("Main loop started") try: if duration_seconds > 0: await asyncio.sleep(duration_seconds) else: # Run forever until cancelled while self._state == "running": await asyncio.sleep(1) except asyncio.CancelledError: logger.info("Main loop cancelled") finally: await self.shutdown() async def shutdown(self) -> None: """Gracefully shut down the router.""" if self._state == "stopped": return logger.info("Shutting down router...") self._state = "shutting_down" # Cancel maintenance tasks for task in self._maintenance_tasks: task.cancel() for task in self._maintenance_tasks: try: await task except asyncio.CancelledError: pass self._maintenance_tasks.clear() # Stop health server if self._health_server is not None: self._health_server.close() await self._health_server.wait_closed() self._health_server = None # Close all peer connections for peer_hash in list(self._conn_pool.get_all_peer_hashes()): conn = self._conn_pool.get(peer_hash) if conn is not None: try: await conn.close() except Exception: pass self._conn_pool.remove(peer_hash) # Stop listener if self._listener is not None: self._listener.close() await self._listener.wait_closed() self._listener = None # Persist NetDB to disk before exit self._persist_netdb_to_disk() # Close SQLite if self._sqlite_netdb is not None: self._sqlite_netdb.close() self._sqlite_netdb = None self._state = "stopped" logger.info("Router shut down") def get_status(self) -> dict: """Return current router status as a detailed dict. Designed to be the single source of truth during long bootstrap and sustained operation. Rich enough to diagnose issues from a curl without needing container logs. """ uptime = 0.0 if self._started_at is not None: uptime = time.monotonic() - self._started_at router_hash = "" if self._our_identity is not None: router_hash = hashlib.sha256( self._our_identity.to_bytes() ).digest().hex() netdb_size = 0 netdb_ri_count = 0 if self._context is not None: netdb_size = self._context.datastore.count() netdb_ri_count = self._context.datastore.count_by_type(EntryType.ROUTER_INFO) sqlite_count = 0 if self._sqlite_netdb is not None: try: sqlite_count = self._sqlite_netdb.count() except Exception: pass # Per-peer connection details peers = [] for peer_hash in self._conn_pool.get_all_peer_hashes(): conn = self._conn_pool.get(peer_hash) if conn is None: continue peer_info = { "hash": peer_hash[:8].hex(), "alive": conn.is_alive(), } if hasattr(conn, "seconds_since_last_read"): peer_info["idle_read_s"] = round(conn.seconds_since_last_read(), 1) peer_info["idle_write_s"] = round(conn.seconds_since_last_write(), 1) if hasattr(conn, "_frames_sent"): peer_info["frames_sent"] = conn._frames_sent peer_info["frames_received"] = conn._frames_received peers.append(peer_info) # Maintenance task status tasks_status = {} for task in self._maintenance_tasks: name = task.get_name() if task.done(): exc = task.exception() if not task.cancelled() else None tasks_status[name] = f"stopped: {exc}" if exc else "stopped" else: tasks_status[name] = "running" # Resource usage rusage = resource.getrusage(resource.RUSAGE_SELF) rss_mb = rusage.ru_maxrss / 1024 # Linux reports in KB try: import psutil proc = psutil.Process() cpu_percent = proc.cpu_percent(interval=0) rss_mb = proc.memory_info().rss / (1024 * 1024) except ImportError: cpu_percent = None return { "state": self._state, "uptime_seconds": round(uptime, 1), "router_hash": router_hash[:16] + "..." if router_hash else "", "peer_count": self._conn_pool.active_count, "peers_attempted": self._peers_attempted, "peers_successful": self._peers_successful, "peers_failed": self._peers_failed, "peers": peers, "i2np_messages_received": self._i2np_messages_received, "i2np_messages_sent": self._i2np_messages_sent, "netdb_memory": netdb_size, "netdb_routerinfos": netdb_ri_count, "netdb_disk": sqlite_count, "resources": { "rss_mb": round(rss_mb, 1), "cpu_percent": cpu_percent, "max_rss_mb": round(rusage.ru_maxrss / 1024, 1), "user_time_s": round(rusage.ru_utime, 2), "system_time_s": round(rusage.ru_stime, 2), }, "maintenance_tasks": tasks_status, "config": { "listen_port": self._config.listen_port, "health_port": self._config.health_port, "data_dir": self._config.data_dir, "bandwidth_limit_kbps": self._config.bandwidth_limit_kbps, }, }