package sync import ( "context" "encoding/json" "fmt" "io" "log/slog" "net/http" "net/url" "strings" "time" "github.com/bluesky-social/indigo/atproto/atclient" "github.com/bluesky-social/indigo/repo" "github.com/failsafe-go/failsafe-go" "github.com/failsafe-go/failsafe-go/retrypolicy" "github.com/fxamacker/cbor/v2" "github.com/ipfs/go-cid" "tangled.org/karitham.dev/lazuli/atproto" "tangled.org/karitham.dev/lazuli/cache" ) var ( getRepoRetryPolicy = retrypolicy.NewBuilder[io.ReadCloser](). WithMaxRetries(10). WithBackoff(BaseRetryDelay, 5*time.Minute). HandleIf(func(_ io.ReadCloser, err error) bool { return IsTransientError(err) }). Build() ) func FetchExistingViaCAR(ctx context.Context, apiClient *atproto.Client, did string, collection string, storage cache.Storage) ([]ExistingRecord, error) { slog.Info("fetching repo via CAR export (this may take a while for large repos)") repoReader, err := failsafe.With(getRepoRetryPolicy).WithContext(ctx).Get(func() (io.ReadCloser, error) { resp, err := apiClient.APIClient().Do(ctx, &atclient.APIRequest{ Method: "GET", Endpoint: "com.atproto.sync.getRepo", QueryParams: url.Values{ "did": []string{did}, }, Headers: http.Header{ "Accept": []string{"application/vnd.ipld.car"}, }, }) if err != nil { return nil, fmt.Errorf("failed to fetch repo: %w", err) } return resp.Body, nil }) if err != nil { return nil, fmt.Errorf("failed to fetch CAR: %w", err) } defer repoReader.Close() //nolint: errcheck slog.Info("CAR received, parsing...") allRecords, err := playRecords(ctx, repoReader, did, collection) if err != nil { return nil, fmt.Errorf("failed to parse CAR: %w", err) } slog.Info("CAR parsed", slog.Int("count", len(allRecords))) if len(allRecords) > 0 { cacheEntries := make(map[string][]byte) keys := make([]string, 0, len(allRecords)) for _, rec := range allRecords { parts := strings.Split(rec.URI, "/") key := parts[len(parts)-1] if key == "" { key = CreateRecordKey(rec.Value) } value, _ := json.Marshal(rec.Value) cacheEntries[key] = value keys = append(keys, key) } if err := storage.SaveRecords(did, cacheEntries); err != nil { return nil, err } if err := storage.MarkPublished(did, keys...); err != nil { return nil, err } slog.Debug("saved to cache and marked as published", slog.Int("count", len(allRecords))) } return allRecords, nil } func playRecords(ctx context.Context, repoReader io.Reader, did, collection string) ([]ExistingRecord, error) { var er []ExistingRecord err := repo.StreamRepoRecords(ctx, repoReader, collection, nil, func(_ string, c cid.Cid, v []byte) error { rec, err := parseRecord(c.String(), did, v) if err != nil { slog.With(slog.Any("err", err)).Error("failed to parse record") return nil } er = append(er, rec) return nil }) if err != nil { return nil, fmt.Errorf("failed to stream repo: %w", err) } return er, nil } func parseRecord(cid string, did string, data []byte) (ExistingRecord, error) { record := ExistingRecord{ CID: cid, Value: &PlayRecord{}, } err := cbor.Unmarshal(data, record.Value) if err != nil { return record, fmt.Errorf("failed to parse record: %w", err) } record.URI = generateRecordURI(did, record.Value) return record, err }