+22
-8
server/handle_sync_subscribe_repos.go
+22
-8
server/handle_sync_subscribe_repos.go
···
12
12
)
13
13
14
14
func (s *Server) handleSyncSubscribeRepos(e echo.Context) error {
15
-
ctx := e.Request().Context()
15
+
ctx, cancel := context.WithCancel(e.Request().Context())
16
+
defer cancel()
17
+
16
18
logger := s.logger.With("component", "subscribe-repos-websocket")
17
19
18
20
conn, err := websocket.Upgrade(e.Response().Writer, e.Request(), e.Response().Header(), 1<<10, 1<<10)
···
30
32
metrics.RelaysConnected.WithLabelValues(ident).Dec()
31
33
}()
32
34
33
-
evts, cancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool {
35
+
evts, evtManCancel, err := s.evtman.Subscribe(ctx, ident, func(evt *events.XRPCStreamEvent) bool {
34
36
return true
35
37
}, nil)
36
38
if err != nil {
37
39
return err
38
40
}
39
-
defer cancel()
41
+
defer evtManCancel()
42
+
43
+
// drop the connection whenever a subscriber disconnects from the socket, we should get errors
44
+
go func() {
45
+
for {
46
+
if _, _, err := conn.ReadMessage(); err != nil {
47
+
logger.Warn("websocket error", "err", err)
48
+
cancel()
49
+
}
50
+
}
51
+
}()
40
52
41
53
header := events.EventHeader{Op: events.EvtKindMessage}
42
54
for evt := range evts {
···
97
109
98
110
// we should tell the relay to request a new crawl at this point if we got disconnected
99
111
// use a new context since the old one might be cancelled at this point
100
-
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
101
-
defer cancel()
102
-
if err := s.requestCrawl(ctx); err != nil {
103
-
logger.Error("error requesting crawls", "err", err)
104
-
}
112
+
go func() {
113
+
retryCtx, retryCancel := context.WithTimeout(context.Background(), 10*time.Second)
114
+
defer retryCancel()
115
+
if err := s.requestCrawl(retryCtx); err != nil {
116
+
logger.Error("error requesting crawls", "err", err)
117
+
}
118
+
}()
105
119
106
120
return nil
107
121
}