1package indexer
2
3import (
4 "bytes"
5 "context"
6 "errors"
7 "fmt"
8 "io"
9 "io/fs"
10 "log/slog"
11 "sync"
12
13 "github.com/bluesky-social/indigo/api/atproto"
14 "github.com/bluesky-social/indigo/models"
15 "github.com/bluesky-social/indigo/repomgr"
16 "github.com/bluesky-social/indigo/xrpc"
17 ipld "github.com/ipfs/go-ipld-format"
18 "go.opentelemetry.io/otel"
19 "go.opentelemetry.io/otel/attribute"
20 "golang.org/x/time/rate"
21 "gorm.io/gorm"
22)
23
24func NewRepoFetcher(db *gorm.DB, rm *repomgr.RepoManager, maxConcurrency int) *RepoFetcher {
25 return &RepoFetcher{
26 repoman: rm,
27 db: db,
28 Limiters: make(map[uint]*rate.Limiter),
29 ApplyPDSClientSettings: func(*xrpc.Client) {},
30 MaxConcurrency: maxConcurrency,
31 log: slog.Default().With("system", "indexer"),
32 }
33}
34
// RepoFetcher downloads repo snapshots from remote PDS hosts, applying a
// per-PDS rate limit, and hands them to the repo manager for indexing.
type RepoFetcher struct {
	repoman *repomgr.RepoManager
	db      *gorm.DB

	// Limiters holds one rate limiter per PDS id; guarded by LimitMux.
	Limiters map[uint]*rate.Limiter
	LimitMux sync.RWMutex

	// MaxConcurrency bounds concurrent fetch work (enforced by callers).
	MaxConcurrency int

	// ApplyPDSClientSettings is a hook run on each freshly built xrpc client
	// before use; defaults to a no-op (see NewRepoFetcher).
	ApplyPDSClientSettings func(*xrpc.Client)

	log *slog.Logger
}
48
49func (rf *RepoFetcher) GetLimiter(pdsID uint) *rate.Limiter {
50 rf.LimitMux.RLock()
51 defer rf.LimitMux.RUnlock()
52
53 return rf.Limiters[pdsID]
54}
55
56func (rf *RepoFetcher) GetOrCreateLimiter(pdsID uint, pdsrate float64) *rate.Limiter {
57 rf.LimitMux.Lock()
58 defer rf.LimitMux.Unlock()
59
60 lim, ok := rf.Limiters[pdsID]
61 if !ok {
62 lim = rate.NewLimiter(rate.Limit(pdsrate), 1)
63 rf.Limiters[pdsID] = lim
64 }
65
66 return lim
67}
68
69func (rf *RepoFetcher) SetLimiter(pdsID uint, lim *rate.Limiter) {
70 rf.LimitMux.Lock()
71 defer rf.LimitMux.Unlock()
72
73 rf.Limiters[pdsID] = lim
74}
75
76func (rf *RepoFetcher) fetchRepo(ctx context.Context, c *xrpc.Client, pds *models.PDS, did string, rev string) ([]byte, error) {
77 ctx, span := otel.Tracer("indexer").Start(ctx, "fetchRepo")
78 defer span.End()
79
80 span.SetAttributes(
81 attribute.String("pds", pds.Host),
82 attribute.String("did", did),
83 attribute.String("rev", rev),
84 )
85
86 limiter := rf.GetOrCreateLimiter(pds.ID, pds.CrawlRateLimit)
87
88 // Wait to prevent DOSing the PDS when connecting to a new stream with lots of active repos
89 limiter.Wait(ctx)
90
91 rf.log.Debug("SyncGetRepo", "did", did, "since", rev)
92 // TODO: max size on these? A malicious PDS could just send us a petabyte sized repo here and kill us
93 repo, err := atproto.SyncGetRepo(ctx, c, did, rev)
94 if err != nil {
95 reposFetched.WithLabelValues("fail").Inc()
96 return nil, fmt.Errorf("failed to fetch repo (did=%s,rev=%s,host=%s): %w", did, rev, pds.Host, err)
97 }
98 reposFetched.WithLabelValues("success").Inc()
99
100 return repo, nil
101}
102
103// TODO: since this function is the only place we depend on the repomanager, i wonder if this should be wired some other way?
// TODO: since this function is the only place we depend on the repomanager, i wonder if this should be wired some other way?

