Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
import functools
import time
2
3
class Cached(object):
    """Decorator that memoizes a function's return values for a limited time.

    Usage: put ``@Cached(timeout=2)`` above a function definition. A computed
    result is reused for calls with the same arguments until ``timeout``
    seconds have passed since it was stored.

    The decorated function gains an ``emptyCache()`` attribute that removes
    every entry held by this ``Cached`` instance and returns how many entries
    were removed.
    """

    def __init__(self, timeout):
        """Create the decorator.

        Args:
            timeout: Number of seconds a stored value stays valid.
        """
        # Maps cache key -> (cached_value, time_cached_end)
        self.cache_db = {}
        self.timeout = timeout

    def __call__(self, func):
        """Wrap ``func`` so repeated calls with equal arguments hit the cache.

        Args:
            func: The callable to memoize.

        Returns:
            A wrapper that carries the metadata of ``func`` (name, docstring)
            and an ``emptyCache`` attribute bound to this instance.
        """
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Arguments are keyed by their string form, so unhashable values
            # such as lists or dicts can still be cached. Keyword arguments
            # are sorted so f(a=1, b=2) and f(b=2, a=1) share one entry, and
            # the function is part of the key so a single Cached instance can
            # decorate several functions without mixing up their results.
            key = (func, "%s %s" % (args, sorted(kwargs.items())))

            entry = self.cache_db.get(key)
            if entry is not None:
                cached_value, time_cached_end = entry
                if time.time() <= time_cached_end:
                    return cached_value
                # The entry is stale: use the opportunity to drop every
                # expired entry, then fall through and recompute.
                self.cleanupExpired()

            cached_value = func(*args, **kwargs)
            time_cached_end = time.time() + self.timeout
            self.cache_db[key] = (cached_value, time_cached_end)
            return cached_value

        wrapper.emptyCache = self.emptyCache

        return wrapper

    def cleanupExpired(self):
        """Remove every entry whose expiry time has passed."""
        now = time.time()
        # Iterate over a snapshot because entries are deleted during the loop
        for key, (cached_value, time_cached_end) in list(self.cache_db.items()):
            if now > time_cached_end:
                del self.cache_db[key]

    def emptyCache(self):
        """Drop all entries and return the number of entries removed."""
        num = len(self.cache_db)
        self.cache_db.clear()
        return num
44
45
if __name__ == "__main__":
    # Manual smoke test: run this module directly to watch cache hits,
    # explicit flushes and timeout-driven expiry in the printed output.
    from gevent import monkey
    monkey.patch_all()

    @Cached(timeout=2)
    def calcAdd(a, b):
        # The print only appears when the value is actually computed
        print("CalcAdd", a, b)
        total = a + b
        return total

    @Cached(timeout=1)
    def calcMultiply(a, b):
        # The print only appears when the value is actually computed
        print("calcMultiply", a, b)
        product = a * b
        return product

    round_count = 5
    for _round in range(round_count):
        print("---")

        # Flush, then compute: the call below always misses the cache
        emptied = calcAdd.emptyCache()
        print("Emptied", emptied)
        assert calcAdd(1, 2) == 3

        # Flush again so the same call is recomputed a second time
        emptied = calcAdd.emptyCache()
        print("Emptied", emptied)
        assert calcAdd(1, 2) == 3

        assert calcAdd(2, 3) == 5
        assert calcMultiply(2, 3) == 6

        # Wait as long as calcMultiply's timeout before the next round
        time.sleep(1)