A Python port of the Invisible Internet Project (I2P)
1"""Tests for stream connection state machine."""
2
3import pytest
4
5
6class TestConnectionState:
7 def test_enum_values(self):
8 from i2p_streaming.connection import ConnectionState
9 states = ["NEW", "SYN_SENT", "SYN_RECEIVED", "ESTABLISHED",
10 "CLOSE_WAIT", "LAST_ACK", "FIN_WAIT", "CLOSING",
11 "TIME_WAIT", "CLOSED", "RESET"]
12 for name in states:
13 assert hasattr(ConnectionState, name)
14
15
16class TestStreamConnection:
17 def test_initial_state(self):
18 from i2p_streaming.connection import StreamConnection, ConnectionState
19 conn = StreamConnection()
20 assert conn.state == ConnectionState.NEW
21
22 def test_active_open(self):
23 from i2p_streaming.connection import StreamConnection, ConnectionState
24 conn = StreamConnection()
25 conn.send_syn()
26 assert conn.state == ConnectionState.SYN_SENT
27
28 def test_passive_open(self):
29 from i2p_streaming.connection import StreamConnection, ConnectionState
30 conn = StreamConnection()
31 conn.receive_syn()
32 assert conn.state == ConnectionState.SYN_RECEIVED
33
34 def test_active_open_established(self):
35 from i2p_streaming.connection import StreamConnection, ConnectionState
36 conn = StreamConnection()
37 conn.send_syn()
38 conn.receive_syn_ack()
39 assert conn.state == ConnectionState.ESTABLISHED
40
41 def test_passive_open_established(self):
42 from i2p_streaming.connection import StreamConnection, ConnectionState
43 conn = StreamConnection()
44 conn.receive_syn()
45 conn.send_syn_ack()
46 assert conn.state == ConnectionState.ESTABLISHED
47
48 def test_active_close(self):
49 from i2p_streaming.connection import StreamConnection, ConnectionState
50 conn = StreamConnection()
51 conn.send_syn()
52 conn.receive_syn_ack()
53 conn.send_close()
54 assert conn.state == ConnectionState.FIN_WAIT
55
56 def test_passive_close(self):
57 from i2p_streaming.connection import StreamConnection, ConnectionState
58 conn = StreamConnection()
59 conn.send_syn()
60 conn.receive_syn_ack()
61 conn.receive_close()
62 assert conn.state == ConnectionState.CLOSE_WAIT
63
64 def test_close_wait_to_last_ack(self):
65 from i2p_streaming.connection import StreamConnection, ConnectionState
66 conn = StreamConnection()
67 conn.send_syn()
68 conn.receive_syn_ack()
69 conn.receive_close()
70 conn.send_close()
71 assert conn.state == ConnectionState.LAST_ACK
72
73 def test_last_ack_to_closed(self):
74 from i2p_streaming.connection import StreamConnection, ConnectionState
75 conn = StreamConnection()
76 conn.send_syn()
77 conn.receive_syn_ack()
78 conn.receive_close()
79 conn.send_close()
80 conn.receive_ack()
81 assert conn.state == ConnectionState.CLOSED
82
83 def test_fin_wait_to_time_wait(self):
84 from i2p_streaming.connection import StreamConnection, ConnectionState
85 conn = StreamConnection()
86 conn.send_syn()
87 conn.receive_syn_ack()
88 conn.send_close()
89 conn.receive_close_ack()
90 assert conn.state == ConnectionState.TIME_WAIT
91
92 def test_time_wait_to_closed(self):
93 from i2p_streaming.connection import StreamConnection, ConnectionState
94 conn = StreamConnection()
95 conn.send_syn()
96 conn.receive_syn_ack()
97 conn.send_close()
98 conn.receive_close_ack()
99 conn.timeout()
100 assert conn.state == ConnectionState.CLOSED
101
102 def test_reset(self):
103 from i2p_streaming.connection import StreamConnection, ConnectionState
104 conn = StreamConnection()
105 conn.send_syn()
106 conn.receive_syn_ack()
107 conn.reset()
108 assert conn.state == ConnectionState.RESET
109
110 def test_invalid_transition(self):
111 from i2p_streaming.connection import StreamConnection
112 conn = StreamConnection()
113 with pytest.raises(RuntimeError):
114 conn.receive_syn_ack() # Can't receive SYN_ACK from NEW
115
116 def test_invalid_close_from_new(self):
117 from i2p_streaming.connection import StreamConnection
118 conn = StreamConnection()
119 with pytest.raises(RuntimeError):
120 conn.send_close()
121
122 def test_sequence_numbers(self):
123 from i2p_streaming.connection import StreamConnection
124 conn = StreamConnection()
125 assert conn.next_send_seq == 0
126 conn.increment_send_seq()
127 assert conn.next_send_seq == 1
128
129 def test_send_window(self):
130 from i2p_streaming.connection import StreamConnection
131 from i2p_streaming.options import StreamOptions
132 opts = StreamOptions(initial_window_size=10)
133 conn = StreamConnection(options=opts)
134 assert conn.send_window == 10
135
136 def test_receive_window(self):
137 from i2p_streaming.connection import StreamConnection
138 from i2p_streaming.options import StreamOptions
139 opts = StreamOptions(initial_window_size=8)
140 conn = StreamConnection(options=opts)
141 assert conn.recv_window == 8