"""NetDB Kademlia lookup/store operations — pure state machines, no network I/O.""" from __future__ import annotations import time from dataclasses import dataclass, field from uuid import uuid4 from i2p_netdb.floodfill import FloodfillManager def _xor_distance(a: bytes, b: bytes) -> int: """Compute XOR distance between two 32-byte keys as an integer.""" return int.from_bytes(bytes(x ^ y for x, y in zip(a, b)), 'big') class SearchOperation: """Iterative Kademlia search for a target key across known peers. Pure state machine: call start() to get initial peers to query, then feed replies via on_reply() to drive the search forward. """ def __init__(self, target_key: bytes, known_peers: dict[bytes, bytes] | None = None, k: int = 8): self._target_key = target_key self._known_peers: dict[bytes, bytes] = dict(known_peers) if known_peers else {} self._k = k self._queried: set[bytes] = set() self._result: bytes | None = None self._found = False self._started = False def start(self) -> list[bytes]: """Select the k closest known peers to target and return their hashes for querying.""" self._started = True return self._select_unqueried_closest() def on_reply(self, peer_hash: bytes, found_data: bytes | None, closer_peers: dict[bytes, bytes] | None = None) -> list[bytes]: """Process a reply from a queried peer. If found_data is not None, the search is complete. If closer_peers are provided, merge them and return new peers to query. """ self._queried.add(peer_hash) if found_data is not None: self._result = found_data self._found = True return [] if closer_peers: for h, data in closer_peers.items(): if h not in self._known_peers: self._known_peers[h] = data return self._select_unqueried_closest() def is_complete(self) -> bool: """True if target was found or all reachable peers have been queried.""" if self._found: return True if not self._started: return False # Check if there are any unqueried peers in our k-closest set unqueried = [h for h in self._known_peers if h not in self._queried] return len(unqueried) == 0 def get_result(self) -> bytes | None: """Return the found data, or None if not found.""" return self._result def get_queried(self) -> set[bytes]: """Return the set of peers already queried.""" return set(self._queried) def _select_unqueried_closest(self) -> list[bytes]: """Return up to k closest unqueried peers sorted by XOR distance.""" unqueried = [h for h in self._known_peers if h not in self._queried] unqueried.sort(key=lambda h: _xor_distance(h, self._target_key)) return unqueried[:self._k] class StoreOperation: """Kademlia store operation — send data to target peers and track confirmations. Pure state machine: call start() to get store tuples, then feed acknowledgments via on_ack(). """ def __init__(self, key: bytes, data: bytes, target_peers: list[bytes], redundancy: int = 3): self._key = key self._data = data self._target_peers = list(target_peers) self._redundancy = redundancy self._confirmed: set[bytes] = set() def start(self) -> list[tuple[bytes, bytes, bytes]]: """Return list of (peer_hash, key, data) tuples for sending store messages.""" return [(peer, self._key, self._data) for peer in self._target_peers] def on_ack(self, peer_hash: bytes): """Record a storage confirmation from a peer.""" self._confirmed.add(peer_hash) def is_complete(self) -> bool: """True if enough peers have confirmed storage.""" return len(self._confirmed) >= self._redundancy def get_confirmed_peers(self) -> set[bytes]: """Return set of peers that have confirmed storage.""" return set(self._confirmed) def get_pending_peers(self) -> set[bytes]: """Return set of peers that have not yet confirmed.""" return set(self._target_peers) - self._confirmed # --------------------------------------------------------------------------- # Enhanced NetDB operations: SearchJob, StoreJob, RouterInfoCache, # NetDBOperationManager # --------------------------------------------------------------------------- @dataclass class SearchJob: """Tracks an in-progress NetDB search.""" search_id: str target_hash: bytes started_at: float replies: list[dict] = field(default_factory=list) complete: bool = False @dataclass class StoreJob: """Tracks an in-progress NetDB store.""" store_id: str data_hash: bytes data: bytes started_at: float acked: bool = False class RouterInfoCache: """Cache for RouterInfo dicts with time-based expiration.""" def __init__(self, ttl_seconds: float = 3600.0) -> None: self._entries: dict[bytes, tuple[dict, float]] = {} self._ttl = ttl_seconds def get(self, peer_hash: bytes) -> dict | None: """Return cached info dict, or None if missing or expired.""" entry = self._entries.get(peer_hash) if entry is None: return None info, stored_at = entry if time.monotonic() - stored_at > self._ttl: return None return info def put(self, peer_hash: bytes, info: dict) -> None: """Store a RouterInfo dict with the current timestamp.""" self._entries[peer_hash] = (info, time.monotonic()) def is_expired(self, peer_hash: bytes) -> bool: """Check whether an entry exists and is expired.""" entry = self._entries.get(peer_hash) if entry is None: return False _, stored_at = entry return time.monotonic() - stored_at > self._ttl def cleanup_expired(self) -> int: """Remove all expired entries. Returns count of entries removed.""" now = time.monotonic() expired_keys = [ k for k, (_, stored_at) in self._entries.items() if now - stored_at > self._ttl ] for k in expired_keys: del self._entries[k] return len(expired_keys) class NetDBOperationManager: """Orchestrates NetDB lookup and store operations.""" def __init__(self, floodfill_mgr: FloodfillManager, cache: RouterInfoCache) -> None: self._floodfill_mgr = floodfill_mgr self._cache = cache self._searches: dict[str, SearchJob] = {} self._stores: dict[str, StoreJob] = {} def start_lookup(self, target_hash: bytes) -> SearchJob: """Create a new search job for the given target hash.""" search_id = uuid4().hex job = SearchJob( search_id=search_id, target_hash=target_hash, started_at=time.monotonic(), ) self._searches[search_id] = job return job def on_search_reply(self, search_id: str, from_peer: bytes, data: dict) -> None: """Process a search reply from a peer.""" job = self._searches[search_id] job.replies.append(data) def start_store(self, data_hash: bytes, data: bytes) -> StoreJob: """Create a new store job.""" store_id = uuid4().hex job = StoreJob( store_id=store_id, data_hash=data_hash, data=data, started_at=time.monotonic(), ) self._stores[store_id] = job return job def on_store_ack(self, store_id: str, from_peer: bytes) -> None: """Record a store acknowledgment from a peer.""" job = self._stores[store_id] job.acked = True def get_router_info(self, peer_hash: bytes) -> dict | None: """Look up a router info, checking the cache first.""" return self._cache.get(peer_hash) def cleanup_stale(self, max_age_seconds: float = 30.0) -> int: """Remove stale search and store jobs. Returns count removed.""" now = time.monotonic() stale_searches = [ sid for sid, job in self._searches.items() if now - job.started_at > max_age_seconds ] stale_stores = [ sid for sid, job in self._stores.items() if now - job.started_at > max_age_seconds ] for sid in stale_searches: del self._searches[sid] for sid in stale_stores: del self._stores[sid] return len(stale_searches) + len(stale_stores)