an app.bsky.* indexer

Misc updates

- Reduce number of parallel backfills
- Reduce logging
- Add ActorProfile Labels, StarterPacks, and PinnedPost
- Use .Assign() and .FirstOrCreate() instead of .Save()
- Use local fork of indigo for now
- Move ActorProfile and friends to models/actor_profile.go
- Add models/strongref.go, Use StrongRef as anonymous struct embed

+2 -2
cmd/monarch/backfill.go
··· 4 4 5 5 func NewBackfillService(store backfill.Store, h *HandlerService) *backfill.Backfiller { 6 6 opts := &backfill.BackfillOptions{ 7 - ParallelBackfills: 10, 7 + ParallelBackfills: 1, 8 8 ParallelRecordCreates: 1, 9 9 NSIDFilter: "", 10 - SyncRequestsPerSecond: 5, 10 + SyncRequestsPerSecond: 2, 11 11 RelayHost: "https://bsky.network", 12 12 } 13 13
-1
cmd/monarch/census.go
··· 45 45 default: 46 46 } 47 47 48 - slog.Info("listing repos", "cursor", curs) 49 48 res, err := atproto.SyncListRepos(ctx, xrpcc, curs, 1000) 50 49 if err != nil { 51 50 slog.Error("error listing repos", "err", err)
-1
cmd/monarch/cursors.go
··· 93 93 case <-t.C: 94 94 } 95 95 96 - slog.Info("flushing cursors") 97 96 if err := cs.Flush(); err != nil { 98 97 slog.Error("error flushing cursors", "err", err) 99 98 return
+11 -3
cmd/monarch/handlers.go
··· 15 15 } 16 16 17 17 func NewHandlerService(store *gorm.DB) *HandlerService { 18 - store.AutoMigrate(&models.ActorProfile{}) 18 + migrations := []any{ 19 + &models.ActorProfile{}, 20 + &models.ActorProfile_Label{}, 21 + &models.ActorProfile_JoinedViaStarterPack{}, 22 + &models.ActorProfile_PinnedPost{}, 23 + } 24 + for _, migration := range migrations { 25 + store.AutoMigrate(migration) 26 + } 19 27 20 28 return &HandlerService{ 21 29 store: store, ··· 32 40 switch uri.Collection() { 33 41 case syntax.NSID("app.bsky.actor.profile"): 34 42 profile := models.NewActorProfile(uri, *rec) 35 - if err := hs.store.Save(profile).Error; err != nil { 36 - return fmt.Errorf("error saving profile: %w", err) 43 + if err := hs.store.Where(models.ActorProfile{ID: string(uri)}).Assign(profile).FirstOrCreate(&models.ActorProfile{}).Error; err != nil { 44 + return fmt.Errorf("error upserting profile: %w", err) 37 45 } 38 46 } 39 47
+3 -1
cmd/monarch/main.go
··· 39 39 } 40 40 41 41 func (app *App) Start(ctx context.Context) error { 42 + slog.Info("starting up") 43 + 42 44 app.cursor = NewCursorService(app.state) 43 45 go app.cursor.CheckpointCursors(ctx) 44 46 ··· 65 67 // TODO identity 66 68 } 67 69 68 - sched := parallel.NewScheduler(4, 50, "firehose", rsc.EventHandler) 70 + sched := parallel.NewScheduler(1, 50, "firehose", rsc.EventHandler) 69 71 70 72 if err := events.HandleRepoStream(ctx, app.wsconn, sched, nil); err != nil { 71 73 return fmt.Errorf("error starting repo stream handler: %w", err)
+2
go.mod
··· 96 96 gorm.io/driver/postgres v1.5.7 // indirect 97 97 lukechampine.com/blake3 v1.2.1 // indirect 98 98 ) 99 + 100 + replace github.com/bluesky-social/indigo => ../../../bluesky-social/indigo
+87
models/actor_profile.go
··· 1 + package models 2 + 3 + import ( 4 + "bytes" 5 + "log/slog" 6 + "time" 7 + 8 + appbsky "github.com/bluesky-social/indigo/api/bsky" 9 + "github.com/bluesky-social/indigo/atproto/syntax" 10 + ) 11 + 12 + type ActorProfile struct { 13 + ID string `gorm:"primaryKey"` 14 + 15 + // Avatar 16 + // Banner 17 + CreatedAt *string 18 + Description *string 19 + DisplayName *string 20 + JoinedViaStarterPack []ActorProfile_JoinedViaStarterPack 21 + Labels []ActorProfile_Label 22 + PinnedPost []ActorProfile_PinnedPost 23 + 24 + AutoCreatedAt time.Time `gorm:"autoCreateTime"` 25 + AutoUpdatedAt time.Time `gorm:"autoUpdateTime"` 26 + } 27 + 28 + type ActorProfile_Label struct { 29 + ActorProfileID string 30 + Value string 31 + } 32 + 33 + type ActorProfile_JoinedViaStarterPack struct { 34 + ActorProfileID string 35 + StrongRef 36 + } 37 + 38 + type ActorProfile_PinnedPost struct { 39 + ActorProfileID string 40 + StrongRef 41 + } 42 + 43 + func NewActorProfile(uri syntax.ATURI, rec []byte) *ActorProfile { 44 + var out appbsky.ActorProfile 45 + if err := out.UnmarshalCBOR(bytes.NewReader(rec)); err != nil { 46 + slog.Error("could not unmarshal profile CBOR", "err", err) 47 + return nil 48 + } 49 + 50 + profile := ActorProfile{ 51 + ID: string(uri), 52 + CreatedAt: out.CreatedAt, 53 + Description: out.Description, 54 + DisplayName: out.DisplayName, 55 + } 56 + 57 + if out.JoinedViaStarterPack != nil { 58 + profile.JoinedViaStarterPack = append(profile.JoinedViaStarterPack, ActorProfile_JoinedViaStarterPack{ 59 + ActorProfileID: profile.ID, 60 + StrongRef: StrongRef{ 61 + Uri: out.JoinedViaStarterPack.Uri, 62 + Cid: out.JoinedViaStarterPack.Cid, 63 + }, 64 + }) 65 + } 66 + 67 + if out.PinnedPost != nil { 68 + profile.PinnedPost = append(profile.PinnedPost, ActorProfile_PinnedPost{ 69 + ActorProfileID: profile.ID, 70 + StrongRef: StrongRef{ 71 + Uri: out.PinnedPost.Uri, 72 + Cid: out.PinnedPost.Cid, 73 + }, 74 + }) 75 + } 76 + 77 + if out.Labels != nil && out.Labels.LabelDefs_SelfLabels != nil && out.Labels.LabelDefs_SelfLabels.Values != nil { 78 + for _, label := range out.Labels.LabelDefs_SelfLabels.Values { 79 + profile.Labels = append(profile.Labels, ActorProfile_Label{ 80 + ActorProfileID: profile.ID, 81 + Value: label.Val, 82 + }) 83 + } 84 + } 85 + 86 + return &profile 87 + }
-54
models/models.go
··· 1 1 package models 2 2 3 - import ( 4 - "bytes" 5 - "log/slog" 6 - "time" 7 - 8 - appbsky "github.com/bluesky-social/indigo/api/bsky" 9 - "github.com/bluesky-social/indigo/atproto/syntax" 10 - ) 11 - 12 - type StrongRef struct { 13 - Uri string 14 - Cid string 15 - } 16 - 17 - type ActorProfile struct { 18 - ID string `gorm:"primaryKey"` 19 - 20 - // Avatar 21 - // Banner 22 - CreatedAt *string 23 - Description *string 24 - DisplayName *string 25 - JoinedViaStarterPack StrongRef `gorm:"embedded;embeddedPrefix:joinedviastarterpack_"` 26 - // Labels 27 - // PinnedPost StrongRef 28 - 29 - AutoCreatedAt time.Time `gorm:"autoCreateTime"` 30 - AutoUpdatedAt time.Time `gorm:"autoUpdateTime"` 31 - } 32 - 33 - func NewActorProfile(uri syntax.ATURI, rec []byte) *ActorProfile { 34 - var out appbsky.ActorProfile 35 - if err := out.UnmarshalCBOR(bytes.NewReader(rec)); err != nil { 36 - slog.Error("could not unmarshal profile CBOR", "err", err) 37 - return nil 38 - } 39 - 40 - profile := ActorProfile{ 41 - ID: string(uri), 42 - CreatedAt: out.CreatedAt, 43 - Description: out.Description, 44 - DisplayName: out.DisplayName, 45 - } 46 - 47 - if out.JoinedViaStarterPack != nil { 48 - profile.JoinedViaStarterPack = StrongRef{ 49 - Uri: out.JoinedViaStarterPack.Uri, 50 - Cid: out.JoinedViaStarterPack.Cid, 51 - } 52 - } 53 - 54 - return &profile 55 - } 56 - 57 3 // ActorStatus 58 4 // FeedGenerator 59 5 // FeedLike
+6
models/strongref.go
··· 1 + package models 2 + 3 + type StrongRef struct { 4 + Uri string 5 + Cid string 6 + }