an app.bsky.* indexer

add flags for backfill workers/consumers

Changed files
+19 -8
cmd
+4 -4
cmd/monarch/backfill.go
··· 2 3 import "github.com/bluesky-social/indigo/backfill" 4 5 - func NewBackfillService(store backfill.Store, h *HandlerService) *backfill.Backfiller { 6 opts := &backfill.BackfillOptions{ 7 - ParallelBackfills: 1, 8 - ParallelRecordCreates: 1, 9 NSIDFilter: "", 10 - SyncRequestsPerSecond: 2, 11 RelayHost: "https://bsky.network", 12 } 13
··· 2 3 import "github.com/bluesky-social/indigo/backfill" 4 5 + func NewBackfillService(store backfill.Store, h *HandlerService, workers int, consumers int) *backfill.Backfiller { 6 opts := &backfill.BackfillOptions{ 7 + ParallelBackfills: workers, 8 + ParallelRecordCreates: consumers, 9 NSIDFilter: "", 10 + SyncRequestsPerSecond: 10, 11 RelayHost: "https://bsky.network", 12 } 13
+15 -4
cmd/monarch/main.go
··· 6 "log/slog" 7 "os" 8 "os/signal" 9 "syscall" 10 "time" 11 ··· 38 } 39 } 40 41 - func (app *App) Start(ctx context.Context) error { 42 slog.Info("starting up") 43 44 app.cursor = NewCursorService(app.state) ··· 46 47 app.handler = NewHandlerService(app.content) 48 49 - app.backfill = NewBackfillService(backfill.NewGormstore(app.state), app.handler) 50 go app.backfill.Start() 51 52 app.census = NewCensusService(app.cursor, app.backfill) ··· 119 }, 120 &cli.IntFlag{ 121 Name: "max-db-connections", 122 - Value: 1, 123 }, 124 } 125 ··· 142 } 143 144 app := NewApp(statedb, contentdb) 145 - if err := app.Start(ctx); err != nil { 146 slog.Error("failed to start backfiller", "err", err) 147 } 148
··· 6 "log/slog" 7 "os" 8 "os/signal" 9 + "runtime" 10 "syscall" 11 "time" 12 ··· 39 } 40 } 41 42 + func (app *App) Start(ctx context.Context, cctx *cli.Context) error { 43 slog.Info("starting up") 44 45 app.cursor = NewCursorService(app.state) ··· 47 48 app.handler = NewHandlerService(app.content) 49 50 + workers := cctx.Int("backfill-workers") 51 + consumers := cctx.Int("backfill-consumers") 52 + app.backfill = NewBackfillService(backfill.NewGormstore(app.state), app.handler, workers, consumers) 53 go app.backfill.Start() 54 55 app.census = NewCensusService(app.cursor, app.backfill) ··· 122 }, 123 &cli.IntFlag{ 124 Name: "max-db-connections", 125 + Value: runtime.NumCPU(), 126 + }, 127 + &cli.IntFlag{ 128 + Name: "backfill-workers", 129 + Value: 50, 130 + }, 131 + &cli.IntFlag{ 132 + Name: "backfill-consumers", 133 + Value: 100, 134 }, 135 } 136 ··· 153 } 154 155 app := NewApp(statedb, contentdb) 156 + if err := app.Start(ctx, cctx); err != nil { 157 slog.Error("failed to start backfiller", "err", err) 158 } 159