···
    -- Content identifier from the record
    cid String,

    -- Full record as native JSON (schema-flexible, queryable with record.field.subfield)
    record JSON,
···
    -- When we indexed this record
    indexed_at DateTime64(3) DEFAULT now64(3),

    -- Materialized AT URI for convenience
    uri String MATERIALIZED concat('at://', did, '/', collection, '/', rkey)
)
ENGINE = ReplacingMergeTree(indexed_at)
ORDER BY (collection, did, rkey, indexed_at);
···
    -- Content identifier from the record
    cid String,

    -- Repository revision (TID) - monotonically increasing per DID, used for dedup/ordering
    rev String,

    -- Full record as native JSON (schema-flexible, queryable with record.field.subfield)
    record JSON,
···
    -- When we indexed this record
    indexed_at DateTime64(3) DEFAULT now64(3),

    -- Validation state: 'unchecked', 'valid', 'invalid_rev', 'invalid_gap', 'invalid_account'
    -- Populated by async batch validation, not in the hot path
    validation_state LowCardinality(String) DEFAULT 'unchecked',

    -- Materialized AT URI for convenience
    uri String MATERIALIZED concat('at://', did, '/', collection, '/', rkey),

    -- Projection for fast delete lookups by (did, cid)
    -- Delete events include the CID, so we can do an O(1) lookup of the original record
    -- to know what to decrement (e.g., which notebook's like count)
    PROJECTION by_did_cid (
        SELECT * ORDER BY (did, cid)
    )
)
ENGINE = ReplacingMergeTree(indexed_at)
ORDER BY (collection, did, rkey, event_time)
SETTINGS deduplicate_merge_projection_mode = 'drop';
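The comments above describe the two main access paths into the revised raw_records table. A quick sketch of what those queries could look like; the collection NSID, DID, rkey, CID, and the title field are illustrative placeholders, not values defined by this schema:

-- Sketch: read a record through the materialized AT URI and a JSON subfield
SELECT uri, record.title
FROM raw_records
WHERE collection = 'com.example.notebook'
  AND did = 'did:plc:example'
  AND rkey = '3abc123xyz'
ORDER BY indexed_at DESC
LIMIT 1;

-- Sketch: the (did, cid) lookup the by_did_cid projection is meant to accelerate,
-- used when a delete event arrives carrying only a DID and CID
SELECT did, collection, rkey, record
FROM raw_records
WHERE did = 'did:plc:example' AND cid = 'bafyexamplecid'
ORDER BY event_time DESC
LIMIT 1;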
-- Per-account revision state tracking
-- Maintains latest rev/cid per DID for dedup and gap detection
--
-- AggregatingMergeTree with incremental MV from raw_records
-- Query with argMaxMerge/maxMerge to finalize aggregates

CREATE TABLE IF NOT EXISTS account_rev_state (
    -- Account DID
    did String,

    -- Latest revision (TID) seen for this account
    last_rev AggregateFunction(argMax, String, DateTime64(3)),

    -- CID of the latest revision
    last_cid AggregateFunction(argMax, String, DateTime64(3)),

    -- Latest sequence number seen
    last_seq AggregateFunction(max, UInt64),

    -- Latest event time seen
    last_event_time AggregateFunction(max, DateTime64(3))
)
ENGINE = AggregatingMergeTree()
ORDER BY did;
-- Incremental MV: fires on each insert to raw_records, maintains aggregate state
-- Must be created after both account_rev_state (target) and raw_records (source) exist

CREATE MATERIALIZED VIEW IF NOT EXISTS account_rev_state_mv TO account_rev_state AS
SELECT
    did,
    argMaxState(rev, event_time) AS last_rev,
    argMaxState(cid, event_time) AS last_cid,
    maxState(seq) AS last_seq,
    maxState(event_time) AS last_event_time
FROM raw_records
GROUP BY did;
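Because these columns hold aggregate function states, reads have to finalize them with the -Merge combinators the table comment mentions. This is the same query shape the indexer below uses on startup to warm its in-memory rev cache:

-- Finalize the aggregate states into one plain row per DID
SELECT
    did,
    argMaxMerge(last_rev) AS last_rev,
    argMaxMerge(last_cid) AS last_cid,
    maxMerge(last_seq) AS last_seq,
    maxMerge(last_event_time) AS last_event_time
FROM account_rev_state
GROUP BY did;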
use std::sync::Arc;
use std::time::{Duration, Instant};

use chrono::{DateTime, Utc};
use dashmap::DashMap;
use n0_future::StreamExt;
use smol_str::{SmolStr, ToSmolStr};
use tracing::{debug, info, warn};

use crate::clickhouse::{
    AccountRevState, Client, FirehoseCursor, RawAccountEvent, RawIdentityEvent, RawRecordInsert,
};
use crate::config::IndexerConfig;
use crate::error::{IndexError, Result};
use crate::firehose::{
    Account, Commit, ExtractedRecord, FirehoseConsumer, Identity, MessageStream,
    SubscribeReposMessage, extract_records,
};

/// Default consumer ID for cursor tracking
const CONSUMER_ID: &str = "main";

/// Per-account revision state for deduplication
#[derive(Debug, Clone)]
pub struct RevState {
    pub last_rev: SmolStr,
    pub last_cid: SmolStr,
}

/// In-memory cache of per-account revision state
///
/// Used for fast deduplication without hitting ClickHouse on every event.
/// Populated from the account_rev_state table on startup, updated as events are processed.
pub struct RevCache {
    inner: DashMap<SmolStr, RevState>,
}

impl RevCache {
    pub fn new() -> Self {
        Self {
            inner: DashMap::new(),
        }
    }

    /// Load cache from ClickHouse account_rev_state table
    pub async fn load_from_clickhouse(client: &Client) -> Result<Self> {
        let query = r#"
            SELECT
                did,
                argMaxMerge(last_rev) as last_rev,
                argMaxMerge(last_cid) as last_cid,
                maxMerge(last_seq) as last_seq,
                maxMerge(last_event_time) as last_event_time
            FROM account_rev_state
            GROUP BY did
        "#;

        let rows: Vec<AccountRevState> =
            client.inner().query(query).fetch_all().await.map_err(|e| {
                IndexError::ClickHouse(crate::error::ClickHouseError::Query {
                    message: "failed to load account rev state".into(),
                    source: e,
                })
            })?;

        let cache = Self::new();
        for row in rows {
            cache.inner.insert(
                SmolStr::new(&row.did),
                RevState {
                    last_rev: SmolStr::new(&row.last_rev),
                    last_cid: SmolStr::new(&row.last_cid),
                },
            );
        }

        info!(
            accounts = cache.inner.len(),
            "loaded rev cache from clickhouse"
        );
        Ok(cache)
    }

    /// Check if we should process this commit (returns false if already seen)
    pub fn should_process(&self, did: &str, rev: &str) -> bool {
        match self.inner.get(did) {
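            // `rev` is a TID (a fixed-width, base32-sortable timestamp per the schema
            // comment), so lexicographic string comparison matches commit order per DID.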
            Some(state) => rev > state.last_rev.as_str(),
            None => true, // new account, always process
        }
    }

    /// Update cache after processing a commit
    pub fn update(&self, did: &SmolStr, rev: &SmolStr, cid: &SmolStr) {
        self.inner.insert(
            did.clone(),
            RevState {
                last_rev: rev.clone(),
                last_cid: cid.clone(),
            },
        );
    }

    /// Get current cache size (number of accounts tracked)
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }
}

impl Default for RevCache {
    fn default() -> Self {
        Self::new()
    }
}

/// Safety margin when resuming - back up this many sequence numbers
/// to ensure no gaps from incomplete batches or race conditions
const CURSOR_REWIND: i64 = 1000;

/// Load cursor from ClickHouse for resuming
///
/// Returns cursor with safety margin subtracted to ensure overlap
pub async fn load_cursor(client: &Client) -> Result<Option<i64>> {
    let query = format!(
        r#"
        SELECT consumer_id, seq, event_time
        FROM firehose_cursor FINAL
        WHERE consumer_id = '{}'
        LIMIT 1
        "#,
        CONSUMER_ID
    );

    let cursor: Option<FirehoseCursor> = client
        .inner()
        .query(&query)
        .fetch_optional()
        .await
        .map_err(|e| crate::error::ClickHouseError::Query {
            message: "failed to load cursor".into(),
            source: e,
        })?;

    if let Some(c) = &cursor {
        let resume_at = (c.seq as i64).saturating_sub(CURSOR_REWIND);
        info!(
            saved_seq = c.seq,
            resume_seq = resume_at,
            rewind = CURSOR_REWIND,
            "loaded cursor from clickhouse (with safety margin)"
        );
        Ok(Some(resume_at))
    } else {
        Ok(None)
    }
}

/// Main indexer that consumes the firehose and writes to ClickHouse
pub struct Indexer {
    client: Arc<Client>,
    consumer: FirehoseConsumer,
    rev_cache: RevCache,
    config: IndexerConfig,
}

impl Indexer {
    /// Create a new indexer
    pub async fn new(
        client: Client,
        consumer: FirehoseConsumer,
        config: IndexerConfig,
    ) -> Result<Self> {
        let client = Arc::new(client);

        // Load rev cache from ClickHouse
        let rev_cache = RevCache::load_from_clickhouse(&client).await?;

        Ok(Self {
            client,
            consumer,
            rev_cache,
            config,
        })
    }

    /// Save cursor to ClickHouse
    async fn save_cursor(&self, seq: u64, event_time: DateTime<Utc>) -> Result<()> {
        let query = format!(
            "INSERT INTO firehose_cursor (consumer_id, seq, event_time) VALUES ('{}', {}, {})",
            CONSUMER_ID,
            seq,
            event_time.timestamp_millis()
        );

        self.client.execute(&query).await?;
        debug!(seq, "saved cursor");
        Ok(())
    }

    /// Run the indexer loop
    pub async fn run(&self) -> Result<()> {
        info!("connecting to firehose...");
        let mut stream: MessageStream = self.consumer.connect().await?;

        // Inserters handle batching internally based on config
        let mut records = self.client.inserter::<RawRecordInsert>("raw_records");
        let mut identities = self
            .client
            .inserter::<RawIdentityEvent>("raw_identity_events");
        let mut accounts = self
            .client
            .inserter::<RawAccountEvent>("raw_account_events");

        // Stats and cursor tracking
        let mut processed: u64 = 0;
        let mut skipped: u64 = 0;
        let mut last_seq: u64 = 0;
        let mut last_event_time = Utc::now();
        let mut last_stats = Instant::now();
        let mut last_cursor_save = Instant::now();

        info!("starting indexer loop");

        while let Some(result) = stream.next().await {
            let msg = match result {
                Ok(msg) => msg,
                Err(e) => {
                    warn!(error = ?e, "firehose stream error");
                    continue;
                }
            };

            // Track seq from any message type that has it
            match &msg {
                SubscribeReposMessage::Commit(c) => {
                    last_seq = c.seq as u64;
                    last_event_time = c.time.as_ref().with_timezone(&Utc);
                }
                SubscribeReposMessage::Identity(i) => {
                    last_seq = i.seq as u64;
                    last_event_time = i.time.as_ref().with_timezone(&Utc);
                }
                SubscribeReposMessage::Account(a) => {
                    last_seq = a.seq as u64;
                    last_event_time = a.time.as_ref().with_timezone(&Utc);
                }
                _ => {}
            }

            match msg {
                SubscribeReposMessage::Commit(commit) => {
                    if self
                        .process_commit(&commit, &mut records, &mut skipped)
                        .await?
                    {
                        processed += 1;
                    }
                }
                SubscribeReposMessage::Identity(identity) => {
                    write_identity(&identity, &mut identities).await?;
                }
                SubscribeReposMessage::Account(account) => {
                    write_account(&account, &mut accounts).await?;
                }
                SubscribeReposMessage::Sync(_) => {
                    debug!("received sync (tooBig) event, skipping");
                }
                _ => {}
            }

            // commit() flushes if internal thresholds are met, otherwise it's a no-op
            records
                .commit()
                .await
                .map_err(|e| crate::error::ClickHouseError::Query {
                    message: "commit failed".into(),
                    source: e,
                })?;

            // Periodic stats logging (every 10s)
            if last_stats.elapsed() >= Duration::from_secs(10) {
                info!(
                    processed,
                    skipped,
                    last_seq,
                    rev_cache_size = self.rev_cache.len(),
                    "indexer stats"
                );
                last_stats = Instant::now();
            }

            // Save cursor every 30s
            if last_cursor_save.elapsed() >= Duration::from_secs(30) && last_seq > 0 {
                if let Err(e) = self.save_cursor(last_seq, last_event_time).await {
                    warn!(error = ?e, "failed to save cursor");
                }
                last_cursor_save = Instant::now();
            }
        }

        // Final flush
        records
            .end()
            .await
            .map_err(|e| crate::error::ClickHouseError::Query {
                message: "final flush failed".into(),
                source: e,
            })?;
        identities
            .end()
            .await
            .map_err(|e| crate::error::ClickHouseError::Query {
                message: "final flush failed".into(),
                source: e,
            })?;
        accounts
            .end()
            .await
            .map_err(|e| crate::error::ClickHouseError::Query {
                message: "final flush failed".into(),
                source: e,
            })?;

        // Final cursor save
        if last_seq > 0 {
            self.save_cursor(last_seq, last_event_time).await?;
        }

        info!(last_seq, "firehose stream ended");
        Ok(())
    }

    async fn process_commit(
        &self,
        commit: &Commit<'_>,
        inserter: &mut clickhouse::inserter::Inserter<RawRecordInsert>,
        skipped: &mut u64,
    ) -> Result<bool> {
        let did = commit.repo.as_ref();
        let rev = commit.rev.as_ref();

        // Dedup check
        if !self.rev_cache.should_process(did, rev) {
            *skipped += 1;
            return Ok(false);
        }

        // Extract and write records
        for record in extract_records(commit).await? {
            // Collection filter - skip early before JSON conversion
            if !self.config.collections.matches(&record.collection) {
                continue;
            }

            let json = record.to_json()?.unwrap_or_else(|| "{}".to_string());

            // Fire-and-forget delete handling
            if record.operation == "delete" {
                let client = self.client.clone();
                let record_clone = record.clone();
                tokio::spawn(async move {
                    if let Err(e) = handle_delete(&client, record_clone).await {
                        warn!(error = ?e, "delete handling failed");
                    }
                });
            }

            inserter
                .write(&RawRecordInsert {
                    did: record.did.clone(),
                    collection: record.collection.clone(),
                    rkey: record.rkey.clone(),
                    cid: record.cid.clone(),
                    rev: record.rev.clone(),
                    record: json.to_smolstr(),
                    operation: record.operation.clone(),
                    seq: record.seq as u64,
                    event_time: record.event_time,
                })
                .await
                .map_err(|e| crate::error::ClickHouseError::Query {
                    message: "write failed".into(),
                    source: e,
                })?;
        }

        // Update rev cache
        self.rev_cache.update(
            &SmolStr::new(did),
            &SmolStr::new(rev),
            &commit.commit.0.to_smolstr(),
        );

        Ok(true)
    }
}

async fn write_identity(
    identity: &Identity<'_>,
    inserter: &mut clickhouse::inserter::Inserter<RawIdentityEvent>,
) -> Result<()> {
    inserter
        .write(&RawIdentityEvent {
            did: identity.did.to_smolstr(),
            handle: identity
                .handle
                .as_ref()
                .map(|h| h.as_ref().to_smolstr())
                .unwrap_or_default(),
            seq: identity.seq as u64,
            event_time: identity.time.as_ref().with_timezone(&Utc),
        })
        .await
        .map_err(|e| crate::error::ClickHouseError::Query {
            message: "write failed".into(),
            source: e,
        })?;
    Ok(())
}

async fn write_account(
    account: &Account<'_>,
    inserter: &mut clickhouse::inserter::Inserter<RawAccountEvent>,
) -> Result<()> {
    inserter
        .write(&RawAccountEvent {
            did: account.did.to_smolstr(),
            active: if account.active { 1 } else { 0 },
            status: account
                .status
                .as_ref()
                .map(|s| s.as_ref().to_smolstr())
                .unwrap_or_default(),
            seq: account.seq as u64,
            event_time: account.time.as_ref().with_timezone(&Utc),
        })
        .await
        .map_err(|e| crate::error::ClickHouseError::Query {
            message: "write failed".into(),
            source: e,
        })?;
    Ok(())
}

/// Minimal struct for delete lookups - just the fields we need to process the delete
#[derive(Debug, Clone, clickhouse::Row, serde::Deserialize)]
struct LookupRawRecord {
    did: SmolStr,
    collection: SmolStr,
    rkey: SmolStr,
    record: SmolStr, // JSON string of the original record
}

/// Handle a delete event with poll-then-stub logic
///
/// For deletes, we need to look up the original record to know what was deleted
/// (e.g., which notebook a like was for). If the record doesn't exist yet
/// (out-of-order events), we poll for up to 15 seconds before creating a stub tombstone.
async fn handle_delete(client: &Client, record: ExtractedRecord) -> Result<()> {
    let deadline = Instant::now() + Duration::from_secs(15);

    loop {
        // Try to find the record by CID
        let query = format!(
            r#"
            SELECT did, collection, rkey, record
            FROM raw_records
            WHERE did = '{}' AND cid = '{}'
            ORDER BY event_time DESC
            LIMIT 1
            "#,
            record.did, record.cid
        );

        let original: Option<LookupRawRecord> = client
            .inner()
            .query(&query)
            .fetch_optional()
            .await
            .map_err(|e| crate::error::ClickHouseError::Query {
                message: "delete lookup failed".into(),
                source: e,
            })?;

        if let Some(_original) = original {
            // Found the record - the main insert path already handles creating
            // the delete row, so we're done. In phase 2, this is where we'd
            // parse original.record and insert count deltas for denormalized tables.
            debug!(did = %record.did, cid = %record.cid, "delete found original record");
            return Ok(());
        }

        if Instant::now() > deadline {
            // Gave up - create stub tombstone
            // The record will be inserted via the main batch path with operation='delete'
            // and empty record content, which serves as our stub tombstone
            warn!(
                did = %record.did,
                cid = %record.cid,
                "delete timeout, stub tombstone will be created"
            );
            return Ok(());
        }

        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}
crates/weaver-index/src/lib.rs
···
pub mod config;
pub mod error;
pub mod firehose;

pub use config::Config;
pub use error::{IndexError, Result};
···
pub mod config;
pub mod error;
pub mod firehose;
pub mod indexer;

pub use config::Config;
pub use error::{IndexError, Result};
pub use indexer::{load_cursor, Indexer};
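To show how these exports are meant to fit together, here is a minimal wiring sketch. Only `load_cursor`, `Indexer::new`, and `Indexer::run` come from the code above; the `weaver_index::clickhouse` path relies on the elided `pub mod` line, the crate name is inferred from the directory, and how the resumed cursor is fed back into the firehose subscription is not shown in this diff.

// Sketch: wiring the indexer together. Client/consumer construction is not part
// of this diff, so this helper takes them as parameters rather than inventing builders.
use weaver_index::clickhouse::Client;            // module assumed from the elided lib.rs line
use weaver_index::config::IndexerConfig;
use weaver_index::firehose::FirehoseConsumer;
use weaver_index::{load_cursor, Indexer, Result};

async fn run_indexer(client: Client, consumer: FirehoseConsumer, config: IndexerConfig) -> Result<()> {
    // load_cursor returns the last saved seq rewound by CURSOR_REWIND for overlap;
    // passing it into the consumer's subscription request happens outside this sketch.
    if let Some(seq) = load_cursor(&client).await? {
        tracing::info!(resume_seq = seq, "resuming from saved cursor");
    }

    // Indexer::new warms the rev cache from account_rev_state, then run() consumes
    // the firehose until the stream ends.
    Indexer::new(client, consumer, config).await?.run().await
}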