Tracks lexicons and how many times they appeared on the Jetstream.

feat(server): use snapshots for reading from handles to not block writes, updating every sync
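The idea, in miniature: each lexicon handle keeps the live fjall `Partition` for writes and publishes an immutable `Snapshot` of it behind an atomic pointer swap for reads, re-snapshotting once per sync. Below is a minimal sketch of that pattern with stand-in types (std `Arc`, `Mutex`, and a `BTreeMap` in place of the fjall partition and `rclite` Arc; only the `update_tree` name mirrors the diff, the rest is illustrative):

```rust
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};

use arc_swap::ArcSwap;

struct Handle {
    live: Mutex<BTreeMap<u64, u64>>,        // write side, stands in for the Partition
    snapshot: ArcSwap<BTreeMap<u64, u64>>,  // read side, stands in for the Snapshot
}

impl Handle {
    fn new() -> Self {
        Self {
            live: Mutex::new(BTreeMap::new()),
            snapshot: ArcSwap::from_pointee(BTreeMap::new()),
        }
    }

    // Writers only ever touch the live tree; readers never contend with them.
    fn insert(&self, k: u64, v: u64) {
        self.live.lock().unwrap().insert(k, v);
    }

    // Called once per sync: publish a fresh immutable snapshot for readers.
    fn update_tree(&self) {
        let snap = self.live.lock().unwrap().clone();
        self.snapshot.store(Arc::new(snap));
    }

    // Lock-free read against the last published snapshot.
    fn get(&self, k: u64) -> Option<u64> {
        self.snapshot.load().get(&k).copied()
    }
}

fn main() {
    let h = Handle::new();
    h.insert(1, 10);
    assert_eq!(h.get(1), None); // the write is invisible until the next sync...
    h.update_tree();
    assert_eq!(h.get(1), Some(10)); // ...and visible afterwards
}
```

The trade-off is the one the title names: reads can be up to one sync interval stale, in exchange for never blocking (or being blocked by) the write path.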


Changed files: +147 -37
server/Cargo.lock (+7)

```diff
···
 checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487"
 
 [[package]]
+name = "arc-swap"
+version = "1.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
+
+[[package]]
 name = "async-compression"
 version = "0.4.25"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "arc-swap",
  "async-trait",
  "axum",
  "axum-tws",
···
```
server/Cargo.toml (+1)

```diff
···
 parking_lot = { version = "0.12", features = ["send_guard", "hardware-lock-elision"] }
 rclite = "0.2.7"
 snmalloc-rs = "0.3.8"
+arc-swap = "1.7.1"
```
server/src/db/handle.rs (+36 -17)

```diff
···
 use std::{
     fmt::Debug,
     io::Cursor,
-    ops::{Bound, Deref, RangeBounds},
+    ops::{Bound, RangeBounds},
     sync::atomic::{AtomicU64, Ordering as AtomicOrdering},
     time::Duration,
 };
 
 use byteview::ByteView;
-use fjall::{Keyspace, Partition, PartitionCreateOptions, Slice};
+use fjall::{Keyspace, Partition, PartitionCreateOptions, Slice, Snapshot};
 use itertools::Itertools;
 use parking_lot::Mutex;
 use rayon::iter::{IntoParallelIterator, ParallelIterator};
···
 use crate::{
     db::{EventRecord, NsidHit, block},
-    error::AppResult,
-    utils::{CLOCK, DefaultRateTracker, RateTracker, ReadVariableExt, varints_unsigned_encoded},
+    error::{AppError, AppResult},
+    utils::{
+        ArcRefCnt, ArcliteSwap, CLOCK, DefaultRateTracker, RateTracker, ReadVariableExt,
+        varints_unsigned_encoded,
+    },
 };
 
 pub type ItemDecoder = block::ItemDecoder<Cursor<Slice>, NsidHit>;
···
 pub struct LexiconHandle {
-    tree: Partition,
+    write_tree: Partition,
+    read_tree: ArcliteSwap<Snapshot>,
     nsid: SmolStr,
     buf: Arc<Mutex<Vec<EventRecord>>>,
     last_insert: AtomicU64, // relaxed
···
     }
 }
 
-impl Deref for LexiconHandle {
-    type Target = Partition;
-
-    fn deref(&self) -> &Self::Target {
-        &self.tree
-    }
-}
-
 impl LexiconHandle {
     pub fn new(keyspace: &Keyspace, nsid: &str) -> Self {
         let opts = PartitionCreateOptions::default()
             .block_size(1024 * 48)
             .compression(fjall::CompressionType::Miniz(9));
+        let write_tree = keyspace.open_partition(nsid, opts).unwrap();
+        let read_tree = ArcliteSwap::new(ArcRefCnt::new(write_tree.snapshot()));
         Self {
-            tree: keyspace.open_partition(nsid, opts).unwrap(),
+            write_tree,
+            read_tree,
             nsid: nsid.into(),
             buf: Default::default(),
             last_insert: AtomicU64::new(0),
···
         }
     }
 
+    #[inline(always)]
+    pub fn read(&self) -> arc_swap::Guard<ArcRefCnt<Snapshot>> {
+        self.read_tree.load()
+    }
+
+    #[inline(always)]
+    pub fn update_tree(&self) {
+        self.read_tree
+            .store(ArcRefCnt::new(self.write_tree.snapshot()));
+    }
+
+    #[inline(always)]
     pub fn span(&self) -> tracing::Span {
         tracing::info_span!("handle", nsid = %self.nsid)
     }
 
+    #[inline(always)]
     pub fn nsid(&self) -> &SmolStr {
         &self.nsid
     }
 
+    #[inline(always)]
     pub fn item_count(&self) -> usize {
         self.buf.lock().len()
     }
···
         let end_key = varints_unsigned_encoded([end_limit]);
 
         let blocks_to_compact = self
-            .tree
+            .read()
             .range(start_key..end_key)
             .collect::<Result<Vec<_>, _>>()?;
         if blocks_to_compact.len() < 2 {
···
         let end_blocks_size = new_blocks.len();
 
         for key in keys_to_delete {
-            self.tree.remove(key.clone())?;
+            self.write_tree.remove(key.clone())?;
         }
         for block in new_blocks {
-            self.tree.insert(block.key, block.data)?;
+            self.write_tree.insert(block.key, block.data)?;
         }
 
         let reduction =
···
         );
 
         Ok(())
+    }
+
+    pub fn insert_block(&self, block: Block) -> AppResult<()> {
+        self.write_tree
+            .insert(block.key, block.data)
+            .map_err(AppError::from)
     }
 
     pub fn encode_block_from_items(
···
```
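One property worth noting about the new `read()` accessor: the `arc_swap::Guard` it returns pins whatever snapshot was current at load time, so a long `range` scan keeps reading one consistent tree even if a sync swaps in a newer snapshot mid-iteration. A tiny self-contained illustration of that guard behavior (a plain `Vec` standing in for the fjall `Snapshot`):

```rust
use std::sync::Arc;

use arc_swap::ArcSwap;

fn main() {
    let current = ArcSwap::from_pointee(vec![1, 2, 3]);
    let pinned = current.load();            // a reader pins the current snapshot
    current.store(Arc::new(vec![9]));       // a "sync" publishes a new one mid-read
    assert_eq!(*pinned, vec![1, 2, 3]);     // the reader still sees its pinned data
    assert_eq!(*current.load(), vec![9]);   // new readers get the fresh snapshot
}
```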
server/src/db/mod.rs (+37 -20)

```diff
···
 use std::{
-    collections::HashMap,
+    collections::{HashMap, HashSet},
     fmt::Debug,
     io::Cursor,
     ops::{Bound, Deref, RangeBounds},
···
     pub fn sync(&self, all: bool) -> AppResult<()> {
         let start = CLOCK.now();
         // prepare all the data
-        let mut data = Vec::with_capacity(self.hits.len());
+        let nsids_len = self.hits.len();
+        let mut data = Vec::with_capacity(nsids_len);
+        let mut nsids = HashSet::with_capacity(nsids_len);
         let _guard = scc::ebr::Guard::new();
-        for (_, handle) in self.hits.iter(&_guard) {
+        for (nsid, handle) in self.hits.iter(&_guard) {
             let mut nsid_data = Vec::with_capacity(2);
             let mut total_count = 0;
             let is_too_old = handle.since_last_activity() > self.cfg.max_last_activity;
···
                     {blocks = %nsid_data.len(), count = %total_count},
                     "will encode & sync",
                 );
+                nsids.insert(nsid.clone());
                 data.push(nsid_data);
             }
         }
···
             for (block, handle) in chunk {
                 self.sync_pool.execute(move || {
                     let _span = handle.span().entered();
-                    match handle.insert(block.key, block.data) {
+                    let written = block.written;
+                    match handle.insert_block(block) {
                         Ok(_) => {
-                            tracing::info!({count = %block.written}, "synced")
+                            tracing::info!({count = %written}, "synced")
                         }
                         Err(err) => tracing::error!({ err = %err }, "failed to sync block"),
                     }
···
             AppResult::Ok(())
         })?;
         self.sync_pool.join();
+
+        // update snapshots for all (changed) handles
+        for nsid in nsids {
+            self.hits.peek_with(&nsid, |_, handle| handle.update_tree());
+        }
+
         tracing::info!(time = %start.elapsed().as_secs_f64(), "synced all blocks");
 
         Ok(())
···
         let Some(handle) = self.get_handle(nsid) else {
             return Ok(());
         };
-        handle.compact(max_count, range, sort)
+        handle.compact(max_count, range, sort)?;
+        handle.update_tree();
+        Ok(())
     }
 
     pub fn compact_all(
···
             let Some(handle) = self.get_handle(&nsid) else {
                 continue;
             };
-            let block_lens = handle.iter().rev().try_fold(Vec::new(), |mut acc, item| {
-                let (key, value) = item?;
-                let mut timestamps = Cursor::new(key);
-                let start_timestamp = timestamps.read_varint()?;
-                let decoder = ItemDecoder::new(Cursor::new(value), start_timestamp)?;
-                acc.push(decoder.item_count());
-                AppResult::Ok(acc)
-            })?;
+            let block_lens = handle
+                .read()
+                .iter()
+                .rev()
+                .try_fold(Vec::new(), |mut acc, item| {
+                    let (key, value) = item?;
+                    let mut timestamps = Cursor::new(key);
+                    let start_timestamp = timestamps.read_varint()?;
+                    let decoder = ItemDecoder::new(Cursor::new(value), start_timestamp)?;
+                    acc.push(decoder.item_count());
+                    AppResult::Ok(acc)
+                })?;
             nsids.insert(nsid.to_smolstr(), block_lens);
         }
         Ok(DbInfo {
···
             ))
         };
 
-        let (blocks, counted) = handle
+        let (blocks, _counted) = handle
+            .read()
             .range(..end_key)
             .map(|res| res.map_err(AppError::from))
             .rev()
···
             )
             .into_inner();
 
-        tracing::info!(
-            "got blocks with size {}, item count {counted}",
-            blocks.len()
-        );
+        // tracing::info!(
+        //     "got blocks with size {}, item count {counted}",
+        //     blocks.len()
+        // );
 
         Either::Left(blocks.into_iter().rev().flatten().flatten())
     }
···
         let Some(handle) = self.get_handle("app.bsky.feed.like") else {
             return Ok(0);
         };
-        let Some((timestamps_raw, _)) = handle.first_key_value()? else {
+        let Some((timestamps_raw, _)) = handle.read().first_key_value()? else {
             return Ok(0);
         };
         let mut timestamp_reader = Cursor::new(timestamps_raw);
···
```
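A small detail in the sync closure: `insert_block` takes the whole `Block` by value, so the `written` count is copied into a local before the call; reading `block.written` in the success log after the move would not compile. The same shape in miniature (hypothetical `Block` and `insert_block`, standing in for the real ones):

```rust
struct Block {
    key: Vec<u8>,
    data: Vec<u8>,
    written: usize,
}

fn insert_block(block: Block) -> Result<(), std::io::Error> {
    // pretend the storage layer consumes key/data by value
    let _ = (block.key, block.data);
    Ok(())
}

fn main() {
    let block = Block { key: vec![1], data: vec![2, 3], written: 2 };
    let written = block.written; // copy the count out before `block` is moved
    match insert_block(block) {
        Ok(_) => println!("synced {written} items"),
        Err(err) => eprintln!("failed to sync block: {err}"),
    }
}
```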
server/src/utils.rs (+66)

```diff
···
 use std::io::{self, Read, Write};
+use std::ops::Deref;
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::time::Duration;
 
+use arc_swap::RefCnt;
 use byteview::ByteView;
 use ordered_varint::Variable;
+use rclite::Arc;
 
 pub fn get_time() -> Duration {
     std::time::SystemTime::now()
···
         }
     }
 }
+
+pub type ArcliteSwap<T> = arc_swap::ArcSwapAny<ArcRefCnt<T>>;
+
+pub struct ArcRefCnt<T>(Arc<T>);
+
+impl<T> ArcRefCnt<T> {
+    pub fn new(value: T) -> Self {
+        Self(Arc::new(value))
+    }
+}
+
+impl<T> Deref for ArcRefCnt<T> {
+    type Target = T;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl<T> Clone for ArcRefCnt<T> {
+    fn clone(&self) -> Self {
+        Self(self.0.clone())
+    }
+}
+
+// SAFETY: mirrors arc_swap's RefCnt impl for std::sync::Arc; rclite::Arc
+// upholds the same into_raw/from_raw ownership contract.
+unsafe impl<T> RefCnt for ArcRefCnt<T> {
+    type Base = T;
+
+    fn into_ptr(me: Self) -> *mut Self::Base {
+        Arc::into_raw(me.0) as *mut T
+    }
+
+    fn as_ptr(me: &Self) -> *mut Self::Base {
+        // Slightly convoluted way to do this, but this avoids stacked borrows
+        // violations. The same intention as
+        //
+        //     me as &T as *const T as *mut T
+        //
+        // We first create a "shallow copy" of me - one that doesn't really own its
+        // ref count (that's OK, me _does_ own it, so it can't be destroyed in the
+        // meantime). Then we can use into_raw (which preserves not having the ref
+        // count).
+        //
+        // We need to "revert" the changes we did. In the current std implementation,
+        // the combination of from_raw and forget is a no-op. But formally, into_raw
+        // shall be paired with from_raw, and that read shall be paired with forget,
+        // to properly "close the brackets". In future versions of std these may
+        // become something that's not really a no-op (unlikely, but possible), so we
+        // future-proof it a bit.
+
+        // SAFETY: &T cast to *const T will always be aligned, initialised and valid for reads
+        let ptr = Arc::into_raw(unsafe { std::ptr::read(&me.0) });
+        let ptr = ptr as *mut T;
+
+        // SAFETY: We got the pointer from into_raw just above
+        std::mem::forget(unsafe { Arc::from_raw(ptr) });
+
+        ptr
+    }
+
+    unsafe fn from_ptr(ptr: *const Self::Base) -> Self {
+        Self(unsafe { Arc::from_raw(ptr) })
+    }
+}
```
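The `unsafe impl RefCnt` is what lets `arc_swap::ArcSwapAny` manage `rclite::Arc` pointers, since arc-swap only ships impls for std's `Arc` and `Rc`. The contract is: `into_ptr` transfers ownership of exactly one reference to a raw pointer, `as_ptr` borrows without touching the count, and `from_ptr` reclaims a reference previously handed out. A quick sanity check of that contract against the std `Arc` impl that ships with arc-swap (the newtype impl above must behave identically, assuming `rclite::Arc::into_raw`/`from_raw` match std's semantics):

```rust
use std::sync::Arc;

use arc_swap::RefCnt;

fn main() {
    let a = Arc::new(42u32);

    // as_ptr borrows: the reference count is unchanged afterwards.
    let p = RefCnt::as_ptr(&a);
    assert_eq!(unsafe { *p }, 42);
    assert_eq!(Arc::strong_count(&a), 1);

    // into_ptr transfers ownership of one reference to a raw pointer...
    let raw = RefCnt::into_ptr(a);
    // ...and from_ptr reclaims it, "closing the brackets" with no leak.
    let back: Arc<u32> = unsafe { RefCnt::from_ptr(raw) };
    assert_eq!(*back, 42);
}
```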