forked from
lewis.moe/bspds-sandbox
I've been saying "PDSes seem easy enough, they're what, some CRUD to a db? I can do that in my sleep". well i'm sleeping rn so let's go
1use crate::api::error::ApiError;
2use crate::state::AppState;
3use crate::sync::util::{AccountStatus, assert_repo_availability, get_account_with_status};
4use axum::{
5 Json,
6 extract::{Query, State},
7 http::StatusCode,
8 response::{IntoResponse, Response},
9};
10use cid::Cid;
11use jacquard_repo::commit::Commit;
12use jacquard_repo::storage::BlockStore;
13use serde::{Deserialize, Serialize};
14use std::str::FromStr;
15use tracing::error;
16
17async fn get_rev_from_commit(state: &AppState, cid_str: &str) -> Option<String> {
18 let cid = Cid::from_str(cid_str).ok()?;
19 let block = state.block_store.get(&cid).await.ok()??;
20 let commit = Commit::from_cbor(&block).ok()?;
21 Some(commit.rev().to_string())
22}
23
24#[derive(Deserialize)]
25pub struct GetLatestCommitParams {
26 pub did: String,
27}
28
29#[derive(Serialize)]
30pub struct GetLatestCommitOutput {
31 pub cid: String,
32 pub rev: String,
33}
34
35pub async fn get_latest_commit(
36 State(state): State<AppState>,
37 Query(params): Query<GetLatestCommitParams>,
38) -> Response {
39 let did = params.did.trim();
40 if did.is_empty() {
41 return ApiError::InvalidRequest("did is required".into()).into_response();
42 }
43
44 let account = match assert_repo_availability(&state.db, did, false).await {
45 Ok(a) => a,
46 Err(e) => return e.into_response(),
47 };
48
49 let Some(repo_root_cid) = account.repo_root_cid else {
50 return ApiError::RepoNotFound(Some("Repo not initialized".into())).into_response();
51 };
52
53 let Some(rev) = get_rev_from_commit(&state, &repo_root_cid).await else {
54 error!(
55 "Failed to parse commit for DID {}: CID {}",
56 did, repo_root_cid
57 );
58 return ApiError::InternalError(Some("Failed to read repo commit".into())).into_response();
59 };
60
61 (
62 StatusCode::OK,
63 Json(GetLatestCommitOutput {
64 cid: repo_root_cid,
65 rev,
66 }),
67 )
68 .into_response()
69}
70
71#[derive(Deserialize)]
72pub struct ListReposParams {
73 pub limit: Option<i64>,
74 pub cursor: Option<String>,
75}
76
77#[derive(Serialize)]
78#[serde(rename_all = "camelCase")]
79pub struct RepoInfo {
80 pub did: String,
81 pub head: String,
82 pub rev: String,
83 pub active: bool,
84 #[serde(skip_serializing_if = "Option::is_none")]
85 pub status: Option<String>,
86}
87
88#[derive(Serialize)]
89pub struct ListReposOutput {
90 #[serde(skip_serializing_if = "Option::is_none")]
91 pub cursor: Option<String>,
92 pub repos: Vec<RepoInfo>,
93}
94
95pub async fn list_repos(
96 State(state): State<AppState>,
97 Query(params): Query<ListReposParams>,
98) -> Response {
99 let limit = params.limit.unwrap_or(50).clamp(1, 1000);
100 let cursor_did = params.cursor.as_deref().unwrap_or("");
101 let result = sqlx::query!(
102 r#"
103 SELECT u.did, u.deactivated_at, u.takedown_ref, r.repo_root_cid, r.repo_rev
104 FROM repos r
105 JOIN users u ON r.user_id = u.id
106 WHERE u.did > $1
107 ORDER BY u.did ASC
108 LIMIT $2
109 "#,
110 cursor_did,
111 limit + 1
112 )
113 .fetch_all(&state.db)
114 .await;
115 match result {
116 Ok(rows) => {
117 let has_more = rows.len() as i64 > limit;
118 let mut repos: Vec<RepoInfo> = Vec::new();
119 for row in rows.iter().take(limit as usize) {
120 let rev = match get_rev_from_commit(&state, &row.repo_root_cid).await {
121 Some(r) => r,
122 None => {
123 if let Some(ref stored_rev) = row.repo_rev {
124 stored_rev.clone()
125 } else {
126 tracing::warn!(
127 "Failed to parse commit for DID {} in list_repos: CID {}",
128 row.did,
129 row.repo_root_cid
130 );
131 continue;
132 }
133 }
134 };
135 let status = if row.takedown_ref.is_some() {
136 AccountStatus::Takendown
137 } else if row.deactivated_at.is_some() {
138 AccountStatus::Deactivated
139 } else {
140 AccountStatus::Active
141 };
142 repos.push(RepoInfo {
143 did: row.did.clone(),
144 head: row.repo_root_cid.clone(),
145 rev,
146 active: status.is_active(),
147 status: status.as_str().map(String::from),
148 });
149 }
150 let next_cursor = if has_more {
151 repos.last().map(|r| r.did.clone())
152 } else {
153 None
154 };
155 (
156 StatusCode::OK,
157 Json(ListReposOutput {
158 cursor: next_cursor,
159 repos,
160 }),
161 )
162 .into_response()
163 }
164 Err(e) => {
165 error!("DB error in list_repos: {:?}", e);
166 ApiError::InternalError(Some("Database error".into())).into_response()
167 }
168 }
169}
170
171#[derive(Deserialize)]
172pub struct GetRepoStatusParams {
173 pub did: String,
174}
175
176#[derive(Serialize)]
177pub struct GetRepoStatusOutput {
178 pub did: String,
179 pub active: bool,
180 #[serde(skip_serializing_if = "Option::is_none")]
181 pub status: Option<String>,
182 #[serde(skip_serializing_if = "Option::is_none")]
183 pub rev: Option<String>,
184}
185
186pub async fn get_repo_status(
187 State(state): State<AppState>,
188 Query(params): Query<GetRepoStatusParams>,
189) -> Response {
190 let did = params.did.trim();
191 if did.is_empty() {
192 return ApiError::InvalidRequest("did is required".into()).into_response();
193 }
194
195 let account = match get_account_with_status(&state.db, did).await {
196 Ok(Some(a)) => a,
197 Ok(None) => {
198 return ApiError::RepoNotFound(Some(format!("Could not find repo for DID: {}", did)))
199 .into_response();
200 }
201 Err(e) => {
202 error!("DB error in get_repo_status: {:?}", e);
203 return ApiError::InternalError(Some("Database error".into())).into_response();
204 }
205 };
206
207 let rev = if account.status.is_active() {
208 if let Some(ref cid) = account.repo_root_cid {
209 get_rev_from_commit(&state, cid).await
210 } else {
211 None
212 }
213 } else {
214 None
215 };
216
217 (
218 StatusCode::OK,
219 Json(GetRepoStatusOutput {
220 did: account.did,
221 active: account.status.is_active(),
222 status: account.status.as_str().map(String::from),
223 rev,
224 }),
225 )
226 .into_response()
227}