// Fork of indigo with slightly nicer lexgen.
// Branch: main · 9.5 kB · view raw
1package bgs 2 3import ( 4 "context" 5 "fmt" 6 "math/rand/v2" 7 "sync" 8 "time" 9 10 "github.com/bluesky-social/indigo/carstore" 11 "github.com/bluesky-social/indigo/models" 12 "go.opentelemetry.io/otel" 13 "go.opentelemetry.io/otel/attribute" 14) 15 16type queueItem struct { 17 uid models.Uid 18 fast bool 19} 20 21// uniQueue is a queue that only allows one instance of a given uid 22type uniQueue struct { 23 q []queueItem 24 members map[models.Uid]struct{} 25 lk sync.Mutex 26} 27 28// Append appends a uid to the end of the queue if it doesn't already exist 29func (q *uniQueue) Append(uid models.Uid, fast bool) { 30 q.lk.Lock() 31 defer q.lk.Unlock() 32 33 if _, ok := q.members[uid]; ok { 34 return 35 } 36 37 q.q = append(q.q, queueItem{uid: uid, fast: fast}) 38 q.members[uid] = struct{}{} 39 compactionQueueDepth.Inc() 40} 41 42// Prepend prepends a uid to the beginning of the queue if it doesn't already exist 43func (q *uniQueue) Prepend(uid models.Uid, fast bool) { 44 q.lk.Lock() 45 defer q.lk.Unlock() 46 47 if _, ok := q.members[uid]; ok { 48 return 49 } 50 51 q.q = append([]queueItem{{uid: uid, fast: fast}}, q.q...) 52 q.members[uid] = struct{}{} 53 compactionQueueDepth.Inc() 54} 55 56// Has returns true if the queue contains the given uid 57func (q *uniQueue) Has(uid models.Uid) bool { 58 q.lk.Lock() 59 defer q.lk.Unlock() 60 61 _, ok := q.members[uid] 62 return ok 63} 64 65// Remove removes the given uid from the queue 66func (q *uniQueue) Remove(uid models.Uid) { 67 q.lk.Lock() 68 defer q.lk.Unlock() 69 70 if _, ok := q.members[uid]; !ok { 71 return 72 } 73 74 for i, item := range q.q { 75 if item.uid == uid { 76 q.q = append(q.q[:i], q.q[i+1:]...) 
77 break 78 } 79 } 80 81 delete(q.members, uid) 82 compactionQueueDepth.Dec() 83} 84 85// Pop pops the first item off the front of the queue 86func (q *uniQueue) Pop() (queueItem, bool) { 87 q.lk.Lock() 88 defer q.lk.Unlock() 89 90 if len(q.q) == 0 { 91 return queueItem{}, false 92 } 93 94 item := q.q[0] 95 q.q = q.q[1:] 96 delete(q.members, item.uid) 97 98 compactionQueueDepth.Dec() 99 return item, true 100} 101 102// PopRandom pops a random item off the of the queue 103// Note: this disrupts the sorted order of the queue and in-order is no longer quite in-order. The randomly popped element is replaced with the last element. 104func (q *uniQueue) PopRandom() (queueItem, bool) { 105 q.lk.Lock() 106 defer q.lk.Unlock() 107 108 if len(q.q) == 0 { 109 return queueItem{}, false 110 } 111 112 var item queueItem 113 if len(q.q) == 1 { 114 item = q.q[0] 115 q.q = nil 116 } else { 117 pos := rand.IntN(len(q.q)) 118 item = q.q[pos] 119 last := len(q.q) - 1 120 q.q[pos] = q.q[last] 121 q.q = q.q[:last] 122 } 123 delete(q.members, item.uid) 124 125 compactionQueueDepth.Dec() 126 return item, true 127} 128 129type CompactorState struct { 130 latestUID models.Uid 131 latestDID string 132 status string 133 stats *carstore.CompactionStats 134} 135 136func (cstate *CompactorState) set(uid models.Uid, did, status string, stats *carstore.CompactionStats) { 137 cstate.latestUID = uid 138 cstate.latestDID = did 139 cstate.status = status 140 cstate.stats = stats 141} 142 143// Compactor is a compactor daemon that compacts repos in the background 144type Compactor struct { 145 q *uniQueue 146 stateLk sync.RWMutex 147 exit chan struct{} 148 requeueInterval time.Duration 149 requeueLimit int 150 requeueShardCount int 151 requeueFast bool 152 153 numWorkers int 154 wg sync.WaitGroup 155} 156 157type CompactorOptions struct { 158 RequeueInterval time.Duration 159 RequeueLimit int 160 RequeueShardCount int 161 RequeueFast bool 162 NumWorkers int 163} 164 165func DefaultCompactorOptions() 
*CompactorOptions { 166 return &CompactorOptions{ 167 RequeueInterval: time.Hour * 4, 168 RequeueLimit: 0, 169 RequeueShardCount: 50, 170 RequeueFast: true, 171 NumWorkers: 2, 172 } 173} 174 175func NewCompactor(opts *CompactorOptions) *Compactor { 176 if opts == nil { 177 opts = DefaultCompactorOptions() 178 } 179 180 return &Compactor{ 181 q: &uniQueue{ 182 members: make(map[models.Uid]struct{}), 183 }, 184 exit: make(chan struct{}), 185 requeueInterval: opts.RequeueInterval, 186 requeueLimit: opts.RequeueLimit, 187 requeueFast: opts.RequeueFast, 188 requeueShardCount: opts.RequeueShardCount, 189 numWorkers: opts.NumWorkers, 190 } 191} 192 193type compactionStats struct { 194 Completed map[models.Uid]*carstore.CompactionStats 195 Targets []carstore.CompactionTarget 196} 197 198var errNoReposToCompact = fmt.Errorf("no repos to compact") 199 200// Start starts the compactor 201func (c *Compactor) Start(bgs *BGS) { 202 log.Info("starting compactor") 203 c.wg.Add(c.numWorkers) 204 for i := range c.numWorkers { 205 strategy := NextInOrder 206 if i%2 != 0 { 207 strategy = NextRandom 208 } 209 go c.doWork(bgs, strategy) 210 } 211 if c.requeueInterval > 0 { 212 go func() { 213 log.Info("starting compactor requeue routine", 214 "interval", c.requeueInterval, 215 "limit", c.requeueLimit, 216 "shardCount", c.requeueShardCount, 217 "fast", c.requeueFast, 218 ) 219 220 t := time.NewTicker(c.requeueInterval) 221 for { 222 select { 223 case <-c.exit: 224 return 225 case <-t.C: 226 ctx := context.Background() 227 ctx, span := otel.Tracer("compactor").Start(ctx, "RequeueRoutine") 228 if err := c.EnqueueAllRepos(ctx, bgs, c.requeueLimit, c.requeueShardCount, c.requeueFast); err != nil { 229 log.Error("failed to enqueue all repos", "err", err) 230 } 231 span.End() 232 } 233 } 234 }() 235 } 236} 237 238// Shutdown shuts down the compactor 239func (c *Compactor) Shutdown() { 240 log.Info("stopping compactor") 241 close(c.exit) 242 c.wg.Wait() 243 log.Info("compactor stopped") 244} 
245 246func (c *Compactor) doWork(bgs *BGS, strategy NextStrategy) { 247 defer c.wg.Done() 248 for { 249 select { 250 case <-c.exit: 251 log.Info("compactor worker exiting, no more active compactions running") 252 return 253 default: 254 } 255 256 ctx := context.Background() 257 start := time.Now() 258 state, err := c.compactNext(ctx, bgs, strategy) 259 if err != nil { 260 if err == errNoReposToCompact { 261 log.Debug("no repos to compact, waiting and retrying") 262 time.Sleep(time.Second * 5) 263 continue 264 } 265 log.Error("failed to compact repo", 266 "err", err, 267 "uid", state.latestUID, 268 "repo", state.latestDID, 269 "status", state.status, 270 "stats", state.stats, 271 "duration", time.Since(start), 272 ) 273 // Pause for a bit to avoid spamming failed compactions 274 time.Sleep(time.Millisecond * 100) 275 } else { 276 log.Info("compacted repo", 277 "uid", state.latestUID, 278 "repo", state.latestDID, 279 "status", state.status, 280 "stats", state.stats, 281 "duration", time.Since(start), 282 ) 283 } 284 } 285} 286 287type NextStrategy int 288 289const ( 290 NextInOrder NextStrategy = iota 291 NextRandom 292) 293 294func (c *Compactor) compactNext(ctx context.Context, bgs *BGS, strategy NextStrategy) (CompactorState, error) { 295 ctx, span := otel.Tracer("compactor").Start(ctx, "CompactNext") 296 defer span.End() 297 298 var item queueItem 299 var ok bool 300 switch strategy { 301 case NextRandom: 302 item, ok = c.q.PopRandom() 303 default: 304 item, ok = c.q.Pop() 305 } 306 if !ok { 307 return CompactorState{}, errNoReposToCompact 308 } 309 310 state := CompactorState{ 311 latestUID: item.uid, 312 latestDID: "unknown", 313 status: "getting_user", 314 } 315 316 user, err := bgs.lookupUserByUID(ctx, item.uid) 317 if err != nil { 318 span.RecordError(err) 319 state.status = "failed_getting_user" 320 err := fmt.Errorf("failed to get user %d: %w", item.uid, err) 321 return state, err 322 } 323 324 span.SetAttributes(attribute.String("repo", user.Did), 
attribute.Int("uid", int(item.uid))) 325 326 state.latestDID = user.Did 327 328 start := time.Now() 329 st, err := bgs.repoman.CarStore().CompactUserShards(ctx, item.uid, item.fast) 330 if err != nil { 331 span.RecordError(err) 332 state.status = "failed_compacting" 333 err := fmt.Errorf("failed to compact shards for user %d: %w", item.uid, err) 334 return state, err 335 } 336 compactionDuration.Observe(time.Since(start).Seconds()) 337 338 span.SetAttributes( 339 attribute.Int("shards.deleted", st.ShardsDeleted), 340 attribute.Int("shards.new", st.NewShards), 341 attribute.Int("dupes", st.DupeCount), 342 attribute.Int("shards.skipped", st.SkippedShards), 343 attribute.Int("refs", st.TotalRefs), 344 ) 345 346 state.status = "done" 347 state.stats = st 348 349 return state, nil 350} 351 352func (c *Compactor) EnqueueRepo(ctx context.Context, user *User, fast bool) { 353 ctx, span := otel.Tracer("compactor").Start(ctx, "EnqueueRepo") 354 defer span.End() 355 log.Info("enqueueing compaction for repo", "repo", user.Did, "uid", user.ID, "fast", fast) 356 c.q.Append(user.ID, fast) 357} 358 359// EnqueueAllRepos enqueues all repos for compaction 360// lim is the maximum number of repos to enqueue 361// shardCount is the number of shards to compact per user (0 = default of 50) 362// fast is whether to use the fast compaction method (skip large shards) 363func (c *Compactor) EnqueueAllRepos(ctx context.Context, bgs *BGS, lim int, shardCount int, fast bool) error { 364 ctx, span := otel.Tracer("compactor").Start(ctx, "EnqueueAllRepos") 365 defer span.End() 366 367 span.SetAttributes( 368 attribute.Int("lim", lim), 369 attribute.Int("shardCount", shardCount), 370 attribute.Bool("fast", fast), 371 ) 372 373 if shardCount == 0 { 374 shardCount = 20 375 } 376 377 span.SetAttributes(attribute.Int("clampedShardCount", shardCount)) 378 379 log := log.With("source", "compactor_enqueue_all_repos", "lim", lim, "shardCount", shardCount, "fast", fast) 380 log.Info("enqueueing all repos") 
381 382 repos, err := bgs.repoman.CarStore().GetCompactionTargets(ctx, shardCount) 383 if err != nil { 384 return fmt.Errorf("failed to get repos to compact: %w", err) 385 } 386 387 span.SetAttributes(attribute.Int("repos", len(repos))) 388 389 if lim > 0 && len(repos) > lim { 390 repos = repos[:lim] 391 } 392 393 span.SetAttributes(attribute.Int("clampedRepos", len(repos))) 394 395 for _, r := range repos { 396 c.q.Append(r.Usr, fast) 397 } 398 399 log.Info("done enqueueing all repos", "repos_enqueued", len(repos)) 400 401 return nil 402}