backfill: add fallback to fetch CAR file from PDS (if relay errors) (#855)

The main motivation here is that the non-archival relay returns a 4xx
error when fetching repos, and we want to fall back to fetching those
from the actual PDS when that happens. This change adds a new code
branch: when a relay CAR fetch fails, do an identity lookup to find the
account's PDS instance, then fetch the CAR from there.
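
In condensed form, the new branch works like this (a sketch distilled from the backfill/backfill.go diff below; the standalone wrapper function is hypothetical, since the real logic is inlined in `BackfillRepo` and also returns per-state strings):

```go
import (
    "context"
    "fmt"

    "github.com/bluesky-social/indigo/atproto/syntax"
    "github.com/bluesky-social/indigo/repo"
)

// fetchRepoWithFallback tries the relay first; on any error it resolves the
// account's identity to find its PDS and retries the CAR fetch there.
func fetchRepoWithFallback(ctx context.Context, b *Backfiller, did, since string) (*repo.Repo, error) {
    r, err := b.fetchRepo(ctx, did, since, b.RelayHost)
    if err == nil {
        return r, nil
    }
    // relay fetch failed (e.g. a non-archival relay returning 4xx for getRepo)
    ident, err := b.Directory.LookupDID(ctx, syntax.DID(did))
    if err != nil {
        return nil, fmt.Errorf("resolving DID for PDS repo fetch: %w", err)
    }
    pdsHost := ident.PDSEndpoint()
    if pdsHost == "" {
        return nil, fmt.Errorf("no PDS endpoint for DID: %s", did)
    }
    return b.fetchRepo(ctx, did, since, pdsHost)
}
```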

For the search code specifically, this reuses the indexer's existing
identity directory, to avoid resolving the same identity twice.
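
The wiring is two lines in search/indexing.go (shown in the diff below), where `dir` is the directory the indexer already holds:

```go
// reuse identity directory (for efficient caching)
bf.Directory = dir
```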

This also refactors how fetch URLs are constructed: configuration now
carries just hostnames, and the XRPC path is appended at fetch time.
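
Concretely (both lines taken from the backfill/backfill.go diff below):

```go
// before: config carried the full getRepo endpoint (CheckoutPath)
url := fmt.Sprintf("%s?did=%s", b.CheckoutPath, repoDid)

// after: config carries only a host (RelayHost, or a resolved PDS endpoint)
url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", host, did)
```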

UPDATE: should probably *not* merge this to `main` until Jaz can review

Authored by bnewbold.net and committed by GitHub · 7fd58873 72b4acb7

Changed files (+63 -34):

backfill/backfill.go (+60 -33)

```diff
     "time"
 
     "github.com/bluesky-social/indigo/api/atproto"
+    "github.com/bluesky-social/indigo/atproto/identity"
+    "github.com/bluesky-social/indigo/atproto/syntax"
     "github.com/bluesky-social/indigo/repo"
     "github.com/bluesky-social/indigo/repomgr"
+
     "github.com/ipfs/go-cid"
     "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
     "go.opentelemetry.io/otel"
···
     ParallelRecordCreates int
     // Prefix match for records to backfill i.e. app.bsky.feed.app/
     // If empty, all records will be backfilled
-    NSIDFilter   string
-    CheckoutPath string
+    NSIDFilter string
+    RelayHost  string
 
     syncLimiter *rate.Limiter
···
     magicHeaderVal string
 
     stop chan chan struct{}
+
+    Directory identity.Directory
 }
 
 var (
···
     ParallelRecordCreates int
     NSIDFilter            string
     SyncRequestsPerSecond int
-    CheckoutPath          string
+    RelayHost             string
 }
 
 func DefaultBackfillOptions() *BackfillOptions {
···
         ParallelRecordCreates: 100,
         NSIDFilter:            "",
         SyncRequestsPerSecond: 2,
-        CheckoutPath:          "https://bsky.network/xrpc/com.atproto.sync.getRepo",
+        RelayHost:             "https://bsky.network",
     }
 }
···
         ParallelRecordCreates: opts.ParallelRecordCreates,
         NSIDFilter:            opts.NSIDFilter,
         syncLimiter:           rate.NewLimiter(rate.Limit(opts.SyncRequestsPerSecond), 1),
-        CheckoutPath:          opts.CheckoutPath,
+        RelayHost:             opts.RelayHost,
         stop:                  make(chan chan struct{}, 1),
+        Directory:             identity.DefaultDirectory(),
     }
 }
···
     err error
 }
 
-// BackfillRepo backfills a repo
-func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) (string, error) {
-    ctx, span := tracer.Start(ctx, "BackfillRepo")
-    defer span.End()
+// Fetches a repo CAR file over HTTP from the indicated host. If successful, parses the CAR and returns repo.Repo
+func (b *Backfiller) fetchRepo(ctx context.Context, did, since, host string) (*repo.Repo, error) {
+    url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", host, did)
 
-    start := time.Now()
-
-    repoDid := job.Repo()
-
-    log := slog.With("source", "backfiller_backfill_repo", "repo", repoDid)
-    if job.RetryCount() > 0 {
-        log = log.With("retry_count", job.RetryCount())
-    }
-    log.Info(fmt.Sprintf("processing backfill for %s", repoDid))
-
-    url := fmt.Sprintf("%s?did=%s", b.CheckoutPath, repoDid)
-
-    if job.Rev() != "" {
-        url = url + fmt.Sprintf("&since=%s", job.Rev())
+    if since != "" {
+        url = url + fmt.Sprintf("&since=%s", since)
     }
 
     // GET and CAR decode the body
···
     }
     req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
     if err != nil {
-        state := fmt.Sprintf("failed (create request: %s)", err.Error())
-        return state, fmt.Errorf("failed to create request: %w", err)
+        return nil, fmt.Errorf("failed to create request: %w", err)
     }
 
     req.Header.Set("Accept", "application/vnd.ipld.car")
···
 
     resp, err := client.Do(req)
     if err != nil {
-        state := fmt.Sprintf("failed (do request: %s)", err.Error())
-        return state, fmt.Errorf("failed to send request: %w", err)
+        return nil, fmt.Errorf("failed to send request: %w", err)
     }
 
     if resp.StatusCode != http.StatusOK {
···
         } else {
             reason = resp.Status
         }
-        state := fmt.Sprintf("failed (%s)", reason)
-        return state, fmt.Errorf("failed to get repo: %s", reason)
+        return nil, fmt.Errorf("failed to get repo: %s", reason)
     }
 
     instrumentedReader := instrumentedReader{
···
 
     defer instrumentedReader.Close()
 
-    r, err := repo.ReadRepoFromCar(ctx, instrumentedReader)
+    repo, err := repo.ReadRepoFromCar(ctx, instrumentedReader)
     if err != nil {
-        state := "failed (couldn't read repo CAR from response body)"
-        return state, fmt.Errorf("failed to read repo from car: %w", err)
+        return nil, fmt.Errorf("failed to parse repo from CAR file: %w", err)
+    }
+    return repo, nil
+}
+
+// BackfillRepo backfills a repo
+func (b *Backfiller) BackfillRepo(ctx context.Context, job Job) (string, error) {
+    ctx, span := tracer.Start(ctx, "BackfillRepo")
+    defer span.End()
+
+    start := time.Now()
+
+    repoDID := job.Repo()
+
+    log := slog.With("source", "backfiller_backfill_repo", "repo", repoDID)
+    if job.RetryCount() > 0 {
+        log = log.With("retry_count", job.RetryCount())
+    }
+    log.Info(fmt.Sprintf("processing backfill for %s", repoDID))
+
+    // first try with Relay endpoint
+    r, err := b.fetchRepo(ctx, repoDID, job.Rev(), b.RelayHost)
+    if err != nil {
+        slog.Warn("repo CAR fetch from relay failed", "did", repoDID, "since", job.Rev(), "relayHost", b.RelayHost, "err", err)
+        // fallback to direct PDS fetch
+        ident, err := b.Directory.LookupDID(ctx, syntax.DID(repoDID))
+        if err != nil {
+            return "failed resolving DID to PDS repo", fmt.Errorf("resolving DID for PDS repo fetch: %w", err)
+        }
+        pdsHost := ident.PDSEndpoint()
+        if pdsHost == "" {
+            return "DID document missing PDS endpoint", fmt.Errorf("no PDS endpoint for DID: %s", repoDID)
+        }
+        r, err = b.fetchRepo(ctx, repoDID, job.Rev(), pdsHost)
+        if err != nil {
+            slog.Warn("repo CAR fetch from PDS failed", "did", repoDID, "since", job.Rev(), "pdsHost", pdsHost, "err", err)
+            return "repo CAR fetch from PDS failed", err
+        }
+        slog.Info("repo CAR fetch from PDS successful", "did", repoDID, "since", job.Rev(), "pdsHost", pdsHost, "err", err)
     }
 
     numRecords := 0
···
 
             raw := blk.RawData()
 
-            err = b.HandleCreateRecord(ctx, repoDid, rev, item.recordPath, &raw, &item.nodeCid)
+            err = b.HandleCreateRecord(ctx, repoDID, rev, item.recordPath, &raw, &item.nodeCid)
             if err != nil {
                 recordResults <- recordResult{recordPath: item.recordPath, err: fmt.Errorf("failed to handle create record: %w", err)}
                 continue
```

search/indexing.go (+3 -1)

```diff
         opts.SyncRequestsPerSecond = 8
     }
 
-    opts.CheckoutPath = fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo", relayHTTP)
+    opts.RelayHost = relayHTTP
     if config.IndexMaxConcurrency > 0 {
         opts.ParallelRecordCreates = config.IndexMaxConcurrency
     } else {
···
         idx.handleDelete,
         opts,
     )
+    // reuse identity directory (for efficient caching)
+    bf.Directory = dir
 
     idx.bfs = bfstore
     idx.bf = bf
```