Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
1import time
2
3import gevent
4
5from util import RateLimit
6
7
8# Time is around limit +/- 0.05 sec
9def around(t, limit):
10 return t >= limit - 0.05 and t <= limit + 0.05
11
12
13class ExampleClass(object):
14 def __init__(self):
15 self.counted = 0
16 self.last_called = None
17
18 def count(self, back="counted"):
19 self.counted += 1
20 self.last_called = back
21 return back
22
23
24class TestRateLimit:
25 def testCall(self):
26 obj1 = ExampleClass()
27 obj2 = ExampleClass()
28
29 s = time.time()
30 assert RateLimit.call("counting", allowed_again=0.1, func=obj1.count) == "counted"
31 assert around(time.time() - s, 0.0) # First allow to call instantly
32 assert obj1.counted == 1
33
34 # Call again
35 assert not RateLimit.isAllowed("counting", 0.1)
36 assert RateLimit.isAllowed("something else", 0.1)
37 assert RateLimit.call("counting", allowed_again=0.1, func=obj1.count) == "counted"
38 assert around(time.time() - s, 0.1) # Delays second call within interval
39 assert obj1.counted == 2
40 time.sleep(0.1) # Wait the cooldown time
41
42 # Call 3 times async
43 s = time.time()
44 assert obj2.counted == 0
45 threads = [
46 gevent.spawn(lambda: RateLimit.call("counting", allowed_again=0.1, func=obj2.count)), # Instant
47 gevent.spawn(lambda: RateLimit.call("counting", allowed_again=0.1, func=obj2.count)), # 0.1s delay
48 gevent.spawn(lambda: RateLimit.call("counting", allowed_again=0.1, func=obj2.count)) # 0.2s delay
49 ]
50 gevent.joinall(threads)
51 assert [thread.value for thread in threads] == ["counted", "counted", "counted"]
52 assert around(time.time() - s, 0.2)
53
54 # Wait 0.1s cooldown
55 assert not RateLimit.isAllowed("counting", 0.1)
56 time.sleep(0.11)
57 assert RateLimit.isAllowed("counting", 0.1)
58
59 # No queue = instant again
60 s = time.time()
61 assert RateLimit.isAllowed("counting", 0.1)
62 assert RateLimit.call("counting", allowed_again=0.1, func=obj2.count) == "counted"
63 assert around(time.time() - s, 0.0)
64
65 assert obj2.counted == 4
66
67 def testCallAsync(self):
68 obj1 = ExampleClass()
69 obj2 = ExampleClass()
70
71 s = time.time()
72 RateLimit.callAsync("counting async", allowed_again=0.1, func=obj1.count, back="call #1").join()
73 assert obj1.counted == 1 # First instant
74 assert around(time.time() - s, 0.0)
75
76 # After that the calls delayed
77 s = time.time()
78 t1 = RateLimit.callAsync("counting async", allowed_again=0.1, func=obj1.count, back="call #2") # Dumped by the next call
79 time.sleep(0.03)
80 t2 = RateLimit.callAsync("counting async", allowed_again=0.1, func=obj1.count, back="call #3") # Dumped by the next call
81 time.sleep(0.03)
82 t3 = RateLimit.callAsync("counting async", allowed_again=0.1, func=obj1.count, back="call #4") # Will be called
83 assert obj1.counted == 1 # Delay still in progress: Not called yet
84 t3.join()
85 assert t3.value == "call #4"
86 assert around(time.time() - s, 0.1)
87
88 # Only the last one called
89 assert obj1.counted == 2
90 assert obj1.last_called == "call #4"
91
92 # Just called, not allowed again
93 assert not RateLimit.isAllowed("counting async", 0.1)
94 s = time.time()
95 t4 = RateLimit.callAsync("counting async", allowed_again=0.1, func=obj1.count, back="call #5").join()
96 assert obj1.counted == 3
97 assert around(time.time() - s, 0.1)
98 assert not RateLimit.isAllowed("counting async", 0.1)
99 time.sleep(0.11)
100 assert RateLimit.isAllowed("counting async", 0.1)