at main 320 lines 9.4 kB view raw
1use crate::error::{ConfigError, IndexError}; 2use dashmap::DashSet; 3use url::Url; 4 5/// ClickHouse connection configuration 6#[derive(Debug, Clone)] 7pub struct ClickHouseConfig { 8 pub url: Url, 9 pub database: String, 10 pub user: String, 11 pub password: String, 12} 13 14impl ClickHouseConfig { 15 /// Load configuration from environment variables. 16 /// 17 /// Required env vars: 18 /// - `CLICKHOUSE_URL`: Full URL including protocol (e.g., `https://xyz.clickhouse.cloud:8443`) 19 /// - `CLICKHOUSE_DATABASE`: Database name 20 /// - `CLICKHOUSE_USER`: Username 21 /// - `CLICKHOUSE_PASSWORD`: Password 22 pub fn from_env() -> Result<Self, IndexError> { 23 let url_str = std::env::var("CLICKHOUSE_URL").map_err(|_| ConfigError::MissingEnv { 24 var: "CLICKHOUSE_URL", 25 })?; 26 27 let url = Url::parse(&url_str).map_err(|e| ConfigError::UrlParse { 28 url: url_str, 29 message: e.to_string(), 30 })?; 31 32 let database = 33 std::env::var("CLICKHOUSE_DATABASE").map_err(|_| ConfigError::MissingEnv { 34 var: "CLICKHOUSE_DATABASE", 35 })?; 36 37 let user = std::env::var("CLICKHOUSE_USER").map_err(|_| ConfigError::MissingEnv { 38 var: "CLICKHOUSE_USER", 39 })?; 40 41 let password = 42 std::env::var("CLICKHOUSE_PASSWORD").map_err(|_| ConfigError::MissingEnv { 43 var: "CLICKHOUSE_PASSWORD", 44 })?; 45 46 Ok(Self { 47 url, 48 database, 49 user, 50 password, 51 }) 52 } 53} 54 55/// Firehose relay configuration 56#[derive(Debug, Clone)] 57pub struct FirehoseConfig { 58 pub relay_url: Url, 59 pub cursor: Option<i64>, 60} 61 62impl FirehoseConfig { 63 /// Default relay URL (Bluesky network) 64 pub const DEFAULT_RELAY: &'static str = "wss://bsky.network"; 65 66 /// Load configuration from environment variables. 67 /// 68 /// Optional env vars: 69 /// - `FIREHOSE_RELAY_URL`: Relay WebSocket URL (default: wss://bsky.network) 70 /// - `FIREHOSE_CURSOR`: Starting cursor position (default: none, starts from live) 71 pub fn from_env() -> Result<Self, IndexError> { 72 let relay_str = 73 std::env::var("FIREHOSE_RELAY_URL").unwrap_or_else(|_| Self::DEFAULT_RELAY.to_string()); 74 75 let relay_url = Url::parse(&relay_str).map_err(|e| ConfigError::UrlParse { 76 url: relay_str, 77 message: e.to_string(), 78 })?; 79 80 let cursor = std::env::var("FIREHOSE_CURSOR") 81 .ok() 82 .and_then(|s| s.parse().ok()); 83 84 Ok(Self { relay_url, cursor }) 85 } 86} 87 88use smol_str::{SmolStr, ToSmolStr}; 89 90/// Pre-parsed collection filter for efficient matching 91#[derive(Debug, Clone)] 92pub struct CollectionFilter { 93 /// Prefix patterns (from "foo.*" -> "foo.") 94 prefixes: Vec<SmolStr>, 95 /// Exact match patterns (HashSet for O(1) lookup) 96 exact: DashSet<SmolStr>, 97 /// True if filter is empty (accept all) 98 accept_all: bool, 99} 100 101impl CollectionFilter { 102 /// Parse filter patterns into prefixes and exact matches 103 pub fn new(patterns: Vec<SmolStr>) -> Self { 104 let mut prefixes = Vec::new(); 105 let exact = DashSet::new(); 106 107 for pattern in patterns { 108 if let Some(prefix) = pattern.strip_suffix('*') { 109 prefixes.push(SmolStr::new(prefix)); 110 } else { 111 exact.insert(SmolStr::new(&pattern)); 112 } 113 } 114 115 let accept_all = prefixes.is_empty() && exact.is_empty(); 116 Self { 117 prefixes, 118 exact, 119 accept_all, 120 } 121 } 122 123 /// Check if a collection matches any pattern 124 #[inline] 125 pub fn matches(&self, collection: &str) -> bool { 126 if self.accept_all { 127 return true; 128 } 129 130 // O(1) exact match check first 131 if self.exact.contains(collection) { 132 return true; 133 } 134 135 // Prefix check - for small N, linear scan is fine 136 // Accumulate without early return to help branch predictor 137 let mut matched = false; 138 for prefix in &self.prefixes { 139 matched |= collection.starts_with(prefix.as_str()); 140 } 141 matched 142 } 143} 144 145/// Indexer runtime configuration 146#[derive(Debug, Clone)] 147pub struct IndexerConfig { 148 /// Maximum records to batch before flushing to ClickHouse 149 pub batch_size: usize, 150 /// Maximum time (ms) before flushing even if batch isn't full 151 pub flush_interval_ms: u64, 152 /// Collection filter (pre-parsed patterns) 153 pub collections: CollectionFilter, 154} 155 156impl Default for IndexerConfig { 157 fn default() -> Self { 158 Self { 159 batch_size: 1000, 160 flush_interval_ms: 1000, 161 collections: CollectionFilter::new(vec![ 162 SmolStr::new_static("sh.weaver.*"), 163 SmolStr::new_static("app.bsky.actor.profile"), 164 ]), 165 } 166 } 167} 168 169impl IndexerConfig { 170 /// Load configuration from environment variables. 171 /// 172 /// Optional env vars: 173 /// - `INDEXER_BATCH_SIZE`: Max records per batch (default: 1000) 174 /// - `INDEXER_FLUSH_INTERVAL_MS`: Max ms between flushes (default: 1000) 175 /// - `INDEXER_COLLECTIONS`: Comma-separated collection patterns (default: sh.weaver.*,app.bsky.actor.profile) 176 /// Use * suffix for prefix matching, e.g., "sh.weaver.*" matches all sh.weaver.* collections 177 pub fn from_env() -> Self { 178 let batch_size = std::env::var("INDEXER_BATCH_SIZE") 179 .ok() 180 .and_then(|s| s.parse().ok()) 181 .unwrap_or(1000); 182 183 let flush_interval_ms = std::env::var("INDEXER_FLUSH_INTERVAL_MS") 184 .ok() 185 .and_then(|s| s.parse().ok()) 186 .unwrap_or(1000); 187 188 let patterns: Vec<SmolStr> = std::env::var("INDEXER_COLLECTIONS") 189 .map(|s| s.split(',').map(|p| p.trim().to_smolstr()).collect()) 190 .unwrap_or_else(|_| { 191 vec![ 192 SmolStr::new_static("sh.weaver.*"), 193 SmolStr::new_static("app.bsky.actor.profile"), 194 ] 195 }); 196 197 Self { 198 batch_size, 199 flush_interval_ms, 200 collections: CollectionFilter::new(patterns), 201 } 202 } 203} 204 205/// Tap connection configuration 206#[derive(Debug, Clone)] 207pub struct TapConfig { 208 pub url: Url, 209 pub send_acks: bool, 210 pub num_workers: usize, 211} 212 213impl TapConfig { 214 /// Default tap URL (local) 215 pub const DEFAULT_URL: &'static str = "ws://localhost:2480/channel"; 216 /// Default number of parallel workers 217 pub const DEFAULT_WORKERS: usize = 4; 218 219 /// Load configuration from environment variables. 220 /// 221 /// Optional env vars: 222 /// - `TAP_URL`: Tap WebSocket URL (default: ws://localhost:2480/channel) 223 /// - `TAP_SEND_ACKS`: Whether to send acks (default: true) 224 /// - `TAP_WORKERS`: Number of parallel workers (default: 4) 225 pub fn from_env() -> Result<Self, IndexError> { 226 let url_str = std::env::var("TAP_URL").unwrap_or_else(|_| Self::DEFAULT_URL.to_string()); 227 228 let url = Url::parse(&url_str).map_err(|e| ConfigError::UrlParse { 229 url: url_str, 230 message: e.to_string(), 231 })?; 232 233 let send_acks = std::env::var("TAP_SEND_ACKS") 234 .map(|s| s.to_lowercase() != "false") 235 .unwrap_or(true); 236 237 let num_workers = std::env::var("TAP_WORKERS") 238 .ok() 239 .and_then(|s| s.parse().ok()) 240 .unwrap_or(Self::DEFAULT_WORKERS); 241 242 Ok(Self { 243 url, 244 send_acks, 245 num_workers, 246 }) 247 } 248} 249 250/// Source mode for the indexer 251#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] 252pub enum SourceMode { 253 /// Direct firehose connection 254 #[default] 255 Firehose, 256 /// Consume from tap 257 Tap, 258} 259 260impl SourceMode { 261 pub fn from_env() -> Self { 262 match std::env::var("INDEXER_SOURCE").as_deref() { 263 Ok("tap") => SourceMode::Tap, 264 _ => SourceMode::Firehose, 265 } 266 } 267} 268 269/// SQLite shard configuration 270#[derive(Debug, Clone)] 271pub struct ShardConfig { 272 pub base_path: std::path::PathBuf, 273} 274 275impl Default for ShardConfig { 276 fn default() -> Self { 277 Self { 278 base_path: std::path::PathBuf::from("./shards"), 279 } 280 } 281} 282 283impl ShardConfig { 284 /// Load configuration from environment variables. 285 /// 286 /// Optional env vars: 287 /// - `SHARD_BASE_PATH`: Base directory for SQLite shards (default: ./shards) 288 pub fn from_env() -> Self { 289 let base_path = std::env::var("SHARD_BASE_PATH") 290 .map(std::path::PathBuf::from) 291 .unwrap_or_else(|_| std::path::PathBuf::from("./shards")); 292 293 Self { base_path } 294 } 295} 296 297/// Combined configuration for the indexer 298#[derive(Debug, Clone)] 299pub struct Config { 300 pub clickhouse: ClickHouseConfig, 301 pub firehose: FirehoseConfig, 302 pub tap: TapConfig, 303 pub indexer: IndexerConfig, 304 pub shard: ShardConfig, 305 pub source: SourceMode, 306} 307 308impl Config { 309 /// Load all configuration from environment variables. 310 pub fn from_env() -> Result<Self, IndexError> { 311 Ok(Self { 312 clickhouse: ClickHouseConfig::from_env()?, 313 firehose: FirehoseConfig::from_env()?, 314 tap: TapConfig::from_env()?, 315 indexer: IndexerConfig::from_env(), 316 shard: ShardConfig::from_env(), 317 source: SourceMode::from_env(), 318 }) 319 } 320}