Alternative ATProto PDS implementation
//! PDS implementation.
mod account_manager;
mod actor_endpoints;
mod actor_store;
mod auth;
mod config;
mod db;
mod did;
mod endpoints;
pub mod error;
mod firehose;
mod metrics;
mod mmap;
mod oauth;
mod plc;
mod schema;
mod service_proxy;
#[cfg(test)]
mod tests;

use anyhow::{Context as _, anyhow};
use atrium_api::types::string::Did;
use atrium_crypto::keypair::{Export as _, Secp256k1Keypair};
use auth::AuthenticatedUser;
use axum::{
    Router,
    body::Body,
    extract::{FromRef, Request, State},
    http::{self, HeaderMap, Response, StatusCode, Uri},
    response::IntoResponse,
    routing::get,
};
use azure_core::credentials::TokenCredential;
use clap::Parser;
use clap_verbosity_flag::{InfoLevel, Verbosity, log::LevelFilter};
use config::AppConfig;
use db::establish_pool;
use deadpool_diesel::sqlite::Pool;
use diesel::prelude::*;
use diesel_migrations::{EmbeddedMigrations, embed_migrations};
pub use error::Error;
use figment::{Figment, providers::Format as _};
use firehose::FirehoseProducer;
use http_cache_reqwest::{CacheMode, HttpCacheOptions, MokaManager};
use rand::Rng as _;
use serde::{Deserialize, Serialize};
use service_proxy::service_proxy;
use std::{
    net::{IpAddr, Ipv4Addr, SocketAddr},
    path::PathBuf,
    str::FromStr as _,
    sync::Arc,
};
use tokio::net::TcpListener;
use tower_http::{cors::CorsLayer, trace::TraceLayer};
use tracing::{info, warn};
use uuid::Uuid;

/// The application user agent. Concatenates the package name and version, e.g. `bluepds/0.0.0`.
pub const APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),);

/// Embedded database migrations.
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");

/// The application-wide result type.
pub type Result<T> = std::result::Result<T, Error>;
/// The reqwest client type with middleware.
pub type Client = reqwest_middleware::ClientWithMiddleware;
/// The Azure credential type.
pub type Cred = Arc<dyn TokenCredential>;

#[expect(
    clippy::arbitrary_source_item_ordering,
    reason = "serialized data might be structured"
)]
#[derive(Serialize, Deserialize, Debug, Clone)]
/// The key data structure.
struct KeyData {
    /// Primary signing key for all repo operations.
    skey: Vec<u8>,
    /// Primary rotation key for all PLC operations.
    rkey: Vec<u8>,
}

// FIXME: We should use P256Keypair instead. SecP256K1 is primarily used for cryptocurrencies,
// and the implementations of this algorithm are much more limited as compared to P256.
//
// Reference: https://soatok.blog/2022/05/19/guidance-for-choosing-an-elliptic-curve-signature-algorithm-in-2022/
#[derive(Clone)]
/// The signing key for PLC/DID operations.
pub struct SigningKey(Arc<Secp256k1Keypair>);
#[derive(Clone)]
/// The rotation key for PLC operations.
pub struct RotationKey(Arc<Secp256k1Keypair>);

impl std::ops::Deref for SigningKey {
    type Target = Secp256k1Keypair;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl SigningKey {
    /// Import from a private key.
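    ///
    /// A minimal usage sketch (the file name is hypothetical; the bytes are a
    /// secp256k1 private key in the same format produced by `Export::export`):
    ///
    /// ```ignore
    /// let raw = std::fs::read("signing.key").expect("key file should exist");
    /// let skey = SigningKey::import(&raw).expect("bytes should be a valid secp256k1 key");
    /// ```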
    pub fn import(key: &[u8]) -> Result<Self> {
        let key = Secp256k1Keypair::import(key).context("failed to import signing key")?;
        Ok(Self(Arc::new(key)))
    }
}

impl std::ops::Deref for RotationKey {
    type Target = Secp256k1Keypair;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

#[derive(Parser, Debug, Clone)]
/// Command line arguments.
pub struct Args {
    /// Path to the configuration file.
    #[arg(short, long, default_value = "default.toml")]
    pub config: PathBuf,
    /// The verbosity level.
    #[command(flatten)]
    pub verbosity: Verbosity<InfoLevel>,
}

/// Per-actor database connection pools.
#[derive(Clone)]
pub struct ActorPools {
    /// The actor's repository database pool.
    pub repo: Pool,
    /// The actor's blob database pool.
    pub blob: Pool,
}

#[expect(clippy::arbitrary_source_item_ordering, reason = "arbitrary")]
#[derive(Clone, FromRef)]
pub struct AppState {
    /// The application configuration.
    pub config: AppConfig,
    /// The Azure credential.
    pub cred: Cred,
    /// The main database connection pool. Used for common PDS data, like invite codes.
    pub db: Pool,
    /// Actor-specific database connection pools, keyed by DID.
    pub db_actors: std::collections::HashMap<String, ActorPools>,

    /// The HTTP client with middleware.
    pub client: Client,
    /// The simple HTTP client.
    pub simple_client: reqwest::Client,
    /// The firehose producer.
    pub firehose: FirehoseProducer,

    /// The signing key.
    pub signing_key: SigningKey,
    /// The rotation key.
    pub rotation_key: RotationKey,
}

/// The index (/) route.
async fn index() -> impl IntoResponse {
    r"
        __                         __
       /\ \__                     /\ \__
   __  \ \ ,_\  _____   _ __   ___\ \ ,_\   ___
 /'__'\ \ \ \/ /\ '__'\/\''__\/ __'\ \ \/  / __'\
/\ \L\.\_\ \ \_\ \ \L\ \ \ \//\ \L\ \ \ \_/\ \L\ \
\ \__/.\_\\ \__\\ \ ,__/\ \_\\ \____/\ \__\ \____/
 \/__/\/_/ \/__/ \ \ \/  \/_/ \/___/  \/__/\/___/
                  \ \_\
                   \/_/


This is an AT Protocol Personal Data Server (aka, an atproto PDS)

Most API routes are under /xrpc/

      Code: https://github.com/DrChat/bluepds
  Protocol: https://atproto.com
    "
}

/// The main application entry point.
#[expect(
    clippy::cognitive_complexity,
    clippy::too_many_lines,
    unused_qualifications,
    reason = "main function has high complexity"
)]
pub async fn run() -> anyhow::Result<()> {
    let args = Args::parse();

    // Set up trace logging to console and account for the user-provided verbosity flag.
    if args.verbosity.log_level_filter() != LevelFilter::Off {
        let lvl = match args.verbosity.log_level_filter() {
            LevelFilter::Error => tracing::Level::ERROR,
            LevelFilter::Warn => tracing::Level::WARN,
            LevelFilter::Info | LevelFilter::Off => tracing::Level::INFO,
            LevelFilter::Debug => tracing::Level::DEBUG,
            LevelFilter::Trace => tracing::Level::TRACE,
        };
        tracing_subscriber::fmt().with_max_level(lvl).init();
    }

    if !args.config.exists() {
        // Throw up a warning if the config file does not exist.
        //
        // This is not fatal because users can specify all configuration settings via
        // the environment, but the most likely scenario here is that a user accidentally
        // omitted the config file for some reason (e.g. forgot to mount it into Docker).
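        //
        // As a sketch, a deployment without a config file could instead set
        // everything through `BLUEPDS_`-prefixed environment variables, e.g.
        // (values illustrative): `BLUEPDS_TEST=false`, `BLUEPDS_LISTEN_ADDRESS=0.0.0.0:8000`.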
        warn!(
            "configuration file {} does not exist",
            args.config.display()
        );
    }

    // Read and parse the user-provided configuration.
    let config: AppConfig = Figment::new()
        .admerge(figment::providers::Toml::file(args.config))
        .admerge(figment::providers::Env::prefixed("BLUEPDS_"))
        .extract()
        .context("failed to load configuration")?;

    if config.test {
        warn!("BluePDS starting up in TEST mode.");
        warn!("This means the application will not federate with the rest of the network.");
        warn!(
            "If you want to turn this off, either set `test` to false in the config or define `BLUEPDS_TEST = false`"
        );
    }

    // Initialize metrics reporting.
    metrics::setup(config.metrics.as_ref()).context("failed to set up metrics exporter")?;

    // Create a reqwest client that will be used for all outbound requests.
    let simple_client = reqwest::Client::builder()
        .user_agent(APP_USER_AGENT)
        .build()
        .context("failed to build requester client")?;
    let client = reqwest_middleware::ClientBuilder::new(simple_client.clone())
        .with(http_cache_reqwest::Cache(http_cache_reqwest::HttpCache {
            mode: CacheMode::Default,
            manager: MokaManager::default(),
            options: HttpCacheOptions::default(),
        }))
        .build();

    tokio::fs::create_dir_all(&config.key.parent().context("should have parent")?)
        .await
        .context("failed to create key directory")?;

    // Check if crypto keys exist. If not, create new ones.
    let (skey, rkey) = if let Ok(f) = std::fs::File::open(&config.key) {
        let keys: KeyData = serde_ipld_dagcbor::from_reader(std::io::BufReader::new(f))
            .context("failed to deserialize crypto keys")?;

        let skey = Secp256k1Keypair::import(&keys.skey).context("failed to import signing key")?;
        let rkey = Secp256k1Keypair::import(&keys.rkey).context("failed to import rotation key")?;

        (SigningKey(Arc::new(skey)), RotationKey(Arc::new(rkey)))
    } else {
        info!("signing keys not found, generating new ones");

        let skey = Secp256k1Keypair::create(&mut rand::thread_rng());
        let rkey = Secp256k1Keypair::create(&mut rand::thread_rng());

        let keys = KeyData {
            skey: skey.export(),
            rkey: rkey.export(),
        };

        let mut f = std::fs::File::create(&config.key).context("failed to create key file")?;
        serde_ipld_dagcbor::to_writer(&mut f, &keys).context("failed to serialize crypto keys")?;

        (SigningKey(Arc::new(skey)), RotationKey(Arc::new(rkey)))
    };

    tokio::fs::create_dir_all(&config.repo.path).await?;
    tokio::fs::create_dir_all(&config.plc.path).await?;
    tokio::fs::create_dir_all(&config.blob.path).await?;

    let cred = azure_identity::DefaultAzureCredential::new()
        .context("failed to create Azure credential")?;

    // Create a database connection manager and pool for the main database.
    let pool =
        establish_pool(&config.db).context("failed to establish database connection pool")?;
    // Create a dictionary of database connection pools for each actor.
    let mut actor_pools = std::collections::HashMap::new();
    // let mut actor_blob_pools = std::collections::HashMap::new();
    // We'll determine actors by looking in the data/repo dir for .db files.
    let mut actor_dbs = tokio::fs::read_dir(&config.repo.path)
        .await
        .context("failed to read repo directory")?;
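    // Expected on-disk layout (illustrative, assuming the repo/blob paths are
    // configured as `data/repo` and `data/blob`):
    //   data/repo/<actor>.db - per-actor repository database
    //   data/blob/<actor>.db - per-actor blob database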
    while let Some(entry) = actor_dbs
        .next_entry()
        .await
        .context("failed to read repo dir")?
    {
        let path = entry.path();
        if path.extension().and_then(|s| s.to_str()) == Some("db") {
            let did_path = path
                .file_stem()
                .and_then(|s| s.to_str())
                .context("failed to get actor DID")?;
            let did = Did::from_str(&format!("did:plc:{}", did_path))
                .expect("should be able to parse actor DID");

            // Create a new database connection manager and pool for the actor.
            // The path for the SQLite connection needs to look like "sqlite://data/repo/<actor>.db"
            let path_repo = format!("sqlite://{}", path.display());
            let actor_repo_pool =
                establish_pool(&path_repo).context("failed to create database connection pool")?;
            // Create a new database connection manager and pool for the actor blobs.
            // The path for the SQLite connection needs to look like "sqlite://data/blob/<actor>.db"
            let path_blob = path_repo.replace("repo", "blob");
            let actor_blob_pool =
                establish_pool(&path_blob).context("failed to create database connection pool")?;
            drop(actor_pools.insert(
                did.to_string(),
                ActorPools {
                    repo: actor_repo_pool,
                    blob: actor_blob_pool,
                },
            ));
        }
    }
    // Apply pending migrations
    // let conn = pool.get().await?;
    // conn.run_pending_migrations(MIGRATIONS)
    //     .expect("should be able to run migrations");

    let (_fh, fhp) = firehose::spawn(client.clone(), config.clone());

    let addr = config
        .listen_address
        .unwrap_or(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000));

    let app = Router::new()
        .route("/", get(index))
        .merge(oauth::routes())
        .nest(
            "/xrpc",
            endpoints::routes()
                .merge(actor_endpoints::routes())
                .fallback(service_proxy),
        )
        // .layer(RateLimitLayer::new(30, Duration::from_secs(30)))
        .layer(CorsLayer::permissive())
        .layer(TraceLayer::new_for_http())
        .with_state(AppState {
            cred,
            config: config.clone(),
            db: pool.clone(),
            db_actors: actor_pools.clone(),
            client: client.clone(),
            simple_client,
            firehose: fhp,
            signing_key: skey,
            rotation_key: rkey,
        });
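
    // As an illustration, an XRPC request to this server looks something like
    //   GET http://127.0.0.1:8000/xrpc/com.atproto.server.describeServer
    // (the method name is a standard atproto lexicon and is only served if
    // `endpoints::routes()` registers it). Anything not handled locally falls
    // through to `service_proxy`.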

    info!("listening on {addr}");
    info!("connect to: http://127.0.0.1:{}", addr.port());

    // Determine whether or not this was the first startup (i.e. no accounts exist and no invite codes were created).
    // If so, create an invite code and share it via the console.
    let conn = pool.get().await.context("failed to get db connection")?;

    #[derive(QueryableByName)]
    struct TotalCount {
        #[diesel(sql_type = diesel::sql_types::Integer)]
        total_count: i32,
    }

    let result = conn.interact(move |conn| {
        diesel::sql_query(
            "SELECT (SELECT COUNT(*) FROM accounts) + (SELECT COUNT(*) FROM invites) AS total_count",
        )
        .get_result::<TotalCount>(conn)
    })
    .await
    .expect("should be able to query database")?;

    let c = result.total_count;

    #[expect(clippy::print_stdout)]
    if c == 0 {
        let uuid = Uuid::new_v4().to_string();

        let uuid_clone = uuid.clone();
        _ = conn
            .interact(move |conn| {
                diesel::sql_query(
                    "INSERT INTO invites (id, did, count, created_at) VALUES (?, NULL, 1, datetime('now'))",
                )
                .bind::<diesel::sql_types::Text, _>(uuid_clone)
                .execute(conn)
                .context("failed to create new invite code")
                .expect("should be able to create invite code")
            })
            .await
            .expect("should be able to create invite code");

        // N.B: This is a sensitive message, so we're bypassing `tracing` here and
        // logging it directly to console.
        println!("=====================================");
        println!("            FIRST STARTUP            ");
        println!("=====================================");
        println!("Use this code to create an account:");
        println!("{uuid}");
        println!("=====================================");
    }

    let listener = TcpListener::bind(&addr)
        .await
        .context("failed to bind address")?;

    // Serve the app, and request crawling from upstream relays.
    let serve = tokio::spawn(async move {
        axum::serve(listener, app.into_make_service())
            .await
            .context("failed to serve app")
    });

    // Now that the app is live, request a crawl from upstream relays.
    firehose::reconnect_relays(&client, &config).await;

    serve
        .await
        .map_err(Into::into)
        .and_then(|r| r)
        .context("failed to serve app")
}

/// Creates an app router with the provided `AppState`.
pub fn create_app(state: AppState) -> Router {
    Router::new()
        .route("/", get(index))
        .merge(oauth::routes())
        .nest(
            "/xrpc",
            endpoints::routes()
                .merge(actor_endpoints::routes())
                .fallback(service_proxy),
        )
        .layer(CorsLayer::permissive())
        .layer(TraceLayer::new_for_http())
        .with_state(state)
}
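
// Example invocation (hypothetical; assumes a binary entry point that calls `run()`):
//
//   bluepds --config default.toml -v
//
// `--config` and the `-v`/`-q` verbosity flags map onto the `Args` struct above.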