atproto blogging
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};

use chrono::Utc;
use smol_str::{SmolStr, ToSmolStr};
use tokio::task::JoinHandle;
use tracing::{debug, error, info, trace, warn};

use crate::clickhouse::{
    Client, InserterConfig, Migrator, RawIdentityEvent, RawRecordInsert, ResilientRecordInserter,
};
use crate::config::{IndexerConfig, TapConfig};
use crate::error::{ClickHouseError, Result};
use crate::tap::{
    RecordAction, TapConfig as TapConsumerConfig, TapConsumer, TapEvent, TapRecordEvent,
};

/// Tap indexer with multiple parallel websocket connections
///
/// Each worker maintains its own websocket connection to Tap and its own
/// ClickHouse inserter. Tap distributes events across connected clients,
/// and its ack-gating mechanism ensures per-DID ordering is preserved
/// regardless of which worker handles which events.
pub struct TapIndexer {
    client: Arc<Client>,
    tap_config: TapConfig,
    inserter_config: InserterConfig,
    config: Arc<IndexerConfig>,
    num_workers: usize,
    /// Tracks whether backfill has been triggered (first live event seen)
    backfill_triggered: Arc<AtomicBool>,
}

impl TapIndexer {
    pub fn new(
        client: Client,
        tap_config: TapConfig,
        inserter_config: InserterConfig,
        config: IndexerConfig,
        num_workers: usize,
    ) -> Self {
        Self {
            client: Arc::new(client),
            tap_config,
            inserter_config,
            config: Arc::new(config),
            num_workers,
            backfill_triggered: Arc::new(AtomicBool::new(false)),
        }
    }

    pub async fn run(&self) -> Result<()> {
        info!(
            num_workers = self.num_workers,
            url = %self.tap_config.url,
            "starting parallel tap indexer"
        );

        let mut handles: Vec<JoinHandle<Result<()>>> = Vec::with_capacity(self.num_workers);

        for worker_id in 0..self.num_workers {
            let client = self.client.clone();
            let tap_config = self.tap_config.clone();
            let inserter_config = self.inserter_config.clone();
            let config = self.config.clone();
            let backfill_triggered = self.backfill_triggered.clone();

            let handle = tokio::spawn(async move {
                run_tap_worker(
                    worker_id,
                    client,
                    tap_config,
                    inserter_config,
                    config,
                    backfill_triggered,
                )
                .await
            });

            handles.push(handle);
        }

        // Wait for all workers
        // TODO: Implement proper supervision - restart failed workers instead of propagating
        for (i, handle) in handles.into_iter().enumerate() {
            match handle.await {
                Ok(Ok(())) => {
                    info!(worker_id = i, "tap worker finished cleanly");
                }
                Ok(Err(e)) => {
                    error!(worker_id = i, error = ?e, "tap worker failed");
                    return Err(e);
                }
                Err(e) => {
                    error!(worker_id = i, error = ?e, "tap worker panicked");
                    return Err(crate::error::FirehoseError::Stream {
                        message: format!("worker {} panicked: {}", i, e),
                    }
                    .into());
                }
            }
        }

        Ok(())
    }
}
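
/// Illustrative wiring only (not called from this module): one way the indexer
/// above might be constructed and started. Where the config values come from is
/// an assumption, and four workers is just an example figure.
#[allow(dead_code)]
async fn example_run_indexer(
    client: Client,
    tap_config: TapConfig,
    inserter_config: InserterConfig,
    indexer_config: IndexerConfig,
) -> Result<()> {
    // One websocket connection and one ClickHouse inserter per worker; Tap's
    // ack gating keeps per-DID ordering intact across all of them.
    let indexer = TapIndexer::new(client, tap_config, inserter_config, indexer_config, 4);
    indexer.run().await
}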

async fn run_tap_worker(
    worker_id: usize,
    client: Arc<Client>,
    tap_config: TapConfig,
    inserter_config: InserterConfig,
    config: Arc<IndexerConfig>,
    backfill_triggered: Arc<AtomicBool>,
) -> Result<()> {
    info!(worker_id, url = %tap_config.url, "tap worker starting");

    let consumer_config =
        TapConsumerConfig::new(tap_config.url.clone()).with_acks(tap_config.send_acks);
    let consumer = TapConsumer::new(consumer_config);

    let (mut events, ack_tx) = consumer.connect().await?;

    // Each worker has its own resilient inserter
    let mut records = ResilientRecordInserter::new(client.inner().clone(), inserter_config);
    let mut identities = client.inserter::<RawIdentityEvent>("raw_identity_events");

    let mut processed: u64 = 0;
    let mut last_stats = Instant::now();

    info!(worker_id, "tap worker connected, starting event loop");

    loop {
        // Get time until next required flush
        let records_time = records.time_left().unwrap_or(Duration::from_secs(10));
        let identities_time = identities.time_left().unwrap_or(Duration::from_secs(10));
        let time_left = records_time.min(identities_time);

        let event = match tokio::time::timeout(time_left, events.recv()).await {
            Ok(Some(event)) => event,
            Ok(None) => {
                info!(worker_id, "tap channel closed, exiting");
                break;
            }
            Err(_) => {
                // Timeout - flush inserters
                trace!(worker_id, "flush timeout, committing inserters");
                records.commit().await?;
                identities
                    .commit()
                    .await
                    .map_err(|e| ClickHouseError::Query {
                        message: "periodic identities commit failed".into(),
                        source: e,
                    })?;
                continue;
            }
        };

        let event_id = event.id();

        match event {
            TapEvent::Record(envelope) => {
                let record = &envelope.record;

                // Collection filter
                if !config.collections.matches(&record.collection) {
                    let _ = ack_tx.send(event_id).await;
                    continue;
                }

                // Serialize record
                let json = match &record.record {
                    Some(v) => match serde_json::to_string(v) {
                        Ok(s) => s,
                        Err(e) => {
                            warn!(
                                worker_id,
                                did = %record.did,
                                collection = %record.collection,
                                rkey = %record.rkey,
                                error = ?e,
                                "failed to serialize record, sending to DLQ"
                            );
                            let raw_data = format!(
                                r#"{{"did":"{}","collection":"{}","rkey":"{}","cid":"{}","error":"serialization_failed"}}"#,
                                record.did,
                                record.collection,
                                record.rkey,
                                record
                                    .cid
                                    .as_ref()
                                    .unwrap_or(&SmolStr::new_static("no cid"))
                            );
                            records
                                .write_raw_to_dlq(
                                    record.action.as_str().to_smolstr(),
                                    raw_data,
                                    e.to_string(),
                                    event_id,
                                )
                                .await?;
                            let _ = ack_tx.send(event_id).await;
                            continue;
                        }
                    },
                    None => "{}".to_string(),
                };

                debug!(
                    worker_id,
                    op = record.action.as_str(),
                    id = event_id,
                    len = json.len(),
                    "writing record"
                );

                if record.action == RecordAction::Delete {
                    let client = client.clone();
                    let record_clone = record.clone();
                    tokio::spawn(async move {
                        if let Err(e) = handle_delete(&client, record_clone).await {
                            warn!(error = ?e, "delete handling failed");
                        }
                    });
                }

                records
                    .write(RawRecordInsert {
                        did: record.did.clone(),
                        collection: record.collection.clone(),
                        rkey: record.rkey.clone(),
                        cid: record.cid.clone().unwrap_or_default(),
                        rev: record.rev.clone(),
                        record: json.to_smolstr(),
                        operation: record.action.as_str().to_smolstr(),
                        seq: event_id,
                        event_time: Utc::now(),
                        is_live: record.live,
                        // Records from Tap are pre-validated
                        validation_state: SmolStr::new_static("valid"),
                    })
                    .await?;
                records.commit().await?;

                // Ack after successful processing
                let _ = ack_tx.send(event_id).await;

                processed += 1;

                // Trigger backfill on first live event
                // compare_exchange ensures only one worker triggers this
                if record.live
                    && backfill_triggered
                        .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
                        .is_ok()
                {
                    info!(worker_id, "first live event received, scheduling backfill");
                    let backfill_client = client.clone();
                    tokio::spawn(async move {
                        run_backfill(backfill_client).await;
                    });
                }
            }
            TapEvent::Identity(envelope) => {
                let identity = &envelope.identity;

                identities
                    .write(&RawIdentityEvent {
                        did: identity.did.clone(),
                        handle: identity.handle.clone(),
                        seq: event_id,
                        event_time: Utc::now(),
                    })
                    .await
                    .map_err(|e| ClickHouseError::Query {
                        message: "identity write failed".into(),
                        source: e,
                    })?;
                identities
                    .commit()
                    .await
                    .map_err(|e| ClickHouseError::Query {
                        message: "identity commit failed".into(),
                        source: e,
                    })?;

                let _ = ack_tx.send(event_id).await;
            }
        }

        // Periodic stats
        if last_stats.elapsed() > Duration::from_secs(30) {
            info!(worker_id, processed, "tap worker stats");
            last_stats = Instant::now();
        }
    }

    // Clean shutdown
    records.end().await?;
    identities.end().await.map_err(|e| ClickHouseError::Query {
        message: "identities end failed".into(),
        source: e,
    })?;

    info!(worker_id, processed, "tap worker finished");
    Ok(())
}
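
/// A minimal sketch, isolated from the loop above, of the flush pattern it
/// uses: bound the channel wait by the inserter's next deadline so buffered
/// rows are still committed when the stream goes quiet. Assumes the event
/// channel is a tokio mpsc receiver and that `client.inserter()` hands back
/// the clickhouse crate's `Inserter`.
#[allow(dead_code)]
async fn recv_or_flush(
    identities: &mut clickhouse::inserter::Inserter<RawIdentityEvent>,
    events: &mut tokio::sync::mpsc::Receiver<TapEvent>,
) -> Result<Option<TapEvent>> {
    let wait = identities.time_left().unwrap_or(Duration::from_secs(10));
    match tokio::time::timeout(wait, events.recv()).await {
        // An event (or a closed channel) arrived before the flush deadline.
        Ok(event) => Ok(event),
        // Deadline hit with nothing to read: commit whatever is buffered.
        Err(_) => {
            identities
                .commit()
                .await
                .map_err(|e| ClickHouseError::Query {
                    message: "periodic commit failed".into(),
                    source: e,
                })?;
            Ok(None)
        }
    }
}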

/// Run backfill queries for incremental MVs
///
/// Called once when the first live event is received, indicating the
/// historical data load is complete. Waits for a settling period so in-flight
/// inserts can land, then runs INSERT queries to populate the target tables
/// of the incremental MVs.
async fn run_backfill(client: Arc<Client>) {
    // Wait for in-flight inserts to settle
    info!("backfill: waiting 100s for in-flight inserts to settle");
    tokio::time::sleep(Duration::from_secs(100)).await;

    let mvs = Migrator::incremental_mvs();
    if mvs.is_empty() {
        info!("backfill: no incremental MVs found, nothing to do");
        return;
    }

    info!(
        count = mvs.len(),
        "backfill: starting incremental MV backfill"
    );

    for mv in mvs {
        info!(
            mv = %mv.name,
            table = %mv.target_table,
            "backfill: running backfill query"
        );

        let query = mv.backfill_query();
        debug!(query = %query, "backfill query");

        match client.execute(&query).await {
            Ok(()) => {
                info!(mv = %mv.name, "backfill: completed successfully");
            }
            Err(e) => {
                error!(mv = %mv.name, error = ?e, "backfill: query failed");
            }
        }
    }

    info!("backfill: all incremental MVs processed");
}
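
/// For orientation, a query returned by `mv.backfill_query()` is assumed to be
/// a plain INSERT ... SELECT from the raw table into the MV's target table,
/// along the lines of the hypothetical statement below (the table and column
/// names here are illustrative, not taken from the real migrations).
#[allow(dead_code)]
const EXAMPLE_BACKFILL_QUERY: &str = r#"
INSERT INTO posts_per_day
SELECT toDate(event_time) AS day, count() AS posts
FROM raw_records
WHERE collection = 'app.bsky.feed.post' AND operation = 'create'
GROUP BY day
"#;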

#[derive(Debug, Clone, clickhouse::Row, serde::Deserialize)]
struct LookupRawRecord {
    #[allow(dead_code)]
    did: SmolStr,
    #[allow(dead_code)]
    collection: SmolStr,
    #[allow(dead_code)]
    cid: SmolStr,
    #[allow(dead_code)]
    record: SmolStr, // JSON string of the original record
}

async fn handle_delete(client: &Client, record: TapRecordEvent) -> Result<()> {
    let deadline = Instant::now() + Duration::from_secs(15);

    loop {
        // Try to find the record by rkey
        let query = format!(
            r#"
            SELECT did, collection, cid, record
            FROM raw_records
            WHERE did = '{}' AND rkey = '{}'
            ORDER BY event_time DESC
            LIMIT 1
            "#,
            record.did, record.rkey
        );

        let original: Option<LookupRawRecord> = client
            .inner()
            .query(&query)
            .fetch_optional()
            .await
            .map_err(|e| crate::error::ClickHouseError::Query {
                message: "delete lookup failed".into(),
                source: e,
            })?;

        if let Some(original) = original {
            // Found the record - the main insert path already handles creating
            // the delete row, so we're done. In phase 2, this is where we'd
            // parse original.record and insert count deltas for denormalized tables.
            debug!(did = %record.did, cid = %original.cid, "delete found original record");
            return Ok(());
        }

        if Instant::now() > deadline {
            // Gave up - create stub tombstone
            // The record will be inserted via the main batch path with operation='delete'
            // and empty record content, which serves as our stub tombstone
            warn!(
                did = %record.did,
                rkey = %record.rkey,
                "delete timeout, original record not found; stub tombstone will be created"
            );
            return Ok(());
        }

        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}
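
/// A sketch of the "phase 2" idea noted in `handle_delete`: parse the original
/// record and write a count delta for a denormalized table. The table name,
/// columns, and like-record shape below are hypothetical examples, not part of
/// the current schema.
#[allow(dead_code)]
async fn apply_delete_delta_sketch(client: &Client, original: &LookupRawRecord) {
    // Only like deletes carry a subject we could decrement a counter for here.
    if original.collection != "app.bsky.feed.like" {
        return;
    }

    // The `record` column holds the original JSON; pull the like subject out of it.
    let value: serde_json::Value = match serde_json::from_str(&original.record) {
        Ok(v) => v,
        Err(e) => {
            warn!(error = ?e, "delta sketch: original record is not valid JSON");
            return;
        }
    };

    if let Some(subject) = value["subject"]["uri"].as_str() {
        let query = format!(
            "INSERT INTO like_count_deltas (subject_uri, delta) VALUES ('{}', -1)",
            subject.replace('\'', "\\'")
        );
        if let Err(e) = client.execute(&query).await {
            warn!(error = ?e, "delta sketch: count delta insert failed");
        }
    }
}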