A Python port of the Invisible Internet Project (I2P)
1"""Tests for i2p_stat — RateStat, FrequencyStat, StatManager."""
2
3import time
4from i2p_stat.rate_stat import RateStat
5from i2p_stat.frequency import Frequency
6from i2p_stat.frequency_stat import FrequencyStat
7from i2p_stat.stat_manager import StatManager
8from i2p_stat.rate_averages import RateAverages
9
10
11class TestRateStat:
12 def test_create(self):
13 rs = RateStat("test", "desc", "group", [60000, 3600000])
14 assert rs.name == "test"
15 assert rs.group_name == "group"
16 assert rs.get_periods() == [60000, 3600000]
17
18 def test_add_data_to_all_rates(self):
19 rs = RateStat("test", "desc", "group", [60000, 3600000])
20 rs.add_data(100)
21 r1 = rs.get_rate(60000)
22 r2 = rs.get_rate(3600000)
23 assert r1.get_current_event_count() == 1
24 assert r2.get_current_event_count() == 1
25
26 def test_contains_rate(self):
27 rs = RateStat("test", "desc", "group", [60000])
28 assert rs.contains_rate(60000)
29 assert not rs.contains_rate(1000)
30
31 def test_get_rate_nonexistent(self):
32 rs = RateStat("test", "desc", "group", [60000])
33 assert rs.get_rate(999) is None
34
35 def test_lifetime(self):
36 rs = RateStat("test", "desc", "group", [60000])
37 rs.add_data(10)
38 rs.add_data(30)
39 assert rs.get_lifetime_event_count() == 2
40 assert rs.get_lifetime_average_value() == 20.0
41
42
43class TestFrequency:
44 def test_create(self):
45 f = Frequency(60000)
46 assert f.period == 60000
47 assert f.get_event_count() == 0
48
49 def test_event_occurred(self):
50 f = Frequency(60000)
51 f.event_occurred()
52 assert f.get_event_count() == 1
53
54 def test_average_interval_decreases_with_events(self):
55 f = Frequency(10000)
56 initial = f.get_average_interval()
57 time.sleep(0.01)
58 f.event_occurred()
59 time.sleep(0.01)
60 f.event_occurred()
61 after = f.get_average_interval()
62 # After events, interval should be less than initial (period+1)
63 assert after < initial
64
65 def test_events_per_period(self):
66 f = Frequency(1000) # 1s period
67 # Rapid events should show high events per period
68 for _ in range(10):
69 f.event_occurred()
70 time.sleep(0.005)
71 eps = f.get_average_events_per_period()
72 assert eps > 0
73
74
75class TestFrequencyStat:
76 def test_create(self):
77 fs = FrequencyStat("test", "desc", "group", [60000, 3600000])
78 assert fs.name == "test"
79 assert len(fs.get_periods()) == 2
80
81 def test_event_propagates(self):
82 fs = FrequencyStat("test", "desc", "group", [60000])
83 fs.event_occurred()
84 assert fs.get_event_count() == 1
85
86
87class TestStatManager:
88 def test_create_required_rate(self):
89 sm = StatManager()
90 sm.create_required_rate_stat("bw.send", "bytes sent", "bandwidth", [60000])
91 assert sm.is_rate("bw.send")
92 assert not sm.is_rate("nonexistent")
93
94 def test_create_rate_ignored_without_stat_full(self):
95 sm = StatManager(stat_full=False)
96 sm.create_rate_stat("optional", "desc", "group", [60000])
97 assert not sm.is_rate("optional")
98
99 def test_create_rate_with_stat_full(self):
100 sm = StatManager(stat_full=True)
101 sm.create_rate_stat("optional", "desc", "group", [60000])
102 assert sm.is_rate("optional")
103
104 def test_add_rate_data(self):
105 sm = StatManager()
106 sm.create_required_rate_stat("test", "desc", "group", [60000])
107 sm.add_rate_data("test", 100)
108 r = sm.get_rate("test")
109 assert r.get_rate(60000).get_current_event_count() == 1
110
111 def test_add_rate_data_nonexistent_is_noop(self):
112 sm = StatManager()
113 sm.add_rate_data("nonexistent", 100) # Should not raise
114
115 def test_frequency_stat(self):
116 sm = StatManager()
117 sm.create_required_frequency_stat("test.freq", "desc", "group", [60000])
118 assert sm.is_frequency("test.freq")
119 sm.update_frequency("test.freq")
120 assert sm.get_frequency("test.freq").get_event_count() == 1
121
122 def test_get_names(self):
123 sm = StatManager()
124 sm.create_required_rate_stat("r1", "d", "g", [60000])
125 sm.create_required_rate_stat("r2", "d", "g", [60000])
126 assert sm.get_rate_names() == {"r1", "r2"}
127
128 def test_remove_rate(self):
129 sm = StatManager()
130 sm.create_required_rate_stat("test", "d", "g", [60000])
131 sm.remove_rate_stat("test")
132 assert not sm.is_rate("test")
133
134 def test_shutdown(self):
135 sm = StatManager()
136 sm.create_required_rate_stat("test", "d", "g", [60000])
137 sm.shutdown()
138 assert len(sm.get_rate_names()) == 0
139
140 def test_stats_by_group(self):
141 sm = StatManager()
142 sm.create_required_rate_stat("r1", "d", "bandwidth", [60000])
143 sm.create_required_rate_stat("r2", "d", "bandwidth", [60000])
144 sm.create_required_rate_stat("r3", "d", "tunnel", [60000])
145 groups = sm.get_stats_by_group()
146 assert "bandwidth" in groups
147 assert len(groups["bandwidth"]) == 2
148 assert "tunnel" in groups
149
150 def test_coalesce_stats(self):
151 sm = StatManager()
152 sm.create_required_rate_stat("test", "d", "g", [100])
153 sm.add_rate_data("test", 50)
154 time.sleep(0.15)
155 sm.coalesce_stats()
156 r = sm.get_rate("test").get_rate(100)
157 assert r.get_last_event_count() > 0
158
159
160class TestRateAverages:
161 def test_get_temp_returns_same_instance(self):
162 a = RateAverages.get_temp()
163 b = RateAverages.get_temp()
164 assert a is b
165
166 def test_fields_default_zero(self):
167 ra = RateAverages()
168 assert ra.average == 0.0
169 assert ra.total_event_count == 0