Privacy-first, self-hosted, fully open-source personal knowledge management software, written in TypeScript and Go. (PERSONAL FORK)
at lambda-fork/main 498 lines 13 kB view raw
1// SiYuan - Refactor your thinking 2// Copyright (c) 2020-present, b3log.org 3// 4// This program is free software: you can redistribute it and/or modify 5// it under the terms of the GNU Affero General Public License as published by 6// the Free Software Foundation, either version 3 of the License, or 7// (at your option) any later version. 8// 9// This program is distributed in the hope that it will be useful, 10// but WITHOUT ANY WARRANTY; without even the implied warranty of 11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12// GNU Affero General Public License for more details. 13// 14// You should have received a copy of the GNU Affero General Public License 15// along with this program. If not, see <https://www.gnu.org/licenses/>. 16 17package util 18 19import ( 20 "sync" 21 "time" 22 23 "github.com/88250/gulu" 24 "github.com/olahol/melody" 25 "github.com/siyuan-note/eventbus" 26) 27 28var ( 29 WebSocketServer *melody.Melody 30 31 // map[string]map[string]*melody.Session{} 32 sessions = sync.Map{} // {appId, {sessionId, session}} 33) 34 35func BroadcastByTypeAndExcludeApp(excludeApp, typ, cmd string, code int, msg string, data interface{}) { 36 sessions.Range(func(key, value interface{}) bool { 37 appSessions := value.(*sync.Map) 38 if key == excludeApp { 39 return true 40 } 41 42 appSessions.Range(func(key, value interface{}) bool { 43 session := value.(*melody.Session) 44 if t, ok := session.Get("type"); ok && typ == t { 45 event := NewResult() 46 event.Cmd = cmd 47 event.Code = code 48 event.Msg = msg 49 event.Data = data 50 session.Write(event.Bytes()) 51 } 52 return true 53 }) 54 return true 55 }) 56} 57 58func BroadcastByTypeAndApp(typ, app, cmd string, code int, msg string, data interface{}) { 59 appSessions, ok := sessions.Load(app) 60 if !ok { 61 return 62 } 63 64 appSessions.(*sync.Map).Range(func(key, value interface{}) bool { 65 session := value.(*melody.Session) 66 if t, ok := session.Get("type"); ok && typ == t { 67 event := 
NewResult() 68 event.Cmd = cmd 69 event.Code = code 70 event.Msg = msg 71 event.Data = data 72 session.Write(event.Bytes()) 73 } 74 return true 75 }) 76} 77 78// BroadcastByType 广播所有实例上 typ 类型的会话。 79func BroadcastByType(typ, cmd string, code int, msg string, data interface{}) { 80 typeSessions := SessionsByType(typ) 81 for _, sess := range typeSessions { 82 event := NewResult() 83 event.Cmd = cmd 84 event.Code = code 85 event.Msg = msg 86 event.Data = data 87 sess.Write(event.Bytes()) 88 } 89} 90 91func SessionsByType(typ string) (ret []*melody.Session) { 92 ret = []*melody.Session{} 93 94 sessions.Range(func(key, value interface{}) bool { 95 appSessions := value.(*sync.Map) 96 appSessions.Range(func(key, value interface{}) bool { 97 session := value.(*melody.Session) 98 if t, ok := session.Get("type"); ok && typ == t { 99 ret = append(ret, session) 100 } 101 return true 102 }) 103 return true 104 }) 105 return 106} 107 108func AddPushChan(session *melody.Session) { 109 appID := session.Request.URL.Query().Get("app") 110 session.Set("app", appID) 111 id := session.Request.URL.Query().Get("id") 112 session.Set("id", id) 113 typ := session.Request.URL.Query().Get("type") 114 session.Set("type", typ) 115 116 if appSessions, ok := sessions.Load(appID); !ok { 117 appSess := &sync.Map{} 118 appSess.Store(id, session) 119 sessions.Store(appID, appSess) 120 } else { 121 (appSessions.(*sync.Map)).Store(id, session) 122 } 123} 124 125func RemovePushChan(session *melody.Session) { 126 app, _ := session.Get("app") 127 id, _ := session.Get("id") 128 129 if nil == app || nil == id { 130 return 131 } 132 133 appSess, _ := sessions.Load(app) 134 if nil != appSess { 135 appSessions := appSess.(*sync.Map) 136 appSessions.Delete(id) 137 if 1 > lenOfSyncMap(appSessions) { 138 sessions.Delete(app) 139 } 140 } 141} 142 143func lenOfSyncMap(m *sync.Map) (ret int) { 144 m.Range(func(key, value interface{}) bool { 145 ret++ 146 return true 147 }) 148 return 149} 150 151func 
ClosePushChan(id string) { 152 sessions.Range(func(key, value interface{}) bool { 153 appSessions := value.(*sync.Map) 154 appSessions.Range(func(key, value interface{}) bool { 155 session := value.(*melody.Session) 156 if sid, _ := session.Get("id"); sid == id { 157 session.CloseWithMsg([]byte(" close websocket")) 158 RemovePushChan(session) 159 } 160 return true 161 }) 162 return true 163 }) 164} 165 166func ReloadUIResetScroll() { 167 BroadcastByType("main", "reloadui", 0, "", map[string]interface{}{"resetScroll": true}) 168} 169 170func ReloadUI() { 171 BroadcastByType("main", "reloadui", 0, "", nil) 172} 173 174func PushTxErr(msg string, code int, data interface{}) { 175 BroadcastByType("main", "txerr", code, msg, data) 176} 177 178func PushUpdateMsg(msgId string, msg string, timeout int) { 179 BroadcastByType("main", "msg", 0, msg, map[string]interface{}{"id": msgId, "closeTimeout": timeout}) 180} 181 182func PushMsg(msg string, timeout int) (msgId string) { 183 msgId = gulu.Rand.String(7) 184 BroadcastByType("main", "msg", 0, msg, map[string]interface{}{"id": msgId, "closeTimeout": timeout}) 185 return 186} 187 188func PushMsgWithApp(app, msg string, timeout int) (msgId string) { 189 msgId = gulu.Rand.String(7) 190 if "" == app { 191 BroadcastByType("main", "msg", 0, msg, map[string]interface{}{"id": msgId, "closeTimeout": timeout}) 192 return 193 } 194 BroadcastByTypeAndApp("main", app, "msg", 0, msg, map[string]interface{}{"id": msgId, "closeTimeout": timeout}) 195 return 196} 197 198func PushErrMsg(msg string, timeout int) (msgId string) { 199 msgId = gulu.Rand.String(7) 200 BroadcastByType("main", "msg", -1, msg, map[string]interface{}{"id": msgId, "closeTimeout": timeout}) 201 return 202} 203 204func PushStatusBar(msg string) { 205 msg += " (" + time.Now().Format("2006-01-02 15:04:05") + ")" 206 BroadcastByType("main", "statusbar", 0, msg, nil) 207} 208 209func PushBackgroundTask(data map[string]interface{}) { 210 BroadcastByType("main", 
"backgroundtask", 0, "", data) 211} 212 213func PushReloadFiletree() { 214 BroadcastByType("filetree", "reloadFiletree", 0, "", nil) 215} 216 217func PushReloadTag() { 218 BroadcastByType("main", "reloadTag", 0, "", nil) 219} 220 221type BlockStatResult struct { 222 RuneCount int `json:"runeCount"` 223 WordCount int `json:"wordCount"` 224 LinkCount int `json:"linkCount"` 225 ImageCount int `json:"imageCount"` 226 RefCount int `json:"refCount"` 227 BlockCount int `json:"blockCount"` 228} 229 230func ContextPushMsg(context map[string]interface{}, msg string) { 231 switch context[eventbus.CtxPushMsg].(int) { 232 case eventbus.CtxPushMsgToNone: 233 break 234 case eventbus.CtxPushMsgToProgress: 235 PushEndlessProgress(msg) 236 case eventbus.CtxPushMsgToStatusBar: 237 PushStatusBar(msg) 238 case eventbus.CtxPushMsgToStatusBarAndProgress: 239 PushStatusBar(msg) 240 PushEndlessProgress(msg) 241 } 242} 243 244const ( 245 PushProgressCodeProgressed = 0 // 有进度 246 PushProgressCodeEndless = 1 // 无进度 247 PushProgressCodeEnd = 2 // 关闭进度 248) 249 250func PushClearAllMsg() { 251 ClearPushProgress(100) 252 PushClearMsg("") 253} 254 255func ClearPushProgress(total int) { 256 PushProgress(PushProgressCodeEnd, total, total, "") 257} 258 259func PushEndlessProgress(msg string) { 260 PushProgress(PushProgressCodeEndless, 1, 1, msg) 261} 262 263func PushProgress(code, current, total int, msg string) { 264 BroadcastByType("main", "progress", code, msg, map[string]interface{}{ 265 "current": current, 266 "total": total, 267 }) 268} 269 270// PushClearMsg 会清空指定消息。 271func PushClearMsg(msgId string) { 272 BroadcastByType("main", "cmsg", 0, "", map[string]interface{}{"id": msgId}) 273} 274 275// PushClearProgress 取消进度遮罩。 276func PushClearProgress() { 277 BroadcastByType("main", "cprogress", 0, "", nil) 278} 279 280func PushUpdateIDs(ids map[string]string) { 281 BroadcastByType("main", "updateids", 0, "", ids) 282} 283 284func PushReloadDoc(rootID string) { 285 BroadcastByType("main", 
"reloaddoc", 0, "", rootID) 286} 287 288func PushSaveDoc(rootID, typ string, sources interface{}) { 289 evt := NewCmdResult("savedoc", 0, PushModeBroadcast) 290 evt.Data = map[string]interface{}{ 291 "rootID": rootID, 292 "type": typ, 293 "sources": sources, 294 } 295 PushEvent(evt) 296} 297 298func PushReloadDocInfo(docInfo map[string]any) { 299 BroadcastByType("filetree", "reloadDocInfo", 0, "", docInfo) 300} 301 302func PushReloadProtyle(rootID string) { 303 BroadcastByType("protyle", "reload", 0, "", rootID) 304} 305 306func PushSetRefDynamicText(rootID, blockID, defBlockID, refText string) { 307 BroadcastByType("main", "setRefDynamicText", 0, "", map[string]interface{}{"rootID": rootID, "blockID": blockID, "defBlockID": defBlockID, "refText": refText}) 308} 309 310func PushSetDefRefCount(rootID, blockID string, defIDs []string, refCount, rootRefCount int) { 311 BroadcastByType("main", "setDefRefCount", 0, "", map[string]interface{}{"rootID": rootID, "blockID": blockID, "refCount": refCount, "rootRefCount": rootRefCount, "defIDs": defIDs}) 312} 313 314func PushLocalShorthandCount(count int) { 315 BroadcastByType("main", "setLocalShorthandCount", 0, "", map[string]interface{}{"count": count}) 316} 317 318func PushProtyleLoading(rootID, msg string) { 319 BroadcastByType("protyle", "addLoading", 0, msg, rootID) 320} 321 322func PushReloadEmojiConf() { 323 BroadcastByType("main", "reloadEmojiConf", 0, "", nil) 324} 325 326func PushDownloadProgress(id string, percent float32) { 327 evt := NewCmdResult("downloadProgress", 0, PushModeBroadcast) 328 evt.Data = map[string]interface{}{ 329 "id": id, 330 "percent": percent, 331 } 332 PushEvent(evt) 333} 334 335func PushEvent(event *Result) { 336 msg := event.Bytes() 337 mode := event.PushMode 338 switch mode { 339 case PushModeBroadcast: 340 Broadcast(msg) 341 case PushModeSingleSelf: 342 single(msg, event.AppId, event.SessionId) 343 case PushModeBroadcastExcludeSelf: 344 broadcastOthers(msg, event.SessionId) 345 case 
PushModeBroadcastExcludeSelfApp: 346 broadcastOtherApps(msg, event.AppId) 347 case PushModeBroadcastApp: 348 broadcastApp(msg, event.AppId) 349 case PushModeBroadcastMainExcludeSelfApp: 350 broadcastOtherAppMains(msg, event.AppId) 351 } 352} 353 354func single(msg []byte, appId, sid string) { 355 sessions.Range(func(key, value interface{}) bool { 356 appSessions := value.(*sync.Map) 357 if key != appId { 358 return true 359 } 360 361 appSessions.Range(func(key, value interface{}) bool { 362 session := value.(*melody.Session) 363 if id, _ := session.Get("id"); id == sid { 364 session.Write(msg) 365 } 366 return true 367 }) 368 return true 369 }) 370} 371 372func Broadcast(msg []byte) { 373 sessions.Range(func(key, value interface{}) bool { 374 appSessions := value.(*sync.Map) 375 appSessions.Range(func(key, value interface{}) bool { 376 session := value.(*melody.Session) 377 session.Write(msg) 378 return true 379 }) 380 return true 381 }) 382} 383 384func broadcastOtherApps(msg []byte, excludeApp string) { 385 sessions.Range(func(key, value interface{}) bool { 386 appSessions := value.(*sync.Map) 387 appSessions.Range(func(key, value interface{}) bool { 388 session := value.(*melody.Session) 389 if app, _ := session.Get("app"); app == excludeApp { 390 return true 391 } 392 session.Write(msg) 393 return true 394 }) 395 return true 396 }) 397} 398 399func broadcastOtherAppMains(msg []byte, excludeApp string) { 400 sessions.Range(func(key, value interface{}) bool { 401 appSessions := value.(*sync.Map) 402 appSessions.Range(func(key, value interface{}) bool { 403 session := value.(*melody.Session) 404 if app, _ := session.Get("app"); app == excludeApp { 405 return true 406 } 407 408 if t, ok := session.Get("type"); ok && "main" != t { 409 return true 410 } 411 412 session.Write(msg) 413 return true 414 }) 415 return true 416 }) 417} 418 419func broadcastApp(msg []byte, app string) { 420 sessions.Range(func(key, value interface{}) bool { 421 appSessions := 
value.(*sync.Map) 422 appSessions.Range(func(key, value interface{}) bool { 423 session := value.(*melody.Session) 424 if sessionApp, _ := session.Get("app"); sessionApp != app { 425 return true 426 } 427 session.Write(msg) 428 return true 429 }) 430 return true 431 }) 432} 433 434func broadcastOthers(msg []byte, excludeSID string) { 435 sessions.Range(func(key, value interface{}) bool { 436 appSessions := value.(*sync.Map) 437 appSessions.Range(func(key, value interface{}) bool { 438 session := value.(*melody.Session) 439 if id, _ := session.Get("id"); id == excludeSID { 440 return true 441 } 442 session.Write(msg) 443 return true 444 }) 445 return true 446 }) 447} 448 449func CountSessions() (ret int) { 450 sessions.Range(func(key, value interface{}) bool { 451 ret++ 452 return true 453 }) 454 return 455} 456 457// ClosePublishServiceSessions 关闭所有发布服务的 WebSocket 连接 458func ClosePublishServiceSessions() { 459 if WebSocketServer == nil { 460 return 461 } 462 463 // 收集所有发布服务的会话 464 var publishSessions []*melody.Session 465 sessions.Range(func(key, value interface{}) bool { 466 appSessions := value.(*sync.Map) 467 appSessions.Range(func(key, value interface{}) bool { 468 session := value.(*melody.Session) 469 if isPublish, ok := session.Get("isPublish"); ok && isPublish == true { 470 publishSessions = append(publishSessions, session) 471 } 472 return true 473 }) 474 return true 475 }) 476 477 // 发送消息通知客户端关闭页面 478 for _, session := range publishSessions { 479 event := NewResult() 480 event.Cmd = "closepublishpage" 481 event.Code = 0 482 event.Msg = "SiYuan publish service closed" 483 event.Data = map[string]interface{}{ 484 "reason": "publish service closed", 485 } 486 session.Write(event.Bytes()) 487 } 488 489 // 等待一小段时间让消息发送完成、客户端刷新页面之后显示消息 490 time.Sleep(500 * time.Millisecond) 491 492 // 关闭所有发布服务的 WebSocket 连接 493 for _, session := range publishSessions { 494 // 使用 "close websocket" 作为关闭消息,客户端检测到后会停止重连 495 session.CloseWithMsg([]byte(" close websocket: publish 
service closed")) 496 RemovePushChan(session) 497 } 498}