Tracks lexicons (NSIDs) and how many times they have appeared on the Jetstream.
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 504 lines 17 kB view raw
//! Lexicon-hit database: persists, per NSID (lexicon id), how many times it
//! appeared on the Jetstream and the raw timestamped hits themselves.
//!
//! Layout (see also the comment on [`Db`]):
//! - `_counts` partition: nsid -> rkyv-serialized [`NsidCounts`]
//! - one partition per nsid: key = varint(start time) + varint(end time),
//!   value = an encoded block of hits (encoding lives in `handle`/`block`).

use std::{
    fmt::Debug,
    io::Cursor,
    ops::{Bound, Deref, RangeBounds},
    path::Path,
    time::Duration,
    u64,
};

use ahash::{AHashMap, AHashSet};
use byteview::StrView;
use fjall::{Keyspace, Partition, PartitionCreateOptions};
use itertools::{Either, Itertools};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use rclite::Arc;
use rkyv::{Archive, Deserialize, Serialize, rancor::Error};
use smol_str::{SmolStr, ToSmolStr};
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;

use crate::{
    db::handle::{ItemDecoder, LexiconHandle},
    error::{AppError, AppResult},
    jetstream::JetstreamEvent,
    utils::{CLOCK, RateTracker, ReadVariableExt, varints_unsigned_encoded},
};

mod block;
mod handle;

/// Aggregate counters stored per NSID in the `_counts` partition
/// (rkyv-encoded; see `insert_count`/`get_count`).
#[derive(Clone, Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
#[rkyv(compare(PartialEq), derive(Debug))]
pub struct NsidCounts {
    /// Number of non-deleted (commit) events seen for this NSID.
    pub count: u128,
    /// Number of delete events seen for this NSID.
    pub deleted_count: u128,
    /// Timestamp of the most recent event, in seconds (see `EventRecord::timestamp`).
    pub last_seen: u64,
}

/// A single recorded hit; currently only tracks whether it was a deletion.
#[derive(Debug, Default, Archive, Deserialize, Serialize, PartialEq)]
#[rkyv(compare(PartialEq), derive(Debug))]
pub struct NsidHit {
    pub deleted: bool,
}

/// One normalized Jetstream event, ready for ingestion.
#[derive(Clone)]
pub struct EventRecord {
    /// The collection NSID the event belongs to.
    pub nsid: SmolStr,
    pub timestamp: u64, // seconds
    pub deleted: bool,
}

impl EventRecord {
    /// Converts a raw Jetstream event into an [`EventRecord`].
    ///
    /// Only `Commit` and `Delete` events are kept (mapped with
    /// `deleted = false` / `true` respectively); anything else yields `None`.
    /// `time_us` is microseconds, so dividing by 1_000_000 gives seconds.
    pub fn from_jetstream(event: JetstreamEvent) -> Option<Self> {
        match event {
            JetstreamEvent::Commit {
                time_us, commit, ..
            } => Some(Self {
                nsid: commit.collection.into(),
                timestamp: time_us / 1_000_000,
                deleted: false,
            }),
            JetstreamEvent::Delete {
                time_us, commit, ..
            } => Some(Self {
                nsid: commit.collection.into(),
                timestamp: time_us / 1_000_000,
                deleted: true,
            }),
            _ => None,
        }
    }
}

/// Snapshot of database shape returned by [`Db::info`].
pub struct DbInfo {
    /// For each NSID, the item count of every stored block (newest-first,
    /// since `info` iterates the tree in reverse).
    pub nsids: AHashMap<SmolStr, Vec<usize>>,
    /// Total on-disk size of the keyspace in bytes.
    pub disk_size: u64,
}

