Alternative ATProto PDS implementation

lint (shadow_unrelated)

+1 -2
Cargo.toml
···
  multiple_crate_versions = "allow"
  expect_used = "allow"
  # # Temporary Allows - Restriction
- shadow_reuse = "allow"
- shadow_unrelated = "allow"
  min_ident_chars = "allow"
  # arbitrary_source_item_ordering = "allow"
  renamed_function_params = "allow"
···
  tests_outside_test_module = "allow"
  ref_patterns = "allow"
  question_mark_used = "allow"
+ shadow_reuse = "allow"
  # Warns
  missing_docs_in_private_items = "warn"
  use_self = "warn"
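For readers unfamiliar with the two clippy restriction lints named in the commit title: shadow_reuse fires when a rebinding is derived from the value it shadows, while shadow_unrelated fires when the new binding has nothing to do with the one it hides. The sketch below is illustrative only (none of these identifiers appear in the PDS source); it shows the distinction that motivates keeping the shadow_reuse allow while dropping the shadow_unrelated one, and the renames in the hunks that follow (auth to auth_token, ws to ws_up, url to uri).

    fn example(raw: &str) {
        // shadow_reuse: the new `raw` is computed from the old one; this stays allowed.
        let raw = raw.trim();
        println!("trimmed: {raw}");

        // shadow_unrelated: this `raw` is unrelated to the binding it hides.
        // With the blanket allow removed, clippy flags patterns like this one.
        let raw = 42_u32;
        println!("count: {raw}");
    }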
+10 -7
src/auth.rs
···
  
      // N.B: We ignore all fields inside of the token up until this point because they can be
      // attacker-controlled.
-     let (typ, claims) = verify(&state.signing_key.did(), token).map_err(|e| {
-         Error::with_status(
-             StatusCode::UNAUTHORIZED,
-             e.context("failed to verify auth token"),
-         )
-     }).context("token auth should be verify")?;
+     let (typ, claims) = verify(&state.signing_key.did(), token)
+         .map_err(|e| {
+             Error::with_status(
+                 StatusCode::UNAUTHORIZED,
+                 e.context("failed to verify auth token"),
+             )
+         })
+         .context("token auth should be verify")?;
  
      // Ensure this is an authentication token.
      if typ != "at+jwt" {
···
      let _status = sqlx::query_scalar!(r#"SELECT status FROM accounts WHERE did = ?"#, did)
          .fetch_one(&state.db)
          .await
-         .with_context(|| format!("failed to query account {did}")).context("should fetch account status")?;
+         .with_context(|| format!("failed to query account {did}"))
+         .context("should fetch account status")?;
  
      Ok(Self {
          did: did.to_owned(),
+1 -2
src/endpoints/identity.rs
···
          .await
          .context("failed to open did doc")?;
  
-     let op_bytes = serde_ipld_dagcbor::to_vec(&op)
-         .context("failed to encode plc op")?;
+     let op_bytes = serde_ipld_dagcbor::to_vec(&op).context("failed to encode plc op")?;
  
      let plc_cid = CarStore::open(doc)
          .await
+10 -32
src/endpoints/repo.rs
···
          if let (Some(blob_type), Some(blob_ref)) = (map.get("$type"), map.get("ref")) {
              if blob_type == &serde_json::Value::String("blob".to_owned()) {
                  if let Ok(rf) = serde_json::from_value::<BlobRef>(blob_ref.clone()) {
-                     cids.push(
-                         Cid::from_str(&rf.link)
-                             .context("failed to convert cid")?,
-                     );
+                     cids.push(Cid::from_str(&rf.link).context("failed to convert cid")?);
                  }
              }
          }
···
              .context("failed to extract key")?;
      }
  
-     let mut tx = db
-         .begin()
-         .await
-         .context("failed to begin transaction")?;
+     let mut tx = db.begin().await.context("failed to begin transaction")?;
  
      if !swap_commit(
          &mut *tx,
···
      };
      Ok(Json(
          repo::put_record::OutputData {
-             cid: cid
-                 .context("missing cid")?,
+             cid: cid.context("missing cid")?,
              commit: write_result.commit.to_owned(),
-             uri: uri
-                 .context("missing uri")?,
+             uri: uri.context("missing uri")?,
              validation_status: Some("unknown".to_owned()),
          }
          .into(),
···
  
      let mut tree = repo.tree();
      let mut it = Box::pin(tree.keys());
-     while let Some(key) = it
-         .try_next()
-         .await
-         .context("failed to iterate repo keys")?
-     {
+     while let Some(key) = it.try_next().await.context("failed to iterate repo keys")? {
          if let Some((collection, _rkey)) = key.split_once('/') {
              _ = collections.insert(collection.to_owned());
          }
···
          .await
          .context("failed to find record")?;
  
-     let record: Option<serde_json::Value> = repo
-         .get_raw(&key)
-         .await
-         .context("failed to read record")?;
+     let record: Option<serde_json::Value> =
+         repo.get_raw(&key).await.context("failed to read record")?;
  
      record.map_or_else(
          || {
···
      let mut len = 0_usize;
      let mut sha = Sha256::new();
      let mut stream = request.into_body().into_data_stream();
-     while let Some(bytes) = stream
-         .try_next()
-         .await
-         .context("failed to receive file")?
-     {
-         len = len
-             .checked_add(bytes.len())
-             .context("size overflow")?;
+     while let Some(bytes) = stream.try_next().await.context("failed to receive file")? {
+         len = len.checked_add(bytes.len()).context("size overflow")?;
  
          // Deal with any sneaky end-users trying to bypass size limitations.
-         let len_u64: u64 = len
-             .try_into()
-             .context("failed to convert `len`")?;
+         let len_u64: u64 = len.try_into().context("failed to convert `len`")?;
          if len_u64 > config.blob.limit {
              drop(file);
              tokio::fs::remove_file(&filename)
+11 -23
src/endpoints/server.rs
···
      State(fhp): State<FirehoseProducer>,
      Json(input): Json<server::create_account::Input>,
  ) -> Result<Json<server::create_account::Output>> {
-     let email = input
-         .email
-         .as_deref()
-         .context("no email provided")?;
+     let email = input.email.as_deref().context("no email provided")?;
      // Hash the user's password.
      let pass = Argon2::default()
          .hash_password(
···
  
      // Begin a new transaction to actually create the user's profile.
      // Unless committed, the transaction will be automatically rolled back.
-     let mut tx = db
-         .begin()
-         .await
-         .context("failed to begin transaction")?;
+     let mut tx = db.begin().await.context("failed to begin transaction")?;
  
      let _invite = match input.invite_code {
          Some(ref code) => {
···
              .await
              .context("failed to check invite code")?;
  
-             invite
-                 .context("invalid invite code")?
+             invite.context("invalid invite code")?
          }
          None => {
              return Err(anyhow!("invite code required").into());
···
      )
      .await
      .context("failed to sign genesis op")?;
-     let op_bytes = serde_ipld_dagcbor::to_vec(&op)
-         .context("failed to encode genesis op")?;
+     let op_bytes = serde_ipld_dagcbor::to_vec(&op).context("failed to encode genesis op")?;
  
      let did_hash = {
          let digest = base32::encode(
···
          .context("failed to create new account")?;
  
      // The account is fully created. Commit the SQL transaction to the database.
-     tx.commit()
-         .await
-         .context("failed to commit transaction")?;
+     tx.commit().await.context("failed to commit transaction")?;
  
      // Broadcast the identity event now that the new identity is resolvable on the public directory.
      fhp.identity(
···
  
      match Argon2::default().verify_password(
          password.as_bytes(),
-         &PasswordHash::new(account.password.as_str())
-             .context("invalid password hash in db")?,
+         &PasswordHash::new(account.password.as_str()).context("invalid password hash in db")?,
      ) {
          Ok(_) => {}
          Err(_e) => {
···
      req: Request,
  ) -> Result<Json<server::refresh_session::Output>> {
      // TODO: store hashes of refresh tokens and enforce single-use
-     let auth = req
+     let auth_token = req
          .headers()
          .get(axum::http::header::AUTHORIZATION)
-         .context("no authorization header provided")?;
-     let token = auth
+         .context("no authorization header provided")?
          .to_str()
          .ok()
          .and_then(|auth| auth.strip_prefix("Bearer "))
          .context("invalid authentication token")?;
  
-     let (typ, claims) = auth::verify(&skey.did(), token)
-         .context("failed to verify refresh token")?;
+     let (typ, claims) =
+         auth::verify(&skey.did(), auth_token).context("failed to verify refresh token")?;
      if typ != "refresh+jwt" {
          return Err(Error::with_status(
              StatusCode::UNAUTHORIZED,
···
      State(db): State<Db>,
  ) -> Result<Json<server::get_session::Output>> {
      let did = user.did();
-
+     #[expect(clippy::shadow_unrelated, reason = "is related")]
      if let Some(user) = sqlx::query!(
          r#"
          SELECT a.email, a.status, (
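One note on the #[expect(clippy::shadow_unrelated, reason = "is related")] attribute added in get_session: unlike #[allow], an #[expect] expectation is checked, so the compiler emits an unfulfilled_lint_expectations warning if the suppressed lint ever stops firing at that site. A hypothetical standalone example, not taken from this codebase:

    fn demo() {
        let value = "a string";
        println!("{value}");

        // The expectation silences the lint here, and warns if it becomes unnecessary.
        #[expect(clippy::shadow_unrelated, reason = "rebinding is deliberate")]
        let value = 42_u32;
        println!("{value}");
    }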
+2 -3
src/endpoints/sync.rs
···
  }
  
  async fn subscribe_repos(
-     ws: WebSocketUpgrade,
+     ws_up: WebSocketUpgrade,
      State(fh): State<FirehoseProducer>,
      Query(input): Query<SubscribeReposParametersData>,
  ) -> impl IntoResponse {
···
              .expect("should be a valid response");
          }
      };
-
-     ws.on_upgrade(async move |ws| {
+     ws_up.on_upgrade(async move |ws| {
          fh.client_connection(ws, cursor).await;
      })
  }
+3 -8
src/firehose.rs
···
          let msg = sync::subscribe_repos::Error::FutureCursor(Some(format!(
              "cursor {cursor} is greater than the current sequence number {seq}"
          )));
-
          serde_ipld_dagcbor::to_writer(&mut frame, &hdr).expect("should serialize header");
          serde_ipld_dagcbor::to_writer(&mut frame, &msg).expect("should serialize message");
-
          // Drop the connection.
          drop(ws.send(Message::binary(frame)).await);
          bail!(
···
          );
      }
  
-     for &(seq, ty, ref msg) in history.iter() {
-         if seq > cursor {
-             break;
+     for &(historical_seq, ty, ref msg) in history.iter() {
+         if cursor > historical_seq {
+             continue;
          }
-
          let hdr = FrameHeader::Message(ty.to_owned());
          serde_ipld_dagcbor::to_writer(&mut frame, &hdr).expect("should serialize header");
          serde_ipld_dagcbor::to_writer(&mut frame, msg).expect("should serialize message");
-
          if let Err(e) = ws.send(Message::binary(frame.clone())).await {
              debug!("Firehose client disconnected during backfill: {e}");
              break;
          }
-
          // Clear out the frame to begin a new one.
          frame.clear();
      }
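Beyond renaming seq to historical_seq (presumably so the loop binding no longer shadows the seq referenced in the error message a few lines up), this hunk also inverts the backfill filter: the old loop stopped at the first history entry whose sequence number exceeded the cursor, whereas the new loop skips entries below the cursor and replays everything from the cursor onward. A minimal sketch of the new filter, with hypothetical types standing in for the real history buffer:

    /// Returns the sequence numbers that would be replayed to a client resuming at `cursor`.
    fn backfill_seqs(history: &[(u64, Vec<u8>)], cursor: u64) -> Vec<u64> {
        let mut replayed = Vec::new();
        for &(historical_seq, ref _msg) in history {
            // Entries older than the requested cursor are skipped rather than
            // ending the loop, so every entry at or after the cursor is sent.
            if cursor > historical_seq {
                continue;
            }
            replayed.push(historical_seq);
        }
        replayed
    }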
+2 -2
src/main.rs
···
  ///
  /// Reference: https://atproto.com/specs/xrpc#service-proxying
  async fn service_proxy(
-     url: Uri,
+     uri: Uri,
      user: AuthenticatedUser,
      State(skey): State<SigningKey>,
      State(client): State<reqwest::Client>,
      headers: HeaderMap,
      request: Request<Body>,
  ) -> Result<Response<Body>> {
-     let url_path = url.path_and_query().context("invalid service proxy url")?;
+     let url_path = uri.path_and_query().context("invalid service proxy url")?;
      let lxm = url_path
          .path()
          .strip_prefix("/")