# Fork of what is left of ZeroNet, with the hope of adding an AT Proto frontend/proxy.
import functools
import queue
import threading
import time

import gevent
import gevent.monkey
import gevent.threadpool
import gevent._threading
9
10
class ThreadPool:
    """Optional wrapper around ``gevent.threadpool.ThreadPool``.

    When ``max_size`` is 0 or less no gevent pool is created and
    ``self.pool`` is ``None``; ``wrap`` then leaves functions untouched.
    ``spawn`` and ``apply`` need a live pool.

    The instance is a context manager: leaving the ``with`` block calls
    ``kill()``.
    """

    def __init__(self, max_size, name=None):
        """Create the pool.

        Args:
            max_size: Passed to ``setMaxSize``.
            name: Label stored on ``self.name``; when falsy a name of the
                form ``"ThreadPool#<id(self)>"`` is generated.
        """
        self.setMaxSize(max_size)
        if name:
            self.name = name
        else:
            self.name = "ThreadPool#%s" % id(self)

    def setMaxSize(self, max_size):
        """Store ``max_size`` and (re)create ``self.pool`` accordingly.

        A positive size builds a new ``gevent.threadpool.ThreadPool``;
        anything else sets ``self.pool`` to ``None``.
        """
        # NOTE(review): a previously created pool is replaced here without
        # being killed - confirm whether callers rely on that.
        self.max_size = max_size
        if max_size > 0:
            self.pool = gevent.threadpool.ThreadPool(max_size)
        else:
            self.pool = None

    def wrap(self, func):
        """Return a version of ``func`` that runs through this pool.

        If there is no pool at wrap time, ``func`` itself is returned.
        Otherwise the returned wrapper (which keeps ``func``'s name and
        docstring) behaves as follows on each call:

        * pool removed meanwhile (``kill()`` / ``setMaxSize(0)``) or called
          from a non-main thread: ``func`` is called directly;
        * called from the main thread with a live pool: the call goes
          through ``self.apply`` and its result is returned.
        """
        if self.pool is None:
            return func

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Re-check the pool on every call: it may have been removed after
            # wrapping, in which case self.apply() would fail on None.
            # The pool check comes first so isMainThread() is only consulted
            # when a pool actually exists.
            if self.pool is None or not isMainThread():
                return func(*args, **kwargs)
            return self.apply(func, args, kwargs)

        return wrapper

    def spawn(self, *args, **kwargs):
        """Schedule a call on the pool and return ``self.pool.spawn``'s result.

        Requires ``self.pool`` to be set.
        """
        if not isMainThread() and not self.pool._semaphore.ready():
            # Avoid semaphore error when spawning from other thread and the pool is full
            return main_loop.call(self.spawn, *args, **kwargs)
        return self.pool.spawn(*args, **kwargs)

    def apply(self, func, args=(), kwargs=None):
        """Run ``func(*args, **kwargs)`` via ``spawn`` and return its result.

        Args:
            func: Callable to execute.
            args: Positional arguments for ``func``.
            kwargs: Keyword arguments for ``func``; ``None`` means none.

        Requires ``self.pool`` to be set.
        """
        if kwargs is None:
            kwargs = {}
        t = self.spawn(func, *args, **kwargs)
        if self.pool._apply_immediately():
            # Fetch the result through the main loop caller in this case.
            return main_loop.call(t.get)
        else:
            return t.get()

    def kill(self):
        """Kill the underlying pool, if any, and set ``self.pool`` to ``None``.

        When a pool with ``size > 0`` exists, its ``kill`` is spawned as a
        greenlet through ``main_loop.call`` and joined with a 1 second
        timeout. Calling ``kill`` without a pool is a no-op apart from
        leaving ``self.pool`` as ``None``.
        """
        pool = self.pool
        if pool is not None and pool.size > 0 and main_loop:
            main_loop.call(lambda: gevent.spawn(pool.kill).join(timeout=1))

        self.pool = None

    def __enter__(self):
        """Return the pool itself for use in a ``with`` statement."""
        return self

    def __exit__(self, *args):
        """Kill the pool on leaving the ``with`` block."""
        self.kill()
65
# Shared gevent thread pool (50 threads) that Lock.acquire() uses to run a
# blocking acquire off the main thread.
lock_pool = gevent.threadpool.ThreadPool(50)
# Ident of the thread executing this module-level statement; isMainThread()
# and MainLoopCaller.call() compare the current thread's ident against it.
main_thread_id = threading.current_thread().ident
68
69
def isMainThread():
    """Return True when the calling thread's ident equals ``main_thread_id``."""
    current_ident = threading.current_thread().ident
    return current_ident == main_thread_id
