"""Tests for state-aware schedulers.""" import pytest class TestSchedulerState: def test_enum_values(self): from i2p_streaming.scheduler import SchedulerState assert hasattr(SchedulerState, "CONNECTING") assert hasattr(SchedulerState, "CONNECTED_BULK") assert hasattr(SchedulerState, "CONNECTED_INTERACTIVE") assert hasattr(SchedulerState, "CLOSING") assert hasattr(SchedulerState, "DEAD") class TestStateScheduler: def test_returns_connecting_scheduler_for_syn_sent(self): from i2p_streaming.connection import ConnectionState from i2p_streaming.scheduler import StateScheduler, ConnectingScheduler ss = StateScheduler() sched = ss.get_scheduler(ConnectionState.SYN_SENT) assert isinstance(sched, ConnectingScheduler) def test_returns_connecting_scheduler_for_syn_received(self): from i2p_streaming.connection import ConnectionState from i2p_streaming.scheduler import StateScheduler, ConnectingScheduler ss = StateScheduler() sched = ss.get_scheduler(ConnectionState.SYN_RECEIVED) assert isinstance(sched, ConnectingScheduler) def test_returns_bulk_scheduler_for_established(self): from i2p_streaming.connection import ConnectionState from i2p_streaming.scheduler import StateScheduler, EstablishedBulkScheduler ss = StateScheduler() sched = ss.get_scheduler(ConnectionState.ESTABLISHED) assert isinstance(sched, EstablishedBulkScheduler) def test_returns_interactive_scheduler_when_configured(self): from i2p_streaming.connection import ConnectionState from i2p_streaming.scheduler import StateScheduler, EstablishedInteractiveScheduler ss = StateScheduler(profile="interactive") sched = ss.get_scheduler(ConnectionState.ESTABLISHED) assert isinstance(sched, EstablishedInteractiveScheduler) def test_returns_closing_scheduler_for_fin_wait(self): from i2p_streaming.connection import ConnectionState from i2p_streaming.scheduler import StateScheduler, ClosingScheduler ss = StateScheduler() sched = ss.get_scheduler(ConnectionState.FIN_WAIT) assert isinstance(sched, ClosingScheduler) def test_returns_closing_scheduler_for_close_wait(self): from i2p_streaming.connection import ConnectionState from i2p_streaming.scheduler import StateScheduler, ClosingScheduler ss = StateScheduler() sched = ss.get_scheduler(ConnectionState.CLOSE_WAIT) assert isinstance(sched, ClosingScheduler) def test_returns_closing_scheduler_for_last_ack(self): from i2p_streaming.connection import ConnectionState from i2p_streaming.scheduler import StateScheduler, ClosingScheduler ss = StateScheduler() sched = ss.get_scheduler(ConnectionState.LAST_ACK) assert isinstance(sched, ClosingScheduler) def test_returns_none_for_closed(self): from i2p_streaming.connection import ConnectionState from i2p_streaming.scheduler import StateScheduler ss = StateScheduler() sched = ss.get_scheduler(ConnectionState.CLOSED) assert sched is None def test_returns_none_for_reset(self): from i2p_streaming.connection import ConnectionState from i2p_streaming.scheduler import StateScheduler ss = StateScheduler() sched = ss.get_scheduler(ConnectionState.RESET) assert sched is None class TestConnectingScheduler: def test_has_short_initial_rto(self): from i2p_streaming.scheduler import ConnectingScheduler sched = ConnectingScheduler() # Connecting scheduler should have shorter RTO than default assert sched.timer.rto <= 3000 def test_max_retries(self): from i2p_streaming.scheduler import ConnectingScheduler sched = ConnectingScheduler(max_retries=3) assert sched.max_retries == 3 def test_can_send(self): from i2p_streaming.scheduler import ConnectingScheduler sched = ConnectingScheduler() assert sched.can_send() def test_tracks_outstanding(self): from i2p_streaming.scheduler import ConnectingScheduler sched = ConnectingScheduler(window_size=1) sched.on_packet_sent(0) assert sched.outstanding == 1 assert not sched.can_send() class TestEstablishedBulkScheduler: def test_uses_jacobson_karels(self): from i2p_streaming.scheduler import EstablishedBulkScheduler sched = EstablishedBulkScheduler() # Update RTT and verify Jacobson/Karels is used sched.timer.update_rtt(100) assert sched.timer.rto > 0 def test_congestion_window_on_ack(self): from i2p_streaming.scheduler import EstablishedBulkScheduler sched = EstablishedBulkScheduler() initial_size = sched.congestion.size sched.on_ack(0) # Should grow window assert sched.congestion.size >= initial_size def test_congestion_window_on_loss(self): from i2p_streaming.scheduler import EstablishedBulkScheduler sched = EstablishedBulkScheduler() sched.congestion.size = 20 sched.on_loss() assert sched.congestion.size < 20 def test_can_send_respects_window(self): from i2p_streaming.scheduler import EstablishedBulkScheduler sched = EstablishedBulkScheduler(window_size=2) sched.on_packet_sent(0) sched.on_packet_sent(1) assert not sched.can_send() class TestEstablishedInteractiveScheduler: def test_smaller_window(self): from i2p_streaming.scheduler import ( EstablishedBulkScheduler, EstablishedInteractiveScheduler, ) bulk = EstablishedBulkScheduler() interactive = EstablishedInteractiveScheduler() # Interactive should use smaller default window assert interactive._window_size <= bulk._window_size def test_can_send(self): from i2p_streaming.scheduler import EstablishedInteractiveScheduler sched = EstablishedInteractiveScheduler() assert sched.can_send() class TestClosingScheduler: def test_backoff_on_retransmit(self): from i2p_streaming.scheduler import ClosingScheduler sched = ClosingScheduler() initial_rto = sched.timer.rto sched.timer.backoff() assert sched.timer.rto > initial_rto def test_can_send(self): from i2p_streaming.scheduler import ClosingScheduler sched = ClosingScheduler() assert sched.can_send()