very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
at main 344 lines 11 kB view raw
1use crate::api::AppState; 2use crate::db::keys; 3use crate::types::{RepoState, ResyncState, StoredEvent}; 4use axum::routing::{get, post}; 5use axum::{ 6 Json, 7 extract::{Query, State}, 8 http::StatusCode, 9}; 10use jacquard_common::types::cid::Cid; 11use jacquard_common::types::ident::AtIdentifier; 12use serde::{Deserialize, Serialize}; 13use serde_json::Value; 14use std::str::FromStr; 15use std::sync::Arc; 16 17#[derive(Deserialize)] 18pub struct DebugCountRequest { 19 pub did: String, 20 pub collection: String, 21} 22 23#[derive(Serialize)] 24pub struct DebugCountResponse { 25 pub count: usize, 26} 27 28pub fn router() -> axum::Router<Arc<AppState>> { 29 axum::Router::new() 30 .route("/debug/count", get(handle_debug_count)) 31 .route("/debug/get", get(handle_debug_get)) 32 .route("/debug/iter", get(handle_debug_iter)) 33 .route("/debug/compact", post(handle_debug_compact)) 34 .route( 35 "/debug/ephemeral_ttl_tick", 36 post(handle_debug_ephemeral_ttl_tick), 37 ) 38 .route("/debug/seed_watermark", post(handle_debug_seed_watermark)) 39} 40 41pub async fn handle_debug_count( 42 State(state): State<Arc<AppState>>, 43 Query(req): Query<DebugCountRequest>, 44) -> Result<Json<DebugCountResponse>, StatusCode> { 45 let did = state 46 .resolver 47 .resolve_did(&AtIdentifier::new(req.did.as_str()).map_err(|_| StatusCode::BAD_REQUEST)?) 
48 .await 49 .map_err(|_| StatusCode::BAD_REQUEST)?; 50 51 let db = &state.db; 52 let ks = db.records.clone(); 53 54 // {TrimmedDid}|{collection}| 55 let prefix = keys::record_prefix_collection(&did, &req.collection); 56 57 let count = tokio::task::spawn_blocking(move || { 58 let start_key = prefix.clone(); 59 let mut end_key = prefix.clone(); 60 if let Some(msg) = end_key.last_mut() { 61 *msg += 1; 62 } 63 64 ks.range(start_key..end_key).count() 65 }) 66 .await 67 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 68 69 Ok(Json(DebugCountResponse { count })) 70} 71 72#[derive(Deserialize)] 73pub struct DebugGetRequest { 74 pub partition: String, 75 pub key: String, 76} 77 78#[derive(Serialize)] 79pub struct DebugGetResponse { 80 pub value: Option<Value>, 81} 82 83fn deserialize_value(partition: &str, value: &[u8]) -> Value { 84 match partition { 85 "repos" => { 86 if let Ok(state) = rmp_serde::from_slice::<RepoState>(value) { 87 return serde_json::to_value(state).unwrap_or(Value::Null); 88 } 89 } 90 "resync" => { 91 if let Ok(state) = rmp_serde::from_slice::<ResyncState>(value) { 92 return serde_json::to_value(state).unwrap_or(Value::Null); 93 } 94 } 95 "events" => { 96 if let Ok(event) = rmp_serde::from_slice::<StoredEvent>(value) { 97 return serde_json::to_value(event).unwrap_or(Value::Null); 98 } 99 } 100 "records" => { 101 if let Ok(s) = String::from_utf8(value.to_vec()) { 102 match Cid::from_str(&s) { 103 Ok(cid) => return serde_json::to_value(cid).unwrap_or(Value::String(s)), 104 Err(_) => return Value::String(s), 105 } 106 } 107 } 108 "counts" | "cursors" => { 109 if let Ok(arr) = value.try_into() { 110 return Value::Number(u64::from_be_bytes(arr).into()); 111 } 112 if let Ok(s) = String::from_utf8(value.to_vec()) { 113 return Value::String(s); 114 } 115 } 116 "blocks" => { 117 if let Ok(val) = serde_ipld_dagcbor::from_slice::<Value>(value) { 118 return val; 119 } 120 } 121 "pending" => return Value::Null, 122 _ => {} 123 } 124 
Value::String(hex::encode(value)) 125} 126 127pub async fn handle_debug_get( 128 State(state): State<Arc<AppState>>, 129 Query(req): Query<DebugGetRequest>, 130) -> Result<Json<DebugGetResponse>, StatusCode> { 131 let ks = get_keyspace_by_name(&state.db, &req.partition)?; 132 133 let key = if req.partition == "events" { 134 let id = req 135 .key 136 .parse::<u64>() 137 .map_err(|_| StatusCode::BAD_REQUEST)?; 138 id.to_be_bytes().to_vec() 139 } else { 140 req.key.into_bytes() 141 }; 142 143 let partition = req.partition.clone(); 144 let value = crate::db::Db::get(ks, key) 145 .await 146 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? 147 .map(|v| deserialize_value(&partition, &v)); 148 149 Ok(Json(DebugGetResponse { value })) 150} 151 152#[derive(Deserialize)] 153pub struct DebugIterRequest { 154 pub partition: String, 155 pub start: Option<String>, 156 pub end: Option<String>, 157 pub limit: Option<usize>, 158 pub reverse: Option<bool>, 159} 160 161#[derive(Serialize)] 162pub struct DebugIterResponse { 163 pub items: Vec<(String, Value)>, 164} 165 166pub async fn handle_debug_iter( 167 State(state): State<Arc<AppState>>, 168 Query(req): Query<DebugIterRequest>, 169) -> Result<Json<DebugIterResponse>, StatusCode> { 170 let ks = get_keyspace_by_name(&state.db, &req.partition)?; 171 let is_events = req.partition == "events"; 172 let partition = req.partition.clone(); 173 174 let parse_bound = |s: Option<String>| -> Result<Option<Vec<u8>>, StatusCode> { 175 match s { 176 Some(s) => { 177 if is_events { 178 let id = s.parse::<u64>().map_err(|_| StatusCode::BAD_REQUEST)?; 179 Ok(Some(id.to_be_bytes().to_vec())) 180 } else { 181 Ok(Some(s.into_bytes())) 182 } 183 } 184 None => Ok(None), 185 } 186 }; 187 188 let start = parse_bound(req.start)?; 189 let end = parse_bound(req.end)?; 190 191 let items = tokio::task::spawn_blocking(move || { 192 let limit = req.limit.unwrap_or(50); 193 194 let collect = |iter: &mut dyn Iterator<Item = fjall::Guard>| { 195 let mut items = 
Vec::new(); 196 for guard in iter.take(limit) { 197 let (k, v) = guard 198 .into_inner() 199 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 200 201 let key_str = if is_events { 202 if let Ok(arr) = k.as_ref().try_into() { 203 u64::from_be_bytes(arr).to_string() 204 } else { 205 "invalid_u64".to_string() 206 } 207 } else if partition == "blocks" { 208 // key is col|cid_bytes — show as "col|<cid_str>" 209 if let Some(sep) = k.iter().position(|&b| b == keys::SEP) { 210 let col = String::from_utf8_lossy(&k[..sep]); 211 match cid::Cid::read_bytes(&k[sep + 1..]) { 212 Ok(cid) => format!("{col}|{cid}"), 213 Err(_) => String::from_utf8_lossy(&k).into_owned(), 214 } 215 } else { 216 String::from_utf8_lossy(&k).into_owned() 217 } 218 } else { 219 String::from_utf8_lossy(&k).into_owned() 220 }; 221 222 items.push((key_str, deserialize_value(&partition, &v))); 223 } 224 Ok::<_, StatusCode>(items) 225 }; 226 227 let start_bound = if let Some(ref s) = start { 228 std::ops::Bound::Included(s.as_slice()) 229 } else { 230 std::ops::Bound::Unbounded 231 }; 232 233 let end_bound = if let Some(ref e) = end { 234 std::ops::Bound::Included(e.as_slice()) 235 } else { 236 std::ops::Bound::Unbounded 237 }; 238 239 if req.reverse == Some(true) { 240 collect( 241 &mut ks 242 .range::<&[u8], (std::ops::Bound<&[u8]>, std::ops::Bound<&[u8]>)>(( 243 start_bound, 244 end_bound, 245 )) 246 .rev(), 247 ) 248 } else { 249 collect( 250 &mut ks.range::<&[u8], (std::ops::Bound<&[u8]>, std::ops::Bound<&[u8]>)>(( 251 start_bound, 252 end_bound, 253 )), 254 ) 255 } 256 }) 257 .await 258 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)??; 259 260 Ok(Json(DebugIterResponse { items })) 261} 262 263fn get_keyspace_by_name(db: &crate::db::Db, name: &str) -> Result<fjall::Keyspace, StatusCode> { 264 match name { 265 "repos" => Ok(db.repos.clone()), 266 "blocks" => Ok(db.blocks.clone()), 267 "cursors" => Ok(db.cursors.clone()), 268 "pending" => Ok(db.pending.clone()), 269 "resync" => Ok(db.resync.clone()), 270 
"events" => Ok(db.events.clone()), 271 "counts" => Ok(db.counts.clone()), 272 "records" => Ok(db.records.clone()), 273 _ => Err(StatusCode::BAD_REQUEST), 274 } 275} 276 277#[derive(Deserialize)] 278pub struct DebugCompactRequest { 279 pub partition: String, 280} 281 282pub async fn handle_debug_compact( 283 State(state): State<Arc<AppState>>, 284 Query(req): Query<DebugCompactRequest>, 285) -> Result<StatusCode, StatusCode> { 286 let ks = get_keyspace_by_name(&state.db, &req.partition)?; 287 let state_clone = state.clone(); 288 289 tokio::task::spawn_blocking(move || { 290 let _ = ks.remove(b"dummy_tombstone123"); 291 let _ = state_clone.db.persist(); 292 let _ = ks.rotate_memtable_and_wait(); 293 ks.major_compact() 294 }) 295 .await 296 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? 297 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 298 299 Ok(StatusCode::OK) 300} 301 302pub async fn handle_debug_ephemeral_ttl_tick( 303 State(state): State<Arc<AppState>>, 304) -> Result<StatusCode, StatusCode> { 305 tokio::task::spawn_blocking(move || { 306 crate::db::ephemeral::ephemeral_ttl_tick(&state.db, &state.ephemeral_ttl) 307 }) 308 .await 309 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? 310 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 311 312 Ok(StatusCode::OK) 313} 314 315#[derive(Deserialize)] 316pub struct DebugSeedWatermarkRequest { 317 /// unix timestamp (seconds) to write the watermark at 318 pub ts: u64, 319 /// event_id the watermark points to — all events before this id will be pruned 320 pub event_id: u64, 321} 322 323/// writes an event watermark entry directly to the cursors keyspace, using identical 324/// key/value encoding to the real TTL worker. used in tests to plant a past watermark 325/// so the real `ephemeral_ttl_tick` code path is exercised without waiting 3600 seconds. 
326pub async fn handle_debug_seed_watermark( 327 State(state): State<Arc<AppState>>, 328 Query(req): Query<DebugSeedWatermarkRequest>, 329) -> Result<StatusCode, StatusCode> { 330 tokio::task::spawn_blocking(move || { 331 state 332 .db 333 .cursors 334 .insert( 335 crate::db::keys::event_watermark_key(req.ts), 336 req.event_id.to_be_bytes(), 337 ) 338 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR) 339 }) 340 .await 341 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)??; 342 343 Ok(StatusCode::OK) 344}