# A Python port of the Invisible Internet Project (I2P)
1"""NetDB Kademlia lookup/store operations — pure state machines, no network I/O."""
2
3from __future__ import annotations
4
5import time
6from dataclasses import dataclass, field
7from uuid import uuid4
8
9from i2p_netdb.floodfill import FloodfillManager
10
11
12def _xor_distance(a: bytes, b: bytes) -> int:
13 """Compute XOR distance between two 32-byte keys as an integer."""
14 return int.from_bytes(bytes(x ^ y for x, y in zip(a, b)), 'big')
15
16
class SearchOperation:
    """Iterative Kademlia search for a target key across known peers.

    No network I/O happens here: start() hands back the first batch of
    peer hashes to query, and each on_reply() consumes one answer and
    hands back the next batch until the data turns up or the candidate
    pool is exhausted.
    """

    def __init__(self, target_key: bytes, known_peers: dict[bytes, bytes] | None = None,
                 k: int = 8):
        self._target_key = target_key
        self._known_peers: dict[bytes, bytes] = dict(known_peers) if known_peers else {}
        self._k = k
        self._queried: set[bytes] = set()
        self._result: bytes | None = None
        self._found = False
        self._started = False

    def start(self) -> list[bytes]:
        """Mark the search as started and return the first batch of peers to query."""
        self._started = True
        return self._select_unqueried_closest()

    def on_reply(self, peer_hash: bytes, found_data: bytes | None,
                 closer_peers: dict[bytes, bytes] | None = None) -> list[bytes]:
        """Digest one peer's answer and return the next peers to query.

        A non-None found_data terminates the search immediately; any
        closer_peers are merged into the candidate pool first, without
        overwriting entries we already hold.
        """
        self._queried.add(peer_hash)

        if found_data is not None:
            self._found = True
            self._result = found_data
            return []

        for candidate, payload in (closer_peers or {}).items():
            self._known_peers.setdefault(candidate, payload)

        return self._select_unqueried_closest()

    def is_complete(self) -> bool:
        """True once data was found, or every known peer has been queried."""
        if self._found:
            return True
        if not self._started:
            return False
        # Exhaustion: no candidate anywhere in the pool remains unqueried.
        return all(h in self._queried for h in self._known_peers)

    def get_result(self) -> bytes | None:
        """The located data, or None when nothing was found."""
        return self._result

    def get_queried(self) -> set[bytes]:
        """A copy of the set of peer hashes already contacted."""
        return set(self._queried)

    def _select_unqueried_closest(self) -> list[bytes]:
        """Pick up to k not-yet-queried peers, nearest to the target first."""
        candidates = (h for h in self._known_peers if h not in self._queried)
        ranked = sorted(candidates, key=lambda h: _xor_distance(h, self._target_key))
        return ranked[:self._k]
83
84
class StoreOperation:
    """Kademlia store operation — send data to target peers and track confirmations.

    Pure state machine: call start() to get (peer_hash, key, data) tuples
    to transmit, then feed acknowledgments via on_ack().
    """

    def __init__(self, key: bytes, data: bytes, target_peers: list[bytes],
                 redundancy: int = 3):
        self._key = key
        self._data = data
        self._target_peers = list(target_peers)
        # Set view for O(1) membership checks and duplicate-free counting.
        self._target_set = set(self._target_peers)
        self._redundancy = redundancy
        # Never demand more confirmations than there are distinct targets,
        # otherwise the operation could never complete.
        self._required = min(redundancy, len(self._target_set))
        self._confirmed: set[bytes] = set()

    def start(self) -> list[tuple[bytes, bytes, bytes]]:
        """Return list of (peer_hash, key, data) tuples for sending store messages."""
        return [(peer, self._key, self._data) for peer in self._target_peers]

    def on_ack(self, peer_hash: bytes):
        """Record a storage confirmation from a peer.

        Acks from peers we never targeted are ignored so that stray or
        forged confirmations cannot satisfy the redundancy requirement.
        """
        if peer_hash in self._target_set:
            self._confirmed.add(peer_hash)

    def is_complete(self) -> bool:
        """True once enough target peers have confirmed storage."""
        return len(self._confirmed) >= self._required

    def get_confirmed_peers(self) -> set[bytes]:
        """Return set of peers that have confirmed storage."""
        return set(self._confirmed)

    def get_pending_peers(self) -> set[bytes]:
        """Return set of target peers that have not yet confirmed."""
        return set(self._target_peers) - self._confirmed
119
120
121# ---------------------------------------------------------------------------
122# Enhanced NetDB operations: SearchJob, StoreJob, RouterInfoCache,
123# NetDBOperationManager
124# ---------------------------------------------------------------------------
125
126
@dataclass
class SearchJob:
    """Tracks an in-progress NetDB search."""

    search_id: str  # unique id (uuid4 hex, assigned by NetDBOperationManager)
    target_hash: bytes  # key being searched for
    started_at: float  # time.monotonic() timestamp taken at job creation
    replies: list[dict] = field(default_factory=list)  # reply payloads, in arrival order
    complete: bool = False  # NOTE(review): never set in this module — presumably toggled by callers
