fork
Configure Feed
Select the types of activity you want to include in your feed.
Live video on the AT Protocol
fork
Configure Feed
Select the types of activity you want to include in your feed.
1package atproto
2
3import (
4 "context"
5 "fmt"
6 "sync"
7 "sync/atomic"
8 "time"
9
10 "golang.org/x/sync/errgroup"
11 "stream.place/streamplace/pkg/log"
12)
13
14func (atsync *ATProtoSynchronizer) Migrate(ctx context.Context) error {
15 var allDIDs []string
16 offset := 0
17 for {
18 repos, err := atsync.StatefulDB.ListRepos(100, offset)
19 if err != nil {
20 return err
21 }
22 if len(repos) == 0 {
23 break
24 }
25 for _, repo := range repos {
26 allDIDs = append(allDIDs, repo.DID)
27 }
28 offset += len(repos)
29 }
30
31 log.Log(ctx, "starting migration sync", "totalRepos", len(allDIDs))
32
33 g, ctx := errgroup.WithContext(ctx)
34 var syncedCount int64
35
36 syncErrors := map[string]error{}
37 syncErrorMu := sync.Mutex{}
38
39 // Start progress logging goroutine
40 progressCtx, cancelProgress := context.WithCancel(ctx)
41 defer cancelProgress()
42
43 go func() {
44 ticker := time.NewTicker(10 * time.Second)
45 defer ticker.Stop()
46
47 for {
48 select {
49 case <-progressCtx.Done():
50 return
51 case <-ticker.C:
52 current := atomic.LoadInt64(&syncedCount)
53 log.Log(ctx, "migration progress", "synced", current, "total", len(allDIDs))
54 }
55 }
56 }()
57
58 for i, did := range allDIDs {
59 currentIndex := i
60 currentDID := did
61 g.Go(func() error {
62 log.Debug(ctx, "syncing repo", "did", currentDID, "progress", currentIndex+1, "total", len(allDIDs))
63 _, err := atsync.SyncBlueskyRepoCached(ctx, currentDID, atsync.Model)
64 if err != nil {
65 log.Error(ctx, "failed to sync repo", "did", currentDID, "err", err)
66 syncErrorMu.Lock()
67 syncErrors[currentDID] = err
68 syncErrorMu.Unlock()
69 } else {
70 atomic.AddInt64(&syncedCount, 1)
71 }
72 return nil
73 })
74 }
75
76 if err := g.Wait(); err != nil {
77 log.Error(ctx, "migration failed", "err", err, "synced", atomic.LoadInt64(&syncedCount), "total", len(allDIDs))
78 return err
79 }
80
81 for did, err := range syncErrors {
82 log.Error(ctx, "migration failed for user", "did", did, "err", err)
83 }
84
85 if len(allDIDs) > 0 && len(syncErrors) == len(allDIDs) {
86 return fmt.Errorf("all users failed to migrate")
87 }
88
89 log.Log(ctx, "migration completed", "synced", len(allDIDs))
90 return nil
91}