Alternative ATProto PDS implementation

prototype apis com atproto repo

+1
Cargo.lock
··· 1310 1310 "reqwest 0.12.15", 1311 1311 "reqwest-middleware", 1312 1312 "rsky-common", 1313 + "rsky-identity", 1313 1314 "rsky-lexicon", 1314 1315 "rsky-pds", 1315 1316 "rsky-repo",
+1
Cargo.toml
··· 158 158 rsky-pds = { git = "https://github.com/blacksky-algorithms/rsky.git" } 159 159 rsky-common = { git = "https://github.com/blacksky-algorithms/rsky.git" } 160 160 rsky-lexicon = { git = "https://github.com/blacksky-algorithms/rsky.git" } 161 + rsky-identity = { git = "https://github.com/blacksky-algorithms/rsky.git" } 161 162 162 163 # async in streams 163 164 # async-stream = "0.3"
+9 -7
src/apis/com/atproto/repo/apply_writes.rs
··· 18 18 }; 19 19 use rsky_pds::sequencer::Sequencer; 20 20 use rsky_repo::types::PreparedWrite; 21 + use std::collections::HashMap; 22 + use std::hash::RandomState; 21 23 use std::str::FromStr; 24 + use std::sync::Arc; 22 25 use tokio::sync::RwLock; 23 26 24 27 async fn inner_apply_writes( 25 28 body: ApplyWritesInput, 26 29 user: AuthenticatedUser, 27 - sequencer: &RwLock<Sequencer>, 28 - actor_pools: std::collections::HashMap<String, ActorStorage>, 29 - account_manager: &RwLock<AccountManager>, 30 + sequencer: Arc<RwLock<Sequencer>>, 31 + actor_pools: HashMap<String, ActorStorage>, 32 + account_manager: Arc<RwLock<AccountManager>>, 30 33 ) -> Result<()> { 31 34 let tx: ApplyWritesInput = body; 32 35 let ApplyWritesInput { ··· 145 148 #[axum::debug_handler(state = AppState)] 146 149 pub(crate) async fn apply_writes( 147 150 user: AuthenticatedUser, 148 - State(state): State<AppState>, 151 + State(db_actors): State<HashMap<String, ActorStorage, RandomState>>, 152 + State(account_manager): State<Arc<RwLock<AccountManager>>>, 153 + State(sequencer): State<Arc<RwLock<Sequencer>>>, 149 154 Json(body): Json<ApplyWritesInput>, 150 155 ) -> Result<(), ApiError> { 151 156 tracing::debug!("@LOG: debug apply_writes {body:#?}"); 152 - let db_actors = state.db_actors; 153 - let sequencer = &state.sequencer.sequencer; 154 - let account_manager = &state.account_manager.account_manager; 155 157 match inner_apply_writes(body, user, sequencer, db_actors, account_manager).await { 156 158 Ok(()) => Ok(()), 157 159 Err(error) => {
+162
src/apis/com/atproto/repo/create_record.rs
··· 1 + //! Create a single new repository record. Requires auth, implemented by PDS. 2 + use crate::account_manager::AccountManager; 3 + use crate::account_manager::helpers::account::AvailabilityFlags; 4 + use crate::{ 5 + actor_store::ActorStore, 6 + auth::AuthenticatedUser, 7 + error::ApiError, 8 + serve::{ActorStorage, AppState}, 9 + }; 10 + use anyhow::{Result, bail}; 11 + use axum::{Json, extract::State}; 12 + use cidv10::Cid; 13 + use rsky_lexicon::com::atproto::repo::{CreateRecordInput, CreateRecordOutput}; 14 + use rsky_pds::SharedIdResolver; 15 + use rsky_pds::repo::prepare::{ 16 + PrepareCreateOpts, PrepareDeleteOpts, prepare_create, prepare_delete, 17 + }; 18 + use rsky_pds::sequencer::Sequencer; 19 + use rsky_repo::types::{PreparedDelete, PreparedWrite}; 20 + use rsky_syntax::aturi::AtUri; 21 + use std::collections::HashMap; 22 + use std::hash::RandomState; 23 + use std::str::FromStr; 24 + use std::sync::Arc; 25 + use tokio::sync::RwLock; 26 + 27 + async fn inner_create_record( 28 + body: CreateRecordInput, 29 + user: AuthenticatedUser, 30 + sequencer: Arc<RwLock<Sequencer>>, 31 + actor_pools: std::collections::HashMap<String, ActorStorage>, 32 + account_manager: Arc<RwLock<AccountManager>>, 33 + ) -> Result<CreateRecordOutput> { 34 + let CreateRecordInput { 35 + repo, 36 + collection, 37 + record, 38 + rkey, 39 + validate, 40 + swap_commit, 41 + } = body; 42 + let account = account_manager 43 + .read() 44 + .await 45 + .get_account( 46 + &repo, 47 + Some(AvailabilityFlags { 48 + include_deactivated: Some(true), 49 + include_taken_down: None, 50 + }), 51 + ) 52 + .await?; 53 + if let Some(account) = account { 54 + if account.deactivated_at.is_some() { 55 + bail!("Account is deactivated") 56 + } 57 + let did = account.did; 58 + // if did != auth.access.credentials.unwrap().did.unwrap() { 59 + if did != user.did() { 60 + bail!("AuthRequiredError") 61 + } 62 + let swap_commit_cid = match swap_commit { 63 + Some(swap_commit) => Some(Cid::from_str(&swap_commit)?), 64 + None => None, 65 + }; 66 + let write = prepare_create(PrepareCreateOpts { 67 + did: did.clone(), 68 + collection: collection.clone(), 69 + record: serde_json::from_value(record)?, 70 + rkey, 71 + validate, 72 + swap_cid: None, 73 + }) 74 + .await?; 75 + 76 + let did: &String = &did; 77 + let mut actor_store = ActorStore::from_actor_pools(did, &actor_pools).await; 78 + let backlink_conflicts: Vec<AtUri> = match validate { 79 + Some(true) => { 80 + let write_at_uri: AtUri = write.uri.clone().try_into()?; 81 + actor_store 82 + .record 83 + .get_backlink_conflicts(&write_at_uri, &write.record) 84 + .await? 85 + } 86 + _ => Vec::new(), 87 + }; 88 + 89 + let backlink_deletions: Vec<PreparedDelete> = backlink_conflicts 90 + .iter() 91 + .map(|at_uri| { 92 + prepare_delete(PrepareDeleteOpts { 93 + did: at_uri.get_hostname().to_string(), 94 + collection: at_uri.get_collection(), 95 + rkey: at_uri.get_rkey(), 96 + swap_cid: None, 97 + }) 98 + }) 99 + .collect::<Result<Vec<PreparedDelete>>>()?; 100 + let mut writes: Vec<PreparedWrite> = vec![PreparedWrite::Create(write.clone())]; 101 + for delete in backlink_deletions { 102 + writes.push(PreparedWrite::Delete(delete)); 103 + } 104 + let commit = actor_store 105 + .process_writes(writes.clone(), swap_commit_cid) 106 + .await?; 107 + 108 + _ = sequencer 109 + .write() 110 + .await 111 + .sequence_commit(did.clone(), commit.clone()) 112 + .await?; 113 + account_manager 114 + .write() 115 + .await 116 + .update_repo_root( 117 + did.to_string(), 118 + commit.commit_data.cid, 119 + commit.commit_data.rev, 120 + &actor_pools, 121 + ) 122 + .await?; 123 + 124 + Ok(CreateRecordOutput { 125 + uri: write.uri.clone(), 126 + cid: write.cid.to_string(), 127 + }) 128 + } else { 129 + bail!("Could not find repo: `{repo}`") 130 + } 131 + } 132 + 133 + /// Create a single new repository record. Requires auth, implemented by PDS. 134 + /// - POST /xrpc/com.atproto.repo.createRecord 135 + /// ### Request Body 136 + /// - `repo`: `at-identifier` // The handle or DID of the repo (aka, current account). 137 + /// - `collection`: `nsid` // The NSID of the record collection. 138 + /// - `rkey`: `string` // The record key. <= 512 characters. 139 + /// - `validate`: `boolean` // Can be set to 'false' to skip Lexicon schema validation of record data, 'true' to require it, or leave unset to validate only for known Lexicons. 140 + /// - `record` 141 + /// - `swap_commit`: `cid` // Compare and swap with the previous commit by CID. 142 + /// ### Responses 143 + /// - 200 OK: {`cid`: `cid`, `uri`: `at-uri`, `commit`: {`cid`: `cid`, `rev`: `tid`}, `validation_status`: [`valid`, `unknown`]} 144 + /// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`, `InvalidSwap`]} 145 + /// - 401 Unauthorized 146 + #[axum::debug_handler(state = AppState)] 147 + pub async fn create_record( 148 + user: AuthenticatedUser, 149 + State(db_actors): State<HashMap<String, ActorStorage, RandomState>>, 150 + State(account_manager): State<Arc<RwLock<AccountManager>>>, 151 + State(sequencer): State<Arc<RwLock<Sequencer>>>, 152 + Json(body): Json<CreateRecordInput>, 153 + ) -> Result<Json<CreateRecordOutput>, ApiError> { 154 + tracing::debug!("@LOG: debug create_record {body:#?}"); 155 + match inner_create_record(body, user, sequencer, db_actors, account_manager).await { 156 + Ok(res) => Ok(Json(res)), 157 + Err(error) => { 158 + tracing::error!("@LOG: ERROR: {error}"); 159 + Err(ApiError::RuntimeError) 160 + } 161 + } 162 + }
+137
src/apis/com/atproto/repo/delete_record.rs
··· 1 + /// Delete a repository record, or ensure it doesn't exist. Requires auth, implemented by PDS. 2 + use crate::account_manager::AccountManager; 3 + use crate::account_manager::helpers::account::AvailabilityFlags; 4 + use crate::{ 5 + actor_store::ActorStore, 6 + auth::AuthenticatedUser, 7 + error::ApiError, 8 + serve::{ActorStorage, AppState}, 9 + }; 10 + use anyhow::{Result, bail}; 11 + use axum::{Json, extract::State}; 12 + use cidv10::Cid; 13 + use rsky_lexicon::com::atproto::repo::DeleteRecordInput; 14 + use rsky_pds::repo::prepare::{PrepareDeleteOpts, prepare_delete}; 15 + use rsky_pds::sequencer::Sequencer; 16 + use rsky_repo::types::PreparedWrite; 17 + use rsky_syntax::aturi::AtUri; 18 + use std::collections::HashMap; 19 + use std::hash::RandomState; 20 + use std::str::FromStr; 21 + use std::sync::Arc; 22 + use tokio::sync::RwLock; 23 + 24 + async fn inner_delete_record( 25 + body: DeleteRecordInput, 26 + user: AuthenticatedUser, 27 + sequencer: Arc<RwLock<Sequencer>>, 28 + actor_pools: HashMap<String, ActorStorage>, 29 + account_manager: Arc<RwLock<AccountManager>>, 30 + ) -> Result<()> { 31 + let DeleteRecordInput { 32 + repo, 33 + collection, 34 + rkey, 35 + swap_record, 36 + swap_commit, 37 + } = body; 38 + let account = account_manager 39 + .read() 40 + .await 41 + .get_account( 42 + &repo, 43 + Some(AvailabilityFlags { 44 + include_deactivated: Some(true), 45 + include_taken_down: None, 46 + }), 47 + ) 48 + .await?; 49 + match account { 50 + None => bail!("Could not find repo: `{repo}`"), 51 + Some(account) if account.deactivated_at.is_some() => bail!("Account is deactivated"), 52 + Some(account) => { 53 + let did = account.did; 54 + // if did != auth.access.credentials.unwrap().did.unwrap() { 55 + if did != user.did() { 56 + bail!("AuthRequiredError") 57 + } 58 + 59 + let swap_commit_cid = match swap_commit { 60 + Some(swap_commit) => Some(Cid::from_str(&swap_commit)?), 61 + None => None, 62 + }; 63 + let swap_record_cid = match swap_record { 64 + Some(swap_record) => Some(Cid::from_str(&swap_record)?), 65 + None => None, 66 + }; 67 + 68 + let write = prepare_delete(PrepareDeleteOpts { 69 + did: did.clone(), 70 + collection, 71 + rkey, 72 + swap_cid: swap_record_cid, 73 + })?; 74 + let mut actor_store = ActorStore::from_actor_pools(&did, &actor_pools).await; 75 + let write_at_uri: AtUri = write.uri.clone().try_into()?; 76 + let record = actor_store 77 + .record 78 + .get_record(&write_at_uri, None, Some(true)) 79 + .await?; 80 + let commit = match record { 81 + None => return Ok(()), // No-op if record already doesn't exist 82 + Some(_) => { 83 + actor_store 84 + .process_writes(vec![PreparedWrite::Delete(write.clone())], swap_commit_cid) 85 + .await? 86 + } 87 + }; 88 + 89 + _ = sequencer 90 + .write() 91 + .await 92 + .sequence_commit(did.clone(), commit.clone()) 93 + .await?; 94 + account_manager 95 + .write() 96 + .await 97 + .update_repo_root( 98 + did, 99 + commit.commit_data.cid, 100 + commit.commit_data.rev, 101 + &actor_pools, 102 + ) 103 + .await?; 104 + 105 + Ok(()) 106 + } 107 + } 108 + } 109 + 110 + /// Delete a repository record, or ensure it doesn't exist. Requires auth, implemented by PDS. 111 + /// - POST /xrpc/com.atproto.repo.deleteRecord 112 + /// ### Request Body 113 + /// - `repo`: `at-identifier` // The handle or DID of the repo (aka, current account). 114 + /// - `collection`: `nsid` // The NSID of the record collection. 115 + /// - `rkey`: `string` // The record key. <= 512 characters. 116 + /// - `swap_record`: `boolean` // Compare and swap with the previous record by CID. 117 + /// - `swap_commit`: `cid` // Compare and swap with the previous commit by CID. 118 + /// ### Responses 119 + /// - 200 OK: {"commit": {"cid": "string","rev": "string"}} 120 + /// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`, `InvalidSwap`]} 121 + /// - 401 Unauthorized 122 + #[axum::debug_handler(state = AppState)] 123 + pub async fn delete_record( 124 + user: AuthenticatedUser, 125 + State(db_actors): State<HashMap<String, ActorStorage, RandomState>>, 126 + State(account_manager): State<Arc<RwLock<AccountManager>>>, 127 + State(sequencer): State<Arc<RwLock<Sequencer>>>, 128 + Json(body): Json<DeleteRecordInput>, 129 + ) -> Result<(), ApiError> { 130 + match inner_delete_record(body, user, sequencer, db_actors, account_manager).await { 131 + Ok(()) => Ok(()), 132 + Err(error) => { 133 + tracing::error!("@LOG: ERROR: {error}"); 134 + Err(ApiError::RuntimeError) 135 + } 136 + } 137 + }
+78
src/apis/com/atproto/repo/describe_repo.rs
··· 1 + //! Get information about an account and repository, including the list of collections. Does not require auth. 2 + use crate::account_manager::AccountManager; 3 + use crate::serve::ActorStorage; 4 + use crate::{actor_store::ActorStore, error::ApiError, serve::AppState}; 5 + use anyhow::{Result, bail}; 6 + use axum::extract::Query; 7 + use axum::{Json, extract::State}; 8 + use rsky_identity::IdResolver; 9 + use rsky_identity::types::DidDocument; 10 + use rsky_lexicon::com::atproto::repo::DescribeRepoOutput; 11 + use rsky_syntax::handle::INVALID_HANDLE; 12 + use std::collections::HashMap; 13 + use std::hash::RandomState; 14 + use std::sync::Arc; 15 + use tokio::sync::RwLock; 16 + 17 + async fn inner_describe_repo( 18 + repo: String, 19 + id_resolver: Arc<RwLock<IdResolver>>, 20 + actor_pools: HashMap<String, ActorStorage>, 21 + account_manager: Arc<RwLock<AccountManager>>, 22 + ) -> Result<DescribeRepoOutput> { 23 + let account = account_manager 24 + .read() 25 + .await 26 + .get_account(&repo, None) 27 + .await?; 28 + match account { 29 + None => bail!("Cound not find user: `{repo}`"), 30 + Some(account) => { 31 + let mut lock = id_resolver.write().await; 32 + let did_doc: DidDocument = match lock.did.ensure_resolve(&account.did, None).await { 33 + Err(err) => bail!("Could not resolve DID: `{err}`"), 34 + Ok(res) => res, 35 + }; 36 + let handle = rsky_common::get_handle(&did_doc); 37 + let handle_is_correct = handle == account.handle; 38 + 39 + let actor_store = 40 + ActorStore::from_actor_pools(&account.did.clone(), &actor_pools).await; 41 + let collections = actor_store.record.list_collections().await?; 42 + 43 + Ok(DescribeRepoOutput { 44 + handle: account.handle.unwrap_or(INVALID_HANDLE.to_string()), 45 + did: account.did, 46 + did_doc: serde_json::to_value(did_doc)?, 47 + collections, 48 + handle_is_correct, 49 + }) 50 + } 51 + } 52 + } 53 + 54 + /// Get information about an account and repository, including the list of collections. Does not require auth. 55 + /// - GET /xrpc/com.atproto.repo.describeRepo 56 + /// ### Query Parameters 57 + /// - `repo`: `at-identifier` // The handle or DID of the repo. 58 + /// ### Responses 59 + /// - 200 OK: {"handle": "string","did": "string","didDoc": {},"collections": [string],"handleIsCorrect": true} \ 60 + /// handeIsCorrect - boolean - Indicates if handle is currently valid (resolves bi-directionally) 61 + /// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`]} 62 + /// - 401 Unauthorized 63 + #[tracing::instrument(skip_all)] 64 + #[axum::debug_handler(state = AppState)] 65 + pub async fn describe_repo( 66 + Query(input): Query<atrium_api::com::atproto::repo::describe_repo::ParametersData>, 67 + State(db_actors): State<HashMap<String, ActorStorage, RandomState>>, 68 + State(account_manager): State<Arc<RwLock<AccountManager>>>, 69 + State(id_resolver): State<Arc<RwLock<IdResolver>>>, 70 + ) -> Result<Json<DescribeRepoOutput>, ApiError> { 71 + match inner_describe_repo(input.repo.into(), id_resolver, db_actors, account_manager).await { 72 + Ok(res) => Ok(Json(res)), 73 + Err(error) => { 74 + tracing::error!("{error:?}"); 75 + Err(ApiError::RuntimeError) 76 + } 77 + } 78 + }
+35
src/apis/com/atproto/repo/ex.rs
··· 1 + //! 2 + use crate::account_manager::AccountManager; 3 + use crate::serve::ActorStorage; 4 + use crate::{actor_store::ActorStore, error::ApiError, serve::AppState}; 5 + use anyhow::{Result, bail}; 6 + use axum::extract::Query; 7 + use axum::{Json, extract::State}; 8 + use rsky_identity::IdResolver; 9 + use rsky_pds::sequencer::Sequencer; 10 + use std::collections::HashMap; 11 + use std::hash::RandomState; 12 + use std::sync::Arc; 13 + use tokio::sync::RwLock; 14 + 15 + async fn fun( 16 + actor_pools: HashMap<String, ActorStorage>, 17 + account_manager: Arc<RwLock<AccountManager>>, 18 + id_resolver: Arc<RwLock<IdResolver>>, 19 + sequencer: Arc<RwLock<Sequencer>>, 20 + ) -> Result<_> { 21 + todo!(); 22 + } 23 + 24 + /// 25 + #[tracing::instrument(skip_all)] 26 + #[axum::debug_handler(state = AppState)] 27 + pub async fn fun( 28 + Query(input): Query<atrium_api::com::atproto::repo::describe_repo::ParametersData>, 29 + State(db_actors): State<HashMap<String, ActorStorage, RandomState>>, 30 + State(account_manager): State<Arc<RwLock<AccountManager>>>, 31 + State(id_resolver): State<Arc<RwLock<IdResolver>>>, 32 + State(sequencer): State<Arc<RwLock<Sequencer>>>, 33 + ) -> Result<Json<_>, ApiError> { 34 + todo!(); 35 + }
+121
src/apis/com/atproto/repo/get_record.rs
··· 1 + //! Get a single record from a repository. Does not require auth. 2 + use crate::account_manager::AccountManager; 3 + use crate::serve::ActorStorage; 4 + use crate::{actor_store::ActorStore, error::ApiError, serve::AppState}; 5 + use anyhow::{Result, bail}; 6 + use axum::extract::Query; 7 + use axum::{Json, extract::State}; 8 + use rsky_identity::IdResolver; 9 + use rsky_lexicon::com::atproto::repo::GetRecordOutput; 10 + use rsky_pds::pipethrough::{OverrideOpts, ProxyRequest, pipethrough}; 11 + use rsky_pds::sequencer::Sequencer; 12 + use rsky_syntax::aturi::AtUri; 13 + use std::collections::HashMap; 14 + use std::hash::RandomState; 15 + use std::sync::Arc; 16 + use tokio::sync::RwLock; 17 + 18 + async fn inner_get_record( 19 + repo: String, 20 + collection: String, 21 + rkey: String, 22 + cid: Option<String>, 23 + // req: ProxyRequest<'_>, 24 + actor_pools: HashMap<String, ActorStorage>, 25 + account_manager: Arc<RwLock<AccountManager>>, 26 + ) -> Result<GetRecordOutput> { 27 + let did = account_manager 28 + .read() 29 + .await 30 + .get_did_for_actor(&repo, None) 31 + .await?; 32 + 33 + // fetch from pds if available, if not then fetch from appview 34 + if let Some(did) = did { 35 + let uri = AtUri::make(did.clone(), Some(collection), Some(rkey))?; 36 + 37 + let mut actor_store = ActorStore::from_actor_pools(&did, &actor_pools).await; 38 + 39 + match actor_store.record.get_record(&uri, cid, None).await { 40 + Ok(Some(record)) if record.takedown_ref.is_none() => Ok(GetRecordOutput { 41 + uri: uri.to_string(), 42 + cid: Some(record.cid), 43 + value: serde_json::to_value(record.value)?, 44 + }), 45 + _ => bail!("Could not locate record: `{uri}`"), 46 + } 47 + } else { 48 + // match req.cfg.bsky_app_view { 49 + // None => bail!("Could not locate record"), 50 + // Some(_) => match pipethrough( 51 + // &req, 52 + // None, 53 + // OverrideOpts { 54 + // aud: None, 55 + // lxm: None, 56 + // }, 57 + // ) 58 + // .await 59 + // { 60 + // Err(error) => { 61 + // tracing::error!("@LOG: ERROR: {error}"); 62 + bail!("Could not locate record") 63 + // } 64 + // Ok(res) => { 65 + // let output: GetRecordOutput = serde_json::from_slice(res.buffer.as_slice())?; 66 + // Ok(output) 67 + // } 68 + // }, 69 + // } 70 + } 71 + } 72 + 73 + /// Get a single record from a repository. Does not require auth. 74 + /// - GET /xrpc/com.atproto.repo.getRecord 75 + /// ### Query Parameters 76 + /// - `repo`: `at-identifier` // The handle or DID of the repo. 77 + /// - `collection`: `nsid` // The NSID of the record collection. 78 + /// - `rkey`: `string` // The record key. <= 512 characters. 79 + /// - `cid`: `cid` // The CID of the version of the record. If not specified, then return the most recent version. 80 + /// ### Responses 81 + /// - 200 OK: {"uri": "string","cid": "string","value": {}} 82 + /// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`, `RecordNotFound`]} 83 + /// - 401 Unauthorized 84 + #[tracing::instrument(skip_all)] 85 + #[axum::debug_handler(state = AppState)] 86 + pub async fn get_record( 87 + Query(input): Query<ParametersData>, 88 + State(db_actors): State<HashMap<String, ActorStorage, RandomState>>, 89 + State(account_manager): State<Arc<RwLock<AccountManager>>>, 90 + ) -> Result<Json<GetRecordOutput>, ApiError> { 91 + let repo = input.repo; 92 + let collection = input.collection; 93 + let rkey = input.rkey; 94 + let cid = input.cid; 95 + // let req: ProxyRequest = todo!(); // TODO: Implement service proxy 96 + match inner_get_record( 97 + repo, 98 + collection, 99 + rkey, 100 + cid, 101 + // req, 102 + db_actors, 103 + account_manager, 104 + ) 105 + .await 106 + { 107 + Ok(res) => Ok(Json(res)), 108 + Err(error) => { 109 + tracing::error!("@LOG: ERROR: {error}"); 110 + Err(ApiError::RecordNotFound) 111 + } 112 + } 113 + } 114 + 115 + #[derive(serde::Deserialize, Debug)] 116 + pub struct ParametersData { 117 + pub cid: Option<String>, 118 + pub collection: String, 119 + pub repo: String, 120 + pub rkey: String, 121 + }
+244
src/apis/com/atproto/repo/import_repo.rs
··· 1 + use crate::account_manager::AccountManager; 2 + use crate::auth::AuthenticatedUser; 3 + use crate::serve::ActorStorage; 4 + use crate::{actor_store::ActorStore, error::ApiError, serve::AppState}; 5 + use anyhow::{Result, bail}; 6 + use axum::extract::Query; 7 + use axum::{Json, extract::State}; 8 + use cidv10::Cid; 9 + use futures::{StreamExt, stream}; 10 + use reqwest::header; 11 + use rsky_common::env::env_int; 12 + use rsky_identity::IdResolver; 13 + use rsky_pds::repo::prepare::{ 14 + PrepareCreateOpts, PrepareDeleteOpts, PrepareUpdateOpts, prepare_create, prepare_delete, 15 + prepare_update, 16 + }; 17 + use rsky_pds::sequencer::Sequencer; 18 + use rsky_repo::block_map::BlockMap; 19 + use rsky_repo::car::{CarWithRoot, read_stream_car_with_root}; 20 + use rsky_repo::parse::get_and_parse_record; 21 + use rsky_repo::repo::Repo; 22 + use rsky_repo::sync::consumer::{VerifyRepoInput, verify_diff}; 23 + use rsky_repo::types::{PreparedWrite, RecordWriteDescript, VerifiedDiff}; 24 + use serde::Deserialize; 25 + use std::collections::HashMap; 26 + use std::hash::RandomState; 27 + use std::num::NonZeroU64; 28 + use std::sync::Arc; 29 + use tokio::sync::RwLock; 30 + 31 + struct ImportRepoInput { 32 + car_with_root: CarWithRoot, 33 + } 34 + 35 + impl<'de> Deserialize<'de> for ImportRepoInput { 36 + fn deserialize<D>(deserializer: D) -> core::result::Result<Self, D::Error> 37 + where 38 + D: serde::Deserializer<'de>, 39 + { 40 + async fn from_data(req: &Request<'_>, data: Data<'_>) -> Result<CarWithRoot, ApiError> { 41 + let max_import_size = env_int("IMPORT_REPO_LIMIT").unwrap_or(100).megabytes(); 42 + match req.headers().get_one(header::CONTENT_LENGTH.as_ref()) { 43 + None => { 44 + let error = 45 + ApiError::InvalidRequest("Missing content-length header".to_string()); 46 + req.local_cache(|| Some(error.clone())); 47 + return Err(error); 48 + } 49 + Some(res) => match res.parse::<NonZeroU64>() { 50 + Ok(content_length) => { 51 + if content_length.get().bytes() > max_import_size { 52 + let error = ApiError::InvalidRequest(format!( 53 + "Content-Length is greater than maximum of {max_import_size}" 54 + )); 55 + req.local_cache(|| Some(error.clone())); 56 + return Err(error); 57 + } 58 + 59 + let import_datastream = data.open(content_length.get().bytes()); 60 + read_stream_car_with_root(import_datastream).await 61 + } 62 + Err(_error) => { 63 + tracing::error!("{}", format!("Error parsing content-length\n{_error}")); 64 + let error = 65 + ApiError::InvalidRequest("Error parsing content-length".to_string()); 66 + req.local_cache(|| Some(error.clone())); 67 + Err(error) 68 + } 69 + }, 70 + } 71 + } 72 + } 73 + } 74 + 75 + // #[rocket::async_trait] 76 + // impl<'r> FromData<'r> for ImportRepoInput { 77 + // type Error = ApiError; 78 + 79 + // #[tracing::instrument(skip_all)] 80 + // async fn from_data(req: &'r Request<'_>, data: Data<'r>) -> Outcome<'r, Self, Self::Error> { 81 + // let max_import_size = env_int("IMPORT_REPO_LIMIT").unwrap_or(100).megabytes(); 82 + // match req.headers().get_one(header::CONTENT_LENGTH.as_ref()) { 83 + // None => { 84 + // let error = ApiError::InvalidRequest("Missing content-length header".to_string()); 85 + // req.local_cache(|| Some(error.clone())); 86 + // Outcome::Error((Status::BadRequest, error)) 87 + // } 88 + // Some(res) => match res.parse::<NonZeroU64>() { 89 + // Ok(content_length) => { 90 + // if content_length.get().bytes() > max_import_size { 91 + // let error = ApiError::InvalidRequest(format!( 92 + // "Content-Length is greater than maximum of {max_import_size}" 93 + // )); 94 + // req.local_cache(|| Some(error.clone())); 95 + // return Outcome::Error((Status::BadRequest, error)); 96 + // } 97 + 98 + // let import_datastream = data.open(content_length.get().bytes()); 99 + // match read_stream_car_with_root(import_datastream).await { 100 + // Ok(car_with_root) => Outcome::Success(ImportRepoInput { car_with_root }), 101 + // Err(error) => { 102 + // let error = ApiError::InvalidRequest(error.to_string()); 103 + // req.local_cache(|| Some(error.clone())); 104 + // Outcome::Error((Status::BadRequest, error)) 105 + // } 106 + // } 107 + // } 108 + // Err(_error) => { 109 + // tracing::error!("{}", format!("Error parsing content-length\n{_error}")); 110 + // let error = 111 + // ApiError::InvalidRequest("Error parsing content-length".to_string()); 112 + // req.local_cache(|| Some(error.clone())); 113 + // Outcome::Error((Status::BadRequest, error)) 114 + // } 115 + // }, 116 + // } 117 + // } 118 + // } 119 + 120 + #[tracing::instrument(skip_all)] 121 + #[axum::debug_handler(state = AppState)] 122 + pub async fn import_repo( 123 + // auth: AccessFullImport, 124 + user: AuthenticatedUser, 125 + Query(import_repo_input): Query<ImportRepoInput>, 126 + State(actor_pools): State<HashMap<String, ActorStorage, RandomState>>, 127 + ) -> Result<(), ApiError> { 128 + // let requester = auth.access.credentials.unwrap().did.unwrap(); 129 + let requester = user.did(); 130 + let mut actor_store = ActorStore::from_actor_pools(&requester, &actor_pools).await; 131 + 132 + // Get current repo if it exists 133 + let curr_root: Option<Cid> = actor_store.get_repo_root().await; 134 + let curr_repo: Option<Repo> = match curr_root { 135 + None => None, 136 + Some(_root) => Some(Repo::load(actor_store.storage.clone(), curr_root).await?), 137 + }; 138 + 139 + // Process imported car 140 + let car_with_root = import_repo_input.car_with_root; 141 + 142 + // Get verified difference from current repo and imported repo 143 + let mut imported_blocks: BlockMap = car_with_root.blocks; 144 + let imported_root: Cid = car_with_root.root; 145 + let opts = VerifyRepoInput { 146 + ensure_leaves: Some(false), 147 + }; 148 + 149 + let diff: VerifiedDiff = match verify_diff( 150 + curr_repo, 151 + &mut imported_blocks, 152 + imported_root, 153 + None, 154 + None, 155 + Some(opts), 156 + ) 157 + .await 158 + { 159 + Ok(res) => res, 160 + Err(error) => { 161 + tracing::error!("{:?}", error); 162 + return Err(ApiError::RuntimeError); 163 + } 164 + }; 165 + 166 + let commit_data = diff.commit; 167 + let prepared_writes: Vec<PreparedWrite> = 168 + prepare_import_repo_writes(requester, diff.writes, &imported_blocks).await?; 169 + match actor_store 170 + .process_import_repo(commit_data, prepared_writes) 171 + .await 172 + { 173 + Ok(_res) => {} 174 + Err(error) => { 175 + tracing::error!("Error importing repo\n{error}"); 176 + return Err(ApiError::RuntimeError); 177 + } 178 + } 179 + 180 + Ok(()) 181 + } 182 + 183 + /// Converts list of RecordWriteDescripts into a list of PreparedWrites 184 + async fn prepare_import_repo_writes( 185 + _did: String, 186 + writes: Vec<RecordWriteDescript>, 187 + blocks: &BlockMap, 188 + ) -> Result<Vec<PreparedWrite>, ApiError> { 189 + match stream::iter(writes) 190 + .then(|write| { 191 + let did = _did.clone(); 192 + async move { 193 + Ok::<PreparedWrite, anyhow::Error>(match write { 194 + RecordWriteDescript::Create(write) => { 195 + let parsed_record = get_and_parse_record(blocks, write.cid)?; 196 + PreparedWrite::Create( 197 + prepare_create(PrepareCreateOpts { 198 + did: did.clone(), 199 + collection: write.collection, 200 + rkey: Some(write.rkey), 201 + swap_cid: None, 202 + record: parsed_record.record, 203 + validate: Some(true), 204 + }) 205 + .await?, 206 + ) 207 + } 208 + RecordWriteDescript::Update(write) => { 209 + let parsed_record = get_and_parse_record(blocks, write.cid)?; 210 + PreparedWrite::Update( 211 + prepare_update(PrepareUpdateOpts { 212 + did: did.clone(), 213 + collection: write.collection, 214 + rkey: write.rkey, 215 + swap_cid: None, 216 + record: parsed_record.record, 217 + validate: Some(true), 218 + }) 219 + .await?, 220 + ) 221 + } 222 + RecordWriteDescript::Delete(write) => { 223 + PreparedWrite::Delete(prepare_delete(PrepareDeleteOpts { 224 + did: did.clone(), 225 + collection: write.collection, 226 + rkey: write.rkey, 227 + swap_cid: None, 228 + })?) 229 + } 230 + }) 231 + } 232 + }) 233 + .collect::<Vec<_>>() 234 + .await 235 + .into_iter() 236 + .collect::<anyhow::Result<Vec<PreparedWrite>, _>>() 237 + { 238 + Ok(res) => Ok(res), 239 + Err(error) => { 240 + tracing::error!("Error preparing import repo writes\n{error}"); 241 + Err(ApiError::RuntimeError) 242 + } 243 + } 244 + }
+31 -15
src/apis/com/atproto/repo/mod.rs
··· 1 1 use atrium_api::com::atproto::repo; 2 - use axum::{Router, routing::post}; 2 + use axum::{ 3 + Router, 4 + routing::{get, post}, 5 + }; 3 6 use constcat::concat; 4 7 5 8 use crate::serve::AppState; 6 9 7 10 pub mod apply_writes; 8 - // pub mod create_record; 9 - // pub mod delete_record; 10 - // pub mod describe_repo; 11 - // pub mod get_record; 11 + pub mod create_record; 12 + pub mod delete_record; 13 + pub mod describe_repo; 14 + pub mod get_record; 12 15 // pub mod import_repo; 13 16 // pub mod list_missing_blobs; 14 17 // pub mod list_records; ··· 29 32 /// - [ ] xx /xrpc/com.atproto.repo.importRepo 30 33 // - [ ] xx /xrpc/com.atproto.repo.listMissingBlobs 31 34 pub(crate) fn routes() -> Router<AppState> { 32 - Router::new().route( 33 - concat!("/", repo::apply_writes::NSID), 34 - post(apply_writes::apply_writes), 35 - ) 36 - // .route(concat!("/", repo::create_record::NSID), post(create_record)) 37 - // .route(concat!("/", repo::put_record::NSID), post(put_record)) 38 - // .route(concat!("/", repo::delete_record::NSID), post(delete_record)) 39 - // .route(concat!("/", repo::upload_blob::NSID), post(upload_blob)) 40 - // .route(concat!("/", repo::describe_repo::NSID), get(describe_repo)) 41 - // .route(concat!("/", repo::get_record::NSID), get(get_record)) 35 + Router::new() 36 + .route( 37 + concat!("/", repo::apply_writes::NSID), 38 + post(apply_writes::apply_writes), 39 + ) 40 + .route( 41 + concat!("/", repo::create_record::NSID), 42 + post(create_record::create_record), 43 + ) 44 + // .route(concat!("/", repo::put_record::NSID), post(put_record)) 45 + .route( 46 + concat!("/", repo::delete_record::NSID), 47 + post(delete_record::delete_record), 48 + ) 49 + // .route(concat!("/", repo::upload_blob::NSID), post(upload_blob)) 50 + .route( 51 + concat!("/", repo::describe_repo::NSID), 52 + get(describe_repo::describe_repo), 53 + ) 54 + .route( 55 + concat!("/", repo::get_record::NSID), 56 + get(get_record::get_record), 57 + ) 42 58 // .route(concat!("/", repo::import_repo::NSID), post(todo)) 43 59 // .route(concat!("/", repo::list_missing_blobs::NSID), get(todo)) 44 60 // .route(concat!("/", repo::list_records::NSID), get(list_records))
+6
src/error.rs
··· 228 228 } 229 229 } 230 230 231 + impl From<anyhow::Error> for ApiError { 232 + fn from(_value: anyhow::Error) -> Self { 233 + Self::RuntimeError 234 + } 235 + } 236 + 231 237 impl From<handle::errors::Error> for ApiError { 232 238 fn from(value: handle::errors::Error) -> Self { 233 239 match value.kind {
+28 -18
src/serve.rs
··· 14 14 use diesel_migrations::{EmbeddedMigrations, embed_migrations}; 15 15 use figment::{Figment, providers::Format as _}; 16 16 use http_cache_reqwest::{CacheMode, HttpCacheOptions, MokaManager}; 17 + use rsky_common::env::env_list; 18 + use rsky_identity::IdResolver; 19 + use rsky_identity::types::{DidCache, IdentityResolverOpts}; 20 + use rsky_pds::SharedIdResolver; 17 21 use rsky_pds::{crawlers::Crawlers, sequencer::Sequencer}; 18 22 use serde::{Deserialize, Serialize}; 23 + use std::env; 19 24 use std::{ 20 25 net::{IpAddr, Ipv4Addr, SocketAddr}, 21 26 path::PathBuf, ··· 32 37 33 38 /// Embedded migrations 34 39 pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations"); 40 + pub const MIGRATIONS_ACTOR: EmbeddedMigrations = embed_migrations!("./migrations_actor"); 35 41 36 42 /// The application-wide result type. 37 43 pub type Result<T> = std::result::Result<T, Error>; 38 44 /// The reqwest client type with middleware. 39 45 pub type Client = reqwest_middleware::ClientWithMiddleware; 40 - 41 - /// The Shared Sequencer which requests crawls from upstream relays and emits events to the firehose. 42 - pub struct SharedSequencer { 43 - /// The sequencer instance. 44 - pub sequencer: RwLock<Sequencer>, 45 - } 46 46 47 47 #[expect( 48 48 clippy::arbitrary_source_item_ordering, ··· 136 136 /// The simple HTTP client. 137 137 pub simple_client: reqwest::Client, 138 138 /// The firehose producer. 139 - pub sequencer: Arc<SharedSequencer>, 139 + pub sequencer: Arc<RwLock<Sequencer>>, 140 140 /// The account manager. 141 - pub account_manager: Arc<SharedAccountManager>, 141 + pub account_manager: Arc<RwLock<AccountManager>>, 142 + /// The ID resolver. 143 + pub id_resolver: Arc<RwLock<IdResolver>>, 142 144 143 145 /// The signing key. 144 146 pub signing_key: SigningKey, ··· 293 295 .iter() 294 296 .map(|s| s.to_string()) 295 297 .collect(); 296 - let sequencer = Arc::new(SharedSequencer { 297 - sequencer: RwLock::new(Sequencer::new( 298 - Crawlers::new(hostname, crawlers.clone()), 299 - None, 300 - )), 301 - }); 302 - let account_manager = SharedAccountManager { 303 - account_manager: RwLock::new(AccountManager::new(pool.clone())), 298 + let sequencer = Arc::new(RwLock::new(Sequencer::new( 299 + Crawlers::new(hostname, crawlers.clone()), 300 + None, 301 + ))); 302 + let account_manager = Arc::new(RwLock::new(AccountManager::new(pool.clone()))); 303 + let plc_url = if cfg!(debug_assertions) { 304 + "http://localhost:8000".to_owned() // dummy for debug 305 + } else { 306 + env::var("PDS_DID_PLC_URL").unwrap_or("https://plc.directory".to_owned()) // TODO: toml config 304 307 }; 308 + let id_resolver = Arc::new(RwLock::new(IdResolver::new(IdentityResolverOpts { 309 + timeout: None, 310 + plc_url: Some(plc_url), 311 + did_cache: Some(DidCache::new(None, None)), 312 + backup_nameservers: Some(env_list("PDS_HANDLE_BACKUP_NAMESERVERS")), 313 + }))); 305 314 306 315 let addr = config 307 316 .listen_address ··· 326 335 client: client.clone(), 327 336 simple_client, 328 337 sequencer: sequencer.clone(), 329 - account_manager: Arc::new(account_manager), 338 + account_manager, 339 + id_resolver, 330 340 signing_key: skey, 331 341 rotation_key: rkey, 332 342 }); ··· 406 416 info!("debug mode: not requesting crawl"); 407 417 } else { 408 418 info!("requesting crawl from upstream relays"); 409 - let mut background_sequencer = sequencer.sequencer.write().await.clone(); 419 + let mut background_sequencer = sequencer.write().await.clone(); 410 420 drop(tokio::spawn( 411 421 async move { background_sequencer.start().await }, 412 422 ));