"""Packet scheduler — retransmit timer, congestion control, and state-aware scheduling.

Ported from net.i2p.client.streaming.impl scheduling classes.
"""

import enum

from i2p_streaming.connection import ConnectionState


class RetransmitTimer:
    """TCP-like retransmit timer with Jacobson/Karels RTT estimation.

    All durations are in milliseconds. ``rto`` is the current retransmit
    timeout; it is recomputed on every RTT sample and doubled on backoff,
    and both paths cap it at ``MAX_RTO``.
    """

    INITIAL_RTO = 3000  # 3 seconds
    MAX_RTO = 60000  # 60 seconds
    ALPHA = 0.125  # SRTT smoothing
    BETA = 0.25  # RTTVAR smoothing

    def __init__(self, initial_rto: int | None = None):
        """Create a timer.

        Args:
            initial_rto: Starting timeout in milliseconds; ``INITIAL_RTO``
                is used when this is None.
        """
        self.rto = initial_rto if initial_rto is not None else self.INITIAL_RTO
        self._srtt: float | None = None
        self._rttvar: float | None = None

    def update_rtt(self, rtt_ms: int) -> None:
        """Fold a new round-trip sample into SRTT/RTTVAR and recompute ``rto``.

        Args:
            rtt_ms: Measured round-trip time in milliseconds. Negative
                samples are treated as 0 so they cannot drive the smoothed
                RTT, and therefore the timeout, below zero.
        """
        sample = max(0.0, float(rtt_ms))
        srtt, rttvar = self._srtt, self._rttvar
        if srtt is None or rttvar is None:
            # First sample seeds both estimators.
            srtt = sample
            rttvar = sample / 2
        else:
            # RTTVAR is updated first so it sees the previous SRTT.
            rttvar = (1 - self.BETA) * rttvar + self.BETA * abs(srtt - sample)
            srtt = (1 - self.ALPHA) * srtt + self.ALPHA * sample
        self._srtt, self._rttvar = srtt, rttvar
        # The variance term contributes at least 1 ms.
        self.rto = min(int(srtt + max(1, 4 * rttvar)), self.MAX_RTO)

    def backoff(self) -> None:
        """Double ``rto``, capped at ``MAX_RTO``."""
        self.rto = min(self.rto * 2, self.MAX_RTO)


class CongestionWindow:
    """AIMD congestion window (additive increase, multiplicative decrease)."""

    def __init__(self, initial_size: int = 6, ssthresh: int = 64, max_size: int = 128):
        """Create a window.

        Args:
            initial_size: Starting window size.
            ssthresh: Below this size the window is in slow start.
            max_size: Upper bound applied whenever the window grows.
        """
        self.size = initial_size
        self.ssthresh = ssthresh
        self.max_size = max_size

    def on_ack(self) -> None:
        """Grow the window in response to one ACK.

        Below ``ssthresh`` the window doubles (slow start); otherwise it
        grows by 1 (congestion avoidance). The result never exceeds
        ``max_size``.

        A window below 1 is treated as 1 before growing. ``on_loss``
        already floors the window at 1; without the same floor here a
        zero-sized window could never reopen (0 * 2 == 0) and previously
        raised ZeroDivisionError once it reached congestion avoidance.
        """
        # NOTE(review): slow start doubles on every ACK, not once per round
        # trip — confirm this matches the intended growth rate.
        current = max(1, self.size)
        if current < self.ssthresh:
            grown = current * 2
        else:
            grown = current + 1
        self.size = min(grown, self.max_size)

    def on_loss(self) -> None:
        """Halve the window and set ``ssthresh`` to the halved value (min 1)."""
        halved = max(1, int(self.size / 2))
        self.ssthresh = halved
        self.size = halved


class PacketScheduler:
    """Decides when packets can be sent based on congestion window."""

    def __init__(self, window_size: int = 6):
        """Create a scheduler allowing up to ``window_size`` unacked packets."""
        self._window_size = window_size
        self._outstanding: set[int] = set()

    @property
    def outstanding(self) -> int:
        """Number of sent sequence numbers that have not been ACKed."""
        return len(self._outstanding)

    def can_send(self) -> bool:
        """Return True while fewer than ``window_size`` packets are outstanding."""
        return len(self._outstanding) < self._window_size

    def on_packet_sent(self, seq: int) -> None:
        """Record ``seq`` as outstanding."""
        self._outstanding.add(seq)

    def on_ack(self, seq: int) -> None:
        """Mark ``seq`` as acknowledged; unknown sequence numbers are ignored."""
        self._outstanding.discard(seq)


# ---------------------------------------------------------------------------
# State-aware scheduler classes
# ---------------------------------------------------------------------------


class SchedulerState(enum.Enum):
    """Scheduler states corresponding to connection lifecycle phases."""

    CONNECTING = "connecting"
    CONNECTED_BULK = "connected_bulk"
    CONNECTED_INTERACTIVE = "connected_interactive"
    CLOSING = "closing"
    DEAD = "dead"


