Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
at main 163 lines 4.7 kB view raw
import time
import threading

import gevent
import pytest

from util import ThreadPool


class TestThreadPool:
    """Timing- and thread-identity-based tests for the helpers in ``util.ThreadPool``.

    Covers ``ThreadPool.ThreadPool`` (used as a context manager), ``ThreadPool.Lock``,
    ``ThreadPool.Event`` and ``ThreadPool.main_loop``. Several tests assert on
    wall-clock durations, so they sleep for real and are sensitive to machine load.
    """

    def testExecutionOrder(self):
        """Run a pool-wrapped busy loop from three greenlets and check event ordering.

        Each call records "S" (start), "M" (at iteration 3,000,000) and "D" (done).
        The test expects all three "S" before any "M", and all three "M" before any
        "D", i.e. the three calls progress side by side rather than one after another.
        """
        with ThreadPool.ThreadPool(4) as pool:
            events = []

            @pool.wrap
            def blocker():
                events.append("S")
                out = 0
                for i in range(10000000):
                    if i == 3000000:
                        events.append("M")
                    out += 1
                events.append("D")
                return out

            threads = [gevent.spawn(blocker) for _ in range(3)]
            gevent.joinall(threads)

            assert events == ["S"] * 3 + ["M"] * 3 + ["D"] * 3

            # A direct call of the wrapped function must still hand back its result.
            res = blocker()
            assert res == 10000000

    def testLockBlockingSameThread(self):
        """Second acquire on a held lock blocks until a greenlet releases it.

        The lock is taken once, then acquired again with ``timeout=2`` while a
        spawned greenlet sleeps 1 second and releases it. The elapsed time is
        expected to be between 1.0 and 1.5 seconds.
        """
        lock = ThreadPool.Lock()

        s = time.time()

        def unlocker():
            time.sleep(1)
            lock.release()

        gevent.spawn(unlocker)
        lock.acquire(True)
        lock.acquire(True, timeout=2)

        unlock_taken = time.time() - s

        assert 1.0 < unlock_taken < 1.5

    def testLockBlockingDifferentThread(self):
        """Lock contention between pool workers, a greenlet and the test itself.

        Four ``locker`` callables (three via ``pool.spawn``, one via
        ``gevent.spawn``) each hold the lock for 0.5 seconds. After a 0.1 second
        head start the test acquires the lock itself and expects to wait roughly
        4 * 0.5 - 0.1 = 1.9 seconds (asserted as 1.8 < t < 2.2).
        """
        lock = ThreadPool.Lock()

        def locker():
            lock.acquire(True)
            time.sleep(0.5)
            lock.release()

        with ThreadPool.ThreadPool(10) as pool:
            threads = [
                pool.spawn(locker),
                pool.spawn(locker),
                gevent.spawn(locker),
                pool.spawn(locker)
            ]
            time.sleep(0.1)

            s = time.time()

            # NOTE(review): the second positional argument is presumably the
            # timeout (cf. ``timeout=2`` in testLockBlockingSameThread) - confirm
            # against ThreadPool.Lock.acquire.
            lock.acquire(True, 5.0)

            unlock_taken = time.time() - s

            assert 1.8 < unlock_taken < 2.2

            gevent.joinall(threads)

    def testMainLoopCallerThreadId(self):
        """``main_loop.call`` runs the callable on the thread that runs this test.

        A function spawned on the pool must report a thread ident different from
        the test's own; the same function routed through
        ``ThreadPool.main_loop.call`` from a pool worker must report the test's
        thread ident.
        """
        main_thread_id = threading.current_thread().ident
        with ThreadPool.ThreadPool(5) as pool:
            def getThreadId(*args, **kwargs):
                return threading.current_thread().ident

            t = pool.spawn(getThreadId)
            assert t.get() != main_thread_id

            t = pool.spawn(lambda: ThreadPool.main_loop.call(getThreadId))
            assert t.get() == main_thread_id

    def testMainLoopCallerGeventSpawn(self):
        """Spawn a greenlet through ``main_loop.call`` from inside a pool worker.

        Calling ``.get()`` on the returned greenlet directly from the worker is
        expected to raise "cannot switch to a different thread"; fetching the
        result through ``main_loop.call`` is expected to succeed, report the
        test's thread ident, and take about as long as the 1 second sleep in
        ``waiter``.
        """
        main_thread_id = threading.current_thread().ident
        with ThreadPool.ThreadPool(5) as pool:
            def waiter():
                time.sleep(1)
                return threading.current_thread().ident

            def geventSpawner():
                event = ThreadPool.main_loop.call(gevent.spawn, waiter)

                with pytest.raises(Exception) as greenlet_err:
                    event.get()
                # Checked after the ``with`` block: ``event.get()`` raises, so a
                # statement following it inside the block would never execute.
                assert str(greenlet_err.value) == "cannot switch to a different thread"

                waiter_thread_id = ThreadPool.main_loop.call(event.get)
                return waiter_thread_id

            s = time.time()
            waiter_thread_id = pool.apply(geventSpawner)
            assert main_thread_id == waiter_thread_id
            time_taken = time.time() - s
            assert 0.9 < time_taken < 1.2

    def testEvent(self):
        """``ThreadPool.Event`` delivers one value to every kind of waiter.

        A pool worker sets the event to "done!" after 1 second. The test itself,
        a greenlet and another pool worker all call ``event.get()`` and must all
        receive "done!"; the test's own wait must last about 1 second. Setting
        the event a second time must raise with "Event already has value" in the
        message.
        """
        with ThreadPool.ThreadPool(5) as pool:
            event = ThreadPool.Event()

            def setter():
                time.sleep(1)
                event.set("done!")

            def getter():
                return event.get()

            pool.spawn(setter)
            t_gevent = gevent.spawn(getter)
            t_pool = pool.spawn(getter)
            s = time.time()
            assert event.get() == "done!"
            time_taken = time.time() - s
            gevent.joinall([t_gevent, t_pool])

            assert t_gevent.get() == "done!"
            assert t_pool.get() == "done!"

            assert 0.9 < time_taken < 1.2

            with pytest.raises(Exception) as err:
                event.set("another result")

            assert "Event already has value" in str(err.value)

    def testMemoryLeak(self):
        """Repeatedly creating and closing pools must leave no threadpool objects.

        Objects are matched by the substring "threadpool" in ``str(type(obj))``.
        Those already tracked by the garbage collector before the test are
        excluded by ``id``; anything matching afterwards counts as a leak.
        """
        import gc

        # A set keeps the per-object "seen before?" check O(1); gc.get_objects()
        # can return a very large list.
        thread_objs_before = {
            id(obj) for obj in gc.get_objects() if "threadpool" in str(type(obj))
        }

        def worker():
            time.sleep(0.1)
            return "ok"

        def poolTest():
            with ThreadPool.ThreadPool(5) as pool:
                for _ in range(20):
                    pool.spawn(worker)

        for _ in range(5):
            poolTest()
            new_thread_objs = [
                obj for obj in gc.get_objects()
                if "threadpool" in str(type(obj)) and id(obj) not in thread_objs_before
            ]

        # Make sure no threadpool object left behind
        assert not new_thread_objs