Forking what is left of ZeroNet and hopefully adding an AT Proto Frontend/Proxy
at main 167 lines 5.1 kB view raw
"""Tests for the ``util.Noparallel`` decorator under gevent and thread-pool spawning."""
import time

import gevent
import pytest

import util
from util import ThreadPool


@pytest.fixture(params=['gevent.spawn', 'thread_pool.spawn'])
def queue_spawn(request):
    """Provide the spawn callable each test uses to start concurrent calls.

    Parametrized so every test using it runs twice: once with
    ``gevent.spawn`` and once with the ``spawn`` method of a 10-worker
    ``ThreadPool``.  The pool is only created for the thread-pool variant
    and is released through its context manager when the test finishes,
    so worker threads do not leak from one test into the next.
    """
    if request.param == "gevent.spawn":
        yield gevent.spawn
    else:
        with ThreadPool.ThreadPool(10) as thread_pool:
            yield thread_pool.spawn


class ExampleClass(object):
    """Counter whose methods are wrapped by the ``util.Noparallel`` variants under test."""

    def __init__(self):
        # Total number of increments performed by all count* calls on this object
        self.counted = 0

    @util.Noparallel()
    def countBlocking(self, num=5):
        """Increment ``counted`` ``num`` times, sleeping 0.1s before each step."""
        for i in range(1, num + 1):
            time.sleep(0.1)
            self.counted += 1
        return "counted:%s" % i

    @util.Noparallel(queue=True, ignore_class=True)
    def countQueue(self, num=5):
        """Increment ``counted`` ``num`` times, sleeping 0.1s before each step."""
        for i in range(1, num + 1):
            time.sleep(0.1)
            self.counted += 1
        return "counted:%s" % i

    @util.Noparallel(blocking=False)
    def countNoblocking(self, num=5):
        """Increment ``counted`` ``num`` times, sleeping 0.01s before each step."""
        for i in range(1, num + 1):
            time.sleep(0.01)
            self.counted += 1
        return "counted:%s" % i


class TestNoparallel:
    """Behavioral tests for the blocking, non-blocking and queued ``Noparallel`` modes."""

    def testBlocking(self, queue_spawn):
        """Concurrent calls on the same object share one run and one result."""
        obj1 = ExampleClass()
        obj2 = ExampleClass()

        # Don't allow calling again while it is running; wait until it finishes
        threads = [
            queue_spawn(obj1.countBlocking),
            queue_spawn(obj1.countBlocking),
            queue_spawn(obj1.countBlocking),
            queue_spawn(obj2.countBlocking)
        ]
        # The call is ignored as obj2.countBlocking is already counting, but it blocks until that finishes
        assert obj2.countBlocking() == "counted:5"
        gevent.joinall(threads)
        assert [thread.value for thread in threads] == ["counted:5", "counted:5", "counted:5", "counted:5"]
        obj2.countBlocking()  # Allowed to run again as obj2.countBlocking has finished

        assert obj1.counted == 5
        assert obj2.counted == 10

    def testNoblocking(self):
        """A non-blocking call returns immediately; a duplicate call is ignored."""
        obj1 = ExampleClass()

        thread1 = obj1.countNoblocking()
        thread2 = obj1.countNoblocking()  # Ignored

        # Nothing has been counted yet: the calls returned without waiting
        assert obj1.counted == 0
        time.sleep(0.1)
        assert thread1.value == "counted:5"
        assert thread2.value == "counted:5"
        assert obj1.counted == 5

        obj1.countNoblocking().join()  # Allowed again; wait until it finishes
        assert obj1.counted == 10

    def testQueue(self, queue_spawn):
        """With ``queue=True`` at most one extra call is queued behind the running one."""
        obj1 = ExampleClass()

        queue_spawn(obj1.countQueue, num=1)
        queue_spawn(obj1.countQueue, num=1)
        queue_spawn(obj1.countQueue, num=1)

        time.sleep(0.3)
        assert obj1.counted == 2  # No multi-queue supported

        obj2 = ExampleClass()
        queue_spawn(obj2.countQueue, num=10)
        queue_spawn(obj2.countQueue, num=10)

        time.sleep(1.5)  # Call 1 finished, call 2 still working
        assert 10 < obj2.counted < 20

        queue_spawn(obj2.countQueue, num=10)
        time.sleep(2.0)

        assert obj2.counted == 30

    def testQueueOverload(self):
        """Even 1000 simultaneous calls result in only two actual runs."""
        obj1 = ExampleClass()

        threads = []
        for i in range(1000):
            thread = gevent.spawn(obj1.countQueue, num=5)
            threads.append(thread)

        gevent.joinall(threads)
        assert obj1.counted == 5 * 2  # Only called twice (no multi-queue allowed)

    def testIgnoreClass(self, queue_spawn):
        """With ``ignore_class=True`` calls on different objects share one queue."""
        obj1 = ExampleClass()
        obj2 = ExampleClass()

        threads = [
            queue_spawn(obj1.countQueue),
            queue_spawn(obj1.countQueue),
            queue_spawn(obj1.countQueue),
            queue_spawn(obj2.countQueue),
            queue_spawn(obj2.countQueue)
        ]
        s = time.time()
        time.sleep(0.001)
        gevent.joinall(threads)

        # Queue limited to 2 calls (every call counts to 5 with a 0.1s sleep per step, i.e. ~0.5s)
        assert obj1.counted + obj2.counted == 10

        taken = time.time() - s
        assert 1.2 > taken >= 1.0  # 2 * 0.5s count = ~1s

    def testException(self, queue_spawn):
        """Exceptions raised inside the wrapped function reach the caller unchanged."""
        class MyException(Exception):
            pass

        @util.Noparallel()
        def raiseException():
            raise MyException("Test error!")

        # Direct call
        with pytest.raises(MyException) as err:
            raiseException()
        assert str(err.value) == "Test error!"

        # Call through the spawner: the error surfaces when the result is fetched
        with pytest.raises(MyException) as err:
            queue_spawn(raiseException).get()
        assert str(err.value) == "Test error!"

    def testMultithreadMix(self, queue_spawn):
        """Calls started from greenlets and pool threads together still share one run."""
        obj1 = ExampleClass()
        with ThreadPool.ThreadPool(10) as thread_pool:
            s = time.time()
            t1 = queue_spawn(obj1.countBlocking, 5)
            time.sleep(0.01)
            t2 = thread_pool.spawn(obj1.countBlocking, 5)
            time.sleep(0.01)
            t3 = thread_pool.spawn(obj1.countBlocking, 5)
            time.sleep(0.3)
            t4 = gevent.spawn(obj1.countBlocking, 5)
            threads = [t1, t2, t3, t4]
            for thread in threads:
                assert thread.get() == "counted:5"

            time_taken = time.time() - s
            # A single run of 5 steps (~0.5s) served all four callers
            assert obj1.counted == 5
            assert 0.5 < time_taken < 0.7