Alternative ATProto PDS implementation

prototype spis com atproto repo

+6 -4
src/actor_store/blob.rs
··· 6 6 7 7 use crate::models::actor_store as models; 8 8 use anyhow::{Result, bail}; 9 + use axum::body::Bytes; 9 10 use cidv10::Cid; 10 11 use diesel::dsl::{count_distinct, exists, not}; 11 12 use diesel::sql_types::{Integer, Nullable, Text}; ··· 138 139 pub async fn upload_blob_and_get_metadata( 139 140 &self, 140 141 user_suggested_mime: String, 141 - blob: Vec<u8>, 142 + blob: Bytes, 142 143 ) -> Result<BlobMetadata> { 143 144 let bytes = blob; 144 145 let size = bytes.len() as i64; 145 146 146 147 let (temp_key, sha256, img_info, sniffed_mime) = try_join!( 147 148 self.blobstore.put_temp(bytes.clone()), 148 - sha256_stream(bytes.clone()), 149 - image::maybe_get_info(bytes.clone()), 150 - image::mime_type_from_bytes(bytes.clone()) 149 + // TODO: reimpl funcs to use Bytes instead of Vec<u8> 150 + sha256_stream(bytes.to_vec()), 151 + image::maybe_get_info(bytes.to_vec()), 152 + image::mime_type_from_bytes(bytes.to_vec()) 151 153 )?; 152 154 153 155 let cid = sha256_raw_to_cid(sha256);
+8 -7
src/actor_store/blob_fs.rs
··· 1 1 //! File system implementation of blob storage 2 2 //! Based on the S3 implementation but using local file system instead 3 3 use anyhow::Result; 4 + use axum::body::Bytes; 4 5 use cidv10::Cid; 5 6 use rsky_common::get_random_str; 6 7 use rsky_repo::error::BlobError; ··· 12 13 13 14 /// ByteStream implementation for blob data 14 15 pub struct ByteStream { 15 - pub bytes: Vec<u8>, 16 + pub bytes: Bytes, 16 17 } 17 18 18 19 impl ByteStream { 19 20 /// Create a new ByteStream with the given bytes 20 - pub const fn new(bytes: Vec<u8>) -> Self { 21 + pub const fn new(bytes: Bytes) -> Self { 21 22 Self { bytes } 22 23 } 23 24 24 25 /// Collect the bytes from the stream 25 - pub async fn collect(self) -> Result<Vec<u8>> { 26 + pub async fn collect(self) -> Result<Bytes> { 26 27 Ok(self.bytes) 27 28 } 28 29 } ··· 99 100 } 100 101 101 102 /// Store a blob temporarily 102 - pub async fn put_temp(&self, bytes: Vec<u8>) -> Result<String> { 103 + pub async fn put_temp(&self, bytes: Bytes) -> Result<String> { 103 104 let key = self.gen_key(); 104 105 let temp_path = self.get_tmp_path(&key); 105 106 ··· 142 143 } 143 144 144 145 /// Store a blob directly as permanent 145 - pub async fn put_permanent(&self, cid: Cid, bytes: Vec<u8>) -> Result<()> { 146 + pub async fn put_permanent(&self, cid: Cid, bytes: Bytes) -> Result<()> { 146 147 let target_path = self.get_stored_path(cid); 147 148 148 149 // Ensure the directory exists ··· 188 189 let blob_path = self.get_stored_path(cid); 189 190 190 191 match async_fs::read(&blob_path).await { 191 - Ok(bytes) => Ok(ByteStream::new(bytes)), 192 + Ok(bytes) => Ok(ByteStream::new(Bytes::from(bytes))), 192 193 Err(e) => { 193 194 error!("Failed to read blob at path {:?}: {}", blob_path, e); 194 195 Err(anyhow::Error::new(BlobError::BlobNotFoundError)) ··· 197 198 } 198 199 199 200 /// Get blob bytes 200 - pub async fn get_bytes(&self, cid: Cid) -> Result<Vec<u8>> { 201 + pub async fn get_bytes(&self, cid: Cid) -> Result<Bytes> { 201 202 let stream = self.get_object(cid).await?; 202 203 stream.collect().await 203 204 }
+8 -30
src/apis/com/atproto/repo/apply_writes.rs
··· 1 1 //! Apply a batch transaction of repository creates, updates, and deletes. Requires auth, implemented by PDS. 2 - use crate::account_manager::AccountManager; 3 - use crate::account_manager::helpers::account::AvailabilityFlags; 4 - use crate::{ 5 - actor_store::ActorStore, 6 - auth::AuthenticatedUser, 7 - error::ApiError, 8 - serve::{ActorStorage, AppState}, 9 - }; 10 - use anyhow::{Result, bail}; 11 - use axum::{Json, extract::State}; 12 - use cidv10::Cid; 13 - use futures::stream::{self, StreamExt}; 14 - use rsky_lexicon::com::atproto::repo::{ApplyWritesInput, ApplyWritesInputRefWrite}; 15 - use rsky_pds::repo::prepare::{ 16 - PrepareCreateOpts, PrepareDeleteOpts, PrepareUpdateOpts, prepare_create, prepare_delete, 17 - prepare_update, 18 - }; 19 - use rsky_pds::sequencer::Sequencer; 20 - use rsky_repo::types::PreparedWrite; 21 - use std::collections::HashMap; 22 - use std::hash::RandomState; 23 - use std::str::FromStr; 24 - use std::sync::Arc; 25 - use tokio::sync::RwLock; 2 + 3 + use super::*; 26 4 27 5 async fn inner_apply_writes( 28 6 body: ApplyWritesInput, 29 - user: AuthenticatedUser, 7 + auth: AuthenticatedUser, 30 8 sequencer: Arc<RwLock<Sequencer>>, 31 9 actor_pools: HashMap<String, ActorStorage>, 32 10 account_manager: Arc<RwLock<AccountManager>>, ··· 55 33 bail!("Account is deactivated") 56 34 } 57 35 let did = account.did; 58 - if did != user.did() { 36 + if did != auth.did() { 59 37 bail!("AuthRequiredError") 60 38 } 61 39 let did: &String = &did; ··· 64 42 } 65 43 66 44 let writes: Vec<PreparedWrite> = stream::iter(tx.writes) 67 - .then(|write| async move { 45 + .then(async |write| { 68 46 Ok::<PreparedWrite, anyhow::Error>(match write { 69 47 ApplyWritesInputRefWrite::Create(write) => PreparedWrite::Create( 70 48 prepare_create(PrepareCreateOpts { ··· 147 125 /// - `swap_commit`: `cid` // If provided, the entire operation will fail if the current repo commit CID does not match this value. Used to prevent conflicting repo mutations. 148 126 #[axum::debug_handler(state = AppState)] 149 127 pub(crate) async fn apply_writes( 150 - user: AuthenticatedUser, 151 - State(db_actors): State<HashMap<String, ActorStorage, RandomState>>, 128 + auth: AuthenticatedUser, 129 + State(actor_pools): State<HashMap<String, ActorStorage, RandomState>>, 152 130 State(account_manager): State<Arc<RwLock<AccountManager>>>, 153 131 State(sequencer): State<Arc<RwLock<Sequencer>>>, 154 132 Json(body): Json<ApplyWritesInput>, 155 133 ) -> Result<(), ApiError> { 156 134 tracing::debug!("@LOG: debug apply_writes {body:#?}"); 157 - match inner_apply_writes(body, user, sequencer, db_actors, account_manager).await { 135 + match inner_apply_writes(body, auth, sequencer, actor_pools, account_manager).await { 158 136 Ok(()) => Ok(()), 159 137 Err(error) => { 160 138 tracing::error!("@LOG: ERROR: {error}");
+3 -25
src/apis/com/atproto/repo/create_record.rs
··· 1 1 //! Create a single new repository record. Requires auth, implemented by PDS. 2 - use crate::account_manager::AccountManager; 3 - use crate::account_manager::helpers::account::AvailabilityFlags; 4 - use crate::{ 5 - actor_store::ActorStore, 6 - auth::AuthenticatedUser, 7 - error::ApiError, 8 - serve::{ActorStorage, AppState}, 9 - }; 10 - use anyhow::{Result, bail}; 11 - use axum::{Json, extract::State}; 12 - use cidv10::Cid; 13 - use rsky_lexicon::com::atproto::repo::{CreateRecordInput, CreateRecordOutput}; 14 - use rsky_pds::SharedIdResolver; 15 - use rsky_pds::repo::prepare::{ 16 - PrepareCreateOpts, PrepareDeleteOpts, prepare_create, prepare_delete, 17 - }; 18 - use rsky_pds::sequencer::Sequencer; 19 - use rsky_repo::types::{PreparedDelete, PreparedWrite}; 20 - use rsky_syntax::aturi::AtUri; 21 - use std::collections::HashMap; 22 - use std::hash::RandomState; 23 - use std::str::FromStr; 24 - use std::sync::Arc; 25 - use tokio::sync::RwLock; 2 + 3 + use super::*; 26 4 27 5 async fn inner_create_record( 28 6 body: CreateRecordInput, 29 7 user: AuthenticatedUser, 30 8 sequencer: Arc<RwLock<Sequencer>>, 31 - actor_pools: std::collections::HashMap<String, ActorStorage>, 9 + actor_pools: HashMap<String, ActorStorage>, 32 10 account_manager: Arc<RwLock<AccountManager>>, 33 11 ) -> Result<CreateRecordOutput> { 34 12 let CreateRecordInput {
+2 -22
src/apis/com/atproto/repo/delete_record.rs
··· 1 - /// Delete a repository record, or ensure it doesn't exist. Requires auth, implemented by PDS. 2 - use crate::account_manager::AccountManager; 3 - use crate::account_manager::helpers::account::AvailabilityFlags; 4 - use crate::{ 5 - actor_store::ActorStore, 6 - auth::AuthenticatedUser, 7 - error::ApiError, 8 - serve::{ActorStorage, AppState}, 9 - }; 10 - use anyhow::{Result, bail}; 11 - use axum::{Json, extract::State}; 12 - use cidv10::Cid; 13 - use rsky_lexicon::com::atproto::repo::DeleteRecordInput; 14 - use rsky_pds::repo::prepare::{PrepareDeleteOpts, prepare_delete}; 15 - use rsky_pds::sequencer::Sequencer; 16 - use rsky_repo::types::PreparedWrite; 17 - use rsky_syntax::aturi::AtUri; 18 - use std::collections::HashMap; 19 - use std::hash::RandomState; 20 - use std::str::FromStr; 21 - use std::sync::Arc; 22 - use tokio::sync::RwLock; 1 + //! Delete a repository record, or ensure it doesn't exist. Requires auth, implemented by PDS. 2 + use super::*; 23 3 24 4 async fn inner_delete_record( 25 5 body: DeleteRecordInput,
+10 -18
src/apis/com/atproto/repo/describe_repo.rs
··· 1 1 //! Get information about an account and repository, including the list of collections. Does not require auth. 2 - use crate::account_manager::AccountManager; 3 - use crate::serve::ActorStorage; 4 - use crate::{actor_store::ActorStore, error::ApiError, serve::AppState}; 5 - use anyhow::{Result, bail}; 6 - use axum::extract::Query; 7 - use axum::{Json, extract::State}; 8 - use rsky_identity::IdResolver; 9 - use rsky_identity::types::DidDocument; 10 - use rsky_lexicon::com::atproto::repo::DescribeRepoOutput; 11 - use rsky_syntax::handle::INVALID_HANDLE; 12 - use std::collections::HashMap; 13 - use std::hash::RandomState; 14 - use std::sync::Arc; 15 - use tokio::sync::RwLock; 2 + use super::*; 16 3 17 4 async fn inner_describe_repo( 18 5 repo: String, ··· 28 15 match account { 29 16 None => bail!("Cound not find user: `{repo}`"), 30 17 Some(account) => { 31 - let mut lock = id_resolver.write().await; 32 - let did_doc: DidDocument = match lock.did.ensure_resolve(&account.did, None).await { 18 + let did_doc: DidDocument = match id_resolver 19 + .write() 20 + .await 21 + .did 22 + .ensure_resolve(&account.did, None) 23 + .await 24 + { 33 25 Err(err) => bail!("Could not resolve DID: `{err}`"), 34 26 Ok(res) => res, 35 27 }; ··· 41 33 let collections = actor_store.record.list_collections().await?; 42 34 43 35 Ok(DescribeRepoOutput { 44 - handle: account.handle.unwrap_or(INVALID_HANDLE.to_string()), 36 + handle: account.handle.unwrap_or_else(|| INVALID_HANDLE.to_owned()), 45 37 did: account.did, 46 38 did_doc: serde_json::to_value(did_doc)?, 47 39 collections, ··· 63 55 #[tracing::instrument(skip_all)] 64 56 #[axum::debug_handler(state = AppState)] 65 57 pub async fn describe_repo( 66 - Query(input): Query<atrium_api::com::atproto::repo::describe_repo::ParametersData>, 58 + Query(input): Query<atrium_repo::describe_repo::ParametersData>, 67 59 State(db_actors): State<HashMap<String, ActorStorage, RandomState>>, 68 60 State(account_manager): State<Arc<RwLock<AccountManager>>>, 69 61 State(id_resolver): State<Arc<RwLock<IdResolver>>>,
+3 -1
src/apis/com/atproto/repo/ex.rs
··· 25 25 #[tracing::instrument(skip_all)] 26 26 #[axum::debug_handler(state = AppState)] 27 27 pub async fn fun( 28 + auth: AuthenticatedUser, 28 29 Query(input): Query<atrium_api::com::atproto::repo::describe_repo::ParametersData>, 29 - State(db_actors): State<HashMap<String, ActorStorage, RandomState>>, 30 + State(actor_pools): State<HashMap<String, ActorStorage, RandomState>>, 30 31 State(account_manager): State<Arc<RwLock<AccountManager>>>, 31 32 State(id_resolver): State<Arc<RwLock<IdResolver>>>, 32 33 State(sequencer): State<Arc<RwLock<Sequencer>>>, 34 + Json(body): Json<ApplyWritesInput>, 33 35 ) -> Result<Json<_>, ApiError> { 34 36 todo!(); 35 37 }
+1 -15
src/apis/com/atproto/repo/get_record.rs
··· 1 1 //! Get a single record from a repository. Does not require auth. 2 - use crate::account_manager::AccountManager; 3 - use crate::serve::ActorStorage; 4 - use crate::{actor_store::ActorStore, error::ApiError, serve::AppState}; 5 - use anyhow::{Result, bail}; 6 - use axum::extract::Query; 7 - use axum::{Json, extract::State}; 8 - use rsky_identity::IdResolver; 9 - use rsky_lexicon::com::atproto::repo::GetRecordOutput; 10 - use rsky_pds::pipethrough::{OverrideOpts, ProxyRequest, pipethrough}; 11 - use rsky_pds::sequencer::Sequencer; 12 - use rsky_syntax::aturi::AtUri; 13 - use std::collections::HashMap; 14 - use std::hash::RandomState; 15 - use std::sync::Arc; 16 - use tokio::sync::RwLock; 2 + use super::*; 17 3 18 4 async fn inner_get_record( 19 5 repo: String,
+8 -63
src/apis/com/atproto/repo/import_repo.rs
··· 1 - use crate::account_manager::AccountManager; 2 - use crate::auth::AuthenticatedUser; 3 - use crate::serve::ActorStorage; 4 - use crate::{actor_store::ActorStore, error::ApiError, serve::AppState}; 5 - use anyhow::{Result, bail}; 6 - use axum::extract::Query; 7 - use axum::{Json, extract::State}; 8 - use cidv10::Cid; 9 - use futures::{StreamExt, stream}; 10 1 use reqwest::header; 11 2 use rsky_common::env::env_int; 12 - use rsky_identity::IdResolver; 13 - use rsky_pds::repo::prepare::{ 14 - PrepareCreateOpts, PrepareDeleteOpts, PrepareUpdateOpts, prepare_create, prepare_delete, 15 - prepare_update, 16 - }; 17 - use rsky_pds::sequencer::Sequencer; 18 3 use rsky_repo::block_map::BlockMap; 19 4 use rsky_repo::car::{CarWithRoot, read_stream_car_with_root}; 20 5 use rsky_repo::parse::get_and_parse_record; 21 6 use rsky_repo::repo::Repo; 22 7 use rsky_repo::sync::consumer::{VerifyRepoInput, verify_diff}; 23 - use rsky_repo::types::{PreparedWrite, RecordWriteDescript, VerifiedDiff}; 8 + use rsky_repo::types::{RecordWriteDescript, VerifiedDiff}; 24 9 use serde::Deserialize; 25 - use std::collections::HashMap; 26 - use std::hash::RandomState; 27 10 use std::num::NonZeroU64; 28 - use std::sync::Arc; 29 - use tokio::sync::RwLock; 11 + 12 + use super::*; 30 13 31 14 struct ImportRepoInput { 32 15 car_with_root: CarWithRoot, 33 16 } 34 17 35 - impl<'de> Deserialize<'de> for ImportRepoInput { 36 - fn deserialize<D>(deserializer: D) -> core::result::Result<Self, D::Error> 37 - where 38 - D: serde::Deserializer<'de>, 39 - { 40 - async fn from_data(req: &Request<'_>, data: Data<'_>) -> Result<CarWithRoot, ApiError> { 41 - let max_import_size = env_int("IMPORT_REPO_LIMIT").unwrap_or(100).megabytes(); 42 - match req.headers().get_one(header::CONTENT_LENGTH.as_ref()) { 43 - None => { 44 - let error = 45 - ApiError::InvalidRequest("Missing content-length header".to_string()); 46 - req.local_cache(|| Some(error.clone())); 47 - return Err(error); 48 - } 49 - Some(res) => match res.parse::<NonZeroU64>() { 50 - Ok(content_length) => { 51 - if content_length.get().bytes() > max_import_size { 52 - let error = ApiError::InvalidRequest(format!( 53 - "Content-Length is greater than maximum of {max_import_size}" 54 - )); 55 - req.local_cache(|| Some(error.clone())); 56 - return Err(error); 57 - } 58 - 59 - let import_datastream = data.open(content_length.get().bytes()); 60 - read_stream_car_with_root(import_datastream).await 61 - } 62 - Err(_error) => { 63 - tracing::error!("{}", format!("Error parsing content-length\n{_error}")); 64 - let error = 65 - ApiError::InvalidRequest("Error parsing content-length".to_string()); 66 - req.local_cache(|| Some(error.clone())); 67 - Err(error) 68 - } 69 - }, 70 - } 71 - } 72 - } 73 - } 74 - 75 18 // #[rocket::async_trait] 76 19 // impl<'r> FromData<'r> for ImportRepoInput { 77 20 // type Error = ApiError; ··· 117 60 // } 118 61 // } 119 62 63 + // TODO: lookup axum docs to impl deserialize 64 + 120 65 #[tracing::instrument(skip_all)] 121 66 #[axum::debug_handler(state = AppState)] 122 67 pub async fn import_repo( 123 68 // auth: AccessFullImport, 124 - user: AuthenticatedUser, 69 + auth: AuthenticatedUser, 125 70 Query(import_repo_input): Query<ImportRepoInput>, 126 71 State(actor_pools): State<HashMap<String, ActorStorage, RandomState>>, 127 72 ) -> Result<(), ApiError> { 128 73 // let requester = auth.access.credentials.unwrap().did.unwrap(); 129 - let requester = user.did(); 74 + let requester = auth.did(); 130 75 let mut actor_store = ActorStore::from_actor_pools(&requester, &actor_pools).await; 131 76 132 77 // Get current repo if it exists ··· 233 178 .collect::<Vec<_>>() 234 179 .await 235 180 .into_iter() 236 - .collect::<anyhow::Result<Vec<PreparedWrite>, _>>() 181 + .collect::<Result<Vec<PreparedWrite>, _>>() 237 182 { 238 183 Ok(res) => Ok(res), 239 184 Err(error) => {
+41
src/apis/com/atproto/repo/list_missing_blobs.rs
··· 1 + //! 2 + use rsky_lexicon::com::atproto::repo::ListMissingBlobsOutput; 3 + use rsky_pds::actor_store::blob::ListMissingBlobsOpts; 4 + 5 + use super::*; 6 + 7 + /// 8 + #[tracing::instrument(skip_all)] 9 + #[axum::debug_handler(state = AppState)] 10 + pub async fn list_missing_blobs( 11 + user: AuthenticatedUser, 12 + Query(input): Query<atrium_repo::list_missing_blobs::ParametersData>, 13 + State(actor_pools): State<HashMap<String, ActorStorage, RandomState>>, 14 + ) -> Result<Json<ListMissingBlobsOutput>, ApiError> { 15 + let cursor = input.cursor; 16 + let limit = input.limit; 17 + let limit: Option<u16> = Some(limit.unwrap().into()); 18 + // let did = auth.access.credentials.unwrap().did.unwrap(); 19 + let did = user.did(); 20 + let limit: u16 = limit.unwrap_or(500); 21 + 22 + let actor_store = ActorStore::from_actor_pools(&did, &actor_pools).await; 23 + 24 + match actor_store 25 + .blob 26 + .list_missing_blobs(ListMissingBlobsOpts { cursor, limit }) 27 + .await 28 + { 29 + Ok(blobs) => { 30 + let cursor = match blobs.last() { 31 + Some(last_blob) => Some(last_blob.cid.clone()), 32 + None => None, 33 + }; 34 + Ok(Json(ListMissingBlobsOutput { cursor, blobs })) 35 + } 36 + Err(error) => { 37 + tracing::error!("{error:?}"); 38 + Err(ApiError::RuntimeError) 39 + } 40 + } 41 + }
+146
src/apis/com/atproto/repo/list_records.rs
··· 1 + //! List a range of records in a repository, matching a specific collection. Does not require auth. 2 + use super::*; 3 + 4 + // #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)] 5 + // #[serde(rename_all = "camelCase")] 6 + // /// Parameters for [`list_records`]. 7 + // pub(super) struct ListRecordsParameters { 8 + // ///The NSID of the record type. 9 + // pub collection: Nsid, 10 + // /// The cursor to start from. 11 + // #[serde(skip_serializing_if = "core::option::Option::is_none")] 12 + // pub cursor: Option<String>, 13 + // ///The number of records to return. 14 + // #[serde(skip_serializing_if = "core::option::Option::is_none")] 15 + // pub limit: Option<String>, 16 + // ///The handle or DID of the repo. 17 + // pub repo: AtIdentifier, 18 + // ///Flag to reverse the order of the returned records. 19 + // #[serde(skip_serializing_if = "core::option::Option::is_none")] 20 + // pub reverse: Option<bool>, 21 + // ///DEPRECATED: The highest sort-ordered rkey to stop at (exclusive) 22 + // #[serde(skip_serializing_if = "core::option::Option::is_none")] 23 + // pub rkey_end: Option<String>, 24 + // ///DEPRECATED: The lowest sort-ordered rkey to start from (exclusive) 25 + // #[serde(skip_serializing_if = "core::option::Option::is_none")] 26 + // pub rkey_start: Option<String>, 27 + // } 28 + 29 + #[expect(non_snake_case, clippy::too_many_arguments)] 30 + async fn inner_list_records( 31 + // The handle or DID of the repo. 32 + repo: String, 33 + // The NSID of the record type. 34 + collection: String, 35 + // The number of records to return. 36 + limit: u16, 37 + cursor: Option<String>, 38 + // DEPRECATED: The lowest sort-ordered rkey to start from (exclusive) 39 + rkeyStart: Option<String>, 40 + // DEPRECATED: The highest sort-ordered rkey to stop at (exclusive) 41 + rkeyEnd: Option<String>, 42 + // Flag to reverse the order of the returned records. 43 + reverse: bool, 44 + // The actor pools 45 + actor_pools: HashMap<String, ActorStorage>, 46 + account_manager: Arc<RwLock<AccountManager>>, 47 + ) -> Result<ListRecordsOutput> { 48 + if limit > 100 { 49 + bail!("Error: limit can not be greater than 100") 50 + } 51 + let did = account_manager 52 + .read() 53 + .await 54 + .get_did_for_actor(&repo, None) 55 + .await?; 56 + if let Some(did) = did { 57 + let mut actor_store = ActorStore::from_actor_pools(&did, &actor_pools).await; 58 + 59 + let records: Vec<Record> = actor_store 60 + .record 61 + .list_records_for_collection( 62 + collection, 63 + limit as i64, 64 + reverse, 65 + cursor, 66 + rkeyStart, 67 + rkeyEnd, 68 + None, 69 + ) 70 + .await? 71 + .into_iter() 72 + .map(|record| { 73 + Ok(Record { 74 + uri: record.uri.clone(), 75 + cid: record.cid.clone(), 76 + value: serde_json::to_value(record)?, 77 + }) 78 + }) 79 + .collect::<Result<Vec<Record>>>()?; 80 + 81 + let last_record = records.last(); 82 + let cursor: Option<String>; 83 + if let Some(last_record) = last_record { 84 + let last_at_uri: AtUri = last_record.uri.clone().try_into()?; 85 + cursor = Some(last_at_uri.get_rkey()); 86 + } else { 87 + cursor = None; 88 + } 89 + Ok(ListRecordsOutput { records, cursor }) 90 + } else { 91 + bail!("Could not find repo: {repo}") 92 + } 93 + } 94 + 95 + /// List a range of records in a repository, matching a specific collection. Does not require auth. 96 + /// - GET /xrpc/com.atproto.repo.listRecords 97 + /// ### Query Parameters 98 + /// - `repo`: `at-identifier` // The handle or DID of the repo. 99 + /// - `collection`: `nsid` // The NSID of the record type. 100 + /// - `limit`: `integer` // The maximum number of records to return. Default 50, >=1 and <=100. 101 + /// - `cursor`: `string` 102 + /// - `reverse`: `boolean` // Flag to reverse the order of the returned records. 103 + /// ### Responses 104 + /// - 200 OK: {"cursor": "string","records": [{"uri": "string","cid": "string","value": {}}]} 105 + /// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`]} 106 + /// - 401 Unauthorized 107 + #[tracing::instrument(skip_all)] 108 + #[allow(non_snake_case)] 109 + #[axum::debug_handler(state = AppState)] 110 + pub async fn list_records( 111 + Query(input): Query<atrium_repo::list_records::ParametersData>, 112 + State(actor_pools): State<HashMap<String, ActorStorage, RandomState>>, 113 + State(account_manager): State<Arc<RwLock<AccountManager>>>, 114 + ) -> Result<Json<ListRecordsOutput>, ApiError> { 115 + let repo = input.repo; 116 + let collection = input.collection; 117 + let limit: Option<u8> = input.limit.map(u8::from); 118 + let limit: Option<u16> = limit.map(|x| x.into()); 119 + let cursor = input.cursor; 120 + let reverse = input.reverse; 121 + let rkeyStart = None; 122 + let rkeyEnd = None; 123 + 124 + let limit = limit.unwrap_or(50); 125 + let reverse = reverse.unwrap_or(false); 126 + 127 + match inner_list_records( 128 + repo.into(), 129 + collection.into(), 130 + limit, 131 + cursor, 132 + rkeyStart, 133 + rkeyEnd, 134 + reverse, 135 + actor_pools, 136 + account_manager, 137 + ) 138 + .await 139 + { 140 + Ok(res) => Ok(Json(res)), 141 + Err(error) => { 142 + tracing::error!("@LOG: ERROR: {error}"); 143 + Err(ApiError::RuntimeError) 144 + } 145 + } 146 + }
+68 -18
src/apis/com/atproto/repo/mod.rs
··· 1 - use atrium_api::com::atproto::repo; 1 + use atrium_api::com::atproto::repo as atrium_repo; 2 2 use axum::{ 3 3 Router, 4 4 routing::{get, post}, 5 5 }; 6 6 use constcat::concat; 7 7 8 - use crate::serve::AppState; 9 - 10 8 pub mod apply_writes; 11 9 pub mod create_record; 12 10 pub mod delete_record; 13 11 pub mod describe_repo; 14 12 pub mod get_record; 15 - // pub mod import_repo; 16 - // pub mod list_missing_blobs; 17 - // pub mod list_records; 18 - // pub mod put_record; 19 - // pub mod upload_blob; 13 + pub mod import_repo; 14 + pub mod list_missing_blobs; 15 + pub mod list_records; 16 + pub mod put_record; 17 + pub mod upload_blob; 18 + 19 + use crate::account_manager::AccountManager; 20 + use crate::account_manager::helpers::account::AvailabilityFlags; 21 + use crate::{ 22 + actor_store::ActorStore, 23 + auth::AuthenticatedUser, 24 + error::ApiError, 25 + serve::{ActorStorage, AppState}, 26 + }; 27 + use anyhow::{Result, bail}; 28 + use axum::extract::Query; 29 + use axum::{Json, extract::State}; 30 + use cidv10::Cid; 31 + use futures::stream::{self, StreamExt}; 32 + use rsky_identity::IdResolver; 33 + use rsky_identity::types::DidDocument; 34 + use rsky_lexicon::com::atproto::repo::DeleteRecordInput; 35 + use rsky_lexicon::com::atproto::repo::DescribeRepoOutput; 36 + use rsky_lexicon::com::atproto::repo::GetRecordOutput; 37 + use rsky_lexicon::com::atproto::repo::{ApplyWritesInput, ApplyWritesInputRefWrite}; 38 + use rsky_lexicon::com::atproto::repo::{CreateRecordInput, CreateRecordOutput}; 39 + use rsky_lexicon::com::atproto::repo::{ListRecordsOutput, Record}; 40 + // use rsky_pds::pipethrough::{OverrideOpts, ProxyRequest, pipethrough}; 41 + use rsky_pds::repo::prepare::{ 42 + PrepareCreateOpts, PrepareDeleteOpts, PrepareUpdateOpts, prepare_create, prepare_delete, 43 + prepare_update, 44 + }; 45 + use rsky_pds::sequencer::Sequencer; 46 + use rsky_repo::types::PreparedDelete; 47 + use rsky_repo::types::PreparedWrite; 48 + use rsky_syntax::aturi::AtUri; 49 + use rsky_syntax::handle::INVALID_HANDLE; 50 + use std::collections::HashMap; 51 + use std::hash::RandomState; 52 + use std::str::FromStr; 53 + use std::sync::Arc; 54 + use tokio::sync::RwLock; 20 55 21 56 /// These endpoints are part of the atproto PDS repository management APIs. \ 22 57 /// Requests usually require authentication (unlike the com.atproto.sync.* endpoints), and are made directly to the user's own PDS instance. ··· 34 69 pub(crate) fn routes() -> Router<AppState> { 35 70 Router::new() 36 71 .route( 37 - concat!("/", repo::apply_writes::NSID), 72 + concat!("/", atrium_repo::apply_writes::NSID), 38 73 post(apply_writes::apply_writes), 39 74 ) 40 75 .route( 41 - concat!("/", repo::create_record::NSID), 76 + concat!("/", atrium_repo::create_record::NSID), 42 77 post(create_record::create_record), 43 78 ) 44 - // .route(concat!("/", repo::put_record::NSID), post(put_record)) 79 + .route( 80 + concat!("/", atrium_repo::put_record::NSID), 81 + post(put_record::put_record), 82 + ) 45 83 .route( 46 - concat!("/", repo::delete_record::NSID), 84 + concat!("/", atrium_repo::delete_record::NSID), 47 85 post(delete_record::delete_record), 48 86 ) 49 - // .route(concat!("/", repo::upload_blob::NSID), post(upload_blob)) 87 + .route( 88 + concat!("/", atrium_repo::upload_blob::NSID), 89 + post(upload_blob::upload_blob), 90 + ) 50 91 .route( 51 - concat!("/", repo::describe_repo::NSID), 92 + concat!("/", atrium_repo::describe_repo::NSID), 52 93 get(describe_repo::describe_repo), 53 94 ) 54 95 .route( 55 - concat!("/", repo::get_record::NSID), 96 + concat!("/", atrium_repo::get_record::NSID), 56 97 get(get_record::get_record), 57 98 ) 58 - // .route(concat!("/", repo::import_repo::NSID), post(todo)) 59 - // .route(concat!("/", repo::list_missing_blobs::NSID), get(todo)) 60 - // .route(concat!("/", repo::list_records::NSID), get(list_records)) 99 + .route( 100 + concat!("/", atrium_repo::import_repo::NSID), 101 + post(import_repo::import_repo), 102 + ) 103 + .route( 104 + concat!("/", atrium_repo::list_missing_blobs::NSID), 105 + get(list_missing_blobs::list_missing_blobs), 106 + ) 107 + .route( 108 + concat!("/", atrium_repo::list_records::NSID), 109 + get(list_records::list_records), 110 + ) 61 111 }
+155
src/apis/com/atproto/repo/put_record.rs
··· 1 + //! Write a repository record, creating or updating it as needed. Requires auth, implemented by PDS. 2 + use anyhow::bail; 3 + use rsky_lexicon::com::atproto::repo::{PutRecordInput, PutRecordOutput}; 4 + use rsky_repo::types::CommitDataWithOps; 5 + 6 + use super::*; 7 + 8 + #[tracing::instrument(skip_all)] 9 + async fn inner_put_record( 10 + body: PutRecordInput, 11 + auth: AuthenticatedUser, 12 + sequencer: Arc<RwLock<Sequencer>>, 13 + actor_pools: HashMap<String, ActorStorage>, 14 + account_manager: Arc<RwLock<AccountManager>>, 15 + ) -> Result<PutRecordOutput> { 16 + let PutRecordInput { 17 + repo, 18 + collection, 19 + rkey, 20 + validate, 21 + record, 22 + swap_record, 23 + swap_commit, 24 + } = body; 25 + let account = account_manager 26 + .read() 27 + .await 28 + .get_account( 29 + &repo, 30 + Some(AvailabilityFlags { 31 + include_deactivated: Some(true), 32 + include_taken_down: None, 33 + }), 34 + ) 35 + .await?; 36 + if let Some(account) = account { 37 + if account.deactivated_at.is_some() { 38 + bail!("Account is deactivated") 39 + } 40 + let did = account.did; 41 + // if did != auth.access.credentials.unwrap().did.unwrap() { 42 + if did != auth.did() { 43 + bail!("AuthRequiredError") 44 + } 45 + let uri = AtUri::make(did.clone(), Some(collection.clone()), Some(rkey.clone()))?; 46 + let swap_commit_cid = match swap_commit { 47 + Some(swap_commit) => Some(Cid::from_str(&swap_commit)?), 48 + None => None, 49 + }; 50 + let swap_record_cid = match swap_record { 51 + Some(swap_record) => Some(Cid::from_str(&swap_record)?), 52 + None => None, 53 + }; 54 + let (commit, write): (Option<CommitDataWithOps>, PreparedWrite) = { 55 + let mut actor_store = ActorStore::from_actor_pools(&did, &actor_pools).await; 56 + 57 + let current = actor_store 58 + .record 59 + .get_record(&uri, None, Some(true)) 60 + .await?; 61 + tracing::debug!("@LOG: debug inner_put_record, current: {current:?}"); 62 + let write: PreparedWrite = if current.is_some() { 63 + PreparedWrite::Update( 64 + prepare_update(PrepareUpdateOpts { 65 + did: did.clone(), 66 + collection, 67 + rkey, 68 + swap_cid: swap_record_cid, 69 + record: serde_json::from_value(record)?, 70 + validate, 71 + }) 72 + .await?, 73 + ) 74 + } else { 75 + PreparedWrite::Create( 76 + prepare_create(PrepareCreateOpts { 77 + did: did.clone(), 78 + collection, 79 + rkey: Some(rkey), 80 + swap_cid: swap_record_cid, 81 + record: serde_json::from_value(record)?, 82 + validate, 83 + }) 84 + .await?, 85 + ) 86 + }; 87 + 88 + match current { 89 + Some(current) if current.cid == write.cid().unwrap().to_string() => (None, write), 90 + _ => { 91 + let commit = actor_store 92 + .process_writes(vec![write.clone()], swap_commit_cid) 93 + .await?; 94 + (Some(commit), write) 95 + } 96 + } 97 + }; 98 + 99 + if let Some(commit) = commit { 100 + sequencer 101 + .write() 102 + .await 103 + .sequence_commit(did.clone(), commit.clone()) 104 + .await?; 105 + account_manager 106 + .write() 107 + .await 108 + .update_repo_root( 109 + did, 110 + commit.commit_data.cid, 111 + commit.commit_data.rev, 112 + &actor_pools, 113 + ) 114 + .await?; 115 + } 116 + Ok(PutRecordOutput { 117 + uri: write.uri().to_string(), 118 + cid: write.cid().unwrap().to_string(), 119 + }) 120 + } else { 121 + bail!("Could not find repo: `{repo}`") 122 + } 123 + } 124 + 125 + /// Write a repository record, creating or updating it as needed. Requires auth, implemented by PDS. 126 + /// - POST /xrpc/com.atproto.repo.putRecord 127 + /// ### Request Body 128 + /// - `repo`: `at-identifier` // The handle or DID of the repo (aka, current account). 129 + /// - `collection`: `nsid` // The NSID of the record collection. 130 + /// - `rkey`: `string` // The record key. <= 512 characters. 131 + /// - `validate`: `boolean` // Can be set to 'false' to skip Lexicon schema validation of record data, 'true' to require it, or leave unset to validate only for known Lexicons. 132 + /// - `record` 133 + /// - `swap_record`: `boolean` // Compare and swap with the previous record by CID. WARNING: nullable and optional field; may cause problems with golang implementation 134 + /// - `swap_commit`: `cid` // Compare and swap with the previous commit by CID. 135 + /// ### Responses 136 + /// - 200 OK: {"uri": "string","cid": "string","commit": {"cid": "string","rev": "string"},"validationStatus": "valid | unknown"} 137 + /// - 400 Bad Request: {error:"`InvalidRequest` | `ExpiredToken` | `InvalidToken` | `InvalidSwap`"} 138 + /// - 401 Unauthorized 139 + #[tracing::instrument(skip_all)] 140 + pub async fn put_record( 141 + auth: AuthenticatedUser, 142 + State(sequencer): State<Arc<RwLock<Sequencer>>>, 143 + State(actor_pools): State<HashMap<String, ActorStorage, RandomState>>, 144 + State(account_manager): State<Arc<RwLock<AccountManager>>>, 145 + Json(body): Json<PutRecordInput>, 146 + ) -> Result<Json<PutRecordOutput>, ApiError> { 147 + tracing::debug!("@LOG: debug put_record {body:#?}"); 148 + match inner_put_record(body, auth, sequencer, actor_pools, account_manager).await { 149 + Ok(res) => Ok(Json(res)), 150 + Err(error) => { 151 + tracing::error!("@LOG: ERROR: {error}"); 152 + Err(ApiError::RuntimeError) 153 + } 154 + } 155 + }
-514
src/apis/com/atproto/repo/repo.rs
··· 1 - //! PDS repository endpoints /xrpc/com.atproto.repo.*) 2 - mod apply_writes; 3 - pub(crate) use apply_writes::apply_writes; 4 - 5 - use std::{collections::HashSet, str::FromStr}; 6 - 7 - use anyhow::{Context as _, anyhow}; 8 - use atrium_api::com::atproto::repo::apply_writes::{ 9 - self as atrium_apply_writes, InputWritesItem, OutputResultsItem, 10 - }; 11 - use atrium_api::{ 12 - com::atproto::repo::{self, defs::CommitMetaData}, 13 - types::{ 14 - LimitedU32, Object, TryFromUnknown as _, TryIntoUnknown as _, Unknown, 15 - string::{AtIdentifier, Nsid, Tid}, 16 - }, 17 - }; 18 - use atrium_repo::{Cid, blockstore::CarStore}; 19 - use axum::{ 20 - Json, Router, 21 - body::Body, 22 - extract::{Query, Request, State}, 23 - http::{self, StatusCode}, 24 - routing::{get, post}, 25 - }; 26 - use constcat::concat; 27 - use futures::TryStreamExt as _; 28 - use metrics::counter; 29 - use rsky_syntax::aturi::AtUri; 30 - use serde::Deserialize; 31 - use tokio::io::AsyncWriteExt as _; 32 - 33 - use crate::repo::block_map::cid_for_cbor; 34 - use crate::repo::types::PreparedCreateOrUpdate; 35 - use crate::{ 36 - AppState, Db, Error, Result, SigningKey, 37 - actor_store::{ActorStoreTransactor, ActorStoreWriter}, 38 - auth::AuthenticatedUser, 39 - config::AppConfig, 40 - error::ErrorMessage, 41 - firehose::{self, FirehoseProducer, RepoOp}, 42 - metrics::{REPO_COMMITS, REPO_OP_CREATE, REPO_OP_DELETE, REPO_OP_UPDATE}, 43 - repo::types::{PreparedWrite, WriteOpAction}, 44 - storage, 45 - }; 46 - 47 - #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)] 48 - #[serde(rename_all = "camelCase")] 49 - /// Parameters for [`list_records`]. 50 - pub(super) struct ListRecordsParameters { 51 - ///The NSID of the record type. 52 - pub collection: Nsid, 53 - /// The cursor to start from. 54 - #[serde(skip_serializing_if = "core::option::Option::is_none")] 55 - pub cursor: Option<String>, 56 - ///The number of records to return. 57 - #[serde(skip_serializing_if = "core::option::Option::is_none")] 58 - pub limit: Option<String>, 59 - ///The handle or DID of the repo. 60 - pub repo: AtIdentifier, 61 - ///Flag to reverse the order of the returned records. 62 - #[serde(skip_serializing_if = "core::option::Option::is_none")] 63 - pub reverse: Option<bool>, 64 - ///DEPRECATED: The highest sort-ordered rkey to stop at (exclusive) 65 - #[serde(skip_serializing_if = "core::option::Option::is_none")] 66 - pub rkey_end: Option<String>, 67 - ///DEPRECATED: The lowest sort-ordered rkey to start from (exclusive) 68 - #[serde(skip_serializing_if = "core::option::Option::is_none")] 69 - pub rkey_start: Option<String>, 70 - } 71 - 72 - /// Resolve DID to DID document. Does not bi-directionally verify handle. 73 - /// - GET /xrpc/com.atproto.repo.resolveDid 74 - /// ### Query Parameters 75 - /// - `did`: DID to resolve. 76 - /// ### Responses 77 - /// - 200 OK: {`did_doc`: `did_doc`} 78 - /// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`, `DidNotFound`, `DidDeactivated`]} 79 - async fn resolve_did( 80 - db: &Db, 81 - identifier: &AtIdentifier, 82 - ) -> anyhow::Result<( 83 - atrium_api::types::string::Did, 84 - atrium_api::types::string::Handle, 85 - )> { 86 - let (handle, did) = match *identifier { 87 - AtIdentifier::Handle(ref handle) => { 88 - let handle_as_str = &handle.as_str(); 89 - ( 90 - &handle.to_owned(), 91 - &atrium_api::types::string::Did::new( 92 - sqlx::query_scalar!( 93 - r#"SELECT did FROM handles WHERE handle = ?"#, 94 - handle_as_str 95 - ) 96 - .fetch_one(db) 97 - .await 98 - .context("failed to query did")?, 99 - ) 100 - .expect("should be valid DID"), 101 - ) 102 - } 103 - AtIdentifier::Did(ref did) => { 104 - let did_as_str = &did.as_str(); 105 - ( 106 - &atrium_api::types::string::Handle::new( 107 - sqlx::query_scalar!(r#"SELECT handle FROM handles WHERE did = ?"#, did_as_str) 108 - .fetch_one(db) 109 - .await 110 - .context("failed to query did")?, 111 - ) 112 - .expect("should be valid handle"), 113 - &did.to_owned(), 114 - ) 115 - } 116 - }; 117 - 118 - Ok((did.to_owned(), handle.to_owned())) 119 - } 120 - 121 - /// Create a single new repository record. Requires auth, implemented by PDS. 122 - /// - POST /xrpc/com.atproto.repo.createRecord 123 - /// ### Request Body 124 - /// - `repo`: `at-identifier` // The handle or DID of the repo (aka, current account). 125 - /// - `collection`: `nsid` // The NSID of the record collection. 126 - /// - `rkey`: `string` // The record key. <= 512 characters. 127 - /// - `validate`: `boolean` // Can be set to 'false' to skip Lexicon schema validation of record data, 'true' to require it, or leave unset to validate only for known Lexicons. 128 - /// - `record` 129 - /// - `swap_commit`: `cid` // Compare and swap with the previous commit by CID. 130 - /// ### Responses 131 - /// - 200 OK: {`cid`: `cid`, `uri`: `at-uri`, `commit`: {`cid`: `cid`, `rev`: `tid`}, `validation_status`: [`valid`, `unknown`]} 132 - /// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`, `InvalidSwap`]} 133 - /// - 401 Unauthorized 134 - async fn create_record( 135 - user: AuthenticatedUser, 136 - State(actor_store): State<ActorStore>, 137 - State(skey): State<SigningKey>, 138 - State(config): State<AppConfig>, 139 - State(db): State<Db>, 140 - State(fhp): State<FirehoseProducer>, 141 - Json(input): Json<repo::create_record::Input>, 142 - ) -> Result<Json<repo::create_record::Output>> { 143 - todo!(); 144 - // let write_result = apply_writes::apply_writes( 145 - // user, 146 - // State(actor_store), 147 - // State(skey), 148 - // State(config), 149 - // State(db), 150 - // State(fhp), 151 - // Json( 152 - // repo::apply_writes::InputData { 153 - // repo: input.repo.clone(), 154 - // validate: input.validate, 155 - // swap_commit: input.swap_commit.clone(), 156 - // writes: vec![repo::apply_writes::InputWritesItem::Create(Box::new( 157 - // repo::apply_writes::CreateData { 158 - // collection: input.collection.clone(), 159 - // rkey: input.rkey.clone(), 160 - // value: input.record.clone(), 161 - // } 162 - // .into(), 163 - // ))], 164 - // } 165 - // .into(), 166 - // ), 167 - // ) 168 - // .await 169 - // .context("failed to apply writes")?; 170 - 171 - // let create_result = if let repo::apply_writes::OutputResultsItem::CreateResult(create_result) = 172 - // write_result 173 - // .results 174 - // .clone() 175 - // .and_then(|result| result.first().cloned()) 176 - // .context("unexpected output from apply_writes")? 177 - // { 178 - // Some(create_result) 179 - // } else { 180 - // None 181 - // } 182 - // .context("unexpected result from apply_writes")?; 183 - 184 - // Ok(Json( 185 - // repo::create_record::OutputData { 186 - // cid: create_result.cid.clone(), 187 - // commit: write_result.commit.clone(), 188 - // uri: create_result.uri.clone(), 189 - // validation_status: Some("unknown".to_owned()), 190 - // } 191 - // .into(), 192 - // )) 193 - } 194 - 195 - /// Write a repository record, creating or updating it as needed. Requires auth, implemented by PDS. 196 - /// - POST /xrpc/com.atproto.repo.putRecord 197 - /// ### Request Body 198 - /// - `repo`: `at-identifier` // The handle or DID of the repo (aka, current account). 199 - /// - `collection`: `nsid` // The NSID of the record collection. 200 - /// - `rkey`: `string` // The record key. <= 512 characters. 201 - /// - `validate`: `boolean` // Can be set to 'false' to skip Lexicon schema validation of record data, 'true' to require it, or leave unset to validate only for known Lexicons. 202 - /// - `record` 203 - /// - `swap_record`: `boolean` // Compare and swap with the previous record by CID. WARNING: nullable and optional field; may cause problems with golang implementation 204 - /// - `swap_commit`: `cid` // Compare and swap with the previous commit by CID. 205 - /// ### Responses 206 - /// - 200 OK: {"uri": "string","cid": "string","commit": {"cid": "string","rev": "string"},"validationStatus": "valid | unknown"} 207 - /// - 400 Bad Request: {error:"`InvalidRequest` | `ExpiredToken` | `InvalidToken` | `InvalidSwap`"} 208 - /// - 401 Unauthorized 209 - async fn put_record( 210 - user: AuthenticatedUser, 211 - State(actor_store): State<ActorStore>, 212 - State(skey): State<SigningKey>, 213 - State(config): State<AppConfig>, 214 - State(db): State<Db>, 215 - State(fhp): State<FirehoseProducer>, 216 - Json(input): Json<repo::put_record::Input>, 217 - ) -> Result<Json<repo::put_record::Output>> { 218 - todo!(); 219 - // // TODO: `input.swap_record` 220 - // // FIXME: "put" implies that we will create the record if it does not exist. 221 - // // We currently only update existing records and/or throw an error if one doesn't exist. 222 - // let input = (*input).clone(); 223 - // let input = repo::apply_writes::InputData { 224 - // repo: input.repo, 225 - // validate: input.validate, 226 - // swap_commit: input.swap_commit, 227 - // writes: vec![repo::apply_writes::InputWritesItem::Update(Box::new( 228 - // repo::apply_writes::UpdateData { 229 - // collection: input.collection, 230 - // rkey: input.rkey, 231 - // value: input.record, 232 - // } 233 - // .into(), 234 - // ))], 235 - // } 236 - // .into(); 237 - 238 - // let write_result = apply_writes::apply_writes( 239 - // user, 240 - // State(actor_store), 241 - // State(skey), 242 - // State(config), 243 - // State(db), 244 - // State(fhp), 245 - // Json(input), 246 - // ) 247 - // .await 248 - // .context("failed to apply writes")?; 249 - 250 - // let update_result = write_result 251 - // .results 252 - // .clone() 253 - // .and_then(|result| result.first().cloned()) 254 - // .context("unexpected output from apply_writes")?; 255 - // let (cid, uri) = match update_result { 256 - // repo::apply_writes::OutputResultsItem::CreateResult(create_result) => ( 257 - // Some(create_result.cid.clone()), 258 - // Some(create_result.uri.clone()), 259 - // ), 260 - // repo::apply_writes::OutputResultsItem::UpdateResult(update_result) => ( 261 - // Some(update_result.cid.clone()), 262 - // Some(update_result.uri.clone()), 263 - // ), 264 - // repo::apply_writes::OutputResultsItem::DeleteResult(_) => (None, None), 265 - // }; 266 - // Ok(Json( 267 - // repo::put_record::OutputData { 268 - // cid: cid.context("missing cid")?, 269 - // commit: write_result.commit.clone(), 270 - // uri: uri.context("missing uri")?, 271 - // validation_status: Some("unknown".to_owned()), 272 - // } 273 - // .into(), 274 - // )) 275 - } 276 - 277 - /// Delete a repository record, or ensure it doesn't exist. Requires auth, implemented by PDS. 278 - /// - POST /xrpc/com.atproto.repo.deleteRecord 279 - /// ### Request Body 280 - /// - `repo`: `at-identifier` // The handle or DID of the repo (aka, current account). 281 - /// - `collection`: `nsid` // The NSID of the record collection. 282 - /// - `rkey`: `string` // The record key. <= 512 characters. 283 - /// - `swap_record`: `boolean` // Compare and swap with the previous record by CID. 284 - /// - `swap_commit`: `cid` // Compare and swap with the previous commit by CID. 285 - /// ### Responses 286 - /// - 200 OK: {"commit": {"cid": "string","rev": "string"}} 287 - /// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`, `InvalidSwap`]} 288 - /// - 401 Unauthorized 289 - async fn delete_record( 290 - user: AuthenticatedUser, 291 - State(actor_store): State<ActorStore>, 292 - State(skey): State<SigningKey>, 293 - State(config): State<AppConfig>, 294 - State(db): State<Db>, 295 - State(fhp): State<FirehoseProducer>, 296 - Json(input): Json<repo::delete_record::Input>, 297 - ) -> Result<Json<repo::delete_record::Output>> { 298 - todo!(); 299 - // // TODO: `input.swap_record` 300 - 301 - // Ok(Json( 302 - // repo::delete_record::OutputData { 303 - // commit: apply_writes::apply_writes( 304 - // user, 305 - // State(actor_store), 306 - // State(skey), 307 - // State(config), 308 - // State(db), 309 - // State(fhp), 310 - // Json( 311 - // repo::apply_writes::InputData { 312 - // repo: input.repo.clone(), 313 - // swap_commit: input.swap_commit.clone(), 314 - // validate: None, 315 - // writes: vec![repo::apply_writes::InputWritesItem::Delete(Box::new( 316 - // repo::apply_writes::DeleteData { 317 - // collection: input.collection.clone(), 318 - // rkey: input.rkey.clone(), 319 - // } 320 - // .into(), 321 - // ))], 322 - // } 323 - // .into(), 324 - // ), 325 - // ) 326 - // .await 327 - // .context("failed to apply writes")? 328 - // .commit 329 - // .clone(), 330 - // } 331 - // .into(), 332 - // )) 333 - } 334 - 335 - /// Get information about an account and repository, including the list of collections. Does not require auth. 336 - /// - GET /xrpc/com.atproto.repo.describeRepo 337 - /// ### Query Parameters 338 - /// - `repo`: `at-identifier` // The handle or DID of the repo. 339 - /// ### Responses 340 - /// - 200 OK: {"handle": "string","did": "string","didDoc": {},"collections": [string],"handleIsCorrect": true} \ 341 - /// handeIsCorrect - boolean - Indicates if handle is currently valid (resolves bi-directionally) 342 - /// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`]} 343 - /// - 401 Unauthorized 344 - async fn describe_repo( 345 - State(actor_store): State<ActorStore>, 346 - State(config): State<AppConfig>, 347 - State(db): State<Db>, 348 - Query(input): Query<repo::describe_repo::ParametersData>, 349 - ) -> Result<Json<repo::describe_repo::Output>> { 350 - // Lookup the DID by the provided handle. 351 - let (did, handle) = resolve_did(&db, &input.repo) 352 - .await 353 - .context("failed to resolve handle")?; 354 - 355 - // Use Actor Store to get the collections 356 - todo!(); 357 - } 358 - 359 - /// Get a single record from a repository. Does not require auth. 360 - /// - GET /xrpc/com.atproto.repo.getRecord 361 - /// ### Query Parameters 362 - /// - `repo`: `at-identifier` // The handle or DID of the repo. 363 - /// - `collection`: `nsid` // The NSID of the record collection. 364 - /// - `rkey`: `string` // The record key. <= 512 characters. 365 - /// - `cid`: `cid` // The CID of the version of the record. If not specified, then return the most recent version. 366 - /// ### Responses 367 - /// - 200 OK: {"uri": "string","cid": "string","value": {}} 368 - /// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`, `RecordNotFound`]} 369 - /// - 401 Unauthorized 370 - async fn get_record( 371 - State(actor_store): State<ActorStore>, 372 - State(config): State<AppConfig>, 373 - State(db): State<Db>, 374 - Query(input): Query<repo::get_record::ParametersData>, 375 - ) -> Result<Json<repo::get_record::Output>> { 376 - if input.cid.is_some() { 377 - return Err(Error::unimplemented(anyhow!( 378 - "looking up old records is unsupported" 379 - ))); 380 - } 381 - 382 - // Lookup the DID by the provided handle. 383 - let (did, _handle) = resolve_did(&db, &input.repo) 384 - .await 385 - .context("failed to resolve handle")?; 386 - 387 - // Create a URI from the parameters 388 - let uri = format!( 389 - "at://{}/{}/{}", 390 - did.as_str(), 391 - input.collection.as_str(), 392 - input.rkey.as_str() 393 - ); 394 - 395 - // Use Actor Store to get the record 396 - todo!(); 397 - } 398 - 399 - /// List a range of records in a repository, matching a specific collection. Does not require auth. 400 - /// - GET /xrpc/com.atproto.repo.listRecords 401 - /// ### Query Parameters 402 - /// - `repo`: `at-identifier` // The handle or DID of the repo. 403 - /// - `collection`: `nsid` // The NSID of the record type. 404 - /// - `limit`: `integer` // The maximum number of records to return. Default 50, >=1 and <=100. 405 - /// - `cursor`: `string` 406 - /// - `reverse`: `boolean` // Flag to reverse the order of the returned records. 407 - /// ### Responses 408 - /// - 200 OK: {"cursor": "string","records": [{"uri": "string","cid": "string","value": {}}]} 409 - /// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`]} 410 - /// - 401 Unauthorized 411 - async fn list_records( 412 - State(actor_store): State<ActorStore>, 413 - State(config): State<AppConfig>, 414 - State(db): State<Db>, 415 - Query(input): Query<Object<ListRecordsParameters>>, 416 - ) -> Result<Json<repo::list_records::Output>> { 417 - // Lookup the DID by the provided handle. 418 - let (did, _handle) = resolve_did(&db, &input.repo) 419 - .await 420 - .context("failed to resolve handle")?; 421 - 422 - // Use Actor Store to list records for the collection 423 - todo!(); 424 - } 425 - 426 - /// Upload a new blob, to be referenced from a repository record. \ 427 - /// The blob will be deleted if it is not referenced within a time window (eg, minutes). \ 428 - /// Blob restrictions (mimetype, size, etc) are enforced when the reference is created. \ 429 - /// Requires auth, implemented by PDS. 430 - /// - POST /xrpc/com.atproto.repo.uploadBlob 431 - /// ### Request Body 432 - /// ### Responses 433 - /// - 200 OK: {"blob": "binary"} 434 - /// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`]} 435 - /// - 401 Unauthorized 436 - async fn upload_blob( 437 - user: AuthenticatedUser, 438 - State(actor_store): State<ActorStore>, 439 - State(config): State<AppConfig>, 440 - State(db): State<Db>, 441 - request: Request<Body>, 442 - ) -> Result<Json<repo::upload_blob::Output>> { 443 - let length = request 444 - .headers() 445 - .get(http::header::CONTENT_LENGTH) 446 - .context("no content length provided")? 447 - .to_str() 448 - .map_err(anyhow::Error::from) 449 - .and_then(|content_length| content_length.parse::<u64>().map_err(anyhow::Error::from)) 450 - .context("invalid content-length header")?; 451 - let mime = request 452 - .headers() 453 - .get(http::header::CONTENT_TYPE) 454 - .context("no content-type provided")? 455 - .to_str() 456 - .context("invalid content-type provided")? 457 - .to_owned(); 458 - 459 - if length > config.blob.limit { 460 - return Err(Error::with_status( 461 - StatusCode::PAYLOAD_TOO_LARGE, 462 - anyhow!("size {} above limit {}", length, config.blob.limit), 463 - )); 464 - } 465 - 466 - // Read the blob data 467 - let mut body_data = Vec::new(); 468 - let mut stream = request.into_body().into_data_stream(); 469 - while let Some(bytes) = stream.try_next().await.context("failed to receive file")? { 470 - body_data.extend_from_slice(&bytes); 471 - 472 - // Check size limit incrementally 473 - if body_data.len() as u64 > config.blob.limit { 474 - return Err(Error::with_status( 475 - StatusCode::PAYLOAD_TOO_LARGE, 476 - anyhow!("size above limit and content-length header was wrong"), 477 - )); 478 - } 479 - } 480 - 481 - // Use Actor Store to upload the blob 482 - todo!(); 483 - } 484 - 485 - async fn todo() -> Result<()> { 486 - Err(Error::unimplemented(anyhow!("not implemented"))) 487 - } 488 - 489 - /// These endpoints are part of the atproto PDS repository management APIs. \ 490 - /// Requests usually require authentication (unlike the com.atproto.sync.* endpoints), and are made directly to the user's own PDS instance. 491 - /// ### Routes 492 - /// - AP /xrpc/com.atproto.repo.applyWrites -> [`apply_writes`] 493 - /// - AP /xrpc/com.atproto.repo.createRecord -> [`create_record`] 494 - /// - AP /xrpc/com.atproto.repo.putRecord -> [`put_record`] 495 - /// - AP /xrpc/com.atproto.repo.deleteRecord -> [`delete_record`] 496 - /// - AP /xrpc/com.atproto.repo.uploadBlob -> [`upload_blob`] 497 - /// - UG /xrpc/com.atproto.repo.describeRepo -> [`describe_repo`] 498 - /// - UG /xrpc/com.atproto.repo.getRecord -> [`get_record`] 499 - /// - UG /xrpc/com.atproto.repo.listRecords -> [`list_records`] 500 - /// - [ ] xx /xrpc/com.atproto.repo.importRepo 501 - // - [ ] xx /xrpc/com.atproto.repo.listMissingBlobs 502 - pub(super) fn routes() -> Router<AppState> { 503 - Router::new() 504 - .route(concat!("/", repo::apply_writes::NSID), post(apply_writes)) 505 - // .route(concat!("/", repo::create_record::NSID), post(create_record)) 506 - // .route(concat!("/", repo::put_record::NSID), post(put_record)) 507 - // .route(concat!("/", repo::delete_record::NSID), post(delete_record)) 508 - // .route(concat!("/", repo::upload_blob::NSID), post(upload_blob)) 509 - // .route(concat!("/", repo::describe_repo::NSID), get(describe_repo)) 510 - // .route(concat!("/", repo::get_record::NSID), get(get_record)) 511 - .route(concat!("/", repo::import_repo::NSID), post(todo)) 512 - .route(concat!("/", repo::list_missing_blobs::NSID), get(todo)) 513 - // .route(concat!("/", repo::list_records::NSID), get(list_records)) 514 - }
+117
src/apis/com/atproto/repo/upload_blob.rs
··· 1 + //! Upload a new blob, to be referenced from a repository record. 2 + use crate::config::AppConfig; 3 + use anyhow::Context as _; 4 + use axum::{ 5 + body::Bytes, 6 + http::{self, HeaderMap}, 7 + }; 8 + use rsky_lexicon::com::atproto::repo::{Blob, BlobOutput}; 9 + use rsky_repo::types::{BlobConstraint, PreparedBlobRef}; 10 + // use rsky_common::BadContentTypeError; 11 + 12 + use super::*; 13 + 14 + async fn inner_upload_blob( 15 + auth: AuthenticatedUser, 16 + blob: Bytes, 17 + content_type: String, 18 + actor_pools: HashMap<String, ActorStorage>, 19 + ) -> Result<BlobOutput> { 20 + // let requester = auth.access.credentials.unwrap().did.unwrap(); 21 + let requester = auth.did(); 22 + 23 + let actor_store = ActorStore::from_actor_pools(&requester, &actor_pools).await; 24 + 25 + let metadata = actor_store 26 + .blob 27 + .upload_blob_and_get_metadata(content_type, blob) 28 + .await?; 29 + let blobref = actor_store.blob.track_untethered_blob(metadata).await?; 30 + 31 + // make the blob permanent if an associated record is already indexed 32 + let records_for_blob = actor_store 33 + .blob 34 + .get_records_for_blob(blobref.get_cid()?) 35 + .await?; 36 + 37 + if !records_for_blob.is_empty() { 38 + actor_store 39 + .blob 40 + .verify_blob_and_make_permanent(PreparedBlobRef { 41 + cid: blobref.get_cid()?, 42 + mime_type: blobref.get_mime_type().to_string(), 43 + constraints: BlobConstraint { 44 + max_size: None, 45 + accept: None, 46 + }, 47 + }) 48 + .await?; 49 + } 50 + 51 + Ok(BlobOutput { 52 + blob: Blob { 53 + r#type: Some("blob".to_string()), 54 + r#ref: Some(blobref.get_cid()?), 55 + cid: None, 56 + mime_type: blobref.get_mime_type().to_string(), 57 + size: blobref.get_size(), 58 + original: None, 59 + }, 60 + }) 61 + } 62 + 63 + /// Upload a new blob, to be referenced from a repository record. \ 64 + /// The blob will be deleted if it is not referenced within a time window (eg, minutes). \ 65 + /// Blob restrictions (mimetype, size, etc) are enforced when the reference is created. \ 66 + /// Requires auth, implemented by PDS. 67 + /// - POST /xrpc/com.atproto.repo.uploadBlob 68 + /// ### Request Body 69 + /// ### Responses 70 + /// - 200 OK: {"blob": "binary"} 71 + /// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`]} 72 + /// - 401 Unauthorized 73 + #[tracing::instrument(skip_all)] 74 + #[axum::debug_handler(state = AppState)] 75 + pub async fn upload_blob( 76 + auth: AuthenticatedUser, 77 + headers: HeaderMap, 78 + State(config): State<AppConfig>, 79 + State(actor_pools): State<HashMap<String, ActorStorage, RandomState>>, 80 + blob: Bytes, 81 + ) -> Result<Json<BlobOutput>, ApiError> { 82 + let content_length = headers 83 + .get(http::header::CONTENT_LENGTH) 84 + .context("no content length provided")? 85 + .to_str() 86 + .map_err(anyhow::Error::from) 87 + .and_then(|content_length| content_length.parse::<u64>().map_err(anyhow::Error::from)) 88 + .context("invalid content-length header")?; 89 + let content_type = headers 90 + .get(http::header::CONTENT_TYPE) 91 + .context("no content-type provided")? 92 + .to_str() 93 + // .map_err(BadContentTypeError::MissingType) 94 + .context("invalid content-type provided")? 95 + .to_owned(); 96 + 97 + if content_length > config.blob.limit { 98 + return Err(ApiError::InvalidRequest(format!( 99 + "Content-Length is greater than maximum of {}", 100 + config.blob.limit 101 + ))); 102 + }; 103 + if blob.len() as u64 > config.blob.limit { 104 + return Err(ApiError::InvalidRequest(format!( 105 + "Blob size is greater than maximum of {} despite content-length header", 106 + config.blob.limit 107 + ))); 108 + }; 109 + 110 + match inner_upload_blob(auth, blob, content_type, actor_pools).await { 111 + Ok(res) => Ok(Json(res)), 112 + Err(error) => { 113 + tracing::error!("{error:?}"); 114 + Err(ApiError::RuntimeError) 115 + } 116 + } 117 + }