/// Tunables for [`Db`]. Construct via `DbConfig::default()` and refine with
/// the builder-style `path`/`ks` methods.
pub struct DbConfig {
    /// Underlying fjall keyspace configuration.
    pub ks_config: fjall::Config,
    /// Lower bound on the number of items encoded into one block.
    pub min_block_size: usize,
    /// Upper bound on the number of items encoded into one block.
    pub max_block_size: usize,
    /// If a handle has been idle longer than this, `sync` flushes everything
    /// it has buffered regardless of block-size heuristics.
    pub max_last_activity: Duration,
}

impl DbConfig {
    /// Replaces the keyspace config with a fresh one rooted at `path`.
    /// NOTE(review): this discards any options previously set on `ks_config`
    /// (it builds a new `fjall::Config`) — call `path` before `ks` if both are used.
    pub fn path(mut self, path: impl AsRef<Path>) -> Self {
        self.ks_config = fjall::Config::new(path);
        self
    }

    /// Applies an arbitrary transformation to the keyspace config.
    pub fn ks(mut self, f: impl FnOnce(fjall::Config) -> fjall::Config) -> Self {
        self.ks_config = f(self.ks_config);
        self
    }
}

impl Default for DbConfig {
    fn default() -> Self {
        Self {
            // 512 MiB block cache; effectively unbounded write buffer.
            ks_config: fjall::Config::default()
                .cache_size(1024 * 1024 * 512)
                .max_write_buffer_size(u64::MAX),
            min_block_size: 1000,
            max_block_size: 250_000,
            max_last_activity: Duration::from_secs(10),
        }
    }
}

// counts is nsid -> NsidCounts
// hits is tree per nsid: varint start time + varint end time -> block of hits
pub struct Db {
    pub cfg: DbConfig,
    pub ks: Keyspace,
    /// `_counts` partition: nsid -> rkyv-serialized NsidCounts.
    counts: Partition,
    /// In-memory cache of per-NSID handles; lazily populated from existing
    /// partitions (`get_handle`) or created on ingest (`ensure_handle`).
    hits: scc::HashIndex<SmolStr, Arc<LexiconHandle>, ahash::RandomState>,
    /// Pool that performs the actual block writes during `sync`.
    sync_pool: threadpool::ThreadPool,
    /// Broadcasts `(nsid, updated counts)` to live feed listeners.
    event_broadcaster: broadcast::Sender<(SmolStr, NsidCounts)>,
    eps: RateTracker<100>, // 100 millis buckets
    cancel_token: CancellationToken,
}

impl Db {
    /// Opens (or creates) the keyspace and the `_counts` partition.
    ///
    /// The sync pool is sized at 2x the rayon thread count — presumably to
    /// keep I/O-bound block writes from starving the CPU-bound encode work.
    /// `_counts` is stored uncompressed (values are tiny fixed-shape records).
    pub fn new(cfg: DbConfig, cancel_token: CancellationToken) -> AppResult<Self> {
        tracing::info!("opening db...");
        let ks = cfg.ks_config.clone().open()?;
        Ok(Self {
            cfg,
            hits: Default::default(),
            sync_pool: threadpool::Builder::new()
                .num_threads(rayon::current_num_threads() * 2)
                .build(),
            counts: ks.open_partition(
                "_counts",
                PartitionCreateOptions::default().compression(fjall::CompressionType::None),
            )?,
            ks,
            event_broadcaster: broadcast::channel(1000).0,
            eps: RateTracker::new(Duration::from_secs(1)),
            cancel_token,
        })
    }

    /// Future that resolves when shutdown has been requested.
    #[inline(always)]
    pub fn shutting_down(&self) -> impl Future<Output = ()> {
        self.cancel_token.cancelled()
    }

    /// Whether shutdown has been requested.
    #[inline(always)]
    pub fn is_shutting_down(&self) -> bool {
        self.cancel_token.is_cancelled()
    }

    /// Current ingest rate (events per second, from the 1s rate tracker).
    #[inline(always)]
    pub fn eps(&self) -> usize {
        self.eps.rate() as usize
    }

