AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1use crate::crawler::throttle::ThrottleHandle;
2use crate::db::keys::crawler_cursor_key;
3use crate::db::{Db, keys, ser_repo_state};
4use crate::state::AppState;
5use crate::types::RepoState;
6use crate::util::{ErrorForStatus, RetryOutcome, RetryWithBackoff, parse_retry_after, relay_id};
7use chrono::{DateTime, TimeDelta, Utc};
8use futures::FutureExt;
9use jacquard_api::com_atproto::repo::describe_repo::DescribeRepoOutput;
10use jacquard_api::com_atproto::sync::list_repos::ListReposOutput;
11use jacquard_common::{IntoStatic, types::string::Did};
12use miette::{Context, IntoDiagnostic, Result};
13use rand::Rng;
14use rand::RngExt;
15use rand::rngs::SmallRng;
16use reqwest::StatusCode;
17use scc::HashSet;
18use serde::{Deserialize, Serialize};
19use smol_str::{SmolStr, ToSmolStr, format_smolstr};
20use std::collections::HashMap;
21use std::ops::{Add, Mul, Sub};
22use std::sync::Arc;
23use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
24use std::time::Duration;
25use tracing::{Instrument, debug, error, info, trace, warn};
26use url::Url;
27
/// retry attempts after which an entry is capped and left in the db for inspection
const MAX_RETRY_ATTEMPTS: u32 = 5;
/// maximum retry entries drained in one pass of the retry thread
const MAX_RETRY_BATCH: usize = 1000;
30
/// Persisted retry-queue entry for a repo whose signal check failed transiently.
#[derive(Debug, Serialize, Deserialize)]
struct RetryState {
    /// earliest time the next attempt may run
    after: DateTime<Utc>,
    /// current backoff window; doubled on each attempt
    duration: TimeDelta,
    /// attempts made so far; capped at `MAX_RETRY_ATTEMPTS`
    attempts: u32,
    /// last http status observed, if any (custom ser/de since StatusCode isn't serde)
    #[serde(serialize_with = "crate::util::ser_status_code")]
    #[serde(deserialize_with = "crate::util::deser_status_code")]
    status: Option<StatusCode>,
}
40
41impl RetryState {
42 fn new(secs: i64) -> Self {
43 let duration = TimeDelta::seconds(secs);
44 Self {
45 duration,
46 after: Utc::now().add(duration),
47 attempts: 0,
48 status: None,
49 }
50 }
51
52 /// returns the next retry state with doubled duration and incremented attempt count,
53 /// or `None` if the attempt count would reach the cap (entry left in db as-is).
54 fn next_attempt(self) -> Option<Self> {
55 let attempts = self.attempts + 1;
56 if attempts >= MAX_RETRY_ATTEMPTS {
57 return None;
58 }
59 let duration = self.duration * 2;
60 Some(Self {
61 after: Utc::now().add(duration),
62 duration,
63 attempts,
64 status: None,
65 })
66 }
67
68 fn with_status(mut self, code: StatusCode) -> Self {
69 self.status = Some(code);
70 self
71 }
72}
73
/// Conversion of a throttling source into a retry-queue entry.
trait ToRetryState {
    fn to_retry_state(&self) -> RetryState;
}
77
78impl ToRetryState for ThrottleHandle {
79 fn to_retry_state(&self) -> RetryState {
80 let after = chrono::DateTime::from_timestamp_secs(self.throttled_until()).unwrap();
81 RetryState {
82 duration: after.sub(Utc::now()),
83 after,
84 attempts: 0,
85 status: None,
86 }
87 }
88}
89
/// Outcome of probing a repo for signal-matching collections.
enum CrawlCheckResult {
    /// repo has at least one collection matching the filter's signals
    Signal,
    /// repo is reachable but has no matching collections (or is gone/4xx)
    NoSignal,
    /// transient failure; requeue with this retry state
    Retry(RetryState),
}
95
96impl From<RetryState> for CrawlCheckResult {
97 fn from(value: RetryState) -> Self {
98 Self::Retry(value)
99 }
100}
101
102fn is_throttle_worthy(e: &reqwest::Error) -> bool {
103 use std::error::Error;
104
105 if e.is_timeout() {
106 return true;
107 }
108
109 let mut src = e.source();
110 while let Some(s) = src {
111 if let Some(io_err) = s.downcast_ref::<std::io::Error>() {
112 if is_tls_cert_error(io_err) {
113 return true;
114 }
115 }
116 src = s.source();
117 }
118
119 e.status().map_or(false, |s| {
120 matches!(
121 s,
122 StatusCode::BAD_GATEWAY
123 | StatusCode::SERVICE_UNAVAILABLE
124 | StatusCode::GATEWAY_TIMEOUT
125 | crate::util::CONNECTION_TIMEOUT
126 | crate::util::SITE_FROZEN
127 )
128 })
129}
130
131fn is_tls_cert_error(io_err: &std::io::Error) -> bool {
132 let Some(inner) = io_err.get_ref() else {
133 return false;
134 };
135 if let Some(rustls_err) = inner.downcast_ref::<rustls::Error>() {
136 return matches!(rustls_err, rustls::Error::InvalidCertificate(_));
137 }
138 if let Some(nested_io) = inner.downcast_ref::<std::io::Error>() {
139 return is_tls_cert_error(nested_io);
140 }
141 false
142}
143
/// Crawl position for one relay, persisted as msgpack in the cursors partition.
#[derive(Debug, Serialize, Deserialize)]
enum Cursor {
    /// enumeration finished; keeps the last cursor so a later pass can resume
    Done(SmolStr),
    /// enumeration in progress; `None` means start from the beginning
    Next(Option<SmolStr>),
}
149
150pub mod throttle;
151use throttle::{OrFailure, Throttler};
152
/// set of dids currently being signal-checked, shared across tasks for dedup
type InFlight = Arc<HashSet<Did<'static>>>;
154
/// RAII-style guard: removes its did from the shared in-flight set on drop.
struct InFlightGuard {
    set: InFlight,
    did: Did<'static>,
}
159
impl Drop for InFlightGuard {
    /// Release the in-flight slot for this did.
    fn drop(&mut self) {
        self.set.remove_sync(&self.did);
    }
}
165
/// dids that passed in-flight dedup, paired with the guards holding their
/// slots; `#[must_use]` because dropping it early releases the slots.
#[must_use]
struct InFlightRepos {
    repos: Vec<Did<'static>>,
    guards: Vec<InFlightGuard>,
}
171
/// Relay crawler: enumerates repos via listRepos, dedups/filters them, and
/// queues unknown repos for backfill, with backpressure on the pending count.
pub struct Crawler {
    /// shared app state (db, filter config, resolver, backfill notifier)
    state: Arc<AppState>,
    /// relay hosts to enumerate
    relays: Vec<Url>,
    /// shared http client (gzip, crate name/version user agent)
    http: reqwest::Client,
    /// pending-count ceiling: crawling pauses above this
    max_pending: usize,
    /// pending-count floor: after cooldown, crawling resumes under this
    resume_pending: usize,
    /// repos accounted since the last stats tick (reset each tick)
    count: AtomicUsize,
    /// repos listed since the last stats tick (reset each tick)
    crawled_count: AtomicUsize,
    /// whether the crawler is currently paused on backpressure
    throttled: AtomicBool,
    /// per-PDS throttling for signal checks
    pds_throttler: Throttler,
    /// dids currently being signal-checked (cross-task dedup)
    in_flight: InFlight,
}
184
185impl Crawler {
186 pub fn new(
187 state: Arc<AppState>,
188 relay_hosts: Vec<Url>,
189 max_pending: usize,
190 resume_pending: usize,
191 ) -> Self {
192 let http = reqwest::Client::builder()
193 .user_agent(concat!(
194 env!("CARGO_PKG_NAME"),
195 "/",
196 env!("CARGO_PKG_VERSION")
197 ))
198 .gzip(true)
199 .build()
200 .expect("that reqwest will build");
201
202 Self {
203 state,
204 relays: relay_hosts,
205 http,
206 max_pending,
207 resume_pending,
208 count: AtomicUsize::new(0),
209 crawled_count: AtomicUsize::new(0),
210 throttled: AtomicBool::new(false),
211 pds_throttler: Throttler::new(),
212 in_flight: Arc::new(HashSet::new()),
213 }
214 }
215
216 async fn get_cursor(&self, relay_host: &Url) -> Result<Cursor> {
217 let key = crawler_cursor_key(&relay_id(relay_host));
218 let cursor_bytes = Db::get(self.state.db.cursors.clone(), &key).await?;
219 let cursor: Cursor = cursor_bytes
220 .as_deref()
221 .map(rmp_serde::from_slice)
222 .transpose()
223 .into_diagnostic()
224 .wrap_err("can't parse cursor")?
225 .unwrap_or(Cursor::Next(None));
226 Ok(cursor)
227 }
228
    /// Main crawler entry point. Spawns:
    /// - a 60s stats ticker that logs throughput and per-relay cursors,
    /// - a dedicated OS thread that drains the retry queue,
    /// - one long-lived crawl task per relay (restarted 30s after fatal errors).
    ///
    /// Only returns if every crawl task exits, which in practice is never.
    pub async fn run(self) -> Result<()> {
        let crawler = Arc::new(self);

        // stats ticker
        let ticker = tokio::spawn({
            use std::time::Instant;
            let crawler = crawler.clone();
            let mut last_time = Instant::now();
            let mut interval = tokio::time::interval(Duration::from_secs(60));
            async move {
                loop {
                    interval.tick().await;
                    // swap-to-zero turns the counters into per-interval deltas
                    let delta_processed = crawler.count.swap(0, Ordering::Relaxed);
                    let delta_crawled = crawler.crawled_count.swap(0, Ordering::Relaxed);
                    let is_throttled = crawler.throttled.load(Ordering::Relaxed);

                    crawler.pds_throttler.evict_clean().await;

                    if delta_processed == 0 && delta_crawled == 0 {
                        if is_throttled {
                            info!("throttled: pending queue full");
                        } else {
                            info!("idle: no repos crawled or processed in 60s");
                        }
                        continue;
                    }

                    let elapsed = last_time.elapsed().as_secs_f64();

                    // fetch all cursors
                    use futures::future::join_all;
                    let cursor_futures: Vec<_> = crawler
                        .relays
                        .iter()
                        .map(|relay_host| {
                            let domain = relay_host.host_str().unwrap_or("unknown");
                            let relay_host = relay_host.clone();
                            let crawler = crawler.clone();
                            async move {
                                let cursor_str = match crawler.get_cursor(&relay_host).await {
                                    Ok(c) => match c {
                                        Cursor::Done(c) => format_smolstr!("done({c})"),
                                        Cursor::Next(None) => "none".to_smolstr(),
                                        Cursor::Next(Some(c)) => c.to_smolstr(),
                                    },
                                    Err(e) => e.to_smolstr(),
                                };
                                format_smolstr!("{domain}={cursor_str}")
                            }
                        })
                        .collect();

                    let cursors: Vec<_> = join_all(cursor_futures).await.into_iter().collect();

                    let cursors_display = if cursors.is_empty() {
                        "none".to_smolstr()
                    } else {
                        cursors.join(", ").into()
                    };

                    info!(
                        cursors = %cursors_display,
                        processed = delta_processed,
                        crawled = delta_crawled,
                        elapsed,
                        "progress"
                    );
                    last_time = Instant::now();
                }
            }
        });
        // the ticker loops forever, so awaiting it only yields on panic or
        // cancellation; abort the process rather than keep running blind.
        // NOTE(review): the irrefutable `let Err(e)` relies on the task's Ok
        // type being uninhabited — confirm this keeps compiling across rustc updates.
        tokio::spawn(async move {
            let Err(e) = ticker.await;
            error!(err = ?e, "stats ticker panicked, aborting");
            std::process::abort();
        });

        // retry thread
        std::thread::spawn({
            let crawler = crawler.clone();
            let handle = tokio::runtime::Handle::current();
            move || {
                use std::thread::sleep;

                // make the runtime current so block_on inside process_retry_queue works
                let _g = handle.enter();

                let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                    loop {
                        match crawler.process_retry_queue() {
                            // sleep until the earliest scheduled retry (at least 1s)
                            Ok(Some(dur)) => sleep(dur.max(Duration::from_secs(1))),
                            // nothing scheduled — poll again in a minute
                            Ok(None) => sleep(Duration::from_secs(60)),
                            Err(e) => {
                                error!(err = %e, "retry loop failed");
                                sleep(Duration::from_secs(60));
                            }
                        }
                    }
                }));
                if result.is_err() {
                    error!("retry thread panicked, aborting");
                    std::process::abort();
                }
            }
        });

        info!(
            relay_count = crawler.relays.len(),
            hosts = ?crawler.relays,
            "starting crawler"
        );

        // one perpetual crawl task per relay; a fatal error restarts that relay's loop
        let mut tasks = tokio::task::JoinSet::new();
        for url in crawler.relays.clone() {
            let crawler = crawler.clone();
            let span = tracing::info_span!("crawl", %url);
            tasks.spawn(
                async move {
                    loop {
                        if let Err(e) = Self::crawl(crawler.clone(), &url).await {
                            error!(err = ?e, "fatal error, restarting in 30s");
                            tokio::time::sleep(Duration::from_secs(30)).await;
                        }
                    }
                }
                .instrument(span),
            );
        }
        let _ = tasks.join_all().await;

        Ok(())
    }
