A very fast AT Protocol indexer with flexible filtering, XRPC queries, and a cursor-backed event stream, built on fjall.
fjall
at-protocol
atproto
indexer
rust
1use miette::Result;
2use std::fmt;
3use std::path::PathBuf;
4use std::str::FromStr;
5use std::time::Duration;
6use url::Url;
7
/// Compression codec applied to the database's data and journal partitions.
///
/// Selected via `HYDRANT_DATA_COMPRESSION` / `HYDRANT_JOURNAL_COMPRESSION`
/// (see `Config::from_env`); the `FromStr` and `Display` impls use the same
/// lowercase tokens, so a parsed value round-trips.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Compression {
    /// LZ4 compression — the default for both data and journal.
    Lz4,
    /// Store blocks uncompressed.
    None,
}
13
14impl FromStr for Compression {
15 type Err = miette::Error;
16 fn from_str(s: &str) -> Result<Self> {
17 match s {
18 "lz4" => Ok(Self::Lz4),
19 "none" => Ok(Self::None),
20 _ => Err(miette::miette!("invalid compression type")),
21 }
22 }
23}
24
25impl fmt::Display for Compression {
26 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
27 match self {
28 Self::Lz4 => write!(f, "lz4"),
29 Self::None => write!(f, "none"),
30 }
31 }
32}
33
/// How strictly repository commit signatures are verified.
///
/// Selected via `HYDRANT_VERIFY_SIGNATURES` with tokens "full",
/// "backfill-only", and "none" (see the `FromStr`/`Display` impls).
#[derive(Debug, Clone, Copy)]
pub enum SignatureVerification {
    /// Verify signatures in all code paths (the default in `Config::from_env`).
    Full,
    // NOTE(review): presumably verifies only while backfilling repos and
    // skips live firehose events — confirm against the consuming code.
    BackfillOnly,
    /// Skip signature verification entirely.
    None,
}
40
41impl FromStr for SignatureVerification {
42 type Err = miette::Error;
43 fn from_str(s: &str) -> Result<Self> {
44 match s {
45 "full" => Ok(Self::Full),
46 "backfill-only" => Ok(Self::BackfillOnly),
47 "none" => Ok(Self::None),
48 _ => Err(miette::miette!("invalid signature verification level")),
49 }
50 }
51}
52
53impl fmt::Display for SignatureVerification {
54 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
55 match self {
56 Self::Full => write!(f, "full"),
57 Self::BackfillOnly => write!(f, "backfill-only"),
58 Self::None => write!(f, "none"),
59 }
60 }
61}
62
/// Runtime configuration for the indexer, assembled entirely from
/// `HYDRANT_*` environment variables by [`Config::from_env`].
///
/// Defaults quoted below are the built-in fallbacks from `from_env`; several
/// of them switch to larger values when `full_network` is set.
#[derive(Debug, Clone)]
pub struct Config {
    /// Database location (`HYDRANT_DATABASE_PATH`, default `./hydrant.db`).
    pub database_path: PathBuf,
    /// Relay websocket endpoints (`HYDRANT_RELAY_HOSTS`, comma-separated,
    /// or single `HYDRANT_RELAY_HOST`; default `wss://relay.fire.hose.cam/`).
    pub relays: Vec<Url>,
    /// PLC directory endpoints (`HYDRANT_PLC_URL`, comma-separated; default
    /// `https://plc.directory` when full-network, else `https://plc.wtf`).
    pub plc_urls: Vec<Url>,
    /// `HYDRANT_FULL_NETWORK` (default false); scales concurrency, worker
    /// counts, and DB sizing defaults upward when set.
    pub full_network: bool,
    // `HYDRANT_EPHEMERAL` (default false). NOTE(review): presumably means the
    // database is temporary/discardable — confirm with the DB setup code.
    pub ephemeral: bool,
    /// Interval between cursor persists (`HYDRANT_CURSOR_SAVE_INTERVAL`, default 3s).
    pub cursor_save_interval: Duration,
    /// Timeout for fetching a repo (`HYDRANT_REPO_FETCH_TIMEOUT`, default 300s).
    pub repo_fetch_timeout: Duration,
    /// API listen port (`HYDRANT_API_PORT`, default 3000).
    pub api_port: u16,
    /// Cache size in MB (`HYDRANT_CACHE_SIZE`, default 256).
    pub cache_size: u64,
    /// Max concurrent backfill jobs (`HYDRANT_BACKFILL_CONCURRENCY_LIMIT`,
    /// default 64 full-network / 16 otherwise).
    pub backfill_concurrency_limit: usize,
    /// Data compression (`HYDRANT_DATA_COMPRESSION`, default lz4).
    pub data_compression: Compression,
    /// Journal compression (`HYDRANT_JOURNAL_COMPRESSION`, default lz4).
    pub journal_compression: Compression,
    /// Debug server port (`HYDRANT_DEBUG_PORT`, default `api_port + 1`).
    pub debug_port: u16,
    /// Whether the debug server runs (`HYDRANT_ENABLE_DEBUG`, default false).
    pub enable_debug: bool,
    /// Signature verification level (`HYDRANT_VERIFY_SIGNATURES`, default full).
    pub verify_signatures: SignatureVerification,
    /// Identity cache size (`HYDRANT_IDENTITY_CACHE_SIZE`, default 1_000_000).
    pub identity_cache_size: u64,
    /// `HYDRANT_ENABLE_FIREHOSE` (default true).
    pub enable_firehose: bool,
    /// `HYDRANT_ENABLE_BACKFILL` (default true).
    pub enable_backfill: bool,
    /// `HYDRANT_ENABLE_CRAWLER`; `None` when unset or unparsable, letting the
    /// consumer choose its own default.
    pub enable_crawler: Option<bool>,
    /// Firehose worker count (`HYDRANT_FIREHOSE_WORKERS`, default 24 / 8).
    pub firehose_workers: usize,
    /// `HYDRANT_COMPACT_DB` (default false) — note the env name differs from
    /// the field name.
    pub db_compact: bool,
    /// DB worker threads (`HYDRANT_DB_WORKER_THREADS`, default 8 / 4).
    pub db_worker_threads: usize,
    /// Max journaling size in MB (`HYDRANT_DB_MAX_JOURNALING_SIZE_MB`,
    /// default 1024 / 400).
    pub db_max_journaling_size_mb: u64,
    /// Blocks memtable size in MB (default 192 / 32).
    pub db_blocks_memtable_size_mb: u64,
    /// Repos memtable size in MB (default: half the base memtable size).
    pub db_repos_memtable_size_mb: u64,
    /// Events memtable size in MB (default: the base memtable size).
    pub db_events_memtable_size_mb: u64,
    /// Records memtable size in MB (default: two thirds of the base size —
    /// records are small did+collection+rkey -> CID entries).
    pub db_records_memtable_size_mb: u64,
    /// Crawler pause threshold (`HYDRANT_CRAWLER_MAX_PENDING_REPOS`, default 2000).
    pub crawler_max_pending_repos: usize,
    /// Crawler resume threshold (`HYDRANT_CRAWLER_RESUME_PENDING_REPOS`, default 1000).
    pub crawler_resume_pending_repos: usize,
    /// Optional comma-separated filter lists (`HYDRANT_FILTER_SIGNALS`,
    /// `HYDRANT_FILTER_COLLECTIONS`, `HYDRANT_FILTER_EXCLUDES`); `None` when
    /// the variable is unset, empty entries stripped.
    pub filter_signals: Option<Vec<String>>,
    pub filter_collections: Option<Vec<String>>,
    pub filter_excludes: Option<Vec<String>>,
}
98
impl Config {
    /// Builds a `Config` entirely from `HYDRANT_*` environment variables,
    /// falling back to built-in defaults, several of which depend on
    /// `HYDRANT_FULL_NETWORK`.
    ///
    /// # Errors
    /// Only an invalid `HYDRANT_PLC_URL` is a hard error; every other
    /// malformed value is silently replaced by its default (and an invalid
    /// relay-host list is logged and ignored).
    pub fn from_env() -> Result<Self> {
        // Helper macro: read `HYDRANT_<key>` and parse it, falling back to
        // `$default` when the variable is absent or unparsable.
        // - the `sec` arm parses humantime durations ("3s", "5min"), with the
        //   default given in whole seconds;
        // - the plain arm uses `FromStr` + `.into()`, so the default's type
        //   (e.g. `256u64`, `"./hydrant.db"`) drives what is parsed.
        macro_rules! cfg {
            (@val $key:expr) => {
                std::env::var(concat!("HYDRANT_", $key))
            };
            ($key:expr, $default:expr, sec) => {
                cfg!(@val $key)
                    .ok()
                    .and_then(|s| humantime::parse_duration(&s).ok())
                    .unwrap_or(Duration::from_secs($default))
            };
            ($key:expr, $default:expr) => {
                cfg!(@val $key)
                    .ok()
                    .and_then(|s| s.parse().ok())
                    .unwrap_or($default.to_owned())
                    .into()
            };
        }

        let relay_host: Url = cfg!(
            "RELAY_HOST",
            Url::parse("wss://relay.fire.hose.cam/").unwrap()
        );
        // HYDRANT_RELAY_HOSTS (comma-separated) takes precedence over the
        // single-host variable; an invalid list is warned about and dropped.
        let relay_hosts = std::env::var("HYDRANT_RELAY_HOSTS")
            .ok()
            .and_then(|hosts| {
                hosts
                    .split(',')
                    .map(|s| Url::parse(s.trim()))
                    .collect::<Result<Vec<_>, _>>()
                    .inspect_err(|e| tracing::warn!("invalid relay host URL: {e}"))
                    .ok()
            })
            .unwrap_or_default();
        // Fall back to the single relay host when the list ended up empty.
        let relay_hosts = relay_hosts
            .is_empty()
            .then(|| vec![relay_host])
            .unwrap_or(relay_hosts);

        let full_network: bool = cfg!("FULL_NETWORK", false);

        // Unlike the relay list, a malformed PLC URL is a hard error (`?`).
        // Default endpoint depends on full-network mode.
        let plc_urls: Vec<Url> = std::env::var("HYDRANT_PLC_URL")
            .ok()
            .map(|s| {
                s.split(',')
                    .map(|s| Url::parse(s.trim()))
                    .collect::<Result<Vec<_>, _>>()
                    .map_err(|e| miette::miette!("invalid PLC URL: {e}"))
            })
            .unwrap_or_else(|| {
                Ok(vec![
                    full_network
                        .then_some(Url::parse("https://plc.directory").unwrap())
                        .unwrap_or(Url::parse("https://plc.wtf").unwrap()),
                ])
            })?;

        let cursor_save_interval = cfg!("CURSOR_SAVE_INTERVAL", 3, sec);
        let repo_fetch_timeout = cfg!("REPO_FETCH_TIMEOUT", 300, sec);

        let ephemeral: bool = cfg!("EPHEMERAL", false);
        let database_path = cfg!("DATABASE_PATH", "./hydrant.db");
        let cache_size = cfg!("CACHE_SIZE", 256u64);
        let data_compression = cfg!("DATA_COMPRESSION", Compression::Lz4);
        let journal_compression = cfg!("JOURNAL_COMPRESSION", Compression::Lz4);

        let api_port = cfg!("API_PORT", 3000u16);
        let enable_debug = cfg!("ENABLE_DEBUG", false);
        // Debug port defaults to api_port + 1, overridable via env.
        // NOTE(review): `api_port + 1` can overflow u16 for api_port=65535
        // (panic in debug builds) — confirm acceptable.
        let debug_port: u16 = api_port + 1;
        let debug_port = cfg!("DEBUG_PORT", debug_port);
        let verify_signatures = cfg!("VERIFY_SIGNATURES", SignatureVerification::Full);
        let identity_cache_size = cfg!("IDENTITY_CACHE_SIZE", 1_000_000u64);
        let enable_firehose = cfg!("ENABLE_FIREHOSE", true);
        let enable_backfill = cfg!("ENABLE_BACKFILL", true);
        // Tri-state: unset (or unparsable) stays `None` so downstream code
        // can apply its own default — hence no `cfg!` here.
        let enable_crawler = std::env::var("HYDRANT_ENABLE_CRAWLER")
            .ok()
            .and_then(|s| s.parse().ok());

        // Full-network deployments get larger concurrency defaults.
        let backfill_concurrency_limit = cfg!(
            "BACKFILL_CONCURRENCY_LIMIT",
            full_network.then_some(64usize).unwrap_or(16usize)
        );
        let firehose_workers = cfg!(
            "FIREHOSE_WORKERS",
            full_network.then_some(24usize).unwrap_or(8usize)
        );

        // Env name is HYDRANT_COMPACT_DB (not DB_COMPACT).
        let db_compact = cfg!("COMPACT_DB", false);

        // DB sizing defaults, again scaled by full-network mode:
        // (worker threads, max journaling MB, base memtable MB).
        let (
            default_db_worker_threads,
            default_db_max_journaling_size_mb,
            default_db_memtable_size_mb,
        ): (usize, u64, u64) = full_network
            .then_some((8usize, 1024u64, 192u64))
            .unwrap_or((4usize, 400u64, 32u64));

        let db_worker_threads = cfg!("DB_WORKER_THREADS", default_db_worker_threads);
        let db_max_journaling_size_mb = cfg!(
            "DB_MAX_JOURNALING_SIZE_MB",
            default_db_max_journaling_size_mb
        );
        let db_blocks_memtable_size_mb =
            cfg!("DB_BLOCKS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb);
        let db_events_memtable_size_mb =
            cfg!("DB_EVENTS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb);
        let db_records_memtable_size_mb = cfg!(
            "DB_RECORDS_MEMTABLE_SIZE_MB",
            // records is did + col + rkey -> CID so its pretty cheap
            default_db_memtable_size_mb / 3 * 2
        );
        let db_repos_memtable_size_mb =
            cfg!("DB_REPOS_MEMTABLE_SIZE_MB", default_db_memtable_size_mb / 2);

        let crawler_max_pending_repos = cfg!("CRAWLER_MAX_PENDING_REPOS", 2000usize);
        let crawler_resume_pending_repos = cfg!("CRAWLER_RESUME_PENDING_REPOS", 1000usize);

        // Filter lists: comma-separated, trimmed, empty entries dropped;
        // `None` when the variable is unset (no filtering configured).
        let filter_signals = std::env::var("HYDRANT_FILTER_SIGNALS").ok().map(|s| {
            s.split(',')
                .map(|s| s.trim().to_string())
                .filter(|s| !s.is_empty())
                .collect()
        });

        let filter_collections = std::env::var("HYDRANT_FILTER_COLLECTIONS").ok().map(|s| {
            s.split(',')
                .map(|s| s.trim().to_string())
                .filter(|s| !s.is_empty())
                .collect()
        });

        let filter_excludes = std::env::var("HYDRANT_FILTER_EXCLUDES").ok().map(|s| {
            s.split(',')
                .map(|s| s.trim().to_string())
                .filter(|s| !s.is_empty())
                .collect()
        });

        Ok(Self {
            database_path,
            relays: relay_hosts,
            plc_urls,
            ephemeral,
            full_network,
            cursor_save_interval,
            repo_fetch_timeout,
            api_port,
            cache_size,
            backfill_concurrency_limit,
            data_compression,
            journal_compression,
            debug_port,
            enable_debug,
            verify_signatures,
            identity_cache_size,
            enable_firehose,
            enable_backfill,
            enable_crawler,
            firehose_workers,
            db_compact,
            db_worker_threads,
            db_max_journaling_size_mb,
            db_blocks_memtable_size_mb,
            db_repos_memtable_size_mb,
            db_events_memtable_size_mb,
            db_records_memtable_size_mb,
            crawler_max_pending_repos,
            crawler_resume_pending_repos,
            filter_signals,
            filter_collections,
            filter_excludes,
        })
    }
}
275
/// Writes one left-aligned "label value" row of the config dump.
///
/// Relies on a `LABEL_WIDTH` constant being in scope at the expansion site
/// (macro_rules! item paths resolve where the macro is invoked — see the
/// `Display` impl for `Config`).
macro_rules! config_line {
    ($f:expr, $label:expr, $value:expr) => {
        writeln!($f, " {:<width$}{}", $label, $value, width = LABEL_WIDTH)
    };
}
281
impl fmt::Display for Config {
    /// Pretty-prints the effective configuration as an aligned, multi-line
    /// table (typically logged at startup). Filter lines appear only when the
    /// corresponding filter is configured; the debug port only when the debug
    /// server is enabled.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Column where values start; consumed by `config_line!`.
        const LABEL_WIDTH: usize = 27;

        writeln!(f, "hydrant configuration:")?;
        config_line!(f, "relay hosts", format_args!("{:?}", self.relays))?;
        config_line!(f, "plc urls", format_args!("{:?}", self.plc_urls))?;
        config_line!(f, "full network indexing", self.full_network)?;
        config_line!(f, "verify signatures", self.verify_signatures)?;
        config_line!(f, "backfill concurrency", self.backfill_concurrency_limit)?;
        config_line!(f, "identity cache size", self.identity_cache_size)?;
        config_line!(
            f,
            "cursor save interval",
            format_args!("{}sec", self.cursor_save_interval.as_secs())
        )?;
        config_line!(
            f,
            "repo fetch timeout",
            format_args!("{}sec", self.repo_fetch_timeout.as_secs())
        )?;
        config_line!(f, "ephemeral", self.ephemeral)?;
        config_line!(f, "database path", self.database_path.to_string_lossy())?;
        config_line!(f, "cache size", format_args!("{} mb", self.cache_size))?;
        config_line!(f, "data compression", self.data_compression)?;
        config_line!(f, "journal compression", self.journal_compression)?;
        config_line!(f, "api port", self.api_port)?;
        config_line!(f, "firehose workers", self.firehose_workers)?;
        config_line!(f, "db compact", self.db_compact)?;
        config_line!(f, "db worker threads", self.db_worker_threads)?;
        config_line!(
            f,
            "db journal size",
            format_args!("{} mb", self.db_max_journaling_size_mb)
        )?;
        config_line!(
            f,
            "db blocks memtable",
            format_args!("{} mb", self.db_blocks_memtable_size_mb)
        )?;
        config_line!(
            f,
            "db repos memtable",
            format_args!("{} mb", self.db_repos_memtable_size_mb)
        )?;
        config_line!(
            f,
            "db events memtable",
            format_args!("{} mb", self.db_events_memtable_size_mb)
        )?;
        config_line!(
            f,
            "db records memtable",
            format_args!("{} mb", self.db_records_memtable_size_mb)
        )?;
        config_line!(f, "crawler max pending", self.crawler_max_pending_repos)?;
        config_line!(
            f,
            "crawler resume pending",
            self.crawler_resume_pending_repos
        )?;
        // Optional sections: only emitted when actually configured.
        if let Some(signals) = &self.filter_signals {
            config_line!(f, "filter signals", format_args!("{:?}", signals))?;
        }
        if let Some(collections) = &self.filter_collections {
            config_line!(f, "filter collections", format_args!("{:?}", collections))?;
        }
        if let Some(excludes) = &self.filter_excludes {
            config_line!(f, "filter excludes", format_args!("{:?}", excludes))?;
        }
        config_line!(f, "enable debug", self.enable_debug)?;
        if self.enable_debug {
            config_line!(f, "debug port", self.debug_port)?;
        }
        Ok(())
    }
}
358}