+14
-13
backfill/backfill.go
+14
-13
backfill/backfill.go
···
35
// Once done it clears the buffer and marks the job as "complete"
36
// Allowing the Job interface to abstract away the details of how buffered
37
// operations are stored and/or locked
38
-
FlushBufferedOps(ctx context.Context, cb func(kind, rev, path string, rec *[]byte, cid *cid.Cid) error) error
39
40
ClearBufferedOps(ctx context.Context) error
41
}
···
236
repo := job.Repo()
237
238
// Flush buffered operations, clear the buffer, and mark the job as "complete"
239
-
// Clearning and marking are handled by the job interface
240
-
err := job.FlushBufferedOps(ctx, func(kind, rev, path string, rec *[]byte, cid *cid.Cid) error {
241
-
switch repomgr.EventKind(kind) {
242
case repomgr.EvtKindCreateRecord:
243
err := b.HandleCreateRecord(ctx, repo, rev, path, rec, cid)
244
if err != nil {
···
467
468
var ops []*bufferedOp
469
for _, op := range evt.Ops {
470
-
switch op.Action {
471
-
case "create", "update":
472
cc, rec, err := bf.getRecord(ctx, r, op)
473
if err != nil {
474
return fmt.Errorf("getting record failed (%s,%s): %w", op.Action, op.Path, err)
475
}
476
477
ops = append(ops, &bufferedOp{
478
-
kind: op.Action,
479
path: op.Path,
480
rec: rec,
481
cid: &cc,
482
})
483
-
case "delete":
484
ops = append(ops, &bufferedOp{
485
-
kind: op.Action,
486
path: op.Path,
487
})
488
default:
···
511
512
for _, op := range ops {
513
switch op.kind {
514
-
case "create":
515
if err := bf.HandleCreateRecord(ctx, evt.Repo, evt.Rev, op.path, op.rec, op.cid); err != nil {
516
return fmt.Errorf("create record failed: %w", err)
517
}
518
-
case "update":
519
if err := bf.HandleUpdateRecord(ctx, evt.Repo, evt.Rev, op.path, op.rec, op.cid); err != nil {
520
return fmt.Errorf("update record failed: %w", err)
521
}
522
-
case "delete":
523
if err := bf.HandleDeleteRecord(ctx, evt.Repo, evt.Rev, op.path); err != nil {
524
return fmt.Errorf("delete record failed: %w", err)
525
}
···
533
return nil
534
}
535
536
-
func (bf *Backfiller) BufferOp(ctx context.Context, repo string, since *string, rev, kind, path string, rec *[]byte, cid *cid.Cid) (bool, error) {
537
return bf.BufferOps(ctx, repo, since, rev, []*bufferedOp{{
538
path: path,
539
kind: kind,
···
35
// Once done it clears the buffer and marks the job as "complete"
36
// Allowing the Job interface to abstract away the details of how buffered
37
// operations are stored and/or locked
38
+
FlushBufferedOps(ctx context.Context, cb func(kind repomgr.EventKind, rev, path string, rec *[]byte, cid *cid.Cid) error) error
39
40
ClearBufferedOps(ctx context.Context) error
41
}
···
236
repo := job.Repo()
237
238
// Flush buffered operations, clear the buffer, and mark the job as "complete"
239
+
// Clearing and marking are handled by the job interface
240
+
err := job.FlushBufferedOps(ctx, func(kind repomgr.EventKind, rev, path string, rec *[]byte, cid *cid.Cid) error {
241
+
switch kind {
242
case repomgr.EvtKindCreateRecord:
243
err := b.HandleCreateRecord(ctx, repo, rev, path, rec, cid)
244
if err != nil {
···
467
468
var ops []*bufferedOp
469
for _, op := range evt.Ops {
470
+
kind := repomgr.EventKind(op.Action)
471
+
switch kind {
472
+
case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord:
473
cc, rec, err := bf.getRecord(ctx, r, op)
474
if err != nil {
475
return fmt.Errorf("getting record failed (%s,%s): %w", op.Action, op.Path, err)
476
}
477
478
ops = append(ops, &bufferedOp{
479
+
kind: kind,
480
path: op.Path,
481
rec: rec,
482
cid: &cc,
483
})
484
+
case repomgr.EvtKindDeleteRecord:
485
ops = append(ops, &bufferedOp{
486
+
kind: kind,
487
path: op.Path,
488
})
489
default:
···
512
513
for _, op := range ops {
514
switch op.kind {
515
+
case repomgr.EvtKindCreateRecord:
516
if err := bf.HandleCreateRecord(ctx, evt.Repo, evt.Rev, op.path, op.rec, op.cid); err != nil {
517
return fmt.Errorf("create record failed: %w", err)
518
}
519
+
case repomgr.EvtKindUpdateRecord:
520
if err := bf.HandleUpdateRecord(ctx, evt.Repo, evt.Rev, op.path, op.rec, op.cid); err != nil {
521
return fmt.Errorf("update record failed: %w", err)
522
}
523
+
case repomgr.EvtKindDeleteRecord:
524
if err := bf.HandleDeleteRecord(ctx, evt.Repo, evt.Rev, op.path); err != nil {
525
return fmt.Errorf("delete record failed: %w", err)
526
}
···
534
return nil
535
}
536
537
+
func (bf *Backfiller) BufferOp(ctx context.Context, repo string, since *string, rev string, kind repomgr.EventKind, path string, rec *[]byte, cid *cid.Cid) (bool, error) {
538
return bf.BufferOps(ctx, repo, since, rev, []*bufferedOp{{
539
path: path,
540
kind: kind,
+7
-7
backfill/backfill_test.go
+7
-7
backfill/backfill_test.go
···
63
t.Fatal(err)
64
}
65
if s.State() == backfill.StateInProgress {
66
-
bf.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/1", nil, &cid.Undef)
67
-
bf.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/2", nil, &cid.Undef)
68
-
bf.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/3", nil, &cid.Undef)
69
-
bf.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/4", nil, &cid.Undef)
70
-
bf.BufferOp(ctx, testRepos[0], "delete", "app.bsky.feed.follow/5", nil, &cid.Undef)
71
72
-
bf.BufferOp(ctx, testRepos[0], "create", "app.bsky.feed.follow/1", nil, &cid.Undef)
73
74
-
bf.BufferOp(ctx, testRepos[0], "update", "app.bsky.feed.follow/1", nil, &cid.Undef)
75
76
break
77
}
···
63
t.Fatal(err)
64
}
65
if s.State() == backfill.StateInProgress {
66
+
bf.BufferOp(ctx, testRepos[0], repomgr.EvtKindDeleteRecord, "app.bsky.feed.follow/1", nil, &cid.Undef)
67
+
bf.BufferOp(ctx, testRepos[0], repomgr.EvtKindDeleteRecord, "app.bsky.feed.follow/2", nil, &cid.Undef)
68
+
bf.BufferOp(ctx, testRepos[0], repomgr.EvtKindDeleteRecord, "app.bsky.feed.follow/3", nil, &cid.Undef)
69
+
bf.BufferOp(ctx, testRepos[0], repomgr.EvtKindDeleteRecord, "app.bsky.feed.follow/4", nil, &cid.Undef)
70
+
bf.BufferOp(ctx, testRepos[0], repomgr.EvtKindDeleteRecord, "app.bsky.feed.follow/5", nil, &cid.Undef)
71
72
+
bf.BufferOp(ctx, testRepos[0], repomgr.EvtKindCreateRecord, "app.bsky.feed.follow/1", nil, &cid.Undef)
73
74
+
bf.BufferOp(ctx, testRepos[0], repomgr.EvtKindUpdateRecord, "app.bsky.feed.follow/1", nil, &cid.Undef)
75
76
break
77
}
+2
-1
backfill/gormstore.go
+2
-1
backfill/gormstore.go
···
8
"sync"
9
"time"
10
11
"github.com/ipfs/go-cid"
12
"gorm.io/gorm"
13
)
···
328
return j.db.Save(j.dbj).Error
329
}
330
331
-
func (j *Gormjob) FlushBufferedOps(ctx context.Context, fn func(kind, rev, path string, rec *[]byte, cid *cid.Cid) error) error {
332
// TODO: this will block any events for this repo while this flush is ongoing, is that okay?
333
j.lk.Lock()
334
defer j.lk.Unlock()
···
8
"sync"
9
"time"
10
11
+
"github.com/bluesky-social/indigo/repomgr"
12
"github.com/ipfs/go-cid"
13
"gorm.io/gorm"
14
)
···
329
return j.db.Save(j.dbj).Error
330
}
331
332
+
func (j *Gormjob) FlushBufferedOps(ctx context.Context, fn func(kind repomgr.EventKind, rev, path string, rec *[]byte, cid *cid.Cid) error) error {
333
// TODO: this will block any events for this repo while this flush is ongoing, is that okay?
334
j.lk.Lock()
335
defer j.lk.Unlock()
+4
-3
backfill/memstore.go
+4
-3
backfill/memstore.go
···
6
"sync"
7
"time"
8
9
"github.com/ipfs/go-cid"
10
)
11
12
type bufferedOp struct {
13
-
kind string
14
path string
15
rec *[]byte
16
cid *cid.Cid
···
81
return nil
82
}
83
84
-
func (s *Memstore) BufferOp(ctx context.Context, repo string, since *string, rev, kind, path string, rec *[]byte, cid *cid.Cid) (bool, error) {
85
s.lk.Lock()
86
87
// If the job doesn't exist, we can't buffer an op for it
···
199
return nil
200
}
201
202
-
func (j *Memjob) FlushBufferedOps(ctx context.Context, fn func(kind, rev, path string, rec *[]byte, cid *cid.Cid) error) error {
203
panic("TODO: copy what we end up doing from the gormstore")
204
/*
205
j.lk.Lock()
···
6
"sync"
7
"time"
8
9
+
"github.com/bluesky-social/indigo/repomgr"
10
"github.com/ipfs/go-cid"
11
)
12
13
type bufferedOp struct {
14
+
kind repomgr.EventKind
15
path string
16
rec *[]byte
17
cid *cid.Cid
···
82
return nil
83
}
84
85
+
func (s *Memstore) BufferOp(ctx context.Context, repo string, since *string, rev string, kind repomgr.EventKind, path string, rec *[]byte, cid *cid.Cid) (bool, error) {
86
s.lk.Lock()
87
88
// If the job doesn't exist, we can't buffer an op for it
···
200
return nil
201
}
202
203
+
func (j *Memjob) FlushBufferedOps(ctx context.Context, fn func(kind repomgr.EventKind, rev, path string, rec *[]byte, cid *cid.Cid) error) error {
204
panic("TODO: copy what we end up doing from the gormstore")
205
/*
206
j.lk.Lock()