···8899[dependencies]
1010fjall = { version = "3.0.1", default-features = false }
1111-hashbrown = "0.16.1"
1111+hashbrown = { version = "0.16.1", optional = true }
1212cid = { version = "0.11.1", features = ["serde"] }
1313iroh-car = "0.5.1"
1414log = "0.4.28"
···1818sha2 = "0.10.9" # note: hmac-sha256 is simpler, smaller, benches ~15ns slower
1919thiserror = "2.0.17"
2020tokio = { version = "1.47.1", features = ["rt", "sync"] }
2121-multihash-codetable = { version = "0.1.4", features = ["sha2"] }
2121+2222+[features]
2323+default = []
2424+hashbrown = ["dep:hashbrown"]
22252326[dev-dependencies]
2427clap = { version = "4.5.48", features = ["derive"] }
···4851# [[bench]]
4952# name = "leading"
5053# harness = false
5151-5252-[[bench]]
5353-name = "cid-check"
5454-harness = false
-45
benches/cid-check.rs
···11-use cid::Cid;
22-use criterion::{Criterion, criterion_group, criterion_main};
33-use multihash_codetable::{Code, MultihashDigest};
44-use sha2::{Digest, Sha256};
55-66-fn multihash_verify(given: Cid, block: &[u8]) -> bool {
77- let calculated = Cid::new_v1(0x71, Code::Sha2_256.digest(block));
88- calculated == given
99-}
1010-1111-fn effortful_verify(given: Cid, block: &[u8]) -> bool {
1212- // we know we're in atproto, so we can make a few assumptions
1313- if given.version() != cid::Version::V1 {
1414- return false;
1515- }
1616- let (codec, given_digest, _) = given.hash().into_inner();
1717- if codec != 0x12 {
1818- return false;
1919- }
2020- given_digest[..32] == *Sha256::digest(block)
2121-}
2222-2323-fn fastloose_verify(given: Cid, block: &[u8]) -> bool {
2424- let (_, given_digest, _) = given.hash().into_inner();
2525- given_digest[..32] == *Sha256::digest(block)
2626-}
2727-2828-pub fn criterion_benchmark(c: &mut Criterion) {
2929- let some_bytes: Vec<u8> = vec![0x1a, 0x00, 0xAA, 0x39, 0x8C].repeat(100);
3030- let cid = Cid::new_v1(0x71, Code::Sha2_256.digest(&some_bytes));
3131-3232- let mut g = c.benchmark_group("CID check");
3333- g.bench_function("multihash", |b| {
3434- b.iter(|| multihash_verify(cid, &some_bytes))
3535- });
3636- g.bench_function("effortful", |b| {
3737- b.iter(|| effortful_verify(cid, &some_bytes))
3838- });
3939- g.bench_function("fastloose", |b| {
4040- b.iter(|| fastloose_verify(cid, &some_bytes))
4141- });
4242-}
4343-4444-criterion_group!(benches, criterion_benchmark);
4545-criterion_main!(benches);
+3-3
benches/huge-car.rs
···11extern crate repo_stream;
22-use repo_stream::Driver;
22+use repo_stream::{Driver, Step};
33use std::path::{Path, PathBuf};
4455use criterion::{Criterion, criterion_group, criterion_main};
···3333 let reader = tokio::io::BufReader::new(reader);
34343535 let mut driver = match Driver::load_car(reader, ser, 1024).await.unwrap() {
3636- Driver::Memory(_, mem_driver) => mem_driver,
3636+ Driver::Memory(_, _, mem_driver) => mem_driver,
3737 Driver::Disk(_) => panic!("not doing disk for benchmark"),
3838 };
39394040 let mut n = 0;
4141- while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
4141+ while let Step::Value(pairs) = driver.next_chunk(256).await.unwrap() {
4242 n += pairs.len();
4343 }
4444 n
+3-3
benches/non-huge-cars.rs
···11extern crate repo_stream;
22-use repo_stream::Driver;
22+use repo_stream::{Driver, Step};
3344use criterion::{Criterion, criterion_group, criterion_main};
55···40404141async fn drive_car(bytes: &[u8]) -> usize {
4242 let mut driver = match Driver::load_car(bytes, ser, 32).await.unwrap() {
4343- Driver::Memory(_, mem_driver) => mem_driver,
4343+ Driver::Memory(_, _, mem_driver) => mem_driver,
4444 Driver::Disk(_) => panic!("not benching big cars here"),
4545 };
46464747 let mut n = 0;
4848- while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
4848+ while let Step::Value(pairs) = driver.next_chunk(256).await.unwrap() {
4949 n += pairs.len();
5050 }
5151 n
car-samples/slice-node-after.car
This is a binary file and will not be displayed.
car-samples/slice-node-first-key.car
This is a binary file and will not be displayed.
car-samples/slice-one.car
This is a binary file and will not be displayed.
car-samples/slice-proving-absence.car
This is a binary file and will not be displayed.
+10-7
examples/disk-read-file/main.rs
···99static GLOBAL: MiMalloc = MiMalloc;
10101111use clap::Parser;
1212-use repo_stream::{DiskBuilder, Driver, DriverBuilder};
1212+use repo_stream::{DiskBuilder, Driver, DriverBuilder, Step};
1313use std::path::PathBuf;
1414use std::time::Instant;
1515···4242 .load_car(reader)
4343 .await?
4444 {
4545- Driver::Memory(_, _) => panic!("try this on a bigger car"),
4545+ Driver::Memory(_, _, _) => panic!("try this on a bigger car"),
4646 Driver::Disk(big_stuff) => {
4747 // we reach here if the repo was too big and needs to be spilled to
4848 // disk to continue
···5151 let disk_store = DiskBuilder::new().open(tmpfile).await?;
52525353 // do the spilling, get back a (similar) driver
5454- let (commit, driver) = big_stuff.finish_loading(disk_store).await?;
5454+ let (commit, _, driver) = big_stuff.finish_loading(disk_store).await?;
55555656 // at this point you might want to fetch the account's signing key
5757 // via the DID from the commit, and then verify the signature.
···7474 // this example uses the disk driver's channel mode: the tree walking is
7575 // spawned onto a blocking thread, and we get chunks of rkey+blocks back
7676 let (mut rx, join) = driver.to_channel(512);
7777- while let Some(r) = rx.recv().await {
7878- let pairs = r?;
7777+ while let Some(step) = rx.recv().await {
7878+ let step = step?;
7979+ let Step::Value(outputs) = step else {
8080+ break;
8181+ };
79828083 // keep a count of the total number of blocks seen
8181- n += pairs.len();
8484+ n += outputs.len();
82858383- for output in pairs {
8686+ for output in outputs {
8487 // for each block, count how many bytes are equal to '0'
8588 // (this is just an example, you probably want to do something more
8689 // interesting)
+32
examples/print-tree/main.rs
···11+/*!
22+Read a CAR slice in memory and show some info about it.
33+*/
44+55+extern crate repo_stream;
66+use repo_stream::{Driver, DriverBuilder};
77+88+type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
99+1010+#[tokio::main]
1111+async fn main() -> Result<()> {
1212+ env_logger::init();
1313+ let reader = tokio::io::BufReader::new(tokio::io::stdin());
1414+1515+ let (commit, driver) = match DriverBuilder::new()
1616+ .with_block_processor(|block| block.len().to_ne_bytes().to_vec())
1717+ .load_car(reader)
1818+ .await?
1919+ {
2020+ Driver::Memory(commit, _, mem_driver) => (commit, mem_driver),
2121+ Driver::Disk(_) => panic!("this example doesn't handle big CARs"),
2222+ };
2323+2424+ println!(
2525+ "\nthis slice is from {}, repo rev {}\n\n",
2626+ commit.did, commit.rev
2727+ );
2828+2929+ driver.viz(commit.data)?;
3030+3131+ Ok(())
3232+}
+11-7
examples/read-file/main.rs
···4455extern crate repo_stream;
66use clap::Parser;
77-use repo_stream::{Driver, DriverBuilder};
77+use repo_stream::{Driver, DriverBuilder, Output, Step};
88use std::path::PathBuf;
991010type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
···2828 .load_car(reader)
2929 .await?
3030 {
3131- Driver::Memory(commit, mem_driver) => (commit, mem_driver),
3131+ Driver::Memory(commit, _, mem_driver) => (commit, mem_driver),
3232 Driver::Disk(_) => panic!("this example doesn't handle big CARs"),
3333 };
34343535 log::info!("got commit: {commit:?}");
36363737- let mut n = 0;
3838- while let Some(pairs) = driver.next_chunk(256).await? {
3939- n += pairs.len();
4040- // log::info!("got {rkey:?}");
3737+ while let Step::Value(records) = driver.next_chunk(256).await? {
3838+ for Output { rkey, cid, data } in records {
3939+ let size = usize::from_ne_bytes(data.try_into().unwrap());
4040+ print!("0x");
4141+ for byte in cid.to_bytes() {
4242+ print!("{byte:>02x}");
4343+ }
4444+ println!(": {rkey} => record of len {}", size);
4545+ }
4146 }
4242- log::info!("bye! total records={n}");
43474448 Ok(())
4549}
+62
examples/read-slice/main.rs
···11+/*!
22+Read a CAR slice in memory and show some info about it.
33+*/
44+55+extern crate repo_stream;
66+use repo_stream::{Driver, DriverBuilder, Output, Step};
77+88+type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
99+1010+#[tokio::main]
1111+async fn main() -> Result<()> {
1212+ env_logger::init();
1313+ let reader = tokio::io::BufReader::new(tokio::io::stdin());
1414+1515+ let (commit, prev_rkey, mut driver) = match DriverBuilder::new()
1616+ .with_block_processor(|block| block.len().to_ne_bytes().to_vec())
1717+ .load_car(reader)
1818+ .await?
1919+ {
2020+ Driver::Memory(commit, prev, mem_driver) => (commit, prev, mem_driver),
2121+ Driver::Disk(_) => panic!("this example doesn't handle big CARs"),
2222+ };
2323+2424+ println!(
2525+ "\nthis slice is from {}, repo rev {}",
2626+ commit.did, commit.rev
2727+ );
2828+ if let Some(rkey) = prev_rkey {
2929+ println!(" -> key immediately before CAR slice: {rkey}");
3030+ } else {
3131+ println!(
3232+            " -> no key preceding the CAR slice, so it includes the leading edge of the tree."
3333+ );
3434+ }
3535+3636+ println!("included records:");
3737+ let end = loop {
3838+ match driver.next_chunk(256).await? {
3939+ Step::Value(chunk) => {
4040+ for Output { cid, rkey, .. } in chunk {
4141+ print!(" SHA256 ");
4242+ for byte in cid.to_bytes().iter().skip(4).take(5) {
4343+ print!("{byte:02x}");
4444+ }
4545+ println!("...\t{rkey}");
4646+ }
4747+ }
4848+ Step::End(e) => break e,
4949+ }
5050+ };
5151+5252+ println!("done walking records present in the slice.");
5353+ if let Some(rkey) = end {
5454+ println!(" -> key immediately after CAR slice: {rkey}");
5555+ } else {
5656+ println!(
5757+            " -> no key following the CAR slice, so it includes the trailing edge of the tree."
5858+ );
5959+ }
6060+6161+ Ok(())
6262+}
+18-18
readme.md
···1111[sponsor-badge]: https://img.shields.io/badge/at-microcosm-b820f9?labelColor=b820f9&logo=githubsponsors&logoColor=fff
12121313```rust no_run
1414-use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder, Output};
1414+use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder, Output, Step};
15151616#[tokio::main]
1717async fn main() -> Result<(), Box<dyn std::error::Error>> {
···3131 {
32323333 // if all blocks fit within memory
3434- Driver::Memory(_commit, mut driver) => {
3535- while let Some(chunk) = driver.next_chunk(256).await? {
3434+ Driver::Memory(_commit, _prev_rkey, mut driver) => {
3535+ while let Step::Value(chunk) = driver.next_chunk(256).await? {
3636 for Output { rkey: _, cid: _, data } in chunk {
3737 let size = usize::from_ne_bytes(data.try_into().unwrap());
3838 total_size += size;
···4545 // set up a disk store we can spill to
4646 let store = DiskBuilder::new().open("some/path.db".into()).await?;
4747 // do the spilling, get back a (similar) driver
4848- let (_commit, mut driver) = paused.finish_loading(store).await?;
4848+ let (_commit, _prev_rkey, mut driver) = paused.finish_loading(store).await?;
49495050- while let Some(chunk) = driver.next_chunk(256).await? {
5050+ while let Step::Value(chunk) = driver.next_chunk(256).await? {
5151 for Output { rkey: _, cid: _, data } in chunk {
5252 let size = usize::from_ne_bytes(data.try_into().unwrap());
5353 total_size += size;
···62626363more recent todo
6464- [ ] add a zero-copy rkyv process function example
6565-- [ ] repo car slices
6666-- [ ] lazy-value stream (rkey -> CID diffing for tap-like `#sync` handling)
6565+- [ ] car slices
6666+- [ ] lazy-value stream (for rkey -> CID diffing; tap-like `#sync` handling; save a fjall record `.get` when not needed)
6767- [x] get an *emtpy* car for the test suite
6868- [x] implement a max size on disk limit
6969···79798080current car processing times (records processed into their length usize, phil's dev machine):
81818282-- 450MiB CAR file (huge): `1.3s`
8282+- 450MiB CAR file (huge): `1.4s`
8383- 128MiB (huge): `350ms`
8484-- 5.0MiB: `6.8ms`
8585-- 279KiB: `160us`
8686-- 3.4KiB: `5.1us`
8787-- empty: `690ns`
8484+- 5.0MiB: `7.0ms`
8585+- 279KiB: `170us`
8686+- 3.4KiB: `5.3us`
8787+- empty: `720ns`
88888989it's a little faster with `mimalloc`
9090···9494static GLOBAL: MiMalloc = MiMalloc;
9595```
96969797-- 450MiB CAR file: `1.2s` (-8%)
9898-- 128MiB: `300ms` (-14%)
9999-- 5.0MiB: `6.0ms` (-11%)
100100-- 279KiB: `150us` (-7%)
101101-- 3.4KiB: `4.7us` (-8%)
102102-- empty: `670ns` (-4%)
9797+- 450MiB CAR file: `1.1s` (-21%)
9898+- 128MiB: `300ms` (-15%)
9999+- 5.0MiB: `5.5ms` (-21%)
100100+- 279KiB: `140us` (-17%)
101101+- 3.4KiB: `4.3us` (-18%)
102102+- empty: `610ns` (-16%)
103103104104processing CARs requires buffering blocks, so it can consume a lot of memory. repo-stream's in-memory driver has minimal memory overhead, but there are two ways to make it work with less mem (you can do either or both!)
105105
···11//! Consume a CAR from an AsyncRead, producing an ordered stream of records
2233+use crate::link::{NodeThing, ObjectLink, ThingKind};
34use crate::{
44- Bytes, HashMap,
55+ Bytes, HashMap, Rkey, Step,
56 disk::{DiskError, DiskStore},
67 mst::MstNode,
77- walk::Output,
88+ walk::{MstError, Output},
89};
910use cid::Cid;
1011use iroh_car::CarReader;
1111-use multihash_codetable::{Code, MultihashDigest};
1212use std::convert::Infallible;
1313use tokio::{io::AsyncRead, sync::mpsc};
1414···2020pub enum DriveError {
2121 #[error("Error from iroh_car: {0}")]
2222 CarReader(#[from] iroh_car::Error),
2323- #[error("Block did not match its CID")]
2424- BadCID,
2523 #[error("Failed to decode commit block: {0}")]
2624 BadBlock(#[from] serde_ipld_dagcbor::DecodeError<Infallible>),
2725 #[error("The Commit block reference by the root was not found")]
···3634 ChannelSendError, // SendError takes <T> which we don't need
3735 #[error("Failed to join a task: {0}")]
3836 JoinError(#[from] tokio::task::JoinError),
3737+}
3838+3939+impl From<MstError> for DriveError {
4040+ fn from(me: MstError) -> DriveError {
4141+ DriveError::WalkError(WalkError::MstError(me))
4242+ }
3943}
40444145/// An in-order chunk of Rkey + CID + (processed) Block
···110114 ///
111115 /// You probably want to check the commit's signature. You can go ahead and
112116 /// walk the MST right away.
113113- Memory(Commit, MemDriver),
117117+ Memory(Commit, Option<Rkey>, MemDriver),
114118 /// Blocks exceed the memory limit
115119 ///
116120 /// You'll need to provide a disk storage to continue. The commit will be
···124128 block
125129}
126130127127-// iroh-car doesn't verify CIDs!!!!!!
128128-#[inline(always)]
129129-fn verify_block(given: Cid, block: &[u8]) -> bool {
130130- Cid::new_v1(0x71, Code::Sha2_256.digest(block)) == given
131131-}
132132-133131/// Builder-style driver setup
134132#[derive(Debug, Clone)]
135133pub struct DriverBuilder {
···205203 // try to load all the blocks into memory
206204 let mut mem_size = 0;
207205 while let Some((cid, data)) = car.next_block().await? {
208208- // lkasdjflkajdsflkajsfdlkjasdf
209209- if !verify_block(cid, &data) {
210210- return Err(DriveError::BadCID);
211211- }
212212-213206 // the root commit is a Special Third Kind of block that we need to make
214207 // sure not to optimistically send to the processing function
215208 if cid == root {
···223216224217 // stash (maybe processed) blocks in memory as long as we have room
225218 mem_size += maybe_processed.len();
226226- mem_blocks.insert(cid, maybe_processed);
219219+ mem_blocks.insert(cid.into(), maybe_processed);
227220 if mem_size >= max_size {
228221 return Ok(Driver::Disk(NeedDisk {
229222 car,
···247240 MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?,
248241 MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(bytes)?,
249242 };
250250- let walker = Walker::new(root_node);
243243+ let mut walker = Walker::new(root_node);
244244+245245+ // eprintln!("going to edge...");
246246+ let edge = walker.step_to_edge(&mem_blocks)?;
247247+ // eprintln!("got edge? {edge:?}");
251248252249 Ok(Driver::Memory(
253250 commit,
251251+ edge,
254252 MemDriver {
255253 blocks: mem_blocks,
256254 walker,
257255 process,
256256+ next_missing: None,
258257 },
259258 ))
260259 }
···275274/// so the sync/async boundaries become a little easier to work around.
276275#[derive(Debug)]
277276pub struct MemDriver {
278278- blocks: HashMap<Cid, MaybeProcessedBlock>,
277277+ blocks: HashMap<ObjectLink, MaybeProcessedBlock>,
279278 walker: Walker,
280280- process: fn(Bytes) -> Bytes,
279279+ process: fn(Bytes) -> Bytes, // TODO: impl Fn(bytes) -> Bytes?
280280+ next_missing: Option<NodeThing>,
281281}
282282283283impl MemDriver {
284284+ pub fn viz(&self, tree: ObjectLink) -> Result<(), WalkError> {
285285+ self.walker.viz(&self.blocks, tree)
286286+ }
284287 /// Step through the record outputs, in rkey order
285285- pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk>, DriveError> {
288288+ pub async fn next_chunk(&mut self, n: usize) -> Result<Step<BlockChunk>, DriveError> {
289289+ if let Some(ref mut missing) = self.next_missing {
290290+ while let Step::Value(sparse_out) =
291291+ self.walker.step_sparse(&self.blocks, self.process)?
292292+ {
293293+ if missing.kind == ThingKind::ChildNode {
294294+ *missing = NodeThing {
295295+ link: sparse_out.cid.into(),
296296+ kind: ThingKind::Record(sparse_out.rkey),
297297+ };
298298+ }
299299+ }
300300+        // TODO: clean up the sparse-walk termination handling below
301301+ // TODO: make the walker finish walking to verify no more present blocks (oops sparse tree)
302302+ // HACK: just get the last rkey if it's there -- i think we might actually need to walk for it though
303303+ // ...and walk to verify rkey order of the rest of the nodes anyway?
304304+ return Ok(match &missing.kind {
305305+ ThingKind::ChildNode => Step::End(None),
306306+ ThingKind::Record(rkey) => Step::End(Some(rkey.clone())),
307307+ });
308308+ }
286309 let mut out = Vec::with_capacity(n);
310310+ // let mut err;
287311 for _ in 0..n {
288288- // walk as far as we can until we run out of blocks or find a record
289289- let Some(output) = self.walker.step(&mut self.blocks, self.process)? else {
290290- break;
291291- };
292292- out.push(output);
312312+ match self.walker.step(&self.blocks, self.process) {
313313+ Ok(Step::Value(record)) => out.push(record),
314314+ Ok(Step::End(None)) => break,
315315+ Ok(Step::End(_)) => todo!("actually this should be unreachable?"),
316316+ Err(WalkError::MissingBlock(missing)) => {
317317+ self.next_missing = Some(*missing);
318318+ return Ok(Step::Value(out)); // nb: might be empty!
319319+ }
320320+ Err(other) => return Err(other.into()),
321321+ }
293322 }
294323 if out.is_empty() {
295295- Ok(None)
324324+ Ok(Step::End(None))
296325 } else {
297297- Ok(Some(out))
326326+ Ok(Step::Value(out))
298327 }
299328 }
300329}
···305334 root: Cid,
306335 process: fn(Bytes) -> Bytes,
307336 max_size: usize,
308308- mem_blocks: HashMap<Cid, MaybeProcessedBlock>,
337337+ mem_blocks: HashMap<ObjectLink, MaybeProcessedBlock>,
309338 pub commit: Option<Commit>,
310339}
311340···313342 pub async fn finish_loading(
314343 mut self,
315344 mut store: DiskStore,
316316- ) -> Result<(Commit, DiskDriver), DriveError> {
345345+ ) -> Result<(Commit, Option<Rkey>, DiskDriver), DriveError> {
317346 // move store in and back out so we can manage lifetimes
318347 // dump mem blocks into the store
319348 store = tokio::task::spawn(async move {
···327356 })
328357 .await??;
329358330330- let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock)>>(1);
359359+ let (tx, mut rx) = mpsc::channel::<Vec<(ObjectLink, MaybeProcessedBlock)>>(1);
331360332361 let store_worker = tokio::task::spawn_blocking(move || {
333362 while let Some(chunk) = rx.blocking_recv() {
···348377 let Some((cid, data)) = self.car.next_block().await? else {
349378 break;
350379 };
351351-352352- // lkasdjflkajdsflkajsfdlkjasdf
353353- if !verify_block(cid, &data) {
354354- return Err(DriveError::BadCID);
355355- }
356356-357380 // we still gotta keep checking for the root since we might not have it
358381 if cid == self.root {
359382 let c: Commit = serde_ipld_dagcbor::from_slice(&data)?;
···361384 continue;
362385 }
363386387387+ let link = cid.into();
364388 let data = Bytes::from(data);
365389366390 // remaining possible types: node, record, other. optimistically process
367391 // TODO: get the actual in-memory size to compute disk spill
368392 let maybe_processed = MaybeProcessedBlock::maybe(self.process, data);
369393 mem_size += maybe_processed.len();
370370- chunk.push((cid, maybe_processed));
394394+ chunk.push((link, maybe_processed));
371395 if mem_size >= (self.max_size / 2) {
372396 // soooooo if we're setting the db cache to max_size and then letting
373397 // multiple chunks in the queue that are >= max_size, then at any time
···391415392416 let commit = self.commit.ok_or(DriveError::MissingCommit)?;
393417394394- // the commit always must point to a Node; empty node => empty MST special case
395418 let db_bytes = store
396419 .get(&commit.data.to_bytes())
397420 .map_err(|e| DriveError::StorageError(DiskError::DbError(e)))?
···405428406429 Ok((
407430 commit,
431431+ None,
408432 DiskDriver {
409433 process: self.process,
410434 state: Some(BigState { store, walker }),
···437461 /// Walk the MST returning up to `n` rkey + record pairs
438462 ///
439463 /// ```no_run
440440- /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, noop};
464464+ /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, Step, noop};
441465 /// # #[tokio::main]
442466 /// # async fn main() -> Result<(), DriveError> {
443467 /// # let mut disk_driver = _get_fake_disk_driver();
444444- /// while let Some(pairs) = disk_driver.next_chunk(256).await? {
445445- /// for output in pairs {
468468+ /// while let Step::Value(outputs) = disk_driver.next_chunk(256).await? {
469469+ /// for output in outputs {
446470 /// println!("{}: size={}", output.rkey, output.data.len());
447471 /// }
448472 /// }
449473 /// # Ok(())
450474 /// # }
451475 /// ```
452452- pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk>, DriveError> {
476476+ pub async fn next_chunk(&mut self, n: usize) -> Result<Step<Vec<Output>>, DriveError> {
453477 let process = self.process;
454478455479 // state should only *ever* be None transiently while inside here
···464488465489 for _ in 0..n {
466490 // walk as far as we can until we run out of blocks or find a record
467467- let step = match state.walker.disk_step(&mut state.store, process) {
491491+ let step = match state.walker.disk_step(&state.store, process) {
468492 Ok(s) => s,
469493 Err(e) => {
470494 return (state, Err(e.into()));
471495 }
472496 };
473473- let Some(output) = step else {
497497+ let Step::Value(output) = step else {
474498 break;
475499 };
476500 out.push(output);
···486510 let out = res?;
487511488512 if out.is_empty() {
489489- Ok(None)
513513+ Ok(Step::End(None))
490514 } else {
491491- Ok(Some(out))
515515+ Ok(Step::Value(out))
492516 }
493517 }
494518495519 fn read_tx_blocking(
496520 &mut self,
497521 n: usize,
498498- tx: mpsc::Sender<Result<BlockChunk, DriveError>>,
499499- ) -> Result<(), mpsc::error::SendError<Result<BlockChunk, DriveError>>> {
522522+ tx: mpsc::Sender<Result<Step<BlockChunk>, DriveError>>,
523523+ ) -> Result<(), mpsc::error::SendError<Result<Step<BlockChunk>, DriveError>>> {
500524 let BigState { store, walker } = self.state.as_mut().expect("valid state");
501525502526 loop {
···510534 Err(e) => return tx.blocking_send(Err(e.into())),
511535 };
512536513513- let Some(output) = step else {
537537+ let Step::Value(output) = step else {
514538 break;
515539 };
516540 out.push(output);
···519543 if out.is_empty() {
520544 break;
521545 }
522522- tx.blocking_send(Ok(out))?;
546546+ tx.blocking_send(Ok(Step::Value(out)))?;
523547 }
524548525549 Ok(())
···536560 /// benefit over just using `.next_chunk(n)`.
537561 ///
538562 /// ```no_run
539539- /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, noop};
563563+ /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, Step, noop};
540564 /// # #[tokio::main]
541565 /// # async fn main() -> Result<(), DriveError> {
542566 /// # let mut disk_driver = _get_fake_disk_driver();
543567 /// let (mut rx, join) = disk_driver.to_channel(512);
544568 /// while let Some(recvd) = rx.recv().await {
545545- /// let pairs = recvd?;
546546- /// for output in pairs {
569569+ /// let outputs = recvd?;
570570+ /// let Step::Value(outputs) = outputs else { break; };
571571+ /// for output in outputs {
547572 /// println!("{}: size={}", output.rkey, output.data.len());
548573 /// }
549574 ///
···555580 mut self,
556581 n: usize,
557582 ) -> (
558558- mpsc::Receiver<Result<BlockChunk, DriveError>>,
583583+ mpsc::Receiver<Result<Step<BlockChunk>, DriveError>>,
559584 tokio::task::JoinHandle<Self>,
560585 ) {
561561- let (tx, rx) = mpsc::channel::<Result<BlockChunk, DriveError>>(1);
586586+ let (tx, rx) = mpsc::channel::<Result<Step<BlockChunk>, DriveError>>(1);
562587563588 // sketch: this worker is going to be allowed to execute without a join handle
564589 let chan_task = tokio::task::spawn_blocking(move || {
+14-6
src/lib.rs
···1818`iroh_car` additionally applies a block size limit of `2MiB`.
19192020```
2121-use repo_stream::{Driver, DriverBuilder, DiskBuilder};
2121+use repo_stream::{Driver, DriverBuilder, DiskBuilder, Step};
22222323# #[tokio::main]
2424# async fn main() -> Result<(), Box<dyn std::error::Error>> {
···3535{
36363737 // if all blocks fit within memory
3838- Driver::Memory(_commit, mut driver) => {
3939- while let Some(chunk) = driver.next_chunk(256).await? {
3838+ Driver::Memory(_commit, _prev_rkey, mut driver) => {
3939+ while let Step::Value(chunk) = driver.next_chunk(256).await? {
4040 for output in chunk {
4141 let size = usize::from_ne_bytes(output.data.try_into().unwrap());
4242···5050 // set up a disk store we can spill to
5151 let store = DiskBuilder::new().open("some/path.db".into()).await?;
5252 // do the spilling, get back a (similar) driver
5353- let (_commit, mut driver) = paused.finish_loading(store).await?;
5353+ let (_commit, _prev_rkey, mut driver) = paused.finish_loading(store).await?;
54545555- while let Some(chunk) = driver.next_chunk(256).await? {
5555+ while let Step::Value(chunk) = driver.next_chunk(256).await? {
5656 for output in chunk {
5757 let size = usize::from_ne_bytes(output.data.try_into().unwrap());
5858···82828383pub mod disk;
8484pub mod drive;
8585+pub mod link;
85868687pub use disk::{DiskBuilder, DiskError, DiskStore};
8788pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk, noop};
8989+pub use link::NodeThing;
8890pub use mst::Commit;
8989-pub use walk::Output;
9191+pub use walk::{Output, Step};
90929193pub type Bytes = Vec<u8>;
92949595+pub type Rkey = String;
9696+9797+#[cfg(feature = "hashbrown")]
9398pub(crate) use hashbrown::HashMap;
9999+100100+#[cfg(not(feature = "hashbrown"))]
101101+pub(crate) use std::collections::HashMap;
9410295103#[doc = include_str!("../readme.md")]
96104#[cfg(doctest)]
···33//! The primary aim is to work through the **tree** structure. Non-node blocks
44//! are left as raw bytes, for upper levels to parse into DAG-CBOR or whatever.
5566+use crate::link::{NodeThing, ObjectLink, ThingKind};
67use cid::Cid;
78use serde::Deserialize;
99+use serde::de::{self, Deserializer, MapAccess, Unexpected, Visitor};
810use sha2::{Digest, Sha256};
1111+use std::fmt;
1212+1313+pub type Depth = u32;
9141015/// The top-level data object in a repository's tree is a signed commit.
1116#[derive(Debug, Deserialize)]
···1722 /// fixed value of 3 for this repo format version
1823 pub version: u64,
1924 /// pointer to the top of the repo contents tree structure (MST)
2020- pub data: Cid,
2525+ pub data: ObjectLink,
2126 /// revision of the repo, used as a logical clock.
2227 ///
2328 /// TID format. Must increase monotonically. Recommend using current
···3136 /// exist in the CBOR object, but is virtually always null. NOTE: previously
3237 /// specified as nullable and optional, but this caused interoperability
3338 /// issues.
3434- pub prev: Option<Cid>,
3939+ pub prev: Option<ObjectLink>,
3540 /// cryptographic signature of this commit, as raw bytes
3636- #[serde(with = "serde_bytes")]
3741 pub sig: serde_bytes::ByteBuf,
3842}
39434040-use serde::de::{self, Deserializer, MapAccess, Unexpected, Visitor};
4141-use std::fmt;
4242-4343-pub type Depth = u32;
4444-4544#[inline(always)]
4645pub fn atproto_mst_depth(key: &str) -> Depth {
4746 // 128 bits oughta be enough: https://bsky.app/profile/retr0.id/post/3jwwbf4izps24
4847 u128::from_be_bytes(Sha256::digest(key).split_at(16).0.try_into().unwrap()).leading_zeros() / 2
4948}
50495151-#[derive(Debug)]
5050+#[derive(Debug, Clone)]
5251pub(crate) struct MstNode {
5352 pub depth: Option<Depth>, // known for nodes with entries (required for root)
5453 pub things: Vec<NodeThing>,
5554}
56555757-#[derive(Debug)]
5858-pub(crate) struct NodeThing {
5959- pub(crate) cid: Cid,
6060- pub(crate) kind: ThingKind,
6161-}
6262-6363-#[derive(Debug)]
6464-pub(crate) enum ThingKind {
6565- Tree,
6666- Value { rkey: String },
6767-}
6868-6956impl<'de> Deserialize<'de> for MstNode {
7057 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
7158 where
···9683 return Err(de::Error::duplicate_field("l"));
9784 }
9885 found_left = true;
9999- if let Some(cid) = map.next_value()? {
8686+ if let Some(link) = map.next_value()? {
10087 left = Some(NodeThing {
101101- cid,
102102- kind: ThingKind::Tree,
8888+ link,
8989+ kind: ThingKind::ChildNode,
10390 });
10491 }
10592 }
···142129 }
143130144131 things.push(NodeThing {
145145- cid: entry.value,
146146- kind: ThingKind::Value { rkey: rkey_s },
132132+ link: entry.value.into(),
133133+ kind: ThingKind::Record(rkey_s),
147134 });
148135149149- if let Some(cid) = entry.tree {
136136+ if let Some(link) = entry.tree {
150137 things.push(NodeThing {
151151- cid,
152152- kind: ThingKind::Tree,
138138+ link,
139139+ kind: ThingKind::ChildNode,
153140 });
154141 }
155142···229216 /// the lower level must have keys sorting after this TreeEntry's key (to
230217 /// the "right"), but before the next TreeEntry's key in this Node (if any)
231218 #[serde(rename = "t")]
232232- pub tree: Option<Cid>,
219219+ pub tree: Option<ObjectLink>,
233220}
+224-32
src/walk.rs
···11//! Depth-first MST traversal
2233-use crate::mst::{Depth, MstNode, NodeThing, ThingKind};
44-use crate::{Bytes, HashMap, disk::DiskStore, drive::MaybeProcessedBlock};
33+use crate::link::{NodeThing, ObjectLink, ThingKind};
44+use crate::mst::{Depth, MstNode};
55+use crate::{Bytes, HashMap, Rkey, disk::DiskStore, drive::MaybeProcessedBlock, noop};
56use cid::Cid;
67use std::convert::Infallible;
78···1617 MstError(#[from] MstError),
1718 #[error("storage error: {0}")]
1819 StorageError(#[from] fjall::Error),
1919- #[error("block not found: {0}")]
2020- MissingBlock(Cid),
2020+ #[error("block not found: {0:?}")]
2121+ MissingBlock(Box<NodeThing>),
2122}
22232324/// Errors from invalid Rkeys
···3031 #[error("MST depth underflow: depth-0 node with child trees")]
3132 DepthUnderflow,
3233 #[error("Encountered rkey {rkey:?} which cannot follow the previous: {prev:?}")]
3333- RkeyOutOfOrder { prev: String, rkey: String },
3434+ RkeyOutOfOrder { prev: Rkey, rkey: Rkey },
3435}
35363637/// Walker outputs
3838+///
3939+/// TODO: rename to "Record" or "Entry" or something
3740#[derive(Debug, PartialEq)]
3838-pub struct Output {
3939- pub rkey: String,
4141+pub struct Output<T = Bytes> {
4242+ pub rkey: Rkey, // TODO: aaa it's not really rkey, it's just "key" (or split to collection/rkey??)
4043 pub cid: Cid,
4141- pub data: Bytes,
4444+ pub data: T,
4545+}
4646+4747+#[derive(Debug, PartialEq)]
4848+pub enum Step<T = Output> {
4949+ Value(T),
5050+ End(Option<Rkey>),
4251}
43524453/// Traverser of an atproto MST
4554///
4655/// Walks the tree from left-to-right in depth-first order
4747-#[derive(Debug)]
5656+#[derive(Debug, Clone)]
4857pub struct Walker {
4949- prev_rkey: String,
5858+ prev_rkey: Rkey,
5059 root_depth: Depth,
5160 todo: Vec<Vec<NodeThing>>,
5261}
···6069 }
6170 }
62717272+ pub fn viz(
7373+ &self,
7474+ blocks: &HashMap<ObjectLink, MaybeProcessedBlock>,
7575+ root_link: ObjectLink,
7676+ ) -> Result<(), WalkError> {
7777+ let root_block = blocks.get(&root_link).ok_or(WalkError::MissingBlock(
7878+ NodeThing {
7979+ link: root_link.clone(),
8080+ kind: ThingKind::ChildNode,
8181+ }
8282+ .into(),
8383+ ))?;
8484+8585+ let root_node: MstNode = match root_block {
8686+ MaybeProcessedBlock::Processed(_) => return Err(WalkError::BadCommitFingerprint),
8787+ MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(bytes)?,
8888+ };
8989+9090+ let mut positions = HashMap::new();
9191+ let mut w = Walker::new(root_node.clone());
9292+9393+ let mut pos_idx = 0;
9494+ while let Step::Value(Output { rkey, .. }) = w.step_sparse(blocks, noop)? {
9595+ positions.insert(rkey, pos_idx);
9696+ pos_idx += 1;
9797+ }
9898+9999+ Self::vnext(
100100+ root_node.depth.unwrap(),
101101+ vec![root_link],
102102+ blocks,
103103+ &positions,
104104+ )?;
105105+106106+ Ok(())
107107+ }
108108+109109+ pub fn vnext(
110110+ level: u32,
111111+ links: Vec<ObjectLink>,
112112+ blocks: &HashMap<ObjectLink, MaybeProcessedBlock>,
113113+ positions: &HashMap<Rkey, usize>,
114114+ ) -> Result<Vec<usize>, WalkError> {
115115+ let mut offsets = Vec::new();
116116+ let mut level_keys = Vec::new();
117117+ let mut child_links = Vec::new();
118118+119119+ for link in links {
120120+ println!(
121121+ "\n{level}~{}..",
122122+ link.to_bytes()
123123+ .iter()
124124+ .take(5)
125125+ .map(|c| format!("{c:02x}"))
126126+ .collect::<Vec<_>>()
127127+ .join("")
128128+ );
129129+130130+ let Some(mpb) = blocks.get(&link) else {
131131+ // TODO: drop an 'x' for missing node
132132+ continue;
133133+ };
134134+ let node: MstNode = match mpb {
135135+ MaybeProcessedBlock::Processed(_) => return Err(WalkError::BadCommitFingerprint),
136136+ MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(bytes)?,
137137+ };
138138+139139+ let mut last_key = "".to_string();
140140+ let mut last_was_record = true;
141141+ for thing in node.things {
142142+ let mut node_keys = Vec::new();
143143+144144+ let has = blocks.contains_key(&thing.link);
145145+146146+ match thing.kind {
147147+ ThingKind::ChildNode => {
148148+ if has {
149149+ child_links.push(thing.link);
150150+ last_was_record = false;
151151+ }
152152+ }
153153+ ThingKind::Record(key) => {
154154+ let us = positions[&key];
156156+ if !last_was_record && !last_key.is_empty() {
157157+ let them = positions[&last_key];
158158+ for i in 0..them.saturating_sub(1) {
159159+ if i < (us + 1) {
160160+ print!(" ");
161161+ } else {
162162+ print!("~~");
163163+ }
164164+ }
165165+ println!("~");
166166+ }
167167+168168+ for _ in 0..us {
169169+ print!(" ");
170170+ }
171171+ if has {
172172+ print!("O");
173173+ } else {
174174+ print!("x");
175175+ }
176176+ println!(" {key}");
177177+ node_keys.push(key.clone());
178178+ last_key = key;
179179+ last_was_record = true;
180180+ }
181181+ }
182182+ level_keys.push(node_keys);
183183+ }
184184+185185+ offsets.push(1);
186186+ }
187187+188188+ if !child_links.is_empty() {
189189+ Self::vnext(level - 1, child_links, blocks, positions)?; // TODO use offsets
190190+ }
191191+192192+ Ok(offsets)
193193+ }
194194+63195 fn mpb_step(
64196 &mut self,
6565- kind: ThingKind,
6666- cid: Cid,
197197+ thing: NodeThing,
67198 mpb: &MaybeProcessedBlock,
68199 process: impl Fn(Bytes) -> Bytes,
69200 ) -> Result<Option<Output>, WalkError> {
7070- match kind {
7171- ThingKind::Value { rkey } => {
201201+ match thing.kind {
202202+ ThingKind::Record(rkey) => {
72203 let data = match mpb {
73204 MaybeProcessedBlock::Raw(data) => process(data.clone()),
74205 MaybeProcessedBlock::Processed(t) => t.clone(),
···83214 self.prev_rkey = rkey.clone();
8421585216 log::trace!("val @ {rkey}");
8686- Ok(Some(Output { rkey, cid, data }))
217217+ Ok(Some(Output {
218218+ rkey,
219219+ cid: thing.link.into(),
220220+ data,
221221+ }))
87222 }
8888- ThingKind::Tree => {
223223+ ThingKind::ChildNode => {
89224 let MaybeProcessedBlock::Raw(data) = mpb else {
90225 return Err(WalkError::BadCommitFingerprint);
91226 };
···132267 /// Advance through nodes until we find a record or can't go further
133268 pub fn step(
134269 &mut self,
135135- blocks: &mut HashMap<Cid, MaybeProcessedBlock>,
270270+ blocks: &HashMap<ObjectLink, MaybeProcessedBlock>,
136271 process: impl Fn(Bytes) -> Bytes,
137137- ) -> Result<Option<Output>, WalkError> {
138138- while let Some(NodeThing { cid, kind }) = self.next_todo() {
139139- let Some(mpb) = blocks.get(&cid) else {
140140- return Err(WalkError::MissingBlock(cid));
272272+ ) -> Result<Step, WalkError> {
273273+ while let Some(NodeThing { link, kind }) = self.next_todo() {
274274+ let Some(mpb) = blocks.get(&link) else {
275275+ return Err(WalkError::MissingBlock(NodeThing { link, kind }.into()));
141276 };
142142- if let Some(out) = self.mpb_step(kind, cid, mpb, &process)? {
143143- return Ok(Some(out));
277277+ if let Some(out) = self.mpb_step(NodeThing { link, kind }, mpb, &process)? {
278278+ return Ok(Step::Value(out));
144279 }
145280 }
146146- Ok(None)
281281+ Ok(Step::End(None))
282282+ }
283283+284284+ /// Advance through nodes, allowing for missing records
285285+ pub fn step_sparse(
286286+ &mut self,
287287+ blocks: &HashMap<ObjectLink, MaybeProcessedBlock>,
288288+ process: impl Fn(Bytes) -> Bytes,
289289+ ) -> Result<Step<Output<Option<Bytes>>>, WalkError> {
290290+ while let Some(NodeThing { link, kind }) = self.next_todo() {
291291+ let mut dummy = false;
292292+ let mpb = match blocks.get(&link) {
293293+ Some(mpb) => mpb,
294294+ None => {
295295+ if let ThingKind::Record(_) = kind {
296296+ dummy = true;
297297+ &MaybeProcessedBlock::Processed(vec![])
298298+ } else {
299299+ continue;
300300+ }
301301+ }
302302+ };
303303+ if let Some(out) = self.mpb_step(NodeThing { link, kind }, mpb, |bytes| {
304304+ if dummy { bytes } else { process(bytes) }
305305+ })? {
306306+ // eprintln!(" ----- {}", out.rkey);
307307+ return Ok(Step::Value(Output {
308308+ cid: out.cid,
309309+ rkey: out.rkey,
310310+ data: if dummy { None } else { Some(out.data) },
311311+ }));
312312+ }
313313+ }
314314+ Ok(Step::End(None))
315315+ }
316316+317317+ pub fn step_to_edge(
318318+ &mut self,
319319+ blocks: &HashMap<ObjectLink, MaybeProcessedBlock>,
320320+ ) -> Result<Option<Rkey>, WalkError> {
321321+ let mut ant = self.clone();
322322+ let mut rkey_prev = None;
323323+ loop {
324324+ match ant.step(blocks, noop) {
325325+ Err(WalkError::MissingBlock(thing)) => {
326326+ if let ThingKind::Record(rkey) = thing.kind {
327327+ rkey_prev = Some(rkey);
328328+ }
329329+ *self = ant;
330330+ ant = self.clone();
331331+ }
332332+ Err(anyother) => return Err(anyother),
333333+ Ok(z) => {
334334+ eprintln!("apparently we are too far at {z:?}");
335335+ return Ok(rkey_prev); // oop real record, mutant went too far
336336+ }
337337+ }
338338+ }
147339 }
148340149341 /// blocking!!!!!!
150342 pub fn disk_step(
151343 &mut self,
152152- blocks: &mut DiskStore,
344344+ blocks: &DiskStore,
153345 process: impl Fn(Bytes) -> Bytes,
154154- ) -> Result<Option<Output>, WalkError> {
155155- while let Some(NodeThing { cid, kind }) = self.next_todo() {
156156- let Some(block_slice) = blocks.get(&cid.to_bytes())? else {
157157- return Err(WalkError::MissingBlock(cid));
346346+ ) -> Result<Step, WalkError> {
347347+ while let Some(NodeThing { link, kind }) = self.next_todo() {
348348+ let Some(block_slice) = blocks.get(&link.to_bytes())? else {
349349+ return Err(WalkError::MissingBlock(NodeThing { link, kind }.into()));
158350 };
159351 let mpb = MaybeProcessedBlock::from_bytes(block_slice.to_vec());
160160- if let Some(out) = self.mpb_step(kind, cid, &mpb, &process)? {
161161- return Ok(Some(out));
352352+ if let Some(out) = self.mpb_step(NodeThing { link, kind }, &mpb, &process)? {
353353+ return Ok(Step::Value(out));
162354 }
163355 }
164164- Ok(None)
356356+ Ok(Step::End(None))
165357 }
166358}