very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer

[crawler] rewrite component, spread into producers, add consuming from listReposByCollection

ptr.pet f99a9cd2 b24b23f1

verified
+1806 -1038
+24 -10
README.md
··· 2 2 3 3 `hydrant` is an AT Protocol indexer built on the `fjall` database that handles sync for you. it's flexible, supporting both full-network indexing and filtered indexing (e.g., by DID), also allowing querying with XRPCs and providing an ordered event stream with cursor support. 4 4 5 - you can see [random.wisp.place](https://tangled.org/did:plc:dfl62fgb7wtjj3fcbb72naae/random.wisp.place) for an example on how to use hydrant. 5 + you can see [random.wisp.place](https://tangled.org/did:plc:dfl62fgb7wtjj3fcbb72naae/random.wisp.place) (standalone binary using http API) or the [statusphere example](./examples/statusphere.rs) (hydrant-as-library) for examples on how to use hydrant. 6 6 7 7 **WARNING: *the db format is not stable yet.*** it's in active development so if you are going to rely on the db format being stable, don't (eg. for query features, if you are using ephemeral mode this doesn't matter for example, or you dont mind losing your existing backfilled data in hydrant if you already processed them.). 8 8 9 9 ## vs `tap` 10 10 11 - while [`tap`](https://github.com/bluesky-social/indigo/tree/main/cmd/tap) is designed as a firehose consumer and simply just propagates events while handling sync, `hydrant` is flexible, it allows you to directly query the database for records, and it also provides an ordered view of events, allowing the use of a cursor to fetch events from a specific point in time. 11 + while [`tap`](https://github.com/bluesky-social/indigo/tree/main/cmd/tap) is designed as a firehose consumer and simply just propagates events while handling sync, `hydrant` is flexible, it allows you to directly query the database for records, and it also provides an ordered view of events, allowing the use of a cursor to fetch events from a specific point. it can act as both an indexer or an ephemeral view of some window of events. 12 12 13 13 ### stream behavior 14 14 ··· 24 24 25 25 ### multiple relay support 26 26 27 - `hydrant` supports connecting to multiple relays simultaneously for both firehose ingestion and crawling. when `RELAY_HOSTS` is configured with multiple URLs: 27 + `hydrant` supports connecting to multiple relays simultaneously for firehose ingestion. when `RELAY_HOSTS` is configured with multiple URLs: 28 28 29 29 - one independent firehose stream loop is spawned per relay 30 - - one independent crawling loop is spawned per relay 31 - - each relay maintains its own firehose / crawler cursor state 32 - - all ingestion loops and crawlers share the same worker pool and database 33 - - all crawlers share the same pending queue for backfill 30 + - each relay maintains its own firehose cursor state 31 + - all ingestion loops share the same worker pool and database 34 32 35 33 commit events are de-duplicated according to the repo `rev`. account / identity events are de-duplicated using the `time` field. 36 34 todo: decide what to do on relay-side account takedowns or if relays set the `time` field. 37 35 36 + ### crawler sources 37 + 38 + the crawler is configured separately from the firehose via `CRAWLER_URLS`. each source is a `[mode::]url` entry where the mode prefix is optional and defaults to `by_collection` in filter mode or `relay` in full-network mode. 39 + 40 + - `relay`: enumerates the network via `com.atproto.sync.listRepos`, then checks each repo's collections via `describeRepo`. used for full-network discovery. 41 + - `by_collection`: queries `com.atproto.sync.listReposByCollection` for each configured signal. more efficient for filtered indexing since it only surfaces repos that have matching records. 42 + cursors are stored per collection. 43 + 44 + ``` 45 + CRAWLER_URLS=by_collection::https://lightrail.microcosm.blue,relay::wss://bsky.network 46 + ``` 47 + 48 + each source maintains its own cursor so restarts resume mid-pass. 49 + 38 50 ## configuration 39 51 40 52 `hydrant` is configured via environment variables. all variables are prefixed with `HYDRANT_` (except `RUST_LOG`). ··· 43 55 | :--- | :--- | :--- | 44 56 | `DATABASE_PATH` | `./hydrant.db` | path to the database folder. | 45 57 | `RUST_LOG` | `info` | log filter directives (e.g., `debug`, `hydrant=trace`). [`tracing` env-filter syntax](https://docs.rs/tracing-subscriber/latest/tracing_subscriber/filter/struct.EnvFilter.html). | 46 - | `RELAY_HOST` | `wss://relay.fire.hose.cam/` | URL of the relay. | 47 - | `RELAY_HOSTS` | | comma-separated list of relay URLs. if unset, falls back to `RELAY_HOST`. | 58 + | `RELAY_HOST` | `wss://relay.fire.hose.cam/` | URL of the relay (firehose only). | 59 + | `RELAY_HOSTS` | | comma-separated list of relay URLs (firehose only). if unset, falls back to `RELAY_HOST`. | 60 + | `CRAWLER_URLS` | relay hosts in full-network mode, `https://lightrail.microcosm.blue` in filter mode | comma-separated list of `[mode::]url` crawler sources. mode is `relay` or `by_collection`; bare URLs use the default mode. set to empty string to disable crawling. | 48 61 | `PLC_URL` | `https://plc.wtf`, `https://plc.directory` if full network | base URL(s) of the PLC directory (comma-separated for multiple). | 49 62 | `EPHEMERAL` | `false` | if enabled, no records are stored. events are deleted after a certain duration (`EPHEMERAL_TTL`). | 50 63 | `EPHEMERAL_TTL` | `60min` | decides after how long events should be deleted. | ··· 63 76 | `ENABLE_DEBUG` | `false` | enable debug endpoints. | 64 77 | `DEBUG_PORT` | `API_PORT + 1` | port for debug endpoints (if enabled). | 65 78 | `ENABLE_FIREHOSE` | `true` | whether to ingest relay subscriptions. | 66 - | `ENABLE_CRAWLER` | `false` (if Filter), `true` (if Full) | whether to actively query the network for unknown repositories. | 79 + | `ENABLE_CRAWLER` | `true` if full network or crawler sources are configured, `false` otherwise | whether to actively query the network for unknown repositories. | 67 80 | `CRAWLER_MAX_PENDING_REPOS` | `2000` | max pending repos for crawler. | 68 81 | `CRAWLER_RESUME_PENDING_REPOS` | `1000` | resume threshold for crawler pending repos. | 69 82 ··· 86 99 87 100 - `POST /db/train`: train zstd compression dictionaries for the `repos`, `blocks`, and `events` keyspaces. dictionaries are written to disk; a restart is required to apply them. the crawler, firehose, and backfill worker are paused for the duration and restored on completion. 88 101 - `POST /db/compact`: trigger a full major compaction of all database keyspaces in parallel. the crawler, firehose, and backfill worker are paused for the duration and restored on completion. 102 + - `DELETE /cursors`: reset all stored cursors for a given URL. body: `{ "key": "..." }` where key is a URL. clears the relay crawler cursor, and any by-collection cursors associated with that URL. causes the next crawler pass to restart from the beginning. 89 103 90 104 #### filter mode 91 105
+27 -8
examples/statusphere.rs
··· 11 11 //! 12 12 //! the database persists records across restarts. on each start the full event 13 13 //! history is replayed from the database to rebuild the in-memory index. 14 + //! (in a better app, we could for example use the ephemeral mode of hydrant, 15 + //! and use our db, or we could use hydrant to backfill multiple instances of the app.) 14 16 17 + use std::str::FromStr; 15 18 use std::sync::Arc; 16 19 use std::time::Duration; 17 20 21 + use chrono::DateTime; 18 22 use futures::StreamExt; 19 23 use hydrant::config::Config; 20 24 use hydrant::control::{EventStream, Hydrant, ReposControl}; 21 25 use hydrant::filter::FilterMode; 26 + use jacquard_common::types::tid::Tid; 22 27 use scc::HashMap; 23 28 24 29 const COLLECTION: &str = "xyz.statusphere.status"; ··· 40 45 } 41 46 } 42 47 43 - fn set(&self, did: String, emoji: String, created_at: String) -> bool { 48 + fn set(&self, did: String, emoji: String, created_at: &str) -> bool { 44 49 let is_newer = self 45 50 .current 46 - .read_sync(&did, |_, e| created_at > e.created_at) 51 + .read_sync(&did, |_, e| created_at > e.created_at.as_str()) 47 52 .unwrap_or(true); 48 53 if is_newer { 49 - self.current 50 - .upsert_sync(did, StatusEntry { emoji, created_at }); 54 + self.current.upsert_sync( 55 + did, 56 + StatusEntry { 57 + emoji, 58 + created_at: created_at.to_owned(), 59 + }, 60 + ); 51 61 } 52 62 is_newer 53 63 } ··· 107 117 let created_at = record 108 118 .get("createdAt") 109 119 .and_then(|v| v.as_str()) 110 - .unwrap_or("") 111 - .to_owned(); 120 + .unwrap_or(""); 112 121 if index.set(did.clone(), emoji.clone(), created_at) { 113 122 let name = repos 114 123 .get(&rec.did) ··· 117 126 .flatten() 118 127 .and_then(|info| info.handle) 119 128 .unwrap_or(did); 120 - println!("[{}] {name} set status: {emoji}", event.id); 129 + println!("[{created_at}] {name}: {emoji}"); 121 130 } 122 131 } 123 132 "delete" => { ··· 129 138 .and_then(|info| info.handle) 130 139 .unwrap_or(did.clone()); 131 140 index.delete(&did); 132 - println!("[{}] {name} cleared status", event.id); 141 + let date = Tid::from_str(&rec.rkey) 142 + .ok() 143 + .and_then(|tid| DateTime::from_timestamp_micros(tid.timestamp() as i64)) 144 + .map(|date| date.to_string()) 145 + .unwrap_or_else(|| "invalid rkey".to_string()); 146 + println!("[{date}] {name} cleared status"); 133 147 } 134 148 _ => {} 135 149 } ··· 148 162 .with_env_filter("hydrant=info") 149 163 .init(); 150 164 165 + // config is loaded from environment variables (all prefixed with HYDRANT_). 166 + // key defaults for this example: 167 + // DATABASE_PATH=./hydrant.db | where to store the database. 168 + // RELAY_HOST=wss://relay.fire.hose.cam/ | firehose source. 169 + // CRAWLER_URLS=https://lightrail.microcosm.blue | crawler sources. in filter mode this defaults to `by-collection`. 151 170 let cfg = Config::from_env()?; 152 171 let hydrant = Hydrant::new(cfg).await?; 153 172
+25 -1
src/api/db.rs
··· 1 1 use crate::control::Hydrant; 2 - use axum::{Router, extract::State, http::StatusCode, routing::post}; 2 + use axum::{ 3 + Json, Router, 4 + extract::State, 5 + http::StatusCode, 6 + routing::{delete, post}, 7 + }; 8 + use serde::Deserialize; 3 9 4 10 pub fn router() -> Router<Hydrant> { 5 11 Router::new() 6 12 .route("/db/train", post(handle_train_dict)) 7 13 .route("/db/compact", post(handle_compact)) 14 + .route("/cursors", delete(handle_reset_cursor)) 8 15 } 9 16 10 17 pub async fn handle_train_dict( ··· 13 20 hydrant 14 21 .db 15 22 .train_dicts() 23 + .await 24 + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 25 + Ok(StatusCode::OK) 26 + } 27 + 28 + #[derive(Deserialize)] 29 + pub struct ResetCursorBody { 30 + pub key: String, 31 + } 32 + 33 + pub async fn handle_reset_cursor( 34 + State(hydrant): State<Hydrant>, 35 + Json(body): Json<ResetCursorBody>, 36 + ) -> Result<StatusCode, (StatusCode, String)> { 37 + hydrant 38 + .crawler 39 + .reset_cursor(&body.key) 16 40 .await 17 41 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 18 42 Ok(StatusCode::OK)
+104
src/config.rs
··· 6 6 use url::Url; 7 7 8 8 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 9 + pub enum CrawlerMode { 10 + /// enumerate via `com.atproto.sync.listRepos`, then check signals with `describeRepo`. 11 + Relay, 12 + /// enumerate via `com.atproto.sync.listReposByCollection` for each configured signal. 13 + ByCollection, 14 + } 15 + 16 + impl CrawlerMode { 17 + fn default_for(full_network: bool) -> Self { 18 + if full_network { 19 + Self::Relay 20 + } else { 21 + Self::ByCollection 22 + } 23 + } 24 + } 25 + 26 + impl FromStr for CrawlerMode { 27 + type Err = miette::Error; 28 + fn from_str(s: &str) -> Result<Self> { 29 + match s { 30 + "relay" => Ok(Self::Relay), 31 + "by_collection" | "by-collection" => Ok(Self::ByCollection), 32 + _ => Err(miette::miette!( 33 + "invalid crawler mode: expected 'relay' or 'by_collection'" 34 + )), 35 + } 36 + } 37 + } 38 + 39 + impl fmt::Display for CrawlerMode { 40 + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 41 + match self { 42 + Self::Relay => write!(f, "relay"), 43 + Self::ByCollection => write!(f, "by_collection"), 44 + } 45 + } 46 + } 47 + 48 + /// a single crawler source: a URL and the mode used to enumerate it. 49 + #[derive(Debug, Clone)] 50 + pub struct CrawlerSource { 51 + pub url: Url, 52 + pub mode: CrawlerMode, 53 + } 54 + 55 + impl CrawlerSource { 56 + /// parse `[mode::]url` — mode prefix is optional, falls back to `default_mode`. 57 + fn parse(s: &str, default_mode: CrawlerMode) -> Option<Self> { 58 + if let Some((prefix, rest)) = s.split_once("::") { 59 + let mode = prefix.parse().ok()?; 60 + let url = Url::parse(rest).ok()?; 61 + Some(Self { url, mode }) 62 + } else { 63 + let url = Url::parse(s).ok()?; 64 + Some(Self { 65 + url, 66 + mode: default_mode, 67 + }) 68 + } 69 + } 70 + } 71 + 72 + #[derive(Debug, Clone, Copy, PartialEq, Eq)] 9 73 pub enum Compression { 10 74 Lz4, 11 75 Zstd, ··· 93 157 pub filter_signals: Option<Vec<String>>, 94 158 pub filter_collections: Option<Vec<String>>, 95 159 pub filter_excludes: Option<Vec<String>>, 160 + /// crawler sources: each entry pairs a URL with a discovery mode. 161 + /// 162 + /// set via `HYDRANT_CRAWLER_URLS` as a comma-separated list of `[mode::]url` entries, 163 + /// e.g. `relay::wss://bsky.network,by_collection::https://lightrail.microcosm.blue`. 164 + /// a bare URL without a `mode::` prefix uses the default mode (`relay` for full-network, 165 + /// `by_collection` otherwise). defaults to the relay hosts with the default mode. 166 + /// set to an empty string to disable crawling entirely. 167 + pub crawler_sources: Vec<CrawlerSource>, 96 168 } 97 169 98 170 impl Config { ··· 229 301 .collect() 230 302 }); 231 303 304 + let default_mode = CrawlerMode::default_for(full_network); 305 + let crawler_sources = match std::env::var("HYDRANT_CRAWLER_URLS") { 306 + Ok(s) => s 307 + .split(',') 308 + .map(|s| s.trim()) 309 + .filter(|s| !s.is_empty()) 310 + .filter_map(|s| CrawlerSource::parse(s, default_mode)) 311 + .collect(), 312 + Err(_) => match default_mode { 313 + CrawlerMode::Relay => relay_hosts 314 + .iter() 315 + .map(|url| CrawlerSource { 316 + url: url.clone(), 317 + mode: CrawlerMode::Relay, 318 + }) 319 + .collect(), 320 + CrawlerMode::ByCollection => vec![CrawlerSource { 321 + url: Url::parse("https://lightrail.microcosm.blue").unwrap(), 322 + mode: CrawlerMode::ByCollection, 323 + }], 324 + }, 325 + }; 326 + 232 327 Ok(Self { 233 328 database_path, 234 329 relays: relay_hosts, ··· 258 353 filter_signals, 259 354 filter_collections, 260 355 filter_excludes, 356 + crawler_sources, 261 357 }) 262 358 } 263 359 } ··· 355 451 "crawler resume pending", 356 452 self.crawler_resume_pending_repos 357 453 )?; 454 + if !self.crawler_sources.is_empty() { 455 + let sources: Vec<_> = self 456 + .crawler_sources 457 + .iter() 458 + .map(|s| format!("{}::{}", s.mode, s.url)) 459 + .collect(); 460 + config_line!(f, "crawler sources", sources.join(", "))?; 461 + } 358 462 if let Some(signals) = &self.filter_signals { 359 463 config_line!(f, "filter signals", format_args!("{:?}", signals))?; 360 464 }
+151 -33
src/control.rs
··· 19 19 20 20 use crate::backfill::BackfillWorker; 21 21 use crate::config::{Config, SignatureVerification}; 22 - use crate::crawler::Crawler; 23 22 use crate::db::{self, filter as db_filter, keys, ser_repo_state}; 24 23 use crate::filter::{FilterMode, SetUpdate}; 25 24 use crate::ingest::{firehose::FirehoseIngestor, worker::FirehoseWorker}; ··· 126 125 // 4. set crawler enabled state from config, evaluated against the post-patch filter 127 126 let post_patch_crawler = match config.enable_crawler { 128 127 Some(b) => b, 129 - None => state.filter.load().mode == FilterMode::Full, 128 + None => { 129 + state.filter.load().mode == FilterMode::Full || !config.crawler_sources.is_empty() 130 + } 130 131 }; 131 132 state.crawler_enabled.send_replace(post_patch_crawler); 132 133 ··· 317 318 } 318 319 } 319 320 320 - // 11. spawn the crawler if we have relay hosts to crawl 321 - if !relay_hosts.is_empty() { 322 - let crawler_rx = state.crawler_enabled.subscribe(); 321 + // 11. spawn crawler components 322 + if !config.crawler_sources.is_empty() { 323 + use crate::config::CrawlerMode; 324 + use crate::crawler::throttle::Throttler; 325 + use crate::crawler::{ 326 + ByCollectionProducer, CrawlerStats, CrawlerWorker, InFlight, RelayProducer, 327 + RetryProducer, SignalChecker, 328 + }; 329 + use std::time::Duration; 330 + use tracing::Instrument; 331 + 332 + let http = reqwest::Client::builder() 333 + .user_agent(concat!( 334 + env!("CARGO_PKG_NAME"), 335 + "/", 336 + env!("CARGO_PKG_VERSION") 337 + )) 338 + .gzip(true) 339 + .build() 340 + .expect("that reqwest will build"); 341 + let pds_throttler = Throttler::new(); 342 + let in_flight = InFlight::new(); 343 + let stats = CrawlerStats::new( 344 + state.clone(), 345 + config 346 + .crawler_sources 347 + .iter() 348 + .map(|s| s.url.clone()) 349 + .collect(), 350 + pds_throttler.clone(), 351 + ); 352 + let checker = SignalChecker { 353 + http: http.clone(), 354 + state: state.clone(), 355 + throttler: pds_throttler, 356 + }; 357 + 323 358 info!( 324 - relay_count = relay_hosts.len(), 325 - hosts = relay_hosts 326 - .iter() 327 - .map(|h| h.as_str()) 328 - .collect::<Vec<_>>() 329 - .join(", "), 359 + max_pending = config.crawler_max_pending_repos, 360 + resume_pending = config.crawler_resume_pending_repos, 330 361 enabled = *state.crawler_enabled.borrow(), 331 - "starting crawler(s)" 362 + "starting crawler worker" 332 363 ); 333 - let state = state.clone(); 334 - let max_pending = config.crawler_max_pending_repos; 335 - let resume_pending = config.crawler_resume_pending_repos; 364 + let (worker, tx) = CrawlerWorker::new( 365 + state.clone(), 366 + config.crawler_max_pending_repos, 367 + config.crawler_resume_pending_repos, 368 + stats.clone(), 369 + ); 370 + tokio::spawn(async move { 371 + worker.run().await; 372 + error!("crawler worker exited unexpectedly, aborting"); 373 + std::process::abort(); 374 + }); 375 + 376 + let ticker = tokio::spawn(stats.clone().task()); 336 377 tokio::spawn(async move { 337 - let crawler = 338 - Crawler::new(state, relay_hosts, max_pending, resume_pending, crawler_rx); 339 - if let Err(e) = crawler.run().await { 340 - error!(err = %e, "crawler error"); 341 - db::check_poisoned_report(&e); 378 + match ticker.await { 379 + Err(e) => error!(err = ?e, "stats ticker panicked, aborting"), 380 + Ok(()) => error!("stats ticker exited unexpectedly, aborting"), 342 381 } 382 + std::process::abort(); 343 383 }); 384 + 385 + tokio::spawn( 386 + RetryProducer { 387 + checker: checker.clone(), 388 + in_flight: in_flight.clone(), 389 + tx: tx.clone(), 390 + } 391 + .run(), 392 + ); 393 + 394 + let crawler_rx = state.crawler_enabled.subscribe(); 395 + for source in config.crawler_sources.iter().cloned() { 396 + let http = http.clone(); 397 + let state = state.clone(); 398 + let in_flight = in_flight.clone(); 399 + let tx = tx.clone(); 400 + let stats = stats.clone(); 401 + let enabled = crawler_rx.clone(); 402 + match source.mode { 403 + CrawlerMode::Relay => { 404 + info!(relay = %source.url, enabled = *state.crawler_enabled.borrow(), "starting relay crawler"); 405 + let span = tracing::info_span!("crawl", url = %source.url); 406 + tokio::spawn( 407 + RelayProducer { 408 + relay_url: source.url, 409 + checker: checker.clone(), 410 + in_flight, 411 + tx, 412 + enabled, 413 + stats, 414 + } 415 + .run() 416 + .instrument(span), 417 + ); 418 + } 419 + CrawlerMode::ByCollection => { 420 + info!( 421 + host = source.url.host_str(), 422 + enabled = *state.crawler_enabled.borrow(), 423 + "starting by-collection crawler" 424 + ); 425 + let span = 426 + tracing::info_span!("by_collection", host = source.url.host_str()); 427 + tokio::spawn( 428 + async move { 429 + loop { 430 + let producer = ByCollectionProducer { 431 + index_url: source.url.clone(), 432 + http: http.clone(), 433 + state: state.clone(), 434 + in_flight: in_flight.clone(), 435 + tx: tx.clone(), 436 + enabled: enabled.clone(), 437 + stats: stats.clone(), 438 + }; 439 + if let Err(e) = producer.run().await { 440 + error!(err = ?e, "by-collection crawler fatal error, restarting in 30s"); 441 + tokio::time::sleep(Duration::from_secs(30)).await; 442 + } 443 + } 444 + } 445 + .instrument(span), 446 + ); 447 + } 448 + } 449 + } 344 450 } 345 451 346 452 // 12. spawn the firehose worker on a blocking thread (fatal task) ··· 550 656 pub struct CrawlerHandle(Arc<AppState>); 551 657 552 658 impl CrawlerHandle { 553 - /// enable the crawler. no-op if already enabled. 659 + /// enable the crawler (enables all configured producers). no-op if already enabled. 554 660 pub fn enable(&self) { 555 661 self.0.crawler_enabled.send_replace(true); 556 662 } 557 - /// disable the crawler. in-progress repo checks finish before the crawler pauses. 663 + /// disable the crawler (disables all configured producers). 664 + /// in-progress repo checks finish before the crawler pauses. 558 665 pub fn disable(&self) { 559 666 self.0.crawler_enabled.send_replace(false); 560 667 } ··· 562 669 pub fn is_enabled(&self) -> bool { 563 670 *self.0.crawler_enabled.borrow() 564 671 } 672 + 673 + /// delete all cursor entries associated with the given URL. 674 + pub async fn reset_cursor(&self, url: &str) -> Result<()> { 675 + let db = self.0.db.clone(); 676 + let point_keys = [keys::crawler_cursor_key(url)]; 677 + let by_collection_prefix = keys::by_collection_cursor_prefix(url); 678 + tokio::task::spawn_blocking(move || { 679 + let mut batch = db.inner.batch(); 680 + for k in point_keys { 681 + batch.remove(&db.cursors, k); 682 + } 683 + for entry in db.cursors.prefix(&by_collection_prefix) { 684 + let (k, _) = entry.into_inner().into_diagnostic()?; 685 + batch.remove(&db.cursors, k); 686 + } 687 + batch.commit().into_diagnostic() 688 + }) 689 + .await 690 + .into_diagnostic()??; 691 + Ok(()) 692 + } 565 693 } 566 694 567 695 /// runtime control over the firehose ingestor component. 568 - /// 569 - /// the firehose connects to each configured relay's `com.atproto.sync.subscribeRepos` 570 - /// websocket and processes commit, identity, account, and sync events in real time. 571 - /// one independent connection is maintained per relay URL. 572 - /// 573 - /// disabling the firehose closes the websocket after the current message is processed. 574 696 #[derive(Clone)] 575 697 pub struct FirehoseHandle(Arc<AppState>); 576 698 ··· 594 716 /// the backfill worker fetches full repo CAR files from each repo's PDS for any 595 717 /// repository in the pending queue, parses the MST, and inserts all matching records 596 718 /// into the database. concurrency is bounded by `HYDRANT_BACKFILL_CONCURRENCY_LIMIT`. 597 - /// 598 - /// disabling backfill lets any in-flight repo fetches finish before pausing. 599 719 #[derive(Clone)] 600 720 pub struct BackfillHandle(Arc<AppState>); 601 721 ··· 604 724 pub fn enable(&self) { 605 725 self.0.backfill_enabled.send_replace(true); 606 726 } 607 - /// disable the backfill worker. in-flight repo fetches complete before pausing. 727 + /// disable the backfill worker. in-flight repos complete before pausing. 608 728 pub fn disable(&self) { 609 729 self.0.backfill_enabled.send_replace(false); 610 730 } ··· 613 733 *self.0.backfill_enabled.borrow() 614 734 } 615 735 } 616 - 617 - // --- filter control --- 618 736 619 737 /// a point-in-time snapshot of the filter configuration. returned by all [`FilterControl`] methods. 620 738 ///
+241
src/crawler/by_collection.rs
··· 1 + use crate::db::keys::by_collection_cursor_key; 2 + use crate::db::{Db, keys}; 3 + use crate::state::AppState; 4 + use crate::util::{ErrorForStatus, RetryOutcome, RetryWithBackoff, WatchEnabledExt}; 5 + use jacquard_api::com_atproto::sync::list_repos_by_collection::ListReposByCollectionOutput; 6 + use jacquard_common::{IntoStatic, types::string::Did}; 7 + use miette::{Context, IntoDiagnostic, Result}; 8 + use smol_str::SmolStr; 9 + use std::sync::Arc; 10 + use std::time::Duration; 11 + use tokio::sync::{mpsc, watch}; 12 + use tracing::{debug, error, info, trace, warn}; 13 + use url::Url; 14 + 15 + use super::worker::{CrawlerBatch, CursorUpdate}; 16 + use super::{CrawlerStats, InFlight, base_url}; 17 + 18 + const BLOCKING_TASK_TIMEOUT: Duration = Duration::from_secs(30); 19 + 20 + pub(crate) struct ByCollectionProducer { 21 + pub(crate) index_url: Url, 22 + pub(crate) http: reqwest::Client, 23 + pub(crate) state: Arc<AppState>, 24 + pub(crate) in_flight: InFlight, 25 + pub(crate) tx: mpsc::Sender<CrawlerBatch>, 26 + pub(crate) enabled: watch::Receiver<bool>, 27 + pub(crate) stats: CrawlerStats, 28 + } 29 + 30 + impl ByCollectionProducer { 31 + /// hourly loop: runs one full pass per configured signal, then sleeps. 32 + pub(crate) async fn run(mut self) -> Result<()> { 33 + loop { 34 + self.enabled.wait_enabled("by-collection crawler").await; 35 + 36 + let filter = self.state.filter.load(); 37 + if filter.signals.is_empty() { 38 + debug!("no signals configured, by-collection crawler sleeping 5m"); 39 + tokio::time::sleep(Duration::from_secs(300)).await; 40 + continue; 41 + } 42 + 43 + let signals: Vec<String> = filter.signals.iter().map(|s| s.to_string()).collect(); 44 + drop(filter); 45 + 46 + info!( 47 + host = self.index_url.host_str(), 48 + signal_count = signals.len(), 49 + "starting by-collection discovery pass" 50 + ); 51 + 52 + for collection in &signals { 53 + self.enabled.wait_enabled("by-collection crawler").await; 54 + let span = tracing::info_span!("by_collection", %collection); 55 + use tracing::Instrument as _; 56 + if let Err(e) = self.crawl(collection).instrument(span).await { 57 + error!(err = %e, %collection, "by-collection crawl error, continuing"); 58 + } 59 + } 60 + 61 + info!("by-collection pass complete, sleeping 1h"); 62 + tokio::select! { 63 + _ = tokio::time::sleep(Duration::from_secs(3600)) => {} 64 + _ = self.enabled.changed() => { 65 + if !*self.enabled.borrow() { return Ok(()); } 66 + } 67 + } 68 + } 69 + } 70 + 71 + async fn crawl(&mut self, collection: &str) -> Result<()> { 72 + let db = &self.state.db; 73 + let base = base_url(&self.index_url)?; 74 + let cursor_key = by_collection_cursor_key(self.index_url.as_str(), collection); 75 + 76 + // resume from any persisted cursor, so a restart mid-pass doesn't rescan from scratch. 77 + let mut cursor: Option<SmolStr> = Db::get(db.cursors.clone(), &cursor_key) 78 + .await 79 + .ok() 80 + .flatten() 81 + .and_then(|b| String::from_utf8(b.to_vec()).ok().map(SmolStr::from)); 82 + 83 + if cursor.is_some() { 84 + debug!(%collection, "resuming by-collection pass from cursor"); 85 + } 86 + 87 + let mut total_count = 0usize; 88 + let mut new_count = 0usize; 89 + 90 + loop { 91 + self.enabled.wait_enabled("by-collection crawler").await; 92 + 93 + let mut url = base 94 + .join("/xrpc/com.atproto.sync.listReposByCollection") 95 + .into_diagnostic()?; 96 + url.query_pairs_mut() 97 + .append_pair("collection", collection) 98 + .append_pair("limit", "2000"); 99 + if let Some(c) = &cursor { 100 + url.query_pairs_mut().append_pair("cursor", c.as_str()); 101 + } 102 + 103 + let fetch_result = (|| self.http.get(url.clone()).send().error_for_status()) 104 + .retry(5, |e: &reqwest::Error, attempt| { 105 + matches!(e.status(), Some(reqwest::StatusCode::TOO_MANY_REQUESTS)) 106 + .then(|| Duration::from_secs(1 << attempt.min(5))) 107 + }) 108 + .await; 109 + 110 + let res = match fetch_result { 111 + Ok(r) => r, 112 + Err(RetryOutcome::Ratelimited) => { 113 + warn!(%collection, "rate limited by collection index after retries"); 114 + continue; 115 + } 116 + Err(RetryOutcome::Failed(e)) => { 117 + error!(err = %e, %collection, "by-collection fetch failed"); 118 + continue; 119 + } 120 + }; 121 + 122 + let bytes = match res.bytes().await { 123 + Ok(b) => b, 124 + Err(e) => { 125 + error!(err = %e, "can't read listReposByCollection response"); 126 + continue; 127 + } 128 + }; 129 + 130 + struct PageResult { 131 + unknown_dids: Vec<Did<'static>>, 132 + next_cursor: Option<SmolStr>, 133 + count: usize, 134 + } 135 + 136 + let page_result = { 137 + let repos_ks = db.repos.clone(); 138 + let filter_ks = db.filter.clone(); 139 + 140 + tokio::time::timeout( 141 + BLOCKING_TASK_TIMEOUT, 142 + tokio::task::spawn_blocking(move || -> miette::Result<Option<PageResult>> { 143 + let output = 144 + match serde_json::from_slice::<ListReposByCollectionOutput>(&bytes) { 145 + Ok(out) => out.into_static(), 146 + Err(e) => { 147 + error!( 148 + err = %e, 149 + "failed to parse listReposByCollection response" 150 + ); 151 + return Ok(None); 152 + } 153 + }; 154 + 155 + if output.repos.is_empty() { 156 + return Ok(None); 157 + } 158 + 159 + let count = output.repos.len(); 160 + let next_cursor = output.cursor.map(|c| c.as_str().into()); 161 + let mut unknown = Vec::new(); 162 + for repo in output.repos { 163 + let excl_key = crate::db::filter::exclude_key(repo.did.as_str())?; 164 + if filter_ks.contains_key(&excl_key).into_diagnostic()? { 165 + continue; 166 + } 167 + let did_key = keys::repo_key(&repo.did); 168 + if !repos_ks.contains_key(&did_key).into_diagnostic()? { 169 + unknown.push(repo.did.into_static()); 170 + } 171 + } 172 + 173 + Ok(Some(PageResult { 174 + unknown_dids: unknown, 175 + next_cursor, 176 + count, 177 + })) 178 + }), 179 + ) 180 + .await 181 + } 182 + .into_diagnostic()? 183 + .map_err(|_| { 184 + error!( 185 + "spawn_blocking task for listReposByCollection timed out after {}s", 186 + BLOCKING_TASK_TIMEOUT.as_secs() 187 + ); 188 + miette::miette!("spawn_blocking task for listReposByCollection timed out") 189 + })?; 190 + 191 + let PageResult { 192 + unknown_dids, 193 + next_cursor, 194 + count, 195 + } = match page_result { 196 + Ok(Some(r)) => r, 197 + Ok(None) => { 198 + info!( 199 + %collection, 200 + total = total_count, 201 + new = new_count, 202 + "finished by-collection pass (empty page)" 203 + ); 204 + return Ok(()); 205 + } 206 + Err(e) => return Err(e).wrap_err("error parsing by-collection page"), 207 + }; 208 + 209 + total_count += count; 210 + trace!(%collection, count, "fetched repos from by-collection index"); 211 + 212 + let in_flight = self.in_flight.acquire(unknown_dids).await; 213 + new_count += in_flight.len(); 214 + self.stats.record_crawled(count); 215 + 216 + cursor = next_cursor; 217 + let cursor_update = cursor.as_ref().map(|c| CursorUpdate { 218 + key: cursor_key.clone(), 219 + value: c.as_bytes().to_vec(), 220 + }); 221 + 222 + self.tx 223 + .send(CrawlerBatch { 224 + guards: in_flight, 225 + cursor_update, 226 + }) 227 + .await 228 + .ok(); 229 + 230 + if cursor.is_none() { 231 + info!( 232 + %collection, 233 + total = total_count, 234 + new = new_count, 235 + "finished by-collection pass" 236 + ); 237 + return Ok(()); 238 + } 239 + } 240 + } 241 + }
+111 -966
src/crawler/mod.rs
··· 1 - use crate::crawler::throttle::ThrottleHandle; 2 - use crate::db::keys::crawler_cursor_key; 3 - use crate::db::{Db, keys, ser_repo_state}; 4 1 use crate::state::AppState; 5 - use crate::types::RepoState; 6 - use crate::util::WatchEnabledExt; 7 - use crate::util::{ErrorForStatus, RetryOutcome, RetryWithBackoff, parse_retry_after}; 8 - use chrono::{DateTime, TimeDelta, Utc}; 9 - use futures::FutureExt; 10 - use jacquard_api::com_atproto::repo::describe_repo::DescribeRepoOutput; 11 - use jacquard_api::com_atproto::sync::list_repos::ListReposOutput; 12 - use jacquard_common::{IntoStatic, types::string::Did}; 13 - use miette::{Context, IntoDiagnostic, Result}; 14 - use rand::Rng; 15 - use rand::RngExt; 16 - use rand::rngs::SmallRng; 17 - use reqwest::StatusCode; 2 + use futures::future::join_all; 3 + use jacquard_common::types::string::Did; 4 + use miette::Result; 18 5 use scc::HashSet; 19 - use serde::{Deserialize, Serialize}; 20 - use smol_str::{SmolStr, ToSmolStr, format_smolstr}; 21 - use std::collections::HashMap; 22 - use std::ops::{Add, Mul, Sub}; 6 + use smol_str::ToSmolStr; 23 7 use std::sync::Arc; 24 8 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; 25 9 use std::time::Duration; 26 - use tokio::sync::watch; 27 - use tracing::{Instrument, debug, error, info, trace, warn}; 10 + use tracing::info; 28 11 use url::Url; 29 12 30 - const MAX_RETRY_ATTEMPTS: u32 = 5; 31 - const MAX_RETRY_BATCH: usize = 1000; 13 + mod by_collection; 14 + mod relay; 15 + pub mod throttle; 16 + mod worker; 32 17 33 - #[derive(Debug, Serialize, Deserialize)] 34 - struct RetryState { 35 - after: DateTime<Utc>, 36 - duration: TimeDelta, 37 - attempts: u32, 38 - #[serde(serialize_with = "crate::util::ser_status_code")] 39 - #[serde(deserialize_with = "crate::util::deser_status_code")] 40 - status: Option<StatusCode>, 41 - } 18 + use throttle::Throttler; 42 19 43 - impl RetryState { 44 - fn new(secs: i64) -> Self { 45 - let duration = TimeDelta::seconds(secs); 46 - Self { 47 - duration, 48 - after: Utc::now().add(duration), 49 - attempts: 0, 50 - status: None, 51 - } 52 - } 20 + pub(crate) use by_collection::ByCollectionProducer; 21 + pub(crate) use relay::{RelayProducer, RetryProducer, SignalChecker}; 22 + pub(crate) use worker::CrawlerWorker; 53 23 54 - /// returns the next retry state with doubled duration and incremented attempt count, 55 - /// or `None` if the attempt count would reach the cap (entry left in db as-is). 56 - fn next_attempt(self) -> Option<Self> { 57 - let attempts = self.attempts + 1; 58 - if attempts >= MAX_RETRY_ATTEMPTS { 59 - return None; 60 - } 61 - let duration = self.duration * 2; 62 - Some(Self { 63 - after: Utc::now().add(duration), 64 - duration, 65 - attempts, 66 - status: None, 67 - }) 68 - } 24 + // -- InFlight ------------------------------------------------------------ 69 25 70 - fn with_status(mut self, code: StatusCode) -> Self { 71 - self.status = Some(code); 72 - self 73 - } 74 - } 26 + #[derive(Clone)] 27 + pub(crate) struct InFlight(Arc<HashSet<Did<'static>>>); 75 28 76 - trait ToRetryState { 77 - fn to_retry_state(&self) -> RetryState; 78 - } 79 - 80 - impl ToRetryState for ThrottleHandle { 81 - fn to_retry_state(&self) -> RetryState { 82 - let after = chrono::DateTime::from_timestamp_secs(self.throttled_until()).unwrap(); 83 - RetryState { 84 - duration: after.sub(Utc::now()), 85 - after, 86 - attempts: 0, 87 - status: None, 88 - } 29 + impl InFlight { 30 + pub(crate) fn new() -> Self { 31 + Self(Arc::new(HashSet::new())) 89 32 } 90 - } 91 33 92 - enum CrawlCheckResult { 93 - Signal, 94 - NoSignal, 95 - Retry(RetryState), 96 - } 97 - 98 - impl From<RetryState> for CrawlCheckResult { 99 - fn from(value: RetryState) -> Self { 100 - Self::Retry(value) 101 - } 102 - } 103 - 104 - fn is_throttle_worthy(e: &reqwest::Error) -> bool { 105 - use std::error::Error; 106 - 107 - if e.is_timeout() { 108 - return true; 109 - } 110 - 111 - let mut src = e.source(); 112 - while let Some(s) = src { 113 - if let Some(io_err) = s.downcast_ref::<std::io::Error>() { 114 - if is_tls_cert_error(io_err) { 115 - return true; 34 + pub(crate) async fn acquire(&self, dids: Vec<Did<'static>>) -> Vec<InFlightGuard> { 35 + let mut guards = Vec::with_capacity(dids.len()); 36 + for did in dids { 37 + if self.0.insert_async(did.clone()).await.is_err() { 38 + continue; 116 39 } 40 + guards.push(InFlightGuard { 41 + set: self.clone(), 42 + did, 43 + }); 117 44 } 118 - src = s.source(); 119 - } 120 - 121 - e.status().map_or(false, |s| { 122 - matches!( 123 - s, 124 - StatusCode::BAD_GATEWAY 125 - | StatusCode::SERVICE_UNAVAILABLE 126 - | StatusCode::GATEWAY_TIMEOUT 127 - | crate::util::CONNECTION_TIMEOUT 128 - | crate::util::SITE_FROZEN 129 - ) 130 - }) 131 - } 132 - 133 - fn is_tls_cert_error(io_err: &std::io::Error) -> bool { 134 - let Some(inner) = io_err.get_ref() else { 135 - return false; 136 - }; 137 - if let Some(rustls_err) = inner.downcast_ref::<rustls::Error>() { 138 - return matches!(rustls_err, rustls::Error::InvalidCertificate(_)); 139 - } 140 - if let Some(nested_io) = inner.downcast_ref::<std::io::Error>() { 141 - return is_tls_cert_error(nested_io); 45 + guards 142 46 } 143 - false 144 47 } 145 48 146 - #[derive(Debug, Serialize, Deserialize)] 147 - enum Cursor { 148 - Done(SmolStr), 149 - Next(Option<SmolStr>), 49 + pub(super) struct InFlightGuard { 50 + set: InFlight, 51 + pub(super) did: Did<'static>, 150 52 } 151 53 152 - pub mod throttle; 153 - use throttle::{OrFailure, Throttler}; 154 - 155 - type InFlight = Arc<HashSet<Did<'static>>>; 156 - 157 - struct InFlightGuard { 158 - set: InFlight, 159 - did: Did<'static>, 54 + impl std::ops::Deref for InFlightGuard { 55 + type Target = Did<'static>; 56 + fn deref(&self) -> &Did<'static> { 57 + &self.did 58 + } 160 59 } 161 60 162 61 impl Drop for InFlightGuard { 163 62 fn drop(&mut self) { 164 - self.set.remove_sync(&self.did); 63 + self.set.0.remove_sync(&self.did); 165 64 } 166 65 } 167 66 168 - #[must_use] 169 - struct InFlightRepos { 170 - repos: Vec<Did<'static>>, 171 - guards: Vec<InFlightGuard>, 172 - } 67 + #[derive(Clone)] 68 + pub(crate) struct CrawlerStats(Arc<StatsInner>); 173 69 174 - pub struct Crawler { 175 - state: Arc<AppState>, 176 - relays: Vec<Url>, 177 - http: reqwest::Client, 178 - max_pending: usize, 179 - resume_pending: usize, 70 + struct StatsInner { 71 + /// repos committed to the db this interval 180 72 count: AtomicUsize, 73 + /// repos seen from relay/by-collection this interval 181 74 crawled_count: AtomicUsize, 182 75 throttled: AtomicBool, 183 76 pds_throttler: Throttler, 184 - in_flight: InFlight, 185 - enabled: watch::Receiver<bool>, 77 + state: Arc<AppState>, 78 + relays: Vec<Url>, 186 79 } 187 80 188 - impl Crawler { 189 - pub fn new( 190 - state: Arc<AppState>, 191 - relay_hosts: Vec<Url>, 192 - max_pending: usize, 193 - resume_pending: usize, 194 - enabled: watch::Receiver<bool>, 195 - ) -> Self { 196 - let http = reqwest::Client::builder() 197 - .user_agent(concat!( 198 - env!("CARGO_PKG_NAME"), 199 - "/", 200 - env!("CARGO_PKG_VERSION") 201 - )) 202 - .gzip(true) 203 - .build() 204 - .expect("that reqwest will build"); 205 - 206 - Self { 207 - state, 208 - relays: relay_hosts, 209 - http, 210 - max_pending, 211 - resume_pending, 81 + impl CrawlerStats { 82 + pub(crate) fn new(state: Arc<AppState>, relays: Vec<Url>, pds_throttler: Throttler) -> Self { 83 + Self(Arc::new(StatsInner { 212 84 count: AtomicUsize::new(0), 213 85 crawled_count: AtomicUsize::new(0), 214 86 throttled: AtomicBool::new(false), 215 - pds_throttler: Throttler::new(), 216 - in_flight: Arc::new(HashSet::new()), 217 - enabled, 218 - } 87 + pds_throttler, 88 + state, 89 + relays, 90 + })) 219 91 } 220 92 221 - async fn get_cursor(&self, relay_host: &Url) -> Result<Cursor> { 222 - let key = crawler_cursor_key(relay_host); 223 - let cursor_bytes = Db::get(self.state.db.cursors.clone(), &key).await?; 224 - let cursor: Cursor = cursor_bytes 225 - .as_deref() 226 - .map(rmp_serde::from_slice) 227 - .transpose() 228 - .into_diagnostic() 229 - .wrap_err("can't parse cursor")? 230 - .unwrap_or(Cursor::Next(None)); 231 - Ok(cursor) 93 + pub(crate) fn record_processed(&self, n: usize) { 94 + self.0.count.fetch_add(n, Ordering::Relaxed); 232 95 } 233 96 234 - pub async fn run(self) -> Result<()> { 235 - let crawler = Arc::new(self); 236 - 237 - // stats ticker 238 - let ticker = tokio::spawn({ 239 - use std::time::Instant; 240 - let crawler = crawler.clone(); 241 - let mut last_time = Instant::now(); 242 - let mut interval = tokio::time::interval(Duration::from_secs(60)); 243 - async move { 244 - loop { 245 - interval.tick().await; 246 - let delta_processed = crawler.count.swap(0, Ordering::Relaxed); 247 - let delta_crawled = crawler.crawled_count.swap(0, Ordering::Relaxed); 248 - let is_throttled = crawler.throttled.load(Ordering::Relaxed); 249 - 250 - crawler.pds_throttler.evict_clean().await; 251 - 252 - if delta_processed == 0 && delta_crawled == 0 { 253 - if is_throttled { 254 - info!("throttled: pending queue full"); 255 - } else { 256 - info!("idle: no repos crawled or processed in 60s"); 257 - } 258 - continue; 259 - } 260 - 261 - let elapsed = last_time.elapsed().as_secs_f64(); 262 - 263 - // fetch all cursors 264 - use futures::future::join_all; 265 - let cursor_futures: Vec<_> = crawler 266 - .relays 267 - .iter() 268 - .map(|relay_host| { 269 - let domain = relay_host.host_str().unwrap_or("unknown"); 270 - let relay_host = relay_host.clone(); 271 - let crawler = crawler.clone(); 272 - async move { 273 - let cursor_str = match crawler.get_cursor(&relay_host).await { 274 - Ok(c) => match c { 275 - Cursor::Done(c) => format_smolstr!("done({c})"), 276 - Cursor::Next(None) => "none".to_smolstr(), 277 - Cursor::Next(Some(c)) => c.to_smolstr(), 278 - }, 279 - Err(e) => e.to_smolstr(), 280 - }; 281 - format_smolstr!("{domain}={cursor_str}") 282 - } 283 - }) 284 - .collect(); 285 - 286 - let cursors: Vec<_> = join_all(cursor_futures).await.into_iter().collect(); 287 - 288 - let cursors_display = if cursors.is_empty() { 289 - "none".to_smolstr() 290 - } else { 291 - cursors.join(", ").into() 292 - }; 293 - 294 - info!( 295 - cursors = %cursors_display, 296 - processed = delta_processed, 297 - crawled = delta_crawled, 298 - elapsed, 299 - "progress" 300 - ); 301 - last_time = Instant::now(); 302 - } 303 - } 304 - }); 305 - tokio::spawn(async move { 306 - let Err(e) = ticker.await; 307 - error!(err = ?e, "stats ticker panicked, aborting"); 308 - std::process::abort(); 309 - }); 310 - 311 - // retry thread 312 - std::thread::spawn({ 313 - let crawler = crawler.clone(); 314 - let handle = tokio::runtime::Handle::current(); 315 - move || { 316 - use std::thread::sleep; 317 - 318 - let _g = handle.enter(); 319 - 320 - let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { 321 - loop { 322 - match crawler.process_retry_queue() { 323 - Ok(Some(dur)) => sleep(dur.max(Duration::from_secs(1))), 324 - Ok(None) => sleep(Duration::from_secs(60)), 325 - Err(e) => { 326 - error!(err = %e, "retry loop failed"); 327 - sleep(Duration::from_secs(60)); 328 - } 329 - } 330 - } 331 - })); 332 - if result.is_err() { 333 - error!("retry thread panicked, aborting"); 334 - std::process::abort(); 335 - } 336 - } 337 - }); 338 - 339 - info!( 340 - relay_count = crawler.relays.len(), 341 - hosts = ?crawler.relays, 342 - "starting crawler" 343 - ); 344 - 345 - let mut tasks = tokio::task::JoinSet::new(); 346 - for url in crawler.relays.clone() { 347 - let crawler = crawler.clone(); 348 - let mut enabled = crawler.enabled.clone(); 349 - let span = tracing::info_span!("crawl", %url); 350 - tasks.spawn( 351 - async move { 352 - loop { 353 - if let Err(e) = Self::crawl(crawler.clone(), &url, &mut enabled).await { 354 - error!(err = ?e, "fatal error, restarting in 30s"); 355 - tokio::time::sleep(Duration::from_secs(30)).await; 356 - } 357 - } 358 - } 359 - .instrument(span), 360 - ); 361 - } 362 - let _ = tasks.join_all().await; 363 - 364 - Ok(()) 97 + pub(crate) fn record_crawled(&self, n: usize) { 98 + self.0.crawled_count.fetch_add(n, Ordering::Relaxed); 365 99 } 366 100 367 - fn base_url(url: &Url) -> Result<Url> { 368 - let mut url = url.clone(); 369 - match url.scheme() { 370 - "wss" => url 371 - .set_scheme("https") 372 - .map_err(|_| miette::miette!("invalid url: {url}"))?, 373 - "ws" => url 374 - .set_scheme("http") 375 - .map_err(|_| miette::miette!("invalid url: {url}"))?, 376 - _ => {} 377 - } 378 - Ok(url) 101 + pub(crate) fn set_throttled(&self, v: bool) { 102 + self.0.throttled.store(v, Ordering::Relaxed); 379 103 } 380 104 381 - async fn crawl( 382 - crawler: Arc<Self>, 383 - relay_host: &Url, 384 - enabled: &mut watch::Receiver<bool>, 385 - ) -> Result<()> { 386 - let base_url = Self::base_url(relay_host)?; 387 - 388 - let mut rng: SmallRng = rand::make_rng(); 389 - let db = &crawler.state.db; 390 - 391 - let mut cursor = crawler.get_cursor(relay_host).await?; 392 - 393 - match &cursor { 394 - Cursor::Next(Some(c)) => info!(cursor = %c, "resuming"), 395 - Cursor::Next(None) => info!("starting from scratch"), 396 - Cursor::Done(c) => info!(cursor = %c, "was done, resuming"), 397 - } 398 - 399 - let mut was_throttled = false; 105 + pub(crate) async fn task(self) { 106 + use std::time::Instant; 107 + let mut last_time = Instant::now(); 108 + let mut interval = tokio::time::interval(Duration::from_secs(60)); 400 109 loop { 401 - enabled.wait_enabled("crawler").await; 110 + interval.tick().await; 111 + let delta_processed = self.0.count.swap(0, Ordering::Relaxed); 112 + let delta_crawled = self.0.crawled_count.swap(0, Ordering::Relaxed); 113 + let is_throttled = self.0.throttled.load(Ordering::Relaxed); 402 114 403 - // throttle check 404 - loop { 405 - let pending = crawler.state.db.get_count("pending").await; 406 - if pending > crawler.max_pending as u64 { 407 - if !was_throttled { 408 - debug!( 409 - pending, 410 - max = crawler.max_pending, 411 - "throttling: above max pending" 412 - ); 413 - was_throttled = true; 414 - crawler.throttled.store(true, Ordering::Relaxed); 415 - } 416 - tokio::select! { 417 - _ = tokio::time::sleep(Duration::from_secs(5)) => {} 418 - _ = enabled.changed() => { 419 - if !*enabled.borrow() { return Ok(()); } 420 - } 421 - } 422 - } else if pending > crawler.resume_pending as u64 { 423 - if !was_throttled { 424 - debug!( 425 - pending, 426 - resume = crawler.resume_pending, 427 - "throttling: entering cooldown" 428 - ); 429 - was_throttled = true; 430 - crawler.throttled.store(true, Ordering::Relaxed); 431 - } 115 + self.0.pds_throttler.evict_clean().await; 432 116 433 - loop { 434 - let current_pending = crawler.state.db.get_count("pending").await; 435 - if current_pending <= crawler.resume_pending as u64 { 436 - break; 437 - } 438 - debug!( 439 - pending = current_pending, 440 - resume = crawler.resume_pending, 441 - "cooldown, waiting" 442 - ); 443 - tokio::select! { 444 - _ = tokio::time::sleep(Duration::from_secs(5)) => {} 445 - _ = enabled.changed() => { 446 - if !*enabled.borrow() { return Ok(()); } 447 - } 448 - } 449 - } 450 - break; 117 + if delta_processed == 0 && delta_crawled == 0 { 118 + if is_throttled { 119 + info!("throttled: pending queue full"); 451 120 } else { 452 - if was_throttled { 453 - info!("throttling released"); 454 - was_throttled = false; 455 - crawler.throttled.store(false, Ordering::Relaxed); 456 - } 457 - break; 121 + info!("idle: no repos crawled or processed in 60s"); 458 122 } 123 + continue; 459 124 } 460 125 461 - let mut list_repos_url = base_url 462 - .join("/xrpc/com.atproto.sync.listRepos") 463 - .into_diagnostic()?; 464 - list_repos_url 465 - .query_pairs_mut() 466 - .append_pair("limit", "1000"); 467 - if let Cursor::Next(Some(c)) | Cursor::Done(c) = &cursor { 468 - list_repos_url 469 - .query_pairs_mut() 470 - .append_pair("cursor", c.as_str()); 471 - } 126 + let elapsed = last_time.elapsed().as_secs_f64(); 472 127 473 - let fetch_result = (|| { 474 - crawler 475 - .http 476 - .get(list_repos_url.clone()) 477 - .send() 478 - .error_for_status() 479 - }) 480 - .retry(5, |e: &reqwest::Error, attempt| { 481 - matches!(e.status(), Some(StatusCode::TOO_MANY_REQUESTS)) 482 - .then(|| Duration::from_secs(1 << attempt.min(5))) 483 - }) 484 - .await; 485 - 486 - let res = match fetch_result { 487 - Ok(r) => r, 488 - Err(RetryOutcome::Ratelimited) => { 489 - warn!("rate limited by relay after retries"); 490 - continue; 491 - } 492 - Err(RetryOutcome::Failed(e)) => { 493 - error!(err = %e, "crawler failed to fetch listRepos"); 494 - continue; 495 - } 496 - }; 497 - 498 - let bytes = match res.bytes().await { 499 - Ok(b) => b, 500 - Err(e) => { 501 - error!(err = %e, "cant read listRepos response"); 502 - continue; 503 - } 504 - }; 505 - 506 - let mut batch = db.inner.batch(); 507 - let filter = crawler.state.filter.load(); 508 - 509 - struct ParseResult { 510 - unknown_dids: Vec<Did<'static>>, 511 - cursor: Option<smol_str::SmolStr>, 512 - count: usize, 513 - } 514 - 515 - const BLOCKING_TASK_TIMEOUT: Duration = Duration::from_secs(30); 516 - 517 - let parse_result = { 518 - let repos = db.repos.clone(); 519 - let filter_ks = db.filter.clone(); 520 - let crawler_ks = db.crawler.clone(); 521 - 522 - // this wont actually cancel the task since spawn_blocking isnt cancel safe 523 - // but at least we'll see whats going on? 524 - tokio::time::timeout( 525 - BLOCKING_TASK_TIMEOUT, 526 - tokio::task::spawn_blocking(move || -> miette::Result<Option<ParseResult>> { 527 - let output = match serde_json::from_slice::<ListReposOutput>(&bytes) { 528 - Ok(out) => out.into_static(), 529 - Err(e) => { 530 - error!(err = %e, "failed to parse listRepos response"); 531 - return Ok(None); 532 - } 533 - }; 534 - 535 - if output.repos.is_empty() { 536 - return Ok(None); 537 - } 538 - 539 - let count = output.repos.len(); 540 - let next_cursor = output.cursor.map(|c| c.as_str().into()); 541 - let mut unknown = Vec::new(); 542 - for repo in output.repos { 543 - let excl_key = crate::db::filter::exclude_key(repo.did.as_str())?; 544 - if filter_ks.contains_key(&excl_key).into_diagnostic()? { 545 - continue; 546 - } 547 - 548 - // already in retry queue — let the retry thread handle it 549 - let retry_key = keys::crawler_retry_key(&repo.did); 550 - if crawler_ks.contains_key(&retry_key).into_diagnostic()? { 551 - continue; 552 - } 553 - 554 - let did_key = keys::repo_key(&repo.did); 555 - if !repos.contains_key(&did_key).into_diagnostic()? { 556 - unknown.push(repo.did.into_static()); 557 - } 558 - } 559 - 560 - Ok(Some(ParseResult { 561 - unknown_dids: unknown, 562 - cursor: next_cursor, 563 - count, 564 - })) 565 - }), 566 - ) 567 - .await 568 - } 569 - .into_diagnostic()? 570 - .map_err(|_| { 571 - error!( 572 - "spawn_blocking task for parsing listRepos timed out after {}", 573 - BLOCKING_TASK_TIMEOUT.as_secs() 574 - ); 575 - miette::miette!("spawn_blocking task for parsing listRepos timed out") 576 - })?; 577 - 578 - let ParseResult { 579 - unknown_dids, 580 - cursor: next_cursor, 581 - count, 582 - } = match parse_result { 583 - Ok(Some(res)) => res, 584 - Ok(None) => { 585 - info!("finished enumeration (or empty page)"); 586 - if let Cursor::Next(Some(c)) = cursor { 587 - info!("reached end of list."); 588 - cursor = Cursor::Done(c); 128 + let cursor_futures: Vec<_> = self 129 + .0 130 + .relays 131 + .iter() 132 + .map(|relay_host| { 133 + let domain = relay_host.host_str().unwrap_or("unknown").to_owned(); 134 + let relay_host = relay_host.clone(); 135 + let state = self.0.state.clone(); 136 + async move { 137 + let cursor = relay::cursor_display(&state, &relay_host).await; 138 + format!("{domain}={cursor}").into() 589 139 } 590 - info!("sleeping 1h before next enumeration pass"); 591 - tokio::select! { 592 - _ = tokio::time::sleep(Duration::from_secs(3600)) => { 593 - info!("resuming after 1h sleep"); 594 - } 595 - _ = enabled.changed() => { 596 - if !*enabled.borrow() { return Ok(()); } 597 - } 598 - } 599 - continue; 600 - } 601 - Err(e) => return Err(e).wrap_err("error while crawling"), 602 - }; 603 - 604 - debug!(count, "fetched repos"); 605 - crawler.crawled_count.fetch_add(count, Ordering::Relaxed); 606 - 607 - let in_flight = if filter.check_signals() && !unknown_dids.is_empty() { 608 - // we dont need to pass any existing since we have none; we are crawling after all 609 - crawler 610 - .check_signals_batch(&unknown_dids, &filter, &mut batch, &HashMap::new()) 611 - .await? 612 - } else { 613 - // no signal checking but still need dedup to avoid orphan pending entries 614 - crawler.acquire_in_flight(unknown_dids).await 615 - }; 616 - 617 - for did in &in_flight.repos { 618 - let did_key = keys::repo_key(did); 619 - trace!(did = %did, "found new repo"); 620 - 621 - let state = RepoState::untracked(rng.next_u64()); 622 - batch.insert(&db.repos, &did_key, ser_repo_state(&state)?); 623 - batch.insert(&db.pending, keys::pending_key(state.index_id), &did_key); 624 - } 625 - 626 - if let Some(new_cursor) = next_cursor { 627 - cursor = Cursor::Next(Some(new_cursor.as_str().into())); 628 - } else if let Cursor::Next(Some(c)) = cursor { 629 - info!("reached end of list."); 630 - cursor = Cursor::Done(c); 631 - } 632 - batch.insert( 633 - &db.cursors, 634 - crawler_cursor_key(relay_host), 635 - rmp_serde::to_vec(&cursor) 636 - .into_diagnostic() 637 - .wrap_err("cant serialize cursor")?, 638 - ); 639 - 640 - tokio::time::timeout( 641 - BLOCKING_TASK_TIMEOUT, 642 - tokio::task::spawn_blocking(move || batch.commit().into_diagnostic()), 643 - ) 644 - .await 645 - .into_diagnostic()? 646 - .map_err(|_| { 647 - error!( 648 - "spawn_blocking task for batch commit timed out after {}", 649 - BLOCKING_TASK_TIMEOUT.as_secs() 650 - ); 651 - miette::miette!("spawn_blocking task for batch commit timed out") 652 - })? 653 - .inspect_err(|e| { 654 - error!(err = ?e, "batch commit failed"); 655 - }) 656 - .ok(); 657 - 658 - drop(in_flight.guards); 659 - 660 - crawler.account_new_repos(in_flight.repos.len()).await; 661 - 662 - if matches!(cursor, Cursor::Done(_)) { 663 - info!("enumeration complete, sleeping 1h before next pass"); 664 - tokio::select! { 665 - _ = tokio::time::sleep(Duration::from_secs(3600)) => { 666 - info!("resuming after 1h sleep"); 667 - } 668 - _ = enabled.changed() => { 669 - if !*enabled.borrow() { return Ok(()); } 670 - } 671 - } 672 - } 673 - } 674 - } 675 - 676 - fn process_retry_queue(&self) -> Result<Option<Duration>> { 677 - let db = &self.state.db; 678 - let now = Utc::now(); 679 - 680 - let mut ready: Vec<Did> = Vec::new(); 681 - let mut existing: HashMap<Did<'static>, RetryState> = HashMap::new(); 682 - let mut next_wake: Option<Duration> = None; 683 - let mut had_more = false; 684 - 685 - let mut rng: SmallRng = rand::make_rng(); 686 - 687 - let mut batch = db.inner.batch(); 688 - for guard in db.crawler.prefix(keys::CRAWLER_RETRY_PREFIX) { 689 - let (key, val) = guard.into_inner().into_diagnostic()?; 690 - let state: RetryState = rmp_serde::from_slice(&val).into_diagnostic()?; 691 - let did = keys::crawler_retry_parse_key(&key)?.to_did(); 692 - 693 - // leave capped entries alone for API inspection 694 - if state.attempts >= MAX_RETRY_ATTEMPTS { 695 - continue; 696 - } 697 - 698 - let backoff = TimeDelta::seconds( 699 - state 700 - .duration 701 - .as_seconds_f64() 702 - .mul(rng.random_range(0.01..0.07)) as i64, 703 - ); 704 - if state.after + backoff > now { 705 - let wake = (state.after - now).to_std().unwrap_or(Duration::ZERO); 706 - next_wake = Some(next_wake.map(|w| w.min(wake)).unwrap_or(wake)); 707 - continue; 708 - } 709 - 710 - if ready.len() >= MAX_RETRY_BATCH { 711 - had_more = true; 712 - break; 713 - } 714 - 715 - ready.push(did.clone()); 716 - existing.insert(did, state); 717 - } 718 - 719 - if ready.is_empty() { 720 - return Ok(next_wake); 721 - } 722 - 723 - debug!(count = ready.len(), "retrying pending repos"); 724 - 725 - let handle = tokio::runtime::Handle::current(); 726 - let filter = self.state.filter.load(); 727 - let in_flight = 728 - handle.block_on(self.check_signals_batch(&ready, &filter, &mut batch, &existing))?; 729 - 730 - let mut rng: SmallRng = rand::make_rng(); 731 - for did in &in_flight.repos { 732 - let did_key = keys::repo_key(did); 733 - 734 - if db.repos.contains_key(&did_key).into_diagnostic()? { 735 - continue; 736 - } 737 - 738 - let state = RepoState::untracked(rng.next_u64()); 739 - batch.insert(&db.repos, &did_key, ser_repo_state(&state)?); 740 - batch.insert(&db.pending, keys::pending_key(state.index_id), &did_key); 741 - } 742 - 743 - batch.commit().into_diagnostic()?; 744 - 745 - drop(in_flight.guards); 746 - 747 - if !in_flight.repos.is_empty() { 748 - info!(count = in_flight.repos.len(), "recovered from retry queue"); 749 - handle.block_on(self.account_new_repos(in_flight.repos.len())); 750 - } 751 - 752 - // if we hit the batch cap there are more ready entries, loop back immediately 753 - Ok(had_more.then_some(Duration::ZERO).or(next_wake)) 754 - } 755 - 756 - fn check_repo_signals( 757 - &self, 758 - filter: Arc<crate::filter::FilterConfig>, 759 - did: Did<'static>, 760 - ) -> impl Future<Output = (Did<'static>, CrawlCheckResult)> + Send + 'static { 761 - let resolver = self.state.resolver.clone(); 762 - let http = self.http.clone(); 763 - let throttler = self.pds_throttler.clone(); 764 - async move { 765 - const MAX_RETRIES: u32 = 5; 766 - 767 - let pds_url = (|| resolver.resolve_identity_info(&did)) 768 - .retry(MAX_RETRIES, |e, attempt| { 769 - matches!(e, crate::resolver::ResolverError::Ratelimited) 770 - .then(|| Duration::from_secs(1 << attempt.min(5))) 771 140 }) 772 - .await; 773 - 774 - let pds_url = match pds_url { 775 - Ok((url, _)) => url, 776 - Err(RetryOutcome::Ratelimited) => { 777 - error!( 778 - retries = MAX_RETRIES, 779 - "rate limited resolving identity, giving up" 780 - ); 781 - // no pds handle to read retry_after from; use a short default 782 - return (did, RetryState::new(60).into()); 783 - } 784 - Err(RetryOutcome::Failed(e)) => { 785 - error!(err = %e, "failed to resolve identity"); 786 - return (did, RetryState::new(60).into()); 787 - } 788 - }; 141 + .collect(); 789 142 790 - let throttle = throttler.get_handle(&pds_url).await; 791 - if throttle.is_throttled() { 792 - trace!(host = pds_url.host_str(), "skipping throttled pds"); 793 - return (did, throttle.to_retry_state().into()); 794 - } 795 - 796 - let _permit = throttle.acquire().unit_error().or_failure(&throttle, || ()); 797 - let Ok(_permit) = _permit.await else { 798 - trace!( 799 - host = pds_url.host_str(), 800 - "pds failed while waiting for permit" 801 - ); 802 - return (did, throttle.to_retry_state().into()); 143 + let cursors: Vec<smol_str::SmolStr> = 144 + join_all(cursor_futures).await.into_iter().collect(); 145 + let cursors_display = if cursors.is_empty() { 146 + "none".to_smolstr() 147 + } else { 148 + cursors.join(", ").into() 803 149 }; 804 150 805 - enum RequestError { 806 - Reqwest(reqwest::Error), 807 - RateLimited(Option<u64>), 808 - /// hard failure notification from another task on this PDS 809 - Throttled, 810 - } 811 - 812 - let mut describe_url = pds_url.join("/xrpc/com.atproto.repo.describeRepo").unwrap(); 813 - describe_url.query_pairs_mut().append_pair("repo", &did); 814 - 815 - let resp = async { 816 - let resp = http 817 - .get(describe_url) 818 - .timeout(throttle.timeout()) 819 - .send() 820 - .await 821 - .map_err(RequestError::Reqwest)?; 822 - 823 - // dont retry ratelimits since we will just put it in a queue to be tried again later 824 - if resp.status() == StatusCode::TOO_MANY_REQUESTS { 825 - return Err(RequestError::RateLimited(parse_retry_after(&resp))); 826 - } 827 - 828 - resp.error_for_status().map_err(RequestError::Reqwest) 829 - } 830 - .or_failure(&throttle, || RequestError::Throttled) 831 - .await; 832 - 833 - let resp = match resp { 834 - Ok(r) => { 835 - throttle.record_success(); 836 - r 837 - } 838 - Err(RequestError::RateLimited(secs)) => { 839 - throttle.record_ratelimit(secs); 840 - return ( 841 - did, 842 - throttle 843 - .to_retry_state() 844 - .with_status(StatusCode::TOO_MANY_REQUESTS) 845 - .into(), 846 - ); 847 - } 848 - Err(RequestError::Throttled) => { 849 - return (did, throttle.to_retry_state().into()); 850 - } 851 - Err(RequestError::Reqwest(e)) => { 852 - if e.is_timeout() && !throttle.record_timeout() { 853 - // first or second timeout, just requeue 854 - let mut retry_state = RetryState::new(60); 855 - retry_state.status = e.status(); 856 - return (did, retry_state.into()); 857 - } 858 - // third timeout, if timeout fail is_throttle_worthy will ban the pds 859 - 860 - if is_throttle_worthy(&e) { 861 - if let Some(mins) = throttle.record_failure() { 862 - warn!(url = %pds_url, mins, "throttling pds due to hard failure"); 863 - } 864 - let mut retry_state = throttle.to_retry_state(); 865 - retry_state.status = e.status(); 866 - return (did, retry_state.into()); 867 - } 868 - 869 - match e.status() { 870 - Some(StatusCode::NOT_FOUND | StatusCode::GONE) => { 871 - trace!("repo not found"); 872 - return (did, CrawlCheckResult::NoSignal); 873 - } 874 - Some(s) if s.is_client_error() => { 875 - error!(status = %s, "repo unavailable"); 876 - return (did, CrawlCheckResult::NoSignal); 877 - } 878 - _ => { 879 - error!(err = %e, "repo errored"); 880 - let mut retry_state = RetryState::new(60 * 15); 881 - retry_state.status = e.status(); 882 - return (did, retry_state.into()); 883 - } 884 - } 885 - } 886 - }; 887 - 888 - let bytes = match resp.bytes().await { 889 - Ok(b) => b, 890 - Err(e) => { 891 - error!(err = %e, "failed to read describeRepo response"); 892 - return (did, RetryState::new(60 * 5).into()); 893 - } 894 - }; 895 - 896 - let out = match serde_json::from_slice::<DescribeRepoOutput>(&bytes) { 897 - Ok(out) => out, 898 - Err(e) => { 899 - error!(err = %e, "failed to parse describeRepo response"); 900 - return (did, RetryState::new(60 * 10).into()); 901 - } 902 - }; 903 - 904 - let found_signal = out 905 - .collections 906 - .iter() 907 - .any(|col| filter.matches_signal(col.as_str())); 908 - 909 - if !found_signal { 910 - trace!("no signal-matching collections found"); 911 - } 912 - 913 - return ( 914 - did, 915 - found_signal 916 - .then_some(CrawlCheckResult::Signal) 917 - .unwrap_or(CrawlCheckResult::NoSignal), 151 + info!( 152 + cursors = %cursors_display, 153 + processed = delta_processed, 154 + crawled = delta_crawled, 155 + elapsed, 156 + "progress" 918 157 ); 919 - } 920 - } 921 - 922 - async fn check_signals_batch( 923 - &self, 924 - repos: &[Did<'static>], 925 - filter: &Arc<crate::filter::FilterConfig>, 926 - batch: &mut fjall::OwnedWriteBatch, 927 - existing: &HashMap<Did<'static>, RetryState>, 928 - ) -> Result<InFlightRepos> { 929 - let db = &self.state.db; 930 - let in_flight = self.acquire_in_flight(repos.to_vec()).await; 931 - let mut valid = Vec::with_capacity(in_flight.repos.len()); 932 - let mut set = tokio::task::JoinSet::new(); 933 - 934 - for did in in_flight.repos { 935 - let filter = filter.clone(); 936 - let span = tracing::info_span!("signals", did = %did); 937 - set.spawn( 938 - self.check_repo_signals(filter, did.clone()) 939 - .instrument(span), 940 - ); 941 - } 942 - 943 - while let Some(res) = tokio::time::timeout(Duration::from_secs(60), set.join_next()) 944 - .await 945 - .into_diagnostic() 946 - .map_err(|_| { 947 - error!("signal check task timed out after 60s"); 948 - miette::miette!("signal check task timed out") 949 - })? 950 - { 951 - let (did, result) = match res { 952 - Ok(inner) => inner, 953 - Err(e) => { 954 - error!(err = ?e, "signal check panicked"); 955 - continue; 956 - } 957 - }; 958 - 959 - match result { 960 - CrawlCheckResult::Signal => { 961 - batch.remove(&db.crawler, keys::crawler_retry_key(&did)); 962 - valid.push(did); 963 - } 964 - CrawlCheckResult::NoSignal => { 965 - batch.remove(&db.crawler, keys::crawler_retry_key(&did)); 966 - } 967 - CrawlCheckResult::Retry(state) => { 968 - let prev_attempts = existing.get(&did).map(|s| s.attempts).unwrap_or(0); 969 - let carried = RetryState { 970 - attempts: prev_attempts, 971 - ..state 972 - }; 973 - let next = match carried.next_attempt() { 974 - Some(next) => next, 975 - None => RetryState { 976 - attempts: MAX_RETRY_ATTEMPTS, 977 - ..state 978 - }, 979 - }; 980 - batch.insert( 981 - &db.crawler, 982 - keys::crawler_retry_key(&did), 983 - rmp_serde::to_vec(&next).into_diagnostic()?, 984 - ); 985 - } 986 - } 158 + last_time = Instant::now(); 987 159 } 988 - 989 - Ok(InFlightRepos { 990 - repos: valid, 991 - guards: in_flight.guards, 992 - }) 993 160 } 161 + } 994 162 995 - async fn acquire_in_flight(&self, dids: Vec<Did<'static>>) -> InFlightRepos { 996 - let mut filtered = Vec::with_capacity(dids.len()); 997 - let mut guards = Vec::with_capacity(dids.len()); 998 - for did in dids { 999 - if self.in_flight.insert_async(did.clone()).await.is_err() { 1000 - trace!(did = %did, "repo in-flight, skipping"); 1001 - continue; 1002 - } 1003 - guards.push(InFlightGuard { 1004 - set: self.in_flight.clone(), 1005 - did: did.clone(), 1006 - }); 1007 - filtered.push(did); 1008 - } 1009 - InFlightRepos { 1010 - guards, 1011 - repos: filtered, 1012 - } 163 + /// normalizes a relay URL to http/https for XRPC requests. 164 + pub(super) fn base_url(url: &Url) -> Result<Url> { 165 + let mut url = url.clone(); 166 + match url.scheme() { 167 + "wss" => url 168 + .set_scheme("https") 169 + .map_err(|_| miette::miette!("invalid url: {url}"))?, 170 + "ws" => url 171 + .set_scheme("http") 172 + .map_err(|_| miette::miette!("invalid url: {url}"))?, 173 + _ => {} 1013 174 } 1014 - 1015 - async fn account_new_repos(&self, count: usize) { 1016 - if count == 0 { 1017 - return; 1018 - } 1019 - 1020 - self.count.fetch_add(count, Ordering::Relaxed); 1021 - self.state 1022 - .db 1023 - .update_count_async("repos", count as i64) 1024 - .await; 1025 - self.state 1026 - .db 1027 - .update_count_async("pending", count as i64) 1028 - .await; 1029 - self.state.notify_backfill(); 1030 - } 175 + Ok(url) 1031 176 }
+755
src/crawler/relay.rs
··· 1 + use crate::crawler::throttle::{OrFailure, ThrottleHandle, Throttler}; 2 + use crate::db::keys::crawler_cursor_key; 3 + use crate::db::{Db, keys}; 4 + use crate::state::AppState; 5 + use crate::util::{ 6 + ErrorForStatus, RetryOutcome, RetryWithBackoff, WatchEnabledExt, parse_retry_after, 7 + }; 8 + use chrono::{DateTime, TimeDelta, Utc}; 9 + use fjall::OwnedWriteBatch; 10 + use futures::FutureExt; 11 + use jacquard_api::com_atproto::repo::describe_repo::DescribeRepoOutput; 12 + use jacquard_api::com_atproto::sync::list_repos::ListReposOutput; 13 + use jacquard_common::{IntoStatic, types::string::Did}; 14 + use miette::{Context, IntoDiagnostic, Result}; 15 + use rand::RngExt; 16 + use rand::rngs::SmallRng; 17 + use reqwest::StatusCode; 18 + use serde::{Deserialize, Serialize}; 19 + use smol_str::{SmolStr, ToSmolStr}; 20 + use std::collections::HashMap; 21 + use std::ops::{Add, Mul, Sub}; 22 + use std::sync::Arc; 23 + use std::time::Duration; 24 + use tokio::sync::{mpsc, watch}; 25 + use tracing::{Instrument, debug, error, info, trace, warn}; 26 + use url::Url; 27 + 28 + use super::worker::{CrawlerBatch, CursorUpdate}; 29 + use super::{CrawlerStats, InFlight, InFlightGuard, base_url}; 30 + 31 + pub(super) const MAX_RETRY_ATTEMPTS: u32 = 5; 32 + const MAX_RETRY_BATCH: usize = 1000; 33 + const BLOCKING_TASK_TIMEOUT: Duration = Duration::from_secs(30); 34 + 35 + #[derive(Debug, Serialize, Deserialize)] 36 + pub(super) struct RetryState { 37 + pub(super) after: DateTime<Utc>, 38 + pub(super) duration: TimeDelta, 39 + pub(super) attempts: u32, 40 + #[serde(serialize_with = "crate::util::ser_status_code")] 41 + #[serde(deserialize_with = "crate::util::deser_status_code")] 42 + pub(super) status: Option<StatusCode>, 43 + } 44 + 45 + impl RetryState { 46 + pub(super) fn new(secs: i64) -> Self { 47 + let duration = TimeDelta::seconds(secs); 48 + Self { 49 + duration, 50 + after: Utc::now().add(duration), 51 + attempts: 0, 52 + status: None, 53 + } 54 + } 55 + 56 + /// returns the next retry state or `None` if the attempt count would reach the cap. 57 + pub(super) fn next_attempt(self) -> Option<Self> { 58 + let attempts = self.attempts + 1; 59 + if attempts >= MAX_RETRY_ATTEMPTS { 60 + return None; 61 + } 62 + let duration = self.duration * 2; 63 + Some(Self { 64 + after: Utc::now().add(duration), 65 + duration, 66 + attempts, 67 + status: None, 68 + }) 69 + } 70 + 71 + pub(super) fn with_status(mut self, code: StatusCode) -> Self { 72 + self.status = Some(code); 73 + self 74 + } 75 + } 76 + 77 + pub(super) enum CrawlCheckResult { 78 + Signal, 79 + NoSignal, 80 + Retry(RetryState), 81 + } 82 + 83 + impl From<RetryState> for CrawlCheckResult { 84 + fn from(value: RetryState) -> Self { 85 + Self::Retry(value) 86 + } 87 + } 88 + 89 + trait ToRetryState { 90 + fn to_retry_state(&self) -> RetryState; 91 + } 92 + 93 + impl ToRetryState for ThrottleHandle { 94 + fn to_retry_state(&self) -> RetryState { 95 + let after = chrono::DateTime::from_timestamp_secs(self.throttled_until()).unwrap(); 96 + RetryState { 97 + duration: after.sub(Utc::now()), 98 + after, 99 + attempts: 0, 100 + status: None, 101 + } 102 + } 103 + } 104 + 105 + fn is_throttle_worthy(e: &reqwest::Error) -> bool { 106 + use std::error::Error; 107 + 108 + if e.is_timeout() { 109 + return true; 110 + } 111 + 112 + let mut src = e.source(); 113 + while let Some(s) = src { 114 + if let Some(io_err) = s.downcast_ref::<std::io::Error>() { 115 + if is_tls_cert_error(io_err) { 116 + return true; 117 + } 118 + } 119 + src = s.source(); 120 + } 121 + 122 + e.status().map_or(false, |s| { 123 + matches!( 124 + s, 125 + StatusCode::BAD_GATEWAY 126 + | StatusCode::SERVICE_UNAVAILABLE 127 + | StatusCode::GATEWAY_TIMEOUT 128 + | crate::util::CONNECTION_TIMEOUT 129 + | crate::util::SITE_FROZEN 130 + ) 131 + }) 132 + } 133 + 134 + fn is_tls_cert_error(io_err: &std::io::Error) -> bool { 135 + let Some(inner) = io_err.get_ref() else { 136 + return false; 137 + }; 138 + if let Some(rustls_err) = inner.downcast_ref::<rustls::Error>() { 139 + return matches!(rustls_err, rustls::Error::InvalidCertificate(_)); 140 + } 141 + if let Some(nested_io) = inner.downcast_ref::<std::io::Error>() { 142 + return is_tls_cert_error(nested_io); 143 + } 144 + false 145 + } 146 + 147 + // -- SignalChecker -------------------------------------------------------- 148 + 149 + /// shared describeRepo signal-checking logic used by both relay and retry producers. 150 + #[derive(Clone)] 151 + pub(crate) struct SignalChecker { 152 + pub(crate) http: reqwest::Client, 153 + pub(crate) state: Arc<AppState>, 154 + pub(crate) throttler: Throttler, 155 + } 156 + 157 + impl SignalChecker { 158 + fn check_repo_signals( 159 + &self, 160 + filter: Arc<crate::filter::FilterConfig>, 161 + did: Did<'static>, 162 + ) -> impl Future<Output = (Did<'static>, CrawlCheckResult)> + Send + 'static { 163 + let resolver = self.state.resolver.clone(); 164 + let http = self.http.clone(); 165 + let throttler = self.throttler.clone(); 166 + async move { 167 + const MAX_RETRIES: u32 = 5; 168 + 169 + let pds_url = (|| resolver.resolve_identity_info(&did)) 170 + .retry(MAX_RETRIES, |e, attempt| { 171 + matches!(e, crate::resolver::ResolverError::Ratelimited) 172 + .then(|| Duration::from_secs(1 << attempt.min(5))) 173 + }) 174 + .await; 175 + 176 + let pds_url = match pds_url { 177 + Ok((url, _)) => url, 178 + Err(RetryOutcome::Ratelimited) => { 179 + error!( 180 + retries = MAX_RETRIES, 181 + "rate limited resolving identity, giving up" 182 + ); 183 + return (did, RetryState::new(60).into()); 184 + } 185 + Err(RetryOutcome::Failed(e)) => { 186 + error!(err = %e, "failed to resolve identity"); 187 + return (did, RetryState::new(60).into()); 188 + } 189 + }; 190 + 191 + let throttle = throttler.get_handle(&pds_url).await; 192 + if throttle.is_throttled() { 193 + trace!(host = pds_url.host_str(), "skipping throttled pds"); 194 + return (did, throttle.to_retry_state().into()); 195 + } 196 + 197 + let _permit = throttle.acquire().unit_error().or_failure(&throttle, || ()); 198 + let Ok(_permit) = _permit.await else { 199 + trace!( 200 + host = pds_url.host_str(), 201 + "pds failed while waiting for permit" 202 + ); 203 + return (did, throttle.to_retry_state().into()); 204 + }; 205 + 206 + enum RequestError { 207 + Reqwest(reqwest::Error), 208 + RateLimited(Option<u64>), 209 + Throttled, 210 + } 211 + 212 + let mut describe_url = pds_url.join("/xrpc/com.atproto.repo.describeRepo").unwrap(); 213 + describe_url.query_pairs_mut().append_pair("repo", &did); 214 + 215 + let resp = async { 216 + let resp = http 217 + .get(describe_url) 218 + .timeout(throttle.timeout()) 219 + .send() 220 + .await 221 + .map_err(RequestError::Reqwest)?; 222 + if resp.status() == StatusCode::TOO_MANY_REQUESTS { 223 + return Err(RequestError::RateLimited(parse_retry_after(&resp))); 224 + } 225 + resp.error_for_status().map_err(RequestError::Reqwest) 226 + } 227 + .or_failure(&throttle, || RequestError::Throttled) 228 + .await; 229 + 230 + let resp = match resp { 231 + Ok(r) => { 232 + throttle.record_success(); 233 + r 234 + } 235 + Err(RequestError::RateLimited(secs)) => { 236 + throttle.record_ratelimit(secs); 237 + return ( 238 + did, 239 + throttle 240 + .to_retry_state() 241 + .with_status(StatusCode::TOO_MANY_REQUESTS) 242 + .into(), 243 + ); 244 + } 245 + Err(RequestError::Throttled) => { 246 + return (did, throttle.to_retry_state().into()); 247 + } 248 + Err(RequestError::Reqwest(e)) => { 249 + if e.is_timeout() && !throttle.record_timeout() { 250 + let mut retry_state = RetryState::new(60); 251 + retry_state.status = e.status(); 252 + return (did, retry_state.into()); 253 + } 254 + if is_throttle_worthy(&e) { 255 + if let Some(mins) = throttle.record_failure() { 256 + warn!(url = %pds_url, mins, "throttling pds due to hard failure"); 257 + } 258 + let mut retry_state = throttle.to_retry_state(); 259 + retry_state.status = e.status(); 260 + return (did, retry_state.into()); 261 + } 262 + match e.status() { 263 + Some(StatusCode::NOT_FOUND | StatusCode::GONE) => { 264 + trace!("repo not found"); 265 + return (did, CrawlCheckResult::NoSignal); 266 + } 267 + Some(s) if s.is_client_error() => { 268 + error!(status = %s, "repo unavailable"); 269 + return (did, CrawlCheckResult::NoSignal); 270 + } 271 + _ => { 272 + error!(err = %e, "repo errored"); 273 + let mut retry_state = RetryState::new(60 * 15); 274 + retry_state.status = e.status(); 275 + return (did, retry_state.into()); 276 + } 277 + } 278 + } 279 + }; 280 + 281 + let bytes = match resp.bytes().await { 282 + Ok(b) => b, 283 + Err(e) => { 284 + error!(err = %e, "failed to read describeRepo response"); 285 + return (did, RetryState::new(60 * 5).into()); 286 + } 287 + }; 288 + 289 + let out = match serde_json::from_slice::<DescribeRepoOutput>(&bytes) { 290 + Ok(out) => out, 291 + Err(e) => { 292 + error!(err = %e, "failed to parse describeRepo response"); 293 + return (did, RetryState::new(60 * 10).into()); 294 + } 295 + }; 296 + 297 + let found_signal = out 298 + .collections 299 + .iter() 300 + .any(|col| filter.matches_signal(col.as_str())); 301 + if !found_signal { 302 + trace!("no signal-matching collections found"); 303 + } 304 + 305 + ( 306 + did, 307 + found_signal 308 + .then_some(CrawlCheckResult::Signal) 309 + .unwrap_or(CrawlCheckResult::NoSignal), 310 + ) 311 + } 312 + } 313 + 314 + /// checks signals for each DID in `in_flight`, writes retry/no-signal state changes 315 + /// into `batch`, and returns a vec containing only the guards for confirmed DIDs. 316 + /// 317 + /// guards for non-confirmed DIDs are dropped here, releasing their in-flight slots. 318 + /// confirmed DIDs' retry entries are NOT removed here — the worker removes them 319 + /// atomically with the pending insert. 320 + pub(super) async fn check_signals_batch( 321 + &self, 322 + in_flight: Vec<InFlightGuard>, 323 + filter: &Arc<crate::filter::FilterConfig>, 324 + batch: &mut OwnedWriteBatch, 325 + existing: &HashMap<Did<'static>, RetryState>, 326 + ) -> Result<Vec<InFlightGuard>> { 327 + let db = &self.state.db; 328 + let mut valid: Vec<Did<'static>> = Vec::new(); 329 + let mut set = tokio::task::JoinSet::new(); 330 + 331 + for guard in &in_flight { 332 + let filter = filter.clone(); 333 + let did = guard.did.clone(); 334 + let span = tracing::info_span!("signals", did = %did); 335 + set.spawn(self.check_repo_signals(filter, did).instrument(span)); 336 + } 337 + 338 + while let Some(res) = tokio::time::timeout(Duration::from_secs(60), set.join_next()) 339 + .await 340 + .into_diagnostic() 341 + .map_err(|_| { 342 + error!("signal check task timed out after 60s"); 343 + miette::miette!("signal check task timed out") 344 + })? 345 + { 346 + let (did, result) = match res { 347 + Ok(inner) => inner, 348 + Err(e) => { 349 + error!(err = ?e, "signal check panicked"); 350 + continue; 351 + } 352 + }; 353 + 354 + match result { 355 + CrawlCheckResult::Signal => { 356 + valid.push(did); 357 + } 358 + CrawlCheckResult::NoSignal => { 359 + batch.remove(&db.crawler, keys::crawler_retry_key(&did)); 360 + } 361 + CrawlCheckResult::Retry(state) => { 362 + let prev_attempts = existing.get(&did).map(|s| s.attempts).unwrap_or(0); 363 + let carried = RetryState { 364 + attempts: prev_attempts, 365 + ..state 366 + }; 367 + let next = match carried.next_attempt() { 368 + Some(next) => next, 369 + None => RetryState { 370 + attempts: MAX_RETRY_ATTEMPTS, 371 + ..state 372 + }, 373 + }; 374 + batch.insert( 375 + &db.crawler, 376 + keys::crawler_retry_key(&did), 377 + rmp_serde::to_vec(&next).into_diagnostic()?, 378 + ); 379 + } 380 + } 381 + } 382 + 383 + let valid_set: std::collections::HashSet<&Did<'static>> = valid.iter().collect(); 384 + Ok(in_flight 385 + .into_iter() 386 + .filter(|g| valid_set.contains(&**g)) 387 + .collect()) 388 + } 389 + } 390 + 391 + // -- RelayProducer -------------------------------------------------------- 392 + 393 + pub(crate) struct RelayProducer { 394 + pub(crate) relay_url: Url, 395 + pub(crate) checker: SignalChecker, 396 + pub(crate) in_flight: InFlight, 397 + pub(crate) tx: mpsc::Sender<CrawlerBatch>, 398 + pub(crate) enabled: watch::Receiver<bool>, 399 + pub(crate) stats: CrawlerStats, 400 + } 401 + 402 + impl RelayProducer { 403 + pub(crate) async fn run(mut self) -> Result<()> { 404 + loop { 405 + if let Err(e) = self.crawl().await { 406 + error!(err = ?e, relay = %self.relay_url, "fatal relay crawl error, restarting in 30s"); 407 + tokio::time::sleep(Duration::from_secs(30)).await; 408 + } 409 + } 410 + } 411 + 412 + async fn get_cursor(&self) -> Result<Option<SmolStr>> { 413 + let key = crawler_cursor_key(self.relay_url.as_str()); 414 + let cursor_bytes = Db::get(self.checker.state.db.cursors.clone(), &key).await?; 415 + Ok(cursor_bytes 416 + .as_deref() 417 + .and_then(|b| rmp_serde::from_slice::<SmolStr>(b).ok())) 418 + } 419 + 420 + async fn crawl(&mut self) -> Result<()> { 421 + let db = &self.checker.state.db; 422 + let base = base_url(&self.relay_url)?; 423 + 424 + let mut cursor = self.get_cursor().await?; 425 + match &cursor { 426 + Some(c) => info!(cursor = %c, "resuming"), 427 + None => info!("starting from scratch"), 428 + } 429 + 430 + loop { 431 + self.enabled.wait_enabled("crawler").await; 432 + 433 + let mut url = base 434 + .join("/xrpc/com.atproto.sync.listRepos") 435 + .into_diagnostic()?; 436 + url.query_pairs_mut().append_pair("limit", "1000"); 437 + if let Some(c) = &cursor { 438 + url.query_pairs_mut().append_pair("cursor", c.as_str()); 439 + } 440 + 441 + let fetch_result = (|| self.checker.http.get(url.clone()).send().error_for_status()) 442 + .retry(5, |e: &reqwest::Error, attempt| { 443 + matches!(e.status(), Some(StatusCode::TOO_MANY_REQUESTS)) 444 + .then(|| Duration::from_secs(1 << attempt.min(5))) 445 + }) 446 + .await; 447 + 448 + let res = match fetch_result { 449 + Ok(r) => r, 450 + Err(RetryOutcome::Ratelimited) => { 451 + warn!("rate limited by relay after retries"); 452 + continue; 453 + } 454 + Err(RetryOutcome::Failed(e)) => { 455 + error!(err = %e, "crawler failed to fetch listRepos"); 456 + continue; 457 + } 458 + }; 459 + 460 + let bytes = match res.bytes().await { 461 + Ok(b) => b, 462 + Err(e) => { 463 + error!(err = %e, "cant read listRepos response"); 464 + continue; 465 + } 466 + }; 467 + 468 + let filter = self.checker.state.filter.load(); 469 + 470 + struct ParseResult { 471 + unknown_dids: Vec<Did<'static>>, 472 + cursor: Option<SmolStr>, 473 + count: usize, 474 + } 475 + 476 + let parse_result = { 477 + let repos = db.repos.clone(); 478 + let filter_ks = db.filter.clone(); 479 + let crawler_ks = db.crawler.clone(); 480 + 481 + tokio::time::timeout( 482 + BLOCKING_TASK_TIMEOUT, 483 + tokio::task::spawn_blocking(move || -> miette::Result<Option<ParseResult>> { 484 + let output = match serde_json::from_slice::<ListReposOutput>(&bytes) { 485 + Ok(out) => out.into_static(), 486 + Err(e) => { 487 + error!(err = %e, "failed to parse listRepos response"); 488 + return Ok(None); 489 + } 490 + }; 491 + if output.repos.is_empty() { 492 + return Ok(None); 493 + } 494 + let count = output.repos.len(); 495 + let next_cursor = output.cursor.map(|c| c.as_str().into()); 496 + let mut unknown = Vec::new(); 497 + for repo in output.repos { 498 + let excl_key = crate::db::filter::exclude_key(repo.did.as_str())?; 499 + if filter_ks.contains_key(&excl_key).into_diagnostic()? { 500 + continue; 501 + } 502 + let retry_key = keys::crawler_retry_key(&repo.did); 503 + if crawler_ks.contains_key(&retry_key).into_diagnostic()? { 504 + continue; 505 + } 506 + let did_key = keys::repo_key(&repo.did); 507 + if !repos.contains_key(&did_key).into_diagnostic()? { 508 + unknown.push(repo.did.into_static()); 509 + } 510 + } 511 + Ok(Some(ParseResult { 512 + unknown_dids: unknown, 513 + cursor: next_cursor, 514 + count, 515 + })) 516 + }), 517 + ) 518 + .await 519 + } 520 + .into_diagnostic()? 521 + .map_err(|_| { 522 + error!( 523 + "spawn_blocking task for parsing listRepos timed out after {}s", 524 + BLOCKING_TASK_TIMEOUT.as_secs() 525 + ); 526 + miette::miette!("spawn_blocking task for parsing listRepos timed out") 527 + })?; 528 + 529 + let ParseResult { 530 + unknown_dids, 531 + cursor: next_cursor, 532 + count, 533 + } = match parse_result { 534 + Ok(Some(res)) => res, 535 + Ok(None) => { 536 + info!("enumeration pass complete, sleeping 1h"); 537 + tokio::select! { 538 + _ = tokio::time::sleep(Duration::from_secs(3600)) => { 539 + info!("resuming after 1h sleep"); 540 + } 541 + _ = self.enabled.changed() => { 542 + if !*self.enabled.borrow() { return Ok(()); } 543 + } 544 + } 545 + continue; 546 + } 547 + Err(e) => return Err(e).wrap_err("error while crawling"), 548 + }; 549 + 550 + debug!(count, "fetched repos"); 551 + self.stats.record_crawled(count); 552 + 553 + let pass_complete; 554 + if let Some(new_cursor) = next_cursor { 555 + cursor = Some(new_cursor.as_str().into()); 556 + pass_complete = false; 557 + } else { 558 + info!("reached end of list."); 559 + pass_complete = true; 560 + } 561 + 562 + let cursor_update = cursor 563 + .as_ref() 564 + .map(|c| -> Result<CursorUpdate> { 565 + Ok(CursorUpdate { 566 + key: crawler_cursor_key(self.relay_url.as_str()), 567 + value: rmp_serde::to_vec(c.as_str()) 568 + .into_diagnostic() 569 + .wrap_err("cant serialize cursor")?, 570 + }) 571 + }) 572 + .transpose()?; 573 + 574 + let in_flight = self.in_flight.acquire(unknown_dids).await; 575 + let confirmed = if filter.check_signals() && !in_flight.is_empty() { 576 + let mut retry_batch = db.inner.batch(); 577 + let confirmed = self 578 + .checker 579 + .check_signals_batch(in_flight, &filter, &mut retry_batch, &HashMap::new()) 580 + .await?; 581 + tokio::time::timeout( 582 + BLOCKING_TASK_TIMEOUT, 583 + tokio::task::spawn_blocking(move || retry_batch.commit().into_diagnostic()), 584 + ) 585 + .await 586 + .into_diagnostic()? 587 + .map_err(|_| miette::miette!("retry state commit timed out"))? 588 + .inspect_err(|e| error!(err = ?e, "retry state commit failed")) 589 + .ok(); 590 + confirmed 591 + } else { 592 + in_flight 593 + }; 594 + 595 + self.tx 596 + .send(CrawlerBatch { 597 + guards: confirmed, 598 + cursor_update, 599 + }) 600 + .await 601 + .ok(); 602 + 603 + if pass_complete { 604 + info!("enumeration complete, sleeping 1h before next pass"); 605 + tokio::select! { 606 + _ = tokio::time::sleep(Duration::from_secs(3600)) => { 607 + info!("resuming after 1h sleep"); 608 + } 609 + _ = self.enabled.changed() => { 610 + if !*self.enabled.borrow() { return Ok(()); } 611 + } 612 + } 613 + } 614 + } 615 + } 616 + } 617 + 618 + // -- RelayProducer: cursor display for the stats ticker ------------------ 619 + 620 + /// formats the current cursor for a relay as a display string, used by the stats ticker. 621 + pub(super) async fn cursor_display(state: &AppState, relay_host: &Url) -> SmolStr { 622 + let key = crawler_cursor_key(relay_host.as_str()); 623 + let cursor_bytes = match Db::get(state.db.cursors.clone(), &key).await { 624 + Ok(b) => b, 625 + Err(e) => return e.to_smolstr(), 626 + }; 627 + match cursor_bytes 628 + .as_deref() 629 + .map(rmp_serde::from_slice::<SmolStr>) 630 + { 631 + None | Some(Err(_)) => "none".to_smolstr(), 632 + Some(Ok(c)) => c, 633 + } 634 + } 635 + 636 + // -- RetryProducer -------------------------------------------------------- 637 + 638 + /// re-checks signal for repos whose `describeRepo` previously failed or timed out. 639 + pub(crate) struct RetryProducer { 640 + pub(crate) checker: SignalChecker, 641 + pub(crate) in_flight: InFlight, 642 + pub(crate) tx: mpsc::Sender<CrawlerBatch>, 643 + } 644 + 645 + impl RetryProducer { 646 + pub(crate) async fn run(self) { 647 + loop { 648 + match self.process_queue().await { 649 + Ok(Some(dur)) => tokio::time::sleep(dur.max(Duration::from_secs(1))).await, 650 + Ok(None) => tokio::time::sleep(Duration::from_secs(60)).await, 651 + Err(e) => { 652 + error!(err = %e, "retry loop failed"); 653 + tokio::time::sleep(Duration::from_secs(60)).await; 654 + } 655 + } 656 + } 657 + } 658 + 659 + async fn process_queue(&self) -> Result<Option<Duration>> { 660 + let db = self.checker.state.db.clone(); 661 + 662 + struct ScanResult { 663 + ready: Vec<Did<'static>>, 664 + existing: HashMap<Did<'static>, RetryState>, 665 + next_wake: Option<Duration>, 666 + had_more: bool, 667 + } 668 + 669 + let ScanResult { 670 + ready, 671 + existing, 672 + next_wake, 673 + had_more, 674 + } = tokio::task::spawn_blocking(move || -> Result<ScanResult> { 675 + let now = chrono::Utc::now(); 676 + let mut rng: SmallRng = rand::make_rng(); 677 + let mut ready: Vec<Did> = Vec::new(); 678 + let mut existing: HashMap<Did<'static>, RetryState> = HashMap::new(); 679 + let mut next_wake: Option<Duration> = None; 680 + let mut had_more = false; 681 + 682 + for guard in db.crawler.prefix(keys::CRAWLER_RETRY_PREFIX) { 683 + let (key, val) = guard.into_inner().into_diagnostic()?; 684 + let state: RetryState = rmp_serde::from_slice(&val).into_diagnostic()?; 685 + let did = keys::crawler_retry_parse_key(&key)?.to_did(); 686 + 687 + if state.attempts >= MAX_RETRY_ATTEMPTS { 688 + continue; 689 + } 690 + 691 + let backoff = TimeDelta::seconds( 692 + state 693 + .duration 694 + .as_seconds_f64() 695 + .mul(rng.random_range(0.01..0.07)) as i64, 696 + ); 697 + if state.after + backoff > now { 698 + let wake = (state.after - now).to_std().unwrap_or(Duration::ZERO); 699 + next_wake = Some(next_wake.map(|w| w.min(wake)).unwrap_or(wake)); 700 + continue; 701 + } 702 + 703 + if ready.len() >= MAX_RETRY_BATCH { 704 + had_more = true; 705 + break; 706 + } 707 + 708 + ready.push(did.clone()); 709 + existing.insert(did, state); 710 + } 711 + 712 + Ok(ScanResult { 713 + ready, 714 + existing, 715 + next_wake, 716 + had_more, 717 + }) 718 + }) 719 + .await 720 + .into_diagnostic()??; 721 + 722 + if ready.is_empty() { 723 + return Ok(next_wake); 724 + } 725 + 726 + debug!(count = ready.len(), "retrying pending repos"); 727 + 728 + let filter = self.checker.state.filter.load(); 729 + let in_flight = self.in_flight.acquire(ready).await; 730 + let mut retry_batch = self.checker.state.db.inner.batch(); 731 + let confirmed = self 732 + .checker 733 + .check_signals_batch(in_flight, &filter, &mut retry_batch, &existing) 734 + .await?; 735 + 736 + tokio::task::spawn_blocking(move || retry_batch.commit().into_diagnostic()) 737 + .await 738 + .into_diagnostic()? 739 + .inspect_err(|e| error!(err = ?e, "retry state commit failed")) 740 + .ok(); 741 + 742 + if !confirmed.is_empty() { 743 + info!(count = confirmed.len(), "recovered from retry queue"); 744 + self.tx 745 + .send(CrawlerBatch { 746 + guards: confirmed, 747 + cursor_update: None, 748 + }) 749 + .await 750 + .ok(); 751 + } 752 + 753 + Ok(had_more.then_some(Duration::ZERO).or(next_wake)) 754 + } 755 + }
+210
src/crawler/worker.rs
··· 1 + use crate::db::{keys, ser_repo_state}; 2 + use crate::state::AppState; 3 + use crate::types::RepoState; 4 + use miette::{IntoDiagnostic, Result}; 5 + use rand::Rng; 6 + use rand::rngs::SmallRng; 7 + use std::sync::Arc; 8 + use std::time::Duration; 9 + use tokio::sync::mpsc; 10 + use tracing::{debug, error, info, trace}; 11 + 12 + use super::{CrawlerStats, InFlightGuard}; 13 + 14 + const WORKER_CHANNEL_CAPACITY: usize = 64; 15 + const BLOCKING_TASK_TIMEOUT: Duration = Duration::from_secs(30); 16 + 17 + /// a cursor write to include atomically with the batch. 18 + pub(crate) struct CursorUpdate { 19 + pub(super) key: Vec<u8>, 20 + pub(super) value: Vec<u8>, 21 + } 22 + 23 + /// a batch of confirmed repos from any crawler source ready to enqueue. 24 + /// 25 + /// `guards` hold the DIDs (via `Deref`) and keep their in-flight slots occupied 26 + /// until the batch is committed to the database. `cursor_update`, if present, 27 + /// is committed atomically with the repos/pending inserts. 28 + pub(crate) struct CrawlerBatch { 29 + pub(super) guards: Vec<InFlightGuard>, 30 + pub(super) cursor_update: Option<CursorUpdate>, 31 + } 32 + 33 + pub(crate) struct CrawlerWorker { 34 + pub(super) state: Arc<AppState>, 35 + pub(super) max_pending: usize, 36 + pub(super) resume_pending: usize, 37 + pub(super) stats: CrawlerStats, 38 + pub(super) rx: mpsc::Receiver<CrawlerBatch>, 39 + pub(super) was_throttled: bool, 40 + } 41 + 42 + impl CrawlerWorker { 43 + pub(crate) fn new( 44 + state: Arc<AppState>, 45 + max_pending: usize, 46 + resume_pending: usize, 47 + stats: CrawlerStats, 48 + ) -> (Self, mpsc::Sender<CrawlerBatch>) { 49 + let (tx, rx) = mpsc::channel(WORKER_CHANNEL_CAPACITY); 50 + ( 51 + Self { 52 + state, 53 + max_pending, 54 + resume_pending, 55 + stats, 56 + rx, 57 + was_throttled: false, 58 + }, 59 + tx, 60 + ) 61 + } 62 + 63 + pub(crate) async fn run(mut self) { 64 + while let Some(batch) = self.rx.recv().await { 65 + self.wait_for_capacity().await; 66 + if let Err(e) = self.enqueue(batch).await { 67 + error!(err = ?e, "crawler worker: enqueue failed"); 68 + } 69 + } 70 + } 71 + 72 + /// blocks until the pending queue has capacity. mirrors the hysteresis logic 73 + /// that was previously duplicated in each crawler loop: 74 + /// - above `max_pending`: hard stop, poll every 5s 75 + /// - between `resume_pending` and `max_pending`: cooldown until below `resume_pending` 76 + /// - below `resume_pending`: proceed (or release throttle) 77 + async fn wait_for_capacity(&mut self) { 78 + loop { 79 + let pending = self.state.db.get_count("pending").await; 80 + if pending > self.max_pending as u64 { 81 + if !self.was_throttled { 82 + debug!( 83 + pending, 84 + max = self.max_pending, 85 + "throttling: above max pending" 86 + ); 87 + self.was_throttled = true; 88 + self.stats.set_throttled(true); 89 + } 90 + tokio::time::sleep(Duration::from_secs(5)).await; 91 + } else if pending > self.resume_pending as u64 { 92 + if !self.was_throttled { 93 + debug!( 94 + pending, 95 + resume = self.resume_pending, 96 + "throttling: entering cooldown" 97 + ); 98 + self.was_throttled = true; 99 + self.stats.set_throttled(true); 100 + } 101 + loop { 102 + tokio::time::sleep(Duration::from_secs(5)).await; 103 + if self.state.db.get_count("pending").await <= self.resume_pending as u64 { 104 + break; 105 + } 106 + } 107 + self.was_throttled = false; 108 + self.stats.set_throttled(false); 109 + info!("throttling released"); 110 + break; 111 + } else { 112 + if self.was_throttled { 113 + self.was_throttled = false; 114 + self.stats.set_throttled(false); 115 + info!("throttling released"); 116 + } 117 + break; 118 + } 119 + } 120 + } 121 + 122 + /// filters already-known repos, commits them to `repos` + `pending` (plus 123 + /// an optional cursor update), then releases the in-flight guards. 124 + async fn enqueue(&mut self, batch: CrawlerBatch) -> Result<()> { 125 + let CrawlerBatch { 126 + guards, 127 + cursor_update, 128 + } = batch; 129 + 130 + // nothing to insert but still need to commit the cursor if present. 131 + if guards.is_empty() { 132 + if let Some(cursor) = cursor_update { 133 + self.commit_cursor(cursor).await?; 134 + } 135 + return Ok(()); 136 + } 137 + 138 + // filter already-known repos, build and commit the write batch, then return 139 + // the surviving guards so they are dropped on the async side after commit. 140 + let db = self.state.db.clone(); 141 + let surviving = tokio::time::timeout( 142 + BLOCKING_TASK_TIMEOUT, 143 + tokio::task::spawn_blocking(move || -> Result<Vec<InFlightGuard>> { 144 + let mut rng: SmallRng = rand::make_rng(); 145 + let mut write_batch = db.inner.batch(); 146 + let mut surviving = Vec::new(); 147 + for guard in guards { 148 + let did_key = keys::repo_key(&*guard); 149 + if db.repos.contains_key(&did_key).into_diagnostic()? { 150 + continue; 151 + } 152 + let state = RepoState::untracked(rng.next_u64()); 153 + write_batch.insert(&db.repos, &did_key, ser_repo_state(&state)?); 154 + write_batch.insert(&db.pending, keys::pending_key(state.index_id), &did_key); 155 + // clear any stale retry entry, this DID is confirmed and being enqueued 156 + write_batch.remove(&db.crawler, keys::crawler_retry_key(&*guard)); 157 + trace!(did = %*guard, "enqueuing repo"); 158 + surviving.push(guard); 159 + } 160 + if let Some(cursor) = cursor_update { 161 + write_batch.insert(&db.cursors, cursor.key, cursor.value); 162 + } 163 + write_batch.commit().into_diagnostic()?; 164 + Ok(surviving) 165 + }), 166 + ) 167 + .await 168 + .into_diagnostic()? 169 + .map_err(|_| { 170 + error!("enqueue batch timed out after {BLOCKING_TASK_TIMEOUT:?}"); 171 + miette::miette!("enqueue batch timed out") 172 + })? 173 + .inspect_err(|e| error!(err = ?e, "enqueue batch commit failed")) 174 + .unwrap_or_default(); 175 + 176 + let count = surviving.len(); 177 + // release in-flight slots now that the batch is committed 178 + drop(surviving); 179 + 180 + if count > 0 { 181 + self.stats.record_processed(count); 182 + self.state 183 + .db 184 + .update_count_async("repos", count as i64) 185 + .await; 186 + self.state 187 + .db 188 + .update_count_async("pending", count as i64) 189 + .await; 190 + self.state.notify_backfill(); 191 + } 192 + 193 + Ok(()) 194 + } 195 + 196 + async fn commit_cursor(&self, cursor: CursorUpdate) -> Result<()> { 197 + let db = self.state.db.clone(); 198 + tokio::time::timeout( 199 + BLOCKING_TASK_TIMEOUT, 200 + tokio::task::spawn_blocking(move || { 201 + let mut batch = db.inner.batch(); 202 + batch.insert(&db.cursors, cursor.key, cursor.value); 203 + batch.commit().into_diagnostic() 204 + }), 205 + ) 206 + .await 207 + .into_diagnostic()? 208 + .map_err(|_| miette::miette!("cursor-only commit timed out"))? 209 + } 210 + }
+20 -5
src/db/keys.rs
··· 2 2 use smol_str::SmolStr; 3 3 4 4 use crate::db::types::{DbRkey, DbTid, TrimmedDid}; 5 - use url::Url; 6 5 7 6 /// separator used for composite keys 8 7 pub const SEP: u8 = b'|'; ··· 157 156 TrimmedDid::try_from(&key[CRAWLER_RETRY_PREFIX.len()..]) 158 157 } 159 158 160 - pub fn crawler_cursor_key(relay: &Url) -> Vec<u8> { 159 + pub fn crawler_cursor_key(relay: &str) -> Vec<u8> { 161 160 let mut key = b"crawler_cursor|".to_vec(); 162 - key.extend_from_slice(relay.as_str().as_bytes()); 161 + key.extend_from_slice(relay.as_bytes()); 163 162 key 164 163 } 165 164 166 - pub fn firehose_cursor_key(relay: &Url) -> Vec<u8> { 165 + pub fn by_collection_cursor_key(url: &str, collection: &str) -> Vec<u8> { 166 + let mut key = b"by_collection_cursor|".to_vec(); 167 + key.extend_from_slice(url.as_bytes()); 168 + key.push(SEP); 169 + key.extend_from_slice(collection.as_bytes()); 170 + key 171 + } 172 + 173 + /// prefix for all by-collection cursors belonging to a given index URL. 174 + pub fn by_collection_cursor_prefix(url: &str) -> Vec<u8> { 175 + let mut prefix = b"by_collection_cursor|".to_vec(); 176 + prefix.extend_from_slice(url.as_bytes()); 177 + prefix.push(SEP); 178 + prefix 179 + } 180 + 181 + pub fn firehose_cursor_key(relay: &str) -> Vec<u8> { 167 182 let mut key = b"firehose_cursor|".to_vec(); 168 - key.extend_from_slice(relay.as_str().as_bytes()); 183 + key.extend_from_slice(relay.as_bytes()); 169 184 key 170 185 } 171 186
+13 -13
src/db/mod.rs
··· 120 120 .max_journaling_size(mb(cfg.db_max_journaling_size_mb)) 121 121 .with_compaction_filter_factories({ 122 122 let ephemeral = cfg.ephemeral; 123 - let f = move |ks: &str| { 124 - tracing::info!("with_compaction_filter_factories queried for keyspace: {ks}",); 125 - match ks { 126 - "counts" => ephemeral.then(|| -> Arc<dyn Factory> { 127 - Arc::new(DropPrefixFilterFactory { 128 - prefix: keys::COUNT_COLLECTION_PREFIX, 129 - }) 130 - }), 131 - _ => None, 132 - } 123 + let f = move |ks: &str| match ks { 124 + "counts" => ephemeral.then(|| -> Arc<dyn Factory> { 125 + Arc::new(DropPrefixFilterFactory { 126 + prefix: keys::COUNT_COLLECTION_PREFIX, 127 + }) 128 + }), 129 + _ => None, 133 130 }; 134 131 Arc::new(f) 135 132 }) ··· 146 143 let path = cfg.database_path.join(format!("dict_{name}.bin")); 147 144 if path.exists() { 148 145 if let Ok(bytes) = std::fs::read(&path) { 149 - tracing::info!( 146 + tracing::debug!( 150 147 "loaded zstd dictionary for keyspace {name} ({} bytes)", 151 148 bytes.len() 152 149 ); ··· 704 701 705 702 pub fn set_firehose_cursor(db: &Db, relay: &Url, cursor: i64) -> Result<()> { 706 703 db.cursors 707 - .insert(keys::firehose_cursor_key(relay), cursor.to_be_bytes()) 704 + .insert( 705 + keys::firehose_cursor_key(relay.as_str()), 706 + cursor.to_be_bytes(), 707 + ) 708 708 .into_diagnostic() 709 709 } 710 710 711 711 pub async fn get_firehose_cursor(db: &Db, relay: &Url) -> Result<Option<i64>> { 712 - let per_relay_key = keys::firehose_cursor_key(relay); 712 + let per_relay_key = keys::firehose_cursor_key(relay.as_str()); 713 713 if let Some(v) = Db::get(db.cursors.clone(), per_relay_key).await? { 714 714 return Ok(Some(i64::from_be_bytes( 715 715 v.as_ref()
+5 -2
src/state.rs
··· 32 32 33 33 let crawler_default = match config.enable_crawler { 34 34 Some(b) => b, 35 - // default: enabled in full-network mode, disabled in filter mode 36 - None => filter_config.mode == crate::filter::FilterMode::Full, 35 + // default: enabled if full-network mode, or if crawler sources are configured 36 + None => { 37 + filter_config.mode == crate::filter::FilterMode::Full 38 + || !config.crawler_sources.is_empty() 39 + } 37 40 }; 38 41 39 42 let filter = new_handle(filter_config);
+120
tests/collection_index_test.nu
··· 1 + #!/usr/bin/env nu 2 + # tests that the collection-index crawler (listReposByCollection) correctly discovers 3 + # and backfills repos from a lightrail-style index server. 4 + # 5 + # usage: nu tests/collection_index_test.nu 6 + # 7 + # requires network access to lightrail.microcosm.blue. 8 + use common.nu * 9 + 10 + def main [] { 11 + let port = 3015 12 + let url = $"http://localhost:($port)" 13 + let db_path = (mktemp -d -t hydrant_collection_index_test.XXXXXX) 14 + let collection = "app.bsky.graph.starterpack" 15 + let index_url = "https://lightrail.microcosm.blue" 16 + 17 + print $"database path: ($db_path)" 18 + 19 + # fetch a small known set of repos from the collection index so we can verify 20 + # they appear in hydrant after the discovery pass 21 + print $"fetching known repos for ($collection) from ($index_url)..." 22 + let index_resp = (http get $"($index_url)/xrpc/com.atproto.sync.listReposByCollection?collection=($collection)&limit=3") 23 + let known_dids = ($index_resp.repos | each { |r| $r.did }) 24 + 25 + if ($known_dids | is-empty) { 26 + print "SKIP: collection index returned no repos, cannot verify discovery" 27 + rm -rf $db_path 28 + exit 0 29 + } 30 + 31 + print $"will verify these DIDs are discovered: ($known_dids | str join ', ')" 32 + 33 + # start hydrant in filter mode with the test collection as a signal. 34 + # HYDRANT_ENABLE_COLLECTION_INDEX is true by default when signals are set, 35 + # so no explicit override needed. 36 + $env.HYDRANT_FILTER_SIGNALS = $collection 37 + $env.HYDRANT_COLLECTION_INDEX_URL = $index_url 38 + # keep the pending queue very small so throttling doesn't interfere with the test 39 + $env.HYDRANT_CRAWLER_MAX_PENDING_REPOS = "500" 40 + $env.HYDRANT_CRAWLER_RESUME_PENDING_REPOS = "200" 41 + # no relay needed for collection-index testing 42 + $env.HYDRANT_ENABLE_FIREHOSE = "false" 43 + 44 + let binary = build-hydrant 45 + let instance = start-hydrant $binary $db_path $port 46 + 47 + mut test_passed = false 48 + 49 + if not (wait-for-api $url) { 50 + print "ERROR: hydrant failed to start" 51 + try { kill -9 $instance.pid } 52 + rm -rf $db_path 53 + exit 1 54 + } 55 + 56 + # verify the filter was applied correctly 57 + let filter = (http get $"($url)/filter") 58 + print $"filter state: ($filter | to json)" 59 + if not ($filter.signals | any { |s| $s == $collection }) { 60 + print $"FAILED: ($collection) not in signals — filter not configured" 61 + try { kill -9 $instance.pid } 62 + rm -rf $db_path 63 + exit 1 64 + } 65 + 66 + # wait for the collection-index pass to discover and enqueue repos. 67 + # the first pass runs immediately on startup; repos should appear within ~30s 68 + # depending on pagination speed and backfill concurrency. 69 + print "waiting for collection-index discovery pass..." 70 + mut discovered = false 71 + for i in 1..60 { 72 + let stats = (try { (http get $"($url)/stats?accurate=true").counts } catch { {} }) 73 + let repos = ($stats | get --optional repos | default 0 | into int) 74 + print $"[($i)/60] repos: ($repos)" 75 + if $repos >= ($known_dids | length) { 76 + $discovered = true 77 + break 78 + } 79 + sleep 2sec 80 + } 81 + 82 + if not $discovered { 83 + print "FAILED: collection-index did not discover any repos within timeout" 84 + try { kill -9 $instance.pid } 85 + rm -rf $db_path 86 + exit 1 87 + } 88 + 89 + # verify each known DID appears in the repos API 90 + print "verifying known DIDs were discovered..." 91 + mut all_found = true 92 + for did in $known_dids { 93 + let repo = (try { 94 + http get $"($url)/repos/($did)" 95 + } catch { 96 + null 97 + }) 98 + if ($repo | is-empty) { 99 + print $"FAILED: ($did) not found in repos API" 100 + $all_found = false 101 + } else { 102 + print $"ok: ($did) — status: ($repo.status)" 103 + } 104 + } 105 + 106 + if $all_found { 107 + print "test PASSED: collection-index correctly discovered repos from listReposByCollection" 108 + $test_passed = true 109 + } 110 + 111 + print "stopping hydrant..." 112 + try { kill -9 $instance.pid } 113 + rm -rf $db_path 114 + 115 + if $test_passed { 116 + exit 0 117 + } else { 118 + exit 1 119 + } 120 + }