+20
cmd/monarch/handlers.go
+20
cmd/monarch/handlers.go
···
5
5
"context"
6
6
"encoding/json"
7
7
"fmt"
8
+
"log/slog"
8
9
9
10
appbsky "github.com/bluesky-social/indigo/api/bsky"
10
11
"github.com/bluesky-social/indigo/atproto/syntax"
···
75
76
var out appbsky.LabelerService
76
77
out.UnmarshalCBOR(bytes.NewReader(*rec))
77
78
body, err = json.Marshal(out)
79
+
80
+
case syntax.NSID("app.bsky.graph.list"):
81
+
var out appbsky.GraphList
82
+
out.UnmarshalCBOR(bytes.NewReader(*rec))
83
+
body, err = json.Marshal(out)
84
+
85
+
case syntax.NSID("app.bsky.graph.verification"):
86
+
var out appbsky.GraphVerification
87
+
out.UnmarshalCBOR(bytes.NewReader(*rec))
88
+
body, err = json.Marshal(out)
89
+
90
+
case syntax.NSID("app.bsky.graph.starterpack"):
91
+
var out appbsky.GraphStarterpack
92
+
out.UnmarshalCBOR(bytes.NewReader(*rec))
93
+
body, err = json.Marshal(out)
94
+
95
+
default:
96
+
slog.Error("tracked collection missing handler", "collection", uri.Collection())
97
+
return nil
78
98
}
79
99
80
100
switch action {
+1
-1
cmd/monarch/backfill.go
+1
-1
cmd/monarch/backfill.go
···
10
10
ParallelBackfills: cctx.Int("backfill-workers"),
11
11
ParallelRecordCreates: cctx.Int("backfill-consumers"),
12
12
NSIDFilter: "",
13
-
SyncRequestsPerSecond: 10,
13
+
SyncRequestsPerSecond: cctx.Int("sync-requests-limit"),
14
14
RelayHost: "https://" + cctx.String("relay-host"),
15
15
}
16
16
+1
-4
cmd/monarch/cursors.go
+1
-4
cmd/monarch/cursors.go
+3
-1
cmd/monarch/firehose.go
+3
-1
cmd/monarch/firehose.go
···
13
13
func NewFirehoseConnection(ctx context.Context, cctx *cli.Context, cursorSvc *CursorService) (*websocket.Conn, error) {
14
14
url := fmt.Sprintf("wss://%s/xrpc/com.atproto.sync.subscribeRepos", cctx.String("relay-host"))
15
15
curs, err := cursorSvc.GetFirehoseCursor()
16
-
if err == nil { // reversed
16
+
if err != nil {
17
+
slog.Error("error getting firehose cursor", "err", err)
18
+
} else if curs > 0 {
17
19
url += fmt.Sprintf("?cursor=%d", curs)
18
20
}
19
21