Live video on the AT Protocol
1package atproto
2
3import (
4 "context"
5 "fmt"
6 "sync"
7 "sync/atomic"
8 "time"
9
10 "golang.org/x/sync/errgroup"
11 "stream.place/streamplace/pkg/log"
12)
13
14func (atsync *ATProtoSynchronizer) Migrate(ctx context.Context) error {
15 var allDIDs []string
16 offset := 0
17 for {
18 repos, err := atsync.StatefulDB.ListRepos(100, offset)
19 if err != nil {
20 return err
21 }
22 if len(repos) == 0 {
23 break
24 }
25 for _, repo := range repos {
26 allDIDs = append(allDIDs, repo.DID)
27 }
28 offset += len(repos)
29 }
30
31 log.Log(ctx, "starting migration sync", "totalRepos", len(allDIDs))
32
33 g, ctx := errgroup.WithContext(ctx)
34 var syncedCount int64
35
36 syncErrors := map[string]error{}
37 syncErrorMu := sync.Mutex{}
38
39 // Start progress logging goroutine
40 progressCtx, cancelProgress := context.WithCancel(ctx)
41 defer cancelProgress()
42
43 go func() {
44 ticker := time.NewTicker(10 * time.Second)
45 defer ticker.Stop()
46
47 for {
48 select {
49 case <-progressCtx.Done():
50 return
51 case <-ticker.C:
52 current := atomic.LoadInt64(&syncedCount)
53 log.Log(ctx, "migration progress", "synced", current, "total", len(allDIDs))
54 }
55 }
56 }()
57
58 for i, did := range allDIDs {
59 currentIndex := i
60 currentDID := did
61 g.Go(func() error {
62 log.Debug(ctx, "syncing repo", "did", currentDID, "progress", currentIndex+1, "total", len(allDIDs))
63 _, err := atsync.SyncBlueskyRepoCached(ctx, currentDID, atsync.Model)
64 if err != nil {
65 log.Error(ctx, "failed to sync repo", "did", currentDID, "err", err)
66 syncErrorMu.Lock()
67 syncErrors[currentDID] = err
68 syncErrorMu.Unlock()
69 } else {
70 atomic.AddInt64(&syncedCount, 1)
71 }
72 return nil
73 })
74 }
75
76 if err := g.Wait(); err != nil {
77 log.Error(ctx, "migration failed", "err", err, "synced", atomic.LoadInt64(&syncedCount), "total", len(allDIDs))
78 return err
79 }
80
81 for did, err := range syncErrors {
82 log.Error(ctx, "migration failed for user", "did", did, "err", err)
83 }
84
85 if len(allDIDs) > 0 && len(syncErrors) == len(allDIDs) {
86 return fmt.Errorf("all users failed to migrate")
87 }
88
89 log.Log(ctx, "migration completed", "synced", len(allDIDs))
90 return nil
91}