Like malachite (atproto-lastfm-importer), but written in Go and bluer.
go
spotify
tealfm
lastfm
atproto
1package sync
2
3import (
4 "fmt"
5 "sync"
6 "time"
7)
8
// StateDir is a path below the user's home directory, spelled with a leading
// "~". Its consumers are not visible in this part of the file; presumably it
// is where sync state is persisted — confirm.
//
// NOTE(review): "~" is a shell convention. Go's os and path/filepath packages
// do not expand it, so whatever uses StateDir must substitute the home
// directory itself — verify that it does.
const StateDir = "~/.lazuli/state"
10
// SyncState is a JSON-tagged record of a sync run: two "last processed"
// markers, start/completion timestamps and three record counters. The code
// that fills in and stores it is not visible in this part of the file, so
// notes that go beyond the declaration itself are hedged.
type SyncState struct {
	// LastProcessedTime and LastProcessedKey presumably identify the most
	// recently handled record (e.g. as a resume point) — confirm with the
	// code that writes them.
	LastProcessedTime time.Time `json:"lastProcessedTime"`
	LastProcessedKey  string    `json:"lastProcessedKey"`
	// CompletedAt is a pointer tagged omitempty: while it is nil the key is
	// left out of the encoded JSON altogether.
	CompletedAt *time.Time `json:"completedAt,omitempty"`
	// StartedAt has no omitempty and is therefore always encoded.
	StartedAt time.Time `json:"startedAt"`
	// Record counters. Presumably ImportedRecords and FailedRecords together
	// approach TotalRecords as the run advances — verify against the caller.
	TotalRecords    int `json:"totalRecords"`
	ImportedRecords int `json:"importedRecords"`
	FailedRecords   int `json:"failedRecords"`
}
20
// ProgressTracker keeps running counters for a job and derives progress
// figures (percentage, rate, ETA, log cadence) from them.
//
// Increment, IncrementErrors, Progress and ShouldLog lock mu around their
// access to the counters and LastLogTime. The counter fields are exported, so
// code that touches them directly bypasses that lock. Because the struct
// embeds a sync.Mutex by value it must not be copied; use it through a
// pointer.
type ProgressTracker struct {
	// Total is the number of records the job is expected to process;
	// Progress treats 0 as "unknown/empty" and reports 100% once anything
	// has completed.
	Total int
	// Completed and Errors are running counts, advanced by Increment and
	// IncrementErrors respectively.
	Completed int
	Errors    int
	// StartTime is the reference point for elapsed time and the observed rate.
	StartTime time.Time
	// LastLogTime is refreshed by ShouldLog each time its time-based trigger fires.
	LastLogTime time.Time
	mu          sync.Mutex

	// limiter is optional: Report checks it for nil before querying it.
	// RateLimiter is not defined in this part of the file.
	limiter RateLimiter
	// LogInterval is the minimum time since LastLogTime after which ShouldLog
	// returns true.
	LogInterval time.Duration
	// LogRecordsMetric is used by ShouldLog as a modulus on Completed: a
	// Completed value that is an exact multiple of it also triggers a log.
	LogRecordsMetric int
}
33
34func NewProgressTracker(total int, limiter RateLimiter) *ProgressTracker {
35 return &ProgressTracker{
36 Total: total,
37 StartTime: time.Now(),
38 LastLogTime: time.Now(),
39 limiter: limiter,
40 LogInterval: 5 * time.Second,
41 LogRecordsMetric: 100,
42 }
43}
44
45func (t *ProgressTracker) Increment(completed int) {
46 t.mu.Lock()
47 defer t.mu.Unlock()
48 t.Completed += completed
49}
50
51func (t *ProgressTracker) IncrementErrors(n int) {
52 t.mu.Lock()
53 defer t.mu.Unlock()
54 t.Errors += n
55}
56
57func (t *ProgressTracker) Progress() (percent float64, eta, elapsed time.Duration, rate string) {
58 t.mu.Lock()
59 defer t.mu.Unlock()
60
61 elapsed = time.Since(t.StartTime)
62 if t.Completed == 0 {
63 return 0, 0, elapsed, "0/min"
64 }
65
66 observedRate := float64(t.Completed) / elapsed.Minutes()
67 rate = formatRate(observedRate)
68
69 if t.Total == 0 {
70 return 100, 0, elapsed, rate
71 }
72
73 percent = float64(t.Completed) / float64(t.Total) * 100
74 if percent >= 100 {
75 return 100, 0, elapsed, rate
76 }
77
78 remainingRecords := float64(t.Total - t.Completed)
79 remainingMinutes := remainingRecords / observedRate
80 eta = time.Duration(remainingMinutes*60) * time.Second
81 return percent, eta, elapsed, rate
82}
83
84func (t *ProgressTracker) ShouldLog() bool {
85 t.mu.Lock()
86 defer t.mu.Unlock()
87
88 if t.Completed == 0 {
89 return false
90 }
91
92 now := time.Now()
93 if now.Sub(t.LastLogTime) >= t.LogInterval {
94 t.LastLogTime = now
95 return true
96 }
97 if t.Completed%t.LogRecordsMetric == 0 {
98 return true
99 }
100 return false
101}
102
// ProgressReport is the JSON-tagged snapshot returned by
// ProgressTracker.Report. Fields tagged omitempty are dropped from the
// encoded JSON when they hold their zero value (0 or "").
type ProgressReport struct {
	// Total, Completed and Errors are copied from the tracker's counters.
	Total     int `json:"total"`
	Completed int `json:"completed"`
	// Percent is the percentage returned by ProgressTracker.Progress.
	Percent float64 `json:"percent"`
	Errors  int     `json:"errors"`
	// Elapsed is the elapsed duration rounded to whole seconds, as formatted
	// by time.Duration.String.
	Elapsed string `json:"elapsed"`
	// ETA is the formatted remaining-time estimate; Report leaves it empty
	// when the estimate is not positive.
	ETA string `json:"eta,omitempty"`
	// Rate is the observed rate string returned by ProgressTracker.Progress.
	Rate string `json:"rate"`
	// WritesConsumed and GlobalConsumed are the first two values returned by
	// the limiter's Stats method; all limiter-derived fields stay zero when
	// the tracker has no limiter.
	WritesConsumed int `json:"writesConsumed,omitempty"`
	GlobalConsumed int `json:"globalConsumed,omitempty"`
	// WritesRemaining, GlobalRemaining and TimeUntilReset come from the
	// limiter's RemainingQuota method; TimeUntilReset is the formatted
	// duration and is left empty when that duration is not positive.
	WritesRemaining int    `json:"writesRemaining,omitempty"`
	GlobalRemaining int    `json:"globalRemaining,omitempty"`
	TimeUntilReset  string `json:"timeUntilReset,omitempty"`
	// ConstrainedRate is the remaining write quota divided by the minutes
	// until reset, formatted as "N/min" or "N.Nk/min"; Report only fills it
	// in when both inputs are positive.
	ConstrainedRate string `json:"constrainedRate,omitempty"`
}
118
119func (t *ProgressTracker) Report() ProgressReport {
120 percent, eta, elapsed, rate := t.Progress()
121 etaStr := ""
122 if eta > 0 {
123 etaStr = eta.String()
124 }
125
126 var w, g int
127 var writesRemaining, globalRemaining int
128 var timeUntilReset time.Duration
129 var constrainedRate string
130
131 if t.limiter != nil {
132 w, g, _ = t.limiter.Stats()
133 writesRemaining, globalRemaining, timeUntilReset = t.limiter.RemainingQuota()
134 if writesRemaining > 0 && timeUntilReset > 0 {
135 constrainedPerMin := float64(writesRemaining) / (timeUntilReset.Minutes() + 0.001)
136 if constrainedPerMin >= 1000 {
137 constrainedRate = fmt.Sprintf("%.1fk/min", constrainedPerMin/1000)
138 } else {
139 constrainedRate = fmt.Sprintf("%.0f/min", constrainedPerMin)
140 }
141 }
142 }
143
144 resetStr := ""
145 if timeUntilReset > 0 {
146 resetStr = timeUntilReset.String()
147 }
148
149 return ProgressReport{
150 Total: t.Total,
151 Completed: t.Completed,
152 Percent: percent,
153 Errors: t.Errors,
154 Elapsed: elapsed.Round(time.Second).String(),
155 ETA: etaStr,
156 Rate: rate,
157 WritesConsumed: w,
158 GlobalConsumed: g,
159 WritesRemaining: writesRemaining,
160 GlobalRemaining: globalRemaining,
161 TimeUntilReset: resetStr,
162 ConstrainedRate: constrainedRate,
163 }
164}
165
// formatRate renders a per-minute rate for display: values of 1000 and above
// are shown in thousands with one decimal ("1.5k/min"), everything else as a
// rounded whole number ("42/min").
func formatRate(perMin float64) string {
	switch {
	case perMin >= 1000:
		thousands := perMin / 1000
		return fmt.Sprintf("%.1fk/min", thousands)
	default:
		return fmt.Sprintf("%.0f/min", perMin)
	}
}