tracks lexicons and how many times they appeared on the jetstream

feat(server): allow setting a max block size

ptr.pet b7a5f07a 4e1f68a4

verified
+17 -5
+17 -5
server/src/db/mod.rs
··· 309 309 } 310 310 } 311 311 312 - fn sync(&self) -> AppResult<()> { 312 + fn sync(&self, max_block_size: usize) -> AppResult<usize> { 313 313 let mut writer = ItemEncoder::new(Vec::with_capacity( 314 - size_of::<u64>() + self.item_count() * size_of::<(u64, NsidHit)>(), 314 + size_of::<u64>() + self.item_count().min(max_block_size) * size_of::<(u64, NsidHit)>(), 315 315 )); 316 316 let mut start_timestamp = None; 317 317 let mut end_timestamp = None; 318 + let mut written = 0_usize; 318 319 while let Some(event) = self.buf.pop() { 319 320 let item = Item::new( 320 321 event.timestamp, ··· 327 328 start_timestamp = Some(event.timestamp); 328 329 } 329 330 end_timestamp = Some(event.timestamp); 331 + if written >= max_block_size { 332 + break; 333 + } 334 + written += 1; 330 335 } 331 336 if let (Some(start_timestamp), Some(end_timestamp)) = (start_timestamp, end_timestamp) { 332 337 self.buf_len.store(0, AtomicOrdering::Release); ··· 336 341 key.write_varint(end_timestamp)?; 337 342 self.tree.insert(key, value)?; 338 343 } 339 - Ok(()) 344 + Ok(written) 340 345 } 341 346 } 342 347 ··· 351 356 event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>, 352 357 eps: Rate, 353 358 min_block_size: usize, 359 + max_block_size: usize, 354 360 max_last_activity: Duration, 355 361 } 356 362 ··· 370 376 event_broadcaster: broadcast::channel(1000).0, 371 377 eps: Rate::new(Duration::from_secs(1)), 372 378 min_block_size: 512, 379 + max_block_size: 100_000, 373 380 max_last_activity: Duration::from_secs(10), 374 381 }) 375 382 } ··· 381 388 let is_max_block_size = count > self.min_block_size.max(tree.suggested_block_size()); 382 389 let is_too_old = tree.last_insert().elapsed() > self.max_last_activity; 383 390 if count > 0 && (all || is_max_block_size || is_too_old) { 384 - tracing::info!("syncing {count} of {nsid} to db"); 385 - tree.sync()?; 391 + loop { 392 + let synced = tree.sync(self.max_block_size)?; 393 + if synced == 0 { 394 + break; 395 + } 396 + tracing::info!("synced {synced} of {nsid} to db"); 397 + } 386 398 } 387 399 } 388 400 Ok(())