A Python port of the Invisible Internet Project (I2P)
1"""NetDB handler — wires DataStore with SearchOperation/StoreOperation."""
2
3import os
4import time
5
6from i2p_netdb.datastore import DataStore, NetDBEntry, EntryType
7from i2p_netdb.operations import SearchOperation, StoreOperation
8
9
10class NetDBHandler:
11 """High-level handler that coordinates local storage with Kademlia operations."""
12
13 def __init__(self, datastore: DataStore | None = None):
14 self._datastore = datastore if datastore is not None else DataStore()
15 self._active_searches: dict[str, tuple[SearchOperation, bytes]] = {}
16 self._active_stores: dict[str, StoreOperation] = {}
17
18 def handle_store(self, key: bytes, data: bytes) -> bool:
19 """Store key->data in the local datastore. Returns True on success."""
20 entry = NetDBEntry(
21 key=key,
22 entry_type=EntryType.ROUTER_INFO,
23 data=data,
24 received_ms=int(time.time() * 1000),
25 )
26 self._datastore.put(entry)
27 return True
28
29 def handle_lookup(self, key: bytes) -> bytes | None:
30 """Lookup key in the local datastore. Returns data or None."""
31 entry = self._datastore.get(key)
32 if entry is None:
33 return None
34 return entry.data
35
36 def start_search(self, target_key: bytes,
37 known_peers: dict[bytes, bytes] | None = None) -> str:
38 """Create a new SearchOperation and return a search_id."""
39 search_id = os.urandom(8).hex()
40 op = SearchOperation(target_key=target_key, known_peers=known_peers)
41 op.start()
42 self._active_searches[search_id] = (op, target_key)
43 return search_id
44
45 def on_search_reply(self, search_id: str, peer_hash: bytes,
46 found_data: bytes | None,
47 closer_peers: dict[bytes, bytes] | None = None) -> list[bytes]:
48 """Forward reply to SearchOperation. Auto-stores result if found."""
49 op, target_key = self._active_searches[search_id]
50 new_queries = op.on_reply(peer_hash, found_data, closer_peers)
51 result = op.get_result()
52 if op.is_complete() and result is not None:
53 self.handle_store(target_key, result)
54 return new_queries
55
56 def get_search_result(self, search_id: str) -> bytes | None:
57 """Return result if search is complete, else None."""
58 op, _ = self._active_searches[search_id]
59 return op.get_result()
60
61 def is_search_complete(self, search_id: str) -> bool:
62 """Check whether a search has completed."""
63 op, _ = self._active_searches[search_id]
64 return op.is_complete()
65
66 def start_store_op(self, key: bytes, data: bytes,
67 target_peers: list[bytes], redundancy: int = 3) -> str:
68 """Create a StoreOperation and return a store_id."""
69 store_id = os.urandom(8).hex()
70 op = StoreOperation(key=key, data=data, target_peers=target_peers,
71 redundancy=redundancy)
72 op.start()
73 self._active_stores[store_id] = op
74 return store_id
75
76 def on_store_ack(self, store_id: str, peer_hash: bytes):
77 """Forward an acknowledgment to the StoreOperation."""
78 self._active_stores[store_id].on_ack(peer_hash)
79
80 def is_store_complete(self, store_id: str) -> bool:
81 """Check whether a store operation has completed."""
82 return self._active_stores[store_id].is_complete()