+1 -1  Cargo.lock
+2 -2  Cargo.toml

+4  benches/non-huge-cars.rs
@@ -3,6 +3,7 @@
 
 use criterion::{Criterion, criterion_group, criterion_main};
 
+const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car");
 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car");
 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car");
 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car");
@@ -13,6 +14,9 @@
         .build()
         .expect("Creating runtime failed");
 
+    c.bench_function("empty-car", |b| {
+        b.to_async(&rt).iter(async || drive_car(EMPTY_CAR).await)
+    });
     c.bench_function("tiny-car", |b| {
         b.to_async(&rt).iter(async || drive_car(TINY_CAR).await)
     });

car-samples/empty.car
This is a binary file and will not be displayed.

+11 -12  examples/disk-read-file/main.rs
@@ -4,8 +4,9 @@
 
 extern crate repo_stream;
 use clap::Parser;
-use repo_stream::{DiskStore, Driver, process::noop};
+use repo_stream::{DiskBuilder, Driver, DriverBuilder};
 use std::path::PathBuf;
+use std::time::Instant;
 
 #[derive(Debug, Parser)]
 struct Args {
@@ -26,32 +27,30 @@
     let reader = tokio::fs::File::open(car).await?;
     let reader = tokio::io::BufReader::new(reader);
 
-    // configure how much memory can be used before spilling to disk.
-    // real memory usage may differ somewhat.
-    let in_mem_limit = 10; // MiB
-
-    // configure how much memory sqlite is allowed to use when dumping to disk
-    let db_cache_mb = 32; // MiB
-
     log::info!("hello! reading the car...");
+    let t0 = Instant::now();
 
     // in this example we only bother handling CARs that are too big for memory
     // `noop` helper means: do no block processing, store the raw blocks
-    let driver = match Driver::load_car(reader, noop, in_mem_limit).await? {
+    let driver = match DriverBuilder::new()
+        .with_mem_limit_mb(10) // how much memory can be used before disk spill
+        .load_car(reader)
+        .await?
+    {
         Driver::Memory(_, _) => panic!("try this on a bigger car"),
         Driver::Disk(big_stuff) => {
             // we reach here if the repo was too big and needs to be spilled to
             // disk to continue
 
             // set up a disk store we can spill to
-            let disk_store = DiskStore::new(tmpfile.clone(), db_cache_mb).await?;
+            let disk_store = DiskBuilder::new().open(tmpfile).await?;
 
             // do the spilling, get back a (similar) driver
             let (commit, driver) = big_stuff.finish_loading(disk_store).await?;
 
             // at this point you might want to fetch the account's signing key
             // via the DID from the commit, and then verify the signature.
-            log::warn!("big's commit: {:?}", commit);
+            log::warn!("big's commit ({:?}): {:?}", t0.elapsed(), commit);
 
             // pop the driver back out to get some code indentation relief
             driver
@@ -81,7 +80,7 @@
         }
     }
 
-    log::info!("arrived! joining rx...");
+    log::info!("arrived! ({:?}) joining rx...", t0.elapsed());
 
     // clean up the database. would be nice to do this in drop so it happens
     // automatically, but some blocking work happens, so that's not allowed in

+9 -6  examples/read-file/main.rs
@@ -4,7 +4,7 @@
 
 extern crate repo_stream;
 use clap::Parser;
-use repo_stream::Driver;
+use repo_stream::{Driver, DriverBuilder};
 use std::path::PathBuf;
 
 type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
@@ -23,11 +23,14 @@
     let reader = tokio::fs::File::open(file).await?;
     let reader = tokio::io::BufReader::new(reader);
 
-    let (commit, mut driver) =
-        match Driver::load_car(reader, |block| block.len(), 16 /* MiB */).await? {
-            Driver::Memory(commit, mem_driver) => (commit, mem_driver),
-            Driver::Disk(_) => panic!("this example doesn't handle big CARs"),
-        };
+    let (commit, mut driver) = match DriverBuilder::new()
+        .with_block_processor(|block| block.len())
+        .load_car(reader)
+        .await?
+    {
+        Driver::Memory(commit, mem_driver) => (commit, mem_driver),
+        Driver::Disk(_) => panic!("this example doesn't handle big CARs"),
+    };
 
     log::info!("got commit: {commit:?}");

+70 -2  readme.md
@@ -1,6 +1,74 @@
 # repo-stream
 
-Fast and (aspirationally) robust atproto CAR file processing in rust
+A robust CAR file -> MST walker for atproto
+
+[![Crates.io][crates-badge]](https://crates.io/crates/repo-stream)
+[![Documentation][docs-badge]](https://docs.rs/repo-stream)
+[![Sponsor][sponsor-badge]](https://github.com/sponsors/uniphil)
+
+[crates-badge]: https://img.shields.io/crates/v/repo-stream.svg
+[docs-badge]: https://docs.rs/repo-stream/badge.svg
+[sponsor-badge]: https://img.shields.io/badge/at-microcosm-b820f9?labelColor=b820f9&logo=githubsponsors&logoColor=fff
+
+```rust
+use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder};
+
+#[tokio::main]
+async fn main() -> Result<(), DriveError> {
+    // repo-stream takes any AsyncRead as input, like a tokio::fs::File
+    let reader = tokio::fs::File::open("repo.car").await?;
+    let reader = tokio::io::BufReader::new(reader);
+
+    // example repo workload is simply counting the total record bytes
+    let mut total_size = 0;
+
+    match DriverBuilder::new()
+        .with_mem_limit_mb(10)
+        .with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size
+        .load_car(reader)
+        .await?
+    {
+
+        // if all blocks fit within memory
+        Driver::Memory(_commit, mut driver) => {
+            while let Some(chunk) = driver.next_chunk(256).await? {
+                for (_rkey, size) in chunk {
+                    total_size += size;
+                }
+            }
+        },
+
+        // if the CAR was too big for in-memory processing
+        Driver::Disk(paused) => {
+            // set up a disk store we can spill to
+            let store = DiskBuilder::new().open("some/path.db".into()).await?;
+            // do the spilling, get back a (similar) driver
+            let (_commit, mut driver) = paused.finish_loading(store).await?;
+
+            while let Some(chunk) = driver.next_chunk(256).await? {
+                for (_rkey, size) in chunk {
+                    total_size += size;
+                }
+            }
+
+            // clean up the disk store (drop tables etc)
+            driver.reset_store().await?;
+        }
+    };
+    println!("sum of size of all records: {total_size}");
+    Ok(())
+}
+```
+
+more recent todo
+
+- [ ] get an *empty* car for the test suite
+- [x] implement a max size on disk limit
+
+
+-----
+
+older stuff (to clean up):
 
 
 current car processing times (records processed into their length usize, phil's dev machine):
@@ -27,7 +95,7 @@
 -> yeah the commit is returned from init
 - [ ] spec compliance todos
   - [x] assert that keys are ordered and fail if not
-  - [ ] verify node mst depth from key (possibly pending [interop test fixes](https://github.com/bluesky-social/atproto-interop-tests/issues/5))
+  - [x] verify node mst depth from key (possibly pending [interop test fixes](https://github.com/bluesky-social/atproto-interop-tests/issues/5))
 - [ ] performance todos
   - [x] consume the serialized nodes into a mutable efficient format
   - [ ] maybe customize the deserialize impl to do that directly?

+85 -6  src/disk.rs
@@ -5,11 +5,13 @@
 to be the best behaved in terms of both on-disk space usage and memory usage.
 
 ```no_run
-# use repo_stream::{DiskStore, DiskError};
+# use repo_stream::{DiskBuilder, DiskError};
 # #[tokio::main]
 # async fn main() -> Result<(), DiskError> {
-let db_cache_size = 32; // MiB
-let store = DiskStore::new("/some/path.db".into(), db_cache_size).await?;
+let store = DiskBuilder::new()
+    .with_cache_size_mb(32)
+    .with_max_stored_mb(1024) // errors when >1GiB of processed blocks are inserted
+    .open("/some/path.db".into()).await?;
 # Ok(())
 # }
 ```
@@ -30,6 +32,12 @@
     /// A tokio blocking task failed to join
     #[error("Failed to join a tokio blocking task: {0}")]
     JoinError(#[from] tokio::task::JoinError),
+    /// The total size of stored blocks exceeded the allowed size
+    ///
+    /// If you need to process *really* big CARs, you can configure a higher
+    /// limit.
+    #[error("Maximum disk size reached")]
+    MaxSizeExceeded,
     #[error("this error was replaced, seeing this is a bug.")]
     #[doc(hidden)]
     Stolen,
@@ -44,13 +52,70 @@
     }
 }
 
+/// Builder-style disk store setup
+#[derive(Debug, Clone)]
+pub struct DiskBuilder {
+    /// Database in-memory cache allowance
+    ///
+    /// Default: 32 MiB
+    pub cache_size_mb: usize,
+    /// Database stored block size limit
+    ///
+    /// Default: 10 GiB
+    ///
+    /// Note: actual size on disk may be more, but should approximately scale
+    /// with this limit
+    pub max_stored_mb: usize,
+}
+
+impl Default for DiskBuilder {
+    fn default() -> Self {
+        Self {
+            cache_size_mb: 32,
+            max_stored_mb: 10 * 1024, // 10 GiB
+        }
+    }
+}
+
+impl DiskBuilder {
+    /// Begin configuring the storage with defaults
+    pub fn new() -> Self {
+        Default::default()
+    }
+    /// Set the in-memory cache allowance for the database
+    ///
+    /// Default: 32 MiB
+    pub fn with_cache_size_mb(mut self, size: usize) -> Self {
+        self.cache_size_mb = size;
+        self
+    }
+    /// Set the approximate stored block size limit
+    ///
+    /// Default: 10 GiB
+    pub fn with_max_stored_mb(mut self, max: usize) -> Self {
+        self.max_stored_mb = max;
+        self
+    }
+    /// Open and initialize the actual disk storage
+    pub async fn open(&self, path: PathBuf) -> Result<DiskStore, DiskError> {
+        DiskStore::new(path, self.cache_size_mb, self.max_stored_mb).await
+    }
+}
+
 /// On-disk block storage
 pub struct DiskStore {
     conn: rusqlite::Connection,
+    max_stored: usize,
+    stored: usize,
 }
 
 impl DiskStore {
     /// Initialize a new disk store
-    pub async fn new(path: PathBuf, cache_mb: usize) -> Result<Self, DiskError> {
+    pub async fn new(
+        path: PathBuf,
+        cache_mb: usize,
+        max_stored_mb: usize,
+    ) -> Result<Self, DiskError> {
+        let max_stored = max_stored_mb * 2_usize.pow(20);
         let conn = tokio::task::spawn_blocking(move || {
             let conn = rusqlite::Connection::open(path)?;
@@ -73,11 +138,19 @@
         })
         .await??;
 
-        Ok(Self { conn })
+        Ok(Self {
+            conn,
+            max_stored,
+            stored: 0,
+        })
     }
     pub(crate) fn get_writer(&'_ mut self) -> Result<SqliteWriter<'_>, DiskError> {
         let tx = self.conn.transaction()?;
-        Ok(SqliteWriter { tx })
+        Ok(SqliteWriter {
+            tx,
+            stored: &mut self.stored,
+            max: self.max_stored,
+        })
     }
     pub(crate) fn get_reader<'conn>(&'conn self) -> Result<SqliteReader<'conn>, DiskError> {
         let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?;
@@ -106,6 +179,8 @@
 
 pub(crate) struct SqliteWriter<'conn> {
     tx: rusqlite::Transaction<'conn>,
+    stored: &'conn mut usize,
+    max: usize,
 }
 
 impl SqliteWriter<'_> {
@@ -119,6 +194,10 @@
             .map_err(DiskError::DbError)?;
         for pair in kv {
             let (k, v) = pair?;
+            *self.stored += v.len();
+            if *self.stored > self.max {
+                return Err(DiskError::MaxSizeExceeded.into());
+            }
             insert_stmt.execute((k, v)).map_err(DiskError::DbError)?;
         }
         Ok(())
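
a usage sketch for the new `DiskBuilder` (not part of the diff; the `64`/`2048`
figures and the helper name are illustrative — the actual defaults shown above
are 32 MiB cache and 10 GiB stored):

```rust
use repo_stream::{DiskBuilder, DiskError, DiskStore};
use std::path::PathBuf;

// hypothetical helper: open a spill store with a raised ceiling, for CARs
// that would otherwise hit DiskError::MaxSizeExceeded while loading
async fn open_big_store(path: PathBuf) -> Result<DiskStore, DiskError> {
    DiskBuilder::new()
        .with_cache_size_mb(64) // larger sqlite page cache while spilling
        .with_max_stored_mb(2048) // allow up to ~2 GiB of processed blocks
        .open(path)
        .await
}
```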

+78 -6  src/drive.rs
@@ -115,22 +115,94 @@
     Disk(NeedDisk<R, T>),
 }
 
+/// Builder-style driver setup
+#[derive(Debug, Clone)]
+pub struct DriverBuilder {
+    pub mem_limit_mb: usize,
+}
+
+impl Default for DriverBuilder {
+    fn default() -> Self {
+        Self { mem_limit_mb: 16 }
+    }
+}
+
+impl DriverBuilder {
+    /// Begin configuring the driver with defaults
+    pub fn new() -> Self {
+        Default::default()
+    }
+    /// Set the in-memory size limit, in MiB
+    ///
+    /// Default: 16 MiB
+    pub fn with_mem_limit_mb(self, new_limit: usize) -> Self {
+        Self {
+            mem_limit_mb: new_limit,
+        }
+    }
+    /// Set the block processor
+    ///
+    /// Default: noop, raw blocks will be emitted
+    pub fn with_block_processor<T: Processable>(
+        self,
+        p: fn(Vec<u8>) -> T,
+    ) -> DriverBuilderWithProcessor<T> {
+        DriverBuilderWithProcessor {
+            mem_limit_mb: self.mem_limit_mb,
+            block_processor: p,
+        }
+    }
+    /// Begin processing an atproto MST from a CAR file
+    pub async fn load_car<R: AsyncRead + Unpin>(
+        &self,
+        reader: R,
+    ) -> Result<Driver<R, Vec<u8>>, DriveError> {
+        Driver::load_car(reader, crate::process::noop, self.mem_limit_mb).await
+    }
+}
+
+/// Builder-style driver intermediate step
+///
+/// start from `DriverBuilder`
+#[derive(Debug, Clone)]
+pub struct DriverBuilderWithProcessor<T: Processable> {
+    pub mem_limit_mb: usize,
+    pub block_processor: fn(Vec<u8>) -> T,
+}
+
+impl<T: Processable> DriverBuilderWithProcessor<T> {
+    /// Set the in-memory size limit, in MiB
+    ///
+    /// Default: 16 MiB
+    pub fn with_mem_limit_mb(mut self, new_limit: usize) -> Self {
+        self.mem_limit_mb = new_limit;
+        self
+    }
+    /// Begin processing an atproto MST from a CAR file
+    pub async fn load_car<R: AsyncRead + Unpin>(
+        &self,
+        reader: R,
+    ) -> Result<Driver<R, T>, DriveError> {
+        Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await
+    }
+}
+
 impl<R: AsyncRead + Unpin, T: Processable> Driver<R, T> {
     /// Begin processing an atproto MST from a CAR file
     ///
     /// Blocks will be loaded, processed, and buffered in memory. If the entire
-    /// processed size is under the `max_size_mb` limit, a `Driver::Memory` will
-    /// be returned along with a `Commit` ready for validation.
+    /// processed size is under the `mem_limit_mb` limit, a `Driver::Memory`
+    /// will be returned along with a `Commit` ready for validation.
     ///
-    /// If the `max_size_mb` limit is reached before loading all blocks, the
+    /// If the `mem_limit_mb` limit is reached before loading all blocks, the
     /// partial state will be returned as `Driver::Disk(needed)`, which can be
     /// resumed by providing a `DiskStore` for on-disk block storage.
     pub async fn load_car(
         reader: R,
         process: fn(Vec<u8>) -> T,
-        max_size_mb: usize,
+        mem_limit_mb: usize,
     ) -> Result<Driver<R, T>, DriveError> {
-        let max_size = max_size_mb * 2_usize.pow(20);
+        let max_size = mem_limit_mb * 2_usize.pow(20);
         let mut mem_blocks = HashMap::new();
 
         let mut car = CarReader::new(reader).await?;
@@ -276,7 +348,7 @@
         })
         .await??;
 
-        let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(2);
+        let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(1);
 
         let store_worker = tokio::task::spawn_blocking(move || {
             let mut writer = store.get_writer()?;
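
a sketch of how the two builder stages above fit together (not part of the
diff; assumes `usize: Processable`, which the existing examples and tests
imply):

```rust
use repo_stream::{DriveError, Driver, DriverBuilder};
use tokio::io::AsyncRead;

// without a processor, load_car yields raw blocks: Driver<R, Vec<u8>>
async fn raw<R: AsyncRead + Unpin>(reader: R) -> Result<Driver<R, Vec<u8>>, DriveError> {
    DriverBuilder::new().load_car(reader).await
}

// with_block_processor moves the config into DriverBuilderWithProcessor<T>,
// so the processed type (here usize) is fixed in the driver's type
async fn sized<R: AsyncRead + Unpin>(reader: R) -> Result<Driver<R, usize>, DriveError> {
    DriverBuilder::new()
        .with_block_processor(|block| block.len())
        .load_car(reader)
        .await
}
```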

+10 -8  src/lib.rs
@@ -18,17 +18,19 @@
 `iroh_car` additionally applies a block size limit of `2MiB`.
 
 ```
-use repo_stream::{Driver, DiskStore};
+use repo_stream::{Driver, DriverBuilder, DiskBuilder};
 
 # #[tokio::main]
 # async fn main() -> Result<(), Box<dyn std::error::Error>> {
 # let reader = include_bytes!("../car-samples/tiny.car").as_slice();
 let mut total_size = 0;
-let process = |rec: Vec<u8>| rec.len(); // block processing: just extract the size
-let in_mem_limit = 10; /* MiB */
-let db_cache_size = 32; /* MiB */
 
-match Driver::load_car(reader, process, in_mem_limit).await? {
+match DriverBuilder::new()
+    .with_mem_limit_mb(10)
+    .with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size
+    .load_car(reader)
+    .await?
+{
 
     // if all blocks fit within memory
     Driver::Memory(_commit, mut driver) => {
@@ -42,7 +44,7 @@
     // if the CAR was too big for in-memory processing
     Driver::Disk(paused) => {
         // set up a disk store we can spill to
-        let store = DiskStore::new("some/path.db".into(), db_cache_size).await?;
+        let store = DiskBuilder::new().open("some/path.db".into()).await?;
         // do the spilling, get back a (similar) driver
         let (_commit, mut driver) = paused.finish_loading(store).await?;
 
@@ -79,7 +81,7 @@
 pub mod drive;
 pub mod process;
 
-pub use disk::{DiskError, DiskStore};
-pub use drive::{DriveError, Driver};
+pub use disk::{DiskBuilder, DiskError, DiskStore};
+pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk};
 pub use mst::Commit;
 pub use process::Processable;

+27  src/process.rs
@@ -11,5 +11,11 @@
 approximate total off-stack size of the type. (the on-stack size will be added
 automatically via `std::mem::size_of`).
 
+Note that it is **not guaranteed** that the `process` function will run on a
+block before storing it in memory or on disk: it's not possible to know if a
+block is a record without actually walking the MST, so the best we can do is
+apply `process` to any block that we know *cannot* be an MST node, and otherwise
+store the raw block bytes.
+
 Here's a silly processing function that just collects 'eyy's found in the raw
 record bytes
@@ -71,6 +77,12 @@
     }
 }
 
+impl Processable for String {
+    fn get_size(&self) -> usize {
+        self.capacity()
+    }
+}
+
 impl<Item: Sized + Processable> Processable for Vec<Item> {
     fn get_size(&self) -> usize {
         let slot_size = std::mem::size_of::<Item>();
@@ -79,3 +91,18 @@
         direct_size + items_referenced_size
     }
 }
+
+impl<Item: Processable> Processable for Option<Item> {
+    fn get_size(&self) -> usize {
+        self.as_ref().map(|item| item.get_size()).unwrap_or(0)
+    }
+}
+
+impl<Item: Processable, Error: Processable> Processable for Result<Item, Error> {
+    fn get_size(&self) -> usize {
+        match self {
+            Ok(item) => item.get_size(),
+            Err(err) => err.get_size(),
+        }
+    }
+}
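
for a custom processed type the same pattern extends naturally: `get_size`
reports only off-stack bytes, since the driver accounts for the stack size
itself. a sketch (the `Summary` type is hypothetical, and this assumes
`get_size` is the trait's only required method, as the impls above suggest):

```rust
use repo_stream::Processable;

// hypothetical processed form of a record: its collection plus raw length
struct Summary {
    collection: String,
    raw_len: usize,
}

impl Processable for Summary {
    fn get_size(&self) -> usize {
        // only heap bytes: the String's buffer; raw_len lives on the stack
        // and is counted automatically
        self.collection.get_size()
    }
}
```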

+6 -203  src/walk.rs
@@ -87,10 +87,13 @@
 }
 
 fn push_from_node(stack: &mut Vec<Need>, node: &Node, parent_depth: Depth) -> Result<(), MstError> {
-    // empty nodes are not allowed in the MST
-    // ...except for a single one for empty MST, but we wouldn't be pushing that
+    // empty nodes are not allowed in the MST except in an empty MST
     if node.is_empty() {
-        return Err(MstError::EmptyNode);
+        if parent_depth == Depth::Root {
+            return Ok(()); // empty mst, nothing to push
+        } else {
+            return Err(MstError::EmptyNode);
+        }
     }
 
     let mut entries = Vec::with_capacity(node.entries.len());
@@ -304,53 +307,12 @@
 #[cfg(test)]
 mod test {
     use super::*;
-    // use crate::mst::Entry;
 
     fn cid1() -> Cid {
         "bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m"
             .parse()
             .unwrap()
     }
-    // fn cid2() -> Cid {
-    //     "QmY7Yh4UquoXHLPFo2XbhXkhBvFoPwmQUSa92pxnxjQuPU"
-    //         .parse()
-    //         .unwrap()
-    // }
-    // fn cid3() -> Cid {
-    //     "bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi"
-    //         .parse()
-    //         .unwrap()
-    // }
-    // fn cid4() -> Cid {
-    //     "QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR"
-    //         .parse()
-    //         .unwrap()
-    // }
-    // fn cid5() -> Cid {
-    //     "QmSnuWmxptJZdLJpKRarxBMS2Ju2oANVrgbr2xWbie9b2D"
-    //         .parse()
-    //         .unwrap()
-    // }
-    // fn cid6() -> Cid {
-    //     "QmdmQXB2mzChmMeKY47C43LxUdg1NDJ5MWcKMKxDu7RgQm"
-    //         .parse()
-    //         .unwrap()
-    // }
-    // fn cid7() -> Cid {
-    //     "bafybeiaysi4s6lnjev27ln5icwm6tueaw2vdykrtjkwiphwekaywqhcjze"
-    //         .parse()
-    //         .unwrap()
-    // }
-    // fn cid8() -> Cid {
-    //     "bafyreif3tfdpr5n4jdrbielmcapwvbpcthepfkwq2vwonmlhirbjmotedi"
-    //         .parse()
-    //         .unwrap()
-    // }
-    // fn cid9() -> Cid {
-    //     "bafyreicnokmhmrnlp2wjhyk2haep4tqxiptwfrp2rrs7rzq7uk766chqvq"
-    //         .parse()
-    //         .unwrap()
-    // }
 
     #[test]
     fn test_depth_spec_0() {
@@ -441,163 +403,4 @@
             .as_ref()
         );
     }
-
-    // #[test]
-    // fn test_needs_from_node_just_one_record() {
-    //     let node = Node {
-    //         left: None,
-    //         entries: vec![Entry {
-    //             keysuffix: "asdf".into(),
-    //             prefix_len: 0,
-    //             value: cid1(),
-    //             tree: None,
-    //         }],
-    //     };
-    //     assert_eq!(
-    //         needs_from_node(node).unwrap(),
-    //         vec![Need::Record {
-    //             rkey: "asdf".into(),
-    //             cid: cid1(),
-    //         },]
-    //     );
-    // }
-
-    // #[test]
-    // fn test_needs_from_node_two_records() {
-    //     let node = Node {
-    //         left: None,
-    //         entries: vec![
-    //             Entry {
-    //                 keysuffix: "asdf".into(),
-    //                 prefix_len: 0,
-    //                 value: cid1(),
-    //                 tree: None,
-    //             },
-    //             Entry {
-    //                 keysuffix: "gh".into(),
-    //                 prefix_len: 2,
-    //                 value: cid2(),
-    //                 tree: None,
-    //             },
-    //         ],
-    //     };
-    //     assert_eq!(
-    //         needs_from_node(node).unwrap(),
-    //         vec![
-    //             Need::Record {
-    //                 rkey: "asdf".into(),
-    //                 cid: cid1(),
-    //             },
-    //             Need::Record {
-    //                 rkey: "asgh".into(),
-    //                 cid: cid2(),
-    //             },
-    //         ]
-    //     );
-    // }
-
-    // #[test]
-    // fn test_needs_from_node_with_both() {
-    //     let node = Node {
-    //         left: None,
-    //         entries: vec![Entry {
-    //             keysuffix: "asdf".into(),
-    //             prefix_len: 0,
-    //             value: cid1(),
-    //             tree: Some(cid2()),
-    //         }],
-    //     };
-    //     assert_eq!(
-    //         needs_from_node(node).unwrap(),
-    //         vec![
-    //             Need::Record {
-    //                 rkey: "asdf".into(),
-    //                 cid: cid1(),
-    //             },
-    //             Need::Node(cid2()),
-    //         ]
-    //     );
-    // }
-
-    // #[test]
-    // fn test_needs_from_node_left_and_record() {
-    //     let node = Node {
-    //         left: Some(cid1()),
-    //         entries: vec![Entry {
-    //             keysuffix: "asdf".into(),
-    //             prefix_len: 0,
-    //             value: cid2(),
-    //             tree: None,
-    //         }],
-    //     };
-    //     assert_eq!(
-    //         needs_from_node(node).unwrap(),
-    //         vec![
-    //             Need::Node(cid1()),
-    //             Need::Record {
-    //                 rkey: "asdf".into(),
-    //                 cid: cid2(),
-    //             },
-    //         ]
-    //     );
-    // }
-
-    // #[test]
-    // fn test_needs_from_full_node() {
-    //     let node = Node {
-    //         left: Some(cid1()),
-    //         entries: vec![
-    //             Entry {
-    //                 keysuffix: "asdf".into(),
-    //                 prefix_len: 0,
-    //                 value: cid2(),
-    //                 tree: Some(cid3()),
-    //             },
-    //             Entry {
-    //                 keysuffix: "ghi".into(),
-    //                 prefix_len: 1,
-    //                 value: cid4(),
-    //                 tree: Some(cid5()),
-    //             },
-    //             Entry {
-    //                 keysuffix: "jkl".into(),
-    //                 prefix_len: 2,
-    //                 value: cid6(),
-    //                 tree: Some(cid7()),
-    //             },
-    //             Entry {
-    //                 keysuffix: "mno".into(),
-    //                 prefix_len: 4,
-    //                 value: cid8(),
-    //                 tree: Some(cid9()),
-    //             },
-    //         ],
-    //     };
-    //     assert_eq!(
-    //         needs_from_node(node).unwrap(),
-    //         vec![
-    //             Need::Node(cid1()),
-    //             Need::Record {
-    //                 rkey: "asdf".into(),
-    //                 cid: cid2(),
-    //             },
-    //             Need::Node(cid3()),
-    //             Need::Record {
-    //                 rkey: "aghi".into(),
-    //                 cid: cid4(),
-    //             },
-    //             Need::Node(cid5()),
-    //             Need::Record {
-    //                 rkey: "agjkl".into(),
-    //                 cid: cid6(),
-    //             },
-    //             Need::Node(cid7()),
-    //             Need::Record {
-    //                 rkey: "agjkmno".into(),
-    //                 cid: cid8(),
-    //             },
-    //             Need::Node(cid9()),
-    //         ]
-    //     );
-    // }
 }
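
the new branch makes an empty node legal only at the root (the empty-repo
case). a test sketch of that rule (not in the diff; `NON_ROOT_DEPTH` stands in
for whatever non-root `Depth` value the crate uses, and `Node` is assumed to
keep the `left`/`entries` shape seen in the removed tests above):

```rust
#[test]
fn empty_node_only_allowed_at_root() {
    let empty = Node { left: None, entries: vec![] };

    // an empty root node is fine: nothing gets pushed onto the walk stack
    let mut stack = Vec::new();
    assert!(push_from_node(&mut stack, &empty, Depth::Root).is_ok());
    assert!(stack.is_empty());

    // an empty node anywhere deeper is a structural error
    let mut stack = Vec::new();
    assert!(matches!(
        push_from_node(&mut stack, &empty, NON_ROOT_DEPTH),
        Err(MstError::EmptyNode)
    ));
}
```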

+16 -5  tests/non-huge-cars.rs
@@ -1,11 +1,17 @@
 extern crate repo_stream;
 use repo_stream::Driver;
 
+const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car");
 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car");
 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car");
 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car");
 
-async fn test_car(bytes: &[u8], expected_records: usize, expected_sum: usize) {
+async fn test_car(
+    bytes: &[u8],
+    expected_records: usize,
+    expected_sum: usize,
+    expect_profile: bool,
+) {
     let mut driver = match Driver::load_car(bytes, |block| block.len(), 10 /* MiB */)
         .await
         .unwrap()
@@ -33,20 +39,25 @@
 
     assert_eq!(records, expected_records);
     assert_eq!(sum, expected_sum);
-    assert!(found_bsky_profile);
+    assert_eq!(found_bsky_profile, expect_profile);
+}
+
+#[tokio::test]
+async fn test_empty_car() {
+    test_car(EMPTY_CAR, 0, 0, false).await
 }
 
 #[tokio::test]
 async fn test_tiny_car() {
-    test_car(TINY_CAR, 8, 2071).await
+    test_car(TINY_CAR, 8, 2071, true).await
 }
 
 #[tokio::test]
 async fn test_little_car() {
-    test_car(LITTLE_CAR, 278, 246960).await
+    test_car(LITTLE_CAR, 278, 246960, true).await
 }
 
 #[tokio::test]
 async fn test_midsize_car() {
-    test_car(MIDSIZE_CAR, 11585, 3741393).await
+    test_car(MIDSIZE_CAR, 11585, 3741393, true).await
 }