Alternative ATProto PDS implementation
at oauth 21 kB view raw
1//! Actor store implementation for ATProto PDS. 2//! Based on https://github.com/blacksky-algorithms/rsky/blob/main/rsky-pds/src/actor_store/mod.rs 3//! Which is based on https://github.com/bluesky-social/atproto/blob/main/packages/repo/src/repo.ts 4//! and also adds components from https://github.com/bluesky-social/atproto/blob/main/packages/pds/src/actor-store/repo/transactor.ts 5//! blacksky-algorithms/rsky is licensed under the Apache License 2.0 6//! 7//! Modified for SQLite backend 8 9mod blob; 10pub(crate) mod blob_fs; 11mod preference; 12mod record; 13pub(crate) mod sql_blob; 14mod sql_repo; 15 16use anyhow::Result; 17use cidv10::Cid; 18use diesel::*; 19use futures::stream::{self, StreamExt}; 20use rsky_pds::actor_store::repo::types::SyncEvtData; 21use rsky_repo::repo::Repo; 22use rsky_repo::storage::readable_blockstore::ReadableBlockstore; 23use rsky_repo::storage::types::RepoStorage; 24use rsky_repo::types::{ 25 CommitAction, CommitData, CommitDataWithOps, CommitOp, PreparedCreateOrUpdate, PreparedWrite, 26 RecordCreateOrUpdateOp, RecordWriteEnum, RecordWriteOp, WriteOpAction, write_to_op, 27}; 28use rsky_repo::util::format_data_key; 29use rsky_syntax::aturi::AtUri; 30use secp256k1::{Keypair, Secp256k1, SecretKey}; 31use std::str::FromStr; 32use std::sync::Arc; 33use std::{env, fmt}; 34use tokio::sync::RwLock; 35 36use blob::BlobReader; 37use blob_fs::BlobStoreFs; 38use preference::PreferenceReader; 39use record::RecordReader; 40use sql_repo::SqlRepoReader; 41 42use crate::serve::ActorStorage; 43 44#[derive(Debug)] 45enum FormatCommitError { 46 BadRecordSwap(String), 47 RecordSwapMismatch(String), 48 BadCommitSwap(String), 49 MissingRepoRoot(String), 50} 51 52impl fmt::Display for FormatCommitError { 53 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 54 match self { 55 Self::BadRecordSwap(record) => write!(f, "BadRecordSwapError: `{:?}`", record), 56 Self::RecordSwapMismatch(record) => { 57 write!(f, "BadRecordSwapError: current record is `{:?}`", record) 58 } 59 Self::BadCommitSwap(cid) => write!(f, "BadCommitSwapError: {}", cid), 60 Self::MissingRepoRoot(did) => write!(f, "No repo root found for `{}`", did), 61 } 62 } 63} 64 65impl std::error::Error for FormatCommitError {} 66 67pub struct ActorStore { 68 pub did: String, 69 pub storage: Arc<RwLock<SqlRepoReader>>, // get ipld blocks from db 70 pub record: RecordReader, // get lexicon records from db 71 pub blob: BlobReader, // get blobs 72 pub pref: PreferenceReader, // get preferences 73} 74 75// Combination of RepoReader/Transactor, BlobReader/Transactor, SqlRepoReader/Transactor 76impl ActorStore { 77 /// Concrete reader of an individual repo (hence BlobStoreFs which takes `did` param) 78 pub fn new( 79 did: String, 80 blobstore: BlobStoreFs, 81 db: deadpool_diesel::Pool< 82 deadpool_diesel::Manager<SqliteConnection>, 83 deadpool_diesel::sqlite::Object, 84 >, 85 conn: deadpool_diesel::sqlite::Object, 86 ) -> Self { 87 Self { 88 storage: Arc::new(RwLock::new(SqlRepoReader::new(did.clone(), None, conn))), 89 record: RecordReader::new(did.clone(), db.clone()), 90 pref: PreferenceReader::new(did.clone(), db.clone()), 91 did, 92 blob: BlobReader::new(blobstore, db), 93 } 94 } 95 96 /// Create a new ActorStore taking ActorPools HashMap as input 97 pub async fn from_actor_pools( 98 did: &String, 99 hashmap_actor_pools: &std::collections::HashMap<String, ActorStorage>, 100 ) -> Self { 101 let actor_pool = hashmap_actor_pools 102 .get(did) 103 .expect("Actor pool not found") 104 .clone(); 105 let blobstore = BlobStoreFs::new(did.clone(), actor_pool.blob); 106 let conn = actor_pool 107 .repo 108 .clone() 109 .get() 110 .await 111 .expect("Failed to get connection"); 112 Self::new(did.clone(), blobstore, actor_pool.repo, conn) 113 } 114 115 pub async fn get_repo_root(&self) -> Option<Cid> { 116 let storage_guard = self.storage.read().await; 117 storage_guard.get_root().await 118 } 119 120 // Transactors 121 // ------------------- 122 123 #[deprecated] 124 pub async fn create_repo_legacy( 125 &self, 126 keypair: Keypair, 127 writes: Vec<PreparedCreateOrUpdate>, 128 ) -> Result<CommitData> { 129 let write_ops = writes 130 .clone() 131 .into_iter() 132 .map(|prepare| { 133 let at_uri: AtUri = prepare.uri.try_into()?; 134 Ok(RecordCreateOrUpdateOp { 135 action: WriteOpAction::Create, 136 collection: at_uri.get_collection(), 137 rkey: at_uri.get_rkey(), 138 record: prepare.record, 139 }) 140 }) 141 .collect::<Result<Vec<RecordCreateOrUpdateOp>>>()?; 142 let commit = Repo::format_init_commit( 143 self.storage.clone(), 144 self.did.clone(), 145 keypair, 146 Some(write_ops), 147 ) 148 .await?; 149 self.storage 150 .read() 151 .await 152 .apply_commit(commit.clone(), None) 153 .await?; 154 let writes = writes 155 .into_iter() 156 .map(PreparedWrite::Create) 157 .collect::<Vec<PreparedWrite>>(); 158 self.blob.process_write_blobs(writes).await?; 159 Ok(commit) 160 } 161 162 pub async fn create_repo( 163 &self, 164 keypair: Keypair, 165 writes: Vec<PreparedCreateOrUpdate>, 166 ) -> Result<CommitDataWithOps> { 167 let write_ops = writes 168 .clone() 169 .into_iter() 170 .map(|prepare| { 171 let at_uri: AtUri = prepare.uri.try_into()?; 172 Ok(RecordCreateOrUpdateOp { 173 action: WriteOpAction::Create, 174 collection: at_uri.get_collection(), 175 rkey: at_uri.get_rkey(), 176 record: prepare.record, 177 }) 178 }) 179 .collect::<Result<Vec<RecordCreateOrUpdateOp>>>()?; 180 let commit = Repo::format_init_commit( 181 self.storage.clone(), 182 self.did.clone(), 183 keypair, 184 Some(write_ops), 185 ) 186 .await?; 187 self.storage 188 .read() 189 .await 190 .apply_commit(commit.clone(), None) 191 .await?; 192 let write_commit_ops = writes.iter().try_fold( 193 Vec::with_capacity(writes.len()), 194 |mut acc, w| -> Result<Vec<CommitOp>> { 195 let aturi: AtUri = w.uri.clone().try_into()?; 196 acc.push(CommitOp { 197 action: CommitAction::Create, 198 path: format_data_key(aturi.get_collection(), aturi.get_rkey()), 199 cid: Some(w.cid), 200 prev: None, 201 }); 202 Ok(acc) 203 }, 204 )?; 205 let writes = writes 206 .into_iter() 207 .map(PreparedWrite::Create) 208 .collect::<Vec<PreparedWrite>>(); 209 self.blob.process_write_blobs(writes).await?; 210 Ok(CommitDataWithOps { 211 commit_data: commit, 212 ops: write_commit_ops, 213 prev_data: None, 214 }) 215 } 216 217 pub async fn process_import_repo( 218 &mut self, 219 commit: CommitData, 220 writes: Vec<PreparedWrite>, 221 ) -> Result<()> { 222 { 223 let immutable_borrow = &self; 224 // & send to indexing 225 immutable_borrow 226 .index_writes(writes.clone(), &commit.rev) 227 .await?; 228 } 229 // persist the commit to repo storage 230 self.storage 231 .read() 232 .await 233 .apply_commit(commit.clone(), None) 234 .await?; 235 // process blobs 236 self.blob.process_write_blobs(writes).await?; 237 Ok(()) 238 } 239 240 pub async fn process_writes( 241 &mut self, 242 writes: Vec<PreparedWrite>, 243 swap_commit_cid: Option<Cid>, 244 ) -> Result<CommitDataWithOps> { 245 // NOTE: In the typescript PR on sync v1.1 246 // there are some safeguards added for adding 247 // very large commits and very many commits 248 // for which I'm sure we could safeguard on 249 // but may not be necessary. 250 // https://github.com/bluesky-social/atproto/pull/3585/files#diff-7627844a4a6b50190014e947d1331a96df3c64d4c5273fa0ce544f85c3c1265f 251 let commit = self.format_commit(writes.clone(), swap_commit_cid).await?; 252 { 253 let immutable_borrow = &self; 254 // & send to indexing 255 immutable_borrow 256 .index_writes(writes.clone(), &commit.commit_data.rev) 257 .await?; 258 } 259 // persist the commit to repo storage 260 self.storage 261 .read() 262 .await 263 .apply_commit(commit.commit_data.clone(), None) 264 .await?; 265 // process blobs 266 self.blob.process_write_blobs(writes).await?; 267 Ok(commit) 268 } 269 270 pub async fn get_sync_event_data(&mut self) -> Result<SyncEvtData> { 271 let current_root = self.storage.read().await.get_root_detailed().await?; 272 let blocks_and_missing = self 273 .storage 274 .read() 275 .await 276 .get_blocks(vec![current_root.cid]) 277 .await?; 278 Ok(SyncEvtData { 279 cid: current_root.cid, 280 rev: current_root.rev, 281 blocks: blocks_and_missing.blocks, 282 }) 283 } 284 285 pub async fn format_commit( 286 &mut self, 287 writes: Vec<PreparedWrite>, 288 swap_commit: Option<Cid>, 289 ) -> Result<CommitDataWithOps> { 290 let current_root = { 291 let storage_guard = self.storage.read().await; 292 storage_guard.get_root_detailed().await 293 }; 294 if let Ok(current_root) = current_root { 295 if let Some(swap_commit) = swap_commit { 296 if !current_root.cid.eq(&swap_commit) { 297 return Err( 298 FormatCommitError::BadCommitSwap(current_root.cid.to_string()).into(), 299 ); 300 } 301 } 302 { 303 self.storage 304 .write() 305 .await 306 .cache_rev(current_root.rev) 307 .await?; 308 } 309 let mut new_record_cids: Vec<Cid> = vec![]; 310 let mut delete_and_update_uris = vec![]; 311 let mut commit_ops = vec![]; 312 for write in &writes { 313 let commit_action: CommitAction = write.action().into(); 314 match write.clone() { 315 PreparedWrite::Create(c) => new_record_cids.push(c.cid), 316 PreparedWrite::Update(u) => { 317 new_record_cids.push(u.cid); 318 let u_at_uri: AtUri = u.uri.try_into()?; 319 delete_and_update_uris.push(u_at_uri); 320 } 321 PreparedWrite::Delete(d) => { 322 let d_at_uri: AtUri = d.uri.try_into()?; 323 delete_and_update_uris.push(d_at_uri) 324 } 325 } 326 if write.swap_cid().is_none() { 327 continue; 328 } 329 let write_at_uri: &AtUri = &write.uri().try_into()?; 330 let record = self 331 .record 332 .get_record(write_at_uri, None, Some(true)) 333 .await?; 334 let current_record = match record { 335 Some(record) => Some(Cid::from_str(&record.cid)?), 336 None => None, 337 }; 338 let cid = match &write { 339 &PreparedWrite::Delete(_) => None, 340 &PreparedWrite::Create(w) | &PreparedWrite::Update(w) => Some(w.cid), 341 }; 342 let mut op = CommitOp { 343 action: commit_action, 344 path: format_data_key(write_at_uri.get_collection(), write_at_uri.get_rkey()), 345 cid, 346 prev: None, 347 }; 348 if current_record.is_some() { 349 op.prev = current_record; 350 }; 351 commit_ops.push(op); 352 match write { 353 // There should be no current record for a create 354 PreparedWrite::Create(_) if write.swap_cid().is_some() => { 355 Err::<(), anyhow::Error>( 356 FormatCommitError::BadRecordSwap(format!("{:?}", current_record)) 357 .into(), 358 ) 359 } 360 // There should be a current record for an update 361 PreparedWrite::Update(_) if write.swap_cid().is_none() => { 362 Err::<(), anyhow::Error>( 363 FormatCommitError::BadRecordSwap(format!("{:?}", current_record)) 364 .into(), 365 ) 366 } 367 // There should be a current record for a delete 368 PreparedWrite::Delete(_) if write.swap_cid().is_none() => { 369 Err::<(), anyhow::Error>( 370 FormatCommitError::BadRecordSwap(format!("{:?}", current_record)) 371 .into(), 372 ) 373 } 374 _ => Ok::<(), anyhow::Error>(()), 375 }?; 376 match (current_record, write.swap_cid()) { 377 (Some(current_record), Some(swap_cid)) if current_record.eq(swap_cid) => { 378 Ok::<(), anyhow::Error>(()) 379 } 380 _ => Err::<(), anyhow::Error>( 381 FormatCommitError::RecordSwapMismatch(format!("{:?}", current_record)) 382 .into(), 383 ), 384 }?; 385 } 386 let mut repo = Repo::load(self.storage.clone(), Some(current_root.cid)).await?; 387 let previous_data = repo.commit.data; 388 let write_ops: Vec<RecordWriteOp> = writes 389 .into_iter() 390 .map(write_to_op) 391 .collect::<Result<Vec<RecordWriteOp>>>()?; 392 // @TODO: Use repo signing key global config 393 let secp = Secp256k1::new(); 394 let repo_private_key = env::var("PDS_REPO_SIGNING_KEY_K256_PRIVATE_KEY_HEX") 395 .expect("PDS_REPO_SIGNING_KEY_K256_PRIVATE_KEY_HEX not set"); 396 let repo_secret_key = SecretKey::from_slice( 397 &hex::decode(repo_private_key.as_bytes()).expect("Failed to decode hex"), 398 ) 399 .expect("Failed to create secret key from hex"); 400 let repo_signing_key = Keypair::from_secret_key(&secp, &repo_secret_key); 401 402 let mut commit = repo 403 .format_commit(RecordWriteEnum::List(write_ops), repo_signing_key) 404 .await?; 405 406 // find blocks that would be deleted but are referenced by another record 407 let duplicate_record_cids = self 408 .get_duplicate_record_cids(commit.removed_cids.to_list(), delete_and_update_uris) 409 .await?; 410 for cid in duplicate_record_cids { 411 commit.removed_cids.delete(cid) 412 } 413 414 // find blocks that are relevant to ops but not included in diff 415 // (for instance a record that was moved but cid stayed the same) 416 let new_record_blocks = commit.relevant_blocks.get_many(new_record_cids)?; 417 if !new_record_blocks.missing.is_empty() { 418 let missing_blocks = { 419 let storage_guard = self.storage.read().await; 420 storage_guard.get_blocks(new_record_blocks.missing).await? 421 }; 422 commit.relevant_blocks.add_map(missing_blocks.blocks)?; 423 } 424 let commit_with_data_ops = CommitDataWithOps { 425 ops: commit_ops, 426 commit_data: commit, 427 prev_data: Some(previous_data), 428 }; 429 Ok(commit_with_data_ops) 430 } else { 431 Err(FormatCommitError::MissingRepoRoot(self.did.clone()).into()) 432 } 433 } 434 435 pub async fn index_writes(&self, writes: Vec<PreparedWrite>, rev: &str) -> Result<()> { 436 let now: &str = &rsky_common::now(); 437 438 drop( 439 stream::iter(writes) 440 .then(async move |write| { 441 match write { 442 PreparedWrite::Create(write) => { 443 let write_at_uri: AtUri = write.uri.try_into()?; 444 self.record 445 .index_record( 446 write_at_uri.clone(), 447 write.cid, 448 Some(write.record), 449 Some(write.action), 450 rev.to_owned(), 451 Some(now.to_owned()), 452 ) 453 .await?; 454 } 455 PreparedWrite::Update(write) => { 456 let write_at_uri: AtUri = write.uri.try_into()?; 457 self.record 458 .index_record( 459 write_at_uri.clone(), 460 write.cid, 461 Some(write.record), 462 Some(write.action), 463 rev.to_owned(), 464 Some(now.to_owned()), 465 ) 466 .await?; 467 } 468 PreparedWrite::Delete(write) => { 469 let write_at_uri: AtUri = write.uri.try_into()?; 470 self.record.delete_record(&write_at_uri).await?; 471 } 472 } 473 Ok::<(), anyhow::Error>(()) 474 }) 475 .collect::<Vec<_>>() 476 .await 477 .into_iter() 478 .collect::<Result<Vec<_>, _>>()?, 479 ); 480 Ok(()) 481 } 482 483 pub async fn destroy(&mut self) -> Result<()> { 484 let did: String = self.did.clone(); 485 use crate::schema::actor_store::blob::dsl as BlobSchema; 486 487 let blob_rows: Vec<String> = self 488 .storage 489 .read() 490 .await 491 .db 492 .interact(move |conn| { 493 BlobSchema::blob 494 .filter(BlobSchema::did.eq(did)) 495 .select(BlobSchema::cid) 496 .get_results(conn) 497 }) 498 .await 499 .expect("Failed to get blob rows")?; 500 let cids = blob_rows 501 .into_iter() 502 .map(|row| Ok(Cid::from_str(&row)?)) 503 .collect::<Result<Vec<Cid>>>()?; 504 drop( 505 stream::iter(cids.chunks(500)) 506 .then(|chunk| async { self.blob.blobstore.delete_many(chunk.to_vec()).await }) 507 .collect::<Vec<_>>() 508 .await 509 .into_iter() 510 .collect::<Result<Vec<_>, _>>()?, 511 ); 512 Ok(()) 513 } 514 515 pub async fn get_duplicate_record_cids( 516 &self, 517 cids: Vec<Cid>, 518 touched_uris: Vec<AtUri>, 519 ) -> Result<Vec<Cid>> { 520 if touched_uris.is_empty() || cids.is_empty() { 521 return Ok(vec![]); 522 } 523 let did: String = self.did.clone(); 524 use crate::schema::actor_store::record::dsl as RecordSchema; 525 526 let cid_strs: Vec<String> = cids.into_iter().map(|c| c.to_string()).collect(); 527 let touched_uri_strs: Vec<String> = touched_uris.iter().map(|t| t.to_string()).collect(); 528 let res: Vec<String> = self 529 .storage 530 .read() 531 .await 532 .db 533 .interact(move |conn| { 534 RecordSchema::record 535 .filter(RecordSchema::did.eq(did)) 536 .filter(RecordSchema::cid.eq_any(cid_strs)) 537 .filter(RecordSchema::uri.ne_all(touched_uri_strs)) 538 .select(RecordSchema::cid) 539 .get_results(conn) 540 }) 541 .await 542 .expect("Failed to get duplicate record cids")?; 543 res.into_iter() 544 .map(|row| Cid::from_str(&row).map_err(anyhow::Error::new)) 545 .collect::<Result<Vec<Cid>>>() 546 } 547}