···88 "errors"
99 "fmt"
1010 "io"
1111+ "maps"
1112 "net/http"
1213 "strings"
1314 "sync"
1415 "time"
1616+1717+ "github.com/charmbracelet/log"
1518)
16191720const (
···367370}
368371369372// SearchActors searches for actors (users) matching the query string.
370370-// Returns actor profiles with pagination support.
371373func (s *BlueskyService) SearchActors(ctx context.Context, query string, limit int, cursor string) (*SearchActorsResponse, error) {
372374 urlPath := fmt.Sprintf("/xrpc/app.bsky.actor.searchActors?q=%s&limit=%d", strings.ReplaceAll(query, " ", "+"), limit)
373375 if cursor != "" {
···465467}
466468467469// GetLastPostDate fetches the most recent post date for an actor.
468468-// Returns zero time if the actor has no posts or if an error occurs.
469470func (s *BlueskyService) GetLastPostDate(ctx context.Context, actor string) (time.Time, error) {
470471 feed, err := s.GetAuthorFeed(ctx, actor, 1, "")
471472 if err != nil {
···485486 return lastPost, nil
486487}
487488488488-// BatchGetLastPostDates fetches last post dates for multiple actors concurrently.
489489+// BatchGetLastPostDates fetches last post dates for multiple actors concurrently, as a map of actor DID/handle to their last post date..
489490// Uses a semaphore to limit concurrent requests to maxConcurrent.
490490-// Returns a map of actor DID/handle to their last post date.
491491func (s *BlueskyService) BatchGetLastPostDates(ctx context.Context, actors []string, maxConcurrent int) map[string]time.Time {
492492 results := make(map[string]time.Time)
493493 resultsMu := &sync.Mutex{}
···517517 return results
518518}
519519520520-// BatchGetProfiles fetches full profiles for multiple actors concurrently.
521521-// Uses a semaphore to limit concurrent requests to maxConcurrent.
522522-// Returns a map of actor DID/handle to their full ActorProfile.
520520+// BatchGetProfiles fetches full profiles for multiple actors concurrently, as a map of actor DID/handle to their full ActorProfile.
521521+// Uses a semaphore to limit concurrent requests to maxConcurrent..
523522func (s *BlueskyService) BatchGetProfiles(ctx context.Context, actors []string, maxConcurrent int) map[string]*ActorProfile {
524523 results := make(map[string]*ActorProfile)
525524 resultsMu := &sync.Mutex{}
···556555 SampleSize int
557556}
558557559559-// BatchGetPostRates calculates posting rates for multiple actors concurrently.
558558+// BatchGetPostRates calculates posting rates for multiple actors concurrently, as a map of actor DID/handle to their [PostRate] metrics.
559559+//
560560// Samples recent posts from each actor and calculates posts per day over the lookback period.
561561// Uses a semaphore to limit concurrent requests to maxConcurrent.
562562-// Returns a map of actor DID/handle to their PostRate metrics.
563562func (s *BlueskyService) BatchGetPostRates(ctx context.Context, actors []string, sampleSize int, lookbackDays int, maxConcurrent int, progressFn func(current, total int)) map[string]*PostRate {
564563 results := make(map[string]*PostRate)
565564 resultsMu := &sync.Mutex{}
···601600 return
602601 }
603602604604- // Get the last post date
605603 lastPost, err := time.Parse(time.RFC3339, feed.Feed[0].Post.IndexedAt)
606604 if err != nil {
607605 return
608606 }
609607610610- // Filter posts within lookback window
611608 cutoffTime := time.Now().AddDate(0, 0, -lookbackDays)
612609 recentPosts := 0
613610 for _, post := range feed.Feed {
···744741745742 return time.Unix(claims.Exp, 0), nil
746743}
744744+745745+// BatchGetPostRatesCached calculates posting rates for multiple actors with caching support.
746746+//
747747+// Checks cache first, falls back to API for cache misses, and saves results to cache.
748748+// If refresh is true, bypasses cache and refetches all data from API.
749749+//
750750+// TODO: Implement per-item TTL for more efficient cache invalidation.
751751+// FIXME: this function signature is ridiculous
752752+func (s *BlueskyService) BatchGetPostRatesCached(ctx context.Context, cacheRepo *CacheRepository, actors []string, sampleSize int, lookbackDays int, maxConcurrent int, refresh bool, progressFn func(current, total int)) map[string]*PostRate {
753753+ results := make(map[string]*PostRate)
754754+755755+ // If not refreshing, try to load from cache
756756+ var actorsToFetch []string
757757+ if !refresh {
758758+ cached, err := cacheRepo.GetPostRates(ctx, actors)
759759+ if err == nil {
760760+ for _, actor := range actors {
761761+ if cache, ok := cached[actor]; ok && cache.IsFresh() {
762762+ results[actor] = &PostRate{
763763+ PostsPerDay: cache.PostsPerDay,
764764+ LastPostDate: cache.LastPostDate,
765765+ SampleSize: cache.SampleSize,
766766+ }
767767+ } else {
768768+ actorsToFetch = append(actorsToFetch, actor)
769769+ }
770770+ }
771771+ } else {
772772+ actorsToFetch = actors
773773+ }
774774+ } else {
775775+ actorsToFetch = actors
776776+ }
777777+778778+ if len(actorsToFetch) > 0 {
779779+ apiResults := s.BatchGetPostRates(ctx, actorsToFetch, sampleSize, lookbackDays, maxConcurrent, progressFn)
780780+ maps.Copy(results, apiResults)
781781+782782+ var cacheModels []*PostRateCacheModel
783783+ for actor, postRate := range apiResults {
784784+ cacheModels = append(cacheModels, &PostRateCacheModel{
785785+ ActorDid: actor,
786786+ PostsPerDay: postRate.PostsPerDay,
787787+ LastPostDate: postRate.LastPostDate,
788788+ SampleSize: postRate.SampleSize,
789789+ })
790790+ }
791791+792792+ if len(cacheModels) > 0 {
793793+ if err := cacheRepo.SavePostRates(ctx, cacheModels); err != nil {
794794+ // Log error but don't fail - cache save is non-critical
795795+ }
796796+ }
797797+ }
798798+799799+ return results
800800+}
801801+802802+// BatchGetLastPostDatesCached fetches last post dates for multiple actors with caching support.
803803+//
804804+// Checks cache first, falls back to API for cache misses, and saves results to cache.
805805+// If refresh is true, bypasses cache and refetches all data from API.
806806+//
807807+// TODO: Implement per-item TTL for more efficient cache invalidation.
808808+// FIXME: this function signature is ridiculous
809809+func (s *BlueskyService) BatchGetLastPostDatesCached(ctx context.Context, cacheRepo *CacheRepository, actors []string, maxConcurrent int, refresh bool) map[string]time.Time {
810810+ results := make(map[string]time.Time)
811811+812812+ var actorsToFetch []string
813813+ if !refresh {
814814+ cached, err := cacheRepo.GetActivities(ctx, actors)
815815+ if err == nil {
816816+ for _, actor := range actors {
817817+ if cache, ok := cached[actor]; ok && cache.IsFresh() {
818818+ if cache.HasPosted() {
819819+ results[actor] = cache.LastPostDate
820820+ }
821821+ } else {
822822+ actorsToFetch = append(actorsToFetch, actor)
823823+ }
824824+ }
825825+ } else {
826826+ actorsToFetch = actors
827827+ }
828828+ } else {
829829+ actorsToFetch = actors
830830+ }
831831+832832+ if len(actorsToFetch) > 0 {
833833+ apiResults := s.BatchGetLastPostDates(ctx, actorsToFetch, maxConcurrent)
834834+ maps.Copy(results, apiResults)
835835+836836+ var cacheModels []*ActivityCacheModel
837837+ for _, actor := range actorsToFetch {
838838+ lastPostDate, hasPosted := apiResults[actor]
839839+ cacheModels = append(cacheModels, &ActivityCacheModel{
840840+ ActorDid: actor,
841841+ LastPostDate: lastPostDate,
842842+ FetchedAt: time.Now(),
843843+ ExpiresAt: time.Now().Add(24 * time.Hour),
844844+ })
845845+846846+ if !hasPosted {
847847+ results[actor] = time.Time{}
848848+ }
849849+ }
850850+851851+ if len(cacheModels) > 0 {
852852+ if err := cacheRepo.SaveActivities(ctx, cacheModels); err != nil {
853853+ log.Warnf("save failed with error %v", err.Error())
854854+ }
855855+ }
856856+ }
857857+858858+ return results
859859+}
+41
cli/internal/store/cache_model.go
···11+package store
22+33+import "time"
44+55+// PostRateCacheModel represents a cached post rate computation for an actor.
66+// Stores expensive post rate calculations with TTL support (24 hours default).
77+type PostRateCacheModel struct {
88+ ActorDid string
99+ PostsPerDay float64
1010+ LastPostDate time.Time
1111+ SampleSize int
1212+ FetchedAt time.Time
1313+ ExpiresAt time.Time
1414+}
1515+1616+// IsFresh returns true if the cached post rate has not expired.
1717+// Post rates expire after 24 hours by default.
1818+func (m *PostRateCacheModel) IsFresh() bool {
1919+ return time.Now().Before(m.ExpiresAt)
2020+}
2121+2222+// ActivityCacheModel represents cached activity data (last post date) for an actor.
2323+// Stores last post date lookups with TTL support (24 hours default).
2424+type ActivityCacheModel struct {
2525+ ActorDid string
2626+ LastPostDate time.Time // May be zero if actor has never posted
2727+ FetchedAt time.Time
2828+ ExpiresAt time.Time
2929+}
3030+3131+// IsFresh returns true if the cached activity data has not expired.
3232+// Activity data expires after 24 hours by default.
3333+func (m *ActivityCacheModel) IsFresh() bool {
3434+ return time.Now().Before(m.ExpiresAt)
3535+}
3636+3737+// HasPosted returns true if the actor has posted at least once.
3838+// A zero LastPostDate indicates the actor has never posted.
3939+func (m *ActivityCacheModel) HasPosted() bool {
4040+ return !m.LastPostDate.IsZero()
4141+}
+475
cli/internal/store/cache_repo.go
···11+package store
22+33+import (
44+ "context"
55+ "database/sql"
66+ "errors"
77+ "time"
88+99+ _ "github.com/mattn/go-sqlite3"
1010+ "github.com/stormlightlabs/skypanel/cli/internal/config"
1111+)
1212+1313+// CacheRepository manages post rate and activity caches using SQLite.
1414+//
1515+// Provides methods for storing and retrieving expensive computation results stored as [PostRateCacheModel] or [ActivityCacheModel].
1616+type CacheRepository struct {
1717+ db *sql.DB
1818+}
1919+2020+// NewCacheRepository creates a new cache repository with SQLite backend
2121+func NewCacheRepository() (*CacheRepository, error) {
2222+ dbPath, err := config.GetCacheDB()
2323+ if err != nil {
2424+ return nil, err
2525+ }
2626+2727+ db, err := sql.Open("sqlite3", dbPath)
2828+ if err != nil {
2929+ return nil, err
3030+ }
3131+3232+ return &CacheRepository{db: db}, nil
3333+}
3434+3535+// Init ensures database schema is initialized via migrations
3636+func (r *CacheRepository) Init(ctx context.Context) error {
3737+ if err := config.EnsureConfigDir(); err != nil {
3838+ return err
3939+ }
4040+ return RunMigrations(r.db)
4141+}
4242+4343+// Close releases database connection
4444+func (r *CacheRepository) Close() error {
4545+ return r.db.Close()
4646+}
4747+4848+// GetPostRate retrieves cached post rate for an actor
4949+func (r *CacheRepository) GetPostRate(ctx context.Context, actorDid string) (*PostRateCacheModel, error) {
5050+ query := `
5151+ SELECT actor_did, posts_per_day, last_post_date, sample_size, fetched_at, expires_at
5252+ FROM cached_post_rates
5353+ WHERE actor_did = ? AND expires_at > ?
5454+ `
5555+5656+ var cache PostRateCacheModel
5757+ var lastPostDate sql.NullTime
5858+5959+ err := r.db.QueryRowContext(ctx, query, actorDid, time.Now()).Scan(
6060+ &cache.ActorDid,
6161+ &cache.PostsPerDay,
6262+ &lastPostDate,
6363+ &cache.SampleSize,
6464+ &cache.FetchedAt,
6565+ &cache.ExpiresAt,
6666+ )
6767+6868+ if lastPostDate.Valid {
6969+ cache.LastPostDate = lastPostDate.Time
7070+ }
7171+7272+ if err != nil {
7373+ if errors.Is(err, sql.ErrNoRows) {
7474+ return nil, nil
7575+ }
7676+ return nil, &RepositoryError{Op: "GetPostRate", Err: err}
7777+ }
7878+7979+ return &cache, nil
8080+}
8181+8282+// GetPostRates retrieves cached post rates for multiple actors in a single query,
8383+// as a map of actorDid -> PostRateCacheModel for found entries.
8484+func (r *CacheRepository) GetPostRates(ctx context.Context, actorDids []string) (map[string]*PostRateCacheModel, error) {
8585+ if len(actorDids) == 0 {
8686+ return make(map[string]*PostRateCacheModel), nil
8787+ }
8888+8989+ query := `
9090+ SELECT actor_did, posts_per_day, last_post_date, sample_size, fetched_at, expires_at
9191+ FROM cached_post_rates
9292+ WHERE actor_did IN (` + buildPlaceholders(len(actorDids)) + `) AND expires_at > ?
9393+ `
9494+9595+ args := make([]interface{}, len(actorDids)+1)
9696+ for i, did := range actorDids {
9797+ args[i] = did
9898+ }
9999+ args[len(actorDids)] = time.Now()
100100+101101+ rows, err := r.db.QueryContext(ctx, query, args...)
102102+ if err != nil {
103103+ return nil, &RepositoryError{Op: "GetPostRates", Err: err}
104104+ }
105105+ defer rows.Close()
106106+107107+ result := make(map[string]*PostRateCacheModel)
108108+ for rows.Next() {
109109+ var cache PostRateCacheModel
110110+ var lastPostDate sql.NullTime
111111+112112+ err := rows.Scan(
113113+ &cache.ActorDid,
114114+ &cache.PostsPerDay,
115115+ &lastPostDate,
116116+ &cache.SampleSize,
117117+ &cache.FetchedAt,
118118+ &cache.ExpiresAt,
119119+ )
120120+ if err != nil {
121121+ return nil, &RepositoryError{Op: "GetPostRates", Err: err}
122122+ }
123123+124124+ if lastPostDate.Valid {
125125+ cache.LastPostDate = lastPostDate.Time
126126+ }
127127+128128+ result[cache.ActorDid] = &cache
129129+ }
130130+131131+ return result, rows.Err()
132132+}
133133+134134+// SavePostRate saves or updates a post rate cache entry
135135+func (r *CacheRepository) SavePostRate(ctx context.Context, cache *PostRateCacheModel) error {
136136+ if cache.FetchedAt.IsZero() {
137137+ cache.FetchedAt = time.Now()
138138+ }
139139+ if cache.ExpiresAt.IsZero() {
140140+ cache.ExpiresAt = time.Now().Add(24 * time.Hour)
141141+ }
142142+143143+ query := `
144144+ INSERT INTO cached_post_rates (actor_did, posts_per_day, last_post_date, sample_size, fetched_at, expires_at)
145145+ VALUES (?, ?, ?, ?, ?, ?)
146146+ ON CONFLICT(actor_did) DO UPDATE SET
147147+ posts_per_day = excluded.posts_per_day,
148148+ last_post_date = excluded.last_post_date,
149149+ sample_size = excluded.sample_size,
150150+ fetched_at = excluded.fetched_at,
151151+ expires_at = excluded.expires_at
152152+ `
153153+154154+ var lastPostDate interface{}
155155+ if !cache.LastPostDate.IsZero() {
156156+ lastPostDate = cache.LastPostDate
157157+ }
158158+159159+ _, err := r.db.ExecContext(ctx, query,
160160+ cache.ActorDid,
161161+ cache.PostsPerDay,
162162+ lastPostDate,
163163+ cache.SampleSize,
164164+ cache.FetchedAt,
165165+ cache.ExpiresAt,
166166+ )
167167+168168+ if err != nil {
169169+ return &RepositoryError{Op: "SavePostRate", Err: err}
170170+ }
171171+172172+ return nil
173173+}
174174+175175+// SavePostRates saves multiple post rate cache entries in a transaction
176176+func (r *CacheRepository) SavePostRates(ctx context.Context, caches []*PostRateCacheModel) error {
177177+ if len(caches) == 0 {
178178+ return nil
179179+ }
180180+181181+ tx, err := r.db.BeginTx(ctx, nil)
182182+ if err != nil {
183183+ return &RepositoryError{Op: "SavePostRates", Err: err}
184184+ }
185185+ defer tx.Rollback()
186186+187187+ stmt, err := tx.PrepareContext(ctx, `
188188+ INSERT INTO cached_post_rates (actor_did, posts_per_day, last_post_date, sample_size, fetched_at, expires_at)
189189+ VALUES (?, ?, ?, ?, ?, ?)
190190+ ON CONFLICT(actor_did) DO UPDATE SET
191191+ posts_per_day = excluded.posts_per_day,
192192+ last_post_date = excluded.last_post_date,
193193+ sample_size = excluded.sample_size,
194194+ fetched_at = excluded.fetched_at,
195195+ expires_at = excluded.expires_at
196196+ `)
197197+ if err != nil {
198198+ return &RepositoryError{Op: "SavePostRates", Err: err}
199199+ }
200200+ defer stmt.Close()
201201+202202+ for _, cache := range caches {
203203+ if cache.FetchedAt.IsZero() {
204204+ cache.FetchedAt = time.Now()
205205+ }
206206+ if cache.ExpiresAt.IsZero() {
207207+ cache.ExpiresAt = time.Now().Add(24 * time.Hour)
208208+ }
209209+210210+ var lastPostDate interface{}
211211+ if !cache.LastPostDate.IsZero() {
212212+ lastPostDate = cache.LastPostDate
213213+ }
214214+215215+ _, err := stmt.ExecContext(ctx,
216216+ cache.ActorDid,
217217+ cache.PostsPerDay,
218218+ lastPostDate,
219219+ cache.SampleSize,
220220+ cache.FetchedAt,
221221+ cache.ExpiresAt,
222222+ )
223223+ if err != nil {
224224+ return &RepositoryError{Op: "SavePostRates", Err: err}
225225+ }
226226+ }
227227+228228+ if err := tx.Commit(); err != nil {
229229+ return &RepositoryError{Op: "SavePostRates", Err: err}
230230+ }
231231+232232+ return nil
233233+}
234234+235235+// DeletePostRate removes a post rate cache entry
236236+func (r *CacheRepository) DeletePostRate(ctx context.Context, actorDid string) error {
237237+ query := "DELETE FROM cached_post_rates WHERE actor_did = ?"
238238+ _, err := r.db.ExecContext(ctx, query, actorDid)
239239+ if err != nil {
240240+ return &RepositoryError{Op: "DeletePostRate", Err: err}
241241+ }
242242+ return nil
243243+}
244244+245245+// GetActivity retrieves cached activity data for an actor
246246+func (r *CacheRepository) GetActivity(ctx context.Context, actorDid string) (*ActivityCacheModel, error) {
247247+ query := `
248248+ SELECT actor_did, last_post_date, fetched_at, expires_at
249249+ FROM cached_activity
250250+ WHERE actor_did = ? AND expires_at > ?
251251+ `
252252+253253+ var cache ActivityCacheModel
254254+ var lastPostDate sql.NullTime
255255+256256+ err := r.db.QueryRowContext(ctx, query, actorDid, time.Now()).Scan(
257257+ &cache.ActorDid,
258258+ &lastPostDate,
259259+ &cache.FetchedAt,
260260+ &cache.ExpiresAt,
261261+ )
262262+263263+ if lastPostDate.Valid {
264264+ cache.LastPostDate = lastPostDate.Time
265265+ }
266266+267267+ if err != nil {
268268+ if errors.Is(err, sql.ErrNoRows) {
269269+ return nil, nil
270270+ }
271271+ return nil, &RepositoryError{Op: "GetActivity", Err: err}
272272+ }
273273+274274+ return &cache, nil
275275+}
276276+277277+// GetActivities retrieves cached activity data for multiple actors in a single query,
278278+// as a map of actorDid -> ActivityCacheModel for found entries.
279279+func (r *CacheRepository) GetActivities(ctx context.Context, actorDids []string) (map[string]*ActivityCacheModel, error) {
280280+ if len(actorDids) == 0 {
281281+ return make(map[string]*ActivityCacheModel), nil
282282+ }
283283+284284+ query := `
285285+ SELECT actor_did, last_post_date, fetched_at, expires_at
286286+ FROM cached_activity
287287+ WHERE actor_did IN (` + buildPlaceholders(len(actorDids)) + `) AND expires_at > ?
288288+ `
289289+290290+ args := make([]interface{}, len(actorDids)+1)
291291+ for i, did := range actorDids {
292292+ args[i] = did
293293+ }
294294+ args[len(actorDids)] = time.Now()
295295+296296+ rows, err := r.db.QueryContext(ctx, query, args...)
297297+ if err != nil {
298298+ return nil, &RepositoryError{Op: "GetActivities", Err: err}
299299+ }
300300+ defer rows.Close()
301301+302302+ result := make(map[string]*ActivityCacheModel)
303303+ for rows.Next() {
304304+ var cache ActivityCacheModel
305305+ var lastPostDate sql.NullTime
306306+307307+ err := rows.Scan(
308308+ &cache.ActorDid,
309309+ &lastPostDate,
310310+ &cache.FetchedAt,
311311+ &cache.ExpiresAt,
312312+ )
313313+ if err != nil {
314314+ return nil, &RepositoryError{Op: "GetActivities", Err: err}
315315+ }
316316+317317+ if lastPostDate.Valid {
318318+ cache.LastPostDate = lastPostDate.Time
319319+ }
320320+321321+ result[cache.ActorDid] = &cache
322322+ }
323323+324324+ return result, rows.Err()
325325+}
326326+327327+// SaveActivity saves or updates an activity cache entry
328328+func (r *CacheRepository) SaveActivity(ctx context.Context, cache *ActivityCacheModel) error {
329329+ if cache.FetchedAt.IsZero() {
330330+ cache.FetchedAt = time.Now()
331331+ }
332332+ if cache.ExpiresAt.IsZero() {
333333+ cache.ExpiresAt = time.Now().Add(24 * time.Hour)
334334+ }
335335+336336+ query := `
337337+ INSERT INTO cached_activity (actor_did, last_post_date, fetched_at, expires_at)
338338+ VALUES (?, ?, ?, ?)
339339+ ON CONFLICT(actor_did) DO UPDATE SET
340340+ last_post_date = excluded.last_post_date,
341341+ fetched_at = excluded.fetched_at,
342342+ expires_at = excluded.expires_at
343343+ `
344344+345345+ var lastPostDate interface{}
346346+ if !cache.LastPostDate.IsZero() {
347347+ lastPostDate = cache.LastPostDate
348348+ }
349349+350350+ _, err := r.db.ExecContext(ctx, query,
351351+ cache.ActorDid,
352352+ lastPostDate,
353353+ cache.FetchedAt,
354354+ cache.ExpiresAt,
355355+ )
356356+357357+ if err != nil {
358358+ return &RepositoryError{Op: "SaveActivity", Err: err}
359359+ }
360360+361361+ return nil
362362+}
363363+364364+// SaveActivities saves multiple activity cache entries in a transaction
365365+func (r *CacheRepository) SaveActivities(ctx context.Context, caches []*ActivityCacheModel) error {
366366+ if len(caches) == 0 {
367367+ return nil
368368+ }
369369+370370+ tx, err := r.db.BeginTx(ctx, nil)
371371+ if err != nil {
372372+ return &RepositoryError{Op: "SaveActivities", Err: err}
373373+ }
374374+ defer tx.Rollback()
375375+376376+ stmt, err := tx.PrepareContext(ctx, `
377377+ INSERT INTO cached_activity (actor_did, last_post_date, fetched_at, expires_at)
378378+ VALUES (?, ?, ?, ?)
379379+ ON CONFLICT(actor_did) DO UPDATE SET
380380+ last_post_date = excluded.last_post_date,
381381+ fetched_at = excluded.fetched_at,
382382+ expires_at = excluded.expires_at
383383+ `)
384384+ if err != nil {
385385+ return &RepositoryError{Op: "SaveActivities", Err: err}
386386+ }
387387+ defer stmt.Close()
388388+389389+ for _, cache := range caches {
390390+ if cache.FetchedAt.IsZero() {
391391+ cache.FetchedAt = time.Now()
392392+ }
393393+ if cache.ExpiresAt.IsZero() {
394394+ cache.ExpiresAt = time.Now().Add(24 * time.Hour)
395395+ }
396396+397397+ var lastPostDate interface{}
398398+ if !cache.LastPostDate.IsZero() {
399399+ lastPostDate = cache.LastPostDate
400400+ }
401401+402402+ _, err := stmt.ExecContext(ctx,
403403+ cache.ActorDid,
404404+ lastPostDate,
405405+ cache.FetchedAt,
406406+ cache.ExpiresAt,
407407+ )
408408+ if err != nil {
409409+ return &RepositoryError{Op: "SaveActivities", Err: err}
410410+ }
411411+ }
412412+413413+ if err := tx.Commit(); err != nil {
414414+ return &RepositoryError{Op: "SaveActivities", Err: err}
415415+ }
416416+417417+ return nil
418418+}
419419+420420+// DeleteActivity removes an activity cache entry
421421+func (r *CacheRepository) DeleteActivity(ctx context.Context, actorDid string) error {
422422+ query := "DELETE FROM cached_activity WHERE actor_did = ?"
423423+ _, err := r.db.ExecContext(ctx, query, actorDid)
424424+ if err != nil {
425425+ return &RepositoryError{Op: "DeleteActivity", Err: err}
426426+ }
427427+ return nil
428428+}
429429+430430+// DeleteExpiredPostRates removes all expired post rate cache entries
431431+func (r *CacheRepository) DeleteExpiredPostRates(ctx context.Context) (int64, error) {
432432+ query := "DELETE FROM cached_post_rates WHERE expires_at < ?"
433433+ result, err := r.db.ExecContext(ctx, query, time.Now())
434434+ if err != nil {
435435+ return 0, &RepositoryError{Op: "DeleteExpiredPostRates", Err: err}
436436+ }
437437+438438+ rows, err := result.RowsAffected()
439439+ if err != nil {
440440+ return 0, &RepositoryError{Op: "DeleteExpiredPostRates", Err: err}
441441+ }
442442+443443+ return rows, nil
444444+}
445445+446446+// DeleteExpiredActivities removes all expired activity cache entries
447447+func (r *CacheRepository) DeleteExpiredActivities(ctx context.Context) (int64, error) {
448448+ query := "DELETE FROM cached_activity WHERE expires_at < ?"
449449+ result, err := r.db.ExecContext(ctx, query, time.Now())
450450+ if err != nil {
451451+ return 0, &RepositoryError{Op: "DeleteExpiredActivities", Err: err}
452452+ }
453453+454454+ rows, err := result.RowsAffected()
455455+ if err != nil {
456456+ return 0, &RepositoryError{Op: "DeleteExpiredActivities", Err: err}
457457+ }
458458+459459+ return rows, nil
460460+}
461461+462462+// buildPlaceholders generates SQL placeholder string for IN queries.
463463+//
464464+// Example: buildPlaceholders(3) returns "?,?,?"
465465+func buildPlaceholders(count int) string {
466466+ if count == 0 {
467467+ return ""
468468+ }
469469+470470+ placeholders := "?"
471471+ for i := 1; i < count; i++ {
472472+ placeholders += ",?"
473473+ }
474474+ return placeholders
475475+}
···11+DROP TABLE IF EXISTS cached_activity;
22+DROP TABLE IF EXISTS cached_post_rates;
33+DROP TABLE IF EXISTS follower_snapshot_entries;
44+DROP TABLE IF EXISTS follower_snapshots;
···11+-- Follower snapshots metadata
22+CREATE TABLE IF NOT EXISTS follower_snapshots (
33+ id TEXT PRIMARY KEY,
44+ created_at DATETIME NOT NULL,
55+ user_did TEXT NOT NULL,
66+ snapshot_type TEXT NOT NULL,
77+ total_count INTEGER NOT NULL,
88+ expires_at DATETIME NOT NULL
99+);
1010+1111+CREATE INDEX IF NOT EXISTS idx_snapshots_user_type ON follower_snapshots(user_did, snapshot_type);
1212+CREATE INDEX IF NOT EXISTS idx_snapshots_created ON follower_snapshots(created_at);
1313+CREATE INDEX IF NOT EXISTS idx_snapshots_expires ON follower_snapshots(expires_at);
1414+1515+-- Snapshot entries (actors in each snapshot)
1616+CREATE TABLE IF NOT EXISTS follower_snapshot_entries (
1717+ snapshot_id TEXT NOT NULL,
1818+ actor_did TEXT NOT NULL,
1919+ indexed_at TEXT,
2020+ PRIMARY KEY(snapshot_id, actor_did),
2121+ FOREIGN KEY(snapshot_id) REFERENCES follower_snapshots(id) ON DELETE CASCADE
2222+);
2323+2424+CREATE INDEX IF NOT EXISTS idx_snapshot_entries_actor ON follower_snapshot_entries(actor_did);
2525+2626+-- Cached post rate metrics
2727+CREATE TABLE IF NOT EXISTS cached_post_rates (
2828+ actor_did TEXT PRIMARY KEY,
2929+ posts_per_day REAL NOT NULL,
3030+ last_post_date DATETIME,
3131+ sample_size INTEGER NOT NULL,
3232+ fetched_at DATETIME NOT NULL,
3333+ expires_at DATETIME NOT NULL
3434+);
3535+3636+CREATE INDEX IF NOT EXISTS idx_post_rates_fetched ON cached_post_rates(fetched_at);
3737+CREATE INDEX IF NOT EXISTS idx_post_rates_expires ON cached_post_rates(expires_at);
3838+3939+-- Cached activity data (last post dates)
4040+CREATE TABLE IF NOT EXISTS cached_activity (
4141+ actor_did TEXT PRIMARY KEY,
4242+ last_post_date DATETIME,
4343+ fetched_at DATETIME NOT NULL,
4444+ expires_at DATETIME NOT NULL
4545+);
4646+4747+CREATE INDEX IF NOT EXISTS idx_activity_fetched ON cached_activity(fetched_at);
4848+CREATE INDEX IF NOT EXISTS idx_activity_expires ON cached_activity(expires_at);
+34
cli/internal/store/snapshot_model.go
···11+package store
22+33+import "time"
44+55+// SnapshotModel represents a follower or following snapshot with metadata.
66+// Stores snapshot metadata with TTL support (24 hours default).
77+type SnapshotModel struct {
88+ id string
99+ createdAt time.Time
1010+ UserDid string
1111+ SnapshotType string // "followers" or "following"
1212+ TotalCount int
1313+ ExpiresAt time.Time
1414+}
1515+1616+func (m *SnapshotModel) ID() string { return m.id }
1717+func (m *SnapshotModel) CreatedAt() time.Time { return m.createdAt }
1818+func (m *SnapshotModel) UpdatedAt() time.Time { return m.createdAt } // Snapshots are immutable
1919+func (m *SnapshotModel) SetID(id string) { m.id = id }
2020+func (m *SnapshotModel) SetCreatedAt(t time.Time) { m.createdAt = t }
2121+func (m *SnapshotModel) SetUpdatedAt(t time.Time) {} // Snapshots are immutable
2222+2323+// IsFresh returns true if the snapshot has not expired. Snapshots expire after 24 hours by default.
2424+func (m *SnapshotModel) IsFresh() bool {
2525+ return time.Now().Before(m.ExpiresAt)
2626+}
2727+2828+// SnapshotEntry represents an actor in a snapshot with minimal cached data.
2929+// Linked to [SnapshotModel] via snapshot_id foreign key.
3030+type SnapshotEntry struct {
3131+ SnapshotID string
3232+ ActorDid string
3333+ IndexedAt string // When the follow relationship was indexed by Bluesky
3434+}
+362
cli/internal/store/snapshot_repo.go
···11+package store
22+33+import (
44+ "context"
55+ "database/sql"
66+ "errors"
77+ "time"
88+99+ _ "github.com/mattn/go-sqlite3"
1010+ "github.com/stormlightlabs/skypanel/cli/internal/config"
1111+)
1212+1313+// SnapshotRepository implements Repository for SnapshotModel using SQLite.
1414+// Manages follower/following snapshots with entries for diff and historical comparison.
1515+type SnapshotRepository struct {
1616+ db *sql.DB
1717+}
1818+1919+// NewSnapshotRepository creates a new snapshot repository with SQLite backend
2020+func NewSnapshotRepository() (*SnapshotRepository, error) {
2121+ dbPath, err := config.GetCacheDB()
2222+ if err != nil {
2323+ return nil, err
2424+ }
2525+2626+ db, err := sql.Open("sqlite3", dbPath)
2727+ if err != nil {
2828+ return nil, err
2929+ }
3030+3131+ return &SnapshotRepository{db: db}, nil
3232+}
3333+3434+// Init ensures database schema is initialized via migrations
3535+func (r *SnapshotRepository) Init(ctx context.Context) error {
3636+ if err := config.EnsureConfigDir(); err != nil {
3737+ return err
3838+ }
3939+ return RunMigrations(r.db)
4040+}
4141+4242+// Close releases database connection
4343+func (r *SnapshotRepository) Close() error {
4444+ return r.db.Close()
4545+}
4646+4747+// Get retrieves a snapshot by ID
4848+func (r *SnapshotRepository) Get(ctx context.Context, id string) (Model, error) {
4949+ query := `
5050+ SELECT id, created_at, user_did, snapshot_type, total_count, expires_at
5151+ FROM follower_snapshots
5252+ WHERE id = ?
5353+ `
5454+5555+ var snapshot SnapshotModel
5656+ var snapshotID string
5757+ var createdAt, expiresAt time.Time
5858+5959+ err := r.db.QueryRowContext(ctx, query, id).Scan(
6060+ &snapshotID,
6161+ &createdAt,
6262+ &snapshot.UserDid,
6363+ &snapshot.SnapshotType,
6464+ &snapshot.TotalCount,
6565+ &expiresAt,
6666+ )
6767+6868+ snapshot.SetID(snapshotID)
6969+ snapshot.SetCreatedAt(createdAt)
7070+ snapshot.ExpiresAt = expiresAt
7171+7272+ if err != nil {
7373+ if errors.Is(err, sql.ErrNoRows) {
7474+ return nil, &RepositoryError{Op: "Get", Err: errors.New("snapshot not found")}
7575+ }
7676+ return nil, &RepositoryError{Op: "Get", Err: err}
7777+ }
7878+7979+ return &snapshot, nil
8080+}
8181+8282+// List retrieves all snapshots ordered by creation date (newest first)
8383+func (r *SnapshotRepository) List(ctx context.Context) ([]Model, error) {
8484+ query := `
8585+ SELECT id, created_at, user_did, snapshot_type, total_count, expires_at
8686+ FROM follower_snapshots
8787+ ORDER BY created_at DESC
8888+ `
8989+9090+ rows, err := r.db.QueryContext(ctx, query)
9191+ if err != nil {
9292+ return nil, &RepositoryError{Op: "List", Err: err}
9393+ }
9494+ defer rows.Close()
9595+9696+ var snapshots []Model
9797+ for rows.Next() {
9898+ var snapshot SnapshotModel
9999+ var snapshotID string
100100+ var createdAt, expiresAt time.Time
101101+102102+ err := rows.Scan(
103103+ &snapshotID,
104104+ &createdAt,
105105+ &snapshot.UserDid,
106106+ &snapshot.SnapshotType,
107107+ &snapshot.TotalCount,
108108+ &expiresAt,
109109+ )
110110+ if err != nil {
111111+ return nil, &RepositoryError{Op: "List", Err: err}
112112+ }
113113+114114+ snapshot.SetID(snapshotID)
115115+ snapshot.SetCreatedAt(createdAt)
116116+ snapshot.ExpiresAt = expiresAt
117117+ snapshots = append(snapshots, &snapshot)
118118+ }
119119+120120+ return snapshots, rows.Err()
121121+}
122122+123123+// Save creates a new snapshot (snapshots are immutable, no updates)
124124+func (r *SnapshotRepository) Save(ctx context.Context, model Model) error {
125125+ snapshot, ok := model.(*SnapshotModel)
126126+ if !ok {
127127+ return &RepositoryError{Op: "Save", Err: errors.New("invalid model type: expected *SnapshotModel")}
128128+ }
129129+130130+ if snapshot.ID() == "" {
131131+ snapshot.SetID(GenerateUUID())
132132+ snapshot.SetCreatedAt(time.Now())
133133+ }
134134+135135+ if snapshot.ExpiresAt.IsZero() {
136136+ snapshot.ExpiresAt = time.Now().Add(24 * time.Hour)
137137+ }
138138+139139+ query := `
140140+ INSERT INTO follower_snapshots (id, created_at, user_did, snapshot_type, total_count, expires_at)
141141+ VALUES (?, ?, ?, ?, ?, ?)
142142+ `
143143+144144+ _, err := r.db.ExecContext(ctx, query,
145145+ snapshot.ID(),
146146+ snapshot.CreatedAt(),
147147+ snapshot.UserDid,
148148+ snapshot.SnapshotType,
149149+ snapshot.TotalCount,
150150+ snapshot.ExpiresAt,
151151+ )
152152+153153+ if err != nil {
154154+ return &RepositoryError{Op: "Save", Err: err}
155155+ }
156156+157157+ return nil
158158+}
159159+160160+// Delete removes a snapshot by ID (cascade deletes entries)
161161+func (r *SnapshotRepository) Delete(ctx context.Context, id string) error {
162162+ query := "DELETE FROM follower_snapshots WHERE id = ?"
163163+ result, err := r.db.ExecContext(ctx, query, id)
164164+ if err != nil {
165165+ return &RepositoryError{Op: "Delete", Err: err}
166166+ }
167167+168168+ rows, err := result.RowsAffected()
169169+ if err != nil {
170170+ return &RepositoryError{Op: "Delete", Err: err}
171171+ }
172172+173173+ if rows == 0 {
174174+ return &RepositoryError{Op: "Delete", Err: errors.New("snapshot not found")}
175175+ }
176176+177177+ return nil
178178+}
179179+180180+// FindByUserAndType retrieves the most recent fresh snapshot for a user and type.
181181+func (r *SnapshotRepository) FindByUserAndType(ctx context.Context, userDid, snapshotType string) (*SnapshotModel, error) {
182182+ query := `
183183+ SELECT id, created_at, user_did, snapshot_type, total_count, expires_at
184184+ FROM follower_snapshots
185185+ WHERE user_did = ? AND snapshot_type = ? AND expires_at > ?
186186+ ORDER BY created_at DESC
187187+ LIMIT 1
188188+ `
189189+190190+ var snapshot SnapshotModel
191191+ var snapshotID string
192192+ var createdAt, expiresAt time.Time
193193+194194+ err := r.db.QueryRowContext(ctx, query, userDid, snapshotType, time.Now()).Scan(
195195+ &snapshotID,
196196+ &createdAt,
197197+ &snapshot.UserDid,
198198+ &snapshot.SnapshotType,
199199+ &snapshot.TotalCount,
200200+ &expiresAt,
201201+ )
202202+203203+ if err != nil {
204204+ if errors.Is(err, sql.ErrNoRows) {
205205+ return nil, nil
206206+ }
207207+ return nil, &RepositoryError{Op: "FindByUserAndType", Err: err}
208208+ }
209209+210210+ snapshot.SetID(snapshotID)
211211+ snapshot.SetCreatedAt(createdAt)
212212+ snapshot.ExpiresAt = expiresAt
213213+214214+ return &snapshot, nil
215215+}
216216+217217+// FindByUserTypeAndDate retrieves a snapshot for a user, type, and specific date, closest to (but not after) the specified date.
218218+func (r *SnapshotRepository) FindByUserTypeAndDate(ctx context.Context, userDid, snapshotType string, date time.Time) (*SnapshotModel, error) {
219219+ query := `
220220+ SELECT id, created_at, user_did, snapshot_type, total_count, expires_at
221221+ FROM follower_snapshots
222222+ WHERE user_did = ? AND snapshot_type = ? AND created_at <= ?
223223+ ORDER BY created_at DESC
224224+ LIMIT 1
225225+ `
226226+227227+ var snapshot SnapshotModel
228228+ var snapshotID string
229229+ var createdAt, expiresAt time.Time
230230+231231+ err := r.db.QueryRowContext(ctx, query, userDid, snapshotType, date).Scan(
232232+ &snapshotID,
233233+ &createdAt,
234234+ &snapshot.UserDid,
235235+ &snapshot.SnapshotType,
236236+ &snapshot.TotalCount,
237237+ &expiresAt,
238238+ )
239239+240240+ if err != nil {
241241+ if errors.Is(err, sql.ErrNoRows) {
242242+ return nil, nil
243243+ }
244244+ return nil, &RepositoryError{Op: "FindByUserTypeAndDate", Err: err}
245245+ }
246246+247247+ snapshot.SetID(snapshotID)
248248+ snapshot.SetCreatedAt(createdAt)
249249+ snapshot.ExpiresAt = expiresAt
250250+ return &snapshot, nil
251251+}
252252+253253+// SaveEntry saves a single snapshot entry
254254+func (r *SnapshotRepository) SaveEntry(ctx context.Context, entry *SnapshotEntry) error {
255255+ query := `
256256+ INSERT INTO follower_snapshot_entries (snapshot_id, actor_did, indexed_at)
257257+ VALUES (?, ?, ?)
258258+ `
259259+260260+ _, err := r.db.ExecContext(ctx, query, entry.SnapshotID, entry.ActorDid, entry.IndexedAt)
261261+ if err != nil {
262262+ return &RepositoryError{Op: "SaveEntry", Err: err}
263263+ }
264264+ return nil
265265+}
266266+267267+// SaveEntries saves multiple snapshot entries in a transaction for efficiency
268268+func (r *SnapshotRepository) SaveEntries(ctx context.Context, entries []*SnapshotEntry) error {
269269+ tx, err := r.db.BeginTx(ctx, nil)
270270+ if err != nil {
271271+ return &RepositoryError{Op: "SaveEntries", Err: err}
272272+ }
273273+ defer tx.Rollback()
274274+275275+ stmt, err := tx.PrepareContext(ctx, `
276276+ INSERT INTO follower_snapshot_entries (snapshot_id, actor_did, indexed_at)
277277+ VALUES (?, ?, ?)
278278+ `)
279279+ if err != nil {
280280+ return &RepositoryError{Op: "SaveEntries", Err: err}
281281+ }
282282+ defer stmt.Close()
283283+284284+ for _, entry := range entries {
285285+ _, err := stmt.ExecContext(ctx, entry.SnapshotID, entry.ActorDid, entry.IndexedAt)
286286+ if err != nil {
287287+ return &RepositoryError{Op: "SaveEntries", Err: err}
288288+ }
289289+ }
290290+291291+ if err := tx.Commit(); err != nil {
292292+ return &RepositoryError{Op: "SaveEntries", Err: err}
293293+ }
294294+ return nil
295295+}
296296+297297+// GetEntries retrieves all entries for a snapshot
298298+func (r *SnapshotRepository) GetEntries(ctx context.Context, snapshotID string) ([]*SnapshotEntry, error) {
299299+ query := `
300300+ SELECT snapshot_id, actor_did, indexed_at
301301+ FROM follower_snapshot_entries
302302+ WHERE snapshot_id = ?
303303+ `
304304+305305+ rows, err := r.db.QueryContext(ctx, query, snapshotID)
306306+ if err != nil {
307307+ return nil, &RepositoryError{Op: "GetEntries", Err: err}
308308+ }
309309+ defer rows.Close()
310310+311311+ var entries []*SnapshotEntry
312312+ for rows.Next() {
313313+ var entry SnapshotEntry
314314+ err := rows.Scan(&entry.SnapshotID, &entry.ActorDid, &entry.IndexedAt)
315315+ if err != nil {
316316+ return nil, &RepositoryError{Op: "GetEntries", Err: err}
317317+ }
318318+ entries = append(entries, &entry)
319319+ }
320320+ return entries, rows.Err()
321321+}
322322+323323+// GetActorDids retrieves just the actor DIDs for a snapshot (efficient for diffs)
324324+func (r *SnapshotRepository) GetActorDids(ctx context.Context, snapshotID string) ([]string, error) {
325325+ query := `
326326+ SELECT actor_did
327327+ FROM follower_snapshot_entries
328328+ WHERE snapshot_id = ?
329329+ `
330330+331331+ rows, err := r.db.QueryContext(ctx, query, snapshotID)
332332+ if err != nil {
333333+ return nil, &RepositoryError{Op: "GetActorDids", Err: err}
334334+ }
335335+ defer rows.Close()
336336+337337+ var dids []string
338338+ for rows.Next() {
339339+ var did string
340340+ err := rows.Scan(&did)
341341+ if err != nil {
342342+ return nil, &RepositoryError{Op: "GetActorDids", Err: err}
343343+ }
344344+ dids = append(dids, did)
345345+ }
346346+ return dids, rows.Err()
347347+}
348348+349349+// DeleteExpiredSnapshots removes all expired snapshots and their entries
350350+func (r *SnapshotRepository) DeleteExpiredSnapshots(ctx context.Context) (int64, error) {
351351+ query := "DELETE FROM follower_snapshots WHERE expires_at < ?"
352352+ result, err := r.db.ExecContext(ctx, query, time.Now())
353353+ if err != nil {
354354+ return 0, &RepositoryError{Op: "DeleteExpiredSnapshots", Err: err}
355355+ }
356356+357357+ rows, err := result.RowsAffected()
358358+ if err != nil {
359359+ return 0, &RepositoryError{Op: "DeleteExpiredSnapshots", Err: err}
360360+ }
361361+ return rows, nil
362362+}