A Python port of the Invisible Internet Project (I2P)
1"""I2NP message routing — inbound dispatch, outbound routing, and unified dispatcher."""
2
3from __future__ import annotations
4
5import struct
6from typing import Any, Callable
7
8
9class InboundMessageHandler:
10 """Dispatches inbound I2NP messages to registered handlers by message type."""
11
12 # I2NP message type constants
13 DATABASE_STORE = 1
14 DATABASE_LOOKUP = 2
15 DELIVERY_STATUS = 10
16 GARLIC = 11
17 TUNNEL_DATA = 18
18 TUNNEL_GATEWAY = 19
19
20 def __init__(self) -> None:
21 self._handlers: dict[int, Callable[..., Any]] = {}
22 self._store: dict[bytes, bytes] = {}
23
24 # Register default handlers
25 self.register(self.DATABASE_STORE, self.on_database_store)
26 self.register(self.DATABASE_LOOKUP, self.on_database_lookup)
27 self.register(self.DELIVERY_STATUS, self.on_delivery_status)
28 self.register(self.GARLIC, self.on_garlic)
29 self.register(self.TUNNEL_DATA, self.on_tunnel_data)
30 self.register(self.TUNNEL_GATEWAY, self.on_tunnel_data) # same extraction logic
31
32 def register(self, message_type: int, handler: Callable[..., Any]) -> None:
33 """Register a handler for a given I2NP message type."""
34 self._handlers[message_type] = handler
35
36 def handle(self, message_type: int, payload: bytes):
37 """Dispatch to the registered handler. Returns None if no handler is registered."""
38 handler = self._handlers.get(message_type)
39 if handler is None:
40 return None
41 return handler(payload)
42
43 def on_database_store(self, payload: bytes) -> None:
44 """Store an entry keyed by the first 32 bytes of payload."""
45 key = payload[:32]
46 value = payload[32:]
47 self._store[key] = value
48
49 def on_database_lookup(self, payload: bytes):
50 """Look up an entry by the first 32 bytes of payload. Returns the entry or None."""
51 key = payload[:32]
52 return self._store.get(key)
53
54 def on_garlic(self, payload: bytes) -> list[bytes]:
55 """Placeholder: return the payload as a single clove."""
56 return [payload]
57
58 def on_delivery_status(self, payload: bytes) -> int:
59 """Extract and return the 4-byte message ID from the start of the payload."""
60 return int.from_bytes(payload[:4], "big")
61
62 def on_tunnel_data(self, payload: bytes) -> tuple[int, bytes]:
63 """Extract tunnel_id (first 4 bytes) and the remaining data."""
64 tunnel_id = int.from_bytes(payload[:4], "big")
65 data = payload[4:]
66 return (tunnel_id, data)
67
68
69class OutboundMessageRouter:
70 """Routes outbound messages based on delivery type."""
71
72 LOCAL = 0
73 ROUTER = 1
74 TUNNEL = 2
75 DESTINATION = 3
76
77 def __init__(self) -> None:
78 pass
79
80 def route(self, delivery_type: int, payload: bytes, **kwargs) -> dict:
81 """Route a message based on delivery type. Returns a routing descriptor dict."""
82 if delivery_type == self.LOCAL:
83 return {"type": "local", "payload": payload}
84 elif delivery_type == self.ROUTER:
85 return {
86 "type": "router",
87 "router_hash": kwargs["router_hash"],
88 "payload": payload,
89 }
90 elif delivery_type == self.TUNNEL:
91 return {
92 "type": "tunnel",
93 "tunnel_id": kwargs["tunnel_id"],
94 "gateway": kwargs["gateway"],
95 "payload": payload,
96 }
97 elif delivery_type == self.DESTINATION:
98 return {
99 "type": "destination",
100 "destination": kwargs["destination"],
101 "payload": payload,
102 }
103 else:
104 raise ValueError(f"Unknown delivery type: {delivery_type}")
105
106
107class MessageDispatcher:
108 """Unified dispatcher that delegates to inbound handler and outbound router."""
109
110 def __init__(self, inbound: InboundMessageHandler, outbound: OutboundMessageRouter) -> None:
111 self._inbound = inbound
112 self._outbound = outbound
113
114 def dispatch_inbound(self, message_type: int, payload: bytes):
115 """Delegate to the inbound message handler."""
116 return self._inbound.handle(message_type, payload)
117
118 def dispatch_outbound(self, delivery_type: int, payload: bytes, **kwargs) -> dict:
119 """Delegate to the outbound message router."""
120 return self._outbound.route(delivery_type, payload, **kwargs)
121
122 def register_inbound_handler(self, message_type: int, handler: Callable[..., Any]) -> None:
123 """Register a handler on the inbound handler."""
124 self._inbound.register(message_type, handler)