Alternative ATProto PDS implementation

cargo fmt

+3 -3
src/auth.rs
··· 1 //! Authentication primitives. 2 3 - use anyhow::{anyhow, Context}; 4 use atrium_crypto::{ 5 keypair::{Did, Secp256k1Keypair}, 6 verify::Verifier, ··· 8 use axum::{extract::FromRequestParts, http::StatusCode}; 9 use base64::Engine; 10 11 - use crate::{auth, error::ErrorMessage, AppState, Error}; 12 13 /// This is an axum request extractor that represents an authenticated user. 14 /// ··· 46 return Err(Error::with_status( 47 StatusCode::UNAUTHORIZED, 48 anyhow!("no bearer token"), 49 - )) 50 } 51 }; 52
··· 1 //! Authentication primitives. 2 3 + use anyhow::{Context, anyhow}; 4 use atrium_crypto::{ 5 keypair::{Did, Secp256k1Keypair}, 6 verify::Verifier, ··· 8 use axum::{extract::FromRequestParts, http::StatusCode}; 9 use base64::Engine; 10 11 + use crate::{AppState, Error, auth, error::ErrorMessage}; 12 13 /// This is an axum request extractor that represents an authenticated user. 14 /// ··· 46 return Err(Error::with_status( 47 StatusCode::UNAUTHORIZED, 48 anyhow!("no bearer token"), 49 + )); 50 } 51 }; 52
+1 -1
src/did.rs
··· 1 - use anyhow::{bail, Context, Result}; 2 use atrium_api::types::string::Did; 3 use serde::{Deserialize, Serialize}; 4 use url::Url;
··· 1 + use anyhow::{Context, Result, bail}; 2 use atrium_api::types::string::Did; 3 use serde::{Deserialize, Serialize}; 4 use url::Url;
+7 -4
src/endpoints/identity.rs
··· 1 use std::collections::HashMap; 2 3 - use anyhow::{anyhow, Context}; 4 use atrium_api::{ 5 com::atproto::identity, 6 types::string::{Datetime, Handle}, ··· 8 use atrium_crypto::keypair::Did; 9 use atrium_repo::blockstore::{AsyncBlockStoreWrite, CarStore, DAG_CBOR, SHA2_256}; 10 use axum::{ 11 extract::{Query, State}, 12 http::StatusCode, 13 routing::{get, post}, 14 - Json, Router, 15 }; 16 use constcat::concat; 17 18 use crate::{ 19 auth::AuthenticatedUser, 20 config::AppConfig, 21 did, 22 firehose::FirehoseProducer, 23 plc::{self, PlcOperation, PlcService}, 24 - AppState, Client, Db, Error, Result, RotationKey, SigningKey, 25 }; 26 27 async fn resolve_handle( ··· 69 todo!() 70 } 71 72 - #[expect(clippy::too_many_arguments, reason = "Many parameters are required for this endpoint")] 73 async fn update_handle( 74 user: AuthenticatedUser, 75 State(skey): State<SigningKey>,
··· 1 use std::collections::HashMap; 2 3 + use anyhow::{Context, anyhow}; 4 use atrium_api::{ 5 com::atproto::identity, 6 types::string::{Datetime, Handle}, ··· 8 use atrium_crypto::keypair::Did; 9 use atrium_repo::blockstore::{AsyncBlockStoreWrite, CarStore, DAG_CBOR, SHA2_256}; 10 use axum::{ 11 + Json, Router, 12 extract::{Query, State}, 13 http::StatusCode, 14 routing::{get, post}, 15 }; 16 use constcat::concat; 17 18 use crate::{ 19 + AppState, Client, Db, Error, Result, RotationKey, SigningKey, 20 auth::AuthenticatedUser, 21 config::AppConfig, 22 did, 23 firehose::FirehoseProducer, 24 plc::{self, PlcOperation, PlcService}, 25 }; 26 27 async fn resolve_handle( ··· 69 todo!() 70 } 71 72 + #[expect( 73 + clippy::too_many_arguments, 74 + reason = "Many parameters are required for this endpoint" 75 + )] 76 async fn update_handle( 77 user: AuthenticatedUser, 78 State(skey): State<SigningKey>,
+1 -1
src/endpoints/mod.rs
··· 1 - use axum::{routing::get, Json, Router}; 2 use serde_json::json; 3 4 use crate::{AppState, Result};
··· 1 + use axum::{Json, Router, routing::get}; 2 use serde_json::json; 3 4 use crate::{AppState, Result};
+6 -5
src/endpoints/repo.rs
··· 1 use std::{collections::HashSet, str::FromStr}; 2 3 - use anyhow::{anyhow, Context}; 4 use atrium_api::{ 5 com::atproto::repo::{self, defs::CommitMetaData}, 6 types::{ 7 - string::{AtIdentifier, Nsid, Tid}, 8 LimitedU32, Object, TryFromUnknown, TryIntoUnknown, Unknown, 9 }, 10 }; 11 - use atrium_repo::{blockstore::CarStore, Cid}; 12 use axum::{ 13 body::Body, 14 extract::{Query, Request, State}, 15 http::{self, StatusCode}, 16 routing::{get, post}, 17 - Json, Router, 18 }; 19 use constcat::concat; 20 use futures::TryStreamExt; ··· 24 use tokio::io::AsyncWriteExt; 25 26 use crate::{ 27 auth::AuthenticatedUser, 28 config::AppConfig, 29 firehose::{self, FirehoseProducer, RepoOp}, 30 metrics::{REPO_COMMITS, REPO_OP_CREATE, REPO_OP_DELETE, REPO_OP_UPDATE}, 31 - storage, AppState, Db, Error, Result, SigningKey, 32 }; 33 34 /// IPLD CID raw binary
··· 1 use std::{collections::HashSet, str::FromStr}; 2 3 + use anyhow::{Context, anyhow}; 4 use atrium_api::{ 5 com::atproto::repo::{self, defs::CommitMetaData}, 6 types::{ 7 LimitedU32, Object, TryFromUnknown, TryIntoUnknown, Unknown, 8 + string::{AtIdentifier, Nsid, Tid}, 9 }, 10 }; 11 + use atrium_repo::{Cid, blockstore::CarStore}; 12 use axum::{ 13 + Json, Router, 14 body::Body, 15 extract::{Query, Request, State}, 16 http::{self, StatusCode}, 17 routing::{get, post}, 18 }; 19 use constcat::concat; 20 use futures::TryStreamExt; ··· 24 use tokio::io::AsyncWriteExt; 25 26 use crate::{ 27 + AppState, Db, Error, Result, SigningKey, 28 auth::AuthenticatedUser, 29 config::AppConfig, 30 firehose::{self, FirehoseProducer, RepoOp}, 31 metrics::{REPO_COMMITS, REPO_OP_CREATE, REPO_OP_DELETE, REPO_OP_UPDATE}, 32 + storage, 33 }; 34 35 /// IPLD CID raw binary
+5 -5
src/endpoints/server.rs
··· 1 use std::{collections::HashMap, str::FromStr}; 2 3 - use anyhow::{anyhow, Context}; 4 - use argon2::{password_hash::SaltString, Argon2, PasswordHash, PasswordHasher, PasswordVerifier}; 5 use atrium_api::{ 6 com::atproto::server, 7 types::string::{Datetime, Did, Handle, Tid}, 8 }; 9 use atrium_crypto::keypair::Did as _; 10 use atrium_repo::{ 11 - blockstore::{AsyncBlockStoreWrite, CarStore, DAG_CBOR, SHA2_256}, 12 Cid, Repository, 13 }; 14 use axum::{ 15 extract::{Query, Request, State}, 16 http::StatusCode, 17 routing::{get, post}, 18 - Json, Router, 19 }; 20 use constcat::concat; 21 use metrics::counter; ··· 24 use uuid::Uuid; 25 26 use crate::{ 27 auth::{self, AuthenticatedUser}, 28 config::AppConfig, 29 firehose::{Commit, FirehoseProducer}, 30 metrics::AUTH_FAILED, 31 plc::{self, PlcOperation, PlcService}, 32 - AppState, Client, Db, Error, Result, RotationKey, SigningKey, 33 }; 34 35 /// This is a dummy password that can be used in absence of a real password.
··· 1 use std::{collections::HashMap, str::FromStr}; 2 3 + use anyhow::{Context, anyhow}; 4 + use argon2::{Argon2, PasswordHash, PasswordHasher, PasswordVerifier, password_hash::SaltString}; 5 use atrium_api::{ 6 com::atproto::server, 7 types::string::{Datetime, Did, Handle, Tid}, 8 }; 9 use atrium_crypto::keypair::Did as _; 10 use atrium_repo::{ 11 Cid, Repository, 12 + blockstore::{AsyncBlockStoreWrite, CarStore, DAG_CBOR, SHA2_256}, 13 }; 14 use axum::{ 15 + Json, Router, 16 extract::{Query, Request, State}, 17 http::StatusCode, 18 routing::{get, post}, 19 }; 20 use constcat::concat; 21 use metrics::counter; ··· 24 use uuid::Uuid; 25 26 use crate::{ 27 + AppState, Client, Db, Error, Result, RotationKey, SigningKey, 28 auth::{self, AuthenticatedUser}, 29 config::AppConfig, 30 firehose::{Commit, FirehoseProducer}, 31 metrics::AUTH_FAILED, 32 plc::{self, PlcOperation, PlcService}, 33 }; 34 35 /// This is a dummy password that can be used in absence of a real password.
+4 -4
src/endpoints/sync.rs
··· 1 use std::str::FromStr; 2 3 - use anyhow::{anyhow, Context}; 4 use atrium_api::{com::atproto::sync, types::string::Did}; 5 use atrium_repo::{ 6 - blockstore::{AsyncBlockStoreRead, AsyncBlockStoreWrite, CarStore, DAG_CBOR, SHA2_256}, 7 Cid, 8 }; 9 use axum::{ 10 body::Body, 11 extract::{Query, State, WebSocketUpgrade}, 12 http::{self, Response, StatusCode}, 13 response::IntoResponse, 14 routing::get, 15 - Json, Router, 16 }; 17 use constcat::concat; 18 use futures::stream::TryStreamExt; 19 use tokio_util::io::ReaderStream; 20 21 use crate::{ 22 config::AppConfig, 23 firehose::FirehoseProducer, 24 storage::{open_repo_db, open_store}, 25 - AppState, Db, Error, Result, 26 }; 27 28 async fn get_blob(
··· 1 use std::str::FromStr; 2 3 + use anyhow::{Context, anyhow}; 4 use atrium_api::{com::atproto::sync, types::string::Did}; 5 use atrium_repo::{ 6 Cid, 7 + blockstore::{AsyncBlockStoreRead, AsyncBlockStoreWrite, CarStore, DAG_CBOR, SHA2_256}, 8 }; 9 use axum::{ 10 + Json, Router, 11 body::Body, 12 extract::{Query, State, WebSocketUpgrade}, 13 http::{self, Response, StatusCode}, 14 response::IntoResponse, 15 routing::get, 16 }; 17 use constcat::concat; 18 use futures::stream::TryStreamExt; 19 use tokio_util::io::ReaderStream; 20 21 use crate::{ 22 + AppState, Db, Error, Result, 23 config::AppConfig, 24 firehose::FirehoseProducer, 25 storage::{open_repo_db, open_store}, 26 }; 27 28 async fn get_blob(
+10 -2
src/error.rs
··· 21 } 22 impl std::fmt::Display for ErrorMessage { 23 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 24 - write!(f, r#"{{"error":"{}","message":"{}"}}"#, self.error, self.message) 25 } 26 } 27 impl ErrorMessage { ··· 46 } 47 } 48 49 - pub fn with_message(status: StatusCode, err: impl Into<anyhow::Error>, message: impl Into<ErrorMessage>) -> Self { 50 Self { 51 status, 52 err: err.into(),
··· 21 } 22 impl std::fmt::Display for ErrorMessage { 23 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 24 + write!( 25 + f, 26 + r#"{{"error":"{}","message":"{}"}}"#, 27 + self.error, self.message 28 + ) 29 } 30 } 31 impl ErrorMessage { ··· 50 } 51 } 52 53 + pub fn with_message( 54 + status: StatusCode, 55 + err: impl Into<anyhow::Error>, 56 + message: impl Into<ErrorMessage>, 57 + ) -> Self { 58 Self { 59 status, 60 err: err.into(),
+10 -5
src/firehose.rs
··· 1 use std::{collections::VecDeque, time::Duration}; 2 3 - use anyhow::{bail, Result}; 4 use atrium_api::{ 5 com::atproto::sync::{self}, 6 types::string::{Datetime, Did}, ··· 9 use axum::extract::ws::{Message, WebSocket}; 10 use metrics::{counter, gauge}; 11 use rand::Rng; 12 - use serde::{ser::SerializeMap, Serialize}; 13 use tracing::{debug, error, info, warn}; 14 15 use crate::{ 16 config::AppConfig, 17 metrics::{FIREHOSE_HISTORY, FIREHOSE_LISTENERS, FIREHOSE_MESSAGES, FIREHOSE_SEQUENCE}, 18 - Client, 19 }; 20 21 enum FirehoseMessage { ··· 156 } 157 158 pub async fn client_connection(&self, ws: WebSocket, cursor: Option<i64>) { 159 - let _ = self.tx.send(FirehoseMessage::Connect(Box::new((ws, cursor)))).await; 160 } 161 } 162 ··· 227 228 // Drop the connection. 229 let _ = ws.send(Message::binary(frame)).await; 230 - bail!("connection dropped: cursor {cursor} is greater than the current sequence number {seq}"); 231 } 232 233 for (seq, ty, msg) in history.iter() {
··· 1 use std::{collections::VecDeque, time::Duration}; 2 3 + use anyhow::{Result, bail}; 4 use atrium_api::{ 5 com::atproto::sync::{self}, 6 types::string::{Datetime, Did}, ··· 9 use axum::extract::ws::{Message, WebSocket}; 10 use metrics::{counter, gauge}; 11 use rand::Rng; 12 + use serde::{Serialize, ser::SerializeMap}; 13 use tracing::{debug, error, info, warn}; 14 15 use crate::{ 16 + Client, 17 config::AppConfig, 18 metrics::{FIREHOSE_HISTORY, FIREHOSE_LISTENERS, FIREHOSE_MESSAGES, FIREHOSE_SEQUENCE}, 19 }; 20 21 enum FirehoseMessage { ··· 156 } 157 158 pub async fn client_connection(&self, ws: WebSocket, cursor: Option<i64>) { 159 + let _ = self 160 + .tx 161 + .send(FirehoseMessage::Connect(Box::new((ws, cursor)))) 162 + .await; 163 } 164 } 165 ··· 230 231 // Drop the connection. 232 let _ = ws.send(Message::binary(frame)).await; 233 + bail!( 234 + "connection dropped: cursor {cursor} is greater than the current sequence number {seq}" 235 + ); 236 } 237 238 for (seq, ty, msg) in history.iter() {
+10 -8
src/main.rs
··· 9 use atrium_crypto::keypair::{Export, Secp256k1Keypair}; 10 use auth::AuthenticatedUser; 11 use axum::{ 12 body::Body, 13 extract::{FromRef, Request, State}, 14 http::{self, HeaderMap, Response, StatusCode, Uri}, 15 response::IntoResponse, 16 routing::get, 17 - Router, 18 }; 19 use azure_core::credentials::TokenCredential; 20 use clap::Parser; 21 - use clap_verbosity_flag::{log::LevelFilter, InfoLevel, Verbosity}; 22 use config::AppConfig; 23 - use figment::{providers::Format, Figment}; 24 use firehose::FirehoseProducer; 25 use http_cache_reqwest::{CacheMode, HttpCacheOptions, MokaManager}; 26 use rand::Rng; 27 use serde::{Deserialize, Serialize}; 28 - use sqlx::{sqlite::SqliteConnectOptions, SqlitePool}; 29 use tokio::net::TcpListener; 30 use tower_http::{cors::CorsLayer, trace::TraceLayer}; 31 32 - use anyhow::{anyhow, Context}; 33 use tracing::{info, warn}; 34 35 mod auth; ··· 137 /// This will _very likely_ be changed in the future. 138 mod actor_endpoints { 139 use atrium_api::app::bsky::actor; 140 - use axum::{routing::post, Json}; 141 use constcat::concat; 142 143 use super::*; ··· 247 return Err(Error::with_status( 248 StatusCode::BAD_REQUEST, 249 anyhow!("could not find resolve service #{id}"), 250 - )) 251 } 252 }; 253 ··· 347 if config.test { 348 warn!("BluePDS starting up in TEST mode."); 349 warn!("This means the application will not federate with the rest of the network."); 350 - warn!("If you want to turn this off, either set `test` to false in the config or define `BLUEPDS_TEST = false`"); 351 } 352 353 // Initialize metrics reporting.
··· 9 use atrium_crypto::keypair::{Export, Secp256k1Keypair}; 10 use auth::AuthenticatedUser; 11 use axum::{ 12 + Router, 13 body::Body, 14 extract::{FromRef, Request, State}, 15 http::{self, HeaderMap, Response, StatusCode, Uri}, 16 response::IntoResponse, 17 routing::get, 18 }; 19 use azure_core::credentials::TokenCredential; 20 use clap::Parser; 21 + use clap_verbosity_flag::{InfoLevel, Verbosity, log::LevelFilter}; 22 use config::AppConfig; 23 + use figment::{Figment, providers::Format}; 24 use firehose::FirehoseProducer; 25 use http_cache_reqwest::{CacheMode, HttpCacheOptions, MokaManager}; 26 use rand::Rng; 27 use serde::{Deserialize, Serialize}; 28 + use sqlx::{SqlitePool, sqlite::SqliteConnectOptions}; 29 use tokio::net::TcpListener; 30 use tower_http::{cors::CorsLayer, trace::TraceLayer}; 31 32 + use anyhow::{Context, anyhow}; 33 use tracing::{info, warn}; 34 35 mod auth; ··· 137 /// This will _very likely_ be changed in the future. 138 mod actor_endpoints { 139 use atrium_api::app::bsky::actor; 140 + use axum::{Json, routing::post}; 141 use constcat::concat; 142 143 use super::*; ··· 247 return Err(Error::with_status( 248 StatusCode::BAD_REQUEST, 249 anyhow!("could not find resolve service #{id}"), 250 + )); 251 } 252 }; 253 ··· 347 if config.test { 348 warn!("BluePDS starting up in TEST mode."); 349 warn!("This means the application will not federate with the rest of the network."); 350 + warn!( 351 + "If you want to turn this off, either set `test` to false in the config or define `BLUEPDS_TEST = false`" 352 + ); 353 } 354 355 // Initialize metrics reporting.
+1 -1
src/plc.rs
··· 1 use std::collections::HashMap; 2 3 - use anyhow::{bail, Context}; 4 use base64::Engine; 5 use serde::{Deserialize, Serialize}; 6 use tracing::debug;
··· 1 use std::collections::HashMap; 2 3 + use anyhow::{Context, bail}; 4 use base64::Engine; 5 use serde::{Deserialize, Serialize}; 6 use tracing::debug;
+3 -5
src/storage.rs
··· 4 5 use anyhow::{Context, Result}; 6 use atrium_repo::{ 7 - blockstore::{AsyncBlockStoreRead, AsyncBlockStoreWrite, CarStore}, 8 Cid, Repository, 9 }; 10 11 - use crate::{config::RepoConfig, Db}; 12 13 pub async fn open_store( 14 config: &RepoConfig, ··· 28 .await 29 .context("failed to open repository file")?; 30 31 - CarStore::open(f) 32 - .await 33 - .context("failed to open car store") 34 } 35 36 pub async fn open_repo_db(
··· 4 5 use anyhow::{Context, Result}; 6 use atrium_repo::{ 7 Cid, Repository, 8 + blockstore::{AsyncBlockStoreRead, AsyncBlockStoreWrite, CarStore}, 9 }; 10 11 + use crate::{Db, config::RepoConfig}; 12 13 pub async fn open_store( 14 config: &RepoConfig, ··· 28 .await 29 .context("failed to open repository file")?; 30 31 + CarStore::open(f).await.context("failed to open car store") 32 } 33 34 pub async fn open_repo_db(