1package backfill
2
import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"log/slog"
	"net/http"
	"net/url"
	"strings"
	"sync"
	"time"

	"github.com/bluesky-social/indigo/api/atproto"
	"github.com/bluesky-social/indigo/atproto/identity"
	"github.com/bluesky-social/indigo/atproto/syntax"
	"github.com/bluesky-social/indigo/repo"
	"github.com/bluesky-social/indigo/repomgr"

	"github.com/ipfs/go-cid"
	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
	"go.opentelemetry.io/otel"
	"golang.org/x/sync/semaphore"
	"golang.org/x/time/rate"
)
26
27// Job is an interface for a backfill job
28type Job interface {
29 Repo() string
30 State() string
31 Rev() string
32 SetState(ctx context.Context, state string) error
33 SetRev(ctx context.Context, rev string) error
34 RetryCount() int
35
36 // BufferOps buffers the given operations and returns true if the operations
37 // were buffered.
38 // The given operations move the repo from since to rev.
39 BufferOps(ctx context.Context, since *string, rev string, ops []*BufferedOp) (bool, error)
40 // FlushBufferedOps calls the given callback for each buffered operation
41 // Once done it clears the buffer and marks the job as "complete"
42 // Allowing the Job interface to abstract away the details of how buffered
43 // operations are stored and/or locked
44 FlushBufferedOps(ctx context.Context, cb func(kind repomgr.EventKind, rev, path string, rec *[]byte, cid *cid.Cid) error) error
45
46 ClearBufferedOps(ctx context.Context) error
47}
48
49// Store is an interface for a backfill store which holds Jobs
50type Store interface {
51 // BufferOp buffers an operation for a job and returns true if the operation was buffered
52 // If the operation was not buffered, it returns false and an error (ErrJobNotFound or ErrJobComplete)
53 GetJob(ctx context.Context, repo string) (Job, error)
54 GetNextEnqueuedJob(ctx context.Context) (Job, error)
55 UpdateRev(ctx context.Context, repo, rev string) error
56
57 EnqueueJob(ctx context.Context, repo string) error
58 EnqueueJobWithState(ctx context.Context, repo string, state string) error
59
60 PurgeRepo(ctx context.Context, repo string) error
61}
62
63// Backfiller is a struct which handles backfilling a repo
64type Backfiller struct {
65 Name string
66 HandleCreateRecord func(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error
67 HandleUpdateRecord func(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error
68 HandleDeleteRecord func(ctx context.Context, repo string, rev string, path string) error
69 Store Store
70
71 // Number of backfills to process in parallel
72 ParallelBackfills int
73 // Number of records to process in parallel for each backfill
74 ParallelRecordCreates int
75 // Prefix match for records to backfill i.e. app.bsky.feed.app/
76 // If empty, all records will be backfilled
77 NSIDFilter string
78 RelayHost string
79
80 syncLimiter *rate.Limiter
81
82 magicHeaderKey string
83 magicHeaderVal string
84
85 tryRelayRepoFetch bool
86
87 stop chan chan struct{}
88
89 Directory identity.Directory
90}
91
// StateEnqueued is the state a backfill job starts in, waiting to be picked up.
var StateEnqueued = "enqueued"

// StateInProgress is the state of a backfill job while a worker is backfilling it.
var StateInProgress = "in_progress"

// StateComplete is the state of a backfill job whose backfill has finished.
var StateComplete = "complete"
100
// Sentinel errors reported by Jobs and the Backfiller. Compare against them
// with errors.Is.
var (
	// ErrJobComplete is returned when trying to buffer an op for a job that is complete.
	ErrJobComplete = errors.New("job is complete")

	// ErrJobNotFound is returned when trying to buffer an op for a job that doesn't exist.
	ErrJobNotFound = errors.New("job not found")

	// ErrEventGap is returned when an event is received with a since that
	// doesn't match the current rev.
	ErrEventGap = errors.New("buffered event revs did not line up")

	// ErrAlreadyProcessed is returned when attempting to buffer an event that has
	// already been accounted for (rev older than current).
	ErrAlreadyProcessed = errors.New("event already accounted for")
)
112
113var tracer = otel.Tracer("backfiller")
114
// BackfillOptions configures a Backfiller created by NewBackfiller.
type BackfillOptions struct {
	// ParallelBackfills is the maximum number of repos backfilled at once.
	ParallelBackfills int
	// ParallelRecordCreates is the number of workers feeding records to
	// HandleCreateRecord within a single repo backfill.
	ParallelRecordCreates int
	// NSIDFilter, if non-empty, limits backfilled records to those matching
	// this prefix (e.g. "app.bsky.feed.post/").
	NSIDFilter string
	// SyncRequestsPerSecond rate-limits getRepo requests (with a burst of 1).
	SyncRequestsPerSecond int
	// RelayHost is the relay's base URL. NewBackfiller converts ws:// and wss://
	// to http:// and https://.
	RelayHost string
}

// DefaultBackfillOptions returns a new BackfillOptions populated with the
// package defaults. Each call returns a distinct value, so callers may modify it.
func DefaultBackfillOptions() *BackfillOptions {
	return &BackfillOptions{
		ParallelBackfills:     10,
		ParallelRecordCreates: 100,
		NSIDFilter:            "",
		SyncRequestsPerSecond: 2,
		RelayHost:             "https://bsky.network",
	}
}
132
133// NewBackfiller creates a new Backfiller
134func NewBackfiller(
135 name string,
136 store Store,
137 handleCreate func(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error,
138 handleUpdate func(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error,
139 handleDelete func(ctx context.Context, repo string, rev string, path string) error,
140 opts *BackfillOptions,
141) *Backfiller {
142 if opts == nil {
143 opts = DefaultBackfillOptions()
144 }
145
146 // Convert wss:// or ws:// to https:// or http://
147 if strings.HasPrefix(opts.RelayHost, "wss://") {
148 opts.RelayHost = "https://" + opts.RelayHost[6:]
149 } else if strings.HasPrefix(opts.RelayHost, "ws://") {
150 opts.RelayHost = "http://" + opts.RelayHost[5:]
151 }
152
153 return &Backfiller{
154 Name: name,
155 Store: store,
156 HandleCreateRecord: handleCreate,
157 HandleUpdateRecord: handleUpdate,
158 HandleDeleteRecord: handleDelete,
159 ParallelBackfills: opts.ParallelBackfills,
160 ParallelRecordCreates: opts.ParallelRecordCreates,
161 NSIDFilter: opts.NSIDFilter,
162 syncLimiter: rate.NewLimiter(rate.Limit(opts.SyncRequestsPerSecond), 1),
163 RelayHost: opts.RelayHost,
164 stop: make(chan chan struct{}, 1),
165 Directory: identity.DefaultDirectory(),
166 }
167}
168
169// Start starts the backfill processor routine
170func (b *Backfiller) Start() {
171 ctx := context.Background()
172
173 log := slog.With("source", "backfiller", "name", b.Name)
174 log.Info("starting backfill processor")
175
176 sem := semaphore.NewWeighted(int64(b.ParallelBackfills))
177
178 for {
179 select {
180 case stopped := <-b.stop:
181 log.Info("stopping backfill processor")
182 sem.Acquire(ctx, int64(b.ParallelBackfills))
183 close(stopped)
184 return
185 default:
186 }
187
188 // Get the next job
189 job, err := b.Store.GetNextEnqueuedJob(ctx)
190 if err != nil {
191 log.Error("failed to get next enqueued job", "error", err)
192 time.Sleep(1 * time.Second)
193 continue
194 } else if job == nil {
195 time.Sleep(1 * time.Second)
196 continue
197 }
198
199 log := log.With("repo", job.Repo())
200
201 // Mark the backfill as "in progress"
202 err = job.SetState(ctx, StateInProgress)
203 if err != nil {
204 log.Error("failed to set job state", "error", err)
205 continue
206 }
207
208 sem.Acquire(ctx, 1)
209 go func(j Job) {
210 defer sem.Release(1)
211 newState, err := b.BackfillRepo(ctx, j)
212 if err != nil {
213 log.Error("failed to backfill repo", "error", err)
214 }
215 if newState != "" {
216 if sserr := j.SetState(ctx, newState); sserr != nil {
217 log.Error("failed to set job state", "error", sserr)
218 }
219
220 if strings.HasPrefix(newState, "failed") {
221 // Clear buffered ops
222 if err := j.ClearBufferedOps(ctx); err != nil {
223 log.Error("failed to clear buffered ops", "error", err)
224 }
225 }
226 }
227 backfillJobsProcessed.WithLabelValues(b.Name).Inc()
228 }(job)
229 }
230}
231
232// Stop stops the backfill processor
233func (b *Backfiller) Stop(ctx context.Context) error {
234 log := slog.With("source", "backfiller", "name", b.Name)
235 log.Info("stopping backfill processor")
236 stopped := make(chan struct{})
237 b.stop <- stopped
238 select {
239 case <-stopped:
240 log.Info("backfill processor stopped")
241 return nil
242 case <-ctx.Done():
243 return ctx.Err()
244 }
245}
246
247// FlushBuffer processes buffered operations for a job
248func (b *Backfiller) FlushBuffer(ctx context.Context, job Job) int {
249 ctx, span := tracer.Start(ctx, "FlushBuffer")
250 defer span.End()
251 log := slog.With("source", "backfiller_buffer_flush", "repo", job.Repo())
252
253 processed := 0
254
255 repo := job.Repo()
256
257 // Flush buffered operations, clear the buffer, and mark the job as "complete"
258 // Clearing and marking are handled by the job interface
259 err := job.FlushBufferedOps(ctx, func(kind repomgr.EventKind, rev, path string, rec *[]byte, cid *cid.Cid) error {
260 switch kind {
261 case repomgr.EvtKindCreateRecord:
262 err := b.HandleCreateRecord(ctx, repo, rev, path, rec, cid)
263 if err != nil {
264 log.Error("failed to handle create record", "error", err)
265 }
266 case repomgr.EvtKindUpdateRecord:
267 err := b.HandleUpdateRecord(ctx, repo, rev, path, rec, cid)
268 if err != nil {
269 log.Error("failed to handle update record", "error", err)
270 }
271 case repomgr.EvtKindDeleteRecord:
272 err := b.HandleDeleteRecord(ctx, repo, rev, path)
273 if err != nil {
274 log.Error("failed to handle delete record", "error", err)
275 }
276 }
277 backfillOpsBuffered.WithLabelValues(b.Name).Dec()
278 processed++
279 return nil
280 })
281 if err != nil {
282 log.Error("failed to flush buffered ops", "error", err)
283 if errors.Is(err, ErrEventGap) {
284 if sserr := job.SetState(ctx, StateEnqueued); sserr != nil {
285 log.Error("failed to reset job state after failed buffer flush", "error", sserr)
286 }
287 // TODO: need to re-queue this job for later
288 return processed
289 }
290 }
291
292 // Mark the job as "complete"
293 err = job.SetState(ctx, StateComplete)
294 if err != nil {
295 log.Error("failed to set job state", "error", err)
296 }
297
298 return processed
299}
300
301type recordQueueItem struct {
302 recordPath string
303 nodeCid cid.Cid
304}
305
306type recordResult struct {
307 recordPath string
308 err error
309}
310
// FetchRepoError is returned by fetchRepo when the host answers a getRepo
// request with a non-200 HTTP status.
type FetchRepoError struct {
	// StatusCode is the numeric HTTP status code, e.g. 400.
	StatusCode int
	// Status is the HTTP status text from the response, e.g. "400 Bad Request".
	Status string
}

// Error implements error. A 400 response is reported as "repo not found".
// Any other status is described by Status, or by "unknown error" when Status
// is empty.
func (e *FetchRepoError) Error() string {
	var reason string
	switch {
	case e.StatusCode == http.StatusBadRequest:
		reason = "repo not found"
	case e.Status != "":
		reason = e.Status
	default:
		reason = "unknown error"
	}
	return fmt.Sprintf("failed to get repo: %s (%d)", reason, e.StatusCode)
}
325
326// Fetches a repo CAR file over HTTP from the indicated host. If successful, parses the CAR and returns repo.Repo
327func (b *Backfiller) fetchRepo(ctx context.Context, did, since, host string) (*repo.Repo, error) {
328 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", host, did)
329
330 if since != "" {
331 url = url + fmt.Sprintf("&since=%s", since)
332 }
333
334 // GET and CAR decode the body
335 client := &http.Client{
336 Transport: otelhttp.NewTransport(http.DefaultTransport),
337 Timeout: 600 * time.Second,
338 }
339 req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
340 if err != nil {
341 return nil, fmt.Errorf("failed to create request: %w", err)
342 }
343
344 req.Header.Set("Accept", "application/vnd.ipld.car")
345 req.Header.Set("User-Agent", fmt.Sprintf("atproto-backfill-%s/0.0.1", b.Name))
346 if b.magicHeaderKey != "" && b.magicHeaderVal != "" {
347 req.Header.Set(b.magicHeaderKey, b.magicHeaderVal)
348 }
349
350 b.syncLimiter.Wait(ctx)
351
352 resp, err := client.Do(req)
353 if err != nil {
354 return nil, fmt.Errorf("failed to send request: %w", err)
355 }
356
357 if resp.StatusCode != http.StatusOK {
358 return nil, &FetchRepoError{
359 StatusCode: resp.StatusCode,
360 Status: resp.Status,
361 }
362 }
363
364 instrumentedReader := instrumentedReader{
365 source: resp.Body,
366 counter: backfillBytesProcessed.WithLabelValues(b.Name),
367 }
368
369 defer instrumentedReader.Close()
370
371 repo, err := repo.ReadRepoFromCar(ctx, instrumentedReader)
372 if err != nil {
373 return nil, fmt.Errorf("failed to parse repo from CAR file: %w", err)
374 }
375 return repo, nil
376}
377
378// BackfillRepo backfills a repo
379func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) (string, error) {
380 ctx, span := tracer.Start(ctx, "BackfillRepo")
381 defer span.End()
382
383 start := time.Now()
384
385 repoDID := job.Repo()
386
387 log := slog.With("source", "backfiller_backfill_repo", "repo", repoDID)
388 if job.RetryCount() > 0 {
389 log = log.With("retry_count", job.RetryCount())
390 }
391 log.Info(fmt.Sprintf("processing backfill for %s", repoDID))
392
393 var r *repo.Repo
394 if b.tryRelayRepoFetch {
395 rr, err := b.fetchRepo(ctx, repoDID, job.Rev(), b.RelayHost)
396 if err != nil {
397 slog.Warn("repo CAR fetch from relay failed", "did", repoDID, "since", job.Rev(), "relayHost", b.RelayHost, "err", err)
398 } else {
399 r = rr
400 }
401 }
402
403 if r == nil {
404 ident, err := b.Directory.LookupDID(ctx, syntax.DID(repoDID))
405 if err != nil {
406 return "failed resolving DID to PDS repo", fmt.Errorf("resolving DID for PDS repo fetch: %w", err)
407 }
408 pdsHost := ident.PDSEndpoint()
409 if pdsHost == "" {
410 return "DID document missing PDS endpoint", fmt.Errorf("no PDS endpoint for DID: %s", repoDID)
411 }
412
413 r, err = b.fetchRepo(ctx, repoDID, job.Rev(), pdsHost)
414 if err != nil {
415 slog.Warn("repo CAR fetch from PDS failed", "did", repoDID, "since", job.Rev(), "pdsHost", pdsHost, "err", err)
416 rfe, ok := err.(*FetchRepoError)
417 if ok {
418 return fmt.Sprintf("failed to fetch repo CAR from PDS (http %d:%s)", rfe.StatusCode, rfe.Status), err
419 }
420 return "failed to fetch repo CAR from PDS", err
421 }
422 }
423
424 numRecords := 0
425 numRoutines := b.ParallelRecordCreates
426 recordQueue := make(chan recordQueueItem, numRoutines)
427 recordResults := make(chan recordResult, numRoutines)
428
429 // Producer routine
430 go func() {
431 defer close(recordQueue)
432 if err := r.ForEach(ctx, b.NSIDFilter, func(recordPath string, nodeCid cid.Cid) error {
433 numRecords++
434 recordQueue <- recordQueueItem{recordPath: recordPath, nodeCid: nodeCid}
435 return nil
436 }); err != nil {
437 log.Error("failed to iterate records in repo", "err", err)
438 }
439 }()
440
441 rev := r.SignedCommit().Rev
442
443 // Consumer routines
444 wg := sync.WaitGroup{}
445 for i := 0; i < numRoutines; i++ {
446 wg.Add(1)
447 go func() {
448 defer wg.Done()
449 for item := range recordQueue {
450 blk, err := r.Blockstore().Get(ctx, item.nodeCid)
451 if err != nil {
452 recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to get blocks for record: %w", err)}
453 continue
454 }
455
456 raw := blk.RawData()
457
458 err = b.HandleCreateRecord(ctx, repoDID, rev, item.recordPath, &raw, &item.nodeCid)
459 if err != nil {
460 recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to handle create record: %w", err)}
461 continue
462 }
463
464 backfillRecordsProcessed.WithLabelValues(b.Name).Inc()
465 recordResults <- recordResult{recordPath: item.recordPath, err: err}
466 }
467 }()
468 }
469
470 resultWG := sync.WaitGroup{}
471 resultWG.Add(1)
472 // Handle results
473 go func() {
474 defer resultWG.Done()
475 for result := range recordResults {
476 if result.err != nil {
477 log.Error("Error processing record", "record", result.recordPath, "error", result.err)
478 }
479 }
480 }()
481
482 wg.Wait()
483 close(recordResults)
484 resultWG.Wait()
485
486 if err := job.SetRev(ctx, r.SignedCommit().Rev); err != nil {
487 log.Error("failed to update rev after backfilling repo", "err", err)
488 }
489
490 // Process buffered operations, marking the job as "complete" when done
491 numProcessed := b.FlushBuffer(ctx, job)
492
493 log.Info("backfill complete",
494 "buffered_records_processed", numProcessed,
495 "records_backfilled", numRecords,
496 "duration", time.Since(start),
497 )
498
499 return StateComplete, nil
500}
501
502const trust = true
503
504func (bf *Backfiller) getRecord(ctx context.Context, r *repo.Repo, op *atproto.SyncSubscribeRepos_RepoOp) (cid.Cid, *[]byte, error) {
505 if trust {
506 if op.Cid == nil {
507 return cid.Undef, nil, fmt.Errorf("op had no cid set")
508 }
509
510 c := (cid.Cid)(*op.Cid)
511 blk, err := r.Blockstore().Get(ctx, c)
512 if err != nil {
513 return cid.Undef, nil, err
514 }
515
516 raw := blk.RawData()
517
518 return c, &raw, nil
519 } else {
520 return r.GetRecordBytes(ctx, op.Path)
521 }
522}
523
524func (bf *Backfiller) HandleEvent(ctx context.Context, evt *atproto.SyncSubscribeRepos_Commit) error {
525 r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks))
526 if err != nil {
527 return fmt.Errorf("failed to read event repo: %w", err)
528 }
529
530 ops := make([]*BufferedOp, 0, len(evt.Ops))
531 for _, op := range evt.Ops {
532 kind := repomgr.EventKind(op.Action)
533 switch kind {
534 case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord:
535 cc, rec, err := bf.getRecord(ctx, r, op)
536 if err != nil {
537 return fmt.Errorf("getting record failed (%s,%s): %w", op.Action, op.Path, err)
538 }
539 ops = append(ops, &BufferedOp{
540 Kind: kind,
541 Path: op.Path,
542 Record: rec,
543 Cid: &cc,
544 })
545 case repomgr.EvtKindDeleteRecord:
546 ops = append(ops, &BufferedOp{
547 Kind: kind,
548 Path: op.Path,
549 })
550 default:
551 return fmt.Errorf("invalid op action: %q", op.Action)
552 }
553 }
554
555 if evt.Since == nil {
556 // The first event for a repo will have a nil since, we can enqueue the repo as "complete" to avoid fetching the empty repo
557 if err := bf.Store.EnqueueJobWithState(ctx, evt.Repo, StateComplete); err != nil {
558 return fmt.Errorf("failed to enqueue job with state for repo %q: %w", evt.Repo, err)
559 }
560 }
561
562 buffered, err := bf.BufferOps(ctx, evt.Repo, evt.Since, evt.Rev, ops)
563 if err != nil {
564 if errors.Is(err, ErrAlreadyProcessed) {
565 return nil
566 }
567 return fmt.Errorf("buffer ops failed: %w", err)
568 }
569
570 if buffered {
571 return nil
572 }
573
574 for _, op := range ops {
575 switch op.Kind {
576 case repomgr.EvtKindCreateRecord:
577 if err := bf.HandleCreateRecord(ctx, evt.Repo, evt.Rev, op.Path, op.Record, op.Cid); err != nil {
578 return fmt.Errorf("create record failed: %w", err)
579 }
580 case repomgr.EvtKindUpdateRecord:
581 if err := bf.HandleUpdateRecord(ctx, evt.Repo, evt.Rev, op.Path, op.Record, op.Cid); err != nil {
582 return fmt.Errorf("update record failed: %w", err)
583 }
584 case repomgr.EvtKindDeleteRecord:
585 if err := bf.HandleDeleteRecord(ctx, evt.Repo, evt.Rev, op.Path); err != nil {
586 return fmt.Errorf("delete record failed: %w", err)
587 }
588 }
589 }
590
591 if err := bf.Store.UpdateRev(ctx, evt.Repo, evt.Rev); err != nil {
592 return fmt.Errorf("failed to update rev: %w", err)
593 }
594
595 return nil
596}
597
598func (bf *Backfiller) BufferOp(ctx context.Context, repo string, since *string, rev string, kind repomgr.EventKind, path string, rec *[]byte, cid *cid.Cid) (bool, error) {
599 return bf.BufferOps(ctx, repo, since, rev, []*BufferedOp{{
600 Path: path,
601 Kind: kind,
602 Record: rec,
603 Cid: cid,
604 }})
605}
606
607func (bf *Backfiller) BufferOps(ctx context.Context, repo string, since *string, rev string, ops []*BufferedOp) (bool, error) {
608 j, err := bf.Store.GetJob(ctx, repo)
609 if err != nil {
610 if !errors.Is(err, ErrJobNotFound) {
611 return false, err
612 }
613 if qerr := bf.Store.EnqueueJob(ctx, repo); qerr != nil {
614 return false, fmt.Errorf("failed to enqueue job for unknown repo: %w", qerr)
615 }
616
617 nj, err := bf.Store.GetJob(ctx, repo)
618 if err != nil {
619 return false, err
620 }
621
622 j = nj
623 }
624
625 return j.BufferOps(ctx, since, rev, ops)
626}
627
// MaxRetries is the maximum number of times to retry a backfill job
var MaxRetries = 10

// computeExponentialBackoff returns the delay before retry number attempt:
// 10s * 2^attempt.
//
// attempt is clamped to [0, 29]. Negative values yield the base 10s delay.
// Large values saturate at 10s * 2^29 (about 170 years) instead of overflowing
// time.Duration's int64 nanoseconds into a zero or negative delay.
func computeExponentialBackoff(attempt int) time.Duration {
	// 10s * 2^29 ≈ 5.4e18ns fits in an int64; 10s * 2^30 ≈ 1.07e19ns does not.
	const maxShift = 29
	if attempt < 0 {
		attempt = 0
	}
	if attempt > maxShift {
		attempt = maxShift
	}
	return time.Duration(1<<uint(attempt)) * 10 * time.Second
}