AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall
at main 644 lines 22 kB view raw
//! Storage layer: opens the fjall database, declares one `Keyspace` per
//! logical table, and provides typed helpers (counters, gauges, repo-state
//! read-modify-write) over them.

use crate::db::compaction::DropPrefixFilterFactory;
use crate::types::{BroadcastEvent, RepoState};

use fjall::config::BlockSizePolicy;
use fjall::{Database, Keyspace, KeyspaceCreateOptions, OwnedWriteBatch, PersistMode, Slice};
use jacquard_common::IntoStatic;
use jacquard_common::types::string::Did;
use lsm_tree::compaction::Factory;
use miette::{Context, IntoDiagnostic, Result};
use scc::HashMap;
use smol_str::SmolStr;

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64};

use crate::util::RelayId;

pub mod compaction;
pub mod filter;
pub mod gc;
pub mod keys;
pub mod refcount;
pub mod types;

use tokio::sync::broadcast;
use tracing::error;

/// Baseline keyspace options; each keyspace in `Db::open` layers its own
/// tuning (memtable size, block size policy, point-read hints) on top.
fn default_opts() -> KeyspaceCreateOptions {
    KeyspaceCreateOptions::default()
}

/// Handle to the database: the underlying fjall instance, one `Keyspace`
/// per logical table, plus shared in-memory state (counters, GC bookkeeping,
/// and the broadcast channel used to fan out live events).
pub struct Db {
    pub inner: Arc<Database>,
    /// Per-DID repo state (serialized `RepoState`, keyed by `keys::repo_key`).
    pub repos: Keyspace,
    /// Record data; large keyspace, point reads expected to be rare (getRecord).
    pub records: Keyspace,
    /// Block store; heavily point-read by streaming consumers, GC'd via
    /// the `BlocksGcFilterFactory` compaction filter.
    pub blocks: Keyspace,
    /// Firehose cursors: one per relay plus a global fallback key.
    pub cursors: Keyspace,
    /// Work queue, consumed by iteration rather than point reads.
    pub pending: Keyspace,
    /// Per-DID resync state (`ResyncState`, MessagePack-encoded).
    pub resync: Keyspace,
    /// Staging area iterated during backfill.
    pub resync_buffer: Keyspace,
    /// Event log keyed by a big-endian `u64` sequence id.
    pub events: Keyspace,
    /// Persisted counters (keyspace-level and per-collection record counts).
    pub counts: Keyspace,
    /// Firehose exclusion filter; serves high-volume point reads.
    pub filter: Keyspace,
    /// Crawler state. NOTE(review): usage not visible in this file.
    pub crawler: Keyspace,
    /// Block reference entries; only ever iterated.
    pub block_refs: Keyspace,
    /// Reference log keyed by a big-endian `u64` sequence.
    pub block_reflog: Keyspace,
    /// In-memory refcounts shared with the blocks GC compaction filter.
    pub block_refcounts: Arc<HashMap<Slice, i64>>,
    /// Gate: the blocks GC filter is a no-op until this flips to true.
    pub gc_ready: Arc<AtomicBool>,
    /// Next reflog sequence number (seeded from the last persisted key + 1).
    pub next_reflog_seq: Arc<AtomicU64>,
    /// Broadcast side of the live event stream (capacity 10_000).
    pub event_tx: broadcast::Sender<BroadcastEvent>,
    /// Next event id (seeded from the last persisted event key + 1).
    pub next_event_id: Arc<AtomicU64>,
    /// In-memory counters, loaded from `counts` at open and written back by
    /// `persist_counts`.
    pub counts_map: HashMap<SmolStr, u64>,
}

