Alternative ATProto PDS implementation
//! Actor store implementation for ATProto PDS.
//! Based on https://github.com/blacksky-algorithms/rsky/blob/main/rsky-pds/src/actor_store/mod.rs
//! Which is based on https://github.com/bluesky-social/atproto/blob/main/packages/repo/src/repo.ts
//! and also adds components from https://github.com/bluesky-social/atproto/blob/main/packages/pds/src/actor-store/repo/transactor.ts
//! blacksky-algorithms/rsky is licensed under the Apache License 2.0
//!
//! Modified for SQLite backend
8
9mod blob;
10pub(crate) mod blob_fs;
11mod preference;
12mod record;
13pub(crate) mod sql_blob;
14mod sql_repo;
15
16use anyhow::Result;
17use cidv10::Cid;
18use diesel::*;
19use futures::stream::{self, StreamExt};
20use rsky_pds::actor_store::repo::types::SyncEvtData;
21use rsky_repo::repo::Repo;
22use rsky_repo::storage::readable_blockstore::ReadableBlockstore;
23use rsky_repo::storage::types::RepoStorage;
24use rsky_repo::types::{
25 CommitAction, CommitData, CommitDataWithOps, CommitOp, PreparedCreateOrUpdate, PreparedWrite,
26 RecordCreateOrUpdateOp, RecordWriteEnum, RecordWriteOp, WriteOpAction, write_to_op,
27};
28use rsky_repo::util::format_data_key;
29use rsky_syntax::aturi::AtUri;
30use secp256k1::{Keypair, Secp256k1, SecretKey};
31use std::str::FromStr;
32use std::sync::Arc;
33use std::{env, fmt};
34use tokio::sync::RwLock;
35
36use blob::BlobReader;
37use blob_fs::BlobStoreFs;
38use preference::PreferenceReader;
39use record::RecordReader;
40use sql_repo::SqlRepoReader;
41
42use crate::serve::ActorStorage;
43
/// Failures that can occur while formatting a repo commit.
#[derive(Debug)]
enum FormatCommitError {
    /// A write carried a swap CID it must not have; holds the debug-formatted current record.
    BadRecordSwap(String),
    /// A write's swap CID did not match the indexed record; holds the debug-formatted current record.
    RecordSwapMismatch(String),
    /// The caller's expected commit CID differs from the current root; holds the current root CID.
    BadCommitSwap(String),
    /// No repo root could be read for the given DID.
    MissingRepoRoot(String),
}

impl fmt::Display for FormatCommitError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::BadRecordSwap(record) => write!(f, "BadRecordSwapError: `{record:?}`"),
            Self::RecordSwapMismatch(record) => {
                write!(f, "BadRecordSwapError: current record is `{record:?}`")
            }
            Self::BadCommitSwap(cid) => write!(f, "BadCommitSwapError: {cid}"),
            Self::MissingRepoRoot(did) => write!(f, "No repo root found for `{did}`"),
        }
    }
}

