atproto blogging
use bytes::Bytes;
use chrono::{DateTime, Utc};
use clap::Parser;
use clickhouse::Row;
use n0_future::StreamExt;
use smol_str::SmolStr;
use std::time::{Duration, Instant};
use tracing::{info, warn};
use weaver_index::clickhouse::Client;
use weaver_index::config::{ClickHouseConfig, FirehoseConfig};
use weaver_index::firehose::{FirehoseConsumer, SubscribeReposMessage, extract_records};

// =============================================================================
// Benchmark-specific schema (not part of production)
// =============================================================================

const TABLE_JSON: &str = "raw_records_json";
const TABLE_CBOR: &str = "raw_records_cbor";

/// Row type for JSON benchmark records
#[derive(Debug, Clone, Row, serde::Serialize, serde::Deserialize)]
struct RawRecordJson {
    did: SmolStr,
    collection: SmolStr,
    rkey: SmolStr,
    cid: String,
    record: String,
    operation: SmolStr,
    seq: u64,
    #[serde(with = "clickhouse::serde::chrono::datetime64::millis")]
    event_time: DateTime<Utc>,
}

/// Row type for CBOR benchmark records
#[derive(Debug, Clone, Row, serde::Serialize, serde::Deserialize)]
struct RawRecordCbor {
    did: SmolStr,
    collection: SmolStr,
    rkey: SmolStr,
    cid: String,
    #[serde(with = "jacquard::serde_bytes_helper")]
    record: Bytes,
    operation: SmolStr,
    seq: u64,
    #[serde(with = "clickhouse::serde::chrono::datetime64::millis")]
    event_time: DateTime<Utc>,
}

async fn create_benchmark_tables(client: &Client) -> miette::Result<()> {
    client
        .execute(&format!(
            r#"
            CREATE TABLE IF NOT EXISTS {} (
                did String,
                collection LowCardinality(String),
                rkey String,
                cid String,
                record JSON,
                operation LowCardinality(String),
                seq UInt64,
                event_time DateTime64(3),
                indexed_at DateTime64(3) DEFAULT now64(3)
            )
            ENGINE = MergeTree()
            ORDER BY (collection, did, rkey, indexed_at)
            "#,
            TABLE_JSON
        ))
        .await?;

    client
        .execute(&format!(
            r#"
            CREATE TABLE IF NOT EXISTS {} (
                did String,
                collection LowCardinality(String),
                rkey String,
                cid String,
                record String,
                operation LowCardinality(String),
                seq UInt64,
                event_time DateTime64(3),
                indexed_at DateTime64(3) DEFAULT now64(3)
            )
            ENGINE = MergeTree()
            ORDER BY (collection, did, rkey, indexed_at)
            "#,
            TABLE_CBOR
        ))
        .await?;

    Ok(())
}

async fn drop_benchmark_tables(client: &Client) -> miette::Result<()> {
    client
        .execute(&format!("DROP TABLE IF EXISTS {}", TABLE_JSON))
        .await?;
    client
        .execute(&format!("DROP TABLE IF EXISTS {}", TABLE_CBOR))
        .await?;
    Ok(())
}

// =============================================================================
// Benchmark logic
// =============================================================================

/// Tracks firehose lag to detect if we're falling behind
#[derive(Default)]
struct LagStats {
    min_ms: Option<i64>,
    max_ms: Option<i64>,
    current_ms: i64,
    sample_count: u64,
}

impl LagStats {
    fn update(&mut self, event_time_ms: i64) {
        let now_ms = Utc::now().timestamp_millis();
        let lag = now_ms - event_time_ms;

        self.current_ms = lag;
        self.sample_count += 1;

        self.min_ms = Some(self.min_ms.map_or(lag, |m| m.min(lag)));
        self.max_ms = Some(self.max_ms.map_or(lag, |m| m.max(lag)));
    }

    fn reset_window(&mut self) {
        // Keep current but reset min/max for next reporting window
        self.min_ms = Some(self.current_ms);
        self.max_ms = Some(self.current_ms);
    }
}

#[derive(Parser)]
#[command(name = "storage-benchmark")]
#[command(about = "Benchmark CBOR vs JSON storage in ClickHouse")]
struct Args {
    /// Duration to run the benchmark in minutes
    #[arg(short, long, default_value = "60")]
    duration_minutes: u64,

    /// Batch size for ClickHouse inserts
    #[arg(short, long, default_value = "1000")]
    batch_size: usize,

    /// Report interval in seconds
    #[arg(short, long, default_value = "30")]
    report_interval_secs: u64,

    /// Drop and recreate tables before starting
    #[arg(long)]
    reset_tables: bool,
}

