1package server
2
3import (
4 "context"
5 "time"
6
7 "github.com/bluesky-social/indigo/events"
8 "github.com/bluesky-social/indigo/lex/util"
9 "github.com/btcsuite/websocket"
10 "github.com/haileyok/cocoon/metrics"
11 "github.com/labstack/echo/v4"
12)
13
14func (s *Server) handleSyncSubscribeRepos(e echo.Context) error {
15 ctx, cancel := context.WithCancel(e.Request().Context())
16 defer cancel()
17
18 logger := s.logger.With("component", "subscribe-repos-websocket")
19
20 conn, err := websocket.Upgrade(e.Response().Writer, e.Request(), e.Response().Header(), 1<<10, 1<<10)
21 if err != nil {
22 logger.Error("unable to establish websocket with relay", "err", err)
23 return err
24 }
25
26 ident := e.RealIP() + "-" + e.Request().UserAgent()
27 logger = logger.With("ident", ident)
28 logger.Info("new connection established")
29
30 metrics.RelaysConnected.WithLabelValues(ident).Inc()
31 defer func() {
32 metrics.RelaysConnected.WithLabelValues(ident).Dec()
33 }()
34
35 evts, evtManCancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool {
36 return true
37 }, nil)
38 if err != nil {
39 return err
40 }
41 defer evtManCancel()
42
43 // drop the connection whenever a subscriber disconnects from the socket, we should get errors
44 go func() {
45 for {
46 select {
47 case <-ctx.Done():
48 return
49 default:
50 if _, _, err := conn.ReadMessage(); err != nil {
51 logger.Warn("websocket error", "err", err)
52 cancel()
53 return
54 }
55 }
56 }
57 }()
58
59 header := events.EventHeader{Op: events.EvtKindMessage}
60 for evt := range evts {
61 func() {
62 defer func() {
63 metrics.RelaySends.WithLabelValues(ident, header.MsgType).Inc()
64 }()
65
66 wc, err := conn.NextWriter(websocket.BinaryMessage)
67 if err != nil {
68 logger.Error("error writing message to relay", "err", err)
69 return
70 }
71
72 if ctx.Err() != nil {
73 logger.Error("context error", "err", err)
74 return
75 }
76
77 var obj util.CBOR
78 switch {
79 case evt.Error != nil:
80 header.Op = events.EvtKindErrorFrame
81 obj = evt.Error
82 case evt.RepoCommit != nil:
83 header.MsgType = "#commit"
84 obj = evt.RepoCommit
85 case evt.RepoIdentity != nil:
86 header.MsgType = "#identity"
87 obj = evt.RepoIdentity
88 case evt.RepoAccount != nil:
89 header.MsgType = "#account"
90 obj = evt.RepoAccount
91 case evt.RepoInfo != nil:
92 header.MsgType = "#info"
93 obj = evt.RepoInfo
94 default:
95 logger.Warn("unrecognized event kind")
96 return
97 }
98
99 if err := header.MarshalCBOR(wc); err != nil {
100 logger.Error("failed to write header to relay", "err", err)
101 return
102 }
103
104 if err := obj.MarshalCBOR(wc); err != nil {
105 logger.Error("failed to write event to relay", "err", err)
106 return
107 }
108
109 if err := wc.Close(); err != nil {
110 logger.Error("failed to flush-close our event write", "err", err)
111 return
112 }
113 }()
114 }
115
116 // we should tell the relay to request a new crawl at this point if we got disconnected
117 // use a new context since the old one might be cancelled at this point
118 go func() {
119 retryCtx, retryCancel := context.WithTimeout(context.Background(), 10*time.Second)
120 defer retryCancel()
121 if err := s.requestCrawl(retryCtx); err != nil {
122 logger.Error("error requesting crawls", "err", err)
123 }
124 }()
125
126 return nil
127}