package cache import ( "bytes" "encoding/json" "errors" "fmt" "iter" "os" "path/filepath" "strings" "time" "go.etcd.io/bbolt" berr "go.etcd.io/bbolt/errors" ) var _ Storage = (*BoltStorage)(nil) var ErrCacheNotFound = errors.New("cache not found") type BoltStorage struct { db *bbolt.DB path string } func NewBoltStorage(readOnly bool) (*BoltStorage, error) { dir, err := cacheDir() if err != nil { return nil, err } if err := os.MkdirAll(dir, 0o755); err != nil { return nil, err } db, err := bbolt.Open(filepath.Join(dir, CacheFile), 0o644, &bbolt.Options{ Timeout: time.Second, ReadOnly: readOnly, }) if err != nil { return nil, err } return &BoltStorage{db: db, path: dir}, nil } func (s *BoltStorage) Close() error { if s.db == nil { return nil } return s.db.Close() } func (s *BoltStorage) SaveRecords(did string, records map[string][]byte) error { if did == "" { return errors.New("did cannot be empty") } return s.db.Update(func(tx *bbolt.Tx) error { b, err := tx.CreateBucketIfNotExists([]byte(recordsBucket(did))) if err != nil { return err } for key, data := range records { if err := b.Put([]byte(key), data); err != nil { return err } } return s.setTimestamp(tx, did) }) } func (s *BoltStorage) IterateUnpublished(did string, reverse bool) iter.Seq2[string, []byte] { return func(yield func(key string, rec []byte) bool) { published, err := s.GetPublished(did) if err != nil { return } _ = s.db.View(func(tx *bbolt.Tx) error { b := tx.Bucket([]byte(recordsBucket(did))) if b == nil { return nil } if reverse { cursor := b.Cursor() for k, v := cursor.Last(); k != nil; k, v = cursor.Prev() { key := string(k) if !published[key] { // Clone key and value to avoid holding read locks if !yield(key, bytes.Clone(v)) { break // Stop iteration when yield returns false } } } } else { return b.ForEach(func(k, v []byte) error { key := string(k) if published[key] { return nil } // Clone key and value to avoid holding read locks if !yield(key, bytes.Clone(v)) { return errors.New("stop iteration") } return nil }) } return nil }) } } func (s *BoltStorage) IteratePublished(did string, reverse bool) iter.Seq2[string, []byte] { return func(yield func(key string, rec []byte) bool) { published, err := s.GetPublished(did) if err != nil { return } _ = s.db.View(func(tx *bbolt.Tx) error { b := tx.Bucket([]byte(recordsBucket(did))) if b == nil { return nil } if reverse { cursor := b.Cursor() k, v := cursor.Last() for k != nil { key := string(k) if published[key] { // Clone key and value to avoid holding read locks if !yield(key, bytes.Clone(v)) { break // Stop iteration when yield returns false } } k, v = cursor.Prev() } } else { return b.ForEach(func(k, v []byte) error { key := string(k) if !published[key] { return nil } // Clone key and value to avoid holding read locks if !yield(key, bytes.Clone(v)) { return errors.New("stop iteration") } return nil }) } return nil }) } } func (s *BoltStorage) IterateFailed(did string) func(yield func(key string, rec []byte, errMsg string) bool) { return func(yield func(key string, rec []byte, errMsg string) bool) { _ = s.db.View(func(tx *bbolt.Tx) error { fb := tx.Bucket([]byte(failedBucket(did))) if fb == nil { return nil } rb := tx.Bucket([]byte(recordsBucket(did))) if rb == nil { return nil } return fb.ForEach(func(k, v []byte) error { key := string(k) errMsg := string(v) rec := rb.Get(k) // Clone to avoid holding read lock if !yield(key, bytes.Clone(rec), errMsg) { return errors.New("stop iteration") } return nil }) }) } } func (s *BoltStorage) MarkPublished(did string, keys ...string) error { return s.db.Update(func(tx *bbolt.Tx) error { b, err := tx.CreateBucketIfNotExists([]byte(processedBucket(did))) if err != nil { return err } for _, k := range keys { if err := b.Put([]byte(k), []byte{1}); err != nil { return err } } return nil }) } func (s *BoltStorage) MarkFailed(did string, keys []string, errMsg string) error { return s.db.Update(func(tx *bbolt.Tx) error { b, err := tx.CreateBucketIfNotExists([]byte(failedBucket(did))) if err != nil { return err } // Also mark as processed so we don't try again pb, err := tx.CreateBucketIfNotExists([]byte(processedBucket(did))) if err != nil { return err } for _, k := range keys { if err := b.Put([]byte(k), []byte(errMsg)); err != nil { return err } if err := pb.Put([]byte(k), []byte{0}); err != nil { // 0 for failed return err } } return nil }) } func (s *BoltStorage) RemoveFailed(did string, keys ...string) error { return s.db.Update(func(tx *bbolt.Tx) error { b := tx.Bucket([]byte(failedBucket(did))) if b == nil { return nil } for _, k := range keys { if err := b.Delete([]byte(k)); err != nil { return err } } return nil }) } func (s *BoltStorage) GetPublished(did string) (map[string]bool, error) { res := make(map[string]bool) err := s.db.View(func(tx *bbolt.Tx) error { b := tx.Bucket([]byte(processedBucket(did))) if b == nil { return nil } return b.ForEach(func(k, v []byte) error { res[string(k)] = true return nil }) }) return res, err } func (s *BoltStorage) IsValid(did string) bool { if did == "" { return false } ts, err := s.Timestamp(did) if err != nil { return false } return time.Since(ts) < CacheTTL } func (s *BoltStorage) Timestamp(did string) (time.Time, error) { var ts time.Time err := s.db.View(func(tx *bbolt.Tx) error { metaBkt := tx.Bucket([]byte(metaBucket())) if metaBkt == nil { return ErrCacheNotFound } data := metaBkt.Get([]byte(metaPrefixTimestamp + did)) if data == nil { return ErrCacheNotFound } return json.Unmarshal(data, &ts) }) return ts, err } func (s *BoltStorage) setTimestamp(tx *bbolt.Tx, did string) error { metaBkt, err := tx.CreateBucketIfNotExists([]byte(metaBucket())) if err != nil { return fmt.Errorf("create meta bucket: %w", err) } ts := time.Now() data, err := json.Marshal(ts) if err != nil { return fmt.Errorf("marshal timestamp: %w", err) } return metaBkt.Put([]byte(metaPrefixTimestamp+did), data) } func (s *BoltStorage) Clear(did string) error { if did == "" { return errors.New("did cannot be empty") } return s.db.Update(func(tx *bbolt.Tx) error { if err := tx.DeleteBucket([]byte(recordsBucket(did))); err != nil && !errors.Is(err, berr.ErrBucketNotFound) { return fmt.Errorf("delete records bucket: %w", err) } if err := tx.DeleteBucket([]byte(processedBucket(did))); err != nil && !errors.Is(err, berr.ErrBucketNotFound) { return fmt.Errorf("delete processed bucket: %w", err) } if err := tx.DeleteBucket([]byte(failedBucket(did))); err != nil && !errors.Is(err, berr.ErrBucketNotFound) { return fmt.Errorf("delete failed bucket: %w", err) } metaBkt := tx.Bucket([]byte(metaBucket())) if metaBkt != nil { if err := metaBkt.Delete([]byte(metaPrefixTimestamp + did)); err != nil { return fmt.Errorf("delete timestamp: %w", err) } } return nil }) } func (s *BoltStorage) ClearAll() error { return s.db.Update(func(tx *bbolt.Tx) error { var dids []string tx.ForEach(func(name []byte, b *bbolt.Bucket) error { if b == nil { return nil } bucketName := string(name) if after, ok := strings.CutPrefix(bucketName, "records:"); ok { dids = append(dids, after) } return nil }) for _, did := range dids { if err := tx.DeleteBucket([]byte(recordsBucket(did))); err != nil && !errors.Is(err, berr.ErrBucketNotFound) { return fmt.Errorf("delete records for %s: %w", did, err) } if err := tx.DeleteBucket([]byte(processedBucket(did))); err != nil && !errors.Is(err, berr.ErrBucketNotFound) { return fmt.Errorf("delete processed for %s: %w", did, err) } if err := tx.DeleteBucket([]byte(failedBucket(did))); err != nil && !errors.Is(err, berr.ErrBucketNotFound) { return fmt.Errorf("delete failed for %s: %w", did, err) } } if err := tx.DeleteBucket([]byte(metaBucket())); err != nil && !errors.Is(err, berr.ErrBucketNotFound) { return fmt.Errorf("delete meta: %w", err) } if err := tx.DeleteBucket([]byte("quota")); err != nil && !errors.Is(err, berr.ErrBucketNotFound) { return fmt.Errorf("delete quota: %w", err) } return nil }) } func (s *BoltStorage) GetMulti(keys []string) (map[string]int, error) { res := make(map[string]int, len(keys)) err := s.db.View(func(tx *bbolt.Tx) error { b := tx.Bucket([]byte("quota")) if b == nil { return nil } for _, key := range keys { v := b.Get([]byte(key)) if v == nil { res[key] = 0 continue } var val int if err := json.Unmarshal(v, &val); err != nil { return err } res[key] = val } return nil }) return res, err } func (s *BoltStorage) IncrByMulti(deltas map[string]int) error { return s.db.Update(func(tx *bbolt.Tx) error { b, err := tx.CreateBucketIfNotExists([]byte("quota")) if err != nil { return err } for key, n := range deltas { var val int v := b.Get([]byte(key)) if v != nil { if err := json.Unmarshal(v, &val); err != nil { return err } } val += n newV, err := json.Marshal(val) if err != nil { return err } if err := b.Put([]byte(key), newV); err != nil { return err } } return nil }) } func (s *BoltStorage) Stats() (DBStats, error) { var stats DBStats stats.UserStats = make(map[string]any) err := s.db.View(func(tx *bbolt.Tx) error { return tx.ForEach(func(name []byte, b *bbolt.Bucket) error { bucketName := string(name) if after, ok := strings.CutPrefix(bucketName, "records:"); ok { did := after count := 0 b.ForEach(func(k, v []byte) error { count++ return nil }) stats.TotalRecords += count userStat := stats.UserStats[did] if userStat == nil { userStat = make(map[string]int) } m := userStat.(map[string]int) m["total"] = count stats.UserStats[did] = m } if after, ok := strings.CutPrefix(bucketName, "processed:"); ok { did := after count := 0 b.ForEach(func(k, v []byte) error { count++ return nil }) stats.MarkedPublished += count userStat := stats.UserStats[did] if userStat == nil { userStat = make(map[string]int) } m := userStat.(map[string]int) m["published"] = count stats.UserStats[did] = m } if after, ok := strings.CutPrefix(bucketName, "failed:"); ok { did := after count := 0 b.ForEach(func(k, v []byte) error { count++ return nil }) stats.FailedCount += count userStat := stats.UserStats[did] if userStat == nil { userStat = make(map[string]int) } m := userStat.(map[string]int) m["failed"] = count stats.UserStats[did] = m } return nil }) }) if err != nil { return stats, err } stats.UnpublishedCount = stats.TotalRecords - stats.MarkedPublished return stats, nil }