Very fast AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall.
fjall at-protocol atproto indexer rust
at main 358 lines 13 kB view raw
1use miette::Result; 2use std::fmt; 3use std::path::PathBuf; 4use std::str::FromStr; 5use std::time::Duration; 6use url::Url; 7 8#[derive(Debug, Clone, Copy, PartialEq, Eq)] 9pub enum Compression { 10 Lz4, 11 None, 12} 13 14impl FromStr for Compression { 15 type Err = miette::Error; 16 fn from_str(s: &str) -> Result<Self> { 17 match s { 18 "lz4" => Ok(Self::Lz4), 19 "none" => Ok(Self::None), 20 _ => Err(miette::miette!("invalid compression type")), 21 } 22 } 23} 24 25impl fmt::Display for Compression { 26 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 27 match self { 28 Self::Lz4 => write!(f, "lz4"), 29 Self::None => write!(f, "none"), 30 } 31 } 32} 33 34#[derive(Debug, Clone, Copy)] 35pub enum SignatureVerification { 36 Full, 37 BackfillOnly, 38 None, 39} 40 41impl FromStr for SignatureVerification { 42 type Err = miette::Error; 43 fn from_str(s: &str) -> Result<Self> { 44 match s { 45 "full" => Ok(Self::Full), 46 "backfill-only" => Ok(Self::BackfillOnly), 47 "none" => Ok(Self::None), 48 _ => Err(miette::miette!("invalid signature verification level")), 49 } 50 } 51} 52 53impl fmt::Display for SignatureVerification { 54 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 55 match self { 56 Self::Full => write!(f, "full"), 57 Self::BackfillOnly => write!(f, "backfill-only"), 58 Self::None => write!(f, "none"), 59 } 60 } 61} 62 63#[derive(Debug, Clone)] 64pub struct Config { 65 pub database_path: PathBuf, 66 pub relays: Vec<Url>, 67 pub plc_urls: Vec<Url>, 68 pub full_network: bool, 69 pub ephemeral: bool, 70 pub cursor_save_interval: Duration, 71 pub repo_fetch_timeout: Duration, 72 pub api_port: u16, 73 pub cache_size: u64, 74 pub backfill_concurrency_limit: usize, 75 pub data_compression: Compression, 76 pub journal_compression: Compression, 77 pub debug_port: u16, 78 pub enable_debug: bool, 79 pub verify_signatures: SignatureVerification, 80 pub identity_cache_size: u64, 81 pub enable_firehose: bool, 82 pub enable_backfill: bool, 83 pub 
enable_crawler: Option<bool>, 84 pub firehose_workers: usize, 85 pub db_compact: bool, 86 pub db_worker_threads: usize, 87 pub db_max_journaling_size_mb: u64, 88 pub db_blocks_memtable_size_mb: u64, 89 pub db_repos_memtable_size_mb: u64, 90 pub db_events_memtable_size_mb: u64, 91 pub db_records_memtable_size_mb: u64, 92 pub crawler_max_pending_repos: usize, 93 pub crawler_resume_pending_repos: usize, 94 pub filter_signals: Option<Vec<String>>, 95 pub filter_collections: Option<Vec<String>>, 96 pub filter_excludes: Option<Vec<String>>, 97} 98 99impl Config { 100 pub fn from_env() -> Result<Self> { 101 macro_rules! cfg { 102 (@val $key:expr) => { 103 std::env::var(concat!("HYDRANT_", $key)) 104 }; 105 ($key:expr, $default:expr, sec) => { 106 cfg!(@val $key) 107 .ok() 108 .and_then(|s| humantime::parse_duration(&s).ok()) 109 .unwrap_or(Duration::from_secs($default)) 110 }; 111 ($key:expr, $default:expr) => { 112 cfg!(@val $key) 113 .ok() 114 .and_then(|s| s.parse().ok()) 115 .unwrap_or($default.to_owned()) 116 .into() 117 }; 118 } 119 120 let relay_host: Url = cfg!( 121 "RELAY_HOST", 122 Url::parse("wss://relay.fire.hose.cam/").unwrap() 123 ); 124 let relay_hosts = std::env::var("HYDRANT_RELAY_HOSTS") 125 .ok() 126 .and_then(|hosts| { 127 hosts 128 .split(',') 129 .map(|s| Url::parse(s.trim())) 130 .collect::<Result<Vec<_>, _>>() 131 .inspect_err(|e| tracing::warn!("invalid relay host URL: {e}")) 132 .ok() 133 }) 134 .unwrap_or_default(); 135 let relay_hosts = relay_hosts 136 .is_empty() 137 .then(|| vec![relay_host]) 138 .unwrap_or(relay_hosts); 139 140 let full_network: bool = cfg!("FULL_NETWORK", false); 141 142 let plc_urls: Vec<Url> = std::env::var("HYDRANT_PLC_URL") 143 .ok() 144 .map(|s| { 145 s.split(',') 146 .map(|s| Url::parse(s.trim())) 147 .collect::<Result<Vec<_>, _>>() 148 .map_err(|e| miette::miette!("invalid PLC URL: {e}")) 149 }) 150 .unwrap_or_else(|| { 151 Ok(vec![ 152 full_network 153 .then_some(Url::parse("https://plc.directory").unwrap()) 154 
.unwrap_or(Url::parse("https://plc.wtf").unwrap()), 155 ]) 156 })?; 157 158 let cursor_save_interval = cfg!("CURSOR_SAVE_INTERVAL", 3, sec); 159 let repo_fetch_timeout = cfg!("REPO_FETCH_TIMEOUT", 300, sec); 160 161 let ephemeral: bool = cfg!("EPHEMERAL", false); 162 let database_path = cfg!("DATABASE_PATH", "./hydrant.db"); 163 let cache_size = cfg!("CACHE_SIZE", 256u64); 164 let data_compression = cfg!("DATA_COMPRESSION", Compression::Lz4); 165 let journal_compression = cfg!("JOURNAL_COMPRESSION", Compression::Lz4); 166 167 let api_port = cfg!("API_PORT", 3000u16); 168 let enable_debug = cfg!("ENABLE_DEBUG", false); 169 let debug_port: u16 = api_port + 1; 170 let debug_port = cfg!("DEBUG_PORT", debug_port); 171 let verify_signatures = cfg!("VERIFY_SIGNATURES", SignatureVerification::Full); 172 let identity_cache_size = cfg!("IDENTITY_CACHE_SIZE", 1_000_000u64); 173 let enable_firehose = cfg!("ENABLE_FIREHOSE", true); 174 let enable_backfill = cfg!("ENABLE_BACKFILL", true); 175 let enable_crawler = std::env::var("HYDRANT_ENABLE_CRAWLER") 176 .ok() 177 .and_then(|s| s.parse().ok()); 178 179 let backfill_concurrency_limit = cfg!( 180 "BACKFILL_CONCURRENCY_LIMIT", 181 full_network.then_some(64usize).unwrap_or(16usize) 182 ); 183 let firehose_workers = cfg!( 184 "FIREHOSE_WORKERS", 185 full_network.then_some(24usize).unwrap_or(8usize) 186 ); 187 188 let db_compact = cfg!("COMPACT_DB", false); 189 190 let ( 191 default_db_worker_threads, 192 default_db_max_journaling_size_mb, 193 default_db_memtable_size_mb, 194 ): (usize, u64, u64) = full_network 195 .then_some((8usize, 1024u64, 192u64)) 196 .unwrap_or((4usize, 400u64, 32u64)); 197 198 let db_worker_threads = cfg!("DB_WORKER_THREADS", default_db_worker_threads); 199 let db_max_journaling_size_mb = cfg!( 200 "DB_MAX_JOURNALING_SIZE_MB", 201 default_db_max_journaling_size_mb 202 ); 203 let db_blocks_memtable_size_mb = 204 cfg!("DB_BLOCKS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb); 205 let db_events_memtable_size_mb 
= 206 cfg!("DB_EVENTS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb); 207 let db_records_memtable_size_mb = cfg!( 208 "DB_RECORDS_MEMTABLE_SIZE_MB", 209 // records is did + col + rkey -> CID so its pretty cheap 210 default_db_memtable_size_mb / 3 * 2 211 ); 212 let db_repos_memtable_size_mb = 213 cfg!("DB_REPOS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb / 2); 214 215 let crawler_max_pending_repos = cfg!("CRAWLER_MAX_PENDING_REPOS", 2000usize); 216 let crawler_resume_pending_repos = cfg!("CRAWLER_RESUME_PENDING_REPOS", 1000usize); 217 218 let filter_signals = std::env::var("HYDRANT_FILTER_SIGNALS").ok().map(|s| { 219 s.split(',') 220 .map(|s| s.trim().to_string()) 221 .filter(|s| !s.is_empty()) 222 .collect() 223 }); 224 225 let filter_collections = std::env::var("HYDRANT_FILTER_COLLECTIONS").ok().map(|s| { 226 s.split(',') 227 .map(|s| s.trim().to_string()) 228 .filter(|s| !s.is_empty()) 229 .collect() 230 }); 231 232 let filter_excludes = std::env::var("HYDRANT_FILTER_EXCLUDES").ok().map(|s| { 233 s.split(',') 234 .map(|s| s.trim().to_string()) 235 .filter(|s| !s.is_empty()) 236 .collect() 237 }); 238 239 Ok(Self { 240 database_path, 241 relays: relay_hosts, 242 plc_urls, 243 ephemeral, 244 full_network, 245 cursor_save_interval, 246 repo_fetch_timeout, 247 api_port, 248 cache_size, 249 backfill_concurrency_limit, 250 data_compression, 251 journal_compression, 252 debug_port, 253 enable_debug, 254 verify_signatures, 255 identity_cache_size, 256 enable_firehose, 257 enable_backfill, 258 enable_crawler, 259 firehose_workers, 260 db_compact, 261 db_worker_threads, 262 db_max_journaling_size_mb, 263 db_blocks_memtable_size_mb, 264 db_repos_memtable_size_mb, 265 db_events_memtable_size_mb, 266 db_records_memtable_size_mb, 267 crawler_max_pending_repos, 268 crawler_resume_pending_repos, 269 filter_signals, 270 filter_collections, 271 filter_excludes, 272 }) 273 } 274} 275 276macro_rules! 
config_line { 277 ($f:expr, $label:expr, $value:expr) => { 278 writeln!($f, " {:<width$}{}", $label, $value, width = LABEL_WIDTH) 279 }; 280} 281 282impl fmt::Display for Config { 283 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 284 const LABEL_WIDTH: usize = 27; 285 286 writeln!(f, "hydrant configuration:")?; 287 config_line!(f, "relay hosts", format_args!("{:?}", self.relays))?; 288 config_line!(f, "plc urls", format_args!("{:?}", self.plc_urls))?; 289 config_line!(f, "full network indexing", self.full_network)?; 290 config_line!(f, "verify signatures", self.verify_signatures)?; 291 config_line!(f, "backfill concurrency", self.backfill_concurrency_limit)?; 292 config_line!(f, "identity cache size", self.identity_cache_size)?; 293 config_line!( 294 f, 295 "cursor save interval", 296 format_args!("{}sec", self.cursor_save_interval.as_secs()) 297 )?; 298 config_line!( 299 f, 300 "repo fetch timeout", 301 format_args!("{}sec", self.repo_fetch_timeout.as_secs()) 302 )?; 303 config_line!(f, "ephemeral", self.ephemeral)?; 304 config_line!(f, "database path", self.database_path.to_string_lossy())?; 305 config_line!(f, "cache size", format_args!("{} mb", self.cache_size))?; 306 config_line!(f, "data compression", self.data_compression)?; 307 config_line!(f, "journal compression", self.journal_compression)?; 308 config_line!(f, "api port", self.api_port)?; 309 config_line!(f, "firehose workers", self.firehose_workers)?; 310 config_line!(f, "db compact", self.db_compact)?; 311 config_line!(f, "db worker threads", self.db_worker_threads)?; 312 config_line!( 313 f, 314 "db journal size", 315 format_args!("{} mb", self.db_max_journaling_size_mb) 316 )?; 317 config_line!( 318 f, 319 "db blocks memtable", 320 format_args!("{} mb", self.db_blocks_memtable_size_mb) 321 )?; 322 config_line!( 323 f, 324 "db repos memtable", 325 format_args!("{} mb", self.db_repos_memtable_size_mb) 326 )?; 327 config_line!( 328 f, 329 "db events memtable", 330 format_args!("{} mb", 
self.db_events_memtable_size_mb) 331 )?; 332 config_line!( 333 f, 334 "db records memtable", 335 format_args!("{} mb", self.db_records_memtable_size_mb) 336 )?; 337 config_line!(f, "crawler max pending", self.crawler_max_pending_repos)?; 338 config_line!( 339 f, 340 "crawler resume pending", 341 self.crawler_resume_pending_repos 342 )?; 343 if let Some(signals) = &self.filter_signals { 344 config_line!(f, "filter signals", format_args!("{:?}", signals))?; 345 } 346 if let Some(collections) = &self.filter_collections { 347 config_line!(f, "filter collections", format_args!("{:?}", collections))?; 348 } 349 if let Some(excludes) = &self.filter_excludes { 350 config_line!(f, "filter excludes", format_args!("{:?}", excludes))?; 351 } 352 config_line!(f, "enable debug", self.enable_debug)?; 353 if self.enable_debug { 354 config_line!(f, "debug port", self.debug_port)?; 355 } 356 Ok(()) 357 } 358}