# Fork of what remains of ZeroNet, with the hope of adding an AT Proto frontend/proxy.
1import time
2
3import gevent
4import pytest
5
6import util
7from util import ThreadPool
8
9
@pytest.fixture(params=['gevent.spawn', 'thread_pool.spawn'])
def queue_spawn(request):
    """Provide the spawn callable a test uses to start concurrent calls.

    The fixture is parametrized, so every test requesting it runs once with
    ``gevent.spawn`` and once with the ``spawn`` method of a
    ``ThreadPool.ThreadPool(10)`` instance.
    """
    if request.param == "gevent.spawn":
        # This variant never touches a thread pool, so none is created.
        yield gevent.spawn
    else:
        # Create the pool only when it is actually used, and leave its
        # context at fixture teardown so it is not left open after the test.
        with ThreadPool.ThreadPool(10) as thread_pool:
            yield thread_pool.spawn
17
18
class ExampleClass(object):
    """Simple counter whose methods are wrapped with different ``util.Noparallel`` options."""

    def __init__(self):
        # Total number of increments performed by every count* call so far
        self.counted = 0

    def _count(self, num, delay):
        """Increment ``self.counted`` ``num`` times, sleeping ``delay`` seconds before each step.

        Returns the string ``"counted:<i>"`` where ``<i>`` is the number of the
        last step performed (0 when ``num`` is less than 1).
        """
        i = 0  # Keeps the return value defined when the loop body never runs
        for i in range(1, num + 1):
            time.sleep(delay)
            self.counted += 1
        return "counted:%s" % i

    @util.Noparallel()
    def countBlocking(self, num=5):
        """Count to ``num`` with 0.1s per step; wrapped with the default ``Noparallel()`` options."""
        return self._count(num, 0.1)

    @util.Noparallel(queue=True, ignore_class=True)
    def countQueue(self, num=5):
        """Count to ``num`` with 0.1s per step; wrapped with ``queue=True, ignore_class=True``."""
        return self._count(num, 0.1)

    @util.Noparallel(blocking=False)
    def countNoblocking(self, num=5):
        """Count to ``num`` with 0.01s per step; wrapped with ``blocking=False``."""
        return self._count(num, 0.01)
43
44
class TestNoparallel:
    """Tests for the ``util.Noparallel`` decorator, exercised through ``ExampleClass``."""

    def testBlocking(self, queue_spawn):
        first = ExampleClass()
        second = ExampleClass()

        # While a call is running, repeated calls are not started again; they wait for the running one
        threads = [queue_spawn(first.countBlocking) for _ in range(3)]
        threads.append(queue_spawn(second.countBlocking))
        # This call is ignored because second.countBlocking is already counting, but it blocks until that finishes
        assert second.countBlocking() == "counted:5"
        gevent.joinall(threads)
        assert [thread.value for thread in threads] == ["counted:5"] * 4
        second.countBlocking()  # Allowed to run again because the earlier call has finished

        assert first.counted == 5
        assert second.counted == 10

    def testNoblocking(self):
        counter = ExampleClass()

        started = counter.countNoblocking()
        duplicate = counter.countNoblocking()  # Ignored

        # Nothing has been counted yet at this point
        assert counter.counted == 0
        time.sleep(0.1)
        assert started.value == "counted:5"
        assert duplicate.value == "counted:5"
        assert counter.counted == 5

        # Calling is allowed again; wait until this run finishes
        rerun = counter.countNoblocking()
        rerun.join()
        assert counter.counted == 10

    def testQueue(self, queue_spawn):
        short_counter = ExampleClass()

        for _ in range(3):
            queue_spawn(short_counter.countQueue, num=1)

        time.sleep(0.3)
        assert short_counter.counted == 2  # No multi-queue supported

        long_counter = ExampleClass()
        for _ in range(2):
            queue_spawn(long_counter.countQueue, num=10)

        time.sleep(1.5)  # Call 1 finished, call 2 still working
        assert 10 < long_counter.counted < 20

        queue_spawn(long_counter.countQueue, num=10)
        time.sleep(2.0)

        assert long_counter.counted == 30

    def testQueueOverload(self):
        counter = ExampleClass()

        threads = [gevent.spawn(counter.countQueue, num=5) for _ in range(1000)]

        gevent.joinall(threads)
        assert counter.counted == 5 * 2  # Only called twice (no multi-queue allowed)

    def testIgnoreClass(self, queue_spawn):
        first = ExampleClass()
        second = ExampleClass()

        threads = [queue_spawn(first.countQueue) for _ in range(3)]
        threads += [queue_spawn(second.countQueue) for _ in range(2)]
        started = time.time()
        time.sleep(0.001)
        gevent.joinall(threads)

        # Queue limited to 2 calls (each call counts to 5 and takes 0.5 sec)
        assert first.counted + second.counted == 10

        elapsed = time.time() - started
        assert 1.0 <= elapsed < 1.2  # 2 * 0.5s count = ~1s

    def testException(self, queue_spawn):
        class MyException(Exception):
            pass

        @util.Noparallel()
        def raiseException():
            raise MyException("Test error!")

        # The exception must surface from a direct call...
        with pytest.raises(MyException) as direct_err:
            raiseException()
        assert str(direct_err.value) == "Test error!"

        # ...and from a spawned call when its result is fetched
        with pytest.raises(MyException) as spawned_err:
            queue_spawn(raiseException).get()
        assert str(spawned_err.value) == "Test error!"

    def testMultithreadMix(self, queue_spawn):
        counter = ExampleClass()
        with ThreadPool.ThreadPool(10) as thread_pool:
            started = time.time()
            # Same method started through the fixture's spawner, the local pool and gevent
            threads = [queue_spawn(counter.countBlocking, 5)]
            time.sleep(0.01)
            threads.append(thread_pool.spawn(counter.countBlocking, 5))
            time.sleep(0.01)
            threads.append(thread_pool.spawn(counter.countBlocking, 5))
            time.sleep(0.3)
            threads.append(gevent.spawn(counter.countBlocking, 5))
            for thread in threads:
                assert thread.get() == "counted:5"

            elapsed = time.time() - started
            assert counter.counted == 5
            assert 0.5 < elapsed < 0.7