A Python port of the Invisible Internet Project (I2P)
at main 169 lines 5.8 kB view raw
"""Tests for i2p_stat — RateStat, FrequencyStat, StatManager."""

import time

from i2p_stat.rate_stat import RateStat
from i2p_stat.frequency import Frequency
from i2p_stat.frequency_stat import FrequencyStat
from i2p_stat.stat_manager import StatManager
from i2p_stat.rate_averages import RateAverages

# Periods used throughout these tests are in milliseconds (1000 == 1 s).
_MINUTE_MS = 60000
_HOUR_MS = 3600000


class TestRateStat:
    """Checks on a RateStat and the per-period rates it exposes."""

    def test_create(self):
        """Name, group and period list are readable after construction."""
        stat = RateStat("test", "desc", "group", [_MINUTE_MS, _HOUR_MS])
        assert stat.name == "test"
        assert stat.group_name == "group"
        assert stat.get_periods() == [_MINUTE_MS, _HOUR_MS]

    def test_add_data_to_all_rates(self):
        """A single add_data() call is counted by every configured rate."""
        stat = RateStat("test", "desc", "group", [_MINUTE_MS, _HOUR_MS])
        stat.add_data(100)
        # Fetch both rates first, then check them, in period order.
        rates = [stat.get_rate(period) for period in (_MINUTE_MS, _HOUR_MS)]
        for rate in rates:
            assert rate.get_current_event_count() == 1

    def test_contains_rate(self):
        """contains_rate() is true only for a configured period."""
        stat = RateStat("test", "desc", "group", [_MINUTE_MS])
        assert stat.contains_rate(_MINUTE_MS)
        assert not stat.contains_rate(1000)

    def test_get_rate_nonexistent(self):
        """get_rate() yields None for a period that was never configured."""
        stat = RateStat("test", "desc", "group", [_MINUTE_MS])
        missing = stat.get_rate(999)
        assert missing is None

    def test_lifetime(self):
        """Lifetime count and average reflect every value added."""
        stat = RateStat("test", "desc", "group", [_MINUTE_MS])
        for value in (10, 30):
            stat.add_data(value)
        assert stat.get_lifetime_event_count() == 2
        assert stat.get_lifetime_average_value() == 20.0


class TestFrequency:
    """Checks on a single-period Frequency tracker."""

    def test_create(self):
        """A new Frequency reports its period and a zero event count."""
        freq = Frequency(_MINUTE_MS)
        assert freq.period == _MINUTE_MS
        assert freq.get_event_count() == 0

    def test_event_occurred(self):
        """One recorded event gives an event count of one."""
        freq = Frequency(_MINUTE_MS)
        freq.event_occurred()
        assert freq.get_event_count() == 1

    def test_average_interval_decreases_with_events(self):
        """Two closely spaced events pull the average interval down."""
        freq = Frequency(10000)
        baseline = freq.get_average_interval()
        for _ in range(2):
            time.sleep(0.01)
            freq.event_occurred()
        observed = freq.get_average_interval()
        # After events, interval should be less than initial (period+1)
        assert observed < baseline

    def test_events_per_period(self):
        """A burst of events yields a positive events-per-period figure."""
        freq = Frequency(1000)  # 1s period
        # Rapid events should show high events per period
        for _ in range(10):
            freq.event_occurred()
            time.sleep(0.005)
        per_period = freq.get_average_events_per_period()
        assert per_period > 0


class TestFrequencyStat:
    """Checks on a FrequencyStat spanning one or more periods."""

    def test_create(self):
        """Name and number of periods are readable after construction."""
        stat = FrequencyStat("test", "desc", "group", [_MINUTE_MS, _HOUR_MS])
        assert stat.name == "test"
        assert len(stat.get_periods()) == 2

    def test_event_propagates(self):
        """An event recorded on the stat shows up in its event count."""
        stat = FrequencyStat("test", "desc", "group", [_MINUTE_MS])
        stat.event_occurred()
        assert stat.get_event_count() == 1


