atproto blogging
use std::sync::Arc;
use std::time::{Duration, Instant};

use chrono::{DateTime, Utc};
use dashmap::DashMap;
use n0_future::StreamExt;
use smol_str::{SmolStr, ToSmolStr};
use tracing::{debug, info, warn};

use crate::clickhouse::{
    AccountRevState, Client, FirehoseCursor, InserterConfig, RawAccountEvent, RawIdentityEvent,
    RawRecordInsert, ResilientRecordInserter,
};
use crate::config::IndexerConfig;
use crate::error::{IndexError, Result};
use crate::firehose::{
    Account, ExtractedRecord, FirehoseConsumer, Identity, MessageStream, SubscribeReposMessage,
    extract_records,
};

/// Default consumer ID for cursor tracking
const CONSUMER_ID: &str = "main";

/// Per-account revision state for deduplication
#[derive(Debug, Clone)]
pub struct RevState {
    pub last_rev: SmolStr,
    pub last_cid: SmolStr,
}

/// In-memory cache of per-account revision state
///
/// Used for fast deduplication without hitting ClickHouse on every event.
/// Populated from the account_rev_state table on startup and updated as events are processed.
pub struct RevCache {
    inner: DashMap<SmolStr, RevState>,
}

impl RevCache {
    pub fn new() -> Self {
        Self {
            inner: DashMap::new(),
        }
    }

    /// Load the cache from the ClickHouse account_rev_state table
    pub async fn load_from_clickhouse(client: &Client) -> Result<Self> {
        let query = r#"
            SELECT
                did,
                argMaxMerge(last_rev) as last_rev,
                argMaxMerge(last_cid) as last_cid,
                maxMerge(last_seq) as last_seq,
                maxMerge(last_event_time) as last_event_time
            FROM account_rev_state
            GROUP BY did
        "#;

        let rows: Vec<AccountRevState> =
            client.inner().query(query).fetch_all().await.map_err(|e| {
                IndexError::ClickHouse(crate::error::ClickHouseError::Query {
                    message: "failed to load account rev state".into(),
                    source: e,
                })
            })?;

        let cache = Self::new();
        for row in rows {
            cache.inner.insert(
                SmolStr::new(&row.did),
                RevState {
                    last_rev: SmolStr::new(&row.last_rev),
                    last_cid: SmolStr::new(&row.last_cid),
                },
            );
        }

        info!(
            accounts = cache.inner.len(),
            "loaded rev cache from clickhouse"
        );
        Ok(cache)
    }

    /// Check if we should process this commit (returns false if already seen)
    pub fn should_process(&self, did: &str, rev: &str) -> bool {
        match self.inner.get(did) {
            // Revs are TIDs, which sort lexicographically in time order, so a
            // plain string comparison works here.
            Some(state) => rev > state.last_rev.as_str(),
            None => true, // new account, always process
        }
    }

    /// Update cache after processing a commit
    pub fn update(&self, did: &SmolStr, rev: &SmolStr, cid: &SmolStr) {
        self.inner.insert(
            did.clone(),
            RevState {
                last_rev: rev.clone(),
                last_cid: cid.clone(),
            },
        );
    }

    /// Get current cache size (number of accounts tracked)
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }
}

impl Default for RevCache {
    fn default() -> Self {
        Self::new()
    }
}
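
// A quick sketch of the dedup semantics, assuming revs are atproto TIDs,
// which are designed to sort lexicographically in time order. The DID, rev,
// and CID values below are illustrative.
#[cfg(test)]
mod rev_cache_tests {
    use super::*;

    #[test]
    fn skips_already_seen_revs() {
        let cache = RevCache::new();
        let did = SmolStr::new("did:plc:example");

        // Unknown account: always process.
        assert!(cache.should_process(did.as_str(), "3l5abcdefgh2a"));

        cache.update(
            &did,
            &SmolStr::new("3l5abcdefgh2a"),
            &SmolStr::new("bafyexamplecid"),
        );

        // The same rev replayed (e.g., after a cursor rewind) is skipped...
        assert!(!cache.should_process(did.as_str(), "3l5abcdefgh2a"));
        // ...while a strictly newer rev is processed.
        assert!(cache.should_process(did.as_str(), "3l5abcdefgh2b"));
    }
}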

/// Safety margin when resuming - back up this many sequence numbers
/// to ensure no gaps from incomplete batches or race conditions
const CURSOR_REWIND: i64 = 1000;

/// Load cursor from ClickHouse for resuming
///
/// Returns cursor with safety margin subtracted to ensure overlap
pub async fn load_cursor(client: &Client) -> Result<Option<i64>> {
    let query = format!(
        r#"
        SELECT consumer_id, seq, event_time
        FROM firehose_cursor FINAL
        WHERE consumer_id = '{}'
        LIMIT 1
        "#,
        CONSUMER_ID
    );

    let cursor: Option<FirehoseCursor> = client
        .inner()
        .query(&query)
        .fetch_optional()
        .await
        .map_err(|e| crate::error::ClickHouseError::Query {
            message: "failed to load cursor".into(),
            source: e,
        })?;

    if let Some(c) = &cursor {
        let resume_at = (c.seq as i64).saturating_sub(CURSOR_REWIND);
        info!(
            saved_seq = c.seq,
            resume_seq = resume_at,
            rewind = CURSOR_REWIND,
            "loaded cursor from clickhouse (with safety margin)"
        );
        Ok(Some(resume_at))
    } else {
        Ok(None)
    }
}
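
// Where the rewound cursor ends up (sketch): the relay's
// com.atproto.sync.subscribeRepos endpoint accepts the last-seen sequence
// number as a `cursor` query parameter and replays events from there.
// `FirehoseConsumer::connect` lives elsewhere in the crate; this helper and
// the relay host are illustrative only.
#[allow(dead_code)]
fn subscribe_url(cursor: Option<i64>) -> String {
    let base = "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos";
    match cursor {
        Some(seq) => format!("{base}?cursor={seq}"),
        None => base.to_string(),
    }
}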

/// Firehose indexer that consumes the AT Protocol firehose and writes to ClickHouse
pub struct FirehoseIndexer {
    client: Arc<Client>,
    consumer: FirehoseConsumer,
    rev_cache: RevCache,
    config: IndexerConfig,
}

impl FirehoseIndexer {
    /// Create a new firehose indexer
    pub async fn new(
        client: Client,
        consumer: FirehoseConsumer,
        config: IndexerConfig,
    ) -> Result<Self> {
        let client = Arc::new(client);

        // Load rev cache from ClickHouse
        let rev_cache = RevCache::load_from_clickhouse(&client).await?;

        Ok(Self {
            client,
            consumer,
            rev_cache,
            config,
        })
    }

    /// Save cursor to ClickHouse
    async fn save_cursor(&self, seq: u64, event_time: DateTime<Utc>) -> Result<()> {
        let query = format!(
            "INSERT INTO firehose_cursor (consumer_id, seq, event_time) VALUES ('{}', {}, {})",
            CONSUMER_ID,
            seq,
            event_time.timestamp_millis()
        );

        self.client.execute(&query).await?;
        debug!(seq, "saved cursor");
        Ok(())
    }

    /// Run the indexer loop
    pub async fn run(&self) -> Result<()> {
        info!("connecting to firehose...");
        let mut stream: MessageStream = self.consumer.connect().await?;

        // Inserters handle batching internally based on config
        // Use resilient inserter for records since that's where untrusted JSON enters
        let mut records =
            ResilientRecordInserter::new(self.client.inner().clone(), InserterConfig::default());
        let mut identities = self
            .client
            .inserter::<RawIdentityEvent>("raw_identity_events");
        let mut accounts = self
            .client
            .inserter::<RawAccountEvent>("raw_account_events");

        // Stats and cursor tracking
        let mut processed: u64 = 0;
        let mut skipped: u64 = 0;
        let mut last_seq: u64 = 0;
        let mut last_event_time = Utc::now();
        let mut last_stats = Instant::now();
        let mut last_cursor_save = Instant::now();

        info!("starting indexer loop");

        loop {
            // Get time until next required flush - must commit before socket timeout (30s)
            let records_time = records.time_left().unwrap_or(Duration::from_secs(10));
            let identities_time = identities.time_left().unwrap_or(Duration::from_secs(10));
            let accounts_time = accounts.time_left().unwrap_or(Duration::from_secs(10));
            let time_left = records_time.min(identities_time).min(accounts_time);

            let result = match tokio::time::timeout(time_left, stream.next()).await {
                Ok(Some(result)) => result,
                Ok(None) => {
                    // Stream ended
                    break;
                }
                Err(_) => {
                    // Timeout - flush inserters to keep INSERT alive
                    debug!("flush timeout, committing inserters");
                    records.commit().await?;
                    identities.commit().await.map_err(|e| {
                        crate::error::ClickHouseError::Query {
                            message: "periodic identities commit failed".into(),
                            source: e,
                        }
                    })?;
                    accounts
                        .commit()
                        .await
                        .map_err(|e| crate::error::ClickHouseError::Query {
                            message: "periodic accounts commit failed".into(),
                            source: e,
                        })?;
                    continue;
                }
            };

            let msg = match result {
                Ok(msg) => msg,
                Err(e) => {
                    warn!(error = ?e, "firehose stream error");
                    continue;
                }
            };

            // Track seq from any message type that has it
            match &msg {
                SubscribeReposMessage::Commit(c) => {
                    last_seq = c.seq as u64;
                    last_event_time = c.time.as_ref().with_timezone(&Utc);
                }
                SubscribeReposMessage::Identity(i) => {
                    last_seq = i.seq as u64;
                    last_event_time = i.time.as_ref().with_timezone(&Utc);
                }
                SubscribeReposMessage::Account(a) => {
                    last_seq = a.seq as u64;
                    last_event_time = a.time.as_ref().with_timezone(&Utc);
                }
                _ => {}
            }

            match msg {
                SubscribeReposMessage::Commit(commit) => {
                    let did = commit.repo.as_ref();
                    let rev = commit.rev.as_ref();

                    // Dedup check
                    if !self.rev_cache.should_process(did, rev) {
                        skipped += 1;
                        continue;
                    }

                    // Extract and write records
                    for record in extract_records(&commit).await? {
                        // Collection filter - skip early before JSON conversion
                        if !self.config.collections.matches(&record.collection) {
                            continue;
                        }

                        let json = record.to_json()?.unwrap_or_else(|| "{}".to_string());

                        // Fire-and-forget delete handling
                        if record.operation == "delete" {
                            let client = self.client.clone();
                            let record_clone = record.clone();
                            tokio::spawn(async move {
                                if let Err(e) = handle_delete(&client, record_clone).await {
                                    warn!(error = ?e, "delete handling failed");
                                }
                            });
                        }

                        records
                            .write(RawRecordInsert {
                                did: record.did.clone(),
                                collection: record.collection.clone(),
                                rkey: record.rkey.clone(),
                                cid: record.cid.clone(),
                                rev: record.rev.clone(),
                                record: json.to_smolstr(),
                                operation: record.operation.clone(),
                                seq: record.seq as u64,
                                event_time: record.event_time,
                                is_live: true,
                                validation_state: SmolStr::new_static("unchecked"),
                            })
                            .await?;
                    }

                    // Update rev cache
                    self.rev_cache.update(
                        &SmolStr::new(did),
                        &SmolStr::new(rev),
                        &commit.commit.0.to_smolstr(),
                    );

                    processed += 1;
                }
                SubscribeReposMessage::Identity(identity) => {
                    write_identity(&identity, &mut identities).await?;
                }
                SubscribeReposMessage::Account(account) => {
                    write_account(&account, &mut accounts).await?;
                }
                SubscribeReposMessage::Sync(_) => {
                    debug!("received sync (tooBig) event, skipping");
                }
                _ => {}
            }

            // commit() flushes if internal thresholds met, otherwise no-op
            records.commit().await?;

            // Periodic stats logging (every 10s); cursor saves happen below
            if last_stats.elapsed() >= Duration::from_secs(10) {
                info!(
                    processed,
                    skipped,
                    last_seq,
                    rev_cache_size = self.rev_cache.len(),
                    "indexer stats"
                );
                last_stats = Instant::now();
            }

            // Save cursor every 30s
            if last_cursor_save.elapsed() >= Duration::from_secs(30) && last_seq > 0 {
                if let Err(e) = self.save_cursor(last_seq, last_event_time).await {
                    warn!(error = ?e, "failed to save cursor");
                }
                last_cursor_save = Instant::now();
            }
        }

        // Final flush
        records.end().await?;
        identities
            .end()
            .await
            .map_err(|e| crate::error::ClickHouseError::Query {
                message: "final identities flush failed".into(),
                source: e,
            })?;
        accounts
            .end()
            .await
            .map_err(|e| crate::error::ClickHouseError::Query {
                message: "final accounts flush failed".into(),
                source: e,
            })?;

        // Final cursor save
        if last_seq > 0 {
            self.save_cursor(last_seq, last_event_time).await?;
        }

        info!(last_seq, "firehose stream ended");
        Ok(())
    }
}

async fn write_identity(
    identity: &Identity<'_>,
    inserter: &mut clickhouse::inserter::Inserter<RawIdentityEvent>,
) -> Result<()> {
    inserter
        .write(&RawIdentityEvent {
            did: identity.did.to_smolstr(),
            handle: identity
                .handle
                .as_ref()
                .map(|h| h.as_ref().to_smolstr())
                .unwrap_or_default(),
            seq: identity.seq as u64,
            event_time: identity.time.as_ref().with_timezone(&Utc),
        })
        .await
        .map_err(|e| crate::error::ClickHouseError::Query {
            message: "identity write failed".into(),
            source: e,
        })?;
    Ok(())
}

async fn write_account(
    account: &Account<'_>,
    inserter: &mut clickhouse::inserter::Inserter<RawAccountEvent>,
) -> Result<()> {
    inserter
        .write(&RawAccountEvent {
            did: account.did.to_smolstr(),
            active: if account.active { 1 } else { 0 },
            status: account
                .status
                .as_ref()
                .map(|s| s.as_ref().to_smolstr())
                .unwrap_or_default(),
            seq: account.seq as u64,
            event_time: account.time.as_ref().with_timezone(&Utc),
        })
        .await
        .map_err(|e| crate::error::ClickHouseError::Query {
            message: "account write failed".into(),
            source: e,
        })?;
    Ok(())
}

/// Minimal struct for delete lookups - just the fields we need to process the delete
#[derive(Debug, Clone, clickhouse::Row, serde::Deserialize)]
struct LookupRawRecord {
    #[allow(dead_code)]
    did: SmolStr,
    #[allow(dead_code)]
    collection: SmolStr,
    #[allow(dead_code)]
    rkey: SmolStr,
    #[allow(dead_code)]
    record: SmolStr, // JSON string of the original record
}

/// Handle a delete event with poll-then-stub logic
///
/// For deletes, we need to look up the original record to know what was deleted
/// (e.g., which notebook a like was for). If the record doesn't exist yet
/// (out-of-order events), we poll for up to 15 seconds before creating a stub tombstone.
async fn handle_delete(client: &Client, record: ExtractedRecord) -> Result<()> {
    let deadline = Instant::now() + Duration::from_secs(15);

    loop {
        // Try to find the record by CID. Bind the DID and CID rather than
        // interpolating them into the SQL - both values arrive from the network.
        let original: Option<LookupRawRecord> = client
            .inner()
            .query(
                r#"
                SELECT did, collection, rkey, record
                FROM raw_records
                WHERE did = ? AND cid = ?
                ORDER BY event_time DESC
                LIMIT 1
                "#,
            )
            .bind(record.did.as_str())
            .bind(record.cid.as_str())
            .fetch_optional()
            .await
            .map_err(|e| crate::error::ClickHouseError::Query {
                message: "delete lookup failed".into(),
                source: e,
            })?;

        if let Some(_original) = original {
            // Found the record - the main insert path already handles creating
            // the delete row, so we're done. In phase 2, this is where we'd
            // parse original.record and insert count deltas for denormalized tables.
            debug!(did = %record.did, cid = %record.cid, "delete found original record");
            return Ok(());
        }

        if Instant::now() > deadline {
            // Gave up - create a stub tombstone.
            // The record will be inserted via the main batch path with operation='delete'
            // and empty record content, which serves as our stub tombstone.
            warn!(
                did = %record.did,
                cid = %record.cid,
                "delete timeout, stub tombstone will be created"
            );
            return Ok(());
        }

        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}
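
// A minimal sketch of driving the indexer end to end, assuming the ClickHouse
// `Client` and `FirehoseConsumer` have already been constructed elsewhere in
// the crate (their constructors aren't part of this file):
#[allow(dead_code)]
pub async fn run_indexer(
    client: Client,
    consumer: FirehoseConsumer,
    config: IndexerConfig,
) -> Result<()> {
    let indexer = FirehoseIndexer::new(client, consumer, config).await?;
    indexer.run().await
}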