    /// Subscribes to `(nsid, counts)` updates emitted by `ingest_events`.
    #[inline(always)]
    pub fn new_listener(&self) -> broadcast::Receiver<(SmolStr, NsidCounts)> {
        self.event_broadcaster.subscribe()
    }

    /// Flushes buffered hits to disk as encoded blocks.
    ///
    /// Per handle it decides a target block size, splits the buffered items
    /// into that many full blocks (plus optionally a remainder), encodes the
    /// blocks in parallel via rayon, writes them on `sync_pool`, waits for the
    /// pool, then refreshes each changed handle's tree snapshot.
    ///
    /// With `all = true` every handle is drained at `max_block_size`
    /// regardless of the heuristics below.
    pub fn sync(&self, all: bool) -> AppResult<()> {
        let start = CLOCK.now();
        // prepare all the data
        let nsids_len = self.hits.len();
        let mut data = Vec::with_capacity(nsids_len);
        let mut nsids = AHashSet::with_capacity(nsids_len);
        let _guard = scc::ebr::Guard::new();
        for (nsid, handle) in self.hits.iter(&_guard) {
            let mut nsid_data = Vec::with_capacity(2);
            // let mut total_count = 0;
            let is_too_old = handle.since_last_activity() > self.cfg.max_last_activity;
            // if we disconnect for a long time, we want to sync all of what we
            // have to avoid having many small blocks (even if we run compaction
            // later, it reduces work until we run compaction)
            let block_size = (is_too_old || all)
                .then_some(self.cfg.max_block_size)
                .unwrap_or_else(|| {
                    // clamp the handle's suggestion into [min_block_size, max_block_size]
                    self.cfg
                        .max_block_size
                        .min(self.cfg.min_block_size.max(handle.suggested_block_size()))
                });
            let count = handle.item_count();
            let data_count = count / block_size;
            if count > 0 && (all || data_count > 0 || is_too_old) {
                // one entry per full block worth of items
                for _ in 0..data_count {
                    nsid_data.push((handle.clone(), block_size));
                    // total_count += block_size;
                }
                // only sync remainder if we haven't met block size
                let remainder = count % block_size;
                if (all || data_count == 0) && remainder > 0 {
                    nsid_data.push((handle.clone(), remainder));
                    // total_count += remainder;
                }
            }
            let _span = handle.span().entered();
            if nsid_data.len() > 0 {
                // tracing::info!(
                //     {blocks = %nsid_data.len(), count = %total_count},
                //     "will encode & sync",
                // );
                nsids.insert(nsid.clone());
                data.push(nsid_data);
            }
        }
        drop(_guard);

        // process the blocks: outer parallelism across NSIDs, inner across
        // blocks of one NSID. take_block_items runs sequentially per handle
        // (items drained in order) before the parallel encode.
        data.into_par_iter()
            .map(|chunk| {
                chunk
                    .into_iter()
                    .map(|(handle, max_block_size)| {
                        (handle.take_block_items(max_block_size), handle)
                    })
                    .collect::<Vec<_>>()
                    .into_par_iter()
                    .map(|(items, handle)| {
                        let count = items.len();
                        let block = LexiconHandle::encode_block_from_items(items, count)?;
                        AppResult::Ok((block, handle))
                    })
                    .collect::<Result<Vec<_>, _>>()
            })
            .try_for_each(|chunk| {
                let chunk = chunk?;
                for (block, handle) in chunk {
                    // hand the actual write off to the sync pool; write errors
                    // are logged rather than propagated (best-effort flush)
                    self.sync_pool.execute(move || {
                        let _span = handle.span().entered();
                        let written = block.written;
                        match handle.insert_block(block) {
                            Ok(_) => {
                                tracing::info!({count = %written}, "synced")
                            }
                            Err(err) => tracing::error!({ err = %err }, "failed to sync block"),
                        }
                    });
                }
                AppResult::Ok(())
            })?;
        // wait for all queued writes before refreshing snapshots
        self.sync_pool.join();

        // update snapshots for all (changed) handles
        for nsid in nsids {
            self.hits.peek_with(&nsid, |_, handle| handle.update_tree());
        }

        tracing::info!(time = %start.elapsed().as_secs_f64(), "synced all blocks");

        Ok(())
    }

