porting all github actions from bluesky-social/indigo to tangled CI
at main 9.0 kB view raw
1package backfill 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "strings" 8 "sync" 9 "time" 10 11 "github.com/bluesky-social/indigo/repomgr" 12 "github.com/ipfs/go-cid" 13 "gorm.io/gorm" 14) 15 16type Gormjob struct { 17 repo string 18 state string 19 rev string 20 21 lk sync.Mutex 22 bufferedOps []*opSet 23 24 dbj *GormDBJob 25 db *gorm.DB 26 27 createdAt time.Time 28 updatedAt time.Time 29 30 retryCount int 31 retryAfter *time.Time 32} 33 34type GormDBJob struct { 35 gorm.Model 36 Repo string `gorm:"unique;index"` 37 State string `gorm:"index:enqueued_job_idx,where:state = 'enqueued';index:retryable_job_idx,where:state like 'failed%'"` 38 Rev string 39 RetryCount int 40 RetryAfter *time.Time `gorm:"index:retryable_job_idx,sort:desc"` 41} 42 43// Gormstore is a gorm-backed implementation of the Backfill Store interface 44type Gormstore struct { 45 lk sync.RWMutex 46 jobs map[string]*Gormjob 47 48 qlk sync.Mutex 49 taskQueue []string 50 51 db *gorm.DB 52} 53 54func NewGormstore(db *gorm.DB) *Gormstore { 55 return &Gormstore{ 56 jobs: make(map[string]*Gormjob), 57 db: db, 58 } 59} 60 61func (s *Gormstore) LoadJobs(ctx context.Context) error { 62 s.qlk.Lock() 63 defer s.qlk.Unlock() 64 return s.loadJobs(ctx, 20_000) 65} 66 67func (s *Gormstore) loadJobs(ctx context.Context, limit int) error { 68 enqueuedIndexClause := "" 69 retryableIndexClause := "" 70 71 // If the DB is a SQLite DB, we can use INDEXED BY to speed up the query 72 if s.db.Dialector.Name() == "sqlite" { 73 enqueuedIndexClause = "INDEXED BY enqueued_job_idx" 74 retryableIndexClause = "INDEXED BY retryable_job_idx" 75 } 76 77 enqueuedSelect := fmt.Sprintf(`SELECT repo FROM gorm_db_jobs %s WHERE state = 'enqueued' LIMIT ?`, enqueuedIndexClause) 78 retryableSelect := fmt.Sprintf(`SELECT repo FROM gorm_db_jobs %s WHERE state like 'failed%%' AND (retry_after = NULL OR retry_after < ?) LIMIT ?`, retryableIndexClause) 79 80 var todo []string 81 if err := s.db.Raw(enqueuedSelect, limit).Scan(&todo).Error; err != nil { 82 return err 83 } 84 85 if len(todo) < limit { 86 var moreTodo []string 87 if err := s.db.Raw(retryableSelect, time.Now(), limit-len(todo)).Scan(&moreTodo).Error; err != nil { 88 return err 89 } 90 todo = append(todo, moreTodo...) 91 } 92 93 s.taskQueue = append(s.taskQueue, todo...) 94 95 return nil 96} 97 98func (s *Gormstore) GetOrCreateJob(ctx context.Context, repo, state string) (Job, error) { 99 j, err := s.getJob(ctx, repo) 100 if err == nil { 101 return j, nil 102 } 103 104 if !errors.Is(err, ErrJobNotFound) { 105 return nil, err 106 } 107 108 if err := s.createJobForRepo(repo, state); err != nil { 109 return nil, err 110 } 111 112 return s.getJob(ctx, repo) 113} 114 115func (s *Gormstore) EnqueueJob(ctx context.Context, repo string) error { 116 _, err := s.GetOrCreateJob(ctx, repo, StateEnqueued) 117 if err != nil { 118 return err 119 } 120 121 s.qlk.Lock() 122 s.taskQueue = append(s.taskQueue, repo) 123 s.qlk.Unlock() 124 125 return nil 126} 127 128func (s *Gormstore) EnqueueJobWithState(ctx context.Context, repo, state string) error { 129 _, err := s.GetOrCreateJob(ctx, repo, state) 130 if err != nil { 131 return err 132 } 133 134 s.qlk.Lock() 135 s.taskQueue = append(s.taskQueue, repo) 136 s.qlk.Unlock() 137 138 return nil 139} 140 141func (s *Gormstore) createJobForRepo(repo, state string) error { 142 dbj := &GormDBJob{ 143 Repo: repo, 144 State: state, 145 } 146 if err := s.db.Create(dbj).Error; err != nil { 147 if err == gorm.ErrDuplicatedKey { 148 return nil 149 } 150 return err 151 } 152 153 s.lk.Lock() 154 defer s.lk.Unlock() 155 156 // Convert it to an in-memory job 157 if _, ok := s.jobs[repo]; ok { 158 // The DB create should have errored if the job already existed, but just in case 159 return fmt.Errorf("job already exists for repo %s", repo) 160 } 161 162 j := &Gormjob{ 163 repo: repo, 164 createdAt: time.Now(), 165 updatedAt: time.Now(), 166 state: state, 167 168 dbj: dbj, 169 db: s.db, 170 } 171 s.jobs[repo] = j 172 173 return nil 174} 175 176func (j *Gormjob) BufferOps(ctx context.Context, since *string, rev string, ops []*BufferedOp) (bool, error) { 177 j.lk.Lock() 178 defer j.lk.Unlock() 179 180 switch j.state { 181 case StateComplete: 182 return false, nil 183 case StateInProgress, StateEnqueued: 184 // keep going and buffer the op 185 default: 186 if strings.HasPrefix(j.state, "failed") { 187 if j.retryCount >= MaxRetries { 188 // Process immediately since we're out of retries 189 return false, nil 190 } 191 // Don't buffer the op since it'll get caught in the next retry (hopefully) 192 return true, nil 193 } 194 return false, fmt.Errorf("invalid job state: %q", j.state) 195 } 196 197 if j.rev >= rev || (since == nil && j.rev != "") { 198 // we've already accounted for this event 199 return false, ErrAlreadyProcessed 200 } 201 202 j.bufferOps(&opSet{since: since, rev: rev, ops: ops}) 203 return true, nil 204} 205 206func (j *Gormjob) bufferOps(ops *opSet) { 207 j.bufferedOps = append(j.bufferedOps, ops) 208 j.updatedAt = time.Now() 209} 210 211func (s *Gormstore) GetJob(ctx context.Context, repo string) (Job, error) { 212 return s.getJob(ctx, repo) 213} 214 215func (s *Gormstore) getJob(ctx context.Context, repo string) (*Gormjob, error) { 216 cj := s.checkJobCache(ctx, repo) 217 if cj != nil { 218 return cj, nil 219 } 220 221 return s.loadJob(ctx, repo) 222} 223 224func (s *Gormstore) loadJob(ctx context.Context, repo string) (*Gormjob, error) { 225 var dbj GormDBJob 226 if err := s.db.Find(&dbj, "repo = ?", repo).Error; err != nil { 227 return nil, err 228 } 229 230 if dbj.ID == 0 { 231 return nil, ErrJobNotFound 232 } 233 234 j := &Gormjob{ 235 repo: dbj.Repo, 236 state: dbj.State, 237 createdAt: dbj.CreatedAt, 238 updatedAt: dbj.UpdatedAt, 239 240 dbj: &dbj, 241 db: s.db, 242 243 retryCount: dbj.RetryCount, 244 retryAfter: dbj.RetryAfter, 245 } 246 s.lk.Lock() 247 defer s.lk.Unlock() 248 // would imply a race condition 249 exist, ok := s.jobs[repo] 250 if ok { 251 return exist, nil 252 } 253 s.jobs[repo] = j 254 return j, nil 255} 256 257func (s *Gormstore) checkJobCache(ctx context.Context, repo string) *Gormjob { 258 s.lk.RLock() 259 defer s.lk.RUnlock() 260 261 j, ok := s.jobs[repo] 262 if !ok || j == nil { 263 return nil 264 } 265 return j 266} 267 268func (s *Gormstore) GetNextEnqueuedJob(ctx context.Context) (Job, error) { 269 s.qlk.Lock() 270 defer s.qlk.Unlock() 271 if len(s.taskQueue) == 0 { 272 if err := s.loadJobs(ctx, 1000); err != nil { 273 return nil, err 274 } 275 276 if len(s.taskQueue) == 0 { 277 return nil, nil 278 } 279 } 280 281 for len(s.taskQueue) > 0 { 282 first := s.taskQueue[0] 283 s.taskQueue = s.taskQueue[1:] 284 285 j, err := s.getJob(ctx, first) 286 if err != nil { 287 return nil, err 288 } 289 290 shouldRetry := strings.HasPrefix(j.State(), "failed") && j.retryAfter != nil && time.Now().After(*j.retryAfter) 291 292 if j.State() == StateEnqueued || shouldRetry { 293 return j, nil 294 } 295 } 296 return nil, nil 297} 298 299func (j *Gormjob) Repo() string { 300 return j.repo 301} 302 303func (j *Gormjob) State() string { 304 j.lk.Lock() 305 defer j.lk.Unlock() 306 307 return j.state 308} 309 310func (j *Gormjob) SetRev(ctx context.Context, r string) error { 311 j.lk.Lock() 312 defer j.lk.Unlock() 313 314 j.rev = r 315 j.updatedAt = time.Now() 316 317 j.dbj.Rev = r 318 j.dbj.UpdatedAt = j.updatedAt 319 320 // Persist the job to the database 321 322 return j.db.Save(j.dbj).Error 323} 324 325func (j *Gormjob) Rev() string { 326 j.lk.Lock() 327 defer j.lk.Unlock() 328 329 return j.rev 330} 331 332func (j *Gormjob) SetState(ctx context.Context, state string) error { 333 j.lk.Lock() 334 defer j.lk.Unlock() 335 336 j.state = state 337 j.updatedAt = time.Now() 338 339 if strings.HasPrefix(state, "failed") { 340 if j.retryCount < MaxRetries { 341 next := time.Now().Add(computeExponentialBackoff(j.retryCount)) 342 j.retryAfter = &next 343 j.retryCount++ 344 } else { 345 j.retryAfter = nil 346 } 347 } 348 349 // Persist the job to the database 350 j.dbj.State = state 351 return j.db.Save(j.dbj).Error 352} 353 354func (j *Gormjob) FlushBufferedOps(ctx context.Context, fn func(kind repomgr.EventKind, rev, path string, rec *[]byte, cid *cid.Cid) error) error { 355 // TODO: this will block any events for this repo while this flush is ongoing, is that okay? 356 j.lk.Lock() 357 defer j.lk.Unlock() 358 359 for _, opset := range j.bufferedOps { 360 if opset.rev <= j.rev { 361 // stale events, skip 362 continue 363 } 364 365 if opset.since == nil { 366 // The first event for a repo may have a nil since 367 // We should process it only if the rev is empty, skip otherwise 368 if j.rev != "" { 369 continue 370 } 371 } 372 373 if j.rev > *opset.since { 374 // we've already accounted for this event 375 continue 376 } 377 378 if j.rev != *opset.since { 379 // we've got a discontinuity 380 return fmt.Errorf("event since did not match current rev (%s != %s): %w", *opset.since, j.rev, ErrEventGap) 381 } 382 383 for _, op := range opset.ops { 384 if err := fn(op.Kind, opset.rev, op.Path, op.Record, op.Cid); err != nil { 385 return err 386 } 387 } 388 389 j.rev = opset.rev 390 } 391 392 j.bufferedOps = []*opSet{} 393 j.state = StateComplete 394 395 return nil 396} 397 398func (j *Gormjob) ClearBufferedOps(ctx context.Context) error { 399 j.lk.Lock() 400 defer j.lk.Unlock() 401 402 j.bufferedOps = []*opSet{} 403 j.updatedAt = time.Now() 404 return nil 405} 406 407func (j *Gormjob) RetryCount() int { 408 j.lk.Lock() 409 defer j.lk.Unlock() 410 return j.retryCount 411} 412 413func (s *Gormstore) UpdateRev(ctx context.Context, repo, rev string) error { 414 j, err := s.GetJob(ctx, repo) 415 if err != nil { 416 return err 417 } 418 419 return j.SetRev(ctx, rev) 420} 421 422func (s *Gormstore) PurgeRepo(ctx context.Context, repo string) error { 423 if err := s.db.Exec("DELETE FROM gorm_db_jobs WHERE repo = ?", repo).Error; err != nil { 424 return err 425 } 426 427 s.lk.Lock() 428 defer s.lk.Unlock() 429 delete(s.jobs, repo) 430 431 return nil 432}