360
361 fn base_url(url: &Url) -> Result<Url> {
362 let mut url = url.clone();
363 match url.scheme() {
364 "wss" => url
365 .set_scheme("https")
366 .map_err(|_| miette::miette!("invalid url: {url}"))?,
367 "ws" => url
368 .set_scheme("http")
369 .map_err(|_| miette::miette!("invalid url: {url}"))?,
370 _ => {}
371 }
372 Ok(url)
373 }
374
    /// Enumerate one relay forever: page through listRepos, discover unknown
    /// repos, optionally signal-check them, and persist new repo state plus
    /// the advanced cursor in a single atomic batch per page. Sleeps 1h
    /// between full enumeration passes and backs off when the pending queue
    /// exceeds the configured limits.
    async fn crawl(crawler: Arc<Self>, relay_host: &Url) -> Result<()> {
        let base_url = Self::base_url(relay_host)?;

        let mut rng: SmallRng = rand::make_rng();
        let db = &crawler.state.db;

        let mut cursor = crawler.get_cursor(relay_host).await?;

        match &cursor {
            Cursor::Next(Some(c)) => info!(cursor = %c, "resuming"),
            Cursor::Next(None) => info!("starting from scratch"),
            Cursor::Done(c) => info!(cursor = %c, "was done, resuming"),
        }

        let mut was_throttled = false;
        loop {
            // throttle check: three bands on the pending count —
            // above max_pending: hard pause; between resume_pending and
            // max_pending: cooldown until back under resume_pending; else run.
            loop {
                let pending = crawler.state.db.get_count("pending").await;
                if pending > crawler.max_pending as u64 {
                    if !was_throttled {
                        debug!(
                            pending,
                            max = crawler.max_pending,
                            "throttling: above max pending"
                        );
                        was_throttled = true;
                        crawler.throttled.store(true, Ordering::Relaxed);
                    }
                    tokio::time::sleep(Duration::from_secs(5)).await;
                } else if pending > crawler.resume_pending as u64 {
                    if !was_throttled {
                        debug!(
                            pending,
                            resume = crawler.resume_pending,
                            "throttling: entering cooldown"
                        );
                        was_throttled = true;
                        crawler.throttled.store(true, Ordering::Relaxed);
                    }

                    // hysteresis: wait until backfill drains below the resume floor
                    loop {
                        let current_pending = crawler.state.db.get_count("pending").await;
                        if current_pending <= crawler.resume_pending as u64 {
                            break;
                        }
                        debug!(
                            pending = current_pending,
                            resume = crawler.resume_pending,
                            "cooldown, waiting"
                        );
                        tokio::time::sleep(Duration::from_secs(5)).await;
                    }
                    break;
                } else {
                    if was_throttled {
                        info!("throttling released");
                        was_throttled = false;
                        crawler.throttled.store(false, Ordering::Relaxed);
                    }
                    break;
                }
            }

            // build the paged listRepos request from the persisted cursor
            let mut list_repos_url = base_url
                .join("/xrpc/com.atproto.sync.listRepos")
                .into_diagnostic()?;
            list_repos_url
                .query_pairs_mut()
                .append_pair("limit", "1000");
            if let Cursor::Next(Some(c)) | Cursor::Done(c) = &cursor {
                list_repos_url
                    .query_pairs_mut()
                    .append_pair("cursor", c.as_str());
            }

            // up to 5 retries with exponential backoff, but only on 429s
            let fetch_result = (|| {
                crawler
                    .http
                    .get(list_repos_url.clone())
                    .send()
                    .error_for_status()
            })
            .retry(5, |e: &reqwest::Error, attempt| {
                matches!(e.status(), Some(StatusCode::TOO_MANY_REQUESTS))
                    .then(|| Duration::from_secs(1 << attempt.min(5)))
            })
            .await;

            let res = match fetch_result {
                Ok(r) => r,
                Err(RetryOutcome::Ratelimited) => {
                    warn!("rate limited by relay after retries");
                    continue;
                }
                Err(RetryOutcome::Failed(e)) => {
                    error!(err = %e, "crawler failed to fetch listRepos");
                    continue;
                }
            };

            let bytes = match res.bytes().await {
                Ok(b) => b,
                Err(e) => {
                    error!(err = %e, "cant read listRepos response");
                    continue;
                }
            };

            let mut batch = db.inner.batch();
            let filter = crawler.state.filter.load();

            // result of the blocking parse/dedup step below
            struct ParseResult {
                unknown_dids: Vec<Did<'static>>,
                cursor: Option<smol_str::SmolStr>,
                count: usize,
            }

            const BLOCKING_TASK_TIMEOUT: Duration = Duration::from_secs(30);

            // parse + keyspace lookups are synchronous, so run them off the
            // async worker threads
            let parse_result = {
                let repos = db.repos.clone();
                let filter_ks = db.filter.clone();
                let crawler_ks = db.crawler.clone();

                // this wont actually cancel the task since spawn_blocking isnt cancel safe
                // but at least we'll see whats going on?
                tokio::time::timeout(
                    BLOCKING_TASK_TIMEOUT,
                    tokio::task::spawn_blocking(move || -> miette::Result<Option<ParseResult>> {
                        let output = match serde_json::from_slice::<ListReposOutput>(&bytes) {
                            Ok(out) => out.into_static(),
                            Err(e) => {
                                error!(err = %e, "failed to parse listRepos response");
                                return Ok(None);
                            }
                        };

                        // `None` signals "empty page / end of enumeration" to the caller
                        if output.repos.is_empty() {
                            return Ok(None);
                        }

                        let count = output.repos.len();
                        let next_cursor = output.cursor.map(|c| c.as_str().into());
                        let mut unknown = Vec::new();
                        for repo in output.repos {
                            // explicitly excluded by the filter
                            let excl_key = crate::db::filter::exclude_key(repo.did.as_str())?;
                            if filter_ks.contains_key(&excl_key).into_diagnostic()? {
                                continue;
                            }

                            // already in retry queue — let the retry thread handle it
                            let retry_key = keys::crawler_retry_key(&repo.did);
                            if crawler_ks.contains_key(&retry_key).into_diagnostic()? {
                                continue;
                            }

                            // only repos we have never seen get queued
                            let did_key = keys::repo_key(&repo.did);
                            if !repos.contains_key(&did_key).into_diagnostic()? {
                                unknown.push(repo.did.into_static());
                            }
                        }

                        Ok(Some(ParseResult {
                            unknown_dids: unknown,
                            cursor: next_cursor,
                            count,
                        }))
                    }),
                )
                .await
            }
            // NOTE(review): `into_diagnostic()?` handles the timeout (Elapsed)
            // while `map_err` below handles the JoinError (panic) — the "timed
            // out" message is attached to the panic path; consider swapping.
            .into_diagnostic()?
            .map_err(|_| {
                error!(
                    "spawn_blocking task for parsing listRepos timed out after {}",
                    BLOCKING_TASK_TIMEOUT.as_secs()
                );
                miette::miette!("spawn_blocking task for parsing listRepos timed out")
            })?;

            let ParseResult {
                unknown_dids,
                cursor: next_cursor,
                count,
            } = match parse_result {
                Ok(Some(res)) => res,
                Ok(None) => {
                    info!("finished enumeration (or empty page)");
                    if let Cursor::Next(Some(c)) = cursor {
                        info!("reached end of list.");
                        cursor = Cursor::Done(c);
                    }
                    info!("sleeping 1h before next enumeration pass");
                    tokio::time::sleep(Duration::from_secs(3600)).await;
                    info!("resuming after 1h sleep");
                    continue;
                }
                Err(e) => return Err(e).wrap_err("error while crawling"),
            };

            debug!(count, "fetched repos");
            crawler.crawled_count.fetch_add(count, Ordering::Relaxed);

            let in_flight = if filter.check_signals() && !unknown_dids.is_empty() {
                // we dont need to pass any existing since we have none; we are crawling after all
                crawler
                    .check_signals_batch(&unknown_dids, &filter, &mut batch, &HashMap::new())
                    .await?
            } else {
                // no signal checking but still need dedup to avoid orphan pending entries
                crawler.acquire_in_flight(unknown_dids).await
            };

            // stage new repo state + pending entries; committed atomically below
            for did in &in_flight.repos {
                let did_key = keys::repo_key(did);
                trace!(did = %did, "found new repo");

                let state = RepoState::untracked(rng.next_u64());
                batch.insert(&db.repos, &did_key, ser_repo_state(&state)?);
                batch.insert(&db.pending, keys::pending_key(state.index_id), &did_key);
            }

            // advance (or finish) the cursor and persist it in the same batch,
            // so repos and cursor can never get out of sync
            if let Some(new_cursor) = next_cursor {
                cursor = Cursor::Next(Some(new_cursor.as_str().into()));
            } else if let Cursor::Next(Some(c)) = cursor {
                info!("reached end of list.");
                cursor = Cursor::Done(c);
            }
            batch.insert(
                &db.cursors,
                crawler_cursor_key(&relay_id(relay_host)),
                rmp_serde::to_vec(&cursor)
                    .into_diagnostic()
                    .wrap_err("cant serialize cursor")?,
            );

            tokio::time::timeout(
                BLOCKING_TASK_TIMEOUT,
                tokio::task::spawn_blocking(move || batch.commit().into_diagnostic()),
            )
            .await
            .into_diagnostic()?
            .map_err(|_| {
                error!(
                    "spawn_blocking task for batch commit timed out after {}",
                    BLOCKING_TASK_TIMEOUT.as_secs()
                );
                miette::miette!("spawn_blocking task for batch commit timed out")
            })?
            // a failed commit is logged but not fatal; the page is retried implicitly
            .inspect_err(|e| {
                error!(err = ?e, "batch commit failed");
            })
            .ok();

            // release in-flight slots only after the commit attempt
            drop(in_flight.guards);

            crawler.account_new_repos(in_flight.repos.len()).await;

            if matches!(cursor, Cursor::Done(_)) {
                info!("enumeration complete, sleeping 1h before next pass");
                tokio::time::sleep(Duration::from_secs(3600)).await;
                info!("resuming after 1h sleep");
            }
        }
    }
