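//! Tap indexer: consumes record and identity events from a Tap websocket feed
//! and writes them to ClickHouse.
//!
//! A minimal usage sketch; `client` and the config values are assumed to be
//! constructed elsewhere (their constructors are not shown in this module):
//!
//! ```ignore
//! let indexer = TapIndexer::new(client, tap_config, inserter_config, indexer_config, 4);
//! indexer.run().await?;
//! ```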
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};

use chrono::Utc;
use smol_str::{SmolStr, ToSmolStr};
use tokio::task::JoinHandle;
use tracing::{debug, error, info, trace, warn};

use crate::clickhouse::{
    Client, InserterConfig, Migrator, RawIdentityEvent, RawRecordInsert, ResilientRecordInserter,
};
use crate::config::{IndexerConfig, TapConfig};
use crate::error::{ClickHouseError, Result};
use crate::tap::{
    RecordAction, TapConfig as TapConsumerConfig, TapConsumer, TapEvent, TapRecordEvent,
};

/// Tap indexer with multiple parallel websocket connections
///
/// Each worker maintains its own websocket connection to Tap and its own
/// ClickHouse inserter. Tap distributes events across connected clients,
/// and its ack-gating mechanism ensures per-DID ordering is preserved
/// regardless of which worker handles which events.
pub struct TapIndexer {
    client: Arc<Client>,
    tap_config: TapConfig,
    inserter_config: InserterConfig,
    config: Arc<IndexerConfig>,
    num_workers: usize,
    /// Tracks whether backfill has been triggered (first live event seen)
    backfill_triggered: Arc<AtomicBool>,
}

impl TapIndexer {
    pub fn new(
        client: Client,
        tap_config: TapConfig,
        inserter_config: InserterConfig,
        config: IndexerConfig,
        num_workers: usize,
    ) -> Self {
        Self {
            client: Arc::new(client),
            tap_config,
            inserter_config,
            config: Arc::new(config),
            num_workers,
            backfill_triggered: Arc::new(AtomicBool::new(false)),
        }
    }

    pub async fn run(&self) -> Result<()> {
        info!(
            num_workers = self.num_workers,
            url = %self.tap_config.url,
            "starting parallel tap indexer"
        );

        let mut handles: Vec<JoinHandle<Result<()>>> = Vec::with_capacity(self.num_workers);

        for worker_id in 0..self.num_workers {
            let client = self.client.clone();
            let tap_config = self.tap_config.clone();
            let inserter_config = self.inserter_config.clone();
            let config = self.config.clone();
            let backfill_triggered = self.backfill_triggered.clone();

            let handle = tokio::spawn(async move {
                run_tap_worker(
                    worker_id,
                    client,
                    tap_config,
                    inserter_config,
                    config,
                    backfill_triggered,
                )
                .await
            });

            handles.push(handle);
        }

        // Wait for all workers
        // TODO: Implement proper supervision - restart failed workers instead of propagating
        for (i, handle) in handles.into_iter().enumerate() {
            match handle.await {
                Ok(Ok(())) => {
                    info!(worker_id = i, "tap worker finished cleanly");
                }
                Ok(Err(e)) => {
                    error!(worker_id = i, error = ?e, "tap worker failed");
                    return Err(e);
                }
                Err(e) => {
                    error!(worker_id = i, error = ?e, "tap worker panicked");
                    return Err(crate::error::FirehoseError::Stream {
                        message: format!("worker {} panicked: {}", i, e),
                    }
                    .into());
                }
            }
        }

        Ok(())
    }
}
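/// Event loop for a single Tap worker.
///
/// Each worker owns one websocket connection and its own ClickHouse inserters.
/// Every iteration waits for the next event with a timeout equal to the
/// shortest inserter flush deadline, committing the inserters when the timeout
/// fires so buffered rows are not held indefinitely. Record events are
/// filtered by collection, serialized, and written to `raw_records`; identity
/// events go to `raw_identity_events`. Each event is acked back to Tap only
/// after it has been handled (written, routed to the DLQ, or skipped by the
/// collection filter). The first live record event triggers a one-time
/// backfill via [`run_backfill`].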
async fn run_tap_worker(
    worker_id: usize,
    client: Arc<Client>,
    tap_config: TapConfig,
    inserter_config: InserterConfig,
    config: Arc<IndexerConfig>,
    backfill_triggered: Arc<AtomicBool>,
) -> Result<()> {
    info!(worker_id, url = %tap_config.url, "tap worker starting");

    let consumer_config =
        TapConsumerConfig::new(tap_config.url.clone()).with_acks(tap_config.send_acks);
    let consumer = TapConsumer::new(consumer_config);

    let (mut events, ack_tx) = consumer.connect().await?;

    // Each worker has its own resilient inserter
    let mut records = ResilientRecordInserter::new(client.inner().clone(), inserter_config);
    let mut identities = client.inserter::<RawIdentityEvent>("raw_identity_events");

    let mut processed: u64 = 0;
    let mut last_stats = Instant::now();

    info!(worker_id, "tap worker connected, starting event loop");

    loop {
        // Get time until next required flush
        let records_time = records.time_left().unwrap_or(Duration::from_secs(10));
        let identities_time = identities.time_left().unwrap_or(Duration::from_secs(10));
        let time_left = records_time.min(identities_time);

        let event = match tokio::time::timeout(time_left, events.recv()).await {
            Ok(Some(event)) => event,
            Ok(None) => {
                info!(worker_id, "tap channel closed, exiting");
                break;
            }
            Err(_) => {
                // Timeout - flush inserters
                trace!(worker_id, "flush timeout, committing inserters");
                records.commit().await?;
                identities
                    .commit()
                    .await
                    .map_err(|e| ClickHouseError::Query {
                        message: "periodic identities commit failed".into(),
                        source: e,
                    })?;
                continue;
            }
        };

        let event_id = event.id();

        match event {
            TapEvent::Record(envelope) => {
                let record = &envelope.record;

                // Collection filter
                if !config.collections.matches(&record.collection) {
                    let _ = ack_tx.send(event_id).await;
                    continue;
                }

                // Serialize record
                let json = match &record.record {
                    Some(v) => match serde_json::to_string(v) {
                        Ok(s) => s,
                        Err(e) => {
                            warn!(
                                worker_id,
                                did = %record.did,
                                collection = %record.collection,
                                rkey = %record.rkey,
                                error = ?e,
                                "failed to serialize record, sending to DLQ"
                            );
                            let raw_data = format!(
                                r#"{{"did":"{}","collection":"{}","rkey":"{}","cid":"{}","error":"serialization_failed"}}"#,
                                record.did,
                                record.collection,
                                record.rkey,
                                record
                                    .cid
                                    .as_ref()
                                    .unwrap_or(&SmolStr::new_static("no cid"))
                            );
                            records
                                .write_raw_to_dlq(
                                    record.action.as_str().to_smolstr(),
                                    raw_data,
                                    e.to_string(),
                                    event_id,
                                )
                                .await?;
                            let _ = ack_tx.send(event_id).await;
                            continue;
                        }
                    },
                    None => "{}".to_string(),
                };

                debug!(
                    worker_id,
                    op = record.action.as_str(),
                    id = event_id,
                    len = json.len(),
                    "writing record"
                );

                if record.action == RecordAction::Delete {
                    let client = client.clone();
                    let record_clone = record.clone();
                    tokio::spawn(async move {
                        if let Err(e) = handle_delete(&client, record_clone).await {
                            warn!(error = ?e, "delete handling failed");
                        }
                    });
                }

                records
                    .write(RawRecordInsert {
                        did: record.did.clone(),
                        collection: record.collection.clone(),
                        rkey: record.rkey.clone(),
                        cid: record.cid.clone().unwrap_or_default(),
                        rev: record.rev.clone(),
                        record: json.to_smolstr(),
                        operation: record.action.as_str().to_smolstr(),
                        seq: event_id,
                        event_time: Utc::now(),
                        is_live: record.live,
                        // records from tap are pre-validated
                        validation_state: SmolStr::new_static("valid"),
                    })
                    .await?;
                records.commit().await?;

                // Ack after successful processing
                let _ = ack_tx.send(event_id).await;

                processed += 1;

                // Trigger backfill on first live event
                // compare_exchange ensures only one worker triggers this
                if record.live
                    && backfill_triggered
                        .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
                        .is_ok()
                {
                    info!(worker_id, "first live event received, scheduling backfill");
                    let backfill_client = client.clone();
                    tokio::spawn(async move {
                        run_backfill(backfill_client).await;
                    });
                }
            }
            TapEvent::Identity(envelope) => {
                let identity = &envelope.identity;

                identities
                    .write(&RawIdentityEvent {
                        did: identity.did.clone(),
                        handle: identity.handle.clone(),
                        seq: event_id,
                        event_time: Utc::now(),
                    })
                    .await
                    .map_err(|e| ClickHouseError::Query {
                        message: "identity write failed".into(),
                        source: e,
                    })?;
                identities
                    .commit()
                    .await
                    .map_err(|e| ClickHouseError::Query {
                        message: "identity commit failed".into(),
                        source: e,
                    })?;

                let _ = ack_tx.send(event_id).await;
            }
        }

        // Periodic stats
        if last_stats.elapsed() > Duration::from_secs(30) {
            info!(worker_id, processed, "tap worker stats");
            last_stats = Instant::now();
        }
    }

    // Clean shutdown
    records.end().await?;
    identities.end().await.map_err(|e| ClickHouseError::Query {
        message: "identities end failed".into(),
        source: e,
    })?;

    info!(worker_id, processed, "tap worker finished");
    Ok(())
}
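// ClickHouse incremental materialized views only process rows inserted after
// the view is created, so data loaded before that point must be copied into
// each MV's target table with an explicit INSERT ... SELECT. The real SQL
// comes from `mv.backfill_query()` in the migration definitions; the general
// shape is roughly (illustrative only, names are placeholders):
//
//   INSERT INTO <target_table>
//   SELECT <aggregated columns>
//   FROM raw_records
//   WHERE <collection filter>
//   GROUP BY <dimensions>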
/// Run backfill queries for incremental MVs
///
/// Called once when the first live event is received, indicating historical
/// data load is complete. Waits briefly to let in-flight inserts settle,
/// then runs INSERT queries to populate target tables for incremental MVs.
async fn run_backfill(client: Arc<Client>) {
    // Wait for in-flight inserts to settle
    info!("backfill: waiting 100s for in-flight inserts to settle");
    tokio::time::sleep(Duration::from_secs(100)).await;

    let mvs = Migrator::incremental_mvs();
    if mvs.is_empty() {
        info!("backfill: no incremental MVs found, nothing to do");
        return;
    }

    info!(
        count = mvs.len(),
        "backfill: starting incremental MV backfill"
    );

    for mv in mvs {
        info!(
            mv = %mv.name,
            table = %mv.target_table,
            "backfill: running backfill query"
        );

        let query = mv.backfill_query();
        debug!(query = %query, "backfill query");

        match client.execute(&query).await {
            Ok(()) => {
                info!(mv = %mv.name, "backfill: completed successfully");
            }
            Err(e) => {
                error!(mv = %mv.name, error = ?e, "backfill: query failed");
            }
        }
    }

    info!("backfill: all incremental MVs processed");
}
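/// Minimal projection of a `raw_records` row, used by `handle_delete` to look
/// up the original record for an incoming delete event.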
#[derive(Debug, Clone, clickhouse::Row, serde::Deserialize)]
struct LookupRawRecord {
    #[allow(dead_code)]
    did: SmolStr,
    #[allow(dead_code)]
    collection: SmolStr,
    #[allow(dead_code)]
    cid: SmolStr,
    #[allow(dead_code)]
    record: SmolStr, // JSON string of the original record
}

/// Handle a delete event by looking up the original record in `raw_records`.
///
/// The lookup is retried for up to 15 seconds because the original insert may
/// still be in flight. The delete row itself is written by the main insert
/// path either way; this is where phase 2 will parse the original record and
/// emit count deltas for denormalized tables.
async fn handle_delete(client: &Client, record: TapRecordEvent) -> Result<()> {
    let deadline = Instant::now() + Duration::from_secs(15);

    loop {
        // Try to find the record by rkey
        let query = format!(
            r#"
            SELECT did, collection, cid, record
            FROM raw_records
            WHERE did = '{}' AND rkey = '{}'
            ORDER BY event_time DESC
            LIMIT 1
            "#,
            record.did, record.rkey
        );

        let original: Option<LookupRawRecord> = client
            .inner()
            .query(&query)
            .fetch_optional()
            .await
            .map_err(|e| crate::error::ClickHouseError::Query {
                message: "delete lookup failed".into(),
                source: e,
            })?;

        if let Some(original) = original {
            // Found the record - the main insert path already handles creating
            // the delete row, so we're done. In phase 2, this is where we'd
            // parse original.record and insert count deltas for denormalized tables.
            debug!(did = %record.did, cid = %original.cid, "delete found original record");
            return Ok(());
        }

        if Instant::now() > deadline {
            // Gave up - create stub tombstone
            // The record will be inserted via the main batch path with operation='delete'
            // and empty record content, which serves as our stub tombstone
            warn!(
                did = %record.did,
                rkey = %record.rkey,
                "delete timeout, stub tombstone will be created"
            );
            return Ok(());
        }

        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}