Alternative ATProto PDS implementation
6
fork

Configure Feed

Select the types of activity you want to include in your feed.

prototype actor_store sql_repo

+43 -61
+4 -48
Cargo.lock
··· 2077 2077 "js-sys", 2078 2078 "log", 2079 2079 "wasm-bindgen", 2080 - "windows-core 0.61.0", 2080 + "windows-core", 2081 2081 ] 2082 2082 2083 2083 [[package]] ··· 5335 5335 source = "registry+https://github.com/rust-lang/crates.io-index" 5336 5336 checksum = "dd04d41d93c4992d421894c18c8b43496aa748dd4c081bac0dc93eb0489272b6" 5337 5337 dependencies = [ 5338 - "windows-core 0.58.0", 5338 + "windows-core", 5339 5339 "windows-targets 0.52.6", 5340 5340 ] 5341 5341 ··· 5345 5345 source = "registry+https://github.com/rust-lang/crates.io-index" 5346 5346 checksum = "6ba6d44ec8c2591c134257ce647b7ea6b20335bf6379a27dac5f1641fcf59f99" 5347 5347 dependencies = [ 5348 - "windows-implement 0.58.0", 5349 - "windows-interface 0.58.0", 5348 + "windows-implement", 5349 + "windows-interface", 5350 5350 "windows-result 0.2.0", 5351 5351 "windows-strings 0.1.0", 5352 5352 "windows-targets 0.52.6", 5353 5353 ] 5354 5354 5355 5355 [[package]] 5356 - name = "windows-core" 5357 - version = "0.61.0" 5358 - source = "registry+https://github.com/rust-lang/crates.io-index" 5359 - checksum = "4763c1de310c86d75a878046489e2e5ba02c649d185f21c67d4cf8a56d098980" 5360 - dependencies = [ 5361 - "windows-implement 0.60.0", 5362 - "windows-interface 0.59.1", 5363 - "windows-link", 5364 - "windows-result 0.3.2", 5365 - "windows-strings 0.4.0", 5366 - ] 5367 - 5368 - [[package]] 5369 5356 name = "windows-implement" 5370 5357 version = "0.58.0" 5371 5358 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 5377 5364 ] 5378 5365 5379 5366 [[package]] 5380 - name = "windows-implement" 5381 - version = "0.60.0" 5382 - source = "registry+https://github.com/rust-lang/crates.io-index" 5383 - checksum = "a47fddd13af08290e67f4acabf4b459f647552718f683a7b415d290ac744a836" 5384 - dependencies = [ 5385 - "proc-macro2", 5386 - "quote", 5387 - "syn 2.0.101", 5388 - ] 5389 - 5390 - [[package]] 5391 5367 name = "windows-interface" 5392 5368 version = "0.58.0" 5393 5369 source = "registry+https://github.com/rust-lang/crates.io-index" 5394 5370 checksum = "053c4c462dc91d3b1504c6fe5a726dd15e216ba718e84a0e46a88fbe5ded3515" 5395 - dependencies = [ 5396 - "proc-macro2", 5397 - "quote", 5398 - "syn 2.0.101", 5399 - ] 5400 - 5401 - [[package]] 5402 - name = "windows-interface" 5403 - version = "0.59.1" 5404 - source = "registry+https://github.com/rust-lang/crates.io-index" 5405 - checksum = "bd9211b69f8dcdfa817bfd14bf1c97c9188afa36f4750130fcdf3f400eca9fa8" 5406 5371 dependencies = [ 5407 5372 "proc-macro2", 5408 5373 "quote", ··· 5459 5424 version = "0.3.1" 5460 5425 source = "registry+https://github.com/rust-lang/crates.io-index" 5461 5426 checksum = "87fa48cc5d406560701792be122a10132491cff9d0aeb23583cc2dcafc847319" 5462 - dependencies = [ 5463 - "windows-link", 5464 - ] 5465 - 5466 - [[package]] 5467 - name = "windows-strings" 5468 - version = "0.4.0" 5469 - source = "registry+https://github.com/rust-lang/crates.io-index" 5470 - checksum = "7a2ba9642430ee452d5a7aa78d72907ebe8cfda358e8cb7918a2050581322f97" 5471 5427 dependencies = [ 5472 5428 "windows-link", 5473 5429 ]
+28 -3
src/actor_store/repo/sql_repo_reader.rs
··· 5 5 Cid, 6 6 blockstore::{AsyncBlockStoreRead, Error as BlockstoreError}, 7 7 }; 8 - use rsky_repo::{block_map::BlockMap, cid_set::CidSet}; 9 - use sha2::Digest; 10 8 use sqlx::{Row, SqlitePool}; 11 9 use std::str::FromStr; 12 10 use std::sync::Arc; ··· 41 39 } 42 40 43 41 // async getRoot(): Promise<CID> { 44 - // async getBytes(cid: CID): Promise<Uint8Array | null> { 45 42 // async has(cid: CID): Promise<boolean> { 46 43 // async getCarStream(since?: string) { 47 44 // async *iterateCarBlocks(since?: string): AsyncIterable<CarBlock> { 48 45 // async getBlockRange(since?: string, cursor?: RevCursor) { 49 46 // async countBlocks(): Promise<number> { 50 47 // async destroy(): Promise<void> { 48 + 49 + pub(crate) async fn get_bytes(&self, cid: &Cid) -> Result<Option<Vec<u8>>> { 50 + // First check the cache 51 + { 52 + let cache_guard = self.cache.read().await; 53 + if let Some(cached) = cache_guard.get(*cid) { 54 + return Ok(Some(cached.clone())); 55 + } 56 + } 57 + 58 + // Not in cache, query from database 59 + let cid_str = cid.to_string(); 60 + let did = self.did.clone(); 61 + 62 + let content = sqlx::query!(r#"SELECT content FROM repo_block WHERE cid = ?"#, cid_str,) 63 + .fetch_optional(&self.db) 64 + .await 65 + .context("failed to fetch block content")? 66 + .map(|row| row.content); 67 + 68 + // If found, update the cache 69 + if let Some(bytes) = &content { 70 + let mut cache_guard = self.cache.write().await; 71 + cache_guard.set(*cid, bytes.clone()); 72 + } 73 + 74 + Ok(content) 75 + } 51 76 52 77 /// Get the detailed root information. 53 78 pub(crate) async fn get_root_detailed(&self) -> Result<RootInfo> {
+11 -10
src/actor_store/repo/sql_repo_transactor.rs
··· 7 7 Cid, 8 8 blockstore::{AsyncBlockStoreWrite, Error as BlockstoreError}, 9 9 }; 10 - use rsky_repo::{block_map::BlockMap, types::CommitData}; 11 10 use sha2::Digest; 12 11 use sqlx::SqlitePool; 13 12 ··· 59 58 /// Apply a commit to the repository. 60 59 pub(crate) async fn apply_commit(&self, commit: CommitData, is_create: bool) -> Result<()> { 61 60 let is_create = is_create || false; 61 + let removed_cids_list = commit.removed_cids.to_list(); 62 62 63 63 // Run these operations in parallel for better performance 64 64 tokio::try_join!( 65 65 self.update_root(commit.cid, &commit.rev, is_create), 66 66 self.put_many(&commit.new_blocks, &commit.rev), 67 - self.delete_many(&commit.removed_cids.to_list()) 67 + self.delete_many(&removed_cids_list) 68 68 )?; 69 69 70 70 Ok(()) ··· 111 111 /// Put a block into the repository. 112 112 pub(crate) async fn put_block(&self, cid: Cid, block: &[u8], rev: &str) -> Result<()> { 113 113 let cid_str = cid.to_string(); 114 - let did = self.reader.did.clone(); 115 114 115 + let block_len = block.len() as i64; 116 116 sqlx::query!( 117 117 r#" 118 118 INSERT INTO repo_block (cid, repoRev, size, content) ··· 121 121 "#, 122 122 cid_str, 123 123 rev, 124 - block.len() as i64, 124 + block_len, 125 125 block 126 126 ) 127 127 .execute(&self.reader.db) ··· 132 132 133 133 /// Put many blocks into the repository. 134 134 pub(crate) async fn put_many(&self, blocks: &BlockMap, rev: &str) -> Result<()> { 135 - if blocks.is_empty() { 135 + if blocks.size() == 0 { 136 136 return Ok(()); 137 137 } 138 138 ··· 140 140 let mut batch = Vec::new(); 141 141 142 142 blocks.for_each(|cid, bytes| { 143 + let cid_string = format!("{:?}", cid); 143 144 batch.push(( 144 - cid.to_string(), 145 + cid_string, 145 146 did.clone(), 146 147 rev.to_string(), 147 - bytes.len() as i64, 148 + bytes.encoded_len() as i64, 148 149 bytes.clone(), 149 150 )); 150 151 }); ··· 221 222 contents: &[u8], 222 223 ) -> impl Future<Output = Result<Cid, BlockstoreError>> + Send { 223 224 let contents = contents.to_vec(); 224 - let did = self.reader.did.clone(); 225 225 let rev = self.now.clone(); 226 226 227 227 async move { ··· 235 235 236 236 let cid = Cid::new_v1(codec, multihash); 237 237 let cid_str = cid.to_string(); 238 + let contents_len = contents.len() as i64; 238 239 239 240 sqlx::query!( 240 241 r#" ··· 244 245 "#, 245 246 cid_str, 246 247 rev, 247 - contents.len() as i64, 248 + contents_len, 248 249 contents 249 250 ) 250 251 .execute(&self.reader.db) 251 252 .await 252 253 .map_err(|e| BlockstoreError::Other(Box::new(e)))?; 253 254 254 - self.cache.set(cid, contents); 255 + self.cache.set(&cid, contents); 255 256 Ok(cid) 256 257 } 257 258 }