A very fast AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
fjall at-protocol atproto indexer rust
at main 312 lines 10 kB view raw
use futures::{FutureExt, future::BoxFuture};
use hydrant::config::{Config, SignatureVerification};
use hydrant::db;
use hydrant::ingest::firehose::FirehoseIngestor;
use hydrant::state::AppState;
use hydrant::{api, backfill::BackfillWorker, ingest::worker::FirehoseWorker};
use miette::IntoDiagnostic;
use mimalloc::MiMalloc;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use tokio::{sync::mpsc, task::spawn_blocking};
use tracing::{debug, error, info};

// Use mimalloc as the process-wide allocator.
#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;

/// Process entry point: loads configuration, opens the database, applies any
/// configured filter patch, then spawns the long-lived workers (GC, backfill,
/// stats, persistence, crawler, firehose ingestion, API servers) and waits
/// until any critical task exits.
///
/// Startup ordering matters here: refcounts are loaded before any ingest
/// worker starts, and the filter patch is committed and reloaded before the
/// firehose ingestors (which read `state.filter`) are spawned.
#[tokio::main]
async fn main() -> miette::Result<()> {
    // Install the rustls crypto provider; `.ok()` ignores the error if a
    // provider was already installed (install_default fails on the 2nd call).
    rustls::crypto::aws_lc_rs::default_provider()
        .install_default()
        .ok();

    let cfg = Config::from_env()?;

    // Logging defaults to INFO; RUST_LOG-style directives override it
    // (lossy parse: invalid directives are skipped rather than fatal).
    let env_filter = tracing_subscriber::EnvFilter::builder()
        .with_default_directive(tracing::Level::INFO.into())
        .from_env_lossy();
    tracing_subscriber::fmt().with_env_filter(env_filter).init();

    info!("{cfg}");

    let state = AppState::new(&cfg)?;

    // load block refcounts for GC - must complete before any ingest workers start
    if cfg.ephemeral {
        db::gc::ephemeral_startup_load_refcounts(&state.db)?;
    } else {
        db::gc::startup_load_refcounts(&state.db)?;
    }

    // If any filter-related config is present, persist it as a patch to the
    // filter keyspace and reload the in-memory filter before ingestion starts.
    if cfg.full_network
        || cfg.filter_signals.is_some()
        || cfg.filter_collections.is_some()
        || cfg.filter_excludes.is_some()
    {
        // Clone the handles/config values that move into the blocking task.
        let filter_ks = state.db.filter.clone();
        let inner = state.db.inner.clone();
        let full_network = cfg.full_network;
        let signals = cfg.filter_signals.clone();
        let collections = cfg.filter_collections.clone();
        let excludes = cfg.filter_excludes.clone();

        // The batch write is synchronous DB work, so run it off the async
        // runtime on the blocking pool.
        tokio::task::spawn_blocking(move || {
            use hydrant::filter::{FilterMode, SetUpdate};
            let mut batch = inner.batch();

            // full_network forces Full mode; otherwise the stored mode is
            // left untouched (None = no mode change in the patch).
            let mode = if full_network {
                Some(FilterMode::Full)
            } else {
                None
            };

            // Config-provided sets fully replace the stored sets (SetUpdate::Set);
            // absent config leaves the stored value as-is.
            let signals_update = signals.map(SetUpdate::Set);
            let collections_update = collections.map(SetUpdate::Set);
            let excludes_update = excludes.map(SetUpdate::Set);

            hydrant::db::filter::apply_patch(
                &mut batch,
                &filter_ks,
                mode,
                signals_update,
                collections_update,
                excludes_update,
            )?;

            batch.commit().into_diagnostic()
        })
        .await
        // First `?` surfaces a panicked/cancelled task, second the patch error.
        .into_diagnostic()??;

        // Reload the filter from the DB so in-memory state reflects the patch.
        let new_filter = hydrant::db::filter::load(&state.db.filter)?;
        state.filter.store(new_filter.into());
    }

    // Channel from firehose ingestors / backfill into the FirehoseWorker.
    let (buffer_tx, buffer_rx) = mpsc::unbounded_channel();
    let state = Arc::new(state);

    // spawn GC workers
    if cfg.ephemeral {
        let state_ttl = state.clone();
        std::thread::Builder::new()
            .name("ephemeral-ttl".into())
            .spawn(move || db::gc::ephemeral_ttl_worker(state_ttl))
            .into_diagnostic()?;
    } else {
        let state_gc = state.clone();
        std::thread::Builder::new()
            .name("gc-checkpoint".into())
            .spawn(move || db::gc::checkpoint_worker(state_gc))
            .into_diagnostic()?;
    }

    // Optional backfill worker; signature verification applies in Full or
    // BackfillOnly modes.
    if cfg.enable_backfill {
        tokio::spawn({
            let state = state.clone();
            let timeout = cfg.repo_fetch_timeout;
            BackfillWorker::new(
                state,
                buffer_tx.clone(),
                timeout,
                cfg.backfill_concurrency_limit,
                matches!(
                    cfg.verify_signatures,
                    SignatureVerification::Full | SignatureVerification::BackfillOnly
                ),
                cfg.ephemeral,
            )
            .run()
        });
    }

    // Re-queue backfills that were left in a "gone" state; a failure here is
    // logged (and checked for DB poisoning) but does not abort startup.
    if let Err(e) = spawn_blocking({
        let state = state.clone();
        move || hydrant::backfill::manager::queue_gone_backfills(&state)
    })
    .await
    .into_diagnostic()?
    {
        error!(err = %e, "failed to queue gone backfills");
        db::check_poisoned_report(&e);
    }

    // Dedicated OS thread for backfill retries (detached; runs for process lifetime).
    std::thread::spawn({
        let state = state.clone();
        move || hydrant::backfill::manager::retry_worker(state)
    });

    // Stats reporter: once a minute, log the event ingestion rate derived
    // from the monotonically increasing next_event_id counter.
    tokio::spawn({
        let state = state.clone();
        let mut last_id = state.db.next_event_id.load(Ordering::Relaxed);
        let mut last_time = std::time::Instant::now();
        let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
        async move {
            loop {
                interval.tick().await;

                let current_id = state.db.next_event_id.load(Ordering::Relaxed);
                let current_time = std::time::Instant::now();

                // saturating_sub guards against the counter moving backwards
                // (e.g. after a reset) producing a huge bogus delta.
                let delta = current_id.saturating_sub(last_id);
                if delta == 0 {
                    debug!("no new events in 60s");
                    continue;
                }

                let elapsed = current_time.duration_since(last_time).as_secs_f64();
                // Guard the division; elapsed should always be ~60s, but be safe.
                let rate = if elapsed > 0.0 {
                    delta as f64 / elapsed
                } else {
                    0.0
                };

                info!("{rate:.2} events/s ({delta} events in {elapsed:.1}s)");

                last_id = current_id;
                last_time = current_time;
            }
        }
    });

    // Persistence thread: periodically flush relay cursors, counts, and the
    // DB journal. Errors are logged per-item so one failure doesn't stop the
    // other flushes; each error is checked for a poisoned DB.
    std::thread::spawn({
        let state = state.clone();
        let persist_interval = cfg.cursor_save_interval;

        move || {
            loop {
                std::thread::sleep(persist_interval);

                // persist firehose cursors
                for (relay, cursor) in &state.relay_cursors {
                    let seq = cursor.load(Ordering::SeqCst);
                    // seq == 0 means no events seen yet from this relay;
                    // skip so we don't clobber a previously stored cursor.
                    if seq > 0 {
                        if let Err(e) = db::set_firehose_cursor(&state.db, relay, seq) {
                            error!(relay = %relay, err = %e, "failed to save cursor");
                            db::check_poisoned_report(&e);
                        }
                    }
                }

                // persist counts
                // TODO: make this more durable
                if let Err(e) = db::persist_counts(&state.db) {
                    error!(err = %e, "failed to persist counts");
                    db::check_poisoned_report(&e);
                }

                // persist journal
                if let Err(e) = state.db.persist() {
                    error!(err = %e, "db persist failed");
                    db::check_poisoned_report(&e);
                }
            }
        }
    });

    info!("starting crawler ({:?})", state.filter.load().mode);
    let state_for_crawler = state.clone();
    let relay_hosts = cfg.relays.clone();
    let crawler_max_pending = cfg.crawler_max_pending_repos;
    let crawler_resume_pending = cfg.crawler_resume_pending_repos;

    // Explicit config wins; when unset, the crawler runs only in Full
    // filter mode (i.e. when indexing the whole network).
    let should_run_crawler = match cfg.enable_crawler {
        Some(true) => true,
        Some(false) => false,
        None => state.filter.load().mode == hydrant::filter::FilterMode::Full,
    };

    if should_run_crawler {
        info!(
            relay_count = relay_hosts.len(),
            hosts = ?relay_hosts,
            "spawning crawler"
        );
        tokio::spawn(async move {
            let crawler = hydrant::crawler::Crawler::new(
                state_for_crawler,
                relay_hosts,
                crawler_max_pending,
                crawler_resume_pending,
            );
            if let Err(e) = crawler.run().await {
                error!(err = %e, "crawler error");
                db::check_poisoned_report(&e);
            }
        });
    } else {
        info!("crawler disabled by config or filter mode");
    }

    // Build the set of critical futures; if ANY of them resolves, the
    // select_all below fires and the process begins shutdown.
    let mut tasks = if cfg.enable_firehose {
        // The buffer processor runs on its own OS thread; `buffer_rx` (the
        // sole receiver) moves in here.
        let firehose_worker = std::thread::spawn({
            let state = state.clone();
            let handle = tokio::runtime::Handle::current();
            move || {
                FirehoseWorker::new(
                    state,
                    buffer_rx,
                    matches!(cfg.verify_signatures, SignatureVerification::Full),
                    cfg.ephemeral,
                    cfg.firehose_workers,
                )
                .run(handle)
            }
        });

        // Bridge the worker thread's JoinHandle into a future. The double
        // flatten collapses spawn_blocking's JoinError layer and the thread
        // join's panic layer into a single miette::Result<()>.
        let mut t: Vec<BoxFuture<miette::Result<()>>> = vec![Box::pin(
            tokio::task::spawn_blocking(move || {
                firehose_worker
                    .join()
                    .map_err(|e| miette::miette!("buffer processor died: {e:?}"))
            })
            .map(|r| r.into_diagnostic().flatten().flatten()),
        )];

        // One ingestor future per configured relay, all feeding buffer_tx.
        for relay_url in &cfg.relays {
            let ingestor = FirehoseIngestor::new(
                state.clone(),
                buffer_tx.clone(),
                relay_url.clone(),
                state.filter.clone(),
                matches!(cfg.verify_signatures, SignatureVerification::Full),
            );
            t.push(Box::pin(ingestor.run()));
        }

        t
    } else {
        info!("firehose ingestion disabled by config");
        // if firehose is disabled, we just wait indefinitely (or until signal)
        // essentially we just want to keep the main thread alive for the other components
        vec![Box::pin(futures::future::pending::<miette::Result<()>>()) as BoxFuture<_>]
    };

    // Public API server is always part of the critical set.
    let state_api = state.clone();
    tasks.push(Box::pin(async move {
        api::serve(state_api, cfg.api_port)
            .await
            .map_err(|e| miette::miette!("API server failed: {e}"))
    }) as BoxFuture<_>);

    // Optional debug API server.
    if cfg.enable_debug {
        let state_debug = state.clone();
        tasks.push(Box::pin(async move {
            api::serve_debug(state_debug, cfg.debug_port)
                .await
                .map_err(|e| miette::miette!("debug server failed: {e}"))
        }) as BoxFuture<_>);
    }

    // Wait for the first critical future to complete; select_all yields
    // (result, index, remaining) — only the result matters here.
    let res = futures::future::select_all(tasks);
    if let (Err(e), _, _) = res.await {
        error!(err = %e, "critical worker died");
        db::check_poisoned_report(&e);
    }

    // Final best-effort flush before exit; a failure here is the exit status.
    if let Err(e) = state.db.persist() {
        db::check_poisoned_report(&e);
        return Err(e);
    }

    Ok(())
}