+11
-3
bus/firehose/server.go
+11
-3
bus/firehose/server.go
···
113
u.RawQuery = fmt.Sprintf("cursor=%d", *cursor)
114
}
115
116
-
// run the consumer in a goroutine and
117
shutdownConsumer := make(chan struct{}, 1)
118
consumerShutdown := make(chan struct{}, 1)
119
-
120
go func() {
121
logger := kf.logger.With("component", "consumer")
122
···
141
go func() {
142
if err := events.HandleRepoStream(ctx, conn, scheduler, logger); err != nil {
143
logger.Error("error handling repo stream", "err", err)
144
}
145
}()
146
147
-
<-shutdownConsumer
148
149
if err := conn.Close(); err != nil {
150
logger.Error("error closing websocket", "err", err)
···
113
u.RawQuery = fmt.Sprintf("cursor=%d", *cursor)
114
}
115
116
shutdownConsumer := make(chan struct{}, 1)
117
consumerShutdown := make(chan struct{}, 1)
118
+
consumerErr := make(chan error, 1)
119
go func() {
120
logger := kf.logger.With("component", "consumer")
121
···
140
go func() {
141
if err := events.HandleRepoStream(ctx, conn, scheduler, logger); err != nil {
142
logger.Error("error handling repo stream", "err", err)
143
+
consumerErr <- err
144
+
return
145
}
146
+
consumerErr <- nil
147
}()
148
149
+
select {
150
+
case <-shutdownConsumer:
151
+
case err := <-consumerErr:
152
+
if err != nil {
153
+
logger.Error("consumer encountered an error", "err", err)
154
+
}
155
+
}
156
157
if err := conn.Close(); err != nil {
158
logger.Error("error closing websocket", "err", err)