A very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall.
rust
fjall
at-protocol
atproto
indexer
1use crate::api::AppState;
2use crate::db::keys;
3use crate::types::{RepoState, ResyncState, StoredEvent};
4use axum::routing::{get, post};
5use axum::{
6 Json,
7 extract::{Query, State},
8 http::StatusCode,
9};
10use jacquard_common::types::cid::Cid;
11use jacquard_common::types::ident::AtIdentifier;
12use serde::{Deserialize, Serialize};
13use serde_json::Value;
14use std::str::FromStr;
15use std::sync::Arc;
16
/// query parameters for `GET /debug/count`.
#[derive(Deserialize)]
pub struct DebugCountRequest {
    // AT identifier of the repo; resolved to a DID server-side before use
    pub did: String,
    // collection name to count records in
    pub collection: String,
}
22
/// response body for `GET /debug/count`.
#[derive(Serialize)]
pub struct DebugCountResponse {
    // number of record keys found under the did|collection prefix
    pub count: usize,
}
27
28pub fn router() -> axum::Router<Arc<AppState>> {
29 axum::Router::new()
30 .route("/debug/count", get(handle_debug_count))
31 .route("/debug/get", get(handle_debug_get))
32 .route("/debug/iter", get(handle_debug_iter))
33 .route("/debug/compact", post(handle_debug_compact))
34 .route(
35 "/debug/ephemeral_ttl_tick",
36 post(handle_debug_ephemeral_ttl_tick),
37 )
38 .route("/debug/seed_watermark", post(handle_debug_seed_watermark))
39}
40
41pub async fn handle_debug_count(
42 State(state): State<Arc<AppState>>,
43 Query(req): Query<DebugCountRequest>,
44) -> Result<Json<DebugCountResponse>, StatusCode> {
45 let did = state
46 .resolver
47 .resolve_did(&AtIdentifier::new(req.did.as_str()).map_err(|_| StatusCode::BAD_REQUEST)?)
48 .await
49 .map_err(|_| StatusCode::BAD_REQUEST)?;
50
51 let db = &state.db;
52 let ks = db.records.clone();
53
54 // {TrimmedDid}|{collection}|
55 let prefix = keys::record_prefix_collection(&did, &req.collection);
56
57 let count = tokio::task::spawn_blocking(move || {
58 let start_key = prefix.clone();
59 let mut end_key = prefix.clone();
60 if let Some(msg) = end_key.last_mut() {
61 *msg += 1;
62 }
63
64 ks.range(start_key..end_key).count()
65 })
66 .await
67 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
68
69 Ok(Json(DebugCountResponse { count }))
70}
71
/// query parameters for `GET /debug/get`.
#[derive(Deserialize)]
pub struct DebugGetRequest {
    // partition name; must be one accepted by `get_keyspace_by_name`
    pub partition: String,
    // key to look up; for "events" this is a decimal u64 id, otherwise raw utf8
    pub key: String,
}
77
/// response body for `GET /debug/get`.
#[derive(Serialize)]
pub struct DebugGetResponse {
    // decoded value, or `None` when the key was absent
    pub value: Option<Value>,
}
82
83fn deserialize_value(partition: &str, value: &[u8]) -> Value {
84 match partition {
85 "repos" => {
86 if let Ok(state) = rmp_serde::from_slice::<RepoState>(value) {
87 return serde_json::to_value(state).unwrap_or(Value::Null);
88 }
89 }
90 "resync" => {
91 if let Ok(state) = rmp_serde::from_slice::<ResyncState>(value) {
92 return serde_json::to_value(state).unwrap_or(Value::Null);
93 }
94 }
95 "events" => {
96 if let Ok(event) = rmp_serde::from_slice::<StoredEvent>(value) {
97 return serde_json::to_value(event).unwrap_or(Value::Null);
98 }
99 }
100 "records" => {
101 if let Ok(s) = String::from_utf8(value.to_vec()) {
102 match Cid::from_str(&s) {
103 Ok(cid) => return serde_json::to_value(cid).unwrap_or(Value::String(s)),
104 Err(_) => return Value::String(s),
105 }
106 }
107 }
108 "counts" | "cursors" => {
109 if let Ok(arr) = value.try_into() {
110 return Value::Number(u64::from_be_bytes(arr).into());
111 }
112 if let Ok(s) = String::from_utf8(value.to_vec()) {
113 return Value::String(s);
114 }
115 }
116 "blocks" => {
117 if let Ok(val) = serde_ipld_dagcbor::from_slice::<Value>(value) {
118 return val;
119 }
120 }
121 "pending" => return Value::Null,
122 _ => {}
123 }
124 Value::String(hex::encode(value))
125}
126
127pub async fn handle_debug_get(
128 State(state): State<Arc<AppState>>,
129 Query(req): Query<DebugGetRequest>,
130) -> Result<Json<DebugGetResponse>, StatusCode> {
131 let ks = get_keyspace_by_name(&state.db, &req.partition)?;
132
133 let key = if req.partition == "events" {
134 let id = req
135 .key
136 .parse::<u64>()
137 .map_err(|_| StatusCode::BAD_REQUEST)?;
138 id.to_be_bytes().to_vec()
139 } else {
140 req.key.into_bytes()
141 };
142
143 let partition = req.partition.clone();
144 let value = crate::db::Db::get(ks, key)
145 .await
146 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
147 .map(|v| deserialize_value(&partition, &v));
148
149 Ok(Json(DebugGetResponse { value }))
150}
151
/// query parameters for `GET /debug/iter`.
#[derive(Deserialize)]
pub struct DebugIterRequest {
    // partition name; must be one accepted by `get_keyspace_by_name`
    pub partition: String,
    // optional inclusive lower bound (decimal u64 for "events", else raw utf8)
    pub start: Option<String>,
    // optional inclusive upper bound, same encoding rules as `start`
    pub end: Option<String>,
    // maximum number of items returned; defaults to 50
    pub limit: Option<usize>,
    // when true, iterate the range from the end backwards
    pub reverse: Option<bool>,
}
160
/// response body for `GET /debug/iter`.
#[derive(Serialize)]
pub struct DebugIterResponse {
    // (rendered key, decoded value) pairs in iteration order
    pub items: Vec<(String, Value)>,
}
165
/// GET /debug/iter — iterate a partition over an optional key range.
///
/// both bounds are inclusive when present, `limit` defaults to 50, and
/// `reverse=true` walks the range backwards. for the `events` partition the
/// bounds are decimal u64 event ids; elsewhere they are raw utf8 bytes.
pub async fn handle_debug_iter(
    State(state): State<Arc<AppState>>,
    Query(req): Query<DebugIterRequest>,
) -> Result<Json<DebugIterResponse>, StatusCode> {
    let ks = get_keyspace_by_name(&state.db, &req.partition)?;
    let is_events = req.partition == "events";
    let partition = req.partition.clone();

    // encode a user-supplied bound the same way keys are stored on disk:
    // big-endian u64 for events, raw utf8 bytes otherwise
    let parse_bound = |s: Option<String>| -> Result<Option<Vec<u8>>, StatusCode> {
        match s {
            Some(s) => {
                if is_events {
                    let id = s.parse::<u64>().map_err(|_| StatusCode::BAD_REQUEST)?;
                    Ok(Some(id.to_be_bytes().to_vec()))
                } else {
                    Ok(Some(s.into_bytes()))
                }
            }
            None => Ok(None),
        }
    };

    let start = parse_bound(req.start)?;
    let end = parse_bound(req.end)?;

    // keyspace iteration is blocking — run it on the blocking pool
    let items = tokio::task::spawn_blocking(move || {
        let limit = req.limit.unwrap_or(50);

        // shared collector for both directions; takes a dyn iterator so the
        // forward and `.rev()` iterators (different concrete types) both fit
        let collect = |iter: &mut dyn Iterator<Item = fjall::Guard>| {
            let mut items = Vec::new();
            for guard in iter.take(limit) {
                let (k, v) = guard
                    .into_inner()
                    .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

                // render the key in a human-readable form per partition
                let key_str = if is_events {
                    // 8-byte big-endian id → decimal string
                    if let Ok(arr) = k.as_ref().try_into() {
                        u64::from_be_bytes(arr).to_string()
                    } else {
                        "invalid_u64".to_string()
                    }
                } else if partition == "blocks" {
                    // key is col|cid_bytes — show as "col|<cid_str>"
                    if let Some(sep) = k.iter().position(|&b| b == keys::SEP) {
                        let col = String::from_utf8_lossy(&k[..sep]);
                        match cid::Cid::read_bytes(&k[sep + 1..]) {
                            Ok(cid) => format!("{col}|{cid}"),
                            Err(_) => String::from_utf8_lossy(&k).into_owned(),
                        }
                    } else {
                        // no separator — fall back to lossy utf8
                        String::from_utf8_lossy(&k).into_owned()
                    }
                } else {
                    String::from_utf8_lossy(&k).into_owned()
                };

                items.push((key_str, deserialize_value(&partition, &v)));
            }
            Ok::<_, StatusCode>(items)
        };

        // absent bounds are unbounded; present bounds are inclusive
        let start_bound = if let Some(ref s) = start {
            std::ops::Bound::Included(s.as_slice())
        } else {
            std::ops::Bound::Unbounded
        };

        let end_bound = if let Some(ref e) = end {
            std::ops::Bound::Included(e.as_slice())
        } else {
            std::ops::Bound::Unbounded
        };

        if req.reverse == Some(true) {
            collect(
                &mut ks
                    .range::<&[u8], (std::ops::Bound<&[u8]>, std::ops::Bound<&[u8]>)>((
                        start_bound,
                        end_bound,
                    ))
                    .rev(),
            )
        } else {
            collect(
                &mut ks.range::<&[u8], (std::ops::Bound<&[u8]>, std::ops::Bound<&[u8]>)>((
                    start_bound,
                    end_bound,
                )),
            )
        }
    })
    .await
    .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)??;

    Ok(Json(DebugIterResponse { items }))
}
262
263fn get_keyspace_by_name(db: &crate::db::Db, name: &str) -> Result<fjall::Keyspace, StatusCode> {
264 match name {
265 "repos" => Ok(db.repos.clone()),
266 "blocks" => Ok(db.blocks.clone()),
267 "cursors" => Ok(db.cursors.clone()),
268 "pending" => Ok(db.pending.clone()),
269 "resync" => Ok(db.resync.clone()),
270 "events" => Ok(db.events.clone()),
271 "counts" => Ok(db.counts.clone()),
272 "records" => Ok(db.records.clone()),
273 _ => Err(StatusCode::BAD_REQUEST),
274 }
275}
276
/// query parameters for `POST /debug/compact`.
#[derive(Deserialize)]
pub struct DebugCompactRequest {
    // partition name; must be one accepted by `get_keyspace_by_name`
    pub partition: String,
}
281
/// POST /debug/compact — force a major compaction of one partition.
///
/// the preparatory steps (tombstone write, persist, memtable rotation) are
/// best-effort and their errors are deliberately swallowed; only the
/// compaction itself is surfaced as a 500.
pub async fn handle_debug_compact(
    State(state): State<Arc<AppState>>,
    Query(req): Query<DebugCompactRequest>,
) -> Result<StatusCode, StatusCode> {
    let ks = get_keyspace_by_name(&state.db, &req.partition)?;
    let state_clone = state.clone();

    // compaction is blocking — run it on the blocking pool
    tokio::task::spawn_blocking(move || {
        // NOTE(review): writes a tombstone for a key that presumably never
        // exists — looks like it ensures the memtable is non-empty so the
        // rotate below has something to flush; confirm against fjall docs
        let _ = ks.remove(b"dummy_tombstone123");
        let _ = state_clone.db.persist();
        let _ = ks.rotate_memtable_and_wait();
        ks.major_compact()
    })
    .await
    .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
    .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;

    Ok(StatusCode::OK)
}
301
302pub async fn handle_debug_ephemeral_ttl_tick(
303 State(state): State<Arc<AppState>>,
304) -> Result<StatusCode, StatusCode> {
305 tokio::task::spawn_blocking(move || {
306 crate::db::ephemeral::ephemeral_ttl_tick(&state.db, &state.ephemeral_ttl)
307 })
308 .await
309 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
310 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
311
312 Ok(StatusCode::OK)
313}
314
/// query parameters for `POST /debug/seed_watermark`.
#[derive(Deserialize)]
pub struct DebugSeedWatermarkRequest {
    /// unix timestamp (seconds) to write the watermark at
    pub ts: u64,
    /// event_id the watermark points to — all events before this id will be pruned
    pub event_id: u64,
}
322
323/// writes an event watermark entry directly to the cursors keyspace, using identical
324/// key/value encoding to the real TTL worker. used in tests to plant a past watermark
325/// so the real `ephemeral_ttl_tick` code path is exercised without waiting 3600 seconds.
326pub async fn handle_debug_seed_watermark(
327 State(state): State<Arc<AppState>>,
328 Query(req): Query<DebugSeedWatermarkRequest>,
329) -> Result<StatusCode, StatusCode> {
330 tokio::task::spawn_blocking(move || {
331 state
332 .db
333 .cursors
334 .insert(
335 crate::db::keys::event_watermark_key(req.ts),
336 req.event_id.to_be_bytes(),
337 )
338 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
339 })
340 .await
341 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)??;
342
343 Ok(StatusCode::OK)
344}