Alternative ATProto PDS implementation
at oauth 20 kB view raw
1//! Record storage and retrieval for the actor store. 2//! Based on https://github.com/blacksky-algorithms/rsky/blob/main/rsky-pds/src/actor_store/record/mod.rs 3//! blacksky-algorithms/rsky is licensed under the Apache License 2.0 4//! 5//! Modified for SQLite backend 6 7use crate::models::actor_store::{Backlink, Record, RepoBlock}; 8use anyhow::{Result, bail}; 9use cidv10::Cid; 10use diesel::result::Error; 11use diesel::*; 12use futures::stream::{self, StreamExt}; 13use rsky_lexicon::com::atproto::admin::StatusAttr; 14use rsky_pds::actor_store::record::{GetRecord, RecordsForCollection}; 15use rsky_repo::storage::Ipld; 16use rsky_repo::types::{Ids, Lex, RepoRecord, WriteOpAction}; 17use rsky_repo::util::cbor_to_lex_record; 18use rsky_syntax::aturi::AtUri; 19use rsky_syntax::aturi_validation::ensure_valid_at_uri; 20use rsky_syntax::did::ensure_valid_did; 21use serde_json::Value as JsonValue; 22use std::env; 23use std::str::FromStr; 24 25// @NOTE in the future this can be replaced with a more generic routine that pulls backlinks based on lex docs. 26// For now, we just want to ensure we're tracking links from follows, blocks, likes, and reposts. 27pub fn get_backlinks(uri: &AtUri, record: &RepoRecord) -> Result<Vec<Backlink>> { 28 if let Some(Lex::Ipld(Ipld::Json(JsonValue::String(record_type)))) = record.get("$type") { 29 if record_type == Ids::AppBskyGraphFollow.as_str() 30 || record_type == Ids::AppBskyGraphBlock.as_str() 31 { 32 if let Some(Lex::Ipld(Ipld::Json(JsonValue::String(subject)))) = record.get("subject") { 33 match ensure_valid_did(uri) { 34 Ok(_) => { 35 return Ok(vec![Backlink { 36 uri: uri.to_string(), 37 path: "subject".to_owned(), 38 link_to: subject.clone(), 39 }]); 40 } 41 Err(e) => bail!("get_backlinks Error: invalid did {}", e), 42 }; 43 } 44 } else if record_type == Ids::AppBskyFeedLike.as_str() 45 || record_type == Ids::AppBskyFeedRepost.as_str() 46 { 47 if let Some(Lex::Map(ref_object)) = record.get("subject") { 48 if let Some(Lex::Ipld(Ipld::Json(JsonValue::String(subject_uri)))) = 49 ref_object.get("uri") 50 { 51 match ensure_valid_at_uri(uri) { 52 Ok(_) => { 53 return Ok(vec![Backlink { 54 uri: uri.to_string(), 55 path: "subject.uri".to_owned(), 56 link_to: subject_uri.clone(), 57 }]); 58 } 59 Err(e) => bail!("get_backlinks Error: invalid AtUri {}", e), 60 }; 61 } 62 } 63 } 64 } 65 Ok(Vec::new()) 66} 67 68/// Combined handler for record operations with both read and write capabilities. 69pub(crate) struct RecordReader { 70 /// Database connection. 71 pub db: deadpool_diesel::Pool< 72 deadpool_diesel::Manager<SqliteConnection>, 73 deadpool_diesel::sqlite::Object, 74 >, 75 /// DID of the actor. 76 pub did: String, 77} 78 79impl RecordReader { 80 /// Create a new record handler. 81 pub(crate) const fn new( 82 did: String, 83 db: deadpool_diesel::Pool< 84 deadpool_diesel::Manager<SqliteConnection>, 85 deadpool_diesel::sqlite::Object, 86 >, 87 ) -> Self { 88 Self { did, db } 89 } 90 91 /// Count the total number of records. 92 pub(crate) async fn record_count(&mut self) -> Result<i64> { 93 use crate::schema::actor_store::record::dsl::*; 94 95 let other_did = self.did.clone(); 96 self.db 97 .get() 98 .await? 99 .interact(move |conn| { 100 let res: i64 = record.filter(did.eq(&other_did)).count().get_result(conn)?; 101 Ok(res) 102 }) 103 .await 104 .expect("Failed to count records") 105 } 106 107 /// List all collections in the repository. 108 pub(crate) async fn list_collections(&self) -> Result<Vec<String>> { 109 use crate::schema::actor_store::record::dsl::*; 110 111 let other_did = self.did.clone(); 112 self.db 113 .get() 114 .await? 115 .interact(move |conn| { 116 let collections = record 117 .filter(did.eq(&other_did)) 118 .select(collection) 119 .group_by(collection) 120 .load::<String>(conn)? 121 .into_iter() 122 .collect::<Vec<String>>(); 123 Ok(collections) 124 }) 125 .await 126 .expect("Failed to list collections") 127 } 128 129 /// List records for a specific collection. 130 pub(crate) async fn list_records_for_collection( 131 &mut self, 132 collection: String, 133 limit: i64, 134 reverse: bool, 135 cursor: Option<String>, 136 rkey_start: Option<String>, 137 rkey_end: Option<String>, 138 include_soft_deleted: Option<bool>, 139 ) -> Result<Vec<RecordsForCollection>> { 140 use crate::schema::actor_store::record::dsl as RecordSchema; 141 use crate::schema::actor_store::repo_block::dsl as RepoBlockSchema; 142 143 let include_soft_deleted: bool = include_soft_deleted.unwrap_or(false); 144 let mut builder = RecordSchema::record 145 .inner_join(RepoBlockSchema::repo_block.on(RepoBlockSchema::cid.eq(RecordSchema::cid))) 146 .limit(limit) 147 .select((Record::as_select(), RepoBlock::as_select())) 148 .filter(RecordSchema::did.eq(self.did.clone())) 149 .filter(RecordSchema::collection.eq(collection)) 150 .into_boxed(); 151 if !include_soft_deleted { 152 builder = builder.filter(RecordSchema::takedownRef.is_null()); 153 } 154 if reverse { 155 builder = builder.order(RecordSchema::rkey.asc()); 156 } else { 157 builder = builder.order(RecordSchema::rkey.desc()); 158 } 159 160 if let Some(cursor) = cursor { 161 if reverse { 162 builder = builder.filter(RecordSchema::rkey.gt(cursor)); 163 } else { 164 builder = builder.filter(RecordSchema::rkey.lt(cursor)); 165 } 166 } else { 167 if let Some(rkey_start) = rkey_start { 168 builder = builder.filter(RecordSchema::rkey.gt(rkey_start)); 169 } 170 if let Some(rkey_end) = rkey_end { 171 builder = builder.filter(RecordSchema::rkey.lt(rkey_end)); 172 } 173 } 174 let res: Vec<(Record, RepoBlock)> = self 175 .db 176 .get() 177 .await? 178 .interact(move |conn| builder.load(conn)) 179 .await 180 .expect("Failed to load records")?; 181 res.into_iter() 182 .map(|row| { 183 Ok(RecordsForCollection { 184 uri: row.0.uri, 185 cid: row.0.cid, 186 value: cbor_to_lex_record(row.1.content)?, 187 }) 188 }) 189 .collect::<Result<Vec<RecordsForCollection>>>() 190 } 191 192 /// Get a specific record by URI. 193 pub(crate) async fn get_record( 194 &mut self, 195 uri: &AtUri, 196 cid: Option<String>, 197 include_soft_deleted: Option<bool>, 198 ) -> Result<Option<GetRecord>> { 199 use crate::schema::actor_store::record::dsl as RecordSchema; 200 use crate::schema::actor_store::repo_block::dsl as RepoBlockSchema; 201 202 let include_soft_deleted: bool = include_soft_deleted.unwrap_or(false); 203 let mut builder = RecordSchema::record 204 .inner_join(RepoBlockSchema::repo_block.on(RepoBlockSchema::cid.eq(RecordSchema::cid))) 205 .select((Record::as_select(), RepoBlock::as_select())) 206 .filter(RecordSchema::uri.eq(uri.to_string())) 207 .into_boxed(); 208 if !include_soft_deleted { 209 builder = builder.filter(RecordSchema::takedownRef.is_null()); 210 } 211 if let Some(cid) = cid { 212 builder = builder.filter(RecordSchema::cid.eq(cid)); 213 } 214 let record: Option<(Record, RepoBlock)> = self 215 .db 216 .get() 217 .await? 218 .interact(move |conn| builder.first(conn).optional()) 219 .await 220 .expect("Failed to load record")?; 221 if let Some(record) = record { 222 Ok(Some(GetRecord { 223 uri: record.0.uri, 224 cid: record.0.cid, 225 value: cbor_to_lex_record(record.1.content)?, 226 indexed_at: record.0.indexed_at, 227 takedown_ref: record.0.takedown_ref, 228 })) 229 } else { 230 Ok(None) 231 } 232 } 233 234 /// Check if a record exists. 235 pub(crate) async fn has_record( 236 &mut self, 237 uri: String, 238 cid: Option<String>, 239 include_soft_deleted: Option<bool>, 240 ) -> Result<bool> { 241 use crate::schema::actor_store::record::dsl as RecordSchema; 242 243 let include_soft_deleted: bool = include_soft_deleted.unwrap_or(false); 244 let mut builder = RecordSchema::record 245 .select(RecordSchema::uri) 246 .filter(RecordSchema::uri.eq(uri)) 247 .into_boxed(); 248 if !include_soft_deleted { 249 builder = builder.filter(RecordSchema::takedownRef.is_null()); 250 } 251 if let Some(cid) = cid { 252 builder = builder.filter(RecordSchema::cid.eq(cid)); 253 } 254 let record_uri = self 255 .db 256 .get() 257 .await? 258 .interact(move |conn| builder.first::<String>(conn).optional()) 259 .await 260 .expect("Failed to check record")?; 261 Ok(record_uri.is_some()) 262 } 263 264 /// Get the takedown status of a record. 265 pub(crate) async fn get_record_takedown_status( 266 &self, 267 uri: String, 268 ) -> Result<Option<StatusAttr>> { 269 use crate::schema::actor_store::record::dsl as RecordSchema; 270 271 let res = self 272 .db 273 .get() 274 .await? 275 .interact(move |conn| { 276 RecordSchema::record 277 .select(RecordSchema::takedownRef) 278 .filter(RecordSchema::uri.eq(uri)) 279 .first::<Option<String>>(conn) 280 .optional() 281 }) 282 .await 283 .expect("Failed to get takedown status")?; 284 res.map_or_else( 285 || Ok(None), 286 |res| { 287 res.map_or_else( 288 || { 289 Ok(Some(StatusAttr { 290 applied: false, 291 r#ref: None, 292 })) 293 }, 294 |takedown_ref| { 295 Ok(Some(StatusAttr { 296 applied: true, 297 r#ref: Some(takedown_ref), 298 })) 299 }, 300 ) 301 }, 302 ) 303 } 304 305 /// Get the current CID for a record URI. 306 pub(crate) async fn get_current_record_cid(&self, uri: String) -> Result<Option<Cid>> { 307 use crate::schema::actor_store::record::dsl as RecordSchema; 308 309 let res = self 310 .db 311 .get() 312 .await? 313 .interact(move |conn| { 314 RecordSchema::record 315 .select(RecordSchema::cid) 316 .filter(RecordSchema::uri.eq(uri)) 317 .first::<String>(conn) 318 .optional() 319 }) 320 .await 321 .expect("Failed to get current CID")?; 322 if let Some(res) = res { 323 Ok(Some(Cid::from_str(&res)?)) 324 } else { 325 Ok(None) 326 } 327 } 328 329 /// Get backlinks for a record. 330 pub(crate) async fn get_record_backlinks( 331 &self, 332 collection: String, 333 path: String, 334 link_to: String, 335 ) -> Result<Vec<Record>> { 336 use crate::schema::actor_store::backlink::dsl as BacklinkSchema; 337 use crate::schema::actor_store::record::dsl as RecordSchema; 338 339 let res = self 340 .db 341 .get() 342 .await? 343 .interact(move |conn| { 344 RecordSchema::record 345 .inner_join( 346 BacklinkSchema::backlink.on(BacklinkSchema::uri.eq(RecordSchema::uri)), 347 ) 348 .select(Record::as_select()) 349 .filter(BacklinkSchema::path.eq(path)) 350 .filter(BacklinkSchema::linkTo.eq(link_to)) 351 .filter(RecordSchema::collection.eq(collection)) 352 .load::<Record>(conn) 353 }) 354 .await 355 .expect("Failed to get backlinks")?; 356 Ok(res) 357 } 358 359 /// Get backlink conflicts for a record. 360 pub(crate) async fn get_backlink_conflicts( 361 &self, 362 uri: &AtUri, 363 record: &RepoRecord, 364 ) -> Result<Vec<AtUri>> { 365 let record_backlinks = get_backlinks(uri, record)?; 366 let conflicts: Vec<Vec<Record>> = stream::iter(record_backlinks) 367 .then(|backlink| async move { 368 Ok::<Vec<Record>, anyhow::Error>( 369 self.get_record_backlinks( 370 uri.get_collection(), 371 backlink.path, 372 backlink.link_to, 373 ) 374 .await?, 375 ) 376 }) 377 .collect::<Vec<_>>() 378 .await 379 .into_iter() 380 .collect::<Result<Vec<_>, _>>()?; 381 Ok(conflicts 382 .into_iter() 383 .flatten() 384 .filter_map(|record| { 385 AtUri::make( 386 env::var("BLUEPDS_HOST_NAME").unwrap_or("localhost".to_owned()), 387 Some(String::from(uri.get_collection())), 388 Some(record.rkey), 389 ) 390 .ok() 391 }) 392 .collect::<Vec<AtUri>>()) 393 } 394 395 // Transactor methods 396 // ----------------- 397 398 /// Index a record in the database. 399 #[tracing::instrument(skip_all)] 400 pub(crate) async fn index_record( 401 &self, 402 uri: AtUri, 403 cid: Cid, 404 record: Option<RepoRecord>, 405 action: Option<WriteOpAction>, // Create or update with a default of create 406 repo_rev: String, 407 timestamp: Option<String>, 408 ) -> Result<()> { 409 tracing::debug!("@LOG DEBUG RecordReader::index_record, indexing record {uri}"); 410 411 let collection = uri.get_collection(); 412 let rkey = uri.get_rkey(); 413 let hostname = uri.get_hostname().to_string(); 414 let action = action.unwrap_or(WriteOpAction::Create); 415 let indexed_at = timestamp.unwrap_or_else(rsky_common::now); 416 let row = Record { 417 did: self.did.clone(), 418 uri: uri.to_string(), 419 cid: cid.to_string(), 420 collection: collection.clone(), 421 rkey: rkey.to_string(), 422 repo_rev: Some(repo_rev.clone()), 423 indexed_at: indexed_at.clone(), 424 takedown_ref: None, 425 }; 426 427 if !hostname.starts_with("did:") { 428 bail!("Expected indexed URI to contain DID") 429 } else if collection.is_empty() { 430 bail!("Expected indexed URI to contain a collection") 431 } else if rkey.is_empty() { 432 bail!("Expected indexed URI to contain a record key") 433 } 434 435 use crate::schema::actor_store::record::dsl as RecordSchema; 436 437 // Track current version of record 438 let (record, uri) = self 439 .db 440 .get() 441 .await? 442 .interact(move |conn| { 443 _ = insert_into(RecordSchema::record) 444 .values(row) 445 .on_conflict(RecordSchema::uri) 446 .do_update() 447 .set(( 448 RecordSchema::cid.eq(cid.to_string()), 449 RecordSchema::repoRev.eq(&repo_rev), 450 RecordSchema::indexedAt.eq(&indexed_at), 451 )) 452 .execute(conn)?; 453 Ok::<_, Error>((record, uri)) 454 }) 455 .await 456 .expect("Failed to index record")?; 457 458 if let Some(record) = record { 459 // Maintain backlinks 460 let backlinks = get_backlinks(&uri, &record)?; 461 if action == WriteOpAction::Update { 462 // On update just recreate backlinks from scratch for the record, so we can clear out 463 // the old ones. E.g. for weird cases like updating a follow to be for a different did. 464 self.remove_backlinks_by_uri(&uri).await?; 465 } 466 self.add_backlinks(backlinks).await?; 467 } 468 tracing::debug!("@LOG DEBUG RecordReader::index_record, indexed record {uri}"); 469 Ok(()) 470 } 471 472 /// Delete a record from the database. 473 #[tracing::instrument(skip_all)] 474 pub(crate) async fn delete_record(&self, uri: &AtUri) -> Result<()> { 475 tracing::debug!("@LOG DEBUG RecordReader::delete_record, deleting indexed record {uri}"); 476 use crate::schema::actor_store::backlink::dsl as BacklinkSchema; 477 use crate::schema::actor_store::record::dsl as RecordSchema; 478 let uri = uri.to_string(); 479 self.db 480 .get() 481 .await? 482 .interact(move |conn| { 483 _ = delete(RecordSchema::record) 484 .filter(RecordSchema::uri.eq(&uri)) 485 .execute(conn)?; 486 _ = delete(BacklinkSchema::backlink) 487 .filter(BacklinkSchema::uri.eq(&uri)) 488 .execute(conn)?; 489 tracing::debug!( 490 "@LOG DEBUG RecordReader::delete_record, deleted indexed record {uri}" 491 ); 492 Ok(()) 493 }) 494 .await 495 .expect("Failed to delete record") 496 } 497 498 /// Remove backlinks for a URI. 499 pub(crate) async fn remove_backlinks_by_uri(&self, uri: &AtUri) -> Result<()> { 500 use crate::schema::actor_store::backlink::dsl as BacklinkSchema; 501 let uri = uri.to_string(); 502 self.db 503 .get() 504 .await? 505 .interact(move |conn| { 506 _ = delete(BacklinkSchema::backlink) 507 .filter(BacklinkSchema::uri.eq(uri)) 508 .execute(conn)?; 509 Ok(()) 510 }) 511 .await 512 .expect("Failed to remove backlinks") 513 } 514 515 /// Add backlinks to the database. 516 pub(crate) async fn add_backlinks(&self, backlinks: Vec<Backlink>) -> Result<()> { 517 if backlinks.is_empty() { 518 Ok(()) 519 } else { 520 use crate::schema::actor_store::backlink::dsl as BacklinkSchema; 521 self.db 522 .get() 523 .await? 524 .interact(move |conn| { 525 _ = insert_or_ignore_into(BacklinkSchema::backlink) 526 .values(&backlinks) 527 .execute(conn)?; 528 Ok(()) 529 }) 530 .await 531 .expect("Failed to add backlinks") 532 } 533 } 534 535 /// Update the takedown status of a record. 536 pub(crate) async fn update_record_takedown_status( 537 &self, 538 uri: &AtUri, 539 takedown: StatusAttr, 540 ) -> Result<()> { 541 use crate::schema::actor_store::record::dsl as RecordSchema; 542 543 let takedown_ref: Option<String> = match takedown.applied { 544 true => takedown 545 .r#ref 546 .map_or_else(|| Some(rsky_common::now()), Some), 547 false => None, 548 }; 549 let uri_string = uri.to_string(); 550 551 self.db 552 .get() 553 .await? 554 .interact(move |conn| { 555 _ = update(RecordSchema::record) 556 .filter(RecordSchema::uri.eq(uri_string)) 557 .set(RecordSchema::takedownRef.eq(takedown_ref)) 558 .execute(conn)?; 559 Ok(()) 560 }) 561 .await 562 .expect("Failed to update takedown status") 563 } 564}