# A Python port of the Invisible Internet Project (I2P)
1"""Tests for state-aware schedulers."""
2
3import pytest
4
5
class TestSchedulerState:
    """Checks on the members exposed by ``SchedulerState``."""

    def test_enum_values(self):
        """``SchedulerState`` exposes every expected lifecycle member."""
        from i2p_streaming.scheduler import SchedulerState

        expected_members = (
            "CONNECTING",
            "CONNECTED_BULK",
            "CONNECTED_INTERACTIVE",
            "CLOSING",
            "DEAD",
        )
        # Checked in declaration order; the first missing name fails the test.
        for member_name in expected_members:
            assert hasattr(SchedulerState, member_name)
14
15
class TestStateScheduler:
    """Checks which scheduler ``StateScheduler.get_scheduler`` returns per state."""

    def test_returns_connecting_scheduler_for_syn_sent(self):
        """``SYN_SENT`` resolves to a ``ConnectingScheduler``."""
        from i2p_streaming.connection import ConnectionState
        from i2p_streaming.scheduler import StateScheduler, ConnectingScheduler

        resolved = StateScheduler().get_scheduler(ConnectionState.SYN_SENT)
        assert isinstance(resolved, ConnectingScheduler)

    def test_returns_connecting_scheduler_for_syn_received(self):
        """``SYN_RECEIVED`` resolves to a ``ConnectingScheduler``."""
        from i2p_streaming.connection import ConnectionState
        from i2p_streaming.scheduler import StateScheduler, ConnectingScheduler

        resolved = StateScheduler().get_scheduler(ConnectionState.SYN_RECEIVED)
        assert isinstance(resolved, ConnectingScheduler)

    def test_returns_bulk_scheduler_for_established(self):
        """With default construction, ``ESTABLISHED`` resolves to the bulk scheduler."""
        from i2p_streaming.connection import ConnectionState
        from i2p_streaming.scheduler import StateScheduler, EstablishedBulkScheduler

        resolved = StateScheduler().get_scheduler(ConnectionState.ESTABLISHED)
        assert isinstance(resolved, EstablishedBulkScheduler)

    def test_returns_interactive_scheduler_when_configured(self):
        """With ``profile="interactive"``, ``ESTABLISHED`` resolves to the interactive scheduler."""
        from i2p_streaming.connection import ConnectionState
        from i2p_streaming.scheduler import StateScheduler, EstablishedInteractiveScheduler

        interactive_dispatch = StateScheduler(profile="interactive")
        resolved = interactive_dispatch.get_scheduler(ConnectionState.ESTABLISHED)
        assert isinstance(resolved, EstablishedInteractiveScheduler)

    def test_returns_closing_scheduler_for_fin_wait(self):
        """``FIN_WAIT`` resolves to a ``ClosingScheduler``."""
        from i2p_streaming.connection import ConnectionState
        from i2p_streaming.scheduler import StateScheduler, ClosingScheduler

        resolved = StateScheduler().get_scheduler(ConnectionState.FIN_WAIT)
        assert isinstance(resolved, ClosingScheduler)

    def test_returns_closing_scheduler_for_close_wait(self):
        """``CLOSE_WAIT`` resolves to a ``ClosingScheduler``."""
        from i2p_streaming.connection import ConnectionState
        from i2p_streaming.scheduler import StateScheduler, ClosingScheduler

        resolved = StateScheduler().get_scheduler(ConnectionState.CLOSE_WAIT)
        assert isinstance(resolved, ClosingScheduler)

    def test_returns_closing_scheduler_for_last_ack(self):
        """``LAST_ACK`` resolves to a ``ClosingScheduler``."""
        from i2p_streaming.connection import ConnectionState
        from i2p_streaming.scheduler import StateScheduler, ClosingScheduler

        resolved = StateScheduler().get_scheduler(ConnectionState.LAST_ACK)
        assert isinstance(resolved, ClosingScheduler)

    def test_returns_none_for_closed(self):
        """``CLOSED`` has no scheduler: the lookup returns ``None``."""
        from i2p_streaming.connection import ConnectionState
        from i2p_streaming.scheduler import StateScheduler

        resolved = StateScheduler().get_scheduler(ConnectionState.CLOSED)
        assert resolved is None

    def test_returns_none_for_reset(self):
        """``RESET`` has no scheduler: the lookup returns ``None``."""
        from i2p_streaming.connection import ConnectionState
        from i2p_streaming.scheduler import StateScheduler

        resolved = StateScheduler().get_scheduler(ConnectionState.RESET)
        assert resolved is None
