an app.bsky.* indexer

Compare changes

Choose any two refs to compare.

Changed files
+25 -6
cmd
+20
cmd/monarch/handlers.go
··· 5 5 "context" 6 6 "encoding/json" 7 7 "fmt" 8 + "log/slog" 8 9 9 10 appbsky "github.com/bluesky-social/indigo/api/bsky" 10 11 "github.com/bluesky-social/indigo/atproto/syntax" ··· 75 76 var out appbsky.LabelerService 76 77 out.UnmarshalCBOR(bytes.NewReader(*rec)) 77 78 body, err = json.Marshal(out) 79 + 80 + case syntax.NSID("app.bsky.graph.list"): 81 + var out appbsky.GraphList 82 + out.UnmarshalCBOR(bytes.NewReader(*rec)) 83 + body, err = json.Marshal(out) 84 + 85 + case syntax.NSID("app.bsky.graph.verification"): 86 + var out appbsky.GraphVerification 87 + out.UnmarshalCBOR(bytes.NewReader(*rec)) 88 + body, err = json.Marshal(out) 89 + 90 + case syntax.NSID("app.bsky.graph.starterpack"): 91 + var out appbsky.GraphStarterpack 92 + out.UnmarshalCBOR(bytes.NewReader(*rec)) 93 + body, err = json.Marshal(out) 94 + 95 + default: 96 + slog.Error("tracked collection missing handler", "collection", uri.Collection()) 97 + return nil 78 98 } 79 99 80 100 switch action {
+1 -1
cmd/monarch/backfill.go
··· 10 10 ParallelBackfills: cctx.Int("backfill-workers"), 11 11 ParallelRecordCreates: cctx.Int("backfill-consumers"), 12 12 NSIDFilter: "", 13 - SyncRequestsPerSecond: 10, 13 + SyncRequestsPerSecond: cctx.Int("sync-requests-limit"), 14 14 RelayHost: "https://" + cctx.String("relay-host"), 15 15 } 16 16
+1 -4
cmd/monarch/cursors.go
··· 3 3 import ( 4 4 "context" 5 5 "log/slog" 6 - "sync" 7 6 "time" 8 7 9 8 "gorm.io/gorm" 10 9 ) 11 10 12 11 type CursorService struct { 13 - store *gorm.DB 14 - 15 - firehoseLk sync.Mutex 12 + store *gorm.DB 16 13 firehoseSeq int64 17 14 } 18 15
+3 -1
cmd/monarch/firehose.go
··· 13 13 func NewFirehoseConnection(ctx context.Context, cctx *cli.Context, cursorSvc *CursorService) (*websocket.Conn, error) { 14 14 url := fmt.Sprintf("wss://%s/xrpc/com.atproto.sync.subscribeRepos", cctx.String("relay-host")) 15 15 curs, err := cursorSvc.GetFirehoseCursor() 16 - if err == nil { // reversed 16 + if err != nil { 17 + slog.Error("error getting firehose cursor", "err", err) 18 + } else if curs > 0 { 17 19 url += fmt.Sprintf("?cursor=%d", curs) 18 20 } 19 21