// Fork of indigo with a slightly nicer lexgen (file from the main branch, 19 kB).

1package backfill 2 3import ( 4 "bytes" 5 "context" 6 "errors" 7 "fmt" 8 "log/slog" 9 "net/http" 10 "strings" 11 "sync" 12 "time" 13 14 "github.com/bluesky-social/indigo/api/atproto" 15 "github.com/bluesky-social/indigo/atproto/identity" 16 "github.com/bluesky-social/indigo/atproto/syntax" 17 "github.com/bluesky-social/indigo/repo" 18 "github.com/bluesky-social/indigo/repomgr" 19 20 "github.com/ipfs/go-cid" 21 "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" 22 "go.opentelemetry.io/otel" 23 "golang.org/x/sync/semaphore" 24 "golang.org/x/time/rate" 25) 26 27// Job is an interface for a backfill job 28type Job interface { 29 Repo() string 30 State() string 31 Rev() string 32 SetState(ctx context.Context, state string) error 33 SetRev(ctx context.Context, rev string) error 34 RetryCount() int 35 36 // BufferOps buffers the given operations and returns true if the operations 37 // were buffered. 38 // The given operations move the repo from since to rev. 39 BufferOps(ctx context.Context, since *string, rev string, ops []*BufferedOp) (bool, error) 40 // FlushBufferedOps calls the given callback for each buffered operation 41 // Once done it clears the buffer and marks the job as "complete" 42 // Allowing the Job interface to abstract away the details of how buffered 43 // operations are stored and/or locked 44 FlushBufferedOps(ctx context.Context, cb func(kind repomgr.EventKind, rev, path string, rec *[]byte, cid *cid.Cid) error) error 45 46 ClearBufferedOps(ctx context.Context) error 47} 48 49// Store is an interface for a backfill store which holds Jobs 50type Store interface { 51 // BufferOp buffers an operation for a job and returns true if the operation was buffered 52 // If the operation was not buffered, it returns false and an error (ErrJobNotFound or ErrJobComplete) 53 GetJob(ctx context.Context, repo string) (Job, error) 54 GetNextEnqueuedJob(ctx context.Context) (Job, error) 55 UpdateRev(ctx context.Context, repo, rev string) error 56 57 
EnqueueJob(ctx context.Context, repo string) error 58 EnqueueJobWithState(ctx context.Context, repo string, state string) error 59 60 PurgeRepo(ctx context.Context, repo string) error 61} 62 63// Backfiller is a struct which handles backfilling a repo 64type Backfiller struct { 65 Name string 66 HandleCreateRecord func(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error 67 HandleUpdateRecord func(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error 68 HandleDeleteRecord func(ctx context.Context, repo string, rev string, path string) error 69 Store Store 70 71 // Number of backfills to process in parallel 72 ParallelBackfills int 73 // Number of records to process in parallel for each backfill 74 ParallelRecordCreates int 75 // Prefix match for records to backfill i.e. app.bsky.feed.app/ 76 // If empty, all records will be backfilled 77 NSIDFilter string 78 RelayHost string 79 80 syncLimiter *rate.Limiter 81 82 magicHeaderKey string 83 magicHeaderVal string 84 85 tryRelayRepoFetch bool 86 87 stop chan chan struct{} 88 89 Directory identity.Directory 90} 91 92var ( 93 // StateEnqueued is the state of a backfill job when it is first created 94 StateEnqueued = "enqueued" 95 // StateInProgress is the state of a backfill job when it is being processed 96 StateInProgress = "in_progress" 97 // StateComplete is the state of a backfill job when it has been processed 98 StateComplete = "complete" 99) 100 101// ErrJobComplete is returned when trying to buffer an op for a job that is complete 102var ErrJobComplete = errors.New("job is complete") 103 104// ErrJobNotFound is returned when trying to buffer an op for a job that doesn't exist 105var ErrJobNotFound = errors.New("job not found") 106 107// ErrEventGap is returned when an event is received with a since that doesn't match the current rev 108var ErrEventGap = fmt.Errorf("buffered event revs did not line up") 109 110// ErrAlreadyProcessed is 
returned when attempting to buffer an event that has already been accounted for (rev older than current) 111var ErrAlreadyProcessed = fmt.Errorf("event already accounted for") 112 113var tracer = otel.Tracer("backfiller") 114 115type BackfillOptions struct { 116 ParallelBackfills int 117 ParallelRecordCreates int 118 NSIDFilter string 119 SyncRequestsPerSecond int 120 RelayHost string 121} 122 123func DefaultBackfillOptions() *BackfillOptions { 124 return &BackfillOptions{ 125 ParallelBackfills: 10, 126 ParallelRecordCreates: 100, 127 NSIDFilter: "", 128 SyncRequestsPerSecond: 2, 129 RelayHost: "https://bsky.network", 130 } 131} 132 133// NewBackfiller creates a new Backfiller 134func NewBackfiller( 135 name string, 136 store Store, 137 handleCreate func(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error, 138 handleUpdate func(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error, 139 handleDelete func(ctx context.Context, repo string, rev string, path string) error, 140 opts *BackfillOptions, 141) *Backfiller { 142 if opts == nil { 143 opts = DefaultBackfillOptions() 144 } 145 146 // Convert wss:// or ws:// to https:// or http:// 147 if strings.HasPrefix(opts.RelayHost, "wss://") { 148 opts.RelayHost = "https://" + opts.RelayHost[6:] 149 } else if strings.HasPrefix(opts.RelayHost, "ws://") { 150 opts.RelayHost = "http://" + opts.RelayHost[5:] 151 } 152 153 return &Backfiller{ 154 Name: name, 155 Store: store, 156 HandleCreateRecord: handleCreate, 157 HandleUpdateRecord: handleUpdate, 158 HandleDeleteRecord: handleDelete, 159 ParallelBackfills: opts.ParallelBackfills, 160 ParallelRecordCreates: opts.ParallelRecordCreates, 161 NSIDFilter: opts.NSIDFilter, 162 syncLimiter: rate.NewLimiter(rate.Limit(opts.SyncRequestsPerSecond), 1), 163 RelayHost: opts.RelayHost, 164 stop: make(chan chan struct{}, 1), 165 Directory: identity.DefaultDirectory(), 166 } 167} 168 169// Start starts the backfill 
processor routine 170func (b *Backfiller) Start() { 171 ctx := context.Background() 172 173 log := slog.With("source", "backfiller", "name", b.Name) 174 log.Info("starting backfill processor") 175 176 sem := semaphore.NewWeighted(int64(b.ParallelBackfills)) 177 178 for { 179 select { 180 case stopped := <-b.stop: 181 log.Info("stopping backfill processor") 182 sem.Acquire(ctx, int64(b.ParallelBackfills)) 183 close(stopped) 184 return 185 default: 186 } 187 188 // Get the next job 189 job, err := b.Store.GetNextEnqueuedJob(ctx) 190 if err != nil { 191 log.Error("failed to get next enqueued job", "error", err) 192 time.Sleep(1 * time.Second) 193 continue 194 } else if job == nil { 195 time.Sleep(1 * time.Second) 196 continue 197 } 198 199 log := log.With("repo", job.Repo()) 200 201 // Mark the backfill as "in progress" 202 err = job.SetState(ctx, StateInProgress) 203 if err != nil { 204 log.Error("failed to set job state", "error", err) 205 continue 206 } 207 208 sem.Acquire(ctx, 1) 209 go func(j Job) { 210 defer sem.Release(1) 211 newState, err := b.BackfillRepo(ctx, j) 212 if err != nil { 213 log.Error("failed to backfill repo", "error", err) 214 } 215 if newState != "" { 216 if sserr := j.SetState(ctx, newState); sserr != nil { 217 log.Error("failed to set job state", "error", sserr) 218 } 219 220 if strings.HasPrefix(newState, "failed") { 221 // Clear buffered ops 222 if err := j.ClearBufferedOps(ctx); err != nil { 223 log.Error("failed to clear buffered ops", "error", err) 224 } 225 } 226 } 227 backfillJobsProcessed.WithLabelValues(b.Name).Inc() 228 }(job) 229 } 230} 231 232// Stop stops the backfill processor 233func (b *Backfiller) Stop(ctx context.Context) error { 234 log := slog.With("source", "backfiller", "name", b.Name) 235 log.Info("stopping backfill processor") 236 stopped := make(chan struct{}) 237 b.stop <- stopped 238 select { 239 case <-stopped: 240 log.Info("backfill processor stopped") 241 return nil 242 case <-ctx.Done(): 243 return ctx.Err() 244 
} 245} 246 247// FlushBuffer processes buffered operations for a job 248func (b *Backfiller) FlushBuffer(ctx context.Context, job Job) int { 249 ctx, span := tracer.Start(ctx, "FlushBuffer") 250 defer span.End() 251 log := slog.With("source", "backfiller_buffer_flush", "repo", job.Repo()) 252 253 processed := 0 254 255 repo := job.Repo() 256 257 // Flush buffered operations, clear the buffer, and mark the job as "complete" 258 // Clearing and marking are handled by the job interface 259 err := job.FlushBufferedOps(ctx, func(kind repomgr.EventKind, rev, path string, rec *[]byte, cid *cid.Cid) error { 260 switch kind { 261 case repomgr.EvtKindCreateRecord: 262 err := b.HandleCreateRecord(ctx, repo, rev, path, rec, cid) 263 if err != nil { 264 log.Error("failed to handle create record", "error", err) 265 } 266 case repomgr.EvtKindUpdateRecord: 267 err := b.HandleUpdateRecord(ctx, repo, rev, path, rec, cid) 268 if err != nil { 269 log.Error("failed to handle update record", "error", err) 270 } 271 case repomgr.EvtKindDeleteRecord: 272 err := b.HandleDeleteRecord(ctx, repo, rev, path) 273 if err != nil { 274 log.Error("failed to handle delete record", "error", err) 275 } 276 } 277 backfillOpsBuffered.WithLabelValues(b.Name).Dec() 278 processed++ 279 return nil 280 }) 281 if err != nil { 282 log.Error("failed to flush buffered ops", "error", err) 283 if errors.Is(err, ErrEventGap) { 284 if sserr := job.SetState(ctx, StateEnqueued); sserr != nil { 285 log.Error("failed to reset job state after failed buffer flush", "error", sserr) 286 } 287 // TODO: need to re-queue this job for later 288 return processed 289 } 290 } 291 292 // Mark the job as "complete" 293 err = job.SetState(ctx, StateComplete) 294 if err != nil { 295 log.Error("failed to set job state", "error", err) 296 } 297 298 return processed 299} 300 301type recordQueueItem struct { 302 recordPath string 303 nodeCid cid.Cid 304} 305 306type recordResult struct { 307 recordPath string 308 err error 309} 310 
311type FetchRepoError struct { 312 StatusCode int 313 Status string 314} 315 316func (e *FetchRepoError) Error() string { 317 reason := "unknown error" 318 if e.StatusCode == http.StatusBadRequest { 319 reason = "repo not found" 320 } else { 321 reason = e.Status 322 } 323 return fmt.Sprintf("failed to get repo: %s (%d)", reason, e.StatusCode) 324} 325 326// Fetches a repo CAR file over HTTP from the indicated host. If successful, parses the CAR and returns repo.Repo 327func (b *Backfiller) fetchRepo(ctx context.Context, did, since, host string) (*repo.Repo, error) { 328 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", host, did) 329 330 if since != "" { 331 url = url + fmt.Sprintf("&since=%s", since) 332 } 333 334 // GET and CAR decode the body 335 client := &http.Client{ 336 Transport: otelhttp.NewTransport(http.DefaultTransport), 337 Timeout: 600 * time.Second, 338 } 339 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 340 if err != nil { 341 return nil, fmt.Errorf("failed to create request: %w", err) 342 } 343 344 req.Header.Set("Accept", "application/vnd.ipld.car") 345 req.Header.Set("User-Agent", fmt.Sprintf("atproto-backfill-%s/0.0.1", b.Name)) 346 if b.magicHeaderKey != "" && b.magicHeaderVal != "" { 347 req.Header.Set(b.magicHeaderKey, b.magicHeaderVal) 348 } 349 350 b.syncLimiter.Wait(ctx) 351 352 resp, err := client.Do(req) 353 if err != nil { 354 return nil, fmt.Errorf("failed to send request: %w", err) 355 } 356 357 if resp.StatusCode != http.StatusOK { 358 return nil, &FetchRepoError{ 359 StatusCode: resp.StatusCode, 360 Status: resp.Status, 361 } 362 } 363 364 instrumentedReader := instrumentedReader{ 365 source: resp.Body, 366 counter: backfillBytesProcessed.WithLabelValues(b.Name), 367 } 368 369 defer instrumentedReader.Close() 370 371 repo, err := repo.ReadRepoFromCar(ctx, instrumentedReader) 372 if err != nil { 373 return nil, fmt.Errorf("failed to parse repo from CAR file: %w", err) 374 } 375 return repo, nil 376} 377 
378// BackfillRepo backfills a repo 379func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) (string, error) { 380 ctx, span := tracer.Start(ctx, "BackfillRepo") 381 defer span.End() 382 383 start := time.Now() 384 385 repoDID := job.Repo() 386 387 log := slog.With("source", "backfiller_backfill_repo", "repo", repoDID) 388 if job.RetryCount() > 0 { 389 log = log.With("retry_count", job.RetryCount()) 390 } 391 log.Info(fmt.Sprintf("processing backfill for %s", repoDID)) 392 393 var r *repo.Repo 394 if b.tryRelayRepoFetch { 395 rr, err := b.fetchRepo(ctx, repoDID, job.Rev(), b.RelayHost) 396 if err != nil { 397 slog.Warn("repo CAR fetch from relay failed", "did", repoDID, "since", job.Rev(), "relayHost", b.RelayHost, "err", err) 398 } else { 399 r = rr 400 } 401 } 402 403 if r == nil { 404 ident, err := b.Directory.LookupDID(ctx, syntax.DID(repoDID)) 405 if err != nil { 406 return "failed resolving DID to PDS repo", fmt.Errorf("resolving DID for PDS repo fetch: %w", err) 407 } 408 pdsHost := ident.PDSEndpoint() 409 if pdsHost == "" { 410 return "DID document missing PDS endpoint", fmt.Errorf("no PDS endpoint for DID: %s", repoDID) 411 } 412 413 r, err = b.fetchRepo(ctx, repoDID, job.Rev(), pdsHost) 414 if err != nil { 415 slog.Warn("repo CAR fetch from PDS failed", "did", repoDID, "since", job.Rev(), "pdsHost", pdsHost, "err", err) 416 rfe, ok := err.(*FetchRepoError) 417 if ok { 418 return fmt.Sprintf("failed to fetch repo CAR from PDS (http %d:%s)", rfe.StatusCode, rfe.Status), err 419 } 420 return "failed to fetch repo CAR from PDS", err 421 } 422 } 423 424 numRecords := 0 425 numRoutines := b.ParallelRecordCreates 426 recordQueue := make(chan recordQueueItem, numRoutines) 427 recordResults := make(chan recordResult, numRoutines) 428 429 // Producer routine 430 go func() { 431 defer close(recordQueue) 432 if err := r.ForEach(ctx, b.NSIDFilter, func(recordPath string, nodeCid cid.Cid) error { 433 numRecords++ 434 recordQueue <- recordQueueItem{recordPath: 
recordPath, nodeCid: nodeCid} 435 return nil 436 }); err != nil { 437 log.Error("failed to iterate records in repo", "err", err) 438 } 439 }() 440 441 rev := r.SignedCommit().Rev 442 443 // Consumer routines 444 wg := sync.WaitGroup{} 445 for i := 0; i < numRoutines; i++ { 446 wg.Add(1) 447 go func() { 448 defer wg.Done() 449 for item := range recordQueue { 450 blk, err := r.Blockstore().Get(ctx, item.nodeCid) 451 if err != nil { 452 recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to get blocks for record: %w", err)} 453 continue 454 } 455 456 raw := blk.RawData() 457 458 err = b.HandleCreateRecord(ctx, repoDID, rev, item.recordPath, &raw, &item.nodeCid) 459 if err != nil { 460 recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to handle create record: %w", err)} 461 continue 462 } 463 464 backfillRecordsProcessed.WithLabelValues(b.Name).Inc() 465 recordResults <- recordResult{recordPath: item.recordPath, err: err} 466 } 467 }() 468 } 469 470 resultWG := sync.WaitGroup{} 471 resultWG.Add(1) 472 // Handle results 473 go func() { 474 defer resultWG.Done() 475 for result := range recordResults { 476 if result.err != nil { 477 log.Error("Error processing record", "record", result.recordPath, "error", result.err) 478 } 479 } 480 }() 481 482 wg.Wait() 483 close(recordResults) 484 resultWG.Wait() 485 486 if err := job.SetRev(ctx, r.SignedCommit().Rev); err != nil { 487 log.Error("failed to update rev after backfilling repo", "err", err) 488 } 489 490 // Process buffered operations, marking the job as "complete" when done 491 numProcessed := b.FlushBuffer(ctx, job) 492 493 log.Info("backfill complete", 494 "buffered_records_processed", numProcessed, 495 "records_backfilled", numRecords, 496 "duration", time.Since(start), 497 ) 498 499 return StateComplete, nil 500} 501 502const trust = true 503 504func (bf *Backfiller) getRecord(ctx context.Context, r *repo.Repo, op *atproto.SyncSubscribeRepos_RepoOp) 
(cid.Cid, *[]byte, error) { 505 if trust { 506 if op.Cid == nil { 507 return cid.Undef, nil, fmt.Errorf("op had no cid set") 508 } 509 510 c := (cid.Cid)(*op.Cid) 511 blk, err := r.Blockstore().Get(ctx, c) 512 if err != nil { 513 return cid.Undef, nil, err 514 } 515 516 raw := blk.RawData() 517 518 return c, &raw, nil 519 } else { 520 return r.GetRecordBytes(ctx, op.Path) 521 } 522} 523 524func (bf *Backfiller) HandleEvent(ctx context.Context, evt *atproto.SyncSubscribeRepos_Commit) error { 525 r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) 526 if err != nil { 527 return fmt.Errorf("failed to read event repo: %w", err) 528 } 529 530 ops := make([]*BufferedOp, 0, len(evt.Ops)) 531 for _, op := range evt.Ops { 532 kind := repomgr.EventKind(op.Action) 533 switch kind { 534 case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord: 535 cc, rec, err := bf.getRecord(ctx, r, op) 536 if err != nil { 537 return fmt.Errorf("getting record failed (%s,%s): %w", op.Action, op.Path, err) 538 } 539 ops = append(ops, &BufferedOp{ 540 Kind: kind, 541 Path: op.Path, 542 Record: rec, 543 Cid: &cc, 544 }) 545 case repomgr.EvtKindDeleteRecord: 546 ops = append(ops, &BufferedOp{ 547 Kind: kind, 548 Path: op.Path, 549 }) 550 default: 551 return fmt.Errorf("invalid op action: %q", op.Action) 552 } 553 } 554 555 if evt.Since == nil { 556 // The first event for a repo will have a nil since, we can enqueue the repo as "complete" to avoid fetching the empty repo 557 if err := bf.Store.EnqueueJobWithState(ctx, evt.Repo, StateComplete); err != nil { 558 return fmt.Errorf("failed to enqueue job with state for repo %q: %w", evt.Repo, err) 559 } 560 } 561 562 buffered, err := bf.BufferOps(ctx, evt.Repo, evt.Since, evt.Rev, ops) 563 if err != nil { 564 if errors.Is(err, ErrAlreadyProcessed) { 565 return nil 566 } 567 return fmt.Errorf("buffer ops failed: %w", err) 568 } 569 570 if buffered { 571 return nil 572 } 573 574 for _, op := range ops { 575 switch op.Kind { 576 case 
repomgr.EvtKindCreateRecord: 577 if err := bf.HandleCreateRecord(ctx, evt.Repo, evt.Rev, op.Path, op.Record, op.Cid); err != nil { 578 return fmt.Errorf("create record failed: %w", err) 579 } 580 case repomgr.EvtKindUpdateRecord: 581 if err := bf.HandleUpdateRecord(ctx, evt.Repo, evt.Rev, op.Path, op.Record, op.Cid); err != nil { 582 return fmt.Errorf("update record failed: %w", err) 583 } 584 case repomgr.EvtKindDeleteRecord: 585 if err := bf.HandleDeleteRecord(ctx, evt.Repo, evt.Rev, op.Path); err != nil { 586 return fmt.Errorf("delete record failed: %w", err) 587 } 588 } 589 } 590 591 if err := bf.Store.UpdateRev(ctx, evt.Repo, evt.Rev); err != nil { 592 return fmt.Errorf("failed to update rev: %w", err) 593 } 594 595 return nil 596} 597 598func (bf *Backfiller) BufferOp(ctx context.Context, repo string, since *string, rev string, kind repomgr.EventKind, path string, rec *[]byte, cid *cid.Cid) (bool, error) { 599 return bf.BufferOps(ctx, repo, since, rev, []*BufferedOp{{ 600 Path: path, 601 Kind: kind, 602 Record: rec, 603 Cid: cid, 604 }}) 605} 606 607func (bf *Backfiller) BufferOps(ctx context.Context, repo string, since *string, rev string, ops []*BufferedOp) (bool, error) { 608 j, err := bf.Store.GetJob(ctx, repo) 609 if err != nil { 610 if !errors.Is(err, ErrJobNotFound) { 611 return false, err 612 } 613 if qerr := bf.Store.EnqueueJob(ctx, repo); qerr != nil { 614 return false, fmt.Errorf("failed to enqueue job for unknown repo: %w", qerr) 615 } 616 617 nj, err := bf.Store.GetJob(ctx, repo) 618 if err != nil { 619 return false, err 620 } 621 622 j = nj 623 } 624 625 return j.BufferOps(ctx, since, rev, ops) 626} 627 628// MaxRetries is the maximum number of times to retry a backfill job 629var MaxRetries = 10 630 631func computeExponentialBackoff(attempt int) time.Duration { 632 return time.Duration(1<<uint(attempt)) * 10 * time.Second 633}