A privacy-first, self-hosted, fully open-source personal knowledge management system, written in TypeScript and Go. (PERSONAL FORK)
1// SiYuan - Refactor your thinking
2// Copyright (c) 2020-present, b3log.org
3//
4// This program is free software: you can redistribute it and/or modify
5// it under the terms of the GNU Affero General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8//
9// This program is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12// GNU Affero General Public License for more details.
13//
14// You should have received a copy of the GNU Affero General Public License
15// along with this program. If not, see <https://www.gnu.org/licenses/>.
16
17package sql
18
19import (
20 "database/sql"
21 "errors"
22 "fmt"
23 "path"
24 "runtime/debug"
25 "sync"
26 "sync/atomic"
27 "time"
28
29 "github.com/88250/lute/parse"
30 "github.com/siyuan-note/eventbus"
31 "github.com/siyuan-note/logging"
32 "github.com/siyuan-note/siyuan/kernel/task"
33 "github.com/siyuan-note/siyuan/kernel/util"
34)
35
36var (
37 operationQueue []*dbQueueOperation
38 dbQueueLock = sync.Mutex{}
39 txLock = sync.Mutex{}
40)
41
42type dbQueueOperation struct {
43 inQueueTime time.Time
44 action string // upsert/delete/delete_id/rename/rename_sub_tree/delete_box/delete_box_refs/index/delete_ids/update_block_content/delete_assets
45 indexTree *parse.Tree // index
46 upsertTree *parse.Tree // upsert/update_refs/delete_refs
47 removeTreeBox, removeTreePath string // delete
48 removeTreeID string // delete_id
49 removeTreeIDs []string // delete_ids
50 box string // delete_box/delete_box_refs/index
51 renameTree *parse.Tree // rename/rename_sub_tree
52 block *Block // update_block_content
53 id string // index_node
54 removeAssetHashes []string // delete_assets
55}
56
57func FlushTxJob() {
58 task.AppendTask(task.DatabaseIndexCommit, FlushQueue)
59}
60
61func ClearQueue() {
62 dbQueueLock.Lock()
63 defer dbQueueLock.Unlock()
64 operationQueue = nil
65}
66
// flushingTx is set to true by FlushQueue once it holds txLock and is reset to
// false just before that lock is released.
var flushingTx atomic.Bool
68
69func FlushQueue() {
70 ops := getOperations()
71 total := len(ops)
72 if 1 > total && !flushingTx.Load() {
73 return
74 }
75
76 txLock.Lock()
77 flushingTx.Store(true)
78 defer func() {
79 flushingTx.Store(false)
80 txLock.Unlock()
81 }()
82
83 start := time.Now()
84
85 context := map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBar}
86 if 512 < len(ops) {
87 disableCache()
88 defer enableCache()
89 }
90
91 groupOpsTotal := map[string]int{}
92 for _, op := range ops {
93 groupOpsTotal[op.action]++
94 }
95
96 groupOpsCurrent := map[string]int{}
97 for i, op := range ops {
98 if util.IsExiting.Load() {
99 return
100 }
101
102 tx, err := beginTx()
103 if err != nil {
104 return
105 }
106
107 groupOpsCurrent[op.action]++
108 context["current"] = groupOpsCurrent[op.action]
109 context["total"] = groupOpsTotal[op.action]
110 if err = execOp(op, tx, context); err != nil {
111 tx.Rollback()
112 logging.LogErrorf("queue operation [%s] failed: %s", op.action, err)
113 continue
114 }
115
116 if err = commitTx(tx); err != nil {
117 logging.LogErrorf("commit tx failed: %s", err)
118 continue
119 }
120
121 if 16 < i && 0 == i%128 {
122 debug.FreeOSMemory()
123 }
124 }
125
126 if 128 < total {
127 debug.FreeOSMemory()
128 }
129
130 elapsed := time.Now().Sub(start).Milliseconds()
131 if 7000 < elapsed {
132 logging.LogInfof("database op tx [%dms]", elapsed)
133 }
134
135 // Push database index commit event https://github.com/siyuan-note/siyuan/issues/8814
136 util.BroadcastByType("main", "databaseIndexCommit", 0, "", nil)
137
138 eventbus.Publish(eventbus.EvtSQLIndexFlushed)
139}
140
141func execOp(op *dbQueueOperation, tx *sql.Tx, context map[string]interface{}) (err error) {
142 switch op.action {
143 case "index":
144 err = indexTree(tx, op.indexTree, context)
145 case "upsert":
146 err = upsertTree(tx, op.upsertTree, context)
147 case "delete":
148 err = batchDeleteByPathPrefix(tx, op.removeTreeBox, op.removeTreePath)
149 case "delete_id":
150 err = deleteByRootID(tx, op.removeTreeID, context)
151 case "delete_ids":
152 err = batchDeleteByRootIDs(tx, op.removeTreeIDs, context)
153 case "rename":
154 err = batchUpdateHPath(tx, op.renameTree, context)
155 if err != nil {
156 break
157 }
158 err = updateRootContent(tx, path.Base(op.renameTree.HPath), op.renameTree.Root.IALAttr("updated"), op.renameTree.ID)
159 case "rename_sub_tree":
160 err = batchUpdatePath(tx, op.renameTree, context)
161 case "delete_box":
162 err = deleteByBoxTx(tx, op.box)
163 case "delete_box_refs":
164 err = deleteRefsByBoxTx(tx, op.box)
165 case "update_refs":
166 err = upsertRefs(tx, op.upsertTree)
167 case "delete_refs":
168 err = deleteRefs(tx, op.upsertTree)
169 case "update_block_content":
170 err = updateBlockContent(tx, op.block)
171 case "delete_assets":
172 err = deleteAssetsByHashes(tx, op.removeAssetHashes)
173 case "index_node":
174 err = indexNode(tx, op.id)
175 default:
176 msg := fmt.Sprintf("unknown operation [%s]", op.action)
177 logging.LogErrorf(msg)
178 err = errors.New(msg)
179 }
180 return
181}
182
183func IndexNodeQueue(id string) {
184 dbQueueLock.Lock()
185 defer dbQueueLock.Unlock()
186
187 newOp := &dbQueueOperation{id: id, inQueueTime: time.Now(), action: "index_node"}
188 for i, op := range operationQueue {
189 if "index_node" == op.action && op.id == id {
190 operationQueue[i] = newOp
191 return
192 }
193 }
194 appendOperation(newOp)
195}
196
197func BatchRemoveAssetsQueue(hashes []string) {
198 if 1 > len(hashes) {
199 return
200 }
201
202 dbQueueLock.Lock()
203 defer dbQueueLock.Unlock()
204
205 newOp := &dbQueueOperation{removeAssetHashes: hashes, inQueueTime: time.Now(), action: "delete_assets"}
206 appendOperation(newOp)
207}
208
209func UpdateBlockContentQueue(block *Block) {
210 dbQueueLock.Lock()
211 defer dbQueueLock.Unlock()
212
213 newOp := &dbQueueOperation{block: block, inQueueTime: time.Now(), action: "update_block_content"}
214 for i, op := range operationQueue {
215 if "update_block_content" == op.action && op.block.ID == block.ID {
216 operationQueue[i] = newOp
217 return
218 }
219 }
220 appendOperation(newOp)
221}
222
223func DeleteRefsTreeQueue(tree *parse.Tree) {
224 dbQueueLock.Lock()
225 defer dbQueueLock.Unlock()
226
227 newOp := &dbQueueOperation{upsertTree: tree, inQueueTime: time.Now(), action: "delete_refs"}
228 for i, op := range operationQueue {
229 if "delete_refs" == op.action && op.upsertTree.ID == tree.ID {
230 operationQueue[i] = newOp
231 return
232 }
233 }
234 appendOperation(newOp)
235}
236
237func UpdateRefsTreeQueue(tree *parse.Tree) {
238 dbQueueLock.Lock()
239 defer dbQueueLock.Unlock()
240
241 newOp := &dbQueueOperation{upsertTree: tree, inQueueTime: time.Now(), action: "update_refs"}
242 for i, op := range operationQueue {
243 if "update_refs" == op.action && op.upsertTree.ID == tree.ID {
244 operationQueue[i] = newOp
245 return
246 }
247 }
248 appendOperation(newOp)
249}
250
251func DeleteBoxRefsQueue(boxID string) {
252 dbQueueLock.Lock()
253 defer dbQueueLock.Unlock()
254
255 newOp := &dbQueueOperation{box: boxID, inQueueTime: time.Now(), action: "delete_box_refs"}
256 for i, op := range operationQueue {
257 if "delete_box_refs" == op.action && op.box == boxID {
258 operationQueue[i] = newOp
259 return
260 }
261 }
262 appendOperation(newOp)
263}
264
265func DeleteBoxQueue(boxID string) {
266 dbQueueLock.Lock()
267 defer dbQueueLock.Unlock()
268
269 newOp := &dbQueueOperation{box: boxID, inQueueTime: time.Now(), action: "delete_box"}
270 for i, op := range operationQueue {
271 if "delete_box" == op.action && op.box == boxID {
272 operationQueue[i] = newOp
273 return
274 }
275 }
276 appendOperation(newOp)
277}
278
279func IndexTreeQueue(tree *parse.Tree) {
280 dbQueueLock.Lock()
281 defer dbQueueLock.Unlock()
282
283 newOp := &dbQueueOperation{indexTree: tree, inQueueTime: time.Now(), action: "index"}
284 for i, op := range operationQueue {
285 if "index" == op.action && op.indexTree.ID == tree.ID { // 相同树则覆盖
286 operationQueue[i] = newOp
287 return
288 }
289 }
290 appendOperation(newOp)
291}
292
293func UpsertTreeQueue(tree *parse.Tree) {
294 dbQueueLock.Lock()
295 defer dbQueueLock.Unlock()
296
297 newOp := &dbQueueOperation{upsertTree: tree, inQueueTime: time.Now(), action: "upsert"}
298 for i, op := range operationQueue {
299 if "upsert" == op.action && op.upsertTree.ID == tree.ID { // 相同树则覆盖
300 operationQueue[i] = newOp
301 return
302 }
303 }
304 appendOperation(newOp)
305}
306
307func RenameTreeQueue(tree *parse.Tree) {
308 dbQueueLock.Lock()
309 defer dbQueueLock.Unlock()
310
311 newOp := &dbQueueOperation{
312 renameTree: tree,
313 inQueueTime: time.Now(),
314 action: "rename",
315 }
316 for i, op := range operationQueue {
317 if "rename" == op.action && op.renameTree.ID == tree.ID { // 相同树则覆盖
318 operationQueue[i] = newOp
319 return
320 }
321 }
322 appendOperation(newOp)
323}
324
325func RenameSubTreeQueue(tree *parse.Tree) {
326 dbQueueLock.Lock()
327 defer dbQueueLock.Unlock()
328
329 newOp := &dbQueueOperation{
330 renameTree: tree,
331 inQueueTime: time.Now(),
332 action: "rename_sub_tree",
333 }
334 for i, op := range operationQueue {
335 if "rename_sub_tree" == op.action && op.renameTree.ID == tree.ID { // 相同树则覆盖
336 operationQueue[i] = newOp
337 return
338 }
339 }
340 appendOperation(newOp)
341}
342
343func RemoveTreeQueue(rootID string) {
344 dbQueueLock.Lock()
345 defer dbQueueLock.Unlock()
346
347 newOp := &dbQueueOperation{removeTreeID: rootID, inQueueTime: time.Now(), action: "delete_id"}
348 for i, op := range operationQueue {
349 if "delete_id" == op.action && op.removeTreeID == rootID {
350 operationQueue[i] = newOp
351 return
352 }
353 }
354 appendOperation(newOp)
355}
356
357func BatchRemoveTreeQueue(rootIDs []string) {
358 if 1 > len(rootIDs) {
359 return
360 }
361
362 dbQueueLock.Lock()
363 defer dbQueueLock.Unlock()
364
365 newOp := &dbQueueOperation{removeTreeIDs: rootIDs, inQueueTime: time.Now(), action: "delete_ids"}
366 appendOperation(newOp)
367}
368
369func RemoveTreePathQueue(treeBox, treePathPrefix string) {
370 dbQueueLock.Lock()
371 defer dbQueueLock.Unlock()
372
373 newOp := &dbQueueOperation{removeTreeBox: treeBox, removeTreePath: treePathPrefix, inQueueTime: time.Now(), action: "delete"}
374 for i, op := range operationQueue {
375 if "delete" == op.action && (op.removeTreeBox == treeBox && op.removeTreePath == treePathPrefix) {
376 operationQueue[i] = newOp
377 return
378 }
379 }
380 appendOperation(newOp)
381}
382
383func getOperations() (ops []*dbQueueOperation) {
384 dbQueueLock.Lock()
385 defer dbQueueLock.Unlock()
386
387 ops = operationQueue
388 operationQueue = nil
389 return
390}
391
392func appendOperation(op *dbQueueOperation) {
393 operationQueue = append(operationQueue, op)
394 eventbus.Publish(eventbus.EvtSQLIndexChanged)
395}