Alternative ATProto PDS implementation
at oauth 9.9 kB view raw
1//! SQL-based blob storage implementation 2#![expect( 3 clippy::pub_use, 4 clippy::single_char_lifetime_names, 5 unused_qualifications, 6 unnameable_types 7)] 8use anyhow::{Context, Result}; 9use cidv10::Cid; 10use diesel::prelude::*; 11 12/// ByteStream implementation for blob data 13pub struct ByteStream { 14 pub bytes: Vec<u8>, 15} 16 17impl ByteStream { 18 pub const fn new(bytes: Vec<u8>) -> Self { 19 Self { bytes } 20 } 21 22 pub async fn collect(self) -> Result<Vec<u8>> { 23 Ok(self.bytes) 24 } 25} 26 27/// SQL-based implementation of blob storage 28pub struct BlobStoreSql { 29 /// Database connection for metadata 30 pub db: deadpool_diesel::Pool< 31 deadpool_diesel::Manager<SqliteConnection>, 32 deadpool_diesel::sqlite::Object, 33 >, 34 /// DID of the actor 35 pub did: String, 36} 37 38/// Blob table structure for SQL operations 39#[derive(Queryable, Insertable, Debug)] 40#[diesel(table_name = blobs)] 41struct BlobEntry { 42 cid: String, 43 did: String, 44 data: Vec<u8>, 45 size: i32, 46 mime_type: String, 47 quarantined: bool, 48} 49 50// Table definition for blobs 51table! { 52 blobs (cid, did) { 53 cid -> Text, 54 did -> Text, 55 data -> Binary, 56 size -> Integer, 57 mime_type -> Text, 58 quarantined -> Bool, 59 } 60} 61 62impl BlobStoreSql { 63 /// Create a new SQL-based blob store for the given DID 64 pub const fn new( 65 did: String, 66 db: deadpool_diesel::Pool< 67 deadpool_diesel::Manager<SqliteConnection>, 68 deadpool_diesel::sqlite::Object, 69 >, 70 ) -> Self { 71 Self { db, did } 72 } 73 74 // /// Create a factory function for blob stores 75 pub fn creator( 76 db: deadpool_diesel::Pool< 77 deadpool_diesel::Manager<SqliteConnection>, 78 deadpool_diesel::sqlite::Object, 79 >, 80 ) -> Box<dyn Fn(String) -> BlobStoreSql> { 81 let db_clone = db.clone(); 82 Box::new(move |did: String| BlobStoreSql::new(did, db_clone.clone())) 83 } 84 85 /// Store a blob temporarily - now just stores permanently with a key returned for API compatibility 86 pub async fn put_temp(&self, bytes: Vec<u8>) -> Result<String> { 87 // Generate a unique key as a CID based on the data 88 // use sha2::{Digest, Sha256}; 89 // let digest = Sha256::digest(&bytes); 90 // let key = hex::encode(digest); 91 let key = rsky_common::get_random_str(); 92 93 // Just store the blob directly 94 self.put_permanent_with_mime( 95 Cid::try_from(format!("bafy{}", key)).unwrap_or_else(|_| Cid::default()), 96 bytes, 97 "application/octet-stream".to_owned(), 98 ) 99 .await?; 100 101 // Return the key for API compatibility 102 Ok(key) 103 } 104 105 /// Make a temporary blob permanent - just a no-op for API compatibility 106 pub async fn make_permanent(&self, _key: String, _cid: Cid) -> Result<()> { 107 // No-op since we don't have temporary blobs anymore 108 Ok(()) 109 } 110 111 /// Store a blob with specific mime type 112 pub async fn put_permanent_with_mime( 113 &self, 114 cid: Cid, 115 bytes: Vec<u8>, 116 mime_type: String, 117 ) -> Result<()> { 118 let cid_str = cid.to_string(); 119 let did_clone = self.did.clone(); 120 let bytes_len = bytes.len() as i32; 121 122 // Store directly in the database 123 _ = self 124 .db 125 .get() 126 .await? 127 .interact(move |conn| { 128 let data_clone = bytes.clone(); 129 let entry = BlobEntry { 130 cid: cid_str.clone(), 131 did: did_clone.clone(), 132 data: bytes, 133 size: bytes_len, 134 mime_type, 135 quarantined: false, 136 }; 137 138 diesel::insert_into(blobs::table) 139 .values(&entry) 140 .on_conflict((blobs::cid, blobs::did)) 141 .do_update() 142 .set(blobs::data.eq(data_clone)) 143 .execute(conn) 144 .context("Failed to insert blob data") 145 }) 146 .await 147 .expect("Failed to store blob data")?; 148 149 Ok(()) 150 } 151 152 /// Store a blob directly as permanent 153 pub async fn put_permanent(&self, cid: Cid, bytes: Vec<u8>) -> Result<()> { 154 self.put_permanent_with_mime(cid, bytes, "application/octet-stream".to_owned()) 155 .await 156 } 157 158 /// Quarantine a blob 159 pub async fn quarantine(&self, cid: Cid) -> Result<()> { 160 let cid_str = cid.to_string(); 161 let did_clone = self.did.clone(); 162 163 // Update the quarantine flag in the database 164 _ = self 165 .db 166 .get() 167 .await? 168 .interact(move |conn| { 169 diesel::update(blobs::table) 170 .filter(blobs::cid.eq(&cid_str)) 171 .filter(blobs::did.eq(&did_clone)) 172 .set(blobs::quarantined.eq(true)) 173 .execute(conn) 174 .context("Failed to quarantine blob") 175 }) 176 .await 177 .expect("Failed to update quarantine status")?; 178 179 Ok(()) 180 } 181 182 /// Unquarantine a blob 183 pub async fn unquarantine(&self, cid: Cid) -> Result<()> { 184 let cid_str = cid.to_string(); 185 let did_clone = self.did.clone(); 186 187 // Update the quarantine flag in the database 188 _ = self 189 .db 190 .get() 191 .await? 192 .interact(move |conn| { 193 diesel::update(blobs::table) 194 .filter(blobs::cid.eq(&cid_str)) 195 .filter(blobs::did.eq(&did_clone)) 196 .set(blobs::quarantined.eq(false)) 197 .execute(conn) 198 .context("Failed to unquarantine blob") 199 }) 200 .await 201 .expect("Failed to update unquarantine status")?; 202 203 Ok(()) 204 } 205 206 /// Get a blob as a stream 207 pub async fn get_object(&self, blob_cid: Cid) -> Result<ByteStream> { 208 use self::blobs::dsl::*; 209 210 let cid_str = blob_cid.to_string(); 211 let did_clone = self.did.clone(); 212 213 // Get the blob data from the database 214 let blob_data = self 215 .db 216 .get() 217 .await? 218 .interact(move |conn| { 219 blobs 220 .filter(self::blobs::cid.eq(&cid_str)) 221 .filter(did.eq(&did_clone)) 222 .filter(quarantined.eq(false)) 223 .select(data) 224 .first::<Vec<u8>>(conn) 225 .optional() 226 .context("Failed to query blob data") 227 }) 228 .await 229 .expect("Failed to get blob data")?; 230 231 if let Some(bytes) = blob_data { 232 Ok(ByteStream::new(bytes)) 233 } else { 234 anyhow::bail!("Blob not found: {}", blob_cid) 235 } 236 } 237 238 /// Get blob bytes 239 pub async fn get_bytes(&self, cid: Cid) -> Result<Vec<u8>> { 240 let stream = self.get_object(cid).await?; 241 stream.collect().await 242 } 243 244 /// Get a blob as a stream 245 pub async fn get_stream(&self, cid: Cid) -> Result<ByteStream> { 246 self.get_object(cid).await 247 } 248 249 /// Delete a blob by CID string 250 pub async fn delete(&self, blob_cid: String) -> Result<()> { 251 use self::blobs::dsl::*; 252 253 let did_clone = self.did.clone(); 254 255 // Delete from database 256 _ = self 257 .db 258 .get() 259 .await? 260 .interact(move |conn| { 261 diesel::delete(blobs) 262 .filter(self::blobs::cid.eq(&blob_cid)) 263 .filter(did.eq(&did_clone)) 264 .execute(conn) 265 .context("Failed to delete blob") 266 }) 267 .await 268 .expect("Failed to delete blob")?; 269 270 Ok(()) 271 } 272 273 /// Delete multiple blobs by CID 274 pub async fn delete_many(&self, cids: Vec<Cid>) -> Result<()> { 275 use self::blobs::dsl::*; 276 277 let cid_strings: Vec<String> = cids.into_iter().map(|c| c.to_string()).collect(); 278 let did_clone = self.did.clone(); 279 280 // Delete all blobs in one operation 281 _ = self 282 .db 283 .get() 284 .await? 285 .interact(move |conn| { 286 diesel::delete(blobs) 287 .filter(self::blobs::cid.eq_any(cid_strings)) 288 .filter(did.eq(&did_clone)) 289 .execute(conn) 290 .context("Failed to delete multiple blobs") 291 }) 292 .await 293 .expect("Failed to delete multiple blobs")?; 294 295 Ok(()) 296 } 297 298 /// Check if a blob is stored 299 pub async fn has_stored(&self, blob_cid: Cid) -> Result<bool> { 300 use self::blobs::dsl::*; 301 302 let cid_str = blob_cid.to_string(); 303 let did_clone = self.did.clone(); 304 305 let exists = self 306 .db 307 .get() 308 .await? 309 .interact(move |conn| { 310 diesel::select(diesel::dsl::exists( 311 blobs 312 .filter(self::blobs::cid.eq(&cid_str)) 313 .filter(did.eq(&did_clone)), 314 )) 315 .get_result::<bool>(conn) 316 .context("Failed to check if blob exists") 317 }) 318 .await 319 .expect("Failed to check blob existence")?; 320 321 Ok(exists) 322 } 323 324 /// Check if a temporary blob exists - now just checks if any blob exists with the key pattern 325 pub async fn has_temp(&self, key: String) -> Result<bool> { 326 // We don't have temporary blobs anymore, but for compatibility we'll check if 327 // there's a blob with a similar CID pattern 328 let temp_cid = Cid::try_from(format!("bafy{}", key)).unwrap_or_else(|_| Cid::default()); 329 self.has_stored(temp_cid).await 330 } 331}