fork of indigo with slightly nicer lexgen
package indexer

import (
    "bytes"
    "context"
    "errors"
    "fmt"
    "io"
    "io/fs"
    "log/slog"
    "sync"

    "github.com/bluesky-social/indigo/api/atproto"
    "github.com/bluesky-social/indigo/models"
    "github.com/bluesky-social/indigo/repomgr"
    "github.com/bluesky-social/indigo/xrpc"
    ipld "github.com/ipfs/go-ipld-format"
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "golang.org/x/time/rate"
    "gorm.io/gorm"
)

// NewRepoFetcher constructs a RepoFetcher backed by the given database and repo manager.
func NewRepoFetcher(db *gorm.DB, rm *repomgr.RepoManager, maxConcurrency int) *RepoFetcher {
    return &RepoFetcher{
        repoman:                rm,
        db:                     db,
        Limiters:               make(map[uint]*rate.Limiter),
        ApplyPDSClientSettings: func(*xrpc.Client) {},
        MaxConcurrency:         maxConcurrency,
        log:                    slog.Default().With("system", "indexer"),
    }
}

// RepoFetcher downloads user repos from their PDS hosts, applying a per-PDS
// rate limit, and hands the fetched data to the repo manager for indexing.
type RepoFetcher struct {
    repoman *repomgr.RepoManager
    db      *gorm.DB

    Limiters map[uint]*rate.Limiter
    LimitMux sync.RWMutex

    MaxConcurrency int

    ApplyPDSClientSettings func(*xrpc.Client)

    log *slog.Logger
}

// GetLimiter returns the rate limiter for the given PDS, or nil if none has been created.
func (rf *RepoFetcher) GetLimiter(pdsID uint) *rate.Limiter {
    rf.LimitMux.RLock()
    defer rf.LimitMux.RUnlock()

    return rf.Limiters[pdsID]
}

// GetOrCreateLimiter returns the rate limiter for the given PDS, creating one
// with the given rate if it does not exist yet.
func (rf *RepoFetcher) GetOrCreateLimiter(pdsID uint, pdsrate float64) *rate.Limiter {
    rf.LimitMux.Lock()
    defer rf.LimitMux.Unlock()

    lim, ok := rf.Limiters[pdsID]
    if !ok {
        lim = rate.NewLimiter(rate.Limit(pdsrate), 1)
        rf.Limiters[pdsID] = lim
    }

    return lim
}

// SetLimiter replaces the rate limiter for the given PDS.
func (rf *RepoFetcher) SetLimiter(pdsID uint, lim *rate.Limiter) {
    rf.LimitMux.Lock()
    defer rf.LimitMux.Unlock()

    rf.Limiters[pdsID] = lim
}

// fetchRepo downloads a user's repo from their PDS, rate limited per PDS.
// An empty rev fetches the full repo.
func (rf *RepoFetcher) fetchRepo(ctx context.Context, c *xrpc.Client, pds *models.PDS, did string, rev string) ([]byte, error) {
    ctx, span := otel.Tracer("indexer").Start(ctx, "fetchRepo")
    defer span.End()

    span.SetAttributes(
        attribute.String("pds", pds.Host),
        attribute.String("did", did),
        attribute.String("rev", rev),
    )

    limiter := rf.GetOrCreateLimiter(pds.ID, pds.CrawlRateLimit)

    // Wait to prevent DOSing the PDS when connecting to a new stream with lots of active repos
    limiter.Wait(ctx)

    rf.log.Debug("SyncGetRepo", "did", did, "since", rev)
    // TODO: max size on these? A malicious PDS could just send us a petabyte sized repo here and kill us
    repo, err := atproto.SyncGetRepo(ctx, c, did, rev)
    if err != nil {
        reposFetched.WithLabelValues("fail").Inc()
        return nil, fmt.Errorf("failed to fetch repo (did=%s,rev=%s,host=%s): %w", did, rev, pds.Host, err)
    }
    reposFetched.WithLabelValues("success").Inc()

    return repo, nil
}

// TODO: since this function is the only place we depend on the repomanager, i wonder if this should be wired some other way?
// FetchAndIndexRepo tries to apply any buffered catchup events for the user;
// otherwise it fetches the repo from the user's PDS (since the locally known
// rev, when available) and imports it, falling back to a full fetch if the
// partial download is missing data.
func (rf *RepoFetcher) FetchAndIndexRepo(ctx context.Context, job *crawlWork) error {
    ctx, span := otel.Tracer("indexer").Start(ctx, "FetchAndIndexRepo")
    defer span.End()

    span.SetAttributes(attribute.Int("catchup", len(job.catchup)))

    ai := job.act

    var pds models.PDS
    if err := rf.db.First(&pds, "id = ?", ai.PDS).Error; err != nil {
        catchupEventsFailed.WithLabelValues("nopds").Inc()
        return fmt.Errorf("expected to find pds record (%d) in db for crawling one of their users: %w", ai.PDS, err)
    }

    rev, err := rf.repoman.GetRepoRev(ctx, ai.Uid)
    if err != nil && !isNotFound(err) {
        catchupEventsFailed.WithLabelValues("noroot").Inc()
        return fmt.Errorf("failed to get repo root: %w", err)
    }

    // attempt to process buffered events
    if !job.initScrape && len(job.catchup) > 0 {
        first := job.catchup[0]
        var resync bool
        if first.evt.Since == nil || rev == *first.evt.Since {
            for i, j := range job.catchup {
                catchupEventsProcessed.Inc()
                if err := rf.repoman.HandleExternalUserEvent(ctx, pds.ID, ai.Uid, ai.Did, j.evt.Since, j.evt.Rev, j.evt.Blocks, j.evt.Ops); err != nil {
                    rf.log.Error("buffered event catchup failed", "error", err, "did", ai.Did, "i", i, "jobCount", len(job.catchup), "seq", j.evt.Seq)
                    resync = true // fall back to a repo sync
                    break
                }
            }

            if !resync {
                return nil
            }
        }
    }

    revp := &rev
    if rev == "" {
        span.SetAttributes(attribute.Bool("full", true))
        revp = nil
    }

    c := models.ClientForPds(&pds)
    rf.ApplyPDSClientSettings(c)

    repo, err := rf.fetchRepo(ctx, c, &pds, ai.Did, rev)
    if err != nil {
        return err
    }

    if err := rf.repoman.ImportNewRepo(ctx, ai.Uid, ai.Did, bytes.NewReader(repo), revp); err != nil {
        span.RecordError(err)

        if ipld.IsNotFound(err) || errors.Is(err, io.EOF) || errors.Is(err, fs.ErrNotExist) {
            rf.log.Error("partial repo fetch was missing data", "did", ai.Did, "pds", pds.Host, "rev", rev)
            repo, err := rf.fetchRepo(ctx, c, &pds, ai.Did, "")
            if err != nil {
                return err
            }

            if err := rf.repoman.ImportNewRepo(ctx, ai.Uid, ai.Did, bytes.NewReader(repo), nil); err != nil {
                span.RecordError(err)
                return fmt.Errorf("failed to import backup repo (%s): %w", ai.Did, err)
            }

            return nil
        }
        return fmt.Errorf("importing fetched repo (curRev: %s): %w", rev, err)
    }

    return nil
}