like malachite (atproto-lastfm-importer) but in go and bluer
go spotify tealfm lastfm atproto
at main 479 lines 11 kB view raw
1package cache 2 3import ( 4 "bytes" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "iter" 9 "os" 10 "path/filepath" 11 "strings" 12 "time" 13 14 "go.etcd.io/bbolt" 15 berr "go.etcd.io/bbolt/errors" 16) 17 18var _ Storage = (*BoltStorage)(nil) 19 20var ErrCacheNotFound = errors.New("cache not found") 21 22type BoltStorage struct { 23 db *bbolt.DB 24 path string 25} 26 27func NewBoltStorage(readOnly bool) (*BoltStorage, error) { 28 dir, err := cacheDir() 29 if err != nil { 30 return nil, err 31 } 32 if err := os.MkdirAll(dir, 0o755); err != nil { 33 return nil, err 34 } 35 db, err := bbolt.Open(filepath.Join(dir, CacheFile), 0o644, &bbolt.Options{ 36 Timeout: time.Second, 37 ReadOnly: readOnly, 38 }) 39 if err != nil { 40 return nil, err 41 } 42 return &BoltStorage{db: db, path: dir}, nil 43} 44 45func (s *BoltStorage) Close() error { 46 if s.db == nil { 47 return nil 48 } 49 return s.db.Close() 50} 51 52func (s *BoltStorage) SaveRecords(did string, records map[string][]byte) error { 53 if did == "" { 54 return errors.New("did cannot be empty") 55 } 56 57 return s.db.Update(func(tx *bbolt.Tx) error { 58 b, err := tx.CreateBucketIfNotExists([]byte(recordsBucket(did))) 59 if err != nil { 60 return err 61 } 62 63 for key, data := range records { 64 if err := b.Put([]byte(key), data); err != nil { 65 return err 66 } 67 } 68 return s.setTimestamp(tx, did) 69 }) 70} 71 72func (s *BoltStorage) IterateUnpublished(did string, reverse bool) iter.Seq2[string, []byte] { 73 return func(yield func(key string, rec []byte) bool) { 74 published, err := s.GetPublished(did) 75 if err != nil { 76 return 77 } 78 79 _ = s.db.View(func(tx *bbolt.Tx) error { 80 b := tx.Bucket([]byte(recordsBucket(did))) 81 if b == nil { 82 return nil 83 } 84 85 if reverse { 86 cursor := b.Cursor() 87 88 for k, v := cursor.Last(); k != nil; k, v = cursor.Prev() { 89 key := string(k) 90 if !published[key] { 91 // Clone key and value to avoid holding read locks 92 if !yield(key, bytes.Clone(v)) { 93 break // Stop iteration when yield returns false 94 } 95 } 96 } 97 } else { 98 return b.ForEach(func(k, v []byte) error { 99 key := string(k) 100 if published[key] { 101 return nil 102 } 103 // Clone key and value to avoid holding read locks 104 if !yield(key, bytes.Clone(v)) { 105 return errors.New("stop iteration") 106 } 107 return nil 108 }) 109 } 110 return nil 111 }) 112 } 113} 114 115func (s *BoltStorage) IteratePublished(did string, reverse bool) iter.Seq2[string, []byte] { 116 return func(yield func(key string, rec []byte) bool) { 117 published, err := s.GetPublished(did) 118 if err != nil { 119 return 120 } 121 122 _ = s.db.View(func(tx *bbolt.Tx) error { 123 b := tx.Bucket([]byte(recordsBucket(did))) 124 if b == nil { 125 return nil 126 } 127 128 if reverse { 129 cursor := b.Cursor() 130 k, v := cursor.Last() 131 for k != nil { 132 key := string(k) 133 if published[key] { 134 // Clone key and value to avoid holding read locks 135 if !yield(key, bytes.Clone(v)) { 136 break // Stop iteration when yield returns false 137 } 138 } 139 k, v = cursor.Prev() 140 } 141 } else { 142 return b.ForEach(func(k, v []byte) error { 143 key := string(k) 144 if !published[key] { 145 return nil 146 } 147 // Clone key and value to avoid holding read locks 148 if !yield(key, bytes.Clone(v)) { 149 return errors.New("stop iteration") 150 } 151 return nil 152 }) 153 } 154 return nil 155 }) 156 } 157} 158 159func (s *BoltStorage) IterateFailed(did string) func(yield func(key string, rec []byte, errMsg string) bool) { 160 return func(yield func(key string, rec []byte, errMsg string) bool) { 161 _ = s.db.View(func(tx *bbolt.Tx) error { 162 fb := tx.Bucket([]byte(failedBucket(did))) 163 if fb == nil { 164 return nil 165 } 166 rb := tx.Bucket([]byte(recordsBucket(did))) 167 if rb == nil { 168 return nil 169 } 170 171 return fb.ForEach(func(k, v []byte) error { 172 key := string(k) 173 errMsg := string(v) 174 rec := rb.Get(k) 175 // Clone to avoid holding read lock 176 if !yield(key, bytes.Clone(rec), errMsg) { 177 return errors.New("stop iteration") 178 } 179 return nil 180 }) 181 }) 182 } 183} 184 185func (s *BoltStorage) MarkPublished(did string, keys ...string) error { 186 return s.db.Update(func(tx *bbolt.Tx) error { 187 b, err := tx.CreateBucketIfNotExists([]byte(processedBucket(did))) 188 if err != nil { 189 return err 190 } 191 for _, k := range keys { 192 if err := b.Put([]byte(k), []byte{1}); err != nil { 193 return err 194 } 195 } 196 return nil 197 }) 198} 199 200func (s *BoltStorage) MarkFailed(did string, keys []string, errMsg string) error { 201 return s.db.Update(func(tx *bbolt.Tx) error { 202 b, err := tx.CreateBucketIfNotExists([]byte(failedBucket(did))) 203 if err != nil { 204 return err 205 } 206 // Also mark as processed so we don't try again 207 pb, err := tx.CreateBucketIfNotExists([]byte(processedBucket(did))) 208 if err != nil { 209 return err 210 } 211 212 for _, k := range keys { 213 if err := b.Put([]byte(k), []byte(errMsg)); err != nil { 214 return err 215 } 216 if err := pb.Put([]byte(k), []byte{0}); err != nil { // 0 for failed 217 return err 218 } 219 } 220 return nil 221 }) 222} 223 224func (s *BoltStorage) RemoveFailed(did string, keys ...string) error { 225 return s.db.Update(func(tx *bbolt.Tx) error { 226 b := tx.Bucket([]byte(failedBucket(did))) 227 if b == nil { 228 return nil 229 } 230 for _, k := range keys { 231 if err := b.Delete([]byte(k)); err != nil { 232 return err 233 } 234 } 235 return nil 236 }) 237} 238 239func (s *BoltStorage) GetPublished(did string) (map[string]bool, error) { 240 res := make(map[string]bool) 241 err := s.db.View(func(tx *bbolt.Tx) error { 242 b := tx.Bucket([]byte(processedBucket(did))) 243 if b == nil { 244 return nil 245 } 246 return b.ForEach(func(k, v []byte) error { 247 res[string(k)] = true 248 return nil 249 }) 250 }) 251 return res, err 252} 253 254func (s *BoltStorage) IsValid(did string) bool { 255 if did == "" { 256 return false 257 } 258 ts, err := s.Timestamp(did) 259 if err != nil { 260 return false 261 } 262 return time.Since(ts) < CacheTTL 263} 264 265func (s *BoltStorage) Timestamp(did string) (time.Time, error) { 266 var ts time.Time 267 err := s.db.View(func(tx *bbolt.Tx) error { 268 metaBkt := tx.Bucket([]byte(metaBucket())) 269 if metaBkt == nil { 270 return ErrCacheNotFound 271 } 272 data := metaBkt.Get([]byte(metaPrefixTimestamp + did)) 273 if data == nil { 274 return ErrCacheNotFound 275 } 276 return json.Unmarshal(data, &ts) 277 }) 278 return ts, err 279} 280 281func (s *BoltStorage) setTimestamp(tx *bbolt.Tx, did string) error { 282 metaBkt, err := tx.CreateBucketIfNotExists([]byte(metaBucket())) 283 if err != nil { 284 return fmt.Errorf("create meta bucket: %w", err) 285 } 286 ts := time.Now() 287 data, err := json.Marshal(ts) 288 if err != nil { 289 return fmt.Errorf("marshal timestamp: %w", err) 290 } 291 return metaBkt.Put([]byte(metaPrefixTimestamp+did), data) 292} 293 294func (s *BoltStorage) Clear(did string) error { 295 if did == "" { 296 return errors.New("did cannot be empty") 297 } 298 299 return s.db.Update(func(tx *bbolt.Tx) error { 300 if err := tx.DeleteBucket([]byte(recordsBucket(did))); err != nil && !errors.Is(err, berr.ErrBucketNotFound) { 301 return fmt.Errorf("delete records bucket: %w", err) 302 } 303 if err := tx.DeleteBucket([]byte(processedBucket(did))); err != nil && !errors.Is(err, berr.ErrBucketNotFound) { 304 return fmt.Errorf("delete processed bucket: %w", err) 305 } 306 if err := tx.DeleteBucket([]byte(failedBucket(did))); err != nil && !errors.Is(err, berr.ErrBucketNotFound) { 307 return fmt.Errorf("delete failed bucket: %w", err) 308 } 309 metaBkt := tx.Bucket([]byte(metaBucket())) 310 if metaBkt != nil { 311 if err := metaBkt.Delete([]byte(metaPrefixTimestamp + did)); err != nil { 312 return fmt.Errorf("delete timestamp: %w", err) 313 } 314 } 315 return nil 316 }) 317} 318 319func (s *BoltStorage) ClearAll() error { 320 return s.db.Update(func(tx *bbolt.Tx) error { 321 var dids []string 322 tx.ForEach(func(name []byte, b *bbolt.Bucket) error { 323 if b == nil { 324 return nil 325 } 326 bucketName := string(name) 327 if after, ok := strings.CutPrefix(bucketName, "records:"); ok { 328 dids = append(dids, after) 329 } 330 return nil 331 }) 332 333 for _, did := range dids { 334 if err := tx.DeleteBucket([]byte(recordsBucket(did))); err != nil && !errors.Is(err, berr.ErrBucketNotFound) { 335 return fmt.Errorf("delete records for %s: %w", did, err) 336 } 337 if err := tx.DeleteBucket([]byte(processedBucket(did))); err != nil && !errors.Is(err, berr.ErrBucketNotFound) { 338 return fmt.Errorf("delete processed for %s: %w", did, err) 339 } 340 if err := tx.DeleteBucket([]byte(failedBucket(did))); err != nil && !errors.Is(err, berr.ErrBucketNotFound) { 341 return fmt.Errorf("delete failed for %s: %w", did, err) 342 } 343 } 344 345 if err := tx.DeleteBucket([]byte(metaBucket())); err != nil && !errors.Is(err, berr.ErrBucketNotFound) { 346 return fmt.Errorf("delete meta: %w", err) 347 } 348 349 if err := tx.DeleteBucket([]byte("quota")); err != nil && !errors.Is(err, berr.ErrBucketNotFound) { 350 return fmt.Errorf("delete quota: %w", err) 351 } 352 353 return nil 354 }) 355} 356 357func (s *BoltStorage) GetMulti(keys []string) (map[string]int, error) { 358 res := make(map[string]int, len(keys)) 359 err := s.db.View(func(tx *bbolt.Tx) error { 360 b := tx.Bucket([]byte("quota")) 361 if b == nil { 362 return nil 363 } 364 for _, key := range keys { 365 v := b.Get([]byte(key)) 366 if v == nil { 367 res[key] = 0 368 continue 369 } 370 var val int 371 if err := json.Unmarshal(v, &val); err != nil { 372 return err 373 } 374 res[key] = val 375 } 376 return nil 377 }) 378 return res, err 379} 380 381func (s *BoltStorage) IncrByMulti(deltas map[string]int) error { 382 return s.db.Update(func(tx *bbolt.Tx) error { 383 b, err := tx.CreateBucketIfNotExists([]byte("quota")) 384 if err != nil { 385 return err 386 } 387 for key, n := range deltas { 388 var val int 389 v := b.Get([]byte(key)) 390 if v != nil { 391 if err := json.Unmarshal(v, &val); err != nil { 392 return err 393 } 394 } 395 val += n 396 newV, err := json.Marshal(val) 397 if err != nil { 398 return err 399 } 400 if err := b.Put([]byte(key), newV); err != nil { 401 return err 402 } 403 } 404 return nil 405 }) 406} 407 408func (s *BoltStorage) Stats() (DBStats, error) { 409 var stats DBStats 410 stats.UserStats = make(map[string]any) 411 412 err := s.db.View(func(tx *bbolt.Tx) error { 413 return tx.ForEach(func(name []byte, b *bbolt.Bucket) error { 414 bucketName := string(name) 415 416 if after, ok := strings.CutPrefix(bucketName, "records:"); ok { 417 did := after 418 count := 0 419 b.ForEach(func(k, v []byte) error { 420 count++ 421 return nil 422 }) 423 stats.TotalRecords += count 424 425 userStat := stats.UserStats[did] 426 if userStat == nil { 427 userStat = make(map[string]int) 428 } 429 m := userStat.(map[string]int) 430 m["total"] = count 431 stats.UserStats[did] = m 432 } 433 434 if after, ok := strings.CutPrefix(bucketName, "processed:"); ok { 435 did := after 436 count := 0 437 b.ForEach(func(k, v []byte) error { 438 count++ 439 return nil 440 }) 441 stats.MarkedPublished += count 442 443 userStat := stats.UserStats[did] 444 if userStat == nil { 445 userStat = make(map[string]int) 446 } 447 m := userStat.(map[string]int) 448 m["published"] = count 449 stats.UserStats[did] = m 450 } 451 452 if after, ok := strings.CutPrefix(bucketName, "failed:"); ok { 453 did := after 454 count := 0 455 b.ForEach(func(k, v []byte) error { 456 count++ 457 return nil 458 }) 459 stats.FailedCount += count 460 461 userStat := stats.UserStats[did] 462 if userStat == nil { 463 userStat = make(map[string]int) 464 } 465 m := userStat.(map[string]int) 466 m["failed"] = count 467 stats.UserStats[did] = m 468 } 469 470 return nil 471 }) 472 }) 473 if err != nil { 474 return stats, err 475 } 476 477 stats.UnpublishedCount = stats.TotalRecords - stats.MarkedPublished 478 return stats, nil 479}