loading up the forgejo repo on tangled to test page performance
at forgejo 3.3 kB view raw
1// Copyright 2019 The Gitea Authors. All rights reserved. 2// SPDX-License-Identifier: MIT 3 4package queue 5 6import ( 7 "context" 8 "sync" 9 "time" 10 11 "forgejo.org/modules/log" 12 "forgejo.org/modules/setting" 13) 14 15// Manager is a manager for the queues created by "CreateXxxQueue" functions, these queues are called "managed queues". 16type Manager struct { 17 mu sync.Mutex 18 19 qidCounter int64 20 Queues map[int64]ManagedWorkerPoolQueue 21} 22 23type ManagedWorkerPoolQueue interface { 24 GetName() string 25 GetType() string 26 GetItemTypeName() string 27 GetWorkerNumber() int 28 GetWorkerActiveNumber() int 29 GetWorkerMaxNumber() int 30 SetWorkerMaxNumber(num int) 31 GetQueueItemNumber() int 32 33 // FlushWithContext tries to make the handler process all items in the queue synchronously. 34 // It is for testing purpose only. It's not designed to be used in a cluster. 35 FlushWithContext(ctx context.Context, timeout time.Duration) error 36 37 // RemoveAllItems removes all items in the base queue (on-the-fly items are not affected) 38 RemoveAllItems(ctx context.Context) error 39} 40 41var manager *Manager 42 43func init() { 44 manager = &Manager{ 45 Queues: make(map[int64]ManagedWorkerPoolQueue), 46 } 47} 48 49func GetManager() *Manager { 50 return manager 51} 52 53func (m *Manager) AddManagedQueue(managed ManagedWorkerPoolQueue) { 54 m.mu.Lock() 55 defer m.mu.Unlock() 56 m.qidCounter++ 57 m.Queues[m.qidCounter] = managed 58} 59 60func (m *Manager) GetManagedQueue(qid int64) ManagedWorkerPoolQueue { 61 m.mu.Lock() 62 defer m.mu.Unlock() 63 return m.Queues[qid] 64} 65 66func (m *Manager) ManagedQueues() map[int64]ManagedWorkerPoolQueue { 67 m.mu.Lock() 68 defer m.mu.Unlock() 69 70 queues := make(map[int64]ManagedWorkerPoolQueue, len(m.Queues)) 71 for k, v := range m.Queues { 72 queues[k] = v 73 } 74 return queues 75} 76 77// FlushAll tries to make all managed queues process all items synchronously, until timeout or the queue is empty. 78// It is for testing purpose only. It's not designed to be used in a cluster. 79func (m *Manager) FlushAll(ctx context.Context, timeout time.Duration) error { 80 var finalErr error 81 qs := m.ManagedQueues() 82 for _, q := range qs { 83 if err := q.FlushWithContext(ctx, timeout); err != nil { 84 finalErr = err // TODO: in Go 1.20: errors.Join 85 } 86 } 87 return finalErr 88} 89 90// CreateSimpleQueue creates a simple queue from global setting config provider by name 91func CreateSimpleQueue[T any](ctx context.Context, name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T] { 92 return createWorkerPoolQueue(ctx, name, setting.CfgProvider, handler, false) 93} 94 95// CreateUniqueQueue creates a unique queue from global setting config provider by name 96func CreateUniqueQueue[T any](ctx context.Context, name string, handler HandlerFuncT[T]) *WorkerPoolQueue[T] { 97 return createWorkerPoolQueue(ctx, name, setting.CfgProvider, handler, true) 98} 99 100func createWorkerPoolQueue[T any](ctx context.Context, name string, cfgProvider setting.ConfigProvider, handler HandlerFuncT[T], unique bool) *WorkerPoolQueue[T] { 101 queueSetting, err := setting.GetQueueSettings(cfgProvider, name) 102 if err != nil { 103 log.Error("Failed to get queue settings for %q: %v", name, err) 104 return nil 105 } 106 w, err := NewWorkerPoolQueueWithContext(ctx, name, queueSetting, handler, unique) 107 if err != nil { 108 log.Error("Failed to create queue %q: %v", name, err) 109 return nil 110 } 111 GetManager().AddManagedQueue(w) 112 return w 113}