+187
-480
Diff
round #0
+6
-14
crates/tranquil-api/src/repo/blob.rs
+6
-14
crates/tranquil-api/src/repo/blob.rs
···
2
2
use axum::{
3
3
Json,
4
4
extract::{Query, State},
5
-
http::StatusCode,
6
5
response::{IntoResponse, Response},
7
6
};
8
7
use bytes::Bytes;
···
58
57
}
59
58
let mime_type_for_check = get_header_str(&headers, http::header::CONTENT_TYPE)
60
59
.unwrap_or("application/octet-stream");
61
-
let scope_proof = match user.verify_blob_upload(mime_type_for_check) {
62
-
Ok(proof) => proof,
63
-
Err(e) => return Ok(e.into_response()),
64
-
};
60
+
let scope_proof = user.verify_blob_upload(mime_type_for_check)?;
65
61
(
66
62
scope_proof.principal_did().into_did(),
67
63
scope_proof.controller_did().map(|c| c.into_did()),
···
237
233
State(state): State<AppState>,
238
234
auth: Auth<NotTakendown>,
239
235
Query(params): Query<ListMissingBlobsParams>,
240
-
) -> Result<Response, ApiError> {
236
+
) -> Result<Json<ListMissingBlobsOutput>, ApiError> {
241
237
let did = &auth.did;
242
238
let user = state
243
239
.user_repo
···
269
265
} else {
270
266
None
271
267
};
272
-
Ok((
273
-
StatusCode::OK,
274
-
Json(ListMissingBlobsOutput {
275
-
cursor: next_cursor,
276
-
blobs,
277
-
}),
278
-
)
279
-
.into_response())
268
+
Ok(Json(ListMissingBlobsOutput {
269
+
cursor: next_cursor,
270
+
blobs,
271
+
}))
280
272
}
+3
-7
crates/tranquil-api/src/repo/import.rs
+3
-7
crates/tranquil-api/src/repo/import.rs
···
1
-
use axum::{
2
-
body::Bytes,
3
-
extract::State,
4
-
response::{IntoResponse, Response},
5
-
};
1
+
use axum::{Json, body::Bytes, extract::State};
6
2
use jacquard_common::types::{integer::LimitedU32, string::Tid};
7
3
use jacquard_repo::storage::BlockStore;
8
4
use k256::ecdsa::SigningKey;
···
22
18
State(state): State<AppState>,
23
19
auth: Auth<NotTakendown>,
24
20
body: Bytes,
25
-
) -> Result<Response, ApiError> {
21
+
) -> Result<Json<EmptyResponse>, ApiError> {
26
22
let accepting_imports = tranquil_config::get().import.accepting;
27
23
if !accepting_imports {
28
24
return Err(ApiError::InvalidRequest(
···
340
336
);
341
337
}
342
338
}
343
-
Ok(EmptyResponse::ok().into_response())
339
+
Ok(Json(EmptyResponse {}))
344
340
}
345
341
Err(ImportError::SizeLimitExceeded) => Err(ApiError::PayloadTooLarge(format!(
346
342
"Import exceeds block limit of {}",
+9
-44
crates/tranquil-api/src/repo/meta.rs
+9
-44
crates/tranquil-api/src/repo/meta.rs
···
1
+
use crate::common;
1
2
use axum::{
2
3
Json,
3
4
extract::{Query, State},
···
5
6
};
6
7
use serde::Deserialize;
7
8
use serde_json::json;
8
-
use tranquil_pds::api::error::ApiError;
9
9
use tranquil_pds::state::AppState;
10
10
use tranquil_pds::types::AtIdentifier;
11
11
···
18
18
State(state): State<AppState>,
19
19
Query(input): Query<DescribeRepoInput>,
20
20
) -> Response {
21
-
let hostname_for_handles = tranquil_config::get().server.hostname_without_port();
22
-
let user_row = if input.repo.is_did() {
23
-
let did: tranquil_pds::types::Did = match input.repo.as_str().parse() {
24
-
Ok(d) => d,
25
-
Err(_) => return ApiError::InvalidRequest("Invalid DID format".into()).into_response(),
26
-
};
27
-
state
28
-
.user_repo
29
-
.get_by_did(&did)
30
-
.await
31
-
.map(|opt| opt.map(|r| (r.id, r.handle, r.did)))
32
-
} else {
33
-
let repo_str = input.repo.as_str();
34
-
let handle_str = if !repo_str.contains('.') {
35
-
format!("{}.{}", repo_str, hostname_for_handles)
36
-
} else {
37
-
repo_str.to_string()
38
-
};
39
-
let handle: tranquil_pds::types::Handle = match handle_str.parse() {
40
-
Ok(h) => h,
41
-
Err(_) => {
42
-
return ApiError::InvalidRequest("Invalid handle format".into()).into_response();
43
-
}
44
-
};
45
-
state
46
-
.user_repo
47
-
.get_by_handle(&handle)
48
-
.await
49
-
.map(|opt| opt.map(|r| (r.id, r.handle, r.did)))
50
-
};
51
-
let (user_id, handle, did) = match user_row {
52
-
Ok(Some((id, handle, did))) => (id, handle, did),
53
-
Ok(None) => {
54
-
return ApiError::RepoNotFound(Some("Repo not found".into())).into_response();
55
-
}
56
-
Err(_) => {
57
-
return ApiError::InternalError(None).into_response();
58
-
}
21
+
let resolved = match common::resolve_repo(state.user_repo.as_ref(), &input.repo).await {
22
+
Ok(r) => r,
23
+
Err(e) => return e.into_response(),
59
24
};
60
25
let collections = state
61
26
.repo_repo
62
-
.list_collections(user_id)
27
+
.list_collections(resolved.user_id)
63
28
.await
64
29
.unwrap_or_default();
65
30
let did_doc = json!({
66
-
"id": did,
67
-
"alsoKnownAs": [format!("at://{}", handle)]
31
+
"id": resolved.did,
32
+
"alsoKnownAs": [format!("at://{}", resolved.handle)]
68
33
});
69
34
Json(json!({
70
-
"handle": handle,
71
-
"did": did,
35
+
"handle": resolved.handle,
36
+
"did": resolved.did,
72
37
"didDoc": did_doc,
73
38
"collections": collections,
74
39
"handleIsCorrect": true
+93
-215
crates/tranquil-api/src/repo/record/batch.rs
+93
-215
crates/tranquil-api/src/repo/record/batch.rs
···
1
1
use super::validation::validate_record_with_status;
2
2
use super::validation_mode::{ValidationMode, deserialize_validation_mode};
3
-
use crate::repo::record::utils::{CommitParams, RecordOp, commit_and_log, extract_blob_cids};
4
-
use axum::{
5
-
Json,
6
-
extract::State,
7
-
http::StatusCode,
8
-
response::{IntoResponse, Response},
9
-
};
10
-
use cid::Cid;
11
-
use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
3
+
use crate::repo::record::write::CommitInfo;
4
+
use axum::{Json, extract::State};
5
+
use jacquard_repo::{mst::Mst, storage::BlockStore};
12
6
use serde::{Deserialize, Serialize};
13
7
use serde_json::json;
14
-
use std::str::FromStr;
15
-
use std::sync::Arc;
16
8
use tracing::info;
17
-
use tranquil_pds::api::error::ApiError;
9
+
use tranquil_pds::api::error::{ApiError, DbResultExt};
18
10
use tranquil_pds::auth::{
19
11
Active, Auth, WriteOpKind, require_not_migrated, require_verified_or_delegated,
20
12
verify_batch_write_scopes,
21
13
};
22
-
use tranquil_pds::cid_types::CommitCid;
23
-
use tranquil_pds::delegation::DelegationActionType;
24
14
use tranquil_pds::repo::tracking::TrackingBlockStore;
15
+
use tranquil_pds::repo_ops::{
16
+
FinalizeParams, RecordOp, begin_repo_write, extract_blob_cids, finalize_repo_write,
17
+
};
25
18
use tranquil_pds::state::AppState;
26
19
use tranquil_pds::types::{AtIdentifier, AtUri, Did, Nsid, Rkey};
27
20
use tranquil_pds::validation::ValidationStatus;
···
42
35
did: &Did,
43
36
validate: ValidationMode,
44
37
tracking_store: &TrackingBlockStore,
45
-
) -> Result<WriteAccumulator, Response> {
38
+
) -> Result<WriteAccumulator, ApiError> {
46
39
let WriteAccumulator {
47
40
mst,
48
41
mut results,
···
60
53
let validation_status = if validate.should_skip() {
61
54
None
62
55
} else {
63
-
match validate_record_with_status(
64
-
value,
65
-
collection,
66
-
rkey.as_ref(),
67
-
validate.requires_lexicon(),
56
+
Some(
57
+
validate_record_with_status(
58
+
value,
59
+
collection,
60
+
rkey.as_ref(),
61
+
validate.requires_lexicon(),
62
+
)
63
+
.await?,
68
64
)
69
-
.await
70
-
{
71
-
Ok(status) => Some(status),
72
-
Err(err_response) => return Err(*err_response),
73
-
}
74
65
};
75
66
all_blob_cids.extend(extract_blob_cids(value));
76
67
let rkey = rkey.clone().unwrap_or_else(Rkey::generate);
77
68
let record_ipld = tranquil_pds::util::json_to_ipld(value);
78
-
let record_bytes = serde_ipld_dagcbor::to_vec(&record_ipld).map_err(|_| {
79
-
ApiError::InvalidRecord("Failed to serialize record".into()).into_response()
80
-
})?;
81
-
let record_cid = tracking_store.put(&record_bytes).await.map_err(|_| {
82
-
ApiError::InternalError(Some("Failed to store record".into())).into_response()
83
-
})?;
69
+
let record_bytes = serde_ipld_dagcbor::to_vec(&record_ipld)
70
+
.map_err(|_| ApiError::InvalidRecord("Failed to serialize record".into()))?;
71
+
let record_cid = tracking_store
72
+
.put(&record_bytes)
73
+
.await
74
+
.map_err(|_| ApiError::InternalError(Some("Failed to store record".into())))?;
84
75
let key = format!("{}/{}", collection, rkey);
85
76
modified_keys.push(key.clone());
86
-
let new_mst = mst.add(&key, record_cid).await.map_err(|_| {
87
-
ApiError::InternalError(Some("Failed to add to MST".into())).into_response()
88
-
})?;
77
+
let new_mst = mst
78
+
.add(&key, record_cid)
79
+
.await
80
+
.map_err(|_| ApiError::InternalError(Some("Failed to add to MST".into())))?;
89
81
let uri = AtUri::from_parts(did, collection, &rkey);
90
82
results.push(WriteResult::CreateResult {
91
83
uri,
···
113
105
let validation_status = if validate.should_skip() {
114
106
None
115
107
} else {
116
-
match validate_record_with_status(
117
-
value,
118
-
collection,
119
-
Some(rkey),
120
-
validate.requires_lexicon(),
108
+
Some(
109
+
validate_record_with_status(
110
+
value,
111
+
collection,
112
+
Some(rkey),
113
+
validate.requires_lexicon(),
114
+
)
115
+
.await?,
121
116
)
122
-
.await
123
-
{
124
-
Ok(status) => Some(status),
125
-
Err(err_response) => return Err(*err_response),
126
-
}
127
117
};
128
118
all_blob_cids.extend(extract_blob_cids(value));
129
119
let record_ipld = tranquil_pds::util::json_to_ipld(value);
130
-
let record_bytes = serde_ipld_dagcbor::to_vec(&record_ipld).map_err(|_| {
131
-
ApiError::InvalidRecord("Failed to serialize record".into()).into_response()
132
-
})?;
133
-
let record_cid = tracking_store.put(&record_bytes).await.map_err(|_| {
134
-
ApiError::InternalError(Some("Failed to store record".into())).into_response()
135
-
})?;
120
+
let record_bytes = serde_ipld_dagcbor::to_vec(&record_ipld)
121
+
.map_err(|_| ApiError::InvalidRecord("Failed to serialize record".into()))?;
122
+
let record_cid = tracking_store
123
+
.put(&record_bytes)
124
+
.await
125
+
.map_err(|_| ApiError::InternalError(Some("Failed to store record".into())))?;
136
126
let key = format!("{}/{}", collection, rkey);
137
127
modified_keys.push(key.clone());
138
128
let prev_record_cid = mst.get(&key).await.ok().flatten();
139
-
let new_mst = mst.update(&key, record_cid).await.map_err(|_| {
140
-
ApiError::InternalError(Some("Failed to update MST".into())).into_response()
141
-
})?;
129
+
let new_mst = mst
130
+
.update(&key, record_cid)
131
+
.await
132
+
.map_err(|_| ApiError::InternalError(Some("Failed to update MST".into())))?;
142
133
let uri = AtUri::from_parts(did, collection, rkey);
143
134
results.push(WriteResult::UpdateResult {
144
135
uri,
···
163
154
let key = format!("{}/{}", collection, rkey);
164
155
modified_keys.push(key.clone());
165
156
let prev_record_cid = mst.get(&key).await.ok().flatten();
166
-
let new_mst = mst.delete(&key).await.map_err(|_| {
167
-
ApiError::InternalError(Some("Failed to delete from MST".into())).into_response()
168
-
})?;
157
+
let new_mst = mst
158
+
.delete(&key)
159
+
.await
160
+
.map_err(|_| ApiError::InternalError(Some("Failed to delete from MST".into())))?;
169
161
results.push(WriteResult::DeleteResult {});
170
162
ops.push(RecordOp::Delete {
171
163
collection: collection.clone(),
···
189
181
did: &Did,
190
182
validate: ValidationMode,
191
183
tracking_store: &TrackingBlockStore,
192
-
) -> Result<WriteAccumulator, Response> {
184
+
) -> Result<WriteAccumulator, ApiError> {
193
185
use futures::stream::{self, TryStreamExt};
194
186
let initial_acc = WriteAccumulator {
195
187
mst: initial_mst,
···
198
190
modified_keys: Vec::new(),
199
191
all_blob_cids: Vec::new(),
200
192
};
201
-
stream::iter(writes.iter().map(Ok::<_, Response>))
193
+
stream::iter(writes.iter().map(Ok::<_, ApiError>))
202
194
.try_fold(initial_acc, |acc, write| async move {
203
195
process_single_write(write, acc, did, validate, tracking_store).await
204
196
})
···
261
253
pub results: Vec<WriteResult>,
262
254
}
263
255
264
-
#[derive(Serialize)]
265
-
pub struct CommitInfo {
266
-
pub cid: String,
267
-
pub rev: String,
268
-
}
269
-
270
256
pub async fn apply_writes(
271
257
State(state): State<AppState>,
272
258
auth: Auth<Active>,
273
259
Json(input): Json<ApplyWritesInput>,
274
-
) -> Result<Response, ApiError> {
260
+
) -> Result<Json<ApplyWritesOutput>, ApiError> {
275
261
info!(
276
262
"apply_writes called: repo={}, writes={}",
277
263
input.repo,
···
288
274
)));
289
275
}
290
276
291
-
let batch_proof = match verify_batch_write_scopes(
277
+
let batch_proof = verify_batch_write_scopes(
292
278
&auth,
293
279
&auth,
294
280
&input.writes,
···
302
288
WriteOp::Update { .. } => WriteOpKind::Update,
303
289
WriteOp::Delete { .. } => WriteOpKind::Delete,
304
290
},
305
-
) {
306
-
Ok(proof) => proof,
307
-
Err(e) => return Ok(e.into_response()),
308
-
};
291
+
)?;
309
292
310
293
let principal_did = batch_proof.principal_did();
311
294
let controller_did = batch_proof.controller_did().map(|c| c.into_did());
···
317
300
}
318
301
319
302
let did = principal_did.into_did();
320
-
if let Err(e) = require_not_migrated(&state, &did).await {
321
-
return Ok(e);
322
-
}
323
-
if let Err(e) = require_verified_or_delegated(&state, batch_proof.user()).await {
324
-
return Ok(e);
325
-
}
303
+
require_not_migrated(&state, &did).await?;
304
+
require_verified_or_delegated(&state, batch_proof.user()).await?;
326
305
327
306
let user_id: uuid::Uuid = state
328
307
.user_repo
329
308
.get_id_by_did(&did)
330
309
.await
331
-
.ok()
332
-
.flatten()
333
-
.ok_or_else(|| ApiError::InternalError(Some("User not found".into())))?;
310
+
.log_db_err("fetching user for batch write")?
311
+
.ok_or(ApiError::InternalError(Some("User not found".into())))?;
334
312
335
-
let _write_lock = state.repo_write_locks.lock(user_id).await;
313
+
let (ctx, mst) = begin_repo_write(&state, user_id, input.swap_commit.as_deref()).await?;
336
314
337
-
let root_cid_str = state
338
-
.repo_repo
339
-
.get_repo_root_cid_by_user_id(user_id)
340
-
.await
341
-
.ok()
342
-
.flatten()
343
-
.ok_or_else(|| ApiError::InternalError(Some("Repo root not found".into())))?;
344
-
let current_root_cid = CommitCid::from_str(&root_cid_str)
345
-
.map_err(|_| ApiError::InternalError(Some("Invalid repo root CID".into())))?;
346
-
if let Some(swap_commit) = &input.swap_commit
347
-
&& CommitCid::from_str(swap_commit).ok().as_ref() != Some(¤t_root_cid)
348
-
{
349
-
return Err(ApiError::InvalidSwap(Some("Repo has been modified".into())));
350
-
}
351
-
let tracking_store = TrackingBlockStore::new(state.block_store.clone());
352
-
let commit_bytes = tracking_store
353
-
.get(current_root_cid.as_cid())
354
-
.await
355
-
.ok()
356
-
.flatten()
357
-
.ok_or_else(|| ApiError::InternalError(Some("Commit block not found".into())))?;
358
-
let commit = Commit::from_cbor(&commit_bytes)
359
-
.map_err(|_| ApiError::InternalError(Some("Failed to parse commit".into())))?;
360
-
let original_mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
361
-
let initial_mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
362
315
let WriteAccumulator {
363
-
mst,
316
+
mst: final_mst,
364
317
results,
365
318
ops,
366
319
modified_keys,
367
320
all_blob_cids,
368
-
} = match process_writes(
321
+
} = process_writes(
369
322
&input.writes,
370
-
initial_mst,
323
+
mst,
371
324
&did,
372
325
input.validate,
373
-
&tracking_store,
326
+
&ctx.tracking_store,
374
327
)
375
-
.await
376
-
{
377
-
Ok(acc) => acc,
378
-
Err(response) => return Ok(response),
379
-
};
380
-
let new_mst_root = mst
381
-
.persist()
382
-
.await
383
-
.map_err(|_| ApiError::InternalError(Some("Failed to persist MST".into())))?;
384
-
let (new_mst_blocks, old_mst_blocks) = {
385
-
let mut new_blocks = std::collections::BTreeMap::new();
386
-
let mut old_blocks = std::collections::BTreeMap::new();
387
-
for key in &modified_keys {
388
-
mst.blocks_for_path(key, &mut new_blocks)
389
-
.await
390
-
.map_err(|_| {
391
-
ApiError::InternalError(Some("Failed to get new MST blocks for path".into()))
392
-
})?;
393
-
original_mst
394
-
.blocks_for_path(key, &mut old_blocks)
395
-
.await
396
-
.map_err(|_| {
397
-
ApiError::InternalError(Some("Failed to get old MST blocks for path".into()))
398
-
})?;
399
-
}
400
-
(new_blocks, old_blocks)
401
-
};
402
-
let mut relevant_blocks = new_mst_blocks.clone();
403
-
relevant_blocks.extend(old_mst_blocks.iter().map(|(k, v)| (*k, v.clone())));
404
-
let written_cids: Vec<Cid> = tracking_store
405
-
.get_all_relevant_cids()
406
-
.into_iter()
407
-
.chain(relevant_blocks.keys().copied())
408
-
.collect::<std::collections::HashSet<_>>()
409
-
.into_iter()
410
-
.collect();
411
-
let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect();
412
-
let prev_record_cids = ops.iter().filter_map(|op| match op {
413
-
RecordOp::Update {
414
-
prev: Some(cid), ..
415
-
}
416
-
| RecordOp::Delete {
417
-
prev: Some(cid), ..
418
-
} => Some(*cid),
419
-
_ => None,
420
-
});
421
-
let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid.into_cid())
422
-
.chain(
423
-
old_mst_blocks
424
-
.keys()
425
-
.filter(|cid| !new_mst_blocks.contains_key(*cid))
426
-
.copied(),
427
-
)
428
-
.chain(prev_record_cids)
429
-
.collect::<std::collections::HashSet<_>>()
430
-
.into_iter()
431
-
.collect();
432
-
let commit_res = match commit_and_log(
433
-
&state,
434
-
CommitParams {
435
-
did: &did,
436
-
user_id,
437
-
current_root_cid: Some(current_root_cid.into_cid()),
438
-
prev_data_cid: Some(commit.data),
439
-
new_mst_root,
440
-
ops,
441
-
blocks_cids: &written_cids_str,
442
-
blobs: &all_blob_cids,
443
-
obsolete_cids,
444
-
},
445
-
)
446
-
.await
447
-
{
448
-
Ok(res) => res,
449
-
Err(e) => return Err(ApiError::from(e)),
450
-
};
328
+
.await?;
451
329
452
-
if let Some(ref controller) = controller_did {
453
-
let write_summary: Vec<serde_json::Value> = input
330
+
let write_summary: Option<serde_json::Value> = controller_did.as_ref().map(|_| {
331
+
let writes: Vec<serde_json::Value> = input
454
332
.writes
455
333
.iter()
456
334
.map(|w| match w {
···
475
353
}),
476
354
})
477
355
.collect();
356
+
json!({
357
+
"action": "apply_writes",
358
+
"count": input.writes.len(),
359
+
"writes": writes
360
+
})
361
+
});
478
362
479
-
let _ = state
480
-
.delegation_repo
481
-
.log_delegation_action(
482
-
&did,
483
-
controller,
484
-
Some(controller),
485
-
DelegationActionType::RepoWrite,
486
-
Some(json!({
487
-
"action": "apply_writes",
488
-
"count": input.writes.len(),
489
-
"writes": write_summary
490
-
})),
491
-
None,
492
-
None,
493
-
)
494
-
.await;
495
-
}
496
-
497
-
Ok((
498
-
StatusCode::OK,
499
-
Json(ApplyWritesOutput {
500
-
commit: CommitInfo {
501
-
cid: commit_res.commit_cid.to_string(),
502
-
rev: commit_res.rev,
503
-
},
504
-
results,
505
-
}),
363
+
let commit_result = finalize_repo_write(
364
+
&state,
365
+
ctx,
366
+
final_mst,
367
+
FinalizeParams {
368
+
did: &did,
369
+
user_id,
370
+
controller_did: controller_did.as_ref(),
371
+
delegation_detail: write_summary,
372
+
ops,
373
+
modified_keys: &modified_keys,
374
+
blob_cids: &all_blob_cids,
375
+
},
506
376
)
507
-
.into_response())
377
+
.await?;
378
+
379
+
Ok(Json(ApplyWritesOutput {
380
+
commit: CommitInfo {
381
+
cid: commit_result.commit_cid.to_string(),
382
+
rev: commit_result.rev,
383
+
},
384
+
results,
385
+
}))
508
386
}
+58
-170
crates/tranquil-api/src/repo/record/delete.rs
+58
-170
crates/tranquil-api/src/repo/record/delete.rs
···
1
-
use crate::repo::record::utils::{
2
-
CommitError, CommitParams, RecordOp, commit_and_log, get_current_root_cid,
3
-
};
4
1
use crate::repo::record::write::{CommitInfo, prepare_repo_write};
5
-
use axum::{
6
-
Json,
7
-
extract::State,
8
-
http::StatusCode,
9
-
response::{IntoResponse, Response},
10
-
};
2
+
use axum::{Json, extract::State};
11
3
use cid::Cid;
12
4
use jacquard_repo::{commit::Commit, mst::Mst, storage::BlockStore};
13
5
use serde::{Deserialize, Serialize};
···
17
9
use tracing::error;
18
10
use tranquil_pds::api::error::ApiError;
19
11
use tranquil_pds::auth::{Active, Auth, VerifyScope};
20
-
use tranquil_pds::cid_types::CommitCid;
21
-
use tranquil_pds::delegation::DelegationActionType;
22
12
use tranquil_pds::repo::tracking::TrackingBlockStore;
13
+
use tranquil_pds::repo_ops::{
14
+
CommitError, FinalizeParams, RecordOp, begin_repo_write, finalize_repo_write,
15
+
};
23
16
use tranquil_pds::state::AppState;
24
-
use tranquil_pds::types::{AtIdentifier, AtUri, Nsid, Rkey};
17
+
use tranquil_pds::types::{AtIdentifier, AtUri, Did, Nsid, Rkey};
25
18
26
19
#[derive(Deserialize)]
27
20
pub struct DeleteRecordInput {
···
45
38
State(state): State<AppState>,
46
39
auth: Auth<Active>,
47
40
Json(input): Json<DeleteRecordInput>,
48
-
) -> Result<Response, tranquil_pds::api::error::ApiError> {
49
-
let scope_proof = match auth.verify_repo_delete(&input.collection) {
50
-
Ok(proof) => proof,
51
-
Err(e) => return Ok(e.into_response()),
52
-
};
53
-
54
-
let repo_auth = match prepare_repo_write(&state, &scope_proof, &input.repo).await {
55
-
Ok(res) => res,
56
-
Err(err_res) => return Ok(err_res),
57
-
};
58
-
41
+
) -> Result<Json<DeleteRecordOutput>, ApiError> {
42
+
let scope_proof = auth.verify_repo_delete(&input.collection)?;
43
+
let repo_auth = prepare_repo_write(&state, &scope_proof, &input.repo).await?;
59
44
let did = repo_auth.did;
60
45
let user_id = repo_auth.user_id;
61
46
let controller_did = repo_auth.controller_did;
62
47
63
-
let _write_lock = state.repo_write_locks.lock(user_id).await;
64
-
let current_root_cid = get_current_root_cid(&state, user_id).await?;
48
+
let (ctx, mst) = begin_repo_write(&state, user_id, input.swap_commit.as_deref()).await?;
65
49
66
-
if let Some(swap_commit) = &input.swap_commit
67
-
&& CommitCid::from_str(swap_commit).ok().as_ref() != Some(¤t_root_cid)
68
-
{
69
-
return Ok(ApiError::InvalidSwap(Some("Repo has been modified".into())).into_response());
70
-
}
71
-
let tracking_store = TrackingBlockStore::new(state.block_store.clone());
72
-
let commit_bytes = match tracking_store.get(current_root_cid.as_cid()).await {
73
-
Ok(Some(b)) => b,
74
-
_ => {
75
-
return Ok(
76
-
ApiError::InternalError(Some("Commit block not found".into())).into_response(),
77
-
);
78
-
}
79
-
};
80
-
let commit = match Commit::from_cbor(&commit_bytes) {
81
-
Ok(c) => c,
82
-
_ => {
83
-
return Ok(
84
-
ApiError::InternalError(Some("Failed to parse commit".into())).into_response(),
85
-
);
86
-
}
87
-
};
88
-
let mst = Mst::load(Arc::new(tracking_store.clone()), commit.data, None);
89
50
let key = format!("{}/{}", input.collection, input.rkey);
51
+
90
52
if let Some(swap_record_str) = &input.swap_record {
91
53
let expected_cid = Cid::from_str(swap_record_str).ok();
92
54
let actual_cid = mst.get(&key).await.ok().flatten();
93
55
if expected_cid != actual_cid {
94
-
return Ok(ApiError::InvalidSwap(Some(
56
+
return Err(ApiError::InvalidSwap(Some(
95
57
"Record has been modified or does not exist".into(),
96
-
))
97
-
.into_response());
58
+
)));
98
59
}
99
60
}
61
+
100
62
let prev_record_cid = mst.get(&key).await.ok().flatten();
101
63
if prev_record_cid.is_none() {
102
-
return Ok((StatusCode::OK, Json(DeleteRecordOutput { commit: None })).into_response());
64
+
return Ok(Json(DeleteRecordOutput { commit: None }));
103
65
}
104
-
let new_mst = match mst.delete(&key).await {
105
-
Ok(m) => m,
106
-
Err(e) => {
107
-
error!("Failed to delete from MST: {:?}", e);
108
-
return Ok(ApiError::InternalError(Some(format!(
109
-
"Failed to delete from MST: {:?}",
110
-
e
111
-
)))
112
-
.into_response());
113
-
}
114
-
};
115
-
let new_mst_root = match new_mst.persist().await {
116
-
Ok(c) => c,
117
-
Err(e) => {
118
-
error!("Failed to persist MST: {:?}", e);
119
-
return Ok(
120
-
ApiError::InternalError(Some("Failed to persist MST".into())).into_response(),
121
-
);
122
-
}
123
-
};
124
-
let collection_for_audit = input.collection.to_string();
125
-
let rkey_for_audit = input.rkey.to_string();
66
+
67
+
let new_mst = mst.delete(&key).await.map_err(|e| {
68
+
error!("Failed to delete from MST: {:?}", e);
69
+
ApiError::InternalError(Some("Failed to delete from MST".into()))
70
+
})?;
71
+
126
72
let op = RecordOp::Delete {
127
73
collection: input.collection.clone(),
128
74
rkey: input.rkey.clone(),
129
75
prev: prev_record_cid,
130
76
};
131
-
let mut new_mst_blocks = std::collections::BTreeMap::new();
132
-
let mut old_mst_blocks = std::collections::BTreeMap::new();
133
-
if new_mst
134
-
.blocks_for_path(&key, &mut new_mst_blocks)
135
-
.await
136
-
.is_err()
137
-
{
138
-
return Ok(
139
-
ApiError::InternalError(Some("Failed to get new MST blocks for path".into()))
140
-
.into_response(),
141
-
);
142
-
}
143
-
if mst
144
-
.blocks_for_path(&key, &mut old_mst_blocks)
145
-
.await
146
-
.is_err()
147
-
{
148
-
return Ok(
149
-
ApiError::InternalError(Some("Failed to get old MST blocks for path".into()))
150
-
.into_response(),
151
-
);
152
-
}
153
-
let mut relevant_blocks = new_mst_blocks.clone();
154
-
relevant_blocks.extend(old_mst_blocks.iter().map(|(k, v)| (*k, v.clone())));
155
-
let written_cids: Vec<Cid> = tracking_store
156
-
.get_all_relevant_cids()
157
-
.into_iter()
158
-
.chain(relevant_blocks.keys().copied())
159
-
.collect::<std::collections::HashSet<_>>()
160
-
.into_iter()
161
-
.collect();
162
-
let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect();
163
-
let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid.into_cid())
164
-
.chain(
165
-
old_mst_blocks
166
-
.keys()
167
-
.filter(|cid| !new_mst_blocks.contains_key(*cid))
168
-
.copied(),
169
-
)
170
-
.chain(prev_record_cid)
171
-
.collect();
172
-
let commit_result = match commit_and_log(
77
+
78
+
let modified_keys = [key];
79
+
80
+
let commit_result = finalize_repo_write(
173
81
&state,
174
-
CommitParams {
82
+
ctx,
83
+
new_mst,
84
+
FinalizeParams {
175
85
did: &did,
176
86
user_id,
177
-
current_root_cid: Some(current_root_cid.into_cid()),
178
-
prev_data_cid: Some(commit.data),
179
-
new_mst_root,
87
+
controller_did: controller_did.as_ref(),
88
+
delegation_detail: controller_did.as_ref().map(|_| {
89
+
json!({
90
+
"action": "delete",
91
+
"collection": input.collection,
92
+
"rkey": input.rkey
93
+
})
94
+
}),
180
95
ops: vec![op],
181
-
blocks_cids: &written_cids_str,
182
-
blobs: &[],
183
-
obsolete_cids,
96
+
modified_keys: &modified_keys,
97
+
blob_cids: &[],
184
98
},
185
99
)
186
-
.await
187
-
{
188
-
Ok(res) => res,
189
-
Err(e) => return Ok(ApiError::from(e).into_response()),
190
-
};
191
-
192
-
if let Some(ref controller) = controller_did {
193
-
let _ = state
194
-
.delegation_repo
195
-
.log_delegation_action(
196
-
&did,
197
-
controller,
198
-
Some(controller),
199
-
DelegationActionType::RepoWrite,
200
-
Some(json!({
201
-
"action": "delete",
202
-
"collection": collection_for_audit,
203
-
"rkey": rkey_for_audit
204
-
})),
205
-
None,
206
-
None,
207
-
)
208
-
.await;
209
-
}
100
+
.await?;
210
101
211
102
let deleted_uri = AtUri::from_parts(&did, &input.collection, &input.rkey);
212
103
if let Err(e) = state
···
217
108
error!("Failed to remove backlinks for {}: {}", deleted_uri, e);
218
109
}
219
110
220
-
Ok((
221
-
StatusCode::OK,
222
-
Json(DeleteRecordOutput {
223
-
commit: Some(CommitInfo {
224
-
cid: commit_result.commit_cid.to_string(),
225
-
rev: commit_result.rev,
226
-
}),
111
+
Ok(Json(DeleteRecordOutput {
112
+
commit: Some(CommitInfo {
113
+
cid: commit_result.commit_cid.to_string(),
114
+
rev: commit_result.rev,
227
115
}),
228
-
)
229
-
.into_response())
116
+
}))
230
117
}
231
118
232
-
use tranquil_pds::types::Did;
233
119
use uuid::Uuid;
234
120
235
121
pub async fn delete_record_internal(
···
239
125
collection: &Nsid,
240
126
rkey: &Rkey,
241
127
) -> Result<(), CommitError> {
128
+
use tranquil_pds::repo_ops::{CommitParams, RecordOp, commit_and_log};
129
+
242
130
let _write_lock = state.repo_write_locks.lock(user_id).await;
243
131
244
132
let root_cid_str = state
···
303
191
.await
304
192
.map_err(|e| CommitError::MstOperationFailed(format!("{:?}", e)))?;
305
193
306
-
let mut relevant_blocks = new_mst_blocks.clone();
307
-
relevant_blocks.extend(old_mst_blocks.iter().map(|(k, v)| (*k, v.clone())));
308
-
309
-
let written_cids: Vec<Cid> = tracking_store
310
-
.get_all_relevant_cids()
311
-
.into_iter()
312
-
.chain(relevant_blocks.keys().copied())
313
-
.collect::<std::collections::HashSet<_>>()
314
-
.into_iter()
315
-
.collect();
316
-
317
-
let written_cids_str: Vec<String> = written_cids.iter().map(|c| c.to_string()).collect();
318
-
319
194
let obsolete_cids: Vec<Cid> = std::iter::once(current_root_cid)
320
195
.chain(
321
196
old_mst_blocks
···
326
201
.chain(std::iter::once(prev_cid))
327
202
.collect();
328
203
204
+
let mut relevant_blocks = new_mst_blocks;
205
+
relevant_blocks.extend(old_mst_blocks);
206
+
207
+
let written_cids: Vec<Cid> = tracking_store
208
+
.get_all_relevant_cids()
209
+
.into_iter()
210
+
.chain(relevant_blocks.keys().copied())
211
+
.collect::<std::collections::HashSet<_>>()
212
+
.into_iter()
213
+
.collect();
214
+
215
+
let written_cids_str: Vec<String> = written_cids.iter().map(ToString::to_string).collect();
216
+
329
217
commit_and_log(
330
218
state,
331
219
CommitParams {
+8
-7
crates/tranquil-api/src/server/mod.rs
+8
-7
crates/tranquil-api/src/server/mod.rs
···
23
23
};
24
24
pub use app_password::{create_app_password, list_app_passwords, revoke_app_password};
25
25
pub use email::{
26
-
authorize_email_update, check_channel_verified, check_email_in_use,
27
-
check_email_update_status, check_email_verified, confirm_email, request_email_update,
28
-
update_email,
26
+
authorize_email_update, check_channel_verified, check_email_in_use, check_email_update_status,
27
+
check_email_verified, confirm_email, request_email_update, update_email,
29
28
};
30
29
pub use invite::{create_invite_code, create_invite_codes, get_account_invite_codes};
31
30
pub use logo::get_logo;
···
44
43
set_password,
45
44
};
46
45
pub use reauth::{
47
-
check_legacy_session_mfa, check_reauth_required, get_reauth_status,
48
-
legacy_mfa_required_response, reauth_passkey_finish, reauth_passkey_start, reauth_password,
49
-
reauth_required_response, reauth_totp, update_mfa_verified,
46
+
check_legacy_session_mfa, check_reauth_required, get_reauth_status, reauth_passkey_finish,
47
+
reauth_passkey_start, reauth_password, reauth_totp, update_mfa_verified,
50
48
};
51
49
pub use service_auth::get_service_auth;
52
50
pub use session::{
···
64
62
trust_device, update_trusted_device,
65
63
};
66
64
pub use verify_email::{resend_migration_verification, verify_migration_email};
67
-
pub use verify_token::{VerifyTokenInput, VerifyTokenOutput, verify_token, verify_token_internal};
65
+
pub use verify_token::{
66
+
VerifyTokenInput, VerifyTokenOutput, confirm_channel_verification, verify_token,
67
+
verify_token_internal,
68
+
};
+2
-2
crates/tranquil-api/src/server/service_auth.rs
+2
-2
crates/tranquil-api/src/server/service_auth.rs
···
112
112
};
113
113
114
114
let lxm = params.lxm.as_ref();
115
-
let lxm_for_token = lxm.map_or("*", |n| n.as_str());
115
+
let lxm_for_token = lxm.map_or("*", |v| v.as_str());
116
116
117
117
if let Some(method) = lxm {
118
118
if let Err(e) = tranquil_pds::auth::scope_check::check_rpc_scope(
···
121
121
params.aud.as_str(),
122
122
method.as_str(),
123
123
) {
124
-
return e;
124
+
return e.into_response();
125
125
}
126
126
} else if auth.is_oauth() {
127
127
let permissions = auth.permissions();
+4
-17
crates/tranquil-api/src/server/verify_email.rs
+4
-17
crates/tranquil-api/src/server/verify_email.rs
···
74
74
return Ok(Json(ResendMigrationVerificationOutput { sent: true }));
75
75
}
76
76
77
-
let hostname = &tranquil_config::get().server.hostname;
78
-
let token = tranquil_pds::auth::verification_token::generate_migration_token(
79
-
&user.did,
80
-
channel,
81
-
&identifier,
82
-
);
83
-
let formatted_token = tranquil_pds::auth::verification_token::format_token_for_display(&token);
84
-
85
-
if let Err(e) = tranquil_pds::comms::comms_repo::enqueue_migration_verification(
86
-
state.user_repo.as_ref(),
87
-
state.infra_repo.as_ref(),
77
+
crate::identity::provision::enqueue_migration_verification(
78
+
&state,
88
79
user.id,
80
+
&user.did,
89
81
channel,
90
82
&identifier,
91
-
&formatted_token,
92
-
hostname,
93
83
)
94
-
.await
95
-
{
96
-
warn!(error = ?e, channel = ?channel, "Failed to enqueue migration verification");
97
-
}
84
+
.await;
98
85
99
86
info!(did = %user.did, channel = ?channel, "Resent migration verification");
100
87
+4
-4
crates/tranquil-api/src/temp.rs
+4
-4
crates/tranquil-api/src/temp.rs
···
6
6
use cid::Cid;
7
7
use jacquard_repo::storage::BlockStore;
8
8
use serde::{Deserialize, Serialize};
9
+
use serde_json::Value;
9
10
use std::str::FromStr;
10
11
use tranquil_pds::api::error::ApiError;
11
12
use tranquil_pds::auth::{Active, Auth, Permissive};
···
51
52
State(state): State<AppState>,
52
53
_auth: Auth<Active>,
53
54
Json(input): Json<DereferenceScopeInput>,
54
-
) -> Result<Response, ApiError> {
55
+
) -> Result<Json<DereferenceScopeOutput>, ApiError> {
55
56
let scope_parts: Vec<&str> = input.scope.split_whitespace().collect();
56
57
let mut resolved_scopes: Vec<String> = Vec::new();
57
58
···
96
97
}
97
98
};
98
99
99
-
if let Some(scope_value) = scope_record.get("scope").and_then(|v| v.as_str()) {
100
+
if let Some(scope_value) = scope_record.get("scope").and_then(Value::as_str) {
100
101
let _ = state
101
102
.cache
102
103
.set(
···
118
119
119
120
Ok(Json(DereferenceScopeOutput {
120
121
scope: resolved_scopes.join(" "),
121
-
})
122
-
.into_response())
122
+
}))
123
123
}
History
1 round
0 comments
oyster.cafe
submitted
#0
1 commit
expand
collapse
refactor(api): update repo batch/delete to use repo_ops, clean up remaining repo endpoints
expand 0 comments
pull request successfully merged