Alternative ATProto PDS implementation

additional lints

+115 -1
Cargo.toml
··· 1 1 [package] 2 2 name = "bluepds" 3 3 version = "0.0.0" 4 - edition = "2021" 4 + edition = "2024" 5 5 publish = false 6 + authors = ["Justin Moore", "Timothy Quilling <teqed@shatteredsky.net>"] 7 + description = "Alternative ATProto PDS implementation" 8 + license = "" 9 + readme = "README.md" 10 + repository = "https://github.com/DrChat/bluepds" 11 + keywords = ["atproto", "pds"] 12 + categories = [] 13 + 14 + [profile.dev.package."*"] 15 + opt-level = 3 16 + 17 + [profile.dev] 18 + opt-level = 1 19 + 20 + [profile.release] 21 + opt-level = "s" # Slightly slows compile times, great improvements to file size and runtime performance. 22 + lto = "thin" # Do a second optimization pass over the entire program, including dependencies. 23 + codegen-units = 1 # Compile the entire crate as one unit. 24 + strip = "debuginfo" # Strip all debugging information from the binary to slightly reduce file size. 25 + 26 + [lints.rust] 27 + ## Groups 28 + # warnings = { level = "warn", priority = -1 } # All lints that are set to issue warnings 29 + # deprecated-safe = { level = "warn", priority = -1 } # Lints for functions which were erroneously marked as safe in the past 30 + # future-incompatible = { level = "warn", priority = -1 } # Lints that detect code that has future-compatibility problems 31 + # keyword-idents = { level = "warn", priority = -1 } # Lints that detect identifiers which will be come keywords in later editions 32 + # let-underscore = { level = "warn", priority = -1 } # Lints that detect wildcard let bindings that are likely to be invalid 33 + # nonstandard-style = { level = "warn", priority = -1 } # Violation of standard naming conventions 34 + # refining-impl-trait = { level = "warn", priority = -1 } # Detects refinement of impl Trait return types by trait implementations 35 + # rust-2018-compatibility = { level = "warn", priority = -1 } # Lints used to transition code from the 2015 edition to 2018 36 + # rust-2021-compatibility = { level = "warn", priority = -1 } # Lints used to transition code from the 2018 edition to 2021 37 + # rust-2018-idioms = { level = "warn", priority = -1 } # Lints to nudge you toward idiomatic features of Rust 2018 38 + # rust-2024-compatibility = { level = "warn", priority = -1 } # Lints used to transition code from the 2021 edition to 2024 39 + # unused = { level = "warn", priority = -1 } # Lints that detect things being declared but not used, or excess syntax 40 + ## Individual 41 + ambiguous_negative_literals = "warn" # checks for cases that are confusing between a negative literal and a negation that's not part of the literal. 42 + closure_returning_async_block = "warn" # detects cases where users write a closure that returns an async block. # nightly 43 + ffi_unwind_calls = "warn" 44 + # fuzzy_provenance_casts = "warn" # unstable 45 + # lossy_provenance_casts = "warn" # unstable 46 + macro_use_extern_crate = "warn" 47 + meta_variable_misuse = "warn" 48 + missing_abi = "warn" 49 + missing_copy_implementations = "allow" # detects potentially-forgotten implementations of Copy for public types. 50 + missing_debug_implementations = "allow" # detects missing implementations of fmt::Debug for public types. 51 + missing_docs = "warn" 52 + # multiple_supertrait_upcastable = "warn" # unstable 53 + # must_not_suspend = "warn" # unstable 54 + non_ascii_idents = "warn" 55 + # non_exhaustive_omitted_patterns = "warn" # unstable 56 + redundant_imports = "warn" 57 + redundant_lifetimes = "warn" 58 + rust_2024_incompatible_pat = "warn" # nightly 59 + single_use_lifetimes = "warn" 60 + trivial_casts = "warn" 61 + trivial_numeric_casts = "warn" 62 + unit_bindings = "warn" 63 + unnameable_types = "warn" 64 + # unqualified_local_imports = "warn" # unstable 65 + unreachable_pub = "warn" 66 + unsafe_code = "forbid" 67 + unstable_features = "warn" 68 + # unused_crate_dependencies = "warn" 69 + unused_import_braces = "warn" 70 + unused_lifetimes = "warn" 71 + unused_qualifications = "warn" 72 + unused_results = "warn" 73 + variant_size_differences = "warn" 74 + elided_lifetimes_in_paths = "allow" 75 + # unstable-features = "allow" 76 + 77 + [lints.clippy] 78 + # Groups 79 + # nursery = { level = "warn", priority = -1 } 80 + # correctness = { level = "warn", priority = -1 } 81 + # suspicious = { level = "warn", priority = -1 } 82 + # complexity = { level = "warn", priority = -1 } 83 + # perf = { level = "warn", priority = -1 } 84 + # style = { level = "warn", priority = -1 } 85 + 86 + # pedantic = { level = "warn", priority = -1 } 87 + # restriction = { level = "warn", priority = -1 } 88 + cargo = { level = "warn", priority = -1 } 89 + # Temporary Allows 90 + single_call_fn = "allow" 91 + multiple_crate_versions = "allow" 92 + expect_used = "allow" 93 + # Style Allows 94 + implicit_return = "allow" 95 + self_named_module_files = "allow" 96 + else_if_without_else = "allow" 97 + std_instead_of_alloc = "allow" 98 + std_instead_of_core = "allow" 99 + blanket_clippy_restriction_lints = "allow" 100 + float_arithmetic = "allow" 101 + redundant_pub_crate = "allow" 102 + pub_with_shorthand = "allow" 103 + absolute_paths = "allow" 104 + module_name_repetitions = "allow" 105 + missing_trait_methods = "allow" 106 + separated_literal_suffix = "allow" 107 + exhaustive_structs = "allow" 108 + field_scoped_visibility_modifiers = "allow" 109 + allow_attributes_without_reason = "allow" 110 + # Warns 111 + missing_docs_in_private_items = "warn" 112 + use_self = "warn" 113 + str_to_string = "warn" 114 + print_stdout = "warn" 115 + unseparated_literal_suffix = "warn" 116 + unwrap_used = "warn" 117 + # Denys 118 + enum_glob_use = "deny" 119 + # expect_used = "deny" 6 120 7 121 [dependencies] 8 122 atrium-api = "0.25"
+1
build.rs
··· 1 + //! This file is used to tell cargo to recompile the project if the migrations folder changes. 1 2 fn main() { 2 3 println!("cargo:rerun-if-changed=migrations"); 3 4 }
+4 -3
src/auth.rs
··· 19 19 } 20 20 21 21 impl AuthenticatedUser { 22 + /// Get the DID of the authenticated user. 22 23 pub(crate) fn did(&self) -> String { 23 24 self.did.clone() 24 25 } ··· 84 85 .await 85 86 .with_context(|| format!("failed to query account {did}"))?; 86 87 87 - Ok(AuthenticatedUser { 88 - did: did.to_string(), 88 + Ok(Self { 89 + did: did.to_owned(), 89 90 }) 90 91 } else { 91 92 Err(Error::with_status( ··· 148 149 .context("failed to decode claims")?; 149 150 let claims = serde_json::from_slice(&claims).context("failed to parse claims as json")?; 150 151 151 - Ok((typ.to_string(), claims)) 152 + Ok((typ.to_owned(), claims)) 152 153 }
+4
src/config.rs
··· 1 + //! Configuration structures for the PDS. 1 2 use std::{net::SocketAddr, path::PathBuf}; 2 3 3 4 use serde::Deserialize; 4 5 use url::Url; 5 6 7 + /// The metrics configuration. 6 8 pub(crate) mod metrics { 7 9 use super::*; 8 10 9 11 #[derive(Deserialize, Debug, Clone)] 12 + /// The Prometheus configuration. 10 13 pub(crate) struct PrometheusConfig { 11 14 /// The URL of the Prometheus server's exporter endpoint. 12 15 pub url: Url, ··· 16 19 #[derive(Deserialize, Debug, Clone)] 17 20 #[serde(tag = "type")] 18 21 pub(crate) enum MetricConfig { 22 + /// The Prometheus push gateway. 19 23 PrometheusPush(metrics::PrometheusConfig), 20 24 } 21 25
+4
src/did.rs
··· 1 + //! DID utilities. 2 + 1 3 use anyhow::{Context, Result, bail}; 2 4 use atrium_api::types::string::Did; 3 5 use serde::{Deserialize, Serialize}; ··· 10 12 11 13 #[derive(Clone, Debug, Serialize, Deserialize)] 12 14 #[serde(rename_all = "camelCase")] 15 + /// DID verification method. 13 16 pub(crate) struct DidVerificationMethod { 14 17 pub id: String, 15 18 #[serde(rename = "type")] ··· 29 32 30 33 #[derive(Clone, Debug, Serialize, Deserialize)] 31 34 #[serde(rename_all = "camelCase")] 35 + /// DID document. 32 36 pub(crate) struct DidDocument { 33 37 #[serde(rename = "@context", skip_serializing_if = "Vec::is_empty")] 34 38 pub context: Vec<Url>,
+9 -9
src/endpoints/identity.rs
··· 34 34 .fetch_one(&db) 35 35 .await 36 36 { 37 - let did = atrium_api::types::string::Did::new(did).unwrap(); 37 + let did = atrium_api::types::string::Did::new(did).expect("should be valid DID format"); 38 38 return Ok(Json(identity::resolve_handle::OutputData { did }.into())); 39 39 } 40 40 ··· 85 85 ) -> Result<()> { 86 86 let handle = input.handle.as_str(); 87 87 let did_str = user.did(); 88 - let did = atrium_api::types::string::Did::new(user.did()).unwrap(); 88 + let did = atrium_api::types::string::Did::new(user.did()).expect("should be valid DID format"); 89 89 90 90 let existing_did = sqlx::query_scalar!(r#"SELECT did FROM handles WHERE handle = ?"#, handle) 91 91 .fetch_optional(&db) ··· 113 113 .with_context(|| format!("failed to resolve DID for {did_str}"))?; 114 114 115 115 let op = PlcOperation { 116 - typ: "plc_operation".to_string(), 116 + typ: "plc_operation".to_owned(), 117 117 rotation_keys: vec![rkey.did().to_string()], 118 - verification_methods: HashMap::from([("atproto".to_string(), skey.did().to_string())]), 119 - also_known_as: vec![input.handle.as_str().to_string()], 118 + verification_methods: HashMap::from([("atproto".to_owned(), skey.did().to_string())]), 119 + also_known_as: vec![input.handle.as_str().to_owned()], 120 120 services: HashMap::from([( 121 - "atproto_pds".to_string(), 121 + "atproto_pds".to_owned(), 122 122 PlcService::Pds { 123 123 endpoint: config.host_name.clone(), 124 124 }, ··· 137 137 } 138 138 139 139 // FIXME: Properly abstract these implementation details. 140 - let did_hash = did_str.strip_prefix("did:plc:").unwrap(); 140 + let did_hash = did_str.strip_prefix("did:plc:").expect("should be valid DID format"); 141 141 let doc = tokio::fs::File::options() 142 142 .read(true) 143 143 .write(true) ··· 157 157 158 158 let cid_str = plc_cid.to_string(); 159 159 160 - sqlx::query!( 160 + _ = sqlx::query!( 161 161 r#"UPDATE accounts SET plc_root = ? WHERE did = ?"#, 162 162 cid_str, 163 163 did_str ··· 170 170 fhp.identity( 171 171 atrium_api::com::atproto::sync::subscribe_repos::IdentityData { 172 172 did: did.clone(), 173 - handle: Some(Handle::new(handle.to_string()).unwrap()), 173 + handle: Some(Handle::new(handle.to_owned()).expect("should be valid handle")), 174 174 seq: 0, // Filled by firehose later. 175 175 time: Datetime::now(), 176 176 },
+4 -1
src/endpoints/mod.rs
··· 1 + //! Root module for all endpoints. 1 2 use axum::{Json, Router, routing::get}; 2 3 use serde_json::json; 3 4 ··· 8 9 mod server; 9 10 mod sync; 10 11 12 + /// Health check endpoint. Returns name and version of the service. 11 13 pub(crate) async fn health() -> Result<Json<serde_json::Value>> { 12 14 Ok(Json(json!({ 13 - "version": "bluepds" 15 + "version": concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION")), 14 16 }))) 15 17 } 16 18 19 + /// Register all root routes. 17 20 pub(crate) fn routes() -> Router<AppState> { 18 21 Router::new() 19 22 .route("/_health", get(health))
+21 -21
src/endpoints/repo.rs
··· 70 70 // If the swap failed, indicate as such. 71 71 Ok(r.rows_affected() != 0) 72 72 } else { 73 - sqlx::query!( 73 + _ = sqlx::query!( 74 74 r#"UPDATE accounts SET root = ?, rev = ? WHERE did = ?"#, 75 75 cid_str, 76 76 rev_str, ··· 99 99 .await 100 100 .context("failed to query did")?; 101 101 102 - (handle.to_string(), did) 102 + (handle.to_owned(), did) 103 103 } 104 104 AtIdentifier::Did(did) => { 105 105 let did = did.as_str(); ··· 108 108 .await 109 109 .context("failed to query did")?; 110 110 111 - (handle, did.to_string()) 111 + (handle, did.to_owned()) 112 112 } 113 113 }; 114 114 115 115 Ok(( 116 - atrium_api::types::string::Did::new(did).unwrap(), 117 - atrium_api::types::string::Handle::new(handle).unwrap(), 116 + atrium_api::types::string::Did::new(did).expect("should be valid DID"), 117 + atrium_api::types::string::Handle::new(handle).expect("should be valid handle"), 118 118 )) 119 119 } 120 120 ··· 136 136 let (ty, rf) = (map.get("$type"), map.get("ref")); 137 137 138 138 if let (Some(ty), Some(rf)) = (ty, rf) { 139 - if ty == &serde_json::Value::String("blob".to_string()) { 139 + if ty == &serde_json::Value::String("blob".to_owned()) { 140 140 if let Ok(rf) = serde_json::from_value::<BlobRef>(rf.clone()) { 141 141 cids.push(Cid::from_str(&rf.link).context("failed to convert cid")?); 142 142 } ··· 312 312 .sign(&builder.bytes()) 313 313 .context("failed to sign commit")?; 314 314 315 - builder 315 + _ = builder 316 316 .finalize(sig) 317 317 .await 318 318 .context("failed to write signed commit")?; ··· 346 346 .context("failed to swap commit")? 347 347 { 348 348 // This should always succeed. 349 - let old = input.swap_commit.clone().unwrap(); 349 + let old = input.swap_commit.clone().expect("swap_commit should always be Some"); 350 350 351 351 // The swap failed. Return the old commit and do not update the repository. 352 352 return Ok(Json( ··· 371 371 match op { 372 372 RepoOp::Update { path, .. } | RepoOp::Delete { path, .. } => { 373 373 // FIXME: This may cause issues if a user deletes more than one record referencing the same blob. 374 - sqlx::query!( 374 + _ = sqlx::query!( 375 375 r#"UPDATE blob_ref SET record = NULL WHERE did = ? AND record = ?"#, 376 376 did_str, 377 377 path ··· 398 398 399 399 // Handle the case where a new record references an existing blob. 400 400 if r.rows_affected() == 0 { 401 - sqlx::query!( 401 + _ = sqlx::query!( 402 402 r#"INSERT INTO blob_ref (record, cid, did) VALUES (?, ?, ?)"#, 403 403 key, 404 404 cid_str, ··· 432 432 ops, 433 433 cid: repo.root(), 434 434 rev: repo.commit().rev().to_string(), 435 - did: atrium_api::types::string::Did::new(user.did()).unwrap(), 435 + did: atrium_api::types::string::Did::new(user.did()).expect("should be valid DID"), 436 436 blobs: blobs.into_iter().map(|(_, c)| c).collect::<Vec<_>>(), 437 437 }) 438 438 .await; ··· 504 504 cid: res.cid.clone(), 505 505 commit: r.commit.clone(), 506 506 uri: res.uri.clone(), 507 - validation_status: Some("unknown".to_string()), 507 + validation_status: Some("unknown".to_owned()), 508 508 } 509 509 .into(), 510 510 )) ··· 564 564 cid: res.cid.clone(), 565 565 commit: r.commit, 566 566 uri: res.uri.clone(), 567 - validation_status: Some("unknown".to_string()), 567 + validation_status: Some("unknown".to_owned()), 568 568 } 569 569 .into(), 570 570 )) ··· 632 632 let mut it = Box::pin(tree.keys()); 633 633 while let Some(key) = it.try_next().await.context("failed to iterate repo keys")? { 634 634 if let Some((collection, _rkey)) = key.split_once('/') { 635 - collections.insert(collection.to_string()); 635 + _ = collections.insert(collection.to_owned()); 636 636 } 637 637 } 638 638 ··· 640 640 repo::describe_repo::OutputData { 641 641 collections: collections 642 642 .into_iter() 643 - .map(|s| Nsid::new(s).unwrap()) 643 + .map(|s| Nsid::new(s).expect("should be valid NSID")) 644 644 .collect::<Vec<_>>(), 645 645 did: did.clone(), 646 646 did_doc: Unknown::Null, // TODO: Fetch the DID document from the PLC directory ··· 688 688 repo::get_record::OutputData { 689 689 cid: cid.map(atrium_api::types::string::Cid::new), 690 690 uri, 691 - value: record.try_into_unknown().unwrap(), 691 + value: record.try_into_unknown().expect("should be valid JSON"), 692 692 } 693 693 .into(), 694 694 )) ··· 763 763 repo::list_records::RecordData { 764 764 cid: atrium_api::types::string::Cid::new(*cid), 765 765 uri: format!("at://{}/{}", did.as_str(), key), 766 - value: value.try_into_unknown().unwrap(), 766 + value: value.try_into_unknown().expect("should be valid JSON"), 767 767 } 768 768 .into(), 769 769 ) ··· 798 798 .context("no content-type provided")? 799 799 .to_str() 800 800 .context("invalid content-type provided")? 801 - .to_string(); 801 + .to_owned(); 802 802 803 803 if length > config.blob.limit { 804 804 return Err(Error::with_status( ··· 816 816 .await 817 817 .context("failed to create temporary file")?; 818 818 819 - let mut len = 0usize; 819 + let mut len = 0_usize; 820 820 let mut sha = Sha256::new(); 821 821 let mut stream = request.into_body().into_data_stream(); 822 822 while let Some(bytes) = stream.try_next().await.context("failed to receive file")? { ··· 847 847 848 848 let cid = Cid::new_v1( 849 849 IPLD_RAW, 850 - atrium_repo::Multihash::wrap(IPLD_MH_SHA2_256, hash.as_slice()).unwrap(), 850 + atrium_repo::Multihash::wrap(IPLD_MH_SHA2_256, hash.as_slice()).expect("should be valid hash"), 851 851 ); 852 852 853 853 let cid_str = cid.to_string(); ··· 861 861 862 862 let did_str = user.did(); 863 863 864 - sqlx::query!( 864 + _ = sqlx::query!( 865 865 r#"INSERT INTO blob_ref (cid, did, record) VALUES (?, ?, NULL)"#, 866 866 cid_str, 867 867 did_str
+17 -17
src/endpoints/server.rs
··· 134 134 // Account can be created. Synthesize a new DID for the user. 135 135 // https://github.com/did-method-plc/did-method-plc?tab=readme-ov-file#did-creation 136 136 let op = PlcOperation { 137 - typ: "plc_operation".to_string(), 137 + typ: "plc_operation".to_owned(), 138 138 rotation_keys: recovery_keys, 139 - verification_methods: HashMap::from([("atproto".to_string(), skey.did().to_string())]), 139 + verification_methods: HashMap::from([("atproto".to_owned(), skey.did().to_string())]), 140 140 also_known_as: vec![format!("at://{}", input.handle.as_str())], 141 141 services: HashMap::from([( 142 - "atproto_pds".to_string(), 142 + "atproto_pds".to_owned(), 143 143 PlcService::Pds { 144 144 endpoint: format!("https://{}", config.host_name), 145 145 }, ··· 198 198 .await 199 199 .context("failed to create carstore")?; 200 200 201 - let repo_builder = Repository::create(&mut store, Did::from_str(&did).unwrap()) 201 + let repo_builder = Repository::create(&mut store, Did::from_str(&did).expect("should be valid DID format")) 202 202 .await 203 203 .context("failed to initialize user repo")?; 204 204 ··· 233 233 let cid_str = cid.to_string(); 234 234 let rev_str = rev.as_str(); 235 235 236 - sqlx::query!( 236 + _ = sqlx::query!( 237 237 r#" 238 238 INSERT INTO accounts (did, email, password, root, plc_root, rev, created_at) 239 239 VALUES (?, ?, ?, ?, ?, ?, datetime('now')); ··· 264 264 // Broadcast the identity event now that the new identity is resolvable on the public directory. 265 265 fhp.identity( 266 266 atrium_api::com::atproto::sync::subscribe_repos::IdentityData { 267 - did: Did::from_str(&did).unwrap(), 268 - handle: Some(Handle::new(handle).unwrap()), 267 + did: Did::from_str(&did).expect("should be valid DID format"), 268 + handle: Some(Handle::new(handle).expect("should be valid handle")), 269 269 seq: 0, // Filled by firehose later. 270 270 time: Datetime::now(), 271 271 }, ··· 276 276 fhp.account( 277 277 atrium_api::com::atproto::sync::subscribe_repos::AccountData { 278 278 active: true, 279 - did: Did::from_str(&did).unwrap(), 279 + did: Did::from_str(&did).expect("should be valid DID format"), 280 280 seq: 0, // Filled by firehose later. 281 281 status: None, // "takedown" / "suspended" / "deactivated" 282 282 time: Datetime::now(), ··· 284 284 ) 285 285 .await; 286 286 287 - let did = Did::from_str(&did).unwrap(); 287 + let did = Did::from_str(&did).expect("should be valid DID format"); 288 288 289 289 fhp.commit(Commit { 290 290 car: store, ··· 375 375 // determine whether or not an account exists. 376 376 let _ = Argon2::default().verify_password( 377 377 password.as_bytes(), 378 - &PasswordHash::new(DUMMY_PASSWORD).unwrap(), 378 + &PasswordHash::new(DUMMY_PASSWORD).expect("should be valid password hash"), 379 379 ); 380 380 381 381 return Err(Error::with_status( ··· 429 429 refresh_jwt: refresh_token, 430 430 431 431 active: Some(true), 432 - did: Did::from_str(&did).unwrap(), 432 + did: Did::from_str(&did).expect("should be valid DID format"), 433 433 did_doc: None, 434 434 email: None, 435 435 email_auth_factor: None, 436 436 email_confirmed: None, 437 - handle: Handle::new(account.handle).unwrap(), 437 + handle: Handle::new(account.handle).expect("should be valid handle"), 438 438 status: None, 439 439 } 440 440 .into(), ··· 517 517 refresh_jwt: refresh_token, 518 518 519 519 active: Some(active), // TODO? 520 - did: Did::new(did.to_string()).unwrap(), 520 + did: Did::new(did.to_owned()).expect("should be valid DID format"), 521 521 did_doc: None, 522 - handle: Handle::new(user.handle).unwrap(), 522 + handle: Handle::new(user.handle).expect("should be valid handle"), 523 523 status, 524 524 } 525 525 .into(), ··· 589 589 Ok(Json( 590 590 server::get_session::OutputData { 591 591 active: Some(active), 592 - did: Did::from_str(&did).unwrap(), 592 + did: Did::from_str(&did).expect("should be valid DID format"), 593 593 did_doc: None, 594 594 email: Some(user.email), 595 595 email_auth_factor: None, 596 596 email_confirmed: None, 597 - handle: Handle::new(user.handle).unwrap(), 597 + handle: Handle::new(user.handle).expect("should be valid handle"), 598 598 status, 599 599 } 600 600 .into(), ··· 614 614 server::describe_server::OutputData { 615 615 available_user_domains: vec![], 616 616 contact: None, 617 - did: Did::from_str(&format!("did:web:{}", config.host_name)).unwrap(), 617 + did: Did::from_str(&format!("did:web:{}", config.host_name)).expect("should be valid DID format"), 618 618 invite_code_required: Some(true), 619 619 links: None, 620 620 phone_verification_required: Some(false), // email verification
+4 -4
src/endpoints/sync.rs
··· 294 294 .map(|r| { 295 295 sync::list_repos::RepoData { 296 296 active: Some(true), 297 - did: Did::new(r.did).unwrap(), 298 - head: atrium_api::types::string::Cid::new(Cid::from_str(&r.root).unwrap()), 297 + did: Did::new(r.did).expect("should be a valid DID"), 298 + head: atrium_api::types::string::Cid::new(Cid::from_str(&r.root).expect("should be a valid CID")), 299 299 rev: r.rev, 300 300 status: None, 301 301 } ··· 326 326 return Response::builder() 327 327 .status(StatusCode::BAD_REQUEST) 328 328 .body(Body::empty()) 329 - .unwrap(); 329 + .expect("should be a valid response") 330 330 } 331 331 }; 332 332 333 - ws.on_upgrade(move |ws| async move { 333 + ws.on_upgrade(async move |ws| { 334 334 fh.client_connection(ws, cursor).await; 335 335 }) 336 336 }
+11 -5
src/error.rs
··· 1 + //! Error handling for the application. 1 2 use axum::{ 2 3 body::Body, 3 4 http::StatusCode, ··· 15 16 } 16 17 17 18 #[derive(Default, serde::Serialize)] 18 - pub struct ErrorMessage { 19 + /// A JSON error message. 20 + pub(crate) struct ErrorMessage { 19 21 error: String, 20 22 message: String, 21 23 } ··· 29 31 } 30 32 } 31 33 impl ErrorMessage { 32 - pub fn new(error: impl Into<String>, message: impl Into<String>) -> Self { 34 + /// Create a new error message to be returned as JSON body. 35 + pub(crate) fn new(error: impl Into<String>, message: impl Into<String>) -> Self { 33 36 Self { 34 37 error: error.into(), 35 38 message: message.into(), ··· 38 41 } 39 42 40 43 impl Error { 44 + /// Returned when a route is not yet implemented. 41 45 pub fn unimplemented(err: impl Into<anyhow::Error>) -> Self { 42 46 Self::with_status(StatusCode::NOT_IMPLEMENTED, err) 43 47 } 44 48 49 + /// Returned when just providing a status code. 45 50 pub fn with_status(status: StatusCode, err: impl Into<anyhow::Error>) -> Self { 46 51 Self { 47 52 status, ··· 50 55 } 51 56 } 52 57 53 - pub fn with_message( 58 + /// Returned when providing a status code and a JSON message body. 59 + pub(crate) fn with_message( 54 60 status: StatusCode, 55 61 err: impl Into<anyhow::Error>, 56 62 message: impl Into<ErrorMessage>, ··· 96 102 Response::builder() 97 103 .status(self.status) 98 104 .body(Body::new(format!("{:?}", self.err))) 99 - .unwrap() 105 + .expect("should be a valid response") 100 106 } else { 101 107 Response::builder() 102 108 .status(self.status) 103 109 .header("Content-Type", "application/json") 104 110 .body(Body::new(self.message.unwrap_or_default().to_string())) 105 - .unwrap() 111 + .expect("should be a valid response") 106 112 } 107 113 } 108 114 }
+43 -18
src/firehose.rs
··· 1 + //! The firehose module. 1 2 use std::{collections::VecDeque, time::Duration}; 2 3 3 4 use anyhow::{Result, bail}; ··· 36 37 let mut map = serializer.serialize_map(None)?; 37 38 38 39 match self { 39 - FrameHeader::Message(s) => { 40 + Self::Message(s) => { 40 41 map.serialize_key("op")?; 41 42 map.serialize_value(&1)?; 42 43 map.serialize_key("t")?; 43 44 map.serialize_value(s.as_str())?; 44 45 } 45 - FrameHeader::Error => { 46 + Self::Error => { 46 47 map.serialize_key("op")?; 47 48 map.serialize_value(&-1)?; 48 49 } ··· 52 53 } 53 54 } 54 55 55 - pub enum RepoOp { 56 - Create { cid: Cid, path: String }, 57 - Update { cid: Cid, path: String, prev: Cid }, 58 - Delete { path: String, prev: Cid }, 56 + /// A repository operation. 57 + pub(crate) enum RepoOp { 58 + /// Create a new record. 59 + Create { 60 + /// The CID of the record. 61 + cid: Cid, 62 + /// The path of the record. 63 + path: String 64 + }, 65 + /// Update an existing record. 66 + Update { 67 + /// The CID of the record. 68 + cid: Cid, 69 + /// The path of the record. 70 + path: String, 71 + /// The previous CID of the record. 72 + prev: Cid 73 + }, 74 + /// Delete an existing record. 75 + Delete { 76 + /// The path of the record. 77 + path: String, 78 + /// The previous CID of the record. 79 + prev: Cid 80 + }, 59 81 } 60 82 61 83 impl From<RepoOp> for sync::subscribe_repos::RepoOp { ··· 71 93 }; 72 94 73 95 sync::subscribe_repos::RepoOpData { 74 - action: action.to_string(), 96 + action: action.to_owned(), 75 97 cid: cid.map(atrium_api::types::CidLink), 76 98 path, 77 99 } ··· 79 101 } 80 102 } 81 103 82 - pub struct Commit { 104 + /// A commit to the repository. 105 + pub(crate) struct Commit { 83 106 /// The car file containing the commit blocks. 84 107 pub car: Vec<u8>, 85 108 /// The operations performed in this commit. ··· 155 178 .await; 156 179 } 157 180 181 + /// Handle client connection. 158 182 pub(crate) async fn client_connection(&self, ws: WebSocket, cursor: Option<i64>) { 159 183 let _ = self 160 184 .tx ··· 168 192 seq: u64, 169 193 mut msg: sync::subscribe_repos::Message, 170 194 ) -> (&'static str, Vec<u8>) { 171 - let mut dummy_seq = 0i64; 195 + let mut dummy_seq = 0_i64; 172 196 let (ty, nseq) = match &mut msg { 173 197 sync::subscribe_repos::Message::Account(m) => ("#account", &mut m.seq), 174 198 sync::subscribe_repos::Message::Commit(m) => ("#commit", &mut m.seq), ··· 182 206 // Set the sequence number. 183 207 *nseq = seq as i64; 184 208 185 - let hdr = FrameHeader::Message(ty.to_string()); 209 + let hdr = FrameHeader::Message(ty.to_owned()); 186 210 187 211 let mut frame = Vec::new(); 188 - serde_ipld_dagcbor::to_writer(&mut frame, &hdr).unwrap(); 189 - serde_ipld_dagcbor::to_writer(&mut frame, &msg).unwrap(); 212 + serde_ipld_dagcbor::to_writer(&mut frame, &hdr).expect("should serialize header"); 213 + serde_ipld_dagcbor::to_writer(&mut frame, &msg).expect("should serialize message"); 190 214 191 215 (ty, frame) 192 216 } ··· 199 223 let client = &mut clients[i]; 200 224 if let Err(e) = client.send(msg.clone()).await { 201 225 debug!("Firehose client disconnected: {e}"); 202 - clients.remove(i); 226 + _ = clients.remove(i); 203 227 } 204 228 } 205 229 ··· 225 249 "cursor {cursor} is greater than the current sequence number {seq}" 226 250 ))); 227 251 228 - serde_ipld_dagcbor::to_writer(&mut frame, &hdr).unwrap(); 229 - serde_ipld_dagcbor::to_writer(&mut frame, &msg).unwrap(); 252 + serde_ipld_dagcbor::to_writer(&mut frame, &hdr).expect("should serialize header"); 253 + serde_ipld_dagcbor::to_writer(&mut frame, &msg).expect("should serialize message"); 230 254 231 255 // Drop the connection. 232 256 let _ = ws.send(Message::binary(frame)).await; ··· 241 265 } 242 266 243 267 let hdr = FrameHeader::Message(ty.to_string()); 244 - serde_ipld_dagcbor::to_writer(&mut frame, &hdr).unwrap(); 245 - serde_ipld_dagcbor::to_writer(&mut frame, msg).unwrap(); 268 + serde_ipld_dagcbor::to_writer(&mut frame, &hdr).expect("should serialize header"); 269 + serde_ipld_dagcbor::to_writer(&mut frame, msg).expect("should serialize message"); 246 270 247 271 if let Err(e) = ws.send(Message::binary(frame.clone())).await { 248 272 debug!("Firehose client disconnected during backfill: {e}"); ··· 257 281 Ok(ws) 258 282 } 259 283 284 + /// Reconnect to upstream relays. 260 285 pub(crate) async fn reconnect_relays(client: &Client, config: &AppConfig) { 261 286 // Avoid connecting to upstream relays in test mode. 262 287 if config.test { ··· 316 341 let handle = tokio::spawn(async move { 317 342 let mut clients: Vec<WebSocket> = Vec::new(); 318 343 let mut history = VecDeque::with_capacity(1000); 319 - let mut seq = 1u64; 344 + let mut seq = 1_u64; 320 345 321 346 // TODO: We should use `com.atproto.sync.notifyOfUpdate` to reach out to relays 322 347 // that may have disconnected from us due to timeout.
+31 -8
src/main.rs
··· 1 + //! PDS implementation. 1 2 use std::{ 2 3 net::{IpAddr, Ipv4Addr, SocketAddr}, 3 4 path::PathBuf, ··· 42 43 mod plc; 43 44 mod storage; 44 45 46 + /// The application-wide result type. 45 47 pub type Result<T> = std::result::Result<T, Error>; 46 48 pub use error::Error; 47 49 use uuid::Uuid; 48 50 51 + /// The reqwest client type with middleware. 49 52 pub type Client = reqwest_middleware::ClientWithMiddleware; 53 + /// The database connection pool. 50 54 pub type Db = SqlitePool; 55 + /// The Azure credential type. 51 56 pub type Cred = Arc<dyn TokenCredential>; 52 57 58 + /// The application user agent. Concatenates the package name and version. e.g. `bluepds/0.0.0`. 53 59 pub const APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),); 54 60 55 61 #[derive(Serialize, Deserialize, Debug, Clone)] 62 + /// The key data structure. 56 63 struct KeyData { 57 64 /// Primary signing key for all repo operations. 58 65 skey: Vec<u8>, ··· 65 72 // 66 73 // Reference: https://soatok.blog/2022/05/19/guidance-for-choosing-an-elliptic-curve-signature-algorithm-in-2022/ 67 74 #[derive(Clone)] 75 + /// The signing key for PLC/DID operations. 68 76 pub struct SigningKey(Arc<Secp256k1Keypair>); 69 77 #[derive(Clone)] 78 + /// The rotation key for PLC operations. 70 79 pub struct RotationKey(Arc<Secp256k1Keypair>); 71 80 72 81 impl std::ops::Deref for SigningKey { ··· 86 95 } 87 96 88 97 #[derive(Parser, Debug, Clone)] 98 + /// Command line arguments. 89 99 struct Args { 100 + /// The verbosity level. 90 101 #[command(flatten)] 91 102 verbosity: Verbosity<InfoLevel>, 92 103 ··· 97 108 98 109 #[derive(Clone, FromRef)] 99 110 struct AppState { 111 + /// The application configuration. 100 112 config: AppConfig, 113 + /// The Azure credential. 101 114 cred: Cred, 115 + /// The database connection pool. 102 116 db: Db, 103 117 118 + /// The HTTP client with middleware. 104 119 client: Client, 120 + /// The simple HTTP client. 105 121 simple_client: reqwest::Client, 122 + /// The firehose producer. 106 123 firehose: FirehoseProducer, 107 124 125 + /// The signing key. 108 126 signing_key: SigningKey, 127 + /// The rotation key. 109 128 rotation_key: RotationKey, 110 129 } 111 130 131 + /// The index (/) route. 112 132 async fn index() -> impl IntoResponse { 113 133 r#" 114 134 __ __ ··· 149 169 ) -> Result<()> { 150 170 let did = user.did(); 151 171 let prefs = sqlx::types::Json(input.preferences.clone()); 152 - sqlx::query!( 172 + _ = sqlx::query!( 153 173 r#"UPDATE accounts SET private_prefs = ? WHERE did = ?"#, 154 174 prefs, 155 175 did ··· 191 211 } 192 212 193 213 #[rustfmt::skip] 214 + /// Register all actor endpoints. 194 215 pub(crate) fn routes() -> Router<AppState> { 195 216 // AP /xrpc/app.bsky.actor.putPreferences 196 217 // AG /xrpc/app.bsky.actor.getPreferences ··· 232 253 } 233 254 // HACK: Assume the bluesky appview by default. 234 255 None => ( 235 - Did::new("did:web:api.bsky.app".to_string()).unwrap(), 236 - "#bsky_appview".to_string(), 256 + Did::new("did:web:api.bsky.app".to_owned()).expect("should be a valid DID"), 257 + "#bsky_appview".to_owned(), 237 258 ), 238 259 }; 239 260 ··· 280 301 281 302 let mut h = HeaderMap::new(); 282 303 if let Some(hdr) = request.headers().get("atproto-accept-labelers") { 283 - h.insert("atproto-accept-labelers", hdr.clone()); 304 + _ = h.insert("atproto-accept-labelers", hdr.clone()); 284 305 } 285 306 if let Some(hdr) = request.headers().get(http::header::CONTENT_TYPE) { 286 - h.insert(http::header::CONTENT_TYPE, hdr.clone()); 307 + _ = h.insert(http::header::CONTENT_TYPE, hdr.clone()); 287 308 } 288 309 289 310 let r = client ··· 309 330 Ok(resp) 310 331 } 311 332 333 + /// The main application entry point. 312 334 async fn run() -> anyhow::Result<()> { 313 335 let args = Args::parse(); 314 336 ··· 368 390 })) 369 391 .build(); 370 392 371 - tokio::fs::create_dir_all(&config.key.parent().unwrap()) 393 + tokio::fs::create_dir_all(&config.key.parent().expect("should have parent")) 372 394 .await 373 395 .context("failed to create key directory")?; 374 396 ··· 458 480 .await 459 481 .context("failed to query database")?; 460 482 483 + #[allow(clippy::print_stdout)] 461 484 if c == 0 { 462 485 let uuid = Uuid::new_v4().to_string(); 463 486 464 - sqlx::query!( 487 + _ = sqlx::query!( 465 488 r#" 466 489 INSERT INTO invites (id, did, count, created_at) 467 490 VALUES (?, NULL, 1, datetime('now')) ··· 494 517 }); 495 518 496 519 // Now that the app is live, request a crawl from upstream relays. 497 - let _ = firehose::reconnect_relays(&client, &config).await; 520 + firehose::reconnect_relays(&client, &config).await; 498 521 499 522 serve 500 523 .await
+18 -11
src/metrics.rs
··· 8 8 9 9 use crate::config; 10 10 11 - pub(crate) const AUTH_FAILED: &str = "bluepds.auth.failed"; // Counter. 12 - 13 - pub(crate) const FIREHOSE_HISTORY: &str = "bluepds.firehose.history"; // Gauge. 14 - pub(crate) const FIREHOSE_LISTENERS: &str = "bluepds.firehose.listeners"; // Gauge. 15 - pub(crate) const FIREHOSE_MESSAGES: &str = "bluepds.firehose.messages"; // Counter. 16 - pub(crate) const FIREHOSE_SEQUENCE: &str = "bluepds.firehose.sequence"; // Counter. 17 - 18 - pub(crate) const REPO_COMMITS: &str = "bluepds.repo.commits"; // Counter. 19 - pub(crate) const REPO_OP_CREATE: &str = "bluepds.repo.op.create"; // Counter. 20 - pub(crate) const REPO_OP_UPDATE: &str = "bluepds.repo.op.update"; // Counter. 21 - pub(crate) const REPO_OP_DELETE: &str = "bluepds.repo.op.delete"; // Counter. 11 + /// Authorization failed. Counter. 12 + pub(crate) const AUTH_FAILED: &str = "bluepds.auth.failed"; 13 + /// Firehose history. Gauge. 14 + pub(crate) const FIREHOSE_HISTORY: &str = "bluepds.firehose.history"; 15 + /// Firehose listeners. Gauge. 16 + pub(crate) const FIREHOSE_LISTENERS: &str = "bluepds.firehose.listeners"; 17 + /// Firehose messages. Counter. 18 + pub(crate) const FIREHOSE_MESSAGES: &str = "bluepds.firehose.messages"; 19 + /// Firehose sequence. Counter. 20 + pub(crate) const FIREHOSE_SEQUENCE: &str = "bluepds.firehose.sequence"; 21 + /// Repository commits. Counter. 22 + pub(crate) const REPO_COMMITS: &str = "bluepds.repo.commits"; 23 + /// Repository operation create. Counter. 24 + pub(crate) const REPO_OP_CREATE: &str = "bluepds.repo.op.create"; 25 + /// Repository operation update. Counter. 26 + pub(crate) const REPO_OP_UPDATE: &str = "bluepds.repo.op.update"; 27 + /// Repository operation delete. Counter. 28 + pub(crate) const REPO_OP_DELETE: &str = "bluepds.repo.op.delete"; 22 29 23 30 /// Must be ran exactly once on startup. This will declare all of the instruments for `metrics`. 24 31 pub(crate) fn setup(config: &Option<config::MetricConfig>) -> anyhow::Result<()> {
+11 -3
src/plc.rs
··· 1 + //! PLC operations. 1 2 use std::collections::HashMap; 2 3 3 4 use anyhow::{Context, bail}; ··· 12 13 13 14 #[derive(Debug, Deserialize, Serialize, Clone)] 14 15 #[serde(rename_all = "camelCase", tag = "type")] 16 + /// A PLC service. 15 17 pub(crate) enum PlcService { 16 18 #[serde(rename = "AtprotoPersonalDataServer")] 17 - Pds { endpoint: String }, 19 + /// A personal data server. 20 + Pds { 21 + /// The URL of the PDS. 22 + endpoint: String 23 + }, 18 24 } 19 25 20 26 #[derive(Debug, Deserialize, Serialize, Clone)] ··· 30 36 } 31 37 32 38 impl PlcOperation { 39 + /// Sign an operation with the provided signature. 33 40 pub(crate) fn sign(self, sig: Vec<u8>) -> SignedPlcOperation { 34 41 SignedPlcOperation { 35 42 typ: self.typ, ··· 45 52 46 53 #[derive(Debug, Deserialize, Serialize, Clone)] 47 54 #[serde(rename_all = "camelCase")] 55 + /// A signed PLC operation. 48 56 pub(crate) struct SignedPlcOperation { 49 57 #[serde(rename = "type")] 50 58 pub typ: String, ··· 65 73 66 74 /// Submit a PLC operation to the public directory. 67 75 pub(crate) async fn submit(client: &Client, did: &str, op: &SignedPlcOperation) -> anyhow::Result<()> { 68 - debug!("submitting {} {}", did, serde_json::to_string(&op).unwrap()); 76 + debug!("submitting {} {}", did, serde_json::to_string(&op).expect("should serialize")); 69 77 70 78 let res = client 71 79 .post(format!("{PLC_DIRECTORY}{did}")) ··· 84 92 85 93 bail!( 86 94 "error from PLC directory: {}", 87 - serde_json::to_string(&e).unwrap() 95 + serde_json::to_string(&e).expect("should serialize") 88 96 ); 89 97 } 90 98 }
+4 -1
src/storage.rs
··· 10 10 11 11 use crate::{Db, config::RepoConfig}; 12 12 13 + /// Open a block store for a given DID. 13 14 pub(crate) async fn open_store( 14 15 config: &RepoConfig, 15 16 did: impl Into<String>, ··· 31 32 CarStore::open(f).await.context("failed to open car store") 32 33 } 33 34 35 + /// Open a repository for a given DID. 34 36 pub(crate) async fn open_repo_db( 35 37 config: &RepoConfig, 36 38 db: &Db, ··· 48 50 .await 49 51 .context("failed to query database")?; 50 52 51 - open_repo(config, did, Cid::from_str(&cid).unwrap()).await 53 + open_repo(config, did, Cid::from_str(&cid).expect("should be valid CID")).await 52 54 } 53 55 56 + /// Open a repository for a given DID and CID. 54 57 pub(crate) async fn open_repo( 55 58 config: &RepoConfig, 56 59 did: impl Into<String>,