Alternative ATProto PDS implementation

implement actor_store

Changed files
+149 -32
src
+44 -3
src/actor_store/blob.rs
··· 43 43 /// DID of the actor 44 44 pub did: String, 45 45 /// Database connection 46 - pub db: deadpool_diesel::Connection<SqliteConnection>, 46 + pub db: deadpool_diesel::Pool< 47 + deadpool_diesel::Manager<SqliteConnection>, 48 + deadpool_diesel::sqlite::Object, 49 + >, 47 50 } 48 51 49 52 impl BlobReader { 50 53 /// Create a new blob reader 51 - pub fn new(blobstore: BlobStoreSql, db: deadpool_diesel::Connection<SqliteConnection>) -> Self { 54 + pub fn new( 55 + blobstore: BlobStoreSql, 56 + db: deadpool_diesel::Pool< 57 + deadpool_diesel::Manager<SqliteConnection>, 58 + deadpool_diesel::sqlite::Object, 59 + >, 60 + ) -> Self { 52 61 BlobReader { 53 62 did: blobstore.did.clone(), 54 63 blobstore, ··· 63 72 let did = self.did.clone(); 64 73 let found = self 65 74 .db 75 + .get() 76 + .await? 66 77 .interact(move |conn| { 67 78 BlobSchema::blob 68 79 .filter(BlobSchema::did.eq(did)) ··· 106 117 let did = self.did.clone(); 107 118 let res = self 108 119 .db 120 + .get() 121 + .await? 109 122 .interact(move |conn| { 110 123 let results = RecordBlobSchema::record_blob 111 124 .filter(RecordBlobSchema::blobCid.eq(cid.to_string())) ··· 163 176 use rsky_pds::schema::pds::blob::dsl as BlobSchema; 164 177 165 178 let did = self.did.clone(); 166 - self.db.interact(move |conn| { 179 + self.db.get().await?.interact(move |conn| { 167 180 let BlobMetadata { 168 181 temp_key, 169 182 size, ··· 265 278 let uris_clone = uris.clone(); 266 279 let deleted_repo_blobs: Vec<models::RecordBlob> = self 267 280 .db 281 + .get() 282 + .await? 268 283 .interact(move |conn| { 269 284 RecordBlobSchema::record_blob 270 285 .filter(RecordBlobSchema::recordUri.eq_any(&uris_clone)) ··· 281 296 // Now perform the delete 282 297 let uris_clone = uris.clone(); 283 298 self.db 299 + .get() 300 + .await? 284 301 .interact(move |conn| { 285 302 delete(RecordBlobSchema::record_blob) 286 303 .filter(RecordBlobSchema::recordUri.eq_any(uris_clone)) ··· 300 317 let did_clone = self.did.clone(); 301 318 let duplicated_cids: Vec<String> = self 302 319 .db 320 + .get() 321 + .await? 303 322 .interact(move |conn| { 304 323 RecordBlobSchema::record_blob 305 324 .filter(RecordBlobSchema::blobCid.eq_any(cids_clone)) ··· 336 355 let cids = cids_to_delete.clone(); 337 356 let did_clone = self.did.clone(); 338 357 self.db 358 + .get() 359 + .await? 339 360 .interact(move |conn| { 340 361 delete(BlobSchema::blob) 341 362 .filter(BlobSchema::cid.eq_any(cids)) ··· 368 389 369 390 let found = self 370 391 .db 392 + .get() 393 + .await? 371 394 .interact(move |conn| { 372 395 BlobSchema::blob 373 396 .filter( ··· 390 413 .await?; 391 414 } 392 415 self.db 416 + .get() 417 + .await? 393 418 .interact(move |conn| { 394 419 update(BlobSchema::blob) 395 420 .filter(BlobSchema::tempKey.eq(found.temp_key)) ··· 412 437 let did = self.did.clone(); 413 438 414 439 self.db 440 + .get() 441 + .await? 415 442 .interact(move |conn| { 416 443 insert_into(RecordBlobSchema::record_blob) 417 444 .values(( ··· 434 461 435 462 let did = self.did.clone(); 436 463 self.db 464 + .get() 465 + .await? 437 466 .interact(move |conn| { 438 467 let res = BlobSchema::blob 439 468 .filter(BlobSchema::did.eq(&did)) ··· 451 480 452 481 let did = self.did.clone(); 453 482 self.db 483 + .get() 484 + .await? 454 485 .interact(move |conn| { 455 486 let res: i64 = RecordBlobSchema::record_blob 456 487 .filter(RecordBlobSchema::did.eq(&did)) ··· 472 503 473 504 let did = self.did.clone(); 474 505 self.db 506 + .get() 507 + .await? 475 508 .interact(move |conn| { 476 509 let ListMissingBlobsOpts { cursor, limit } = opts; 477 510 ··· 552 585 builder = builder.filter(RecordBlobSchema::blobCid.gt(cursor)); 553 586 } 554 587 self.db 588 + .get() 589 + .await? 555 590 .interact(move |conn| builder.load(conn)) 556 591 .await 557 592 .expect("Failed to list blobs")? ··· 567 602 builder = builder.filter(RecordBlobSchema::blobCid.gt(cursor)); 568 603 } 569 604 self.db 605 + .get() 606 + .await? 570 607 .interact(move |conn| builder.load(conn)) 571 608 .await 572 609 .expect("Failed to list blobs")? ··· 580 617 use rsky_pds::schema::pds::blob::dsl as BlobSchema; 581 618 582 619 self.db 620 + .get() 621 + .await? 583 622 .interact(move |conn| { 584 623 let res = BlobSchema::blob 585 624 .filter(BlobSchema::cid.eq(cid.to_string())) ··· 621 660 let did_clone = self.did.clone(); 622 661 623 662 self.db 663 + .get() 664 + .await? 624 665 .interact(move |conn| { 625 666 update(BlobSchema::blob) 626 667 .filter(BlobSchema::cid.eq(blob_cid))
+17 -15
src/actor_store/mod.rs
··· 75 75 pub fn new( 76 76 did: String, 77 77 blobstore: BlobStoreSql, 78 - db: deadpool_diesel::Connection<SqliteConnection>, 78 + db: deadpool_diesel::Pool< 79 + deadpool_diesel::Manager<SqliteConnection>, 80 + deadpool_diesel::sqlite::Object, 81 + >, 82 + conn: deadpool_diesel::sqlite::Object, 79 83 ) -> Self { 80 84 ActorStore { 81 - storage: Arc::new(RwLock::new(SqlRepoReader::new( 82 - did.clone(), 83 - None, 84 - db.clone(), 85 - ))), 85 + storage: Arc::new(RwLock::new(SqlRepoReader::new(did.clone(), None, conn))), 86 86 record: RecordReader::new(did.clone(), db.clone()), 87 87 pref: PreferenceReader::new(did.clone(), db.clone()), 88 88 did, 89 - blob: BlobReader::new(blobstore, db.clone()), // Unlike TS impl, just use blob reader vs generator 89 + blob: BlobReader::new(blobstore, db.clone()), 90 90 } 91 91 } 92 92 ··· 438 438 pub async fn destroy(&mut self) -> Result<()> { 439 439 let did: String = self.did.clone(); 440 440 let storage_guard = self.storage.read().await; 441 - let db: deadpool_diesel::Connection<SqliteConnection> = storage_guard.db.clone(); 442 441 use rsky_pds::schema::pds::blob::dsl as BlobSchema; 443 442 444 - let blob_rows: Vec<String> = db 445 - .run(move |conn| { 443 + let blob_rows: Vec<String> = storage_guard 444 + .db 445 + .interact(move |conn| { 446 446 BlobSchema::blob 447 447 .filter(BlobSchema::did.eq(did)) 448 448 .select(BlobSchema::cid) 449 449 .get_results(conn) 450 450 }) 451 - .await?; 451 + .await 452 + .expect("Failed to get blob rows")?; 452 453 let cids = blob_rows 453 454 .into_iter() 454 455 .map(|row| Ok(Cid::from_str(&row)?)) ··· 472 473 } 473 474 let did: String = self.did.clone(); 474 475 let storage_guard = self.storage.read().await; 475 - let db: deadpool_diesel::Connection<SqliteConnection> = storage_guard.db.clone(); 476 476 use rsky_pds::schema::pds::record::dsl as RecordSchema; 477 477 478 478 let cid_strs: Vec<String> = cids.into_iter().map(|c| c.to_string()).collect(); 479 479 let touched_uri_strs: Vec<String> = touched_uris.iter().map(|t| t.to_string()).collect(); 480 - let res: Vec<String> = db 481 - .run(move |conn| { 480 + let res: Vec<String> = storage_guard 481 + .db 482 + .interact(move |conn| { 482 483 RecordSchema::record 483 484 .filter(RecordSchema::did.eq(did)) 484 485 .filter(RecordSchema::cid.eq_any(cid_strs)) ··· 486 487 .select(RecordSchema::cid) 487 488 .get_results(conn) 488 489 }) 489 - .await?; 490 + .await 491 + .expect("Failed to get duplicate record cids")?; 490 492 res.into_iter() 491 493 .map(|row| Cid::from_str(&row).map_err(|error| anyhow::Error::new(error))) 492 494 .collect::<Result<Vec<Cid>>>()
+14 -3
src/actor_store/preference.rs
··· 14 14 15 15 pub struct PreferenceReader { 16 16 pub did: String, 17 - pub db: deadpool_diesel::Connection<SqliteConnection>, 17 + pub db: deadpool_diesel::Pool< 18 + deadpool_diesel::Manager<SqliteConnection>, 19 + deadpool_diesel::sqlite::Object, 20 + >, 18 21 } 19 22 20 23 impl PreferenceReader { 21 - pub fn new(did: String, db: deadpool_diesel::Connection<SqliteConnection>) -> Self { 24 + pub fn new( 25 + did: String, 26 + db: deadpool_diesel::Pool< 27 + deadpool_diesel::Manager<SqliteConnection>, 28 + deadpool_diesel::sqlite::Object, 29 + >, 30 + ) -> Self { 22 31 PreferenceReader { did, db } 23 32 } 24 33 ··· 31 40 32 41 let did = self.did.clone(); 33 42 self.db 43 + .get() 44 + .await? 34 45 .interact(move |conn| { 35 46 let prefs_res = AccountPrefSchema::account_pref 36 47 .filter(AccountPrefSchema::did.eq(&did)) ··· 69 80 scope: AuthScope, 70 81 ) -> Result<()> { 71 82 let did = self.did.clone(); 72 - self.db 83 + self.db.get().await? 73 84 .interact(move |conn| { 74 85 match values 75 86 .iter()
+37 -2
src/actor_store/record.rs
··· 21 21 /// Combined handler for record operations with both read and write capabilities. 22 22 pub(crate) struct RecordReader { 23 23 /// Database connection. 24 - pub db: deadpool_diesel::Connection<SqliteConnection>, 24 + pub db: deadpool_diesel::Pool< 25 + deadpool_diesel::Manager<SqliteConnection>, 26 + deadpool_diesel::sqlite::Object, 27 + >, 25 28 /// DID of the actor. 26 29 pub did: String, 27 30 } 28 31 29 32 impl RecordReader { 30 33 /// Create a new record handler. 31 - pub(crate) fn new(did: String, db: deadpool_diesel::Connection<SqliteConnection>) -> Self { 34 + pub(crate) fn new( 35 + did: String, 36 + db: deadpool_diesel::Pool< 37 + deadpool_diesel::Manager<SqliteConnection>, 38 + deadpool_diesel::sqlite::Object, 39 + >, 40 + ) -> Self { 32 41 Self { did, db } 33 42 } 34 43 ··· 38 47 39 48 let other_did = self.did.clone(); 40 49 self.db 50 + .get() 51 + .await? 41 52 .interact(move |conn| { 42 53 let res: i64 = record.filter(did.eq(&other_did)).count().get_result(conn)?; 43 54 Ok(res) ··· 52 63 53 64 let other_did = self.did.clone(); 54 65 self.db 66 + .get() 67 + .await? 55 68 .interact(move |conn| { 56 69 let collections = record 57 70 .filter(did.eq(&other_did)) ··· 117 130 } 118 131 let res: Vec<(Record, RepoBlock)> = self 119 132 .db 133 + .get() 134 + .await? 120 135 .interact(move |conn| builder.load(conn)) 121 136 .await 122 137 .expect("Failed to load records")?; ··· 159 174 } 160 175 let record: Option<(Record, RepoBlock)> = self 161 176 .db 177 + .get() 178 + .await? 162 179 .interact(move |conn| builder.first(conn).optional()) 163 180 .await 164 181 .expect("Failed to load record")?; ··· 201 218 } 202 219 let record_uri = self 203 220 .db 221 + .get() 222 + .await? 204 223 .interact(move |conn| builder.first::<String>(conn).optional()) 205 224 .await 206 225 .expect("Failed to check record")?; ··· 216 235 217 236 let res = self 218 237 .db 238 + .get() 239 + .await? 219 240 .interact(move |conn| { 220 241 RecordSchema::record 221 242 .select(RecordSchema::takedownRef) ··· 248 269 249 270 let res = self 250 271 .db 272 + .get() 273 + .await? 251 274 .interact(move |conn| { 252 275 RecordSchema::record 253 276 .select(RecordSchema::cid) ··· 276 299 277 300 let res = self 278 301 .db 302 + .get() 303 + .await? 279 304 .interact(move |conn| { 280 305 RecordSchema::record 281 306 .inner_join( ··· 373 398 // Track current version of record 374 399 let (record, uri) = self 375 400 .db 401 + .get() 402 + .await? 376 403 .interact(move |conn| { 377 404 insert_into(RecordSchema::record) 378 405 .values(row) ··· 411 438 use rsky_pds::schema::pds::record::dsl as RecordSchema; 412 439 let uri = uri.to_string(); 413 440 self.db 441 + .get() 442 + .await? 414 443 .interact(move |conn| { 415 444 delete(RecordSchema::record) 416 445 .filter(RecordSchema::uri.eq(&uri)) ··· 432 461 use rsky_pds::schema::pds::backlink::dsl as BacklinkSchema; 433 462 let uri = uri.to_string(); 434 463 self.db 464 + .get() 465 + .await? 435 466 .interact(move |conn| { 436 467 delete(BacklinkSchema::backlink) 437 468 .filter(BacklinkSchema::uri.eq(uri)) ··· 449 480 } else { 450 481 use rsky_pds::schema::pds::backlink::dsl as BacklinkSchema; 451 482 self.db 483 + .get() 484 + .await? 452 485 .interact(move |conn| { 453 486 insert_or_ignore_into(BacklinkSchema::backlink) 454 487 .values(&backlinks) ··· 478 511 let uri_string = uri.to_string(); 479 512 480 513 self.db 514 + .get() 515 + .await? 481 516 .interact(move |conn| { 482 517 update(RecordSchema::record) 483 518 .filter(RecordSchema::uri.eq(uri_string))
+29 -3
src/actor_store/sql_blob.rs
··· 26 26 /// SQL-based implementation of blob storage 27 27 pub struct BlobStoreSql { 28 28 /// Database connection for metadata 29 - pub db: deadpool_diesel::Connection<SqliteConnection>, 29 + pub db: deadpool_diesel::Pool< 30 + deadpool_diesel::Manager<SqliteConnection>, 31 + deadpool_diesel::sqlite::Object, 32 + >, 30 33 /// DID of the actor 31 34 pub did: String, 32 35 } ··· 57 60 58 61 impl BlobStoreSql { 59 62 /// Create a new SQL-based blob store for the given DID 60 - pub fn new(did: String, db: deadpool_diesel::Connection<SqliteConnection>) -> Self { 63 + pub fn new( 64 + did: String, 65 + db: deadpool_diesel::Pool< 66 + deadpool_diesel::Manager<SqliteConnection>, 67 + deadpool_diesel::sqlite::Object, 68 + >, 69 + ) -> Self { 61 70 BlobStoreSql { db, did } 62 71 } 63 72 64 73 // /// Create a factory function for blob stores 65 74 // pub fn creator( 66 - // db: deadpool_diesel::Connection<SqliteConnection>, 75 + // db: deadpool_diesel::Pool< 76 + // deadpool_diesel::Manager<SqliteConnection>, 77 + // deadpool_diesel::sqlite::Object, 78 + // >, 67 79 // ) -> Box<dyn Fn(String) -> BlobStoreSql> { 68 80 // let db_clone = db.clone(); 69 81 // Box::new(move |did: String| BlobStoreSql::new(did, db_clone.clone())) ··· 107 119 108 120 // Store directly in the database 109 121 self.db 122 + .get() 123 + .await? 110 124 .interact(move |conn| { 111 125 let data_clone = bytes.clone(); 112 126 let entry = BlobEntry { ··· 145 159 146 160 // Update the quarantine flag in the database 147 161 self.db 162 + .get() 163 + .await? 148 164 .interact(move |conn| { 149 165 diesel::update(blobs::table) 150 166 .filter(blobs::cid.eq(&cid_str)) ··· 166 182 167 183 // Update the quarantine flag in the database 168 184 self.db 185 + .get() 186 + .await? 169 187 .interact(move |conn| { 170 188 diesel::update(blobs::table) 171 189 .filter(blobs::cid.eq(&cid_str)) ··· 190 208 // Get the blob data from the database 191 209 let blob_data = self 192 210 .db 211 + .get() 212 + .await? 193 213 .interact(move |conn| { 194 214 blobs 195 215 .filter(self::blobs::cid.eq(&cid_str)) ··· 229 249 230 250 // Delete from database 231 251 self.db 252 + .get() 253 + .await? 232 254 .interact(move |conn| { 233 255 diesel::delete(blobs) 234 256 .filter(self::blobs::cid.eq(&blob_cid)) ··· 251 273 252 274 // Delete all blobs in one operation 253 275 self.db 276 + .get() 277 + .await? 254 278 .interact(move |conn| { 255 279 diesel::delete(blobs) 256 280 .filter(self::blobs::cid.eq_any(cid_strings)) ··· 273 297 274 298 let exists = self 275 299 .db 300 + .get() 301 + .await? 276 302 .interact(move |conn| { 277 303 diesel::select(diesel::dsl::exists( 278 304 blobs
+2 -6
src/actor_store/sql_repo.rs
··· 27 27 28 28 pub struct SqlRepoReader { 29 29 pub cache: Arc<RwLock<BlockMap>>, 30 - pub db: deadpool_diesel::Connection<SqliteConnection>, 30 + pub db: deadpool_diesel::sqlite::Object, 31 31 pub root: Option<Cid>, 32 32 pub rev: Option<String>, 33 33 pub now: String, ··· 327 327 328 328 // Basically handles getting ipld blocks from db 329 329 impl SqlRepoReader { 330 - pub fn new( 331 - did: String, 332 - now: Option<String>, 333 - db: deadpool_diesel::Connection<SqliteConnection>, 334 - ) -> Self { 330 + pub fn new(did: String, now: Option<String>, db: deadpool_diesel::sqlite::Object) -> Self { 335 331 let now = now.unwrap_or_else(rsky_common::now); 336 332 SqlRepoReader { 337 333 cache: Arc::new(RwLock::new(BlockMap::new())),
+6
src/endpoints/repo/apply_writes.rs
··· 159 159 let actor_db = db_actors 160 160 .get(did) 161 161 .ok_or_else(|| anyhow!("Actor DB not found"))?; 162 + let conn = actor_db 163 + .repo 164 + .get() 165 + .await 166 + .context("Failed to get actor db connection")?; 162 167 let mut actor_store = ActorStore::new( 163 168 did.clone(), 164 169 BlobStoreSql::new(did.clone(), actor_db.blob), 165 170 actor_db.repo, 171 + conn, 166 172 ); 167 173 168 174 let commit = actor_store