High-performance implementation of plcbundle written in Rust

refactor(did_index): improve compaction strategy

+16 -30
src/did_index.rs
··· 330 330 // LRU cache for hot shards 331 331 shard_cache: Arc<RwLock<HashMap<u8, Arc<Shard>>>>, 332 332 max_cache: usize, 333 - max_segments_per_shard: usize, 334 333 335 334 config: Arc<RwLock<Config>>, 336 335 ··· 398 397 config_path, 399 398 shard_cache: Arc::new(RwLock::new(HashMap::new())), 400 399 max_cache: 5, 401 - max_segments_per_shard: 8, 402 400 config: Arc::new(RwLock::new(config.clone())), 403 401 cache_hits: AtomicI64::new(0), 404 402 cache_misses: AtomicI64::new(0), ··· 1395 1393 } 1396 1394 1397 1395 let compacted = if meta_opt.is_some() { 1398 - self.auto_compact_if_needed(shard_num)? 1396 + self.compact_shard(shard_num)?; 1397 + true 1399 1398 } else { 1400 1399 false 1401 1400 }; ··· 1681 1680 } 1682 1681 1683 1682 /// Compact pending delta segments. If `shards` is `None`, all shards are compacted. 1683 + /// Uses parallel execution across shards for faster compaction. 1684 1684 pub fn compact_pending_segments(&self, shards: Option<Vec<u8>>) -> Result<()> { 1685 + use rayon::prelude::*; 1686 + 1685 1687 match shards { 1686 1688 Some(list) if !list.is_empty() => { 1687 - for shard in list { 1688 - self.compact_shard(shard)?; 1689 - } 1689 + list.into_par_iter() 1690 + .map(|shard| self.compact_shard(shard)) 1691 + .collect::<Result<Vec<_>>>()?; 1690 1692 } 1691 1693 _ => { 1692 - for shard in 0..DID_SHARD_COUNT { 1693 - self.compact_shard(shard as u8)?; 1694 - } 1694 + (0..DID_SHARD_COUNT) 1695 + .into_par_iter() 1696 + .map(|shard| self.compact_shard(shard as u8)) 1697 + .collect::<Result<Vec<_>>>()?; 1695 1698 } 1696 1699 } 1697 1700 ··· 1761 1764 Ok(layers) 1762 1765 } 1763 1766 1764 - fn auto_compact_if_needed(&self, shard_num: u8) -> Result<bool> { 1765 - let should_compact = { 1766 - let config = self.config.read().unwrap(); 1767 - config 1768 - .shards 1769 - .get(shard_num as usize) 1770 - .map(|meta| meta.segments.len() >= self.max_segments_per_shard) 1771 - .unwrap_or(false) 1772 - }; 1773 - 1774 - if should_compact { 1775 - self.compact_shard(shard_num)?; 1776 - Ok(true) 1777 - } else { 1778 - Ok(false) 1779 - } 1780 - } 1767 + // removed: auto_compact_if_needed (compaction now happens immediately after updates) 1781 1768 1782 1769 fn compact_shard(&self, shard_num: u8) -> Result<()> { 1783 1770 use std::time::Instant; ··· 3622 3609 .get("delta_segments") 3623 3610 .and_then(|v| v.as_u64()) 3624 3611 .unwrap_or(0); 3625 - assert!( 3626 - delta_segments > 0, 3627 - "expected pending delta segments, got {}", 3628 - delta_segments 3612 + assert_eq!( 3613 + delta_segments, 0, 3614 + "expected no pending delta segments after immediate compaction" 3629 3615 ); 3630 3616 } 3631 3617
+87 -25
src/manager.rs
··· 1648 1648 /// 1649 1649 /// IMPORTANT: This method performs heavy blocking I/O and should be called from async 1650 1650 /// contexts using spawn_blocking to avoid freezing the async runtime (and HTTP server). 1651 - pub fn batch_update_did_index(&self, start_bundle: u32, end_bundle: u32) -> Result<()> { 1651 + pub fn batch_update_did_index( 1652 + &self, 1653 + start_bundle: u32, 1654 + end_bundle: u32, 1655 + compact: bool, 1656 + ) -> Result<()> { 1652 1657 use std::time::Instant; 1653 1658 1654 1659 if start_bundle > end_bundle { ··· 1657 1662 1658 1663 let total_start = Instant::now(); 1659 1664 let bundle_count = end_bundle - start_bundle + 1; 1665 + if bundle_count > 10 { 1666 + use std::time::Instant; 1667 + eprintln!( 1668 + "[DID Index] Rebuild triggered for {} bundles ({} → {})", 1669 + bundle_count, start_bundle, end_bundle 1670 + ); 1671 + let rebuild_start = Instant::now(); 1672 + let _ = self.build_did_index( 1673 + crate::constants::DID_INDEX_FLUSH_INTERVAL, 1674 + Some( 1675 + |current: u32, total: u32, bytes_processed: u64, total_bytes: u64| { 1676 + let percent = if total_bytes > 0 { 1677 + (bytes_processed as f64 / total_bytes as f64) * 100.0 1678 + } else { 1679 + 0.0 1680 + }; 1681 + eprintln!( 1682 + "[DID Index] Rebuild progress: {}/{} ({:.1}%)", 1683 + current, total, percent 1684 + ); 1685 + }, 1686 + ), 1687 + None, 1688 + None, 1689 + )?; 1690 + let dur = rebuild_start.elapsed(); 1691 + eprintln!("[DID Index] Rebuild complete in {:.1}s", dur.as_secs_f64()); 1692 + return Ok(()); 1693 + } 1660 1694 1661 1695 if *self.verbose.lock().unwrap() { 1662 1696 log::info!( ··· 1710 1744 total_operations as f64 / update_duration.as_secs_f64() 1711 1745 ); 1712 1746 1747 + // Optionally compact all shards immediately to avoid leaving delta segments 1748 + if compact { 1749 + let idx_guard = self.did_index.read().unwrap(); 1750 + if let Some(idx) = idx_guard.as_ref() { 1751 + idx.compact_pending_segments(None)?; 1752 + } 1753 + } 1754 + 1713 1755 let total_duration = total_start.elapsed(); 1714 1756 1715 1757 if *self.verbose.lock().unwrap() { ··· 1734 1776 &self, 1735 1777 start_bundle: u32, 1736 1778 end_bundle: u32, 1779 + compact: bool, 1737 1780 ) -> Result<()> { 1738 1781 let manager = self.clone_for_arc(); 1739 1782 1740 - tokio::task::spawn_blocking(move || { 1741 - manager.batch_update_did_index(start_bundle, end_bundle) 1783 + // First perform the batch update in a blocking task 1784 + let _ = tokio::task::spawn_blocking(move || { 1785 + manager.batch_update_did_index(start_bundle, end_bundle, compact) 1742 1786 }) 1743 1787 .await 1744 - .map_err(|e| anyhow::anyhow!("Batch DID index update task failed: {}", e))? 1788 + .map_err(|e| anyhow::anyhow!("Batch DID index update task failed: {}", e))?; 1789 + 1790 + Ok(()) 1745 1791 } 1746 1792 1747 1793 /// Fetch and save next bundle from PLC directory ··· 1750 1796 &self, 1751 1797 client: &crate::plc_client::PLCClient, 1752 1798 shutdown_rx: Option<tokio::sync::watch::Receiver<bool>>, 1799 + update_did_index: bool, 1753 1800 ) -> Result<SyncResult> { 1754 1801 use crate::sync::{get_boundary_cids, strip_boundary_duplicates}; 1755 1802 use std::time::Instant; ··· 1873 1920 } 1874 1921 1875 1922 let fetch_op_start = Instant::now(); 1876 - if let Some(ref rx) = shutdown_rx && *rx.borrow() { 1923 + if let Some(ref rx) = shutdown_rx 1924 + && *rx.borrow() 1925 + { 1877 1926 anyhow::bail!("Shutdown requested"); 1878 1927 } 1879 1928 let (plc_ops, wait_dur, http_dur) = if let Some(rx) = shutdown_rx.clone() { ··· 2064 2113 index_write_time, 2065 2114 did_index_compacted, 2066 2115 ) = self 2067 - .save_bundle_with_timing(next_bundle_num, operations) 2116 + .save_bundle_with_timing(next_bundle_num, operations, update_did_index) 2068 2117 .await?; 2069 2118 let save_duration = save_start.elapsed(); 2070 2119 ··· 2113 2162 let total_duration_ms = (fetch_total_duration + save_duration).as_millis() as u64; 2114 2163 let fetch_duration_ms = fetch_total_duration.as_millis() as u64; 2115 2164 2116 - // Calculate separate timings: bundle save (serialize + compress + hash) vs index (did_index + index_write) 2117 - let bundle_save_ms = (serialize_time + compress_time + hash_time).as_millis() as u64; 2118 - let index_ms = (did_index_time + index_write_time).as_millis() as u64; 2165 + // Calculate separate timings: bundle save vs index write/DID index 2166 + let (bundle_save_ms, index_ms) = if update_did_index { 2167 + ( 2168 + (serialize_time + compress_time + hash_time).as_millis() as u64, 2169 + (did_index_time + index_write_time).as_millis() as u64, 2170 + ) 2171 + } else { 2172 + ( 2173 + (serialize_time + compress_time + hash_time + index_write_time).as_millis() as u64, 2174 + 0, 2175 + ) 2176 + }; 2119 2177 2120 2178 // Only log detailed info in verbose mode 2121 2179 if *self.verbose.lock().unwrap() { ··· 2164 2222 let mut synced = 0; 2165 2223 2166 2224 loop { 2167 - match self.sync_next_bundle(client, None).await { 2225 + match self.sync_next_bundle(client, None, true).await { 2168 2226 Ok(SyncResult::BundleCreated { .. }) => { 2169 2227 synced += 1; 2170 2228 ··· 2193 2251 &self, 2194 2252 bundle_num: u32, 2195 2253 operations: Vec<Operation>, 2254 + update_did_index: bool, 2196 2255 ) -> Result<( 2197 2256 std::time::Duration, 2198 2257 std::time::Duration, ··· 2491 2550 ); 2492 2551 } 2493 2552 2494 - // Update DID index (now fast with delta segments) 2495 - let did_index_start = Instant::now(); 2496 - let did_ops: Vec<(String, bool)> = operations 2497 - .iter() 2498 - .map(|op| (op.did.clone(), op.nullified)) 2499 - .collect(); 2553 + let (did_index_time, did_index_compacted) = if update_did_index { 2554 + let did_index_start = Instant::now(); 2555 + let did_ops: Vec<(String, bool)> = operations 2556 + .iter() 2557 + .map(|op| (op.did.clone(), op.nullified)) 2558 + .collect(); 2500 2559 2501 - self.ensure_did_index()?; 2502 - let did_index_compacted = self 2503 - .did_index 2504 - .write() 2505 - .unwrap() 2506 - .as_mut() 2507 - .unwrap() 2508 - .update_for_bundle(bundle_num, did_ops)?; 2509 - let did_index_time = did_index_start.elapsed(); 2560 + self.ensure_did_index()?; 2561 + let compacted = self 2562 + .did_index 2563 + .write() 2564 + .unwrap() 2565 + .as_mut() 2566 + .unwrap() 2567 + .update_for_bundle(bundle_num, did_ops)?; 2568 + (did_index_start.elapsed(), compacted) 2569 + } else { 2570 + (std::time::Duration::from_millis(0), false) 2571 + }; 2510 2572 2511 2573 // Update main index 2512 2574 let index_write_start = Instant::now();
+10 -8
src/plc_client.rs
··· 73 73 after: &str, 74 74 count: usize, 75 75 ) -> Result<(Vec<PLCOperation>, Duration, Duration)> { 76 - self.fetch_operations_with_retry_cancelable(after, count, 5, None).await 76 + self.fetch_operations_with_retry_cancelable(after, count, 5, None) 77 + .await 77 78 } 78 79 79 80 pub async fn fetch_operations_cancelable( ··· 82 83 count: usize, 83 84 shutdown_rx: Option<tokio::sync::watch::Receiver<bool>>, 84 85 ) -> Result<(Vec<PLCOperation>, Duration, Duration)> { 85 - self.fetch_operations_with_retry_cancelable(after, count, 5, shutdown_rx).await 86 + self.fetch_operations_with_retry_cancelable(after, count, 5, shutdown_rx) 87 + .await 86 88 } 87 89 88 90 async fn fetch_operations_with_retry_cancelable( ··· 98 100 let mut total_http = Duration::from_secs(0); 99 101 100 102 for attempt in 1..=max_retries { 101 - if let Some(ref rx) = shutdown_rx && *rx.borrow() { 103 + if let Some(ref rx) = shutdown_rx 104 + && *rx.borrow() 105 + { 102 106 anyhow::bail!("Shutdown requested"); 103 107 } 104 - let export_url = format!( 105 - "{}/export?after={}&count={}", 106 - self.base_url, after, count 107 - ); 108 + let export_url = format!("{}/export?after={}&count={}", self.base_url, after, count); 108 109 109 110 let permits = self.rate_limiter.available_permits(); 110 111 let requests_in_period = self.count_requests_in_period(); ··· 369 370 let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(0)); 370 371 let sem_clone = semaphore.clone(); 371 372 372 - let refill_rate = Duration::from_secs_f64(period.as_secs_f64() / requests_per_period as f64); 373 + let refill_rate = 374 + Duration::from_secs_f64(period.as_secs_f64() / requests_per_period as f64); 373 375 374 376 // Spawn background task to refill permits at steady rate 375 377 // CRITICAL: Add first permit immediately, then refill at steady rate
+60 -13
src/sync.rs
··· 289 289 } else { 290 290 format!("{:.0}KB", size_kb) 291 291 }; 292 - 293 - eprintln!( 294 - "[INFO] → Bundle {:06} | {} | {} dids | {} | fetch: {:.2}s ({} reqs, {:.1}s wait) | save: {}ms | index: {}ms | {}", 292 + let base = format!( 293 + "[INFO] → Bundle {:06} | {} | {} dids | {} | fetch: {:.2}s ({} reqs, {:.1}s wait) | save: {}ms", 295 294 bundle_num, 296 295 hash, 297 296 unique_dids, ··· 299 298 fetch_secs, 300 299 fetch_requests, 301 300 wait_secs, 302 - bundle_save_ms, 303 - index_ms, 304 - age 301 + bundle_save_ms 305 302 ); 303 + if index_ms > 0 { 304 + eprintln!("{} | index: {}ms | {}", base, index_ms, age); 305 + } else { 306 + eprintln!("{} | {}", base, age); 307 + } 306 308 } 307 309 308 310 fn on_caught_up( ··· 504 506 505 507 match self 506 508 .manager 507 - .sync_next_bundle(&self.client, None) 509 + .sync_next_bundle(&self.client, None, true) 508 510 .await 509 511 { 510 512 Ok(crate::manager::SyncResult::BundleCreated { ··· 597 599 598 600 let mut total_synced = 0u32; 599 601 let mut is_initial_sync = true; 602 + let mut did_index_batch_done = false; 603 + let mut initial_sync_first_bundle: Option<u32> = None; 600 604 601 605 // Notify logger that sync is starting 602 606 if let Some(logger) = &self.logger { ··· 635 639 636 640 let sync_result = self 637 641 .manager 638 - .sync_next_bundle(&self.client, self.config.shutdown_rx.clone()) 642 + .sync_next_bundle( 643 + &self.client, 644 + self.config.shutdown_rx.clone(), 645 + !is_initial_sync, 646 + ) 639 647 .await; 640 648 641 649 match sync_result { ··· 656 664 fetch_http_ms, 657 665 }) => { 658 666 total_synced += 1; 667 + if is_initial_sync && initial_sync_first_bundle.is_none() { 668 + initial_sync_first_bundle = Some(bundle_num); 669 + } 659 670 660 671 // Reset error counter on successful sync 661 672 use std::sync::atomic::{AtomicU32, Ordering}; ··· 748 759 } 749 760 750 761 // Caught up to the end of the chain 751 - // Mark initial sync as complete ONLY if we actually synced at least one bundle. 752 - // This prevents premature "initial sync complete" when we just have a full 753 - // mempool from a previous run but still have thousands of bundles to sync. 754 - if is_initial_sync && total_synced > 0 { 755 - is_initial_sync = false; 762 + // When initial sync finishes, perform a single batch DID index update if the index is empty 763 + // or if we created bundles during initial sync with per-bundle updates disabled. 764 + if is_initial_sync && !did_index_batch_done { 765 + let stats = self.manager.get_did_index_stats(); 766 + let total_dids = stats 767 + .get("total_dids") 768 + .and_then(|v| v.as_u64()) 769 + .unwrap_or(0); 770 + let total_entries = stats 771 + .get("total_entries") 772 + .and_then(|v| v.as_u64()) 773 + .unwrap_or(0); 756 774 775 + let end_bundle = self.manager.get_last_bundle(); 776 + let start_bundle = initial_sync_first_bundle.unwrap_or(1); 777 + 778 + // Only run batch update if there are bundles to process and either index is empty 779 + // or we created some bundles during this initial sync. 780 + let created_bundles = total_synced > 0; 781 + let index_is_empty = total_dids == 0 && total_entries == 0; 782 + if end_bundle >= start_bundle && (index_is_empty || created_bundles) { 783 + if self.config.verbose { 784 + eprintln!( 785 + "[Sync] Performing batch DID index update: {} → {} (index empty={}, created_bundles={})", 786 + start_bundle, end_bundle, index_is_empty, created_bundles 787 + ); 788 + } 789 + if let Err(e) = self 790 + .manager 791 + .batch_update_did_index_async(start_bundle, end_bundle, true) 792 + .await 793 + { 794 + eprintln!( 795 + "[Sync] Batch DID index update failed after initial sync: {}", 796 + e 797 + ); 798 + } else { 799 + did_index_batch_done = true; 800 + } 801 + } 802 + 803 + is_initial_sync = false; 757 804 self.handle_event(&SyncEvent::InitialSyncComplete { 758 805 total_bundles: total_synced, 759 806 mempool_count,
+2 -2
tests/manager.rs
··· 29 29 30 30 // Build DID index so DID lookups work 31 31 manager 32 - .batch_update_did_index_async(1, manager.get_last_bundle()) 32 + .batch_update_did_index_async(1, manager.get_last_bundle(), false) 33 33 .await?; 34 34 35 35 // Query DID operations and resolve DID ··· 127 127 common::add_dummy_bundle(&dir2_path)?; 128 128 let manager2 = plcbundle::BundleManager::new(dir2_path.clone(), ())?; 129 129 manager2 130 - .batch_update_did_index_async(1, manager2.get_last_bundle()) 130 + .batch_update_did_index_async(1, manager2.get_last_bundle(), false) 131 131 .await?; 132 132 133 133 // Verify we can query DID operations from the newly built index
+2 -2
tests/server.rs
··· 58 58 let manager = Arc::new(manager); 59 59 // Build DID index so the resolver can find operations in bundles 60 60 manager 61 - .batch_update_did_index_async(1, manager.get_last_bundle()) 61 + .batch_update_did_index_async(1, manager.get_last_bundle(), false) 62 62 .await?; 63 63 let port = 3032; 64 64 let server_handle = common::start_test_server(Arc::clone(&manager), port).await?; ··· 149 149 let manager = Arc::new(manager); 150 150 // Ensure DID index is available for data/op lookups 151 151 manager 152 - .batch_update_did_index_async(1, manager.get_last_bundle()) 152 + .batch_update_did_index_async(1, manager.get_last_bundle(), false) 153 153 .await?; 154 154 let port = 3031; 155 155 let server_handle = common::start_test_server(Arc::clone(&manager), port).await?;