// atproto blogging
1use crate::error::{ConfigError, IndexError};
2use dashmap::DashSet;
3use url::Url;
4
5/// ClickHouse connection configuration
6#[derive(Debug, Clone)]
7pub struct ClickHouseConfig {
8 pub url: Url,
9 pub database: String,
10 pub user: String,
11 pub password: String,
12}
13
14impl ClickHouseConfig {
15 /// Load configuration from environment variables.
16 ///
17 /// Required env vars:
18 /// - `CLICKHOUSE_URL`: Full URL including protocol (e.g., `https://xyz.clickhouse.cloud:8443`)
19 /// - `CLICKHOUSE_DATABASE`: Database name
20 /// - `CLICKHOUSE_USER`: Username
21 /// - `CLICKHOUSE_PASSWORD`: Password
22 pub fn from_env() -> Result<Self, IndexError> {
23 let url_str = std::env::var("CLICKHOUSE_URL").map_err(|_| ConfigError::MissingEnv {
24 var: "CLICKHOUSE_URL",
25 })?;
26
27 let url = Url::parse(&url_str).map_err(|e| ConfigError::UrlParse {
28 url: url_str,
29 message: e.to_string(),
30 })?;
31
32 let database =
33 std::env::var("CLICKHOUSE_DATABASE").map_err(|_| ConfigError::MissingEnv {
34 var: "CLICKHOUSE_DATABASE",
35 })?;
36
37 let user = std::env::var("CLICKHOUSE_USER").map_err(|_| ConfigError::MissingEnv {
38 var: "CLICKHOUSE_USER",
39 })?;
40
41 let password =
42 std::env::var("CLICKHOUSE_PASSWORD").map_err(|_| ConfigError::MissingEnv {
43 var: "CLICKHOUSE_PASSWORD",
44 })?;
45
46 Ok(Self {
47 url,
48 database,
49 user,
50 password,
51 })
52 }
53}
54
55/// Firehose relay configuration
56#[derive(Debug, Clone)]
57pub struct FirehoseConfig {
58 pub relay_url: Url,
59 pub cursor: Option<i64>,
60}
61
62impl FirehoseConfig {
63 /// Default relay URL (Bluesky network)
64 pub const DEFAULT_RELAY: &'static str = "wss://bsky.network";
65
66 /// Load configuration from environment variables.
67 ///
68 /// Optional env vars:
69 /// - `FIREHOSE_RELAY_URL`: Relay WebSocket URL (default: wss://bsky.network)
70 /// - `FIREHOSE_CURSOR`: Starting cursor position (default: none, starts from live)
71 pub fn from_env() -> Result<Self, IndexError> {
72 let relay_str =
73 std::env::var("FIREHOSE_RELAY_URL").unwrap_or_else(|_| Self::DEFAULT_RELAY.to_string());
74
75 let relay_url = Url::parse(&relay_str).map_err(|e| ConfigError::UrlParse {
76 url: relay_str,
77 message: e.to_string(),
78 })?;
79
80 let cursor = std::env::var("FIREHOSE_CURSOR")
81 .ok()
82 .and_then(|s| s.parse().ok());
83
84 Ok(Self { relay_url, cursor })
85 }
86}
87
88use smol_str::{SmolStr, ToSmolStr};
89
90/// Pre-parsed collection filter for efficient matching
91#[derive(Debug, Clone)]
92pub struct CollectionFilter {
93 /// Prefix patterns (from "foo.*" -> "foo.")
94 prefixes: Vec<SmolStr>,
95 /// Exact match patterns (HashSet for O(1) lookup)
96 exact: DashSet<SmolStr>,
97 /// True if filter is empty (accept all)
98 accept_all: bool,
99}
100
101impl CollectionFilter {
102 /// Parse filter patterns into prefixes and exact matches
103 pub fn new(patterns: Vec<SmolStr>) -> Self {
104 let mut prefixes = Vec::new();
105 let exact = DashSet::new();
106
107 for pattern in patterns {
108 if let Some(prefix) = pattern.strip_suffix('*') {
109 prefixes.push(SmolStr::new(prefix));
110 } else {
111 exact.insert(SmolStr::new(&pattern));
112 }
113 }
114
115 let accept_all = prefixes.is_empty() && exact.is_empty();
116 Self {
117 prefixes,
118 exact,
119 accept_all,
120 }
121 }
122
123 /// Check if a collection matches any pattern
124 #[inline]
125 pub fn matches(&self, collection: &str) -> bool {
126 if self.accept_all {
127 return true;
128 }
129
130 // O(1) exact match check first
131 if self.exact.contains(collection) {
132 return true;
133 }
134
135 // Prefix check - for small N, linear scan is fine
136 // Accumulate without early return to help branch predictor
137 let mut matched = false;
138 for prefix in &self.prefixes {
139 matched |= collection.starts_with(prefix.as_str());
140 }
141 matched
142 }
143}
144
145/// Indexer runtime configuration
146#[derive(Debug, Clone)]
147pub struct IndexerConfig {
148 /// Maximum records to batch before flushing to ClickHouse
149 pub batch_size: usize,
150 /// Maximum time (ms) before flushing even if batch isn't full
151 pub flush_interval_ms: u64,
152 /// Collection filter (pre-parsed patterns)
153 pub collections: CollectionFilter,
154}
155
156impl Default for IndexerConfig {
157 fn default() -> Self {
158 Self {
159 batch_size: 1000,
160 flush_interval_ms: 1000,
161 collections: CollectionFilter::new(vec![
162 SmolStr::new_static("sh.weaver.*"),
163 SmolStr::new_static("app.bsky.actor.profile"),
164 ]),
165 }
166 }
167}
168
169impl IndexerConfig {
170 /// Load configuration from environment variables.
171 ///
172 /// Optional env vars:
173 /// - `INDEXER_BATCH_SIZE`: Max records per batch (default: 1000)
174 /// - `INDEXER_FLUSH_INTERVAL_MS`: Max ms between flushes (default: 1000)
175 /// - `INDEXER_COLLECTIONS`: Comma-separated collection patterns (default: sh.weaver.*,app.bsky.actor.profile)
176 /// Use * suffix for prefix matching, e.g., "sh.weaver.*" matches all sh.weaver.* collections
177 pub fn from_env() -> Self {
178 let batch_size = std::env::var("INDEXER_BATCH_SIZE")
179 .ok()
180 .and_then(|s| s.parse().ok())
181 .unwrap_or(1000);
182
183 let flush_interval_ms = std::env::var("INDEXER_FLUSH_INTERVAL_MS")
184 .ok()
185 .and_then(|s| s.parse().ok())
186 .unwrap_or(1000);
187
188 let patterns: Vec<SmolStr> = std::env::var("INDEXER_COLLECTIONS")
189 .map(|s| s.split(',').map(|p| p.trim().to_smolstr()).collect())
190 .unwrap_or_else(|_| {
191 vec![
192 SmolStr::new_static("sh.weaver.*"),
193 SmolStr::new_static("app.bsky.actor.profile"),
194 ]
195 });
196
197 Self {
198 batch_size,
199 flush_interval_ms,
200 collections: CollectionFilter::new(patterns),
201 }
202 }
203}
204
205/// Tap connection configuration
206#[derive(Debug, Clone)]
207pub struct TapConfig {
208 pub url: Url,
209 pub send_acks: bool,
210 pub num_workers: usize,
211}
212
213impl TapConfig {
214 /// Default tap URL (local)
215 pub const DEFAULT_URL: &'static str = "ws://localhost:2480/channel";
216 /// Default number of parallel workers
217 pub const DEFAULT_WORKERS: usize = 4;
218
219 /// Load configuration from environment variables.
220 ///
221 /// Optional env vars:
222 /// - `TAP_URL`: Tap WebSocket URL (default: ws://localhost:2480/channel)
223 /// - `TAP_SEND_ACKS`: Whether to send acks (default: true)
224 /// - `TAP_WORKERS`: Number of parallel workers (default: 4)
225 pub fn from_env() -> Result<Self, IndexError> {
226 let url_str = std::env::var("TAP_URL").unwrap_or_else(|_| Self::DEFAULT_URL.to_string());
227
228 let url = Url::parse(&url_str).map_err(|e| ConfigError::UrlParse {
229 url: url_str,
230 message: e.to_string(),
231 })?;
232
233 let send_acks = std::env::var("TAP_SEND_ACKS")
234 .map(|s| s.to_lowercase() != "false")
235 .unwrap_or(true);
236
237 let num_workers = std::env::var("TAP_WORKERS")
238 .ok()
239 .and_then(|s| s.parse().ok())
240 .unwrap_or(Self::DEFAULT_WORKERS);
241
242 Ok(Self {
243 url,
244 send_acks,
245 num_workers,
246 })
247 }
248}
249
/// Source mode for the indexer
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SourceMode {
    /// Direct firehose connection
    #[default]
    Firehose,
    /// Consume from tap
    Tap,
}

