Very fast AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall.
fjall
at-protocol
atproto
indexer
rust
1use futures::{FutureExt, future::BoxFuture};
2use hydrant::config::{Config, SignatureVerification};
3use hydrant::db;
4use hydrant::ingest::firehose::FirehoseIngestor;
5use hydrant::state::AppState;
6use hydrant::{api, backfill::BackfillWorker, ingest::worker::FirehoseWorker};
7use miette::IntoDiagnostic;
8use mimalloc::MiMalloc;
9use std::sync::Arc;
10use std::sync::atomic::Ordering;
11use tokio::{sync::mpsc, task::spawn_blocking};
12use tracing::{debug, error, info};
13
// Use mimalloc as the process-wide allocator in place of the system allocator.
#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;
16
#[tokio::main]
async fn main() -> miette::Result<()> {
    // Install the aws-lc-rs crypto provider for rustls. `.ok()` discards the
    // error because a provider may already have been installed (e.g. by a
    // dependency), which is acceptable here.
    rustls::crypto::aws_lc_rs::default_provider()
        .install_default()
        .ok();

    // All runtime configuration comes from the environment.
    let cfg = Config::from_env()?;

    // Logging: default to INFO, overridable via the standard env filter
    // (lossy: malformed directives are ignored rather than fatal).
    let env_filter = tracing_subscriber::EnvFilter::builder()
        .with_default_directive(tracing::Level::INFO.into())
        .from_env_lossy();
    tracing_subscriber::fmt().with_env_filter(env_filter).init();

    info!("{cfg}");

    let state = AppState::new(&cfg)?;

    // load block refcounts for GC - must complete before any ingest workers start
    if cfg.ephemeral {
        db::gc::ephemeral_startup_load_refcounts(&state.db)?;
    } else {
        db::gc::startup_load_refcounts(&state.db)?;
    }

    // If any filter-related config was supplied, persist it into the DB as a
    // patch, then reload the shared in-memory filter so everything downstream
    // (notably the crawler-mode decision below) sees the updated state.
    if cfg.full_network
        || cfg.filter_signals.is_some()
        || cfg.filter_collections.is_some()
        || cfg.filter_excludes.is_some()
    {
        let filter_ks = state.db.filter.clone();
        let inner = state.db.inner.clone();
        let full_network = cfg.full_network;
        let signals = cfg.filter_signals.clone();
        let collections = cfg.filter_collections.clone();
        let excludes = cfg.filter_excludes.clone();

        // DB batch writes are blocking work; keep them off the async runtime.
        tokio::task::spawn_blocking(move || {
            use hydrant::filter::{FilterMode, SetUpdate};
            let mut batch = inner.batch();

            // Only force Full mode when explicitly requested; `None`
            // presumably leaves the stored mode untouched (see
            // db::filter::apply_patch — TODO confirm).
            let mode = if full_network {
                Some(FilterMode::Full)
            } else {
                None
            };

            // Absent options map to `None`, i.e. "leave that set unchanged";
            // present ones replace the stored set via `SetUpdate::Set`.
            let signals_update = signals.map(SetUpdate::Set);
            let collections_update = collections.map(SetUpdate::Set);
            let excludes_update = excludes.map(SetUpdate::Set);

            hydrant::db::filter::apply_patch(
                &mut batch,
                &filter_ks,
                mode,
                signals_update,
                collections_update,
                excludes_update,
            )?;

            batch.commit().into_diagnostic()
        })
        .await
        // Outer `?` surfaces a join (panic) error; inner `?` surfaces the
        // patch/commit error from the closure itself.
        .into_diagnostic()??;

        // Reload so the in-memory filter reflects what was just committed.
        let new_filter = hydrant::db::filter::load(&state.db.filter)?;
        state.filter.store(new_filter.into());
    }

    // Channel feeding repo/firehose events from the ingestors and backfill
    // into the firehose worker thread. Unbounded so producers never block.
    let (buffer_tx, buffer_rx) = mpsc::unbounded_channel();
    let state = Arc::new(state);

    // spawn GC workers
    // These run on dedicated, named OS threads (blocking DB work, not async).
    if cfg.ephemeral {
        let state_ttl = state.clone();
        std::thread::Builder::new()
            .name("ephemeral-ttl".into())
            .spawn(move || db::gc::ephemeral_ttl_worker(state_ttl))
            .into_diagnostic()?;
    } else {
        let state_gc = state.clone();
        std::thread::Builder::new()
            .name("gc-checkpoint".into())
            .spawn(move || db::gc::checkpoint_worker(state_gc))
            .into_diagnostic()?;
    }

    if cfg.enable_backfill {
        tokio::spawn({
            let state = state.clone();
            let timeout = cfg.repo_fetch_timeout;
            BackfillWorker::new(
                state,
                buffer_tx.clone(),
                timeout,
                cfg.backfill_concurrency_limit,
                // Backfill verifies signatures in Full and BackfillOnly modes.
                matches!(
                    cfg.verify_signatures,
                    SignatureVerification::Full | SignatureVerification::BackfillOnly
                ),
                cfg.ephemeral,
            )
            .run()
        });
    }

    // Re-queue "gone" backfills — presumably entries left incomplete by a
    // previous run (see backfill::manager; TODO confirm semantics). Failure
    // here is logged but deliberately not fatal to startup.
    if let Err(e) = spawn_blocking({
        let state = state.clone();
        move || hydrant::backfill::manager::queue_gone_backfills(&state)
    })
    .await
    .into_diagnostic()?
    {
        error!(err = %e, "failed to queue gone backfills");
        db::check_poisoned_report(&e);
    }

    // Dedicated thread retrying failed backfills.
    std::thread::spawn({
        let state = state.clone();
        move || hydrant::backfill::manager::retry_worker(state)
    });

    // Throughput reporter: once a minute, derive an events/s rate from the
    // monotonically increasing `next_event_id` counter and log it.
    tokio::spawn({
        let state = state.clone();
        let mut last_id = state.db.next_event_id.load(Ordering::Relaxed);
        let mut last_time = std::time::Instant::now();
        let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
        async move {
            loop {
                interval.tick().await;

                let current_id = state.db.next_event_id.load(Ordering::Relaxed);
                let current_time = std::time::Instant::now();

                // saturating_sub guards against the counter ever appearing to
                // move backwards (e.g. across a reload).
                let delta = current_id.saturating_sub(last_id);
                if delta == 0 {
                    debug!("no new events in 60s");
                    continue;
                }

                let elapsed = current_time.duration_since(last_time).as_secs_f64();
                // Guard the division; a zero elapsed window reports 0.0.
                let rate = if elapsed > 0.0 {
                    delta as f64 / elapsed
                } else {
                    0.0
                };

                info!("{rate:.2} events/s ({delta} events in {elapsed:.1}s)");

                last_id = current_id;
                last_time = current_time;
            }
        }
    });

    // Persistence thread: every `cursor_save_interval`, flush relay cursors,
    // counts, and the DB journal. Errors are logged (and passed to
    // check_poisoned_report — presumably detects a poisoned/corrupt DB state;
    // TODO confirm) but the loop keeps running.
    std::thread::spawn({
        let state = state.clone();
        let persist_interval = cfg.cursor_save_interval;

        move || {
            loop {
                std::thread::sleep(persist_interval);

                // persist firehose cursors
                for (relay, cursor) in &state.relay_cursors {
                    let seq = cursor.load(Ordering::SeqCst);
                    // seq == 0 means no event seen yet for this relay; skip so
                    // we never clobber a previously stored cursor with zero.
                    if seq > 0 {
                        if let Err(e) = db::set_firehose_cursor(&state.db, relay, seq) {
                            error!(relay = %relay, err = %e, "failed to save cursor");
                            db::check_poisoned_report(&e);
                        }
                    }
                }

                // persist counts
                // TODO: make this more durable
                if let Err(e) = db::persist_counts(&state.db) {
                    error!(err = %e, "failed to persist counts");
                    db::check_poisoned_report(&e);
                }

                // persist journal
                if let Err(e) = state.db.persist() {
                    error!(err = %e, "db persist failed");
                    db::check_poisoned_report(&e);
                }
            }
        }
    });

    info!("starting crawler ({:?})", state.filter.load().mode);
    let state_for_crawler = state.clone();
    let relay_hosts = cfg.relays.clone();
    let crawler_max_pending = cfg.crawler_max_pending_repos;
    let crawler_resume_pending = cfg.crawler_resume_pending_repos;

    // Explicit config wins; when unset, the crawler runs only in
    // full-network filter mode.
    let should_run_crawler = match cfg.enable_crawler {
        Some(true) => true,
        Some(false) => false,
        None => state.filter.load().mode == hydrant::filter::FilterMode::Full,
    };

    if should_run_crawler {
        info!(
            relay_count = relay_hosts.len(),
            hosts = ?relay_hosts,
            "spawning crawler"
        );
        tokio::spawn(async move {
            let crawler = hydrant::crawler::Crawler::new(
                state_for_crawler,
                relay_hosts,
                crawler_max_pending,
                crawler_resume_pending,
            );
            if let Err(e) = crawler.run().await {
                error!(err = %e, "crawler error");
                db::check_poisoned_report(&e);
            }
        });
    } else {
        info!("crawler disabled by config or filter mode");
    }

    // Critical tasks: the first of these to complete triggers shutdown (see
    // select_all at the bottom).
    let mut tasks = if cfg.enable_firehose {
        // The firehose worker consumes `buffer_rx` on its own OS thread and is
        // handed the runtime handle so it can call back into async code.
        let firehose_worker = std::thread::spawn({
            let state = state.clone();
            let handle = tokio::runtime::Handle::current();
            move || {
                FirehoseWorker::new(
                    state,
                    buffer_rx,
                    matches!(cfg.verify_signatures, SignatureVerification::Full),
                    cfg.ephemeral,
                    cfg.firehose_workers,
                )
                .run(handle)
            }
        });

        // Bridge the worker thread's JoinHandle into a future. The double
        // `flatten` collapses the nested Results (spawn_blocking join error /
        // thread-panic error / the worker's own Result) into one
        // miette::Result<()>.
        let mut t: Vec<BoxFuture<miette::Result<()>>> = vec![Box::pin(
            tokio::task::spawn_blocking(move || {
                firehose_worker
                    .join()
                    .map_err(|e| miette::miette!("buffer processor died: {e:?}"))
            })
            .map(|r| r.into_diagnostic().flatten().flatten()),
        )];

        // One ingestor per configured relay, all feeding the shared buffer.
        for relay_url in &cfg.relays {
            let ingestor = FirehoseIngestor::new(
                state.clone(),
                buffer_tx.clone(),
                relay_url.clone(),
                state.filter.clone(),
                matches!(cfg.verify_signatures, SignatureVerification::Full),
            );
            t.push(Box::pin(ingestor.run()));
        }

        t
    } else {
        info!("firehose ingestion disabled by config");
        // if firehose is disabled, we just wait indefinitely (or until signal)
        // essentially we just want to keep the main thread alive for the other components
        // NOTE(review): with firehose disabled, `buffer_rx` is never consumed,
        // so anything backfill sends into `buffer_tx` accumulates in the
        // unbounded channel — confirm this combination is intended.
        vec![Box::pin(futures::future::pending::<miette::Result<()>>()) as BoxFuture<_>]
    };

    // The public API server is always a critical task.
    let state_api = state.clone();
    tasks.push(Box::pin(async move {
        api::serve(state_api, cfg.api_port)
            .await
            .map_err(|e| miette::miette!("API server failed: {e}"))
    }) as BoxFuture<_>);

    if cfg.enable_debug {
        let state_debug = state.clone();
        tasks.push(Box::pin(async move {
            api::serve_debug(state_debug, cfg.debug_port)
                .await
                .map_err(|e| miette::miette!("debug server failed: {e}"))
        }) as BoxFuture<_>);
    }

    // Run until the FIRST critical task finishes; an error from it is logged
    // before we fall through to the final flush.
    let res = futures::future::select_all(tasks);
    if let (Err(e), _, _) = res.await {
        error!(err = %e, "critical worker died");
        db::check_poisoned_report(&e);
    }

    // Final flush before exit; a failure here is fatal.
    if let Err(e) = state.db.persist() {
        db::check_poisoned_report(&e);
        return Err(e);
    }

    Ok(())
}