1package main
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "fmt"
8 "io"
9 "net/http"
10 "os"
11 "os/signal"
12 "strconv"
13 "strings"
14 "sync"
15 "syscall"
16 "time"
17
18 "github.com/bluesky-social/indigo/api/atproto"
19 comatproto "github.com/bluesky-social/indigo/api/atproto"
20 "github.com/bluesky-social/indigo/api/bsky"
21 "github.com/bluesky-social/indigo/atproto/identity"
22 "github.com/bluesky-social/indigo/atproto/syntax"
23 "github.com/bluesky-social/indigo/did"
24 "github.com/bluesky-social/indigo/events"
25 "github.com/bluesky-social/indigo/events/schedulers/sequential"
26 lexutil "github.com/bluesky-social/indigo/lex/util"
27 "github.com/bluesky-social/indigo/repo"
28 "github.com/bluesky-social/indigo/repomgr"
29 "github.com/bluesky-social/indigo/util"
30 "github.com/bluesky-social/indigo/util/cliutil"
31 "github.com/bluesky-social/indigo/xrpc"
32
33 "github.com/gorilla/websocket"
34 "github.com/ipfs/go-cid"
35 "github.com/ipfs/go-libipfs/blocks"
36 "github.com/ipld/go-car/v2"
37 cli "github.com/urfave/cli/v2"
38)
39
40var debugCmd = &cli.Command{
41 Name: "debug",
42 Usage: "a set of debugging utilities for atproto",
43 Subcommands: []*cli.Command{
44 inspectEventCmd,
45 debugStreamCmd,
46 debugFeedGenCmd,
47 debugFeedViewCmd,
48 compareStreamsCmd,
49 debugGetRepoCmd,
50 debugCompareReposCmd,
51 },
52}
53
54var inspectEventCmd = &cli.Command{
55 Name: "inspect-event",
56 Flags: []cli.Flag{
57 &cli.StringFlag{
58 Name: "host",
59 Required: true,
60 },
61 &cli.BoolFlag{
62 Name: "dump-raw-blocks",
63 },
64 },
65 ArgsUsage: `<cursor>`,
66 Action: func(cctx *cli.Context) error {
67 n, err := strconv.Atoi(cctx.Args().First())
68 if err != nil {
69 return err
70 }
71
72 h := cctx.String("host")
73
74 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", h, n-1)
75 d := websocket.DefaultDialer
76 con, _, err := d.Dial(url, http.Header{})
77 if err != nil {
78 return fmt.Errorf("dial failure: %w", err)
79 }
80
81 var errFoundIt = fmt.Errorf("gotem")
82
83 var match *comatproto.SyncSubscribeRepos_Commit
84
85 ctx := context.TODO()
86 rsc := &events.RepoStreamCallbacks{
87 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
88 n := int64(n)
89 if evt.Seq == n {
90 match = evt
91 return errFoundIt
92 }
93 if evt.Seq > n {
94 return fmt.Errorf("record not found in stream")
95 }
96
97 return nil
98 },
99 RepoInfo: func(evt *comatproto.SyncSubscribeRepos_Info) error {
100 return nil
101 },
102 // TODO: all the other Repo* event types
103 Error: func(evt *events.ErrorFrame) error {
104 return fmt.Errorf("%s: %s", evt.Error, evt.Message)
105 },
106 }
107
108 seqScheduler := sequential.NewScheduler("debug-inspect-event", rsc.EventHandler)
109 err = events.HandleRepoStream(ctx, con, seqScheduler, nil)
110 if err != errFoundIt {
111 return err
112 }
113
114 b, err := json.MarshalIndent(match, "", " ")
115 if err != nil {
116 return err
117 }
118 fmt.Println(string(b))
119
120 br, err := car.NewBlockReader(bytes.NewReader(match.Blocks))
121 if err != nil {
122 return err
123 }
124
125 fmt.Println("\nSlice Dump:")
126 fmt.Println("Root: ", br.Roots[0])
127 for {
128 blk, err := br.Next()
129 if err != nil {
130 if err == io.EOF {
131 break
132 }
133 return err
134 }
135
136 fmt.Println(blk.Cid())
137 if cctx.Bool("dump-raw-blocks") {
138 fmt.Printf("%x\n", blk.RawData())
139 }
140 }
141
142 r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(match.Blocks))
143 if err != nil {
144 return fmt.Errorf("opening repo from slice: %w", err)
145 }
146
147 fmt.Println("\nOps: ")
148 for _, op := range match.Ops {
149 switch repomgr.EventKind(op.Action) {
150 case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord:
151 rcid, _, err := r.GetRecord(ctx, op.Path)
152 if err != nil {
153 return fmt.Errorf("loading %q: %w", op.Path, err)
154 }
155 if rcid != cid.Cid(*op.Cid) {
156 return fmt.Errorf("mismatch in record cid %s != %s", rcid, *op.Cid)
157 }
158 fmt.Printf("%s (%s): %s\n", op.Action, op.Path, *op.Cid)
159 }
160 }
161
162 return nil
163 },
164}
165
// eventInfo remembers the most recent event recorded for one repo while a
// stream is being checked.
type eventInfo struct {
	// LastSeq is the sequence number of that event.
	LastSeq int64
	// LastRev is the rev string carried by that event.
	LastRev string
}
170
171func cidStr(c *lexutil.LexLink) string {
172 if c == nil {
173 return "<nil>"
174 }
175
176 return c.String()
177}
178
179var debugStreamCmd = &cli.Command{
180 Name: "debug-stream",
181 Flags: []cli.Flag{
182 &cli.StringFlag{
183 Name: "host",
184 Required: true,
185 },
186 &cli.BoolFlag{
187 Name: "dump-raw-blocks",
188 },
189 },
190 ArgsUsage: `<cursor>`,
191 Action: func(cctx *cli.Context) error {
192 n, err := strconv.Atoi(cctx.Args().First())
193 if err != nil {
194 return err
195 }
196
197 h := cctx.String("host")
198
199 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", h, n)
200 d := websocket.DefaultDialer
201 con, _, err := d.Dial(url, http.Header{})
202 if err != nil {
203 return fmt.Errorf("dial failure: %w", err)
204 }
205
206 infos := make(map[string]*eventInfo)
207
208 var lastSeq int64 = -1
209 ctx := context.TODO()
210 rsc := &events.RepoStreamCallbacks{
211 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
212
213 fmt.Printf("\rChecking seq: %d ", evt.Seq)
214 if lastSeq > 0 && evt.Seq != lastSeq+1 {
215 fmt.Println("Gap in sequence numbers: ", lastSeq, evt.Seq)
216 }
217 lastSeq = evt.Seq
218
219 if !evt.TooBig {
220 r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks))
221 if err != nil {
222 fmt.Printf("\nEvent at sequence %d had an invalid repo slice: %s\n", evt.Seq, err)
223 return nil
224 } else {
225 _ = r
226 /* "prev" is no longer included in #commit messages
227 prev, err := r.PrevCommit(ctx)
228 if err != nil {
229 return err
230 }
231
232 var cs, es string
233 if prev != nil {
234 cs = prev.String()
235 }
236
237 if evt.Prev != nil {
238 es = evt.Prev.String()
239 }
240
241 if !evt.Rebase && cs != es {
242 fmt.Printf("\nEvent at sequence %d has mismatch between slice prev and struct prev: %s != %s\n", evt.Seq, prev, evt.Prev)
243 }
244 */
245 }
246 }
247
248 cur, ok := infos[evt.Repo]
249 if ok {
250 if evt.Since != nil && cur.LastRev != *evt.Since {
251 /*
252 fmt.Println()
253 fmt.Printf("Event at sequence %d, repo=%s had since=%s, but last rev we saw was %s (seq=%d)\n", evt.Seq, evt.Repo, evt.Since, cur.LastRev, cur.LastSeq)
254 */
255 }
256 }
257
258 infos[evt.Repo] = &eventInfo{
259 LastSeq: evt.Seq,
260 LastRev: evt.Rev,
261 }
262
263 return nil
264 },
265 RepoSync: func(evt *comatproto.SyncSubscribeRepos_Sync) error {
266 fmt.Printf("\rChecking seq: %d ", evt.Seq)
267 if lastSeq > 0 && evt.Seq != lastSeq+1 {
268 fmt.Println("Gap in sequence numbers: ", lastSeq, evt.Seq)
269 }
270 lastSeq = evt.Seq
271 return nil
272 },
273 RepoInfo: func(evt *comatproto.SyncSubscribeRepos_Info) error {
274 return nil
275 },
276 // TODO: all the other Repo* event types
277 Error: func(evt *events.ErrorFrame) error {
278 return fmt.Errorf("%s: %s", evt.Error, evt.Message)
279 },
280 }
281 seqScheduler := sequential.NewScheduler("debug-stream", rsc.EventHandler)
282 err = events.HandleRepoStream(ctx, con, seqScheduler, nil)
283 if err != nil {
284 return err
285 }
286
287 return nil
288 },
289}
290
291var compareStreamsCmd = &cli.Command{
292 Name: "compare-streams",
293 Flags: []cli.Flag{
294 &cli.StringFlag{
295 Name: "host1",
296 Required: true,
297 },
298 &cli.StringFlag{
299 Name: "host2",
300 Required: true,
301 },
302 },
303 ArgsUsage: `<cursor>`,
304 Action: func(cctx *cli.Context) error {
305 h1 := cctx.String("host1")
306 h2 := cctx.String("host2")
307
308 url1 := fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos", h1)
309 url2 := fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos", h2)
310
311 d := websocket.DefaultDialer
312
313 eventChans := []chan *comatproto.SyncSubscribeRepos_Commit{
314 make(chan *comatproto.SyncSubscribeRepos_Commit, 2),
315 make(chan *comatproto.SyncSubscribeRepos_Commit, 2),
316 }
317
318 buffers := []map[string][]*comatproto.SyncSubscribeRepos_Commit{
319 make(map[string][]*comatproto.SyncSubscribeRepos_Commit),
320 make(map[string][]*comatproto.SyncSubscribeRepos_Commit),
321 }
322
323 addToBuffer := func(n int, event *comatproto.SyncSubscribeRepos_Commit) {
324 buffers[n][event.Repo] = append(buffers[n][event.Repo], event)
325 }
326
327 pll := func(ll *lexutil.LexLink) string {
328 if ll == nil {
329 return "<nil>"
330 }
331 return ll.String()
332 }
333
334 findMatchAndRemove := func(n int, event *comatproto.SyncSubscribeRepos_Commit) (*comatproto.SyncSubscribeRepos_Commit, error) {
335 buf := buffers[n]
336 slice, ok := buf[event.Repo]
337 if !ok || len(slice) == 0 {
338 return nil, nil
339 }
340
341 for i, ev := range slice {
342 if ev.Commit == event.Commit {
343 _ = pll
344 /* TODO: prev is no longer included in #commit messages; could use prevData or rev?
345 if pll(ev.Prev) != pll(event.Prev) {
346 // same commit different prev??
347 return nil, fmt.Errorf("matched event with same commit but different prev: (%d) %d - %d", n, ev.Seq, event.Seq)
348 }
349 */
350 }
351
352 if i != 0 {
353 fmt.Printf("detected skipped event: %d (%d)\n", slice[0].Seq, i)
354 }
355
356 slice = slice[i+1:]
357 buf[event.Repo] = slice
358 return ev, nil
359 }
360
361 return nil, fmt.Errorf("did not find matching event despite having events in buffer")
362 }
363
364 printCurrentDelta := func() {
365 var a, b int
366 for _, sl := range buffers[0] {
367 a += len(sl)
368 }
369 for _, sl := range buffers[1] {
370 b += len(sl)
371 }
372
373 fmt.Printf("%d %d\n", a, b)
374 }
375
376 printDetailedDelta := func() {
377 for did, sl := range buffers[0] {
378 osl := buffers[1][did]
379 if len(osl) > 0 && len(sl) > 0 {
380 fmt.Printf("%s had mismatched events on both streams (%d, %d)\n", did, len(sl), len(osl))
381 }
382
383 }
384 }
385
386 // Create two goroutines for reading events from two URLs
387 for i, url := range []string{url1, url2} {
388 go func(i int, url string) {
389 con, _, err := d.Dial(url, http.Header{})
390 if err != nil {
391 log.Error("Dial failure", "i", i, "url", url, "err", err)
392 os.Exit(1)
393 }
394
395 ctx := context.TODO()
396 rsc := &events.RepoStreamCallbacks{
397 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
398 eventChans[i] <- evt
399 return nil
400 },
401 // TODO: all the other Repo* event types
402 Error: func(evt *events.ErrorFrame) error {
403 return fmt.Errorf("%s: %s", evt.Error, evt.Message)
404 },
405 }
406 seqScheduler := sequential.NewScheduler(fmt.Sprintf("debug-stream-%d", i+1), rsc.EventHandler)
407 if err := events.HandleRepoStream(ctx, con, seqScheduler, nil); err != nil {
408 log.Error("HandleRepoStream failure", "i", i, "url", url, "err", err)
409 os.Exit(1)
410 }
411 }(i, url)
412 }
413
414 ch := make(chan os.Signal, 1)
415 signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT)
416
417 // Compare events from the two URLs
418 for {
419 select {
420 case event := <-eventChans[0]:
421 partner, err := findMatchAndRemove(1, event)
422 if err != nil {
423 fmt.Println("checking for match failed: ", err)
424 continue
425 }
426 if partner == nil {
427 addToBuffer(0, event)
428 } else {
429 // the good case
430 fmt.Println("Match found")
431 }
432
433 case event := <-eventChans[1]:
434 partner, err := findMatchAndRemove(0, event)
435 if err != nil {
436 fmt.Println("checking for match failed: ", err)
437 continue
438 }
439 if partner == nil {
440 addToBuffer(1, event)
441 } else {
442 // the good case
443 fmt.Println("Match found")
444 }
445 case <-ch:
446 printDetailedDelta()
447 /*
448 b, err := json.Marshal(buffers)
449 if err != nil {
450 return err
451 }
452
453 fmt.Println(string(b))
454 */
455 return nil
456 }
457
458 printCurrentDelta()
459 }
460 },
461}
462
463var debugFeedGenCmd = &cli.Command{
464 Name: "debug-feed",
465 ArgsUsage: "<at-uri>",
466 Action: func(cctx *cli.Context) error {
467 xrpcc, err := cliutil.GetXrpcClient(cctx, true)
468 if err != nil {
469 return err
470 }
471
472 didr := cliutil.GetDidResolver(cctx)
473
474 uri := cctx.Args().First()
475 puri, err := util.ParseAtUri(uri)
476 if err != nil {
477 return err
478 }
479
480 ctx := context.TODO()
481
482 out, err := atproto.RepoGetRecord(ctx, xrpcc, "", puri.Collection, puri.Did, puri.Rkey)
483 if err != nil {
484 return fmt.Errorf("getting record: %w", err)
485 }
486
487 fgr, ok := out.Value.Val.(*bsky.FeedGenerator)
488 if !ok {
489 return fmt.Errorf("invalid feedgen record")
490 }
491
492 fmt.Println("Feed DID is: ", fgr.Did)
493 doc, err := didr.GetDocument(ctx, fgr.Did)
494 if err != nil {
495 return err
496 }
497
498 fmt.Println("Got service did document:")
499 b, err := json.MarshalIndent(doc, "", " ")
500 if err != nil {
501 return err
502 }
503 fmt.Println(string(b))
504
505 var ss *did.Service
506 for _, s := range doc.Service {
507 if s.ID.String() == "#bsky_fg" {
508 cp := s
509 ss = &cp
510 break
511 }
512 }
513
514 if ss == nil {
515 return fmt.Errorf("No '#bsky_fg' service entry found in feedgens DID document")
516 }
517
518 fmt.Println("Service endpoint is: ", ss.ServiceEndpoint)
519
520 fgclient := &xrpc.Client{
521 Host: ss.ServiceEndpoint,
522 }
523
524 desc, err := bsky.FeedDescribeFeedGenerator(ctx, fgclient)
525 if err != nil {
526 return err
527 }
528
529 fmt.Printf("Found %d feeds at discovered endpoint\n", len(desc.Feeds))
530 var found bool
531 for _, f := range desc.Feeds {
532 fmt.Println("Feed: ", f.Uri)
533 if f.Uri == uri {
534 found = true
535 break
536 }
537 }
538
539 if !found {
540 return fmt.Errorf("specified feed was not present in linked feedGenerators 'describe' method output")
541 }
542
543 skel, err := bsky.FeedGetFeedSkeleton(ctx, fgclient, "", uri, 30)
544 if err != nil {
545 return fmt.Errorf("failed to fetch feed skeleton: %w", err)
546 }
547
548 if len(skel.Feed) > 30 {
549 return fmt.Errorf("feedgen not respecting limit param (returned %d posts)", len(skel.Feed))
550 }
551
552 if len(skel.Feed) == 0 {
553 return fmt.Errorf("feedgen response is empty (might be expected since we aren't authed)")
554 }
555
556 fmt.Println("Feed response looks good!")
557
558 seen := make(map[string]bool)
559 for _, p := range skel.Feed {
560 seen[p.Post] = true
561 }
562
563 curs := skel.Cursor
564 for i := 0; i < 10 && curs != nil; i++ {
565 fmt.Println("Response had cursor: ", *curs)
566 nresp, err := bsky.FeedGetFeedSkeleton(ctx, fgclient, *curs, uri, 10)
567 if err != nil {
568 return fmt.Errorf("fetching paginated feed failed: %w", err)
569 }
570
571 fmt.Printf("Got %d posts from cursored query\n", len(nresp.Feed))
572
573 if len(nresp.Feed) > 10 {
574 return fmt.Errorf("got more posts than we requested")
575 }
576
577 for _, p := range nresp.Feed {
578 if seen[p.Post] {
579 return fmt.Errorf("duplicate post in response: %s", p.Post)
580 }
581
582 seen[p.Post] = true
583 }
584
585 if len(nresp.Feed) == 0 || nresp.Cursor == nil {
586 break
587 }
588
589 curs = nresp.Cursor
590 }
591
592 return nil
593 },
594}
595var debugFeedViewCmd = &cli.Command{
596 Name: "view-feed",
597 Usage: "<at-uri>",
598 Action: func(cctx *cli.Context) error {
599 xrpcc, err := cliutil.GetXrpcClient(cctx, true)
600 if err != nil {
601 return err
602 }
603
604 didr := cliutil.GetDidResolver(cctx)
605
606 uri := cctx.Args().First()
607 puri, err := util.ParseAtUri(uri)
608 if err != nil {
609 return err
610 }
611
612 ctx := context.TODO()
613
614 out, err := atproto.RepoGetRecord(ctx, xrpcc, "", puri.Collection, puri.Did, puri.Rkey)
615 if err != nil {
616 return fmt.Errorf("getting record: %w", err)
617 }
618
619 fgr, ok := out.Value.Val.(*bsky.FeedGenerator)
620 if !ok {
621 return fmt.Errorf("invalid feedgen record")
622 }
623
624 doc, err := didr.GetDocument(ctx, fgr.Did)
625 if err != nil {
626 return err
627 }
628
629 var ss *did.Service
630 for _, s := range doc.Service {
631 if s.ID.String() == "#bsky_fg" {
632 cp := s
633 ss = &cp
634 break
635 }
636 }
637
638 if ss == nil {
639 return fmt.Errorf("No '#bsky_fg' service entry found in feedgens DID document")
640 }
641
642 fgclient := &xrpc.Client{
643 Host: ss.ServiceEndpoint,
644 }
645
646 cache, err := loadCache("postcache.json")
647 if err != nil {
648 return err
649 }
650 var cacheUpdate bool
651
652 var cursor string
653 getPage := func(curs string) ([]*bsky.FeedDefs_PostView, error) {
654 skel, err := bsky.FeedGetFeedSkeleton(ctx, fgclient, cursor, uri, 30)
655 if err != nil {
656 return nil, fmt.Errorf("failed to fetch feed skeleton: %w", err)
657 }
658
659 if skel.Cursor != nil {
660 cursor = *skel.Cursor
661 }
662
663 var posts []*bsky.FeedDefs_PostView
664 for _, fp := range skel.Feed {
665 cached, ok := cache[fp.Post]
666 if ok {
667 posts = append(posts, cached)
668 continue
669 }
670 fps, err := bsky.FeedGetPosts(ctx, xrpcc, []string{fp.Post})
671 if err != nil {
672 return nil, err
673 }
674
675 if len(fps.Posts) == 0 {
676 fmt.Println("FAILED TO GET POST: ", fp.Post)
677 continue
678 }
679 p := fps.Posts[0]
680 rec := p.Record.Val.(*bsky.FeedPost)
681 rec.Embed = nil // nil out embeds since they sometimes fail to json marshal...
682 posts = append(posts, p)
683 cache[fp.Post] = p
684 cacheUpdate = true
685 }
686
687 return posts, nil
688 }
689
690 printPosts := func(posts []*bsky.FeedDefs_PostView) {
691 for _, p := range posts {
692 fp, ok := p.Record.Val.(*bsky.FeedPost)
693 if !ok {
694 fmt.Printf("ERROR: Post had invalid record type: %T\n", p.Record.Val)
695 continue
696 }
697 text := fp.Text
698 text = strings.Replace(text, "\n", " ", -1)
699 if len(text) > 70 {
700 text = text[:70] + "..."
701 }
702
703 dn := p.Author.Handle
704 if p.Author.DisplayName != nil {
705 dn = *p.Author.DisplayName
706 }
707
708 fmt.Printf("%s: %s\n", dn, text)
709 }
710 }
711
712 seen := make(map[string]bool)
713 for i := 1; i < 5; i++ {
714 fmt.Printf("PAGE %d - cursor: %s\n", i, cursor)
715 posts, err := getPage(cursor)
716 if err != nil {
717 return err
718 }
719 var alreadySeen int
720 for _, p := range posts {
721 if seen[p.Uri] {
722 alreadySeen++
723 }
724 seen[p.Uri] = true
725 }
726 fmt.Printf("Already saw %d / %d posts in page 1\n", alreadySeen, len(posts))
727 printPosts(posts)
728 fmt.Println("")
729 fmt.Println("")
730 }
731
732 if cacheUpdate {
733 if err := saveCache("postcache.json", cache); err != nil {
734 return err
735 }
736 }
737
738 return nil
739 },
740}
741
742func loadCache(filename string) (map[string]*bsky.FeedDefs_PostView, error) {
743 var data map[string]*bsky.FeedDefs_PostView
744
745 jsonFile, err := os.Open(filename)
746 if err != nil {
747 if os.IsNotExist(err) {
748 return make(map[string]*bsky.FeedDefs_PostView), nil
749 }
750
751 return nil, fmt.Errorf("failed to open file: %w", err)
752 }
753 defer jsonFile.Close()
754
755 byteValue, err := io.ReadAll(jsonFile)
756 if err != nil {
757 return nil, fmt.Errorf("failed to read file: %w", err)
758 }
759
760 err = json.Unmarshal(byteValue, &data)
761 if err != nil {
762 return nil, fmt.Errorf("failed to unmarshal json: %w", err)
763 }
764
765 return data, nil
766}
767
768func saveCache(filename string, data map[string]*bsky.FeedDefs_PostView) error {
769 file, err := json.MarshalIndent(data, "", " ")
770 if err != nil {
771 return fmt.Errorf("failed to marshal json: %w", err)
772 }
773
774 err = os.WriteFile(filename, file, 0644)
775 if err != nil {
776 return fmt.Errorf("failed to write file: %w", err)
777 }
778
779 return nil
780}
781
782var debugGetRepoCmd = &cli.Command{
783 Name: "get-repo",
784 Flags: []cli.Flag{},
785 ArgsUsage: `<did>`,
786 Action: func(cctx *cli.Context) error {
787 xrpcc, err := cliutil.GetXrpcClient(cctx, false)
788 if err != nil {
789 return err
790 }
791
792 ctx := context.TODO()
793
794 repobytes, err := comatproto.SyncGetRepo(ctx, xrpcc, cctx.Args().First(), "")
795 if err != nil {
796 return fmt.Errorf("getting repo: %w", err)
797 }
798
799 rep, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(repobytes))
800 if err != nil {
801 return err
802 }
803
804 fmt.Println("Rev: ", rep.SignedCommit().Rev)
805 var count int
806 if err := rep.ForEach(ctx, "", func(k string, v cid.Cid) error {
807 rec, err := rep.Blockstore().Get(ctx, v)
808 if err != nil {
809 return fmt.Errorf("getting record %q: %w", k, err)
810 }
811
812 count++
813 _ = rec
814 return nil
815 }); err != nil {
816 return err
817 }
818 fmt.Printf("scanned %d records\n", count)
819
820 return nil
821 },
822}
823
824var debugCompareReposCmd = &cli.Command{
825 Name: "compare-repos",
826 Flags: []cli.Flag{
827 &cli.StringFlag{
828 Name: "host-1",
829 Usage: "method, hostname, and port of PDS instance",
830 Value: "https://bsky.social",
831 },
832 &cli.StringFlag{
833 Name: "host-2",
834 Usage: "method, hostname, and port of PDS instance",
835 Value: "https://bsky.network",
836 },
837 },
838 ArgsUsage: `<did>`,
839 Action: func(cctx *cli.Context) error {
840 ctx := cctx.Context
841 did, err := syntax.ParseAtIdentifier(cctx.Args().First())
842 if err != nil {
843 return err
844 }
845
846 wg := sync.WaitGroup{}
847 wg.Add(2)
848
849 xrpc1 := xrpc.Client{
850 Host: cctx.String("host-1"),
851 Client: &http.Client{
852 Timeout: 15 * time.Minute,
853 },
854 }
855
856 if !cctx.IsSet("host-1") {
857 dir := identity.DefaultDirectory()
858 ident, err := dir.Lookup(ctx, *did)
859 if err != nil {
860 return err
861 }
862
863 xrpc1.Host = ident.PDSEndpoint()
864 }
865
866 xrpc2 := xrpc.Client{
867 Host: cctx.String("host-2"),
868 Client: &http.Client{
869 Timeout: 15 * time.Minute,
870 },
871 }
872
873 var rep1 *repo.Repo
874 go func() {
875 defer wg.Done()
876 logger := log.With("host", cctx.String("host-1"))
877 repo1bytes, err := comatproto.SyncGetRepo(ctx, &xrpc1, did.String(), "")
878 if err != nil {
879 logger.Error("getting repo", "err", err)
880 os.Exit(1)
881 return
882 }
883
884 rep1, err = repo.ReadRepoFromCar(ctx, bytes.NewReader(repo1bytes))
885 if err != nil {
886 logger.Error("reading repo", "err", err, "bytes", len(repo1bytes))
887 os.Exit(1)
888 return
889 }
890 }()
891
892 var rep2 *repo.Repo
893 go func() {
894 defer wg.Done()
895 logger := log.With("host", cctx.String("host-2"))
896 repo2bytes, err := comatproto.SyncGetRepo(ctx, &xrpc2, did.String(), "")
897 if err != nil {
898 logger.Error("getting repo", "err", err)
899 os.Exit(1)
900 return
901 }
902
903 rep2, err = repo.ReadRepoFromCar(ctx, bytes.NewReader(repo2bytes))
904 if err != nil {
905 logger.Error("reading repo", "err", err, "bytes", len(repo2bytes))
906 os.Exit(1)
907 return
908 }
909 }()
910
911 wg.Wait()
912
913 cids1 := []cid.Cid{}
914 blocks1 := []blocks.Block{}
915
916 fmt.Println("Host 1 Results")
917 fmt.Println("Rev: ", rep1.SignedCommit().Rev)
918 var count int
919 if err := rep1.ForEach(ctx, "", func(k string, v cid.Cid) error {
920 cids1 = append(cids1, v)
921 rec, err := rep1.Blockstore().Get(ctx, v)
922 if err != nil {
923 return fmt.Errorf("getting record %q: %w", k, err)
924 }
925 blocks1 = append(blocks1, rec)
926
927 count++
928 _ = rec
929 return nil
930 }); err != nil {
931 return err
932 }
933 fmt.Printf("scanned %d records\n", count)
934
935 cids2 := []cid.Cid{}
936 blocks2 := []blocks.Block{}
937
938 fmt.Println("\nHost 2 Results")
939 fmt.Println("Rev: ", rep2.SignedCommit().Rev)
940 count = 0
941 if err := rep2.ForEach(ctx, "", func(k string, v cid.Cid) error {
942 cids2 = append(cids2, v)
943 rec, err := rep2.Blockstore().Get(ctx, v)
944 if err != nil {
945 return fmt.Errorf("getting record %q: %w", k, err)
946 }
947 blocks2 = append(blocks2, rec)
948
949 count++
950 _ = rec
951 return nil
952 }); err != nil {
953 return err
954 }
955 fmt.Printf("scanned %d records\n", count)
956
957 fmt.Println("\nComparing CIDs")
958 hasBadCid := false
959 for i, c1 := range cids1 {
960 if c1 != cids2[i] {
961 fmt.Printf("CID mismatch at index %d: %s != %s\n", i, c1, cids2[i])
962 hasBadCid = true
963 }
964 }
965
966 if !hasBadCid {
967 fmt.Println("All CIDs match!")
968 }
969
970 fmt.Println("Comparing blocks")
971 hasBadBlock := false
972 for i, b1 := range blocks1 {
973 if !bytes.Equal(b1.RawData(), blocks2[i].RawData()) {
974 fmt.Printf("Block mismatch at index %d Host 1 Cid (%s) Host 2 Cid (%s)\n", i, b1.Cid().String(), blocks2[i].Cid().String())
975 hasBadBlock = true
976 }
977 }
978
979 if !hasBadBlock {
980 fmt.Println("All blocks match!")
981 }
982
983 if hasBadBlock || hasBadCid {
984 return fmt.Errorf("mismatched blocks or cids")
985 }
986
987 return nil
988 },
989}