Like malachite (atproto-lastfm-importer), but in Go and bluer.
go spotify tealfm lastfm atproto
at main 171 lines 4.3 kB view raw
1package sync 2 3import ( 4 "fmt" 5 "sync" 6 "time" 7) 8 9const StateDir = "~/.lazuli/state" 10 11type SyncState struct { 12 LastProcessedTime time.Time `json:"lastProcessedTime"` 13 LastProcessedKey string `json:"lastProcessedKey"` 14 CompletedAt *time.Time `json:"completedAt,omitempty"` 15 StartedAt time.Time `json:"startedAt"` 16 TotalRecords int `json:"totalRecords"` 17 ImportedRecords int `json:"importedRecords"` 18 FailedRecords int `json:"failedRecords"` 19} 20 21type ProgressTracker struct { 22 Total int 23 Completed int 24 Errors int 25 StartTime time.Time 26 LastLogTime time.Time 27 mu sync.Mutex 28 29 limiter RateLimiter 30 LogInterval time.Duration 31 LogRecordsMetric int 32} 33 34func NewProgressTracker(total int, limiter RateLimiter) *ProgressTracker { 35 return &ProgressTracker{ 36 Total: total, 37 StartTime: time.Now(), 38 LastLogTime: time.Now(), 39 limiter: limiter, 40 LogInterval: 5 * time.Second, 41 LogRecordsMetric: 100, 42 } 43} 44 45func (t *ProgressTracker) Increment(completed int) { 46 t.mu.Lock() 47 defer t.mu.Unlock() 48 t.Completed += completed 49} 50 51func (t *ProgressTracker) IncrementErrors(n int) { 52 t.mu.Lock() 53 defer t.mu.Unlock() 54 t.Errors += n 55} 56 57func (t *ProgressTracker) Progress() (percent float64, eta, elapsed time.Duration, rate string) { 58 t.mu.Lock() 59 defer t.mu.Unlock() 60 61 elapsed = time.Since(t.StartTime) 62 if t.Completed == 0 { 63 return 0, 0, elapsed, "0/min" 64 } 65 66 observedRate := float64(t.Completed) / elapsed.Minutes() 67 rate = formatRate(observedRate) 68 69 if t.Total == 0 { 70 return 100, 0, elapsed, rate 71 } 72 73 percent = float64(t.Completed) / float64(t.Total) * 100 74 if percent >= 100 { 75 return 100, 0, elapsed, rate 76 } 77 78 remainingRecords := float64(t.Total - t.Completed) 79 remainingMinutes := remainingRecords / observedRate 80 eta = time.Duration(remainingMinutes*60) * time.Second 81 return percent, eta, elapsed, rate 82} 83 84func (t *ProgressTracker) ShouldLog() bool { 85 
t.mu.Lock() 86 defer t.mu.Unlock() 87 88 if t.Completed == 0 { 89 return false 90 } 91 92 now := time.Now() 93 if now.Sub(t.LastLogTime) >= t.LogInterval { 94 t.LastLogTime = now 95 return true 96 } 97 if t.Completed%t.LogRecordsMetric == 0 { 98 return true 99 } 100 return false 101} 102 103type ProgressReport struct { 104 Total int `json:"total"` 105 Completed int `json:"completed"` 106 Percent float64 `json:"percent"` 107 Errors int `json:"errors"` 108 Elapsed string `json:"elapsed"` 109 ETA string `json:"eta,omitempty"` 110 Rate string `json:"rate"` 111 WritesConsumed int `json:"writesConsumed,omitempty"` 112 GlobalConsumed int `json:"globalConsumed,omitempty"` 113 WritesRemaining int `json:"writesRemaining,omitempty"` 114 GlobalRemaining int `json:"globalRemaining,omitempty"` 115 TimeUntilReset string `json:"timeUntilReset,omitempty"` 116 ConstrainedRate string `json:"constrainedRate,omitempty"` 117} 118 119func (t *ProgressTracker) Report() ProgressReport { 120 percent, eta, elapsed, rate := t.Progress() 121 etaStr := "" 122 if eta > 0 { 123 etaStr = eta.String() 124 } 125 126 var w, g int 127 var writesRemaining, globalRemaining int 128 var timeUntilReset time.Duration 129 var constrainedRate string 130 131 if t.limiter != nil { 132 w, g, _ = t.limiter.Stats() 133 writesRemaining, globalRemaining, timeUntilReset = t.limiter.RemainingQuota() 134 if writesRemaining > 0 && timeUntilReset > 0 { 135 constrainedPerMin := float64(writesRemaining) / (timeUntilReset.Minutes() + 0.001) 136 if constrainedPerMin >= 1000 { 137 constrainedRate = fmt.Sprintf("%.1fk/min", constrainedPerMin/1000) 138 } else { 139 constrainedRate = fmt.Sprintf("%.0f/min", constrainedPerMin) 140 } 141 } 142 } 143 144 resetStr := "" 145 if timeUntilReset > 0 { 146 resetStr = timeUntilReset.String() 147 } 148 149 return ProgressReport{ 150 Total: t.Total, 151 Completed: t.Completed, 152 Percent: percent, 153 Errors: t.Errors, 154 Elapsed: elapsed.Round(time.Second).String(), 155 ETA: etaStr, 156 
Rate: rate, 157 WritesConsumed: w, 158 GlobalConsumed: g, 159 WritesRemaining: writesRemaining, 160 GlobalRemaining: globalRemaining, 161 TimeUntilReset: resetStr, 162 ConstrainedRate: constrainedRate, 163 } 164} 165 166func formatRate(perMin float64) string { 167 if perMin >= 1000 { 168 return fmt.Sprintf("%.1fk/min", perMin/1000) 169 } 170 return fmt.Sprintf("%.0f/min", perMin) 171}