"""NetDB handler — wires DataStore with SearchOperation/StoreOperation."""

import os
import time

from i2p_netdb.datastore import DataStore, NetDBEntry, EntryType
from i2p_netdb.operations import SearchOperation, StoreOperation


class NetDBHandler:
    """High-level handler that coordinates local storage with Kademlia operations.

    The handler owns a ``DataStore`` for local entries and keeps two
    registries of in-flight operations, each keyed by a random hex id that is
    handed back to the caller:

    * ``_active_searches`` maps a search id to ``(SearchOperation, target_key)``.
    * ``_active_stores`` maps a store id to its ``StoreOperation``.

    Entries are never removed from these registries by this class, so a
    finished operation remains queryable by id for the handler's lifetime.
    """

    def __init__(self, datastore: DataStore | None = None):
        """Create a handler.

        Args:
            datastore: Store to use for local entries. When ``None`` a fresh
                ``DataStore()`` is created.
        """
        self._datastore = datastore if datastore is not None else DataStore()
        self._active_searches: dict[str, tuple[SearchOperation, bytes]] = {}
        self._active_stores: dict[str, StoreOperation] = {}

    def handle_store(
        self,
        key: bytes,
        data: bytes,
        *,
        entry_type: EntryType = EntryType.ROUTER_INFO,
    ) -> bool:
        """Store key->data in the local datastore. Returns True on success.

        Args:
            key: Key under which the entry is stored.
            data: Payload of the entry.
            entry_type: Type tag recorded on the entry. Defaults to
                ``EntryType.ROUTER_INFO``, which is what this method always
                used before the parameter existed.

        Returns:
            Always ``True``; a failure surfaces as whatever exception
            ``DataStore.put`` raises.
        """
        entry = NetDBEntry(
            key=key,
            entry_type=entry_type,
            data=data,
            # Wall-clock receive time in milliseconds since the epoch.
            received_ms=int(time.time() * 1000),
        )
        self._datastore.put(entry)
        return True

    def handle_lookup(self, key: bytes) -> bytes | None:
        """Lookup key in the local datastore. Returns data or None.

        Args:
            key: Key to look up.

        Returns:
            The ``data`` of the stored entry, or ``None`` when
            ``DataStore.get`` returns ``None`` for the key.
        """
        entry = self._datastore.get(key)
        if entry is None:
            return None
        return entry.data

    def start_search(
        self,
        target_key: bytes,
        known_peers: dict[bytes, bytes] | None = None,
    ) -> str:
        """Create a new SearchOperation and return a search_id.

        The operation is started immediately and registered under a random
        16-character hex id.

        Args:
            target_key: Key being searched for.
            known_peers: Passed through unchanged to ``SearchOperation``
                (presumably the initial peers to query; confirm against
                ``SearchOperation``).

        Returns:
            The id to use with ``on_search_reply``, ``get_search_result`` and
            ``is_search_complete``.
        """
        search_id = os.urandom(8).hex()
        op = SearchOperation(target_key=target_key, known_peers=known_peers)
        op.start()
        self._active_searches[search_id] = (op, target_key)
        return search_id

    def on_search_reply(
        self,
        search_id: str,
        peer_hash: bytes,
        found_data: bytes | None,
        closer_peers: dict[bytes, bytes] | None = None,
    ) -> list[bytes]:
        """Forward reply to SearchOperation. Auto-stores result if found.

        Args:
            search_id: Id returned by ``start_search``.
            peer_hash: Forwarded to ``SearchOperation.on_reply``.
            found_data: Forwarded to ``SearchOperation.on_reply``.
            closer_peers: Forwarded to ``SearchOperation.on_reply``.

        Returns:
            Whatever ``SearchOperation.on_reply`` returns (presumably the
            peers to query next; confirm against ``SearchOperation``).

        Raises:
            KeyError: If ``search_id`` is not a registered search.
        """
        op, target_key = self._active_searches[search_id]
        new_queries = op.on_reply(peer_hash, found_data, closer_peers)
        result = op.get_result()
        # Persist locally once the operation reports completion with a result.
        # NOTE: the entry is stored with handle_store's default entry type.
        if op.is_complete() and result is not None:
            self.handle_store(target_key, result)
        return new_queries

    def get_search_result(self, search_id: str) -> bytes | None:
        """Return result if search is complete, else None.

        The value is whatever ``SearchOperation.get_result`` returns for the
        registered operation.

        Raises:
            KeyError: If ``search_id`` is not a registered search.
        """
        op, _ = self._active_searches[search_id]
        return op.get_result()

    def is_search_complete(self, search_id: str) -> bool:
        """Check whether a search has completed.

        Raises:
            KeyError: If ``search_id`` is not a registered search.
        """
        op, _ = self._active_searches[search_id]
        return op.is_complete()

    def start_store_op(
        self,
        key: bytes,
        data: bytes,
        target_peers: list[bytes],
        redundancy: int = 3,
    ) -> str:
        """Create a StoreOperation and return a store_id.

        The operation is started immediately and registered under a random
        16-character hex id.

        Args:
            key: Passed through to ``StoreOperation``.
            data: Passed through to ``StoreOperation``.
            target_peers: Passed through to ``StoreOperation``.
            redundancy: Passed through to ``StoreOperation``; defaults to 3.

        Returns:
            The id to use with ``on_store_ack`` and ``is_store_complete``.
        """
        store_id = os.urandom(8).hex()
        op = StoreOperation(
            key=key,
            data=data,
            target_peers=target_peers,
            redundancy=redundancy,
        )
        op.start()
        self._active_stores[store_id] = op
        return store_id

    def on_store_ack(self, store_id: str, peer_hash: bytes):
        """Forward an acknowledgment to the StoreOperation.

        Raises:
            KeyError: If ``store_id`` is not a registered store operation.
        """
        self._active_stores[store_id].on_ack(peer_hash)

    def is_store_complete(self, store_id: str) -> bool:
        """Check whether a store operation has completed.

        Raises:
            KeyError: If ``store_id`` is not a registered store operation.
        """
        return self._active_stores[store_id].is_complete()