Fast and robust atproto CAR file processing in rust

Compare changes

Choose any two refs to compare.

+656 -480
+3 -260
Cargo.lock
··· 95 95 checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" 96 96 97 97 [[package]] 98 - name = "arrayref" 99 - version = "0.3.9" 100 - source = "registry+https://github.com/rust-lang/crates.io-index" 101 - checksum = "76a2e8124351fda1ef8aaaa3bbd7ebbcb486bbcd4225aca0aa0d84bb2db8fecb" 102 - 103 - [[package]] 104 - name = "arrayvec" 105 - version = "0.7.6" 106 - source = "registry+https://github.com/rust-lang/crates.io-index" 107 - checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" 108 - 109 - [[package]] 110 98 name = "autocfg" 111 99 version = "1.5.0" 112 100 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 145 133 146 134 [[package]] 147 135 name = "bitflags" 148 - version = "1.3.2" 149 - source = "registry+https://github.com/rust-lang/crates.io-index" 150 - checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" 151 - 152 - [[package]] 153 - name = "bitflags" 154 136 version = "2.9.4" 155 137 source = "registry+https://github.com/rust-lang/crates.io-index" 156 138 checksum = "2261d10cca569e4643e526d8dc2e62e433cc8aba21ab764233731f8d369bf394" 157 139 158 140 [[package]] 159 - name = "blake2b_simd" 160 - version = "1.0.4" 161 - source = "registry+https://github.com/rust-lang/crates.io-index" 162 - checksum = "b79834656f71332577234b50bfc009996f7449e0c056884e6a02492ded0ca2f3" 163 - dependencies = [ 164 - "arrayref", 165 - "arrayvec", 166 - "constant_time_eq", 167 - ] 168 - 169 - [[package]] 170 - name = "blake2s_simd" 171 - version = "1.0.4" 172 - source = "registry+https://github.com/rust-lang/crates.io-index" 173 - checksum = "ee29928bad1e3f94c9d1528da29e07a1d3d04817ae8332de1e8b846c8439f4b3" 174 - dependencies = [ 175 - "arrayref", 176 - "arrayvec", 177 - "constant_time_eq", 178 - ] 179 - 180 - [[package]] 181 - name = "blake3" 182 - version = "1.8.3" 183 - source = "registry+https://github.com/rust-lang/crates.io-index" 184 - checksum = 
"2468ef7d57b3fb7e16b576e8377cdbde2320c60e1491e961d11da40fc4f02a2d" 185 - dependencies = [ 186 - "arrayref", 187 - "arrayvec", 188 - "cc", 189 - "cfg-if", 190 - "constant_time_eq", 191 - "cpufeatures", 192 - ] 193 - 194 - [[package]] 195 141 name = "block-buffer" 196 142 version = "0.10.4" 197 143 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 205 151 version = "3.19.0" 206 152 source = "registry+https://github.com/rust-lang/crates.io-index" 207 153 checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" 208 - 209 - [[package]] 210 - name = "byteorder" 211 - version = "1.5.0" 212 - source = "registry+https://github.com/rust-lang/crates.io-index" 213 - checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" 214 154 215 155 [[package]] 216 156 name = "byteorder-lite" ··· 359 299 version = "0.4.3" 360 300 source = "registry+https://github.com/rust-lang/crates.io-index" 361 301 checksum = "2f421161cb492475f1661ddc9815a745a1c894592070661180fdec3d4872e9c3" 362 - 363 - [[package]] 364 - name = "constant_time_eq" 365 - version = "0.4.2" 366 - source = "registry+https://github.com/rust-lang/crates.io-index" 367 - checksum = "3d52eff69cd5e647efe296129160853a42795992097e8af39800e1060caeea9b" 368 302 369 303 [[package]] 370 304 name = "core2" ··· 777 711 checksum = "ad6880c8d4a9ebf39c6e8b77007ce223f646a4d21ce29d99f70cb16420545425" 778 712 779 713 [[package]] 780 - name = "indexmap" 781 - version = "2.13.0" 782 - source = "registry+https://github.com/rust-lang/crates.io-index" 783 - checksum = "7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017" 784 - dependencies = [ 785 - "equivalent", 786 - "hashbrown 0.16.1", 787 - ] 788 - 789 - [[package]] 790 714 name = "interval-heap" 791 715 version = "0.0.5" 792 716 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 801 725 source = "registry+https://github.com/rust-lang/crates.io-index" 802 726 checksum = 
"046fa2d4d00aea763528b4950358d0ead425372445dc8ff86312b3c69ff7727b" 803 727 dependencies = [ 804 - "bitflags 2.9.4", 728 + "bitflags", 805 729 "cfg-if", 806 730 "libc", 807 731 ] ··· 886 810 dependencies = [ 887 811 "once_cell", 888 812 "wasm-bindgen", 889 - ] 890 - 891 - [[package]] 892 - name = "keccak" 893 - version = "0.1.5" 894 - source = "registry+https://github.com/rust-lang/crates.io-index" 895 - checksum = "ecc2af9a1119c51f12a14607e783cb977bde58bc069ff0c3da1095e635d70654" 896 - dependencies = [ 897 - "cpufeatures", 898 813 ] 899 814 900 815 [[package]] ··· 1025 940 ] 1026 941 1027 942 [[package]] 1028 - name = "multihash-codetable" 1029 - version = "0.1.4" 1030 - source = "registry+https://github.com/rust-lang/crates.io-index" 1031 - checksum = "67996849749d25f1da9f238e8ace2ece8f9d6bdf3f9750aaf2ae7de3a5cad8ea" 1032 - dependencies = [ 1033 - "blake2b_simd", 1034 - "blake2s_simd", 1035 - "blake3", 1036 - "core2", 1037 - "digest", 1038 - "multihash-derive", 1039 - "ripemd", 1040 - "sha1", 1041 - "sha2", 1042 - "sha3", 1043 - "strobe-rs", 1044 - ] 1045 - 1046 - [[package]] 1047 - name = "multihash-derive" 1048 - version = "0.9.1" 1049 - source = "registry+https://github.com/rust-lang/crates.io-index" 1050 - checksum = "1f1b7edab35d920890b88643a765fc9bd295cf0201f4154dda231bef9b8404eb" 1051 - dependencies = [ 1052 - "core2", 1053 - "multihash", 1054 - "multihash-derive-impl", 1055 - ] 1056 - 1057 - [[package]] 1058 - name = "multihash-derive-impl" 1059 - version = "0.1.2" 1060 - source = "registry+https://github.com/rust-lang/crates.io-index" 1061 - checksum = "e3dc7141bd06405929948754f0628d247f5ca1865be745099205e5086da957cb" 1062 - dependencies = [ 1063 - "proc-macro-crate", 1064 - "proc-macro2", 1065 - "quote", 1066 - "syn 2.0.106", 1067 - "synstructure", 1068 - ] 1069 - 1070 - [[package]] 1071 943 name = "num-traits" 1072 944 version = "0.2.19" 1073 945 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1182 1054 ] 1183 1055 1184 1056 
[[package]] 1185 - name = "proc-macro-crate" 1186 - version = "3.4.0" 1187 - source = "registry+https://github.com/rust-lang/crates.io-index" 1188 - checksum = "219cb19e96be00ab2e37d6e299658a0cfa83e52429179969b0f0121b4ac46983" 1189 - dependencies = [ 1190 - "toml_edit", 1191 - ] 1192 - 1193 - [[package]] 1194 1057 name = "proc-macro2" 1195 1058 version = "1.0.101" 1196 1059 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1250 1113 source = "registry+https://github.com/rust-lang/crates.io-index" 1251 1114 checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" 1252 1115 dependencies = [ 1253 - "bitflags 2.9.4", 1116 + "bitflags", 1254 1117 ] 1255 1118 1256 1119 [[package]] ··· 1297 1160 "log", 1298 1161 "mimalloc", 1299 1162 "multibase", 1300 - "multihash-codetable", 1301 1163 "serde", 1302 1164 "serde_bytes", 1303 1165 "serde_ipld_dagcbor", ··· 1308 1170 ] 1309 1171 1310 1172 [[package]] 1311 - name = "ripemd" 1312 - version = "0.1.3" 1313 - source = "registry+https://github.com/rust-lang/crates.io-index" 1314 - checksum = "bd124222d17ad93a644ed9d011a40f4fb64aa54275c08cc216524a9ea82fb09f" 1315 - dependencies = [ 1316 - "digest", 1317 - ] 1318 - 1319 - [[package]] 1320 1173 name = "rustc-demangle" 1321 1174 version = "0.1.26" 1322 1175 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1334 1187 source = "registry+https://github.com/rust-lang/crates.io-index" 1335 1188 checksum = "cd15f8a2c5551a84d56efdc1cd049089e409ac19a3072d5037a17fd70719ff3e" 1336 1189 dependencies = [ 1337 - "bitflags 2.9.4", 1190 + "bitflags", 1338 1191 "errno", 1339 1192 "libc", 1340 1193 "linux-raw-sys", ··· 1451 1304 ] 1452 1305 1453 1306 [[package]] 1454 - name = "sha1" 1455 - version = "0.10.6" 1456 - source = "registry+https://github.com/rust-lang/crates.io-index" 1457 - checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" 1458 - dependencies = [ 1459 - "cfg-if", 1460 - "cpufeatures", 1461 - "digest", 
1462 - ] 1463 - 1464 - [[package]] 1465 1307 name = "sha2" 1466 1308 version = "0.10.9" 1467 1309 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1473 1315 ] 1474 1316 1475 1317 [[package]] 1476 - name = "sha3" 1477 - version = "0.10.8" 1478 - source = "registry+https://github.com/rust-lang/crates.io-index" 1479 - checksum = "75872d278a8f37ef87fa0ddbda7802605cb18344497949862c0d4dcb291eba60" 1480 - dependencies = [ 1481 - "digest", 1482 - "keccak", 1483 - ] 1484 - 1485 - [[package]] 1486 1318 name = "shlex" 1487 1319 version = "1.3.0" 1488 1320 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1529 1361 ] 1530 1362 1531 1363 [[package]] 1532 - name = "strobe-rs" 1533 - version = "0.10.0" 1534 - source = "registry+https://github.com/rust-lang/crates.io-index" 1535 - checksum = "98fe17535ea31344936cc58d29fec9b500b0452ddc4cc24c429c8a921a0e84e5" 1536 - dependencies = [ 1537 - "bitflags 1.3.2", 1538 - "byteorder", 1539 - "keccak", 1540 - "subtle", 1541 - "zeroize", 1542 - ] 1543 - 1544 - [[package]] 1545 1364 name = "strsim" 1546 1365 version = "0.11.1" 1547 1366 source = "registry+https://github.com/rust-lang/crates.io-index" 1548 1367 checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" 1549 - 1550 - [[package]] 1551 - name = "subtle" 1552 - version = "2.6.1" 1553 - source = "registry+https://github.com/rust-lang/crates.io-index" 1554 - checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" 1555 1368 1556 1369 [[package]] 1557 1370 name = "syn" ··· 1576 1389 ] 1577 1390 1578 1391 [[package]] 1579 - name = "synstructure" 1580 - version = "0.13.2" 1581 - source = "registry+https://github.com/rust-lang/crates.io-index" 1582 - checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" 1583 - dependencies = [ 1584 - "proc-macro2", 1585 - "quote", 1586 - "syn 2.0.106", 1587 - ] 1588 - 1589 - [[package]] 1590 1392 name = "tempfile" 1591 1393 version = "3.23.0" 1592 1394 
source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1678 1480 "proc-macro2", 1679 1481 "quote", 1680 1482 "syn 2.0.106", 1681 - ] 1682 - 1683 - [[package]] 1684 - name = "toml_datetime" 1685 - version = "0.7.5+spec-1.1.0" 1686 - source = "registry+https://github.com/rust-lang/crates.io-index" 1687 - checksum = "92e1cfed4a3038bc5a127e35a2d360f145e1f4b971b551a2ba5fd7aedf7e1347" 1688 - dependencies = [ 1689 - "serde_core", 1690 - ] 1691 - 1692 - [[package]] 1693 - name = "toml_edit" 1694 - version = "0.23.10+spec-1.0.0" 1695 - source = "registry+https://github.com/rust-lang/crates.io-index" 1696 - checksum = "84c8b9f757e028cee9fa244aea147aab2a9ec09d5325a9b01e0a49730c2b5269" 1697 - dependencies = [ 1698 - "indexmap", 1699 - "toml_datetime", 1700 - "toml_parser", 1701 - "winnow", 1702 - ] 1703 - 1704 - [[package]] 1705 - name = "toml_parser" 1706 - version = "1.0.6+spec-1.1.0" 1707 - source = "registry+https://github.com/rust-lang/crates.io-index" 1708 - checksum = "a3198b4b0a8e11f09dd03e133c0280504d0801269e9afa46362ffde1cbeebf44" 1709 - dependencies = [ 1710 - "winnow", 1711 1483 ] 1712 1484 1713 1485 [[package]] ··· 2018 1790 checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" 2019 1791 2020 1792 [[package]] 2021 - name = "winnow" 2022 - version = "0.7.14" 2023 - source = "registry+https://github.com/rust-lang/crates.io-index" 2024 - checksum = "5a5364e9d77fcdeeaa6062ced926ee3381faa2ee02d3eb83a5c27a8825540829" 2025 - dependencies = [ 2026 - "memchr", 2027 - ] 2028 - 2029 - [[package]] 2030 1793 name = "wit-bindgen" 2031 1794 version = "0.46.0" 2032 1795 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 2057 1820 "quote", 2058 1821 "syn 2.0.106", 2059 1822 ] 2060 - 2061 - [[package]] 2062 - name = "zeroize" 2063 - version = "1.8.2" 2064 - source = "registry+https://github.com/rust-lang/crates.io-index" 2065 - checksum = "b97154e67e32c85465826e8bcc1c59429aaaf107c1e4a9e53c8d8ccd5eff88d0" 2066 - dependencies = 
[ 2067 - "zeroize_derive", 2068 - ] 2069 - 2070 - [[package]] 2071 - name = "zeroize_derive" 2072 - version = "1.4.3" 2073 - source = "registry+https://github.com/rust-lang/crates.io-index" 2074 - checksum = "85a5b4158499876c763cb03bc4e49185d3cccbabb15b33c627f7884f43db852e" 2075 - dependencies = [ 2076 - "proc-macro2", 2077 - "quote", 2078 - "syn 2.0.106", 2079 - ]
+5 -6
Cargo.toml
··· 8 8 9 9 [dependencies] 10 10 fjall = { version = "3.0.1", default-features = false } 11 - hashbrown = "0.16.1" 11 + hashbrown = { version = "0.16.1", optional = true } 12 12 cid = { version = "0.11.1", features = ["serde"] } 13 13 iroh-car = "0.5.1" 14 14 log = "0.4.28" ··· 18 18 sha2 = "0.10.9" # note: hmac-sha256 is simpler, smaller, benches ~15ns slower 19 19 thiserror = "2.0.17" 20 20 tokio = { version = "1.47.1", features = ["rt", "sync"] } 21 - multihash-codetable = { version = "0.1.4", features = ["sha2"] } 21 + 22 + [features] 23 + default = [] 24 + hashbrown = ["dep:hashbrown"] 22 25 23 26 [dev-dependencies] 24 27 clap = { version = "4.5.48", features = ["derive"] } ··· 48 51 # [[bench]] 49 52 # name = "leading" 50 53 # harness = false 51 - 52 - [[bench]] 53 - name = "cid-check" 54 - harness = false
-45
benches/cid-check.rs
··· 1 - use cid::Cid; 2 - use criterion::{Criterion, criterion_group, criterion_main}; 3 - use multihash_codetable::{Code, MultihashDigest}; 4 - use sha2::{Digest, Sha256}; 5 - 6 - fn multihash_verify(given: Cid, block: &[u8]) -> bool { 7 - let calculated = Cid::new_v1(0x71, Code::Sha2_256.digest(block)); 8 - calculated == given 9 - } 10 - 11 - fn effortful_verify(given: Cid, block: &[u8]) -> bool { 12 - // we know we're in atproto, so we can make a few assumptions 13 - if given.version() != cid::Version::V1 { 14 - return false; 15 - } 16 - let (codec, given_digest, _) = given.hash().into_inner(); 17 - if codec != 0x12 { 18 - return false; 19 - } 20 - given_digest[..32] == *Sha256::digest(block) 21 - } 22 - 23 - fn fastloose_verify(given: Cid, block: &[u8]) -> bool { 24 - let (_, given_digest, _) = given.hash().into_inner(); 25 - given_digest[..32] == *Sha256::digest(block) 26 - } 27 - 28 - pub fn criterion_benchmark(c: &mut Criterion) { 29 - let some_bytes: Vec<u8> = vec![0x1a, 0x00, 0xAA, 0x39, 0x8C].repeat(100); 30 - let cid = Cid::new_v1(0x71, Code::Sha2_256.digest(&some_bytes)); 31 - 32 - let mut g = c.benchmark_group("CID check"); 33 - g.bench_function("multihash", |b| { 34 - b.iter(|| multihash_verify(cid, &some_bytes)) 35 - }); 36 - g.bench_function("effortful", |b| { 37 - b.iter(|| effortful_verify(cid, &some_bytes)) 38 - }); 39 - g.bench_function("fastloose", |b| { 40 - b.iter(|| fastloose_verify(cid, &some_bytes)) 41 - }); 42 - } 43 - 44 - criterion_group!(benches, criterion_benchmark); 45 - criterion_main!(benches);
+3 -3
benches/huge-car.rs
··· 1 1 extern crate repo_stream; 2 - use repo_stream::Driver; 2 + use repo_stream::{Driver, Step}; 3 3 use std::path::{Path, PathBuf}; 4 4 5 5 use criterion::{Criterion, criterion_group, criterion_main}; ··· 33 33 let reader = tokio::io::BufReader::new(reader); 34 34 35 35 let mut driver = match Driver::load_car(reader, ser, 1024).await.unwrap() { 36 - Driver::Memory(_, mem_driver) => mem_driver, 36 + Driver::Memory(_, _, mem_driver) => mem_driver, 37 37 Driver::Disk(_) => panic!("not doing disk for benchmark"), 38 38 }; 39 39 40 40 let mut n = 0; 41 - while let Some(pairs) = driver.next_chunk(256).await.unwrap() { 41 + while let Step::Value(pairs) = driver.next_chunk(256).await.unwrap() { 42 42 n += pairs.len(); 43 43 } 44 44 n
+3 -3
benches/non-huge-cars.rs
··· 1 1 extern crate repo_stream; 2 - use repo_stream::Driver; 2 + use repo_stream::{Driver, Step}; 3 3 4 4 use criterion::{Criterion, criterion_group, criterion_main}; 5 5 ··· 40 40 41 41 async fn drive_car(bytes: &[u8]) -> usize { 42 42 let mut driver = match Driver::load_car(bytes, ser, 32).await.unwrap() { 43 - Driver::Memory(_, mem_driver) => mem_driver, 43 + Driver::Memory(_, _, mem_driver) => mem_driver, 44 44 Driver::Disk(_) => panic!("not benching big cars here"), 45 45 }; 46 46 47 47 let mut n = 0; 48 - while let Some(pairs) = driver.next_chunk(256).await.unwrap() { 48 + while let Step::Value(pairs) = driver.next_chunk(256).await.unwrap() { 49 49 n += pairs.len(); 50 50 } 51 51 n
car-samples/slice-node-after.car

This is a binary file and will not be displayed.

car-samples/slice-node-first-key.car

This is a binary file and will not be displayed.

car-samples/slice-one.car

This is a binary file and will not be displayed.

car-samples/slice-proving-absence.car

This is a binary file and will not be displayed.

+10 -7
examples/disk-read-file/main.rs
··· 9 9 static GLOBAL: MiMalloc = MiMalloc; 10 10 11 11 use clap::Parser; 12 - use repo_stream::{DiskBuilder, Driver, DriverBuilder}; 12 + use repo_stream::{DiskBuilder, Driver, DriverBuilder, Step}; 13 13 use std::path::PathBuf; 14 14 use std::time::Instant; 15 15 ··· 42 42 .load_car(reader) 43 43 .await? 44 44 { 45 - Driver::Memory(_, _) => panic!("try this on a bigger car"), 45 + Driver::Memory(_, _, _) => panic!("try this on a bigger car"), 46 46 Driver::Disk(big_stuff) => { 47 47 // we reach here if the repo was too big and needs to be spilled to 48 48 // disk to continue ··· 51 51 let disk_store = DiskBuilder::new().open(tmpfile).await?; 52 52 53 53 // do the spilling, get back a (similar) driver 54 - let (commit, driver) = big_stuff.finish_loading(disk_store).await?; 54 + let (commit, _, driver) = big_stuff.finish_loading(disk_store).await?; 55 55 56 56 // at this point you might want to fetch the account's signing key 57 57 // via the DID from the commit, and then verify the signature. ··· 74 74 // this example uses the disk driver's channel mode: the tree walking is 75 75 // spawned onto a blocking thread, and we get chunks of rkey+blocks back 76 76 let (mut rx, join) = driver.to_channel(512); 77 - while let Some(r) = rx.recv().await { 78 - let pairs = r?; 77 + while let Some(step) = rx.recv().await { 78 + let step = step?; 79 + let Step::Value(outputs) = step else { 80 + break; 81 + }; 79 82 80 83 // keep a count of the total number of blocks seen 81 - n += pairs.len(); 84 + n += outputs.len(); 82 85 83 - for output in pairs { 86 + for output in outputs { 84 87 // for each block, count how many bytes are equal to '0' 85 88 // (this is just an example, you probably want to do something more 86 89 // interesting)
+32
examples/print-tree/main.rs
··· 1 + /*! 2 + Read a CAR slice in memory and show some info about it. 3 + */ 4 + 5 + extern crate repo_stream; 6 + use repo_stream::{Driver, DriverBuilder}; 7 + 8 + type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; 9 + 10 + #[tokio::main] 11 + async fn main() -> Result<()> { 12 + env_logger::init(); 13 + let reader = tokio::io::BufReader::new(tokio::io::stdin()); 14 + 15 + let (commit, driver) = match DriverBuilder::new() 16 + .with_block_processor(|block| block.len().to_ne_bytes().to_vec()) 17 + .load_car(reader) 18 + .await? 19 + { 20 + Driver::Memory(commit, _, mem_driver) => (commit, mem_driver), 21 + Driver::Disk(_) => panic!("this example doesn't handle big CARs"), 22 + }; 23 + 24 + println!( 25 + "\nthis slice is from {}, repo rev {}\n\n", 26 + commit.did, commit.rev 27 + ); 28 + 29 + driver.viz(commit.data)?; 30 + 31 + Ok(()) 32 + }
+11 -7
examples/read-file/main.rs
··· 4 4 5 5 extern crate repo_stream; 6 6 use clap::Parser; 7 - use repo_stream::{Driver, DriverBuilder}; 7 + use repo_stream::{Driver, DriverBuilder, Output, Step}; 8 8 use std::path::PathBuf; 9 9 10 10 type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; ··· 28 28 .load_car(reader) 29 29 .await? 30 30 { 31 - Driver::Memory(commit, mem_driver) => (commit, mem_driver), 31 + Driver::Memory(commit, _, mem_driver) => (commit, mem_driver), 32 32 Driver::Disk(_) => panic!("this example doesn't handle big CARs"), 33 33 }; 34 34 35 35 log::info!("got commit: {commit:?}"); 36 36 37 - let mut n = 0; 38 - while let Some(pairs) = driver.next_chunk(256).await? { 39 - n += pairs.len(); 40 - // log::info!("got {rkey:?}"); 37 + while let Step::Value(records) = driver.next_chunk(256).await? { 38 + for Output { rkey, cid, data } in records { 39 + let size = usize::from_ne_bytes(data.try_into().unwrap()); 40 + print!("0x"); 41 + for byte in cid.to_bytes() { 42 + print!("{byte:>02x}"); 43 + } 44 + println!(": {rkey} => record of len {}", size); 45 + } 41 46 } 42 - log::info!("bye! total records={n}"); 43 47 44 48 Ok(()) 45 49 }
+62
examples/read-slice/main.rs
··· 1 + /*! 2 + Read a CAR slice in memory and show some info about it. 3 + */ 4 + 5 + extern crate repo_stream; 6 + use repo_stream::{Driver, DriverBuilder, Output, Step}; 7 + 8 + type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; 9 + 10 + #[tokio::main] 11 + async fn main() -> Result<()> { 12 + env_logger::init(); 13 + let reader = tokio::io::BufReader::new(tokio::io::stdin()); 14 + 15 + let (commit, prev_rkey, mut driver) = match DriverBuilder::new() 16 + .with_block_processor(|block| block.len().to_ne_bytes().to_vec()) 17 + .load_car(reader) 18 + .await? 19 + { 20 + Driver::Memory(commit, prev, mem_driver) => (commit, prev, mem_driver), 21 + Driver::Disk(_) => panic!("this example doesn't handle big CARs"), 22 + }; 23 + 24 + println!( 25 + "\nthis slice is from {}, repo rev {}", 26 + commit.did, commit.rev 27 + ); 28 + if let Some(rkey) = prev_rkey { 29 + println!(" -> key immediately before CAR slice: {rkey}"); 30 + } else { 31 + println!( 32 + " -> no key preceeding the CAR slice, so it includes the leading edge of the tree." 33 + ); 34 + } 35 + 36 + println!("included records:"); 37 + let end = loop { 38 + match driver.next_chunk(256).await? { 39 + Step::Value(chunk) => { 40 + for Output { cid, rkey, .. } in chunk { 41 + print!(" SHA256 "); 42 + for byte in cid.to_bytes().iter().skip(4).take(5) { 43 + print!("{byte:02x}"); 44 + } 45 + println!("...\t{rkey}"); 46 + } 47 + } 48 + Step::End(e) => break e, 49 + } 50 + }; 51 + 52 + println!("done walking records present in the slice."); 53 + if let Some(rkey) = end { 54 + println!(" -> key immediately after CAR slice: {rkey}"); 55 + } else { 56 + println!( 57 + " -> no key proceeding the CAR slice, so it includes the trailing edge of the tree." 58 + ); 59 + } 60 + 61 + Ok(()) 62 + }
+18 -18
readme.md
··· 11 11 [sponsor-badge]: https://img.shields.io/badge/at-microcosm-b820f9?labelColor=b820f9&logo=githubsponsors&logoColor=fff 12 12 13 13 ```rust no_run 14 - use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder, Output}; 14 + use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder, Output, Step}; 15 15 16 16 #[tokio::main] 17 17 async fn main() -> Result<(), Box<dyn std::error::Error>> { ··· 31 31 { 32 32 33 33 // if all blocks fit within memory 34 - Driver::Memory(_commit, mut driver) => { 35 - while let Some(chunk) = driver.next_chunk(256).await? { 34 + Driver::Memory(_commit, _prev_rkey, mut driver) => { 35 + while let Step::Value(chunk) = driver.next_chunk(256).await? { 36 36 for Output { rkey: _, cid: _, data } in chunk { 37 37 let size = usize::from_ne_bytes(data.try_into().unwrap()); 38 38 total_size += size; ··· 45 45 // set up a disk store we can spill to 46 46 let store = DiskBuilder::new().open("some/path.db".into()).await?; 47 47 // do the spilling, get back a (similar) driver 48 - let (_commit, mut driver) = paused.finish_loading(store).await?; 48 + let (_commit, _prev_rkey, mut driver) = paused.finish_loading(store).await?; 49 49 50 - while let Some(chunk) = driver.next_chunk(256).await? { 50 + while let Step::Value(chunk) = driver.next_chunk(256).await? 
{ 51 51 for Output { rkey: _, cid: _, data } in chunk { 52 52 let size = usize::from_ne_bytes(data.try_into().unwrap()); 53 53 total_size += size; ··· 62 62 63 63 more recent todo 64 64 - [ ] add a zero-copy rkyv process function example 65 - - [ ] repo car slices 66 - - [ ] lazy-value stream (rkey -> CID diffing for tap-like `#sync` handling) 65 + - [ ] car slices 66 + - [ ] lazy-value stream (for rkey -> CID diffing; tap-like `#sync` handling; save a fjall record `.get` when not needed) 67 67 - [x] get an *empty* car for the test suite 68 68 - [x] implement a max size on disk limit 69 69 ··· 79 79 80 80 current car processing times (records processed into their length usize, phil's dev machine): 81 81 82 - - 450MiB CAR file (huge): `1.3s` 82 + - 450MiB CAR file (huge): `1.4s` 83 83 - 128MiB (huge): `350ms` 84 - - 5.0MiB: `6.8ms` 85 - - 279KiB: `160us` 86 - - 3.4KiB: `5.1us` 87 - - empty: `690ns` 84 + - 5.0MiB: `7.0ms` 85 + - 279KiB: `170us` 86 + - 3.4KiB: `5.3us` 87 + - empty: `720ns` 88 88 89 89 it's a little faster with `mimalloc` 90 90 ··· 94 94 static GLOBAL: MiMalloc = MiMalloc; 95 95 ``` 96 96 97 - - 450MiB CAR file: `1.2s` (-8%) 98 - - 128MiB: `300ms` (-14%) 99 - - 5.0MiB: `6.0ms` (-11%) 100 - - 279KiB: `150us` (-7%) 101 - - 3.4KiB: `4.7us` (-8%) 102 - - empty: `670ns` (-4%) 97 + - 450MiB CAR file: `1.1s` (-15%) 98 + - 128MiB: `300ms` (-15%) 99 + - 5.0MiB: `5.5ms` (-21%) 100 + - 279KiB: `140us` (-17%) 101 + - 3.4KiB: `4.3us` (-18%) 102 + - empty: `610ns` (-16%) 103 103 104 104 processing CARs requires buffering blocks, so it can consume a lot of memory. repo-stream's in-memory driver has minimal memory overhead, but there are two ways to make it work with less mem (you can do either or both!) 105 105
+1 -1
src/disk.rs
··· 148 148 } 149 149 150 150 #[inline] 151 - pub(crate) fn get(&mut self, key: &[u8]) -> Result<Option<fjall::Slice>, FjallError> { 151 + pub(crate) fn get(&self, key: &[u8]) -> Result<Option<fjall::Slice>, FjallError> { 152 152 self.keyspace.get(key) 153 153 } 154 154
+82 -57
src/drive.rs
··· 1 1 //! Consume a CAR from an AsyncRead, producing an ordered stream of records 2 2 3 + use crate::link::{NodeThing, ObjectLink, ThingKind}; 3 4 use crate::{ 4 - Bytes, HashMap, 5 + Bytes, HashMap, Rkey, Step, 5 6 disk::{DiskError, DiskStore}, 6 7 mst::MstNode, 7 - walk::Output, 8 + walk::{MstError, Output}, 8 9 }; 9 10 use cid::Cid; 10 11 use iroh_car::CarReader; 11 - use multihash_codetable::{Code, MultihashDigest}; 12 12 use std::convert::Infallible; 13 13 use tokio::{io::AsyncRead, sync::mpsc}; 14 14 ··· 20 20 pub enum DriveError { 21 21 #[error("Error from iroh_car: {0}")] 22 22 CarReader(#[from] iroh_car::Error), 23 - #[error("Block did not match its CID")] 24 - BadCID, 25 23 #[error("Failed to decode commit block: {0}")] 26 24 BadBlock(#[from] serde_ipld_dagcbor::DecodeError<Infallible>), 27 25 #[error("The Commit block reference by the root was not found")] ··· 36 34 ChannelSendError, // SendError takes <T> which we don't need 37 35 #[error("Failed to join a task: {0}")] 38 36 JoinError(#[from] tokio::task::JoinError), 37 + } 38 + 39 + impl From<MstError> for DriveError { 40 + fn from(me: MstError) -> DriveError { 41 + DriveError::WalkError(WalkError::MstError(me)) 42 + } 39 43 } 40 44 41 45 /// An in-order chunk of Rkey + CID + (processed) Block ··· 110 114 /// 111 115 /// You probably want to check the commit's signature. You can go ahead and 112 116 /// walk the MST right away. 113 - Memory(Commit, MemDriver), 117 + Memory(Commit, Option<Rkey>, MemDriver), 114 118 /// Blocks exceed the memory limit 115 119 /// 116 120 /// You'll need to provide a disk storage to continue. The commit will be ··· 124 128 block 125 129 } 126 130 127 - // iroh-car doesn't verify CIDs!!!!!! 
128 - #[inline(always)] 129 - fn verify_block(given: Cid, block: &[u8]) -> bool { 130 - Cid::new_v1(0x71, Code::Sha2_256.digest(block)) == given 131 - } 132 - 133 131 /// Builder-style driver setup 134 132 #[derive(Debug, Clone)] 135 133 pub struct DriverBuilder { ··· 205 203 // try to load all the blocks into memory 206 204 let mut mem_size = 0; 207 205 while let Some((cid, data)) = car.next_block().await? { 208 - // lkasdjflkajdsflkajsfdlkjasdf 209 - if !verify_block(cid, &data) { 210 - return Err(DriveError::BadCID); 211 - } 212 - 213 206 // the root commit is a Special Third Kind of block that we need to make 214 207 // sure not to optimistically send to the processing function 215 208 if cid == root { ··· 223 216 224 217 // stash (maybe processed) blocks in memory as long as we have room 225 218 mem_size += maybe_processed.len(); 226 - mem_blocks.insert(cid, maybe_processed); 219 + mem_blocks.insert(cid.into(), maybe_processed); 227 220 if mem_size >= max_size { 228 221 return Ok(Driver::Disk(NeedDisk { 229 222 car, ··· 247 240 MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?, 248 241 MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(bytes)?, 249 242 }; 250 - let walker = Walker::new(root_node); 243 + let mut walker = Walker::new(root_node); 244 + 245 + // eprintln!("going to edge..."); 246 + let edge = walker.step_to_edge(&mem_blocks)?; 247 + // eprintln!("got edge? {edge:?}"); 251 248 252 249 Ok(Driver::Memory( 253 250 commit, 251 + edge, 254 252 MemDriver { 255 253 blocks: mem_blocks, 256 254 walker, 257 255 process, 256 + next_missing: None, 258 257 }, 259 258 )) 260 259 } ··· 275 274 /// so the sync/async boundaries become a little easier to work around. 
276 275 #[derive(Debug)] 277 276 pub struct MemDriver { 278 - blocks: HashMap<Cid, MaybeProcessedBlock>, 277 + blocks: HashMap<ObjectLink, MaybeProcessedBlock>, 279 278 walker: Walker, 280 - process: fn(Bytes) -> Bytes, 279 + process: fn(Bytes) -> Bytes, // TODO: impl Fn(bytes) -> Bytes? 280 + next_missing: Option<NodeThing>, 281 281 } 282 282 283 283 impl MemDriver { 284 + pub fn viz(&self, tree: ObjectLink) -> Result<(), WalkError> { 285 + self.walker.viz(&self.blocks, tree) 286 + } 284 287 /// Step through the record outputs, in rkey order 285 - pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk>, DriveError> { 288 + pub async fn next_chunk(&mut self, n: usize) -> Result<Step<BlockChunk>, DriveError> { 289 + if let Some(ref mut missing) = self.next_missing { 290 + while let Step::Value(sparse_out) = 291 + self.walker.step_sparse(&self.blocks, self.process)? 292 + { 293 + if missing.kind == ThingKind::ChildNode { 294 + *missing = NodeThing { 295 + link: sparse_out.cid.into(), 296 + kind: ThingKind::Record(sparse_out.rkey), 297 + }; 298 + } 299 + } 300 + // TODO: l asdflkja slfkja lkdfj lakjd f 301 + // TODO: make the walker finish walking to verify no more present blocks (oops sparse tree) 302 + // HACK: just get the last rkey if it's there -- i think we might actually need to walk for it though 303 + // ...and walk to verify rkey order of the rest of the nodes anyway? 304 + return Ok(match &missing.kind { 305 + ThingKind::ChildNode => Step::End(None), 306 + ThingKind::Record(rkey) => Step::End(Some(rkey.clone())), 307 + }); 308 + } 286 309 let mut out = Vec::with_capacity(n); 310 + // let mut err; 287 311 for _ in 0..n { 288 - // walk as far as we can until we run out of blocks or find a record 289 - let Some(output) = self.walker.step(&mut self.blocks, self.process)? 
else { 290 - break; 291 - }; 292 - out.push(output); 312 + match self.walker.step(&self.blocks, self.process) { 313 + Ok(Step::Value(record)) => out.push(record), 314 + Ok(Step::End(None)) => break, 315 + Ok(Step::End(_)) => todo!("actually this should be unreachable?"), 316 + Err(WalkError::MissingBlock(missing)) => { 317 + self.next_missing = Some(*missing); 318 + return Ok(Step::Value(out)); // nb: might be empty! 319 + } 320 + Err(other) => return Err(other.into()), 321 + } 293 322 } 294 323 if out.is_empty() { 295 - Ok(None) 324 + Ok(Step::End(None)) 296 325 } else { 297 - Ok(Some(out)) 326 + Ok(Step::Value(out)) 298 327 } 299 328 } 300 329 } ··· 305 334 root: Cid, 306 335 process: fn(Bytes) -> Bytes, 307 336 max_size: usize, 308 - mem_blocks: HashMap<Cid, MaybeProcessedBlock>, 337 + mem_blocks: HashMap<ObjectLink, MaybeProcessedBlock>, 309 338 pub commit: Option<Commit>, 310 339 } 311 340 ··· 313 342 pub async fn finish_loading( 314 343 mut self, 315 344 mut store: DiskStore, 316 - ) -> Result<(Commit, DiskDriver), DriveError> { 345 + ) -> Result<(Commit, Option<Rkey>, DiskDriver), DriveError> { 317 346 // move store in and back out so we can manage lifetimes 318 347 // dump mem blocks into the store 319 348 store = tokio::task::spawn(async move { ··· 327 356 }) 328 357 .await??; 329 358 330 - let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock)>>(1); 359 + let (tx, mut rx) = mpsc::channel::<Vec<(ObjectLink, MaybeProcessedBlock)>>(1); 331 360 332 361 let store_worker = tokio::task::spawn_blocking(move || { 333 362 while let Some(chunk) = rx.blocking_recv() { ··· 348 377 let Some((cid, data)) = self.car.next_block().await? 
else { 349 378 break; 350 379 }; 351 - 352 - // lkasdjflkajdsflkajsfdlkjasdf 353 - if !verify_block(cid, &data) { 354 - return Err(DriveError::BadCID); 355 - } 356 - 357 380 // we still gotta keep checking for the root since we might not have it 358 381 if cid == self.root { 359 382 let c: Commit = serde_ipld_dagcbor::from_slice(&data)?; ··· 361 384 continue; 362 385 } 363 386 387 + let link = cid.into(); 364 388 let data = Bytes::from(data); 365 389 366 390 // remaining possible types: node, record, other. optimistically process 367 391 // TODO: get the actual in-memory size to compute disk spill 368 392 let maybe_processed = MaybeProcessedBlock::maybe(self.process, data); 369 393 mem_size += maybe_processed.len(); 370 - chunk.push((cid, maybe_processed)); 394 + chunk.push((link, maybe_processed)); 371 395 if mem_size >= (self.max_size / 2) { 372 396 // soooooo if we're setting the db cache to max_size and then letting 373 397 // multiple chunks in the queue that are >= max_size, then at any time ··· 391 415 392 416 let commit = self.commit.ok_or(DriveError::MissingCommit)?; 393 417 394 - // the commit always must point to a Node; empty node => empty MST special case 395 418 let db_bytes = store 396 419 .get(&commit.data.to_bytes()) 397 420 .map_err(|e| DriveError::StorageError(DiskError::DbError(e)))? ··· 405 428 406 429 Ok(( 407 430 commit, 431 + None, 408 432 DiskDriver { 409 433 process: self.process, 410 434 state: Some(BigState { store, walker }), ··· 437 461 /// Walk the MST returning up to `n` rkey + record pairs 438 462 /// 439 463 /// ```no_run 440 - /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, noop}; 464 + /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, Step, noop}; 441 465 /// # #[tokio::main] 442 466 /// # async fn main() -> Result<(), DriveError> { 443 467 /// # let mut disk_driver = _get_fake_disk_driver(); 444 - /// while let Some(pairs) = disk_driver.next_chunk(256).await? 
{ 445 - /// for output in pairs { 468 + /// while let Step::Value(outputs) = disk_driver.next_chunk(256).await? { 469 + /// for output in outputs { 446 470 /// println!("{}: size={}", output.rkey, output.data.len()); 447 471 /// } 448 472 /// } 449 473 /// # Ok(()) 450 474 /// # } 451 475 /// ``` 452 - pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk>, DriveError> { 476 + pub async fn next_chunk(&mut self, n: usize) -> Result<Step<Vec<Output>>, DriveError> { 453 477 let process = self.process; 454 478 455 479 // state should only *ever* be None transiently while inside here ··· 464 488 465 489 for _ in 0..n { 466 490 // walk as far as we can until we run out of blocks or find a record 467 - let step = match state.walker.disk_step(&mut state.store, process) { 491 + let step = match state.walker.disk_step(&state.store, process) { 468 492 Ok(s) => s, 469 493 Err(e) => { 470 494 return (state, Err(e.into())); 471 495 } 472 496 }; 473 - let Some(output) = step else { 497 + let Step::Value(output) = step else { 474 498 break; 475 499 }; 476 500 out.push(output); ··· 486 510 let out = res?; 487 511 488 512 if out.is_empty() { 489 - Ok(None) 513 + Ok(Step::End(None)) 490 514 } else { 491 - Ok(Some(out)) 515 + Ok(Step::Value(out)) 492 516 } 493 517 } 494 518 495 519 fn read_tx_blocking( 496 520 &mut self, 497 521 n: usize, 498 - tx: mpsc::Sender<Result<BlockChunk, DriveError>>, 499 - ) -> Result<(), mpsc::error::SendError<Result<BlockChunk, DriveError>>> { 522 + tx: mpsc::Sender<Result<Step<BlockChunk>, DriveError>>, 523 + ) -> Result<(), mpsc::error::SendError<Result<Step<BlockChunk>, DriveError>>> { 500 524 let BigState { store, walker } = self.state.as_mut().expect("valid state"); 501 525 502 526 loop { ··· 510 534 Err(e) => return tx.blocking_send(Err(e.into())), 511 535 }; 512 536 513 - let Some(output) = step else { 537 + let Step::Value(output) = step else { 514 538 break; 515 539 }; 516 540 out.push(output); ··· 519 543 if out.is_empty() { 
520 544 break; 521 545 } 522 - tx.blocking_send(Ok(out))?; 546 + tx.blocking_send(Ok(Step::Value(out)))?; 523 547 } 524 548 525 549 Ok(()) ··· 536 560 /// benefit over just using `.next_chunk(n)`. 537 561 /// 538 562 /// ```no_run 539 - /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, noop}; 563 + /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, Step, noop}; 540 564 /// # #[tokio::main] 541 565 /// # async fn main() -> Result<(), DriveError> { 542 566 /// # let mut disk_driver = _get_fake_disk_driver(); 543 567 /// let (mut rx, join) = disk_driver.to_channel(512); 544 568 /// while let Some(recvd) = rx.recv().await { 545 - /// let pairs = recvd?; 546 - /// for output in pairs { 569 + /// let outputs = recvd?; 570 + /// let Step::Value(outputs) = outputs else { break; }; 571 + /// for output in outputs { 547 572 /// println!("{}: size={}", output.rkey, output.data.len()); 548 573 /// } 549 574 /// ··· 555 580 mut self, 556 581 n: usize, 557 582 ) -> ( 558 - mpsc::Receiver<Result<BlockChunk, DriveError>>, 583 + mpsc::Receiver<Result<Step<BlockChunk>, DriveError>>, 559 584 tokio::task::JoinHandle<Self>, 560 585 ) { 561 - let (tx, rx) = mpsc::channel::<Result<BlockChunk, DriveError>>(1); 586 + let (tx, rx) = mpsc::channel::<Result<Step<BlockChunk>, DriveError>>(1); 562 587 563 588 // sketch: this worker is going to be allowed to execute without a join handle 564 589 let chan_task = tokio::task::spawn_blocking(move || {
+14 -6
src/lib.rs
··· 18 18 `iroh_car` additionally applies a block size limit of `2MiB`. 19 19 20 20 ``` 21 - use repo_stream::{Driver, DriverBuilder, DiskBuilder}; 21 + use repo_stream::{Driver, DriverBuilder, DiskBuilder, Step}; 22 22 23 23 # #[tokio::main] 24 24 # async fn main() -> Result<(), Box<dyn std::error::Error>> { ··· 35 35 { 36 36 37 37 // if all blocks fit within memory 38 - Driver::Memory(_commit, mut driver) => { 39 - while let Some(chunk) = driver.next_chunk(256).await? { 38 + Driver::Memory(_commit, _prev_rkey, mut driver) => { 39 + while let Step::Value(chunk) = driver.next_chunk(256).await? { 40 40 for output in chunk { 41 41 let size = usize::from_ne_bytes(output.data.try_into().unwrap()); 42 42 ··· 50 50 // set up a disk store we can spill to 51 51 let store = DiskBuilder::new().open("some/path.db".into()).await?; 52 52 // do the spilling, get back a (similar) driver 53 - let (_commit, mut driver) = paused.finish_loading(store).await?; 53 + let (_commit, _prev_rkey, mut driver) = paused.finish_loading(store).await?; 54 54 55 - while let Some(chunk) = driver.next_chunk(256).await? { 55 + while let Step::Value(chunk) = driver.next_chunk(256).await? { 56 56 for output in chunk { 57 57 let size = usize::from_ne_bytes(output.data.try_into().unwrap()); 58 58 ··· 82 82 83 83 pub mod disk; 84 84 pub mod drive; 85 + pub mod link; 85 86 86 87 pub use disk::{DiskBuilder, DiskError, DiskStore}; 87 88 pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk, noop}; 89 + pub use link::NodeThing; 88 90 pub use mst::Commit; 89 - pub use walk::Output; 91 + pub use walk::{Output, Step}; 90 92 91 93 pub type Bytes = Vec<u8>; 92 94 95 + pub type Rkey = String; 96 + 97 + #[cfg(feature = "hashbrown")] 93 98 pub(crate) use hashbrown::HashMap; 99 + 100 + #[cfg(not(feature = "hashbrown"))] 101 + pub(crate) use std::collections::HashMap; 94 102 95 103 #[doc = include_str!("../readme.md")] 96 104 #[cfg(doctest)]
+43
src/link.rs
··· 1 + use cid::Cid; 2 + 3 + #[derive(Debug, serde::Deserialize, Clone, PartialEq, Eq, Hash)] 4 + pub struct ObjectLink(Cid); 5 + 6 + impl ObjectLink { 7 + pub fn to_bytes(&self) -> Vec<u8> { 8 + self.0.to_bytes() 9 + } 10 + } 11 + 12 + impl From<Cid> for ObjectLink { 13 + fn from(cid: Cid) -> ObjectLink { 14 + ObjectLink(cid) 15 + } 16 + } 17 + 18 + impl From<ObjectLink> for Cid { 19 + fn from(link: ObjectLink) -> Cid { 20 + link.0 21 + } 22 + } 23 + 24 + #[derive(Debug, Clone)] 25 + pub struct NodeThing { 26 + pub link: ObjectLink, 27 + pub kind: ThingKind, 28 + } 29 + 30 + impl NodeThing { 31 + pub fn is_record(&self) -> bool { 32 + match self.kind { 33 + ThingKind::ChildNode => false, 34 + ThingKind::Record(_) => true, 35 + } 36 + } 37 + } 38 + 39 + #[derive(Debug, Clone, PartialEq)] 40 + pub enum ThingKind { 41 + ChildNode, 42 + Record(crate::Rkey), 43 + }
+17 -30
src/mst.rs
··· 3 3 //! The primary aim is to work through the **tree** structure. Non-node blocks 4 4 //! are left as raw bytes, for upper levels to parse into DAG-CBOR or whatever. 5 5 6 + use crate::link::{NodeThing, ObjectLink, ThingKind}; 6 7 use cid::Cid; 7 8 use serde::Deserialize; 9 + use serde::de::{self, Deserializer, MapAccess, Unexpected, Visitor}; 8 10 use sha2::{Digest, Sha256}; 11 + use std::fmt; 12 + 13 + pub type Depth = u32; 9 14 10 15 /// The top-level data object in a repository's tree is a signed commit. 11 16 #[derive(Debug, Deserialize)] ··· 17 22 /// fixed value of 3 for this repo format version 18 23 pub version: u64, 19 24 /// pointer to the top of the repo contents tree structure (MST) 20 - pub data: Cid, 25 + pub data: ObjectLink, 21 26 /// revision of the repo, used as a logical clock. 22 27 /// 23 28 /// TID format. Must increase monotonically. Recommend using current ··· 31 36 /// exist in the CBOR object, but is virtually always null. NOTE: previously 32 37 /// specified as nullable and optional, but this caused interoperability 33 38 /// issues. 
34 - pub prev: Option<Cid>, 39 + pub prev: Option<ObjectLink>, 35 40 /// cryptographic signature of this commit, as raw bytes 36 - #[serde(with = "serde_bytes")] 37 41 pub sig: serde_bytes::ByteBuf, 38 42 } 39 43 40 - use serde::de::{self, Deserializer, MapAccess, Unexpected, Visitor}; 41 - use std::fmt; 42 - 43 - pub type Depth = u32; 44 - 45 44 #[inline(always)] 46 45 pub fn atproto_mst_depth(key: &str) -> Depth { 47 46 // 128 bits oughta be enough: https://bsky.app/profile/retr0.id/post/3jwwbf4izps24 48 47 u128::from_be_bytes(Sha256::digest(key).split_at(16).0.try_into().unwrap()).leading_zeros() / 2 49 48 } 50 49 51 - #[derive(Debug)] 50 + #[derive(Debug, Clone)] 52 51 pub(crate) struct MstNode { 53 52 pub depth: Option<Depth>, // known for nodes with entries (required for root) 54 53 pub things: Vec<NodeThing>, 55 54 } 56 55 57 - #[derive(Debug)] 58 - pub(crate) struct NodeThing { 59 - pub(crate) cid: Cid, 60 - pub(crate) kind: ThingKind, 61 - } 62 - 63 - #[derive(Debug)] 64 - pub(crate) enum ThingKind { 65 - Tree, 66 - Value { rkey: String }, 67 - } 68 - 69 56 impl<'de> Deserialize<'de> for MstNode { 70 57 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> 71 58 where ··· 96 83 return Err(de::Error::duplicate_field("l")); 97 84 } 98 85 found_left = true; 99 - if let Some(cid) = map.next_value()? { 86 + if let Some(link) = map.next_value()? 
{ 100 87 left = Some(NodeThing { 101 - cid, 102 - kind: ThingKind::Tree, 88 + link, 89 + kind: ThingKind::ChildNode, 103 90 }); 104 91 } 105 92 } ··· 142 129 } 143 130 144 131 things.push(NodeThing { 145 - cid: entry.value, 146 - kind: ThingKind::Value { rkey: rkey_s }, 132 + link: entry.value.into(), 133 + kind: ThingKind::Record(rkey_s), 147 134 }); 148 135 149 - if let Some(cid) = entry.tree { 136 + if let Some(link) = entry.tree { 150 137 things.push(NodeThing { 151 - cid, 152 - kind: ThingKind::Tree, 138 + link, 139 + kind: ThingKind::ChildNode, 153 140 }); 154 141 } 155 142 ··· 229 216 /// the lower level must have keys sorting after this TreeEntry's key (to 230 217 /// the "right"), but before the next TreeEntry's key in this Node (if any) 231 218 #[serde(rename = "t")] 232 - pub tree: Option<Cid>, 219 + pub tree: Option<ObjectLink>, 233 220 }
+224 -32
src/walk.rs
··· 1 1 //! Depth-first MST traversal 2 2 3 - use crate::mst::{Depth, MstNode, NodeThing, ThingKind}; 4 - use crate::{Bytes, HashMap, disk::DiskStore, drive::MaybeProcessedBlock}; 3 + use crate::link::{NodeThing, ObjectLink, ThingKind}; 4 + use crate::mst::{Depth, MstNode}; 5 + use crate::{Bytes, HashMap, Rkey, disk::DiskStore, drive::MaybeProcessedBlock, noop}; 5 6 use cid::Cid; 6 7 use std::convert::Infallible; 7 8 ··· 16 17 MstError(#[from] MstError), 17 18 #[error("storage error: {0}")] 18 19 StorageError(#[from] fjall::Error), 19 - #[error("block not found: {0}")] 20 - MissingBlock(Cid), 20 + #[error("block not found: {0:?}")] 21 + MissingBlock(Box<NodeThing>), 21 22 } 22 23 23 24 /// Errors from invalid Rkeys ··· 30 31 #[error("MST depth underflow: depth-0 node with child trees")] 31 32 DepthUnderflow, 32 33 #[error("Encountered rkey {rkey:?} which cannot follow the previous: {prev:?}")] 33 - RkeyOutOfOrder { prev: String, rkey: String }, 34 + RkeyOutOfOrder { prev: Rkey, rkey: Rkey }, 34 35 } 35 36 36 37 /// Walker outputs 38 + /// 39 + /// TODO: rename to "Record" or "Entry" or something 37 40 #[derive(Debug, PartialEq)] 38 - pub struct Output { 39 - pub rkey: String, 41 + pub struct Output<T = Bytes> { 42 + pub rkey: Rkey, // TODO: aaa it's not really rkey, it's just "key" (or split to collection/rkey??) 
40 43 pub cid: Cid, 41 - pub data: Bytes, 44 + pub data: T, 45 + } 46 + 47 + #[derive(Debug, PartialEq)] 48 + pub enum Step<T = Output> { 49 + Value(T), 50 + End(Option<Rkey>), 42 51 } 43 52 44 53 /// Traverser of an atproto MST 45 54 /// 46 55 /// Walks the tree from left-to-right in depth-first order 47 - #[derive(Debug)] 56 + #[derive(Debug, Clone)] 48 57 pub struct Walker { 49 - prev_rkey: String, 58 + prev_rkey: Rkey, 50 59 root_depth: Depth, 51 60 todo: Vec<Vec<NodeThing>>, 52 61 } ··· 60 69 } 61 70 } 62 71 72 + pub fn viz( 73 + &self, 74 + blocks: &HashMap<ObjectLink, MaybeProcessedBlock>, 75 + root_link: ObjectLink, 76 + ) -> Result<(), WalkError> { 77 + let root_block = blocks.get(&root_link).ok_or(WalkError::MissingBlock( 78 + NodeThing { 79 + link: root_link.clone(), 80 + kind: ThingKind::ChildNode, 81 + } 82 + .into(), 83 + ))?; 84 + 85 + let root_node: MstNode = match root_block { 86 + MaybeProcessedBlock::Processed(_) => return Err(WalkError::BadCommitFingerprint), 87 + MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(bytes)?, 88 + }; 89 + 90 + let mut positions = HashMap::new(); 91 + let mut w = Walker::new(root_node.clone()); 92 + 93 + let mut pos_idx = 0; 94 + while let Step::Value(Output { rkey, .. }) = w.step_sparse(blocks, noop)? 
{ 95 + positions.insert(rkey, pos_idx); 96 + pos_idx += 1; 97 + } 98 + 99 + Self::vnext( 100 + root_node.depth.unwrap(), 101 + vec![root_link], 102 + blocks, 103 + &positions, 104 + )?; 105 + 106 + Ok(()) 107 + } 108 + 109 + pub fn vnext( 110 + level: u32, 111 + links: Vec<ObjectLink>, 112 + blocks: &HashMap<ObjectLink, MaybeProcessedBlock>, 113 + positions: &HashMap<Rkey, usize>, 114 + ) -> Result<Vec<usize>, WalkError> { 115 + let mut offsets = Vec::new(); 116 + let mut level_keys = Vec::new(); 117 + let mut child_links = Vec::new(); 118 + 119 + for link in links { 120 + println!( 121 + "\n{level}~{}..", 122 + link.to_bytes() 123 + .iter() 124 + .take(5) 125 + .map(|c| format!("{c:02x}")) 126 + .collect::<Vec<_>>() 127 + .join("") 128 + ); 129 + 130 + let Some(mpb) = blocks.get(&link) else { 131 + // TODO: drop an 'x' for missing node 132 + continue; 133 + }; 134 + let node: MstNode = match mpb { 135 + MaybeProcessedBlock::Processed(_) => return Err(WalkError::BadCommitFingerprint), 136 + MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(bytes)?, 137 + }; 138 + 139 + let mut last_key = "".to_string(); 140 + let mut last_was_record = true; 141 + for thing in node.things { 142 + let mut node_keys = Vec::new(); 143 + 144 + let has = blocks.contains_key(&thing.link); 145 + 146 + match thing.kind { 147 + ThingKind::ChildNode => { 148 + if has { 149 + child_links.push(thing.link); 150 + last_was_record = false; 151 + } 152 + } 153 + ThingKind::Record(key) => { 154 + let us = positions[&key]; 155 + 156 + if !last_was_record && last_key.is_empty() { 157 + let them = positions[&last_key]; 158 + for i in 0..(them - 1) { 159 + if i < (us + 1) { 160 + print!(" "); 161 + } else { 162 + print!("~~"); 163 + } 164 + } 165 + println!("~"); 166 + } 167 + 168 + for _ in 0..us { 169 + print!(" "); 170 + } 171 + if has { 172 + print!("O"); 173 + } else { 174 + print!("x"); 175 + } 176 + println!(" {key}"); 177 + node_keys.push(key.clone()); 178 + last_key = key; 179 + 
last_was_record = true; 180 + } 181 + } 182 + level_keys.push(node_keys); 183 + } 184 + 185 + offsets.push(1); 186 + } 187 + 188 + if !child_links.is_empty() { 189 + Self::vnext(level - 1, child_links, blocks, positions)?; // TODO use offsets 190 + } 191 + 192 + Ok(offsets) 193 + } 194 + 63 195 fn mpb_step( 64 196 &mut self, 65 - kind: ThingKind, 66 - cid: Cid, 197 + thing: NodeThing, 67 198 mpb: &MaybeProcessedBlock, 68 199 process: impl Fn(Bytes) -> Bytes, 69 200 ) -> Result<Option<Output>, WalkError> { 70 - match kind { 71 - ThingKind::Value { rkey } => { 201 + match thing.kind { 202 + ThingKind::Record(rkey) => { 72 203 let data = match mpb { 73 204 MaybeProcessedBlock::Raw(data) => process(data.clone()), 74 205 MaybeProcessedBlock::Processed(t) => t.clone(), ··· 83 214 self.prev_rkey = rkey.clone(); 84 215 85 216 log::trace!("val @ {rkey}"); 86 - Ok(Some(Output { rkey, cid, data })) 217 + Ok(Some(Output { 218 + rkey, 219 + cid: thing.link.into(), 220 + data, 221 + })) 87 222 } 88 - ThingKind::Tree => { 223 + ThingKind::ChildNode => { 89 224 let MaybeProcessedBlock::Raw(data) = mpb else { 90 225 return Err(WalkError::BadCommitFingerprint); 91 226 }; ··· 132 267 /// Advance through nodes until we find a record or can't go further 133 268 pub fn step( 134 269 &mut self, 135 - blocks: &mut HashMap<Cid, MaybeProcessedBlock>, 270 + blocks: &HashMap<ObjectLink, MaybeProcessedBlock>, 136 271 process: impl Fn(Bytes) -> Bytes, 137 - ) -> Result<Option<Output>, WalkError> { 138 - while let Some(NodeThing { cid, kind }) = self.next_todo() { 139 - let Some(mpb) = blocks.get(&cid) else { 140 - return Err(WalkError::MissingBlock(cid)); 272 + ) -> Result<Step, WalkError> { 273 + while let Some(NodeThing { link, kind }) = self.next_todo() { 274 + let Some(mpb) = blocks.get(&link) else { 275 + return Err(WalkError::MissingBlock(NodeThing { link, kind }.into())); 141 276 }; 142 - if let Some(out) = self.mpb_step(kind, cid, mpb, &process)? 
{ 143 - return Ok(Some(out)); 277 + if let Some(out) = self.mpb_step(NodeThing { link, kind }, mpb, &process)? { 278 + return Ok(Step::Value(out)); 144 279 } 145 280 } 146 - Ok(None) 281 + Ok(Step::End(None)) 282 + } 283 + 284 + /// Advance through nodes, allowing for missing records 285 + pub fn step_sparse( 286 + &mut self, 287 + blocks: &HashMap<ObjectLink, MaybeProcessedBlock>, 288 + process: impl Fn(Bytes) -> Bytes, 289 + ) -> Result<Step<Output<Option<Bytes>>>, WalkError> { 290 + while let Some(NodeThing { link, kind }) = self.next_todo() { 291 + let mut dummy = false; 292 + let mpb = match blocks.get(&link) { 293 + Some(mpb) => mpb, 294 + None => { 295 + if let ThingKind::Record(_) = kind { 296 + dummy = true; 297 + &MaybeProcessedBlock::Processed(vec![]) 298 + } else { 299 + continue; 300 + } 301 + } 302 + }; 303 + if let Some(out) = self.mpb_step(NodeThing { link, kind }, mpb, |bytes| { 304 + if dummy { bytes } else { process(bytes) } 305 + })? { 306 + // eprintln!(" ----- {}", out.rkey); 307 + return Ok(Step::Value(Output { 308 + cid: out.cid, 309 + rkey: out.rkey, 310 + data: if dummy { None } else { Some(out.data) }, 311 + })); 312 + } 313 + } 314 + Ok(Step::End(None)) 315 + } 316 + 317 + pub fn step_to_edge( 318 + &mut self, 319 + blocks: &HashMap<ObjectLink, MaybeProcessedBlock>, 320 + ) -> Result<Option<Rkey>, WalkError> { 321 + let mut ant = self.clone(); 322 + let mut rkey_prev = None; 323 + loop { 324 + match ant.step(blocks, noop) { 325 + Err(WalkError::MissingBlock(thing)) => { 326 + if let ThingKind::Record(rkey) = thing.kind { 327 + rkey_prev = Some(rkey); 328 + } 329 + *self = ant; 330 + ant = self.clone(); 331 + } 332 + Err(anyother) => return Err(anyother), 333 + Ok(z) => { 334 + eprintln!("apparently we are too far at {z:?}"); 335 + return Ok(rkey_prev); // oop real record, mutant went too far 336 + } 337 + } 338 + } 147 339 } 148 340 149 341 /// blocking!!!!!! 
150 342 pub fn disk_step( 151 343 &mut self, 152 - blocks: &mut DiskStore, 344 + blocks: &DiskStore, 153 345 process: impl Fn(Bytes) -> Bytes, 154 - ) -> Result<Option<Output>, WalkError> { 155 - while let Some(NodeThing { cid, kind }) = self.next_todo() { 156 - let Some(block_slice) = blocks.get(&cid.to_bytes())? else { 157 - return Err(WalkError::MissingBlock(cid)); 346 + ) -> Result<Step, WalkError> { 347 + while let Some(NodeThing { link, kind }) = self.next_todo() { 348 + let Some(block_slice) = blocks.get(&link.to_bytes())? else { 349 + return Err(WalkError::MissingBlock(NodeThing { link, kind }.into())); 158 350 }; 159 351 let mpb = MaybeProcessedBlock::from_bytes(block_slice.to_vec()); 160 - if let Some(out) = self.mpb_step(kind, cid, &mpb, &process)? { 161 - return Ok(Some(out)); 352 + if let Some(out) = self.mpb_step(NodeThing { link, kind }, &mpb, &process)? { 353 + return Ok(Step::Value(out)); 162 354 } 163 355 } 164 - Ok(None) 356 + Ok(Step::End(None)) 165 357 } 166 358 }
+124
tests/car-slices.rs
··· 1 + extern crate repo_stream; 2 + use repo_stream::{Driver, Output, Step}; 3 + 4 + const RECORD_SLICE: &'static [u8] = include_bytes!("../car-samples/slice-one.car"); 5 + const RECORD_NODE_FIRST_KEY: &'static [u8] = 6 + include_bytes!("../car-samples/slice-node-first-key.car"); 7 + const RECORD_NODE_AFTER: &'static [u8] = include_bytes!("../car-samples/slice-node-after.car"); 8 + const RECORD_NODE_ABSENT: &'static [u8] = 9 + include_bytes!("../car-samples/slice-proving-absence.car"); 10 + 11 + async fn test_car_slice( 12 + bytes: &[u8], 13 + expected_records: usize, 14 + expected_sum: usize, 15 + expect_preceeding: Option<&str>, 16 + expect_rkey: Option<&str>, 17 + expect_proceeding: Option<&str>, 18 + ) { 19 + let (mut driver, before) = match Driver::load_car( 20 + bytes, 21 + |block| block.len().to_ne_bytes().to_vec(), 22 + 10, /* MiB */ 23 + ) 24 + .await 25 + .unwrap() 26 + { 27 + Driver::Memory(_commit, before, mem_driver) => (mem_driver, before), 28 + Driver::Disk(_) => panic!("too big"), 29 + }; 30 + 31 + assert_eq!(before.as_deref(), expect_preceeding); 32 + 33 + let mut found_records = 0; 34 + let mut sum = 0; 35 + let mut found_expected_rkey = false; 36 + let mut prev_rkey = "".to_string(); 37 + 38 + while let Ok(step) = driver.next_chunk(256).await { 39 + match step { 40 + Step::Value(records) => { 41 + for Output { rkey, cid: _, data } in records { 42 + found_records += 1; 43 + 44 + let (int_bytes, _) = data.split_at(size_of::<usize>()); 45 + let size = usize::from_ne_bytes(int_bytes.try_into().unwrap()); 46 + 47 + sum += size; 48 + if Some(rkey.as_str()) == expect_rkey { 49 + found_expected_rkey = true; 50 + } 51 + eprintln!("!!!! 
{rkey}"); 52 + assert!(rkey > prev_rkey, "rkeys are streamed in order"); 53 + prev_rkey = rkey; 54 + } 55 + } 56 + Step::End(proceeding) => { 57 + assert_eq!(proceeding.as_deref(), expect_proceeding); 58 + break; 59 + } 60 + } 61 + } 62 + 63 + assert_eq!(found_records, expected_records); 64 + if expected_records > 0 { 65 + assert!(found_expected_rkey); 66 + assert_eq!(sum, expected_sum); 67 + } else { 68 + assert!(!found_expected_rkey); 69 + } 70 + } 71 + 72 + #[tokio::test] 73 + async fn test_record_slice_car() { 74 + test_car_slice( 75 + RECORD_SLICE, 76 + 1, 77 + 212, 78 + Some("app.bsky.feed.like/3mcfzfbpaml27"), 79 + Some("app.bsky.feed.like/3mcg72x6bi32z"), 80 + Some("app.bsky.feed.like/3mcga2o2efq27"), 81 + ) 82 + .await 83 + } 84 + 85 + #[tokio::test] 86 + async fn test_record_slice_node_first_key() { 87 + test_car_slice( 88 + RECORD_NODE_FIRST_KEY, 89 + 1, 90 + 212, 91 + None, 92 + Some("app.bsky.feed.like/3lohfzs6qea24"), 93 + Some("app.bsky.feed.post/3m72vlnelw227"), 94 + ) 95 + .await 96 + } 97 + 98 + #[tokio::test] 99 + async fn test_record_slice_node_after() { 100 + test_car_slice( 101 + RECORD_NODE_AFTER, 102 + 1, 103 + 212, 104 + Some("app.bsky.feed.like/3mbzi6ttskp2c"), 105 + Some("app.bsky.feed.like/3mcqqwzsc7x26"), 106 + Some("app.bsky.feed.post/3lbn6of6qxc2a"), 107 + ) 108 + .await 109 + } 110 + 111 + #[tokio::test] 112 + async fn test_record_slice_proving_absence() { 113 + // missing key is `app.bsky.feed.like/3lohfzs6qea23` 114 + // NOTE: repo-stream output here isn't enough info for proof 115 + test_car_slice( 116 + RECORD_NODE_ABSENT, 117 + 0, 118 + 0, 119 + Some("app.bsky.feed.post/3m72vlnelw227"), 120 + None, 121 + None, 122 + ) 123 + .await 124 + }
+4 -5
tests/non-huge-cars.rs
··· 1 1 extern crate repo_stream; 2 - use repo_stream::Driver; 3 - use repo_stream::Output; 2 + use repo_stream::{Driver, Output, Step}; 4 3 5 4 const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car"); 6 5 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); ··· 15 14 ) { 16 15 let mut driver = match Driver::load_car( 17 16 bytes, 18 - |block| block.len().to_ne_bytes().to_vec().into(), 17 + |block| block.len().to_ne_bytes().to_vec(), 19 18 10, /* MiB */ 20 19 ) 21 20 .await 22 21 .unwrap() 23 22 { 24 - Driver::Memory(_commit, mem_driver) => mem_driver, 23 + Driver::Memory(_commit, _, mem_driver) => mem_driver, 25 24 Driver::Disk(_) => panic!("too big"), 26 25 }; 27 26 ··· 30 29 let mut found_bsky_profile = false; 31 30 let mut prev_rkey = "".to_string(); 32 31 33 - while let Some(pairs) = driver.next_chunk(256).await.unwrap() { 32 + while let Step::Value(pairs) = driver.next_chunk(256).await.unwrap() { 34 33 for Output { rkey, cid: _, data } in pairs { 35 34 records += 1; 36 35