-486
src/actor_store/actor_store.rs
-486
src/actor_store/actor_store.rs
···
1
-
//! Based on https://github.com/blacksky-algorithms/rsky/blob/main/rsky-pds/src/actor_store/mod.rs
2
-
//! Which is based on https://github.com/bluesky-social/atproto/blob/main/packages/repo/src/repo.ts
3
-
//! and also adds components from https://github.com/bluesky-social/atproto/blob/main/packages/pds/src/actor-store/repo/transactor.ts
4
-
//! blacksky-algorithms/rsky is licensed under the Apache License 2.0
5
-
//!
6
-
//! Modified for SQLite backend
7
-
8
-
use anyhow::Result;
9
-
use cidv10::Cid;
10
-
use diesel::*;
11
-
use futures::stream::{self, StreamExt};
12
-
use rsky_pds::actor_store::repo::types::SyncEvtData;
13
-
use rsky_repo::repo::Repo;
14
-
use rsky_repo::storage::readable_blockstore::ReadableBlockstore;
15
-
use rsky_repo::storage::types::RepoStorage;
16
-
use rsky_repo::types::{
17
-
CommitAction, CommitData, CommitDataWithOps, CommitOp, PreparedCreateOrUpdate, PreparedWrite,
18
-
RecordCreateOrUpdateOp, RecordWriteEnum, RecordWriteOp, WriteOpAction, write_to_op,
19
-
};
20
-
use rsky_repo::util::format_data_key;
21
-
use rsky_syntax::aturi::AtUri;
22
-
use secp256k1::{Keypair, Secp256k1, SecretKey};
23
-
use std::str::FromStr;
24
-
use std::sync::Arc;
25
-
use std::{env, fmt};
26
-
use tokio::sync::RwLock;
27
-
28
-
use crate::db::DbConn;
29
-
30
-
use super::blob::BlobReader;
31
-
use super::preference::PreferenceReader;
32
-
use super::record::RecordReader;
33
-
use super::sql_blob::BlobStoreSql;
34
-
use super::sql_repo::SqlRepoReader;
35
-
36
-
#[derive(Debug)]
37
-
enum FormatCommitError {
38
-
BadRecordSwap(String),
39
-
RecordSwapMismatch(String),
40
-
BadCommitSwap(String),
41
-
MissingRepoRoot(String),
42
-
}
43
-
44
-
impl fmt::Display for FormatCommitError {
45
-
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
46
-
match self {
47
-
Self::BadRecordSwap(record) => write!(f, "BadRecordSwapError: `{:?}`", record),
48
-
Self::RecordSwapMismatch(record) => {
49
-
write!(f, "BadRecordSwapError: current record is `{:?}`", record)
50
-
}
51
-
Self::BadCommitSwap(cid) => write!(f, "BadCommitSwapError: {}", cid),
52
-
Self::MissingRepoRoot(did) => write!(f, "No repo root found for `{}`", did),
53
-
}
54
-
}
55
-
}
56
-
57
-
impl std::error::Error for FormatCommitError {}
58
-
59
-
pub struct ActorStore {
60
-
pub did: String,
61
-
pub storage: Arc<RwLock<SqlRepoReader>>, // get ipld blocks from db
62
-
pub record: RecordReader, // get lexicon records from db
63
-
pub blob: BlobReader, // get blobs
64
-
pub pref: PreferenceReader, // get preferences
65
-
}
66
-
67
-
// Combination of RepoReader/Transactor, BlobReader/Transactor, SqlRepoReader/Transactor
68
-
impl ActorStore {
69
-
/// Concrete reader of an individual repo (hence BlobStoreSql which takes `did` param)
70
-
pub fn new(did: String, blobstore: BlobStoreSql, db: DbConn) -> Self {
71
-
let db = Arc::new(db);
72
-
ActorStore {
73
-
storage: Arc::new(RwLock::new(SqlRepoReader::new(
74
-
did.clone(),
75
-
None,
76
-
db.clone(),
77
-
))),
78
-
record: RecordReader::new(did.clone(), db.clone()),
79
-
pref: PreferenceReader::new(did.clone(), db.clone()),
80
-
did,
81
-
blob: BlobReader::new(blobstore, db.clone()), // Unlike TS impl, just use blob reader vs generator
82
-
}
83
-
}
84
-
85
-
pub async fn get_repo_root(&self) -> Option<Cid> {
86
-
let storage_guard = self.storage.read().await;
87
-
storage_guard.get_root().await
88
-
}
89
-
90
-
// Transactors
91
-
// -------------------
92
-
93
-
#[deprecated]
94
-
pub async fn create_repo_legacy(
95
-
&self,
96
-
keypair: Keypair,
97
-
writes: Vec<PreparedCreateOrUpdate>,
98
-
) -> Result<CommitData> {
99
-
let write_ops = writes
100
-
.clone()
101
-
.into_iter()
102
-
.map(|prepare| {
103
-
let at_uri: AtUri = prepare.uri.try_into()?;
104
-
Ok(RecordCreateOrUpdateOp {
105
-
action: WriteOpAction::Create,
106
-
collection: at_uri.get_collection(),
107
-
rkey: at_uri.get_rkey(),
108
-
record: prepare.record,
109
-
})
110
-
})
111
-
.collect::<Result<Vec<RecordCreateOrUpdateOp>>>()?;
112
-
let commit = Repo::format_init_commit(
113
-
self.storage.clone(),
114
-
self.did.clone(),
115
-
keypair,
116
-
Some(write_ops),
117
-
)
118
-
.await?;
119
-
let storage_guard = self.storage.read().await;
120
-
storage_guard.apply_commit(commit.clone(), None).await?;
121
-
let writes = writes
122
-
.into_iter()
123
-
.map(PreparedWrite::Create)
124
-
.collect::<Vec<PreparedWrite>>();
125
-
self.blob.process_write_blobs(writes).await?;
126
-
Ok(commit)
127
-
}
128
-
129
-
pub async fn create_repo(
130
-
&self,
131
-
keypair: Keypair,
132
-
writes: Vec<PreparedCreateOrUpdate>,
133
-
) -> Result<CommitDataWithOps> {
134
-
let write_ops = writes
135
-
.clone()
136
-
.into_iter()
137
-
.map(|prepare| {
138
-
let at_uri: AtUri = prepare.uri.try_into()?;
139
-
Ok(RecordCreateOrUpdateOp {
140
-
action: WriteOpAction::Create,
141
-
collection: at_uri.get_collection(),
142
-
rkey: at_uri.get_rkey(),
143
-
record: prepare.record,
144
-
})
145
-
})
146
-
.collect::<Result<Vec<RecordCreateOrUpdateOp>>>()?;
147
-
let commit = Repo::format_init_commit(
148
-
self.storage.clone(),
149
-
self.did.clone(),
150
-
keypair,
151
-
Some(write_ops),
152
-
)
153
-
.await?;
154
-
let storage_guard = self.storage.read().await;
155
-
storage_guard.apply_commit(commit.clone(), None).await?;
156
-
let write_commit_ops = writes.iter().try_fold(
157
-
Vec::with_capacity(writes.len()),
158
-
|mut acc, w| -> Result<Vec<CommitOp>> {
159
-
let aturi: AtUri = w.uri.clone().try_into()?;
160
-
acc.push(CommitOp {
161
-
action: CommitAction::Create,
162
-
path: format_data_key(aturi.get_collection(), aturi.get_rkey()),
163
-
cid: Some(w.cid.clone()),
164
-
prev: None,
165
-
});
166
-
Ok(acc)
167
-
},
168
-
)?;
169
-
let writes = writes
170
-
.into_iter()
171
-
.map(PreparedWrite::Create)
172
-
.collect::<Vec<PreparedWrite>>();
173
-
self.blob.process_write_blobs(writes).await?;
174
-
Ok(CommitDataWithOps {
175
-
commit_data: commit,
176
-
ops: write_commit_ops,
177
-
prev_data: None,
178
-
})
179
-
}
180
-
181
-
pub async fn process_import_repo(
182
-
&mut self,
183
-
commit: CommitData,
184
-
writes: Vec<PreparedWrite>,
185
-
) -> Result<()> {
186
-
{
187
-
let immutable_borrow = &self;
188
-
// & send to indexing
189
-
immutable_borrow
190
-
.index_writes(writes.clone(), &commit.rev)
191
-
.await?;
192
-
}
193
-
// persist the commit to repo storage
194
-
let storage_guard = self.storage.read().await;
195
-
storage_guard.apply_commit(commit.clone(), None).await?;
196
-
// process blobs
197
-
self.blob.process_write_blobs(writes).await?;
198
-
Ok(())
199
-
}
200
-
201
-
pub async fn process_writes(
202
-
&mut self,
203
-
writes: Vec<PreparedWrite>,
204
-
swap_commit_cid: Option<Cid>,
205
-
) -> Result<CommitDataWithOps> {
206
-
// NOTE: In the typescript PR on sync v1.1
207
-
// there are some safeguards added for adding
208
-
// very large commits and very many commits
209
-
// for which I'm sure we could safeguard on
210
-
// but may not be necessary.
211
-
// https://github.com/bluesky-social/atproto/pull/3585/files#diff-7627844a4a6b50190014e947d1331a96df3c64d4c5273fa0ce544f85c3c1265f
212
-
let commit = self.format_commit(writes.clone(), swap_commit_cid).await?;
213
-
{
214
-
let immutable_borrow = &self;
215
-
// & send to indexing
216
-
immutable_borrow
217
-
.index_writes(writes.clone(), &commit.commit_data.rev)
218
-
.await?;
219
-
}
220
-
// persist the commit to repo storage
221
-
let storage_guard = self.storage.read().await;
222
-
storage_guard
223
-
.apply_commit(commit.commit_data.clone(), None)
224
-
.await?;
225
-
// process blobs
226
-
self.blob.process_write_blobs(writes).await?;
227
-
Ok(commit)
228
-
}
229
-
230
-
pub async fn get_sync_event_data(&mut self) -> Result<SyncEvtData> {
231
-
let storage_guard = self.storage.read().await;
232
-
let current_root = storage_guard.get_root_detailed().await?;
233
-
let blocks_and_missing = storage_guard.get_blocks(vec![current_root.cid]).await?;
234
-
Ok(SyncEvtData {
235
-
cid: current_root.cid,
236
-
rev: current_root.rev,
237
-
blocks: blocks_and_missing.blocks,
238
-
})
239
-
}
240
-
241
-
pub async fn format_commit(
242
-
&mut self,
243
-
writes: Vec<PreparedWrite>,
244
-
swap_commit: Option<Cid>,
245
-
) -> Result<CommitDataWithOps> {
246
-
let current_root = {
247
-
let storage_guard = self.storage.read().await;
248
-
storage_guard.get_root_detailed().await
249
-
};
250
-
if let Ok(current_root) = current_root {
251
-
if let Some(swap_commit) = swap_commit {
252
-
if !current_root.cid.eq(&swap_commit) {
253
-
return Err(
254
-
FormatCommitError::BadCommitSwap(current_root.cid.to_string()).into(),
255
-
);
256
-
}
257
-
}
258
-
{
259
-
let mut storage_guard = self.storage.write().await;
260
-
storage_guard.cache_rev(current_root.rev).await?;
261
-
}
262
-
let mut new_record_cids: Vec<Cid> = vec![];
263
-
let mut delete_and_update_uris = vec![];
264
-
let mut commit_ops = vec![];
265
-
for write in &writes {
266
-
let commit_action: CommitAction = write.action().into();
267
-
match write.clone() {
268
-
PreparedWrite::Create(c) => new_record_cids.push(c.cid),
269
-
PreparedWrite::Update(u) => {
270
-
new_record_cids.push(u.cid);
271
-
let u_at_uri: AtUri = u.uri.try_into()?;
272
-
delete_and_update_uris.push(u_at_uri);
273
-
}
274
-
PreparedWrite::Delete(d) => {
275
-
let d_at_uri: AtUri = d.uri.try_into()?;
276
-
delete_and_update_uris.push(d_at_uri)
277
-
}
278
-
}
279
-
if write.swap_cid().is_none() {
280
-
continue;
281
-
}
282
-
let write_at_uri: &AtUri = &write.uri().try_into()?;
283
-
let record = self
284
-
.record
285
-
.get_record(write_at_uri, None, Some(true))
286
-
.await?;
287
-
let current_record = match record {
288
-
Some(record) => Some(Cid::from_str(&record.cid)?),
289
-
None => None,
290
-
};
291
-
let cid = match &write {
292
-
&PreparedWrite::Delete(_) => None,
293
-
&PreparedWrite::Create(w) | &PreparedWrite::Update(w) => Some(w.cid),
294
-
};
295
-
let mut op = CommitOp {
296
-
action: commit_action,
297
-
path: format_data_key(write_at_uri.get_collection(), write_at_uri.get_rkey()),
298
-
cid,
299
-
prev: None,
300
-
};
301
-
if let Some(_) = current_record {
302
-
op.prev = current_record;
303
-
};
304
-
commit_ops.push(op);
305
-
match write {
306
-
// There should be no current record for a create
307
-
PreparedWrite::Create(_) if write.swap_cid().is_some() => {
308
-
Err::<(), anyhow::Error>(
309
-
FormatCommitError::BadRecordSwap(format!("{:?}", current_record))
310
-
.into(),
311
-
)
312
-
}
313
-
// There should be a current record for an update
314
-
PreparedWrite::Update(_) if write.swap_cid().is_none() => {
315
-
Err::<(), anyhow::Error>(
316
-
FormatCommitError::BadRecordSwap(format!("{:?}", current_record))
317
-
.into(),
318
-
)
319
-
}
320
-
// There should be a current record for a delete
321
-
PreparedWrite::Delete(_) if write.swap_cid().is_none() => {
322
-
Err::<(), anyhow::Error>(
323
-
FormatCommitError::BadRecordSwap(format!("{:?}", current_record))
324
-
.into(),
325
-
)
326
-
}
327
-
_ => Ok::<(), anyhow::Error>(()),
328
-
}?;
329
-
match (current_record, write.swap_cid()) {
330
-
(Some(current_record), Some(swap_cid)) if current_record.eq(swap_cid) => {
331
-
Ok::<(), anyhow::Error>(())
332
-
}
333
-
_ => Err::<(), anyhow::Error>(
334
-
FormatCommitError::RecordSwapMismatch(format!("{:?}", current_record))
335
-
.into(),
336
-
),
337
-
}?;
338
-
}
339
-
let mut repo = Repo::load(self.storage.clone(), Some(current_root.cid)).await?;
340
-
let previous_data = repo.commit.data;
341
-
let write_ops: Vec<RecordWriteOp> = writes
342
-
.into_iter()
343
-
.map(write_to_op)
344
-
.collect::<Result<Vec<RecordWriteOp>>>()?;
345
-
// @TODO: Use repo signing key global config
346
-
let secp = Secp256k1::new();
347
-
let repo_private_key = env::var("PDS_REPO_SIGNING_KEY_K256_PRIVATE_KEY_HEX").unwrap();
348
-
let repo_secret_key =
349
-
SecretKey::from_slice(&hex::decode(repo_private_key.as_bytes()).unwrap()).unwrap();
350
-
let repo_signing_key = Keypair::from_secret_key(&secp, &repo_secret_key);
351
-
352
-
let mut commit = repo
353
-
.format_commit(RecordWriteEnum::List(write_ops), repo_signing_key)
354
-
.await?;
355
-
356
-
// find blocks that would be deleted but are referenced by another record
357
-
let duplicate_record_cids = self
358
-
.get_duplicate_record_cids(commit.removed_cids.to_list(), delete_and_update_uris)
359
-
.await?;
360
-
for cid in duplicate_record_cids {
361
-
commit.removed_cids.delete(cid)
362
-
}
363
-
364
-
// find blocks that are relevant to ops but not included in diff
365
-
// (for instance a record that was moved but cid stayed the same)
366
-
let new_record_blocks = commit.relevant_blocks.get_many(new_record_cids)?;
367
-
if !new_record_blocks.missing.is_empty() {
368
-
let missing_blocks = {
369
-
let storage_guard = self.storage.read().await;
370
-
storage_guard.get_blocks(new_record_blocks.missing).await?
371
-
};
372
-
commit.relevant_blocks.add_map(missing_blocks.blocks)?;
373
-
}
374
-
let commit_with_data_ops = CommitDataWithOps {
375
-
ops: commit_ops,
376
-
commit_data: commit,
377
-
prev_data: Some(previous_data),
378
-
};
379
-
Ok(commit_with_data_ops)
380
-
} else {
381
-
Err(FormatCommitError::MissingRepoRoot(self.did.clone()).into())
382
-
}
383
-
}
384
-
385
-
pub async fn index_writes(&self, writes: Vec<PreparedWrite>, rev: &str) -> Result<()> {
386
-
let now: &str = &rsky_common::now();
387
-
388
-
let _ = stream::iter(writes)
389
-
.then(|write| async move {
390
-
Ok::<(), anyhow::Error>(match write {
391
-
PreparedWrite::Create(write) => {
392
-
let write_at_uri: AtUri = write.uri.try_into()?;
393
-
self.record
394
-
.index_record(
395
-
write_at_uri.clone(),
396
-
write.cid,
397
-
Some(write.record),
398
-
Some(write.action),
399
-
rev.to_owned(),
400
-
Some(now.to_string()),
401
-
)
402
-
.await?
403
-
}
404
-
PreparedWrite::Update(write) => {
405
-
let write_at_uri: AtUri = write.uri.try_into()?;
406
-
self.record
407
-
.index_record(
408
-
write_at_uri.clone(),
409
-
write.cid,
410
-
Some(write.record),
411
-
Some(write.action),
412
-
rev.to_owned(),
413
-
Some(now.to_string()),
414
-
)
415
-
.await?
416
-
}
417
-
PreparedWrite::Delete(write) => {
418
-
let write_at_uri: AtUri = write.uri.try_into()?;
419
-
self.record.delete_record(&write_at_uri).await?
420
-
}
421
-
})
422
-
})
423
-
.collect::<Vec<_>>()
424
-
.await
425
-
.into_iter()
426
-
.collect::<Result<Vec<_>, _>>()?;
427
-
Ok(())
428
-
}
429
-
430
-
pub async fn destroy(&mut self) -> Result<()> {
431
-
let did: String = self.did.clone();
432
-
let storage_guard = self.storage.read().await;
433
-
let db: Arc<DbConn> = storage_guard.db.clone();
434
-
use rsky_pds::schema::pds::blob::dsl as BlobSchema;
435
-
436
-
let blob_rows: Vec<String> = db
437
-
.run(move |conn| {
438
-
BlobSchema::blob
439
-
.filter(BlobSchema::did.eq(did))
440
-
.select(BlobSchema::cid)
441
-
.get_results(conn)
442
-
})
443
-
.await?;
444
-
let cids = blob_rows
445
-
.into_iter()
446
-
.map(|row| Ok(Cid::from_str(&row)?))
447
-
.collect::<Result<Vec<Cid>>>()?;
448
-
let _ = stream::iter(cids.chunks(500))
449
-
.then(|chunk| async { self.blob.blobstore.delete_many(chunk.to_vec()).await })
450
-
.collect::<Vec<_>>()
451
-
.await
452
-
.into_iter()
453
-
.collect::<Result<Vec<_>, _>>()?;
454
-
Ok(())
455
-
}
456
-
457
-
pub async fn get_duplicate_record_cids(
458
-
&self,
459
-
cids: Vec<Cid>,
460
-
touched_uris: Vec<AtUri>,
461
-
) -> Result<Vec<Cid>> {
462
-
if touched_uris.is_empty() || cids.is_empty() {
463
-
return Ok(vec![]);
464
-
}
465
-
let did: String = self.did.clone();
466
-
let storage_guard = self.storage.read().await;
467
-
let db: Arc<DbConn> = storage_guard.db.clone();
468
-
use rsky_pds::schema::pds::record::dsl as RecordSchema;
469
-
470
-
let cid_strs: Vec<String> = cids.into_iter().map(|c| c.to_string()).collect();
471
-
let touched_uri_strs: Vec<String> = touched_uris.iter().map(|t| t.to_string()).collect();
472
-
let res: Vec<String> = db
473
-
.run(move |conn| {
474
-
RecordSchema::record
475
-
.filter(RecordSchema::did.eq(did))
476
-
.filter(RecordSchema::cid.eq_any(cid_strs))
477
-
.filter(RecordSchema::uri.ne_all(touched_uri_strs))
478
-
.select(RecordSchema::cid)
479
-
.get_results(conn)
480
-
})
481
-
.await?;
482
-
res.into_iter()
483
-
.map(|row| Cid::from_str(&row).map_err(|error| anyhow::Error::new(error)))
484
-
.collect::<Result<Vec<Cid>>>()
485
-
}
486
-
}
+6
-2
src/actor_store/blob.rs
+6
-2
src/actor_store/blob.rs
···
1
-
use std::sync::Arc;
1
+
//! Blob operations for the actor store
2
+
//! Based on https://github.com/blacksky-algorithms/rsky/blob/main/rsky-pds/src/actor_store/blob/mod.rs
3
+
//! blacksky-algorithms/rsky is licensed under the Apache License 2.0
4
+
//!
5
+
//! Modified for SQLite backend
2
6
3
7
use anyhow::{Result, bail};
4
8
use cidv10::Cid;
···
9
13
stream::{self, StreamExt},
10
14
try_join,
11
15
};
12
-
// use rocket::data::{Data, ToByteUnit};
13
16
use rsky_common::ipld::sha256_raw_to_cid;
14
17
use rsky_common::now;
15
18
use rsky_lexicon::blob_refs::BlobRef;
···
23
26
use rsky_pds::models::models;
24
27
use rsky_repo::error::BlobError;
25
28
use rsky_repo::types::{PreparedBlobRef, PreparedWrite};
29
+
use std::sync::Arc;
26
30
27
31
use super::sql_blob::{BlobStoreSql, ByteStream};
28
32
use crate::db::DbConn;
+486
-1
src/actor_store/mod.rs
+486
-1
src/actor_store/mod.rs
···
1
1
//! Actor store implementation for ATProto PDS.
2
+
//! Based on https://github.com/blacksky-algorithms/rsky/blob/main/rsky-pds/src/actor_store/mod.rs
3
+
//! Which is based on https://github.com/bluesky-social/atproto/blob/main/packages/repo/src/repo.ts
4
+
//! and also adds components from https://github.com/bluesky-social/atproto/blob/main/packages/pds/src/actor-store/repo/transactor.ts
5
+
//! blacksky-algorithms/rsky is licensed under the Apache License 2.0
6
+
//!
7
+
//! Modified for SQLite backend
2
8
3
-
mod actor_store;
4
9
mod blob;
5
10
mod preference;
6
11
mod record;
7
12
mod sql_blob;
8
13
mod sql_repo;
14
+
15
+
use anyhow::Result;
16
+
use cidv10::Cid;
17
+
use diesel::*;
18
+
use futures::stream::{self, StreamExt};
19
+
use rsky_pds::actor_store::repo::types::SyncEvtData;
20
+
use rsky_repo::repo::Repo;
21
+
use rsky_repo::storage::readable_blockstore::ReadableBlockstore;
22
+
use rsky_repo::storage::types::RepoStorage;
23
+
use rsky_repo::types::{
24
+
CommitAction, CommitData, CommitDataWithOps, CommitOp, PreparedCreateOrUpdate, PreparedWrite,
25
+
RecordCreateOrUpdateOp, RecordWriteEnum, RecordWriteOp, WriteOpAction, write_to_op,
26
+
};
27
+
use rsky_repo::util::format_data_key;
28
+
use rsky_syntax::aturi::AtUri;
29
+
use secp256k1::{Keypair, Secp256k1, SecretKey};
30
+
use std::str::FromStr;
31
+
use std::sync::Arc;
32
+
use std::{env, fmt};
33
+
use tokio::sync::RwLock;
34
+
35
+
use crate::db::DbConn;
36
+
37
+
use blob::BlobReader;
38
+
use preference::PreferenceReader;
39
+
use record::RecordReader;
40
+
use sql_blob::BlobStoreSql;
41
+
use sql_repo::SqlRepoReader;
42
+
43
+
#[derive(Debug)]
44
+
enum FormatCommitError {
45
+
BadRecordSwap(String),
46
+
RecordSwapMismatch(String),
47
+
BadCommitSwap(String),
48
+
MissingRepoRoot(String),
49
+
}
50
+
51
+
impl fmt::Display for FormatCommitError {
52
+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53
+
match self {
54
+
Self::BadRecordSwap(record) => write!(f, "BadRecordSwapError: `{:?}`", record),
55
+
Self::RecordSwapMismatch(record) => {
56
+
write!(f, "BadRecordSwapError: current record is `{:?}`", record)
57
+
}
58
+
Self::BadCommitSwap(cid) => write!(f, "BadCommitSwapError: {}", cid),
59
+
Self::MissingRepoRoot(did) => write!(f, "No repo root found for `{}`", did),
60
+
}
61
+
}
62
+
}
63
+
64
+
impl std::error::Error for FormatCommitError {}
65
+
66
+
pub struct ActorStore {
67
+
pub did: String,
68
+
pub storage: Arc<RwLock<SqlRepoReader>>, // get ipld blocks from db
69
+
pub record: RecordReader, // get lexicon records from db
70
+
pub blob: BlobReader, // get blobs
71
+
pub pref: PreferenceReader, // get preferences
72
+
}
73
+
74
+
// Combination of RepoReader/Transactor, BlobReader/Transactor, SqlRepoReader/Transactor
75
+
impl ActorStore {
76
+
/// Concrete reader of an individual repo (hence BlobStoreSql which takes `did` param)
77
+
pub fn new(did: String, blobstore: BlobStoreSql, db: DbConn) -> Self {
78
+
let db = Arc::new(db);
79
+
ActorStore {
80
+
storage: Arc::new(RwLock::new(SqlRepoReader::new(
81
+
did.clone(),
82
+
None,
83
+
db.clone(),
84
+
))),
85
+
record: RecordReader::new(did.clone(), db.clone()),
86
+
pref: PreferenceReader::new(did.clone(), db.clone()),
87
+
did,
88
+
blob: BlobReader::new(blobstore, db.clone()), // Unlike TS impl, just use blob reader vs generator
89
+
}
90
+
}
91
+
92
+
pub async fn get_repo_root(&self) -> Option<Cid> {
93
+
let storage_guard = self.storage.read().await;
94
+
storage_guard.get_root().await
95
+
}
96
+
97
+
// Transactors
98
+
// -------------------
99
+
100
+
#[deprecated]
101
+
pub async fn create_repo_legacy(
102
+
&self,
103
+
keypair: Keypair,
104
+
writes: Vec<PreparedCreateOrUpdate>,
105
+
) -> Result<CommitData> {
106
+
let write_ops = writes
107
+
.clone()
108
+
.into_iter()
109
+
.map(|prepare| {
110
+
let at_uri: AtUri = prepare.uri.try_into()?;
111
+
Ok(RecordCreateOrUpdateOp {
112
+
action: WriteOpAction::Create,
113
+
collection: at_uri.get_collection(),
114
+
rkey: at_uri.get_rkey(),
115
+
record: prepare.record,
116
+
})
117
+
})
118
+
.collect::<Result<Vec<RecordCreateOrUpdateOp>>>()?;
119
+
let commit = Repo::format_init_commit(
120
+
self.storage.clone(),
121
+
self.did.clone(),
122
+
keypair,
123
+
Some(write_ops),
124
+
)
125
+
.await?;
126
+
let storage_guard = self.storage.read().await;
127
+
storage_guard.apply_commit(commit.clone(), None).await?;
128
+
let writes = writes
129
+
.into_iter()
130
+
.map(PreparedWrite::Create)
131
+
.collect::<Vec<PreparedWrite>>();
132
+
self.blob.process_write_blobs(writes).await?;
133
+
Ok(commit)
134
+
}
135
+
136
+
pub async fn create_repo(
137
+
&self,
138
+
keypair: Keypair,
139
+
writes: Vec<PreparedCreateOrUpdate>,
140
+
) -> Result<CommitDataWithOps> {
141
+
let write_ops = writes
142
+
.clone()
143
+
.into_iter()
144
+
.map(|prepare| {
145
+
let at_uri: AtUri = prepare.uri.try_into()?;
146
+
Ok(RecordCreateOrUpdateOp {
147
+
action: WriteOpAction::Create,
148
+
collection: at_uri.get_collection(),
149
+
rkey: at_uri.get_rkey(),
150
+
record: prepare.record,
151
+
})
152
+
})
153
+
.collect::<Result<Vec<RecordCreateOrUpdateOp>>>()?;
154
+
let commit = Repo::format_init_commit(
155
+
self.storage.clone(),
156
+
self.did.clone(),
157
+
keypair,
158
+
Some(write_ops),
159
+
)
160
+
.await?;
161
+
let storage_guard = self.storage.read().await;
162
+
storage_guard.apply_commit(commit.clone(), None).await?;
163
+
let write_commit_ops = writes.iter().try_fold(
164
+
Vec::with_capacity(writes.len()),
165
+
|mut acc, w| -> Result<Vec<CommitOp>> {
166
+
let aturi: AtUri = w.uri.clone().try_into()?;
167
+
acc.push(CommitOp {
168
+
action: CommitAction::Create,
169
+
path: format_data_key(aturi.get_collection(), aturi.get_rkey()),
170
+
cid: Some(w.cid.clone()),
171
+
prev: None,
172
+
});
173
+
Ok(acc)
174
+
},
175
+
)?;
176
+
let writes = writes
177
+
.into_iter()
178
+
.map(PreparedWrite::Create)
179
+
.collect::<Vec<PreparedWrite>>();
180
+
self.blob.process_write_blobs(writes).await?;
181
+
Ok(CommitDataWithOps {
182
+
commit_data: commit,
183
+
ops: write_commit_ops,
184
+
prev_data: None,
185
+
})
186
+
}
187
+
188
+
pub async fn process_import_repo(
189
+
&mut self,
190
+
commit: CommitData,
191
+
writes: Vec<PreparedWrite>,
192
+
) -> Result<()> {
193
+
{
194
+
let immutable_borrow = &self;
195
+
// & send to indexing
196
+
immutable_borrow
197
+
.index_writes(writes.clone(), &commit.rev)
198
+
.await?;
199
+
}
200
+
// persist the commit to repo storage
201
+
let storage_guard = self.storage.read().await;
202
+
storage_guard.apply_commit(commit.clone(), None).await?;
203
+
// process blobs
204
+
self.blob.process_write_blobs(writes).await?;
205
+
Ok(())
206
+
}
207
+
208
+
pub async fn process_writes(
209
+
&mut self,
210
+
writes: Vec<PreparedWrite>,
211
+
swap_commit_cid: Option<Cid>,
212
+
) -> Result<CommitDataWithOps> {
213
+
// NOTE: In the typescript PR on sync v1.1
214
+
// there are some safeguards added for adding
215
+
// very large commits and very many commits
216
+
// for which I'm sure we could safeguard on
217
+
// but may not be necessary.
218
+
// https://github.com/bluesky-social/atproto/pull/3585/files#diff-7627844a4a6b50190014e947d1331a96df3c64d4c5273fa0ce544f85c3c1265f
219
+
let commit = self.format_commit(writes.clone(), swap_commit_cid).await?;
220
+
{
221
+
let immutable_borrow = &self;
222
+
// & send to indexing
223
+
immutable_borrow
224
+
.index_writes(writes.clone(), &commit.commit_data.rev)
225
+
.await?;
226
+
}
227
+
// persist the commit to repo storage
228
+
let storage_guard = self.storage.read().await;
229
+
storage_guard
230
+
.apply_commit(commit.commit_data.clone(), None)
231
+
.await?;
232
+
// process blobs
233
+
self.blob.process_write_blobs(writes).await?;
234
+
Ok(commit)
235
+
}
236
+
237
+
pub async fn get_sync_event_data(&mut self) -> Result<SyncEvtData> {
238
+
let storage_guard = self.storage.read().await;
239
+
let current_root = storage_guard.get_root_detailed().await?;
240
+
let blocks_and_missing = storage_guard.get_blocks(vec![current_root.cid]).await?;
241
+
Ok(SyncEvtData {
242
+
cid: current_root.cid,
243
+
rev: current_root.rev,
244
+
blocks: blocks_and_missing.blocks,
245
+
})
246
+
}
247
+
248
+
pub async fn format_commit(
249
+
&mut self,
250
+
writes: Vec<PreparedWrite>,
251
+
swap_commit: Option<Cid>,
252
+
) -> Result<CommitDataWithOps> {
253
+
let current_root = {
254
+
let storage_guard = self.storage.read().await;
255
+
storage_guard.get_root_detailed().await
256
+
};
257
+
if let Ok(current_root) = current_root {
258
+
if let Some(swap_commit) = swap_commit {
259
+
if !current_root.cid.eq(&swap_commit) {
260
+
return Err(
261
+
FormatCommitError::BadCommitSwap(current_root.cid.to_string()).into(),
262
+
);
263
+
}
264
+
}
265
+
{
266
+
let mut storage_guard = self.storage.write().await;
267
+
storage_guard.cache_rev(current_root.rev).await?;
268
+
}
269
+
let mut new_record_cids: Vec<Cid> = vec![];
270
+
let mut delete_and_update_uris = vec![];
271
+
let mut commit_ops = vec![];
272
+
for write in &writes {
273
+
let commit_action: CommitAction = write.action().into();
274
+
match write.clone() {
275
+
PreparedWrite::Create(c) => new_record_cids.push(c.cid),
276
+
PreparedWrite::Update(u) => {
277
+
new_record_cids.push(u.cid);
278
+
let u_at_uri: AtUri = u.uri.try_into()?;
279
+
delete_and_update_uris.push(u_at_uri);
280
+
}
281
+
PreparedWrite::Delete(d) => {
282
+
let d_at_uri: AtUri = d.uri.try_into()?;
283
+
delete_and_update_uris.push(d_at_uri)
284
+
}
285
+
}
286
+
if write.swap_cid().is_none() {
287
+
continue;
288
+
}
289
+
let write_at_uri: &AtUri = &write.uri().try_into()?;
290
+
let record = self
291
+
.record
292
+
.get_record(write_at_uri, None, Some(true))
293
+
.await?;
294
+
let current_record = match record {
295
+
Some(record) => Some(Cid::from_str(&record.cid)?),
296
+
None => None,
297
+
};
298
+
let cid = match &write {
299
+
&PreparedWrite::Delete(_) => None,
300
+
&PreparedWrite::Create(w) | &PreparedWrite::Update(w) => Some(w.cid),
301
+
};
302
+
let mut op = CommitOp {
303
+
action: commit_action,
304
+
path: format_data_key(write_at_uri.get_collection(), write_at_uri.get_rkey()),
305
+
cid,
306
+
prev: None,
307
+
};
308
+
if let Some(_) = current_record {
309
+
op.prev = current_record;
310
+
};
311
+
commit_ops.push(op);
312
+
match write {
313
+
// There should be no current record for a create
314
+
PreparedWrite::Create(_) if write.swap_cid().is_some() => {
315
+
Err::<(), anyhow::Error>(
316
+
FormatCommitError::BadRecordSwap(format!("{:?}", current_record))
317
+
.into(),
318
+
)
319
+
}
320
+
// There should be a current record for an update
321
+
PreparedWrite::Update(_) if write.swap_cid().is_none() => {
322
+
Err::<(), anyhow::Error>(
323
+
FormatCommitError::BadRecordSwap(format!("{:?}", current_record))
324
+
.into(),
325
+
)
326
+
}
327
+
// There should be a current record for a delete
328
+
PreparedWrite::Delete(_) if write.swap_cid().is_none() => {
329
+
Err::<(), anyhow::Error>(
330
+
FormatCommitError::BadRecordSwap(format!("{:?}", current_record))
331
+
.into(),
332
+
)
333
+
}
334
+
_ => Ok::<(), anyhow::Error>(()),
335
+
}?;
336
+
match (current_record, write.swap_cid()) {
337
+
(Some(current_record), Some(swap_cid)) if current_record.eq(swap_cid) => {
338
+
Ok::<(), anyhow::Error>(())
339
+
}
340
+
_ => Err::<(), anyhow::Error>(
341
+
FormatCommitError::RecordSwapMismatch(format!("{:?}", current_record))
342
+
.into(),
343
+
),
344
+
}?;
345
+
}
346
+
let mut repo = Repo::load(self.storage.clone(), Some(current_root.cid)).await?;
347
+
let previous_data = repo.commit.data;
348
+
let write_ops: Vec<RecordWriteOp> = writes
349
+
.into_iter()
350
+
.map(write_to_op)
351
+
.collect::<Result<Vec<RecordWriteOp>>>()?;
352
+
// @TODO: Use repo signing key global config
353
+
let secp = Secp256k1::new();
354
+
let repo_private_key = env::var("PDS_REPO_SIGNING_KEY_K256_PRIVATE_KEY_HEX").unwrap();
355
+
let repo_secret_key =
356
+
SecretKey::from_slice(&hex::decode(repo_private_key.as_bytes()).unwrap()).unwrap();
357
+
let repo_signing_key = Keypair::from_secret_key(&secp, &repo_secret_key);
358
+
359
+
let mut commit = repo
360
+
.format_commit(RecordWriteEnum::List(write_ops), repo_signing_key)
361
+
.await?;
362
+
363
+
// find blocks that would be deleted but are referenced by another record
364
+
let duplicate_record_cids = self
365
+
.get_duplicate_record_cids(commit.removed_cids.to_list(), delete_and_update_uris)
366
+
.await?;
367
+
for cid in duplicate_record_cids {
368
+
commit.removed_cids.delete(cid)
369
+
}
370
+
371
+
// find blocks that are relevant to ops but not included in diff
372
+
// (for instance a record that was moved but cid stayed the same)
373
+
let new_record_blocks = commit.relevant_blocks.get_many(new_record_cids)?;
374
+
if !new_record_blocks.missing.is_empty() {
375
+
let missing_blocks = {
376
+
let storage_guard = self.storage.read().await;
377
+
storage_guard.get_blocks(new_record_blocks.missing).await?
378
+
};
379
+
commit.relevant_blocks.add_map(missing_blocks.blocks)?;
380
+
}
381
+
let commit_with_data_ops = CommitDataWithOps {
382
+
ops: commit_ops,
383
+
commit_data: commit,
384
+
prev_data: Some(previous_data),
385
+
};
386
+
Ok(commit_with_data_ops)
387
+
} else {
388
+
Err(FormatCommitError::MissingRepoRoot(self.did.clone()).into())
389
+
}
390
+
}
391
+
392
+
pub async fn index_writes(&self, writes: Vec<PreparedWrite>, rev: &str) -> Result<()> {
393
+
let now: &str = &rsky_common::now();
394
+
395
+
let _ = stream::iter(writes)
396
+
.then(|write| async move {
397
+
Ok::<(), anyhow::Error>(match write {
398
+
PreparedWrite::Create(write) => {
399
+
let write_at_uri: AtUri = write.uri.try_into()?;
400
+
self.record
401
+
.index_record(
402
+
write_at_uri.clone(),
403
+
write.cid,
404
+
Some(write.record),
405
+
Some(write.action),
406
+
rev.to_owned(),
407
+
Some(now.to_string()),
408
+
)
409
+
.await?
410
+
}
411
+
PreparedWrite::Update(write) => {
412
+
let write_at_uri: AtUri = write.uri.try_into()?;
413
+
self.record
414
+
.index_record(
415
+
write_at_uri.clone(),
416
+
write.cid,
417
+
Some(write.record),
418
+
Some(write.action),
419
+
rev.to_owned(),
420
+
Some(now.to_string()),
421
+
)
422
+
.await?
423
+
}
424
+
PreparedWrite::Delete(write) => {
425
+
let write_at_uri: AtUri = write.uri.try_into()?;
426
+
self.record.delete_record(&write_at_uri).await?
427
+
}
428
+
})
429
+
})
430
+
.collect::<Vec<_>>()
431
+
.await
432
+
.into_iter()
433
+
.collect::<Result<Vec<_>, _>>()?;
434
+
Ok(())
435
+
}
436
+
437
+
pub async fn destroy(&mut self) -> Result<()> {
438
+
let did: String = self.did.clone();
439
+
let storage_guard = self.storage.read().await;
440
+
let db: Arc<DbConn> = storage_guard.db.clone();
441
+
use rsky_pds::schema::pds::blob::dsl as BlobSchema;
442
+
443
+
let blob_rows: Vec<String> = db
444
+
.run(move |conn| {
445
+
BlobSchema::blob
446
+
.filter(BlobSchema::did.eq(did))
447
+
.select(BlobSchema::cid)
448
+
.get_results(conn)
449
+
})
450
+
.await?;
451
+
let cids = blob_rows
452
+
.into_iter()
453
+
.map(|row| Ok(Cid::from_str(&row)?))
454
+
.collect::<Result<Vec<Cid>>>()?;
455
+
let _ = stream::iter(cids.chunks(500))
456
+
.then(|chunk| async { self.blob.blobstore.delete_many(chunk.to_vec()).await })
457
+
.collect::<Vec<_>>()
458
+
.await
459
+
.into_iter()
460
+
.collect::<Result<Vec<_>, _>>()?;
461
+
Ok(())
462
+
}
463
+
464
+
pub async fn get_duplicate_record_cids(
465
+
&self,
466
+
cids: Vec<Cid>,
467
+
touched_uris: Vec<AtUri>,
468
+
) -> Result<Vec<Cid>> {
469
+
if touched_uris.is_empty() || cids.is_empty() {
470
+
return Ok(vec![]);
471
+
}
472
+
let did: String = self.did.clone();
473
+
let storage_guard = self.storage.read().await;
474
+
let db: Arc<DbConn> = storage_guard.db.clone();
475
+
use rsky_pds::schema::pds::record::dsl as RecordSchema;
476
+
477
+
let cid_strs: Vec<String> = cids.into_iter().map(|c| c.to_string()).collect();
478
+
let touched_uri_strs: Vec<String> = touched_uris.iter().map(|t| t.to_string()).collect();
479
+
let res: Vec<String> = db
480
+
.run(move |conn| {
481
+
RecordSchema::record
482
+
.filter(RecordSchema::did.eq(did))
483
+
.filter(RecordSchema::cid.eq_any(cid_strs))
484
+
.filter(RecordSchema::uri.ne_all(touched_uri_strs))
485
+
.select(RecordSchema::cid)
486
+
.get_results(conn)
487
+
})
488
+
.await?;
489
+
res.into_iter()
490
+
.map(|row| Cid::from_str(&row).map_err(|error| anyhow::Error::new(error)))
491
+
.collect::<Result<Vec<Cid>>>()
492
+
}
493
+
}