Alternative ATProto PDS implementation

prototype actor_store

+20
.sqlx/query-22c1e98ac038509ad16ce437e6670a59d3fc97a05ea8b0f1f80dba0157c53e13.json
··· 1 + { 2 + "db_name": "SQLite", 3 + "query": "SELECT name FROM actor_migration", 4 + "describe": { 5 + "columns": [ 6 + { 7 + "name": "name", 8 + "ordinal": 0, 9 + "type_info": "Text" 10 + } 11 + ], 12 + "parameters": { 13 + "Right": 0 14 + }, 15 + "nullable": [ 16 + false 17 + ] 18 + }, 19 + "hash": "22c1e98ac038509ad16ce437e6670a59d3fc97a05ea8b0f1f80dba0157c53e13" 20 + }
+12
.sqlx/query-5ea8376fbbe3077b2fc62187cc29a2d03eda91fa468c7fe63306f04e160ecb5d.json
··· 1 + { 2 + "db_name": "SQLite", 3 + "query": "INSERT INTO actor_migration (name, appliedAt) VALUES (?, ?)", 4 + "describe": { 5 + "columns": [], 6 + "parameters": { 7 + "Right": 2 8 + }, 9 + "nullable": [] 10 + }, 11 + "hash": "5ea8376fbbe3077b2fc62187cc29a2d03eda91fa468c7fe63306f04e160ecb5d" 12 + }
+4 -2
src/actor_store/blob/reader.rs
··· 7 7 use atrium_repo::Cid; 8 8 use sqlx::{Row, SqlitePool}; 9 9 10 + use crate::repo::types::{BlobStore, BlobStoreTrait}; 11 + 10 12 /// Reader for blob data in the actor store. 11 13 pub(crate) struct BlobReader { 12 14 /// Database connection. ··· 47 49 /// Get a blob's full data and metadata. 48 50 pub(crate) async fn get_blob(&self, cid: &Cid) -> Result<Option<BlobData>> { 49 51 let metadata = self.get_blob_metadata(cid).await?; 50 - let blob_stream = self.blobstore.get_stream(cid).await?; 52 + let blob_stream = self.blobstore.get_stream(*cid)?; 51 53 if blob_stream.is_none() { 52 54 return Err(anyhow::anyhow!("Blob not found")); // InvalidRequestError('Blob not found') 53 55 } ··· 240 242 pub size: u64, 241 243 /// The MIME type of the blob. 242 244 pub mime_type: Option<String>, 243 - pub stream: BlobStream, 245 + pub stream: BlobStream, // stream.Readable, 244 246 } 245 247 246 248 /// Options for listing blobs.
+6 -5
src/actor_store/blob/transactor.rs
··· 13 13 use uuid::Uuid; 14 14 15 15 use super::BlobReader; 16 - use crate::repo::types::{PreparedBlobRef, PreparedWrite, WriteOpAction}; 16 + use crate::repo::types::{BlobStore, PreparedBlobRef, PreparedWrite, WriteOpAction}; 17 17 18 18 /// Blob metadata for a newly uploaded blob. 19 19 #[derive(Debug, Clone)] ··· 163 163 164 164 /// Process blobs for a repository write operation. 165 165 pub(crate) async fn process_write_blobs(&self, writes: Vec<PreparedWrite>) -> Result<()> { 166 - self.delete_dereferenced_blobs(writes) 166 + self.delete_dereferenced_blobs(writes, false) 167 167 .await 168 168 .context("failed to delete dereferenced blobs")?; 169 169 for write in writes { 170 - if write.action == WriteOpAction::Create || write.action == WriteOpAction::Update { 171 - for blob in &write.blobs { 170 + if write.action() == &WriteOpAction::Create || write.action() == &WriteOpAction::Update 171 + { 172 + for blob in &write.blobs() { 172 173 self.verify_blob_and_make_permanent(blob) 173 174 .await 174 175 .context("failed to verify and make blob permanent")?; 175 - self.associate_blob(&blob.r#ref.0.to_string(), &write.uri) 176 + self.associate_blob(&blob.r#ref.0.to_string(), &write.uri()) 176 177 .await 177 178 .context("failed to associate blob with record")?; 178 179 }
+6 -5
src/actor_store/db/migrations.rs
··· 1 1 //! Database migrations for the actor store. 2 2 use anyhow::{Context as _, Result}; 3 - use sqlx::SqlitePool; 3 + use sqlx::{Executor, SqlitePool}; 4 4 5 5 use super::ActorDb; 6 6 ··· 25 25 /// Run all migrations 26 26 pub(crate) async fn migrate_to_latest(&self) -> Result<()> { 27 27 let past_migrations = sqlx::query!("SELECT name FROM actor_migration") 28 - .fetch_all(&self.db.db) 28 + .fetch_all(&self.db.pool) 29 29 .await?; 30 30 let mut past_migration_names = past_migrations 31 31 .iter() ··· 36 36 let name = format!("{:p}", migration); 37 37 if !past_migration_names.contains(&name) { 38 38 migration(&self.db)?; 39 + let now = chrono::Utc::now().to_rfc3339(); 39 40 sqlx::query!( 40 41 "INSERT INTO actor_migration (name, appliedAt) VALUES (?, ?)", 41 42 name, 42 - chrono::Utc::now().to_rfc3339() 43 + now, 43 44 ) 44 - .execute(&self.db.db) 45 + .execute(&self.db.pool) 45 46 .await 46 47 .context("failed to insert migration record")?; 47 48 } ··· 61 62 fn _001_init(db: &ActorDb) -> Result<()> { 62 63 tokio::task::block_in_place(|| { 63 64 tokio::runtime::Handle::current() 64 - .block_on(create_tables(&db.db)) 65 + .block_on(create_tables(&db.pool)) 65 66 .context("failed to create initial tables") 66 67 }) 67 68 }
+2 -2
src/actor_store/mod.rs
··· 5 5 mod actor_store_transactor; 6 6 mod actor_store_writer; 7 7 mod blob; 8 - // mod db; 8 + mod db; 9 9 mod preference; 10 10 mod record; 11 11 mod repo; ··· 15 15 // pub(crate) use actor_store_reader::ActorStoreReader; 16 16 pub(crate) use actor_store_transactor::ActorStoreTransactor; 17 17 pub(crate) use actor_store_writer::ActorStoreWriter; 18 - // pub(crate) use db::ActorDb; 18 + pub(crate) use db::ActorDb; 19 19 pub(crate) use resources::ActorStoreResources;
+4 -4
src/actor_store/preference/mod.rs
··· 1 1 //! Preference handling for actor store. 2 2 3 - // mod reader; 4 - // mod transactor; 3 + mod reader; 4 + mod transactor; 5 5 6 - // pub(crate) use reader::PreferenceReader; 7 - // pub(crate) use transactor::PreferenceTransactor; 6 + pub(crate) use reader::PreferenceReader; 7 + pub(crate) use transactor::PreferenceTransactor;
+1 -1
src/actor_store/preference/reader.rs
··· 32 32 scope: &str, 33 33 ) -> Result<Vec<AccountPreference>> { 34 34 let prefs_res = sqlx::query!("SELECT * FROM account_pref ORDER BY id") 35 - .fetch_all(&self.db.db) 35 + .fetch_all(&self.db.pool) 36 36 .await?; 37 37 38 38 let prefs = prefs_res
+1 -1
src/actor_store/preference/transactor.rs
··· 48 48 let mut tx = self 49 49 .reader 50 50 .db 51 - .db 51 + .pool 52 52 .begin() 53 53 .await 54 54 .context("failed to begin transaction")?;
+4 -4
src/actor_store/record/mod.rs
··· 1 1 //! Record storage and retrieval for the actor store. 2 2 3 - // mod reader; 4 - // mod transactor; 3 + mod reader; 4 + mod transactor; 5 5 6 - // pub(crate) use reader::RecordReader; 7 - // pub(crate) use transactor::RecordTransactor; 6 + pub(crate) use reader::RecordReader; 7 + pub(crate) use transactor::RecordTransactor;
+19 -24
src/actor_store/record/reader.rs
··· 3 3 use anyhow::{Context as _, Result}; 4 4 use atrium_repo::Cid; 5 5 use rsky_syntax::aturi::AtUri; 6 - use sqlx::{Row, SqlitePool}; 6 + use sqlx::Row; 7 7 use std::str::FromStr; 8 8 9 - use crate::actor_store::db::schema::Backlink; 9 + use crate::actor_store::{ActorDb, db::schema::Backlink}; 10 10 11 11 /// Reader for record data. 12 12 pub(crate) struct RecordReader { 13 13 /// Database connection. 14 - pub db: SqlitePool, 15 - /// DID of the repository owner. 16 - pub did: String, 14 + pub db: ActorDb, 17 15 } 18 16 19 17 /// Record descriptor containing URI, path, and CID. ··· 62 60 63 61 impl RecordReader { 64 62 /// Create a new record reader. 65 - pub(crate) fn new(db: SqlitePool, did: String) -> Self { 66 - Self { db, did } 63 + pub(crate) fn new(db: ActorDb) -> Self { 64 + Self { db } 67 65 } 68 66 69 67 /// Count the total number of records. 70 68 pub(crate) async fn record_count(&self) -> Result<i64> { 71 69 let result = sqlx::query!(r#"SELECT COUNT(*) as count FROM record"#) 72 - .fetch_one(&self.db) 70 + .fetch_one(&self.db.pool) 73 71 .await 74 72 .context("failed to count records")?; 75 73 ··· 86 84 "SELECT uri, cid FROM record WHERE uri > ? ORDER BY uri ASC LIMIT 1000", 87 85 current_cursor 88 86 ) 89 - .fetch_all(&self.db) 87 + .fetch_all(&self.db.pool) 90 88 .await 91 89 .context("failed to fetch records")?; 92 90 ··· 116 114 /// List all collections in the repository. 117 115 pub(crate) async fn list_collections(&self) -> Result<Vec<String>> { 118 116 let rows = sqlx::query!("SELECT collection FROM record GROUP BY collection") 119 - .fetch_all(&self.db) 117 + .fetch_all(&self.db.pool) 120 118 .await 121 119 .context("failed to list collections")?; 122 120 ··· 173 171 174 172 let rows = query 175 173 .build() 176 - .fetch_all(&self.db) 174 + .fetch_all(&self.db.pool) 177 175 .await 178 176 .context("failed to list records")?; 179 177 ··· 227 225 228 226 let row = query 229 227 .build() 230 - .fetch_optional(&self.db) 228 + .fetch_optional(&self.db.pool) 231 229 .await 232 230 .context("failed to fetch record")?; 233 231 ··· 276 274 277 275 let result = query 278 276 .build() 279 - .fetch_optional(&self.db) 277 + .fetch_optional(&self.db.pool) 280 278 .await 281 279 .context("failed to check record existence")?; 282 280 ··· 286 284 /// Get the takedown status of a record. 287 285 pub(crate) async fn get_record_takedown_status(&self, uri: &str) -> Result<Option<StatusAttr>> { 288 286 let result = sqlx::query!("SELECT takedownRef FROM record WHERE uri = ?", uri) 289 - .fetch_optional(&self.db) 287 + .fetch_optional(&self.db.pool) 290 288 .await 291 289 .context("failed to fetch takedown status")?; 292 290 ··· 311 309 /// Get the current CID for a record URI. 312 310 pub(crate) async fn get_current_record_cid(&self, uri: &str) -> Result<Option<Cid>> { 313 311 let result = sqlx::query!("SELECT cid FROM record WHERE uri = ?", uri) 314 - .fetch_optional(&self.db) 312 + .fetch_optional(&self.db.pool) 315 313 .await 316 314 .context("failed to fetch record CID")?; 317 315 ··· 344 342 link_to, 345 343 collection 346 344 ) 347 - .fetch_all(&self.db) 345 + .fetch_all(&self.db.pool) 348 346 .await 349 347 .context("failed to fetch record backlinks")?; 350 348 ··· 358 356 repo_rev: Some(row.repoRev), 359 357 indexed_at: row.indexedAt, 360 358 takedown_ref: row.takedownRef, 361 - did: self.did.clone(), 362 359 }); 363 360 } 364 361 ··· 392 389 backlink.link_to, 393 390 uri_collection 394 391 ) 395 - .fetch_all(&self.db) 392 + .fetch_all(&self.db.pool) 396 393 .await 397 394 .context("failed to fetch backlink conflicts")?; 398 395 ··· 414 411 "SELECT cid FROM repo_block WHERE cid > ? ORDER BY cid ASC LIMIT 1000", 415 412 current_cursor 416 413 ) 417 - .fetch_all(&self.db) 414 + .fetch_all(&self.db.pool) 418 415 .await 419 416 .context("failed to fetch blocks")?; 420 417 ··· 445 442 LIMIT 1 446 443 "# 447 444 ) 448 - .fetch_optional(&self.db) 445 + .fetch_optional(&self.db.pool) 449 446 .await 450 447 .context("failed to fetch profile record")?; 451 448 ··· 468 465 r#"SELECT repoRev FROM record WHERE repoRev <= ? LIMIT 1"#, 469 466 rev 470 467 ) 471 - .fetch_optional(&self.db) 468 + .fetch_optional(&self.db.pool) 472 469 .await 473 470 .context("failed to check revision existence")?; 474 471 ··· 488 485 "#, 489 486 rev 490 487 ) 491 - .fetch_all(&self.db) 488 + .fetch_all(&self.db.pool) 492 489 .await 493 490 .context("failed to fetch records since revision")?; 494 491 ··· 527 524 pub indexed_at: String, 528 525 /// Reference for takedown, if any. 529 526 pub takedown_ref: Option<String>, 530 - /// DID of the repository owner. 531 - pub did: String, 532 527 } 533 528 534 529 /// Status attribute for takedowns
+10 -9
src/actor_store/record/transactor.rs
··· 4 4 use atrium_repo::Cid; 5 5 use rsky_repo::types::WriteOpAction; 6 6 use rsky_syntax::aturi::AtUri; 7 - use sqlx::SqlitePool; 8 7 8 + use crate::actor_store::ActorDb; 9 9 use crate::actor_store::db::schema::Backlink; 10 10 use crate::actor_store::record::reader::{RecordReader, StatusAttr, get_backlinks}; 11 + use crate::repo::types::BlobStore as BlobStore; 11 12 12 13 /// Transaction handler for record operations. 13 14 pub(crate) struct RecordTransactor { 14 15 /// The record reader. 15 16 pub reader: RecordReader, 16 17 /// The blob store. 17 - pub blobstore: SqlitePool, // This will be replaced with proper BlobStore type 18 + pub blobstore: BlobStore, 18 19 } 19 20 20 21 impl RecordTransactor { 21 22 /// Create a new record transactor. 22 - pub(crate) fn new(db: SqlitePool, blobstore: SqlitePool, did: String) -> Self { 23 + pub(crate) fn new(db: ActorDb, blobstore: BlobStore) -> Self { 23 24 Self { 24 - reader: RecordReader::new(db, did), 25 + reader: RecordReader::new(db), 25 26 blobstore, 26 27 } 27 28 } ··· 79 80 repo_rev, 80 81 now 81 82 ) 82 - .execute(&self.reader.db) 83 + .execute(&self.reader.db.pool) 83 84 .await 84 85 .context("failed to index record")?; 85 86 ··· 107 108 tracing::debug!("Deleting indexed record {}", uri_str); 108 109 109 110 // Delete the record and its backlinks in a transaction 110 - let mut tx = self.reader.db.begin().await?; 111 + let mut tx = self.reader.db.pool.begin().await?; 111 112 112 113 // Delete from record table 113 114 sqlx::query!("DELETE FROM record WHERE uri = ?", uri_str) ··· 130 131 /// Remove backlinks for a URI. 131 132 pub(crate) async fn remove_backlinks_by_uri(&self, uri: &str) -> Result<()> { 132 133 sqlx::query!("DELETE FROM backlink WHERE uri = ?", uri) 133 - .execute(&self.reader.db) 134 + .execute(&self.reader.db.pool) 134 135 .await 135 136 .context("failed to remove backlinks")?; 136 137 ··· 165 166 166 167 query 167 168 .build() 168 - .execute(&self.reader.db) 169 + .execute(&self.reader.db.pool) 169 170 .await 170 171 .context("failed to add backlinks")?; 171 172 ··· 192 193 takedown_ref, 193 194 uri_str 194 195 ) 195 - .execute(&self.reader.db) 196 + .execute(&self.reader.db.pool) 196 197 .await 197 198 .context("failed to update record takedown status")?; 198 199
+7 -7
src/db/db.rs
··· 20 20 #[derive(Clone)] 21 21 pub(crate) struct Database { 22 22 /// SQLite connection pool. 23 - pub(crate) db: Arc<SqlitePool>, 23 + pub(crate) pool: SqlitePool, 24 24 /// Flag indicating if the database is destroyed. 25 - destroyed: Arc<Mutex<bool>>, 25 + pub(crate) destroyed: Arc<Mutex<bool>>, 26 26 /// Queue of commit hooks. 27 - commit_hooks: Arc<AsyncMutex<VecDeque<Box<dyn FnOnce() + Send>>>>, 27 + pub(crate) commit_hooks: Arc<AsyncMutex<VecDeque<Box<dyn FnOnce() + Send>>>>, 28 28 } 29 29 30 30 impl Database { ··· 39 39 40 40 let pool = SqlitePool::connect_with(options).await?; 41 41 Ok(Self { 42 - db: Arc::new(pool), 42 + pool, 43 43 destroyed: Arc::new(Mutex::new(false)), 44 44 commit_hooks: Arc::new(AsyncMutex::new(VecDeque::new())), 45 45 }) ··· 47 47 48 48 /// Ensures the database is using Write-Ahead Logging (WAL) mode. 49 49 pub async fn ensure_wal(&self) -> sqlx::Result<()> { 50 - let mut conn = self.db.acquire().await?; 50 + let mut conn = self.pool.acquire().await?; 51 51 sqlx::query("PRAGMA journal_mode = WAL") 52 52 .execute(&mut *conn) 53 53 .await?; ··· 59 59 where 60 60 F: FnOnce(&mut Transaction<'_, Sqlite>) -> sqlx::Result<T>, 61 61 { 62 - let mut tx = self.db.begin().await?; 62 + let mut tx = self.pool.begin().await?; 63 63 let result = func(&mut tx)?; 64 64 tx.commit().await?; 65 65 self.run_commit_hooks().await; ··· 98 98 return Ok(()); 99 99 } 100 100 *destroyed = true; 101 - drop(self.db.clone()); // Drop the pool to close connections. 101 + drop(self.pool.clone()); // Drop the pool to close connections. 102 102 Ok(()) 103 103 } 104 104
+69
src/repo/types.rs
··· 1070 1070 pub rkey: String, 1071 1071 pub cid: Option<Cid>, 1072 1072 } 1073 + 1074 + pub(crate) struct BlobStore; 1075 + pub(crate) trait BlobStoreTrait { 1076 + fn put_temp(&self, bytes: &[u8]) -> Result<String>; // bytes: Uint8Array | stream.Readable 1077 + fn make_permanent(&self, key: &str, cid: Cid) -> Result<()>; 1078 + fn put_permanent(&self, cid: Cid, bytes: &[u8]) -> Result<()>; 1079 + fn quarantine(&self, cid: Cid) -> Result<()>; 1080 + fn unquarantine(&self, cid: Cid) -> Result<()>; 1081 + fn get_bytes(&self, cid: Cid) -> Result<Vec<u8>>; 1082 + fn get_stream(&self, cid: Cid) -> Result<Vec<u8>>; // Promise<stream.Readable> 1083 + fn has_temp(&self, key: &str) -> Result<bool>; 1084 + fn has_stored(&self, cid: Cid) -> Result<bool>; 1085 + fn delete(&self, cid: Cid) -> Result<()>; 1086 + fn delete_many(&self, cids: Vec<Cid>) -> Result<()>; 1087 + } 1088 + impl BlobStoreTrait for BlobStore { 1089 + fn put_temp(&self, bytes: &[u8]) -> Result<String> { 1090 + // Implementation here 1091 + Ok("temp_key".to_string()) 1092 + } 1093 + 1094 + fn make_permanent(&self, key: &str, cid: Cid) -> Result<()> { 1095 + // Implementation here 1096 + Ok(()) 1097 + } 1098 + 1099 + fn put_permanent(&self, cid: Cid, bytes: &[u8]) -> Result<()> { 1100 + // Implementation here 1101 + Ok(()) 1102 + } 1103 + 1104 + fn quarantine(&self, cid: Cid) -> Result<()> { 1105 + // Implementation here 1106 + Ok(()) 1107 + } 1108 + 1109 + fn unquarantine(&self, cid: Cid) -> Result<()> { 1110 + // Implementation here 1111 + Ok(()) 1112 + } 1113 + 1114 + fn get_bytes(&self, cid: Cid) -> Result<Vec<u8>> { 1115 + // Implementation here 1116 + Ok(vec![]) 1117 + } 1118 + 1119 + fn get_stream(&self, cid: Cid) -> Result<Vec<u8>> { 1120 + // Implementation here 1121 + Ok(vec![]) 1122 + } 1123 + 1124 + fn has_temp(&self, key: &str) -> Result<bool> { 1125 + // Implementation here 1126 + Ok(true) 1127 + } 1128 + 1129 + fn has_stored(&self, cid: Cid) -> Result<bool> { 1130 + // Implementation here 1131 + Ok(true) 1132 + } 1133 + fn delete(&self, cid: Cid) -> Result<()> { 1134 + // Implementation here 1135 + Ok(()) 1136 + } 1137 + fn delete_many(&self, cids: Vec<Cid>) -> Result<()> { 1138 + // Implementation here 1139 + Ok(()) 1140 + } 1141 + }