a very fast AT Protocol indexer with flexible filtering, xrpc queries, a cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer

[all] programmatic api / hydrant-as-library
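hydrant can now be embedded directly instead of run only as a standalone binary. a minimal sketch of the new entry point, based on the doc example in src/control.rs below (the event-handling body is illustrative, not part of this PR):

use futures::StreamExt;
use hydrant::control::Hydrant;

#[tokio::main]
async fn main() -> miette::Result<()> {
    // reads HYDRANT_* env vars and opens the database
    let hydrant = Hydrant::from_env().await?;

    // replay persisted record events from id 0, then tail live
    let mut stream = hydrant.subscribe(Some(0));
    tokio::spawn(async move {
        while let Some(event) = stream.next().await {
            println!("event {}", event.id);
        }
    });

    tokio::select! {
        r = hydrant.run() => r,        // ingestion, backfill, crawler
        r = hydrant.serve(3000) => r,  // optional HTTP management API
    }
}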

ptr.pet b24b23f1 b7be2730

+1600 -926
+171
examples/statusphere.rs
···
//! a statusphere indexer: tracks xyz.statusphere.status records across the ATProto network.
//!
//! statusphere is a demo app where users set a single-emoji status on their bluesky profile.
//! this example indexes those status records in real time, maintaining the current status
//! per user and printing a periodic leaderboard of the top emoji statuses in use.
//!
//! see: https://github.com/bluesky-social/statusphere-example-app
//!
//! run with:
//! HYDRANT_DATABASE_PATH=./statusphere.db cargo run --example statusphere
//!
//! the database persists records across restarts. on each start the full event
//! history is replayed from the database to rebuild the in-memory index.

use std::sync::Arc;
use std::time::Duration;

use futures::StreamExt;
use hydrant::config::Config;
use hydrant::control::{EventStream, Hydrant, ReposControl};
use hydrant::filter::FilterMode;
use scc::HashMap;

const COLLECTION: &str = "xyz.statusphere.status";

struct StatusEntry {
    emoji: String,
    created_at: String,
}

struct StatusIndex {
    /// current status per DID: only the latest by createdAt is kept.
    current: HashMap<String, StatusEntry>,
}

impl StatusIndex {
    fn new() -> Self {
        Self {
            current: HashMap::new(),
        }
    }

    fn set(&self, did: String, emoji: String, created_at: String) -> bool {
        let is_newer = self
            .current
            .read_sync(&did, |_, e| created_at > e.created_at)
            .unwrap_or(true);
        if is_newer {
            self.current
                .upsert_sync(did, StatusEntry { emoji, created_at });
        }
        is_newer
    }

    fn delete(&self, did: &str) {
        self.current.remove_sync(did);
    }

    fn top(&self, n: usize) -> Vec<(String, usize)> {
        use std::collections::HashMap;
        let mut counts: HashMap<String, usize> = HashMap::with_capacity(self.current.capacity());
        self.current.iter_sync(|_, e| {
            *counts.entry(e.emoji.clone()).or_default() += 1;
            true
        });
        let mut ranked: Vec<_> = counts.into_iter().collect();
        ranked.sort_by(|a, b| b.1.cmp(&a.1));
        ranked.truncate(n);
        ranked
    }
}

async fn run_ticker(index: Arc<StatusIndex>) {
    let mut interval = tokio::time::interval(Duration::from_secs(30));
    interval.tick().await;
    loop {
        interval.tick().await;
        let top = index.top(10);
        if top.is_empty() {
            continue;
        }
        println!(
            "\n--- top statuses ({} users tracked) ---",
            index.current.len()
        );
        for (emoji, count) in &top {
            println!("  {emoji} ×{count}");
        }
        println!("----------------------------------------\n");
    }
}

async fn handle_stream(index: Arc<StatusIndex>, repos: ReposControl, mut stream: EventStream) {
    while let Some(event) = stream.next().await {
        if let Some(rec) = event.record {
            let did = rec.did.as_str().to_owned();
            match rec.action.as_str() {
                "create" | "update" => {
                    let Some(record) = rec.record else { continue };
                    let Some(emoji) = record
                        .get("status")
                        .and_then(|v| v.as_str())
                        .map(|s| s.to_owned())
                    else {
                        continue;
                    };
                    let created_at = record
                        .get("createdAt")
                        .and_then(|v| v.as_str())
                        .unwrap_or("")
                        .to_owned();
                    if index.set(did.clone(), emoji.clone(), created_at) {
                        let name = repos
                            .get(&rec.did)
                            .await
                            .ok()
                            .flatten()
                            .and_then(|info| info.handle)
                            .unwrap_or(did);
                        println!("[{}] {name} set status: {emoji}", event.id);
                    }
                }
                "delete" => {
                    let name = repos
                        .get(&rec.did)
                        .await
                        .ok()
                        .flatten()
                        .and_then(|info| info.handle)
                        .unwrap_or(did.clone());
                    index.delete(&did);
                    println!("[{}] {name} cleared status", event.id);
                }
                _ => {}
            }
        } else if let Some(account) = event.account {
            // when an account is deactivated or deleted, drop their status.
            if !account.active {
                index.delete(account.did.as_str());
            }
        }
    }
}

#[tokio::main]
async fn main() -> miette::Result<()> {
    tracing_subscriber::fmt()
        .with_env_filter("hydrant=info")
        .init();

    let cfg = Config::from_env()?;
    let hydrant = Hydrant::new(cfg).await?;

    // discover only repos that publish xyz.statusphere.status records,
    // and only store that collection (all other record types are dropped).
    hydrant.filter.set_mode(FilterMode::Filter).await?;
    hydrant.filter.set_signals([COLLECTION]).await?;
    hydrant.filter.set_collections([COLLECTION]).await?;

    // replay all persisted events from the start to rebuild the in-memory index,
    // then switch to live tail. since the index is in-memory, we always need the
    // full replay on startup.
    let stream = hydrant.subscribe(Some(0));

    let index = Arc::new(StatusIndex::new());
    tokio::select! {
        r = hydrant.run() => r,
        _ = run_ticker(index.clone()) => Ok(()),
        _ = handle_stream(index.clone(), hydrant.repos.clone(), stream) => Ok(()),
    }
}
+17 -35
src/api/db.rs
···
- use std::sync::Arc;
-
- use crate::state::AppState;
+ use crate::control::Hydrant;
  use axum::{Router, extract::State, http::StatusCode, routing::post};
- use futures::FutureExt;
- use miette::IntoDiagnostic;

- pub fn router() -> Router<Arc<AppState>> {
+ pub fn router() -> Router<Hydrant> {
      Router::new()
          .route("/db/train", post(handle_train_dict))
          .route("/db/compact", post(handle_compact))
  }

  pub async fn handle_train_dict(
-     State(state): State<Arc<AppState>>,
- ) -> Result<StatusCode, StatusCode> {
-     state
-         .with_ingestion_paused(async || {
-             let train = |name: &'static str| {
-                 let db = state.db.clone();
-                 tokio::task::spawn_blocking(move || db.train_dict(name))
-                     .map(|res| res.into_diagnostic().flatten())
-             };
-             let repos = train("repos");
-             let blocks = train("blocks");
-             let events = train("events");
-
-             tokio::try_join!(repos, blocks, events)
-                 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
-
-             Ok(StatusCode::OK)
-         })
+     State(hydrant): State<Hydrant>,
+ ) -> Result<StatusCode, (StatusCode, String)> {
+     hydrant
+         .db
+         .train_dicts()
          .await
+         .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
+     Ok(StatusCode::OK)
  }

- pub async fn handle_compact(State(state): State<Arc<AppState>>) -> Result<StatusCode, StatusCode> {
-     state
-         .with_ingestion_paused(async || {
-             state
-                 .db
-                 .compact()
-                 .await
-                 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
-
-             Ok(StatusCode::OK)
-         })
+ pub async fn handle_compact(
+     State(hydrant): State<Hydrant>,
+ ) -> Result<StatusCode, (StatusCode, String)> {
+     hydrant
+         .db
+         .compact()
          .await
+         .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
+     Ok(StatusCode::OK)
  }
+21 -75
src/api/filter.rs
···
- use std::sync::Arc;
-
- use crate::api::AppState;
- use crate::db;
- use crate::db::filter::EXCLUDE_PREFIX;
+ use crate::control::Hydrant;
  use crate::filter::{FilterMode, SetUpdate};
  use axum::{
      Json, Router,
···
      http::StatusCode,
      routing::{get, patch},
  };
- use miette::IntoDiagnostic;
- use serde::{Deserialize, Serialize};
+ use serde::Deserialize;

- pub fn router() -> Router<Arc<AppState>> {
+ type FilterSnapshot = crate::control::FilterSnapshot;
+
+ pub fn router() -> Router<Hydrant> {
      Router::new()
          .route("/filter", get(handle_get_filter))
          .route("/filter", patch(handle_patch_filter))
  }

- #[derive(Serialize)]
- pub struct FilterResponse {
-     pub mode: FilterMode,
-     pub signals: Vec<String>,
-     pub collections: Vec<String>,
-     pub excludes: Vec<String>,
- }
-
  pub async fn handle_get_filter(
-     State(state): State<Arc<AppState>>,
- ) -> Result<Json<FilterResponse>, (StatusCode, String)> {
-     let filter_ks = state.db.filter.clone();
-     let resp = tokio::task::spawn_blocking(move || {
-         let hot = db::filter::load(&filter_ks).map_err(|e| e.to_string())?;
-         let excludes =
-             db::filter::read_set(&filter_ks, EXCLUDE_PREFIX).map_err(|e| e.to_string())?;
-         Ok::<_, String>(FilterResponse {
-             mode: hot.mode,
-             signals: hot.signals.iter().map(|s| s.to_string()).collect(),
-             collections: hot.collections.iter().map(|s| s.to_string()).collect(),
-             excludes,
-         })
-     })
-     .await
-     .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
-     .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
-
-     Ok(Json(resp))
+     State(hydrant): State<Hydrant>,
+ ) -> Result<Json<FilterSnapshot>, (StatusCode, String)> {
+     hydrant
+         .filter
+         .get()
+         .await
+         .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))
+         .map(Json)
  }

  #[derive(Deserialize)]
···
  }

  pub async fn handle_patch_filter(
-     State(state): State<Arc<AppState>>,
+     State(hydrant): State<Hydrant>,
      Json(patch): Json<FilterPatch>,
- ) -> Result<StatusCode, (StatusCode, String)> {
-     let db = &state.db;
-
-     let filter_ks = db.filter.clone();
-     let inner = db.inner.clone();
-
-     let patch_mode = patch.mode;
-     let patch_signals = patch.signals;
-     let patch_collections = patch.collections;
-     let patch_excludes = patch.excludes;
-
-     let new_filter = tokio::task::spawn_blocking(move || {
-         let mut batch = inner.batch();
-
-         db::filter::apply_patch(
-             &mut batch,
-             &filter_ks,
-             patch_mode,
-             patch_signals,
-             patch_collections,
-             patch_excludes,
-         )
-         .map_err(|e| e.to_string())?;
-
-         batch
-             .commit()
-             .into_diagnostic()
-             .map_err(|e| e.to_string())?;
-
-         let new_filter = db::filter::load(&filter_ks).map_err(|e| e.to_string())?;
-         Ok::<_, String>(new_filter)
-     })
-     .await
-     .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
-     .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
-
-     state.filter.store(Arc::new(new_filter));
-
-     Ok(StatusCode::OK)
+ ) -> Result<Json<FilterSnapshot>, (StatusCode, String)> {
+     hydrant
+         .filter
+         .patch(patch.mode, patch.signals, patch.collections, patch.excludes)
+         .await
+         .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))
+         .map(Json)
  }
+22 -12
src/api/ingestion.rs
···
- use std::sync::Arc;
-
- use crate::state::AppState;
+ use crate::control::Hydrant;
  use axum::{
      Json, Router,
      extract::State,
···
  };
  use serde::{Deserialize, Serialize};

- pub fn router() -> Router<Arc<AppState>> {
+ pub fn router() -> Router<Hydrant> {
      Router::new()
          .route("/ingestion", get(get_ingestion))
          .route("/ingestion", patch(patch_ingestion))
···
      pub backfill: bool,
  }

- pub async fn get_ingestion(State(state): State<Arc<AppState>>) -> Json<IngestionStatus> {
+ pub async fn get_ingestion(State(hydrant): State<Hydrant>) -> Json<IngestionStatus> {
      Json(IngestionStatus {
-         crawler: *state.crawler_enabled.borrow(),
-         firehose: *state.firehose_enabled.borrow(),
-         backfill: *state.backfill_enabled.borrow(),
+         crawler: hydrant.crawler.is_enabled(),
+         firehose: hydrant.firehose.is_enabled(),
+         backfill: hydrant.backfill.is_enabled(),
      })
  }

···
  }

  pub async fn patch_ingestion(
-     State(state): State<Arc<AppState>>,
+     State(hydrant): State<Hydrant>,
      Json(body): Json<IngestionPatch>,
  ) -> StatusCode {
      if let Some(crawler) = body.crawler {
-         state.crawler_enabled.send_replace(crawler);
+         if crawler {
+             hydrant.crawler.enable();
+         } else {
+             hydrant.crawler.disable();
+         }
      }
      if let Some(firehose) = body.firehose {
-         state.firehose_enabled.send_replace(firehose);
+         if firehose {
+             hydrant.firehose.enable();
+         } else {
+             hydrant.firehose.disable();
+         }
      }
      if let Some(backfill) = body.backfill {
-         state.backfill_enabled.send_replace(backfill);
+         if backfill {
+             hydrant.backfill.enable();
+         } else {
+             hydrant.backfill.disable();
+         }
      }
      StatusCode::OK
  }
+3 -2
src/api/mod.rs
···
+ use crate::control::Hydrant;
  use crate::state::AppState;
  use axum::{Router, routing::get};
  use std::{net::SocketAddr, sync::Arc};
···
  mod stream;
  mod xrpc;

- pub async fn serve(state: Arc<AppState>, port: u16) -> miette::Result<()> {
+ pub async fn serve(hydrant: Hydrant, port: u16) -> miette::Result<()> {
      let app = Router::new()
          .route("/health", get(|| async { "OK" }))
          .route("/stats", get(stats::get_stats))
···
          .merge(repos::router())
          .merge(ingestion::router())
          .merge(db::router())
-         .with_state(state)
+         .with_state(hydrant)
          .layer(TraceLayer::new_for_http())
          .layer(CorsLayer::permissive());
+38 -200
src/api/repos.rs
···
- use std::sync::Arc;
-
+ use crate::control::{Hydrant, RepoInfo, repo_state_to_info};
+ use crate::db::keys;
  use axum::{
      Json, Router,
      body::Body,
···
      response::{IntoResponse, Response},
      routing::{delete, get, put},
  };
- use chrono::{DateTime, Utc};
- use jacquard_common::{IntoStatic, types::did::Did};
- use miette::IntoDiagnostic;
- use rand::Rng;
- use serde::{Deserialize, Serialize};
-
- use crate::api::AppState;
- use crate::db::{keys, ser_repo_state};
- use crate::types::{GaugeState, RepoState};
+ use jacquard_common::types::did::Did;
+ use serde::Deserialize;

- pub fn router() -> Router<Arc<AppState>> {
+ pub fn router() -> Router<Hydrant> {
      Router::new()
          .route("/repos", get(handle_get_repos))
          .route("/repos/{did}", get(handle_get_repo))
···
      pub did: String,
  }

- #[derive(Serialize, Debug)]
- pub struct RepoResponse {
-     pub did: String,
-     pub status: String,
-     pub tracked: bool,
-     #[serde(skip_serializing_if = "Option::is_none")]
-     pub rev: Option<String>,
-     #[serde(skip_serializing_if = "Option::is_none")]
-     pub handle: Option<String>,
-     #[serde(skip_serializing_if = "Option::is_none")]
-     pub pds: Option<String>,
-     // this does not have the did:key: prefix
-     #[serde(skip_serializing_if = "Option::is_none")]
-     pub signing_key: Option<String>,
-     #[serde(skip_serializing_if = "Option::is_none")]
-     pub last_updated_at: Option<DateTime<Utc>>,
-     #[serde(skip_serializing_if = "Option::is_none")]
-     pub last_message_at: Option<DateTime<Utc>>,
- }
-
  #[derive(Deserialize)]
  pub struct GetReposParams {
      pub limit: Option<usize>,
···
  }

  pub async fn handle_get_repos(
-     State(state): State<Arc<AppState>>,
+     State(hydrant): State<Hydrant>,
      Query(params): Query<GetReposParams>,
  ) -> Result<Response, (StatusCode, String)> {
      let limit = params.limit.unwrap_or(100).min(1000);
      let partition = params.partition.unwrap_or_else(|| "all".to_string());

      let items = tokio::task::spawn_blocking(move || {
-         let db = &state.db;
+         let db = &hydrant.state.db;

-         let to_response = |k: &[u8], v: &[u8]| -> Result<RepoResponse, (StatusCode, String)> {
+         let to_info = |k: &[u8], v: &[u8]| -> Result<RepoInfo, (StatusCode, String)> {
              let repo_state = crate::db::deser_repo_state(v).map_err(internal)?;
              let did = crate::db::types::TrimmedDid::try_from(k)
                  .map_err(internal)?
                  .to_did();

-             Ok(repo_state_to_response(did.to_string(), repo_state))
+             Ok(repo_state_to_info(did.to_string(), repo_state))
          };

          let results = match partition.as_str() {
···
                  })?
              };

-             items.push(to_response(&k, &repo_state_bytes)?);
+             items.push(to_info(&k, &repo_state_bytes)?);
          }
          Ok::<_, (StatusCode, String)>(items)
      }
···
          let (_, did_key) = item.into_inner().map_err(internal)?;

          if let Ok(Some(v)) = db.repos.get(&did_key) {
-             items.push(to_response(&did_key, &v)?);
+             items.push(to_info(&did_key, &v)?);
          }
      }
      Ok(items)
···
  }

  pub async fn handle_get_repo(
-     State(state): State<Arc<AppState>>,
+     State(hydrant): State<Hydrant>,
      Path(did_str): Path<String>,
- ) -> Result<Json<RepoResponse>, (StatusCode, String)> {
+ ) -> Result<Json<RepoInfo>, (StatusCode, String)> {
      let did = Did::new(&did_str).map_err(bad_request)?;
-     let did_key = keys::repo_key(&did);

-     let item = tokio::task::spawn_blocking(move || {
-         let db = &state.db;
-
-         let repo_bytes = db.repos.get(&did_key).map_err(internal)?;
-         let repo_state = repo_bytes
-             .as_deref()
-             .map(crate::db::deser_repo_state)
-             .transpose()
-             .map_err(internal)?;
-
-         Ok(repo_state.map(|s| repo_state_to_response(did_str, s)))
-     })
-     .await
-     .map_err(internal)??;
+     let item = hydrant
+         .repos
+         .get(&did)
+         .await
+         .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;

      item.map(Json)
          .ok_or_else(|| (StatusCode::NOT_FOUND, "repository not found".to_string()))
  }

  pub async fn handle_put_repos(
-     State(state): State<Arc<AppState>>,
+     State(hydrant): State<Hydrant>,
      req: axum::extract::Request,
  ) -> Result<StatusCode, (StatusCode, String)> {
      let items = parse_body(req).await?;

-     let state_task = state.clone();
-     let (new_repo_count, gauge_transitions) = tokio::task::spawn_blocking(move || {
-         let db = &state_task.db;
-         let mut batch = db.inner.batch();
-         let mut added = 0i64;
-         let mut gauge_transitions: Vec<(GaugeState, GaugeState)> = Vec::new();
-
-         let mut rng = rand::rng();
-
-         for item in items {
-             let did = Did::new(&item.did).map_err(bad_request)?;
-             let did_key = keys::repo_key(&did);
-
-             let repo_bytes = db.repos.get(&did_key).map_err(internal)?;
-             let existing_state = repo_bytes
-                 .as_deref()
-                 .map(crate::db::deser_repo_state)
-                 .transpose()
-                 .map_err(internal)?;
-
-             if let Some(mut repo_state) = existing_state {
-                 if !repo_state.tracked {
-                     let resync_bytes = db.resync.get(&did_key).map_err(internal)?;
-                     let old_gauge =
-                         crate::db::Db::repo_gauge_state(&repo_state, resync_bytes.as_deref());
-
-                     repo_state.tracked = true;
-                     // re-enqueue into pending
-                     batch.insert(
-                         &db.repos,
-                         &did_key,
-                         ser_repo_state(&repo_state).map_err(internal)?,
-                     );
-                     batch.insert(
-                         &db.pending,
-                         keys::pending_key(repo_state.index_id),
-                         &did_key,
-                     );
-                     batch.remove(&db.resync, &did_key);
-                     gauge_transitions.push((old_gauge, GaugeState::Pending));
-                 }
-             } else {
-                 let repo_state = RepoState::backfilling(rng.next_u64());
-                 batch.insert(
-                     &db.repos,
-                     &did_key,
-                     ser_repo_state(&repo_state).map_err(internal)?,
-                 );
-                 batch.insert(
-                     &db.pending,
-                     keys::pending_key(repo_state.index_id),
-                     &did_key,
-                 );
-                 added += 1;
-                 gauge_transitions.push((GaugeState::Synced, GaugeState::Pending)); // pseudo-transition to just inc pending
-             }
-         }
-
-         batch.commit().into_diagnostic().map_err(internal)?;
-
-         Ok::<_, (StatusCode, String)>((added, gauge_transitions))
-     })
-     .await
-     .map_err(internal)??;
-
-     if new_repo_count > 0 {
-         state.db.update_count_async("repos", new_repo_count).await;
-     }
-     for (old, new) in gauge_transitions {
-         state.db.update_gauge_diff_async(&old, &new).await;
-     }
+     let dids: Vec<Did<'static>> = items
+         .into_iter()
+         .filter_map(|item| Did::new_owned(&item.did).ok())
+         .collect();

-     // Always notify backfill if anything was added to pending!
-     state.notify_backfill();
+     hydrant
+         .repos
+         .track(dids)
+         .await
+         .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;

      Ok(StatusCode::OK)
  }

  pub async fn handle_delete_repos(
-     State(state): State<Arc<AppState>>,
+     State(hydrant): State<Hydrant>,
      req: axum::extract::Request,
  ) -> Result<StatusCode, (StatusCode, String)> {
      let items = parse_body(req).await?;

-     let state_task = state.clone();
-     let (deleted_count, gauge_decrements) = tokio::task::spawn_blocking(move || {
-         let db = &state_task.db;
-         let mut batch = db.inner.batch();
-         // keeping this for later, unused for now
-         let deleted_count = 0i64;
-         let mut gauge_decrements = Vec::new();
+     let dids: Vec<Did<'static>> = items
+         .into_iter()
+         .filter_map(|item| Did::new_owned(&item.did).ok())
+         .collect();

-         for item in items {
-             let did = Did::new(&item.did).map_err(bad_request)?;
-             let did_key = keys::repo_key(&did);
-
-             let repo_bytes = db.repos.get(&did_key).map_err(internal)?;
-             let existing_state = repo_bytes
-                 .as_deref()
-                 .map(crate::db::deser_repo_state)
-                 .transpose()
-                 .map_err(internal)?;
-
-             if let Some(repo_state) = existing_state {
-                 let resync_bytes = db.resync.get(&did_key).map_err(internal)?;
-                 let old_gauge =
-                     crate::db::Db::repo_gauge_state(&repo_state, resync_bytes.as_deref());
-
-                 if repo_state.tracked {
-                     let mut repo_state = repo_state.into_static();
-                     repo_state.tracked = false;
-                     batch.insert(
-                         &db.repos,
-                         &did_key,
-                         ser_repo_state(&repo_state).map_err(internal)?,
-                     );
-                     batch.remove(&db.pending, keys::pending_key(repo_state.index_id));
-                     batch.remove(&db.resync, &did_key);
-                     if old_gauge != GaugeState::Synced {
-                         gauge_decrements.push(old_gauge);
-                     }
-                 }
-             }
-         }
-
-         batch.commit().into_diagnostic().map_err(internal)?;
-
-         Ok::<_, (StatusCode, String)>((deleted_count, gauge_decrements))
-     })
-     .await
-     .map_err(internal)??;
-
-     if deleted_count > 0 {
-         state.db.update_count_async("repos", -deleted_count).await;
-     }
-     for gauge in gauge_decrements {
-         state
-             .db
-             .update_gauge_diff_async(&gauge, &GaugeState::Synced)
-             .await;
-     }
+     hydrant
+         .repos
+         .untrack(dids)
+         .await
+         .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;

      Ok(StatusCode::OK)
- }
-
- fn repo_state_to_response(did: String, s: RepoState<'_>) -> RepoResponse {
-     RepoResponse {
-         did,
-         status: s.status.to_string(),
-         tracked: s.tracked,
-         rev: s.rev.as_ref().map(|r| r.to_string()),
-         handle: s.handle.map(|h| h.to_string()),
-         pds: s.pds.map(|p| p.to_string()),
-         signing_key: s.signing_key.map(|k| k.encode()),
-         last_updated_at: DateTime::from_timestamp_secs(s.last_updated_at),
-         last_message_at: s.last_message_time.and_then(DateTime::from_timestamp_secs),
-     }
  }

  async fn parse_body(req: axum::extract::Request) -> Result<Vec<RepoRequest>, (StatusCode, String)> {
+7 -56
src/api/stats.rs
···
- use crate::api::AppState;
- use axum::{Json, extract::State, response::Result};
- use serde::Serialize;
- use std::{collections::BTreeMap, sync::Arc};
-
- #[derive(Serialize)]
- pub struct StatsResponse {
-     pub counts: BTreeMap<&'static str, u64>,
-     pub size: BTreeMap<&'static str, u64>,
- }
-
- pub async fn get_stats(State(state): State<Arc<AppState>>) -> Result<Json<StatsResponse>> {
-     let db = state.db.clone();
+ use crate::control::Hydrant;
+ use axum::{Json, extract::State, http::StatusCode, response::IntoResponse, response::Response};

-     let mut counts: BTreeMap<&'static str, u64> = futures::future::join_all(
-         [
-             "repos",
-             "pending",
-             "resync",
-             "records",
-             "blocks",
-             "error_ratelimited",
-             "error_transport",
-             "error_generic",
-         ]
-         .into_iter()
-         .map(|name| {
-             let db = db.clone();
-             async move { (name, db.get_count(name).await) }
-         }),
-     )
-     .await
-     .into_iter()
-     .collect();
-     // this should be accurate since we dont remove events
-     // todo: ...unless in ephemeral mode
-     counts.insert("events", db.events.approximate_len() as u64);
-
-     let size = tokio::task::spawn_blocking(move || {
-         let mut size = BTreeMap::new();
-         size.insert("repos", db.repos.disk_space());
-         size.insert("records", db.records.disk_space());
-         size.insert("blocks", db.blocks.disk_space());
-         size.insert("cursors", db.cursors.disk_space());
-         size.insert("pending", db.pending.disk_space());
-         size.insert("resync", db.resync.disk_space());
-         size.insert("resync_buffer", db.resync_buffer.disk_space());
-         size.insert("events", db.events.disk_space());
-         size.insert("counts", db.counts.disk_space());
-         size.insert("filter", db.filter.disk_space());
-         size.insert("crawler", db.crawler.disk_space());
-         size
-     })
-     .await
-     .map_err(|_| axum::http::StatusCode::INTERNAL_SERVER_ERROR)?;
-
-     Ok(Json(StatsResponse { counts, size }))
+ pub async fn get_stats(State(hydrant): State<Hydrant>) -> Response {
+     match hydrant.stats().await {
+         Ok(stats) => Json(stats).into_response(),
+         Err(e) => (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
+     }
  }
+15 -235
src/api/stream.rs
···
- use crate::api::AppState;
- use crate::db::keys;
- use crate::types::{BroadcastEvent, MarshallableEvt, RecordEvt, StoredData, StoredEvent};
+ use crate::control::Hydrant;
  use axum::Router;
  use axum::routing::get;
  use axum::{
···
      },
      response::IntoResponse,
  };
- use cid::multihash::Multihash;
- use jacquard_common::types::cid::{ATP_CID_HASH, IpldCid};
- use jacquard_common::{CowStr, RawData};
- use jacquard_repo::DAG_CBOR_CID_CODEC;
- use miette::{Context, IntoDiagnostic};
+ use futures::StreamExt;
  use serde::Deserialize;
- use sha2::{Digest, Sha256};
- use std::sync::Arc;
- use tokio::sync::{broadcast, mpsc, oneshot};
- use tracing::{error, info_span};
+ use tracing::error;

- pub fn router() -> Router<Arc<AppState>> {
+ pub fn router() -> Router<Hydrant> {
      Router::new().route("/", get(handle_stream))
  }
···
  }

  pub async fn handle_stream(
-     State(state): State<Arc<AppState>>,
+     State(hydrant): State<Hydrant>,
      Query(query): Query<StreamQuery>,
      ws: WebSocketUpgrade,
  ) -> impl IntoResponse {
-     ws.on_upgrade(move |socket| handle_socket(socket, state, query))
+     ws.on_upgrade(move |socket| handle_socket(socket, hydrant, query))
  }

- async fn handle_socket(mut socket: WebSocket, state: Arc<AppState>, query: StreamQuery) {
-     let (tx, mut rx) = mpsc::channel(500);
-     let (cancel_tx, cancel_rx) = oneshot::channel::<()>();
-
-     let runtime = tokio::runtime::Handle::current();
-     let id = std::time::SystemTime::UNIX_EPOCH
-         .elapsed()
-         .unwrap()
-         .as_secs();
-
-     let thread = std::thread::Builder::new()
-         .name(format!("stream-handler-{id}"))
-         .spawn(move || {
-             let _runtime_guard = runtime.enter();
-             stream(state, cancel_rx, tx, query, id);
-         })
-         .expect("failed to spawn stream handler thread");
-
-     while let Some(msg) = rx.recv().await {
-         if let Err(e) = socket.send(msg).await {
-             error!(err = %e, "failed to send ws message");
-             break;
-         }
-     }
-
-     let _ = cancel_tx.send(());
-     if let Err(e) = thread.join() {
-         error!(err = ?e, "stream handler thread panicked");
-     }
- }
-
- fn stream(
-     state: Arc<AppState>,
-     mut cancel: oneshot::Receiver<()>,
-     tx: mpsc::Sender<Message>,
-     query: StreamQuery,
-     id: u64,
- ) {
-     let db = &state.db;
-     let mut event_rx = db.event_tx.subscribe();
-     let ks = db.events.clone();
-     let mut current_id = match query.cursor {
-         Some(cursor) => cursor.saturating_sub(1),
-         None => {
-             let max_id = db.next_event_id.load(std::sync::atomic::Ordering::SeqCst);
-             max_id.saturating_sub(1)
-         }
-     };
-     let runtime = tokio::runtime::Handle::current();
-
-     let span = info_span!("stream", id);
-     let _entered_span = span.enter();
-
-     loop {
-         // 1. catch up from DB
-         loop {
-             let mut found = false;
-             for item in ks.range(keys::event_key(current_id + 1)..) {
-                 let (k, v) = match item.into_inner() {
-                     Ok((k, v)) => (k, v),
-                     Err(e) => {
-                         error!(err = %e, "failed to read event from db");
-                         break;
-                     }
-                 };
-                 let id = match k
-                     .as_ref()
-                     .try_into()
-                     .into_diagnostic()
-                     .wrap_err("expected event id to be 8 bytes")
-                     .map(u64::from_be_bytes)
-                 {
-                     Ok(id) => id,
-                     Err(e) => {
-                         error!(err = %e, "failed to parse event id");
-                         continue;
-                     }
-                 };
-                 current_id = id;
-
-                 let StoredEvent {
-                     live,
-                     did,
-                     rev,
-                     collection,
-                     rkey,
-                     action,
-                     data,
-                 } = match rmp_serde::from_slice(&v) {
-                     Ok(e) => e,
-                     Err(e) => {
-                         error!(err = %e, "failed to deserialize stored event");
-                         continue;
-                     }
-                 };
-
-                 let _entered = info_span!("record", data = ?data).entered();
-
-                 let record = match data {
-                     StoredData::Ptr(cid) => {
-                         let block = db
-                             .blocks
-                             .get(&keys::block_key(collection.as_str(), &cid.to_bytes()));
-                         match block {
-                             Ok(Some(bytes)) => {
-                                 match serde_ipld_dagcbor::from_slice::<RawData>(&bytes) {
-                                     Ok(val) => Some((
-                                         cid,
-                                         serde_json::to_value(val)
-                                             .expect("that cbor raw data is valid json"),
-                                     )),
-                                     Err(e) => {
-                                         error!(err = %e, "cant parse block, must be corrupted?");
-                                         return;
-                                     }
-                                 }
-                             }
-                             Ok(None) => {
-                                 error!("block not found? this is a bug!!");
-                                 continue;
-                             }
-                             Err(e) => {
-                                 error!(err = %e, "can't get block");
-                                 crate::db::check_poisoned(&e);
-                                 return;
-                             }
-                         }
-                     }
-                     StoredData::Block(block) => {
-                         let digest = Sha256::digest(&block);
-                         let hash =
-                             Multihash::wrap(ATP_CID_HASH, &digest).expect("that its valid sha256");
-                         let cid = IpldCid::new_v1(DAG_CBOR_CID_CODEC, hash);
-                         match serde_ipld_dagcbor::from_slice::<RawData>(&block) {
-                             Ok(val) => Some((
-                                 cid,
-                                 serde_json::to_value(val)
-                                     .expect("that cbor raw data is valid json"),
-                             )),
-                             Err(e) => {
-                                 error!(err = %e, "cant parse block, must be corrupted?");
-                                 return;
-                             }
-                         }
-                     }
-                     StoredData::Nothing => None,
-                 };
+ async fn handle_socket(mut socket: WebSocket, hydrant: Hydrant, query: StreamQuery) {
+     let mut stream = hydrant.subscribe(query.cursor);

-                 let (cid, record) = record
-                     .map(|(c, r)| (Some(c), Some(r)))
-                     .unwrap_or((None, None));
-                 let marshallable = MarshallableEvt {
-                     id,
-                     event_type: "record".into(),
-                     record: Some(RecordEvt {
-                         live,
-                         did: did.to_did(),
-                         rev: CowStr::Owned(rev.to_tid().into()),
-                         collection,
-                         rkey: CowStr::Owned(rkey.to_smolstr().into()),
-                         action: CowStr::Borrowed(action.as_str()),
-                         record,
-                         cid: cid.map(|c| jacquard_common::types::cid::Cid::ipld(c).into()),
-                     }),
-                     identity: None,
-                     account: None,
-                 };
-
-                 let json_str = match serde_json::to_string(&marshallable) {
-                     Ok(s) => s,
-                     Err(e) => {
-                         error!(err = %e, "failed to serialize ws event");
-                         continue;
-                     }
-                 };
-
-                 if let Err(e) = tx.blocking_send(Message::Text(json_str.into())) {
-                     error!(err = %e, "failed to send ws message");
-                     return;
+     while let Some(evt) = stream.next().await {
+         match serde_json::to_string(&evt) {
+             Ok(json) => {
+                 if socket.send(Message::Text(json.into())).await.is_err() {
+                     break;
                  }
-
-                 found = true;
-             }
-             if !found {
-                 break;
-             }
-         }
-
-         // 2. wait for live events
-         let next_event = runtime.block_on(async {
-             tokio::select! {
-                 res = event_rx.recv() => Some(res),
-                 _ = &mut cancel => None,
              }
-         });
-
-         let Some(next_event) = next_event else {
-             break;
-         };
-
-         match next_event {
-             Ok(BroadcastEvent::Persisted(_)) => {
-                 // just wake up and run catch-up loop again
-             }
-             Ok(BroadcastEvent::Ephemeral(evt)) => {
-                 // send ephemeral event directly
-                 let json_str = match serde_json::to_string(&evt) {
-                     Ok(s) => s,
-                     Err(e) => {
-                         error!(err = %e, "failed to serialize ws event");
-                         continue;
-                     }
-                 };
-                 if let Err(e) = tx.blocking_send(Message::Text(json_str.into())) {
-                     error!(err = %e, "failed to send ws message");
-                     return;
-                 }
-             }
-             Err(broadcast::error::RecvError::Lagged(_)) => {
-                 // continue to catch up
-             }
-             Err(broadcast::error::RecvError::Closed) => {
-                 break;
+             Err(e) => {
+                 error!(err = %e, "failed to serialize event");
              }
          }
      }
  }
+3 -2
src/api/xrpc.rs
···
- use crate::api::AppState;
+ use crate::control::Hydrant;
  use crate::db::types::DbRkey;
  use crate::db::{self, Db, keys};
+ use crate::state::AppState;
  use axum::extract::FromRequest;
  use axum::response::IntoResponse;
  use axum::{Json, Router, extract::State, http::StatusCode};
···
  use std::{fmt::Display, sync::Arc};
  use tokio::task::spawn_blocking;

- pub fn router() -> Router<Arc<AppState>> {
+ pub fn router() -> Router<Hydrant> {
      Router::new()
          .route(
              GetRecordRequest::PATH,
+28 -15
src/config.rs
···
      pub ephemeral_ttl: Duration,
      pub cursor_save_interval: Duration,
      pub repo_fetch_timeout: Duration,
-     pub api_port: u16,
      pub cache_size: u64,
      pub backfill_concurrency_limit: usize,
      pub data_compression: Compression,
      pub journal_compression: Compression,
-     pub debug_port: u16,
-     pub enable_debug: bool,
      pub verify_signatures: SignatureVerification,
      pub identity_cache_size: u64,
      pub enable_firehose: bool,
···
      let data_compression = cfg!("DATA_COMPRESSION", Compression::Lz4);
      let journal_compression = cfg!("JOURNAL_COMPRESSION", Compression::Lz4);

-     let api_port = cfg!("API_PORT", 3000u16);
-     let enable_debug = cfg!("ENABLE_DEBUG", false);
-     let debug_port: u16 = api_port + 1;
-     let debug_port = cfg!("DEBUG_PORT", debug_port);
      let verify_signatures = cfg!("VERIFY_SIGNATURES", SignatureVerification::Full);
      let identity_cache_size = cfg!("IDENTITY_CACHE_SIZE", 1_000_000u64);
      let enable_firehose = cfg!("ENABLE_FIREHOSE", true);
···
          full_network,
          cursor_save_interval,
          repo_fetch_timeout,
-         api_port,
          cache_size,
          backfill_concurrency_limit,
          data_compression,
          journal_compression,
-         debug_port,
-         enable_debug,
          verify_signatures,
          identity_cache_size,
          enable_firehose,
···
      }
  }

+ #[derive(Debug, Clone)]
+ pub struct AppConfig {
+     pub api_port: u16,
+     pub enable_debug: bool,
+     pub debug_port: u16,
+ }
+
+ impl AppConfig {
+     pub fn from_env() -> Self {
+         macro_rules! cfg {
+             ($key:expr, $default:expr) => {
+                 std::env::var(concat!("HYDRANT_", $key))
+                     .ok()
+                     .and_then(|s| s.parse().ok())
+                     .unwrap_or($default)
+             };
+         }
+         let api_port = cfg!("API_PORT", 3000u16);
+         let enable_debug = cfg!("ENABLE_DEBUG", false);
+         let debug_port = cfg!("DEBUG_PORT", api_port + 1);
+         Self {
+             api_port,
+             enable_debug,
+             debug_port,
+         }
+     }
+ }
+
  macro_rules! config_line {
      ($f:expr, $label:expr, $value:expr) => {
          writeln!($f, " {:<width$}{}", $label, $value, width = LABEL_WIDTH)
···
          config_line!(f, "cache size", format_args!("{} mb", self.cache_size))?;
          config_line!(f, "data compression", self.data_compression)?;
          config_line!(f, "journal compression", self.journal_compression)?;
-         config_line!(f, "api port", self.api_port)?;
          config_line!(f, "firehose workers", self.firehose_workers)?;
          config_line!(f, "db worker threads", self.db_worker_threads)?;
          config_line!(
···
          }
          if let Some(excludes) = &self.filter_excludes {
              config_line!(f, "filter excludes", format_args!("{:?}", excludes))?;
-         }
-         config_line!(f, "enable debug", self.enable_debug)?;
-         if self.enable_debug {
-             config_line!(f, "debug port", self.debug_port)?;
          }
          Ok(())
      }
+1251
src/control.rs
···
use std::collections::BTreeMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::task::{Context, Poll};

use chrono::{DateTime, Utc};
use futures::{FutureExt, Stream};
use jacquard_common::types::cid::{ATP_CID_HASH, IpldCid};
use jacquard_common::types::string::Did;
use jacquard_common::{CowStr, IntoStatic, RawData};
use jacquard_repo::DAG_CBOR_CID_CODEC;
use miette::{IntoDiagnostic, Result};
use rand::Rng;
use sha2::{Digest, Sha256};
use tokio::sync::{mpsc, watch};
use tracing::{debug, error, info};

use crate::backfill::BackfillWorker;
use crate::config::{Config, SignatureVerification};
use crate::crawler::Crawler;
use crate::db::{self, filter as db_filter, keys, ser_repo_state};
use crate::filter::{FilterMode, SetUpdate};
use crate::ingest::{firehose::FirehoseIngestor, worker::FirehoseWorker};
use crate::state::AppState;
use crate::types::{
    BroadcastEvent, GaugeState, MarshallableEvt, RecordEvt, RepoState, StoredData, StoredEvent,
};

/// an event emitted by the hydrant event stream.
///
/// three variants are possible depending on the `type` field:
/// - `"record"`: a repo record was created, updated, or deleted. carries a [`RecordEvt`].
/// - `"identity"`: a DID's handle or PDS changed. carries an [`IdentityEvt`]. ephemeral, not replayable.
/// - `"account"`: a repo's active/inactive status changed. carries an [`AccountEvt`]. ephemeral, not replayable.
///
/// the `id` field is a monotonically increasing sequence number usable as a cursor for [`Hydrant::subscribe`].
pub type Event = MarshallableEvt<'static>;

/// the top-level handle to a hydrant instance.
///
/// `Hydrant` is cheaply cloneable. all sub-handles share the same underlying state.
/// construct it via [`Hydrant::new`] or [`Hydrant::from_env`], configure the filter
/// and repos as needed, then call [`Hydrant::run`] to start all background components.
///
/// # example
///
/// ```rust,no_run
/// use hydrant::control::Hydrant;
///
/// #[tokio::main]
/// async fn main() -> miette::Result<()> {
///     let hydrant = Hydrant::from_env().await?;
///
///     tokio::select! {
///         r = hydrant.run() => r,
///         r = hydrant.serve(3000) => r,
///     }
/// }
/// ```
#[derive(Clone)]
pub struct Hydrant {
    pub crawler: CrawlerHandle,
    pub firehose: FirehoseHandle,
    pub backfill: BackfillHandle,
    pub filter: FilterControl,
    pub repos: ReposControl,
    pub db: DbControl,
    pub(crate) state: Arc<AppState>,
    config: Arc<Config>,
    started: Arc<AtomicBool>,
    _priv: (),
}

impl Hydrant {
    /// open the database and configure hydrant from `config`.
    ///
    /// this sets up the database, applies any filter configuration from `config`, and
    /// initializes all sub-handles. no background tasks are started yet: call
    /// [`run`](Self::run) to start all components and drive the instance.
    pub async fn new(config: Config) -> Result<Self> {
        info!("{config}");

        // 1. open database and construct AppState
        let state = AppState::new(&config)?;

        // 2. apply any filter config from env variables
        if config.full_network
            || config.filter_signals.is_some()
            || config.filter_collections.is_some()
            || config.filter_excludes.is_some()
        {
            let filter_ks = state.db.filter.clone();
            let inner = state.db.inner.clone();
            let mode = config.full_network.then_some(FilterMode::Full);
            let signals = config.filter_signals.clone().map(SetUpdate::Set);
            let collections = config.filter_collections.clone().map(SetUpdate::Set);
            let excludes = config.filter_excludes.clone().map(SetUpdate::Set);

            tokio::task::spawn_blocking(move || {
                let mut batch = inner.batch();
                db_filter::apply_patch(
                    &mut batch,
                    &filter_ks,
                    mode,
                    signals,
                    collections,
                    excludes,
                )?;
                batch.commit().into_diagnostic()
            })
            .await
            .into_diagnostic()??;

            // 3. reload the live filter into the hot-path arc-swap
            let new_filter = tokio::task::spawn_blocking({
                let filter_ks = state.db.filter.clone();
                move || db_filter::load(&filter_ks)
            })
            .await
            .into_diagnostic()??;
            state.filter.store(Arc::new(new_filter));
        }

        // 4. set crawler enabled state from config, evaluated against the post-patch filter
        let post_patch_crawler = match config.enable_crawler {
            Some(b) => b,
            None => state.filter.load().mode == FilterMode::Full,
        };
        state.crawler_enabled.send_replace(post_patch_crawler);

        let state = Arc::new(state);

        Ok(Self {
            crawler: CrawlerHandle(state.clone()),
            firehose: FirehoseHandle(state.clone()),
            backfill: BackfillHandle(state.clone()),
            filter: FilterControl(state.clone()),
            repos: ReposControl(state.clone()),
            db: DbControl(state.clone()),
            state,
            config: Arc::new(config),
            started: Arc::new(AtomicBool::new(false)),
            _priv: (),
        })
    }

    /// reads config from environment variables and calls [`Hydrant::new`].
    pub async fn from_env() -> Result<Self> {
        Self::new(Config::from_env()?).await
    }

    /// start all background components and return a future that resolves when any
    /// fatal component exits.
    ///
    /// starts the backfill worker, firehose ingestors, crawler, and worker thread.
    /// resolves with `Ok(())` if a fatal component exits cleanly, or `Err(e)` if it
    /// fails. intended for use in `tokio::select!` alongside [`serve`](Self::serve).
    ///
    /// panics if called more than once on the same `Hydrant` instance.
    pub fn run(&self) -> impl Future<Output = Result<()>> {
        let state = self.state.clone();
        let config = self.config.clone();
        let started = self.started.clone();

        async move {
            if started.swap(true, Ordering::SeqCst) {
                panic!("Hydrant::run() called more than once");
            }

            // internal buffered channel between ingestors / backfill and the firehose worker
            let (buffer_tx, buffer_rx) = mpsc::unbounded_channel();

            // 5. spawn the backfill worker
            tokio::spawn({
                let state = state.clone();
                BackfillWorker::new(
                    state.clone(),
                    buffer_tx.clone(),
                    config.repo_fetch_timeout,
                    config.backfill_concurrency_limit,
                    matches!(
                        config.verify_signatures,
                        SignatureVerification::Full | SignatureVerification::BackfillOnly
                    ),
                    config.ephemeral,
                    state.backfill_enabled.subscribe(),
                )
                .run()
            });

            // 6. re-queue any repos that lost their backfill state, then start the retry worker
            if let Err(e) = tokio::task::spawn_blocking({
                let state = state.clone();
                move || crate::backfill::manager::queue_gone_backfills(&state)
            })
            .await
            .into_diagnostic()?
            {
                error!(err = %e, "failed to queue gone backfills");
                db::check_poisoned_report(&e);
            }

            std::thread::spawn({
                let state = state.clone();
                move || crate::backfill::manager::retry_worker(state)
            });

            // 7. ephemeral GC thread
            if config.ephemeral {
                let state = state.clone();
                std::thread::Builder::new()
                    .name("ephemeral-gc".into())
                    .spawn(move || crate::db::ephemeral::ephemeral_ttl_worker(state))
                    .into_diagnostic()?;
            }

            // 8. cursor / counts persist thread
            std::thread::spawn({
                let state = state.clone();
                let persist_interval = config.cursor_save_interval;
                move || loop {
                    std::thread::sleep(persist_interval);

                    for (relay, cursor) in &state.relay_cursors {
                        let seq = cursor.load(Ordering::SeqCst);
                        if seq > 0 {
                            if let Err(e) = db::set_firehose_cursor(&state.db, relay, seq) {
                                error!(relay = %relay, err = %e, "failed to save cursor");
                                db::check_poisoned_report(&e);
                            }
                        }
                    }

                    if let Err(e) = db::persist_counts(&state.db) {
                        error!(err = %e, "failed to persist counts");
                        db::check_poisoned_report(&e);
                    }

                    if let Err(e) = state.db.persist() {
                        error!(err = %e, "db persist failed");
                        db::check_poisoned_report(&e);
                    }
                }
            });

            // 9. events/sec stats ticker
            tokio::spawn({
                let state = state.clone();
                let mut last_id = state.db.next_event_id.load(Ordering::Relaxed);
                let mut last_time = std::time::Instant::now();
                let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
                async move {
                    loop {
                        interval.tick().await;

                        let current_id = state.db.next_event_id.load(Ordering::Relaxed);
                        let current_time = std::time::Instant::now();
                        let delta = current_id.saturating_sub(last_id);

                        if delta == 0 {
                            debug!("no new events in 60s");
                            continue;
                        }

                        let elapsed = current_time.duration_since(last_time).as_secs_f64();
                        let rate = if elapsed > 0.0 {
                            delta as f64 / elapsed
                        } else {
                            0.0
                        };
                        info!("{rate:.2} events/s ({delta} events in {elapsed:.1}s)");

                        last_id = current_id;
                        last_time = current_time;
                    }
                }
            });

            let (fatal_tx_inner, mut fatal_rx) = watch::channel(None);
            let fatal_tx = Arc::new(fatal_tx_inner);

            info!(
                crawler_enabled = *state.crawler_enabled.borrow(),
                firehose_enabled = *state.firehose_enabled.borrow(),
                filter_mode = ?state.filter.load().mode,
                "starting ingestion"
            );

            // 10. spawn one firehose ingestor per relay (fatal tasks)
            let relay_hosts = config.relays.clone();
            if !relay_hosts.is_empty() {
                info!(
                    relay_count = relay_hosts.len(),
                    hosts = relay_hosts
                        .iter()
                        .map(|h| h.as_str())
                        .collect::<Vec<_>>()
                        .join(", "),
                    "starting firehose ingestor(s)"
                );
                for relay_url in &relay_hosts {
                    let ingestor = FirehoseIngestor::new(
                        state.clone(),
                        buffer_tx.clone(),
                        relay_url.clone(),
                        state.filter.clone(),
                        state.firehose_enabled.subscribe(),
                        matches!(config.verify_signatures, SignatureVerification::Full),
                    );
                    let tx = Arc::clone(&fatal_tx);
                    tokio::spawn(async move {
                        let result = ingestor.run().await;
                        let _ = tx.send(Some(result.map_err(|e| e.to_string())));
                    });
                }
            }

            // 11. spawn the crawler if we have relay hosts to crawl
            if !relay_hosts.is_empty() {
                let crawler_rx = state.crawler_enabled.subscribe();
                info!(
                    relay_count = relay_hosts.len(),
                    hosts = relay_hosts
                        .iter()
                        .map(|h| h.as_str())
                        .collect::<Vec<_>>()
                        .join(", "),
                    enabled = *state.crawler_enabled.borrow(),
                    "starting crawler(s)"
                );
                let state = state.clone();
                let max_pending = config.crawler_max_pending_repos;
                let resume_pending = config.crawler_resume_pending_repos;
                tokio::spawn(async move {
                    let crawler =
                        Crawler::new(state, relay_hosts, max_pending, resume_pending, crawler_rx);
                    if let Err(e) = crawler.run().await {
                        error!(err = %e, "crawler error");
                        db::check_poisoned_report(&e);
                    }
                });
            }

            // 12. spawn the firehose worker on a blocking thread (fatal task)
            let handle = tokio::runtime::Handle::current();
            let firehose_worker = std::thread::spawn({
                let state = state.clone();
                move || {
                    FirehoseWorker::new(
                        state,
                        buffer_rx,
                        matches!(config.verify_signatures, SignatureVerification::Full),
                        config.ephemeral,
                        config.firehose_workers,
                    )
                    .run(handle)
                }
            });

            {
                let tx = Arc::clone(&fatal_tx);
                tokio::spawn(
                    tokio::task::spawn_blocking(move || {
                        firehose_worker
                            .join()
                            .map_err(|e| miette::miette!("buffer processor died: {e:?}"))
                    })
                    .map(move |r| {
                        let result = r.into_diagnostic().flatten().flatten();
                        let _ = tx.send(Some(result.map_err(|e| e.to_string())));
                    }),
                );
            }

            // drop the local fatal_tx so the watch channel is only kept alive by the
            // spawned tasks. when all fatal tasks exit (and drop their tx clones),
            // fatal_rx.changed() returns Err and we return Ok(()).
            drop(fatal_tx);

            loop {
                match fatal_rx.changed().await {
                    Ok(()) => {
                        if let Some(result) = fatal_rx.borrow().clone() {
                            return result.map_err(|s| miette::miette!("{s}"));
                        }
                    }
                    // all fatal_tx clones dropped: all tasks finished cleanly
                    Err(_) => return Ok(()),
                }
            }
        }
    }

    /// subscribe to the ordered event stream.
    ///
    /// returns an [`EventStream`] that implements [`futures::Stream`].
    ///
    /// - if `cursor` is `None`, streaming starts from the current head (live tail only).
    /// - if `cursor` is `Some(id)`, all persisted `record` events from that ID onward are
    ///   replayed first, then live events follow seamlessly.
    ///
    /// `identity` and `account` events are ephemeral and are never replayed from a cursor -
    /// only live occurrences are delivered. use [`ReposControl::get`] to fetch current
    /// identity/account state for a specific DID.
    ///
    /// multiple concurrent subscribers each receive a full independent copy of the stream.
    /// the stream ends when the `EventStream` is dropped.
    pub fn subscribe(&self, cursor: Option<u64>) -> EventStream {
        let (tx, rx) = mpsc::channel(500);
        let state = self.state.clone();
        let runtime = tokio::runtime::Handle::current();

        std::thread::Builder::new()
            .name("hydrant-stream".into())
            .spawn(move || {
                let _g = runtime.enter();
                event_stream_thread(state, tx, cursor);
            })
            .expect("failed to spawn stream thread");

        EventStream(rx)
    }

    /// return database counts and on-disk sizes for all keyspaces.
    ///
    /// counts include: `repos`, `pending`, `resync`, `records`, `blocks`, `events`,
    /// `error_ratelimited`, `error_transport`, `error_generic`.
    ///
    /// sizes are in bytes, reported per keyspace.
    pub async fn stats(&self) -> Result<StatsResponse> {
        let db = self.state.db.clone();

        let mut counts: BTreeMap<&'static str, u64> = futures::future::join_all(
            [
                "repos",
                "pending",
                "resync",
                "records",
                "blocks",
                "error_ratelimited",
                "error_transport",
                "error_generic",
            ]
            .into_iter()
            .map(|name| {
                let db = db.clone();
                async move { (name, db.get_count(name).await) }
            }),
        )
        .await
        .into_iter()
        .collect();

        counts.insert("events", db.events.approximate_len() as u64);

        let sizes = tokio::task::spawn_blocking(move || {
            let mut s = BTreeMap::new();
            s.insert("repos", db.repos.disk_space());
            s.insert("records", db.records.disk_space());
            s.insert("blocks", db.blocks.disk_space());
            s.insert("cursors", db.cursors.disk_space());
            s.insert("pending", db.pending.disk_space());
            s.insert("resync", db.resync.disk_space());
            s.insert("resync_buffer", db.resync_buffer.disk_space());
            s.insert("events", db.events.disk_space());
            s.insert("counts", db.counts.disk_space());
            s.insert("filter", db.filter.disk_space());
            s.insert("crawler", db.crawler.disk_space());
            s
        })
        .await
        .into_diagnostic()?;

        Ok(StatsResponse { counts, sizes })
    }

    /// returns a future that runs the HTTP management API server on `0.0.0.0:{port}`.
    ///
    /// the server exposes all management endpoints (`/filter`, `/repos`, `/ingestion`,
    /// `/stream`, `/stats`, `/db/*`, `/xrpc/*`). it runs indefinitely and resolves
    /// only on error.
    ///
    /// intended for `tokio::spawn` or inclusion in a `select!` / task list. the clone
    /// of `self` is deferred until the future is first polled.
    ///
    /// to disable the HTTP API entirely, simply don't call this method.
    pub fn serve(&self, port: u16) -> impl Future<Output = Result<()>> {
        let hydrant = self.clone();
        async move { crate::api::serve(hydrant, port).await }
    }

    /// returns a future that runs the debug HTTP API server on `127.0.0.1:{port}`.
    ///
    /// exposes internal inspection endpoints (`/debug/get`, `/debug/iter`, etc.)
    /// that are not safe to expose publicly. binds only to loopback.
    pub fn serve_debug(&self, port: u16) -> impl Future<Output = Result<()>> {
        let state = self.state.clone();
        async move { crate::api::serve_debug(state, port).await }
    }
}

impl axum::extract::FromRef<Hydrant> for Arc<AppState> {
    fn from_ref(h: &Hydrant) -> Self {
        h.state.clone()
    }
}

// --- event stream ---

/// a stream of [`Event`]s. returned by [`Hydrant::subscribe`].
///
/// implements [`futures::Stream`] and can be used with `StreamExt::next`,
/// `while let Some(evt) = stream.next().await`, `forward`, etc.
/// the stream terminates when the underlying channel closes (i.e. hydrant shuts down).
pub struct EventStream(mpsc::Receiver<Event>);

impl Stream for EventStream {
    type Item = Event;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.0.poll_recv(cx)
    }
}

// --- stats ---

/// database statistics returned by [`Hydrant::stats`].
#[derive(serde::Serialize)]
pub struct StatsResponse {
    /// record counts per logical category (repos, records, events, error kinds, etc.)
    pub counts: BTreeMap<&'static str, u64>,
    /// on-disk size in bytes per keyspace
    pub sizes: BTreeMap<&'static str, u64>,
}

// --- ingestion handles ---

/// runtime control over the crawler component.
///
/// the crawler walks `com.atproto.sync.listRepos` on each configured relay to discover
/// repositories that have never emitted a firehose event. in `filter` mode it also
/// checks each discovered repo against the configured signal collections before
/// enqueuing it for backfill.
///
/// disabling the crawler does not affect in-progress repo checks. each one completes
/// its current PDS request before pausing.
#[derive(Clone)]
pub struct CrawlerHandle(Arc<AppState>);

impl CrawlerHandle {
    /// enable the crawler. no-op if already enabled.
    pub fn enable(&self) {
        self.0.crawler_enabled.send_replace(true);
    }
    /// disable the crawler. in-progress repo checks finish before the crawler pauses.
    pub fn disable(&self) {
        self.0.crawler_enabled.send_replace(false);
    }
    /// returns the current enabled state of the crawler.
    pub fn is_enabled(&self) -> bool {
        *self.0.crawler_enabled.borrow()
    }
}

/// runtime control over the firehose ingestor component.
///
/// the firehose connects to each configured relay's `com.atproto.sync.subscribeRepos`
/// websocket and processes commit, identity, account, and sync events in real time.
/// one independent connection is maintained per relay URL.
///
/// disabling the firehose closes the websocket after the current message is processed.
#[derive(Clone)]
pub struct FirehoseHandle(Arc<AppState>);

impl FirehoseHandle {
    /// enable the firehose. no-op if already enabled.
    pub fn enable(&self) {
        self.0.firehose_enabled.send_replace(true);
    }
    /// disable the firehose. the current message finishes processing before the connection closes.
    pub fn disable(&self) {
        self.0.firehose_enabled.send_replace(false);
    }
    /// returns the current enabled state of the firehose.
    pub fn is_enabled(&self) -> bool {
        *self.0.firehose_enabled.borrow()
    }
}

/// runtime control over the backfill worker component.
///
/// the backfill worker fetches full repo CAR files from each repo's PDS for any
/// repository in the pending queue, parses the MST, and inserts all matching records
/// into the database. concurrency is bounded by `HYDRANT_BACKFILL_CONCURRENCY_LIMIT`.
///
/// disabling backfill lets any in-flight repo fetches finish before pausing.
#[derive(Clone)]
pub struct BackfillHandle(Arc<AppState>);

impl BackfillHandle {
    /// enable the backfill worker. no-op if already enabled.
    pub fn enable(&self) {
        self.0.backfill_enabled.send_replace(true);
    }
    /// disable the backfill worker. in-flight repo fetches complete before pausing.
    pub fn disable(&self) {
        self.0.backfill_enabled.send_replace(false);
    }
    /// returns the current enabled state of the backfill worker.
    pub fn is_enabled(&self) -> bool {
        *self.0.backfill_enabled.borrow()
    }
}

// --- filter control ---

/// a point-in-time snapshot of the filter configuration. returned by all [`FilterControl`] methods.
///
/// because the filter is stored in the database and loaded on demand, this snapshot
/// may be stale if another caller modifies the filter concurrently. for the authoritative
/// live config use [`FilterControl::get`].
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct FilterSnapshot {
    pub mode: FilterMode,
    pub signals: Vec<String>,
    pub collections: Vec<String>,
    pub excludes: Vec<String>,
}

/// runtime control over the indexing filter.
///
/// the filter has two orthogonal axes:
///
/// **mode** controls discovery:
/// - [`FilterMode::Filter`]: only indexes repos whose firehose commits touch a collection
///   matching a configured `signal`. explicit [`ReposControl::track`] always works regardless.
/// - [`FilterMode::Full`]: indexes the entire network. `signals` are ignored for discovery
///   but `collections` and `excludes` still apply.
///
/// **sets** are each independently configurable:
/// - `signals`: NSID patterns that trigger auto-discovery in `filter` mode (e.g. `app.bsky.feed.post`, `app.bsky.graph.*`)
/// - `collections`: NSID patterns that filter which records are *stored*. empty means store all.
/// - `excludes`: DIDs that are always skipped regardless of mode.
///
/// NSID patterns support an optional `.*` suffix to match an entire namespace.
/// all mutations are persisted to the database and take effect immediately.
#[derive(Clone)]
pub struct FilterControl(Arc<AppState>);

impl FilterControl {
    /// return the current filter configuration from the database.
    pub async fn get(&self) -> Result<FilterSnapshot> {
        let filter_ks = self.0.db.filter.clone();
        tokio::task::spawn_blocking(move || {
            let hot = db_filter::load(&filter_ks)?;
            let excludes = db_filter::read_set(&filter_ks, db_filter::EXCLUDE_PREFIX)?;
            Ok(FilterSnapshot {
                mode: hot.mode,
                signals: hot.signals.iter().map(|s| s.to_string()).collect(),
                collections: hot.collections.iter().map(|s| s.to_string()).collect(),
                excludes,
            })
        })
        .await
        .into_diagnostic()?
    }

    /// set the indexing mode. see [`FilterControl`] for mode semantics.
    pub async fn set_mode(&self, mode: FilterMode) -> Result<FilterSnapshot> {
        self.patch(Some(mode), None, None, None).await
    }

    /// replace the entire signals set. existing signals are removed.
    pub async fn set_signals(
        &self,
        signals: impl IntoIterator<Item = impl Into<String>>,
    ) -> Result<FilterSnapshot> {
        self.patch(
            None,
            Some(SetUpdate::Set(
                signals.into_iter().map(Into::into).collect(),
            )),
            None,
            None,
        )
        .await
    }

    /// add multiple signals without disturbing existing ones.
    pub async fn append_signals(
        &self,
        signals: impl IntoIterator<Item = impl Into<String>>,
    ) -> Result<FilterSnapshot> {
        self.patch(
            None,
            Some(SetUpdate::Patch(
                signals.into_iter().map(|s| (s.into(), true)).collect(),
            )),
            None,
            None,
        )
        .await
    }

    /// add a single signal. no-op if already present.
    pub async fn add_signal(&self, signal: impl Into<String>) -> Result<FilterSnapshot> {
        self.patch(
            None,
            Some(SetUpdate::Patch([(signal.into(), true)].into())),
            None,
            None,
        )
        .await
    }

    /// remove a single signal. no-op if not present.
    pub async fn remove_signal(&self, signal: impl Into<String>) -> Result<FilterSnapshot> {
        self.patch(
            None,
            Some(SetUpdate::Patch([(signal.into(), false)].into())),
            None,
            None,
        )
        .await
    }

    /// replace the entire collections set. pass an empty iterator to store all collections.
    pub async fn set_collections(
        &self,
        collections: impl IntoIterator<Item = impl Into<String>>,
    ) -> Result<FilterSnapshot> {
        self.patch(
            None,
            None,
            Some(SetUpdate::Set(
                collections.into_iter().map(Into::into).collect(),
            )),
            None,
        )
        .await
    }

    /// add multiple collections without disturbing existing ones.
    pub async fn append_collections(
        &self,
        collections: impl IntoIterator<Item = impl Into<String>>,
    ) -> Result<FilterSnapshot> {
        self.patch(
            None,
            None,
            Some(SetUpdate::Patch(
                collections.into_iter().map(|c| (c.into(), true)).collect(),
            )),
            None,
        )
        .await
    }

    /// add a single collection filter. no-op if already present.
    pub async fn add_collection(&self, collection: impl Into<String>) -> Result<FilterSnapshot> {
        self.patch(
            None,
            None,
            Some(SetUpdate::Patch([(collection.into(), true)].into())),
            None,
        )
        .await
    }

    /// remove a single collection filter. no-op if not present.
773 + pub async fn remove_collection(&self, collection: impl Into<String>) -> Result<FilterSnapshot> { 774 + self.patch( 775 + None, 776 + None, 777 + Some(SetUpdate::Patch([(collection.into(), false)].into())), 778 + None, 779 + ) 780 + .await 781 + } 782 + 783 + /// replace the entire excludes set. 784 + pub async fn set_excludes( 785 + &self, 786 + excludes: impl IntoIterator<Item = impl Into<String>>, 787 + ) -> Result<FilterSnapshot> { 788 + self.patch( 789 + None, 790 + None, 791 + None, 792 + Some(SetUpdate::Set( 793 + excludes.into_iter().map(Into::into).collect(), 794 + )), 795 + ) 796 + .await 797 + } 798 + 799 + /// add multiple DIDs to the excludes set without disturbing existing ones. 800 + pub async fn append_excludes( 801 + &self, 802 + excludes: impl IntoIterator<Item = impl Into<String>>, 803 + ) -> Result<FilterSnapshot> { 804 + self.patch( 805 + None, 806 + None, 807 + None, 808 + Some(SetUpdate::Patch( 809 + excludes.into_iter().map(|d| (d.into(), true)).collect(), 810 + )), 811 + ) 812 + .await 813 + } 814 + 815 + /// add a single DID to the excludes set. no-op if already excluded. 816 + pub async fn add_exclude(&self, did: impl Into<String>) -> Result<FilterSnapshot> { 817 + self.patch( 818 + None, 819 + None, 820 + None, 821 + Some(SetUpdate::Patch([(did.into(), true)].into())), 822 + ) 823 + .await 824 + } 825 + 826 + /// remove a single DID from the excludes set. no-op if not present. 827 + pub async fn remove_exclude(&self, did: impl Into<String>) -> Result<FilterSnapshot> { 828 + self.patch( 829 + None, 830 + None, 831 + None, 832 + Some(SetUpdate::Patch([(did.into(), false)].into())), 833 + ) 834 + .await 835 + } 836 + 837 + /// apply a batch patch atomically. all provided fields are updated in a single db transaction. 838 + /// returns the updated [`FilterSnapshot`]. this is the primitive all other `FilterControl` methods delegate to. 839 + pub async fn patch( 840 + &self, 841 + mode: Option<FilterMode>, 842 + signals: Option<SetUpdate>, 843 + collections: Option<SetUpdate>, 844 + excludes: Option<SetUpdate>, 845 + ) -> Result<FilterSnapshot> { 846 + let filter_ks = self.0.db.filter.clone(); 847 + let inner = self.0.db.inner.clone(); 848 + let filter_handle = self.0.filter.clone(); 849 + 850 + let new_filter = tokio::task::spawn_blocking(move || { 851 + let mut batch = inner.batch(); 852 + db_filter::apply_patch(&mut batch, &filter_ks, mode, signals, collections, excludes)?; 853 + batch.commit().into_diagnostic()?; 854 + db_filter::load(&filter_ks) 855 + }) 856 + .await 857 + .into_diagnostic()??; 858 + 859 + let excludes = { 860 + let filter_ks = self.0.db.filter.clone(); 861 + tokio::task::spawn_blocking(move || { 862 + db_filter::read_set(&filter_ks, db_filter::EXCLUDE_PREFIX) 863 + }) 864 + .await 865 + .into_diagnostic()?? 866 + }; 867 + 868 + let snapshot = FilterSnapshot { 869 + mode: new_filter.mode, 870 + signals: new_filter.signals.iter().map(|s| s.to_string()).collect(), 871 + collections: new_filter 872 + .collections 873 + .iter() 874 + .map(|s| s.to_string()) 875 + .collect(), 876 + excludes, 877 + }; 878 + 879 + filter_handle.store(Arc::new(new_filter)); 880 + Ok(snapshot) 881 + } 882 + } 883 + 884 + // --- repos control --- 885 + 886 + /// information about a tracked or known repository. returned by [`ReposControl`] methods. 
887 + #[derive(Debug, Clone, serde::Serialize)] 888 + pub struct RepoInfo { 889 + pub did: String, 890 + pub status: String, 891 + pub tracked: bool, 892 + #[serde(skip_serializing_if = "Option::is_none")] 893 + pub rev: Option<String>, 894 + #[serde(skip_serializing_if = "Option::is_none")] 895 + pub handle: Option<String>, 896 + #[serde(skip_serializing_if = "Option::is_none")] 897 + pub pds: Option<String>, 898 + #[serde(skip_serializing_if = "Option::is_none")] 899 + pub signing_key: Option<String>, 900 + #[serde(skip_serializing_if = "Option::is_none")] 901 + pub last_updated_at: Option<DateTime<Utc>>, 902 + #[serde(skip_serializing_if = "Option::is_none")] 903 + pub last_message_at: Option<DateTime<Utc>>, 904 + } 905 + 906 + /// control over which repositories are tracked and access to their state. 907 + /// 908 + /// in `filter` mode, a repo is only indexed if it either matches a signal or is 909 + /// explicitly tracked via [`ReposControl::track`]. in `full` mode all repos are indexed 910 + /// and tracking is implicit. 911 + /// 912 + /// tracking a DID that hydrant has never seen enqueues an immediate backfill. 913 + /// tracking a DID that hydrant already knows about (but has marked untracked) 914 + /// re-enqueues it for backfill. 915 + #[derive(Clone)] 916 + pub struct ReposControl(Arc<AppState>); 917 + 918 + impl ReposControl { 919 + /// fetch the current state of a single repository. returns `None` if hydrant 920 + /// has never seen this DID. 921 + pub async fn get(&self, did: &Did<'_>) -> Result<Option<RepoInfo>> { 922 + let did_key = keys::repo_key(did); 923 + let did_str = did.as_str().to_owned(); 924 + let db = self.0.db.clone(); 925 + 926 + tokio::task::spawn_blocking(move || { 927 + let bytes = db.repos.get(&did_key).into_diagnostic()?; 928 + let state = bytes.as_deref().map(db::deser_repo_state).transpose()?; 929 + Ok(state.map(|s| repo_state_to_info(did_str, s))) 930 + }) 931 + .await 932 + .into_diagnostic()? 933 + } 934 + 935 + /// explicitly track one or more repositories, enqueuing them for backfill if needed. 936 + /// 937 + /// - if a DID is new, a fresh [`RepoState`] is created and backfill is queued. 938 + /// - if a DID is already known but untracked, it is marked tracked and re-enqueued. 939 + /// - if a DID is already tracked, this is a no-op. 
940 + pub async fn track(&self, dids: impl IntoIterator<Item = Did<'_>>) -> Result<()> { 941 + let dids: Vec<Did<'static>> = dids.into_iter().map(|d| d.into_static()).collect(); 942 + let state = self.0.clone(); 943 + 944 + let (new_count, transitions) = tokio::task::spawn_blocking(move || { 945 + let db = &state.db; 946 + let mut batch = db.inner.batch(); 947 + let mut added = 0i64; 948 + let mut transitions: Vec<(GaugeState, GaugeState)> = Vec::new(); 949 + let mut rng = rand::rng(); 950 + 951 + for did in &dids { 952 + let did_key = keys::repo_key(did); 953 + let repo_bytes = db.repos.get(&did_key).into_diagnostic()?; 954 + let existing = repo_bytes 955 + .as_deref() 956 + .map(db::deser_repo_state) 957 + .transpose()?; 958 + 959 + if let Some(mut repo_state) = existing { 960 + if !repo_state.tracked { 961 + let resync = db.resync.get(&did_key).into_diagnostic()?; 962 + let old = db::Db::repo_gauge_state(&repo_state, resync.as_deref()); 963 + repo_state.tracked = true; 964 + batch.insert(&db.repos, &did_key, ser_repo_state(&repo_state)?); 965 + batch.insert( 966 + &db.pending, 967 + keys::pending_key(repo_state.index_id), 968 + &did_key, 969 + ); 970 + batch.remove(&db.resync, &did_key); 971 + transitions.push((old, GaugeState::Pending)); 972 + } 973 + } else { 974 + let repo_state = RepoState::backfilling(rng.next_u64()); 975 + batch.insert(&db.repos, &did_key, ser_repo_state(&repo_state)?); 976 + batch.insert( 977 + &db.pending, 978 + keys::pending_key(repo_state.index_id), 979 + &did_key, 980 + ); 981 + added += 1; 982 + transitions.push((GaugeState::Synced, GaugeState::Pending)); 983 + } 984 + } 985 + 986 + batch.commit().into_diagnostic()?; 987 + Ok::<_, miette::Report>((added, transitions)) 988 + }) 989 + .await 990 + .into_diagnostic()??; 991 + 992 + if new_count > 0 { 993 + self.0.db.update_count_async("repos", new_count).await; 994 + } 995 + for (old, new) in transitions { 996 + self.0.db.update_gauge_diff_async(&old, &new).await; 997 + } 998 + self.0.notify_backfill(); 999 + Ok(()) 1000 + } 1001 + 1002 + /// stop tracking one or more repositories. hydrant will stop processing new events 1003 + /// for them and remove them from the pending/resync queues, but existing indexed 1004 + /// records are **not** deleted. 
1005 + pub async fn untrack(&self, dids: impl IntoIterator<Item = Did<'_>>) -> Result<()> { 1006 + let dids: Vec<Did<'static>> = dids.into_iter().map(|d| d.into_static()).collect(); 1007 + let state = self.0.clone(); 1008 + 1009 + let gauge_decrements = tokio::task::spawn_blocking(move || { 1010 + let db = &state.db; 1011 + let mut batch = db.inner.batch(); 1012 + let mut gauge_decrements = Vec::new(); 1013 + 1014 + for did in &dids { 1015 + let did_key = keys::repo_key(did); 1016 + let repo_bytes = db.repos.get(&did_key).into_diagnostic()?; 1017 + let existing = repo_bytes 1018 + .as_deref() 1019 + .map(db::deser_repo_state) 1020 + .transpose()?; 1021 + 1022 + if let Some(repo_state) = existing { 1023 + if repo_state.tracked { 1024 + let resync = db.resync.get(&did_key).into_diagnostic()?; 1025 + let old = db::Db::repo_gauge_state(&repo_state, resync.as_deref()); 1026 + let mut repo_state = repo_state.into_static(); 1027 + repo_state.tracked = false; 1028 + batch.insert(&db.repos, &did_key, ser_repo_state(&repo_state)?); 1029 + batch.remove(&db.pending, keys::pending_key(repo_state.index_id)); 1030 + batch.remove(&db.resync, &did_key); 1031 + if old != GaugeState::Synced { 1032 + gauge_decrements.push(old); 1033 + } 1034 + } 1035 + } 1036 + } 1037 + 1038 + batch.commit().into_diagnostic()?; 1039 + Ok::<_, miette::Report>(gauge_decrements) 1040 + }) 1041 + .await 1042 + .into_diagnostic()??; 1043 + 1044 + for gauge in gauge_decrements { 1045 + self.0 1046 + .db 1047 + .update_gauge_diff_async(&gauge, &GaugeState::Synced) 1048 + .await; 1049 + } 1050 + Ok(()) 1051 + } 1052 + } 1053 + 1054 + pub fn repo_state_to_info(did: String, s: RepoState<'_>) -> RepoInfo { 1055 + RepoInfo { 1056 + did, 1057 + status: s.status.to_string(), 1058 + tracked: s.tracked, 1059 + rev: s.rev.as_ref().map(|r| r.to_string()), 1060 + handle: s.handle.map(|h| h.to_string()), 1061 + pds: s.pds.map(|p| p.to_string()), 1062 + signing_key: s.signing_key.map(|k| k.encode()), 1063 + last_updated_at: DateTime::from_timestamp_secs(s.last_updated_at), 1064 + last_message_at: s.last_message_time.and_then(DateTime::from_timestamp_secs), 1065 + } 1066 + } 1067 + 1068 + // --- db control --- 1069 + 1070 + /// control over database maintenance operations. 1071 + /// 1072 + /// all methods pause the crawler, firehose, and backfill worker for the duration 1073 + /// of the operation and restore their prior state on completion, whether or not 1074 + /// the operation succeeds. 1075 + #[derive(Clone)] 1076 + pub struct DbControl(Arc<AppState>); 1077 + 1078 + impl DbControl { 1079 + /// trigger a major compaction of all keyspaces in parallel. 1080 + /// 1081 + /// compaction reclaims disk space from deleted/updated keys and improves 1082 + /// read performance. can take several minutes on large datasets. 1083 + pub async fn compact(&self) -> Result<()> { 1084 + let state = self.0.clone(); 1085 + state 1086 + .with_ingestion_paused(async || state.db.compact().await) 1087 + .await 1088 + } 1089 + 1090 + /// train zstd compression dictionaries for the `repos`, `blocks`, and `events` keyspaces. 1091 + /// 1092 + /// dictionaries are written to `dict_{name}.bin` files next to the database. 1093 + /// a restart is required to apply them. training samples data blocks from the 1094 + /// existing database, so the database must have a reasonable amount of data first. 
1095 + pub async fn train_dicts(&self) -> Result<()> {
1096 + let state = self.0.clone();
1097 + state
1098 + .with_ingestion_paused(async || {
1099 + let train = |name: &'static str| {
1100 + let db = state.db.clone();
1101 + tokio::task::spawn_blocking(move || db.train_dict(name))
1102 + .map(|res| res.into_diagnostic().flatten())
1103 + };
1104 + tokio::try_join!(train("repos"), train("blocks"), train("events")).map(|_| ())
1105 + })
1106 + .await
1107 + }
1108 + }
1109 +
1110 + // --- stream thread ---
1111 +
1112 + fn event_stream_thread(state: Arc<AppState>, tx: mpsc::Sender<Event>, cursor: Option<u64>) {
1113 + let db = &state.db;
1114 + let mut event_rx = db.event_tx.subscribe();
1115 + let ks = db.events.clone();
1116 + let mut current_id = match cursor {
1117 + Some(c) => c.saturating_sub(1),
1118 + None => db.next_event_id.load(Ordering::SeqCst).saturating_sub(1),
1119 + };
1120 +
1121 + loop {
1122 + // catch up from db
1123 + loop {
1124 + let mut found = false;
1125 + for item in ks.range(keys::event_key(current_id + 1)..) {
1126 + let (k, v) = match item.into_inner() {
1127 + Ok(kv) => kv,
1128 + Err(e) => {
1129 + error!(err = %e, "failed to read event from db");
1130 + break;
1131 + }
1132 + };
1133 +
1134 + let id = match k.as_ref().try_into().map(u64::from_be_bytes) {
1135 + Ok(id) => id,
1136 + Err(_) => {
1137 + error!("failed to parse event id");
1138 + continue;
1139 + }
1140 + };
1141 + current_id = id;
1142 +
1143 + let stored: StoredEvent = match rmp_serde::from_slice(&v) {
1144 + Ok(e) => e,
1145 + Err(e) => {
1146 + error!(err = %e, "failed to deserialize stored event");
1147 + continue;
1148 + }
1149 + };
1150 +
1151 + let Some(evt) = stored_to_event(&state, id, stored) else {
1152 + continue;
1153 + };
1154 +
1155 + if tx.blocking_send(evt).is_err() {
1156 + return; // receiver dropped
1157 + }
1158 + found = true;
1159 + }
1160 + if !found {
1161 + break;
1162 + }
1163 + }
1164 +
1165 + // wait for live events
1166 + match event_rx.blocking_recv() {
1167 + Ok(BroadcastEvent::Persisted(_)) => {} // re-run catch-up
1168 + Ok(BroadcastEvent::Ephemeral(evt)) => {
1169 + if tx.blocking_send(*evt).is_err() {
1170 + return;
1171 + }
1172 + }
1173 + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {}
1174 + Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
1175 + }
1176 + }
1177 + }
1178 +
1179 + fn stored_to_event(state: &AppState, id: u64, stored: StoredEvent<'_>) -> Option<Event> {
1180 + let StoredEvent {
1181 + live,
1182 + did,
1183 + rev,
1184 + collection,
1185 + rkey,
1186 + action,
1187 + data,
1188 + } = stored;
1189 +
1190 + let record = match data {
1191 + StoredData::Ptr(cid) => {
1192 + let block = state
1193 + .db
1194 + .blocks
1195 + .get(&keys::block_key(collection.as_str(), &cid.to_bytes()));
1196 + match block {
1197 + Ok(Some(bytes)) => match serde_ipld_dagcbor::from_slice::<RawData>(&bytes) {
1198 + Ok(val) => Some((cid, serde_json::to_value(val).ok()?)),
1199 + Err(e) => {
1200 + error!(err = %e, "can't parse block");
1201 + return None;
1202 + }
1203 + },
1204 + Ok(None) => {
1205 + error!("block not found, this is a bug");
1206 + return None;
1207 + }
1208 + Err(e) => {
1209 + error!(err = %e, "can't get block");
1210 + db::check_poisoned(&e);
1211 + return None;
1212 + }
1213 + }
1214 + }
1215 + StoredData::Block(block) => {
1216 + let digest = Sha256::digest(&block);
1217 + let hash =
1218 + cid::multihash::Multihash::wrap(ATP_CID_HASH, &digest).expect("valid sha256 hash");
1219 + let cid =
IpldCid::new_v1(DAG_CBOR_CID_CODEC, hash);
1220 + match serde_ipld_dagcbor::from_slice::<RawData>(&block) {
1221 + Ok(val) => Some((cid, serde_json::to_value(val).ok()?)),
1222 + Err(e) => {
1223 + error!(err = %e, "can't parse block");
1224 + return None;
1225 + }
1226 + }
1227 + }
1228 + StoredData::Nothing => None,
1229 + };
1230 +
1231 + let (cid, record) = record
1232 + .map(|(c, r)| (Some(c), Some(r)))
1233 + .unwrap_or((None, None));
1234 +
1235 + Some(MarshallableEvt {
1236 + id,
1237 + event_type: "record".into(),
1238 + record: Some(RecordEvt {
1239 + live,
1240 + did: did.to_did(),
1241 + rev: CowStr::Owned(rev.to_tid().into()),
1242 + collection: CowStr::Owned(collection.as_ref().to_string().into()),
1243 + rkey: CowStr::Owned(rkey.to_smolstr().into()),
1244 + action: CowStr::Borrowed(action.as_str()),
1245 + record,
1246 + cid: cid.map(|c| jacquard_common::types::cid::Cid::ipld(c).into()),
1247 + }),
1248 + identity: None,
1249 + account: None,
1250 + })
1251 + }
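taken together, `run`, `serve`, and `subscribe` make hydrant embeddable without going through the HTTP layer at all. a minimal embedding sketch, not part of this diff: the `consume` helper is illustrative, error handling is trimmed, and the field accesses assume the `Event`/`RecordEvt` shapes shown above.

use futures::StreamExt;
use hydrant::config::Config;
use hydrant::control::{EventStream, Hydrant};

// drain a subscription, printing one line per record event. illustrative only.
async fn consume(mut stream: EventStream) {
    while let Some(event) = stream.next().await {
        if let Some(rec) = event.record {
            println!("{} {}", rec.action.as_str(), rec.did.as_str());
        }
    }
}

#[tokio::main]
async fn main() -> miette::Result<()> {
    let hydrant = Hydrant::new(Config::from_env()?).await?;

    // None starts at the live tail; Some(id) replays stored events from that id.
    let stream = hydrant.subscribe(None);

    tokio::select! {
        r = hydrant.run() => r,
        _ = consume(stream) => Ok(()),
    }
}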
-8
src/db/keys.rs
··· 177 177 key.extend_from_slice(cid); 178 178 key 179 179 } 180 - 181 - // prefix format: {collection}| 182 - pub fn block_prefix_collection(collection: &str) -> Vec<u8> { 183 - let mut prefix = Vec::with_capacity(collection.len() + 1); 184 - prefix.extend_from_slice(collection.as_bytes()); 185 - prefix.push(SEP); 186 - prefix 187 - }
+11 -9
src/lib.rs
··· 1 - pub mod api; 2 - pub mod backfill; 3 1 pub mod config; 4 - pub mod crawler; 5 - pub mod db; 2 + pub mod control; 6 3 pub mod filter; 7 - pub mod ingest; 8 - pub mod ops; 9 - pub mod resolver; 10 - pub mod state; 11 4 pub mod types; 12 - pub mod util; 5 + 6 + pub(crate) mod api; 7 + pub(crate) mod backfill; 8 + pub(crate) mod crawler; 9 + pub(crate) mod db; 10 + pub(crate) mod ingest; 11 + pub(crate) mod ops; 12 + pub(crate) mod resolver; 13 + pub(crate) mod state; 14 + pub(crate) mod util;
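with everything else `pub(crate)`, the import surface for a downstream crate shrinks to exactly these four modules. for illustration, the paths already used elsewhere in this PR:

use hydrant::config::Config;                  // env-driven configuration
use hydrant::control::{EventStream, Hydrant}; // entry point, event stream, control handles
use hydrant::filter::{FilterMode, SetUpdate}; // filter mode + set-mutation types
// hydrant::types is the fourth public module (shared type definitions)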
+13 -277
src/main.rs
··· 1 - use futures::{FutureExt, future::BoxFuture}; 2 - use hydrant::config::{Config, SignatureVerification}; 3 - use hydrant::db; 4 - use hydrant::ingest::firehose::FirehoseIngestor; 5 - use hydrant::state::AppState; 6 - use hydrant::{api, backfill::BackfillWorker, ingest::worker::FirehoseWorker}; 7 - use miette::IntoDiagnostic; 1 + use hydrant::config::{AppConfig, Config}; 2 + use hydrant::control::Hydrant; 8 3 use mimalloc::MiMalloc; 9 - use std::sync::Arc; 10 - use std::sync::atomic::Ordering; 11 - use tokio::{sync::mpsc, task::spawn_blocking}; 12 - use tracing::{debug, error, info}; 13 4 14 5 #[global_allocator] 15 6 static GLOBAL: MiMalloc = MiMalloc; ··· 21 12 .ok(); 22 13 23 14 let cfg = Config::from_env()?; 15 + let app = AppConfig::from_env(); 24 16 25 17 let env_filter = tracing_subscriber::EnvFilter::builder() 26 18 .with_default_directive(tracing::Level::INFO.into()) 27 19 .from_env_lossy(); 28 20 tracing_subscriber::fmt().with_env_filter(env_filter).init(); 29 21 30 - info!("{cfg}"); 31 - 32 - let state = AppState::new(&cfg)?; 33 - 34 - if cfg.full_network 35 - || cfg.filter_signals.is_some() 36 - || cfg.filter_collections.is_some() 37 - || cfg.filter_excludes.is_some() 38 - { 39 - let filter_ks = state.db.filter.clone(); 40 - let inner = state.db.inner.clone(); 41 - let full_network = cfg.full_network; 42 - let signals = cfg.filter_signals.clone(); 43 - let collections = cfg.filter_collections.clone(); 44 - let excludes = cfg.filter_excludes.clone(); 45 - 46 - tokio::task::spawn_blocking(move || { 47 - use hydrant::filter::{FilterMode, SetUpdate}; 48 - let mut batch = inner.batch(); 49 - 50 - let mode = if full_network { 51 - Some(FilterMode::Full) 52 - } else { 53 - None 54 - }; 55 - 56 - let signals_update = signals.map(SetUpdate::Set); 57 - let collections_update = collections.map(SetUpdate::Set); 58 - let excludes_update = excludes.map(SetUpdate::Set); 59 - 60 - hydrant::db::filter::apply_patch( 61 - &mut batch, 62 - &filter_ks, 63 - mode, 64 - signals_update, 65 - collections_update, 66 - excludes_update, 67 - )?; 68 - 69 - batch.commit().into_diagnostic() 70 - }) 71 - .await 72 - .into_diagnostic()??; 73 - 74 - let new_filter = hydrant::db::filter::load(&state.db.filter)?; 75 - state.filter.store(new_filter.into()); 76 - } 77 - 78 - let (buffer_tx, buffer_rx) = mpsc::unbounded_channel(); 79 - let state = Arc::new(state); 80 - 81 - if cfg.ephemeral { 82 - let state = state.clone(); 83 - std::thread::Builder::new() 84 - .name("ephemeral-gc".into()) 85 - .spawn(move || db::ephemeral::ephemeral_ttl_worker(state)) 86 - .into_diagnostic()?; 87 - } 88 - 89 - tokio::spawn({ 90 - let state = state.clone(); 91 - let timeout = cfg.repo_fetch_timeout; 92 - BackfillWorker::new( 93 - state.clone(), 94 - buffer_tx.clone(), 95 - timeout, 96 - cfg.backfill_concurrency_limit, 97 - matches!( 98 - cfg.verify_signatures, 99 - SignatureVerification::Full | SignatureVerification::BackfillOnly 100 - ), 101 - cfg.ephemeral, 102 - state.backfill_enabled.subscribe(), 103 - ) 104 - .run() 105 - }); 106 - 107 - if let Err(e) = spawn_blocking({ 108 - let state = state.clone(); 109 - move || hydrant::backfill::manager::queue_gone_backfills(&state) 110 - }) 111 - .await 112 - .into_diagnostic()? 
113 - { 114 - error!(err = %e, "failed to queue gone backfills"); 115 - db::check_poisoned_report(&e); 116 - } 117 - 118 - std::thread::spawn({ 119 - let state = state.clone(); 120 - move || hydrant::backfill::manager::retry_worker(state) 121 - }); 122 - 123 - tokio::spawn({ 124 - let state = state.clone(); 125 - let mut last_id = state.db.next_event_id.load(Ordering::Relaxed); 126 - let mut last_time = std::time::Instant::now(); 127 - let mut interval = tokio::time::interval(std::time::Duration::from_secs(60)); 128 - async move { 129 - loop { 130 - interval.tick().await; 131 - 132 - let current_id = state.db.next_event_id.load(Ordering::Relaxed); 133 - let current_time = std::time::Instant::now(); 134 - 135 - let delta = current_id.saturating_sub(last_id); 136 - if delta == 0 { 137 - debug!("no new events in 60s"); 138 - continue; 139 - } 140 - 141 - let elapsed = current_time.duration_since(last_time).as_secs_f64(); 142 - let rate = if elapsed > 0.0 { 143 - delta as f64 / elapsed 144 - } else { 145 - 0.0 146 - }; 147 - 148 - info!("{rate:.2} events/s ({delta} events in {elapsed:.1}s)"); 149 - 150 - last_id = current_id; 151 - last_time = current_time; 152 - } 153 - } 154 - }); 155 - 156 - std::thread::spawn({ 157 - let state = state.clone(); 158 - let persist_interval = cfg.cursor_save_interval; 159 - 160 - move || { 161 - loop { 162 - std::thread::sleep(persist_interval); 22 + let hydrant = Hydrant::new(cfg).await?; 163 23 164 - // persist firehose cursors 165 - for (relay, cursor) in &state.relay_cursors { 166 - let seq = cursor.load(Ordering::SeqCst); 167 - if seq > 0 { 168 - if let Err(e) = db::set_firehose_cursor(&state.db, relay, seq) { 169 - error!(relay = %relay, err = %e, "failed to save cursor"); 170 - db::check_poisoned_report(&e); 171 - } 172 - } 173 - } 174 - 175 - // persist counts 176 - // TODO: make this more durable 177 - if let Err(e) = db::persist_counts(&state.db) { 178 - error!(err = %e, "failed to persist counts"); 179 - db::check_poisoned_report(&e); 180 - } 181 - 182 - // persist journal 183 - if let Err(e) = state.db.persist() { 184 - error!(err = %e, "db persist failed"); 185 - db::check_poisoned_report(&e); 186 - } 187 - } 24 + if app.enable_debug { 25 + tokio::select! 
{ 26 + r = hydrant.run() => r, 27 + r = hydrant.serve(app.api_port) => r, 28 + r = hydrant.serve_debug(app.debug_port) => r, 188 29 } 189 - }); 190 - 191 - let post_patch_crawler = match cfg.enable_crawler { 192 - Some(b) => b, 193 - None => state.filter.load().mode == hydrant::filter::FilterMode::Full, 194 - }; 195 - state.crawler_enabled.send_replace(post_patch_crawler); 196 - 197 - info!( 198 - crawler_enabled = *state.crawler_enabled.borrow(), 199 - firehose_enabled = *state.firehose_enabled.borrow(), 200 - filter_mode = ?state.filter.load().mode, 201 - "starting ingestion" 202 - ); 203 - 204 - let relay_hosts = cfg.relays.clone(); 205 - let crawler_max_pending = cfg.crawler_max_pending_repos; 206 - let crawler_resume_pending = cfg.crawler_resume_pending_repos; 207 - 208 - if !relay_hosts.is_empty() { 209 - let state_for_crawler = state.clone(); 210 - let crawler_rx = state.crawler_enabled.subscribe(); 211 - info!( 212 - relay_count = relay_hosts.len(), 213 - hosts = relay_hosts 214 - .iter() 215 - .map(|h| h.as_str()) 216 - .collect::<Vec<_>>() 217 - .join(", "), 218 - enabled = *state.crawler_enabled.borrow(), 219 - "starting crawler(s)" 220 - ); 221 - tokio::spawn(async move { 222 - let crawler = hydrant::crawler::Crawler::new( 223 - state_for_crawler, 224 - relay_hosts, 225 - crawler_max_pending, 226 - crawler_resume_pending, 227 - crawler_rx, 228 - ); 229 - if let Err(e) = crawler.run().await { 230 - error!(err = %e, "crawler error"); 231 - db::check_poisoned_report(&e); 232 - } 233 - }); 234 - } 235 - 236 - let firehose_worker = std::thread::spawn({ 237 - let state = state.clone(); 238 - let handle = tokio::runtime::Handle::current(); 239 - move || { 240 - FirehoseWorker::new( 241 - state, 242 - buffer_rx, 243 - matches!(cfg.verify_signatures, SignatureVerification::Full), 244 - cfg.ephemeral, 245 - cfg.firehose_workers, 246 - ) 247 - .run(handle) 30 + } else { 31 + tokio::select! { 32 + r = hydrant.run() => r, 33 + r = hydrant.serve(app.api_port) => r, 248 34 } 249 - }); 250 - 251 - let mut tasks: Vec<BoxFuture<miette::Result<()>>> = vec![Box::pin( 252 - tokio::task::spawn_blocking(move || { 253 - firehose_worker 254 - .join() 255 - .map_err(|e| miette::miette!("buffer processor died: {e:?}")) 256 - }) 257 - .map(|r| r.into_diagnostic().flatten().flatten()), 258 - )]; 259 - 260 - for relay_url in &cfg.relays { 261 - let ingestor = FirehoseIngestor::new( 262 - state.clone(), 263 - buffer_tx.clone(), 264 - relay_url.clone(), 265 - state.filter.clone(), 266 - state.firehose_enabled.subscribe(), 267 - matches!(cfg.verify_signatures, SignatureVerification::Full), 268 - ); 269 - tasks.push(Box::pin(ingestor.run())); 270 35 } 271 - 272 - let state_api = state.clone(); 273 - tasks.push(Box::pin(async move { 274 - api::serve(state_api, cfg.api_port) 275 - .await 276 - .map_err(|e| miette::miette!("API server failed: {e}")) 277 - }) as BoxFuture<_>); 278 - 279 - if cfg.enable_debug { 280 - let state_debug = state.clone(); 281 - tasks.push(Box::pin(async move { 282 - api::serve_debug(state_debug, cfg.debug_port) 283 - .await 284 - .map_err(|e| miette::miette!("debug server failed: {e}")) 285 - }) as BoxFuture<_>); 286 - } 287 - 288 - let res = futures::future::select_all(tasks); 289 - if let (Err(e), _, _) = res.await { 290 - error!(err = %e, "critical worker died"); 291 - db::check_poisoned_report(&e); 292 - } 293 - 294 - if let Err(e) = state.db.persist() { 295 - db::check_poisoned_report(&e); 296 - return Err(e); 297 - } 298 - 299 - Ok(()) 300 36 }
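the `stats` method in control.rs composes well with this slimmer binary. a sketch of periodic reporting, assuming the same `hydrant` binding as `main` above; the 60s interval and logged field are arbitrary choices:

// periodic stats reporting alongside ingestion. Hydrant is Clone (serve()
// clones it internally), so a handle can move into a background task.
let h = hydrant.clone();
tokio::spawn(async move {
    let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
    loop {
        interval.tick().await;
        if let Ok(stats) = h.stats().await {
            let records = stats.counts.get("records").copied().unwrap_or(0);
            tracing::info!(records, "db stats");
        }
    }
});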