at protocol indexer with flexible filtering, xrpc queries, and a cursor-backed event stream, built on fjall
at-protocol atproto indexer rust fjall

[config] allow setting filters through the config

ptr.pet bbcd660c f89fd624

verified
+67 -7
+3
README.md
··· 29 29 | `RELAY_HOST` | `wss://relay.fire.hose.cam` | websocket URL of the upstream firehose relay. | 30 30 | `PLC_URL` | `https://plc.wtf` | base URL(s) of the PLC directory (comma-separated for multiple). | 31 31 | `FULL_NETWORK` | `false` | if `true`, discovers and indexes all repositories in the network. | 32 + | `FILTER_SIGNALS` | | comma-separated list of NSID patterns to use for the signals filter on startup (e.g. `app.bsky.feed.post,app.bsky.graph.*`). | 33 + | `FILTER_COLLECTIONS` | | comma-separated list of NSID patterns to use for the collections filter on startup. | 34 + | `FILTER_EXCLUDES` | | comma-separated list of DIDs to exclude from indexing on startup. | 32 35 | `FIREHOSE_WORKERS` | `8` (`32` if full network) | number of concurrent workers for firehose events. | 33 36 | `BACKFILL_CONCURRENCY_LIMIT` | `128` | maximum number of concurrent backfill tasks. | 34 37 | `VERIFY_SIGNATURES` | `full` | signature verification level: `full`, `backfill-only`, or `none`. |
+36
src/config.rs
··· 65 65 pub db_records_memtable_size_mb: u64, 66 66 pub crawler_max_pending_repos: usize, 67 67 pub crawler_resume_pending_repos: usize, 68 + pub filter_signals: Option<Vec<String>>, 69 + pub filter_collections: Option<Vec<String>>, 70 + pub filter_excludes: Option<Vec<String>>, 68 71 } 69 72 70 73 impl Config { ··· 156 159 let crawler_max_pending_repos = cfg!("CRAWLER_MAX_PENDING_REPOS", 2000usize); 157 160 let crawler_resume_pending_repos = cfg!("CRAWLER_RESUME_PENDING_REPOS", 1000usize); 158 161 162 + let filter_signals = std::env::var("HYDRANT_FILTER_SIGNALS").ok().map(|s| { 163 + s.split(',') 164 + .map(|s| s.trim().to_string()) 165 + .filter(|s| !s.is_empty()) 166 + .collect() 167 + }); 168 + 169 + let filter_collections = std::env::var("HYDRANT_FILTER_COLLECTIONS").ok().map(|s| { 170 + s.split(',') 171 + .map(|s| s.trim().to_string()) 172 + .filter(|s| !s.is_empty()) 173 + .collect() 174 + }); 175 + 176 + let filter_excludes = std::env::var("HYDRANT_FILTER_EXCLUDES").ok().map(|s| { 177 + s.split(',') 178 + .map(|s| s.trim().to_string()) 179 + .filter(|s| !s.is_empty()) 180 + .collect() 181 + }); 182 + 159 183 Ok(Self { 160 184 database_path, 161 185 relay_host, ··· 185 209 db_records_memtable_size_mb, 186 210 crawler_max_pending_repos, 187 211 crawler_resume_pending_repos, 212 + filter_signals, 213 + filter_collections, 214 + filter_excludes, 188 215 }) 189 216 } 190 217 } ··· 272 299 " crawler resume pending: {}", 273 300 self.crawler_resume_pending_repos 274 301 )?; 302 + if let Some(signals) = &self.filter_signals { 303 + writeln!(f, " filter signals: {:?}", signals)?; 304 + } 305 + if let Some(collections) = &self.filter_collections { 306 + writeln!(f, " filter collections: {:?}", collections)?; 307 + } 308 + if let Some(excludes) = &self.filter_excludes { 309 + writeln!(f, " filter excludes: {:?}", excludes)?; 310 + } 275 311 writeln!(f, " enable debug: {}", self.enable_debug)?; 276 312 if self.enable_debug { 277 313 writeln!(f, " debug port: {}", 
self.debug_port)?;
+28 -7
src/main.rs
··· 16 16 17 17 #[tokio::main] 18 18 async fn main() -> miette::Result<()> { 19 + rustls::crypto::ring::default_provider().install_default().ok(); 20 + 19 21 let cfg = Config::from_env()?; 20 22 21 23 let env_filter = tracing_subscriber::EnvFilter::new(&cfg.log_level); ··· 25 27 26 28 let state = AppState::new(&cfg)?; 27 29 28 - if cfg.full_network { 30 + if cfg.full_network || cfg.filter_signals.is_some() || cfg.filter_collections.is_some() || cfg.filter_excludes.is_some() { 29 31 let filter_ks = state.db.filter.clone(); 30 32 let inner = state.db.inner.clone(); 33 + let full_network = cfg.full_network; 34 + let signals = cfg.filter_signals.clone(); 35 + let collections = cfg.filter_collections.clone(); 36 + let excludes = cfg.filter_excludes.clone(); 37 + 31 38 tokio::task::spawn_blocking(move || { 32 - use hydrant::db::filter::MODE_KEY; 33 - use hydrant::filter::FilterMode; 39 + use hydrant::filter::{FilterMode, SetUpdate}; 34 40 let mut batch = inner.batch(); 35 - batch.insert( 41 + 42 + let mode = if full_network { 43 + Some(FilterMode::Full) 44 + } else { 45 + None 46 + }; 47 + 48 + let signals_update = signals.map(SetUpdate::Set); 49 + let collections_update = collections.map(SetUpdate::Set); 50 + let excludes_update = excludes.map(SetUpdate::Set); 51 + 52 + hydrant::db::filter::apply_patch( 53 + &mut batch, 36 54 &filter_ks, 37 - MODE_KEY, 38 - rmp_serde::to_vec(&FilterMode::Full).into_diagnostic()?, 39 - ); 55 + mode, 56 + signals_update, 57 + collections_update, 58 + excludes_update, 59 + )?; 60 + 40 61 batch.commit().into_diagnostic() 41 62 }) 42 63 .await