A Python port of the Invisible Internet Project (I2P)
at main 132 lines 5.1 kB view raw
1"""Tests for NTCP2 transport protocol types.""" 2 3import os 4import struct 5 6import pytest 7 8 9class TestNTCP2Frame: 10 def test_construct(self): 11 from i2p_transport.ntcp2 import NTCP2Frame, FrameType 12 frame = NTCP2Frame(frame_type=FrameType.DATA, payload=b"hello") 13 assert frame.frame_type == FrameType.DATA 14 assert frame.payload == b"hello" 15 16 def test_roundtrip(self): 17 from i2p_transport.ntcp2 import NTCP2Frame, FrameType 18 frame = NTCP2Frame(frame_type=FrameType.DATA, payload=os.urandom(64)) 19 data = frame.to_bytes() 20 frame2 = NTCP2Frame.from_bytes(data) 21 assert frame2.frame_type == frame.frame_type 22 assert frame2.payload == frame.payload 23 24 def test_padding_frame(self): 25 from i2p_transport.ntcp2 import NTCP2Frame, FrameType 26 frame = NTCP2Frame(frame_type=FrameType.PADDING, payload=os.urandom(32)) 27 data = frame.to_bytes() 28 frame2 = NTCP2Frame.from_bytes(data) 29 assert frame2.frame_type == FrameType.PADDING 30 31 def test_datetime_frame(self): 32 from i2p_transport.ntcp2 import NTCP2Frame, FrameType 33 ts = struct.pack("!I", 1710000000) 34 frame = NTCP2Frame(frame_type=FrameType.DATETIME, payload=ts) 35 data = frame.to_bytes() 36 frame2 = NTCP2Frame.from_bytes(data) 37 assert frame2.frame_type == FrameType.DATETIME 38 assert struct.unpack("!I", frame2.payload)[0] == 1710000000 39 40 def test_router_info_frame(self): 41 from i2p_transport.ntcp2 import NTCP2Frame, FrameType 42 ri_data = os.urandom(200) 43 frame = NTCP2Frame(frame_type=FrameType.ROUTER_INFO, payload=ri_data) 44 data = frame.to_bytes() 45 frame2 = NTCP2Frame.from_bytes(data) 46 assert frame2.payload == ri_data 47 48 def test_i2np_frame(self): 49 from i2p_transport.ntcp2 import NTCP2Frame, FrameType 50 i2np_data = os.urandom(100) 51 frame = NTCP2Frame(frame_type=FrameType.I2NP, payload=i2np_data) 52 data = frame.to_bytes() 53 frame2 = NTCP2Frame.from_bytes(data) 54 assert frame2.frame_type == FrameType.I2NP 55 assert frame2.payload == i2np_data 56 57 def test_termination_frame(self): 58 from i2p_transport.ntcp2 import NTCP2Frame, FrameType 59 # reason(1) + valid_received(8) = 9 bytes 60 payload = struct.pack("!BQ", 0, 42) 61 frame = NTCP2Frame(frame_type=FrameType.TERMINATION, payload=payload) 62 data = frame.to_bytes() 63 frame2 = NTCP2Frame.from_bytes(data) 64 assert frame2.frame_type == FrameType.TERMINATION 65 66 def test_multiple_frames_from_stream(self): 67 import io 68 from i2p_transport.ntcp2 import NTCP2Frame, FrameType 69 f1 = NTCP2Frame(FrameType.DATA, b"first") 70 f2 = NTCP2Frame(FrameType.DATA, b"second") 71 buf = f1.to_bytes() + f2.to_bytes() 72 stream = io.BytesIO(buf) 73 r1 = NTCP2Frame.from_stream(stream) 74 r2 = NTCP2Frame.from_stream(stream) 75 assert r1.payload == b"first" 76 assert r2.payload == b"second" 77 78 79class TestNTCP2SessionState: 80 def test_initial_state(self): 81 from i2p_transport.ntcp2 import NTCP2SessionState, TransportState 82 state = NTCP2SessionState() 83 assert state.state == TransportState.UNKNOWN 84 85 def test_handshake_flow(self): 86 from i2p_transport.ntcp2 import NTCP2SessionState, TransportState 87 state = NTCP2SessionState() 88 state.begin_handshake() 89 assert state.state == TransportState.HANDSHAKING 90 state.complete_handshake() 91 assert state.state == TransportState.ESTABLISHED 92 93 def test_terminate(self): 94 from i2p_transport.ntcp2 import NTCP2SessionState, TransportState 95 state = NTCP2SessionState() 96 state.begin_handshake() 97 state.complete_handshake() 98 state.terminate(reason=0) 99 assert state.state == TransportState.TERMINATED 100 101 def test_error(self): 102 from i2p_transport.ntcp2 import NTCP2SessionState, TransportState 103 state = NTCP2SessionState() 104 state.begin_handshake() 105 state.set_error("timeout") 106 assert state.state == TransportState.ERROR 107 assert state.error_message == "timeout" 108 109 def test_invalid_transition(self): 110 from i2p_transport.ntcp2 import NTCP2SessionState 111 state = NTCP2SessionState() 112 with pytest.raises(RuntimeError): 113 state.complete_handshake() # Can't complete without starting 114 115 def test_cannot_handshake_twice(self): 116 from i2p_transport.ntcp2 import NTCP2SessionState 117 state = NTCP2SessionState() 118 state.begin_handshake() 119 with pytest.raises(RuntimeError): 120 state.begin_handshake() 121 122 def test_message_count(self): 123 from i2p_transport.ntcp2 import NTCP2SessionState 124 state = NTCP2SessionState() 125 state.begin_handshake() 126 state.complete_handshake() 127 assert state.messages_sent == 0 128 state.on_message_sent() 129 state.on_message_sent() 130 assert state.messages_sent == 2 131 state.on_message_received() 132 assert state.messages_received == 1