package sync import ( "fmt" "sync" "time" ) const StateDir = "~/.lazuli/state" type SyncState struct { LastProcessedTime time.Time `json:"lastProcessedTime"` LastProcessedKey string `json:"lastProcessedKey"` CompletedAt *time.Time `json:"completedAt,omitempty"` StartedAt time.Time `json:"startedAt"` TotalRecords int `json:"totalRecords"` ImportedRecords int `json:"importedRecords"` FailedRecords int `json:"failedRecords"` } type ProgressTracker struct { Total int Completed int Errors int StartTime time.Time LastLogTime time.Time mu sync.Mutex limiter RateLimiter LogInterval time.Duration LogRecordsMetric int } func NewProgressTracker(total int, limiter RateLimiter) *ProgressTracker { return &ProgressTracker{ Total: total, StartTime: time.Now(), LastLogTime: time.Now(), limiter: limiter, LogInterval: 5 * time.Second, LogRecordsMetric: 100, } } func (t *ProgressTracker) Increment(completed int) { t.mu.Lock() defer t.mu.Unlock() t.Completed += completed } func (t *ProgressTracker) IncrementErrors(n int) { t.mu.Lock() defer t.mu.Unlock() t.Errors += n } func (t *ProgressTracker) Progress() (percent float64, eta, elapsed time.Duration, rate string) { t.mu.Lock() defer t.mu.Unlock() elapsed = time.Since(t.StartTime) if t.Completed == 0 { return 0, 0, elapsed, "0/min" } observedRate := float64(t.Completed) / elapsed.Minutes() rate = formatRate(observedRate) if t.Total == 0 { return 100, 0, elapsed, rate } percent = float64(t.Completed) / float64(t.Total) * 100 if percent >= 100 { return 100, 0, elapsed, rate } remainingRecords := float64(t.Total - t.Completed) remainingMinutes := remainingRecords / observedRate eta = time.Duration(remainingMinutes*60) * time.Second return percent, eta, elapsed, rate } func (t *ProgressTracker) ShouldLog() bool { t.mu.Lock() defer t.mu.Unlock() if t.Completed == 0 { return false } now := time.Now() if now.Sub(t.LastLogTime) >= t.LogInterval { t.LastLogTime = now return true } if t.Completed%t.LogRecordsMetric == 0 { return true } return false } type ProgressReport struct { Total int `json:"total"` Completed int `json:"completed"` Percent float64 `json:"percent"` Errors int `json:"errors"` Elapsed string `json:"elapsed"` ETA string `json:"eta,omitempty"` Rate string `json:"rate"` WritesConsumed int `json:"writesConsumed,omitempty"` GlobalConsumed int `json:"globalConsumed,omitempty"` WritesRemaining int `json:"writesRemaining,omitempty"` GlobalRemaining int `json:"globalRemaining,omitempty"` TimeUntilReset string `json:"timeUntilReset,omitempty"` ConstrainedRate string `json:"constrainedRate,omitempty"` } func (t *ProgressTracker) Report() ProgressReport { percent, eta, elapsed, rate := t.Progress() etaStr := "" if eta > 0 { etaStr = eta.String() } var w, g int var writesRemaining, globalRemaining int var timeUntilReset time.Duration var constrainedRate string if t.limiter != nil { w, g, _ = t.limiter.Stats() writesRemaining, globalRemaining, timeUntilReset = t.limiter.RemainingQuota() if writesRemaining > 0 && timeUntilReset > 0 { constrainedPerMin := float64(writesRemaining) / (timeUntilReset.Minutes() + 0.001) if constrainedPerMin >= 1000 { constrainedRate = fmt.Sprintf("%.1fk/min", constrainedPerMin/1000) } else { constrainedRate = fmt.Sprintf("%.0f/min", constrainedPerMin) } } } resetStr := "" if timeUntilReset > 0 { resetStr = timeUntilReset.String() } return ProgressReport{ Total: t.Total, Completed: t.Completed, Percent: percent, Errors: t.Errors, Elapsed: elapsed.Round(time.Second).String(), ETA: etaStr, Rate: rate, WritesConsumed: w, GlobalConsumed: g, WritesRemaining: writesRemaining, GlobalRemaining: globalRemaining, TimeUntilReset: resetStr, ConstrainedRate: constrainedRate, } } func formatRate(perMin float64) string { if perMin >= 1000 { return fmt.Sprintf("%.1fk/min", perMin/1000) } return fmt.Sprintf("%.0f/min", perMin) }