tracks lexicons and how many times they appeared on the jetstream

refactor(server): observe events for eps after ingesting all the events, not for each kind

ptr.pet c2522740 5b5cc07c

verified
Changed files
+4 -5
server
-1
server/src/api.rs
··· 1 1 use std::{ 2 - collections::VecDeque, 3 2 fmt::Display, 4 3 net::SocketAddr, 5 4 ops::{Bound, Deref, RangeBounds},
+3 -3
server/src/db/mod.rs
··· 312 312 } 313 313 314 314 pub fn ingest_events(&self, events: impl Iterator<Item = EventRecord>) -> AppResult<()> { 315 + let mut seen_events = 0; 315 316 for (key, chunk) in events.chunk_by(|event| event.nsid.clone()).into_iter() { 316 317 let mut counts = self.get_count(&key)?; 317 - let mut count = 0; 318 318 self.ensure_handle(&key).queue(chunk.inspect(|e| { 319 319 // increment count 320 320 counts.last_seen = e.timestamp; ··· 323 323 } else { 324 324 counts.count += 1; 325 325 } 326 - count += 1; 326 + seen_events += 1; 327 327 })); 328 - self.eps.observe(count); 329 328 self.insert_count(&key, &counts)?; 330 329 if self.event_broadcaster.receiver_count() > 0 { 331 330 let _ = self.event_broadcaster.send((key, counts)); 332 331 } 333 332 } 333 + self.eps.observe(seen_events); 334 334 Ok(()) 335 335 } 336 336
+1 -1
server/src/main.rs
··· 105 105 move || { 106 106 let mut buffer = Vec::new(); 107 107 loop { 108 - let read = event_rx.blocking_recv_many(&mut buffer, 100); 108 + let read = event_rx.blocking_recv_many(&mut buffer, 500); 109 109 if let Err(err) = db.ingest_events(buffer.drain(..)) { 110 110 tracing::error!("failed to ingest events: {}", err); 111 111 }