1// Copyright 2019 The Gitea Authors. All rights reserved.
2// SPDX-License-Identifier: MIT
3
4package setting
5
6import (
7 "path/filepath"
8 "runtime"
9
10 "forgejo.org/modules/json"
11 "forgejo.org/modules/log"
12)
13
// QueueSettings holds the configuration of a single queue as read from the ini file.
type QueueSettings struct {
	// Name is the queue's name, i.e. the "the-name" part of the
	// [queue.the-name] section; it is not an INI option itself.
	Name string

	Type    string
	Datadir string
	ConnStr string // connection string, for leveldb or redis
	Length  int    // max queue length before blocking

	// Name suffixes used for storage (db key, redis key).
	QueueName string
	SetName   string // the "set" is for the unique queue

	BatchLength int
	MaxWorkers  int
}
28
29func GetQueueSettings(rootCfg ConfigProvider, name string) (QueueSettings, error) {
30 queueSettingsDefault := QueueSettings{
31 Type: "level", // dummy, channel, level, redis
32 Datadir: "queues/common", // relative to AppDataPath
33 Length: 100000, // queue length before a channel queue will block
34
35 QueueName: "_queue",
36 SetName: "_unique",
37 BatchLength: 20,
38 MaxWorkers: runtime.NumCPU() / 2,
39 }
40 if queueSettingsDefault.MaxWorkers < 1 {
41 queueSettingsDefault.MaxWorkers = 1
42 }
43 if queueSettingsDefault.MaxWorkers > 10 {
44 queueSettingsDefault.MaxWorkers = 10
45 }
46
47 // deep copy default settings
48 cfg := QueueSettings{}
49 if cfgBs, err := json.Marshal(queueSettingsDefault); err != nil {
50 return cfg, err
51 } else if err = json.Unmarshal(cfgBs, &cfg); err != nil {
52 return cfg, err
53 }
54
55 cfg.Name = name
56 if sec, err := rootCfg.GetSection("queue"); err == nil {
57 if err = sec.MapTo(&cfg); err != nil {
58 log.Error("Failed to map queue common config for %q: %v", name, err)
59 return cfg, nil
60 }
61 }
62 if sec, err := rootCfg.GetSection("queue." + name); err == nil {
63 if err = sec.MapTo(&cfg); err != nil {
64 log.Error("Failed to map queue spec config for %q: %v", name, err)
65 return cfg, nil
66 }
67 if sec.HasKey("CONN_STR") {
68 cfg.ConnStr = sec.Key("CONN_STR").String()
69 }
70 }
71
72 if cfg.Datadir == "" {
73 cfg.Datadir = queueSettingsDefault.Datadir
74 }
75 if !filepath.IsAbs(cfg.Datadir) {
76 cfg.Datadir = filepath.Join(AppDataPath, cfg.Datadir)
77 }
78 cfg.Datadir = filepath.ToSlash(cfg.Datadir)
79
80 if cfg.Type == "redis" && cfg.ConnStr == "" {
81 cfg.ConnStr = "redis://127.0.0.1:6379/0"
82 }
83
84 if cfg.Length <= 0 {
85 cfg.Length = queueSettingsDefault.Length
86 }
87 if cfg.MaxWorkers <= 0 {
88 cfg.MaxWorkers = queueSettingsDefault.MaxWorkers
89 }
90 if cfg.BatchLength <= 0 {
91 cfg.BatchLength = queueSettingsDefault.BatchLength
92 }
93
94 return cfg, nil
95}
96
97func LoadQueueSettings() {
98 loadQueueFrom(CfgProvider)
99}
100
101func loadQueueFrom(rootCfg ConfigProvider) {
102 hasOld := false
103 handleOldLengthConfiguration := func(rootCfg ConfigProvider, newQueueName, oldSection, oldKey string) {
104 if rootCfg.Section(oldSection).HasKey(oldKey) {
105 hasOld = true
106 log.Error("Removed queue option: `[%s].%s`. Use new options in `[queue.%s]`", oldSection, oldKey, newQueueName)
107 }
108 }
109 handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "ISSUE_INDEXER_QUEUE_TYPE")
110 handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "ISSUE_INDEXER_QUEUE_BATCH_NUMBER")
111 handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "ISSUE_INDEXER_QUEUE_DIR")
112 handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "ISSUE_INDEXER_QUEUE_CONN_STR")
113 handleOldLengthConfiguration(rootCfg, "issue_indexer", "indexer", "UPDATE_BUFFER_LEN")
114 handleOldLengthConfiguration(rootCfg, "mailer", "mailer", "SEND_BUFFER_LEN")
115 handleOldLengthConfiguration(rootCfg, "pr_patch_checker", "repository", "PULL_REQUEST_QUEUE_LENGTH")
116 handleOldLengthConfiguration(rootCfg, "mirror", "repository", "MIRROR_QUEUE_LENGTH")
117 if hasOld {
118 log.Fatal("Please update your app.ini to remove deprecated config options")
119 }
120}