136
137
@dataclass
class StoreJob:
    """Tracks an in-progress NetDB store."""

    store_id: str  # unique id (uuid4 hex, assigned by NetDBOperationManager)
    data_hash: bytes  # key under which `data` is stored
    data: bytes  # raw payload being stored
    started_at: float  # time.monotonic() timestamp taken at job creation
    acked: bool = False  # set True once any peer acknowledges the store
147
148
class RouterInfoCache:
    """TTL-based cache mapping peer hashes to RouterInfo dicts.

    Expired entries are treated as absent by get()/is_expired() but are
    only physically removed by cleanup_expired().
    """

    def __init__(self, ttl_seconds: float = 3600.0) -> None:
        # peer hash -> (info dict, monotonic timestamp when stored)
        self._entries: dict[bytes, tuple[dict, float]] = {}
        self._ttl = ttl_seconds

    def get(self, peer_hash: bytes) -> dict | None:
        """Return the cached info dict, or None if absent or past its TTL."""
        try:
            info, stored_at = self._entries[peer_hash]
        except KeyError:
            return None
        if time.monotonic() - stored_at > self._ttl:
            return None
        return info

    def put(self, peer_hash: bytes, info: dict) -> None:
        """Insert or refresh an entry, stamped with the current monotonic time."""
        self._entries[peer_hash] = (info, time.monotonic())

    def is_expired(self, peer_hash: bytes) -> bool:
        """True only when an entry exists AND its age exceeds the TTL."""
        try:
            _, stored_at = self._entries[peer_hash]
        except KeyError:
            return False
        return time.monotonic() - stored_at > self._ttl

    def cleanup_expired(self) -> int:
        """Drop every expired entry and return how many were removed."""
        cutoff = time.monotonic() - self._ttl
        stale = [h for h, (_, stamped) in self._entries.items() if stamped < cutoff]
        for h in stale:
            del self._entries[h]
        return len(stale)
188
189
class NetDBOperationManager:
    """Orchestrates NetDB lookup and store operations.

    Jobs are tracked by id. cleanup_stale() may remove a job at any
    moment, so replies/acks arriving for an unknown id are silently
    dropped instead of raising KeyError at the network-event handler.
    """

    def __init__(self, floodfill_mgr: FloodfillManager, cache: RouterInfoCache) -> None:
        self._floodfill_mgr = floodfill_mgr
        self._cache = cache
        self._searches: dict[str, SearchJob] = {}
        self._stores: dict[str, StoreJob] = {}

    def start_lookup(self, target_hash: bytes) -> SearchJob:
        """Create, register and return a new search job for the given target hash."""
        search_id = uuid4().hex
        job = SearchJob(
            search_id=search_id,
            target_hash=target_hash,
            started_at=time.monotonic(),
        )
        self._searches[search_id] = job
        return job

    def on_search_reply(self, search_id: str, from_peer: bytes, data: dict) -> None:
        """Record a search reply.

        Replies for unknown ids (e.g. jobs already removed by
        cleanup_stale) are ignored. from_peer is currently unused but
        kept for interface stability.
        """
        job = self._searches.get(search_id)
        if job is not None:
            job.replies.append(data)

    def start_store(self, data_hash: bytes, data: bytes) -> StoreJob:
        """Create, register and return a new store job."""
        store_id = uuid4().hex
        job = StoreJob(
            store_id=store_id,
            data_hash=data_hash,
            data=data,
            started_at=time.monotonic(),
        )
        self._stores[store_id] = job
        return job

    def on_store_ack(self, store_id: str, from_peer: bytes) -> None:
        """Mark a store job acknowledged.

        Acks for unknown ids (e.g. jobs already removed by
        cleanup_stale) are ignored. from_peer is currently unused but
        kept for interface stability.
        """
        job = self._stores.get(store_id)
        if job is not None:
            job.acked = True

    def get_router_info(self, peer_hash: bytes) -> dict | None:
        """Return the cached RouterInfo for peer_hash, or None on a miss.

        Only the local cache is consulted; no network lookup is issued here.
        """
        return self._cache.get(peer_hash)

    def cleanup_stale(self, max_age_seconds: float = 30.0) -> int:
        """Remove search and store jobs older than max_age_seconds.

        Returns the total number of jobs removed.
        """
        now = time.monotonic()
        removed = 0
        for registry in (self._searches, self._stores):
            stale_ids = [
                jid for jid, job in registry.items()
                if now - job.started_at > max_age_seconds
            ]
            for jid in stale_ids:
                del registry[jid]
            removed += len(stale_ids)
        return removed