Live video on the AT Protocol
1package resync
2
3import (
4 "context"
5 "fmt"
6 "time"
7
8 "golang.org/x/sync/errgroup"
9 "stream.place/streamplace/pkg/atproto"
10 "stream.place/streamplace/pkg/bus"
11 "stream.place/streamplace/pkg/config"
12 "stream.place/streamplace/pkg/log"
13 "stream.place/streamplace/pkg/model"
14)
15
16// resync a fresh database from the PDSses, copying over the few pieces of local state
17// that we have
18func Resync(ctx context.Context, cli *config.CLI) error {
19 oldMod, err := model.MakeDB(cli.IndexDBPath)
20 if err != nil {
21 return err
22 }
23 tempDBPath := cli.IndexDBPath + ".temp." + fmt.Sprintf("%d", time.Now().UnixNano())
24 newMod, err := model.MakeDB(tempDBPath)
25 if err != nil {
26 return err
27 }
28 repos, err := oldMod.GetAllRepos()
29 if err != nil {
30 return err
31 }
32
33 atsync := &atproto.ATProtoSynchronizer{
34 CLI: cli,
35 Model: newMod,
36 StatefulDB: nil, // TODO: Add StatefulDB for resync when migration is ready
37 Noter: nil,
38 Bus: bus.NewBus(),
39 }
40
41 doneMap := make(map[string]bool)
42
43 g, ctx := errgroup.WithContext(ctx)
44
45 doneChan := make(chan string)
46 go func() {
47 for {
48 select {
49 case <-ctx.Done():
50 return
51 case did := <-doneChan:
52 doneMap[did] = true
53 case <-time.After(10 * time.Second):
54 for _, repo := range repos {
55 if !doneMap[repo.DID] {
56 log.Warn(ctx, "remaining repos to sync", "did", repo.DID, "handle", repo.Handle, "pds", repo.PDS)
57 }
58 }
59 }
60 }
61 }()
62
63 for _, repo := range repos {
64 repo := repo // capture range variable
65 doneMap[repo.DID] = false
66 g.Go(func() error {
67 log.Warn(ctx, "syncing repo", "did", repo.DID, "handle", repo.Handle)
68 ctx := log.WithLogValues(ctx, "resyncDID", repo.DID, "resyncHandle", repo.Handle)
69 _, err := atsync.SyncBlueskyRepoCached(ctx, repo.Handle, newMod)
70 if err != nil {
71 log.Error(ctx, "failed to sync repo", "did", repo.DID, "handle", repo.Handle, "err", err)
72 return nil
73 }
74 log.Log(ctx, "synced repo", "did", repo.DID, "handle", repo.Handle)
75 doneChan <- repo.DID
76 return nil
77 })
78 }
79
80 if err := g.Wait(); err != nil {
81 return err
82 }
83
84 // TODO: Update OAuth session migration to use new statefulDB
85 // oauthSessions, err := oldMod.ListOAuthSessions()
86 // if err != nil {
87 // return err
88 // }
89 // for _, session := range oauthSessions {
90 // err := newMod.CreateOAuthSession(session.DownstreamDPoPJKT, &session)
91 // if err != nil {
92 // return fmt.Errorf("failed to create oauth session: %w", err)
93 // }
94 // }
95 // log.Log(ctx, "migrated oauth sessions", "count", len(oauthSessions))
96
97 // TODO: Update notification migration to use new statefulDB
98 // notificationTokens, err := oldMod.ListNotifications()
99 // if err != nil {
100 // return err
101 // }
102 // for _, token := range notificationTokens {
103 // err := newMod.CreateNotification(token.Token, token.RepoDID)
104 // if err != nil {
105 // return fmt.Errorf("failed to create notification: %w", err)
106 // }
107 // }
108 // log.Log(ctx, "migrated notification tokens", "count", len(notificationTokens))
109
110 log.Log(ctx, "resync complete!", "newDBPath", tempDBPath)
111
112 return nil
113}