Alternative ATProto PDS implementation

lint

+25 -21
src/account_manager/helpers/account.rs
···
     let AvailabilityFlags {
         include_taken_down,
         include_deactivated,
-    } = flags.unwrap_or_else(|| AvailabilityFlags {
+    } = flags.unwrap_or(AvailabilityFlags {
         include_taken_down: Some(false),
         include_deactivated: Some(false),
     });
···
     use rsky_pds::schema::pds::repo_root::dsl as RepoRootSchema;

     let did = did.to_owned();
-    db.get()
+    _ = db
+        .get()
         .await?
         .interact(move |conn| {
-            delete(RepoRootSchema::repo_root)
+            _ = delete(RepoRootSchema::repo_root)
                 .filter(RepoRootSchema::did.eq(&did))
                 .execute(conn)?;
-            delete(EmailTokenSchema::email_token)
+            _ = delete(EmailTokenSchema::email_token)
                 .filter(EmailTokenSchema::did.eq(&did))
                 .execute(conn)?;
-            delete(RefreshTokenSchema::refresh_token)
+            _ = delete(RefreshTokenSchema::refresh_token)
                 .filter(RefreshTokenSchema::did.eq(&did))
                 .execute(conn)?;
-            delete(AccountSchema::account)
+            _ = delete(AccountSchema::account)
                 .filter(AccountSchema::did.eq(&did))
                 .execute(conn)?;
             delete(ActorSchema::actor)
···
     >,
 ) -> Result<()> {
     let takedown_ref: Option<String> = match takedown.applied {
-        true => match takedown.r#ref {
-            Some(takedown_ref) => Some(takedown_ref),
-            None => Some(rsky_common::now()),
-        },
+        true => takedown
+            .r#ref
+            .map_or_else(|| Some(rsky_common::now()), Some),
         false => None,
     };
     let did = did.to_owned();
-    db.get()
+    _ = db
+        .get()
         .await?
         .interact(move |conn| {
             update(ActorSchema::actor)
···
     >,
 ) -> Result<()> {
     let did = did.to_owned();
-    db.get()
+    _ = db
+        .get()
         .await?
         .interact(move |conn| {
             update(ActorSchema::actor)
···
     >,
 ) -> Result<()> {
     let did = did.to_owned();
-    db.get()
+    _ = db
+        .get()
         .await?
         .interact(move |conn| {
             update(ActorSchema::actor)
···
     >,
 ) -> Result<()> {
     let did = did.to_owned();
-    db.get()
+    _ = db
+        .get()
         .await?
         .interact(move |conn| {
             update(AccountSchema::account)
···
             match res {
                 None => Ok(None),
                 Some(res) => {
-                    let takedown = match res.0 {
-                        Some(takedown_ref) => StatusAttr {
-                            applied: true,
-                            r#ref: Some(takedown_ref),
-                        },
-                        None => StatusAttr {
+                    let takedown = res.0.map_or(
+                        StatusAttr {
                             applied: false,
                             r#ref: None,
                         },
-                    };
+                        |takedown_ref| StatusAttr {
+                            applied: true,
+                            r#ref: Some(takedown_ref),
+                        },
+                    );
                     let deactivated = match res.1 {
                         Some(_) => StatusAttr {
                             applied: true,
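The two recurring patterns in this file set the tone for the rest of the lint pass: `unwrap_or(value)` replaces `unwrap_or_else(|| value)` when the default is a ready-made value, and `_ = expr;` explicitly discards a `#[must_use]` result such as the row count returned by Diesel's `execute`. A minimal std-only sketch of both patterns (the names here are illustrative, not the project's types):

    fn main() {
        // unwrap_or: the default is a plain value, so no closure is needed.
        let flags: Option<(bool, bool)> = None;
        let (include_taken_down, include_deactivated) = flags.unwrap_or((false, false));

        // `_ =` discards a #[must_use] value (here an Option) on purpose,
        // which keeps the unused-result style lints quiet.
        let mut dids = vec!["did:plc:abc", "did:plc:def"];
        _ = dids.pop();

        println!("{include_taken_down} {include_deactivated} {dids:?}");
    }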
+21 -18
src/account_manager/helpers/auth.rs
···

     let exp = from_micros_to_utc((payload.exp.as_millis() / 1000) as i64);

-    db.get()
+    _ = db
+        .get()
         .await?
         .interact(move |conn| {
             insert_into(RefreshTokenSchema::refresh_token)
···
     db.get()
         .await?
         .interact(move |conn| {
-            delete(RefreshTokenSchema::refresh_token)
+            _ = delete(RefreshTokenSchema::refresh_token)
                 .filter(RefreshTokenSchema::did.eq(did))
                 .filter(RefreshTokenSchema::expiresAt.le(now))
                 .execute(conn)?;
···
             } = opts;
             use rsky_pds::schema::pds::refresh_token::dsl as RefreshTokenSchema;

-            update(RefreshTokenSchema::refresh_token)
-                .filter(RefreshTokenSchema::id.eq(id))
-                .filter(
-                    RefreshTokenSchema::nextId
-                        .is_null()
-                        .or(RefreshTokenSchema::nextId.eq(&next_id)),
-                )
-                .set((
-                    RefreshTokenSchema::expiresAt.eq(expires_at),
-                    RefreshTokenSchema::nextId.eq(&next_id),
-                ))
-                .returning(models::RefreshToken::as_select())
-                .get_results(conn)
-                .map_err(|error| {
-                    anyhow::Error::new(AuthHelperError::ConcurrentRefresh).context(error)
-                })?;
+            drop(
+                update(RefreshTokenSchema::refresh_token)
+                    .filter(RefreshTokenSchema::id.eq(id))
+                    .filter(
+                        RefreshTokenSchema::nextId
+                            .is_null()
+                            .or(RefreshTokenSchema::nextId.eq(&next_id)),
+                    )
+                    .set((
+                        RefreshTokenSchema::expiresAt.eq(expires_at),
+                        RefreshTokenSchema::nextId.eq(&next_id),
+                    ))
+                    .returning(models::RefreshToken::as_select())
+                    .get_results(conn)
+                    .map_err(|error| {
+                        anyhow::Error::new(AuthHelperError::ConcurrentRefresh).context(error)
+                    })?,
+            );
             Ok(())
         })
         .await
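Here the discarded value is the `Vec` of rows returned by `get_results`, so the change wraps the whole query in `drop(...)` while keeping the `?` that still propagates the `ConcurrentRefresh` error. A sketch of the shape, with `fetch_rows` standing in for the real Diesel call:

    fn fetch_rows() -> Result<Vec<String>, String> {
        Ok(vec!["refresh_token_row".to_owned()])
    }

    fn rotate() -> Result<(), String> {
        // The rows themselves are not needed; drop() makes that explicit,
        // and `?` still returns early if the query fails.
        drop(fetch_rows()?);
        Ok(())
    }

    fn main() {
        rotate().expect("rotate failed");
    }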
+23 -20
src/account_manager/helpers/email_token.rs
···
 //! blacksky-algorithms/rsky is licensed under the Apache License 2.0
 //!
 //! Modified for SQLite backend
+#![allow(unnameable_types, unused_qualifications)]
 use anyhow::{Result, bail};
 use diesel::*;
 use rsky_common::time::{MINUTE, from_str_to_utc, less_than_ago_s};
···
     db.get()
         .await?
         .interact(move |conn| {
-            insert_into(EmailTokenSchema::email_token)
+            _ = insert_into(EmailTokenSchema::email_token)
                 .values((
                     EmailTokenSchema::purpose.eq(purpose),
                     EmailTokenSchema::did.eq(did),
···
 }

 impl EmailTokenPurpose {
-    pub fn as_str(&self) -> &'static str {
+    pub const fn as_str(&self) -> &'static str {
         match self {
-            EmailTokenPurpose::ConfirmEmail => "confirm_email",
-            EmailTokenPurpose::UpdateEmail => "update_email",
-            EmailTokenPurpose::ResetPassword => "reset_password",
-            EmailTokenPurpose::DeleteAccount => "delete_account",
-            EmailTokenPurpose::PlcOperation => "plc_operation",
+            Self::ConfirmEmail => "confirm_email",
+            Self::UpdateEmail => "update_email",
+            Self::ResetPassword => "reset_password",
+            Self::DeleteAccount => "delete_account",
+            Self::PlcOperation => "plc_operation",
         }
     }

     pub fn from_str(s: &str) -> Result<Self> {
         match s {
-            "confirm_email" => Ok(EmailTokenPurpose::ConfirmEmail),
-            "update_email" => Ok(EmailTokenPurpose::UpdateEmail),
-            "reset_password" => Ok(EmailTokenPurpose::ResetPassword),
-            "delete_account" => Ok(EmailTokenPurpose::DeleteAccount),
-            "plc_operation" => Ok(EmailTokenPurpose::PlcOperation),
+            "confirm_email" => Ok(Self::ConfirmEmail),
+            "update_email" => Ok(Self::UpdateEmail),
+            "reset_password" => Ok(Self::ResetPassword),
+            "delete_account" => Ok(Self::DeleteAccount),
+            "plc_operation" => Ok(Self::PlcOperation),
             _ => bail!("Unable to parse as EmailTokenPurpose: `{s:?}`"),
         }
     }
···
     type Row = String;

     fn build(s: String) -> deserialize::Result<Self> {
-        Ok(EmailTokenPurpose::from_str(&s)?)
+        Ok(Self::from_str(&s)?)
     }
 }
···
     ) -> serialize::Result {
         serialize::ToSql::<sql_types::Text, sqlite::Sqlite>::to_sql(
             match self {
-                EmailTokenPurpose::ConfirmEmail => "confirm_email",
-                EmailTokenPurpose::UpdateEmail => "update_email",
-                EmailTokenPurpose::ResetPassword => "reset_password",
-                EmailTokenPurpose::DeleteAccount => "delete_account",
-                EmailTokenPurpose::PlcOperation => "plc_operation",
+                Self::ConfirmEmail => "confirm_email",
+                Self::UpdateEmail => "update_email",
+                Self::ResetPassword => "reset_password",
+                Self::DeleteAccount => "delete_account",
+                Self::PlcOperation => "plc_operation",
             },
             out,
         )
···
 ) -> Result<()> {
     use rsky_pds::schema::pds::email_token::dsl as EmailTokenSchema;
     let did = did.to_owned();
-    db.get()
+    _ = db
+        .get()
         .await?
         .interact(move |conn| {
             delete(EmailTokenSchema::email_token)
···
     use rsky_pds::schema::pds::email_token::dsl as EmailTokenSchema;

     let did = did.to_owned();
-    db.get()
+    _ = db
+        .get()
         .await?
         .interact(move |conn| {
             delete(EmailTokenSchema::email_token)
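Beyond the file-level `#![allow(...)]`, the edits here follow clippy's `use_self` style (refer to the enum as `Self` inside its own impl) and mark a pure accessor as `const fn`. A small illustrative enum showing both (not the project's actual type):

    #[derive(Clone, Copy)]
    enum Purpose {
        ConfirmEmail,
        ResetPassword,
    }

    impl Purpose {
        // const fn: callable in const contexts, and a signal that the body
        // does nothing but match on `self`.
        const fn as_str(self) -> &'static str {
            match self {
                Self::ConfirmEmail => "confirm_email",
                Self::ResetPassword => "reset_password",
            }
        }
    }

    fn main() {
        assert_eq!(Purpose::ConfirmEmail.as_str(), "confirm_email");
    }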
+27 -17
src/account_manager/helpers/invite.rs
···
             .first(conn)
             .optional()?;

-        if invite.is_none() || invite.clone().unwrap().disabled > 0 {
-            bail!("InvalidInviteCode: None or disabled. Provided invite code not available `{invite_code:?}`")
-        }
+        if let Some(invite) = invite {
+            if invite.disabled > 0 {
+                bail!("InvalidInviteCode: Disabled. Provided invite code not available `{invite_code:?}`");
+            }

-        let uses: i64 = InviteCodeUseSchema::invite_code_use
-            .count()
-            .filter(InviteCodeUseSchema::code.eq(&invite_code))
-            .first(conn)?;
+            let uses: i64 = InviteCodeUseSchema::invite_code_use
+                .count()
+                .filter(InviteCodeUseSchema::code.eq(&invite_code))
+                .first(conn)?;

-        if invite.unwrap().available_uses as i64 <= uses {
-            bail!("InvalidInviteCode: Not enough uses. Provided invite code not available `{invite_code:?}`")
+            if invite.available_uses as i64 <= uses {
+                bail!("InvalidInviteCode: Not enough uses. Provided invite code not available `{invite_code:?}`");
+            }
+        } else {
+            bail!("InvalidInviteCode: None. Provided invite code not available `{invite_code:?}`");
         }
+
         Ok(())
     }).await.expect("Failed to check invite code availability")?;
···
     if let Some(invite_code) = invite_code {
         use rsky_pds::schema::pds::invite_code_use::dsl as InviteCodeUseSchema;

-        db.get()
+        _ = db
+            .get()
             .await?
             .interact(move |conn| {
                 insert_into(InviteCodeUseSchema::invite_code_use)
···
     use rsky_pds::schema::pds::invite_code::dsl as InviteCodeSchema;
     let created_at = rsky_common::now();

-    db.get()
+    _ = db
+        .get()
         .await?
         .interact(move |conn| {
             let rows: Vec<models::InviteCode> = to_create
···
                 })
                 .collect();

-            insert_into(InviteCodeSchema::invite_code)
+            _ = insert_into(InviteCodeSchema::invite_code)
                 .values(&rows)
                 .execute(conn)?;

···
         } = invite_code_use;
         match uses.get_mut(&code) {
             None => {
-                uses.insert(code, vec![CodeUse { used_by, used_at }]);
+                drop(uses.insert(code, vec![CodeUse { used_by, used_at }]));
             }
             Some(matched_uses) => matched_uses.push(CodeUse { used_by, used_at }),
         };
···
             BTreeMap::new(),
             |mut acc: BTreeMap<String, CodeDetail>, cur| {
                 for code_use in &cur.uses {
-                    acc.insert(code_use.used_by.clone(), cur.clone());
+                    drop(acc.insert(code_use.used_by.clone(), cur.clone()));
                 }
                 acc
             },
···

     let disabled: i16 = if disabled { 1 } else { 0 };
     let did = did.to_owned();
-    db.get()
+    _ = db
+        .get()
         .await?
         .interact(move |conn| {
             update(AccountSchema::account)
···

     let DisableInviteCodesOpts { codes, accounts } = opts;
     if !codes.is_empty() {
-        db.get()
+        _ = db
+            .get()
             .await?
             .interact(move |conn| {
                 update(InviteCodeSchema::invite_code)
···
             .expect("Failed to disable invite codes")?;
     }
     if !accounts.is_empty() {
-        db.get()
+        _ = db
+            .get()
             .await?
             .interact(move |conn| {
                 update(InviteCodeSchema::invite_code)
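The interesting change here is control flow: binding the `Option` once with `if let Some(invite)` removes the `invite.clone().unwrap()` / `invite.unwrap()` calls and lets the three failure cases (missing, disabled, exhausted) report distinct error messages. A std-only sketch of the same shape (`Invite` is illustrative, not the Diesel model):

    struct Invite {
        disabled: bool,
        available_uses: i64,
    }

    fn check(invite: Option<Invite>, uses: i64) -> Result<(), String> {
        if let Some(invite) = invite {
            if invite.disabled {
                return Err("invite code disabled".to_owned());
            }
            if invite.available_uses <= uses {
                return Err("invite code has no uses left".to_owned());
            }
        } else {
            return Err("invite code not found".to_owned());
        }
        Ok(())
    }

    fn main() {
        let invite = Invite { disabled: false, available_uses: 5 };
        assert!(check(Some(invite), 1).is_ok());
        assert!(check(None, 0).is_err());
    }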
+2 -2
src/account_manager/helpers/password.rs
···
     db.get()
         .await?
         .interact(move |conn| {
-            update(AccountSchema::account)
+            _ = update(AccountSchema::account)
                 .filter(AccountSchema::did.eq(opts.did))
                 .set(AccountSchema::password.eq(opts.password_encrypted))
                 .execute(conn)?;
···
     db.get()
         .await?
         .interact(move |conn| {
-            delete(AppPasswordSchema::app_password)
+            _ = delete(AppPasswordSchema::app_password)
                 .filter(AppPasswordSchema::did.eq(did))
                 .filter(AppPasswordSchema::name.eq(name))
                 .execute(conn)?;
+2 -1
src/account_manager/helpers/repo.rs
···

     let now = rsky_common::now();

-    db.get()
+    _ = db
+        .get()
         .await?
         .interact(move |conn| {
             insert_into(RepoRootSchema::repo_root)
+8 -7
src/account_manager/mod.rs
···
 >;

 impl AccountManager {
-    pub fn new(
+    pub const fn new(
         db: deadpool_diesel::Pool<
             deadpool_diesel::Manager<SqliteConnection>,
             deadpool_diesel::sqlite::Object,
···
                 deadpool_diesel::Manager<SqliteConnection>,
                 deadpool_diesel::sqlite::Object,
             >|
-             -> AccountManager { AccountManager::new(db) },
+             -> Self { Self::new(db) },
         )
     }

···
         let (access_jwt, refresh_jwt) = auth::create_tokens(CreateTokensOpts {
             did: did.clone(),
             jwt_key,
-            service_did: env::var("PDS_SERVICE_DID").unwrap(),
+            service_did: env::var("PDS_SERVICE_DID").expect("PDS_SERVICE_DID not set"),
             scope: Some(AuthScope::Access),
             jti: None,
             expires_in: None,
···
         let (access_jwt, refresh_jwt) = auth::create_tokens(CreateTokensOpts {
             did,
             jwt_key,
-            service_did: env::var("PDS_SERVICE_DID").unwrap(),
+            service_did: env::var("PDS_SERVICE_DID").expect("PDS_SERVICE_DID not set"),
             scope: Some(scope),
             jti: None,
             expires_in: None,
···
         let next_id = token.next_id.unwrap_or_else(auth::get_refresh_token_id);

         let secp = Secp256k1::new();
-        let private_key = env::var("PDS_JWT_KEY_K256_PRIVATE_KEY_HEX").unwrap();
+        let private_key = env::var("PDS_JWT_KEY_K256_PRIVATE_KEY_HEX")
+            .expect("PDS_JWT_KEY_K256_PRIVATE_KEY_HEX not set");
         let secret_key =
-            SecretKey::from_slice(&hex::decode(private_key.as_bytes()).unwrap()).unwrap();
+            SecretKey::from_slice(&hex::decode(private_key.as_bytes()).expect("Invalid key"))?;
         let jwt_key = Keypair::from_secret_key(&secp, &secret_key);

         let (access_jwt, refresh_jwt) = auth::create_tokens(CreateTokensOpts {
             did: token.did,
             jwt_key,
-            service_did: env::var("PDS_SERVICE_DID").unwrap(),
+            service_did: env::var("PDS_SERVICE_DID").expect("PDS_SERVICE_DID not set"),
             scope: Some(if token.app_password_name.is_none() {
                 AuthScope::Access
             } else {
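The `.unwrap()` calls on environment variables become `.expect("...")`, so a missing setting names itself in the panic message, and the doubly-unwrapped key parse now propagates one failure with `?`. A tiny sketch of the difference (the variable name comes from the diff; the fallback value is made up so the example runs anywhere):

    use std::env;

    fn main() {
        // The shape used in the diff (panics with a named cause if unset):
        //     env::var("PDS_SERVICE_DID").expect("PDS_SERVICE_DID not set")
        // For this sketch we fall back instead of panicking.
        let service_did = env::var("PDS_SERVICE_DID")
            .unwrap_or_else(|_| "did:web:pds.example.invalid".to_owned());
        println!("service DID: {service_did}");
    }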
+61 -56
src/actor_store/blob.rs
···
         deadpool_diesel::sqlite::Object,
     >,
 ) -> Self {
-    BlobReader {
+    Self {
         did: blobstore.did.clone(),
         blobstore,
         db,
···
             size,
             cid,
             mime_type,
-            width: if let Some(ref info) = img_info {
-                Some(info.width as i32)
-            } else {
-                None
-            },
+            width: img_info.as_ref().map(|info| info.width as i32),
             height: if let Some(info) = img_info {
                 Some(info.height as i32)
             } else {
···
                 SET \"tempKey\" = EXCLUDED.\"tempKey\" \
                 WHERE pds.blob.\"tempKey\" is not null;");
             #[expect(trivial_casts)]
-            upsert
+            let _ = upsert
                 .bind::<Text, _>(&cid.to_string())
                 .bind::<Text, _>(&did)
                 .bind::<Text, _>(&mime_type)
                 .bind::<Integer, _>(size as i32)
-                .bind::<Nullable<Text>, _>(Some(temp_key.clone()))
+                .bind::<Nullable<Text>, _>(Some(temp_key))
                 .bind::<Nullable<Integer>, _>(width)
                 .bind::<Nullable<Integer>, _>(height)
                 .bind::<Text, _>(created_at)
···
     pub async fn process_write_blobs(&self, writes: Vec<PreparedWrite>) -> Result<()> {
         self.delete_dereferenced_blobs(writes.clone()).await?;

-        let _ = stream::iter(writes)
-            .then(|write| async move {
-                Ok::<(), anyhow::Error>(match write {
-                    PreparedWrite::Create(w) => {
-                        for blob in w.blobs {
-                            self.verify_blob_and_make_permanent(blob.clone()).await?;
-                            self.associate_blob(blob, w.uri.clone()).await?;
+        drop(
+            stream::iter(writes)
+                .then(async move |write| {
+                    match write {
+                        PreparedWrite::Create(w) => {
+                            for blob in w.blobs {
+                                self.verify_blob_and_make_permanent(blob.clone()).await?;
+                                self.associate_blob(blob, w.uri.clone()).await?;
+                            }
                         }
-                    }
-                    PreparedWrite::Update(w) => {
-                        for blob in w.blobs {
-                            self.verify_blob_and_make_permanent(blob.clone()).await?;
-                            self.associate_blob(blob, w.uri.clone()).await?;
+                        PreparedWrite::Update(w) => {
+                            for blob in w.blobs {
+                                self.verify_blob_and_make_permanent(blob.clone()).await?;
+                                self.associate_blob(blob, w.uri.clone()).await?;
+                            }
                         }
-                    }
-                    _ => (),
+                        _ => (),
+                    };
+                    Ok::<(), anyhow::Error>(())
                 })
-            })
-            .collect::<Vec<_>>()
-            .await
-            .into_iter()
-            .collect::<Result<Vec<_>, _>>()?;
+                .collect::<Vec<_>>()
+                .await
+                .into_iter()
+                .collect::<Result<Vec<_>, _>>()?,
+        );

         Ok(())
     }
···

         // Now perform the delete
         let uris_clone = uris.clone();
-        self.db
+        _ = self
+            .db
             .get()
             .await?
             .interact(move |conn| {
···
         // Delete from the blob table
         let cids = cids_to_delete.clone();
         let did_clone = self.did.clone();
-        self.db
+        _ = self
+            .db
             .get()
             .await?
             .interact(move |conn| {
···

         // Delete from blob storage
         // Ideally we'd use a background queue here, but for now:
-        let _ = stream::iter(cids_to_delete)
-            .then(|cid| async move {
-                match Cid::from_str(&cid) {
+        drop(
+            stream::iter(cids_to_delete)
+                .then(async move |cid| match Cid::from_str(&cid) {
                     Ok(cid) => self.blobstore.delete(cid.to_string()).await,
                     Err(e) => Err(anyhow::Error::new(e)),
-                }
-            })
-            .collect::<Vec<_>>()
-            .await
-            .into_iter()
-            .collect::<Result<Vec<_>, _>>()?;
+                })
+                .collect::<Vec<_>>()
+                .await
+                .into_iter()
+                .collect::<Result<Vec<_>, _>>()?,
+        );

         Ok(())
     }
···
                 .make_permanent(temp_key.clone(), blob.cid)
                 .await?;
         }
-        self.db
+        _ = self
+            .db
             .get()
             .await?
             .interact(move |conn| {
···
         let cid = blob.cid.to_string();
         let did = self.did.clone();

-        self.db
+        _ = self
+            .db
             .get()
             .await?
             .interact(move |conn| {
···

                 match res {
                     None => Ok(None),
-                    Some(res) => match res.takedown_ref {
-                        None => Ok(Some(StatusAttr {
-                            applied: false,
-                            r#ref: None,
-                        })),
-                        Some(takedown_ref) => Ok(Some(StatusAttr {
-                            applied: true,
-                            r#ref: Some(takedown_ref),
-                        })),
-                    },
+                    Some(res) => res.takedown_ref.map_or_else(
+                        || {
+                            Ok(Some(StatusAttr {
+                                applied: false,
+                                r#ref: None,
+                            }))
+                        },
+                        |takedown_ref| {
+                            Ok(Some(StatusAttr {
+                                applied: true,
+                                r#ref: Some(takedown_ref),
+                            }))
+                        },
+                    ),
                 }
             })
             .await
···
         use rsky_pds::schema::pds::blob::dsl as BlobSchema;

         let takedown_ref: Option<String> = match takedown.applied {
-            true => match takedown.r#ref {
-                Some(takedown_ref) => Some(takedown_ref),
-                None => Some(now()),
-            },
+            true => takedown.r#ref.map_or_else(|| Some(now()), Some),
             false => None,
         };

         let blob_cid = blob.to_string();
         let did_clone = self.did.clone();

-        self.db
+        _ = self
+            .db
             .get()
             .await?
             .interact(move |conn| {
-                update(BlobSchema::blob)
+                _ = update(BlobSchema::blob)
                     .filter(BlobSchema::cid.eq(blob_cid))
                     .filter(BlobSchema::did.eq(did_clone))
                     .set(BlobSchema::takedownRef.eq(takedown_ref))
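Two Option rewrites dominate this file: `img_info.as_ref().map(...)` replaces an `if let` that only wrapped a field in `Some`, and `map_or_else(default_fn, map_fn)` replaces a two-arm `match`. (The `.then(async move |write| ...)` form also relies on async closures, which as of this writing need Rust 1.85 or later.) A std-only sketch of the Option parts, with illustrative types:

    struct ImgInfo { width: u32, height: u32 }

    fn dims(info: Option<&ImgInfo>) -> (Option<i32>, Option<i32>) {
        // map(): only transform the value if it is present.
        (
            info.map(|i| i.width as i32),
            info.map(|i| i.height as i32),
        )
    }

    fn status(takedown_ref: Option<String>) -> (bool, Option<String>) {
        // map_or_else(): one branch for None, one for Some, no match needed.
        takedown_ref.map_or_else(|| (false, None), |r| (true, Some(r)))
    }

    fn main() {
        let info = ImgInfo { width: 640, height: 480 };
        assert_eq!(dims(Some(&info)), (Some(640), Some(480)));
        assert_eq!(status(None), (false, None));
    }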
+97 -68
src/actor_store/mod.rs
···
         >,
         conn: deadpool_diesel::sqlite::Object,
     ) -> Self {
-        ActorStore {
+        Self {
             storage: Arc::new(RwLock::new(SqlRepoReader::new(did.clone(), None, conn))),
             record: RecordReader::new(did.clone(), db.clone()),
             pref: PreferenceReader::new(did.clone(), db.clone()),
             did,
-            blob: BlobReader::new(blobstore, db.clone()),
+            blob: BlobReader::new(blobstore, db),
         }
     }
···
             Some(write_ops),
         )
         .await?;
-        let storage_guard = self.storage.read().await;
-        storage_guard.apply_commit(commit.clone(), None).await?;
+        self.storage
+            .read()
+            .await
+            .apply_commit(commit.clone(), None)
+            .await?;
         let writes = writes
             .into_iter()
             .map(PreparedWrite::Create)
···
             Some(write_ops),
         )
         .await?;
-        let storage_guard = self.storage.read().await;
-        storage_guard.apply_commit(commit.clone(), None).await?;
+        self.storage
+            .read()
+            .await
+            .apply_commit(commit.clone(), None)
+            .await?;
         let write_commit_ops = writes.iter().try_fold(
             Vec::with_capacity(writes.len()),
             |mut acc, w| -> Result<Vec<CommitOp>> {
···
                 acc.push(CommitOp {
                     action: CommitAction::Create,
                     path: format_data_key(aturi.get_collection(), aturi.get_rkey()),
-                    cid: Some(w.cid.clone()),
+                    cid: Some(w.cid),
                     prev: None,
                 });
                 Ok(acc)
···
             .await?;
         }
         // persist the commit to repo storage
-        let storage_guard = self.storage.read().await;
-        storage_guard.apply_commit(commit.clone(), None).await?;
+        self.storage
+            .read()
+            .await
+            .apply_commit(commit.clone(), None)
+            .await?;
         // process blobs
         self.blob.process_write_blobs(writes).await?;
         Ok(())
···
             .await?;
         }
         // persist the commit to repo storage
-        let storage_guard = self.storage.read().await;
-        storage_guard
+        self.storage
+            .read()
+            .await
             .apply_commit(commit.commit_data.clone(), None)
             .await?;
         // process blobs
···
     }

     pub async fn get_sync_event_data(&mut self) -> Result<SyncEvtData> {
-        let storage_guard = self.storage.read().await;
-        let current_root = storage_guard.get_root_detailed().await?;
-        let blocks_and_missing = storage_guard.get_blocks(vec![current_root.cid]).await?;
+        let current_root = self.storage.read().await.get_root_detailed().await?;
+        let blocks_and_missing = self
+            .storage
+            .read()
+            .await
+            .get_blocks(vec![current_root.cid])
+            .await?;
         Ok(SyncEvtData {
             cid: current_root.cid,
             rev: current_root.rev,
···
             }
         }
         {
-            let mut storage_guard = self.storage.write().await;
-            storage_guard.cache_rev(current_root.rev).await?;
+            self.storage
+                .write()
+                .await
+                .cache_rev(current_root.rev)
+                .await?;
         }
         let mut new_record_cids: Vec<Cid> = vec![];
         let mut delete_and_update_uris = vec![];
···
                     cid,
                     prev: None,
                 };
-                if let Some(_) = current_record {
+                if current_record.is_some() {
                     op.prev = current_record;
                 };
                 commit_ops.push(op);
···
             .collect::<Result<Vec<RecordWriteOp>>>()?;
         // @TODO: Use repo signing key global config
         let secp = Secp256k1::new();
-        let repo_private_key = env::var("PDS_REPO_SIGNING_KEY_K256_PRIVATE_KEY_HEX").unwrap();
-        let repo_secret_key =
-            SecretKey::from_slice(&hex::decode(repo_private_key.as_bytes()).unwrap()).unwrap();
+        let repo_private_key = env::var("PDS_REPO_SIGNING_KEY_K256_PRIVATE_KEY_HEX")
+            .expect("PDS_REPO_SIGNING_KEY_K256_PRIVATE_KEY_HEX not set");
+        let repo_secret_key = SecretKey::from_slice(
+            &hex::decode(repo_private_key.as_bytes()).expect("Failed to decode hex"),
+        )
+        .expect("Failed to create secret key from hex");
         let repo_signing_key = Keypair::from_secret_key(&secp, &repo_secret_key);

         let mut commit = repo
···
     pub async fn index_writes(&self, writes: Vec<PreparedWrite>, rev: &str) -> Result<()> {
         let now: &str = &rsky_common::now();

-        let _ = stream::iter(writes)
-            .then(|write| async move {
-                Ok::<(), anyhow::Error>(match write {
-                    PreparedWrite::Create(write) => {
-                        let write_at_uri: AtUri = write.uri.try_into()?;
-                        self.record
-                            .index_record(
-                                write_at_uri.clone(),
-                                write.cid,
-                                Some(write.record),
-                                Some(write.action),
-                                rev.to_owned(),
-                                Some(now.to_string()),
-                            )
-                            .await?
-                    }
-                    PreparedWrite::Update(write) => {
-                        let write_at_uri: AtUri = write.uri.try_into()?;
-                        self.record
-                            .index_record(
-                                write_at_uri.clone(),
-                                write.cid,
-                                Some(write.record),
-                                Some(write.action),
-                                rev.to_owned(),
-                                Some(now.to_string()),
-                            )
-                            .await?
-                    }
-                    PreparedWrite::Delete(write) => {
-                        let write_at_uri: AtUri = write.uri.try_into()?;
-                        self.record.delete_record(&write_at_uri).await?
+        drop(
+            stream::iter(writes)
+                .then(async move |write| {
+                    match write {
+                        PreparedWrite::Create(write) => {
+                            let write_at_uri: AtUri = write.uri.try_into()?;
+                            self.record
+                                .index_record(
+                                    write_at_uri.clone(),
+                                    write.cid,
+                                    Some(write.record),
+                                    Some(write.action),
+                                    rev.to_owned(),
+                                    Some(now.to_owned()),
+                                )
+                                .await?;
+                        }
+                        PreparedWrite::Update(write) => {
+                            let write_at_uri: AtUri = write.uri.try_into()?;
+                            self.record
+                                .index_record(
+                                    write_at_uri.clone(),
+                                    write.cid,
+                                    Some(write.record),
+                                    Some(write.action),
+                                    rev.to_owned(),
+                                    Some(now.to_owned()),
+                                )
+                                .await?;
+                        }
+                        PreparedWrite::Delete(write) => {
+                            let write_at_uri: AtUri = write.uri.try_into()?;
+                            self.record.delete_record(&write_at_uri).await?;
+                        }
                     }
-                }
+                    Ok::<(), anyhow::Error>(())
                 })
-            })
-            .collect::<Vec<_>>()
-            .await
-            .into_iter()
-            .collect::<Result<Vec<_>, _>>()?;
+                .collect::<Vec<_>>()
+                .await
+                .into_iter()
+                .collect::<Result<Vec<_>, _>>()?,
+        );
         Ok(())
     }

     pub async fn destroy(&mut self) -> Result<()> {
         let did: String = self.did.clone();
-        let storage_guard = self.storage.read().await;
         use rsky_pds::schema::pds::blob::dsl as BlobSchema;

-        let blob_rows: Vec<String> = storage_guard
+        let blob_rows: Vec<String> = self
+            .storage
+            .read()
+            .await
             .db
             .interact(move |conn| {
                 BlobSchema::blob
···
             .into_iter()
             .map(|row| Ok(Cid::from_str(&row)?))
             .collect::<Result<Vec<Cid>>>()?;
-        let _ = stream::iter(cids.chunks(500))
-            .then(|chunk| async { self.blob.blobstore.delete_many(chunk.to_vec()).await })
-            .collect::<Vec<_>>()
-            .await
-            .into_iter()
-            .collect::<Result<Vec<_>, _>>()?;
+        drop(
+            stream::iter(cids.chunks(500))
+                .then(|chunk| async { self.blob.blobstore.delete_many(chunk.to_vec()).await })
+                .collect::<Vec<_>>()
+                .await
+                .into_iter()
+                .collect::<Result<Vec<_>, _>>()?,
+        );
         Ok(())
     }
···
             return Ok(vec![]);
         }
         let did: String = self.did.clone();
-        let storage_guard = self.storage.read().await;
         use rsky_pds::schema::pds::record::dsl as RecordSchema;

         let cid_strs: Vec<String> = cids.into_iter().map(|c| c.to_string()).collect();
         let touched_uri_strs: Vec<String> = touched_uris.iter().map(|t| t.to_string()).collect();
-        let res: Vec<String> = storage_guard
+        let res: Vec<String> = self
+            .storage
+            .read()
+            .await
             .db
             .interact(move |conn| {
                 RecordSchema::record
···
             .await
             .expect("Failed to get duplicate record cids")?;
         res.into_iter()
-            .map(|row| Cid::from_str(&row).map_err(|error| anyhow::Error::new(error)))
+            .map(|row| Cid::from_str(&row).map_err(anyhow::Error::new))
             .collect::<Result<Vec<Cid>>>()
     }
 }
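The repeated `let storage_guard = self.storage.read().await;` bindings are gone in favour of chaining straight off `self.storage.read().await`, so each lock guard is a temporary that is released at the end of its own statement instead of living until the end of the function. The idea in miniature, using std's synchronous `RwLock` (the real code uses an async `RwLock`, but the scoping point is the same):

    use std::sync::RwLock;

    fn main() {
        let storage = RwLock::new(vec![1, 2, 3]);

        // Before: a named guard stays locked until it goes out of scope.
        // let guard = storage.read().unwrap();
        // let len = guard.len();

        // After: the guard is a temporary, released at the semicolon.
        let len = storage.read().unwrap().len();
        println!("{len}");
    }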
+10 -12
src/actor_store/preference.rs
···
 }

 impl PreferenceReader {
-    pub fn new(
+    pub const fn new(
         did: String,
         db: deadpool_diesel::Pool<
             deadpool_diesel::Manager<SqliteConnection>,
             deadpool_diesel::sqlite::Object,
         >,
     ) -> Self {
-        PreferenceReader { did, db }
+        Self { did, db }
     }

     pub async fn get_preferences(
···
                     .load(conn)?;
                 let account_prefs = prefs_res
                     .into_iter()
-                    .filter(|pref| match &namespace {
-                        None => true,
-                        Some(namespace) => pref_match_namespace(namespace, &pref.name),
+                    .filter(|pref| {
+                        namespace
+                            .as_ref()
+                            .is_none_or(|namespace| pref_match_namespace(namespace, &pref.name))
                     })
                     .filter(|pref| pref_in_scope(scope.clone(), pref.name.clone()))
                     .map(|pref| {
···
         {
             false => bail!("Some preferences are not in the {namespace} namespace"),
             true => {
-                let not_in_scope = values
-                    .iter()
-                    .filter(|value| !pref_in_scope(scope.clone(), value.get_type()))
-                    .collect::<Vec<&RefPreferences>>();
-                if !not_in_scope.is_empty() {
+                if values
+                    .iter().any(|value| !pref_in_scope(scope.clone(), value.get_type())) {
                     tracing::info!(
                         "@LOG: PreferenceReader::put_preferences() debug scope: {:?}, values: {:?}",
                         scope,
···
                         .collect::<Vec<i32>>();
                     // replace all prefs in given namespace
                     if !all_pref_ids_in_namespace.is_empty() {
-                        delete(AccountPrefSchema::account_pref)
+                        _ = delete(AccountPrefSchema::account_pref)
                             .filter(AccountPrefSchema::id.eq_any(all_pref_ids_in_namespace))
                             .execute(conn)?;
                     }
                     if !put_prefs.is_empty() {
-                        insert_into(AccountPrefSchema::account_pref)
+                        _ = insert_into(AccountPrefSchema::account_pref)
                             .values(
                                 put_prefs
                                     .into_iter()
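Two iterator/Option cleanups here: `namespace.as_ref().is_none_or(...)` expresses "no namespace filter, or the name matches" without a `match`, and `any(...)` replaces collecting the offending values into a `Vec` just to test `is_empty()`. A std-only sketch (names are illustrative; `Option::is_none_or` needs Rust 1.82+):

    fn main() {
        let namespace: Option<&str> = Some("app.bsky");
        let pref_name = "app.bsky.feed";

        // None means "match everything"; Some(ns) must match the name.
        let matches = namespace.is_none_or(|ns| pref_name.starts_with(ns));
        assert!(matches);

        let values = ["a", "bb", "ccc"];
        // any() short-circuits and allocates nothing.
        let has_out_of_scope = values.iter().any(|v| v.len() > 2);
        assert!(has_out_of_scope);
    }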
+36 -45
src/actor_store/record.rs
···

 impl RecordReader {
     /// Create a new record handler.
-    pub(crate) fn new(
+    pub(crate) const fn new(
         did: String,
         db: deadpool_diesel::Pool<
             deadpool_diesel::Manager<SqliteConnection>,
···
         use rsky_pds::schema::pds::record::dsl as RecordSchema;
         use rsky_pds::schema::pds::repo_block::dsl as RepoBlockSchema;

-        let include_soft_deleted: bool = if let Some(include_soft_deleted) = include_soft_deleted {
-            include_soft_deleted
-        } else {
-            false
-        };
+        let include_soft_deleted: bool = include_soft_deleted.unwrap_or(false);
         let mut builder = RecordSchema::record
             .inner_join(RepoBlockSchema::repo_block.on(RepoBlockSchema::cid.eq(RecordSchema::cid)))
             .limit(limit)
···
         use rsky_pds::schema::pds::record::dsl as RecordSchema;
         use rsky_pds::schema::pds::repo_block::dsl as RepoBlockSchema;

-        let include_soft_deleted: bool = if let Some(include_soft_deleted) = include_soft_deleted {
-            include_soft_deleted
-        } else {
-            false
-        };
+        let include_soft_deleted: bool = include_soft_deleted.unwrap_or(false);
         let mut builder = RecordSchema::record
             .inner_join(RepoBlockSchema::repo_block.on(RepoBlockSchema::cid.eq(RecordSchema::cid)))
             .select((Record::as_select(), RepoBlock::as_select()))
···
     ) -> Result<bool> {
         use rsky_pds::schema::pds::record::dsl as RecordSchema;

-        let include_soft_deleted: bool = if let Some(include_soft_deleted) = include_soft_deleted {
-            include_soft_deleted
-        } else {
-            false
-        };
+        let include_soft_deleted: bool = include_soft_deleted.unwrap_or(false);
         let mut builder = RecordSchema::record
             .select(RecordSchema::uri)
             .filter(RecordSchema::uri.eq(uri))
···
             .interact(move |conn| builder.first::<String>(conn).optional())
             .await
             .expect("Failed to check record")?;
-        Ok(!!record_uri.is_some())
+        Ok(record_uri.is_some())
     }
···
             })
             .await
             .expect("Failed to get takedown status")?;
-        if let Some(res) = res {
-            if let Some(takedown_ref) = res {
-                Ok(Some(StatusAttr {
-                    applied: true,
-                    r#ref: Some(takedown_ref),
-                }))
-            } else {
-                Ok(Some(StatusAttr {
-                    applied: false,
-                    r#ref: None,
-                }))
-            }
-        } else {
-            Ok(None)
-        }
+        res.map_or_else(
+            || Ok(None),
+            |res| {
+                res.map_or_else(
+                    || {
+                        Ok(Some(StatusAttr {
+                            applied: false,
+                            r#ref: None,
+                        }))
+                    },
+                    |takedown_ref| {
+                        Ok(Some(StatusAttr {
+                            applied: true,
+                            r#ref: Some(takedown_ref),
+                        }))
+                    },
+                )
+            },
+        )
     }
···
         let rkey = uri.get_rkey();
         let hostname = uri.get_hostname().to_string();
         let action = action.unwrap_or(WriteOpAction::Create);
-        let indexed_at = timestamp.unwrap_or_else(|| rsky_common::now());
+        let indexed_at = timestamp.unwrap_or_else(rsky_common::now);
         let row = Record {
             did: self.did.clone(),
             uri: uri.to_string(),
···
             .get()
             .await?
             .interact(move |conn| {
-                insert_into(RecordSchema::record)
+                _ = insert_into(RecordSchema::record)
                     .values(row)
                     .on_conflict(RecordSchema::uri)
                     .do_update()
···
         if let Some(record) = record {
             // Maintain backlinks
             let backlinks = get_backlinks(&uri, &record)?;
-            if let WriteOpAction::Update = action {
+            if action == WriteOpAction::Update {
                 // On update just recreate backlinks from scratch for the record, so we can clear out
                 // the old ones. E.g. for weird cases like updating a follow to be for a different did.
                 self.remove_backlinks_by_uri(&uri).await?;
···
             .get()
             .await?
             .interact(move |conn| {
-                delete(RecordSchema::record)
+                _ = delete(RecordSchema::record)
                     .filter(RecordSchema::uri.eq(&uri))
                     .execute(conn)?;
-                delete(BacklinkSchema::backlink)
+                _ = delete(BacklinkSchema::backlink)
                     .filter(BacklinkSchema::uri.eq(&uri))
                     .execute(conn)?;
                 tracing::debug!(
···
             .get()
             .await?
             .interact(move |conn| {
-                delete(BacklinkSchema::backlink)
+                _ = delete(BacklinkSchema::backlink)
                     .filter(BacklinkSchema::uri.eq(uri))
                     .execute(conn)?;
                 Ok(())
···

     /// Add backlinks to the database.
     pub(crate) async fn add_backlinks(&self, backlinks: Vec<Backlink>) -> Result<()> {
-        if backlinks.len() == 0 {
+        if backlinks.is_empty() {
             Ok(())
         } else {
             use rsky_pds::schema::pds::backlink::dsl as BacklinkSchema;
···
                 .get()
                 .await?
                 .interact(move |conn| {
-                    insert_or_ignore_into(BacklinkSchema::backlink)
+                    _ = insert_or_ignore_into(BacklinkSchema::backlink)
                         .values(&backlinks)
                         .execute(conn)?;
                     Ok(())
···
         use rsky_pds::schema::pds::record::dsl as RecordSchema;

         let takedown_ref: Option<String> = match takedown.applied {
-            true => match takedown.r#ref {
-                Some(takedown_ref) => Some(takedown_ref),
-                None => Some(rsky_common::now()),
-            },
+            true => takedown
+                .r#ref
+                .map_or_else(|| Some(rsky_common::now()), Some),
             false => None,
         };
         let uri_string = uri.to_string();
···
             .get()
             .await?
             .interact(move |conn| {
-                update(RecordSchema::record)
+                _ = update(RecordSchema::record)
                     .filter(RecordSchema::uri.eq(uri_string))
                     .set(RecordSchema::takedownRef.eq(takedown_ref))
                     .execute(conn)?;
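Smaller simplifications of the same kind: `unwrap_or(false)` collapses the `if let Some(...) ... else false` dance for an `Option<bool>`, the redundant `!!` in front of `is_some()` is dropped, and `==` on a `PartialEq` enum replaces the single-pattern `if let WriteOpAction::Update = action`. A std-only sketch:

    #[derive(PartialEq)]
    enum Action { Create, Update }

    fn main() {
        let include_soft_deleted: Option<bool> = None;
        assert!(!include_soft_deleted.unwrap_or(false));

        let record_uri: Option<&str> = Some("at://did:plc:abc/app.bsky.feed.post/3k");
        // `!!record_uri.is_some()` is just `record_uri.is_some()`.
        assert!(record_uri.is_some());

        let action = Action::Update;
        // Equality test instead of a one-arm `if let`.
        assert!(action == Action::Update && action != Action::Create);
    }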
+17 -11
src/actor_store/sql_blob.rs
···
 #![expect(
     clippy::pub_use,
     clippy::single_char_lifetime_names,
-    unused_qualifications
+    unused_qualifications,
+    unnameable_types
 )]
 use anyhow::{Context, Result};
 use cidv10::Cid;
···
 }

 impl ByteStream {
-    pub fn new(bytes: Vec<u8>) -> Self {
+    pub const fn new(bytes: Vec<u8>) -> Self {
         Self { bytes }
     }

···

 impl BlobStoreSql {
     /// Create a new SQL-based blob store for the given DID
-    pub fn new(
+    pub const fn new(
         did: String,
         db: deadpool_diesel::Pool<
             deadpool_diesel::Manager<SqliteConnection>,
             deadpool_diesel::sqlite::Object,
         >,
     ) -> Self {
-        BlobStoreSql { db, did }
+        Self { db, did }
     }

     // /// Create a factory function for blob stores
···
         self.put_permanent_with_mime(
             Cid::try_from(format!("bafy{}", key)).unwrap_or_else(|_| Cid::default()),
             bytes,
-            "application/octet-stream".to_string(),
+            "application/octet-stream".to_owned(),
         )
         .await?;

···
         let bytes_len = bytes.len() as i32;

         // Store directly in the database
-        self.db
+        _ = self
+            .db
             .get()
             .await?
             .interact(move |conn| {
···

     /// Store a blob directly as permanent
     pub async fn put_permanent(&self, cid: Cid, bytes: Vec<u8>) -> Result<()> {
-        self.put_permanent_with_mime(cid, bytes, "application/octet-stream".to_string())
+        self.put_permanent_with_mime(cid, bytes, "application/octet-stream".to_owned())
             .await
     }
···
         let did_clone = self.did.clone();

         // Update the quarantine flag in the database
-        self.db
+        _ = self
+            .db
             .get()
             .await?
             .interact(move |conn| {
···
         let did_clone = self.did.clone();

         // Update the quarantine flag in the database
-        self.db
+        _ = self
+            .db
             .get()
             .await?
             .interact(move |conn| {
···
         let did_clone = self.did.clone();

         // Delete from database
-        self.db
+        _ = self
+            .db
             .get()
             .await?
             .interact(move |conn| {
···
         let did_clone = self.did.clone();

         // Delete all blobs in one operation
-        self.db
+        _ = self
+            .db
             .get()
             .await?
             .interact(move |conn| {
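The constructor changes (`const fn new`, `Self { .. }`) and the `"...".to_owned()` rewrites recur here. For a `&str` literal, `to_owned()` states "clone into an owned String" directly rather than going through the `Display`-based `to_string()`. A minimal sketch with stand-in fields, not the project's actual struct:

    struct Store {
        did: String,
    }

    impl Store {
        // A constructor that only moves its arguments can be `const fn`.
        const fn new(did: String) -> Self {
            Self { did }
        }
    }

    fn main() {
        let store = Store::new("did:web:pds.example.invalid".to_owned());
        println!("{}", store.did);
    }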
+16 -10
src/actor_store/sql_repo.rs
···
         cid: &'life Cid,
     ) -> Pin<Box<dyn Future<Output = Result<Option<Vec<u8>>>> + Send + Sync + 'life>> {
         let did: String = self.did.clone();
-        let cid = cid.clone();
+        let cid = *cid;

         Box::pin(async move {
             use rsky_pds::schema::pds::repo_block::dsl as RepoBlockSchema;
             let cached = {
                 let cache_guard = self.cache.read().await;
-                cache_guard.get(cid).map(|v| v.clone())
+                cache_guard.get(cid).cloned()
             };
             if let Some(cached_result) = cached {
-                return Ok(Some(cached_result.clone()));
+                return Ok(Some(cached_result));
             }

             let found: Option<Vec<u8>> = self
···
             let blocks = Arc::new(tokio::sync::Mutex::new(BlockMap::new()));
             let missing_set = Arc::new(tokio::sync::Mutex::new(missing));

-            let _: Vec<_> = stream::iter(missing_strings.chunks(500))
+            let stream: Vec<_> = stream::iter(missing_strings.chunks(500))
                 .then(|batch| {
                     let this_did = did.clone();
                     let blocks = Arc::clone(&blocks);
···
                 })
                 .try_collect()
                 .await?;
+            drop(stream);

             // Extract values from synchronization primitives
             let mut blocks = Arc::try_unwrap(blocks)
···
         Box::pin(async move {
             use rsky_pds::schema::pds::repo_block::dsl as RepoBlockSchema;

-            self.db
+            _ = self
+                .db
                 .interact(move |conn| {
                     insert_into(RepoBlockSchema::repo_block)
                         .values((
···
                 blocks.chunks(50).map(|chunk| chunk.to_vec()).collect();

             for batch in chunks {
-                self.db
+                _ = self
+                    .db
                     .interact(move |conn| {
                         insert_or_ignore_into(RepoBlockSchema::repo_block)
                             .values(&batch)
···

             let is_create = is_create.unwrap_or(false);
             if is_create {
-                self.db
+                _ = self
+                    .db
                     .interact(move |conn| {
                         insert_into(RepoRootSchema::repo_root)
                             .values((
···
                     .await
                     .expect("Failed to create root")?;
             } else {
-                self.db
+                _ = self
+                    .db
                     .interact(move |conn| {
                         update(RepoRootSchema::repo_root)
                             .filter(RepoRootSchema::did.eq(did))
···
 impl SqlRepoReader {
     pub fn new(did: String, now: Option<String>, db: deadpool_diesel::sqlite::Object) -> Self {
         let now = now.unwrap_or_else(rsky_common::now);
-        SqlRepoReader {
+        Self {
             cache: Arc::new(RwLock::new(BlockMap::new())),
             root: None,
             rev: None,
···
         use rsky_pds::schema::pds::repo_block::dsl as RepoBlockSchema;

         let cid_strings: Vec<String> = cids.into_iter().map(|c| c.to_string()).collect();
-        self.db
+        _ = self
+            .db
             .interact(move |conn| {
                 delete(RepoBlockSchema::repo_block)
                     .filter(RepoBlockSchema::did.eq(did))
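`Cid` is `Copy`, so `*cid` replaces `cid.clone()`, and `Option<&T>::cloned()` replaces `map(|v| v.clone())`; the cache hit also no longer clones a second time. A std-only sketch with a plain `u64` standing in for `Cid`:

    fn main() {
        let id: u64 = 42; // stands in for the Copy `Cid` type
        let id_ref = &id;
        let copied = *id_ref; // was: id_ref.clone()

        let data = vec![1u8, 2, 3];
        let cache_hit: Option<&Vec<u8>> = Some(&data);
        // cloned() turns Option<&Vec<u8>> into Option<Vec<u8>> without a closure.
        let owned: Option<Vec<u8>> = cache_hit.cloned();

        println!("{copied} {owned:?}");
    }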
+3 -3
src/auth.rs
···
     use crate::schema::pds::oauth_used_jtis::dsl as JtiSchema;

     // Check if JTI has been used before
-    let jti_string = jti.to_string();
+    let jti_string = jti.to_owned();
     let jti_used = state
         .db
         .get()
···
         .unwrap_or_else(|| timestamp.checked_add(60).unwrap_or(timestamp));

     // Convert SQLx INSERT to Diesel
-    let jti_str = jti.to_string();
+    let jti_str = jti.to_owned();
     let thumbprint_str = calculated_thumbprint.to_string();
-    state
+    let _ = state
         .db
         .get()
         .await
+8 -10
src/main.rs
···
             let path_blob = path_repo.replace("repo", "blob");
             let actor_blob_pool =
                 establish_pool(&path_blob).context("failed to create database connection pool")?;
-            actor_pools.insert(
+            drop(actor_pools.insert(
                 did.to_string(),
                 ActorPools {
                     repo: actor_repo_pool,
                     blob: actor_blob_pool,
                 },
-            );
+            ));
         }
     }
     // Apply pending migrations
···
         total_count: i32,
     }

-    // let result = diesel::sql_query(
-    //     "SELECT (SELECT COUNT(*) FROM accounts) + (SELECT COUNT(*) FROM invites) AS total_count",
-    // )
-    // .get_result::<TotalCount>(conn)
-    // .context("failed to query database")?;
     let result = conn.interact(move |conn| {
         diesel::sql_query(
             "SELECT (SELECT COUNT(*) FROM accounts) + (SELECT COUNT(*) FROM invites) AS total_count",
···
         let uuid = Uuid::new_v4().to_string();

         let uuid_clone = uuid.clone();
-        conn.interact(move |conn| {
-            diesel::sql_query(
+        _ = conn
+            .interact(move |conn| {
+                diesel::sql_query(
                 "INSERT INTO invites (id, did, count, created_at) VALUES (?, NULL, 1, datetime('now'))",
             )
             .bind::<diesel::sql_types::Text, _>(uuid_clone)
             .execute(conn)
             .context("failed to create new invite code")
             .expect("should be able to create invite code")
-        });
+            })
+            .await
+            .expect("should be able to create invite code");

         // N.B: This is a sensitive message, so we're bypassing `tracing` here and
         // logging it directly to console.
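One change here is more than style: the invite-code INSERT was built with `conn.interact(...)` but never `.await`ed, and a Rust future does nothing until it is polled, so the statement never ran; adding `.await` (plus the `expect`) makes it execute and surface failures. A std-only illustration of that laziness (`create_invite_row` is a stand-in, not the project's function):

    async fn create_invite_row() -> &'static str {
        // In the real code this is where the INSERT would run.
        "invite created"
    }

    fn main() {
        // Constructing the future does not run its body; dropping it here means
        // the "INSERT" never happens. That is what the missing `.await` did.
        drop(create_invite_row());

        // Inside an async context, `create_invite_row().await` is what actually
        // drives the body to completion (or a runtime's block_on from sync code).
    }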
+1 -1
src/oauth.rs
···
 //! OAuth endpoints
-
+#![allow(unnameable_types, unused_qualifications)]
 use crate::metrics::AUTH_FAILED;
 use crate::{AppConfig, AppState, Client, Error, Result, SigningKey};
 use anyhow::{Context as _, anyhow};
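Several files in this change get the same inner attribute; placed at the top of a module file, `#![allow(...)]` scopes the named lints to that file rather than silencing them crate-wide (an `#![expect(...)]`, as used in sql_blob.rs, additionally warns if the lint stops firing). A minimal standalone file showing the placement, with an illustrative lint choice:

    //! Example module header: inner attributes must come before any items.
    #![allow(unused_qualifications)]

    fn main() {
        // A fully qualified path; the allow above keeps that lint quiet here.
        let greeting = std::string::String::from("ok");
        println!("{greeting}");
    }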
+1
src/schema.rs
···
+#![allow(unnameable_types, unused_qualifications)]
 pub mod pds {

     // Legacy tables