forked from hailey.at/cocoon
An atproto PDS written in Go
at main 3.2 kB view raw
1package server 2 3import ( 4 "context" 5 "time" 6 7 "github.com/bluesky-social/indigo/events" 8 "github.com/bluesky-social/indigo/lex/util" 9 "github.com/btcsuite/websocket" 10 "github.com/haileyok/cocoon/metrics" 11 "github.com/labstack/echo/v4" 12) 13 14func (s *Server) handleSyncSubscribeRepos(e echo.Context) error { 15 ctx, cancel := context.WithCancel(e.Request().Context()) 16 defer cancel() 17 18 logger := s.logger.With("component", "subscribe-repos-websocket") 19 20 conn, err := websocket.Upgrade(e.Response().Writer, e.Request(), e.Response().Header(), 1<<10, 1<<10) 21 if err != nil { 22 logger.Error("unable to establish websocket with relay", "err", err) 23 return err 24 } 25 26 ident := e.RealIP() + "-" + e.Request().UserAgent() 27 logger = logger.With("ident", ident) 28 logger.Info("new connection established") 29 30 metrics.RelaysConnected.WithLabelValues(ident).Inc() 31 defer func() { 32 metrics.RelaysConnected.WithLabelValues(ident).Dec() 33 }() 34 35 evts, evtManCancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool { 36 return true 37 }, nil) 38 if err != nil { 39 return err 40 } 41 defer evtManCancel() 42 43 // drop the connection whenever a subscriber disconnects from the socket, we should get errors 44 go func() { 45 for { 46 select { 47 case <-ctx.Done(): 48 return 49 default: 50 if _, _, err := conn.ReadMessage(); err != nil { 51 logger.Warn("websocket error", "err", err) 52 cancel() 53 return 54 } 55 } 56 } 57 }() 58 59 header := events.EventHeader{Op: events.EvtKindMessage} 60 for evt := range evts { 61 func() { 62 defer func() { 63 metrics.RelaySends.WithLabelValues(ident, header.MsgType).Inc() 64 }() 65 66 wc, err := conn.NextWriter(websocket.BinaryMessage) 67 if err != nil { 68 logger.Error("error writing message to relay", "err", err) 69 return 70 } 71 72 if ctx.Err() != nil { 73 logger.Error("context error", "err", err) 74 return 75 } 76 77 var obj util.CBOR 78 switch { 79 case evt.Error != nil: 80 header.Op = events.EvtKindErrorFrame 81 obj = evt.Error 82 case evt.RepoCommit != nil: 83 header.MsgType = "#commit" 84 obj = evt.RepoCommit 85 case evt.RepoIdentity != nil: 86 header.MsgType = "#identity" 87 obj = evt.RepoIdentity 88 case evt.RepoAccount != nil: 89 header.MsgType = "#account" 90 obj = evt.RepoAccount 91 case evt.RepoInfo != nil: 92 header.MsgType = "#info" 93 obj = evt.RepoInfo 94 default: 95 logger.Warn("unrecognized event kind") 96 return 97 } 98 99 if err := header.MarshalCBOR(wc); err != nil { 100 logger.Error("failed to write header to relay", "err", err) 101 return 102 } 103 104 if err := obj.MarshalCBOR(wc); err != nil { 105 logger.Error("failed to write event to relay", "err", err) 106 return 107 } 108 109 if err := wc.Close(); err != nil { 110 logger.Error("failed to flush-close our event write", "err", err) 111 return 112 } 113 }() 114 } 115 116 // we should tell the relay to request a new crawl at this point if we got disconnected 117 // use a new context since the old one might be cancelled at this point 118 go func() { 119 retryCtx, retryCancel := context.WithTimeout(context.Background(), 10*time.Second) 120 defer retryCancel() 121 if err := s.requestCrawl(retryCtx); err != nil { 122 logger.Error("error requesting crawls", "err", err) 123 } 124 }() 125 126 return nil 127}