package indexer

import (
	"context"
	"fmt"
	"log/slog"
	"sync"
	"time"

	comatproto "github.com/bluesky-social/indigo/api/atproto"
	"github.com/bluesky-social/indigo/models"

	"go.opentelemetry.io/otel"
)

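// CrawlDispatcher coordinates the queueing and dispatch of repo crawl work.
// New actors enter via Crawl, live events for actors already being crawled
// enter via AddToCatchupQueue, and a pool of fetchWorker goroutines performs
// the actual fetching and indexing.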
type CrawlDispatcher struct {
	// from Crawl()
	ingest chan *models.ActorInfo

	// from AddToCatchupQueue()
	catchup chan *crawlWork

	// from main loop to fetchWorker()
	repoSync chan *crawlWork

	// from fetchWorker back to the main loop when a job finishes
	complete chan models.Uid

	maplk      sync.Mutex
	todo       map[models.Uid]*crawlWork
	inProgress map[models.Uid]*crawlWork

	repoFetcher CrawlRepoFetcher

	concurrency int

	log *slog.Logger

	done chan struct{}
}

// CrawlRepoFetcher is the subset of RepoFetcher that the dispatcher needs.
type CrawlRepoFetcher interface {
	FetchAndIndexRepo(ctx context.Context, job *crawlWork) error
}

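// NewCrawlDispatcher constructs a dispatcher with the given fetcher and
// worker concurrency, and starts the background gauge poller. Call Run to
// start the main loop and the fetch workers.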
func NewCrawlDispatcher(repoFetcher CrawlRepoFetcher, concurrency int, log *slog.Logger) (*CrawlDispatcher, error) {
	if concurrency < 1 {
		return nil, fmt.Errorf("crawl dispatcher concurrency must be a positive integer")
	}

	out := &CrawlDispatcher{
		ingest:      make(chan *models.ActorInfo),
		repoSync:    make(chan *crawlWork),
		complete:    make(chan models.Uid),
		catchup:     make(chan *crawlWork),
		repoFetcher: repoFetcher,
		concurrency: concurrency,
		todo:        make(map[models.Uid]*crawlWork),
		inProgress:  make(map[models.Uid]*crawlWork),
		log:         log,
		done:        make(chan struct{}),
	}
	go out.CatchupRepoGaugePoller()

	return out, nil
}

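// Run starts the dispatcher's main loop and its pool of fetch workers.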
func (c *CrawlDispatcher) Run() {
	go c.mainLoop()

	for i := 0; i < c.concurrency; i++ {
		go c.fetchWorker()
	}
}

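// Shutdown stops the dispatcher by closing its done channel.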
func (c *CrawlDispatcher) Shutdown() {
	close(c.done)
}

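// catchupJob is a single commit event from the firehose, along with the PDS
// it came from and the actor it belongs to, waiting to be applied by a crawl.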
type catchupJob struct {
	evt  *comatproto.SyncSubscribeRepos_Commit
	host *models.PDS
	user *models.ActorInfo
}

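// crawlWork is the unit of work dispatched to fetch workers: one actor's
// crawl plus any catchup events that accumulated while it was queued or
// in flight.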
type crawlWork struct {
	act        *models.ActorInfo
	initScrape bool

	// for events that come in while this actor's crawl is enqueued;
	// catchup items are processed during the crawl
	catchup []*catchupJob

	// for events that come in while this actor is being processed;
	// next items are processed after the crawl
	next []*catchupJob
}

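// mainLoop multiplexes the dispatcher's channels. It relies on the nil
// channel idiom: when no job is ready, dispatchQueue is nil and the send
// case below can never fire; when a job is ready, dispatchQueue is set to
// c.repoSync, enabling the send.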
func (c *CrawlDispatcher) mainLoop() {
	var nextDispatchedJob *crawlWork
	var jobsAwaitingDispatch []*crawlWork

	// dispatchQueue is the worker channel to which we dispatch crawl work.
	// It is nil whenever there is no job ready, which disables the send case
	// in the select below.
	var dispatchQueue chan *crawlWork

	for {
		select {
		case actorToCrawl := <-c.ingest:
			// TODO: max buffer size
			crawlJob := c.enqueueJobForActor(actorToCrawl)
			if crawlJob == nil {
				break
			}

			if nextDispatchedJob == nil {
				nextDispatchedJob = crawlJob
				dispatchQueue = c.repoSync
			} else {
				jobsAwaitingDispatch = append(jobsAwaitingDispatch, crawlJob)
			}
		case dispatchQueue <- nextDispatchedJob:
			c.dequeueJob(nextDispatchedJob)

			if len(jobsAwaitingDispatch) > 0 {
				nextDispatchedJob = jobsAwaitingDispatch[0]
				jobsAwaitingDispatch = jobsAwaitingDispatch[1:]
			} else {
				nextDispatchedJob = nil
				dispatchQueue = nil
			}
		case catchupWork := <-c.catchup:
			// Catchup jobs hold events that came in while a crawl was in
			// progress. They are lower priority than new crawls, so they go
			// to the back of the queue unless nothing else is waiting.
			if nextDispatchedJob == nil {
				nextDispatchedJob = catchupWork
				dispatchQueue = c.repoSync
			} else {
				jobsAwaitingDispatch = append(jobsAwaitingDispatch, catchupWork)
			}
		case uid := <-c.complete:
			c.maplk.Lock()

			job, ok := c.inProgress[uid]
			if !ok {
				panic("received a completion signal for a job that is not in progress")
			}
			delete(c.inProgress, uid)

			// If any events arrived for this UID while it was being crawled,
			// put the job back on the todo list and re-dispatch it. This pumps
			// the `next` queue into the `catchup` queue, over and over, until
			// the `next` queue is empty.
			if len(job.next) > 0 {
				c.todo[uid] = job
				job.initScrape = false
				job.catchup = job.next
				job.next = nil
				if nextDispatchedJob == nil {
					nextDispatchedJob = job
					dispatchQueue = c.repoSync
				} else {
					jobsAwaitingDispatch = append(jobsAwaitingDispatch, job)
				}
			}
			c.maplk.Unlock()
		}
	}
}

// enqueueJobForActor creates and enqueues a new crawl job for an actor,
// unless a job for that actor is already queued or in progress, in which
// case it returns nil.
func (c *CrawlDispatcher) enqueueJobForActor(ai *models.ActorInfo) *crawlWork {
	c.maplk.Lock()
	defer c.maplk.Unlock()
	if _, ok := c.inProgress[ai.Uid]; ok {
		return nil
	}

	if _, ok := c.todo[ai.Uid]; ok {
		return nil
	}

	crawlJob := &crawlWork{
		act:        ai,
		initScrape: true,
	}
	c.todo[ai.Uid] = crawlJob
	return crawlJob
}

// dequeueJob moves a job from the todo list to the inProgress list.
func (c *CrawlDispatcher) dequeueJob(job *crawlWork) {
	c.maplk.Lock()
	defer c.maplk.Unlock()
	delete(c.todo, job.act.Uid)
	c.inProgress[job.act.Uid] = job
}

// addToCatchupQueue attaches a catchup event to an existing crawl job for the
// actor if one is queued or in progress. If neither exists, it creates and
// returns a new crawl job, which the caller must hand to the main loop.
func (c *CrawlDispatcher) addToCatchupQueue(catchup *catchupJob) *crawlWork {
	c.maplk.Lock()
	defer c.maplk.Unlock()

	// If the actor crawl is enqueued, append to the catchup queue, which gets
	// emptied during the crawl.
	job, ok := c.todo[catchup.user.Uid]
	if ok {
		catchupEventsEnqueued.WithLabelValues("todo").Inc()
		job.catchup = append(job.catchup, catchup)
		return nil
	}

	// If the actor crawl is in progress, append to the next queue, which gets
	// emptied after the crawl.
	job, ok = c.inProgress[catchup.user.Uid]
	if ok {
		catchupEventsEnqueued.WithLabelValues("prog").Inc()
		job.next = append(job.next, catchup)
		return nil
	}

	// Otherwise, create a new crawl job for this actor and enqueue it.
	catchupEventsEnqueued.WithLabelValues("new").Inc()
	cw := &crawlWork{
		act:     catchup.user,
		catchup: []*catchupJob{catchup},
	}
	c.todo[catchup.user.Uid] = cw
	return cw
}

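// fetchWorker pulls crawl jobs off the repoSync channel, fetches and indexes
// each repo, and reports completion back to the main loop.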
func (c *CrawlDispatcher) fetchWorker() {
	for job := range c.repoSync {
		if err := c.repoFetcher.FetchAndIndexRepo(context.TODO(), job); err != nil {
			c.log.Error("failed to perform repo crawl", "did", job.act.Did, "err", err)
		}

		// TODO: do we still just do this if it errors?
		c.complete <- job.act.Uid
	}
}

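// Crawl enqueues an actor for crawling, blocking until the main loop accepts
// the job or the context is cancelled.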
func (c *CrawlDispatcher) Crawl(ctx context.Context, ai *models.ActorInfo) error {
	if ai.PDS == 0 {
		panic("must have pds for user in queue")
	}

	userCrawlsEnqueued.Inc()

	ctx, span := otel.Tracer("crawler").Start(ctx, "addToCrawler")
	defer span.End()

	select {
	case c.ingest <- ai:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

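// AddToCatchupQueue records a commit event for an actor. If a crawl for the
// actor is already queued or in progress the event piggybacks on it;
// otherwise a new crawl job is created and handed to the main loop.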
func (c *CrawlDispatcher) AddToCatchupQueue(ctx context.Context, host *models.PDS, u *models.ActorInfo, evt *comatproto.SyncSubscribeRepos_Commit) error {
	if u.PDS == 0 {
		panic("must have pds for user in queue")
	}

	catchup := &catchupJob{
		evt:  evt,
		host: host,
		user: u,
	}

	cw := c.addToCatchupQueue(catchup)
	if cw == nil {
		return nil
	}

	select {
	case c.catchup <- cw:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

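// RepoInSlowPath reports whether a crawl for the given UID is currently
// queued or in progress.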
func (c *CrawlDispatcher) RepoInSlowPath(ctx context.Context, uid models.Uid) bool {
	c.maplk.Lock()
	defer c.maplk.Unlock()
	if _, ok := c.todo[uid]; ok {
		return true
	}

	if _, ok := c.inProgress[uid]; ok {
		return true
	}

	return false
}

func (c *CrawlDispatcher) countReposInSlowPath() int {
	c.maplk.Lock()
	defer c.maplk.Unlock()
	return len(c.inProgress) + len(c.todo)
}

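// CatchupRepoGaugePoller periodically publishes the number of repos in the
// slow path to the catchupReposGauge metric, until Shutdown is called.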
func (c *CrawlDispatcher) CatchupRepoGaugePoller() {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-c.done:
			// exit once Shutdown closes the done channel
			return
		case <-ticker.C:
			catchupReposGauge.Set(float64(c.countReposInSlowPath()))
		}
	}
}