like malachite (atproto-lastfm-importer) but in go and bluer
go spotify tealfm lastfm atproto
at main 135 lines 3.4 kB view raw
1package sync 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "log/slog" 9 "net/http" 10 "net/url" 11 "strings" 12 "time" 13 14 "github.com/bluesky-social/indigo/atproto/atclient" 15 "github.com/bluesky-social/indigo/repo" 16 "github.com/failsafe-go/failsafe-go" 17 "github.com/failsafe-go/failsafe-go/retrypolicy" 18 "github.com/fxamacker/cbor/v2" 19 "github.com/ipfs/go-cid" 20 21 "tangled.org/karitham.dev/lazuli/atproto" 22 "tangled.org/karitham.dev/lazuli/cache" 23) 24 25var ( 26 getRepoRetryPolicy = retrypolicy.NewBuilder[io.ReadCloser](). 27 WithMaxRetries(10). 28 WithBackoff(BaseRetryDelay, 5*time.Minute). 29 HandleIf(func(_ io.ReadCloser, err error) bool { 30 return IsTransientError(err) 31 }). 32 Build() 33) 34 35func FetchExistingViaCAR(ctx context.Context, apiClient *atproto.Client, did string, collection string, storage cache.Storage) ([]ExistingRecord, error) { 36 slog.Info("fetching repo via CAR export (this may take a while for large repos)") 37 38 repoReader, err := failsafe.With(getRepoRetryPolicy).WithContext(ctx).Get(func() (io.ReadCloser, error) { 39 resp, err := apiClient.APIClient().Do(ctx, &atclient.APIRequest{ 40 Method: "GET", 41 Endpoint: "com.atproto.sync.getRepo", 42 QueryParams: url.Values{ 43 "did": []string{did}, 44 }, 45 Headers: http.Header{ 46 "Accept": []string{"application/vnd.ipld.car"}, 47 }, 48 }) 49 if err != nil { 50 return nil, fmt.Errorf("failed to fetch repo: %w", err) 51 } 52 53 return resp.Body, nil 54 }) 55 if err != nil { 56 return nil, fmt.Errorf("failed to fetch CAR: %w", err) 57 } 58 59 defer repoReader.Close() //nolint: errcheck 60 61 slog.Info("CAR received, parsing...") 62 63 allRecords, err := playRecords(ctx, repoReader, did, collection) 64 if err != nil { 65 return nil, fmt.Errorf("failed to parse CAR: %w", err) 66 } 67 68 slog.Info("CAR parsed", slog.Int("count", len(allRecords))) 69 70 if len(allRecords) > 0 { 71 cacheEntries := make(map[string][]byte) 72 keys := make([]string, 0, len(allRecords)) 73 for _, rec := range allRecords { 74 parts := strings.Split(rec.URI, "/") 75 key := parts[len(parts)-1] 76 if key == "" { 77 key = CreateRecordKey(rec.Value) 78 } 79 value, _ := json.Marshal(rec.Value) 80 cacheEntries[key] = value 81 keys = append(keys, key) 82 } 83 84 if err := storage.SaveRecords(did, cacheEntries); err != nil { 85 return nil, err 86 } 87 88 if err := storage.MarkPublished(did, keys...); err != nil { 89 return nil, err 90 } 91 92 slog.Debug("saved to cache and marked as published", slog.Int("count", len(allRecords))) 93 } 94 95 return allRecords, nil 96} 97 98func playRecords(ctx context.Context, repoReader io.Reader, did, collection string) ([]ExistingRecord, error) { 99 var er []ExistingRecord 100 101 err := repo.StreamRepoRecords(ctx, repoReader, collection, nil, func(_ string, c cid.Cid, v []byte) error { 102 rec, err := parseRecord(c.String(), did, v) 103 if err != nil { 104 slog.With(slog.Any("err", err)).Error("failed to parse record") 105 106 return nil 107 } 108 109 er = append(er, rec) 110 111 return nil 112 }) 113 114 if err != nil { 115 return nil, fmt.Errorf("failed to stream repo: %w", err) 116 } 117 118 return er, nil 119} 120 121func parseRecord(cid string, did string, data []byte) (ExistingRecord, error) { 122 record := ExistingRecord{ 123 CID: cid, 124 Value: &PlayRecord{}, 125 } 126 127 err := cbor.Unmarshal(data, record.Value) 128 if err != nil { 129 return record, fmt.Errorf("failed to parse record: %w", err) 130 } 131 132 record.URI = generateRecordURI(did, record.Value) 133 134 return record, err 135}