Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
at main 202 lines 6.8 kB view raw
1import gevent 2import time 3from gevent.event import AsyncResult 4 5from . import ThreadPool 6 7 8class Noparallel: # Only allow function running once in same time 9 10 def __init__(self, blocking=True, ignore_args=False, ignore_class=False, queue=False): 11 self.threads = {} 12 self.blocking = blocking # Blocking: Acts like normal function else thread returned 13 self.queue = queue # Execute again when blocking is done 14 self.queued = False 15 self.ignore_args = ignore_args # Block does not depend on function call arguments 16 self.ignore_class = ignore_class # Block does not depeds on class instance 17 18 def __call__(self, func): 19 def wrapper(*args, **kwargs): 20 if not ThreadPool.isMainThread(): 21 return ThreadPool.main_loop.call(wrapper, *args, **kwargs) 22 23 if self.ignore_class: 24 key = func # Unique key only by function and class object 25 elif self.ignore_args: 26 key = (func, args[0]) # Unique key only by function and class object 27 else: 28 key = (func, tuple(args), str(kwargs)) # Unique key for function including parameters 29 if key in self.threads: # Thread already running (if using blocking mode) 30 if self.queue: 31 self.queued = True 32 thread = self.threads[key] 33 if self.blocking: 34 if self.queued: 35 res = thread.get() # Blocking until its finished 36 if key in self.threads: 37 return self.threads[key].get() # Queue finished since started running 38 self.queued = False 39 return wrapper(*args, **kwargs) # Run again after the end 40 else: 41 return thread.get() # Return the value 42 43 else: # No blocking 44 if thread.ready(): # Its finished, create a new 45 thread = gevent.spawn(func, *args, **kwargs) 46 self.threads[key] = thread 47 return thread 48 else: # Still running 49 return thread 50 else: # Thread not running 51 if self.blocking: # Wait for finish 52 asyncres = AsyncResult() 53 self.threads[key] = asyncres 54 try: 55 res = func(*args, **kwargs) 56 asyncres.set(res) 57 self.cleanup(key, asyncres) 58 return res 59 except Exception as err: 60 asyncres.set_exception(err) 61 self.cleanup(key, asyncres) 62 raise(err) 63 else: # No blocking just return the thread 64 thread = gevent.spawn(func, *args, **kwargs) # Spawning new thread 65 thread.link(lambda thread: self.cleanup(key, thread)) 66 self.threads[key] = thread 67 return thread 68 wrapper.__name__ = func.__name__ 69 70 return wrapper 71 72 # Cleanup finished threads 73 def cleanup(self, key, thread): 74 if key in self.threads: 75 del(self.threads[key]) 76 77 78if __name__ == "__main__": 79 80 81 class Test(): 82 83 @Noparallel() 84 def count(self, num=5): 85 for i in range(num): 86 print(self, i) 87 time.sleep(1) 88 return "%s return:%s" % (self, i) 89 90 class TestNoblock(): 91 92 @Noparallel(blocking=False) 93 def count(self, num=5): 94 for i in range(num): 95 print(self, i) 96 time.sleep(1) 97 return "%s return:%s" % (self, i) 98 99 def testBlocking(): 100 test = Test() 101 test2 = Test() 102 print("Counting...") 103 print("Creating class1/thread1") 104 thread1 = gevent.spawn(test.count) 105 print("Creating class1/thread2 (ignored)") 106 thread2 = gevent.spawn(test.count) 107 print("Creating class2/thread3") 108 thread3 = gevent.spawn(test2.count) 109 110 print("Joining class1/thread1") 111 thread1.join() 112 print("Joining class1/thread2") 113 thread2.join() 114 print("Joining class2/thread3") 115 thread3.join() 116 117 print("Creating class1/thread4 (its finished, allowed again)") 118 thread4 = gevent.spawn(test.count) 119 print("Joining thread4") 120 thread4.join() 121 122 print(thread1.value, thread2.value, thread3.value, thread4.value) 123 print("Done.") 124 125 def testNoblocking(): 126 test = TestNoblock() 127 test2 = TestNoblock() 128 print("Creating class1/thread1") 129 thread1 = test.count() 130 print("Creating class1/thread2 (ignored)") 131 thread2 = test.count() 132 print("Creating class2/thread3") 133 thread3 = test2.count() 134 print("Joining class1/thread1") 135 thread1.join() 136 print("Joining class1/thread2") 137 thread2.join() 138 print("Joining class2/thread3") 139 thread3.join() 140 141 print("Creating class1/thread4 (its finished, allowed again)") 142 thread4 = test.count() 143 print("Joining thread4") 144 thread4.join() 145 146 print(thread1.value, thread2.value, thread3.value, thread4.value) 147 print("Done.") 148 149 def testBenchmark(): 150 import time 151 152 def printThreadNum(): 153 import gc 154 from greenlet import greenlet 155 objs = [obj for obj in gc.get_objects() if isinstance(obj, greenlet)] 156 print("Greenlets: %s" % len(objs)) 157 158 printThreadNum() 159 test = TestNoblock() 160 s = time.time() 161 for i in range(3): 162 gevent.spawn(test.count, i + 1) 163 print("Created in %.3fs" % (time.time() - s)) 164 printThreadNum() 165 time.sleep(5) 166 167 def testException(): 168 import time 169 @Noparallel(blocking=True, queue=True) 170 def count(self, num=5): 171 s = time.time() 172 # raise Exception("err") 173 for i in range(num): 174 print(self, i) 175 time.sleep(1) 176 return "%s return:%s" % (s, i) 177 def caller(): 178 try: 179 print("Ret:", count(5)) 180 except Exception as err: 181 print("Raised:", repr(err)) 182 183 gevent.joinall([ 184 gevent.spawn(caller), 185 gevent.spawn(caller), 186 gevent.spawn(caller), 187 gevent.spawn(caller) 188 ]) 189 190 191 from gevent import monkey 192 monkey.patch_all() 193 194 testException() 195 196 """ 197 testBenchmark() 198 print("Testing blocking mode...") 199 testBlocking() 200 print("Testing noblocking mode...") 201 testNoblocking() 202 """