A Python port of the Invisible Internet Project (I2P)
at main 218 lines 7.5 kB view raw
"""Packet scheduler — retransmit timer, congestion control, and state-aware scheduling.

Ported from net.i2p.client.streaming.impl scheduling classes.
"""

import enum

from i2p_streaming.connection import ConnectionState


class RetransmitTimer:
    """TCP-like retransmit timer with Jacobson/Karels RTT estimation.

    All durations are in milliseconds. ``rto`` is the current retransmission
    timeout; it starts at ``initial_rto`` (or ``INITIAL_RTO``) and is then
    driven by :meth:`update_rtt` and :meth:`backoff`.
    """

    INITIAL_RTO = 3000  # 3 seconds
    MAX_RTO = 60000  # 60 seconds
    ALPHA = 0.125  # SRTT smoothing
    BETA = 0.25  # RTTVAR smoothing

    def __init__(self, initial_rto: int | None = None):
        self.rto = initial_rto if initial_rto is not None else self.INITIAL_RTO
        # Smoothed RTT and RTT variance; both stay None until the first sample.
        self._srtt: float | None = None
        self._rttvar: float | None = None

    def update_rtt(self, rtt_ms: int) -> None:
        """Fold one RTT sample into the estimator and recompute ``rto``.

        The first sample seeds SRTT with the sample and RTTVAR with half of
        it. Later samples update RTTVAR (using the previous SRTT) and then
        SRTT. The new RTO is ``SRTT + max(1, 4 * RTTVAR)``, truncated to an
        int and capped at ``MAX_RTO``.

        Negative samples are ignored: they cannot be real round-trip times
        and would otherwise drive ``rto`` negative.
        """
        if rtt_ms < 0:
            return

        if self._srtt is None or self._rttvar is None:
            srtt = float(rtt_ms)
            rttvar = float(rtt_ms) / 2
        else:
            # RTTVAR must be updated from the *old* SRTT, so compute it first.
            rttvar = (1 - self.BETA) * self._rttvar + self.BETA * abs(self._srtt - rtt_ms)
            srtt = (1 - self.ALPHA) * self._srtt + self.ALPHA * rtt_ms

        self._srtt = srtt
        self._rttvar = rttvar
        self.rto = min(int(srtt + max(1, 4 * rttvar)), self.MAX_RTO)

    def backoff(self) -> None:
        """Double ``rto`` (exponential backoff), capped at ``MAX_RTO``."""
        self.rto = min(self.rto * 2, self.MAX_RTO)


class CongestionWindow:
    """AIMD congestion window (additive increase, multiplicative decrease).

    ``size`` is the current window, ``ssthresh`` the slow-start threshold and
    ``max_size`` the upper bound applied on every increase.
    """

    def __init__(self, initial_size: int = 6, ssthresh: int = 64,
                 max_size: int = 128):
        self.size = initial_size
        self.ssthresh = ssthresh
        self.max_size = max_size

    def on_ack(self) -> None:
        """Grow the window in response to one ACK, never beyond ``max_size``."""
        if self.size < self.ssthresh:
            # Slow start: double
            self.size = min(self.size * 2, self.max_size)
            return

        # Congestion avoidance: additive increase.
        # NOTE(review): for size >= 1, max(1, 1.0 / size) is always 1, so the
        # window grows by a full unit per ACK; the fractional 1/size term
        # never applies. Kept as-is to preserve existing behavior.
        grown = min(self.size + max(1, 1.0 / self.size), self.max_size)
        # Keep whole-valued floats as ints so the window stays an int whenever
        # it can.
        if isinstance(grown, float) and grown.is_integer():
            grown = int(grown)
        self.size = grown

    def on_loss(self) -> None:
        """Halve both ``ssthresh`` and ``size`` (each floored at 1)."""
        halved = max(1, int(self.size / 2))
        self.ssthresh = halved
        self.size = halved


class PacketScheduler:
    """Decides when packets can be sent based on congestion window.

    Tracks the set of sequence numbers that have been sent but not yet
    acknowledged and allows sending while that set is smaller than the window.
    """

    def __init__(self, window_size: int = 6):
        self._window_size = window_size
        self._outstanding: set[int] = set()

    @property
    def outstanding(self) -> int:
        """Number of packets sent but not yet acknowledged."""
        return len(self._outstanding)

    def can_send(self) -> bool:
        """Return True while fewer packets are outstanding than the window allows."""
        return len(self._outstanding) < self._window_size

    def on_packet_sent(self, seq: int) -> None:
        """Record ``seq`` as outstanding."""
        self._outstanding.add(seq)

    def on_ack(self, seq: int) -> None:
        """Mark ``seq`` as acknowledged; unknown sequence numbers are ignored."""
        self._outstanding.discard(seq)


# ---------------------------------------------------------------------------
# State-aware scheduler classes
# ---------------------------------------------------------------------------

