Like malachite (atproto-lastfm-importer), but in Go and bluer.
go
spotify
tealfm
lastfm
atproto
1package cache
2
3import (
4 "bytes"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "iter"
9 "os"
10 "path/filepath"
11 "strings"
12 "time"
13
14 "go.etcd.io/bbolt"
15 berr "go.etcd.io/bbolt/errors"
16)
17
18var _ Storage = (*BoltStorage)(nil)
19
20var ErrCacheNotFound = errors.New("cache not found")
21
22type BoltStorage struct {
23 db *bbolt.DB
24 path string
25}
26
27func NewBoltStorage(readOnly bool) (*BoltStorage, error) {
28 dir, err := cacheDir()
29 if err != nil {
30 return nil, err
31 }
32 if err := os.MkdirAll(dir, 0o755); err != nil {
33 return nil, err
34 }
35 db, err := bbolt.Open(filepath.Join(dir, CacheFile), 0o644, &bbolt.Options{
36 Timeout: time.Second,
37 ReadOnly: readOnly,
38 })
39 if err != nil {
40 return nil, err
41 }
42 return &BoltStorage{db: db, path: dir}, nil
43}
44
45func (s *BoltStorage) Close() error {
46 if s.db == nil {
47 return nil
48 }
49 return s.db.Close()
50}
51
52func (s *BoltStorage) SaveRecords(did string, records map[string][]byte) error {
53 if did == "" {
54 return errors.New("did cannot be empty")
55 }
56
57 return s.db.Update(func(tx *bbolt.Tx) error {
58 b, err := tx.CreateBucketIfNotExists([]byte(recordsBucket(did)))
59 if err != nil {
60 return err
61 }
62
63 for key, data := range records {
64 if err := b.Put([]byte(key), data); err != nil {
65 return err
66 }
67 }
68 return s.setTimestamp(tx, did)
69 })
70}
71
72func (s *BoltStorage) IterateUnpublished(did string, reverse bool) iter.Seq2[string, []byte] {
73 return func(yield func(key string, rec []byte) bool) {
74 published, err := s.GetPublished(did)
75 if err != nil {
76 return
77 }
78
79 _ = s.db.View(func(tx *bbolt.Tx) error {
80 b := tx.Bucket([]byte(recordsBucket(did)))
81 if b == nil {
82 return nil
83 }
84
85 if reverse {
86 cursor := b.Cursor()
87
88 for k, v := cursor.Last(); k != nil; k, v = cursor.Prev() {
89 key := string(k)
90 if !published[key] {
91 // Clone key and value to avoid holding read locks
92 if !yield(key, bytes.Clone(v)) {
93 break // Stop iteration when yield returns false
94 }
95 }
96 }
97 } else {
98 return b.ForEach(func(k, v []byte) error {
99 key := string(k)
100 if published[key] {
101 return nil
102 }
103 // Clone key and value to avoid holding read locks
104 if !yield(key, bytes.Clone(v)) {
105 return errors.New("stop iteration")
106 }
107 return nil
108 })
109 }
110 return nil
111 })
112 }
113}
114
115func (s *BoltStorage) IteratePublished(did string, reverse bool) iter.Seq2[string, []byte] {
116 return func(yield func(key string, rec []byte) bool) {
117 published, err := s.GetPublished(did)
118 if err != nil {
119 return
120 }
121
122 _ = s.db.View(func(tx *bbolt.Tx) error {
123 b := tx.Bucket([]byte(recordsBucket(did)))
124 if b == nil {
125 return nil
126 }
127
128 if reverse {
129 cursor := b.Cursor()
130 k, v := cursor.Last()
131 for k != nil {
132 key := string(k)
133 if published[key] {
134 // Clone key and value to avoid holding read locks
135 if !yield(key, bytes.Clone(v)) {
136 break // Stop iteration when yield returns false
137 }
138 }
139 k, v = cursor.Prev()
140 }
141 } else {
142 return b.ForEach(func(k, v []byte) error {
143 key := string(k)
144 if !published[key] {
145 return nil
146 }
147 // Clone key and value to avoid holding read locks
148 if !yield(key, bytes.Clone(v)) {
149 return errors.New("stop iteration")
150 }
151 return nil
152 })
153 }
154 return nil
155 })
156 }
157}
158
159func (s *BoltStorage) IterateFailed(did string) func(yield func(key string, rec []byte, errMsg string) bool) {
160 return func(yield func(key string, rec []byte, errMsg string) bool) {
161 _ = s.db.View(func(tx *bbolt.Tx) error {
162 fb := tx.Bucket([]byte(failedBucket(did)))
163 if fb == nil {
164 return nil
165 }
166 rb := tx.Bucket([]byte(recordsBucket(did)))
167 if rb == nil {
168 return nil
169 }
170
171 return fb.ForEach(func(k, v []byte) error {
172 key := string(k)
173 errMsg := string(v)
174 rec := rb.Get(k)
175 // Clone to avoid holding read lock
176 if !yield(key, bytes.Clone(rec), errMsg) {
177 return errors.New("stop iteration")
178 }
179 return nil
180 })
181 })
182 }
183}
184
185func (s *BoltStorage) MarkPublished(did string, keys ...string) error {
186 return s.db.Update(func(tx *bbolt.Tx) error {
187 b, err := tx.CreateBucketIfNotExists([]byte(processedBucket(did)))
188 if err != nil {
189 return err
190 }
191 for _, k := range keys {
192 if err := b.Put([]byte(k), []byte{1}); err != nil {
193 return err
194 }
195 }
196 return nil
197 })
198}
199
200func (s *BoltStorage) MarkFailed(did string, keys []string, errMsg string) error {
201 return s.db.Update(func(tx *bbolt.Tx) error {
202 b, err := tx.CreateBucketIfNotExists([]byte(failedBucket(did)))
203 if err != nil {
204 return err
205 }
206 // Also mark as processed so we don't try again
207 pb, err := tx.CreateBucketIfNotExists([]byte(processedBucket(did)))
208 if err != nil {
209 return err
210 }
211
212 for _, k := range keys {
213 if err := b.Put([]byte(k), []byte(errMsg)); err != nil {
214 return err
215 }
216 if err := pb.Put([]byte(k), []byte{0}); err != nil { // 0 for failed
217 return err
218 }
219 }
220 return nil
221 })
222}
223
224func (s *BoltStorage) RemoveFailed(did string, keys ...string) error {
225 return s.db.Update(func(tx *bbolt.Tx) error {
226 b := tx.Bucket([]byte(failedBucket(did)))
227 if b == nil {
228 return nil
229 }
230 for _, k := range keys {
231 if err := b.Delete([]byte(k)); err != nil {
232 return err
233 }
234 }
235 return nil
236 })
237}
238
239func (s *BoltStorage) GetPublished(did string) (map[string]bool, error) {
240 res := make(map[string]bool)
241 err := s.db.View(func(tx *bbolt.Tx) error {
242 b := tx.Bucket([]byte(processedBucket(did)))
243 if b == nil {
244 return nil
245 }
246 return b.ForEach(func(k, v []byte) error {
247 res[string(k)] = true
248 return nil
249 })
250 })
251 return res, err
252}
253
254func (s *BoltStorage) IsValid(did string) bool {
255 if did == "" {
256 return false
257 }
258 ts, err := s.Timestamp(did)
259 if err != nil {
260 return false
261 }
262 return time.Since(ts) < CacheTTL
263}
264
265func (s *BoltStorage) Timestamp(did string) (time.Time, error) {
266 var ts time.Time
267 err := s.db.View(func(tx *bbolt.Tx) error {
268 metaBkt := tx.Bucket([]byte(metaBucket()))
269 if metaBkt == nil {
270 return ErrCacheNotFound
271 }
272 data := metaBkt.Get([]byte(metaPrefixTimestamp + did))
273 if data == nil {
274 return ErrCacheNotFound
275 }
276 return json.Unmarshal(data, &ts)
277 })
278 return ts, err
279}
280
281func (s *BoltStorage) setTimestamp(tx *bbolt.Tx, did string) error {
282 metaBkt, err := tx.CreateBucketIfNotExists([]byte(metaBucket()))
283 if err != nil {
284 return fmt.Errorf("create meta bucket: %w", err)
285 }
286 ts := time.Now()
287 data, err := json.Marshal(ts)
288 if err != nil {
289 return fmt.Errorf("marshal timestamp: %w", err)
290 }
291 return metaBkt.Put([]byte(metaPrefixTimestamp+did), data)
292}
293
294func (s *BoltStorage) Clear(did string) error {
295 if did == "" {
296 return errors.New("did cannot be empty")
297 }
298
299 return s.db.Update(func(tx *bbolt.Tx) error {
300 if err := tx.DeleteBucket([]byte(recordsBucket(did))); err != nil && !errors.Is(err, berr.ErrBucketNotFound) {
301 return fmt.Errorf("delete records bucket: %w", err)
302 }
303 if err := tx.DeleteBucket([]byte(processedBucket(did))); err != nil && !errors.Is(err, berr.ErrBucketNotFound) {
304 return fmt.Errorf("delete processed bucket: %w", err)
305 }
306 if err := tx.DeleteBucket([]byte(failedBucket(did))); err != nil && !errors.Is(err, berr.ErrBucketNotFound) {
307 return fmt.Errorf("delete failed bucket: %w", err)
308 }
309 metaBkt := tx.Bucket([]byte(metaBucket()))
310 if metaBkt != nil {
311 if err := metaBkt.Delete([]byte(metaPrefixTimestamp + did)); err != nil {
312 return fmt.Errorf("delete timestamp: %w", err)
313 }
314 }
315 return nil
316 })
317}
318
319func (s *BoltStorage) ClearAll() error {
320 return s.db.Update(func(tx *bbolt.Tx) error {
321 var dids []string
322 tx.ForEach(func(name []byte, b *bbolt.Bucket) error {
323 if b == nil {
324 return nil
325 }
326 bucketName := string(name)
327 if after, ok := strings.CutPrefix(bucketName, "records:"); ok {
328 dids = append(dids, after)
329 }
330 return nil
331 })
332
333 for _, did := range dids {
334 if err := tx.DeleteBucket([]byte(recordsBucket(did))); err != nil && !errors.Is(err, berr.ErrBucketNotFound) {
335 return fmt.Errorf("delete records for %s: %w", did, err)
336 }
337 if err := tx.DeleteBucket([]byte(processedBucket(did))); err != nil && !errors.Is(err, berr.ErrBucketNotFound) {
338 return fmt.Errorf("delete processed for %s: %w", did, err)
339 }
340 if err := tx.DeleteBucket([]byte(failedBucket(did))); err != nil && !errors.Is(err, berr.ErrBucketNotFound) {
341 return fmt.Errorf("delete failed for %s: %w", did, err)
342 }
343 }
344
345 if err := tx.DeleteBucket([]byte(metaBucket())); err != nil && !errors.Is(err, berr.ErrBucketNotFound) {
346 return fmt.Errorf("delete meta: %w", err)
347 }
348
349 if err := tx.DeleteBucket([]byte("quota")); err != nil && !errors.Is(err, berr.ErrBucketNotFound) {
350 return fmt.Errorf("delete quota: %w", err)
351 }
352
353 return nil
354 })
355}
356
357func (s *BoltStorage) GetMulti(keys []string) (map[string]int, error) {
358 res := make(map[string]int, len(keys))
359 err := s.db.View(func(tx *bbolt.Tx) error {
360 b := tx.Bucket([]byte("quota"))
361 if b == nil {
362 return nil
363 }
364 for _, key := range keys {
365 v := b.Get([]byte(key))
366 if v == nil {
367 res[key] = 0
368 continue
369 }
370 var val int
371 if err := json.Unmarshal(v, &val); err != nil {
372 return err
373 }
374 res[key] = val
375 }
376 return nil
377 })
378 return res, err
379}
380
381func (s *BoltStorage) IncrByMulti(deltas map[string]int) error {
382 return s.db.Update(func(tx *bbolt.Tx) error {
383 b, err := tx.CreateBucketIfNotExists([]byte("quota"))
384 if err != nil {
385 return err
386 }
387 for key, n := range deltas {
388 var val int
389 v := b.Get([]byte(key))
390 if v != nil {
391 if err := json.Unmarshal(v, &val); err != nil {
392 return err
393 }
394 }
395 val += n
396 newV, err := json.Marshal(val)
397 if err != nil {
398 return err
399 }
400 if err := b.Put([]byte(key), newV); err != nil {
401 return err
402 }
403 }
404 return nil
405 })
406}
407
408func (s *BoltStorage) Stats() (DBStats, error) {
409 var stats DBStats
410 stats.UserStats = make(map[string]any)
411
412 err := s.db.View(func(tx *bbolt.Tx) error {
413 return tx.ForEach(func(name []byte, b *bbolt.Bucket) error {
414 bucketName := string(name)
415
416 if after, ok := strings.CutPrefix(bucketName, "records:"); ok {
417 did := after
418 count := 0
419 b.ForEach(func(k, v []byte) error {
420 count++
421 return nil
422 })
423 stats.TotalRecords += count
424
425 userStat := stats.UserStats[did]
426 if userStat == nil {
427 userStat = make(map[string]int)
428 }
429 m := userStat.(map[string]int)
430 m["total"] = count
431 stats.UserStats[did] = m
432 }
433
434 if after, ok := strings.CutPrefix(bucketName, "processed:"); ok {
435 did := after
436 count := 0
437 b.ForEach(func(k, v []byte) error {
438 count++
439 return nil
440 })
441 stats.MarkedPublished += count
442
443 userStat := stats.UserStats[did]
444 if userStat == nil {
445 userStat = make(map[string]int)
446 }
447 m := userStat.(map[string]int)
448 m["published"] = count
449 stats.UserStats[did] = m
450 }
451
452 if after, ok := strings.CutPrefix(bucketName, "failed:"); ok {
453 did := after
454 count := 0
455 b.ForEach(func(k, v []byte) error {
456 count++
457 return nil
458 })
459 stats.FailedCount += count
460
461 userStat := stats.UserStats[did]
462 if userStat == nil {
463 userStat = make(map[string]int)
464 }
465 m := userStat.(map[string]int)
466 m["failed"] = count
467 stats.UserStats[did] = m
468 }
469
470 return nil
471 })
472 })
473 if err != nil {
474 return stats, err
475 }
476
477 stats.UnpublishedCount = stats.TotalRecords - stats.MarkedPublished
478 return stats, nil
479}