// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package queue

import (
	"bytes"
	"runtime"
	"strconv"
	"sync"
	"testing"
	"time"

	"forgejo.org/modules/setting"
	"forgejo.org/modules/test"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

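// runWorkerPoolQueue starts the queue's Run loop in a background goroutine and returns
// a stop function that shuts the queue down, waiting up to one second.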
func runWorkerPoolQueue[T any](q *WorkerPoolQueue[T]) func() {
	go q.Run()
	return func() {
		q.ShutdownWait(1 * time.Second)
	}
}

func TestWorkerPoolQueueUnhandled(t *testing.T) {
	oldUnhandledItemRequeueDuration := unhandledItemRequeueDuration.Load()
	unhandledItemRequeueDuration.Store(0)
	defer unhandledItemRequeueDuration.Store(oldUnhandledItemRequeueDuration)

	mu := sync.Mutex{}

	test := func(t *testing.T, queueSetting setting.QueueSettings) {
		queueSetting.Length = 100
		queueSetting.Type = "channel"
		queueSetting.Datadir = t.TempDir() + "/test-queue"
		m := map[int]int{}

		// odds are handled once, evens are handled twice
		handler := func(items ...int) (unhandled []int) {
			testRecorder.Record("handle:%v", items)
			for _, item := range items {
				mu.Lock()
				if item%2 == 0 && m[item] == 0 {
					unhandled = append(unhandled, item)
				}
				m[item]++
				mu.Unlock()
			}
			return unhandled
		}

		q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", queueSetting, handler, false)
		stop := runWorkerPoolQueue(q)
		for i := 0; i < queueSetting.Length; i++ {
			testRecorder.Record("push:%v", i)
			require.NoError(t, q.Push(i))
		}
		require.NoError(t, q.FlushWithContext(t.Context(), 0))
		stop()

		ok := true
		for i := 0; i < queueSetting.Length; i++ {
			if i%2 == 0 {
				ok = ok && assert.Equal(t, 2, m[i], "test %s: item %d", t.Name(), i)
			} else {
				ok = ok && assert.Equal(t, 1, m[i], "test %s: item %d", t.Name(), i)
			}
		}
		if !ok {
			t.Logf("m: %v", m)
			t.Logf("records: %v", testRecorder.Records())
		}
		testRecorder.Reset()
	}

	runCount := 2 // these tests can be run hundreds of times to check their stability
	t.Run("1/1", func(t *testing.T) {
		for i := 0; i < runCount; i++ {
			test(t, setting.QueueSettings{BatchLength: 1, MaxWorkers: 1})
		}
	})
	t.Run("3/1", func(t *testing.T) {
		for i := 0; i < runCount; i++ {
			test(t, setting.QueueSettings{BatchLength: 3, MaxWorkers: 1})
		}
	})
	t.Run("4/5", func(t *testing.T) {
		for i := 0; i < runCount; i++ {
			test(t, setting.QueueSettings{BatchLength: 4, MaxWorkers: 5})
		}
	})
}

func TestWorkerPoolQueuePersistence(t *testing.T) {
	runCount := 2 // these tests can be run hundreds of times to check their stability
	t.Run("1/1", func(t *testing.T) {
		for i := 0; i < runCount; i++ {
			testWorkerPoolQueuePersistence(t, setting.QueueSettings{BatchLength: 1, MaxWorkers: 1, Length: 100})
		}
	})
	t.Run("3/1", func(t *testing.T) {
		for i := 0; i < runCount; i++ {
			testWorkerPoolQueuePersistence(t, setting.QueueSettings{BatchLength: 3, MaxWorkers: 1, Length: 100})
		}
	})
	t.Run("4/5", func(t *testing.T) {
		for i := 0; i < runCount; i++ {
			testWorkerPoolQueuePersistence(t, setting.QueueSettings{BatchLength: 4, MaxWorkers: 5, Length: 100})
		}
	})
}

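// testWorkerPoolQueuePersistence pushes tasks into a persistent "level" queue, shuts the
// queue down partway through handling, then restarts it and verifies that the total number
// of handled tasks across both runs equals the number of tasks pushed.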
func testWorkerPoolQueuePersistence(t *testing.T, queueSetting setting.QueueSettings) {
	testCount := queueSetting.Length
	queueSetting.Type = "level"
	queueSetting.Datadir = t.TempDir() + "/test-queue"

	mu := sync.Mutex{}

	var tasksQ1, tasksQ2 []string
	q1 := func() {
		startWhenAllReady := make(chan struct{}) // only start consuming data once all "testCount" tasks have been pushed into the queue
		stopAt20Shutdown := make(chan struct{})  // stop and shut down at the 20th item

		testHandler := func(data ...string) []string {
			<-startWhenAllReady
			time.Sleep(10 * time.Millisecond)
			for _, s := range data {
				mu.Lock()
				tasksQ1 = append(tasksQ1, s)
				mu.Unlock()

				if s == "task-20" {
					close(stopAt20Shutdown)
				}
			}
			return nil
		}

		q, _ := newWorkerPoolQueueForTest("pr_patch_checker_test", queueSetting, testHandler, true)
		stop := runWorkerPoolQueue(q)
		for i := 0; i < testCount; i++ {
			_ = q.Push("task-" + strconv.Itoa(i))
		}
		close(startWhenAllReady)
		<-stopAt20Shutdown // it's possible to have more than 20 tasks executed
		stop()
	}

	q1() // run some tasks and shut down at an intermediate point

	time.Sleep(100 * time.Millisecond) // because the handler in q1 has a slight delay, we need to wait for it to finish

	q2 := func() {
		testHandler := func(data ...string) []string {
			for _, s := range data {
				mu.Lock()
				tasksQ2 = append(tasksQ2, s)
				mu.Unlock()
			}
			return nil
		}

		q, _ := newWorkerPoolQueueForTest("pr_patch_checker_test", queueSetting, testHandler, true)
		stop := runWorkerPoolQueue(q)
		require.NoError(t, q.FlushWithContext(t.Context(), 0))
		stop()
	}

	q2() // restart the queue to continue executing the tasks left in it

	assert.NotEmpty(t, tasksQ1)
	assert.NotEmpty(t, tasksQ2)
	assert.Equal(t, testCount, len(tasksQ1)+len(tasksQ2))
}

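// TestWorkerPoolQueueActiveWorkers checks that workers become active while items are being
// handled and that the pool scales back down to a single worker after workerIdleDuration
// has passed.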
func TestWorkerPoolQueueActiveWorkers(t *testing.T) {
	defer test.MockVariableValue(&workerIdleDuration, 300*time.Millisecond)()

	handler := func(items ...int) (unhandled []int) {
		time.Sleep(100 * time.Millisecond)
		return nil
	}

	q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 1, Length: 100}, handler, false)
	stop := runWorkerPoolQueue(q)
	for i := 0; i < 5; i++ {
		require.NoError(t, q.Push(i))
	}

	time.Sleep(50 * time.Millisecond)
	assert.Equal(t, 1, q.GetWorkerNumber())
	assert.Equal(t, 1, q.GetWorkerActiveNumber())
	time.Sleep(500 * time.Millisecond)
	assert.Equal(t, 1, q.GetWorkerNumber())
	assert.Equal(t, 0, q.GetWorkerActiveNumber())
	time.Sleep(workerIdleDuration)
	assert.Equal(t, 1, q.GetWorkerNumber()) // there is at least one worker after the queue begins working
	stop()

	q, _ = newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 3, Length: 100}, handler, false)
	stop = runWorkerPoolQueue(q)
	for i := 0; i < 15; i++ {
		require.NoError(t, q.Push(i))
	}

	time.Sleep(50 * time.Millisecond)
	assert.Equal(t, 3, q.GetWorkerNumber())
	assert.Equal(t, 3, q.GetWorkerActiveNumber())
	time.Sleep(500 * time.Millisecond)
	assert.Equal(t, 3, q.GetWorkerNumber())
	assert.Equal(t, 0, q.GetWorkerActiveNumber())
	time.Sleep(workerIdleDuration)
	assert.Equal(t, 1, q.GetWorkerNumber()) // there is at least one worker after the queue begins working
	stop()
}

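// TestWorkerPoolQueueShutdown verifies that items still unhandled when the queue shuts
// down remain in the persistent "level" queue, so a new queue instance created from the
// same settings sees all of them again.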
func TestWorkerPoolQueueShutdown(t *testing.T) {
	oldUnhandledItemRequeueDuration := unhandledItemRequeueDuration.Load()
	unhandledItemRequeueDuration.Store(int64(100 * time.Millisecond))
	defer unhandledItemRequeueDuration.Store(oldUnhandledItemRequeueDuration)

	// simulate a slow handler that doesn't handle any items (they will all be pushed back to the queue)
	handlerCalled := make(chan struct{})
	handler := func(items ...int) (unhandled []int) {
		if items[0] == 0 {
			close(handlerCalled)
		}
		time.Sleep(400 * time.Millisecond)
		return items
	}

	qs := setting.QueueSettings{Type: "level", Datadir: t.TempDir() + "/queue", BatchLength: 3, MaxWorkers: 4, Length: 20}
	q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", qs, handler, false)
	stop := runWorkerPoolQueue(q)
	for i := 0; i < qs.Length; i++ {
		require.NoError(t, q.Push(i))
	}
	<-handlerCalled
	time.Sleep(200 * time.Millisecond) // wait for a while to make sure all workers are active
	assert.Equal(t, 4, q.GetWorkerActiveNumber())
	stop() // stop triggers shutdown
	assert.Equal(t, 0, q.GetWorkerActiveNumber())

	// no item was ever handled, so we still get all of them again
	q, _ = newWorkerPoolQueueForTest("test-workpoolqueue", qs, handler, false)
	assert.Equal(t, 20, q.GetQueueItemNumber())
}

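// TestWorkerPoolQueueWorkerIdleReset checks that no more workers than the configured
// maximum are ever created over the queue's lifetime, i.e. workers are not torn down and
// recreated while the queue still has items, even with a very short workerIdleDuration.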
func TestWorkerPoolQueueWorkerIdleReset(t *testing.T) {
	defer test.MockVariableValue(&workerIdleDuration, 1*time.Millisecond)()

	chGoroutineIDs := make(chan string)
	handler := func(items ...int) (unhandled []int) {
		time.Sleep(10 * workerIdleDuration)
		chGoroutineIDs <- goroutineID() // hacky way to identify a worker
		return nil
	}

	q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 2, Length: 100}, handler, false)
	stop := runWorkerPoolQueue(q)

	const workloadSize = 12
	for i := 0; i < workloadSize; i++ {
		require.NoError(t, q.Push(i))
	}

	workerIDs := make(map[string]struct{})
	for i := 0; i < workloadSize; i++ {
		c := <-chGoroutineIDs
		workerIDs[c] = struct{}{}
		t.Logf("%d workers: overall=%d current=%d", i, len(workerIDs), q.GetWorkerNumber())

		// ensure that no more than qs.MaxWorkers workers are created over the whole lifetime of the queue
		// (otherwise it would mean that some workers got shut down while the queue was full)
		require.LessOrEqual(t, len(workerIDs), q.GetWorkerMaxNumber())
	}
	close(chGoroutineIDs)

	stop()
}

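// goroutineID extracts the current goroutine's ID from the header line of the stack dump
// written by runtime.Stack ("goroutine <id> [...]"); it is a test-only helper for telling
// workers apart.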
func goroutineID() string {
	var buffer [31]byte
	_ = runtime.Stack(buffer[:], false)
	return string(bytes.Fields(buffer[10:])[0])
}