/// Shared body for `update_gauge_diff` / `update_gauge_diff_async`: diffs two
/// `GaugeState`s and applies +1/-1 deltas to the "pending", "resync", and
/// per-error-kind counters via `$update_method`. The optional `$await` token
/// lets the same body expand for both the sync and async counter methods.
macro_rules! update_gauge_diff_impl {
    ($self:ident, $old:ident, $new:ident, $update_method:ident $(, $await:tt)?) => {{
        use crate::types::GaugeState;

        // Identical states produce no counter movement at all.
        if $old == $new {
            return;
        }

        // pending
        match ($old, $new) {
            (GaugeState::Pending, GaugeState::Pending) => {}
            (GaugeState::Pending, _) => $self.$update_method("pending", -1) $(.$await)?,
            (_, GaugeState::Pending) => $self.$update_method("pending", 1) $(.$await)?,
            _ => {}
        }

        // resync
        let old_resync = $old.is_resync();
        let new_resync = $new.is_resync();
        match (old_resync, new_resync) {
            (true, false) => $self.$update_method("resync", -1) $(.$await)?,
            (false, true) => $self.$update_method("resync", 1) $(.$await)?,
            _ => {}
        }

        // error kinds: decrement the old kind's counter, increment the new one
        if let GaugeState::Resync(Some(kind)) = $old {
            let key = match kind {
                crate::types::ResyncErrorKind::Ratelimited => "error_ratelimited",
                crate::types::ResyncErrorKind::Transport => "error_transport",
                crate::types::ResyncErrorKind::Generic => "error_generic",
            };
            $self.$update_method(key, -1) $(.$await)?;
        }

        if let GaugeState::Resync(Some(kind)) = $new {
            let key = match kind {
                crate::types::ResyncErrorKind::Ratelimited => "error_ratelimited",
                crate::types::ResyncErrorKind::Transport => "error_transport",
                crate::types::ResyncErrorKind::Generic => "error_generic",
            };
            $self.$update_method(key, 1) $(.$await)?;
        }
    }};
}

