porting all github actions from bluesky-social/indigo to tangled CI
at main 2.9 kB view raw
1package backfill_test 2 3import ( 4 "context" 5 "log/slog" 6 "sync" 7 "testing" 8 9 "github.com/ipfs/go-cid" 10 typegen "github.com/whyrusleeping/cbor-gen" 11) 12 13type testState struct { 14 creates int 15 updates int 16 deletes int 17 lk sync.Mutex 18} 19 20func TestBackfill(t *testing.T) { 21 /* this test depends on being able to hit the live production bgs... 22 ctx := context.Background() 23 24 testRepos := []string{ 25 "did:plc:q6gjnaw2blty4crticxkmujt", 26 "did:plc:f5f4diimystr7ima7nqvamhe", 27 "did:plc:t7y4sud4dhptvzz7ibnv5cbt", 28 } 29 30 db, err := gorm.Open(sqlite.Open("sqlite://:memory")) 31 if err != nil { 32 t.Fatal(err) 33 } 34 35 store := backfill.NewGormstore(db) 36 ts := &testState{} 37 38 opts := backfill.DefaultBackfillOptions() 39 opts.CheckoutPath = "https://bsky.network/xrpc/com.atproto.sync.getRepo" 40 opts.NSIDFilter = "app.bsky.feed.follow/" 41 42 bf := backfill.NewBackfiller( 43 "backfill-test", 44 store, 45 ts.handleCreate, 46 ts.handleUpdate, 47 ts.handleDelete, 48 opts, 49 ) 50 51 slog.Info("starting backfiller") 52 53 go bf.Start() 54 55 for _, repo := range testRepos { 56 store.EnqueueJob(repo) 57 } 58 59 // Wait until job 0 is in progress 60 for { 61 s, err := store.GetJob(ctx, testRepos[0]) 62 if err != nil { 63 t.Fatal(err) 64 } 65 if s.State() == backfill.StateInProgress { 66 bf.BufferOp(ctx, testRepos[0], repomgr.EvtKindDeleteRecord, "app.bsky.feed.follow/1", nil, &cid.Undef) 67 bf.BufferOp(ctx, testRepos[0], repomgr.EvtKindDeleteRecord, "app.bsky.feed.follow/2", nil, &cid.Undef) 68 bf.BufferOp(ctx, testRepos[0], repomgr.EvtKindDeleteRecord, "app.bsky.feed.follow/3", nil, &cid.Undef) 69 bf.BufferOp(ctx, testRepos[0], repomgr.EvtKindDeleteRecord, "app.bsky.feed.follow/4", nil, &cid.Undef) 70 bf.BufferOp(ctx, testRepos[0], repomgr.EvtKindDeleteRecord, "app.bsky.feed.follow/5", nil, &cid.Undef) 71 72 bf.BufferOp(ctx, testRepos[0], repomgr.EvtKindCreateRecord, "app.bsky.feed.follow/1", nil, &cid.Undef) 73 74 bf.BufferOp(ctx, testRepos[0], repomgr.EvtKindUpdateRecord, "app.bsky.feed.follow/1", nil, &cid.Undef) 75 76 break 77 } 78 time.Sleep(100 * time.Millisecond) 79 } 80 81 for { 82 ts.lk.Lock() 83 if ts.deletes >= 5 && ts.creates >= 1 && ts.updates >= 1 { 84 ts.lk.Unlock() 85 break 86 } 87 ts.lk.Unlock() 88 time.Sleep(100 * time.Millisecond) 89 } 90 91 bf.Stop() 92 93 slog.Info("shutting down") 94 */ 95} 96 97func (ts *testState) handleCreate(ctx context.Context, repo string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error { 98 slog.Info("got create", "repo", repo, "path", path) 99 ts.lk.Lock() 100 ts.creates++ 101 ts.lk.Unlock() 102 return nil 103} 104 105func (ts *testState) handleUpdate(ctx context.Context, repo string, path string, rec typegen.CBORMarshaler, cid *cid.Cid) error { 106 slog.Info("got update", "repo", repo, "path", path) 107 ts.lk.Lock() 108 ts.updates++ 109 ts.lk.Unlock() 110 return nil 111} 112 113func (ts *testState) handleDelete(ctx context.Context, repo string, path string) error { 114 slog.Info("got delete", "repo", repo, "path", path) 115 ts.lk.Lock() 116 ts.deletes++ 117 ts.lk.Unlock() 118 return nil 119}