Alternative ATProto PDS implementation
1//! Record storage and retrieval for the actor store.
2//! Based on https://github.com/blacksky-algorithms/rsky/blob/main/rsky-pds/src/actor_store/record/mod.rs
3//! blacksky-algorithms/rsky is licensed under the Apache License 2.0
4//!
5//! Modified for SQLite backend
6
7use crate::models::actor_store::{Backlink, Record, RepoBlock};
8use anyhow::{Result, bail};
9use cidv10::Cid;
10use diesel::result::Error;
11use diesel::*;
12use futures::stream::{self, StreamExt};
13use rsky_lexicon::com::atproto::admin::StatusAttr;
14use rsky_pds::actor_store::record::{GetRecord, RecordsForCollection};
15use rsky_repo::storage::Ipld;
16use rsky_repo::types::{Ids, Lex, RepoRecord, WriteOpAction};
17use rsky_repo::util::cbor_to_lex_record;
18use rsky_syntax::aturi::AtUri;
19use rsky_syntax::aturi_validation::ensure_valid_at_uri;
20use rsky_syntax::did::ensure_valid_did;
21use serde_json::Value as JsonValue;
22use std::env;
23use std::str::FromStr;
24
25// @NOTE in the future this can be replaced with a more generic routine that pulls backlinks based on lex docs.
26// For now, we just want to ensure we're tracking links from follows, blocks, likes, and reposts.
27pub fn get_backlinks(uri: &AtUri, record: &RepoRecord) -> Result<Vec<Backlink>> {
28 if let Some(Lex::Ipld(Ipld::Json(JsonValue::String(record_type)))) = record.get("$type") {
29 if record_type == Ids::AppBskyGraphFollow.as_str()
30 || record_type == Ids::AppBskyGraphBlock.as_str()
31 {
32 if let Some(Lex::Ipld(Ipld::Json(JsonValue::String(subject)))) = record.get("subject") {
33 match ensure_valid_did(uri) {
34 Ok(_) => {
35 return Ok(vec![Backlink {
36 uri: uri.to_string(),
37 path: "subject".to_owned(),
38 link_to: subject.clone(),
39 }]);
40 }
41 Err(e) => bail!("get_backlinks Error: invalid did {}", e),
42 };
43 }
44 } else if record_type == Ids::AppBskyFeedLike.as_str()
45 || record_type == Ids::AppBskyFeedRepost.as_str()
46 {
47 if let Some(Lex::Map(ref_object)) = record.get("subject") {
48 if let Some(Lex::Ipld(Ipld::Json(JsonValue::String(subject_uri)))) =
49 ref_object.get("uri")
50 {
51 match ensure_valid_at_uri(uri) {
52 Ok(_) => {
53 return Ok(vec![Backlink {
54 uri: uri.to_string(),
55 path: "subject.uri".to_owned(),
56 link_to: subject_uri.clone(),
57 }]);
58 }
59 Err(e) => bail!("get_backlinks Error: invalid AtUri {}", e),
60 };
61 }
62 }
63 }
64 }
65 Ok(Vec::new())
66}
67
68/// Combined handler for record operations with both read and write capabilities.
69pub(crate) struct RecordReader {
70 /// Database connection.
71 pub db: deadpool_diesel::Pool<
72 deadpool_diesel::Manager<SqliteConnection>,
73 deadpool_diesel::sqlite::Object,
74 >,
75 /// DID of the actor.
76 pub did: String,
77}
78
79impl RecordReader {
80 /// Create a new record handler.
81 pub(crate) const fn new(
82 did: String,
83 db: deadpool_diesel::Pool<
84 deadpool_diesel::Manager<SqliteConnection>,
85 deadpool_diesel::sqlite::Object,
86 >,
87 ) -> Self {
88 Self { did, db }
89 }
90
91 /// Count the total number of records.
92 pub(crate) async fn record_count(&mut self) -> Result<i64> {
93 use crate::schema::actor_store::record::dsl::*;
94
95 let other_did = self.did.clone();
96 self.db
97 .get()
98 .await?
99 .interact(move |conn| {
100 let res: i64 = record.filter(did.eq(&other_did)).count().get_result(conn)?;
101 Ok(res)
102 })
103 .await
104 .expect("Failed to count records")
105 }
106
107 /// List all collections in the repository.
108 pub(crate) async fn list_collections(&self) -> Result<Vec<String>> {
109 use crate::schema::actor_store::record::dsl::*;
110
111 let other_did = self.did.clone();
112 self.db
113 .get()
114 .await?
115 .interact(move |conn| {
116 let collections = record
117 .filter(did.eq(&other_did))
118 .select(collection)
119 .group_by(collection)
120 .load::<String>(conn)?
121 .into_iter()
122 .collect::<Vec<String>>();
123 Ok(collections)
124 })
125 .await
126 .expect("Failed to list collections")
127 }
128
129 /// List records for a specific collection.
130 pub(crate) async fn list_records_for_collection(
131 &mut self,
132 collection: String,
133 limit: i64,
134 reverse: bool,
135 cursor: Option<String>,
136 rkey_start: Option<String>,
137 rkey_end: Option<String>,
138 include_soft_deleted: Option<bool>,
139 ) -> Result<Vec<RecordsForCollection>> {
140 use crate::schema::actor_store::record::dsl as RecordSchema;
141 use crate::schema::actor_store::repo_block::dsl as RepoBlockSchema;
142
143 let include_soft_deleted: bool = include_soft_deleted.unwrap_or(false);
144 let mut builder = RecordSchema::record
145 .inner_join(RepoBlockSchema::repo_block.on(RepoBlockSchema::cid.eq(RecordSchema::cid)))
146 .limit(limit)
147 .select((Record::as_select(), RepoBlock::as_select()))
148 .filter(RecordSchema::did.eq(self.did.clone()))
149 .filter(RecordSchema::collection.eq(collection))
150 .into_boxed();
151 if !include_soft_deleted {
152 builder = builder.filter(RecordSchema::takedownRef.is_null());
153 }
154 if reverse {
155 builder = builder.order(RecordSchema::rkey.asc());
156 } else {
157 builder = builder.order(RecordSchema::rkey.desc());
158 }
159
160 if let Some(cursor) = cursor {
161 if reverse {
162 builder = builder.filter(RecordSchema::rkey.gt(cursor));
163 } else {
164 builder = builder.filter(RecordSchema::rkey.lt(cursor));
165 }
166 } else {
167 if let Some(rkey_start) = rkey_start {
168 builder = builder.filter(RecordSchema::rkey.gt(rkey_start));
169 }
170 if let Some(rkey_end) = rkey_end {
171 builder = builder.filter(RecordSchema::rkey.lt(rkey_end));
172 }
173 }
174 let res: Vec<(Record, RepoBlock)> = self
175 .db
176 .get()
177 .await?
178 .interact(move |conn| builder.load(conn))
179 .await
180 .expect("Failed to load records")?;
181 res.into_iter()
182 .map(|row| {
183 Ok(RecordsForCollection {
184 uri: row.0.uri,
185 cid: row.0.cid,
186 value: cbor_to_lex_record(row.1.content)?,
187 })
188 })
189 .collect::<Result<Vec<RecordsForCollection>>>()
190 }
191
192 /// Get a specific record by URI.
193 pub(crate) async fn get_record(
194 &mut self,
195 uri: &AtUri,
196 cid: Option<String>,
197 include_soft_deleted: Option<bool>,
198 ) -> Result<Option<GetRecord>> {
199 use crate::schema::actor_store::record::dsl as RecordSchema;
200 use crate::schema::actor_store::repo_block::dsl as RepoBlockSchema;
201
202 let include_soft_deleted: bool = include_soft_deleted.unwrap_or(false);
203 let mut builder = RecordSchema::record
204 .inner_join(RepoBlockSchema::repo_block.on(RepoBlockSchema::cid.eq(RecordSchema::cid)))
205 .select((Record::as_select(), RepoBlock::as_select()))
206 .filter(RecordSchema::uri.eq(uri.to_string()))
207 .into_boxed();
208 if !include_soft_deleted {
209 builder = builder.filter(RecordSchema::takedownRef.is_null());
210 }
211 if let Some(cid) = cid {
212 builder = builder.filter(RecordSchema::cid.eq(cid));
213 }
214 let record: Option<(Record, RepoBlock)> = self
215 .db
216 .get()
217 .await?
218 .interact(move |conn| builder.first(conn).optional())
219 .await
220 .expect("Failed to load record")?;
221 if let Some(record) = record {
222 Ok(Some(GetRecord {
223 uri: record.0.uri,
224 cid: record.0.cid,
225 value: cbor_to_lex_record(record.1.content)?,
226 indexed_at: record.0.indexed_at,
227 takedown_ref: record.0.takedown_ref,
228 }))
229 } else {
230 Ok(None)
231 }
232 }
233
234 /// Check if a record exists.
235 pub(crate) async fn has_record(
236 &mut self,
237 uri: String,
238 cid: Option<String>,
239 include_soft_deleted: Option<bool>,
240 ) -> Result<bool> {
241 use crate::schema::actor_store::record::dsl as RecordSchema;
242
243 let include_soft_deleted: bool = include_soft_deleted.unwrap_or(false);
244 let mut builder = RecordSchema::record
245 .select(RecordSchema::uri)
246 .filter(RecordSchema::uri.eq(uri))
247 .into_boxed();
248 if !include_soft_deleted {
249 builder = builder.filter(RecordSchema::takedownRef.is_null());
250 }
251 if let Some(cid) = cid {
252 builder = builder.filter(RecordSchema::cid.eq(cid));
253 }
254 let record_uri = self
255 .db
256 .get()
257 .await?
258 .interact(move |conn| builder.first::<String>(conn).optional())
259 .await
260 .expect("Failed to check record")?;
261 Ok(record_uri.is_some())
262 }
263
264 /// Get the takedown status of a record.
265 pub(crate) async fn get_record_takedown_status(
266 &self,
267 uri: String,
268 ) -> Result<Option<StatusAttr>> {
269 use crate::schema::actor_store::record::dsl as RecordSchema;
270
271 let res = self
272 .db
273 .get()
274 .await?
275 .interact(move |conn| {
276 RecordSchema::record
277 .select(RecordSchema::takedownRef)
278 .filter(RecordSchema::uri.eq(uri))
279 .first::<Option<String>>(conn)
280 .optional()
281 })
282 .await
283 .expect("Failed to get takedown status")?;
284 res.map_or_else(
285 || Ok(None),
286 |res| {
287 res.map_or_else(
288 || {
289 Ok(Some(StatusAttr {
290 applied: false,
291 r#ref: None,
292 }))
293 },
294 |takedown_ref| {
295 Ok(Some(StatusAttr {
296 applied: true,
297 r#ref: Some(takedown_ref),
298 }))
299 },
300 )
301 },
302 )
303 }
304
305 /// Get the current CID for a record URI.
306 pub(crate) async fn get_current_record_cid(&self, uri: String) -> Result<Option<Cid>> {
307 use crate::schema::actor_store::record::dsl as RecordSchema;
308
309 let res = self
310 .db
311 .get()
312 .await?
313 .interact(move |conn| {
314 RecordSchema::record
315 .select(RecordSchema::cid)
316 .filter(RecordSchema::uri.eq(uri))
317 .first::<String>(conn)
318 .optional()
319 })
320 .await
321 .expect("Failed to get current CID")?;
322 if let Some(res) = res {
323 Ok(Some(Cid::from_str(&res)?))
324 } else {
325 Ok(None)
326 }
327 }
328
329 /// Get backlinks for a record.
330 pub(crate) async fn get_record_backlinks(
331 &self,
332 collection: String,
333 path: String,
334 link_to: String,
335 ) -> Result<Vec<Record>> {
336 use crate::schema::actor_store::backlink::dsl as BacklinkSchema;
337 use crate::schema::actor_store::record::dsl as RecordSchema;
338
339 let res = self
340 .db
341 .get()
342 .await?
343 .interact(move |conn| {
344 RecordSchema::record
345 .inner_join(
346 BacklinkSchema::backlink.on(BacklinkSchema::uri.eq(RecordSchema::uri)),
347 )
348 .select(Record::as_select())
349 .filter(BacklinkSchema::path.eq(path))
350 .filter(BacklinkSchema::linkTo.eq(link_to))
351 .filter(RecordSchema::collection.eq(collection))
352 .load::<Record>(conn)
353 })
354 .await
355 .expect("Failed to get backlinks")?;
356 Ok(res)
357 }
358
359 /// Get backlink conflicts for a record.
360 pub(crate) async fn get_backlink_conflicts(
361 &self,
362 uri: &AtUri,
363 record: &RepoRecord,
364 ) -> Result<Vec<AtUri>> {
365 let record_backlinks = get_backlinks(uri, record)?;
366 let conflicts: Vec<Vec<Record>> = stream::iter(record_backlinks)
367 .then(|backlink| async move {
368 Ok::<Vec<Record>, anyhow::Error>(
369 self.get_record_backlinks(
370 uri.get_collection(),
371 backlink.path,
372 backlink.link_to,
373 )
374 .await?,
375 )
376 })
377 .collect::<Vec<_>>()
378 .await
379 .into_iter()
380 .collect::<Result<Vec<_>, _>>()?;
381 Ok(conflicts
382 .into_iter()
383 .flatten()
384 .filter_map(|record| {
385 AtUri::make(
386 env::var("BLUEPDS_HOST_NAME").unwrap_or("localhost".to_owned()),
387 Some(String::from(uri.get_collection())),
388 Some(record.rkey),
389 )
390 .ok()
391 })
392 .collect::<Vec<AtUri>>())
393 }
394
395 // Transactor methods
396 // -----------------
397
398 /// Index a record in the database.
399 #[tracing::instrument(skip_all)]
400 pub(crate) async fn index_record(
401 &self,
402 uri: AtUri,
403 cid: Cid,
404 record: Option<RepoRecord>,
405 action: Option<WriteOpAction>, // Create or update with a default of create
406 repo_rev: String,
407 timestamp: Option<String>,
408 ) -> Result<()> {
409 tracing::debug!("@LOG DEBUG RecordReader::index_record, indexing record {uri}");
410
411 let collection = uri.get_collection();
412 let rkey = uri.get_rkey();
413 let hostname = uri.get_hostname().to_string();
414 let action = action.unwrap_or(WriteOpAction::Create);
415 let indexed_at = timestamp.unwrap_or_else(rsky_common::now);
416 let row = Record {
417 did: self.did.clone(),
418 uri: uri.to_string(),
419 cid: cid.to_string(),
420 collection: collection.clone(),
421 rkey: rkey.to_string(),
422 repo_rev: Some(repo_rev.clone()),
423 indexed_at: indexed_at.clone(),
424 takedown_ref: None,
425 };
426
427 if !hostname.starts_with("did:") {
428 bail!("Expected indexed URI to contain DID")
429 } else if collection.is_empty() {
430 bail!("Expected indexed URI to contain a collection")
431 } else if rkey.is_empty() {
432 bail!("Expected indexed URI to contain a record key")
433 }
434
435 use crate::schema::actor_store::record::dsl as RecordSchema;
436
437 // Track current version of record
438 let (record, uri) = self
439 .db
440 .get()
441 .await?
442 .interact(move |conn| {
443 _ = insert_into(RecordSchema::record)
444 .values(row)
445 .on_conflict(RecordSchema::uri)
446 .do_update()
447 .set((
448 RecordSchema::cid.eq(cid.to_string()),
449 RecordSchema::repoRev.eq(&repo_rev),
450 RecordSchema::indexedAt.eq(&indexed_at),
451 ))
452 .execute(conn)?;
453 Ok::<_, Error>((record, uri))
454 })
455 .await
456 .expect("Failed to index record")?;
457
458 if let Some(record) = record {
459 // Maintain backlinks
460 let backlinks = get_backlinks(&uri, &record)?;
461 if action == WriteOpAction::Update {
462 // On update just recreate backlinks from scratch for the record, so we can clear out
463 // the old ones. E.g. for weird cases like updating a follow to be for a different did.
464 self.remove_backlinks_by_uri(&uri).await?;
465 }
466 self.add_backlinks(backlinks).await?;
467 }
468 tracing::debug!("@LOG DEBUG RecordReader::index_record, indexed record {uri}");
469 Ok(())
470 }
471
472 /// Delete a record from the database.
473 #[tracing::instrument(skip_all)]
474 pub(crate) async fn delete_record(&self, uri: &AtUri) -> Result<()> {
475 tracing::debug!("@LOG DEBUG RecordReader::delete_record, deleting indexed record {uri}");
476 use crate::schema::actor_store::backlink::dsl as BacklinkSchema;
477 use crate::schema::actor_store::record::dsl as RecordSchema;
478 let uri = uri.to_string();
479 self.db
480 .get()
481 .await?
482 .interact(move |conn| {
483 _ = delete(RecordSchema::record)
484 .filter(RecordSchema::uri.eq(&uri))
485 .execute(conn)?;
486 _ = delete(BacklinkSchema::backlink)
487 .filter(BacklinkSchema::uri.eq(&uri))
488 .execute(conn)?;
489 tracing::debug!(
490 "@LOG DEBUG RecordReader::delete_record, deleted indexed record {uri}"
491 );
492 Ok(())
493 })
494 .await
495 .expect("Failed to delete record")
496 }
497
498 /// Remove backlinks for a URI.
499 pub(crate) async fn remove_backlinks_by_uri(&self, uri: &AtUri) -> Result<()> {
500 use crate::schema::actor_store::backlink::dsl as BacklinkSchema;
501 let uri = uri.to_string();
502 self.db
503 .get()
504 .await?
505 .interact(move |conn| {
506 _ = delete(BacklinkSchema::backlink)
507 .filter(BacklinkSchema::uri.eq(uri))
508 .execute(conn)?;
509 Ok(())
510 })
511 .await
512 .expect("Failed to remove backlinks")
513 }
514
515 /// Add backlinks to the database.
516 pub(crate) async fn add_backlinks(&self, backlinks: Vec<Backlink>) -> Result<()> {
517 if backlinks.is_empty() {
518 Ok(())
519 } else {
520 use crate::schema::actor_store::backlink::dsl as BacklinkSchema;
521 self.db
522 .get()
523 .await?
524 .interact(move |conn| {
525 _ = insert_or_ignore_into(BacklinkSchema::backlink)
526 .values(&backlinks)
527 .execute(conn)?;
528 Ok(())
529 })
530 .await
531 .expect("Failed to add backlinks")
532 }
533 }
534
535 /// Update the takedown status of a record.
536 pub(crate) async fn update_record_takedown_status(
537 &self,
538 uri: &AtUri,
539 takedown: StatusAttr,
540 ) -> Result<()> {
541 use crate::schema::actor_store::record::dsl as RecordSchema;
542
543 let takedown_ref: Option<String> = match takedown.applied {
544 true => takedown
545 .r#ref
546 .map_or_else(|| Some(rsky_common::now()), Some),
547 false => None,
548 };
549 let uri_string = uri.to_string();
550
551 self.db
552 .get()
553 .await?
554 .interact(move |conn| {
555 _ = update(RecordSchema::record)
556 .filter(RecordSchema::uri.eq(uri_string))
557 .set(RecordSchema::takedownRef.eq(takedown_ref))
558 .execute(conn)?;
559 Ok(())
560 })
561 .await
562 .expect("Failed to update takedown status")
563 }
564}