A Python port of the Invisible Internet Project (I2P)
at main 116 lines 3.5 kB view raw
1"""ReplyHandler — match DeliveryStatusMessages to pending ACKs. 2 3Ported from net.i2p.router.message.OutboundClientMessageOneShotJob 4(reply selector and delivery tracking logic). 5""" 6 7from __future__ import annotations 8 9import logging 10from dataclasses import dataclass, field 11from typing import Callable 12 13logger = logging.getLogger(__name__) 14 15 16@dataclass 17class ReplyContext: 18 """State associated with a pending ACK.""" 19 token: int 20 message_id: int 21 from_hash: bytes 22 to_hash: bytes 23 sent_ms: int 24 timeout_ms: int = 30_000 25 on_success: Callable | None = None 26 on_timeout: Callable | None = None 27 28 29class ReplySelector: 30 """Matches incoming DeliveryStatusMessages to pending outbound ACKs. 31 32 Each pending message is registered with a unique token. 33 When a DeliveryStatusMessage arrives with a matching token, 34 the entry is consumed (one-shot match). 35 """ 36 37 def __init__(self) -> None: 38 self._pending: dict[int, ReplyContext] = {} 39 40 def register(self, token: int, context: ReplyContext) -> None: 41 """Register a pending reply expectation.""" 42 self._pending[token] = context 43 44 def match(self, msg_id: int) -> ReplyContext | None: 45 """Try to match an incoming message ID to a pending context. 46 47 Consumes the entry on match (one-shot). 48 Returns None if no match. 49 """ 50 ctx = self._pending.pop(msg_id, None) 51 return ctx 52 53 def expire_old(self, now_ms: int) -> list[ReplyContext]: 54 """Remove and return all timed-out entries. 55 56 Fires on_timeout callback for each expired entry. 57 """ 58 expired = [] 59 to_remove = [] 60 for token, ctx in self._pending.items(): 61 if (now_ms - ctx.sent_ms) > ctx.timeout_ms: 62 to_remove.append(token) 63 expired.append(ctx) 64 65 for token in to_remove: 66 del self._pending[token] 67 68 for ctx in expired: 69 if ctx.on_timeout: 70 try: 71 ctx.on_timeout() 72 except Exception: 73 logger.warning("Error in timeout callback for token %d", ctx.token) 74 75 return expired 76 77 @property 78 def pending_count(self) -> int: 79 return len(self._pending) 80 81 82class DeliveryStatusTracker: 83 """Higher-level delivery tracking using ReplySelector. 84 85 Provides a simpler interface for registering pending messages 86 and processing incoming status updates. 87 """ 88 89 def __init__(self) -> None: 90 self._selector = ReplySelector() 91 92 def register_pending(self, context: ReplyContext) -> None: 93 """Register a pending delivery.""" 94 self._selector.register(context.token, context) 95 96 def on_status_received(self, msg_id: int) -> ReplyContext | None: 97 """Process an incoming DeliveryStatusMessage. 98 99 Returns the matched context (fires on_success callback), 100 or None if no match. 101 """ 102 ctx = self._selector.match(msg_id) 103 if ctx is not None and ctx.on_success: 104 try: 105 ctx.on_success() 106 except Exception: 107 logger.warning("Error in success callback for token %d", ctx.token) 108 return ctx 109 110 def check_timeouts(self, now_ms: int) -> list[ReplyContext]: 111 """Check for and handle timed-out deliveries.""" 112 return self._selector.expire_old(now_ms) 113 114 @property 115 def pending_count(self) -> int: 116 return self._selector.pending_count