/*!
Disk storage for blocks

Currently this uses fjall. In testing it wasn't the fastest, but it seemed to
be the best behaved in terms of both on-disk space usage and memory usage.

```no_run
# use repo_stream::{DiskBuilder, DiskError};
# #[tokio::main]
# async fn main() -> Result<(), DiskError> {
let store = DiskBuilder::new()
    .with_cache_size_mb(32)
    .with_max_stored_mb(1024) // errors when >1 GiB of processed blocks are inserted
    .open("/some/path.db".into()).await?;
# Ok(())
# }
```
*/

use crate::drive::DriveError;
use fjall::config::{CompressionPolicy, PinningPolicy, RestartIntervalPolicy};
use fjall::{CompressionType, Database, Error as FjallError, Keyspace, KeyspaceCreateOptions};
use std::path::PathBuf;

#[derive(Debug, thiserror::Error)]
pub enum DiskError {
    /// A wrapped database error
    ///
    /// (The wrapped error should probably be obscured to avoid exposing
    /// public-facing fjall bits)
    #[error(transparent)]
    DbError(#[from] FjallError),
    /// A tokio blocking task failed to join
    #[error("Failed to join a tokio blocking task: {0}")]
    JoinError(#[from] tokio::task::JoinError),
    /// The total size of stored blocks exceeded the allowed size
    ///
    /// If you need to process *really* big CARs, you can configure a higher
    /// limit.
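    ///
    /// For example, a builder call along these lines raises the ceiling (the
    /// 16 GiB figure is purely illustrative; pick whatever fits your workload):
    ///
    /// ```no_run
    /// # use repo_stream::{DiskBuilder, DiskError};
    /// # #[tokio::main]
    /// # async fn main() -> Result<(), DiskError> {
    /// let store = DiskBuilder::new()
    ///     .with_max_stored_mb(16 * 1024) // allow up to ~16 GiB of processed blocks
    ///     .open("/some/path.db".into()).await?;
    /// # Ok(())
    /// # }
    /// ```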
    #[error("Maximum disk size reached")]
    MaxSizeExceeded,
}

/// Builder-style disk store setup
#[derive(Debug, Clone)]
pub struct DiskBuilder {
    /// Database in-memory cache allowance
    ///
    /// Default: 64 MiB
    pub cache_size_mb: usize,
    /// Database stored block size limit
    ///
    /// Default: 10 GiB
    ///
    /// Note: actual size on disk may be larger, but should scale approximately
    /// with this limit
    pub max_stored_mb: usize,
}

impl Default for DiskBuilder {
    fn default() -> Self {
        Self {
            cache_size_mb: 64,
            max_stored_mb: 10 * 1024, // 10 GiB
        }
    }
}

impl DiskBuilder {
    /// Begin configuring the storage with defaults
    pub fn new() -> Self {
        Default::default()
    }
    /// Set the in-memory cache allowance for the database, in MiB
    ///
    /// Default: 64 MiB
    pub fn with_cache_size_mb(mut self, size: usize) -> Self {
        self.cache_size_mb = size;
        self
    }
    /// Set the approximate stored block size limit, in MiB
    ///
    /// Default: 10 GiB
    pub fn with_max_stored_mb(mut self, max: usize) -> Self {
        self.max_stored_mb = max;
        self
    }
    /// Open and initialize the actual disk storage
    pub async fn open(&self, path: PathBuf) -> Result<DiskStore, DiskError> {
        DiskStore::new(path, self.cache_size_mb, self.max_stored_mb).await
    }
}

/// On-disk block storage
pub struct DiskStore {
    #[allow(unused)]
    db: Database,
    /// Keyspace holding the stored blocks
    partition: Keyspace,
    /// Maximum total size of stored blocks, in bytes
    max_stored: usize,
    /// Running total of bytes inserted so far
    stored: usize,
}

impl DiskStore {
    /// Initialize a new disk store
    pub async fn new(
        path: PathBuf,
        cache_mb: usize,
        max_stored_mb: usize,
    ) -> Result<Self, DiskError> {
        // Convert the configured MiB limit to bytes
        let max_stored = max_stored_mb * 2_usize.pow(20);
        // Database setup does blocking I/O, so run it off the async runtime
        let (db, partition) = tokio::task::spawn_blocking(move || {
            let db = Database::builder(path)
                // .manual_journal_persist(true)
                // .flush_workers(1)
                // .compaction_workers(1)
                .journal_compression(CompressionType::None)
                .cache_size(cache_mb as u64 * 2_u64.pow(20))
                .temporary(true)
                .open()?;
            let opts = KeyspaceCreateOptions::default()
                .data_block_restart_interval_policy(RestartIntervalPolicy::all(8))
                .filter_block_pinning_policy(PinningPolicy::disabled())
                .expect_point_read_hits(true)
                .data_block_compression_policy(CompressionPolicy::disabled())
                .manual_journal_persist(true)
                .max_memtable_size(32 * 2_u64.pow(20));
            // A single keyspace ("z") holds all blocks
            let partition = db.keyspace("z", || opts)?;

            Ok::<_, DiskError>((db, partition))
        })
        .await??;

        Ok(Self {
            db,
            partition,
            max_stored,
            stored: 0,
        })
    }

    /// Insert a batch of key/value block pairs
    ///
    /// Tracks the cumulative size of inserted values and errors with
    /// [`DiskError::MaxSizeExceeded`] once the configured limit is exceeded.
    pub(crate) fn put_many(
        &mut self,
        kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>,
    ) -> Result<(), DriveError> {
        let mut batch = self.db.batch();
        for pair in kv {
            let (k, v) = pair?;
            self.stored += v.len();
            if self.stored > self.max_stored {
                return Err(DiskError::MaxSizeExceeded.into());
            }
            batch.insert(&self.partition, k, v);
        }
        batch.commit().map_err(DiskError::DbError)?;
        Ok(())
    }

    /// Look up a stored block by key
    #[inline]
    pub(crate) fn get(&mut self, key: &[u8]) -> Result<Option<fjall::Slice>, FjallError> {
        self.partition.get(key)
    }
}
162}