Fast and robust atproto CAR file processing in rust
1/*!
Disk storage for blocks

Currently this uses candystore. (sqlite was also evaluated: in testing it
wasn't the fastest, but it seemed to be the best behaved in terms of both
on-disk space usage and memory usage.)
6
7```no_run
8# use repo_stream::{DiskBuilder, DiskError};
9# #[tokio::main]
10# async fn main() -> Result<(), DiskError> {
11let store = DiskBuilder::new()
12 .with_max_stored_mb(1024) // errors when >1GiB of processed blocks are inserted
    .open("/some/path.db".into(), None).await?; // `None`: no expected-key-count hint
14# Ok(())
15# }
16```
17*/
18
19use crate::drive::DriveError;
20use candystore::{CandyError, CandyStore, Config};
21use std::path::PathBuf;
22
23#[derive(Debug, thiserror::Error)]
24pub enum DiskError {
25 /// A wrapped database error
26 ///
27 /// (The wrapped err should probably be obscured to remove public-facing
28 /// sqlite bits)
29 #[error(transparent)]
30 DbError(#[from] CandyError),
31 /// Unfortunately candystore uses anyhow::Result for it's open call
32 #[error("Failed on a db call, see logs")]
33 DbGarbageError,
34 /// A tokio blocking task failed to join
35 #[error("Failed to join a tokio blocking task: {0}")]
36 JoinError(#[from] tokio::task::JoinError),
37 /// The total size of stored blocks exceeded the allowed size
38 ///
39 /// If you need to process *really* big CARs, you can configure a higher
40 /// limit.
41 #[error("Maximum disk size reached")]
42 MaxSizeExceeded,
43}
44
/// Builder-style configuration for opening the on-disk block store.
#[derive(Debug, Clone)]
pub struct DiskBuilder {
    /// Upper limit on the total size of stored blocks, in MiB
    ///
    /// Default: 10 GiB
    ///
    /// Note: the actual size on disk may be larger, but it should scale
    /// roughly with this limit
    pub max_stored_mb: usize,
}

impl Default for DiskBuilder {
    fn default() -> Self {
        // 10 GiB, expressed in MiB.
        const DEFAULT_MAX_STORED_MB: usize = 10 * 1024;
        Self {
            max_stored_mb: DEFAULT_MAX_STORED_MB,
        }
    }
}
64
65impl DiskBuilder {
66 /// Begin configuring the storage with defaults
67 pub fn new() -> Self {
68 Default::default()
69 }
70 /// Set the approximate stored block size limit
71 ///
72 /// Default: 10 GiB
73 pub fn with_max_stored_mb(mut self, max: usize) -> Self {
74 self.max_stored_mb = max;
75 self
76 }
77 /// Open and initialize the actual disk storage
78 pub async fn open(&self, path: PathBuf, keys_hint: Option<usize>) -> Result<DiskStore, DiskError> {
79 DiskStore::new(path, self.max_stored_mb, keys_hint).await
80 }
81}
82
83/// On-disk block storage
84pub struct DiskStore {
85 db: CandyStore,
86 max_stored: usize,
87 stored: usize,
88}
89
90impl DiskStore {
91 /// Initialize a new disk store
92 pub async fn new(path: PathBuf, max_stored_mb: usize, keys_hint: Option<usize>) -> Result<Self, DiskError> {
93 let max_stored = max_stored_mb * 2_usize.pow(20);
94 let db = tokio::task::spawn_blocking(move || {
95 let mut conf = Config::default();
96 // conf.max_shard_size = 256 * 1024 * 1024;
97 // conf.min_compaction_threashold = 32 * 1024 * 1024;
98 // conf.expected_number_of_keys = 1_200_000;
99 if let Some(hint) = keys_hint {
100 conf.expected_number_of_keys = hint;
101 }
102 conf.num_compaction_threads = 1;
103 let db = CandyStore::open(path, conf).map_err(|e| {
104 log::error!("{e:?}");
105 DiskError::DbGarbageError
106 })?;
107
108 Ok::<_, DiskError>(db)
109 })
110 .await??;
111
112 Ok(Self {
113 db,
114 max_stored,
115 stored: 0,
116 })
117 }
118
119 /// Drop and recreate the kv table
120 pub async fn reset(self) -> Result<Self, DiskError> {
121 tokio::task::spawn_blocking(move || {
122 Self::reset_tables(&self.db)?;
123 Ok(self)
124 })
125 .await?
126 }
127 fn reset_tables(db: &CandyStore) -> Result<(), DiskError> {
128 db.clear().map_err(|e| {
129 log::error!("{e:?}");
130 DiskError::DbGarbageError
131 })?;
132
133 Ok(())
134 }
135
136 pub(crate) fn put_many(
137 &mut self,
138 kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>,
139 ) -> Result<(), DriveError> {
140 for pair in kv {
141 let (k, v) = pair?;
142 self.stored += v.len();
143 if self.stored > self.max_stored {
144 return Err(DiskError::MaxSizeExceeded.into());
145 }
146 self.db.owned_set(k, &v).map_err(|e| {
147 log::error!("{e:?}");
148 DiskError::DbGarbageError
149 })?;
150 }
151 Ok(())
152 }
153 pub(crate) fn get(&mut self, key: Vec<u8>) -> Result<Option<Vec<u8>>, DiskError> {
154 self.db.owned_get(key).map_err(|e| {
155 log::error!("{e:?}");
156 DiskError::DbGarbageError
157 })
158 }
159}