"""Router maintenance tasks — keepalive, reconnection, broadcast, eviction. These are long-running async tasks spawned after bootstrap completes. Each runs in a loop with a configurable interval. """ from __future__ import annotations import asyncio import logging import random import time from i2p_transport.ntcp2_blocks import padding_block logger = logging.getLogger(__name__) async def keepalive_loop(pool, interval: float = 60.0) -> None: """Send Padding blocks on idle connections. Prevents connections from being closed by the remote peer's idle timeout (typically 300s). Sends a small Padding block if no frames have been written recently. Parameters ---------- pool: ConnectionPool instance. interval: Seconds between keepalive checks. """ logger.info("Keepalive loop started (interval=%ds)", interval) while True: await asyncio.sleep(interval) for peer_hash in list(pool.get_all_peer_hashes()): conn = pool.get(peer_hash) if conn is None: continue try: idle = conn.seconds_since_last_write() if idle >= interval: pad_size = random.randint(0, 15) await conn.send_frame([padding_block(pad_size)]) logger.debug( "Sent keepalive padding (%d bytes) to %s...", pad_size, peer_hash[:4].hex(), ) except Exception: logger.info("Keepalive failed for %s..., removing", peer_hash[:4].hex()) pool.remove(peer_hash) async def reconnect_loop( pool, datastore, peer_connector, target_peers: int = 5, interval: float = 30.0, connect_timeout: float = 15.0, ) -> None: """Maintain target peer count by connecting to new peers from the NetDB. Parameters ---------- pool: ConnectionPool instance. datastore: DataStore instance for peer discovery. peer_connector: PeerConnector instance for NTCP2 handshakes. target_peers: Desired number of connected peers. interval: Seconds between reconnection checks. connect_timeout: Timeout per connection attempt. """ from i2p_netdb.datastore import EntryType from i2p_data.router import RouterInfo from i2p_router.peer_connector import extract_ntcp2_address logger.info("Reconnect loop started (target=%d, interval=%ds)", target_peers, interval) while True: await asyncio.sleep(interval) active = pool.active_count if active >= target_peers: continue needed = target_peers - active logger.info("Peer count %d < %d, seeking %d more", active, target_peers, needed) # Get candidate peers from NetDB all_entries = datastore.get_all() random.shuffle(all_entries) connected = 0 for entry in all_entries: if connected >= needed: break if entry.entry_type != EntryType.ROUTER_INFO: continue if pool.is_connected(entry.key): continue try: ri = RouterInfo.from_bytes(entry.data) params = extract_ntcp2_address(ri) if params is None: continue host, port_num = params[0], params[1] logger.info( "Reconnect attempting %s:%d (hash=%s...)", host, port_num, entry.key[:4].hex(), ) conn = await asyncio.wait_for( peer_connector.connect(ri), timeout=connect_timeout, ) if conn is not None: pool.add(entry.key, conn) connected += 1 logger.info( "Reconnected to peer %s:%d (hash=%s...)", host, port_num, entry.key[:4].hex(), ) except asyncio.TimeoutError: logger.info( "Reconnect timed out for %s...", entry.key[:4].hex(), ) continue except Exception as e: logger.info( "Reconnect failed for %s...: %s", entry.key[:4].hex(), e, ) continue if connected > 0: logger.info("Reconnected to %d peers (total: %d)", connected, pool.active_count) async def routerinfo_broadcast_loop( pool, db_store_msg: bytes, interval: float = 1800.0, ) -> None: """Broadcast our RouterInfo to all connected peers periodically. Parameters ---------- pool: ConnectionPool instance. db_store_msg: Pre-built DatabaseStore I2NP message containing our RouterInfo. interval: Seconds between broadcasts (default 30 min). """ logger.info("RouterInfo broadcast loop started (interval=%ds)", interval) while True: await asyncio.sleep(interval) sent = 0 for peer_hash in list(pool.get_all_peer_hashes()): conn = pool.get(peer_hash) if conn is None: continue try: await conn.send_i2np(db_store_msg) sent += 1 except Exception: logger.info("Broadcast failed for %s..., removing", peer_hash[:4].hex()) pool.remove(peer_hash) if sent > 0: logger.info("Broadcast RouterInfo to %d peers", sent) async def netdb_eviction_loop( datastore, sqlite_netdb=None, interval: float = 600.0, max_age_ms: int = 86_400_000, ) -> None: """Evict expired entries from the in-memory DataStore and SQLite cache. Parameters ---------- datastore: In-memory DataStore instance. sqlite_netdb: Optional SqliteNetDB instance for disk persistence. interval: Seconds between eviction runs (default 10 min). max_age_ms: Maximum entry age in milliseconds (default 24h). """ logger.info("NetDB eviction loop started (interval=%ds, max_age=%dh)", interval, max_age_ms // 3_600_000) while True: await asyncio.sleep(interval) # Evict from in-memory store now_ms = int(time.time() * 1000) cutoff = now_ms - max_age_ms expired_keys = [ k for k, e in list(zip(datastore.get_all_keys(), datastore.get_all())) if e.received_ms < cutoff ] for k in expired_keys: datastore.remove(k) # Evict from SQLite sqlite_count = 0 if sqlite_netdb is not None: sqlite_count = sqlite_netdb.evict_expired() total = len(expired_keys) + sqlite_count if total > 0: logger.info( "Evicted %d expired NetDB entries (%d memory, %d disk)", total, len(expired_keys), sqlite_count, )