Alternative ATProto PDS implementation

prototype actor_store

Changed files
+62 -48
src
+2 -4
src/actor_store/actor_store.rs
··· 9 9 use cidv10::Cid; 10 10 use diesel::*; 11 11 use futures::stream::{self, StreamExt}; 12 - use rsky_common; 13 12 use rsky_pds::actor_store::repo::types::SyncEvtData; 14 13 use rsky_repo::repo::Repo; 15 14 use rsky_repo::storage::readable_blockstore::ReadableBlockstore; ··· 22 21 use rsky_syntax::aturi::AtUri; 23 22 use secp256k1::{Keypair, Secp256k1, SecretKey}; 24 23 use std::env; 25 - use std::fmt; 26 24 use std::str::FromStr; 27 25 use std::sync::Arc; 28 26 use tokio::sync::RwLock; ··· 416 414 pub async fn destroy(&mut self) -> Result<()> { 417 415 let did: String = self.did.clone(); 418 416 let storage_guard = self.storage.read().await; 419 - let db: Arc<ActorDb> = storage_guard.db.clone(); 417 + let db: ActorDb = storage_guard.db.clone(); 420 418 use rsky_pds::schema::pds::blob::dsl as BlobSchema; 421 419 422 420 let blob_rows: Vec<String> = db ··· 450 448 } 451 449 let did: String = self.did.clone(); 452 450 let storage_guard = self.storage.read().await; 453 - let db: Arc<ActorDb> = storage_guard.db.clone(); 451 + let db: ActorDb = storage_guard.db.clone(); 454 452 use rsky_pds::schema::pds::record::dsl as RecordSchema; 455 453 456 454 let cid_strs: Vec<String> = cids.into_iter().map(|c| c.to_string()).collect();
+26 -23
src/actor_store/blob.rs
··· 28 28 use rsky_pds::models::models; 29 29 use rsky_repo::error::BlobError; 30 30 use rsky_repo::types::{PreparedBlobRef, PreparedWrite}; 31 - use sha2::{Digest, Sha256}; 31 + use sha2::Digest; 32 32 33 33 use super::ActorDb; 34 34 use super::sql_blob::BlobStoreSql; ··· 42 42 // Basically handles getting blob records from db 43 43 impl BlobReader { 44 44 pub fn new(blobstore: BlobStoreSql, db: ActorDb) -> Self { 45 - BlobReader { 46 - did: blobstore.bucket.clone(), 47 - blobstore, 48 - db, 49 - } 45 + // BlobReader { 46 + // did: blobstore.bucket.clone(), 47 + // blobstore, 48 + // db, 49 + // } 50 + todo!(); 50 51 } 51 52 52 53 pub async fn get_blob_metadata(&self, cid: Cid) -> Result<GetBlobMetadataOutput> { ··· 77 78 78 79 pub async fn get_blob(&self, cid: Cid) -> Result<GetBlobOutput> { 79 80 let metadata = self.get_blob_metadata(cid).await?; 80 - let blob_stream = match self.blobstore.get_stream(cid).await { 81 - Ok(res) => res, 82 - Err(e) => { 83 - return match e.downcast_ref() { 84 - Some(GetObjectError::NoSuchKey(key)) => { 85 - Err(anyhow::Error::new(GetObjectError::NoSuchKey(key.clone()))) 86 - } 87 - _ => bail!(e.to_string()), 88 - }; 89 - } 90 - }; 91 - Ok(GetBlobOutput { 92 - size: metadata.size, 93 - mime_type: metadata.mime_type, 94 - stream: blob_stream, 95 - }) 81 + // let blob_stream = match self.blobstore.get_stream(cid).await { 82 + // Ok(res) => res, 83 + // Err(e) => { 84 + // return match e.downcast_ref() { 85 + // Some(GetObjectError::NoSuchKey(key)) => { 86 + // Err(anyhow::Error::new(GetObjectError::NoSuchKey(key.clone()))) 87 + // } 88 + // _ => bail!(e.to_string()), 89 + // }; 90 + // } 91 + // }; 92 + // Ok(GetBlobOutput { 93 + // size: metadata.size, 94 + // mime_type: metadata.mime_type, 95 + // stream: blob_stream, 96 + // }) 97 + todo!(); 96 98 } 97 99 98 100 pub async fn get_records_for_blob(&self, cid: Cid) -> Result<Vec<String>> { ··· 118 120 pub async fn upload_blob_and_get_metadata( 119 121 &self, 120 122 user_suggested_mime: String, 121 - blob: Data<'_>, 123 + blob: Data<'_>, // Type representing the body data of a request. 122 124 ) -> Result<BlobMetadata> { 125 + todo!(); 123 126 let blob_stream = blob.open(100.mebibytes()); 124 127 let bytes = blob_stream.into_bytes().await?; 125 128 let size = bytes.n.written;
-2
src/actor_store/mod.rs
··· 8 8 mod sql_blob; 9 9 mod sql_repo; 10 10 11 - pub(crate) use actor_store::ActorStore; 12 11 pub(crate) use db::ActorDb; 13 - pub(crate) use sql_blob::BlobStoreSql;
+1 -2
src/actor_store/preference.rs
··· 11 11 use rsky_pds::actor_store::preference::util::pref_in_scope; 12 12 use rsky_pds::auth_verifier::AuthScope; 13 13 use rsky_pds::db::DbConn; 14 - use rsky_pds::models; 15 14 use rsky_pds::models::AccountPref; 16 15 use std::sync::Arc; 17 16 ··· 95 94 use rsky_pds::schema::pds::account_pref::dsl as AccountPrefSchema; 96 95 let all_prefs = AccountPrefSchema::account_pref 97 96 .filter(AccountPrefSchema::did.eq(&did)) 98 - .select(models::AccountPref::as_select()) 97 + .select(AccountPref::as_select()) 99 98 .load(conn)?; 100 99 let put_prefs = values 101 100 .into_iter()
+6 -13
src/actor_store/record.rs
··· 10 10 use futures::stream::{self, StreamExt}; 11 11 use rsky_lexicon::com::atproto::admin::StatusAttr; 12 12 use rsky_pds::actor_store::record::{GetRecord, RecordsForCollection, get_backlinks}; 13 - use rsky_pds::models::{Backlink, Record}; 13 + use rsky_pds::models::{Backlink, Record, RepoBlock}; 14 14 use rsky_repo::types::{RepoRecord, WriteOpAction}; 15 15 use rsky_repo::util::cbor_to_lex_record; 16 16 use rsky_syntax::aturi::AtUri; ··· 87 87 let mut builder = RecordSchema::record 88 88 .inner_join(RepoBlockSchema::repo_block.on(RepoBlockSchema::cid.eq(RecordSchema::cid))) 89 89 .limit(limit) 90 - .select(( 91 - rsky_pds::models::Record::as_select(), 92 - rsky_pds::models::RepoBlock::as_select(), 93 - )) 90 + .select((Record::as_select(), RepoBlock::as_select())) 94 91 .filter(RecordSchema::did.eq(self.did.clone())) 95 92 .filter(RecordSchema::collection.eq(collection)) 96 93 .into_boxed(); ··· 117 114 builder = builder.filter(RecordSchema::rkey.lt(rkey_end)); 118 115 } 119 116 } 120 - let res: Vec<(rsky_pds::models::Record, rsky_pds::models::RepoBlock)> = 121 - self.db.run(move |conn| builder.load(conn)).await?; 117 + let res: Vec<(Record, RepoBlock)> = self.db.run(move |conn| builder.load(conn)).await?; 122 118 res.into_iter() 123 119 .map(|row| { 124 120 Ok(RecordsForCollection { ··· 147 143 }; 148 144 let mut builder = RecordSchema::record 149 145 .inner_join(RepoBlockSchema::repo_block.on(RepoBlockSchema::cid.eq(RecordSchema::cid))) 150 - .select(( 151 - rsky_pds::models::Record::as_select(), 152 - rsky_pds::models::RepoBlock::as_select(), 153 - )) 146 + .select((Record::as_select(), RepoBlock::as_select())) 154 147 .filter(RecordSchema::uri.eq(uri.to_string())) 155 148 .into_boxed(); 156 149 if !include_soft_deleted { ··· 159 152 if let Some(cid) = cid { 160 153 builder = builder.filter(RecordSchema::cid.eq(cid)); 161 154 } 162 - let record: Option<(rsky_pds::models::Record, rsky_pds::models::RepoBlock)> = self 155 + let record: Option<(Record, RepoBlock)> = self 163 156 .db 164 157 .run(move |conn| builder.first(conn).optional()) 165 158 .await?; ··· 298 291 let record_backlinks = get_backlinks(uri, record)?; 299 292 let conflicts: Vec<Vec<Record>> = stream::iter(record_backlinks) 300 293 .then(|backlink| async move { 301 - Ok::<Vec<Record>, anyhow::Error>( 294 + Ok::<Vec<Record>, Error>( 302 295 self.get_record_backlinks( 303 296 uri.get_collection(), 304 297 backlink.path,
+27 -4
src/actor_store/sql_blob.rs
··· 16 16 path: PathBuf, 17 17 } 18 18 19 + /// Configuration for the blob store 20 + /// TODO: Implement this placeholder 21 + pub(crate) struct BlobConfig { 22 + pub(crate) path: PathBuf, 23 + } 24 + 25 + /// ByteStream 26 + /// TODO: Implement this placeholder 27 + pub(crate) struct ByteStream { 28 + pub(crate) bytes: Vec<u8>, 29 + } 30 + impl ByteStream { 31 + pub async fn collect(self) -> Result<Vec<u8>> { 32 + Ok(self.bytes) 33 + } 34 + } 35 + 19 36 impl BlobStoreSql { 20 - pub fn new(did: String, cfg: &SdkConfig) -> Self { 37 + pub fn new(did: String, cfg: &BlobConfig) -> Self { 21 38 // let client = aws_sdk_s3::Client::new(cfg); 22 39 // BlobStorePlaceholder { 23 40 // client, ··· 26 43 todo!(); 27 44 } 28 45 29 - pub fn creator(cfg: &SdkConfig) -> Box<dyn Fn(String) -> BlobStoreSql + '_> { 46 + pub fn creator(cfg: &BlobConfig) -> Box<dyn Fn(String) -> BlobStoreSql + '_> { 30 47 Box::new(move |did: String| BlobStoreSql::new(did, cfg)) 31 48 } 32 49 ··· 130 147 131 148 pub async fn get_bytes(&self, cid: Cid) -> Result<Vec<u8>> { 132 149 let res = self.get_object(cid).await?; 133 - let bytes = res.collect().await.map(|data| data.into_bytes())?; 134 - Ok(bytes.to_vec()) 150 + // let bytes = res.collect().await.map(|data| data.into_bytes())?; 151 + // Ok(bytes.to_vec()) 152 + todo!(); 135 153 } 136 154 137 155 pub async fn get_stream(&self, cid: Cid) -> Result<ByteStream> { ··· 222 240 todo!(); 223 241 } 224 242 } 243 + 244 + struct MoveObject { 245 + from: String, 246 + to: String, 247 + }