+24
-10
crates/jacquard-repo/src/car/writer.rs
+24
-10
crates/jacquard-repo/src/car/writer.rs
···
23
23
blocks: BTreeMap<IpldCid, Bytes>,
24
24
) -> Result<()> {
25
25
let path = path.as_ref();
26
-
let file = File::create(path)
27
-
.await
28
-
.map_err(|e| RepoError::io(e).with_context(format!("creating CAR file: {}", path.display())))?;
26
+
let file = File::create(path).await.map_err(|e| {
27
+
RepoError::io(e).with_context(format!("creating CAR file: {}", path.display()))
28
+
})?;
29
29
30
30
let header = iroh_car::CarHeader::new_v1(roots);
31
31
let mut writer = CarWriter::new(header, file);
···
37
37
.map_err(|e| RepoError::car(e).with_context(format!("writing block {}", cid)))?;
38
38
}
39
39
40
-
writer.finish().await.map_err(|e| RepoError::car(e).with_context("finalizing CAR file"))?;
40
+
writer
41
+
.finish()
42
+
.await
43
+
.map_err(|e| RepoError::car(e).with_context("finalizing CAR file"))?;
41
44
42
45
Ok(())
43
46
}
···
58
61
.map_err(|e| RepoError::car(e).with_context(format!("writing block {}", cid)))?;
59
62
}
60
63
61
-
writer.finish().await.map_err(|e| RepoError::car(e).with_context("finalizing CAR bytes"))?;
64
+
writer
65
+
.finish()
66
+
.await
67
+
.map_err(|e| RepoError::car(e).with_context("finalizing CAR bytes"))?;
62
68
63
-
buffer.flush().await.map_err(|e| RepoError::io(e).with_context("flushing CAR buffer"))?;
69
+
buffer
70
+
.flush()
71
+
.await
72
+
.map_err(|e| RepoError::io(e).with_context("flushing CAR buffer"))?;
64
73
65
74
Ok(buffer)
66
75
}
···
73
82
/// - All record blocks (from storage)
74
83
///
75
84
/// Uses streaming to avoid loading all blocks into memory.
85
+
///
86
+
/// Should write blocks in the correct order for [streaming CAR processing](https://github.com/bluesky-social/proposals/blob/main/0006-sync-iteration/README.md#streaming-car-processing) from sync v1.1
76
87
pub async fn export_repo_car<S: BlockStore + Sync + 'static>(
77
88
path: impl AsRef<Path>,
78
89
commit_cid: IpldCid,
79
90
mst: &Mst<S>,
80
91
) -> Result<()> {
81
92
let path = path.as_ref();
82
-
let file = File::create(path)
83
-
.await
84
-
.map_err(|e| RepoError::io(e).with_context(format!("creating CAR export file: {}", path.display())))?;
93
+
let file = File::create(path).await.map_err(|e| {
94
+
RepoError::io(e).with_context(format!("creating CAR export file: {}", path.display()))
95
+
})?;
85
96
86
97
let header = iroh_car::CarHeader::new_v1(vec![commit_cid]);
87
98
let mut writer = CarWriter::new(header, file);
···
105
116
mst.write_blocks_to_car(&mut writer).await?;
106
117
107
118
// Finish writing
108
-
writer.finish().await.map_err(|e| RepoError::car(e).with_context("finalizing CAR export"))?;
119
+
writer
120
+
.finish()
121
+
.await
122
+
.map_err(|e| RepoError::car(e).with_context("finalizing CAR export"))?;
109
123
110
124
Ok(())
111
125
}
+190
-6
crates/jacquard-repo/src/commit/firehose.rs
+190
-6
crates/jacquard-repo/src/commit/firehose.rs
···
87
87
88
88
/// DEPRECATED: Unused
89
89
pub rebase: bool,
90
+
91
+
/// Debug: block sources for validation analysis
92
+
#[cfg(debug_assertions)]
93
+
pub block_sources: BTreeMap<IpldCid, String>,
94
+
95
+
#[cfg(debug_assertions)]
96
+
pub excluded_blocks: BTreeMap<IpldCid, Vec<u8>>, // blocks we skipped
97
+
98
+
#[cfg(debug_assertions)]
99
+
pub excluded_metadata: BTreeMap<IpldCid, Vec<String>>, // context about excluded blocks
90
100
}
91
101
92
102
/// A repository operation (mutation of a single record)
···
193
203
blobs: self.blobs.into_iter().map(|b| b.into_static()).collect(),
194
204
too_big: self.too_big,
195
205
rebase: self.rebase,
206
+
#[cfg(debug_assertions)]
207
+
block_sources: self.block_sources,
208
+
#[cfg(debug_assertions)]
209
+
excluded_blocks: self.excluded_blocks,
210
+
#[cfg(debug_assertions)]
211
+
excluded_metadata: self.excluded_metadata,
196
212
}
197
213
}
198
214
}
···
218
234
use crate::mst::{Mst, VerifiedWriteOp};
219
235
use crate::storage::{BlockStore, LayeredBlockStore, MemoryBlockStore};
220
236
use cid::Cid as IpldCid;
237
+
use std::collections::BTreeMap;
221
238
use std::sync::Arc;
222
239
223
240
impl<'a> FirehoseCommit<'a> {
···
324
341
/// **Inductive property:** Can validate without any external state besides the blocks
325
342
/// in this message. The `prev_data` field provides the starting MST root, and operations
326
343
/// include `prev` CIDs for validation. All necessary blocks must be in the CAR bytes.
344
+
///
345
+
/// Note: Because this uses the same merkle search tree struct as the repository itself,
346
+
/// this is far from the most efficient validation function possible. The repo
347
+
/// tree struct carries extra information. However,
348
+
/// it has the virtue of making everything self-validating.
327
349
pub async fn validate_v1_1(&self, pubkey: &PublicKey<'_>) -> Result<IpldCid> {
328
350
// 1. Require prev_data for v1.1
329
351
let prev_data_cid: IpldCid = self
···
337
359
338
360
// 2. Parse CAR blocks from the firehose message into temporary storage
339
361
let parsed = parse_car_bytes(&self.blocks).await?;
362
+
363
+
#[cfg(debug_assertions)]
364
+
let provided_blocks = parsed
365
+
.blocks
366
+
.keys()
367
+
.cloned()
368
+
.collect::<std::collections::HashSet<_>>();
369
+
#[cfg(debug_assertions)]
370
+
let accessed_blocks = Arc::new(std::sync::RwLock::new(std::collections::HashSet::new()));
371
+
372
+
#[cfg(debug_assertions)]
373
+
let missing_blocks = Arc::new(std::sync::RwLock::new(Vec::new()));
374
+
375
+
#[cfg(debug_assertions)]
376
+
let block_categories = self.block_sources.clone();
377
+
378
+
#[cfg(debug_assertions)]
379
+
let excluded_blocks_ref = self.excluded_blocks.clone();
380
+
340
381
let temp_storage = Arc::new(MemoryBlockStore::new_from_blocks(parsed.blocks));
341
382
383
+
#[cfg(debug_assertions)]
384
+
let tracking_storage = {
385
+
use crate::storage::BlockStore;
386
+
#[derive(Clone)]
387
+
struct TrackingStorage {
388
+
inner: Arc<MemoryBlockStore>,
389
+
accessed: Arc<std::sync::RwLock<std::collections::HashSet<IpldCid>>>,
390
+
missing: Arc<std::sync::RwLock<Vec<IpldCid>>>,
391
+
excluded: BTreeMap<IpldCid, Vec<u8>>,
392
+
}
393
+
impl BlockStore for TrackingStorage {
394
+
async fn get(&self, cid: &IpldCid) -> Result<Option<Bytes>> {
395
+
self.accessed.write().unwrap().insert(*cid);
396
+
let result = self.inner.get(cid).await?;
397
+
398
+
if result.is_none() {
399
+
// Check if this block was excluded
400
+
if let Some(excluded_block) = self.excluded.get(cid) {
401
+
self.missing.write().unwrap().push(*cid);
402
+
eprintln!(
403
+
"[MISS] Block {} was EXCLUDED but needed during validation",
404
+
cid
405
+
);
406
+
// Return the excluded block so validation can continue
407
+
return Ok(Some(Bytes::copy_from_slice(&excluded_block)));
408
+
} else {
409
+
self.missing.write().unwrap().push(*cid);
410
+
eprintln!(
411
+
"[MISS] Block {} not found (never seen during commit creation)",
412
+
cid
413
+
);
414
+
}
415
+
}
416
+
417
+
Ok(result)
418
+
}
419
+
async fn put(&self, data: &[u8]) -> Result<IpldCid> {
420
+
self.inner.put(data).await
421
+
}
422
+
async fn has(&self, cid: &IpldCid) -> Result<bool> {
423
+
self.inner.has(cid).await
424
+
}
425
+
async fn put_many(
426
+
&self,
427
+
blocks: impl IntoIterator<Item = (IpldCid, Bytes)> + Send,
428
+
) -> Result<()> {
429
+
self.inner.put_many(blocks).await
430
+
}
431
+
async fn get_many(&self, cids: &[IpldCid]) -> Result<Vec<Option<Bytes>>> {
432
+
self.inner.get_many(cids).await
433
+
}
434
+
async fn apply_commit(&self, commit: crate::repo::CommitData) -> Result<()> {
435
+
self.inner.apply_commit(commit).await
436
+
}
437
+
}
438
+
Arc::new(TrackingStorage {
439
+
inner: temp_storage.clone(),
440
+
accessed: accessed_blocks.clone(),
441
+
missing: missing_blocks.clone(),
442
+
excluded: excluded_blocks_ref,
443
+
})
444
+
};
445
+
446
+
#[cfg(not(debug_assertions))]
447
+
let tracking_storage = temp_storage.clone();
448
+
342
449
// 3. Extract and verify commit object from temporary storage
343
450
let commit_cid: IpldCid = self
344
451
.commit
345
452
.to_ipld()
346
453
.map_err(|e| RepoError::invalid_cid_conversion(e, "commit CID"))?;
347
-
let commit_bytes = temp_storage
454
+
let commit_bytes = tracking_storage
348
455
.get(&commit_cid)
349
456
.await?
350
457
.ok_or_else(|| RepoError::not_found("commit block", &commit_cid))?;
···
366
473
367
474
// 5. Load new MST from commit.data (claimed result)
368
475
let expected_root = *commit.data();
369
-
let mut new_mst = Mst::load(temp_storage, expected_root, None);
476
+
477
+
let mut new_mst = Mst::load(tracking_storage, expected_root, None);
370
478
371
479
let verified_ops = self
372
480
.ops
···
401
509
)));
402
510
}
403
511
512
+
#[cfg(debug_assertions)]
513
+
{
514
+
let accessed = accessed_blocks.read().unwrap();
515
+
let missing = missing_blocks.read().unwrap();
516
+
let unused: Vec<_> = provided_blocks.difference(&*accessed).copied().collect();
517
+
518
+
println!("[validation stats]");
519
+
println!(" provided: {} blocks", provided_blocks.len());
520
+
println!(" accessed: {} blocks", accessed.len());
521
+
522
+
if !missing.is_empty() {
523
+
println!(
524
+
" MISSING: {} blocks NEEDED but not provided!",
525
+
missing.len()
526
+
);
527
+
528
+
// Show operation breakdown for this commit
529
+
let mut op_counts: BTreeMap<&str, usize> = BTreeMap::new();
530
+
for op in &self.ops {
531
+
*op_counts.entry(op.action.as_ref()).or_insert(0) += 1;
532
+
}
533
+
println!(" operations in this commit:");
534
+
for (action, count) in op_counts {
535
+
println!(" {}: {}", action, count);
536
+
}
537
+
538
+
println!(" missing block CIDs:");
539
+
for cid in missing.iter() {
540
+
if self.excluded_blocks.contains_key(cid) {
541
+
println!(" {} (was excluded)", cid);
542
+
if let Some(metadata) = self.excluded_metadata.get(cid) {
543
+
for context in metadata {
544
+
println!(" -> {}", context);
545
+
}
546
+
}
547
+
} else {
548
+
println!(" {} (never seen)", cid);
549
+
}
550
+
}
551
+
}
552
+
553
+
if !unused.is_empty() {
554
+
println!(
555
+
" UNUSED: {} blocks ({}%)",
556
+
unused.len(),
557
+
(unused.len() * 100) / provided_blocks.len()
558
+
);
559
+
560
+
// Show breakdown by category
561
+
let mut category_stats: BTreeMap<&str, (usize, usize)> = BTreeMap::new();
562
+
for (cid, category) in &block_categories {
563
+
let stats = category_stats.entry(category).or_insert((0, 0));
564
+
stats.0 += 1; // total provided
565
+
if accessed.contains(cid) {
566
+
stats.1 += 1; // accessed
567
+
}
568
+
}
569
+
570
+
println!("\n breakdown by category:");
571
+
for (category, (total, accessed_count)) in category_stats {
572
+
let unused_count = total - accessed_count;
573
+
let unused_pct = if total > 0 {
574
+
(unused_count * 100) / total
575
+
} else {
576
+
0
577
+
};
578
+
println!(
579
+
" {}: {} total, {} accessed, {} unused ({}%)",
580
+
category, total, accessed_count, unused_count, unused_pct
581
+
);
582
+
}
583
+
} else {
584
+
println!(" ✓ all blocks were used");
585
+
}
586
+
}
587
+
404
588
Ok(expected_root)
405
589
}
406
590
}
···
408
592
#[cfg(test)]
409
593
mod tests {
410
594
use super::*;
411
-
use crate::commit::{Commit, SigningKey as _};
595
+
use crate::Repository;
596
+
use crate::commit::Commit;
412
597
use crate::mst::{Mst, RecordWriteOp};
413
598
use crate::storage::MemoryBlockStore;
414
-
use crate::{CommitData, Repository};
415
599
use jacquard_common::types::crypto::{KeyCodec, PublicKey};
416
600
use jacquard_common::types::recordkey::Rkey;
417
601
use jacquard_common::types::string::{Nsid, RecordKey};
···
720
904
let parsed = parse_car_bytes(&firehose_commit.blocks).await.unwrap();
721
905
let commit_cid: IpldCid = firehose_commit.commit.to_ipld().unwrap();
722
906
723
-
let mut blocks_without_commit: BTreeMap<IpldCid, bytes::Bytes> = parsed
907
+
let blocks_without_commit: BTreeMap<IpldCid, bytes::Bytes> = parsed
724
908
.blocks
725
909
.into_iter()
726
910
.filter(|(cid, _)| cid != &commit_cid)
···
851
1035
.insert(fake_commit_cid, bytes::Bytes::from(fake_commit_cbor));
852
1036
commit_data.cid = fake_commit_cid;
853
1037
854
-
let mut firehose_commit = commit_data
1038
+
let firehose_commit = commit_data
855
1039
.to_firehose_commit(&did, 1, Datetime::now(), repo_ops, vec![])
856
1040
.await
857
1041
.unwrap();
+47
crates/jacquard-repo/src/mst/cursor.rs
+47
crates/jacquard-repo/src/mst/cursor.rs
···
7
7
use cid::Cid as IpldCid;
8
8
use smol_str::SmolStr;
9
9
10
+
#[cfg(debug_assertions)]
11
+
use std::collections::HashSet;
12
+
#[cfg(debug_assertions)]
13
+
use std::sync::{Arc, RwLock};
14
+
10
15
/// Position within an MST traversal
11
16
#[derive(Debug, Clone)]
12
17
pub enum CursorPosition<S: BlockStore> {
···
65
70
66
71
/// Current position in traversal
67
72
current: CursorPosition<S>,
73
+
74
+
/// Track CIDs accessed during traversal (debug only)
75
+
#[cfg(debug_assertions)]
76
+
accessed_cids: Option<Arc<RwLock<HashSet<IpldCid>>>>,
68
77
}
69
78
70
79
impl<S: BlockStore + Sync + 'static> MstCursor<S> {
···
76
85
Self {
77
86
path: Vec::new(),
78
87
current: CursorPosition::Tree { mst: root },
88
+
#[cfg(debug_assertions)]
89
+
accessed_cids: None,
90
+
}
91
+
}
92
+
93
+
/// Create new cursor with CID access tracking enabled
94
+
///
95
+
/// Records all CIDs accessed during traversal in the provided set.
96
+
#[cfg(debug_assertions)]
97
+
pub fn new_with_tracking(root: Mst<S>, tracking: Arc<RwLock<HashSet<IpldCid>>>) -> Self {
98
+
Self {
99
+
path: Vec::new(),
100
+
current: CursorPosition::Tree { mst: root },
101
+
accessed_cids: Some(tracking),
79
102
}
80
103
}
81
104
···
103
126
/// If at the root level (before stepping in), returns root's layer + 1.
104
127
pub async fn layer(&self) -> Result<usize> {
105
128
if let Some((walking_node, _, _)) = self.path.last() {
129
+
// Track CID access
130
+
#[cfg(debug_assertions)]
131
+
if let Some(ref tracking) = self.accessed_cids {
132
+
if let Ok(cid) = walking_node.get_pointer().await {
133
+
tracking.write().unwrap().insert(cid);
134
+
}
135
+
}
136
+
106
137
// We're inside a node - return its layer
107
138
walking_node.get_layer().await
108
139
} else {
···
111
142
// is one layer higher than being "inside" the root
112
143
match &self.current {
113
144
CursorPosition::Tree { mst } => {
145
+
// Track CID access
146
+
#[cfg(debug_assertions)]
147
+
if let Some(ref tracking) = self.accessed_cids {
148
+
if let Ok(cid) = mst.get_pointer().await {
149
+
tracking.write().unwrap().insert(cid);
150
+
}
151
+
}
152
+
114
153
let root_layer = mst.get_layer().await?;
115
154
Ok(root_layer + 1)
116
155
}
···
186
225
187
226
/// Descend into a tree node
188
227
async fn step_into(&mut self, mst: Mst<S>) -> Result<()> {
228
+
// Track CID access
229
+
#[cfg(debug_assertions)]
230
+
if let Some(ref tracking) = self.accessed_cids {
231
+
if let Ok(cid) = mst.get_pointer().await {
232
+
tracking.write().unwrap().insert(cid);
233
+
}
234
+
}
235
+
189
236
let entries = mst.get_entries().await?;
190
237
191
238
if entries.is_empty() {
+40
-1
crates/jacquard-repo/src/mst/diff.rs
+40
-1
crates/jacquard-repo/src/mst/diff.rs
···
4
4
use std::future::Future;
5
5
use std::pin::Pin;
6
6
7
+
#[cfg(debug_assertions)]
8
+
use std::collections::HashSet;
9
+
#[cfg(debug_assertions)]
10
+
use std::sync::{Arc, RwLock};
11
+
7
12
use super::cursor::{CursorPosition, MstCursor};
8
13
use super::tree::Mst;
9
14
use super::util::serialize_node_data;
···
60
65
/// When modifying a tree, old MST nodes along changed paths become unreachable.
61
66
/// This tracks those nodes for garbage collection.
62
67
pub removed_mst_blocks: Vec<IpldCid>,
68
+
69
+
/// CIDs accessed from old tree during diff (debug only)
70
+
///
71
+
/// Tracks all blocks touched when walking the old tree during diff.
72
+
/// This is the precise set of blocks needed for validation.
73
+
#[cfg(debug_assertions)]
74
+
pub old_tree_accessed: Vec<IpldCid>,
75
+
76
+
/// CIDs accessed from new tree during diff (debug only)
77
+
#[cfg(debug_assertions)]
78
+
pub new_tree_accessed: Vec<IpldCid>,
63
79
}
64
80
65
81
use super::tree::VerifiedWriteOp;
···
75
91
removed_cids: Vec::new(),
76
92
new_mst_blocks: BTreeMap::new(),
77
93
removed_mst_blocks: Vec::new(),
94
+
#[cfg(debug_assertions)]
95
+
old_tree_accessed: Vec::new(),
96
+
#[cfg(debug_assertions)]
97
+
new_tree_accessed: Vec::new(),
78
98
}
79
99
}
80
100
···
234
254
return Ok(());
235
255
}
236
256
237
-
// CIDs differ - use cursors to walk both trees
257
+
// CIDs differ - use cursors to walk both trees with tracking
258
+
#[cfg(debug_assertions)]
259
+
let old_tracking = Arc::new(RwLock::new(HashSet::new()));
260
+
#[cfg(debug_assertions)]
261
+
let new_tracking = Arc::new(RwLock::new(HashSet::new()));
262
+
263
+
#[cfg(debug_assertions)]
264
+
let mut old_cursor = MstCursor::new_with_tracking(old.clone(), old_tracking.clone());
265
+
#[cfg(debug_assertions)]
266
+
let mut new_cursor = MstCursor::new_with_tracking(new.clone(), new_tracking.clone());
267
+
268
+
#[cfg(not(debug_assertions))]
238
269
let mut old_cursor = MstCursor::new(old.clone());
270
+
#[cfg(not(debug_assertions))]
239
271
let mut new_cursor = MstCursor::new(new.clone());
240
272
241
273
// Don't advance yet - let loop handle roots like any other tree comparison
···
393
425
}
394
426
}
395
427
}
428
+
}
429
+
430
+
// Collect tracking data
431
+
#[cfg(debug_assertions)]
432
+
{
433
+
diff.old_tree_accessed = old_tracking.read().unwrap().iter().copied().collect();
434
+
diff.new_tree_accessed = new_tracking.read().unwrap().iter().copied().collect();
396
435
}
397
436
398
437
Ok(())
+121
-118
crates/jacquard-repo/src/repo.rs
+121
-118
crates/jacquard-repo/src/repo.rs
···
58
58
59
59
/// CIDs of blocks to delete
60
60
pub deleted_cids: Vec<IpldCid>,
61
+
62
+
/// Debug: block sources for validation analysis
63
+
#[cfg(debug_assertions)]
64
+
pub block_sources: BTreeMap<IpldCid, String>,
65
+
#[cfg(debug_assertions)]
66
+
pub excluded_blocks: BTreeMap<IpldCid, Bytes>, // blocks we skipped
67
+
#[cfg(debug_assertions)]
68
+
pub excluded_metadata: BTreeMap<IpldCid, Vec<String>>, // context about excluded blocks
61
69
}
62
70
63
71
impl CommitData {
···
91
99
blobs,
92
100
too_big: false,
93
101
rebase: false,
102
+
#[cfg(debug_assertions)]
103
+
block_sources: self.block_sources.clone(),
104
+
#[cfg(debug_assertions)]
105
+
excluded_blocks: self
106
+
.excluded_blocks
107
+
.iter()
108
+
.map(|(cid, b)| (cid.clone(), b.to_vec()))
109
+
.collect(),
110
+
#[cfg(debug_assertions)]
111
+
excluded_metadata: self.excluded_metadata.clone(),
94
112
})
95
113
}
96
114
}
···
233
251
blocks: blocks.clone(),
234
252
relevant_blocks: blocks,
235
253
deleted_cids: Vec::new(),
254
+
#[cfg(debug_assertions)]
255
+
block_sources: BTreeMap::new(),
256
+
#[cfg(debug_assertions)]
257
+
excluded_blocks: BTreeMap::new(),
258
+
#[cfg(debug_assertions)]
259
+
excluded_metadata: BTreeMap::new(),
236
260
})
237
261
}
238
262
···
346
370
// But these bulk operations would benefit significantly from cursor's skip_subtree()
347
371
// to avoid traversing unrelated branches when searching lexicographically-organized data.
348
372
349
-
/// Apply record write operations with inline data
350
-
///
351
-
/// Serializes record data to DAG-CBOR, computes CIDs, stores data blocks,
352
-
/// then applies write operations to the MST. Returns the diff for inspection.
353
-
///
354
-
/// For creating commits with operations, use `create_commit()` instead.
355
-
pub async fn apply_record_writes(&mut self, ops: &[RecordWriteOp<'_>]) -> Result<MstDiff> {
356
-
use smol_str::format_smolstr;
357
-
358
-
let mut updated_tree = self.mst.clone();
359
-
360
-
for op in ops {
361
-
updated_tree = match op {
362
-
RecordWriteOp::Create {
363
-
collection,
364
-
rkey,
365
-
record,
366
-
} => {
367
-
let key = format_smolstr!("{}/{}", collection.as_ref(), rkey.as_ref());
368
-
369
-
// Serialize record to DAG-CBOR
370
-
let cbor = serde_ipld_dagcbor::to_vec(record).map_err(|e| {
371
-
RepoError::serialization(e).with_context(format!(
372
-
"serializing record data for {}/{}",
373
-
collection.as_ref(),
374
-
rkey.as_ref()
375
-
))
376
-
})?;
377
-
378
-
// Compute CID and store data
379
-
let cid = self.storage.put(&cbor).await?;
380
-
381
-
updated_tree.add(key.as_str(), cid).await?
382
-
}
383
-
RecordWriteOp::Update {
384
-
collection,
385
-
rkey,
386
-
record,
387
-
prev,
388
-
} => {
389
-
let key = format_smolstr!("{}/{}", collection.as_ref(), rkey.as_ref());
390
-
391
-
// Serialize record to DAG-CBOR
392
-
let cbor = serde_ipld_dagcbor::to_vec(record).map_err(|e| {
393
-
RepoError::serialization(e).with_context(format!(
394
-
"serializing record data for {}/{}",
395
-
collection.as_ref(),
396
-
rkey.as_ref()
397
-
))
398
-
})?;
399
-
400
-
// Compute CID and store data
401
-
let cid = self.storage.put(&cbor).await?;
402
-
403
-
// Validate prev if provided
404
-
if let Some(prev_cid) = prev {
405
-
if &cid != prev_cid {
406
-
return Err(RepoError::cid_mismatch(format!(
407
-
"Update prev CID mismatch for key {}: expected {}, got {}",
408
-
key, prev_cid, cid
409
-
)));
410
-
}
411
-
}
412
-
413
-
updated_tree.add(key.as_str(), cid).await?
414
-
}
415
-
RecordWriteOp::Delete {
416
-
collection,
417
-
rkey,
418
-
prev,
419
-
} => {
420
-
let key = format_smolstr!("{}/{}", collection.as_ref(), rkey.as_ref());
421
-
422
-
// Check exists
423
-
let current = self
424
-
.mst
425
-
.get(key.as_str())
426
-
.await?
427
-
.ok_or_else(|| RepoError::not_found("record", key.as_str()))?;
428
-
429
-
// Validate prev if provided
430
-
if let Some(prev_cid) = prev {
431
-
if &current != prev_cid {
432
-
return Err(RepoError::cid_mismatch(format!(
433
-
"Delete prev CID mismatch for key {}: expected {}, got {}",
434
-
key, prev_cid, current
435
-
)));
436
-
}
437
-
}
438
-
439
-
updated_tree.delete(key.as_str()).await?
440
-
}
441
-
};
442
-
}
443
-
444
-
// Compute diff before updating
445
-
let diff = self.mst.diff(&updated_tree).await?;
446
-
447
-
println!("Repo before:\n{}", self);
448
-
// Update mst
449
-
self.mst = updated_tree;
450
-
451
-
println!("Repo after:\n{}", self);
452
-
Ok(diff)
453
-
}
454
-
455
373
/// Create a commit from record write operations
456
374
///
457
375
/// Applies write operations, creates signed commit, and collects blocks:
···
578
496
let mut blocks = diff.new_mst_blocks;
579
497
let mut relevant_blocks = BTreeMap::new();
580
498
581
-
// Add the previous MST root block (needed to load prev_data in validation)
582
-
if let Some(prev_root_block) = self.storage.get(&prev_data).await? {
583
-
relevant_blocks.insert(prev_data, prev_root_block);
584
-
}
499
+
#[cfg(debug_assertions)]
500
+
let mut block_sources: BTreeMap<IpldCid, &str> = BTreeMap::new();
501
+
502
+
// // Add the previous MST root block (needed to load prev_data in validation)
503
+
// if let Some(prev_root_block) = self.storage.get(&prev_data).await? {
504
+
// #[cfg(debug_assertions)]
505
+
// block_sources.insert(prev_data, "prev_root");
506
+
// relevant_blocks.insert(prev_data, prev_root_block);
507
+
// }
585
508
586
-
// Walk paths in both old and new trees for each operation
509
+
let mut new_tree_cids = std::collections::HashSet::new();
587
510
for op in ops {
588
511
let key = format_smolstr!("{}/{}", op.collection().as_ref(), op.rkey().as_ref());
589
-
590
512
updated_tree
591
513
.blocks_for_path(&key, &mut relevant_blocks)
592
514
.await?;
593
515
594
-
self.mst.blocks_for_path(&key, &mut relevant_blocks).await?;
516
+
let new_path_cids = updated_tree.cids_for_path(&key).await?;
517
+
for cid in &new_path_cids {
518
+
new_tree_cids.insert(*cid);
519
+
#[cfg(debug_assertions)]
520
+
block_sources.insert(*cid, "new_tree_path");
521
+
}
522
+
}
523
+
let mut old_path_blocks = BTreeMap::new();
524
+
let mut old_path_cids = std::collections::HashSet::new();
525
+
526
+
// Step 2: Walk OLD tree, only add blocks NOT in new tree (changed nodes)
527
+
for op in ops {
528
+
let key = format_smolstr!("{}/{}", op.collection().as_ref(), op.rkey().as_ref());
529
+
530
+
self.mst.blocks_for_path(&key, &mut old_path_blocks).await?;
531
+
for cid in updated_tree.cids_for_path(&key).await? {
532
+
old_path_cids.insert(cid);
533
+
}
534
+
}
535
+
536
+
let mut excluded_blocks = BTreeMap::new();
537
+
#[cfg(debug_assertions)]
538
+
let mut excluded_metadata: BTreeMap<IpldCid, Vec<String>> = BTreeMap::new();
539
+
540
+
// Re-walk old tree paths to collect metadata about excluded blocks
541
+
#[cfg(debug_assertions)]
542
+
for (op_idx, op) in ops.iter().enumerate() {
543
+
let key = format_smolstr!("{}/{}", op.collection().as_ref(), op.rkey().as_ref());
544
+
let old_path_cids = self.mst.cids_for_path(&key).await?;
545
+
546
+
for (depth, cid) in old_path_cids.iter().enumerate() {
547
+
if !new_tree_cids.contains(cid) {
548
+
let metadata = format!("op#{} ({}) path depth {}", op_idx, key, depth);
549
+
excluded_metadata.entry(*cid).or_insert_with(Vec::new).push(metadata);
550
+
}
551
+
}
595
552
}
596
553
597
-
// Add new leaf blocks to both collections (single iteration)
598
-
for (cid, block) in &leaf_blocks {
599
-
if diff.new_leaf_cids.contains(cid) {
600
-
blocks.insert(*cid, block.clone());
601
-
relevant_blocks.insert(*cid, block.clone());
554
+
for (cid, block) in old_path_blocks.into_iter() {
555
+
// Only include if this block CHANGED (different CID in new tree)
556
+
if !new_tree_cids.contains(&cid) {
557
+
//relevant_blocks.insert(cid, block);
558
+
excluded_blocks.insert(cid, block);
559
+
#[cfg(debug_assertions)]
560
+
block_sources.insert(cid, "old_tree_changed");
602
561
}
603
562
}
604
563
564
+
// // Add new leaf blocks to both collections (single iteration)
565
+
// for (cid, block) in &leaf_blocks {
566
+
// if diff.new_leaf_cids.contains(cid) {
567
+
// blocks.insert(*cid, block.clone());
568
+
// #[cfg(debug_assertions)]
569
+
// block_sources.insert(*cid, "new_leaf");
570
+
// relevant_blocks.insert(*cid, block.clone());
571
+
// }
572
+
// }
573
+
574
+
// For DELETE operations, we need the deleted record blocks for inversion
575
+
// (when inverting a delete, we insert the prev CID back)
576
+
// for cid in &deleted_cids {
577
+
// if let Some(block) = self.storage.get(cid).await? {
578
+
// #[cfg(debug_assertions)]
579
+
// block_sources.insert(*cid, "deleted_leaf");
580
+
// relevant_blocks.insert(*cid, block);
581
+
// }
582
+
// }
583
+
605
584
// Step 6: Create and sign commit
606
585
let rev = Ticker::new().next(Some(self.commit.rev.clone()));
607
586
let commit = Commit::new_unsigned(did.clone().into_static(), data, rev.clone(), prev)
···
613
592
614
593
// Step 7: Add commit block to both collections
615
594
blocks.insert(commit_cid, commit_bytes.clone());
595
+
#[cfg(debug_assertions)]
596
+
block_sources.insert(commit_cid, "commit");
616
597
relevant_blocks.insert(commit_cid, commit_bytes);
617
598
599
+
#[cfg(debug_assertions)]
600
+
{
601
+
let mut by_source: BTreeMap<&str, usize> = BTreeMap::new();
602
+
for source in block_sources.values() {
603
+
*by_source.entry(source).or_insert(0) += 1;
604
+
}
605
+
println!("[commit creation] relevant_blocks by source:");
606
+
for (source, count) in by_source {
607
+
println!(" {}: {}", source, count);
608
+
}
609
+
println!(" TOTAL: {}", relevant_blocks.len());
610
+
}
611
+
618
612
// Step 8: Update internal MST state
619
613
self.mst = updated_tree;
620
614
···
630
624
blocks,
631
625
relevant_blocks,
632
626
deleted_cids,
627
+
#[cfg(debug_assertions)]
628
+
block_sources: block_sources
629
+
.into_iter()
630
+
.map(|(k, v)| (k, v.to_string()))
631
+
.collect(),
632
+
#[cfg(debug_assertions)]
633
+
excluded_blocks,
634
+
#[cfg(debug_assertions)]
635
+
excluded_metadata,
633
636
},
634
637
))
635
638
}
+1
-1
crates/jacquard-repo/tests/firehose_stress.rs
crates/jacquard-repo/tests/large_proof_tests.rs
+1
-1
crates/jacquard-repo/tests/firehose_stress.rs
crates/jacquard-repo/tests/large_proof_tests.rs
···
338
338
339
339
firehose_commit.validate_v1_1(&pubkey).await.unwrap();
340
340
341
-
for batch_num in 1..=2000 {
341
+
for batch_num in 1..=5000 {
342
342
let batch_size = rng.gen_range(1..=20);
343
343
let ops = generate_random_ops(&mut rng, &mut tracker, batch_size);
344
344
let record_writes = test_ops_to_record_writes(ops, &collection);