Alternative ATProto PDS implementation

Compare changes

Choose any two refs to compare.

+2
Cargo.lock
··· 1310 "reqwest 0.12.15", 1311 "reqwest-middleware", 1312 "rsky-common", 1313 "rsky-lexicon", 1314 "rsky-pds", 1315 "rsky-repo", ··· 1325 "tower-http", 1326 "tracing", 1327 "tracing-subscriber", 1328 "url", 1329 "urlencoding", 1330 "uuid 1.16.0",
··· 1310 "reqwest 0.12.15", 1311 "reqwest-middleware", 1312 "rsky-common", 1313 + "rsky-identity", 1314 "rsky-lexicon", 1315 "rsky-pds", 1316 "rsky-repo", ··· 1326 "tower-http", 1327 "tracing", 1328 "tracing-subscriber", 1329 + "ubyte", 1330 "url", 1331 "urlencoding", 1332 "uuid 1.16.0",
+2
Cargo.toml
··· 158 rsky-pds = { git = "https://github.com/blacksky-algorithms/rsky.git" } 159 rsky-common = { git = "https://github.com/blacksky-algorithms/rsky.git" } 160 rsky-lexicon = { git = "https://github.com/blacksky-algorithms/rsky.git" } 161 162 # async in streams 163 # async-stream = "0.3" ··· 268 "sqlite", 269 "tracing", 270 ] }
··· 158 rsky-pds = { git = "https://github.com/blacksky-algorithms/rsky.git" } 159 rsky-common = { git = "https://github.com/blacksky-algorithms/rsky.git" } 160 rsky-lexicon = { git = "https://github.com/blacksky-algorithms/rsky.git" } 161 + rsky-identity = { git = "https://github.com/blacksky-algorithms/rsky.git" } 162 163 # async in streams 164 # async-stream = "0.3" ··· 269 "sqlite", 270 "tracing", 271 ] } 272 + ubyte = "0.10.4"
+31 -118
README.md
··· 11 \/_/ 12 ``` 13 14 - This is an implementation of an ATProto PDS, built with [Axum](https://github.com/tokio-rs/axum) and [Atrium](https://github.com/sugyan/atrium). 15 - This PDS implementation uses a SQLite database to store private account information and file storage to store canonical user data. 16 17 Heavily inspired by David Buchanan's [millipds](https://github.com/DavidBuchanan314/millipds). 18 - This implementation forked from the [azure-rust-app](https://github.com/DrChat/azure-rust-app) starter template and the upstream [DrChat/bluepds](https://github.com/DrChat/bluepds). 19 - See TODO below for this fork's changes from upstream. 20 21 If you want to see this fork in action, there is a live account hosted by this PDS at [@teq.shatteredsky.net](https://bsky.app/profile/teq.shatteredsky.net)! 22 23 > [!WARNING] 24 - > This PDS is undergoing heavy development. Do _NOT_ use this to host your primary account or any important data! 25 26 ## Quick Start 27 ``` ··· 43 - Size: 47 GB 44 - VPUs/GB: 10 45 46 - This is about half of the 3,000 OCPU hours and 18,000 GB hours available per month for free on the VM.Standard.A1.Flex shape. This is _without_ optimizing for costs. The PDS can likely be made much cheaper. 47 - 48 - ## Code map 49 - ``` 50 - * migrations/ - SQLite database migrations 51 - * src/ 52 - * endpoints/ - ATProto API endpoints 53 - * auth.rs - Authentication primitives 54 - * config.rs - Application configuration 55 - * did.rs - Decentralized Identifier helpers 56 - * error.rs - Axum error helpers 57 - * firehose.rs - ATProto firehose producer 58 - * main.rs - Main entrypoint 59 - * metrics.rs - Definitions for telemetry instruments 60 - * oauth.rs - OAuth routes 61 - * plc.rs - Functionality to access the Public Ledger of Credentials 62 - * storage.rs - Helpers to access user repository storage 63 - ``` 64 65 ## To-do 66 - ### Teq's fork 67 - - [ ] OAuth 68 - - [X] `/.well-known/oauth-protected-resource` - Authorization Server Metadata 69 - - [X] `/.well-known/oauth-authorization-server` 70 - - [X] `/par` - Pushed Authorization Request 71 - - [X] `/client-metadata.json` - Client metadata discovery 72 - - [X] `/oauth/authorize` 73 - - [X] `/oauth/authorize/sign-in` 74 - - [X] `/oauth/token` 75 - - [ ] Authorization flow - Backend client 76 - - [X] Authorization flow - Serverless browser app 77 - - [ ] DPoP-Nonce 78 - - [ ] Verify JWT signature with JWK 79 - - [ ] Email verification 80 - - [ ] 2FA 81 - - [ ] Admin endpoints 82 - - [ ] App passwords 83 - - [X] `listRecords` fixes 84 - - [X] Fix collection prefixing (terminate with `/`) 85 - - [X] Fix cursor handling (return `cid` instead of `key`) 86 - - [X] Session management (JWT) 87 - - [X] Match token fields to reference implementation 88 - - [X] RefreshSession from Bluesky Client 89 - - [X] Respond with JSON error message `ExpiredToken` 90 - - [X] Cursor handling 91 - - [X] Implement time-based unix microsecond sequences 92 - - [X] Startup with present cursor 93 - - [X] Respond `RecordNotFound`, required for: 94 - - [X] app.bsky.feed.postgate 95 - - [X] app.bsky.feed.threadgate 96 - - [ ] app.bsky... (profile creation?) 97 - - [X] Linting 98 - - [X] Rustfmt 99 - - [X] warnings 100 - - [X] deprecated-safe 101 - - [X] future-incompatible 102 - - [X] keyword-idents 103 - - [X] let-underscore 104 - - [X] nonstandard-style 105 - - [X] refining-impl-trait 106 - - [X] rust-2018-idioms 107 - - [X] rust-2018/2021/2024-compatibility 108 - - [X] ungrouped 109 - - [X] Clippy 110 - - [X] nursery 111 - - [X] correctness 112 - - [X] suspicious 113 - - [X] complexity 114 - - [X] perf 115 - - [X] style 116 - - [X] pedantic 117 - - [X] cargo 118 - - [X] ungrouped 119 - 120 - ### High-level features 121 - - [ ] Storage backend abstractions 122 - - [ ] Azure blob storage backend 123 - - [ ] Backblaze b2(?) 124 - - [ ] Telemetry 125 - - [X] [Metrics](https://github.com/metrics-rs/metrics) (counters/gauges/etc) 126 - - [X] Exporters for common backends (Prometheus/etc) 127 - 128 ### APIs 129 - - [X] [Service proxying](https://atproto.com/specs/xrpc#service-proxying) 130 - - [X] UG /xrpc/_health (undocumented, but impl by reference PDS) 131 <!-- - [ ] xx /xrpc/app.bsky.notification.registerPush 132 - app.bsky.actor 133 - - [X] AG /xrpc/app.bsky.actor.getPreferences 134 - [ ] xx /xrpc/app.bsky.actor.getProfile 135 - [ ] xx /xrpc/app.bsky.actor.getProfiles 136 - - [X] AP /xrpc/app.bsky.actor.putPreferences 137 - app.bsky.feed 138 - [ ] xx /xrpc/app.bsky.feed.getActorLikes 139 - [ ] xx /xrpc/app.bsky.feed.getAuthorFeed ··· 157 - com.atproto.identity 158 - [ ] xx /xrpc/com.atproto.identity.getRecommendedDidCredentials 159 - [ ] AP /xrpc/com.atproto.identity.requestPlcOperationSignature 160 - - [X] UG /xrpc/com.atproto.identity.resolveHandle 161 - [ ] AP /xrpc/com.atproto.identity.signPlcOperation 162 - [ ] xx /xrpc/com.atproto.identity.submitPlcOperation 163 - - [X] AP /xrpc/com.atproto.identity.updateHandle 164 <!-- - com.atproto.moderation 165 - [ ] xx /xrpc/com.atproto.moderation.createReport --> 166 - com.atproto.repo ··· 169 - [X] AP /xrpc/com.atproto.repo.deleteRecord 170 - [X] UG /xrpc/com.atproto.repo.describeRepo 171 - [X] UG /xrpc/com.atproto.repo.getRecord 172 - - [ ] xx /xrpc/com.atproto.repo.importRepo 173 - - [ ] xx /xrpc/com.atproto.repo.listMissingBlobs 174 - [X] UG /xrpc/com.atproto.repo.listRecords 175 - [X] AP /xrpc/com.atproto.repo.putRecord 176 - [X] AP /xrpc/com.atproto.repo.uploadBlob ··· 178 - [ ] xx /xrpc/com.atproto.server.activateAccount 179 - [ ] xx /xrpc/com.atproto.server.checkAccountStatus 180 - [ ] xx /xrpc/com.atproto.server.confirmEmail 181 - - [X] UP /xrpc/com.atproto.server.createAccount 182 - [ ] xx /xrpc/com.atproto.server.createAppPassword 183 - - [X] AP /xrpc/com.atproto.server.createInviteCode 184 - [ ] xx /xrpc/com.atproto.server.createInviteCodes 185 - - [X] UP /xrpc/com.atproto.server.createSession 186 - [ ] xx /xrpc/com.atproto.server.deactivateAccount 187 - [ ] xx /xrpc/com.atproto.server.deleteAccount 188 - [ ] xx /xrpc/com.atproto.server.deleteSession 189 - - [X] UG /xrpc/com.atproto.server.describeServer 190 - [ ] xx /xrpc/com.atproto.server.getAccountInviteCodes 191 - - [X] AG /xrpc/com.atproto.server.getServiceAuth 192 - - [X] AG /xrpc/com.atproto.server.getSession 193 - [ ] xx /xrpc/com.atproto.server.listAppPasswords 194 - [ ] xx /xrpc/com.atproto.server.refreshSession 195 - [ ] xx /xrpc/com.atproto.server.requestAccountDelete ··· 201 - [ ] xx /xrpc/com.atproto.server.revokeAppPassword 202 - [ ] xx /xrpc/com.atproto.server.updateEmail 203 - com.atproto.sync 204 - - [X] UG /xrpc/com.atproto.sync.getBlob 205 - - [X] UG /xrpc/com.atproto.sync.getBlocks 206 - - [X] UG /xrpc/com.atproto.sync.getLatestCommit 207 - - [X] UG /xrpc/com.atproto.sync.getRecord 208 - - [X] UG /xrpc/com.atproto.sync.getRepo 209 - - [X] UG /xrpc/com.atproto.sync.getRepoStatus 210 - - [X] UG /xrpc/com.atproto.sync.listBlobs 211 - - [X] UG /xrpc/com.atproto.sync.listRepos 212 - - [X] UG /xrpc/com.atproto.sync.subscribeRepos 213 214 - ## Quick Deployment (Azure CLI) 215 - ``` 216 - az group create --name "webapp" --location southcentralus 217 - az deployment group create --resource-group "webapp" --template-file .\deployment.bicep --parameters webAppName=testapp 218 - 219 - az acr login --name <insert name of ACR resource here> 220 - docker build -t <ACR>.azurecr.io/testapp:latest . 221 - docker push <ACR>.azurecr.io/testapp:latest 222 - ``` 223 - ## Quick Deployment (NixOS) 224 ```nix 225 { 226 inputs = {
··· 11 \/_/ 12 ``` 13 14 + This is an implementation of an ATProto PDS, built with [Axum](https://github.com/tokio-rs/axum), [rsky](https://github.com/blacksky-algorithms/rsky/) and [Atrium](https://github.com/sugyan/atrium). 15 + This PDS implementation uses a SQLite database and [diesel.rs](https://diesel.rs/) ORM to store canonical user data, and file system storage to store user blobs. 16 17 Heavily inspired by David Buchanan's [millipds](https://github.com/DavidBuchanan314/millipds). 18 + This implementation forked from [DrChat/bluepds](https://github.com/DrChat/bluepds), and now makes heavy use of the [rsky-repo](https://github.com/blacksky-algorithms/rsky/tree/main/rsky-repo) repository implementation. 19 + The `actor_store` and `account_manager` modules have been reimplemented from [rsky-pds](https://github.com/blacksky-algorithms/rsky/tree/main/rsky-pds) to use a SQLite backend and file storage, which are themselves adapted from the [original Bluesky implementation](https://github.com/bluesky-social/atproto) using SQLite in Typescript. 20 + 21 22 If you want to see this fork in action, there is a live account hosted by this PDS at [@teq.shatteredsky.net](https://bsky.app/profile/teq.shatteredsky.net)! 23 24 > [!WARNING] 25 + > This PDS is undergoing heavy development, and this branch is not at an operable release. Do _NOT_ use this to host your primary account or any important data! 26 27 ## Quick Start 28 ``` ··· 44 - Size: 47 GB 45 - VPUs/GB: 10 46 47 + This is about half of the 3,000 OCPU hours and 18,000 GB hours available per month for free on the VM.Standard.A1.Flex shape. This is _without_ optimizing for costs. The PDS can likely be made to run on much less resources. 48 49 ## To-do 50 ### APIs 51 + - [ ] [Service proxying](https://atproto.com/specs/xrpc#service-proxying) 52 + - [ ] UG /xrpc/_health (undocumented, but impl by reference PDS) 53 <!-- - [ ] xx /xrpc/app.bsky.notification.registerPush 54 - app.bsky.actor 55 + - [ ] AG /xrpc/app.bsky.actor.getPreferences 56 - [ ] xx /xrpc/app.bsky.actor.getProfile 57 - [ ] xx /xrpc/app.bsky.actor.getProfiles 58 + - [ ] AP /xrpc/app.bsky.actor.putPreferences 59 - app.bsky.feed 60 - [ ] xx /xrpc/app.bsky.feed.getActorLikes 61 - [ ] xx /xrpc/app.bsky.feed.getAuthorFeed ··· 79 - com.atproto.identity 80 - [ ] xx /xrpc/com.atproto.identity.getRecommendedDidCredentials 81 - [ ] AP /xrpc/com.atproto.identity.requestPlcOperationSignature 82 + - [ ] UG /xrpc/com.atproto.identity.resolveHandle 83 - [ ] AP /xrpc/com.atproto.identity.signPlcOperation 84 - [ ] xx /xrpc/com.atproto.identity.submitPlcOperation 85 + - [ ] AP /xrpc/com.atproto.identity.updateHandle 86 <!-- - com.atproto.moderation 87 - [ ] xx /xrpc/com.atproto.moderation.createReport --> 88 - com.atproto.repo ··· 91 - [X] AP /xrpc/com.atproto.repo.deleteRecord 92 - [X] UG /xrpc/com.atproto.repo.describeRepo 93 - [X] UG /xrpc/com.atproto.repo.getRecord 94 + - [X] xx /xrpc/com.atproto.repo.importRepo 95 + - [X] xx /xrpc/com.atproto.repo.listMissingBlobs 96 - [X] UG /xrpc/com.atproto.repo.listRecords 97 - [X] AP /xrpc/com.atproto.repo.putRecord 98 - [X] AP /xrpc/com.atproto.repo.uploadBlob ··· 100 - [ ] xx /xrpc/com.atproto.server.activateAccount 101 - [ ] xx /xrpc/com.atproto.server.checkAccountStatus 102 - [ ] xx /xrpc/com.atproto.server.confirmEmail 103 + - [ ] UP /xrpc/com.atproto.server.createAccount 104 - [ ] xx /xrpc/com.atproto.server.createAppPassword 105 + - [ ] AP /xrpc/com.atproto.server.createInviteCode 106 - [ ] xx /xrpc/com.atproto.server.createInviteCodes 107 + - [ ] UP /xrpc/com.atproto.server.createSession 108 - [ ] xx /xrpc/com.atproto.server.deactivateAccount 109 - [ ] xx /xrpc/com.atproto.server.deleteAccount 110 - [ ] xx /xrpc/com.atproto.server.deleteSession 111 + - [ ] UG /xrpc/com.atproto.server.describeServer 112 - [ ] xx /xrpc/com.atproto.server.getAccountInviteCodes 113 + - [ ] AG /xrpc/com.atproto.server.getServiceAuth 114 + - [ ] AG /xrpc/com.atproto.server.getSession 115 - [ ] xx /xrpc/com.atproto.server.listAppPasswords 116 - [ ] xx /xrpc/com.atproto.server.refreshSession 117 - [ ] xx /xrpc/com.atproto.server.requestAccountDelete ··· 123 - [ ] xx /xrpc/com.atproto.server.revokeAppPassword 124 - [ ] xx /xrpc/com.atproto.server.updateEmail 125 - com.atproto.sync 126 + - [ ] UG /xrpc/com.atproto.sync.getBlob 127 + - [ ] UG /xrpc/com.atproto.sync.getBlocks 128 + - [ ] UG /xrpc/com.atproto.sync.getLatestCommit 129 + - [ ] UG /xrpc/com.atproto.sync.getRecord 130 + - [ ] UG /xrpc/com.atproto.sync.getRepo 131 + - [ ] UG /xrpc/com.atproto.sync.getRepoStatus 132 + - [ ] UG /xrpc/com.atproto.sync.listBlobs 133 + - [ ] UG /xrpc/com.atproto.sync.listRepos 134 + - [ ] UG /xrpc/com.atproto.sync.subscribeRepos 135 136 + ## Deployment (NixOS) 137 ```nix 138 { 139 inputs = {
+6 -4
src/actor_store/blob.rs
··· 6 7 use crate::models::actor_store as models; 8 use anyhow::{Result, bail}; 9 use cidv10::Cid; 10 use diesel::dsl::{count_distinct, exists, not}; 11 use diesel::sql_types::{Integer, Nullable, Text}; ··· 138 pub async fn upload_blob_and_get_metadata( 139 &self, 140 user_suggested_mime: String, 141 - blob: Vec<u8>, 142 ) -> Result<BlobMetadata> { 143 let bytes = blob; 144 let size = bytes.len() as i64; 145 146 let (temp_key, sha256, img_info, sniffed_mime) = try_join!( 147 self.blobstore.put_temp(bytes.clone()), 148 - sha256_stream(bytes.clone()), 149 - image::maybe_get_info(bytes.clone()), 150 - image::mime_type_from_bytes(bytes.clone()) 151 )?; 152 153 let cid = sha256_raw_to_cid(sha256);
··· 6 7 use crate::models::actor_store as models; 8 use anyhow::{Result, bail}; 9 + use axum::body::Bytes; 10 use cidv10::Cid; 11 use diesel::dsl::{count_distinct, exists, not}; 12 use diesel::sql_types::{Integer, Nullable, Text}; ··· 139 pub async fn upload_blob_and_get_metadata( 140 &self, 141 user_suggested_mime: String, 142 + blob: Bytes, 143 ) -> Result<BlobMetadata> { 144 let bytes = blob; 145 let size = bytes.len() as i64; 146 147 let (temp_key, sha256, img_info, sniffed_mime) = try_join!( 148 self.blobstore.put_temp(bytes.clone()), 149 + // TODO: reimpl funcs to use Bytes instead of Vec<u8> 150 + sha256_stream(bytes.to_vec()), 151 + image::maybe_get_info(bytes.to_vec()), 152 + image::mime_type_from_bytes(bytes.to_vec()) 153 )?; 154 155 let cid = sha256_raw_to_cid(sha256);
+8 -7
src/actor_store/blob_fs.rs
··· 1 //! File system implementation of blob storage 2 //! Based on the S3 implementation but using local file system instead 3 use anyhow::Result; 4 use cidv10::Cid; 5 use rsky_common::get_random_str; 6 use rsky_repo::error::BlobError; ··· 12 13 /// ByteStream implementation for blob data 14 pub struct ByteStream { 15 - pub bytes: Vec<u8>, 16 } 17 18 impl ByteStream { 19 /// Create a new ByteStream with the given bytes 20 - pub const fn new(bytes: Vec<u8>) -> Self { 21 Self { bytes } 22 } 23 24 /// Collect the bytes from the stream 25 - pub async fn collect(self) -> Result<Vec<u8>> { 26 Ok(self.bytes) 27 } 28 } ··· 99 } 100 101 /// Store a blob temporarily 102 - pub async fn put_temp(&self, bytes: Vec<u8>) -> Result<String> { 103 let key = self.gen_key(); 104 let temp_path = self.get_tmp_path(&key); 105 ··· 142 } 143 144 /// Store a blob directly as permanent 145 - pub async fn put_permanent(&self, cid: Cid, bytes: Vec<u8>) -> Result<()> { 146 let target_path = self.get_stored_path(cid); 147 148 // Ensure the directory exists ··· 188 let blob_path = self.get_stored_path(cid); 189 190 match async_fs::read(&blob_path).await { 191 - Ok(bytes) => Ok(ByteStream::new(bytes)), 192 Err(e) => { 193 error!("Failed to read blob at path {:?}: {}", blob_path, e); 194 Err(anyhow::Error::new(BlobError::BlobNotFoundError)) ··· 197 } 198 199 /// Get blob bytes 200 - pub async fn get_bytes(&self, cid: Cid) -> Result<Vec<u8>> { 201 let stream = self.get_object(cid).await?; 202 stream.collect().await 203 }
··· 1 //! File system implementation of blob storage 2 //! Based on the S3 implementation but using local file system instead 3 use anyhow::Result; 4 + use axum::body::Bytes; 5 use cidv10::Cid; 6 use rsky_common::get_random_str; 7 use rsky_repo::error::BlobError; ··· 13 14 /// ByteStream implementation for blob data 15 pub struct ByteStream { 16 + pub bytes: Bytes, 17 } 18 19 impl ByteStream { 20 /// Create a new ByteStream with the given bytes 21 + pub const fn new(bytes: Bytes) -> Self { 22 Self { bytes } 23 } 24 25 /// Collect the bytes from the stream 26 + pub async fn collect(self) -> Result<Bytes> { 27 Ok(self.bytes) 28 } 29 } ··· 100 } 101 102 /// Store a blob temporarily 103 + pub async fn put_temp(&self, bytes: Bytes) -> Result<String> { 104 let key = self.gen_key(); 105 let temp_path = self.get_tmp_path(&key); 106 ··· 143 } 144 145 /// Store a blob directly as permanent 146 + pub async fn put_permanent(&self, cid: Cid, bytes: Bytes) -> Result<()> { 147 let target_path = self.get_stored_path(cid); 148 149 // Ensure the directory exists ··· 189 let blob_path = self.get_stored_path(cid); 190 191 match async_fs::read(&blob_path).await { 192 + Ok(bytes) => Ok(ByteStream::new(Bytes::from(bytes))), 193 Err(e) => { 194 error!("Failed to read blob at path {:?}: {}", blob_path, e); 195 Err(anyhow::Error::new(BlobError::BlobNotFoundError)) ··· 198 } 199 200 /// Get blob bytes 201 + pub async fn get_bytes(&self, cid: Cid) -> Result<Bytes> { 202 let stream = self.get_object(cid).await?; 203 stream.collect().await 204 }
+13 -33
src/apis/com/atproto/repo/apply_writes.rs
··· 1 //! Apply a batch transaction of repository creates, updates, and deletes. Requires auth, implemented by PDS. 2 - use crate::account_manager::AccountManager; 3 - use crate::account_manager::helpers::account::AvailabilityFlags; 4 - use crate::{ 5 - actor_store::ActorStore, 6 - auth::AuthenticatedUser, 7 - error::ApiError, 8 - serve::{ActorStorage, AppState}, 9 - }; 10 - use anyhow::{Result, bail}; 11 - use axum::{Json, extract::State}; 12 - use cidv10::Cid; 13 - use futures::stream::{self, StreamExt}; 14 - use rsky_lexicon::com::atproto::repo::{ApplyWritesInput, ApplyWritesInputRefWrite}; 15 - use rsky_pds::repo::prepare::{ 16 - PrepareCreateOpts, PrepareDeleteOpts, PrepareUpdateOpts, prepare_create, prepare_delete, 17 - prepare_update, 18 - }; 19 - use rsky_pds::sequencer::Sequencer; 20 - use rsky_repo::types::PreparedWrite; 21 - use std::str::FromStr; 22 - use tokio::sync::RwLock; 23 24 async fn inner_apply_writes( 25 body: ApplyWritesInput, 26 - user: AuthenticatedUser, 27 - sequencer: &RwLock<Sequencer>, 28 - actor_pools: std::collections::HashMap<String, ActorStorage>, 29 - account_manager: &RwLock<AccountManager>, 30 ) -> Result<()> { 31 let tx: ApplyWritesInput = body; 32 let ApplyWritesInput { ··· 52 bail!("Account is deactivated") 53 } 54 let did = account.did; 55 - if did != user.did() { 56 bail!("AuthRequiredError") 57 } 58 let did: &String = &did; ··· 61 } 62 63 let writes: Vec<PreparedWrite> = stream::iter(tx.writes) 64 - .then(|write| async move { 65 Ok::<PreparedWrite, anyhow::Error>(match write { 66 ApplyWritesInputRefWrite::Create(write) => PreparedWrite::Create( 67 prepare_create(PrepareCreateOpts { ··· 144 /// - `swap_commit`: `cid` // If provided, the entire operation will fail if the current repo commit CID does not match this value. Used to prevent conflicting repo mutations. 145 #[axum::debug_handler(state = AppState)] 146 pub(crate) async fn apply_writes( 147 - user: AuthenticatedUser, 148 - State(state): State<AppState>, 149 Json(body): Json<ApplyWritesInput>, 150 ) -> Result<(), ApiError> { 151 tracing::debug!("@LOG: debug apply_writes {body:#?}"); 152 - let db_actors = state.db_actors; 153 - let sequencer = &state.sequencer.sequencer; 154 - let account_manager = &state.account_manager.account_manager; 155 - match inner_apply_writes(body, user, sequencer, db_actors, account_manager).await { 156 Ok(()) => Ok(()), 157 Err(error) => { 158 tracing::error!("@LOG: ERROR: {error}");
··· 1 //! Apply a batch transaction of repository creates, updates, and deletes. Requires auth, implemented by PDS. 2 + 3 + use super::*; 4 5 async fn inner_apply_writes( 6 body: ApplyWritesInput, 7 + auth: AuthenticatedUser, 8 + sequencer: Arc<RwLock<Sequencer>>, 9 + actor_pools: HashMap<String, ActorStorage>, 10 + account_manager: Arc<RwLock<AccountManager>>, 11 ) -> Result<()> { 12 let tx: ApplyWritesInput = body; 13 let ApplyWritesInput { ··· 33 bail!("Account is deactivated") 34 } 35 let did = account.did; 36 + if did != auth.did() { 37 bail!("AuthRequiredError") 38 } 39 let did: &String = &did; ··· 42 } 43 44 let writes: Vec<PreparedWrite> = stream::iter(tx.writes) 45 + .then(async |write| { 46 Ok::<PreparedWrite, anyhow::Error>(match write { 47 ApplyWritesInputRefWrite::Create(write) => PreparedWrite::Create( 48 prepare_create(PrepareCreateOpts { ··· 125 /// - `swap_commit`: `cid` // If provided, the entire operation will fail if the current repo commit CID does not match this value. Used to prevent conflicting repo mutations. 126 #[axum::debug_handler(state = AppState)] 127 pub(crate) async fn apply_writes( 128 + auth: AuthenticatedUser, 129 + State(actor_pools): State<HashMap<String, ActorStorage, RandomState>>, 130 + State(account_manager): State<Arc<RwLock<AccountManager>>>, 131 + State(sequencer): State<Arc<RwLock<Sequencer>>>, 132 Json(body): Json<ApplyWritesInput>, 133 ) -> Result<(), ApiError> { 134 tracing::debug!("@LOG: debug apply_writes {body:#?}"); 135 + match inner_apply_writes(body, auth, sequencer, actor_pools, account_manager).await { 136 Ok(()) => Ok(()), 137 Err(error) => { 138 tracing::error!("@LOG: ERROR: {error}");
+140
src/apis/com/atproto/repo/create_record.rs
···
··· 1 + //! Create a single new repository record. Requires auth, implemented by PDS. 2 + 3 + use super::*; 4 + 5 + async fn inner_create_record( 6 + body: CreateRecordInput, 7 + user: AuthenticatedUser, 8 + sequencer: Arc<RwLock<Sequencer>>, 9 + actor_pools: HashMap<String, ActorStorage>, 10 + account_manager: Arc<RwLock<AccountManager>>, 11 + ) -> Result<CreateRecordOutput> { 12 + let CreateRecordInput { 13 + repo, 14 + collection, 15 + record, 16 + rkey, 17 + validate, 18 + swap_commit, 19 + } = body; 20 + let account = account_manager 21 + .read() 22 + .await 23 + .get_account( 24 + &repo, 25 + Some(AvailabilityFlags { 26 + include_deactivated: Some(true), 27 + include_taken_down: None, 28 + }), 29 + ) 30 + .await?; 31 + if let Some(account) = account { 32 + if account.deactivated_at.is_some() { 33 + bail!("Account is deactivated") 34 + } 35 + let did = account.did; 36 + // if did != auth.access.credentials.unwrap().did.unwrap() { 37 + if did != user.did() { 38 + bail!("AuthRequiredError") 39 + } 40 + let swap_commit_cid = match swap_commit { 41 + Some(swap_commit) => Some(Cid::from_str(&swap_commit)?), 42 + None => None, 43 + }; 44 + let write = prepare_create(PrepareCreateOpts { 45 + did: did.clone(), 46 + collection: collection.clone(), 47 + record: serde_json::from_value(record)?, 48 + rkey, 49 + validate, 50 + swap_cid: None, 51 + }) 52 + .await?; 53 + 54 + let did: &String = &did; 55 + let mut actor_store = ActorStore::from_actor_pools(did, &actor_pools).await; 56 + let backlink_conflicts: Vec<AtUri> = match validate { 57 + Some(true) => { 58 + let write_at_uri: AtUri = write.uri.clone().try_into()?; 59 + actor_store 60 + .record 61 + .get_backlink_conflicts(&write_at_uri, &write.record) 62 + .await? 63 + } 64 + _ => Vec::new(), 65 + }; 66 + 67 + let backlink_deletions: Vec<PreparedDelete> = backlink_conflicts 68 + .iter() 69 + .map(|at_uri| { 70 + prepare_delete(PrepareDeleteOpts { 71 + did: at_uri.get_hostname().to_string(), 72 + collection: at_uri.get_collection(), 73 + rkey: at_uri.get_rkey(), 74 + swap_cid: None, 75 + }) 76 + }) 77 + .collect::<Result<Vec<PreparedDelete>>>()?; 78 + let mut writes: Vec<PreparedWrite> = vec![PreparedWrite::Create(write.clone())]; 79 + for delete in backlink_deletions { 80 + writes.push(PreparedWrite::Delete(delete)); 81 + } 82 + let commit = actor_store 83 + .process_writes(writes.clone(), swap_commit_cid) 84 + .await?; 85 + 86 + _ = sequencer 87 + .write() 88 + .await 89 + .sequence_commit(did.clone(), commit.clone()) 90 + .await?; 91 + account_manager 92 + .write() 93 + .await 94 + .update_repo_root( 95 + did.to_string(), 96 + commit.commit_data.cid, 97 + commit.commit_data.rev, 98 + &actor_pools, 99 + ) 100 + .await?; 101 + 102 + Ok(CreateRecordOutput { 103 + uri: write.uri.clone(), 104 + cid: write.cid.to_string(), 105 + }) 106 + } else { 107 + bail!("Could not find repo: `{repo}`") 108 + } 109 + } 110 + 111 + /// Create a single new repository record. Requires auth, implemented by PDS. 112 + /// - POST /xrpc/com.atproto.repo.createRecord 113 + /// ### Request Body 114 + /// - `repo`: `at-identifier` // The handle or DID of the repo (aka, current account). 115 + /// - `collection`: `nsid` // The NSID of the record collection. 116 + /// - `rkey`: `string` // The record key. <= 512 characters. 117 + /// - `validate`: `boolean` // Can be set to 'false' to skip Lexicon schema validation of record data, 'true' to require it, or leave unset to validate only for known Lexicons. 118 + /// - `record` 119 + /// - `swap_commit`: `cid` // Compare and swap with the previous commit by CID. 120 + /// ### Responses 121 + /// - 200 OK: {`cid`: `cid`, `uri`: `at-uri`, `commit`: {`cid`: `cid`, `rev`: `tid`}, `validation_status`: [`valid`, `unknown`]} 122 + /// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`, `InvalidSwap`]} 123 + /// - 401 Unauthorized 124 + #[axum::debug_handler(state = AppState)] 125 + pub async fn create_record( 126 + user: AuthenticatedUser, 127 + State(db_actors): State<HashMap<String, ActorStorage, RandomState>>, 128 + State(account_manager): State<Arc<RwLock<AccountManager>>>, 129 + State(sequencer): State<Arc<RwLock<Sequencer>>>, 130 + Json(body): Json<CreateRecordInput>, 131 + ) -> Result<Json<CreateRecordOutput>, ApiError> { 132 + tracing::debug!("@LOG: debug create_record {body:#?}"); 133 + match inner_create_record(body, user, sequencer, db_actors, account_manager).await { 134 + Ok(res) => Ok(Json(res)), 135 + Err(error) => { 136 + tracing::error!("@LOG: ERROR: {error}"); 137 + Err(ApiError::RuntimeError) 138 + } 139 + } 140 + }
+117
src/apis/com/atproto/repo/delete_record.rs
···
··· 1 + //! Delete a repository record, or ensure it doesn't exist. Requires auth, implemented by PDS. 2 + use super::*; 3 + 4 + async fn inner_delete_record( 5 + body: DeleteRecordInput, 6 + user: AuthenticatedUser, 7 + sequencer: Arc<RwLock<Sequencer>>, 8 + actor_pools: HashMap<String, ActorStorage>, 9 + account_manager: Arc<RwLock<AccountManager>>, 10 + ) -> Result<()> { 11 + let DeleteRecordInput { 12 + repo, 13 + collection, 14 + rkey, 15 + swap_record, 16 + swap_commit, 17 + } = body; 18 + let account = account_manager 19 + .read() 20 + .await 21 + .get_account( 22 + &repo, 23 + Some(AvailabilityFlags { 24 + include_deactivated: Some(true), 25 + include_taken_down: None, 26 + }), 27 + ) 28 + .await?; 29 + match account { 30 + None => bail!("Could not find repo: `{repo}`"), 31 + Some(account) if account.deactivated_at.is_some() => bail!("Account is deactivated"), 32 + Some(account) => { 33 + let did = account.did; 34 + // if did != auth.access.credentials.unwrap().did.unwrap() { 35 + if did != user.did() { 36 + bail!("AuthRequiredError") 37 + } 38 + 39 + let swap_commit_cid = match swap_commit { 40 + Some(swap_commit) => Some(Cid::from_str(&swap_commit)?), 41 + None => None, 42 + }; 43 + let swap_record_cid = match swap_record { 44 + Some(swap_record) => Some(Cid::from_str(&swap_record)?), 45 + None => None, 46 + }; 47 + 48 + let write = prepare_delete(PrepareDeleteOpts { 49 + did: did.clone(), 50 + collection, 51 + rkey, 52 + swap_cid: swap_record_cid, 53 + })?; 54 + let mut actor_store = ActorStore::from_actor_pools(&did, &actor_pools).await; 55 + let write_at_uri: AtUri = write.uri.clone().try_into()?; 56 + let record = actor_store 57 + .record 58 + .get_record(&write_at_uri, None, Some(true)) 59 + .await?; 60 + let commit = match record { 61 + None => return Ok(()), // No-op if record already doesn't exist 62 + Some(_) => { 63 + actor_store 64 + .process_writes(vec![PreparedWrite::Delete(write.clone())], swap_commit_cid) 65 + .await? 66 + } 67 + }; 68 + 69 + _ = sequencer 70 + .write() 71 + .await 72 + .sequence_commit(did.clone(), commit.clone()) 73 + .await?; 74 + account_manager 75 + .write() 76 + .await 77 + .update_repo_root( 78 + did, 79 + commit.commit_data.cid, 80 + commit.commit_data.rev, 81 + &actor_pools, 82 + ) 83 + .await?; 84 + 85 + Ok(()) 86 + } 87 + } 88 + } 89 + 90 + /// Delete a repository record, or ensure it doesn't exist. Requires auth, implemented by PDS. 91 + /// - POST /xrpc/com.atproto.repo.deleteRecord 92 + /// ### Request Body 93 + /// - `repo`: `at-identifier` // The handle or DID of the repo (aka, current account). 94 + /// - `collection`: `nsid` // The NSID of the record collection. 95 + /// - `rkey`: `string` // The record key. <= 512 characters. 96 + /// - `swap_record`: `boolean` // Compare and swap with the previous record by CID. 97 + /// - `swap_commit`: `cid` // Compare and swap with the previous commit by CID. 98 + /// ### Responses 99 + /// - 200 OK: {"commit": {"cid": "string","rev": "string"}} 100 + /// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`, `InvalidSwap`]} 101 + /// - 401 Unauthorized 102 + #[axum::debug_handler(state = AppState)] 103 + pub async fn delete_record( 104 + user: AuthenticatedUser, 105 + State(db_actors): State<HashMap<String, ActorStorage, RandomState>>, 106 + State(account_manager): State<Arc<RwLock<AccountManager>>>, 107 + State(sequencer): State<Arc<RwLock<Sequencer>>>, 108 + Json(body): Json<DeleteRecordInput>, 109 + ) -> Result<(), ApiError> { 110 + match inner_delete_record(body, user, sequencer, db_actors, account_manager).await { 111 + Ok(()) => Ok(()), 112 + Err(error) => { 113 + tracing::error!("@LOG: ERROR: {error}"); 114 + Err(ApiError::RuntimeError) 115 + } 116 + } 117 + }
+70
src/apis/com/atproto/repo/describe_repo.rs
···
··· 1 + //! Get information about an account and repository, including the list of collections. Does not require auth. 2 + use super::*; 3 + 4 + async fn inner_describe_repo( 5 + repo: String, 6 + id_resolver: Arc<RwLock<IdResolver>>, 7 + actor_pools: HashMap<String, ActorStorage>, 8 + account_manager: Arc<RwLock<AccountManager>>, 9 + ) -> Result<DescribeRepoOutput> { 10 + let account = account_manager 11 + .read() 12 + .await 13 + .get_account(&repo, None) 14 + .await?; 15 + match account { 16 + None => bail!("Cound not find user: `{repo}`"), 17 + Some(account) => { 18 + let did_doc: DidDocument = match id_resolver 19 + .write() 20 + .await 21 + .did 22 + .ensure_resolve(&account.did, None) 23 + .await 24 + { 25 + Err(err) => bail!("Could not resolve DID: `{err}`"), 26 + Ok(res) => res, 27 + }; 28 + let handle = rsky_common::get_handle(&did_doc); 29 + let handle_is_correct = handle == account.handle; 30 + 31 + let actor_store = 32 + ActorStore::from_actor_pools(&account.did.clone(), &actor_pools).await; 33 + let collections = actor_store.record.list_collections().await?; 34 + 35 + Ok(DescribeRepoOutput { 36 + handle: account.handle.unwrap_or_else(|| INVALID_HANDLE.to_owned()), 37 + did: account.did, 38 + did_doc: serde_json::to_value(did_doc)?, 39 + collections, 40 + handle_is_correct, 41 + }) 42 + } 43 + } 44 + } 45 + 46 + /// Get information about an account and repository, including the list of collections. Does not require auth. 47 + /// - GET /xrpc/com.atproto.repo.describeRepo 48 + /// ### Query Parameters 49 + /// - `repo`: `at-identifier` // The handle or DID of the repo. 50 + /// ### Responses 51 + /// - 200 OK: {"handle": "string","did": "string","didDoc": {},"collections": [string],"handleIsCorrect": true} \ 52 + /// handeIsCorrect - boolean - Indicates if handle is currently valid (resolves bi-directionally) 53 + /// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`]} 54 + /// - 401 Unauthorized 55 + #[tracing::instrument(skip_all)] 56 + #[axum::debug_handler(state = AppState)] 57 + pub async fn describe_repo( 58 + Query(input): Query<atrium_repo::describe_repo::ParametersData>, 59 + State(db_actors): State<HashMap<String, ActorStorage, RandomState>>, 60 + State(account_manager): State<Arc<RwLock<AccountManager>>>, 61 + State(id_resolver): State<Arc<RwLock<IdResolver>>>, 62 + ) -> Result<Json<DescribeRepoOutput>, ApiError> { 63 + match inner_describe_repo(input.repo.into(), id_resolver, db_actors, account_manager).await { 64 + Ok(res) => Ok(Json(res)), 65 + Err(error) => { 66 + tracing::error!("{error:?}"); 67 + Err(ApiError::RuntimeError) 68 + } 69 + } 70 + }
+37
src/apis/com/atproto/repo/ex.rs
···
··· 1 + //! 2 + use crate::account_manager::AccountManager; 3 + use crate::serve::ActorStorage; 4 + use crate::{actor_store::ActorStore, error::ApiError, serve::AppState}; 5 + use anyhow::{Result, bail}; 6 + use axum::extract::Query; 7 + use axum::{Json, extract::State}; 8 + use rsky_identity::IdResolver; 9 + use rsky_pds::sequencer::Sequencer; 10 + use std::collections::HashMap; 11 + use std::hash::RandomState; 12 + use std::sync::Arc; 13 + use tokio::sync::RwLock; 14 + 15 + async fn fun( 16 + actor_pools: HashMap<String, ActorStorage>, 17 + account_manager: Arc<RwLock<AccountManager>>, 18 + id_resolver: Arc<RwLock<IdResolver>>, 19 + sequencer: Arc<RwLock<Sequencer>>, 20 + ) -> Result<_> { 21 + todo!(); 22 + } 23 + 24 + /// 25 + #[tracing::instrument(skip_all)] 26 + #[axum::debug_handler(state = AppState)] 27 + pub async fn fun( 28 + auth: AuthenticatedUser, 29 + Query(input): Query<atrium_api::com::atproto::repo::describe_repo::ParametersData>, 30 + State(actor_pools): State<HashMap<String, ActorStorage, RandomState>>, 31 + State(account_manager): State<Arc<RwLock<AccountManager>>>, 32 + State(id_resolver): State<Arc<RwLock<IdResolver>>>, 33 + State(sequencer): State<Arc<RwLock<Sequencer>>>, 34 + Json(body): Json<ApplyWritesInput>, 35 + ) -> Result<Json<_>, ApiError> { 36 + todo!(); 37 + }
+102
src/apis/com/atproto/repo/get_record.rs
···
··· 1 + //! Get a single record from a repository. Does not require auth. 2 + 3 + use crate::pipethrough::{ProxyRequest, pipethrough}; 4 + 5 + use super::*; 6 + 7 + use rsky_pds::pipethrough::OverrideOpts; 8 + 9 + async fn inner_get_record( 10 + repo: String, 11 + collection: String, 12 + rkey: String, 13 + cid: Option<String>, 14 + req: ProxyRequest, 15 + actor_pools: HashMap<String, ActorStorage>, 16 + account_manager: Arc<RwLock<AccountManager>>, 17 + ) -> Result<GetRecordOutput> { 18 + let did = account_manager 19 + .read() 20 + .await 21 + .get_did_for_actor(&repo, None) 22 + .await?; 23 + 24 + // fetch from pds if available, if not then fetch from appview 25 + if let Some(did) = did { 26 + let uri = AtUri::make(did.clone(), Some(collection), Some(rkey))?; 27 + 28 + let mut actor_store = ActorStore::from_actor_pools(&did, &actor_pools).await; 29 + 30 + match actor_store.record.get_record(&uri, cid, None).await { 31 + Ok(Some(record)) if record.takedown_ref.is_none() => Ok(GetRecordOutput { 32 + uri: uri.to_string(), 33 + cid: Some(record.cid), 34 + value: serde_json::to_value(record.value)?, 35 + }), 36 + _ => bail!("Could not locate record: `{uri}`"), 37 + } 38 + } else { 39 + match req.cfg.bsky_app_view { 40 + None => bail!("Could not locate record"), 41 + Some(_) => match pipethrough( 42 + &req, 43 + None, 44 + OverrideOpts { 45 + aud: None, 46 + lxm: None, 47 + }, 48 + ) 49 + .await 50 + { 51 + Err(error) => { 52 + tracing::error!("@LOG: ERROR: {error}"); 53 + bail!("Could not locate record") 54 + } 55 + Ok(res) => { 56 + let output: GetRecordOutput = serde_json::from_slice(res.buffer.as_slice())?; 57 + Ok(output) 58 + } 59 + }, 60 + } 61 + } 62 + } 63 + 64 + /// Get a single record from a repository. Does not require auth. 65 + /// - GET /xrpc/com.atproto.repo.getRecord 66 + /// ### Query Parameters 67 + /// - `repo`: `at-identifier` // The handle or DID of the repo. 68 + /// - `collection`: `nsid` // The NSID of the record collection. 69 + /// - `rkey`: `string` // The record key. <= 512 characters. 70 + /// - `cid`: `cid` // The CID of the version of the record. If not specified, then return the most recent version. 71 + /// ### Responses 72 + /// - 200 OK: {"uri": "string","cid": "string","value": {}} 73 + /// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`, `RecordNotFound`]} 74 + /// - 401 Unauthorized 75 + #[tracing::instrument(skip_all)] 76 + #[axum::debug_handler(state = AppState)] 77 + pub async fn get_record( 78 + Query(input): Query<ParametersData>, 79 + State(db_actors): State<HashMap<String, ActorStorage, RandomState>>, 80 + State(account_manager): State<Arc<RwLock<AccountManager>>>, 81 + req: ProxyRequest, 82 + ) -> Result<Json<GetRecordOutput>, ApiError> { 83 + let repo = input.repo; 84 + let collection = input.collection; 85 + let rkey = input.rkey; 86 + let cid = input.cid; 87 + match inner_get_record(repo, collection, rkey, cid, req, db_actors, account_manager).await { 88 + Ok(res) => Ok(Json(res)), 89 + Err(error) => { 90 + tracing::error!("@LOG: ERROR: {error}"); 91 + Err(ApiError::RecordNotFound) 92 + } 93 + } 94 + } 95 + 96 + #[derive(serde::Deserialize, Debug)] 97 + pub struct ParametersData { 98 + pub cid: Option<String>, 99 + pub collection: String, 100 + pub repo: String, 101 + pub rkey: String, 102 + }
+183
src/apis/com/atproto/repo/import_repo.rs
···
··· 1 + use axum::{body::Bytes, http::HeaderMap}; 2 + use reqwest::header; 3 + use rsky_common::env::env_int; 4 + use rsky_repo::block_map::BlockMap; 5 + use rsky_repo::car::{CarWithRoot, read_stream_car_with_root}; 6 + use rsky_repo::parse::get_and_parse_record; 7 + use rsky_repo::repo::Repo; 8 + use rsky_repo::sync::consumer::{VerifyRepoInput, verify_diff}; 9 + use rsky_repo::types::{RecordWriteDescript, VerifiedDiff}; 10 + use ubyte::ToByteUnit; 11 + 12 + use super::*; 13 + 14 + async fn from_data(bytes: Bytes) -> Result<CarWithRoot, ApiError> { 15 + let max_import_size = env_int("IMPORT_REPO_LIMIT").unwrap_or(100).megabytes(); 16 + if bytes.len() > max_import_size { 17 + return Err(ApiError::InvalidRequest(format!( 18 + "Content-Length is greater than maximum of {max_import_size}" 19 + ))); 20 + } 21 + 22 + let mut cursor = std::io::Cursor::new(bytes); 23 + match read_stream_car_with_root(&mut cursor).await { 24 + Ok(car_with_root) => Ok(car_with_root), 25 + Err(error) => { 26 + tracing::error!("Error reading stream car with root\n{error}"); 27 + Err(ApiError::InvalidRequest("Invalid CAR file".to_owned())) 28 + } 29 + } 30 + } 31 + 32 + #[tracing::instrument(skip_all)] 33 + #[axum::debug_handler(state = AppState)] 34 + /// Import a repo in the form of a CAR file. Requires Content-Length HTTP header to be set. 35 + /// Request 36 + /// mime application/vnd.ipld.car 37 + /// Body - required 38 + pub async fn import_repo( 39 + // auth: AccessFullImport, 40 + auth: AuthenticatedUser, 41 + headers: HeaderMap, 42 + State(actor_pools): State<HashMap<String, ActorStorage, RandomState>>, 43 + body: Bytes, 44 + ) -> Result<(), ApiError> { 45 + // let requester = auth.access.credentials.unwrap().did.unwrap(); 46 + let requester = auth.did(); 47 + let mut actor_store = ActorStore::from_actor_pools(&requester, &actor_pools).await; 48 + 49 + // Check headers 50 + let content_length = headers 51 + .get(header::CONTENT_LENGTH) 52 + .expect("no content length provided") 53 + .to_str() 54 + .map_err(anyhow::Error::from) 55 + .and_then(|content_length| content_length.parse::<u64>().map_err(anyhow::Error::from)) 56 + .expect("invalid content-length header"); 57 + if content_length > env_int("IMPORT_REPO_LIMIT").unwrap_or(100).megabytes() { 58 + return Err(ApiError::InvalidRequest(format!( 59 + "Content-Length is greater than maximum of {}", 60 + env_int("IMPORT_REPO_LIMIT").unwrap_or(100).megabytes() 61 + ))); 62 + }; 63 + 64 + // Get current repo if it exists 65 + let curr_root: Option<Cid> = actor_store.get_repo_root().await; 66 + let curr_repo: Option<Repo> = match curr_root { 67 + None => None, 68 + Some(_root) => Some(Repo::load(actor_store.storage.clone(), curr_root).await?), 69 + }; 70 + 71 + // Process imported car 72 + // let car_with_root = import_repo_input.car_with_root; 73 + let car_with_root: CarWithRoot = match from_data(body).await { 74 + Ok(car) => car, 75 + Err(error) => { 76 + tracing::error!("Error importing repo\n{error:?}"); 77 + return Err(ApiError::InvalidRequest("Invalid CAR file".to_owned())); 78 + } 79 + }; 80 + 81 + // Get verified difference from current repo and imported repo 82 + let mut imported_blocks: BlockMap = car_with_root.blocks; 83 + let imported_root: Cid = car_with_root.root; 84 + let opts = VerifyRepoInput { 85 + ensure_leaves: Some(false), 86 + }; 87 + 88 + let diff: VerifiedDiff = match verify_diff( 89 + curr_repo, 90 + &mut imported_blocks, 91 + imported_root, 92 + None, 93 + None, 94 + Some(opts), 95 + ) 96 + .await 97 + { 98 + Ok(res) => res, 99 + Err(error) => { 100 + tracing::error!("{:?}", error); 101 + return Err(ApiError::RuntimeError); 102 + } 103 + }; 104 + 105 + let commit_data = diff.commit; 106 + let prepared_writes: Vec<PreparedWrite> = 107 + prepare_import_repo_writes(requester, diff.writes, &imported_blocks).await?; 108 + match actor_store 109 + .process_import_repo(commit_data, prepared_writes) 110 + .await 111 + { 112 + Ok(_res) => {} 113 + Err(error) => { 114 + tracing::error!("Error importing repo\n{error}"); 115 + return Err(ApiError::RuntimeError); 116 + } 117 + } 118 + 119 + Ok(()) 120 + } 121 + 122 + /// Converts list of RecordWriteDescripts into a list of PreparedWrites 123 + async fn prepare_import_repo_writes( 124 + did: String, 125 + writes: Vec<RecordWriteDescript>, 126 + blocks: &BlockMap, 127 + ) -> Result<Vec<PreparedWrite>, ApiError> { 128 + match stream::iter(writes) 129 + .then(|write| { 130 + let did = did.clone(); 131 + async move { 132 + Ok::<PreparedWrite, anyhow::Error>(match write { 133 + RecordWriteDescript::Create(write) => { 134 + let parsed_record = get_and_parse_record(blocks, write.cid)?; 135 + PreparedWrite::Create( 136 + prepare_create(PrepareCreateOpts { 137 + did: did.clone(), 138 + collection: write.collection, 139 + rkey: Some(write.rkey), 140 + swap_cid: None, 141 + record: parsed_record.record, 142 + validate: Some(true), 143 + }) 144 + .await?, 145 + ) 146 + } 147 + RecordWriteDescript::Update(write) => { 148 + let parsed_record = get_and_parse_record(blocks, write.cid)?; 149 + PreparedWrite::Update( 150 + prepare_update(PrepareUpdateOpts { 151 + did: did.clone(), 152 + collection: write.collection, 153 + rkey: write.rkey, 154 + swap_cid: None, 155 + record: parsed_record.record, 156 + validate: Some(true), 157 + }) 158 + .await?, 159 + ) 160 + } 161 + RecordWriteDescript::Delete(write) => { 162 + PreparedWrite::Delete(prepare_delete(PrepareDeleteOpts { 163 + did: did.clone(), 164 + collection: write.collection, 165 + rkey: write.rkey, 166 + swap_cid: None, 167 + })?) 168 + } 169 + }) 170 + } 171 + }) 172 + .collect::<Vec<_>>() 173 + .await 174 + .into_iter() 175 + .collect::<Result<Vec<PreparedWrite>, _>>() 176 + { 177 + Ok(res) => Ok(res), 178 + Err(error) => { 179 + tracing::error!("Error preparing import repo writes\n{error}"); 180 + Err(ApiError::RuntimeError) 181 + } 182 + } 183 + }
+48
src/apis/com/atproto/repo/list_missing_blobs.rs
···
··· 1 + //! Returns a list of missing blobs for the requesting account. Intended to be used in the account migration flow. 2 + use rsky_lexicon::com::atproto::repo::ListMissingBlobsOutput; 3 + use rsky_pds::actor_store::blob::ListMissingBlobsOpts; 4 + 5 + use super::*; 6 + 7 + /// Returns a list of missing blobs for the requesting account. Intended to be used in the account migration flow. 8 + /// Request 9 + /// Query Parameters 10 + /// limit integer 11 + /// Possible values: >= 1 and <= 1000 12 + /// Default value: 500 13 + /// cursor string 14 + /// Responses 15 + /// cursor string 16 + /// blobs object[] 17 + #[tracing::instrument(skip_all)] 18 + #[axum::debug_handler(state = AppState)] 19 + pub async fn list_missing_blobs( 20 + user: AuthenticatedUser, 21 + Query(input): Query<atrium_repo::list_missing_blobs::ParametersData>, 22 + State(actor_pools): State<HashMap<String, ActorStorage, RandomState>>, 23 + ) -> Result<Json<ListMissingBlobsOutput>, ApiError> { 24 + let cursor = input.cursor; 25 + let limit = input.limit; 26 + let default_limit: atrium_api::types::LimitedNonZeroU16<1000> = 27 + atrium_api::types::LimitedNonZeroU16::try_from(500).expect("default limit"); 28 + let limit: u16 = limit.unwrap_or(default_limit).into(); 29 + // let did = auth.access.credentials.unwrap().did.unwrap(); 30 + let did = user.did(); 31 + 32 + let actor_store = ActorStore::from_actor_pools(&did, &actor_pools).await; 33 + 34 + match actor_store 35 + .blob 36 + .list_missing_blobs(ListMissingBlobsOpts { cursor, limit }) 37 + .await 38 + { 39 + Ok(blobs) => { 40 + let cursor = blobs.last().map(|last_blob| last_blob.cid.clone()); 41 + Ok(Json(ListMissingBlobsOutput { cursor, blobs })) 42 + } 43 + Err(error) => { 44 + tracing::error!("{error:?}"); 45 + Err(ApiError::RuntimeError) 46 + } 47 + } 48 + }
+146
src/apis/com/atproto/repo/list_records.rs
···
··· 1 + //! List a range of records in a repository, matching a specific collection. Does not require auth. 2 + use super::*; 3 + 4 + // #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)] 5 + // #[serde(rename_all = "camelCase")] 6 + // /// Parameters for [`list_records`]. 7 + // pub(super) struct ListRecordsParameters { 8 + // ///The NSID of the record type. 9 + // pub collection: Nsid, 10 + // /// The cursor to start from. 11 + // #[serde(skip_serializing_if = "core::option::Option::is_none")] 12 + // pub cursor: Option<String>, 13 + // ///The number of records to return. 14 + // #[serde(skip_serializing_if = "core::option::Option::is_none")] 15 + // pub limit: Option<String>, 16 + // ///The handle or DID of the repo. 17 + // pub repo: AtIdentifier, 18 + // ///Flag to reverse the order of the returned records. 19 + // #[serde(skip_serializing_if = "core::option::Option::is_none")] 20 + // pub reverse: Option<bool>, 21 + // ///DEPRECATED: The highest sort-ordered rkey to stop at (exclusive) 22 + // #[serde(skip_serializing_if = "core::option::Option::is_none")] 23 + // pub rkey_end: Option<String>, 24 + // ///DEPRECATED: The lowest sort-ordered rkey to start from (exclusive) 25 + // #[serde(skip_serializing_if = "core::option::Option::is_none")] 26 + // pub rkey_start: Option<String>, 27 + // } 28 + 29 + #[expect(non_snake_case, clippy::too_many_arguments)] 30 + async fn inner_list_records( 31 + // The handle or DID of the repo. 32 + repo: String, 33 + // The NSID of the record type. 34 + collection: String, 35 + // The number of records to return. 36 + limit: u16, 37 + cursor: Option<String>, 38 + // DEPRECATED: The lowest sort-ordered rkey to start from (exclusive) 39 + rkeyStart: Option<String>, 40 + // DEPRECATED: The highest sort-ordered rkey to stop at (exclusive) 41 + rkeyEnd: Option<String>, 42 + // Flag to reverse the order of the returned records. 43 + reverse: bool, 44 + // The actor pools 45 + actor_pools: HashMap<String, ActorStorage>, 46 + account_manager: Arc<RwLock<AccountManager>>, 47 + ) -> Result<ListRecordsOutput> { 48 + if limit > 100 { 49 + bail!("Error: limit can not be greater than 100") 50 + } 51 + let did = account_manager 52 + .read() 53 + .await 54 + .get_did_for_actor(&repo, None) 55 + .await?; 56 + if let Some(did) = did { 57 + let mut actor_store = ActorStore::from_actor_pools(&did, &actor_pools).await; 58 + 59 + let records: Vec<Record> = actor_store 60 + .record 61 + .list_records_for_collection( 62 + collection, 63 + limit as i64, 64 + reverse, 65 + cursor, 66 + rkeyStart, 67 + rkeyEnd, 68 + None, 69 + ) 70 + .await? 71 + .into_iter() 72 + .map(|record| { 73 + Ok(Record { 74 + uri: record.uri.clone(), 75 + cid: record.cid.clone(), 76 + value: serde_json::to_value(record)?, 77 + }) 78 + }) 79 + .collect::<Result<Vec<Record>>>()?; 80 + 81 + let last_record = records.last(); 82 + let cursor: Option<String>; 83 + if let Some(last_record) = last_record { 84 + let last_at_uri: AtUri = last_record.uri.clone().try_into()?; 85 + cursor = Some(last_at_uri.get_rkey()); 86 + } else { 87 + cursor = None; 88 + } 89 + Ok(ListRecordsOutput { records, cursor }) 90 + } else { 91 + bail!("Could not find repo: {repo}") 92 + } 93 + } 94 + 95 + /// List a range of records in a repository, matching a specific collection. Does not require auth. 96 + /// - GET /xrpc/com.atproto.repo.listRecords 97 + /// ### Query Parameters 98 + /// - `repo`: `at-identifier` // The handle or DID of the repo. 99 + /// - `collection`: `nsid` // The NSID of the record type. 100 + /// - `limit`: `integer` // The maximum number of records to return. Default 50, >=1 and <=100. 101 + /// - `cursor`: `string` 102 + /// - `reverse`: `boolean` // Flag to reverse the order of the returned records. 103 + /// ### Responses 104 + /// - 200 OK: {"cursor": "string","records": [{"uri": "string","cid": "string","value": {}}]} 105 + /// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`]} 106 + /// - 401 Unauthorized 107 + #[tracing::instrument(skip_all)] 108 + #[allow(non_snake_case)] 109 + #[axum::debug_handler(state = AppState)] 110 + pub async fn list_records( 111 + Query(input): Query<atrium_repo::list_records::ParametersData>, 112 + State(actor_pools): State<HashMap<String, ActorStorage, RandomState>>, 113 + State(account_manager): State<Arc<RwLock<AccountManager>>>, 114 + ) -> Result<Json<ListRecordsOutput>, ApiError> { 115 + let repo = input.repo; 116 + let collection = input.collection; 117 + let limit: Option<u8> = input.limit.map(u8::from); 118 + let limit: Option<u16> = limit.map(|x| x.into()); 119 + let cursor = input.cursor; 120 + let reverse = input.reverse; 121 + let rkeyStart = None; 122 + let rkeyEnd = None; 123 + 124 + let limit = limit.unwrap_or(50); 125 + let reverse = reverse.unwrap_or(false); 126 + 127 + match inner_list_records( 128 + repo.into(), 129 + collection.into(), 130 + limit, 131 + cursor, 132 + rkeyStart, 133 + rkeyEnd, 134 + reverse, 135 + actor_pools, 136 + account_manager, 137 + ) 138 + .await 139 + { 140 + Ok(res) => Ok(Json(res)), 141 + Err(error) => { 142 + tracing::error!("@LOG: ERROR: {error}"); 143 + Err(ApiError::RuntimeError) 144 + } 145 + } 146 + }
+92 -26
src/apis/com/atproto/repo/mod.rs
··· 1 - use atrium_api::com::atproto::repo; 2 - use axum::{Router, routing::post}; 3 use constcat::concat; 4 5 - use crate::serve::AppState; 6 - 7 pub mod apply_writes; 8 - // pub mod create_record; 9 - // pub mod delete_record; 10 - // pub mod describe_repo; 11 - // pub mod get_record; 12 - // pub mod import_repo; 13 - // pub mod list_missing_blobs; 14 - // pub mod list_records; 15 - // pub mod put_record; 16 - // pub mod upload_blob; 17 18 /// These endpoints are part of the atproto PDS repository management APIs. \ 19 /// Requests usually require authentication (unlike the com.atproto.sync.* endpoints), and are made directly to the user's own PDS instance. ··· 29 /// - [ ] xx /xrpc/com.atproto.repo.importRepo 30 // - [ ] xx /xrpc/com.atproto.repo.listMissingBlobs 31 pub(crate) fn routes() -> Router<AppState> { 32 - Router::new().route( 33 - concat!("/", repo::apply_writes::NSID), 34 - post(apply_writes::apply_writes), 35 - ) 36 - // .route(concat!("/", repo::create_record::NSID), post(create_record)) 37 - // .route(concat!("/", repo::put_record::NSID), post(put_record)) 38 - // .route(concat!("/", repo::delete_record::NSID), post(delete_record)) 39 - // .route(concat!("/", repo::upload_blob::NSID), post(upload_blob)) 40 - // .route(concat!("/", repo::describe_repo::NSID), get(describe_repo)) 41 - // .route(concat!("/", repo::get_record::NSID), get(get_record)) 42 - // .route(concat!("/", repo::import_repo::NSID), post(todo)) 43 - // .route(concat!("/", repo::list_missing_blobs::NSID), get(todo)) 44 - // .route(concat!("/", repo::list_records::NSID), get(list_records)) 45 }
··· 1 + use atrium_api::com::atproto::repo as atrium_repo; 2 + use axum::{ 3 + Router, 4 + routing::{get, post}, 5 + }; 6 use constcat::concat; 7 8 pub mod apply_writes; 9 + pub mod create_record; 10 + pub mod delete_record; 11 + pub mod describe_repo; 12 + pub mod get_record; 13 + pub mod import_repo; 14 + pub mod list_missing_blobs; 15 + pub mod list_records; 16 + pub mod put_record; 17 + pub mod upload_blob; 18 + 19 + use crate::account_manager::AccountManager; 20 + use crate::account_manager::helpers::account::AvailabilityFlags; 21 + use crate::{ 22 + actor_store::ActorStore, 23 + auth::AuthenticatedUser, 24 + error::ApiError, 25 + serve::{ActorStorage, AppState}, 26 + }; 27 + use anyhow::{Result, bail}; 28 + use axum::extract::Query; 29 + use axum::{Json, extract::State}; 30 + use cidv10::Cid; 31 + use futures::stream::{self, StreamExt}; 32 + use rsky_identity::IdResolver; 33 + use rsky_identity::types::DidDocument; 34 + use rsky_lexicon::com::atproto::repo::DeleteRecordInput; 35 + use rsky_lexicon::com::atproto::repo::DescribeRepoOutput; 36 + use rsky_lexicon::com::atproto::repo::GetRecordOutput; 37 + use rsky_lexicon::com::atproto::repo::{ApplyWritesInput, ApplyWritesInputRefWrite}; 38 + use rsky_lexicon::com::atproto::repo::{CreateRecordInput, CreateRecordOutput}; 39 + use rsky_lexicon::com::atproto::repo::{ListRecordsOutput, Record}; 40 + // use rsky_pds::pipethrough::{OverrideOpts, ProxyRequest, pipethrough}; 41 + use rsky_pds::repo::prepare::{ 42 + PrepareCreateOpts, PrepareDeleteOpts, PrepareUpdateOpts, prepare_create, prepare_delete, 43 + prepare_update, 44 + }; 45 + use rsky_pds::sequencer::Sequencer; 46 + use rsky_repo::types::PreparedDelete; 47 + use rsky_repo::types::PreparedWrite; 48 + use rsky_syntax::aturi::AtUri; 49 + use rsky_syntax::handle::INVALID_HANDLE; 50 + use std::collections::HashMap; 51 + use std::hash::RandomState; 52 + use std::str::FromStr; 53 + use std::sync::Arc; 54 + use tokio::sync::RwLock; 55 56 /// These endpoints are part of the atproto PDS repository management APIs. \ 57 /// Requests usually require authentication (unlike the com.atproto.sync.* endpoints), and are made directly to the user's own PDS instance. ··· 67 /// - [ ] xx /xrpc/com.atproto.repo.importRepo 68 // - [ ] xx /xrpc/com.atproto.repo.listMissingBlobs 69 pub(crate) fn routes() -> Router<AppState> { 70 + Router::new() 71 + .route( 72 + concat!("/", atrium_repo::apply_writes::NSID), 73 + post(apply_writes::apply_writes), 74 + ) 75 + .route( 76 + concat!("/", atrium_repo::create_record::NSID), 77 + post(create_record::create_record), 78 + ) 79 + .route( 80 + concat!("/", atrium_repo::put_record::NSID), 81 + post(put_record::put_record), 82 + ) 83 + .route( 84 + concat!("/", atrium_repo::delete_record::NSID), 85 + post(delete_record::delete_record), 86 + ) 87 + .route( 88 + concat!("/", atrium_repo::upload_blob::NSID), 89 + post(upload_blob::upload_blob), 90 + ) 91 + .route( 92 + concat!("/", atrium_repo::describe_repo::NSID), 93 + get(describe_repo::describe_repo), 94 + ) 95 + .route( 96 + concat!("/", atrium_repo::get_record::NSID), 97 + get(get_record::get_record), 98 + ) 99 + .route( 100 + concat!("/", atrium_repo::import_repo::NSID), 101 + post(import_repo::import_repo), 102 + ) 103 + .route( 104 + concat!("/", atrium_repo::list_missing_blobs::NSID), 105 + get(list_missing_blobs::list_missing_blobs), 106 + ) 107 + .route( 108 + concat!("/", atrium_repo::list_records::NSID), 109 + get(list_records::list_records), 110 + ) 111 }
+157
src/apis/com/atproto/repo/put_record.rs
···
··· 1 + //! Write a repository record, creating or updating it as needed. Requires auth, implemented by PDS. 2 + use anyhow::bail; 3 + use rsky_lexicon::com::atproto::repo::{PutRecordInput, PutRecordOutput}; 4 + use rsky_repo::types::CommitDataWithOps; 5 + 6 + use super::*; 7 + 8 + #[tracing::instrument(skip_all)] 9 + async fn inner_put_record( 10 + body: PutRecordInput, 11 + auth: AuthenticatedUser, 12 + sequencer: Arc<RwLock<Sequencer>>, 13 + actor_pools: HashMap<String, ActorStorage>, 14 + account_manager: Arc<RwLock<AccountManager>>, 15 + ) -> Result<PutRecordOutput> { 16 + let PutRecordInput { 17 + repo, 18 + collection, 19 + rkey, 20 + validate, 21 + record, 22 + swap_record, 23 + swap_commit, 24 + } = body; 25 + let account = account_manager 26 + .read() 27 + .await 28 + .get_account( 29 + &repo, 30 + Some(AvailabilityFlags { 31 + include_deactivated: Some(true), 32 + include_taken_down: None, 33 + }), 34 + ) 35 + .await?; 36 + if let Some(account) = account { 37 + if account.deactivated_at.is_some() { 38 + bail!("Account is deactivated") 39 + } 40 + let did = account.did; 41 + // if did != auth.access.credentials.unwrap().did.unwrap() { 42 + if did != auth.did() { 43 + bail!("AuthRequiredError") 44 + } 45 + let uri = AtUri::make(did.clone(), Some(collection.clone()), Some(rkey.clone()))?; 46 + let swap_commit_cid = match swap_commit { 47 + Some(swap_commit) => Some(Cid::from_str(&swap_commit)?), 48 + None => None, 49 + }; 50 + let swap_record_cid = match swap_record { 51 + Some(swap_record) => Some(Cid::from_str(&swap_record)?), 52 + None => None, 53 + }; 54 + let (commit, write): (Option<CommitDataWithOps>, PreparedWrite) = { 55 + let mut actor_store = ActorStore::from_actor_pools(&did, &actor_pools).await; 56 + 57 + let current = actor_store 58 + .record 59 + .get_record(&uri, None, Some(true)) 60 + .await?; 61 + tracing::debug!("@LOG: debug inner_put_record, current: {current:?}"); 62 + let write: PreparedWrite = if current.is_some() { 63 + PreparedWrite::Update( 64 + prepare_update(PrepareUpdateOpts { 65 + did: did.clone(), 66 + collection, 67 + rkey, 68 + swap_cid: swap_record_cid, 69 + record: serde_json::from_value(record)?, 70 + validate, 71 + }) 72 + .await?, 73 + ) 74 + } else { 75 + PreparedWrite::Create( 76 + prepare_create(PrepareCreateOpts { 77 + did: did.clone(), 78 + collection, 79 + rkey: Some(rkey), 80 + swap_cid: swap_record_cid, 81 + record: serde_json::from_value(record)?, 82 + validate, 83 + }) 84 + .await?, 85 + ) 86 + }; 87 + 88 + match current { 89 + Some(current) if current.cid == write.cid().expect("write cid").to_string() => { 90 + (None, write) 91 + } 92 + _ => { 93 + let commit = actor_store 94 + .process_writes(vec![write.clone()], swap_commit_cid) 95 + .await?; 96 + (Some(commit), write) 97 + } 98 + } 99 + }; 100 + 101 + if let Some(commit) = commit { 102 + _ = sequencer 103 + .write() 104 + .await 105 + .sequence_commit(did.clone(), commit.clone()) 106 + .await?; 107 + account_manager 108 + .write() 109 + .await 110 + .update_repo_root( 111 + did, 112 + commit.commit_data.cid, 113 + commit.commit_data.rev, 114 + &actor_pools, 115 + ) 116 + .await?; 117 + } 118 + Ok(PutRecordOutput { 119 + uri: write.uri().to_string(), 120 + cid: write.cid().expect("write cid").to_string(), 121 + }) 122 + } else { 123 + bail!("Could not find repo: `{repo}`") 124 + } 125 + } 126 + 127 + /// Write a repository record, creating or updating it as needed. Requires auth, implemented by PDS. 128 + /// - POST /xrpc/com.atproto.repo.putRecord 129 + /// ### Request Body 130 + /// - `repo`: `at-identifier` // The handle or DID of the repo (aka, current account). 131 + /// - `collection`: `nsid` // The NSID of the record collection. 132 + /// - `rkey`: `string` // The record key. <= 512 characters. 133 + /// - `validate`: `boolean` // Can be set to 'false' to skip Lexicon schema validation of record data, 'true' to require it, or leave unset to validate only for known Lexicons. 134 + /// - `record` 135 + /// - `swap_record`: `boolean` // Compare and swap with the previous record by CID. WARNING: nullable and optional field; may cause problems with golang implementation 136 + /// - `swap_commit`: `cid` // Compare and swap with the previous commit by CID. 137 + /// ### Responses 138 + /// - 200 OK: {"uri": "string","cid": "string","commit": {"cid": "string","rev": "string"},"validationStatus": "valid | unknown"} 139 + /// - 400 Bad Request: {error:"`InvalidRequest` | `ExpiredToken` | `InvalidToken` | `InvalidSwap`"} 140 + /// - 401 Unauthorized 141 + #[tracing::instrument(skip_all)] 142 + pub async fn put_record( 143 + auth: AuthenticatedUser, 144 + State(sequencer): State<Arc<RwLock<Sequencer>>>, 145 + State(actor_pools): State<HashMap<String, ActorStorage, RandomState>>, 146 + State(account_manager): State<Arc<RwLock<AccountManager>>>, 147 + Json(body): Json<PutRecordInput>, 148 + ) -> Result<Json<PutRecordOutput>, ApiError> { 149 + tracing::debug!("@LOG: debug put_record {body:#?}"); 150 + match inner_put_record(body, auth, sequencer, actor_pools, account_manager).await { 151 + Ok(res) => Ok(Json(res)), 152 + Err(error) => { 153 + tracing::error!("@LOG: ERROR: {error}"); 154 + Err(ApiError::RuntimeError) 155 + } 156 + } 157 + }
-514
src/apis/com/atproto/repo/repo.rs
··· 1 - //! PDS repository endpoints /xrpc/com.atproto.repo.*) 2 - mod apply_writes; 3 - pub(crate) use apply_writes::apply_writes; 4 - 5 - use std::{collections::HashSet, str::FromStr}; 6 - 7 - use anyhow::{Context as _, anyhow}; 8 - use atrium_api::com::atproto::repo::apply_writes::{ 9 - self as atrium_apply_writes, InputWritesItem, OutputResultsItem, 10 - }; 11 - use atrium_api::{ 12 - com::atproto::repo::{self, defs::CommitMetaData}, 13 - types::{ 14 - LimitedU32, Object, TryFromUnknown as _, TryIntoUnknown as _, Unknown, 15 - string::{AtIdentifier, Nsid, Tid}, 16 - }, 17 - }; 18 - use atrium_repo::{Cid, blockstore::CarStore}; 19 - use axum::{ 20 - Json, Router, 21 - body::Body, 22 - extract::{Query, Request, State}, 23 - http::{self, StatusCode}, 24 - routing::{get, post}, 25 - }; 26 - use constcat::concat; 27 - use futures::TryStreamExt as _; 28 - use metrics::counter; 29 - use rsky_syntax::aturi::AtUri; 30 - use serde::Deserialize; 31 - use tokio::io::AsyncWriteExt as _; 32 - 33 - use crate::repo::block_map::cid_for_cbor; 34 - use crate::repo::types::PreparedCreateOrUpdate; 35 - use crate::{ 36 - AppState, Db, Error, Result, SigningKey, 37 - actor_store::{ActorStoreTransactor, ActorStoreWriter}, 38 - auth::AuthenticatedUser, 39 - config::AppConfig, 40 - error::ErrorMessage, 41 - firehose::{self, FirehoseProducer, RepoOp}, 42 - metrics::{REPO_COMMITS, REPO_OP_CREATE, REPO_OP_DELETE, REPO_OP_UPDATE}, 43 - repo::types::{PreparedWrite, WriteOpAction}, 44 - storage, 45 - }; 46 - 47 - #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)] 48 - #[serde(rename_all = "camelCase")] 49 - /// Parameters for [`list_records`]. 50 - pub(super) struct ListRecordsParameters { 51 - ///The NSID of the record type. 52 - pub collection: Nsid, 53 - /// The cursor to start from. 54 - #[serde(skip_serializing_if = "core::option::Option::is_none")] 55 - pub cursor: Option<String>, 56 - ///The number of records to return. 57 - #[serde(skip_serializing_if = "core::option::Option::is_none")] 58 - pub limit: Option<String>, 59 - ///The handle or DID of the repo. 60 - pub repo: AtIdentifier, 61 - ///Flag to reverse the order of the returned records. 62 - #[serde(skip_serializing_if = "core::option::Option::is_none")] 63 - pub reverse: Option<bool>, 64 - ///DEPRECATED: The highest sort-ordered rkey to stop at (exclusive) 65 - #[serde(skip_serializing_if = "core::option::Option::is_none")] 66 - pub rkey_end: Option<String>, 67 - ///DEPRECATED: The lowest sort-ordered rkey to start from (exclusive) 68 - #[serde(skip_serializing_if = "core::option::Option::is_none")] 69 - pub rkey_start: Option<String>, 70 - } 71 - 72 - /// Resolve DID to DID document. Does not bi-directionally verify handle. 73 - /// - GET /xrpc/com.atproto.repo.resolveDid 74 - /// ### Query Parameters 75 - /// - `did`: DID to resolve. 76 - /// ### Responses 77 - /// - 200 OK: {`did_doc`: `did_doc`} 78 - /// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`, `DidNotFound`, `DidDeactivated`]} 79 - async fn resolve_did( 80 - db: &Db, 81 - identifier: &AtIdentifier, 82 - ) -> anyhow::Result<( 83 - atrium_api::types::string::Did, 84 - atrium_api::types::string::Handle, 85 - )> { 86 - let (handle, did) = match *identifier { 87 - AtIdentifier::Handle(ref handle) => { 88 - let handle_as_str = &handle.as_str(); 89 - ( 90 - &handle.to_owned(), 91 - &atrium_api::types::string::Did::new( 92 - sqlx::query_scalar!( 93 - r#"SELECT did FROM handles WHERE handle = ?"#, 94 - handle_as_str 95 - ) 96 - .fetch_one(db) 97 - .await 98 - .context("failed to query did")?, 99 - ) 100 - .expect("should be valid DID"), 101 - ) 102 - } 103 - AtIdentifier::Did(ref did) => { 104 - let did_as_str = &did.as_str(); 105 - ( 106 - &atrium_api::types::string::Handle::new( 107 - sqlx::query_scalar!(r#"SELECT handle FROM handles WHERE did = ?"#, did_as_str) 108 - .fetch_one(db) 109 - .await 110 - .context("failed to query did")?, 111 - ) 112 - .expect("should be valid handle"), 113 - &did.to_owned(), 114 - ) 115 - } 116 - }; 117 - 118 - Ok((did.to_owned(), handle.to_owned())) 119 - } 120 - 121 - /// Create a single new repository record. Requires auth, implemented by PDS. 122 - /// - POST /xrpc/com.atproto.repo.createRecord 123 - /// ### Request Body 124 - /// - `repo`: `at-identifier` // The handle or DID of the repo (aka, current account). 125 - /// - `collection`: `nsid` // The NSID of the record collection. 126 - /// - `rkey`: `string` // The record key. <= 512 characters. 127 - /// - `validate`: `boolean` // Can be set to 'false' to skip Lexicon schema validation of record data, 'true' to require it, or leave unset to validate only for known Lexicons. 128 - /// - `record` 129 - /// - `swap_commit`: `cid` // Compare and swap with the previous commit by CID. 130 - /// ### Responses 131 - /// - 200 OK: {`cid`: `cid`, `uri`: `at-uri`, `commit`: {`cid`: `cid`, `rev`: `tid`}, `validation_status`: [`valid`, `unknown`]} 132 - /// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`, `InvalidSwap`]} 133 - /// - 401 Unauthorized 134 - async fn create_record( 135 - user: AuthenticatedUser, 136 - State(actor_store): State<ActorStore>, 137 - State(skey): State<SigningKey>, 138 - State(config): State<AppConfig>, 139 - State(db): State<Db>, 140 - State(fhp): State<FirehoseProducer>, 141 - Json(input): Json<repo::create_record::Input>, 142 - ) -> Result<Json<repo::create_record::Output>> { 143 - todo!(); 144 - // let write_result = apply_writes::apply_writes( 145 - // user, 146 - // State(actor_store), 147 - // State(skey), 148 - // State(config), 149 - // State(db), 150 - // State(fhp), 151 - // Json( 152 - // repo::apply_writes::InputData { 153 - // repo: input.repo.clone(), 154 - // validate: input.validate, 155 - // swap_commit: input.swap_commit.clone(), 156 - // writes: vec![repo::apply_writes::InputWritesItem::Create(Box::new( 157 - // repo::apply_writes::CreateData { 158 - // collection: input.collection.clone(), 159 - // rkey: input.rkey.clone(), 160 - // value: input.record.clone(), 161 - // } 162 - // .into(), 163 - // ))], 164 - // } 165 - // .into(), 166 - // ), 167 - // ) 168 - // .await 169 - // .context("failed to apply writes")?; 170 - 171 - // let create_result = if let repo::apply_writes::OutputResultsItem::CreateResult(create_result) = 172 - // write_result 173 - // .results 174 - // .clone() 175 - // .and_then(|result| result.first().cloned()) 176 - // .context("unexpected output from apply_writes")? 177 - // { 178 - // Some(create_result) 179 - // } else { 180 - // None 181 - // } 182 - // .context("unexpected result from apply_writes")?; 183 - 184 - // Ok(Json( 185 - // repo::create_record::OutputData { 186 - // cid: create_result.cid.clone(), 187 - // commit: write_result.commit.clone(), 188 - // uri: create_result.uri.clone(), 189 - // validation_status: Some("unknown".to_owned()), 190 - // } 191 - // .into(), 192 - // )) 193 - } 194 - 195 - /// Write a repository record, creating or updating it as needed. Requires auth, implemented by PDS. 196 - /// - POST /xrpc/com.atproto.repo.putRecord 197 - /// ### Request Body 198 - /// - `repo`: `at-identifier` // The handle or DID of the repo (aka, current account). 199 - /// - `collection`: `nsid` // The NSID of the record collection. 200 - /// - `rkey`: `string` // The record key. <= 512 characters. 201 - /// - `validate`: `boolean` // Can be set to 'false' to skip Lexicon schema validation of record data, 'true' to require it, or leave unset to validate only for known Lexicons. 202 - /// - `record` 203 - /// - `swap_record`: `boolean` // Compare and swap with the previous record by CID. WARNING: nullable and optional field; may cause problems with golang implementation 204 - /// - `swap_commit`: `cid` // Compare and swap with the previous commit by CID. 205 - /// ### Responses 206 - /// - 200 OK: {"uri": "string","cid": "string","commit": {"cid": "string","rev": "string"},"validationStatus": "valid | unknown"} 207 - /// - 400 Bad Request: {error:"`InvalidRequest` | `ExpiredToken` | `InvalidToken` | `InvalidSwap`"} 208 - /// - 401 Unauthorized 209 - async fn put_record( 210 - user: AuthenticatedUser, 211 - State(actor_store): State<ActorStore>, 212 - State(skey): State<SigningKey>, 213 - State(config): State<AppConfig>, 214 - State(db): State<Db>, 215 - State(fhp): State<FirehoseProducer>, 216 - Json(input): Json<repo::put_record::Input>, 217 - ) -> Result<Json<repo::put_record::Output>> { 218 - todo!(); 219 - // // TODO: `input.swap_record` 220 - // // FIXME: "put" implies that we will create the record if it does not exist. 221 - // // We currently only update existing records and/or throw an error if one doesn't exist. 222 - // let input = (*input).clone(); 223 - // let input = repo::apply_writes::InputData { 224 - // repo: input.repo, 225 - // validate: input.validate, 226 - // swap_commit: input.swap_commit, 227 - // writes: vec![repo::apply_writes::InputWritesItem::Update(Box::new( 228 - // repo::apply_writes::UpdateData { 229 - // collection: input.collection, 230 - // rkey: input.rkey, 231 - // value: input.record, 232 - // } 233 - // .into(), 234 - // ))], 235 - // } 236 - // .into(); 237 - 238 - // let write_result = apply_writes::apply_writes( 239 - // user, 240 - // State(actor_store), 241 - // State(skey), 242 - // State(config), 243 - // State(db), 244 - // State(fhp), 245 - // Json(input), 246 - // ) 247 - // .await 248 - // .context("failed to apply writes")?; 249 - 250 - // let update_result = write_result 251 - // .results 252 - // .clone() 253 - // .and_then(|result| result.first().cloned()) 254 - // .context("unexpected output from apply_writes")?; 255 - // let (cid, uri) = match update_result { 256 - // repo::apply_writes::OutputResultsItem::CreateResult(create_result) => ( 257 - // Some(create_result.cid.clone()), 258 - // Some(create_result.uri.clone()), 259 - // ), 260 - // repo::apply_writes::OutputResultsItem::UpdateResult(update_result) => ( 261 - // Some(update_result.cid.clone()), 262 - // Some(update_result.uri.clone()), 263 - // ), 264 - // repo::apply_writes::OutputResultsItem::DeleteResult(_) => (None, None), 265 - // }; 266 - // Ok(Json( 267 - // repo::put_record::OutputData { 268 - // cid: cid.context("missing cid")?, 269 - // commit: write_result.commit.clone(), 270 - // uri: uri.context("missing uri")?, 271 - // validation_status: Some("unknown".to_owned()), 272 - // } 273 - // .into(), 274 - // )) 275 - } 276 - 277 - /// Delete a repository record, or ensure it doesn't exist. Requires auth, implemented by PDS. 278 - /// - POST /xrpc/com.atproto.repo.deleteRecord 279 - /// ### Request Body 280 - /// - `repo`: `at-identifier` // The handle or DID of the repo (aka, current account). 281 - /// - `collection`: `nsid` // The NSID of the record collection. 282 - /// - `rkey`: `string` // The record key. <= 512 characters. 283 - /// - `swap_record`: `boolean` // Compare and swap with the previous record by CID. 284 - /// - `swap_commit`: `cid` // Compare and swap with the previous commit by CID. 285 - /// ### Responses 286 - /// - 200 OK: {"commit": {"cid": "string","rev": "string"}} 287 - /// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`, `InvalidSwap`]} 288 - /// - 401 Unauthorized 289 - async fn delete_record( 290 - user: AuthenticatedUser, 291 - State(actor_store): State<ActorStore>, 292 - State(skey): State<SigningKey>, 293 - State(config): State<AppConfig>, 294 - State(db): State<Db>, 295 - State(fhp): State<FirehoseProducer>, 296 - Json(input): Json<repo::delete_record::Input>, 297 - ) -> Result<Json<repo::delete_record::Output>> { 298 - todo!(); 299 - // // TODO: `input.swap_record` 300 - 301 - // Ok(Json( 302 - // repo::delete_record::OutputData { 303 - // commit: apply_writes::apply_writes( 304 - // user, 305 - // State(actor_store), 306 - // State(skey), 307 - // State(config), 308 - // State(db), 309 - // State(fhp), 310 - // Json( 311 - // repo::apply_writes::InputData { 312 - // repo: input.repo.clone(), 313 - // swap_commit: input.swap_commit.clone(), 314 - // validate: None, 315 - // writes: vec![repo::apply_writes::InputWritesItem::Delete(Box::new( 316 - // repo::apply_writes::DeleteData { 317 - // collection: input.collection.clone(), 318 - // rkey: input.rkey.clone(), 319 - // } 320 - // .into(), 321 - // ))], 322 - // } 323 - // .into(), 324 - // ), 325 - // ) 326 - // .await 327 - // .context("failed to apply writes")? 328 - // .commit 329 - // .clone(), 330 - // } 331 - // .into(), 332 - // )) 333 - } 334 - 335 - /// Get information about an account and repository, including the list of collections. Does not require auth. 336 - /// - GET /xrpc/com.atproto.repo.describeRepo 337 - /// ### Query Parameters 338 - /// - `repo`: `at-identifier` // The handle or DID of the repo. 339 - /// ### Responses 340 - /// - 200 OK: {"handle": "string","did": "string","didDoc": {},"collections": [string],"handleIsCorrect": true} \ 341 - /// handeIsCorrect - boolean - Indicates if handle is currently valid (resolves bi-directionally) 342 - /// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`]} 343 - /// - 401 Unauthorized 344 - async fn describe_repo( 345 - State(actor_store): State<ActorStore>, 346 - State(config): State<AppConfig>, 347 - State(db): State<Db>, 348 - Query(input): Query<repo::describe_repo::ParametersData>, 349 - ) -> Result<Json<repo::describe_repo::Output>> { 350 - // Lookup the DID by the provided handle. 351 - let (did, handle) = resolve_did(&db, &input.repo) 352 - .await 353 - .context("failed to resolve handle")?; 354 - 355 - // Use Actor Store to get the collections 356 - todo!(); 357 - } 358 - 359 - /// Get a single record from a repository. Does not require auth. 360 - /// - GET /xrpc/com.atproto.repo.getRecord 361 - /// ### Query Parameters 362 - /// - `repo`: `at-identifier` // The handle or DID of the repo. 363 - /// - `collection`: `nsid` // The NSID of the record collection. 364 - /// - `rkey`: `string` // The record key. <= 512 characters. 365 - /// - `cid`: `cid` // The CID of the version of the record. If not specified, then return the most recent version. 366 - /// ### Responses 367 - /// - 200 OK: {"uri": "string","cid": "string","value": {}} 368 - /// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`, `RecordNotFound`]} 369 - /// - 401 Unauthorized 370 - async fn get_record( 371 - State(actor_store): State<ActorStore>, 372 - State(config): State<AppConfig>, 373 - State(db): State<Db>, 374 - Query(input): Query<repo::get_record::ParametersData>, 375 - ) -> Result<Json<repo::get_record::Output>> { 376 - if input.cid.is_some() { 377 - return Err(Error::unimplemented(anyhow!( 378 - "looking up old records is unsupported" 379 - ))); 380 - } 381 - 382 - // Lookup the DID by the provided handle. 383 - let (did, _handle) = resolve_did(&db, &input.repo) 384 - .await 385 - .context("failed to resolve handle")?; 386 - 387 - // Create a URI from the parameters 388 - let uri = format!( 389 - "at://{}/{}/{}", 390 - did.as_str(), 391 - input.collection.as_str(), 392 - input.rkey.as_str() 393 - ); 394 - 395 - // Use Actor Store to get the record 396 - todo!(); 397 - } 398 - 399 - /// List a range of records in a repository, matching a specific collection. Does not require auth. 400 - /// - GET /xrpc/com.atproto.repo.listRecords 401 - /// ### Query Parameters 402 - /// - `repo`: `at-identifier` // The handle or DID of the repo. 403 - /// - `collection`: `nsid` // The NSID of the record type. 404 - /// - `limit`: `integer` // The maximum number of records to return. Default 50, >=1 and <=100. 405 - /// - `cursor`: `string` 406 - /// - `reverse`: `boolean` // Flag to reverse the order of the returned records. 407 - /// ### Responses 408 - /// - 200 OK: {"cursor": "string","records": [{"uri": "string","cid": "string","value": {}}]} 409 - /// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`]} 410 - /// - 401 Unauthorized 411 - async fn list_records( 412 - State(actor_store): State<ActorStore>, 413 - State(config): State<AppConfig>, 414 - State(db): State<Db>, 415 - Query(input): Query<Object<ListRecordsParameters>>, 416 - ) -> Result<Json<repo::list_records::Output>> { 417 - // Lookup the DID by the provided handle. 418 - let (did, _handle) = resolve_did(&db, &input.repo) 419 - .await 420 - .context("failed to resolve handle")?; 421 - 422 - // Use Actor Store to list records for the collection 423 - todo!(); 424 - } 425 - 426 - /// Upload a new blob, to be referenced from a repository record. \ 427 - /// The blob will be deleted if it is not referenced within a time window (eg, minutes). \ 428 - /// Blob restrictions (mimetype, size, etc) are enforced when the reference is created. \ 429 - /// Requires auth, implemented by PDS. 430 - /// - POST /xrpc/com.atproto.repo.uploadBlob 431 - /// ### Request Body 432 - /// ### Responses 433 - /// - 200 OK: {"blob": "binary"} 434 - /// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`]} 435 - /// - 401 Unauthorized 436 - async fn upload_blob( 437 - user: AuthenticatedUser, 438 - State(actor_store): State<ActorStore>, 439 - State(config): State<AppConfig>, 440 - State(db): State<Db>, 441 - request: Request<Body>, 442 - ) -> Result<Json<repo::upload_blob::Output>> { 443 - let length = request 444 - .headers() 445 - .get(http::header::CONTENT_LENGTH) 446 - .context("no content length provided")? 447 - .to_str() 448 - .map_err(anyhow::Error::from) 449 - .and_then(|content_length| content_length.parse::<u64>().map_err(anyhow::Error::from)) 450 - .context("invalid content-length header")?; 451 - let mime = request 452 - .headers() 453 - .get(http::header::CONTENT_TYPE) 454 - .context("no content-type provided")? 455 - .to_str() 456 - .context("invalid content-type provided")? 457 - .to_owned(); 458 - 459 - if length > config.blob.limit { 460 - return Err(Error::with_status( 461 - StatusCode::PAYLOAD_TOO_LARGE, 462 - anyhow!("size {} above limit {}", length, config.blob.limit), 463 - )); 464 - } 465 - 466 - // Read the blob data 467 - let mut body_data = Vec::new(); 468 - let mut stream = request.into_body().into_data_stream(); 469 - while let Some(bytes) = stream.try_next().await.context("failed to receive file")? { 470 - body_data.extend_from_slice(&bytes); 471 - 472 - // Check size limit incrementally 473 - if body_data.len() as u64 > config.blob.limit { 474 - return Err(Error::with_status( 475 - StatusCode::PAYLOAD_TOO_LARGE, 476 - anyhow!("size above limit and content-length header was wrong"), 477 - )); 478 - } 479 - } 480 - 481 - // Use Actor Store to upload the blob 482 - todo!(); 483 - } 484 - 485 - async fn todo() -> Result<()> { 486 - Err(Error::unimplemented(anyhow!("not implemented"))) 487 - } 488 - 489 - /// These endpoints are part of the atproto PDS repository management APIs. \ 490 - /// Requests usually require authentication (unlike the com.atproto.sync.* endpoints), and are made directly to the user's own PDS instance. 491 - /// ### Routes 492 - /// - AP /xrpc/com.atproto.repo.applyWrites -> [`apply_writes`] 493 - /// - AP /xrpc/com.atproto.repo.createRecord -> [`create_record`] 494 - /// - AP /xrpc/com.atproto.repo.putRecord -> [`put_record`] 495 - /// - AP /xrpc/com.atproto.repo.deleteRecord -> [`delete_record`] 496 - /// - AP /xrpc/com.atproto.repo.uploadBlob -> [`upload_blob`] 497 - /// - UG /xrpc/com.atproto.repo.describeRepo -> [`describe_repo`] 498 - /// - UG /xrpc/com.atproto.repo.getRecord -> [`get_record`] 499 - /// - UG /xrpc/com.atproto.repo.listRecords -> [`list_records`] 500 - /// - [ ] xx /xrpc/com.atproto.repo.importRepo 501 - // - [ ] xx /xrpc/com.atproto.repo.listMissingBlobs 502 - pub(super) fn routes() -> Router<AppState> { 503 - Router::new() 504 - .route(concat!("/", repo::apply_writes::NSID), post(apply_writes)) 505 - // .route(concat!("/", repo::create_record::NSID), post(create_record)) 506 - // .route(concat!("/", repo::put_record::NSID), post(put_record)) 507 - // .route(concat!("/", repo::delete_record::NSID), post(delete_record)) 508 - // .route(concat!("/", repo::upload_blob::NSID), post(upload_blob)) 509 - // .route(concat!("/", repo::describe_repo::NSID), get(describe_repo)) 510 - // .route(concat!("/", repo::get_record::NSID), get(get_record)) 511 - .route(concat!("/", repo::import_repo::NSID), post(todo)) 512 - .route(concat!("/", repo::list_missing_blobs::NSID), get(todo)) 513 - // .route(concat!("/", repo::list_records::NSID), get(list_records)) 514 - }
···
+117
src/apis/com/atproto/repo/upload_blob.rs
···
··· 1 + //! Upload a new blob, to be referenced from a repository record. 2 + use crate::config::AppConfig; 3 + use anyhow::Context as _; 4 + use axum::{ 5 + body::Bytes, 6 + http::{self, HeaderMap}, 7 + }; 8 + use rsky_lexicon::com::atproto::repo::{Blob, BlobOutput}; 9 + use rsky_repo::types::{BlobConstraint, PreparedBlobRef}; 10 + // use rsky_common::BadContentTypeError; 11 + 12 + use super::*; 13 + 14 + async fn inner_upload_blob( 15 + auth: AuthenticatedUser, 16 + blob: Bytes, 17 + content_type: String, 18 + actor_pools: HashMap<String, ActorStorage>, 19 + ) -> Result<BlobOutput> { 20 + // let requester = auth.access.credentials.unwrap().did.unwrap(); 21 + let requester = auth.did(); 22 + 23 + let actor_store = ActorStore::from_actor_pools(&requester, &actor_pools).await; 24 + 25 + let metadata = actor_store 26 + .blob 27 + .upload_blob_and_get_metadata(content_type, blob) 28 + .await?; 29 + let blobref = actor_store.blob.track_untethered_blob(metadata).await?; 30 + 31 + // make the blob permanent if an associated record is already indexed 32 + let records_for_blob = actor_store 33 + .blob 34 + .get_records_for_blob(blobref.get_cid()?) 35 + .await?; 36 + 37 + if !records_for_blob.is_empty() { 38 + actor_store 39 + .blob 40 + .verify_blob_and_make_permanent(PreparedBlobRef { 41 + cid: blobref.get_cid()?, 42 + mime_type: blobref.get_mime_type().to_string(), 43 + constraints: BlobConstraint { 44 + max_size: None, 45 + accept: None, 46 + }, 47 + }) 48 + .await?; 49 + } 50 + 51 + Ok(BlobOutput { 52 + blob: Blob { 53 + r#type: Some("blob".to_owned()), 54 + r#ref: Some(blobref.get_cid()?), 55 + cid: None, 56 + mime_type: blobref.get_mime_type().to_string(), 57 + size: blobref.get_size(), 58 + original: None, 59 + }, 60 + }) 61 + } 62 + 63 + /// Upload a new blob, to be referenced from a repository record. \ 64 + /// The blob will be deleted if it is not referenced within a time window (eg, minutes). \ 65 + /// Blob restrictions (mimetype, size, etc) are enforced when the reference is created. \ 66 + /// Requires auth, implemented by PDS. 67 + /// - POST /xrpc/com.atproto.repo.uploadBlob 68 + /// ### Request Body 69 + /// ### Responses 70 + /// - 200 OK: {"blob": "binary"} 71 + /// - 400 Bad Request: {error:[`InvalidRequest`, `ExpiredToken`, `InvalidToken`]} 72 + /// - 401 Unauthorized 73 + #[tracing::instrument(skip_all)] 74 + #[axum::debug_handler(state = AppState)] 75 + pub async fn upload_blob( 76 + auth: AuthenticatedUser, 77 + headers: HeaderMap, 78 + State(config): State<AppConfig>, 79 + State(actor_pools): State<HashMap<String, ActorStorage, RandomState>>, 80 + blob: Bytes, 81 + ) -> Result<Json<BlobOutput>, ApiError> { 82 + let content_length = headers 83 + .get(http::header::CONTENT_LENGTH) 84 + .context("no content length provided")? 85 + .to_str() 86 + .map_err(anyhow::Error::from) 87 + .and_then(|content_length| content_length.parse::<u64>().map_err(anyhow::Error::from)) 88 + .context("invalid content-length header")?; 89 + let content_type = headers 90 + .get(http::header::CONTENT_TYPE) 91 + .context("no content-type provided")? 92 + .to_str() 93 + // .map_err(BadContentTypeError::MissingType) 94 + .context("invalid content-type provided")? 95 + .to_owned(); 96 + 97 + if content_length > config.blob.limit { 98 + return Err(ApiError::InvalidRequest(format!( 99 + "Content-Length is greater than maximum of {}", 100 + config.blob.limit 101 + ))); 102 + }; 103 + if blob.len() as u64 > config.blob.limit { 104 + return Err(ApiError::InvalidRequest(format!( 105 + "Blob size is greater than maximum of {} despite content-length header", 106 + config.blob.limit 107 + ))); 108 + }; 109 + 110 + match inner_upload_blob(auth, blob, content_type, actor_pools).await { 111 + Ok(res) => Ok(Json(res)), 112 + Err(error) => { 113 + tracing::error!("{error:?}"); 114 + Err(ApiError::RuntimeError) 115 + } 116 + } 117 + }
+6
src/error.rs
··· 228 } 229 } 230 231 impl From<handle::errors::Error> for ApiError { 232 fn from(value: handle::errors::Error) -> Self { 233 match value.kind {
··· 228 } 229 } 230 231 + impl From<anyhow::Error> for ApiError { 232 + fn from(_value: anyhow::Error) -> Self { 233 + Self::RuntimeError 234 + } 235 + } 236 + 237 impl From<handle::errors::Error> for ApiError { 238 fn from(value: handle::errors::Error) -> Self { 239 match value.kind {
+1
src/lib.rs
··· 11 mod metrics; 12 mod models; 13 mod oauth; 14 mod schema; 15 mod serve; 16 mod service_proxy;
··· 11 mod metrics; 12 mod models; 13 mod oauth; 14 + mod pipethrough; 15 mod schema; 16 mod serve; 17 mod service_proxy;
+606
src/pipethrough.rs
···
··· 1 + //! Based on https://github.com/blacksky-algorithms/rsky/blob/main/rsky-pds/src/pipethrough.rs 2 + //! blacksky-algorithms/rsky is licensed under the Apache License 2.0 3 + //! 4 + //! Modified for Axum instead of Rocket 5 + 6 + use anyhow::{Result, bail}; 7 + use axum::extract::{FromRequestParts, State}; 8 + use rsky_identity::IdResolver; 9 + use rsky_pds::apis::ApiError; 10 + use rsky_pds::auth_verifier::{AccessOutput, AccessStandard}; 11 + use rsky_pds::config::{ServerConfig, ServiceConfig, env_to_cfg}; 12 + use rsky_pds::pipethrough::{OverrideOpts, ProxyHeader, UrlAndAud}; 13 + use rsky_pds::xrpc_server::types::{HandlerPipeThrough, InvalidRequestError, XRPCError}; 14 + use rsky_pds::{APP_USER_AGENT, SharedIdResolver, context}; 15 + // use lazy_static::lazy_static; 16 + use reqwest::header::{CONTENT_TYPE, HeaderValue}; 17 + use reqwest::{Client, Method, RequestBuilder, Response}; 18 + // use rocket::data::ToByteUnit; 19 + // use rocket::http::{Method, Status}; 20 + // use rocket::request::{FromRequest, Outcome, Request}; 21 + // use rocket::{Data, State}; 22 + use axum::{ 23 + body::Bytes, 24 + http::{self, HeaderMap}, 25 + }; 26 + use rsky_common::{GetServiceEndpointOpts, get_service_endpoint}; 27 + use rsky_repo::types::Ids; 28 + use serde::de::DeserializeOwned; 29 + use serde_json::Value as JsonValue; 30 + use std::collections::{BTreeMap, HashSet}; 31 + use std::str::FromStr; 32 + use std::sync::Arc; 33 + use std::time::Duration; 34 + use ubyte::ToByteUnit as _; 35 + use url::Url; 36 + 37 + use crate::serve::AppState; 38 + 39 + // pub struct OverrideOpts { 40 + // pub aud: Option<String>, 41 + // pub lxm: Option<String>, 42 + // } 43 + 44 + // pub struct UrlAndAud { 45 + // pub url: Url, 46 + // pub aud: String, 47 + // pub lxm: String, 48 + // } 49 + 50 + // pub struct ProxyHeader { 51 + // pub did: String, 52 + // pub service_url: String, 53 + // } 54 + 55 + pub struct ProxyRequest { 56 + pub headers: BTreeMap<String, String>, 57 + pub query: Option<String>, 58 + pub path: String, 59 + pub method: Method, 60 + pub id_resolver: Arc<tokio::sync::RwLock<rsky_identity::IdResolver>>, 61 + pub cfg: ServerConfig, 62 + } 63 + impl FromRequestParts<AppState> for ProxyRequest { 64 + // type Rejection = ApiError; 65 + type Rejection = axum::response::Response; 66 + 67 + async fn from_request_parts( 68 + parts: &mut axum::http::request::Parts, 69 + state: &AppState, 70 + ) -> Result<Self, Self::Rejection> { 71 + let headers = parts 72 + .headers 73 + .iter() 74 + .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string())) 75 + .collect::<BTreeMap<String, String>>(); 76 + let query = parts.uri.query().map(|s| s.to_string()); 77 + let path = parts.uri.path().to_string(); 78 + let method = parts.method.clone(); 79 + let id_resolver = state.id_resolver.clone(); 80 + // let cfg = state.cfg.clone(); 81 + let cfg = env_to_cfg(); // TODO: use state.cfg.clone(); 82 + 83 + Ok(Self { 84 + headers, 85 + query, 86 + path, 87 + method, 88 + id_resolver, 89 + cfg, 90 + }) 91 + } 92 + } 93 + 94 + // #[rocket::async_trait] 95 + // impl<'r> FromRequest<'r> for HandlerPipeThrough { 96 + // type Error = anyhow::Error; 97 + 98 + // #[tracing::instrument(skip_all)] 99 + // async fn from_request(req: &'r Request<'_>) -> Outcome<Self, Self::Error> { 100 + // match AccessStandard::from_request(req).await { 101 + // Outcome::Success(output) => { 102 + // let AccessOutput { credentials, .. } = output.access; 103 + // let requester: Option<String> = match credentials { 104 + // None => None, 105 + // Some(credentials) => credentials.did, 106 + // }; 107 + // let headers = req.headers().clone().into_iter().fold( 108 + // BTreeMap::new(), 109 + // |mut acc: BTreeMap<String, String>, cur| { 110 + // let _ = acc.insert(cur.name().to_string(), cur.value().to_string()); 111 + // acc 112 + // }, 113 + // ); 114 + // let proxy_req = ProxyRequest { 115 + // headers, 116 + // query: match req.uri().query() { 117 + // None => None, 118 + // Some(query) => Some(query.to_string()), 119 + // }, 120 + // path: req.uri().path().to_string(), 121 + // method: req.method(), 122 + // id_resolver: req.guard::<&State<SharedIdResolver>>().await.unwrap(), 123 + // cfg: req.guard::<&State<ServerConfig>>().await.unwrap(), 124 + // }; 125 + // match pipethrough( 126 + // &proxy_req, 127 + // requester, 128 + // OverrideOpts { 129 + // aud: None, 130 + // lxm: None, 131 + // }, 132 + // ) 133 + // .await 134 + // { 135 + // Ok(res) => Outcome::Success(res), 136 + // Err(error) => match error.downcast_ref() { 137 + // Some(InvalidRequestError::XRPCError(xrpc)) => { 138 + // if let XRPCError::FailedResponse { 139 + // status, 140 + // error, 141 + // message, 142 + // headers, 143 + // } = xrpc 144 + // { 145 + // tracing::error!( 146 + // "@LOG: XRPC ERROR Status:{status}; Message: {message:?}; Error: {error:?}; Headers: {headers:?}" 147 + // ); 148 + // } 149 + // req.local_cache(|| Some(ApiError::InvalidRequest(error.to_string()))); 150 + // Outcome::Error((Status::BadRequest, error)) 151 + // } 152 + // _ => { 153 + // req.local_cache(|| Some(ApiError::InvalidRequest(error.to_string()))); 154 + // Outcome::Error((Status::BadRequest, error)) 155 + // } 156 + // }, 157 + // } 158 + // } 159 + // Outcome::Error(err) => { 160 + // req.local_cache(|| Some(ApiError::RuntimeError)); 161 + // Outcome::Error(( 162 + // Status::BadRequest, 163 + // anyhow::Error::new(InvalidRequestError::AuthError(err.1)), 164 + // )) 165 + // } 166 + // _ => panic!("Unexpected outcome during Pipethrough"), 167 + // } 168 + // } 169 + // } 170 + 171 + // #[rocket::async_trait] 172 + // impl<'r> FromRequest<'r> for ProxyRequest<'r> { 173 + // type Error = anyhow::Error; 174 + 175 + // async fn from_request(req: &'r Request<'_>) -> Outcome<Self, Self::Error> { 176 + // let headers = req.headers().clone().into_iter().fold( 177 + // BTreeMap::new(), 178 + // |mut acc: BTreeMap<String, String>, cur| { 179 + // let _ = acc.insert(cur.name().to_string(), cur.value().to_string()); 180 + // acc 181 + // }, 182 + // ); 183 + // Outcome::Success(Self { 184 + // headers, 185 + // query: match req.uri().query() { 186 + // None => None, 187 + // Some(query) => Some(query.to_string()), 188 + // }, 189 + // path: req.uri().path().to_string(), 190 + // method: req.method(), 191 + // id_resolver: req.guard::<&State<SharedIdResolver>>().await.unwrap(), 192 + // cfg: req.guard::<&State<ServerConfig>>().await.unwrap(), 193 + // }) 194 + // } 195 + // } 196 + 197 + pub async fn pipethrough( 198 + req: &ProxyRequest, 199 + requester: Option<String>, 200 + override_opts: OverrideOpts, 201 + ) -> Result<HandlerPipeThrough> { 202 + let UrlAndAud { 203 + url, 204 + aud, 205 + lxm: nsid, 206 + } = format_url_and_aud(req, override_opts.aud).await?; 207 + let lxm = override_opts.lxm.unwrap_or(nsid); 208 + let headers = format_headers(req, aud, lxm, requester).await?; 209 + let req_init = format_req_init(req, url, headers, None)?; 210 + let res = make_request(req_init).await?; 211 + parse_proxy_res(res).await 212 + } 213 + 214 + pub async fn pipethrough_procedure<T: serde::Serialize>( 215 + req: &ProxyRequest, 216 + requester: Option<String>, 217 + body: Option<T>, 218 + ) -> Result<HandlerPipeThrough> { 219 + let UrlAndAud { 220 + url, 221 + aud, 222 + lxm: nsid, 223 + } = format_url_and_aud(req, None).await?; 224 + let headers = format_headers(req, aud, nsid, requester).await?; 225 + let encoded_body: Option<Vec<u8>> = match body { 226 + None => None, 227 + Some(body) => Some(serde_json::to_string(&body)?.into_bytes()), 228 + }; 229 + let req_init = format_req_init(req, url, headers, encoded_body)?; 230 + let res = make_request(req_init).await?; 231 + parse_proxy_res(res).await 232 + } 233 + 234 + #[tracing::instrument(skip_all)] 235 + pub async fn pipethrough_procedure_post( 236 + req: &ProxyRequest, 237 + requester: Option<String>, 238 + body: Option<Bytes>, 239 + ) -> Result<HandlerPipeThrough, ApiError> { 240 + let UrlAndAud { 241 + url, 242 + aud, 243 + lxm: nsid, 244 + } = format_url_and_aud(req, None).await?; 245 + let headers = format_headers(req, aud, nsid, requester).await?; 246 + let encoded_body: Option<JsonValue>; 247 + match body { 248 + None => encoded_body = None, 249 + Some(body) => { 250 + // let res = match body.open(50.megabytes()).into_string().await { 251 + // Ok(res1) => { 252 + // tracing::info!(res1.value); 253 + // res1.value 254 + // } 255 + // Err(error) => { 256 + // tracing::error!("{error}"); 257 + // return Err(ApiError::RuntimeError); 258 + // } 259 + // }; 260 + let res = String::from_utf8(body.to_vec()).expect("Invalid UTF-8"); 261 + 262 + match serde_json::from_str(res.as_str()) { 263 + Ok(res) => { 264 + encoded_body = Some(res); 265 + } 266 + Err(error) => { 267 + tracing::error!("{error}"); 268 + return Err(ApiError::RuntimeError); 269 + } 270 + } 271 + } 272 + }; 273 + let req_init = format_req_init_with_value(req, url, headers, encoded_body)?; 274 + let res = make_request(req_init).await?; 275 + Ok(parse_proxy_res(res).await?) 276 + } 277 + 278 + // Request setup/formatting 279 + // ------------------- 280 + 281 + const REQ_HEADERS_TO_FORWARD: [&str; 4] = [ 282 + "accept-language", 283 + "content-type", 284 + "atproto-accept-labelers", 285 + "x-bsky-topics", 286 + ]; 287 + 288 + #[tracing::instrument(skip_all)] 289 + pub async fn format_url_and_aud( 290 + req: &ProxyRequest, 291 + aud_override: Option<String>, 292 + ) -> Result<UrlAndAud> { 293 + let proxy_to = parse_proxy_header(req).await?; 294 + let nsid = parse_req_nsid(req); 295 + let default_proxy = default_service(req, &nsid).await; 296 + let service_url = match proxy_to { 297 + Some(ref proxy_to) => { 298 + tracing::info!( 299 + "@LOG: format_url_and_aud() proxy_to: {:?}", 300 + proxy_to.service_url 301 + ); 302 + Some(proxy_to.service_url.clone()) 303 + } 304 + None => match default_proxy { 305 + Some(ref default_proxy) => Some(default_proxy.url.clone()), 306 + None => None, 307 + }, 308 + }; 309 + let aud = match aud_override { 310 + Some(_) => aud_override, 311 + None => match proxy_to { 312 + Some(proxy_to) => Some(proxy_to.did), 313 + None => match default_proxy { 314 + Some(default_proxy) => Some(default_proxy.did), 315 + None => None, 316 + }, 317 + }, 318 + }; 319 + match (service_url, aud) { 320 + (Some(service_url), Some(aud)) => { 321 + let mut url = Url::parse(format!("{0}{1}", service_url, req.path).as_str())?; 322 + if let Some(ref params) = req.query { 323 + url.set_query(Some(params.as_str())); 324 + } 325 + if !req.cfg.service.dev_mode && !is_safe_url(url.clone()) { 326 + bail!(InvalidRequestError::InvalidServiceUrl(url.to_string())); 327 + } 328 + Ok(UrlAndAud { 329 + url, 330 + aud, 331 + lxm: nsid, 332 + }) 333 + } 334 + _ => bail!(InvalidRequestError::NoServiceConfigured(req.path.clone())), 335 + } 336 + } 337 + 338 + pub async fn format_headers( 339 + req: &ProxyRequest, 340 + aud: String, 341 + lxm: String, 342 + requester: Option<String>, 343 + ) -> Result<HeaderMap> { 344 + let mut headers: HeaderMap = match requester { 345 + Some(requester) => context::service_auth_headers(&requester, &aud, &lxm).await?, 346 + None => HeaderMap::new(), 347 + }; 348 + // forward select headers to upstream services 349 + for header in REQ_HEADERS_TO_FORWARD { 350 + let val = req.headers.get(header); 351 + if let Some(val) = val { 352 + headers.insert(header, HeaderValue::from_str(val)?); 353 + } 354 + } 355 + Ok(headers) 356 + } 357 + 358 + pub fn format_req_init( 359 + req: &ProxyRequest, 360 + url: Url, 361 + headers: HeaderMap, 362 + body: Option<Vec<u8>>, 363 + ) -> Result<RequestBuilder> { 364 + match req.method { 365 + Method::GET => { 366 + let client = Client::builder() 367 + .user_agent(APP_USER_AGENT) 368 + .http2_keep_alive_while_idle(true) 369 + .http2_keep_alive_timeout(Duration::from_secs(5)) 370 + .default_headers(headers) 371 + .build()?; 372 + Ok(client.get(url)) 373 + } 374 + Method::HEAD => { 375 + let client = Client::builder() 376 + .user_agent(APP_USER_AGENT) 377 + .http2_keep_alive_while_idle(true) 378 + .http2_keep_alive_timeout(Duration::from_secs(5)) 379 + .default_headers(headers) 380 + .build()?; 381 + Ok(client.head(url)) 382 + } 383 + Method::POST => { 384 + let client = Client::builder() 385 + .user_agent(APP_USER_AGENT) 386 + .http2_keep_alive_while_idle(true) 387 + .http2_keep_alive_timeout(Duration::from_secs(5)) 388 + .default_headers(headers) 389 + .build()?; 390 + Ok(client.post(url).body(body.unwrap())) 391 + } 392 + _ => bail!(InvalidRequestError::MethodNotFound), 393 + } 394 + } 395 + 396 + pub fn format_req_init_with_value( 397 + req: &ProxyRequest, 398 + url: Url, 399 + headers: HeaderMap, 400 + body: Option<JsonValue>, 401 + ) -> Result<RequestBuilder> { 402 + match req.method { 403 + Method::GET => { 404 + let client = Client::builder() 405 + .user_agent(APP_USER_AGENT) 406 + .http2_keep_alive_while_idle(true) 407 + .http2_keep_alive_timeout(Duration::from_secs(5)) 408 + .default_headers(headers) 409 + .build()?; 410 + Ok(client.get(url)) 411 + } 412 + Method::HEAD => { 413 + let client = Client::builder() 414 + .user_agent(APP_USER_AGENT) 415 + .http2_keep_alive_while_idle(true) 416 + .http2_keep_alive_timeout(Duration::from_secs(5)) 417 + .default_headers(headers) 418 + .build()?; 419 + Ok(client.head(url)) 420 + } 421 + Method::POST => { 422 + let client = Client::builder() 423 + .user_agent(APP_USER_AGENT) 424 + .http2_keep_alive_while_idle(true) 425 + .http2_keep_alive_timeout(Duration::from_secs(5)) 426 + .default_headers(headers) 427 + .build()?; 428 + Ok(client.post(url).json(&body.unwrap())) 429 + } 430 + _ => bail!(InvalidRequestError::MethodNotFound), 431 + } 432 + } 433 + 434 + pub async fn parse_proxy_header(req: &ProxyRequest) -> Result<Option<ProxyHeader>> { 435 + let headers = &req.headers; 436 + let proxy_to: Option<&String> = headers.get("atproto-proxy"); 437 + match proxy_to { 438 + None => Ok(None), 439 + Some(proxy_to) => { 440 + let parts: Vec<&str> = proxy_to.split("#").collect::<Vec<&str>>(); 441 + match (parts.get(0), parts.get(1), parts.get(2)) { 442 + (Some(did), Some(service_id), None) => { 443 + let did = did.to_string(); 444 + let mut lock = req.id_resolver.write().await; 445 + match lock.did.resolve(did.clone(), None).await? { 446 + None => bail!(InvalidRequestError::CannotResolveProxyDid), 447 + Some(did_doc) => { 448 + match get_service_endpoint( 449 + did_doc, 450 + GetServiceEndpointOpts { 451 + id: format!("#{service_id}"), 452 + r#type: None, 453 + }, 454 + ) { 455 + None => bail!(InvalidRequestError::CannotResolveServiceUrl), 456 + Some(service_url) => Ok(Some(ProxyHeader { did, service_url })), 457 + } 458 + } 459 + } 460 + } 461 + (_, None, _) => bail!(InvalidRequestError::NoServiceId), 462 + _ => bail!("error parsing atproto-proxy header"), 463 + } 464 + } 465 + } 466 + } 467 + 468 + pub fn parse_req_nsid(req: &ProxyRequest) -> String { 469 + let nsid = req.path.as_str().replace("/xrpc/", ""); 470 + match nsid.ends_with("/") { 471 + false => nsid, 472 + true => nsid 473 + .trim_end_matches(|c| c == nsid.chars().last().unwrap()) 474 + .to_string(), 475 + } 476 + } 477 + 478 + // Sending request 479 + // ------------------- 480 + #[tracing::instrument(skip_all)] 481 + pub async fn make_request(req_init: RequestBuilder) -> Result<Response> { 482 + let res = req_init.send().await; 483 + match res { 484 + Err(e) => { 485 + tracing::error!("@LOG WARN: pipethrough network error {}", e.to_string()); 486 + bail!(InvalidRequestError::XRPCError(XRPCError::UpstreamFailure)) 487 + } 488 + Ok(res) => match res.error_for_status_ref() { 489 + Ok(_) => Ok(res), 490 + Err(_) => { 491 + let status = res.status().to_string(); 492 + let headers = res.headers().clone(); 493 + let error_body = res.json::<JsonValue>().await?; 494 + bail!(InvalidRequestError::XRPCError(XRPCError::FailedResponse { 495 + status, 496 + headers, 497 + error: match error_body["error"].as_str() { 498 + None => None, 499 + Some(error_body_error) => Some(error_body_error.to_string()), 500 + }, 501 + message: match error_body["message"].as_str() { 502 + None => None, 503 + Some(error_body_message) => Some(error_body_message.to_string()), 504 + } 505 + })) 506 + } 507 + }, 508 + } 509 + } 510 + 511 + // Response parsing/forwarding 512 + // ------------------- 513 + 514 + const RES_HEADERS_TO_FORWARD: [&str; 4] = [ 515 + "content-type", 516 + "content-language", 517 + "atproto-repo-rev", 518 + "atproto-content-labelers", 519 + ]; 520 + 521 + pub async fn parse_proxy_res(res: Response) -> Result<HandlerPipeThrough> { 522 + let encoding = match res.headers().get(CONTENT_TYPE) { 523 + Some(content_type) => content_type.to_str()?, 524 + None => "application/json", 525 + }; 526 + // Release borrow 527 + let encoding = encoding.to_string(); 528 + let res_headers = RES_HEADERS_TO_FORWARD.into_iter().fold( 529 + BTreeMap::new(), 530 + |mut acc: BTreeMap<String, String>, cur| { 531 + let _ = match res.headers().get(cur) { 532 + Some(res_header_val) => acc.insert( 533 + cur.to_string(), 534 + res_header_val.clone().to_str().unwrap().to_string(), 535 + ), 536 + None => None, 537 + }; 538 + acc 539 + }, 540 + ); 541 + let buffer = read_array_buffer_res(res).await?; 542 + Ok(HandlerPipeThrough { 543 + encoding, 544 + buffer, 545 + headers: Some(res_headers), 546 + }) 547 + } 548 + 549 + // Utils 550 + // ------------------- 551 + 552 + pub async fn default_service(req: &ProxyRequest, nsid: &str) -> Option<ServiceConfig> { 553 + let cfg = req.cfg.clone(); 554 + match Ids::from_str(nsid) { 555 + Ok(Ids::ToolsOzoneTeamAddMember) => cfg.mod_service, 556 + Ok(Ids::ToolsOzoneTeamDeleteMember) => cfg.mod_service, 557 + Ok(Ids::ToolsOzoneTeamUpdateMember) => cfg.mod_service, 558 + Ok(Ids::ToolsOzoneTeamListMembers) => cfg.mod_service, 559 + Ok(Ids::ToolsOzoneCommunicationCreateTemplate) => cfg.mod_service, 560 + Ok(Ids::ToolsOzoneCommunicationDeleteTemplate) => cfg.mod_service, 561 + Ok(Ids::ToolsOzoneCommunicationUpdateTemplate) => cfg.mod_service, 562 + Ok(Ids::ToolsOzoneCommunicationListTemplates) => cfg.mod_service, 563 + Ok(Ids::ToolsOzoneModerationEmitEvent) => cfg.mod_service, 564 + Ok(Ids::ToolsOzoneModerationGetEvent) => cfg.mod_service, 565 + Ok(Ids::ToolsOzoneModerationGetRecord) => cfg.mod_service, 566 + Ok(Ids::ToolsOzoneModerationGetRepo) => cfg.mod_service, 567 + Ok(Ids::ToolsOzoneModerationQueryEvents) => cfg.mod_service, 568 + Ok(Ids::ToolsOzoneModerationQueryStatuses) => cfg.mod_service, 569 + Ok(Ids::ToolsOzoneModerationSearchRepos) => cfg.mod_service, 570 + Ok(Ids::ComAtprotoModerationCreateReport) => cfg.report_service, 571 + _ => cfg.bsky_app_view, 572 + } 573 + } 574 + 575 + pub fn parse_res<T: DeserializeOwned>(_nsid: String, res: HandlerPipeThrough) -> Result<T> { 576 + let buffer = res.buffer; 577 + let record = serde_json::from_slice::<T>(buffer.as_slice())?; 578 + Ok(record) 579 + } 580 + 581 + #[tracing::instrument(skip_all)] 582 + pub async fn read_array_buffer_res(res: Response) -> Result<Vec<u8>> { 583 + match res.bytes().await { 584 + Ok(bytes) => Ok(bytes.to_vec()), 585 + Err(err) => { 586 + tracing::error!("@LOG WARN: pipethrough network error {}", err.to_string()); 587 + bail!("UpstreamFailure") 588 + } 589 + } 590 + } 591 + 592 + pub fn is_safe_url(url: Url) -> bool { 593 + if url.scheme() != "https" { 594 + return false; 595 + } 596 + match url.host_str() { 597 + None => false, 598 + Some(hostname) if hostname == "localhost" => false, 599 + Some(hostname) => { 600 + if std::net::IpAddr::from_str(hostname).is_ok() { 601 + return false; 602 + } 603 + true 604 + } 605 + } 606 + }
+28 -19
src/serve.rs
··· 1 - use super::account_manager::{AccountManager, SharedAccountManager}; 2 use super::config::AppConfig; 3 use super::db::establish_pool; 4 pub use super::error::Error; ··· 14 use diesel_migrations::{EmbeddedMigrations, embed_migrations}; 15 use figment::{Figment, providers::Format as _}; 16 use http_cache_reqwest::{CacheMode, HttpCacheOptions, MokaManager}; 17 use rsky_pds::{crawlers::Crawlers, sequencer::Sequencer}; 18 use serde::{Deserialize, Serialize}; 19 use std::{ 20 net::{IpAddr, Ipv4Addr, SocketAddr}, 21 path::PathBuf, ··· 32 33 /// Embedded migrations 34 pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations"); 35 36 /// The application-wide result type. 37 pub type Result<T> = std::result::Result<T, Error>; 38 /// The reqwest client type with middleware. 39 pub type Client = reqwest_middleware::ClientWithMiddleware; 40 - 41 - /// The Shared Sequencer which requests crawls from upstream relays and emits events to the firehose. 42 - pub struct SharedSequencer { 43 - /// The sequencer instance. 44 - pub sequencer: RwLock<Sequencer>, 45 - } 46 47 #[expect( 48 clippy::arbitrary_source_item_ordering, ··· 136 /// The simple HTTP client. 137 pub simple_client: reqwest::Client, 138 /// The firehose producer. 139 - pub sequencer: Arc<SharedSequencer>, 140 /// The account manager. 141 - pub account_manager: Arc<SharedAccountManager>, 142 143 /// The signing key. 144 pub signing_key: SigningKey, ··· 293 .iter() 294 .map(|s| s.to_string()) 295 .collect(); 296 - let sequencer = Arc::new(SharedSequencer { 297 - sequencer: RwLock::new(Sequencer::new( 298 - Crawlers::new(hostname, crawlers.clone()), 299 - None, 300 - )), 301 - }); 302 - let account_manager = SharedAccountManager { 303 - account_manager: RwLock::new(AccountManager::new(pool.clone())), 304 }; 305 306 let addr = config 307 .listen_address ··· 326 client: client.clone(), 327 simple_client, 328 sequencer: sequencer.clone(), 329 - account_manager: Arc::new(account_manager), 330 signing_key: skey, 331 rotation_key: rkey, 332 }); ··· 406 info!("debug mode: not requesting crawl"); 407 } else { 408 info!("requesting crawl from upstream relays"); 409 - let mut background_sequencer = sequencer.sequencer.write().await.clone(); 410 drop(tokio::spawn( 411 async move { background_sequencer.start().await }, 412 ));
··· 1 + use super::account_manager::AccountManager; 2 use super::config::AppConfig; 3 use super::db::establish_pool; 4 pub use super::error::Error; ··· 14 use diesel_migrations::{EmbeddedMigrations, embed_migrations}; 15 use figment::{Figment, providers::Format as _}; 16 use http_cache_reqwest::{CacheMode, HttpCacheOptions, MokaManager}; 17 + use rsky_common::env::env_list; 18 + use rsky_identity::IdResolver; 19 + use rsky_identity::types::{DidCache, IdentityResolverOpts}; 20 use rsky_pds::{crawlers::Crawlers, sequencer::Sequencer}; 21 use serde::{Deserialize, Serialize}; 22 + use std::env; 23 use std::{ 24 net::{IpAddr, Ipv4Addr, SocketAddr}, 25 path::PathBuf, ··· 36 37 /// Embedded migrations 38 pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations"); 39 + pub const MIGRATIONS_ACTOR: EmbeddedMigrations = embed_migrations!("./migrations_actor"); 40 41 /// The application-wide result type. 42 pub type Result<T> = std::result::Result<T, Error>; 43 /// The reqwest client type with middleware. 44 pub type Client = reqwest_middleware::ClientWithMiddleware; 45 46 #[expect( 47 clippy::arbitrary_source_item_ordering, ··· 135 /// The simple HTTP client. 136 pub simple_client: reqwest::Client, 137 /// The firehose producer. 138 + pub sequencer: Arc<RwLock<Sequencer>>, 139 /// The account manager. 140 + pub account_manager: Arc<RwLock<AccountManager>>, 141 + /// The ID resolver. 142 + pub id_resolver: Arc<RwLock<IdResolver>>, 143 144 /// The signing key. 145 pub signing_key: SigningKey, ··· 294 .iter() 295 .map(|s| s.to_string()) 296 .collect(); 297 + let sequencer = Arc::new(RwLock::new(Sequencer::new( 298 + Crawlers::new(hostname, crawlers.clone()), 299 + None, 300 + ))); 301 + let account_manager = Arc::new(RwLock::new(AccountManager::new(pool.clone()))); 302 + let plc_url = if cfg!(debug_assertions) { 303 + "http://localhost:8000".to_owned() // dummy for debug 304 + } else { 305 + env::var("PDS_DID_PLC_URL").unwrap_or("https://plc.directory".to_owned()) // TODO: toml config 306 }; 307 + let id_resolver = Arc::new(RwLock::new(IdResolver::new(IdentityResolverOpts { 308 + timeout: None, 309 + plc_url: Some(plc_url), 310 + did_cache: Some(DidCache::new(None, None)), 311 + backup_nameservers: Some(env_list("PDS_HANDLE_BACKUP_NAMESERVERS")), 312 + }))); 313 314 let addr = config 315 .listen_address ··· 334 client: client.clone(), 335 simple_client, 336 sequencer: sequencer.clone(), 337 + account_manager, 338 + id_resolver, 339 signing_key: skey, 340 rotation_key: rkey, 341 }); ··· 415 info!("debug mode: not requesting crawl"); 416 } else { 417 info!("requesting crawl from upstream relays"); 418 + let mut background_sequencer = sequencer.write().await.clone(); 419 drop(tokio::spawn( 420 async move { background_sequencer.start().await }, 421 ));