    /// Compacts one NSID's partition within `range`, then refreshes its tree
    /// snapshot. A no-op (Ok) if the NSID has no partition.
    /// Semantics of `max_count`/`sort` are defined by `LexiconHandle::compact`.
    pub fn compact(
        &self,
        nsid: impl AsRef<str>,
        max_count: usize,
        range: impl RangeBounds<u64>,
        sort: bool,
    ) -> AppResult<()> {
        let Some(handle) = self.get_handle(nsid) else {
            return Ok(());
        };
        handle.compact(max_count, range, sort)?;
        handle.update_tree();
        Ok(())
    }

    /// Runs [`Db::compact`] over every known NSID partition, stopping at the
    /// first error.
    pub fn compact_all(
        &self,
        max_count: usize,
        range: impl RangeBounds<u64> + Clone,
        sort: bool,
    ) -> AppResult<()> {
        for nsid in self.get_nsids() {
            self.compact(nsid, max_count, range.clone(), sort)?;
        }
        Ok(())
    }

    /// Full-range sorted compaction of every NSID at the maximum block size.
    pub fn major_compact(&self) -> AppResult<()> {
        self.compact_all(self.cfg.max_block_size, .., true)?;
        Ok(())
    }

    /// Returns the cached handle for `nsid`, lazily creating and caching one
    /// if a partition for it already exists on disk; `None` otherwise.
    /// Does NOT create new partitions — that is `ensure_handle`'s job.
    #[inline(always)]
    fn get_handle(&self, nsid: impl AsRef<str>) -> Option<Arc<LexiconHandle>> {
        let _guard = scc::ebr::Guard::new();
        let handle = match self.hits.peek(nsid.as_ref(), &_guard) {
            Some(handle) => handle.clone(),
            None => {
                if self.ks.partition_exists(nsid.as_ref()) {
                    let handle = Arc::new(LexiconHandle::new(&self.ks, nsid.as_ref()));
                    // a racing insert just means another thread cached it first
                    let _ = self.hits.insert(SmolStr::new(nsid), handle.clone());
                    handle
                } else {
                    return None;
                }
            }
        };
        Some(handle)
    }

