A very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall.
rust fjall at-protocol atproto indexer
at main 220 lines 7.0 kB view raw
1use crate::control::{Hydrant, RepoInfo, repo_state_to_info}; 2use crate::db::keys; 3use axum::{ 4 Json, Router, 5 body::Body, 6 extract::{Path, Query, State}, 7 http::{StatusCode, header}, 8 response::{IntoResponse, Response}, 9 routing::{delete, get, put}, 10}; 11use jacquard_common::types::did::Did; 12use serde::Deserialize; 13 14pub fn router() -> Router<Hydrant> { 15 Router::new() 16 .route("/repos", get(handle_get_repos)) 17 .route("/repos/{did}", get(handle_get_repo)) 18 .route("/repos", put(handle_put_repos)) 19 .route("/repos", delete(handle_delete_repos)) 20} 21 22#[derive(Deserialize, Debug)] 23pub struct RepoRequest { 24 pub did: String, 25} 26 27#[derive(Deserialize)] 28pub struct GetReposParams { 29 pub limit: Option<usize>, 30 pub cursor: Option<String>, 31 pub partition: Option<String>, 32} 33 34pub async fn handle_get_repos( 35 State(hydrant): State<Hydrant>, 36 Query(params): Query<GetReposParams>, 37) -> Result<Response, (StatusCode, String)> { 38 let limit = params.limit.unwrap_or(100).min(1000); 39 let partition = params.partition.unwrap_or_else(|| "all".to_string()); 40 41 let items = tokio::task::spawn_blocking(move || { 42 let db = &hydrant.state.db; 43 44 let to_info = |k: &[u8], v: &[u8]| -> Result<RepoInfo, (StatusCode, String)> { 45 let repo_state = crate::db::deser_repo_state(v).map_err(internal)?; 46 let did = crate::db::types::TrimmedDid::try_from(k) 47 .map_err(internal)? 
48 .to_did(); 49 50 Ok(repo_state_to_info(did, repo_state)) 51 }; 52 53 let results = match partition.as_str() { 54 "all" | "resync" => { 55 let is_all = partition == "all"; 56 let ks = if is_all { &db.repos } else { &db.resync }; 57 58 let start_bound = if let Some(cursor) = params.cursor { 59 let did = Did::new_owned(&cursor).map_err(bad_request)?; 60 let did_key = keys::repo_key(&did); 61 std::ops::Bound::Excluded(did_key) 62 } else { 63 std::ops::Bound::Unbounded 64 }; 65 66 let mut items = Vec::new(); 67 for item in ks 68 .range((start_bound, std::ops::Bound::Unbounded)) 69 .take(limit) 70 { 71 let (k, v) = item.into_inner().map_err(internal)?; 72 73 let repo_state_bytes = if is_all { 74 v 75 } else { 76 db.repos.get(&k).map_err(internal)?.ok_or_else(|| { 77 internal(format!("repository state missing for {}", partition)) 78 })? 79 }; 80 81 items.push(to_info(&k, &repo_state_bytes)?); 82 } 83 Ok::<_, (StatusCode, String)>(items) 84 } 85 "pending" => { 86 let start_bound = if let Some(cursor) = params.cursor { 87 let id = cursor.parse::<u64>().map_err(bad_request)?; 88 std::ops::Bound::Excluded(id.to_be_bytes().to_vec()) 89 } else { 90 std::ops::Bound::Unbounded 91 }; 92 93 let mut items = Vec::new(); 94 for item in db 95 .pending 96 .range((start_bound, std::ops::Bound::Unbounded)) 97 .take(limit) 98 { 99 let (_, did_key) = item.into_inner().map_err(internal)?; 100 101 if let Ok(Some(v)) = db.repos.get(&did_key) { 102 items.push(to_info(&did_key, &v)?); 103 } 104 } 105 Ok(items) 106 } 107 _ => Err((StatusCode::BAD_REQUEST, "invalid partition".to_string())), 108 }?; 109 110 Ok::<_, (StatusCode, String)>(results) 111 }) 112 .await 113 .map_err(internal)??; 114 115 use futures::StreamExt; 116 117 let stream = futures::stream::iter(items.into_iter().map(|item| { 118 let json = serde_json::to_string(&item).ok()?; 119 Some(Ok::<_, std::io::Error>(format!("{json}\n"))) 120 })) 121 .filter_map(|x| futures::future::ready(x)); 122 123 let body = 
Body::from_stream(stream); 124 125 Ok(([(header::CONTENT_TYPE, "application/x-ndjson")], body).into_response()) 126} 127 128pub async fn handle_get_repo( 129 State(hydrant): State<Hydrant>, 130 Path(did_str): Path<String>, 131) -> Result<Json<RepoInfo>, (StatusCode, String)> { 132 let did = Did::new(&did_str).map_err(bad_request)?; 133 134 let item = hydrant 135 .repos 136 .info(&did) 137 .await 138 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 139 140 item.map(Json) 141 .ok_or_else(|| (StatusCode::NOT_FOUND, "repository not found".to_string())) 142} 143 144pub async fn handle_put_repos( 145 State(hydrant): State<Hydrant>, 146 req: axum::extract::Request, 147) -> Result<StatusCode, (StatusCode, String)> { 148 let items = parse_body(req).await?; 149 150 let dids: Vec<Did<'static>> = items 151 .into_iter() 152 .filter_map(|item| Did::new_owned(&item.did).ok()) 153 .collect(); 154 155 hydrant 156 .repos 157 .track(dids) 158 .await 159 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 160 161 Ok(StatusCode::OK) 162} 163 164pub async fn handle_delete_repos( 165 State(hydrant): State<Hydrant>, 166 req: axum::extract::Request, 167) -> Result<StatusCode, (StatusCode, String)> { 168 let items = parse_body(req).await?; 169 170 let dids: Vec<Did<'static>> = items 171 .into_iter() 172 .filter_map(|item| Did::new_owned(&item.did).ok()) 173 .collect(); 174 175 hydrant 176 .repos 177 .untrack(dids) 178 .await 179 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 180 181 Ok(StatusCode::OK) 182} 183 184async fn parse_body(req: axum::extract::Request) -> Result<Vec<RepoRequest>, (StatusCode, String)> { 185 let content_type = req 186 .headers() 187 .get(header::CONTENT_TYPE) 188 .and_then(|h| h.to_str().ok()) 189 .unwrap_or("") 190 .to_string(); 191 192 let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX) 193 .await 194 .map_err(bad_request)?; 195 196 let text = 
std::str::from_utf8(&body_bytes).map_err(bad_request)?; 197 let trimmed = text.trim(); 198 199 if content_type.contains("application/json") { 200 serde_json::from_str::<Vec<RepoRequest>>(trimmed) 201 .map_err(|e| bad_request(format!("invalid JSON array: {e}"))) 202 } else { 203 trimmed 204 .lines() 205 .filter(|l| !l.trim().is_empty()) 206 .map(|line| { 207 serde_json::from_str::<RepoRequest>(line) 208 .map_err(|e| bad_request(format!("invalid NDJSON line: {e}"))) 209 }) 210 .collect() 211 } 212} 213 214fn bad_request<E: std::fmt::Display>(err: E) -> (StatusCode, String) { 215 (StatusCode::BAD_REQUEST, err.to_string()) 216} 217 218fn internal<E: std::fmt::Display>(err: E) -> (StatusCode, String) { 219 (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()) 220}