this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

firehose reconnect logic

dholms f4462440 9352127a

+61 -22
+60 -16
nexus/firehose.go
··· 4 4 "context" 5 5 "fmt" 6 6 "log/slog" 7 + "math/rand" 7 8 "net/http" 8 9 "net/url" 10 + "time" 9 11 10 12 "github.com/bluesky-social/indigo/events" 11 13 "github.com/bluesky-social/indigo/events/schedulers/parallel" ··· 14 16 15 17 type FirehoseConsumer struct { 16 18 RelayHost string 17 - Cursor int64 18 19 Logger *slog.Logger 19 20 Parallelism int 20 21 Callbacks *events.RepoStreamCallbacks 22 + GetCursor func(ctx context.Context, relayHost string) (int64, error) 21 23 } 22 24 23 25 func (fc *FirehoseConsumer) Run(ctx context.Context) error { 24 - dialer := websocket.DefaultDialer 26 + scheduler := parallel.NewScheduler( 27 + fc.Parallelism, 28 + 100, 29 + fc.RelayHost, 30 + fc.Callbacks.EventHandler, 31 + ) 32 + 25 33 u, err := url.Parse(fc.RelayHost) 26 34 if err != nil { 27 35 return fmt.Errorf("invalid relayHost URI: %w", err) ··· 33 41 u.Scheme = "wss" 34 42 } 35 43 u.Path = "xrpc/com.atproto.sync.subscribeRepos" 36 - if fc.Cursor != 0 { 37 - u.RawQuery = fmt.Sprintf("cursor=%d", fc.Cursor) 44 + 45 + var backoff int 46 + for { 47 + select { 48 + case <-ctx.Done(): 49 + return ctx.Err() 50 + default: 51 + } 52 + 53 + cursor, err := fc.GetCursor(ctx, fc.RelayHost) 54 + if err != nil { 55 + return fmt.Errorf("failed to read cursor: %w", err) 56 + } 57 + 58 + if cursor > 0 { 59 + u.RawQuery = fmt.Sprintf("cursor=%d", cursor) 60 + } 61 + urlStr := u.String() 62 + 63 + fc.Logger.Info("connecting to firehose", "url", urlStr, "cursor", cursor, "backoff", backoff) 64 + 65 + dialer := websocket.DefaultDialer 66 + con, _, err := dialer.DialContext(ctx, urlStr, http.Header{}) 67 + if err != nil { 68 + fc.Logger.Warn("dialing failed", "err", err, "backoff", backoff) 69 + time.Sleep(sleepForBackoff(backoff)) 70 + backoff++ 71 + continue 72 + } 73 + 74 + fc.Logger.Info("connected to firehose") 75 + backoff = 0 76 + 77 + if err := events.HandleRepoStream(ctx, con, scheduler, nil); err != nil { 78 + fc.Logger.Warn("firehose connection failed", "err", err) 79 + } 38 80 } 39 - urlString := u.String() 40 - fc.Logger.Info("subscribing to firehose", "relayHost", fc.RelayHost, "cursor", fc.Cursor) 41 - con, _, err := dialer.Dial(urlString, http.Header{}) 42 - if err != nil { 43 - return fmt.Errorf("subscribing to firehose failed (dialing): %w", err) 81 + } 82 + 83 + func sleepForBackoff(b int) time.Duration { 84 + if b == 0 { 85 + return 0 44 86 } 45 87 46 - scheduler := parallel.NewScheduler( 47 - fc.Parallelism, 48 - 100, 49 - fc.RelayHost, 50 - fc.Callbacks.EventHandler, 51 - ) 52 - return events.HandleRepoStream(ctx, con, scheduler, nil) 88 + // exponential, capped at 10s 89 + duration := time.Second * (1 << b) 90 + if duration > time.Second*10 { 91 + duration = time.Second * 10 92 + } 93 + 94 + // Add jitter 95 + jitter := time.Millisecond * time.Duration(rand.Intn(1000)) 96 + return duration + jitter 53 97 }
+1 -6
nexus/nexus.go
··· 84 84 Outbox: n.outbox, 85 85 } 86 86 87 - cursor, err := n.EventProcessor.ReadLastCursor(context.Background(), config.RelayHost) 88 - if err != nil { 89 - return nil, err 90 - } 91 - 92 87 rsc := &events.RepoStreamCallbacks{ 93 88 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 94 89 return n.EventProcessor.ProcessCommit(context.Background(), evt) ··· 106 101 107 102 n.FirehoseConsumer = &FirehoseConsumer{ 108 103 RelayHost: config.RelayHost, 109 - Cursor: cursor, 110 104 Logger: n.logger.With("component", "firehose"), 111 105 Parallelism: parallelism, 112 106 Callbacks: rsc, 107 + GetCursor: n.EventProcessor.ReadLastCursor, 113 108 } 114 109 115 110 // crash recovery: reset any partially repos