// Copyright 2019 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package queue

import (
	"context"
	"errors"
	"sync"
	"time"

	"forgejo.org/modules/log"
	"forgejo.org/modules/setting"
)

// Manager is a manager for the queues created by the "CreateXxxQueue" functions; these queues are called "managed queues".
type Manager struct {
	mu sync.Mutex

	qidCounter int64
	Queues     map[int64]ManagedWorkerPoolQueue
}

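// ManagedWorkerPoolQueue is the interface a managed queue exposes to the Manager:
// name and type information, worker counters, and queue maintenance operations.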
type ManagedWorkerPoolQueue interface {
	GetName() string
	GetType() string
	GetItemTypeName() string
	GetWorkerNumber() int
	GetWorkerActiveNumber() int
	GetWorkerMaxNumber() int
	SetWorkerMaxNumber(num int)
	GetQueueItemNumber() int

	// FlushWithContext tries to make the handler process all items in the queue synchronously.
	// It is for testing purposes only and is not designed to be used in a cluster.
	FlushWithContext(ctx context.Context, timeout time.Duration) error

	// RemoveAllItems removes all items in the base queue (on-the-fly items are not affected).
	RemoveAllItems(ctx context.Context) error
}

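// manager is the global singleton Manager, created in init and accessed via GetManager.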
var manager *Manager

func init() {
	manager = &Manager{
		Queues: make(map[int64]ManagedWorkerPoolQueue),
	}
}

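// GetManager returns the global queue manager.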
func GetManager() *Manager {
	return manager
}

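// AddManagedQueue assigns the next queue id to the given queue and registers it with the manager.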
func (m *Manager) AddManagedQueue(managed ManagedWorkerPoolQueue) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.qidCounter++
	m.Queues[m.qidCounter] = managed
}

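// GetManagedQueue returns the managed queue with the given id, or nil if it does not exist.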
func (m *Manager) GetManagedQueue(qid int64) ManagedWorkerPoolQueue {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.Queues[qid]
}

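// ManagedQueues returns a snapshot (shallow copy) of all managed queues, keyed by queue id.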
func (m *Manager) ManagedQueues() map[int64]ManagedWorkerPoolQueue {
	m.mu.Lock()
	defer m.mu.Unlock()

	queues := make(map[int64]ManagedWorkerPoolQueue, len(m.Queues))
	for k, v := range m.Queues {
		queues[k] = v
	}
	return queues
}

// FlushAll tries to make all managed queues process all items synchronously, until the timeout elapses or the queues are empty.
// It is for testing purposes only and is not designed to be used in a cluster.
func (m *Manager) FlushAll(ctx context.Context, timeout time.Duration) error {
	var finalErr error
	qs := m.ManagedQueues()
	for _, q := range qs {
		if err := q.FlushWithContext(ctx, timeout); err != nil {
			finalErr = errors.Join(finalErr, err) // collect every flush error instead of keeping only the last one
		}
	}
	return finalErr
}

// CreateSimpleQueue creates a simple queue from the global setting config provider by name.
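//
// A minimal usage sketch (illustrative only; "notifications" is a hypothetical queue name,
// ctx is the caller's context, and the handler follows the HandlerFuncT[T] shape used by
// this package, returning the items that could not be handled):
//
//	handler := func(items ...string) (unhandled []string) {
//		for _, item := range items {
//			log.Info("processing queue item %q", item)
//		}
//		return nil
//	}
//	q := queue.CreateSimpleQueue(ctx, "notifications", handler)
//	if q != nil {
//		_ = q.Push("some-item")
//	}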
func CreateSimpleQueue[T any](ctx context.Context, name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T] {
	return createWorkerPoolQueue(ctx, name, setting.CfgProvider, handler, false)
}

// CreateUniqueQueue creates a unique queue (one that will not add an item that is already in the queue) from the global setting config provider by name.
func CreateUniqueQueue[T any](ctx context.Context, name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T] {
	return createWorkerPoolQueue(ctx, name, setting.CfgProvider, handler, true)
}

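// createWorkerPoolQueue loads the named queue's settings, builds the worker pool queue, and registers it with the global manager.
// It returns nil (and logs an error) if the settings cannot be read or the queue cannot be created.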
func createWorkerPoolQueue[T any](ctx context.Context, name string, cfgProvider setting.ConfigProvider, handler HandlerFuncT[T], unique bool) *WorkerPoolQueue[T] {
	queueSetting, err := setting.GetQueueSettings(cfgProvider, name)
	if err != nil {
		log.Error("Failed to get queue settings for %q: %v", name, err)
		return nil
	}
	w, err := NewWorkerPoolQueueWithContext(ctx, name, queueSetting, handler, unique)
	if err != nil {
		log.Error("Failed to create queue %q: %v", name, err)
		return nil
	}
	GetManager().AddManagedQueue(w)
	return w
}