//! Alternative ATProto PDS implementation
1use super::account_manager::AccountManager;
2use super::config::AppConfig;
3use super::db::establish_pool;
4pub use super::error::Error;
5use super::service_proxy::service_proxy;
6use anyhow::Context as _;
7use atrium_api::types::string::Did;
8use atrium_crypto::keypair::{Export as _, Secp256k1Keypair};
9use axum::{Router, extract::FromRef, routing::get};
10use clap::Parser;
11use clap_verbosity_flag::{InfoLevel, Verbosity, log::LevelFilter};
12use deadpool_diesel::sqlite::Pool;
13use diesel::prelude::*;
14use diesel_migrations::{EmbeddedMigrations, embed_migrations};
15use figment::{Figment, providers::Format as _};
16use http_cache_reqwest::{CacheMode, HttpCacheOptions, MokaManager};
17use rsky_common::env::env_list;
18use rsky_identity::IdResolver;
19use rsky_identity::types::{DidCache, IdentityResolverOpts};
20use rsky_pds::{crawlers::Crawlers, sequencer::Sequencer};
21use serde::{Deserialize, Serialize};
22use std::env;
23use std::{
24 net::{IpAddr, Ipv4Addr, SocketAddr},
25 path::PathBuf,
26 str::FromStr as _,
27 sync::Arc,
28};
29use tokio::{net::TcpListener, sync::RwLock};
30use tower_http::{cors::CorsLayer, trace::TraceLayer};
31use tracing::{info, warn};
32use uuid::Uuid;
33
/// The application user agent. Concatenates the package name and version. e.g. `bluepds/0.0.0`.
///
/// Used as the `User-Agent` header for all outbound HTTP clients built in [`run`].
pub const APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),);
36
/// Embedded migrations for the main PDS database (from `./migrations`).
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");
/// Embedded migrations for per-actor databases (from `./migrations_actor`).
pub const MIGRATIONS_ACTOR: EmbeddedMigrations = embed_migrations!("./migrations_actor");
40
/// The application-wide result type, carrying this crate's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
/// The reqwest client type with middleware (HTTP caching layered on in [`run`]).
pub type Client = reqwest_middleware::ClientWithMiddleware;
45
#[expect(
    clippy::arbitrary_source_item_ordering,
    reason = "serialized data might be structured"
)]
#[derive(Serialize, Deserialize, Debug, Clone)]
/// The key data structure.
///
/// Persisted to the key file as DAG-CBOR (see [`run`]); field order is part of
/// the on-disk format, so do not reorder fields.
struct KeyData {
    /// Primary signing key for all repo operations (exported private key bytes).
    skey: Vec<u8>,
    /// Primary signing (rotation) key for all PLC operations (exported private key bytes).
    rkey: Vec<u8>,
}
58
// FIXME: We should use P256Keypair instead. SecP256K1 is primarily used for cryptocurrencies,
// and the implementations of this algorithm are much more limited as compared to P256.
//
// Reference: https://soatok.blog/2022/05/19/guidance-for-choosing-an-elliptic-curve-signature-algorithm-in-2022/
#[derive(Clone)]
/// The signing key for PLC/DID operations.
///
/// Newtype over an `Arc`-shared keypair so clones are cheap refcount bumps.
pub struct SigningKey(Arc<Secp256k1Keypair>);
#[derive(Clone)]
/// The rotation key for PLC operations.
///
/// Newtype over an `Arc`-shared keypair so clones are cheap refcount bumps.
pub struct RotationKey(Arc<Secp256k1Keypair>);
69
70impl std::ops::Deref for SigningKey {
71 type Target = Secp256k1Keypair;
72
73 fn deref(&self) -> &Self::Target {
74 &self.0
75 }
76}
77
78impl SigningKey {
79 /// Import from a private key.
80 pub fn import(key: &[u8]) -> Result<Self> {
81 let key = Secp256k1Keypair::import(key).context("failed to import signing key")?;
82 Ok(Self(Arc::new(key)))
83 }
84}
85
86impl std::ops::Deref for RotationKey {
87 type Target = Secp256k1Keypair;
88
89 fn deref(&self) -> &Self::Target {
90 &self.0
91 }
92}
93
#[derive(Parser, Debug, Clone)]
/// Command line arguments.
pub struct Args {
    /// Path to the configuration file (`-c`/`--config`; defaults to `default.toml`).
    #[arg(short, long, default_value = "default.toml")]
    pub config: PathBuf,
    /// The verbosity level (clap-verbosity flags `-v`/`-q`; defaults to `info`).
    #[command(flatten)]
    pub verbosity: Verbosity<InfoLevel>,
}
104
105/// The actor pools for the database connections.
106pub struct ActorStorage {
107 /// The database connection pool for the actor's repository.
108 pub repo: Pool,
109 /// The file storage path for the actor's blobs.
110 pub blob: PathBuf,
111}
112
113impl Clone for ActorStorage {
114 fn clone(&self) -> Self {
115 Self {
116 repo: self.repo.clone(),
117 blob: self.blob.clone(),
118 }
119 }
120}
121
#[expect(clippy::arbitrary_source_item_ordering, reason = "arbitrary")]
#[derive(Clone, FromRef)]
/// The application state, shared across all routes.
///
/// `FromRef` lets axum extract each field as a sub-state in handlers.
pub struct AppState {
    /// The application configuration.
    pub(crate) config: AppConfig,
    /// The main database connection pool. Used for common PDS data, like invite codes.
    pub db: Pool,
    /// Actor-specific database connection pools. Hashed by DID.
    pub db_actors: std::collections::HashMap<String, ActorStorage>,

