"""ClientManager — manages client sessions and message delivery. Ported from net.i2p.router.ClientManagerFacadeImpl. Tracks registered client sessions (by destination hash), delivers inbound messages, and initiates outbound sends. """ from __future__ import annotations import logging from dataclasses import dataclass, field from typing import Callable logger = logging.getLogger(__name__) @dataclass class ClientSession: """Represents one client's active session.""" dest_hash: bytes on_message: Callable[[bytes], None] on_status: Callable[[int, int], None] | None = None class ClientManager: """Manages client sessions and message delivery. Each client registers with a destination hash and receives callbacks for incoming messages and delivery status updates. """ def __init__(self) -> None: self._sessions: dict[bytes, ClientSession] = {} def register_session(self, session: ClientSession) -> None: """Register a client session.""" self._sessions[session.dest_hash] = session logger.info("Client session registered: %s", session.dest_hash.hex()[:16]) def unregister_session(self, dest_hash: bytes) -> None: """Unregister a client session.""" removed = self._sessions.pop(dest_hash, None) if removed: logger.info("Client session unregistered: %s", dest_hash.hex()[:16]) def is_local(self, dest_hash: bytes) -> bool: """Check if a destination is a local client.""" return dest_hash in self._sessions def message_received(self, dest_hash: bytes, payload: bytes) -> bool: """Deliver an incoming message to a client. Returns True if the message was delivered, False if no session found. """ session = self._sessions.get(dest_hash) if session is None: logger.debug("No session for dest %s, dropping message", dest_hash.hex()[:16]) return False try: session.on_message(payload) return True except Exception: logger.error("Error delivering message to %s", dest_hash.hex()[:16], exc_info=True) return False def report_delivery_status(self, dest_hash: bytes, msg_id: int, status: int) -> bool: """Notify a client of delivery status. Returns True if the notification was delivered. """ session = self._sessions.get(dest_hash) if session is None or session.on_status is None: return False try: session.on_status(msg_id, status) return True except Exception: logger.error("Error in status callback for %s", dest_hash.hex()[:16], exc_info=True) return False @property def session_count(self) -> int: return len(self._sessions) def get_session(self, dest_hash: bytes) -> ClientSession | None: return self._sessions.get(dest_hash)