// lightweight com.atproto.sync.listReposByCollection
1pub mod backfill_progress;
2pub mod collection_index;
3pub mod error;
4pub mod firehose_cursor;
5pub mod list_hosts_cursor;
6pub mod meta;
7pub mod pds_host;
8pub mod repo;
9pub mod resync_buffer;
10pub mod resync_queue;
11
12pub(crate) use error::{StorageError, StorageResult};
13pub(crate) use meta::StatsRef;
14pub(crate) use repo::Account;
15
16// ---------------------------------------------------------------------------
17// key prefixes
18//
19// could be a single byte each really, but making them slightly human-readable
20// is nice. fjall's key compression probably means we could make these full
21// words, but they feel better short.
22// ---------------------------------------------------------------------------
23
24/// Fixed-length (3 byte) key prefix per data type
25type KeyPrefix = [u8; 3];
26
27/// Main collection index (collection → did). See [`collection_index`].
28pub(super) const PREFIX_RBC: KeyPrefix = *b"rbc";
29/// Reversed collection index (did → collection). See [`collection_index`].
30pub(super) const PREFIX_CBR: KeyPrefix = *b"cbr";
31/// Per-repo state and account status. See [`repo`].
32pub(super) const PREFIX_REPO: KeyPrefix = *b"rep";
33/// Per-repo transient sync state (rev + prevData CID). See [`repo`].
34pub(super) const PREFIX_REPO_PREV: KeyPrefix = *b"rev";
35/// Firehose subscription cursor (per relay host). See [`firehose_cursor`].
36pub(super) const PREFIX_SUBSCRIBE_REPOS: KeyPrefix = *b"sub";
37/// listRepos backfill walk progress (per relay host). See [`backfill_progress`].
38pub(super) const PREFIX_LIST_REPOS: KeyPrefix = *b"lsr";
39/// Timestamp-ordered resync work queue. See [`resync_queue`].
40pub(super) const PREFIX_RESYNC_QUEUE: KeyPrefix = *b"rsq";
41/// Per-repo buffered firehose events during resync. See [`resync_buffer`].
42pub(super) const PREFIX_RESYNC_BUFFER: KeyPrefix = *b"rsb";
43/// Per-PDS host state (sync1.1 mode, trust, listRepos cursor/done). See [`pds_host`].
44pub(super) const PREFIX_PDS_HOST: KeyPrefix = *b"pdh";
45/// listHosts walk cursor (per upstream relay host). See [`list_hosts_cursor`].
46pub(super) const PREFIX_LIST_HOSTS: KeyPrefix = *b"lhs";
47/// Persistent system stats and cardinality sketches. See [`meta`].
48pub(super) const PREFIX_META: KeyPrefix = *b"met";
49
50use std::path::Path;
51use std::sync::Arc;
52
53/// Shared handle to the fjall database and its per-concern keyspaces.
54///
55/// In fjall 3.x, `Database` is the top-level multi-keyspace container and
56/// `Keyspace` is an individual column-family (the old `PartitionHandle`).
57pub struct Db {
58 pub(crate) database: fjall::Database,
59 /// General-purpose keyspace: repo state, queues, cursors, etc.
60 pub(crate) ks: fjall::Keyspace,
61 /// Collection index keyspace: rbc + cbr ranges.
62 ///
63 /// Tuned for scan-heavy access: 64 KiB blocks (amortises per-block overhead
64 /// across sequential reads) and Lz4 compression at all levels (higher
65 /// on-disk density means more data fits in the block cache).
66 pub(crate) index_ks: fjall::Keyspace,
67 /// Persistent system stats and cardinality sketches, loaded on open.
68 pub(crate) stats: StatsRef,
69}
70
71/// Cheaply-cloneable reference to the shared database.
72pub type DbRef = Arc<Db>;
73
74/// Open (or create) the fjall database at `path` and return a shared handle.
75pub fn open(path: &Path, cache_mb: u64) -> StorageResult<DbRef> {
76 open_inner(path, DbConfig::ForReal { cache_mb })
77}
78
/// Selects how the database is opened.
enum DbConfig {
    /// Temporary database for tests.
    ///
    /// This variant is only constructed from test-only code, so non-test
    /// builds would flag it as dead. The lint is silenced for those builds
    /// only; in test builds it stays active and will report the variant if it
    /// ever stops being used.
    #[cfg_attr(not(test), allow(dead_code))]
    Testing,
    /// Production database with an adjustable cache size.
    ForReal {
        /// Requested cache size, in MiB.
        cache_mb: u64,
    },
}
86
/// Opens a throwaway database that deletes itself on drop. For tests only.
///
/// Each call gets its own directory under the system temp dir, named from the
/// current process id and a process-wide counter, so databases opened by the
/// same test binary do not share a path.
#[cfg(test)]
pub(crate) fn open_temporary() -> StorageResult<DbRef> {
    use std::sync::atomic::{AtomicU64, Ordering};

    // Only uniqueness of the returned value matters, so relaxed ordering is enough.
    static NEXT_SEQ: AtomicU64 = AtomicU64::new(0);

    let seq = NEXT_SEQ.fetch_add(1, Ordering::Relaxed);
    let dir_name = format!("lightrail-test-{}-{}", std::process::id(), seq);
    let path = std::env::temp_dir().join(dir_name);
    open_inner(&path, DbConfig::Testing)
}
96
97fn open_inner(path: &Path, config: DbConfig) -> StorageResult<DbRef> {
98 let builder = fjall::Database::builder(path);
99 let builder = match config {
100 DbConfig::Testing => builder.temporary(true),
101 DbConfig::ForReal { cache_mb } => builder.cache_size(cache_mb * 2_u64.pow(20)),
102 };
103 let database = builder.open()?;
104 let ks = database.keyspace("default", fjall::KeyspaceCreateOptions::default)?;
105 let index_ks = database.keyspace("index", || {
106 fjall::KeyspaceCreateOptions::default()
107 .data_block_size_policy(fjall::config::BlockSizePolicy::all(64 * 1_024))
108 .data_block_compression_policy(fjall::config::CompressionPolicy::all(
109 fjall::CompressionType::Lz4,
110 ))
111 })?;
112 let stats = meta::load(&ks)?;
113 Ok(Arc::new(Db {
114 database,
115 ks,
116 index_ks,
117 stats,
118 }))
119}