1package repomgr
2
3import (
4 "bytes"
5 "context"
6 "errors"
7 "fmt"
8 "io"
9 "log/slog"
10 "strings"
11 "sync"
12 "time"
13
14 atproto "github.com/bluesky-social/indigo/api/atproto"
15 bsky "github.com/bluesky-social/indigo/api/bsky"
16 "github.com/bluesky-social/indigo/atproto/syntax"
17 "github.com/bluesky-social/indigo/carstore"
18 lexutil "github.com/bluesky-social/indigo/lex/util"
19 "github.com/bluesky-social/indigo/models"
20 "github.com/bluesky-social/indigo/mst"
21 "github.com/bluesky-social/indigo/repo"
22 "github.com/bluesky-social/indigo/util"
23
24 blocks "github.com/ipfs/go-block-format"
25 "github.com/ipfs/go-cid"
26 "github.com/ipfs/go-datastore"
27 blockstore "github.com/ipfs/go-ipfs-blockstore"
28 ipld "github.com/ipfs/go-ipld-format"
29 "github.com/ipld/go-car"
30 cbg "github.com/whyrusleeping/cbor-gen"
31 "go.opentelemetry.io/otel"
32 "go.opentelemetry.io/otel/attribute"
33 "gorm.io/gorm"
34)
35
36func NewRepoManager(cs carstore.CarStore, kmgr KeyManager) *RepoManager {
37
38 var noArchive bool
39 if _, ok := cs.(*carstore.NonArchivalCarstore); ok {
40 noArchive = true
41 }
42
43 clk := syntax.NewTIDClock(0)
44
45 return &RepoManager{
46 cs: cs,
47 userLocks: make(map[models.Uid]*userLock),
48 kmgr: kmgr,
49 log: slog.Default().With("system", "repomgr"),
50 noArchive: noArchive,
51 clk: &clk,
52 }
53}
54
// KeyManager is the signing-key dependency of RepoManager.
//
// Within this file the string argument is always a DID: the signature checks
// pass the repo DID together with the commit signature and the bytes that
// were signed, and SignForUser is handed to the repo's Commit as the signer.
type KeyManager interface {
	// VerifyUserSignature reports, via a nil error, that sig is acceptable
	// for msg and the user identified by did. Callers here treat any non-nil
	// error as a failed check.
	VerifyUserSignature(ctx context.Context, did string, sig []byte, msg []byte) error

	// SignForUser produces a signature over msg for did.
	// NOTE(review): semantics inferred from the name and its use as the
	// commit signer; confirm against the implementation.
	SignForUser(ctx context.Context, did string, msg []byte) ([]byte, error)
}
59
60func (rm *RepoManager) SetEventHandler(cb func(context.Context, *RepoEvent), hydrateRecords bool) {
61 rm.events = cb
62 rm.hydrateRecords = hydrateRecords
63}
64
// RepoManager coordinates reads and writes of user repos held in a carstore.
// Mutating operations are serialized per user through a lock table, and each
// processed change is reported to an optional event callback.
type RepoManager struct {
	// cs is the backing storage for all repos.
	cs carstore.CarStore
	// kmgr signs new commits and verifies commit signatures.
	kmgr KeyManager

	// lklk guards userLocks and the count field of every entry in it.
	lklk sync.Mutex
	// userLocks has one entry per user that currently has a lock holder or waiter.
	userLocks map[models.Uid]*userLock

	// events, when non-nil, is called with a RepoEvent after each processed change.
	events func(context.Context, *RepoEvent)
	// hydrateRecords selects whether emitted ops carry the record value
	// (not every code path consults it).
	hydrateRecords bool

	// log is the logger used for this manager's diagnostics.
	log *slog.Logger
	// noArchive is set at construction when cs is a *carstore.NonArchivalCarstore.
	noArchive bool

	// clk generates record keys for batch creates that do not supply one.
	clk *syntax.TIDClock
}
80
// ActorInfo carries identifying details of an actor. It can be attached to a
// RepoOp through the op's ActorInfo field; nothing in the visible part of
// this file populates it.
type ActorInfo struct {
	Did string
	Handle string
	DisplayName string
	Type string
}
87
// RepoEvent describes one processed change to a user's repo and is the value
// handed to the callback installed with SetEventHandler.
type RepoEvent struct {
	// User is the local id of the repo owner.
	User models.Uid
	// OldRoot is the repo head the change was built on. It is nil when the
	// base head was undefined and on the paths that do not record it
	// (external events, imports, actor init).
	OldRoot *cid.Cid
	// NewRoot is the repo head after the change.
	NewRoot cid.Cid
	// Since is the revision the change was applied on top of, when known.
	Since *string
	// Rev is the revision of the new commit.
	Rev string
	// RepoSlice holds the serialized blocks for the change: the bytes
	// returned when the delta session is closed, or the incoming slice on the
	// non-archival path.
	RepoSlice []byte
	// PDS is the id of the originating PDS; only the external-event paths set it.
	PDS uint
	// Ops lists the individual record operations in the change.
	Ops []RepoOp
}
98
// RepoOp is a single record-level operation inside a RepoEvent.
type RepoOp struct {
	// Kind says whether the record was created, updated or deleted.
	Kind EventKind
	// Collection and Rkey are the two halves of the record path
	// ("collection/rkey").
	Collection string
	Rkey string
	// RecCid is the CID of the written record; delete ops leave it nil.
	RecCid *cid.Cid
	// Record is the record value when the producing path attached one.
	Record any
	// ActorInfo is optional extra actor data; no code in the visible part of
	// this file sets it.
	ActorInfo *ActorInfo
}
107
// EventKind names the kind of change a RepoOp describes.
type EventKind string

