1package backfill_test
2
3import (
4 "context"
5 "log/slog"
6 "sync"
7 "testing"
8
9 "github.com/ipfs/go-cid"
10 typegen "github.com/whyrusleeping/cbor-gen"
11)
12
13type testState struct {
14 creates int
15 updates int
16 deletes int
17 lk sync.Mutex
18}
19
20func TestBackfill(t *testing.T) {
21 /* this test depends on being able to hit the live production bgs...
22 ctx := context.Background()
23
24 testRepos := []string{
25 "did:plc:q6gjnaw2blty4crticxkmujt",
26 "did:plc:f5f4diimystr7ima7nqvamhe",
27 "did:plc:t7y4sud4dhptvzz7ibnv5cbt",
28 }
29
30 db, err := gorm.Open(sqlite.Open("sqlite://:memory"))
31 if err != nil {
32 t.Fatal(err)
33 }
34
35 store := backfill.NewGormstore(db)
36 ts := &testState{}
37
38 opts := backfill.DefaultBackfillOptions()
39 opts.CheckoutPath = "https://bsky.network/xrpc/com.atproto.sync.getRepo"
40 opts.NSIDFilter = "app.bsky.feed.follow/"
41
42 bf := backfill.NewBackfiller(
43 "backfill-test",
44 store,
45 ts.handleCreate,
46 ts.handleUpdate,
47 ts.handleDelete,
48 opts,
49 )
50
51 slog.Info("starting backfiller")
52
53 go bf.Start()
54
55 for _, repo := range testRepos {
56 store.EnqueueJob(repo)
57 }
58
59 // Wait until job 0 is in progress
60 for {
61 s, err := store.GetJob(ctx, testRepos[0])
62 if err != nil {
63 t.Fatal(err)
64 }
65 if s.State() == backfill.StateInProgress {
66 bf.BufferOp(ctx, testRepos[0], repomgr.EvtKindDeleteRecord, "app.bsky.feed.follow/1", nil, &cid.Undef)
67 bf.BufferOp(ctx, testRepos[0], repomgr.EvtKindDeleteRecord, "app.bsky.feed.follow/2", nil, &cid.Undef)
68 bf.BufferOp(ctx, testRepos[0], repomgr.EvtKindDeleteRecord, "app.bsky.feed.follow/3", nil, &cid.Undef)
69 bf.BufferOp(ctx, testRepos[0], repomgr.EvtKindDeleteRecord, "app.bsky.feed.follow/4", nil, &cid.Undef)
70 bf.BufferOp(ctx, testRepos[0], repomgr.EvtKindDeleteRecord, "app.bsky.feed.follow/5", nil, &cid.Undef)
71
72 bf.BufferOp(ctx, testRepos[0], repomgr.EvtKindCreateRecord, "app.bsky.feed.follow/1", nil, &cid.Undef)
73
74 bf.BufferOp(ctx, testRepos[0], repomgr.EvtKindUpdateRecord, "app.bsky.feed.follow/1", nil, &cid.Undef)
75
76 break
77 }
78 time.Sleep(100 * time.Millisecond)
79 }
80
81 for {
82 ts.lk.Lock()
83 if ts.deletes >= 5 && ts.creates >= 1 && ts.updates >= 1 {
84 ts.lk.Unlock()
85 break
86 }
87 ts.lk.Unlock()
88 time.Sleep(100 * time.Millisecond)
89 }
90
91 bf.Stop()
92
93 slog.Info("shutting down")
94 */
95}
96
97func (ts *testState) handleCreate(ctx context.Context, repo string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error {
98 slog.Info("got create", "repo", repo, "path", path)
99 ts.lk.Lock()
100 ts.creates++
101 ts.lk.Unlock()
102 return nil
103}
104
105func (ts *testState) handleUpdate(ctx context.Context, repo string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error {
106 slog.Info("got update", "repo", repo, "path", path)
107 ts.lk.Lock()
108 ts.updates++
109 ts.lk.Unlock()
110 return nil
111}
112
113func (ts *testState) handleDelete(ctx context.Context, repo string, path string) error {
114 slog.Info("got delete", "repo", repo, "path", path)
115 ts.lk.Lock()
116 ts.deletes++
117 ts.lk.Unlock()
118 return nil
119}