Like malachite (atproto-lastfm-importer), but written in Go and targeting Bluesky/atproto.
go
spotify
tealfm
lastfm
atproto
1package sync
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "log/slog"
9 "net/http"
10 "net/url"
11 "strings"
12 "time"
13
14 "github.com/bluesky-social/indigo/atproto/atclient"
15 "github.com/bluesky-social/indigo/repo"
16 "github.com/failsafe-go/failsafe-go"
17 "github.com/failsafe-go/failsafe-go/retrypolicy"
18 "github.com/fxamacker/cbor/v2"
19 "github.com/ipfs/go-cid"
20
21 "tangled.org/karitham.dev/lazuli/atproto"
22 "tangled.org/karitham.dev/lazuli/cache"
23)
24
var (
	// getRepoRetryPolicy retries the com.atproto.sync.getRepo fetch up to 10
	// times with exponential backoff (BaseRetryDelay up to a 5-minute cap).
	// Only errors classified as transient by IsTransientError are retried;
	// permanent failures surface to the caller immediately.
	getRepoRetryPolicy = retrypolicy.NewBuilder[io.ReadCloser]().
		WithMaxRetries(10).
		WithBackoff(BaseRetryDelay, 5*time.Minute).
		HandleIf(func(_ io.ReadCloser, err error) bool {
			return IsTransientError(err)
		}).
		Build()
)
34
35func FetchExistingViaCAR(ctx context.Context, apiClient *atproto.Client, did string, collection string, storage cache.Storage) ([]ExistingRecord, error) {
36 slog.Info("fetching repo via CAR export (this may take a while for large repos)")
37
38 repoReader, err := failsafe.With(getRepoRetryPolicy).WithContext(ctx).Get(func() (io.ReadCloser, error) {
39 resp, err := apiClient.APIClient().Do(ctx, &atclient.APIRequest{
40 Method: "GET",
41 Endpoint: "com.atproto.sync.getRepo",
42 QueryParams: url.Values{
43 "did": []string{did},
44 },
45 Headers: http.Header{
46 "Accept": []string{"application/vnd.ipld.car"},
47 },
48 })
49 if err != nil {
50 return nil, fmt.Errorf("failed to fetch repo: %w", err)
51 }
52
53 return resp.Body, nil
54 })
55 if err != nil {
56 return nil, fmt.Errorf("failed to fetch CAR: %w", err)
57 }
58
59 defer repoReader.Close() //nolint: errcheck
60
61 slog.Info("CAR received, parsing...")
62
63 allRecords, err := playRecords(ctx, repoReader, did, collection)
64 if err != nil {
65 return nil, fmt.Errorf("failed to parse CAR: %w", err)
66 }
67
68 slog.Info("CAR parsed", slog.Int("count", len(allRecords)))
69
70 if len(allRecords) > 0 {
71 cacheEntries := make(map[string][]byte)
72 keys := make([]string, 0, len(allRecords))
73 for _, rec := range allRecords {
74 parts := strings.Split(rec.URI, "/")
75 key := parts[len(parts)-1]
76 if key == "" {
77 key = CreateRecordKey(rec.Value)
78 }
79 value, _ := json.Marshal(rec.Value)
80 cacheEntries[key] = value
81 keys = append(keys, key)
82 }
83
84 if err := storage.SaveRecords(did, cacheEntries); err != nil {
85 return nil, err
86 }
87
88 if err := storage.MarkPublished(did, keys...); err != nil {
89 return nil, err
90 }
91
92 slog.Debug("saved to cache and marked as published", slog.Int("count", len(allRecords)))
93 }
94
95 return allRecords, nil
96}
97
98func playRecords(ctx context.Context, repoReader io.Reader, did, collection string) ([]ExistingRecord, error) {
99 var er []ExistingRecord
100
101 err := repo.StreamRepoRecords(ctx, repoReader, collection, nil, func(_ string, c cid.Cid, v []byte) error {
102 rec, err := parseRecord(c.String(), did, v)
103 if err != nil {
104 slog.With(slog.Any("err", err)).Error("failed to parse record")
105
106 return nil
107 }
108
109 er = append(er, rec)
110
111 return nil
112 })
113
114 if err != nil {
115 return nil, fmt.Errorf("failed to stream repo: %w", err)
116 }
117
118 return er, nil
119}
120
121func parseRecord(cid string, did string, data []byte) (ExistingRecord, error) {
122 record := ExistingRecord{
123 CID: cid,
124 Value: &PlayRecord{},
125 }
126
127 err := cbor.Unmarshal(data, record.Value)
128 if err != nil {
129 return record, fmt.Errorf("failed to parse record: %w", err)
130 }
131
132 record.URI = generateRecordURI(did, record.Value)
133
134 return record, err
135}