···89[dependencies]
10fjall = { version = "3.0.1", default-features = false }
11-hashbrown = "0.16.1"
12cid = { version = "0.11.1", features = ["serde"] }
13iroh-car = "0.5.1"
14log = "0.4.28"
···18sha2 = "0.10.9" # note: hmac-sha256 is simpler, smaller, benches ~15ns slower
19thiserror = "2.0.17"
20tokio = { version = "1.47.1", features = ["rt", "sync"] }
21-multihash-codetable = { version = "0.1.4", features = ["sha2"] }
0002223[dev-dependencies]
24clap = { version = "4.5.48", features = ["derive"] }
···48# [[bench]]
49# name = "leading"
50# harness = false
51-52-[[bench]]
53-name = "cid-check"
54-harness = false
···89[dependencies]
10fjall = { version = "3.0.1", default-features = false }
11+hashbrown = { version = "0.16.1", optional = true }
12cid = { version = "0.11.1", features = ["serde"] }
13iroh-car = "0.5.1"
14log = "0.4.28"
···18sha2 = "0.10.9" # note: hmac-sha256 is simpler, smaller, benches ~15ns slower
19thiserror = "2.0.17"
20tokio = { version = "1.47.1", features = ["rt", "sync"] }
21+22+[features]
23+default = []
24+hashbrown = ["dep:hashbrown"]
2526[dev-dependencies]
27clap = { version = "4.5.48", features = ["derive"] }
···51# [[bench]]
52# name = "leading"
53# harness = false
0000
-45
benches/cid-check.rs
···1-use cid::Cid;
2-use criterion::{Criterion, criterion_group, criterion_main};
3-use multihash_codetable::{Code, MultihashDigest};
4-use sha2::{Digest, Sha256};
5-6-fn multihash_verify(given: Cid, block: &[u8]) -> bool {
7- let calculated = Cid::new_v1(0x71, Code::Sha2_256.digest(block));
8- calculated == given
9-}
10-11-fn effortful_verify(given: Cid, block: &[u8]) -> bool {
12- // we know we're in atproto, so we can make a few assumptions
13- if given.version() != cid::Version::V1 {
14- return false;
15- }
16- let (codec, given_digest, _) = given.hash().into_inner();
17- if codec != 0x12 {
18- return false;
19- }
20- given_digest[..32] == *Sha256::digest(block)
21-}
22-23-fn fastloose_verify(given: Cid, block: &[u8]) -> bool {
24- let (_, given_digest, _) = given.hash().into_inner();
25- given_digest[..32] == *Sha256::digest(block)
26-}
27-28-pub fn criterion_benchmark(c: &mut Criterion) {
29- let some_bytes: Vec<u8> = vec![0x1a, 0x00, 0xAA, 0x39, 0x8C].repeat(100);
30- let cid = Cid::new_v1(0x71, Code::Sha2_256.digest(&some_bytes));
31-32- let mut g = c.benchmark_group("CID check");
33- g.bench_function("multihash", |b| {
34- b.iter(|| multihash_verify(cid, &some_bytes))
35- });
36- g.bench_function("effortful", |b| {
37- b.iter(|| effortful_verify(cid, &some_bytes))
38- });
39- g.bench_function("fastloose", |b| {
40- b.iter(|| fastloose_verify(cid, &some_bytes))
41- });
42-}
43-44-criterion_group!(benches, criterion_benchmark);
45-criterion_main!(benches);
···000000000000000000000000000000000000000000000
+3-3
benches/huge-car.rs
···1extern crate repo_stream;
2-use repo_stream::Driver;
3use std::path::{Path, PathBuf};
45use criterion::{Criterion, criterion_group, criterion_main};
···33 let reader = tokio::io::BufReader::new(reader);
3435 let mut driver = match Driver::load_car(reader, ser, 1024).await.unwrap() {
36- Driver::Memory(_, mem_driver) => mem_driver,
37 Driver::Disk(_) => panic!("not doing disk for benchmark"),
38 };
3940 let mut n = 0;
41- while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
42 n += pairs.len();
43 }
44 n
···1extern crate repo_stream;
2+use repo_stream::{Driver, Step};
3use std::path::{Path, PathBuf};
45use criterion::{Criterion, criterion_group, criterion_main};
···33 let reader = tokio::io::BufReader::new(reader);
3435 let mut driver = match Driver::load_car(reader, ser, 1024).await.unwrap() {
36+ Driver::Memory(_, _, mem_driver) => mem_driver,
37 Driver::Disk(_) => panic!("not doing disk for benchmark"),
38 };
3940 let mut n = 0;
41+ while let Step::Value(pairs) = driver.next_chunk(256).await.unwrap() {
42 n += pairs.len();
43 }
44 n
+3-3
benches/non-huge-cars.rs
···1extern crate repo_stream;
2-use repo_stream::Driver;
34use criterion::{Criterion, criterion_group, criterion_main};
5···4041async fn drive_car(bytes: &[u8]) -> usize {
42 let mut driver = match Driver::load_car(bytes, ser, 32).await.unwrap() {
43- Driver::Memory(_, mem_driver) => mem_driver,
44 Driver::Disk(_) => panic!("not benching big cars here"),
45 };
4647 let mut n = 0;
48- while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
49 n += pairs.len();
50 }
51 n
···1extern crate repo_stream;
2+use repo_stream::{Driver, Step};
34use criterion::{Criterion, criterion_group, criterion_main};
5···4041async fn drive_car(bytes: &[u8]) -> usize {
42 let mut driver = match Driver::load_car(bytes, ser, 32).await.unwrap() {
43+ Driver::Memory(_, _, mem_driver) => mem_driver,
44 Driver::Disk(_) => panic!("not benching big cars here"),
45 };
4647 let mut n = 0;
48+ while let Step::Value(pairs) = driver.next_chunk(256).await.unwrap() {
49 n += pairs.len();
50 }
51 n
car-samples/slice-node-after.car
This is a binary file and will not be displayed.
car-samples/slice-node-first-key.car
This is a binary file and will not be displayed.
car-samples/slice-one.car
This is a binary file and will not be displayed.
car-samples/slice-proving-absence.car
This is a binary file and will not be displayed.
+10-7
examples/disk-read-file/main.rs
···9static GLOBAL: MiMalloc = MiMalloc;
1011use clap::Parser;
12-use repo_stream::{DiskBuilder, Driver, DriverBuilder};
13use std::path::PathBuf;
14use std::time::Instant;
15···42 .load_car(reader)
43 .await?
44 {
45- Driver::Memory(_, _) => panic!("try this on a bigger car"),
46 Driver::Disk(big_stuff) => {
47 // we reach here if the repo was too big and needs to be spilled to
48 // disk to continue
···51 let disk_store = DiskBuilder::new().open(tmpfile).await?;
5253 // do the spilling, get back a (similar) driver
54- let (commit, driver) = big_stuff.finish_loading(disk_store).await?;
5556 // at this point you might want to fetch the account's signing key
57 // via the DID from the commit, and then verify the signature.
···74 // this example uses the disk driver's channel mode: the tree walking is
75 // spawned onto a blocking thread, and we get chunks of rkey+blocks back
76 let (mut rx, join) = driver.to_channel(512);
77- while let Some(r) = rx.recv().await {
78- let pairs = r?;
0007980 // keep a count of the total number of blocks seen
81- n += pairs.len();
8283- for output in pairs {
84 // for each block, count how many bytes are equal to '0'
85 // (this is just an example, you probably want to do something more
86 // interesting)
···9static GLOBAL: MiMalloc = MiMalloc;
1011use clap::Parser;
12+use repo_stream::{DiskBuilder, Driver, DriverBuilder, Step};
13use std::path::PathBuf;
14use std::time::Instant;
15···42 .load_car(reader)
43 .await?
44 {
45+ Driver::Memory(_, _, _) => panic!("try this on a bigger car"),
46 Driver::Disk(big_stuff) => {
47 // we reach here if the repo was too big and needs to be spilled to
48 // disk to continue
···51 let disk_store = DiskBuilder::new().open(tmpfile).await?;
5253 // do the spilling, get back a (similar) driver
54+ let (commit, _, driver) = big_stuff.finish_loading(disk_store).await?;
5556 // at this point you might want to fetch the account's signing key
57 // via the DID from the commit, and then verify the signature.
···74 // this example uses the disk driver's channel mode: the tree walking is
75 // spawned onto a blocking thread, and we get chunks of rkey+blocks back
76 let (mut rx, join) = driver.to_channel(512);
77+ while let Some(step) = rx.recv().await {
78+ let step = step?;
79+ let Step::Value(outputs) = step else {
80+ break;
81+ };
8283 // keep a count of the total number of blocks seen
84+ n += outputs.len();
8586+ for output in outputs {
87 // for each block, count how many bytes are equal to '0'
88 // (this is just an example, you probably want to do something more
89 // interesting)
+32
examples/print-tree/main.rs
···00000000000000000000000000000000
···1+/*!
2+Read a CAR slice in memory and show some info about it.
3+*/
4+5+extern crate repo_stream;
6+use repo_stream::{Driver, DriverBuilder};
7+8+type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
9+10+#[tokio::main]
11+async fn main() -> Result<()> {
12+ env_logger::init();
13+ let reader = tokio::io::BufReader::new(tokio::io::stdin());
14+15+ let (commit, driver) = match DriverBuilder::new()
16+ .with_block_processor(|block| block.len().to_ne_bytes().to_vec())
17+ .load_car(reader)
18+ .await?
19+ {
20+ Driver::Memory(commit, _, mem_driver) => (commit, mem_driver),
21+ Driver::Disk(_) => panic!("this example doesn't handle big CARs"),
22+ };
23+24+ println!(
25+ "\nthis slice is from {}, repo rev {}\n\n",
26+ commit.did, commit.rev
27+ );
28+29+ driver.viz(commit.data)?;
30+31+ Ok(())
32+}
+11-7
examples/read-file/main.rs
···45extern crate repo_stream;
6use clap::Parser;
7-use repo_stream::{Driver, DriverBuilder};
8use std::path::PathBuf;
910type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
···28 .load_car(reader)
29 .await?
30 {
31- Driver::Memory(commit, mem_driver) => (commit, mem_driver),
32 Driver::Disk(_) => panic!("this example doesn't handle big CARs"),
33 };
3435 log::info!("got commit: {commit:?}");
3637- let mut n = 0;
38- while let Some(pairs) = driver.next_chunk(256).await? {
39- n += pairs.len();
40- // log::info!("got {rkey:?}");
0000041 }
42- log::info!("bye! total records={n}");
4344 Ok(())
45}
···45extern crate repo_stream;
6use clap::Parser;
7+use repo_stream::{Driver, DriverBuilder, Output, Step};
8use std::path::PathBuf;
910type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
···28 .load_car(reader)
29 .await?
30 {
31+ Driver::Memory(commit, _, mem_driver) => (commit, mem_driver),
32 Driver::Disk(_) => panic!("this example doesn't handle big CARs"),
33 };
3435 log::info!("got commit: {commit:?}");
3637+ while let Step::Value(records) = driver.next_chunk(256).await? {
38+ for Output { rkey, cid, data } in records {
39+ let size = usize::from_ne_bytes(data.try_into().unwrap());
40+ print!("0x");
41+ for byte in cid.to_bytes() {
42+ print!("{byte:>02x}");
43+ }
44+ println!(": {rkey} => record of len {}", size);
45+ }
46 }
04748 Ok(())
49}
···1+/*!
2+Read a CAR slice in memory and show some info about it.
3+*/
4+5+extern crate repo_stream;
6+use repo_stream::{Driver, DriverBuilder, Output, Step};
7+8+type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
9+10+#[tokio::main]
11+async fn main() -> Result<()> {
12+ env_logger::init();
13+ let reader = tokio::io::BufReader::new(tokio::io::stdin());
14+15+ let (commit, prev_rkey, mut driver) = match DriverBuilder::new()
16+ .with_block_processor(|block| block.len().to_ne_bytes().to_vec())
17+ .load_car(reader)
18+ .await?
19+ {
20+ Driver::Memory(commit, prev, mem_driver) => (commit, prev, mem_driver),
21+ Driver::Disk(_) => panic!("this example doesn't handle big CARs"),
22+ };
23+24+ println!(
25+ "\nthis slice is from {}, repo rev {}",
26+ commit.did, commit.rev
27+ );
28+ if let Some(rkey) = prev_rkey {
29+ println!(" -> key immediately before CAR slice: {rkey}");
30+ } else {
31+ println!(
32+ " -> no key preceeding the CAR slice, so it includes the leading edge of the tree."
33+ );
34+ }
35+36+ println!("included records:");
37+ let end = loop {
38+ match driver.next_chunk(256).await? {
39+ Step::Value(chunk) => {
40+ for Output { cid, rkey, .. } in chunk {
41+ print!(" SHA256 ");
42+ for byte in cid.to_bytes().iter().skip(4).take(5) {
43+ print!("{byte:02x}");
44+ }
45+ println!("...\t{rkey}");
46+ }
47+ }
48+ Step::End(e) => break e,
49+ }
50+ };
51+52+ println!("done walking records present in the slice.");
53+ if let Some(rkey) = end {
54+ println!(" -> key immediately after CAR slice: {rkey}");
55+ } else {
56+ println!(
57+ " -> no key proceeding the CAR slice, so it includes the trailing edge of the tree."
58+ );
59+ }
60+61+ Ok(())
62+}
+18-18
readme.md
···11[sponsor-badge]: https://img.shields.io/badge/at-microcosm-b820f9?labelColor=b820f9&logo=githubsponsors&logoColor=fff
1213```rust no_run
14-use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder, Output};
1516#[tokio::main]
17async fn main() -> Result<(), Box<dyn std::error::Error>> {
···31 {
3233 // if all blocks fit within memory
34- Driver::Memory(_commit, mut driver) => {
35- while let Some(chunk) = driver.next_chunk(256).await? {
36 for Output { rkey: _, cid: _, data } in chunk {
37 let size = usize::from_ne_bytes(data.try_into().unwrap());
38 total_size += size;
···45 // set up a disk store we can spill to
46 let store = DiskBuilder::new().open("some/path.db".into()).await?;
47 // do the spilling, get back a (similar) driver
48- let (_commit, mut driver) = paused.finish_loading(store).await?;
4950- while let Some(chunk) = driver.next_chunk(256).await? {
51 for Output { rkey: _, cid: _, data } in chunk {
52 let size = usize::from_ne_bytes(data.try_into().unwrap());
53 total_size += size;
···6263more recent todo
64- [ ] add a zero-copy rkyv process function example
65-- [ ] repo car slices
66-- [ ] lazy-value stream (rkey -> CID diffing for tap-like `#sync` handling)
67- [x] get an *empty* car for the test suite
68- [x] implement a max size on disk limit
69···7980current car processing times (records processed into their length usize, phil's dev machine):
8182-- 450MiB CAR file (huge): `1.3s`
83- 128MiB (huge): `350ms`
84-- 5.0MiB: `6.8ms`
85-- 279KiB: `160us`
86-- 3.4KiB: `5.1us`
87-- empty: `690ns`
8889it's a little faster with `mimalloc`
90···94static GLOBAL: MiMalloc = MiMalloc;
95```
9697-- 450MiB CAR file: `1.2s` (-8%)
98-- 128MiB: `300ms` (-14%)
99-- 5.0MiB: `6.0ms` (-11%)
100-- 279KiB: `150us` (-7%)
101-- 3.4KiB: `4.7us` (-8%)
102-- empty: `670ns` (-4%)
103104processing CARs requires buffering blocks, so it can consume a lot of memory. repo-stream's in-memory driver has minimal memory overhead, but there are two ways to make it work with less mem (you can do either or both!)
105
···11[sponsor-badge]: https://img.shields.io/badge/at-microcosm-b820f9?labelColor=b820f9&logo=githubsponsors&logoColor=fff
1213```rust no_run
14+use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder, Output, Step};
1516#[tokio::main]
17async fn main() -> Result<(), Box<dyn std::error::Error>> {
···31 {
3233 // if all blocks fit within memory
34+ Driver::Memory(_commit, _prev_rkey, mut driver) => {
35+ while let Step::Value(chunk) = driver.next_chunk(256).await? {
36 for Output { rkey: _, cid: _, data } in chunk {
37 let size = usize::from_ne_bytes(data.try_into().unwrap());
38 total_size += size;
···45 // set up a disk store we can spill to
46 let store = DiskBuilder::new().open("some/path.db".into()).await?;
47 // do the spilling, get back a (similar) driver
48+ let (_commit, _prev_rkey, mut driver) = paused.finish_loading(store).await?;
4950+ while let Step::Value(chunk) = driver.next_chunk(256).await? {
51 for Output { rkey: _, cid: _, data } in chunk {
52 let size = usize::from_ne_bytes(data.try_into().unwrap());
53 total_size += size;
···6263more recent todo
64- [ ] add a zero-copy rkyv process function example
65+- [ ] car slices
66+- [ ] lazy-value stream (for rkey -> CID diffing; tap-like `#sync` handling; save a fjall record `.get` when not needed)
67- [x] get an *empty* car for the test suite
68- [x] implement a max size on disk limit
69···7980current car processing times (records processed into their length usize, phil's dev machine):
8182+- 450MiB CAR file (huge): `1.4s`
83- 128MiB (huge): `350ms`
84+- 5.0MiB: `7.0ms`
85+- 279KiB: `170us`
86+- 3.4KiB: `5.3us`
87+- empty: `720ns`
8889it's a little faster with `mimalloc`
90···94static GLOBAL: MiMalloc = MiMalloc;
95```
9697+- 450MiB CAR file: `1.1s` (-15%)
98+- 128MiB: `300ms` (-15%)
99+- 5.0MiB: `5.5ms` (-21%)
100+- 279KiB: `140us` (-17%)
101+- 3.4KiB: `4.3us` (-18%)
102+- empty: `610ns` (-16%)
103104processing CARs requires buffering blocks, so it can consume a lot of memory. repo-stream's in-memory driver has minimal memory overhead, but there are two ways to make it work with less mem (you can do either or both!)
105
···1//! Consume a CAR from an AsyncRead, producing an ordered stream of records
203use crate::{
4- Bytes, HashMap,
5 disk::{DiskError, DiskStore},
6 mst::MstNode,
7- walk::Output,
8};
9use cid::Cid;
10use iroh_car::CarReader;
11-use multihash_codetable::{Code, MultihashDigest};
12use std::convert::Infallible;
13use tokio::{io::AsyncRead, sync::mpsc};
14···20pub enum DriveError {
21 #[error("Error from iroh_car: {0}")]
22 CarReader(#[from] iroh_car::Error),
23- #[error("Block did not match its CID")]
24- BadCID,
25 #[error("Failed to decode commit block: {0}")]
26 BadBlock(#[from] serde_ipld_dagcbor::DecodeError<Infallible>),
27 #[error("The Commit block reference by the root was not found")]
···36 ChannelSendError, // SendError takes <T> which we don't need
37 #[error("Failed to join a task: {0}")]
38 JoinError(#[from] tokio::task::JoinError),
00000039}
4041/// An in-order chunk of Rkey + CID + (processed) Block
···110 ///
111 /// You probably want to check the commit's signature. You can go ahead and
112 /// walk the MST right away.
113- Memory(Commit, MemDriver),
114 /// Blocks exceed the memory limit
115 ///
116 /// You'll need to provide a disk storage to continue. The commit will be
···124 block
125}
126127-// iroh-car doesn't verify CIDs!!!!!!
128-#[inline(always)]
129-fn verify_block(given: Cid, block: &[u8]) -> bool {
130- Cid::new_v1(0x71, Code::Sha2_256.digest(block)) == given
131-}
132-133/// Builder-style driver setup
134#[derive(Debug, Clone)]
135pub struct DriverBuilder {
···205 // try to load all the blocks into memory
206 let mut mem_size = 0;
207 while let Some((cid, data)) = car.next_block().await? {
208- // lkasdjflkajdsflkajsfdlkjasdf
209- if !verify_block(cid, &data) {
210- return Err(DriveError::BadCID);
211- }
212-213 // the root commit is a Special Third Kind of block that we need to make
214 // sure not to optimistically send to the processing function
215 if cid == root {
···223224 // stash (maybe processed) blocks in memory as long as we have room
225 mem_size += maybe_processed.len();
226- mem_blocks.insert(cid, maybe_processed);
227 if mem_size >= max_size {
228 return Ok(Driver::Disk(NeedDisk {
229 car,
···247 MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?,
248 MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(bytes)?,
249 };
250- let walker = Walker::new(root_node);
0000251252 Ok(Driver::Memory(
253 commit,
0254 MemDriver {
255 blocks: mem_blocks,
256 walker,
257 process,
0258 },
259 ))
260 }
···275/// so the sync/async boundaries become a little easier to work around.
276#[derive(Debug)]
277pub struct MemDriver {
278- blocks: HashMap<Cid, MaybeProcessedBlock>,
279 walker: Walker,
280- process: fn(Bytes) -> Bytes,
0281}
282283impl MemDriver {
000284 /// Step through the record outputs, in rkey order
285- pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk>, DriveError> {
00000000000000000000286 let mut out = Vec::with_capacity(n);
0287 for _ in 0..n {
288- // walk as far as we can until we run out of blocks or find a record
289- let Some(output) = self.walker.step(&mut self.blocks, self.process)? else {
290- break;
291- };
292- out.push(output);
00000293 }
294 if out.is_empty() {
295- Ok(None)
296 } else {
297- Ok(Some(out))
298 }
299 }
300}
···305 root: Cid,
306 process: fn(Bytes) -> Bytes,
307 max_size: usize,
308- mem_blocks: HashMap<Cid, MaybeProcessedBlock>,
309 pub commit: Option<Commit>,
310}
311···313 pub async fn finish_loading(
314 mut self,
315 mut store: DiskStore,
316- ) -> Result<(Commit, DiskDriver), DriveError> {
317 // move store in and back out so we can manage lifetimes
318 // dump mem blocks into the store
319 store = tokio::task::spawn(async move {
···327 })
328 .await??;
329330- let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock)>>(1);
331332 let store_worker = tokio::task::spawn_blocking(move || {
333 while let Some(chunk) = rx.blocking_recv() {
···348 let Some((cid, data)) = self.car.next_block().await? else {
349 break;
350 };
351-352- // lkasdjflkajdsflkajsfdlkjasdf
353- if !verify_block(cid, &data) {
354- return Err(DriveError::BadCID);
355- }
356-357 // we still gotta keep checking for the root since we might not have it
358 if cid == self.root {
359 let c: Commit = serde_ipld_dagcbor::from_slice(&data)?;
···361 continue;
362 }
3630364 let data = Bytes::from(data);
365366 // remaining possible types: node, record, other. optimistically process
367 // TODO: get the actual in-memory size to compute disk spill
368 let maybe_processed = MaybeProcessedBlock::maybe(self.process, data);
369 mem_size += maybe_processed.len();
370- chunk.push((cid, maybe_processed));
371 if mem_size >= (self.max_size / 2) {
372 // soooooo if we're setting the db cache to max_size and then letting
373 // multiple chunks in the queue that are >= max_size, then at any time
···391392 let commit = self.commit.ok_or(DriveError::MissingCommit)?;
393394- // the commit always must point to a Node; empty node => empty MST special case
395 let db_bytes = store
396 .get(&commit.data.to_bytes())
397 .map_err(|e| DriveError::StorageError(DiskError::DbError(e)))?
···405406 Ok((
407 commit,
0408 DiskDriver {
409 process: self.process,
410 state: Some(BigState { store, walker }),
···437 /// Walk the MST returning up to `n` rkey + record pairs
438 ///
439 /// ```no_run
440- /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, noop};
441 /// # #[tokio::main]
442 /// # async fn main() -> Result<(), DriveError> {
443 /// # let mut disk_driver = _get_fake_disk_driver();
444- /// while let Some(pairs) = disk_driver.next_chunk(256).await? {
445- /// for output in pairs {
446 /// println!("{}: size={}", output.rkey, output.data.len());
447 /// }
448 /// }
449 /// # Ok(())
450 /// # }
451 /// ```
452- pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk>, DriveError> {
453 let process = self.process;
454455 // state should only *ever* be None transiently while inside here
···464465 for _ in 0..n {
466 // walk as far as we can until we run out of blocks or find a record
467- let step = match state.walker.disk_step(&mut state.store, process) {
468 Ok(s) => s,
469 Err(e) => {
470 return (state, Err(e.into()));
471 }
472 };
473- let Some(output) = step else {
474 break;
475 };
476 out.push(output);
···486 let out = res?;
487488 if out.is_empty() {
489- Ok(None)
490 } else {
491- Ok(Some(out))
492 }
493 }
494495 fn read_tx_blocking(
496 &mut self,
497 n: usize,
498- tx: mpsc::Sender<Result<BlockChunk, DriveError>>,
499- ) -> Result<(), mpsc::error::SendError<Result<BlockChunk, DriveError>>> {
500 let BigState { store, walker } = self.state.as_mut().expect("valid state");
501502 loop {
···510 Err(e) => return tx.blocking_send(Err(e.into())),
511 };
512513- let Some(output) = step else {
514 break;
515 };
516 out.push(output);
···519 if out.is_empty() {
520 break;
521 }
522- tx.blocking_send(Ok(out))?;
523 }
524525 Ok(())
···536 /// benefit over just using `.next_chunk(n)`.
537 ///
538 /// ```no_run
539- /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, noop};
540 /// # #[tokio::main]
541 /// # async fn main() -> Result<(), DriveError> {
542 /// # let mut disk_driver = _get_fake_disk_driver();
543 /// let (mut rx, join) = disk_driver.to_channel(512);
544 /// while let Some(recvd) = rx.recv().await {
545- /// let pairs = recvd?;
546- /// for output in pairs {
0547 /// println!("{}: size={}", output.rkey, output.data.len());
548 /// }
549 ///
···555 mut self,
556 n: usize,
557 ) -> (
558- mpsc::Receiver<Result<BlockChunk, DriveError>>,
559 tokio::task::JoinHandle<Self>,
560 ) {
561- let (tx, rx) = mpsc::channel::<Result<BlockChunk, DriveError>>(1);
562563 // sketch: this worker is going to be allowed to execute without a join handle
564 let chan_task = tokio::task::spawn_blocking(move || {
···1//! Consume a CAR from an AsyncRead, producing an ordered stream of records
23+use crate::link::{NodeThing, ObjectLink, ThingKind};
4use crate::{
5+ Bytes, HashMap, Rkey, Step,
6 disk::{DiskError, DiskStore},
7 mst::MstNode,
8+ walk::{MstError, Output},
9};
10use cid::Cid;
11use iroh_car::CarReader;
012use std::convert::Infallible;
13use tokio::{io::AsyncRead, sync::mpsc};
14···20pub enum DriveError {
21 #[error("Error from iroh_car: {0}")]
22 CarReader(#[from] iroh_car::Error),
0023 #[error("Failed to decode commit block: {0}")]
24 BadBlock(#[from] serde_ipld_dagcbor::DecodeError<Infallible>),
25 #[error("The Commit block reference by the root was not found")]
···34 ChannelSendError, // SendError takes <T> which we don't need
35 #[error("Failed to join a task: {0}")]
36 JoinError(#[from] tokio::task::JoinError),
37+}
38+39+impl From<MstError> for DriveError {
40+ fn from(me: MstError) -> DriveError {
41+ DriveError::WalkError(WalkError::MstError(me))
42+ }
43}
4445/// An in-order chunk of Rkey + CID + (processed) Block
···114 ///
115 /// You probably want to check the commit's signature. You can go ahead and
116 /// walk the MST right away.
117+ Memory(Commit, Option<Rkey>, MemDriver),
118 /// Blocks exceed the memory limit
119 ///
120 /// You'll need to provide a disk storage to continue. The commit will be
···128 block
129}
130000000131/// Builder-style driver setup
132#[derive(Debug, Clone)]
133pub struct DriverBuilder {
···203 // try to load all the blocks into memory
204 let mut mem_size = 0;
205 while let Some((cid, data)) = car.next_block().await? {
00000206 // the root commit is a Special Third Kind of block that we need to make
207 // sure not to optimistically send to the processing function
208 if cid == root {
···216217 // stash (maybe processed) blocks in memory as long as we have room
218 mem_size += maybe_processed.len();
219+ mem_blocks.insert(cid.into(), maybe_processed);
220 if mem_size >= max_size {
221 return Ok(Driver::Disk(NeedDisk {
222 car,
···240 MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?,
241 MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(bytes)?,
242 };
243+ let mut walker = Walker::new(root_node);
244+245+ // eprintln!("going to edge...");
246+ let edge = walker.step_to_edge(&mem_blocks)?;
247+ // eprintln!("got edge? {edge:?}");
248249 Ok(Driver::Memory(
250 commit,
251+ edge,
252 MemDriver {
253 blocks: mem_blocks,
254 walker,
255 process,
256+ next_missing: None,
257 },
258 ))
259 }
···274/// so the sync/async boundaries become a little easier to work around.
275#[derive(Debug)]
276pub struct MemDriver {
277+ blocks: HashMap<ObjectLink, MaybeProcessedBlock>,
278 walker: Walker,
279+ process: fn(Bytes) -> Bytes, // TODO: impl Fn(bytes) -> Bytes?
280+ next_missing: Option<NodeThing>,
281}
282283impl MemDriver {
284+ pub fn viz(&self, tree: ObjectLink) -> Result<(), WalkError> {
285+ self.walker.viz(&self.blocks, tree)
286+ }
287 /// Step through the record outputs, in rkey order
288+ pub async fn next_chunk(&mut self, n: usize) -> Result<Step<BlockChunk>, DriveError> {
289+ if let Some(ref mut missing) = self.next_missing {
290+ while let Step::Value(sparse_out) =
291+ self.walker.step_sparse(&self.blocks, self.process)?
292+ {
293+ if missing.kind == ThingKind::ChildNode {
294+ *missing = NodeThing {
295+ link: sparse_out.cid.into(),
296+ kind: ThingKind::Record(sparse_out.rkey),
297+ };
298+ }
299+ }
300+ // TODO: l asdflkja slfkja lkdfj lakjd f
301+ // TODO: make the walker finish walking to verify no more present blocks (oops sparse tree)
302+ // HACK: just get the last rkey if it's there -- i think we might actually need to walk for it though
303+ // ...and walk to verify rkey order of the rest of the nodes anyway?
304+ return Ok(match &missing.kind {
305+ ThingKind::ChildNode => Step::End(None),
306+ ThingKind::Record(rkey) => Step::End(Some(rkey.clone())),
307+ });
308+ }
309 let mut out = Vec::with_capacity(n);
310+ // let mut err;
311 for _ in 0..n {
312+ match self.walker.step(&self.blocks, self.process) {
313+ Ok(Step::Value(record)) => out.push(record),
314+ Ok(Step::End(None)) => break,
315+ Ok(Step::End(_)) => todo!("actually this should be unreachable?"),
316+ Err(WalkError::MissingBlock(missing)) => {
317+ self.next_missing = Some(*missing);
318+ return Ok(Step::Value(out)); // nb: might be empty!
319+ }
320+ Err(other) => return Err(other.into()),
321+ }
322 }
323 if out.is_empty() {
324+ Ok(Step::End(None))
325 } else {
326+ Ok(Step::Value(out))
327 }
328 }
329}
···334 root: Cid,
335 process: fn(Bytes) -> Bytes,
336 max_size: usize,
337+ mem_blocks: HashMap<ObjectLink, MaybeProcessedBlock>,
338 pub commit: Option<Commit>,
339}
340···342 pub async fn finish_loading(
343 mut self,
344 mut store: DiskStore,
345+ ) -> Result<(Commit, Option<Rkey>, DiskDriver), DriveError> {
346 // move store in and back out so we can manage lifetimes
347 // dump mem blocks into the store
348 store = tokio::task::spawn(async move {
···356 })
357 .await??;
358359+ let (tx, mut rx) = mpsc::channel::<Vec<(ObjectLink, MaybeProcessedBlock)>>(1);
360361 let store_worker = tokio::task::spawn_blocking(move || {
362 while let Some(chunk) = rx.blocking_recv() {
···377 let Some((cid, data)) = self.car.next_block().await? else {
378 break;
379 };
000000380 // we still gotta keep checking for the root since we might not have it
381 if cid == self.root {
382 let c: Commit = serde_ipld_dagcbor::from_slice(&data)?;
···384 continue;
385 }
386387+ let link = cid.into();
388 let data = Bytes::from(data);
389390 // remaining possible types: node, record, other. optimistically process
391 // TODO: get the actual in-memory size to compute disk spill
392 let maybe_processed = MaybeProcessedBlock::maybe(self.process, data);
393 mem_size += maybe_processed.len();
394+ chunk.push((link, maybe_processed));
395 if mem_size >= (self.max_size / 2) {
396 // soooooo if we're setting the db cache to max_size and then letting
397 // multiple chunks in the queue that are >= max_size, then at any time
···415416 let commit = self.commit.ok_or(DriveError::MissingCommit)?;
4170418 let db_bytes = store
419 .get(&commit.data.to_bytes())
420 .map_err(|e| DriveError::StorageError(DiskError::DbError(e)))?
···428429 Ok((
430 commit,
431+ None,
432 DiskDriver {
433 process: self.process,
434 state: Some(BigState { store, walker }),
···461 /// Walk the MST returning up to `n` rkey + record pairs
462 ///
463 /// ```no_run
464+ /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, Step, noop};
465 /// # #[tokio::main]
466 /// # async fn main() -> Result<(), DriveError> {
467 /// # let mut disk_driver = _get_fake_disk_driver();
468+ /// while let Step::Value(outputs) = disk_driver.next_chunk(256).await? {
469+ /// for output in outputs {
470 /// println!("{}: size={}", output.rkey, output.data.len());
471 /// }
472 /// }
473 /// # Ok(())
474 /// # }
475 /// ```
476+ pub async fn next_chunk(&mut self, n: usize) -> Result<Step<Vec<Output>>, DriveError> {
477 let process = self.process;
478479 // state should only *ever* be None transiently while inside here
···488489 for _ in 0..n {
490 // walk as far as we can until we run out of blocks or find a record
491+ let step = match state.walker.disk_step(&state.store, process) {
492 Ok(s) => s,
493 Err(e) => {
494 return (state, Err(e.into()));
495 }
496 };
497+ let Step::Value(output) = step else {
498 break;
499 };
500 out.push(output);
···510 let out = res?;
511512 if out.is_empty() {
513+ Ok(Step::End(None))
514 } else {
515+ Ok(Step::Value(out))
516 }
517 }
518519 fn read_tx_blocking(
520 &mut self,
521 n: usize,
522+ tx: mpsc::Sender<Result<Step<BlockChunk>, DriveError>>,
523+ ) -> Result<(), mpsc::error::SendError<Result<Step<BlockChunk>, DriveError>>> {
524 let BigState { store, walker } = self.state.as_mut().expect("valid state");
525526 loop {
···534 Err(e) => return tx.blocking_send(Err(e.into())),
535 };
536537+ let Step::Value(output) = step else {
538 break;
539 };
540 out.push(output);
···543 if out.is_empty() {
544 break;
545 }
546+ tx.blocking_send(Ok(Step::Value(out)))?;
547 }
548549 Ok(())
···560 /// benefit over just using `.next_chunk(n)`.
561 ///
562 /// ```no_run
563+ /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, Step, noop};
564 /// # #[tokio::main]
565 /// # async fn main() -> Result<(), DriveError> {
566 /// # let mut disk_driver = _get_fake_disk_driver();
567 /// let (mut rx, join) = disk_driver.to_channel(512);
568 /// while let Some(recvd) = rx.recv().await {
569+ /// let outputs = recvd?;
570+ /// let Step::Value(outputs) = outputs else { break; };
571+ /// for output in outputs {
572 /// println!("{}: size={}", output.rkey, output.data.len());
573 /// }
574 ///
···580 mut self,
581 n: usize,
582 ) -> (
583+ mpsc::Receiver<Result<Step<BlockChunk>, DriveError>>,
584 tokio::task::JoinHandle<Self>,
585 ) {
586+ let (tx, rx) = mpsc::channel::<Result<Step<BlockChunk>, DriveError>>(1);
587588 // sketch: this worker is going to be allowed to execute without a join handle
589 let chan_task = tokio::task::spawn_blocking(move || {
+14-6
src/lib.rs
···18`iroh_car` additionally applies a block size limit of `2MiB`.
1920```
21-use repo_stream::{Driver, DriverBuilder, DiskBuilder};
2223# #[tokio::main]
24# async fn main() -> Result<(), Box<dyn std::error::Error>> {
···35{
3637 // if all blocks fit within memory
38- Driver::Memory(_commit, mut driver) => {
39- while let Some(chunk) = driver.next_chunk(256).await? {
40 for output in chunk {
41 let size = usize::from_ne_bytes(output.data.try_into().unwrap());
42···50 // set up a disk store we can spill to
51 let store = DiskBuilder::new().open("some/path.db".into()).await?;
52 // do the spilling, get back a (similar) driver
53- let (_commit, mut driver) = paused.finish_loading(store).await?;
5455- while let Some(chunk) = driver.next_chunk(256).await? {
56 for output in chunk {
57 let size = usize::from_ne_bytes(output.data.try_into().unwrap());
58···8283pub mod disk;
84pub mod drive;
08586pub use disk::{DiskBuilder, DiskError, DiskStore};
87pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk, noop};
088pub use mst::Commit;
89-pub use walk::Output;
9091pub type Bytes = Vec<u8>;
9200093pub(crate) use hashbrown::HashMap;
0009495#[doc = include_str!("../readme.md")]
96#[cfg(doctest)]
···18`iroh_car` additionally applies a block size limit of `2MiB`.
1920```
21+use repo_stream::{Driver, DriverBuilder, DiskBuilder, Step};
2223# #[tokio::main]
24# async fn main() -> Result<(), Box<dyn std::error::Error>> {
···35{
3637 // if all blocks fit within memory
38+ Driver::Memory(_commit, _prev_rkey, mut driver) => {
39+ while let Step::Value(chunk) = driver.next_chunk(256).await? {
40 for output in chunk {
41 let size = usize::from_ne_bytes(output.data.try_into().unwrap());
42···50 // set up a disk store we can spill to
51 let store = DiskBuilder::new().open("some/path.db".into()).await?;
52 // do the spilling, get back a (similar) driver
53+ let (_commit, _prev_rkey, mut driver) = paused.finish_loading(store).await?;
5455+ while let Step::Value(chunk) = driver.next_chunk(256).await? {
56 for output in chunk {
57 let size = usize::from_ne_bytes(output.data.try_into().unwrap());
58···8283pub mod disk;
84pub mod drive;
85+pub mod link;
8687pub use disk::{DiskBuilder, DiskError, DiskStore};
88pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk, noop};
89+pub use link::NodeThing;
90pub use mst::Commit;
91+pub use walk::{Output, Step};
9293pub type Bytes = Vec<u8>;
9495+pub type Rkey = String;
96+97+#[cfg(feature = "hashbrown")]
98pub(crate) use hashbrown::HashMap;
99+100+#[cfg(not(feature = "hashbrown"))]
101+pub(crate) use std::collections::HashMap;
102103#[doc = include_str!("../readme.md")]
104#[cfg(doctest)]
···3//! The primary aim is to work through the **tree** structure. Non-node blocks
4//! are left as raw bytes, for upper levels to parse into DAG-CBOR or whatever.
506use cid::Cid;
7use serde::Deserialize;
08use sha2::{Digest, Sha256};
000910/// The top-level data object in a repository's tree is a signed commit.
11#[derive(Debug, Deserialize)]
···17 /// fixed value of 3 for this repo format version
18 pub version: u64,
19 /// pointer to the top of the repo contents tree structure (MST)
20- pub data: Cid,
21 /// revision of the repo, used as a logical clock.
22 ///
23 /// TID format. Must increase monotonically. Recommend using current
···31 /// exist in the CBOR object, but is virtually always null. NOTE: previously
32 /// specified as nullable and optional, but this caused interoperability
33 /// issues.
34- pub prev: Option<Cid>,
35 /// cryptographic signature of this commit, as raw bytes
36- #[serde(with = "serde_bytes")]
37 pub sig: serde_bytes::ByteBuf,
38}
3940-use serde::de::{self, Deserializer, MapAccess, Unexpected, Visitor};
41-use std::fmt;
42-43-pub type Depth = u32;
44-45#[inline(always)]
46pub fn atproto_mst_depth(key: &str) -> Depth {
47 // 128 bits oughta be enough: https://bsky.app/profile/retr0.id/post/3jwwbf4izps24
48 u128::from_be_bytes(Sha256::digest(key).split_at(16).0.try_into().unwrap()).leading_zeros() / 2
49}
5051-#[derive(Debug)]
52pub(crate) struct MstNode {
53 pub depth: Option<Depth>, // known for nodes with entries (required for root)
54 pub things: Vec<NodeThing>,
55}
5657-#[derive(Debug)]
58-pub(crate) struct NodeThing {
59- pub(crate) cid: Cid,
60- pub(crate) kind: ThingKind,
61-}
62-63-#[derive(Debug)]
64-pub(crate) enum ThingKind {
65- Tree,
66- Value { rkey: String },
67-}
68-69impl<'de> Deserialize<'de> for MstNode {
70 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
71 where
···96 return Err(de::Error::duplicate_field("l"));
97 }
98 found_left = true;
99- if let Some(cid) = map.next_value()? {
100 left = Some(NodeThing {
101- cid,
102- kind: ThingKind::Tree,
103 });
104 }
105 }
···142 }
143144 things.push(NodeThing {
145- cid: entry.value,
146- kind: ThingKind::Value { rkey: rkey_s },
147 });
148149- if let Some(cid) = entry.tree {
150 things.push(NodeThing {
151- cid,
152- kind: ThingKind::Tree,
153 });
154 }
155···229 /// the lower level must have keys sorting after this TreeEntry's key (to
230 /// the "right"), but before the next TreeEntry's key in this Node (if any)
231 #[serde(rename = "t")]
232- pub tree: Option<Cid>,
233}
···3//! The primary aim is to work through the **tree** structure. Non-node blocks
4//! are left as raw bytes, for upper levels to parse into DAG-CBOR or whatever.
56+use crate::link::{NodeThing, ObjectLink, ThingKind};
7use cid::Cid;
8use serde::Deserialize;
9+use serde::de::{self, Deserializer, MapAccess, Unexpected, Visitor};
10use sha2::{Digest, Sha256};
11+use std::fmt;
12+13+pub type Depth = u32;
1415/// The top-level data object in a repository's tree is a signed commit.
16#[derive(Debug, Deserialize)]
···22 /// fixed value of 3 for this repo format version
23 pub version: u64,
24 /// pointer to the top of the repo contents tree structure (MST)
25+ pub data: ObjectLink,
26 /// revision of the repo, used as a logical clock.
27 ///
28 /// TID format. Must increase monotonically. Recommend using current
···36 /// exist in the CBOR object, but is virtually always null. NOTE: previously
37 /// specified as nullable and optional, but this caused interoperability
38 /// issues.
39+ pub prev: Option<ObjectLink>,
40 /// cryptographic signature of this commit, as raw bytes
041 pub sig: serde_bytes::ByteBuf,
42}
430000044#[inline(always)]
45pub fn atproto_mst_depth(key: &str) -> Depth {
46 // 128 bits oughta be enough: https://bsky.app/profile/retr0.id/post/3jwwbf4izps24
47 u128::from_be_bytes(Sha256::digest(key).split_at(16).0.try_into().unwrap()).leading_zeros() / 2
48}
4950+#[derive(Debug, Clone)]
51pub(crate) struct MstNode {
52 pub depth: Option<Depth>, // known for nodes with entries (required for root)
53 pub things: Vec<NodeThing>,
54}
5500000000000056impl<'de> Deserialize<'de> for MstNode {
57 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
58 where
···83 return Err(de::Error::duplicate_field("l"));
84 }
85 found_left = true;
86+ if let Some(link) = map.next_value()? {
87 left = Some(NodeThing {
88+ link,
89+ kind: ThingKind::ChildNode,
90 });
91 }
92 }
···129 }
130131 things.push(NodeThing {
132+ link: entry.value.into(),
133+ kind: ThingKind::Record(rkey_s),
134 });
135136+ if let Some(link) = entry.tree {
137 things.push(NodeThing {
138+ link,
139+ kind: ThingKind::ChildNode,
140 });
141 }
142···216 /// the lower level must have keys sorting after this TreeEntry's key (to
217 /// the "right"), but before the next TreeEntry's key in this Node (if any)
218 #[serde(rename = "t")]
219+ pub tree: Option<ObjectLink>,
220}