// Alternative ATProto PDS implementation
1//! PDS implementation.
2mod account_manager;
3mod actor_store;
4mod auth;
5mod config;
6mod db;
7mod did;
8mod endpoints;
9mod error;
10mod firehose;
11mod metrics;
12mod mmap;
13mod oauth;
14mod plc;
15#[cfg(test)]
16mod tests;
17
18/// HACK: store private user preferences in the PDS.
19///
20/// We shouldn't have to know about any bsky endpoints to store private user data.
21/// This will _very likely_ be changed in the future.
22mod actor_endpoints;
23
24use anyhow::{Context as _, anyhow};
25use atrium_api::types::string::Did;
26use atrium_crypto::keypair::{Export as _, Secp256k1Keypair};
27use auth::AuthenticatedUser;
28use axum::{
29 Router,
30 body::Body,
31 extract::{FromRef, Request, State},
32 http::{self, HeaderMap, Response, StatusCode, Uri},
33 response::IntoResponse,
34 routing::get,
35};
36use azure_core::credentials::TokenCredential;
37use clap::Parser;
38use clap_verbosity_flag::{InfoLevel, Verbosity, log::LevelFilter};
39use config::AppConfig;
40use diesel::prelude::*;
41use diesel::r2d2::{self, ConnectionManager};
42use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations};
43#[expect(clippy::pub_use, clippy::useless_attribute)]
44pub use error::Error;
45use figment::{Figment, providers::Format as _};
46use firehose::FirehoseProducer;
47use http_cache_reqwest::{CacheMode, HttpCacheOptions, MokaManager};
48use rand::Rng as _;
49use serde::{Deserialize, Serialize};
50use std::{
51 net::{IpAddr, Ipv4Addr, SocketAddr},
52 path::PathBuf,
53 str::FromStr as _,
54 sync::Arc,
55};
56use tokio::net::TcpListener;
57use tower_http::{cors::CorsLayer, trace::TraceLayer};
58use tracing::{info, warn};
59use uuid::Uuid;
60
/// The application user agent. Concatenates the package name and version. e.g. `bluepds/0.0.0`.
///
/// Installed as the `User-Agent` of the outbound reqwest client built at startup.
pub const APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),);

/// Embedded migrations
///
/// The Diesel migrations from `./migrations`, embedded into the binary and applied
/// to the database with `run_pending_migrations` during startup.
pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("./migrations");

