Live video on the AT Protocol
at eli/docs-url-fix 110 lines 2.7 kB view raw
1package resync 2 3import ( 4 "context" 5 "fmt" 6 "time" 7 8 "golang.org/x/sync/errgroup" 9 "stream.place/streamplace/pkg/atproto" 10 "stream.place/streamplace/pkg/bus" 11 "stream.place/streamplace/pkg/config" 12 "stream.place/streamplace/pkg/log" 13 "stream.place/streamplace/pkg/model" 14) 15 16// resync a fresh database from the PDSses, copying over the few pieces of local state 17// that we have 18func Resync(ctx context.Context, cli *config.CLI) error { 19 oldMod, err := model.MakeDB(cli.DBPath) 20 if err != nil { 21 return err 22 } 23 tempDBPath := cli.DBPath + ".temp." + fmt.Sprintf("%d", time.Now().UnixNano()) 24 newMod, err := model.MakeDB(tempDBPath) 25 if err != nil { 26 return err 27 } 28 repos, err := oldMod.GetAllRepos() 29 if err != nil { 30 return err 31 } 32 33 atsync := &atproto.ATProtoSynchronizer{ 34 CLI: cli, 35 Model: newMod, 36 Noter: nil, 37 Bus: bus.NewBus(), 38 } 39 40 doneMap := make(map[string]bool) 41 42 g, ctx := errgroup.WithContext(ctx) 43 44 doneChan := make(chan string) 45 go func() { 46 for { 47 select { 48 case <-ctx.Done(): 49 return 50 case did := <-doneChan: 51 doneMap[did] = true 52 case <-time.After(10 * time.Second): 53 for _, repo := range repos { 54 if !doneMap[repo.DID] { 55 log.Warn(ctx, "remaining repos to sync", "did", repo.DID, "handle", repo.Handle, "pds", repo.PDS) 56 } 57 } 58 } 59 } 60 }() 61 62 for _, repo := range repos { 63 repo := repo // capture range variable 64 doneMap[repo.DID] = false 65 g.Go(func() error { 66 log.Warn(ctx, "syncing repo", "did", repo.DID, "handle", repo.Handle) 67 ctx := log.WithLogValues(ctx, "resyncDID", repo.DID, "resyncHandle", repo.Handle) 68 _, err := atsync.SyncBlueskyRepoCached(ctx, repo.Handle, newMod) 69 if err != nil { 70 log.Error(ctx, "failed to sync repo", "did", repo.DID, "handle", repo.Handle, "err", err) 71 return nil 72 } 73 log.Log(ctx, "synced repo", "did", repo.DID, "handle", repo.Handle) 74 doneChan <- repo.DID 75 return nil 76 }) 77 } 78 79 if err := g.Wait(); err != nil { 80 return err 81 } 82 83 oauthSessions, err := oldMod.ListOAuthSessions() 84 if err != nil { 85 return err 86 } 87 for _, session := range oauthSessions { 88 err := newMod.CreateOAuthSession(session.DownstreamDPoPJKT, &session) 89 if err != nil { 90 return fmt.Errorf("failed to create oauth session: %w", err) 91 } 92 } 93 log.Log(ctx, "migrated oauth sessions", "count", len(oauthSessions)) 94 95 notificationTokens, err := oldMod.ListNotifications() 96 if err != nil { 97 return err 98 } 99 for _, token := range notificationTokens { 100 err := newMod.CreateNotification(token.Token, token.RepoDID) 101 if err != nil { 102 return fmt.Errorf("failed to create notification: %w", err) 103 } 104 } 105 log.Log(ctx, "migrated notification tokens", "count", len(notificationTokens)) 106 107 log.Log(ctx, "resync complete!", "newDBPath", tempDBPath) 108 109 return nil 110}