impl Db {
    /// Open (or create) the database at `cfg.database_path`, register
    /// compaction filters, open every keyspace with its tuning, seed the
    /// event / reflog sequence counters from the last persisted keys, and
    /// load the in-memory counter map.
    pub fn open(cfg: &crate::config::Config) -> Result<Self> {
        const fn kb(v: u32) -> u32 {
            v * 1024
        }
        const fn mb(v: u64) -> u64 {
            v * 1024 * 1024
        }

        let gc_ready = Arc::new(AtomicBool::new(false));
        let block_refcounts: Arc<HashMap<Slice, i64>> = Arc::new(HashMap::new());

        // Clones handed to the compaction-filter factory closure below.
        let gc_ready_factory = gc_ready.clone();
        let refcounts_factory = block_refcounts.clone();

        let db = Database::builder(&cfg.database_path)
            // cfg.cache_size is in MiB; half of it goes to the block cache.
            // NOTE(review): the `* 2^20 / 2` split looks intentional — confirm.
            .cache_size(cfg.cache_size * 2_u64.pow(20) / 2)
            // Journal durability is driven explicitly via `persist()`.
            .manual_journal_persist(true)
            .journal_compression(
                cfg.disable_lz4_compression
                    .then_some(fjall::CompressionType::None)
                    .unwrap_or(fjall::CompressionType::Lz4),
            )
            .worker_threads(cfg.db_worker_threads)
            .max_journaling_size(mb(cfg.db_max_journaling_size_mb))
            .with_compaction_filter_factories({
                let ephemeral = cfg.ephemeral;
                let f = move |ks: &str| {
                    tracing::info!("with_compaction_filter_factories queried for keyspace: {ks}",);
                    match ks {
                        // In ephemeral mode, per-collection counts are dropped
                        // during compaction rather than kept forever.
                        "counts" => ephemeral.then(|| -> Arc<dyn Factory> {
                            Arc::new(DropPrefixFilterFactory {
                                prefix: keys::COUNT_COLLECTION_PREFIX,
                            })
                        }),
                        // Blocks are garbage-collected during compaction once
                        // `gc_ready` is set, using the shared refcount map.
                        "blocks" => Some(Arc::new(gc::BlocksGcFilterFactory {
                            gc_ready: gc_ready_factory.clone(),
                            refcounts: refcounts_factory.clone(),
                        }) as Arc<dyn Factory>),
                        _ => None,
                    }
                };
                Arc::new(f)
            })
            .open()
            .into_diagnostic()?;
        let db = Arc::new(db);

        // Alias so each keyspace below reads `opts()` like a constructor call.
        let opts = default_opts;
        let open_ks = |name: &str, opts: KeyspaceCreateOptions| {
            db.keyspace(name, move || opts).into_diagnostic()
        };

        let repos = open_ks(
            "repos",
            opts()
                // crawler checks if a repo doesn't exist
                .expect_point_read_hits(false)
                .max_memtable_size(mb(cfg.db_repos_memtable_size_mb))
                .data_block_size_policy(BlockSizePolicy::all(kb(4))),
        )?;
        let blocks = open_ks(
            "blocks",
            opts()
                // point reads are used a lot by stream
                .expect_point_read_hits(true)
                .max_memtable_size(mb(cfg.db_blocks_memtable_size_mb))
                // 32 - 64 kb is probably fine, as the newer blocks will be in the first levels
                // and any consumers will probably be streaming the newer events...
                .data_block_size_policy(BlockSizePolicy::new([kb(4), kb(8), kb(32), kb(64)])),
        )?;
        let records = open_ks(
            "records",
            // point reads might miss when using getRecord
            // but we assume thats not going to be used much... (todo: should be a config option maybe?)
            // since this keyspace is big, turning off bloom filters will help a lot
            opts()
                .expect_point_read_hits(true)
                .max_memtable_size(mb(cfg.db_records_memtable_size_mb))
                .data_block_size_policy(BlockSizePolicy::all(kb(8))),
        )?;
        let cursors = open_ks(
            "cursors",
            opts()
                // cursor point reads hit almost 100% of the time
                .expect_point_read_hits(true)
                .max_memtable_size(mb(4))
                .data_block_size_policy(BlockSizePolicy::all(kb(1))),
        )?;
        let pending = open_ks(
            "pending",
            opts()
                // iterated over as a queue, no point reads are used so bloom filters are disabled
                .expect_point_read_hits(true)
                .max_memtable_size(mb(cfg.db_pending_memtable_size_mb))
                .data_block_size_policy(BlockSizePolicy::all(kb(4))),
        )?;
        // resync point reads often miss (because most repos aren't resyncing), so keeping the bloom filter helps avoid disk hits
        let resync = open_ks(
            "resync",
            opts()
                .max_memtable_size(mb(cfg.db_pending_memtable_size_mb))
                .data_block_size_policy(BlockSizePolicy::all(kb(8))),
        )?;
        let resync_buffer = open_ks(
            "resync_buffer",
            opts()
                // iterated during backfill, no point reads
                .expect_point_read_hits(true)
                .max_memtable_size(mb(16))
                .data_block_size_policy(BlockSizePolicy::all(kb(32))),
        )?;
        let events = open_ks(
            "events",
            opts()
                // only iterators are used here, no point reads
                .expect_point_read_hits(true)
                .max_memtable_size(mb(cfg.db_events_memtable_size_mb))
                .data_block_size_policy(BlockSizePolicy::new([kb(16), kb(32)])),
        )?;
        let counts = open_ks(
            "counts",
            opts()
                // count increments hit because counters are mostly pre-initialized
                .expect_point_read_hits(true)
                .max_memtable_size(mb(32))
                // the data is very small
                .data_block_size_policy(BlockSizePolicy::all(kb(1))),
        )?;

        // filter handles high-volume point reads (checking excludes from firehose)
        // so it needs the bloom filter
        let filter = open_ks(
            "filter",
            // this can be pretty small since the DIDs wont be compressed that well anyhow
            opts()
                .max_memtable_size(mb(16))
                .data_block_size_policy(BlockSizePolicy::all(kb(1))),
        )?;

        let crawler = open_ks(
            "crawler",
            opts()
                .max_memtable_size(mb(16))
                .data_block_size_policy(BlockSizePolicy::all(kb(1))),
        )?;

        let block_refs = open_ks(
            "block_refs",
            opts()
                // only ever iterated on
                .expect_point_read_hits(true)
                .max_memtable_size(mb(cfg.db_records_memtable_size_mb / 2))
                .data_block_size_policy(BlockSizePolicy::all(kb(64))),
        )?;

        let block_reflog = open_ks(
            "block_reflog",
            opts()
                // only ever iterated on
                .expect_point_read_hits(true)
                .max_memtable_size(mb(4))
                .data_block_size_policy(BlockSizePolicy::all(kb(64))),
        )?;

        // Seed the event-id counter from the last (highest) persisted event
        // key; event keys are 8-byte big-endian ids, so the keyspace iterates
        // in id order.
        let mut last_id = 0;
        if let Some(guard) = events.iter().next_back() {
            let k = guard.key().into_diagnostic()?;
            last_id = u64::from_be_bytes(
                k.as_ref()
                    .try_into()
                    .into_diagnostic()
                    .wrap_err("expected to be id (8 bytes)")?,
            );
        }

        // Same scheme for the block reflog sequence.
        let mut last_reflog_seq = 0u64;
        if let Some(guard) = block_reflog.iter().next_back() {
            let k = guard.key().into_diagnostic()?;
            last_reflog_seq = u64::from_be_bytes(
                k.as_ref()
                    .try_into()
                    .into_diagnostic()
                    .wrap_err("expected to be reflog seq (8 bytes)")?,
            );
        }
        let next_reflog_seq = Arc::new(AtomicU64::new(last_reflog_seq.saturating_add(1)));

        // load counts into memory
        let counts_map = HashMap::new();
        for guard in counts.prefix(keys::COUNT_KS_PREFIX) {
            let (k, v) = guard.into_inner().into_diagnostic()?;
            // Counter name is the key with the keyspace-count prefix stripped.
            let name = std::str::from_utf8(&k[keys::COUNT_KS_PREFIX.len()..])
                .into_diagnostic()
                .wrap_err("expected valid utf8 for ks count key")?;
            let _ = counts_map.insert_sync(
                SmolStr::new(name),
                u64::from_be_bytes(v.as_ref().try_into().unwrap()),
            );
        }
        // ensure critical counts are initialized
        // (missing ones are recomputed with a full keyspace scan)
        for ks_name in ["repos", "pending", "resync"] {
            let _ = counts_map
                .entry_sync(SmolStr::new(ks_name))
                .or_insert_with(|| {
                    let ks = match ks_name {
                        "repos" => &repos,
                        "pending" => &pending,
                        "resync" => &resync,
                        _ => unreachable!(),
                    };
                    ks.iter().count() as u64
                });
        }

        // Receiver handle is dropped; subscribers attach via `event_tx` later.
        let (event_tx, _) = broadcast::channel(10000);

        Ok(Self {
            inner: db,
            repos,
            records,
            blocks,
            cursors,
            pending,
            resync,
            resync_buffer,
            events,
            counts,
            filter,
            crawler,
            block_refs,
            block_reflog,
            block_refcounts,
            gc_ready,
            next_reflog_seq,
            event_tx,
            counts_map,
            next_event_id: Arc::new(AtomicU64::new(last_id + 1)),
        })
    }

