A very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall.
rust fjall at-protocol atproto indexer
at main 67 lines 1.6 kB view raw
1use crate::control::Hydrant; 2use axum::{ 3 Json, Router, 4 extract::State, 5 http::StatusCode, 6 routing::{get, patch}, 7}; 8use serde::{Deserialize, Serialize}; 9 10pub fn router() -> Router<Hydrant> { 11 Router::new() 12 .route("/ingestion", get(get_ingestion)) 13 .route("/ingestion", patch(patch_ingestion)) 14} 15 16#[derive(Serialize)] 17pub struct IngestionStatus { 18 pub crawler: bool, 19 pub firehose: bool, 20 pub backfill: bool, 21} 22 23pub async fn get_ingestion(State(hydrant): State<Hydrant>) -> Json<IngestionStatus> { 24 Json(IngestionStatus { 25 crawler: hydrant.crawler.is_enabled(), 26 firehose: hydrant.firehose.is_enabled(), 27 backfill: hydrant.backfill.is_enabled(), 28 }) 29} 30 31#[derive(Deserialize)] 32pub struct IngestionPatch { 33 #[serde(default)] 34 pub crawler: Option<bool>, 35 #[serde(default)] 36 pub firehose: Option<bool>, 37 #[serde(default)] 38 pub backfill: Option<bool>, 39} 40 41pub async fn patch_ingestion( 42 State(hydrant): State<Hydrant>, 43 Json(body): Json<IngestionPatch>, 44) -> StatusCode { 45 if let Some(crawler) = body.crawler { 46 if crawler { 47 hydrant.crawler.enable(); 48 } else { 49 hydrant.crawler.disable(); 50 } 51 } 52 if let Some(firehose) = body.firehose { 53 if firehose { 54 hydrant.firehose.enable(); 55 } else { 56 hydrant.firehose.disable(); 57 } 58 } 59 if let Some(backfill) = body.backfill { 60 if backfill { 61 hydrant.backfill.enable(); 62 } else { 63 hydrant.backfill.disable(); 64 } 65 } 66 StatusCode::OK 67}