A Python port of the Invisible Internet Project (I2P)
at main 141 lines 5.1 kB view raw
1"""Tests for stream connection state machine.""" 2 3import pytest 4 5 6class TestConnectionState: 7 def test_enum_values(self): 8 from i2p_streaming.connection import ConnectionState 9 states = ["NEW", "SYN_SENT", "SYN_RECEIVED", "ESTABLISHED", 10 "CLOSE_WAIT", "LAST_ACK", "FIN_WAIT", "CLOSING", 11 "TIME_WAIT", "CLOSED", "RESET"] 12 for name in states: 13 assert hasattr(ConnectionState, name) 14 15 16class TestStreamConnection: 17 def test_initial_state(self): 18 from i2p_streaming.connection import StreamConnection, ConnectionState 19 conn = StreamConnection() 20 assert conn.state == ConnectionState.NEW 21 22 def test_active_open(self): 23 from i2p_streaming.connection import StreamConnection, ConnectionState 24 conn = StreamConnection() 25 conn.send_syn() 26 assert conn.state == ConnectionState.SYN_SENT 27 28 def test_passive_open(self): 29 from i2p_streaming.connection import StreamConnection, ConnectionState 30 conn = StreamConnection() 31 conn.receive_syn() 32 assert conn.state == ConnectionState.SYN_RECEIVED 33 34 def test_active_open_established(self): 35 from i2p_streaming.connection import StreamConnection, ConnectionState 36 conn = StreamConnection() 37 conn.send_syn() 38 conn.receive_syn_ack() 39 assert conn.state == ConnectionState.ESTABLISHED 40 41 def test_passive_open_established(self): 42 from i2p_streaming.connection import StreamConnection, ConnectionState 43 conn = StreamConnection() 44 conn.receive_syn() 45 conn.send_syn_ack() 46 assert conn.state == ConnectionState.ESTABLISHED 47 48 def test_active_close(self): 49 from i2p_streaming.connection import StreamConnection, ConnectionState 50 conn = StreamConnection() 51 conn.send_syn() 52 conn.receive_syn_ack() 53 conn.send_close() 54 assert conn.state == ConnectionState.FIN_WAIT 55 56 def test_passive_close(self): 57 from i2p_streaming.connection import StreamConnection, ConnectionState 58 conn = StreamConnection() 59 conn.send_syn() 60 conn.receive_syn_ack() 61 conn.receive_close() 62 assert conn.state == ConnectionState.CLOSE_WAIT 63 64 def test_close_wait_to_last_ack(self): 65 from i2p_streaming.connection import StreamConnection, ConnectionState 66 conn = StreamConnection() 67 conn.send_syn() 68 conn.receive_syn_ack() 69 conn.receive_close() 70 conn.send_close() 71 assert conn.state == ConnectionState.LAST_ACK 72 73 def test_last_ack_to_closed(self): 74 from i2p_streaming.connection import StreamConnection, ConnectionState 75 conn = StreamConnection() 76 conn.send_syn() 77 conn.receive_syn_ack() 78 conn.receive_close() 79 conn.send_close() 80 conn.receive_ack() 81 assert conn.state == ConnectionState.CLOSED 82 83 def test_fin_wait_to_time_wait(self): 84 from i2p_streaming.connection import StreamConnection, ConnectionState 85 conn = StreamConnection() 86 conn.send_syn() 87 conn.receive_syn_ack() 88 conn.send_close() 89 conn.receive_close_ack() 90 assert conn.state == ConnectionState.TIME_WAIT 91 92 def test_time_wait_to_closed(self): 93 from i2p_streaming.connection import StreamConnection, ConnectionState 94 conn = StreamConnection() 95 conn.send_syn() 96 conn.receive_syn_ack() 97 conn.send_close() 98 conn.receive_close_ack() 99 conn.timeout() 100 assert conn.state == ConnectionState.CLOSED 101 102 def test_reset(self): 103 from i2p_streaming.connection import StreamConnection, ConnectionState 104 conn = StreamConnection() 105 conn.send_syn() 106 conn.receive_syn_ack() 107 conn.reset() 108 assert conn.state == ConnectionState.RESET 109 110 def test_invalid_transition(self): 111 from i2p_streaming.connection import StreamConnection 112 conn = StreamConnection() 113 with pytest.raises(RuntimeError): 114 conn.receive_syn_ack() # Can't receive SYN_ACK from NEW 115 116 def test_invalid_close_from_new(self): 117 from i2p_streaming.connection import StreamConnection 118 conn = StreamConnection() 119 with pytest.raises(RuntimeError): 120 conn.send_close() 121 122 def test_sequence_numbers(self): 123 from i2p_streaming.connection import StreamConnection 124 conn = StreamConnection() 125 assert conn.next_send_seq == 0 126 conn.increment_send_seq() 127 assert conn.next_send_seq == 1 128 129 def test_send_window(self): 130 from i2p_streaming.connection import StreamConnection 131 from i2p_streaming.options import StreamOptions 132 opts = StreamOptions(initial_window_size=10) 133 conn = StreamConnection(options=opts) 134 assert conn.send_window == 10 135 136 def test_receive_window(self): 137 from i2p_streaming.connection import StreamConnection 138 from i2p_streaming.options import StreamOptions 139 opts = StreamOptions(initial_window_size=8) 140 conn = StreamConnection(options=opts) 141 assert conn.recv_window == 8