# A Python port of the Invisible Internet Project (I2P)
1"""NetDB Kademlia lookup/store operations — pure state machines, no network I/O.""" 2 3from __future__ import annotations 4 5import time 6from dataclasses import dataclass, field 7from uuid import uuid4 8 9from i2p_netdb.floodfill import FloodfillManager 10 11 12def _xor_distance(a: bytes, b: bytes) -> int: 13 """Compute XOR distance between two 32-byte keys as an integer.""" 14 return int.from_bytes(bytes(x ^ y for x, y in zip(a, b)), 'big') 15 16 17class SearchOperation: 18 """Iterative Kademlia search for a target key across known peers. 19 20 Pure state machine: call start() to get initial peers to query, 21 then feed replies via on_reply() to drive the search forward. 22 """ 23 24 def __init__(self, target_key: bytes, known_peers: dict[bytes, bytes] | None = None, 25 k: int = 8): 26 self._target_key = target_key 27 self._known_peers: dict[bytes, bytes] = dict(known_peers) if known_peers else {} 28 self._k = k 29 self._queried: set[bytes] = set() 30 self._result: bytes | None = None 31 self._found = False 32 self._started = False 33 34 def start(self) -> list[bytes]: 35 """Select the k closest known peers to target and return their hashes for querying.""" 36 self._started = True 37 return self._select_unqueried_closest() 38 39 def on_reply(self, peer_hash: bytes, found_data: bytes | None, 40 closer_peers: dict[bytes, bytes] | None = None) -> list[bytes]: 41 """Process a reply from a queried peer. 42 43 If found_data is not None, the search is complete. 44 If closer_peers are provided, merge them and return new peers to query. 
45 """ 46 self._queried.add(peer_hash) 47 48 if found_data is not None: 49 self._result = found_data 50 self._found = True 51 return [] 52 53 if closer_peers: 54 for h, data in closer_peers.items(): 55 if h not in self._known_peers: 56 self._known_peers[h] = data 57 58 return self._select_unqueried_closest() 59 60 def is_complete(self) -> bool: 61 """True if target was found or all reachable peers have been queried.""" 62 if self._found: 63 return True 64 if not self._started: 65 return False 66 # Check if there are any unqueried peers in our k-closest set 67 unqueried = [h for h in self._known_peers if h not in self._queried] 68 return len(unqueried) == 0 69 70 def get_result(self) -> bytes | None: 71 """Return the found data, or None if not found.""" 72 return self._result 73 74 def get_queried(self) -> set[bytes]: 75 """Return the set of peers already queried.""" 76 return set(self._queried) 77 78 def _select_unqueried_closest(self) -> list[bytes]: 79 """Return up to k closest unqueried peers sorted by XOR distance.""" 80 unqueried = [h for h in self._known_peers if h not in self._queried] 81 unqueried.sort(key=lambda h: _xor_distance(h, self._target_key)) 82 return unqueried[:self._k] 83 84 85class StoreOperation: 86 """Kademlia store operation — send data to target peers and track confirmations. 87 88 Pure state machine: call start() to get store tuples, then feed 89 acknowledgments via on_ack(). 
90 """ 91 92 def __init__(self, key: bytes, data: bytes, target_peers: list[bytes], 93 redundancy: int = 3): 94 self._key = key 95 self._data = data 96 self._target_peers = list(target_peers) 97 self._redundancy = redundancy 98 self._confirmed: set[bytes] = set() 99 100 def start(self) -> list[tuple[bytes, bytes, bytes]]: 101 """Return list of (peer_hash, key, data) tuples for sending store messages.""" 102 return [(peer, self._key, self._data) for peer in self._target_peers] 103 104 def on_ack(self, peer_hash: bytes): 105 """Record a storage confirmation from a peer.""" 106 self._confirmed.add(peer_hash) 107 108 def is_complete(self) -> bool: 109 """True if enough peers have confirmed storage.""" 110 return len(self._confirmed) >= self._redundancy 111 112 def get_confirmed_peers(self) -> set[bytes]: 113 """Return set of peers that have confirmed storage.""" 114 return set(self._confirmed) 115 116 def get_pending_peers(self) -> set[bytes]: 117 """Return set of peers that have not yet confirmed.""" 118 return set(self._target_peers) - self._confirmed 119 120 121# --------------------------------------------------------------------------- 122# Enhanced NetDB operations: SearchJob, StoreJob, RouterInfoCache, 123# NetDBOperationManager 124# --------------------------------------------------------------------------- 125 126 127@dataclass 128class SearchJob: 129 """Tracks an in-progress NetDB search.""" 130 131 search_id: str 132 target_hash: bytes 133 started_at: float 134 replies: list[dict] = field(default_factory=list) 135 complete: bool = False 136 137 138@dataclass 139class StoreJob: 140 """Tracks an in-progress NetDB store.""" 141 142 store_id: str 143 data_hash: bytes 144 data: bytes 145 started_at: float 146 acked: bool = False 147 148 149class RouterInfoCache: 150 """Cache for RouterInfo dicts with time-based expiration.""" 151 152 def __init__(self, ttl_seconds: float = 3600.0) -> None: 153 self._entries: dict[bytes, tuple[dict, float]] = {} 154 self._ttl = 
ttl_seconds 155 156 def get(self, peer_hash: bytes) -> dict | None: 157 """Return cached info dict, or None if missing or expired.""" 158 entry = self._entries.get(peer_hash) 159 if entry is None: 160 return None 161 info, stored_at = entry 162 if time.monotonic() - stored_at > self._ttl: 163 return None 164 return info 165 166 def put(self, peer_hash: bytes, info: dict) -> None: 167 """Store a RouterInfo dict with the current timestamp.""" 168 self._entries[peer_hash] = (info, time.monotonic()) 169 170 def is_expired(self, peer_hash: bytes) -> bool: 171 """Check whether an entry exists and is expired.""" 172 entry = self._entries.get(peer_hash) 173 if entry is None: 174 return False 175 _, stored_at = entry 176 return time.monotonic() - stored_at > self._ttl 177 178 def cleanup_expired(self) -> int: 179 """Remove all expired entries. Returns count of entries removed.""" 180 now = time.monotonic() 181 expired_keys = [ 182 k for k, (_, stored_at) in self._entries.items() 183 if now - stored_at > self._ttl 184 ] 185 for k in expired_keys: 186 del self._entries[k] 187 return len(expired_keys) 188 189 190class NetDBOperationManager: 191 """Orchestrates NetDB lookup and store operations.""" 192 193 def __init__(self, floodfill_mgr: FloodfillManager, cache: RouterInfoCache) -> None: 194 self._floodfill_mgr = floodfill_mgr 195 self._cache = cache 196 self._searches: dict[str, SearchJob] = {} 197 self._stores: dict[str, StoreJob] = {} 198 199 def start_lookup(self, target_hash: bytes) -> SearchJob: 200 """Create a new search job for the given target hash.""" 201 search_id = uuid4().hex 202 job = SearchJob( 203 search_id=search_id, 204 target_hash=target_hash, 205 started_at=time.monotonic(), 206 ) 207 self._searches[search_id] = job 208 return job 209 210 def on_search_reply(self, search_id: str, from_peer: bytes, data: dict) -> None: 211 """Process a search reply from a peer.""" 212 job = self._searches[search_id] 213 job.replies.append(data) 214 215 def start_store(self, 
data_hash: bytes, data: bytes) -> StoreJob: 216 """Create a new store job.""" 217 store_id = uuid4().hex 218 job = StoreJob( 219 store_id=store_id, 220 data_hash=data_hash, 221 data=data, 222 started_at=time.monotonic(), 223 ) 224 self._stores[store_id] = job 225 return job 226 227 def on_store_ack(self, store_id: str, from_peer: bytes) -> None: 228 """Record a store acknowledgment from a peer.""" 229 job = self._stores[store_id] 230 job.acked = True 231 232 def get_router_info(self, peer_hash: bytes) -> dict | None: 233 """Look up a router info, checking the cache first.""" 234 return self._cache.get(peer_hash) 235 236 def cleanup_stale(self, max_age_seconds: float = 30.0) -> int: 237 """Remove stale search and store jobs. Returns count removed.""" 238 now = time.monotonic() 239 stale_searches = [ 240 sid for sid, job in self._searches.items() 241 if now - job.started_at > max_age_seconds 242 ] 243 stale_stores = [ 244 sid for sid, job in self._stores.items() 245 if now - job.started_at > max_age_seconds 246 ] 247 for sid in stale_searches: 248 del self._searches[sid] 249 for sid in stale_stores: 250 del self._stores[sid] 251 return len(stale_searches) + len(stale_stores)