# A fork of what is left of ZeroNet, with the aim of adding an AT Proto frontend/proxy
1import time
2import gevent
3import logging
4
# Module logger, registered under the "RateLimit" name
log = logging.getLogger("RateLimit")

called_db = {}  # event -> time of the event's last call (time.time() value plus any penalty)
queue_db = {}  # event -> (func, args, kwargs, thread) of the call queued to run later
9
10# Register event as called
11# Return: None
12
13
def called(event, penalty=0):
    """Record the current time as the last call time of `event`.

    `penalty` (seconds) is added to the stored timestamp, so a positive
    penalty makes the event look as if it was called later than it was.

    Return: None
    """
    stamp = time.time() + penalty
    called_db[event] = stamp
16
17
18# Check if calling event is allowed
# Return: True if allowed, False if not
def isAllowed(event, allowed_again=10):
    """Tell whether `event` may be called again.

    The event is allowed when no call time is recorded for it, or when at
    least `allowed_again` seconds have passed since the recorded time. In
    the second case the record is removed from `called_db`.

    Return: True if allowed, False if not
    """
    last_called = called_db.get(event)
    if not last_called:  # Nothing recorded for this event
        return True

    if time.time() - last_called < allowed_again:
        return False  # Still inside the rate limit interval

    del called_db[event]  # Interval is over, drop the record to save memory
    return True
29
def delayLeft(event, allowed_again=10):
    """Return how many seconds are left until `event` is allowed again.

    Returns 0 when no call time is recorded for the event. The result is
    not clamped: it is negative once more than `allowed_again` seconds have
    passed since the recorded call time.
    """
    last_called = called_db.get(event)
    if last_called:
        elapsed = time.time() - last_called
        return allowed_again - elapsed
    return 0  # Not called before
36
def callQueue(event):
    """Run the call queued for `event` and return its result.

    The event is registered as called and its `queue_db` entry is removed
    before the queued function is invoked. Raises KeyError when nothing is
    queued for the event.
    """
    queued = queue_db[event]
    func, args, kwargs, _thread = queued  # The thread handle is not needed here
    log.debug("Calling: %s" % event)
    called(event)
    del queue_db[event]
    result = func(*args, **kwargs)
    return result
43
44
45# Rate limit and delay function call if necessary
# If the function is called again within the rate limit interval, the previously queued call is dropped in favor of the new one
# Return: The gevent thread, immediately
def callAsync(event, allowed_again=10, func=None, *args, **kwargs):
    """Call `func(*args, **kwargs)` in a gevent thread, rate limited per event.

    If `event` is allowed, it is registered as called and `func` is spawned
    right away. Otherwise the call is stored in `queue_db` and `callQueue`
    is scheduled to run it once the interval is over. When a call is
    already queued for the event, only the stored function and arguments
    are replaced, so the earlier queued call never runs.

    Return: The gevent thread (returned immediately, without waiting)
    """
    if isAllowed(event, allowed_again):
        # Not called recently: register the call and start it now
        called(event)
        return gevent.spawn(func, *args, **kwargs)

    # Called recently: schedule it for later. The elapsed time is clamped at
    # 0, so a call time in the future (penalty) gives a wait of allowed_again.
    elapsed = max(0, time.time() - called_db[event])
    time_left = allowed_again - elapsed
    log.debug("Added to queue (%.2fs left): %s " % (time_left, event))

    queued = queue_db.get(event)
    if queued:
        thread = queued[3]  # Already queued: keep the pending thread
    else:
        thread = gevent.spawn_later(time_left, lambda: callQueue(event))  # Call it later
    queue_db[event] = (func, args, kwargs, thread)  # Store or update the parameters
    return thread
64
65
66# Rate limit and delay function call if needed
# Return: The function's return value, after waiting out the delay if one is needed
def call(event, allowed_again=10, func=None, *args, **kwargs):
    """Call `func(*args, **kwargs)` synchronously, rate limited per event.

    If `event` is allowed within `allowed_again` seconds, it is registered
    as called and `func` runs right away. Otherwise this function sleeps
    for the rest of the interval, then runs `func` and registers the call
    again.

    Return: The return value of `func`
    """
    # Pass allowed_again on: without it the check always used the default
    # 10 seconds and ignored the interval requested by the caller.
    if isAllowed(event, allowed_again):  # Not called recently, call it now
        called(event)
        return func(*args, **kwargs)

    # Called recently: wait out the rest of the interval first
    time_left = max(0, allowed_again - (time.time() - called_db[event]))
    log.debug("Calling sync (%.2fs left): %s" % (time_left, event))
    # Move the recorded call time to when this call will actually run, so
    # calls arriving during the sleep are measured against it.
    called(event, time_left)
    time.sleep(time_left)
    back = func(*args, **kwargs)
    called(event)  # Record the real finish time
    return back
83
84
85# Cleanup expired events every 3 minutes
def rateLimitCleanup():
    """Remove old call records from `called_db`, forever, every 3 minutes.

    A record is removed when its stored call time is more than 2 minutes in
    the past. `queue_db` is not touched. This function never returns; it is
    meant to run in its own gevent thread.
    """
    while True:
        expired = time.time() - 60 * 2  # Cleanup if older than 2 minutes
        # Collect the keys first: the dict must not change size while iterated
        stale = [event for event, last_called in called_db.items() if last_called < expired]
        for event in stale:
            del called_db[event]
        # gevent.sleep always yields to other greenlets; time.sleep only does
        # so when the importing process has monkey-patched the time module.
        gevent.sleep(60 * 3)  # Every 3 minutes


# Start the cleanup loop as soon as the module is imported
gevent.spawn(rateLimitCleanup)
94
95
# Manual self-test: run this file directly to watch the rate limiter work
if __name__ == "__main__":
    from gevent import monkey
    monkey.patch_all()
    import random

    # Dummy rate-limited function
    def publish(inner_path):
        print("Publishing %s..." % inner_path)
        return 1

    # Prints the value of the thread it gets linked to
    def cb(thread):
        print("Value:", thread.value)

    print("Testing async spam requests rate limit to 1/sec...")
    for i in range(3000):
        thread = callAsync("publish content.json", 1, publish, "content.json %s" % i)
        time.sleep(float(random.randint(1, 20)) / 100000)  # 0.01ms - 0.2ms between requests
    print(thread.link(cb))  # Link the callback to the thread returned by the last request
    print("Done")

    time.sleep(2)

    print("Testing sync spam requests rate limit to 1/sec...")
    for i in range(5):
        call("publish data.json", 1, publish, "data.json %s" % i)
        time.sleep(float(random.randint(1, 100)) / 100)  # 0.01s - 1s between requests
    print("Done")

    print("Testing cleanup")
    thread = callAsync("publish content.json single", 1, publish, "content.json single")
    print("Needs to cleanup:", called_db, queue_db)
    print("Waiting 3min for cleanup process...")
    time.sleep(60 * 3)
    print("Cleaned up:", called_db, queue_db)