fork of indigo with slightly nicer lexgen
at main 4.4 kB view raw
1package main 2 3import ( 4 "context" 5 "fmt" 6 "net/http" 7 "net/url" 8 "sync/atomic" 9 "time" 10 11 comatproto "github.com/bluesky-social/indigo/api/atproto" 12 "github.com/bluesky-social/indigo/atproto/syntax" 13 "github.com/bluesky-social/indigo/events/schedulers/parallel" 14 15 "github.com/bluesky-social/indigo/events" 16 "github.com/carlmjohnson/versioninfo" 17 "github.com/gorilla/websocket" 18 "github.com/redis/go-redis/v9" 19) 20 21var firehoseCursorKey = "bluepages/firehoseSeq" 22 23func (srv *Server) RunFirehoseConsumer(ctx context.Context, host string, parallelism int) error { 24 25 cur, err := srv.ReadLastCursor(ctx) 26 if err != nil { 27 return err 28 } 29 30 dialer := websocket.DefaultDialer 31 u, err := url.Parse(host) 32 if err != nil { 33 return fmt.Errorf("invalid Host URI: %w", err) 34 } 35 u.Path = "xrpc/com.atproto.sync.subscribeRepos" 36 if cur != 0 { 37 u.RawQuery = fmt.Sprintf("cursor=%d", cur) 38 } 39 srv.logger.Info("subscribing to repo event stream", "upstream", host, "cursor", cur) 40 con, _, err := dialer.Dial(u.String(), http.Header{ 41 "User-Agent": []string{fmt.Sprintf("bluepages/%s", versioninfo.Short())}, 42 }) 43 if err != nil { 44 return fmt.Errorf("subscribing to firehose failed (dialing): %w", err) 45 } 46 47 rsc := &events.RepoStreamCallbacks{ 48 RepoIdentity: func(evt *comatproto.SyncSubscribeRepos_Identity) error { 49 atomic.StoreInt64(&srv.lastSeq, evt.Seq) 50 ctx := context.Background() 51 srv.logger.Info("flushing cache due to #identity firehose event", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err) 52 53 did, err := syntax.ParseDID(evt.Did) 54 if err != nil { 55 srv.logger.Warn("invalid DID in #identity event", "did", evt.Did, "seq", evt.Seq, "err", err) 56 return nil 57 } 58 if err := srv.dir.PurgeDID(ctx, did); err != nil { 59 srv.logger.Error("failed to purge DID from cache", "did", evt.Did, "seq", evt.Seq, "err", err) 60 return nil 61 } 62 if evt.Handle == nil { 63 return nil 64 } 65 handle, err := syntax.ParseHandle(*evt.Handle) 66 if err != nil { 67 srv.logger.Warn("invalid handle in #identity event", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err) 68 return nil 69 } 70 if err := srv.dir.PurgeHandle(ctx, handle); err != nil { 71 srv.logger.Error("failed to purge handle from cache", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err) 72 return nil 73 } 74 return nil 75 }, 76 } 77 78 var scheduler events.Scheduler 79 // use a fixed-parallelism scheduler if configured 80 scheduler = parallel.NewScheduler( 81 parallelism, 82 1000, 83 host, 84 rsc.EventHandler, 85 ) 86 srv.logger.Info("bluepages firehose scheduler configured", "scheduler", "parallel", "initial", parallelism) 87 88 return events.HandleRepoStream(ctx, con, scheduler, srv.logger) 89} 90 91func (srv *Server) ReadLastCursor(ctx context.Context) (int64, error) { 92 // if redis isn't configured, just skip 93 if srv.redisClient == nil { 94 srv.logger.Info("redis not configured, skipping cursor read") 95 return 0, nil 96 } 97 98 val, err := srv.redisClient.Get(ctx, firehoseCursorKey).Int64() 99 if err == redis.Nil { 100 srv.logger.Info("no pre-existing cursor in redis") 101 return 0, nil 102 } else if err != nil { 103 return 0, err 104 } 105 srv.logger.Info("successfully found prior subscription cursor seq in redis", "seq", val) 106 return val, nil 107} 108 109func (srv *Server) PersistCursor(ctx context.Context) error { 110 // if redis isn't configured, just skip 111 if srv.redisClient == nil { 112 return nil 113 } 114 lastSeq := atomic.LoadInt64(&srv.lastSeq) 115 if lastSeq <= 0 { 116 return nil 117 } 118 err := srv.redisClient.Set(ctx, firehoseCursorKey, lastSeq, 14*24*time.Hour).Err() 119 return err 120} 121 122// this method runs in a loop, persisting the current cursor state every 5 seconds 123func (srv *Server) RunPersistCursor(ctx context.Context) error { 124 125 // if redis isn't configured, just skip 126 if srv.redisClient == nil { 127 return nil 128 } 129 ticker := time.NewTicker(5 * time.Second) 130 for { 131 select { 132 case <-ctx.Done(): 133 lastSeq := atomic.LoadInt64(&srv.lastSeq) 134 if lastSeq >= 1 { 135 srv.logger.Info("persisting final cursor seq value", "seq", lastSeq) 136 err := srv.PersistCursor(ctx) 137 if err != nil { 138 srv.logger.Error("failed to persist cursor", "err", err, "seq", lastSeq) 139 } 140 } 141 return nil 142 case <-ticker.C: 143 lastSeq := atomic.LoadInt64(&srv.lastSeq) 144 if lastSeq >= 1 { 145 err := srv.PersistCursor(ctx) 146 if err != nil { 147 srv.logger.Error("failed to persist cursor", "err", err, "seq", lastSeq) 148 } 149 } 150 } 151 } 152}