forked from
lewis.moe/bspds-sandbox
I've been saying "PDSes seem easy enough, they're what, some CRUD to a db? I can do that in my sleep". Well, I'm sleeping right now, so let's go.
1use crate::state::AppState;
2use axum::{
3 Json,
4 extract::{Query, State},
5 http::StatusCode,
6 response::{IntoResponse, Response},
7};
8use cid::Cid;
9use jacquard_repo::commit::Commit;
10use jacquard_repo::storage::BlockStore;
11use serde::{Deserialize, Serialize};
12use serde_json::json;
13use std::str::FromStr;
14use tracing::error;
15
16async fn get_rev_from_commit(state: &AppState, cid_str: &str) -> Option<String> {
17 let cid = Cid::from_str(cid_str).ok()?;
18 let block = state.block_store.get(&cid).await.ok()??;
19 let commit = Commit::from_cbor(&block).ok()?;
20 Some(commit.rev().to_string())
21}
22
23#[derive(Deserialize)]
24pub struct GetLatestCommitParams {
25 pub did: String,
26}
27
28#[derive(Serialize)]
29pub struct GetLatestCommitOutput {
30 pub cid: String,
31 pub rev: String,
32}
33
34pub async fn get_latest_commit(
35 State(state): State<AppState>,
36 Query(params): Query<GetLatestCommitParams>,
37) -> Response {
38 let did = params.did.trim();
39 if did.is_empty() {
40 return (
41 StatusCode::BAD_REQUEST,
42 Json(json!({"error": "InvalidRequest", "message": "did is required"})),
43 )
44 .into_response();
45 }
46 let result = sqlx::query!(
47 r#"
48 SELECT r.repo_root_cid
49 FROM repos r
50 JOIN users u ON r.user_id = u.id
51 WHERE u.did = $1
52 "#,
53 did
54 )
55 .fetch_optional(&state.db)
56 .await;
57 match result {
58 Ok(Some(row)) => {
59 let rev = get_rev_from_commit(&state, &row.repo_root_cid)
60 .await
61 .unwrap_or_else(|| chrono::Utc::now().timestamp_millis().to_string());
62 (
63 StatusCode::OK,
64 Json(GetLatestCommitOutput {
65 cid: row.repo_root_cid,
66 rev,
67 }),
68 )
69 .into_response()
70 }
71 Ok(None) => (
72 StatusCode::NOT_FOUND,
73 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
74 )
75 .into_response(),
76 Err(e) => {
77 error!("DB error in get_latest_commit: {:?}", e);
78 (
79 StatusCode::INTERNAL_SERVER_ERROR,
80 Json(json!({"error": "InternalError"})),
81 )
82 .into_response()
83 }
84 }
85}
86
87#[derive(Deserialize)]
88pub struct ListReposParams {
89 pub limit: Option<i64>,
90 pub cursor: Option<String>,
91}
92
93#[derive(Serialize)]
94#[serde(rename_all = "camelCase")]
95pub struct RepoInfo {
96 pub did: String,
97 pub head: String,
98 pub rev: String,
99 pub active: bool,
100}
101
102#[derive(Serialize)]
103pub struct ListReposOutput {
104 #[serde(skip_serializing_if = "Option::is_none")]
105 pub cursor: Option<String>,
106 pub repos: Vec<RepoInfo>,
107}
108
109pub async fn list_repos(
110 State(state): State<AppState>,
111 Query(params): Query<ListReposParams>,
112) -> Response {
113 let limit = params.limit.unwrap_or(50).clamp(1, 1000);
114 let cursor_did = params.cursor.as_deref().unwrap_or("");
115 let result = sqlx::query!(
116 r#"
117 SELECT u.did, r.repo_root_cid
118 FROM repos r
119 JOIN users u ON r.user_id = u.id
120 WHERE u.did > $1
121 ORDER BY u.did ASC
122 LIMIT $2
123 "#,
124 cursor_did,
125 limit + 1
126 )
127 .fetch_all(&state.db)
128 .await;
129 match result {
130 Ok(rows) => {
131 let has_more = rows.len() as i64 > limit;
132 let mut repos: Vec<RepoInfo> = Vec::new();
133 for row in rows.iter().take(limit as usize) {
134 let rev = get_rev_from_commit(&state, &row.repo_root_cid)
135 .await
136 .unwrap_or_else(|| chrono::Utc::now().timestamp_millis().to_string());
137 repos.push(RepoInfo {
138 did: row.did.clone(),
139 head: row.repo_root_cid.clone(),
140 rev,
141 active: true,
142 });
143 }
144 let next_cursor = if has_more {
145 repos.last().map(|r| r.did.clone())
146 } else {
147 None
148 };
149 (
150 StatusCode::OK,
151 Json(ListReposOutput {
152 cursor: next_cursor,
153 repos,
154 }),
155 )
156 .into_response()
157 }
158 Err(e) => {
159 error!("DB error in list_repos: {:?}", e);
160 (
161 StatusCode::INTERNAL_SERVER_ERROR,
162 Json(json!({"error": "InternalError"})),
163 )
164 .into_response()
165 }
166 }
167}
168
169#[derive(Deserialize)]
170pub struct GetRepoStatusParams {
171 pub did: String,
172}
173
174#[derive(Serialize)]
175pub struct GetRepoStatusOutput {
176 pub did: String,
177 pub active: bool,
178 pub rev: Option<String>,
179}
180
181pub async fn get_repo_status(
182 State(state): State<AppState>,
183 Query(params): Query<GetRepoStatusParams>,
184) -> Response {
185 let did = params.did.trim();
186 if did.is_empty() {
187 return (
188 StatusCode::BAD_REQUEST,
189 Json(json!({"error": "InvalidRequest", "message": "did is required"})),
190 )
191 .into_response();
192 }
193 let result = sqlx::query!(
194 r#"
195 SELECT u.did, r.repo_root_cid
196 FROM users u
197 LEFT JOIN repos r ON u.id = r.user_id
198 WHERE u.did = $1
199 "#,
200 did
201 )
202 .fetch_optional(&state.db)
203 .await;
204 match result {
205 Ok(Some(row)) => {
206 let rev = get_rev_from_commit(&state, &row.repo_root_cid).await;
207 (
208 StatusCode::OK,
209 Json(GetRepoStatusOutput {
210 did: row.did,
211 active: true,
212 rev,
213 }),
214 )
215 .into_response()
216 }
217 Ok(None) => (
218 StatusCode::NOT_FOUND,
219 Json(json!({"error": "RepoNotFound", "message": "Could not find repo for DID"})),
220 )
221 .into_response(),
222 Err(e) => {
223 error!("DB error in get_repo_status: {:?}", e);
224 (
225 StatusCode::INTERNAL_SERVER_ERROR,
226 Json(json!({"error": "InternalError"})),
227 )
228 .into_response()
229 }
230 }
231}