A privacy-first, self-hosted, fully open-source personal knowledge management system, written in TypeScript and Go. (PERSONAL FORK)
1// SiYuan - Refactor your thinking 2// Copyright (c) 2020-present, b3log.org 3// 4// This program is free software: you can redistribute it and/or modify 5// it under the terms of the GNU Affero General Public License as published by 6// the Free Software Foundation, either version 3 of the License, or 7// (at your option) any later version. 8// 9// This program is distributed in the hope that it will be useful, 10// but WITHOUT ANY WARRANTY; without even the implied warranty of 11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12// GNU Affero General Public License for more details. 13// 14// You should have received a copy of the GNU Affero General Public License 15// along with this program. If not, see <https://www.gnu.org/licenses/>. 16 17package sql 18 19import ( 20 "database/sql" 21 "errors" 22 "fmt" 23 "path" 24 "runtime/debug" 25 "sync" 26 "sync/atomic" 27 "time" 28 29 "github.com/88250/lute/parse" 30 "github.com/siyuan-note/eventbus" 31 "github.com/siyuan-note/logging" 32 "github.com/siyuan-note/siyuan/kernel/task" 33 "github.com/siyuan-note/siyuan/kernel/util" 34) 35 36var ( 37 operationQueue []*dbQueueOperation 38 dbQueueLock = sync.Mutex{} 39 txLock = sync.Mutex{} 40) 41 42type dbQueueOperation struct { 43 inQueueTime time.Time 44 action string // upsert/delete/delete_id/rename/rename_sub_tree/delete_box/delete_box_refs/index/delete_ids/update_block_content/delete_assets 45 indexTree *parse.Tree // index 46 upsertTree *parse.Tree // upsert/update_refs/delete_refs 47 removeTreeBox, removeTreePath string // delete 48 removeTreeID string // delete_id 49 removeTreeIDs []string // delete_ids 50 box string // delete_box/delete_box_refs/index 51 renameTree *parse.Tree // rename/rename_sub_tree 52 block *Block // update_block_content 53 id string // index_node 54 removeAssetHashes []string // delete_assets 55} 56 57func FlushTxJob() { 58 task.AppendTask(task.DatabaseIndexCommit, FlushQueue) 59} 60 61func ClearQueue() { 62 dbQueueLock.Lock() 63 
defer dbQueueLock.Unlock() 64 operationQueue = nil 65} 66 67var flushingTx = atomic.Bool{} 68 69func FlushQueue() { 70 ops := getOperations() 71 total := len(ops) 72 if 1 > total && !flushingTx.Load() { 73 return 74 } 75 76 txLock.Lock() 77 flushingTx.Store(true) 78 defer func() { 79 flushingTx.Store(false) 80 txLock.Unlock() 81 }() 82 83 start := time.Now() 84 85 context := map[string]interface{}{eventbus.CtxPushMsg: eventbus.CtxPushMsgToStatusBar} 86 if 512 < len(ops) { 87 disableCache() 88 defer enableCache() 89 } 90 91 groupOpsTotal := map[string]int{} 92 for _, op := range ops { 93 groupOpsTotal[op.action]++ 94 } 95 96 groupOpsCurrent := map[string]int{} 97 for i, op := range ops { 98 if util.IsExiting.Load() { 99 return 100 } 101 102 tx, err := beginTx() 103 if err != nil { 104 return 105 } 106 107 groupOpsCurrent[op.action]++ 108 context["current"] = groupOpsCurrent[op.action] 109 context["total"] = groupOpsTotal[op.action] 110 if err = execOp(op, tx, context); err != nil { 111 tx.Rollback() 112 logging.LogErrorf("queue operation [%s] failed: %s", op.action, err) 113 continue 114 } 115 116 if err = commitTx(tx); err != nil { 117 logging.LogErrorf("commit tx failed: %s", err) 118 continue 119 } 120 121 if 16 < i && 0 == i%128 { 122 debug.FreeOSMemory() 123 } 124 } 125 126 if 128 < total { 127 debug.FreeOSMemory() 128 } 129 130 elapsed := time.Now().Sub(start).Milliseconds() 131 if 7000 < elapsed { 132 logging.LogInfof("database op tx [%dms]", elapsed) 133 } 134 135 // Push database index commit event https://github.com/siyuan-note/siyuan/issues/8814 136 util.BroadcastByType("main", "databaseIndexCommit", 0, "", nil) 137 138 eventbus.Publish(eventbus.EvtSQLIndexFlushed) 139} 140 141func execOp(op *dbQueueOperation, tx *sql.Tx, context map[string]interface{}) (err error) { 142 switch op.action { 143 case "index": 144 err = indexTree(tx, op.indexTree, context) 145 case "upsert": 146 err = upsertTree(tx, op.upsertTree, context) 147 case "delete": 148 err = 
batchDeleteByPathPrefix(tx, op.removeTreeBox, op.removeTreePath) 149 case "delete_id": 150 err = deleteByRootID(tx, op.removeTreeID, context) 151 case "delete_ids": 152 err = batchDeleteByRootIDs(tx, op.removeTreeIDs, context) 153 case "rename": 154 err = batchUpdateHPath(tx, op.renameTree, context) 155 if err != nil { 156 break 157 } 158 err = updateRootContent(tx, path.Base(op.renameTree.HPath), op.renameTree.Root.IALAttr("updated"), op.renameTree.ID) 159 case "rename_sub_tree": 160 err = batchUpdatePath(tx, op.renameTree, context) 161 case "delete_box": 162 err = deleteByBoxTx(tx, op.box) 163 case "delete_box_refs": 164 err = deleteRefsByBoxTx(tx, op.box) 165 case "update_refs": 166 err = upsertRefs(tx, op.upsertTree) 167 case "delete_refs": 168 err = deleteRefs(tx, op.upsertTree) 169 case "update_block_content": 170 err = updateBlockContent(tx, op.block) 171 case "delete_assets": 172 err = deleteAssetsByHashes(tx, op.removeAssetHashes) 173 case "index_node": 174 err = indexNode(tx, op.id) 175 default: 176 msg := fmt.Sprintf("unknown operation [%s]", op.action) 177 logging.LogErrorf(msg) 178 err = errors.New(msg) 179 } 180 return 181} 182 183func IndexNodeQueue(id string) { 184 dbQueueLock.Lock() 185 defer dbQueueLock.Unlock() 186 187 newOp := &dbQueueOperation{id: id, inQueueTime: time.Now(), action: "index_node"} 188 for i, op := range operationQueue { 189 if "index_node" == op.action && op.id == id { 190 operationQueue[i] = newOp 191 return 192 } 193 } 194 appendOperation(newOp) 195} 196 197func BatchRemoveAssetsQueue(hashes []string) { 198 if 1 > len(hashes) { 199 return 200 } 201 202 dbQueueLock.Lock() 203 defer dbQueueLock.Unlock() 204 205 newOp := &dbQueueOperation{removeAssetHashes: hashes, inQueueTime: time.Now(), action: "delete_assets"} 206 appendOperation(newOp) 207} 208 209func UpdateBlockContentQueue(block *Block) { 210 dbQueueLock.Lock() 211 defer dbQueueLock.Unlock() 212 213 newOp := &dbQueueOperation{block: block, inQueueTime: time.Now(), 
action: "update_block_content"} 214 for i, op := range operationQueue { 215 if "update_block_content" == op.action && op.block.ID == block.ID { 216 operationQueue[i] = newOp 217 return 218 } 219 } 220 appendOperation(newOp) 221} 222 223func DeleteRefsTreeQueue(tree *parse.Tree) { 224 dbQueueLock.Lock() 225 defer dbQueueLock.Unlock() 226 227 newOp := &dbQueueOperation{upsertTree: tree, inQueueTime: time.Now(), action: "delete_refs"} 228 for i, op := range operationQueue { 229 if "delete_refs" == op.action && op.upsertTree.ID == tree.ID { 230 operationQueue[i] = newOp 231 return 232 } 233 } 234 appendOperation(newOp) 235} 236 237func UpdateRefsTreeQueue(tree *parse.Tree) { 238 dbQueueLock.Lock() 239 defer dbQueueLock.Unlock() 240 241 newOp := &dbQueueOperation{upsertTree: tree, inQueueTime: time.Now(), action: "update_refs"} 242 for i, op := range operationQueue { 243 if "update_refs" == op.action && op.upsertTree.ID == tree.ID { 244 operationQueue[i] = newOp 245 return 246 } 247 } 248 appendOperation(newOp) 249} 250 251func DeleteBoxRefsQueue(boxID string) { 252 dbQueueLock.Lock() 253 defer dbQueueLock.Unlock() 254 255 newOp := &dbQueueOperation{box: boxID, inQueueTime: time.Now(), action: "delete_box_refs"} 256 for i, op := range operationQueue { 257 if "delete_box_refs" == op.action && op.box == boxID { 258 operationQueue[i] = newOp 259 return 260 } 261 } 262 appendOperation(newOp) 263} 264 265func DeleteBoxQueue(boxID string) { 266 dbQueueLock.Lock() 267 defer dbQueueLock.Unlock() 268 269 newOp := &dbQueueOperation{box: boxID, inQueueTime: time.Now(), action: "delete_box"} 270 for i, op := range operationQueue { 271 if "delete_box" == op.action && op.box == boxID { 272 operationQueue[i] = newOp 273 return 274 } 275 } 276 appendOperation(newOp) 277} 278 279func IndexTreeQueue(tree *parse.Tree) { 280 dbQueueLock.Lock() 281 defer dbQueueLock.Unlock() 282 283 newOp := &dbQueueOperation{indexTree: tree, inQueueTime: time.Now(), action: "index"} 284 for i, op := range 
operationQueue { 285 if "index" == op.action && op.indexTree.ID == tree.ID { // 相同树则覆盖 286 operationQueue[i] = newOp 287 return 288 } 289 } 290 appendOperation(newOp) 291} 292 293func UpsertTreeQueue(tree *parse.Tree) { 294 dbQueueLock.Lock() 295 defer dbQueueLock.Unlock() 296 297 newOp := &dbQueueOperation{upsertTree: tree, inQueueTime: time.Now(), action: "upsert"} 298 for i, op := range operationQueue { 299 if "upsert" == op.action && op.upsertTree.ID == tree.ID { // 相同树则覆盖 300 operationQueue[i] = newOp 301 return 302 } 303 } 304 appendOperation(newOp) 305} 306 307func RenameTreeQueue(tree *parse.Tree) { 308 dbQueueLock.Lock() 309 defer dbQueueLock.Unlock() 310 311 newOp := &dbQueueOperation{ 312 renameTree: tree, 313 inQueueTime: time.Now(), 314 action: "rename", 315 } 316 for i, op := range operationQueue { 317 if "rename" == op.action && op.renameTree.ID == tree.ID { // 相同树则覆盖 318 operationQueue[i] = newOp 319 return 320 } 321 } 322 appendOperation(newOp) 323} 324 325func RenameSubTreeQueue(tree *parse.Tree) { 326 dbQueueLock.Lock() 327 defer dbQueueLock.Unlock() 328 329 newOp := &dbQueueOperation{ 330 renameTree: tree, 331 inQueueTime: time.Now(), 332 action: "rename_sub_tree", 333 } 334 for i, op := range operationQueue { 335 if "rename_sub_tree" == op.action && op.renameTree.ID == tree.ID { // 相同树则覆盖 336 operationQueue[i] = newOp 337 return 338 } 339 } 340 appendOperation(newOp) 341} 342 343func RemoveTreeQueue(rootID string) { 344 dbQueueLock.Lock() 345 defer dbQueueLock.Unlock() 346 347 newOp := &dbQueueOperation{removeTreeID: rootID, inQueueTime: time.Now(), action: "delete_id"} 348 for i, op := range operationQueue { 349 if "delete_id" == op.action && op.removeTreeID == rootID { 350 operationQueue[i] = newOp 351 return 352 } 353 } 354 appendOperation(newOp) 355} 356 357func BatchRemoveTreeQueue(rootIDs []string) { 358 if 1 > len(rootIDs) { 359 return 360 } 361 362 dbQueueLock.Lock() 363 defer dbQueueLock.Unlock() 364 365 newOp := 
&dbQueueOperation{removeTreeIDs: rootIDs, inQueueTime: time.Now(), action: "delete_ids"} 366 appendOperation(newOp) 367} 368 369func RemoveTreePathQueue(treeBox, treePathPrefix string) { 370 dbQueueLock.Lock() 371 defer dbQueueLock.Unlock() 372 373 newOp := &dbQueueOperation{removeTreeBox: treeBox, removeTreePath: treePathPrefix, inQueueTime: time.Now(), action: "delete"} 374 for i, op := range operationQueue { 375 if "delete" == op.action && (op.removeTreeBox == treeBox && op.removeTreePath == treePathPrefix) { 376 operationQueue[i] = newOp 377 return 378 } 379 } 380 appendOperation(newOp) 381} 382 383func getOperations() (ops []*dbQueueOperation) { 384 dbQueueLock.Lock() 385 defer dbQueueLock.Unlock() 386 387 ops = operationQueue 388 operationQueue = nil 389 return 390} 391 392func appendOperation(op *dbQueueOperation) { 393 operationQueue = append(operationQueue, op) 394 eventbus.Publish(eventbus.EvtSQLIndexChanged) 395}