A very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall.
rust
fjall
at-protocol
atproto
indexer
1use crate::control::Hydrant;
2use axum::Router;
3use axum::routing::get;
4use axum::{
5 extract::{
6 Query, State,
7 ws::{Message, WebSocket, WebSocketUpgrade},
8 },
9 response::IntoResponse,
10};
11use futures::StreamExt;
12use serde::Deserialize;
13use tracing::error;
14
15pub fn router() -> Router<Hydrant> {
16 Router::new().route("/", get(handle_stream))
17}
18
19#[derive(Deserialize)]
20pub struct StreamQuery {
21 pub cursor: Option<u64>,
22}
23
24pub async fn handle_stream(
25 State(hydrant): State<Hydrant>,
26 Query(query): Query<StreamQuery>,
27 ws: WebSocketUpgrade,
28) -> impl IntoResponse {
29 ws.on_upgrade(move |socket| handle_socket(socket, hydrant, query))
30}
31
32async fn handle_socket(mut socket: WebSocket, hydrant: Hydrant, query: StreamQuery) {
33 let mut stream = hydrant.subscribe(query.cursor);
34
35 while let Some(evt) = stream.next().await {
36 match serde_json::to_string(&evt) {
37 Ok(json) => {
38 if socket.send(Message::Text(json.into())).await.is_err() {
39 break;
40 }
41 }
42 Err(e) => {
43 error!(err = %e, "failed to serialize event");
44 }
45 }
46 }
47}