# A Python port of the Invisible Internet Project (I2P).
"""Tests for NTCP2 transport protocol types."""
2
3import os
4import struct
5
6import pytest
7
8
class TestNTCP2Frame:
    """Construction and serialization round-trip tests for ``NTCP2Frame``."""

    @staticmethod
    def _reparse(frame):
        """Serialize *frame* with ``to_bytes`` and parse the result with ``from_bytes``."""
        from i2p_transport.ntcp2 import NTCP2Frame

        return NTCP2Frame.from_bytes(frame.to_bytes())

    def test_construct(self):
        """Keyword construction exposes the given frame type and payload."""
        from i2p_transport.ntcp2 import FrameType, NTCP2Frame

        built = NTCP2Frame(frame_type=FrameType.DATA, payload=b"hello")
        assert built.frame_type == FrameType.DATA
        assert built.payload == b"hello"

    def test_roundtrip(self):
        """A DATA frame with random payload keeps its type and payload."""
        from i2p_transport.ntcp2 import FrameType, NTCP2Frame

        original = NTCP2Frame(frame_type=FrameType.DATA, payload=os.urandom(64))
        parsed = self._reparse(original)
        assert parsed.frame_type == original.frame_type
        assert parsed.payload == original.payload

    def test_padding_frame(self):
        """A PADDING frame is still typed PADDING after a round trip."""
        from i2p_transport.ntcp2 import FrameType, NTCP2Frame

        original = NTCP2Frame(frame_type=FrameType.PADDING, payload=os.urandom(32))
        parsed = self._reparse(original)
        assert parsed.frame_type == FrameType.PADDING

    def test_datetime_frame(self):
        """A DATETIME frame carries a 4-byte network-order timestamp intact."""
        from i2p_transport.ntcp2 import FrameType, NTCP2Frame

        packed_ts = struct.pack("!I", 1710000000)
        original = NTCP2Frame(frame_type=FrameType.DATETIME, payload=packed_ts)
        parsed = self._reparse(original)
        assert parsed.frame_type == FrameType.DATETIME
        assert struct.unpack("!I", parsed.payload)[0] == 1710000000

    def test_router_info_frame(self):
        """A ROUTER_INFO frame returns the same 200 payload bytes."""
        from i2p_transport.ntcp2 import FrameType, NTCP2Frame

        router_info = os.urandom(200)
        original = NTCP2Frame(frame_type=FrameType.ROUTER_INFO, payload=router_info)
        parsed = self._reparse(original)
        assert parsed.payload == router_info

    def test_i2np_frame(self):
        """An I2NP frame keeps both its type and its payload."""
        from i2p_transport.ntcp2 import FrameType, NTCP2Frame

        message = os.urandom(100)
        original = NTCP2Frame(frame_type=FrameType.I2NP, payload=message)
        parsed = self._reparse(original)
        assert parsed.frame_type == FrameType.I2NP
        assert parsed.payload == message

    def test_termination_frame(self):
        """A TERMINATION frame is still typed TERMINATION after a round trip."""
        from i2p_transport.ntcp2 import FrameType, NTCP2Frame

        # reason(1) + valid_received(8) = 9 bytes
        body = struct.pack("!BQ", 0, 42)
        original = NTCP2Frame(frame_type=FrameType.TERMINATION, payload=body)
        parsed = self._reparse(original)
        assert parsed.frame_type == FrameType.TERMINATION

    def test_multiple_frames_from_stream(self):
        """Two frames written back to back are read in order by ``from_stream``."""
        import io

        from i2p_transport.ntcp2 import FrameType, NTCP2Frame

        # Positional construction is intentional here, as in the other
        # tests keyword construction is.
        first = NTCP2Frame(FrameType.DATA, b"first")
        second = NTCP2Frame(FrameType.DATA, b"second")
        wire = first.to_bytes() + second.to_bytes()
        reader = io.BytesIO(wire)
        got_first = NTCP2Frame.from_stream(reader)
        got_second = NTCP2Frame.from_stream(reader)
        assert got_first.payload == b"first"
        assert got_second.payload == b"second"
77
78
class TestNTCP2SessionState:
    """State-transition and counter tests for ``NTCP2SessionState``."""

    @staticmethod
    def _established():
        """Return a new session that has begun and completed its handshake."""
        from i2p_transport.ntcp2 import NTCP2SessionState

        session = NTCP2SessionState()
        session.begin_handshake()
        session.complete_handshake()
        return session

    def test_initial_state(self):
        """A freshly constructed session reports ``TransportState.UNKNOWN``."""
        from i2p_transport.ntcp2 import NTCP2SessionState, TransportState

        session = NTCP2SessionState()
        assert session.state == TransportState.UNKNOWN

    def test_handshake_flow(self):
        """Beginning then completing the handshake yields HANDSHAKING, then ESTABLISHED."""
        from i2p_transport.ntcp2 import NTCP2SessionState, TransportState

        session = NTCP2SessionState()
        session.begin_handshake()
        assert session.state == TransportState.HANDSHAKING
        session.complete_handshake()
        assert session.state == TransportState.ESTABLISHED

    def test_terminate(self):
        """Terminating an established session moves it to TERMINATED."""
        from i2p_transport.ntcp2 import TransportState

        session = self._established()
        session.terminate(reason=0)
        assert session.state == TransportState.TERMINATED

    def test_error(self):
        """``set_error`` during the handshake records ERROR and the message."""
        from i2p_transport.ntcp2 import NTCP2SessionState, TransportState

        session = NTCP2SessionState()
        session.begin_handshake()
        session.set_error("timeout")
        assert session.state == TransportState.ERROR
        assert session.error_message == "timeout"

    def test_invalid_transition(self):
        """Completing a handshake that was never begun raises ``RuntimeError``."""
        from i2p_transport.ntcp2 import NTCP2SessionState

        session = NTCP2SessionState()
        with pytest.raises(RuntimeError):
            # Can't complete without starting
            session.complete_handshake()

    def test_cannot_handshake_twice(self):
        """A second ``begin_handshake`` on the same session raises ``RuntimeError``."""
        from i2p_transport.ntcp2 import NTCP2SessionState

        session = NTCP2SessionState()
        session.begin_handshake()
        with pytest.raises(RuntimeError):
            session.begin_handshake()

    def test_message_count(self):
        """Sent and received counters increase by one per notification call."""
        session = self._established()
        assert session.messages_sent == 0
        for _ in range(2):
            session.on_message_sent()
        assert session.messages_sent == 2
        session.on_message_received()
        assert session.messages_received == 1