use crate::control::{Hydrant, RepoInfo, repo_state_to_info}; use crate::db::keys; use axum::{ Json, Router, body::Body, extract::{Path, Query, State}, http::{HeaderMap, StatusCode, header}, response::{IntoResponse, Response}, routing::{delete, get, post, put}, }; use jacquard_common::types::did::Did; use serde::Deserialize; pub fn router() -> Router { Router::new() .route("/repos", get(handle_get_repos)) .route("/repos/resync", post(handle_post_resync)) .route("/repos/{did}", get(handle_get_repo)) .route("/repos", put(handle_put_repos)) .route("/repos", delete(handle_delete_repos)) } #[derive(Deserialize, Debug)] pub struct RepoRequest { pub did: String, } #[derive(Deserialize)] pub struct GetReposParams { pub limit: Option, pub cursor: Option, pub partition: Option, } pub async fn handle_get_repos( State(hydrant): State, Query(params): Query, headers: HeaderMap, ) -> Result { let limit = params.limit.unwrap_or(100).min(1000); let partition = params.partition.unwrap_or_else(|| "all".to_string()); let items = tokio::task::spawn_blocking(move || { let db = &hydrant.state.db; let to_info = |k: &[u8], v: &[u8]| -> Result { let repo_state = crate::db::deser_repo_state(v).map_err(internal)?; let did = crate::db::types::TrimmedDid::try_from(k) .map_err(internal)? .to_did(); Ok(repo_state_to_info(did, repo_state)) }; let results = match partition.as_str() { "all" | "resync" => { let is_all = partition == "all"; let ks = if is_all { &db.repos } else { &db.resync }; let start_bound = if let Some(cursor) = params.cursor { let did = Did::new_owned(&cursor).map_err(bad_request)?; let did_key = keys::repo_key(&did); std::ops::Bound::Excluded(did_key) } else { std::ops::Bound::Unbounded }; let mut items = Vec::new(); for item in ks .range((start_bound, std::ops::Bound::Unbounded)) .take(limit) { let (k, v) = item.into_inner().map_err(internal)?; let repo_state_bytes = if is_all { v } else { db.repos.get(&k).map_err(internal)?.ok_or_else(|| { internal(format!("repository state missing for {}", partition)) })? }; items.push(to_info(&k, &repo_state_bytes)?); } Ok::<_, (StatusCode, String)>(items) } "pending" => { let start_bound = if let Some(cursor) = params.cursor { let id = cursor.parse::().map_err(bad_request)?; std::ops::Bound::Excluded(id.to_be_bytes().to_vec()) } else { std::ops::Bound::Unbounded }; let mut items = Vec::new(); for item in db .pending .range((start_bound, std::ops::Bound::Unbounded)) .take(limit) { let (_, did_key) = item.into_inner().map_err(internal)?; if let Ok(Some(v)) = db.repos.get(&did_key) { items.push(to_info(&did_key, &v)?); } } Ok(items) } _ => Err((StatusCode::BAD_REQUEST, "invalid partition".to_string())), }?; Ok::<_, (StatusCode, String)>(results) }) .await .map_err(internal)??; if prefers_json(&headers) { return Ok(Json(items).into_response()); } use futures::StreamExt; let stream = futures::stream::iter(items.into_iter().map(|item| { let json = serde_json::to_string(&item).ok()?; Some(Ok::<_, std::io::Error>(format!("{json}\n"))) })) .filter_map(|x| futures::future::ready(x)); let body = Body::from_stream(stream); Ok(([(header::CONTENT_TYPE, "application/x-ndjson")], body).into_response()) } pub async fn handle_get_repo( State(hydrant): State, Path(did_str): Path, ) -> Result, (StatusCode, String)> { let did = Did::new(&did_str).map_err(bad_request)?; let item = hydrant .repos .info(&did) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; item.map(Json) .ok_or_else(|| (StatusCode::NOT_FOUND, "repository not found".to_string())) } pub async fn handle_put_repos( State(hydrant): State, headers: HeaderMap, body: Body, ) -> Result { let items = parse_body(body, &headers).await?; let dids: Vec> = items .into_iter() .filter_map(|item| Did::new_owned(&item.did).ok()) .collect(); let queued = hydrant .repos .track(dids) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; Ok(did_list_response(queued, &headers)) } pub async fn handle_delete_repos( State(hydrant): State, headers: HeaderMap, body: Body, ) -> Result { let items = parse_body(body, &headers).await?; let dids: Vec> = items .into_iter() .filter_map(|item| Did::new_owned(&item.did).ok()) .collect(); let untracked = hydrant .repos .untrack(dids) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; Ok(did_list_response(untracked, &headers)) } pub async fn handle_post_resync( State(hydrant): State, headers: HeaderMap, body: Body, ) -> Result { let items = parse_body(body, &headers).await?; let dids: Vec> = items .into_iter() .filter_map(|item| Did::new_owned(&item.did).ok()) .collect(); let queued = hydrant .repos .resync(dids) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; Ok(did_list_response(queued, &headers)) } fn prefers_json(headers: &HeaderMap) -> bool { let contains_json = |h: axum::http::HeaderName| { headers .get(h) .and_then(|v| v.to_str().ok()) .is_some_and(|v| v.contains("application/json")) }; contains_json(header::ACCEPT) || contains_json(header::CONTENT_TYPE) } fn did_list_response(dids: Vec>, headers: &HeaderMap) -> Response { if prefers_json(headers) { let body: Vec = dids.into_iter().map(|d| d.to_string()).collect(); Json(body).into_response() } else { let body = dids .iter() .filter_map(|d| serde_json::to_string(&d.as_str()).ok()) .map(|s| format!("{s}\n")) .collect::(); ([(header::CONTENT_TYPE, "application/x-ndjson")], body).into_response() } } async fn parse_body( body: Body, headers: &HeaderMap, ) -> Result, (StatusCode, String)> { let content_type = headers .get(header::CONTENT_TYPE) .and_then(|h| h.to_str().ok()) .unwrap_or("") .to_string(); let body_bytes = axum::body::to_bytes(body, usize::MAX) .await .map_err(bad_request)?; let text = std::str::from_utf8(&body_bytes).map_err(bad_request)?; let trimmed = text.trim(); if content_type.contains("application/json") { serde_json::from_str::>(trimmed) .map_err(|e| bad_request(format!("invalid JSON array: {e}"))) } else { trimmed .lines() .filter(|l| !l.trim().is_empty()) .map(|line| { serde_json::from_str::(line) .map_err(|e| bad_request(format!("invalid NDJSON line: {e}"))) }) .collect() } } fn bad_request(err: E) -> (StatusCode, String) { (StatusCode::BAD_REQUEST, err.to_string()) } fn internal(err: E) -> (StatusCode, String) { (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()) }