impl std::error::Error for FormatCommitError {}
66
67pub struct ActorStore {
68 pub did: String,
69 pub storage: Arc<RwLock<SqlRepoReader>>, // get ipld blocks from db
70 pub record: RecordReader, // get lexicon records from db
71 pub blob: BlobReader, // get blobs
72 pub pref: PreferenceReader, // get preferences
73}
74
75// Combination of RepoReader/Transactor, BlobReader/Transactor, SqlRepoReader/Transactor
76impl ActorStore {
77 /// Concrete reader of an individual repo (hence BlobStoreFs which takes `did` param)
78 pub fn new(
79 did: String,
80 blobstore: BlobStoreFs,
81 db: deadpool_diesel::Pool<
82 deadpool_diesel::Manager<SqliteConnection>,
83 deadpool_diesel::sqlite::Object,
84 >,
85 conn: deadpool_diesel::sqlite::Object,
86 ) -> Self {
87 Self {
88 storage: Arc::new(RwLock::new(SqlRepoReader::new(did.clone(), None, conn))),
89 record: RecordReader::new(did.clone(), db.clone()),
90 pref: PreferenceReader::new(did.clone(), db.clone()),
91 did,
92 blob: BlobReader::new(blobstore, db),
93 }
94 }
95
96 /// Create a new ActorStore taking ActorPools HashMap as input
97 pub async fn from_actor_pools(
98 did: &String,
99 hashmap_actor_pools: &std::collections::HashMap<String, ActorStorage>,
100 ) -> Self {
101 let actor_pool = hashmap_actor_pools
102 .get(did)
103 .expect("Actor pool not found")
104 .clone();
105 let blobstore = BlobStoreFs::new(did.clone(), actor_pool.blob);
106 let conn = actor_pool
107 .repo
108 .clone()
109 .get()
110 .await
111 .expect("Failed to get connection");
112 Self::new(did.clone(), blobstore, actor_pool.repo, conn)
113 }
114
115 pub async fn get_repo_root(&self) -> Option<Cid> {
116 let storage_guard = self.storage.read().await;
117 storage_guard.get_root().await
118 }
119
120 // Transactors
121 // -------------------
122
123 #[deprecated]
124 pub async fn create_repo_legacy(
125 &self,
126 keypair: Keypair,
127 writes: Vec<PreparedCreateOrUpdate>,
128 ) -> Result<CommitData> {
129 let write_ops = writes
130 .clone()
131 .into_iter()
132 .map(|prepare| {
133 let at_uri: AtUri = prepare.uri.try_into()?;
134 Ok(RecordCreateOrUpdateOp {
135 action: WriteOpAction::Create,
136 collection: at_uri.get_collection(),
137 rkey: at_uri.get_rkey(),
138 record: prepare.record,
139 })
140 })
141 .collect::<Result<Vec<RecordCreateOrUpdateOp>>>()?;
142 let commit = Repo::format_init_commit(
143 self.storage.clone(),
144 self.did.clone(),
145 keypair,
146 Some(write_ops),
147 )
148 .await?;
149 self.storage
150 .read()
151 .await
152 .apply_commit(commit.clone(), None)
153 .await?;
154 let writes = writes
155 .into_iter()
156 .map(PreparedWrite::Create)
157 .collect::<Vec<PreparedWrite>>();
158 self.blob.process_write_blobs(writes).await?;
159 Ok(commit)
160 }
161
162 pub async fn create_repo(
163 &self,
164 keypair: Keypair,
165 writes: Vec<PreparedCreateOrUpdate>,
166 ) -> Result<CommitDataWithOps> {
167 let write_ops = writes
168 .clone()
169 .into_iter()
170 .map(|prepare| {
171 let at_uri: AtUri = prepare.uri.try_into()?;
172 Ok(RecordCreateOrUpdateOp {
173 action: WriteOpAction::Create,
174 collection: at_uri.get_collection(),
175 rkey: at_uri.get_rkey(),
176 record: prepare.record,
177 })
178 })
179 .collect::<Result<Vec<RecordCreateOrUpdateOp>>>()?;
180 let commit = Repo::format_init_commit(
181 self.storage.clone(),
182 self.did.clone(),
183 keypair,
184 Some(write_ops),
185 )
186 .await?;
187 self.storage
188 .read()
189 .await
190 .apply_commit(commit.clone(), None)
191 .await?;
192 let write_commit_ops = writes.iter().try_fold(
193 Vec::with_capacity(writes.len()),
194 |mut acc, w| -> Result<Vec<CommitOp>> {
195 let aturi: AtUri = w.uri.clone().try_into()?;
196 acc.push(CommitOp {
197 action: CommitAction::Create,
198 path: format_data_key(aturi.get_collection(), aturi.get_rkey()),
199 cid: Some(w.cid),
200 prev: None,
201 });
202 Ok(acc)
203 },
204 )?;
205 let writes = writes
206 .into_iter()
207 .map(PreparedWrite::Create)
208 .collect::<Vec<PreparedWrite>>();
209 self.blob.process_write_blobs(writes).await?;
210 Ok(CommitDataWithOps {
211 commit_data: commit,
212 ops: write_commit_ops,
213 prev_data: None,
214 })
215 }
216
217 pub async fn process_import_repo(
218 &mut self,
219 commit: CommitData,
220 writes: Vec<PreparedWrite>,
221 ) -> Result<()> {
222 {
223 let immutable_borrow = &self;
224 // & send to indexing
225 immutable_borrow
226 .index_writes(writes.clone(), &commit.rev)
227 .await?;
228 }
229 // persist the commit to repo storage
230 self.storage
231 .read()
232 .await
233 .apply_commit(commit.clone(), None)
234 .await?;
235 // process blobs
236 self.blob.process_write_blobs(writes).await?;
237 Ok(())
238 }
239
240 pub async fn process_writes(
241 &mut self,
242 writes: Vec<PreparedWrite>,
243 swap_commit_cid: Option<Cid>,
244 ) -> Result<CommitDataWithOps> {
245 // NOTE: In the typescript PR on sync v1.1
246 // there are some safeguards added for adding
247 // very large commits and very many commits
248 // for which I'm sure we could safeguard on
249 // but may not be necessary.
250 // https://github.com/bluesky-social/atproto/pull/3585/files#diff-7627844a4a6b50190014e947d1331a96df3c64d4c5273fa0ce544f85c3c1265f
251 let commit = self.format_commit(writes.clone(), swap_commit_cid).await?;
252 {
253 let immutable_borrow = &self;
254 // & send to indexing
255 immutable_borrow
256 .index_writes(writes.clone(), &commit.commit_data.rev)
257 .await?;
258 }
259 // persist the commit to repo storage
260 self.storage
261 .read()
262 .await
263 .apply_commit(commit.commit_data.clone(), None)
264 .await?;
265 // process blobs
266 self.blob.process_write_blobs(writes).await?;
267 Ok(commit)
268 }
269
270 pub async fn get_sync_event_data(&mut self) -> Result<SyncEvtData> {
271 let current_root = self.storage.read().await.get_root_detailed().await?;
272 let blocks_and_missing = self
273 .storage
274 .read()
275 .await
276 .get_blocks(vec![current_root.cid])
277 .await?;
278 Ok(SyncEvtData {
279 cid: current_root.cid,
280 rev: current_root.rev,
281 blocks: blocks_and_missing.blocks,
282 })
283 }
284
285 pub async fn format_commit(
286 &mut self,
287 writes: Vec<PreparedWrite>,
288 swap_commit: Option<Cid>,
289 ) -> Result<CommitDataWithOps> {
290 let current_root = {
291 let storage_guard = self.storage.read().await;
292 storage_guard.get_root_detailed().await
293 };
294 if let Ok(current_root) = current_root {
295 if let Some(swap_commit) = swap_commit {
296 if !current_root.cid.eq(&swap_commit) {
297 return Err(
298 FormatCommitError::BadCommitSwap(current_root.cid.to_string()).into(),
299 );
300 }
301 }
302 {
303 self.storage
304 .write()
305 .await
306 .cache_rev(current_root.rev)
307 .await?;
308 }
309 let mut new_record_cids: Vec<Cid> = vec![];
310 let mut delete_and_update_uris = vec![];
311 let mut commit_ops = vec![];
312 for write in &writes {
313 let commit_action: CommitAction = write.action().into();
314 match write.clone() {
315 PreparedWrite::Create(c) => new_record_cids.push(c.cid),
316 PreparedWrite::Update(u) => {
317 new_record_cids.push(u.cid);
318 let u_at_uri: AtUri = u.uri.try_into()?;
319 delete_and_update_uris.push(u_at_uri);
320 }
321 PreparedWrite::Delete(d) => {
322 let d_at_uri: AtUri = d.uri.try_into()?;
323 delete_and_update_uris.push(d_at_uri)
324 }
325 }
326 if write.swap_cid().is_none() {
327 continue;
328 }
329 let write_at_uri: &AtUri = &write.uri().try_into()?;
330 let record = self
331 .record
332 .get_record(write_at_uri, None, Some(true))
333 .await?;
334 let current_record = match record {
335 Some(record) => Some(Cid::from_str(&record.cid)?),
336 None => None,
337 };
338 let cid = match &write {
339 &PreparedWrite::Delete(_) => None,
340 &PreparedWrite::Create(w) | &PreparedWrite::Update(w) => Some(w.cid),
341 };
342 let mut op = CommitOp {
343 action: commit_action,
344 path: format_data_key(write_at_uri.get_collection(), write_at_uri.get_rkey()),
345 cid,
346 prev: None,
347 };
348 if current_record.is_some() {
349 op.prev = current_record;
350 };
351 commit_ops.push(op);
352 match write {
353 // There should be no current record for a create
354 PreparedWrite::Create(_) if write.swap_cid().is_some() => {
355 Err::<(), anyhow::Error>(
356 FormatCommitError::BadRecordSwap(format!("{:?}", current_record))
357 .into(),
358 )
359 }
360 // There should be a current record for an update
361 PreparedWrite::Update(_) if write.swap_cid().is_none() => {
362 Err::<(), anyhow::Error>(
363 FormatCommitError::BadRecordSwap(format!("{:?}", current_record))
364 .into(),
365 )
366 }
367 // There should be a current record for a delete
368 PreparedWrite::Delete(_) if write.swap_cid().is_none() => {
369 Err::<(), anyhow::Error>(
370 FormatCommitError::BadRecordSwap(format!("{:?}", current_record))
371 .into(),
372 )
373 }
374 _ => Ok::<(), anyhow::Error>(()),
375 }?;
376 match (current_record, write.swap_cid()) {
377 (Some(current_record), Some(swap_cid)) if current_record.eq(swap_cid) => {
378 Ok::<(), anyhow::Error>(())
379 }
380 _ => Err::<(), anyhow::Error>(
381 FormatCommitError::RecordSwapMismatch(format!("{:?}", current_record))
382 .into(),
383 ),
384 }?;
385 }
386 let mut repo = Repo::load(self.storage.clone(), Some(current_root.cid)).await?;
387 let previous_data = repo.commit.data;
388 let write_ops: Vec<RecordWriteOp> = writes
389 .into_iter()
390 .map(write_to_op)
391 .collect::<Result<Vec<RecordWriteOp>>>()?;
392 // @TODO: Use repo signing key global config
393 let secp = Secp256k1::new();
394 let repo_private_key = env::var("PDS_REPO_SIGNING_KEY_K256_PRIVATE_KEY_HEX")
395 .expect("PDS_REPO_SIGNING_KEY_K256_PRIVATE_KEY_HEX not set");
396 let repo_secret_key = SecretKey::from_slice(
397 &hex::decode(repo_private_key.as_bytes()).expect("Failed to decode hex"),
398 )
399 .expect("Failed to create secret key from hex");
400 let repo_signing_key = Keypair::from_secret_key(&secp, &repo_secret_key);
401
402 let mut commit = repo
403 .format_commit(RecordWriteEnum::List(write_ops), repo_signing_key)
404 .await?;
405
406 // find blocks that would be deleted but are referenced by another record
407 let duplicate_record_cids = self
408 .get_duplicate_record_cids(commit.removed_cids.to_list(), delete_and_update_uris)
409 .await?;
410 for cid in duplicate_record_cids {
411 commit.removed_cids.delete(cid)
412 }
413
414 // find blocks that are relevant to ops but not included in diff
415 // (for instance a record that was moved but cid stayed the same)
416 let new_record_blocks = commit.relevant_blocks.get_many(new_record_cids)?;
417 if !new_record_blocks.missing.is_empty() {
418 let missing_blocks = {
419 let storage_guard = self.storage.read().await;
420 storage_guard.get_blocks(new_record_blocks.missing).await?
421 };
422 commit.relevant_blocks.add_map(missing_blocks.blocks)?;
423 }
424 let commit_with_data_ops = CommitDataWithOps {
425 ops: commit_ops,
426 commit_data: commit,
427 prev_data: Some(previous_data),
428 };
429 Ok(commit_with_data_ops)
430 } else {
431 Err(FormatCommitError::MissingRepoRoot(self.did.clone()).into())
432 }
433 }
434
435 pub async fn index_writes(&self, writes: Vec<PreparedWrite>, rev: &str) -> Result<()> {
436 let now: &str = &rsky_common::now();
437
438 drop(
439 stream::iter(writes)
440 .then(async move |write| {
441 match write {
442 PreparedWrite::Create(write) => {
443 let write_at_uri: AtUri = write.uri.try_into()?;
444 self.record
445 .index_record(
446 write_at_uri.clone(),
447 write.cid,
448 Some(write.record),
449 Some(write.action),
450 rev.to_owned(),
451 Some(now.to_owned()),
452 )
453 .await?;
454 }
455 PreparedWrite::Update(write) => {
456 let write_at_uri: AtUri = write.uri.try_into()?;
457 self.record
458 .index_record(
459 write_at_uri.clone(),
460 write.cid,
461 Some(write.record),
462 Some(write.action),
463 rev.to_owned(),
464 Some(now.to_owned()),
465 )
466 .await?;
467 }
468 PreparedWrite::Delete(write) => {
469 let write_at_uri: AtUri = write.uri.try_into()?;
470 self.record.delete_record(&write_at_uri).await?;
471 }
472 }
473 Ok::<(), anyhow::Error>(())
474 })
475 .collect::<Vec<_>>()
476 .await
477 .into_iter()
478 .collect::<Result<Vec<_>, _>>()?,
479 );
480 Ok(())
481 }
482
483 pub async fn destroy(&mut self) -> Result<()> {
484 let did: String = self.did.clone();
485 use crate::schema::actor_store::blob::dsl as BlobSchema;
486
487 let blob_rows: Vec<String> = self
488 .storage
489 .read()
490 .await
491 .db
492 .interact(move |conn| {
493 BlobSchema::blob
494 .filter(BlobSchema::did.eq(did))
495 .select(BlobSchema::cid)
496 .get_results(conn)
497 })
498 .await
499 .expect("Failed to get blob rows")?;
500 let cids = blob_rows
501 .into_iter()
502 .map(|row| Ok(Cid::from_str(&row)?))
503 .collect::<Result<Vec<Cid>>>()?;
504 drop(
505 stream::iter(cids.chunks(500))
506 .then(|chunk| async { self.blob.blobstore.delete_many(chunk.to_vec()).await })
507 .collect::<Vec<_>>()
508 .await
509 .into_iter()
510 .collect::<Result<Vec<_>, _>>()?,
511 );
512 Ok(())
513 }
514
515 pub async fn get_duplicate_record_cids(
516 &self,
517 cids: Vec<Cid>,
518 touched_uris: Vec<AtUri>,
519 ) -> Result<Vec<Cid>> {
520 if touched_uris.is_empty() || cids.is_empty() {
521 return Ok(vec![]);
522 }
523 let did: String = self.did.clone();
524 use crate::schema::actor_store::record::dsl as RecordSchema;
525
526 let cid_strs: Vec<String> = cids.into_iter().map(|c| c.to_string()).collect();
527 let touched_uri_strs: Vec<String> = touched_uris.iter().map(|t| t.to_string()).collect();
528 let res: Vec<String> = self
529 .storage
530 .read()
531 .await
532 .db
533 .interact(move |conn| {
534 RecordSchema::record
535 .filter(RecordSchema::did.eq(did))
536 .filter(RecordSchema::cid.eq_any(cid_strs))
537 .filter(RecordSchema::uri.ne_all(touched_uri_strs))
538 .select(RecordSchema::cid)
539 .get_results(conn)
540 })
541 .await
542 .expect("Failed to get duplicate record cids")?;
543 res.into_iter()
544 .map(|row| Cid::from_str(&row).map_err(anyhow::Error::new))
545 .collect::<Result<Vec<Cid>>>()
546 }
547}