// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package queue

import (
	"context"
	"runtime/pprof"
	"sync"
	"sync/atomic"
	"time"

	"forgejo.org/modules/log"
)

var (
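	// infiniteTimerC is never written to, so receiving from it blocks forever;
	// it is used as a placeholder when no batch-debounce timer is active.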
	infiniteTimerC         = make(chan time.Time)
	batchDebounceDuration  = 100 * time.Millisecond
	workerIdleDuration     = 1 * time.Second
	shutdownDefaultTimeout = 2 * time.Second

	unhandledItemRequeueDuration atomic.Int64 // to avoid data race during test
)

func init() {
	unhandledItemRequeueDuration.Store(int64(5 * time.Second))
}

// workerGroup is a group of workers to work with a WorkerPoolQueue
type workerGroup[T any] struct {
	q  *WorkerPoolQueue[T]
	wg sync.WaitGroup

	ctxWorker       context.Context
	ctxWorkerCancel context.CancelFunc

	batchBuffer []T
	popItemChan chan []byte
	popItemErr  chan error
}

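// doPrepareWorkerContext (re)creates the worker context derived from the run context.
// Canceling it stops all workers; doFlush cancels it and then calls this again to allow new workers to start.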
func (wg *workerGroup[T]) doPrepareWorkerContext() {
	wg.ctxWorker, wg.ctxWorkerCancel = context.WithCancel(wg.q.ctxRun)
}

// doDispatchBatchToWorker dispatches a batch of items to the worker channel.
// If the channel is full, it tries to start a new worker if possible.
func (q *WorkerPoolQueue[T]) doDispatchBatchToWorker(wg *workerGroup[T], flushChan chan flushType) {
	batch := wg.batchBuffer
	wg.batchBuffer = nil

	if len(batch) == 0 {
		return
	}

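	// try a non-blocking send first; if the channel is full, the workers are busy
	// (or there are none), so a new worker may need to be started below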
	full := false
	select {
	case q.batchChan <- batch:
	default:
		full = true
	}

	// TODO: the logic could be improved in the future, to avoid a data-race between "doStartNewWorker" and "workerNum"
	// The root problem is that if we skip "doStartNewWorker" here, the "workerNum" might be decreased by other workers later
	// So ideally, it should check whether there are enough workers by some other means, and start new workers if necessary.
	q.workerNumMu.Lock()
	noWorker := q.workerNum == 0
	if full || noWorker {
		if q.workerNum < q.workerMaxNum || (noWorker && q.workerMaxNum <= 0) {
			q.workerNum++
			q.doStartNewWorker(wg)
		}
	}
	q.workerNumMu.Unlock()

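	// the non-blocking send failed: block until a worker accepts the batch,
	// a flush request arrives, or the queue is shutting down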
	if full {
		select {
		case q.batchChan <- batch:
		case flush := <-flushChan:
			q.doWorkerHandle(batch)
			q.doFlush(wg, flush)
		case <-q.ctxRun.Done():
			wg.batchBuffer = batch // return the batch to the buffer, the "doRun" function will handle it
		}
	}
}

// doWorkerHandle calls the safeHandler to handle a batch of items, and it increases/decreases the active worker number.
// If the context has been canceled, it should not be called, because "Push" still needs the context; in such a case, call q.safeHandler directly.
func (q *WorkerPoolQueue[T]) doWorkerHandle(batch []T) {
	q.workerNumMu.Lock()
	q.workerActiveNum++
	q.workerNumMu.Unlock()

	defer func() {
		q.workerNumMu.Lock()
		q.workerActiveNum--
		q.workerNumMu.Unlock()
	}()

	unhandled := q.safeHandler(batch...)
	// if none of the items were handled, back off for a few seconds;
	// in this case the handler (eg: document indexer) may have encountered some errors/failures
	if len(unhandled) == len(batch) && unhandledItemRequeueDuration.Load() != 0 {
		log.Error("Queue %q failed to handle batch of %d items, backing off for a few seconds", q.GetName(), len(batch))
		select {
		case <-q.ctxRun.Done():
		case <-time.After(time.Duration(unhandledItemRequeueDuration.Load())):
		}
	}
	for _, item := range unhandled {
		if err := q.Push(item); err != nil {
			if !q.basePushForShutdown(item) {
				log.Error("Failed to requeue item for queue %q when calling handler: %v", q.GetName(), err)
			}
		}
	}
}

// basePushForShutdown tries to requeue items into the base queue when the WorkerPoolQueue is shutting down.
// If the queue is shutting down, it returns true and tries to push the items.
// Otherwise it does nothing and returns false.
func (q *WorkerPoolQueue[T]) basePushForShutdown(items ...T) bool {
	shutdownTimeout := time.Duration(q.shutdownTimeout.Load())
	if shutdownTimeout == 0 {
		return false
	}
	ctxShutdown, ctxShutdownCancel := context.WithTimeout(context.Background(), shutdownTimeout)
	defer ctxShutdownCancel()
	for _, item := range items {
		// if pushing to the base queue also fails, there is nothing more the queue can do and the item is lost
		if err := q.baseQueue.PushItem(ctxShutdown, q.marshal(item)); err != nil {
			log.Error("Failed to requeue item for queue %q when shutting down: %v", q.GetName(), err)
		}
	}
	return true
}

// doStartNewWorker starts a new worker for the queue; the worker reads from the worker channel and handles the items.
func (q *WorkerPoolQueue[T]) doStartNewWorker(wp *workerGroup[T]) {
	wp.wg.Add(1)

	go func() {
		defer wp.wg.Done()

		log.Debug("Queue %q starts new worker", q.GetName())
		defer log.Debug("Queue %q stops idle worker", q.GetName())

		t := time.NewTicker(workerIdleDuration)
		defer t.Stop()

		keepWorking := true
		stopWorking := func() {
			q.workerNumMu.Lock()
			keepWorking = false
			q.workerNum--
			q.workerNumMu.Unlock()
		}
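		// the worker exits when the worker context is canceled, the batch channel is closed,
		// or it has been idle for workerIdleDuration (unless it is the last remaining worker)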
		for keepWorking {
			select {
			case <-wp.ctxWorker.Done():
				stopWorking()
			case batch, ok := <-q.batchChan:
				if !ok {
					stopWorking()
					continue
				}
				q.doWorkerHandle(batch)
				// reset the idle ticker, and drain the tick after reset in case a tick is already triggered
				t.Reset(workerIdleDuration)
				select {
				case <-t.C:
				default:
				}
			case <-t.C:
				q.workerNumMu.Lock()
				keepWorking = q.workerNum <= 1 // keep the last worker running
				if !keepWorking {
					q.workerNum--
				}
				q.workerNumMu.Unlock()
			}
		}
	}()
}

// doFlush flushes the queue: it tries to read all items from the queue and handles them.
// It is for testing purposes only. It's not designed to work for a cluster.
func (q *WorkerPoolQueue[T]) doFlush(wg *workerGroup[T], flush flushType) {
	log.Debug("Queue %q starts flushing", q.GetName())
	defer log.Debug("Queue %q finishes flushing", q.GetName())

	// stop all workers, and prepare a new worker context to start new workers

	wg.ctxWorkerCancel()
	wg.wg.Wait()

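	// when the flush finishes: closing the flush channel notifies the caller,
	// and a fresh worker context is prepared so new workers can be started again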
	defer func() {
		close(flush)
		wg.doPrepareWorkerContext()
	}()

	// drain the batch channel first
loop:
	for {
		select {
		case batch := <-q.batchChan:
			q.doWorkerHandle(batch)
		default:
			break loop
		}
	}

	// drain the popItem channel
	emptyCounter := 0
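	// emptyCounter counts consecutive idle polls below; flushing stops once the base queue
	// and the popItem channel have both looked empty for two polls in a row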
	for {
		select {
		case data, dataOk := <-wg.popItemChan:
			if !dataOk {
				return
			}
			emptyCounter = 0
			if v, jsonOk := q.unmarshal(data); !jsonOk {
				continue
			} else {
				q.doWorkerHandle([]T{v})
			}
		case err := <-wg.popItemErr:
			if !q.isCtxRunCanceled() {
				log.Error("Failed to pop item from queue %q (doFlush): %v", q.GetName(), err)
			}
			return
		case <-q.ctxRun.Done():
			log.Debug("Queue %q is shutting down", q.GetName())
			return
		case <-time.After(20 * time.Millisecond):
			// There is no reliable way to make sure all queue items are consumed by the Flush; there might always be some items stored in buffers or temporary variables.
			// If we run Gitea in a cluster, we cannot even guarantee that all items are consumed by a deterministic instance.
			// Luckily, the "Flush" trick is only used in tests, so far so good.
			if cnt, _ := q.baseQueue.Len(q.ctxRun); cnt == 0 && len(wg.popItemChan) == 0 {
				emptyCounter++
			}
			if emptyCounter >= 2 {
				return
			}
		}
	}
}

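// isCtxRunCanceled reports whether the queue's run context has been canceled, without blocking.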
func (q *WorkerPoolQueue[T]) isCtxRunCanceled() bool {
	select {
	case <-q.ctxRun.Done():
		return true
	default:
		return false
	}
}

var skipFlushChan = make(chan flushType) // an empty flush chan, used to skip reading other flush requests

// doRun is the main loop of the queue. All related "doXxx" functions are executed in its context.
func (q *WorkerPoolQueue[T]) doRun() {
	pprof.SetGoroutineLabels(q.ctxRun)

	log.Debug("Queue %q starts running", q.GetName())
	defer log.Debug("Queue %q stops running", q.GetName())

	wg := &workerGroup[T]{q: q}
	wg.doPrepareWorkerContext()
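	// popItemChan and popItemErr deliver items and errors popped from the base queue;
	// both the main loop below and doFlush consume from them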
	wg.popItemChan, wg.popItemErr = popItemByChan(q.ctxRun, q.baseQueue.PopItem)

	defer func() {
		q.ctxRunCancel()

		// drain all data on the fly
		// since the queue is shutting down, the items can't be dispatched to workers because the context is canceled
		// it can't call doWorkerHandle either, because there is no chance to push unhandled items back to the queue
		var unhandled []T
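		// close the batch channel so the range below terminates after draining any batches
		// that were dispatched but not yet picked up by a worker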
		close(q.batchChan)
		for batch := range q.batchChan {
			unhandled = append(unhandled, batch...)
		}
		unhandled = append(unhandled, wg.batchBuffer...)
		for data := range wg.popItemChan {
			if v, ok := q.unmarshal(data); ok {
				unhandled = append(unhandled, v)
			}
		}

		shutdownTimeout := time.Duration(q.shutdownTimeout.Load())
		if shutdownTimeout != 0 {
			// if there is a shutdown context, try to push the items back to the base queue
			q.basePushForShutdown(unhandled...)
			workerDone := make(chan struct{})
			// waiting on the WaitGroup is the only way to wait for the workers, because the handlers do not take a context
			go func() { wg.wg.Wait(); close(workerDone) }()
			select {
			case <-workerDone:
			case <-time.After(shutdownTimeout):
				log.Error("Queue %q is shutting down, but workers are still running after timeout", q.GetName())
			}
		} else {
			// if there is no shutdown context, just call the handler to try to handle the items. if the handler fails again, the items are lost
			q.safeHandler(unhandled...)
		}

		close(q.shutdownDone)
	}()

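	// batchDispatchC debounces batch dispatching: it stays on the never-firing infiniteTimerC
	// until the first item is buffered, then switches to a short timer so small batches are
	// still dispatched shortly after they arrive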
	var batchDispatchC <-chan time.Time = infiniteTimerC
	for {
		select {
		case data, dataOk := <-wg.popItemChan:
			if !dataOk {
				return
			}
			if v, jsonOk := q.unmarshal(data); !jsonOk {
				testRecorder.Record("pop:corrupted:%s", data) // in rare cases the levelqueue(leveldb) might be corrupted
				continue
			} else {
				wg.batchBuffer = append(wg.batchBuffer, v)
			}
			if len(wg.batchBuffer) >= q.batchLength {
				q.doDispatchBatchToWorker(wg, q.flushChan)
			} else if batchDispatchC == infiniteTimerC {
				batchDispatchC = time.After(batchDebounceDuration)
			} // else: batchDispatchC is already a debounce timer, it will be triggered soon
		case <-batchDispatchC:
			batchDispatchC = infiniteTimerC
			q.doDispatchBatchToWorker(wg, q.flushChan)
		case flush := <-q.flushChan:
			// before flushing, it needs to try to dispatch the batch to a worker first, in case there is no worker running
			// after that, there is at least one worker running, so "doFlush" can wait for the workers to finish
			// since we are already in a "flush" operation, the dispatching function shouldn't read the flush chan
			q.doDispatchBatchToWorker(wg, skipFlushChan)
			q.doFlush(wg, flush)
		case err, errOk := <-wg.popItemErr:
			if !errOk {
				return
			}
			if !q.isCtxRunCanceled() {
				log.Error("Failed to pop item from queue %q (doRun): %v", q.GetName(), err)
			}
			return
		case <-q.ctxRun.Done():
			log.Debug("Queue %q is shutting down", q.GetName())
			return
		}
	}
}