    /// Fsync the journal (journal persistence is manual, see `open`).
    pub fn persist(&self) -> Result<()> {
        self.inner.persist(PersistMode::SyncAll).into_diagnostic()?;
        Ok(())
    }

    /// Point read on `ks`, offloaded to a blocking thread so the async
    /// runtime isn't blocked by disk I/O.
    pub async fn get(ks: Keyspace, key: impl Into<Slice>) -> Result<Option<Slice>> {
        let key = key.into();
        tokio::task::spawn_blocking(move || ks.get(key).into_diagnostic())
            .await
            .into_diagnostic()?
    }

    /// Insert into `ks` on a blocking thread.
    #[allow(dead_code)]
    pub async fn insert(
        ks: Keyspace,
        key: impl Into<Slice>,
        value: impl Into<Slice>,
    ) -> Result<()> {
        let key = key.into();
        let value = value.into();
        tokio::task::spawn_blocking(move || ks.insert(key, value).into_diagnostic())
            .await
            .into_diagnostic()?
    }

    /// Remove from `ks` on a blocking thread.
    #[allow(dead_code)]
    pub async fn remove(ks: Keyspace, key: impl Into<Slice>) -> Result<()> {
        let key = key.into();
        tokio::task::spawn_blocking(move || ks.remove(key).into_diagnostic())
            .await
            .into_diagnostic()?
    }

