Live video on the AT Protocol
1package resync
2
3import (
4 "context"
5 "fmt"
6 "time"
7
8 "golang.org/x/sync/errgroup"
9 "stream.place/streamplace/pkg/atproto"
10 "stream.place/streamplace/pkg/bus"
11 "stream.place/streamplace/pkg/config"
12 "stream.place/streamplace/pkg/log"
13 "stream.place/streamplace/pkg/model"
14)
15
16// resync a fresh database from the PDSses, copying over the few pieces of local state
17// that we have
18func Resync(ctx context.Context, cli *config.CLI) error {
19 oldMod, err := model.MakeDB(cli.DBPath)
20 if err != nil {
21 return err
22 }
23 tempDBPath := cli.DBPath + ".temp." + fmt.Sprintf("%d", time.Now().UnixNano())
24 newMod, err := model.MakeDB(tempDBPath)
25 if err != nil {
26 return err
27 }
28 repos, err := oldMod.GetAllRepos()
29 if err != nil {
30 return err
31 }
32
33 atsync := &atproto.ATProtoSynchronizer{
34 CLI: cli,
35 Model: newMod,
36 Noter: nil,
37 Bus: bus.NewBus(),
38 }
39
40 doneMap := make(map[string]bool)
41
42 g, ctx := errgroup.WithContext(ctx)
43
44 doneChan := make(chan string)
45 go func() {
46 for {
47 select {
48 case <-ctx.Done():
49 return
50 case did := <-doneChan:
51 doneMap[did] = true
52 case <-time.After(10 * time.Second):
53 for _, repo := range repos {
54 if !doneMap[repo.DID] {
55 log.Warn(ctx, "remaining repos to sync", "did", repo.DID, "handle", repo.Handle, "pds", repo.PDS)
56 }
57 }
58 }
59 }
60 }()
61
62 for _, repo := range repos {
63 repo := repo // capture range variable
64 doneMap[repo.DID] = false
65 g.Go(func() error {
66 log.Warn(ctx, "syncing repo", "did", repo.DID, "handle", repo.Handle)
67 ctx := log.WithLogValues(ctx, "resyncDID", repo.DID, "resyncHandle", repo.Handle)
68 _, err := atsync.SyncBlueskyRepoCached(ctx, repo.Handle, newMod)
69 if err != nil {
70 log.Error(ctx, "failed to sync repo", "did", repo.DID, "handle", repo.Handle, "err", err)
71 return nil
72 }
73 log.Log(ctx, "synced repo", "did", repo.DID, "handle", repo.Handle)
74 doneChan <- repo.DID
75 return nil
76 })
77 }
78
79 if err := g.Wait(); err != nil {
80 return err
81 }
82
83 oauthSessions, err := oldMod.ListOAuthSessions()
84 if err != nil {
85 return err
86 }
87 for _, session := range oauthSessions {
88 err := newMod.CreateOAuthSession(session.DownstreamDPoPJKT, &session)
89 if err != nil {
90 return fmt.Errorf("failed to create oauth session: %w", err)
91 }
92 }
93 log.Log(ctx, "migrated oauth sessions", "count", len(oauthSessions))
94
95 notificationTokens, err := oldMod.ListNotifications()
96 if err != nil {
97 return err
98 }
99 for _, token := range notificationTokens {
100 err := newMod.CreateNotification(token.Token, token.RepoDID)
101 if err != nil {
102 return fmt.Errorf("failed to create notification: %w", err)
103 }
104 }
105 log.Log(ctx, "migrated notification tokens", "count", len(notificationTokens))
106
107 log.Log(ctx, "resync complete!", "newDBPath", tempDBPath)
108
109 return nil
110}