#[tokio::main]
async fn main() -> miette::Result<()> {
    dotenvy::dotenv().ok();

    tracing_subscriber::fmt()
        .with_env_filter(
            tracing_subscriber::EnvFilter::from_default_env()
                .add_directive("weaver_index=info".parse().unwrap())
                .add_directive("storage_benchmark=info".parse().unwrap()),
        )
        .init();

    let args = Args::parse();

    info!("Storage Benchmark: CBOR vs JSON in ClickHouse");
    info!("Duration: {} minutes", args.duration_minutes);
    info!("Batch size: {}", args.batch_size);

    // Load configs
    let ch_config = ClickHouseConfig::from_env()?;
    let firehose_config = FirehoseConfig::from_env()?;

    info!(
        "Connecting to ClickHouse at:\n{} (database: {})",
        ch_config.url, ch_config.database
    );
    let client = Client::new(&ch_config)?;

    // Reset tables if requested
    if args.reset_tables {
        info!("Dropping existing benchmark tables...");
        drop_benchmark_tables(&client).await?;
    }

    info!("Creating benchmark tables...");
    create_benchmark_tables(&client).await?;

    let mut json_inserter = client.inserter::<RawRecordJson>(TABLE_JSON);
    let mut cbor_inserter = client.inserter::<RawRecordCbor>(TABLE_CBOR);

    info!("Connecting to firehose at:\n {}", firehose_config.relay_url);
    let consumer = FirehoseConsumer::new(firehose_config);
    let mut stream = consumer.connect().await?;

    // Tracking
    let start = Instant::now();
    let duration = Duration::from_secs(args.duration_minutes * 60);
    let report_interval = Duration::from_secs(args.report_interval_secs);
    let mut last_report = Instant::now();
    let mut total_records = 0u64;
    let mut total_commits = 0u64;
    let mut errors = 0u64;
    let mut lag_stats = LagStats::default();

    info!("Starting benchmark...");

    while start.elapsed() < duration {
        // Check for report interval
        if last_report.elapsed() >= report_interval {
            // Flush inserters so size measurements are accurate
            match json_inserter.commit().await {
                Ok(stats) => info!(
                    " JSON flush: {} rows, {} transactions",
                    stats.rows, stats.transactions
                ),
                Err(e) => warn!("Failed to flush JSON inserter: {}", e),
            }
            match cbor_inserter.commit().await {
                Ok(stats) => info!(
                    " CBOR flush: {} rows, {} transactions",
                    stats.rows, stats.transactions
                ),
                Err(e) => warn!("Failed to flush CBOR inserter: {}", e),
            }

            report_progress(
                &client,
                total_records,
                total_commits,
                errors,
                start.elapsed(),
                &lag_stats,
            )
            .await;
            lag_stats.reset_window();
            last_report = Instant::now();
        }

        // Get next message with timeout
        let msg = tokio::time::timeout(Duration::from_secs(30), stream.next()).await;

        let msg = match msg {
            Ok(Some(Ok(msg))) => msg,
            Ok(Some(Err(e))) => {
                warn!("Stream error: {}", e);
                errors += 1;
                continue;
            }
            Ok(None) => {
                warn!("Stream ended unexpectedly");
                break;
            }
            Err(_) => {
                warn!("Timeout waiting for message");
                continue;
            }
        };

        // Only process commits
        let commit = match msg {
            SubscribeReposMessage::Commit(c) => c,
            _ => continue,
        };

        total_commits += 1;

        // Track lag
        lag_stats.update(commit.time.as_ref().timestamp_millis());

        // Extract records from the commit
        let records = match extract_records(&commit).await {
            Ok(r) => r,
            Err(e) => {
                warn!("Record extraction error: {}", e);
                errors += 1;
                continue;
            }
        };

        // Insert to both tables
        for record in records {
            // Skip deletes (no record data)
            let Some(cbor_bytes) = &record.cbor_bytes else {
                continue;
            };

            // JSON table: decode CBOR to JSON
            let json_str = match record.to_json() {
                Ok(Some(j)) => j,
                Ok(None) => continue,
                Err(e) => {
                    warn!("JSON encode error: {}", e);
                    errors += 1;
                    continue;
                }
            };

            // Insert JSON record
            json_inserter
                .write(&RawRecordJson {
                    did: record.did.clone(),
                    collection: record.collection.clone(),
                    rkey: record.rkey.clone(),
                    cid: record.cid.to_string(),
                    record: json_str,
                    operation: record.operation.clone(),
                    seq: record.seq as u64,
                    event_time: record.event_time,
                })
                .await
                .map_err(|e| weaver_index::error::ClickHouseError::Insert {
                    message: "json insert failed".into(),
                    source: e,
                })?;

            // Insert CBOR record (raw bytes, no base64)
            cbor_inserter
                .write(&RawRecordCbor {
                    did: record.did,
                    collection: record.collection,
                    rkey: record.rkey,
                    cid: record.cid.to_string(),
                    record: cbor_bytes.clone(),
                    operation: record.operation,
                    seq: record.seq as u64,
                    event_time: record.event_time,
                })
                .await
                .map_err(|e| weaver_index::error::ClickHouseError::Insert {
                    message: "cbor insert failed".into(),
                    source: e,
                })?;

            match json_inserter.commit().await {
                Ok(_) => {}
                Err(e) => warn!("Failed to flush JSON inserter: {}", e),
            }
            match cbor_inserter.commit().await {
                Ok(_) => {}
                Err(e) => warn!("Failed to flush CBOR inserter: {}", e),
            }
            total_records += 1;
        }
    }

353 info!("Flushing remaining records...");
354 json_inserter
355 .end()
356 .await
357 .map_err(|e| weaver_index::error::ClickHouseError::Insert {
358 message: "json flush failed".into(),
359 source: e,
360 })?;
361 cbor_inserter
362 .end()
363 .await
364 .map_err(|e| weaver_index::error::ClickHouseError::Insert {
365 message: "cbor flush failed".into(),
366 source: e,
367 })?;
368
369 info!("\n========== FINAL RESULTS ==========");
370 report_progress(
371 &client,
372 total_records,
373 total_commits,
374 errors,
375 start.elapsed(),
376 &lag_stats,
377 )
378 .await;
379
380 // Detailed size comparison
381 info!("\nStorage Comparison:");
382 let sizes = client.table_sizes(&[TABLE_JSON, TABLE_CBOR]).await?;
383
384 for size in &sizes {
385 info!(
386 " {}: {} compressed, {} uncompressed, {:.2}x ratio, {} rows",
387 size.table,
388 size.compressed_human(),
389 size.uncompressed_human(),
390 size.compression_ratio(),
391 size.row_count
392 );
393 }
394
395 if sizes.len() == 2 {
396 let json_size = sizes.iter().find(|s| s.table == TABLE_JSON);
397 let cbor_size = sizes.iter().find(|s| s.table == TABLE_CBOR);
398
399 if let (Some(json), Some(cbor)) = (json_size, cbor_size) {
400 let compressed_diff = json.compressed_bytes as f64 / cbor.compressed_bytes as f64;
401 let uncompressed_diff = json.uncompressed_bytes as f64 / cbor.uncompressed_bytes as f64;
402
403 info!("\nJSON vs CBOR:");
404 info!(
405 " Compressed: JSON is {:.2}x the size of CBOR",
406 compressed_diff
407 );
408 info!(
409 " Uncompressed: JSON is {:.2}x the size of CBOR",
410 uncompressed_diff
411 );
412
413 if compressed_diff < 1.0 {
414 info!(
415 " Winner (compressed): JSON ({:.1}% smaller)",
416 (1.0 - compressed_diff) * 100.0
417 );
418 } else {
419 info!(
420 " Winner (compressed): CBOR ({:.1}% smaller)",
421 (1.0 - 1.0 / compressed_diff) * 100.0
422 );
423 }
424 }
425 }
426
427 info!("\nBenchmark complete!");
428
429 Ok(())
430}
431
async fn report_progress(
    client: &Client,
    total_records: u64,
    total_commits: u64,
    errors: u64,
    elapsed: Duration,
    lag: &LagStats,
) {
    let records_per_sec = total_records as f64 / elapsed.as_secs_f64();

    info!(
        "Progress: {} records from {} commits in {:.1}s ({:.1}/s), {} errors",
        total_records,
        total_commits,
        elapsed.as_secs_f64(),
        records_per_sec,
        errors
    );

    if lag.sample_count > 0 {
        info!(
            " Lag: current={:.1}s, min={:.1}s, max={:.1}s (window)",
            lag.current_ms as f64 / 1000.0,
            lag.min_ms.unwrap_or(0) as f64 / 1000.0,
            lag.max_ms.unwrap_or(0) as f64 / 1000.0,
        );
    }

    // Try to get current sizes
    match client.table_sizes(&[TABLE_JSON, TABLE_CBOR]).await {
        Ok(sizes) => {
            for size in sizes {
                info!(
                    " {}: {} compressed ({} rows)",
                    size.table,
                    size.compressed_human(),
                    size.row_count
                );
            }
        }
        Err(e) => {
            warn!("Failed to query table sizes: {}", e);
        }
    }
}