Fast and robust atproto CAR file processing in rust

Compare changes

Choose any two refs to compare.

Changed files
+234 -2764
benches
car-samples
examples
disk-read-file
src
tests
+133 -1186
Cargo.lock
··· 3 3 version = 4 4 4 5 5 [[package]] 6 - name = "addr2line" 7 - version = "0.25.1" 8 - source = "registry+https://github.com/rust-lang/crates.io-index" 9 - checksum = "1b5d307320b3181d6d7954e663bd7c774a838b8220fe0593c86d9fb09f498b4b" 10 - dependencies = [ 11 - "gimli", 12 - ] 13 - 14 - [[package]] 15 - name = "adler2" 16 - version = "2.0.1" 17 - source = "registry+https://github.com/rust-lang/crates.io-index" 18 - checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" 19 - 20 - [[package]] 21 - name = "aho-corasick" 22 - version = "1.1.3" 23 - source = "registry+https://github.com/rust-lang/crates.io-index" 24 - checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916" 25 - dependencies = [ 26 - "memchr", 27 - ] 28 - 29 - [[package]] 30 - name = "anes" 31 - version = "0.1.6" 32 - source = "registry+https://github.com/rust-lang/crates.io-index" 33 - checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" 34 - 35 - [[package]] 36 6 name = "anstream" 37 7 version = "0.6.21" 38 8 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 68 38 source = "registry+https://github.com/rust-lang/crates.io-index" 69 39 checksum = "9e231f6134f61b71076a3eab506c379d4f36122f2af15a9ff04415ea4c3339e2" 70 40 dependencies = [ 71 - "windows-sys 0.60.2", 41 + "windows-sys", 72 42 ] 73 43 74 44 [[package]] ··· 79 49 dependencies = [ 80 50 "anstyle", 81 51 "once_cell_polyfill", 82 - "windows-sys 0.60.2", 83 - ] 84 - 85 - [[package]] 86 - name = "anyhow" 87 - version = "1.0.100" 88 - source = "registry+https://github.com/rust-lang/crates.io-index" 89 - checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" 90 - 91 - [[package]] 92 - name = "autocfg" 93 - version = "1.5.0" 94 - source = "registry+https://github.com/rust-lang/crates.io-index" 95 - checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" 96 - 97 - [[package]] 98 - name = "backtrace" 99 - version = "0.3.76" 100 - source = "registry+https://github.com/rust-lang/crates.io-index" 101 - checksum = "bb531853791a215d7c62a30daf0dde835f381ab5de4589cfe7c649d2cbe92bd6" 102 - dependencies = [ 103 - "addr2line", 104 - "cfg-if", 105 - "libc", 106 - "miniz_oxide", 107 - "object", 108 - "rustc-demangle", 109 - "windows-link", 110 - ] 111 - 112 - [[package]] 113 - name = "base-x" 114 - version = "0.2.11" 115 - source = "registry+https://github.com/rust-lang/crates.io-index" 116 - checksum = "4cbbc9d0964165b47557570cce6c952866c2678457aca742aafc9fb771d30270" 117 - 118 - [[package]] 119 - name = "base256emoji" 120 - version = "1.0.2" 121 - source = "registry+https://github.com/rust-lang/crates.io-index" 122 - checksum = "b5e9430d9a245a77c92176e649af6e275f20839a48389859d1661e9a128d077c" 123 - dependencies = [ 124 - "const-str", 125 - "match-lookup", 126 - ] 127 - 128 - [[package]] 129 - name = "bincode" 130 - version = "2.0.1" 131 - source = "registry+https://github.com/rust-lang/crates.io-index" 132 - checksum = "36eaf5d7b090263e8150820482d5d93cd964a81e4019913c972f4edcc6edb740" 133 - dependencies = [ 134 - "bincode_derive", 135 - "serde", 136 - "unty", 137 - ] 138 - 139 - [[package]] 140 - name = "bincode_derive" 141 - version = "2.0.1" 142 - source = "registry+https://github.com/rust-lang/crates.io-index" 143 - checksum = "bf95709a440f45e986983918d0e8a1f30a9b1df04918fc828670606804ac3c09" 144 - dependencies = [ 145 - "virtue", 52 + "windows-sys", 146 53 ] 147 54 148 55 [[package]] ··· 152 59 checksum = "2261d10cca569e4643e526d8dc2e62e433cc8aba21ab764233731f8d369bf394" 153 60 154 61 [[package]] 155 - name = "block-buffer" 156 - version = "0.10.4" 62 + name = "byteorder-lite" 63 + version = "0.1.0" 157 64 source = "registry+https://github.com/rust-lang/crates.io-index" 158 - checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" 159 - dependencies = [ 160 - "generic-array", 161 - ] 162 - 163 - [[package]] 164 - name = "bumpalo" 165 - version = "3.19.0" 166 - source = "registry+https://github.com/rust-lang/crates.io-index" 167 - checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" 168 - 169 - [[package]] 170 - name = "bytes" 171 - version = "1.10.1" 172 - source = "registry+https://github.com/rust-lang/crates.io-index" 173 - checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" 174 - 175 - [[package]] 176 - name = "cast" 177 - version = "0.3.0" 178 - source = "registry+https://github.com/rust-lang/crates.io-index" 179 - checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" 65 + checksum = "8f1fe948ff07f4bd06c30984e69f5b4899c516a3ef74f34df92a2df2ab535495" 180 66 181 67 [[package]] 182 - name = "cbor4ii" 183 - version = "0.2.14" 68 + name = "byteview" 69 + version = "0.10.0" 184 70 source = "registry+https://github.com/rust-lang/crates.io-index" 185 - checksum = "b544cf8c89359205f4f990d0e6f3828db42df85b5dac95d09157a250eb0749c4" 186 - dependencies = [ 187 - "serde", 188 - ] 71 + checksum = "dda4398f387cc6395a3e93b3867cd9abda914c97a0b344d1eefb2e5c51785fca" 189 72 190 73 [[package]] 191 74 name = "cfg-if" ··· 194 77 checksum = "2fd1289c04a9ea8cb22300a459a72a385d7c73d3259e2ed7dcb2af674838cfa9" 195 78 196 79 [[package]] 197 - name = "ciborium" 198 - version = "0.2.2" 199 - source = "registry+https://github.com/rust-lang/crates.io-index" 200 - checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e" 201 - dependencies = [ 202 - "ciborium-io", 203 - "ciborium-ll", 204 - "serde", 205 - ] 206 - 207 - [[package]] 208 - name = "ciborium-io" 209 - version = "0.2.2" 210 - source = "registry+https://github.com/rust-lang/crates.io-index" 211 - checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757" 212 - 213 - [[package]] 214 - name = "ciborium-ll" 215 - version = "0.2.2" 216 - source = "registry+https://github.com/rust-lang/crates.io-index" 217 - checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9" 218 - dependencies = [ 219 - "ciborium-io", 220 - "half", 221 - ] 222 - 223 - [[package]] 224 - name = "cid" 225 - version = "0.11.1" 226 - source = "registry+https://github.com/rust-lang/crates.io-index" 227 - checksum = "3147d8272e8fa0ccd29ce51194dd98f79ddfb8191ba9e3409884e751798acf3a" 228 - dependencies = [ 229 - "core2", 230 - "multibase", 231 - "multihash", 232 - "serde", 233 - "serde_bytes", 234 - "unsigned-varint 0.8.0", 235 - ] 236 - 237 - [[package]] 238 80 name = "clap" 239 81 version = "4.5.48" 240 82 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 265 107 "heck", 266 108 "proc-macro2", 267 109 "quote", 268 - "syn 2.0.106", 110 + "syn", 269 111 ] 270 112 271 113 [[package]] ··· 281 123 checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" 282 124 283 125 [[package]] 284 - name = "const-str" 285 - version = "0.4.3" 286 - source = "registry+https://github.com/rust-lang/crates.io-index" 287 - checksum = "2f421161cb492475f1661ddc9815a745a1c894592070661180fdec3d4872e9c3" 288 - 289 - [[package]] 290 - name = "core2" 291 - version = "0.4.0" 126 + name = "compare" 127 + version = "0.0.6" 292 128 source = "registry+https://github.com/rust-lang/crates.io-index" 293 - checksum = "b49ba7ef1ad6107f8824dbe97de947cbaac53c44e7f9756a1fba0d37c1eec505" 294 - dependencies = [ 295 - "memchr", 296 - ] 129 + checksum = "ea0095f6103c2a8b44acd6fd15960c801dafebf02e21940360833e0673f48ba7" 297 130 298 131 [[package]] 299 - name = "cpufeatures" 300 - version = "0.2.17" 132 + name = "crossbeam-epoch" 133 + version = "0.9.18" 301 134 source = "registry+https://github.com/rust-lang/crates.io-index" 302 - checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" 135 + checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" 303 136 dependencies = [ 304 - "libc", 137 + "crossbeam-utils", 305 138 ] 306 139 307 140 [[package]] 308 - name = "criterion" 309 - version = "0.7.0" 141 + name = "crossbeam-skiplist" 142 + version = "0.1.3" 310 143 source = "registry+https://github.com/rust-lang/crates.io-index" 311 - checksum = "e1c047a62b0cc3e145fa84415a3191f628e980b194c2755aa12300a4e6cbd928" 312 - dependencies = [ 313 - "anes", 314 - "cast", 315 - "ciborium", 316 - "clap", 317 - "criterion-plot", 318 - "itertools", 319 - "num-traits", 320 - "oorandom", 321 - "plotters", 322 - "rayon", 323 - "regex", 324 - "serde", 325 - "serde_json", 326 - "tinytemplate", 327 - "tokio", 328 - "walkdir", 329 - ] 330 - 331 - [[package]] 332 - name = "criterion-plot" 333 - version = "0.6.0" 334 - source = "registry+https://github.com/rust-lang/crates.io-index" 335 - checksum = "9b1bcc0dc7dfae599d84ad0b1a55f80cde8af3725da8313b528da95ef783e338" 336 - dependencies = [ 337 - "cast", 338 - "itertools", 339 - ] 340 - 341 - [[package]] 342 - name = "crossbeam-deque" 343 - version = "0.8.6" 344 - source = "registry+https://github.com/rust-lang/crates.io-index" 345 - checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51" 144 + checksum = "df29de440c58ca2cc6e587ec3d22347551a32435fbde9d2bff64e78a9ffa151b" 346 145 dependencies = [ 347 146 "crossbeam-epoch", 348 - "crossbeam-utils", 349 - ] 350 - 351 - [[package]] 352 - name = "crossbeam-epoch" 353 - version = "0.9.18" 354 - source = "registry+https://github.com/rust-lang/crates.io-index" 355 - checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" 356 - dependencies = [ 357 147 "crossbeam-utils", 358 148 ] 359 149 ··· 364 154 checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" 365 155 366 156 [[package]] 367 - name = "crunchy" 368 - version = "0.2.4" 369 - source = "registry+https://github.com/rust-lang/crates.io-index" 370 - checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" 371 - 372 - [[package]] 373 - name = "crypto-common" 374 - version = "0.1.6" 375 - source = "registry+https://github.com/rust-lang/crates.io-index" 376 - checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" 377 - dependencies = [ 378 - "generic-array", 379 - "typenum", 380 - ] 381 - 382 - [[package]] 383 - name = "data-encoding" 384 - version = "2.9.0" 385 - source = "registry+https://github.com/rust-lang/crates.io-index" 386 - checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476" 387 - 388 - [[package]] 389 - name = "data-encoding-macro" 390 - version = "0.1.18" 391 - source = "registry+https://github.com/rust-lang/crates.io-index" 392 - checksum = "47ce6c96ea0102f01122a185683611bd5ac8d99e62bc59dd12e6bda344ee673d" 393 - dependencies = [ 394 - "data-encoding", 395 - "data-encoding-macro-internal", 396 - ] 397 - 398 - [[package]] 399 - name = "data-encoding-macro-internal" 400 - version = "0.1.16" 401 - source = "registry+https://github.com/rust-lang/crates.io-index" 402 - checksum = "8d162beedaa69905488a8da94f5ac3edb4dd4788b732fadb7bd120b2625c1976" 403 - dependencies = [ 404 - "data-encoding", 405 - "syn 2.0.106", 406 - ] 407 - 408 - [[package]] 409 - name = "digest" 410 - version = "0.10.7" 157 + name = "dashmap" 158 + version = "6.1.0" 411 159 source = "registry+https://github.com/rust-lang/crates.io-index" 412 - checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" 160 + checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" 413 161 dependencies = [ 414 - "block-buffer", 415 - "crypto-common", 162 + "cfg-if", 163 + "crossbeam-utils", 164 + "hashbrown 0.14.5", 165 + "lock_api", 166 + "once_cell", 167 + "parking_lot_core", 416 168 ] 417 169 418 170 [[package]] 419 - name = "either" 420 - version = "1.15.0" 171 + name = "enum_dispatch" 172 + version = "0.3.13" 421 173 source = "registry+https://github.com/rust-lang/crates.io-index" 422 - checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" 423 - 424 - [[package]] 425 - name = "env_filter" 426 - version = "0.1.3" 427 - source = "registry+https://github.com/rust-lang/crates.io-index" 428 - checksum = "186e05a59d4c50738528153b83b0b0194d3a29507dfec16eccd4b342903397d0" 174 + checksum = "aa18ce2bc66555b3218614519ac839ddb759a7d6720732f979ef8d13be147ecd" 429 175 dependencies = [ 430 - "log", 431 - "regex", 176 + "once_cell", 177 + "proc-macro2", 178 + "quote", 179 + "syn", 432 180 ] 433 181 434 182 [[package]] 435 - name = "env_logger" 436 - version = "0.11.8" 183 + name = "equivalent" 184 + version = "1.0.2" 437 185 source = "registry+https://github.com/rust-lang/crates.io-index" 438 - checksum = "13c863f0904021b108aa8b2f55046443e6b1ebde8fd4a15c399893aae4fa069f" 439 - dependencies = [ 440 - "anstream", 441 - "anstyle", 442 - "env_filter", 443 - "jiff", 444 - "log", 445 - ] 186 + checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" 446 187 447 188 [[package]] 448 189 name = "errno" ··· 451 192 checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" 452 193 dependencies = [ 453 194 "libc", 454 - "windows-sys 0.60.2", 195 + "windows-sys", 455 196 ] 456 197 457 198 [[package]] 458 - name = "fallible-iterator" 459 - version = "0.3.0" 460 - source = "registry+https://github.com/rust-lang/crates.io-index" 461 - checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" 462 - 463 - [[package]] 464 - name = "fallible-streaming-iterator" 465 - version = "0.1.9" 466 - source = "registry+https://github.com/rust-lang/crates.io-index" 467 - checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" 468 - 469 - [[package]] 470 199 name = "fastrand" 471 200 version = "2.3.0" 472 201 source = "registry+https://github.com/rust-lang/crates.io-index" 473 202 checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" 474 203 475 204 [[package]] 476 - name = "foldhash" 477 - version = "0.1.5" 205 + name = "fjall" 206 + version = "3.0.1" 478 207 source = "registry+https://github.com/rust-lang/crates.io-index" 479 - checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" 480 - 481 - [[package]] 482 - name = "futures" 483 - version = "0.3.31" 484 - source = "registry+https://github.com/rust-lang/crates.io-index" 485 - checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" 208 + checksum = "4f69637c02d38ad1b0f003101d0195a60368130aa17d9ef78b1557d265a22093" 486 209 dependencies = [ 487 - "futures-channel", 488 - "futures-core", 489 - "futures-executor", 490 - "futures-io", 491 - "futures-sink", 492 - "futures-task", 493 - "futures-util", 210 + "byteorder-lite", 211 + "byteview", 212 + "dashmap", 213 + "flume", 214 + "log", 215 + "lsm-tree", 216 + "lz4_flex", 217 + "tempfile", 218 + "xxhash-rust", 494 219 ] 495 220 496 221 [[package]] 497 - name = "futures-channel" 498 - version = "0.3.31" 222 + name = "flume" 223 + version = "0.12.0" 499 224 source = "registry+https://github.com/rust-lang/crates.io-index" 500 - checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" 225 + checksum = "5e139bc46ca777eb5efaf62df0ab8cc5fd400866427e56c68b22e414e53bd3be" 501 226 dependencies = [ 502 - "futures-core", 503 - "futures-sink", 504 - ] 505 - 506 - [[package]] 507 - name = "futures-core" 508 - version = "0.3.31" 509 - source = "registry+https://github.com/rust-lang/crates.io-index" 510 - checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" 511 - 512 - [[package]] 513 - name = "futures-executor" 514 - version = "0.3.31" 515 - source = "registry+https://github.com/rust-lang/crates.io-index" 516 - checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" 517 - dependencies = [ 518 - "futures-core", 519 - "futures-task", 520 - "futures-util", 521 - ] 522 - 523 - [[package]] 524 - name = "futures-io" 525 - version = "0.3.31" 526 - source = "registry+https://github.com/rust-lang/crates.io-index" 527 - checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" 528 - 529 - [[package]] 530 - name = "futures-macro" 531 - version = "0.3.31" 532 - source = "registry+https://github.com/rust-lang/crates.io-index" 533 - checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" 534 - dependencies = [ 535 - "proc-macro2", 536 - "quote", 537 - "syn 2.0.106", 538 - ] 539 - 540 - [[package]] 541 - name = "futures-sink" 542 - version = "0.3.31" 543 - source = "registry+https://github.com/rust-lang/crates.io-index" 544 - checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" 545 - 546 - [[package]] 547 - name = "futures-task" 548 - version = "0.3.31" 549 - source = "registry+https://github.com/rust-lang/crates.io-index" 550 - checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" 551 - 552 - [[package]] 553 - name = "futures-util" 554 - version = "0.3.31" 555 - source = "registry+https://github.com/rust-lang/crates.io-index" 556 - checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" 557 - dependencies = [ 558 - "futures-channel", 559 - "futures-core", 560 - "futures-io", 561 - "futures-macro", 562 - "futures-sink", 563 - "futures-task", 564 - "memchr", 565 - "pin-project-lite", 566 - "pin-utils", 567 - "slab", 568 - ] 569 - 570 - [[package]] 571 - name = "generic-array" 572 - version = "0.14.9" 573 - source = "registry+https://github.com/rust-lang/crates.io-index" 574 - checksum = "4bb6743198531e02858aeaea5398fcc883e71851fcbcb5a2f773e2fb6cb1edf2" 575 - dependencies = [ 576 - "typenum", 577 - "version_check", 227 + "spin", 578 228 ] 579 229 580 230 [[package]] ··· 586 236 "cfg-if", 587 237 "libc", 588 238 "r-efi", 589 - "wasi 0.14.7+wasi-0.2.4", 239 + "wasi", 590 240 ] 591 241 592 242 [[package]] 593 - name = "gimli" 594 - version = "0.32.3" 243 + name = "hashbrown" 244 + version = "0.14.5" 595 245 source = "registry+https://github.com/rust-lang/crates.io-index" 596 - checksum = "e629b9b98ef3dd8afe6ca2bd0f89306cec16d43d907889945bc5d6687f2f13c7" 597 - 598 - [[package]] 599 - name = "half" 600 - version = "2.7.0" 601 - source = "registry+https://github.com/rust-lang/crates.io-index" 602 - checksum = "e54c115d4f30f52c67202f079c5f9d8b49db4691f460fdb0b4c2e838261b2ba5" 603 - dependencies = [ 604 - "cfg-if", 605 - "crunchy", 606 - "zerocopy", 607 - ] 246 + checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" 608 247 609 248 [[package]] 610 249 name = "hashbrown" 611 - version = "0.15.5" 612 - source = "registry+https://github.com/rust-lang/crates.io-index" 613 - checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" 614 - dependencies = [ 615 - "foldhash", 616 - ] 617 - 618 - [[package]] 619 - name = "hashlink" 620 - version = "0.10.0" 250 + version = "0.16.1" 621 251 source = "registry+https://github.com/rust-lang/crates.io-index" 622 - checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1" 623 - dependencies = [ 624 - "hashbrown", 625 - ] 252 + checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" 626 253 627 254 [[package]] 628 255 name = "heck" ··· 631 258 checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" 632 259 633 260 [[package]] 634 - name = "io-uring" 635 - version = "0.7.10" 636 - source = "registry+https://github.com/rust-lang/crates.io-index" 637 - checksum = "046fa2d4d00aea763528b4950358d0ead425372445dc8ff86312b3c69ff7727b" 638 - dependencies = [ 639 - "bitflags", 640 - "cfg-if", 641 - "libc", 642 - ] 643 - 644 - [[package]] 645 - name = "ipld-core" 646 - version = "0.4.2" 647 - source = "registry+https://github.com/rust-lang/crates.io-index" 648 - checksum = "104718b1cc124d92a6d01ca9c9258a7df311405debb3408c445a36452f9bf8db" 649 - dependencies = [ 650 - "cid", 651 - "serde", 652 - "serde_bytes", 653 - ] 654 - 655 - [[package]] 656 - name = "iroh-car" 657 - version = "0.5.1" 261 + name = "interval-heap" 262 + version = "0.0.5" 658 263 source = "registry+https://github.com/rust-lang/crates.io-index" 659 - checksum = "cb7f8cd4cb9aa083fba8b52e921764252d0b4dcb1cd6d120b809dbfe1106e81a" 264 + checksum = "11274e5e8e89b8607cfedc2910b6626e998779b48a019151c7604d0adcb86ac6" 660 265 dependencies = [ 661 - "anyhow", 662 - "cid", 663 - "futures", 664 - "serde", 665 - "serde_ipld_dagcbor", 666 - "thiserror 1.0.69", 667 - "tokio", 668 - "unsigned-varint 0.7.2", 266 + "compare", 669 267 ] 670 268 671 269 [[package]] ··· 675 273 checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" 676 274 677 275 [[package]] 678 - name = "itertools" 679 - version = "0.13.0" 680 - source = "registry+https://github.com/rust-lang/crates.io-index" 681 - checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" 682 - dependencies = [ 683 - "either", 684 - ] 685 - 686 - [[package]] 687 - name = "itoa" 688 - version = "1.0.15" 689 - source = "registry+https://github.com/rust-lang/crates.io-index" 690 - checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" 691 - 692 - [[package]] 693 - name = "jiff" 694 - version = "0.2.15" 695 - source = "registry+https://github.com/rust-lang/crates.io-index" 696 - checksum = "be1f93b8b1eb69c77f24bbb0afdf66f54b632ee39af40ca21c4365a1d7347e49" 697 - dependencies = [ 698 - "jiff-static", 699 - "log", 700 - "portable-atomic", 701 - "portable-atomic-util", 702 - "serde", 703 - ] 704 - 705 - [[package]] 706 - name = "jiff-static" 707 - version = "0.2.15" 708 - source = "registry+https://github.com/rust-lang/crates.io-index" 709 - checksum = "03343451ff899767262ec32146f6d559dd759fdadf42ff0e227c7c48f72594b4" 710 - dependencies = [ 711 - "proc-macro2", 712 - "quote", 713 - "syn 2.0.106", 714 - ] 715 - 716 - [[package]] 717 - name = "js-sys" 718 - version = "0.3.81" 719 - source = "registry+https://github.com/rust-lang/crates.io-index" 720 - checksum = "ec48937a97411dcb524a265206ccd4c90bb711fca92b2792c407f268825b9305" 721 - dependencies = [ 722 - "once_cell", 723 - "wasm-bindgen", 724 - ] 725 - 726 - [[package]] 727 276 name = "libc" 728 277 version = "0.2.176" 729 278 source = "registry+https://github.com/rust-lang/crates.io-index" 730 279 checksum = "58f929b4d672ea937a23a1ab494143d968337a5f47e56d0815df1e0890ddf174" 731 280 732 281 [[package]] 733 - name = "libsqlite3-sys" 734 - version = "0.35.0" 735 - source = "registry+https://github.com/rust-lang/crates.io-index" 736 - checksum = "133c182a6a2c87864fe97778797e46c7e999672690dc9fa3ee8e241aa4a9c13f" 737 - dependencies = [ 738 - "pkg-config", 739 - "vcpkg", 740 - ] 741 - 742 - [[package]] 743 282 name = "linux-raw-sys" 744 283 version = "0.11.0" 745 284 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 761 300 checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432" 762 301 763 302 [[package]] 764 - name = "match-lookup" 765 - version = "0.1.1" 766 - source = "registry+https://github.com/rust-lang/crates.io-index" 767 - checksum = "1265724d8cb29dbbc2b0f06fffb8bf1a8c0cf73a78eede9ba73a4a66c52a981e" 768 - dependencies = [ 769 - "proc-macro2", 770 - "quote", 771 - "syn 1.0.109", 772 - ] 773 - 774 - [[package]] 775 - name = "memchr" 776 - version = "2.7.6" 777 - source = "registry+https://github.com/rust-lang/crates.io-index" 778 - checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273" 779 - 780 - [[package]] 781 - name = "miniz_oxide" 782 - version = "0.8.9" 783 - source = "registry+https://github.com/rust-lang/crates.io-index" 784 - checksum = "1fa76a2c86f704bdb222d66965fb3d63269ce38518b83cb0575fca855ebb6316" 785 - dependencies = [ 786 - "adler2", 787 - ] 788 - 789 - [[package]] 790 - name = "mio" 791 - version = "1.0.4" 303 + name = "lsm-tree" 304 + version = "3.0.1" 792 305 source = "registry+https://github.com/rust-lang/crates.io-index" 793 - checksum = "78bed444cc8a2160f01cbcf811ef18cac863ad68ae8ca62092e8db51d51c761c" 306 + checksum = "b875f1dfe14f557f805b167fb9b0fc54c5560c7a4bd6ae02535b2846f276a8cb" 794 307 dependencies = [ 795 - "libc", 796 - "wasi 0.11.1+wasi-snapshot-preview1", 797 - "windows-sys 0.59.0", 308 + "byteorder-lite", 309 + "byteview", 310 + "crossbeam-skiplist", 311 + "enum_dispatch", 312 + "interval-heap", 313 + "log", 314 + "lz4_flex", 315 + "quick_cache", 316 + "rustc-hash", 317 + "self_cell", 318 + "sfa", 319 + "tempfile", 320 + "varint-rs", 321 + "xxhash-rust", 798 322 ] 799 323 800 324 [[package]] 801 - name = "multibase" 802 - version = "0.9.2" 325 + name = "lz4_flex" 326 + version = "0.11.5" 803 327 source = "registry+https://github.com/rust-lang/crates.io-index" 804 - checksum = "8694bb4835f452b0e3bb06dbebb1d6fc5385b6ca1caf2e55fd165c042390ec77" 328 + checksum = "08ab2867e3eeeca90e844d1940eab391c9dc5228783db2ed999acbc0a9ed375a" 805 329 dependencies = [ 806 - "base-x", 807 - "base256emoji", 808 - "data-encoding", 809 - "data-encoding-macro", 810 - ] 811 - 812 - [[package]] 813 - name = "multihash" 814 - version = "0.19.3" 815 - source = "registry+https://github.com/rust-lang/crates.io-index" 816 - checksum = "6b430e7953c29dd6a09afc29ff0bb69c6e306329ee6794700aee27b76a1aea8d" 817 - dependencies = [ 818 - "core2", 819 - "serde", 820 - "unsigned-varint 0.8.0", 821 - ] 822 - 823 - [[package]] 824 - name = "num-traits" 825 - version = "0.2.19" 826 - source = "registry+https://github.com/rust-lang/crates.io-index" 827 - checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" 828 - dependencies = [ 829 - "autocfg", 830 - ] 831 - 832 - [[package]] 833 - name = "object" 834 - version = "0.37.3" 835 - source = "registry+https://github.com/rust-lang/crates.io-index" 836 - checksum = "ff76201f031d8863c38aa7f905eca4f53abbfa15f609db4277d44cd8938f33fe" 837 - dependencies = [ 838 - "memchr", 330 + "twox-hash", 839 331 ] 840 332 841 333 [[package]] ··· 851 343 checksum = "a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad" 852 344 853 345 [[package]] 854 - name = "oorandom" 855 - version = "11.1.5" 856 - source = "registry+https://github.com/rust-lang/crates.io-index" 857 - checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e" 858 - 859 - [[package]] 860 - name = "parking_lot" 861 - version = "0.12.5" 862 - source = "registry+https://github.com/rust-lang/crates.io-index" 863 - checksum = "93857453250e3077bd71ff98b6a65ea6621a19bb0f559a85248955ac12c45a1a" 864 - dependencies = [ 865 - "lock_api", 866 - "parking_lot_core", 867 - ] 868 - 869 - [[package]] 870 346 name = "parking_lot_core" 871 347 version = "0.9.12" 872 348 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 880 356 ] 881 357 882 358 [[package]] 883 - name = "pin-project-lite" 884 - version = "0.2.16" 359 + name = "proc-macro2" 360 + version = "1.0.101" 885 361 source = "registry+https://github.com/rust-lang/crates.io-index" 886 - checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" 887 - 888 - [[package]] 889 - name = "pin-utils" 890 - version = "0.1.0" 891 - source = "registry+https://github.com/rust-lang/crates.io-index" 892 - checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" 893 - 894 - [[package]] 895 - name = "pkg-config" 896 - version = "0.3.32" 897 - source = "registry+https://github.com/rust-lang/crates.io-index" 898 - checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" 899 - 900 - [[package]] 901 - name = "plotters" 902 - version = "0.3.7" 903 - source = "registry+https://github.com/rust-lang/crates.io-index" 904 - checksum = "5aeb6f403d7a4911efb1e33402027fc44f29b5bf6def3effcc22d7bb75f2b747" 362 + checksum = "89ae43fd86e4158d6db51ad8e2b80f313af9cc74f5c0e03ccb87de09998732de" 905 363 dependencies = [ 906 - "num-traits", 907 - "plotters-backend", 908 - "plotters-svg", 909 - "wasm-bindgen", 910 - "web-sys", 364 + "unicode-ident", 911 365 ] 912 366 913 367 [[package]] 914 - name = "plotters-backend" 915 - version = "0.3.7" 916 - source = "registry+https://github.com/rust-lang/crates.io-index" 917 - checksum = "df42e13c12958a16b3f7f4386b9ab1f3e7933914ecea48da7139435263a4172a" 918 - 919 - [[package]] 920 - name = "plotters-svg" 921 - version = "0.3.7" 368 + name = "quick_cache" 369 + version = "0.6.18" 922 370 source = "registry+https://github.com/rust-lang/crates.io-index" 923 - checksum = "51bae2ac328883f7acdfea3d66a7c35751187f870bc81f94563733a154d7a670" 371 + checksum = "7ada44a88ef953a3294f6eb55d2007ba44646015e18613d2f213016379203ef3" 924 372 dependencies = [ 925 - "plotters-backend", 926 - ] 927 - 928 - [[package]] 929 - name = "portable-atomic" 930 - version = "1.11.1" 931 - source = "registry+https://github.com/rust-lang/crates.io-index" 932 - checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483" 933 - 934 - [[package]] 935 - name = "portable-atomic-util" 936 - version = "0.2.4" 937 - source = "registry+https://github.com/rust-lang/crates.io-index" 938 - checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507" 939 - dependencies = [ 940 - "portable-atomic", 941 - ] 942 - 943 - [[package]] 944 - name = "proc-macro2" 945 - version = "1.0.101" 946 - source = "registry+https://github.com/rust-lang/crates.io-index" 947 - checksum = "89ae43fd86e4158d6db51ad8e2b80f313af9cc74f5c0e03ccb87de09998732de" 948 - dependencies = [ 949 - "unicode-ident", 373 + "equivalent", 374 + "hashbrown 0.16.1", 950 375 ] 951 376 952 377 [[package]] ··· 965 390 checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" 966 391 967 392 [[package]] 968 - name = "rayon" 969 - version = "1.11.0" 970 - source = "registry+https://github.com/rust-lang/crates.io-index" 971 - checksum = "368f01d005bf8fd9b1206fb6fa653e6c4a81ceb1466406b81792d87c5677a58f" 972 - dependencies = [ 973 - "either", 974 - "rayon-core", 975 - ] 976 - 977 - [[package]] 978 - name = "rayon-core" 979 - version = "1.13.0" 980 - source = "registry+https://github.com/rust-lang/crates.io-index" 981 - checksum = "22e18b0f0062d30d4230b2e85ff77fdfe4326feb054b9783a3460d8435c8ab91" 982 - dependencies = [ 983 - "crossbeam-deque", 984 - "crossbeam-utils", 985 - ] 986 - 987 - [[package]] 988 393 name = "redox_syscall" 989 394 version = "0.5.18" 990 395 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 994 399 ] 995 400 996 401 [[package]] 997 - name = "regex" 998 - version = "1.11.3" 999 - source = "registry+https://github.com/rust-lang/crates.io-index" 1000 - checksum = "8b5288124840bee7b386bc413c487869b360b2b4ec421ea56425128692f2a82c" 1001 - dependencies = [ 1002 - "aho-corasick", 1003 - "memchr", 1004 - "regex-automata", 1005 - "regex-syntax", 1006 - ] 1007 - 1008 - [[package]] 1009 - name = "regex-automata" 1010 - version = "0.4.11" 1011 - source = "registry+https://github.com/rust-lang/crates.io-index" 1012 - checksum = "833eb9ce86d40ef33cb1306d8accf7bc8ec2bfea4355cbdebb3df68b40925cad" 1013 - dependencies = [ 1014 - "aho-corasick", 1015 - "memchr", 1016 - "regex-syntax", 1017 - ] 1018 - 1019 - [[package]] 1020 - name = "regex-syntax" 1021 - version = "0.8.6" 1022 - source = "registry+https://github.com/rust-lang/crates.io-index" 1023 - checksum = "caf4aa5b0f434c91fe5c7f1ecb6a5ece2130b02ad2a590589dda5146df959001" 1024 - 1025 - [[package]] 1026 402 name = "repo-stream" 1027 - version = "0.2.0" 403 + version = "0.2.2" 1028 404 dependencies = [ 1029 - "bincode", 1030 405 "clap", 1031 - "criterion", 1032 - "env_logger", 1033 - "futures", 1034 - "futures-core", 1035 - "ipld-core", 1036 - "iroh-car", 1037 - "log", 1038 - "multibase", 1039 - "rusqlite", 1040 - "serde", 1041 - "serde_bytes", 1042 - "serde_ipld_dagcbor", 1043 - "sha2", 1044 - "tempfile", 1045 - "thiserror 2.0.17", 1046 - "tokio", 1047 - ] 1048 - 1049 - [[package]] 1050 - name = "rusqlite" 1051 - version = "0.37.0" 1052 - source = "registry+https://github.com/rust-lang/crates.io-index" 1053 - checksum = "165ca6e57b20e1351573e3729b958bc62f0e48025386970b6e4d29e7a7e71f3f" 1054 - dependencies = [ 1055 - "bitflags", 1056 - "fallible-iterator", 1057 - "fallible-streaming-iterator", 1058 - "hashlink", 1059 - "libsqlite3-sys", 1060 - "smallvec", 406 + "fjall", 1061 407 ] 1062 408 1063 409 [[package]] 1064 - name = "rustc-demangle" 1065 - version = "0.1.26" 410 + name = "rustc-hash" 411 + version = "2.1.1" 1066 412 source = "registry+https://github.com/rust-lang/crates.io-index" 1067 - checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace" 413 + checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" 1068 414 1069 415 [[package]] 1070 416 name = "rustix" ··· 1076 422 "errno", 1077 423 "libc", 1078 424 "linux-raw-sys", 1079 - "windows-sys 0.60.2", 1080 - ] 1081 - 1082 - [[package]] 1083 - name = "rustversion" 1084 - version = "1.0.22" 1085 - source = "registry+https://github.com/rust-lang/crates.io-index" 1086 - checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" 1087 - 1088 - [[package]] 1089 - name = "ryu" 1090 - version = "1.0.20" 1091 - source = "registry+https://github.com/rust-lang/crates.io-index" 1092 - checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" 1093 - 1094 - [[package]] 1095 - name = "same-file" 1096 - version = "1.0.6" 1097 - source = "registry+https://github.com/rust-lang/crates.io-index" 1098 - checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" 1099 - dependencies = [ 1100 - "winapi-util", 425 + "windows-sys", 1101 426 ] 1102 427 1103 428 [[package]] ··· 1107 432 checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" 1108 433 1109 434 [[package]] 1110 - name = "serde" 1111 - version = "1.0.228" 435 + name = "self_cell" 436 + version = "1.2.2" 1112 437 source = "registry+https://github.com/rust-lang/crates.io-index" 1113 - checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" 1114 - dependencies = [ 1115 - "serde_core", 1116 - "serde_derive", 1117 - ] 438 + checksum = "b12e76d157a900eb52e81bc6e9f3069344290341720e9178cde2407113ac8d89" 1118 439 1119 440 [[package]] 1120 - name = "serde_bytes" 1121 - version = "0.11.19" 1122 - source = "registry+https://github.com/rust-lang/crates.io-index" 1123 - checksum = "a5d440709e79d88e51ac01c4b72fc6cb7314017bb7da9eeff678aa94c10e3ea8" 1124 - dependencies = [ 1125 - "serde", 1126 - "serde_core", 1127 - ] 1128 - 1129 - [[package]] 1130 - name = "serde_core" 1131 - version = "1.0.228" 441 + name = "sfa" 442 + version = "1.0.0" 1132 443 source = "registry+https://github.com/rust-lang/crates.io-index" 1133 - checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" 444 + checksum = "a1296838937cab56cd6c4eeeb8718ec777383700c33f060e2869867bd01d1175" 1134 445 dependencies = [ 1135 - "serde_derive", 446 + "byteorder-lite", 447 + "log", 448 + "xxhash-rust", 1136 449 ] 1137 450 1138 451 [[package]] 1139 - name = "serde_derive" 1140 - version = "1.0.228" 1141 - source = "registry+https://github.com/rust-lang/crates.io-index" 1142 - checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" 1143 - dependencies = [ 1144 - "proc-macro2", 1145 - "quote", 1146 - "syn 2.0.106", 1147 - ] 1148 - 1149 - [[package]] 1150 - name = "serde_ipld_dagcbor" 1151 - version = "0.6.4" 1152 - source = "registry+https://github.com/rust-lang/crates.io-index" 1153 - checksum = "46182f4f08349a02b45c998ba3215d3f9de826246ba02bb9dddfe9a2a2100778" 1154 - dependencies = [ 1155 - "cbor4ii", 1156 - "ipld-core", 1157 - "scopeguard", 1158 - "serde", 1159 - ] 1160 - 1161 - [[package]] 1162 - name = "serde_json" 1163 - version = "1.0.145" 1164 - source = "registry+https://github.com/rust-lang/crates.io-index" 1165 - checksum = "402a6f66d8c709116cf22f558eab210f5a50187f702eb4d7e5ef38d9a7f1c79c" 1166 - dependencies = [ 1167 - "itoa", 1168 - "memchr", 1169 - "ryu", 1170 - "serde", 1171 - "serde_core", 1172 - ] 1173 - 1174 - [[package]] 1175 - name = "sha2" 1176 - version = "0.10.9" 1177 - source = "registry+https://github.com/rust-lang/crates.io-index" 1178 - checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" 1179 - dependencies = [ 1180 - "cfg-if", 1181 - "cpufeatures", 1182 - "digest", 1183 - ] 1184 - 1185 - [[package]] 1186 - name = "signal-hook-registry" 1187 - version = "1.4.6" 1188 - source = "registry+https://github.com/rust-lang/crates.io-index" 1189 - checksum = "b2a4719bff48cee6b39d12c020eeb490953ad2443b7055bd0b21fca26bd8c28b" 1190 - dependencies = [ 1191 - "libc", 1192 - ] 1193 - 1194 - [[package]] 1195 - name = "slab" 1196 - version = "0.4.11" 1197 - source = "registry+https://github.com/rust-lang/crates.io-index" 1198 - checksum = "7a2ae44ef20feb57a68b23d846850f861394c2e02dc425a50098ae8c90267589" 1199 - 1200 - [[package]] 1201 452 name = "smallvec" 1202 453 version = "1.15.1" 1203 454 source = "registry+https://github.com/rust-lang/crates.io-index" 1204 455 checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" 1205 456 1206 457 [[package]] 1207 - name = "socket2" 1208 - version = "0.6.0" 458 + name = "spin" 459 + version = "0.9.8" 1209 460 source = "registry+https://github.com/rust-lang/crates.io-index" 1210 - checksum = "233504af464074f9d066d7b5416c5f9b894a5862a6506e306f7b816cdd6f1807" 461 + checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" 1211 462 dependencies = [ 1212 - "libc", 1213 - "windows-sys 0.59.0", 463 + "lock_api", 1214 464 ] 1215 465 1216 466 [[package]] ··· 1221 471 1222 472 [[package]] 1223 473 name = "syn" 1224 - version = "1.0.109" 1225 - source = "registry+https://github.com/rust-lang/crates.io-index" 1226 - checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" 1227 - dependencies = [ 1228 - "proc-macro2", 1229 - "quote", 1230 - "unicode-ident", 1231 - ] 1232 - 1233 - [[package]] 1234 - name = "syn" 1235 474 version = "2.0.106" 1236 475 source = "registry+https://github.com/rust-lang/crates.io-index" 1237 476 checksum = "ede7c438028d4436d71104916910f5bb611972c5cfd7f89b8300a8186e6fada6" ··· 1251 490 "getrandom", 1252 491 "once_cell", 1253 492 "rustix", 1254 - "windows-sys 0.60.2", 493 + "windows-sys", 1255 494 ] 1256 495 1257 496 [[package]] 1258 - name = "thiserror" 1259 - version = "1.0.69" 1260 - source = "registry+https://github.com/rust-lang/crates.io-index" 1261 - checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" 1262 - dependencies = [ 1263 - "thiserror-impl 1.0.69", 1264 - ] 1265 - 1266 - [[package]] 1267 - name = "thiserror" 1268 - version = "2.0.17" 1269 - source = "registry+https://github.com/rust-lang/crates.io-index" 1270 - checksum = "f63587ca0f12b72a0600bcba1d40081f830876000bb46dd2337a3051618f4fc8" 1271 - dependencies = [ 1272 - "thiserror-impl 2.0.17", 1273 - ] 1274 - 1275 - [[package]] 1276 - name = "thiserror-impl" 1277 - version = "1.0.69" 497 + name = "twox-hash" 498 + version = "2.1.2" 1278 499 source = "registry+https://github.com/rust-lang/crates.io-index" 1279 - checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" 1280 - dependencies = [ 1281 - "proc-macro2", 1282 - "quote", 1283 - "syn 2.0.106", 1284 - ] 1285 - 1286 - [[package]] 1287 - name = "thiserror-impl" 1288 - version = "2.0.17" 1289 - source = "registry+https://github.com/rust-lang/crates.io-index" 1290 - checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913" 1291 - dependencies = [ 1292 - "proc-macro2", 1293 - "quote", 1294 - "syn 2.0.106", 1295 - ] 1296 - 1297 - [[package]] 1298 - name = "tinytemplate" 1299 - version = "1.2.1" 1300 - source = "registry+https://github.com/rust-lang/crates.io-index" 1301 - checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" 1302 - dependencies = [ 1303 - "serde", 1304 - "serde_json", 1305 - ] 1306 - 1307 - [[package]] 1308 - name = "tokio" 1309 - version = "1.47.1" 1310 - source = "registry+https://github.com/rust-lang/crates.io-index" 1311 - checksum = "89e49afdadebb872d3145a5638b59eb0691ea23e46ca484037cfab3b76b95038" 1312 - dependencies = [ 1313 - "backtrace", 1314 - "bytes", 1315 - "io-uring", 1316 - "libc", 1317 - "mio", 1318 - "parking_lot", 1319 - "pin-project-lite", 1320 - "signal-hook-registry", 1321 - "slab", 1322 - "socket2", 1323 - "tokio-macros", 1324 - "windows-sys 0.59.0", 1325 - ] 1326 - 1327 - [[package]] 1328 - name = "tokio-macros" 1329 - version = "2.5.0" 1330 - source = "registry+https://github.com/rust-lang/crates.io-index" 1331 - checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" 1332 - dependencies = [ 1333 - "proc-macro2", 1334 - "quote", 1335 - "syn 2.0.106", 1336 - ] 1337 - 1338 - [[package]] 1339 - name = "typenum" 1340 - version = "1.19.0" 1341 - source = "registry+https://github.com/rust-lang/crates.io-index" 1342 - checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" 500 + checksum = "9ea3136b675547379c4bd395ca6b938e5ad3c3d20fad76e7fe85f9e0d011419c" 1343 501 1344 502 [[package]] 1345 503 name = "unicode-ident" ··· 1348 506 checksum = "f63a545481291138910575129486daeaf8ac54aee4387fe7906919f7830c7d9d" 1349 507 1350 508 [[package]] 1351 - name = "unsigned-varint" 1352 - version = "0.7.2" 1353 - source = "registry+https://github.com/rust-lang/crates.io-index" 1354 - checksum = "6889a77d49f1f013504cec6bf97a2c730394adedaeb1deb5ea08949a50541105" 1355 - 1356 - [[package]] 1357 - name = "unsigned-varint" 1358 - version = "0.8.0" 1359 - source = "registry+https://github.com/rust-lang/crates.io-index" 1360 - checksum = "eb066959b24b5196ae73cb057f45598450d2c5f71460e98c49b738086eff9c06" 1361 - 1362 - [[package]] 1363 - name = "unty" 1364 - version = "0.0.4" 1365 - source = "registry+https://github.com/rust-lang/crates.io-index" 1366 - checksum = "6d49784317cd0d1ee7ec5c716dd598ec5b4483ea832a2dced265471cc0f690ae" 1367 - 1368 - [[package]] 1369 509 name = "utf8parse" 1370 510 version = "0.2.2" 1371 511 source = "registry+https://github.com/rust-lang/crates.io-index" 1372 512 checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" 1373 513 1374 514 [[package]] 1375 - name = "vcpkg" 1376 - version = "0.2.15" 1377 - source = "registry+https://github.com/rust-lang/crates.io-index" 1378 - checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" 1379 - 1380 - [[package]] 1381 - name = "version_check" 1382 - version = "0.9.5" 1383 - source = "registry+https://github.com/rust-lang/crates.io-index" 1384 - checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" 1385 - 1386 - [[package]] 1387 - name = "virtue" 1388 - version = "0.0.18" 1389 - source = "registry+https://github.com/rust-lang/crates.io-index" 1390 - checksum = "051eb1abcf10076295e815102942cc58f9d5e3b4560e46e53c21e8ff6f3af7b1" 1391 - 1392 - [[package]] 1393 - name = "walkdir" 1394 - version = "2.5.0" 1395 - source = "registry+https://github.com/rust-lang/crates.io-index" 1396 - checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" 1397 - dependencies = [ 1398 - "same-file", 1399 - "winapi-util", 1400 - ] 1401 - 1402 - [[package]] 1403 - name = "wasi" 1404 - version = "0.11.1+wasi-snapshot-preview1" 515 + name = "varint-rs" 516 + version = "2.2.0" 1405 517 source = "registry+https://github.com/rust-lang/crates.io-index" 1406 - checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" 518 + checksum = "8f54a172d0620933a27a4360d3db3e2ae0dd6cceae9730751a036bbf182c4b23" 1407 519 1408 520 [[package]] 1409 521 name = "wasi" ··· 1424 536 ] 1425 537 1426 538 [[package]] 1427 - name = "wasm-bindgen" 1428 - version = "0.2.104" 1429 - source = "registry+https://github.com/rust-lang/crates.io-index" 1430 - checksum = "c1da10c01ae9f1ae40cbfac0bac3b1e724b320abfcf52229f80b547c0d250e2d" 1431 - dependencies = [ 1432 - "cfg-if", 1433 - "once_cell", 1434 - "rustversion", 1435 - "wasm-bindgen-macro", 1436 - "wasm-bindgen-shared", 1437 - ] 1438 - 1439 - [[package]] 1440 - name = "wasm-bindgen-backend" 1441 - version = "0.2.104" 1442 - source = "registry+https://github.com/rust-lang/crates.io-index" 1443 - checksum = "671c9a5a66f49d8a47345ab942e2cb93c7d1d0339065d4f8139c486121b43b19" 1444 - dependencies = [ 1445 - "bumpalo", 1446 - "log", 1447 - "proc-macro2", 1448 - "quote", 1449 - "syn 2.0.106", 1450 - "wasm-bindgen-shared", 1451 - ] 1452 - 1453 - [[package]] 1454 - name = "wasm-bindgen-macro" 1455 - version = "0.2.104" 1456 - source = "registry+https://github.com/rust-lang/crates.io-index" 1457 - checksum = "7ca60477e4c59f5f2986c50191cd972e3a50d8a95603bc9434501cf156a9a119" 1458 - dependencies = [ 1459 - "quote", 1460 - "wasm-bindgen-macro-support", 1461 - ] 1462 - 1463 - [[package]] 1464 - name = "wasm-bindgen-macro-support" 1465 - version = "0.2.104" 1466 - source = "registry+https://github.com/rust-lang/crates.io-index" 1467 - checksum = "9f07d2f20d4da7b26400c9f4a0511e6e0345b040694e8a75bd41d578fa4421d7" 1468 - dependencies = [ 1469 - "proc-macro2", 1470 - "quote", 1471 - "syn 2.0.106", 1472 - "wasm-bindgen-backend", 1473 - "wasm-bindgen-shared", 1474 - ] 1475 - 1476 - [[package]] 1477 - name = "wasm-bindgen-shared" 1478 - version = "0.2.104" 1479 - source = "registry+https://github.com/rust-lang/crates.io-index" 1480 - checksum = "bad67dc8b2a1a6e5448428adec4c3e84c43e561d8c9ee8a9e5aabeb193ec41d1" 1481 - dependencies = [ 1482 - "unicode-ident", 1483 - ] 1484 - 1485 - [[package]] 1486 - name = "web-sys" 1487 - version = "0.3.81" 1488 - source = "registry+https://github.com/rust-lang/crates.io-index" 1489 - checksum = "9367c417a924a74cae129e6a2ae3b47fabb1f8995595ab474029da749a8be120" 1490 - dependencies = [ 1491 - "js-sys", 1492 - "wasm-bindgen", 1493 - ] 1494 - 1495 - [[package]] 1496 - name = "winapi-util" 1497 - version = "0.1.11" 1498 - source = "registry+https://github.com/rust-lang/crates.io-index" 1499 - checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" 1500 - dependencies = [ 1501 - "windows-sys 0.60.2", 1502 - ] 1503 - 1504 - [[package]] 1505 539 name = "windows-link" 1506 540 version = "0.2.1" 1507 541 source = "registry+https://github.com/rust-lang/crates.io-index" 1508 542 checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" 1509 - 1510 - [[package]] 1511 - name = "windows-sys" 1512 - version = "0.59.0" 1513 - source = "registry+https://github.com/rust-lang/crates.io-index" 1514 - checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" 1515 - dependencies = [ 1516 - "windows-targets 0.52.6", 1517 - ] 1518 543 1519 544 [[package]] 1520 545 name = "windows-sys" ··· 1522 547 source = "registry+https://github.com/rust-lang/crates.io-index" 1523 548 checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb" 1524 549 dependencies = [ 1525 - "windows-targets 0.53.5", 1526 - ] 1527 - 1528 - [[package]] 1529 - name = "windows-targets" 1530 - version = "0.52.6" 1531 - source = "registry+https://github.com/rust-lang/crates.io-index" 1532 - checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" 1533 - dependencies = [ 1534 - "windows_aarch64_gnullvm 0.52.6", 1535 - "windows_aarch64_msvc 0.52.6", 1536 - "windows_i686_gnu 0.52.6", 1537 - "windows_i686_gnullvm 0.52.6", 1538 - "windows_i686_msvc 0.52.6", 1539 - "windows_x86_64_gnu 0.52.6", 1540 - "windows_x86_64_gnullvm 0.52.6", 1541 - "windows_x86_64_msvc 0.52.6", 550 + "windows-targets", 1542 551 ] 1543 552 1544 553 [[package]] ··· 1548 557 checksum = "4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3" 1549 558 dependencies = [ 1550 559 "windows-link", 1551 - "windows_aarch64_gnullvm 0.53.1", 1552 - "windows_aarch64_msvc 0.53.1", 1553 - "windows_i686_gnu 0.53.1", 1554 - "windows_i686_gnullvm 0.53.1", 1555 - "windows_i686_msvc 0.53.1", 1556 - "windows_x86_64_gnu 0.53.1", 1557 - "windows_x86_64_gnullvm 0.53.1", 1558 - "windows_x86_64_msvc 0.53.1", 560 + "windows_aarch64_gnullvm", 561 + "windows_aarch64_msvc", 562 + "windows_i686_gnu", 563 + "windows_i686_gnullvm", 564 + "windows_i686_msvc", 565 + "windows_x86_64_gnu", 566 + "windows_x86_64_gnullvm", 567 + "windows_x86_64_msvc", 1559 568 ] 1560 569 1561 570 [[package]] 1562 571 name = "windows_aarch64_gnullvm" 1563 - version = "0.52.6" 1564 - source = "registry+https://github.com/rust-lang/crates.io-index" 1565 - checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" 1566 - 1567 - [[package]] 1568 - name = "windows_aarch64_gnullvm" 1569 572 version = "0.53.1" 1570 573 source = "registry+https://github.com/rust-lang/crates.io-index" 1571 574 checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" 1572 575 1573 576 [[package]] 1574 577 name = "windows_aarch64_msvc" 1575 - version = "0.52.6" 1576 - source = "registry+https://github.com/rust-lang/crates.io-index" 1577 - checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" 1578 - 1579 - [[package]] 1580 - name = "windows_aarch64_msvc" 1581 578 version = "0.53.1" 1582 579 source = "registry+https://github.com/rust-lang/crates.io-index" 1583 580 checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" 1584 581 1585 582 [[package]] 1586 583 name = "windows_i686_gnu" 1587 - version = "0.52.6" 1588 - source = "registry+https://github.com/rust-lang/crates.io-index" 1589 - checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" 1590 - 1591 - [[package]] 1592 - name = "windows_i686_gnu" 1593 584 version = "0.53.1" 1594 585 source = "registry+https://github.com/rust-lang/crates.io-index" 1595 586 checksum = "960e6da069d81e09becb0ca57a65220ddff016ff2d6af6a223cf372a506593a3" 1596 587 1597 588 [[package]] 1598 589 name = "windows_i686_gnullvm" 1599 - version = "0.52.6" 1600 - source = "registry+https://github.com/rust-lang/crates.io-index" 1601 - checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" 1602 - 1603 - [[package]] 1604 - name = "windows_i686_gnullvm" 1605 590 version = "0.53.1" 1606 591 source = "registry+https://github.com/rust-lang/crates.io-index" 1607 592 checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" 1608 593 1609 594 [[package]] 1610 595 name = "windows_i686_msvc" 1611 - version = "0.52.6" 1612 - source = "registry+https://github.com/rust-lang/crates.io-index" 1613 - checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" 1614 - 1615 - [[package]] 1616 - name = "windows_i686_msvc" 1617 596 version = "0.53.1" 1618 597 source = "registry+https://github.com/rust-lang/crates.io-index" 1619 598 checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" 1620 599 1621 600 [[package]] 1622 601 name = "windows_x86_64_gnu" 1623 - version = "0.52.6" 1624 - source = "registry+https://github.com/rust-lang/crates.io-index" 1625 - checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" 1626 - 1627 - [[package]] 1628 - name = "windows_x86_64_gnu" 1629 602 version = "0.53.1" 1630 603 source = "registry+https://github.com/rust-lang/crates.io-index" 1631 604 checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" 1632 605 1633 606 [[package]] 1634 607 name = "windows_x86_64_gnullvm" 1635 - version = "0.52.6" 1636 - source = "registry+https://github.com/rust-lang/crates.io-index" 1637 - checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" 1638 - 1639 - [[package]] 1640 - name = "windows_x86_64_gnullvm" 1641 608 version = "0.53.1" 1642 609 source = "registry+https://github.com/rust-lang/crates.io-index" 1643 610 checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" 1644 - 1645 - [[package]] 1646 - name = "windows_x86_64_msvc" 1647 - version = "0.52.6" 1648 - source = "registry+https://github.com/rust-lang/crates.io-index" 1649 - checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" 1650 611 1651 612 [[package]] 1652 613 name = "windows_x86_64_msvc" ··· 1661 622 checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" 1662 623 1663 624 [[package]] 1664 - name = "zerocopy" 1665 - version = "0.8.27" 1666 - source = "registry+https://github.com/rust-lang/crates.io-index" 1667 - checksum = "0894878a5fa3edfd6da3f88c4805f4c8558e2b996227a3d864f47fe11e38282c" 1668 - dependencies = [ 1669 - "zerocopy-derive", 1670 - ] 1671 - 1672 - [[package]] 1673 - name = "zerocopy-derive" 1674 - version = "0.8.27" 625 + name = "xxhash-rust" 626 + version = "0.8.15" 1675 627 source = "registry+https://github.com/rust-lang/crates.io-index" 1676 - checksum = "88d2b8d9c68ad2b9e4340d7832716a4d21a22a1154777ad56ea55c51a9cf3831" 1677 - dependencies = [ 1678 - "proc-macro2", 1679 - "quote", 1680 - "syn 2.0.106", 1681 - ] 628 + checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3"
+2 -36
Cargo.toml
··· 1 1 [package] 2 2 name = "repo-stream" 3 - version = "0.2.0" 3 + version = "0.2.2" 4 4 edition = "2024" 5 5 license = "MIT OR Apache-2.0" 6 6 description = "A robust CAR file -> MST walker for atproto" 7 7 repository = "https://tangled.org/@microcosm.blue/repo-stream" 8 8 9 9 [dependencies] 10 - bincode = { version = "2.0.1", features = ["serde"] } 11 - futures = "0.3.31" 12 - futures-core = "0.3.31" 13 - ipld-core = { version = "0.4.2", features = ["serde"] } 14 - iroh-car = "0.5.1" 15 - log = "0.4.28" 16 - multibase = "0.9.2" 17 - rusqlite = "0.37.0" 18 - serde = { version = "1.0.228", features = ["derive"] } 19 - serde_bytes = "0.11.19" 20 - serde_ipld_dagcbor = "0.6.4" 21 - sha2 = "0.10.9" 22 - thiserror = "2.0.17" 23 - tokio = { version = "1.47.1", features = ["rt", "sync"] } 24 - 25 - [dev-dependencies] 10 + fjall = "3.0.1" 26 11 clap = { version = "4.5.48", features = ["derive"] } 27 - criterion = { version = "0.7.0", features = ["async_tokio"] } 28 - env_logger = "0.11.8" 29 - multibase = "0.9.2" 30 - tempfile = "3.23.0" 31 - tokio = { version = "1.47.1", features = ["full"] } 32 12 33 - [profile.profiling] 34 - inherits = "release" 35 - debug = true 36 - 37 - # [profile.release] 38 - # debug = true 39 - 40 - [[bench]] 41 - name = "non-huge-cars" 42 - harness = false 43 - 44 - [[bench]] 45 - name = "huge-car" 46 - harness = false
+4
benches/non-huge-cars.rs
··· 3 3 4 4 use criterion::{Criterion, criterion_group, criterion_main}; 5 5 6 + const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car"); 6 7 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); 7 8 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car"); 8 9 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car"); ··· 13 14 .build() 14 15 .expect("Creating runtime failed"); 15 16 17 + c.bench_function("empty-car", |b| { 18 + b.to_async(&rt).iter(async || drive_car(EMPTY_CAR).await) 19 + }); 16 20 c.bench_function("tiny-car", |b| { 17 21 b.to_async(&rt).iter(async || drive_car(TINY_CAR).await) 18 22 });
car-samples/empty.car