    /// Returns the handle for `nsid`, creating the handle (and thereby its
    /// partition) if it does not exist yet.
    #[inline(always)]
    fn ensure_handle(&self, nsid: &SmolStr) -> impl Deref<Target = Arc<LexiconHandle>> + use<'_> {
        self.hits
            .entry(nsid.clone())
            .or_insert_with(|| Arc::new(LexiconHandle::new(&self.ks, &nsid)))
    }

    /// Ingests a stream of events: queues hits on each NSID's handle, updates
    /// and persists its counters, and broadcasts the new counts to listeners.
    ///
    /// Events are grouped by consecutive runs of the same NSID (`chunk_by`),
    /// so callers get one counts read/write per run, not per event. The
    /// counter updates happen inside `inspect`, i.e. as the handle's `queue`
    /// consumes the chunk.
    pub fn ingest_events(&self, events: impl Iterator<Item = EventRecord>) -> AppResult<()> {
        let mut seen_events = 0;
        for (key, chunk) in events.chunk_by(|event| event.nsid.clone()).into_iter() {
            let mut counts = self.get_count(&key)?;
            self.ensure_handle(&key).queue(chunk.inspect(|e| {
                // increment count
                counts.last_seen = e.timestamp;
                if e.deleted {
                    counts.deleted_count += 1;
                } else {
                    counts.count += 1;
                }
                seen_events += 1;
            }));
            self.insert_count(&key, &counts)?;
            // skip the (cloning) send when nobody is listening
            if self.event_broadcaster.receiver_count() > 0 {
                let _ = self.event_broadcaster.send((key, counts));
            }
        }
        self.eps.observe(seen_events);
        Ok(())
    }

    /// Persists `counts` for `nsid` into the `_counts` partition.
    // SAFETY(review): rkyv serialization of this plain struct is assumed
    // infallible, hence the unchecked unwrap — confirm against rkyv docs.
    #[inline(always)]
    fn insert_count(&self, nsid: &str, counts: &NsidCounts) -> AppResult<()> {
        self.counts
            .insert(
                nsid,
                unsafe { rkyv::to_bytes::<Error>(counts).unwrap_unchecked() }.as_slice(),
            )
            .map_err(AppError::from)
    }

    /// Loads the counters for `nsid`, defaulting to zeros if absent.
    // SAFETY(review): trusts that `_counts` values were written by
    // `insert_count` and are valid archived NsidCounts (no validation).
    pub fn get_count(&self, nsid: &str) -> AppResult<NsidCounts> {
        let Some(raw) = self.counts.get(nsid)? else {
            return Ok(NsidCounts::default());
        };
        Ok(unsafe { rkyv::from_bytes_unchecked::<_, Error>(&raw).unwrap_unchecked() })
    }

    /// Iterates every `(nsid, counts)` pair in the `_counts` partition.
    // SAFETY(review): keys are assumed valid UTF-8 and values valid archives,
    // since this partition is only ever written via `insert_count`.
    pub fn get_counts(&self) -> impl Iterator<Item = AppResult<(SmolStr, NsidCounts)>> {
        self.counts.iter().map(|res| {
            res.map_err(AppError::from).map(|(key, val)| {
                (
                    SmolStr::new(unsafe { str::from_utf8_unchecked(&key) }),
                    unsafe { rkyv::from_bytes_unchecked::<_, Error>(&val).unwrap_unchecked() },
                )
            })
        })
    }

    /// All NSIDs known on disk, i.e. every partition except `_counts`.
    pub fn get_nsids(&self) -> impl Iterator<Item = StrView> {
        self.ks
            .list_partitions()
            .into_iter()
            .filter(|k| k.deref() != "_counts")
    }

    /// Builds a [`DbInfo`] snapshot: per-NSID block sizes plus total disk use.
    /// Reads block headers only (item counts), not the block payloads.
    pub fn info(&self) -> AppResult<DbInfo> {
        let mut nsids = AHashMap::new();
        for nsid in self.get_nsids() {
            let Some(handle) = self.get_handle(&nsid) else {
                continue;
            };
            let block_lens = handle
                .read()
                .iter()
                .rev()
                .try_fold(Vec::new(), |mut acc, item| {
                    let (key, value) = item?;
                    // key = varint start time (+ varint end time, unused here)
                    let mut timestamps = Cursor::new(key);
                    let start_timestamp = timestamps.read_varint()?;
                    let decoder = ItemDecoder::new(Cursor::new(value), start_timestamp)?;
                    acc.push(decoder.item_count());
                    AppResult::Ok(acc)
                })?;
            nsids.insert(nsid.to_smolstr(), block_lens);
        }
        Ok(DbInfo {
            nsids,
            disk_size: self.ks.disk_space(),
        })
    }

    /// Streams decoded hits for `nsid` whose timestamps fall inside `range`,
    /// newest blocks first, stopping once at least `max_items` items have
    /// been gathered (block granularity, so slightly more may be yielded).
    ///
    /// Strategy: walk the block tree in reverse from the encoded end bound,
    /// collect lazily-decoding block iterators until either the start bound
    /// or the item budget is hit (`fold_while`), then replay them oldest-first
    /// and filter each item against the exact `[start_limit, end_limit]`.
    pub fn get_hits(
        &self,
        nsid: &str,
        range: impl RangeBounds<u64> + std::fmt::Debug,
        max_items: usize,
    ) -> impl Iterator<Item = AppResult<handle::Item>> {
        // normalize the range to inclusive integer limits
        let start_limit = match range.start_bound().cloned() {
            Bound::Included(start) => start,
            Bound::Excluded(start) => start.saturating_add(1),
            Bound::Unbounded => 0,
        };
        let end_limit = match range.end_bound().cloned() {
            Bound::Included(end) => end,
            Bound::Excluded(end) => end.saturating_sub(1),
            Bound::Unbounded => u64::MAX,
        };
        let end_key = varints_unsigned_encoded([end_limit]);

        let Some(handle) = self.get_handle(nsid) else {
            // unknown NSID: empty iterator (Either unifies the two iterator types)
            return Either::Right(std::iter::empty());
        };

        // let mut ts = CLOCK.now();
        // Maps one raw tree entry to Some(lazy item iterator) plus the running
        // item total, or None to signal the fold should stop.
        let map_block = move |(res, current_item_count)| -> AppResult<(Option<_>, usize)> {
            if current_item_count >= max_items {
                return Ok((None, current_item_count));
            }
            let (key, val) = res?;
            let mut key_reader = Cursor::new(key);
            let start_timestamp = key_reader.read_varint::<u64>()?;
            // let end_timestamp = key_reader.read_varint::<u64>()?;
            // walking newest->oldest: once a block starts before start_limit,
            // every later block does too, so stop
            if start_timestamp < start_limit {
                // tracing::info!(
                //     "stopped at block with timestamps {start_timestamp}..{end_timestamp} because {start_limit} is greater"
                // );
                return Ok((None, current_item_count));
            }
            let decoder = handle::ItemDecoder::new(Cursor::new(val), start_timestamp)?;
            let current_item_count = current_item_count + decoder.item_count();
            // ts = CLOCK.now();
            Ok((
                Some(
                    decoder
                        .take_while(move |item| {
                            // keep decode errors (map_or true) so they surface downstream
                            item.as_ref().map_or(true, |item| {
                                item.timestamp <= end_limit && item.timestamp >= start_limit
                            })
                        })
                        .map(|res| res.map_err(AppError::from)),
                ),
                current_item_count,
            ))
        };

        let (blocks, _counted) = handle
            .read()
            .range(..end_key)
            .map(|res| res.map_err(AppError::from))
            .rev()
            .fold_while(
                (Vec::with_capacity(20), 0),
                |(mut blocks, current_item_count), res| {
                    use itertools::FoldWhile::*;

                    match map_block((res, current_item_count)) {
                        Ok((Some(block), current_item_count)) => {
                            blocks.push(Ok(block));
                            Continue((blocks, current_item_count))
                        }
                        Ok((None, current_item_count)) => Done((blocks, current_item_count)),
                        Err(err) => {
                            // surface the error as the final yielded element
                            blocks.push(Err(err));
                            Done((blocks, current_item_count))
                        }
                    }
                },
            )
            .into_inner();

        // blocks were collected newest-first; rev() restores chronological order.
        // flatten() x2: Vec<Result<impl Iterator>> -> items
        Either::Left(blocks.into_iter().rev().flatten().flatten())
    }

    /// Approximates when tracking started, as the start timestamp of the
    /// oldest stored block of `app.bsky.feed.like` (an NSID assumed to have
    /// been present from the start). Returns 0 if no data exists yet.
    pub fn tracking_since(&self) -> AppResult<u64> {
        // HACK: we should actually store when we started tracking but im lazy
        // this should be accurate enough
        let Some(handle) = self.get_handle("app.bsky.feed.like") else {
            return Ok(0);
        };
        let Some((timestamps_raw, _)) = handle.read().first_key_value()? else {
            return Ok(0);
        };
        let mut timestamp_reader = Cursor::new(timestamps_raw);
        timestamp_reader
            .read_varint::<u64>()
            .map_err(AppError::from)
    }
}