# A Python port of the Invisible Internet Project (I2P)
# at main 220 lines 7.1 kB view raw
1"""Router maintenance tasks — keepalive, reconnection, broadcast, eviction. 2 3These are long-running async tasks spawned after bootstrap completes. 4Each runs in a loop with a configurable interval. 5""" 6 7from __future__ import annotations 8 9import asyncio 10import logging 11import random 12import time 13 14from i2p_transport.ntcp2_blocks import padding_block 15 16logger = logging.getLogger(__name__) 17 18 19async def keepalive_loop(pool, interval: float = 60.0) -> None: 20 """Send Padding blocks on idle connections. 21 22 Prevents connections from being closed by the remote peer's idle 23 timeout (typically 300s). Sends a small Padding block if no frames 24 have been written recently. 25 26 Parameters 27 ---------- 28 pool: 29 ConnectionPool instance. 30 interval: 31 Seconds between keepalive checks. 32 """ 33 logger.info("Keepalive loop started (interval=%ds)", interval) 34 while True: 35 await asyncio.sleep(interval) 36 for peer_hash in list(pool.get_all_peer_hashes()): 37 conn = pool.get(peer_hash) 38 if conn is None: 39 continue 40 try: 41 idle = conn.seconds_since_last_write() 42 if idle >= interval: 43 pad_size = random.randint(0, 15) 44 await conn.send_frame([padding_block(pad_size)]) 45 logger.debug( 46 "Sent keepalive padding (%d bytes) to %s...", 47 pad_size, peer_hash[:4].hex(), 48 ) 49 except Exception: 50 logger.info("Keepalive failed for %s..., removing", peer_hash[:4].hex()) 51 pool.remove(peer_hash) 52 53 54async def reconnect_loop( 55 pool, 56 datastore, 57 peer_connector, 58 target_peers: int = 5, 59 interval: float = 30.0, 60 connect_timeout: float = 15.0, 61) -> None: 62 """Maintain target peer count by connecting to new peers from the NetDB. 63 64 Parameters 65 ---------- 66 pool: 67 ConnectionPool instance. 68 datastore: 69 DataStore instance for peer discovery. 70 peer_connector: 71 PeerConnector instance for NTCP2 handshakes. 72 target_peers: 73 Desired number of connected peers. 74 interval: 75 Seconds between reconnection checks. 
76 connect_timeout: 77 Timeout per connection attempt. 78 """ 79 from i2p_netdb.datastore import EntryType 80 from i2p_data.router import RouterInfo 81 from i2p_router.peer_connector import extract_ntcp2_address 82 83 logger.info("Reconnect loop started (target=%d, interval=%ds)", target_peers, interval) 84 while True: 85 await asyncio.sleep(interval) 86 active = pool.active_count 87 if active >= target_peers: 88 continue 89 90 needed = target_peers - active 91 logger.info("Peer count %d < %d, seeking %d more", active, target_peers, needed) 92 93 # Get candidate peers from NetDB 94 all_entries = datastore.get_all() 95 random.shuffle(all_entries) 96 connected = 0 97 98 for entry in all_entries: 99 if connected >= needed: 100 break 101 if entry.entry_type != EntryType.ROUTER_INFO: 102 continue 103 if pool.is_connected(entry.key): 104 continue 105 106 try: 107 ri = RouterInfo.from_bytes(entry.data) 108 params = extract_ntcp2_address(ri) 109 if params is None: 110 continue 111 112 host, port_num = params[0], params[1] 113 logger.info( 114 "Reconnect attempting %s:%d (hash=%s...)", 115 host, port_num, entry.key[:4].hex(), 116 ) 117 conn = await asyncio.wait_for( 118 peer_connector.connect(ri), 119 timeout=connect_timeout, 120 ) 121 if conn is not None: 122 pool.add(entry.key, conn) 123 connected += 1 124 logger.info( 125 "Reconnected to peer %s:%d (hash=%s...)", 126 host, port_num, entry.key[:4].hex(), 127 ) 128 except asyncio.TimeoutError: 129 logger.info( 130 "Reconnect timed out for %s...", entry.key[:4].hex(), 131 ) 132 continue 133 except Exception as e: 134 logger.info( 135 "Reconnect failed for %s...: %s", entry.key[:4].hex(), e, 136 ) 137 continue 138 139 if connected > 0: 140 logger.info("Reconnected to %d peers (total: %d)", connected, pool.active_count) 141 142 143async def routerinfo_broadcast_loop( 144 pool, 145 db_store_msg: bytes, 146 interval: float = 1800.0, 147) -> None: 148 """Broadcast our RouterInfo to all connected peers periodically. 
149 150 Parameters 151 ---------- 152 pool: 153 ConnectionPool instance. 154 db_store_msg: 155 Pre-built DatabaseStore I2NP message containing our RouterInfo. 156 interval: 157 Seconds between broadcasts (default 30 min). 158 """ 159 logger.info("RouterInfo broadcast loop started (interval=%ds)", interval) 160 while True: 161 await asyncio.sleep(interval) 162 sent = 0 163 for peer_hash in list(pool.get_all_peer_hashes()): 164 conn = pool.get(peer_hash) 165 if conn is None: 166 continue 167 try: 168 await conn.send_i2np(db_store_msg) 169 sent += 1 170 except Exception: 171 logger.info("Broadcast failed for %s..., removing", peer_hash[:4].hex()) 172 pool.remove(peer_hash) 173 if sent > 0: 174 logger.info("Broadcast RouterInfo to %d peers", sent) 175 176 177async def netdb_eviction_loop( 178 datastore, 179 sqlite_netdb=None, 180 interval: float = 600.0, 181 max_age_ms: int = 86_400_000, 182) -> None: 183 """Evict expired entries from the in-memory DataStore and SQLite cache. 184 185 Parameters 186 ---------- 187 datastore: 188 In-memory DataStore instance. 189 sqlite_netdb: 190 Optional SqliteNetDB instance for disk persistence. 191 interval: 192 Seconds between eviction runs (default 10 min). 193 max_age_ms: 194 Maximum entry age in milliseconds (default 24h). 
195 """ 196 logger.info("NetDB eviction loop started (interval=%ds, max_age=%dh)", interval, max_age_ms // 3_600_000) 197 while True: 198 await asyncio.sleep(interval) 199 200 # Evict from in-memory store 201 now_ms = int(time.time() * 1000) 202 cutoff = now_ms - max_age_ms 203 expired_keys = [ 204 k for k, e in list(zip(datastore.get_all_keys(), datastore.get_all())) 205 if e.received_ms < cutoff 206 ] 207 for k in expired_keys: 208 datastore.remove(k) 209 210 # Evict from SQLite 211 sqlite_count = 0 212 if sqlite_netdb is not None: 213 sqlite_count = sqlite_netdb.evict_expired() 214 215 total = len(expired_keys) + sqlite_count 216 if total > 0: 217 logger.info( 218 "Evicted %d expired NetDB entries (%d memory, %d disk)", 219 total, len(expired_keys), sqlite_count, 220 )