79
80
class TestConnectingScheduler:
    """Checks on ``ConnectingScheduler`` defaults and send-window accounting."""

    def test_has_short_initial_rto(self):
        """A default-constructed scheduler starts with an RTO of at most 3000."""
        from i2p_streaming.scheduler import ConnectingScheduler

        connecting = ConnectingScheduler()
        # The connecting phase is expected to use a shorter RTO than the default.
        assert connecting.timer.rto <= 3000

    def test_max_retries(self):
        """The ``max_retries`` constructor argument is exposed unchanged."""
        from i2p_streaming.scheduler import ConnectingScheduler

        connecting = ConnectingScheduler(max_retries=3)
        assert connecting.max_retries == 3

    def test_can_send(self):
        """A freshly built scheduler reports that it can send."""
        from i2p_streaming.scheduler import ConnectingScheduler

        connecting = ConnectingScheduler()
        assert connecting.can_send()

    def test_tracks_outstanding(self):
        """One sent packet fills a window of size one and blocks further sends."""
        from i2p_streaming.scheduler import ConnectingScheduler

        connecting = ConnectingScheduler(window_size=1)
        connecting.on_packet_sent(0)

        assert connecting.outstanding == 1
        assert not connecting.can_send()
104
105
class TestEstablishedBulkScheduler:
    """Checks on ``EstablishedBulkScheduler`` timer, congestion and window handling."""

    def test_uses_jacobson_karels(self):
        """After one RTT sample of 100 the timer reports a positive RTO.

        Only positivity is asserted; the exact RTO value is not pinned here.
        """
        from i2p_streaming.scheduler import EstablishedBulkScheduler

        bulk = EstablishedBulkScheduler()
        # Feed a single RTT sample, then inspect the resulting RTO.
        bulk.timer.update_rtt(100)
        assert bulk.timer.rto > 0

    def test_congestion_window_on_ack(self):
        """An ACK never leaves the congestion window smaller than before."""
        from i2p_streaming.scheduler import EstablishedBulkScheduler

        bulk = EstablishedBulkScheduler()
        window_before = bulk.congestion.size
        bulk.on_ack(0)
        # Growth is expected; the assertion also tolerates an unchanged size.
        assert bulk.congestion.size >= window_before

    def test_congestion_window_on_loss(self):
        """A loss event shrinks a congestion window that was set to 20."""
        from i2p_streaming.scheduler import EstablishedBulkScheduler

        starting_window = 20
        bulk = EstablishedBulkScheduler()
        bulk.congestion.size = starting_window
        bulk.on_loss()
        assert bulk.congestion.size < starting_window

    def test_can_send_respects_window(self):
        """Two sent packets exhaust a window of size two."""
        from i2p_streaming.scheduler import EstablishedBulkScheduler

        bulk = EstablishedBulkScheduler(window_size=2)
        for sequence_number in (0, 1):
            bulk.on_packet_sent(sequence_number)
        assert not bulk.can_send()
135
136
class TestEstablishedInteractiveScheduler:
    """Checks on ``EstablishedInteractiveScheduler`` defaults."""

    def test_smaller_window(self):
        """The interactive default window is no larger than the bulk default."""
        from i2p_streaming.scheduler import (
            EstablishedBulkScheduler,
            EstablishedInteractiveScheduler,
        )

        bulk_default = EstablishedBulkScheduler()
        interactive_default = EstablishedInteractiveScheduler()
        # Compares the private ``_window_size`` attribute of both schedulers.
        assert interactive_default._window_size <= bulk_default._window_size

    def test_can_send(self):
        """A freshly built interactive scheduler reports that it can send."""
        from i2p_streaming.scheduler import EstablishedInteractiveScheduler

        interactive_default = EstablishedInteractiveScheduler()
        assert interactive_default.can_send()
152
153
class TestClosingScheduler:
    """Checks on ``ClosingScheduler`` timer backoff and send readiness."""

    def test_backoff_on_retransmit(self):
        """Calling ``timer.backoff()`` strictly increases the RTO."""
        from i2p_streaming.scheduler import ClosingScheduler

        closing = ClosingScheduler()
        rto_before = closing.timer.rto
        closing.timer.backoff()
        assert closing.timer.rto > rto_before

    def test_can_send(self):
        """A freshly built closing scheduler reports that it can send."""
        from i2p_streaming.scheduler import ClosingScheduler

        closing = ClosingScheduler()
        assert closing.can_send()
165 assert sched.can_send()