A privacy-first, self-hosted, fully open source personal knowledge management software, written in typescript and golang. (PERSONAL FORK)
1// SiYuan - Refactor your thinking
2// Copyright (c) 2020-present, b3log.org
3//
4// This program is free software: you can redistribute it and/or modify
5// it under the terms of the GNU Affero General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8//
9// This program is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12// GNU Affero General Public License for more details.
13//
14// You should have received a copy of the GNU Affero General Public License
15// along with this program. If not, see <https://www.gnu.org/licenses/>.
16
17package model
18
19import (
20 "errors"
21 "fmt"
22 "net/http"
23 "os"
24 "path/filepath"
25 "runtime"
26 "strings"
27 "sync"
28 "sync/atomic"
29 "time"
30
31 "github.com/88250/go-humanize"
32 "github.com/88250/gulu"
33 "github.com/88250/lute/html"
34 "github.com/gorilla/websocket"
35 "github.com/siyuan-note/dejavu"
36 "github.com/siyuan-note/dejavu/cloud"
37 "github.com/siyuan-note/logging"
38 "github.com/siyuan-note/siyuan/kernel/cache"
39 "github.com/siyuan-note/siyuan/kernel/conf"
40 "github.com/siyuan-note/siyuan/kernel/filesys"
41 "github.com/siyuan-note/siyuan/kernel/sql"
42 "github.com/siyuan-note/siyuan/kernel/treenode"
43 "github.com/siyuan-note/siyuan/kernel/util"
44)
45
46func SyncDataDownload() {
47 defer logging.Recover()
48
49 if !checkSync(false, false, true) {
50 return
51 }
52
53 util.BroadcastByType("main", "syncing", 0, Conf.Language(81), nil)
54 if !isProviderOnline(true) { // 这个操作比较耗时,所以要先推送 syncing 事件后再判断网络,这样才能给用户更即时的反馈
55 util.BroadcastByType("main", "syncing", 2, Conf.Language(28), nil)
56 return
57 }
58
59 lockSync()
60 defer unlockSync()
61
62 now := util.CurrentTimeMillis()
63 Conf.Sync.Synced = now
64
65 err := syncRepoDownload()
66 code := 1
67 if err != nil {
68 code = 2
69 }
70 util.BroadcastByType("main", "syncing", code, Conf.Sync.Stat, nil)
71}
72
73func SyncDataUpload() {
74 defer logging.Recover()
75
76 if !checkSync(false, false, true) {
77 return
78 }
79
80 util.BroadcastByType("main", "syncing", 0, Conf.Language(81), nil)
81 if !isProviderOnline(true) { // 这个操作比较耗时,所以要先推送 syncing 事件后再判断网络,这样才能给用户更即时的反馈
82 util.BroadcastByType("main", "syncing", 2, Conf.Language(28), nil)
83 return
84 }
85
86 lockSync()
87 defer unlockSync()
88
89 now := util.CurrentTimeMillis()
90 Conf.Sync.Synced = now
91
92 err := syncRepoUpload()
93 code := 1
94 if err != nil {
95 code = 2
96 }
97 util.BroadcastByType("main", "syncing", code, Conf.Sync.Stat, nil)
98 return
99}
100
101var (
102 syncSameCount = atomic.Int32{}
103 autoSyncErrCount = 0
104 fixSyncInterval = 5 * time.Minute
105
106 syncPlanTimeLock = sync.Mutex{}
107 syncPlanTime = time.Now().Add(fixSyncInterval)
108
109 BootSyncSucc = -1 // -1:未执行,0:执行成功,1:执行失败
110 ExitSyncSucc = -1
111)
112
113func SyncDataJob() {
114 syncPlanTimeLock.Lock()
115 if time.Now().Before(syncPlanTime) {
116 syncPlanTimeLock.Unlock()
117 return
118 }
119 syncPlanTimeLock.Unlock()
120
121 SyncData(false)
122}
123
124func BootSyncData() {
125 defer logging.Recover()
126
127 if Conf.Sync.Perception {
128 connectSyncWebSocket()
129 }
130
131 if !checkSync(true, false, false) {
132 return
133 }
134
135 if !isProviderOnline(false) {
136 BootSyncSucc = 1
137 util.PushErrMsg(Conf.Language(76), 7000)
138 return
139 }
140
141 lockSync()
142 defer unlockSync()
143
144 util.IncBootProgress(3, "Syncing data from the cloud...")
145 BootSyncSucc = 0
146 logging.LogInfof("sync before boot")
147
148 now := util.CurrentTimeMillis()
149 Conf.Sync.Synced = now
150 util.BroadcastByType("main", "syncing", 0, Conf.Language(81), nil)
151 err := bootSyncRepo()
152 code := 1
153 if err != nil {
154 code = 2
155 }
156 util.BroadcastByType("main", "syncing", code, Conf.Sync.Stat, nil)
157 return
158}
159
160func SyncData(byHand bool) {
161 syncData(false, byHand)
162}
163
164func lockSync() {
165 syncLock.Lock()
166 isSyncing.Store(true)
167}
168
169func unlockSync() {
170 isSyncing.Store(false)
171 syncLock.Unlock()
172}
173
174func syncData(exit, byHand bool) {
175 defer logging.Recover()
176
177 if !checkSync(false, exit, byHand) {
178 return
179 }
180
181 lockSync()
182 defer unlockSync()
183
184 util.BroadcastByType("main", "syncing", 0, Conf.Language(81), nil)
185 if !exit && !isProviderOnline(byHand) { // 这个操作比较耗时,所以要先推送 syncing 事件后再判断网络,这样才能给用户更即时的反馈
186 util.BroadcastByType("main", "syncing", 2, Conf.Language(28), nil)
187 return
188 }
189
190 if exit {
191 ExitSyncSucc = 0
192 logging.LogInfof("sync before exit")
193 msgId := util.PushMsg(Conf.Language(81), 1000*60*15)
194 defer func() {
195 util.PushClearMsg(msgId)
196 }()
197 }
198
199 now := util.CurrentTimeMillis()
200 Conf.Sync.Synced = now
201
202 dataChanged, err := syncRepo(exit, byHand)
203 code := 1
204 if err != nil {
205 code = 2
206 }
207 util.BroadcastByType("main", "syncing", code, Conf.Sync.Stat, nil)
208
209 if nil == webSocketConn && Conf.Sync.Perception {
210 // 如果 websocket 连接已经断开,则重新连接
211 connectSyncWebSocket()
212 }
213
214 if 1 == Conf.Sync.Mode && nil != webSocketConn && Conf.Sync.Perception && dataChanged {
215 // 如果处于自动同步模式且不是由 WS 触发的同步,则通知其他设备上的内核进行同步
216 request := map[string]interface{}{
217 "cmd": "synced",
218 "synced": Conf.Sync.Synced,
219 }
220 if writeErr := webSocketConn.WriteJSON(request); nil != writeErr {
221 logging.LogErrorf("write websocket message failed: %v", writeErr)
222 }
223 }
224 return
225}
226
227func checkSync(boot, exit, byHand bool) bool {
228 if 2 == Conf.Sync.Mode && !boot && !exit && !byHand { // 手动模式下只有启动和退出进行同步
229 return false
230 }
231
232 if 3 == Conf.Sync.Mode && !byHand { // 完全手动模式下只有手动进行同步
233 return false
234 }
235
236 if !Conf.Sync.Enabled {
237 if byHand {
238 util.PushMsg(Conf.Language(124), 5000)
239 }
240 return false
241 }
242
243 if !cloud.IsValidCloudDirName(Conf.Sync.CloudName) {
244 if byHand {
245 util.PushMsg(Conf.Language(123), 5000)
246 }
247 return false
248 }
249
250 switch Conf.Sync.Provider {
251 case conf.ProviderSiYuan:
252 if !IsSubscriber() {
253 return false
254 }
255 case conf.ProviderWebDAV, conf.ProviderS3, conf.ProviderLocal:
256 if !IsPaidUser() {
257 return false
258 }
259 }
260
261 if 7 < autoSyncErrCount && !byHand {
262 logging.LogErrorf("failed to auto-sync too many times, delay auto-sync 64 minutes")
263 util.PushErrMsg(Conf.Language(125), 1000*60*60)
264 planSyncAfter(64 * time.Minute)
265 return false
266 }
267 return true
268}
269
270// incReindex 增量重建索引。
271func incReindex(upserts, removes []string) (upsertRootIDs, removeRootIDs []string) {
272 upsertRootIDs = []string{}
273 removeRootIDs = []string{}
274
275 util.IncBootProgress(3, "Sync reindexing...")
276 removeRootIDs = removeIndexes(removes) // 先执行 remove,否则移动文档时 upsert 会被忽略,导致未被索引
277 upsertRootIDs = upsertIndexes(upserts)
278
279 if 1 > len(removeRootIDs) {
280 removeRootIDs = []string{}
281 }
282 if 1 > len(upsertRootIDs) {
283 upsertRootIDs = []string{}
284 }
285 return
286}
287
288func removeIndexes(removeFilePaths []string) (removeRootIDs []string) {
289 bootProgressPart := int32(10 / float64(len(removeFilePaths)))
290 for _, removeFile := range removeFilePaths {
291 if !strings.HasSuffix(removeFile, ".sy") {
292 continue
293 }
294
295 id := util.GetTreeID(removeFile)
296 removeRootIDs = append(removeRootIDs, id)
297 block := treenode.GetBlockTree(id)
298 if nil != block {
299 msg := fmt.Sprintf(Conf.Language(39), block.RootID)
300 util.IncBootProgress(bootProgressPart, msg)
301 util.PushStatusBar(msg)
302
303 bts := treenode.GetBlockTreesByRootID(block.RootID)
304 for _, b := range bts {
305 cache.RemoveBlockIAL(b.ID)
306 }
307 cache.RemoveDocIAL(block.Path)
308
309 treenode.RemoveBlockTreesByRootID(block.RootID)
310 sql.RemoveTreeQueue(block.RootID)
311 }
312 }
313
314 if 1 > len(removeRootIDs) {
315 removeRootIDs = []string{}
316 }
317 return
318}
319
320func upsertIndexes(upsertFilePaths []string) (upsertRootIDs []string) {
321 luteEngine := util.NewLute()
322 bootProgressPart := int32(10 / float64(len(upsertFilePaths)))
323 for _, upsertFile := range upsertFilePaths {
324 if !strings.HasSuffix(upsertFile, ".sy") {
325 continue
326 }
327
328 upsertFile = filepath.ToSlash(upsertFile)
329 if strings.HasPrefix(upsertFile, "/") {
330 upsertFile = upsertFile[1:]
331 }
332 idx := strings.Index(upsertFile, "/")
333 if 0 > idx {
334 // .sy 直接出现在 data 文件夹下,没有出现在笔记本文件夹下的情况
335 continue
336 }
337
338 box := upsertFile[:idx]
339 p := strings.TrimPrefix(upsertFile, box)
340 msg := fmt.Sprintf(Conf.Language(40), util.GetTreeID(p))
341 util.IncBootProgress(bootProgressPart, msg)
342 util.PushStatusBar(msg)
343
344 tree, err0 := filesys.LoadTree(box, p, luteEngine)
345 if nil != err0 {
346 continue
347 }
348 treenode.UpsertBlockTree(tree)
349 sql.UpsertTreeQueue(tree)
350
351 bts := treenode.GetBlockTreesByRootID(tree.ID)
352 for _, b := range bts {
353 cache.RemoveBlockIAL(b.ID)
354 }
355 cache.RemoveDocIAL(tree.Path)
356
357 upsertRootIDs = append(upsertRootIDs, tree.Root.ID)
358 }
359
360 if 1 > len(upsertRootIDs) {
361 upsertRootIDs = []string{}
362 }
363 return
364}
365
366func SetCloudSyncDir(name string) {
367 if !cloud.IsValidCloudDirName(name) {
368 util.PushErrMsg(Conf.Language(37), 5000)
369 return
370 }
371
372 if Conf.Sync.CloudName == name {
373 return
374 }
375
376 Conf.Sync.CloudName = name
377 Conf.Save()
378}
379
380func SetSyncGenerateConflictDoc(b bool) {
381 Conf.Sync.GenerateConflictDoc = b
382 Conf.Save()
383 return
384}
385
386func SetSyncEnable(b bool) {
387 Conf.Sync.Enabled = b
388 Conf.Save()
389 return
390}
391
392func SetSyncInterval(interval int) {
393 if 30 > interval {
394 interval = 30
395 }
396 if 43200 < interval {
397 interval = 43200
398 }
399
400 Conf.Sync.Interval = interval
401 Conf.Save()
402 planSyncAfter(time.Duration(interval) * time.Second)
403 return
404}
405
406func SetSyncPerception(b bool) {
407 if util.ContainerDocker == util.Container {
408 b = false
409 }
410
411 Conf.Sync.Perception = b
412 Conf.Save()
413
414 if b {
415 connectSyncWebSocket()
416 } else {
417 closeSyncWebSocket()
418 }
419 return
420}
421
422func SetSyncMode(mode int) {
423 Conf.Sync.Mode = mode
424 Conf.Save()
425 return
426}
427
428func SetSyncProvider(provider int) (err error) {
429 Conf.Sync.Provider = provider
430 Conf.Save()
431 return
432}
433
434func SetSyncProviderS3(s3 *conf.S3) (err error) {
435 s3.Endpoint = strings.TrimSpace(s3.Endpoint)
436 s3.Endpoint = util.NormalizeEndpoint(s3.Endpoint)
437 s3.AccessKey = strings.TrimSpace(s3.AccessKey)
438 s3.SecretKey = strings.TrimSpace(s3.SecretKey)
439 s3.Bucket = strings.TrimSpace(s3.Bucket)
440 s3.Region = strings.TrimSpace(s3.Region)
441 s3.Timeout = util.NormalizeTimeout(s3.Timeout)
442 s3.ConcurrentReqs = util.NormalizeConcurrentReqs(s3.ConcurrentReqs, conf.ProviderS3)
443
444 if !cloud.IsValidCloudDirName(s3.Bucket) {
445 util.PushErrMsg(Conf.Language(37), 5000)
446 return
447 }
448
449 Conf.Sync.S3 = s3
450 Conf.Save()
451 return
452}
453
454func SetSyncProviderWebDAV(webdav *conf.WebDAV) (err error) {
455 webdav.Endpoint = strings.TrimSpace(webdav.Endpoint)
456 webdav.Endpoint = util.NormalizeEndpoint(webdav.Endpoint)
457
458 // 不支持配置坚果云 WebDAV 进行同步 https://github.com/siyuan-note/siyuan/issues/7657
459 if strings.Contains(strings.ToLower(webdav.Endpoint), "dav.jianguoyun.com") {
460 err = errors.New(Conf.Language(194))
461 return
462 }
463
464 webdav.Username = strings.TrimSpace(webdav.Username)
465 webdav.Password = strings.TrimSpace(webdav.Password)
466 webdav.Timeout = util.NormalizeTimeout(webdav.Timeout)
467 webdav.ConcurrentReqs = util.NormalizeConcurrentReqs(webdav.ConcurrentReqs, conf.ProviderWebDAV)
468
469 Conf.Sync.WebDAV = webdav
470 Conf.Save()
471 return
472}
473
474func SetSyncProviderLocal(local *conf.Local) (err error) {
475 local.Endpoint = strings.TrimSpace(local.Endpoint)
476 local.Endpoint = util.NormalizeLocalPath(local.Endpoint)
477
478 absPath, err := filepath.Abs(local.Endpoint)
479 if nil != err {
480 msg := fmt.Sprintf("get endpoint [%s] abs path failed: %s", local.Endpoint, err)
481 logging.LogErrorf(msg)
482 err = errors.New(fmt.Sprintf(Conf.Language(77), msg))
483 return
484 }
485 if !gulu.File.IsExist(absPath) {
486 msg := fmt.Sprintf("endpoint [%s] not exist", local.Endpoint)
487 logging.LogErrorf(msg)
488 err = errors.New(fmt.Sprintf(Conf.Language(77), msg))
489 return
490 }
491 if util.IsAbsPathInWorkspace(absPath) || filepath.Clean(absPath) == filepath.Clean(util.WorkspaceDir) {
492 msg := fmt.Sprintf("endpoint [%s] is in workspace", local.Endpoint)
493 logging.LogErrorf(msg)
494 err = errors.New(fmt.Sprintf(Conf.Language(77), msg))
495 return
496 }
497
498 if util.IsSubPath(absPath, util.WorkspaceDir) {
499 msg := fmt.Sprintf("endpoint [%s] is parent of workspace", local.Endpoint)
500 logging.LogErrorf(msg)
501 err = errors.New(fmt.Sprintf(Conf.Language(77), msg))
502 return
503 }
504
505 local.Timeout = util.NormalizeTimeout(local.Timeout)
506 local.ConcurrentReqs = util.NormalizeConcurrentReqs(local.ConcurrentReqs, conf.ProviderLocal)
507
508 Conf.Sync.Local = local
509 Conf.Save()
510 return
511}
512
513var (
514 syncLock = sync.Mutex{}
515 isSyncing = atomic.Bool{}
516)
517
518func CreateCloudSyncDir(name string) (err error) {
519 switch Conf.Sync.Provider {
520 case conf.ProviderSiYuan, conf.ProviderLocal:
521 break
522 default:
523 err = errors.New(Conf.Language(131))
524 return
525 }
526
527 name = strings.TrimSpace(name)
528 name = util.RemoveInvalid(name)
529 if !cloud.IsValidCloudDirName(name) {
530 return errors.New(Conf.Language(37))
531 }
532
533 repo, err := newRepository()
534 if err != nil {
535 return
536 }
537
538 err = repo.CreateCloudRepo(name)
539 if err != nil {
540 err = errors.New(formatRepoErrorMsg(err))
541 return
542 }
543 return
544}
545
546func RemoveCloudSyncDir(name string) (err error) {
547 switch Conf.Sync.Provider {
548 case conf.ProviderSiYuan, conf.ProviderLocal:
549 break
550 default:
551 err = errors.New(Conf.Language(131))
552 return
553 }
554
555 msgId := util.PushMsg(Conf.Language(116), 15000)
556
557 if "" == name {
558 return
559 }
560
561 repo, err := newRepository()
562 if err != nil {
563 return
564 }
565
566 err = repo.RemoveCloudRepo(name)
567 if err != nil {
568 err = errors.New(formatRepoErrorMsg(err))
569 return
570 }
571
572 util.PushClearMsg(msgId)
573 time.Sleep(500 * time.Millisecond)
574 if Conf.Sync.CloudName == name {
575 Conf.Sync.CloudName = "main"
576 Conf.Save()
577 util.PushMsg(Conf.Language(155), 5000)
578 }
579 return
580}
581
582func ListCloudSyncDir() (syncDirs []*Sync, hSize string, err error) {
583 syncDirs = []*Sync{}
584 var dirs []*cloud.Repo
585 var size int64
586
587 repo, err := newRepository()
588 if err != nil {
589 return
590 }
591
592 dirs, size, err = repo.GetCloudRepos()
593 if err != nil {
594 err = errors.New(formatRepoErrorMsg(err))
595 return
596 }
597 if 1 > len(dirs) {
598 dirs = append(dirs, &cloud.Repo{
599 Name: "main",
600 Size: 0,
601 Updated: time.Now().Format("2006-01-02 15:04:05"),
602 })
603 }
604
605 for _, d := range dirs {
606 dirSize := d.Size
607 sync := &Sync{
608 Size: dirSize,
609 HSize: "-",
610 Updated: d.Updated,
611 CloudName: d.Name,
612 }
613 if conf.ProviderSiYuan == Conf.Sync.Provider {
614 sync.HSize = humanize.BytesCustomCeil(uint64(dirSize), 2)
615 }
616 syncDirs = append(syncDirs, sync)
617 }
618 hSize = "-"
619 if conf.ProviderSiYuan == Conf.Sync.Provider {
620 hSize = humanize.BytesCustomCeil(uint64(size), 2)
621 }
622 if conf.ProviderS3 == Conf.Sync.Provider {
623 Conf.Sync.CloudName = syncDirs[0].CloudName
624 Conf.Save()
625 }
626 return
627}
628
629func formatRepoErrorMsg(err error) string {
630 msg := html.EscapeString(err.Error())
631 if errors.Is(err, cloud.ErrCloudAuthFailed) {
632 msg = Conf.Language(31)
633 } else if errors.Is(err, cloud.ErrCloudObjectNotFound) {
634 msg = Conf.Language(129)
635 } else if errors.Is(err, dejavu.ErrLockCloudFailed) {
636 msg = Conf.Language(188)
637 } else if errors.Is(err, dejavu.ErrCloudLocked) {
638 msg = Conf.Language(189)
639 } else if errors.Is(err, dejavu.ErrRepoFatal) {
640 msg = Conf.Language(23)
641 } else if errors.Is(err, cloud.ErrSystemTimeIncorrect) {
642 msg = Conf.Language(195)
643 } else if errors.Is(err, cloud.ErrDeprecatedVersion) {
644 msg = Conf.Language(212)
645 } else if errors.Is(err, cloud.ErrCloudCheckFailed) {
646 msg = Conf.Language(213)
647 } else if errors.Is(err, cloud.ErrCloudServiceUnavailable) {
648 msg = Conf.language(219)
649 } else if errors.Is(err, cloud.ErrCloudForbidden) {
650 msg = Conf.language(249)
651 } else if errors.Is(err, cloud.ErrCloudTooManyRequests) {
652 msg = Conf.language(250)
653 } else {
654 logging.LogErrorf("sync failed caused by network: %s", msg)
655 msgLowerCase := strings.ToLower(msg)
656 if strings.Contains(msgLowerCase, "permission denied") || strings.Contains(msg, "access is denied") {
657 msg = Conf.Language(33)
658 } else if strings.Contains(msgLowerCase, "region was not a valid") {
659 msg = Conf.language(254)
660 } else if strings.Contains(msgLowerCase, "device or resource busy") || strings.Contains(msg, "is being used by another") {
661 msg = fmt.Sprintf(Conf.Language(85), err)
662 } else if strings.Contains(msgLowerCase, "cipher: message authentication failed") {
663 msg = Conf.Language(135)
664 } else if strings.Contains(msgLowerCase, "no such host") || strings.Contains(msgLowerCase, "connection failed") || strings.Contains(msgLowerCase, "hostname resolution") || strings.Contains(msgLowerCase, "No address associated with hostname") {
665 msg = Conf.Language(24)
666 } else if strings.Contains(msgLowerCase, "net/http: request canceled while waiting for connection") || strings.Contains(msgLowerCase, "exceeded while awaiting") || strings.Contains(msgLowerCase, "context deadline exceeded") || strings.Contains(msgLowerCase, "timeout") || strings.Contains(msgLowerCase, "context cancellation while reading body") {
667 msg = Conf.Language(24)
668 } else if strings.Contains(msgLowerCase, "connection") || strings.Contains(msgLowerCase, "refused") || strings.Contains(msgLowerCase, "socket") || strings.Contains(msgLowerCase, "eof") || strings.Contains(msgLowerCase, "closed") || strings.Contains(msgLowerCase, "network") {
669 msg = Conf.Language(28)
670 }
671 }
672 msg += " (Provider: " + conf.ProviderToStr(Conf.Sync.Provider) + ")"
673 return msg
674}
675
676func getSyncIgnoreLines() (ret []string) {
677 ignore := filepath.Join(util.DataDir, ".siyuan", "syncignore")
678 err := os.MkdirAll(filepath.Dir(ignore), 0755)
679 if err != nil {
680 return
681 }
682 if !gulu.File.IsExist(ignore) {
683 if err = gulu.File.WriteFileSafer(ignore, nil, 0644); err != nil {
684 logging.LogErrorf("create syncignore [%s] failed: %s", ignore, err)
685 return
686 }
687 }
688 data, err := os.ReadFile(ignore)
689 if err != nil {
690 logging.LogErrorf("read syncignore [%s] failed: %s", ignore, err)
691 return
692 }
693 dataStr := string(data)
694 dataStr = strings.ReplaceAll(dataStr, "\r\n", "\n")
695 ret = strings.Split(dataStr, "\n")
696
697 // 默认忽略帮助文档
698 ret = append(ret, "20210808180117-6v0mkxr/**/*")
699 ret = append(ret, "20210808180117-czj9bvb/**/*")
700 ret = append(ret, "20211226090932-5lcq56f/**/*")
701 ret = append(ret, "20240530133126-axarxgx/**/*")
702
703 ret = gulu.Str.RemoveDuplicatedElem(ret)
704 return
705}
706
707func IncSync() {
708 syncSameCount.Store(0)
709 planSyncAfter(time.Duration(Conf.Sync.Interval) * time.Second)
710}
711
712func planSyncAfter(d time.Duration) {
713 syncPlanTimeLock.Lock()
714 syncPlanTime = time.Now().Add(d)
715 syncPlanTimeLock.Unlock()
716}
717
718func isProviderOnline(byHand bool) (ret bool) {
719 var checkURL string
720 skipTlsVerify := false
721 timeout := 3000
722 switch Conf.Sync.Provider {
723 case conf.ProviderSiYuan:
724 checkURL = util.GetCloudSyncServer()
725 case conf.ProviderS3:
726 checkURL = Conf.Sync.S3.Endpoint
727 skipTlsVerify = Conf.Sync.S3.SkipTlsVerify
728 timeout = Conf.Sync.S3.Timeout * 1000
729 case conf.ProviderWebDAV:
730 checkURL = Conf.Sync.WebDAV.Endpoint
731 skipTlsVerify = Conf.Sync.WebDAV.SkipTlsVerify
732 timeout = Conf.Sync.WebDAV.Timeout * 1000
733 case conf.ProviderLocal:
734 checkURL = "file://" + Conf.Sync.Local.Endpoint
735 timeout = Conf.Sync.Local.Timeout * 1000
736 default:
737 logging.LogWarnf("unknown provider: %d", Conf.Sync.Provider)
738 return false
739 }
740
741 if ret = util.IsOnline(checkURL, skipTlsVerify, timeout); !ret {
742 if 1 > autoSyncErrCount || byHand {
743 util.PushErrMsg(Conf.Language(76)+" (Provider: "+conf.ProviderToStr(Conf.Sync.Provider)+")", 5000)
744 }
745 if !byHand {
746 planSyncAfter(fixSyncInterval)
747 autoSyncErrCount++
748 }
749 }
750 return
751}
752
753var (
754 webSocketConn *websocket.Conn
755 webSocketConnLock = sync.Mutex{}
756)
757
758type OnlineKernel struct {
759 ID string `json:"id"`
760 Hostname string `json:"hostname"`
761 OS string `json:"os"`
762 Ver string `json:"ver"`
763}
764
765var (
766 onlineKernels []*OnlineKernel
767 onlineKernelsLock = sync.Mutex{}
768)
769
770func GetOnlineKernels() (ret []*OnlineKernel) {
771 ret = []*OnlineKernel{}
772 onlineKernelsLock.Lock()
773 tmp := onlineKernels
774 onlineKernelsLock.Unlock()
775 for _, kernel := range tmp {
776 if kernel.ID == KernelID {
777 continue
778 }
779
780 ret = append(ret, kernel)
781 }
782 return
783}
784
785var closedSyncWebSocket = atomic.Bool{}
786
787func closeSyncWebSocket() {
788 defer logging.Recover()
789
790 webSocketConnLock.Lock()
791 defer webSocketConnLock.Unlock()
792
793 if nil != webSocketConn {
794 webSocketConn.Close()
795 webSocketConn = nil
796 closedSyncWebSocket.Store(true)
797 }
798
799 logging.LogInfof("sync websocket closed")
800}
801
802func connectSyncWebSocket() {
803 defer logging.Recover()
804
805 if !Conf.Sync.Enabled || !IsSubscriber() || conf.ProviderSiYuan != Conf.Sync.Provider {
806 return
807 }
808
809 if util.ContainerDocker == util.Container {
810 return
811 }
812
813 webSocketConnLock.Lock()
814 defer webSocketConnLock.Unlock()
815
816 if nil != webSocketConn {
817 return
818 }
819
820 //logging.LogInfof("connecting sync websocket...")
821 var dialErr error
822 webSocketConn, dialErr = dialSyncWebSocket()
823 if nil != dialErr {
824 logging.LogWarnf("connect sync websocket failed: %s", dialErr)
825 return
826 }
827 logging.LogInfof("sync websocket connected")
828
829 webSocketConn.SetCloseHandler(func(code int, text string) error {
830 logging.LogWarnf("sync websocket closed: %d, %s", code, text)
831 return nil
832 })
833
834 go func() {
835 defer logging.Recover()
836
837 for {
838 result := gulu.Ret.NewResult()
839 if readErr := webSocketConn.ReadJSON(&result); nil != readErr {
840 time.Sleep(1 * time.Second)
841 if closedSyncWebSocket.Load() {
842 return
843 }
844
845 reconnected := false
846 for retries := 0; retries < 7; retries++ {
847 time.Sleep(7 * time.Second)
848 if nil == Conf.GetUser() {
849 return
850 }
851
852 //logging.LogInfof("reconnecting sync websocket...")
853 webSocketConn, dialErr = dialSyncWebSocket()
854 if nil != dialErr {
855 logging.LogWarnf("reconnect sync websocket failed: %s", dialErr)
856 continue
857 }
858
859 logging.LogInfof("sync websocket reconnected")
860 reconnected = true
861 break
862 }
863 if !reconnected {
864 logging.LogWarnf("reconnect sync websocket failed, do not retry")
865 webSocketConn = nil
866 return
867 }
868
869 continue
870 }
871
872 logging.LogInfof("sync websocket message: %v", result)
873 data := result.Data.(map[string]interface{})
874 switch data["cmd"].(string) {
875 case "synced":
876 // Improve data synchronization perception https://github.com/siyuan-note/siyuan/issues/13000
877 SyncDataDownload()
878 case "kernels":
879 onlineKernelsLock.Lock()
880
881 onlineKernels = []*OnlineKernel{}
882 for _, kernel := range data["kernels"].([]interface{}) {
883 kernelMap := kernel.(map[string]interface{})
884 onlineKernels = append(onlineKernels, &OnlineKernel{
885 ID: kernelMap["id"].(string),
886 Hostname: kernelMap["hostname"].(string),
887 OS: kernelMap["os"].(string),
888 Ver: kernelMap["ver"].(string),
889 })
890 }
891
892 onlineKernelsLock.Unlock()
893 }
894 }
895 }()
896}
897
898var KernelID = gulu.Rand.String(7)
899
900func dialSyncWebSocket() (c *websocket.Conn, err error) {
901 endpoint := util.GetCloudWebSocketServer() + "/apis/siyuan/dejavu/ws"
902 header := http.Header{
903 "User-Agent": []string{util.UserAgent},
904 "x-siyuan-uid": []string{Conf.GetUser().UserId},
905 "x-siyuan-kernel": []string{KernelID},
906 "x-siyuan-ver": []string{util.Ver},
907 "x-siyuan-os": []string{runtime.GOOS},
908 "x-siyuan-hostname": []string{util.GetDeviceName()},
909 "x-siyuan-repo": []string{Conf.Sync.CloudName},
910 }
911 c, _, err = websocket.DefaultDialer.Dial(endpoint, header)
912 if err == nil {
913 closedSyncWebSocket.Store(false)
914 }
915 return
916}