A Python port of the Invisible Internet Project (I2P)
1"""Router maintenance tasks — keepalive, reconnection, broadcast, eviction.
2
3These are long-running async tasks spawned after bootstrap completes.
4Each runs in a loop with a configurable interval.
5"""
6
7from __future__ import annotations
8
9import asyncio
10import logging
11import random
12import time
13
14from i2p_transport.ntcp2_blocks import padding_block
15
16logger = logging.getLogger(__name__)
17
18
async def keepalive_loop(pool, interval: float = 60.0) -> None:
    """Send Padding blocks on idle connections.

    Prevents connections from being closed by the remote peer's idle
    timeout (typically 300s). Sends a small Padding block if no frames
    have been written for at least ``interval`` seconds.

    The loop never returns on its own; it runs until the task is cancelled
    or an error escapes from the pool itself.

    Parameters
    ----------
    pool:
        ConnectionPool instance.
    interval:
        Seconds between keepalive checks. Also used as the idle threshold
        a connection must reach before padding is sent.
    """
    logger.info("Keepalive loop started (interval=%ds)", interval)
    while True:
        await asyncio.sleep(interval)
        # Iterate over a snapshot: peers may be removed below while looping.
        for peer_hash in list(pool.get_all_peer_hashes()):
            conn = pool.get(peer_hash)
            if conn is None:
                continue
            try:
                idle = conn.seconds_since_last_write()
                if idle >= interval:
                    pad_size = random.randint(0, 15)
                    await conn.send_frame([padding_block(pad_size)])
                    logger.debug(
                        "Sent keepalive padding (%d bytes) to %s...",
                        pad_size, peer_hash[:4].hex(),
                    )
            except Exception as e:
                # Report the cause (as reconnect_loop does) so a dropped peer
                # can be diagnosed from the log, then drop the connection.
                logger.info(
                    "Keepalive failed for %s..., removing: %s",
                    peer_hash[:4].hex(), e,
                )
                pool.remove(peer_hash)
52
53
async def reconnect_loop(
    pool,
    datastore,
    peer_connector,
    target_peers: int = 5,
    interval: float = 30.0,
    connect_timeout: float = 15.0,
) -> None:
    """Maintain target peer count by connecting to new peers from the NetDB.

    Every ``interval`` seconds the pool's active count is compared with
    ``target_peers``. When it is short, RouterInfo entries from the
    datastore are tried in random order until enough connections succeed
    or the candidates run out. Attempts that fail or time out are logged
    and skipped.

    The loop never returns on its own; it runs until the task is cancelled
    or an error escapes from the pool or datastore.

    Parameters
    ----------
    pool:
        ConnectionPool instance.
    datastore:
        DataStore instance for peer discovery.
    peer_connector:
        PeerConnector instance for NTCP2 handshakes.
    target_peers:
        Desired number of connected peers.
    interval:
        Seconds between reconnection checks.
    connect_timeout:
        Timeout per connection attempt.
    """
    # Resolved at call time rather than at module import
    # (presumably to avoid import cycles — TODO confirm).
    from i2p_netdb.datastore import EntryType
    from i2p_data.router import RouterInfo
    from i2p_router.peer_connector import extract_ntcp2_address

    logger.info("Reconnect loop started (target=%d, interval=%ds)", target_peers, interval)
    while True:
        await asyncio.sleep(interval)
        active = pool.active_count
        if active >= target_peers:
            continue

        needed = target_peers - active
        logger.info("Peer count %d < %d, seeking %d more", active, target_peers, needed)

        # Get candidate peers from NetDB. Shuffle a private copy:
        # random.shuffle works in place, and the object returned by
        # get_all() may still be owned by the datastore.
        candidates = list(datastore.get_all())
        random.shuffle(candidates)
        connected = 0

        for entry in candidates:
            if connected >= needed:
                break
            if entry.entry_type != EntryType.ROUTER_INFO:
                continue
            if pool.is_connected(entry.key):
                continue

            try:
                ri = RouterInfo.from_bytes(entry.data)
                params = extract_ntcp2_address(ri)
                if params is None:
                    # No NTCP2 address extracted for this router; skip it.
                    continue

                host, port_num = params[0], params[1]
                logger.info(
                    "Reconnect attempting %s:%d (hash=%s...)",
                    host, port_num, entry.key[:4].hex(),
                )
                conn = await asyncio.wait_for(
                    peer_connector.connect(ri),
                    timeout=connect_timeout,
                )
                if conn is not None:
                    pool.add(entry.key, conn)
                    connected += 1
                    logger.info(
                        "Reconnected to peer %s:%d (hash=%s...)",
                        host, port_num, entry.key[:4].hex(),
                    )
            except asyncio.TimeoutError:
                logger.info(
                    "Reconnect timed out for %s...", entry.key[:4].hex(),
                )
            except Exception as e:
                # One bad candidate must not stop the search for others.
                logger.info(
                    "Reconnect failed for %s...: %s", entry.key[:4].hex(), e,
                )

        if connected > 0:
            logger.info("Reconnected to %d peers (total: %d)", connected, pool.active_count)
