A very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall.
rust
fjall
at-protocol
atproto
indexer
1use crate::control::Hydrant;
2use axum::{
3 Json, Router,
4 extract::State,
5 http::StatusCode,
6 routing::{get, patch},
7};
8use serde::{Deserialize, Serialize};
9
10pub fn router() -> Router<Hydrant> {
11 Router::new()
12 .route("/ingestion", get(get_ingestion))
13 .route("/ingestion", patch(patch_ingestion))
14}
15
16#[derive(Serialize)]
17pub struct IngestionStatus {
18 pub crawler: bool,
19 pub firehose: bool,
20 pub backfill: bool,
21}
22
23pub async fn get_ingestion(State(hydrant): State<Hydrant>) -> Json<IngestionStatus> {
24 Json(IngestionStatus {
25 crawler: hydrant.crawler.is_enabled(),
26 firehose: hydrant.firehose.is_enabled(),
27 backfill: hydrant.backfill.is_enabled(),
28 })
29}
30
31#[derive(Deserialize)]
32pub struct IngestionPatch {
33 #[serde(default)]
34 pub crawler: Option<bool>,
35 #[serde(default)]
36 pub firehose: Option<bool>,
37 #[serde(default)]
38 pub backfill: Option<bool>,
39}
40
41pub async fn patch_ingestion(
42 State(hydrant): State<Hydrant>,
43 Json(body): Json<IngestionPatch>,
44) -> StatusCode {
45 if let Some(crawler) = body.crawler {
46 if crawler {
47 hydrant.crawler.enable();
48 } else {
49 hydrant.crawler.disable();
50 }
51 }
52 if let Some(firehose) = body.firehose {
53 if firehose {
54 hydrant.firehose.enable();
55 } else {
56 hydrant.firehose.disable();
57 }
58 }
59 if let Some(backfill) = body.backfill {
60 if backfill {
61 hydrant.backfill.enable();
62 } else {
63 hydrant.backfill.disable();
64 }
65 }
66 StatusCode::OK
67}