A Python port of the Invisible Internet Project (I2P)
1"""Tests for stream connection options."""
2
3import pytest
4
5
class TestStreamOptions:
    """Exercise construction, overriding, merging and equality of StreamOptions.

    Each test imports ``StreamOptions`` locally, so an import failure is
    reported by the individual test rather than at module import time.
    """

    def test_defaults(self):
        """A bare ``StreamOptions()`` exposes the expected default values."""
        from i2p_streaming.options import StreamOptions

        # Attribute name -> value a freshly constructed instance must report.
        expected_defaults = {
            "initial_window_size": 6,
            "max_window_size": 128,
            "connect_timeout": 60000,
            "max_message_size": 1730,
            "inactivity_timeout": 90000,
        }

        options = StreamOptions()
        for attribute, expected in expected_defaults.items():
            assert getattr(options, attribute) == expected

    def test_override(self):
        """Keyword arguments replace only the options that were named."""
        from i2p_streaming.options import StreamOptions

        custom = StreamOptions(initial_window_size=10, max_window_size=64)

        assert custom.initial_window_size == 10
        assert custom.max_window_size == 64
        # An option that was not passed keeps its default.
        assert custom.connect_timeout == 60000

    def test_read_timeout(self):
        """``read_timeout`` given to the constructor is stored as passed."""
        from i2p_streaming.options import StreamOptions

        timeout = 5000
        options = StreamOptions(read_timeout=timeout)
        assert options.read_timeout == timeout

    def test_write_timeout(self):
        """``write_timeout`` given to the constructor is stored as passed."""
        from i2p_streaming.options import StreamOptions

        timeout = 3000
        options = StreamOptions(write_timeout=timeout)
        assert options.write_timeout == timeout

    def test_merge(self):
        """``merge`` updates the named options and leaves the others alone."""
        from i2p_streaming.options import StreamOptions

        options = StreamOptions()
        updates = {"initial_window_size": 12, "max_message_size": 1000}
        options.merge(**updates)

        assert options.initial_window_size == 12
        assert options.max_message_size == 1000
        # Not part of the merge, so the default must still be in place.
        assert options.max_window_size == 128

    def test_equality(self):
        """Equal settings compare equal; differing settings compare unequal."""
        from i2p_streaming.options import StreamOptions

        first = StreamOptions(initial_window_size=8)
        same_settings = StreamOptions(initial_window_size=8)
        different_settings = StreamOptions(initial_window_size=4)

        assert first == same_settings
        assert first != different_settings