Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
1import gevent
2import time
3from gevent.event import AsyncResult
4
5from . import ThreadPool
6
7
class Noparallel:  # Only allow function running once in same time
    """Decorator that keeps a function from running more than once at the same time.

    Calls are grouped by a key built from the function and, depending on the
    options, its arguments. While a call with a given key is in progress:

    * blocking mode (default): further callers wait for the running call and
      receive its result; an exception raised by the running call is raised
      in the waiters too.
    * non-blocking mode: the function is run in a greenlet and every caller
      gets that same greenlet back until it has finished.

    Args:
        blocking: Act like a normal function call (True) or return the
            greenlet running the function (False).
        ignore_args: Build the key from the function and the first positional
            argument only (the instance, for methods).
        ignore_class: Build the key from the function only.
        queue: In blocking mode, run the function once more after the current
            run has finished if it was called again in the meantime.
    """

    def __init__(self, blocking=True, ignore_args=False, ignore_class=False, queue=False):
        self.threads = {}  # key -> AsyncResult (blocking) or greenlet (non-blocking) of the call in progress
        self.blocking = blocking  # Blocking: Acts like normal function else thread returned
        self.queue = queue  # Execute again when blocking is done
        # NOTE(review): a single flag shared by every key handled by this decorator instance
        self.queued = False
        self.ignore_args = ignore_args  # Block does not depend on function call arguments
        self.ignore_class = ignore_class  # Block does not depend on class instance

    def _make_key(self, func, args, kwargs):
        """Return the hashable key that identifies "the same call" for the current options."""
        if self.ignore_class:
            return func  # Unique key only by function
        if self.ignore_args:
            # Unique key only by function and class object; tolerate calls without positional arguments
            return (func, args[0] if args else None)
        # Unique key for function including parameters. Keyword arguments are sorted by name
        # so that f(a=1, b=2) and f(b=2, a=1) are recognised as the same call; str() keeps
        # the key hashable even if a keyword value is not.
        return (func, tuple(args), str(sorted(kwargs.items())))

    def _spawn(self, key, func, args, kwargs):
        """Run func in a new greenlet registered under key and unregistered when it ends."""
        thread = gevent.spawn(func, *args, **kwargs)
        thread.link(lambda finished: self.cleanup(key, finished))
        self.threads[key] = thread
        return thread

    def __call__(self, func):
        def wrapper(*args, **kwargs):
            if not ThreadPool.isMainThread():
                # Not on the main thread: hand the call over through ThreadPool.main_loop
                # (presumably executes it on the main thread's loop -- confirm in ThreadPool)
                return ThreadPool.main_loop.call(wrapper, *args, **kwargs)

            key = self._make_key(func, args, kwargs)

            if key in self.threads:  # Thread already running (if using blocking mode)
                if self.queue:
                    self.queued = True
                thread = self.threads[key]
                if self.blocking:
                    if self.queued:
                        thread.get()  # Blocking until its finished
                        if key in self.threads:
                            # Another waiter already started the queued run: share its result
                            return self.threads[key].get()
                        self.queued = False
                        return wrapper(*args, **kwargs)  # Run again after the end
                    return thread.get()  # Return the value

                # No blocking
                if thread.ready():  # Its finished, create a new
                    return self._spawn(key, func, args, kwargs)
                return thread  # Still running

            # Thread not running
            if self.blocking:  # Wait for finish
                asyncres = AsyncResult()
                self.threads[key] = asyncres
                try:
                    res = func(*args, **kwargs)
                except BaseException as err:
                    # BaseException on purpose: gevent.Timeout and GreenletExit do not derive
                    # from Exception. Without publishing the failure and removing the key,
                    # every later caller with this key would wait forever.
                    asyncres.set_exception(err)
                    self.cleanup(key, asyncres)
                    raise
                asyncres.set(res)
                self.cleanup(key, asyncres)
                return res

            # No blocking just return the thread
            return self._spawn(key, func, args, kwargs)

        wrapper.__name__ = func.__name__

        return wrapper

    # Cleanup finished threads
    def cleanup(self, key, thread):
        """Forget the finished call stored under key.

        The entry is removed only if it still refers to ``thread``, so a late
        callback of an old greenlet cannot unregister a newer call that has
        taken over the same key.
        """
        if key in self.threads and self.threads[key] is thread:
            del self.threads[key]