    /// Key-existence check on `ks`, on a blocking thread.
    pub async fn contains_key(ks: Keyspace, key: impl Into<Slice>) -> Result<bool> {
        let key = key.into();
        tokio::task::spawn_blocking(move || ks.contains_key(key).into_diagnostic())
            .await
            .into_diagnostic()?
    }

    /// Apply a signed delta to the in-memory counter `key`, saturating at
    /// the `u64` bounds (never underflows below 0).
    pub fn update_count(&self, key: &str, delta: i64) {
        let mut entry = self.counts_map.entry_sync(SmolStr::new(key)).or_insert(0);
        if delta >= 0 {
            *entry = entry.saturating_add(delta as u64);
        } else {
            *entry = entry.saturating_sub(delta.unsigned_abs());
        }
    }

    /// Async-friendly variant of `update_count` (uses scc's async entry API
    /// instead of spinning on the sync one).
    pub async fn update_count_async(&self, key: &str, delta: i64) {
        let mut entry = self
            .counts_map
            .entry_async(SmolStr::new(key))
            .await
            .or_insert(0);
        if delta >= 0 {
            *entry = entry.saturating_add(delta as u64);
        } else {
            *entry = entry.saturating_sub(delta.unsigned_abs());
        }
    }

    /// Current value of the in-memory counter `key`; missing counters read as 0.
    pub async fn get_count(&self, key: &str) -> u64 {
        self.counts_map
            .read_async(key, |_, v| *v)
            .await
            .unwrap_or(0)
    }

    /// Move gauge counters from `old` to `new` (see `update_gauge_diff_impl`).
    pub fn update_gauge_diff(
        &self,
        old: &crate::types::GaugeState,
        new: &crate::types::GaugeState,
    ) {
        update_gauge_diff_impl!(self, old, new, update_count);
    }

    /// Async variant of `update_gauge_diff`.
    pub async fn update_gauge_diff_async(
        &self,
        old: &crate::types::GaugeState,
        new: &crate::types::GaugeState,
    ) {
        update_gauge_diff_impl!(self, old, new, update_count_async, await);
    }

