# Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
# at main 180 lines 5.0 kB view raw
"""Helpers for combining real OS threads with the gevent main loop.

Provides a thin wrapper around ``gevent.threadpool.ThreadPool``, a lock and a
one-shot event usable from both the main thread and worker threads, and
``MainLoopCaller``, which lets worker threads execute a callable in the main
thread's gevent loop and wait for its result.

Importing this module has side effects: it records the importing thread as
the "main" thread, starts the ``main_loop`` dispatcher and replaces
``time.sleep`` (see ``patchSleep``).
"""
import threading
import time
import queue

import gevent
import gevent.monkey
import gevent.threadpool
import gevent._threading


class ThreadPool:
    """Wrapper around ``gevent.threadpool.ThreadPool`` with an optional name.

    A ``max_size`` of 0 (or less) disables the pool: ``self.pool`` is ``None``
    and ``wrap`` returns functions unchanged.  ``spawn`` and ``apply`` need an
    active pool; calling them while ``self.pool`` is ``None`` (disabled pool,
    or after ``kill``) raises ``AttributeError``.
    """

    def __init__(self, max_size, name=None):
        self.setMaxSize(max_size)
        if name:
            self.name = name
        else:
            self.name = "ThreadPool#%s" % id(self)

    def setMaxSize(self, max_size):
        """Store ``max_size`` and create a fresh gevent pool (or disable it)."""
        self.max_size = max_size
        if max_size > 0:
            self.pool = gevent.threadpool.ThreadPool(max_size)
        else:
            self.pool = None

    def wrap(self, func):
        """Return a callable that runs ``func`` in this pool.

        The decision is made at wrap time: if the pool is disabled the
        original ``func`` is returned untouched.
        """
        if self.pool is None:
            return func

        def wrapper(*args, **kwargs):
            if not isMainThread():  # Call directly if not in main thread
                return func(*args, **kwargs)
            res = self.apply(func, args, kwargs)
            return res

        return wrapper

    def spawn(self, *args, **kwargs):
        """Submit a task to the pool and return gevent's result object."""
        if not isMainThread() and not self.pool._semaphore.ready():
            # Avoid semaphore error when spawning from other thread and the pool is full
            return main_loop.call(self.spawn, *args, **kwargs)
        res = self.pool.spawn(*args, **kwargs)
        return res

    def apply(self, func, args=(), kwargs=None):
        """Run ``func(*args, **kwargs)`` in the pool and return its result.

        ``kwargs`` defaults to ``None`` (treated as no keyword arguments)
        rather than a shared mutable ``{}`` default.
        """
        if kwargs is None:
            kwargs = {}
        t = self.spawn(func, *args, **kwargs)
        # NOTE(review): _apply_immediately() is a private gevent hook; it
        # presumably reports that waiting on the result in this context is
        # not possible, so the wait is delegated to the main loop - confirm
        # against the installed gevent version.
        if self.pool._apply_immediately():
            return main_loop.call(t.get)
        else:
            return t.get()

    def kill(self):
        """Shut the pool down and mark this wrapper as disabled."""
        if self.pool is not None and self.pool.size > 0 and main_loop:
            # Kill from the main loop, waiting at most one second for it
            main_loop.call(lambda: gevent.spawn(self.pool.kill).join(timeout=1))

        self.pool = None

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.kill()


# Pool used by Lock to perform blocking acquires outside the gevent loop
lock_pool = gevent.threadpool.ThreadPool(50)
# Ident of the thread that imported this module; treated as the main thread
main_thread_id = threading.current_thread().ident


def isMainThread():
    """Return True when called from the thread that imported this module."""
    return threading.current_thread().ident == main_thread_id