// FetchAndIndexRepo brings a single actor's repo up to date. It first tries
// to replay any buffered firehose events in job.catchup; if that fails (or
// the job is an initial scrape) it falls back to fetching a partial repo
// since the locally-known rev, and finally to a full repo fetch when the
// partial import is missing data. Returns nil once the repo is indexed.
func (rf *RepoFetcher) FetchAndIndexRepo(ctx context.Context, job *crawlWork) error {
	ctx, span := otel.Tracer("indexer").Start(ctx, "FetchAndIndexRepo")
	defer span.End()

	span.SetAttributes(attribute.Int("catchup", len(job.catchup)))

	ai := job.act

	// Resolve the PDS record so we know which host to crawl and at what rate.
	var pds models.PDS
	if err := rf.db.First(&pds, "id = ?", ai.PDS).Error; err != nil {
		catchupEventsFailed.WithLabelValues("nopds").Inc()
		return fmt.Errorf("expected to find pds record (%d) in db for crawling one of their users: %w", ai.PDS, err)
	}

	// A not-found rev is fine: it just means we have no local copy yet and
	// rev stays "" (forcing a full fetch below).
	rev, err := rf.repoman.GetRepoRev(ctx, ai.Uid)
	if err != nil && !isNotFound(err) {
		catchupEventsFailed.WithLabelValues("noroot").Inc()
		return fmt.Errorf("failed to get repo root: %w", err)
	}

	// attempt to process buffered events
	// Only valid when the first buffered event chains directly off our local
	// rev (or carries no Since at all); otherwise we'd apply events with a gap.
	if !job.initScrape && len(job.catchup) > 0 {
		first := job.catchup[0]
		var resync bool
		if first.evt.Since == nil || rev == *first.evt.Since {
			for i, j := range job.catchup {
				catchupEventsProcessed.Inc()
				if err := rf.repoman.HandleExternalUserEvent(ctx, pds.ID, ai.Uid, ai.Did, j.evt.Since, j.evt.Rev, j.evt.Blocks, j.evt.Ops); err != nil {
					rf.log.Error("buffered event catchup failed", "error", err, "did", ai.Did, "i", i, "jobCount", len(job.catchup), "seq", j.evt.Seq)
					resync = true // fall back to a repo sync
					break
				}
			}

			// All buffered events applied cleanly; no repo fetch needed.
			if !resync {
				return nil
			}
		}
	}

	// An empty rev means no local copy: ask for (and import as) a full repo
	// by passing a nil rev pointer.
	revp := &rev
	if rev == "" {
		span.SetAttributes(attribute.Bool("full", true))
		revp = nil
	}

	c := models.ClientForPds(&pds)
	rf.ApplyPDSClientSettings(c)

	repo, err := rf.fetchRepo(ctx, c, &pds, ai.Did, rev)
	if err != nil {
		return err
	}

	if err := rf.repoman.ImportNewRepo(ctx, ai.Uid, ai.Did, bytes.NewReader(repo), revp); err != nil {
		span.RecordError(err)

		// These errors indicate the partial (since-rev) CAR was missing
		// blocks we need; retry once with a full repo fetch (rev "").
		if ipld.IsNotFound(err) || errors.Is(err, io.EOF) || errors.Is(err, fs.ErrNotExist) {
			rf.log.Error("partial repo fetch was missing data", "did", ai.Did, "pds", pds.Host, "rev", rev)
			repo, err := rf.fetchRepo(ctx, c, &pds, ai.Did, "")
			if err != nil {
				return err
			}

			if err := rf.repoman.ImportNewRepo(ctx, ai.Uid, ai.Did, bytes.NewReader(repo), nil); err != nil {
				span.RecordError(err)
				return fmt.Errorf("failed to import backup repo (%s): %w", ai.Did, err)
			}

			return nil
		}
		return fmt.Errorf("importing fetched repo (curRev: %s): %w", rev, err)
	}

	return nil
}