Alternative ATProto PDS implementation
at oauth 16 kB view raw
1use super::account_manager::AccountManager; 2use super::config::AppConfig; 3use super::db::establish_pool; 4pub use super::error::Error; 5use super::service_proxy::service_proxy; 6use anyhow::Context as _; 7use atrium_api::types::string::Did; 8use atrium_crypto::keypair::{Export as _, Secp256k1Keypair}; 9use axum::{Router, extract::FromRef, routing::get}; 10use clap::Parser; 11use clap_verbosity_flag::{InfoLevel, Verbosity, log::LevelFilter}; 12use deadpool_diesel::sqlite::Pool; 13use diesel::prelude::*; 14use diesel_migrations::{EmbeddedMigrations, embed_migrations}; 15use figment::{Figment, providers::Format as _}; 16use http_cache_reqwest::{CacheMode, HttpCacheOptions, MokaManager}; 17use rsky_common::env::env_list; 18use rsky_identity::IdResolver; 19use rsky_identity::types::{DidCache, IdentityResolverOpts}; 20use rsky_pds::{crawlers::Crawlers, sequencer::Sequencer}; 21use serde::{Deserialize, Serialize}; 22use std::env; 23use std::{ 24 net::{IpAddr, Ipv4Addr, SocketAddr}, 25 path::PathBuf, 26 str::FromStr as _, 27 sync::Arc, 28}; 29use tokio::{net::TcpListener, sync::RwLock}; 30use tower_http::{cors::CorsLayer, trace::TraceLayer}; 31use tracing::{info, warn}; 32use uuid::Uuid; 33 34/// The application user agent. Concatenates the package name and version. e.g. `bluepds/0.0.0`. 35pub const APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),); 36 37/// Embedded migrations 38pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations"); 39pub const MIGRATIONS_ACTOR: EmbeddedMigrations = embed_migrations!("./migrations_actor"); 40 41/// The application-wide result type. 42pub type Result<T> = std::result::Result<T, Error>; 43/// The reqwest client type with middleware. 44pub type Client = reqwest_middleware::ClientWithMiddleware; 45 46#[expect( 47 clippy::arbitrary_source_item_ordering, 48 reason = "serialized data might be structured" 49)] 50#[derive(Serialize, Deserialize, Debug, Clone)] 51/// The key data structure. 52struct KeyData { 53 /// Primary signing key for all repo operations. 54 skey: Vec<u8>, 55 /// Primary signing (rotation) key for all PLC operations. 56 rkey: Vec<u8>, 57} 58 59// FIXME: We should use P256Keypair instead. SecP256K1 is primarily used for cryptocurrencies, 60// and the implementations of this algorithm are much more limited as compared to P256. 61// 62// Reference: https://soatok.blog/2022/05/19/guidance-for-choosing-an-elliptic-curve-signature-algorithm-in-2022/ 63#[derive(Clone)] 64/// The signing key for PLC/DID operations. 65pub struct SigningKey(Arc<Secp256k1Keypair>); 66#[derive(Clone)] 67/// The rotation key for PLC operations. 68pub struct RotationKey(Arc<Secp256k1Keypair>); 69 70impl std::ops::Deref for SigningKey { 71 type Target = Secp256k1Keypair; 72 73 fn deref(&self) -> &Self::Target { 74 &self.0 75 } 76} 77 78impl SigningKey { 79 /// Import from a private key. 80 pub fn import(key: &[u8]) -> Result<Self> { 81 let key = Secp256k1Keypair::import(key).context("failed to import signing key")?; 82 Ok(Self(Arc::new(key))) 83 } 84} 85 86impl std::ops::Deref for RotationKey { 87 type Target = Secp256k1Keypair; 88 89 fn deref(&self) -> &Self::Target { 90 &self.0 91 } 92} 93 94#[derive(Parser, Debug, Clone)] 95/// Command line arguments. 96pub struct Args { 97 /// Path to the configuration file 98 #[arg(short, long, default_value = "default.toml")] 99 pub config: PathBuf, 100 /// The verbosity level. 101 #[command(flatten)] 102 pub verbosity: Verbosity<InfoLevel>, 103} 104 105/// The actor pools for the database connections. 106pub struct ActorStorage { 107 /// The database connection pool for the actor's repository. 108 pub repo: Pool, 109 /// The file storage path for the actor's blobs. 110 pub blob: PathBuf, 111} 112 113impl Clone for ActorStorage { 114 fn clone(&self) -> Self { 115 Self { 116 repo: self.repo.clone(), 117 blob: self.blob.clone(), 118 } 119 } 120} 121 122#[expect(clippy::arbitrary_source_item_ordering, reason = "arbitrary")] 123#[derive(Clone, FromRef)] 124/// The application state, shared across all routes. 125pub struct AppState { 126 /// The application configuration. 127 pub(crate) config: AppConfig, 128 /// The main database connection pool. Used for common PDS data, like invite codes. 129 pub db: Pool, 130 /// Actor-specific database connection pools. Hashed by DID. 131 pub db_actors: std::collections::HashMap<String, ActorStorage>, 132 133 /// The HTTP client with middleware. 134 pub client: Client, 135 /// The simple HTTP client. 136 pub simple_client: reqwest::Client, 137 /// The firehose producer. 138 pub sequencer: Arc<RwLock<Sequencer>>, 139 /// The account manager. 140 pub account_manager: Arc<RwLock<AccountManager>>, 141 /// The ID resolver. 142 pub id_resolver: Arc<RwLock<IdResolver>>, 143 144 /// The signing key. 145 pub signing_key: SigningKey, 146 /// The rotation key. 147 pub rotation_key: RotationKey, 148} 149 150/// The main application entry point. 151#[expect( 152 clippy::cognitive_complexity, 153 clippy::too_many_lines, 154 unused_qualifications, 155 reason = "main function has high complexity" 156)] 157pub async fn run() -> anyhow::Result<()> { 158 let args = Args::parse(); 159 160 // Set up trace logging to console and account for the user-provided verbosity flag. 161 if args.verbosity.log_level_filter() != LevelFilter::Off { 162 let lvl = match args.verbosity.log_level_filter() { 163 LevelFilter::Error => tracing::Level::ERROR, 164 LevelFilter::Warn => tracing::Level::WARN, 165 LevelFilter::Info | LevelFilter::Off => tracing::Level::INFO, 166 LevelFilter::Debug => tracing::Level::DEBUG, 167 LevelFilter::Trace => tracing::Level::TRACE, 168 }; 169 tracing_subscriber::fmt().with_max_level(lvl).init(); 170 } 171 172 if !args.config.exists() { 173 // Throw up a warning if the config file does not exist. 174 // 175 // This is not fatal because users can specify all configuration settings via 176 // the environment, but the most likely scenario here is that a user accidentally 177 // omitted the config file for some reason (e.g. forgot to mount it into Docker). 178 warn!( 179 "configuration file {} does not exist", 180 args.config.display() 181 ); 182 } 183 184 // Read and parse the user-provided configuration. 185 let config: AppConfig = Figment::new() 186 .admerge(figment::providers::Toml::file(args.config)) 187 .admerge(figment::providers::Env::prefixed("BLUEPDS_")) 188 .extract() 189 .context("failed to load configuration")?; 190 191 if config.test { 192 warn!("BluePDS starting up in TEST mode."); 193 warn!("This means the application will not federate with the rest of the network."); 194 warn!( 195 "If you want to turn this off, either set `test` to false in the config or define `BLUEPDS_TEST = false`" 196 ); 197 } 198 199 // Initialize metrics reporting. 200 super::metrics::setup(config.metrics.as_ref()).context("failed to set up metrics exporter")?; 201 202 // Create a reqwest client that will be used for all outbound requests. 203 let simple_client = reqwest::Client::builder() 204 .user_agent(APP_USER_AGENT) 205 .build() 206 .context("failed to build requester client")?; 207 let client = reqwest_middleware::ClientBuilder::new(simple_client.clone()) 208 .with(http_cache_reqwest::Cache(http_cache_reqwest::HttpCache { 209 mode: CacheMode::Default, 210 manager: MokaManager::default(), 211 options: HttpCacheOptions::default(), 212 })) 213 .build(); 214 215 tokio::fs::create_dir_all(&config.key.parent().context("should have parent")?) 216 .await 217 .context("failed to create key directory")?; 218 219 // Check if crypto keys exist. If not, create new ones. 220 let (skey, rkey) = if let Ok(f) = std::fs::File::open(&config.key) { 221 let keys: KeyData = serde_ipld_dagcbor::from_reader(std::io::BufReader::new(f)) 222 .context("failed to deserialize crypto keys")?; 223 224 let skey = Secp256k1Keypair::import(&keys.skey).context("failed to import signing key")?; 225 let rkey = Secp256k1Keypair::import(&keys.rkey).context("failed to import rotation key")?; 226 227 (SigningKey(Arc::new(skey)), RotationKey(Arc::new(rkey))) 228 } else { 229 info!("signing keys not found, generating new ones"); 230 231 let skey = Secp256k1Keypair::create(&mut rand::thread_rng()); 232 let rkey = Secp256k1Keypair::create(&mut rand::thread_rng()); 233 234 let keys = KeyData { 235 skey: skey.export(), 236 rkey: rkey.export(), 237 }; 238 239 let mut f = std::fs::File::create(&config.key).context("failed to create key file")?; 240 serde_ipld_dagcbor::to_writer(&mut f, &keys).context("failed to serialize crypto keys")?; 241 242 (SigningKey(Arc::new(skey)), RotationKey(Arc::new(rkey))) 243 }; 244 245 tokio::fs::create_dir_all(&config.repo.path).await?; 246 tokio::fs::create_dir_all(&config.plc.path).await?; 247 tokio::fs::create_dir_all(&config.blob.path).await?; 248 249 // Create a database connection manager and pool for the main database. 250 let pool = 251 establish_pool(&config.db).context("failed to establish database connection pool")?; 252 253 // Create a dictionary of database connection pools for each actor. 254 let mut actor_pools = std::collections::HashMap::new(); 255 // We'll determine actors by looking in the data/repo dir for .db files. 256 let mut actor_dbs = tokio::fs::read_dir(&config.repo.path) 257 .await 258 .context("failed to read repo directory")?; 259 while let Some(entry) = actor_dbs 260 .next_entry() 261 .await 262 .context("failed to read repo dir")? 263 { 264 let path = entry.path(); 265 if path.extension().and_then(|s| s.to_str()) == Some("db") { 266 let actor_repo_pool = establish_pool(&format!("sqlite://{}", path.display())) 267 .context("failed to create database connection pool")?; 268 269 let did = Did::from_str(&format!( 270 "did:plc:{}", 271 path.file_stem() 272 .and_then(|s| s.to_str()) 273 .context("failed to get actor DID")? 274 )) 275 .expect("should be able to parse actor DID") 276 .to_string(); 277 let blob_path = config.blob.path.to_path_buf(); 278 let actor_storage = ActorStorage { 279 repo: actor_repo_pool, 280 blob: blob_path.clone(), 281 }; 282 drop(actor_pools.insert(did, actor_storage)); 283 } 284 } 285 // Apply pending migrations 286 // let conn = pool.get().await?; 287 // conn.run_pending_migrations(MIGRATIONS) 288 // .expect("should be able to run migrations"); 289 290 let hostname = config.host_name.clone(); 291 let crawlers: Vec<String> = config 292 .firehose 293 .relays 294 .iter() 295 .map(|s| s.to_string()) 296 .collect(); 297 let sequencer = Arc::new(RwLock::new(Sequencer::new( 298 Crawlers::new(hostname, crawlers.clone()), 299 None, 300 ))); 301 let account_manager = Arc::new(RwLock::new(AccountManager::new(pool.clone()))); 302 let plc_url = if cfg!(debug_assertions) { 303 "http://localhost:8000".to_owned() // dummy for debug 304 } else { 305 env::var("PDS_DID_PLC_URL").unwrap_or("https://plc.directory".to_owned()) // TODO: toml config 306 }; 307 let id_resolver = Arc::new(RwLock::new(IdResolver::new(IdentityResolverOpts { 308 timeout: None, 309 plc_url: Some(plc_url), 310 did_cache: Some(DidCache::new(None, None)), 311 backup_nameservers: Some(env_list("PDS_HANDLE_BACKUP_NAMESERVERS")), 312 }))); 313 314 let addr = config 315 .listen_address 316 .unwrap_or(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000)); 317 318 let app = Router::new() 319 .route("/", get(super::index)) 320 .merge(super::oauth::routes()) 321 .nest( 322 "/xrpc", 323 super::apis::routes() 324 .merge(super::actor_endpoints::routes()) 325 .fallback(service_proxy), 326 ) 327 // .layer(RateLimitLayer::new(30, Duration::from_secs(30))) 328 .layer(CorsLayer::permissive()) 329 .layer(TraceLayer::new_for_http()) 330 .with_state(AppState { 331 config: config.clone(), 332 db: pool.clone(), 333 db_actors: actor_pools.clone(), 334 client: client.clone(), 335 simple_client, 336 sequencer: sequencer.clone(), 337 account_manager, 338 id_resolver, 339 signing_key: skey, 340 rotation_key: rkey, 341 }); 342 343 info!("listening on {addr}"); 344 info!("connect to: http://127.0.0.1:{}", addr.port()); 345 346 // Determine whether or not this was the first startup (i.e. no accounts exist and no invite codes were created). 347 // If so, create an invite code and share it via the console. 348 let conn = pool.get().await.context("failed to get db connection")?; 349 350 #[derive(QueryableByName)] 351 struct TotalCount { 352 #[diesel(sql_type = diesel::sql_types::Integer)] 353 total_count: i32, 354 } 355 356 let result = conn.interact(move |conn| { 357 diesel::sql_query( 358 "SELECT (SELECT COUNT(*) FROM account) + (SELECT COUNT(*) FROM invite_code) AS total_count", 359 ) 360 .get_result::<TotalCount>(conn) 361 }) 362 .await 363 .expect("should be able to query database")?; 364 365 let c = result.total_count; 366 367 #[expect(clippy::print_stdout)] 368 if c == 0 { 369 let uuid = Uuid::new_v4().to_string(); 370 371 use crate::models::pds as models; 372 use crate::schema::pds::invite_code::dsl as InviteCode; 373 let uuid_clone = uuid.clone(); 374 drop( 375 conn.interact(move |conn| { 376 diesel::insert_into(InviteCode::invite_code) 377 .values(models::InviteCode { 378 code: uuid_clone, 379 available_uses: 1, 380 disabled: 0, 381 for_account: "None".to_owned(), 382 created_by: "None".to_owned(), 383 created_at: "None".to_owned(), 384 }) 385 .execute(conn) 386 .context("failed to create new invite code") 387 }) 388 .await 389 .expect("should be able to create invite code"), 390 ); 391 392 // N.B: This is a sensitive message, so we're bypassing `tracing` here and 393 // logging it directly to console. 394 println!("====================================="); 395 println!(" FIRST STARTUP "); 396 println!("====================================="); 397 println!("Use this code to create an account:"); 398 println!("{uuid}"); 399 println!("====================================="); 400 } 401 402 let listener = TcpListener::bind(&addr) 403 .await 404 .context("failed to bind address")?; 405 406 // Serve the app, and request crawling from upstream relays. 407 let serve = tokio::spawn(async move { 408 axum::serve(listener, app.into_make_service()) 409 .await 410 .context("failed to serve app") 411 }); 412 413 // Now that the app is live, request a crawl from upstream relays. 414 if cfg!(debug_assertions) { 415 info!("debug mode: not requesting crawl"); 416 } else { 417 info!("requesting crawl from upstream relays"); 418 let mut background_sequencer = sequencer.write().await.clone(); 419 drop(tokio::spawn( 420 async move { background_sequencer.start().await }, 421 )); 422 } 423 424 serve 425 .await 426 .map_err(Into::into) 427 .and_then(|r| r) 428 .context("failed to serve app") 429}