// The recognized event kinds. Incoming external op actions are converted to
// EventKind and matched against these values.
const (
	EvtKindCreateRecord EventKind = "create"
	EvtKindUpdateRecord EventKind = "update"
	EvtKindDeleteRecord EventKind = "delete"
)
115
// RepoHead is a gorm model pairing a user id with a repo root stored as a
// string. The Usr column carries a unique index. It is not referenced by any
// other code in the visible part of this file.
type RepoHead struct {
	gorm.Model
	Usr models.Uid `gorm:"uniqueIndex"`
	Root string
}
121
// userLock is the per-user entry in RepoManager.userLocks.
type userLock struct {
	// lk is the mutex that serializes work for one user.
	lk sync.Mutex
	// count is the number of goroutines holding or waiting for lk. It is
	// modified only while RepoManager.lklk is held, and the entry is removed
	// from the table when it drops to zero.
	count int
}
126
127func (rm *RepoManager) lockUser(ctx context.Context, user models.Uid) func() {
128 ctx, span := otel.Tracer("repoman").Start(ctx, "userLock")
129 defer span.End()
130
131 rm.lklk.Lock()
132
133 ulk, ok := rm.userLocks[user]
134 if !ok {
135 ulk = &userLock{}
136 rm.userLocks[user] = ulk
137 }
138
139 ulk.count++
140
141 rm.lklk.Unlock()
142
143 ulk.lk.Lock()
144
145 return func() {
146 rm.lklk.Lock()
147
148 ulk.lk.Unlock()
149 ulk.count--
150
151 if ulk.count == 0 {
152 delete(rm.userLocks, user)
153 }
154 rm.lklk.Unlock()
155 }
156}
157
// CarStore returns the carstore this manager was constructed with.
func (rm *RepoManager) CarStore() carstore.CarStore {
	return rm.cs
}
161
162func (rm *RepoManager) CreateRecord(ctx context.Context, user models.Uid, collection string, rec cbg.CBORMarshaler) (string, cid.Cid, error) {
163 ctx, span := otel.Tracer("repoman").Start(ctx, "CreateRecord")
164 defer span.End()
165
166 unlock := rm.lockUser(ctx, user)
167 defer unlock()
168
169 rev, err := rm.cs.GetUserRepoRev(ctx, user)
170 if err != nil {
171 return "", cid.Undef, err
172 }
173
174 ds, err := rm.cs.NewDeltaSession(ctx, user, &rev)
175 if err != nil {
176 return "", cid.Undef, err
177 }
178
179 head := ds.BaseCid()
180
181 r, err := repo.OpenRepo(ctx, ds, head)
182 if err != nil {
183 return "", cid.Undef, err
184 }
185
186 cc, tid, err := r.CreateRecord(ctx, collection, rec)
187 if err != nil {
188 return "", cid.Undef, err
189 }
190
191 nroot, nrev, err := r.Commit(ctx, rm.kmgr.SignForUser)
192 if err != nil {
193 return "", cid.Undef, err
194 }
195
196 rslice, err := ds.CloseWithRoot(ctx, nroot, nrev)
197 if err != nil {
198 return "", cid.Undef, fmt.Errorf("close with root: %w", err)
199 }
200
201 var oldroot *cid.Cid
202 if head.Defined() {
203 oldroot = &head
204 }
205
206 if rm.events != nil {
207 rm.events(ctx, &RepoEvent{
208 User: user,
209 OldRoot: oldroot,
210 NewRoot: nroot,
211 Rev: nrev,
212 Since: &rev,
213 Ops: []RepoOp{{
214 Kind: EvtKindCreateRecord,
215 Collection: collection,
216 Rkey: tid,
217 Record: rec,
218 RecCid: &cc,
219 }},
220 RepoSlice: rslice,
221 })
222 }
223
224 return collection + "/" + tid, cc, nil
225}
226
227func (rm *RepoManager) UpdateRecord(ctx context.Context, user models.Uid, collection, rkey string, rec cbg.CBORMarshaler) (cid.Cid, error) {
228 ctx, span := otel.Tracer("repoman").Start(ctx, "UpdateRecord")
229 defer span.End()
230
231 unlock := rm.lockUser(ctx, user)
232 defer unlock()
233
234 rev, err := rm.cs.GetUserRepoRev(ctx, user)
235 if err != nil {
236 return cid.Undef, err
237 }
238
239 ds, err := rm.cs.NewDeltaSession(ctx, user, &rev)
240 if err != nil {
241 return cid.Undef, err
242 }
243
244 head := ds.BaseCid()
245 r, err := repo.OpenRepo(ctx, ds, head)
246 if err != nil {
247 return cid.Undef, err
248 }
249
250 rpath := collection + "/" + rkey
251 cc, err := r.PutRecord(ctx, rpath, rec)
252 if err != nil {
253 return cid.Undef, err
254 }
255
256 nroot, nrev, err := r.Commit(ctx, rm.kmgr.SignForUser)
257 if err != nil {
258 return cid.Undef, err
259 }
260
261 rslice, err := ds.CloseWithRoot(ctx, nroot, nrev)
262 if err != nil {
263 return cid.Undef, fmt.Errorf("close with root: %w", err)
264 }
265
266 var oldroot *cid.Cid
267 if head.Defined() {
268 oldroot = &head
269 }
270
271 if rm.events != nil {
272 op := RepoOp{
273 Kind: EvtKindUpdateRecord,
274 Collection: collection,
275 Rkey: rkey,
276 RecCid: &cc,
277 }
278
279 if rm.hydrateRecords {
280 op.Record = rec
281 }
282
283 rm.events(ctx, &RepoEvent{
284 User: user,
285 OldRoot: oldroot,
286 NewRoot: nroot,
287 Rev: nrev,
288 Since: &rev,
289 Ops: []RepoOp{op},
290 RepoSlice: rslice,
291 })
292 }
293
294 return cc, nil
295}
296
297func (rm *RepoManager) DeleteRecord(ctx context.Context, user models.Uid, collection, rkey string) error {
298 ctx, span := otel.Tracer("repoman").Start(ctx, "DeleteRecord")
299 defer span.End()
300
301 unlock := rm.lockUser(ctx, user)
302 defer unlock()
303
304 rev, err := rm.cs.GetUserRepoRev(ctx, user)
305 if err != nil {
306 return err
307 }
308
309 ds, err := rm.cs.NewDeltaSession(ctx, user, &rev)
310 if err != nil {
311 return err
312 }
313
314 head := ds.BaseCid()
315 r, err := repo.OpenRepo(ctx, ds, head)
316 if err != nil {
317 return err
318 }
319
320 rpath := collection + "/" + rkey
321 if err := r.DeleteRecord(ctx, rpath); err != nil {
322 return err
323 }
324
325 nroot, nrev, err := r.Commit(ctx, rm.kmgr.SignForUser)
326 if err != nil {
327 return err
328 }
329
330 rslice, err := ds.CloseWithRoot(ctx, nroot, nrev)
331 if err != nil {
332 return fmt.Errorf("close with root: %w", err)
333 }
334
335 var oldroot *cid.Cid
336 if head.Defined() {
337 oldroot = &head
338 }
339
340 if rm.events != nil {
341 rm.events(ctx, &RepoEvent{
342 User: user,
343 OldRoot: oldroot,
344 NewRoot: nroot,
345 Rev: nrev,
346 Since: &rev,
347 Ops: []RepoOp{{
348 Kind: EvtKindDeleteRecord,
349 Collection: collection,
350 Rkey: rkey,
351 }},
352 RepoSlice: rslice,
353 })
354 }
355
356 return nil
357
358}
359
360func (rm *RepoManager) InitNewActor(ctx context.Context, user models.Uid, handle, did, displayname string, declcid, actortype string) error {
361 unlock := rm.lockUser(ctx, user)
362 defer unlock()
363
364 if did == "" {
365 return fmt.Errorf("must specify DID for new actor")
366 }
367
368 if user == 0 {
369 return fmt.Errorf("must specify user for new actor")
370 }
371
372 ds, err := rm.cs.NewDeltaSession(ctx, user, nil)
373 if err != nil {
374 return fmt.Errorf("creating new delta session: %w", err)
375 }
376
377 r := repo.NewRepo(ctx, did, ds)
378
379 profile := &bsky.ActorProfile{
380 DisplayName: &displayname,
381 }
382
383 _, err = r.PutRecord(ctx, "app.bsky.actor.profile/self", profile)
384 if err != nil {
385 return fmt.Errorf("setting initial actor profile: %w", err)
386 }
387
388 root, nrev, err := r.Commit(ctx, rm.kmgr.SignForUser)
389 if err != nil {
390 return fmt.Errorf("committing repo for actor init: %w", err)
391 }
392
393 rslice, err := ds.CloseWithRoot(ctx, root, nrev)
394 if err != nil {
395 return fmt.Errorf("close with root: %w", err)
396 }
397
398 if rm.events != nil {
399 op := RepoOp{
400 Kind: EvtKindCreateRecord,
401 Collection: "app.bsky.actor.profile",
402 Rkey: "self",
403 }
404
405 if rm.hydrateRecords {
406 op.Record = profile
407 }
408
409 rm.events(ctx, &RepoEvent{
410 User: user,
411 NewRoot: root,
412 Rev: nrev,
413 Ops: []RepoOp{op},
414 RepoSlice: rslice,
415 })
416 }
417
418 return nil
419}
420
421func (rm *RepoManager) GetRepoRoot(ctx context.Context, user models.Uid) (cid.Cid, error) {
422 unlock := rm.lockUser(ctx, user)
423 defer unlock()
424
425 return rm.cs.GetUserRepoHead(ctx, user)
426}
427
428func (rm *RepoManager) GetRepoRev(ctx context.Context, user models.Uid) (string, error) {
429 unlock := rm.lockUser(ctx, user)
430 defer unlock()
431
432 return rm.cs.GetUserRepoRev(ctx, user)
433}
434
// ReadRepo writes the user's repo to w by delegating to the carstore's
// ReadUserCar with the given since value. It does not take the user's lock.
// NOTE(review): the literal true passed to ReadUserCar is an option whose
// meaning is defined by the carstore; confirm there.
func (rm *RepoManager) ReadRepo(ctx context.Context, user models.Uid, since string, w io.Writer) error {
	return rm.cs.ReadUserCar(ctx, user, since, true, w)
}
438
439func (rm *RepoManager) GetRecord(ctx context.Context, user models.Uid, collection string, rkey string, maybeCid cid.Cid) (cid.Cid, cbg.CBORMarshaler, error) {
440 bs, err := rm.cs.ReadOnlySession(user)
441 if err != nil {
442 return cid.Undef, nil, err
443 }
444
445 head, err := rm.cs.GetUserRepoHead(ctx, user)
446 if err != nil {
447 return cid.Undef, nil, err
448 }
449
450 r, err := repo.OpenRepo(ctx, bs, head)
451 if err != nil {
452 return cid.Undef, nil, err
453 }
454
455 ocid, val, err := r.GetRecord(ctx, collection+"/"+rkey)
456 if err != nil {
457 return cid.Undef, nil, err
458 }
459
460 if maybeCid.Defined() && ocid != maybeCid {
461 return cid.Undef, nil, fmt.Errorf("record at specified key had different CID than expected")
462 }
463
464 return ocid, val, nil
465}
466
467func (rm *RepoManager) GetRecordProof(ctx context.Context, user models.Uid, collection string, rkey string) (cid.Cid, []blocks.Block, error) {
468 robs, err := rm.cs.ReadOnlySession(user)
469 if err != nil {
470 return cid.Undef, nil, err
471 }
472
473 bs := util.NewLoggingBstore(robs)
474
475 head, err := rm.cs.GetUserRepoHead(ctx, user)
476 if err != nil {
477 return cid.Undef, nil, err
478 }
479
480 r, err := repo.OpenRepo(ctx, bs, head)
481 if err != nil {
482 return cid.Undef, nil, err
483 }
484
485 _, _, err = r.GetRecordBytes(ctx, collection+"/"+rkey)
486 if err != nil {
487 return cid.Undef, nil, err
488 }
489
490 return head, bs.GetLoggedBlocks(), nil
491}
492
493func (rm *RepoManager) GetProfile(ctx context.Context, uid models.Uid) (*bsky.ActorProfile, error) {
494 bs, err := rm.cs.ReadOnlySession(uid)
495 if err != nil {
496 return nil, err
497 }
498
499 head, err := rm.cs.GetUserRepoHead(ctx, uid)
500 if err != nil {
501 return nil, err
502 }
503
504 r, err := repo.OpenRepo(ctx, bs, head)
505 if err != nil {
506 return nil, err
507 }
508
509 _, val, err := r.GetRecord(ctx, "app.bsky.actor.profile/self")
510 if err != nil {
511 return nil, err
512 }
513
514 ap, ok := val.(*bsky.ActorProfile)
515 if !ok {
516 return nil, fmt.Errorf("found wrong type in actor profile location in tree")
517 }
518
519 return ap, nil
520}
521
522func (rm *RepoManager) CheckRepoSig(ctx context.Context, r *repo.Repo, expdid string) error {
523 ctx, span := otel.Tracer("repoman").Start(ctx, "CheckRepoSig")
524 defer span.End()
525
526 repoDid := r.RepoDid()
527 if expdid != repoDid {
528 return fmt.Errorf("DID in repo did not match (%q != %q)", expdid, repoDid)
529 }
530
531 scom := r.SignedCommit()
532
533 usc := scom.Unsigned()
534 sb, err := usc.BytesForSigning()
535 if err != nil {
536 return fmt.Errorf("commit serialization failed: %w", err)
537 }
538 if err := rm.kmgr.VerifyUserSignature(ctx, repoDid, scom.Sig, sb); err != nil {
539 return fmt.Errorf("signature check failed (sig: %x) (sb: %x) : %w", scom.Sig, sb, err)
540 }
541
542 return nil
543}
544
545func (rm *RepoManager) HandleExternalUserEvent(ctx context.Context, pdsid uint, uid models.Uid, did string, since *string, nrev string, carslice []byte, ops []*atproto.SyncSubscribeRepos_RepoOp) error {
546 if rm.noArchive {
547 return rm.handleExternalUserEventNoArchive(ctx, pdsid, uid, did, since, nrev, carslice, ops)
548 } else {
549 return rm.handleExternalUserEventArchive(ctx, pdsid, uid, did, since, nrev, carslice, ops)
550 }
551}
552
553func (rm *RepoManager) handleExternalUserEventNoArchive(ctx context.Context, pdsid uint, uid models.Uid, did string, since *string, nrev string, carslice []byte, ops []*atproto.SyncSubscribeRepos_RepoOp) error {
554 ctx, span := otel.Tracer("repoman").Start(ctx, "HandleExternalUserEvent")
555 defer span.End()
556
557 span.SetAttributes(attribute.Int64("uid", int64(uid)))
558
559 rm.log.Debug("HandleExternalUserEvent", "pds", pdsid, "uid", uid, "since", since, "nrev", nrev)
560
561 unlock := rm.lockUser(ctx, uid)
562 defer unlock()
563
564 start := time.Now()
565 root, ds, err := rm.cs.ImportSlice(ctx, uid, since, carslice)
566 if err != nil {
567 return fmt.Errorf("importing external carslice: %w", err)
568 }
569
570 r, err := repo.OpenRepo(ctx, ds, root)
571 if err != nil {
572 return fmt.Errorf("opening external user repo (%d, root=%s): %w", uid, root, err)
573 }
574
575 if err := rm.CheckRepoSig(ctx, r, did); err != nil {
576 return fmt.Errorf("check repo sig: %w", err)
577 }
578 openAndSigCheckDuration.Observe(time.Since(start).Seconds())
579
580 evtops := make([]RepoOp, 0, len(ops))
581 for _, op := range ops {
582 parts := strings.SplitN(op.Path, "/", 2)
583 if len(parts) != 2 {
584 return fmt.Errorf("invalid rpath in mst diff, must have collection and rkey")
585 }
586
587 switch EventKind(op.Action) {
588 case EvtKindCreateRecord:
589 rop := RepoOp{
590 Kind: EvtKindCreateRecord,
591 Collection: parts[0],
592 Rkey: parts[1],
593 RecCid: (*cid.Cid)(op.Cid),
594 }
595
596 if rm.hydrateRecords {
597 _, rec, err := r.GetRecord(ctx, op.Path)
598 if err != nil {
599 return fmt.Errorf("reading changed record from car slice: %w", err)
600 }
601 rop.Record = rec
602 }
603
604 evtops = append(evtops, rop)
605 case EvtKindUpdateRecord:
606 rop := RepoOp{
607 Kind: EvtKindUpdateRecord,
608 Collection: parts[0],
609 Rkey: parts[1],
610 RecCid: (*cid.Cid)(op.Cid),
611 }
612
613 if rm.hydrateRecords {
614 _, rec, err := r.GetRecord(ctx, op.Path)
615 if err != nil {
616 return fmt.Errorf("reading changed record from car slice: %w", err)
617 }
618
619 rop.Record = rec
620 }
621
622 evtops = append(evtops, rop)
623 case EvtKindDeleteRecord:
624 evtops = append(evtops, RepoOp{
625 Kind: EvtKindDeleteRecord,
626 Collection: parts[0],
627 Rkey: parts[1],
628 })
629 default:
630 return fmt.Errorf("unrecognized external user event kind: %q", op.Action)
631 }
632 }
633
634 if rm.events != nil {
635 rm.events(ctx, &RepoEvent{
636 User: uid,
637 //OldRoot: prev,
638 NewRoot: root,
639 Rev: nrev,
640 Since: since,
641 Ops: evtops,
642 RepoSlice: carslice,
643 PDS: pdsid,
644 })
645 }
646
647 return nil
648}
649
650func (rm *RepoManager) handleExternalUserEventArchive(ctx context.Context, pdsid uint, uid models.Uid, did string, since *string, nrev string, carslice []byte, ops []*atproto.SyncSubscribeRepos_RepoOp) error {
651 ctx, span := otel.Tracer("repoman").Start(ctx, "HandleExternalUserEvent")
652 defer span.End()
653
654 span.SetAttributes(attribute.Int64("uid", int64(uid)))
655
656 rm.log.Debug("HandleExternalUserEvent", "pds", pdsid, "uid", uid, "since", since, "nrev", nrev)
657
658 unlock := rm.lockUser(ctx, uid)
659 defer unlock()
660
661 start := time.Now()
662 root, ds, err := rm.cs.ImportSlice(ctx, uid, since, carslice)
663 if err != nil {
664 return fmt.Errorf("importing external carslice: %w", err)
665 }
666
667 r, err := repo.OpenRepo(ctx, ds, root)
668 if err != nil {
669 return fmt.Errorf("opening external user repo (%d, root=%s): %w", uid, root, err)
670 }
671
672 if err := rm.CheckRepoSig(ctx, r, did); err != nil {
673 return err
674 }
675 openAndSigCheckDuration.Observe(time.Since(start).Seconds())
676
677 var skipcids map[cid.Cid]bool
678 if ds.BaseCid().Defined() {
679 oldrepo, err := repo.OpenRepo(ctx, ds, ds.BaseCid())
680 if err != nil {
681 return fmt.Errorf("failed to check data root in old repo: %w", err)
682 }
683
684 // if the old commit has a 'prev', CalcDiff will error out while trying
685 // to walk it. This is an old repo thing that is being deprecated.
686 // This check is a temporary workaround until all repos get migrated
687 // and this becomes no longer an issue
688 prev, _ := oldrepo.PrevCommit(ctx)
689 if prev != nil {
690 skipcids = map[cid.Cid]bool{
691 *prev: true,
692 }
693 }
694 }
695
696 start = time.Now()
697 if err := ds.CalcDiff(ctx, skipcids); err != nil {
698 return fmt.Errorf("failed while calculating mst diff (since=%v): %w", since, err)
699 }
700 calcDiffDuration.Observe(time.Since(start).Seconds())
701
702 evtops := make([]RepoOp, 0, len(ops))
703
704 for _, op := range ops {
705 parts := strings.SplitN(op.Path, "/", 2)
706 if len(parts) != 2 {
707 return fmt.Errorf("invalid rpath in mst diff, must have collection and rkey")
708 }
709
710 switch EventKind(op.Action) {
711 case EvtKindCreateRecord:
712 rop := RepoOp{
713 Kind: EvtKindCreateRecord,
714 Collection: parts[0],
715 Rkey: parts[1],
716 RecCid: (*cid.Cid)(op.Cid),
717 }
718
719 if rm.hydrateRecords {
720 _, rec, err := r.GetRecord(ctx, op.Path)
721 if err != nil {
722 return fmt.Errorf("reading changed record from car slice: %w", err)
723 }
724 rop.Record = rec
725 }
726
727 evtops = append(evtops, rop)
728 case EvtKindUpdateRecord:
729 rop := RepoOp{
730 Kind: EvtKindUpdateRecord,
731 Collection: parts[0],
732 Rkey: parts[1],
733 RecCid: (*cid.Cid)(op.Cid),
734 }
735
736 if rm.hydrateRecords {
737 _, rec, err := r.GetRecord(ctx, op.Path)
738 if err != nil {
739 return fmt.Errorf("reading changed record from car slice: %w", err)
740 }
741
742 rop.Record = rec
743 }
744
745 evtops = append(evtops, rop)
746 case EvtKindDeleteRecord:
747 evtops = append(evtops, RepoOp{
748 Kind: EvtKindDeleteRecord,
749 Collection: parts[0],
750 Rkey: parts[1],
751 })
752 default:
753 return fmt.Errorf("unrecognized external user event kind: %q", op.Action)
754 }
755 }
756
757 start = time.Now()
758 rslice, err := ds.CloseWithRoot(ctx, root, nrev)
759 if err != nil {
760 return fmt.Errorf("close with root: %w", err)
761 }
762 writeCarSliceDuration.Observe(time.Since(start).Seconds())
763
764 if rm.events != nil {
765 rm.events(ctx, &RepoEvent{
766 User: uid,
767 //OldRoot: prev,
768 NewRoot: root,
769 Rev: nrev,
770 Since: since,
771 Ops: evtops,
772 RepoSlice: rslice,
773 PDS: pdsid,
774 })
775 }
776
777 return nil
778}
779
780func (rm *RepoManager) BatchWrite(ctx context.Context, user models.Uid, writes []*atproto.RepoApplyWrites_Input_Writes_Elem) error {
781 ctx, span := otel.Tracer("repoman").Start(ctx, "BatchWrite")
782 defer span.End()
783
784 unlock := rm.lockUser(ctx, user)
785 defer unlock()
786
787 rev, err := rm.cs.GetUserRepoRev(ctx, user)
788 if err != nil {
789 return err
790 }
791
792 ds, err := rm.cs.NewDeltaSession(ctx, user, &rev)
793 if err != nil {
794 return err
795 }
796
797 head := ds.BaseCid()
798 r, err := repo.OpenRepo(ctx, ds, head)
799 if err != nil {
800 return err
801 }
802
803 ops := make([]RepoOp, 0, len(writes))
804 for _, w := range writes {
805 switch {
806 case w.RepoApplyWrites_Create != nil:
807 c := w.RepoApplyWrites_Create
808 var rkey string
809 if c.Rkey != nil {
810 rkey = *c.Rkey
811 } else {
812 rkey = rm.clk.Next().String()
813 }
814
815 nsid := c.Collection + "/" + rkey
816 cc, err := r.PutRecord(ctx, nsid, c.Value.Val)
817 if err != nil {
818 return err
819 }
820
821 op := RepoOp{
822 Kind: EvtKindCreateRecord,
823 Collection: c.Collection,
824 Rkey: rkey,
825 RecCid: &cc,
826 }
827
828 if rm.hydrateRecords {
829 op.Record = c.Value.Val
830 }
831
832 ops = append(ops, op)
833 case w.RepoApplyWrites_Update != nil:
834 u := w.RepoApplyWrites_Update
835
836 cc, err := r.PutRecord(ctx, u.Collection+"/"+u.Rkey, u.Value.Val)
837 if err != nil {
838 return err
839 }
840
841 op := RepoOp{
842 Kind: EvtKindUpdateRecord,
843 Collection: u.Collection,
844 Rkey: u.Rkey,
845 RecCid: &cc,
846 }
847
848 if rm.hydrateRecords {
849 op.Record = u.Value.Val
850 }
851
852 ops = append(ops, op)
853 case w.RepoApplyWrites_Delete != nil:
854 d := w.RepoApplyWrites_Delete
855
856 if err := r.DeleteRecord(ctx, d.Collection+"/"+d.Rkey); err != nil {
857 return err
858 }
859
860 ops = append(ops, RepoOp{
861 Kind: EvtKindDeleteRecord,
862 Collection: d.Collection,
863 Rkey: d.Rkey,
864 })
865 default:
866 return fmt.Errorf("no operation set in write enum")
867 }
868 }
869
870 nroot, nrev, err := r.Commit(ctx, rm.kmgr.SignForUser)
871 if err != nil {
872 return err
873 }
874
875 rslice, err := ds.CloseWithRoot(ctx, nroot, nrev)
876 if err != nil {
877 return fmt.Errorf("close with root: %w", err)
878 }
879
880 var oldroot *cid.Cid
881 if head.Defined() {
882 oldroot = &head
883 }
884
885 if rm.events != nil {
886 rm.events(ctx, &RepoEvent{
887 User: user,
888 OldRoot: oldroot,
889 NewRoot: nroot,
890 RepoSlice: rslice,
891 Rev: nrev,
892 Since: &rev,
893 Ops: ops,
894 })
895 }
896
897 return nil
898}
899
900func (rm *RepoManager) ImportNewRepo(ctx context.Context, user models.Uid, repoDid string, r io.Reader, rev *string) error {
901 ctx, span := otel.Tracer("repoman").Start(ctx, "ImportNewRepo")
902 defer span.End()
903
904 unlock := rm.lockUser(ctx, user)
905 defer unlock()
906
907 currev, err := rm.cs.GetUserRepoRev(ctx, user)
908 if err != nil {
909 return err
910 }
911
912 curhead, err := rm.cs.GetUserRepoHead(ctx, user)
913 if err != nil {
914 return err
915 }
916
917 if rev != nil && *rev == "" {
918 rev = nil
919 }
920 if rev == nil {
921 // if 'rev' is nil, this implies a fresh sync.
922 // in this case, ignore any existing blocks we have and treat this like a clean import.
923 curhead = cid.Undef
924 }
925
926 if rev != nil && *rev != currev {
927 // TODO: we could probably just deal with this
928 return fmt.Errorf("ImportNewRepo called with incorrect base")
929 }
930
931 err = rm.processNewRepo(ctx, user, r, rev, func(ctx context.Context, root cid.Cid, finish func(context.Context, string) ([]byte, error), bs blockstore.Blockstore) error {
932 r, err := repo.OpenRepo(ctx, bs, root)
933 if err != nil {
934 return fmt.Errorf("opening new repo: %w", err)
935 }
936
937 scom := r.SignedCommit()
938
939 usc := scom.Unsigned()
940 sb, err := usc.BytesForSigning()
941 if err != nil {
942 return fmt.Errorf("commit serialization failed: %w", err)
943 }
944 if err := rm.kmgr.VerifyUserSignature(ctx, repoDid, scom.Sig, sb); err != nil {
945 return fmt.Errorf("new user signature check failed: %w", err)
946 }
947
948 diffops, err := r.DiffSince(ctx, curhead)
949 if err != nil {
950 return fmt.Errorf("diff trees (curhead: %s): %w", curhead, err)
951 }
952
953 ops := make([]RepoOp, 0, len(diffops))
954 for _, op := range diffops {
955 repoOpsImported.Inc()
956 out, err := rm.processOp(ctx, bs, op, rm.hydrateRecords)
957 if err != nil {
958 rm.log.Error("failed to process repo op", "err", err, "path", op.Rpath, "repo", repoDid)
959 }
960
961 if out != nil {
962 ops = append(ops, *out)
963 }
964 }
965
966 slice, err := finish(ctx, scom.Rev)
967 if err != nil {
968 return err
969 }
970
971 if rm.events != nil {
972 rm.events(ctx, &RepoEvent{
973 User: user,
974 //OldRoot: oldroot,
975 NewRoot: root,
976 Rev: scom.Rev,
977 Since: &currev,
978 RepoSlice: slice,
979 Ops: ops,
980 })
981 }
982
983 return nil
984 })
985 if err != nil {
986 return fmt.Errorf("process new repo (current rev: %s): %w:", currev, err)
987 }
988
989 return nil
990}
991
992func (rm *RepoManager) processOp(ctx context.Context, bs blockstore.Blockstore, op *mst.DiffOp, hydrateRecords bool) (*RepoOp, error) {
993 parts := strings.SplitN(op.Rpath, "/", 2)
994 if len(parts) != 2 {
995 return nil, fmt.Errorf("repo mst had invalid rpath: %q", op.Rpath)
996 }
997
998 switch op.Op {
999 case "add", "mut":
1000
1001 kind := EvtKindCreateRecord
1002 if op.Op == "mut" {
1003 kind = EvtKindUpdateRecord
1004 }
1005
1006 outop := &RepoOp{
1007 Kind: kind,
1008 Collection: parts[0],
1009 Rkey: parts[1],
1010 RecCid: &op.NewCid,
1011 }
1012
1013 if hydrateRecords {
1014 blk, err := bs.Get(ctx, op.NewCid)
1015 if err != nil {
1016 return nil, err
1017 }
1018
1019 rec, err := lexutil.CborDecodeValue(blk.RawData())
1020 if err != nil {
1021 if !errors.Is(err, lexutil.ErrUnrecognizedType) {
1022 return nil, err
1023 }
1024
1025 rm.log.Warn("failed processing repo diff", "err", err)
1026 } else {
1027 outop.Record = rec
1028 }
1029 }
1030
1031 return outop, nil
1032 case "del":
1033 return &RepoOp{
1034 Kind: EvtKindDeleteRecord,
1035 Collection: parts[0],
1036 Rkey: parts[1],
1037 RecCid: nil,
1038 }, nil
1039
1040 default:
1041 return nil, fmt.Errorf("diff returned invalid op type: %q", op.Op)
1042 }
1043}
1044
1045func (rm *RepoManager) processNewRepo(ctx context.Context, user models.Uid, r io.Reader, rev *string, cb func(ctx context.Context, root cid.Cid, finish func(context.Context, string) ([]byte, error), bs blockstore.Blockstore) error) error {
1046 ctx, span := otel.Tracer("repoman").Start(ctx, "processNewRepo")
1047 defer span.End()
1048
1049 carr, err := car.NewCarReader(r)
1050 if err != nil {
1051 return err
1052 }
1053
1054 if len(carr.Header.Roots) != 1 {
1055 return fmt.Errorf("invalid car file, header must have a single root (has %d)", len(carr.Header.Roots))
1056 }
1057
1058 membs := blockstore.NewBlockstore(datastore.NewMapDatastore())
1059
1060 for {
1061 blk, err := carr.Next()
1062 if err != nil {
1063 if err == io.EOF {
1064 break
1065 }
1066 return err
1067 }
1068
1069 if err := membs.Put(ctx, blk); err != nil {
1070 return err
1071 }
1072 }
1073
1074 seen := make(map[cid.Cid]bool)
1075
1076 root := carr.Header.Roots[0]
1077 // TODO: if there are blocks that get convergently recreated throughout
1078 // the repos lifecycle, this will end up erroneously not including
1079 // them. We should compute the set of blocks needed to read any repo
1080 // ops that happened in the commit and use that for our 'output' blocks
1081 cids, err := rm.walkTree(ctx, seen, root, membs, true)
1082 if err != nil {
1083 return fmt.Errorf("walkTree: %w", err)
1084 }
1085
1086 ds, err := rm.cs.NewDeltaSession(ctx, user, rev)
1087 if err != nil {
1088 return fmt.Errorf("opening delta session: %w", err)
1089 }
1090
1091 for _, c := range cids {
1092 blk, err := membs.Get(ctx, c)
1093 if err != nil {
1094 return fmt.Errorf("copying walked cids to carstore: %w", err)
1095 }
1096
1097 if err := ds.Put(ctx, blk); err != nil {
1098 return err
1099 }
1100 }
1101
1102 finish := func(ctx context.Context, nrev string) ([]byte, error) {
1103 return ds.CloseWithRoot(ctx, root, nrev)
1104 }
1105
1106 if err := cb(ctx, root, finish, ds); err != nil {
1107 return fmt.Errorf("cb errored root: %s, rev: %s: %w", root, stringOrNil(rev), err)
1108 }
1109
1110 return nil
1111}
1112
// stringOrNil renders an optional string for log and error messages: the
// pointed-to value when s is set, or the literal text "nil" when it is not.
func stringOrNil(s *string) string {
	if s != nil {
		return *s
	}
	return "nil"
}
1119
1120// walkTree returns all cids linked recursively by the root, skipping any cids
1121// in the 'skip' map, and not erroring on 'not found' if prevMissing is set
1122func (rm *RepoManager) walkTree(ctx context.Context, skip map[cid.Cid]bool, root cid.Cid, bs blockstore.Blockstore, prevMissing bool) ([]cid.Cid, error) {
1123 // TODO: what if someone puts non-cbor links in their repo?
1124 if root.Prefix().Codec != cid.DagCBOR {
1125 return nil, fmt.Errorf("can only handle dag-cbor objects in repos (%s is %d)", root, root.Prefix().Codec)
1126 }
1127
1128 blk, err := bs.Get(ctx, root)
1129 if err != nil {
1130 return nil, err
1131 }
1132
1133 var links []cid.Cid
1134 if err := cbg.ScanForLinks(bytes.NewReader(blk.RawData()), func(c cid.Cid) {
1135 if c.Prefix().Codec == cid.Raw {
1136 rm.log.Debug("skipping 'raw' CID in record", "recordCid", root, "rawCid", c)
1137 return
1138 }
1139 if skip[c] {
1140 return
1141 }
1142
1143 links = append(links, c)
1144 skip[c] = true
1145
1146 return
1147 }); err != nil {
1148 return nil, err
1149 }
1150
1151 out := []cid.Cid{root}
1152 skip[root] = true
1153
1154 // TODO: should do this non-recursive since i expect these may get deep
1155 for _, c := range links {
1156 sub, err := rm.walkTree(ctx, skip, c, bs, prevMissing)
1157 if err != nil {
1158 if prevMissing && !ipld.IsNotFound(err) {
1159 return nil, err
1160 }
1161 }
1162
1163 out = append(out, sub...)
1164 }
1165
1166 return out, nil
1167}
1168
1169func (rm *RepoManager) TakeDownRepo(ctx context.Context, uid models.Uid) error {
1170 unlock := rm.lockUser(ctx, uid)
1171 defer unlock()
1172
1173 return rm.cs.WipeUserData(ctx, uid)
1174}
1175
1176// technically identical to TakeDownRepo, for now
1177func (rm *RepoManager) ResetRepo(ctx context.Context, uid models.Uid) error {
1178 unlock := rm.lockUser(ctx, uid)
1179 defer unlock()
1180
1181 return rm.cs.WipeUserData(ctx, uid)
1182}
1183
1184func (rm *RepoManager) VerifyRepo(ctx context.Context, uid models.Uid) error {
1185 ses, err := rm.cs.ReadOnlySession(uid)
1186 if err != nil {
1187 return err
1188 }
1189
1190 r, err := repo.OpenRepo(ctx, ses, ses.BaseCid())
1191 if err != nil {
1192 return err
1193 }
1194
1195 if err := r.ForEach(ctx, "", func(k string, v cid.Cid) error {
1196 _, err := ses.Get(ctx, v)
1197 if err != nil {
1198 return fmt.Errorf("failed to get record %s (%s): %w", k, v, err)
1199 }
1200
1201 return nil
1202 }); err != nil {
1203 return err
1204 }
1205
1206 return nil
1207}