/// The application-wide result type, using the crate-level [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
/// The reqwest client type with middleware.
///
/// At startup this wraps the plain reqwest client with an HTTP caching layer.
pub type Client = reqwest_middleware::ClientWithMiddleware;
/// The database connection pool (an r2d2 pool of Diesel SQLite connections).
pub type Db = r2d2::Pool<ConnectionManager<SqliteConnection>>;
/// The Azure credential type: a shared, type-erased `TokenCredential`.
pub type Cred = Arc<dyn TokenCredential>;
75
#[expect(
    clippy::arbitrary_source_item_ordering,
    reason = "serialized data might be structured"
)]
#[derive(Serialize, Deserialize, Debug, Clone)]
/// The key data structure.
///
/// This is the on-disk format of the key file (`config.key`), encoded as DAG-CBOR.
/// It is written once when fresh keys are generated at startup and read back on
/// every later startup.
// NOTE(review): `Debug` is derived on a struct that holds raw private-key bytes;
// printing it would leak the keys. Consider a redacting `Debug` impl.
// The fields are intentionally not in alphabetical order; the `#[expect]` above
// relies on that, so reordering them would trigger `unfulfilled_lint_expectations`.
struct KeyData {
    /// Primary signing key for all repo operations.
    ///
    /// Private key bytes as produced by `Secp256k1Keypair::export` and accepted by
    /// `Secp256k1Keypair::import`.
    skey: Vec<u8>,
    /// Primary signing (rotation) key for all PLC operations.
    ///
    /// Same encoding as `skey`.
    rkey: Vec<u8>,
}
88
89// FIXME: We should use P256Keypair instead. SecP256K1 is primarily used for cryptocurrencies,
90// and the implementations of this algorithm are much more limited as compared to P256.
91//
92// Reference: https://soatok.blog/2022/05/19/guidance-for-choosing-an-elliptic-curve-signature-algorithm-in-2022/
93#[derive(Clone)]
94/// The signing key for PLC/DID operations.
95pub struct SigningKey(Arc<Secp256k1Keypair>);
96#[derive(Clone)]
97/// The rotation key for PLC operations.
98pub struct RotationKey(Arc<Secp256k1Keypair>);
99
100impl std::ops::Deref for SigningKey {
101 type Target = Secp256k1Keypair;
102
103 fn deref(&self) -> &Self::Target {
104 &self.0
105 }
106}
107
108impl SigningKey {
109 /// Import from a private key.
110 pub fn import(key: &[u8]) -> Result<Self> {
111 let key = Secp256k1Keypair::import(key).context("failed to import signing key")?;
112 Ok(Self(Arc::new(key)))
113 }
114}
115
116impl std::ops::Deref for RotationKey {
117 type Target = Secp256k1Keypair;
118
119 fn deref(&self) -> &Self::Target {
120 &self.0
121 }
122}
123
#[derive(Parser, Debug, Clone)]
/// Command line arguments.
// NOTE: clap's derive turns the `///` doc comments in this struct into `--help`
// text, so maintainer notes live in plain `//` comments instead.
struct Args {
    /// Path to the configuration file
    // A missing file only produces a warning at startup, because every setting
    // can also be supplied through `BLUEPDS_`-prefixed environment variables.
    #[arg(short, long, default_value = "default.toml")]
    config: PathBuf,
    /// The verbosity level.
    // Defaults to `InfoLevel`; translated into the tracing subscriber's max level
    // at startup (logging is not initialized at all when it resolves to `Off`).
    #[command(flatten)]
    verbosity: Verbosity<InfoLevel>,
}
134
// The field order below is intentionally non-alphabetical; the `#[expect]` relies on it.
#[expect(clippy::arbitrary_source_item_ordering, reason = "arbitrary")]
#[derive(Clone, FromRef)]
/// Shared state for all axum routes.
///
/// Built once at startup and attached to the router with `with_state`. Deriving
/// `FromRef` lets a handler extract a single field by its type, e.g.
/// `State<SigningKey>` or `State<reqwest::Client>` as `service_proxy` does.
struct AppState {
    /// The application configuration.
    config: AppConfig,
    /// The Azure credential.
    cred: Cred,
    /// The database connection pool.
    db: Db,

    /// The HTTP client with middleware (HTTP caching).
    client: Client,
    /// The simple HTTP client.
    ///
    /// The plain `reqwest::Client` that `client` wraps, without the caching
    /// middleware; this is what handlers get from `State<reqwest::Client>`.
    simple_client: reqwest::Client,
    /// The firehose producer.
    firehose: FirehoseProducer,

