// Web viewer header (not part of the program): at main, 476 lines, 15 kB, view raw
1use bytes::Bytes; 2use chrono::{DateTime, Utc}; 3use clap::Parser; 4use clickhouse::Row; 5use n0_future::StreamExt; 6use smol_str::SmolStr; 7use std::time::{Duration, Instant}; 8use tracing::{info, warn}; 9use weaver_index::clickhouse::Client; 10use weaver_index::config::{ClickHouseConfig, FirehoseConfig}; 11use weaver_index::firehose::{FirehoseConsumer, SubscribeReposMessage, extract_records}; 12 13// ============================================================================= 14// Benchmark-specific schema (not part of production) 15// ============================================================================= 16 17const TABLE_JSON: &str = "raw_records_json"; 18const TABLE_CBOR: &str = "raw_records_cbor"; 19 20/// Row type for JSON benchmark records 21#[derive(Debug, Clone, Row, serde::Serialize, serde::Deserialize)] 22struct RawRecordJson { 23 did: SmolStr, 24 collection: SmolStr, 25 rkey: SmolStr, 26 cid: String, 27 record: String, 28 operation: SmolStr, 29 seq: u64, 30 #[serde(with = "clickhouse::serde::chrono::datetime64::millis")] 31 event_time: DateTime<Utc>, 32} 33 34/// Row type for CBOR benchmark records 35#[derive(Debug, Clone, Row, serde::Serialize, serde::Deserialize)] 36struct RawRecordCbor { 37 did: SmolStr, 38 collection: SmolStr, 39 rkey: SmolStr, 40 cid: String, 41 #[serde(with = "jacquard::serde_bytes_helper")] 42 record: Bytes, 43 operation: SmolStr, 44 seq: u64, 45 #[serde(with = "clickhouse::serde::chrono::datetime64::millis")] 46 event_time: DateTime<Utc>, 47} 48 49async fn create_benchmark_tables(client: &Client) -> miette::Result<()> { 50 client 51 .execute(&format!( 52 r#" 53 CREATE TABLE IF NOT EXISTS {} ( 54 did String, 55 collection LowCardinality(String), 56 rkey String, 57 cid String, 58 record JSON, 59 operation LowCardinality(String), 60 seq UInt64, 61 event_time DateTime64(3), 62 indexed_at DateTime64(3) DEFAULT now64(3) 63 ) 64 ENGINE = MergeTree() 65 ORDER BY (collection, did, rkey, indexed_at) 66 "#, 67 TABLE_JSON 68 )) 69 
.await?; 70 71 client 72 .execute(&format!( 73 r#" 74 CREATE TABLE IF NOT EXISTS {} ( 75 did String, 76 collection LowCardinality(String), 77 rkey String, 78 cid String, 79 record String, 80 operation LowCardinality(String), 81 seq UInt64, 82 event_time DateTime64(3), 83 indexed_at DateTime64(3) DEFAULT now64(3) 84 ) 85 ENGINE = MergeTree() 86 ORDER BY (collection, did, rkey, indexed_at) 87 "#, 88 TABLE_CBOR 89 )) 90 .await?; 91 92 Ok(()) 93} 94 95async fn drop_benchmark_tables(client: &Client) -> miette::Result<()> { 96 client 97 .execute(&format!("DROP TABLE IF EXISTS {}", TABLE_JSON)) 98 .await?; 99 client 100 .execute(&format!("DROP TABLE IF EXISTS {}", TABLE_CBOR)) 101 .await?; 102 Ok(()) 103} 104 105// ============================================================================= 106// Benchmark logic 107// ============================================================================= 108 109/// Tracks firehose lag to detect if we're falling behind 110#[derive(Default)] 111struct LagStats { 112 min_ms: Option<i64>, 113 max_ms: Option<i64>, 114 current_ms: i64, 115 sample_count: u64, 116} 117 118impl LagStats { 119 fn update(&mut self, event_time_ms: i64) { 120 let now_ms = Utc::now().timestamp_millis(); 121 let lag = now_ms - event_time_ms; 122 123 self.current_ms = lag; 124 self.sample_count += 1; 125 126 self.min_ms = Some(self.min_ms.map_or(lag, |m| m.min(lag))); 127 self.max_ms = Some(self.max_ms.map_or(lag, |m| m.max(lag))); 128 } 129 130 fn reset_window(&mut self) { 131 // Keep current but reset min/max for next reporting window 132 self.min_ms = Some(self.current_ms); 133 self.max_ms = Some(self.current_ms); 134 } 135} 136 137#[derive(Parser)] 138#[command(name = "storage-benchmark")] 139#[command(about = "Benchmark CBOR vs JSON storage in ClickHouse")] 140struct Args { 141 /// Duration to run the benchmark in minutes 142 #[arg(short, long, default_value = "60")] 143 duration_minutes: u64, 144 145 /// Batch size for ClickHouse inserts 146 #[arg(short, 
long, default_value = "1000")] 147 batch_size: usize, 148 149 /// Report interval in seconds 150 #[arg(short, long, default_value = "30")] 151 report_interval_secs: u64, 152 153 /// Drop and recreate tables before starting 154 #[arg(long)] 155 reset_tables: bool, 156} 157 158#[tokio::main] 159async fn main() -> miette::Result<()> { 160 dotenvy::dotenv().ok(); 161 162 tracing_subscriber::fmt() 163 .with_env_filter( 164 tracing_subscriber::EnvFilter::from_default_env() 165 .add_directive("weaver_index=info".parse().unwrap()) 166 .add_directive("storage_benchmark=info".parse().unwrap()), 167 ) 168 .init(); 169 170 let args = Args::parse(); 171 172 info!("Storage Benchmark: CBOR vs JSON in ClickHouse"); 173 info!("Duration: {} minutes", args.duration_minutes); 174 info!("Batch size: {}", args.batch_size); 175 176 // Load configs 177 let ch_config = ClickHouseConfig::from_env()?; 178 let firehose_config = FirehoseConfig::from_env()?; 179 180 info!( 181 "Connecting to ClickHouse at:\n{} (database: {})", 182 ch_config.url, ch_config.database 183 ); 184 let client = Client::new(&ch_config)?; 185 186 // Reset tables if requested 187 if args.reset_tables { 188 info!("Dropping existing benchmark tables..."); 189 drop_benchmark_tables(&client).await?; 190 } 191 192 info!("Creating benchmark tables..."); 193 create_benchmark_tables(&client).await?; 194 195 let mut json_inserter = client.inserter::<RawRecordJson>(TABLE_JSON); 196 let mut cbor_inserter = client.inserter::<RawRecordCbor>(TABLE_CBOR); 197 198 info!("Connecting to firehose at:\n {}", firehose_config.relay_url); 199 let consumer = FirehoseConsumer::new(firehose_config); 200 let mut stream = consumer.connect().await?; 201 202 // Tracking 203 let start = Instant::now(); 204 let duration = Duration::from_secs(args.duration_minutes * 60); 205 let report_interval = Duration::from_secs(args.report_interval_secs); 206 let mut last_report = Instant::now(); 207 let mut total_records = 0u64; 208 let mut total_commits = 0u64; 
209 let mut errors = 0u64; 210 let mut lag_stats = LagStats::default(); 211 212 info!("Starting benchmark..."); 213 214 while start.elapsed() < duration { 215 // Check for report interval 216 if last_report.elapsed() >= report_interval { 217 // Flush inserters so size measurements are accurate 218 match json_inserter.commit().await { 219 Ok(stats) => info!( 220 " JSON flush: {} rows, {} transactions", 221 stats.rows, stats.transactions 222 ), 223 Err(e) => warn!("Failed to flush JSON inserter: {}", e), 224 } 225 match cbor_inserter.commit().await { 226 Ok(stats) => info!( 227 " CBOR flush: {} rows, {} transactions", 228 stats.rows, stats.transactions 229 ), 230 Err(e) => warn!("Failed to flush CBOR inserter: {}", e), 231 } 232 233 report_progress( 234 &client, 235 total_records, 236 total_commits, 237 errors, 238 start.elapsed(), 239 &lag_stats, 240 ) 241 .await; 242 lag_stats.reset_window(); 243 last_report = Instant::now(); 244 } 245 246 // Get next message with timeout 247 let msg = tokio::time::timeout(Duration::from_secs(30), stream.next()).await; 248 249 let msg = match msg { 250 Ok(Some(Ok(msg))) => msg, 251 Ok(Some(Err(e))) => { 252 warn!("Stream error: {}", e); 253 errors += 1; 254 continue; 255 } 256 Ok(None) => { 257 warn!("Stream ended unexpectedly"); 258 break; 259 } 260 Err(_) => { 261 warn!("Timeout waiting for message"); 262 continue; 263 } 264 }; 265 266 // Only process commits 267 let commit = match msg { 268 SubscribeReposMessage::Commit(c) => c, 269 _ => continue, 270 }; 271 272 total_commits += 1; 273 274 // Track lag 275 lag_stats.update(commit.time.as_ref().timestamp_millis()); 276 277 // Extract records from the commit 278 let records = match extract_records(&commit).await { 279 Ok(r) => r, 280 Err(e) => { 281 warn!("Record extraction error: {}", e); 282 errors += 1; 283 continue; 284 } 285 }; 286 287 // Insert to both tables 288 for record in records { 289 // Skip deletes (no record data) 290 let Some(cbor_bytes) = &record.cbor_bytes else { 
291 continue; 292 }; 293 294 // JSON table: decode CBOR to JSON 295 let json_str = match record.to_json() { 296 Ok(Some(j)) => j, 297 Ok(None) => continue, 298 Err(e) => { 299 warn!("JSON encode error: {}", e); 300 errors += 1; 301 continue; 302 } 303 }; 304 305 // Insert JSON record 306 json_inserter 307 .write(&RawRecordJson { 308 did: record.did.clone(), 309 collection: record.collection.clone(), 310 rkey: record.rkey.clone(), 311 cid: record.cid.to_string(), 312 record: json_str, 313 operation: record.operation.clone(), 314 seq: record.seq as u64, 315 event_time: record.event_time, 316 }) 317 .await 318 .map_err(|e| weaver_index::error::ClickHouseError::Insert { 319 message: "json insert failed".into(), 320 source: e, 321 })?; 322 323 // Insert CBOR record (raw bytes, no base64) 324 cbor_inserter 325 .write(&RawRecordCbor { 326 did: record.did, 327 collection: record.collection, 328 rkey: record.rkey, 329 cid: record.cid.to_string(), 330 record: cbor_bytes.clone(), 331 operation: record.operation, 332 seq: record.seq as u64, 333 event_time: record.event_time, 334 }) 335 .await 336 .map_err(|e| weaver_index::error::ClickHouseError::Insert { 337 message: "cbor insert failed".into(), 338 source: e, 339 })?; 340 341 match json_inserter.commit().await { 342 Ok(_) => {} 343 Err(e) => warn!("Failed to flush JSON inserter: {}", e), 344 } 345 match cbor_inserter.commit().await { 346 Ok(_) => {} 347 Err(e) => warn!("Failed to flush CBOR inserter: {}", e), 348 } 349 total_records += 1; 350 } 351 } 352 353 info!("Flushing remaining records..."); 354 json_inserter 355 .end() 356 .await 357 .map_err(|e| weaver_index::error::ClickHouseError::Insert { 358 message: "json flush failed".into(), 359 source: e, 360 })?; 361 cbor_inserter 362 .end() 363 .await 364 .map_err(|e| weaver_index::error::ClickHouseError::Insert { 365 message: "cbor flush failed".into(), 366 source: e, 367 })?; 368 369 info!("\n========== FINAL RESULTS =========="); 370 report_progress( 371 &client, 372 
total_records, 373 total_commits, 374 errors, 375 start.elapsed(), 376 &lag_stats, 377 ) 378 .await; 379 380 // Detailed size comparison 381 info!("\nStorage Comparison:"); 382 let sizes = client.table_sizes(&[TABLE_JSON, TABLE_CBOR]).await?; 383 384 for size in &sizes { 385 info!( 386 " {}: {} compressed, {} uncompressed, {:.2}x ratio, {} rows", 387 size.table, 388 size.compressed_human(), 389 size.uncompressed_human(), 390 size.compression_ratio(), 391 size.row_count 392 ); 393 } 394 395 if sizes.len() == 2 { 396 let json_size = sizes.iter().find(|s| s.table == TABLE_JSON); 397 let cbor_size = sizes.iter().find(|s| s.table == TABLE_CBOR); 398 399 if let (Some(json), Some(cbor)) = (json_size, cbor_size) { 400 let compressed_diff = json.compressed_bytes as f64 / cbor.compressed_bytes as f64; 401 let uncompressed_diff = json.uncompressed_bytes as f64 / cbor.uncompressed_bytes as f64; 402 403 info!("\nJSON vs CBOR:"); 404 info!( 405 " Compressed: JSON is {:.2}x the size of CBOR", 406 compressed_diff 407 ); 408 info!( 409 " Uncompressed: JSON is {:.2}x the size of CBOR", 410 uncompressed_diff 411 ); 412 413 if compressed_diff < 1.0 { 414 info!( 415 " Winner (compressed): JSON ({:.1}% smaller)", 416 (1.0 - compressed_diff) * 100.0 417 ); 418 } else { 419 info!( 420 " Winner (compressed): CBOR ({:.1}% smaller)", 421 (1.0 - 1.0 / compressed_diff) * 100.0 422 ); 423 } 424 } 425 } 426 427 info!("\nBenchmark complete!"); 428 429 Ok(()) 430} 431 432async fn report_progress( 433 client: &Client, 434 total_records: u64, 435 total_commits: u64, 436 errors: u64, 437 elapsed: Duration, 438 lag: &LagStats, 439) { 440 let records_per_sec = total_records as f64 / elapsed.as_secs_f64(); 441 442 info!( 443 "Progress: {} records from {} commits in {:.1}s ({:.1}/s), {} errors", 444 total_records, 445 total_commits, 446 elapsed.as_secs_f64(), 447 records_per_sec, 448 errors 449 ); 450 451 if lag.sample_count > 0 { 452 info!( 453 " Lag: current={:.1}s, min={:.1}s, max={:.1}s (window)", 
454 lag.current_ms as f64 / 1000.0, 455 lag.min_ms.unwrap_or(0) as f64 / 1000.0, 456 lag.max_ms.unwrap_or(0) as f64 / 1000.0, 457 ); 458 } 459 460 // Try to get current sizes 461 match client.table_sizes(&[TABLE_JSON, TABLE_CBOR]).await { 462 Ok(sizes) => { 463 for size in sizes { 464 info!( 465 " {}: {} compressed ({} rows)", 466 size.table, 467 size.compressed_human(), 468 size.row_count 469 ); 470 } 471 } 472 Err(e) => { 473 warn!("Failed to query table sizes: {}", e); 474 } 475 } 476}