# A Python port of the Invisible Internet Project (I2P)
1"""Tests for stream packet types."""
2
3import os
4import struct
5
6import pytest
7
8
class TestStreamFlags:
    """Checks on the ``Flags`` bit constants from ``i2p_streaming.packet``."""

    def test_flag_values(self):
        """Each named flag compares equal to its expected bit value."""
        from i2p_streaming.packet import Flags

        expected_bits = (
            ("SYNCHRONIZE", 0x0001),
            ("CLOSE", 0x0002),
            ("RESET", 0x0004),
            ("SIGNATURE_INCLUDED", 0x0008),
            ("SIGNATURE_REQUESTED", 0x0010),
            ("FROM_INCLUDED", 0x0020),
            ("DELAY_REQUESTED", 0x0040),
            ("MAX_PACKET_SIZE_INCLUDED", 0x0080),
            ("PROFILE_INTERACTIVE", 0x0100),
            ("ECHO", 0x0200),
            ("NO_ACK", 0x0400),
        )
        for flag_name, bit in expected_bits:
            assert getattr(Flags, flag_name) == bit

    def test_combine_flags(self):
        """OR-ing two flags keeps both bits set and leaves others clear."""
        from i2p_streaming.packet import Flags

        mask = Flags.SYNCHRONIZE | Flags.SIGNATURE_INCLUDED
        for member in (Flags.SYNCHRONIZE, Flags.SIGNATURE_INCLUDED):
            assert mask & member
        # CLOSE was never OR-ed in, so it must not appear in the mask.
        assert not (mask & Flags.CLOSE)
30
31
class TestStreamPacket:
    """Construction and ``to_bytes``/``from_bytes`` round-trip tests for ``StreamPacket``."""

    @staticmethod
    def _roundtrip(pkt):
        """Serialize *pkt* with ``to_bytes`` and return the ``from_bytes`` parse of it."""
        # Function-scope import, matching the tests in this class.
        from i2p_streaming.packet import StreamPacket

        return StreamPacket.from_bytes(pkt.to_bytes())

    def test_construct(self):
        """Constructor keyword arguments are readable back as attributes."""
        from i2p_streaming.packet import StreamPacket

        pkt = StreamPacket(
            send_id=1, recv_id=2, seq_num=10, ack_through=9,
            flags=0x01, payload=b"hello",
        )
        assert pkt.send_id == 1
        assert pkt.recv_id == 2
        assert pkt.seq_num == 10
        assert pkt.ack_through == 9
        # flags is passed to the constructor, so verify it like every other field.
        assert pkt.flags == 0x01
        assert pkt.payload == b"hello"

    def test_roundtrip_minimal(self):
        """A packet with no flags and an empty payload survives a round trip."""
        from i2p_streaming.packet import StreamPacket

        pkt = StreamPacket(send_id=100, recv_id=200, seq_num=1, ack_through=0,
                           flags=0, payload=b"")
        pkt2 = self._roundtrip(pkt)
        assert pkt2.send_id == 100
        assert pkt2.recv_id == 200
        assert pkt2.seq_num == 1
        assert pkt2.ack_through == 0
        assert pkt2.payload == b""

    def test_roundtrip_with_payload(self):
        """A 128-byte random payload and the flags value are preserved."""
        from i2p_streaming.packet import StreamPacket

        payload = os.urandom(128)
        pkt = StreamPacket(send_id=1, recv_id=2, seq_num=5, ack_through=4,
                           flags=0x0001, payload=payload)
        pkt2 = self._roundtrip(pkt)
        assert pkt2.payload == payload
        assert pkt2.flags == 0x0001

    def test_roundtrip_with_nacks(self):
        """The NACK list comes back in order, followed by an intact payload."""
        from i2p_streaming.packet import StreamPacket

        pkt = StreamPacket(send_id=1, recv_id=2, seq_num=10, ack_through=8,
                           flags=0, nacks=[3, 5, 7], payload=b"data")
        pkt2 = self._roundtrip(pkt)
        assert pkt2.nacks == [3, 5, 7]
        assert pkt2.payload == b"data"

    def test_roundtrip_with_resend_delay(self):
        """The resend delay value is preserved."""
        from i2p_streaming.packet import StreamPacket

        pkt = StreamPacket(send_id=1, recv_id=2, seq_num=1, ack_through=0,
                           flags=0, resend_delay=5, payload=b"")
        pkt2 = self._roundtrip(pkt)
        assert pkt2.resend_delay == 5

    def test_roundtrip_with_options(self):
        """Opaque option bytes and the payload after them are both preserved."""
        from i2p_streaming.packet import StreamPacket

        options = os.urandom(20)
        pkt = StreamPacket(send_id=1, recv_id=2, seq_num=1, ack_through=0,
                           flags=0, options=options, payload=b"x")
        pkt2 = self._roundtrip(pkt)
        assert pkt2.options == options
        assert pkt2.payload == b"x"

    def test_syn_flag(self):
        """Two combined flags are both still set after a round trip."""
        from i2p_streaming.packet import StreamPacket, Flags

        pkt = StreamPacket(send_id=0, recv_id=0, seq_num=0, ack_through=0,
                           flags=Flags.SYNCHRONIZE | Flags.SIGNATURE_REQUESTED,
                           payload=b"")
        pkt2 = self._roundtrip(pkt)
        assert pkt2.flags & Flags.SYNCHRONIZE
        assert pkt2.flags & Flags.SIGNATURE_REQUESTED

    def test_header_size_no_nacks_no_options(self):
        """With no NACKs, options or payload the serialized packet is 22 bytes."""
        from i2p_streaming.packet import StreamPacket

        pkt = StreamPacket(send_id=0, recv_id=0, seq_num=0, ack_through=0,
                           flags=0, payload=b"")
        data = pkt.to_bytes()
        # Header: 4+4+4+4+1+1+2+2 = 22 bytes minimum.
        # Presumably send id, receive id, sequence number, ack-through,
        # NACK count, resend delay, flags and option size, following the
        # I2P streaming packet layout -- confirm against the packet module.
        assert len(data) == 22

    def test_large_payload(self):
        """A 1730-byte random payload is preserved."""
        from i2p_streaming.packet import StreamPacket

        payload = os.urandom(1730)
        pkt = StreamPacket(send_id=1, recv_id=2, seq_num=100, ack_through=99,
                           flags=0, payload=payload)
        pkt2 = self._roundtrip(pkt)
        assert pkt2.payload == payload