Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
at main 100 lines 3.7 kB view raw
1import time 2 3import gevent 4 5from util import RateLimit 6 7 8# Time is around limit +/- 0.05 sec 9def around(t, limit): 10 return t >= limit - 0.05 and t <= limit + 0.05 11 12 13class ExampleClass(object): 14 def __init__(self): 15 self.counted = 0 16 self.last_called = None 17 18 def count(self, back="counted"): 19 self.counted += 1 20 self.last_called = back 21 return back 22 23 24class TestRateLimit: 25 def testCall(self): 26 obj1 = ExampleClass() 27 obj2 = ExampleClass() 28 29 s = time.time() 30 assert RateLimit.call("counting", allowed_again=0.1, func=obj1.count) == "counted" 31 assert around(time.time() - s, 0.0) # First allow to call instantly 32 assert obj1.counted == 1 33 34 # Call again 35 assert not RateLimit.isAllowed("counting", 0.1) 36 assert RateLimit.isAllowed("something else", 0.1) 37 assert RateLimit.call("counting", allowed_again=0.1, func=obj1.count) == "counted" 38 assert around(time.time() - s, 0.1) # Delays second call within interval 39 assert obj1.counted == 2 40 time.sleep(0.1) # Wait the cooldown time 41 42 # Call 3 times async 43 s = time.time() 44 assert obj2.counted == 0 45 threads = [ 46 gevent.spawn(lambda: RateLimit.call("counting", allowed_again=0.1, func=obj2.count)), # Instant 47 gevent.spawn(lambda: RateLimit.call("counting", allowed_again=0.1, func=obj2.count)), # 0.1s delay 48 gevent.spawn(lambda: RateLimit.call("counting", allowed_again=0.1, func=obj2.count)) # 0.2s delay 49 ] 50 gevent.joinall(threads) 51 assert [thread.value for thread in threads] == ["counted", "counted", "counted"] 52 assert around(time.time() - s, 0.2) 53 54 # Wait 0.1s cooldown 55 assert not RateLimit.isAllowed("counting", 0.1) 56 time.sleep(0.11) 57 assert RateLimit.isAllowed("counting", 0.1) 58 59 # No queue = instant again 60 s = time.time() 61 assert RateLimit.isAllowed("counting", 0.1) 62 assert RateLimit.call("counting", allowed_again=0.1, func=obj2.count) == "counted" 63 assert around(time.time() - s, 0.0) 64 65 assert obj2.counted == 4 66 67 def testCallAsync(self): 68 obj1 = ExampleClass() 69 obj2 = ExampleClass() 70 71 s = time.time() 72 RateLimit.callAsync("counting async", allowed_again=0.1, func=obj1.count, back="call #1").join() 73 assert obj1.counted == 1 # First instant 74 assert around(time.time() - s, 0.0) 75 76 # After that the calls delayed 77 s = time.time() 78 t1 = RateLimit.callAsync("counting async", allowed_again=0.1, func=obj1.count, back="call #2") # Dumped by the next call 79 time.sleep(0.03) 80 t2 = RateLimit.callAsync("counting async", allowed_again=0.1, func=obj1.count, back="call #3") # Dumped by the next call 81 time.sleep(0.03) 82 t3 = RateLimit.callAsync("counting async", allowed_again=0.1, func=obj1.count, back="call #4") # Will be called 83 assert obj1.counted == 1 # Delay still in progress: Not called yet 84 t3.join() 85 assert t3.value == "call #4" 86 assert around(time.time() - s, 0.1) 87 88 # Only the last one called 89 assert obj1.counted == 2 90 assert obj1.last_called == "call #4" 91 92 # Just called, not allowed again 93 assert not RateLimit.isAllowed("counting async", 0.1) 94 s = time.time() 95 t4 = RateLimit.callAsync("counting async", allowed_again=0.1, func=obj1.count, back="call #5").join() 96 assert obj1.counted == 3 97 assert around(time.time() - s, 0.1) 98 assert not RateLimit.isAllowed("counting async", 0.1) 99 time.sleep(0.11) 100 assert RateLimit.isAllowed("counting async", 0.1)