+23
-21
backfill/backfill.go
+23
-21
backfill/backfill.go
···
30
30
SetRev(ctx context.Context, rev string) error
31
31
RetryCount() int
32
32
33
-
BufferOps(ctx context.Context, since *string, rev string, ops []*bufferedOp) (bool, error)
33
+
// BufferOps buffers the given operations and returns true if the operations
34
+
// were buffered.
35
+
// The given operations move the repo from since to rev.
36
+
BufferOps(ctx context.Context, since *string, rev string, ops []*BufferedOp) (bool, error)
34
37
// FlushBufferedOps calls the given callback for each buffered operation
35
38
// Once done it clears the buffer and marks the job as "complete"
36
39
// Allowing the Job interface to abstract away the details of how buffered
···
465
468
return fmt.Errorf("failed to read event repo: %w", err)
466
469
}
467
470
468
-
var ops []*bufferedOp
471
+
var ops []*BufferedOp
469
472
for _, op := range evt.Ops {
470
473
kind := repomgr.EventKind(op.Action)
471
474
switch kind {
···
474
477
if err != nil {
475
478
return fmt.Errorf("getting record failed (%s,%s): %w", op.Action, op.Path, err)
476
479
}
477
-
478
-
ops = append(ops, &bufferedOp{
479
-
kind: kind,
480
-
path: op.Path,
481
-
rec: rec,
482
-
cid: &cc,
480
+
ops = append(ops, &BufferedOp{
481
+
Kind: kind,
482
+
Path: op.Path,
483
+
Record: rec,
484
+
Cid: &cc,
483
485
})
484
486
case repomgr.EvtKindDeleteRecord:
485
-
ops = append(ops, &bufferedOp{
486
-
kind: kind,
487
-
path: op.Path,
487
+
ops = append(ops, &BufferedOp{
488
+
Kind: kind,
489
+
Path: op.Path,
488
490
})
489
491
default:
490
492
return fmt.Errorf("invalid op action: %q", op.Action)
···
511
513
}
512
514
513
515
for _, op := range ops {
514
-
switch op.kind {
516
+
switch op.Kind {
515
517
case repomgr.EvtKindCreateRecord:
516
-
if err := bf.HandleCreateRecord(ctx, evt.Repo, evt.Rev, op.path, op.rec, op.cid); err != nil {
518
+
if err := bf.HandleCreateRecord(ctx, evt.Repo, evt.Rev, op.Path, op.Record, op.Cid); err != nil {
517
519
return fmt.Errorf("create record failed: %w", err)
518
520
}
519
521
case repomgr.EvtKindUpdateRecord:
520
-
if err := bf.HandleUpdateRecord(ctx, evt.Repo, evt.Rev, op.path, op.rec, op.cid); err != nil {
522
+
if err := bf.HandleUpdateRecord(ctx, evt.Repo, evt.Rev, op.Path, op.Record, op.Cid); err != nil {
521
523
return fmt.Errorf("update record failed: %w", err)
522
524
}
523
525
case repomgr.EvtKindDeleteRecord:
524
-
if err := bf.HandleDeleteRecord(ctx, evt.Repo, evt.Rev, op.path); err != nil {
526
+
if err := bf.HandleDeleteRecord(ctx, evt.Repo, evt.Rev, op.Path); err != nil {
525
527
return fmt.Errorf("delete record failed: %w", err)
526
528
}
527
529
}
···
535
537
}
536
538
537
539
func (bf *Backfiller) BufferOp(ctx context.Context, repo string, since *string, rev string, kind repomgr.EventKind, path string, rec *[]byte, cid *cid.Cid) (bool, error) {
538
-
return bf.BufferOps(ctx, repo, since, rev, []*bufferedOp{{
539
-
path: path,
540
-
kind: kind,
541
-
rec: rec,
542
-
cid: cid,
540
+
return bf.BufferOps(ctx, repo, since, rev, []*BufferedOp{{
541
+
Path: path,
542
+
Kind: kind,
543
+
Record: rec,
544
+
Cid: cid,
543
545
}})
544
546
}
545
547
546
-
func (bf *Backfiller) BufferOps(ctx context.Context, repo string, since *string, rev string, ops []*bufferedOp) (bool, error) {
548
+
func (bf *Backfiller) BufferOps(ctx context.Context, repo string, since *string, rev string, ops []*BufferedOp) (bool, error) {
547
549
j, err := bf.Store.GetJob(ctx, repo)
548
550
if err != nil {
549
551
if !errors.Is(err, ErrJobNotFound) {
+2
-2
backfill/gormstore.go
+2
-2
backfill/gormstore.go
···
168
168
return nil
169
169
}
170
170
171
-
func (j *Gormjob) BufferOps(ctx context.Context, since *string, rev string, ops []*bufferedOp) (bool, error) {
171
+
func (j *Gormjob) BufferOps(ctx context.Context, since *string, rev string, ops []*BufferedOp) (bool, error) {
172
172
j.lk.Lock()
173
173
defer j.lk.Unlock()
174
174
···
373
373
}
374
374
375
375
for _, op := range opset.ops {
376
-
if err := fn(op.kind, opset.rev, op.path, op.rec, op.cid); err != nil {
376
+
if err := fn(op.Kind, opset.rev, op.Path, op.Record, op.Cid); err != nil {
377
377
return err
378
378
}
379
379
}
+19
-14
backfill/memstore.go
+19
-14
backfill/memstore.go
···
10
10
"github.com/ipfs/go-cid"
11
11
)
12
12
13
-
type bufferedOp struct {
14
-
kind repomgr.EventKind
15
-
path string
16
-
rec *[]byte
17
-
cid *cid.Cid
13
+
// A BufferedOp is an operation buffered while a repo is being backfilled.
14
+
type BufferedOp struct {
15
+
// Kind describes the type of operation.
16
+
Kind repomgr.EventKind
17
+
// Path contains the path the operation applies to.
18
+
Path string
19
+
// Record contains the serialized record for create and update operations.
20
+
Record *[]byte
21
+
// Cid is the CID of the record.
22
+
Cid *cid.Cid
18
23
}
19
24
20
25
type opSet struct {
21
26
since *string
22
27
rev string
23
-
ops []*bufferedOp
28
+
ops []*BufferedOp
24
29
}
25
30
26
31
type Memjob struct {
···
107
112
j.bufferedOps = append(j.bufferedOps, &opSet{
108
113
since: since,
109
114
rev: rev,
110
-
ops: []*bufferedOp{&bufferedOp{
111
-
path: path,
112
-
kind: kind,
113
-
rec: rec,
114
-
cid: cid,
115
+
ops: []*BufferedOp{&BufferedOp{
116
+
Path: path,
117
+
Kind: kind,
118
+
Record: rec,
119
+
Cid: cid,
115
120
}},
116
121
})
117
122
j.updatedAt = time.Now()
118
123
return true, nil
119
124
}
120
125
121
-
func (j *Memjob) BufferOps(ctx context.Context, since *string, rev string, ops []*bufferedOp) (bool, error) {
126
+
func (j *Memjob) BufferOps(ctx context.Context, since *string, rev string, ops []*BufferedOp) (bool, error) {
122
127
j.lk.Lock()
123
128
defer j.lk.Unlock()
124
129
···
208
213
209
214
for _, opset := range j.bufferedOps {
210
215
for _, op := range opset.ops {
211
-
if err := fn(op.kind, op.path, op.rec, op.cid); err != nil {
216
+
if err := fn(op.Kind, op.Path, op.Record, op.Cid); err != nil {
212
217
return err
213
218
}
214
219
}
215
220
}
216
221
217
-
j.bufferedOps = map[string][]*bufferedOp{}
222
+
j.bufferedOps = map[string][]*BufferedOp{}
218
223
j.state = StateComplete
219
224
220
225
return nil