Alternative ATProto PDS implementation

reorganize; apis/com/atproto/repo/apply_writes prototype

Changed files
+308 -123
src
account_manager
actor_store
apis
+2 -1
src/account_manager/mod.rs
··· 12 use crate::account_manager::helpers::password::UpdateUserPasswordOpts; 13 use crate::models::pds::EmailTokenPurpose; 14 use anyhow::Result; 15 use chrono::DateTime; 16 use chrono::offset::Utc as UtcOffset; 17 use cidv10::Cid; ··· 503 } 504 505 pub struct SharedAccountManager { 506 - pub account_manager: RwLock<AccountManagerCreator>, 507 }
··· 12 use crate::account_manager::helpers::password::UpdateUserPasswordOpts; 13 use crate::models::pds::EmailTokenPurpose; 14 use anyhow::Result; 15 + use axum::extract::FromRef; 16 use chrono::DateTime; 17 use chrono::offset::Utc as UtcOffset; 18 use cidv10::Cid; ··· 504 } 505 506 pub struct SharedAccountManager { 507 + pub account_manager: RwLock<AccountManager>, 508 }
+21
src/actor_store/mod.rs
··· 38 use sql_blob::BlobStoreSql; 39 use sql_repo::SqlRepoReader; 40 41 #[derive(Debug)] 42 enum FormatCommitError { 43 BadRecordSwap(String), ··· 88 did, 89 blob: BlobReader::new(blobstore, db), 90 } 91 } 92 93 pub async fn get_repo_root(&self) -> Option<Cid> {
··· 38 use sql_blob::BlobStoreSql; 39 use sql_repo::SqlRepoReader; 40 41 + use crate::ActorPools; 42 + 43 #[derive(Debug)] 44 enum FormatCommitError { 45 BadRecordSwap(String), ··· 90 did, 91 blob: BlobReader::new(blobstore, db), 92 } 93 + } 94 + 95 + /// Create a new ActorStore taking ActorPools HashMap as input 96 + pub async fn from_actor_pools( 97 + did: &String, 98 + hashmap_actor_pools: &std::collections::HashMap<String, ActorPools>, 99 + ) -> Self { 100 + let actor_pool = hashmap_actor_pools 101 + .get(did) 102 + .expect("Actor pool not found") 103 + .clone(); 104 + let blobstore = BlobStoreSql::new(did.clone(), actor_pool.blob); 105 + let conn = actor_pool 106 + .repo 107 + .clone() 108 + .get() 109 + .await 110 + .expect("Failed to get connection"); 111 + Self::new(did.clone(), blobstore, actor_pool.repo, conn) 112 } 113 114 pub async fn get_repo_root(&self) -> Option<Cid> {
+13 -12
src/actor_store/sql_blob.rs
··· 72 } 73 74 // /// Create a factory function for blob stores 75 - // pub fn creator( 76 - // db: deadpool_diesel::Pool< 77 - // deadpool_diesel::Manager<SqliteConnection>, 78 - // deadpool_diesel::sqlite::Object, 79 - // >, 80 - // ) -> Box<dyn Fn(String) -> BlobStoreSql> { 81 - // let db_clone = db.clone(); 82 - // Box::new(move |did: String| BlobStoreSql::new(did, db_clone.clone())) 83 - // } 84 85 /// Store a blob temporarily - now just stores permanently with a key returned for API compatibility 86 pub async fn put_temp(&self, bytes: Vec<u8>) -> Result<String> { 87 // Generate a unique key as a CID based on the data 88 - use sha2::{Digest, Sha256}; 89 - let digest = Sha256::digest(&bytes); 90 - let key = hex::encode(digest); 91 92 // Just store the blob directly 93 self.put_permanent_with_mime(
··· 72 } 73 74 // /// Create a factory function for blob stores 75 + pub fn creator( 76 + db: deadpool_diesel::Pool< 77 + deadpool_diesel::Manager<SqliteConnection>, 78 + deadpool_diesel::sqlite::Object, 79 + >, 80 + ) -> Box<dyn Fn(String) -> BlobStoreSql> { 81 + let db_clone = db.clone(); 82 + Box::new(move |did: String| BlobStoreSql::new(did, db_clone.clone())) 83 + } 84 85 /// Store a blob temporarily - now just stores permanently with a key returned for API compatibility 86 pub async fn put_temp(&self, bytes: Vec<u8>) -> Result<String> { 87 // Generate a unique key as a CID based on the data 88 + // use sha2::{Digest, Sha256}; 89 + // let digest = Sha256::digest(&bytes); 90 + // let key = hex::encode(digest); 91 + let key = rsky_common::get_random_str(); 92 93 // Just store the blob directly 94 self.put_permanent_with_mime(
+5
src/apis/com/atproto/mod.rs
···
··· 1 + // pub mod admin; 2 + // pub mod identity; 3 + pub mod repo; 4 + // pub mod server; 5 + // pub mod sync;
+45
src/apis/com/atproto/repo/mod.rs
···
··· 1 + use atrium_api::com::atproto::repo; 2 + use axum::{Router, routing::post}; 3 + use constcat::concat; 4 + 5 + use crate::AppState; 6 + 7 + pub mod apply_writes; 8 + // pub mod create_record; 9 + // pub mod delete_record; 10 + // pub mod describe_repo; 11 + // pub mod get_record; 12 + // pub mod import_repo; 13 + // pub mod list_missing_blobs; 14 + // pub mod list_records; 15 + // pub mod put_record; 16 + // pub mod upload_blob; 17 + 18 + /// These endpoints are part of the atproto PDS repository management APIs. \ 19 + /// Requests usually require authentication (unlike the com.atproto.sync.* endpoints), and are made directly to the user's own PDS instance. 20 + /// ### Routes 21 + /// - AP /xrpc/com.atproto.repo.applyWrites -> [`apply_writes`] 22 + /// - AP /xrpc/com.atproto.repo.createRecord -> [`create_record`] 23 + /// - AP /xrpc/com.atproto.repo.putRecord -> [`put_record`] 24 + /// - AP /xrpc/com.atproto.repo.deleteRecord -> [`delete_record`] 25 + /// - AP /xrpc/com.atproto.repo.uploadBlob -> [`upload_blob`] 26 + /// - UG /xrpc/com.atproto.repo.describeRepo -> [`describe_repo`] 27 + /// - UG /xrpc/com.atproto.repo.getRecord -> [`get_record`] 28 + /// - UG /xrpc/com.atproto.repo.listRecords -> [`list_records`] 29 + /// - [ ] xx /xrpc/com.atproto.repo.importRepo 30 + // - [ ] xx /xrpc/com.atproto.repo.listMissingBlobs 31 + pub(crate) fn routes() -> Router<AppState> { 32 + Router::new().route( 33 + concat!("/", repo::apply_writes::NSID), 34 + post(apply_writes::apply_writes), 35 + ) 36 + // .route(concat!("/", repo::create_record::NSID), post(create_record)) 37 + // .route(concat!("/", repo::put_record::NSID), post(put_record)) 38 + // .route(concat!("/", repo::delete_record::NSID), post(delete_record)) 39 + // .route(concat!("/", repo::upload_blob::NSID), post(upload_blob)) 40 + // .route(concat!("/", repo::describe_repo::NSID), get(describe_repo)) 41 + // .route(concat!("/", repo::get_record::NSID), get(get_record)) 42 + // .route(concat!("/", repo::import_repo::NSID), post(todo)) 43 + // .route(concat!("/", repo::list_missing_blobs::NSID), get(todo)) 44 + // .route(concat!("/", repo::list_records::NSID), get(list_records)) 45 + }
+1
src/apis/com/mod.rs
···
··· 1 + pub mod atproto;
src/endpoints/identity.rs src/apis/com/atproto/identity/identity.rs
+5 -4
src/endpoints/mod.rs src/apis/mod.rs
··· 1 //! Root module for all endpoints. 2 // mod identity; 3 - // mod repo; 4 // mod server; 5 // mod sync; 6 ··· 18 19 /// Register all root routes. 20 pub(crate) fn routes() -> Router<AppState> { 21 - Router::new().route("/_health", get(health)) 22 - // .merge(identity::routes()) // com.atproto.identity 23 - // .merge(repo::routes()) // com.atproto.repo 24 // .merge(server::routes()) // com.atproto.server 25 // .merge(sync::routes()) // com.atproto.sync 26 }
··· 1 //! Root module for all endpoints. 2 // mod identity; 3 + mod com; 4 // mod server; 5 // mod sync; 6 ··· 18 19 /// Register all root routes. 20 pub(crate) fn routes() -> Router<AppState> { 21 + Router::new() 22 + .route("/_health", get(health)) 23 + // .merge(identity::routes()) // com.atproto.identity 24 + .merge(com::atproto::repo::routes()) // com.atproto.repo 25 // .merge(server::routes()) // com.atproto.server 26 // .merge(sync::routes()) // com.atproto.sync 27 }
src/endpoints/repo.rs src/apis/com/atproto/repo/repo.rs
+69 -84
src/endpoints/repo/apply_writes.rs src/apis/com/atproto/repo/apply_writes.rs
··· 1 //! Apply a batch transaction of repository creates, updates, and deletes. Requires auth, implemented by PDS. 2 use crate::{ 3 - ActorPools, AppState, Db, Error, Result, SigningKey, 4 actor_store::{ActorStore, sql_blob::BlobStoreSql}, 5 auth::AuthenticatedUser, 6 config::AppConfig, 7 - error::ErrorMessage, 8 - firehose::{self, FirehoseProducer, RepoOp}, 9 - metrics::{REPO_COMMITS, REPO_OP_CREATE, REPO_OP_DELETE, REPO_OP_UPDATE}, 10 - storage, 11 }; 12 - use anyhow::bail; 13 - use anyhow::{Context as _, anyhow}; 14 - use atrium_api::com::atproto::repo::apply_writes::{self, InputWritesItem, OutputResultsItem}; 15 - use atrium_api::{ 16 - com::atproto::repo::{self, defs::CommitMetaData}, 17 - types::{ 18 - LimitedU32, Object, TryFromUnknown as _, TryIntoUnknown as _, Unknown, 19 - string::{AtIdentifier, Nsid, Tid}, 20 - }, 21 - }; 22 - use atrium_repo::blockstore::CarStore; 23 use axum::{ 24 Json, Router, 25 body::Body, ··· 28 routing::{get, post}, 29 }; 30 use cidv10::Cid; 31 - use constcat::concat; 32 - use futures::TryStreamExt as _; 33 use futures::stream::{self, StreamExt}; 34 - use metrics::counter; 35 use rsky_lexicon::com::atproto::repo::{ApplyWritesInput, ApplyWritesInputRefWrite}; 36 - use rsky_pds::SharedSequencer; 37 - use rsky_pds::account_manager::AccountManager; 38 - use rsky_pds::account_manager::helpers::account::AvailabilityFlags; 39 - use rsky_pds::apis::ApiError; 40 use rsky_pds::auth_verifier::AccessStandardIncludeChecks; 41 use rsky_pds::repo::prepare::{ 42 PrepareCreateOpts, PrepareDeleteOpts, PrepareUpdateOpts, prepare_create, prepare_delete, 43 prepare_update, 44 }; 45 use rsky_repo::types::PreparedWrite; 46 - use rsky_syntax::aturi::AtUri; 47 - use serde::Deserialize; 48 - use std::{collections::HashSet, str::FromStr}; 49 - use tokio::io::AsyncWriteExt as _; 50 51 - use super::resolve_did; 52 - 53 - /// Apply a batch transaction of repository creates, updates, and deletes. Requires auth, implemented by PDS. 54 - /// - POST /xrpc/com.atproto.repo.applyWrites 55 - /// ### Request Body 56 - /// - `repo`: `at-identifier` // The handle or DID of the repo (aka, current account). 57 - /// - `validate`: `boolean` // Can be set to 'false' to skip Lexicon schema validation of record data across all operations, 'true' to require it, or leave unset to validate only for known Lexicons. 58 - /// - `writes`: `object[]` // One of: 59 - /// - - com.atproto.repo.applyWrites.create 60 - /// - - com.atproto.repo.applyWrites.update 61 - /// - - com.atproto.repo.applyWrites.delete 62 - /// - `swap_commit`: `cid` // If provided, the entire operation will fail if the current repo commit CID does not match this value. Used to prevent conflicting repo mutations. 63 - pub(crate) async fn apply_writes( 64 user: AuthenticatedUser, 65 - State(skey): State<SigningKey>, 66 - State(config): State<AppConfig>, 67 - State(db): State<Db>, 68 - State(db_actors): State<std::collections::HashMap<String, ActorPools>>, 69 - State(fhp): State<FirehoseProducer>, 70 - Json(input): Json<ApplyWritesInput>, 71 - ) -> Result<Json<repo::apply_writes::Output>> { 72 - let tx: ApplyWritesInput = input; 73 let ApplyWritesInput { 74 repo, 75 validate, ··· 77 .. 78 } = tx; 79 let account = account_manager 80 .get_account( 81 &repo, 82 Some(AvailabilityFlags { ··· 88 89 if let Some(account) = account { 90 if account.deactivated_at.is_some() { 91 - return Err(Error::with_message( 92 - StatusCode::FORBIDDEN, 93 - anyhow!("Account is deactivated"), 94 - ErrorMessage::new("AccountDeactivated", "Account is deactivated"), 95 - )); 96 } 97 let did = account.did; 98 if did != user.did() { 99 - return Err(Error::with_message( 100 - StatusCode::FORBIDDEN, 101 - anyhow!("AuthRequiredError"), 102 - ErrorMessage::new("AuthRequiredError", "Auth required"), 103 - )); 104 } 105 let did: &String = &did; 106 if tx.writes.len() > 200 { 107 - return Err(Error::with_message( 108 - StatusCode::BAD_REQUEST, 109 - anyhow!("Too many writes. Max: 200"), 110 - ErrorMessage::new("TooManyWrites", "Too many writes. Max: 200"), 111 - )); 112 } 113 114 let writes: Vec<PreparedWrite> = stream::iter(tx.writes) ··· 156 None => None, 157 }; 158 159 - let actor_db = db_actors 160 - .get(did) 161 - .ok_or_else(|| anyhow!("Actor DB not found"))?; 162 - let conn = actor_db 163 - .repo 164 - .get() 165 - .await 166 - .context("Failed to get actor db connection")?; 167 - let mut actor_store = ActorStore::new( 168 - did.clone(), 169 - BlobStoreSql::new(did.clone(), actor_db.blob), 170 - actor_db.repo, 171 - conn, 172 - ); 173 174 let commit = actor_store 175 .process_writes(writes.clone(), swap_commit_cid) 176 .await?; 177 178 - let mut lock = sequencer.sequencer.write().await; 179 - lock.sequence_commit(did.clone(), commit.clone()).await?; 180 account_manager 181 .update_repo_root( 182 did.to_string(), 183 commit.commit_data.cid, ··· 186 .await?; 187 Ok(()) 188 } else { 189 - Err(Error::with_message( 190 - StatusCode::NOT_FOUND, 191 - anyhow!("Could not find repo: `{repo}`"), 192 - ErrorMessage::new("RepoNotFound", "Could not find repo"), 193 - )) 194 } 195 }
··· 1 //! Apply a batch transaction of repository creates, updates, and deletes. Requires auth, implemented by PDS. 2 + use crate::SharedSequencer; 3 + use crate::account_manager::helpers::account::AvailabilityFlags; 4 + use crate::account_manager::{AccountManager, AccountManagerCreator, SharedAccountManager}; 5 use crate::{ 6 + ActorPools, AppState, SigningKey, 7 actor_store::{ActorStore, sql_blob::BlobStoreSql}, 8 auth::AuthenticatedUser, 9 config::AppConfig, 10 + error::{ApiError, ErrorMessage}, 11 }; 12 + use anyhow::{Result, bail}; 13 use axum::{ 14 Json, Router, 15 body::Body, ··· 18 routing::{get, post}, 19 }; 20 use cidv10::Cid; 21 + use deadpool_diesel::sqlite::Pool; 22 use futures::stream::{self, StreamExt}; 23 use rsky_lexicon::com::atproto::repo::{ApplyWritesInput, ApplyWritesInputRefWrite}; 24 use rsky_pds::auth_verifier::AccessStandardIncludeChecks; 25 use rsky_pds::repo::prepare::{ 26 PrepareCreateOpts, PrepareDeleteOpts, PrepareUpdateOpts, prepare_create, prepare_delete, 27 prepare_update, 28 }; 29 + use rsky_pds::sequencer::Sequencer; 30 use rsky_repo::types::PreparedWrite; 31 + use std::str::FromStr; 32 + use std::sync::Arc; 33 + use tokio::sync::RwLock; 34 35 + async fn inner_apply_writes( 36 + body: ApplyWritesInput, 37 user: AuthenticatedUser, 38 + sequencer: &RwLock<Sequencer>, 39 + actor_pools: std::collections::HashMap<String, ActorPools>, 40 + account_manager: &RwLock<AccountManager>, 41 + ) -> Result<()> { 42 + let tx: ApplyWritesInput = body; 43 let ApplyWritesInput { 44 repo, 45 validate, ··· 47 .. 48 } = tx; 49 let account = account_manager 50 + .read() 51 + .await 52 .get_account( 53 &repo, 54 Some(AvailabilityFlags { ··· 60 61 if let Some(account) = account { 62 if account.deactivated_at.is_some() { 63 + bail!("Account is deactivated") 64 } 65 let did = account.did; 66 if did != user.did() { 67 + bail!("AuthRequiredError") 68 } 69 let did: &String = &did; 70 if tx.writes.len() > 200 { 71 + bail!("Too many writes. Max: 200") 72 } 73 74 let writes: Vec<PreparedWrite> = stream::iter(tx.writes) ··· 116 None => None, 117 }; 118 119 + let mut actor_store = ActorStore::from_actor_pools(did, &actor_pools).await; 120 121 let commit = actor_store 122 .process_writes(writes.clone(), swap_commit_cid) 123 .await?; 124 125 + sequencer 126 + .write() 127 + .await 128 + .sequence_commit(did.clone(), commit.clone()) 129 + .await?; 130 account_manager 131 + .write() 132 + .await 133 .update_repo_root( 134 did.to_string(), 135 commit.commit_data.cid, ··· 138 .await?; 139 Ok(()) 140 } else { 141 + bail!("Could not find repo: `{repo}`") 142 + } 143 + } 144 + 145 + /// Apply a batch transaction of repository creates, updates, and deletes. Requires auth, implemented by PDS. 146 + /// - POST /xrpc/com.atproto.repo.applyWrites 147 + /// ### Request Body 148 + /// - `repo`: `at-identifier` // The handle or DID of the repo (aka, current account). 149 + /// - `validate`: `boolean` // Can be set to 'false' to skip Lexicon schema validation of record data across all operations, 'true' to require it, or leave unset to validate only for known Lexicons. 150 + /// - `writes`: `object[]` // One of: 151 + /// - - com.atproto.repo.applyWrites.create 152 + /// - - com.atproto.repo.applyWrites.update 153 + /// - - com.atproto.repo.applyWrites.delete 154 + /// - `swap_commit`: `cid` // If provided, the entire operation will fail if the current repo commit CID does not match this value. Used to prevent conflicting repo mutations. 155 + #[axum::debug_handler(state = AppState)] 156 + pub(crate) async fn apply_writes( 157 + user: AuthenticatedUser, 158 + State(state): State<AppState>, 159 + Json(body): Json<ApplyWritesInput>, 160 + ) -> Result<(), ApiError> { 161 + tracing::debug!("@LOG: debug apply_writes {body:#?}"); 162 + let db_actors = state.db_actors.clone(); 163 + let sequencer = &state.sequencer.sequencer; 164 + let account_manager = &state.account_manager.account_manager; 165 + match inner_apply_writes( 166 + body, 167 + user, 168 + sequencer.clone(), 169 + db_actors, 170 + account_manager.clone(), 171 + ) 172 + .await 173 + { 174 + Ok(()) => Ok(()), 175 + Err(error) => { 176 + tracing::error!("@LOG: ERROR: {error}"); 177 + Err(ApiError::RuntimeError) 178 + } 179 } 180 }
src/endpoints/server.rs src/apis/com/atproto/server/server.rs
src/endpoints/sync.rs src/apis/com/atproto/sync/sync.rs
+144
src/error.rs
··· 4 http::StatusCode, 5 response::{IntoResponse, Response}, 6 }; 7 use thiserror::Error; 8 use tracing::error; 9 ··· 118 } 119 } 120 }
··· 4 http::StatusCode, 5 response::{IntoResponse, Response}, 6 }; 7 + use rsky_pds::handle::{self, errors::ErrorKind}; 8 use thiserror::Error; 9 use tracing::error; 10 ··· 119 } 120 } 121 } 122 + 123 + /// API error types that can be returned to clients 124 + #[derive(Clone, Debug)] 125 + pub enum ApiError { 126 + RuntimeError, 127 + InvalidLogin, 128 + AccountTakendown, 129 + InvalidRequest(String), 130 + ExpiredToken, 131 + InvalidToken, 132 + RecordNotFound, 133 + InvalidHandle, 134 + InvalidEmail, 135 + InvalidPassword, 136 + InvalidInviteCode, 137 + HandleNotAvailable, 138 + EmailNotAvailable, 139 + UnsupportedDomain, 140 + UnresolvableDid, 141 + IncompatibleDidDoc, 142 + WellKnownNotFound, 143 + AccountNotFound, 144 + BlobNotFound, 145 + BadRequest(String, String), 146 + AuthRequiredError(String), 147 + } 148 + 149 + impl ApiError { 150 + /// Get the appropriate HTTP status code for this error 151 + fn status_code(&self) -> StatusCode { 152 + match self { 153 + Self::RuntimeError => StatusCode::INTERNAL_SERVER_ERROR, 154 + Self::InvalidLogin 155 + | Self::ExpiredToken 156 + | Self::InvalidToken 157 + | Self::AuthRequiredError(_) => StatusCode::UNAUTHORIZED, 158 + Self::AccountTakendown => StatusCode::FORBIDDEN, 159 + Self::RecordNotFound 160 + | Self::WellKnownNotFound 161 + | Self::AccountNotFound 162 + | Self::BlobNotFound => StatusCode::NOT_FOUND, 163 + // All bad requests grouped together 164 + _ => StatusCode::BAD_REQUEST, 165 + } 166 + } 167 + 168 + /// Get the error type string for API responses 169 + fn error_type(&self) -> String { 170 + match self { 171 + Self::RuntimeError => "InternalServerError", 172 + Self::InvalidLogin => "InvalidLogin", 173 + Self::AccountTakendown => "AccountTakendown", 174 + Self::InvalidRequest(_) => "InvalidRequest", 175 + Self::ExpiredToken => "ExpiredToken", 176 + Self::InvalidToken => "InvalidToken", 177 + Self::RecordNotFound => "RecordNotFound", 178 + Self::InvalidHandle => "InvalidHandle", 179 + Self::InvalidEmail => "InvalidEmail", 180 + Self::InvalidPassword => "InvalidPassword", 181 + Self::InvalidInviteCode => "InvalidInviteCode", 182 + Self::HandleNotAvailable => "HandleNotAvailable", 183 + Self::EmailNotAvailable => "EmailNotAvailable", 184 + Self::UnsupportedDomain => "UnsupportedDomain", 185 + Self::UnresolvableDid => "UnresolvableDid", 186 + Self::IncompatibleDidDoc => "IncompatibleDidDoc", 187 + Self::WellKnownNotFound => "WellKnownNotFound", 188 + Self::AccountNotFound => "AccountNotFound", 189 + Self::BlobNotFound => "BlobNotFound", 190 + Self::BadRequest(error, _) => error, 191 + Self::AuthRequiredError(_) => "AuthRequiredError", 192 + } 193 + .to_string() 194 + } 195 + 196 + /// Get the user-facing error message 197 + fn message(&self) -> String { 198 + match self { 199 + Self::RuntimeError => "Something went wrong", 200 + Self::InvalidLogin => "Invalid identifier or password", 201 + Self::AccountTakendown => "Account has been taken down", 202 + Self::InvalidRequest(msg) => msg, 203 + Self::ExpiredToken => "Token is expired", 204 + Self::InvalidToken => "Token is invalid", 205 + Self::RecordNotFound => "Record could not be found", 206 + Self::InvalidHandle => "Handle is invalid", 207 + Self::InvalidEmail => "Invalid email", 208 + Self::InvalidPassword => "Invalid Password", 209 + Self::InvalidInviteCode => "Invalid invite code", 210 + Self::HandleNotAvailable => "Handle not available", 211 + Self::EmailNotAvailable => "Email not available", 212 + Self::UnsupportedDomain => "Unsupported domain", 213 + Self::UnresolvableDid => "Unresolved Did", 214 + Self::IncompatibleDidDoc => "IncompatibleDidDoc", 215 + Self::WellKnownNotFound => "User not found", 216 + Self::AccountNotFound => "Account could not be found", 217 + Self::BlobNotFound => "Blob could not be found", 218 + Self::BadRequest(_, msg) => msg, 219 + Self::AuthRequiredError(msg) => msg, 220 + } 221 + .to_string() 222 + } 223 + } 224 + 225 + impl From<Error> for ApiError { 226 + fn from(_value: Error) -> Self { 227 + ApiError::RuntimeError 228 + } 229 + } 230 + 231 + impl From<handle::errors::Error> for ApiError { 232 + fn from(value: handle::errors::Error) -> Self { 233 + match value.kind { 234 + ErrorKind::InvalidHandle => ApiError::InvalidHandle, 235 + ErrorKind::HandleNotAvailable => ApiError::HandleNotAvailable, 236 + ErrorKind::UnsupportedDomain => ApiError::UnsupportedDomain, 237 + ErrorKind::InternalError => ApiError::RuntimeError, 238 + } 239 + } 240 + } 241 + 242 + impl IntoResponse for ApiError { 243 + fn into_response(self) -> Response { 244 + let status = self.status_code(); 245 + let error_type = self.error_type(); 246 + let message = self.message(); 247 + 248 + // Log the error for debugging 249 + error!("API Error: {}: {}", error_type, message); 250 + 251 + // Create the error message and serialize to JSON 252 + let error_message = ErrorMessage::new(error_type, message); 253 + let body = serde_json::to_string(&error_message).unwrap_or_else(|_| { 254 + r#"{"error":"InternalServerError","message":"Error serializing response"}"#.to_string() 255 + }); 256 + 257 + // Build the response 258 + Response::builder() 259 + .status(status) 260 + .header("Content-Type", "application/json") 261 + .body(Body::new(body)) 262 + .expect("should be a valid response") 263 + } 264 + }
+3 -22
src/lib.rs
··· 2 mod account_manager; 3 mod actor_endpoints; 4 mod actor_store; 5 mod auth; 6 mod config; 7 mod db; 8 mod did; 9 - mod endpoints; 10 pub mod error; 11 mod firehose; 12 mod metrics; ··· 299 tokio::fs::create_dir_all(&config.plc.path).await?; 300 tokio::fs::create_dir_all(&config.blob.path).await?; 301 302 - let cred = azure_identity::DefaultAzureCredential::new() 303 - .context("failed to create Azure credential")?; 304 - 305 // Create a database connection manager and pool for the main database. 306 let pool = 307 establish_pool(&config.db).context("failed to establish database connection pool")?; ··· 364 )), 365 }); 366 let account_manager = SharedAccountManager { 367 - account_manager: RwLock::new(AccountManager::creator()), 368 }; 369 370 let addr = config ··· 376 .merge(oauth::routes()) 377 .nest( 378 "/xrpc", 379 - endpoints::routes() 380 .merge(actor_endpoints::routes()) 381 .fallback(service_proxy), 382 ) ··· 477 .and_then(|r| r) 478 .context("failed to serve app") 479 } 480 - 481 - /// Creates an app router with the provided AppState. 482 - pub fn create_app(state: AppState) -> Router { 483 - Router::new() 484 - .route("/", get(index)) 485 - .merge(oauth::routes()) 486 - .nest( 487 - "/xrpc", 488 - endpoints::routes() 489 - .merge(actor_endpoints::routes()) 490 - .fallback(service_proxy), 491 - ) 492 - .layer(CorsLayer::permissive()) 493 - .layer(TraceLayer::new_for_http()) 494 - .with_state(state) 495 - }
··· 2 mod account_manager; 3 mod actor_endpoints; 4 mod actor_store; 5 + mod apis; 6 mod auth; 7 mod config; 8 mod db; 9 mod did; 10 pub mod error; 11 mod firehose; 12 mod metrics; ··· 299 tokio::fs::create_dir_all(&config.plc.path).await?; 300 tokio::fs::create_dir_all(&config.blob.path).await?; 301 302 // Create a database connection manager and pool for the main database. 303 let pool = 304 establish_pool(&config.db).context("failed to establish database connection pool")?; ··· 361 )), 362 }); 363 let account_manager = SharedAccountManager { 364 + account_manager: RwLock::new(AccountManager::new(pool.clone())), 365 }; 366 367 let addr = config ··· 373 .merge(oauth::routes()) 374 .nest( 375 "/xrpc", 376 + apis::routes() 377 .merge(actor_endpoints::routes()) 378 .fallback(service_proxy), 379 ) ··· 474 .and_then(|r| r) 475 .context("failed to serve app") 476 }