An atproto PDS written in Go
at push-based 139 lines 3.5 kB view raw
1package server 2 3import ( 4 "context" 5 "strconv" 6 "time" 7 8 "github.com/bluesky-social/indigo/events" 9 "github.com/bluesky-social/indigo/lex/util" 10 "github.com/btcsuite/websocket" 11 "github.com/haileyok/cocoon/metrics" 12 "github.com/labstack/echo/v4" 13) 14 15func (s *Server) handleSyncSubscribeRepos(e echo.Context) error { 16 ctx, cancel := context.WithCancel(e.Request().Context()) 17 defer cancel() 18 19 logger := s.logger.With("component", "subscribe-repos-websocket") 20 21 conn, err := websocket.Upgrade(e.Response().Writer, e.Request(), e.Response().Header(), 1<<10, 1<<10) 22 if err != nil { 23 logger.Error("unable to establish websocket with relay", "err", err) 24 return err 25 } 26 27 ident := e.RealIP() + "-" + e.Request().UserAgent() 28 logger = logger.With("ident", ident) 29 logger.Info("new connection established") 30 31 var since *int64 32 if cursorStr := e.QueryParam("cursor"); cursorStr != "" { 33 cursor, err := strconv.ParseInt(cursorStr, 10, 64) 34 if err != nil { 35 logger.Warn("invalid cursor parameter", "cursor", cursorStr, "err", err) 36 } else { 37 since = &cursor 38 logger.Info("subscribing with cursor", "cursor", cursor) 39 } 40 } 41 42 metrics.RelaysConnected.WithLabelValues(ident).Inc() 43 defer func() { 44 metrics.RelaysConnected.WithLabelValues(ident).Dec() 45 }() 46 47 evts, evtManCancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool { 48 return true 49 }, since) 50 if err != nil { 51 return err 52 } 53 defer evtManCancel() 54 55 // drop the connection whenever a subscriber disconnects from the socket, we should get errors 56 go func() { 57 for { 58 select { 59 case <-ctx.Done(): 60 return 61 default: 62 if _, _, err := conn.ReadMessage(); err != nil { 63 logger.Warn("websocket error", "err", err) 64 cancel() 65 return 66 } 67 } 68 } 69 }() 70 71 header := events.EventHeader{Op: events.EvtKindMessage} 72 for evt := range evts { 73 func() { 74 defer func() { 75 metrics.RelaySends.WithLabelValues(ident, header.MsgType).Inc() 76 }() 77 78 wc, err := conn.NextWriter(websocket.BinaryMessage) 79 if err != nil { 80 logger.Error("error writing message to relay", "err", err) 81 return 82 } 83 84 if ctx.Err() != nil { 85 logger.Error("context error", "err", err) 86 return 87 } 88 89 var obj util.CBOR 90 switch { 91 case evt.Error != nil: 92 header.Op = events.EvtKindErrorFrame 93 obj = evt.Error 94 case evt.RepoCommit != nil: 95 header.MsgType = "#commit" 96 obj = evt.RepoCommit 97 case evt.RepoIdentity != nil: 98 header.MsgType = "#identity" 99 obj = evt.RepoIdentity 100 case evt.RepoAccount != nil: 101 header.MsgType = "#account" 102 obj = evt.RepoAccount 103 case evt.RepoInfo != nil: 104 header.MsgType = "#info" 105 obj = evt.RepoInfo 106 default: 107 logger.Warn("unrecognized event kind") 108 return 109 } 110 111 if err := header.MarshalCBOR(wc); err != nil { 112 logger.Error("failed to write header to relay", "err", err) 113 return 114 } 115 116 if err := obj.MarshalCBOR(wc); err != nil { 117 logger.Error("failed to write event to relay", "err", err) 118 return 119 } 120 121 if err := wc.Close(); err != nil { 122 logger.Error("failed to flush-close our event write", "err", err) 123 return 124 } 125 }() 126 } 127 128 // we should tell the relay to request a new crawl at this point if we got disconnected 129 // use a new context since the old one might be cancelled at this point 130 go func() { 131 retryCtx, retryCancel := context.WithTimeout(context.Background(), 10*time.Second) 132 defer retryCancel() 133 if err := s.requestCrawl(retryCtx); err != nil { 134 logger.Error("error requesting crawls", "err", err) 135 } 136 }() 137 138 return nil 139}