// lightweight com.atproto.sync.listReposByCollection
// at main 119 lines 4.8 kB view raw
1pub mod backfill_progress; 2pub mod collection_index; 3pub mod error; 4pub mod firehose_cursor; 5pub mod list_hosts_cursor; 6pub mod meta; 7pub mod pds_host; 8pub mod repo; 9pub mod resync_buffer; 10pub mod resync_queue; 11 12pub(crate) use error::{StorageError, StorageResult}; 13pub(crate) use meta::StatsRef; 14pub(crate) use repo::Account; 15 16// --------------------------------------------------------------------------- 17// key prefixes 18// 19// could be a single byte each really, but making them slightly human-readable 20// is nice. fjall's key compression probably means we could make these full 21// words, but they feel better short. 22// --------------------------------------------------------------------------- 23 24/// Fixed-length (3 byte) key prefix per data type 25type KeyPrefix = [u8; 3]; 26 27/// Main collection index (collection → did). See [`collection_index`]. 28pub(super) const PREFIX_RBC: KeyPrefix = *b"rbc"; 29/// Reversed collection index (did → collection). See [`collection_index`]. 30pub(super) const PREFIX_CBR: KeyPrefix = *b"cbr"; 31/// Per-repo state and account status. See [`repo`]. 32pub(super) const PREFIX_REPO: KeyPrefix = *b"rep"; 33/// Per-repo transient sync state (rev + prevData CID). See [`repo`]. 34pub(super) const PREFIX_REPO_PREV: KeyPrefix = *b"rev"; 35/// Firehose subscription cursor (per relay host). See [`firehose_cursor`]. 36pub(super) const PREFIX_SUBSCRIBE_REPOS: KeyPrefix = *b"sub"; 37/// listRepos backfill walk progress (per relay host). See [`backfill_progress`]. 38pub(super) const PREFIX_LIST_REPOS: KeyPrefix = *b"lsr"; 39/// Timestamp-ordered resync work queue. See [`resync_queue`]. 40pub(super) const PREFIX_RESYNC_QUEUE: KeyPrefix = *b"rsq"; 41/// Per-repo buffered firehose events during resync. See [`resync_buffer`]. 42pub(super) const PREFIX_RESYNC_BUFFER: KeyPrefix = *b"rsb"; 43/// Per-PDS host state (sync1.1 mode, trust, listRepos cursor/done). See [`pds_host`]. 
44pub(super) const PREFIX_PDS_HOST: KeyPrefix = *b"pdh"; 45/// listHosts walk cursor (per upstream relay host). See [`list_hosts_cursor`]. 46pub(super) const PREFIX_LIST_HOSTS: KeyPrefix = *b"lhs"; 47/// Persistent system stats and cardinality sketches. See [`meta`]. 48pub(super) const PREFIX_META: KeyPrefix = *b"met"; 49 50use std::path::Path; 51use std::sync::Arc; 52 53/// Shared handle to the fjall database and its per-concern keyspaces. 54/// 55/// In fjall 3.x, `Database` is the top-level multi-keyspace container and 56/// `Keyspace` is an individual column-family (the old `PartitionHandle`). 57pub struct Db { 58 pub(crate) database: fjall::Database, 59 /// General-purpose keyspace: repo state, queues, cursors, etc. 60 pub(crate) ks: fjall::Keyspace, 61 /// Collection index keyspace: rbc + cbr ranges. 62 /// 63 /// Tuned for scan-heavy access: 64 KiB blocks (amortises per-block overhead 64 /// across sequential reads) and Lz4 compression at all levels (higher 65 /// on-disk density means more data fits in the block cache). 66 pub(crate) index_ks: fjall::Keyspace, 67 /// Persistent system stats and cardinality sketches, loaded on open. 68 pub(crate) stats: StatsRef, 69} 70 71/// Cheaply-cloneable reference to the shared database. 72pub type DbRef = Arc<Db>; 73 74/// Open (or create) the fjall database at `path` and return a shared handle. 75pub fn open(path: &Path, cache_mb: u64) -> StorageResult<DbRef> { 76 open_inner(path, DbConfig::ForReal { cache_mb }) 77} 78 79enum DbConfig { 80 /// temporary db for tests 81 #[allow(dead_code)] 82 Testing, 83 /// bumpable cache for prod 84 ForReal { cache_mb: u64 }, 85} 86 87/// Open a temporary database that deletes itself on drop. For tests only. 
88#[cfg(test)] 89pub(crate) fn open_temporary() -> StorageResult<DbRef> { 90 use std::sync::atomic::{AtomicU64, Ordering}; 91 static COUNTER: AtomicU64 = AtomicU64::new(0); 92 let n = COUNTER.fetch_add(1, Ordering::Relaxed); 93 let path = std::env::temp_dir().join(format!("lightrail-test-{}-{}", std::process::id(), n)); 94 open_inner(&path, DbConfig::Testing) 95} 96 97fn open_inner(path: &Path, config: DbConfig) -> StorageResult<DbRef> { 98 let builder = fjall::Database::builder(path); 99 let builder = match config { 100 DbConfig::Testing => builder.temporary(true), 101 DbConfig::ForReal { cache_mb } => builder.cache_size(cache_mb * 2_u64.pow(20)), 102 }; 103 let database = builder.open()?; 104 let ks = database.keyspace("default", fjall::KeyspaceCreateOptions::default)?; 105 let index_ks = database.keyspace("index", || { 106 fjall::KeyspaceCreateOptions::default() 107 .data_block_size_policy(fjall::config::BlockSizePolicy::all(64 * 1_024)) 108 .data_block_compression_policy(fjall::config::CompressionPolicy::all( 109 fjall::CompressionType::Lz4, 110 )) 111 })?; 112 let stats = meta::load(&ks)?; 113 Ok(Arc::new(Db { 114 database, 115 ks, 116 index_ks, 117 stats, 118 })) 119}