A privacy-first, self-hosted, fully open source personal knowledge management software, written in typescript and golang. (PERSONAL FORK)
1// SiYuan - Refactor your thinking
2// Copyright (c) 2020-present, b3log.org
3//
4// This program is free software: you can redistribute it and/or modify
5// it under the terms of the GNU Affero General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8//
9// This program is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12// GNU Affero General Public License for more details.
13//
14// You should have received a copy of the GNU Affero General Public License
15// along with this program. If not, see <https://www.gnu.org/licenses/>.
16
17package task
18
19import (
20 "context"
21 "reflect"
22 "slices"
23 "sync"
24 "time"
25
26 "github.com/88250/gulu"
27 "github.com/siyuan-note/logging"
28 "github.com/siyuan-note/siyuan/kernel/util"
29)
30
31var (
32 taskQueue []*Task
33 queueLock = sync.Mutex{}
34)
35
36type Task struct {
37 Action string
38 Handler reflect.Value
39 Args []interface{}
40 Created time.Time
41 Async bool // 为 true 说明是异步任务,不会阻塞任务队列,满足 Delay 条件后立即执行
42 Delay time.Duration
43 Timeout time.Duration
44}
45
46func AppendTask(action string, handler interface{}, args ...interface{}) {
47 appendTaskWithDelayTimeout(action, false, 0, 24*time.Hour, handler, args...)
48}
49
50func AppendAsyncTaskWithDelay(action string, delay time.Duration, handler interface{}, args ...interface{}) {
51 appendTaskWithDelayTimeout(action, true, delay, 24*time.Hour, handler, args...)
52}
53
54func AppendTaskWithTimeout(action string, timeout time.Duration, handler interface{}, args ...interface{}) {
55 appendTaskWithDelayTimeout(action, false, 0, timeout, handler, args...)
56}
57
58func appendTaskWithDelayTimeout(action string, async bool, delay, timeout time.Duration, handler interface{}, args ...interface{}) {
59 if util.IsExiting.Load() {
60 //logging.LogWarnf("task queue is paused, action [%s] will be ignored", action)
61 return
62 }
63
64 task := &Task{
65 Action: action,
66 Handler: reflect.ValueOf(handler),
67 Args: args,
68 Created: time.Now(),
69 Async: async,
70 Delay: delay,
71 Timeout: timeout,
72 }
73
74 if gulu.Str.Contains(action, uniqueActions) {
75 if currentTasks := getCurrentTasks(); containTask(task, currentTasks) {
76 //logging.LogWarnf("task [%s] is already in queue, will be ignored", action)
77 return
78 }
79 }
80
81 queueLock.Lock()
82 defer queueLock.Unlock()
83 taskQueue = append(taskQueue, task)
84}
85
86func containTask(task *Task, tasks []*Task) bool {
87 for _, t := range tasks {
88 if t.Action == task.Action {
89 if len(t.Args) != len(task.Args) {
90 return false
91 }
92
93 for i, arg := range t.Args {
94 if !reflect.DeepEqual(arg, task.Args[i]) {
95 return false
96 }
97 }
98 return true
99 }
100 }
101 return false
102}
103
104func getCurrentTasks() (ret []*Task) {
105 queueLock.Lock()
106 defer queueLock.Unlock()
107
108 currentTaskLock.Lock()
109 if nil != currentTask {
110 ret = append(ret, currentTask)
111 }
112 currentTaskLock.Unlock()
113
114 for _, task := range taskQueue {
115 ret = append(ret, task)
116 }
117 return
118}
119
120const (
121 RepoCheckout = "task.repo.checkout" // 从快照中检出
122 RepoAutoPurge = "task.repo.autoPurge" // 自动清理数据仓库
123 DatabaseIndexFull = "task.database.index.full" // 重建索引
124 DatabaseIndex = "task.database.index" // 数据库索引
125 DatabaseIndexCommit = "task.database.index.commit" // 数据库索引提交
126 DatabaseIndexRef = "task.database.index.ref" // 数据库索引引用
127 DatabaseIndexFix = "task.database.index.fix" // 数据库索引订正
128 OCRImage = "task.ocr.image" // 图片 OCR 提取文本
129 HistoryGenerateFile = "task.history.generateFile" // 生成文件历史
130 HistoryDatabaseIndexFull = "task.history.database.index.full" // 历史数据库重建索引
131 HistoryDatabaseIndexCommit = "task.history.database.index.commit" // 历史数据库索引提交
132 DatabaseIndexEmbedBlock = "task.database.index.embedBlock" // 数据库索引嵌入块
133 ReloadUI = "task.reload.ui" // 重载 UI
134 AssetContentDatabaseIndexFull = "task.asset.database.index.full" // 资源文件数据库重建索引
135 AssetContentDatabaseIndexCommit = "task.asset.database.index.commit" // 资源文件数据库索引提交
136 CacheVirtualBlockRef = "task.cache.virtualBlockRef" // 缓存虚拟块引用
137 ReloadAttributeView = "task.reload.attributeView" // 重新加载属性视图
138 ReloadProtyle = "task.reload.protyle" // 重新加载编辑器
139 ReloadTag = "task.reload.tag" // 重新加载标签面板
140 ReloadFiletree = "task.reload.filetree" // 重新加载文档树面板
141 SetRefDynamicText = "task.ref.setDynamicText" // 设置引用的动态锚文本
142 SetDefRefCount = "task.def.setRefCount" // 设置定义的引用计数
143 UpdateIDs = "task.update.ids" // 更新 ID
144 PushMsg = "task.push.msg" // 推送消息
145)
146
147// uniqueActions 描述了唯一的任务,即队列中只能存在一个在执行的任务。
148var uniqueActions = []string{
149 RepoCheckout,
150 RepoAutoPurge,
151 DatabaseIndexFull,
152 DatabaseIndexCommit,
153 OCRImage,
154 HistoryGenerateFile,
155 HistoryDatabaseIndexFull,
156 HistoryDatabaseIndexCommit,
157 AssetContentDatabaseIndexFull,
158 AssetContentDatabaseIndexCommit,
159 ReloadAttributeView,
160 ReloadProtyle,
161 ReloadTag,
162 ReloadFiletree,
163 SetRefDynamicText,
164 SetDefRefCount,
165 UpdateIDs,
166}
167
168func ContainIndexTask() bool {
169 tasks := getCurrentTasks()
170 for _, task := range tasks {
171 if gulu.Str.Contains(task.Action, []string{DatabaseIndexFull, DatabaseIndex}) {
172 return true
173 }
174 }
175 return false
176}
177
178func StatusJob() {
179 var items []map[string]interface{}
180 count := map[string]int{}
181 actionLangs := util.TaskActionLangs[util.Lang]
182
183 queueLock.Lock()
184 for _, task := range taskQueue {
185 action := task.Action
186 if c := count[action]; 7 < c {
187 logging.LogWarnf("too many tasks [%s], ignore show its status", action)
188 continue
189 }
190 count[action]++
191
192 if nil != actionLangs {
193 if label := actionLangs[task.Action]; nil != label {
194 action = label.(string)
195 } else {
196 continue
197 }
198 }
199
200 item := map[string]interface{}{"action": action}
201 items = append(items, item)
202 }
203 defer queueLock.Unlock()
204
205 currentTaskLock.Lock()
206 if nil != currentTask && nil != actionLangs {
207 if label := actionLangs[currentTask.Action]; nil != label {
208 items = append([]map[string]interface{}{{"action": label.(string)}}, items...)
209 }
210 }
211 currentTaskLock.Unlock()
212
213 if 1 > len(items) {
214 items = []map[string]interface{}{}
215 }
216 data := map[string]interface{}{}
217 data["tasks"] = items
218 util.PushBackgroundTask(data)
219}
220
221func ExecTaskJob() {
222 task := popTask()
223 if nil == task {
224 return
225 }
226
227 if util.IsExiting.Load() {
228 return
229 }
230
231 execTask(task)
232}
233
234func popTask() (ret *Task) {
235 queueLock.Lock()
236 defer queueLock.Unlock()
237
238 if 1 > len(taskQueue) {
239 return
240 }
241
242 for i, task := range taskQueue {
243 if time.Since(task.Created) <= task.Delay {
244 continue
245 }
246
247 if !task.Async {
248 ret = task
249 taskQueue = append(taskQueue[:i], taskQueue[i+1:]...)
250 return
251 }
252 }
253 return
254}
255
256func ExecAsyncTaskJob() {
257 tasks := popAsyncTasks()
258 if 1 > len(tasks) {
259 return
260 }
261
262 if util.IsExiting.Load() {
263 return
264 }
265
266 for _, task := range tasks {
267 go func() {
268 execTask(task)
269 }()
270 }
271}
272
273func popAsyncTasks() (ret []*Task) {
274 queueLock.Lock()
275 defer queueLock.Unlock()
276
277 if 1 > len(taskQueue) {
278 return
279 }
280
281 var popedIndexes []int
282 for i, task := range taskQueue {
283 if !task.Async {
284 continue
285 }
286
287 if time.Since(task.Created) <= task.Delay {
288 continue
289 }
290
291 if task.Async {
292 ret = append(ret, task)
293 popedIndexes = append(popedIndexes, i)
294 }
295 }
296
297 if 0 < len(popedIndexes) {
298 var newQueue []*Task
299 for i, task := range taskQueue {
300 if !slices.Contains(popedIndexes, i) {
301 newQueue = append(newQueue, task)
302 }
303 }
304 taskQueue = newQueue
305 }
306 return
307}
308
309var (
310 currentTask *Task
311 currentTaskLock = sync.Mutex{}
312)
313
314func execTask(task *Task) {
315 if nil == task {
316 return
317 }
318
319 defer logging.Recover()
320
321 args := make([]reflect.Value, len(task.Args))
322 for i, v := range task.Args {
323 if nil == v {
324 args[i] = reflect.New(task.Handler.Type().In(i)).Elem()
325 } else {
326 args[i] = reflect.ValueOf(v)
327 }
328 }
329
330 if !task.Async {
331 currentTaskLock.Lock()
332 currentTask = task
333 currentTaskLock.Unlock()
334 }
335
336 ctx, cancel := context.WithTimeout(context.Background(), task.Timeout)
337 defer cancel()
338 ch := make(chan bool, 1)
339 go func() {
340 task.Handler.Call(args)
341 ch <- true
342 }()
343
344 select {
345 case <-ctx.Done():
346 logging.LogWarnf("task [%s] timeout", task.Action)
347 case <-ch:
348 //logging.LogInfof("task [%s] done", task.Action)
349 }
350
351 if !task.Async {
352 currentTaskLock.Lock()
353 currentTask = nil
354 currentTaskLock.Unlock()
355 }
356}