72
73
class Lock:
    """Lock built on ``gevent._threading.Lock``.

    Attributes set in ``__init__``:
        lock: the underlying ``gevent._threading.Lock``.
        locked / release: bound directly to the underlying lock's methods.
        time_lock: ``time.time()`` of the latest ``acquire`` call (0 until
            the first call); it is recorded before the acquisition itself.
    """

    def __init__(self):
        inner = gevent._threading.Lock()
        self.lock = inner
        self.locked = inner.locked
        self.release = inner.release
        self.time_lock = 0

    def acquire(self, *args, **kwargs):
        """Acquire the underlying lock, forwarding all arguments to it.

        If the lock is already held and the caller is the main thread, the
        acquire call is run through ``lock_pool`` instead of directly, to
        avoid blocking the gevent loop. Returns whatever the underlying
        ``acquire`` returns.
        """
        self.time_lock = time.time()
        # Order matters: isMainThread() is only consulted when the lock is held.
        offload = self.locked() and isMainThread()
        if not offload:
            return self.lock.acquire(*args, **kwargs)
        # Start in new thread to avoid blocking gevent loop
        return lock_pool.apply(self.lock.acquire, args, kwargs)

    def __del__(self):
        """Release the lock for as long as it reports being held."""
        while self.locked():
            self.release()
92
93
class Event:
    """One-shot value holder: ``set`` stores a value once, ``get`` returns it.

    ``get_lock`` is acquired (non-blocking) at construction and released by
    ``set``; ``get`` acquires it in blocking mode while no value has been
    stored yet.
    """

    def __init__(self):
        self.res = None
        self.done = False
        self.get_lock = Lock()
        self.get_lock.acquire(False)

    def set(self, res):
        """Store ``res`` and release ``get_lock``.

        Raises:
            Exception: if a value has already been set.
        """
        if self.done:
            raise Exception("Event already has value")
        self.res = res
        # Release before flagging done, same order as the statements below.
        self.get_lock.release()
        self.done = True

    def get(self):
        """Return the stored value, acquiring ``get_lock`` first if not done."""
        if not self.done:
            self.get_lock.acquire(True)
        # Leave the lock released so further get() calls do not block on it.
        if self.get_lock.locked():
            self.get_lock.release()
        return self.res

    def __del__(self):
        """Drop the stored value and release ``get_lock`` while it is held."""
        self.res = None
        while self.get_lock.locked():
            self.get_lock.release()
120
121
# Execute function calls in main loop from other threads
class MainLoopCaller:
    """Queue of calls submitted by other threads and run by ``run()``.

    ``call`` puts ``(func, args, kwargs, event)`` tuples on ``queue_call``;
    ``run`` takes them off and spawns ``caller`` in a greenlet for each one,
    which reports the outcome through the event.
    """

    def __init__(self):
        self.queue_call = queue.Queue()

        # Single-thread pool used by run() to wait on an empty queue.
        self.pool = gevent.threadpool.ThreadPool(1)
        self.num_direct = 0
        self.running = True

    def caller(self, func, args, kwargs, event_done):
        """Call ``func`` and report ``(True, result)`` or ``(False, error)``.

        Only ``Exception`` subclasses are caught and forwarded.
        """
        try:
            event_done.set((True, func(*args, **kwargs)))
        except Exception as err:
            event_done.set((False, err))

    def start(self):
        """Spawn ``run`` as a greenlet, then sleep for 1 ms."""
        gevent.spawn(self.run)
        time.sleep(0.001)

    def run(self):
        """Dispatch queued calls for as long as ``self.running`` is true."""
        fetch = self.queue_call.get
        while self.running:
            if self.queue_call.qsize() == 0:
                # Get queue in new thread to avoid gevent blocking
                item = self.pool.apply(fetch)
            else:
                item = fetch()
            func, args, kwargs, event_done = item
            gevent.spawn(self.caller, func, args, kwargs, event_done)
            # Drop the local references before waiting for the next item.
            del item, func, args, kwargs, event_done
        self.running = False

    def call(self, func, *args, **kwargs):
        """Run ``func(*args, **kwargs)`` and return its result.

        From the main thread the function is called directly. From any
        other thread the call is queued, the caller waits for the outcome
        and either returns the result or re-raises the forwarded exception.
        """
        if threading.current_thread().ident == main_thread_id:
            return func(*args, **kwargs)

        event_done = Event()
        self.queue_call.put((func, args, kwargs, event_done))
        success, res = event_done.get()
        del event_done
        self.queue_call.task_done()
        if not success:
            raise res
        return res
165
166
def patchSleep():  # Fix memory leak by using real sleep in threads
    """Replace ``time.sleep`` with a thread-aware version.

    The replacement calls ``gevent.sleep`` on the main thread and the
    function returned by ``gevent.monkey.get_original("time", "sleep")``
    on every other thread.
    """
    real_sleep = gevent.monkey.get_original("time", "sleep")

    def patched_sleep(seconds):
        if not isMainThread():
            real_sleep(seconds)
        else:
            gevent.sleep(seconds)

    time.sleep = patched_sleep
176
177
# Module-level MainLoopCaller instance; ThreadPool.spawn/apply/kill route
# calls through it.
main_loop = MainLoopCaller()
main_loop.start()
# time.sleep is replaced only after start(), so the time.sleep call inside
# start() still runs the function that was in place before patching.
patchSleep()