"""Tests for stream packet types."""

import os
import struct

import pytest


class TestStreamFlags:
    """Checks on the ``Flags`` constants exported by ``i2p_streaming.packet``."""

    def test_flag_values(self):
        """Every named flag compares equal to its expected bit value."""
        from i2p_streaming.packet import Flags

        expected_bits = (
            ("SYNCHRONIZE", 0x0001),
            ("CLOSE", 0x0002),
            ("RESET", 0x0004),
            ("SIGNATURE_INCLUDED", 0x0008),
            ("SIGNATURE_REQUESTED", 0x0010),
            ("FROM_INCLUDED", 0x0020),
            ("DELAY_REQUESTED", 0x0040),
            ("MAX_PACKET_SIZE_INCLUDED", 0x0080),
            ("PROFILE_INTERACTIVE", 0x0100),
            ("ECHO", 0x0200),
            ("NO_ACK", 0x0400),
        )
        for flag_name, bit in expected_bits:
            assert getattr(Flags, flag_name) == bit

    def test_combine_flags(self):
        """OR-ing two flags yields a mask with exactly those flags testable."""
        from i2p_streaming.packet import Flags

        mask = Flags.SYNCHRONIZE | Flags.SIGNATURE_INCLUDED

        # Both flags that went into the mask must be detectable ...
        for present in (Flags.SYNCHRONIZE, Flags.SIGNATURE_INCLUDED):
            assert mask & present
        # ... and a flag that was never set must not be.
        assert not (mask & Flags.CLOSE)


class TestStreamPacket:
    """Construction and ``to_bytes``/``from_bytes`` round-trips of ``StreamPacket``."""

    def test_construct(self):
        """Keyword arguments given to the constructor are readable as attributes."""
        from i2p_streaming.packet import StreamPacket

        fields = {
            "send_id": 1,
            "recv_id": 2,
            "seq_num": 10,
            "ack_through": 9,
            "flags": 0x01,
            "payload": b"hello",
        }
        packet = StreamPacket(**fields)

        # ``flags`` is passed to the constructor but is not checked by this test.
        for attr in ("send_id", "recv_id", "seq_num", "ack_through", "payload"):
            assert getattr(packet, attr) == fields[attr]

    def test_roundtrip_minimal(self):
        """A packet with no flags and an empty payload survives a round-trip."""
        from i2p_streaming.packet import StreamPacket

        original = StreamPacket(
            send_id=100,
            recv_id=200,
            seq_num=1,
            ack_through=0,
            flags=0,
            payload=b"",
        )
        decoded = StreamPacket.from_bytes(original.to_bytes())

        observed = (
            decoded.send_id,
            decoded.recv_id,
            decoded.seq_num,
            decoded.ack_through,
            decoded.payload,
        )
        assert observed == (100, 200, 1, 0, b"")

    def test_roundtrip_with_payload(self):
        """A 128-byte random payload and the flags value survive a round-trip."""
        from i2p_streaming.packet import StreamPacket

        body = os.urandom(128)
        original = StreamPacket(
            send_id=1,
            recv_id=2,
            seq_num=5,
            ack_through=4,
            flags=0x0001,
            payload=body,
        )
        decoded = StreamPacket.from_bytes(original.to_bytes())

        assert decoded.payload == body
        assert decoded.flags == 0x0001

    def test_roundtrip_with_nacks(self):
        """The NACK list and the payload survive a round-trip."""
        from i2p_streaming.packet import StreamPacket

        original = StreamPacket(
            send_id=1,
            recv_id=2,
            seq_num=10,
            ack_through=8,
            flags=0,
            nacks=[3, 5, 7],
            payload=b"data",
        )
        decoded = StreamPacket.from_bytes(original.to_bytes())

        assert decoded.nacks == [3, 5, 7]
        assert decoded.payload == b"data"

    def test_roundtrip_with_resend_delay(self):
        """The resend delay survives a round-trip."""
        from i2p_streaming.packet import StreamPacket

        original = StreamPacket(
            send_id=1,
            recv_id=2,
            seq_num=1,
            ack_through=0,
            flags=0,
            resend_delay=5,
            payload=b"",
        )
        decoded = StreamPacket.from_bytes(original.to_bytes())

        assert decoded.resend_delay == 5

    def test_roundtrip_with_options(self):
        """Twenty random option bytes and the payload survive a round-trip."""
        from i2p_streaming.packet import StreamPacket

        option_bytes = os.urandom(20)
        original = StreamPacket(
            send_id=1,
            recv_id=2,
            seq_num=1,
            ack_through=0,
            flags=0,
            options=option_bytes,
            payload=b"x",
        )
        decoded = StreamPacket.from_bytes(original.to_bytes())

        assert decoded.options == option_bytes
        assert decoded.payload == b"x"

    def test_syn_flag(self):
        """SYNCHRONIZE and SIGNATURE_REQUESTED are both still set after a round-trip."""
        from i2p_streaming.packet import Flags, StreamPacket

        original = StreamPacket(
            send_id=0,
            recv_id=0,
            seq_num=0,
            ack_through=0,
            flags=Flags.SYNCHRONIZE | Flags.SIGNATURE_REQUESTED,
            payload=b"",
        )
        decoded = StreamPacket.from_bytes(original.to_bytes())

        assert decoded.flags & Flags.SYNCHRONIZE
        assert decoded.flags & Flags.SIGNATURE_REQUESTED

    def test_header_size_no_nacks_no_options(self):
        """With no NACKs, options or payload the serialized packet is 22 bytes."""
        from i2p_streaming.packet import StreamPacket

        # Minimum header: 4+4+4+4+1+1+2+2 = 22 bytes.
        minimum_header = sum((4, 4, 4, 4, 1, 1, 2, 2))

        bare = StreamPacket(
            send_id=0,
            recv_id=0,
            seq_num=0,
            ack_through=0,
            flags=0,
            payload=b"",
        )

        assert len(bare.to_bytes()) == minimum_header

    def test_large_payload(self):
        """A 1730-byte random payload survives a round-trip unchanged."""
        from i2p_streaming.packet import StreamPacket

        body = os.urandom(1730)
        original = StreamPacket(
            send_id=1,
            recv_id=2,
            seq_num=100,
            ack_through=99,
            flags=0,
            payload=body,
        )
        decoded = StreamPacket.from_bytes(original.to_bytes())

        assert decoded.payload == body