Live video on the AT Protocol
at natb/analytics 159 lines 4.5 kB view raw
1use crate::config::IngestionConfig; 2use crate::db::clickhouse::{ClickHouseClient, EventRow}; 3use crate::ingest::wal::WriteAheadLog; 4use anyhow::Result; 5use std::sync::Arc; 6use std::time::Duration; 7use tokio::sync::Mutex; 8use tokio::time; 9use tracing::{debug, error, info, warn}; 10use uuid::Uuid; 11 12pub struct EventBuffer { 13 buffer: Arc<Mutex<Vec<EventRow>>>, 14 clickhouse: ClickHouseClient, 15 wal: Option<WriteAheadLog>, 16 config: IngestionConfig, 17} 18 19impl EventBuffer { 20 pub fn new( 21 clickhouse: ClickHouseClient, 22 wal: Option<WriteAheadLog>, 23 config: IngestionConfig, 24 ) -> Self { 25 Self { 26 buffer: Arc::new(Mutex::new(Vec::new())), 27 clickhouse, 28 wal, 29 config, 30 } 31 } 32 33 pub async fn add_events(&self, events: Vec<EventRow>) -> Result<()> { 34 if let Some(wal) = &self.wal { 35 wal.write_events(&events)?; 36 } 37 38 let mut buffer = self.buffer.lock().await; 39 buffer.extend(events); 40 41 if buffer.len() >= self.config.batch_size { 42 drop(buffer); 43 self.flush().await?; 44 } 45 46 Ok(()) 47 } 48 49 pub async fn flush(&self) -> Result<()> { 50 let events = { 51 let mut buffer = self.buffer.lock().await; 52 if buffer.is_empty() { 53 return Ok(()); 54 } 55 std::mem::take(&mut *buffer) 56 }; 57 58 let event_count = events.len(); 59 debug!("flushing {} events to ClickHouse", event_count); 60 61 match self.flush_with_retry(&events).await { 62 Ok(_) => { 63 info!("successfully flushed {} events", event_count); 64 65 if let Some(wal) = &self.wal { 66 let event_ids: Vec<Uuid> = events.iter().map(|e| e.event_id).collect(); 67 if let Err(e) = wal.remove_events(&event_ids) { 68 error!("failed to remove events from WAL: {}", e); 69 } 70 } 71 72 Ok(()) 73 } 74 Err(e) => { 75 error!("failed to flush events after retries: {}", e); 76 77 let mut buffer = self.buffer.lock().await; 78 buffer.extend(events); 79 80 Err(e) 81 } 82 } 83 } 84 85 async fn flush_with_retry(&self, events: &[EventRow]) -> Result<()> { 86 let mut attempts = 0; 87 let max_attempts = self.config.max_retry_attempts; 88 89 loop { 90 match self.clickhouse.insert_events(events.to_vec()).await { 91 Ok(_) => return Ok(()), 92 Err(e) => { 93 attempts += 1; 94 if attempts >= max_attempts { 95 return Err(e); 96 } 97 98 let backoff = Duration::from_millis( 99 self.config.retry_backoff_base_ms * 2u64.pow(attempts - 1), 100 ); 101 102 warn!( 103 "flush attempt {}/{} failed: {}, retrying in {:?}", 104 attempts, max_attempts, e, backoff 105 ); 106 107 time::sleep(backoff).await; 108 } 109 } 110 } 111 } 112 113 pub fn start_periodic_flush(self: Arc<Self>) { 114 let flush_interval = Duration::from_millis(self.config.flush_interval_ms); 115 116 tokio::spawn(async move { 117 let mut interval = time::interval(flush_interval); 118 119 loop { 120 interval.tick().await; 121 122 if let Err(e) = self.flush().await { 123 error!("periodic flush failed: {}", e); 124 } 125 } 126 }); 127 } 128 129 pub async fn replay_wal(&self) -> Result<usize> { 130 let wal = match &self.wal { 131 Some(wal) => wal, 132 None => return Ok(0), 133 }; 134 135 let events = wal.read_all_events()?; 136 let count = events.len(); 137 138 if count == 0 { 139 return Ok(0); 140 } 141 142 info!("replaying {} events from WAL", count); 143 144 dbg!(&events); 145 146 match self.flush_with_retry(&events).await { 147 Ok(_) => { 148 info!("successfully replayed {} events", count); 149 let event_ids: Vec<Uuid> = events.iter().map(|e| e.event_id).collect(); 150 wal.remove_events(&event_ids)?; 151 Ok(count) 152 } 153 Err(e) => { 154 error!("failed to replay WAL events: {}", e); 155 Err(e) 156 } 157 } 158 } 159}