// Alternative ATProto PDS implementation.
1//! PDS implementation. 2mod account_manager; 3mod actor_store; 4mod auth; 5mod config; 6mod db; 7mod did; 8mod endpoints; 9mod error; 10mod firehose; 11mod metrics; 12mod mmap; 13mod oauth; 14mod plc; 15#[cfg(test)] 16mod tests; 17 18/// HACK: store private user preferences in the PDS. 19/// 20/// We shouldn't have to know about any bsky endpoints to store private user data. 21/// This will _very likely_ be changed in the future. 22mod actor_endpoints; 23 24use anyhow::{Context as _, anyhow}; 25use atrium_api::types::string::Did; 26use atrium_crypto::keypair::{Export as _, Secp256k1Keypair}; 27use auth::AuthenticatedUser; 28use axum::{ 29 Router, 30 body::Body, 31 extract::{FromRef, Request, State}, 32 http::{self, HeaderMap, Response, StatusCode, Uri}, 33 response::IntoResponse, 34 routing::get, 35}; 36use azure_core::credentials::TokenCredential; 37use clap::Parser; 38use clap_verbosity_flag::{InfoLevel, Verbosity, log::LevelFilter}; 39use config::AppConfig; 40use diesel::prelude::*; 41use diesel::r2d2::{self, ConnectionManager}; 42use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations}; 43#[expect(clippy::pub_use, clippy::useless_attribute)] 44pub use error::Error; 45use figment::{Figment, providers::Format as _}; 46use firehose::FirehoseProducer; 47use http_cache_reqwest::{CacheMode, HttpCacheOptions, MokaManager}; 48use rand::Rng as _; 49use serde::{Deserialize, Serialize}; 50use std::{ 51 net::{IpAddr, Ipv4Addr, SocketAddr}, 52 path::PathBuf, 53 str::FromStr as _, 54 sync::Arc, 55}; 56use tokio::net::TcpListener; 57use tower_http::{cors::CorsLayer, trace::TraceLayer}; 58use tracing::{info, warn}; 59use uuid::Uuid; 60 61/// The application user agent. Concatenates the package name and version. e.g. `bluepds/0.0.0`. 
62pub const APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),); 63 64/// Embedded migrations 65pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations"); 66 67/// The application-wide result type. 68pub type Result<T> = std::result::Result<T, Error>; 69/// The reqwest client type with middleware. 70pub type Client = reqwest_middleware::ClientWithMiddleware; 71/// The database connection pool. 72pub type Db = r2d2::Pool<ConnectionManager<SqliteConnection>>; 73/// The Azure credential type. 74pub type Cred = Arc<dyn TokenCredential>; 75 76#[expect( 77 clippy::arbitrary_source_item_ordering, 78 reason = "serialized data might be structured" 79)] 80#[derive(Serialize, Deserialize, Debug, Clone)] 81/// The key data structure. 82struct KeyData { 83 /// Primary signing key for all repo operations. 84 skey: Vec<u8>, 85 /// Primary signing (rotation) key for all PLC operations. 86 rkey: Vec<u8>, 87} 88 89// FIXME: We should use P256Keypair instead. SecP256K1 is primarily used for cryptocurrencies, 90// and the implementations of this algorithm are much more limited as compared to P256. 91// 92// Reference: https://soatok.blog/2022/05/19/guidance-for-choosing-an-elliptic-curve-signature-algorithm-in-2022/ 93#[derive(Clone)] 94/// The signing key for PLC/DID operations. 95pub struct SigningKey(Arc<Secp256k1Keypair>); 96#[derive(Clone)] 97/// The rotation key for PLC operations. 98pub struct RotationKey(Arc<Secp256k1Keypair>); 99 100impl std::ops::Deref for SigningKey { 101 type Target = Secp256k1Keypair; 102 103 fn deref(&self) -> &Self::Target { 104 &self.0 105 } 106} 107 108impl SigningKey { 109 /// Import from a private key. 
110 pub fn import(key: &[u8]) -> Result<Self> { 111 let key = Secp256k1Keypair::import(key).context("failed to import signing key")?; 112 Ok(Self(Arc::new(key))) 113 } 114} 115 116impl std::ops::Deref for RotationKey { 117 type Target = Secp256k1Keypair; 118 119 fn deref(&self) -> &Self::Target { 120 &self.0 121 } 122} 123 124#[derive(Parser, Debug, Clone)] 125/// Command line arguments. 126struct Args { 127 /// Path to the configuration file 128 #[arg(short, long, default_value = "default.toml")] 129 config: PathBuf, 130 /// The verbosity level. 131 #[command(flatten)] 132 verbosity: Verbosity<InfoLevel>, 133} 134 135#[expect(clippy::arbitrary_source_item_ordering, reason = "arbitrary")] 136#[derive(Clone, FromRef)] 137struct AppState { 138 /// The application configuration. 139 config: AppConfig, 140 /// The Azure credential. 141 cred: Cred, 142 /// The database connection pool. 143 db: Db, 144 145 /// The HTTP client with middleware. 146 client: Client, 147 /// The simple HTTP client. 148 simple_client: reqwest::Client, 149 /// The firehose producer. 150 firehose: FirehoseProducer, 151 152 /// The signing key. 153 signing_key: SigningKey, 154 /// The rotation key. 155 rotation_key: RotationKey, 156} 157 158/// The index (/) route. 159async fn index() -> impl IntoResponse { 160 r" 161 __ __ 162 /\ \__ /\ \__ 163 __ \ \ ,_\ _____ _ __ ___\ \ ,_\ ___ 164 /'__'\ \ \ \/ /\ '__'\/\''__\/ __'\ \ \/ / __'\ 165 /\ \L\.\_\ \ \_\ \ \L\ \ \ \//\ \L\ \ \ \_/\ \L\ \ 166 \ \__/.\_\\ \__\\ \ ,__/\ \_\\ \____/\ \__\ \____/ 167 \/__/\/_/ \/__/ \ \ \/ \/_/ \/___/ \/__/\/___/ 168 \ \_\ 169 \/_/ 170 171 172This is an AT Protocol Personal Data Server (aka, an atproto PDS) 173 174Most API routes are under /xrpc/ 175 176 Code: https://github.com/DrChat/bluepds 177 Protocol: https://atproto.com 178 " 179} 180 181/// Service proxy. 
182/// 183/// Reference: <https://atproto.com/specs/xrpc#service-proxying> 184async fn service_proxy( 185 uri: Uri, 186 user: AuthenticatedUser, 187 State(skey): State<SigningKey>, 188 State(client): State<reqwest::Client>, 189 headers: HeaderMap, 190 request: Request<Body>, 191) -> Result<Response<Body>> { 192 let url_path = uri.path_and_query().context("invalid service proxy url")?; 193 let lxm = url_path 194 .path() 195 .strip_prefix("/") 196 .with_context(|| format!("invalid service proxy url prefix: {}", url_path.path()))?; 197 198 let user_did = user.did(); 199 let (did, id) = match headers.get("atproto-proxy") { 200 Some(val) => { 201 let val = 202 std::str::from_utf8(val.as_bytes()).context("proxy header not valid utf-8")?; 203 204 let (did, id) = val.split_once('#').context("invalid proxy header")?; 205 206 let did = 207 Did::from_str(did).map_err(|e| anyhow!("atproto proxy not a valid DID: {e}"))?; 208 209 (did, format!("#{id}")) 210 } 211 // HACK: Assume the bluesky appview by default. 212 None => ( 213 Did::new("did:web:api.bsky.app".to_owned()) 214 .expect("service proxy should be a valid DID"), 215 "#bsky_appview".to_owned(), 216 ), 217 }; 218 219 let did_doc = did::resolve(&Client::new(client.clone(), []), did.clone()) 220 .await 221 .with_context(|| format!("failed to resolve did document {}", did.as_str()))?; 222 223 let Some(service) = did_doc.service.iter().find(|s| s.id == id) else { 224 return Err(Error::with_status( 225 StatusCode::BAD_REQUEST, 226 anyhow!("could not find resolve service #{id}"), 227 )); 228 }; 229 230 let target_url: url::Url = service 231 .service_endpoint 232 .join(&format!("/xrpc{url_path}")) 233 .context("failed to construct target url")?; 234 235 let exp = (chrono::Utc::now().checked_add_signed(chrono::Duration::minutes(1))) 236 .context("should be valid expiration datetime")? 
237 .timestamp(); 238 let jti = rand::thread_rng() 239 .sample_iter(rand::distributions::Alphanumeric) 240 .take(10) 241 .map(char::from) 242 .collect::<String>(); 243 244 // Mint a bearer token by signing a JSON web token. 245 // https://github.com/DavidBuchanan314/millipds/blob/5c7529a739d394e223c0347764f1cf4e8fd69f94/src/millipds/appview_proxy.py#L47-L59 246 let token = auth::sign( 247 &skey, 248 "JWT", 249 &serde_json::json!({ 250 "iss": user_did.as_str(), 251 "aud": did.as_str(), 252 "lxm": lxm, 253 "exp": exp, 254 "jti": jti, 255 }), 256 ) 257 .context("failed to sign jwt")?; 258 259 let mut h = HeaderMap::new(); 260 if let Some(hdr) = request.headers().get("atproto-accept-labelers") { 261 drop(h.insert("atproto-accept-labelers", hdr.clone())); 262 } 263 if let Some(hdr) = request.headers().get(http::header::CONTENT_TYPE) { 264 drop(h.insert(http::header::CONTENT_TYPE, hdr.clone())); 265 } 266 267 let r = client 268 .request(request.method().clone(), target_url) 269 .headers(h) 270 .header(http::header::AUTHORIZATION, format!("Bearer {token}")) 271 .body(reqwest::Body::wrap_stream( 272 request.into_body().into_data_stream(), 273 )) 274 .send() 275 .await 276 .context("failed to send request")?; 277 278 let mut resp = Response::builder().status(r.status()); 279 if let Some(hdrs) = resp.headers_mut() { 280 *hdrs = r.headers().clone(); 281 } 282 283 let resp = resp 284 .body(Body::from_stream(r.bytes_stream())) 285 .context("failed to construct response")?; 286 287 Ok(resp) 288} 289 290/// The main application entry point. 291#[expect( 292 clippy::cognitive_complexity, 293 clippy::too_many_lines, 294 reason = "main function has high complexity" 295)] 296async fn run() -> anyhow::Result<()> { 297 let args = Args::parse(); 298 299 // Set up trace logging to console and account for the user-provided verbosity flag. 
300 if args.verbosity.log_level_filter() != LevelFilter::Off { 301 let lvl = match args.verbosity.log_level_filter() { 302 LevelFilter::Error => tracing::Level::ERROR, 303 LevelFilter::Warn => tracing::Level::WARN, 304 LevelFilter::Info | LevelFilter::Off => tracing::Level::INFO, 305 LevelFilter::Debug => tracing::Level::DEBUG, 306 LevelFilter::Trace => tracing::Level::TRACE, 307 }; 308 tracing_subscriber::fmt().with_max_level(lvl).init(); 309 } 310 311 if !args.config.exists() { 312 // Throw up a warning if the config file does not exist. 313 // 314 // This is not fatal because users can specify all configuration settings via 315 // the environment, but the most likely scenario here is that a user accidentally 316 // omitted the config file for some reason (e.g. forgot to mount it into Docker). 317 warn!( 318 "configuration file {} does not exist", 319 args.config.display() 320 ); 321 } 322 323 // Read and parse the user-provided configuration. 324 let config: AppConfig = Figment::new() 325 .admerge(figment::providers::Toml::file(args.config)) 326 .admerge(figment::providers::Env::prefixed("BLUEPDS_")) 327 .extract() 328 .context("failed to load configuration")?; 329 330 if config.test { 331 warn!("BluePDS starting up in TEST mode."); 332 warn!("This means the application will not federate with the rest of the network."); 333 warn!( 334 "If you want to turn this off, either set `test` to false in the config or define `BLUEPDS_TEST = false`" 335 ); 336 } 337 338 // Initialize metrics reporting. 339 metrics::setup(config.metrics.as_ref()).context("failed to set up metrics exporter")?; 340 341 // Create a reqwest client that will be used for all outbound requests. 
342 let simple_client = reqwest::Client::builder() 343 .user_agent(APP_USER_AGENT) 344 .build() 345 .context("failed to build requester client")?; 346 let client = reqwest_middleware::ClientBuilder::new(simple_client.clone()) 347 .with(http_cache_reqwest::Cache(http_cache_reqwest::HttpCache { 348 mode: CacheMode::Default, 349 manager: MokaManager::default(), 350 options: HttpCacheOptions::default(), 351 })) 352 .build(); 353 354 tokio::fs::create_dir_all(&config.key.parent().context("should have parent")?) 355 .await 356 .context("failed to create key directory")?; 357 358 // Check if crypto keys exist. If not, create new ones. 359 let (skey, rkey) = if let Ok(f) = std::fs::File::open(&config.key) { 360 let keys: KeyData = serde_ipld_dagcbor::from_reader(std::io::BufReader::new(f)) 361 .context("failed to deserialize crypto keys")?; 362 363 let skey = Secp256k1Keypair::import(&keys.skey).context("failed to import signing key")?; 364 let rkey = Secp256k1Keypair::import(&keys.rkey).context("failed to import rotation key")?; 365 366 (SigningKey(Arc::new(skey)), RotationKey(Arc::new(rkey))) 367 } else { 368 info!("signing keys not found, generating new ones"); 369 370 let skey = Secp256k1Keypair::create(&mut rand::thread_rng()); 371 let rkey = Secp256k1Keypair::create(&mut rand::thread_rng()); 372 373 let keys = KeyData { 374 skey: skey.export(), 375 rkey: rkey.export(), 376 }; 377 378 let mut f = std::fs::File::create(&config.key).context("failed to create key file")?; 379 serde_ipld_dagcbor::to_writer(&mut f, &keys).context("failed to serialize crypto keys")?; 380 381 (SigningKey(Arc::new(skey)), RotationKey(Arc::new(rkey))) 382 }; 383 384 tokio::fs::create_dir_all(&config.repo.path).await?; 385 tokio::fs::create_dir_all(&config.plc.path).await?; 386 tokio::fs::create_dir_all(&config.blob.path).await?; 387 388 let cred = azure_identity::DefaultAzureCredential::new() 389 .context("failed to create Azure credential")?; 390 391 // Create a database connection manager 
and pool 392 let manager = ConnectionManager::<SqliteConnection>::new(&config.db); 393 let db = r2d2::Pool::builder() 394 .build(manager) 395 .context("failed to create database connection pool")?; 396 397 // Apply pending migrations 398 let conn = &mut db 399 .get() 400 .context("failed to get database connection for migrations")?; 401 conn.run_pending_migrations(MIGRATIONS) 402 .expect("should be able to run migrations"); 403 404 let (_fh, fhp) = firehose::spawn(client.clone(), config.clone()); 405 406 let addr = config 407 .listen_address 408 .unwrap_or(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000)); 409 410 let app = Router::new() 411 .route("/", get(index)) 412 .merge(oauth::routes()) 413 .nest( 414 "/xrpc", 415 endpoints::routes() 416 .merge(actor_endpoints::routes()) 417 .fallback(service_proxy), 418 ) 419 // .layer(RateLimitLayer::new(30, Duration::from_secs(30))) 420 .layer(CorsLayer::permissive()) 421 .layer(TraceLayer::new_for_http()) 422 .with_state(AppState { 423 cred, 424 config: config.clone(), 425 db: db.clone(), 426 client: client.clone(), 427 simple_client, 428 firehose: fhp, 429 signing_key: skey, 430 rotation_key: rkey, 431 }); 432 433 info!("listening on {addr}"); 434 info!("connect to: http://127.0.0.1:{}", addr.port()); 435 436 // Determine whether or not this was the first startup (i.e. no accounts exist and no invite codes were created). 437 // If so, create an invite code and share it via the console. 
438 let conn = &mut db.get().context("failed to get database connection")?; 439 440 #[derive(QueryableByName)] 441 struct TotalCount { 442 #[diesel(sql_type = diesel::sql_types::Integer)] 443 total_count: i32, 444 } 445 446 let result = diesel::sql_query( 447 "SELECT (SELECT COUNT(*) FROM accounts) + (SELECT COUNT(*) FROM invites) AS total_count", 448 ) 449 .get_result::<TotalCount>(conn) 450 .context("failed to query database")?; 451 452 let c = result.total_count; 453 454 #[expect(clippy::print_stdout)] 455 if c == 0 { 456 let uuid = Uuid::new_v4().to_string(); 457 458 diesel::sql_query( 459 "INSERT INTO invites (id, did, count, created_at) VALUES (?, NULL, 1, datetime('now'))", 460 ) 461 .bind::<diesel::sql_types::Text, _>(uuid.clone()) 462 .execute(conn) 463 .context("failed to create new invite code")?; 464 465 // N.B: This is a sensitive message, so we're bypassing `tracing` here and 466 // logging it directly to console. 467 println!("====================================="); 468 println!(" FIRST STARTUP "); 469 println!("====================================="); 470 println!("Use this code to create an account:"); 471 println!("{uuid}"); 472 println!("====================================="); 473 } 474 475 let listener = TcpListener::bind(&addr) 476 .await 477 .context("failed to bind address")?; 478 479 // Serve the app, and request crawling from upstream relays. 480 let serve = tokio::spawn(async move { 481 axum::serve(listener, app.into_make_service()) 482 .await 483 .context("failed to serve app") 484 }); 485 486 // Now that the app is live, request a crawl from upstream relays. 487 firehose::reconnect_relays(&client, &config).await; 488 489 serve 490 .await 491 .map_err(Into::into) 492 .and_then(|r| r) 493 .context("failed to serve app") 494} 495 496#[tokio::main(flavor = "multi_thread")] 497async fn main() -> anyhow::Result<()> { 498 // Dispatch out to a separate function without a derive macro to help rust-analyzer along. 499 run().await 500}