Tracks lexicons and how many times they appeared on the Jetstream.

Compare changes

Choose any two refs to compare.

Changed files
+929 -38
server
-1
server/Cargo.lock
··· 1580 1580 "tower-http", 1581 1581 "tracing", 1582 1582 "tracing-subscriber", 1583 - "zstd", 1584 1583 ] 1585 1584 1586 1585 [[package]]
-1
server/Cargo.toml
··· 30 30 rayon = "1.10.0" 31 31 parking_lot = { version = "0.12", features = ["send_guard", "hardware-lock-elision"] } 32 32 rclite = "0.2.7" 33 - zstd = "0.13.3" 34 33 35 34 [target.'cfg(target_env = "msvc")'.dependencies] 36 35 snmalloc-rs = "0.3.8"
-19
server/src/db/mod.rs
··· 380 380 }) 381 381 } 382 382 383 - // train zstd dict with 100 blocks from every lexicon 384 - pub fn train_zstd_dict(&self) -> AppResult<Vec<u8>> { 385 - let samples = self 386 - .get_nsids() 387 - .filter_map(|nsid| self.get_handle(&nsid)) 388 - .map(|handle| { 389 - handle 390 - .iter() 391 - .rev() 392 - .map(|res| { 393 - res.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)) 394 - .map(|(_, value)| Cursor::new(value)) 395 - }) 396 - .take(1000) 397 - }) 398 - .flatten(); 399 - zstd::dict::from_sample_iterator(samples, 1024 * 128).map_err(AppError::from) 400 - } 401 - 402 383 pub fn get_hits( 403 384 &self, 404 385 nsid: &str,
+501
server/src/db_old/block.rs
··· 1 + use ordered_varint::Variable; 2 + use rkyv::{ 3 + Archive, Deserialize, Serialize, 4 + api::high::{HighSerializer, HighValidator}, 5 + bytecheck::CheckBytes, 6 + de::Pool, 7 + rancor::{self, Strategy}, 8 + ser::allocator::ArenaHandle, 9 + util::AlignedVec, 10 + }; 11 + use std::{ 12 + io::{self, Read, Write}, 13 + marker::PhantomData, 14 + }; 15 + 16 + use crate::error::{AppError, AppResult}; 17 + 18 + pub struct Item<T> { 19 + pub timestamp: u64, 20 + data: AlignedVec, 21 + phantom: PhantomData<T>, 22 + } 23 + 24 + impl<T: Archive> Item<T> { 25 + pub fn access(&self) -> &T::Archived { 26 + unsafe { rkyv::access_unchecked::<T::Archived>(&self.data) } 27 + } 28 + } 29 + 30 + impl<T> Item<T> 31 + where 32 + T: Archive, 33 + T::Archived: for<'a> CheckBytes<HighValidator<'a, rancor::Error>> 34 + + Deserialize<T, Strategy<Pool, rancor::Error>>, 35 + { 36 + pub fn deser(&self) -> AppResult<T> { 37 + rkyv::from_bytes(&self.data).map_err(AppError::from) 38 + } 39 + } 40 + 41 + impl<T: for<'a> Serialize<HighSerializer<AlignedVec, ArenaHandle<'a>, rancor::Error>>> Item<T> { 42 + pub fn new(timestamp: u64, data: &T) -> Self { 43 + Item { 44 + timestamp, 45 + data: unsafe { rkyv::to_bytes(data).unwrap_unchecked() }, 46 + phantom: PhantomData, 47 + } 48 + } 49 + } 50 + 51 + pub struct ItemEncoder<W: Write, T> { 52 + writer: W, 53 + prev_timestamp: u64, 54 + prev_delta: i64, 55 + _item: PhantomData<T>, 56 + } 57 + 58 + impl<W: Write, T> ItemEncoder<W, T> { 59 + pub fn new(writer: W) -> Self { 60 + ItemEncoder { 61 + writer, 62 + prev_timestamp: 0, 63 + prev_delta: 0, 64 + _item: PhantomData, 65 + } 66 + } 67 + 68 + pub fn encode(&mut self, item: &Item<T>) -> AppResult<()> { 69 + if self.prev_timestamp == 0 { 70 + // self.writer.write_varint(item.timestamp)?; 71 + self.prev_timestamp = item.timestamp; 72 + self.write_data(&item.data)?; 73 + return Ok(()); 74 + } 75 + 76 + let delta = (item.timestamp as i128 - self.prev_timestamp as i128) as i64; 77 + 78 + 
self.writer.write_varint(delta - self.prev_delta)?; 79 + self.prev_timestamp = item.timestamp; 80 + self.prev_delta = delta; 81 + 82 + self.write_data(&item.data)?; 83 + 84 + Ok(()) 85 + } 86 + 87 + fn write_data(&mut self, data: &[u8]) -> AppResult<()> { 88 + self.writer.write_varint(data.len())?; 89 + self.writer.write_all(data)?; 90 + Ok(()) 91 + } 92 + 93 + pub fn finish(mut self) -> AppResult<W> { 94 + self.writer.flush()?; 95 + Ok(self.writer) 96 + } 97 + } 98 + 99 + pub struct ItemDecoder<R, T> { 100 + reader: R, 101 + current_timestamp: u64, 102 + current_delta: i64, 103 + first_item: bool, 104 + _item: PhantomData<T>, 105 + } 106 + 107 + impl<R: Read, T: Archive> ItemDecoder<R, T> { 108 + pub fn new(reader: R, start_timestamp: u64) -> AppResult<Self> { 109 + Ok(ItemDecoder { 110 + reader, 111 + current_timestamp: start_timestamp, 112 + current_delta: 0, 113 + first_item: true, 114 + _item: PhantomData, 115 + }) 116 + } 117 + 118 + pub fn decode(&mut self) -> AppResult<Option<Item<T>>> { 119 + if self.first_item { 120 + // read the first timestamp 121 + // let timestamp = match self.reader.read_varint::<u64>() { 122 + // Ok(timestamp) => timestamp, 123 + // Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None), 124 + // Err(e) => return Err(e.into()), 125 + // }; 126 + // self.current_timestamp = timestamp; 127 + 128 + let Some(data_raw) = self.read_item()? else { 129 + return Ok(None); 130 + }; 131 + self.first_item = false; 132 + return Ok(Some(Item { 133 + timestamp: self.current_timestamp, 134 + data: data_raw, 135 + phantom: PhantomData, 136 + })); 137 + } 138 + 139 + let Some(_delta) = self.read_timestamp()? else { 140 + return Ok(None); 141 + }; 142 + 143 + // read data 144 + let data_raw = match self.read_item()? 
{ 145 + Some(data_raw) => data_raw, 146 + None => { 147 + return Err(io::Error::new( 148 + io::ErrorKind::UnexpectedEof, 149 + "expected data after delta", 150 + ) 151 + .into()); 152 + } 153 + }; 154 + 155 + Ok(Some(Item { 156 + timestamp: self.current_timestamp, 157 + data: data_raw, 158 + phantom: PhantomData, 159 + })) 160 + } 161 + 162 + // [10, 11, 12, 14] -> [1, 1, 2] -> [0, 1] 163 + fn read_timestamp(&mut self) -> AppResult<Option<u64>> { 164 + let delta = match self.reader.read_varint::<i64>() { 165 + Ok(delta) => delta, 166 + Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None), 167 + Err(e) => return Err(e.into()), 168 + }; 169 + self.current_delta += delta; 170 + self.current_timestamp = 171 + (self.current_timestamp as i128 + self.current_delta as i128) as u64; 172 + Ok(Some(self.current_timestamp)) 173 + } 174 + 175 + fn read_item(&mut self) -> AppResult<Option<AlignedVec>> { 176 + let data_len = match self.reader.read_varint::<usize>() { 177 + Ok(data_len) => data_len, 178 + Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None), 179 + Err(e) => return Err(e.into()), 180 + }; 181 + let mut data_raw = AlignedVec::with_capacity(data_len); 182 + for _ in 0..data_len { 183 + data_raw.push(0); 184 + } 185 + self.reader.read_exact(data_raw.as_mut_slice())?; 186 + Ok(Some(data_raw)) 187 + } 188 + } 189 + 190 + impl<R: Read, T: Archive> Iterator for ItemDecoder<R, T> { 191 + type Item = AppResult<Item<T>>; 192 + 193 + fn next(&mut self) -> Option<Self::Item> { 194 + self.decode().transpose() 195 + } 196 + } 197 + 198 + pub trait WriteVariableExt: Write { 199 + fn write_varint(&mut self, value: impl Variable) -> io::Result<usize> { 200 + value.encode_variable(self) 201 + } 202 + } 203 + impl<W: Write> WriteVariableExt for W {} 204 + 205 + pub trait ReadVariableExt: Read { 206 + fn read_varint<T: Variable>(&mut self) -> io::Result<T> { 207 + T::decode_variable(self) 208 + } 209 + } 210 + impl<R: Read> ReadVariableExt for R {} 
#[cfg(test)]
mod test {
    use super::*;
    use rkyv::{Archive, Deserialize, Serialize};
    use std::io::Cursor;

    #[derive(Archive, Deserialize, Serialize, Debug, PartialEq)]
    #[rkyv(compare(PartialEq))]
    struct TestData {
        id: u32,
        value: String,
    }

    /// Builds a `TestData` item — cuts per-test construction boilerplate.
    fn item(timestamp: u64, id: u32, value: &str) -> Item<TestData> {
        Item::new(
            timestamp,
            &TestData {
                id,
                value: value.to_string(),
            },
        )
    }

    /// Encodes `items` into a buffer, then decodes the buffer back with
    /// `start` as the block start timestamp and returns the result.
    fn roundtrip(items: &[Item<TestData>], start: u64) -> Vec<Item<TestData>> {
        let mut buffer = Vec::new();
        let mut encoder = ItemEncoder::new(&mut buffer);
        for it in items {
            encoder.encode(it).unwrap();
        }
        encoder.finish().unwrap();

        let decoder = ItemDecoder::<_, TestData>::new(Cursor::new(buffer), start).unwrap();
        decoder.collect::<Result<Vec<_>, _>>().unwrap()
    }

    /// Asserts that decoded items match the originals field-by-field.
    fn assert_same(original: &[Item<TestData>], decoded: &[Item<TestData>]) {
        assert_eq!(original.len(), decoded.len());
        for (o, d) in original.iter().zip(decoded) {
            assert_eq!(o.timestamp, d.timestamp);
            assert_eq!(o.access().id, d.access().id);
            assert_eq!(o.access().value.as_str(), d.access().value.as_str());
        }
    }

    #[test]
    fn test_encoder_decoder_single_item() {
        let items = [item(1000, 123, "test")];
        assert_same(&items, &roundtrip(&items, 1000));
    }

    #[test]
    fn test_encoder_decoder_multiple_items() {
        let items = [
            item(1000, 1, "first"),
            item(1010, 2, "second"),
            item(1015, 3, "third"),
            item(1025, 4, "fourth"),
        ];
        assert_same(&items, &roundtrip(&items, 1000));
    }

    #[test]
    fn test_encoder_decoder_with_iterator() {
        // `roundtrip` consumes the decoder through its Iterator impl
        let items = [item(2000, 10, "a"), item(2005, 20, "b"), item(2012, 30, "c")];
        let decoded = roundtrip(&items, 2000);
        assert_same(&items, &decoded);
        assert_eq!(decoded[0].timestamp, 2000);
        assert_eq!(decoded[1].timestamp, 2005);
        assert_eq!(decoded[2].timestamp, 2012);
    }

    #[test]
    fn test_delta_compression() {
        // deltas 10, 10, 5 -> delta-of-deltas 10, 0, -5
        let items = [
            item(1000, 1, "a"),
            item(1010, 2, "b"),
            item(1020, 3, "c"),
            item(1025, 4, "d"),
        ];
        assert_same(&items, &roundtrip(&items, 1000));
    }

    #[test]
    fn test_empty_decode() {
        let mut decoder =
            ItemDecoder::<_, TestData>::new(Cursor::new(Vec::new()), 1000).unwrap();
        assert!(decoder.decode().unwrap().is_none());
    }

    #[test]
    fn test_backwards_timestamp() {
        // negative deltas must survive the round trip
        let items = [item(1000, 1, "first"), item(900, 2, "second")];
        let decoded = roundtrip(&items, 1000);
        assert_same(&items, &decoded);
        assert_eq!(decoded[0].timestamp, 1000);
        assert_eq!(decoded[1].timestamp, 900);
    }

    #[test]
    fn test_different_data_sizes() {
        let large = "a".repeat(1000);
        let items = [item(1000, 1, "x"), item(1001, 2, &large)];
        let decoded = roundtrip(&items, 1000);
        assert_same(&items, &decoded);
        assert_eq!(decoded[0].access().value.as_str(), "x");
        assert_eq!(decoded[1].access().value.len(), 1000);
    }
}
+424
server/src/db_old/mod.rs
··· 1 + use std::{ 2 + io::Cursor, 3 + ops::{Bound, Deref, RangeBounds}, 4 + path::Path, 5 + sync::{ 6 + Arc, 7 + atomic::{AtomicU64, AtomicUsize, Ordering as AtomicOrdering}, 8 + }, 9 + time::{Duration, Instant}, 10 + }; 11 + 12 + use fjall::{Config, Keyspace, Partition, PartitionCreateOptions, Slice}; 13 + use ordered_varint::Variable; 14 + use rkyv::{Archive, Deserialize, Serialize, rancor::Error}; 15 + use smol_str::SmolStr; 16 + use tokio::sync::broadcast; 17 + 18 + use crate::{ 19 + db_old::block::{ReadVariableExt, WriteVariableExt}, 20 + error::{AppError, AppResult}, 21 + jetstream::JetstreamEvent, 22 + utils::{DefaultRateTracker, get_time}, 23 + }; 24 + 25 + mod block; 26 + 27 + #[derive(Clone, Debug, Default, Archive, Deserialize, Serialize, PartialEq)] 28 + #[rkyv(compare(PartialEq), derive(Debug))] 29 + pub struct NsidCounts { 30 + pub count: u128, 31 + pub deleted_count: u128, 32 + pub last_seen: u64, 33 + } 34 + 35 + #[derive(Debug, Default, Archive, Deserialize, Serialize, PartialEq)] 36 + #[rkyv(compare(PartialEq), derive(Debug))] 37 + pub struct NsidHit { 38 + pub deleted: bool, 39 + } 40 + 41 + #[derive(Clone)] 42 + pub struct EventRecord { 43 + pub nsid: SmolStr, 44 + pub timestamp: u64, // seconds 45 + pub deleted: bool, 46 + } 47 + 48 + impl EventRecord { 49 + pub fn from_jetstream(event: JetstreamEvent) -> Option<Self> { 50 + match event { 51 + JetstreamEvent::Commit { 52 + time_us, commit, .. 53 + } => Some(Self { 54 + nsid: commit.collection.into(), 55 + timestamp: time_us / 1_000_000, 56 + deleted: false, 57 + }), 58 + JetstreamEvent::Delete { 59 + time_us, commit, .. 
60 + } => Some(Self { 61 + nsid: commit.collection.into(), 62 + timestamp: time_us / 1_000_000, 63 + deleted: true, 64 + }), 65 + _ => None, 66 + } 67 + } 68 + } 69 + 70 + type ItemDecoder = block::ItemDecoder<Cursor<Slice>, NsidHit>; 71 + type ItemEncoder = block::ItemEncoder<Vec<u8>, NsidHit>; 72 + type Item = block::Item<NsidHit>; 73 + 74 + pub struct LexiconHandle { 75 + tree: Partition, 76 + buf: Arc<scc::Queue<EventRecord>>, 77 + buf_len: AtomicUsize, 78 + last_insert: AtomicU64, 79 + eps: DefaultRateTracker, 80 + block_size: AtomicUsize, 81 + } 82 + 83 + impl LexiconHandle { 84 + fn new(keyspace: &Keyspace, nsid: &str) -> Self { 85 + let opts = PartitionCreateOptions::default().compression(fjall::CompressionType::Miniz(9)); 86 + Self { 87 + tree: keyspace.open_partition(nsid, opts).unwrap(), 88 + buf: Default::default(), 89 + buf_len: AtomicUsize::new(0), 90 + last_insert: AtomicU64::new(0), 91 + eps: DefaultRateTracker::new(Duration::from_secs(5)), 92 + block_size: AtomicUsize::new(1000), 93 + } 94 + } 95 + 96 + fn item_count(&self) -> usize { 97 + self.buf_len.load(AtomicOrdering::Acquire) 98 + } 99 + 100 + fn last_insert(&self) -> u64 { 101 + self.last_insert.load(AtomicOrdering::Acquire) 102 + } 103 + 104 + fn suggested_block_size(&self) -> usize { 105 + self.block_size.load(AtomicOrdering::Relaxed) 106 + } 107 + 108 + fn insert(&self, event: EventRecord) { 109 + self.buf.push(event); 110 + self.buf_len.fetch_add(1, AtomicOrdering::Release); 111 + self.last_insert 112 + .store(get_time().as_millis() as u64, AtomicOrdering::Release); 113 + self.eps.observe(1); 114 + let rate = self.eps.rate() as usize; 115 + if rate != 0 { 116 + self.block_size.store(rate * 60, AtomicOrdering::Relaxed); 117 + } 118 + } 119 + 120 + fn sync(&self, max_block_size: usize) -> AppResult<usize> { 121 + let mut writer = ItemEncoder::new(Vec::with_capacity( 122 + size_of::<u64>() + self.item_count().min(max_block_size) * size_of::<(u64, NsidHit)>(), 123 + )); 124 + let mut 
start_timestamp = None; 125 + let mut end_timestamp = None; 126 + let mut written = 0_usize; 127 + while let Some(event) = self.buf.pop() { 128 + let item = Item::new( 129 + event.timestamp, 130 + &NsidHit { 131 + deleted: event.deleted, 132 + }, 133 + ); 134 + writer.encode(&item)?; 135 + if start_timestamp.is_none() { 136 + start_timestamp = Some(event.timestamp); 137 + } 138 + end_timestamp = Some(event.timestamp); 139 + if written >= max_block_size { 140 + break; 141 + } 142 + written += 1; 143 + } 144 + if let (Some(start_timestamp), Some(end_timestamp)) = (start_timestamp, end_timestamp) { 145 + self.buf_len.store(0, AtomicOrdering::Release); 146 + let value = writer.finish()?; 147 + let mut key = Vec::with_capacity(size_of::<u64>() * 2); 148 + key.write_varint(start_timestamp)?; 149 + key.write_varint(end_timestamp)?; 150 + self.tree.insert(key, value)?; 151 + } 152 + Ok(written) 153 + } 154 + } 155 + 156 + type BoxedIter<T> = Box<dyn Iterator<Item = T>>; 157 + 158 + // counts is nsid -> NsidCounts 159 + // hits is tree per nsid: varint start time + varint end time -> block of hits 160 + pub struct Db { 161 + inner: Keyspace, 162 + hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>>, 163 + counts: Partition, 164 + event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>, 165 + eps: DefaultRateTracker, 166 + min_block_size: usize, 167 + max_block_size: usize, 168 + max_last_activity: Duration, 169 + } 170 + 171 + impl Db { 172 + pub fn new(path: impl AsRef<Path>) -> AppResult<Self> { 173 + tracing::info!("opening db..."); 174 + let ks = Config::new(path) 175 + .cache_size(8 * 1024 * 1024) // from talna 176 + .open()?; 177 + Ok(Self { 178 + hits: Default::default(), 179 + counts: ks.open_partition( 180 + "_counts", 181 + PartitionCreateOptions::default().compression(fjall::CompressionType::None), 182 + )?, 183 + inner: ks, 184 + event_broadcaster: broadcast::channel(1000).0, 185 + eps: DefaultRateTracker::new(Duration::from_secs(1)), 186 + min_block_size: 
512, 187 + max_block_size: 100_000, 188 + max_last_activity: Duration::from_secs(10), 189 + }) 190 + } 191 + 192 + pub fn sync(&self, all: bool) -> AppResult<()> { 193 + let _guard = scc::ebr::Guard::new(); 194 + for (nsid, tree) in self.hits.iter(&_guard) { 195 + let count = tree.item_count(); 196 + let is_max_block_size = count > self.min_block_size.max(tree.suggested_block_size()); 197 + let is_too_old = (get_time().as_millis() as u64 - tree.last_insert()) 198 + > self.max_last_activity.as_millis() as u64; 199 + if count > 0 && (all || is_max_block_size || is_too_old) { 200 + loop { 201 + let synced = tree.sync(self.max_block_size)?; 202 + if synced == 0 { 203 + break; 204 + } 205 + tracing::info!("synced {synced} of {nsid} to db"); 206 + } 207 + } 208 + } 209 + Ok(()) 210 + } 211 + 212 + #[inline(always)] 213 + pub fn eps(&self) -> usize { 214 + self.eps.rate() as usize 215 + } 216 + 217 + #[inline(always)] 218 + pub fn new_listener(&self) -> broadcast::Receiver<(SmolStr, NsidCounts)> { 219 + self.event_broadcaster.subscribe() 220 + } 221 + 222 + #[inline(always)] 223 + fn maybe_run_in_nsid_tree<T>( 224 + &self, 225 + nsid: &str, 226 + f: impl FnOnce(&LexiconHandle) -> T, 227 + ) -> Option<T> { 228 + let _guard = scc::ebr::Guard::new(); 229 + let handle = match self.hits.peek(nsid, &_guard) { 230 + Some(handle) => handle.clone(), 231 + None => { 232 + if self.inner.partition_exists(nsid) { 233 + let handle = Arc::new(LexiconHandle::new(&self.inner, nsid)); 234 + let _ = self.hits.insert(SmolStr::new(nsid), handle.clone()); 235 + handle 236 + } else { 237 + return None; 238 + } 239 + } 240 + }; 241 + Some(f(&handle)) 242 + } 243 + 244 + #[inline(always)] 245 + fn run_in_nsid_tree<T>( 246 + &self, 247 + nsid: SmolStr, 248 + f: impl FnOnce(&LexiconHandle) -> AppResult<T>, 249 + ) -> AppResult<T> { 250 + f(self 251 + .hits 252 + .entry(nsid.clone()) 253 + .or_insert_with(move || Arc::new(LexiconHandle::new(&self.inner, &nsid))) 254 + .get()) 255 + } 256 + 257 + pub 
fn record_event(&self, e: EventRecord) -> AppResult<()> { 258 + let EventRecord { 259 + nsid, 260 + timestamp, 261 + deleted, 262 + } = e.clone(); 263 + 264 + // insert event 265 + self.run_in_nsid_tree(nsid.clone(), move |tree| Ok(tree.insert(e)))?; 266 + // increment count 267 + let mut counts = self.get_count(&nsid)?; 268 + counts.last_seen = timestamp; 269 + if deleted { 270 + counts.deleted_count += 1; 271 + } else { 272 + counts.count += 1; 273 + } 274 + self.insert_count(&nsid, counts.clone())?; 275 + if self.event_broadcaster.receiver_count() > 0 { 276 + let _ = self.event_broadcaster.send((SmolStr::new(&nsid), counts)); 277 + } 278 + self.eps.observe(1); 279 + Ok(()) 280 + } 281 + 282 + #[inline(always)] 283 + fn insert_count(&self, nsid: &str, counts: NsidCounts) -> AppResult<()> { 284 + self.counts 285 + .insert( 286 + nsid, 287 + unsafe { rkyv::to_bytes::<Error>(&counts).unwrap_unchecked() }.as_slice(), 288 + ) 289 + .map_err(AppError::from) 290 + } 291 + 292 + pub fn get_count(&self, nsid: &str) -> AppResult<NsidCounts> { 293 + let Some(raw) = self.counts.get(nsid)? 
else { 294 + return Ok(NsidCounts::default()); 295 + }; 296 + Ok(unsafe { rkyv::from_bytes_unchecked::<_, Error>(&raw).unwrap_unchecked() }) 297 + } 298 + 299 + pub fn get_counts(&self) -> impl Iterator<Item = AppResult<(SmolStr, NsidCounts)>> { 300 + self.counts.iter().map(|res| { 301 + res.map_err(AppError::from).map(|(key, val)| { 302 + ( 303 + SmolStr::new(unsafe { str::from_utf8_unchecked(&key) }), 304 + unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() }, 305 + ) 306 + }) 307 + }) 308 + } 309 + 310 + pub fn get_nsids(&self) -> impl Iterator<Item = impl Deref<Target = str> + 'static> { 311 + self.inner 312 + .list_partitions() 313 + .into_iter() 314 + .filter(|k| k.deref() != "_counts") 315 + } 316 + 317 + pub fn get_hits_debug(&self, nsid: &str) -> BoxedIter<AppResult<(Slice, Slice)>> { 318 + self.maybe_run_in_nsid_tree(nsid, |handle| -> BoxedIter<AppResult<(Slice, Slice)>> { 319 + Box::new( 320 + handle 321 + .tree 322 + .iter() 323 + .rev() 324 + .map(|res| res.map_err(AppError::from)), 325 + ) 326 + }) 327 + .unwrap_or_else(|| Box::new(std::iter::empty())) 328 + } 329 + 330 + pub fn get_hits( 331 + &self, 332 + nsid: &str, 333 + range: impl RangeBounds<u64> + std::fmt::Debug, 334 + ) -> BoxedIter<AppResult<Item>> { 335 + let start = range 336 + .start_bound() 337 + .cloned() 338 + .map(|t| unsafe { t.to_variable_vec().unwrap_unchecked() }); 339 + let end = range 340 + .end_bound() 341 + .cloned() 342 + .map(|t| unsafe { t.to_variable_vec().unwrap_unchecked() }); 343 + let limit = match range.end_bound().cloned() { 344 + Bound::Included(end) => end, 345 + Bound::Excluded(end) => end.saturating_sub(1), 346 + Bound::Unbounded => u64::MAX, 347 + }; 348 + 349 + self.maybe_run_in_nsid_tree(nsid, move |handle| -> BoxedIter<AppResult<Item>> { 350 + let map_block = move |(key, val)| { 351 + let mut key_reader = Cursor::new(key); 352 + let start_timestamp = key_reader.read_varint::<u64>()?; 353 + let items = 354 + 
ItemDecoder::new(Cursor::new(val), start_timestamp)?.take_while(move |item| { 355 + item.as_ref().map_or(true, |item| item.timestamp <= limit) 356 + }); 357 + Ok(items) 358 + }; 359 + 360 + Box::new( 361 + handle 362 + .tree 363 + .range(TimestampRange { start, end }) 364 + .map(move |res| res.map_err(AppError::from).and_then(map_block)) 365 + .flatten() 366 + .flatten(), 367 + ) 368 + }) 369 + .unwrap_or_else(|| Box::new(std::iter::empty())) 370 + } 371 + 372 + pub fn tracking_since(&self) -> AppResult<u64> { 373 + // HACK: we should actually store when we started tracking but im lazy 374 + // should be accurate enough 375 + self.maybe_run_in_nsid_tree("app.bsky.feed.like", |handle| { 376 + let Some((timestamps_raw, _)) = handle.tree.first_key_value()? else { 377 + return Ok(0); 378 + }; 379 + let mut timestamp_reader = Cursor::new(timestamps_raw); 380 + timestamp_reader 381 + .read_varint::<u64>() 382 + .map_err(AppError::from) 383 + }) 384 + .unwrap_or(Ok(0)) 385 + } 386 + } 387 + 388 + type TimestampRepr = Vec<u8>; 389 + 390 + struct TimestampRange { 391 + start: Bound<TimestampRepr>, 392 + end: Bound<TimestampRepr>, 393 + } 394 + 395 + impl RangeBounds<TimestampRepr> for TimestampRange { 396 + #[inline(always)] 397 + fn start_bound(&self) -> Bound<&TimestampRepr> { 398 + self.start.as_ref() 399 + } 400 + 401 + #[inline(always)] 402 + fn end_bound(&self) -> Bound<&TimestampRepr> { 403 + self.end.as_ref() 404 + } 405 + } 406 + 407 + type TimestampReprOld = [u8; 8]; 408 + 409 + struct TimestampRangeOld { 410 + start: Bound<TimestampReprOld>, 411 + end: Bound<TimestampReprOld>, 412 + } 413 + 414 + impl RangeBounds<TimestampReprOld> for TimestampRangeOld { 415 + #[inline(always)] 416 + fn start_bound(&self) -> Bound<&TimestampReprOld> { 417 + self.start.as_ref() 418 + } 419 + 420 + #[inline(always)] 421 + fn end_bound(&self) -> Bound<&TimestampReprOld> { 422 + self.end.as_ref() 423 + } 424 + }
+4 -17
server/src/main.rs
··· 17 17 18 18 mod api; 19 19 mod db; 20 + mod db_old; 20 21 mod error; 21 22 mod jetstream; 22 23 mod utils; ··· 51 52 } 52 53 Some("debug") => { 53 54 debug(); 54 - return; 55 - } 56 - Some("traindict") => { 57 - train_zstd_dict(); 58 55 return; 59 56 } 60 57 Some(x) => { ··· 211 208 db.sync(true).expect("cant sync db"); 212 209 } 213 210 214 - fn train_zstd_dict() { 215 - let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldnt create db"); 216 - let dict_data = db.train_zstd_dict().expect("cant train zstd dict"); 217 - std::fs::write("zstd_dict", dict_data).expect("cant save zstd dict") 218 - } 219 - 220 211 fn debug() { 221 212 let db = Db::new(DbConfig::default(), CancellationToken::new()).expect("couldnt create db"); 222 213 let info = db.info().expect("cant get db info"); ··· 269 260 270 261 fn migrate() { 271 262 let cancel_token = CancellationToken::new(); 272 - let from = Arc::new( 273 - Db::new( 274 - DbConfig::default().path(".fjall_data_from"), 275 - cancel_token.child_token(), 276 - ) 277 - .expect("couldnt create db"), 278 - ); 263 + 264 + let from = Arc::new(db_old::Db::new(".fjall_data_from").expect("couldnt create db")); 265 + 279 266 let to = Arc::new( 280 267 Db::new( 281 268 DbConfig::default().path(".fjall_data_to").ks(|c| {