forked from tangled.org/core
Monorepo for Tangled

eventconsumer: rework retry mechanism

the previous retry mechanism had a slight flaw: successful connections
did not reset the exponent on the retry interval. this results in
constantly growing retry intervals:

attempt #1 - wait 5s
attempt #2 - wait 10s
attempt #3 - success!
.
.
.
disconnect
attempt #4 - wait 20s

what we want to see, however, is a pattern like so:

attempt #1 - wait 5s
attempt #2 - wait 10s
attempt #3 - success!
.
.
.
disconnect
attempt #1 - wait 5s

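this is solved by wrapping the retry logic around DialContext, which is
a more atomic point of connection attempt. each call to retry.Do starts
its own backoff sequence, so the interval resets after every successful
connection. retry logic is also offloaded to the
github.com/avast/retry-go package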

Signed-off-by: oppiliappan <me@oppi.li>

authored by oppi.li and committed by Tangled 037bdc4c dea92276

Changed files
+35 -25
eventconsumer
+31 -24
eventconsumer/consumer.go
··· 12 12 "tangled.sh/tangled.sh/core/eventconsumer/cursor" 13 13 "tangled.sh/tangled.sh/core/log" 14 14 15 + "github.com/avast/retry-go/v4" 15 16 "github.com/gorilla/websocket" 16 17 ) 17 18 ··· 170 171 171 172 func (c *Consumer) startConnectionLoop(ctx context.Context, source Source) { 172 173 defer c.wg.Done() 173 - retryInterval := c.cfg.RetryInterval 174 + 174 175 for { 175 176 select { 176 177 case <-ctx.Done(): ··· 178 179 default: 179 180 err := c.runConnection(ctx, source) 180 181 if err != nil { 181 - c.logger.Error("connection failed", "source", source, "err", err) 182 - } 183 - 184 - // apply jitter 185 - jitter := time.Duration(c.randSource.Int63n(int64(retryInterval) / 5)) 186 - delay := retryInterval + jitter 187 - 188 - if retryInterval < c.cfg.MaxRetryInterval { 189 - retryInterval *= 2 190 - if retryInterval > c.cfg.MaxRetryInterval { 191 - retryInterval = c.cfg.MaxRetryInterval 192 - } 193 - } 194 - c.logger.Info("retrying connection", "source", source, "delay", delay) 195 - select { 196 - case <-time.After(delay): 197 - case <-ctx.Done(): 198 - return 182 + c.logger.Error("failed to run connection", "err", err) 199 183 } 200 184 } 201 185 } 202 186 } 203 187 204 188 func (c *Consumer) runConnection(ctx context.Context, source Source) error { 205 - connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout) 206 - defer cancel() 207 - 208 189 cursor := c.cfg.CursorStore.Get(source.Key()) 209 190 210 191 u, err := source.Url(cursor, c.cfg.Dev) ··· 213 194 } 214 195 215 196 c.logger.Info("connecting", "url", u.String()) 216 - conn, _, err := c.dialer.DialContext(connCtx, u.String(), nil) 197 + 198 + retryOpts := []retry.Option{ 199 + retry.Attempts(0), // infinite attempts 200 + retry.DelayType(retry.BackOffDelay), 201 + retry.Delay(c.cfg.RetryInterval), 202 + retry.MaxDelay(c.cfg.MaxRetryInterval), 203 + retry.MaxJitter(c.cfg.RetryInterval / 5), 204 + retry.OnRetry(func(n uint, err error) { 205 + c.logger.Info("retrying connection", 206 + 
"source", source, 207 + "url", u.String(), 208 + "attempt", n+1, 209 + "err", err, 210 + ) 211 + }), 212 + retry.Context(ctx), 213 + } 214 + 215 + var conn *websocket.Conn 216 + 217 + err = retry.Do(func() error { 218 + connCtx, cancel := context.WithTimeout(ctx, c.cfg.ConnectionTimeout) 219 + defer cancel() 220 + conn, _, err = c.dialer.DialContext(connCtx, u.String(), nil) 221 + return err 222 + }, retryOpts...) 217 223 if err != nil { 218 224 return err 219 225 } 226 + 227 + c.connMap.Store(source, conn) 220 228 defer conn.Close() 221 - c.connMap.Store(source, conn) 222 229 defer c.connMap.Delete(source) 223 230 224 231 c.logger.Info("connected", "source", source)
+1 -1
flake.nix
··· 61 61 inherit (gitignore.lib) gitignoreSource; 62 62 in { 63 63 overlays.default = final: prev: let 64 - goModHash = "sha256-2RUwj16RNaZ/gCOcd7b3LRCHiROCRj9HuzbBdLdgWGo="; 64 + goModHash = "sha256-SLi+nALwCd/Lzn3aljwPqCo2UaM9hl/4OAjcHQLt2Bk="; 65 65 appviewDeps = { 66 66 inherit htmx-src htmx-ws-src lucide-src inter-fonts-src ibm-plex-mono-src goModHash gitignoreSource; 67 67 };
+1
go.mod
··· 49 49 github.com/Microsoft/go-winio v0.6.2 // indirect 50 50 github.com/ProtonMail/go-crypto v1.2.0 // indirect 51 51 github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be // indirect 52 + github.com/avast/retry-go/v4 v4.6.1 // indirect 52 53 github.com/aymerick/douceur v0.2.0 // indirect 53 54 github.com/beorn7/perks v1.0.1 // indirect 54 55 github.com/bmatcuk/doublestar/v4 v4.7.1 // indirect
+2
go.sum
··· 17 17 github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4= 18 18 github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio= 19 19 github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= 20 + github.com/avast/retry-go/v4 v4.6.1 h1:VkOLRubHdisGrHnTu89g08aQEWEgRU7LVEop3GbIcMk= 21 + github.com/avast/retry-go/v4 v4.6.1/go.mod h1:V6oF8njAwxJ5gRo1Q7Cxab24xs5NCWZBeaHHBklR8mA= 20 22 github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk= 21 23 github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4= 22 24 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=