Alternative ATProto PDS implementation
//! PDS implementation.

mod account_manager;
mod actor_endpoints;
mod actor_store;
mod auth;
mod config;
mod db;
mod did;
mod endpoints;
pub mod error;
mod firehose;
mod metrics;
mod mmap;
mod oauth;
mod plc;
mod schema;
mod service_proxy;
#[cfg(test)]
mod tests;

use anyhow::{Context as _, anyhow};
use atrium_api::types::string::Did;
use atrium_crypto::keypair::{Export as _, Secp256k1Keypair};
use auth::AuthenticatedUser;
use axum::{
    Router,
    body::Body,
    extract::{FromRef, Request, State},
    http::{self, HeaderMap, Response, StatusCode, Uri},
    response::IntoResponse,
    routing::get,
};
use azure_core::credentials::TokenCredential;
use clap::Parser;
use clap_verbosity_flag::{InfoLevel, Verbosity, log::LevelFilter};
use config::AppConfig;
use db::establish_pool;
use deadpool_diesel::sqlite::Pool;
use diesel::prelude::*;
use diesel_migrations::{EmbeddedMigrations, embed_migrations};
pub use error::Error;
use figment::{Figment, providers::Format as _};
use firehose::FirehoseProducer;
use http_cache_reqwest::{CacheMode, HttpCacheOptions, MokaManager};
use rand::Rng as _;
use serde::{Deserialize, Serialize};
use service_proxy::service_proxy;
use std::{
    net::{IpAddr, Ipv4Addr, SocketAddr},
    path::PathBuf,
    str::FromStr as _,
    sync::Arc,
};
use tokio::net::TcpListener;
use tower_http::{cors::CorsLayer, trace::TraceLayer};
use tracing::{info, warn};
use uuid::Uuid;

/// The application user agent. Concatenates the package name and version, e.g. `bluepds/0.0.0`.
pub const APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));

/// Embedded migrations.
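///
/// A minimal sketch (not compiled or run here) of applying these at startup through a
/// `deadpool_diesel` connection, assuming `diesel_migrations::MigrationHarness` is in scope,
/// mirroring the commented-out code in `run()`:
///
/// ```ignore
/// use diesel_migrations::MigrationHarness as _;
///
/// let conn = pool.get().await?;
/// conn.interact(|conn| {
///     conn.run_pending_migrations(MIGRATIONS)
///         .expect("should be able to run migrations");
/// })
/// .await?;
/// ```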
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");

/// The application-wide result type.
pub type Result<T> = std::result::Result<T, Error>;
/// The reqwest client type with middleware.
pub type Client = reqwest_middleware::ClientWithMiddleware;
/// The Azure credential type.
pub type Cred = Arc<dyn TokenCredential>;

#[expect(
    clippy::arbitrary_source_item_ordering,
    reason = "field order may matter for the serialized form"
)]
#[derive(Serialize, Deserialize, Debug, Clone)]
/// Key material persisted to the on-disk key file.
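///
/// A minimal sketch (not compiled or run here) of round-tripping this structure as
/// DAG-CBOR, mirroring what `run()` does with the key file:
///
/// ```ignore
/// let keys = KeyData {
///     skey: skey.export(),
///     rkey: rkey.export(),
/// };
///
/// let mut buf = Vec::new();
/// serde_ipld_dagcbor::to_writer(&mut buf, &keys)?;
/// let decoded: KeyData = serde_ipld_dagcbor::from_reader(buf.as_slice())?;
/// ```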
struct KeyData {
    /// Primary signing key for all repo operations.
    skey: Vec<u8>,
    /// Rotation key used to sign PLC operations.
    rkey: Vec<u8>,
}

// FIXME: We should use P256Keypair instead. secp256k1 is primarily used for cryptocurrencies,
// and implementations of that algorithm are much more limited compared to P256.
//
// Reference: https://soatok.blog/2022/05/19/guidance-for-choosing-an-elliptic-curve-signature-algorithm-in-2022/
#[derive(Clone)]
/// The signing key for PLC/DID operations.
pub struct SigningKey(Arc<Secp256k1Keypair>);

#[derive(Clone)]
/// The rotation key for PLC operations.
pub struct RotationKey(Arc<Secp256k1Keypair>);

impl std::ops::Deref for SigningKey {
    type Target = Secp256k1Keypair;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl SigningKey {
    /// Import from a private key.
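    ///
    /// A minimal usage sketch (not compiled or run here), assuming a key previously
    /// exported with `atrium_crypto`'s `Export` trait:
    ///
    /// ```ignore
    /// let keypair = Secp256k1Keypair::create(&mut rand::thread_rng());
    /// let signing_key = SigningKey::import(&keypair.export())?;
    /// ```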
    pub fn import(key: &[u8]) -> Result<Self> {
        let key = Secp256k1Keypair::import(key).context("failed to import signing key")?;
        Ok(Self(Arc::new(key)))
    }
}

impl std::ops::Deref for RotationKey {
    type Target = Secp256k1Keypair;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

#[derive(Parser, Debug, Clone)]
/// Command line arguments.
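///
/// Illustrative invocation (the binary name is assumed; flags come from `clap` and
/// `clap_verbosity_flag`):
///
/// ```text
/// bluepds --config default.toml -v
/// ```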
pub struct Args {
    /// Path to the configuration file.
    #[arg(short, long, default_value = "default.toml")]
    pub config: PathBuf,
    /// The verbosity level.
    #[command(flatten)]
    pub verbosity: Verbosity<InfoLevel>,
}

/// Per-actor database connection pools.
#[derive(Clone)]
pub struct ActorPools {
    /// Pool for the actor's repository database.
    pub repo: Pool,
    /// Pool for the actor's blob database.
    pub blob: Pool,
}

#[expect(clippy::arbitrary_source_item_ordering, reason = "arbitrary")]
#[derive(Clone, FromRef)]
/// Shared application state handed to all request handlers.
pub struct AppState {
    /// The application configuration.
    pub config: AppConfig,
    /// The Azure credential.
    pub cred: Cred,
    /// The main database connection pool. Used for common PDS data, like invite codes.
    pub db: Pool,
    /// Actor-specific database connection pools, keyed by DID.
    pub db_actors: std::collections::HashMap<String, ActorPools>,

    /// The HTTP client with middleware.
    pub client: Client,
    /// The simple HTTP client.
    pub simple_client: reqwest::Client,
    /// The firehose producer.
    pub firehose: FirehoseProducer,

    /// The signing key.
    pub signing_key: SigningKey,
    /// The rotation key.
    pub rotation_key: RotationKey,
}

/// The index (/) route.
async fn index() -> impl IntoResponse {
    r"
        __                         __
       /\ \__                     /\ \__
   __  \ \ ,_\  _____   _ __   ___\ \ ,_\   ___
 /'__'\ \ \ \/ /\ '__'\/\''__\/ __'\ \ \/  / __'\
/\ \L\.\_\ \ \_\ \ \L\ \ \ \//\ \L\ \ \ \_/\ \L\ \
\ \__/.\_\\ \__\\ \ ,__/\ \_\\ \____/\ \__\ \____/
 \/__/\/_/ \/__/ \ \ \/  \/_/ \/___/  \/__/\/___/
                  \ \_\
                   \/_/


This is an AT Protocol Personal Data Server (aka, an atproto PDS)

Most API routes are under /xrpc/

      Code: https://github.com/DrChat/bluepds
  Protocol: https://atproto.com
    "
}

/// The main application entry point.
#[expect(
    clippy::cognitive_complexity,
    clippy::too_many_lines,
    unused_qualifications,
    reason = "main function has high complexity"
)]
pub async fn run() -> anyhow::Result<()> {
    let args = Args::parse();

    // Set up trace logging to console and account for the user-provided verbosity flag.
    if args.verbosity.log_level_filter() != LevelFilter::Off {
        let lvl = match args.verbosity.log_level_filter() {
            LevelFilter::Error => tracing::Level::ERROR,
            LevelFilter::Warn => tracing::Level::WARN,
            LevelFilter::Info | LevelFilter::Off => tracing::Level::INFO,
            LevelFilter::Debug => tracing::Level::DEBUG,
            LevelFilter::Trace => tracing::Level::TRACE,
        };
        tracing_subscriber::fmt().with_max_level(lvl).init();
    }

    if !args.config.exists() {
        // Throw up a warning if the config file does not exist.
        //
        // This is not fatal because users can specify all configuration settings via
        // the environment, but the most likely scenario here is that a user accidentally
        // omitted the config file for some reason (e.g. forgot to mount it into Docker).
        warn!(
            "configuration file {} does not exist",
            args.config.display()
        );
    }

    // Read and parse the user-provided configuration.
    let config: AppConfig = Figment::new()
        .admerge(figment::providers::Toml::file(args.config))
        .admerge(figment::providers::Env::prefixed("BLUEPDS_"))
        .extract()
        .context("failed to load configuration")?;

    if config.test {
        warn!("BluePDS starting up in TEST mode.");
        warn!("This means the application will not federate with the rest of the network.");
        warn!(
            "If you want to turn this off, either set `test` to false in the config or define `BLUEPDS_TEST = false`"
        );
    }

    // Initialize metrics reporting.
    metrics::setup(config.metrics.as_ref()).context("failed to set up metrics exporter")?;

    // Create a reqwest client that will be used for all outbound requests.
    let simple_client = reqwest::Client::builder()
        .user_agent(APP_USER_AGENT)
        .build()
        .context("failed to build requester client")?;
    let client = reqwest_middleware::ClientBuilder::new(simple_client.clone())
        .with(http_cache_reqwest::Cache(http_cache_reqwest::HttpCache {
            mode: CacheMode::Default,
            manager: MokaManager::default(),
            options: HttpCacheOptions::default(),
        }))
        .build();

    tokio::fs::create_dir_all(&config.key.parent().context("should have parent")?)
        .await
        .context("failed to create key directory")?;

    // Check if crypto keys exist. If not, create new ones.
    let (skey, rkey) = if let Ok(f) = std::fs::File::open(&config.key) {
        let keys: KeyData = serde_ipld_dagcbor::from_reader(std::io::BufReader::new(f))
            .context("failed to deserialize crypto keys")?;

        let skey = Secp256k1Keypair::import(&keys.skey).context("failed to import signing key")?;
        let rkey = Secp256k1Keypair::import(&keys.rkey).context("failed to import rotation key")?;

        (SigningKey(Arc::new(skey)), RotationKey(Arc::new(rkey)))
    } else {
        info!("signing keys not found, generating new ones");

        let skey = Secp256k1Keypair::create(&mut rand::thread_rng());
        let rkey = Secp256k1Keypair::create(&mut rand::thread_rng());

        let keys = KeyData {
            skey: skey.export(),
            rkey: rkey.export(),
        };

        let mut f = std::fs::File::create(&config.key).context("failed to create key file")?;
        serde_ipld_dagcbor::to_writer(&mut f, &keys).context("failed to serialize crypto keys")?;

        (SigningKey(Arc::new(skey)), RotationKey(Arc::new(rkey)))
    };

    tokio::fs::create_dir_all(&config.repo.path).await?;
    tokio::fs::create_dir_all(&config.plc.path).await?;
    tokio::fs::create_dir_all(&config.blob.path).await?;

    let cred = azure_identity::DefaultAzureCredential::new()
        .context("failed to create Azure credential")?;

    // Create a database connection manager and pool for the main database.
    let pool =
        establish_pool(&config.db).context("failed to establish database connection pool")?;
    // Create a dictionary of database connection pools for each actor.
    let mut actor_pools = std::collections::HashMap::new();
    // let mut actor_blob_pools = std::collections::HashMap::new();
    // We'll determine actors by looking in the data/repo dir for .db files.
    let mut actor_dbs = tokio::fs::read_dir(&config.repo.path)
        .await
        .context("failed to read repo directory")?;
    while let Some(entry) = actor_dbs
        .next_entry()
        .await
        .context("failed to read repo dir")?
    {
        let path = entry.path();
        if path.extension().and_then(|s| s.to_str()) == Some("db") {
            let did_path = path
                .file_stem()
                .and_then(|s| s.to_str())
                .context("failed to get actor DID")?;
            let did = Did::from_str(&format!("did:plc:{did_path}"))
                .expect("should be able to parse actor DID");

            // Create a new database connection manager and pool for the actor.
            // The path for the SQLite connection needs to look like "sqlite://data/repo/<actor>.db".
            let path_repo = format!("sqlite://{}", path.display());
            let actor_repo_pool =
                establish_pool(&path_repo).context("failed to create database connection pool")?;
            // Create a new database connection manager and pool for the actor blobs.
            // The path for the SQLite connection needs to look like "sqlite://data/blob/<actor>.db".
            let path_blob = format!(
                "sqlite://{}",
                config.blob.path.join(entry.file_name()).display()
            );
            let actor_blob_pool =
                establish_pool(&path_blob).context("failed to create database connection pool")?;
            drop(actor_pools.insert(
                did.to_string(),
                ActorPools {
                    repo: actor_repo_pool,
                    blob: actor_blob_pool,
                },
            ));
        }
    }
    // Apply pending migrations
    // let conn = pool.get().await?;
    // conn.run_pending_migrations(MIGRATIONS)
    //     .expect("should be able to run migrations");

    let (_fh, fhp) = firehose::spawn(client.clone(), config.clone());

    let addr = config
        .listen_address
        .unwrap_or(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000));

    let app = Router::new()
        .route("/", get(index))
        .merge(oauth::routes())
        .nest(
            "/xrpc",
            endpoints::routes()
                .merge(actor_endpoints::routes())
                .fallback(service_proxy),
        )
        // .layer(RateLimitLayer::new(30, Duration::from_secs(30)))
        .layer(CorsLayer::permissive())
        .layer(TraceLayer::new_for_http())
        .with_state(AppState {
            cred,
            config: config.clone(),
            db: pool.clone(),
            db_actors: actor_pools.clone(),
            client: client.clone(),
            simple_client,
            firehose: fhp,
            signing_key: skey,
            rotation_key: rkey,
        });

    info!("listening on {addr}");
    info!("connect to: http://127.0.0.1:{}", addr.port());

    // Determine whether or not this was the first startup (i.e. no accounts exist and no invite codes were created).
    // If so, create an invite code and share it via the console.
    let conn = pool.get().await.context("failed to get db connection")?;

    #[derive(QueryableByName)]
    struct TotalCount {
        #[diesel(sql_type = diesel::sql_types::Integer)]
        total_count: i32,
    }

    let result = conn
        .interact(move |conn| {
            diesel::sql_query(
                "SELECT (SELECT COUNT(*) FROM accounts) + (SELECT COUNT(*) FROM invites) AS total_count",
            )
            .get_result::<TotalCount>(conn)
        })
        .await
        .expect("should be able to query database")?;

    let c = result.total_count;

    #[expect(clippy::print_stdout)]
    if c == 0 {
        let uuid = Uuid::new_v4().to_string();

        let uuid_clone = uuid.clone();
        _ = conn
            .interact(move |conn| {
                diesel::sql_query(
                    "INSERT INTO invites (id, did, count, created_at) VALUES (?, NULL, 1, datetime('now'))",
                )
                .bind::<diesel::sql_types::Text, _>(uuid_clone)
                .execute(conn)
                .context("failed to create new invite code")
                .expect("should be able to create invite code")
            })
            .await
            .expect("should be able to create invite code");

        // N.B: This is a sensitive message, so we're bypassing `tracing` here and
        // logging it directly to console.
        println!("=====================================");
        println!("            FIRST STARTUP            ");
        println!("=====================================");
        println!("Use this code to create an account:");
        println!("{uuid}");
        println!("=====================================");
    }

    let listener = TcpListener::bind(&addr)
        .await
        .context("failed to bind address")?;

    // Serve the app, and request crawling from upstream relays.
    let serve = tokio::spawn(async move {
        axum::serve(listener, app.into_make_service())
            .await
            .context("failed to serve app")
    });

    // Now that the app is live, request a crawl from upstream relays.
    firehose::reconnect_relays(&client, &config).await;

    serve
        .await
        .map_err(Into::into)
        .and_then(|r| r)
        .context("failed to serve app")
}

/// Creates an app router with the provided AppState.
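///
/// A minimal sketch (not compiled or run here) of exercising the router in a test,
/// assuming a fully-populated `AppState` named `state` and `tower::ServiceExt` in scope:
///
/// ```ignore
/// use tower::ServiceExt as _;
///
/// let app = create_app(state);
/// let response = app
///     .oneshot(Request::builder().uri("/").body(Body::empty())?)
///     .await?;
/// assert_eq!(response.status(), StatusCode::OK);
/// ```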
pub fn create_app(state: AppState) -> Router {
    Router::new()
        .route("/", get(index))
        .merge(oauth::routes())
        .nest(
            "/xrpc",
            endpoints::routes()
                .merge(actor_endpoints::routes())
                .fallback(service_proxy),
        )
        .layer(CorsLayer::permissive())
        .layer(TraceLayer::new_for_http())
        .with_state(state)
}