# A Python port of the Invisible Internet Project (I2P)
1"""Router bootstrap orchestrator — startup sequence and main event loop.
2
3Orchestrates the full I2P router startup:
41. Load or generate router identity (Ed25519 + X25519 keys)
52. Start NTCP2 listener
63. Reseed from HTTPS servers if netdb is sparse
74. Parse RouterInfos, store in NetDB
85. Connect to initial peers
96. Exchange RouterInfo via DatabaseStore I2NP messages
107. Run the async main event loop
11
12Ported from net.i2p.router.Router bootstrap sequence.
13"""
14
15from __future__ import annotations
16
17import asyncio
18import hashlib
19import logging
20import os
21import random
22import resource
23import struct
24import time
25from dataclasses import dataclass
26from typing import TYPE_CHECKING
27
28if TYPE_CHECKING:
29 from i2p_transport.ntcp2_real_server import NTCP2RealListener
30
31from i2p_data.i2np_codec import (
32 MSG_TYPE_DATABASE_STORE,
33 encode_i2np_short,
34)
35from i2p_data.router import RouterIdentity, RouterInfo
36from i2p_netdb.datastore import DataStore, EntryType, NetDBEntry
37from i2p_netdb.reseed import ReseedClient
38from i2p_router.config import RouterConfig
39from i2p_router.core import RouterContext
40from i2p_router.identity import (
41 RouterKeyBundle,
42 build_router_identity,
43 build_router_info,
44 create_full_router_identity,
45)
46from i2p_router.peer_connector import ConnectionPool, PeerConnector, extract_ntcp2_address
47from i2p_router.health import start_health_server
48from i2p_router.maintenance import (
49 keepalive_loop,
50 reconnect_loop,
51 routerinfo_broadcast_loop,
52 netdb_eviction_loop,
53)
54from i2p_netdb.sqlite_netdb import SqliteNetDB
55
logger = logging.getLogger(__name__)

# Reseed only while the netdb holds fewer entries than this threshold.
_MIN_NETDB_FOR_RESEED: int = 50

# Number of outbound peer connections the bootstrap (and the reconnect
# maintenance loop) aims to keep established.
_INITIAL_PEER_COUNT: int = 5

# Per-peer handshake/connect timeout in seconds (enforced via asyncio.wait_for).
_PEER_CONNECT_TIMEOUT: int = 15
66
67
def build_database_store_i2np(
    router_hash: bytes, ri_bytes: bytes, *, ttl_seconds: int = 60,
) -> bytes:
    """Build an I2NP DatabaseStore message with short header.

    The DatabaseStore payload format:
        - key (32 bytes): the router hash
        - type (1 byte): 0 = RouterInfo
        - reply_token (4 bytes): 0 (no reply requested)
        - data: the RouterInfo bytes

    Args:
        router_hash: 32-byte SHA-256 hash of the RouterIdentity being stored.
        ri_bytes: Serialized RouterInfo carried as the message data.
        ttl_seconds: Lifetime of the message; the expiration stamp is
            ``now + ttl_seconds``. Defaults to 60 s, matching the previous
            hard-coded value.

    Returns:
        The full I2NP short-header message bytes.
    """
    # DatabaseStore payload: key(32) + type(1) + reply_token(4) + data.
    # type=0 (RouterInfo) and reply_token=0 (no confirmation requested).
    db_store_payload = router_hash + struct.pack("!BI", 0, 0) + ri_bytes

    # Random message id; collisions are harmless for unsolicited stores.
    msg_id = int.from_bytes(os.urandom(4), "big")
    expiration = int(time.time()) + ttl_seconds

    return encode_i2np_short(
        MSG_TYPE_DATABASE_STORE, msg_id, expiration, db_store_payload,
    )
