A Python port of the Invisible Internet Project (I2P)
1"""Packet scheduler — retransmit timer, congestion control, and state-aware scheduling.
2
3Ported from net.i2p.client.streaming.impl scheduling classes.
4"""
5
6import enum
7
8from i2p_streaming.connection import ConnectionState
9
10
11class RetransmitTimer:
12 """TCP-like retransmit timer with Jacobson/Karels RTT estimation."""
13
14 INITIAL_RTO = 3000 # 3 seconds
15 MAX_RTO = 60000 # 60 seconds
16 ALPHA = 0.125 # SRTT smoothing
17 BETA = 0.25 # RTTVAR smoothing
18
19 def __init__(self, initial_rto: int | None = None):
20 self.rto = initial_rto if initial_rto is not None else self.INITIAL_RTO
21 self._srtt: float | None = None
22 self._rttvar: float | None = None
23
24 def update_rtt(self, rtt_ms: int):
25 if self._srtt is None:
26 self._srtt = float(rtt_ms)
27 self._rttvar = float(rtt_ms) / 2
28 else:
29 self._rttvar = (1 - self.BETA) * self._rttvar + self.BETA * abs(self._srtt - rtt_ms) # type: ignore[operator]
30 self._srtt = (1 - self.ALPHA) * self._srtt + self.ALPHA * rtt_ms
31 self.rto = int(self._srtt + max(1, 4 * self._rttvar))
32 self.rto = min(self.rto, self.MAX_RTO)
33
34 def backoff(self):
35 self.rto = min(self.rto * 2, self.MAX_RTO)
36
37
class CongestionWindow:
    """AIMD congestion window (additive increase, multiplicative decrease).

    ``size`` is the current window, ``ssthresh`` the slow-start threshold,
    and ``max_size`` the cap applied on every increase.
    """

    def __init__(self, initial_size: int = 6, ssthresh: int = 64,
                 max_size: int = 128):
        self.size = initial_size
        self.ssthresh = ssthresh
        self.max_size = max_size

    def on_ack(self):
        """Grow the window in response to one acknowledgement.

        Below ``ssthresh`` the window doubles (slow start); at or above it
        the window grows additively (congestion avoidance). The result never
        exceeds ``max_size``, and an integral float result is stored as int.
        """
        current = self.size
        if current <= 0:
            # A non-positive window cannot grow by doubling and would divide
            # by zero in congestion avoidance. Restart from 1, the same floor
            # that on_loss() enforces.
            grown = 1
        elif current < self.ssthresh:
            # Slow start: double
            grown = current * 2
        else:
            # Congestion avoidance: additive increase. For any window >= 1
            # the max() makes the step exactly 1.
            grown = current + max(1, 1.0 / current)
        grown = min(grown, self.max_size)
        if isinstance(grown, float) and grown.is_integer():
            grown = int(grown)
        self.size = grown

    def on_loss(self):
        """Halve the window and set ``ssthresh`` to the halved value (floor 1)."""
        halved = max(1, int(self.size / 2))
        self.ssthresh = halved
        self.size = halved
59
60
class PacketScheduler:
    """Window-based send gate.

    Tracks the sequence numbers that have been sent but not yet
    acknowledged and allows sending while their count is below the window.
    """

    def __init__(self, window_size: int = 6):
        """Create a scheduler with an empty in-flight set."""
        self._outstanding: set[int] = set()
        self._window_size = window_size

    @property
    def outstanding(self) -> int:
        """Number of distinct sequence numbers currently in flight."""
        in_flight = len(self._outstanding)
        return in_flight

    def can_send(self) -> bool:
        """Return True while fewer packets are in flight than the window allows."""
        in_flight = len(self._outstanding)
        return in_flight < self._window_size

    def on_packet_sent(self, seq: int):
        """Record ``seq`` as in flight; recording the same number twice counts once."""
        self._outstanding.add(seq)

    def on_ack(self, seq: int):
        """Remove ``seq`` from the in-flight set; unknown numbers are ignored."""
        self._outstanding.discard(seq)
80
81
82# ---------------------------------------------------------------------------
83# State-aware scheduler classes
84# ---------------------------------------------------------------------------
85
@enum.unique
class SchedulerState(enum.Enum):
    """Scheduler states, one per phase of the connection lifecycle."""

    CONNECTING = "connecting"
    CONNECTED_BULK = "connected_bulk"
    CONNECTED_INTERACTIVE = "connected_interactive"
    CLOSING = "closing"
    DEAD = "dead"
93
94
class ConnectingScheduler(PacketScheduler):
    """Scheduler for the SYN phase: retransmit with backoff, limited retries.

    Ported from net.i2p.client.streaming.impl.SchedulerConnecting.
    """

    def __init__(self, window_size: int = 1, max_retries: int = 5,
                 initial_rto: int = 3000):
        """Set up a window, a retry budget, and a retransmit timer."""
        super().__init__(window_size=window_size)
        self._retry_count = 0
        self.max_retries = max_retries
        self.timer = RetransmitTimer(initial_rto=initial_rto)

    @property
    def retry_count(self) -> int:
        """Number of retransmits recorded so far."""
        return self._retry_count

    def on_retransmit(self) -> bool:
        """Record a retransmit and back off the timer.

        Returns False once the recorded retransmits exceed ``max_retries``.
        The counter and the timer are updated even when False is returned.
        """
        self._retry_count += 1
        self.timer.backoff()
        within_budget = self._retry_count <= self.max_retries
        return within_budget
