// Fork of indigo with slightly nicer lexgen.
// Scraped from the repository's web view at branch main (23 kB raw file).
1package main 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "io" 9 "net/http" 10 "os" 11 "os/signal" 12 "strconv" 13 "strings" 14 "sync" 15 "syscall" 16 "time" 17 18 "github.com/bluesky-social/indigo/api/atproto" 19 comatproto "github.com/bluesky-social/indigo/api/atproto" 20 "github.com/bluesky-social/indigo/api/bsky" 21 "github.com/bluesky-social/indigo/atproto/identity" 22 "github.com/bluesky-social/indigo/atproto/syntax" 23 "github.com/bluesky-social/indigo/did" 24 "github.com/bluesky-social/indigo/events" 25 "github.com/bluesky-social/indigo/events/schedulers/sequential" 26 lexutil "github.com/bluesky-social/indigo/lex/util" 27 "github.com/bluesky-social/indigo/repo" 28 "github.com/bluesky-social/indigo/repomgr" 29 "github.com/bluesky-social/indigo/util" 30 "github.com/bluesky-social/indigo/util/cliutil" 31 "github.com/bluesky-social/indigo/xrpc" 32 33 "github.com/gorilla/websocket" 34 "github.com/ipfs/go-cid" 35 "github.com/ipfs/go-libipfs/blocks" 36 "github.com/ipld/go-car/v2" 37 cli "github.com/urfave/cli/v2" 38) 39 40var debugCmd = &cli.Command{ 41 Name: "debug", 42 Usage: "a set of debugging utilities for atproto", 43 Subcommands: []*cli.Command{ 44 inspectEventCmd, 45 debugStreamCmd, 46 debugFeedGenCmd, 47 debugFeedViewCmd, 48 compareStreamsCmd, 49 debugGetRepoCmd, 50 debugCompareReposCmd, 51 }, 52} 53 54var inspectEventCmd = &cli.Command{ 55 Name: "inspect-event", 56 Flags: []cli.Flag{ 57 &cli.StringFlag{ 58 Name: "host", 59 Required: true, 60 }, 61 &cli.BoolFlag{ 62 Name: "dump-raw-blocks", 63 }, 64 }, 65 ArgsUsage: `<cursor>`, 66 Action: func(cctx *cli.Context) error { 67 n, err := strconv.Atoi(cctx.Args().First()) 68 if err != nil { 69 return err 70 } 71 72 h := cctx.String("host") 73 74 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", h, n-1) 75 d := websocket.DefaultDialer 76 con, _, err := d.Dial(url, http.Header{}) 77 if err != nil { 78 return fmt.Errorf("dial failure: %w", err) 79 } 80 81 var errFoundIt = 
fmt.Errorf("gotem") 82 83 var match *comatproto.SyncSubscribeRepos_Commit 84 85 ctx := context.TODO() 86 rsc := &events.RepoStreamCallbacks{ 87 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 88 n := int64(n) 89 if evt.Seq == n { 90 match = evt 91 return errFoundIt 92 } 93 if evt.Seq > n { 94 return fmt.Errorf("record not found in stream") 95 } 96 97 return nil 98 }, 99 RepoInfo: func(evt *comatproto.SyncSubscribeRepos_Info) error { 100 return nil 101 }, 102 // TODO: all the other Repo* event types 103 Error: func(evt *events.ErrorFrame) error { 104 return fmt.Errorf("%s: %s", evt.Error, evt.Message) 105 }, 106 } 107 108 seqScheduler := sequential.NewScheduler("debug-inspect-event", rsc.EventHandler) 109 err = events.HandleRepoStream(ctx, con, seqScheduler, nil) 110 if err != errFoundIt { 111 return err 112 } 113 114 b, err := json.MarshalIndent(match, "", " ") 115 if err != nil { 116 return err 117 } 118 fmt.Println(string(b)) 119 120 br, err := car.NewBlockReader(bytes.NewReader(match.Blocks)) 121 if err != nil { 122 return err 123 } 124 125 fmt.Println("\nSlice Dump:") 126 fmt.Println("Root: ", br.Roots[0]) 127 for { 128 blk, err := br.Next() 129 if err != nil { 130 if err == io.EOF { 131 break 132 } 133 return err 134 } 135 136 fmt.Println(blk.Cid()) 137 if cctx.Bool("dump-raw-blocks") { 138 fmt.Printf("%x\n", blk.RawData()) 139 } 140 } 141 142 r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(match.Blocks)) 143 if err != nil { 144 return fmt.Errorf("opening repo from slice: %w", err) 145 } 146 147 fmt.Println("\nOps: ") 148 for _, op := range match.Ops { 149 switch repomgr.EventKind(op.Action) { 150 case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord: 151 rcid, _, err := r.GetRecord(ctx, op.Path) 152 if err != nil { 153 return fmt.Errorf("loading %q: %w", op.Path, err) 154 } 155 if rcid != cid.Cid(*op.Cid) { 156 return fmt.Errorf("mismatch in record cid %s != %s", rcid, *op.Cid) 157 } 158 fmt.Printf("%s (%s): %s\n", op.Action, 
op.Path, *op.Cid) 159 } 160 } 161 162 return nil 163 }, 164} 165 166type eventInfo struct { 167 LastSeq int64 168 LastRev string 169} 170 171func cidStr(c *lexutil.LexLink) string { 172 if c == nil { 173 return "<nil>" 174 } 175 176 return c.String() 177} 178 179var debugStreamCmd = &cli.Command{ 180 Name: "debug-stream", 181 Flags: []cli.Flag{ 182 &cli.StringFlag{ 183 Name: "host", 184 Required: true, 185 }, 186 &cli.BoolFlag{ 187 Name: "dump-raw-blocks", 188 }, 189 }, 190 ArgsUsage: `<cursor>`, 191 Action: func(cctx *cli.Context) error { 192 n, err := strconv.Atoi(cctx.Args().First()) 193 if err != nil { 194 return err 195 } 196 197 h := cctx.String("host") 198 199 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", h, n) 200 d := websocket.DefaultDialer 201 con, _, err := d.Dial(url, http.Header{}) 202 if err != nil { 203 return fmt.Errorf("dial failure: %w", err) 204 } 205 206 infos := make(map[string]*eventInfo) 207 208 var lastSeq int64 = -1 209 ctx := context.TODO() 210 rsc := &events.RepoStreamCallbacks{ 211 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 212 213 fmt.Printf("\rChecking seq: %d ", evt.Seq) 214 if lastSeq > 0 && evt.Seq != lastSeq+1 { 215 fmt.Println("Gap in sequence numbers: ", lastSeq, evt.Seq) 216 } 217 lastSeq = evt.Seq 218 219 if !evt.TooBig { 220 r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) 221 if err != nil { 222 fmt.Printf("\nEvent at sequence %d had an invalid repo slice: %s\n", evt.Seq, err) 223 return nil 224 } else { 225 _ = r 226 /* "prev" is no longer included in #commit messages 227 prev, err := r.PrevCommit(ctx) 228 if err != nil { 229 return err 230 } 231 232 var cs, es string 233 if prev != nil { 234 cs = prev.String() 235 } 236 237 if evt.Prev != nil { 238 es = evt.Prev.String() 239 } 240 241 if !evt.Rebase && cs != es { 242 fmt.Printf("\nEvent at sequence %d has mismatch between slice prev and struct prev: %s != %s\n", evt.Seq, prev, evt.Prev) 243 } 244 */ 245 } 
246 } 247 248 cur, ok := infos[evt.Repo] 249 if ok { 250 if evt.Since != nil && cur.LastRev != *evt.Since { 251 /* 252 fmt.Println() 253 fmt.Printf("Event at sequence %d, repo=%s had since=%s, but last rev we saw was %s (seq=%d)\n", evt.Seq, evt.Repo, evt.Since, cur.LastRev, cur.LastSeq) 254 */ 255 } 256 } 257 258 infos[evt.Repo] = &eventInfo{ 259 LastSeq: evt.Seq, 260 LastRev: evt.Rev, 261 } 262 263 return nil 264 }, 265 RepoSync: func(evt *comatproto.SyncSubscribeRepos_Sync) error { 266 fmt.Printf("\rChecking seq: %d ", evt.Seq) 267 if lastSeq > 0 && evt.Seq != lastSeq+1 { 268 fmt.Println("Gap in sequence numbers: ", lastSeq, evt.Seq) 269 } 270 lastSeq = evt.Seq 271 return nil 272 }, 273 RepoInfo: func(evt *comatproto.SyncSubscribeRepos_Info) error { 274 return nil 275 }, 276 // TODO: all the other Repo* event types 277 Error: func(evt *events.ErrorFrame) error { 278 return fmt.Errorf("%s: %s", evt.Error, evt.Message) 279 }, 280 } 281 seqScheduler := sequential.NewScheduler("debug-stream", rsc.EventHandler) 282 err = events.HandleRepoStream(ctx, con, seqScheduler, nil) 283 if err != nil { 284 return err 285 } 286 287 return nil 288 }, 289} 290 291var compareStreamsCmd = &cli.Command{ 292 Name: "compare-streams", 293 Flags: []cli.Flag{ 294 &cli.StringFlag{ 295 Name: "host1", 296 Required: true, 297 }, 298 &cli.StringFlag{ 299 Name: "host2", 300 Required: true, 301 }, 302 }, 303 ArgsUsage: `<cursor>`, 304 Action: func(cctx *cli.Context) error { 305 h1 := cctx.String("host1") 306 h2 := cctx.String("host2") 307 308 url1 := fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos", h1) 309 url2 := fmt.Sprintf("%s/xrpc/com.atproto.sync.subscribeRepos", h2) 310 311 d := websocket.DefaultDialer 312 313 eventChans := []chan *comatproto.SyncSubscribeRepos_Commit{ 314 make(chan *comatproto.SyncSubscribeRepos_Commit, 2), 315 make(chan *comatproto.SyncSubscribeRepos_Commit, 2), 316 } 317 318 buffers := []map[string][]*comatproto.SyncSubscribeRepos_Commit{ 319 
make(map[string][]*comatproto.SyncSubscribeRepos_Commit), 320 make(map[string][]*comatproto.SyncSubscribeRepos_Commit), 321 } 322 323 addToBuffer := func(n int, event *comatproto.SyncSubscribeRepos_Commit) { 324 buffers[n][event.Repo] = append(buffers[n][event.Repo], event) 325 } 326 327 pll := func(ll *lexutil.LexLink) string { 328 if ll == nil { 329 return "<nil>" 330 } 331 return ll.String() 332 } 333 334 findMatchAndRemove := func(n int, event *comatproto.SyncSubscribeRepos_Commit) (*comatproto.SyncSubscribeRepos_Commit, error) { 335 buf := buffers[n] 336 slice, ok := buf[event.Repo] 337 if !ok || len(slice) == 0 { 338 return nil, nil 339 } 340 341 for i, ev := range slice { 342 if ev.Commit == event.Commit { 343 _ = pll 344 /* TODO: prev is no longer included in #commit messages; could use prevData or rev? 345 if pll(ev.Prev) != pll(event.Prev) { 346 // same commit different prev?? 347 return nil, fmt.Errorf("matched event with same commit but different prev: (%d) %d - %d", n, ev.Seq, event.Seq) 348 } 349 */ 350 } 351 352 if i != 0 { 353 fmt.Printf("detected skipped event: %d (%d)\n", slice[0].Seq, i) 354 } 355 356 slice = slice[i+1:] 357 buf[event.Repo] = slice 358 return ev, nil 359 } 360 361 return nil, fmt.Errorf("did not find matching event despite having events in buffer") 362 } 363 364 printCurrentDelta := func() { 365 var a, b int 366 for _, sl := range buffers[0] { 367 a += len(sl) 368 } 369 for _, sl := range buffers[1] { 370 b += len(sl) 371 } 372 373 fmt.Printf("%d %d\n", a, b) 374 } 375 376 printDetailedDelta := func() { 377 for did, sl := range buffers[0] { 378 osl := buffers[1][did] 379 if len(osl) > 0 && len(sl) > 0 { 380 fmt.Printf("%s had mismatched events on both streams (%d, %d)\n", did, len(sl), len(osl)) 381 } 382 383 } 384 } 385 386 // Create two goroutines for reading events from two URLs 387 for i, url := range []string{url1, url2} { 388 go func(i int, url string) { 389 con, _, err := d.Dial(url, http.Header{}) 390 if err != nil { 391 
log.Error("Dial failure", "i", i, "url", url, "err", err) 392 os.Exit(1) 393 } 394 395 ctx := context.TODO() 396 rsc := &events.RepoStreamCallbacks{ 397 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 398 eventChans[i] <- evt 399 return nil 400 }, 401 // TODO: all the other Repo* event types 402 Error: func(evt *events.ErrorFrame) error { 403 return fmt.Errorf("%s: %s", evt.Error, evt.Message) 404 }, 405 } 406 seqScheduler := sequential.NewScheduler(fmt.Sprintf("debug-stream-%d", i+1), rsc.EventHandler) 407 if err := events.HandleRepoStream(ctx, con, seqScheduler, nil); err != nil { 408 log.Error("HandleRepoStream failure", "i", i, "url", url, "err", err) 409 os.Exit(1) 410 } 411 }(i, url) 412 } 413 414 ch := make(chan os.Signal, 1) 415 signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT) 416 417 // Compare events from the two URLs 418 for { 419 select { 420 case event := <-eventChans[0]: 421 partner, err := findMatchAndRemove(1, event) 422 if err != nil { 423 fmt.Println("checking for match failed: ", err) 424 continue 425 } 426 if partner == nil { 427 addToBuffer(0, event) 428 } else { 429 // the good case 430 fmt.Println("Match found") 431 } 432 433 case event := <-eventChans[1]: 434 partner, err := findMatchAndRemove(0, event) 435 if err != nil { 436 fmt.Println("checking for match failed: ", err) 437 continue 438 } 439 if partner == nil { 440 addToBuffer(1, event) 441 } else { 442 // the good case 443 fmt.Println("Match found") 444 } 445 case <-ch: 446 printDetailedDelta() 447 /* 448 b, err := json.Marshal(buffers) 449 if err != nil { 450 return err 451 } 452 453 fmt.Println(string(b)) 454 */ 455 return nil 456 } 457 458 printCurrentDelta() 459 } 460 }, 461} 462 463var debugFeedGenCmd = &cli.Command{ 464 Name: "debug-feed", 465 ArgsUsage: "<at-uri>", 466 Action: func(cctx *cli.Context) error { 467 xrpcc, err := cliutil.GetXrpcClient(cctx, true) 468 if err != nil { 469 return err 470 } 471 472 didr := cliutil.GetDidResolver(cctx) 473 474 uri 
:= cctx.Args().First() 475 puri, err := util.ParseAtUri(uri) 476 if err != nil { 477 return err 478 } 479 480 ctx := context.TODO() 481 482 out, err := atproto.RepoGetRecord(ctx, xrpcc, "", puri.Collection, puri.Did, puri.Rkey) 483 if err != nil { 484 return fmt.Errorf("getting record: %w", err) 485 } 486 487 fgr, ok := out.Value.Val.(*bsky.FeedGenerator) 488 if !ok { 489 return fmt.Errorf("invalid feedgen record") 490 } 491 492 fmt.Println("Feed DID is: ", fgr.Did) 493 doc, err := didr.GetDocument(ctx, fgr.Did) 494 if err != nil { 495 return err 496 } 497 498 fmt.Println("Got service did document:") 499 b, err := json.MarshalIndent(doc, "", " ") 500 if err != nil { 501 return err 502 } 503 fmt.Println(string(b)) 504 505 var ss *did.Service 506 for _, s := range doc.Service { 507 if s.ID.String() == "#bsky_fg" { 508 cp := s 509 ss = &cp 510 break 511 } 512 } 513 514 if ss == nil { 515 return fmt.Errorf("No '#bsky_fg' service entry found in feedgens DID document") 516 } 517 518 fmt.Println("Service endpoint is: ", ss.ServiceEndpoint) 519 520 fgclient := &xrpc.Client{ 521 Host: ss.ServiceEndpoint, 522 } 523 524 desc, err := bsky.FeedDescribeFeedGenerator(ctx, fgclient) 525 if err != nil { 526 return err 527 } 528 529 fmt.Printf("Found %d feeds at discovered endpoint\n", len(desc.Feeds)) 530 var found bool 531 for _, f := range desc.Feeds { 532 fmt.Println("Feed: ", f.Uri) 533 if f.Uri == uri { 534 found = true 535 break 536 } 537 } 538 539 if !found { 540 return fmt.Errorf("specified feed was not present in linked feedGenerators 'describe' method output") 541 } 542 543 skel, err := bsky.FeedGetFeedSkeleton(ctx, fgclient, "", uri, 30) 544 if err != nil { 545 return fmt.Errorf("failed to fetch feed skeleton: %w", err) 546 } 547 548 if len(skel.Feed) > 30 { 549 return fmt.Errorf("feedgen not respecting limit param (returned %d posts)", len(skel.Feed)) 550 } 551 552 if len(skel.Feed) == 0 { 553 return fmt.Errorf("feedgen response is empty (might be expected since we 
aren't authed)") 554 } 555 556 fmt.Println("Feed response looks good!") 557 558 seen := make(map[string]bool) 559 for _, p := range skel.Feed { 560 seen[p.Post] = true 561 } 562 563 curs := skel.Cursor 564 for i := 0; i < 10 && curs != nil; i++ { 565 fmt.Println("Response had cursor: ", *curs) 566 nresp, err := bsky.FeedGetFeedSkeleton(ctx, fgclient, *curs, uri, 10) 567 if err != nil { 568 return fmt.Errorf("fetching paginated feed failed: %w", err) 569 } 570 571 fmt.Printf("Got %d posts from cursored query\n", len(nresp.Feed)) 572 573 if len(nresp.Feed) > 10 { 574 return fmt.Errorf("got more posts than we requested") 575 } 576 577 for _, p := range nresp.Feed { 578 if seen[p.Post] { 579 return fmt.Errorf("duplicate post in response: %s", p.Post) 580 } 581 582 seen[p.Post] = true 583 } 584 585 if len(nresp.Feed) == 0 || nresp.Cursor == nil { 586 break 587 } 588 589 curs = nresp.Cursor 590 } 591 592 return nil 593 }, 594} 595var debugFeedViewCmd = &cli.Command{ 596 Name: "view-feed", 597 Usage: "<at-uri>", 598 Action: func(cctx *cli.Context) error { 599 xrpcc, err := cliutil.GetXrpcClient(cctx, true) 600 if err != nil { 601 return err 602 } 603 604 didr := cliutil.GetDidResolver(cctx) 605 606 uri := cctx.Args().First() 607 puri, err := util.ParseAtUri(uri) 608 if err != nil { 609 return err 610 } 611 612 ctx := context.TODO() 613 614 out, err := atproto.RepoGetRecord(ctx, xrpcc, "", puri.Collection, puri.Did, puri.Rkey) 615 if err != nil { 616 return fmt.Errorf("getting record: %w", err) 617 } 618 619 fgr, ok := out.Value.Val.(*bsky.FeedGenerator) 620 if !ok { 621 return fmt.Errorf("invalid feedgen record") 622 } 623 624 doc, err := didr.GetDocument(ctx, fgr.Did) 625 if err != nil { 626 return err 627 } 628 629 var ss *did.Service 630 for _, s := range doc.Service { 631 if s.ID.String() == "#bsky_fg" { 632 cp := s 633 ss = &cp 634 break 635 } 636 } 637 638 if ss == nil { 639 return fmt.Errorf("No '#bsky_fg' service entry found in feedgens DID document") 640 } 641 
642 fgclient := &xrpc.Client{ 643 Host: ss.ServiceEndpoint, 644 } 645 646 cache, err := loadCache("postcache.json") 647 if err != nil { 648 return err 649 } 650 var cacheUpdate bool 651 652 var cursor string 653 getPage := func(curs string) ([]*bsky.FeedDefs_PostView, error) { 654 skel, err := bsky.FeedGetFeedSkeleton(ctx, fgclient, cursor, uri, 30) 655 if err != nil { 656 return nil, fmt.Errorf("failed to fetch feed skeleton: %w", err) 657 } 658 659 if skel.Cursor != nil { 660 cursor = *skel.Cursor 661 } 662 663 var posts []*bsky.FeedDefs_PostView 664 for _, fp := range skel.Feed { 665 cached, ok := cache[fp.Post] 666 if ok { 667 posts = append(posts, cached) 668 continue 669 } 670 fps, err := bsky.FeedGetPosts(ctx, xrpcc, []string{fp.Post}) 671 if err != nil { 672 return nil, err 673 } 674 675 if len(fps.Posts) == 0 { 676 fmt.Println("FAILED TO GET POST: ", fp.Post) 677 continue 678 } 679 p := fps.Posts[0] 680 rec := p.Record.Val.(*bsky.FeedPost) 681 rec.Embed = nil // nil out embeds since they sometimes fail to json marshal... 682 posts = append(posts, p) 683 cache[fp.Post] = p 684 cacheUpdate = true 685 } 686 687 return posts, nil 688 } 689 690 printPosts := func(posts []*bsky.FeedDefs_PostView) { 691 for _, p := range posts { 692 fp, ok := p.Record.Val.(*bsky.FeedPost) 693 if !ok { 694 fmt.Printf("ERROR: Post had invalid record type: %T\n", p.Record.Val) 695 continue 696 } 697 text := fp.Text 698 text = strings.Replace(text, "\n", " ", -1) 699 if len(text) > 70 { 700 text = text[:70] + "..." 
701 } 702 703 dn := p.Author.Handle 704 if p.Author.DisplayName != nil { 705 dn = *p.Author.DisplayName 706 } 707 708 fmt.Printf("%s: %s\n", dn, text) 709 } 710 } 711 712 seen := make(map[string]bool) 713 for i := 1; i < 5; i++ { 714 fmt.Printf("PAGE %d - cursor: %s\n", i, cursor) 715 posts, err := getPage(cursor) 716 if err != nil { 717 return err 718 } 719 var alreadySeen int 720 for _, p := range posts { 721 if seen[p.Uri] { 722 alreadySeen++ 723 } 724 seen[p.Uri] = true 725 } 726 fmt.Printf("Already saw %d / %d posts in page 1\n", alreadySeen, len(posts)) 727 printPosts(posts) 728 fmt.Println("") 729 fmt.Println("") 730 } 731 732 if cacheUpdate { 733 if err := saveCache("postcache.json", cache); err != nil { 734 return err 735 } 736 } 737 738 return nil 739 }, 740} 741 742func loadCache(filename string) (map[string]*bsky.FeedDefs_PostView, error) { 743 var data map[string]*bsky.FeedDefs_PostView 744 745 jsonFile, err := os.Open(filename) 746 if err != nil { 747 if os.IsNotExist(err) { 748 return make(map[string]*bsky.FeedDefs_PostView), nil 749 } 750 751 return nil, fmt.Errorf("failed to open file: %w", err) 752 } 753 defer jsonFile.Close() 754 755 byteValue, err := io.ReadAll(jsonFile) 756 if err != nil { 757 return nil, fmt.Errorf("failed to read file: %w", err) 758 } 759 760 err = json.Unmarshal(byteValue, &data) 761 if err != nil { 762 return nil, fmt.Errorf("failed to unmarshal json: %w", err) 763 } 764 765 return data, nil 766} 767 768func saveCache(filename string, data map[string]*bsky.FeedDefs_PostView) error { 769 file, err := json.MarshalIndent(data, "", " ") 770 if err != nil { 771 return fmt.Errorf("failed to marshal json: %w", err) 772 } 773 774 err = os.WriteFile(filename, file, 0644) 775 if err != nil { 776 return fmt.Errorf("failed to write file: %w", err) 777 } 778 779 return nil 780} 781 782var debugGetRepoCmd = &cli.Command{ 783 Name: "get-repo", 784 Flags: []cli.Flag{}, 785 ArgsUsage: `<did>`, 786 Action: func(cctx *cli.Context) error { 787 
xrpcc, err := cliutil.GetXrpcClient(cctx, false) 788 if err != nil { 789 return err 790 } 791 792 ctx := context.TODO() 793 794 repobytes, err := comatproto.SyncGetRepo(ctx, xrpcc, cctx.Args().First(), "") 795 if err != nil { 796 return fmt.Errorf("getting repo: %w", err) 797 } 798 799 rep, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(repobytes)) 800 if err != nil { 801 return err 802 } 803 804 fmt.Println("Rev: ", rep.SignedCommit().Rev) 805 var count int 806 if err := rep.ForEach(ctx, "", func(k string, v cid.Cid) error { 807 rec, err := rep.Blockstore().Get(ctx, v) 808 if err != nil { 809 return fmt.Errorf("getting record %q: %w", k, err) 810 } 811 812 count++ 813 _ = rec 814 return nil 815 }); err != nil { 816 return err 817 } 818 fmt.Printf("scanned %d records\n", count) 819 820 return nil 821 }, 822} 823 824var debugCompareReposCmd = &cli.Command{ 825 Name: "compare-repos", 826 Flags: []cli.Flag{ 827 &cli.StringFlag{ 828 Name: "host-1", 829 Usage: "method, hostname, and port of PDS instance", 830 Value: "https://bsky.social", 831 }, 832 &cli.StringFlag{ 833 Name: "host-2", 834 Usage: "method, hostname, and port of PDS instance", 835 Value: "https://bsky.network", 836 }, 837 }, 838 ArgsUsage: `<did>`, 839 Action: func(cctx *cli.Context) error { 840 ctx := cctx.Context 841 did, err := syntax.ParseAtIdentifier(cctx.Args().First()) 842 if err != nil { 843 return err 844 } 845 846 wg := sync.WaitGroup{} 847 wg.Add(2) 848 849 xrpc1 := xrpc.Client{ 850 Host: cctx.String("host-1"), 851 Client: &http.Client{ 852 Timeout: 15 * time.Minute, 853 }, 854 } 855 856 if !cctx.IsSet("host-1") { 857 dir := identity.DefaultDirectory() 858 ident, err := dir.Lookup(ctx, *did) 859 if err != nil { 860 return err 861 } 862 863 xrpc1.Host = ident.PDSEndpoint() 864 } 865 866 xrpc2 := xrpc.Client{ 867 Host: cctx.String("host-2"), 868 Client: &http.Client{ 869 Timeout: 15 * time.Minute, 870 }, 871 } 872 873 var rep1 *repo.Repo 874 go func() { 875 defer wg.Done() 876 logger := 
log.With("host", cctx.String("host-1")) 877 repo1bytes, err := comatproto.SyncGetRepo(ctx, &xrpc1, did.String(), "") 878 if err != nil { 879 logger.Error("getting repo", "err", err) 880 os.Exit(1) 881 return 882 } 883 884 rep1, err = repo.ReadRepoFromCar(ctx, bytes.NewReader(repo1bytes)) 885 if err != nil { 886 logger.Error("reading repo", "err", err, "bytes", len(repo1bytes)) 887 os.Exit(1) 888 return 889 } 890 }() 891 892 var rep2 *repo.Repo 893 go func() { 894 defer wg.Done() 895 logger := log.With("host", cctx.String("host-2")) 896 repo2bytes, err := comatproto.SyncGetRepo(ctx, &xrpc2, did.String(), "") 897 if err != nil { 898 logger.Error("getting repo", "err", err) 899 os.Exit(1) 900 return 901 } 902 903 rep2, err = repo.ReadRepoFromCar(ctx, bytes.NewReader(repo2bytes)) 904 if err != nil { 905 logger.Error("reading repo", "err", err, "bytes", len(repo2bytes)) 906 os.Exit(1) 907 return 908 } 909 }() 910 911 wg.Wait() 912 913 cids1 := []cid.Cid{} 914 blocks1 := []blocks.Block{} 915 916 fmt.Println("Host 1 Results") 917 fmt.Println("Rev: ", rep1.SignedCommit().Rev) 918 var count int 919 if err := rep1.ForEach(ctx, "", func(k string, v cid.Cid) error { 920 cids1 = append(cids1, v) 921 rec, err := rep1.Blockstore().Get(ctx, v) 922 if err != nil { 923 return fmt.Errorf("getting record %q: %w", k, err) 924 } 925 blocks1 = append(blocks1, rec) 926 927 count++ 928 _ = rec 929 return nil 930 }); err != nil { 931 return err 932 } 933 fmt.Printf("scanned %d records\n", count) 934 935 cids2 := []cid.Cid{} 936 blocks2 := []blocks.Block{} 937 938 fmt.Println("\nHost 2 Results") 939 fmt.Println("Rev: ", rep2.SignedCommit().Rev) 940 count = 0 941 if err := rep2.ForEach(ctx, "", func(k string, v cid.Cid) error { 942 cids2 = append(cids2, v) 943 rec, err := rep2.Blockstore().Get(ctx, v) 944 if err != nil { 945 return fmt.Errorf("getting record %q: %w", k, err) 946 } 947 blocks2 = append(blocks2, rec) 948 949 count++ 950 _ = rec 951 return nil 952 }); err != nil { 953 return err 
954 } 955 fmt.Printf("scanned %d records\n", count) 956 957 fmt.Println("\nComparing CIDs") 958 hasBadCid := false 959 for i, c1 := range cids1 { 960 if c1 != cids2[i] { 961 fmt.Printf("CID mismatch at index %d: %s != %s\n", i, c1, cids2[i]) 962 hasBadCid = true 963 } 964 } 965 966 if !hasBadCid { 967 fmt.Println("All CIDs match!") 968 } 969 970 fmt.Println("Comparing blocks") 971 hasBadBlock := false 972 for i, b1 := range blocks1 { 973 if !bytes.Equal(b1.RawData(), blocks2[i].RawData()) { 974 fmt.Printf("Block mismatch at index %d Host 1 Cid (%s) Host 2 Cid (%s)\n", i, b1.Cid().String(), blocks2[i].Cid().String()) 975 hasBadBlock = true 976 } 977 } 978 979 if !hasBadBlock { 980 fmt.Println("All blocks match!") 981 } 982 983 if hasBadBlock || hasBadCid { 984 return fmt.Errorf("mismatched blocks or cids") 985 } 986 987 return nil 988 }, 989}