// Live video on the AT Protocol
1use crate::config::IngestionConfig;
2use crate::db::clickhouse::{ClickHouseClient, EventRow};
3use crate::ingest::wal::WriteAheadLog;
4use anyhow::Result;
5use std::sync::Arc;
6use std::time::Duration;
7use tokio::sync::Mutex;
8use tokio::time;
9use tracing::{debug, error, info, warn};
10use uuid::Uuid;
11
12pub struct EventBuffer {
13 buffer: Arc<Mutex<Vec<EventRow>>>,
14 clickhouse: ClickHouseClient,
15 wal: Option<WriteAheadLog>,
16 config: IngestionConfig,
17}
18
19impl EventBuffer {
20 pub fn new(
21 clickhouse: ClickHouseClient,
22 wal: Option<WriteAheadLog>,
23 config: IngestionConfig,
24 ) -> Self {
25 Self {
26 buffer: Arc::new(Mutex::new(Vec::new())),
27 clickhouse,
28 wal,
29 config,
30 }
31 }
32
33 pub async fn add_events(&self, events: Vec<EventRow>) -> Result<()> {
34 if let Some(wal) = &self.wal {
35 wal.write_events(&events)?;
36 }
37
38 let mut buffer = self.buffer.lock().await;
39 buffer.extend(events);
40
41 if buffer.len() >= self.config.batch_size {
42 drop(buffer);
43 self.flush().await?;
44 }
45
46 Ok(())
47 }
48
49 pub async fn flush(&self) -> Result<()> {
50 let events = {
51 let mut buffer = self.buffer.lock().await;
52 if buffer.is_empty() {
53 return Ok(());
54 }
55 std::mem::take(&mut *buffer)
56 };
57
58 let event_count = events.len();
59 debug!("flushing {} events to ClickHouse", event_count);
60
61 match self.flush_with_retry(&events).await {
62 Ok(_) => {
63 info!("successfully flushed {} events", event_count);
64
65 if let Some(wal) = &self.wal {
66 let event_ids: Vec<Uuid> = events.iter().map(|e| e.event_id).collect();
67 if let Err(e) = wal.remove_events(&event_ids) {
68 error!("failed to remove events from WAL: {}", e);
69 }
70 }
71
72 Ok(())
73 }
74 Err(e) => {
75 error!("failed to flush events after retries: {}", e);
76
77 let mut buffer = self.buffer.lock().await;
78 buffer.extend(events);
79
80 Err(e)
81 }
82 }
83 }
84
85 async fn flush_with_retry(&self, events: &[EventRow]) -> Result<()> {
86 let mut attempts = 0;
87 let max_attempts = self.config.max_retry_attempts;
88
89 loop {
90 match self.clickhouse.insert_events(events.to_vec()).await {
91 Ok(_) => return Ok(()),
92 Err(e) => {
93 attempts += 1;
94 if attempts >= max_attempts {
95 return Err(e);
96 }
97
98 let backoff = Duration::from_millis(
99 self.config.retry_backoff_base_ms * 2u64.pow(attempts - 1),
100 );
101
102 warn!(
103 "flush attempt {}/{} failed: {}, retrying in {:?}",
104 attempts, max_attempts, e, backoff
105 );
106
107 time::sleep(backoff).await;
108 }
109 }
110 }
111 }
112
113 pub fn start_periodic_flush(self: Arc<Self>) {
114 let flush_interval = Duration::from_millis(self.config.flush_interval_ms);
115
116 tokio::spawn(async move {
117 let mut interval = time::interval(flush_interval);
118
119 loop {
120 interval.tick().await;
121
122 if let Err(e) = self.flush().await {
123 error!("periodic flush failed: {}", e);
124 }
125 }
126 });
127 }
128
129 pub async fn replay_wal(&self) -> Result<usize> {
130 let wal = match &self.wal {
131 Some(wal) => wal,
132 None => return Ok(0),
133 };
134
135 let events = wal.read_all_events()?;
136 let count = events.len();
137
138 if count == 0 {
139 return Ok(0);
140 }
141
142 info!("replaying {} events from WAL", count);
143
144 dbg!(&events);
145
146 match self.flush_with_retry(&events).await {
147 Ok(_) => {
148 info!("successfully replayed {} events", count);
149 let event_ids: Vec<Uuid> = events.iter().map(|e| e.event_id).collect();
150 wal.remove_events(&event_ids)?;
151 Ok(count)
152 }
153 Err(e) => {
154 error!("failed to replay WAL events: {}", e);
155 Err(e)
156 }
157 }
158 }
159}