A Python port of the Invisible Internet Project (I2P)
at main 51 lines 1.3 kB view raw
"""Tests for RouterThrottleImpl."""

from i2p_router.throttle import RouterThrottleImpl, TunnelRequestStatus


def test_accept_when_idle():
    """A freshly constructed throttle accepts a tunnel request."""
    t = RouterThrottleImpl()
    assert t.accept_tunnel_request() == TunnelRequestStatus.ACCEPT


def test_reject_over_max_tunnels():
    """A participating count of 10_001 yields REJECT_OVERLOADED."""
    t = RouterThrottleImpl()
    # NOTE(review): presumably just above the implementation's tunnel cap;
    # the actual limit lives in RouterThrottleImpl, not in this file.
    t.set_participating_count(10_001)
    assert t.accept_tunnel_request() == TunnelRequestStatus.REJECT_OVERLOADED


def test_reject_high_bandwidth():
    """A bandwidth load of 0.95 yields REJECT_BANDWIDTH for tunnel requests."""
    t = RouterThrottleImpl()
    t.set_bandwidth_load(0.95)
    assert t.accept_tunnel_request() == TunnelRequestStatus.REJECT_BANDWIDTH


def test_reject_high_job_lag():
    """A job lag of 2000 ms yields REJECT_TRANSIENT for tunnel requests."""
    t = RouterThrottleImpl()
    t.set_job_lag_ms(2000)
    assert t.accept_tunnel_request() == TunnelRequestStatus.REJECT_TRANSIENT


def test_accept_message_normal():
    """A freshly constructed throttle accepts a network message."""
    t = RouterThrottleImpl()
    assert t.accept_network_message() is True


def test_reject_message_overloaded():
    """A bandwidth load of 0.96 makes the throttle refuse network messages."""
    t = RouterThrottleImpl()
    t.set_bandwidth_load(0.96)
    assert t.accept_network_message() is False


def test_reject_message_high_delay():
    """A message delay of 3500 ms makes the throttle refuse network messages."""
    t = RouterThrottleImpl()
    t.set_message_delay_ms(3500)
    assert t.accept_network_message() is False


def test_status_string():
    """The status string reports the tunnel count and includes a bandwidth field."""
    t = RouterThrottleImpl()
    t.set_participating_count(50)
    s = t.get_status()
    assert "tunnels=50" in s
    assert "bw=" in s