package bgs

import (
	"context"
	"fmt"
	"math/rand/v2"
	"sync"
	"time"

	"github.com/bluesky-social/indigo/carstore"
	"github.com/bluesky-social/indigo/models"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
)

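// queueItem pairs a user's uid with the compaction mode requested for it;
// fast compaction skips large shards.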
type queueItem struct {
	uid  models.Uid
	fast bool
}

// uniQueue is a FIFO queue that holds at most one entry per uid
type uniQueue struct {
	q       []queueItem
	members map[models.Uid]struct{}
	lk      sync.Mutex
}

// Append appends a uid to the end of the queue if it doesn't already exist
func (q *uniQueue) Append(uid models.Uid, fast bool) {
	q.lk.Lock()
	defer q.lk.Unlock()

	if _, ok := q.members[uid]; ok {
		return
	}

	q.q = append(q.q, queueItem{uid: uid, fast: fast})
	q.members[uid] = struct{}{}
	compactionQueueDepth.Inc()
}

// Prepend prepends a uid to the beginning of the queue if it doesn't already exist
func (q *uniQueue) Prepend(uid models.Uid, fast bool) {
	q.lk.Lock()
	defer q.lk.Unlock()

	if _, ok := q.members[uid]; ok {
		return
	}

	q.q = append([]queueItem{{uid: uid, fast: fast}}, q.q...)
	q.members[uid] = struct{}{}
	compactionQueueDepth.Inc()
}

// Has returns true if the queue contains the given uid
func (q *uniQueue) Has(uid models.Uid) bool {
	q.lk.Lock()
	defer q.lk.Unlock()

	_, ok := q.members[uid]
	return ok
}

// Remove removes the given uid from the queue
func (q *uniQueue) Remove(uid models.Uid) {
	q.lk.Lock()
	defer q.lk.Unlock()

	if _, ok := q.members[uid]; !ok {
		return
	}

	for i, item := range q.q {
		if item.uid == uid {
			q.q = append(q.q[:i], q.q[i+1:]...)
			break
		}
	}

	delete(q.members, uid)
	compactionQueueDepth.Dec()
}

// Pop pops the first item off the front of the queue
func (q *uniQueue) Pop() (queueItem, bool) {
	q.lk.Lock()
	defer q.lk.Unlock()

	if len(q.q) == 0 {
		return queueItem{}, false
	}

	item := q.q[0]
	q.q = q.q[1:]
	delete(q.members, item.uid)

	compactionQueueDepth.Dec()
	return item, true
}

// PopRandom pops a random item off of the queue.
// Note: this swaps the last element into the popped slot, so strict FIFO
// ordering is no longer preserved after a random pop.
func (q *uniQueue) PopRandom() (queueItem, bool) {
	q.lk.Lock()
	defer q.lk.Unlock()

	if len(q.q) == 0 {
		return queueItem{}, false
	}

	var item queueItem
	if len(q.q) == 1 {
		item = q.q[0]
		q.q = nil
	} else {
		pos := rand.IntN(len(q.q))
		item = q.q[pos]
		last := len(q.q) - 1
		q.q[pos] = q.q[last]
		q.q = q.q[:last]
	}
	delete(q.members, item.uid)

	compactionQueueDepth.Dec()
	return item, true
}

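// CompactorState is a snapshot of the most recent compaction attempt: the
// user it ran for, its status, and the resulting carstore stats.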
type CompactorState struct {
	latestUID models.Uid
	latestDID string
	status    string
	stats     *carstore.CompactionStats
}

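// set overwrites the state snapshot with the given compaction attempt.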
func (cstate *CompactorState) set(uid models.Uid, did, status string, stats *carstore.CompactionStats) {
	cstate.latestUID = uid
	cstate.latestDID = did
	cstate.status = status
	cstate.stats = stats
}

// Compactor is a daemon that compacts repos in the background
type Compactor struct {
	q                 *uniQueue
	stateLk           sync.RWMutex
	exit              chan struct{}
	requeueInterval   time.Duration
	requeueLimit      int
	requeueShardCount int
	requeueFast       bool

	numWorkers int
	wg         sync.WaitGroup
}

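// CompactorOptions configures the compactor: the requeue interval and limit,
// the shard count passed when selecting compaction targets, whether to use
// fast compaction, and the number of workers.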
type CompactorOptions struct {
	RequeueInterval   time.Duration
	RequeueLimit      int
	RequeueShardCount int
	RequeueFast       bool
	NumWorkers        int
}

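// DefaultCompactorOptions returns the default configuration: requeue every
// 4 hours with no limit, a shard count of 50, fast compaction, and 2 workers.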
func DefaultCompactorOptions() *CompactorOptions {
	return &CompactorOptions{
		RequeueInterval:   time.Hour * 4,
		RequeueLimit:      0,
		RequeueShardCount: 50,
		RequeueFast:       true,
		NumWorkers:        2,
	}
}

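// NewCompactor returns a Compactor configured from opts; a nil opts selects
// DefaultCompactorOptions. A typical lifecycle (a sketch; assumes a running
// *BGS value named bgs):
//
//	c := NewCompactor(nil)
//	c.Start(bgs)
//	defer c.Shutdown()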
func NewCompactor(opts *CompactorOptions) *Compactor {
	if opts == nil {
		opts = DefaultCompactorOptions()
	}

	return &Compactor{
		q: &uniQueue{
			members: make(map[models.Uid]struct{}),
		},
		exit:              make(chan struct{}),
		requeueInterval:   opts.RequeueInterval,
		requeueLimit:      opts.RequeueLimit,
		requeueFast:       opts.RequeueFast,
		requeueShardCount: opts.RequeueShardCount,
		numWorkers:        opts.NumWorkers,
	}
}

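// compactionStats aggregates a batch of compactions: per-user results keyed
// by uid, plus the targets that were selected.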
type compactionStats struct {
	Completed map[models.Uid]*carstore.CompactionStats
	Targets   []carstore.CompactionTarget
}

var errNoReposToCompact = fmt.Errorf("no repos to compact")

// Start starts the compactor
func (c *Compactor) Start(bgs *BGS) {
	log.Info("starting compactor")
	c.wg.Add(c.numWorkers)
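	// Alternate pop strategies across workers: even-indexed workers pop
	// in order, odd-indexed workers pop randomly.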
	for i := range c.numWorkers {
		strategy := NextInOrder
		if i%2 != 0 {
			strategy = NextRandom
		}
		go c.doWork(bgs, strategy)
	}
	if c.requeueInterval > 0 {
		go func() {
			log.Info("starting compactor requeue routine",
				"interval", c.requeueInterval,
				"limit", c.requeueLimit,
				"shardCount", c.requeueShardCount,
				"fast", c.requeueFast,
			)

			t := time.NewTicker(c.requeueInterval)
			defer t.Stop()
			for {
				select {
				case <-c.exit:
					return
				case <-t.C:
					ctx := context.Background()
					ctx, span := otel.Tracer("compactor").Start(ctx, "RequeueRoutine")
					if err := c.EnqueueAllRepos(ctx, bgs, c.requeueLimit, c.requeueShardCount, c.requeueFast); err != nil {
						log.Error("failed to enqueue all repos", "err", err)
					}
					span.End()
				}
			}
		}()
	}
}

// Shutdown shuts down the compactor
func (c *Compactor) Shutdown() {
	log.Info("stopping compactor")
	close(c.exit)
	c.wg.Wait()
	log.Info("compactor stopped")
}

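// doWork is the main loop for a single compaction worker: it repeatedly pops
// the next repo off the queue according to strategy and compacts it, until
// the compactor shuts down.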
func (c *Compactor) doWork(bgs *BGS, strategy NextStrategy) {
	defer c.wg.Done()
	for {
		select {
		case <-c.exit:
			log.Info("compactor worker exiting, no more active compactions running")
			return
		default:
		}

		ctx := context.Background()
		start := time.Now()
		state, err := c.compactNext(ctx, bgs, strategy)
		if err != nil {
			if err == errNoReposToCompact {
				log.Debug("no repos to compact, waiting and retrying")
				time.Sleep(time.Second * 5)
				continue
			}
			log.Error("failed to compact repo",
				"err", err,
				"uid", state.latestUID,
				"repo", state.latestDID,
				"status", state.status,
				"stats", state.stats,
				"duration", time.Since(start),
			)
			// Pause for a bit to avoid spamming failed compactions
			time.Sleep(time.Millisecond * 100)
		} else {
			log.Info("compacted repo",
				"uid", state.latestUID,
				"repo", state.latestDID,
				"status", state.status,
				"stats", state.stats,
				"duration", time.Since(start),
			)
		}
	}
}

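// NextStrategy selects how a worker picks the next repo from the queue:
// NextInOrder pops from the front, NextRandom pops a random element.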
type NextStrategy int

const (
	NextInOrder NextStrategy = iota
	NextRandom
)

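// compactNext pops one repo off the queue according to strategy, compacts
// its shards in the carstore, and returns the resulting state for logging.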
func (c *Compactor) compactNext(ctx context.Context, bgs *BGS, strategy NextStrategy) (CompactorState, error) {
	ctx, span := otel.Tracer("compactor").Start(ctx, "CompactNext")
	defer span.End()

	var item queueItem
	var ok bool
	switch strategy {
	case NextRandom:
		item, ok = c.q.PopRandom()
	default:
		item, ok = c.q.Pop()
	}
	if !ok {
		return CompactorState{}, errNoReposToCompact
	}

	state := CompactorState{
		latestUID: item.uid,
		latestDID: "unknown",
		status:    "getting_user",
	}

	user, err := bgs.lookupUserByUID(ctx, item.uid)
	if err != nil {
		span.RecordError(err)
		state.status = "failed_getting_user"
		return state, fmt.Errorf("failed to get user %d: %w", item.uid, err)
	}

	span.SetAttributes(attribute.String("repo", user.Did), attribute.Int("uid", int(item.uid)))

	state.latestDID = user.Did

	start := time.Now()
	st, err := bgs.repoman.CarStore().CompactUserShards(ctx, item.uid, item.fast)
	if err != nil {
		span.RecordError(err)
		state.status = "failed_compacting"
		return state, fmt.Errorf("failed to compact shards for user %d: %w", item.uid, err)
	}
	compactionDuration.Observe(time.Since(start).Seconds())

	span.SetAttributes(
		attribute.Int("shards.deleted", st.ShardsDeleted),
		attribute.Int("shards.new", st.NewShards),
		attribute.Int("dupes", st.DupeCount),
		attribute.Int("shards.skipped", st.SkippedShards),
		attribute.Int("refs", st.TotalRefs),
	)

	state.status = "done"
	state.stats = st

	return state, nil
}

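// EnqueueRepo queues a single user's repo for compaction.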
func (c *Compactor) EnqueueRepo(ctx context.Context, user *User, fast bool) {
	ctx, span := otel.Tracer("compactor").Start(ctx, "EnqueueRepo")
	defer span.End()
	log.Info("enqueueing compaction for repo", "repo", user.Did, "uid", user.ID, "fast", fast)
	c.q.Append(user.ID, fast)
}

// EnqueueAllRepos enqueues all repos for compaction
// lim is the maximum number of repos to enqueue (0 = no limit)
// shardCount is the number of shards to compact per user (0 = default of 20)
// fast is whether to use the fast compaction method (skip large shards)
func (c *Compactor) EnqueueAllRepos(ctx context.Context, bgs *BGS, lim int, shardCount int, fast bool) error {
	ctx, span := otel.Tracer("compactor").Start(ctx, "EnqueueAllRepos")
	defer span.End()

	span.SetAttributes(
		attribute.Int("lim", lim),
		attribute.Int("shardCount", shardCount),
		attribute.Bool("fast", fast),
	)

	if shardCount == 0 {
		shardCount = 20
	}

	span.SetAttributes(attribute.Int("clampedShardCount", shardCount))

	log := log.With("source", "compactor_enqueue_all_repos", "lim", lim, "shardCount", shardCount, "fast", fast)
	log.Info("enqueueing all repos")

	repos, err := bgs.repoman.CarStore().GetCompactionTargets(ctx, shardCount)
	if err != nil {
		return fmt.Errorf("failed to get repos to compact: %w", err)
	}

	span.SetAttributes(attribute.Int("repos", len(repos)))

	if lim > 0 && len(repos) > lim {
		repos = repos[:lim]
	}

	span.SetAttributes(attribute.Int("clampedRepos", len(repos)))

	for _, r := range repos {
		c.q.Append(r.Usr, fast)
	}

	log.Info("done enqueueing all repos", "repos_enqueued", len(repos))

	return nil
}