    /// Read-modify-write of a repo's state inside a caller-provided batch.
    ///
    /// `f` receives the mutable state plus `(repo key bytes, batch)` and
    /// returns `(changed, T)`; the state is only re-serialized and staged in
    /// the batch when `changed` is true. Returns `None` if the repo does not
    /// exist. NOTE: the batch is NOT committed here — that's the caller's job.
    pub fn update_repo_state<F, T>(
        batch: &mut OwnedWriteBatch,
        repos: &Keyspace,
        did: &Did<'_>,
        f: F,
    ) -> Result<Option<(RepoState<'static>, T)>>
    where
        F: FnOnce(&mut RepoState, (&[u8], &mut fjall::OwnedWriteBatch)) -> Result<(bool, T)>,
    {
        let key = keys::repo_key(did);
        if let Some(bytes) = repos.get(&key).into_diagnostic()? {
            let mut state: RepoState = deser_repo_state(bytes.as_ref())?.into_static();
            let (changed, result) = f(&mut state, (key.as_slice(), batch))?;
            if changed {
                batch.insert(repos, key, ser_repo_state(&state)?);
            }
            Ok(Some((state, result)))
        } else {
            Ok(None)
        }
    }

    /// Like `update_repo_state`, but creates its own batch, runs on a
    /// blocking thread, and commits the batch if the repo existed.
    pub async fn update_repo_state_async<F, T>(
        &self,
        did: &Did<'_>,
        f: F,
    ) -> Result<Option<(RepoState<'static>, T)>>
    where
        F: FnOnce(&mut RepoState, (&[u8], &mut fjall::OwnedWriteBatch)) -> Result<(bool, T)>
            + Send
            + 'static,
        T: Send + 'static,
    {
        let mut batch = self.inner.batch();
        let repos = self.repos.clone();
        let did = did.clone().into_static();

        tokio::task::spawn_blocking(move || {
            let Some((state, t)) = Self::update_repo_state(&mut batch, &repos, &did, f)? else {
                return Ok(None);
            };
            batch.commit().into_diagnostic()?;
            Ok(Some((state, t)))
        })
        .await
        .into_diagnostic()?
    }

    /// Map a repo's status (plus its optional serialized `ResyncState`) to a
    /// gauge state: Synced/Backfilling map directly; every error-ish status
    /// becomes `Resync`, carrying the error kind if the resync bytes decode
    /// to `ResyncState::Error`.
    pub fn repo_gauge_state(
        repo_state: &RepoState,
        resync_bytes: Option<&[u8]>,
    ) -> crate::types::GaugeState {
        match repo_state.status {
            crate::types::RepoStatus::Synced => crate::types::GaugeState::Synced,
            crate::types::RepoStatus::Backfilling => crate::types::GaugeState::Pending,
            crate::types::RepoStatus::Error(_)
            | crate::types::RepoStatus::Deactivated
            | crate::types::RepoStatus::Takendown
            | crate::types::RepoStatus::Suspended => {
                if let Some(resync_bytes) = resync_bytes {
                    if let Ok(crate::types::ResyncState::Error { kind, .. }) =
                        rmp_serde::from_slice::<crate::types::ResyncState>(resync_bytes)
                    {
                        crate::types::GaugeState::Resync(Some(kind))
                    } else {
                        crate::types::GaugeState::Resync(None)
                    }
                } else {
                    crate::types::GaugeState::Resync(None)
                }
            }
        }
    }

    /// Async variant of `repo_gauge_state`: fetches the resync bytes on a
    /// blocking thread; any read or join failure degrades to `Resync(None)`.
    pub async fn repo_gauge_state_async(
        &self,
        repo_state: &RepoState<'_>,
        did_key: &[u8],
    ) -> crate::types::GaugeState {
        let repo_state = repo_state.clone().into_static();
        let did_key = did_key.to_vec();

        let db_resync = self.resync.clone();

        tokio::task::spawn_blocking(move || {
            let resync_bytes_opt = db_resync.get(&did_key).ok().flatten();
            Self::repo_gauge_state(&repo_state, resync_bytes_opt.as_deref())
        })
        .await
        .unwrap_or(crate::types::GaugeState::Resync(None))
    }
}

/// Store the firehose cursor for one relay (big-endian i64).
pub fn set_firehose_cursor(db: &Db, relay_id: &RelayId, cursor: i64) -> Result<()> {
    db.cursors
        .insert(keys::firehose_cursor_key(relay_id), cursor.to_be_bytes())
        .into_diagnostic()
}

/// Read the firehose cursor for `relay_id`, falling back to the legacy
/// global cursor key when no per-relay cursor exists.
pub async fn get_firehose_cursor(db: &Db, relay_id: &RelayId) -> Result<Option<i64>> {
    let per_relay_key = keys::firehose_cursor_key(relay_id);
    if let Some(v) = Db::get(db.cursors.clone(), per_relay_key).await? {
        return Ok(Some(i64::from_be_bytes(
            v.as_ref()
                .try_into()
                .into_diagnostic()
                .wrap_err("cursor is not 8 bytes")?,
        )));
    }

    Db::get(db.cursors.clone(), keys::CURSOR_KEY)
        .await?
        .map(|v| {
            Ok(i64::from_be_bytes(
                v.as_ref()
                    .try_into()
                    .into_diagnostic()
                    .wrap_err("cursor is not 8 bytes")?,
            ))
        })
        .transpose()
}

/// Serialize a `RepoState` as MessagePack.
pub fn ser_repo_state(state: &RepoState) -> Result<Vec<u8>> {
    rmp_serde::to_vec(&state).into_diagnostic()
}

/// Deserialize a `RepoState` from MessagePack, borrowing from `bytes`.
pub fn deser_repo_state<'b>(bytes: &'b [u8]) -> Result<RepoState<'b>> {
    rmp_serde::from_slice(bytes).into_diagnostic()
}