    /// The HTTP client with middleware (response caching; see [`run`]).
    pub client: Client,
    /// The simple HTTP client (no middleware).
    pub simple_client: reqwest::Client,
    /// The firehose producer.
    pub sequencer: Arc<RwLock<Sequencer>>,
    /// The account manager.
    pub account_manager: Arc<RwLock<AccountManager>>,
    /// The ID resolver.
    pub id_resolver: Arc<RwLock<IdResolver>>,

    /// The signing key.
    pub signing_key: SigningKey,
    /// The rotation key.
    pub rotation_key: RotationKey,
}
149
/// The main application entry point.
///
/// In order: parses CLI args, initializes logging and metrics, loads the
/// TOML + env configuration, loads or generates the signing/rotation keys,
/// opens the main and per-actor database pools, builds the axum router, and
/// serves it — printing a one-time invite code on first startup.
///
/// # Errors
/// Returns an error if configuration, key, database, or network setup fails.
#[expect(
    clippy::cognitive_complexity,
    clippy::too_many_lines,
    unused_qualifications,
    reason = "main function has high complexity"
)]
pub async fn run() -> anyhow::Result<()> {
    let args = Args::parse();

    // Set up trace logging to console and account for the user-provided verbosity flag.
    if args.verbosity.log_level_filter() != LevelFilter::Off {
        let lvl = match args.verbosity.log_level_filter() {
            LevelFilter::Error => tracing::Level::ERROR,
            LevelFilter::Warn => tracing::Level::WARN,
            // `Off` is unreachable here (excluded by the surrounding `if`);
            // it is kept in this arm so the match stays exhaustive.
            LevelFilter::Info | LevelFilter::Off => tracing::Level::INFO,
            LevelFilter::Debug => tracing::Level::DEBUG,
            LevelFilter::Trace => tracing::Level::TRACE,
        };
        tracing_subscriber::fmt().with_max_level(lvl).init();
    }

    if !args.config.exists() {
        // Throw up a warning if the config file does not exist.
        //
        // This is not fatal because users can specify all configuration settings via
        // the environment, but the most likely scenario here is that a user accidentally
        // omitted the config file for some reason (e.g. forgot to mount it into Docker).
        warn!(
            "configuration file {} does not exist",
            args.config.display()
        );
    }

    // Read and parse the user-provided configuration.
    // Env vars prefixed `BLUEPDS_` are merged after (i.e. override) the TOML file.
    let config: AppConfig = Figment::new()
        .admerge(figment::providers::Toml::file(args.config))
        .admerge(figment::providers::Env::prefixed("BLUEPDS_"))
        .extract()
        .context("failed to load configuration")?;

    if config.test {
        warn!("BluePDS starting up in TEST mode.");
        warn!("This means the application will not federate with the rest of the network.");
        warn!(
            "If you want to turn this off, either set `test` to false in the config or define `BLUEPDS_TEST = false`"
        );
    }

    // Initialize metrics reporting.
    super::metrics::setup(config.metrics.as_ref()).context("failed to set up metrics exporter")?;

    // Create a reqwest client that will be used for all outbound requests.
    let simple_client = reqwest::Client::builder()
        .user_agent(APP_USER_AGENT)
        .build()
        .context("failed to build requester client")?;
    // Wrap a clone of the simple client with an in-memory (Moka) HTTP cache layer.
    let client = reqwest_middleware::ClientBuilder::new(simple_client.clone())
        .with(http_cache_reqwest::Cache(http_cache_reqwest::HttpCache {
            mode: CacheMode::Default,
            manager: MokaManager::default(),
            options: HttpCacheOptions::default(),
        }))
        .build();

    tokio::fs::create_dir_all(&config.key.parent().context("should have parent")?)
        .await
        .context("failed to create key directory")?;

    // Check if crypto keys exist. If not, create new ones.
    // NOTE(review): this uses blocking `std::fs` inside an async fn — acceptable
    // at startup before the server runs, but worth confirming it stays out of hot paths.
    let (skey, rkey) = if let Ok(f) = std::fs::File::open(&config.key) {
        let keys: KeyData = serde_ipld_dagcbor::from_reader(std::io::BufReader::new(f))
            .context("failed to deserialize crypto keys")?;

        let skey = Secp256k1Keypair::import(&keys.skey).context("failed to import signing key")?;
        let rkey = Secp256k1Keypair::import(&keys.rkey).context("failed to import rotation key")?;

        (SigningKey(Arc::new(skey)), RotationKey(Arc::new(rkey)))
    } else {
        info!("signing keys not found, generating new ones");

        let skey = Secp256k1Keypair::create(&mut rand::thread_rng());
        let rkey = Secp256k1Keypair::create(&mut rand::thread_rng());

        let keys = KeyData {
            skey: skey.export(),
            rkey: rkey.export(),
        };

        // Persist the freshly-generated keys so subsequent startups reuse them.
        let mut f = std::fs::File::create(&config.key).context("failed to create key file")?;
        serde_ipld_dagcbor::to_writer(&mut f, &keys).context("failed to serialize crypto keys")?;

        (SigningKey(Arc::new(skey)), RotationKey(Arc::new(rkey)))
    };

    tokio::fs::create_dir_all(&config.repo.path).await?;
    tokio::fs::create_dir_all(&config.plc.path).await?;
    tokio::fs::create_dir_all(&config.blob.path).await?;

    // Create a database connection manager and pool for the main database.
    let pool =
        establish_pool(&config.db).context("failed to establish database connection pool")?;

    // Create a dictionary of database connection pools for each actor.
    let mut actor_pools = std::collections::HashMap::new();
    // We'll determine actors by looking in the data/repo dir for .db files.
    let mut actor_dbs = tokio::fs::read_dir(&config.repo.path)
        .await
        .context("failed to read repo directory")?;
    while let Some(entry) = actor_dbs
        .next_entry()
        .await
        .context("failed to read repo dir")?
    {
        let path = entry.path();
        if path.extension().and_then(|s| s.to_str()) == Some("db") {
            let actor_repo_pool = establish_pool(&format!("sqlite://{}", path.display()))
                .context("failed to create database connection pool")?;

            // The file stem of the .db file is the actor's PLC identifier.
            // NOTE(review): assumes every actor DID is did:plc (not did:web) — confirm.
            let did = Did::from_str(&format!(
                "did:plc:{}",
                path.file_stem()
                    .and_then(|s| s.to_str())
                    .context("failed to get actor DID")?
            ))
            .expect("should be able to parse actor DID")
            .to_string();
            let blob_path = config.blob.path.to_path_buf();
            let actor_storage = ActorStorage {
                repo: actor_repo_pool,
                blob: blob_path.clone(),
            };
            // `drop` discards the previous entry (if any) to satisfy must-use lints.
            drop(actor_pools.insert(did, actor_storage));
        }
    }
    // Apply pending migrations
    // let conn = pool.get().await?;
    // conn.run_pending_migrations(MIGRATIONS)
    //     .expect("should be able to run migrations");

    let hostname = config.host_name.clone();
    let crawlers: Vec<String> = config
        .firehose
        .relays
        .iter()
        .map(|s| s.to_string())
        .collect();
    // The sequencer announces this host to the configured upstream relays.
    let sequencer = Arc::new(RwLock::new(Sequencer::new(
        Crawlers::new(hostname, crawlers.clone()),
        None,
    )));
    let account_manager = Arc::new(RwLock::new(AccountManager::new(pool.clone())));
    let plc_url = if cfg!(debug_assertions) {
        "http://localhost:8000".to_owned() // dummy for debug
    } else {
        env::var("PDS_DID_PLC_URL").unwrap_or("https://plc.directory".to_owned()) // TODO: toml config
    };
    let id_resolver = Arc::new(RwLock::new(IdResolver::new(IdentityResolverOpts {
        timeout: None,
        plc_url: Some(plc_url),
        did_cache: Some(DidCache::new(None, None)),
        backup_nameservers: Some(env_list("PDS_HANDLE_BACKUP_NAMESERVERS")),
    })));

    // Default to 127.0.0.1:8000 when no listen address is configured.
    let addr = config
        .listen_address
        .unwrap_or(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000));

    // XRPC routes fall back to the service proxy for endpoints we don't implement.
    let app = Router::new()
        .route("/", get(super::index))
        .merge(super::oauth::routes())
        .nest(
            "/xrpc",
            super::apis::routes()
                .merge(super::actor_endpoints::routes())
                .fallback(service_proxy),
        )
        // .layer(RateLimitLayer::new(30, Duration::from_secs(30)))
        .layer(CorsLayer::permissive())
        .layer(TraceLayer::new_for_http())
        .with_state(AppState {
            config: config.clone(),
            db: pool.clone(),
            db_actors: actor_pools.clone(),
            client: client.clone(),
            simple_client,
            sequencer: sequencer.clone(),
            account_manager,
            id_resolver,
            signing_key: skey,
            rotation_key: rkey,
        });

    info!("listening on {addr}");
    info!("connect to: http://127.0.0.1:{}", addr.port());

    // Determine whether or not this was the first startup (i.e. no accounts exist and no invite codes were created).
    // If so, create an invite code and share it via the console.
    let conn = pool.get().await.context("failed to get db connection")?;

    #[derive(QueryableByName)]
    struct TotalCount {
        #[diesel(sql_type = diesel::sql_types::Integer)]
        total_count: i32,
    }

    // Sum of accounts + invite codes; zero means a fresh install.
    // NOTE(review): `expect` here aborts on interact-pool failure at startup — confirm intended.
    let result = conn.interact(move |conn| {
        diesel::sql_query(
            "SELECT (SELECT COUNT(*) FROM account) + (SELECT COUNT(*) FROM invite_code) AS total_count",
        )
        .get_result::<TotalCount>(conn)
    })
    .await
    .expect("should be able to query database")?;

    let c = result.total_count;

    #[expect(clippy::print_stdout)]
    if c == 0 {
        // Generate a single-use invite code and store it.
        let uuid = Uuid::new_v4().to_string();

        use crate::models::pds as models;
        use crate::schema::pds::invite_code::dsl as InviteCode;
        let uuid_clone = uuid.clone();
        drop(
            conn.interact(move |conn| {
                diesel::insert_into(InviteCode::invite_code)
                    .values(models::InviteCode {
                        code: uuid_clone,
                        available_uses: 1,
                        disabled: 0,
                        for_account: "None".to_owned(),
                        created_by: "None".to_owned(),
                        created_at: "None".to_owned(),
                    })
                    .execute(conn)
                    .context("failed to create new invite code")
            })
            .await
            .expect("should be able to create invite code"),
        );

        // N.B: This is a sensitive message, so we're bypassing `tracing` here and
        // logging it directly to console.
        println!("=====================================");
        println!("               FIRST STARTUP         ");
        println!("=====================================");
        println!("Use this code to create an account:");
        println!("{uuid}");
        println!("=====================================");
    }

    let listener = TcpListener::bind(&addr)
        .await
        .context("failed to bind address")?;

    // Serve the app, and request crawling from upstream relays.
    let serve = tokio::spawn(async move {
        axum::serve(listener, app.into_make_service())
            .await
            .context("failed to serve app")
    });

    // Now that the app is live, request a crawl from upstream relays.
    if cfg!(debug_assertions) {
        info!("debug mode: not requesting crawl");
    } else {
        info!("requesting crawl from upstream relays");
        let mut background_sequencer = sequencer.write().await.clone();
        drop(tokio::spawn(
            async move { background_sequencer.start().await },
        ));
    }

    // Flatten JoinError and the inner serve result into one anyhow::Result.
    serve
        .await
        .map_err(Into::into)
        .and_then(|r| r)
        .context("failed to serve app")
}
429}