# A Python port of the Invisible Internet Project (I2P)
# Source listing: at main, 165 lines, 6.6 kB (view raw)
"""Tests for state-aware schedulers."""

import pytest


class TestSchedulerState:
    """Checks that ``SchedulerState`` exposes the expected member names."""

    def test_enum_values(self):
        from i2p_streaming.scheduler import SchedulerState
        assert hasattr(SchedulerState, "CONNECTING")
        assert hasattr(SchedulerState, "CONNECTED_BULK")
        assert hasattr(SchedulerState, "CONNECTED_INTERACTIVE")
        assert hasattr(SchedulerState, "CLOSING")
        assert hasattr(SchedulerState, "DEAD")


class TestStateScheduler:
    """Checks which scheduler ``StateScheduler.get_scheduler`` returns per state.

    Imports are done inside each test so that a missing name fails only the
    test that needs it rather than the collection of the whole module.
    """

    def test_returns_connecting_scheduler_for_syn_sent(self):
        from i2p_streaming.connection import ConnectionState
        from i2p_streaming.scheduler import StateScheduler, ConnectingScheduler
        ss = StateScheduler()
        sched = ss.get_scheduler(ConnectionState.SYN_SENT)
        assert isinstance(sched, ConnectingScheduler)

    def test_returns_connecting_scheduler_for_syn_received(self):
        from i2p_streaming.connection import ConnectionState
        from i2p_streaming.scheduler import StateScheduler, ConnectingScheduler
        ss = StateScheduler()
        sched = ss.get_scheduler(ConnectionState.SYN_RECEIVED)
        assert isinstance(sched, ConnectingScheduler)

    def test_returns_bulk_scheduler_for_established(self):
        from i2p_streaming.connection import ConnectionState
        from i2p_streaming.scheduler import StateScheduler, EstablishedBulkScheduler
        # No profile argument: the default StateScheduler is expected to
        # hand out the bulk scheduler for ESTABLISHED.
        ss = StateScheduler()
        sched = ss.get_scheduler(ConnectionState.ESTABLISHED)
        assert isinstance(sched, EstablishedBulkScheduler)

    def test_returns_interactive_scheduler_when_configured(self):
        from i2p_streaming.connection import ConnectionState
        from i2p_streaming.scheduler import StateScheduler, EstablishedInteractiveScheduler
        ss = StateScheduler(profile="interactive")
        sched = ss.get_scheduler(ConnectionState.ESTABLISHED)
        assert isinstance(sched, EstablishedInteractiveScheduler)

    def test_returns_closing_scheduler_for_fin_wait(self):
        from i2p_streaming.connection import ConnectionState
        from i2p_streaming.scheduler import StateScheduler, ClosingScheduler
        ss = StateScheduler()
        sched = ss.get_scheduler(ConnectionState.FIN_WAIT)
        assert isinstance(sched, ClosingScheduler)

    def test_returns_closing_scheduler_for_close_wait(self):
        from i2p_streaming.connection import ConnectionState
        from i2p_streaming.scheduler import StateScheduler, ClosingScheduler
        ss = StateScheduler()
        sched = ss.get_scheduler(ConnectionState.CLOSE_WAIT)
        assert isinstance(sched, ClosingScheduler)

    def test_returns_closing_scheduler_for_last_ack(self):
        from i2p_streaming.connection import ConnectionState
        from i2p_streaming.scheduler import StateScheduler, ClosingScheduler
        ss = StateScheduler()
        sched = ss.get_scheduler(ConnectionState.LAST_ACK)
        assert isinstance(sched, ClosingScheduler)

    def test_returns_none_for_closed(self):
        from i2p_streaming.connection import ConnectionState
        from i2p_streaming.scheduler import StateScheduler
        ss = StateScheduler()
        sched = ss.get_scheduler(ConnectionState.CLOSED)
        assert sched is None

    def test_returns_none_for_reset(self):
        from i2p_streaming.connection import ConnectionState
        from i2p_streaming.scheduler import StateScheduler
        ss = StateScheduler()
        sched = ss.get_scheduler(ConnectionState.RESET)
        assert sched is None


