Alternative ATProto PDS implementation

prototype actor_store

+5 -6
src/actor_store/actor_store_reader.rs
··· 3 4 use super::{ 5 ActorStoreTransactor, actor_store_resources::ActorStoreResources, db::ActorDb, 6 - preference::PreferenceReader, record::RecordReader, 7 }; 8 use crate::SigningKey; 9 ··· 15 pub(crate) record: RecordReader, 16 /// Preference reader. 17 pub(crate) pref: PreferenceReader, 18 - /// RepoReader placeholder - will be implemented later. 19 - pub(crate) repo: (), // Placeholder for RepoReader 20 /// Function to get keypair. 21 keypair_fn: Box<dyn Fn() -> Result<Arc<SigningKey>> + Send + Sync>, 22 /// Database connection. ··· 43 // Initial keypair call as in TypeScript implementation 44 let _ = keypair_fn(); 45 46 - // For now, we use a placeholder for RepoReader 47 - // Real implementation will need to be added later 48 - let repo = (); 49 50 Self { 51 did,
··· 3 4 use super::{ 5 ActorStoreTransactor, actor_store_resources::ActorStoreResources, db::ActorDb, 6 + preference::PreferenceReader, record::RecordReader, repo::RepoReader, 7 }; 8 use crate::SigningKey; 9 ··· 15 pub(crate) record: RecordReader, 16 /// Preference reader. 17 pub(crate) pref: PreferenceReader, 18 + /// RepoReader 19 + pub(crate) repo: RepoReader, 20 /// Function to get keypair. 21 keypair_fn: Box<dyn Fn() -> Result<Arc<SigningKey>> + Send + Sync>, 22 /// Database connection. ··· 43 // Initial keypair call as in TypeScript implementation 44 let _ = keypair_fn(); 45 46 + // Create repo reader 47 + let repo = RepoReader::new(db.clone(), resources.blobstore(did.clone())); 48 49 Self { 50 did,
+8 -1
src/actor_store/actor_store_transactor.rs
··· 34 35 let record = RecordTransactor::new(db.clone(), blobstore.clone()); 36 let pref = PreferenceTransactor::new(db.clone()); 37 - let repo = RepoTransactor::new(db, blobstore, did, keypair, resources.background_queue); 38 39 Self { record, repo, pref } 40 }
··· 34 35 let record = RecordTransactor::new(db.clone(), blobstore.clone()); 36 let pref = PreferenceTransactor::new(db.clone()); 37 + let repo = RepoTransactor::new( 38 + db, 39 + blobstore, 40 + did, 41 + keypair, 42 + resources.background_queue, 43 + None, 44 + ); 45 46 Self { record, repo, pref } 47 }
+1 -1
src/actor_store/actor_store_writer.rs
··· 1 use std::sync::Arc; 2 3 - use super::resources::ActorStoreResources; 4 use crate::SigningKey; 5 6 pub(crate) struct ActorStoreWriter {
··· 1 use std::sync::Arc; 2 3 + use super::{ActorDb, ActorStoreResources}; 4 use crate::SigningKey; 5 6 pub(crate) struct ActorStoreWriter {
+4
src/actor_store/blob/mod.rs
··· 2 3 mod background; 4 mod reader; 5 mod transactor; 6 7 pub(crate) use background::BackgroundQueue; 8 pub(crate) use reader::BlobReader; 9 pub(crate) use transactor::BlobTransactor;
··· 2 3 mod background; 4 mod reader; 5 + mod store; 6 mod transactor; 7 8 pub(crate) use background::BackgroundQueue; 9 pub(crate) use reader::BlobReader; 10 + pub(crate) use store::BlobStore; 11 + pub(crate) use store::BlobStorePlaceholder; 12 + pub(crate) use store::BlobStream; 13 pub(crate) use transactor::BlobTransactor;
+5 -22
src/actor_store/blob/reader.rs
··· 7 use atrium_repo::Cid; 8 use sqlx::Row; 9 10 - use crate::{ 11 - actor_store::ActorDb, 12 - repo::types::{BlobStore, BlobStoreTrait, BlobStream}, 13 - }; 14 15 /// Reader for blob data in the actor store. 16 pub(crate) struct BlobReader { 17 /// Database connection. 18 pub db: ActorDb, 19 /// BlobStore. 20 - pub blobstore: BlobStore, 21 } 22 23 impl BlobReader { 24 /// Create a new blob reader. 25 - pub(crate) fn new(db: ActorDb, blobstore: BlobStore) -> Self { 26 Self { db, blobstore } 27 } 28 ··· 94 &self, 95 cid: &Cid, 96 ) -> Result<Option<StatusAttrData>> { 97 - // const res = await this.db.db 98 - // .selectFrom('blob') 99 - // .select('takedownRef') 100 - // .where('cid', '=', cid.toString()) 101 - // .executeTakeFirst() 102 - // if (!res) return null 103 - // return res.takedownRef 104 - // ? { applied: true, ref: res.takedownRef } 105 - // : { applied: false } 106 let cid_str = cid.to_string(); 107 let result = sqlx::query!(r#"SELECT takedownRef FROM blob WHERE cid = ?"#, cid_str) 108 .fetch_optional(&self.db.pool) ··· 145 146 /// Get blobs referenced by a record. 147 pub(crate) async fn get_blobs_for_record(&self, record_uri: &str) -> Result<Vec<String>> { 148 - // const res = await this.db.db 149 - // .selectFrom('blob') 150 - // .innerJoin('record_blob', 'record_blob.blobCid', 'blob.cid') 151 - // .where('recordUri', '=', recordUri) 152 - // .select('blob.cid') 153 - // .execute() 154 - // return res.map((row) => row.cid) 155 let blobs = sqlx::query!( 156 r#"SELECT blob.cid FROM blob INNER JOIN record_blob ON record_blob.blobCid = blob.cid WHERE recordUri = ?"#, 157 record_uri
··· 7 use atrium_repo::Cid; 8 use sqlx::Row; 9 10 + use crate::actor_store::ActorDb; 11 + 12 + use super::{BlobStore, BlobStorePlaceholder, BlobStream}; 13 14 /// Reader for blob data in the actor store. 15 pub(crate) struct BlobReader { 16 /// Database connection. 17 pub db: ActorDb, 18 /// BlobStore. 19 + pub blobstore: BlobStorePlaceholder, 20 } 21 22 impl BlobReader { 23 /// Create a new blob reader. 24 + pub(crate) fn new(db: ActorDb, blobstore: BlobStorePlaceholder) -> Self { 25 Self { db, blobstore } 26 } 27 ··· 93 &self, 94 cid: &Cid, 95 ) -> Result<Option<StatusAttrData>> { 96 let cid_str = cid.to_string(); 97 let result = sqlx::query!(r#"SELECT takedownRef FROM blob WHERE cid = ?"#, cid_str) 98 .fetch_optional(&self.db.pool) ··· 135 136 /// Get blobs referenced by a record. 137 pub(crate) async fn get_blobs_for_record(&self, record_uri: &str) -> Result<Vec<String>> { 138 let blobs = sqlx::query!( 139 r#"SELECT blob.cid FROM blob INNER JOIN record_blob ON record_blob.blobCid = blob.cid WHERE recordUri = ?"#, 140 record_uri
+3 -8
src/actor_store/blob/transactor.rs
··· 8 use atrium_repo::Cid; 9 use futures::FutureExt; 10 use futures::future::BoxFuture; 11 use uuid::Uuid; 12 13 - use super::{BackgroundQueue, BlobReader}; 14 - use crate::{ 15 - actor_store::ActorDb, 16 - repo::{ 17 - block_map::sha256_raw_to_cid, 18 - types::{BlobStore, BlobStoreTrait as _, PreparedBlobRef, PreparedWrite, WriteOpAction}, 19 - }, 20 - }; 21 22 /// Blob metadata for a newly uploaded blob. 23 #[derive(Debug, Clone)]
··· 8 use atrium_repo::Cid; 9 use futures::FutureExt; 10 use futures::future::BoxFuture; 11 + use rsky_repo::types::{PreparedBlobRef, WriteOpAction}; 12 use uuid::Uuid; 13 14 + use super::{BackgroundQueue, BlobReader, BlobStore}; 15 + use crate::actor_store::{ActorDb, PreparedWrite, blob::BlobStore as _}; 16 17 /// Blob metadata for a newly uploaded blob. 18 #[derive(Debug, Clone)]
+2
src/actor_store/mod.rs
··· 8 mod blob; 9 mod db; 10 mod preference; 11 mod record; 12 mod repo; 13 ··· 17 pub(crate) use actor_store_transactor::ActorStoreTransactor; 18 pub(crate) use actor_store_writer::ActorStoreWriter; 19 pub(crate) use db::ActorDb;
··· 8 mod blob; 9 mod db; 10 mod preference; 11 + mod prepared_write; 12 mod record; 13 mod repo; 14 ··· 18 pub(crate) use actor_store_transactor::ActorStoreTransactor; 19 pub(crate) use actor_store_writer::ActorStoreWriter; 20 pub(crate) use db::ActorDb; 21 + pub(crate) use prepared_write::PreparedWrite;
+1
src/actor_store/repo/reader.rs
··· 2 3 use anyhow::Result; 4 use atrium_repo::Cid; 5 6 use super::sql_repo_reader::SqlRepoReader; 7 use crate::{
··· 2 3 use anyhow::Result; 4 use atrium_repo::Cid; 5 + use rsky_repo::storage::readable_blockstore::ReadableBlockstore as _; 6 7 use super::sql_repo_reader::SqlRepoReader; 8 use crate::{
-1
src/main.rs
··· 11 mod mmap; 12 mod oauth; 13 mod plc; 14 - mod repo; 15 mod storage; 16 #[cfg(test)] 17 mod tests;
··· 11 mod mmap; 12 mod oauth; 13 mod plc; 14 mod storage; 15 #[cfg(test)] 16 mod tests;