// Alternative ATProto PDS implementation
// at oauth — 18 kB — view raw
1//! Based on https://github.com/blacksky-algorithms/rsky/blob/main/rsky-pds/src/actor_store/record/mod.rs 2//! blacksky-algorithms/rsky is licensed under the Apache License 2.0 3//! 4//! Modified for SQLite backend 5 6use crate::models::actor_store as models; 7use crate::models::actor_store::RepoBlock; 8use anyhow::Result; 9use cidv10::Cid; 10use diesel::dsl::sql; 11use diesel::prelude::*; 12use diesel::sql_types::{Bool, Text}; 13use diesel::*; 14use futures::{StreamExt, TryStreamExt, stream}; 15use rsky_repo::block_map::{BlockMap, BlocksAndMissing}; 16use rsky_repo::car::blocks_to_car_file; 17use rsky_repo::cid_set::CidSet; 18use rsky_repo::storage::CidAndRev; 19use rsky_repo::storage::RepoRootError::RepoRootNotFoundError; 20use rsky_repo::storage::readable_blockstore::ReadableBlockstore; 21use rsky_repo::storage::types::RepoStorage; 22use rsky_repo::types::CommitData; 23use std::pin::Pin; 24use std::str::FromStr; 25use std::sync::Arc; 26use tokio::sync::RwLock; 27 28pub struct SqlRepoReader { 29 pub cache: Arc<RwLock<BlockMap>>, 30 pub db: deadpool_diesel::sqlite::Object, 31 pub root: Option<Cid>, 32 pub rev: Option<String>, 33 pub now: String, 34 pub did: String, 35} 36 37impl std::fmt::Debug for SqlRepoReader { 38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 39 f.debug_struct("SqlRepoReader") 40 .field("did", &self.did) 41 .field("root", &self.root) 42 .field("rev", &self.rev) 43 .finish() 44 } 45} 46 47impl ReadableBlockstore for SqlRepoReader { 48 fn get_bytes<'life>( 49 &'life self, 50 cid: &'life Cid, 51 ) -> Pin<Box<dyn Future<Output = Result<Option<Vec<u8>>>> + Send + Sync + 'life>> { 52 let did: String = self.did.clone(); 53 let cid = *cid; 54 55 Box::pin(async move { 56 use crate::schema::actor_store::repo_block::dsl as RepoBlockSchema; 57 let cached = { 58 let cache_guard = self.cache.read().await; 59 cache_guard.get(cid).cloned() 60 }; 61 if let Some(cached_result) = cached { 62 return Ok(Some(cached_result)); 63 } 64 65 let 
found: Option<Vec<u8>> = self 66 .db 67 .interact(move |conn| { 68 RepoBlockSchema::repo_block 69 .filter(RepoBlockSchema::cid.eq(cid.to_string())) 70 .filter(RepoBlockSchema::did.eq(did)) 71 .select(RepoBlockSchema::content) 72 .first(conn) 73 .optional() 74 }) 75 .await 76 .expect("Failed to get block")?; 77 match found { 78 None => Ok(None), 79 Some(result) => { 80 { 81 let mut cache_guard = self.cache.write().await; 82 cache_guard.set(cid, result.clone()); 83 } 84 Ok(Some(result)) 85 } 86 } 87 }) 88 } 89 90 fn has<'life>( 91 &'life self, 92 cid: Cid, 93 ) -> Pin<Box<dyn Future<Output = Result<bool>> + Send + Sync + 'life>> { 94 Box::pin(async move { 95 let got = <Self as ReadableBlockstore>::get_bytes(self, &cid).await?; 96 Ok(got.is_some()) 97 }) 98 } 99 100 fn get_blocks<'life>( 101 &'life self, 102 cids: Vec<Cid>, 103 ) -> Pin<Box<dyn Future<Output = Result<BlocksAndMissing>> + Send + Sync + 'life>> { 104 let did: String = self.did.clone(); 105 106 Box::pin(async move { 107 use crate::schema::actor_store::repo_block::dsl as RepoBlockSchema; 108 let cached = { 109 let mut cache_guard = self.cache.write().await; 110 cache_guard.get_many(cids)? 
111 }; 112 113 if cached.missing.is_empty() { 114 return Ok(cached); 115 } 116 let missing = CidSet::new(Some(cached.missing.clone())); 117 let missing_strings: Vec<String> = 118 cached.missing.into_iter().map(|c| c.to_string()).collect(); 119 120 let blocks = Arc::new(tokio::sync::Mutex::new(BlockMap::new())); 121 let missing_set = Arc::new(tokio::sync::Mutex::new(missing)); 122 123 let stream: Vec<_> = stream::iter(missing_strings.chunks(500)) 124 .then(|batch| { 125 let this_did = did.clone(); 126 let blocks = Arc::clone(&blocks); 127 let missing = Arc::clone(&missing_set); 128 let batch = batch.to_vec(); // Convert to owned Vec 129 130 async move { 131 // Database query 132 let rows: Vec<(String, Vec<u8>)> = self 133 .db 134 .interact(move |conn| { 135 RepoBlockSchema::repo_block 136 .filter(RepoBlockSchema::cid.eq_any(batch)) 137 .filter(RepoBlockSchema::did.eq(this_did)) 138 .select((RepoBlockSchema::cid, RepoBlockSchema::content)) 139 .load(conn) 140 }) 141 .await 142 .expect("Failed to get blocks")?; 143 144 // Process rows with locked access 145 let mut blocks = blocks.lock().await; 146 let mut missing = missing.lock().await; 147 148 for row in rows { 149 let cid = Cid::from_str(&row.0)?; // Proper error handling 150 blocks.set(cid, row.1); 151 missing.delete(cid); 152 } 153 154 Ok::<(), anyhow::Error>(()) 155 } 156 }) 157 .try_collect() 158 .await?; 159 drop(stream); 160 161 // Extract values from synchronization primitives 162 let mut blocks = Arc::try_unwrap(blocks) 163 .expect("Arc still has owners") 164 .into_inner(); 165 let missing = Arc::try_unwrap(missing_set) 166 .expect("Arc still has owners") 167 .into_inner(); 168 169 { 170 let mut cache_guard = self.cache.write().await; 171 cache_guard.add_map(blocks.clone())?; 172 } 173 174 blocks.add_map(cached.blocks)?; 175 176 Ok(BlocksAndMissing { 177 blocks, 178 missing: missing.to_list(), 179 }) 180 }) 181 } 182} 183 184impl RepoStorage for SqlRepoReader { 185 fn get_root<'life>( 186 &'life self, 187 ) 
-> Pin<Box<dyn Future<Output = Option<Cid>> + Send + Sync + 'life>> { 188 Box::pin(async move { 189 match self.get_root_detailed().await { 190 Ok(root) => Some(root.cid), 191 Err(_) => None, 192 } 193 }) 194 } 195 196 fn put_block<'life>( 197 &'life self, 198 cid: Cid, 199 bytes: Vec<u8>, 200 rev: String, 201 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + Sync + 'life>> { 202 let did: String = self.did.clone(); 203 let bytes_cloned = bytes.clone(); 204 Box::pin(async move { 205 use crate::schema::actor_store::repo_block::dsl as RepoBlockSchema; 206 207 _ = self 208 .db 209 .interact(move |conn| { 210 insert_into(RepoBlockSchema::repo_block) 211 .values(( 212 RepoBlockSchema::did.eq(did), 213 RepoBlockSchema::cid.eq(cid.to_string()), 214 RepoBlockSchema::repoRev.eq(rev), 215 RepoBlockSchema::size.eq(bytes.len() as i32), 216 RepoBlockSchema::content.eq(bytes), 217 )) 218 .execute(conn) 219 }) 220 .await 221 .expect("Failed to put block")?; 222 { 223 let mut cache_guard = self.cache.write().await; 224 cache_guard.set(cid, bytes_cloned); 225 } 226 Ok(()) 227 }) 228 } 229 230 fn put_many<'life>( 231 &'life self, 232 to_put: BlockMap, 233 rev: String, 234 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + Sync + 'life>> { 235 let did: String = self.did.clone(); 236 237 Box::pin(async move { 238 use crate::schema::actor_store::repo_block::dsl as RepoBlockSchema; 239 240 let blocks: Vec<RepoBlock> = to_put 241 .map 242 .iter() 243 .map(|(cid, bytes)| RepoBlock { 244 cid: cid.to_string(), 245 did: did.clone(), 246 repo_rev: rev.clone(), 247 size: bytes.0.len() as i32, 248 content: bytes.0.clone(), 249 }) 250 .collect(); 251 252 let chunks: Vec<Vec<RepoBlock>> = 253 blocks.chunks(50).map(|chunk| chunk.to_vec()).collect(); 254 255 for batch in chunks { 256 _ = self 257 .db 258 .interact(move |conn| { 259 insert_or_ignore_into(RepoBlockSchema::repo_block) 260 .values(&batch) 261 .execute(conn) 262 }) 263 .await 264 .expect("Failed to insert blocks")?; 265 } 266 267 
Ok(()) 268 }) 269 } 270 fn update_root<'life>( 271 &'life self, 272 cid: Cid, 273 rev: String, 274 is_create: Option<bool>, 275 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + Sync + 'life>> { 276 let did: String = self.did.clone(); 277 let now: String = self.now.clone(); 278 279 Box::pin(async move { 280 use crate::schema::actor_store::repo_root::dsl as RepoRootSchema; 281 282 let is_create = is_create.unwrap_or(false); 283 if is_create { 284 _ = self 285 .db 286 .interact(move |conn| { 287 insert_into(RepoRootSchema::repo_root) 288 .values(( 289 RepoRootSchema::did.eq(did), 290 RepoRootSchema::cid.eq(cid.to_string()), 291 RepoRootSchema::rev.eq(rev), 292 RepoRootSchema::indexedAt.eq(now), 293 )) 294 .execute(conn) 295 }) 296 .await 297 .expect("Failed to create root")?; 298 } else { 299 _ = self 300 .db 301 .interact(move |conn| { 302 update(RepoRootSchema::repo_root) 303 .filter(RepoRootSchema::did.eq(did)) 304 .set(( 305 RepoRootSchema::cid.eq(cid.to_string()), 306 RepoRootSchema::rev.eq(rev), 307 RepoRootSchema::indexedAt.eq(now), 308 )) 309 .execute(conn) 310 }) 311 .await 312 .expect("Failed to update root")?; 313 } 314 Ok(()) 315 }) 316 } 317 318 fn apply_commit<'life>( 319 &'life self, 320 commit: CommitData, 321 is_create: Option<bool>, 322 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + Sync + 'life>> { 323 Box::pin(async move { 324 self.update_root(commit.cid, commit.rev.clone(), is_create) 325 .await?; 326 self.put_many(commit.new_blocks, commit.rev).await?; 327 self.delete_many(commit.removed_cids.to_list()).await?; 328 Ok(()) 329 }) 330 } 331} 332 333// Basically handles getting ipld blocks from db 334impl SqlRepoReader { 335 pub fn new(did: String, now: Option<String>, db: deadpool_diesel::sqlite::Object) -> Self { 336 let now = now.unwrap_or_else(rsky_common::now); 337 Self { 338 cache: Arc::new(RwLock::new(BlockMap::new())), 339 root: None, 340 rev: None, 341 db, 342 now, 343 did, 344 } 345 } 346 347 pub async fn get_car_stream(&self, 
since: Option<String>) -> Result<Vec<u8>> { 348 match self.get_root().await { 349 None => Err(anyhow::Error::new(RepoRootNotFoundError)), 350 Some(root) => { 351 let mut car = BlockMap::new(); 352 let mut cursor: Option<CidAndRev> = None; 353 let mut write_rows = |rows: Vec<RepoBlock>| -> Result<()> { 354 for row in rows { 355 car.set(Cid::from_str(&row.cid)?, row.content); 356 } 357 Ok(()) 358 }; 359 loop { 360 let res = self.get_block_range(&since, &cursor).await?; 361 write_rows(res.clone())?; 362 if let Some(last_row) = res.last() { 363 cursor = Some(CidAndRev { 364 cid: Cid::from_str(&last_row.cid)?, 365 rev: last_row.repo_rev.clone(), 366 }); 367 } else { 368 break; 369 } 370 } 371 blocks_to_car_file(Some(&root), car).await 372 } 373 } 374 } 375 376 pub async fn get_block_range( 377 &self, 378 since: &Option<String>, 379 cursor: &Option<CidAndRev>, 380 ) -> Result<Vec<RepoBlock>> { 381 let did: String = self.did.clone(); 382 let since = since.clone(); 383 let cursor = cursor.clone(); 384 use crate::schema::actor_store::repo_block::dsl as RepoBlockSchema; 385 386 Ok(self 387 .db 388 .interact(move |conn| { 389 let mut builder = RepoBlockSchema::repo_block 390 .select(RepoBlock::as_select()) 391 .order((RepoBlockSchema::repoRev.desc(), RepoBlockSchema::cid.desc())) 392 .filter(RepoBlockSchema::did.eq(did)) 393 .limit(500) 394 .into_boxed(); 395 396 if let Some(cursor) = cursor { 397 // use this syntax to ensure we hit the index 398 builder = builder.filter( 399 sql::<Bool>("((") 400 .bind(RepoBlockSchema::repoRev) 401 .sql(", ") 402 .bind(RepoBlockSchema::cid) 403 .sql(") < (") 404 .bind::<Text, _>(cursor.rev.clone()) 405 .sql(", ") 406 .bind::<Text, _>(cursor.cid.to_string()) 407 .sql("))"), 408 ); 409 } 410 if let Some(since) = since { 411 builder = builder.filter(RepoBlockSchema::repoRev.gt(since)); 412 } 413 builder.load(conn) 414 }) 415 .await 416 .expect("Failed to get block range")?) 
417 } 418 419 pub async fn count_blocks(&self) -> Result<i64> { 420 let did: String = self.did.clone(); 421 use crate::schema::actor_store::repo_block::dsl as RepoBlockSchema; 422 423 let res = self 424 .db 425 .interact(move |conn| { 426 RepoBlockSchema::repo_block 427 .filter(RepoBlockSchema::did.eq(did)) 428 .count() 429 .get_result(conn) 430 }) 431 .await 432 .expect("Failed to count blocks")?; 433 Ok(res) 434 } 435 436 // Transactors 437 // ------------------- 438 439 /// Proactively cache all blocks from a particular commit (to prevent multiple roundtrips) 440 pub async fn cache_rev(&mut self, rev: String) -> Result<()> { 441 let did: String = self.did.clone(); 442 use crate::schema::actor_store::repo_block::dsl as RepoBlockSchema; 443 444 let result: Vec<(String, Vec<u8>)> = self 445 .db 446 .interact(move |conn| { 447 RepoBlockSchema::repo_block 448 .filter(RepoBlockSchema::did.eq(did)) 449 .filter(RepoBlockSchema::repoRev.eq(rev)) 450 .select((RepoBlockSchema::cid, RepoBlockSchema::content)) 451 .limit(15) 452 .get_results::<(String, Vec<u8>)>(conn) 453 }) 454 .await 455 .expect("Failed to cache rev")?; 456 for row in result { 457 let mut cache_guard = self.cache.write().await; 458 cache_guard.set(Cid::from_str(&row.0)?, row.1) 459 } 460 Ok(()) 461 } 462 463 pub async fn delete_many(&self, cids: Vec<Cid>) -> Result<()> { 464 if cids.is_empty() { 465 return Ok(()); 466 } 467 let did: String = self.did.clone(); 468 use crate::schema::actor_store::repo_block::dsl as RepoBlockSchema; 469 470 let cid_strings: Vec<String> = cids.into_iter().map(|c| c.to_string()).collect(); 471 _ = self 472 .db 473 .interact(move |conn| { 474 delete(RepoBlockSchema::repo_block) 475 .filter(RepoBlockSchema::did.eq(did)) 476 .filter(RepoBlockSchema::cid.eq_any(cid_strings)) 477 .execute(conn) 478 }) 479 .await 480 .expect("Failed to delete many")?; 481 Ok(()) 482 } 483 484 pub async fn get_root_detailed(&self) -> Result<CidAndRev> { 485 let did: String = self.did.clone(); 486 use 
crate::schema::actor_store::repo_root::dsl as RepoRootSchema; 487 488 let res = self 489 .db 490 .interact(move |conn| { 491 RepoRootSchema::repo_root 492 .filter(RepoRootSchema::did.eq(did)) 493 .select(models::RepoRoot::as_select()) 494 .first(conn) 495 }) 496 .await 497 .expect("Failed to get root")?; 498 499 Ok(CidAndRev { 500 cid: Cid::from_str(&res.cid)?, 501 rev: res.rev, 502 }) 503 } 504}