"""ReplyHandler — match DeliveryStatusMessages to pending ACKs. Ported from net.i2p.router.message.OutboundClientMessageOneShotJob (reply selector and delivery tracking logic). """ from __future__ import annotations import logging from dataclasses import dataclass, field from typing import Callable logger = logging.getLogger(__name__) @dataclass class ReplyContext: """State associated with a pending ACK.""" token: int message_id: int from_hash: bytes to_hash: bytes sent_ms: int timeout_ms: int = 30_000 on_success: Callable | None = None on_timeout: Callable | None = None class ReplySelector: """Matches incoming DeliveryStatusMessages to pending outbound ACKs. Each pending message is registered with a unique token. When a DeliveryStatusMessage arrives with a matching token, the entry is consumed (one-shot match). """ def __init__(self) -> None: self._pending: dict[int, ReplyContext] = {} def register(self, token: int, context: ReplyContext) -> None: """Register a pending reply expectation.""" self._pending[token] = context def match(self, msg_id: int) -> ReplyContext | None: """Try to match an incoming message ID to a pending context. Consumes the entry on match (one-shot). Returns None if no match. """ ctx = self._pending.pop(msg_id, None) return ctx def expire_old(self, now_ms: int) -> list[ReplyContext]: """Remove and return all timed-out entries. Fires on_timeout callback for each expired entry. """ expired = [] to_remove = [] for token, ctx in self._pending.items(): if (now_ms - ctx.sent_ms) > ctx.timeout_ms: to_remove.append(token) expired.append(ctx) for token in to_remove: del self._pending[token] for ctx in expired: if ctx.on_timeout: try: ctx.on_timeout() except Exception: logger.warning("Error in timeout callback for token %d", ctx.token) return expired @property def pending_count(self) -> int: return len(self._pending) class DeliveryStatusTracker: """Higher-level delivery tracking using ReplySelector. Provides a simpler interface for registering pending messages and processing incoming status updates. """ def __init__(self) -> None: self._selector = ReplySelector() def register_pending(self, context: ReplyContext) -> None: """Register a pending delivery.""" self._selector.register(context.token, context) def on_status_received(self, msg_id: int) -> ReplyContext | None: """Process an incoming DeliveryStatusMessage. Returns the matched context (fires on_success callback), or None if no match. """ ctx = self._selector.match(msg_id) if ctx is not None and ctx.on_success: try: ctx.on_success() except Exception: logger.warning("Error in success callback for token %d", ctx.token) return ctx def check_timeouts(self, now_ms: int) -> list[ReplyContext]: """Check for and handle timed-out deliveries.""" return self._selector.expire_old(now_ms) @property def pending_count(self) -> int: return self._selector.pending_count