very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
at main 47 lines 1.2 kB view raw
1use crate::control::Hydrant; 2use axum::Router; 3use axum::routing::get; 4use axum::{ 5 extract::{ 6 Query, State, 7 ws::{Message, WebSocket, WebSocketUpgrade}, 8 }, 9 response::IntoResponse, 10}; 11use futures::StreamExt; 12use serde::Deserialize; 13use tracing::error; 14 15pub fn router() -> Router<Hydrant> { 16 Router::new().route("/", get(handle_stream)) 17} 18 19#[derive(Deserialize)] 20pub struct StreamQuery { 21 pub cursor: Option<u64>, 22} 23 24pub async fn handle_stream( 25 State(hydrant): State<Hydrant>, 26 Query(query): Query<StreamQuery>, 27 ws: WebSocketUpgrade, 28) -> impl IntoResponse { 29 ws.on_upgrade(move |socket| handle_socket(socket, hydrant, query)) 30} 31 32async fn handle_socket(mut socket: WebSocket, hydrant: Hydrant, query: StreamQuery) { 33 let mut stream = hydrant.subscribe(query.cursor); 34 35 while let Some(evt) = stream.next().await { 36 match serde_json::to_string(&evt) { 37 Ok(json) => { 38 if socket.send(Message::Text(json.into())).await.is_err() { 39 break; 40 } 41 } 42 Err(e) => { 43 error!(err = %e, "failed to serialize event"); 44 } 45 } 46 } 47}