76
77
if __name__ == "__main__":
    # Manual demo of the Noparallel decorator; only executed when the module is run directly.

    class Test():
        """Demo object whose count() is decorated in blocking mode."""

        @Noparallel()
        def count(self, num=5):
            # Print one line per second, then report the last index reached
            for i in range(num):
                print(self, i)
                time.sleep(1)
            return "%s return:%s" % (self, i)

    class TestNoblock():
        """Demo object whose count() is decorated in non-blocking mode."""

        @Noparallel(blocking=False)
        def count(self, num=5):
            # Same body as Test.count; the decorator makes the call return a greenlet
            for i in range(num):
                print(self, i)
                time.sleep(1)
            return "%s return:%s" % (self, i)

    def testBlocking():
        """Spawn overlapping blocking calls on two instances and print their results."""
        first = Test()
        second = Test()
        print("Counting...")

        creation_plan = (
            ("Creating class1/thread1", first.count),
            ("Creating class1/thread2 (ignored)", first.count),
            ("Creating class2/thread3", second.count),
        )
        jobs = []
        for message, target in creation_plan:
            print(message)
            jobs.append(gevent.spawn(target))

        join_messages = (
            "Joining class1/thread1",
            "Joining class1/thread2",
            "Joining class2/thread3",
        )
        for message, job in zip(join_messages, jobs):
            print(message)
            job.join()

        # The first instance is idle again, so a new call is allowed to run
        print("Creating class1/thread4 (its finished, allowed again)")
        final_job = gevent.spawn(first.count)
        print("Joining thread4")
        final_job.join()
        jobs.append(final_job)

        print(*[job.value for job in jobs])
        print("Done.")

    def testNoblocking():
        """Call the non-blocking variant directly and join the greenlets it returns."""
        first = TestNoblock()
        second = TestNoblock()

        creation_plan = (
            ("Creating class1/thread1", first.count),
            ("Creating class1/thread2 (ignored)", first.count),
            ("Creating class2/thread3", second.count),
        )
        jobs = []
        for message, target in creation_plan:
            print(message)
            jobs.append(target())  # The decorated call itself returns the greenlet

        join_messages = (
            "Joining class1/thread1",
            "Joining class1/thread2",
            "Joining class2/thread3",
        )
        for message, job in zip(join_messages, jobs):
            print(message)
            job.join()

        print("Creating class1/thread4 (its finished, allowed again)")
        final_job = first.count()
        print("Joining thread4")
        final_job.join()
        jobs.append(final_job)

        print(*[job.value for job in jobs])
        print("Done.")

    def testBenchmark():
        """Report the greenlet count before and after spawning three non-blocking calls."""
        import time

        def printThreadNum():
            import gc
            from greenlet import greenlet
            # Count the live greenlet objects known to the garbage collector
            alive = sum(1 for obj in gc.get_objects() if isinstance(obj, greenlet))
            print("Greenlets: %s" % alive)

        printThreadNum()
        subject = TestNoblock()
        started = time.time()
        for step in range(3):
            gevent.spawn(subject.count, step + 1)
        print("Created in %.3fs" % (time.time() - started))
        printThreadNum()
        time.sleep(5)

    def testException():
        """Run four concurrent callers against a blocking, queued function."""
        import time

        @Noparallel(blocking=True, queue=True)
        def count(self, num=5):
            # Plain function: the caller below passes 5 as "self" and leaves num at its default
            s = time.time()
            # To exercise the error path, raise here instead: raise Exception("err")
            for i in range(num):
                print(self, i)
                time.sleep(1)
            return "%s return:%s" % (s, i)

        def caller():
            try:
                print("Ret:", count(5))
            except Exception as err:
                print("Raised:", repr(err))

        gevent.joinall([gevent.spawn(caller) for _ in range(4)])

    # Patch the standard library before running the demo (presumably so that the
    # time.sleep calls above yield to other greenlets -- see gevent.monkey)
    from gevent import monkey
    monkey.patch_all()

    testException()

    # Other demos, disabled by default:
    # testBenchmark()
    # print("Testing blocking mode...")
    # testBlocking()
    # print("Testing noblocking mode...")
    # testNoblocking()