# Forking what is left of ZeroNet and, hopefully, adding an AT Proto frontend/proxy.
import threading
import time

import gevent
import pytest

from util import ThreadPool


class TestThreadPool:
    """Tests for the ``ThreadPool`` module: ThreadPool, Lock, Event and main_loop.

    Several tests assert on elapsed wall-clock time, so they are sensitive to
    machine load.

    NOTE(review): the tests call ``time.sleep`` inside gevent greenlets, which
    presumably relies on gevent monkey-patching being enabled by the test
    setup -- confirm.
    """

    def testExecutionOrder(self):
        """Check that three wrapped calls interleave and return the loop count.

        Each call records "S" when it starts, "M" at iteration 3,000,000 and
        "D" when it finishes. The expected event list requires every call to
        have started before any of them reaches the midpoint, and every call
        to have reached the midpoint before any of them finishes.
        """
        with ThreadPool.ThreadPool(4) as pool:
            events = []

            @pool.wrap
            def blocker():
                events.append("S")  # Start marker
                out = 0
                for i in range(10000000):
                    if i == 3000000:
                        events.append("M")  # Midpoint marker
                    out += 1
                events.append("D")  # Done marker
                return out

            threads = [gevent.spawn(blocker) for _ in range(3)]
            gevent.joinall(threads)

            assert events == ["S"] * 3 + ["M"] * 3 + ["D"] * 3

            # A direct call to the wrapped function hands back its return value
            res = blocker()
            assert res == 10000000

    def testLockBlockingSameThread(self):
        """Check that a second acquire waits for a release done by a greenlet.

        ``unlocker`` releases the lock after sleeping 1 second, so the second
        ``acquire`` is expected to return after about 1 second, well before
        its 2 second timeout.
        """
        lock = ThreadPool.Lock()

        s = time.time()

        def unlocker():
            time.sleep(1)
            lock.release()

        gevent.spawn(unlocker)
        lock.acquire(True)  # First acquire: lock is free
        lock.acquire(True, timeout=2)  # Second acquire: has to wait for unlocker

        unlock_taken = time.time() - s

        assert 1.0 < unlock_taken < 1.5

    def testLockBlockingDifferentThread(self):
        """Check that one lock serializes lockers started from the pool and gevent.

        Four lockers (three started with ``pool.spawn``, one with
        ``gevent.spawn``) each hold the lock for 0.5 seconds. The final
        acquire starts 0.1 seconds after they were spawned and is expected to
        succeed after roughly 1.9 seconds, i.e. once all four have released.
        """
        lock = ThreadPool.Lock()

        def locker():
            lock.acquire(True)
            time.sleep(0.5)
            lock.release()

        with ThreadPool.ThreadPool(10) as pool:
            threads = [
                pool.spawn(locker),
                pool.spawn(locker),
                gevent.spawn(locker),
                pool.spawn(locker)
            ]
            # Give the lockers a head start before measuring
            time.sleep(0.1)

            s = time.time()

            # NOTE(review): 5.0 is presumably the timeout (second positional
            # argument of Lock.acquire) -- confirm against ThreadPool.Lock
            lock.acquire(True, 5.0)

            unlock_taken = time.time() - s

            assert 1.8 < unlock_taken < 2.2

            gevent.joinall(threads)

    def testMainLoopCallerThreadId(self):
        """Check which thread executes pool tasks and ``main_loop.call`` callbacks.

        A function started with ``pool.spawn`` must report a thread id that
        differs from the test's own thread, while the same function routed
        through ``ThreadPool.main_loop.call`` from inside a pool task must
        report the test's own thread id.
        """
        main_thread_id = threading.current_thread().ident
        with ThreadPool.ThreadPool(5) as pool:
            def getThreadId(*args, **kwargs):
                return threading.current_thread().ident

            t = pool.spawn(getThreadId)
            assert t.get() != main_thread_id

            t = pool.spawn(lambda: ThreadPool.main_loop.call(getThreadId))
            assert t.get() == main_thread_id

    def testMainLoopCallerGeventSpawn(self):
        """Check spawning a greenlet through ``main_loop.call`` from a pool task.

        The pool task asks ``main_loop`` to run ``gevent.spawn(waiter)``.
        Calling ``get()`` on the returned object directly from the pool task
        must raise "cannot switch to a different thread"; calling it through
        ``main_loop.call`` must return the thread id seen by ``waiter``, which
        has to equal the test's own thread id. The whole round trip is
        expected to take about as long as ``waiter``'s 1 second sleep.
        """
        main_thread_id = threading.current_thread().ident
        with ThreadPool.ThreadPool(5) as pool:
            def waiter():
                time.sleep(1)
                return threading.current_thread().ident

            def geventSpawner():
                event = ThreadPool.main_loop.call(gevent.spawn, waiter)

                # Reading the result straight from the pool task has to fail
                with pytest.raises(Exception) as greenlet_err:
                    event.get()
                assert str(greenlet_err.value) == "cannot switch to a different thread"

                # Reading it via main_loop works
                waiter_thread_id = ThreadPool.main_loop.call(event.get)
                return waiter_thread_id

            s = time.time()
            waiter_thread_id = pool.apply(geventSpawner)
            assert main_thread_id == waiter_thread_id
            time_taken = time.time() - s
            assert 0.9 < time_taken < 1.2

    def testEvent(self):
        """Check that ``ThreadPool.Event`` hands one value to several waiters, once.

        A pool task sets the event to "done!" after 1 second. A gevent
        greenlet, a pool task and the test itself all call ``get()`` and must
        receive that value; the test's own wait is expected to last about
        1 second. Setting the event a second time must raise an exception
        whose message contains "Event already has value".
        """
        with ThreadPool.ThreadPool(5) as pool:
            event = ThreadPool.Event()

            def setter():
                time.sleep(1)
                event.set("done!")

            def getter():
                return event.get()

            pool.spawn(setter)
            t_gevent = gevent.spawn(getter)
            t_pool = pool.spawn(getter)
            s = time.time()
            assert event.get() == "done!"
            time_taken = time.time() - s
            gevent.joinall([t_gevent, t_pool])

            assert t_gevent.get() == "done!"
            assert t_pool.get() == "done!"

            assert 0.9 < time_taken < 1.2

            with pytest.raises(Exception) as err:
                event.set("another result")

            assert "Event already has value" in str(err.value)

    def testMemoryLeak(self):
        """Check that repeatedly creating and closing pools leaves no objects behind.

        Objects whose type name contains "threadpool" are looked up through
        ``gc.get_objects()`` before and after running five pools of 20 short
        tasks each; no object that was not already present before may remain.
        """
        import gc

        # Ids are kept in a set: the membership test below runs once for every
        # object the garbage collector tracks
        thread_obj_ids_before = {
            id(obj) for obj in gc.get_objects() if "threadpool" in str(type(obj))
        }

        def worker():
            time.sleep(0.1)
            return "ok"

        def poolTest():
            with ThreadPool.ThreadPool(5) as pool:
                for _ in range(20):
                    pool.spawn(worker)

        for _ in range(5):
            poolTest()
            new_thread_objs = [
                obj for obj in gc.get_objects()
                if "threadpool" in str(type(obj)) and id(obj) not in thread_obj_ids_before
            ]

        # Make sure no threadpool object left behind
        assert not new_thread_objs