// Fork of indigo with a slightly nicer lexgen.
// (Source: branch main, 28 kB.)
1package repomgr 2 3import ( 4 "bytes" 5 "context" 6 "errors" 7 "fmt" 8 "io" 9 "log/slog" 10 "strings" 11 "sync" 12 "time" 13 14 atproto "github.com/bluesky-social/indigo/api/atproto" 15 bsky "github.com/bluesky-social/indigo/api/bsky" 16 "github.com/bluesky-social/indigo/atproto/syntax" 17 "github.com/bluesky-social/indigo/carstore" 18 lexutil "github.com/bluesky-social/indigo/lex/util" 19 "github.com/bluesky-social/indigo/models" 20 "github.com/bluesky-social/indigo/mst" 21 "github.com/bluesky-social/indigo/repo" 22 "github.com/bluesky-social/indigo/util" 23 24 blocks "github.com/ipfs/go-block-format" 25 "github.com/ipfs/go-cid" 26 "github.com/ipfs/go-datastore" 27 blockstore "github.com/ipfs/go-ipfs-blockstore" 28 ipld "github.com/ipfs/go-ipld-format" 29 "github.com/ipld/go-car" 30 cbg "github.com/whyrusleeping/cbor-gen" 31 "go.opentelemetry.io/otel" 32 "go.opentelemetry.io/otel/attribute" 33 "gorm.io/gorm" 34) 35 36func NewRepoManager(cs carstore.CarStore, kmgr KeyManager) *RepoManager { 37 38 var noArchive bool 39 if _, ok := cs.(*carstore.NonArchivalCarstore); ok { 40 noArchive = true 41 } 42 43 clk := syntax.NewTIDClock(0) 44 45 return &RepoManager{ 46 cs: cs, 47 userLocks: make(map[models.Uid]*userLock), 48 kmgr: kmgr, 49 log: slog.Default().With("system", "repomgr"), 50 noArchive: noArchive, 51 clk: &clk, 52 } 53} 54 55type KeyManager interface { 56 VerifyUserSignature(context.Context, string, []byte, []byte) error 57 SignForUser(context.Context, string, []byte) ([]byte, error) 58} 59 60func (rm *RepoManager) SetEventHandler(cb func(context.Context, *RepoEvent), hydrateRecords bool) { 61 rm.events = cb 62 rm.hydrateRecords = hydrateRecords 63} 64 65type RepoManager struct { 66 cs carstore.CarStore 67 kmgr KeyManager 68 69 lklk sync.Mutex 70 userLocks map[models.Uid]*userLock 71 72 events func(context.Context, *RepoEvent) 73 hydrateRecords bool 74 75 log *slog.Logger 76 noArchive bool 77 78 clk *syntax.TIDClock 79} 80 81type ActorInfo struct { 82 Did string 
83 Handle string 84 DisplayName string 85 Type string 86} 87 88type RepoEvent struct { 89 User models.Uid 90 OldRoot *cid.Cid 91 NewRoot cid.Cid 92 Since *string 93 Rev string 94 RepoSlice []byte 95 PDS uint 96 Ops []RepoOp 97} 98 99type RepoOp struct { 100 Kind EventKind 101 Collection string 102 Rkey string 103 RecCid *cid.Cid 104 Record any 105 ActorInfo *ActorInfo 106} 107 108type EventKind string 109 110const ( 111 EvtKindCreateRecord = EventKind("create") 112 EvtKindUpdateRecord = EventKind("update") 113 EvtKindDeleteRecord = EventKind("delete") 114) 115 116type RepoHead struct { 117 gorm.Model 118 Usr models.Uid `gorm:"uniqueIndex"` 119 Root string 120} 121 122type userLock struct { 123 lk sync.Mutex 124 count int 125} 126 127func (rm *RepoManager) lockUser(ctx context.Context, user models.Uid) func() { 128 ctx, span := otel.Tracer("repoman").Start(ctx, "userLock") 129 defer span.End() 130 131 rm.lklk.Lock() 132 133 ulk, ok := rm.userLocks[user] 134 if !ok { 135 ulk = &userLock{} 136 rm.userLocks[user] = ulk 137 } 138 139 ulk.count++ 140 141 rm.lklk.Unlock() 142 143 ulk.lk.Lock() 144 145 return func() { 146 rm.lklk.Lock() 147 148 ulk.lk.Unlock() 149 ulk.count-- 150 151 if ulk.count == 0 { 152 delete(rm.userLocks, user) 153 } 154 rm.lklk.Unlock() 155 } 156} 157 158func (rm *RepoManager) CarStore() carstore.CarStore { 159 return rm.cs 160} 161 162func (rm *RepoManager) CreateRecord(ctx context.Context, user models.Uid, collection string, rec cbg.CBORMarshaler) (string, cid.Cid, error) { 163 ctx, span := otel.Tracer("repoman").Start(ctx, "CreateRecord") 164 defer span.End() 165 166 unlock := rm.lockUser(ctx, user) 167 defer unlock() 168 169 rev, err := rm.cs.GetUserRepoRev(ctx, user) 170 if err != nil { 171 return "", cid.Undef, err 172 } 173 174 ds, err := rm.cs.NewDeltaSession(ctx, user, &rev) 175 if err != nil { 176 return "", cid.Undef, err 177 } 178 179 head := ds.BaseCid() 180 181 r, err := repo.OpenRepo(ctx, ds, head) 182 if err != nil { 183 return "", 
cid.Undef, err 184 } 185 186 cc, tid, err := r.CreateRecord(ctx, collection, rec) 187 if err != nil { 188 return "", cid.Undef, err 189 } 190 191 nroot, nrev, err := r.Commit(ctx, rm.kmgr.SignForUser) 192 if err != nil { 193 return "", cid.Undef, err 194 } 195 196 rslice, err := ds.CloseWithRoot(ctx, nroot, nrev) 197 if err != nil { 198 return "", cid.Undef, fmt.Errorf("close with root: %w", err) 199 } 200 201 var oldroot *cid.Cid 202 if head.Defined() { 203 oldroot = &head 204 } 205 206 if rm.events != nil { 207 rm.events(ctx, &RepoEvent{ 208 User: user, 209 OldRoot: oldroot, 210 NewRoot: nroot, 211 Rev: nrev, 212 Since: &rev, 213 Ops: []RepoOp{{ 214 Kind: EvtKindCreateRecord, 215 Collection: collection, 216 Rkey: tid, 217 Record: rec, 218 RecCid: &cc, 219 }}, 220 RepoSlice: rslice, 221 }) 222 } 223 224 return collection + "/" + tid, cc, nil 225} 226 227func (rm *RepoManager) UpdateRecord(ctx context.Context, user models.Uid, collection, rkey string, rec cbg.CBORMarshaler) (cid.Cid, error) { 228 ctx, span := otel.Tracer("repoman").Start(ctx, "UpdateRecord") 229 defer span.End() 230 231 unlock := rm.lockUser(ctx, user) 232 defer unlock() 233 234 rev, err := rm.cs.GetUserRepoRev(ctx, user) 235 if err != nil { 236 return cid.Undef, err 237 } 238 239 ds, err := rm.cs.NewDeltaSession(ctx, user, &rev) 240 if err != nil { 241 return cid.Undef, err 242 } 243 244 head := ds.BaseCid() 245 r, err := repo.OpenRepo(ctx, ds, head) 246 if err != nil { 247 return cid.Undef, err 248 } 249 250 rpath := collection + "/" + rkey 251 cc, err := r.PutRecord(ctx, rpath, rec) 252 if err != nil { 253 return cid.Undef, err 254 } 255 256 nroot, nrev, err := r.Commit(ctx, rm.kmgr.SignForUser) 257 if err != nil { 258 return cid.Undef, err 259 } 260 261 rslice, err := ds.CloseWithRoot(ctx, nroot, nrev) 262 if err != nil { 263 return cid.Undef, fmt.Errorf("close with root: %w", err) 264 } 265 266 var oldroot *cid.Cid 267 if head.Defined() { 268 oldroot = &head 269 } 270 271 if rm.events != nil { 
272 op := RepoOp{ 273 Kind: EvtKindUpdateRecord, 274 Collection: collection, 275 Rkey: rkey, 276 RecCid: &cc, 277 } 278 279 if rm.hydrateRecords { 280 op.Record = rec 281 } 282 283 rm.events(ctx, &RepoEvent{ 284 User: user, 285 OldRoot: oldroot, 286 NewRoot: nroot, 287 Rev: nrev, 288 Since: &rev, 289 Ops: []RepoOp{op}, 290 RepoSlice: rslice, 291 }) 292 } 293 294 return cc, nil 295} 296 297func (rm *RepoManager) DeleteRecord(ctx context.Context, user models.Uid, collection, rkey string) error { 298 ctx, span := otel.Tracer("repoman").Start(ctx, "DeleteRecord") 299 defer span.End() 300 301 unlock := rm.lockUser(ctx, user) 302 defer unlock() 303 304 rev, err := rm.cs.GetUserRepoRev(ctx, user) 305 if err != nil { 306 return err 307 } 308 309 ds, err := rm.cs.NewDeltaSession(ctx, user, &rev) 310 if err != nil { 311 return err 312 } 313 314 head := ds.BaseCid() 315 r, err := repo.OpenRepo(ctx, ds, head) 316 if err != nil { 317 return err 318 } 319 320 rpath := collection + "/" + rkey 321 if err := r.DeleteRecord(ctx, rpath); err != nil { 322 return err 323 } 324 325 nroot, nrev, err := r.Commit(ctx, rm.kmgr.SignForUser) 326 if err != nil { 327 return err 328 } 329 330 rslice, err := ds.CloseWithRoot(ctx, nroot, nrev) 331 if err != nil { 332 return fmt.Errorf("close with root: %w", err) 333 } 334 335 var oldroot *cid.Cid 336 if head.Defined() { 337 oldroot = &head 338 } 339 340 if rm.events != nil { 341 rm.events(ctx, &RepoEvent{ 342 User: user, 343 OldRoot: oldroot, 344 NewRoot: nroot, 345 Rev: nrev, 346 Since: &rev, 347 Ops: []RepoOp{{ 348 Kind: EvtKindDeleteRecord, 349 Collection: collection, 350 Rkey: rkey, 351 }}, 352 RepoSlice: rslice, 353 }) 354 } 355 356 return nil 357 358} 359 360func (rm *RepoManager) InitNewActor(ctx context.Context, user models.Uid, handle, did, displayname string, declcid, actortype string) error { 361 unlock := rm.lockUser(ctx, user) 362 defer unlock() 363 364 if did == "" { 365 return fmt.Errorf("must specify DID for new actor") 366 } 367 
368 if user == 0 { 369 return fmt.Errorf("must specify user for new actor") 370 } 371 372 ds, err := rm.cs.NewDeltaSession(ctx, user, nil) 373 if err != nil { 374 return fmt.Errorf("creating new delta session: %w", err) 375 } 376 377 r := repo.NewRepo(ctx, did, ds) 378 379 profile := &bsky.ActorProfile{ 380 DisplayName: &displayname, 381 } 382 383 _, err = r.PutRecord(ctx, "app.bsky.actor.profile/self", profile) 384 if err != nil { 385 return fmt.Errorf("setting initial actor profile: %w", err) 386 } 387 388 root, nrev, err := r.Commit(ctx, rm.kmgr.SignForUser) 389 if err != nil { 390 return fmt.Errorf("committing repo for actor init: %w", err) 391 } 392 393 rslice, err := ds.CloseWithRoot(ctx, root, nrev) 394 if err != nil { 395 return fmt.Errorf("close with root: %w", err) 396 } 397 398 if rm.events != nil { 399 op := RepoOp{ 400 Kind: EvtKindCreateRecord, 401 Collection: "app.bsky.actor.profile", 402 Rkey: "self", 403 } 404 405 if rm.hydrateRecords { 406 op.Record = profile 407 } 408 409 rm.events(ctx, &RepoEvent{ 410 User: user, 411 NewRoot: root, 412 Rev: nrev, 413 Ops: []RepoOp{op}, 414 RepoSlice: rslice, 415 }) 416 } 417 418 return nil 419} 420 421func (rm *RepoManager) GetRepoRoot(ctx context.Context, user models.Uid) (cid.Cid, error) { 422 unlock := rm.lockUser(ctx, user) 423 defer unlock() 424 425 return rm.cs.GetUserRepoHead(ctx, user) 426} 427 428func (rm *RepoManager) GetRepoRev(ctx context.Context, user models.Uid) (string, error) { 429 unlock := rm.lockUser(ctx, user) 430 defer unlock() 431 432 return rm.cs.GetUserRepoRev(ctx, user) 433} 434 435func (rm *RepoManager) ReadRepo(ctx context.Context, user models.Uid, since string, w io.Writer) error { 436 return rm.cs.ReadUserCar(ctx, user, since, true, w) 437} 438 439func (rm *RepoManager) GetRecord(ctx context.Context, user models.Uid, collection string, rkey string, maybeCid cid.Cid) (cid.Cid, cbg.CBORMarshaler, error) { 440 bs, err := rm.cs.ReadOnlySession(user) 441 if err != nil { 442 return 
cid.Undef, nil, err 443 } 444 445 head, err := rm.cs.GetUserRepoHead(ctx, user) 446 if err != nil { 447 return cid.Undef, nil, err 448 } 449 450 r, err := repo.OpenRepo(ctx, bs, head) 451 if err != nil { 452 return cid.Undef, nil, err 453 } 454 455 ocid, val, err := r.GetRecord(ctx, collection+"/"+rkey) 456 if err != nil { 457 return cid.Undef, nil, err 458 } 459 460 if maybeCid.Defined() && ocid != maybeCid { 461 return cid.Undef, nil, fmt.Errorf("record at specified key had different CID than expected") 462 } 463 464 return ocid, val, nil 465} 466 467func (rm *RepoManager) GetRecordProof(ctx context.Context, user models.Uid, collection string, rkey string) (cid.Cid, []blocks.Block, error) { 468 robs, err := rm.cs.ReadOnlySession(user) 469 if err != nil { 470 return cid.Undef, nil, err 471 } 472 473 bs := util.NewLoggingBstore(robs) 474 475 head, err := rm.cs.GetUserRepoHead(ctx, user) 476 if err != nil { 477 return cid.Undef, nil, err 478 } 479 480 r, err := repo.OpenRepo(ctx, bs, head) 481 if err != nil { 482 return cid.Undef, nil, err 483 } 484 485 _, _, err = r.GetRecordBytes(ctx, collection+"/"+rkey) 486 if err != nil { 487 return cid.Undef, nil, err 488 } 489 490 return head, bs.GetLoggedBlocks(), nil 491} 492 493func (rm *RepoManager) GetProfile(ctx context.Context, uid models.Uid) (*bsky.ActorProfile, error) { 494 bs, err := rm.cs.ReadOnlySession(uid) 495 if err != nil { 496 return nil, err 497 } 498 499 head, err := rm.cs.GetUserRepoHead(ctx, uid) 500 if err != nil { 501 return nil, err 502 } 503 504 r, err := repo.OpenRepo(ctx, bs, head) 505 if err != nil { 506 return nil, err 507 } 508 509 _, val, err := r.GetRecord(ctx, "app.bsky.actor.profile/self") 510 if err != nil { 511 return nil, err 512 } 513 514 ap, ok := val.(*bsky.ActorProfile) 515 if !ok { 516 return nil, fmt.Errorf("found wrong type in actor profile location in tree") 517 } 518 519 return ap, nil 520} 521 522func (rm *RepoManager) CheckRepoSig(ctx context.Context, r *repo.Repo, expdid 
string) error { 523 ctx, span := otel.Tracer("repoman").Start(ctx, "CheckRepoSig") 524 defer span.End() 525 526 repoDid := r.RepoDid() 527 if expdid != repoDid { 528 return fmt.Errorf("DID in repo did not match (%q != %q)", expdid, repoDid) 529 } 530 531 scom := r.SignedCommit() 532 533 usc := scom.Unsigned() 534 sb, err := usc.BytesForSigning() 535 if err != nil { 536 return fmt.Errorf("commit serialization failed: %w", err) 537 } 538 if err := rm.kmgr.VerifyUserSignature(ctx, repoDid, scom.Sig, sb); err != nil { 539 return fmt.Errorf("signature check failed (sig: %x) (sb: %x) : %w", scom.Sig, sb, err) 540 } 541 542 return nil 543} 544 545func (rm *RepoManager) HandleExternalUserEvent(ctx context.Context, pdsid uint, uid models.Uid, did string, since *string, nrev string, carslice []byte, ops []*atproto.SyncSubscribeRepos_RepoOp) error { 546 if rm.noArchive { 547 return rm.handleExternalUserEventNoArchive(ctx, pdsid, uid, did, since, nrev, carslice, ops) 548 } else { 549 return rm.handleExternalUserEventArchive(ctx, pdsid, uid, did, since, nrev, carslice, ops) 550 } 551} 552 553func (rm *RepoManager) handleExternalUserEventNoArchive(ctx context.Context, pdsid uint, uid models.Uid, did string, since *string, nrev string, carslice []byte, ops []*atproto.SyncSubscribeRepos_RepoOp) error { 554 ctx, span := otel.Tracer("repoman").Start(ctx, "HandleExternalUserEvent") 555 defer span.End() 556 557 span.SetAttributes(attribute.Int64("uid", int64(uid))) 558 559 rm.log.Debug("HandleExternalUserEvent", "pds", pdsid, "uid", uid, "since", since, "nrev", nrev) 560 561 unlock := rm.lockUser(ctx, uid) 562 defer unlock() 563 564 start := time.Now() 565 root, ds, err := rm.cs.ImportSlice(ctx, uid, since, carslice) 566 if err != nil { 567 return fmt.Errorf("importing external carslice: %w", err) 568 } 569 570 r, err := repo.OpenRepo(ctx, ds, root) 571 if err != nil { 572 return fmt.Errorf("opening external user repo (%d, root=%s): %w", uid, root, err) 573 } 574 575 if err := 
rm.CheckRepoSig(ctx, r, did); err != nil { 576 return fmt.Errorf("check repo sig: %w", err) 577 } 578 openAndSigCheckDuration.Observe(time.Since(start).Seconds()) 579 580 evtops := make([]RepoOp, 0, len(ops)) 581 for _, op := range ops { 582 parts := strings.SplitN(op.Path, "/", 2) 583 if len(parts) != 2 { 584 return fmt.Errorf("invalid rpath in mst diff, must have collection and rkey") 585 } 586 587 switch EventKind(op.Action) { 588 case EvtKindCreateRecord: 589 rop := RepoOp{ 590 Kind: EvtKindCreateRecord, 591 Collection: parts[0], 592 Rkey: parts[1], 593 RecCid: (*cid.Cid)(op.Cid), 594 } 595 596 if rm.hydrateRecords { 597 _, rec, err := r.GetRecord(ctx, op.Path) 598 if err != nil { 599 return fmt.Errorf("reading changed record from car slice: %w", err) 600 } 601 rop.Record = rec 602 } 603 604 evtops = append(evtops, rop) 605 case EvtKindUpdateRecord: 606 rop := RepoOp{ 607 Kind: EvtKindUpdateRecord, 608 Collection: parts[0], 609 Rkey: parts[1], 610 RecCid: (*cid.Cid)(op.Cid), 611 } 612 613 if rm.hydrateRecords { 614 _, rec, err := r.GetRecord(ctx, op.Path) 615 if err != nil { 616 return fmt.Errorf("reading changed record from car slice: %w", err) 617 } 618 619 rop.Record = rec 620 } 621 622 evtops = append(evtops, rop) 623 case EvtKindDeleteRecord: 624 evtops = append(evtops, RepoOp{ 625 Kind: EvtKindDeleteRecord, 626 Collection: parts[0], 627 Rkey: parts[1], 628 }) 629 default: 630 return fmt.Errorf("unrecognized external user event kind: %q", op.Action) 631 } 632 } 633 634 if rm.events != nil { 635 rm.events(ctx, &RepoEvent{ 636 User: uid, 637 //OldRoot: prev, 638 NewRoot: root, 639 Rev: nrev, 640 Since: since, 641 Ops: evtops, 642 RepoSlice: carslice, 643 PDS: pdsid, 644 }) 645 } 646 647 return nil 648} 649 650func (rm *RepoManager) handleExternalUserEventArchive(ctx context.Context, pdsid uint, uid models.Uid, did string, since *string, nrev string, carslice []byte, ops []*atproto.SyncSubscribeRepos_RepoOp) error { 651 ctx, span := 
otel.Tracer("repoman").Start(ctx, "HandleExternalUserEvent") 652 defer span.End() 653 654 span.SetAttributes(attribute.Int64("uid", int64(uid))) 655 656 rm.log.Debug("HandleExternalUserEvent", "pds", pdsid, "uid", uid, "since", since, "nrev", nrev) 657 658 unlock := rm.lockUser(ctx, uid) 659 defer unlock() 660 661 start := time.Now() 662 root, ds, err := rm.cs.ImportSlice(ctx, uid, since, carslice) 663 if err != nil { 664 return fmt.Errorf("importing external carslice: %w", err) 665 } 666 667 r, err := repo.OpenRepo(ctx, ds, root) 668 if err != nil { 669 return fmt.Errorf("opening external user repo (%d, root=%s): %w", uid, root, err) 670 } 671 672 if err := rm.CheckRepoSig(ctx, r, did); err != nil { 673 return err 674 } 675 openAndSigCheckDuration.Observe(time.Since(start).Seconds()) 676 677 var skipcids map[cid.Cid]bool 678 if ds.BaseCid().Defined() { 679 oldrepo, err := repo.OpenRepo(ctx, ds, ds.BaseCid()) 680 if err != nil { 681 return fmt.Errorf("failed to check data root in old repo: %w", err) 682 } 683 684 // if the old commit has a 'prev', CalcDiff will error out while trying 685 // to walk it. This is an old repo thing that is being deprecated. 
686 // This check is a temporary workaround until all repos get migrated 687 // and this becomes no longer an issue 688 prev, _ := oldrepo.PrevCommit(ctx) 689 if prev != nil { 690 skipcids = map[cid.Cid]bool{ 691 *prev: true, 692 } 693 } 694 } 695 696 start = time.Now() 697 if err := ds.CalcDiff(ctx, skipcids); err != nil { 698 return fmt.Errorf("failed while calculating mst diff (since=%v): %w", since, err) 699 } 700 calcDiffDuration.Observe(time.Since(start).Seconds()) 701 702 evtops := make([]RepoOp, 0, len(ops)) 703 704 for _, op := range ops { 705 parts := strings.SplitN(op.Path, "/", 2) 706 if len(parts) != 2 { 707 return fmt.Errorf("invalid rpath in mst diff, must have collection and rkey") 708 } 709 710 switch EventKind(op.Action) { 711 case EvtKindCreateRecord: 712 rop := RepoOp{ 713 Kind: EvtKindCreateRecord, 714 Collection: parts[0], 715 Rkey: parts[1], 716 RecCid: (*cid.Cid)(op.Cid), 717 } 718 719 if rm.hydrateRecords { 720 _, rec, err := r.GetRecord(ctx, op.Path) 721 if err != nil { 722 return fmt.Errorf("reading changed record from car slice: %w", err) 723 } 724 rop.Record = rec 725 } 726 727 evtops = append(evtops, rop) 728 case EvtKindUpdateRecord: 729 rop := RepoOp{ 730 Kind: EvtKindUpdateRecord, 731 Collection: parts[0], 732 Rkey: parts[1], 733 RecCid: (*cid.Cid)(op.Cid), 734 } 735 736 if rm.hydrateRecords { 737 _, rec, err := r.GetRecord(ctx, op.Path) 738 if err != nil { 739 return fmt.Errorf("reading changed record from car slice: %w", err) 740 } 741 742 rop.Record = rec 743 } 744 745 evtops = append(evtops, rop) 746 case EvtKindDeleteRecord: 747 evtops = append(evtops, RepoOp{ 748 Kind: EvtKindDeleteRecord, 749 Collection: parts[0], 750 Rkey: parts[1], 751 }) 752 default: 753 return fmt.Errorf("unrecognized external user event kind: %q", op.Action) 754 } 755 } 756 757 start = time.Now() 758 rslice, err := ds.CloseWithRoot(ctx, root, nrev) 759 if err != nil { 760 return fmt.Errorf("close with root: %w", err) 761 } 762 
writeCarSliceDuration.Observe(time.Since(start).Seconds()) 763 764 if rm.events != nil { 765 rm.events(ctx, &RepoEvent{ 766 User: uid, 767 //OldRoot: prev, 768 NewRoot: root, 769 Rev: nrev, 770 Since: since, 771 Ops: evtops, 772 RepoSlice: rslice, 773 PDS: pdsid, 774 }) 775 } 776 777 return nil 778} 779 780func (rm *RepoManager) BatchWrite(ctx context.Context, user models.Uid, writes []*atproto.RepoApplyWrites_Input_Writes_Elem) error { 781 ctx, span := otel.Tracer("repoman").Start(ctx, "BatchWrite") 782 defer span.End() 783 784 unlock := rm.lockUser(ctx, user) 785 defer unlock() 786 787 rev, err := rm.cs.GetUserRepoRev(ctx, user) 788 if err != nil { 789 return err 790 } 791 792 ds, err := rm.cs.NewDeltaSession(ctx, user, &rev) 793 if err != nil { 794 return err 795 } 796 797 head := ds.BaseCid() 798 r, err := repo.OpenRepo(ctx, ds, head) 799 if err != nil { 800 return err 801 } 802 803 ops := make([]RepoOp, 0, len(writes)) 804 for _, w := range writes { 805 switch { 806 case w.RepoApplyWrites_Create != nil: 807 c := w.RepoApplyWrites_Create 808 var rkey string 809 if c.Rkey != nil { 810 rkey = *c.Rkey 811 } else { 812 rkey = rm.clk.Next().String() 813 } 814 815 nsid := c.Collection + "/" + rkey 816 cc, err := r.PutRecord(ctx, nsid, c.Value.Val) 817 if err != nil { 818 return err 819 } 820 821 op := RepoOp{ 822 Kind: EvtKindCreateRecord, 823 Collection: c.Collection, 824 Rkey: rkey, 825 RecCid: &cc, 826 } 827 828 if rm.hydrateRecords { 829 op.Record = c.Value.Val 830 } 831 832 ops = append(ops, op) 833 case w.RepoApplyWrites_Update != nil: 834 u := w.RepoApplyWrites_Update 835 836 cc, err := r.PutRecord(ctx, u.Collection+"/"+u.Rkey, u.Value.Val) 837 if err != nil { 838 return err 839 } 840 841 op := RepoOp{ 842 Kind: EvtKindUpdateRecord, 843 Collection: u.Collection, 844 Rkey: u.Rkey, 845 RecCid: &cc, 846 } 847 848 if rm.hydrateRecords { 849 op.Record = u.Value.Val 850 } 851 852 ops = append(ops, op) 853 case w.RepoApplyWrites_Delete != nil: 854 d := 
w.RepoApplyWrites_Delete 855 856 if err := r.DeleteRecord(ctx, d.Collection+"/"+d.Rkey); err != nil { 857 return err 858 } 859 860 ops = append(ops, RepoOp{ 861 Kind: EvtKindDeleteRecord, 862 Collection: d.Collection, 863 Rkey: d.Rkey, 864 }) 865 default: 866 return fmt.Errorf("no operation set in write enum") 867 } 868 } 869 870 nroot, nrev, err := r.Commit(ctx, rm.kmgr.SignForUser) 871 if err != nil { 872 return err 873 } 874 875 rslice, err := ds.CloseWithRoot(ctx, nroot, nrev) 876 if err != nil { 877 return fmt.Errorf("close with root: %w", err) 878 } 879 880 var oldroot *cid.Cid 881 if head.Defined() { 882 oldroot = &head 883 } 884 885 if rm.events != nil { 886 rm.events(ctx, &RepoEvent{ 887 User: user, 888 OldRoot: oldroot, 889 NewRoot: nroot, 890 RepoSlice: rslice, 891 Rev: nrev, 892 Since: &rev, 893 Ops: ops, 894 }) 895 } 896 897 return nil 898} 899 900func (rm *RepoManager) ImportNewRepo(ctx context.Context, user models.Uid, repoDid string, r io.Reader, rev *string) error { 901 ctx, span := otel.Tracer("repoman").Start(ctx, "ImportNewRepo") 902 defer span.End() 903 904 unlock := rm.lockUser(ctx, user) 905 defer unlock() 906 907 currev, err := rm.cs.GetUserRepoRev(ctx, user) 908 if err != nil { 909 return err 910 } 911 912 curhead, err := rm.cs.GetUserRepoHead(ctx, user) 913 if err != nil { 914 return err 915 } 916 917 if rev != nil && *rev == "" { 918 rev = nil 919 } 920 if rev == nil { 921 // if 'rev' is nil, this implies a fresh sync. 922 // in this case, ignore any existing blocks we have and treat this like a clean import. 
923 curhead = cid.Undef 924 } 925 926 if rev != nil && *rev != currev { 927 // TODO: we could probably just deal with this 928 return fmt.Errorf("ImportNewRepo called with incorrect base") 929 } 930 931 err = rm.processNewRepo(ctx, user, r, rev, func(ctx context.Context, root cid.Cid, finish func(context.Context, string) ([]byte, error), bs blockstore.Blockstore) error { 932 r, err := repo.OpenRepo(ctx, bs, root) 933 if err != nil { 934 return fmt.Errorf("opening new repo: %w", err) 935 } 936 937 scom := r.SignedCommit() 938 939 usc := scom.Unsigned() 940 sb, err := usc.BytesForSigning() 941 if err != nil { 942 return fmt.Errorf("commit serialization failed: %w", err) 943 } 944 if err := rm.kmgr.VerifyUserSignature(ctx, repoDid, scom.Sig, sb); err != nil { 945 return fmt.Errorf("new user signature check failed: %w", err) 946 } 947 948 diffops, err := r.DiffSince(ctx, curhead) 949 if err != nil { 950 return fmt.Errorf("diff trees (curhead: %s): %w", curhead, err) 951 } 952 953 ops := make([]RepoOp, 0, len(diffops)) 954 for _, op := range diffops { 955 repoOpsImported.Inc() 956 out, err := rm.processOp(ctx, bs, op, rm.hydrateRecords) 957 if err != nil { 958 rm.log.Error("failed to process repo op", "err", err, "path", op.Rpath, "repo", repoDid) 959 } 960 961 if out != nil { 962 ops = append(ops, *out) 963 } 964 } 965 966 slice, err := finish(ctx, scom.Rev) 967 if err != nil { 968 return err 969 } 970 971 if rm.events != nil { 972 rm.events(ctx, &RepoEvent{ 973 User: user, 974 //OldRoot: oldroot, 975 NewRoot: root, 976 Rev: scom.Rev, 977 Since: &currev, 978 RepoSlice: slice, 979 Ops: ops, 980 }) 981 } 982 983 return nil 984 }) 985 if err != nil { 986 return fmt.Errorf("process new repo (current rev: %s): %w:", currev, err) 987 } 988 989 return nil 990} 991 992func (rm *RepoManager) processOp(ctx context.Context, bs blockstore.Blockstore, op *mst.DiffOp, hydrateRecords bool) (*RepoOp, error) { 993 parts := strings.SplitN(op.Rpath, "/", 2) 994 if len(parts) != 2 { 995 
return nil, fmt.Errorf("repo mst had invalid rpath: %q", op.Rpath) 996 } 997 998 switch op.Op { 999 case "add", "mut": 1000 1001 kind := EvtKindCreateRecord 1002 if op.Op == "mut" { 1003 kind = EvtKindUpdateRecord 1004 } 1005 1006 outop := &RepoOp{ 1007 Kind: kind, 1008 Collection: parts[0], 1009 Rkey: parts[1], 1010 RecCid: &op.NewCid, 1011 } 1012 1013 if hydrateRecords { 1014 blk, err := bs.Get(ctx, op.NewCid) 1015 if err != nil { 1016 return nil, err 1017 } 1018 1019 rec, err := lexutil.CborDecodeValue(blk.RawData()) 1020 if err != nil { 1021 if !errors.Is(err, lexutil.ErrUnrecognizedType) { 1022 return nil, err 1023 } 1024 1025 rm.log.Warn("failed processing repo diff", "err", err) 1026 } else { 1027 outop.Record = rec 1028 } 1029 } 1030 1031 return outop, nil 1032 case "del": 1033 return &RepoOp{ 1034 Kind: EvtKindDeleteRecord, 1035 Collection: parts[0], 1036 Rkey: parts[1], 1037 RecCid: nil, 1038 }, nil 1039 1040 default: 1041 return nil, fmt.Errorf("diff returned invalid op type: %q", op.Op) 1042 } 1043} 1044 1045func (rm *RepoManager) processNewRepo(ctx context.Context, user models.Uid, r io.Reader, rev *string, cb func(ctx context.Context, root cid.Cid, finish func(context.Context, string) ([]byte, error), bs blockstore.Blockstore) error) error { 1046 ctx, span := otel.Tracer("repoman").Start(ctx, "processNewRepo") 1047 defer span.End() 1048 1049 carr, err := car.NewCarReader(r) 1050 if err != nil { 1051 return err 1052 } 1053 1054 if len(carr.Header.Roots) != 1 { 1055 return fmt.Errorf("invalid car file, header must have a single root (has %d)", len(carr.Header.Roots)) 1056 } 1057 1058 membs := blockstore.NewBlockstore(datastore.NewMapDatastore()) 1059 1060 for { 1061 blk, err := carr.Next() 1062 if err != nil { 1063 if err == io.EOF { 1064 break 1065 } 1066 return err 1067 } 1068 1069 if err := membs.Put(ctx, blk); err != nil { 1070 return err 1071 } 1072 } 1073 1074 seen := make(map[cid.Cid]bool) 1075 1076 root := carr.Header.Roots[0] 1077 // TODO: if 
there are blocks that get convergently recreated throughout 1078 // the repos lifecycle, this will end up erroneously not including 1079 // them. We should compute the set of blocks needed to read any repo 1080 // ops that happened in the commit and use that for our 'output' blocks 1081 cids, err := rm.walkTree(ctx, seen, root, membs, true) 1082 if err != nil { 1083 return fmt.Errorf("walkTree: %w", err) 1084 } 1085 1086 ds, err := rm.cs.NewDeltaSession(ctx, user, rev) 1087 if err != nil { 1088 return fmt.Errorf("opening delta session: %w", err) 1089 } 1090 1091 for _, c := range cids { 1092 blk, err := membs.Get(ctx, c) 1093 if err != nil { 1094 return fmt.Errorf("copying walked cids to carstore: %w", err) 1095 } 1096 1097 if err := ds.Put(ctx, blk); err != nil { 1098 return err 1099 } 1100 } 1101 1102 finish := func(ctx context.Context, nrev string) ([]byte, error) { 1103 return ds.CloseWithRoot(ctx, root, nrev) 1104 } 1105 1106 if err := cb(ctx, root, finish, ds); err != nil { 1107 return fmt.Errorf("cb errored root: %s, rev: %s: %w", root, stringOrNil(rev), err) 1108 } 1109 1110 return nil 1111} 1112 1113func stringOrNil(s *string) string { 1114 if s == nil { 1115 return "nil" 1116 } 1117 return *s 1118} 1119 1120// walkTree returns all cids linked recursively by the root, skipping any cids 1121// in the 'skip' map, and not erroring on 'not found' if prevMissing is set 1122func (rm *RepoManager) walkTree(ctx context.Context, skip map[cid.Cid]bool, root cid.Cid, bs blockstore.Blockstore, prevMissing bool) ([]cid.Cid, error) { 1123 // TODO: what if someone puts non-cbor links in their repo? 
1124 if root.Prefix().Codec != cid.DagCBOR { 1125 return nil, fmt.Errorf("can only handle dag-cbor objects in repos (%s is %d)", root, root.Prefix().Codec) 1126 } 1127 1128 blk, err := bs.Get(ctx, root) 1129 if err != nil { 1130 return nil, err 1131 } 1132 1133 var links []cid.Cid 1134 if err := cbg.ScanForLinks(bytes.NewReader(blk.RawData()), func(c cid.Cid) { 1135 if c.Prefix().Codec == cid.Raw { 1136 rm.log.Debug("skipping 'raw' CID in record", "recordCid", root, "rawCid", c) 1137 return 1138 } 1139 if skip[c] { 1140 return 1141 } 1142 1143 links = append(links, c) 1144 skip[c] = true 1145 1146 return 1147 }); err != nil { 1148 return nil, err 1149 } 1150 1151 out := []cid.Cid{root} 1152 skip[root] = true 1153 1154 // TODO: should do this non-recursive since i expect these may get deep 1155 for _, c := range links { 1156 sub, err := rm.walkTree(ctx, skip, c, bs, prevMissing) 1157 if err != nil { 1158 if prevMissing && !ipld.IsNotFound(err) { 1159 return nil, err 1160 } 1161 } 1162 1163 out = append(out, sub...) 
1164 } 1165 1166 return out, nil 1167} 1168 1169func (rm *RepoManager) TakeDownRepo(ctx context.Context, uid models.Uid) error { 1170 unlock := rm.lockUser(ctx, uid) 1171 defer unlock() 1172 1173 return rm.cs.WipeUserData(ctx, uid) 1174} 1175 1176// technically identical to TakeDownRepo, for now 1177func (rm *RepoManager) ResetRepo(ctx context.Context, uid models.Uid) error { 1178 unlock := rm.lockUser(ctx, uid) 1179 defer unlock() 1180 1181 return rm.cs.WipeUserData(ctx, uid) 1182} 1183 1184func (rm *RepoManager) VerifyRepo(ctx context.Context, uid models.Uid) error { 1185 ses, err := rm.cs.ReadOnlySession(uid) 1186 if err != nil { 1187 return err 1188 } 1189 1190 r, err := repo.OpenRepo(ctx, ses, ses.BaseCid()) 1191 if err != nil { 1192 return err 1193 } 1194 1195 if err := r.ForEach(ctx, "", func(k string, v cid.Cid) error { 1196 _, err := ses.Get(ctx, v) 1197 if err != nil { 1198 return fmt.Errorf("failed to get record %s (%s): %w", k, v, err) 1199 } 1200 1201 return nil 1202 }); err != nil { 1203 return err 1204 } 1205 1206 return nil 1207}