1package backfill
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "strings"
8 "sync"
9 "time"
10
11 "github.com/bluesky-social/indigo/repomgr"
12 "github.com/ipfs/go-cid"
13 "gorm.io/gorm"
14)
15
16type Gormjob struct {
17 repo string
18 state string
19 rev string
20
21 lk sync.Mutex
22 bufferedOps []*opSet
23
24 dbj *GormDBJob
25 db *gorm.DB
26
27 createdAt time.Time
28 updatedAt time.Time
29
30 retryCount int
31 retryAfter *time.Time
32}
33
34type GormDBJob struct {
35 gorm.Model
36 Repo string `gorm:"unique;index"`
37 State string `gorm:"index:enqueued_job_idx,where:state = 'enqueued';index:retryable_job_idx,where:state like 'failed%'"`
38 Rev string
39 RetryCount int
40 RetryAfter *time.Time `gorm:"index:retryable_job_idx,sort:desc"`
41}
42
43// Gormstore is a gorm-backed implementation of the Backfill Store interface
44type Gormstore struct {
45 lk sync.RWMutex
46 jobs map[string]*Gormjob
47
48 qlk sync.Mutex
49 taskQueue []string
50
51 db *gorm.DB
52}
53
54func NewGormstore(db *gorm.DB) *Gormstore {
55 return &Gormstore{
56 jobs: make(map[string]*Gormjob),
57 db: db,
58 }
59}
60
61func (s *Gormstore) LoadJobs(ctx context.Context) error {
62 s.qlk.Lock()
63 defer s.qlk.Unlock()
64 return s.loadJobs(ctx, 20_000)
65}
66
67func (s *Gormstore) loadJobs(ctx context.Context, limit int) error {
68 enqueuedIndexClause := ""
69 retryableIndexClause := ""
70
71 // If the DB is a SQLite DB, we can use INDEXED BY to speed up the query
72 if s.db.Dialector.Name() == "sqlite" {
73 enqueuedIndexClause = "INDEXED BY enqueued_job_idx"
74 retryableIndexClause = "INDEXED BY retryable_job_idx"
75 }
76
77 enqueuedSelect := fmt.Sprintf(`SELECT repo FROM gorm_db_jobs %s WHERE state = 'enqueued' LIMIT ?`, enqueuedIndexClause)
78 retryableSelect := fmt.Sprintf(`SELECT repo FROM gorm_db_jobs %s WHERE state like 'failed%%' AND (retry_after = NULL OR retry_after < ?) LIMIT ?`, retryableIndexClause)
79
80 var todo []string
81 if err := s.db.Raw(enqueuedSelect, limit).Scan(&todo).Error; err != nil {
82 return err
83 }
84
85 if len(todo) < limit {
86 var moreTodo []string
87 if err := s.db.Raw(retryableSelect, time.Now(), limit-len(todo)).Scan(&moreTodo).Error; err != nil {
88 return err
89 }
90 todo = append(todo, moreTodo...)
91 }
92
93 s.taskQueue = append(s.taskQueue, todo...)
94
95 return nil
96}
97
98func (s *Gormstore) GetOrCreateJob(ctx context.Context, repo, state string) (Job, error) {
99 j, err := s.getJob(ctx, repo)
100 if err == nil {
101 return j, nil
102 }
103
104 if !errors.Is(err, ErrJobNotFound) {
105 return nil, err
106 }
107
108 if err := s.createJobForRepo(repo, state); err != nil {
109 return nil, err
110 }
111
112 return s.getJob(ctx, repo)
113}
114
115func (s *Gormstore) EnqueueJob(ctx context.Context, repo string) error {
116 _, err := s.GetOrCreateJob(ctx, repo, StateEnqueued)
117 if err != nil {
118 return err
119 }
120
121 s.qlk.Lock()
122 s.taskQueue = append(s.taskQueue, repo)
123 s.qlk.Unlock()
124
125 return nil
126}
127
128func (s *Gormstore) EnqueueJobWithState(ctx context.Context, repo, state string) error {
129 _, err := s.GetOrCreateJob(ctx, repo, state)
130 if err != nil {
131 return err
132 }
133
134 s.qlk.Lock()
135 s.taskQueue = append(s.taskQueue, repo)
136 s.qlk.Unlock()
137
138 return nil
139}
140
141func (s *Gormstore) createJobForRepo(repo, state string) error {
142 dbj := &GormDBJob{
143 Repo: repo,
144 State: state,
145 }
146 if err := s.db.Create(dbj).Error; err != nil {
147 if err == gorm.ErrDuplicatedKey {
148 return nil
149 }
150 return err
151 }
152
153 s.lk.Lock()
154 defer s.lk.Unlock()
155
156 // Convert it to an in-memory job
157 if _, ok := s.jobs[repo]; ok {
158 // The DB create should have errored if the job already existed, but just in case
159 return fmt.Errorf("job already exists for repo %s", repo)
160 }
161
162 j := &Gormjob{
163 repo: repo,
164 createdAt: time.Now(),
165 updatedAt: time.Now(),
166 state: state,
167
168 dbj: dbj,
169 db: s.db,
170 }
171 s.jobs[repo] = j
172
173 return nil
174}
175
176func (j *Gormjob) BufferOps(ctx context.Context, since *string, rev string, ops []*BufferedOp) (bool, error) {
177 j.lk.Lock()
178 defer j.lk.Unlock()
179
180 switch j.state {
181 case StateComplete:
182 return false, nil
183 case StateInProgress, StateEnqueued:
184 // keep going and buffer the op
185 default:
186 if strings.HasPrefix(j.state, "failed") {
187 if j.retryCount >= MaxRetries {
188 // Process immediately since we're out of retries
189 return false, nil
190 }
191 // Don't buffer the op since it'll get caught in the next retry (hopefully)
192 return true, nil
193 }
194 return false, fmt.Errorf("invalid job state: %q", j.state)
195 }
196
197 if j.rev >= rev || (since == nil && j.rev != "") {
198 // we've already accounted for this event
199 return false, ErrAlreadyProcessed
200 }
201
202 j.bufferOps(&opSet{since: since, rev: rev, ops: ops})
203 return true, nil
204}
205
206func (j *Gormjob) bufferOps(ops *opSet) {
207 j.bufferedOps = append(j.bufferedOps, ops)
208 j.updatedAt = time.Now()
209}
210
211func (s *Gormstore) GetJob(ctx context.Context, repo string) (Job, error) {
212 return s.getJob(ctx, repo)
213}
214
215func (s *Gormstore) getJob(ctx context.Context, repo string) (*Gormjob, error) {
216 cj := s.checkJobCache(ctx, repo)
217 if cj != nil {
218 return cj, nil
219 }
220
221 return s.loadJob(ctx, repo)
222}
223
224func (s *Gormstore) loadJob(ctx context.Context, repo string) (*Gormjob, error) {
225 var dbj GormDBJob
226 if err := s.db.Find(&dbj, "repo = ?", repo).Error; err != nil {
227 return nil, err
228 }
229
230 if dbj.ID == 0 {
231 return nil, ErrJobNotFound
232 }
233
234 j := &Gormjob{
235 repo: dbj.Repo,
236 state: dbj.State,
237 createdAt: dbj.CreatedAt,
238 updatedAt: dbj.UpdatedAt,
239
240 dbj: &dbj,
241 db: s.db,
242
243 retryCount: dbj.RetryCount,
244 retryAfter: dbj.RetryAfter,
245 }
246 s.lk.Lock()
247 defer s.lk.Unlock()
248 // would imply a race condition
249 exist, ok := s.jobs[repo]
250 if ok {
251 return exist, nil
252 }
253 s.jobs[repo] = j
254 return j, nil
255}
256
257func (s *Gormstore) checkJobCache(ctx context.Context, repo string) *Gormjob {
258 s.lk.RLock()
259 defer s.lk.RUnlock()
260
261 j, ok := s.jobs[repo]
262 if !ok || j == nil {
263 return nil
264 }
265 return j
266}
267
268func (s *Gormstore) GetNextEnqueuedJob(ctx context.Context) (Job, error) {
269 s.qlk.Lock()
270 defer s.qlk.Unlock()
271 if len(s.taskQueue) == 0 {
272 if err := s.loadJobs(ctx, 1000); err != nil {
273 return nil, err
274 }
275
276 if len(s.taskQueue) == 0 {
277 return nil, nil
278 }
279 }
280
281 for len(s.taskQueue) > 0 {
282 first := s.taskQueue[0]
283 s.taskQueue = s.taskQueue[1:]
284
285 j, err := s.getJob(ctx, first)
286 if err != nil {
287 return nil, err
288 }
289
290 shouldRetry := strings.HasPrefix(j.State(), "failed") && j.retryAfter != nil && time.Now().After(*j.retryAfter)
291
292 if j.State() == StateEnqueued || shouldRetry {
293 return j, nil
294 }
295 }
296 return nil, nil
297}
298
299func (j *Gormjob) Repo() string {
300 return j.repo
301}
302
303func (j *Gormjob) State() string {
304 j.lk.Lock()
305 defer j.lk.Unlock()
306
307 return j.state
308}
309
310func (j *Gormjob) SetRev(ctx context.Context, r string) error {
311 j.lk.Lock()
312 defer j.lk.Unlock()
313
314 j.rev = r
315 j.updatedAt = time.Now()
316
317 j.dbj.Rev = r
318 j.dbj.UpdatedAt = j.updatedAt
319
320 // Persist the job to the database
321
322 return j.db.Save(j.dbj).Error
323}
324
325func (j *Gormjob) Rev() string {
326 j.lk.Lock()
327 defer j.lk.Unlock()
328
329 return j.rev
330}
331
332func (j *Gormjob) SetState(ctx context.Context, state string) error {
333 j.lk.Lock()
334 defer j.lk.Unlock()
335
336 j.state = state
337 j.updatedAt = time.Now()
338
339 if strings.HasPrefix(state, "failed") {
340 if j.retryCount < MaxRetries {
341 next := time.Now().Add(computeExponentialBackoff(j.retryCount))
342 j.retryAfter = &next
343 j.retryCount++
344 } else {
345 j.retryAfter = nil
346 }
347 }
348
349 // Persist the job to the database
350 j.dbj.State = state
351 return j.db.Save(j.dbj).Error
352}
353
354func (j *Gormjob) FlushBufferedOps(ctx context.Context, fn func(kind repomgr.EventKind, rev, path string, rec *[]byte, cid *cid.Cid) error) error {
355 // TODO: this will block any events for this repo while this flush is ongoing, is that okay?
356 j.lk.Lock()
357 defer j.lk.Unlock()
358
359 for _, opset := range j.bufferedOps {
360 if opset.rev <= j.rev {
361 // stale events, skip
362 continue
363 }
364
365 if opset.since == nil {
366 // The first event for a repo may have a nil since
367 // We should process it only if the rev is empty, skip otherwise
368 if j.rev != "" {
369 continue
370 }
371 }
372
373 if j.rev > *opset.since {
374 // we've already accounted for this event
375 continue
376 }
377
378 if j.rev != *opset.since {
379 // we've got a discontinuity
380 return fmt.Errorf("event since did not match current rev (%s != %s): %w", *opset.since, j.rev, ErrEventGap)
381 }
382
383 for _, op := range opset.ops {
384 if err := fn(op.Kind, opset.rev, op.Path, op.Record, op.Cid); err != nil {
385 return err
386 }
387 }
388
389 j.rev = opset.rev
390 }
391
392 j.bufferedOps = []*opSet{}
393 j.state = StateComplete
394
395 return nil
396}
397
398func (j *Gormjob) ClearBufferedOps(ctx context.Context) error {
399 j.lk.Lock()
400 defer j.lk.Unlock()
401
402 j.bufferedOps = []*opSet{}
403 j.updatedAt = time.Now()
404 return nil
405}
406
407func (j *Gormjob) RetryCount() int {
408 j.lk.Lock()
409 defer j.lk.Unlock()
410 return j.retryCount
411}
412
413func (s *Gormstore) UpdateRev(ctx context.Context, repo, rev string) error {
414 j, err := s.GetJob(ctx, repo)
415 if err != nil {
416 return err
417 }
418
419 return j.SetRev(ctx, rev)
420}
421
422func (s *Gormstore) PurgeRepo(ctx context.Context, repo string) error {
423 if err := s.db.Exec("DELETE FROM gorm_db_jobs WHERE repo = ?", repo).Error; err != nil {
424 return err
425 }
426
427 s.lk.Lock()
428 defer s.lk.Unlock()
429 delete(s.jobs, repo)
430
431 return nil
432}