forked from
hailey.at/cocoon
An atproto PDS written in Go
1package server
2
3import (
4 "context"
5 "strconv"
6 "time"
7
8 "github.com/bluesky-social/indigo/events"
9 "github.com/bluesky-social/indigo/lex/util"
10 "github.com/btcsuite/websocket"
11 "github.com/haileyok/cocoon/metrics"
12 "github.com/labstack/echo/v4"
13)
14
15func (s *Server) handleSyncSubscribeRepos(e echo.Context) error {
16 ctx, cancel := context.WithCancel(e.Request().Context())
17 defer cancel()
18
19 logger := s.logger.With("component", "subscribe-repos-websocket")
20
21 conn, err := websocket.Upgrade(e.Response().Writer, e.Request(), e.Response().Header(), 1<<10, 1<<10)
22 if err != nil {
23 logger.Error("unable to establish websocket with relay", "err", err)
24 return err
25 }
26
27 ident := e.RealIP() + "-" + e.Request().UserAgent()
28 logger = logger.With("ident", ident)
29 logger.Info("new connection established")
30
31 var since *int64
32 if cursorStr := e.QueryParam("cursor"); cursorStr != "" {
33 cursor, err := strconv.ParseInt(cursorStr, 10, 64)
34 if err != nil {
35 logger.Warn("invalid cursor parameter", "cursor", cursorStr, "err", err)
36 } else {
37 since = &cursor
38 logger.Info("subscribing with cursor", "cursor", cursor)
39 }
40 }
41
42 metrics.RelaysConnected.WithLabelValues(ident).Inc()
43 defer func() {
44 metrics.RelaysConnected.WithLabelValues(ident).Dec()
45 }()
46
47 evts, evtManCancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool {
48 return true
49 }, since)
50 if err != nil {
51 return err
52 }
53 defer evtManCancel()
54
55 // drop the connection whenever a subscriber disconnects from the socket, we should get errors
56 go func() {
57 for {
58 select {
59 case <-ctx.Done():
60 return
61 default:
62 if _, _, err := conn.ReadMessage(); err != nil {
63 logger.Warn("websocket error", "err", err)
64 cancel()
65 return
66 }
67 }
68 }
69 }()
70
71 header := events.EventHeader{Op: events.EvtKindMessage}
72 for evt := range evts {
73 func() {
74 defer func() {
75 metrics.RelaySends.WithLabelValues(ident, header.MsgType).Inc()
76 }()
77
78 wc, err := conn.NextWriter(websocket.BinaryMessage)
79 if err != nil {
80 logger.Error("error writing message to relay", "err", err)
81 return
82 }
83
84 if ctx.Err() != nil {
85 logger.Error("context error", "err", err)
86 return
87 }
88
89 var obj util.CBOR
90 switch {
91 case evt.Error != nil:
92 header.Op = events.EvtKindErrorFrame
93 obj = evt.Error
94 case evt.RepoCommit != nil:
95 header.MsgType = "#commit"
96 obj = evt.RepoCommit
97 case evt.RepoIdentity != nil:
98 header.MsgType = "#identity"
99 obj = evt.RepoIdentity
100 case evt.RepoAccount != nil:
101 header.MsgType = "#account"
102 obj = evt.RepoAccount
103 case evt.RepoInfo != nil:
104 header.MsgType = "#info"
105 obj = evt.RepoInfo
106 default:
107 logger.Warn("unrecognized event kind")
108 return
109 }
110
111 if err := header.MarshalCBOR(wc); err != nil {
112 logger.Error("failed to write header to relay", "err", err)
113 return
114 }
115
116 if err := obj.MarshalCBOR(wc); err != nil {
117 logger.Error("failed to write event to relay", "err", err)
118 return
119 }
120
121 if err := wc.Close(); err != nil {
122 logger.Error("failed to flush-close our event write", "err", err)
123 return
124 }
125 }()
126 }
127
128 // we should tell the relay to request a new crawl at this point if we got disconnected
129 // use a new context since the old one might be cancelled at this point
130 go func() {
131 retryCtx, retryCancel := context.WithTimeout(context.Background(), 10*time.Second)
132 defer retryCancel()
133 if err := s.requestCrawl(retryCtx); err != nil {
134 logger.Error("error requesting crawls", "err", err)
135 }
136 }()
137
138 return nil
139}