class TestStatManager:
    """Checks on StatManager registration, lookup, update and teardown."""

    def test_create_required_rate(self):
        """A required rate stat is registered; unknown names are not rates."""
        manager = StatManager()
        manager.create_required_rate_stat(
            "bw.send", "bytes sent", "bandwidth", [_MINUTE_MS]
        )
        assert manager.is_rate("bw.send")
        assert not manager.is_rate("nonexistent")

    def test_create_rate_ignored_without_stat_full(self):
        """With stat_full=False, create_rate_stat() registers nothing."""
        manager = StatManager(stat_full=False)
        manager.create_rate_stat("optional", "desc", "group", [_MINUTE_MS])
        assert not manager.is_rate("optional")

    def test_create_rate_with_stat_full(self):
        """With stat_full=True, create_rate_stat() registers the stat."""
        manager = StatManager(stat_full=True)
        manager.create_rate_stat("optional", "desc", "group", [_MINUTE_MS])
        assert manager.is_rate("optional")

    def test_add_rate_data(self):
        """Data added through the manager reaches the stat's rate."""
        manager = StatManager()
        manager.create_required_rate_stat("test", "desc", "group", [_MINUTE_MS])
        manager.add_rate_data("test", 100)
        rate_stat = manager.get_rate("test")
        minute_rate = rate_stat.get_rate(_MINUTE_MS)
        assert minute_rate.get_current_event_count() == 1

    def test_add_rate_data_nonexistent_is_noop(self):
        """Adding data under an unregistered name must not raise."""
        manager = StatManager()
        manager.add_rate_data("nonexistent", 100)  # Should not raise

    def test_frequency_stat(self):
        """A required frequency stat can be registered and updated."""
        manager = StatManager()
        manager.create_required_frequency_stat(
            "test.freq", "desc", "group", [_MINUTE_MS]
        )
        assert manager.is_frequency("test.freq")
        manager.update_frequency("test.freq")
        frequency_stat = manager.get_frequency("test.freq")
        assert frequency_stat.get_event_count() == 1

    def test_get_names(self):
        """get_rate_names() returns exactly the registered rate names."""
        manager = StatManager()
        for name in ("r1", "r2"):
            manager.create_required_rate_stat(name, "d", "g", [_MINUTE_MS])
        assert manager.get_rate_names() == {"r1", "r2"}

    def test_remove_rate(self):
        """A removed rate stat is no longer reported as a rate."""
        manager = StatManager()
        manager.create_required_rate_stat("test", "d", "g", [_MINUTE_MS])
        manager.remove_rate_stat("test")
        assert not manager.is_rate("test")

    def test_shutdown(self):
        """After shutdown() no rate names remain."""
        manager = StatManager()
        manager.create_required_rate_stat("test", "d", "g", [_MINUTE_MS])
        manager.shutdown()
        remaining = manager.get_rate_names()
        assert len(remaining) == 0

    def test_stats_by_group(self):
        """Stats are bucketed under the group name they were created with."""
        manager = StatManager()
        registrations = (
            ("r1", "bandwidth"),
            ("r2", "bandwidth"),
            ("r3", "tunnel"),
        )
        for name, group in registrations:
            manager.create_required_rate_stat(name, "d", group, [_MINUTE_MS])
        by_group = manager.get_stats_by_group()
        assert "bandwidth" in by_group
        assert len(by_group["bandwidth"]) == 2
        assert "tunnel" in by_group

    def test_coalesce_stats(self):
        """Coalescing after the period has passed exposes a last event count."""
        short_period = 100
        manager = StatManager()
        manager.create_required_rate_stat("test", "d", "g", [short_period])
        manager.add_rate_data("test", 50)
        # Sleep for longer than the 100 ms period before coalescing.
        time.sleep(0.15)
        manager.coalesce_stats()
        rate = manager.get_rate("test").get_rate(short_period)
        assert rate.get_last_event_count() > 0


class TestRateAverages:
    """Checks on the RateAverages value holder."""

    def test_get_temp_returns_same_instance(self):
        """Two get_temp() calls in a row return the identical object."""
        first = RateAverages.get_temp()
        second = RateAverages.get_temp()
        assert first is second

    def test_fields_default_zero(self):
        """A freshly constructed RateAverages has zeroed fields."""
        averages = RateAverages()
        assert averages.average == 0.0
        assert averages.total_event_count == 0