Live video on the AT Protocol
at eli/postgres 113 lines 3.0 kB view raw
1package resync 2 3import ( 4 "context" 5 "fmt" 6 "time" 7 8 "golang.org/x/sync/errgroup" 9 "stream.place/streamplace/pkg/atproto" 10 "stream.place/streamplace/pkg/bus" 11 "stream.place/streamplace/pkg/config" 12 "stream.place/streamplace/pkg/log" 13 "stream.place/streamplace/pkg/model" 14) 15 16// resync a fresh database from the PDSses, copying over the few pieces of local state 17// that we have 18func Resync(ctx context.Context, cli *config.CLI) error { 19 oldMod, err := model.MakeDB(cli.IndexDBPath) 20 if err != nil { 21 return err 22 } 23 tempDBPath := cli.IndexDBPath + ".temp." + fmt.Sprintf("%d", time.Now().UnixNano()) 24 newMod, err := model.MakeDB(tempDBPath) 25 if err != nil { 26 return err 27 } 28 repos, err := oldMod.GetAllRepos() 29 if err != nil { 30 return err 31 } 32 33 atsync := &atproto.ATProtoSynchronizer{ 34 CLI: cli, 35 Model: newMod, 36 StatefulDB: nil, // TODO: Add StatefulDB for resync when migration is ready 37 Noter: nil, 38 Bus: bus.NewBus(), 39 } 40 41 doneMap := make(map[string]bool) 42 43 g, ctx := errgroup.WithContext(ctx) 44 45 doneChan := make(chan string) 46 go func() { 47 for { 48 select { 49 case <-ctx.Done(): 50 return 51 case did := <-doneChan: 52 doneMap[did] = true 53 case <-time.After(10 * time.Second): 54 for _, repo := range repos { 55 if !doneMap[repo.DID] { 56 log.Warn(ctx, "remaining repos to sync", "did", repo.DID, "handle", repo.Handle, "pds", repo.PDS) 57 } 58 } 59 } 60 } 61 }() 62 63 for _, repo := range repos { 64 repo := repo // capture range variable 65 doneMap[repo.DID] = false 66 g.Go(func() error { 67 log.Warn(ctx, "syncing repo", "did", repo.DID, "handle", repo.Handle) 68 ctx := log.WithLogValues(ctx, "resyncDID", repo.DID, "resyncHandle", repo.Handle) 69 _, err := atsync.SyncBlueskyRepoCached(ctx, repo.Handle, newMod) 70 if err != nil { 71 log.Error(ctx, "failed to sync repo", "did", repo.DID, "handle", repo.Handle, "err", err) 72 return nil 73 } 74 log.Log(ctx, "synced repo", "did", repo.DID, "handle", repo.Handle) 75 doneChan <- repo.DID 76 return nil 77 }) 78 } 79 80 if err := g.Wait(); err != nil { 81 return err 82 } 83 84 // TODO: Update OAuth session migration to use new statefulDB 85 // oauthSessions, err := oldMod.ListOAuthSessions() 86 // if err != nil { 87 // return err 88 // } 89 // for _, session := range oauthSessions { 90 // err := newMod.CreateOAuthSession(session.DownstreamDPoPJKT, &session) 91 // if err != nil { 92 // return fmt.Errorf("failed to create oauth session: %w", err) 93 // } 94 // } 95 // log.Log(ctx, "migrated oauth sessions", "count", len(oauthSessions)) 96 97 // TODO: Update notification migration to use new statefulDB 98 // notificationTokens, err := oldMod.ListNotifications() 99 // if err != nil { 100 // return err 101 // } 102 // for _, token := range notificationTokens { 103 // err := newMod.CreateNotification(token.Token, token.RepoDID) 104 // if err != nil { 105 // return fmt.Errorf("failed to create notification: %w", err) 106 // } 107 // } 108 // log.Log(ctx, "migrated notification tokens", "count", len(notificationTokens)) 109 110 log.Log(ctx, "resync complete!", "newDBPath", tempDBPath) 111 112 return nil 113}