···
-# For documentation on how to configure this file,
-# see https://diesel.rs/guides/configuring-diesel-cli
-
-[print_schema]
-file = "src/schema.rs"
-custom_type_derives = ["diesel::query_builder::QueryId", "Clone", "std::fmt::Debug", "jacquard::IntoStatic"]
-patch_file = "src/schema.patch"
-
-
-[migrations_directory]
-dir = "/home/orual/Projects/weaver.sh/crates/weaver-index/migrations"
···
--- This file should undo anything in `up.sql`
-drop index if exists idx_oauth_auth_requests_did;
-
-drop index if exists idx_oauth_auth_requests_expires;
-
-drop index if exists idx_oauth_sessions_did;
-
-drop index if exists idx_oauth_sessions_did_session;
-
-drop table if exists oauth_auth_requests;
-
-drop table if exists oauth_sessions;
-
-drop table if exists profile_pronouns;
-
-drop table if exists profile_links;
-
-drop table if exists profile;
-
-drop table if exists emails;
-
-drop table if exists _jetstream;
-
-drop table if exists follows;
-
-drop table if exists public_keys;
-
-drop table if exists registrations;
···
-create table if not exists registrations (
-    id integer not null primary key autoincrement,
-    domain text not null unique,
-    did text not null,
-    secret text not null,
-    created timestamp not null default (datetime('now')),
-    registered text
-);
-
-create table if not exists public_keys (
-    id integer not null primary key autoincrement,
-    did text not null,
-    name text not null,
-    key_contents text not null,
-    rkey text not null,
-    created timestamp not null default (datetime('now')),
-    unique (did, name, key_contents)
-);
-
-create table if not exists follows (
-    user_did text not null,
-    subject_did text not null,
-    rkey text not null,
-    followed_at timestamp not null default (datetime('now')),
-    primary key (user_did, subject_did),
-    check (user_did <> subject_did)
-);
-
-create table if not exists _jetstream (
-    id integer not null primary key autoincrement,
-    last_time_us integer not null
-);
-
-create table if not exists emails (
-    id integer not null primary key autoincrement,
-    did text not null,
-    email text not null,
-    verified boolean not null default false,
-    verification_code text not null,
-    last_sent timestamp not null default (datetime('now')),
-    is_primary boolean not null default false,
-    created timestamp not null default (datetime('now')),
-    unique (did, email)
-);
-
-create table if not exists profile (
-    -- id
-    id integer not null primary key autoincrement,
-    did text not null,
-    -- data
-    avatar text,
-    description text not null,
-    include_bluesky boolean not null default false,
-    include_tangled boolean not null default false,
-    location text,
-    pinned_post text,
-    created_at timestamp default (datetime('now')),
-    -- constraints
-    unique (did)
-);
-
-create table if not exists profile_links (
-    -- id
-    id integer not null primary key autoincrement,
-    did text not null,
-    -- data
-    link text not null,
-    -- constraints
-    foreign key (did) references profile (did) on delete cascade
-);
-
-create table if not exists profile_pronouns (
-    -- id
-    id integer not null primary key autoincrement,
-    did text not null,
-    -- data
-    pronoun text not null,
-    -- constraints
-    foreign key (did) references profile (did) on delete cascade
-);
-
--- OAuth sessions table for jacquard ClientSessionData
-create table if not exists oauth_sessions (
-    id integer not null primary key autoincrement,
-    -- Extracted from ClientSessionData for indexing
-    did text not null,
-    session_id text not null,
-    -- Full ClientSessionData as JSON
-    session_data blob not null,
-    created_at timestamp not null default (datetime('now')),
-    updated_at timestamp not null default (datetime('now')),
-    unique (did, session_id)
-);
-
--- OAuth authorization requests table for jacquard AuthRequestData
-create table if not exists oauth_auth_requests (
-    id integer not null primary key autoincrement,
-    -- Extracted from AuthRequestData for indexing
-    state text not null unique,
-    -- Optional DID if known at auth request time
-    account_did text,
-    -- Full AuthRequestData as JSON
-    auth_req_data blob not null,
-    created_at timestamp not null default (datetime('now')),
-    expires_at timestamp not null default (datetime('now', '+10 minutes'))
-);
-
--- Index for quick session lookups
-create index if not exists idx_oauth_sessions_did_session on oauth_sessions(did, session_id);
-
--- Index for DID lookups
-create index if not exists idx_oauth_sessions_did on oauth_sessions(did);
-
--- Index for auth request cleanup
-create index if not exists idx_oauth_auth_requests_expires on oauth_auth_requests(expires_at);
-
--- Index for DID lookups in auth requests
-create index if not exists idx_oauth_auth_requests_did on oauth_auth_requests(account_did) where account_did is not null;
···
+-- Migration tracking table
+-- Tracks which migrations have been applied
+
+CREATE TABLE IF NOT EXISTS _migrations (
+    -- Migration filename (e.g., '001_raw_records.sql')
+    name String,
+
+    -- When this migration was applied
+    applied_at DateTime64(3) DEFAULT now64(3)
+)
+ENGINE = MergeTree()
+ORDER BY (name);
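The `_migrations` table above is read back later in this diff by the Rust migrator, which lists applied names before deciding what to run. A minimal illustrative query against it (not part of any migration file):

-- Which migrations have been applied, and when
SELECT name, applied_at
FROM _migrations
ORDER BY name;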
···
+-- Raw records from firehose/jetstream
+-- Core table for all AT Protocol records before denormalization
+--
+-- Uses ReplacingMergeTree to deduplicate on (collection, did, rkey) keeping latest indexed_at
+-- JSON column stores full record, extract fields only when needed for ORDER BY/WHERE/JOINs
+
+CREATE TABLE IF NOT EXISTS raw_records (
+    -- Decomposed AT URI components (at://did/collection/rkey)
+    did String,
+    collection LowCardinality(String),
+    rkey String,
+
+    -- Content identifier from the record
+    cid String,
+
+    -- Full record as native JSON (schema-flexible, queryable with record.field.subfield)
+    record JSON,
+
+    -- Operation: 'create', 'update', 'delete'
+    operation LowCardinality(String),
+
+    -- Firehose sequence number (metadata only, not for ordering - can jump on relay restart)
+    seq UInt64,
+
+    -- Event timestamp from firehose
+    event_time DateTime64(3),
+
+    -- When we indexed this record
+    indexed_at DateTime64(3) DEFAULT now64(3),
+
+    -- Materialized AT URI for convenience
+    uri String MATERIALIZED concat('at://', did, '/', collection, '/', rkey)
+)
+ENGINE = ReplacingMergeTree(indexed_at)
+ORDER BY (collection, did, rkey, indexed_at);
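A sketch of how `raw_records` is meant to be read, going by the comments above: dotted sub-paths pull individual fields out of the JSON column, and `indexed_at` orders versions of the same record. The collection NSID is just the example cited in `records.rs`, and the `text` field is assumed from the usual post shape; the query is illustrative, not part of the migration:

-- Recent post-like records, reading one field out of the JSON column
SELECT did, rkey, record.text AS text, indexed_at
FROM raw_records
WHERE collection = 'app.bsky.feed.post'
ORDER BY indexed_at DESC
LIMIT 10;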
···
+-- Identity events from firehose (#identity messages)
+-- Tracks handle changes, key rotation, etc.
+
+CREATE TABLE IF NOT EXISTS raw_identity_events (
+    -- The DID this identity event is about
+    did String,
+
+    -- Handle (may be empty if cleared)
+    handle String,
+
+    -- Sequence number from firehose
+    seq UInt64,
+
+    -- Event timestamp from firehose
+    event_time DateTime64(3),
+
+    -- When we indexed this event
+    indexed_at DateTime64(3) DEFAULT now64(3)
+)
+ENGINE = MergeTree()
+ORDER BY (did, indexed_at);
···
+-- Account events from firehose (#account messages)
+-- Tracks account status changes: active, deactivated, deleted, suspended, takendown
+
+CREATE TABLE IF NOT EXISTS raw_account_events (
+    -- The DID this account event is about
+    did String,
+
+    -- Whether the account is active
+    active UInt8,
+
+    -- Account status: 'active', 'deactivated', 'deleted', 'suspended', 'takendown'
+    status LowCardinality(String),
+
+    -- Sequence number from firehose
+    seq UInt64,
+
+    -- Event timestamp from firehose
+    event_time DateTime64(3),
+
+    -- When we indexed this event
+    indexed_at DateTime64(3) DEFAULT now64(3)
+)
+ENGINE = MergeTree()
+ORDER BY (did, indexed_at);
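Because `raw_account_events` is append-only and ordered by `(did, indexed_at)`, the current status of an account is just its most recently indexed row. An illustrative rollup, not part of the migration:

-- Most recent account status seen per DID
SELECT
    did,
    argMax(status, indexed_at) AS latest_status,
    max(indexed_at) AS as_of
FROM raw_account_events
GROUP BY did;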
···
+-- Dead-letter queue for malformed events
+-- Events that couldn't be parsed or processed land here for debugging
+
+CREATE TABLE IF NOT EXISTS raw_events_dlq (
+    -- Event type we attempted to parse (if known)
+    event_type LowCardinality(String),
+
+    -- Raw event data (JSON string of whatever we received)
+    raw_data String,
+
+    -- Error message describing why parsing failed
+    error_message String,
+
+    -- Sequence number from firehose (if available)
+    seq UInt64,
+
+    -- When we received this event
+    received_at DateTime64(3) DEFAULT now64(3)
+)
+ENGINE = MergeTree()
+ORDER BY (received_at);
···
+-- Firehose cursor persistence
+-- Tracks our position in the firehose stream for resumption after restart
+
+CREATE TABLE IF NOT EXISTS firehose_cursor (
+    -- Consumer identifier (allows multiple consumers with different cursors)
+    consumer_id String,
+
+    -- Last successfully processed sequence number
+    seq UInt64,
+
+    -- Timestamp of the last processed event
+    event_time DateTime64(3),
+
+    -- When we saved this cursor
+    updated_at DateTime64(3) DEFAULT now64(3)
+)
+ENGINE = ReplacingMergeTree(updated_at)
+ORDER BY (consumer_id);
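Since `firehose_cursor` is a ReplacingMergeTree keyed only by `consumer_id`, a reader can collapse superseded rows with FINAL to recover the last saved position. A minimal sketch; the consumer id value is made up, nothing in this diff defines one:

-- Last saved firehose position for a single consumer ('ingester' is hypothetical)
SELECT consumer_id, seq, event_time, updated_at
FROM firehose_cursor FINAL
WHERE consumer_id = 'ingester';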
-90
crates/weaver-index/src/api_error.rs
···11-use axum::{
22- Json,
33- extract::rejection::JsonRejection,
44- response::{IntoResponse, Response},
55-};
66-use hyper::StatusCode;
77-use miette::Diagnostic;
88-use serde::{Deserialize, Serialize};
99-use thiserror::Error;
1010-use tracing::error;
1111-1212-/// Custom error type for the API.
1313-/// The `#[from]` attribute allows for easy conversion from other error types.
1414-#[derive(Error, Debug, Diagnostic)]
1515-pub enum ApiError {
1616- /// Converts from an Axum built-in extractor error.
1717- #[diagnostic_source]
1818- #[error("Invalid payload.")]
1919- InvalidJsonBody(#[from] JsonRejection),
2020-2121- /// For errors that occur during manual validation.
2222- #[error("Invalid request: {0}")]
2323- #[diagnostic()]
2424- InvalidRequest(String),
-
-    /// Converts from `diesel::result::Error`.
2727- #[error("A database error has occurred.")]
2828- #[diagnostic_source]
2929- DatabaseError(#[from] diesel::result::Error),
3030-3131- #[error("A Weaver error has occurred.")]
3232- #[diagnostic(transparent)]
3333- WeaverError(#[from] weaver_common::error::WeaverError),
3434- /// Converts from any `anyhow::Error`.
3535- #[error("An internal server error has occurred.")]
3636- #[diagnostic(transparent)]
3737- InternalError(miette::Report),
3838-}
3939-4040-impl From<miette::Report> for ApiError {
4141- fn from(err: miette::Report) -> Self {
4242- ApiError::InternalError(err)
4343- }
4444-}
4545-4646-#[derive(Serialize, Deserialize)]
4747-pub struct ApiErrorResp {
4848- pub message: String,
4949-}
5050-5151-// The IntoResponse implementation for ApiError logs the error message.
5252-//
5353-// To avoid exposing implementation details to API consumers, we separate
5454-// the message that we log from the API response message.
5555-impl IntoResponse for ApiError {
5656- fn into_response(self) -> Response {
5757- // Log detailed error for telemetry.
5858- let error_to_log = match &self {
5959- ApiError::InvalidJsonBody(err) => match err {
6060- JsonRejection::JsonDataError(e) => e.body_text(),
6161- JsonRejection::JsonSyntaxError(e) => e.body_text(),
6262- JsonRejection::MissingJsonContentType(_) => {
6363- "Missing `Content-Type: application/json` header".to_string()
6464- }
6565- JsonRejection::BytesRejection(_) => "Failed to buffer request body".to_string(),
6666- _ => "Unknown error".to_string(),
6767- },
6868- ApiError::InvalidRequest(_) => format!("{}", self),
6969- ApiError::WeaverError(err) => format!("{}", err),
7070- ApiError::DatabaseError(err) => format!("{}", err),
7171- ApiError::InternalError(err) => format!("{}", err),
7272- };
7373- error!("{}", error_to_log);
7474-7575- // Create a generic response to hide specific implementation details.
7676- let resp = ApiErrorResp {
7777- message: self.to_string(),
7878- };
7979-8080- // Determine the appropriate status code.
8181- let status = match self {
8282- ApiError::InvalidJsonBody(_) | ApiError::InvalidRequest(_) => StatusCode::BAD_REQUEST,
8383- ApiError::WeaverError(_) | ApiError::DatabaseError(_) | ApiError::InternalError(_) => {
8484- StatusCode::INTERNAL_SERVER_ERROR
8585- }
8686- };
8787-8888- (status, Json(resp)).into_response()
8989- }
9090-}
+483
crates/weaver-index/src/bin/storage_benchmark.rs
···11+use bytes::Bytes;
22+use chrono::{DateTime, Utc};
33+use clap::Parser;
44+use clickhouse::Row;
55+use n0_future::StreamExt;
66+use smol_str::SmolStr;
77+use std::time::{Duration, Instant};
88+use tracing::{info, warn};
99+use weaver_index::clickhouse::Client;
1010+use weaver_index::config::{ClickHouseConfig, FirehoseConfig};
1111+use weaver_index::firehose::{FirehoseConsumer, SubscribeReposMessage, extract_records};
1212+1313+// =============================================================================
1414+// Benchmark-specific schema (not part of production)
1515+// =============================================================================
1616+1717+const TABLE_JSON: &str = "raw_records_json";
1818+const TABLE_CBOR: &str = "raw_records_cbor";
1919+2020+/// Row type for JSON benchmark records
2121+#[derive(Debug, Clone, Row, serde::Serialize, serde::Deserialize)]
2222+struct RawRecordJson {
2323+ did: SmolStr,
2424+ collection: SmolStr,
2525+ rkey: SmolStr,
2626+ cid: String,
2727+ record: String,
2828+ operation: SmolStr,
2929+ seq: u64,
3030+ #[serde(with = "clickhouse::serde::chrono::datetime64::millis")]
3131+ event_time: DateTime<Utc>,
3232+}
3333+3434+/// Row type for CBOR benchmark records
3535+#[derive(Debug, Clone, Row, serde::Serialize, serde::Deserialize)]
3636+struct RawRecordCbor {
3737+ did: SmolStr,
3838+ collection: SmolStr,
3939+ rkey: SmolStr,
4040+ cid: String,
4141+ #[serde(with = "jacquard::serde_bytes_helper")]
4242+ record: Bytes,
4343+ operation: SmolStr,
4444+ seq: u64,
4545+ #[serde(with = "clickhouse::serde::chrono::datetime64::millis")]
4646+ event_time: DateTime<Utc>,
4747+}
4848+4949+async fn create_benchmark_tables(client: &Client) -> miette::Result<()> {
5050+ client
5151+ .execute(&format!(
5252+ r#"
5353+ CREATE TABLE IF NOT EXISTS {} (
5454+ did String,
5555+ collection LowCardinality(String),
5656+ rkey String,
5757+ cid String,
5858+ record JSON,
5959+ operation LowCardinality(String),
6060+ seq UInt64,
6161+ event_time DateTime64(3),
6262+ indexed_at DateTime64(3) DEFAULT now64(3)
6363+ )
6464+ ENGINE = MergeTree()
6565+ ORDER BY (collection, did, rkey, indexed_at)
6666+ "#,
6767+ TABLE_JSON
6868+ ))
6969+ .await?;
7070+7171+ client
7272+ .execute(&format!(
7373+ r#"
7474+ CREATE TABLE IF NOT EXISTS {} (
7575+ did String,
7676+ collection LowCardinality(String),
7777+ rkey String,
7878+ cid String,
7979+ record String,
8080+ operation LowCardinality(String),
8181+ seq UInt64,
8282+ event_time DateTime64(3),
8383+ indexed_at DateTime64(3) DEFAULT now64(3)
8484+ )
8585+ ENGINE = MergeTree()
8686+ ORDER BY (collection, did, rkey, indexed_at)
8787+ "#,
8888+ TABLE_CBOR
8989+ ))
9090+ .await?;
9191+9292+ Ok(())
9393+}
9494+9595+async fn drop_benchmark_tables(client: &Client) -> miette::Result<()> {
9696+ client
9797+ .execute(&format!("DROP TABLE IF EXISTS {}", TABLE_JSON))
9898+ .await?;
9999+ client
100100+ .execute(&format!("DROP TABLE IF EXISTS {}", TABLE_CBOR))
101101+ .await?;
102102+ Ok(())
103103+}
104104+105105+// =============================================================================
106106+// Benchmark logic
107107+// =============================================================================
108108+109109+/// Tracks firehose lag to detect if we're falling behind
110110+#[derive(Default)]
111111+struct LagStats {
112112+ min_ms: Option<i64>,
113113+ max_ms: Option<i64>,
114114+ current_ms: i64,
115115+ sample_count: u64,
116116+}
117117+118118+impl LagStats {
119119+ fn update(&mut self, event_time_ms: i64) {
120120+ let now_ms = Utc::now().timestamp_millis();
121121+ let lag = now_ms - event_time_ms;
122122+123123+ self.current_ms = lag;
124124+ self.sample_count += 1;
125125+126126+ self.min_ms = Some(self.min_ms.map_or(lag, |m| m.min(lag)));
127127+ self.max_ms = Some(self.max_ms.map_or(lag, |m| m.max(lag)));
128128+ }
129129+130130+ fn reset_window(&mut self) {
131131+ // Keep current but reset min/max for next reporting window
132132+ self.min_ms = Some(self.current_ms);
133133+ self.max_ms = Some(self.current_ms);
134134+ }
135135+}
136136+137137+#[derive(Parser)]
138138+#[command(name = "storage-benchmark")]
139139+#[command(about = "Benchmark CBOR vs JSON storage in ClickHouse")]
140140+struct Args {
141141+ /// Duration to run the benchmark in minutes
142142+ #[arg(short, long, default_value = "60")]
143143+ duration_minutes: u64,
144144+145145+ /// Batch size for ClickHouse inserts
146146+ #[arg(short, long, default_value = "1000")]
147147+ batch_size: usize,
148148+149149+ /// Report interval in seconds
150150+ #[arg(short, long, default_value = "30")]
151151+ report_interval_secs: u64,
152152+153153+ /// Drop and recreate tables before starting
154154+ #[arg(long)]
155155+ reset_tables: bool,
156156+}
157157+158158+#[tokio::main]
159159+async fn main() -> miette::Result<()> {
160160+ dotenvy::dotenv().ok();
161161+162162+ tracing_subscriber::fmt()
163163+ .with_env_filter(
164164+ tracing_subscriber::EnvFilter::from_default_env()
165165+ .add_directive("weaver_index=info".parse().unwrap())
166166+ .add_directive("storage_benchmark=info".parse().unwrap()),
167167+ )
168168+ .init();
169169+170170+ let args = Args::parse();
171171+172172+ info!("Storage Benchmark: CBOR vs JSON in ClickHouse");
173173+ info!("Duration: {} minutes", args.duration_minutes);
174174+ info!("Batch size: {}", args.batch_size);
175175+176176+ // Load configs
177177+ let ch_config = ClickHouseConfig::from_env()?;
178178+ let firehose_config = FirehoseConfig::from_env()?;
179179+180180+ info!(
181181+ "Connecting to ClickHouse at {} (database: {})",
182182+ ch_config.url, ch_config.database
183183+ );
184184+ let client = Client::new(&ch_config)?;
185185+186186+ // Reset tables if requested
187187+ if args.reset_tables {
188188+ info!("Dropping existing benchmark tables...");
189189+ drop_benchmark_tables(&client).await?;
190190+ }
191191+192192+ // Create tables
193193+ info!("Creating benchmark tables...");
194194+ create_benchmark_tables(&client).await?;
195195+196196+ // Create inserters
197197+ let mut json_inserter = client.inserter::<RawRecordJson>(TABLE_JSON);
198198+ let mut cbor_inserter = client.inserter::<RawRecordCbor>(TABLE_CBOR);
199199+200200+ // Connect to firehose
201201+ info!("Connecting to firehose at {}", firehose_config.relay_url);
202202+ let consumer = FirehoseConsumer::new(firehose_config);
203203+ let mut stream = consumer.connect().await?;
204204+205205+ // Tracking
206206+ let start = Instant::now();
207207+ let duration = Duration::from_secs(args.duration_minutes * 60);
208208+ let report_interval = Duration::from_secs(args.report_interval_secs);
209209+ let mut last_report = Instant::now();
210210+ let mut total_records = 0u64;
211211+ let mut total_commits = 0u64;
212212+ let mut errors = 0u64;
213213+ let mut lag_stats = LagStats::default();
214214+215215+ info!("Starting benchmark...");
216216+217217+ while start.elapsed() < duration {
218218+ // Check for report interval
219219+ if last_report.elapsed() >= report_interval {
220220+ // Flush inserters so size measurements are accurate
221221+ match json_inserter.commit().await {
222222+ Ok(stats) => info!(
223223+ " JSON flush: {} rows, {} transactions",
224224+ stats.rows, stats.transactions
225225+ ),
226226+ Err(e) => warn!("Failed to flush JSON inserter: {}", e),
227227+ }
228228+ match cbor_inserter.commit().await {
229229+ Ok(stats) => info!(
230230+ " CBOR flush: {} rows, {} transactions",
231231+ stats.rows, stats.transactions
232232+ ),
233233+ Err(e) => warn!("Failed to flush CBOR inserter: {}", e),
234234+ }
235235+236236+ report_progress(
237237+ &client,
238238+ total_records,
239239+ total_commits,
240240+ errors,
241241+ start.elapsed(),
242242+ &lag_stats,
243243+ )
244244+ .await;
245245+ lag_stats.reset_window();
246246+ last_report = Instant::now();
247247+ }
248248+249249+ // Get next message with timeout
250250+ let msg = tokio::time::timeout(Duration::from_secs(30), stream.next()).await;
251251+252252+ let msg = match msg {
253253+ Ok(Some(Ok(msg))) => msg,
254254+ Ok(Some(Err(e))) => {
255255+ warn!("Stream error: {}", e);
256256+ errors += 1;
257257+ continue;
258258+ }
259259+ Ok(None) => {
260260+ warn!("Stream ended unexpectedly");
261261+ break;
262262+ }
263263+ Err(_) => {
264264+ warn!("Timeout waiting for message");
265265+ continue;
266266+ }
267267+ };
268268+269269+ // Only process commits
270270+ let commit = match msg {
271271+ SubscribeReposMessage::Commit(c) => c,
272272+ _ => continue,
273273+ };
274274+275275+ total_commits += 1;
276276+277277+ // Track lag
278278+ lag_stats.update(commit.time.as_ref().timestamp_millis());
279279+280280+ // Extract records from the commit
281281+ let records = match extract_records(&commit).await {
282282+ Ok(r) => r,
283283+ Err(e) => {
284284+ warn!("Record extraction error: {}", e);
285285+ errors += 1;
286286+ continue;
287287+ }
288288+ };
289289+290290+ // Insert to both tables
291291+ for record in records {
292292+ // Skip deletes (no record data)
293293+ let Some(cbor_bytes) = &record.cbor_bytes else {
294294+ continue;
295295+ };
296296+297297+ // JSON table: decode CBOR to JSON
298298+ let json_str = match record.to_json() {
299299+ Ok(Some(j)) => j,
300300+ Ok(None) => continue,
301301+ Err(e) => {
302302+ warn!("JSON encode error: {}", e);
303303+ errors += 1;
304304+ continue;
305305+ }
306306+ };
307307+308308+ let event_time = DateTime::from_timestamp_millis(record.event_time_ms).unwrap();
309309+ // Insert JSON record
310310+ json_inserter
311311+ .write(&RawRecordJson {
312312+ did: record.did.clone(),
313313+ collection: record.collection.clone(),
314314+ rkey: record.rkey.clone(),
315315+ cid: record.cid.clone(),
316316+ record: json_str,
317317+ operation: record.operation.clone(),
318318+ seq: record.seq as u64,
319319+ event_time: event_time.clone(),
320320+ })
321321+ .await
322322+ .map_err(|e| weaver_index::error::ClickHouseError::Insert {
323323+ message: "json insert failed".into(),
324324+ source: e,
325325+ })?;
326326+327327+ // Insert CBOR record (raw bytes, no base64)
328328+ cbor_inserter
329329+ .write(&RawRecordCbor {
330330+ did: record.did,
331331+ collection: record.collection,
332332+ rkey: record.rkey,
333333+ cid: record.cid,
334334+ record: cbor_bytes.clone(),
335335+ operation: record.operation,
336336+ seq: record.seq as u64,
337337+ event_time,
338338+ })
339339+ .await
340340+ .map_err(|e| weaver_index::error::ClickHouseError::Insert {
341341+ message: "cbor insert failed".into(),
342342+ source: e,
343343+ })?;
344344+345345+ match json_inserter.commit().await {
346346+ Ok(_) => {}
347347+ Err(e) => warn!("Failed to flush JSON inserter: {}", e),
348348+ }
349349+ match cbor_inserter.commit().await {
350350+ Ok(_) => {}
351351+ Err(e) => warn!("Failed to flush CBOR inserter: {}", e),
352352+ }
353353+ total_records += 1;
354354+ }
355355+ }
356356+357357+ // Final flush
358358+ info!("Flushing remaining records...");
359359+ json_inserter
360360+ .end()
361361+ .await
362362+ .map_err(|e| weaver_index::error::ClickHouseError::Insert {
363363+ message: "json flush failed".into(),
364364+ source: e,
365365+ })?;
366366+ cbor_inserter
367367+ .end()
368368+ .await
369369+ .map_err(|e| weaver_index::error::ClickHouseError::Insert {
370370+ message: "cbor flush failed".into(),
371371+ source: e,
372372+ })?;
373373+374374+ // Final report
375375+ info!("\n========== FINAL RESULTS ==========");
376376+ report_progress(
377377+ &client,
378378+ total_records,
379379+ total_commits,
380380+ errors,
381381+ start.elapsed(),
382382+ &lag_stats,
383383+ )
384384+ .await;
385385+386386+ // Detailed size comparison
387387+ info!("\nStorage Comparison:");
388388+ let sizes = client.table_sizes(&[TABLE_JSON, TABLE_CBOR]).await?;
389389+390390+ for size in &sizes {
391391+ info!(
392392+ " {}: {} compressed, {} uncompressed, {:.2}x ratio, {} rows",
393393+ size.table,
394394+ size.compressed_human(),
395395+ size.uncompressed_human(),
396396+ size.compression_ratio(),
397397+ size.row_count
398398+ );
399399+ }
400400+401401+ if sizes.len() == 2 {
402402+ let json_size = sizes.iter().find(|s| s.table == TABLE_JSON);
403403+ let cbor_size = sizes.iter().find(|s| s.table == TABLE_CBOR);
404404+405405+ if let (Some(json), Some(cbor)) = (json_size, cbor_size) {
406406+ let compressed_diff = json.compressed_bytes as f64 / cbor.compressed_bytes as f64;
407407+ let uncompressed_diff = json.uncompressed_bytes as f64 / cbor.uncompressed_bytes as f64;
408408+409409+ info!("\nJSON vs CBOR:");
410410+ info!(
411411+ " Compressed: JSON is {:.2}x the size of CBOR",
412412+ compressed_diff
413413+ );
414414+ info!(
415415+ " Uncompressed: JSON is {:.2}x the size of CBOR",
416416+ uncompressed_diff
417417+ );
418418+419419+ if compressed_diff < 1.0 {
420420+ info!(
421421+ " Winner (compressed): JSON ({:.1}% smaller)",
422422+ (1.0 - compressed_diff) * 100.0
423423+ );
424424+ } else {
425425+ info!(
426426+ " Winner (compressed): CBOR ({:.1}% smaller)",
427427+ (1.0 - 1.0 / compressed_diff) * 100.0
428428+ );
429429+ }
430430+ }
431431+ }
432432+433433+ info!("\nBenchmark complete!");
434434+435435+ Ok(())
436436+}
437437+438438+async fn report_progress(
439439+ client: &Client,
440440+ total_records: u64,
441441+ total_commits: u64,
442442+ errors: u64,
443443+ elapsed: Duration,
444444+ lag: &LagStats,
445445+) {
446446+ let records_per_sec = total_records as f64 / elapsed.as_secs_f64();
447447+448448+ info!(
449449+ "Progress: {} records from {} commits in {:.1}s ({:.1}/s), {} errors",
450450+ total_records,
451451+ total_commits,
452452+ elapsed.as_secs_f64(),
453453+ records_per_sec,
454454+ errors
455455+ );
456456+457457+ // Lag info - critical for detecting if we're falling behind
458458+ if lag.sample_count > 0 {
459459+ info!(
460460+ " Lag: current={:.1}s, min={:.1}s, max={:.1}s (window)",
461461+ lag.current_ms as f64 / 1000.0,
462462+ lag.min_ms.unwrap_or(0) as f64 / 1000.0,
463463+ lag.max_ms.unwrap_or(0) as f64 / 1000.0,
464464+ );
465465+ }
466466+467467+ // Try to get current sizes
468468+ match client.table_sizes(&[TABLE_JSON, TABLE_CBOR]).await {
469469+ Ok(sizes) => {
470470+ for size in sizes {
471471+ info!(
472472+ " {}: {} compressed ({} rows)",
473473+ size.table,
474474+ size.compressed_human(),
475475+ size.row_count
476476+ );
477477+ }
478478+ }
479479+ Err(e) => {
480480+ warn!("Failed to query table sizes: {}", e);
481481+ }
482482+ }
483483+}
+98
crates/weaver-index/src/bin/weaver_indexer.rs
···11+use clap::{Parser, Subcommand};
22+use tracing::info;
33+use weaver_index::clickhouse::{Client, Migrator};
44+use weaver_index::config::ClickHouseConfig;
55+66+#[derive(Parser)]
77+#[command(name = "weaver-indexer")]
88+#[command(about = "Weaver index service - firehose ingestion and query serving")]
99+struct Args {
1010+ #[command(subcommand)]
1111+ command: Command,
1212+}
1313+1414+#[derive(Subcommand)]
1515+enum Command {
1616+ /// Run database migrations
1717+ Migrate {
1818+ /// Show what would be run without executing
1919+ #[arg(long)]
2020+ dry_run: bool,
2121+ },
2222+2323+ /// Check database connectivity
2424+ Health,
2525+2626+ /// Start the indexer service (not yet implemented)
2727+ Run,
2828+}
2929+3030+#[tokio::main]
3131+async fn main() -> miette::Result<()> {
3232+ dotenvy::dotenv().ok();
3333+3434+ tracing_subscriber::fmt()
3535+ .with_env_filter(
3636+ tracing_subscriber::EnvFilter::from_default_env()
3737+ .add_directive("weaver_index=info".parse().unwrap())
3838+ .add_directive("weaver_indexer=info".parse().unwrap()),
3939+ )
4040+ .init();
4141+4242+ let args = Args::parse();
4343+4444+ match args.command {
4545+ Command::Migrate { dry_run } => run_migrate(dry_run).await,
4646+ Command::Health => run_health().await,
4747+ Command::Run => run_indexer().await,
4848+ }
4949+}
5050+5151+async fn run_migrate(dry_run: bool) -> miette::Result<()> {
5252+ let config = ClickHouseConfig::from_env()?;
5353+ info!(
5454+ "Connecting to ClickHouse at {} (database: {})",
5555+ config.url, config.database
5656+ );
5757+5858+ let client = Client::new(&config)?;
5959+ let migrator = Migrator::new(&client);
6060+6161+ if dry_run {
6262+ let pending = migrator.pending().await?;
6363+ if pending.is_empty() {
6464+ info!("No pending migrations");
6565+ } else {
6666+ info!("Pending migrations:");
6767+ for name in pending {
6868+ info!(" - {}", name);
6969+ }
7070+ }
7171+ } else {
7272+ let result = migrator.run().await?;
7373+ info!("{}", result);
7474+ }
7575+7676+ Ok(())
7777+}
7878+7979+async fn run_health() -> miette::Result<()> {
8080+ let config = ClickHouseConfig::from_env()?;
8181+ info!(
8282+ "Connecting to ClickHouse at {} (database: {})",
8383+ config.url, config.database
8484+ );
8585+8686+ let client = Client::new(&config)?;
8787+8888+ // Simple connectivity check
8989+ client.execute("SELECT 1").await?;
9090+ info!("ClickHouse connection OK");
9191+9292+ Ok(())
9393+}
9494+9595+async fn run_indexer() -> miette::Result<()> {
9696+ info!("Indexer not yet implemented");
9797+ Ok(())
9898+}
+9
crates/weaver-index/src/clickhouse.rs
···11+mod client;
22+mod migrations;
33+mod schema;
44+55+pub use client::{Client, TableSize};
66+pub use migrations::{MigrationResult, Migrator};
77+pub use schema::{
88+ FirehoseCursor, RawAccountEvent, RawEventDlq, RawIdentityEvent, RawRecord, Tables,
99+};
+119
crates/weaver-index/src/clickhouse/client.rs
···11+use crate::config::ClickHouseConfig;
22+use crate::error::{ClickHouseError, IndexError};
33+use clickhouse::Row;
44+use clickhouse::inserter::Inserter;
55+66+/// ClickHouse client wrapper with connection pooling and batched inserts
77+pub struct Client {
88+ inner: clickhouse::Client,
99+}
1010+1111+impl Client {
1212+ /// Create a new client from configuration
1313+ pub fn new(config: &ClickHouseConfig) -> Result<Self, IndexError> {
1414+ let inner = clickhouse::Client::default()
1515+ .with_url(config.url.as_str())
1616+ .with_database(&config.database)
1717+ .with_user(&config.user)
1818+ .with_password(&config.password);
1919+2020+ Ok(Self { inner })
2121+ }
2222+2323+ /// Execute a DDL query (CREATE TABLE, etc.)
2424+ pub async fn execute(&self, query: &str) -> Result<(), IndexError> {
2525+ self.inner
2626+ .query(query)
2727+ .execute()
2828+ .await
2929+ .map_err(|e| ClickHouseError::Query {
3030+ message: "DDL execution failed".into(),
3131+ source: e,
3232+ })?;
3333+ Ok(())
3434+ }
3535+3636+ /// Create a batched inserter for a table
3737+ ///
3838+ /// The inserter accumulates rows and flushes them in batches for efficiency.
3939+ pub fn inserter<T: Row>(&self, table: &str) -> Inserter<T> {
4040+ self.inner
4141+ .inserter(table)
4242+ .with_max_rows(1000)
4343+ .with_period_bias(0.1)
4444+ .with_max_bytes(1_048_576)
4545+ }
4646+4747+ /// Query table sizes from system.parts
4848+ ///
+    /// Returns one `TableSize` per table: compressed bytes, uncompressed bytes, and row count
5050+ pub async fn table_sizes(&self, tables: &[&str]) -> Result<Vec<TableSize>, IndexError> {
5151+ let table_list = tables
5252+ .iter()
5353+ .map(|t| format!("'{}'", t))
5454+ .collect::<Vec<_>>()
5555+ .join(", ");
5656+5757+ let query = format!(
5858+ r#"
5959+ SELECT
6060+ table,
6161+ sum(bytes_on_disk) as compressed_bytes,
6262+ sum(data_uncompressed_bytes) as uncompressed_bytes,
6363+ sum(rows) as row_count
6464+ FROM system.parts
6565+ WHERE table IN ({})
6666+ AND active
6767+ GROUP BY table
6868+ "#,
6969+ table_list
7070+ );
7171+7272+ let rows = self
7373+ .inner
7474+ .query(&query)
7575+ .fetch_all::<TableSize>()
7676+ .await
7777+ .map_err(|e| ClickHouseError::Query {
7878+ message: "failed to query table sizes".into(),
7979+ source: e,
8080+ })?;
8181+8282+ Ok(rows)
8383+ }
8484+8585+ /// Get reference to inner client for advanced operations
8686+ pub fn inner(&self) -> &clickhouse::Client {
8787+ &self.inner
8888+ }
8989+}
9090+9191+/// Table size statistics from system.parts
9292+#[derive(Debug, Clone, Row, serde::Deserialize)]
9393+pub struct TableSize {
9494+ pub table: String,
9595+ pub compressed_bytes: u64,
9696+ pub uncompressed_bytes: u64,
9797+ pub row_count: u64,
9898+}
9999+100100+impl TableSize {
101101+ /// Format compressed size as human-readable string
102102+ pub fn compressed_human(&self) -> String {
103103+ humansize::format_size(self.compressed_bytes, humansize::BINARY)
104104+ }
105105+106106+ /// Format uncompressed size as human-readable string
107107+ pub fn uncompressed_human(&self) -> String {
108108+ humansize::format_size(self.uncompressed_bytes, humansize::BINARY)
109109+ }
110110+111111+ /// Compression ratio (uncompressed / compressed)
112112+ pub fn compression_ratio(&self) -> f64 {
113113+ if self.compressed_bytes == 0 {
114114+ 0.0
115115+ } else {
116116+ self.uncompressed_bytes as f64 / self.compressed_bytes as f64
117117+ }
118118+ }
119119+}
+151
crates/weaver-index/src/clickhouse/migrations.rs
···11+use crate::error::{ClickHouseError, IndexError};
22+use tracing::info;
33+44+use super::Client;
55+66+/// Embedded migrations - compiled into the binary
77+const MIGRATIONS: &[(&str, &str)] = &[
88+ (
99+ "000_migrations.sql",
1010+ include_str!("../../migrations/clickhouse/000_migrations.sql"),
1111+ ),
1212+ (
1313+ "001_raw_records.sql",
1414+ include_str!("../../migrations/clickhouse/001_raw_records.sql"),
1515+ ),
1616+ (
1717+ "002_identity_events.sql",
1818+ include_str!("../../migrations/clickhouse/002_identity_events.sql"),
1919+ ),
2020+ (
2121+ "003_account_events.sql",
2222+ include_str!("../../migrations/clickhouse/003_account_events.sql"),
2323+ ),
2424+ (
2525+ "004_events_dlq.sql",
2626+ include_str!("../../migrations/clickhouse/004_events_dlq.sql"),
2727+ ),
2828+ (
2929+ "005_firehose_cursor.sql",
3030+ include_str!("../../migrations/clickhouse/005_firehose_cursor.sql"),
3131+ ),
3232+];
3333+3434+/// Migration runner for ClickHouse
3535+pub struct Migrator<'a> {
3636+ client: &'a Client,
3737+}
3838+3939+impl<'a> Migrator<'a> {
4040+ pub fn new(client: &'a Client) -> Self {
4141+ Self { client }
4242+ }
4343+4444+ /// Run all pending migrations
4545+ pub async fn run(&self) -> Result<MigrationResult, IndexError> {
4646+ // First, ensure the migrations table exists (bootstrap)
4747+ self.ensure_migrations_table().await?;
4848+4949+ // Get list of already applied migrations
5050+ let applied = self.get_applied_migrations().await?;
5151+5252+ let mut applied_count = 0;
5353+ let mut skipped_count = 0;
5454+5555+ for (name, sql) in MIGRATIONS {
5656+ // Skip the bootstrap migration after first run
5757+ if *name == "000_migrations.sql" && applied.contains(&"000_migrations.sql".to_string())
5858+ {
5959+ skipped_count += 1;
6060+ continue;
6161+ }
6262+6363+ if applied.contains(&name.to_string()) {
6464+ info!(migration = %name, "already applied, skipping");
6565+ skipped_count += 1;
6666+ continue;
6767+ }
6868+6969+ info!(migration = %name, "applying migration");
7070+ self.client.execute(sql).await?;
7171+ self.record_migration(name).await?;
7272+ applied_count += 1;
7373+ }
7474+7575+ Ok(MigrationResult {
7676+ applied: applied_count,
7777+ skipped: skipped_count,
7878+ })
7979+ }
8080+8181+ /// Check which migrations would be applied without running them
8282+ pub async fn pending(&self) -> Result<Vec<String>, IndexError> {
8383+ // Try to get applied migrations, but if table doesn't exist, all are pending
8484+ let applied = match self.get_applied_migrations().await {
8585+ Ok(list) => list,
8686+ Err(_) => vec![],
8787+ };
8888+8989+ let pending: Vec<String> = MIGRATIONS
9090+ .iter()
9191+ .filter(|(name, _)| !applied.contains(&name.to_string()))
9292+ .map(|(name, _)| name.to_string())
9393+ .collect();
9494+9595+ Ok(pending)
9696+ }
9797+9898+ async fn ensure_migrations_table(&self) -> Result<(), IndexError> {
9999+ // Run the bootstrap migration directly
100100+ let (_, sql) = MIGRATIONS
101101+ .iter()
102102+ .find(|(name, _)| *name == "000_migrations.sql")
103103+ .expect("bootstrap migration must exist");
104104+105105+ self.client.execute(sql).await
106106+ }
107107+108108+ async fn get_applied_migrations(&self) -> Result<Vec<String>, IndexError> {
109109+ let rows: Vec<MigrationRow> = self
110110+ .client
111111+ .inner()
112112+ .query("SELECT name FROM _migrations ORDER BY name")
113113+ .fetch_all()
114114+ .await
115115+ .map_err(|e| ClickHouseError::Query {
116116+ message: "failed to fetch applied migrations".into(),
117117+ source: e,
118118+ })?;
119119+120120+ Ok(rows.into_iter().map(|r| r.name).collect())
121121+ }
122122+123123+ async fn record_migration(&self, name: &str) -> Result<(), IndexError> {
124124+ let query = format!("INSERT INTO _migrations (name) VALUES ('{}')", name);
125125+ self.client.execute(&query).await
126126+ }
127127+}
128128+129129+#[derive(Debug, Clone, clickhouse::Row, serde::Deserialize)]
130130+struct MigrationRow {
131131+ name: String,
132132+}
133133+134134+/// Result of running migrations
135135+#[derive(Debug, Clone)]
136136+pub struct MigrationResult {
137137+ /// Number of migrations applied
138138+ pub applied: usize,
139139+ /// Number of migrations skipped (already applied)
140140+ pub skipped: usize,
141141+}
142142+143143+impl std::fmt::Display for MigrationResult {
144144+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145145+ write!(
146146+ f,
147147+ "{} migrations applied, {} skipped",
148148+ self.applied, self.skipped
149149+ )
150150+ }
151151+}
···11-use diesel::prelude::*;
22-use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations};
33-pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
44-use diesel_async::RunQueryDsl;
55-use diesel_async::pooled_connection::AsyncDieselConnectionManager;
66-use diesel_async::pooled_connection::deadpool::Pool;
77-use diesel_async::sync_connection_wrapper::SyncConnectionWrapper;
88-99-#[derive(Clone)]
1010-pub struct Db {
1111- pub pool: Pool<SyncConnectionWrapper<SqliteConnection>>,
1212-}
1313-1414-impl Db {
-    /// Yes, this function can and WILL panic if it can't create the connection pool
1616- /// for some reason. We just want to bail because the appview
1717- /// does not work without a database.
1818- pub async fn new(db_path: Option<String>) -> Self {
1919- let database_url = if let Some(db_path) = db_path {
2020- db_path
2121- } else {
2222- std::env::var("DATABASE_URL").expect("DATABASE_URL must be set")
2323- };
2424- let config = AsyncDieselConnectionManager::<SyncConnectionWrapper<SqliteConnection>>::new(
2525- database_url,
2626- );
2727- let pool = Pool::builder(config)
2828- .build()
2929- .expect("Failed to create pool");
3030- Self { pool }
3131- }
3232-}
3333-3434-pub fn run_migrations(
3535- db_path: Option<String>,
3636-) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
3737- let database_url = if let Some(db_path) = db_path {
3838- db_path
3939- } else {
4040- std::env::var("DATABASE_URL").expect("DATABASE_URL must be set")
4141- };
4242- let mut connection = SqliteConnection::establish(&database_url)
4343- .unwrap_or_else(|_| panic!("Error connecting to {}", database_url));
4444- // This will run the necessary migrations.
4545- //
4646- // See the documentation for `MigrationHarness` for
4747- // all available methods.
4848- println!("Attempting migrations...");
4949- let result = connection.run_pending_migrations(MIGRATIONS);
5050- println!("{:?}", result);
5151- if result.is_err() {
5252- println!("Failed to run migrations");
5353- return result.map(|_| ());
5454- }
5555- println!("Migrations Applied:");
5656- let applied_migrations = connection.applied_migrations()?;
5757- for migration in applied_migrations {
5858- println!(" * {}", migration);
5959- }
6060- Ok(())
6161-}
6262-6363-pub struct Runtime;
···11+mod consumer;
22+mod records;
33+44+pub use consumer::{
55+ FirehoseConsumer, MessageStream, SubscribeReposMessage, Commit, Identity, Account, Sync,
66+};
77+pub use records::{extract_records, ExtractedRecord};
+48
crates/weaver-index/src/firehose/consumer.rs
···11+use crate::config::FirehoseConfig;
22+use crate::error::{FirehoseError, IndexError};
33+use jacquard_common::stream::StreamError;
44+use jacquard_common::xrpc::subscription::{SubscriptionClient, TungsteniteSubscriptionClient};
55+use n0_future::stream::Boxed;
66+77+// Re-export the message types from weaver_api for convenience
88+pub use weaver_api::com_atproto::sync::subscribe_repos::{
99+ Account, Commit, Identity, SubscribeRepos, SubscribeReposMessage, Sync,
1010+};
1111+1212+/// Typed firehose message stream
1313+pub type MessageStream = Boxed<Result<SubscribeReposMessage<'static>, StreamError>>;
1414+1515+/// Firehose consumer that connects to a relay and yields typed events
1616+pub struct FirehoseConsumer {
1717+ config: FirehoseConfig,
1818+}
1919+2020+impl FirehoseConsumer {
2121+ pub fn new(config: FirehoseConfig) -> Self {
2222+ Self { config }
2323+ }
2424+2525+ /// Connect to the firehose and return a typed message stream
2626+ ///
2727+ /// Messages are automatically decoded and converted to owned ('static) types.
2828+ pub async fn connect(&self) -> Result<MessageStream, IndexError> {
2929+ let client = TungsteniteSubscriptionClient::from_base_uri(self.config.relay_url.clone());
3030+3131+ let mut params = SubscribeRepos::new();
3232+ if let Some(cursor) = self.config.cursor {
3333+ params = params.cursor(cursor);
3434+ }
3535+ let params = params.build();
3636+3737+ let stream = client
3838+ .subscribe(¶ms)
3939+ .await
4040+ .map_err(|e| FirehoseError::Connection {
4141+ url: self.config.relay_url.to_string(),
4242+ message: e.to_string(),
4343+ })?;
4444+4545+ let (_sink, messages) = stream.into_stream();
4646+ Ok(messages)
4747+ }
4848+}
+108
crates/weaver-index/src/firehose/records.rs
···11+use crate::error::{CarError, IndexError};
22+use bytes::Bytes;
33+use jacquard_repo::car::reader::parse_car_bytes;
44+use smol_str::{SmolStr, ToSmolStr};
55+66+use super::consumer::Commit;
77+88+/// An extracted record from a firehose commit
99+#[derive(Debug, Clone)]
1010+pub struct ExtractedRecord {
1111+ /// DID of the repo owner
1212+ pub did: SmolStr,
1313+ /// Collection NSID (e.g., "app.bsky.feed.post")
1414+ pub collection: SmolStr,
1515+ /// Record key within the collection
1616+ pub rkey: SmolStr,
1717+ /// Content identifier
1818+ pub cid: String,
1919+ /// Operation type: "create", "update", or "delete"
2020+ pub operation: SmolStr,
2121+ /// Raw DAG-CBOR bytes of the record (None for deletes)
2222+ pub cbor_bytes: Option<Bytes>,
2323+ /// Sequence number from the firehose event
2424+ pub seq: i64,
2525+ /// Event timestamp (milliseconds since epoch)
2626+ pub event_time_ms: i64,
2727+}
2828+2929+impl ExtractedRecord {
3030+ /// Decode the CBOR bytes to JSON string
3131+ ///
3232+ /// Uses jacquard's RawData type which properly handles CID links
3333+ /// and other AT Protocol specific types.
3434+ pub fn to_json(&self) -> Result<Option<String>, IndexError> {
3535+ use jacquard_common::types::value::{RawData, from_cbor};
3636+3737+ match &self.cbor_bytes {
3838+ Some(bytes) => {
3939+ // RawData handles CID links and other IPLD types correctly
4040+ let value: RawData<'static> =
4141+ from_cbor::<RawData>(bytes).map_err(|e| CarError::RecordDecode {
4242+ message: format!("failed to decode DAG-CBOR: {}", e),
4343+ })?;
4444+ let json = serde_json::to_string(&value).map_err(|e| CarError::RecordDecode {
4545+ message: format!("failed to encode JSON: {}", e),
4646+ })?;
4747+ Ok(Some(json))
4848+ }
4949+ None => Ok(None),
5050+ }
5151+ }
5252+}
5353+5454+/// Extract records from a firehose commit
5555+///
5656+/// Parses the CAR data and extracts each record referenced by the operations.
5757+pub async fn extract_records(commit: &Commit<'_>) -> Result<Vec<ExtractedRecord>, IndexError> {
5858+ let parsed_car = parse_car_bytes(&commit.blocks)
5959+ .await
6060+ .map_err(|e| CarError::Parse {
6161+ message: e.to_string(),
6262+ })?;
6363+6464+ let event_time_ms = commit.time.as_ref().timestamp_millis();
6565+ let mut records = Vec::with_capacity(commit.ops.len());
6666+6767+ for op in &commit.ops {
6868+ let path: &str = op.path.as_ref();
6969+7070+ // Path format: "collection/rkey"
7171+ let (collection, rkey) = match path.split_once('/') {
7272+ Some((c, r)) => (c.to_smolstr(), r.to_smolstr()),
7373+ None => {
7474+ tracing::warn!(path = %path, "invalid op path format, skipping");
7575+ continue;
7676+ }
7777+ };
7878+7979+ let operation = op.action.to_smolstr();
8080+ let cid_str = op.cid.as_ref().map(|c| c.to_string()).unwrap_or_default();
8181+8282+ // For creates/updates, look up the record in the CAR blocks
8383+ let cbor_bytes = if let Some(cid_link) = &op.cid {
8484+ match cid_link.0.to_ipld() {
8585+ Ok(ipld_cid) => parsed_car.blocks.get(&ipld_cid).cloned(),
8686+ Err(_) => {
8787+ tracing::warn!(cid = %cid_str, "failed to convert CID to IPLD format");
8888+ None
8989+ }
9090+ }
9191+ } else {
9292+ None
9393+ };
9494+9595+ records.push(ExtractedRecord {
9696+ did: commit.repo.to_smolstr(),
9797+ collection,
9898+ rkey,
9999+ cid: cid_str,
100100+ operation,
101101+ cbor_bytes,
102102+ seq: commit.seq,
103103+ event_time_ms,
104104+ });
105105+ }
106106+107107+ Ok(records)
108108+}
+7
crates/weaver-index/src/lib.rs
···11+pub mod clickhouse;
22+pub mod config;
33+pub mod error;
44+pub mod firehose;
55+66+pub use config::Config;
77+pub use error::{IndexError, Result};
-134
crates/weaver-index/src/main.rs
···11-pub mod api_error;
22-33-pub mod config;
44-pub mod db;
55-pub mod middleware;
66-pub mod models;
77-pub mod oauth;
88-pub mod routes;
99-pub mod schema;
1010-pub mod state;
1111-pub mod telemetry;
1212-1313-use axum::Router;
1414-use clap::Parser;
1515-use config::*;
1616-use db::*;
1717-use dotenvy::dotenv;
1818-use miette::IntoDiagnostic;
1919-use miette::miette;
2020-use state::*;
2121-use std::env;
2222-2323-use tokio::net::TcpListener;
2424-use tracing::{debug, error, info};
2525-2626-#[derive(Parser)]
2727-#[command(author, version, about, long_about = None)]
2828-struct Cli {
2929- #[arg(
3030- short,
3131- long,
3232- value_name = "FILE",
3333- default_value = "appview-config.toml"
3434- )]
3535- config: String,
3636-}
3737-3838-#[tokio::main]
3939-async fn main() -> miette::Result<()> {
4040- let config = initialize()?;
4141- // Run any migrations before we do anything else.
4242- let db_path = config.core.db_path.clone();
4343- let _ = tokio::task::spawn_blocking(|| db::run_migrations(Some(db_path)))
4444- .await
4545- .into_diagnostic()?;
4646- let db = Db::new(Some(config.core.db_path.clone())).await;
4747- debug!("Connected to database");
4848- // Spin up our server.
4949- info!("Starting server on {}", config.core.listen_addr);
5050- let listener = TcpListener::bind(&config.core.listen_addr)
5151- .await
5252- .expect("Failed to bind address");
5353- let router = router(config, db);
5454- axum::serve(listener, router)
5555- .await
5656- .expect("Failed to start server");
5757- Ok(())
5858-}
5959-6060-pub fn router(cfg: Config, db: Db) -> Router {
6161- let app_state = AppState::new(cfg, db);
6262-6363- // Middleware that adds high level tracing to a Service.
6464- // Trace comes with good defaults but also supports customizing many aspects of the output:
6565- // https://docs.rs/tower-http/latest/tower_http/trace/index.html
6666- let trace_layer = telemetry::trace_layer();
6767-6868- // Sets 'x-request-id' header with randomly generated uuid v7.
6969- let request_id_layer = middleware::request_id_layer();
7070-7171- // Propagates 'x-request-id' header from the request to the response.
7272- let propagate_request_id_layer = middleware::propagate_request_id_layer();
7373-7474- // Layer that applies the Cors middleware which adds headers for CORS.
7575- let cors_layer = middleware::cors_layer();
7676-7777- // Layer that applies the Timeout middleware, which sets a timeout for requests.
7878- // The default value is 15 seconds.
7979- let timeout_layer = middleware::timeout_layer();
8080-8181- // Any trailing slashes from request paths will be removed. For example, a request with `/foo/`
8282- // will be changed to `/foo` before reaching the internal service.
8383- let normalize_path_layer = middleware::normalize_path_layer();
8484-8585- // Create the router with the routes.
8686- let router = routes::router();
8787-8888- // Combine all the routes and apply the middleware layers.
8989- // The order of the layers is important. The first layer is the outermost layer.
9090- Router::new()
9191- .merge(router)
9292- .layer(normalize_path_layer)
9393- .layer(cors_layer)
9494- .layer(timeout_layer)
9595- .layer(propagate_request_id_layer)
9696- .layer(trace_layer)
9797- .layer(request_id_layer)
9898- .with_state(app_state)
9999-}
100100-101101-pub fn initialize() -> miette::Result<Config> {
102102- miette::set_hook(Box::new(|_| {
103103- Box::new(
104104- miette::MietteHandlerOpts::new()
105105- .terminal_links(true)
106106- //.rgb_colors(miette::RgbColors::)
107107- .with_cause_chain()
108108- .with_syntax_highlighting(miette::highlighters::SyntectHighlighter::default())
109109- .color(true)
110110- .context_lines(5)
111111- .tab_width(2)
112112- .break_words(true)
113113- .build(),
114114- )
115115- }))
116116- .map_err(|e| miette!("Failed to set miette hook: {}", e))?;
117117- miette::set_panic_hook();
118118- dotenv().ok();
119119- let cli = Cli::parse();
120120- let config = config::Config::load(&cli.config);
121121- let config = if let Err(e) = config {
122122- error!("{}", e);
123123- config::Config::load(
124124- &env::var("APPVIEW_CONFIG").expect("Either set APPVIEW_CONFIG to the path to your config file, pass --config FILE to specify the path, or create a file called appview-config.toml in the directory where you are running the binary from."),
125125- )
126126- .map_err(|e| miette!(e))
127127- } else {
128128- config
129129- }?;
130130- let log_dir = env::var("LOG_DIR").unwrap_or_else(|_| "/tmp/appview".to_string());
131131- std::fs::create_dir_all(&log_dir).unwrap();
132132- let _guard = telemetry::setup_tracing(&log_dir);
133133- Ok(config)
134134-}
-61
crates/weaver-index/src/middleware.rs
···11-use std::time::Duration;
22-33-use axum::http::HeaderName;
44-use hyper::Request;
55-use tower_http::{
66- cors::{AllowHeaders, Any, CorsLayer},
77- normalize_path::NormalizePathLayer,
88- request_id::{MakeRequestId, PropagateRequestIdLayer, RequestId, SetRequestIdLayer},
99- timeout::TimeoutLayer,
1010-};
1111-1212-#[derive(Clone, Default)]
1313-pub struct Id;
1414-1515-impl MakeRequestId for Id {
1616- fn make_request_id<B>(&mut self, _: &Request<B>) -> Option<RequestId> {
1717- let id = uuid::Uuid::now_v7().to_string().parse().unwrap();
1818- Some(RequestId::new(id))
1919- }
2020-}
2121-2222-/// Sets the 'x-request-id' header with a randomly generated UUID v7.
2323-///
2424-/// SetRequestId will not override request IDs if they are already present
2525-/// on requests or responses.
2626-pub fn request_id_layer() -> SetRequestIdLayer<Id> {
2727- let x_request_id = HeaderName::from_static("x-request-id");
2828- SetRequestIdLayer::new(x_request_id.clone(), Id)
2929-}
-
-/// Propagates the 'x-request-id' header from the request to the response.
-///
-/// PropagateRequestId won't override request IDs if they're already
3434-/// present on requests or responses.
3535-pub fn propagate_request_id_layer() -> PropagateRequestIdLayer {
3636- let x_request_id = HeaderName::from_static("x-request-id");
3737- PropagateRequestIdLayer::new(x_request_id)
3838-}
3939-4040-/// Layer that applies the Cors middleware which adds headers for CORS.
4141-pub fn cors_layer() -> CorsLayer {
4242- CorsLayer::new()
4343- .allow_origin(Any)
4444- .allow_methods(Any)
4545- .allow_headers(AllowHeaders::mirror_request())
4646- .max_age(Duration::from_secs(600))
4747-}
-
-/// Layer that applies the Timeout middleware, which applies a timeout to requests.
5050-/// The default timeout value is set to 15 seconds.
5151-pub fn timeout_layer() -> TimeoutLayer {
5252- TimeoutLayer::new(Duration::from_secs(15))
5353-}
5454-5555-/// Middleware that normalizes paths.
5656-///
5757-/// Any trailing slashes from request paths will be removed. For example, a request with `/foo/`
5858-/// will be changed to `/foo` before reaching the inner service.
5959-pub fn normalize_path_layer() -> NormalizePathLayer {
6060- NormalizePathLayer::trim_trailing_slash()
6161-}
···11-use tower_http::{
22- classify::{ServerErrorsAsFailures, SharedClassifier},
33- trace::{DefaultMakeSpan, DefaultOnRequest, DefaultOnResponse, TraceLayer},
44-};
55-use tracing::Level;
66-use tracing_appender::{self, non_blocking, non_blocking::WorkerGuard, rolling::daily};
77-use tracing_subscriber::{
88- EnvFilter,
99- fmt::{self, layer, writer::MakeWriterExt},
1010- layer::SubscriberExt,
1111- registry,
1212- util::SubscriberInitExt,
1313-};
1414-/// The `EnvFilter` type is used to filter log events based on the value of an environment variable.
1515-/// In this case, we are using the `try_from_default_env` method to attempt to read the `RUST_LOG` environment variable,
1616-/// which is used to set the log level for the application.
1717-/// If the environment variable is not set, we default to the log level of `debug`.
1818-/// The `RUST_LOG` environment variable is set in the Dockerfile and .env files.
1919-pub fn setup_tracing<S: AsRef<str>>(logdir: S) -> WorkerGuard {
2020- let (non_blocking_appender, guard) = non_blocking(daily(logdir.as_ref(), "general.log"));
2121- let env_filter_layer = EnvFilter::try_from_default_env().unwrap_or_else(|_| {
2222- format!(
2323- "debug,{}=debug,tower_http=debug,axum=debug,hyper=debug,axum::rejection=trace,markdown=info",
2424- env!("CARGO_PKG_NAME"),
2525- ).into()
2626- });
2727- let formatting_layer = fmt::layer().json();
2828- tracing_subscriber::registry()
2929- .with(env_filter_layer)
3030- .with(formatting_layer)
3131- .with(
3232- layer()
3333- .with_writer(std::io::stdout.with_max_level(Level::DEBUG))
3434- .event_format(tracing_subscriber::fmt::format().pretty()),
3535- )
3636- .with(layer().with_writer(non_blocking_appender.with_max_level(Level::INFO)))
3737- .init();
3838- guard
3939-}
4040-4141-/// Returns a `TraceLayer` for HTTP requests and responses.
4242-/// The `TraceLayer` is used to trace requests and responses in the application.
4343-pub fn trace_layer() -> TraceLayer<SharedClassifier<ServerErrorsAsFailures>> {
4444- TraceLayer::new_for_http()
4545- .make_span_with(DefaultMakeSpan::new().level(Level::INFO))
4646- .on_request(DefaultOnRequest::new().level(Level::INFO))
4747- .on_response(DefaultOnResponse::new().level(Level::INFO))
4848-}