1package main
2
3import (
4 "context"
5 "fmt"
6 "net/http"
7 "net/url"
8 "sync/atomic"
9 "time"
10
11 comatproto "github.com/bluesky-social/indigo/api/atproto"
12 "github.com/bluesky-social/indigo/atproto/syntax"
13 "github.com/bluesky-social/indigo/events/schedulers/parallel"
14
15 "github.com/bluesky-social/indigo/events"
16 "github.com/carlmjohnson/versioninfo"
17 "github.com/gorilla/websocket"
18 "github.com/redis/go-redis/v9"
19)
20
var (
	// firehoseCursorKey is the redis key under which the firehose consumer's
	// most recently seen sequence number (the resume cursor) is stored.
	firehoseCursorKey = "bluepages/firehoseSeq"
)
22
23func (srv *Server) RunFirehoseConsumer(ctx context.Context, host string, parallelism int) error {
24
25 cur, err := srv.ReadLastCursor(ctx)
26 if err != nil {
27 return err
28 }
29
30 dialer := websocket.DefaultDialer
31 u, err := url.Parse(host)
32 if err != nil {
33 return fmt.Errorf("invalid Host URI: %w", err)
34 }
35 u.Path = "xrpc/com.atproto.sync.subscribeRepos"
36 if cur != 0 {
37 u.RawQuery = fmt.Sprintf("cursor=%d", cur)
38 }
39 srv.logger.Info("subscribing to repo event stream", "upstream", host, "cursor", cur)
40 con, _, err := dialer.Dial(u.String(), http.Header{
41 "User-Agent": []string{fmt.Sprintf("bluepages/%s", versioninfo.Short())},
42 })
43 if err != nil {
44 return fmt.Errorf("subscribing to firehose failed (dialing): %w", err)
45 }
46
47 rsc := &events.RepoStreamCallbacks{
48 RepoIdentity: func(evt *comatproto.SyncSubscribeRepos_Identity) error {
49 atomic.StoreInt64(&srv.lastSeq, evt.Seq)
50 ctx := context.Background()
51 srv.logger.Info("flushing cache due to #identity firehose event", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err)
52
53 did, err := syntax.ParseDID(evt.Did)
54 if err != nil {
55 srv.logger.Warn("invalid DID in #identity event", "did", evt.Did, "seq", evt.Seq, "err", err)
56 return nil
57 }
58 if err := srv.dir.PurgeDID(ctx, did); err != nil {
59 srv.logger.Error("failed to purge DID from cache", "did", evt.Did, "seq", evt.Seq, "err", err)
60 return nil
61 }
62 if evt.Handle == nil {
63 return nil
64 }
65 handle, err := syntax.ParseHandle(*evt.Handle)
66 if err != nil {
67 srv.logger.Warn("invalid handle in #identity event", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err)
68 return nil
69 }
70 if err := srv.dir.PurgeHandle(ctx, handle); err != nil {
71 srv.logger.Error("failed to purge handle from cache", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err)
72 return nil
73 }
74 return nil
75 },
76 }
77
78 var scheduler events.Scheduler
79 // use a fixed-parallelism scheduler if configured
80 scheduler = parallel.NewScheduler(
81 parallelism,
82 1000,
83 host,
84 rsc.EventHandler,
85 )
86 srv.logger.Info("bluepages firehose scheduler configured", "scheduler", "parallel", "initial", parallelism)
87
88 return events.HandleRepoStream(ctx, con, scheduler, srv.logger)
89}
90
91func (srv *Server) ReadLastCursor(ctx context.Context) (int64, error) {
92 // if redis isn't configured, just skip
93 if srv.redisClient == nil {
94 srv.logger.Info("redis not configured, skipping cursor read")
95 return 0, nil
96 }
97
98 val, err := srv.redisClient.Get(ctx, firehoseCursorKey).Int64()
99 if err == redis.Nil {
100 srv.logger.Info("no pre-existing cursor in redis")
101 return 0, nil
102 } else if err != nil {
103 return 0, err
104 }
105 srv.logger.Info("successfully found prior subscription cursor seq in redis", "seq", val)
106 return val, nil
107}
108
109func (srv *Server) PersistCursor(ctx context.Context) error {
110 // if redis isn't configured, just skip
111 if srv.redisClient == nil {
112 return nil
113 }
114 lastSeq := atomic.LoadInt64(&srv.lastSeq)
115 if lastSeq <= 0 {
116 return nil
117 }
118 err := srv.redisClient.Set(ctx, firehoseCursorKey, lastSeq, 14*24*time.Hour).Err()
119 return err
120}
121
122// this method runs in a loop, persisting the current cursor state every 5 seconds
123func (srv *Server) RunPersistCursor(ctx context.Context) error {
124
125 // if redis isn't configured, just skip
126 if srv.redisClient == nil {
127 return nil
128 }
129 ticker := time.NewTicker(5 * time.Second)
130 for {
131 select {
132 case <-ctx.Done():
133 lastSeq := atomic.LoadInt64(&srv.lastSeq)
134 if lastSeq >= 1 {
135 srv.logger.Info("persisting final cursor seq value", "seq", lastSeq)
136 err := srv.PersistCursor(ctx)
137 if err != nil {
138 srv.logger.Error("failed to persist cursor", "err", err, "seq", lastSeq)
139 }
140 }
141 return nil
142 case <-ticker.C:
143 lastSeq := atomic.LoadInt64(&srv.lastSeq)
144 if lastSeq >= 1 {
145 err := srv.PersistCursor(ctx)
146 if err != nil {
147 srv.logger.Error("failed to persist cursor", "err", err, "seq", lastSeq)
148 }
149 }
150 }
151 }
152}