Live video on the AT Protocol
at eli/buffer-thumbnails 91 lines 2.1 kB view raw
1package atproto 2 3import ( 4 "context" 5 "fmt" 6 "sync" 7 "sync/atomic" 8 "time" 9 10 "golang.org/x/sync/errgroup" 11 "stream.place/streamplace/pkg/log" 12) 13 14func (atsync *ATProtoSynchronizer) Migrate(ctx context.Context) error { 15 var allDIDs []string 16 offset := 0 17 for { 18 repos, err := atsync.StatefulDB.ListRepos(100, offset) 19 if err != nil { 20 return err 21 } 22 if len(repos) == 0 { 23 break 24 } 25 for _, repo := range repos { 26 allDIDs = append(allDIDs, repo.DID) 27 } 28 offset += len(repos) 29 } 30 31 log.Log(ctx, "starting migration sync", "totalRepos", len(allDIDs)) 32 33 g, ctx := errgroup.WithContext(ctx) 34 var syncedCount int64 35 36 syncErrors := map[string]error{} 37 syncErrorMu := sync.Mutex{} 38 39 // Start progress logging goroutine 40 progressCtx, cancelProgress := context.WithCancel(ctx) 41 defer cancelProgress() 42 43 go func() { 44 ticker := time.NewTicker(10 * time.Second) 45 defer ticker.Stop() 46 47 for { 48 select { 49 case <-progressCtx.Done(): 50 return 51 case <-ticker.C: 52 current := atomic.LoadInt64(&syncedCount) 53 log.Log(ctx, "migration progress", "synced", current, "total", len(allDIDs)) 54 } 55 } 56 }() 57 58 for i, did := range allDIDs { 59 currentIndex := i 60 currentDID := did 61 g.Go(func() error { 62 log.Debug(ctx, "syncing repo", "did", currentDID, "progress", currentIndex+1, "total", len(allDIDs)) 63 _, err := atsync.SyncBlueskyRepoCached(ctx, currentDID, atsync.Model) 64 if err != nil { 65 log.Error(ctx, "failed to sync repo", "did", currentDID, "err", err) 66 syncErrorMu.Lock() 67 syncErrors[currentDID] = err 68 syncErrorMu.Unlock() 69 } else { 70 atomic.AddInt64(&syncedCount, 1) 71 } 72 return nil 73 }) 74 } 75 76 if err := g.Wait(); err != nil { 77 log.Error(ctx, "migration failed", "err", err, "synced", atomic.LoadInt64(&syncedCount), "total", len(allDIDs)) 78 return err 79 } 80 81 for did, err := range syncErrors { 82 log.Error(ctx, "migration failed for user", "did", did, "err", err) 83 } 84 85 if len(allDIDs) > 0 && len(syncErrors) == len(allDIDs) { 86 return fmt.Errorf("all users failed to migrate") 87 } 88 89 log.Log(ctx, "migration completed", "synced", len(allDIDs)) 90 return nil 91}