Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
at main 68 lines 1.9 kB view raw
import functools
import time


class Cached(object):
    """Decorator that caches a function's return values for a limited time.

    Each ``Cached(timeout=...)`` instance owns one cache dictionary
    (``cache_db``) which is used by the function it decorates. Entries are
    keyed by the string form of the call's positional and keyword
    arguments, so arguments only need a usable ``str``/``repr``; they do
    not have to be hashable.

    The decorated function gains an ``emptyCache()`` attribute that clears
    every cached entry and returns how many were removed.
    """

    def __init__(self, timeout):
        """Create a cache whose entries stay valid for ``timeout`` seconds."""
        # Maps key string -> (cached_value, time_cached_end)
        self.cache_db = {}
        self.timeout = timeout

    def __call__(self, func):
        """Wrap ``func`` so repeated calls with the same arguments are cached."""

        # functools.wraps keeps func's __name__/__doc__ on the wrapper and
        # exposes the undecorated function as wrapper.__wrapped__.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # The key is built from the textual form of the arguments.
            # Keyword arguments passed in a different order produce a
            # different string and are therefore cached separately.
            key = "%s %s" % (args, kwargs)

            entry = self.cache_db.get(key)
            if entry is not None:
                cached_value, time_cached_end = entry
                if time.time() <= time_cached_end:
                    return cached_value
                # The entry we hit is stale: take the opportunity to drop
                # every expired entry, then fall through and recompute.
                self.cleanupExpired()

            cached_value = func(*args, **kwargs)
            time_cached_end = time.time() + self.timeout
            self.cache_db[key] = (cached_value, time_cached_end)
            return cached_value

        wrapper.emptyCache = self.emptyCache

        return wrapper

    def cleanupExpired(self):
        """Delete every entry whose expiry time has already passed."""
        now = time.time()
        # Iterate over a snapshot of the items because entries are deleted
        # from the dictionary while looping.
        for key, (_cached_value, time_cached_end) in list(self.cache_db.items()):
            if now > time_cached_end:
                del self.cache_db[key]

    def emptyCache(self):
        """Remove all cached entries and return how many were removed."""
        num = len(self.cache_db)
        self.cache_db.clear()
        return num


if __name__ == "__main__":
    # Manual smoke test; only runs when the module is executed directly.
    from gevent import monkey
    monkey.patch_all()

    @Cached(timeout=2)
    def calcAdd(a, b):
        print("CalcAdd", a, b)
        return a + b

    @Cached(timeout=1)
    def calcMultiply(a, b):
        print("calcMultiply", a, b)
        return a * b

    for i in range(5):
        print("---")
        print("Emptied", calcAdd.emptyCache())
        assert calcAdd(1, 2) == 3
        print("Emptied", calcAdd.emptyCache())
        assert calcAdd(1, 2) == 3
        assert calcAdd(2, 3) == 5
        assert calcMultiply(2, 3) == 6
        time.sleep(1)