impl SourceMode {
    /// Select the source from the `INDEXER_SOURCE` environment variable.
    ///
    /// The value `tap` (any letter case, surrounding whitespace ignored)
    /// selects [`SourceMode::Tap`]. Anything else, including an unset or
    /// unreadable variable, selects the default, [`SourceMode::Firehose`].
    pub fn from_env() -> Self {
        match std::env::var("INDEXER_SOURCE") {
            // Tolerate "TAP", "Tap", " tap" etc. instead of silently falling
            // back to the firehose.
            Ok(value) if value.trim().eq_ignore_ascii_case("tap") => SourceMode::Tap,
            _ => SourceMode::default(),
        }
    }
}
268
/// SQLite shard configuration
#[derive(Debug, Clone)]
pub struct ShardConfig {
    /// Base directory for SQLite shards
    pub base_path: std::path::PathBuf,
}

impl Default for ShardConfig {
    /// Uses `./shards` as the base directory.
    fn default() -> Self {
        Self {
            base_path: std::path::PathBuf::from("./shards"),
        }
    }
}

impl ShardConfig {
    /// Load configuration from environment variables.
    ///
    /// Optional env vars:
    /// - `SHARD_BASE_PATH`: Base directory for SQLite shards (default: ./shards)
    pub fn from_env() -> Self {
        // `var_os` keeps values that are not valid Unicode. `env::var` would
        // reject such a path and the default directory would be used instead
        // of the configured one, with no indication.
        std::env::var_os("SHARD_BASE_PATH")
            .map(|raw| Self {
                base_path: std::path::PathBuf::from(raw),
            })
            .unwrap_or_default()
    }
}
296
297/// Combined configuration for the indexer
298#[derive(Debug, Clone)]
299pub struct Config {
300 pub clickhouse: ClickHouseConfig,
301 pub firehose: FirehoseConfig,
302 pub tap: TapConfig,
303 pub indexer: IndexerConfig,
304 pub shard: ShardConfig,
305 pub source: SourceMode,
306}
307
308impl Config {
309 /// Load all configuration from environment variables.
310 pub fn from_env() -> Result<Self, IndexError> {
311 Ok(Self {
312 clickhouse: ClickHouseConfig::from_env()?,
313 firehose: FirehoseConfig::from_env()?,
314 tap: TapConfig::from_env()?,
315 indexer: IndexerConfig::from_env(),
316 shard: ShardConfig::from_env(),
317 source: SourceMode::from_env(),
318 })
319 }
320}