···
use dashmap::DashMap;
use n0_future::StreamExt;
use smol_str::{SmolStr, ToSmolStr};
-use tracing::{debug, info, trace, warn};
+use tracing::{debug, info, warn};

use chrono::DateTime;

···
    RawRecordInsert, ResilientRecordInserter,
};
use crate::config::IndexerConfig;
-use crate::config::TapConfig;
-use crate::error::{ClickHouseError, IndexError, Result};
+use crate::error::{IndexError, Result};
use crate::firehose::{
    Account, ExtractedRecord, FirehoseConsumer, Identity, MessageStream, SubscribeReposMessage,
    extract_records,
};
-use crate::tap::{TapConfig as TapConsumerConfig, TapConsumer, TapEvent};

/// Default consumer ID for cursor tracking
const CONSUMER_ID: &str = "main";
···
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}
-
-// ============================================================================
-// TapIndexer - consumes from tap websocket
-// ============================================================================
-
-/// Consumer ID for tap cursor tracking
-const TAP_CONSUMER_ID: &str = "tap";
-
-/// Tap indexer that consumes from tap websocket and writes to ClickHouse
-pub struct TapIndexer {
-    client: Arc<Client>,
-    tap_config: TapConfig,
-    config: IndexerConfig,
-}
-
-impl TapIndexer {
-    /// Create a new tap indexer
-    pub fn new(client: Client, tap_config: TapConfig, config: IndexerConfig) -> Self {
-        Self {
-            client: Arc::new(client),
-            tap_config,
-            config,
-        }
-    }
-
-    /// Save tap cursor to ClickHouse for visibility
-    async fn save_cursor(&self, seq: u64) -> Result<()> {
-        let query = format!(
-            "INSERT INTO firehose_cursor (consumer_id, seq, event_time) VALUES ('{}', {}, now64(3))",
-            TAP_CONSUMER_ID, seq
-        );
-
-        self.client.execute(&query).await?;
-        debug!(seq, "saved tap cursor");
-        Ok(())
-    }
-
-    /// Run the tap indexer loop
-    pub async fn run(&self) -> Result<()> {
-        info!(url = %self.tap_config.url, "connecting to tap...");
-
-        let consumer_config = TapConsumerConfig::new(self.tap_config.url.clone())
-            .with_acks(self.tap_config.send_acks);
-        let consumer = TapConsumer::new(consumer_config);
-
-        let (mut events, ack_tx) = consumer.connect().await?;
-
-        // Use resilient inserter for records since that's where untrusted JSON enters
-        let mut records =
-            ResilientRecordInserter::new(self.client.inner().clone(), InserterConfig::default());
-        let mut identities = self
-            .client
-            .inserter::<RawIdentityEvent>("raw_identity_events");
-
-        let mut processed: u64 = 0;
-        let mut last_seq: u64 = 0;
-        let mut last_stats = Instant::now();
-        let mut last_cursor_save = Instant::now();
-
-        info!("starting tap indexer loop");
-
-        loop {
-            // Get time until next required flush - must commit before socket timeout (30s)
-            let records_time = records.time_left().unwrap_or(Duration::from_secs(10));
-            let identities_time = identities.time_left().unwrap_or(Duration::from_secs(10));
-            let time_left = records_time.min(identities_time);
-
-            let event = match tokio::time::timeout(time_left, events.recv()).await {
-                Ok(Some(event)) => event,
-                Ok(None) => {
-                    // Channel closed, exit loop
-                    break;
-                }
-                Err(_) => {
-                    // Timeout - flush inserters to keep INSERT alive
-                    trace!("flush timeout, committing inserters");
-                    records.commit().await?;
-                    identities
-                        .commit()
-                        .await
-                        .map_err(|e| ClickHouseError::Query {
-                            message: "periodic identities commit failed".into(),
-                            source: e,
-                        })?;
-                    continue;
-                }
-            };
-
-            let event_id = event.id();
-            last_seq = event_id;
-
-            match event {
-                TapEvent::Record(envelope) => {
-                    let record = &envelope.record;
-
-                    // Collection filter
-                    if !self.config.collections.matches(&record.collection) {
-                        // Still ack even if filtered
-                        let _ = ack_tx.send(event_id).await;
-                        continue;
-                    }
-
-                    let json = match &record.record {
-                        Some(v) => match serde_json::to_string(v) {
-                            Ok(s) => s,
-                            Err(e) => {
-                                warn!(
-                                    did = %record.did,
-                                    collection = %record.collection,
-                                    rkey = %record.rkey,
-                                    error = ?e,
-                                    "failed to serialize record, sending to DLQ"
-                                );
-                                let raw_data = format!(
-                                    r#"{{"did":"{}","collection":"{}","rkey":"{}","cid":"{}","error":"serialization_failed"}}"#,
-                                    record.did, record.collection, record.rkey, record.cid
-                                );
-                                records
-                                    .write_raw_to_dlq(
-                                        record.action.as_str().to_smolstr(),
-                                        raw_data,
-                                        e.to_string(),
-                                        event_id,
-                                    )
-                                    .await?;
-                                let _ = ack_tx.send(event_id).await;
-                                continue;
-                            }
-                        },
-                        None => "{}".to_string(),
-                    };
-
-                    debug!(
-                        op = record.action.as_str(),
-                        id = event_id,
-                        len = json.len(),
-                        "writing record"
-                    );
-
-                    records
-                        .write(RawRecordInsert {
-                            did: record.did.clone(),
-                            collection: record.collection.clone(),
-                            rkey: record.rkey.clone(),
-                            cid: record.cid.clone(),
-                            rev: record.rev.clone(),
-                            record: json.to_smolstr(),
-                            operation: record.action.as_str().to_smolstr(),
-                            seq: event_id,
-                            event_time: Utc::now(),
-                            is_live: record.live,
-                        })
-                        .await?;
-                    records.commit().await?;
-
-                    processed += 1;
-                }
-                TapEvent::Identity(envelope) => {
-                    let identity = &envelope.identity;
-
-                    identities
-                        .write(&RawIdentityEvent {
-                            did: identity.did.clone(),
-                            handle: identity.handle.clone(),
-                            seq: event_id,
-                            event_time: Utc::now(),
-                        })
-                        .await
-                        .map_err(|e| ClickHouseError::Query {
-                            message: "identity write failed".into(),
-                            source: e,
-                        })?;
-                    identities
-                        .commit()
-                        .await
-                        .map_err(|e| ClickHouseError::Query {
-                            message: "identity commit failed".into(),
-                            source: e,
-                        })?;
-                }
-            }
-
-            // Send ack after successful write+commit
-            let _ = ack_tx.send(event_id).await;
-
-            // Periodic stats
-            if last_stats.elapsed() >= Duration::from_secs(10) {
-                info!(processed, last_seq, "tap indexer stats");
-                last_stats = Instant::now();
-            }
-
-            // Save cursor every 30s for visibility
-            if last_cursor_save.elapsed() >= Duration::from_secs(30) && last_seq > 0 {
-                if let Err(e) = self.save_cursor(last_seq).await {
-                    warn!(error = ?e, "failed to save tap cursor");
-                }
-                last_cursor_save = Instant::now();
-            }
-        }
-
-        // Final flush
-        records.end().await?;
-        identities.end().await.map_err(|e| ClickHouseError::Query {
-            message: "final identities flush failed".into(),
-            source: e,
-        })?;
-
-        // Final cursor save
-        if last_seq > 0 {
-            self.save_cursor(last_seq).await?;
-        }
-
-        info!(last_seq, "tap stream ended");
-        Ok(())
-    }
-}
+3 -1
crates/weaver-index/src/lib.rs
···
pub mod error;
pub mod firehose;
pub mod indexer;
+pub mod parallel_tap;
pub mod server;
pub mod sqlite;
pub mod tap;

pub use config::Config;
pub use error::{IndexError, Result};
-pub use indexer::{FirehoseIndexer, TapIndexer, load_cursor};
+pub use indexer::{FirehoseIndexer, load_cursor};
+pub use parallel_tap::TapIndexer;
pub use server::{AppState, ServerConfig};
pub use sqlite::{ShardKey, ShardRouter, SqliteShard};
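
Note: the re-export keeps the public path `weaver_index::TapIndexer` stable, but the constructor now also takes an `InserterConfig` and a worker count. A minimal sketch of an updated call site; only the `TapIndexer::new` signature and `run()` come from this diff, the `cfg.*` accessors are hypothetical:

    use weaver_index::TapIndexer;

    // Sketch only: `client` is a crate::clickhouse::Client, `cfg` is an
    // illustrative application config, not part of this diff.
    let indexer = TapIndexer::new(
        client,               // ClickHouse client, wrapped in Arc internally
        cfg.tap.clone(),      // TapConfig: tap websocket URL + send_acks
        cfg.inserter.clone(), // InserterConfig shared by every worker
        cfg.indexer.clone(),  // IndexerConfig: collection filters
        cfg.tap_workers,      // number of parallel websocket connections
    );
    indexer.run().await?;
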
+261
crates/weaver-index/src/parallel_tap.rs
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+use chrono::Utc;
+use smol_str::ToSmolStr;
+use tokio::task::JoinHandle;
+use tracing::{debug, error, info, trace, warn};
+
+use crate::clickhouse::{
+    Client, InserterConfig, RawIdentityEvent, RawRecordInsert, ResilientRecordInserter,
+};
+use crate::config::{IndexerConfig, TapConfig};
+use crate::error::{ClickHouseError, Result};
+use crate::tap::{TapConfig as TapConsumerConfig, TapConsumer, TapEvent};
+
+/// TAP indexer with multiple parallel websocket connections
+///
+/// Each worker maintains its own websocket connection to TAP and its own
+/// ClickHouse inserter. TAP distributes events across connected clients,
+/// and its ack-gating mechanism ensures per-DID ordering is preserved
+/// regardless of which worker handles which events.
+pub struct TapIndexer {
+    client: Arc<Client>,
+    tap_config: TapConfig,
+    inserter_config: InserterConfig,
+    config: Arc<IndexerConfig>,
+    num_workers: usize,
+}
+
+impl TapIndexer {
+    pub fn new(
+        client: Client,
+        tap_config: TapConfig,
+        inserter_config: InserterConfig,
+        config: IndexerConfig,
+        num_workers: usize,
+    ) -> Self {
+        Self {
+            client: Arc::new(client),
+            tap_config,
+            inserter_config,
+            config: Arc::new(config),
+            num_workers,
+        }
+    }
+
+    pub async fn run(&self) -> Result<()> {
+        info!(
+            num_workers = self.num_workers,
+            url = %self.tap_config.url,
+            "starting parallel tap indexer"
+        );
+
+        let mut handles: Vec<JoinHandle<Result<()>>> = Vec::with_capacity(self.num_workers);
+
+        for worker_id in 0..self.num_workers {
+            let client = self.client.clone();
+            let tap_config = self.tap_config.clone();
+            let inserter_config = self.inserter_config.clone();
+            let config = self.config.clone();
+
+            let handle = tokio::spawn(async move {
+                run_tap_worker(worker_id, client, tap_config, inserter_config, config).await
+            });
+
+            handles.push(handle);
+        }
+
+        // Wait for all workers
+        // TODO: Implement proper supervision - restart failed workers instead of propagating
+        for (i, handle) in handles.into_iter().enumerate() {
+            match handle.await {
+                Ok(Ok(())) => {
+                    info!(worker_id = i, "tap worker finished cleanly");
+                }
+                Ok(Err(e)) => {
+                    error!(worker_id = i, error = ?e, "tap worker failed");
+                    return Err(e);
+                }
+                Err(e) => {
+                    error!(worker_id = i, error = ?e, "tap worker panicked");
+                    return Err(crate::error::FirehoseError::Stream {
+                        message: format!("worker {} panicked: {}", i, e),
+                    }
+                    .into());
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
+
+async fn run_tap_worker(
+    worker_id: usize,
+    client: Arc<Client>,
+    tap_config: TapConfig,
+    inserter_config: InserterConfig,
+    config: Arc<IndexerConfig>,
+) -> Result<()> {
+    info!(worker_id, url = %tap_config.url, "tap worker starting");
+
+    let consumer_config =
+        TapConsumerConfig::new(tap_config.url.clone()).with_acks(tap_config.send_acks);
+    let consumer = TapConsumer::new(consumer_config);
+
+    let (mut events, ack_tx) = consumer.connect().await?;
+
+    // Each worker has its own resilient inserter
+    let mut records = ResilientRecordInserter::new(client.inner().clone(), inserter_config);
+    let mut identities = client.inserter::<RawIdentityEvent>("raw_identity_events");
+
+    let mut processed: u64 = 0;
+    let mut last_stats = Instant::now();
+
+    info!(worker_id, "tap worker connected, starting event loop");
+
+    loop {
+        // Get time until next required flush
+        let records_time = records.time_left().unwrap_or(Duration::from_secs(10));
+        let identities_time = identities.time_left().unwrap_or(Duration::from_secs(10));
+        let time_left = records_time.min(identities_time);
+
+        let event = match tokio::time::timeout(time_left, events.recv()).await {
+            Ok(Some(event)) => event,
+            Ok(None) => {
+                info!(worker_id, "tap channel closed, exiting");
+                break;
+            }
+            Err(_) => {
+                // Timeout - flush inserters
+                trace!(worker_id, "flush timeout, committing inserters");
+                records.commit().await?;
+                identities
+                    .commit()
+                    .await
+                    .map_err(|e| ClickHouseError::Query {
+                        message: "periodic identities commit failed".into(),
+                        source: e,
+                    })?;
+                continue;
+            }
+        };
+
+        let event_id = event.id();
+
+        match event {
+            TapEvent::Record(envelope) => {
+                let record = &envelope.record;
+
+                // Collection filter
+                if !config.collections.matches(&record.collection) {
+                    let _ = ack_tx.send(event_id).await;
+                    continue;
+                }
+
+                // Serialize record
+                let json = match &record.record {
+                    Some(v) => match serde_json::to_string(v) {
+                        Ok(s) => s,
+                        Err(e) => {
+                            warn!(
+                                worker_id,
+                                did = %record.did,
+                                collection = %record.collection,
+                                rkey = %record.rkey,
+                                error = ?e,
+                                "failed to serialize record, sending to DLQ"
+                            );
+                            let raw_data = format!(
+                                r#"{{"did":"{}","collection":"{}","rkey":"{}","cid":"{}","error":"serialization_failed"}}"#,
+                                record.did, record.collection, record.rkey, record.cid
+                            );
+                            records
+                                .write_raw_to_dlq(
+                                    record.action.as_str().to_smolstr(),
+                                    raw_data,
+                                    e.to_string(),
+                                    event_id,
+                                )
+                                .await?;
+                            let _ = ack_tx.send(event_id).await;
+                            continue;
+                        }
+                    },
+                    None => "{}".to_string(),
+                };
+
+                debug!(
+                    worker_id,
+                    op = record.action.as_str(),
+                    id = event_id,
+                    len = json.len(),
+                    "writing record"
+                );
+
+                records
+                    .write(RawRecordInsert {
+                        did: record.did.clone(),
+                        collection: record.collection.clone(),
+                        rkey: record.rkey.clone(),
+                        cid: record.cid.clone(),
+                        rev: record.rev.clone(),
+                        record: json.to_smolstr(),
+                        operation: record.action.as_str().to_smolstr(),
+                        seq: event_id,
+                        event_time: Utc::now(),
+                        is_live: record.live,
+                    })
+                    .await?;
+                records.commit().await?;
+
+                // Ack after successful processing
+                let _ = ack_tx.send(event_id).await;
+
+                processed += 1;
+            }
+            TapEvent::Identity(envelope) => {
+                let identity = &envelope.identity;
+
+                identities
+                    .write(&RawIdentityEvent {
+                        did: identity.did.clone(),
+                        handle: identity.handle.clone(),
+                        seq: event_id,
+                        event_time: Utc::now(),
+                    })
+                    .await
+                    .map_err(|e| ClickHouseError::Query {
+                        message: "identity write failed".into(),
+                        source: e,
+                    })?;
+                identities
+                    .commit()
+                    .await
+                    .map_err(|e| ClickHouseError::Query {
+                        message: "identity commit failed".into(),
+                        source: e,
+                    })?;
+
+                let _ = ack_tx.send(event_id).await;
+            }
+        }
+
+        // Periodic stats
+        if last_stats.elapsed() > Duration::from_secs(30) {
+            info!(worker_id, processed, "tap worker stats");
+            last_stats = Instant::now();
+        }
+    }
+
+    // Clean shutdown
+    records.end().await?;
+    identities.end().await.map_err(|e| ClickHouseError::Query {
+        message: "identities end failed".into(),
+        source: e,
+    })?;
+
+    info!(worker_id, processed, "tap worker finished");
+    Ok(())
+}
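
The TODO in `run()` defers worker supervision to a follow-up. A minimal sketch of one possible shape, not part of this diff: wrap `run_tap_worker` in a restart loop with a fixed backoff so one failed connection does not take down the whole indexer. `supervise_tap_worker` is a hypothetical helper name; it only relies on items already imported in parallel_tap.rs.

    // Hypothetical supervision wrapper (illustration only, not in this diff):
    // restart a failed worker after a short pause instead of propagating
    // its error out of TapIndexer::run.
    async fn supervise_tap_worker(
        worker_id: usize,
        client: Arc<Client>,
        tap_config: TapConfig,
        inserter_config: InserterConfig,
        config: Arc<IndexerConfig>,
    ) -> Result<()> {
        loop {
            match run_tap_worker(
                worker_id,
                client.clone(),
                tap_config.clone(),
                inserter_config.clone(),
                config.clone(),
            )
            .await
            {
                // Clean exit (tap channel closed): stop supervising.
                Ok(()) => return Ok(()),
                Err(e) => {
                    warn!(worker_id, error = ?e, "tap worker failed, restarting after backoff");
                    tokio::time::sleep(Duration::from_secs(5)).await;
                }
            }
        }
    }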