class Lock:
    """Lock built on ``gevent._threading.Lock``.

    ``locked`` and ``release`` are bound directly to the underlying lock.
    ``time_lock`` holds the ``time.time()`` of the most recent ``acquire``
    call (recorded before the acquire is attempted).
    """

    def __init__(self):
        self.lock = gevent._threading.Lock()
        self.locked = self.lock.locked
        self.release = self.lock.release
        self.time_lock = 0

    def acquire(self, *args, **kwargs):
        """Acquire the lock, forwarding all arguments to the underlying lock."""
        self.time_lock = time.time()
        if self.locked() and isMainThread():
            # Start in new thread to avoid blocking gevent loop
            return lock_pool.apply(self.lock.acquire, args, kwargs)
        else:
            return self.lock.acquire(*args, **kwargs)

    def __del__(self):
        # Release the lock if it is still held when the object is collected
        while self.locked():
            self.release()


class Event:
    """One-shot result holder: ``get`` blocks until ``set`` supplies a value.

    The internal lock is taken (non-blocking) in ``__init__`` so that ``get``
    blocks on it until ``set`` releases it.
    """

    def __init__(self):
        self.get_lock = Lock()
        self.res = None
        self.get_lock.acquire(False)
        self.done = False

    def set(self, res):
        """Store ``res`` and wake up a waiting ``get``.

        Raises:
            Exception: if a value has already been set.
        """
        if self.done:
            raise Exception("Event already has value")
        self.res = res
        self.get_lock.release()
        self.done = True

    def get(self):
        """Return the stored value, blocking until ``set`` has been called."""
        if not self.done:
            self.get_lock.acquire(True)
        # Leave the lock released so that further get() calls do not block
        if self.get_lock.locked():
            self.get_lock.release()
        back = self.res
        return back

    def __del__(self):
        self.res = None
        while self.get_lock.locked():
            self.get_lock.release()


# Execute function calls in main loop from other threads
class MainLoopCaller:
    """Dispatcher that runs queued calls as greenlets in the gevent loop."""

    def __init__(self):
        self.queue_call = queue.Queue()

        # Single worker thread used to wait on the queue without blocking gevent
        self.pool = gevent.threadpool.ThreadPool(1)
        self.num_direct = 0
        self.running = True

    def caller(self, func, args, kwargs, event_done):
        """Run one queued call and hand ``(success, value)`` to ``event_done``.

        Only ``Exception`` subclasses are captured and passed back to the
        waiting thread.
        """
        try:
            res = func(*args, **kwargs)
            event_done.set((True, res))
        except Exception as err:
            event_done.set((False, err))

    def start(self):
        """Spawn the dispatcher greenlet, then sleep for a millisecond."""
        gevent.spawn(self.run)
        time.sleep(0.001)

    def run(self):
        """Take calls off the queue and spawn a greenlet for each one."""
        while self.running:
            if self.queue_call.qsize() == 0:  # Get queue in new thread to avoid gevent blocking
                func, args, kwargs, event_done = self.pool.apply(self.queue_call.get)
            else:
                func, args, kwargs, event_done = self.queue_call.get()
            gevent.spawn(self.caller, func, args, kwargs, event_done)
            # Drop the references so this loop does not keep them alive
            del func, args, kwargs, event_done
        self.running = False

    def call(self, func, *args, **kwargs):
        """Execute ``func(*args, **kwargs)`` in the main loop and return its result.

        From the main thread the function is simply called.  From any other
        thread the call is queued and the calling thread blocks until the
        dispatcher has run it.

        Raises:
            Exception: whatever exception ``func`` raised in the main loop.
        """
        if isMainThread():
            return func(*args, **kwargs)
        else:
            event_done = Event()
            self.queue_call.put((func, args, kwargs, event_done))
            success, res = event_done.get()
            del event_done
            self.queue_call.task_done()
            if success:
                return res
            else:
                raise res


def patchSleep():  # Fix memory leak by using real sleep in threads
    """Replace ``time.sleep`` with a version that is thread aware.

    In the main thread the replacement calls ``gevent.sleep``; in every other
    thread it calls the original ``time.sleep`` obtained from
    ``gevent.monkey``.
    """
    real_sleep = gevent.monkey.get_original("time", "sleep")

    def patched_sleep(seconds):
        if isMainThread():
            gevent.sleep(seconds)
        else:
            real_sleep(seconds)
    time.sleep = patched_sleep


main_loop = MainLoopCaller()
main_loop.start()
patchSleep()