loading up the forgejo repo on tangled to test page performance
here's an 11 kB file:
// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package queue

import (
	"context"
	"runtime/pprof"
	"sync"
	"sync/atomic"
	"time"

	"forgejo.org/modules/log"
)

var (
	infiniteTimerC         = make(chan time.Time)
	batchDebounceDuration  = 100 * time.Millisecond
	workerIdleDuration     = 1 * time.Second
	shutdownDefaultTimeout = 2 * time.Second

	unhandledItemRequeueDuration atomic.Int64 // to avoid data race during test
)

func init() {
	unhandledItemRequeueDuration.Store(int64(5 * time.Second))
}

// workerGroup is a group of workers to work with a WorkerPoolQueue
type workerGroup[T any] struct {
	q  *WorkerPoolQueue[T]
	wg sync.WaitGroup

	ctxWorker       context.Context
	ctxWorkerCancel context.CancelFunc

	batchBuffer []T
	popItemChan chan []byte
	popItemErr  chan error
}

func (wg *workerGroup[T]) doPrepareWorkerContext() {
	wg.ctxWorker, wg.ctxWorkerCancel = context.WithCancel(wg.q.ctxRun)
}

// doDispatchBatchToWorker dispatches a batch of items to worker's channel.
// If the channel is full, it tries to start a new worker if possible.
func (q *WorkerPoolQueue[T]) doDispatchBatchToWorker(wg *workerGroup[T], flushChan chan flushType) {
	batch := wg.batchBuffer
	wg.batchBuffer = nil

	if len(batch) == 0 {
		return
	}

	full := false
	select {
	case q.batchChan <- batch:
	default:
		full = true
	}

	// TODO: the logic could be improved in the future, to avoid a data-race between "doStartNewWorker" and "workerNum"
	// The root problem is that if we skip "doStartNewWorker" here, the "workerNum" might be decreased by other workers later
	// So ideally, it should check whether there are enough workers by some approaches, and start new workers if necessary.
	q.workerNumMu.Lock()
	noWorker := q.workerNum == 0
	if full || noWorker {
		if q.workerNum < q.workerMaxNum || noWorker && q.workerMaxNum <= 0 {
			q.workerNum++
			q.doStartNewWorker(wg)
		}
	}
	q.workerNumMu.Unlock()

	if full {
		select {
		case q.batchChan <- batch:
		case flush := <-flushChan:
			q.doWorkerHandle(batch)
			q.doFlush(wg, flush)
		case <-q.ctxRun.Done():
			wg.batchBuffer = batch // return the batch to buffer, the "doRun" function will handle it
		}
	}
}

// doWorkerHandle calls the safeHandler to handle a batch of items, and it increases/decreases the active worker number.
// If the context has been canceled, it should not be called because the "Push" still needs the context; in such a case, call q.safeHandler directly.
func (q *WorkerPoolQueue[T]) doWorkerHandle(batch []T) {
	q.workerNumMu.Lock()
	q.workerActiveNum++
	q.workerNumMu.Unlock()

	defer func() {
		q.workerNumMu.Lock()
		q.workerActiveNum--
		q.workerNumMu.Unlock()
	}()

	unhandled := q.safeHandler(batch...)
	// if none of the items were handled, it should back off for a few seconds
	// in this case the handler (eg: document indexer) may have encountered some errors/failures
	if len(unhandled) == len(batch) && unhandledItemRequeueDuration.Load() != 0 {
		log.Error("Queue %q failed to handle batch of %d items, backoff for a few seconds", q.GetName(), len(batch))
		select {
		case <-q.ctxRun.Done():
		case <-time.After(time.Duration(unhandledItemRequeueDuration.Load())):
		}
	}
	for _, item := range unhandled {
		if err := q.Push(item); err != nil {
			if !q.basePushForShutdown(item) {
				log.Error("Failed to requeue item for queue %q when calling handler: %v", q.GetName(), err)
			}
		}
	}
}

// basePushForShutdown tries to requeue items into the base queue when the WorkerPoolQueue is shutting down.
// If the queue is shutting down, it returns true and tries to push the items.
// Otherwise it does nothing and returns false.
func (q *WorkerPoolQueue[T]) basePushForShutdown(items ...T) bool {
	shutdownTimeout := time.Duration(q.shutdownTimeout.Load())
	if shutdownTimeout == 0 {
		return false
	}
	ctxShutdown, ctxShutdownCancel := context.WithTimeout(context.Background(), shutdownTimeout)
	defer ctxShutdownCancel()
	for _, item := range items {
		// if there is still an error at this point, there is nothing more the queue can do; the item is only logged and lost
		if err := q.baseQueue.PushItem(ctxShutdown, q.marshal(item)); err != nil {
			log.Error("Failed to requeue item for queue %q when shutting down: %v", q.GetName(), err)
		}
	}
	return true
}

// doStartNewWorker starts a new worker for the queue; the worker reads from the worker's channel and handles the items.
func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
	wp.wg.Add(1)

	go func() {
		defer wp.wg.Done()

		log.Debug("Queue %q starts new worker", q.GetName())
		defer log.Debug("Queue %q stops idle worker", q.GetName())

		t := time.NewTicker(workerIdleDuration)
		defer t.Stop()

		keepWorking := true
		stopWorking := func() {
			q.workerNumMu.Lock()
			keepWorking = false
			q.workerNum--
			q.workerNumMu.Unlock()
		}
		for keepWorking {
			select {
			case <-wp.ctxWorker.Done():
				stopWorking()
			case batch, ok := <-q.batchChan:
				if !ok {
					stopWorking()
					continue
				}
				q.doWorkerHandle(batch)
				// reset the idle ticker, and drain the tick after reset in case a tick is already triggered
				t.Reset(workerIdleDuration)
				select {
				case <-t.C:
				default:
				}
			case <-t.C:
				q.workerNumMu.Lock()
				keepWorking = q.workerNum <= 1 // keep the last worker running
				if !keepWorking {
					q.workerNum--
				}
				q.workerNumMu.Unlock()
			}
		}
	}()
}

// doFlush flushes the queue: it tries to read all items from the queue and handles them.
// It is for testing purposes only. It's not designed to work for a cluster.
func (q *WorkerPoolQueue[T]) doFlush(wg *workerGroup[T], flush flushType) {
	log.Debug("Queue %q starts flushing", q.GetName())
	defer log.Debug("Queue %q finishes flushing", q.GetName())

	// stop all workers, and prepare a new worker context to start new workers

	wg.ctxWorkerCancel()
	wg.wg.Wait()

	defer func() {
		close(flush)
		wg.doPrepareWorkerContext()
	}()

	// drain the batch channel first
loop:
	for {
		select {
		case batch := <-q.batchChan:
			q.doWorkerHandle(batch)
		default:
			break loop
		}
	}

	// drain the popItem channel
	emptyCounter := 0
	for {
		select {
		case data, dataOk := <-wg.popItemChan:
			if !dataOk {
				return
			}
			emptyCounter = 0
			if v, jsonOk := q.unmarshal(data); !jsonOk {
				continue
			} else {
				q.doWorkerHandle([]T{v})
			}
		case err := <-wg.popItemErr:
			if !q.isCtxRunCanceled() {
				log.Error("Failed to pop item from queue %q (doFlush): %v", q.GetName(), err)
			}
			return
		case <-q.ctxRun.Done():
			log.Debug("Queue %q is shutting down", q.GetName())
			return
		case <-time.After(20 * time.Millisecond):
			// There is no reliable way to make sure all queue items are consumed by the Flush; there always might be some items stored in some buffers/temp variables.
			// If we run Gitea in a cluster, we cannot even guarantee all items are consumed in a deterministic instance.
			// Luckily, the "Flush" trick is only used in tests, so far so good.
			if cnt, _ := q.baseQueue.Len(q.ctxRun); cnt == 0 && len(wg.popItemChan) == 0 {
				emptyCounter++
			}
			if emptyCounter >= 2 {
				return
			}
		}
	}
}

func (q *WorkerPoolQueue[T]) isCtxRunCanceled() bool {
	select {
	case <-q.ctxRun.Done():
		return true
	default:
		return false
	}
}

var skipFlushChan = make(chan flushType) // an empty flush chan, used to skip reading other flush requests

// doRun is the main loop of the queue. All related "doXxx" functions are executed in its context.
func (q *WorkerPoolQueue[T]) doRun() {
	pprof.SetGoroutineLabels(q.ctxRun)

	log.Debug("Queue %q starts running", q.GetName())
	defer log.Debug("Queue %q stops running", q.GetName())

	wg := &workerGroup[T]{q: q}
	wg.doPrepareWorkerContext()
	wg.popItemChan, wg.popItemErr = popItemByChan(q.ctxRun, q.baseQueue.PopItem)

	defer func() {
		q.ctxRunCancel()

		// drain all data on the fly
		// since the queue is shutting down, the items can't be dispatched to workers because the context is canceled
		// it can't call doWorkerHandle either, because there is no chance to push unhandled items back to the queue
		var unhandled []T
		close(q.batchChan)
		for batch := range q.batchChan {
			unhandled = append(unhandled, batch...)
		}
		unhandled = append(unhandled, wg.batchBuffer...)
		for data := range wg.popItemChan {
			if v, ok := q.unmarshal(data); ok {
				unhandled = append(unhandled, v)
			}
		}

		shutdownTimeout := time.Duration(q.shutdownTimeout.Load())
		if shutdownTimeout != 0 {
			// if there is a shutdown context, try to push the items back to the base queue
			q.basePushForShutdown(unhandled...)
			workerDone := make(chan struct{})
			// the only way to wait for the workers, because the handlers do not have a context to wait on
			go func() { wg.wg.Wait(); close(workerDone) }()
			select {
			case <-workerDone:
			case <-time.After(shutdownTimeout):
				log.Error("Queue %q is shutting down, but workers are still running after timeout", q.GetName())
			}
		} else {
			// if there is no shutdown context, just call the handler to try to handle the items. if the handler fails again, the items are lost
			q.safeHandler(unhandled...)
		}

		close(q.shutdownDone)
	}()

	var batchDispatchC <-chan time.Time = infiniteTimerC
	for {
		select {
		case data, dataOk := <-wg.popItemChan:
			if !dataOk {
				return
			}
			if v, jsonOk := q.unmarshal(data); !jsonOk {
				testRecorder.Record("pop:corrupted:%s", data) // in rare cases the levelqueue(leveldb) might be corrupted
				continue
			} else {
				wg.batchBuffer = append(wg.batchBuffer, v)
			}
			if len(wg.batchBuffer) >= q.batchLength {
				q.doDispatchBatchToWorker(wg, q.flushChan)
			} else if batchDispatchC == infiniteTimerC {
				batchDispatchC = time.After(batchDebounceDuration)
			} // else: batchDispatchC is already a debounce timer, it will be triggered soon
		case <-batchDispatchC:
			batchDispatchC = infiniteTimerC
			q.doDispatchBatchToWorker(wg, q.flushChan)
		case flush := <-q.flushChan:
			// before flushing, it needs to try to dispatch the batch to worker first, in case there is no worker running
			// after the flushing, there is at least one worker running, so "doFlush" could wait for workers to finish
			// since we are already in a "flush" operation, the dispatching function shouldn't read the flush chan.
			q.doDispatchBatchToWorker(wg, skipFlushChan)
			q.doFlush(wg, flush)
		case err, errOk := <-wg.popItemErr:
			if !errOk {
				return
			}
			if !q.isCtxRunCanceled() {
				log.Error("Failed to pop item from queue %q (doRun): %v", q.GetName(), err)
			}
			return
		case <-q.ctxRun.Done():
			log.Debug("Queue %q is shutting down", q.GetName())
			return
		}
	}
}
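aside from the rendering test, one detail in this file worth calling out is the debounce trick in doRun: batchDispatchC starts out pointing at infiniteTimerC, a channel nothing ever writes to, so that select case simply never fires; the first item of a partial batch swaps it to a real time.After timer, and either a full batch or the timer firing dispatches the buffer. here's a minimal standalone sketch of that pattern; the names (neverC, dispatchC, flushBatch) and the sizes are mine for illustration, not from the forgejo code:

package main

import (
	"fmt"
	"time"
)

// neverC is a timer channel that is never written to, so a select case reading it never fires.
var neverC = make(chan time.Time)

func main() {
	items := make(chan int)

	// producer: a burst of items, a pause, then one last item
	go func() {
		for i := 1; i <= 5; i++ {
			items <- i
		}
		time.Sleep(500 * time.Millisecond)
		items <- 6
		close(items)
	}()

	var batch []int
	flushBatch := func() {
		fmt.Println("dispatch batch:", batch)
		batch = nil
	}

	const batchLength = 3
	const debounce = 100 * time.Millisecond

	var dispatchC <-chan time.Time = neverC // starts as the "infinite" timer
	for {
		select {
		case v, ok := <-items:
			if !ok {
				if len(batch) > 0 {
					flushBatch() // flush whatever is left when the producer is done
				}
				return
			}
			batch = append(batch, v)
			if len(batch) >= batchLength {
				flushBatch()
				dispatchC = neverC // full batch dispatched, no pending debounce needed
			} else if dispatchC == neverC {
				dispatchC = time.After(debounce) // first item of a partial batch arms the timer
			}
		case <-dispatchC:
			dispatchC = neverC
			flushBatch() // debounce expired, dispatch the partial batch
		}
	}
}

running this should print the batches [1 2 3], [4 5], and [6]: the first dispatch comes from hitting batchLength, the second from the debounce timer during the producer's pause, the last from the channel closing. the nice part of the never-firing channel is that the select needs no extra boolean to track whether a timer is pending.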