# A Python port of the Invisible Internet Project (I2P).
1"""Tests for I2CP session state machine."""
2
3import os
4
5import pytest
6
7
class TestSessionState:
    """Pins the integer value carried by each ``SessionState`` member."""

    def test_enum_values(self):
        """Every state member must expose its expected ``.value``."""
        from i2p_client.session import SessionState

        # Checked in declaration order so the first mismatch is reported first.
        expected_values = {
            "INIT": 0,
            "OPENING": 1,
            "OPEN": 2,
            "CLOSING": 3,
            "CLOSED": 4,
            "ERROR": 5,
        }
        for member_name, ordinal in expected_values.items():
            assert getattr(SessionState, member_name).value == ordinal
17
18
class TestSessionId:
    """Construction, comparison, hashing and range checks for ``SessionId``."""

    def test_construct(self):
        """A ``SessionId`` converts back to the integer it was built from."""
        from i2p_client.session import SessionId

        session_id = SessionId(42)
        assert int(session_id) == 42

    def test_equality(self):
        """Ids built from the same number compare equal; different ones do not."""
        from i2p_client.session import SessionId

        first = SessionId(1)
        duplicate = SessionId(1)
        assert first == duplicate

        other = SessionId(2)
        assert SessionId(1) != other

    def test_hash(self):
        """Equal ids collapse to a single entry when placed in a set."""
        from i2p_client.session import SessionId

        unique_ids = {SessionId(raw) for raw in (1, 1, 2)}
        assert len(unique_ids) == 2

    def test_range_validation(self):
        """0 and 0xFFFF are accepted; -1 and 0x10000 raise ``ValueError``."""
        from i2p_client.session import SessionId

        # Both boundary values must construct without raising.
        for boundary in (0, 0xFFFF):
            SessionId(boundary)

        # One step outside either boundary must be rejected.
        for out_of_range in (-1, 0x10000):
            with pytest.raises(ValueError):
                SessionId(out_of_range)
43
44
class TestI2CPSession:
    """Exercises the ``I2CPSession`` lifecycle, guards, callbacks and storage.

    Each test imports the classes under test locally and builds a fresh
    session, so no state is shared between tests.
    """

    def test_initial_state(self):
        """A freshly constructed session reports ``SessionState.INIT``."""
        from i2p_client.session import I2CPSession, SessionState

        session = I2CPSession()
        assert session.state == SessionState.INIT

    def test_open_transitions_to_opening(self):
        """Calling ``open`` on a new session moves it to ``OPENING``."""
        from i2p_client.session import I2CPSession, SessionState
        from i2p_client.session_config import SessionConfig

        session = I2CPSession()
        # NOTE(review): 387 random bytes stand in for destination data;
        # presumably this matches a standard destination size -- confirm.
        dest_data = os.urandom(387)
        session.open(dest_data, SessionConfig())
        assert session.state == SessionState.OPENING

    def test_confirm_open(self):
        """``confirm_open`` moves the session to ``OPEN`` and records the id."""
        from i2p_client.session import I2CPSession, SessionState, SessionId
        from i2p_client.session_config import SessionConfig

        session = I2CPSession()
        session.open(os.urandom(387), SessionConfig())
        session.confirm_open(SessionId(5))
        assert session.state == SessionState.OPEN
        assert session.session_id == SessionId(5)

    def test_close(self):
        """``close`` on an open session moves it to ``CLOSING``."""
        from i2p_client.session import I2CPSession, SessionState, SessionId
        from i2p_client.session_config import SessionConfig

        session = I2CPSession()
        session.open(os.urandom(387), SessionConfig())
        session.confirm_open(SessionId(1))
        session.close()
        assert session.state == SessionState.CLOSING

    def test_confirm_close(self):
        """``confirm_close`` after ``close`` moves the session to ``CLOSED``."""
        from i2p_client.session import I2CPSession, SessionState, SessionId
        from i2p_client.session_config import SessionConfig

        session = I2CPSession()
        session.open(os.urandom(387), SessionConfig())
        session.confirm_open(SessionId(1))
        session.close()
        session.confirm_close()
        assert session.state == SessionState.CLOSED

    def test_error_from_opening(self):
        """``set_error`` while opening yields ``ERROR`` and keeps the message."""
        from i2p_client.session import I2CPSession, SessionState
        from i2p_client.session_config import SessionConfig

        session = I2CPSession()
        session.open(os.urandom(387), SessionConfig())
        session.set_error("refused")
        assert session.state == SessionState.ERROR
        assert session.error_message == "refused"

    def test_cannot_open_twice(self):
        """A second ``open`` on the same session raises ``RuntimeError``."""
        from i2p_client.session import I2CPSession
        from i2p_client.session_config import SessionConfig

        session = I2CPSession()
        session.open(os.urandom(387), SessionConfig())
        with pytest.raises(RuntimeError):
            session.open(os.urandom(387), SessionConfig())

    def test_cannot_confirm_from_init(self):
        """``confirm_open`` before ``open`` raises ``RuntimeError``."""
        from i2p_client.session import I2CPSession, SessionId

        session = I2CPSession()
        with pytest.raises(RuntimeError):
            session.confirm_open(SessionId(1))

    def test_cannot_close_from_init(self):
        """``close`` on a never-opened session raises ``RuntimeError``."""
        from i2p_client.session import I2CPSession

        session = I2CPSession()
        with pytest.raises(RuntimeError):
            session.close()

    def test_message_callback(self):
        """A callback registered via ``on_message`` receives delivered payloads."""
        # SessionState was imported here but never used; the import is dropped.
        from i2p_client.session import I2CPSession, SessionId
        from i2p_client.session_config import SessionConfig

        received = []
        session = I2CPSession()
        session.on_message(lambda msg: received.append(msg))
        session.open(os.urandom(387), SessionConfig())
        session.confirm_open(SessionId(1))
        session.deliver_message(b"hello i2p")
        assert len(received) == 1
        assert received[0] == b"hello i2p"

    def test_destination_data_stored(self):
        """The bytes passed to ``open`` are exposed as ``destination_data``."""
        from i2p_client.session import I2CPSession
        from i2p_client.session_config import SessionConfig

        dest = os.urandom(387)
        session = I2CPSession()
        session.open(dest, SessionConfig())
        assert session.destination_data == dest

    def test_config_stored(self):
        """The config passed to ``open`` is readable through ``session.config``."""
        from i2p_client.session import I2CPSession
        from i2p_client.session_config import SessionConfig

        cfg = SessionConfig({"inbound.quantity": "5"})
        session = I2CPSession()
        session.open(os.urandom(387), cfg)
        assert session.config.get("inbound.quantity") == "5"