an app.bsky.* indexer

cleanup

Changed files
+23 -50
cmd
+4 -3
cmd/monarch/census.go
··· 161 } 162 } 163 164 cs.seenLk.Lock() 165 defer cs.seenLk.Unlock() 166 cs.seenHosts[host] = true 167 - 168 - slog.Info("finished listing repos", "host", host) 169 } 170 171 func (cs *CensusService) Start(ctx context.Context, cctx *cli.Context) { ··· 173 cs.listHosts(ctx, cctx) 174 175 slog.Info("finished with initial refresh, starting ticker") 176 - t := time.NewTicker(time.Hour) 177 defer t.Stop() 178 179 for {
··· 161 } 162 } 163 164 + slog.Info("finished listing repos", "host", host) 165 + 166 cs.seenLk.Lock() 167 defer cs.seenLk.Unlock() 168 + 169 cs.seenHosts[host] = true 170 } 171 172 func (cs *CensusService) Start(ctx context.Context, cctx *cli.Context) { ··· 174 cs.listHosts(ctx, cctx) 175 176 slog.Info("finished with initial refresh, starting ticker") 177 + t := time.NewTicker(time.Minute * 5) 178 defer t.Stop() 179 180 for {
+1 -6
cmd/monarch/cursors.go
··· 33 select { 34 case <-ctx.Done(): 35 slog.Info("stopping cursor checkpointer", "err", ctx.Err()) 36 - 37 - slog.Info("persisting firehose cursor before exit", "seq", cs.firehoseSeq) 38 - if err := cs.PersistFirehoseCursor(); err != nil { 39 - slog.Error("error persisting firehose cursor", "err", err) 40 - } 41 - 42 return 43 case <-t.C: 44 }
··· 33 select { 34 case <-ctx.Done(): 35 slog.Info("stopping cursor checkpointer", "err", ctx.Err()) 36 + cs.PersistFirehoseCursor() // one final save 37 return 38 case <-t.C: 39 }
+18 -41
cmd/monarch/handlers.go
··· 39 } 40 } 41 42 - var validCollections = map[syntax.NSID]bool{ 43 - syntax.NSID("app.bsky.actor.profile"): false, 44 - syntax.NSID("app.bsky.feed.generator"): true, 45 - syntax.NSID("app.bsky.labeler.service"): true, 46 - } 47 - 48 - func jsonEncode(model any, data *[]byte) ([]byte, error) { 49 - var ( 50 - body []byte 51 - err error 52 - ) 53 - 54 - switch model.(type) { 55 - case appbsky.ActorProfile: 56 - var out appbsky.ActorProfile 57 - out.UnmarshalCBOR(bytes.NewReader(*data)) 58 - body, err = json.Marshal(out) 59 - 60 - case appbsky.FeedGenerator: 61 - var out appbsky.FeedGenerator 62 - out.UnmarshalCBOR(bytes.NewReader(*data)) 63 - body, err = json.Marshal(out) 64 - 65 - case appbsky.LabelerService: 66 - var out appbsky.LabelerService 67 - out.UnmarshalCBOR(bytes.NewReader(*data)) 68 - body, err = json.Marshal(out) 69 - } 70 - 71 - return body, err 72 } 73 74 func (hs *HandlerService) HandleUpsert(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid, action Action) error { ··· 77 return fmt.Errorf("error parsing at-uri: %w", err) 78 } 79 80 - valid := validCollections[uri.Collection()] 81 - if !valid { 82 return nil 83 } 84 85 var body []byte 86 switch uri.Collection() { 87 case syntax.NSID("app.bsky.actor.profile"): 88 - if body, err = jsonEncode(appbsky.ActorProfile{}, rec); err != nil { 89 - return fmt.Errorf("error parsing actor profile: %w", err) 90 - } 91 92 case syntax.NSID("app.bsky.feed.generator"): 93 - if body, err = jsonEncode(appbsky.FeedGenerator{}, rec); err != nil { 94 - return fmt.Errorf("error parsing feed generator: %w", err) 95 - } 96 97 case syntax.NSID("app.bsky.labeler.service"): 98 - if body, err = jsonEncode(appbsky.LabelerService{}, rec); err != nil { 99 - return fmt.Errorf("error parsing labeler service: %w", err) 100 - } 101 } 102 103 switch action {
··· 39 } 40 } 41 42 + var trackedCollections = map[syntax.NSID]bool{ 43 + syntax.NSID("app.bsky.actor.profile"): false, 44 + syntax.NSID("app.bsky.feed.generator"): true, 45 + syntax.NSID("app.bsky.labeler.service"): true, 46 + syntax.NSID("app.bsky.graph.list"): true, 47 + syntax.NSID("app.bsky.graph.verification"): true, 48 + syntax.NSID("app.bsky.graph.starterpack"): true, 49 } 50 51 func (hs *HandlerService) HandleUpsert(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid, action Action) error { ··· 54 return fmt.Errorf("error parsing at-uri: %w", err) 55 } 56 57 + tracked := trackedCollections[uri.Collection()] 58 + if !tracked { 59 return nil 60 } 61 62 var body []byte 63 switch uri.Collection() { 64 case syntax.NSID("app.bsky.actor.profile"): 65 + var out appbsky.ActorProfile 66 + out.UnmarshalCBOR(bytes.NewReader(*rec)) 67 + body, err = json.Marshal(out) 68 69 case syntax.NSID("app.bsky.feed.generator"): 70 + var out appbsky.FeedGenerator 71 + out.UnmarshalCBOR(bytes.NewReader(*rec)) 72 + body, err = json.Marshal(out) 73 74 case syntax.NSID("app.bsky.labeler.service"): 75 + var out appbsky.LabelerService 76 + out.UnmarshalCBOR(bytes.NewReader(*rec)) 77 + body, err = json.Marshal(out) 78 } 79 80 switch action {