Alternative ATProto PDS implementation
1//! Based on https://github.com/blacksky-algorithms/rsky/blob/main/rsky-pds/src/actor_store/record/mod.rs
2//! blacksky-algorithms/rsky is licensed under the Apache License 2.0
3//!
4//! Modified for SQLite backend
5
6use crate::models::actor_store as models;
7use crate::models::actor_store::RepoBlock;
8use anyhow::Result;
9use cidv10::Cid;
10use diesel::dsl::sql;
11use diesel::prelude::*;
12use diesel::sql_types::{Bool, Text};
13use diesel::*;
14use futures::{StreamExt, TryStreamExt, stream};
15use rsky_repo::block_map::{BlockMap, BlocksAndMissing};
16use rsky_repo::car::blocks_to_car_file;
17use rsky_repo::cid_set::CidSet;
18use rsky_repo::storage::CidAndRev;
19use rsky_repo::storage::RepoRootError::RepoRootNotFoundError;
20use rsky_repo::storage::readable_blockstore::ReadableBlockstore;
21use rsky_repo::storage::types::RepoStorage;
22use rsky_repo::types::CommitData;
23use std::pin::Pin;
24use std::str::FromStr;
25use std::sync::Arc;
26use tokio::sync::RwLock;
27
/// Repo block storage for a single DID, backed by a pooled SQLite connection
/// with an in-memory block cache in front of it.
pub struct SqlRepoReader {
    /// In-memory cache of blocks, shared behind an async read/write lock.
    pub cache: Arc<RwLock<BlockMap>>,
    /// Pooled SQLite connection; all queries run through its `interact` method.
    pub db: deadpool_diesel::sqlite::Object,
    /// Root CID; initialised to `None` by `new`.
    pub root: Option<Cid>,
    /// Repo revision; initialised to `None` by `new`.
    pub rev: Option<String>,
    /// Timestamp string written to `indexedAt` when the repo root is created or updated.
    pub now: String,
    /// DID that scopes the rows this reader reads and writes.
    pub did: String,
}
36
37impl std::fmt::Debug for SqlRepoReader {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 f.debug_struct("SqlRepoReader")
40 .field("did", &self.did)
41 .field("root", &self.root)
42 .field("rev", &self.rev)
43 .finish()
44 }
45}
46
47impl ReadableBlockstore for SqlRepoReader {
48 fn get_bytes<'life>(
49 &'life self,
50 cid: &'life Cid,
51 ) -> Pin<Box<dyn Future<Output = Result<Option<Vec<u8>>>> + Send + Sync + 'life>> {
52 let did: String = self.did.clone();
53 let cid = *cid;
54
55 Box::pin(async move {
56 use crate::schema::actor_store::repo_block::dsl as RepoBlockSchema;
57 let cached = {
58 let cache_guard = self.cache.read().await;
59 cache_guard.get(cid).cloned()
60 };
61 if let Some(cached_result) = cached {
62 return Ok(Some(cached_result));
63 }
64
65 let found: Option<Vec<u8>> = self
66 .db
67 .interact(move |conn| {
68 RepoBlockSchema::repo_block
69 .filter(RepoBlockSchema::cid.eq(cid.to_string()))
70 .filter(RepoBlockSchema::did.eq(did))
71 .select(RepoBlockSchema::content)
72 .first(conn)
73 .optional()
74 })
75 .await
76 .expect("Failed to get block")?;
77 match found {
78 None => Ok(None),
79 Some(result) => {
80 {
81 let mut cache_guard = self.cache.write().await;
82 cache_guard.set(cid, result.clone());
83 }
84 Ok(Some(result))
85 }
86 }
87 })
88 }
89
90 fn has<'life>(
91 &'life self,
92 cid: Cid,
93 ) -> Pin<Box<dyn Future<Output = Result<bool>> + Send + Sync + 'life>> {
94 Box::pin(async move {
95 let got = <Self as ReadableBlockstore>::get_bytes(self, &cid).await?;
96 Ok(got.is_some())
97 })
98 }
99
100 fn get_blocks<'life>(
101 &'life self,
102 cids: Vec<Cid>,
103 ) -> Pin<Box<dyn Future<Output = Result<BlocksAndMissing>> + Send + Sync + 'life>> {
104 let did: String = self.did.clone();
105
106 Box::pin(async move {
107 use crate::schema::actor_store::repo_block::dsl as RepoBlockSchema;
108 let cached = {
109 let mut cache_guard = self.cache.write().await;
110 cache_guard.get_many(cids)?
111 };
112
113 if cached.missing.is_empty() {
114 return Ok(cached);
115 }
116 let missing = CidSet::new(Some(cached.missing.clone()));
117 let missing_strings: Vec<String> =
118 cached.missing.into_iter().map(|c| c.to_string()).collect();
119
120 let blocks = Arc::new(tokio::sync::Mutex::new(BlockMap::new()));
121 let missing_set = Arc::new(tokio::sync::Mutex::new(missing));
122
123 let stream: Vec<_> = stream::iter(missing_strings.chunks(500))
124 .then(|batch| {
125 let this_did = did.clone();
126 let blocks = Arc::clone(&blocks);
127 let missing = Arc::clone(&missing_set);
128 let batch = batch.to_vec(); // Convert to owned Vec
129
130 async move {
131 // Database query
132 let rows: Vec<(String, Vec<u8>)> = self
133 .db
134 .interact(move |conn| {
135 RepoBlockSchema::repo_block
136 .filter(RepoBlockSchema::cid.eq_any(batch))
137 .filter(RepoBlockSchema::did.eq(this_did))
138 .select((RepoBlockSchema::cid, RepoBlockSchema::content))
139 .load(conn)
140 })
141 .await
142 .expect("Failed to get blocks")?;
143
144 // Process rows with locked access
145 let mut blocks = blocks.lock().await;
146 let mut missing = missing.lock().await;
147
148 for row in rows {
149 let cid = Cid::from_str(&row.0)?; // Proper error handling
150 blocks.set(cid, row.1);
151 missing.delete(cid);
152 }
153
154 Ok::<(), anyhow::Error>(())
155 }
156 })
157 .try_collect()
158 .await?;
159 drop(stream);
160
161 // Extract values from synchronization primitives
162 let mut blocks = Arc::try_unwrap(blocks)
163 .expect("Arc still has owners")
164 .into_inner();
165 let missing = Arc::try_unwrap(missing_set)
166 .expect("Arc still has owners")
167 .into_inner();
168
169 {
170 let mut cache_guard = self.cache.write().await;
171 cache_guard.add_map(blocks.clone())?;
172 }
173
174 blocks.add_map(cached.blocks)?;
175
176 Ok(BlocksAndMissing {
177 blocks,
178 missing: missing.to_list(),
179 })
180 })
181 }
182}
183
184impl RepoStorage for SqlRepoReader {
185 fn get_root<'life>(
186 &'life self,
187 ) -> Pin<Box<dyn Future<Output = Option<Cid>> + Send + Sync + 'life>> {
188 Box::pin(async move {
189 match self.get_root_detailed().await {
190 Ok(root) => Some(root.cid),
191 Err(_) => None,
192 }
193 })
194 }
195
196 fn put_block<'life>(
197 &'life self,
198 cid: Cid,
199 bytes: Vec<u8>,
200 rev: String,
201 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + Sync + 'life>> {
202 let did: String = self.did.clone();
203 let bytes_cloned = bytes.clone();
204 Box::pin(async move {
205 use crate::schema::actor_store::repo_block::dsl as RepoBlockSchema;
206
207 _ = self
208 .db
209 .interact(move |conn| {
210 insert_into(RepoBlockSchema::repo_block)
211 .values((
212 RepoBlockSchema::did.eq(did),
213 RepoBlockSchema::cid.eq(cid.to_string()),
214 RepoBlockSchema::repoRev.eq(rev),
215 RepoBlockSchema::size.eq(bytes.len() as i32),
216 RepoBlockSchema::content.eq(bytes),
217 ))
218 .execute(conn)
219 })
220 .await
221 .expect("Failed to put block")?;
222 {
223 let mut cache_guard = self.cache.write().await;
224 cache_guard.set(cid, bytes_cloned);
225 }
226 Ok(())
227 })
228 }
229
230 fn put_many<'life>(
231 &'life self,
232 to_put: BlockMap,
233 rev: String,
234 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + Sync + 'life>> {
235 let did: String = self.did.clone();
236
237 Box::pin(async move {
238 use crate::schema::actor_store::repo_block::dsl as RepoBlockSchema;
239
240 let blocks: Vec<RepoBlock> = to_put
241 .map
242 .iter()
243 .map(|(cid, bytes)| RepoBlock {
244 cid: cid.to_string(),
245 did: did.clone(),
246 repo_rev: rev.clone(),
247 size: bytes.0.len() as i32,
248 content: bytes.0.clone(),
249 })
250 .collect();
251
252 let chunks: Vec<Vec<RepoBlock>> =
253 blocks.chunks(50).map(|chunk| chunk.to_vec()).collect();
254
255 for batch in chunks {
256 _ = self
257 .db
258 .interact(move |conn| {
259 insert_or_ignore_into(RepoBlockSchema::repo_block)
260 .values(&batch)
261 .execute(conn)
262 })
263 .await
264 .expect("Failed to insert blocks")?;
265 }
266
267 Ok(())
268 })
269 }
270 fn update_root<'life>(
271 &'life self,
272 cid: Cid,
273 rev: String,
274 is_create: Option<bool>,
275 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + Sync + 'life>> {
276 let did: String = self.did.clone();
277 let now: String = self.now.clone();
278
279 Box::pin(async move {
280 use crate::schema::actor_store::repo_root::dsl as RepoRootSchema;
281
282 let is_create = is_create.unwrap_or(false);
283 if is_create {
284 _ = self
285 .db
286 .interact(move |conn| {
287 insert_into(RepoRootSchema::repo_root)
288 .values((
289 RepoRootSchema::did.eq(did),
290 RepoRootSchema::cid.eq(cid.to_string()),
291 RepoRootSchema::rev.eq(rev),
292 RepoRootSchema::indexedAt.eq(now),
293 ))
294 .execute(conn)
295 })
296 .await
297 .expect("Failed to create root")?;
298 } else {
299 _ = self
300 .db
301 .interact(move |conn| {
302 update(RepoRootSchema::repo_root)
303 .filter(RepoRootSchema::did.eq(did))
304 .set((
305 RepoRootSchema::cid.eq(cid.to_string()),
306 RepoRootSchema::rev.eq(rev),
307 RepoRootSchema::indexedAt.eq(now),
308 ))
309 .execute(conn)
310 })
311 .await
312 .expect("Failed to update root")?;
313 }
314 Ok(())
315 })
316 }
317
318 fn apply_commit<'life>(
319 &'life self,
320 commit: CommitData,
321 is_create: Option<bool>,
322 ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + Sync + 'life>> {
323 Box::pin(async move {
324 self.update_root(commit.cid, commit.rev.clone(), is_create)
325 .await?;
326 self.put_many(commit.new_blocks, commit.rev).await?;
327 self.delete_many(commit.removed_cids.to_list()).await?;
328 Ok(())
329 })
330 }
331}
332
// Inherent methods for reading and writing IPLD blocks and the repo root in the database.
334impl SqlRepoReader {
335 pub fn new(did: String, now: Option<String>, db: deadpool_diesel::sqlite::Object) -> Self {
336 let now = now.unwrap_or_else(rsky_common::now);
337 Self {
338 cache: Arc::new(RwLock::new(BlockMap::new())),
339 root: None,
340 rev: None,
341 db,
342 now,
343 did,
344 }
345 }
346
347 pub async fn get_car_stream(&self, since: Option<String>) -> Result<Vec<u8>> {
348 match self.get_root().await {
349 None => Err(anyhow::Error::new(RepoRootNotFoundError)),
350 Some(root) => {
351 let mut car = BlockMap::new();
352 let mut cursor: Option<CidAndRev> = None;
353 let mut write_rows = |rows: Vec<RepoBlock>| -> Result<()> {
354 for row in rows {
355 car.set(Cid::from_str(&row.cid)?, row.content);
356 }
357 Ok(())
358 };
359 loop {
360 let res = self.get_block_range(&since, &cursor).await?;
361 write_rows(res.clone())?;
362 if let Some(last_row) = res.last() {
363 cursor = Some(CidAndRev {
364 cid: Cid::from_str(&last_row.cid)?,
365 rev: last_row.repo_rev.clone(),
366 });
367 } else {
368 break;
369 }
370 }
371 blocks_to_car_file(Some(&root), car).await
372 }
373 }
374 }
375
376 pub async fn get_block_range(
377 &self,
378 since: &Option<String>,
379 cursor: &Option<CidAndRev>,
380 ) -> Result<Vec<RepoBlock>> {
381 let did: String = self.did.clone();
382 let since = since.clone();
383 let cursor = cursor.clone();
384 use crate::schema::actor_store::repo_block::dsl as RepoBlockSchema;
385
386 Ok(self
387 .db
388 .interact(move |conn| {
389 let mut builder = RepoBlockSchema::repo_block
390 .select(RepoBlock::as_select())
391 .order((RepoBlockSchema::repoRev.desc(), RepoBlockSchema::cid.desc()))
392 .filter(RepoBlockSchema::did.eq(did))
393 .limit(500)
394 .into_boxed();
395
396 if let Some(cursor) = cursor {
397 // use this syntax to ensure we hit the index
398 builder = builder.filter(
399 sql::<Bool>("((")
400 .bind(RepoBlockSchema::repoRev)
401 .sql(", ")
402 .bind(RepoBlockSchema::cid)
403 .sql(") < (")
404 .bind::<Text, _>(cursor.rev.clone())
405 .sql(", ")
406 .bind::<Text, _>(cursor.cid.to_string())
407 .sql("))"),
408 );
409 }
410 if let Some(since) = since {
411 builder = builder.filter(RepoBlockSchema::repoRev.gt(since));
412 }
413 builder.load(conn)
414 })
415 .await
416 .expect("Failed to get block range")?)
417 }
418
419 pub async fn count_blocks(&self) -> Result<i64> {
420 let did: String = self.did.clone();
421 use crate::schema::actor_store::repo_block::dsl as RepoBlockSchema;
422
423 let res = self
424 .db
425 .interact(move |conn| {
426 RepoBlockSchema::repo_block
427 .filter(RepoBlockSchema::did.eq(did))
428 .count()
429 .get_result(conn)
430 })
431 .await
432 .expect("Failed to count blocks")?;
433 Ok(res)
434 }
435
436 // Transactors
437 // -------------------
438
439 /// Proactively cache all blocks from a particular commit (to prevent multiple roundtrips)
440 pub async fn cache_rev(&mut self, rev: String) -> Result<()> {
441 let did: String = self.did.clone();
442 use crate::schema::actor_store::repo_block::dsl as RepoBlockSchema;
443
444 let result: Vec<(String, Vec<u8>)> = self
445 .db
446 .interact(move |conn| {
447 RepoBlockSchema::repo_block
448 .filter(RepoBlockSchema::did.eq(did))
449 .filter(RepoBlockSchema::repoRev.eq(rev))
450 .select((RepoBlockSchema::cid, RepoBlockSchema::content))
451 .limit(15)
452 .get_results::<(String, Vec<u8>)>(conn)
453 })
454 .await
455 .expect("Failed to cache rev")?;
456 for row in result {
457 let mut cache_guard = self.cache.write().await;
458 cache_guard.set(Cid::from_str(&row.0)?, row.1)
459 }
460 Ok(())
461 }
462
463 pub async fn delete_many(&self, cids: Vec<Cid>) -> Result<()> {
464 if cids.is_empty() {
465 return Ok(());
466 }
467 let did: String = self.did.clone();
468 use crate::schema::actor_store::repo_block::dsl as RepoBlockSchema;
469
470 let cid_strings: Vec<String> = cids.into_iter().map(|c| c.to_string()).collect();
471 _ = self
472 .db
473 .interact(move |conn| {
474 delete(RepoBlockSchema::repo_block)
475 .filter(RepoBlockSchema::did.eq(did))
476 .filter(RepoBlockSchema::cid.eq_any(cid_strings))
477 .execute(conn)
478 })
479 .await
480 .expect("Failed to delete many")?;
481 Ok(())
482 }
483
484 pub async fn get_root_detailed(&self) -> Result<CidAndRev> {
485 let did: String = self.did.clone();
486 use crate::schema::actor_store::repo_root::dsl as RepoRootSchema;
487
488 let res = self
489 .db
490 .interact(move |conn| {
491 RepoRootSchema::repo_root
492 .filter(RepoRootSchema::did.eq(did))
493 .select(models::RepoRoot::as_select())
494 .first(conn)
495 })
496 .await
497 .expect("Failed to get root")?;
498
499 Ok(CidAndRev {
500 cid: Cid::from_str(&res.cid)?,
501 rev: res.rev,
502 })
503 }
504}