AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol
atproto
indexer
rust
fjall
1use crate::db::compaction::DropPrefixFilterFactory;
2use crate::types::{BroadcastEvent, RepoState};
3
4use fjall::config::BlockSizePolicy;
5use fjall::{Database, Keyspace, KeyspaceCreateOptions, OwnedWriteBatch, PersistMode, Slice};
6use jacquard_common::IntoStatic;
7use jacquard_common::types::string::Did;
8use lsm_tree::compaction::Factory;
9use miette::{Context, IntoDiagnostic, Result};
10use scc::HashMap;
11use smol_str::SmolStr;
12
13use std::sync::Arc;
14use std::sync::atomic::{AtomicBool, AtomicU64};
15
16use crate::util::RelayId;
17
18pub mod compaction;
19pub mod filter;
20pub mod gc;
21pub mod keys;
22pub mod refcount;
23pub mod types;
24
25use tokio::sync::broadcast;
26use tracing::error;
27
28fn default_opts() -> KeyspaceCreateOptions {
29 KeyspaceCreateOptions::default()
30}
31
/// Handle to the fjall database plus every keyspace and the in-memory state
/// shared across the indexer (counters, GC bookkeeping, event broadcast).
pub struct Db {
    /// Underlying fjall database; owns journals, write batches, persistence.
    pub inner: Arc<Database>,
    /// Per-repo state, keyed via `keys::repo_key(did)` (see `update_repo_state`).
    pub repos: Keyspace,
    /// Record data; sized/tuned in `open` as the largest keyspace.
    pub records: Keyspace,
    /// Block store; garbage-collected through `gc::BlocksGcFilterFactory`.
    pub blocks: Keyspace,
    /// Firehose cursors (per-relay key plus legacy `keys::CURSOR_KEY`).
    pub cursors: Keyspace,
    /// Pending-repo queue; iterated in order rather than point-read (per `open`).
    pub pending: Keyspace,
    /// Per-DID resync state, msgpack-encoded `ResyncState` (see `repo_gauge_state`).
    pub resync: Keyspace,
    /// Buffered data iterated during backfill (per comments in `open`).
    pub resync_buffer: Keyspace,
    /// Event log keyed by big-endian `u64` event id (see id recovery in `open`).
    pub events: Keyspace,
    /// Persisted counters: keyspace-level and per-collection count keys.
    pub counts: Keyspace,
    /// Firehose exclude filter, DID-keyed point reads (per comments in `open`).
    pub filter: Keyspace,
    /// Crawler state — schema not visible in this file.
    pub crawler: Keyspace,
    /// Block reference index; only ever iterated (per comments in `open`).
    pub block_refs: Keyspace,
    /// Block reference log keyed by big-endian `u64` sequence number.
    pub block_reflog: Keyspace,
    /// In-memory block refcounts, shared with the blocks GC compaction filter.
    pub block_refcounts: Arc<HashMap<Slice, i64>>,
    /// Gates the blocks GC filter until refcount state is ready.
    pub gc_ready: Arc<AtomicBool>,
    /// Next reflog sequence (last persisted + 1, recovered in `open`).
    pub next_reflog_seq: Arc<AtomicU64>,
    /// Fan-out channel for live events (capacity 10_000, set in `open`).
    pub event_tx: broadcast::Sender<BroadcastEvent>,
    /// Next event id (last persisted + 1, recovered in `open`).
    pub next_event_id: Arc<AtomicU64>,
    /// In-memory counters; loaded in `open`, flushed by `persist_counts`.
    pub counts_map: HashMap<SmolStr, u64>,
}
54
/// Shared body for `Db::update_gauge_diff` and `Db::update_gauge_diff_async`.
///
/// Emits counter deltas for the gauge transition `$old -> $new`:
/// - the "pending" gauge when entering/leaving `GaugeState::Pending`,
/// - the "resync" gauge when entering/leaving any resync state,
/// - per-error-kind gauges ("error_*") for `GaugeState::Resync(Some(kind))`.
///
/// The optional trailing `await` token makes each `$update_method` call
/// `.await`ed, so one macro body serves both the sync and async methods.
macro_rules! update_gauge_diff_impl {
    ($self:ident, $old:ident, $new:ident, $update_method:ident $(, $await:tt)?) => {{
        use crate::types::GaugeState;

        // Identical states: no gauges move, bail out of the enclosing method.
        if $old == $new {
            return;
        }

        // pending
        match ($old, $new) {
            (GaugeState::Pending, GaugeState::Pending) => {}
            (GaugeState::Pending, _) => $self.$update_method("pending", -1) $(.$await)?,
            (_, GaugeState::Pending) => $self.$update_method("pending", 1) $(.$await)?,
            _ => {}
        }

        // resync
        let old_resync = $old.is_resync();
        let new_resync = $new.is_resync();
        match (old_resync, new_resync) {
            (true, false) => $self.$update_method("resync", -1) $(.$await)?,
            (false, true) => $self.$update_method("resync", 1) $(.$await)?,
            _ => {}
        }

        // error kinds: decrement the old kind's gauge, increment the new one
        if let GaugeState::Resync(Some(kind)) = $old {
            let key = match kind {
                crate::types::ResyncErrorKind::Ratelimited => "error_ratelimited",
                crate::types::ResyncErrorKind::Transport => "error_transport",
                crate::types::ResyncErrorKind::Generic => "error_generic",
            };
            $self.$update_method(key, -1) $(.$await)?;
        }

        if let GaugeState::Resync(Some(kind)) = $new {
            let key = match kind {
                crate::types::ResyncErrorKind::Ratelimited => "error_ratelimited",
                crate::types::ResyncErrorKind::Transport => "error_transport",
                crate::types::ResyncErrorKind::Generic => "error_generic",
            };
            $self.$update_method(key, 1) $(.$await)?;
        }
    }};
}
100
101impl Db {
    /// Open (or create) the database at `cfg.database_path`, open and tune all
    /// keyspaces, and recover in-memory state: the last event id, the last
    /// reflog sequence, and the persisted counters.
    ///
    /// Note: `expect_point_read_hits(true)` is used on keyspaces that are only
    /// iterated or whose point reads almost always hit — in both cases bloom
    /// filters buy nothing, so the hint disables them.
    pub fn open(cfg: &crate::config::Config) -> Result<Self> {
        // Unit helpers for the size knobs below.
        const fn kb(v: u32) -> u32 {
            v * 1024
        }
        const fn mb(v: u64) -> u64 {
            v * 1024 * 1024
        }

        // Shared with the blocks GC compaction filter created below.
        let gc_ready = Arc::new(AtomicBool::new(false));
        let block_refcounts: Arc<HashMap<Slice, i64>> = Arc::new(HashMap::new());

        let gc_ready_factory = gc_ready.clone();
        let refcounts_factory = block_refcounts.clone();

        let db = Database::builder(&cfg.database_path)
            // cfg.cache_size is in MiB; only half is given to fjall's cache.
            // NOTE(review): confirm the /2 split is intentional (other half
            // presumably budgeted elsewhere).
            .cache_size(cfg.cache_size * 2_u64.pow(20) / 2)
            // Journal persistence is driven explicitly via `persist()`.
            .manual_journal_persist(true)
            .journal_compression(
                cfg.disable_lz4_compression
                    .then_some(fjall::CompressionType::None)
                    .unwrap_or(fjall::CompressionType::Lz4),
            )
            .worker_threads(cfg.db_worker_threads)
            .max_journaling_size(mb(cfg.db_max_journaling_size_mb))
            .with_compaction_filter_factories({
                let ephemeral = cfg.ephemeral;
                // Called once per keyspace name; returns an optional filter.
                let f = move |ks: &str| {
                    tracing::info!("with_compaction_filter_factories queried for keyspace: {ks}",);
                    match ks {
                        // Ephemeral mode drops per-collection counts on compaction.
                        "counts" => ephemeral.then(|| -> Arc<dyn Factory> {
                            Arc::new(DropPrefixFilterFactory {
                                prefix: keys::COUNT_COLLECTION_PREFIX,
                            })
                        }),
                        // Blocks GC runs during compaction, gated by `gc_ready`.
                        "blocks" => Some(Arc::new(gc::BlocksGcFilterFactory {
                            gc_ready: gc_ready_factory.clone(),
                            refcounts: refcounts_factory.clone(),
                        }) as Arc<dyn Factory>),
                        _ => None,
                    }
                };
                Arc::new(f)
            })
            .open()
            .into_diagnostic()?;
        let db = Arc::new(db);

        // `opts` is just an alias for `default_opts` used by the calls below.
        let opts = default_opts;
        let open_ks = |name: &str, opts: KeyspaceCreateOptions| {
            db.keyspace(name, move || opts).into_diagnostic()
        };

        let repos = open_ks(
            "repos",
            opts()
                // crawler checks if a repo doesn't exist
                .expect_point_read_hits(false)
                .max_memtable_size(mb(cfg.db_repos_memtable_size_mb))
                .data_block_size_policy(BlockSizePolicy::all(kb(4))),
        )?;
        let blocks = open_ks(
            "blocks",
            opts()
                // point reads are used a lot by stream
                .expect_point_read_hits(true)
                .max_memtable_size(mb(cfg.db_blocks_memtable_size_mb))
                // 32 - 64 kb is probably fine, as the newer blocks will be in the first levels
                // and any consumers will probably be streaming the newer events...
                .data_block_size_policy(BlockSizePolicy::new([kb(4), kb(8), kb(32), kb(64)])),
        )?;
        let records = open_ks(
            "records",
            // point reads might miss when using getRecord
            // but we assume thats not going to be used much... (todo: should be a config option maybe?)
            // since this keyspace is big, turning off bloom filters will help a lot
            opts()
                .expect_point_read_hits(true)
                .max_memtable_size(mb(cfg.db_records_memtable_size_mb))
                .data_block_size_policy(BlockSizePolicy::all(kb(8))),
        )?;
        let cursors = open_ks(
            "cursors",
            opts()
                // cursor point reads hit almost 100% of the time
                .expect_point_read_hits(true)
                .max_memtable_size(mb(4))
                .data_block_size_policy(BlockSizePolicy::all(kb(1))),
        )?;
        let pending = open_ks(
            "pending",
            opts()
                // iterated over as a queue, no point reads are used so bloom filters are disabled
                .expect_point_read_hits(true)
                .max_memtable_size(mb(cfg.db_pending_memtable_size_mb))
                .data_block_size_policy(BlockSizePolicy::all(kb(4))),
        )?;
        // resync point reads often miss (because most repos aren't resyncing), so keeping the bloom filter helps avoid disk hits
        let resync = open_ks(
            "resync",
            opts()
                .max_memtable_size(mb(cfg.db_pending_memtable_size_mb))
                .data_block_size_policy(BlockSizePolicy::all(kb(8))),
        )?;
        let resync_buffer = open_ks(
            "resync_buffer",
            opts()
                // iterated during backfill, no point reads
                .expect_point_read_hits(true)
                .max_memtable_size(mb(16))
                .data_block_size_policy(BlockSizePolicy::all(kb(32))),
        )?;
        let events = open_ks(
            "events",
            opts()
                // only iterators are used here, no point reads
                .expect_point_read_hits(true)
                .max_memtable_size(mb(cfg.db_events_memtable_size_mb))
                .data_block_size_policy(BlockSizePolicy::new([kb(16), kb(32)])),
        )?;
        let counts = open_ks(
            "counts",
            opts()
                // count increments hit because counters are mostly pre-initialized
                .expect_point_read_hits(true)
                .max_memtable_size(mb(32))
                // the data is very small
                .data_block_size_policy(BlockSizePolicy::all(kb(1))),
        )?;

        // filter handles high-volume point reads (checking excludes from firehose)
        // so it needs the bloom filter
        let filter = open_ks(
            "filter",
            // this can be pretty small since the DIDs wont be compressed that well anyhow
            opts()
                .max_memtable_size(mb(16))
                .data_block_size_policy(BlockSizePolicy::all(kb(1))),
        )?;

        let crawler = open_ks(
            "crawler",
            opts()
                .max_memtable_size(mb(16))
                .data_block_size_policy(BlockSizePolicy::all(kb(1))),
        )?;

        let block_refs = open_ks(
            "block_refs",
            opts()
                // only ever iterated on
                .expect_point_read_hits(true)
                .max_memtable_size(mb(cfg.db_records_memtable_size_mb / 2))
                .data_block_size_policy(BlockSizePolicy::all(kb(64))),
        )?;

        let block_reflog = open_ks(
            "block_reflog",
            opts()
                // only ever iterated on
                .expect_point_read_hits(true)
                .max_memtable_size(mb(4))
                .data_block_size_policy(BlockSizePolicy::all(kb(64))),
        )?;

        // Recover the last event id from the tail of the event log.
        let mut last_id = 0;
        if let Some(guard) = events.iter().next_back() {
            let k = guard.key().into_diagnostic()?;
            last_id = u64::from_be_bytes(
                k.as_ref()
                    .try_into()
                    .into_diagnostic()
                    .wrap_err("expected to be id (8 bytes)")?,
            );
        }

        // Recover the last reflog sequence number the same way.
        let mut last_reflog_seq = 0u64;
        if let Some(guard) = block_reflog.iter().next_back() {
            let k = guard.key().into_diagnostic()?;
            last_reflog_seq = u64::from_be_bytes(
                k.as_ref()
                    .try_into()
                    .into_diagnostic()
                    .wrap_err("expected to be reflog seq (8 bytes)")?,
            );
        }
        let next_reflog_seq = Arc::new(AtomicU64::new(last_reflog_seq.saturating_add(1)));

        // load counts into memory
        let counts_map = HashMap::new();
        for guard in counts.prefix(keys::COUNT_KS_PREFIX) {
            let (k, v) = guard.into_inner().into_diagnostic()?;
            // Counter name is the remainder of the key after the prefix.
            let name = std::str::from_utf8(&k[keys::COUNT_KS_PREFIX.len()..])
                .into_diagnostic()
                .wrap_err("expected valid utf8 for ks count key")?;
            let _ = counts_map.insert_sync(
                SmolStr::new(name),
                u64::from_be_bytes(v.as_ref().try_into().unwrap()),
            );
        }
        // ensure critical counts are initialized
        for ks_name in ["repos", "pending", "resync"] {
            let _ = counts_map
                .entry_sync(SmolStr::new(ks_name))
                .or_insert_with(|| {
                    // Cold start: derive the count by scanning the keyspace.
                    let ks = match ks_name {
                        "repos" => &repos,
                        "pending" => &pending,
                        "resync" => &resync,
                        _ => unreachable!(),
                    };
                    ks.iter().count() as u64
                });
        }

        let (event_tx, _) = broadcast::channel(10000);

        Ok(Self {
            inner: db,
            repos,
            records,
            blocks,
            cursors,
            pending,
            resync,
            resync_buffer,
            events,
            counts,
            filter,
            crawler,
            block_refs,
            block_reflog,
            block_refcounts,
            gc_ready,
            next_reflog_seq,
            event_tx,
            counts_map,
            // NOTE(review): `last_id + 1` is unchecked while the reflog uses
            // saturating_add — only matters at u64::MAX, but inconsistent.
            next_event_id: Arc::new(AtomicU64::new(last_id + 1)),
        })
    }
341
342 pub fn persist(&self) -> Result<()> {
343 self.inner.persist(PersistMode::SyncAll).into_diagnostic()?;
344 Ok(())
345 }
346
347 pub async fn get(ks: Keyspace, key: impl Into<Slice>) -> Result<Option<Slice>> {
348 let key = key.into();
349 tokio::task::spawn_blocking(move || ks.get(key).into_diagnostic())
350 .await
351 .into_diagnostic()?
352 }
353
354 #[allow(dead_code)]
355 pub async fn insert(
356 ks: Keyspace,
357 key: impl Into<Slice>,
358 value: impl Into<Slice>,
359 ) -> Result<()> {
360 let key = key.into();
361 let value = value.into();
362 tokio::task::spawn_blocking(move || ks.insert(key, value).into_diagnostic())
363 .await
364 .into_diagnostic()?
365 }
366
367 #[allow(dead_code)]
368 pub async fn remove(ks: Keyspace, key: impl Into<Slice>) -> Result<()> {
369 let key = key.into();
370 tokio::task::spawn_blocking(move || ks.remove(key).into_diagnostic())
371 .await
372 .into_diagnostic()?
373 }
374
375 pub async fn contains_key(ks: Keyspace, key: impl Into<Slice>) -> Result<bool> {
376 let key = key.into();
377 tokio::task::spawn_blocking(move || ks.contains_key(key).into_diagnostic())
378 .await
379 .into_diagnostic()?
380 }
381
382 pub fn update_count(&self, key: &str, delta: i64) {
383 let mut entry = self.counts_map.entry_sync(SmolStr::new(key)).or_insert(0);
384 if delta >= 0 {
385 *entry = entry.saturating_add(delta as u64);
386 } else {
387 *entry = entry.saturating_sub(delta.unsigned_abs());
388 }
389 }
390
391 pub async fn update_count_async(&self, key: &str, delta: i64) {
392 let mut entry = self
393 .counts_map
394 .entry_async(SmolStr::new(key))
395 .await
396 .or_insert(0);
397 if delta >= 0 {
398 *entry = entry.saturating_add(delta as u64);
399 } else {
400 *entry = entry.saturating_sub(delta.unsigned_abs());
401 }
402 }
403
404 pub async fn get_count(&self, key: &str) -> u64 {
405 self.counts_map
406 .read_async(key, |_, v| *v)
407 .await
408 .unwrap_or(0)
409 }
410
    /// Synchronously apply the counter deltas implied by the gauge transition
    /// `old -> new` (pending / resync / per-error-kind gauges); no-op when the
    /// states are equal. Shares its body with the async variant via macro.
    pub fn update_gauge_diff(
        &self,
        old: &crate::types::GaugeState,
        new: &crate::types::GaugeState,
    ) {
        update_gauge_diff_impl!(self, old, new, update_count);
    }
418
    /// Async twin of `update_gauge_diff`: identical gauge logic, but each
    /// counter update goes through `update_count_async` and is awaited.
    pub async fn update_gauge_diff_async(
        &self,
        old: &crate::types::GaugeState,
        new: &crate::types::GaugeState,
    ) {
        update_gauge_diff_impl!(self, old, new, update_count_async, await);
    }
426
427 pub fn update_repo_state<F, T>(
428 batch: &mut OwnedWriteBatch,
429 repos: &Keyspace,
430 did: &Did<'_>,
431 f: F,
432 ) -> Result<Option<(RepoState<'static>, T)>>
433 where
434 F: FnOnce(&mut RepoState, (&[u8], &mut fjall::OwnedWriteBatch)) -> Result<(bool, T)>,
435 {
436 let key = keys::repo_key(did);
437 if let Some(bytes) = repos.get(&key).into_diagnostic()? {
438 let mut state: RepoState = deser_repo_state(bytes.as_ref())?.into_static();
439 let (changed, result) = f(&mut state, (key.as_slice(), batch))?;
440 if changed {
441 batch.insert(repos, key, ser_repo_state(&state)?);
442 }
443 Ok(Some((state, result)))
444 } else {
445 Ok(None)
446 }
447 }
448
449 pub async fn update_repo_state_async<F, T>(
450 &self,
451 did: &Did<'_>,
452 f: F,
453 ) -> Result<Option<(RepoState<'static>, T)>>
454 where
455 F: FnOnce(&mut RepoState, (&[u8], &mut fjall::OwnedWriteBatch)) -> Result<(bool, T)>
456 + Send
457 + 'static,
458 T: Send + 'static,
459 {
460 let mut batch = self.inner.batch();
461 let repos = self.repos.clone();
462 let did = did.clone().into_static();
463
464 tokio::task::spawn_blocking(move || {
465 let Some((state, t)) = Self::update_repo_state(&mut batch, &repos, &did, f)? else {
466 return Ok(None);
467 };
468 batch.commit().into_diagnostic()?;
469 Ok(Some((state, t)))
470 })
471 .await
472 .into_diagnostic()?
473 }
474
475 pub fn repo_gauge_state(
476 repo_state: &RepoState,
477 resync_bytes: Option<&[u8]>,
478 ) -> crate::types::GaugeState {
479 match repo_state.status {
480 crate::types::RepoStatus::Synced => crate::types::GaugeState::Synced,
481 crate::types::RepoStatus::Backfilling => crate::types::GaugeState::Pending,
482 crate::types::RepoStatus::Error(_)
483 | crate::types::RepoStatus::Deactivated
484 | crate::types::RepoStatus::Takendown
485 | crate::types::RepoStatus::Suspended => {
486 if let Some(resync_bytes) = resync_bytes {
487 if let Ok(crate::types::ResyncState::Error { kind, .. }) =
488 rmp_serde::from_slice::<crate::types::ResyncState>(resync_bytes)
489 {
490 crate::types::GaugeState::Resync(Some(kind))
491 } else {
492 crate::types::GaugeState::Resync(None)
493 }
494 } else {
495 crate::types::GaugeState::Resync(None)
496 }
497 }
498 }
499 }
500
501 pub async fn repo_gauge_state_async(
502 &self,
503 repo_state: &RepoState<'_>,
504 did_key: &[u8],
505 ) -> crate::types::GaugeState {
506 let repo_state = repo_state.clone().into_static();
507 let did_key = did_key.to_vec();
508
509 let db_resync = self.resync.clone();
510
511 tokio::task::spawn_blocking(move || {
512 let resync_bytes_opt = db_resync.get(&did_key).ok().flatten();
513 Self::repo_gauge_state(&repo_state, resync_bytes_opt.as_deref())
514 })
515 .await
516 .unwrap_or(crate::types::GaugeState::Resync(None))
517 }
518}
519
520pub fn set_firehose_cursor(db: &Db, relay_id: &RelayId, cursor: i64) -> Result<()> {
521 db.cursors
522 .insert(keys::firehose_cursor_key(relay_id), cursor.to_be_bytes())
523 .into_diagnostic()
524}
525
526pub async fn get_firehose_cursor(db: &Db, relay_id: &RelayId) -> Result<Option<i64>> {
527 let per_relay_key = keys::firehose_cursor_key(relay_id);
528 if let Some(v) = Db::get(db.cursors.clone(), per_relay_key).await? {
529 return Ok(Some(i64::from_be_bytes(
530 v.as_ref()
531 .try_into()
532 .into_diagnostic()
533 .wrap_err("cursor is not 8 bytes")?,
534 )));
535 }
536
537 Db::get(db.cursors.clone(), keys::CURSOR_KEY)
538 .await?
539 .map(|v| {
540 Ok(i64::from_be_bytes(
541 v.as_ref()
542 .try_into()
543 .into_diagnostic()
544 .wrap_err("cursor is not 8 bytes")?,
545 ))
546 })
547 .transpose()
548}
549
550pub fn ser_repo_state(state: &RepoState) -> Result<Vec<u8>> {
551 rmp_serde::to_vec(&state).into_diagnostic()
552}
553
554pub fn deser_repo_state<'b>(bytes: &'b [u8]) -> Result<RepoState<'b>> {
555 rmp_serde::from_slice(bytes).into_diagnostic()
556}
557
558pub fn check_poisoned(e: &fjall::Error) {
559 if matches!(e, fjall::Error::Poisoned) {
560 error!("!!! DATABASE POISONED !!! exiting");
561 std::process::exit(10);
562 }
563}
564
565pub fn check_poisoned_report(e: &miette::Report) {
566 let Some(err) = e.downcast_ref::<fjall::Error>() else {
567 return;
568 };
569 self::check_poisoned(err);
570}
571
572pub fn set_ks_count(batch: &mut OwnedWriteBatch, db: &Db, name: &str, count: u64) {
573 let key = keys::count_keyspace_key(name);
574 batch.insert(&db.counts, key, count.to_be_bytes());
575}
576
577pub fn persist_counts(db: &Db) -> Result<()> {
578 let mut batch = db.inner.batch();
579 db.counts_map.iter_sync(|k, v| {
580 set_ks_count(&mut batch, db, k, *v);
581 true
582 });
583 batch.commit().into_diagnostic()
584}
585
586pub fn set_record_count(
587 batch: &mut OwnedWriteBatch,
588 db: &Db,
589 did: &Did<'_>,
590 collection: &str,
591 count: u64,
592) {
593 let key = keys::count_collection_key(did, collection);
594 batch.insert(&db.counts, key, count.to_be_bytes());
595}
596
597pub fn update_record_count(
598 batch: &mut OwnedWriteBatch,
599 db: &Db,
600 did: &Did<'_>,
601 collection: &str,
602 delta: i64,
603) -> Result<()> {
604 let key = keys::count_collection_key(did, collection);
605 let count = db
606 .counts
607 .get(&key)
608 .into_diagnostic()?
609 .map(|v| -> Result<_> {
610 Ok(u64::from_be_bytes(
611 v.as_ref()
612 .try_into()
613 .into_diagnostic()
614 .wrap_err("expected to be count (8 bytes)")?,
615 ))
616 })
617 .transpose()?
618 .unwrap_or(0);
619 let new_count = if delta >= 0 {
620 count.saturating_add(delta as u64)
621 } else {
622 count.saturating_sub(delta.unsigned_abs())
623 };
624 batch.insert(&db.counts, key, new_count.to_be_bytes());
625 Ok(())
626}
627
628pub fn get_record_count(db: &Db, did: &Did<'_>, collection: &str) -> Result<u64> {
629 let key = keys::count_collection_key(did, collection);
630 let count = db
631 .counts
632 .get(&key)
633 .into_diagnostic()?
634 .map(|v| -> Result<_> {
635 Ok(u64::from_be_bytes(
636 v.as_ref()
637 .try_into()
638 .into_diagnostic()
639 .wrap_err("expected to be count (8 bytes)")?,
640 ))
641 })
642 .transpose()?;
643 Ok(count.unwrap_or(0))
644}