+12
-1
eventconsumer/consumer.go
+12
-1
eventconsumer/consumer.go
···
172
func (c *Consumer) startConnectionLoop(ctx context.Context, source Source) {
173
defer c.wg.Done()
174
175
for {
176
select {
177
case <-ctx.Done():
178
return
179
-
default:
180
err := c.runConnection(ctx, source)
181
if err != nil {
182
c.logger.Error("failed to run connection", "err", err)
183
}
184
}
185
}
186
}
···
172
func (c *Consumer) startConnectionLoop(ctx context.Context, source Source) {
173
defer c.wg.Done()
174
175
+
// attempt connection initially
176
+
err := c.runConnection(ctx, source)
177
+
if err != nil {
178
+
c.logger.Error("failed to run connection", "err", err)
179
+
}
180
+
181
+
timer := time.NewTimer(1 * time.Minute)
182
+
defer timer.Stop()
183
+
184
+
// every subsequent attempt is delayed by 1 minute
185
for {
186
select {
187
case <-ctx.Done():
188
return
189
+
case <-timer.C:
190
err := c.runConnection(ctx, source)
191
if err != nil {
192
c.logger.Error("failed to run connection", "err", err)
193
}
194
+
timer.Reset(1 * time.Minute)
195
}
196
}
197
}