loading up the forgejo repo on tangled to test page performance
at forgejo 9.0 kB view raw
1// Copyright 2023 The Gitea Authors. All rights reserved. 2// SPDX-License-Identifier: MIT 3 4package queue 5 6import ( 7 "bytes" 8 "runtime" 9 "strconv" 10 "sync" 11 "testing" 12 "time" 13 14 "forgejo.org/modules/setting" 15 "forgejo.org/modules/test" 16 17 "github.com/stretchr/testify/assert" 18 "github.com/stretchr/testify/require" 19) 20 21func runWorkerPoolQueue[T any](q *WorkerPoolQueue[T]) func() { 22 go q.Run() 23 return func() { 24 q.ShutdownWait(1 * time.Second) 25 } 26} 27 28func TestWorkerPoolQueueUnhandled(t *testing.T) { 29 oldUnhandledItemRequeueDuration := unhandledItemRequeueDuration.Load() 30 unhandledItemRequeueDuration.Store(0) 31 defer unhandledItemRequeueDuration.Store(oldUnhandledItemRequeueDuration) 32 33 mu := sync.Mutex{} 34 35 test := func(t *testing.T, queueSetting setting.QueueSettings) { 36 queueSetting.Length = 100 37 queueSetting.Type = "channel" 38 queueSetting.Datadir = t.TempDir() + "/test-queue" 39 m := map[int]int{} 40 41 // odds are handled once, evens are handled twice 42 handler := func(items ...int) (unhandled []int) { 43 testRecorder.Record("handle:%v", items) 44 for _, item := range items { 45 mu.Lock() 46 if item%2 == 0 && m[item] == 0 { 47 unhandled = append(unhandled, item) 48 } 49 m[item]++ 50 mu.Unlock() 51 } 52 return unhandled 53 } 54 55 q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", queueSetting, handler, false) 56 stop := runWorkerPoolQueue(q) 57 for i := 0; i < queueSetting.Length; i++ { 58 testRecorder.Record("push:%v", i) 59 require.NoError(t, q.Push(i)) 60 } 61 require.NoError(t, q.FlushWithContext(t.Context(), 0)) 62 stop() 63 64 ok := true 65 for i := 0; i < queueSetting.Length; i++ { 66 if i%2 == 0 { 67 ok = ok && assert.Equal(t, 2, m[i], "test %s: item %d", t.Name(), i) 68 } else { 69 ok = ok && assert.Equal(t, 1, m[i], "test %s: item %d", t.Name(), i) 70 } 71 } 72 if !ok { 73 t.Logf("m: %v", m) 74 t.Logf("records: %v", testRecorder.Records()) 75 } 76 testRecorder.Reset() 77 } 78 79 runCount := 2 // we can run these tests even hundreds times to see its stability 80 t.Run("1/1", func(t *testing.T) { 81 for i := 0; i < runCount; i++ { 82 test(t, setting.QueueSettings{BatchLength: 1, MaxWorkers: 1}) 83 } 84 }) 85 t.Run("3/1", func(t *testing.T) { 86 for i := 0; i < runCount; i++ { 87 test(t, setting.QueueSettings{BatchLength: 3, MaxWorkers: 1}) 88 } 89 }) 90 t.Run("4/5", func(t *testing.T) { 91 for i := 0; i < runCount; i++ { 92 test(t, setting.QueueSettings{BatchLength: 4, MaxWorkers: 5}) 93 } 94 }) 95} 96 97func TestWorkerPoolQueuePersistence(t *testing.T) { 98 runCount := 2 // we can run these tests even hundreds times to see its stability 99 t.Run("1/1", func(t *testing.T) { 100 for i := 0; i < runCount; i++ { 101 testWorkerPoolQueuePersistence(t, setting.QueueSettings{BatchLength: 1, MaxWorkers: 1, Length: 100}) 102 } 103 }) 104 t.Run("3/1", func(t *testing.T) { 105 for i := 0; i < runCount; i++ { 106 testWorkerPoolQueuePersistence(t, setting.QueueSettings{BatchLength: 3, MaxWorkers: 1, Length: 100}) 107 } 108 }) 109 t.Run("4/5", func(t *testing.T) { 110 for i := 0; i < runCount; i++ { 111 testWorkerPoolQueuePersistence(t, setting.QueueSettings{BatchLength: 4, MaxWorkers: 5, Length: 100}) 112 } 113 }) 114} 115 116func testWorkerPoolQueuePersistence(t *testing.T, queueSetting setting.QueueSettings) { 117 testCount := queueSetting.Length 118 queueSetting.Type = "level" 119 queueSetting.Datadir = t.TempDir() + "/test-queue" 120 121 mu := sync.Mutex{} 122 123 var tasksQ1, tasksQ2 []string 124 q1 := func() { 125 startWhenAllReady := make(chan struct{}) // only start data consuming when the "testCount" tasks are all pushed into queue 126 stopAt20Shutdown := make(chan struct{}) // stop and shutdown at the 20th item 127 128 testHandler := func(data ...string) []string { 129 <-startWhenAllReady 130 time.Sleep(10 * time.Millisecond) 131 for _, s := range data { 132 mu.Lock() 133 tasksQ1 = append(tasksQ1, s) 134 mu.Unlock() 135 136 if s == "task-20" { 137 close(stopAt20Shutdown) 138 } 139 } 140 return nil 141 } 142 143 q, _ := newWorkerPoolQueueForTest("pr_patch_checker_test", queueSetting, testHandler, true) 144 stop := runWorkerPoolQueue(q) 145 for i := 0; i < testCount; i++ { 146 _ = q.Push("task-" + strconv.Itoa(i)) 147 } 148 close(startWhenAllReady) 149 <-stopAt20Shutdown // it's possible to have more than 20 tasks executed 150 stop() 151 } 152 153 q1() // run some tasks and shutdown at an intermediate point 154 155 time.Sleep(100 * time.Millisecond) // because the handler in q1 has a slight delay, we need to wait for it to finish 156 157 q2 := func() { 158 testHandler := func(data ...string) []string { 159 for _, s := range data { 160 mu.Lock() 161 tasksQ2 = append(tasksQ2, s) 162 mu.Unlock() 163 } 164 return nil 165 } 166 167 q, _ := newWorkerPoolQueueForTest("pr_patch_checker_test", queueSetting, testHandler, true) 168 stop := runWorkerPoolQueue(q) 169 require.NoError(t, q.FlushWithContext(t.Context(), 0)) 170 stop() 171 } 172 173 q2() // restart the queue to continue to execute the tasks in it 174 175 assert.NotEmpty(t, tasksQ1) 176 assert.NotEmpty(t, tasksQ2) 177 assert.Equal(t, testCount, len(tasksQ1)+len(tasksQ2)) 178} 179 180func TestWorkerPoolQueueActiveWorkers(t *testing.T) { 181 defer test.MockVariableValue(&workerIdleDuration, 300*time.Millisecond)() 182 183 handler := func(items ...int) (unhandled []int) { 184 time.Sleep(100 * time.Millisecond) 185 return nil 186 } 187 188 q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 1, Length: 100}, handler, false) 189 stop := runWorkerPoolQueue(q) 190 for i := 0; i < 5; i++ { 191 require.NoError(t, q.Push(i)) 192 } 193 194 time.Sleep(50 * time.Millisecond) 195 assert.Equal(t, 1, q.GetWorkerNumber()) 196 assert.Equal(t, 1, q.GetWorkerActiveNumber()) 197 time.Sleep(500 * time.Millisecond) 198 assert.Equal(t, 1, q.GetWorkerNumber()) 199 assert.Equal(t, 0, q.GetWorkerActiveNumber()) 200 time.Sleep(workerIdleDuration) 201 assert.Equal(t, 1, q.GetWorkerNumber()) // there is at least one worker after the queue begins working 202 stop() 203 204 q, _ = newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 3, Length: 100}, handler, false) 205 stop = runWorkerPoolQueue(q) 206 for i := 0; i < 15; i++ { 207 require.NoError(t, q.Push(i)) 208 } 209 210 time.Sleep(50 * time.Millisecond) 211 assert.Equal(t, 3, q.GetWorkerNumber()) 212 assert.Equal(t, 3, q.GetWorkerActiveNumber()) 213 time.Sleep(500 * time.Millisecond) 214 assert.Equal(t, 3, q.GetWorkerNumber()) 215 assert.Equal(t, 0, q.GetWorkerActiveNumber()) 216 time.Sleep(workerIdleDuration) 217 assert.Equal(t, 1, q.GetWorkerNumber()) // there is at least one worker after the queue begins working 218 stop() 219} 220 221func TestWorkerPoolQueueShutdown(t *testing.T) { 222 oldUnhandledItemRequeueDuration := unhandledItemRequeueDuration.Load() 223 unhandledItemRequeueDuration.Store(int64(100 * time.Millisecond)) 224 defer unhandledItemRequeueDuration.Store(oldUnhandledItemRequeueDuration) 225 226 // simulate a slow handler, it doesn't handle any item (all items will be pushed back to the queue) 227 handlerCalled := make(chan struct{}) 228 handler := func(items ...int) (unhandled []int) { 229 if items[0] == 0 { 230 close(handlerCalled) 231 } 232 time.Sleep(400 * time.Millisecond) 233 return items 234 } 235 236 qs := setting.QueueSettings{Type: "level", Datadir: t.TempDir() + "/queue", BatchLength: 3, MaxWorkers: 4, Length: 20} 237 q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", qs, handler, false) 238 stop := runWorkerPoolQueue(q) 239 for i := 0; i < qs.Length; i++ { 240 require.NoError(t, q.Push(i)) 241 } 242 <-handlerCalled 243 time.Sleep(200 * time.Millisecond) // wait for a while to make sure all workers are active 244 assert.Equal(t, 4, q.GetWorkerActiveNumber()) 245 stop() // stop triggers shutdown 246 assert.Equal(t, 0, q.GetWorkerActiveNumber()) 247 248 // no item was ever handled, so we still get all of them again 249 q, _ = newWorkerPoolQueueForTest("test-workpoolqueue", qs, handler, false) 250 assert.Equal(t, 20, q.GetQueueItemNumber()) 251} 252 253func TestWorkerPoolQueueWorkerIdleReset(t *testing.T) { 254 defer test.MockVariableValue(&workerIdleDuration, 1*time.Millisecond)() 255 256 chGoroutineIDs := make(chan string) 257 handler := func(items ...int) (unhandled []int) { 258 time.Sleep(10 * workerIdleDuration) 259 chGoroutineIDs <- goroutineID() // hacky way to identify a worker 260 return nil 261 } 262 263 q, _ := newWorkerPoolQueueForTest("test-workpoolqueue", setting.QueueSettings{Type: "channel", BatchLength: 1, MaxWorkers: 2, Length: 100}, handler, false) 264 stop := runWorkerPoolQueue(q) 265 266 const workloadSize = 12 267 for i := 0; i < workloadSize; i++ { 268 require.NoError(t, q.Push(i)) 269 } 270 271 workerIDs := make(map[string]struct{}) 272 for i := 0; i < workloadSize; i++ { 273 c := <-chGoroutineIDs 274 workerIDs[c] = struct{}{} 275 t.Logf("%d workers: overall=%d current=%d", i, len(workerIDs), q.GetWorkerNumber()) 276 277 // ensure that no more than qs.MaxWorkers workers are created over the whole lifetime of the queue 278 // (otherwise it would mean that some workers got shut down while the queue was full) 279 require.LessOrEqual(t, len(workerIDs), q.GetWorkerMaxNumber()) 280 } 281 close(chGoroutineIDs) 282 283 stop() 284} 285 286func goroutineID() string { 287 var buffer [31]byte 288 _ = runtime.Stack(buffer[:], false) 289 return string(bytes.Fields(buffer[10:])[0]) 290}