websocket-based lrcproto server

add mediainit/mediapub

Changed files
+145 -52
+1 -1
go.mod
··· 4 4 5 5 require ( 6 6 github.com/gorilla/websocket v1.5.3 7 - github.com/rachel-mp4/lrcproto v0.0.0-20250905151943-8e3a1989ea5a 7 + github.com/rachel-mp4/lrcproto v1.2.0 8 8 google.golang.org/protobuf v1.36.6 9 9 )
+2 -6
go.sum
··· 2 2 github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= 3 3 github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= 4 4 github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= 5 - github.com/rachel-mp4/lrcproto v0.0.0-20250720164211-c6162669b709 h1:P//gJE0zFv9Qvfn8dvp9ZrnG0FZh2MVcAX+uOP2flRw= 6 - github.com/rachel-mp4/lrcproto v0.0.0-20250720164211-c6162669b709/go.mod h1:hQzO36tQELGbkmRnUtKeM6NMU34t79ZcTlhM+MO7pHw= 7 - github.com/rachel-mp4/lrcproto v0.0.0-20250905145450-74a49183ee1f h1:CZVsuwuS/5fDwB4X9AgobE1bjnhVFaIuedgZOuVtqeA= 8 - github.com/rachel-mp4/lrcproto v0.0.0-20250905145450-74a49183ee1f/go.mod h1:hQzO36tQELGbkmRnUtKeM6NMU34t79ZcTlhM+MO7pHw= 9 - github.com/rachel-mp4/lrcproto v0.0.0-20250905151943-8e3a1989ea5a h1:W3zPeGz/jHYyWj8ZfsuA9yigPMNlA/h7Fu+SQKgiBmQ= 10 - github.com/rachel-mp4/lrcproto v0.0.0-20250905151943-8e3a1989ea5a/go.mod h1:hQzO36tQELGbkmRnUtKeM6NMU34t79ZcTlhM+MO7pHw= 5 + github.com/rachel-mp4/lrcproto v1.2.0 h1:nZI80WQKO6yKgX0O5H6OO1cM/tiJqugs0p52KCoIDOw= 6 + github.com/rachel-mp4/lrcproto v1.2.0/go.mod h1:hQzO36tQELGbkmRnUtKeM6NMU34t79ZcTlhM+MO7pHw= 11 7 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= 12 8 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= 13 9 google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
+19 -8
options.go
··· 8 8 ) 9 9 10 10 type options struct { 11 - uri string 12 - secret string 13 - welcome *string 14 - writer *io.Writer 15 - verbose bool 16 - pubChan chan PubEvent 17 - initChan chan lrcpb.Event_Init 18 - initialID *uint32 11 + uri string 12 + secret string 13 + welcome *string 14 + writer *io.Writer 15 + verbose bool 16 + pubChan chan PubEvent 17 + initChan chan lrcpb.Event_Init 18 + mediainitChan chan lrcpb.Event_Mediainit 19 + initialID *uint32 19 20 } 20 21 21 22 type Option func(option *options) error ··· 51 52 return errors.New("must provide a channel") 52 53 } 53 54 options.initChan = initChan 55 + return nil 56 + } 57 + } 58 + 59 + func WithMediainitChannel(mediainitChan chan lrcpb.Event_Mediainit) Option { 60 + return func(options *options) error { 61 + if mediainitChan == nil { 62 + return errors.New("must provide a channel") 63 + } 64 + options.mediainitChan = mediainitChan 54 65 return nil 55 66 } 56 67 }
+123 -37
server.go
··· 3 3 import ( 4 4 "context" 5 5 "errors" 6 + "fmt" 6 7 "github.com/gorilla/websocket" 7 8 "github.com/rachel-mp4/lrcproto/gen/go" 8 9 "google.golang.org/protobuf/proto" ··· 15 16 ) 16 17 17 18 type Server struct { 18 - secret string 19 - uri string 20 - eventBus chan clientEvent 21 - ctx context.Context 22 - cancel context.CancelFunc 23 - clients map[*client]bool 24 - clientsMu sync.Mutex 25 - idmapsMu sync.Mutex 26 - clientToID map[*client]*uint32 27 - idToClient map[uint32]*client 28 - lastID uint32 29 - logger *log.Logger 30 - debugLogger *log.Logger 31 - welcomeEvt []byte 32 - pongEvt []byte 33 - initChan chan lrcpb.Event_Init 34 - pubChan chan PubEvent 19 + secret string 20 + uri string 21 + eventBus chan clientEvent 22 + ctx context.Context 23 + cancel context.CancelFunc 24 + clients map[*client]bool 25 + clientsMu sync.Mutex 26 + idmapsMu sync.Mutex 27 + idToClient map[uint32]*client 28 + lastID uint32 29 + logger *log.Logger 30 + debugLogger *log.Logger 31 + welcomeEvt []byte 32 + pongEvt []byte 33 + initChan chan lrcpb.Event_Init 34 + mediainitChan chan lrcpb.Event_Mediainit 35 + pubChan chan PubEvent 35 36 } 36 37 37 38 type PubEvent struct { ··· 47 48 muteMap map[*client]bool 48 49 mutedBy map[*client]bool 49 50 myIDs []uint32 51 + textID *uint32 52 + mediaID *uint32 50 53 post *string 51 54 nick *string 52 55 externID *string ··· 84 87 85 88 if options.initChan != nil { 86 89 s.initChan = options.initChan 90 + } 91 + if options.mediainitChan != nil { 92 + s.mediainitChan = options.mediainitChan 87 93 } 88 94 if options.pubChan != nil { 89 95 s.pubChan = options.pubChan ··· 97 103 s.clients = make(map[*client]bool) 98 104 s.clientsMu = sync.Mutex{} 99 105 s.idmapsMu = sync.Mutex{} 100 - s.clientToID = make(map[*client]*uint32) 101 106 s.idToClient = make(map[uint32]*client) 102 107 s.eventBus = make(chan clientEvent, 100) 103 108 return &s, nil ··· 214 219 s.handlePub(client) 215 220 216 221 s.idmapsMu.Lock() 217 - delete(s.clientToID, client) 218 222 for _, id := range client.myIDs { // remove myself from the idToClient map 219 223 delete(s.idToClient, id) 220 224 } ··· 307 311 continue 308 312 case *lrcpb.Event_Init: 309 313 s.handleInit(msg, client) 314 + case *lrcpb.Event_Mediainit: 315 + s.handleMediainit(msg, client) 310 316 case *lrcpb.Event_Pub: 311 317 s.handlePub(client) 318 + case *lrcpb.Event_Mediapub: 319 + s.handleMediapub(msg, client) 312 320 case *lrcpb.Event_Insert: 313 321 s.handleInsert(msg, client) 314 322 case *lrcpb.Event_Delete: ··· 330 338 } 331 339 332 340 func (s *Server) handleInit(msg *lrcpb.Event_Init, client *client) { 333 - s.idmapsMu.Lock() 334 - curID := s.clientToID[client] 341 + curID := client.textID 335 342 if curID != nil { 336 - s.idmapsMu.Unlock() 337 343 return 338 344 } 345 + s.idmapsMu.Lock() 339 346 newID := s.lastID + 1 340 347 s.lastID = newID 341 - s.clientToID[client] = &newID 342 348 s.idToClient[newID] = client 343 349 s.idmapsMu.Unlock() 350 + client.textID = &newID 344 351 client.myIDs = append(client.myIDs, newID) 345 352 newpost := "" 346 353 client.post = &newpost ··· 393 400 } 394 401 } 395 402 } 403 + func (s *Server) handleMediainit(msg *lrcpb.Event_Mediainit, client *client) { 404 + curId := client.mediaID 405 + if curId != nil { 406 + return 407 + } 408 + s.idmapsMu.Lock() 409 + newID := s.lastID + 1 410 + s.lastID = newID 411 + s.idToClient[newID] = client 412 + s.idmapsMu.Unlock() 413 + client.mediaID = &newID 414 + client.myIDs = append(client.myIDs, newID) 415 + msg.Mediainit.Id = &newID 416 + msg.Mediainit.Nick = client.nick 417 + msg.Mediainit.ExternalID = client.externID 418 + msg.Mediainit.Color = client.color 419 + echoed := false 420 + msg.Mediainit.Echoed = &echoed 421 + msg.Mediainit.Nonce = nil 422 + if s.mediainitChan != nil { 423 + select { 424 + case s.mediainitChan <- *msg: 425 + default: 426 + s.log("initchan blocked, closing channel") 427 + close(s.mediainitChan) 428 + s.mediainitChan = nil 429 + } 430 + } 431 + s.broadcastMediainit(msg, client) 432 + } 433 + 434 + func (s *Server) broadcastMediainit(msg *lrcpb.Event_Mediainit, client *client) { 435 + stdEvent := &lrcpb.Event{Msg: msg} 436 + stdData, _ := proto.Marshal(stdEvent) 437 + echoed := true 438 + msg.Mediainit.Echoed = &echoed 439 + msg.Mediainit.Nonce = GenerateNonce(*msg.Mediainit.Id, s.uri, s.secret) 440 + echoEvent := &lrcpb.Event{Msg: msg} 441 + echoData, _ := proto.Marshal(echoEvent) 442 + muteEvent := &lrcpb.Event{Msg: &lrcpb.Event_Mute{Mute: &lrcpb.Mute{Id: msg.Mediainit.GetId()}}} 443 + muteData, _ := proto.Marshal(muteEvent) 444 + s.clientsMu.Lock() 445 + defer s.clientsMu.Unlock() 446 + for c := range s.clients { 447 + var dts []byte 448 + if c == client { 449 + dts = echoData 450 + } else if client.mutedBy[c] { 451 + dts = muteData 452 + } else { 453 + dts = stdData 454 + } 455 + select { 456 + case c.dataChan <- dts: 457 + s.logDebug("b mediainit") 458 + default: 459 + s.log("kicked client") 460 + client.cancel() 461 + } 462 + } 463 + } 396 464 397 465 func (s *Server) handlePub(client *client) { 398 - s.idmapsMu.Lock() 399 - curID := s.clientToID[client] 466 + curID := client.textID 400 467 if curID == nil { 401 - s.idmapsMu.Unlock() 402 468 return 403 469 } 404 - s.clientToID[client] = nil 405 - s.idmapsMu.Unlock() 470 + client.textID = nil 406 471 event := &lrcpb.Event{Msg: &lrcpb.Event_Pub{Pub: &lrcpb.Pub{Id: curID}}} 407 472 if s.pubChan != nil { 408 473 select { ··· 417 482 s.broadcast(event, client) 418 483 } 419 484 485 + func (s *Server) handleMediapub(msg *lrcpb.Event_Mediapub, client *client) { 486 + curID := client.mediaID 487 + if curID == nil { 488 + return 489 + } 490 + client.mediaID = nil 491 + msg.Mediapub.Id = curID 492 + body := "external media." 493 + if msg.Mediapub.Alt != nil { 494 + body += fmt.Sprintf(" alt=%s.", *msg.Mediapub.Alt) 495 + } 496 + if msg.Mediapub.ContentAddress != nil { 497 + body += fmt.Sprintf(" cid=%s.", *msg.Mediapub.ContentAddress) 498 + } 499 + if s.pubChan != nil { 500 + select { 501 + case s.pubChan <- PubEvent{ID: *curID, Body: body}: 502 + default: 503 + s.log("pubchan blocked, closing channel") 504 + close(s.pubChan) 505 + s.pubChan = nil 506 + } 507 + } 508 + event := &lrcpb.Event{Msg: msg} 509 + s.broadcast(event, client) 510 + } 511 + 420 512 func (s *Server) handleInsert(msg *lrcpb.Event_Insert, client *client) { 421 - s.idmapsMu.Lock() 422 - curID := s.clientToID[client] 423 - s.idmapsMu.Unlock() 513 + curID := client.textID 424 514 if curID == nil { 425 515 return 426 516 } ··· 452 542 } 453 543 454 544 func (s *Server) handleDelete(msg *lrcpb.Event_Delete, client *client) { 455 - s.idmapsMu.Lock() 456 - curID := s.clientToID[client] 457 - s.idmapsMu.Unlock() 545 + curID := client.textID 458 546 if curID == nil { 459 547 return 460 548 } ··· 502 590 } 503 591 504 592 func (s *Server) handleEditBatch(msg *lrcpb.Event_Editbatch, client *client) { 505 - s.idmapsMu.Lock() 506 - curID := s.clientToID[client] 507 - s.idmapsMu.Unlock() 593 + curID := client.textID 508 594 if curID == nil { 509 595 return 510 596 }