+4
-2
backfill/backfill.go
+4
-2
backfill/backfill.go
···
342
342
// Producer routine
343
343
go func() {
344
344
defer close(recordQueue)
345
-
r.ForEach(ctx, b.NSIDFilter, func(recordPath string, nodeCid cid.Cid) error {
345
+
if err := r.ForEach(ctx, b.NSIDFilter, func(recordPath string, nodeCid cid.Cid) error {
346
346
numRecords++
347
347
recordQueue <- recordQueueItem{recordPath: recordPath, nodeCid: nodeCid}
348
348
return nil
349
-
})
349
+
}); err != nil {
350
+
log.Error("failed to iterated records in repo", "err", err)
351
+
}
350
352
}()
351
353
352
354
// Consumer routines
+9
-12
carstore/bs.go
+9
-12
carstore/bs.go
···
861
861
return out
862
862
}
863
863
864
-
func BlockDiff(ctx context.Context, bs blockstore.Blockstore, oldroot cid.Cid, newcids map[cid.Cid]blockformat.Block) (map[cid.Cid]bool, error) {
864
+
func BlockDiff(ctx context.Context, bs blockstore.Blockstore, oldroot cid.Cid, newcids map[cid.Cid]blockformat.Block, skipmissing bool) (map[cid.Cid]bool, error) {
865
865
ctx, span := otel.Tracer("repo").Start(ctx, "BlockDiff")
866
866
defer span.End()
867
867
···
901
901
902
902
oblk, err := bs.Get(ctx, c)
903
903
if err != nil {
904
-
return nil, fmt.Errorf("get failed in old tree: %w", err)
904
+
if skipmissing && ipld.IsNotFound(err) {
905
+
log.Warnw("missing block in old tree", "root", oldroot, "missing", c)
906
+
} else {
907
+
return nil, fmt.Errorf("get failed in old tree: %w", err)
908
+
}
905
909
}
906
910
907
911
if err := cbg.ScanForLinks(bytes.NewReader(oblk.RawData()), func(lnk cid.Cid) {
···
956
960
}
957
961
}
958
962
959
-
rmcids, err := BlockDiff(ctx, ds, ds.baseCid, ds.blks)
960
-
if err != nil {
961
-
return cid.Undef, nil, fmt.Errorf("block diff failed (base=%s,since=%v,rev=%s): %w", ds.baseCid, since, ds.lastRev, err)
962
-
}
963
-
964
-
ds.rmcids = rmcids
965
-
966
963
return carr.Header.Roots[0], ds, nil
967
964
}
968
965
969
-
func (ds *DeltaSession) CalcDiff(ctx context.Context, nroot cid.Cid) error {
970
-
rmcids, err := BlockDiff(ctx, ds, ds.baseCid, ds.blks)
966
+
func (ds *DeltaSession) CalcDiff(ctx context.Context, skipmissing bool) error {
967
+
rmcids, err := BlockDiff(ctx, ds, ds.baseCid, ds.blks, skipmissing)
971
968
if err != nil {
972
-
return fmt.Errorf("block diff failed: %w", err)
969
+
return fmt.Errorf("block diff failed (base=%s,rev=%s): %w", ds.baseCid, ds.lastRev, err)
973
970
}
974
971
975
972
ds.rmcids = rmcids
+8
-8
carstore/repo_test.go
+8
-8
carstore/repo_test.go
···
103
103
t.Fatal(err)
104
104
}
105
105
106
-
rr, err := repo.OpenRepo(ctx, ds, head, true)
106
+
rr, err := repo.OpenRepo(ctx, ds, head)
107
107
if err != nil {
108
108
t.Fatal(err)
109
109
}
···
125
125
126
126
rev = nrev
127
127
128
-
if err := ds.CalcDiff(ctx, nroot); err != nil {
128
+
if err := ds.CalcDiff(ctx, false); err != nil {
129
129
t.Fatal(err)
130
130
}
131
131
···
188
188
t.Fatal(err)
189
189
}
190
190
191
-
rr, err := repo.OpenRepo(ctx, ds, head, true)
191
+
rr, err := repo.OpenRepo(ctx, ds, head)
192
192
if err != nil {
193
193
t.Fatal(err)
194
194
}
···
217
217
218
218
rev = nrev
219
219
220
-
if err := ds.CalcDiff(ctx, nroot); err != nil {
220
+
if err := ds.CalcDiff(ctx, false); err != nil {
221
221
t.Fatal(err)
222
222
}
223
223
···
338
338
b.Fatal(err)
339
339
}
340
340
341
-
rr, err := repo.OpenRepo(ctx, ds, head, true)
341
+
rr, err := repo.OpenRepo(ctx, ds, head)
342
342
if err != nil {
343
343
b.Fatal(err)
344
344
}
···
356
356
}
357
357
358
358
rev = nrev
359
-
if err := ds.CalcDiff(ctx, nroot); err != nil {
359
+
if err := ds.CalcDiff(ctx, false); err != nil {
360
360
b.Fatal(err)
361
361
}
362
362
···
386
386
b.ResetTimer()
387
387
for i := 0; i < b.N; i++ {
388
388
389
-
rr, err := repo.OpenRepo(ctx, bs, head, true)
389
+
rr, err := repo.OpenRepo(ctx, bs, head)
390
390
if err != nil {
391
391
b.Fatal(err)
392
392
}
···
424
424
b.ResetTimer()
425
425
for i := 0; i < b.N; i++ {
426
426
427
-
rr, err := repo.OpenRepo(ctx, bs, head, true)
427
+
rr, err := repo.OpenRepo(ctx, bs, head)
428
428
if err != nil {
429
429
b.Fatal(err)
430
430
}
+1
-1
cmd/gosky/main.go
+1
-1
cmd/gosky/main.go
+3
-3
repo/repo.go
+3
-3
repo/repo.go
···
107
107
return nil, err
108
108
}
109
109
110
-
return OpenRepo(ctx, bs, root, false)
110
+
return OpenRepo(ctx, bs, root)
111
111
}
112
112
113
113
func NewRepo(ctx context.Context, did string, bs blockstore.Blockstore) *Repo {
···
128
128
}
129
129
}
130
130
131
-
func OpenRepo(ctx context.Context, bs blockstore.Blockstore, root cid.Cid, fullRepo bool) (*Repo, error) {
131
+
func OpenRepo(ctx context.Context, bs blockstore.Blockstore, root cid.Cid) (*Repo, error) {
132
132
cst := util.CborStore(bs)
133
133
134
134
var sc SignedCommit
···
363
363
364
364
var oldTree cid.Cid
365
365
if oldrepo.Defined() {
366
-
otherRepo, err := OpenRepo(ctx, r.bs, oldrepo, true)
366
+
otherRepo, err := OpenRepo(ctx, r.bs, oldrepo)
367
367
if err != nil {
368
368
return nil, err
369
369
}
+31
-9
repomgr/repomgr.go
+31
-9
repomgr/repomgr.go
···
163
163
164
164
head := ds.BaseCid()
165
165
166
-
r, err := repo.OpenRepo(ctx, ds, head, true)
166
+
r, err := repo.OpenRepo(ctx, ds, head)
167
167
if err != nil {
168
168
return "", cid.Undef, err
169
169
}
···
227
227
}
228
228
229
229
head := ds.BaseCid()
230
-
r, err := repo.OpenRepo(ctx, ds, head, true)
230
+
r, err := repo.OpenRepo(ctx, ds, head)
231
231
if err != nil {
232
232
return cid.Undef, err
233
233
}
···
297
297
}
298
298
299
299
head := ds.BaseCid()
300
-
r, err := repo.OpenRepo(ctx, ds, head, true)
300
+
r, err := repo.OpenRepo(ctx, ds, head)
301
301
if err != nil {
302
302
return err
303
303
}
···
432
432
return cid.Undef, nil, err
433
433
}
434
434
435
-
r, err := repo.OpenRepo(ctx, bs, head, true)
435
+
r, err := repo.OpenRepo(ctx, bs, head)
436
436
if err != nil {
437
437
return cid.Undef, nil, err
438
438
}
···
462
462
return cid.Undef, nil, err
463
463
}
464
464
465
-
r, err := repo.OpenRepo(ctx, bs, head, true)
465
+
r, err := repo.OpenRepo(ctx, bs, head)
466
466
if err != nil {
467
467
return cid.Undef, nil, err
468
468
}
···
486
486
return nil, err
487
487
}
488
488
489
-
r, err := repo.OpenRepo(ctx, bs, head, true)
489
+
r, err := repo.OpenRepo(ctx, bs, head)
490
490
if err != nil {
491
491
return nil, err
492
492
}
···
543
543
return fmt.Errorf("importing external carslice: %w", err)
544
544
}
545
545
546
-
r, err := repo.OpenRepo(ctx, ds, root, true)
546
+
r, err := repo.OpenRepo(ctx, ds, root)
547
547
if err != nil {
548
548
return fmt.Errorf("opening external user repo (%d, root=%s): %w", uid, root, err)
549
549
}
···
552
552
return err
553
553
}
554
554
555
+
var badPrev bool
556
+
if ds.BaseCid().Defined() {
557
+
oldrepo, err := repo.OpenRepo(ctx, ds, ds.BaseCid())
558
+
if err != nil {
559
+
return fmt.Errorf("failed to check data root in old repo: %w", err)
560
+
}
561
+
562
+
// if the old commit has a 'prev', CalcDiff will error out while trying
563
+
// to walk it. This is an old repo thing that is being deprecated.
564
+
// This check is a temporary workaround until all repos get migrated
565
+
// and this becomes no longer an issue
566
+
prev, _ := oldrepo.PrevCommit(ctx)
567
+
if prev != nil {
568
+
badPrev = true
569
+
}
570
+
}
571
+
572
+
if err := ds.CalcDiff(ctx, badPrev); err != nil {
573
+
return fmt.Errorf("failed while calculating mst diff (since=%v): %w", since, err)
574
+
575
+
}
576
+
555
577
var evtops []RepoOp
556
578
557
579
for _, op := range ops {
···
650
672
}
651
673
652
674
head := ds.BaseCid()
653
-
r, err := repo.OpenRepo(ctx, ds, head, true)
675
+
r, err := repo.OpenRepo(ctx, ds, head)
654
676
if err != nil {
655
677
return err
656
678
}
···
781
803
}
782
804
783
805
err = rm.processNewRepo(ctx, user, r, rev, func(ctx context.Context, root cid.Cid, finish func(context.Context, string) ([]byte, error), bs blockstore.Blockstore) error {
784
-
r, err := repo.OpenRepo(ctx, bs, root, true)
806
+
r, err := repo.OpenRepo(ctx, bs, root)
785
807
if err != nil {
786
808
return fmt.Errorf("opening new repo: %w", err)
787
809
}