Alternative ATProto PDS implementation

prototype actor_store; merge handlers
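
This collapses ActorStoreReader, ActorStoreTransactor, and ActorStoreWriter into a single ActorStoreHandler, so read, transact, and write_no_transaction now all take the same closure type. A rough sketch of the resulting call sites, using the signatures from this diff (store and did are illustrative placeholders for an ActorStore and an actor DID):

    // Read-only: the handler carries no signing key or background queue.
    let actor_did = store
        .read(did, |handler| Ok(handler.did.clone()))
        .await?;

    // Read-write: the handler is built with the keypair and background queue.
    store
        .transact(did, |handler| {
            debug_assert!(handler.signing_key.is_some());
            Ok(())
        })
        .await?;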

+295 -161
src/actor_store/actor_store.rs
···
  use std::path::PathBuf;
+ use std::str::FromStr;
  use std::sync::Arc;

- use anyhow::Result;
+ use anyhow::{Context as _, Result, anyhow, bail};
+ use atrium_crypto::keypair::{Did as _, Export as _, Secp256k1Keypair};
+ use atrium_repo::Cid;
+ use diesel::prelude::*;
+ use sha2::Digest as _;
+ use tokio::fs;

- use super::ActorDb;
- use super::actor_store_reader::ActorStoreReader;
+ use super::PreparedWrite;
+ use super::actor_store_handler::ActorStoreHandler;
  use super::actor_store_resources::ActorStoreResources;
- use super::actor_store_transactor::ActorStoreTransactor;
- use super::actor_store_writer::ActorStoreWriter;
+ use super::blob::{BlobStore as _, BlobStorePlaceholder};
+ use super::db::{ActorDb, get_db};
  use crate::SigningKey;

+ /// Central manager for actor stores
  pub(crate) struct ActorStore {
-     pub(crate) directory: PathBuf,
-     reserved_key_dir: PathBuf,
-     resources: ActorStoreResources,
+     /// Base directory for actor data
+     pub directory: PathBuf,
+     /// Resources shared between actor stores
+     pub resources: ActorStoreResources,
  }

  struct ActorLocation {
+     /// Actor's directory path
      directory: PathBuf,
+     /// Database path
      db_location: PathBuf,
+     /// Key path
      key_location: PathBuf,
  }

  impl ActorStore {
+     /// Create a new actor store manager
      pub(crate) fn new(directory: impl Into<PathBuf>, resources: ActorStoreResources) -> Self {
-         let directory = directory.into();
-         let reserved_key_dir = directory.join("reserved_keys");
          Self {
-             directory,
-             reserved_key_dir,
+             directory: directory.into(),
              resources,
          }
      }

+     /// Get the location information for an actor
      pub(crate) async fn get_location(&self, did: &str) -> Result<ActorLocation> {
-         // const didHash = await crypto.sha256Hex(did)
-         // const directory = path.join(this.cfg.directory, didHash.slice(0, 2), did)
-         // const dbLocation = path.join(directory, `store.sqlite`)
-         // const keyLocation = path.join(directory, `key`)
-         // return { directory, dbLocation, keyLocation }
-         todo!()
+         // Hash the DID for directory organization
+         let did_hash = sha2::Sha256::digest(did.as_bytes());
+         let hash_prefix = format!("{:02x}", did_hash[0]);
+
+         // Create paths
+         let directory = self.directory.join(hash_prefix).join(did);
+         let db_location = directory.join("store.sqlite");
+         let key_location = directory.join("key");
+
+         Ok(ActorLocation {
+             directory,
+             db_location,
+             key_location,
+         })
      }

+     /// Check if an actor store exists
      pub(crate) async fn exists(&self, did: &str) -> Result<bool> {
-         // const location = await this.getLocation(did)
-         // return await fileExists(location.dbLocation)
-         todo!()
+         let location = self.get_location(did).await?;
+         Ok(location.db_location.exists())
      }

+     /// Get the signing keypair for an actor
      pub(crate) async fn keypair(&self, did: &str) -> Result<Arc<SigningKey>> {
-         // const { keyLocation } = await this.getLocation(did)
-         // const privKey = await fs.readFile(keyLocation)
-         // return crypto.Secp256k1Keypair.import(privKey)
-         todo!()
+         let location = self.get_location(did).await?;
+         let priv_key = fs::read(&location.key_location)
+             .await
+             .context("Failed to read key file")?;
+
+         let keypair = SigningKey::import(&priv_key).context("Failed to import signing key")?;
+
+         Ok(Arc::new(keypair))
      }

+     /// Open the database for an actor
      pub(crate) async fn open_db(&self, did: &str) -> Result<ActorDb> {
-         // const { dbLocation } = await this.getLocation(did)
-         // const exists = await fileExists(dbLocation)
-         // if (!exists) {
-         //   throw new InvalidRequestError('Repo not found', 'NotFound')
-         // }
+         let location = self.get_location(did).await?;
+
+         if !location.db_location.exists() {
+             bail!("Repo not found");
+         }
+
+         // Convert path to string for SQLite connection
+         let db_path = location
+             .db_location
+             .to_str()
+             .ok_or_else(|| anyhow!("Invalid path encoding"))?;

-         // const db = getDb(dbLocation, this.cfg.disableWalAutoCheckpoint)
+         // Open database with WAL mode enabled
+         let db = get_db(db_path, false)
+             .await
+             .context("Failed to open actor database")?;

-         // // run a simple select with retry logic to ensure the db is ready (not in wal recovery mode)
-         // try {
-         //   await retrySqlite(() =>
-         //     db.db.selectFrom('repo_root').selectAll().execute(),
-         //   )
-         // } catch (err) {
-         //   db.close()
-         //   throw err
-         // }
+         // Run a simple query to ensure the database is ready
+         db.run(|conn| diesel::sql_query("SELECT 1 FROM repo_root LIMIT 1").execute(conn))
+             .await
+             .context("Database not ready")?;

-         // return db
-         todo!()
+         Ok(db)
      }

+     /// Execute read operations on an actor store
      pub(crate) async fn read<T, F>(&self, did: &str, f: F) -> Result<T>
      where
-         F: FnOnce(ActorStoreReader) -> Result<T>,
+         F: FnOnce(ActorStoreHandler) -> Result<T>,
      {
-         // const db = await this.openDb(did)
-         // try {
-         //   const getKeypair = () => this.keypair(did)
-         //   return await fn(new ActorStoreReader(did, db, this.resources, getKeypair))
-         // } finally {
-         //   db.close()
-         // }
-         todo!()
+         let db = self.open_db(did).await?;
+         let blobstore = self.resources.blobstore(did.to_string());
+
+         // Create a read-only handler
+         let handler = ActorStoreHandler::new_reader(db, did.to_string(), blobstore);
+
+         // Execute the function
+         f(handler)
      }

+     /// Execute read-write operations with a transaction
      pub(crate) async fn transact<T, F>(&self, did: &str, f: F) -> Result<T>
      where
-         F: FnOnce(ActorStoreTransactor) -> Result<T>,
+         F: FnOnce(ActorStoreHandler) -> Result<T>,
      {
-         // const keypair = await this.keypair(did)
-         // const db = await this.openDb(did)
-         // try {
-         //   return await db.transaction((dbTxn) => {
-         //     return fn(new ActorStoreTransactor(did, dbTxn, keypair, this.resources))
-         //   })
-         // } finally {
-         //   db.close()
-         // }
-         todo!()
+         let db = self.open_db(did).await?;
+         let keypair = self.keypair(did).await?;
+         let blobstore = self.resources.blobstore(did.to_string());
+         let background_queue = self.resources.background_queue();
+
+         // Create a read-write handler with transaction support
+         let handler = ActorStoreHandler::new_writer(
+             db,
+             did.to_string(),
+             blobstore,
+             keypair,
+             background_queue.as_ref().clone(),
+         );
+
+         // Execute the function (will handle transactions internally)
+         f(handler)
      }

+     /// Execute read-write operations without a transaction
      pub(crate) async fn write_no_transaction<T, F>(&self, did: &str, f: F) -> Result<T>
      where
-         F: FnOnce(ActorStoreWriter) -> Result<T>,
+         F: FnOnce(ActorStoreHandler) -> Result<T>,
      {
-         // const keypair = await this.keypair(did)
-         // const db = await this.openDb(did)
-         // try {
-         //   return await fn(new ActorStoreWriter(did, db, keypair, this.resources))
-         // } finally {
-         //   db.close()
-         // }
-         todo!()
+         let db = self.open_db(did).await?;
+         let keypair = self.keypair(did).await?;
+         let blobstore = self.resources.blobstore(did.to_string());
+         let background_queue = self.resources.background_queue();
+
+         // Create a read-write handler without automatic transaction
+         let handler = ActorStoreHandler::new_writer(
+             db,
+             did.to_string(),
+             blobstore,
+             keypair,
+             background_queue.as_ref().clone(),
+         );
+
+         // Execute the function
+         f(handler)
      }

+     /// Create a new actor store
      pub(crate) async fn create(&self, did: &str, keypair: SigningKey) -> Result<()> {
-         // const { directory, dbLocation, keyLocation } = await this.getLocation(did)
-         // // ensure subdir exists
-         // await mkdir(directory, { recursive: true })
-         // const exists = await fileExists(dbLocation)
-         // if (exists) {
-         //   throw new InvalidRequestError('Repo already exists', 'AlreadyExists')
-         // }
-         // const privKey = await keypair.export()
-         // await fs.writeFile(keyLocation, privKey)
+         let location = self.get_location(did).await?;
+
+         // Ensure directory exists
+         fs::create_dir_all(&location.directory)
+             .await
+             .context("Failed to create directory")?;
+
+         // Check if repo already exists
+         if location.db_location.exists() {
+             bail!("Repo already exists");
+         }
+
+         // Export and save private key
+         let priv_key = keypair.export();
+         fs::write(&location.key_location, priv_key)
+             .await
+             .context("Failed to write key file")?;

-         // const db: ActorDb = getDb(dbLocation, this.cfg.disableWalAutoCheckpoint)
-         // try {
-         //   await db.ensureWal()
-         //   const migrator = getMigrator(db)
-         //   await migrator.migrateToLatestOrThrow()
-         // } finally {
-         //   db.close()
-         // }
-         todo!()
+         // Initialize the database
+         let db_path = location
+             .db_location
+             .to_str()
+             .ok_or_else(|| anyhow!("Invalid path encoding"))?;
+
+         let db = get_db(db_path, false)
+             .await
+             .context("Failed to create actor database")?;
+
+         // Ensure WAL mode and run migrations
+         db.ensure_wal().await?;
+         db.run_migrations()?;
+
+         Ok(())
      }

+     /// Destroy an actor store
      pub(crate) async fn destroy(&self, did: &str) -> Result<()> {
-         // const blobstore = this.resources.blobstore(did)
-         // if (blobstore instanceof DiskBlobStore) {
-         //   await blobstore.deleteAll()
-         // } else {
-         //   const cids = await this.read(did, async (store) =>
-         //     store.repo.blob.getBlobCids(),
-         //   )
-         //   await Promise.allSettled(
-         //     chunkArray(cids, 500).map((chunk) => blobstore.deleteMany(chunk)),
-         //   )
-         // }
+         // Get all blob CIDs first; build a read handler directly, since the
+         // async blob lookup cannot run inside the sync closure taken by `read`
+         let db = self.open_db(did).await?;
+         let handler = ActorStoreHandler::new_reader(
+             db,
+             did.to_string(),
+             self.resources.blobstore(did.to_string()),
+         );
+         let cids = handler.repo.blob.get_blob_cids().await?;
+
+         // Delete all blobs
+         let blobstore = self.resources.blobstore(did.to_string());
+         if !cids.is_empty() {
+             // Process in chunks of 500
+             for chunk in cids.chunks(500) {
+                 let _ = blobstore.delete_many(chunk.to_vec()).await;
+             }
+         }
+
+         // Remove directory and all files
+         let location = self.get_location(did).await?;
+         if location.directory.exists() {
+             fs::remove_dir_all(&location.directory)
+                 .await
+                 .context("Failed to remove actor directory")?;
+         }

-         // const { directory } = await this.getLocation(did)
-         // await rmIfExists(directory, true)
-         todo!()
+         Ok(())
      }

-     // async reserveKeypair(did?: string): Promise<string> {
-     //   let keyLoc: string | undefined
-     //   if (did) {
-     //     assertSafePathPart(did)
-     //     keyLoc = path.join(this.reservedKeyDir, did)
-     //     const maybeKey = await loadKey(keyLoc)
-     //     if (maybeKey) {
-     //       return maybeKey.did()
-     //     }
-     //   }
-     //   const keypair = await crypto.Secp256k1Keypair.create({ exportable: true })
-     //   const keyDid = keypair.did()
-     //   keyLoc = keyLoc ?? path.join(this.reservedKeyDir, keyDid)
-     //   await mkdir(this.reservedKeyDir, { recursive: true })
-     //   await fs.writeFile(keyLoc, await keypair.export())
-     //   return keyDid
-     // }
+     /// Reserve a keypair for future use
+     pub(crate) async fn reserve_keypair(&self, did: Option<&str>) -> Result<String> {
+         let reserved_dir = self
+             .resources
+             .reserved_key_dir()
+             .ok_or_else(|| anyhow!("No reserved key directory configured"))?;
+
+         // If a DID is provided, check whether a key already exists for it
+         let mut key_path = None;
+         if let Some(did_str) = did {
+             assert_safe_path_part(did_str)?;
+             let path = reserved_dir.join(did_str);
+
+             if path.exists() {
+                 let key_data = fs::read(&path).await?;
+                 let keypair = Secp256k1Keypair::import(&key_data)
+                     .context("Failed to import existing reserved key")?;
+                 return Ok(keypair.did());
+             }
+             key_path = Some(path);
+         }
+
+         // Create a new keypair
+         let keypair = Secp256k1Keypair::create(&mut rand::thread_rng());
+         let key_did = keypair.did();
+
+         // Fall back to a path keyed by the key's own DID
+         let final_path = key_path.unwrap_or_else(|| reserved_dir.join(&key_did));
+
+         // Ensure directory exists
+         fs::create_dir_all(reserved_dir).await?;
+
+         // Save key
+         fs::write(&final_path, keypair.export()).await?;
+
+         Ok(key_did)
+     }

-     // async getReservedKeypair(
-     //   signingKeyOrDid: string,
-     // ): Promise<ExportableKeypair | undefined> {
-     //   return loadKey(path.join(this.reservedKeyDir, signingKeyOrDid))
-     // }
+     /// Get a reserved keypair
+     pub(crate) async fn get_reserved_keypair(
+         &self,
+         key_did: &str,
+     ) -> Result<Option<Arc<SigningKey>>> {
+         let reserved_dir = self
+             .resources
+             .reserved_key_dir()
+             .ok_or_else(|| anyhow!("No reserved key directory configured"))?;
+
+         let key_path = reserved_dir.join(key_did);
+         if !key_path.exists() {
+             return Ok(None);
+         }
+
+         let key_data = fs::read(key_path).await?;
+         let keypair = SigningKey::import(&key_data).context("Failed to import reserved key")?;
+
+         Ok(Some(Arc::new(keypair)))
+     }

-     // async clearReservedKeypair(keyDid: string, did?: string) {
-     //   await rmIfExists(path.join(this.reservedKeyDir, keyDid))
-     //   if (did) {
-     //     await rmIfExists(path.join(this.reservedKeyDir, did))
-     //   }
-     // }
+     /// Clear a reserved keypair
+     pub(crate) async fn clear_reserved_keypair(
+         &self,
+         key_did: &str,
+         did: Option<&str>,
+     ) -> Result<()> {
+         let reserved_dir = self
+             .resources
+             .reserved_key_dir()
+             .ok_or_else(|| anyhow!("No reserved key directory configured"))?;
+
+         // Remove key by key DID
+         let key_path = reserved_dir.join(key_did);
+         if key_path.exists() {
+             fs::remove_file(key_path).await?;
+         }
+
+         // If a DID mapping was provided, remove that too
+         if let Some(did_str) = did {
+             let did_path = reserved_dir.join(did_str);
+             if did_path.exists() {
+                 fs::remove_file(did_path).await?;
+             }
+         }
+
+         Ok(())
+     }

-     // async storePlcOp(did: string, op: Uint8Array) {
-     //   const { directory } = await this.getLocation(did)
-     //   const opLoc = path.join(directory, `did-op`)
-     //   await fs.writeFile(opLoc, op)
-     // }
+     /// Store a PLC operation
+     pub(crate) async fn store_plc_op(&self, did: &str, op: &[u8]) -> Result<()> {
+         let location = self.get_location(did).await?;
+         let op_path = location.directory.join("did-op");
+
+         fs::write(op_path, op).await?;
+         Ok(())
+     }

-     // async getPlcOp(did: string): Promise<Uint8Array> {
-     //   const { directory } = await this.getLocation(did)
-     //   const opLoc = path.join(directory, `did-op`)
-     //   return await fs.readFile(opLoc)
-     // }
+     /// Get a stored PLC operation
+     pub(crate) async fn get_plc_op(&self, did: &str) -> Result<Vec<u8>> {
+         let location = self.get_location(did).await?;
+         let op_path = location.directory.join("did-op");
+
+         let data = fs::read(op_path).await?;
+         Ok(data)
+     }

-     // async clearPlcOp(did: string) {
-     //   const { directory } = await this.getLocation(did)
-     //   const opLoc = path.join(directory, `did-op`)
-     //   await rmIfExists(opLoc)
-     // }
+     /// Clear a stored PLC operation
+     pub(crate) async fn clear_plc_op(&self, did: &str) -> Result<()> {
+         let location = self.get_location(did).await?;
+         let op_path = location.directory.join("did-op");
+
+         if op_path.exists() {
+             fs::remove_file(op_path).await?;
+         }
+
+         Ok(())
+     }
  }

- // const loadKey = async (loc: string): Promise<ExportableKeypair | undefined> => {
- //   const privKey = await readIfExists(loc)
- //   if (!privKey) return undefined
- //   return crypto.Secp256k1Keypair.import(privKey, { exportable: true })
- // }
-
- // function assertSafePathPart(part: string) {
- //   const normalized = path.normalize(part)
- //   assert(
- //     part === normalized &&
- //       !part.startsWith('.') &&
- //       !part.includes('/') &&
- //       !part.includes('\\'),
- //     `unsafe path part: ${part}`,
- //   )
- // }
+ /// Ensure a path part is safe to use in a filename
+ fn assert_safe_path_part(part: &str) -> Result<()> {
+     let normalized = std::path::Path::new(part)
+         .file_name()
+         .and_then(|s| s.to_str())
+         .ok_or_else(|| anyhow!("Invalid path"))?;
+
+     if part != normalized || part.starts_with('.') || part.contains('/') || part.contains('\\') {
+         bail!("Unsafe path part: {}", part);
+     }
+
+     Ok(())
+ }
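
For reference, the per-actor on-disk layout this file establishes (paths taken from get_location, store_plc_op, and reserve_keypair; the shard directory is the hex of the first sha256 byte of the DID):

    <directory>/<xx>/<did>/store.sqlite    actor database (WAL mode)
    <directory>/<xx>/<did>/key             exported secp256k1 signing key
    <directory>/<xx>/<did>/did-op          pending PLC operation, when present
    <reserved_key_dir>/<did or key DID>    reserved keypairs
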
+329
src/actor_store/actor_store_handler.rs
···
+ use std::path::PathBuf;
+ use std::sync::Arc;
+
+ use anyhow::{Context as _, Result, anyhow};
+ use futures::TryStreamExt;
+ use rsky_repo::repo::Repo;
+ use rsky_repo::types::{CommitData, CommitDataWithOps, PreparedWrite as RskyPreparedWrite};
+
+ use super::PreparedWrite;
+ use super::blob::{BackgroundQueue, BlobStorePlaceholder};
+ use super::db::ActorDb;
+ use super::preference::PreferenceHandler;
+ use super::record::RecordHandler;
+ use super::repo::RepoHandler;
+ use crate::SigningKey;
+
+ /// Unified handler for actor store operations.
+ pub(crate) struct ActorStoreHandler {
+     /// Actor DID
+     pub did: String,
+     /// Database connection
+     pub db: ActorDb,
+     /// Repository handler
+     pub repo: RepoHandler,
+     /// Record handler
+     pub record: RecordHandler,
+     /// Preference handler
+     pub pref: PreferenceHandler,
+     /// Background queue for async operations
+     pub background_queue: Option<BackgroundQueue>,
+     /// Signing keypair (required for write operations)
+     pub signing_key: Option<Arc<SigningKey>>,
+ }
+
+ impl ActorStoreHandler {
+     /// Create a new actor store handler with read-only capabilities
+     pub(crate) fn new_reader(db: ActorDb, did: String, blobstore: BlobStorePlaceholder) -> Self {
+         let record = RecordHandler::new(db.clone(), did.clone());
+         let pref = PreferenceHandler::new(db.clone(), did.clone());
+         let repo = RepoHandler::new_reader(db.clone(), blobstore, did.clone());
+
+         Self {
+             did,
+             db,
+             repo,
+             record,
+             pref,
+             background_queue: None,
+             signing_key: None,
+         }
+     }
+
+     /// Create a new actor store handler with read/write capabilities
+     pub(crate) fn new_writer(
+         db: ActorDb,
+         did: String,
+         blobstore: BlobStorePlaceholder,
+         signing_key: Arc<SigningKey>,
+         background_queue: BackgroundQueue,
+     ) -> Self {
+         let record = RecordHandler::new_with_blobstore(db.clone(), blobstore.clone(), did.clone());
+         let pref = PreferenceHandler::new(db.clone(), did.clone());
+         let repo = RepoHandler::new_writer(
+             db.clone(),
+             blobstore,
+             did.clone(),
+             signing_key.clone(),
+             background_queue.clone(),
+         );
+
+         Self {
+             did,
+             db,
+             repo,
+             record,
+             pref,
+             background_queue: Some(background_queue),
+             signing_key: Some(signing_key),
+         }
+     }
+
+     /// Set signing key (needed for write operations)
+     pub(crate) fn with_signing_key(mut self, signing_key: Arc<SigningKey>) -> Self {
+         self.signing_key = Some(signing_key);
+         self
+     }
+
+     /// Set background queue (needed for async operations)
+     pub(crate) fn with_background_queue(mut self, queue: BackgroundQueue) -> Self {
+         self.background_queue = Some(queue);
+         self
+     }
+
+     // Repository Operations
+     // ---------------------
+
+     /// Try to load the repository
+     pub(crate) async fn maybe_load_repo(&self) -> Result<Option<Repo>> {
+         self.repo.maybe_load_repo().await
+     }
+
+     /// Get the repository root CID
+     pub(crate) async fn get_repo_root(&self) -> Result<Option<atrium_repo::Cid>> {
+         self.repo.get_repo_root().await
+     }
+
+     /// Create a new repository with prepared writes
+     pub(crate) async fn create_repo(
+         &self,
+         writes: Vec<PreparedWrite>,
+     ) -> Result<CommitDataWithOps> {
+         if self.signing_key.is_none() {
+             return Err(anyhow!(
+                 "No signing key available for create_repo operation"
+             ));
+         }
+
+         let rsky_writes = writes
+             .into_iter()
+             .map(RskyPreparedWrite::from)
+             .collect::<Vec<_>>();
+
+         self.repo.create_repo(rsky_writes).await
+     }
+
+     /// Process writes to the repository
+     pub(crate) async fn process_writes(
+         &self,
+         writes: Vec<PreparedWrite>,
+         swap_commit_cid: Option<atrium_repo::Cid>,
+     ) -> Result<CommitDataWithOps> {
+         if self.signing_key.is_none() {
+             return Err(anyhow!(
+                 "No signing key available for process_writes operation"
+             ));
+         }
+
+         let rsky_writes = writes
+             .into_iter()
+             .map(RskyPreparedWrite::from)
+             .collect::<Vec<_>>();
+
+         self.repo.process_writes(rsky_writes, swap_commit_cid).await
+     }
+
+     /// Import a repository from external data
+     pub(crate) async fn process_import_repo(
+         &self,
+         commit: CommitData,
+         writes: Vec<PreparedWrite>,
+     ) -> Result<()> {
+         // Blob processing below relies on the background queue
+         if self.background_queue.is_none() {
+             return Err(anyhow!(
+                 "Background queue required for process_import_repo operation"
+             ));
+         }
+
+         let rsky_writes = writes
+             .into_iter()
+             .map(RskyPreparedWrite::from)
+             .collect::<Vec<_>>();
+
+         // First index the records
+         self.repo.index_writes(&rsky_writes, &commit.rev).await?;
+
+         // Then process the commit
+         self.repo.storage.apply_commit(commit.clone(), None).await?;
+
+         // Finally process any blobs
+         self.repo
+             .blob_transactor
+             .process_write_blobs(&commit.rev, rsky_writes)
+             .await?;
+
+         Ok(())
+     }
+
+     /// Get sync event data for replication
+     pub(crate) async fn get_sync_event_data(&self) -> Result<super::repo::SyncEventData> {
+         self.repo.get_sync_event_data().await
+     }
+
+     /// Delete the blobs associated with the repository
+     pub(crate) async fn destroy(&self) -> Result<()> {
+         // Get all blob CIDs
+         let blob_cids = self.repo.blob.get_blob_cids().await?;
+
+         // Delete all blobs
+         if !blob_cids.is_empty() {
+             self.repo
+                 .blob_transactor
+                 .blobstore
+                 .delete_many(blob_cids)
+                 .await?;
+         }
+
+         Ok(())
+     }
+
+     // Record Operations
+     // -----------------
+
+     /// Get a specific record
+     pub(crate) async fn get_record(
+         &self,
+         uri: &rsky_syntax::aturi::AtUri,
+         cid: Option<&str>,
+         include_soft_deleted: bool,
+     ) -> Result<Option<super::record::RecordData>> {
+         self.record.get_record(uri, cid, include_soft_deleted).await
+     }
+
+     /// List collections in the repository
+     pub(crate) async fn list_collections(&self) -> Result<Vec<String>> {
+         self.record.list_collections().await
+     }
+
+     /// List records in a collection
+     pub(crate) async fn list_records_for_collection(
+         &self,
+         opts: super::record::ListRecordsOptions,
+     ) -> Result<Vec<super::record::RecordData>> {
+         self.record.list_records_for_collection(opts).await
+     }
+
+     /// Get the record count
+     pub(crate) async fn record_count(&self) -> Result<i64> {
+         self.record.record_count().await
+     }
+
+     /// Update record takedown status
+     pub(crate) async fn update_record_takedown_status(
+         &self,
+         uri: &rsky_syntax::aturi::AtUri,
+         takedown: atrium_api::com::atproto::admin::defs::StatusAttr,
+     ) -> Result<()> {
+         self.record
+             .update_record_takedown_status(uri, takedown)
+             .await
+     }
+
+     // Preference Operations
+     // ---------------------
+
+     /// Get preferences for a namespace
+     pub(crate) async fn get_preferences(
+         &self,
+         namespace: Option<&str>,
+         scope: &str,
+     ) -> Result<Vec<super::preference::AccountPreference>> {
+         self.pref.get_preferences(namespace, scope).await
+     }
+
+     /// Put preferences for a namespace
+     pub(crate) async fn put_preferences(
+         &self,
+         values: Vec<super::preference::AccountPreference>,
+         namespace: &str,
+         scope: &str,
+     ) -> Result<()> {
+         self.pref.put_preferences(values, namespace, scope).await
+     }
+
+     // Blob Operations
+     // ---------------
+
+     /// Get blob metadata
+     pub(crate) async fn get_blob_metadata(
+         &self,
+         cid: &atrium_repo::Cid,
+     ) -> Result<super::blob::BlobMetadata> {
+         self.repo.blob.get_blob_metadata(cid).await
+     }
+
+     /// Get blob data
+     pub(crate) async fn get_blob(&self, cid: &atrium_repo::Cid) -> Result<super::blob::BlobData> {
+         self.repo.blob.get_blob(cid).await
+     }
+
+     /// Update blob takedown status
+     pub(crate) async fn update_blob_takedown_status(
+         &self,
+         cid: atrium_repo::Cid,
+         takedown: atrium_api::com::atproto::admin::defs::StatusAttr,
+     ) -> Result<()> {
+         self.repo
+             .blob
+             .update_blob_takedown_status(cid, takedown)
+             .await
+     }
+
+     /// Upload a blob and get its metadata
+     pub(crate) async fn upload_blob_and_get_metadata(
+         &self,
+         user_suggested_mime: &str,
+         blob_bytes: &[u8],
+     ) -> Result<super::blob::BlobMetadata> {
+         self.repo
+             .blob
+             .upload_blob_and_get_metadata(user_suggested_mime, blob_bytes)
+             .await
+     }
+
+     /// Count blobs
+     pub(crate) async fn blob_count(&self) -> Result<i64> {
+         self.repo.blob.blob_count().await
+     }
+
+     // Transaction Support
+     // -------------------
+
+     /// Execute a transaction
+     pub(crate) async fn transaction<T, F>(&self, f: F) -> Result<T>
+     where
+         F: FnOnce(&mut diesel::SqliteConnection) -> Result<T> + Send,
+         T: Send + 'static,
+     {
+         self.db.transaction(f).await
+     }
+
+     /// Execute a database operation with retries
+     pub(crate) async fn run<F, T>(&self, operation: F) -> Result<T>
+     where
+         F: FnOnce(&mut diesel::SqliteConnection) -> diesel::QueryResult<T> + Send,
+         T: Send + 'static,
+     {
+         self.db.run(operation).await
+     }
+ }
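
The with_signing_key and with_background_queue setters allow upgrading a handler built via new_reader. A minimal sketch, assuming db, did, blobstore, keypair, and queue are already in scope (e.g. pulled from ActorStoreResources):

    let handler = ActorStoreHandler::new_reader(db, did, blobstore)
        .with_signing_key(keypair)
        .with_background_queue(queue);
    // Caveat: this sets only the handler's own fields; the inner RepoHandler
    // was still constructed via new_reader, so paths that need a writing
    // RepoHandler should go through new_writer instead.
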
-91
src/actor_store/actor_store_reader.rs
···
- use anyhow::Result;
- use diesel::prelude::*;
- use std::sync::Arc;
-
- use super::{
-     ActorStoreTransactor, actor_store_resources::ActorStoreResources, db::ActorDb,
-     preference::PreferenceReader, record::RecordReader, repo::RepoReader,
- };
- use crate::SigningKey;
-
- /// Reader for the actor store.
- pub(crate) struct ActorStoreReader {
-     /// DID of the actor.
-     pub(crate) did: String,
-     /// Record reader.
-     pub(crate) record: RecordReader,
-     /// Preference reader.
-     pub(crate) pref: PreferenceReader,
-     /// RepoReader
-     pub(crate) repo: RepoReader,
-     /// Function to get keypair.
-     keypair_fn: Box<dyn Fn() -> Result<Arc<SigningKey>> + Send + Sync>,
-     /// Database connection.
-     pub(crate) db: ActorDb,
-     /// Actor store resources.
-     resources: ActorStoreResources,
- }
-
- impl ActorStoreReader {
-     /// Create a new actor store reader.
-     pub(crate) fn new(
-         did: String,
-         db: ActorDb,
-         resources: ActorStoreResources,
-         keypair: impl Fn() -> Result<Arc<SigningKey>> + Send + Sync + 'static,
-     ) -> Self {
-         // Create readers
-         let record = RecordReader::new(db.clone(), did.clone());
-         let pref = PreferenceReader::new(db.clone(), did.clone());
-
-         // Store keypair function for later use
-         let keypair_fn = Box::new(keypair);
-
-         // Initial keypair call as in TypeScript implementation
-         let _ = keypair_fn();
-
-         // Create repo reader
-         let repo = RepoReader::new(db.clone(), resources.blobstore(did.clone()), did.clone());
-
-         Self {
-             did,
-             repo,
-             record,
-             pref,
-             keypair_fn,
-             db,
-             resources,
-         }
-     }
-
-     /// Get the keypair for this actor.
-     pub(crate) fn keypair(&self) -> Result<Arc<SigningKey>> {
-         (self.keypair_fn)()
-     }
-
-     /// Execute a transaction with the actor store.
-     pub(crate) async fn transact<T, F>(&self, f: F) -> Result<T>
-     where
-         F: FnOnce(ActorStoreTransactor) -> Result<T> + Send,
-         T: Send + 'static,
-     {
-         let keypair = self.keypair()?;
-         let did = self.did.clone();
-         let resources = self.resources.clone();
-
-         self.db
-             .transaction(move |conn| {
-                 // Create a transactor with the transaction
-                 // We'll create a temporary ActorDb with the same pool
-                 let db = ActorDb {
-                     pool: self.db.pool.clone(),
-                 };
-
-                 let store = ActorStoreTransactor::new(did, db, keypair.clone(), resources);
-
-                 // Execute user function
-                 f(store)
-             })
-             .await
-     }
- }
+35 -10
src/actor_store/actor_store_resources.rs
···
+ use std::path::PathBuf;
+ use std::sync::Arc;
+
  use super::blob::{BackgroundQueue, BlobStorePlaceholder};
+
  pub(crate) struct ActorStoreResources {
-     pub(crate) blobstore: fn(did: String) -> BlobStorePlaceholder,
-     pub(crate) background_queue: BackgroundQueue,
-     pub(crate) reserved_key_dir: Option<String>,
+     /// Factory function to create blobstore instances
+     blobstore_factory: Arc<dyn Fn(String) -> BlobStorePlaceholder + Send + Sync>,
+     /// Shared background queue
+     background_queue: Arc<BackgroundQueue>,
+     /// Optional directory for reserved keys
+     reserved_key_dir: Option<PathBuf>,
  }
+
  impl ActorStoreResources {
+     /// Simple constructor with minimal parameters
      pub(crate) fn new(
-         blobstore: fn(did: String) -> BlobStorePlaceholder,
-         background_queue: BackgroundQueue,
-         reserved_key_dir: Option<String>,
+         blobstore_factory: impl Fn(String) -> BlobStorePlaceholder + Send + Sync + 'static,
+         concurrency: usize,
      ) -> Self {
          Self {
-             blobstore,
-             background_queue,
-             reserved_key_dir,
+             blobstore_factory: Arc::new(blobstore_factory),
+             background_queue: Arc::new(BackgroundQueue::new(concurrency)),
+             reserved_key_dir: None,
          }
      }

+     /// Set the reserved key directory
+     pub(crate) fn with_reserved_key_dir(mut self, dir: impl Into<PathBuf>) -> Self {
+         self.reserved_key_dir = Some(dir.into());
+         self
+     }
+
+     /// Get a blobstore for a DID
      pub(crate) fn blobstore(&self, did: String) -> BlobStorePlaceholder {
-         (self.blobstore)(did)
+         (self.blobstore_factory)(did)
+     }
+
+     /// Get the background queue
+     pub(crate) fn background_queue(&self) -> Arc<BackgroundQueue> {
+         self.background_queue.clone()
+     }
+
+     /// Get the reserved key directory
+     pub(crate) fn reserved_key_dir(&self) -> Option<&PathBuf> {
+         self.reserved_key_dir.as_ref()
      }
  }
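
Since the factory is now an Arc<dyn Fn>, call sites can pass a closure and chain the builder. A sketch of constructing the resources (the make_blobstore helper and the concurrency of 4 are illustrative assumptions, not part of this diff):

    let resources = ActorStoreResources::new(
        |did| make_blobstore(did), // hypothetical BlobStorePlaceholder factory
        4,                         // background queue concurrency
    )
    .with_reserved_key_dir("/var/pds/reserved_keys");
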
-48
src/actor_store/actor_store_transactor.rs
···
- use std::sync::Arc;
-
- use super::{
-     ActorDb, actor_store_resources::ActorStoreResources, preference::PreferenceTransactor,
-     record::RecordTransactor, repo::RepoTransactor,
- };
- use crate::SigningKey;
-
- /// Actor store transactor for managing actor-related transactions.
- pub(crate) struct ActorStoreTransactor {
-     /// Record transactor.
-     pub(crate) record: RecordTransactor,
-     /// Repo transactor.
-     pub(crate) repo: RepoTransactor,
-     /// Preference transactor.
-     pub(crate) pref: PreferenceTransactor,
- }
- impl ActorStoreTransactor {
-     /// Creates a new actor store transactor.
-     ///
-     /// # Arguments
-     ///
-     /// * `did` - The DID of the actor.
-     /// * `db` - The database connection.
-     /// * `keypair` - The signing keypair.
-     /// * `resources` - The actor store resources.
-     pub(crate) fn new(
-         did: String,
-         db: ActorDb,
-         keypair: Arc<SigningKey>,
-         resources: ActorStoreResources,
-     ) -> Self {
-         let blobstore = resources.blobstore(did.clone());
-
-         let record = RecordTransactor::new(db.clone(), blobstore.clone());
-         let pref = PreferenceTransactor::new(db.clone());
-         let repo = RepoTransactor::new(
-             db,
-             blobstore,
-             did,
-             keypair,
-             resources.background_queue,
-             None,
-         );
-
-         Self { record, repo, pref }
-     }
- }
-11
src/actor_store/actor_store_writer.rs
···
- use std::sync::Arc;
-
- use super::{ActorDb, ActorStoreResources};
- use crate::SigningKey;
-
- pub(crate) struct ActorStoreWriter {
-     pub(crate) did: String,
-     pub(crate) db: ActorDb,
-     pub(crate) keypair: Arc<SigningKey>,
-     pub(crate) resources: ActorStoreResources,
- }
+1 -11
src/actor_store/blob/mod.rs
···
  }

  impl BlobHandler {
-     /// Create a new blob handler for read operations
-     pub fn new_reader(db: ActorDb, blobstore: impl BlobStore + 'static, did: String) -> Self {
-         Self {
-             db,
-             did,
-             blobstore: Box::new(blobstore),
-             background_queue: None,
-         }
-     }
-
      /// Create a new blob handler with background queue for write operations
-     pub fn new_writer(
+     pub fn new(
          db: ActorDb,
          blobstore: impl BlobStore + 'static,
          background_queue: background::BackgroundQueue,
+2 -6
src/actor_store/mod.rs
···
  //! Actor store implementation for ATProto PDS.

  mod actor_store;
- mod actor_store_reader;
+ mod actor_store_handler;
  mod actor_store_resources;
- mod actor_store_transactor;
- mod actor_store_writer;
  mod blob;
  mod db;
  mod preference;
···
  mod sql_repo;

  pub(crate) use actor_store::ActorStore;
- pub(crate) use actor_store_reader::ActorStoreReader;
+ pub(crate) use actor_store_handler::ActorStoreHandler;
  pub(crate) use actor_store_resources::ActorStoreResources;
- pub(crate) use actor_store_transactor::ActorStoreTransactor;
- pub(crate) use actor_store_writer::ActorStoreWriter;
  pub(crate) use db::ActorDb;
  pub(crate) use prepared_write::PreparedWrite;
+8 -7
src/actor_store/repo.rs
···
      block_map::BlockMap,
      cid_set::CidSet,
      repo::Repo,
-     storage::{readable_blockstore::ReadableBlockstore as _, types::RepoStorage as _},
+     storage::{readable_blockstore::ReadableBlockstore as _, types::RepoStorage},
      types::{
          CommitAction, CommitData, CommitDataWithOps, CommitOp, PreparedBlobRef, PreparedWrite,
          WriteOpAction, write_to_op,
···
      util::format_data_key,
  };
  use rsky_syntax::aturi::AtUri;
+ use tokio::sync::RwLock;

  use super::{
      ActorDb,
-     blob::{BackgroundQueue, BlobReader, BlobStorePlaceholder, BlobTransactor},
+     blob::{BackgroundQueue, BlobHandler, BlobStorePlaceholder},
      record::RecordHandler,
  };
  use crate::SigningKey;
···
      /// Actor DID
      pub did: String,
      /// Backend storage
-     pub storage: SqlRepoStorage,
+     pub storage: Arc<RwLock<dyn RepoStorage>>,
-     /// BlobReader for handling blob operations
-     pub blob: BlobReader,
+     /// BlobHandler for handling blob reads
+     pub blob: BlobHandler,
      /// RecordHandler for handling record operations
      pub record: RecordHandler,
-     /// BlobTransactor for handling blob writes
-     pub blob_transactor: BlobTransactor,
+     /// BlobHandler for handling blob writes
+     pub blob_transactor: BlobHandler,
      /// RecordHandler for handling record writes
      pub record_transactor: RecordHandler,
      /// Signing keypair
···
      background_queue: BackgroundQueue,
  ) -> Self {
      // Create readers
-     let blob = BlobReader::new(db.clone(), blobstore.clone());
+     let blob = BlobHandler::new(db.clone(), blobstore.clone(), background_queue.clone());
      let record = RecordHandler::new(db.clone(), did.clone());

      // Create storage backend with current timestamp
···

      // Create transactors
      let blob_transactor =
-         BlobTransactor::new(db.clone(), blobstore.clone(), background_queue.clone());
+         BlobHandler::new(db.clone(), blobstore.clone(), background_queue.clone());
      let record_transactor = RecordHandler::new(db.clone(), blobstore);

      Self {
+20 -21
src/db/mod.rs
···
          .pool
          .get()
          .context("Failed to get connection for migrations")?;
+
      conn.run_pending_migrations(MIGRATIONS)
-         .context("Failed to run migrations")?;
+         .map_err(|e| anyhow::anyhow!("Failed to run migrations: {}", e))?;
+
      Ok(())
  }
···
          let mut conn = self.pool.get().context("Failed to get connection")?;
          match operation(&mut conn) {
              Ok(result) => return Ok(result),
-             Err(diesel::result::Error::DatabaseError(
-                 diesel::result::DatabaseErrorKind::DatabaseIsLocked,
-                 _,
-             )) => {
-                 retries += 1;
-                 let backoff_ms = 10 * (1 << retries); // Exponential backoff
-                 last_error = Some(diesel::result::Error::DatabaseError(
-                     diesel::result::DatabaseErrorKind::DatabaseIsLocked,
-                     Box::new("Database is locked".to_string()),
-                 ));
-                 tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
-             }
+             // TODO: Busy error handling
+             // Err(diesel::result::Error::DatabaseError(
+             //     diesel::result::DatabaseErrorKind::DatabaseIsLocked,
+             //     _,
+             // )) => {
+             //     retries += 1;
+             //     let backoff_ms = 10 * (1 << retries); // Exponential backoff
+             //     last_error = Some(diesel::result::Error::DatabaseError(
+             //         diesel::result::DatabaseErrorKind::DatabaseIsLocked,
+             //         Box::new("Database is locked".to_string()),
+             //     ));
+             //     tokio::time::sleep(Duration::from_millis(backoff_ms)).await;
+             // }
              Err(e) => return Err(e.into()),
          }
      }

      Err(anyhow::anyhow!(
          "Max retries exceeded: {}",
-         last_error.unwrap_or(diesel::result::Error::RollbackTransaction(Box::new(
-             "Unknown error"
-         )))
+         last_error.unwrap_or_else(|| diesel::result::Error::RollbackTransaction)
      ))
  }
···
      T: Send + 'static,
  {
      self.run(|conn| {
-         conn.transaction(|tx| {
-             f(tx).map_err(|e| diesel::result::Error::RollbackTransaction(Box::new(e)))
-         })
+         conn.transaction(|tx| f(tx).map_err(|_| diesel::result::Error::RollbackTransaction))
      })
      .await
  }
···
      F: FnOnce(&mut SqliteConnection) -> std::result::Result<T, diesel::result::Error> + Send,
      T: Send + 'static,
  {
-     let conn = &mut self
+     let mut conn = self
          .pool
          .get()
          .context("Failed to get connection for transaction")?;

-     conn.transaction(f)
+     conn.transaction(|tx| f(tx))
          .map_err(|e| anyhow::anyhow!("Transaction error: {:?}", e))
  }
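
One way to restore the commented-out busy handling without relying on a specific DatabaseErrorKind variant is to match on SQLite's error text inside the existing retry loop; a sketch under that assumption (whether diesel exposes a dedicated variant for SQLITE_BUSY depends on the diesel version in use):

    Err(e) if e.to_string().contains("database is locked") => {
        retries += 1;
        let backoff_ms = 10 * (1u64 << retries); // exponential backoff
        last_error = Some(e);
        tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
    }
    Err(e) => return Err(e.into()),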