641
    /// Drain due entries from the crawler retry queue. Runs on the dedicated
    /// retry OS thread (see `run`), hence the synchronous signature and
    /// `block_on` calls.
    ///
    /// Returns the suggested sleep before the next pass: `Some(dur)` when a
    /// future entry is scheduled, `None` when nothing is queued, and
    /// `Some(Duration::ZERO)` when the batch cap was hit and more entries are
    /// already due.
    fn process_retry_queue(&self) -> Result<Option<Duration>> {
        let db = &self.state.db;
        let now = Utc::now();

        let mut ready: Vec<Did> = Vec::new();
        let mut existing: HashMap<Did<'static>, RetryState> = HashMap::new();
        let mut next_wake: Option<Duration> = None;
        let mut had_more = false;

        let mut rng: SmallRng = rand::make_rng();

        let mut batch = db.inner.batch();
        for guard in db.crawler.prefix(keys::CRAWLER_RETRY_PREFIX) {
            let (key, val) = guard.into_inner().into_diagnostic()?;
            let state: RetryState = rmp_serde::from_slice(&val).into_diagnostic()?;
            let did = keys::crawler_retry_parse_key(&key)?.to_did();

            // leave capped entries alone for API inspection
            if state.attempts >= MAX_RETRY_ATTEMPTS {
                continue;
            }

            // 1–7% jitter so entries created together don't retry in lockstep
            let backoff = TimeDelta::seconds(
                state
                    .duration
                    .as_seconds_f64()
                    .mul(rng.random_range(0.01..0.07)) as i64,
            );
            if state.after + backoff > now {
                // not due yet; track the earliest wake-up across all entries
                let wake = (state.after - now).to_std().unwrap_or(Duration::ZERO);
                next_wake = Some(next_wake.map(|w| w.min(wake)).unwrap_or(wake));
                continue;
            }

            if ready.len() >= MAX_RETRY_BATCH {
                had_more = true;
                break;
            }

            ready.push(did.clone());
            existing.insert(did, state);
        }

        if ready.is_empty() {
            return Ok(next_wake);
        }

        debug!(count = ready.len(), "retrying pending repos");

        // we're on a plain thread (not a tokio worker), so block_on is safe;
        // the runtime was entered by the spawning code in `run`
        let handle = tokio::runtime::Handle::current();
        let filter = self.state.filter.load();
        let in_flight =
            handle.block_on(self.check_signals_batch(&ready, &filter, &mut batch, &existing))?;

        let mut rng: SmallRng = rand::make_rng();
        for did in &in_flight.repos {
            let did_key = keys::repo_key(did);

            // may have been discovered by the crawler while queued for retry
            if db.repos.contains_key(&did_key).into_diagnostic()? {
                continue;
            }

            let state = RepoState::untracked(rng.next_u64());
            batch.insert(&db.repos, &did_key, ser_repo_state(&state)?);
            batch.insert(&db.pending, keys::pending_key(state.index_id), &did_key);
        }

        batch.commit().into_diagnostic()?;

        // release in-flight slots only after the commit
        drop(in_flight.guards);

        if !in_flight.repos.is_empty() {
            info!(count = in_flight.repos.len(), "recovered from retry queue");
            handle.block_on(self.account_new_repos(in_flight.repos.len()));
        }

        // if we hit the batch cap there are more ready entries, loop back immediately
        Ok(had_more.then_some(Duration::ZERO).or(next_wake))
    }