class TestConnectingScheduler:
    """Checks the RTO bound, retry limit and send window of ``ConnectingScheduler``."""

    def test_has_short_initial_rto(self):
        from i2p_streaming.scheduler import ConnectingScheduler
        sched = ConnectingScheduler()
        # Connecting scheduler should have shorter RTO than default
        # NOTE(review): 3000 is an upper bound; its unit is not visible
        # from this file -- confirm against the timer implementation.
        assert sched.timer.rto <= 3000

    def test_max_retries(self):
        from i2p_streaming.scheduler import ConnectingScheduler
        sched = ConnectingScheduler(max_retries=3)
        assert sched.max_retries == 3

    def test_can_send(self):
        from i2p_streaming.scheduler import ConnectingScheduler
        sched = ConnectingScheduler()
        assert sched.can_send()

    def test_tracks_outstanding(self):
        from i2p_streaming.scheduler import ConnectingScheduler
        # With a window of one, a single sent packet must fill the window.
        sched = ConnectingScheduler(window_size=1)
        sched.on_packet_sent(0)
        assert sched.outstanding == 1
        assert not sched.can_send()


class TestEstablishedBulkScheduler:
    """Checks timer and congestion-window reactions of ``EstablishedBulkScheduler``."""

    def test_uses_jacobson_karels(self):
        from i2p_streaming.scheduler import EstablishedBulkScheduler
        sched = EstablishedBulkScheduler()
        # Update RTT and verify Jacobson/Karels is used
        # NOTE(review): the assertion only checks that the RTO stays
        # positive after one sample; it does not pin the estimator itself.
        sched.timer.update_rtt(100)
        assert sched.timer.rto > 0

    def test_congestion_window_on_ack(self):
        from i2p_streaming.scheduler import EstablishedBulkScheduler
        sched = EstablishedBulkScheduler()
        initial_size = sched.congestion.size
        sched.on_ack(0)
        # Should grow window
        # NOTE(review): ">=" also passes when the window is unchanged.
        assert sched.congestion.size >= initial_size

    def test_congestion_window_on_loss(self):
        from i2p_streaming.scheduler import EstablishedBulkScheduler
        sched = EstablishedBulkScheduler()
        # Start from a known window so the shrink is observable.
        sched.congestion.size = 20
        sched.on_loss()
        assert sched.congestion.size < 20

    def test_can_send_respects_window(self):
        from i2p_streaming.scheduler import EstablishedBulkScheduler
        sched = EstablishedBulkScheduler(window_size=2)
        sched.on_packet_sent(0)
        sched.on_packet_sent(1)
        assert not sched.can_send()


class TestEstablishedInteractiveScheduler:
    """Checks the default window and send readiness of ``EstablishedInteractiveScheduler``."""

    def test_smaller_window(self):
        from i2p_streaming.scheduler import (
            EstablishedBulkScheduler,
            EstablishedInteractiveScheduler,
        )
        bulk = EstablishedBulkScheduler()
        interactive = EstablishedInteractiveScheduler()
        # Interactive should use smaller default window
        # NOTE(review): reads the private ``_window_size`` attribute, and
        # "<=" also passes when both defaults are equal.
        assert interactive._window_size <= bulk._window_size

    def test_can_send(self):
        from i2p_streaming.scheduler import EstablishedInteractiveScheduler
        sched = EstablishedInteractiveScheduler()
        assert sched.can_send()


class TestClosingScheduler:
    """Checks timer backoff and send readiness of ``ClosingScheduler``."""

    def test_backoff_on_retransmit(self):
        from i2p_streaming.scheduler import ClosingScheduler
        sched = ClosingScheduler()
        initial_rto = sched.timer.rto
        sched.timer.backoff()
        assert sched.timer.rto > initial_rto

    def test_can_send(self):
        from i2p_streaming.scheduler import ClosingScheduler
        sched = ClosingScheduler()
        assert sched.can_send()