at main 524 lines 18 kB view raw
1use std::sync::Arc; 2use std::time::{Duration, Instant}; 3 4use chrono::Utc; 5use dashmap::DashMap; 6use n0_future::StreamExt; 7use smol_str::{SmolStr, ToSmolStr}; 8use tracing::{debug, info, warn}; 9 10use chrono::DateTime; 11 12use crate::clickhouse::{ 13 AccountRevState, Client, FirehoseCursor, InserterConfig, RawAccountEvent, RawIdentityEvent, 14 RawRecordInsert, ResilientRecordInserter, 15}; 16use crate::config::IndexerConfig; 17use crate::error::{IndexError, Result}; 18use crate::firehose::{ 19 Account, ExtractedRecord, FirehoseConsumer, Identity, MessageStream, SubscribeReposMessage, 20 extract_records, 21}; 22 23/// Default consumer ID for cursor tracking 24const CONSUMER_ID: &str = "main"; 25 26/// Per-account revision state for deduplication 27#[derive(Debug, Clone)] 28pub struct RevState { 29 pub last_rev: SmolStr, 30 pub last_cid: SmolStr, 31} 32 33/// In-memory cache of per-account revision state 34/// 35/// Used for fast deduplication without hitting ClickHouse on every event. 36/// Populated from account_rev_state table on startup, updated as events are processed. 37pub struct RevCache { 38 inner: DashMap<SmolStr, RevState>, 39} 40 41impl RevCache { 42 pub fn new() -> Self { 43 Self { 44 inner: DashMap::new(), 45 } 46 } 47 48 /// Load cache from ClickHouse account_rev_state table 49 pub async fn load_from_clickhouse(client: &Client) -> Result<Self> { 50 let query = r#" 51 SELECT 52 did, 53 argMaxMerge(last_rev) as last_rev, 54 argMaxMerge(last_cid) as last_cid, 55 maxMerge(last_seq) as last_seq, 56 maxMerge(last_event_time) as last_event_time 57 FROM account_rev_state 58 GROUP BY did 59 "#; 60 61 let rows: Vec<AccountRevState> = 62 client.inner().query(query).fetch_all().await.map_err(|e| { 63 IndexError::ClickHouse(crate::error::ClickHouseError::Query { 64 message: "failed to load account rev state".into(), 65 source: e, 66 }) 67 })?; 68 69 let cache = Self::new(); 70 for row in rows { 71 cache.inner.insert( 72 SmolStr::new(&row.did), 73 RevState { 74 last_rev: SmolStr::new(&row.last_rev), 75 last_cid: SmolStr::new(&row.last_cid), 76 }, 77 ); 78 } 79 80 info!( 81 accounts = cache.inner.len(), 82 "loaded rev cache from clickhouse" 83 ); 84 Ok(cache) 85 } 86 87 /// Check if we should process this commit (returns false if already seen) 88 pub fn should_process(&self, did: &str, rev: &str) -> bool { 89 match self.inner.get(did) { 90 Some(state) => rev > state.last_rev.as_str(), 91 None => true, // new account, always process 92 } 93 } 94 95 /// Update cache after processing a commit 96 pub fn update(&self, did: &SmolStr, rev: &SmolStr, cid: &SmolStr) { 97 self.inner.insert( 98 did.clone(), 99 RevState { 100 last_rev: rev.clone(), 101 last_cid: cid.clone(), 102 }, 103 ); 104 } 105 106 /// Get current cache size (number of accounts tracked) 107 pub fn len(&self) -> usize { 108 self.inner.len() 109 } 110 111 pub fn is_empty(&self) -> bool { 112 self.inner.is_empty() 113 } 114} 115 116impl Default for RevCache { 117 fn default() -> Self { 118 Self::new() 119 } 120} 121 122/// Safety margin when resuming - back up this many sequence numbers 123/// to ensure no gaps from incomplete batches or race conditions 124const CURSOR_REWIND: i64 = 1000; 125 126/// Load cursor from ClickHouse for resuming 127/// 128/// Returns cursor with safety margin subtracted to ensure overlap 129pub async fn load_cursor(client: &Client) -> Result<Option<i64>> { 130 let query = format!( 131 r#" 132 SELECT consumer_id, seq, event_time 133 FROM firehose_cursor FINAL 134 WHERE consumer_id = '{}' 135 LIMIT 1 136 "#, 137 CONSUMER_ID 138 ); 139 140 let cursor: Option<FirehoseCursor> = client 141 .inner() 142 .query(&query) 143 .fetch_optional() 144 .await 145 .map_err(|e| crate::error::ClickHouseError::Query { 146 message: "failed to load cursor".into(), 147 source: e, 148 })?; 149 150 if let Some(c) = &cursor { 151 let resume_at = (c.seq as i64).saturating_sub(CURSOR_REWIND); 152 info!( 153 saved_seq = c.seq, 154 resume_seq = resume_at, 155 rewind = CURSOR_REWIND, 156 "loaded cursor from clickhouse (with safety margin)" 157 ); 158 Ok(Some(resume_at)) 159 } else { 160 Ok(None) 161 } 162} 163 164/// Firehose indexer that consumes AT Protocol firehose and writes to ClickHouse 165pub struct FirehoseIndexer { 166 client: Arc<Client>, 167 consumer: FirehoseConsumer, 168 rev_cache: RevCache, 169 config: IndexerConfig, 170} 171 172impl FirehoseIndexer { 173 /// Create a new firehose indexer 174 pub async fn new( 175 client: Client, 176 consumer: FirehoseConsumer, 177 config: IndexerConfig, 178 ) -> Result<Self> { 179 let client = Arc::new(client); 180 181 // Load rev cache from ClickHouse 182 let rev_cache = RevCache::load_from_clickhouse(&client).await?; 183 184 Ok(Self { 185 client, 186 consumer, 187 rev_cache, 188 config, 189 }) 190 } 191 192 /// Save cursor to ClickHouse 193 async fn save_cursor(&self, seq: u64, event_time: DateTime<Utc>) -> Result<()> { 194 let query = format!( 195 "INSERT INTO firehose_cursor (consumer_id, seq, event_time) VALUES ('{}', {}, {})", 196 CONSUMER_ID, 197 seq, 198 event_time.timestamp_millis() 199 ); 200 201 self.client.execute(&query).await?; 202 debug!(seq, "saved cursor"); 203 Ok(()) 204 } 205 206 /// Run the indexer loop 207 pub async fn run(&self) -> Result<()> { 208 info!("connecting to firehose..."); 209 let mut stream: MessageStream = self.consumer.connect().await?; 210 211 // Inserters handle batching internally based on config 212 // Use resilient inserter for records since that's where untrusted JSON enters 213 let mut records = 214 ResilientRecordInserter::new(self.client.inner().clone(), InserterConfig::default()); 215 let mut identities = self 216 .client 217 .inserter::<RawIdentityEvent>("raw_identity_events"); 218 let mut accounts = self 219 .client 220 .inserter::<RawAccountEvent>("raw_account_events"); 221 222 // Stats and cursor tracking 223 let mut processed: u64 = 0; 224 let mut skipped: u64 = 0; 225 let mut last_seq: u64 = 0; 226 let mut last_event_time = Utc::now(); 227 let mut last_stats = Instant::now(); 228 let mut last_cursor_save = Instant::now(); 229 230 info!("starting indexer loop"); 231 232 loop { 233 // Get time until next required flush - must commit before socket timeout (30s) 234 let records_time = records.time_left().unwrap_or(Duration::from_secs(10)); 235 let identities_time = identities.time_left().unwrap_or(Duration::from_secs(10)); 236 let accounts_time = accounts.time_left().unwrap_or(Duration::from_secs(10)); 237 let time_left = records_time.min(identities_time).min(accounts_time); 238 239 let result = match tokio::time::timeout(time_left, stream.next()).await { 240 Ok(Some(result)) => result, 241 Ok(None) => { 242 // Stream ended 243 break; 244 } 245 Err(_) => { 246 // Timeout - flush inserters to keep INSERT alive 247 debug!("flush timeout, committing inserters"); 248 records.commit().await?; 249 identities.commit().await.map_err(|e| { 250 crate::error::ClickHouseError::Query { 251 message: "periodic identities commit failed".into(), 252 source: e, 253 } 254 })?; 255 accounts 256 .commit() 257 .await 258 .map_err(|e| crate::error::ClickHouseError::Query { 259 message: "periodic accounts commit failed".into(), 260 source: e, 261 })?; 262 continue; 263 } 264 }; 265 266 let msg = match result { 267 Ok(msg) => msg, 268 Err(e) => { 269 warn!(error = ?e, "firehose stream error"); 270 continue; 271 } 272 }; 273 274 // Track seq from any message type that has it 275 match &msg { 276 SubscribeReposMessage::Commit(c) => { 277 last_seq = c.seq as u64; 278 last_event_time = c.time.as_ref().with_timezone(&Utc); 279 } 280 SubscribeReposMessage::Identity(i) => { 281 last_seq = i.seq as u64; 282 last_event_time = i.time.as_ref().with_timezone(&Utc); 283 } 284 SubscribeReposMessage::Account(a) => { 285 last_seq = a.seq as u64; 286 last_event_time = a.time.as_ref().with_timezone(&Utc); 287 } 288 _ => {} 289 } 290 291 match msg { 292 SubscribeReposMessage::Commit(commit) => { 293 let did = commit.repo.as_ref(); 294 let rev = commit.rev.as_ref(); 295 296 // Dedup check 297 if !self.rev_cache.should_process(did, rev) { 298 skipped += 1; 299 continue; 300 } 301 302 // Extract and write records 303 for record in extract_records(&commit).await? { 304 // Collection filter - skip early before JSON conversion 305 if !self.config.collections.matches(&record.collection) { 306 continue; 307 } 308 309 let json = record.to_json()?.unwrap_or_else(|| "{}".to_string()); 310 311 // Fire and forget delete handling 312 if record.operation == "delete" { 313 let client = self.client.clone(); 314 let record_clone = record.clone(); 315 tokio::spawn(async move { 316 if let Err(e) = handle_delete(&client, record_clone).await { 317 warn!(error = ?e, "delete handling failed"); 318 } 319 }); 320 } 321 322 records 323 .write(RawRecordInsert { 324 did: record.did.clone(), 325 collection: record.collection.clone(), 326 rkey: record.rkey.clone(), 327 cid: record.cid.clone(), 328 rev: record.rev.clone(), 329 record: json.to_smolstr(), 330 operation: record.operation.clone(), 331 seq: record.seq as u64, 332 event_time: record.event_time, 333 is_live: true, 334 validation_state: SmolStr::new_static("unchecked"), 335 }) 336 .await?; 337 } 338 339 // Update rev cache 340 self.rev_cache.update( 341 &SmolStr::new(did), 342 &SmolStr::new(rev), 343 &commit.commit.0.to_smolstr(), 344 ); 345 346 processed += 1; 347 } 348 SubscribeReposMessage::Identity(identity) => { 349 write_identity(&identity, &mut identities).await?; 350 } 351 SubscribeReposMessage::Account(account) => { 352 write_account(&account, &mut accounts).await?; 353 } 354 SubscribeReposMessage::Sync(_) => { 355 debug!("received sync (tooBig) event, skipping"); 356 } 357 _ => {} 358 } 359 360 // commit() flushes if internal thresholds met, otherwise no-op 361 records.commit().await?; 362 363 // Periodic stats and cursor save (every 10s) 364 if last_stats.elapsed() >= Duration::from_secs(10) { 365 info!( 366 processed, 367 skipped, 368 last_seq, 369 rev_cache_size = self.rev_cache.len(), 370 "indexer stats" 371 ); 372 last_stats = Instant::now(); 373 } 374 375 // Save cursor every 30s 376 if last_cursor_save.elapsed() >= Duration::from_secs(30) && last_seq > 0 { 377 if let Err(e) = self.save_cursor(last_seq, last_event_time).await { 378 warn!(error = ?e, "failed to save cursor"); 379 } 380 last_cursor_save = Instant::now(); 381 } 382 } 383 384 // Final flush 385 records.end().await?; 386 identities 387 .end() 388 .await 389 .map_err(|e| crate::error::ClickHouseError::Query { 390 message: "final flush failed".into(), 391 source: e, 392 })?; 393 accounts 394 .end() 395 .await 396 .map_err(|e| crate::error::ClickHouseError::Query { 397 message: "final flush failed".into(), 398 source: e, 399 })?; 400 401 // Final cursor save 402 if last_seq > 0 { 403 self.save_cursor(last_seq, last_event_time).await?; 404 } 405 406 info!(last_seq, "firehose stream ended"); 407 Ok(()) 408 } 409} 410 411async fn write_identity( 412 identity: &Identity<'_>, 413 inserter: &mut clickhouse::inserter::Inserter<RawIdentityEvent>, 414) -> Result<()> { 415 inserter 416 .write(&RawIdentityEvent { 417 did: identity.did.to_smolstr(), 418 handle: identity 419 .handle 420 .as_ref() 421 .map(|h| h.as_ref().to_smolstr()) 422 .unwrap_or_default(), 423 seq: identity.seq as u64, 424 event_time: identity.time.as_ref().with_timezone(&Utc), 425 }) 426 .await 427 .map_err(|e| crate::error::ClickHouseError::Query { 428 message: "write failed".into(), 429 source: e, 430 })?; 431 Ok(()) 432} 433 434async fn write_account( 435 account: &Account<'_>, 436 inserter: &mut clickhouse::inserter::Inserter<RawAccountEvent>, 437) -> Result<()> { 438 inserter 439 .write(&RawAccountEvent { 440 did: account.did.to_smolstr(), 441 active: if account.active { 1 } else { 0 }, 442 status: account 443 .status 444 .as_ref() 445 .map(|s| s.as_ref().to_smolstr()) 446 .unwrap_or_default(), 447 seq: account.seq as u64, 448 event_time: account.time.as_ref().with_timezone(&Utc), 449 }) 450 .await 451 .map_err(|e| crate::error::ClickHouseError::Query { 452 message: "write failed".into(), 453 source: e, 454 })?; 455 Ok(()) 456} 457 458/// Handle a delete event with poll-then-stub logic 459/// 460/// For deletes, we need to look up the original record to know what was deleted 461/// (e.g., which notebook a like was for). If the record doesn't exist yet 462/// (out-of-order events), we poll for up to 15 seconds before creating a stub tombstone. 463/// Minimal struct for delete lookups - just the fields we need to process the delete 464#[derive(Debug, Clone, clickhouse::Row, serde::Deserialize)] 465struct LookupRawRecord { 466 #[allow(dead_code)] 467 did: SmolStr, 468 #[allow(dead_code)] 469 collection: SmolStr, 470 #[allow(dead_code)] 471 rkey: SmolStr, 472 #[allow(dead_code)] 473 record: SmolStr, // JSON string of the original record 474} 475 476async fn handle_delete(client: &Client, record: ExtractedRecord) -> Result<()> { 477 let deadline = Instant::now() + Duration::from_secs(15); 478 479 loop { 480 // Try to find the record by CID 481 let query = format!( 482 r#" 483 SELECT did, collection, rkey, record 484 FROM raw_records 485 WHERE did = '{}' AND cid = '{}' 486 ORDER BY event_time DESC 487 LIMIT 1 488 "#, 489 record.did, record.cid 490 ); 491 492 let original: Option<LookupRawRecord> = client 493 .inner() 494 .query(&query) 495 .fetch_optional() 496 .await 497 .map_err(|e| crate::error::ClickHouseError::Query { 498 message: "delete lookup failed".into(), 499 source: e, 500 })?; 501 502 if let Some(_original) = original { 503 // Found the record - the main insert path already handles creating 504 // the delete row, so we're done. In phase 2, this is where we'd 505 // parse original.record and insert count deltas for denormalized tables. 506 debug!(did = %record.did, cid = %record.cid, "delete found original record"); 507 return Ok(()); 508 } 509 510 if Instant::now() > deadline { 511 // Gave up - create stub tombstone 512 // The record will be inserted via the main batch path with operation='delete' 513 // and empty record content, which serves as our stub tombstone 514 warn!( 515 did = %record.did, 516 cid = %record.cid, 517 "delete timeout, stub tombstone will be created" 518 ); 519 return Ok(()); 520 } 521 522 tokio::time::sleep(Duration::from_secs(1)).await; 523 } 524}