Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
at main 128 lines 4.3 kB view raw
1import time 2import gevent 3import logging 4 5log = logging.getLogger("RateLimit") 6 7called_db = {} # Holds events last call time 8queue_db = {} # Commands queued to run 9 10# Register event as called 11# Return: None 12 13 14def called(event, penalty=0): 15 called_db[event] = time.time() + penalty 16 17 18# Check if calling event is allowed 19# Return: True if allowed False if not 20def isAllowed(event, allowed_again=10): 21 last_called = called_db.get(event) 22 if not last_called: # Its not called before 23 return True 24 elif time.time() - last_called >= allowed_again: 25 del called_db[event] # Delete last call time to save memory 26 return True 27 else: 28 return False 29 30def delayLeft(event, allowed_again=10): 31 last_called = called_db.get(event) 32 if not last_called: # Its not called before 33 return 0 34 else: 35 return allowed_again - (time.time() - last_called) 36 37def callQueue(event): 38 func, args, kwargs, thread = queue_db[event] 39 log.debug("Calling: %s" % event) 40 called(event) 41 del queue_db[event] 42 return func(*args, **kwargs) 43 44 45# Rate limit and delay function call if necessary 46# If the function called again within the rate limit interval then previous queued call will be dropped 47# Return: Immediately gevent thread 48def callAsync(event, allowed_again=10, func=None, *args, **kwargs): 49 if isAllowed(event, allowed_again): # Not called recently, call it now 50 called(event) 51 # print "Calling now" 52 return gevent.spawn(func, *args, **kwargs) 53 else: # Called recently, schedule it for later 54 time_left = allowed_again - max(0, time.time() - called_db[event]) 55 log.debug("Added to queue (%.2fs left): %s " % (time_left, event)) 56 if not queue_db.get(event): # Function call not queued yet 57 thread = gevent.spawn_later(time_left, lambda: callQueue(event)) # Call this function later 58 queue_db[event] = (func, args, kwargs, thread) 59 return thread 60 else: # Function call already queued, just update the parameters 61 thread = queue_db[event][3] 62 queue_db[event] = (func, args, kwargs, thread) 63 return thread 64 65 66# Rate limit and delay function call if needed 67# Return: Wait for execution/delay then return value 68def call(event, allowed_again=10, func=None, *args, **kwargs): 69 if isAllowed(event): # Not called recently, call it now 70 called(event) 71 # print "Calling now", allowed_again 72 return func(*args, **kwargs) 73 74 else: # Called recently, schedule it for later 75 time_left = max(0, allowed_again - (time.time() - called_db[event])) 76 # print "Time left: %s" % time_left, args, kwargs 77 log.debug("Calling sync (%.2fs left): %s" % (time_left, event)) 78 called(event, time_left) 79 time.sleep(time_left) 80 back = func(*args, **kwargs) 81 called(event) 82 return back 83 84 85# Cleanup expired events every 3 minutes 86def rateLimitCleanup(): 87 while 1: 88 expired = time.time() - 60 * 2 # Cleanup if older than 2 minutes 89 for event in list(called_db.keys()): 90 if called_db[event] < expired: 91 del called_db[event] 92 time.sleep(60 * 3) # Every 3 minutes 93gevent.spawn(rateLimitCleanup) 94 95 96if __name__ == "__main__": 97 from gevent import monkey 98 monkey.patch_all() 99 import random 100 101 def publish(inner_path): 102 print("Publishing %s..." % inner_path) 103 return 1 104 105 def cb(thread): 106 print("Value:", thread.value) 107 108 print("Testing async spam requests rate limit to 1/sec...") 109 for i in range(3000): 110 thread = callAsync("publish content.json", 1, publish, "content.json %s" % i) 111 time.sleep(float(random.randint(1, 20)) / 100000) 112 print(thread.link(cb)) 113 print("Done") 114 115 time.sleep(2) 116 117 print("Testing sync spam requests rate limit to 1/sec...") 118 for i in range(5): 119 call("publish data.json", 1, publish, "data.json %s" % i) 120 time.sleep(float(random.randint(1, 100)) / 100) 121 print("Done") 122 123 print("Testing cleanup") 124 thread = callAsync("publish content.json single", 1, publish, "content.json single") 125 print("Needs to cleanup:", called_db, queue_db) 126 print("Waiting 3min for cleanup process...") 127 time.sleep(60 * 3) 128 print("Cleaned up:", called_db, queue_db)