88
89
class MessageReader:
    """Reads I2NP blocks from NTCP2 frames and dispatches to RouterContext.

    Parses the I2NP short header (type, msg_id, expiration, size, payload)
    and forwards to the RouterContext's inbound message handler.
    """

    def __init__(self, context: RouterContext) -> None:
        self._context = context

    def handle_i2np_block(self, i2np_data: bytes, bootstrap: RouterBootstrap | None = None) -> None:
        """Parse an I2NP short-header message and dispatch it.

        Args:
            i2np_data: Raw I2NP message with short header (11+ bytes).
            bootstrap: Optional bootstrap instance to increment counters.
        """
        if len(i2np_data) < 11:
            logger.warning("I2NP block too short: %d bytes", len(i2np_data))
            return

        # Short header layout: type(u8) + msg_id(u32) + expiration(u32, epoch
        # seconds) + size(u16). Expiration is an UNSIGNED field; the previous
        # "!BIiH" format read it signed, which would flip negative for
        # timestamps >= 2**31 (post-2038).
        msg_type, msg_id, expiration, size = struct.unpack_from("!BIIH", i2np_data)
        payload = i2np_data[11:11 + size]

        # Surface truncated frames: the declared size exceeds the bytes
        # actually present. We still dispatch the partial payload to preserve
        # prior behavior, but downstream parse failures now have a breadcrumb.
        if len(payload) < size:
            logger.warning(
                "I2NP payload truncated: declared %d bytes, got %d",
                size, len(payload),
            )

        if bootstrap is not None:
            bootstrap._i2np_messages_received += 1

        logger.debug(
            "I2NP message: type=%d id=%d size=%d", msg_type, msg_id, size,
        )
        self._context.process_inbound(msg_type, payload)
