A Python port of the Invisible Internet Project (I2P)
1"""Tests for RouterThrottleImpl."""
2
3from i2p_router.throttle import RouterThrottleImpl, TunnelRequestStatus
4
5
6def test_accept_when_idle():
7 t = RouterThrottleImpl()
8 assert t.accept_tunnel_request() == TunnelRequestStatus.ACCEPT
9
10
11def test_reject_over_max_tunnels():
12 t = RouterThrottleImpl()
13 t.set_participating_count(10_001)
14 assert t.accept_tunnel_request() == TunnelRequestStatus.REJECT_OVERLOADED
15
16
17def test_reject_high_bandwidth():
18 t = RouterThrottleImpl()
19 t.set_bandwidth_load(0.95)
20 assert t.accept_tunnel_request() == TunnelRequestStatus.REJECT_BANDWIDTH
21
22
23def test_reject_high_job_lag():
24 t = RouterThrottleImpl()
25 t.set_job_lag_ms(2000)
26 assert t.accept_tunnel_request() == TunnelRequestStatus.REJECT_TRANSIENT
27
28
29def test_accept_message_normal():
30 t = RouterThrottleImpl()
31 assert t.accept_network_message() is True
32
33
34def test_reject_message_overloaded():
35 t = RouterThrottleImpl()
36 t.set_bandwidth_load(0.96)
37 assert t.accept_network_message() is False
38
39
40def test_reject_message_high_delay():
41 t = RouterThrottleImpl()
42 t.set_message_delay_ms(3500)
43 assert t.accept_network_message() is False
44
45
46def test_status_string():
47 t = RouterThrottleImpl()
48 t.set_participating_count(50)
49 s = t.get_status()
50 assert "tunnels=50" in s
51 assert "bw=" in s