+1
-1
Cargo.lock
+1
-1
Cargo.lock
+1
-1
Cargo.toml
+1
-1
Cargo.toml
+4
benches/non-huge-cars.rs
+4
benches/non-huge-cars.rs
···
3
3
4
4
use criterion::{Criterion, criterion_group, criterion_main};
5
5
6
+
const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car");
6
7
const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car");
7
8
const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car");
8
9
const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car");
···
13
14
.build()
14
15
.expect("Creating runtime failed");
15
16
17
+
c.bench_function("empty-car", |b| {
18
+
b.to_async(&rt).iter(async || drive_car(EMPTY_CAR).await)
19
+
});
16
20
c.bench_function("tiny-car", |b| {
17
21
b.to_async(&rt).iter(async || drive_car(TINY_CAR).await)
18
22
});
car-samples/empty.car
car-samples/empty.car
This is a binary file and will not be displayed.
+4
-2
examples/disk-read-file/main.rs
+4
-2
examples/disk-read-file/main.rs
···
6
6
use clap::Parser;
7
7
use repo_stream::{DiskBuilder, Driver, DriverBuilder};
8
8
use std::path::PathBuf;
9
+
use std::time::Instant;
9
10
10
11
#[derive(Debug, Parser)]
11
12
struct Args {
···
27
28
let reader = tokio::io::BufReader::new(reader);
28
29
29
30
log::info!("hello! reading the car...");
31
+
let t0 = Instant::now();
30
32
31
33
// in this example we only bother handling CARs that are too big for memory
32
34
// `noop` helper means: do no block processing, store the raw blocks
···
48
50
49
51
// at this point you might want to fetch the account's signing key
50
52
// via the DID from the commit, and then verify the signature.
51
-
log::warn!("big's comit: {:?}", commit);
53
+
log::warn!("big's comit ({:?}): {:?}", t0.elapsed(), commit);
52
54
53
55
// pop the driver back out to get some code indentation relief
54
56
driver
···
78
80
}
79
81
}
80
82
81
-
log::info!("arrived! joining rx...");
83
+
log::info!("arrived! ({:?}) joining rx...", t0.elapsed());
82
84
83
85
// clean up the database. would be nice to do this in drop so it happens
84
86
// automatically, but some blocking work happens, so that's not allowed in
+53
-2
readme.md
+53
-2
readme.md
···
4
4
5
5
[![Crates.io][crates-badge]](https://crates.io/crates/repo-stream)
6
6
[![Documentation][docs-badge]](https://docs.rs/repo-stream)
7
+
[![Sponsor][sponsor-badge]](https://github.com/sponsors/uniphil)
7
8
8
9
[crates-badge]: https://img.shields.io/crates/v/repo-stream.svg
9
10
[docs-badge]: https://docs.rs/repo-stream/badge.svg
11
+
[sponsor-badge]: https://img.shields.io/badge/at-microcosm-b820f9?labelColor=b820f9&logo=githubsponsors&logoColor=fff
10
12
13
+
```rust
14
+
use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder};
11
15
12
-
todo
16
+
#[tokio::main]
17
+
async fn main() -> Result<(), DriveError> {
18
+
// repo-stream takes any AsyncRead as input, like a tokio::fs::File
19
+
let reader = tokio::fs::File::open("repo.car".into()).await?;
20
+
let reader = tokio::io::BufReader::new(reader);
21
+
22
+
// example repo workload is simply counting the total record bytes
23
+
let mut total_size = 0;
24
+
25
+
match DriverBuilder::new()
26
+
.with_mem_limit_mb(10)
27
+
.with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size
28
+
.load_car(reader)
29
+
.await?
30
+
{
31
+
32
+
// if all blocks fit within memory
33
+
Driver::Memory(_commit, mut driver) => {
34
+
while let Some(chunk) = driver.next_chunk(256).await? {
35
+
for (_rkey, size) in chunk {
36
+
total_size += size;
37
+
}
38
+
}
39
+
},
40
+
41
+
// if the CAR was too big for in-memory processing
42
+
Driver::Disk(paused) => {
43
+
// set up a disk store we can spill to
44
+
let store = DiskBuilder::new().open("some/path.db".into()).await?;
45
+
// do the spilling, get back a (similar) driver
46
+
let (_commit, mut driver) = paused.finish_loading(store).await?;
47
+
48
+
while let Some(chunk) = driver.next_chunk(256).await? {
49
+
for (_rkey, size) in chunk {
50
+
total_size += size;
51
+
}
52
+
}
53
+
54
+
// clean up the disk store (drop tables etc)
55
+
driver.reset_store().await?;
56
+
}
57
+
};
58
+
println!("sum of size of all records: {total_size}");
59
+
Ok(())
60
+
}
61
+
```
62
+
63
+
more recent todo
13
64
14
65
- [ ] get an *emtpy* car for the test suite
15
-
- [ ] implement a max size on disk limit
66
+
- [x] implement a max size on disk limit
16
67
17
68
18
69
-----
+2
-1
src/disk.rs
+2
-1
src/disk.rs
···
53
53
}
54
54
55
55
/// Builder-style disk store setup
56
+
#[derive(Debug, Clone)]
56
57
pub struct DiskBuilder {
57
58
/// Database in-memory cache allowance
58
59
///
···
96
97
self
97
98
}
98
99
/// Open and initialize the actual disk storage
99
-
pub async fn open(self, path: PathBuf) -> Result<DiskStore, DiskError> {
100
+
pub async fn open(&self, path: PathBuf) -> Result<DiskStore, DiskError> {
100
101
DiskStore::new(path, self.cache_size_mb, self.max_stored_mb).await
101
102
}
102
103
}
+5
-3
src/drive.rs
+5
-3
src/drive.rs
···
116
116
}
117
117
118
118
/// Builder-style driver setup
119
+
#[derive(Debug, Clone)]
119
120
pub struct DriverBuilder {
120
121
pub mem_limit_mb: usize,
121
122
}
···
153
154
}
154
155
/// Begin processing an atproto MST from a CAR file
155
156
pub async fn load_car<R: AsyncRead + Unpin>(
156
-
self,
157
+
&self,
157
158
reader: R,
158
159
) -> Result<Driver<R, Vec<u8>>, DriveError> {
159
160
Driver::load_car(reader, crate::process::noop, self.mem_limit_mb).await
···
163
164
/// Builder-style driver intermediate step
164
165
///
165
166
/// start from `DriverBuilder`
167
+
#[derive(Debug, Clone)]
166
168
pub struct DriverBuilderWithProcessor<T: Processable> {
167
169
pub mem_limit_mb: usize,
168
170
pub block_processor: fn(Vec<u8>) -> T,
···
178
180
}
179
181
/// Begin processing an atproto MST from a CAR file
180
182
pub async fn load_car<R: AsyncRead + Unpin>(
181
-
self,
183
+
&self,
182
184
reader: R,
183
185
) -> Result<Driver<R, T>, DriveError> {
184
186
Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await
···
346
348
})
347
349
.await??;
348
350
349
-
let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(2);
351
+
let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(1);
350
352
351
353
let store_worker = tokio::task::spawn_blocking(move || {
352
354
let mut writer = store.get_writer()?;
+1
-1
src/lib.rs
+1
-1
src/lib.rs
+21
src/process.rs
+21
src/process.rs
···
77
77
}
78
78
}
79
79
80
+
impl Processable for String {
81
+
fn get_size(&self) -> usize {
82
+
self.capacity()
83
+
}
84
+
}
85
+
80
86
impl<Item: Sized + Processable> Processable for Vec<Item> {
81
87
fn get_size(&self) -> usize {
82
88
let slot_size = std::mem::size_of::<Item>();
···
85
91
direct_size + items_referenced_size
86
92
}
87
93
}
94
+
95
+
impl<Item: Processable> Processable for Option<Item> {
96
+
fn get_size(&self) -> usize {
97
+
self.as_ref().map(|item| item.get_size()).unwrap_or(0)
98
+
}
99
+
}
100
+
101
+
impl<Item: Processable, Error: Processable> Processable for Result<Item, Error> {
102
+
fn get_size(&self) -> usize {
103
+
match self {
104
+
Ok(item) => item.get_size(),
105
+
Err(err) => err.get_size(),
106
+
}
107
+
}
108
+
}
+6
-3
src/walk.rs
+6
-3
src/walk.rs
···
87
87
}
88
88
89
89
fn push_from_node(stack: &mut Vec<Need>, node: &Node, parent_depth: Depth) -> Result<(), MstError> {
90
-
// empty nodes are not allowed in the MST
91
-
// ...except for a single one for empty MST, but we wouldn't be pushing that
90
+
// empty nodes are not allowed in the MST except in an empty MST
92
91
if node.is_empty() {
93
-
return Err(MstError::EmptyNode);
92
+
if parent_depth == Depth::Root {
93
+
return Ok(()); // empty mst, nothing to push
94
+
} else {
95
+
return Err(MstError::EmptyNode);
96
+
}
94
97
}
95
98
96
99
let mut entries = Vec::with_capacity(node.entries.len());
+16
-5
tests/non-huge-cars.rs
+16
-5
tests/non-huge-cars.rs
···
1
1
extern crate repo_stream;
2
2
use repo_stream::Driver;
3
3
4
+
const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car");
4
5
const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car");
5
6
const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car");
6
7
const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car");
7
8
8
-
async fn test_car(bytes: &[u8], expected_records: usize, expected_sum: usize) {
9
+
async fn test_car(
10
+
bytes: &[u8],
11
+
expected_records: usize,
12
+
expected_sum: usize,
13
+
expect_profile: bool,
14
+
) {
9
15
let mut driver = match Driver::load_car(bytes, |block| block.len(), 10 /* MiB */)
10
16
.await
11
17
.unwrap()
···
33
39
34
40
assert_eq!(records, expected_records);
35
41
assert_eq!(sum, expected_sum);
36
-
assert!(found_bsky_profile);
42
+
assert_eq!(found_bsky_profile, expect_profile);
43
+
}
44
+
45
+
#[tokio::test]
46
+
async fn test_empty_car() {
47
+
test_car(EMPTY_CAR, 0, 0, false).await
37
48
}
38
49
39
50
#[tokio::test]
40
51
async fn test_tiny_car() {
41
-
test_car(TINY_CAR, 8, 2071).await
52
+
test_car(TINY_CAR, 8, 2071, true).await
42
53
}
43
54
44
55
#[tokio::test]
45
56
async fn test_little_car() {
46
-
test_car(LITTLE_CAR, 278, 246960).await
57
+
test_car(LITTLE_CAR, 278, 246960, true).await
47
58
}
48
59
49
60
#[tokio::test]
50
61
async fn test_midsize_car() {
51
-
test_car(MIDSIZE_CAR, 11585, 3741393).await
62
+
test_car(MIDSIZE_CAR, 11585, 3741393, true).await
52
63
}