Very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall.
rust
fjall
at-protocol
atproto
indexer
1use crate::control::{Hydrant, RepoInfo, repo_state_to_info};
2use crate::db::keys;
3use axum::{
4 Json, Router,
5 body::Body,
6 extract::{Path, Query, State},
7 http::{StatusCode, header},
8 response::{IntoResponse, Response},
9 routing::{delete, get, put},
10};
11use jacquard_common::types::did::Did;
12use serde::Deserialize;
13
14pub fn router() -> Router<Hydrant> {
15 Router::new()
16 .route("/repos", get(handle_get_repos))
17 .route("/repos/{did}", get(handle_get_repo))
18 .route("/repos", put(handle_put_repos))
19 .route("/repos", delete(handle_delete_repos))
20}
21
22#[derive(Deserialize, Debug)]
23pub struct RepoRequest {
24 pub did: String,
25}
26
27#[derive(Deserialize)]
28pub struct GetReposParams {
29 pub limit: Option<usize>,
30 pub cursor: Option<String>,
31 pub partition: Option<String>,
32}
33
34pub async fn handle_get_repos(
35 State(hydrant): State<Hydrant>,
36 Query(params): Query<GetReposParams>,
37) -> Result<Response, (StatusCode, String)> {
38 let limit = params.limit.unwrap_or(100).min(1000);
39 let partition = params.partition.unwrap_or_else(|| "all".to_string());
40
41 let items = tokio::task::spawn_blocking(move || {
42 let db = &hydrant.state.db;
43
44 let to_info = |k: &[u8], v: &[u8]| -> Result<RepoInfo, (StatusCode, String)> {
45 let repo_state = crate::db::deser_repo_state(v).map_err(internal)?;
46 let did = crate::db::types::TrimmedDid::try_from(k)
47 .map_err(internal)?
48 .to_did();
49
50 Ok(repo_state_to_info(did, repo_state))
51 };
52
53 let results = match partition.as_str() {
54 "all" | "resync" => {
55 let is_all = partition == "all";
56 let ks = if is_all { &db.repos } else { &db.resync };
57
58 let start_bound = if let Some(cursor) = params.cursor {
59 let did = Did::new_owned(&cursor).map_err(bad_request)?;
60 let did_key = keys::repo_key(&did);
61 std::ops::Bound::Excluded(did_key)
62 } else {
63 std::ops::Bound::Unbounded
64 };
65
66 let mut items = Vec::new();
67 for item in ks
68 .range((start_bound, std::ops::Bound::Unbounded))
69 .take(limit)
70 {
71 let (k, v) = item.into_inner().map_err(internal)?;
72
73 let repo_state_bytes = if is_all {
74 v
75 } else {
76 db.repos.get(&k).map_err(internal)?.ok_or_else(|| {
77 internal(format!("repository state missing for {}", partition))
78 })?
79 };
80
81 items.push(to_info(&k, &repo_state_bytes)?);
82 }
83 Ok::<_, (StatusCode, String)>(items)
84 }
85 "pending" => {
86 let start_bound = if let Some(cursor) = params.cursor {
87 let id = cursor.parse::<u64>().map_err(bad_request)?;
88 std::ops::Bound::Excluded(id.to_be_bytes().to_vec())
89 } else {
90 std::ops::Bound::Unbounded
91 };
92
93 let mut items = Vec::new();
94 for item in db
95 .pending
96 .range((start_bound, std::ops::Bound::Unbounded))
97 .take(limit)
98 {
99 let (_, did_key) = item.into_inner().map_err(internal)?;
100
101 if let Ok(Some(v)) = db.repos.get(&did_key) {
102 items.push(to_info(&did_key, &v)?);
103 }
104 }
105 Ok(items)
106 }
107 _ => Err((StatusCode::BAD_REQUEST, "invalid partition".to_string())),
108 }?;
109
110 Ok::<_, (StatusCode, String)>(results)
111 })
112 .await
113 .map_err(internal)??;
114
115 use futures::StreamExt;
116
117 let stream = futures::stream::iter(items.into_iter().map(|item| {
118 let json = serde_json::to_string(&item).ok()?;
119 Some(Ok::<_, std::io::Error>(format!("{json}\n")))
120 }))
121 .filter_map(|x| futures::future::ready(x));
122
123 let body = Body::from_stream(stream);
124
125 Ok(([(header::CONTENT_TYPE, "application/x-ndjson")], body).into_response())
126}
127
128pub async fn handle_get_repo(
129 State(hydrant): State<Hydrant>,
130 Path(did_str): Path<String>,
131) -> Result<Json<RepoInfo>, (StatusCode, String)> {
132 let did = Did::new(&did_str).map_err(bad_request)?;
133
134 let item = hydrant
135 .repos
136 .info(&did)
137 .await
138 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
139
140 item.map(Json)
141 .ok_or_else(|| (StatusCode::NOT_FOUND, "repository not found".to_string()))
142}
143
144pub async fn handle_put_repos(
145 State(hydrant): State<Hydrant>,
146 req: axum::extract::Request,
147) -> Result<StatusCode, (StatusCode, String)> {
148 let items = parse_body(req).await?;
149
150 let dids: Vec<Did<'static>> = items
151 .into_iter()
152 .filter_map(|item| Did::new_owned(&item.did).ok())
153 .collect();
154
155 hydrant
156 .repos
157 .track(dids)
158 .await
159 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
160
161 Ok(StatusCode::OK)
162}
163
164pub async fn handle_delete_repos(
165 State(hydrant): State<Hydrant>,
166 req: axum::extract::Request,
167) -> Result<StatusCode, (StatusCode, String)> {
168 let items = parse_body(req).await?;
169
170 let dids: Vec<Did<'static>> = items
171 .into_iter()
172 .filter_map(|item| Did::new_owned(&item.did).ok())
173 .collect();
174
175 hydrant
176 .repos
177 .untrack(dids)
178 .await
179 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
180
181 Ok(StatusCode::OK)
182}
183
184async fn parse_body(req: axum::extract::Request) -> Result<Vec<RepoRequest>, (StatusCode, String)> {
185 let content_type = req
186 .headers()
187 .get(header::CONTENT_TYPE)
188 .and_then(|h| h.to_str().ok())
189 .unwrap_or("")
190 .to_string();
191
192 let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX)
193 .await
194 .map_err(bad_request)?;
195
196 let text = std::str::from_utf8(&body_bytes).map_err(bad_request)?;
197 let trimmed = text.trim();
198
199 if content_type.contains("application/json") {
200 serde_json::from_str::<Vec<RepoRequest>>(trimmed)
201 .map_err(|e| bad_request(format!("invalid JSON array: {e}")))
202 } else {
203 trimmed
204 .lines()
205 .filter(|l| !l.trim().is_empty())
206 .map(|line| {
207 serde_json::from_str::<RepoRequest>(line)
208 .map_err(|e| bad_request(format!("invalid NDJSON line: {e}")))
209 })
210 .collect()
211 }
212}
213
214fn bad_request<E: std::fmt::Display>(err: E) -> (StatusCode, String) {
215 (StatusCode::BAD_REQUEST, err.to_string())
216}
217
218fn internal<E: std::fmt::Display>(err: E) -> (StatusCode, String) {
219 (StatusCode::INTERNAL_SERVER_ERROR, err.to_string())
220}