    /// The signing key.
    signing_key: SigningKey,
    /// The rotation key.
    rotation_key: RotationKey,
}
157
/// The index (/) route.
///
/// Returns a static text banner (an ASCII-art logo plus links) that identifies
/// this server as an atproto PDS and notes that most API routes live under `/xrpc/`.
async fn index() -> impl IntoResponse {
    r"
 __ __
 /\ \__ /\ \__
 __ \ \ ,_\ _____ _ __ ___\ \ ,_\ ___
 /'__'\ \ \ \/ /\ '__'\/\''__\/ __'\ \ \/ / __'\
 /\ \L\.\_\ \ \_\ \ \L\ \ \ \//\ \L\ \ \ \_/\ \L\ \
 \ \__/.\_\\ \__\\ \ ,__/\ \_\\ \____/\ \__\ \____/
 \/__/\/_/ \/__/ \ \ \/ \/_/ \/___/ \/__/\/___/
 \ \_\
 \/_/


This is an AT Protocol Personal Data Server (aka, an atproto PDS)

Most API routes are under /xrpc/

 Code: https://github.com/DrChat/bluepds
 Protocol: https://atproto.com
    "
}
180
181/// Service proxy.
182///
183/// Reference: <https://atproto.com/specs/xrpc#service-proxying>
184async fn service_proxy(
185 uri: Uri,
186 user: AuthenticatedUser,
187 State(skey): State<SigningKey>,
188 State(client): State<reqwest::Client>,
189 headers: HeaderMap,
190 request: Request<Body>,
191) -> Result<Response<Body>> {
192 let url_path = uri.path_and_query().context("invalid service proxy url")?;
193 let lxm = url_path
194 .path()
195 .strip_prefix("/")
196 .with_context(|| format!("invalid service proxy url prefix: {}", url_path.path()))?;
197
198 let user_did = user.did();
199 let (did, id) = match headers.get("atproto-proxy") {
200 Some(val) => {
201 let val =
202 std::str::from_utf8(val.as_bytes()).context("proxy header not valid utf-8")?;
203
204 let (did, id) = val.split_once('#').context("invalid proxy header")?;
205
206 let did =
207 Did::from_str(did).map_err(|e| anyhow!("atproto proxy not a valid DID: {e}"))?;
208
209 (did, format!("#{id}"))
210 }
211 // HACK: Assume the bluesky appview by default.
212 None => (
213 Did::new("did:web:api.bsky.app".to_owned())
214 .expect("service proxy should be a valid DID"),
215 "#bsky_appview".to_owned(),
216 ),
217 };
218
219 let did_doc = did::resolve(&Client::new(client.clone(), []), did.clone())
220 .await
221 .with_context(|| format!("failed to resolve did document {}", did.as_str()))?;
222
223 let Some(service) = did_doc.service.iter().find(|s| s.id == id) else {
224 return Err(Error::with_status(
225 StatusCode::BAD_REQUEST,
226 anyhow!("could not find resolve service #{id}"),
227 ));
228 };
229
230 let target_url: url::Url = service
231 .service_endpoint
232 .join(&format!("/xrpc{url_path}"))
233 .context("failed to construct target url")?;
234
235 let exp = (chrono::Utc::now().checked_add_signed(chrono::Duration::minutes(1)))
236 .context("should be valid expiration datetime")?
237 .timestamp();
238 let jti = rand::thread_rng()
239 .sample_iter(rand::distributions::Alphanumeric)
240 .take(10)
241 .map(char::from)
242 .collect::<String>();
243
244 // Mint a bearer token by signing a JSON web token.
245 // https://github.com/DavidBuchanan314/millipds/blob/5c7529a739d394e223c0347764f1cf4e8fd69f94/src/millipds/appview_proxy.py#L47-L59
246 let token = auth::sign(
247 &skey,
248 "JWT",
249 &serde_json::json!({
250 "iss": user_did.as_str(),
251 "aud": did.as_str(),
252 "lxm": lxm,
253 "exp": exp,
254 "jti": jti,
255 }),
256 )
257 .context("failed to sign jwt")?;
258
259 let mut h = HeaderMap::new();
260 if let Some(hdr) = request.headers().get("atproto-accept-labelers") {
261 drop(h.insert("atproto-accept-labelers", hdr.clone()));
262 }
263 if let Some(hdr) = request.headers().get(http::header::CONTENT_TYPE) {
264 drop(h.insert(http::header::CONTENT_TYPE, hdr.clone()));
265 }
266
267 let r = client
268 .request(request.method().clone(), target_url)
269 .headers(h)
270 .header(http::header::AUTHORIZATION, format!("Bearer {token}"))
271 .body(reqwest::Body::wrap_stream(
272 request.into_body().into_data_stream(),
273 ))
274 .send()
275 .await
276 .context("failed to send request")?;
277
278 let mut resp = Response::builder().status(r.status());
279 if let Some(hdrs) = resp.headers_mut() {
280 *hdrs = r.headers().clone();
281 }
282
283 let resp = resp
284 .body(Body::from_stream(r.bytes_stream()))
285 .context("failed to construct response")?;
286
287 Ok(resp)
288}
289
290/// The main application entry point.
291#[expect(
292 clippy::cognitive_complexity,
293 clippy::too_many_lines,
294 reason = "main function has high complexity"
295)]
296async fn run() -> anyhow::Result<()> {
297 let args = Args::parse();
298
299 // Set up trace logging to console and account for the user-provided verbosity flag.
300 if args.verbosity.log_level_filter() != LevelFilter::Off {
301 let lvl = match args.verbosity.log_level_filter() {
302 LevelFilter::Error => tracing::Level::ERROR,
303 LevelFilter::Warn => tracing::Level::WARN,
304 LevelFilter::Info | LevelFilter::Off => tracing::Level::INFO,
305 LevelFilter::Debug => tracing::Level::DEBUG,
306 LevelFilter::Trace => tracing::Level::TRACE,
307 };
308 tracing_subscriber::fmt().with_max_level(lvl).init();
309 }
310
311 if !args.config.exists() {
312 // Throw up a warning if the config file does not exist.
313 //
314 // This is not fatal because users can specify all configuration settings via
315 // the environment, but the most likely scenario here is that a user accidentally
316 // omitted the config file for some reason (e.g. forgot to mount it into Docker).
317 warn!(
318 "configuration file {} does not exist",
319 args.config.display()
320 );
321 }
322
323 // Read and parse the user-provided configuration.
324 let config: AppConfig = Figment::new()
325 .admerge(figment::providers::Toml::file(args.config))
326 .admerge(figment::providers::Env::prefixed("BLUEPDS_"))
327 .extract()
328 .context("failed to load configuration")?;
329
330 if config.test {
331 warn!("BluePDS starting up in TEST mode.");
332 warn!("This means the application will not federate with the rest of the network.");
333 warn!(
334 "If you want to turn this off, either set `test` to false in the config or define `BLUEPDS_TEST = false`"
335 );
336 }
337
338 // Initialize metrics reporting.
339 metrics::setup(config.metrics.as_ref()).context("failed to set up metrics exporter")?;
340
341 // Create a reqwest client that will be used for all outbound requests.
342 let simple_client = reqwest::Client::builder()
343 .user_agent(APP_USER_AGENT)
344 .build()
345 .context("failed to build requester client")?;
346 let client = reqwest_middleware::ClientBuilder::new(simple_client.clone())
347 .with(http_cache_reqwest::Cache(http_cache_reqwest::HttpCache {
348 mode: CacheMode::Default,
349 manager: MokaManager::default(),
350 options: HttpCacheOptions::default(),
351 }))
352 .build();
353
354 tokio::fs::create_dir_all(&config.key.parent().context("should have parent")?)
355 .await
356 .context("failed to create key directory")?;
357
358 // Check if crypto keys exist. If not, create new ones.
359 let (skey, rkey) = if let Ok(f) = std::fs::File::open(&config.key) {
360 let keys: KeyData = serde_ipld_dagcbor::from_reader(std::io::BufReader::new(f))
361 .context("failed to deserialize crypto keys")?;
362
363 let skey = Secp256k1Keypair::import(&keys.skey).context("failed to import signing key")?;
364 let rkey = Secp256k1Keypair::import(&keys.rkey).context("failed to import rotation key")?;
365
366 (SigningKey(Arc::new(skey)), RotationKey(Arc::new(rkey)))
367 } else {
368 info!("signing keys not found, generating new ones");
369
370 let skey = Secp256k1Keypair::create(&mut rand::thread_rng());
371 let rkey = Secp256k1Keypair::create(&mut rand::thread_rng());
372
373 let keys = KeyData {
374 skey: skey.export(),
375 rkey: rkey.export(),
376 };
377
378 let mut f = std::fs::File::create(&config.key).context("failed to create key file")?;
379 serde_ipld_dagcbor::to_writer(&mut f, &keys).context("failed to serialize crypto keys")?;
380
381 (SigningKey(Arc::new(skey)), RotationKey(Arc::new(rkey)))
382 };
383
384 tokio::fs::create_dir_all(&config.repo.path).await?;
385 tokio::fs::create_dir_all(&config.plc.path).await?;
386 tokio::fs::create_dir_all(&config.blob.path).await?;
387
388 let cred = azure_identity::DefaultAzureCredential::new()
389 .context("failed to create Azure credential")?;
390
391 // Create a database connection manager and pool
392 let manager = ConnectionManager::<SqliteConnection>::new(&config.db);
393 let db = r2d2::Pool::builder()
394 .build(manager)
395 .context("failed to create database connection pool")?;
396
397 // Apply pending migrations
398 let conn = &mut db
399 .get()
400 .context("failed to get database connection for migrations")?;
401 conn.run_pending_migrations(MIGRATIONS)
402 .expect("should be able to run migrations");
403
404 let (_fh, fhp) = firehose::spawn(client.clone(), config.clone());
405
406 let addr = config
407 .listen_address
408 .unwrap_or(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8000));
409
410 let app = Router::new()
411 .route("/", get(index))
412 .merge(oauth::routes())
413 .nest(
414 "/xrpc",
415 endpoints::routes()
416 .merge(actor_endpoints::routes())
417 .fallback(service_proxy),
418 )
419 // .layer(RateLimitLayer::new(30, Duration::from_secs(30)))
420 .layer(CorsLayer::permissive())
421 .layer(TraceLayer::new_for_http())
422 .with_state(AppState {
423 cred,
424 config: config.clone(),
425 db: db.clone(),
426 client: client.clone(),
427 simple_client,
428 firehose: fhp,
429 signing_key: skey,
430 rotation_key: rkey,
431 });
432
433 info!("listening on {addr}");
434 info!("connect to: http://127.0.0.1:{}", addr.port());
435
436 // Determine whether or not this was the first startup (i.e. no accounts exist and no invite codes were created).
437 // If so, create an invite code and share it via the console.
438 let conn = &mut db.get().context("failed to get database connection")?;
439
440 #[derive(QueryableByName)]
441 struct TotalCount {
442 #[diesel(sql_type = diesel::sql_types::Integer)]
443 total_count: i32,
444 }
445
446 let result = diesel::sql_query(
447 "SELECT (SELECT COUNT(*) FROM accounts) + (SELECT COUNT(*) FROM invites) AS total_count",
448 )
449 .get_result::<TotalCount>(conn)
450 .context("failed to query database")?;
451
452 let c = result.total_count;
453
454 #[expect(clippy::print_stdout)]
455 if c == 0 {
456 let uuid = Uuid::new_v4().to_string();
457
458 diesel::sql_query(
459 "INSERT INTO invites (id, did, count, created_at) VALUES (?, NULL, 1, datetime('now'))",
460 )
461 .bind::<diesel::sql_types::Text, _>(uuid.clone())
462 .execute(conn)
463 .context("failed to create new invite code")?;
464
465 // N.B: This is a sensitive message, so we're bypassing `tracing` here and
466 // logging it directly to console.
467 println!("=====================================");
468 println!(" FIRST STARTUP ");
469 println!("=====================================");
470 println!("Use this code to create an account:");
471 println!("{uuid}");
472 println!("=====================================");
473 }
474
475 let listener = TcpListener::bind(&addr)
476 .await
477 .context("failed to bind address")?;
478
479 // Serve the app, and request crawling from upstream relays.
480 let serve = tokio::spawn(async move {
481 axum::serve(listener, app.into_make_service())
482 .await
483 .context("failed to serve app")
484 });
485
486 // Now that the app is live, request a crawl from upstream relays.
487 firehose::reconnect_relays(&client, &config).await;
488
489 serve
490 .await
491 .map_err(Into::into)
492 .and_then(|r| r)
493 .context("failed to serve app")
494}
495
/// Process entry point: builds a multi-threaded Tokio runtime and runs [`run`] on it,
/// propagating any startup or serving error as the process result.
#[tokio::main(flavor = "multi_thread")]
async fn main() -> anyhow::Result<()> {
    // Dispatch out to a separate function without a derive macro to help rust-analyzer along.
    run().await
}