I've been saying "PDSes seem easy enough, they're what, some CRUD to a db? I can do that in my sleep". well i'm sleeping rn so let's go
at main 6.3 kB view raw
1use crate::state::AppState; 2use axum::{ 3 Json, 4 extract::{Query, State}, 5 http::StatusCode, 6 response::{IntoResponse, Response}, 7}; 8use cid::Cid; 9use jacquard_repo::commit::Commit; 10use jacquard_repo::storage::BlockStore; 11use serde::{Deserialize, Serialize}; 12use serde_json::json; 13use std::str::FromStr; 14use tracing::error; 15 16async fn get_rev_from_commit(state: &AppState, cid_str: &str) -> Option<String> { 17 let cid = Cid::from_str(cid_str).ok()?; 18 let block = state.block_store.get(&cid).await.ok()??; 19 let commit = Commit::from_cbor(&block).ok()?; 20 Some(commit.rev().to_string()) 21} 22 23#[derive(Deserialize)] 24pub struct GetLatestCommitParams { 25 pub did: String, 26} 27 28#[derive(Serialize)] 29pub struct GetLatestCommitOutput { 30 pub cid: String, 31 pub rev: String, 32} 33 34pub async fn get_latest_commit( 35 State(state): State<AppState>, 36 Query(params): Query<GetLatestCommitParams>, 37) -> Response { 38 let did = params.did.trim(); 39 if did.is_empty() { 40 return ( 41 StatusCode::BAD_REQUEST, 42 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 43 ) 44 .into_response(); 45 } 46 let result = sqlx::query!( 47 r#" 48 SELECT r.repo_root_cid 49 FROM repos r 50 JOIN users u ON r.user_id = u.id 51 WHERE u.did = $1 52 "#, 53 did 54 ) 55 .fetch_optional(&state.db) 56 .await; 57 match result { 58 Ok(Some(row)) => { 59 let rev = get_rev_from_commit(&state, &row.repo_root_cid) 60 .await 61 .unwrap_or_else(|| chrono::Utc::now().timestamp_millis().to_string()); 62 ( 63 StatusCode::OK, 64 Json(GetLatestCommitOutput { 65 cid: row.repo_root_cid, 66 rev, 67 }), 68 ) 69 .into_response() 70 } 71 Ok(None) => ( 72 StatusCode::NOT_FOUND, 73 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 74 ) 75 .into_response(), 76 Err(e) => { 77 error!("DB error in get_latest_commit: {:?}", e); 78 ( 79 StatusCode::INTERNAL_SERVER_ERROR, 80 Json(json!({"error": "InternalError"})), 81 ) 82 .into_response() 83 } 84 } 85} 86 87#[derive(Deserialize)] 88pub struct ListReposParams { 89 pub limit: Option<i64>, 90 pub cursor: Option<String>, 91} 92 93#[derive(Serialize)] 94#[serde(rename_all = "camelCase")] 95pub struct RepoInfo { 96 pub did: String, 97 pub head: String, 98 pub rev: String, 99 pub active: bool, 100} 101 102#[derive(Serialize)] 103pub struct ListReposOutput { 104 #[serde(skip_serializing_if = "Option::is_none")] 105 pub cursor: Option<String>, 106 pub repos: Vec<RepoInfo>, 107} 108 109pub async fn list_repos( 110 State(state): State<AppState>, 111 Query(params): Query<ListReposParams>, 112) -> Response { 113 let limit = params.limit.unwrap_or(50).clamp(1, 1000); 114 let cursor_did = params.cursor.as_deref().unwrap_or(""); 115 let result = sqlx::query!( 116 r#" 117 SELECT u.did, r.repo_root_cid 118 FROM repos r 119 JOIN users u ON r.user_id = u.id 120 WHERE u.did > $1 121 ORDER BY u.did ASC 122 LIMIT $2 123 "#, 124 cursor_did, 125 limit + 1 126 ) 127 .fetch_all(&state.db) 128 .await; 129 match result { 130 Ok(rows) => { 131 let has_more = rows.len() as i64 > limit; 132 let mut repos: Vec<RepoInfo> = Vec::new(); 133 for row in rows.iter().take(limit as usize) { 134 let rev = get_rev_from_commit(&state, &row.repo_root_cid) 135 .await 136 .unwrap_or_else(|| chrono::Utc::now().timestamp_millis().to_string()); 137 repos.push(RepoInfo { 138 did: row.did.clone(), 139 head: row.repo_root_cid.clone(), 140 rev, 141 active: true, 142 }); 143 } 144 let next_cursor = if has_more { 145 repos.last().map(|r| r.did.clone()) 146 } else { 147 None 148 }; 149 ( 150 StatusCode::OK, 151 Json(ListReposOutput { 152 cursor: next_cursor, 153 repos, 154 }), 155 ) 156 .into_response() 157 } 158 Err(e) => { 159 error!("DB error in list_repos: {:?}", e); 160 ( 161 StatusCode::INTERNAL_SERVER_ERROR, 162 Json(json!({"error": "InternalError"})), 163 ) 164 .into_response() 165 } 166 } 167} 168 169#[derive(Deserialize)] 170pub struct GetRepoStatusParams { 171 pub did: String, 172} 173 174#[derive(Serialize)] 175pub struct GetRepoStatusOutput { 176 pub did: String, 177 pub active: bool, 178 pub rev: Option<String>, 179} 180 181pub async fn get_repo_status( 182 State(state): State<AppState>, 183 Query(params): Query<GetRepoStatusParams>, 184) -> Response { 185 let did = params.did.trim(); 186 if did.is_empty() { 187 return ( 188 StatusCode::BAD_REQUEST, 189 Json(json!({"error": "InvalidRequest", "message": "did is required"})), 190 ) 191 .into_response(); 192 } 193 let result = sqlx::query!( 194 r#" 195 SELECT u.did, r.repo_root_cid 196 FROM users u 197 LEFT JOIN repos r ON u.id = r.user_id 198 WHERE u.did = $1 199 "#, 200 did 201 ) 202 .fetch_optional(&state.db) 203 .await; 204 match result { 205 Ok(Some(row)) => { 206 let rev = get_rev_from_commit(&state, &row.repo_root_cid).await; 207 ( 208 StatusCode::OK, 209 Json(GetRepoStatusOutput { 210 did: row.did, 211 active: true, 212 rev, 213 }), 214 ) 215 .into_response() 216 } 217 Ok(None) => ( 218 StatusCode::NOT_FOUND, 219 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})), 220 ) 221 .into_response(), 222 Err(e) => { 223 error!("DB error in get_repo_status: {:?}", e); 224 ( 225 StatusCode::INTERNAL_SERVER_ERROR, 226 Json(json!({"error": "InternalError"})), 227 ) 228 .into_response() 229 } 230 } 231}