A privacy-first, self-hosted, fully open source personal knowledge management software, written in typescript and golang. (PERSONAL FORK)
at lambda-fork/main 916 lines 24 kB view raw
1// SiYuan - Refactor your thinking 2// Copyright (c) 2020-present, b3log.org 3// 4// This program is free software: you can redistribute it and/or modify 5// it under the terms of the GNU Affero General Public License as published by 6// the Free Software Foundation, either version 3 of the License, or 7// (at your option) any later version. 8// 9// This program is distributed in the hope that it will be useful, 10// but WITHOUT ANY WARRANTY; without even the implied warranty of 11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12// GNU Affero General Public License for more details. 13// 14// You should have received a copy of the GNU Affero General Public License 15// along with this program. If not, see <https://www.gnu.org/licenses/>. 16 17package model 18 19import ( 20 "errors" 21 "fmt" 22 "net/http" 23 "os" 24 "path/filepath" 25 "runtime" 26 "strings" 27 "sync" 28 "sync/atomic" 29 "time" 30 31 "github.com/88250/go-humanize" 32 "github.com/88250/gulu" 33 "github.com/88250/lute/html" 34 "github.com/gorilla/websocket" 35 "github.com/siyuan-note/dejavu" 36 "github.com/siyuan-note/dejavu/cloud" 37 "github.com/siyuan-note/logging" 38 "github.com/siyuan-note/siyuan/kernel/cache" 39 "github.com/siyuan-note/siyuan/kernel/conf" 40 "github.com/siyuan-note/siyuan/kernel/filesys" 41 "github.com/siyuan-note/siyuan/kernel/sql" 42 "github.com/siyuan-note/siyuan/kernel/treenode" 43 "github.com/siyuan-note/siyuan/kernel/util" 44) 45 46func SyncDataDownload() { 47 defer logging.Recover() 48 49 if !checkSync(false, false, true) { 50 return 51 } 52 53 util.BroadcastByType("main", "syncing", 0, Conf.Language(81), nil) 54 if !isProviderOnline(true) { // 这个操作比较耗时,所以要先推送 syncing 事件后再判断网络,这样才能给用户更即时的反馈 55 util.BroadcastByType("main", "syncing", 2, Conf.Language(28), nil) 56 return 57 } 58 59 lockSync() 60 defer unlockSync() 61 62 now := util.CurrentTimeMillis() 63 Conf.Sync.Synced = now 64 65 err := syncRepoDownload() 66 code := 1 67 if err != nil { 68 code = 2 69 } 70 util.BroadcastByType("main", "syncing", code, Conf.Sync.Stat, nil) 71} 72 73func SyncDataUpload() { 74 defer logging.Recover() 75 76 if !checkSync(false, false, true) { 77 return 78 } 79 80 util.BroadcastByType("main", "syncing", 0, Conf.Language(81), nil) 81 if !isProviderOnline(true) { // 这个操作比较耗时,所以要先推送 syncing 事件后再判断网络,这样才能给用户更即时的反馈 82 util.BroadcastByType("main", "syncing", 2, Conf.Language(28), nil) 83 return 84 } 85 86 lockSync() 87 defer unlockSync() 88 89 now := util.CurrentTimeMillis() 90 Conf.Sync.Synced = now 91 92 err := syncRepoUpload() 93 code := 1 94 if err != nil { 95 code = 2 96 } 97 util.BroadcastByType("main", "syncing", code, Conf.Sync.Stat, nil) 98 return 99} 100 101var ( 102 syncSameCount = atomic.Int32{} 103 autoSyncErrCount = 0 104 fixSyncInterval = 5 * time.Minute 105 106 syncPlanTimeLock = sync.Mutex{} 107 syncPlanTime = time.Now().Add(fixSyncInterval) 108 109 BootSyncSucc = -1 // -1:未执行,0:执行成功,1:执行失败 110 ExitSyncSucc = -1 111) 112 113func SyncDataJob() { 114 syncPlanTimeLock.Lock() 115 if time.Now().Before(syncPlanTime) { 116 syncPlanTimeLock.Unlock() 117 return 118 } 119 syncPlanTimeLock.Unlock() 120 121 SyncData(false) 122} 123 124func BootSyncData() { 125 defer logging.Recover() 126 127 if Conf.Sync.Perception { 128 connectSyncWebSocket() 129 } 130 131 if !checkSync(true, false, false) { 132 return 133 } 134 135 if !isProviderOnline(false) { 136 BootSyncSucc = 1 137 util.PushErrMsg(Conf.Language(76), 7000) 138 return 139 } 140 141 lockSync() 142 defer unlockSync() 143 144 util.IncBootProgress(3, "Syncing data from the cloud...") 145 BootSyncSucc = 0 146 logging.LogInfof("sync before boot") 147 148 now := util.CurrentTimeMillis() 149 Conf.Sync.Synced = now 150 util.BroadcastByType("main", "syncing", 0, Conf.Language(81), nil) 151 err := bootSyncRepo() 152 code := 1 153 if err != nil { 154 code = 2 155 } 156 util.BroadcastByType("main", "syncing", code, Conf.Sync.Stat, nil) 157 return 158} 159 160func SyncData(byHand bool) { 161 syncData(false, byHand) 162} 163 164func lockSync() { 165 syncLock.Lock() 166 isSyncing.Store(true) 167} 168 169func unlockSync() { 170 isSyncing.Store(false) 171 syncLock.Unlock() 172} 173 174func syncData(exit, byHand bool) { 175 defer logging.Recover() 176 177 if !checkSync(false, exit, byHand) { 178 return 179 } 180 181 lockSync() 182 defer unlockSync() 183 184 util.BroadcastByType("main", "syncing", 0, Conf.Language(81), nil) 185 if !exit && !isProviderOnline(byHand) { // 这个操作比较耗时,所以要先推送 syncing 事件后再判断网络,这样才能给用户更即时的反馈 186 util.BroadcastByType("main", "syncing", 2, Conf.Language(28), nil) 187 return 188 } 189 190 if exit { 191 ExitSyncSucc = 0 192 logging.LogInfof("sync before exit") 193 msgId := util.PushMsg(Conf.Language(81), 1000*60*15) 194 defer func() { 195 util.PushClearMsg(msgId) 196 }() 197 } 198 199 now := util.CurrentTimeMillis() 200 Conf.Sync.Synced = now 201 202 dataChanged, err := syncRepo(exit, byHand) 203 code := 1 204 if err != nil { 205 code = 2 206 } 207 util.BroadcastByType("main", "syncing", code, Conf.Sync.Stat, nil) 208 209 if nil == webSocketConn && Conf.Sync.Perception { 210 // 如果 websocket 连接已经断开,则重新连接 211 connectSyncWebSocket() 212 } 213 214 if 1 == Conf.Sync.Mode && nil != webSocketConn && Conf.Sync.Perception && dataChanged { 215 // 如果处于自动同步模式且不是由 WS 触发的同步,则通知其他设备上的内核进行同步 216 request := map[string]interface{}{ 217 "cmd": "synced", 218 "synced": Conf.Sync.Synced, 219 } 220 if writeErr := webSocketConn.WriteJSON(request); nil != writeErr { 221 logging.LogErrorf("write websocket message failed: %v", writeErr) 222 } 223 } 224 return 225} 226 227func checkSync(boot, exit, byHand bool) bool { 228 if 2 == Conf.Sync.Mode && !boot && !exit && !byHand { // 手动模式下只有启动和退出进行同步 229 return false 230 } 231 232 if 3 == Conf.Sync.Mode && !byHand { // 完全手动模式下只有手动进行同步 233 return false 234 } 235 236 if !Conf.Sync.Enabled { 237 if byHand { 238 util.PushMsg(Conf.Language(124), 5000) 239 } 240 return false 241 } 242 243 if !cloud.IsValidCloudDirName(Conf.Sync.CloudName) { 244 if byHand { 245 util.PushMsg(Conf.Language(123), 5000) 246 } 247 return false 248 } 249 250 switch Conf.Sync.Provider { 251 case conf.ProviderSiYuan: 252 if !IsSubscriber() { 253 return false 254 } 255 case conf.ProviderWebDAV, conf.ProviderS3, conf.ProviderLocal: 256 if !IsPaidUser() { 257 return false 258 } 259 } 260 261 if 7 < autoSyncErrCount && !byHand { 262 logging.LogErrorf("failed to auto-sync too many times, delay auto-sync 64 minutes") 263 util.PushErrMsg(Conf.Language(125), 1000*60*60) 264 planSyncAfter(64 * time.Minute) 265 return false 266 } 267 return true 268} 269 270// incReindex 增量重建索引。 271func incReindex(upserts, removes []string) (upsertRootIDs, removeRootIDs []string) { 272 upsertRootIDs = []string{} 273 removeRootIDs = []string{} 274 275 util.IncBootProgress(3, "Sync reindexing...") 276 removeRootIDs = removeIndexes(removes) // 先执行 remove,否则移动文档时 upsert 会被忽略,导致未被索引 277 upsertRootIDs = upsertIndexes(upserts) 278 279 if 1 > len(removeRootIDs) { 280 removeRootIDs = []string{} 281 } 282 if 1 > len(upsertRootIDs) { 283 upsertRootIDs = []string{} 284 } 285 return 286} 287 288func removeIndexes(removeFilePaths []string) (removeRootIDs []string) { 289 bootProgressPart := int32(10 / float64(len(removeFilePaths))) 290 for _, removeFile := range removeFilePaths { 291 if !strings.HasSuffix(removeFile, ".sy") { 292 continue 293 } 294 295 id := util.GetTreeID(removeFile) 296 removeRootIDs = append(removeRootIDs, id) 297 block := treenode.GetBlockTree(id) 298 if nil != block { 299 msg := fmt.Sprintf(Conf.Language(39), block.RootID) 300 util.IncBootProgress(bootProgressPart, msg) 301 util.PushStatusBar(msg) 302 303 bts := treenode.GetBlockTreesByRootID(block.RootID) 304 for _, b := range bts { 305 cache.RemoveBlockIAL(b.ID) 306 } 307 cache.RemoveDocIAL(block.Path) 308 309 treenode.RemoveBlockTreesByRootID(block.RootID) 310 sql.RemoveTreeQueue(block.RootID) 311 } 312 } 313 314 if 1 > len(removeRootIDs) { 315 removeRootIDs = []string{} 316 } 317 return 318} 319 320func upsertIndexes(upsertFilePaths []string) (upsertRootIDs []string) { 321 luteEngine := util.NewLute() 322 bootProgressPart := int32(10 / float64(len(upsertFilePaths))) 323 for _, upsertFile := range upsertFilePaths { 324 if !strings.HasSuffix(upsertFile, ".sy") { 325 continue 326 } 327 328 upsertFile = filepath.ToSlash(upsertFile) 329 if strings.HasPrefix(upsertFile, "/") { 330 upsertFile = upsertFile[1:] 331 } 332 idx := strings.Index(upsertFile, "/") 333 if 0 > idx { 334 // .sy 直接出现在 data 文件夹下,没有出现在笔记本文件夹下的情况 335 continue 336 } 337 338 box := upsertFile[:idx] 339 p := strings.TrimPrefix(upsertFile, box) 340 msg := fmt.Sprintf(Conf.Language(40), util.GetTreeID(p)) 341 util.IncBootProgress(bootProgressPart, msg) 342 util.PushStatusBar(msg) 343 344 tree, err0 := filesys.LoadTree(box, p, luteEngine) 345 if nil != err0 { 346 continue 347 } 348 treenode.UpsertBlockTree(tree) 349 sql.UpsertTreeQueue(tree) 350 351 bts := treenode.GetBlockTreesByRootID(tree.ID) 352 for _, b := range bts { 353 cache.RemoveBlockIAL(b.ID) 354 } 355 cache.RemoveDocIAL(tree.Path) 356 357 upsertRootIDs = append(upsertRootIDs, tree.Root.ID) 358 } 359 360 if 1 > len(upsertRootIDs) { 361 upsertRootIDs = []string{} 362 } 363 return 364} 365 366func SetCloudSyncDir(name string) { 367 if !cloud.IsValidCloudDirName(name) { 368 util.PushErrMsg(Conf.Language(37), 5000) 369 return 370 } 371 372 if Conf.Sync.CloudName == name { 373 return 374 } 375 376 Conf.Sync.CloudName = name 377 Conf.Save() 378} 379 380func SetSyncGenerateConflictDoc(b bool) { 381 Conf.Sync.GenerateConflictDoc = b 382 Conf.Save() 383 return 384} 385 386func SetSyncEnable(b bool) { 387 Conf.Sync.Enabled = b 388 Conf.Save() 389 return 390} 391 392func SetSyncInterval(interval int) { 393 if 30 > interval { 394 interval = 30 395 } 396 if 43200 < interval { 397 interval = 43200 398 } 399 400 Conf.Sync.Interval = interval 401 Conf.Save() 402 planSyncAfter(time.Duration(interval) * time.Second) 403 return 404} 405 406func SetSyncPerception(b bool) { 407 if util.ContainerDocker == util.Container { 408 b = false 409 } 410 411 Conf.Sync.Perception = b 412 Conf.Save() 413 414 if b { 415 connectSyncWebSocket() 416 } else { 417 closeSyncWebSocket() 418 } 419 return 420} 421 422func SetSyncMode(mode int) { 423 Conf.Sync.Mode = mode 424 Conf.Save() 425 return 426} 427 428func SetSyncProvider(provider int) (err error) { 429 Conf.Sync.Provider = provider 430 Conf.Save() 431 return 432} 433 434func SetSyncProviderS3(s3 *conf.S3) (err error) { 435 s3.Endpoint = strings.TrimSpace(s3.Endpoint) 436 s3.Endpoint = util.NormalizeEndpoint(s3.Endpoint) 437 s3.AccessKey = strings.TrimSpace(s3.AccessKey) 438 s3.SecretKey = strings.TrimSpace(s3.SecretKey) 439 s3.Bucket = strings.TrimSpace(s3.Bucket) 440 s3.Region = strings.TrimSpace(s3.Region) 441 s3.Timeout = util.NormalizeTimeout(s3.Timeout) 442 s3.ConcurrentReqs = util.NormalizeConcurrentReqs(s3.ConcurrentReqs, conf.ProviderS3) 443 444 if !cloud.IsValidCloudDirName(s3.Bucket) { 445 util.PushErrMsg(Conf.Language(37), 5000) 446 return 447 } 448 449 Conf.Sync.S3 = s3 450 Conf.Save() 451 return 452} 453 454func SetSyncProviderWebDAV(webdav *conf.WebDAV) (err error) { 455 webdav.Endpoint = strings.TrimSpace(webdav.Endpoint) 456 webdav.Endpoint = util.NormalizeEndpoint(webdav.Endpoint) 457 458 // 不支持配置坚果云 WebDAV 进行同步 https://github.com/siyuan-note/siyuan/issues/7657 459 if strings.Contains(strings.ToLower(webdav.Endpoint), "dav.jianguoyun.com") { 460 err = errors.New(Conf.Language(194)) 461 return 462 } 463 464 webdav.Username = strings.TrimSpace(webdav.Username) 465 webdav.Password = strings.TrimSpace(webdav.Password) 466 webdav.Timeout = util.NormalizeTimeout(webdav.Timeout) 467 webdav.ConcurrentReqs = util.NormalizeConcurrentReqs(webdav.ConcurrentReqs, conf.ProviderWebDAV) 468 469 Conf.Sync.WebDAV = webdav 470 Conf.Save() 471 return 472} 473 474func SetSyncProviderLocal(local *conf.Local) (err error) { 475 local.Endpoint = strings.TrimSpace(local.Endpoint) 476 local.Endpoint = util.NormalizeLocalPath(local.Endpoint) 477 478 absPath, err := filepath.Abs(local.Endpoint) 479 if nil != err { 480 msg := fmt.Sprintf("get endpoint [%s] abs path failed: %s", local.Endpoint, err) 481 logging.LogErrorf(msg) 482 err = errors.New(fmt.Sprintf(Conf.Language(77), msg)) 483 return 484 } 485 if !gulu.File.IsExist(absPath) { 486 msg := fmt.Sprintf("endpoint [%s] not exist", local.Endpoint) 487 logging.LogErrorf(msg) 488 err = errors.New(fmt.Sprintf(Conf.Language(77), msg)) 489 return 490 } 491 if util.IsAbsPathInWorkspace(absPath) || filepath.Clean(absPath) == filepath.Clean(util.WorkspaceDir) { 492 msg := fmt.Sprintf("endpoint [%s] is in workspace", local.Endpoint) 493 logging.LogErrorf(msg) 494 err = errors.New(fmt.Sprintf(Conf.Language(77), msg)) 495 return 496 } 497 498 if util.IsSubPath(absPath, util.WorkspaceDir) { 499 msg := fmt.Sprintf("endpoint [%s] is parent of workspace", local.Endpoint) 500 logging.LogErrorf(msg) 501 err = errors.New(fmt.Sprintf(Conf.Language(77), msg)) 502 return 503 } 504 505 local.Timeout = util.NormalizeTimeout(local.Timeout) 506 local.ConcurrentReqs = util.NormalizeConcurrentReqs(local.ConcurrentReqs, conf.ProviderLocal) 507 508 Conf.Sync.Local = local 509 Conf.Save() 510 return 511} 512 513var ( 514 syncLock = sync.Mutex{} 515 isSyncing = atomic.Bool{} 516) 517 518func CreateCloudSyncDir(name string) (err error) { 519 switch Conf.Sync.Provider { 520 case conf.ProviderSiYuan, conf.ProviderLocal: 521 break 522 default: 523 err = errors.New(Conf.Language(131)) 524 return 525 } 526 527 name = strings.TrimSpace(name) 528 name = util.RemoveInvalid(name) 529 if !cloud.IsValidCloudDirName(name) { 530 return errors.New(Conf.Language(37)) 531 } 532 533 repo, err := newRepository() 534 if err != nil { 535 return 536 } 537 538 err = repo.CreateCloudRepo(name) 539 if err != nil { 540 err = errors.New(formatRepoErrorMsg(err)) 541 return 542 } 543 return 544} 545 546func RemoveCloudSyncDir(name string) (err error) { 547 switch Conf.Sync.Provider { 548 case conf.ProviderSiYuan, conf.ProviderLocal: 549 break 550 default: 551 err = errors.New(Conf.Language(131)) 552 return 553 } 554 555 msgId := util.PushMsg(Conf.Language(116), 15000) 556 557 if "" == name { 558 return 559 } 560 561 repo, err := newRepository() 562 if err != nil { 563 return 564 } 565 566 err = repo.RemoveCloudRepo(name) 567 if err != nil { 568 err = errors.New(formatRepoErrorMsg(err)) 569 return 570 } 571 572 util.PushClearMsg(msgId) 573 time.Sleep(500 * time.Millisecond) 574 if Conf.Sync.CloudName == name { 575 Conf.Sync.CloudName = "main" 576 Conf.Save() 577 util.PushMsg(Conf.Language(155), 5000) 578 } 579 return 580} 581 582func ListCloudSyncDir() (syncDirs []*Sync, hSize string, err error) { 583 syncDirs = []*Sync{} 584 var dirs []*cloud.Repo 585 var size int64 586 587 repo, err := newRepository() 588 if err != nil { 589 return 590 } 591 592 dirs, size, err = repo.GetCloudRepos() 593 if err != nil { 594 err = errors.New(formatRepoErrorMsg(err)) 595 return 596 } 597 if 1 > len(dirs) { 598 dirs = append(dirs, &cloud.Repo{ 599 Name: "main", 600 Size: 0, 601 Updated: time.Now().Format("2006-01-02 15:04:05"), 602 }) 603 } 604 605 for _, d := range dirs { 606 dirSize := d.Size 607 sync := &Sync{ 608 Size: dirSize, 609 HSize: "-", 610 Updated: d.Updated, 611 CloudName: d.Name, 612 } 613 if conf.ProviderSiYuan == Conf.Sync.Provider { 614 sync.HSize = humanize.BytesCustomCeil(uint64(dirSize), 2) 615 } 616 syncDirs = append(syncDirs, sync) 617 } 618 hSize = "-" 619 if conf.ProviderSiYuan == Conf.Sync.Provider { 620 hSize = humanize.BytesCustomCeil(uint64(size), 2) 621 } 622 if conf.ProviderS3 == Conf.Sync.Provider { 623 Conf.Sync.CloudName = syncDirs[0].CloudName 624 Conf.Save() 625 } 626 return 627} 628 629func formatRepoErrorMsg(err error) string { 630 msg := html.EscapeString(err.Error()) 631 if errors.Is(err, cloud.ErrCloudAuthFailed) { 632 msg = Conf.Language(31) 633 } else if errors.Is(err, cloud.ErrCloudObjectNotFound) { 634 msg = Conf.Language(129) 635 } else if errors.Is(err, dejavu.ErrLockCloudFailed) { 636 msg = Conf.Language(188) 637 } else if errors.Is(err, dejavu.ErrCloudLocked) { 638 msg = Conf.Language(189) 639 } else if errors.Is(err, dejavu.ErrRepoFatal) { 640 msg = Conf.Language(23) 641 } else if errors.Is(err, cloud.ErrSystemTimeIncorrect) { 642 msg = Conf.Language(195) 643 } else if errors.Is(err, cloud.ErrDeprecatedVersion) { 644 msg = Conf.Language(212) 645 } else if errors.Is(err, cloud.ErrCloudCheckFailed) { 646 msg = Conf.Language(213) 647 } else if errors.Is(err, cloud.ErrCloudServiceUnavailable) { 648 msg = Conf.language(219) 649 } else if errors.Is(err, cloud.ErrCloudForbidden) { 650 msg = Conf.language(249) 651 } else if errors.Is(err, cloud.ErrCloudTooManyRequests) { 652 msg = Conf.language(250) 653 } else { 654 logging.LogErrorf("sync failed caused by network: %s", msg) 655 msgLowerCase := strings.ToLower(msg) 656 if strings.Contains(msgLowerCase, "permission denied") || strings.Contains(msg, "access is denied") { 657 msg = Conf.Language(33) 658 } else if strings.Contains(msgLowerCase, "region was not a valid") { 659 msg = Conf.language(254) 660 } else if strings.Contains(msgLowerCase, "device or resource busy") || strings.Contains(msg, "is being used by another") { 661 msg = fmt.Sprintf(Conf.Language(85), err) 662 } else if strings.Contains(msgLowerCase, "cipher: message authentication failed") { 663 msg = Conf.Language(135) 664 } else if strings.Contains(msgLowerCase, "no such host") || strings.Contains(msgLowerCase, "connection failed") || strings.Contains(msgLowerCase, "hostname resolution") || strings.Contains(msgLowerCase, "No address associated with hostname") { 665 msg = Conf.Language(24) 666 } else if strings.Contains(msgLowerCase, "net/http: request canceled while waiting for connection") || strings.Contains(msgLowerCase, "exceeded while awaiting") || strings.Contains(msgLowerCase, "context deadline exceeded") || strings.Contains(msgLowerCase, "timeout") || strings.Contains(msgLowerCase, "context cancellation while reading body") { 667 msg = Conf.Language(24) 668 } else if strings.Contains(msgLowerCase, "connection") || strings.Contains(msgLowerCase, "refused") || strings.Contains(msgLowerCase, "socket") || strings.Contains(msgLowerCase, "eof") || strings.Contains(msgLowerCase, "closed") || strings.Contains(msgLowerCase, "network") { 669 msg = Conf.Language(28) 670 } 671 } 672 msg += " (Provider: " + conf.ProviderToStr(Conf.Sync.Provider) + ")" 673 return msg 674} 675 676func getSyncIgnoreLines() (ret []string) { 677 ignore := filepath.Join(util.DataDir, ".siyuan", "syncignore") 678 err := os.MkdirAll(filepath.Dir(ignore), 0755) 679 if err != nil { 680 return 681 } 682 if !gulu.File.IsExist(ignore) { 683 if err = gulu.File.WriteFileSafer(ignore, nil, 0644); err != nil { 684 logging.LogErrorf("create syncignore [%s] failed: %s", ignore, err) 685 return 686 } 687 } 688 data, err := os.ReadFile(ignore) 689 if err != nil { 690 logging.LogErrorf("read syncignore [%s] failed: %s", ignore, err) 691 return 692 } 693 dataStr := string(data) 694 dataStr = strings.ReplaceAll(dataStr, "\r\n", "\n") 695 ret = strings.Split(dataStr, "\n") 696 697 // 默认忽略帮助文档 698 ret = append(ret, "20210808180117-6v0mkxr/**/*") 699 ret = append(ret, "20210808180117-czj9bvb/**/*") 700 ret = append(ret, "20211226090932-5lcq56f/**/*") 701 ret = append(ret, "20240530133126-axarxgx/**/*") 702 703 ret = gulu.Str.RemoveDuplicatedElem(ret) 704 return 705} 706 707func IncSync() { 708 syncSameCount.Store(0) 709 planSyncAfter(time.Duration(Conf.Sync.Interval) * time.Second) 710} 711 712func planSyncAfter(d time.Duration) { 713 syncPlanTimeLock.Lock() 714 syncPlanTime = time.Now().Add(d) 715 syncPlanTimeLock.Unlock() 716} 717 718func isProviderOnline(byHand bool) (ret bool) { 719 var checkURL string 720 skipTlsVerify := false 721 timeout := 3000 722 switch Conf.Sync.Provider { 723 case conf.ProviderSiYuan: 724 checkURL = util.GetCloudSyncServer() 725 case conf.ProviderS3: 726 checkURL = Conf.Sync.S3.Endpoint 727 skipTlsVerify = Conf.Sync.S3.SkipTlsVerify 728 timeout = Conf.Sync.S3.Timeout * 1000 729 case conf.ProviderWebDAV: 730 checkURL = Conf.Sync.WebDAV.Endpoint 731 skipTlsVerify = Conf.Sync.WebDAV.SkipTlsVerify 732 timeout = Conf.Sync.WebDAV.Timeout * 1000 733 case conf.ProviderLocal: 734 checkURL = "file://" + Conf.Sync.Local.Endpoint 735 timeout = Conf.Sync.Local.Timeout * 1000 736 default: 737 logging.LogWarnf("unknown provider: %d", Conf.Sync.Provider) 738 return false 739 } 740 741 if ret = util.IsOnline(checkURL, skipTlsVerify, timeout); !ret { 742 if 1 > autoSyncErrCount || byHand { 743 util.PushErrMsg(Conf.Language(76)+" (Provider: "+conf.ProviderToStr(Conf.Sync.Provider)+")", 5000) 744 } 745 if !byHand { 746 planSyncAfter(fixSyncInterval) 747 autoSyncErrCount++ 748 } 749 } 750 return 751} 752 753var ( 754 webSocketConn *websocket.Conn 755 webSocketConnLock = sync.Mutex{} 756) 757 758type OnlineKernel struct { 759 ID string `json:"id"` 760 Hostname string `json:"hostname"` 761 OS string `json:"os"` 762 Ver string `json:"ver"` 763} 764 765var ( 766 onlineKernels []*OnlineKernel 767 onlineKernelsLock = sync.Mutex{} 768) 769 770func GetOnlineKernels() (ret []*OnlineKernel) { 771 ret = []*OnlineKernel{} 772 onlineKernelsLock.Lock() 773 tmp := onlineKernels 774 onlineKernelsLock.Unlock() 775 for _, kernel := range tmp { 776 if kernel.ID == KernelID { 777 continue 778 } 779 780 ret = append(ret, kernel) 781 } 782 return 783} 784 785var closedSyncWebSocket = atomic.Bool{} 786 787func closeSyncWebSocket() { 788 defer logging.Recover() 789 790 webSocketConnLock.Lock() 791 defer webSocketConnLock.Unlock() 792 793 if nil != webSocketConn { 794 webSocketConn.Close() 795 webSocketConn = nil 796 closedSyncWebSocket.Store(true) 797 } 798 799 logging.LogInfof("sync websocket closed") 800} 801 802func connectSyncWebSocket() { 803 defer logging.Recover() 804 805 if !Conf.Sync.Enabled || !IsSubscriber() || conf.ProviderSiYuan != Conf.Sync.Provider { 806 return 807 } 808 809 if util.ContainerDocker == util.Container { 810 return 811 } 812 813 webSocketConnLock.Lock() 814 defer webSocketConnLock.Unlock() 815 816 if nil != webSocketConn { 817 return 818 } 819 820 //logging.LogInfof("connecting sync websocket...") 821 var dialErr error 822 webSocketConn, dialErr = dialSyncWebSocket() 823 if nil != dialErr { 824 logging.LogWarnf("connect sync websocket failed: %s", dialErr) 825 return 826 } 827 logging.LogInfof("sync websocket connected") 828 829 webSocketConn.SetCloseHandler(func(code int, text string) error { 830 logging.LogWarnf("sync websocket closed: %d, %s", code, text) 831 return nil 832 }) 833 834 go func() { 835 defer logging.Recover() 836 837 for { 838 result := gulu.Ret.NewResult() 839 if readErr := webSocketConn.ReadJSON(&result); nil != readErr { 840 time.Sleep(1 * time.Second) 841 if closedSyncWebSocket.Load() { 842 return 843 } 844 845 reconnected := false 846 for retries := 0; retries < 7; retries++ { 847 time.Sleep(7 * time.Second) 848 if nil == Conf.GetUser() { 849 return 850 } 851 852 //logging.LogInfof("reconnecting sync websocket...") 853 webSocketConn, dialErr = dialSyncWebSocket() 854 if nil != dialErr { 855 logging.LogWarnf("reconnect sync websocket failed: %s", dialErr) 856 continue 857 } 858 859 logging.LogInfof("sync websocket reconnected") 860 reconnected = true 861 break 862 } 863 if !reconnected { 864 logging.LogWarnf("reconnect sync websocket failed, do not retry") 865 webSocketConn = nil 866 return 867 } 868 869 continue 870 } 871 872 logging.LogInfof("sync websocket message: %v", result) 873 data := result.Data.(map[string]interface{}) 874 switch data["cmd"].(string) { 875 case "synced": 876 // Improve data synchronization perception https://github.com/siyuan-note/siyuan/issues/13000 877 SyncDataDownload() 878 case "kernels": 879 onlineKernelsLock.Lock() 880 881 onlineKernels = []*OnlineKernel{} 882 for _, kernel := range data["kernels"].([]interface{}) { 883 kernelMap := kernel.(map[string]interface{}) 884 onlineKernels = append(onlineKernels, &OnlineKernel{ 885 ID: kernelMap["id"].(string), 886 Hostname: kernelMap["hostname"].(string), 887 OS: kernelMap["os"].(string), 888 Ver: kernelMap["ver"].(string), 889 }) 890 } 891 892 onlineKernelsLock.Unlock() 893 } 894 } 895 }() 896} 897 898var KernelID = gulu.Rand.String(7) 899 900func dialSyncWebSocket() (c *websocket.Conn, err error) { 901 endpoint := util.GetCloudWebSocketServer() + "/apis/siyuan/dejavu/ws" 902 header := http.Header{ 903 "User-Agent": []string{util.UserAgent}, 904 "x-siyuan-uid": []string{Conf.GetUser().UserId}, 905 "x-siyuan-kernel": []string{KernelID}, 906 "x-siyuan-ver": []string{util.Ver}, 907 "x-siyuan-os": []string{runtime.GOOS}, 908 "x-siyuan-hostname": []string{util.GetDeviceName()}, 909 "x-siyuan-repo": []string{Conf.Sync.CloudName}, 910 } 911 c, _, err = websocket.DefaultDialer.Dial(endpoint, header) 912 if err == nil { 913 closedSyncWebSocket.Store(false) 914 } 915 return 916}