Our Personal Data Server from scratch! tranquil.farm
atproto pds rust postgresql fun oauth

refactor(api): update repo batch/delete to use repo_ops, clean up remaining repo endpoints #80

merged opened by oyster.cafe targeting main from refactor/api
Labels

None yet.

Assignees

None yet.

Participants 1
AT URI
at://did:plc:3fwecdnvtcscjnrx2p4n7alz/sh.tangled.repo.pull/3mhi3qdcvt622
+187 -480
Diff #0
+6 -14
crates/tranquil-api/src/repo/blob.rs
··· 2 2 use axum::{ 3 3 Json, 4 4 extract::{Query, State}, 5 - http::StatusCode, 6 5 response::{IntoResponse, Response}, 7 6 }; 8 7 use bytes::Bytes; ··· 58 57 } 59 58 let mime_type_for_check = get_header_str(&headers, http::header::CONTENT_TYPE) 60 59 .unwrap_or("application/octet-stream"); 61 - let scope_proof = match user.verify_blob_upload(mime_type_for_check) { 62 - Ok(proof) => proof, 63 - Err(e) => return Ok(e.into_response()), 64 - }; 60 + let scope_proof = user.verify_blob_upload(mime_type_for_check)?; 65 61 ( 66 62 scope_proof.principal_did().into_did(), 67 63 scope_proof.controller_did().map(|c| c.into_did()), ··· 237 233 State(state): State<AppState>, 238 234 auth: Auth<NotTakendown>, 239 235 Query(params): Query<ListMissingBlobsParams>, 240 - ) -> Result<Response, ApiError> { 236 + ) -> Result<Json<ListMissingBlobsOutput>, ApiError> { 241 237 let did = &auth.did; 242 238 let user = state 243 239 .user_repo ··· 269 265 } else { 270 266 None 271 267 }; 272 - Ok(( 273 - StatusCode::OK, 274 - Json(ListMissingBlobsOutput { 275 - cursor: next_cursor, 276 - blobs, 277 - }), 278 - ) 279 - .into_response()) 268 + Ok(Json(ListMissingBlobsOutput { 269 + cursor: next_cursor, 270 + blobs, 271 + })) 280 272 }
+3 -7
crates/tranquil-api/src/repo/import.rs
··· 1 - use axum::{ 2 - body::Bytes, 3 - extract::State, 4 - response::{IntoResponse, Response}, 5 - }; 1 + use axum::{Json, body::Bytes, extract::State}; 6 2 use jacquard_common::types::{integer::LimitedU32, string::Tid}; 7 3 use jacquard_repo::storage::BlockStore; 8 4 use k256::ecdsa::SigningKey; ··· 22 18 State(state): State<AppState>, 23 19 auth: Auth<NotTakendown>, 24 20 body: Bytes, 25 - ) -> Result<Response, ApiError> { 21 + ) -> Result<Json<EmptyResponse>, ApiError> { 26 22 let accepting_imports = tranquil_config::get().import.accepting; 27 23 if !accepting_imports { 28 24 return Err(ApiError::InvalidRequest( ··· 340 336 ); 341 337 } 342 338 } 343 - Ok(EmptyResponse::ok().into_response()) 339 + Ok(Json(EmptyResponse {})) 344 340 } 345 341 Err(ImportError::SizeLimitExceeded) => Err(ApiError::PayloadTooLarge(format!( 346 342 "Import exceeds block limit of {}",
+9 -44
crates/tranquil-api/src/repo/meta.rs
··· 1 + use crate::common; 1 2 use axum::{ 2 3 Json, 3 4 extract::{Query, State}, ··· 5 6 }; 6 7 use serde::Deserialize; 7 8 use serde_json::json; 8 - use tranquil_pds::api::error::ApiError; 9 9 use tranquil_pds::state::AppState; 10 10 use tranquil_pds::types::AtIdentifier; 11 11 ··· 18 18 State(state): State<AppState>, 19 19 Query(input): Query<DescribeRepoInput>, 20 20 ) -> Response { 21 - let hostname_for_handles = tranquil_config::get().server.hostname_without_port(); 22 - let user_row = if input.repo.is_did() { 23 - let did: tranquil_pds::types::Did = match input.repo.as_str().parse() { 24 - Ok(d) => d, 25 - Err(_) => return ApiError::InvalidRequest("Invalid DID format".into()).into_response(), 26 - }; 27 - state 28 - .user_repo 29 - .get_by_did(&did) 30 - .await 31 - .map(|opt| opt.map(|r| (r.id, r.handle, r.did))) 32 - } else { 33 - let repo_str = input.repo.as_str(); 34 - let handle_str = if !repo_str.contains('.') { 35 - format!("{}.{}", repo_str, hostname_for_handles) 36 - } else { 37 - repo_str.to_string() 38 - }; 39 - let handle: tranquil_pds::types::Handle = match handle_str.parse() { 40 - Ok(h) => h, 41 - Err(_) => { 42 - return ApiError::InvalidRequest("Invalid handle format".into()).into_response(); 43 - } 44 - }; 45 - state 46 - .user_repo 47 - .get_by_handle(&handle) 48 - .await 49 - .map(|opt| opt.map(|r| (r.id, r.handle, r.did))) 50 - }; 51 - let (user_id, handle, did) = match user_row { 52 - Ok(Some((id, handle, did))) => (id, handle, did), 53 - Ok(None) => { 54 - return ApiError::RepoNotFound(Some("Repo not found".into())).into_response(); 55 - } 56 - Err(_) => { 57 - return ApiError::InternalError(None).into_response(); 58 - } 21 + let resolved = match common::resolve_repo(state.user_repo.as_ref(), &input.repo).await { 22 + Ok(r) => r, 23 + Err(e) => return e.into_response(), 59 24 }; 60 25 let collections = state 61 26 .repo_repo 62 - .list_collections(user_id) 27 + .list_collections(resolved.user_id) 63 28 .await 64 29 .unwrap_or_default(); 
65 30 let did_doc = json!({ 66 - "id": did, 67 - "alsoKnownAs": [format!("at://{}", handle)] 31 + "id": resolved.did, 32 + "alsoKnownAs": [format!("at://{}", resolved.handle)] 68 33 }); 69 34 Json(json!({ 70 - "handle": handle, 71 - "did": did, 35 + "handle": resolved.handle, 36 + "did": resolved.did, 72 37 "didDoc": did_doc, 73 38 "collections": collections, 74 39 "handleIsCorrect": true
+93 -215
crates/tranquil-api/src/repo/record/batch.rs
··· 1 1 use super::validation::validate_record_with_status; 2 2 use super::validation_mode::{ValidationMode, deserialize_validation_mode}; 3 - use crate::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids}; 4 - use axum::{ 5 - Json, 6 - extract::State, 7 - http::StatusCode, 8 - response::{IntoResponse, Response}, 9 - }; 10 - use cid::Cid; 11 - use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 3 + use crate::repo::record::write::CommitInfo; 4 + use axum::{Json, extract::State}; 5 + use jacquard_repo::{mst::Mst, storage::BlockStore}; 12 6 use serde::{Deserialize, Serialize}; 13 7 use serde_json::json; 14 - use std::str::FromStr; 15 - use std::sync::Arc; 16 8 use tracing::info; 17 - use tranquil_pds::api::error::ApiError; 9 + use tranquil_pds::api::error::{ApiError, DbResultExt}; 18 10 use tranquil_pds::auth::{ 19 11 Active, Auth, WriteOpKind, require_not_migrated, require_verified_or_delegated, 20 12 verify_batch_write_scopes, 21 13 }; 22 - use tranquil_pds::cid_types::CommitCid; 23 - use tranquil_pds::delegation::DelegationActionType; 24 14 use tranquil_pds::repo::tracking::TrackingBlockStore; 15 + use tranquil_pds::repo_ops::{ 16 + FinalizeParams, RecordOp, begin_repo_write, extract_blob_cids, finalize_repo_write, 17 + }; 25 18 use tranquil_pds::state::AppState; 26 19 use tranquil_pds::types::{AtIdentifier, AtUri, Did, Nsid, Rkey}; 27 20 use tranquil_pds::validation::ValidationStatus; ··· 42 35 did: &Did, 43 36 validate: ValidationMode, 44 37 tracking_store: &TrackingBlockStore, 45 - ) -> Result<WriteAccumulator, Response> { 38 + ) -> Result<WriteAccumulator, ApiError> { 46 39 let WriteAccumulator { 47 40 mst, 48 41 mut results, ··· 60 53 let validation_status = if validate.should_skip() { 61 54 None 62 55 } else { 63 - match validate_record_with_status( 64 - value, 65 - collection, 66 - rkey.as_ref(), 67 - validate.requires_lexicon(), 56 + Some( 57 + validate_record_with_status( 58 + value, 59 + collection, 60 + 
rkey.as_ref(), 61 + validate.requires_lexicon(), 62 + ) 63 + .await?, 68 64 ) 69 - .await 70 - { 71 - Ok(status) => Some(status), 72 - Err(err_response) => return Err(*err_response), 73 - } 74 65 }; 75 66 all_blob_cids.extend(extract_blob_cids(value)); 76 67 let rkey = rkey.clone().unwrap_or_else(Rkey::generate); 77 68 let record_ipld = tranquil_pds::util::json_to_ipld(value); 78 - let record_bytes = serde_ipld_dagcbor::to_vec(&record_ipld).map_err(|_| { 79 - ApiError::InvalidRecord("Failed to serialize record".into()).into_response() 80 - })?; 81 - let record_cid = tracking_store.put(&record_bytes).await.map_err(|_| { 82 - ApiError::InternalError(Some("Failed to store record".into())).into_response() 83 - })?; 69 + let record_bytes = serde_ipld_dagcbor::to_vec(&record_ipld) 70 + .map_err(|_| ApiError::InvalidRecord("Failed to serialize record".into()))?; 71 + let record_cid = tracking_store 72 + .put(&record_bytes) 73 + .await 74 + .map_err(|_| ApiError::InternalError(Some("Failed to store record".into())))?; 84 75 let key = format!("{}/{}", collection, rkey); 85 76 modified_keys.push(key.clone()); 86 - let new_mst = mst.add(&key, record_cid).await.map_err(|_| { 87 - ApiError::InternalError(Some("Failed to add to MST".into())).into_response() 88 - })?; 77 + let new_mst = mst 78 + .add(&key, record_cid) 79 + .await 80 + .map_err(|_| ApiError::InternalError(Some("Failed to add to MST".into())))?; 89 81 let uri = AtUri::from_parts(did, collection, &rkey); 90 82 results.push(WriteResult::CreateResult { 91 83 uri, ··· 113 105 let validation_status = if validate.should_skip() { 114 106 None 115 107 } else { 116 - match validate_record_with_status( 117 - value, 118 - collection, 119 - Some(rkey), 120 - validate.requires_lexicon(), 108 + Some( 109 + validate_record_with_status( 110 + value, 111 + collection, 112 + Some(rkey), 113 + validate.requires_lexicon(), 114 + ) 115 + .await?, 121 116 ) 122 - .await 123 - { 124 - Ok(status) => Some(status), 125 - Err(err_response) 
=> return Err(*err_response), 126 - } 127 117 }; 128 118 all_blob_cids.extend(extract_blob_cids(value)); 129 119 let record_ipld = tranquil_pds::util::json_to_ipld(value); 130 - let record_bytes = serde_ipld_dagcbor::to_vec(&record_ipld).map_err(|_| { 131 - ApiError::InvalidRecord("Failed to serialize record".into()).into_response() 132 - })?; 133 - let record_cid = tracking_store.put(&record_bytes).await.map_err(|_| { 134 - ApiError::InternalError(Some("Failed to store record".into())).into_response() 135 - })?; 120 + let record_bytes = serde_ipld_dagcbor::to_vec(&record_ipld) 121 + .map_err(|_| ApiError::InvalidRecord("Failed to serialize record".into()))?; 122 + let record_cid = tracking_store 123 + .put(&record_bytes) 124 + .await 125 + .map_err(|_| ApiError::InternalError(Some("Failed to store record".into())))?; 136 126 let key = format!("{}/{}", collection, rkey); 137 127 modified_keys.push(key.clone()); 138 128 let prev_record_cid = mst.get(&key).await.ok().flatten(); 139 - let new_mst = mst.update(&key, record_cid).await.map_err(|_| { 140 - ApiError::InternalError(Some("Failed to update MST".into())).into_response() 141 - })?; 129 + let new_mst = mst 130 + .update(&key, record_cid) 131 + .await 132 + .map_err(|_| ApiError::InternalError(Some("Failed to update MST".into())))?; 142 133 let uri = AtUri::from_parts(did, collection, rkey); 143 134 results.push(WriteResult::UpdateResult { 144 135 uri, ··· 163 154 let key = format!("{}/{}", collection, rkey); 164 155 modified_keys.push(key.clone()); 165 156 let prev_record_cid = mst.get(&key).await.ok().flatten(); 166 - let new_mst = mst.delete(&key).await.map_err(|_| { 167 - ApiError::InternalError(Some("Failed to delete from MST".into())).into_response() 168 - })?; 157 + let new_mst = mst 158 + .delete(&key) 159 + .await 160 + .map_err(|_| ApiError::InternalError(Some("Failed to delete from MST".into())))?; 169 161 results.push(WriteResult::DeleteResult {}); 170 162 ops.push(RecordOp::Delete { 171 163 
collection: collection.clone(), ··· 189 181 did: &Did, 190 182 validate: ValidationMode, 191 183 tracking_store: &TrackingBlockStore, 192 - ) -> Result<WriteAccumulator, Response> { 184 + ) -> Result<WriteAccumulator, ApiError> { 193 185 use futures::stream::{self, TryStreamExt}; 194 186 let initial_acc = WriteAccumulator { 195 187 mst: initial_mst, ··· 198 190 modified_keys: Vec::new(), 199 191 all_blob_cids: Vec::new(), 200 192 }; 201 - stream::iter(writes.iter().map(Ok::<_, Response>)) 193 + stream::iter(writes.iter().map(Ok::<_, ApiError>)) 202 194 .try_fold(initial_acc, |acc, write| async move { 203 195 process_single_write(write, acc, did, validate, tracking_store).await 204 196 }) ··· 261 253 pub results: Vec<WriteResult>, 262 254 } 263 255 264 - #[derive(Serialize)] 265 - pub struct CommitInfo { 266 - pub cid: String, 267 - pub rev: String, 268 - } 269 - 270 256 pub async fn apply_writes( 271 257 State(state): State<AppState>, 272 258 auth: Auth<Active>, 273 259 Json(input): Json<ApplyWritesInput>, 274 - ) -> Result<Response, ApiError> { 260 + ) -> Result<Json<ApplyWritesOutput>, ApiError> { 275 261 info!( 276 262 "apply_writes called: repo={}, writes={}", 277 263 input.repo, ··· 288 274 ))); 289 275 } 290 276 291 - let batch_proof = match verify_batch_write_scopes( 277 + let batch_proof = verify_batch_write_scopes( 292 278 &auth, 293 279 &auth, 294 280 &input.writes, ··· 302 288 WriteOp::Update { .. } => WriteOpKind::Update, 303 289 WriteOp::Delete { .. 
} => WriteOpKind::Delete, 304 290 }, 305 - ) { 306 - Ok(proof) => proof, 307 - Err(e) => return Ok(e.into_response()), 308 - }; 291 + )?; 309 292 310 293 let principal_did = batch_proof.principal_did(); 311 294 let controller_did = batch_proof.controller_did().map(|c| c.into_did()); ··· 317 300 } 318 301 319 302 let did = principal_did.into_did(); 320 - if let Err(e) = require_not_migrated(&state, &did).await { 321 - return Ok(e); 322 - } 323 - if let Err(e) = require_verified_or_delegated(&state, batch_proof.user()).await { 324 - return Ok(e); 325 - } 303 + require_not_migrated(&state, &did).await?; 304 + require_verified_or_delegated(&state, batch_proof.user()).await?; 326 305 327 306 let user_id: uuid::Uuid = state 328 307 .user_repo 329 308 .get_id_by_did(&did) 330 309 .await 331 - .ok() 332 - .flatten() 333 - .ok_or_else(|| ApiError::InternalError(Some("User not found".into())))?; 310 + .log_db_err("fetching user for batch write")? 311 + .ok_or(ApiError::InternalError(Some("User not found".into())))?; 334 312 335 - let _write_lock = state.repo_write_locks.lock(user_id).await; 313 + let (ctx, mst) = begin_repo_write(&state, user_id, input.swap_commit.as_deref()).await?; 336 314 337 - let root_cid_str = state 338 - .repo_repo 339 - .get_repo_root_cid_by_user_id(user_id) 340 - .await 341 - .ok() 342 - .flatten() 343 - .ok_or_else(|| ApiError::InternalError(Some("Repo root not found".into())))?; 344 - let current_root_cid = CommitCid::from_str(&root_cid_str) 345 - .map_err(|_| ApiError::InternalError(Some("Invalid repo root CID".into())))?; 346 - if let Some(swap_commit) = &input.swap_commit 347 - && CommitCid::from_str(swap_commit).ok().as_ref() != Some(&current_root_cid) 348 - { 349 - return Err(ApiError::InvalidSwap(Some("Repo has been modified".into()))); 350 - } 351 - let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 352 - let commit_bytes = tracking_store 353 - .get(current_root_cid.as_cid()) 354 - .await 355 - .ok() 356 - .flatten() 
357 - .ok_or_else(|| ApiError::InternalError(Some("Commit block not found".into())))?; 358 - let commit = Commit::from_cbor(&commit_bytes) 359 - .map_err(|_| ApiError::InternalError(Some("Failed to parse commit".into())))?; 360 - let original_mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 361 - let initial_mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 362 315 let WriteAccumulator { 363 - mst, 316 + mst: final_mst, 364 317 results, 365 318 ops, 366 319 modified_keys, 367 320 all_blob_cids, 368 - } = match process_writes( 321 + } = process_writes( 369 322 &input.writes, 370 - initial_mst, 323 + mst, 371 324 &did, 372 325 input.validate, 373 - &tracking_store, 326 + &ctx.tracking_store, 374 327 ) 375 - .await 376 - { 377 - Ok(acc) => acc, 378 - Err(response) => return Ok(response), 379 - }; 380 - let new_mst_root = mst 381 - .persist() 382 - .await 383 - .map_err(|_| ApiError::InternalError(Some("Failed to persist MST".into())))?; 384 - let (new_mst_blocks, old_mst_blocks) = { 385 - let mut new_blocks = std::collections::BTreeMap::new(); 386 - let mut old_blocks = std::collections::BTreeMap::new(); 387 - for key in &modified_keys { 388 - mst.blocks_for_path(key, &mut new_blocks) 389 - .await 390 - .map_err(|_| { 391 - ApiError::InternalError(Some("Failed to get new MST blocks for path".into())) 392 - })?; 393 - original_mst 394 - .blocks_for_path(key, &mut old_blocks) 395 - .await 396 - .map_err(|_| { 397 - ApiError::InternalError(Some("Failed to get old MST blocks for path".into())) 398 - })?; 399 - } 400 - (new_blocks, old_blocks) 401 - }; 402 - let mut relevant_blocks = new_mst_blocks.clone(); 403 - relevant_blocks.extend(old_mst_blocks.iter().map(|(k, v)| (*k, v.clone()))); 404 - let written_cids: Vec<Cid> = tracking_store 405 - .get_all_relevant_cids() 406 - .into_iter() 407 - .chain(relevant_blocks.keys().copied()) 408 - .collect::<std::collections::HashSet<_>>() 409 - .into_iter() 410 - .collect(); 411 - let 
written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect(); 412 - let prev_record_cids = ops.iter().filter_map(|op| match op { 413 - RecordOp::Update { 414 - prev: Some(cid), .. 415 - } 416 - | RecordOp::Delete { 417 - prev: Some(cid), .. 418 - } => Some(*cid), 419 - _ => None, 420 - }); 421 - let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid.into_cid()) 422 - .chain( 423 - old_mst_blocks 424 - .keys() 425 - .filter(|cid| !new_mst_blocks.contains_key(*cid)) 426 - .copied(), 427 - ) 428 - .chain(prev_record_cids) 429 - .collect::<std::collections::HashSet<_>>() 430 - .into_iter() 431 - .collect(); 432 - let commit_res = match commit_and_log( 433 - &state, 434 - CommitParams { 435 - did: &did, 436 - user_id, 437 - current_root_cid: Some(current_root_cid.into_cid()), 438 - prev_data_cid: Some(commit.data), 439 - new_mst_root, 440 - ops, 441 - blocks_cids: &written_cids_str, 442 - blobs: &all_blob_cids, 443 - obsolete_cids, 444 - }, 445 - ) 446 - .await 447 - { 448 - Ok(res) => res, 449 - Err(e) => return Err(ApiError::from(e)), 450 - }; 328 + .await?; 451 329 452 - if let Some(ref controller) = controller_did { 453 - let write_summary: Vec<serde_json::Value> = input 330 + let write_summary: Option<serde_json::Value> = controller_did.as_ref().map(|_| { 331 + let writes: Vec<serde_json::Value> = input 454 332 .writes 455 333 .iter() 456 334 .map(|w| match w { ··· 475 353 }), 476 354 }) 477 355 .collect(); 356 + json!({ 357 + "action": "apply_writes", 358 + "count": input.writes.len(), 359 + "writes": writes 360 + }) 361 + }); 478 362 479 - let _ = state 480 - .delegation_repo 481 - .log_delegation_action( 482 - &did, 483 - controller, 484 - Some(controller), 485 - DelegationActionType::RepoWrite, 486 - Some(json!({ 487 - "action": "apply_writes", 488 - "count": input.writes.len(), 489 - "writes": write_summary 490 - })), 491 - None, 492 - None, 493 - ) 494 - .await; 495 - } 496 - 497 - Ok(( 498 - StatusCode::OK, 499 - 
Json(ApplyWritesOutput { 500 - commit: CommitInfo { 501 - cid: commit_res.commit_cid.to_string(), 502 - rev: commit_res.rev, 503 - }, 504 - results, 505 - }), 363 + let commit_result = finalize_repo_write( 364 + &state, 365 + ctx, 366 + final_mst, 367 + FinalizeParams { 368 + did: &did, 369 + user_id, 370 + controller_did: controller_did.as_ref(), 371 + delegation_detail: write_summary, 372 + ops, 373 + modified_keys: &modified_keys, 374 + blob_cids: &all_blob_cids, 375 + }, 506 376 ) 507 - .into_response()) 377 + .await?; 378 + 379 + Ok(Json(ApplyWritesOutput { 380 + commit: CommitInfo { 381 + cid: commit_result.commit_cid.to_string(), 382 + rev: commit_result.rev, 383 + }, 384 + results, 385 + })) 508 386 }
+58 -170
crates/tranquil-api/src/repo/record/delete.rs
··· 1 - use crate::repo::record::utils::{ 2 - CommitError, CommitParams, RecordOp, commit_and_log, get_current_root_cid, 3 - }; 4 1 use crate::repo::record::write::{CommitInfo, prepare_repo_write}; 5 - use axum::{ 6 - Json, 7 - extract::State, 8 - http::StatusCode, 9 - response::{IntoResponse, Response}, 10 - }; 2 + use axum::{Json, extract::State}; 11 3 use cid::Cid; 12 4 use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore}; 13 5 use serde::{Deserialize, Serialize}; ··· 17 9 use tracing::error; 18 10 use tranquil_pds::api::error::ApiError; 19 11 use tranquil_pds::auth::{Active, Auth, VerifyScope}; 20 - use tranquil_pds::cid_types::CommitCid; 21 - use tranquil_pds::delegation::DelegationActionType; 22 12 use tranquil_pds::repo::tracking::TrackingBlockStore; 13 + use tranquil_pds::repo_ops::{ 14 + CommitError, FinalizeParams, RecordOp, begin_repo_write, finalize_repo_write, 15 + }; 23 16 use tranquil_pds::state::AppState; 24 - use tranquil_pds::types::{AtIdentifier, AtUri, Nsid, Rkey}; 17 + use tranquil_pds::types::{AtIdentifier, AtUri, Did, Nsid, Rkey}; 25 18 26 19 #[derive(Deserialize)] 27 20 pub struct DeleteRecordInput { ··· 45 38 State(state): State<AppState>, 46 39 auth: Auth<Active>, 47 40 Json(input): Json<DeleteRecordInput>, 48 - ) -> Result<Response, tranquil_pds::api::error::ApiError> { 49 - let scope_proof = match auth.verify_repo_delete(&input.collection) { 50 - Ok(proof) => proof, 51 - Err(e) => return Ok(e.into_response()), 52 - }; 53 - 54 - let repo_auth = match prepare_repo_write(&state, &scope_proof, &input.repo).await { 55 - Ok(res) => res, 56 - Err(err_res) => return Ok(err_res), 57 - }; 58 - 41 + ) -> Result<Json<DeleteRecordOutput>, ApiError> { 42 + let scope_proof = auth.verify_repo_delete(&input.collection)?; 43 + let repo_auth = prepare_repo_write(&state, &scope_proof, &input.repo).await?; 59 44 let did = repo_auth.did; 60 45 let user_id = repo_auth.user_id; 61 46 let controller_did = repo_auth.controller_did; 62 47 63 - let 
_write_lock = state.repo_write_locks.lock(user_id).await; 64 - let current_root_cid = get_current_root_cid(&state, user_id).await?; 48 + let (ctx, mst) = begin_repo_write(&state, user_id, input.swap_commit.as_deref()).await?; 65 49 66 - if let Some(swap_commit) = &input.swap_commit 67 - && CommitCid::from_str(swap_commit).ok().as_ref() != Some(&current_root_cid) 68 - { 69 - return Ok(ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response()); 70 - } 71 - let tracking_store = TrackingBlockStore::new(state.block_store.clone()); 72 - let commit_bytes = match tracking_store.get(current_root_cid.as_cid()).await { 73 - Ok(Some(b)) => b, 74 - _ => { 75 - return Ok( 76 - ApiError::InternalError(Some("Commit block not found".into())).into_response(), 77 - ); 78 - } 79 - }; 80 - let commit = match Commit::from_cbor(&commit_bytes) { 81 - Ok(c) => c, 82 - _ => { 83 - return Ok( 84 - ApiError::InternalError(Some("Failed to parse commit".into())).into_response(), 85 - ); 86 - } 87 - }; 88 - let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None); 89 50 let key = format!("{}/{}", input.collection, input.rkey); 51 + 90 52 if let Some(swap_record_str) = &input.swap_record { 91 53 let expected_cid = Cid::from_str(swap_record_str).ok(); 92 54 let actual_cid = mst.get(&key).await.ok().flatten(); 93 55 if expected_cid != actual_cid { 94 - return Ok(ApiError::InvalidSwap(Some( 56 + return Err(ApiError::InvalidSwap(Some( 95 57 "Record has been modified or does not exist".into(), 96 - )) 97 - .into_response()); 58 + ))); 98 59 } 99 60 } 61 + 100 62 let prev_record_cid = mst.get(&key).await.ok().flatten(); 101 63 if prev_record_cid.is_none() { 102 - return Ok((StatusCode::OK, Json(DeleteRecordOutput { commit: None })).into_response()); 64 + return Ok(Json(DeleteRecordOutput { commit: None })); 103 65 } 104 - let new_mst = match mst.delete(&key).await { 105 - Ok(m) => m, 106 - Err(e) => { 107 - error!("Failed to delete from MST: {:?}", e); 108 - return 
Ok(ApiError::InternalError(Some(format!( 109 - "Failed to delete from MST: {:?}", 110 - e 111 - ))) 112 - .into_response()); 113 - } 114 - }; 115 - let new_mst_root = match new_mst.persist().await { 116 - Ok(c) => c, 117 - Err(e) => { 118 - error!("Failed to persist MST: {:?}", e); 119 - return Ok( 120 - ApiError::InternalError(Some("Failed to persist MST".into())).into_response(), 121 - ); 122 - } 123 - }; 124 - let collection_for_audit = input.collection.to_string(); 125 - let rkey_for_audit = input.rkey.to_string(); 66 + 67 + let new_mst = mst.delete(&key).await.map_err(|e| { 68 + error!("Failed to delete from MST: {:?}", e); 69 + ApiError::InternalError(Some("Failed to delete from MST".into())) 70 + })?; 71 + 126 72 let op = RecordOp::Delete { 127 73 collection: input.collection.clone(), 128 74 rkey: input.rkey.clone(), 129 75 prev: prev_record_cid, 130 76 }; 131 - let mut new_mst_blocks = std::collections::BTreeMap::new(); 132 - let mut old_mst_blocks = std::collections::BTreeMap::new(); 133 - if new_mst 134 - .blocks_for_path(&key, &mut new_mst_blocks) 135 - .await 136 - .is_err() 137 - { 138 - return Ok( 139 - ApiError::InternalError(Some("Failed to get new MST blocks for path".into())) 140 - .into_response(), 141 - ); 142 - } 143 - if mst 144 - .blocks_for_path(&key, &mut old_mst_blocks) 145 - .await 146 - .is_err() 147 - { 148 - return Ok( 149 - ApiError::InternalError(Some("Failed to get old MST blocks for path".into())) 150 - .into_response(), 151 - ); 152 - } 153 - let mut relevant_blocks = new_mst_blocks.clone(); 154 - relevant_blocks.extend(old_mst_blocks.iter().map(|(k, v)| (*k, v.clone()))); 155 - let written_cids: Vec<Cid> = tracking_store 156 - .get_all_relevant_cids() 157 - .into_iter() 158 - .chain(relevant_blocks.keys().copied()) 159 - .collect::<std::collections::HashSet<_>>() 160 - .into_iter() 161 - .collect(); 162 - let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect(); 163 - let obsolete_cids: Vec<Cid> = 
std::iter::once(current_root_cid.into_cid()) 164 - .chain( 165 - old_mst_blocks 166 - .keys() 167 - .filter(|cid| !new_mst_blocks.contains_key(*cid)) 168 - .copied(), 169 - ) 170 - .chain(prev_record_cid) 171 - .collect(); 172 - let commit_result = match commit_and_log( 77 + 78 + let modified_keys = [key]; 79 + 80 + let commit_result = finalize_repo_write( 173 81 &state, 174 - CommitParams { 82 + ctx, 83 + new_mst, 84 + FinalizeParams { 175 85 did: &did, 176 86 user_id, 177 - current_root_cid: Some(current_root_cid.into_cid()), 178 - prev_data_cid: Some(commit.data), 179 - new_mst_root, 87 + controller_did: controller_did.as_ref(), 88 + delegation_detail: controller_did.as_ref().map(|_| { 89 + json!({ 90 + "action": "delete", 91 + "collection": input.collection, 92 + "rkey": input.rkey 93 + }) 94 + }), 180 95 ops: vec![op], 181 - blocks_cids: &written_cids_str, 182 - blobs: &[], 183 - obsolete_cids, 96 + modified_keys: &modified_keys, 97 + blob_cids: &[], 184 98 }, 185 99 ) 186 - .await 187 - { 188 - Ok(res) => res, 189 - Err(e) => return Ok(ApiError::from(e).into_response()), 190 - }; 191 - 192 - if let Some(ref controller) = controller_did { 193 - let _ = state 194 - .delegation_repo 195 - .log_delegation_action( 196 - &did, 197 - controller, 198 - Some(controller), 199 - DelegationActionType::RepoWrite, 200 - Some(json!({ 201 - "action": "delete", 202 - "collection": collection_for_audit, 203 - "rkey": rkey_for_audit 204 - })), 205 - None, 206 - None, 207 - ) 208 - .await; 209 - } 100 + .await?; 210 101 211 102 let deleted_uri = AtUri::from_parts(&did, &input.collection, &input.rkey); 212 103 if let Err(e) = state ··· 217 108 error!("Failed to remove backlinks for {}: {}", deleted_uri, e); 218 109 } 219 110 220 - Ok(( 221 - StatusCode::OK, 222 - Json(DeleteRecordOutput { 223 - commit: Some(CommitInfo { 224 - cid: commit_result.commit_cid.to_string(), 225 - rev: commit_result.rev, 226 - }), 111 + Ok(Json(DeleteRecordOutput { 112 + commit: Some(CommitInfo { 113 + 
cid: commit_result.commit_cid.to_string(), 114 + rev: commit_result.rev, 227 115 }), 228 - ) 229 - .into_response()) 116 + })) 230 117 } 231 118 232 - use tranquil_pds::types::Did; 233 119 use uuid::Uuid; 234 120 235 121 pub async fn delete_record_internal( ··· 239 125 collection: &Nsid, 240 126 rkey: &Rkey, 241 127 ) -> Result<(), CommitError> { 128 + use tranquil_pds::repo_ops::{CommitParams, RecordOp, commit_and_log}; 129 + 242 130 let _write_lock = state.repo_write_locks.lock(user_id).await; 243 131 244 132 let root_cid_str = state ··· 303 191 .await 304 192 .map_err(|e| CommitError::MstOperationFailed(format!("{:?}", e)))?; 305 193 306 - let mut relevant_blocks = new_mst_blocks.clone(); 307 - relevant_blocks.extend(old_mst_blocks.iter().map(|(k, v)| (*k, v.clone()))); 308 - 309 - let written_cids: Vec<Cid> = tracking_store 310 - .get_all_relevant_cids() 311 - .into_iter() 312 - .chain(relevant_blocks.keys().copied()) 313 - .collect::<std::collections::HashSet<_>>() 314 - .into_iter() 315 - .collect(); 316 - 317 - let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect(); 318 - 319 194 let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid) 320 195 .chain( 321 196 old_mst_blocks ··· 326 201 .chain(std::iter::once(prev_cid)) 327 202 .collect(); 328 203 204 + let mut relevant_blocks = new_mst_blocks; 205 + relevant_blocks.extend(old_mst_blocks); 206 + 207 + let written_cids: Vec<Cid> = tracking_store 208 + .get_all_relevant_cids() 209 + .into_iter() 210 + .chain(relevant_blocks.keys().copied()) 211 + .collect::<std::collections::HashSet<_>>() 212 + .into_iter() 213 + .collect(); 214 + 215 + let written_cids_str: Vec<String> = written_cids.iter().map(ToString::to_string).collect(); 216 + 329 217 commit_and_log( 330 218 state, 331 219 CommitParams {
+8 -7
crates/tranquil-api/src/server/mod.rs
··· 23 23 }; 24 24 pub use app_password::{create_app_password, list_app_passwords, revoke_app_password}; 25 25 pub use email::{ 26 - authorize_email_update, check_channel_verified, check_email_in_use, 27 - check_email_update_status, check_email_verified, confirm_email, request_email_update, 28 - update_email, 26 + authorize_email_update, check_channel_verified, check_email_in_use, check_email_update_status, 27 + check_email_verified, confirm_email, request_email_update, update_email, 29 28 }; 30 29 pub use invite::{create_invite_code, create_invite_codes, get_account_invite_codes}; 31 30 pub use logo::get_logo; ··· 44 43 set_password, 45 44 }; 46 45 pub use reauth::{ 47 - check_legacy_session_mfa, check_reauth_required, get_reauth_status, 48 - legacy_mfa_required_response, reauth_passkey_finish, reauth_passkey_start, reauth_password, 49 - reauth_required_response, reauth_totp, update_mfa_verified, 46 + check_legacy_session_mfa, check_reauth_required, get_reauth_status, reauth_passkey_finish, 47 + reauth_passkey_start, reauth_password, reauth_totp, update_mfa_verified, 50 48 }; 51 49 pub use service_auth::get_service_auth; 52 50 pub use session::{ ··· 64 62 trust_device, update_trusted_device, 65 63 }; 66 64 pub use verify_email::{resend_migration_verification, verify_migration_email}; 67 - pub use verify_token::{VerifyTokenInput, VerifyTokenOutput, verify_token, verify_token_internal}; 65 + pub use verify_token::{ 66 + VerifyTokenInput, VerifyTokenOutput, confirm_channel_verification, verify_token, 67 + verify_token_internal, 68 + };
+2 -2
crates/tranquil-api/src/server/service_auth.rs
··· 112 112 }; 113 113 114 114 let lxm = params.lxm.as_ref(); 115 - let lxm_for_token = lxm.map_or("*", |n| n.as_str()); 115 + let lxm_for_token = lxm.map_or("*", |v| v.as_str()); 116 116 117 117 if let Some(method) = lxm { 118 118 if let Err(e) = tranquil_pds::auth::scope_check::check_rpc_scope( ··· 121 121 params.aud.as_str(), 122 122 method.as_str(), 123 123 ) { 124 - return e; 124 + return e.into_response(); 125 125 } 126 126 } else if auth.is_oauth() { 127 127 let permissions = auth.permissions();
+4 -17
crates/tranquil-api/src/server/verify_email.rs
··· 74 74 return Ok(Json(ResendMigrationVerificationOutput { sent: true })); 75 75 } 76 76 77 - let hostname = &tranquil_config::get().server.hostname; 78 - let token = tranquil_pds::auth::verification_token::generate_migration_token( 79 - &user.did, 80 - channel, 81 - &identifier, 82 - ); 83 - let formatted_token = tranquil_pds::auth::verification_token::format_token_for_display(&token); 84 - 85 - if let Err(e) = tranquil_pds::comms::comms_repo::enqueue_migration_verification( 86 - state.user_repo.as_ref(), 87 - state.infra_repo.as_ref(), 77 + crate::identity::provision::enqueue_migration_verification( 78 + &state, 88 79 user.id, 80 + &user.did, 89 81 channel, 90 82 &identifier, 91 - &formatted_token, 92 - hostname, 93 83 ) 94 - .await 95 - { 96 - warn!(error = ?e, channel = ?channel, "Failed to enqueue migration verification"); 97 - } 84 + .await; 98 85 99 86 info!(did = %user.did, channel = ?channel, "Resent migration verification"); 100 87
+4 -4
crates/tranquil-api/src/temp.rs
··· 6 6 use cid::Cid; 7 7 use jacquard_repo::storage::BlockStore; 8 8 use serde::{Deserialize, Serialize}; 9 + use serde_json::Value; 9 10 use std::str::FromStr; 10 11 use tranquil_pds::api::error::ApiError; 11 12 use tranquil_pds::auth::{Active, Auth, Permissive}; ··· 51 52 State(state): State<AppState>, 52 53 _auth: Auth<Active>, 53 54 Json(input): Json<DereferenceScopeInput>, 54 - ) -> Result<Response, ApiError> { 55 + ) -> Result<Json<DereferenceScopeOutput>, ApiError> { 55 56 let scope_parts: Vec<&str> = input.scope.split_whitespace().collect(); 56 57 let mut resolved_scopes: Vec<String> = Vec::new(); 57 58 ··· 96 97 } 97 98 }; 98 99 99 - if let Some(scope_value) = scope_record.get("scope").and_then(|v| v.as_str()) { 100 + if let Some(scope_value) = scope_record.get("scope").and_then(Value::as_str) { 100 101 let _ = state 101 102 .cache 102 103 .set( ··· 118 119 119 120 Ok(Json(DereferenceScopeOutput { 120 121 scope: resolved_scopes.join(" "), 121 - }) 122 - .into_response()) 122 + })) 123 123 }

History

1 round 0 comments
sign up or log in to add to the discussion
oyster.cafe submitted #0
1 commit
expand
refactor(api): update repo batch/delete to use repo_ops, clean up remaining repo endpoints
expand 0 comments
pull request successfully merged