121
122
class RouterBootstrap:
    """Orchestrates the full I2P router startup sequence.

    Manages the lifecycle from key generation through peer discovery,
    connection establishment, and the main event loop.
    """

    def __init__(self, config: RouterConfig) -> None:
        self._config = config
        self._state = "stopped"
        self._key_bundle: RouterKeyBundle | None = None
        self._our_identity: RouterIdentity | None = None
        self._our_router_info: RouterInfo | None = None
        self._context: RouterContext | None = None
        self._conn_pool = ConnectionPool(max_connections=50)
        self._peer_connector: PeerConnector | None = None
        self._reseed_client = ReseedClient(
            reseed_urls=config.reseed_urls or None,
        )
        self._listener: NTCP2RealListener | None = None
        self._message_reader: MessageReader | None = None
        self._main_loop_task: asyncio.Task | None = None
        self._started_at: float | None = None
        self._health_server: asyncio.Server | None = None
        self._sqlite_netdb: SqliteNetDB | None = None
        self._maintenance_tasks: list[asyncio.Task] = []
        # Strong references to per-peer reader tasks. The event loop keeps
        # only weak references to tasks, so a bare asyncio.create_task()
        # result can be garbage-collected mid-flight (documented asyncio
        # pitfall); _spawn_reader() parks tasks here until they finish.
        self._reader_tasks: set[asyncio.Task] = set()
        # Counters for observability (surfaced via get_status()).
        self._peers_attempted = 0
        self._peers_successful = 0
        self._peers_failed = 0
        self._i2np_messages_received = 0
        self._i2np_messages_sent = 0

    @property
    def config(self) -> RouterConfig:
        return self._config

    @property
    def state(self) -> str:
        return self._state

    @property
    def peer_count(self) -> int:
        return self._conn_pool.active_count

    async def start(self) -> None:
        """Run the full bootstrap sequence.

        1. Load or generate identity
        2. Start NTCP2 listener
        3. Reseed if needed
        4. Connect to initial peers
        5. Exchange RouterInfo
        6. Start main loop

        Raises:
            Exception: re-raises whatever broke the bootstrap; the state is
                left at "failed" so get_status() reflects the outcome.
        """
        if self._state == "running":
            return

        self._state = "starting"
        self._started_at = time.monotonic()

        try:
            self._state = "loading_identity"
            await self._load_or_generate_identity()

            self._state = "starting_listener"
            await self._start_listener()

            # Start health server EARLY so we have visibility during reseed
            self._state = "starting_health"
            await self._start_health_server()

            self._state = "loading_netdb_cache"
            self._load_netdb_from_disk()

            self._state = "reseeding"
            await self._reseed_if_needed()
            self._persist_netdb_to_disk()

            self._state = "connecting_peers"
            await self._connect_initial_peers()

            self._state = "exchanging_routerinfo"
            await self._exchange_router_info()

            self._start_maintenance_tasks()
            self._state = "running"
            logger.info("Router bootstrap complete, state=running")
        except Exception:
            logger.exception("Bootstrap failed in state=%s", self._state)
            self._state = "failed"
            raise

    async def _load_or_generate_identity(self) -> None:
        """Load existing keys or generate new ones. Creates RouterContext."""
        data_dir = os.path.expanduser(self._config.data_dir)
        os.makedirs(data_dir, exist_ok=True)

        key_path = os.path.join(data_dir, "router.keys.json")
        bundle = RouterKeyBundle.load(key_path)

        if bundle is None:
            logger.info("No existing identity found, generating new keys")
            bundle = RouterKeyBundle.generate()
            bundle.save(key_path)
            logger.info("New identity saved to %s", key_path)
        else:
            logger.info("Loaded existing identity from %s", key_path)

        self._key_bundle = bundle

        # Build RouterIdentity and signed RouterInfo
        identity, ri = create_full_router_identity(
            bundle,
            self._config.listen_host,
            self._config.listen_port,
        )
        self._our_identity = identity
        self._our_router_info = ri

        # Create RouterContext with our identity hash
        ri_hash = hashlib.sha256(identity.to_bytes()).digest()
        self._context = RouterContext(router_hash=ri_hash)

        # Create PeerConnector with our own RouterInfo for msg3
        self._peer_connector = PeerConnector(
            our_static_keypair=(bundle.ntcp2_private, bundle.ntcp2_public),
            our_iv=bundle.ntcp2_iv,
            our_ri_bytes=ri.to_bytes(),
        )

        # Create MessageReader
        self._message_reader = MessageReader(self._context)

    async def _start_listener(self) -> None:
        """Start the NTCP2 listener for inbound connections.

        Raises:
            RuntimeError: if called before _load_or_generate_identity().
        """
        if self._key_bundle is None or self._our_router_info is None or self._our_identity is None:
            raise RuntimeError("Identity not loaded")

        from i2p_transport.ntcp2_real_server import NTCP2RealListener

        # AES key for inbound obfuscation = SHA-256 of our RouterIdentity bytes
        ri_hash = hashlib.sha256(self._our_identity.to_bytes()).digest()

        self._listener = NTCP2RealListener(
            host=self._config.listen_host,
            port=self._config.listen_port,
            our_static_key=(self._key_bundle.ntcp2_private, self._key_bundle.ntcp2_public),
            our_ri_hash=ri_hash,
            our_iv=self._key_bundle.ntcp2_iv,
            on_connection=self._handle_inbound_connection,
        )

        server = await self._listener.start()
        # Port 0 in config means "pick any"; report what the OS actually bound.
        actual_port = server.sockets[0].getsockname()[1] if server.sockets else 0
        logger.info("NTCP2 listener started on %s:%d", self._config.listen_host, actual_port)

    async def _handle_inbound_connection(self, conn) -> None:
        """Handle a new inbound NTCP2 connection."""
        peer_hash = conn.remote_hash
        if not self._conn_pool.add(peer_hash, conn):
            logger.warning("Connection pool full, rejecting inbound peer")
            await conn.close()
            return
        logger.info("Accepted inbound peer connection (hash=%s...)", peer_hash[:4].hex())
        # Start reading messages from this connection
        self._spawn_reader(peer_hash, conn)

    async def _reseed_if_needed(self) -> int:
        """Fetch RouterInfos from reseed servers if netdb is sparse.

        Returns the number of new RouterInfos stored.
        """
        if self._context is None:
            return 0

        current_count = self._context.datastore.count()
        if current_count >= _MIN_NETDB_FOR_RESEED:
            logger.info("NetDB has %d entries, skipping reseed", current_count)
            return 0

        logger.info("NetDB has %d entries (<%d), reseeding...", current_count, _MIN_NETDB_FOR_RESEED)

        try:
            ri_bytes_list = await self._reseed_client.reseed()
        except Exception:
            # Best-effort: a failed reseed leaves the router able to retry
            # later rather than aborting bootstrap.
            logger.exception("Reseed failed")
            return 0

        stored = 0
        for ri_bytes in ri_bytes_list:
            try:
                ri = RouterInfo.from_bytes(ri_bytes)
                ri_hash = hashlib.sha256(ri.identity.to_bytes()).digest()
                entry = NetDBEntry(
                    key=ri_hash,
                    entry_type=EntryType.ROUTER_INFO,
                    data=ri_bytes,
                    received_ms=int(time.time() * 1000),
                )
                self._context.datastore.put(entry)
                stored += 1
            except Exception:
                # Individual bad RouterInfos from a reseed bundle are expected;
                # skip them without failing the whole batch.
                logger.debug("Failed to parse RouterInfo from reseed", exc_info=True)
                continue

        logger.info("Stored %d/%d RouterInfos from reseed", stored, len(ri_bytes_list))
        return stored

    async def _connect_initial_peers(self) -> int:
        """Connect to peers from the netdb.

        Returns the number of successful connections.
        """
        if self._context is None or self._peer_connector is None:
            return 0

        # Get all known RouterInfos from datastore, shuffle for diversity
        all_entries = self._context.datastore.get_all()
        random.shuffle(all_entries)
        connected = 0
        attempts = 0
        max_attempts = _INITIAL_PEER_COUNT * 6  # Try up to 30 peers

        for entry in all_entries:
            if connected >= _INITIAL_PEER_COUNT or attempts >= max_attempts:
                break

            if entry.entry_type != EntryType.ROUTER_INFO:
                continue

            try:
                ri = RouterInfo.from_bytes(entry.data)
                params = extract_ntcp2_address(ri)
                if params is None:
                    continue

                # Skip if already connected
                peer_hash = entry.key
                if self._conn_pool.is_connected(peer_hash):
                    continue

                attempts += 1
                self._peers_attempted += 1
                host, port_num = params[0], params[1]
                logger.info(
                    "Attempting peer %d: %s:%d (hash=%s...)",
                    attempts, host, port_num, peer_hash[:4].hex(),
                )
                conn = await asyncio.wait_for(
                    self._peer_connector.connect(ri),
                    timeout=_PEER_CONNECT_TIMEOUT,
                )
                if conn is not None:
                    self._conn_pool.add(peer_hash, conn)
                    connected += 1
                    self._peers_successful += 1
                    logger.info(
                        "Connected to peer %d/%d: %s:%d (hash=%s...)",
                        connected, _INITIAL_PEER_COUNT, host, port_num, peer_hash[:4].hex(),
                    )
                    # Start reading messages
                    self._spawn_reader(peer_hash, conn)
                else:
                    # connect() returned None without raising: count it as a
                    # failure so the observability counters stay consistent
                    # with the attempted/successful totals.
                    self._peers_failed += 1

            except asyncio.TimeoutError:
                self._peers_failed += 1
                logger.info(
                    "Peer connection timed out after %ds: %s...",
                    _PEER_CONNECT_TIMEOUT, entry.key[:4].hex(),
                )
                continue
            except Exception as e:
                self._peers_failed += 1
                logger.info(
                    "Failed to connect to peer %s...: %s",
                    entry.key[:4].hex(), e,
                )
                logger.debug("Peer connect traceback", exc_info=True)
                continue

        logger.info(
            "Initial peer connections: %d successful, %d failed, %d attempted",
            connected, attempts - connected, attempts,
        )
        return connected

    async def _exchange_router_info(self) -> None:
        """Send our RouterInfo to all connected peers via DatabaseStore."""
        if self._our_router_info is None or self._our_identity is None:
            return

        our_ri_bytes = self._our_router_info.to_bytes()
        our_hash = hashlib.sha256(self._our_identity.to_bytes()).digest()
        db_store_msg = build_database_store_i2np(our_hash, our_ri_bytes)

        peer_hashes = self._conn_pool.get_all_peer_hashes()
        for peer_hash in peer_hashes:
            conn = self._conn_pool.get(peer_hash)
            if conn is None:
                continue
            try:
                await conn.send_i2np(db_store_msg)
                self._i2np_messages_sent += 1
                logger.debug("Sent DatabaseStore to peer %s...", peer_hash[:4].hex())
            except Exception as e:
                logger.info("Failed to send DatabaseStore to peer %s...: %s", peer_hash[:4].hex(), e)
                # NOTE(review): only the pool entry is dropped here; the
                # underlying connection is left for its reader task to reap —
                # confirm the reader observes the broken transport.
                self._conn_pool.remove(peer_hash)

    def _load_netdb_from_disk(self) -> None:
        """Load RouterInfo entries from SQLite into the in-memory DataStore.

        Raises:
            RuntimeError: if the RouterContext has not been created yet.
        """
        data_dir = os.path.expanduser(self._config.data_dir)
        self._sqlite_netdb = SqliteNetDB(data_dir)

        entries = self._sqlite_netdb.load_all()
        if not entries:
            logger.info("No cached NetDB entries on disk")
            return
        # Explicit raise instead of assert: asserts are stripped under -O,
        # and this invariant guards real state, not just a debug check.
        if self._context is None:
            raise RuntimeError("Identity not loaded")

        stored = 0
        for key, data in entries:
            entry = NetDBEntry(
                key=key,
                entry_type=EntryType.ROUTER_INFO,
                data=data,
                received_ms=int(time.time() * 1000),
            )
            self._context.datastore.put(entry)
            stored += 1

        logger.info("Loaded %d cached NetDB entries from disk", stored)

    def _persist_netdb_to_disk(self) -> None:
        """Persist current in-memory NetDB entries to SQLite."""
        if self._sqlite_netdb is None or self._context is None:
            return

        count = 0
        for entry in self._context.datastore.get_all():
            if entry.entry_type == EntryType.ROUTER_INFO:
                self._sqlite_netdb.store(entry.key, entry.data)
                count += 1

        logger.info("Persisted %d NetDB entries to disk", count)

    async def _start_health_server(self) -> None:
        """Start the HTTP health endpoint."""
        self._health_server = await start_health_server(
            get_status_fn=self.get_status,
            host=self._config.listen_host,
            port=self._config.health_port,
        )

    def _start_maintenance_tasks(self) -> None:
        """Spawn background maintenance loops.

        Raises:
            RuntimeError: if identity/context are not initialized yet.
        """
        # Explicit raise instead of assert (stripped under -O), consistent
        # with _start_listener's guard.
        if (
            self._our_router_info is None
            or self._our_identity is None
            or self._context is None
        ):
            raise RuntimeError("Cannot start maintenance tasks before identity/context exist")
        # Build the DatabaseStore message for broadcast
        our_ri_bytes = self._our_router_info.to_bytes()
        our_hash = hashlib.sha256(self._our_identity.to_bytes()).digest()
        db_store_msg = build_database_store_i2np(our_hash, our_ri_bytes)

        self._maintenance_tasks = [
            asyncio.create_task(
                keepalive_loop(self._conn_pool, interval=60.0),
                name="keepalive",
            ),
            asyncio.create_task(
                reconnect_loop(
                    self._conn_pool,
                    self._context.datastore,
                    self._peer_connector,
                    target_peers=_INITIAL_PEER_COUNT,
                    interval=30.0,
                ),
                name="reconnect",
            ),
            asyncio.create_task(
                routerinfo_broadcast_loop(
                    self._conn_pool,
                    db_store_msg,
                    interval=1800.0,
                ),
                name="broadcast",
            ),
            asyncio.create_task(
                netdb_eviction_loop(
                    self._context.datastore,
                    sqlite_netdb=self._sqlite_netdb,
                    interval=600.0,
                    max_age_ms=86_400_000,
                ),
                name="eviction",
            ),
        ]
        logger.info("Started %d maintenance tasks", len(self._maintenance_tasks))

    def _spawn_reader(self, peer_hash: bytes, conn) -> None:
        """Start a background reader task for a connection, keeping a strong ref.

        asyncio only weak-references running tasks, so the task object must be
        retained somewhere until completion or it may be garbage-collected and
        silently stop reading from the peer.
        """
        task = asyncio.create_task(self._read_peer_messages(peer_hash, conn))
        self._reader_tasks.add(task)
        task.add_done_callback(self._reader_tasks.discard)

    async def _read_peer_messages(self, peer_hash: bytes, conn) -> None:
        """Continuously read frames from a peer connection.

        Dispatches I2NP blocks to the MessageReader.
        Removes the peer from the pool on disconnect.
        """
        from i2p_transport.ntcp2_blocks import BLOCK_I2NP, BLOCK_TERMINATION

        try:
            while conn.is_alive():
                blocks = await conn.recv_frame()
                for block in blocks:
                    if block.block_type == BLOCK_I2NP and self._message_reader:
                        self._message_reader.handle_i2np_block(block.data, bootstrap=self)
                    elif block.block_type == BLOCK_TERMINATION:
                        # NTCP2 termination block: 8-byte valid-frames count,
                        # then a 1-byte reason, then optional extra data. The
                        # reason therefore lives at offset 8, not necessarily
                        # at the end; fall back to the last byte for short or
                        # non-standard blocks (matches the old behavior).
                        if len(block.data) >= 9:
                            reason = block.data[8]
                        elif block.data:
                            reason = block.data[-1]
                        else:
                            reason = -1
                        logger.info(
                            "Peer %s... sent termination (reason=%d, data=%s)",
                            peer_hash[:4].hex(), reason, block.data.hex(),
                        )
                        return
        except Exception as e:
            logger.info("Peer %s... disconnected: %s", peer_hash[:4].hex(), e)
            logger.debug("Peer %s... disconnect traceback", peer_hash[:4].hex(), exc_info=True)
        finally:
            self._conn_pool.remove(peer_hash)

    async def run_main_loop(self, duration_seconds: float = 0) -> None:
        """Run the main event loop.

        If duration_seconds > 0, runs for that duration then stops.
        If 0, runs until shutdown() is called.
        """
        if self._state != "running":
            await self.start()

        logger.info("Main loop started")

        try:
            if duration_seconds > 0:
                await asyncio.sleep(duration_seconds)
            else:
                # Run forever until cancelled
                while self._state == "running":
                    await asyncio.sleep(1)
        except asyncio.CancelledError:
            logger.info("Main loop cancelled")
        finally:
            await self.shutdown()

    async def shutdown(self) -> None:
        """Gracefully shut down the router."""
        if self._state == "stopped":
            return

        logger.info("Shutting down router...")
        self._state = "shutting_down"

        # Cancel maintenance tasks
        for task in self._maintenance_tasks:
            task.cancel()
        for task in self._maintenance_tasks:
            try:
                await task
            except asyncio.CancelledError:
                pass
            except Exception:
                # A loop that crashed before cancellation re-raises its own
                # error here; log it rather than aborting the rest of the
                # shutdown sequence.
                logger.exception("Maintenance task %s failed", task.get_name())
        self._maintenance_tasks.clear()

        # Cancel per-peer reader tasks; closing the connections below
        # unblocks any reader still awaiting a frame.
        for task in list(self._reader_tasks):
            task.cancel()
        self._reader_tasks.clear()

        # Stop health server
        if self._health_server is not None:
            self._health_server.close()
            await self._health_server.wait_closed()
            self._health_server = None

        # Close all peer connections (best effort — peers may already be gone)
        for peer_hash in list(self._conn_pool.get_all_peer_hashes()):
            conn = self._conn_pool.get(peer_hash)
            if conn is not None:
                try:
                    await conn.close()
                except Exception:
                    pass
            self._conn_pool.remove(peer_hash)

        # Stop listener
        if self._listener is not None:
            self._listener.close()
            await self._listener.wait_closed()
            self._listener = None

        # Persist NetDB to disk before exit
        self._persist_netdb_to_disk()

        # Close SQLite
        if self._sqlite_netdb is not None:
            self._sqlite_netdb.close()
            self._sqlite_netdb = None

        self._state = "stopped"
        logger.info("Router shut down")

    def get_status(self) -> dict:
        """Return current router status as a detailed dict.

        Designed to be the single source of truth during long bootstrap
        and sustained operation. Rich enough to diagnose issues from
        a curl without needing container logs.
        """
        uptime = 0.0
        if self._started_at is not None:
            uptime = time.monotonic() - self._started_at

        router_hash = ""
        if self._our_identity is not None:
            router_hash = hashlib.sha256(
                self._our_identity.to_bytes()
            ).digest().hex()

        netdb_size = 0
        netdb_ri_count = 0
        if self._context is not None:
            netdb_size = self._context.datastore.count()
            netdb_ri_count = self._context.datastore.count_by_type(EntryType.ROUTER_INFO)

        sqlite_count = 0
        if self._sqlite_netdb is not None:
            try:
                sqlite_count = self._sqlite_netdb.count()
            except Exception:
                pass

        # Per-peer connection details
        peers = []
        for peer_hash in self._conn_pool.get_all_peer_hashes():
            conn = self._conn_pool.get(peer_hash)
            if conn is None:
                continue
            peer_info = {
                "hash": peer_hash[:8].hex(),
                "alive": conn.is_alive(),
            }
            if hasattr(conn, "seconds_since_last_read"):
                peer_info["idle_read_s"] = round(conn.seconds_since_last_read(), 1)
                peer_info["idle_write_s"] = round(conn.seconds_since_last_write(), 1)
            # Private counters on the connection object, exposed best-effort.
            if hasattr(conn, "_frames_sent"):
                peer_info["frames_sent"] = conn._frames_sent
                peer_info["frames_received"] = conn._frames_received
            peers.append(peer_info)

        # Maintenance task status
        tasks_status = {}
        for task in self._maintenance_tasks:
            name = task.get_name()
            if task.done():
                exc = task.exception() if not task.cancelled() else None
                tasks_status[name] = f"stopped: {exc}" if exc else "stopped"
            else:
                tasks_status[name] = "running"

        # Resource usage
        rusage = resource.getrusage(resource.RUSAGE_SELF)
        # ru_maxrss is KB on Linux; NOTE(review): macOS reports bytes, so the
        # value would be inflated there — confirm deployment target.
        rss_mb = rusage.ru_maxrss / 1024
        try:
            import psutil
            proc = psutil.Process()
            # NOTE: cpu_percent(interval=0) reports 0.0 on the first call for
            # a fresh Process object; later polls report real usage.
            cpu_percent = proc.cpu_percent(interval=0)
            rss_mb = proc.memory_info().rss / (1024 * 1024)
        except ImportError:
            cpu_percent = None
        except Exception:
            # psutil is installed but the query failed (e.g. process-table
            # race). Status must stay best-effort: keep the rusage fallback
            # instead of letting the health endpoint 500.
            cpu_percent = None

        return {
            "state": self._state,
            "uptime_seconds": round(uptime, 1),
            "router_hash": router_hash[:16] + "..." if router_hash else "",
            "peer_count": self._conn_pool.active_count,
            "peers_attempted": self._peers_attempted,
            "peers_successful": self._peers_successful,
            "peers_failed": self._peers_failed,
            "peers": peers,
            "i2np_messages_received": self._i2np_messages_received,
            "i2np_messages_sent": self._i2np_messages_sent,
            "netdb_memory": netdb_size,
            "netdb_routerinfos": netdb_ri_count,
            "netdb_disk": sqlite_count,
            "resources": {
                "rss_mb": round(rss_mb, 1),
                "cpu_percent": cpu_percent,
                "max_rss_mb": round(rusage.ru_maxrss / 1024, 1),
                "user_time_s": round(rusage.ru_utime, 2),
                "system_time_s": round(rusage.ru_stime, 2),
            },
            "maintenance_tasks": tasks_status,
            "config": {
                "listen_port": self._config.listen_port,
                "health_port": self._config.health_port,
                "data_dir": self._config.data_dir,
                "bandwidth_limit_kbps": self._config.bandwidth_limit_kbps,
            },
        }