A Python port of the Invisible Internet Project (I2P)
at main 142 lines 5.2 kB view raw
1"""Tests for I2CP session state machine.""" 2 3import os 4 5import pytest 6 7 8class TestSessionState: 9 def test_enum_values(self): 10 from i2p_client.session import SessionState 11 assert SessionState.INIT.value == 0 12 assert SessionState.OPENING.value == 1 13 assert SessionState.OPEN.value == 2 14 assert SessionState.CLOSING.value == 3 15 assert SessionState.CLOSED.value == 4 16 assert SessionState.ERROR.value == 5 17 18 19class TestSessionId: 20 def test_construct(self): 21 from i2p_client.session import SessionId 22 sid = SessionId(42) 23 assert int(sid) == 42 24 25 def test_equality(self): 26 from i2p_client.session import SessionId 27 assert SessionId(1) == SessionId(1) 28 assert SessionId(1) != SessionId(2) 29 30 def test_hash(self): 31 from i2p_client.session import SessionId 32 s = {SessionId(1), SessionId(1), SessionId(2)} 33 assert len(s) == 2 34 35 def test_range_validation(self): 36 from i2p_client.session import SessionId 37 SessionId(0) 38 SessionId(0xFFFF) 39 with pytest.raises(ValueError): 40 SessionId(-1) 41 with pytest.raises(ValueError): 42 SessionId(0x10000) 43 44 45class TestI2CPSession: 46 def test_initial_state(self): 47 from i2p_client.session import I2CPSession, SessionState 48 session = I2CPSession() 49 assert session.state == SessionState.INIT 50 51 def test_open_transitions_to_opening(self): 52 from i2p_client.session import I2CPSession, SessionState 53 from i2p_client.session_config import SessionConfig 54 session = I2CPSession() 55 dest_data = os.urandom(387) 56 session.open(dest_data, SessionConfig()) 57 assert session.state == SessionState.OPENING 58 59 def test_confirm_open(self): 60 from i2p_client.session import I2CPSession, SessionState, SessionId 61 from i2p_client.session_config import SessionConfig 62 session = I2CPSession() 63 session.open(os.urandom(387), SessionConfig()) 64 session.confirm_open(SessionId(5)) 65 assert session.state == SessionState.OPEN 66 assert session.session_id == SessionId(5) 67 68 def test_close(self): 69 from i2p_client.session import I2CPSession, SessionState, SessionId 70 from i2p_client.session_config import SessionConfig 71 session = I2CPSession() 72 session.open(os.urandom(387), SessionConfig()) 73 session.confirm_open(SessionId(1)) 74 session.close() 75 assert session.state == SessionState.CLOSING 76 77 def test_confirm_close(self): 78 from i2p_client.session import I2CPSession, SessionState, SessionId 79 from i2p_client.session_config import SessionConfig 80 session = I2CPSession() 81 session.open(os.urandom(387), SessionConfig()) 82 session.confirm_open(SessionId(1)) 83 session.close() 84 session.confirm_close() 85 assert session.state == SessionState.CLOSED 86 87 def test_error_from_opening(self): 88 from i2p_client.session import I2CPSession, SessionState 89 from i2p_client.session_config import SessionConfig 90 session = I2CPSession() 91 session.open(os.urandom(387), SessionConfig()) 92 session.set_error("refused") 93 assert session.state == SessionState.ERROR 94 assert session.error_message == "refused" 95 96 def test_cannot_open_twice(self): 97 from i2p_client.session import I2CPSession 98 from i2p_client.session_config import SessionConfig 99 session = I2CPSession() 100 session.open(os.urandom(387), SessionConfig()) 101 with pytest.raises(RuntimeError): 102 session.open(os.urandom(387), SessionConfig()) 103 104 def test_cannot_confirm_from_init(self): 105 from i2p_client.session import I2CPSession, SessionId 106 session = I2CPSession() 107 with pytest.raises(RuntimeError): 108 session.confirm_open(SessionId(1)) 109 110 def test_cannot_close_from_init(self): 111 from i2p_client.session import I2CPSession 112 session = I2CPSession() 113 with pytest.raises(RuntimeError): 114 session.close() 115 116 def test_message_callback(self): 117 from i2p_client.session import I2CPSession, SessionState, SessionId 118 from i2p_client.session_config import SessionConfig 119 received = [] 120 session = I2CPSession() 121 session.on_message(lambda msg: received.append(msg)) 122 session.open(os.urandom(387), SessionConfig()) 123 session.confirm_open(SessionId(1)) 124 session.deliver_message(b"hello i2p") 125 assert len(received) == 1 126 assert received[0] == b"hello i2p" 127 128 def test_destination_data_stored(self): 129 from i2p_client.session import I2CPSession 130 from i2p_client.session_config import SessionConfig 131 dest = os.urandom(387) 132 session = I2CPSession() 133 session.open(dest, SessionConfig()) 134 assert session.destination_data == dest 135 136 def test_config_stored(self): 137 from i2p_client.session import I2CPSession 138 from i2p_client.session_config import SessionConfig 139 cfg = SessionConfig({"inbound.quantity": "5"}) 140 session = I2CPSession() 141 session.open(os.urandom(387), cfg) 142 assert session.config.get("inbound.quantity") == "5"