A Python port of the Invisible Internet Project (I2P)
1"""ClientManager — manages client sessions and message delivery.
2
3Ported from net.i2p.router.ClientManagerFacadeImpl.
4
5Tracks registered client sessions (by destination hash),
6delivers inbound messages, and initiates outbound sends.
7"""
8
9from __future__ import annotations
10
11import logging
12from dataclasses import dataclass, field
13from typing import Callable
14
15logger = logging.getLogger(__name__)
16
17
@dataclass
class ClientSession:
    """State held for a single registered client.

    Attributes are plain dataclass fields; the two callables are invoked
    by whoever holds the session, never by the session itself.
    """

    # Key under which the session is stored and looked up.
    dest_hash: bytes
    # Called with the raw payload of each message delivered to this client.
    on_message: Callable[[bytes], None]
    # Optional; called with two ints when a delivery status is reported.
    # Left as None when the client supplies no status callback.
    on_status: Callable[[int, int], None] | None = None
24
25
class ClientManager:
    """Registry of client sessions keyed by destination hash.

    Inbound payloads and delivery-status updates are forwarded to the
    callbacks of the matching session. An exception raised by a callback
    is logged and surfaced to the caller as a ``False`` return value
    rather than being propagated.
    """

    def __init__(self) -> None:
        # dest_hash -> session; one entry per destination hash.
        self._sessions: dict[bytes, ClientSession] = {}

    @staticmethod
    def _short_id(dest_hash: bytes) -> str:
        """Return the first 16 hex characters of *dest_hash* for log lines."""
        return dest_hash.hex()[:16]

    def register_session(self, session: ClientSession) -> None:
        """Store *session* under its destination hash.

        An entry already present for the same hash is overwritten.
        """
        key = session.dest_hash
        self._sessions[key] = session
        logger.info("Client session registered: %s", self._short_id(key))

    def unregister_session(self, dest_hash: bytes) -> None:
        """Remove the session stored for *dest_hash*, if there is one.

        Unknown hashes are ignored without logging.
        """
        dropped = self._sessions.pop(dest_hash, None)
        if not dropped:
            return
        logger.info("Client session unregistered: %s", self._short_id(dest_hash))

    def is_local(self, dest_hash: bytes) -> bool:
        """Return True when a session is registered for *dest_hash*."""
        return dest_hash in self._sessions

    def message_received(self, dest_hash: bytes, payload: bytes) -> bool:
        """Hand *payload* to the ``on_message`` callback of the target session.

        Returns:
            True when the callback ran without raising; False when no
            session is registered for *dest_hash* or the callback raised
            (the exception is logged, not re-raised).
        """
        target = self._sessions.get(dest_hash)
        if target is None:
            logger.debug("No session for dest %s, dropping message", self._short_id(dest_hash))
            return False

        try:
            target.on_message(payload)
        except Exception:
            logger.error("Error delivering message to %s", self._short_id(dest_hash),
                         exc_info=True)
            return False
        return True

    def report_delivery_status(self, dest_hash: bytes, msg_id: int, status: int) -> bool:
        """Pass ``(msg_id, status)`` to the session's ``on_status`` callback.

        Returns:
            True when the callback ran without raising; False when no
            session is registered, the session has no status callback, or
            the callback raised (the exception is logged, not re-raised).
        """
        target = self._sessions.get(dest_hash)
        if target is None:
            return False
        if target.on_status is None:
            return False

        try:
            target.on_status(msg_id, status)
        except Exception:
            logger.error("Error in status callback for %s", self._short_id(dest_hash),
                         exc_info=True)
            return False
        return True

    @property
    def session_count(self) -> int:
        """Number of sessions currently registered."""
        return len(self._sessions)

    def get_session(self, dest_hash: bytes) -> ClientSession | None:
        """Return the session stored for *dest_hash*, or None if absent."""
        return self._sessions.get(dest_hash)