class SchedulerState(enum.Enum):
    """Scheduler states corresponding to connection lifecycle phases."""
    CONNECTING = "connecting"
    CONNECTED_BULK = "connected_bulk"
    CONNECTED_INTERACTIVE = "connected_interactive"
    CLOSING = "closing"
    DEAD = "dead"


class _RetryLimitedScheduler(PacketScheduler):
    """Shared behavior for handshake-style phases: backoff plus a retry cap."""

    def __init__(self, window_size: int, max_retries: int, initial_rto: int):
        super().__init__(window_size=window_size)
        self.max_retries = max_retries
        self.timer = RetransmitTimer(initial_rto=initial_rto)
        self._retry_count = 0

    def on_retransmit(self) -> bool:
        """Record a retransmit and back off the timer.

        Returns False once the number of recorded retransmits exceeds
        ``max_retries``. The counter and the timer are updated even then.
        """
        self._retry_count += 1
        self.timer.backoff()
        return self._retry_count <= self.max_retries

    @property
    def retry_count(self) -> int:
        """Number of retransmits recorded so far."""
        return self._retry_count


class _CongestionControlledScheduler(PacketScheduler):
    """Shared behavior for established phases: RTO timer plus AIMD window."""

    def __init__(self, window_size: int, ssthresh: int, max_window: int):
        super().__init__(window_size=window_size)
        self.timer = RetransmitTimer()
        self.congestion = CongestionWindow(
            initial_size=window_size, ssthresh=ssthresh, max_size=max_window
        )

    def _sync_window(self) -> None:
        """Copy the congestion window into the effective send window."""
        self._window_size = int(self.congestion.size)

    def on_ack(self, seq: int) -> None:
        """Retire ``seq`` and, if it was outstanding, grow the window.

        Duplicate or unknown ACKs must not inflate the congestion window, so
        growth only happens when the ACK actually retires a packet.
        """
        newly_acked = seq in self._outstanding
        super().on_ack(seq)
        if newly_acked:
            self.congestion.on_ack()
        self._sync_window()

    def on_loss(self) -> None:
        """Shrink the congestion window and back off the retransmit timer."""
        self.congestion.on_loss()
        self._sync_window()
        self.timer.backoff()


class ConnectingScheduler(_RetryLimitedScheduler):
    """During SYN: aggressive retransmit (short RTO), limited retries.

    Ported from net.i2p.client.streaming.impl.SchedulerConnecting.
    """

    def __init__(self, window_size: int = 1, max_retries: int = 5,
                 initial_rto: int = 3000):
        super().__init__(window_size, max_retries, initial_rto)


class EstablishedBulkScheduler(_CongestionControlledScheduler):
    """Established bulk: standard Jacobson/Karels RTO, AIMD congestion.

    Ported from net.i2p.client.streaming.impl.SchedulerConnectedBulk.
    """

    def __init__(self, window_size: int = 6, ssthresh: int = 14,
                 max_window: int = 128):
        super().__init__(window_size, ssthresh, max_window)


class EstablishedInteractiveScheduler(_CongestionControlledScheduler):
    """Established interactive: lower latency, smaller buffers.

    Ported from net.i2p.client.streaming.impl.SchedulerConnectedInteractive.
    Uses a smaller window than the bulk scheduler; the slow-start threshold
    is twice the initial window.
    """

    def __init__(self, window_size: int = 2, max_window: int = 16):
        super().__init__(window_size, window_size * 2, max_window)


class ClosingScheduler(_RetryLimitedScheduler):
    """During FIN: retransmit FIN with backoff, timeout to CLOSED.

    Ported from net.i2p.client.streaming.impl.SchedulerClosing.
    """

    def __init__(self, window_size: int = 1, max_retries: int = 8,
                 initial_rto: int = 3000):
        super().__init__(window_size, max_retries, initial_rto)


class StateScheduler:
    """Selects scheduling behavior based on connection state.

    Ported from net.i2p.client.streaming.impl.SchedulerChooser.
    """

    def __init__(self, profile: str = "bulk"):
        # Only "interactive" is treated specially; any other value selects
        # the bulk scheduler for established connections.
        self._profile = profile

    def get_scheduler(self, state: ConnectionState) -> PacketScheduler | None:
        """Return the appropriate scheduler for the given connection state.

        A fresh scheduler instance is created on every call.
        Returns None for terminal states (CLOSED, RESET).
        """
        if state in (ConnectionState.NEW, ConnectionState.SYN_SENT,
                     ConnectionState.SYN_RECEIVED):
            return ConnectingScheduler()
        if state == ConnectionState.ESTABLISHED:
            if self._profile == "interactive":
                return EstablishedInteractiveScheduler()
            return EstablishedBulkScheduler()
        if state in (ConnectionState.FIN_WAIT, ConnectionState.CLOSE_WAIT,
                     ConnectionState.LAST_ACK, ConnectionState.CLOSING,
                     ConnectionState.TIME_WAIT):
            return ClosingScheduler()
        # CLOSED, RESET — no scheduling needed
        return None