"""Tests for stream connection state machine."""

import pytest


def _handshake(stream):
    """Run the active-open sequence on *stream* and hand it back.

    Calls ``send_syn()`` followed by ``receive_syn_ack()``; the tests below
    expect a connection to report ``ESTABLISHED`` after these two calls.
    """
    stream.send_syn()
    stream.receive_syn_ack()
    return stream


class TestConnectionState:
    """Checks on the ``ConnectionState`` enumeration itself."""

    def test_enum_values(self):
        """Every expected state name is an attribute of ``ConnectionState``."""
        from i2p_streaming.connection import ConnectionState

        expected_names = (
            "NEW",
            "SYN_SENT",
            "SYN_RECEIVED",
            "ESTABLISHED",
            "CLOSE_WAIT",
            "LAST_ACK",
            "FIN_WAIT",
            "CLOSING",
            "TIME_WAIT",
            "CLOSED",
            "RESET",
        )
        for state_name in expected_names:
            assert hasattr(ConnectionState, state_name)


class TestStreamConnection:
    """State-transition and bookkeeping checks for ``StreamConnection``.

    Imports of the package under test are kept inside each test so that a
    missing symbol fails only the tests that need it.
    """

    # -- opening ----------------------------------------------------------

    def test_initial_state(self):
        """A freshly constructed connection reports ``NEW``."""
        from i2p_streaming.connection import StreamConnection, ConnectionState

        stream = StreamConnection()

        assert stream.state == ConnectionState.NEW

    def test_active_open(self):
        """Sending a SYN from ``NEW`` is expected to yield ``SYN_SENT``."""
        from i2p_streaming.connection import StreamConnection, ConnectionState

        stream = StreamConnection()
        stream.send_syn()

        assert stream.state == ConnectionState.SYN_SENT

    def test_passive_open(self):
        """Receiving a SYN from ``NEW`` is expected to yield ``SYN_RECEIVED``."""
        from i2p_streaming.connection import StreamConnection, ConnectionState

        stream = StreamConnection()
        stream.receive_syn()

        assert stream.state == ConnectionState.SYN_RECEIVED

    def test_active_open_established(self):
        """SYN sent, then SYN-ACK received, is expected to yield ``ESTABLISHED``."""
        from i2p_streaming.connection import StreamConnection, ConnectionState

        stream = _handshake(StreamConnection())

        assert stream.state == ConnectionState.ESTABLISHED

    def test_passive_open_established(self):
        """SYN received, then SYN-ACK sent, is expected to yield ``ESTABLISHED``."""
        from i2p_streaming.connection import StreamConnection, ConnectionState

        stream = StreamConnection()
        stream.receive_syn()
        stream.send_syn_ack()

        assert stream.state == ConnectionState.ESTABLISHED

    # -- closing ----------------------------------------------------------

    def test_active_close(self):
        """Sending a close after the handshake is expected to yield ``FIN_WAIT``."""
        from i2p_streaming.connection import StreamConnection, ConnectionState

        stream = _handshake(StreamConnection())
        stream.send_close()

        assert stream.state == ConnectionState.FIN_WAIT

    def test_passive_close(self):
        """Receiving a close after the handshake is expected to yield ``CLOSE_WAIT``."""
        from i2p_streaming.connection import StreamConnection, ConnectionState

        stream = _handshake(StreamConnection())
        stream.receive_close()

        assert stream.state == ConnectionState.CLOSE_WAIT

    def test_close_wait_to_last_ack(self):
        """Close received, then close sent, is expected to yield ``LAST_ACK``."""
        from i2p_streaming.connection import StreamConnection, ConnectionState

        stream = _handshake(StreamConnection())
        stream.receive_close()
        stream.send_close()

        assert stream.state == ConnectionState.LAST_ACK

    def test_last_ack_to_closed(self):
        """An ACK arriving after the passive-close sequence is expected to yield ``CLOSED``."""
        from i2p_streaming.connection import StreamConnection, ConnectionState

        stream = _handshake(StreamConnection())
        stream.receive_close()
        stream.send_close()
        stream.receive_ack()

        assert stream.state == ConnectionState.CLOSED

    def test_fin_wait_to_time_wait(self):
        """A close-ACK arriving after our close is expected to yield ``TIME_WAIT``."""
        from i2p_streaming.connection import StreamConnection, ConnectionState

        stream = _handshake(StreamConnection())
        stream.send_close()
        stream.receive_close_ack()

        assert stream.state == ConnectionState.TIME_WAIT

    def test_time_wait_to_closed(self):
        """A timeout after the active-close sequence is expected to yield ``CLOSED``."""
        from i2p_streaming.connection import StreamConnection, ConnectionState

        stream = _handshake(StreamConnection())
        stream.send_close()
        stream.receive_close_ack()
        stream.timeout()

        assert stream.state == ConnectionState.CLOSED

    def test_reset(self):
        """Calling ``reset()`` after the handshake is expected to yield ``RESET``."""
        from i2p_streaming.connection import StreamConnection, ConnectionState

        stream = _handshake(StreamConnection())
        stream.reset()

        assert stream.state == ConnectionState.RESET

    # -- rejected transitions ---------------------------------------------

    def test_invalid_transition(self):
        """``receive_syn_ack()`` on a new connection must raise ``RuntimeError``."""
        from i2p_streaming.connection import StreamConnection

        stream = StreamConnection()

        with pytest.raises(RuntimeError):
            stream.receive_syn_ack()  # Can't receive SYN_ACK from NEW

    def test_invalid_close_from_new(self):
        """``send_close()`` on a new connection must raise ``RuntimeError``."""
        from i2p_streaming.connection import StreamConnection

        stream = StreamConnection()

        with pytest.raises(RuntimeError):
            stream.send_close()

    # -- sequence numbers and windows -------------------------------------

    def test_sequence_numbers(self):
        """``next_send_seq`` starts at 0 and reads 1 after one increment."""
        from i2p_streaming.connection import StreamConnection

        stream = StreamConnection()
        assert stream.next_send_seq == 0

        stream.increment_send_seq()
        assert stream.next_send_seq == 1

    def test_send_window(self):
        """``send_window`` equals the ``initial_window_size`` given in the options."""
        from i2p_streaming.connection import StreamConnection
        from i2p_streaming.options import StreamOptions

        stream = StreamConnection(options=StreamOptions(initial_window_size=10))

        assert stream.send_window == 10

    def test_receive_window(self):
        """``recv_window`` equals the ``initial_window_size`` given in the options."""
        from i2p_streaming.connection import StreamConnection
        from i2p_streaming.options import StreamOptions

        stream = StreamConnection(options=StreamOptions(initial_window_size=8))

        assert stream.recv_window == 8