Alternative ATProto PDS implementation
1//! SQL-based blob storage implementation
2#![expect(
3 clippy::pub_use,
4 clippy::single_char_lifetime_names,
5 unused_qualifications,
6 unnameable_types
7)]
8use anyhow::{Context, Result};
9use cidv10::Cid;
10use diesel::prelude::*;
11
/// Fully buffered blob payload exposed through a stream-like type.
///
/// The whole blob is held in memory; nothing is read lazily.
pub struct ByteStream {
    /// Raw bytes of the blob.
    pub bytes: Vec<u8>,
}
16
17impl ByteStream {
18 pub const fn new(bytes: Vec<u8>) -> Self {
19 Self { bytes }
20 }
21
22 pub async fn collect(self) -> Result<Vec<u8>> {
23 Ok(self.bytes)
24 }
25}
26
27/// SQL-based implementation of blob storage
28pub struct BlobStoreSql {
29 /// Database connection for metadata
30 pub db: deadpool_diesel::Pool<
31 deadpool_diesel::Manager<SqliteConnection>,
32 deadpool_diesel::sqlite::Object,
33 >,
34 /// DID of the actor
35 pub did: String,
36}
37
38/// Blob table structure for SQL operations
39#[derive(Queryable, Insertable, Debug)]
40#[diesel(table_name = blobs)]
41struct BlobEntry {
42 cid: String,
43 did: String,
44 data: Vec<u8>,
45 size: i32,
46 mime_type: String,
47 quarantined: bool,
48}
49
50// Table definition for blobs
51table! {
52 blobs (cid, did) {
53 cid -> Text,
54 did -> Text,
55 data -> Binary,
56 size -> Integer,
57 mime_type -> Text,
58 quarantined -> Bool,
59 }
60}
61
62impl BlobStoreSql {
63 /// Create a new SQL-based blob store for the given DID
64 pub const fn new(
65 did: String,
66 db: deadpool_diesel::Pool<
67 deadpool_diesel::Manager<SqliteConnection>,
68 deadpool_diesel::sqlite::Object,
69 >,
70 ) -> Self {
71 Self { db, did }
72 }
73
74 // /// Create a factory function for blob stores
75 pub fn creator(
76 db: deadpool_diesel::Pool<
77 deadpool_diesel::Manager<SqliteConnection>,
78 deadpool_diesel::sqlite::Object,
79 >,
80 ) -> Box<dyn Fn(String) -> BlobStoreSql> {
81 let db_clone = db.clone();
82 Box::new(move |did: String| BlobStoreSql::new(did, db_clone.clone()))
83 }
84
85 /// Store a blob temporarily - now just stores permanently with a key returned for API compatibility
86 pub async fn put_temp(&self, bytes: Vec<u8>) -> Result<String> {
87 // Generate a unique key as a CID based on the data
88 // use sha2::{Digest, Sha256};
89 // let digest = Sha256::digest(&bytes);
90 // let key = hex::encode(digest);
91 let key = rsky_common::get_random_str();
92
93 // Just store the blob directly
94 self.put_permanent_with_mime(
95 Cid::try_from(format!("bafy{}", key)).unwrap_or_else(|_| Cid::default()),
96 bytes,
97 "application/octet-stream".to_owned(),
98 )
99 .await?;
100
101 // Return the key for API compatibility
102 Ok(key)
103 }
104
105 /// Make a temporary blob permanent - just a no-op for API compatibility
106 pub async fn make_permanent(&self, _key: String, _cid: Cid) -> Result<()> {
107 // No-op since we don't have temporary blobs anymore
108 Ok(())
109 }
110
111 /// Store a blob with specific mime type
112 pub async fn put_permanent_with_mime(
113 &self,
114 cid: Cid,
115 bytes: Vec<u8>,
116 mime_type: String,
117 ) -> Result<()> {
118 let cid_str = cid.to_string();
119 let did_clone = self.did.clone();
120 let bytes_len = bytes.len() as i32;
121
122 // Store directly in the database
123 _ = self
124 .db
125 .get()
126 .await?
127 .interact(move |conn| {
128 let data_clone = bytes.clone();
129 let entry = BlobEntry {
130 cid: cid_str.clone(),
131 did: did_clone.clone(),
132 data: bytes,
133 size: bytes_len,
134 mime_type,
135 quarantined: false,
136 };
137
138 diesel::insert_into(blobs::table)
139 .values(&entry)
140 .on_conflict((blobs::cid, blobs::did))
141 .do_update()
142 .set(blobs::data.eq(data_clone))
143 .execute(conn)
144 .context("Failed to insert blob data")
145 })
146 .await
147 .expect("Failed to store blob data")?;
148
149 Ok(())
150 }
151
152 /// Store a blob directly as permanent
153 pub async fn put_permanent(&self, cid: Cid, bytes: Vec<u8>) -> Result<()> {
154 self.put_permanent_with_mime(cid, bytes, "application/octet-stream".to_owned())
155 .await
156 }
157
158 /// Quarantine a blob
159 pub async fn quarantine(&self, cid: Cid) -> Result<()> {
160 let cid_str = cid.to_string();
161 let did_clone = self.did.clone();
162
163 // Update the quarantine flag in the database
164 _ = self
165 .db
166 .get()
167 .await?
168 .interact(move |conn| {
169 diesel::update(blobs::table)
170 .filter(blobs::cid.eq(&cid_str))
171 .filter(blobs::did.eq(&did_clone))
172 .set(blobs::quarantined.eq(true))
173 .execute(conn)
174 .context("Failed to quarantine blob")
175 })
176 .await
177 .expect("Failed to update quarantine status")?;
178
179 Ok(())
180 }
181
182 /// Unquarantine a blob
183 pub async fn unquarantine(&self, cid: Cid) -> Result<()> {
184 let cid_str = cid.to_string();
185 let did_clone = self.did.clone();
186
187 // Update the quarantine flag in the database
188 _ = self
189 .db
190 .get()
191 .await?
192 .interact(move |conn| {
193 diesel::update(blobs::table)
194 .filter(blobs::cid.eq(&cid_str))
195 .filter(blobs::did.eq(&did_clone))
196 .set(blobs::quarantined.eq(false))
197 .execute(conn)
198 .context("Failed to unquarantine blob")
199 })
200 .await
201 .expect("Failed to update unquarantine status")?;
202
203 Ok(())
204 }
205
206 /// Get a blob as a stream
207 pub async fn get_object(&self, blob_cid: Cid) -> Result<ByteStream> {
208 use self::blobs::dsl::*;
209
210 let cid_str = blob_cid.to_string();
211 let did_clone = self.did.clone();
212
213 // Get the blob data from the database
214 let blob_data = self
215 .db
216 .get()
217 .await?
218 .interact(move |conn| {
219 blobs
220 .filter(self::blobs::cid.eq(&cid_str))
221 .filter(did.eq(&did_clone))
222 .filter(quarantined.eq(false))
223 .select(data)
224 .first::<Vec<u8>>(conn)
225 .optional()
226 .context("Failed to query blob data")
227 })
228 .await
229 .expect("Failed to get blob data")?;
230
231 if let Some(bytes) = blob_data {
232 Ok(ByteStream::new(bytes))
233 } else {
234 anyhow::bail!("Blob not found: {}", blob_cid)
235 }
236 }
237
238 /// Get blob bytes
239 pub async fn get_bytes(&self, cid: Cid) -> Result<Vec<u8>> {
240 let stream = self.get_object(cid).await?;
241 stream.collect().await
242 }
243
244 /// Get a blob as a stream
245 pub async fn get_stream(&self, cid: Cid) -> Result<ByteStream> {
246 self.get_object(cid).await
247 }
248
249 /// Delete a blob by CID string
250 pub async fn delete(&self, blob_cid: String) -> Result<()> {
251 use self::blobs::dsl::*;
252
253 let did_clone = self.did.clone();
254
255 // Delete from database
256 _ = self
257 .db
258 .get()
259 .await?
260 .interact(move |conn| {
261 diesel::delete(blobs)
262 .filter(self::blobs::cid.eq(&blob_cid))
263 .filter(did.eq(&did_clone))
264 .execute(conn)
265 .context("Failed to delete blob")
266 })
267 .await
268 .expect("Failed to delete blob")?;
269
270 Ok(())
271 }
272
273 /// Delete multiple blobs by CID
274 pub async fn delete_many(&self, cids: Vec<Cid>) -> Result<()> {
275 use self::blobs::dsl::*;
276
277 let cid_strings: Vec<String> = cids.into_iter().map(|c| c.to_string()).collect();
278 let did_clone = self.did.clone();
279
280 // Delete all blobs in one operation
281 _ = self
282 .db
283 .get()
284 .await?
285 .interact(move |conn| {
286 diesel::delete(blobs)
287 .filter(self::blobs::cid.eq_any(cid_strings))
288 .filter(did.eq(&did_clone))
289 .execute(conn)
290 .context("Failed to delete multiple blobs")
291 })
292 .await
293 .expect("Failed to delete multiple blobs")?;
294
295 Ok(())
296 }
297
298 /// Check if a blob is stored
299 pub async fn has_stored(&self, blob_cid: Cid) -> Result<bool> {
300 use self::blobs::dsl::*;
301
302 let cid_str = blob_cid.to_string();
303 let did_clone = self.did.clone();
304
305 let exists = self
306 .db
307 .get()
308 .await?
309 .interact(move |conn| {
310 diesel::select(diesel::dsl::exists(
311 blobs
312 .filter(self::blobs::cid.eq(&cid_str))
313 .filter(did.eq(&did_clone)),
314 ))
315 .get_result::<bool>(conn)
316 .context("Failed to check if blob exists")
317 })
318 .await
319 .expect("Failed to check blob existence")?;
320
321 Ok(exists)
322 }
323
324 /// Check if a temporary blob exists - now just checks if any blob exists with the key pattern
325 pub async fn has_temp(&self, key: String) -> Result<bool> {
326 // We don't have temporary blobs anymore, but for compatibility we'll check if
327 // there's a blob with a similar CID pattern
328 let temp_cid = Cid::try_from(format!("bafy{}", key)).unwrap_or_else(|_| Cid::default());
329 self.has_stored(temp_cid).await
330 }
331}