117
118
class EstablishedBulkScheduler(PacketScheduler):
    """Scheduler for established bulk transfer.

    Pairs a Jacobson/Karels retransmit timer with an AIMD congestion window
    and keeps the send window equal to the congestion window.

    Ported from net.i2p.client.streaming.impl.SchedulerConnectedBulk.
    """

    def __init__(self, window_size: int = 6, ssthresh: int = 14,
                 max_window: int = 128):
        """Create the timer and a congestion window starting at ``window_size``."""
        super().__init__(window_size=window_size)
        self.timer = RetransmitTimer()
        self.congestion = CongestionWindow(
            initial_size=window_size,
            ssthresh=ssthresh,
            max_size=max_window,
        )

    def _sync_window(self):
        """Copy the congestion window, truncated to int, into the send window."""
        self._window_size = int(self.congestion.size)

    def on_ack(self, seq: int):
        """Clear ``seq`` from the in-flight set and grow the window.

        NOTE(review): the congestion window grows on every call, including
        for a ``seq`` that was not in flight; confirm this is intended.
        """
        super().on_ack(seq)
        self.congestion.on_ack()
        self._sync_window()

    def on_loss(self):
        """Shrink the window after a loss and back off the retransmit timer."""
        self.congestion.on_loss()
        self._sync_window()
        self.timer.backoff()
143
144
class EstablishedInteractiveScheduler(PacketScheduler):
    """Scheduler for established interactive traffic.

    Same mechanics as the bulk scheduler, but with a smaller default window
    and a smaller default window cap.

    Ported from net.i2p.client.streaming.impl.SchedulerConnectedInteractive.
    """

    def __init__(self, window_size: int = 2, max_window: int = 16):
        """Create the timer and a congestion window starting at ``window_size``."""
        super().__init__(window_size=window_size)
        self.timer = RetransmitTimer()
        # The slow-start threshold is fixed at twice the initial window.
        slow_start_limit = window_size * 2
        self.congestion = CongestionWindow(
            initial_size=window_size,
            ssthresh=slow_start_limit,
            max_size=max_window,
        )

    def on_ack(self, seq: int):
        """Clear ``seq`` from the in-flight set and grow the window."""
        super().on_ack(seq)
        window = self.congestion
        window.on_ack()
        # The send window follows the congestion window, truncated to int.
        self._window_size = int(window.size)

    def on_loss(self):
        """Shrink the window after a loss and back off the retransmit timer."""
        window = self.congestion
        window.on_loss()
        self._window_size = int(window.size)
        self.timer.backoff()
169
170
class ClosingScheduler(PacketScheduler):
    """During FIN: retransmit FIN with backoff, timeout to CLOSED.

    Ported from net.i2p.client.streaming.impl.SchedulerClosing.
    """

    def __init__(self, window_size: int = 1, max_retries: int = 8,
                 initial_rto: int = 3000):
        """Set up a window, a retry budget, and a retransmit timer."""
        super().__init__(window_size=window_size)
        self.max_retries = max_retries
        self.timer = RetransmitTimer(initial_rto=initial_rto)
        self._retry_count = 0

    def on_retransmit(self) -> bool:
        """Record a FIN retransmit and back off the timer.

        Returns False once the recorded retransmits exceed ``max_retries``.
        The counter and the timer are updated even when False is returned.
        """
        self._retry_count += 1
        self.timer.backoff()
        return self._retry_count <= self.max_retries

    @property
    def retry_count(self) -> int:
        """Number of FIN retransmits recorded so far.

        Read-only view of the counter, matching the property of the same
        name on ConnectingScheduler.
        """
        return self._retry_count
189
190
class StateScheduler:
    """Picks a scheduler implementation for a connection state.

    Ported from net.i2p.client.streaming.impl.SchedulerChooser.
    """

    def __init__(self, profile: str = "bulk"):
        """Remember the traffic profile.

        Only the value "interactive" changes the scheduler chosen for an
        established connection; any other value selects the bulk scheduler.
        """
        self._profile = profile

    def get_scheduler(self, state: ConnectionState) -> PacketScheduler | None:
        """Build a new scheduler suited to ``state``.

        A fresh scheduler instance is constructed on every call. Returns
        None for states that need no scheduling (CLOSED, RESET).
        """
        handshake_states = (
            ConnectionState.NEW,
            ConnectionState.SYN_SENT,
            ConnectionState.SYN_RECEIVED,
        )
        if state in handshake_states:
            return ConnectingScheduler()

        if state == ConnectionState.ESTABLISHED:
            if self._profile == "interactive":
                return EstablishedInteractiveScheduler()
            return EstablishedBulkScheduler()

        teardown_states = (
            ConnectionState.FIN_WAIT,
            ConnectionState.CLOSE_WAIT,
            ConnectionState.LAST_ACK,
            ConnectionState.CLOSING,
            ConnectionState.TIME_WAIT,
        )
        if state in teardown_states:
            return ClosingScheduler()

        # CLOSED, RESET: no scheduling needed
        return None