/// If the database is poisoned, there is no safe way to continue:
/// log and exit the process with code 10.
pub fn check_poisoned(e: &fjall::Error) {
    if matches!(e, fjall::Error::Poisoned) {
        error!("!!! DATABASE POISONED !!! exiting");
        std::process::exit(10);
    }
}

/// `check_poisoned` for errors wrapped in a miette report; non-fjall
/// errors are ignored.
pub fn check_poisoned_report(e: &miette::Report) {
    let Some(err) = e.downcast_ref::<fjall::Error>() else {
        return;
    };
    self::check_poisoned(err);
}

/// Stage a keyspace-level counter value into `batch` (big-endian u64).
pub fn set_ks_count(batch: &mut OwnedWriteBatch, db: &Db, name: &str, count: u64) {
    let key = keys::count_keyspace_key(name);
    batch.insert(&db.counts, key, count.to_be_bytes());
}

/// Snapshot every in-memory counter into the `counts` keyspace in one batch.
pub fn persist_counts(db: &Db) -> Result<()> {
    let mut batch = db.inner.batch();
    db.counts_map.iter_sync(|k, v| {
        set_ks_count(&mut batch, db, k, *v);
        // returning true keeps the iteration going
        true
    });
    batch.commit().into_diagnostic()
}

/// Stage an absolute per-(did, collection) record count into `batch`.
pub fn set_record_count(
    batch: &mut OwnedWriteBatch,
    db: &Db,
    did: &Did<'_>,
    collection: &str,
    count: u64,
) {
    let key = keys::count_collection_key(did, collection);
    batch.insert(&db.counts, key, count.to_be_bytes());
}

/// Read-modify-write a per-(did, collection) record count by `delta`,
/// saturating at the `u64` bounds; missing counters start at 0. The new
/// value is staged into `batch`, not committed here.
pub fn update_record_count(
    batch: &mut OwnedWriteBatch,
    db: &Db,
    did: &Did<'_>,
    collection: &str,
    delta: i64,
) -> Result<()> {
    let key = keys::count_collection_key(did, collection);
    let count = db
        .counts
        .get(&key)
        .into_diagnostic()?
        .map(|v| -> Result<_> {
            Ok(u64::from_be_bytes(
                v.as_ref()
                    .try_into()
                    .into_diagnostic()
                    .wrap_err("expected to be count (8 bytes)")?,
            ))
        })
        .transpose()?
        .unwrap_or(0);
    let new_count = if delta >= 0 {
        count.saturating_add(delta as u64)
    } else {
        count.saturating_sub(delta.unsigned_abs())
    };
    batch.insert(&db.counts, key, new_count.to_be_bytes());
    Ok(())
}

/// Read the persisted per-(did, collection) record count; missing reads as 0.
pub fn get_record_count(db: &Db, did: &Did<'_>, collection: &str) -> Result<u64> {
    let key = keys::count_collection_key(did, collection);
    let count = db
        .counts
        .get(&key)
        .into_diagnostic()?
        .map(|v| -> Result<_> {
            Ok(u64::from_be_bytes(
                v.as_ref()
                    .try_into()
                    .into_diagnostic()
                    .wrap_err("expected to be count (8 bytes)")?,
            ))
        })
        .transpose()?;
    Ok(count.unwrap_or(0))
}