this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

detect ws disconnect & acks on outbox msgs

dholms 9352127a 7e8df665

+31 -6
+25 -6
nexus/handlers.go
··· 35 35 36 36 n.logger.Info("websocket connected") 37 37 38 + // read loop so we can detect if the client disconnects 39 + disconnected := make(chan struct{}) 40 + go func() { 41 + for { 42 + if _, _, err := ws.ReadMessage(); err != nil { 43 + close(disconnected) 44 + return 45 + } 46 + } 47 + }() 48 + 38 49 evtCh := n.outbox.Subscribe(c.Request().Context()) 39 50 40 - for evt := range evtCh { 41 - if err := ws.WriteJSON(evt); err != nil { 42 - n.logger.Info("websocket write error", "error", err) 43 - return err 51 + for { 52 + select { 53 + case <-disconnected: 54 + n.logger.Info("websocket disconnected") 55 + return nil 56 + case evt, ok := <-evtCh: 57 + if !ok { 58 + return nil 59 + } 60 + if err := ws.WriteJSON(evt); err != nil { 61 + n.logger.Info("websocket write error", "error", err) 62 + return err 63 + } 64 + close(evt.AckCh) 44 65 } 45 66 } 46 - 47 - return nil 48 67 } 49 68 50 69 type DidPayload struct {
+4
nexus/outbox.go
··· 60 60 continue 61 61 } 62 62 63 + outboxEvt.AckCh = make(chan struct{}) 64 + 63 65 o.events <- &outboxEvt 66 + 67 + <-outboxEvt.AckCh 64 68 65 69 if err := o.db.Delete(&evt).Error; err != nil { 66 70 o.logger.Error("failed to delete outbox event", "error", err, "id", evt.ID)
+2
nexus/types.go
··· 54 54 Type string `json:"type"` 55 55 RecordEvt *RecordEvt `json:"record,omitempty"` 56 56 UserEvt *UserEvt `json:"user,omitempty"` 57 + 58 + AckCh chan struct{} `json:"-"` 57 59 }