141
142
async def routerinfo_broadcast_loop(
    pool,
    db_store_msg: bytes,
    interval: float = 1800.0,
) -> None:
    """Broadcast our RouterInfo to all connected peers periodically.

    After each ``interval`` sleep the same pre-built message is sent to
    every connection currently in the pool. A peer whose send raises is
    logged with the cause and removed from the pool.

    The loop never returns on its own; it runs until the task is cancelled
    or an error escapes from the pool itself.

    Parameters
    ----------
    pool:
        ConnectionPool instance.
    db_store_msg:
        Pre-built DatabaseStore I2NP message containing our RouterInfo.
    interval:
        Seconds between broadcasts (default 30 min).
    """
    logger.info("RouterInfo broadcast loop started (interval=%ds)", interval)
    while True:
        await asyncio.sleep(interval)
        sent = 0
        # Iterate over a snapshot: peers may be removed below while looping.
        for peer_hash in list(pool.get_all_peer_hashes()):
            conn = pool.get(peer_hash)
            if conn is None:
                continue
            try:
                await conn.send_i2np(db_store_msg)
            except Exception as e:
                # Report the cause (as reconnect_loop does) so a dropped peer
                # can be diagnosed from the log, then drop the connection.
                logger.info(
                    "Broadcast failed for %s..., removing: %s",
                    peer_hash[:4].hex(), e,
                )
                pool.remove(peer_hash)
            else:
                sent += 1
        if sent > 0:
            logger.info("Broadcast RouterInfo to %d peers", sent)
175
176
async def netdb_eviction_loop(
    datastore,
    sqlite_netdb=None,
    interval: float = 600.0,
    max_age_ms: int = 86_400_000,
) -> None:
    """Periodically drop stale NetDB entries from memory and from disk.

    After each ``interval`` sleep, every in-memory entry whose
    ``received_ms`` is older than ``max_age_ms`` is removed from the
    datastore. If a SQLite store was supplied, its ``evict_expired()`` is
    then called. A summary is logged whenever anything was evicted.

    Parameters
    ----------
    datastore:
        In-memory DataStore instance.
    sqlite_netdb:
        Optional SqliteNetDB instance for disk persistence.
    interval:
        Seconds between eviction runs (default 10 min).
    max_age_ms:
        Maximum entry age in milliseconds (default 24h).
    """
    max_age_hours = max_age_ms // 3_600_000
    logger.info("NetDB eviction loop started (interval=%ds, max_age=%dh)", interval, max_age_hours)
    while True:
        await asyncio.sleep(interval)

        # In-memory pass: anything received before this instant is stale.
        oldest_allowed_ms = int(time.time() * 1000) - max_age_ms
        # Keys and entries come from two separate calls and are paired
        # positionally (assumes both return the same order — TODO confirm).
        stale_keys = [
            key
            for key, entry in zip(datastore.get_all_keys(), datastore.get_all())
            if entry.received_ms < oldest_allowed_ms
        ]
        for key in stale_keys:
            datastore.remove(key)
        memory_evicted = len(stale_keys)

        # Disk pass: the SQLite store reports how many rows it evicted.
        if sqlite_netdb is None:
            disk_evicted = 0
        else:
            disk_evicted = sqlite_netdb.evict_expired()

        evicted_total = memory_evicted + disk_evicted
        if evicted_total > 0:
            logger.info(
                "Evicted %d expired NetDB entries (%d memory, %d disk)",
                evicted_total, memory_evicted, disk_evicted,
            )