Alternative ATProto PDS implementation

prototype actor_store sql_blob

Changed files
+38 -103
src
actor_store
+38 -103
src/actor_store/sql_blob.rs
··· 6 use anyhow::{Context, Result}; 7 use cidv10::Cid; 8 use diesel::prelude::*; 9 - use rsky_common::get_random_str; 10 use std::sync::Arc; 11 12 use crate::db::DbConn; ··· 45 size: i32, 46 mime_type: String, 47 quarantined: bool, 48 - temp: bool, 49 - temp_key: Option<String>, 50 } 51 52 // Table definition for blobs ··· 58 size -> Integer, 59 mime_type -> Text, 60 quarantined -> Bool, 61 - temp -> Bool, 62 - temp_key -> Nullable<Text>, 63 } 64 } 65 ··· 75 Box::new(move |did: String| BlobStoreSql::new(did, db_clone.clone())) 76 } 77 78 - /// Generate a random key for temporary blobs 79 - fn gen_key(&self) -> String { 80 - get_random_str() 81 - } 82 - 83 - /// Store a blob temporarily 84 pub async fn put_temp(&self, bytes: Vec<u8>) -> Result<String> { 85 - let key = self.gen_key(); 86 - let did_clone = self.did.clone(); 87 - let bytes_len = bytes.len() as i32; 88 89 - // Store in the database with temp flag 90 - let key_clone = key.clone(); 91 - self.db 92 - .run(move |conn| { 93 - let entry = BlobEntry { 94 - cid: "temp".to_string(), // Will be updated when made permanent 95 - did: did_clone, 96 - data: bytes, 97 - size: bytes_len, 98 - mime_type: "application/octet-stream".to_string(), // Will be updated when made permanent 99 - quarantined: false, 100 - temp: true, 101 - temp_key: Some(key_clone), 102 - }; 103 104 - diesel::insert_into(blobs::table) 105 - .values(&entry) 106 - .execute(conn) 107 - .context("Failed to insert temporary blob data") 108 - }) 109 - .await?; 110 - 111 Ok(key) 112 } 113 114 - /// Make a temporary blob permanent 115 - pub async fn make_permanent(&self, key: String, cid: Cid) -> Result<()> { 116 - let already_has = self.has_stored(cid).await?; 117 - if !already_has { 118 - let cid_str = cid.to_string(); 119 - let did_clone = self.did.clone(); 120 - 121 - // Update database record to make it permanent 122 - self.db 123 - .run(move |conn| { 124 - diesel::update(blobs::table) 125 - .filter(blobs::temp_key.eq(&key)) 126 - .filter(blobs::did.eq(&did_clone)) 127 - .set(( 128 - blobs::cid.eq(&cid_str), 129 - blobs::temp.eq(false), 130 - blobs::temp_key.eq::<Option<String>>(None), 131 - )) 132 - .execute(conn) 133 - .context("Failed to update blob to permanent status") 134 - }) 135 - .await?; 136 - 137 - Ok(()) 138 - } else { 139 - // Already exists, so delete the temporary one 140 - let did_clone = self.did.clone(); 141 - 142 - self.db 143 - .run(move |conn| { 144 - diesel::delete(blobs::table) 145 - .filter(blobs::temp_key.eq(&key)) 146 - .filter(blobs::did.eq(&did_clone)) 147 - .execute(conn) 148 - .context("Failed to delete redundant temporary blob") 149 - }) 150 - .await?; 151 - 152 - Ok(()) 153 - } 154 } 155 156 - /// Store a blob directly as permanent 157 - pub async fn put_permanent(&self, cid: Cid, bytes: Vec<u8>) -> Result<()> { 158 let cid_str = cid.to_string(); 159 let did_clone = self.did.clone(); 160 let bytes_len = bytes.len() as i32; ··· 168 did: did_clone.clone(), 169 data: bytes, 170 size: bytes_len, 171 - mime_type: "application/octet-stream".to_string(), // Could be improved with MIME detection 172 quarantined: false, 173 - temp: false, 174 - temp_key: None, 175 }; 176 177 diesel::insert_into(blobs::table) ··· 180 .do_update() 181 .set(blobs::data.eq(data_clone)) 182 .execute(conn) 183 - .context("Failed to insert permanent blob data") 184 }) 185 .await?; 186 187 Ok(()) 188 } 189 190 /// Quarantine a blob ··· 321 diesel::select(diesel::dsl::exists( 322 blobs 323 .filter(self::blobs::cid.eq(&cid_str)) 324 - .filter(did.eq(&did_clone)) 325 - .filter(temp.eq(false)), 326 )) 327 .get_result::<bool>(conn) 328 .context("Failed to check if blob exists") ··· 332 Ok(exists) 333 } 334 335 - /// Check if a temporary blob exists 336 pub async fn has_temp(&self, key: String) -> Result<bool> { 337 - use self::blobs::dsl::*; 338 - 339 - let did_clone = self.did.clone(); 340 - 341 - let exists = self 342 - .db 343 - .run(move |conn| { 344 - diesel::select(diesel::dsl::exists( 345 - blobs 346 - .filter(temp_key.eq(&key)) 347 - .filter(did.eq(&did_clone)) 348 - .filter(temp.eq(true)), 349 - )) 350 - .get_result::<bool>(conn) 351 - .context("Failed to check if temporary blob exists") 352 - }) 353 - .await?; 354 - 355 - Ok(exists) 356 } 357 }
··· 6 use anyhow::{Context, Result}; 7 use cidv10::Cid; 8 use diesel::prelude::*; 9 use std::sync::Arc; 10 11 use crate::db::DbConn; ··· 44 size: i32, 45 mime_type: String, 46 quarantined: bool, 47 } 48 49 // Table definition for blobs ··· 55 size -> Integer, 56 mime_type -> Text, 57 quarantined -> Bool, 58 } 59 } 60 ··· 70 Box::new(move |did: String| BlobStoreSql::new(did, db_clone.clone())) 71 } 72 73 + /// Store a blob temporarily - now just stores permanently with a key returned for API compatibility 74 pub async fn put_temp(&self, bytes: Vec<u8>) -> Result<String> { 75 + // Generate a unique key as a CID based on the data 76 + use sha2::{Digest, Sha256}; 77 + let digest = Sha256::digest(&bytes); 78 + let key = hex::encode(digest); 79 80 + // Just store the blob directly 81 + self.put_permanent_with_mime( 82 + Cid::try_from(format!("bafy{}", key)).unwrap_or_else(|_| Cid::default()), 83 + bytes, 84 + "application/octet-stream".to_string(), 85 + ) 86 + .await?; 87 88 + // Return the key for API compatibility 89 Ok(key) 90 } 91 92 + /// Make a temporary blob permanent - just a no-op for API compatibility 93 + pub async fn make_permanent(&self, _key: String, _cid: Cid) -> Result<()> { 94 + // No-op since we don't have temporary blobs anymore 95 + Ok(()) 96 } 97 98 + /// Store a blob with specific mime type 99 + pub async fn put_permanent_with_mime( 100 + &self, 101 + cid: Cid, 102 + bytes: Vec<u8>, 103 + mime_type: String, 104 + ) -> Result<()> { 105 let cid_str = cid.to_string(); 106 let did_clone = self.did.clone(); 107 let bytes_len = bytes.len() as i32; ··· 115 did: did_clone.clone(), 116 data: bytes, 117 size: bytes_len, 118 + mime_type, 119 quarantined: false, 120 }; 121 122 diesel::insert_into(blobs::table) ··· 125 .do_update() 126 .set(blobs::data.eq(data_clone)) 127 .execute(conn) 128 + .context("Failed to insert blob data") 129 }) 130 .await?; 131 132 Ok(()) 133 + } 134 + 135 + /// Store a blob directly as permanent 136 + pub async fn put_permanent(&self, cid: Cid, bytes: Vec<u8>) -> Result<()> { 137 + self.put_permanent_with_mime(cid, bytes, "application/octet-stream".to_string()) 138 + .await 139 } 140 141 /// Quarantine a blob ··· 272 diesel::select(diesel::dsl::exists( 273 blobs 274 .filter(self::blobs::cid.eq(&cid_str)) 275 + .filter(did.eq(&did_clone)), 276 )) 277 .get_result::<bool>(conn) 278 .context("Failed to check if blob exists") ··· 282 Ok(exists) 283 } 284 285 + /// Check if a temporary blob exists - now just checks if any blob exists with the key pattern 286 pub async fn has_temp(&self, key: String) -> Result<bool> { 287 + // We don't have temporary blobs anymore, but for compatibility we'll check if 288 + // there's a blob with a similar CID pattern 289 + let temp_cid = Cid::try_from(format!("bafy{}", key)).unwrap_or_else(|_| Cid::default()); 290 + self.has_stored(temp_cid).await 291 } 292 }