An AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall.
at-protocol atproto indexer rust fjall
Source file (branch `main`): 997 lines, 36 kB.
1use crate::crawler::throttle::ThrottleHandle; 2use crate::db::keys::crawler_cursor_key; 3use crate::db::{Db, keys, ser_repo_state}; 4use crate::state::AppState; 5use crate::types::RepoState; 6use crate::util::{ErrorForStatus, RetryOutcome, RetryWithBackoff, parse_retry_after, relay_id}; 7use chrono::{DateTime, TimeDelta, Utc}; 8use futures::FutureExt; 9use jacquard_api::com_atproto::repo::describe_repo::DescribeRepoOutput; 10use jacquard_api::com_atproto::sync::list_repos::ListReposOutput; 11use jacquard_common::{IntoStatic, types::string::Did}; 12use miette::{Context, IntoDiagnostic, Result}; 13use rand::Rng; 14use rand::RngExt; 15use rand::rngs::SmallRng; 16use reqwest::StatusCode; 17use scc::HashSet; 18use serde::{Deserialize, Serialize}; 19use smol_str::{SmolStr, ToSmolStr, format_smolstr}; 20use std::collections::HashMap; 21use std::ops::{Add, Mul, Sub}; 22use std::sync::Arc; 23use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; 24use std::time::Duration; 25use tracing::{Instrument, debug, error, info, trace, warn}; 26use url::Url; 27 28const MAX_RETRY_ATTEMPTS: u32 = 5; 29const MAX_RETRY_BATCH: usize = 1000; 30 31#[derive(Debug, Serialize, Deserialize)] 32struct RetryState { 33 after: DateTime<Utc>, 34 duration: TimeDelta, 35 attempts: u32, 36 #[serde(serialize_with = "crate::util::ser_status_code")] 37 #[serde(deserialize_with = "crate::util::deser_status_code")] 38 status: Option<StatusCode>, 39} 40 41impl RetryState { 42 fn new(secs: i64) -> Self { 43 let duration = TimeDelta::seconds(secs); 44 Self { 45 duration, 46 after: Utc::now().add(duration), 47 attempts: 0, 48 status: None, 49 } 50 } 51 52 /// returns the next retry state with doubled duration and incremented attempt count, 53 /// or `None` if the attempt count would reach the cap (entry left in db as-is). 
54 fn next_attempt(self) -> Option<Self> { 55 let attempts = self.attempts + 1; 56 if attempts >= MAX_RETRY_ATTEMPTS { 57 return None; 58 } 59 let duration = self.duration * 2; 60 Some(Self { 61 after: Utc::now().add(duration), 62 duration, 63 attempts, 64 status: None, 65 }) 66 } 67 68 fn with_status(mut self, code: StatusCode) -> Self { 69 self.status = Some(code); 70 self 71 } 72} 73 74trait ToRetryState { 75 fn to_retry_state(&self) -> RetryState; 76} 77 78impl ToRetryState for ThrottleHandle { 79 fn to_retry_state(&self) -> RetryState { 80 let after = chrono::DateTime::from_timestamp_secs(self.throttled_until()).unwrap(); 81 RetryState { 82 duration: after.sub(Utc::now()), 83 after, 84 attempts: 0, 85 status: None, 86 } 87 } 88} 89 90enum CrawlCheckResult { 91 Signal, 92 NoSignal, 93 Retry(RetryState), 94} 95 96impl From<RetryState> for CrawlCheckResult { 97 fn from(value: RetryState) -> Self { 98 Self::Retry(value) 99 } 100} 101 102fn is_throttle_worthy(e: &reqwest::Error) -> bool { 103 use std::error::Error; 104 105 if e.is_timeout() { 106 return true; 107 } 108 109 let mut src = e.source(); 110 while let Some(s) = src { 111 if let Some(io_err) = s.downcast_ref::<std::io::Error>() { 112 if is_tls_cert_error(io_err) { 113 return true; 114 } 115 } 116 src = s.source(); 117 } 118 119 e.status().map_or(false, |s| { 120 matches!( 121 s, 122 StatusCode::BAD_GATEWAY 123 | StatusCode::SERVICE_UNAVAILABLE 124 | StatusCode::GATEWAY_TIMEOUT 125 | crate::util::CONNECTION_TIMEOUT 126 | crate::util::SITE_FROZEN 127 ) 128 }) 129} 130 131fn is_tls_cert_error(io_err: &std::io::Error) -> bool { 132 let Some(inner) = io_err.get_ref() else { 133 return false; 134 }; 135 if let Some(rustls_err) = inner.downcast_ref::<rustls::Error>() { 136 return matches!(rustls_err, rustls::Error::InvalidCertificate(_)); 137 } 138 if let Some(nested_io) = inner.downcast_ref::<std::io::Error>() { 139 return is_tls_cert_error(nested_io); 140 } 141 false 142} 143 144#[derive(Debug, Serialize, 
Deserialize)] 145enum Cursor { 146 Done(SmolStr), 147 Next(Option<SmolStr>), 148} 149 150pub mod throttle; 151use throttle::{OrFailure, Throttler}; 152 153type InFlight = Arc<HashSet<Did<'static>>>; 154 155struct InFlightGuard { 156 set: InFlight, 157 did: Did<'static>, 158} 159 160impl Drop for InFlightGuard { 161 fn drop(&mut self) { 162 self.set.remove_sync(&self.did); 163 } 164} 165 166#[must_use] 167struct InFlightRepos { 168 repos: Vec<Did<'static>>, 169 guards: Vec<InFlightGuard>, 170} 171 172pub struct Crawler { 173 state: Arc<AppState>, 174 relays: Vec<Url>, 175 http: reqwest::Client, 176 max_pending: usize, 177 resume_pending: usize, 178 count: AtomicUsize, 179 crawled_count: AtomicUsize, 180 throttled: AtomicBool, 181 pds_throttler: Throttler, 182 in_flight: InFlight, 183} 184 185impl Crawler { 186 pub fn new( 187 state: Arc<AppState>, 188 relay_hosts: Vec<Url>, 189 max_pending: usize, 190 resume_pending: usize, 191 ) -> Self { 192 let http = reqwest::Client::builder() 193 .user_agent(concat!( 194 env!("CARGO_PKG_NAME"), 195 "/", 196 env!("CARGO_PKG_VERSION") 197 )) 198 .gzip(true) 199 .build() 200 .expect("that reqwest will build"); 201 202 Self { 203 state, 204 relays: relay_hosts, 205 http, 206 max_pending, 207 resume_pending, 208 count: AtomicUsize::new(0), 209 crawled_count: AtomicUsize::new(0), 210 throttled: AtomicBool::new(false), 211 pds_throttler: Throttler::new(), 212 in_flight: Arc::new(HashSet::new()), 213 } 214 } 215 216 async fn get_cursor(&self, relay_host: &Url) -> Result<Cursor> { 217 let key = crawler_cursor_key(&relay_id(relay_host)); 218 let cursor_bytes = Db::get(self.state.db.cursors.clone(), &key).await?; 219 let cursor: Cursor = cursor_bytes 220 .as_deref() 221 .map(rmp_serde::from_slice) 222 .transpose() 223 .into_diagnostic() 224 .wrap_err("can't parse cursor")? 
225 .unwrap_or(Cursor::Next(None)); 226 Ok(cursor) 227 } 228 229 pub async fn run(self) -> Result<()> { 230 let crawler = Arc::new(self); 231 232 // stats ticker 233 let ticker = tokio::spawn({ 234 use std::time::Instant; 235 let crawler = crawler.clone(); 236 let mut last_time = Instant::now(); 237 let mut interval = tokio::time::interval(Duration::from_secs(60)); 238 async move { 239 loop { 240 interval.tick().await; 241 let delta_processed = crawler.count.swap(0, Ordering::Relaxed); 242 let delta_crawled = crawler.crawled_count.swap(0, Ordering::Relaxed); 243 let is_throttled = crawler.throttled.load(Ordering::Relaxed); 244 245 crawler.pds_throttler.evict_clean().await; 246 247 if delta_processed == 0 && delta_crawled == 0 { 248 if is_throttled { 249 info!("throttled: pending queue full"); 250 } else { 251 info!("idle: no repos crawled or processed in 60s"); 252 } 253 continue; 254 } 255 256 let elapsed = last_time.elapsed().as_secs_f64(); 257 258 // fetch all cursors 259 use futures::future::join_all; 260 let cursor_futures: Vec<_> = crawler 261 .relays 262 .iter() 263 .map(|relay_host| { 264 let domain = relay_host.host_str().unwrap_or("unknown"); 265 let relay_host = relay_host.clone(); 266 let crawler = crawler.clone(); 267 async move { 268 let cursor_str = match crawler.get_cursor(&relay_host).await { 269 Ok(c) => match c { 270 Cursor::Done(c) => format_smolstr!("done({c})"), 271 Cursor::Next(None) => "none".to_smolstr(), 272 Cursor::Next(Some(c)) => c.to_smolstr(), 273 }, 274 Err(e) => e.to_smolstr(), 275 }; 276 format_smolstr!("{domain}={cursor_str}") 277 } 278 }) 279 .collect(); 280 281 let cursors: Vec<_> = join_all(cursor_futures).await.into_iter().collect(); 282 283 let cursors_display = if cursors.is_empty() { 284 "none".to_smolstr() 285 } else { 286 cursors.join(", ").into() 287 }; 288 289 info!( 290 cursors = %cursors_display, 291 processed = delta_processed, 292 crawled = delta_crawled, 293 elapsed, 294 "progress" 295 ); 296 last_time = 
Instant::now(); 297 } 298 } 299 }); 300 tokio::spawn(async move { 301 let Err(e) = ticker.await; 302 error!(err = ?e, "stats ticker panicked, aborting"); 303 std::process::abort(); 304 }); 305 306 // retry thread 307 std::thread::spawn({ 308 let crawler = crawler.clone(); 309 let handle = tokio::runtime::Handle::current(); 310 move || { 311 use std::thread::sleep; 312 313 let _g = handle.enter(); 314 315 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { 316 loop { 317 match crawler.process_retry_queue() { 318 Ok(Some(dur)) => sleep(dur.max(Duration::from_secs(1))), 319 Ok(None) => sleep(Duration::from_secs(60)), 320 Err(e) => { 321 error!(err = %e, "retry loop failed"); 322 sleep(Duration::from_secs(60)); 323 } 324 } 325 } 326 })); 327 if result.is_err() { 328 error!("retry thread panicked, aborting"); 329 std::process::abort(); 330 } 331 } 332 }); 333 334 info!( 335 relay_count = crawler.relays.len(), 336 hosts = ?crawler.relays, 337 "starting crawler" 338 ); 339 340 let mut tasks = tokio::task::JoinSet::new(); 341 for url in crawler.relays.clone() { 342 let crawler = crawler.clone(); 343 let span = tracing::info_span!("crawl", %url); 344 tasks.spawn( 345 async move { 346 loop { 347 if let Err(e) = Self::crawl(crawler.clone(), &url).await { 348 error!(err = ?e, "fatal error, restarting in 30s"); 349 tokio::time::sleep(Duration::from_secs(30)).await; 350 } 351 } 352 } 353 .instrument(span), 354 ); 355 } 356 let _ = tasks.join_all().await; 357 358 Ok(()) 359 } 360 361 fn base_url(url: &Url) -> Result<Url> { 362 let mut url = url.clone(); 363 match url.scheme() { 364 "wss" => url 365 .set_scheme("https") 366 .map_err(|_| miette::miette!("invalid url: {url}"))?, 367 "ws" => url 368 .set_scheme("http") 369 .map_err(|_| miette::miette!("invalid url: {url}"))?, 370 _ => {} 371 } 372 Ok(url) 373 } 374 375 async fn crawl(crawler: Arc<Self>, relay_host: &Url) -> Result<()> { 376 let base_url = Self::base_url(relay_host)?; 377 378 let mut rng: SmallRng 
= rand::make_rng(); 379 let db = &crawler.state.db; 380 381 let mut cursor = crawler.get_cursor(relay_host).await?; 382 383 match &cursor { 384 Cursor::Next(Some(c)) => info!(cursor = %c, "resuming"), 385 Cursor::Next(None) => info!("starting from scratch"), 386 Cursor::Done(c) => info!(cursor = %c, "was done, resuming"), 387 } 388 389 let mut was_throttled = false; 390 loop { 391 // throttle check 392 loop { 393 let pending = crawler.state.db.get_count("pending").await; 394 if pending > crawler.max_pending as u64 { 395 if !was_throttled { 396 debug!( 397 pending, 398 max = crawler.max_pending, 399 "throttling: above max pending" 400 ); 401 was_throttled = true; 402 crawler.throttled.store(true, Ordering::Relaxed); 403 } 404 tokio::time::sleep(Duration::from_secs(5)).await; 405 } else if pending > crawler.resume_pending as u64 { 406 if !was_throttled { 407 debug!( 408 pending, 409 resume = crawler.resume_pending, 410 "throttling: entering cooldown" 411 ); 412 was_throttled = true; 413 crawler.throttled.store(true, Ordering::Relaxed); 414 } 415 416 loop { 417 let current_pending = crawler.state.db.get_count("pending").await; 418 if current_pending <= crawler.resume_pending as u64 { 419 break; 420 } 421 debug!( 422 pending = current_pending, 423 resume = crawler.resume_pending, 424 "cooldown, waiting" 425 ); 426 tokio::time::sleep(Duration::from_secs(5)).await; 427 } 428 break; 429 } else { 430 if was_throttled { 431 info!("throttling released"); 432 was_throttled = false; 433 crawler.throttled.store(false, Ordering::Relaxed); 434 } 435 break; 436 } 437 } 438 439 let mut list_repos_url = base_url 440 .join("/xrpc/com.atproto.sync.listRepos") 441 .into_diagnostic()?; 442 list_repos_url 443 .query_pairs_mut() 444 .append_pair("limit", "1000"); 445 if let Cursor::Next(Some(c)) | Cursor::Done(c) = &cursor { 446 list_repos_url 447 .query_pairs_mut() 448 .append_pair("cursor", c.as_str()); 449 } 450 451 let fetch_result = (|| { 452 crawler 453 .http 454 
.get(list_repos_url.clone()) 455 .send() 456 .error_for_status() 457 }) 458 .retry(5, |e: &reqwest::Error, attempt| { 459 matches!(e.status(), Some(StatusCode::TOO_MANY_REQUESTS)) 460 .then(|| Duration::from_secs(1 << attempt.min(5))) 461 }) 462 .await; 463 464 let res = match fetch_result { 465 Ok(r) => r, 466 Err(RetryOutcome::Ratelimited) => { 467 warn!("rate limited by relay after retries"); 468 continue; 469 } 470 Err(RetryOutcome::Failed(e)) => { 471 error!(err = %e, "crawler failed to fetch listRepos"); 472 continue; 473 } 474 }; 475 476 let bytes = match res.bytes().await { 477 Ok(b) => b, 478 Err(e) => { 479 error!(err = %e, "cant read listRepos response"); 480 continue; 481 } 482 }; 483 484 let mut batch = db.inner.batch(); 485 let filter = crawler.state.filter.load(); 486 487 struct ParseResult { 488 unknown_dids: Vec<Did<'static>>, 489 cursor: Option<smol_str::SmolStr>, 490 count: usize, 491 } 492 493 const BLOCKING_TASK_TIMEOUT: Duration = Duration::from_secs(30); 494 495 let parse_result = { 496 let repos = db.repos.clone(); 497 let filter_ks = db.filter.clone(); 498 let crawler_ks = db.crawler.clone(); 499 500 // this wont actually cancel the task since spawn_blocking isnt cancel safe 501 // but at least we'll see whats going on? 502 tokio::time::timeout( 503 BLOCKING_TASK_TIMEOUT, 504 tokio::task::spawn_blocking(move || -> miette::Result<Option<ParseResult>> { 505 let output = match serde_json::from_slice::<ListReposOutput>(&bytes) { 506 Ok(out) => out.into_static(), 507 Err(e) => { 508 error!(err = %e, "failed to parse listRepos response"); 509 return Ok(None); 510 } 511 }; 512 513 if output.repos.is_empty() { 514 return Ok(None); 515 } 516 517 let count = output.repos.len(); 518 let next_cursor = output.cursor.map(|c| c.as_str().into()); 519 let mut unknown = Vec::new(); 520 for repo in output.repos { 521 let excl_key = crate::db::filter::exclude_key(repo.did.as_str())?; 522 if filter_ks.contains_key(&excl_key).into_diagnostic()? 
{ 523 continue; 524 } 525 526 // already in retry queue — let the retry thread handle it 527 let retry_key = keys::crawler_retry_key(&repo.did); 528 if crawler_ks.contains_key(&retry_key).into_diagnostic()? { 529 continue; 530 } 531 532 let did_key = keys::repo_key(&repo.did); 533 if !repos.contains_key(&did_key).into_diagnostic()? { 534 unknown.push(repo.did.into_static()); 535 } 536 } 537 538 Ok(Some(ParseResult { 539 unknown_dids: unknown, 540 cursor: next_cursor, 541 count, 542 })) 543 }), 544 ) 545 .await 546 } 547 .into_diagnostic()? 548 .map_err(|_| { 549 error!( 550 "spawn_blocking task for parsing listRepos timed out after {}", 551 BLOCKING_TASK_TIMEOUT.as_secs() 552 ); 553 miette::miette!("spawn_blocking task for parsing listRepos timed out") 554 })?; 555 556 let ParseResult { 557 unknown_dids, 558 cursor: next_cursor, 559 count, 560 } = match parse_result { 561 Ok(Some(res)) => res, 562 Ok(None) => { 563 info!("finished enumeration (or empty page)"); 564 if let Cursor::Next(Some(c)) = cursor { 565 info!("reached end of list."); 566 cursor = Cursor::Done(c); 567 } 568 info!("sleeping 1h before next enumeration pass"); 569 tokio::time::sleep(Duration::from_secs(3600)).await; 570 info!("resuming after 1h sleep"); 571 continue; 572 } 573 Err(e) => return Err(e).wrap_err("error while crawling"), 574 }; 575 576 debug!(count, "fetched repos"); 577 crawler.crawled_count.fetch_add(count, Ordering::Relaxed); 578 579 let in_flight = if filter.check_signals() && !unknown_dids.is_empty() { 580 // we dont need to pass any existing since we have none; we are crawling after all 581 crawler 582 .check_signals_batch(&unknown_dids, &filter, &mut batch, &HashMap::new()) 583 .await? 
584 } else { 585 // no signal checking but still need dedup to avoid orphan pending entries 586 crawler.acquire_in_flight(unknown_dids).await 587 }; 588 589 for did in &in_flight.repos { 590 let did_key = keys::repo_key(did); 591 trace!(did = %did, "found new repo"); 592 593 let state = RepoState::untracked(rng.next_u64()); 594 batch.insert(&db.repos, &did_key, ser_repo_state(&state)?); 595 batch.insert(&db.pending, keys::pending_key(state.index_id), &did_key); 596 } 597 598 if let Some(new_cursor) = next_cursor { 599 cursor = Cursor::Next(Some(new_cursor.as_str().into())); 600 } else if let Cursor::Next(Some(c)) = cursor { 601 info!("reached end of list."); 602 cursor = Cursor::Done(c); 603 } 604 batch.insert( 605 &db.cursors, 606 crawler_cursor_key(&relay_id(relay_host)), 607 rmp_serde::to_vec(&cursor) 608 .into_diagnostic() 609 .wrap_err("cant serialize cursor")?, 610 ); 611 612 tokio::time::timeout( 613 BLOCKING_TASK_TIMEOUT, 614 tokio::task::spawn_blocking(move || batch.commit().into_diagnostic()), 615 ) 616 .await 617 .into_diagnostic()? 618 .map_err(|_| { 619 error!( 620 "spawn_blocking task for batch commit timed out after {}", 621 BLOCKING_TASK_TIMEOUT.as_secs() 622 ); 623 miette::miette!("spawn_blocking task for batch commit timed out") 624 })? 
625 .inspect_err(|e| { 626 error!(err = ?e, "batch commit failed"); 627 }) 628 .ok(); 629 630 drop(in_flight.guards); 631 632 crawler.account_new_repos(in_flight.repos.len()).await; 633 634 if matches!(cursor, Cursor::Done(_)) { 635 info!("enumeration complete, sleeping 1h before next pass"); 636 tokio::time::sleep(Duration::from_secs(3600)).await; 637 info!("resuming after 1h sleep"); 638 } 639 } 640 } 641 642 fn process_retry_queue(&self) -> Result<Option<Duration>> { 643 let db = &self.state.db; 644 let now = Utc::now(); 645 646 let mut ready: Vec<Did> = Vec::new(); 647 let mut existing: HashMap<Did<'static>, RetryState> = HashMap::new(); 648 let mut next_wake: Option<Duration> = None; 649 let mut had_more = false; 650 651 let mut rng: SmallRng = rand::make_rng(); 652 653 let mut batch = db.inner.batch(); 654 for guard in db.crawler.prefix(keys::CRAWLER_RETRY_PREFIX) { 655 let (key, val) = guard.into_inner().into_diagnostic()?; 656 let state: RetryState = rmp_serde::from_slice(&val).into_diagnostic()?; 657 let did = keys::crawler_retry_parse_key(&key)?.to_did(); 658 659 // leave capped entries alone for API inspection 660 if state.attempts >= MAX_RETRY_ATTEMPTS { 661 continue; 662 } 663 664 let backoff = TimeDelta::seconds( 665 state 666 .duration 667 .as_seconds_f64() 668 .mul(rng.random_range(0.01..0.07)) as i64, 669 ); 670 if state.after + backoff > now { 671 let wake = (state.after - now).to_std().unwrap_or(Duration::ZERO); 672 next_wake = Some(next_wake.map(|w| w.min(wake)).unwrap_or(wake)); 673 continue; 674 } 675 676 if ready.len() >= MAX_RETRY_BATCH { 677 had_more = true; 678 break; 679 } 680 681 ready.push(did.clone()); 682 existing.insert(did, state); 683 } 684 685 if ready.is_empty() { 686 return Ok(next_wake); 687 } 688 689 debug!(count = ready.len(), "retrying pending repos"); 690 691 let handle = tokio::runtime::Handle::current(); 692 let filter = self.state.filter.load(); 693 let in_flight = 694 handle.block_on(self.check_signals_batch(&ready, 
&filter, &mut batch, &existing))?; 695 696 let mut rng: SmallRng = rand::make_rng(); 697 for did in &in_flight.repos { 698 let did_key = keys::repo_key(did); 699 700 if db.repos.contains_key(&did_key).into_diagnostic()? { 701 continue; 702 } 703 704 let state = RepoState::untracked(rng.next_u64()); 705 batch.insert(&db.repos, &did_key, ser_repo_state(&state)?); 706 batch.insert(&db.pending, keys::pending_key(state.index_id), &did_key); 707 } 708 709 batch.commit().into_diagnostic()?; 710 711 drop(in_flight.guards); 712 713 if !in_flight.repos.is_empty() { 714 info!(count = in_flight.repos.len(), "recovered from retry queue"); 715 handle.block_on(self.account_new_repos(in_flight.repos.len())); 716 } 717 718 // if we hit the batch cap there are more ready entries, loop back immediately 719 Ok(had_more.then_some(Duration::ZERO).or(next_wake)) 720 } 721 722 fn check_repo_signals( 723 &self, 724 filter: Arc<crate::filter::FilterConfig>, 725 did: Did<'static>, 726 ) -> impl Future<Output = (Did<'static>, CrawlCheckResult)> + Send + 'static { 727 let resolver = self.state.resolver.clone(); 728 let http = self.http.clone(); 729 let throttler = self.pds_throttler.clone(); 730 async move { 731 const MAX_RETRIES: u32 = 5; 732 733 let pds_url = (|| resolver.resolve_identity_info(&did)) 734 .retry(MAX_RETRIES, |e, attempt| { 735 matches!(e, crate::resolver::ResolverError::Ratelimited) 736 .then(|| Duration::from_secs(1 << attempt.min(5))) 737 }) 738 .await; 739 740 let pds_url = match pds_url { 741 Ok((url, _)) => url, 742 Err(RetryOutcome::Ratelimited) => { 743 error!( 744 retries = MAX_RETRIES, 745 "rate limited resolving identity, giving up" 746 ); 747 // no pds handle to read retry_after from; use a short default 748 return (did, RetryState::new(60).into()); 749 } 750 Err(RetryOutcome::Failed(e)) => { 751 error!(err = %e, "failed to resolve identity"); 752 return (did, RetryState::new(60).into()); 753 } 754 }; 755 756 let throttle = throttler.get_handle(&pds_url).await; 757 
if throttle.is_throttled() { 758 trace!(host = pds_url.host_str(), "skipping throttled pds"); 759 return (did, throttle.to_retry_state().into()); 760 } 761 762 let _permit = throttle.acquire().unit_error().or_failure(&throttle, || ()); 763 let Ok(_permit) = _permit.await else { 764 trace!( 765 host = pds_url.host_str(), 766 "pds failed while waiting for permit" 767 ); 768 return (did, throttle.to_retry_state().into()); 769 }; 770 771 enum RequestError { 772 Reqwest(reqwest::Error), 773 RateLimited(Option<u64>), 774 /// hard failure notification from another task on this PDS 775 Throttled, 776 } 777 778 let mut describe_url = pds_url.join("/xrpc/com.atproto.repo.describeRepo").unwrap(); 779 describe_url.query_pairs_mut().append_pair("repo", &did); 780 781 let resp = async { 782 let resp = http 783 .get(describe_url) 784 .timeout(throttle.timeout()) 785 .send() 786 .await 787 .map_err(RequestError::Reqwest)?; 788 789 // dont retry ratelimits since we will just put it in a queue to be tried again later 790 if resp.status() == StatusCode::TOO_MANY_REQUESTS { 791 return Err(RequestError::RateLimited(parse_retry_after(&resp))); 792 } 793 794 resp.error_for_status().map_err(RequestError::Reqwest) 795 } 796 .or_failure(&throttle, || RequestError::Throttled) 797 .await; 798 799 let resp = match resp { 800 Ok(r) => { 801 throttle.record_success(); 802 r 803 } 804 Err(RequestError::RateLimited(secs)) => { 805 throttle.record_ratelimit(secs); 806 return ( 807 did, 808 throttle 809 .to_retry_state() 810 .with_status(StatusCode::TOO_MANY_REQUESTS) 811 .into(), 812 ); 813 } 814 Err(RequestError::Throttled) => { 815 return (did, throttle.to_retry_state().into()); 816 } 817 Err(RequestError::Reqwest(e)) => { 818 if e.is_timeout() && !throttle.record_timeout() { 819 // first or second timeout, just requeue 820 let mut retry_state = RetryState::new(60); 821 retry_state.status = e.status(); 822 return (did, retry_state.into()); 823 } 824 // third timeout, if timeout fail 
is_throttle_worthy will ban the pds 825 826 if is_throttle_worthy(&e) { 827 if let Some(mins) = throttle.record_failure() { 828 warn!(url = %pds_url, mins, "throttling pds due to hard failure"); 829 } 830 let mut retry_state = throttle.to_retry_state(); 831 retry_state.status = e.status(); 832 return (did, retry_state.into()); 833 } 834 835 match e.status() { 836 Some(StatusCode::NOT_FOUND | StatusCode::GONE) => { 837 trace!("repo not found"); 838 return (did, CrawlCheckResult::NoSignal); 839 } 840 Some(s) if s.is_client_error() => { 841 error!(status = %s, "repo unavailable"); 842 return (did, CrawlCheckResult::NoSignal); 843 } 844 _ => { 845 error!(err = %e, "repo errored"); 846 let mut retry_state = RetryState::new(60 * 15); 847 retry_state.status = e.status(); 848 return (did, retry_state.into()); 849 } 850 } 851 } 852 }; 853 854 let bytes = match resp.bytes().await { 855 Ok(b) => b, 856 Err(e) => { 857 error!(err = %e, "failed to read describeRepo response"); 858 return (did, RetryState::new(60 * 5).into()); 859 } 860 }; 861 862 let out = match serde_json::from_slice::<DescribeRepoOutput>(&bytes) { 863 Ok(out) => out, 864 Err(e) => { 865 error!(err = %e, "failed to parse describeRepo response"); 866 return (did, RetryState::new(60 * 10).into()); 867 } 868 }; 869 870 let found_signal = out 871 .collections 872 .iter() 873 .any(|col| filter.matches_signal(col.as_str())); 874 875 if !found_signal { 876 trace!("no signal-matching collections found"); 877 } 878 879 return ( 880 did, 881 found_signal 882 .then_some(CrawlCheckResult::Signal) 883 .unwrap_or(CrawlCheckResult::NoSignal), 884 ); 885 } 886 } 887 888 async fn check_signals_batch( 889 &self, 890 repos: &[Did<'static>], 891 filter: &Arc<crate::filter::FilterConfig>, 892 batch: &mut fjall::OwnedWriteBatch, 893 existing: &HashMap<Did<'static>, RetryState>, 894 ) -> Result<InFlightRepos> { 895 let db = &self.state.db; 896 let in_flight = self.acquire_in_flight(repos.to_vec()).await; 897 let mut valid = 
Vec::with_capacity(in_flight.repos.len()); 898 let mut set = tokio::task::JoinSet::new(); 899 900 for did in in_flight.repos { 901 let filter = filter.clone(); 902 let span = tracing::info_span!("signals", did = %did); 903 set.spawn( 904 self.check_repo_signals(filter, did.clone()) 905 .instrument(span), 906 ); 907 } 908 909 while let Some(res) = tokio::time::timeout(Duration::from_secs(60), set.join_next()) 910 .await 911 .into_diagnostic() 912 .map_err(|_| { 913 error!("signal check task timed out after 60s"); 914 miette::miette!("signal check task timed out") 915 })? 916 { 917 let (did, result) = match res { 918 Ok(inner) => inner, 919 Err(e) => { 920 error!(err = ?e, "signal check panicked"); 921 continue; 922 } 923 }; 924 925 match result { 926 CrawlCheckResult::Signal => { 927 batch.remove(&db.crawler, keys::crawler_retry_key(&did)); 928 valid.push(did); 929 } 930 CrawlCheckResult::NoSignal => { 931 batch.remove(&db.crawler, keys::crawler_retry_key(&did)); 932 } 933 CrawlCheckResult::Retry(state) => { 934 let prev_attempts = existing.get(&did).map(|s| s.attempts).unwrap_or(0); 935 let carried = RetryState { 936 attempts: prev_attempts, 937 ..state 938 }; 939 let next = match carried.next_attempt() { 940 Some(next) => next, 941 None => RetryState { 942 attempts: MAX_RETRY_ATTEMPTS, 943 ..state 944 }, 945 }; 946 batch.insert( 947 &db.crawler, 948 keys::crawler_retry_key(&did), 949 rmp_serde::to_vec(&next).into_diagnostic()?, 950 ); 951 } 952 } 953 } 954 955 Ok(InFlightRepos { 956 repos: valid, 957 guards: in_flight.guards, 958 }) 959 } 960 961 async fn acquire_in_flight(&self, dids: Vec<Did<'static>>) -> InFlightRepos { 962 let mut filtered = Vec::with_capacity(dids.len()); 963 let mut guards = Vec::with_capacity(dids.len()); 964 for did in dids { 965 if self.in_flight.insert_async(did.clone()).await.is_err() { 966 trace!(did = %did, "repo in-flight, skipping"); 967 continue; 968 } 969 guards.push(InFlightGuard { 970 set: self.in_flight.clone(), 971 did: 
did.clone(), 972 }); 973 filtered.push(did); 974 } 975 InFlightRepos { 976 guards, 977 repos: filtered, 978 } 979 } 980 981 async fn account_new_repos(&self, count: usize) { 982 if count == 0 { 983 return; 984 } 985 986 self.count.fetch_add(count, Ordering::Relaxed); 987 self.state 988 .db 989 .update_count_async("repos", count as i64) 990 .await; 991 self.state 992 .db 993 .update_count_async("pending", count as i64) 994 .await; 995 self.state.notify_backfill(); 996 } 997}