This is a binary file and will not be displayed.

+26 -74
examples/disk-read-file/main.rs
··· 1 - /*! 2 - Read a CAR file by spilling to disk 3 - */ 4 - 5 - extern crate repo_stream; 6 1 use clap::Parser; 7 - use repo_stream::{DiskBuilder, Driver, DriverBuilder}; 8 - use std::path::PathBuf; 2 + use fjall::{Database, KeyspaceCreateOptions}; 3 + use std::{path::PathBuf, collections::BTreeMap}; 9 4 10 5 #[derive(Debug, Parser)] 11 6 struct Args { 12 7 #[arg()] 13 - car: PathBuf, 14 - #[arg()] 15 - tmpfile: PathBuf, 8 + db_path: PathBuf, 16 9 } 17 10 18 - #[tokio::main] 19 - async fn main() -> Result<(), Box<dyn std::error::Error>> { 20 - env_logger::init(); 21 - 22 - let Args { car, tmpfile } = Args::parse(); 23 - 24 - // repo-stream takes an AsyncRead as input. wrapping a filesystem read in 25 - // BufReader can provide a really significant performance win. 26 - let reader = tokio::fs::File::open(car).await?; 27 - let reader = tokio::io::BufReader::new(reader); 28 - 29 - log::info!("hello! reading the car..."); 30 - 31 - // in this example we only bother handling CARs that are too big for memory 32 - // `noop` helper means: do no block processing, store the raw blocks 33 - let driver = match DriverBuilder::new() 34 - .with_mem_limit_mb(10) // how much memory can be used before disk spill 35 - .load_car(reader) 36 - .await? 37 - { 38 - Driver::Memory(_, _) => panic!("try this on a bigger car"), 39 - Driver::Disk(big_stuff) => { 40 - // we reach here if the repo was too big and needs to be spilled to 41 - // disk to continue 42 - 43 - // set up a disk store we can spill to 44 - let disk_store = DiskBuilder::new().open(tmpfile).await?; 45 - 46 - // do the spilling, get back a (similar) driver 47 - let (commit, driver) = big_stuff.finish_loading(disk_store).await?; 48 - 49 - // at this point you might want to fetch the account's signing key 50 - // via the DID from the commit, and then verify the signature. 51 - log::warn!("big's comit: {:?}", commit); 52 - 53 - // pop the driver back out to get some code indentation relief 54 - driver 55 - } 56 - }; 11 + fn main() -> Result<(), Box<dyn std::error::Error>> { 12 + let Args { db_path } = Args::parse(); 57 13 58 - // collect some random stats about the blocks 59 - let mut n = 0; 60 - let mut zeros = 0; 14 + let db = Database::builder(db_path).open()?; 15 + let ks = db.keyspace("z", KeyspaceCreateOptions::default)?; 16 + let mut seen_keys: BTreeMap<Vec<u8>, usize> = BTreeMap::default(); 61 17 62 - log::info!("walking..."); 18 + print!("writing..."); 19 + for i in 0..250_000_usize { 20 + let k = i.to_be_bytes().to_vec(); 21 + ks.insert(k.clone(), vec![0xAA; 256])?; 22 + seen_keys.insert(k, i); 23 + } 63 24 64 - // this example uses the disk driver's channel mode: the tree walking is 65 - // spawned onto a blocking thread, and we get chunks of rkey+blocks back 66 - let (mut rx, join) = driver.to_channel(512); 67 - while let Some(r) = rx.recv().await { 68 - let pairs = r?; 25 + println!(" done. checking keys..."); 69 26 70 - // keep a count of the total number of blocks seen 71 - n += pairs.len(); 27 + // remove every seen key that fjall actually has, to see what's left 28 + for guard in ks.iter() { 29 + seen_keys.remove(guard.key()?.as_ref()); 30 + } 72 31 73 - for (_, block) in pairs { 74 - // for each block, count how many bytes are equal to '0' 75 - // (this is just an example, you probably want to do something more 76 - // interesting) 77 - zeros += block.into_iter().filter(|&b| b == b'0').count() 32 + // report the result 33 + if seen_keys.len() == 0 { 34 + println!("[ OK ] all keys found"); 35 + } else { 36 + println!("[FAIL] fjall did not have all seen_keys:"); 37 + for (k, i) in seen_keys { 38 + println!(" insert #{i} missing, key bytes: {k:?}"); 78 39 } 79 40 } 80 - 81 - log::info!("arrived! joining rx..."); 82 - 83 - // clean up the database. would be nice to do this in drop so it happens 84 - // automatically, but some blocking work happens, so that's not allowed in 85 - // async rust. ๐Ÿคทโ€โ™€๏ธ 86 - join.await?.reset_store().await?; 87 - 88 - log::info!("done. n={n} zeros={zeros}"); 89 41 90 42 Ok(()) 91 43 }
+53 -2
readme.md
··· 4 4 5 5 [![Crates.io][crates-badge]](https://crates.io/crates/repo-stream) 6 6 [![Documentation][docs-badge]](https://docs.rs/repo-stream) 7 + [![Sponsor][sponsor-badge]](https://github.com/sponsors/uniphil) 7 8 8 9 [crates-badge]: https://img.shields.io/crates/v/repo-stream.svg 9 10 [docs-badge]: https://docs.rs/repo-stream/badge.svg 11 + [sponsor-badge]: https://img.shields.io/badge/at-microcosm-b820f9?labelColor=b820f9&logo=githubsponsors&logoColor=fff 10 12 13 + ```rust 14 + use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder}; 11 15 12 - todo 16 + #[tokio::main] 17 + async fn main() -> Result<(), DriveError> { 18 + // repo-stream takes any AsyncRead as input, like a tokio::fs::File 19 + let reader = tokio::fs::File::open("repo.car".into()).await?; 20 + let reader = tokio::io::BufReader::new(reader); 21 + 22 + // example repo workload is simply counting the total record bytes 23 + let mut total_size = 0; 24 + 25 + match DriverBuilder::new() 26 + .with_mem_limit_mb(10) 27 + .with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size 28 + .load_car(reader) 29 + .await? 30 + { 31 + 32 + // if all blocks fit within memory 33 + Driver::Memory(_commit, mut driver) => { 34 + while let Some(chunk) = driver.next_chunk(256).await? { 35 + for (_rkey, size) in chunk { 36 + total_size += size; 37 + } 38 + } 39 + }, 40 + 41 + // if the CAR was too big for in-memory processing 42 + Driver::Disk(paused) => { 43 + // set up a disk store we can spill to 44 + let store = DiskBuilder::new().open("some/path.db".into()).await?; 45 + // do the spilling, get back a (similar) driver 46 + let (_commit, mut driver) = paused.finish_loading(store).await?; 47 + 48 + while let Some(chunk) = driver.next_chunk(256).await? { 49 + for (_rkey, size) in chunk { 50 + total_size += size; 51 + } 52 + } 53 + 54 + // clean up the disk store (drop tables etc) 55 + driver.reset_store().await?; 56 + } 57 + }; 58 + println!("sum of size of all records: {total_size}"); 59 + Ok(()) 60 + } 61 + ``` 62 + 63 + more recent todo 13 64 14 65 - [ ] get an *emtpy* car for the test suite 15 - - [ ] implement a max size on disk limit 66 + - [x] implement a max size on disk limit 16 67 17 68 18 69 -----
-220
src/disk.rs
··· 1 - /*! 2 - Disk storage for blocks on disk 3 - 4 - Currently this uses sqlite. In testing sqlite wasn't the fastest, but it seemed 5 - to be the best behaved in terms of both on-disk space usage and memory usage. 6 - 7 - ```no_run 8 - # use repo_stream::{DiskBuilder, DiskError}; 9 - # #[tokio::main] 10 - # async fn main() -> Result<(), DiskError> { 11 - let store = DiskBuilder::new() 12 - .with_cache_size_mb(32) 13 - .with_max_stored_mb(1024) // errors when >1GiB of processed blocks are inserted 14 - .open("/some/path.db".into()).await?; 15 - # Ok(()) 16 - # } 17 - ``` 18 - */ 19 - 20 - use crate::drive::DriveError; 21 - use rusqlite::OptionalExtension; 22 - use std::path::PathBuf; 23 - 24 - #[derive(Debug, thiserror::Error)] 25 - pub enum DiskError { 26 - /// A wrapped database error 27 - /// 28 - /// (The wrapped err should probably be obscured to remove public-facing 29 - /// sqlite bits) 30 - #[error(transparent)] 31 - DbError(#[from] rusqlite::Error), 32 - /// A tokio blocking task failed to join 33 - #[error("Failed to join a tokio blocking task: {0}")] 34 - JoinError(#[from] tokio::task::JoinError), 35 - /// The total size of stored blocks exceeded the allowed size 36 - /// 37 - /// If you need to process *really* big CARs, you can configure a higher 38 - /// limit. 39 - #[error("Maximum disk size reached")] 40 - MaxSizeExceeded, 41 - #[error("this error was replaced, seeing this is a bug.")] 42 - #[doc(hidden)] 43 - Stolen, 44 - } 45 - 46 - impl DiskError { 47 - /// hack for ownership challenges with the disk driver 48 - pub(crate) fn steal(&mut self) -> Self { 49 - let mut swapped = DiskError::Stolen; 50 - std::mem::swap(self, &mut swapped); 51 - swapped 52 - } 53 - } 54 - 55 - /// Builder-style disk store setup 56 - pub struct DiskBuilder { 57 - /// Database in-memory cache allowance 58 - /// 59 - /// Default: 32 MiB 60 - pub cache_size_mb: usize, 61 - /// Database stored block size limit 62 - /// 63 - /// Default: 10 GiB 64 - /// 65 - /// Note: actual size on disk may be more, but should approximately scale 66 - /// with this limit 67 - pub max_stored_mb: usize, 68 - } 69 - 70 - impl Default for DiskBuilder { 71 - fn default() -> Self { 72 - Self { 73 - cache_size_mb: 32, 74 - max_stored_mb: 10 * 1024, // 10 GiB 75 - } 76 - } 77 - } 78 - 79 - impl DiskBuilder { 80 - /// Begin configuring the storage with defaults 81 - pub fn new() -> Self { 82 - Default::default() 83 - } 84 - /// Set the in-memory cache allowance for the database 85 - /// 86 - /// Default: 32 MiB 87 - pub fn with_cache_size_mb(mut self, size: usize) -> Self { 88 - self.cache_size_mb = size; 89 - self 90 - } 91 - /// Set the approximate stored block size limit 92 - /// 93 - /// Default: 10 GiB 94 - pub fn with_max_stored_mb(mut self, max: usize) -> Self { 95 - self.max_stored_mb = max; 96 - self 97 - } 98 - /// Open and initialize the actual disk storage 99 - pub async fn open(self, path: PathBuf) -> Result<DiskStore, DiskError> { 100 - DiskStore::new(path, self.cache_size_mb, self.max_stored_mb).await 101 - } 102 - } 103 - 104 - /// On-disk block storage 105 - pub struct DiskStore { 106 - conn: rusqlite::Connection, 107 - max_stored: usize, 108 - stored: usize, 109 - } 110 - 111 - impl DiskStore { 112 - /// Initialize a new disk store 113 - pub async fn new( 114 - path: PathBuf, 115 - cache_mb: usize, 116 - max_stored_mb: usize, 117 - ) -> Result<Self, DiskError> { 118 - let max_stored = max_stored_mb * 2_usize.pow(20); 119 - let conn = tokio::task::spawn_blocking(move || { 120 - let conn = rusqlite::Connection::open(path)?; 121 - 122 - let sqlite_one_mb = -(2_i64.pow(10)); // negative is kibibytes for sqlite cache_size 123 - 124 - // conn.pragma_update(None, "journal_mode", "OFF")?; 125 - // conn.pragma_update(None, "journal_mode", "MEMORY")?; 126 - conn.pragma_update(None, "journal_mode", "WAL")?; 127 - // conn.pragma_update(None, "wal_autocheckpoint", "0")?; // this lets things get a bit big on disk 128 - conn.pragma_update(None, "synchronous", "OFF")?; 129 - conn.pragma_update( 130 - None, 131 - "cache_size", 132 - (cache_mb as i64 * sqlite_one_mb).to_string(), 133 - )?; 134 - Self::reset_tables(&conn)?; 135 - 136 - Ok::<_, DiskError>(conn) 137 - }) 138 - .await??; 139 - 140 - Ok(Self { 141 - conn, 142 - max_stored, 143 - stored: 0, 144 - }) 145 - } 146 - pub(crate) fn get_writer(&'_ mut self) -> Result<SqliteWriter<'_>, DiskError> { 147 - let tx = self.conn.transaction()?; 148 - Ok(SqliteWriter { 149 - tx, 150 - stored: &mut self.stored, 151 - max: self.max_stored, 152 - }) 153 - } 154 - pub(crate) fn get_reader<'conn>(&'conn self) -> Result<SqliteReader<'conn>, DiskError> { 155 - let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?; 156 - Ok(SqliteReader { select_stmt }) 157 - } 158 - /// Drop and recreate the kv table 159 - pub async fn reset(self) -> Result<Self, DiskError> { 160 - tokio::task::spawn_blocking(move || { 161 - Self::reset_tables(&self.conn)?; 162 - Ok(self) 163 - }) 164 - .await? 165 - } 166 - fn reset_tables(conn: &rusqlite::Connection) -> Result<(), DiskError> { 167 - conn.execute("DROP TABLE IF EXISTS blocks", ())?; 168 - conn.execute( 169 - "CREATE TABLE blocks ( 170 - key BLOB PRIMARY KEY NOT NULL, 171 - val BLOB NOT NULL 172 - ) WITHOUT ROWID", 173 - (), 174 - )?; 175 - Ok(()) 176 - } 177 - } 178 - 179 - pub(crate) struct SqliteWriter<'conn> { 180 - tx: rusqlite::Transaction<'conn>, 181 - stored: &'conn mut usize, 182 - max: usize, 183 - } 184 - 185 - impl SqliteWriter<'_> { 186 - pub(crate) fn put_many( 187 - &mut self, 188 - kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>, 189 - ) -> Result<(), DriveError> { 190 - let mut insert_stmt = self 191 - .tx 192 - .prepare_cached("INSERT INTO blocks (key, val) VALUES (?1, ?2)") 193 - .map_err(DiskError::DbError)?; 194 - for pair in kv { 195 - let (k, v) = pair?; 196 - *self.stored += v.len(); 197 - if *self.stored > self.max { 198 - return Err(DiskError::MaxSizeExceeded.into()); 199 - } 200 - insert_stmt.execute((k, v)).map_err(DiskError::DbError)?; 201 - } 202 - Ok(()) 203 - } 204 - pub fn commit(self) -> Result<(), DiskError> { 205 - self.tx.commit()?; 206 - Ok(()) 207 - } 208 - } 209 - 210 - pub(crate) struct SqliteReader<'conn> { 211 - select_stmt: rusqlite::Statement<'conn>, 212 - } 213 - 214 - impl SqliteReader<'_> { 215 - pub(crate) fn get(&mut self, key: Vec<u8>) -> rusqlite::Result<Option<Vec<u8>>> { 216 - self.select_stmt 217 - .query_one((&key,), |row| row.get(0)) 218 - .optional() 219 - } 220 - }
-629
src/drive.rs
··· 1 - //! Consume a CAR from an AsyncRead, producing an ordered stream of records 2 - 3 - use crate::disk::{DiskError, DiskStore}; 4 - use crate::process::Processable; 5 - use ipld_core::cid::Cid; 6 - use iroh_car::CarReader; 7 - use serde::{Deserialize, Serialize}; 8 - use std::collections::HashMap; 9 - use std::convert::Infallible; 10 - use tokio::{io::AsyncRead, sync::mpsc}; 11 - 12 - use crate::mst::{Commit, Node}; 13 - use crate::walk::{Step, WalkError, Walker}; 14 - 15 - /// Errors that can happen while consuming and emitting blocks and records 16 - #[derive(Debug, thiserror::Error)] 17 - pub enum DriveError { 18 - #[error("Error from iroh_car: {0}")] 19 - CarReader(#[from] iroh_car::Error), 20 - #[error("Failed to decode commit block: {0}")] 21 - BadBlock(#[from] serde_ipld_dagcbor::DecodeError<Infallible>), 22 - #[error("The Commit block reference by the root was not found")] 23 - MissingCommit, 24 - #[error("The MST block {0} could not be found")] 25 - MissingBlock(Cid), 26 - #[error("Failed to walk the mst tree: {0}")] 27 - WalkError(#[from] WalkError), 28 - #[error("CAR file had no roots")] 29 - MissingRoot, 30 - #[error("Storage error")] 31 - StorageError(#[from] DiskError), 32 - #[error("Encode error: {0}")] 33 - BincodeEncodeError(#[from] bincode::error::EncodeError), 34 - #[error("Tried to send on a closed channel")] 35 - ChannelSendError, // SendError takes <T> which we don't need 36 - #[error("Failed to join a task: {0}")] 37 - JoinError(#[from] tokio::task::JoinError), 38 - } 39 - 40 - #[derive(Debug, thiserror::Error)] 41 - pub enum DecodeError { 42 - #[error(transparent)] 43 - BincodeDecodeError(#[from] bincode::error::DecodeError), 44 - #[error("extra bytes remained after decoding")] 45 - ExtraGarbage, 46 - } 47 - 48 - /// An in-order chunk of Rkey + (processed) Block pairs 49 - pub type BlockChunk<T> = Vec<(String, T)>; 50 - 51 - #[derive(Debug, Clone, Serialize, Deserialize)] 52 - pub(crate) enum MaybeProcessedBlock<T> { 53 - /// A block that's *probably* a Node (but we can't know yet) 54 - /// 55 - /// It *can be* a record that suspiciously looks a lot like a node, so we 56 - /// cannot eagerly turn it into a Node. We only know for sure what it is 57 - /// when we actually walk down the MST 58 - Raw(Vec<u8>), 59 - /// A processed record from a block that was definitely not a Node 60 - /// 61 - /// Processing has to be fallible because the CAR can have totally-unused 62 - /// blocks, which can just be garbage. since we're eagerly trying to process 63 - /// record blocks without knowing for sure that they *are* records, we 64 - /// discard any definitely-not-nodes that fail processing and keep their 65 - /// error in the buffer for them. if we later try to retreive them as a 66 - /// record, then we can surface the error. 67 - /// 68 - /// If we _never_ needed this block, then we may have wasted a bit of effort 69 - /// trying to process it. Oh well. 70 - /// 71 - /// There's an alternative here, which would be to kick unprocessable blocks 72 - /// back to Raw, or maybe even a new RawUnprocessable variant. Then we could 73 - /// surface the typed error later if needed by trying to reprocess. 74 - Processed(T), 75 - } 76 - 77 - impl<T: Processable> Processable for MaybeProcessedBlock<T> { 78 - /// TODO this is probably a little broken 79 - fn get_size(&self) -> usize { 80 - use std::{cmp::max, mem::size_of}; 81 - 82 - // enum is always as big as its biggest member? 83 - let base_size = max(size_of::<Vec<u8>>(), size_of::<T>()); 84 - 85 - let extra = match self { 86 - Self::Raw(bytes) => bytes.len(), 87 - Self::Processed(t) => t.get_size(), 88 - }; 89 - 90 - base_size + extra 91 - } 92 - } 93 - 94 - impl<T> MaybeProcessedBlock<T> { 95 - fn maybe(process: fn(Vec<u8>) -> T, data: Vec<u8>) -> Self { 96 - if Node::could_be(&data) { 97 - MaybeProcessedBlock::Raw(data) 98 - } else { 99 - MaybeProcessedBlock::Processed(process(data)) 100 - } 101 - } 102 - } 103 - 104 - /// Read a CAR file, buffering blocks in memory or to disk 105 - pub enum Driver<R: AsyncRead + Unpin, T: Processable> { 106 - /// All blocks fit within the memory limit 107 - /// 108 - /// You probably want to check the commit's signature. You can go ahead and 109 - /// walk the MST right away. 110 - Memory(Commit, MemDriver<T>), 111 - /// Blocks exceed the memory limit 112 - /// 113 - /// You'll need to provide a disk storage to continue. The commit will be 114 - /// returned and can be validated only once all blocks are loaded. 115 - Disk(NeedDisk<R, T>), 116 - } 117 - 118 - /// Builder-style driver setup 119 - pub struct DriverBuilder { 120 - pub mem_limit_mb: usize, 121 - } 122 - 123 - impl Default for DriverBuilder { 124 - fn default() -> Self { 125 - Self { mem_limit_mb: 16 } 126 - } 127 - } 128 - 129 - impl DriverBuilder { 130 - /// Begin configuring the driver with defaults 131 - pub fn new() -> Self { 132 - Default::default() 133 - } 134 - /// Set the in-memory size limit, in MiB 135 - /// 136 - /// Default: 16 MiB 137 - pub fn with_mem_limit_mb(self, new_limit: usize) -> Self { 138 - Self { 139 - mem_limit_mb: new_limit, 140 - } 141 - } 142 - /// Set the block processor 143 - /// 144 - /// Default: noop, raw blocks will be emitted 145 - pub fn with_block_processor<T: Processable>( 146 - self, 147 - p: fn(Vec<u8>) -> T, 148 - ) -> DriverBuilderWithProcessor<T> { 149 - DriverBuilderWithProcessor { 150 - mem_limit_mb: self.mem_limit_mb, 151 - block_processor: p, 152 - } 153 - } 154 - /// Begin processing an atproto MST from a CAR file 155 - pub async fn load_car<R: AsyncRead + Unpin>( 156 - self, 157 - reader: R, 158 - ) -> Result<Driver<R, Vec<u8>>, DriveError> { 159 - Driver::load_car(reader, crate::process::noop, self.mem_limit_mb).await 160 - } 161 - } 162 - 163 - /// Builder-style driver intermediate step 164 - /// 165 - /// start from `DriverBuilder` 166 - pub struct DriverBuilderWithProcessor<T: Processable> { 167 - pub mem_limit_mb: usize, 168 - pub block_processor: fn(Vec<u8>) -> T, 169 - } 170 - 171 - impl<T: Processable> DriverBuilderWithProcessor<T> { 172 - /// Set the in-memory size limit, in MiB 173 - /// 174 - /// Default: 16 MiB 175 - pub fn with_mem_limit_mb(mut self, new_limit: usize) -> Self { 176 - self.mem_limit_mb = new_limit; 177 - self 178 - } 179 - /// Begin processing an atproto MST from a CAR file 180 - pub async fn load_car<R: AsyncRead + Unpin>( 181 - self, 182 - reader: R, 183 - ) -> Result<Driver<R, T>, DriveError> { 184 - Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await 185 - } 186 - } 187 - 188 - impl<R: AsyncRead + Unpin, T: Processable> Driver<R, T> { 189 - /// Begin processing an atproto MST from a CAR file 190 - /// 191 - /// Blocks will be loaded, processed, and buffered in memory. If the entire 192 - /// processed size is under the `mem_limit_mb` limit, a `Driver::Memory` 193 - /// will be returned along with a `Commit` ready for validation. 194 - /// 195 - /// If the `mem_limit_mb` limit is reached before loading all blocks, the 196 - /// partial state will be returned as `Driver::Disk(needed)`, which can be 197 - /// resumed by providing a `SqliteStorage` for on-disk block storage. 198 - pub async fn load_car( 199 - reader: R, 200 - process: fn(Vec<u8>) -> T, 201 - mem_limit_mb: usize, 202 - ) -> Result<Driver<R, T>, DriveError> { 203 - let max_size = mem_limit_mb * 2_usize.pow(20); 204 - let mut mem_blocks = HashMap::new(); 205 - 206 - let mut car = CarReader::new(reader).await?; 207 - 208 - let root = *car 209 - .header() 210 - .roots() 211 - .first() 212 - .ok_or(DriveError::MissingRoot)?; 213 - log::debug!("root: {root:?}"); 214 - 215 - let mut commit = None; 216 - 217 - // try to load all the blocks into memory 218 - let mut mem_size = 0; 219 - while let Some((cid, data)) = car.next_block().await? { 220 - // the root commit is a Special Third Kind of block that we need to make 221 - // sure not to optimistically send to the processing function 222 - if cid == root { 223 - let c: Commit = serde_ipld_dagcbor::from_slice(&data)?; 224 - commit = Some(c); 225 - continue; 226 - } 227 - 228 - // remaining possible types: node, record, other. optimistically process 229 - let maybe_processed = MaybeProcessedBlock::maybe(process, data); 230 - 231 - // stash (maybe processed) blocks in memory as long as we have room 232 - mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size(); 233 - mem_blocks.insert(cid, maybe_processed); 234 - if mem_size >= max_size { 235 - return Ok(Driver::Disk(NeedDisk { 236 - car, 237 - root, 238 - process, 239 - max_size, 240 - mem_blocks, 241 - commit, 242 - })); 243 - } 244 - } 245 - 246 - // all blocks loaded and we fit in memory! hopefully we found the commit... 247 - let commit = commit.ok_or(DriveError::MissingCommit)?; 248 - 249 - let walker = Walker::new(commit.data); 250 - 251 - Ok(Driver::Memory( 252 - commit, 253 - MemDriver { 254 - blocks: mem_blocks, 255 - walker, 256 - process, 257 - }, 258 - )) 259 - } 260 - } 261 - 262 - /// The core driver between the block stream and MST walker 263 - /// 264 - /// In the future, PDSs will export CARs in a stream-friendly order that will 265 - /// enable processing them with tiny memory overhead. But that future is not 266 - /// here yet. 267 - /// 268 - /// CARs are almost always in a stream-unfriendly order, so I'm reverting the 269 - /// optimistic stream features: we load all block first, then walk the MST. 270 - /// 271 - /// This makes things much simpler: we only need to worry about spilling to disk 272 - /// in one place, and we always have a reasonable expecatation about how much 273 - /// work the init function will do. We can drop the CAR reader before walking, 274 - /// so the sync/async boundaries become a little easier to work around. 275 - #[derive(Debug)] 276 - pub struct MemDriver<T: Processable> { 277 - blocks: HashMap<Cid, MaybeProcessedBlock<T>>, 278 - walker: Walker, 279 - process: fn(Vec<u8>) -> T, 280 - } 281 - 282 - impl<T: Processable> MemDriver<T> { 283 - /// Step through the record outputs, in rkey order 284 - pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> { 285 - let mut out = Vec::with_capacity(n); 286 - for _ in 0..n { 287 - // walk as far as we can until we run out of blocks or find a record 288 - match self.walker.step(&mut self.blocks, self.process)? { 289 - Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)), 290 - Step::Finish => break, 291 - Step::Found { rkey, data } => { 292 - out.push((rkey, data)); 293 - continue; 294 - } 295 - }; 296 - } 297 - 298 - if out.is_empty() { 299 - Ok(None) 300 - } else { 301 - Ok(Some(out)) 302 - } 303 - } 304 - } 305 - 306 - /// A partially memory-loaded car file that needs disk spillover to continue 307 - pub struct NeedDisk<R: AsyncRead + Unpin, T: Processable> { 308 - car: CarReader<R>, 309 - root: Cid, 310 - process: fn(Vec<u8>) -> T, 311 - max_size: usize, 312 - mem_blocks: HashMap<Cid, MaybeProcessedBlock<T>>, 313 - pub commit: Option<Commit>, 314 - } 315 - 316 - fn encode(v: impl Serialize) -> Result<Vec<u8>, bincode::error::EncodeError> { 317 - bincode::serde::encode_to_vec(v, bincode::config::standard()) 318 - } 319 - 320 - pub(crate) fn decode<T: Processable>(bytes: &[u8]) -> Result<T, DecodeError> { 321 - let (t, n) = bincode::serde::decode_from_slice(bytes, bincode::config::standard())?; 322 - if n != bytes.len() { 323 - return Err(DecodeError::ExtraGarbage); 324 - } 325 - Ok(t) 326 - } 327 - 328 - impl<R: AsyncRead + Unpin, T: Processable + Send + 'static> NeedDisk<R, T> { 329 - pub async fn finish_loading( 330 - mut self, 331 - mut store: DiskStore, 332 - ) -> Result<(Commit, DiskDriver<T>), DriveError> { 333 - // move store in and back out so we can manage lifetimes 334 - // dump mem blocks into the store 335 - store = tokio::task::spawn(async move { 336 - let mut writer = store.get_writer()?; 337 - 338 - let kvs = self 339 - .mem_blocks 340 - .into_iter() 341 - .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 342 - 343 - writer.put_many(kvs)?; 344 - writer.commit()?; 345 - Ok::<_, DriveError>(store) 346 - }) 347 - .await??; 348 - 349 - let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(2); 350 - 351 - let store_worker = tokio::task::spawn_blocking(move || { 352 - let mut writer = store.get_writer()?; 353 - 354 - while let Some(chunk) = rx.blocking_recv() { 355 - let kvs = chunk 356 - .into_iter() 357 - .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 358 - writer.put_many(kvs)?; 359 - } 360 - 361 - writer.commit()?; 362 - Ok::<_, DriveError>(store) 363 - }); // await later 364 - 365 - // dump the rest to disk (in chunks) 366 - log::debug!("dumping the rest of the stream..."); 367 - loop { 368 - let mut mem_size = 0; 369 - let mut chunk = vec![]; 370 - loop { 371 - let Some((cid, data)) = self.car.next_block().await? else { 372 - break; 373 - }; 374 - // we still gotta keep checking for the root since we might not have it 375 - if cid == self.root { 376 - let c: Commit = serde_ipld_dagcbor::from_slice(&data)?; 377 - self.commit = Some(c); 378 - continue; 379 - } 380 - // remaining possible types: node, record, other. optimistically process 381 - // TODO: get the actual in-memory size to compute disk spill 382 - let maybe_processed = MaybeProcessedBlock::maybe(self.process, data); 383 - mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size(); 384 - chunk.push((cid, maybe_processed)); 385 - if mem_size >= self.max_size { 386 - // soooooo if we're setting the db cache to max_size and then letting 387 - // multiple chunks in the queue that are >= max_size, then at any time 388 - // we might be using some multiple of max_size? 389 - break; 390 - } 391 - } 392 - if chunk.is_empty() { 393 - break; 394 - } 395 - tx.send(chunk) 396 - .await 397 - .map_err(|_| DriveError::ChannelSendError)?; 398 - } 399 - drop(tx); 400 - log::debug!("done. waiting for worker to finish..."); 401 - 402 - store = store_worker.await??; 403 - 404 - log::debug!("worker finished."); 405 - 406 - let commit = self.commit.ok_or(DriveError::MissingCommit)?; 407 - 408 - let walker = Walker::new(commit.data); 409 - 410 - Ok(( 411 - commit, 412 - DiskDriver { 413 - process: self.process, 414 - state: Some(BigState { store, walker }), 415 - }, 416 - )) 417 - } 418 - } 419 - 420 - struct BigState { 421 - store: DiskStore, 422 - walker: Walker, 423 - } 424 - 425 - /// MST walker that reads from disk instead of an in-memory hashmap 426 - pub struct DiskDriver<T: Clone> { 427 - process: fn(Vec<u8>) -> T, 428 - state: Option<BigState>, 429 - } 430 - 431 - // for doctests only 432 - #[doc(hidden)] 433 - pub fn _get_fake_disk_driver() -> DiskDriver<Vec<u8>> { 434 - use crate::process::noop; 435 - DiskDriver { 436 - process: noop, 437 - state: None, 438 - } 439 - } 440 - 441 - impl<T: Processable + Send + 'static> DiskDriver<T> { 442 - /// Walk the MST returning up to `n` rkey + record pairs 443 - /// 444 - /// ```no_run 445 - /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, process::noop}; 446 - /// # #[tokio::main] 447 - /// # async fn main() -> Result<(), DriveError> { 448 - /// # let mut disk_driver = _get_fake_disk_driver(); 449 - /// while let Some(pairs) = disk_driver.next_chunk(256).await? { 450 - /// for (rkey, record) in pairs { 451 - /// println!("{rkey}: size={}", record.len()); 452 - /// } 453 - /// } 454 - /// let store = disk_driver.reset_store().await?; 455 - /// # Ok(()) 456 - /// # } 457 - /// ``` 458 - pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> { 459 - let process = self.process; 460 - 461 - // state should only *ever* be None transiently while inside here 462 - let mut state = self.state.take().expect("DiskDriver must have Some(state)"); 463 - 464 - // the big pain here is that we don't want to leave self.state in an 465 - // invalid state (None), so all the error paths have to make sure it 466 - // comes out again. 467 - let (state, res) = tokio::task::spawn_blocking( 468 - move || -> (BigState, Result<BlockChunk<T>, DriveError>) { 469 - let mut reader_res = state.store.get_reader(); 470 - let reader: &mut _ = match reader_res { 471 - Ok(ref mut r) => r, 472 - Err(ref mut e) => { 473 - // unfortunately we can't return the error directly because 474 - // (for some reason) it's attached to the lifetime of the 475 - // reader? 476 - // hack a mem::swap so we can get it out :/ 477 - let e_swapped = e.steal(); 478 - // the pain: `state` *has to* outlive the reader 479 - drop(reader_res); 480 - return (state, Err(e_swapped.into())); 481 - } 482 - }; 483 - 484 - let mut out = Vec::with_capacity(n); 485 - 486 - for _ in 0..n { 487 - // walk as far as we can until we run out of blocks or find a record 488 - let step = match state.walker.disk_step(reader, process) { 489 - Ok(s) => s, 490 - Err(e) => { 491 - // the pain: `state` *has to* outlive the reader 492 - drop(reader_res); 493 - return (state, Err(e.into())); 494 - } 495 - }; 496 - match step { 497 - Step::Missing(cid) => { 498 - // the pain: `state` *has to* outlive the reader 499 - drop(reader_res); 500 - return (state, Err(DriveError::MissingBlock(cid))); 501 - } 502 - Step::Finish => break, 503 - Step::Found { rkey, data } => out.push((rkey, data)), 504 - }; 505 - } 506 - 507 - // `state` *has to* outlive the reader 508 - drop(reader_res); 509 - 510 - (state, Ok::<_, DriveError>(out)) 511 - }, 512 - ) 513 - .await?; // on tokio JoinError, we'll be left with invalid state :( 514 - 515 - // *must* restore state before dealing with the actual result 516 - self.state = Some(state); 517 - 518 - let out = res?; 519 - 520 - if out.is_empty() { 521 - Ok(None) 522 - } else { 523 - Ok(Some(out)) 524 - } 525 - } 526 - 527 - fn read_tx_blocking( 528 - &mut self, 529 - n: usize, 530 - tx: mpsc::Sender<Result<BlockChunk<T>, DriveError>>, 531 - ) -> Result<(), mpsc::error::SendError<Result<BlockChunk<T>, DriveError>>> { 532 - let BigState { store, walker } = self.state.as_mut().expect("valid state"); 533 - let mut reader = match store.get_reader() { 534 - Ok(r) => r, 535 - Err(e) => return tx.blocking_send(Err(e.into())), 536 - }; 537 - 538 - loop { 539 - let mut out: BlockChunk<T> = Vec::with_capacity(n); 540 - 541 - for _ in 0..n { 542 - // walk as far as we can until we run out of blocks or find a record 543 - 544 - let step = match walker.disk_step(&mut reader, self.process) { 545 - Ok(s) => s, 546 - Err(e) => return tx.blocking_send(Err(e.into())), 547 - }; 548 - 549 - match step { 550 - Step::Missing(cid) => { 551 - return tx.blocking_send(Err(DriveError::MissingBlock(cid))); 552 - } 553 - Step::Finish => return Ok(()), 554 - Step::Found { rkey, data } => { 555 - out.push((rkey, data)); 556 - continue; 557 - } 558 - }; 559 - } 560 - 561 - if out.is_empty() { 562 - break; 563 - } 564 - tx.blocking_send(Ok(out))?; 565 - } 566 - 567 - Ok(()) 568 - } 569 - 570 - /// Spawn the disk reading task into a tokio blocking thread 571 - /// 572 - /// The idea is to avoid so much sending back and forth to the blocking 573 - /// thread, letting a blocking task do all the disk reading work and sending 574 - /// records and rkeys back through an `mpsc` channel instead. 575 - /// 576 - /// This might also allow the disk work to continue while processing the 577 - /// records. It's still not yet clear if this method actually has much 578 - /// benefit over just using `.next_chunk(n)`. 579 - /// 580 - /// ```no_run 581 - /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, process::noop}; 582 - /// # #[tokio::main] 583 - /// # async fn main() -> Result<(), DriveError> { 584 - /// # let mut disk_driver = _get_fake_disk_driver(); 585 - /// let (mut rx, join) = disk_driver.to_channel(512); 586 - /// while let Some(recvd) = rx.recv().await { 587 - /// let pairs = recvd?; 588 - /// for (rkey, record) in pairs { 589 - /// println!("{rkey}: size={}", record.len()); 590 - /// } 591 - /// 592 - /// } 593 - /// let store = join.await?.reset_store().await?; 594 - /// # Ok(()) 595 - /// # } 596 - /// ``` 597 - pub fn to_channel( 598 - mut self, 599 - n: usize, 600 - ) -> ( 601 - mpsc::Receiver<Result<BlockChunk<T>, DriveError>>, 602 - tokio::task::JoinHandle<Self>, 603 - ) { 604 - let (tx, rx) = mpsc::channel::<Result<BlockChunk<T>, DriveError>>(1); 605 - 606 - // sketch: this worker is going to be allowed to execute without a join handle 607 - let chan_task = tokio::task::spawn_blocking(move || { 608 - if let Err(mpsc::error::SendError(_)) = self.read_tx_blocking(n, tx) { 609 - log::debug!("big car reader exited early due to dropped receiver channel"); 610 - } 611 - self 612 - }); 613 - 614 - (rx, chan_task) 615 - } 616 - 617 - /// Reset the disk storage so it can be reused. You must call this. 618 - /// 619 - /// Ideally we'd put this in an `impl Drop`, but since it makes blocking 620 - /// calls, that would be risky in an async context. For now you just have to 621 - /// carefully make sure you call it. 622 - /// 623 - /// The sqlite store is returned, so it can be reused for another 624 - /// `DiskDriver`. 625 - pub async fn reset_store(mut self) -> Result<DiskStore, DriveError> { 626 - let BigState { store, .. } = self.state.take().expect("valid state"); 627 - Ok(store.reset().await?) 628 - } 629 - }
-12
src/lib.rs
··· 73 73 Find more [examples in the repo](https://tangled.org/@microcosm.blue/repo-stream/tree/main/examples). 74 74 75 75 */ 76 - 77 - pub mod mst; 78 - mod walk; 79 - 80 - pub mod disk; 81 - pub mod drive; 82 - pub mod process; 83 - 84 - pub use disk::{DiskBuilder, DiskError, DiskStore}; 85 - pub use drive::{DriveError, Driver, DriverBuilder}; 86 - pub use mst::Commit; 87 - pub use process::Processable;
-110
src/mst.rs
··· 1 - //! Low-level types for parsing raw atproto MST CARs 2 - //! 3 - //! The primary aim is to work through the **tree** structure. Non-node blocks 4 - //! are left as raw bytes, for upper levels to parse into DAG-CBOR or whatever. 5 - 6 - use ipld_core::cid::Cid; 7 - use serde::Deserialize; 8 - 9 - /// The top-level data object in a repository's tree is a signed commit. 10 - #[derive(Debug, Deserialize)] 11 - // #[serde(deny_unknown_fields)] 12 - pub struct Commit { 13 - /// the account DID associated with the repo, in strictly normalized form 14 - /// (eg, lowercase as appropriate) 15 - pub did: String, 16 - /// fixed value of 3 for this repo format version 17 - pub version: u64, 18 - /// pointer to the top of the repo contents tree structure (MST) 19 - pub data: Cid, 20 - /// revision of the repo, used as a logical clock. 21 - /// 22 - /// TID format. Must increase monotonically. Recommend using current 23 - /// timestamp as TID; rev values in the "future" (beyond a fudge factor) 24 - /// should be ignored and not processed 25 - pub rev: String, 26 - /// pointer (by hash) to a previous commit object for this repository. 27 - /// 28 - /// Could be used to create a chain of history, but largely unused (included 29 - /// for v2 backwards compatibility). In version 3 repos, this field must 30 - /// exist in the CBOR object, but is virtually always null. NOTE: previously 31 - /// specified as nullable and optional, but this caused interoperability 32 - /// issues. 33 - pub prev: Option<Cid>, 34 - /// cryptographic signature of this commit, as raw bytes 35 - #[serde(with = "serde_bytes")] 36 - pub sig: Vec<u8>, 37 - } 38 - 39 - /// MST node data schema 40 - #[derive(Debug, Deserialize, PartialEq)] 41 - #[serde(deny_unknown_fields)] 42 - pub(crate) struct Node { 43 - /// link to sub-tree Node on a lower level and with all keys sorting before 44 - /// keys at this node 45 - #[serde(rename = "l")] 46 - pub left: Option<Cid>, 47 - /// ordered list of TreeEntry objects 48 - /// 49 - /// atproto MSTs have a fanout of 4, so there can be max 4 entries. 50 - #[serde(rename = "e")] 51 - pub entries: Vec<Entry>, // maybe we can do [Option<Entry>; 4]? 52 - } 53 - 54 - impl Node { 55 - /// test if a block could possibly be a node 56 - /// 57 - /// we can't eagerly decode records except where we're *sure* they cannot be 58 - /// an mst node (and even then we can only attempt) because you can't know 59 - /// with certainty what a block is supposed to be without actually walking 60 - /// the tree. 61 - /// 62 - /// so if a block *could be* a node, any record converter must postpone 63 - /// processing. if it turns out it happens to be a very node-looking record, 64 - /// well, sorry, it just has to only be processed later when that's known. 65 - pub(crate) fn could_be(bytes: impl AsRef<[u8]>) -> bool { 66 - const NODE_FINGERPRINT: [u8; 3] = [ 67 - 0xA2, // map length 2 (for "l" and "e" keys) 68 - 0x61, // text length 1 69 - b'e', // "e" before "l" because map keys have to be lex-sorted 70 - // 0x8?: "e" has array (0x100 upper 3 bits) of some length 71 - ]; 72 - let bytes = bytes.as_ref(); 73 - bytes.starts_with(&NODE_FINGERPRINT) 74 - && bytes 75 - .get(3) 76 - .map(|b| b & 0b1110_0000 == 0x80) 77 - .unwrap_or(false) 78 - } 79 - 80 - /// Check if a node has any entries 81 - /// 82 - /// An empty repository with no records is represented as a single MST node 83 - /// with an empty array of entries. This is the only situation in which a 84 - /// tree may contain an empty leaf node which does not either contain keys 85 - /// ("entries") or point to a sub-tree containing entries. 86 - pub(crate) fn is_empty(&self) -> bool { 87 - self.left.is_none() && self.entries.is_empty() 88 - } 89 - } 90 - 91 - /// TreeEntry object 92 - #[derive(Debug, Deserialize, PartialEq)] 93 - #[serde(deny_unknown_fields)] 94 - pub(crate) struct Entry { 95 - /// count of bytes shared with previous TreeEntry in this Node (if any) 96 - #[serde(rename = "p")] 97 - pub prefix_len: usize, 98 - /// remainder of key for this TreeEntry, after "prefixlen" have been removed 99 - #[serde(rename = "k", with = "serde_bytes")] 100 - pub keysuffix: Vec<u8>, // can we String this here? 101 - /// link to the record data (CBOR) for this entry 102 - #[serde(rename = "v")] 103 - pub value: Cid, 104 - /// link to a sub-tree Node at a lower level 105 - /// 106 - /// the lower level must have keys sorting after this TreeEntry's key (to 107 - /// the "right"), but before the next TreeEntry's key in this Node (if any) 108 - #[serde(rename = "t")] 109 - pub tree: Option<Cid>, 110 - }
-87
src/process.rs
··· 1 - /*! 2 - Record processor function output trait 3 - 4 - The return type must satisfy the `Processable` trait, which requires: 5 - 6 - - `Clone` because two rkeys can refer to the same record by CID, which may 7 - only appear once in the CAR file. 8 - - `Serialize + DeserializeOwned` so it can be spilled to disk. 9 - 10 - One required function must be implemented, `get_size()`: this should return the 11 - approximate total off-stack size of the type. (the on-stack size will be added 12 - automatically via `std::mem::get_size`). 13 - 14 - Note that it is **not guaranteed** that the `process` function will run on a 15 - block before storing it in memory or on disk: it's not possible to know if a 16 - block is a record without actually walking the MST, so the best we can do is 17 - apply `process` to any block that we know *cannot* be an MST node, and otherwise 18 - store the raw block bytes. 19 - 20 - Here's a silly processing function that just collects 'eyy's found in the raw 21 - record bytes 22 - 23 - ``` 24 - # use repo_stream::Processable; 25 - # use serde::{Serialize, Deserialize}; 26 - #[derive(Debug, Clone, Serialize, Deserialize)] 27 - struct Eyy(usize, String); 28 - 29 - impl Processable for Eyy { 30 - fn get_size(&self) -> usize { 31 - // don't need to compute the usize, it's on the stack 32 - self.1.capacity() // in-mem size from the string's capacity, in bytes 33 - } 34 - } 35 - 36 - fn process(raw: Vec<u8>) -> Vec<Eyy> { 37 - let mut out = Vec::new(); 38 - let to_find = "eyy".as_bytes(); 39 - for i in 0..(raw.len() - 3) { 40 - if &raw[i..(i+3)] == to_find { 41 - out.push(Eyy(i, "eyy".to_string())); 42 - } 43 - } 44 - out 45 - } 46 - ``` 47 - 48 - The memory sizing stuff is a little sketch but probably at least approximately 49 - works. 50 - */ 51 - 52 - use serde::{Serialize, de::DeserializeOwned}; 53 - 54 - /// Output trait for record processing 55 - pub trait Processable: Clone + Serialize + DeserializeOwned { 56 - /// Any additional in-memory size taken by the processed type 57 - /// 58 - /// Do not include stack size (`std::mem::size_of`) 59 - fn get_size(&self) -> usize; 60 - } 61 - 62 - /// Processor that just returns the raw blocks 63 - #[inline] 64 - pub fn noop(block: Vec<u8>) -> Vec<u8> { 65 - block 66 - } 67 - 68 - impl Processable for u8 { 69 - fn get_size(&self) -> usize { 70 - 0 71 - } 72 - } 73 - 74 - impl Processable for usize { 75 - fn get_size(&self) -> usize { 76 - 0 // no additional space taken, just its stack size (newtype is free) 77 - } 78 - } 79 - 80 - impl<Item: Sized + Processable> Processable for Vec<Item> { 81 - fn get_size(&self) -> usize { 82 - let slot_size = std::mem::size_of::<Item>(); 83 - let direct_size = slot_size * self.capacity(); 84 - let items_referenced_size: usize = self.iter().map(|item| item.get_size()).sum(); 85 - direct_size + items_referenced_size 86 - } 87 - }
-403
src/walk.rs
··· 1 - //! Depth-first MST traversal 2 - 3 - use crate::disk::SqliteReader; 4 - use crate::drive::{DecodeError, MaybeProcessedBlock}; 5 - use crate::mst::Node; 6 - use crate::process::Processable; 7 - use ipld_core::cid::Cid; 8 - use sha2::{Digest, Sha256}; 9 - use std::collections::HashMap; 10 - use std::convert::Infallible; 11 - 12 - /// Errors that can happen while walking 13 - #[derive(Debug, thiserror::Error)] 14 - pub enum WalkError { 15 - #[error("Failed to fingerprint commit block")] 16 - BadCommitFingerprint, 17 - #[error("Failed to decode commit block: {0}")] 18 - BadCommit(#[from] serde_ipld_dagcbor::DecodeError<Infallible>), 19 - #[error("Action node error: {0}")] 20 - MstError(#[from] MstError), 21 - #[error("storage error: {0}")] 22 - StorageError(#[from] rusqlite::Error), 23 - #[error("Decode error: {0}")] 24 - DecodeError(#[from] DecodeError), 25 - } 26 - 27 - /// Errors from invalid Rkeys 28 - #[derive(Debug, PartialEq, thiserror::Error)] 29 - pub enum MstError { 30 - #[error("Failed to compute an rkey due to invalid prefix_len")] 31 - EntryPrefixOutOfbounds, 32 - #[error("RKey was not utf-8")] 33 - EntryRkeyNotUtf8(#[from] std::string::FromUtf8Error), 34 - #[error("Nodes cannot be empty (except for an entirely empty MST)")] 35 - EmptyNode, 36 - #[error("Found an entry with rkey at the wrong depth")] 37 - WrongDepth, 38 - #[error("Lost track of our depth (possible bug?)")] 39 - LostDepth, 40 - #[error("MST depth underflow: depth-0 node with child trees")] 41 - DepthUnderflow, 42 - #[error("Encountered an rkey out of order while walking the MST")] 43 - RkeyOutOfOrder, 44 - } 45 - 46 - /// Walker outputs 47 - #[derive(Debug)] 48 - pub enum Step<T> { 49 - /// We needed this CID but it's not in the block store 50 - Missing(Cid), 51 - /// Reached the end of the MST! yay! 52 - Finish, 53 - /// A record was found! 54 - Found { rkey: String, data: T }, 55 - } 56 - 57 - #[derive(Debug, Clone, PartialEq)] 58 - enum Need { 59 - Node { depth: Depth, cid: Cid }, 60 - Record { rkey: String, cid: Cid }, 61 - } 62 - 63 - #[derive(Debug, Clone, Copy, PartialEq)] 64 - enum Depth { 65 - Root, 66 - Depth(u32), 67 - } 68 - 69 - impl Depth { 70 - fn from_key(key: &[u8]) -> Self { 71 - let mut zeros = 0; 72 - for byte in Sha256::digest(key) { 73 - let leading = byte.leading_zeros(); 74 - zeros += leading; 75 - if leading < 8 { 76 - break; 77 - } 78 - } 79 - Self::Depth(zeros / 2) // truncating divide (rounds down) 80 - } 81 - fn next_expected(&self) -> Result<Option<u32>, MstError> { 82 - match self { 83 - Self::Root => Ok(None), 84 - Self::Depth(d) => d.checked_sub(1).ok_or(MstError::DepthUnderflow).map(Some), 85 - } 86 - } 87 - } 88 - 89 - fn push_from_node(stack: &mut Vec<Need>, node: &Node, parent_depth: Depth) -> Result<(), MstError> { 90 - // empty nodes are not allowed in the MST 91 - // ...except for a single one for empty MST, but we wouldn't be pushing that 92 - if node.is_empty() { 93 - return Err(MstError::EmptyNode); 94 - } 95 - 96 - let mut entries = Vec::with_capacity(node.entries.len()); 97 - let mut prefix = vec![]; 98 - let mut this_depth = parent_depth.next_expected()?; 99 - 100 - for entry in &node.entries { 101 - let mut rkey = vec![]; 102 - let pre_checked = prefix 103 - .get(..entry.prefix_len) 104 - .ok_or(MstError::EntryPrefixOutOfbounds)?; 105 - rkey.extend_from_slice(pre_checked); 106 - rkey.extend_from_slice(&entry.keysuffix); 107 - 108 - let Depth::Depth(key_depth) = Depth::from_key(&rkey) else { 109 - return Err(MstError::WrongDepth); 110 - }; 111 - 112 - // this_depth is `none` if we are the deepest child (directly below root) 113 - // in that case we accept whatever highest depth is claimed 114 - let expected_depth = match this_depth { 115 - Some(d) => d, 116 - None => { 117 - this_depth = Some(key_depth); 118 - key_depth 119 - } 120 - }; 121 - 122 - // all keys we find should be this depth 123 - if key_depth != expected_depth { 124 - return Err(MstError::DepthUnderflow); 125 - } 126 - 127 - prefix = rkey.clone(); 128 - 129 - entries.push(Need::Record { 130 - rkey: String::from_utf8(rkey)?, 131 - cid: entry.value, 132 - }); 133 - if let Some(ref tree) = entry.tree { 134 - entries.push(Need::Node { 135 - depth: Depth::Depth(key_depth), 136 - cid: *tree, 137 - }); 138 - } 139 - } 140 - 141 - entries.reverse(); 142 - stack.append(&mut entries); 143 - 144 - let d = this_depth.ok_or(MstError::LostDepth)?; 145 - 146 - if let Some(tree) = node.left { 147 - stack.push(Need::Node { 148 - depth: Depth::Depth(d), 149 - cid: tree, 150 - }); 151 - } 152 - Ok(()) 153 - } 154 - 155 - /// Traverser of an atproto MST 156 - /// 157 - /// Walks the tree from left-to-right in depth-first order 158 - #[derive(Debug)] 159 - pub struct Walker { 160 - stack: Vec<Need>, 161 - prev: String, 162 - } 163 - 164 - impl Walker { 165 - pub fn new(tree_root_cid: Cid) -> Self { 166 - Self { 167 - stack: vec![Need::Node { 168 - depth: Depth::Root, 169 - cid: tree_root_cid, 170 - }], 171 - prev: "".to_string(), 172 - } 173 - } 174 - 175 - /// Advance through nodes until we find a record or can't go further 176 - pub fn step<T: Processable>( 177 - &mut self, 178 - blocks: &mut HashMap<Cid, MaybeProcessedBlock<T>>, 179 - process: impl Fn(Vec<u8>) -> T, 180 - ) -> Result<Step<T>, WalkError> { 181 - loop { 182 - let Some(need) = self.stack.last_mut() else { 183 - log::trace!("tried to walk but we're actually done."); 184 - return Ok(Step::Finish); 185 - }; 186 - 187 - match need { 188 - &mut Need::Node { depth, cid } => { 189 - log::trace!("need node {cid:?}"); 190 - let Some(block) = blocks.remove(&cid) else { 191 - log::trace!("node not found, resting"); 192 - return Ok(Step::Missing(cid)); 193 - }; 194 - 195 - let MaybeProcessedBlock::Raw(data) = block else { 196 - return Err(WalkError::BadCommitFingerprint); 197 - }; 198 - let node = serde_ipld_dagcbor::from_slice::<Node>(&data) 199 - .map_err(WalkError::BadCommit)?; 200 - 201 - // found node, make sure we remember 202 - self.stack.pop(); 203 - 204 - // queue up work on the found node next 205 - push_from_node(&mut self.stack, &node, depth)?; 206 - } 207 - Need::Record { rkey, cid } => { 208 - log::trace!("need record {cid:?}"); 209 - // note that we cannot *remove* a record block, sadly, since 210 - // there can be multiple rkeys pointing to the same cid. 211 - let Some(data) = blocks.get_mut(cid) else { 212 - return Ok(Step::Missing(*cid)); 213 - }; 214 - let rkey = rkey.clone(); 215 - let data = match data { 216 - MaybeProcessedBlock::Raw(data) => process(data.to_vec()), 217 - MaybeProcessedBlock::Processed(t) => t.clone(), 218 - }; 219 - 220 - // found node, make sure we remember 221 - self.stack.pop(); 222 - 223 - // rkeys *must* be in order or else the tree is invalid (or 224 - // we have a bug) 225 - if rkey <= self.prev { 226 - return Err(MstError::RkeyOutOfOrder)?; 227 - } 228 - self.prev = rkey.clone(); 229 - 230 - return Ok(Step::Found { rkey, data }); 231 - } 232 - } 233 - } 234 - } 235 - 236 - /// blocking!!!!!! 237 - pub fn disk_step<T: Processable>( 238 - &mut self, 239 - reader: &mut SqliteReader, 240 - process: impl Fn(Vec<u8>) -> T, 241 - ) -> Result<Step<T>, WalkError> { 242 - loop { 243 - let Some(need) = self.stack.last_mut() else { 244 - log::trace!("tried to walk but we're actually done."); 245 - return Ok(Step::Finish); 246 - }; 247 - 248 - match need { 249 - &mut Need::Node { depth, cid } => { 250 - let cid_bytes = cid.to_bytes(); 251 - log::trace!("need node {cid:?}"); 252 - let Some(block_bytes) = reader.get(cid_bytes)? else { 253 - log::trace!("node not found, resting"); 254 - return Ok(Step::Missing(cid)); 255 - }; 256 - 257 - let block: MaybeProcessedBlock<T> = crate::drive::decode(&block_bytes)?; 258 - 259 - let MaybeProcessedBlock::Raw(data) = block else { 260 - return Err(WalkError::BadCommitFingerprint); 261 - }; 262 - let node = serde_ipld_dagcbor::from_slice::<Node>(&data) 263 - .map_err(WalkError::BadCommit)?; 264 - 265 - // found node, make sure we remember 266 - self.stack.pop(); 267 - 268 - // queue up work on the found node next 269 - push_from_node(&mut self.stack, &node, depth).map_err(WalkError::MstError)?; 270 - } 271 - Need::Record { rkey, cid } => { 272 - log::trace!("need record {cid:?}"); 273 - let cid_bytes = cid.to_bytes(); 274 - let Some(data_bytes) = reader.get(cid_bytes)? else { 275 - log::trace!("record block not found, resting"); 276 - return Ok(Step::Missing(*cid)); 277 - }; 278 - let data: MaybeProcessedBlock<T> = crate::drive::decode(&data_bytes)?; 279 - let rkey = rkey.clone(); 280 - let data = match data { 281 - MaybeProcessedBlock::Raw(data) => process(data), 282 - MaybeProcessedBlock::Processed(t) => t.clone(), 283 - }; 284 - 285 - // found node, make sure we remember 286 - self.stack.pop(); 287 - 288 - log::trace!("emitting a block as a step. depth={}", self.stack.len()); 289 - 290 - // rkeys *must* be in order or else the tree is invalid (or 291 - // we have a bug) 292 - if rkey <= self.prev { 293 - return Err(MstError::RkeyOutOfOrder)?; 294 - } 295 - self.prev = rkey.clone(); 296 - 297 - return Ok(Step::Found { rkey, data }); 298 - } 299 - } 300 - } 301 - } 302 - } 303 - 304 - #[cfg(test)] 305 - mod test { 306 - use super::*; 307 - 308 - fn cid1() -> Cid { 309 - "bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m" 310 - .parse() 311 - .unwrap() 312 - } 313 - 314 - #[test] 315 - fn test_depth_spec_0() { 316 - let d = Depth::from_key(b"2653ae71"); 317 - assert_eq!(d, Depth::Depth(0)) 318 - } 319 - 320 - #[test] 321 - fn test_depth_spec_1() { 322 - let d = Depth::from_key(b"blue"); 323 - assert_eq!(d, Depth::Depth(1)) 324 - } 325 - 326 - #[test] 327 - fn test_depth_spec_4() { 328 - let d = Depth::from_key(b"app.bsky.feed.post/454397e440ec"); 329 - assert_eq!(d, Depth::Depth(4)) 330 - } 331 - 332 - #[test] 333 - fn test_depth_spec_8() { 334 - let d = Depth::from_key(b"app.bsky.feed.post/9adeb165882c"); 335 - assert_eq!(d, Depth::Depth(8)) 336 - } 337 - 338 - #[test] 339 - fn test_depth_ietf_draft_0() { 340 - let d = Depth::from_key(b"key1"); 341 - assert_eq!(d, Depth::Depth(0)) 342 - } 343 - 344 - #[test] 345 - fn test_depth_ietf_draft_1() { 346 - let d = Depth::from_key(b"key7"); 347 - assert_eq!(d, Depth::Depth(1)) 348 - } 349 - 350 - #[test] 351 - fn test_depth_ietf_draft_4() { 352 - let d = Depth::from_key(b"key515"); 353 - assert_eq!(d, Depth::Depth(4)) 354 - } 355 - 356 - #[test] 357 - fn test_depth_interop() { 358 - // examples from https://github.com/bluesky-social/atproto-interop-tests/blob/main/mst/key_heights.json 359 - for (k, expected) in [ 360 - ("", 0), 361 - ("asdf", 0), 362 - ("blue", 1), 363 - ("2653ae71", 0), 364 - ("88bfafc7", 2), 365 - ("2a92d355", 4), 366 - ("884976f5", 6), 367 - ("app.bsky.feed.post/454397e440ec", 4), 368 - ("app.bsky.feed.post/9adeb165882c", 8), 369 - ] { 370 - let d = Depth::from_key(k.as_bytes()); 371 - assert_eq!(d, Depth::Depth(expected), "key: {}", k); 372 - } 373 - } 374 - 375 - #[test] 376 - fn test_push_empty_fails() { 377 - let empty_node = Node { 378 - left: None, 379 - entries: vec![], 380 - }; 381 - let mut stack = vec![]; 382 - let err = push_from_node(&mut stack, &empty_node, Depth::Depth(4)); 383 - assert_eq!(err, Err(MstError::EmptyNode)); 384 - } 385 - 386 - #[test] 387 - fn test_push_one_node() { 388 - let node = Node { 389 - left: Some(cid1()), 390 - entries: vec![], 391 - }; 392 - let mut stack = vec![]; 393 - push_from_node(&mut stack, &node, Depth::Depth(4)).unwrap(); 394 - assert_eq!( 395 - stack.last(), 396 - Some(Need::Node { 397 - depth: Depth::Depth(3), 398 - cid: cid1() 399 - }) 400 - .as_ref() 401 - ); 402 - } 403 - }
+16 -5
tests/non-huge-cars.rs
··· 1 1 extern crate repo_stream; 2 2 use repo_stream::Driver; 3 3 4 + const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car"); 4 5 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); 5 6 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car"); 6 7 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car"); 7 8 8 - async fn test_car(bytes: &[u8], expected_records: usize, expected_sum: usize) { 9 + async fn test_car( 10 + bytes: &[u8], 11 + expected_records: usize, 12 + expected_sum: usize, 13 + expect_profile: bool, 14 + ) { 9 15 let mut driver = match Driver::load_car(bytes, |block| block.len(), 10 /* MiB */) 10 16 .await 11 17 .unwrap() ··· 33 39 34 40 assert_eq!(records, expected_records); 35 41 assert_eq!(sum, expected_sum); 36 - assert!(found_bsky_profile); 42 + assert_eq!(found_bsky_profile, expect_profile); 43 + } 44 + 45 + #[tokio::test] 46 + async fn test_empty_car() { 47 + test_car(EMPTY_CAR, 0, 0, false).await 37 48 } 38 49 39 50 #[tokio::test] 40 51 async fn test_tiny_car() { 41 - test_car(TINY_CAR, 8, 2071).await 52 + test_car(TINY_CAR, 8, 2071, true).await 42 53 } 43 54 44 55 #[tokio::test] 45 56 async fn test_little_car() { 46 - test_car(LITTLE_CAR, 278, 246960).await 57 + test_car(LITTLE_CAR, 278, 246960, true).await 47 58 } 48 59 49 60 #[tokio::test] 50 61 async fn test_midsize_car() { 51 - test_car(MIDSIZE_CAR, 11585, 3741393).await 62 + test_car(MIDSIZE_CAR, 11585, 3741393, true).await 52 63 }