an app.bsky.* indexer

use cursor service in census, fixup firehose cursor saving

Changed files
+50 -34
cmd
+10 -29
cmd/monarch/census.go
··· 11 11 "github.com/bluesky-social/indigo/xrpc" 12 12 "github.com/urfave/cli/v2" 13 13 "golang.org/x/sync/semaphore" 14 - "gorm.io/gorm" 15 14 ) 16 15 17 16 type CensusService struct { 18 - store *gorm.DB 17 + cursor *CursorService 19 18 backfill *backfill.Backfiller 20 19 21 20 storeLk sync.Mutex ··· 25 24 GetOrCreateJob(context.Context, string, string) (backfill.Job, error) 26 25 } 27 26 28 - func NewCensusService(store *gorm.DB, backfillSvc *backfill.Backfiller) *CensusService { 27 + func NewCensusService(cursorSvc *CursorService, backfillSvc *backfill.Backfiller) *CensusService { 29 28 return &CensusService{ 30 - store: store, 29 + cursor: cursorSvc, 31 30 backfill: backfillSvc, 32 31 } 33 32 } 34 33 35 - type relayCursor struct { 36 - ID int `gorm:"primaryKey"` 37 - Host string 38 - Cursor string 39 - } 40 - 41 - type hostCursor struct { 42 - ID int `gorm:"primaryKey"` 43 - Host string 44 - Cursor string 45 - } 46 - 47 34 // fetch the PDSes known to the relay 48 35 func (cs *CensusService) listHosts(ctx context.Context, cctx *cli.Context) { 49 36 relay := cctx.String("relay-host") ··· 53 40 Host: "https://" + relay, 54 41 } 55 42 56 - var rcur relayCursor 57 - if err := cs.store.Where("host = ?", relay).Attrs(relayCursor{ 58 - Host: relay, 59 - Cursor: "", 60 - }).FirstOrCreate(&rcur).Error; err != nil { 43 + hcur, err := cs.cursor.GetHostCursor(relay) 44 + if err != nil { 61 45 slog.Error("error fetching relay cursor", "err", err) 62 46 } 63 47 64 48 var wg sync.WaitGroup 65 49 sem := semaphore.NewWeighted(maxCrawlers) 66 50 67 - curs := rcur.Cursor 51 + curs := hcur.Cursor 68 52 for { 69 53 select { 70 54 case <-ctx.Done(): ··· 93 77 94 78 if res.Cursor != nil && *res.Cursor != "" { 95 79 curs = *res.Cursor 96 - if err := cs.store.Model(&rcur).Update("cursor", curs).Error; err != nil { 80 + if err := cs.cursor.SetHostCursor(relay, curs); err != nil { 97 81 slog.Error("error updating cursor for relay", "err", err) 98 82 } 99 83 } else { ··· 118 102 } 119 103 120 104 cs.storeLk.Lock() 121 - var hcur hostCursor 122 - if err := cs.store.Where("host = ?", host).Attrs(hostCursor{ 123 - Host: host, 124 - Cursor: "", 125 - }).FirstOrCreate(&hcur).Error; err != nil { 105 + hcur, err := cs.cursor.GetHostCursor(host) 106 + if err != nil { 126 107 slog.Error("error fetching host cursor", "err", err) 127 108 } 128 109 cs.storeLk.Unlock() ··· 158 139 if res.Cursor != nil && *res.Cursor != "" { 159 140 curs = *res.Cursor 160 141 cs.storeLk.Lock() 161 - if err := cs.store.Model(&hcur).Update("cursor", curs).Error; err != nil { 142 + if err := cs.cursor.SetHostCursor(host, curs); err != nil { 162 143 slog.Error("error updating cursor for host", "err", err) 163 144 } 164 145 cs.storeLk.Unlock()
-1
cmd/monarch/cursors.go
··· 18 18 19 19 func NewCursorService(store *gorm.DB) *CursorService { 20 20 store.AutoMigrate(&firehoseCursor{}) 21 - store.AutoMigrate(&relayCursor{}) 22 21 store.AutoMigrate(&hostCursor{}) 23 22 24 23 return &CursorService{
+7 -3
cmd/monarch/firehose.go
··· 37 37 defer cs.firehoseLk.Unlock() 38 38 39 39 var fcur firehoseCursor 40 - if err := cs.store.Where("key = ?", "firehose").Attrs(firehoseCursor{Val: cs.firehoseSeq}).FirstOrCreate(&fcur).Error; err != nil { 40 + if err := cs.store.Where(firehoseCursor{ 41 + Key: "firehose", 42 + }).Attrs(firehoseCursor{ 43 + Key: "firehose", 44 + Val: 0, 45 + }).FirstOrCreate(&fcur).Error; err != nil { 41 46 return 0, fmt.Errorf("error getting firehose seq from DB: %w", err) 42 47 } 43 48 return fcur.Val, nil ··· 54 59 cs.firehoseLk.Lock() 55 60 defer cs.firehoseLk.Unlock() 56 61 57 - var fcur firehoseCursor 58 - if err := cs.store.Where("key = ?", "firehose").Assign(firehoseCursor{Val: cs.firehoseSeq}).FirstOrCreate(&fcur).Error; err != nil { 62 + if err := cs.store.Where(firehoseCursor{Key: "firehose"}).Update("val", cs.firehoseSeq).Error; err != nil { 59 63 return fmt.Errorf("error persisting firehose seq: %w", err) 60 64 } 61 65 return nil
+32
cmd/monarch/hosts.go
··· 1 + package main 2 + 3 + type hostCursor struct { 4 + ID int `gorm:"primaryKey"` 5 + Host string 6 + Cursor string 7 + } 8 + 9 + func (cs *CursorService) GetHostCursor(host string) (*hostCursor, error) { 10 + var out hostCursor 11 + 12 + if err := cs.store.Where(hostCursor{ 13 + Host: host, 14 + }).Attrs(hostCursor{ 15 + Host: host, 16 + Cursor: "", 17 + }).FirstOrCreate(&out).Error; err != nil { 18 + return nil, err 19 + } 20 + 21 + return &out, nil 22 + } 23 + 24 + func (cs *CursorService) SetHostCursor(host string, cursor string) error { 25 + if err := cs.store.Where(hostCursor{ 26 + Host: host, 27 + }).Update("cursor", cursor).Error; err != nil { 28 + return err 29 + } 30 + 31 + return nil 32 + }
+1 -1
cmd/monarch/main.go
··· 49 49 app.backfill = NewBackfillService(backfill.NewGormstore(app.state), app.handler, cctx) 50 50 go app.backfill.Start() 51 51 52 - app.census = NewCensusService(app.state, app.backfill) 52 + app.census = NewCensusService(app.cursor, app.backfill) 53 53 go app.census.Start(ctx, cctx) 54 54 55 55 wsconn, err := NewFirehoseConnection(ctx, cctx, app.cursor)