A Python port of the Invisible Internet Project (I2P)
at main 82 lines 3.4 kB view raw
1"""NetDB handler — wires DataStore with SearchOperation/StoreOperation.""" 2 3import os 4import time 5 6from i2p_netdb.datastore import DataStore, NetDBEntry, EntryType 7from i2p_netdb.operations import SearchOperation, StoreOperation 8 9 10class NetDBHandler: 11 """High-level handler that coordinates local storage with Kademlia operations.""" 12 13 def __init__(self, datastore: DataStore | None = None): 14 self._datastore = datastore if datastore is not None else DataStore() 15 self._active_searches: dict[str, tuple[SearchOperation, bytes]] = {} 16 self._active_stores: dict[str, StoreOperation] = {} 17 18 def handle_store(self, key: bytes, data: bytes) -> bool: 19 """Store key->data in the local datastore. Returns True on success.""" 20 entry = NetDBEntry( 21 key=key, 22 entry_type=EntryType.ROUTER_INFO, 23 data=data, 24 received_ms=int(time.time() * 1000), 25 ) 26 self._datastore.put(entry) 27 return True 28 29 def handle_lookup(self, key: bytes) -> bytes | None: 30 """Lookup key in the local datastore. Returns data or None.""" 31 entry = self._datastore.get(key) 32 if entry is None: 33 return None 34 return entry.data 35 36 def start_search(self, target_key: bytes, 37 known_peers: dict[bytes, bytes] | None = None) -> str: 38 """Create a new SearchOperation and return a search_id.""" 39 search_id = os.urandom(8).hex() 40 op = SearchOperation(target_key=target_key, known_peers=known_peers) 41 op.start() 42 self._active_searches[search_id] = (op, target_key) 43 return search_id 44 45 def on_search_reply(self, search_id: str, peer_hash: bytes, 46 found_data: bytes | None, 47 closer_peers: dict[bytes, bytes] | None = None) -> list[bytes]: 48 """Forward reply to SearchOperation. Auto-stores result if found.""" 49 op, target_key = self._active_searches[search_id] 50 new_queries = op.on_reply(peer_hash, found_data, closer_peers) 51 result = op.get_result() 52 if op.is_complete() and result is not None: 53 self.handle_store(target_key, result) 54 return new_queries 55 56 def get_search_result(self, search_id: str) -> bytes | None: 57 """Return result if search is complete, else None.""" 58 op, _ = self._active_searches[search_id] 59 return op.get_result() 60 61 def is_search_complete(self, search_id: str) -> bool: 62 """Check whether a search has completed.""" 63 op, _ = self._active_searches[search_id] 64 return op.is_complete() 65 66 def start_store_op(self, key: bytes, data: bytes, 67 target_peers: list[bytes], redundancy: int = 3) -> str: 68 """Create a StoreOperation and return a store_id.""" 69 store_id = os.urandom(8).hex() 70 op = StoreOperation(key=key, data=data, target_peers=target_peers, 71 redundancy=redundancy) 72 op.start() 73 self._active_stores[store_id] = op 74 return store_id 75 76 def on_store_ack(self, store_id: str, peer_hash: bytes): 77 """Forward an acknowledgment to the StoreOperation.""" 78 self._active_stores[store_id].on_ack(peer_hash) 79 80 def is_store_complete(self, store_id: str) -> bool: 81 """Check whether a store operation has completed.""" 82 return self._active_stores[store_id].is_complete()