Alternative ATProto PDS implementation

prototype actor_store

+64 -15
src/actor_store/actor_store_reader.rs
··· 1 + use anyhow::Result; 1 2 use std::sync::Arc; 2 3 3 4 use super::{ 4 - ActorStoreTransactor, db::ActorDb, preference::PreferenceReader, record::RecordReader, 5 - repo::RepoReader, resources::ActorStoreResources, 5 + ActorStoreTransactor, actor_store_resources::ActorStoreResources, db::ActorDb, 6 + preference::PreferenceReader, record::RecordReader, 6 7 }; 7 8 use crate::SigningKey; 8 9 10 + /// Reader for the actor store. 9 11 pub(crate) struct ActorStoreReader { 10 - pub(crate) repo: RepoReader, 12 + /// DID of the actor. 13 + pub(crate) did: String, 14 + /// Record reader. 11 15 pub(crate) record: RecordReader, 16 + /// Preference reader. 12 17 pub(crate) pref: PreferenceReader, 18 + /// RepoReader placeholder - will be implemented later. 19 + pub(crate) repo: (), // Placeholder for RepoReader 20 + /// Function to get keypair. 21 + keypair_fn: Box<dyn Fn() -> Result<Arc<SigningKey>> + Send + Sync>, 22 + /// Database connection. 23 + db: ActorDb, 24 + /// Actor store resources. 25 + resources: ActorStoreResources, 13 26 } 14 27 15 28 impl ActorStoreReader { 29 + /// Create a new actor store reader. 16 30 pub(crate) fn new( 17 31 did: String, 18 32 db: ActorDb, 19 33 resources: ActorStoreResources, 20 - keypair: impl Fn() -> Result<Arc<SigningKey>>, 34 + keypair: impl Fn() -> Result<Arc<SigningKey>> + Send + Sync + 'static, 21 35 ) -> Self { 22 - let blobstore = resources.blobstore(&did); 36 + // Create readers 37 + let record = RecordReader::new(db.clone()); 38 + let pref = PreferenceReader::new(db.clone()); 23 39 24 - let repo = RepoReader::new(db.clone(), blobstore); 25 - let record = RecordReader::new(db.clone()); 26 - let pref = PreferenceReader::new(db); 27 - keypair(); 40 + // Store keypair function for later use 41 + let keypair_fn = Box::new(keypair); 42 + 43 + // Initial keypair call as in TypeScript implementation 44 + let _ = keypair_fn(); 45 + 46 + // For now, we use a placeholder for RepoReader 47 + // Real implementation will need to be added later 48 + let repo = (); 49 + 50 + Self { 51 + did, 52 + repo, 53 + record, 54 + pref, 55 + keypair_fn, 56 + db, 57 + resources, 58 + } 59 + } 28 60 29 - Self { repo, record, pref } 61 + /// Get the keypair for this actor. 62 + pub(crate) fn keypair(&self) -> Result<Arc<SigningKey>> { 63 + (self.keypair_fn)() 30 64 } 31 65 66 + /// Execute a transaction with the actor store. 32 67 pub(crate) async fn transact<T, F>(&self, f: F) -> Result<T> 33 68 where 34 69 F: FnOnce(ActorStoreTransactor) -> Result<T>, 35 70 { 36 - let keypair = self.keypair(); 37 - let db_txn = self.db.transaction().await?; 38 - let store = 39 - ActorStoreTransactor::new(self.did.clone(), db_txn, keypair, self.resources.clone()); 40 - f(store) 71 + let keypair = self.keypair()?; 72 + let did = self.did.clone(); 73 + let resources = self.resources.clone(); 74 + 75 + self.db 76 + .transaction_no_retry(move |tx| { 77 + // Create a transactor with the transaction 78 + let store = ActorStoreTransactor::new_with_transaction( 79 + did, 80 + tx, // Pass the transaction directly 81 + keypair.clone(), 82 + resources, 83 + ); 84 + 85 + // Execute user function 86 + f(store).map_err(|e| sqlx::Error::Custom(Box::new(e))) // Convert anyhow::Error to sqlx::Error 87 + }) 88 + .await 89 + .map_err(|e| anyhow::anyhow!("Transaction error: {:?}", e)) 41 90 } 42 91 }
+25
src/actor_store/actor_store_resources.rs
··· 1 + use crate::repo::types::BlobStore; 2 + 3 + use super::blob::BackgroundQueue; 4 + pub(crate) struct ActorStoreResources { 5 + pub(crate) blobstore: fn(did: String) -> BlobStore, 6 + pub(crate) background_queue: BackgroundQueue, 7 + pub(crate) reserved_key_dir: Option<String>, 8 + } 9 + impl ActorStoreResources { 10 + pub(crate) fn new( 11 + blobstore: fn(did: String) -> BlobStore, 12 + background_queue: BackgroundQueue, 13 + reserved_key_dir: Option<String>, 14 + ) -> Self { 15 + Self { 16 + blobstore, 17 + background_queue, 18 + reserved_key_dir, 19 + } 20 + } 21 + 22 + pub(crate) fn blobstore(&self, did: String) -> BlobStore { 23 + (self.blobstore)(did) 24 + } 25 + }
+26 -12
src/actor_store/actor_store_transactor.rs
··· 1 1 use std::sync::Arc; 2 2 3 - use super::{ActorDb, resources::ActorStoreResources}; 3 + use super::{ 4 + ActorDb, actor_store_resources::ActorStoreResources, preference::PreferenceTransactor, 5 + record::RecordTransactor, repo::RepoTransactor, 6 + }; 4 7 use crate::SigningKey; 5 8 9 + /// Actor store transactor for managing actor-related transactions. 6 10 pub(crate) struct ActorStoreTransactor { 7 - pub(crate) did: String, 8 - pub(crate) db: ActorDb, 9 - pub(crate) keypair: Arc<SigningKey>, 10 - pub(crate) resources: ActorStoreResources, 11 + /// Record transactor. 12 + pub(crate) record: RecordTransactor, 13 + /// Repo transactor. 14 + pub(crate) repo: RepoTransactor, 15 + /// Preference transactor. 16 + pub(crate) pref: PreferenceTransactor, 11 17 } 12 - 13 18 impl ActorStoreTransactor { 19 + /// Creates a new actor store transactor. 20 + /// 21 + /// # Arguments 22 + /// 23 + /// * `did` - The DID of the actor. 24 + /// * `db` - The database connection. 25 + /// * `keypair` - The signing keypair. 26 + /// * `resources` - The actor store resources. 14 27 pub(crate) fn new( 15 28 did: String, 16 29 db: ActorDb, 17 30 keypair: Arc<SigningKey>, 18 31 resources: ActorStoreResources, 19 32 ) -> Self { 20 - Self { 21 - did, 22 - db, 23 - keypair, 24 - resources, 25 - } 33 + let blobstore = resources.blobstore(did.clone()); 34 + 35 + let record = RecordTransactor::new(db.clone(), blobstore.clone()); 36 + let pref = PreferenceTransactor::new(db.clone()); 37 + let repo = RepoTransactor::new(db, blobstore, did, keypair, resources.background_queue); 38 + 39 + Self { record, repo, pref } 26 40 } 27 41 }
+5 -1
src/actor_store/blob/transactor.rs
··· 167 167 } 168 168 169 169 /// Process blobs for a repository write operation. 170 - pub(crate) async fn process_write_blobs(&self, writes: Vec<PreparedWrite>) -> Result<()> { 170 + pub(crate) async fn process_write_blobs( 171 + &self, 172 + _rev: &String, // Typescript impl declares rev but never uses it 173 + writes: Vec<PreparedWrite>, 174 + ) -> Result<()> { 171 175 self.delete_dereferenced_blobs(writes.clone(), false) 172 176 .await 173 177 .context("failed to delete dereferenced blobs")?;
+2 -2
src/actor_store/mod.rs
··· 2 2 3 3 mod actor_store; 4 4 mod actor_store_reader; 5 + mod actor_store_resources; 5 6 mod actor_store_transactor; 6 7 mod actor_store_writer; 7 8 mod blob; ··· 9 10 mod preference; 10 11 mod record; 11 12 mod repo; 12 - mod resources; 13 13 14 14 pub(crate) use actor_store::ActorStore; 15 15 pub(crate) use actor_store_reader::ActorStoreReader; 16 + pub(crate) use actor_store_resources::ActorStoreResources; 16 17 pub(crate) use actor_store_transactor::ActorStoreTransactor; 17 18 pub(crate) use actor_store_writer::ActorStoreWriter; 18 19 pub(crate) use db::ActorDb; 19 - pub(crate) use resources::ActorStoreResources;
+13 -9
src/actor_store/repo/reader.rs
··· 5 5 6 6 use super::sql_repo_reader::SqlRepoReader; 7 7 use crate::{ 8 - actor_store::{blob::BlobReader, record::RecordReader}, 9 - repo::block_map::BlockMap, 8 + actor_store::{ActorDb, blob::BlobReader, record::RecordReader}, 9 + repo::{block_map::BlockMap, types::BlobStore}, 10 10 }; 11 11 12 12 /// Reader for repository data in the actor store. ··· 19 19 20 20 impl RepoReader { 21 21 /// Create a new repository reader. 22 - // pub(crate) fn new(db: ActorDb, did: String, blob_config: BlobConfig) -> Self { 23 - // Self { 24 - // storage: SqlRepoReader::new(db.clone(), did.clone()), 25 - // db, 26 - // did, 27 - // } 28 - // } 22 + pub(crate) fn new(db: ActorDb, blobstore: BlobStore) -> Self { 23 + let blob = BlobReader::new(db.clone(), blobstore); 24 + let record = RecordReader::new(db.clone()); 25 + let storage = SqlRepoReader::new(db); 26 + 27 + Self { 28 + blob, 29 + record, 30 + storage, 31 + } 32 + } 29 33 30 34 /// Get event data for synchronization. 31 35 pub(crate) async fn get_sync_event_data(&self) -> Result<SyncEventData> {
+22 -22
src/actor_store/repo/sql_repo_reader.rs
··· 5 5 Cid, 6 6 blockstore::{AsyncBlockStoreRead, Error as BlockstoreError}, 7 7 }; 8 + use rsky_repo::storage::readable_blockstore::ReadableBlockstore; 8 9 use sqlx::Row; 9 10 use std::str::FromStr; 10 11 use std::sync::Arc; ··· 21 22 pub cache: Arc<RwLock<BlockMap>>, 22 23 /// Database connection. 23 24 pub db: ActorDb, 24 - /// DID of the repository owner. 25 - pub did: String, 26 25 } 27 26 28 27 /// Repository root with CID and revision. ··· 35 34 36 35 impl SqlRepoReader { 37 36 /// Create a new SQL repository reader. 38 - pub(crate) fn new(db: ActorDb, did: String) -> Self { 37 + pub(crate) fn new(db: ActorDb) -> Self { 39 38 Self { 40 39 cache: Arc::new(RwLock::new(BlockMap::new())), 41 40 db, 42 - did, 43 41 } 44 42 } 45 - 46 43 // async getRoot(): Promise<CID> { 47 - // async has(cid: CID): Promise<boolean> { 48 44 // async getCarStream(since?: string) { 49 45 // async *iterateCarBlocks(since?: string): AsyncIterable<CarBlock> { 50 46 // async getBlockRange(since?: string, cursor?: RevCursor) { 51 47 // async countBlocks(): Promise<number> { 52 48 // async destroy(): Promise<void> { 53 49 54 - pub(crate) async fn get_bytes(&self, cid: &Cid) -> Result<Option<Vec<u8>>> { 50 + /// Get the detailed root information. 51 + pub(crate) async fn get_root_detailed(&self) -> Result<RootInfo> { 52 + let row = sqlx::query!(r#"SELECT cid, rev FROM repo_root"#) 53 + .fetch_one(&self.db.pool) 54 + .await 55 + .context("failed to fetch repo root")?; 56 + 57 + Ok(RootInfo { 58 + cid: Cid::from_str(&row.cid)?, 59 + rev: row.rev, 60 + }) 61 + } 62 + } 63 + 64 + impl ReadableBlockstore for SqlRepoReader { 65 + async fn get_bytes(&self, cid: &Cid) -> Result<Option<Vec<u8>>> { 55 66 // First check the cache 56 67 { 57 68 let cache_guard = self.cache.read().await; ··· 78 89 Ok(content) 79 90 } 80 91 81 - /// Get the detailed root information. 82 - pub(crate) async fn get_root_detailed(&self) -> Result<RootInfo> { 83 - let did = self.did.clone(); 84 - let row = sqlx::query!(r#"SELECT cid, rev FROM repo_root WHERE did = ?"#, did) 85 - .fetch_one(&self.db.pool) 86 - .await 87 - .context("failed to fetch repo root")?; 88 - 89 - Ok(RootInfo { 90 - cid: Cid::from_str(&row.cid)?, 91 - rev: row.rev, 92 - }) 92 + async fn has(&self, cid: &Cid) -> Result<bool> { 93 + self.get_bytes(cid).await.map(|bytes| bytes.is_some()) 93 94 } 94 95 95 96 /// Get blocks from the database. 96 - pub(crate) async fn get_blocks(&self, cids: Vec<Cid>) -> Result<BlocksAndMissing> { 97 + async fn get_blocks(&self, cids: Vec<Cid>) -> Result<BlocksAndMissing> { 97 98 let cached = { self.cache.write().await.get_many(cids)? }; // TODO: use read lock? 98 99 99 100 if cached.missing.is_empty() { ··· 118 119 119 120 let query = format!( 120 121 "SELECT cid, content FROM repo_block 121 - WHERE did = ? AND cid IN ({}) 122 + WHERE cid IN ({}) 122 123 ORDER BY cid", 123 124 placeholders 124 125 ); 125 126 126 127 let mut query_builder = sqlx::query(&query); 127 - query_builder = query_builder.bind(&self.did); 128 128 for cid in chunk { 129 129 query_builder = query_builder.bind(cid); 130 130 }
+3 -4
src/actor_store/repo/sql_repo_transactor.rs
··· 23 23 /// Cache for blocks. 24 24 pub cache: BlockMap, 25 25 /// Current timestamp. 26 - pub now: String, 26 + pub now: Option<String>, 27 27 } 28 28 29 29 impl SqlRepoTransactor { 30 30 /// Create a new SQL repository transactor. 31 - pub(crate) fn new(db: ActorDb, did: String) -> Self { 32 - let now = chrono::Utc::now().to_rfc3339(); 31 + pub(crate) fn new(db: ActorDb, did: String, now: Option<String>) -> Self { 33 32 Self { 34 33 reader: SqlRepoReader::new(db, did), 35 34 cache: BlockMap::new(), ··· 77 76 } 78 77 79 78 /// Apply a commit to the repository. 80 - pub(crate) async fn apply_commit(&self, commit: CommitData, is_create: bool) -> Result<()> { 79 + pub(crate) async fn apply_commit(&self, commit: &CommitData, is_create: bool) -> Result<()> { 81 80 let is_create = is_create || false; 82 81 let removed_cids_list = commit.removed_cids.to_list(); 83 82
+274 -192
src/actor_store/repo/transactor.rs
··· 1 1 //! Repository transactor for the actor store. 2 2 3 - use std::str::FromStr; 4 - 5 - use anyhow::Result; 3 + use anyhow::{Context as _, Result}; 6 4 use atrium_repo::Cid; 7 5 use rsky_syntax::aturi::AtUri; 6 + use std::sync::Arc; 8 7 9 8 use crate::{ 10 - actor_store::ActorDb, repo::types::{CommitAction, CommitDataWithOps, CommitOp, PreparedWrite, WriteOpAction}, SigningKey 9 + SigningKey, 10 + actor_store::{ 11 + ActorDb, 12 + blob::{BackgroundQueue, BlobTransactor}, 13 + record::RecordTransactor, 14 + repo::{reader::RepoReader, sql_repo_transactor::SqlRepoTransactor}, 15 + resources::ActorStoreResources, 16 + }, 17 + repo::{ 18 + Repo, 19 + block_map::BlockMap, 20 + types::{ 21 + BlobStore, CommitAction, CommitDataWithOps, CommitOp, PreparedCreate, PreparedWrite, 22 + WriteOpAction, format_data_key, 23 + }, 24 + }, 11 25 }; 12 26 13 - use super::{reader::RepoReader, sql_repo_transactor::SqlRepoTransactor}; 14 - 15 - /// Transactor for repository operations. 27 + /// Repository transactor for the actor store. 16 28 pub(crate) struct RepoTransactor { 17 29 /// The inner reader. 18 - pub(crate) reader: RepoReader, 19 - /// 30 + pub reader: RepoReader, 31 + /// BlobTransactor for handling blobs. 32 + pub blob: BlobTransactor, 33 + /// RecordTransactor for handling records. 34 + pub record: RecordTransactor, 35 + /// SQL repository transactor. 36 + pub storage: SqlRepoTransactor, 20 37 } 21 - 22 38 23 39 impl RepoTransactor { 24 40 /// Create a new repository transactor. 25 41 pub(crate) fn new( 26 42 db: ActorDb, 43 + blobstore: BlobStore, 27 44 did: String, 28 - signing_key: SigningKey, 29 - blob_config: crate::config::BlobConfig, 45 + signing_key: Arc<SigningKey>, 46 + background_queue: BackgroundQueue, 47 + now: Option<String>, 30 48 ) -> Self { 49 + // Create a new RepoReader 50 + let reader = RepoReader::new(db.clone(), blobstore.clone(), did.clone(), signing_key); 51 + 52 + // Create a new BlobTransactor 53 + let blob = BlobTransactor::new(db.clone(), blobstore.clone(), background_queue); 54 + 55 + // Create a new RecordTransactor 56 + let record = RecordTransactor::new(db.clone(), blobstore); 57 + 58 + // Create a new SQL repository transactor 59 + let storage = SqlRepoTransactor::new(db, did.clone(), now); 60 + 31 61 Self { 32 - reader: RepoReader::new(db.clone(), did.clone(), blob_config), 33 - storage: SqlRepoTransactor::new(db, did.clone()), 34 - did, 35 - signing_key, 62 + reader, 63 + blob, 64 + record, 65 + storage, 36 66 } 37 67 } 38 68 39 - /// Load the repository if it exists. 40 - // pub async fn maybe_load_repo( 41 - // &self, 42 - // ) -> Result<Option<Repository<impl AsyncBlockStoreRead + AsyncBlockStoreWrite>>> { 43 - // todo!("Implement maybe_load_repo") 44 - // } 69 + /// Try to load a repository. 70 + pub(crate) async fn maybe_load_repo(&self) -> Result<Option<Repo>> { 71 + // Query the repository root 72 + let root = sqlx::query!("SELECT cid FROM repo_root LIMIT 1") 73 + .fetch_optional(&self.db.pool) 74 + .await 75 + .context("failed to query repo root")?; 76 + 77 + // If found, load the repo 78 + if let Some(row) = root { 79 + let cid = Cid::try_from(&row.cid)?; 80 + let repo = Repo::load(&self.storage, cid).await?; 81 + Ok(Some(repo)) 82 + } else { 83 + Ok(None) 84 + } 85 + } 45 86 46 - /// Create a new repository. 87 + /// Create a new repository with the given writes. 47 88 pub(crate) async fn create_repo( 48 89 &self, 49 - writes: Vec<PreparedWrite>, 90 + writes: Vec<PreparedCreate>, 50 91 ) -> Result<CommitDataWithOps> { 51 - todo!("Implement create_repo") 92 + // Assert we're in a transaction 93 + self.db.assert_transaction()?; 94 + 95 + // Convert writes to operations 96 + let ops = writes.iter().map(|w| create_write_to_op(w)).collect(); 97 + 98 + // Format the initial commit 99 + let commit = 100 + Repo::format_init_commit(&self.storage, &self.did, &self.signing_key, ops).await?; 101 + 102 + // Apply the commit, index the writes, and process blobs in parallel 103 + let results = futures::future::join3( 104 + self.storage.apply_commit(&commit, true), 105 + self.index_writes(&writes, &commit.rev), 106 + self.blob.process_write_blobs(&commit.rev, &writes), 107 + ) 108 + .await; 109 + 110 + // Check for errors in each parallel task 111 + results.0?; 112 + results.1?; 113 + results.2?; 114 + 115 + // Create commit operations 116 + let ops = writes 117 + .iter() 118 + .map(|w| CommitOp { 119 + action: CommitAction::Create, 120 + path: format_data_key(&w.uri.collection, &w.uri.rkey), 121 + cid: Some(w.cid), 122 + prev: None, 123 + }) 124 + .collect(); 125 + 126 + // Return the commit data with operations 127 + Ok(CommitDataWithOps { 128 + commit_data: commit, 129 + ops, 130 + prev_data: None, 131 + }) 52 132 } 53 133 54 134 /// Process writes to the repository. ··· 57 137 writes: Vec<PreparedWrite>, 58 138 swap_commit_cid: Option<Cid>, 59 139 ) -> Result<CommitDataWithOps> { 60 - // Validate parameters 140 + // Assert we're in a transaction 141 + self.db.assert_transaction()?; 142 + 143 + // Check write limit 61 144 if writes.len() > 200 { 62 - return Err(anyhow::anyhow!("Too many writes. Max: 200").into()); 145 + return Err(anyhow::anyhow!("Too many writes. Max: 200")); 63 146 } 64 147 65 - // Format the commit (creates blocks and structures the operations) 66 - let commit = self.format_commit(writes.clone(), swap_commit_cid).await?; 148 + // Format the commit 149 + let commit = self.format_commit(writes, swap_commit_cid).await?; 67 150 68 - // Check commit size (prevent large commits) 69 - if commit.commit_data.relevant_blocks.byte_size()? > 2000000 { 70 - return Err(anyhow::anyhow!("Too many writes. Max event size: 2MB").into()); 151 + // Check commit size limit (2MB) 152 + if commit.commit_data.relevant_blocks.byte_size()? > 2_000_000 { 153 + return Err(anyhow::anyhow!("Too many writes. Max event size: 2MB")); 71 154 } 72 155 73 - // Execute these operations in parallel for better performance 74 - tokio::try_join!( 75 - // Persist the commit to repo storage 76 - self.storage.apply_commit(commit.commit_data.clone(), true), 77 - // Send to indexing 78 - self.index_writes(writes.clone(), &commit.commit_data.rev), 79 - // Process blobs from writes 80 - self.process_write_blobs(&commit.commit_data.rev, &writes), 81 - )?; 156 + // Apply the commit, index the writes, and process blobs in parallel 157 + let results = futures::future::join3( 158 + self.storage.apply_commit(&commit.commit_data, false), 159 + self.index_writes(writes, &commit.commit_data.rev), 160 + self.blob 161 + .process_write_blobs(&commit.commit_data.rev, writes), 162 + ) 163 + .await; 164 + 165 + // Check for errors in each parallel task 166 + results.0?; 167 + results.1?; 168 + results.2?; 82 169 83 170 Ok(commit) 84 171 } 85 172 86 - /// Format a commit for writing. 173 + /// Format a commit for the given writes. 87 174 pub(crate) async fn format_commit( 88 175 &self, 89 176 writes: Vec<PreparedWrite>, 90 177 swap_commit: Option<Cid>, 91 178 ) -> Result<CommitDataWithOps> { 92 - // Get current repository root 179 + // Get the current root 93 180 let curr_root = self.storage.get_root_detailed().await?; 94 - if curr_root.cid.is_nil() { 95 - return Err(anyhow::anyhow!("No repo root found for {}", self.did).into()); 181 + if curr_root.is_none() { 182 + return Err(anyhow::anyhow!("No repo root found for {}", self.did)); 96 183 } 97 184 98 - // Check swap commit if provided 99 - if let Some(swap_cid) = swap_commit { 100 - if !curr_root.cid.equals(swap_cid) { 185 + let curr_root = curr_root.unwrap(); 186 + 187 + // Check commit swap if requested 188 + if let Some(swap) = swap_commit { 189 + if curr_root.cid != swap { 101 190 return Err(anyhow::anyhow!( 102 - "Bad commit swap: expected {}, got {}", 103 - swap_cid, 104 - curr_root.cid 105 - ) 106 - .into()); 191 + "Bad commit swap: current={}, expected={}", 192 + curr_root.cid, 193 + swap 194 + )); 107 195 } 108 196 } 109 197 110 - // Cache last commit for performance 198 + // Cache the revision for better performance 111 199 self.storage.cache_rev(&curr_root.rev).await?; 112 200 201 + // Prepare collections for tracking changes 113 202 let mut new_record_cids = Vec::new(); 114 203 let mut del_and_update_uris = Vec::new(); 115 204 let mut commit_ops = Vec::new(); 116 205 117 - // Process each write to create commit operations 118 - for write in &writes { 119 - let uri_str = write.uri().clone(); 120 - let uri = AtUri::try_from(uri_str.as_str())?; 206 + // Process each write 207 + for write in writes { 208 + let action = &write.action; 209 + let uri = &write.uri; 210 + let swap_cid = write.swap_cid; 121 211 122 - match write.action() { 123 - WriteOpAction::Create | WriteOpAction::Update => { 124 - if let Some(cid) = write.cid() { 125 - new_record_cids.push(cid); 126 - } 127 - } 128 - _ => {} 212 + // Track new record CIDs 213 + if *action != WriteOpAction::Delete { 214 + new_record_cids.push(write.cid); 129 215 } 130 216 131 - if write.action() != &WriteOpAction::Create { 217 + // Track deleted/updated URIs 218 + if *action != WriteOpAction::Create { 132 219 del_and_update_uris.push(uri.clone()); 133 220 } 134 221 135 - // Get current record if it exists 136 - let record = self.record.get_record(&uri.to_string(), None, true).await?; 137 - let curr_record = record.map(|r| Cid::from_str(&r.cid).ok()).flatten(); 222 + // Get the current record if it exists 223 + let record = self.record.get_record(uri, None, true).await?; 224 + let curr_record = record.as_ref().map(|r| Cid::try_from(&r.cid).unwrap()); 138 225 139 - // Create the operation 140 - let path = format!("{}/{}", uri.get_collection(), uri.get_rkey()); 226 + // Create commit operation 141 227 let mut op = CommitOp { 142 - action: match write.action() { 143 - WriteOpAction::Create => CommitAction::Create, 144 - WriteOpAction::Update => CommitAction::Update, 145 - WriteOpAction::Delete => CommitAction::Delete, 146 - }, 147 - path: path.clone(), 148 - cid: match write.action() { 149 - WriteOpAction::Delete => None, 150 - _ => write.cid(), 228 + action: action.clone(), 229 + path: format_data_key(&uri.collection, &uri.rkey), 230 + cid: if *action == WriteOpAction::Delete { 231 + None 232 + } else { 233 + Some(write.cid) 151 234 }, 152 - prev: curr_record.clone(), 235 + prev: curr_record, 153 236 }; 154 237 155 - // Validate swap consistency 156 - if let Some(swap_cid) = write.swap_cid() { 157 - match write.action() { 158 - WriteOpAction::Create if swap_cid.is_some() => { 238 + commit_ops.push(op); 239 + 240 + // Validate swap_cid conditions 241 + if swap_cid.is_some() { 242 + match action { 243 + WriteOpAction::Create if swap_cid != Some(None) => { 244 + return Err(anyhow::anyhow!( 245 + "Bad record swap: there should be no current record for a create" 246 + )); 247 + } 248 + WriteOpAction::Update if swap_cid == Some(None) => { 159 249 return Err(anyhow::anyhow!( 160 - "Bad record swap: cannot provide swap CID for create" 161 - ) 162 - .into()); 250 + "Bad record swap: there should be a current record for an update" 251 + )); 163 252 } 164 - WriteOpAction::Update | WriteOpAction::Delete if swap_cid.is_none() => { 253 + WriteOpAction::Delete if swap_cid == Some(None) => { 165 254 return Err(anyhow::anyhow!( 166 - "Bad record swap: must provide swap CID for update/delete" 167 - ) 168 - .into()); 255 + "Bad record swap: there should be a current record for a delete" 256 + )); 169 257 } 170 258 _ => {} 171 259 } 172 260 173 - if swap_cid.is_some() 174 - && curr_record.is_some() 175 - && !curr_record.unwrap().equals(swap_cid.unwrap()) 176 - { 177 - return Err(anyhow::anyhow!( 178 - "Bad record swap: expected {}, got {:?}", 179 - swap_cid.unwrap(), 180 - curr_record 181 - ) 182 - .into()); 261 + if let Some(Some(swap)) = swap_cid { 262 + if curr_record.is_some() && curr_record != Some(swap) { 263 + return Err(anyhow::anyhow!( 264 + "Bad record swap: current={:?}, expected={}", 265 + curr_record, 266 + swap 267 + )); 268 + } 183 269 } 184 270 } 185 - 186 - commit_ops.push(op); 187 271 } 188 272 189 - // Load the current repository and prepare for modification 190 - let repo = crate::storage::open_repo(&self.storage.reader.config, &self.did, curr_root.cid) 191 - .await?; 192 - let prev_data = repo.commit().data; 273 + // Load the repo 274 + let repo = Repo::load(&self.storage, curr_root.cid).await?; 275 + let prev_data = repo.commit.data.clone(); 193 276 194 - // Convert PreparedWrites to RecordWriteOps 195 - let write_ops = writes 196 - .iter() 197 - .map(|w| write_to_op(w.clone())) 198 - .collect::<Result<Vec<_>>>()?; 277 + // Convert writes to ops 278 + let write_ops = writes.iter().map(|w| write_to_op(w)).collect(); 199 279 200 - // Format the new commit 280 + // Format the commit 201 281 let commit = repo.format_commit(write_ops, &self.signing_key).await?; 202 282 203 - // Find blocks referenced by other records 283 + // Find blocks that would be deleted but are referenced by another record 204 284 let dupe_record_cids = self 205 285 .get_duplicate_record_cids(&commit.removed_cids.to_list(), &del_and_update_uris) 206 286 .await?; 207 287 208 - // Remove duplicated CIDs from the removal list 209 - for cid in dupe_record_cids { 210 - commit.removed_cids.delete(cid); 288 + // Remove duplicates from removed_cids 289 + for cid in &dupe_record_cids { 290 + commit.removed_cids.delete(*cid); 211 291 } 212 292 213 - // Ensure all necessary blocks are included 214 - let new_record_blocks = commit.relevant_blocks.get_many(new_record_cids)?; 293 + // Find blocks that are relevant to ops but not included in diff 294 + let new_record_blocks = commit.relevant_blocks.get_many(&new_record_cids)?; 215 295 if !new_record_blocks.missing.is_empty() { 216 - let missing_blocks = self.storage.get_blocks(new_record_blocks.missing).await?; 296 + let missing_blocks = self 297 + .storage 298 + .reader 299 + .get_blocks(&new_record_blocks.missing) 300 + .await?; 217 301 commit.relevant_blocks.add_map(missing_blocks.blocks)?; 218 302 } 219 303 ··· 224 308 }) 225 309 } 226 310 227 - /// Index writes in the repository. 228 - pub(crate) async fn index_writes(&self, writes: Vec<PreparedWrite>, rev: &str) -> Result<()> { 229 - // Process each write for indexing 230 - for write in writes { 231 - let uri_str = write.uri().clone(); 232 - let uri = AtUri::try_from(uri_str.as_str())?; 311 + /// Index writes to the database. 312 + pub(crate) async fn index_writes(&self, writes: &[PreparedWrite], rev: &str) -> Result<()> { 313 + // Assert we're in a transaction 314 + self.db.assert_transaction()?; 233 315 234 - match write.action() { 316 + // Process each write in parallel 317 + let futures = writes.iter().map(|write| async move { 318 + match write.action { 235 319 WriteOpAction::Create | WriteOpAction::Update => { 236 - if let PreparedWrite::Create(w) | PreparedWrite::Update(w) = write { 237 - self.record 238 - .index_record( 239 - uri, 240 - w.cid, 241 - Some(w.record), 242 - write.action().clone(), 243 - rev, 244 - None, // Use current timestamp 245 - ) 246 - .await?; 247 - } 320 + self.record 321 + .index_record( 322 + &write.uri, 323 + &write.cid, 324 + &write.record, 325 + &write.action, 326 + rev, 327 + &self.now, 328 + ) 329 + .await 248 330 } 249 - WriteOpAction::Delete => { 250 - self.record.delete_record(&uri).await?; 251 - } 331 + WriteOpAction::Delete => self.record.delete_record(&write.uri).await, 252 332 } 253 - } 333 + }); 334 + 335 + // Wait for all indexing operations to complete 336 + futures::future::try_join_all(futures).await?; 254 337 255 338 Ok(()) 256 339 } 257 340 258 - /// Get duplicate record CIDs. 341 + /// Get record CIDs that are duplicated elsewhere in the repository. 259 342 pub(crate) async fn get_duplicate_record_cids( 260 343 &self, 261 344 cids: &[Cid], ··· 265 348 return Ok(Vec::new()); 266 349 } 267 350 351 + // Convert CIDs and URIs to strings 268 352 let cid_strs: Vec<String> = cids.iter().map(|c| c.to_string()).collect(); 269 353 let uri_strs: Vec<String> = touched_uris.iter().map(|u| u.to_string()).collect(); 270 354 271 - // Find records that have the same CIDs but weren't touched in this operation 272 - let duplicates = sqlx::query_scalar!( 273 - r#" 274 - SELECT cid FROM record 275 - WHERE cid IN (SELECT unnest($1::text[])) 276 - AND uri NOT IN (SELECT unnest($2::text[])) 277 - "#, 278 - &cid_strs, 279 - &uri_strs 355 + // Query the database for duplicates 356 + let rows = sqlx::query!( 357 + "SELECT cid FROM record WHERE cid IN (?) AND uri NOT IN (?)", 358 + cid_strs.join(","), 359 + uri_strs.join(",") 280 360 ) 281 - .fetch_all(&self.reader.db) 282 - .await?; 361 + .fetch_all(&self.db.pool) 362 + .await 363 + .context("failed to query duplicate record CIDs")?; 283 364 284 - // Convert string CIDs back to Cid objects 285 - let dupe_cids = duplicates 365 + // Convert to CIDs 366 + let result = rows 286 367 .into_iter() 287 - .filter_map(|cid_str| Cid::from_str(&cid_str).ok()) 288 - .collect(); 368 + .map(|row| Cid::try_from(&row.cid)) 369 + .collect::<Result<Vec<_>, _>>()?; 289 370 290 - Ok(dupe_cids) 371 + Ok(result) 291 372 } 292 - 293 - pub(crate) async fn process_write_blobs( 294 - &self, 295 - rev: &str, 296 - writes: &[PreparedWrite], 297 - ) -> Result<()> { 298 - // First handle deletions or updates 299 - let uris: Vec<String> = writes 300 - .iter() 301 - .filter(|w| matches!(w.action(), WriteOpAction::Delete | WriteOpAction::Update)) 302 - .map(|w| w.uri().clone()) 303 - .collect(); 373 + } 304 374 305 - if !uris.is_empty() { 306 - // Remove blob references for deleted/updated records 307 - self.blob.delete_dereferenced_blobs(&uris).await?; 308 - } 375 + // Helper functions 309 376 310 - // Process each blob in each write 311 - for write in writes { 312 - if let PreparedWrite::Create(w) | PreparedWrite::Update(w) = write { 313 - for blob in &w.blobs { 314 - // Verify and make permanent 315 - self.blob.verify_blob_and_make_permanent(blob).await?; 316 - 317 - // Associate blob with record 318 - self.blob 319 - .associate_blob(&blob.cid.to_string(), write.uri()) 320 - .await?; 321 - } 322 - } 323 - } 377 + /// Convert a PreparedCreate to an operation. 378 + fn create_write_to_op(write: &PreparedCreate) -> WriteOp { 379 + WriteOp { 380 + action: WriteOpAction::Create, 381 + collection: write.uri.collection.clone(), 382 + rkey: write.uri.rkey.clone(), 383 + record: write.record.clone(), 384 + } 385 + } 324 386 325 - Ok(()) 387 + /// Convert a PreparedWrite to an operation. 388 + fn write_to_op(write: &PreparedWrite) -> WriteOp { 389 + match write.action { 390 + WriteOpAction::Create => WriteOp { 391 + action: WriteOpAction::Create, 392 + collection: write.uri.collection.clone(), 393 + rkey: write.uri.rkey.clone(), 394 + record: write.record.clone(), 395 + }, 396 + WriteOpAction::Update => WriteOp { 397 + action: WriteOpAction::Update, 398 + collection: write.uri.collection.clone(), 399 + rkey: write.uri.rkey.clone(), 400 + record: write.record.clone(), 401 + }, 402 + WriteOpAction::Delete => WriteOp { 403 + action: WriteOpAction::Delete, 404 + collection: write.uri.collection.clone(), 405 + rkey: write.uri.rkey.clone(), 406 + record: None, 407 + }, 326 408 } 327 409 }
-20
src/actor_store/resources.rs
··· 1 - 2 - use std::sync::Arc; 3 - 4 - use crate::config::{BlobConfig, RepoConfig}; 5 - 6 - pub(crate) struct ActorStoreResources { 7 - pub(crate) config: RepoConfig, 8 - pub(crate) blob_config: BlobConfig, 9 - pub(crate) background_queue: Arc<()>, // Placeholder 10 - } 11 - 12 - impl Clone for ActorStoreResources { 13 - fn clone(&self) -> Self { 14 - Self { 15 - config: self.config.clone(), 16 - blob_config: self.blob_config.clone(), 17 - background_queue: self.background_queue.clone(), 18 - } 19 - } 20 - }
+1
src/repo/mod.rs
··· 1 1 pub(crate) mod block_map; 2 + pub(crate) mod storage; 2 3 pub(crate) mod types;
+131
src/repo/storage/mod.rs
··· 1 + //! ReadableBlockstore for managing blocks in the repository. 2 + 3 + use crate::repo::block_map::{BlockMap, MissingBlockError}; 4 + use crate::repo::util::{cbor_to_lex_record, parse_obj_by_def}; 5 + use anyhow::{Context, Result}; 6 + use atrium_repo::Cid; 7 + use std::collections::HashMap; 8 + use std::sync::Arc; 9 + 10 + /// Trait for a readable blockstore. 11 + pub(crate) trait ReadableBlockstore { 12 + /// Get the raw bytes for a given CID. 13 + fn get_bytes(&self, cid: &Cid) -> Result<Option<Vec<u8>>>; 14 + 15 + /// Check if a block exists for a given CID. 16 + fn has(&self, cid: &Cid) -> Result<bool>; 17 + 18 + /// Get multiple blocks for a list of CIDs. 19 + fn get_blocks(&self, cids: &[Cid]) -> Result<(BlockMap, Vec<Cid>)>; 20 + 21 + /// Attempt to read an object by CID and definition. 22 + fn attempt_read<T>(&self, cid: &Cid, def: &str) -> Result<Option<(T, Vec<u8>)>> 23 + where 24 + T: serde::de::DeserializeOwned; 25 + 26 + /// Read an object and its raw bytes by CID and definition. 27 + fn read_obj_and_bytes<T>(&self, cid: &Cid, def: &str) -> Result<(T, Vec<u8>)> 28 + where 29 + T: serde::de::DeserializeOwned; 30 + 31 + /// Read an object by CID and definition. 32 + fn read_obj<T>(&self, cid: &Cid, def: &str) -> Result<T> 33 + where 34 + T: serde::de::DeserializeOwned; 35 + 36 + /// Attempt to read a record by CID. 37 + fn attempt_read_record(&self, cid: &Cid) -> Result<Option<RepoRecord>>; 38 + 39 + /// Read a record by CID. 40 + fn read_record(&self, cid: &Cid) -> Result<RepoRecord>; 41 + } 42 + 43 + /// Concrete implementation of the ReadableBlockstore. 44 + pub(crate) struct InMemoryBlockstore { 45 + blocks: HashMap<Cid, Vec<u8>>, 46 + } 47 + 48 + impl InMemoryBlockstore { 49 + /// Create a new in-memory blockstore. 50 + pub fn new() -> Self { 51 + Self { 52 + blocks: HashMap::new(), 53 + } 54 + } 55 + 56 + /// Add a block to the blockstore. 57 + pub fn add_block(&mut self, cid: Cid, bytes: Vec<u8>) { 58 + self.blocks.insert(cid, bytes); 59 + } 60 + } 61 + 62 + impl ReadableBlockstore for InMemoryBlockstore { 63 + fn get_bytes(&self, cid: &Cid) -> Result<Option<Vec<u8>>> { 64 + Ok(self.blocks.get(cid).cloned()) 65 + } 66 + 67 + fn has(&self, cid: &Cid) -> Result<bool> { 68 + Ok(self.blocks.contains_key(cid)) 69 + } 70 + 71 + fn get_blocks(&self, cids: &[Cid]) -> Result<(BlockMap, Vec<Cid>)> { 72 + let mut blocks = BlockMap::new(); 73 + let mut missing = Vec::new(); 74 + 75 + for cid in cids { 76 + if let Some(bytes) = self.blocks.get(cid) { 77 + blocks.insert(*cid, bytes.clone()); 78 + } else { 79 + missing.push(*cid); 80 + } 81 + } 82 + 83 + Ok((blocks, missing)) 84 + } 85 + 86 + fn attempt_read<T>(&self, cid: &Cid, def: &str) -> Result<Option<(T, Vec<u8>)>> 87 + where 88 + T: serde::de::DeserializeOwned, 89 + { 90 + if let Some(bytes) = self.get_bytes(cid)? { 91 + let obj = parse_obj_by_def(&bytes, cid, def)?; 92 + Ok(Some((obj, bytes))) 93 + } else { 94 + Ok(None) 95 + } 96 + } 97 + 98 + fn read_obj_and_bytes<T>(&self, cid: &Cid, def: &str) -> Result<(T, Vec<u8>)> 99 + where 100 + T: serde::de::DeserializeOwned, 101 + { 102 + if let Some((obj, bytes)) = self.attempt_read(cid, def)? { 103 + Ok((obj, bytes)) 104 + } else { 105 + Err(MissingBlockError::new(*cid, def).into()) 106 + } 107 + } 108 + 109 + fn read_obj<T>(&self, cid: &Cid, def: &str) -> Result<T> 110 + where 111 + T: serde::de::DeserializeOwned, 112 + { 113 + let (obj, _) = self.read_obj_and_bytes(cid, def)?; 114 + Ok(obj) 115 + } 116 + 117 + fn attempt_read_record(&self, cid: &Cid) -> Result<Option<RepoRecord>> { 118 + match self.read_record(cid) { 119 + Ok(record) => Ok(Some(record)), 120 + Err(_) => Ok(None), 121 + } 122 + } 123 + 124 + fn read_record(&self, cid: &Cid) -> Result<RepoRecord> { 125 + if let Some(bytes) = self.get_bytes(cid)? { 126 + cbor_to_lex_record(&bytes) 127 + } else { 128 + Err(MissingBlockError::new(*cid, "RepoRecord").into()) 129 + } 130 + } 131 + }