···
 -- Raw records from firehose/jetstream
 -- Core table for all AT Protocol records before denormalization
 --
--- Uses ReplacingMergeTree to deduplicate on (collection, did, rkey) keeping latest indexed_at
 -- JSON column stores full record, extract fields only when needed for ORDER BY/WHERE/JOINs

 CREATE TABLE IF NOT EXISTS raw_records (
···
     collection LowCardinality(String),
     rkey String,

-    -- Content identifier from the record
     cid String,

-    -- Repository revision (TID) - monotonically increasing per DID, used for dedup/ordering
     rev String,

     -- Full record as native JSON (schema-flexible, queryable with record.field.subfield)
     record JSON,

-    -- Operation: 'create', 'update', 'delete'
     operation LowCardinality(String),

     -- Firehose sequence number (metadata only, not for ordering - can jump on relay restart)
···
         SELECT * ORDER BY (did, cid)
     )
 )
-ENGINE = ReplacingMergeTree(indexed_at)
-ORDER BY (collection, did, rkey, event_time, indexed_at)
-SETTINGS deduplicate_merge_projection_mode = 'drop';
···
 -- Raw records from firehose/jetstream
 -- Core table for all AT Protocol records before denormalization
 --
+-- Append-only log using plain MergeTree - all versions preserved for audit/rollback.
+-- Query-time deduplication via ORDER BY + LIMIT or window functions.
 -- JSON column stores full record, extract fields only when needed for ORDER BY/WHERE/JOINs

 CREATE TABLE IF NOT EXISTS raw_records (
···
     collection LowCardinality(String),
     rkey String,

+    -- Content identifier from the record (content-addressed hash)
     cid String,

+    -- Repository revision (TID) - monotonically increasing per DID, used for ordering
     rev String,

     -- Full record as native JSON (schema-flexible, queryable with record.field.subfield)
     record JSON,

+    -- Operation: 'create', 'update', 'delete', 'cache' (fetched on-demand)
     operation LowCardinality(String),

     -- Firehose sequence number (metadata only, not for ordering - can jump on relay restart)
···
         SELECT * ORDER BY (did, cid)
     )
 )
+ENGINE = MergeTree()
+ORDER BY (collection, did, rkey, event_time, indexed_at);
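
As a usage sketch (not part of the migration), this is the query-time deduplication the new header comment describes. It assumes the elided columns include did, event_time, and indexed_at, all of which appear in the ORDER BY key; the DID and rkey are placeholders.

-- Latest visible version of a single record: newest event wins, indexed_at breaks ties
SELECT cid, record, operation
FROM raw_records
WHERE did = 'did:plc:example'              -- placeholder DID
  AND collection = 'sh.weaver.notebook.entry'
  AND rkey = '3kabc123'                    -- placeholder rkey
ORDER BY event_time DESC, indexed_at DESC
LIMIT 1;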
···
+-- Auto-populate handle_mappings from identity events when handle is present
+
+CREATE MATERIALIZED VIEW IF NOT EXISTS handle_mappings_from_identity_mv TO handle_mappings AS
+SELECT
+    handle,
+    did,
+    0 as freed,
+    'active' as account_status,
+    'identity' as source,
+    event_time,
+    now64(3) as indexed_at
+FROM raw_identity_events
+WHERE handle != ''
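
A lookup sketch against the populated table. The diff doesn't show handle_mappings' engine, so the latest-row-wins choice is made explicitly at query time; the handle is a placeholder.

-- Resolve a handle to its current DID
SELECT did
FROM handle_mappings
WHERE handle = 'alice.example.com'
  AND freed = 0
ORDER BY event_time DESC, indexed_at DESC
LIMIT 1;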
···
+-- Auto-populate freed status from account events
+-- JOINs against handle_mappings to find the current handle for the DID
+-- If no mapping exists yet, the JOIN matches nothing and the event is skipped
+-- (can't free unknown handles)
+
+CREATE MATERIALIZED VIEW IF NOT EXISTS handle_mappings_from_account_mv TO handle_mappings AS
+SELECT
+    h.handle,
+    a.did,
+    1 as freed,
+    a.status as account_status,
+    'account' as source,
+    a.event_time,
+    now64(3) as indexed_at
+FROM raw_account_events a
+INNER JOIN handle_mappings h ON h.did = a.did AND h.freed = 0
+WHERE a.active = 0 AND a.status != ''
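
For illustration, a sketch of the question these two MVs answer together: is a handle free to claim? Column types are assumed from the MV SELECTs above; the handle is a placeholder.

-- Claimable when no mapping exists at all, or the most recent row is freed
SELECT if(count() = 0, 1, argMax(freed, indexed_at)) AS claimable
FROM handle_mappings
WHERE handle = 'alice.example.com';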
···
+-- Profile counts aggregated from graph tables
+-- Updated by MVs from follows, notebooks, entries (added later with those tables)
+-- Joined with profiles at query time
+
+CREATE TABLE IF NOT EXISTS profile_counts (
+    did String,
+
+    -- Signed for increment/decrement from MVs
+    follower_count Int64 DEFAULT 0,
+    following_count Int64 DEFAULT 0,
+    notebook_count Int64 DEFAULT 0,
+    entry_count Int64 DEFAULT 0,
+
+    indexed_at DateTime64(3) DEFAULT now64(3)
+)
+ENGINE = SummingMergeTree((follower_count, following_count, notebook_count, entry_count))
+ORDER BY did
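
SummingMergeTree collapses rows only during background merges, so reads must aggregate explicitly. A read sketch (placeholder DID):

-- Current totals for one profile; sum() also covers rows not yet merged
SELECT
    sum(follower_count)  AS followers,
    sum(following_count) AS following,
    sum(notebook_count)  AS notebooks,
    sum(entry_count)     AS entries
FROM profile_counts
WHERE did = 'did:plc:example';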
···
+-- Notebook engagement counts
+-- Updated by MVs from likes, bookmarks, subscriptions (added later with graph tables)
+-- Joined with notebooks at query time
+
+CREATE TABLE IF NOT EXISTS notebook_counts (
+    did String,
+    rkey String,
+
+    -- Signed for increment/decrement from MVs
+    like_count Int64 DEFAULT 0,
+    bookmark_count Int64 DEFAULT 0,
+    subscriber_count Int64 DEFAULT 0,
+
+    indexed_at DateTime64(3) DEFAULT now64(3)
+)
+ENGINE = SummingMergeTree((like_count, bookmark_count, subscriber_count))
+ORDER BY (did, rkey)
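
A sketch of the signed increment/decrement pattern the comment describes. The likes source arrives in a later migration, so the table and column names below are hypothetical; the point is mapping create/delete operations to +1/-1 rows that SummingMergeTree folds together.

-- Hypothetical feeder MV: a like create adds 1, a like delete adds -1
CREATE MATERIALIZED VIEW IF NOT EXISTS notebook_like_counts_mv TO notebook_counts AS
SELECT
    subject_did AS did,                              -- hypothetical source columns
    subject_rkey AS rkey,
    if(operation = 'delete', -1, 1) AS like_count,
    0 AS bookmark_count,
    0 AS subscriber_count,
    now64(3) AS indexed_at
FROM likes                                           -- hypothetical source table
WHERE subject_collection = 'sh.weaver.notebook.book';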
···
+-- Entry engagement counts
+-- Updated by MVs from likes, bookmarks (added later with graph tables)
+-- Joined with entries at query time
+
+CREATE TABLE IF NOT EXISTS entry_counts (
+    did String,
+    rkey String,
+
+    -- Signed for increment/decrement from MVs
+    like_count Int64 DEFAULT 0,
+    bookmark_count Int64 DEFAULT 0,
+
+    indexed_at DateTime64(3) DEFAULT now64(3)
+)
+ENGINE = SummingMergeTree((like_count, bookmark_count))
+ORDER BY (did, rkey)
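
The "joined at query time" pattern from these comments, sketched for entries. The entries table's key columns are assumed from context, and grouping by the entry key absorbs count rows that haven't merged yet.

SELECT
    e.did,
    e.rkey,
    sum(c.like_count)     AS likes,
    sum(c.bookmark_count) AS bookmarks
FROM entries e
LEFT JOIN entry_counts c ON c.did = e.did AND c.rkey = e.rkey
GROUP BY e.did, e.rkey;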
···
+-- Edit heads per resource
+-- Refreshable MV that tracks all branch heads for each resource
+-- A head is a node with no children (nothing has prev pointing to it)
+-- Multiple heads = divergent branches needing merge
+
+CREATE MATERIALIZED VIEW IF NOT EXISTS edit_heads
+REFRESH EVERY 1 MINUTE
+ENGINE = ReplacingMergeTree(indexed_at)
+ORDER BY (resource_did, resource_collection, resource_rkey, head_did, head_rkey)
+AS
+WITH
+    -- All nodes
+    all_nodes AS (
+        SELECT
+            did, rkey, cid, collection, node_type,
+            resource_did, resource_collection, resource_rkey,
+            root_did, root_rkey,
+            prev_did, prev_rkey,
+            created_at
+        FROM edit_nodes
+        WHERE resource_did != ''
+    ),
+    -- Nodes that are pointed to by prev (have children)
+    has_children AS (
+        SELECT DISTINCT prev_did as did, prev_rkey as rkey
+        FROM all_nodes
+        WHERE prev_did != ''
+    ),
+    -- Root CIDs lookup
+    root_cids AS (
+        SELECT did, rkey, cid
+        FROM edit_nodes
+        WHERE node_type = 'root'
+    )
+-- Heads are nodes with no children
+SELECT
+    n.resource_did,
+    n.resource_collection,
+    n.resource_rkey,
+    concat('at://', n.resource_did, '/', n.resource_collection, '/', n.resource_rkey) as resource_uri,
+
+    -- This head
+    n.did as head_did,
+    n.rkey as head_rkey,
+    n.cid as head_cid,
+    n.collection as head_collection,
+    n.node_type as head_type,
+    concat('at://', n.did, '/', n.collection, '/', n.rkey) as head_uri,
+
+    -- Root for this branch (with CID)
+    if(n.node_type = 'root', n.did, n.root_did) as root_did,
+    if(n.node_type = 'root', n.rkey, n.root_rkey) as root_rkey,
+    if(n.node_type = 'root', n.cid, coalesce(r.cid, '')) as root_cid,
+    if(n.node_type = 'root',
+        concat('at://', n.did, '/', n.collection, '/', n.rkey),
+        if(n.root_did != '', concat('at://', n.root_did, '/sh.weaver.edit.root/', n.root_rkey), '')
+    ) as root_uri,
+
+    n.created_at as head_created_at,
+    now64(3) as indexed_at
+FROM all_nodes n
+LEFT ANTI JOIN has_children c ON n.did = c.did AND n.rkey = c.rkey
+LEFT JOIN root_cids r ON r.did = n.root_did AND r.rkey = n.root_rkey
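
A consumer-side sketch: list resources whose edit graph has diverged, i.e. more than one head. FINAL applies the ReplacingMergeTree dedup declared on the view's target.

-- Resources needing a merge
SELECT
    resource_uri,
    count() AS head_count,
    groupArray(head_uri) AS heads
FROM edit_heads FINAL
GROUP BY resource_uri
HAVING head_count > 1;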
···
+-- Resource permissions
+-- Refreshable MV that computes who can access each resource
+-- Combines: owners (resource creator) + collaborators (invite+accept pairs)
+
+CREATE MATERIALIZED VIEW IF NOT EXISTS permissions
+REFRESH EVERY 1 MINUTE
+ENGINE = ReplacingMergeTree(indexed_at)
+ORDER BY (resource_did, resource_collection, resource_rkey, grantee_did)
+AS
+-- Owners: resource creator has owner permission
+SELECT
+    did as resource_did,
+    'sh.weaver.notebook.entry' as resource_collection,
+    rkey as resource_rkey,
+    concat('at://', did, '/sh.weaver.notebook.entry/', rkey) as resource_uri,
+
+    did as grantee_did,
+    'owner' as scope,
+
+    -- Source is the resource itself
+    did as source_did,
+    'sh.weaver.notebook.entry' as source_collection,
+    rkey as source_rkey,
+
+    created_at as granted_at,
+    now64(3) as indexed_at
+FROM entries
+
+UNION ALL
+
+SELECT
+    did as resource_did,
+    'sh.weaver.notebook.book' as resource_collection,
+    rkey as resource_rkey,
+    concat('at://', did, '/sh.weaver.notebook.book/', rkey) as resource_uri,
+
+    did as grantee_did,
+    'owner' as scope,
+
+    did as source_did,
+    'sh.weaver.notebook.book' as source_collection,
+    rkey as source_rkey,
+
+    created_at as granted_at,
+    now64(3) as indexed_at
+FROM notebooks
+
+UNION ALL
+
+-- Collaborators: invite+accept pairs grant permission
+SELECT
+    resource_did,
+    resource_collection,
+    resource_rkey,
+    resource_uri,
+
+    collaborator_did as grantee_did,
+    if(scope != '', scope, 'collaborator') as scope,
+
+    invite_did as source_did,
+    'sh.weaver.collab.invite' as source_collection,
+    invite_rkey as source_rkey,
+
+    accepted_at as granted_at,
+    indexed_at
+FROM collaborators
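
A sketch of the access check this view enables; both DIDs and the rkey are placeholders, and owners pass the same check as collaborators since both appear as grantees.

-- May this user touch this entry?
SELECT count() > 0 AS allowed
FROM permissions FINAL
WHERE resource_did = 'did:plc:owner'
  AND resource_collection = 'sh.weaver.notebook.entry'
  AND resource_rkey = '3kentry1'
  AND grantee_did = 'did:plc:visitor';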
+10-9
crates/weaver-index/src/bin/weaver_indexer.rs
···
 use clap::{Parser, Subcommand};
 use tracing::{error, info, warn};
-use weaver_index::clickhouse::{Client, Migrator, Tables};
 use weaver_index::config::{
     ClickHouseConfig, FirehoseConfig, IndexerConfig, ShardConfig, SourceMode, TapConfig,
 };
···
     let client = Client::new(&config)?;

     if reset {
         if dry_run {
-            info!("Would drop tables:");
-            for table in Tables::ALL {
-                info!("  - {}", table);
             }
         } else {
-            info!("Dropping all tables...");
-            for table in Tables::ALL {
-                let query = format!("DROP TABLE IF EXISTS {}", table);
                 match client.execute(&query).await {
-                    Ok(_) => info!("  dropped {}", table),
-                    Err(e) => warn!("  failed to drop {}: {}", table, e),
                 }
             }
         }
···
 use clap::{Parser, Subcommand};
 use tracing::{error, info, warn};
+use weaver_index::clickhouse::{Client, Migrator};
 use weaver_index::config::{
     ClickHouseConfig, FirehoseConfig, IndexerConfig, ShardConfig, SourceMode, TapConfig,
 };
···
     let client = Client::new(&config)?;

     if reset {
+        let objects = Migrator::all_objects();
         if dry_run {
+            info!("Would drop {} objects:", objects.len());
+            for obj in &objects {
+                info!("  - {} ({:?})", obj.name, obj.object_type);
             }
         } else {
+            info!("Dropping all tables and views...");
+            for obj in &objects {
+                let query = obj.drop_statement();
                 match client.execute(&query).await {
+                    Ok(_) => info!("  dropped {} ({:?})", obj.name, obj.object_type),
+                    Err(e) => warn!("  failed to drop {}: {}", obj.name, e),
                 }
             }
         }
+1-1
crates/weaver-index/src/clickhouse.rs
···
 mod schema;

 pub use client::{Client, TableSize};
-pub use migrations::{MigrationResult, Migrator};
 pub use schema::{
     AccountRevState, FirehoseCursor, RawAccountEvent, RawEventDlq, RawIdentityEvent,
     RawRecordInsert, Tables,
···
 mod schema;

 pub use client::{Client, TableSize};
+pub use migrations::{DbObject, MigrationResult, Migrator, ObjectType};
 pub use schema::{
     AccountRevState, FirehoseCursor, RawAccountEvent, RawEventDlq, RawIdentityEvent,
     RawRecordInsert, Tables,
+20-12
crates/weaver-index/src/clickhouse/client.rs
···
         collection: &str,
         rkey: &str,
     ) -> Result<Option<RecordRow>, IndexError> {
-        // FINAL ensures ReplacingMergeTree deduplication is applied
         // Order by event_time first (firehose data wins), then indexed_at as tiebreaker
         // Include deletes so we can return not-found for deleted records
         let query = r#"
             SELECT cid, record, operation
-            FROM raw_records FINAL
             WHERE did = ?
               AND collection = ?
               AND rkey = ?
···
     /// List records for a repo+collection
     ///
     /// Returns non-deleted records ordered by rkey, with cursor-based pagination.
     pub async fn list_records(
         &self,
         did: &str,
···
         let order = if reverse { "DESC" } else { "ASC" };
         let cursor_op = if reverse { "<" } else { ">" };

-        // Build query with optional cursor
         let query = if cursor.is_some() {
             format!(
                 r#"
                 SELECT rkey, cid, record
-                FROM raw_records FINAL
-                WHERE did = ?
-                  AND collection = ?
-                  AND rkey {cursor_op} ?
-                  AND operation != 'delete'
                 ORDER BY rkey {order}
                 LIMIT ?
                 "#,
···
             format!(
                 r#"
                 SELECT rkey, cid, record
-                FROM raw_records FINAL
-                WHERE did = ?
-                  AND collection = ?
-                  AND operation != 'delete'
                 ORDER BY rkey {order}
                 LIMIT ?
                 "#,
···
         collection: &str,
         rkey: &str,
     ) -> Result<Option<RecordRow>, IndexError> {
         // Order by event_time first (firehose data wins), then indexed_at as tiebreaker
         // Include deletes so we can return not-found for deleted records
         let query = r#"
             SELECT cid, record, operation
+            FROM raw_records
             WHERE did = ?
               AND collection = ?
               AND rkey = ?
···
     /// List records for a repo+collection
     ///
     /// Returns non-deleted records ordered by rkey, with cursor-based pagination.
+    /// Uses window function to get latest operation per rkey and filter out deletes.
     pub async fn list_records(
         &self,
         did: &str,
···
         let order = if reverse { "DESC" } else { "ASC" };
         let cursor_op = if reverse { "<" } else { ">" };

+        // Use window function to get latest version per rkey, then filter out deletes
         let query = if cursor.is_some() {
             format!(
                 r#"
                 SELECT rkey, cid, record
+                FROM (
+                    SELECT rkey, cid, record, operation,
+                        ROW_NUMBER() OVER (PARTITION BY rkey ORDER BY event_time DESC, indexed_at DESC) as rn
+                    FROM raw_records
+                    WHERE did = ?
+                      AND collection = ?
+                      AND rkey {cursor_op} ?
+                )
+                WHERE rn = 1 AND operation != 'delete'
                 ORDER BY rkey {order}
                 LIMIT ?
                 "#,
···
             format!(
                 r#"
                 SELECT rkey, cid, record
+                FROM (
+                    SELECT rkey, cid, record, operation,
+                        ROW_NUMBER() OVER (PARTITION BY rkey ORDER BY event_time DESC, indexed_at DESC) as rn
+                    FROM raw_records
+                    WHERE did = ?
+                      AND collection = ?
+                )
+                WHERE rn = 1 AND operation != 'delete'
                 ORDER BY rkey {order}
                 LIMIT ?
                 "#,
+87-1
crates/weaver-index/src/clickhouse/migrations.rs
···
 use crate::error::{ClickHouseError, IndexError};
 use include_dir::{Dir, include_dir};
 use tracing::info;

 use super::Client;
···
 /// Embedded migrations directory - compiled into the binary
 static MIGRATIONS_DIR: Dir = include_dir!("$CARGO_MANIFEST_DIR/migrations/clickhouse");

 /// Migration runner for ClickHouse
 pub struct Migrator<'a> {
     client: &'a Client,
···
         files
     }

     /// Run all pending migrations
     pub async fn run(&self) -> Result<MigrationResult, IndexError> {
         // First, ensure the migrations table exists (bootstrap)
···
             }

             info!(migration = %name, "applying migration");
-            self.client.execute(sql).await?;
             self.record_migration(name).await?;
             applied_count += 1;
         }
···
             applied: applied_count,
             skipped: skipped_count,
         })
     }

     /// Check which migrations would be applied without running them
···
 use crate::error::{ClickHouseError, IndexError};
 use include_dir::{Dir, include_dir};
+use regex::Regex;
 use tracing::info;

 use super::Client;
···
 /// Embedded migrations directory - compiled into the binary
 static MIGRATIONS_DIR: Dir = include_dir!("$CARGO_MANIFEST_DIR/migrations/clickhouse");

+/// Type of database object
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum ObjectType {
+    Table,
+    MaterializedView,
+    View,
+}
+
+/// A database object (table or view) extracted from migrations
+#[derive(Debug, Clone)]
+pub struct DbObject {
+    pub name: String,
+    pub object_type: ObjectType,
+}
+
+impl DbObject {
+    /// Get the DROP statement for this object
+    pub fn drop_statement(&self) -> String {
+        match self.object_type {
+            ObjectType::Table => format!("DROP TABLE IF EXISTS {}", self.name),
+            ObjectType::MaterializedView | ObjectType::View => {
+                format!("DROP VIEW IF EXISTS {}", self.name)
+            }
+        }
+    }
+}
+
 /// Migration runner for ClickHouse
 pub struct Migrator<'a> {
     client: &'a Client,
···
         files
     }
+
+    /// Extract all database objects (tables, views) from migrations
+    /// Returns them in reverse order for safe dropping (MVs before their source tables)
+    pub fn all_objects() -> Vec<DbObject> {
+        let table_re =
+            Regex::new(r"(?i)CREATE\s+TABLE\s+IF\s+NOT\s+EXISTS\s+(\w+)").unwrap();
+        let mv_re =
+            Regex::new(r"(?i)CREATE\s+MATERIALIZED\s+VIEW\s+IF\s+NOT\s+EXISTS\s+(\w+)").unwrap();
+        let view_re =
+            Regex::new(r"(?i)CREATE\s+VIEW\s+IF\s+NOT\s+EXISTS\s+(\w+)").unwrap();
+
+        let mut objects = Vec::new();
+
+        for (_, sql) in Self::migrations() {
+            // Push tables first, then views, then MVs, so the final reverse()
+            // yields MVs, views, tables within each migration as well as across them
+            for caps in table_re.captures_iter(sql) {
+                objects.push(DbObject {
+                    name: caps[1].to_string(),
+                    object_type: ObjectType::Table,
+                });
+            }
+            // Regular views; view_re cannot match materialized views, since
+            // "MATERIALIZED" sits between CREATE and VIEW, so no dedup is needed
+            for caps in view_re.captures_iter(sql) {
+                objects.push(DbObject {
+                    name: caps[1].to_string(),
+                    object_type: ObjectType::View,
+                });
+            }
+            // Materialized views
+            for caps in mv_re.captures_iter(sql) {
+                objects.push(DbObject {
+                    name: caps[1].to_string(),
+                    object_type: ObjectType::MaterializedView,
+                });
+            }
+        }
+
+        // Reverse so MVs/views come before their source tables
+        objects.reverse();
+        objects
+    }
+
     /// Run all pending migrations
     pub async fn run(&self) -> Result<MigrationResult, IndexError> {
         // First, ensure the migrations table exists (bootstrap)
···
             }

             info!(migration = %name, "applying migration");
+
+            // Split by semicolons and execute each statement
+            for statement in Self::split_statements(sql) {
+                self.client.execute(statement).await?;
+            }
+
             self.record_migration(name).await?;
             applied_count += 1;
         }
···
             applied: applied_count,
             skipped: skipped_count,
         })
+    }
+
+    /// Split SQL into individual statements
+    /// (naive split on ';' - assumes no semicolons inside string literals or
+    /// comments, which holds for these migration files)
+    fn split_statements(sql: &str) -> Vec<&str> {
+        sql.split(';')
+            .map(|s| s.trim())
+            .filter(|s| !s.is_empty())
+            .collect()
     }

     /// Check which migrations would be applied without running them