···
 pub mod constellation;
 pub mod error;
 pub mod resolve;
+#[cfg(feature = "telemetry")]
+pub mod telemetry;
 pub mod transport;
 pub mod worker_rt;

+170
crates/weaver-common/src/telemetry.rs
···
+//! Telemetry infrastructure for weaver services.
+//!
+//! Provides:
+//! - Prometheus metrics with `/metrics` endpoint
+//! - Tracing with pretty console output + optional Loki push
+//!
+//! # Usage
+//!
+//! ```ignore
+//! use weaver_common::telemetry::{self, TelemetryConfig};
+//!
+//! #[tokio::main]
+//! async fn main() {
+//!     // Initialize telemetry (metrics + tracing)
+//!     let config = TelemetryConfig::from_env("weaver-index");
+//!     telemetry::init(config).await;
+//!
+//!     // Mount the metrics endpoint in your axum router
+//!     let app = Router::new()
+//!         .route("/metrics", get(|| async { telemetry::render() }));
+//!
+//!     // Use metrics
+//!     metrics::counter!("requests_total").increment(1);
+//!
+//!     // Use tracing (goes to both console and loki if configured)
+//!     tracing::info!("server started");
+//! }
+//! ```
+
+use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
+use std::sync::OnceLock;
+use tracing::Level;
+use tracing_subscriber::layer::SubscriberExt;
+use tracing_subscriber::util::SubscriberInitExt;
+use tracing_subscriber::{EnvFilter, Layer};
+
+static PROMETHEUS_HANDLE: OnceLock<PrometheusHandle> = OnceLock::new();
+
+/// Telemetry configuration
+#[derive(Debug, Clone)]
+pub struct TelemetryConfig {
+    /// Service name for labeling (e.g., "weaver-index", "weaver-app")
+    pub service_name: String,
+    /// Loki push URL (e.g., "http://localhost:3100"). None disables Loki.
+    pub loki_url: Option<String>,
+    /// Console log level (default: INFO, DEBUG in debug builds)
+    pub console_level: Level,
+}
+
+impl TelemetryConfig {
+    /// Load config from environment variables.
+    ///
+    /// - `LOKI_URL`: Loki push endpoint (optional)
+    /// - `RUST_LOG`: Standard env filter (optional, overrides console_level)
+    pub fn from_env(service_name: impl Into<String>) -> Self {
+        let console_level = if cfg!(debug_assertions) {
+            Level::DEBUG
+        } else {
+            Level::INFO
+        };
+
+        Self {
+            service_name: service_name.into(),
+            loki_url: std::env::var("LOKI_URL").ok(),
+            console_level,
+        }
+    }
+}
+
+/// Initialize telemetry (metrics + tracing).
+///
+/// Call once at application startup. If `LOKI_URL` is set, spawns a background
+/// task to push logs to Loki.
+pub async fn init(config: TelemetryConfig) {
+    // Initialize prometheus metrics
+    init_metrics();
+
+    // Initialize tracing
+    init_tracing(config).await;
+}
+
+/// Initialize just the prometheus metrics recorder.
+pub fn init_metrics() -> &'static PrometheusHandle {
+    PROMETHEUS_HANDLE.get_or_init(|| {
+        PrometheusBuilder::new()
+            .install_recorder()
+            .expect("failed to install prometheus recorder")
+    })
+}
+
+/// Initialize tracing with console + optional Loki layers.
+async fn init_tracing(config: TelemetryConfig) {
+    let env_filter = EnvFilter::try_from_default_env()
+        .unwrap_or_else(|_| EnvFilter::new(config.console_level.as_str().to_lowercase()));
+
+    // Pretty console layer for human-readable stdout
+    let console_layer = tracing_subscriber::fmt::layer()
+        .with_target(true)
+        .with_thread_ids(false)
+        .with_file(false)
+        .with_line_number(false)
+        .compact()
+        .with_filter(env_filter);
+
+    // Optional Loki layer for structured logs
+    if let Some(loki_url) = config.loki_url {
+        match tracing_loki::url::Url::parse(&loki_url) {
+            Ok(url) => {
+                let (loki_layer, loki_task) = tracing_loki::builder()
+                    .label("service", config.service_name.clone())
+                    .expect("invalid label")
+                    .build_url(url)
+                    .expect("failed to build loki layer");
+
+                tracing_subscriber::registry()
+                    .with(console_layer)
+                    .with(loki_layer)
+                    .init();
+
+                // Spawn the background task that pushes to Loki
+                tokio::spawn(loki_task);
+
+                tracing::info!(
+                    service = %config.service_name,
+                    loki_url = %loki_url,
+                    "telemetry initialized with loki"
+                );
+            }
+            Err(e) => {
+                // Invalid URL - fall back to console only
+                tracing_subscriber::registry().with(console_layer).init();
+
+                tracing::warn!(
+                    error = %e,
+                    loki_url = %loki_url,
+                    "invalid LOKI_URL, falling back to console only"
+                );
+            }
+        }
+    } else {
+        // No Loki URL - console only
+        tracing_subscriber::registry().with(console_layer).init();
+
+        tracing::debug!(
+            service = %config.service_name,
+            "telemetry initialized (console only, set LOKI_URL to enable loki)"
+        );
+    }
+}
+
+/// Get the prometheus handle.
+pub fn handle() -> &'static PrometheusHandle {
+    // Same initialization path as init_metrics(); avoids duplicating the builder
+    init_metrics()
+}
+
+/// Render metrics in prometheus text format.
+pub fn render() -> String {
+    handle().render()
+}
+
+// Re-export the metrics crate for convenience
+pub use metrics::{counter, gauge, histogram};
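Pulling the pieces together, a service binary would wire this module up roughly as follows. This is a minimal sketch assuming axum 0.8; the service name and bind address are illustrative, not taken from the weaver services.

```rust
// Sketch only: "weaver-example" and port 9090 are placeholder choices.
use axum::{Router, routing::get};
use weaver_common::telemetry::{self, TelemetryConfig};

#[tokio::main]
async fn main() {
    // Console tracing (plus Loki if LOKI_URL is set) and the prometheus recorder
    telemetry::init(TelemetryConfig::from_env("weaver-example")).await;

    // Expose the scrape endpoint; render() returns prometheus text format
    let app = Router::new().route("/metrics", get(|| async { telemetry::render() }));

    let listener = tokio::net::TcpListener::bind("0.0.0.0:9090").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}
```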
+16-1
crates/weaver-index/Cargo.toml
···
 [dependencies]
 # Internal
 weaver-api = { path = "../weaver-api", features = ["streaming"] }
+weaver-common = { path = "../weaver-common", features = ["telemetry"] }

 # AT Protocol / Jacquard
-jacquard = { workspace = true, features = ["websocket", "zstd"] }
+jacquard = { workspace = true, features = ["websocket", "zstd", "dns", "cache"] }
 jacquard-common = { workspace = true }
 jacquard-repo = { workspace = true }
+jacquard-axum = { workspace = true }

 # ClickHouse
 clickhouse = { version = "0.14", features = ["inserter", "chrono", "rustls-tls-ring", "rustls-tls-webpki-roots"] }
···
 base64 = "0.22"
 dashmap = "6"
 include_dir = "0.7.4"
+
+# WebSocket (for tap consumer)
+tokio-tungstenite = { version = "0.26", features = ["native-tls"] }
+futures-util = "0.3"
+
+# HTTP server
+axum = { version = "0.8.7", features = ["macros"] }
+tower = "0.5.2"
+tower-http = { version = "0.6.7", features = ["trace", "cors"] }
+
+# SQLite (shard storage)
+rusqlite = { version = "0.37.0", features = ["bundled"] }
+rusqlite_migration = { version = "2.1.0", features = ["from-directory"] }
···
     -- Populated by async batch validation, not in hot path
     validation_state LowCardinality(String) DEFAULT 'unchecked',

+    -- Whether this came from live firehose (true) or backfill (false)
+    -- Backfill events may not reflect current state until repo is fully synced
+    is_live Bool DEFAULT true,
+
     -- Materialized AT URI for convenience
     uri String MATERIALIZED concat('at://', did, '/', collection, '/', rkey),

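One way a reader might use the new column is to watch backfill progress. A hedged sketch with the `clickhouse` crate used elsewhere in this PR; the helper itself is hypothetical, and only the table and column names come from this migration.

```rust
// Hypothetical helper: compare live vs backfilled rows in raw_records.
async fn live_vs_backfill(client: &clickhouse::Client) -> clickhouse::error::Result<(u64, u64)> {
    let live = client
        .query("SELECT count() FROM raw_records WHERE is_live")
        .fetch_one::<u64>()
        .await?;
    let backfill = client
        .query("SELECT count() FROM raw_records WHERE NOT is_live")
        .fetch_one::<u64>()
        .await?;
    Ok((live, backfill))
}
```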
+2-8
crates/weaver-index/src/bin/storage_benchmark.rs
···
     let firehose_config = FirehoseConfig::from_env()?;

     info!(
-        "Connecting to ClickHouse at {} (database: {})",
+        "Connecting to ClickHouse at:\n{} (database: {})",
         ch_config.url, ch_config.database
     );
     let client = Client::new(&ch_config)?;
···
         drop_benchmark_tables(&client).await?;
     }

-    // Create tables
     info!("Creating benchmark tables...");
     create_benchmark_tables(&client).await?;

-    // Create inserters
     let mut json_inserter = client.inserter::<RawRecordJson>(TABLE_JSON);
     let mut cbor_inserter = client.inserter::<RawRecordCbor>(TABLE_CBOR);

-    // Connect to firehose
-    info!("Connecting to firehose at {}", firehose_config.relay_url);
+    info!("Connecting to firehose at:\n {}", firehose_config.relay_url);
     let consumer = FirehoseConsumer::new(firehose_config);
     let mut stream = consumer.connect().await?;

···
         }
     }

-    // Final flush
     info!("Flushing remaining records...");
     json_inserter
         .end()
···
             source: e,
         })?;

-    // Final report
     info!("\n========== FINAL RESULTS ==========");
     report_progress(
         &client,
···
         errors
     );

-    // Lag info - critical for detecting if we're falling behind
     if lag.sample_count > 0 {
         info!(
             " Lag: current={:.1}s, min={:.1}s, max={:.1}s (window)",
+132-25
crates/weaver-index/src/bin/weaver_indexer.rs
···
 use clap::{Parser, Subcommand};
-use miette::IntoDiagnostic;
-use tracing::{Level, info, warn};
-use tracing_subscriber::EnvFilter;
+use tracing::{error, info, warn};
 use weaver_index::clickhouse::{Client, Migrator, Tables};
-use weaver_index::config::{ClickHouseConfig, FirehoseConfig, IndexerConfig};
+use weaver_index::config::{
+    ClickHouseConfig, FirehoseConfig, IndexerConfig, ShardConfig, SourceMode, TapConfig,
+};
 use weaver_index::firehose::FirehoseConsumer;
-use weaver_index::{Indexer, load_cursor};
+use weaver_index::server::{AppState, ServerConfig, TelemetryConfig, telemetry};
+use weaver_index::{FirehoseIndexer, TapIndexer, load_cursor};

 #[derive(Parser)]
 #[command(name = "indexer")]
···
     /// Check database connectivity
     Health,

-    /// Start the indexer service (not yet implemented)
+    /// Start the full service (indexer + HTTP server)
     Run,
+
+    /// Start only the HTTP server (no indexing)
+    Serve,
+
+    /// Start only the indexer (no HTTP server)
+    Index,
 }

 #[tokio::main]
 async fn main() -> miette::Result<()> {
     dotenvy::dotenv().ok();

-    let console_level = if cfg!(debug_assertions) {
-        Level::DEBUG
-    } else {
-        Level::INFO
-    };
-
-    tracing_subscriber::fmt()
-        .with_env_filter(
-            tracing_subscriber::EnvFilter::builder()
-                .from_env_lossy()
-                .add_directive(console_level.into())
-                .add_directive("hyper_util=info".parse().into_diagnostic()?),
-        )
-        .init();
+    // Initialize telemetry (metrics + tracing with optional Loki)
+    let telemetry_config = TelemetryConfig::from_env("weaver-index");
+    telemetry::init(telemetry_config).await;

     let args = Args::parse();

     match args.command {
         Command::Migrate { dry_run, reset } => run_migrate(dry_run, reset).await,
         Command::Health => run_health().await,
-        Command::Run => run_indexer().await,
+        Command::Run => run_full().await,
+        Command::Serve => run_server_only().await,
+        Command::Index => run_indexer_only().await,
     }
 }
···
     Ok(())
 }

-async fn run_indexer() -> miette::Result<()> {
+/// Run both indexer and HTTP server concurrently (production mode)
+async fn run_full() -> miette::Result<()> {
+    let ch_config = ClickHouseConfig::from_env()?;
+    let shard_config = ShardConfig::from_env();
+    let server_config = ServerConfig::from_env();
+    let indexer_config = IndexerConfig::from_env();
+    let source_mode = SourceMode::from_env();
+
+    info!(
+        "Connecting to ClickHouse at {} (database: {})",
+        ch_config.url, ch_config.database
+    );
+    info!("SQLite shards at {}", shard_config.base_path.display());
+
+    // Create separate clients for indexer and server
+    let indexer_client = Client::new(&ch_config)?;
+    let server_client = Client::new(&ch_config)?;
+
+    // Build AppState for server
+    let state = AppState::new(server_client, shard_config);
+
+    // Spawn the indexer task
+    let indexer_handle = match source_mode {
+        SourceMode::Firehose => {
+            let mut firehose_config = FirehoseConfig::from_env()?;
+            if firehose_config.cursor.is_none() {
+                if let Some(cursor) = load_cursor(&indexer_client).await? {
+                    firehose_config.cursor = Some(cursor);
+                }
+            }
+            info!(
+                "Connecting to firehose at {} (cursor: {:?})",
+                firehose_config.relay_url, firehose_config.cursor
+            );
+            let consumer = FirehoseConsumer::new(firehose_config);
+            let indexer = FirehoseIndexer::new(indexer_client, consumer, indexer_config).await?;
+            info!("Starting firehose indexer");
+            tokio::spawn(async move { indexer.run().await })
+        }
+        SourceMode::Tap => {
+            let tap_config = TapConfig::from_env()?;
+            let indexer = TapIndexer::new(indexer_client, tap_config, indexer_config);
+            info!("Starting tap indexer");
+            tokio::spawn(async move { indexer.run().await })
+        }
+    };
+
+    // Run server, monitoring indexer health
+    tokio::select! {
+        result = weaver_index::server::run(state, server_config) => {
+            result?;
+        }
+        result = indexer_handle => {
+            match result {
+                Ok(Ok(())) => info!("Indexer completed"),
+                Ok(Err(e)) => error!("Indexer failed: {}", e),
+                Err(e) => error!("Indexer task panicked: {}", e),
+            }
+        }
+    }
+
+    Ok(())
+}
+
+/// Run only the indexer (no HTTP server)
+async fn run_indexer_only() -> miette::Result<()> {
     let ch_config = ClickHouseConfig::from_env()?;
-    let mut firehose_config = FirehoseConfig::from_env()?;
     let indexer_config = IndexerConfig::from_env();
+    let source_mode = SourceMode::from_env();

     info!(
         "Connecting to ClickHouse at {} (database: {})",
···
     );
     let client = Client::new(&ch_config)?;

+    match source_mode {
+        SourceMode::Firehose => run_firehose_indexer(client, indexer_config).await,
+        SourceMode::Tap => {
+            let tap_config = TapConfig::from_env()?;
+            run_tap_indexer(client, tap_config, indexer_config).await
+        }
+    }
+}
+
+async fn run_firehose_indexer(client: Client, indexer_config: IndexerConfig) -> miette::Result<()> {
+    let mut firehose_config = FirehoseConfig::from_env()?;
+
     // Load cursor from ClickHouse if not overridden by env var
     if firehose_config.cursor.is_none() {
         if let Some(cursor) = load_cursor(&client).await? {
···
     );
     let consumer = FirehoseConsumer::new(firehose_config);

-    let indexer = Indexer::new(client, consumer, indexer_config).await?;
+    let indexer = FirehoseIndexer::new(client, consumer, indexer_config).await?;
+
+    info!("Starting firehose indexer");
+    indexer.run().await?;

-    info!("Starting indexer");
+    Ok(())
+}
+
+async fn run_tap_indexer(
+    client: Client,
+    tap_config: TapConfig,
+    indexer_config: IndexerConfig,
+) -> miette::Result<()> {
+    let indexer = TapIndexer::new(client, tap_config, indexer_config);
+
+    info!("Starting tap indexer");
     indexer.run().await?;

     Ok(())
 }
+
+async fn run_server_only() -> miette::Result<()> {
+    let ch_config = ClickHouseConfig::from_env()?;
+    let shard_config = ShardConfig::from_env();
+    let server_config = ServerConfig::from_env();
+
+    info!(
+        "Connecting to ClickHouse at {} (database: {})",
+        ch_config.url, ch_config.database
+    );
+    info!("SQLite shards at {}", shard_config.base_path.display());
+
+    let client = Client::new(&ch_config)?;
+
+    let state = AppState::new(client, shard_config);
+    weaver_index::server::run(state, server_config).await?;
+
+    Ok(())
+}
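`SourceMode::from_env` is used above but its definition is not part of this diff. A purely hypothetical sketch of its likely shape; the `INDEXER_SOURCE` variable name and the firehose default are assumptions, not the actual `weaver_index::config` code.

```rust
// Hypothetical sketch; the real SourceMode lives in weaver_index::config.
#[derive(Debug, Clone, Copy)]
pub enum SourceMode {
    Firehose,
    Tap,
}

impl SourceMode {
    pub fn from_env() -> Self {
        // Assumed env var name; defaults to the relay firehose when unset
        match std::env::var("INDEXER_SOURCE").as_deref() {
            Ok("tap") => SourceMode::Tap,
            _ => SourceMode::Firehose,
        }
    }
}
```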
+172-1
crates/weaver-index/src/clickhouse/client.rs
···
+use std::time::Duration;
+
 use crate::config::ClickHouseConfig;
 use crate::error::{ClickHouseError, IndexError};
 use clickhouse::Row;
 use clickhouse::inserter::Inserter;
+use serde::Deserialize;

 /// ClickHouse client wrapper with connection pooling and batched inserts
 pub struct Client {
···
             // Enable JSON type support (treated as string at transport level)
             .with_option("allow_experimental_json_type", "1")
             .with_option("input_format_binary_read_json_as_string", "1")
-            .with_option("output_format_binary_write_json_as_string", "1");
+            .with_option("output_format_binary_write_json_as_string", "1")
+            .with_option("send_timeout", "120")
+            .with_option("receive_timeout", "120");

         Ok(Self { inner })
     }
···
             .inserter(table)
             .with_max_rows(1000)
             .with_period_bias(0.1)
+            .with_period(Some(Duration::from_secs(1)))
             .with_max_bytes(1_048_576)
     }

···
     pub fn inner(&self) -> &clickhouse::Client {
         &self.inner
     }
+
+    /// Get a single record by (did, collection, rkey)
+    ///
+    /// Returns the latest non-deleted version from raw_records.
+    pub async fn get_record(
+        &self,
+        did: &str,
+        collection: &str,
+        rkey: &str,
+    ) -> Result<Option<RecordRow>, IndexError> {
+        // FINAL ensures ReplacingMergeTree deduplication is applied
+        let query = r#"
+            SELECT cid, record
+            FROM raw_records FINAL
+            WHERE did = ?
+              AND collection = ?
+              AND rkey = ?
+              AND operation != 'delete'
+            ORDER BY event_time DESC
+            LIMIT 1
+        "#;
+
+        let row = self
+            .inner
+            .query(query)
+            .bind(did)
+            .bind(collection)
+            .bind(rkey)
+            .fetch_optional::<RecordRow>()
+            .await
+            .map_err(|e| ClickHouseError::Query {
+                message: "failed to get record".into(),
+                source: e,
+            })?;
+
+        Ok(row)
+    }
+
+    /// Insert a single record (for cache-on-miss)
+    ///
+    /// Used when fetching a record from upstream that wasn't in our cache.
+    pub async fn insert_record(
+        &self,
+        did: &str,
+        collection: &str,
+        rkey: &str,
+        cid: &str,
+        record_json: &str,
+    ) -> Result<(), IndexError> {
+        use crate::clickhouse::schema::RawRecordInsert;
+        use chrono::DateTime;
+        use smol_str::SmolStr;
+
+        let row = RawRecordInsert {
+            did: SmolStr::new(did),
+            collection: SmolStr::new(collection),
+            rkey: SmolStr::new(rkey),
+            cid: SmolStr::new(cid),
+            rev: SmolStr::new_static(""), // Unknown from upstream fetch
+            record: SmolStr::new(record_json),
+            operation: SmolStr::new_static("cache"), // Distinguish from firehose ops
+            seq: 0,                                  // Not from firehose
+            event_time: DateTime::UNIX_EPOCH,        // Sort behind canonical firehose data
+            is_live: false,                          // Fetched on-demand, not from firehose
+        };
+
+        let mut insert = self
+            .inner
+            .insert::<RawRecordInsert>("raw_records")
+            .await
+            .map_err(|e| ClickHouseError::Insert {
+                message: "failed to create insert".into(),
+                source: e,
+            })?;
+
+        insert
+            .write(&row)
+            .await
+            .map_err(|e| ClickHouseError::Insert {
+                message: "failed to write record".into(),
+                source: e,
+            })?;
+
+        insert.end().await.map_err(|e| ClickHouseError::Insert {
+            message: "failed to flush insert".into(),
+            source: e,
+        })?;
+
+        Ok(())
+    }
+
+    /// List records for a repo+collection
+    ///
+    /// Returns non-deleted records ordered by rkey, with cursor-based pagination.
+    pub async fn list_records(
+        &self,
+        did: &str,
+        collection: &str,
+        limit: u32,
+        cursor: Option<&str>,
+        reverse: bool,
+    ) -> Result<Vec<RecordListRow>, IndexError> {
+        let order = if reverse { "DESC" } else { "ASC" };
+        let cursor_op = if reverse { "<" } else { ">" };
+
+        // Build query with optional cursor
+        let query = if cursor.is_some() {
+            format!(
+                r#"
+                SELECT rkey, cid, record
+                FROM raw_records FINAL
+                WHERE did = ?
+                  AND collection = ?
+                  AND rkey {cursor_op} ?
+                  AND operation != 'delete'
+                ORDER BY rkey {order}
+                LIMIT ?
+                "#,
+            )
+        } else {
+            format!(
+                r#"
+                SELECT rkey, cid, record
+                FROM raw_records FINAL
+                WHERE did = ?
+                  AND collection = ?
+                  AND operation != 'delete'
+                ORDER BY rkey {order}
+                LIMIT ?
+                "#,
+            )
+        };
+
+        let mut q = self.inner.query(&query).bind(did).bind(collection);
+
+        if let Some(cursor_rkey) = cursor {
+            q = q.bind(cursor_rkey);
+        }
+
+        let rows = q
+            .bind(limit)
+            .fetch_all::<RecordListRow>()
+            .await
+            .map_err(|e| ClickHouseError::Query {
+                message: "failed to list records".into(),
+                source: e,
+            })?;
+
+        Ok(rows)
+    }
 }

 /// Table size statistics from system.parts
···
         }
     }
 }
+
+/// Single record from raw_records (for getRecord)
+#[derive(Debug, Clone, Row, Deserialize)]
+pub struct RecordRow {
+    pub cid: String,
+    pub record: String, // JSON string
+}
+
+/// Record with rkey from raw_records (for listRecords)
+#[derive(Debug, Clone, Row, Deserialize)]
+pub struct RecordListRow {
+    pub rkey: String,
+    pub cid: String,
+    pub record: String, // JSON string
+}
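A usage sketch for the new read path; the DID, collection, and rkey values are placeholders, and only the signatures added above are assumed.

```rust
// Placeholder identifiers throughout; error handling uses this crate's Result.
use weaver_index::clickhouse::Client;

async fn show_records(client: &Client) -> weaver_index::Result<()> {
    // Point lookup: latest non-deleted version, if any
    if let Some(row) = client
        .get_record("did:plc:example", "app.bsky.actor.profile", "self")
        .await?
    {
        println!("cid={} record={}", row.cid, row.record);
    }

    // Page through a collection; feed the last rkey back in as the cursor
    let first_page = client
        .list_records("did:plc:example", "app.bsky.feed.post", 25, None, false)
        .await?;
    if let Some(last) = first_page.last() {
        let _second_page = client
            .list_records("did:plc:example", "app.bsky.feed.post", 25, Some(&last.rkey), false)
            .await?;
    }
    Ok(())
}
```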
+7
crates/weaver-index/src/clickhouse/schema.rs
···

 /// Validation states for records
 pub mod validation {
+    #![allow(dead_code)]
     pub const UNCHECKED: &str = "unchecked";
     pub const VALID: &str = "valid";
     pub const INVALID_REV: &str = "invalid_rev";
     pub const INVALID_GAP: &str = "invalid_gap";
     pub const INVALID_ACCOUNT: &str = "invalid_account";
 }
···
     pub seq: u64,
     #[serde(with = "clickhouse::serde::chrono::datetime64::millis")]
     pub event_time: DateTime<Utc>,
+    /// Whether this came from live firehose (true) or backfill (false)
+    pub is_live: bool,
     // Note: indexed_at has DEFAULT now64(3), omit from insert
     // Note: validation_state has DEFAULT 'unchecked', omit from insert
 }
···
 use dashmap::DashMap;
 use n0_future::StreamExt;
 use smol_str::{SmolStr, ToSmolStr};
-use tracing::{debug, info, warn};
+use tracing::{debug, info, trace, warn};

 use chrono::DateTime;
···
     AccountRevState, Client, FirehoseCursor, RawAccountEvent, RawIdentityEvent, RawRecordInsert,
 };
 use crate::config::IndexerConfig;
-use crate::error::{IndexError, Result};
+use crate::config::TapConfig;
+use crate::error::{ClickHouseError, IndexError, Result};
 use crate::firehose::{
     Account, Commit, ExtractedRecord, FirehoseConsumer, Identity, MessageStream,
     SubscribeReposMessage, extract_records,
 };
+use crate::tap::{TapConfig as TapConsumerConfig, TapConsumer, TapEvent};

 /// Default consumer ID for cursor tracking
 const CONSUMER_ID: &str = "main";
···
     }
 }

-/// Main indexer that consumes firehose and writes to ClickHouse
-pub struct Indexer {
+/// Firehose indexer that consumes AT Protocol firehose and writes to ClickHouse
+pub struct FirehoseIndexer {
     client: Arc<Client>,
     consumer: FirehoseConsumer,
     rev_cache: RevCache,
     config: IndexerConfig,
 }

-impl Indexer {
-    /// Create a new indexer
+impl FirehoseIndexer {
+    /// Create a new firehose indexer
     pub async fn new(
         client: Client,
         consumer: FirehoseConsumer,
···
         info!("starting indexer loop");

-        while let Some(result) = stream.next().await {
+        loop {
+            // Get time until next required flush - must commit before socket timeout (30s)
+            let records_time = records.time_left().unwrap_or(Duration::from_secs(10));
+            let identities_time = identities.time_left().unwrap_or(Duration::from_secs(10));
+            let accounts_time = accounts.time_left().unwrap_or(Duration::from_secs(10));
+            let time_left = records_time.min(identities_time).min(accounts_time);
+
+            let result = match tokio::time::timeout(time_left, stream.next()).await {
+                Ok(Some(result)) => result,
+                Ok(None) => {
+                    // Stream ended
+                    break;
+                }
+                Err(_) => {
+                    // Timeout - flush inserters to keep INSERT alive
+                    debug!("flush timeout, committing inserters");
+                    records.commit().await.map_err(|e| {
+                        crate::error::ClickHouseError::Query {
+                            message: "periodic records commit failed".into(),
+                            source: e,
+                        }
+                    })?;
+                    identities.commit().await.map_err(|e| {
+                        crate::error::ClickHouseError::Query {
+                            message: "periodic identities commit failed".into(),
+                            source: e,
+                        }
+                    })?;
+                    accounts.commit().await.map_err(|e| {
+                        crate::error::ClickHouseError::Query {
+                            message: "periodic accounts commit failed".into(),
+                            source: e,
+                        }
+                    })?;
+                    continue;
+                }
+            };
+
             let msg = match result {
                 Ok(msg) => msg,
                 Err(e) => {
···
                     operation: record.operation.clone(),
                     seq: record.seq as u64,
                     event_time: record.event_time,
+                    is_live: true,
                 })
                 .await
                 .map_err(|e| crate::error::ClickHouseError::Query {
···
 /// Minimal struct for delete lookups - just the fields we need to process the delete
 #[derive(Debug, Clone, clickhouse::Row, serde::Deserialize)]
 struct LookupRawRecord {
+    #[allow(dead_code)]
     did: SmolStr,
+    #[allow(dead_code)]
     collection: SmolStr,
+    #[allow(dead_code)]
     rkey: SmolStr,
+    #[allow(dead_code)]
     record: SmolStr, // JSON string of the original record
 }
···
         tokio::time::sleep(Duration::from_secs(1)).await;
     }
 }
+
+// ============================================================================
+// TapIndexer - consumes from tap websocket
+// ============================================================================
+
+/// Consumer ID for tap cursor tracking
+const TAP_CONSUMER_ID: &str = "tap";
+
+/// Tap indexer that consumes from tap websocket and writes to ClickHouse
+pub struct TapIndexer {
+    client: Arc<Client>,
+    tap_config: TapConfig,
+    config: IndexerConfig,
+}
+
+impl TapIndexer {
+    /// Create a new tap indexer
+    pub fn new(client: Client, tap_config: TapConfig, config: IndexerConfig) -> Self {
+        Self {
+            client: Arc::new(client),
+            tap_config,
+            config,
+        }
+    }
+
+    /// Save tap cursor to ClickHouse for visibility
+    async fn save_cursor(&self, seq: u64) -> Result<()> {
+        let query = format!(
+            "INSERT INTO firehose_cursor (consumer_id, seq, event_time) VALUES ('{}', {}, now64(3))",
+            TAP_CONSUMER_ID, seq
+        );
+
+        self.client.execute(&query).await?;
+        debug!(seq, "saved tap cursor");
+        Ok(())
+    }
+
+    /// Run the tap indexer loop
+    pub async fn run(&self) -> Result<()> {
+        info!(url = %self.tap_config.url, "connecting to tap...");
+
+        let consumer_config = TapConsumerConfig::new(self.tap_config.url.clone())
+            .with_acks(self.tap_config.send_acks);
+        let consumer = TapConsumer::new(consumer_config);
+
+        let (mut events, ack_tx) = consumer.connect().await?;
+
+        let mut records = self.client.inserter::<RawRecordInsert>("raw_records");
+        let mut identities = self
+            .client
+            .inserter::<RawIdentityEvent>("raw_identity_events");
+
+        let mut processed: u64 = 0;
+        let mut last_seq: u64 = 0;
+        let mut last_stats = Instant::now();
+        let mut last_cursor_save = Instant::now();
+
+        info!("starting tap indexer loop");
+
+        loop {
+            // Get time until next required flush - must commit before socket timeout (30s)
+            let records_time = records.time_left().unwrap_or(Duration::from_secs(10));
+            let identities_time = identities.time_left().unwrap_or(Duration::from_secs(10));
+            let time_left = records_time.min(identities_time);
+
+            let event = match tokio::time::timeout(time_left, events.recv()).await {
+                Ok(Some(event)) => event,
+                Ok(None) => {
+                    // Channel closed, exit loop
+                    break;
+                }
+                Err(_) => {
+                    // Timeout - flush inserters to keep INSERT alive
+                    trace!("flush timeout, committing inserters");
+                    records.commit().await.map_err(|e| ClickHouseError::Query {
+                        message: "periodic records commit failed".into(),
+                        source: e,
+                    })?;
+                    identities
+                        .commit()
+                        .await
+                        .map_err(|e| ClickHouseError::Query {
+                            message: "periodic identities commit failed".into(),
+                            source: e,
+                        })?;
+                    continue;
+                }
+            };
+
+            let event_id = event.id();
+            last_seq = event_id;
+
+            match event {
+                TapEvent::Record(envelope) => {
+                    let record = &envelope.record;
+
+                    // Collection filter
+                    if !self.config.collections.matches(&record.collection) {
+                        // Still ack even if filtered
+                        let _ = ack_tx.send(event_id).await;
+                        continue;
+                    }
+
+                    let json = record
+                        .record
+                        .as_ref()
+                        .map(|v| serde_json::to_string(v).unwrap_or_default())
+                        .unwrap_or_default();
+
+                    debug!(
+                        op = record.action.as_str(),
+                        id = event_id,
+                        len = json.len(),
+                        "writing record"
+                    );
+
+                    records
+                        .write(&RawRecordInsert {
+                            did: record.did.clone(),
+                            collection: record.collection.clone(),
+                            rkey: record.rkey.clone(),
+                            cid: record.cid.clone(),
+                            rev: record.rev.clone(),
+                            record: json.to_smolstr(),
+                            operation: record.action.as_str().to_smolstr(),
+                            seq: event_id,
+                            event_time: Utc::now(),
+                            is_live: record.live,
+                        })
+                        .await
+                        .map_err(|e| ClickHouseError::Query {
+                            message: "record write failed".into(),
+                            source: e,
+                        })?;
+                    records.commit().await.map_err(|e| ClickHouseError::Query {
+                        message: format!("record commit failed for id {}", event_id),
+                        source: e,
+                    })?;
+
+                    processed += 1;
+                }
+                TapEvent::Identity(envelope) => {
+                    let identity = &envelope.identity;
+
+                    identities
+                        .write(&RawIdentityEvent {
+                            did: identity.did.clone(),
+                            handle: identity.handle.clone(),
+                            seq: event_id,
+                            event_time: Utc::now(),
+                        })
+                        .await
+                        .map_err(|e| ClickHouseError::Query {
+                            message: "identity write failed".into(),
+                            source: e,
+                        })?;
+                    identities
+                        .commit()
+                        .await
+                        .map_err(|e| ClickHouseError::Query {
+                            message: "identity commit failed".into(),
+                            source: e,
+                        })?;
+                }
+            }
+
+            // Send ack after successful write+commit
+            let _ = ack_tx.send(event_id).await;
+
+            // Periodic stats
+            if last_stats.elapsed() >= Duration::from_secs(10) {
+                info!(processed, last_seq, "tap indexer stats");
+                last_stats = Instant::now();
+            }
+
+            // Save cursor every 30s for visibility
+            if last_cursor_save.elapsed() >= Duration::from_secs(30) && last_seq > 0 {
+                if let Err(e) = self.save_cursor(last_seq).await {
+                    warn!(error = ?e, "failed to save tap cursor");
+                }
+                last_cursor_save = Instant::now();
+            }
+        }
+
+        // Final flush
+        records.end().await.map_err(|e| ClickHouseError::Query {
+            message: "final records flush failed".into(),
+            source: e,
+        })?;
+        identities.end().await.map_err(|e| ClickHouseError::Query {
+            message: "final identities flush failed".into(),
+            source: e,
+        })?;
+
+        // Final cursor save
+        if last_seq > 0 {
+            self.save_cursor(last_seq).await?;
+        }
+
+        info!(last_seq, "tap stream ended");
+        Ok(())
+    }
+}
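Both indexer loops use the same keep-alive idea: never wait on the stream longer than an inserter can go without a commit, and commit on timeout so the long-running server-side INSERT is not dropped. A condensed sketch of just that pattern, with a generic mpsc channel standing in for the firehose/tap stream:

```rust
// Sketch of the keep-alive flush pattern, detached from the indexer types.
use std::time::Duration;

async fn pump<T: clickhouse::Row + serde::Serialize>(
    mut rx: tokio::sync::mpsc::Receiver<T>,
    mut inserter: clickhouse::inserter::Inserter<T>,
) -> clickhouse::error::Result<()> {
    loop {
        // Remaining time before the inserter wants a commit
        let budget = inserter.time_left().unwrap_or(Duration::from_secs(10));
        match tokio::time::timeout(budget, rx.recv()).await {
            Ok(Some(row)) => {
                inserter.write(&row).await?;
                inserter.commit().await?;
            }
            Ok(None) => break, // channel closed, fall through to final flush
            Err(_) => {
                // Deadline hit with no new rows: flush to keep the INSERT alive
                inserter.commit().await?;
            }
        }
    }
    inserter.end().await?;
    Ok(())
}
```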
+7-1
crates/weaver-index/src/lib.rs
···
 pub mod clickhouse;
 pub mod config;
+pub mod endpoints;
 pub mod error;
 pub mod firehose;
 pub mod indexer;
+pub mod server;
+pub mod sqlite;
+pub mod tap;

 pub use config::Config;
 pub use error::{IndexError, Result};
-pub use indexer::{load_cursor, Indexer};
+pub use indexer::{FirehoseIndexer, TapIndexer, load_cursor};
+pub use server::{AppState, ServerConfig};
+pub use sqlite::{ShardKey, ShardRouter, SqliteShard};
···
+-- Edit graph storage (roots and diffs)
+-- Supports DAG structure for future merge support
+
+CREATE TABLE edit_nodes (
+    -- Edit record identity (decomposed)
+    did TEXT NOT NULL,
+    collection TEXT NOT NULL, -- 'sh.weaver.edit.root' or 'sh.weaver.edit.diff'
+    rkey TEXT NOT NULL,
+
+    -- Resource being edited (decomposed)
+    resource_did TEXT NOT NULL,
+    resource_collection TEXT NOT NULL,
+    resource_rkey TEXT NOT NULL,
+
+    node_type TEXT NOT NULL, -- 'root' | 'diff'
+    created_at TEXT NOT NULL,
+    indexed_at TEXT NOT NULL,
+
+    PRIMARY KEY (did, collection, rkey)
+);
+
+CREATE INDEX idx_edit_nodes_resource ON edit_nodes(resource_did, resource_collection, resource_rkey);
+CREATE INDEX idx_edit_nodes_author ON edit_nodes(did);
+
+-- Edit graph edges (supports DAG)
+CREATE TABLE edit_edges (
+    -- Child reference (decomposed)
+    child_did TEXT NOT NULL,
+    child_collection TEXT NOT NULL,
+    child_rkey TEXT NOT NULL,
+
+    -- Parent reference (decomposed)
+    parent_did TEXT NOT NULL,
+    parent_collection TEXT NOT NULL,
+    parent_rkey TEXT NOT NULL,
+
+    edge_type TEXT NOT NULL, -- 'prev' | 'merge' (future)
+
+    PRIMARY KEY (child_did, child_collection, child_rkey, parent_did, parent_collection, parent_rkey),
+    FOREIGN KEY (child_did, child_collection, child_rkey) REFERENCES edit_nodes(did, collection, rkey),
+    FOREIGN KEY (parent_did, parent_collection, parent_rkey) REFERENCES edit_nodes(did, collection, rkey)
+);
+
+CREATE INDEX idx_edit_edges_parent ON edit_edges(parent_did, parent_collection, parent_rkey);
+
+-- Fast path: track current head per resource
+CREATE TABLE edit_heads (
+    -- Resource identity (decomposed)
+    resource_did TEXT NOT NULL,
+    resource_collection TEXT NOT NULL,
+    resource_rkey TEXT NOT NULL,
+
+    -- Latest root reference (decomposed)
+    root_did TEXT,
+    root_collection TEXT,
+    root_rkey TEXT,
+
+    -- Current head reference (decomposed)
+    head_did TEXT,
+    head_collection TEXT,
+    head_rkey TEXT,
+
+    updated_at TEXT NOT NULL,
+
+    PRIMARY KEY (resource_did, resource_collection, resource_rkey)
+);
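For context on the fast path, a sketch of the read this table exists for, assuming a rusqlite connection to a shard with this schema; the helper is hypothetical and assumes the three head_* columns are set together.

```rust
// Hypothetical fast-path lookup against edit_heads.
use rusqlite::{Connection, OptionalExtension, Result};

fn current_head(
    conn: &Connection,
    did: &str,
    collection: &str,
    rkey: &str,
) -> Result<Option<(String, String, String)>> {
    conn.query_row(
        "SELECT head_did, head_collection, head_rkey
         FROM edit_heads
         WHERE resource_did = ?1 AND resource_collection = ?2
           AND resource_rkey = ?3 AND head_did IS NOT NULL",
        (did, collection, rkey),
        |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
    )
    .optional()
}
```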
···
+-- Valid collaborators (invite + accept pairs)
+CREATE TABLE collaborators (
+    -- Resource reference (decomposed)
+    resource_did TEXT NOT NULL,
+    resource_collection TEXT NOT NULL,
+    resource_rkey TEXT NOT NULL,
+
+    collaborator_did TEXT NOT NULL,
+
+    -- Invite record reference (decomposed)
+    invite_did TEXT NOT NULL,
+    invite_rkey TEXT NOT NULL,
+
+    -- Accept record reference (decomposed)
+    accept_did TEXT NOT NULL,
+    accept_rkey TEXT NOT NULL,
+
+    scope TEXT NOT NULL, -- 'direct' | 'inherited'
+    granted_at TEXT NOT NULL,
+    indexed_at TEXT NOT NULL,
+
+    PRIMARY KEY (resource_did, resource_collection, resource_rkey, collaborator_did)
+);
+
+CREATE INDEX idx_collaborators_did ON collaborators(collaborator_did);
+
+-- Active sessions (TTL-based, cleaned up on expiry)
+CREATE TABLE sessions (
+    -- Session record identity (decomposed)
+    did TEXT NOT NULL,
+    rkey TEXT NOT NULL,
+
+    -- Resource reference (decomposed)
+    resource_did TEXT NOT NULL,
+    resource_collection TEXT NOT NULL,
+    resource_rkey TEXT NOT NULL,
+
+    participant_did TEXT NOT NULL,
+    node_id TEXT NOT NULL,
+    relay_url TEXT, -- NULL if no relay
+    created_at TEXT NOT NULL,
+    expires_at TEXT, -- NULL = no expiry
+    indexed_at TEXT NOT NULL,
+
+    PRIMARY KEY (did, rkey)
+);
+
+CREATE INDEX idx_sessions_resource ON sessions(resource_did, resource_collection, resource_rkey);
+CREATE INDEX idx_sessions_expires ON sessions(expires_at);
+
+-- Pending invites (no accept yet)
+CREATE TABLE pending_invites (
+    -- Invite record identity (decomposed)
+    did TEXT NOT NULL, -- inviter DID
+    rkey TEXT NOT NULL,
+
+    -- Resource reference (decomposed)
+    resource_did TEXT NOT NULL,
+    resource_collection TEXT NOT NULL,
+    resource_rkey TEXT NOT NULL,
+
+    inviter_did TEXT NOT NULL, -- same as did
+    invitee_did TEXT NOT NULL,
+    message TEXT, -- NULL if no message
+    expires_at TEXT, -- NULL = no expiry
+    created_at TEXT NOT NULL,
+    indexed_at TEXT NOT NULL,
+
+    PRIMARY KEY (did, rkey)
+);
+
+CREATE INDEX idx_pending_invites_resource ON pending_invites(resource_did, resource_collection, resource_rkey);
+CREATE INDEX idx_pending_invites_invitee ON pending_invites(invitee_did);
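Since sessions carry a TTL, something has to reap expired rows. A hypothetical cleanup helper, assuming timestamps are stored as RFC 3339 strings, which order correctly under text comparison:

```rust
// Hypothetical reaper; returns the number of sessions deleted.
use rusqlite::{Connection, Result};

fn expire_sessions(conn: &Connection, now_rfc3339: &str) -> Result<usize> {
    conn.execute(
        "DELETE FROM sessions WHERE expires_at IS NOT NULL AND expires_at < ?1",
        [now_rfc3339],
    )
}
```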
···
+-- Permissions cache
+-- Local cache of permissions for collab-related hot paths.
+-- ClickHouse is authoritative; this is populated on-demand for active resources.
+CREATE TABLE permissions (
+    -- Resource reference (decomposed)
+    resource_did TEXT NOT NULL,
+    resource_collection TEXT NOT NULL,
+    resource_rkey TEXT NOT NULL,
+
+    did TEXT NOT NULL, -- user who has permission
+
+    scope TEXT NOT NULL, -- 'owner' | 'direct' | 'inherited'
+
+    -- Source reference (decomposed) - resource itself for owner, invite for others
+    source_did TEXT NOT NULL,
+    source_collection TEXT NOT NULL,
+    source_rkey TEXT NOT NULL,
+
+    granted_at TEXT NOT NULL,
+
+    PRIMARY KEY (resource_did, resource_collection, resource_rkey, did)
+);
+
+CREATE INDEX idx_permissions_did ON permissions(did);
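The hot-path check this cache enables looks roughly like the sketch below; the helper is hypothetical, and a miss would fall back to ClickHouse as the comment above notes.

```rust
// Hypothetical permission lookup; None means "not cached here", not "denied".
use rusqlite::{Connection, OptionalExtension, Result};

fn cached_scope(
    conn: &Connection,
    resource: (&str, &str, &str),
    user_did: &str,
) -> Result<Option<String>> {
    conn.query_row(
        "SELECT scope FROM permissions
         WHERE resource_did = ?1 AND resource_collection = ?2
           AND resource_rkey = ?3 AND did = ?4",
        (resource.0, resource.1, resource.2, user_did),
        |row| row.get(0),
    )
    .optional()
}
```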
+5
crates/weaver-index/src/tap.rs
···
+mod consumer;
+mod types;
+
+pub use consumer::{TapConfig, TapConsumer};
+pub use types::*;
+254
crates/weaver-index/src/tap/consumer.rs
···
+use std::time::Duration;
+
+use futures_util::{SinkExt, StreamExt};
+use tokio::sync::mpsc;
+use tokio_tungstenite::{connect_async, tungstenite::Message};
+use tracing::{debug, error, info, trace, warn};
+use url::Url;
+
+use crate::error::IndexError;
+
+use super::{TapAck, TapEvent};
+
+/// Messages sent to the writer task
+enum WriteCommand {
+    #[allow(dead_code)]
+    Ack(u64),
+    Pong(bytes::Bytes),
+}
+
+/// Configuration for tap consumer
+#[derive(Debug, Clone)]
+pub struct TapConfig {
+    /// WebSocket URL for tap (e.g., ws://localhost:2480/channel)
+    pub url: Url,
+    /// Whether to send acks (disable for fire-and-forget mode)
+    pub send_acks: bool,
+    /// Reconnect delay on connection failure
+    pub reconnect_delay: Duration,
+}
+
+impl TapConfig {
+    pub fn new(url: Url) -> Self {
+        Self {
+            url,
+            send_acks: true,
+            reconnect_delay: Duration::from_secs(5),
+        }
+    }
+
+    pub fn with_acks(mut self, send_acks: bool) -> Self {
+        self.send_acks = send_acks;
+        self
+    }
+}
+
+/// Consumer that connects to tap's websocket and yields events
+pub struct TapConsumer {
+    config: TapConfig,
+}
+
+impl TapConsumer {
+    pub fn new(config: TapConfig) -> Self {
+        Self { config }
+    }
+
+    /// Connect to tap and return channels for events and acks
+    ///
+    /// Returns a receiver for events and a sender for acks.
+    /// The consumer handles reconnection internally.
+    pub async fn connect(
+        &self,
+    ) -> Result<(mpsc::Receiver<TapEvent>, mpsc::Sender<u64>), IndexError> {
+        let (event_tx, event_rx) = mpsc::channel::<TapEvent>(10000);
+        let (ack_tx, ack_rx) = mpsc::channel::<u64>(10000);
+
+        let config = self.config.clone();
+        tokio::spawn(async move {
+            run_connection_loop(config, event_tx, ack_rx).await;
+        });
+
+        Ok((event_rx, ack_tx))
+    }
+}
+
+async fn run_connection_loop(
+    config: TapConfig,
+    event_tx: mpsc::Sender<TapEvent>,
+    ack_rx: mpsc::Receiver<u64>,
+) {
+    loop {
+        info!(url = %config.url, "connecting to tap");
+
+        match connect_async(config.url.as_str()).await {
+            Ok((ws_stream, _response)) => {
+                info!("connected to tap");
+
+                let (write, read) = ws_stream.split();
+
+                // Channel for reader -> writer communication (pongs, etc)
+                let (write_tx, write_rx) = mpsc::channel::<WriteCommand>(10000);
+
+                // Spawn writer task
+                let send_acks = config.send_acks;
+                let writer_handle = tokio::spawn(run_writer(write, write_rx, ack_rx, send_acks));
+
+                // Run reader in current task
+                let reader_result = run_reader(read, event_tx.clone(), write_tx, send_acks).await;
+
+                // Reader finished - abort writer and wait for it
+                writer_handle.abort();
+                let _ = writer_handle.await;
+
+                // ack_rx was moved into the writer task above, so this loop
+                // cannot re-enter with the ack channel intact; report the
+                // reader outcome and fall through to the break below.
+                match reader_result {
+                    ReaderResult::Closed => {
+                        info!("tap connection closed");
+                    }
+                    ReaderResult::Error(e) => {
+                        warn!(error = %e, "tap reader error");
+                    }
+                    ReaderResult::ChannelClosed => {
+                        error!("event channel closed, stopping tap consumer");
+                        return;
+                    }
+                }
+
+                // The ack channel is gone with the writer task; break and let
+                // the caller rebuild the consumer if it needs to reconnect
+                break;
+            }
+            Err(e) => {
+                error!(error = ?e, "failed to connect to tap");
+            }
+        }
+
+        // Reconnect after delay
+        info!(delay = ?config.reconnect_delay, "reconnecting to tap");
+        tokio::time::sleep(config.reconnect_delay).await;
+    }
+}
+
+enum ReaderResult {
+    Closed,
+    Error(String),
+    ChannelClosed,
+}
+
+async fn run_reader<S>(
+    mut read: S,
+    event_tx: mpsc::Sender<TapEvent>,
+    write_tx: mpsc::Sender<WriteCommand>,
+    send_acks: bool,
+) -> ReaderResult
+where
+    S: StreamExt<Item = Result<Message, tokio_tungstenite::tungstenite::Error>> + Unpin,
+{
+    while let Some(msg) = read.next().await {
+        match msg {
+            Ok(Message::Text(text)) => match serde_json::from_str::<TapEvent>(&text) {
+                Ok(event) => {
+                    let event_id = event.id();
+                    if event_tx.send(event).await.is_err() {
+                        return ReaderResult::ChannelClosed;
+                    }
+
+                    if !send_acks {
+                        debug!(id = event_id, "event received (fire-and-forget)");
+                    }
+                }
+                Err(e) => {
+                    warn!(error = ?e, text = %text, "failed to parse tap event");
+                }
+            },
+            Ok(Message::Ping(data)) => {
+                if write_tx.send(WriteCommand::Pong(data)).await.is_err() {
+                    return ReaderResult::Error("writer channel closed".into());
+                }
+            }
+            Ok(Message::Close(_)) => {
+                return ReaderResult::Closed;
+            }
+            Ok(_) => {
+                // Ignore binary, pong, etc.
+            }
+            Err(e) => {
+                return ReaderResult::Error(e.to_string());
+            }
+        }
+    }
+    ReaderResult::Closed
+}
+
+async fn run_writer<S>(
+    mut write: S,
+    mut write_rx: mpsc::Receiver<WriteCommand>,
+    mut ack_rx: mpsc::Receiver<u64>,
+    send_acks: bool,
+) where
+    S: SinkExt<Message> + Unpin,
+    S::Error: std::fmt::Display,
+{
+    loop {
+        tokio::select! {
+            biased;
+
+            // Handle pongs and other write commands from reader
+            cmd = write_rx.recv() => {
+                match cmd {
+                    Some(WriteCommand::Pong(data)) => {
+                        if let Err(e) = write.send(Message::Pong(data)).await {
+                            warn!(error = %e, "failed to send pong");
+                            return;
+                        }
+                    }
+                    Some(WriteCommand::Ack(id)) => {
+                        if send_acks {
+                            if let Err(e) = send_ack(&mut write, id).await {
+                                warn!(error = %e, id, "failed to send ack");
+                                return;
+                            }
+                        }
+                    }
+                    None => {
+                        // Reader closed the channel, we're done
+                        return;
+                    }
+                }
+            }
+
+            // Handle acks from the indexer
+            id = ack_rx.recv(), if send_acks => {
+                match id {
+                    Some(id) => {
+                        if let Err(e) = send_ack(&mut write, id).await {
+                            warn!(error = %e, id, "failed to send ack");
+                            return;
+                        }
+                    }
+                    None => {
+                        // Ack channel closed, indexer is done
+                        return;
+                    }
+                }
+            }
+        }
+    }
+}
+
+async fn send_ack<S>(write: &mut S, id: u64) -> Result<(), String>
+where
+    S: SinkExt<Message> + Unpin,
+    S::Error: std::fmt::Display,
+{
+    let ack = TapAck::new(id);
+    let json = serde_json::to_string(&ack).map_err(|e| e.to_string())?;
+    write
+        .send(Message::Text(json.into()))
+        .await
+        .map_err(|e| e.to_string())?;
+    trace!(id, "sent ack");
+    Ok(())
+}
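A usage sketch for the consumer; the endpoint is the example from TapConfig's doc comment, and the caller acks each event id only after handling it durably.

```rust
// Sketch: drain events and ack them; placeholder handling logic.
use url::Url;
use weaver_index::tap::{TapConfig, TapConsumer};

async fn drain() -> Result<(), weaver_index::IndexError> {
    let url = Url::parse("ws://localhost:2480/channel").expect("static url is valid");
    let consumer = TapConsumer::new(TapConfig::new(url));
    let (mut events, ack_tx) = consumer.connect().await?;

    while let Some(event) = events.recv().await {
        let id = event.id();
        // ... persist the event here ...
        let _ = ack_tx.send(id).await; // ack only after a successful write
    }
    Ok(())
}
```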