Alternative ATProto PDS implementation

Prototype implementation of the com.atproto.repo API endpoints.

+1
Cargo.lock
··· 1326 1326 "tower-http", 1327 1327 "tracing", 1328 1328 "tracing-subscriber", 1329 + "ubyte", 1329 1330 "url", 1330 1331 "urlencoding", 1331 1332 "uuid 1.16.0",
+1
Cargo.toml
··· 269 269 "sqlite", 270 270 "tracing", 271 271 ] } 272 + ubyte = "0.10.4"
+49 -55
src/apis/com/atproto/repo/import_repo.rs
··· 1 + use axum::{body::Bytes, http::HeaderMap}; 1 2 use reqwest::header; 2 3 use rsky_common::env::env_int; 3 4 use rsky_repo::block_map::BlockMap; ··· 6 7 use rsky_repo::repo::Repo; 7 8 use rsky_repo::sync::consumer::{VerifyRepoInput, verify_diff}; 8 9 use rsky_repo::types::{RecordWriteDescript, VerifiedDiff}; 9 - use serde::Deserialize; 10 - use std::num::NonZeroU64; 10 + use ubyte::ToByteUnit; 11 11 12 12 use super::*; 13 13 14 - struct ImportRepoInput { 15 - car_with_root: CarWithRoot, 16 - } 17 - 18 - // #[rocket::async_trait] 19 - // impl<'r> FromData<'r> for ImportRepoInput { 20 - // type Error = ApiError; 21 - 22 - // #[tracing::instrument(skip_all)] 23 - // async fn from_data(req: &'r Request<'_>, data: Data<'r>) -> Outcome<'r, Self, Self::Error> { 24 - // let max_import_size = env_int("IMPORT_REPO_LIMIT").unwrap_or(100).megabytes(); 25 - // match req.headers().get_one(header::CONTENT_LENGTH.as_ref()) { 26 - // None => { 27 - // let error = ApiError::InvalidRequest("Missing content-length header".to_string()); 28 - // req.local_cache(|| Some(error.clone())); 29 - // Outcome::Error((Status::BadRequest, error)) 30 - // } 31 - // Some(res) => match res.parse::<NonZeroU64>() { 32 - // Ok(content_length) => { 33 - // if content_length.get().bytes() > max_import_size { 34 - // let error = ApiError::InvalidRequest(format!( 35 - // "Content-Length is greater than maximum of {max_import_size}" 36 - // )); 37 - // req.local_cache(|| Some(error.clone())); 38 - // return Outcome::Error((Status::BadRequest, error)); 39 - // } 40 - 41 - // let import_datastream = data.open(content_length.get().bytes()); 42 - // match read_stream_car_with_root(import_datastream).await { 43 - // Ok(car_with_root) => Outcome::Success(ImportRepoInput { car_with_root }), 44 - // Err(error) => { 45 - // let error = ApiError::InvalidRequest(error.to_string()); 46 - // req.local_cache(|| Some(error.clone())); 47 - // Outcome::Error((Status::BadRequest, error)) 48 - // } 49 - // } 50 - // } 51 
- // Err(_error) => { 52 - // tracing::error!("{}", format!("Error parsing content-length\n{_error}")); 53 - // let error = 54 - // ApiError::InvalidRequest("Error parsing content-length".to_string()); 55 - // req.local_cache(|| Some(error.clone())); 56 - // Outcome::Error((Status::BadRequest, error)) 57 - // } 58 - // }, 59 - // } 60 - // } 61 - // } 14 + async fn from_data(bytes: Bytes) -> Result<CarWithRoot, ApiError> { 15 + let max_import_size = env_int("IMPORT_REPO_LIMIT").unwrap_or(100).megabytes(); 16 + if bytes.len() > max_import_size { 17 + return Err(ApiError::InvalidRequest(format!( 18 + "Content-Length is greater than maximum of {max_import_size}" 19 + ))); 20 + } 62 21 63 - // TODO: lookup axum docs to impl deserialize 22 + let mut cursor = std::io::Cursor::new(bytes); 23 + match read_stream_car_with_root(&mut cursor).await { 24 + Ok(car_with_root) => Ok(car_with_root), 25 + Err(error) => { 26 + tracing::error!("Error reading stream car with root\n{error}"); 27 + Err(ApiError::InvalidRequest("Invalid CAR file".to_owned())) 28 + } 29 + } 30 + } 64 31 65 32 #[tracing::instrument(skip_all)] 66 33 #[axum::debug_handler(state = AppState)] 34 + /// Import a repo in the form of a CAR file. Requires Content-Length HTTP header to be set. 
35 + /// Request 36 + /// mime application/vnd.ipld.car 37 + /// Body - required 67 38 pub async fn import_repo( 68 39 // auth: AccessFullImport, 69 40 auth: AuthenticatedUser, 70 - Query(import_repo_input): Query<ImportRepoInput>, 41 + headers: HeaderMap, 71 42 State(actor_pools): State<HashMap<String, ActorStorage, RandomState>>, 43 + body: Bytes, 72 44 ) -> Result<(), ApiError> { 73 45 // let requester = auth.access.credentials.unwrap().did.unwrap(); 74 46 let requester = auth.did(); 75 47 let mut actor_store = ActorStore::from_actor_pools(&requester, &actor_pools).await; 76 48 49 + // Check headers 50 + let content_length = headers 51 + .get(header::CONTENT_LENGTH) 52 + .expect("no content length provided") 53 + .to_str() 54 + .map_err(anyhow::Error::from) 55 + .and_then(|content_length| content_length.parse::<u64>().map_err(anyhow::Error::from)) 56 + .expect("invalid content-length header"); 57 + if content_length > env_int("IMPORT_REPO_LIMIT").unwrap_or(100).megabytes() { 58 + return Err(ApiError::InvalidRequest(format!( 59 + "Content-Length is greater than maximum of {}", 60 + env_int("IMPORT_REPO_LIMIT").unwrap_or(100).megabytes() 61 + ))); 62 + }; 63 + 77 64 // Get current repo if it exists 78 65 let curr_root: Option<Cid> = actor_store.get_repo_root().await; 79 66 let curr_repo: Option<Repo> = match curr_root { ··· 82 69 }; 83 70 84 71 // Process imported car 85 - let car_with_root = import_repo_input.car_with_root; 72 + // let car_with_root = import_repo_input.car_with_root; 73 + let car_with_root: CarWithRoot = match from_data(body).await { 74 + Ok(car) => car, 75 + Err(error) => { 76 + tracing::error!("Error importing repo\n{error:?}"); 77 + return Err(ApiError::InvalidRequest("Invalid CAR file".to_owned())); 78 + } 79 + }; 86 80 87 81 // Get verified difference from current repo and imported repo 88 82 let mut imported_blocks: BlockMap = car_with_root.blocks; ··· 127 121 128 122 /// Converts list of RecordWriteDescripts into a list of PreparedWrites 
129 123 async fn prepare_import_repo_writes( 130 - _did: String, 124 + did: String, 131 125 writes: Vec<RecordWriteDescript>, 132 126 blocks: &BlockMap, 133 127 ) -> Result<Vec<PreparedWrite>, ApiError> { 134 128 match stream::iter(writes) 135 129 .then(|write| { 136 - let did = _did.clone(); 130 + let did = did.clone(); 137 131 async move { 138 132 Ok::<PreparedWrite, anyhow::Error>(match write { 139 133 RecordWriteDescript::Create(write) => {
+15 -8
src/apis/com/atproto/repo/list_missing_blobs.rs
··· 1 - //! 1 + //! Returns a list of missing blobs for the requesting account. Intended to be used in the account migration flow. 2 2 use rsky_lexicon::com::atproto::repo::ListMissingBlobsOutput; 3 3 use rsky_pds::actor_store::blob::ListMissingBlobsOpts; 4 4 5 5 use super::*; 6 6 7 - /// 7 + /// Returns a list of missing blobs for the requesting account. Intended to be used in the account migration flow. 8 + /// Request 9 + /// Query Parameters 10 + /// limit integer 11 + /// Possible values: >= 1 and <= 1000 12 + /// Default value: 500 13 + /// cursor string 14 + /// Responses 15 + /// cursor string 16 + /// blobs object[] 8 17 #[tracing::instrument(skip_all)] 9 18 #[axum::debug_handler(state = AppState)] 10 19 pub async fn list_missing_blobs( ··· 14 23 ) -> Result<Json<ListMissingBlobsOutput>, ApiError> { 15 24 let cursor = input.cursor; 16 25 let limit = input.limit; 17 - let limit: Option<u16> = Some(limit.unwrap().into()); 26 + let default_limit: atrium_api::types::LimitedNonZeroU16<1000> = 27 + atrium_api::types::LimitedNonZeroU16::try_from(500).expect("default limit"); 28 + let limit: u16 = limit.unwrap_or(default_limit).into(); 18 29 // let did = auth.access.credentials.unwrap().did.unwrap(); 19 30 let did = user.did(); 20 - let limit: u16 = limit.unwrap_or(500); 21 31 22 32 let actor_store = ActorStore::from_actor_pools(&did, &actor_pools).await; 23 33 ··· 27 37 .await 28 38 { 29 39 Ok(blobs) => { 30 - let cursor = match blobs.last() { 31 - Some(last_blob) => Some(last_blob.cid.clone()), 32 - None => None, 33 - }; 40 + let cursor = blobs.last().map(|last_blob| last_blob.cid.clone()); 34 41 Ok(Json(ListMissingBlobsOutput { cursor, blobs })) 35 42 } 36 43 Err(error) => {
+5 -3
src/apis/com/atproto/repo/put_record.rs
··· 86 86 }; 87 87 88 88 match current { 89 - Some(current) if current.cid == write.cid().unwrap().to_string() => (None, write), 89 + Some(current) if current.cid == write.cid().expect("write cid").to_string() => { 90 + (None, write) 91 + } 90 92 _ => { 91 93 let commit = actor_store 92 94 .process_writes(vec![write.clone()], swap_commit_cid) ··· 97 99 }; 98 100 99 101 if let Some(commit) = commit { 100 - sequencer 102 + _ = sequencer 101 103 .write() 102 104 .await 103 105 .sequence_commit(did.clone(), commit.clone()) ··· 115 117 } 116 118 Ok(PutRecordOutput { 117 119 uri: write.uri().to_string(), 118 - cid: write.cid().unwrap().to_string(), 120 + cid: write.cid().expect("write cid").to_string(), 119 121 }) 120 122 } else { 121 123 bail!("Could not find repo: `{repo}`")
+1 -1
src/apis/com/atproto/repo/upload_blob.rs
··· 50 50 51 51 Ok(BlobOutput { 52 52 blob: Blob { 53 - r#type: Some("blob".to_string()), 53 + r#type: Some("blob".to_owned()), 54 54 r#ref: Some(blobref.get_cid()?), 55 55 cid: None, 56 56 mime_type: blobref.get_mime_type().to_string(),