721
    /// Resolve a repo's PDS and probe `describeRepo` to see whether any of its
    /// collections match the filter's signal set. Returns the did paired with
    /// a `CrawlCheckResult`; transient failures carry a retry state.
    ///
    /// Returns an owned `'static` future (everything needed is cloned up
    /// front) so callers can spawn it onto a JoinSet without borrowing `self`.
    fn check_repo_signals(
        &self,
        filter: Arc<crate::filter::FilterConfig>,
        did: Did<'static>,
    ) -> impl Future<Output = (Did<'static>, CrawlCheckResult)> + Send + 'static {
        let resolver = self.state.resolver.clone();
        let http = self.http.clone();
        let throttler = self.pds_throttler.clone();
        async move {
            const MAX_RETRIES: u32 = 5;

            // resolve the did to its PDS, retrying only on resolver ratelimits
            let pds_url = (|| resolver.resolve_identity_info(&did))
                .retry(MAX_RETRIES, |e, attempt| {
                    matches!(e, crate::resolver::ResolverError::Ratelimited)
                        .then(|| Duration::from_secs(1 << attempt.min(5)))
                })
                .await;

            let pds_url = match pds_url {
                Ok((url, _)) => url,
                Err(RetryOutcome::Ratelimited) => {
                    error!(
                        retries = MAX_RETRIES,
                        "rate limited resolving identity, giving up"
                    );
                    // no pds handle to read retry_after from; use a short default
                    return (did, RetryState::new(60).into());
                }
                Err(RetryOutcome::Failed(e)) => {
                    error!(err = %e, "failed to resolve identity");
                    return (did, RetryState::new(60).into());
                }
            };

            // skip PDSes that are already known-bad; retry when they recover
            let throttle = throttler.get_handle(&pds_url).await;
            if throttle.is_throttled() {
                trace!(host = pds_url.host_str(), "skipping throttled pds");
                return (did, throttle.to_retry_state().into());
            }

            // acquire a concurrency permit; fails if the pds is marked failed
            // by another task while we wait
            let _permit = throttle.acquire().unit_error().or_failure(&throttle, || ());
            let Ok(_permit) = _permit.await else {
                trace!(
                    host = pds_url.host_str(),
                    "pds failed while waiting for permit"
                );
                return (did, throttle.to_retry_state().into());
            };

            enum RequestError {
                Reqwest(reqwest::Error),
                RateLimited(Option<u64>),
                /// hard failure notification from another task on this PDS
                Throttled,
            }

            let mut describe_url = pds_url.join("/xrpc/com.atproto.repo.describeRepo").unwrap();
            describe_url.query_pairs_mut().append_pair("repo", &did);

            let resp = async {
                let resp = http
                    .get(describe_url)
                    .timeout(throttle.timeout())
                    .send()
                    .await
                    .map_err(RequestError::Reqwest)?;

                // dont retry ratelimits since we will just put it in a queue to be tried again later
                if resp.status() == StatusCode::TOO_MANY_REQUESTS {
                    return Err(RequestError::RateLimited(parse_retry_after(&resp)));
                }

                resp.error_for_status().map_err(RequestError::Reqwest)
            }
            .or_failure(&throttle, || RequestError::Throttled)
            .await;

            let resp = match resp {
                Ok(r) => {
                    throttle.record_success();
                    r
                }
                Err(RequestError::RateLimited(secs)) => {
                    // feed the server-provided retry-after (if any) back into the throttle
                    throttle.record_ratelimit(secs);
                    return (
                        did,
                        throttle
                            .to_retry_state()
                            .with_status(StatusCode::TOO_MANY_REQUESTS)
                            .into(),
                    );
                }
                Err(RequestError::Throttled) => {
                    return (did, throttle.to_retry_state().into());
                }
                Err(RequestError::Reqwest(e)) => {
                    if e.is_timeout() && !throttle.record_timeout() {
                        // first or second timeout, just requeue
                        let mut retry_state = RetryState::new(60);
                        retry_state.status = e.status();
                        return (did, retry_state.into());
                    }
                    // third timeout, if timeout fail is_throttle_worthy will ban the pds

                    if is_throttle_worthy(&e) {
                        if let Some(mins) = throttle.record_failure() {
                            warn!(url = %pds_url, mins, "throttling pds due to hard failure");
                        }
                        let mut retry_state = throttle.to_retry_state();
                        retry_state.status = e.status();
                        return (did, retry_state.into());
                    }

                    match e.status() {
                        Some(StatusCode::NOT_FOUND | StatusCode::GONE) => {
                            // repo is gone; treat like "no signal" so it gets dropped
                            trace!("repo not found");
                            return (did, CrawlCheckResult::NoSignal);
                        }
                        Some(s) if s.is_client_error() => {
                            // other 4xx: our request won't get better by retrying
                            error!(status = %s, "repo unavailable");
                            return (did, CrawlCheckResult::NoSignal);
                        }
                        _ => {
                            error!(err = %e, "repo errored");
                            let mut retry_state = RetryState::new(60 * 15);
                            retry_state.status = e.status();
                            return (did, retry_state.into());
                        }
                    }
                }
            };

            let bytes = match resp.bytes().await {
                Ok(b) => b,
                Err(e) => {
                    error!(err = %e, "failed to read describeRepo response");
                    return (did, RetryState::new(60 * 5).into());
                }
            };

            let out = match serde_json::from_slice::<DescribeRepoOutput>(&bytes) {
                Ok(out) => out,
                Err(e) => {
                    error!(err = %e, "failed to parse describeRepo response");
                    return (did, RetryState::new(60 * 10).into());
                }
            };

            // the actual signal test: does any collection match the filter?
            let found_signal = out
                .collections
                .iter()
                .any(|col| filter.matches_signal(col.as_str()));

            if !found_signal {
                trace!("no signal-matching collections found");
            }

            return (
                did,
                found_signal
                    .then_some(CrawlCheckResult::Signal)
                    .unwrap_or(CrawlCheckResult::NoSignal),
            );
        }
    }
887
    /// Run `check_repo_signals` concurrently for each repo (after in-flight
    /// dedup) and fold the outcomes into `batch`:
    /// - `Signal`   → retry entry removed, did returned as valid,
    /// - `NoSignal` → retry entry removed, did dropped,
    /// - `Retry`    → next backoff written, carrying over attempt counts from
    ///   `existing`; entries that hit the cap are stored with
    ///   `MAX_RETRY_ATTEMPTS` so the retry scan skips them.
    ///
    /// The caller commits `batch` and must keep the returned guards alive
    /// until then.
    async fn check_signals_batch(
        &self,
        repos: &[Did<'static>],
        filter: &Arc<crate::filter::FilterConfig>,
        batch: &mut fjall::OwnedWriteBatch,
        existing: &HashMap<Did<'static>, RetryState>,
    ) -> Result<InFlightRepos> {
        let db = &self.state.db;
        let in_flight = self.acquire_in_flight(repos.to_vec()).await;
        let mut valid = Vec::with_capacity(in_flight.repos.len());
        let mut set = tokio::task::JoinSet::new();

        for did in in_flight.repos {
            let filter = filter.clone();
            let span = tracing::info_span!("signals", did = %did);
            set.spawn(
                self.check_repo_signals(filter, did.clone())
                    .instrument(span),
            );
        }

        // 60s guard per join so a stuck check surfaces instead of hanging forever
        while let Some(res) = tokio::time::timeout(Duration::from_secs(60), set.join_next())
            .await
            .into_diagnostic()
            .map_err(|_| {
                error!("signal check task timed out after 60s");
                miette::miette!("signal check task timed out")
            })?
        {
            let (did, result) = match res {
                Ok(inner) => inner,
                Err(e) => {
                    // a panicked check loses its did; skip rather than abort the batch
                    error!(err = ?e, "signal check panicked");
                    continue;
                }
            };

            match result {
                CrawlCheckResult::Signal => {
                    batch.remove(&db.crawler, keys::crawler_retry_key(&did));
                    valid.push(did);
                }
                CrawlCheckResult::NoSignal => {
                    batch.remove(&db.crawler, keys::crawler_retry_key(&did));
                }
                CrawlCheckResult::Retry(state) => {
                    // the fresh state starts at attempts=0; restore the persisted count
                    let prev_attempts = existing.get(&did).map(|s| s.attempts).unwrap_or(0);
                    let carried = RetryState {
                        attempts: prev_attempts,
                        ..state
                    };
                    let next = match carried.next_attempt() {
                        Some(next) => next,
                        // cap reached: persist as capped so the scan loop skips it
                        None => RetryState {
                            attempts: MAX_RETRY_ATTEMPTS,
                            ..state
                        },
                    };
                    batch.insert(
                        &db.crawler,
                        keys::crawler_retry_key(&did),
                        rmp_serde::to_vec(&next).into_diagnostic()?,
                    );
                }
            }
        }

        Ok(InFlightRepos {
            repos: valid,
            guards: in_flight.guards,
        })
    }
960
961 async fn acquire_in_flight(&self, dids: Vec<Did<'static>>) -> InFlightRepos {
962 let mut filtered = Vec::with_capacity(dids.len());
963 let mut guards = Vec::with_capacity(dids.len());
964 for did in dids {
965 if self.in_flight.insert_async(did.clone()).await.is_err() {
966 trace!(did = %did, "repo in-flight, skipping");
967 continue;
968 }
969 guards.push(InFlightGuard {
970 set: self.in_flight.clone(),
971 did: did.clone(),
972 });
973 filtered.push(did);
974 }
975 InFlightRepos {
976 guards,
977 repos: filtered,
978 }
979 }
980
981 async fn account_new_repos(&self, count: usize) {
982 if count == 0 {
983 return;
984 }
985
986 self.count.fetch_add(count, Ordering::Relaxed);
987 self.state
988 .db
989 .update_count_async("repos", count as i64)
990 .await;
991 self.state
992 .db
993 .update_count_async("pending", count as i64)
994 .await;
995 self.state.notify_backfill();
996 }
997}