"""Tests for NTCP2 transport protocol types.""" import os import struct import pytest class TestNTCP2Frame: def test_construct(self): from i2p_transport.ntcp2 import NTCP2Frame, FrameType frame = NTCP2Frame(frame_type=FrameType.DATA, payload=b"hello") assert frame.frame_type == FrameType.DATA assert frame.payload == b"hello" def test_roundtrip(self): from i2p_transport.ntcp2 import NTCP2Frame, FrameType frame = NTCP2Frame(frame_type=FrameType.DATA, payload=os.urandom(64)) data = frame.to_bytes() frame2 = NTCP2Frame.from_bytes(data) assert frame2.frame_type == frame.frame_type assert frame2.payload == frame.payload def test_padding_frame(self): from i2p_transport.ntcp2 import NTCP2Frame, FrameType frame = NTCP2Frame(frame_type=FrameType.PADDING, payload=os.urandom(32)) data = frame.to_bytes() frame2 = NTCP2Frame.from_bytes(data) assert frame2.frame_type == FrameType.PADDING def test_datetime_frame(self): from i2p_transport.ntcp2 import NTCP2Frame, FrameType ts = struct.pack("!I", 1710000000) frame = NTCP2Frame(frame_type=FrameType.DATETIME, payload=ts) data = frame.to_bytes() frame2 = NTCP2Frame.from_bytes(data) assert frame2.frame_type == FrameType.DATETIME assert struct.unpack("!I", frame2.payload)[0] == 1710000000 def test_router_info_frame(self): from i2p_transport.ntcp2 import NTCP2Frame, FrameType ri_data = os.urandom(200) frame = NTCP2Frame(frame_type=FrameType.ROUTER_INFO, payload=ri_data) data = frame.to_bytes() frame2 = NTCP2Frame.from_bytes(data) assert frame2.payload == ri_data def test_i2np_frame(self): from i2p_transport.ntcp2 import NTCP2Frame, FrameType i2np_data = os.urandom(100) frame = NTCP2Frame(frame_type=FrameType.I2NP, payload=i2np_data) data = frame.to_bytes() frame2 = NTCP2Frame.from_bytes(data) assert frame2.frame_type == FrameType.I2NP assert frame2.payload == i2np_data def test_termination_frame(self): from i2p_transport.ntcp2 import NTCP2Frame, FrameType # reason(1) + valid_received(8) = 9 bytes payload = struct.pack("!BQ", 0, 42) frame = NTCP2Frame(frame_type=FrameType.TERMINATION, payload=payload) data = frame.to_bytes() frame2 = NTCP2Frame.from_bytes(data) assert frame2.frame_type == FrameType.TERMINATION def test_multiple_frames_from_stream(self): import io from i2p_transport.ntcp2 import NTCP2Frame, FrameType f1 = NTCP2Frame(FrameType.DATA, b"first") f2 = NTCP2Frame(FrameType.DATA, b"second") buf = f1.to_bytes() + f2.to_bytes() stream = io.BytesIO(buf) r1 = NTCP2Frame.from_stream(stream) r2 = NTCP2Frame.from_stream(stream) assert r1.payload == b"first" assert r2.payload == b"second" class TestNTCP2SessionState: def test_initial_state(self): from i2p_transport.ntcp2 import NTCP2SessionState, TransportState state = NTCP2SessionState() assert state.state == TransportState.UNKNOWN def test_handshake_flow(self): from i2p_transport.ntcp2 import NTCP2SessionState, TransportState state = NTCP2SessionState() state.begin_handshake() assert state.state == TransportState.HANDSHAKING state.complete_handshake() assert state.state == TransportState.ESTABLISHED def test_terminate(self): from i2p_transport.ntcp2 import NTCP2SessionState, TransportState state = NTCP2SessionState() state.begin_handshake() state.complete_handshake() state.terminate(reason=0) assert state.state == TransportState.TERMINATED def test_error(self): from i2p_transport.ntcp2 import NTCP2SessionState, TransportState state = NTCP2SessionState() state.begin_handshake() state.set_error("timeout") assert state.state == TransportState.ERROR assert state.error_message == "timeout" def test_invalid_transition(self): from i2p_transport.ntcp2 import NTCP2SessionState state = NTCP2SessionState() with pytest.raises(RuntimeError): state.complete_handshake() # Can't complete without starting def test_cannot_handshake_twice(self): from i2p_transport.ntcp2 import NTCP2SessionState state = NTCP2SessionState() state.begin_handshake() with pytest.raises(RuntimeError): state.begin_handshake() def test_message_count(self): from i2p_transport.ntcp2 import NTCP2SessionState state = NTCP2SessionState() state.begin_handshake() state.complete_handshake() assert state.messages_sent == 0 state.on_message_sent() state.on_message_sent() assert state.messages_sent == 2 state.on_message_received() assert state.messages_received == 1