fork of indigo with slightly nicer lexgen
package indexer

import (
	"context"
	"fmt"
	"log/slog"
	"sync"
	"time"

	comatproto "github.com/bluesky-social/indigo/api/atproto"
	"github.com/bluesky-social/indigo/models"

	"go.opentelemetry.io/otel"
)

type CrawlDispatcher struct {
	// from Crawl()
	ingest chan *models.ActorInfo

	// from AddToCatchupQueue()
	catchup chan *crawlWork

	// from main loop to fetchWorker()
	repoSync chan *crawlWork

	complete chan models.Uid

	maplk      sync.Mutex
	todo       map[models.Uid]*crawlWork
	inProgress map[models.Uid]*crawlWork

	repoFetcher CrawlRepoFetcher

	concurrency int

	log *slog.Logger

	done chan struct{}
}

// this is what we need of RepoFetcher
type CrawlRepoFetcher interface {
	FetchAndIndexRepo(ctx context.Context, job *crawlWork) error
}

func NewCrawlDispatcher(repoFetcher CrawlRepoFetcher, concurrency int, log *slog.Logger) (*CrawlDispatcher, error) {
	if concurrency < 1 {
		return nil, fmt.Errorf("crawl dispatcher concurrency must be a positive integer")
	}

	out := &CrawlDispatcher{
		ingest:      make(chan *models.ActorInfo),
		repoSync:    make(chan *crawlWork),
		complete:    make(chan models.Uid),
		catchup:     make(chan *crawlWork),
		repoFetcher: repoFetcher,
		concurrency: concurrency,
		todo:        make(map[models.Uid]*crawlWork),
		inProgress:  make(map[models.Uid]*crawlWork),
		log:         log,
		done:        make(chan struct{}),
	}
	go out.CatchupRepoGaugePoller()

	return out, nil
}

func (c *CrawlDispatcher) Run() {
	go c.mainLoop()

	for i := 0; i < c.concurrency; i++ {
		go c.fetchWorker()
	}
}

func (c *CrawlDispatcher) Shutdown() {
	close(c.done)
}

type catchupJob struct {
	evt  *comatproto.SyncSubscribeRepos_Commit
	host *models.PDS
	user *models.ActorInfo
}

type crawlWork struct {
	act        *models.ActorInfo
	initScrape bool

	// for events that come in while this actor's crawl is enqueued;
	// catchup items are processed during the crawl
	catchup []*catchupJob

	// for events that come in while this actor is being processed;
	// next items are processed after the crawl
	next []*catchupJob
}

func (c *CrawlDispatcher) mainLoop() {
	var nextDispatchedJob *crawlWork
	var jobsAwaitingDispatch []*crawlWork

	// dispatchQueue represents the repoSync worker channel to which we dispatch crawl work
	var dispatchQueue chan *crawlWork

	for {
		select {
		case actorToCrawl := <-c.ingest:
			// TODO: max buffer size
			crawlJob := c.enqueueJobForActor(actorToCrawl)
			if crawlJob == nil {
				break
			}

			if nextDispatchedJob == nil {
				nextDispatchedJob = crawlJob
				dispatchQueue = c.repoSync
			} else {
				jobsAwaitingDispatch = append(jobsAwaitingDispatch, crawlJob)
			}
		case dispatchQueue <- nextDispatchedJob:
			c.dequeueJob(nextDispatchedJob)

			if len(jobsAwaitingDispatch) > 0 {
				nextDispatchedJob = jobsAwaitingDispatch[0]
				jobsAwaitingDispatch = jobsAwaitingDispatch[1:]
			} else {
				nextDispatchedJob = nil
				dispatchQueue = nil
			}
		case catchupJob := <-c.catchup:
			// Catchup jobs carry events that arrived for an actor with no crawl already
			// queued or in progress. Dispatch one immediately if nothing is waiting;
			// otherwise it joins the FIFO dispatch buffer behind anything already waiting.
			if nextDispatchedJob == nil {
				nextDispatchedJob = catchupJob
				dispatchQueue = c.repoSync
			} else {
				jobsAwaitingDispatch = append(jobsAwaitingDispatch, catchupJob)
			}
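		// Completion flow: fetchWorker signals on c.complete once a crawl finishes.
		// The case below removes the job from inProgress and, if events were buffered
		// in job.next during the crawl, promotes them to job.catchup and re-enqueues
		// the job for dispatch.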
		case uid := <-c.complete:
			c.maplk.Lock()

			job, ok := c.inProgress[uid]
			if !ok {
				panic("received a completion signal for a job that is not in progress")
			}
			delete(c.inProgress, uid)

			// If any events arrived for this UID mid-crawl, add the job back to the todo
			// list or dispatch buffer. We're basically pumping the `next` queue into the
			// `catchup` queue, over and over, until the `next` queue is empty.
			if len(job.next) > 0 {
				c.todo[uid] = job
				job.initScrape = false
				job.catchup = job.next
				job.next = nil
				if nextDispatchedJob == nil {
					nextDispatchedJob = job
					dispatchQueue = c.repoSync
				} else {
					jobsAwaitingDispatch = append(jobsAwaitingDispatch, job)
				}
			}
			c.maplk.Unlock()
		}
	}
}

// enqueueJobForActor adds a new crawl job to the todo list if there isn't already
// one queued or in progress for this actor
func (c *CrawlDispatcher) enqueueJobForActor(ai *models.ActorInfo) *crawlWork {
	c.maplk.Lock()
	defer c.maplk.Unlock()
	if _, ok := c.inProgress[ai.Uid]; ok {
		return nil
	}

	if _, ok := c.todo[ai.Uid]; ok {
		return nil
	}

	crawlJob := &crawlWork{
		act:        ai,
		initScrape: true,
	}
	c.todo[ai.Uid] = crawlJob
	return crawlJob
}

// dequeueJob removes a job from the todo list and adds it to the inProgress list
func (c *CrawlDispatcher) dequeueJob(job *crawlWork) {
	c.maplk.Lock()
	defer c.maplk.Unlock()
	delete(c.todo, job.act.Uid)
	c.inProgress[job.act.Uid] = job
}

func (c *CrawlDispatcher) addToCatchupQueue(catchup *catchupJob) *crawlWork {
	c.maplk.Lock()
	defer c.maplk.Unlock()

	// If the actor's crawl is enqueued, append to the catchup queue, which gets
	// drained during the crawl
	job, ok := c.todo[catchup.user.Uid]
	if ok {
		catchupEventsEnqueued.WithLabelValues("todo").Inc()
		job.catchup = append(job.catchup, catchup)
		return nil
	}

	// If the actor's crawl is in progress, append to the next queue, which gets
	// drained after the crawl
	job, ok = c.inProgress[catchup.user.Uid]
	if ok {
		catchupEventsEnqueued.WithLabelValues("prog").Inc()
		job.next = append(job.next, catchup)
		return nil
	}

	// Otherwise, create a new crawl job for this actor and enqueue it
	catchupEventsEnqueued.WithLabelValues("new").Inc()
	cw := &crawlWork{
		act:     catchup.user,
		catchup: []*catchupJob{catchup},
	}
	c.todo[catchup.user.Uid] = cw
	return cw
}

func (c *CrawlDispatcher) fetchWorker() {
	for {
		select {
		case job := <-c.repoSync:
			if err := c.repoFetcher.FetchAndIndexRepo(context.TODO(), job); err != nil {
				c.log.Error("failed to perform repo crawl", "did", job.act.Did, "err", err)
			}

			// TODO: do we still just do this if it errors?
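			// Note that as written, the completion signal below is sent whether or not
			// the fetch errored, so the job always leaves inProgress and any events
			// buffered in job.next are still re-enqueued by mainLoop.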
			c.complete <- job.act.Uid
		}
	}
}

func (c *CrawlDispatcher) Crawl(ctx context.Context, ai *models.ActorInfo) error {
	if ai.PDS == 0 {
		panic("must have pds for user in queue")
	}

	userCrawlsEnqueued.Inc()

	ctx, span := otel.Tracer("crawler").Start(ctx, "addToCrawler")
	defer span.End()

	select {
	case c.ingest <- ai:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (c *CrawlDispatcher) AddToCatchupQueue(ctx context.Context, host *models.PDS, u *models.ActorInfo, evt *comatproto.SyncSubscribeRepos_Commit) error {
	if u.PDS == 0 {
		panic("must have pds for user in queue")
	}

	catchup := &catchupJob{
		evt:  evt,
		host: host,
		user: u,
	}

	cw := c.addToCatchupQueue(catchup)
	if cw == nil {
		return nil
	}

	select {
	case c.catchup <- cw:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (c *CrawlDispatcher) RepoInSlowPath(ctx context.Context, uid models.Uid) bool {
	c.maplk.Lock()
	defer c.maplk.Unlock()
	if _, ok := c.todo[uid]; ok {
		return true
	}

	if _, ok := c.inProgress[uid]; ok {
		return true
	}

	return false
}

func (c *CrawlDispatcher) countReposInSlowPath() int {
	c.maplk.Lock()
	defer c.maplk.Unlock()
	return len(c.inProgress) + len(c.todo)
}

func (c *CrawlDispatcher) CatchupRepoGaugePoller() {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-c.done:
			// Shutdown() closed the done channel; without this return the closed
			// channel would be permanently ready and this loop would spin forever.
			return
		case <-ticker.C:
			catchupReposGauge.Set(float64(c.countReposInSlowPath()))
		}
	}
}
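// The sketch below is an editorial illustration, not part of the original file.
// stubFetcher is a hypothetical CrawlRepoFetcher, and exampleWiring shows one
// plausible way a caller could construct, run, and feed the dispatcher,
// assuming it already holds a *models.ActorInfo with a non-zero PDS (Crawl
// panics otherwise).

// stubFetcher satisfies CrawlRepoFetcher without doing any network work.
type stubFetcher struct{}

func (stubFetcher) FetchAndIndexRepo(ctx context.Context, job *crawlWork) error {
	// A real fetcher would sync the repo for job.act and apply the buffered
	// job.catchup events; the stub just reports success.
	return nil
}

// exampleWiring wires a dispatcher to the stub fetcher, enqueues one crawl,
// and shuts the dispatcher down on return.
func exampleWiring(ctx context.Context, ai *models.ActorInfo) error {
	disp, err := NewCrawlDispatcher(stubFetcher{}, 4, slog.Default())
	if err != nil {
		return err
	}
	disp.Run()
	defer disp.Shutdown()

	// Crawl blocks until the main loop accepts the actor or ctx is cancelled.
	return disp.Crawl(ctx, ai)
}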