class _RetryLimitedScheduler(PacketScheduler):
    """Shared retry counting and RTO backoff for the SYN and FIN phases."""

    def __init__(self, window_size: int, max_retries: int, initial_rto: int):
        super().__init__(window_size=window_size)
        self.max_retries = max_retries
        self.timer = RetransmitTimer(initial_rto=initial_rto)
        self._retry_count = 0

    def on_retransmit(self) -> bool:
        """Record a retransmit. Returns False if max retries exceeded."""
        self._retry_count += 1
        self.timer.backoff()
        return self._retry_count <= self.max_retries

    @property
    def retry_count(self) -> int:
        """Number of retransmits recorded so far."""
        return self._retry_count


class _CongestionControlledScheduler(PacketScheduler):
    """Shared ACK/loss handling for established-connection schedulers."""

    def __init__(self, window_size: int, ssthresh: int, max_window: int):
        super().__init__(window_size=window_size)
        self.timer = RetransmitTimer()
        self.congestion = CongestionWindow(
            initial_size=window_size, ssthresh=ssthresh, max_size=max_window
        )

    def _sync_window(self) -> None:
        """Copy the congestion window size into the effective send window."""
        self._window_size = int(self.congestion.size)

    def on_ack(self, seq: int) -> None:
        """Acknowledge ``seq``, grow the congestion window, update the send window."""
        # NOTE(review): the window grows even when ``seq`` was not
        # outstanding (e.g. a duplicate ACK) — confirm callers only report
        # newly acknowledged packets.
        super().on_ack(seq)
        self.congestion.on_ack()
        self._sync_window()

    def on_loss(self) -> None:
        """Shrink the congestion window, update the send window, back off the RTO."""
        self.congestion.on_loss()
        self._sync_window()
        self.timer.backoff()


class ConnectingScheduler(_RetryLimitedScheduler):
    """During SYN: aggressive retransmit (short RTO), limited retries.

    Ported from net.i2p.client.streaming.impl.SchedulerConnecting.
    """

    def __init__(self, window_size: int = 1, max_retries: int = 5, initial_rto: int = 3000):
        super().__init__(
            window_size=window_size, max_retries=max_retries, initial_rto=initial_rto
        )


class EstablishedBulkScheduler(_CongestionControlledScheduler):
    """Established bulk: standard Jacobson/Karels RTO, AIMD congestion.

    Ported from net.i2p.client.streaming.impl.SchedulerConnectedBulk.
    """

    def __init__(self, window_size: int = 6, ssthresh: int = 14, max_window: int = 128):
        super().__init__(window_size=window_size, ssthresh=ssthresh, max_window=max_window)


class EstablishedInteractiveScheduler(_CongestionControlledScheduler):
    """Established interactive: lower latency, smaller buffers.

    Ported from net.i2p.client.streaming.impl.SchedulerConnectedInteractive.
    Uses smaller window and more aggressive ACK policy.
    """

    def __init__(self, window_size: int = 2, max_window: int = 16):
        # Slow-start threshold is fixed at twice the initial window.
        super().__init__(
            window_size=window_size, ssthresh=window_size * 2, max_window=max_window
        )


class ClosingScheduler(_RetryLimitedScheduler):
    """During FIN: retransmit FIN with backoff, timeout to CLOSED.

    Ported from net.i2p.client.streaming.impl.SchedulerClosing.
    """

    def __init__(self, window_size: int = 1, max_retries: int = 8, initial_rto: int = 3000):
        super().__init__(
            window_size=window_size, max_retries=max_retries, initial_rto=initial_rto
        )


class StateScheduler:
    """Selects scheduling behavior based on connection state.

    Ported from net.i2p.client.streaming.impl.SchedulerChooser.
    """

    def __init__(self, profile: str = "bulk"):
        """Create a chooser.

        Args:
            profile: ``"interactive"`` selects the interactive scheduler for
                established connections; any other value selects bulk.
        """
        self._profile = profile

    def get_scheduler(self, state: ConnectionState) -> PacketScheduler | None:
        """Return the appropriate scheduler for the given connection state.

        A new scheduler instance is created on every call.

        Returns None for terminal states (CLOSED, RESET).
        """
        if state in (
            ConnectionState.NEW,
            ConnectionState.SYN_SENT,
            ConnectionState.SYN_RECEIVED,
        ):
            return ConnectingScheduler()

        if state == ConnectionState.ESTABLISHED:
            if self._profile == "interactive":
                return EstablishedInteractiveScheduler()
            return EstablishedBulkScheduler()

        if state in (
            ConnectionState.FIN_WAIT,
            ConnectionState.CLOSE_WAIT,
            ConnectionState.LAST_ACK,
            ConnectionState.CLOSING,
            ConnectionState.TIME_WAIT,
        ):
            return ClosingScheduler()

        # CLOSED, RESET — no scheduling needed
        return None