eventconsumer: rework retry mechanism #273

merged
opened by oppi.li targeting master from push-ktsnmppqsnls

the previous retry mechanism had a slight flaw: successful connections did not reset the exponent on the retry interval. this results in constantly growing retry intervals:

attempt #1 - wait 5s
attempt #2 - wait 10s
attempt #3 - success!
.
.
.
disconnect
attempt #4 - wait 20s

what we want to see, however, is a pattern like so:

attempt #1 - wait 5s
attempt #2 - wait 10s
attempt #3 - success!
.
.
.
disconnect
attempt #1 - wait 5s

this is solved by wrapping the retry logic around the dialer's DialContext call, which is a more atomic point of connection attempt. retry logic is also offloaded to the github.com/avast/retry-go/v4 package

Signed-off-by: oppiliappan <me@oppi.li>

Changed files
+35 -25
eventconsumer
+31 -24
eventconsumer/consumer.go
··· 12 "tangled.sh/tangled.sh/core/eventconsumer/cursor" 13 "tangled.sh/tangled.sh/core/log" 14 15 "github.com/gorilla/websocket" 16 ) 17 ··· 170 171 func (c *Consumer) startConnectionLoop(ctx context.Context, source Source) { 172 defer c.wg.Done() 173 - retryInterval := c.cfg.RetryInterval 174 for { 175 select { 176 case <-ctx.Done(): ··· 178 default: 179 err := c.runConnection(ctx, source) 180 if err != nil { 181 - c.logger.Error("connection failed", "source", source, "err", err) 182 - } 183 - 184 - // apply jitter 185 - jitter := time.Duration(c.randSource.Int63n(int64(retryInterval) / 5)) 186 - delay := retryInterval + jitter 187 - 188 - if retryInterval < c.cfg.MaxRetryInterval { 189 - retryInterval *= 2 190 - if retryInterval > c.cfg.MaxRetryInterval { 191 - retryInterval = c.cfg.MaxRetryInterval 192 - } 193 - } 194 - c.logger.Info("retrying connection", "source", source, "delay", delay) 195 - select { 196 - case <-time.After(delay): 197 - case <-ctx.Done(): 198 - return 199 } 200 } 201 } 202 } 203 204 func (c *Consumer) runConnection(ctx context.Context, source Source) error { 205 - connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout) 206 - defer cancel() 207 - 208 cursor := c.cfg.CursorStore.Get(source.Key()) 209 210 u, err := source.Url(cursor, c.cfg.Dev) ··· 213 } 214 215 c.logger.Info("connecting", "url", u.String()) 216 - conn, _, err := c.dialer.DialContext(connCtx, u.String(), nil) 217 if err != nil { 218 return err 219 } 220 - defer conn.Close() 221 c.connMap.Store(source, conn) 222 defer c.connMap.Delete(source) 223 224 c.logger.Info("connected", "source", source)
··· 12 "tangled.sh/tangled.sh/core/eventconsumer/cursor" 13 "tangled.sh/tangled.sh/core/log" 14 15 + "github.com/avast/retry-go/v4" 16 "github.com/gorilla/websocket" 17 ) 18 ··· 171 172 func (c *Consumer) startConnectionLoop(ctx context.Context, source Source) { 173 defer c.wg.Done() 174 + 175 for { 176 select { 177 case <-ctx.Done(): ··· 179 default: 180 err := c.runConnection(ctx, source) 181 if err != nil { 182 + c.logger.Error("failed to run connection", "err", err) 183 } 184 } 185 } 186 } 187 188 func (c *Consumer) runConnection(ctx context.Context, source Source) error { 189 cursor := c.cfg.CursorStore.Get(source.Key()) 190 191 u, err := source.Url(cursor, c.cfg.Dev) ··· 194 } 195 196 c.logger.Info("connecting", "url", u.String()) 197 + 198 + retryOpts := []retry.Option{ 199 + retry.Attempts(0), // infinite attempts 200 + retry.DelayType(retry.BackOffDelay), 201 + retry.Delay(c.cfg.RetryInterval), 202 + retry.MaxDelay(c.cfg.MaxRetryInterval), 203 + retry.MaxJitter(c.cfg.RetryInterval / 5), 204 + retry.OnRetry(func(n uint, err error) { 205 + c.logger.Info("retrying connection", 206 + "source", source, 207 + "url", u.String(), 208 + "attempt", n+1, 209 + "err", err, 210 + ) 211 + }), 212 + retry.Context(ctx), 213 + } 214 + 215 + var conn *websocket.Conn 216 + 217 + err = retry.Do(func() error { 218 + connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout) 219 + defer cancel() 220 + conn, _, err = c.dialer.DialContext(connCtx, u.String(), nil) 221 + return err 222 + }, retryOpts...) 223 if err != nil { 224 return err 225 } 226 + 227 c.connMap.Store(source, conn) 228 + defer conn.Close() 229 defer c.connMap.Delete(source) 230 231 c.logger.Info("connected", "source", source)
+1 -1
flake.nix
··· 61 inherit (gitignore.lib) gitignoreSource; 62 in { 63 overlays.default = final: prev: let 64 - goModHash = "sha256-2RUwj16RNaZ/gCOcd7b3LRCHiROCRj9HuzbBdLdgWGo="; 65 appviewDeps = { 66 inherit htmx-src htmx-ws-src lucide-src inter-fonts-src ibm-plex-mono-src goModHash gitignoreSource; 67 };
··· 61 inherit (gitignore.lib) gitignoreSource; 62 in { 63 overlays.default = final: prev: let 64 + goModHash = "sha256-SLi+nALwCd/Lzn3aljwPqCo2UaM9hl/4OAjcHQLt2Bk="; 65 appviewDeps = { 66 inherit htmx-src htmx-ws-src lucide-src inter-fonts-src ibm-plex-mono-src goModHash gitignoreSource; 67 };
+1
go.mod
··· 49 github.com/Microsoft/go-winio v0.6.2 // indirect 50 github.com/ProtonMail/go-crypto v1.2.0 // indirect 51 github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be // indirect 52 github.com/aymerick/douceur v0.2.0 // indirect 53 github.com/beorn7/perks v1.0.1 // indirect 54 github.com/bmatcuk/doublestar/v4 v4.7.1 // indirect
··· 49 github.com/Microsoft/go-winio v0.6.2 // indirect 50 github.com/ProtonMail/go-crypto v1.2.0 // indirect 51 github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be // indirect 52 + github.com/avast/retry-go/v4 v4.6.1 // indirect 53 github.com/aymerick/douceur v0.2.0 // indirect 54 github.com/beorn7/perks v1.0.1 // indirect 55 github.com/bmatcuk/doublestar/v4 v4.7.1 // indirect
+2
go.sum
··· 17 github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4= 18 github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= 19 github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= 20 github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk= 21 github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4= 22 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
··· 17 github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4= 18 github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= 19 github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= 20 + github.com/avast/retry-go/v4 v4.6.1 h1:VkOLRubHdisGrHnTu89g08aQEWEgRU7LVEop3GbIcMk= 21 + github.com/avast/retry-go/v4 v4.6.1/go.mod h1:V6oF8njAwxJ5gRo1Q7Cxab24xs5NCWZBeaHHBklR8mA= 22 github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk= 23 github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4= 24 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=