Alternative ATProto PDS implementation

prototype sequencer use

Changed files
+74 -54
src
account_manager
-22
Cargo.lock
··· 1282 dependencies = [ 1283 "anyhow", 1284 "argon2", 1285 - "async-trait", 1286 "atrium-api 0.25.3", 1287 "atrium-crypto", 1288 "atrium-repo", 1289 - "atrium-xrpc", 1290 - "atrium-xrpc-client", 1291 "axum", 1292 "azure_core", 1293 "azure_identity", ··· 1306 "futures", 1307 "hex", 1308 "http-cache-reqwest", 1309 - "ipld-core", 1310 - "k256", 1311 - "lazy_static", 1312 "memmap2", 1313 "metrics", 1314 "metrics-exporter-prometheus", 1315 - "multihash 0.19.3", 1316 - "r2d2", 1317 "rand 0.8.5", 1318 - "regex", 1319 "reqwest 0.12.15", 1320 "reqwest-middleware", 1321 "rsky-common", ··· 1325 "rsky-syntax", 1326 "secp256k1", 1327 "serde", 1328 - "serde_bytes", 1329 "serde_ipld_dagcbor", 1330 - "serde_ipld_dagjson", 1331 "serde_json", 1332 "sha2", 1333 "thiserror 2.0.12", ··· 5839 "ipld-core", 5840 "scopeguard", 5841 "serde", 5842 - ] 5843 - 5844 - [[package]] 5845 - name = "serde_ipld_dagjson" 5846 - version = "0.2.0" 5847 - source = "registry+https://github.com/rust-lang/crates.io-index" 5848 - checksum = "3359b47ba7f4a306ef5984665e10539e212e97217afa489437d533208eecda36" 5849 - dependencies = [ 5850 - "ipld-core", 5851 - "serde", 5852 - "serde_json", 5853 ] 5854 5855 [[package]]
··· 1282 dependencies = [ 1283 "anyhow", 1284 "argon2", 1285 "atrium-api 0.25.3", 1286 "atrium-crypto", 1287 "atrium-repo", 1288 "axum", 1289 "azure_core", 1290 "azure_identity", ··· 1303 "futures", 1304 "hex", 1305 "http-cache-reqwest", 1306 "memmap2", 1307 "metrics", 1308 "metrics-exporter-prometheus", 1309 "rand 0.8.5", 1310 "reqwest 0.12.15", 1311 "reqwest-middleware", 1312 "rsky-common", ··· 1316 "rsky-syntax", 1317 "secp256k1", 1318 "serde", 1319 "serde_ipld_dagcbor", 1320 "serde_json", 1321 "sha2", 1322 "thiserror 2.0.12", ··· 5828 "ipld-core", 5829 "scopeguard", 5830 "serde", 5831 ] 5832 5833 [[package]]
+14 -11
Cargo.toml
··· 1 [package] 2 name = "bluepds" 3 version = "0.0.0" ··· 13 14 [profile.dev.package."*"] 15 opt-level = 3 16 17 [profile.dev] 18 opt-level = 1 19 20 [profile.release] 21 opt-level = "s" # Slightly slows compile times, great improvements to file size and runtime performance. ··· 131 # expect_used = "deny" 132 133 [dependencies] 134 - multihash = "0.19.3" 135 diesel = { version = "2.1.5", features = [ 136 "chrono", 137 "sqlite", ··· 139 "returning_clauses_for_sqlite_3_35", 140 ] } 141 diesel_migrations = { version = "2.1.0" } 142 - r2d2 = "0.8.10" 143 144 atrium-repo = "0.1" 145 atrium-api = "0.25" 146 # atrium-common = { version = "0.1.2", path = "atrium-common" } 147 atrium-crypto = "0.1" 148 # atrium-identity = { version = "0.1.4", path = "atrium-identity" } 149 - atrium-xrpc = "0.12" 150 - atrium-xrpc-client = "0.5" 151 # bsky-sdk = { version = "0.1.19", path = "bsky-sdk" } 152 rsky-syntax = { git = "https://github.com/blacksky-algorithms/rsky.git" } 153 rsky-repo = { git = "https://github.com/blacksky-algorithms/rsky.git" } ··· 159 # async-stream = "0.3" 160 161 # DAG-CBOR codec 162 - ipld-core = "0.4.2" 163 serde_ipld_dagcbor = { version = "0.6.2", default-features = false, features = [ 164 "std", 165 ] } 166 - serde_ipld_dagjson = "0.2.0" 167 cidv10 = { version = "0.10.1", package = "cid" } 168 169 # Parsing and validation ··· 172 hex = "0.4.3" 173 # langtag = "0.3" 174 # multibase = "0.9.1" 175 - regex = "1.11.1" 176 serde = { version = "1.0.218", features = ["derive"] } 177 - serde_bytes = "0.11.17" 178 # serde_html_form = "0.2.6" 179 serde_json = "1.0.139" 180 # unsigned-varint = "0.8" ··· 184 # elliptic-curve = "0.13.6" 185 # jose-jwa = "0.1.2" 186 # jose-jwk = { version = "0.1.2", default-features = false } 187 - k256 = "0.13.4" 188 # p256 = { version = "0.13.2", default-features = false } 189 rand = "0.8.5" 190 sha2 = "0.10.8" ··· 256 url = "2.5.4" 257 uuid = { version = "1.14.0", features = ["v4"] } 258 urlencoding = "2.1.3" 259 - async-trait = "0.1.88" 260 - lazy_static = "1.5.0" 261 secp256k1 = "0.28.2" 262 dotenvy = "0.15.7" 263 deadpool-diesel = { version = "0.6.1", features = [
··· 1 + # cargo-features = ["codegen-backend"] 2 + 3 [package] 4 name = "bluepds" 5 version = "0.0.0" ··· 15 16 [profile.dev.package."*"] 17 opt-level = 3 18 + # codegen-backend = "cranelift" 19 20 [profile.dev] 21 opt-level = 1 22 + # codegen-backend = "cranelift" 23 24 [profile.release] 25 opt-level = "s" # Slightly slows compile times, great improvements to file size and runtime performance. ··· 135 # expect_used = "deny" 136 137 [dependencies] 138 + # multihash = "0.19.3" 139 diesel = { version = "2.1.5", features = [ 140 "chrono", 141 "sqlite", ··· 143 "returning_clauses_for_sqlite_3_35", 144 ] } 145 diesel_migrations = { version = "2.1.0" } 146 + # r2d2 = "0.8.10" 147 148 atrium-repo = "0.1" 149 atrium-api = "0.25" 150 # atrium-common = { version = "0.1.2", path = "atrium-common" } 151 atrium-crypto = "0.1" 152 # atrium-identity = { version = "0.1.4", path = "atrium-identity" } 153 + # atrium-xrpc = "0.12" 154 + # atrium-xrpc-client = "0.5" 155 # bsky-sdk = { version = "0.1.19", path = "bsky-sdk" } 156 rsky-syntax = { git = "https://github.com/blacksky-algorithms/rsky.git" } 157 rsky-repo = { git = "https://github.com/blacksky-algorithms/rsky.git" } ··· 163 # async-stream = "0.3" 164 165 # DAG-CBOR codec 166 + # ipld-core = "0.4.2" 167 serde_ipld_dagcbor = { version = "0.6.2", default-features = false, features = [ 168 "std", 169 ] } 170 + # serde_ipld_dagjson = "0.2.0" 171 cidv10 = { version = "0.10.1", package = "cid" } 172 173 # Parsing and validation ··· 176 hex = "0.4.3" 177 # langtag = "0.3" 178 # multibase = "0.9.1" 179 + # regex = "1.11.1" 180 serde = { version = "1.0.218", features = ["derive"] } 181 + # serde_bytes = "0.11.17" 182 # serde_html_form = "0.2.6" 183 serde_json = "1.0.139" 184 # unsigned-varint = "0.8" ··· 188 # elliptic-curve = "0.13.6" 189 # jose-jwa = "0.1.2" 190 # jose-jwk = { version = "0.1.2", default-features = false } 191 + # k256 = "0.13.4" 192 # p256 = { version = "0.13.2", default-features = false } 193 rand = "0.8.5" 194 sha2 = "0.10.8" ··· 260 url = "2.5.4" 261 uuid = { version = "1.14.0", features = ["v4"] } 262 urlencoding = "2.1.3" 263 + # lazy_static = "1.5.0" 264 secp256k1 = "0.28.2" 265 dotenvy = "0.15.7" 266 deadpool-diesel = { version = "0.6.1", features = [
+5
src/account_manager/mod.rs
··· 31 use std::collections::BTreeMap; 32 use std::env; 33 use std::time::SystemTime; 34 35 pub(crate) mod helpers { 36 pub mod account; ··· 500 email_token::create_email_token(did, purpose, &self.db).await 501 } 502 }
··· 31 use std::collections::BTreeMap; 32 use std::env; 33 use std::time::SystemTime; 34 + use tokio::sync::RwLock; 35 36 pub(crate) mod helpers { 37 pub mod account; ··· 501 email_token::create_email_token(did, purpose, &self.db).await 502 } 503 } 504 + 505 + pub struct SharedAccountManager { 506 + pub account_manager: RwLock<AccountManagerCreator>, 507 + }
+55 -21
src/lib.rs
··· 19 #[cfg(test)] 20 mod tests; 21 22 use anyhow::{Context as _, anyhow}; 23 use atrium_api::types::string::Did; 24 use atrium_crypto::keypair::{Export as _, Secp256k1Keypair}; ··· 44 use firehose::FirehoseProducer; 45 use http_cache_reqwest::{CacheMode, HttpCacheOptions, MokaManager}; 46 use rand::Rng as _; 47 use serde::{Deserialize, Serialize}; 48 use service_proxy::service_proxy; 49 use std::{ ··· 52 str::FromStr as _, 53 sync::Arc, 54 }; 55 - use tokio::net::TcpListener; 56 use tower_http::{cors::CorsLayer, trace::TraceLayer}; 57 use tracing::{info, warn}; 58 use uuid::Uuid; ··· 67 pub type Result<T> = std::result::Result<T, Error>; 68 /// The reqwest client type with middleware. 69 pub type Client = reqwest_middleware::ClientWithMiddleware; 70 - /// The Azure credential type. 71 - pub type Cred = Arc<dyn TokenCredential>; 72 73 #[expect( 74 clippy::arbitrary_source_item_ordering, ··· 129 pub verbosity: Verbosity<InfoLevel>, 130 } 131 132 pub struct ActorPools { 133 pub repo: Pool, 134 pub blob: Pool, 135 } 136 ··· 148 pub struct AppState { 149 /// The application configuration. 150 pub config: AppConfig, 151 - /// The Azure credential. 152 - pub cred: Cred, 153 /// The main database connection pool. Used for common PDS data, like invite codes. 154 pub db: Pool, 155 /// Actor-specific database connection pools. Hashed by DID. ··· 160 /// The simple HTTP client. 161 pub simple_client: reqwest::Client, 162 /// The firehose producer. 163 - pub firehose: FirehoseProducer, 164 165 /// The signing key. 166 pub signing_key: SigningKey, ··· 341 // conn.run_pending_migrations(MIGRATIONS) 342 // .expect("should be able to run migrations"); 343 344 - let (_fh, fhp) = firehose::spawn(client.clone(), config.clone()); 345 346 let addr = config 347 .listen_address ··· 360 .layer(CorsLayer::permissive()) 361 .layer(TraceLayer::new_for_http()) 362 .with_state(AppState { 363 - cred, 364 config: config.clone(), 365 db: pool.clone(), 366 db_actors: actor_pools.clone(), 367 client: client.clone(), 368 simple_client, 369 - firehose: fhp, 370 signing_key: skey, 371 rotation_key: rkey, 372 }); ··· 386 387 let result = conn.interact(move |conn| { 388 diesel::sql_query( 389 - "SELECT (SELECT COUNT(*) FROM accounts) + (SELECT COUNT(*) FROM invites) AS total_count", 390 ) 391 .get_result::<TotalCount>(conn) 392 }) ··· 399 if c == 0 { 400 let uuid = Uuid::new_v4().to_string(); 401 402 let uuid_clone = uuid.clone(); 403 - _ = conn 404 - .interact(move |conn| { 405 - diesel::sql_query( 406 - "INSERT INTO invites (id, did, count, created_at) VALUES (?, NULL, 1, datetime('now'))", 407 - ) 408 - .bind::<diesel::sql_types::Text, _>(uuid_clone) 409 - .execute(conn) 410 - .context("failed to create new invite code") 411 - .expect("should be able to create invite code") 412 }) 413 .await 414 - .expect("should be able to create invite code"); 415 416 // N.B: This is a sensitive message, so we're bypassing `tracing` here and 417 // logging it directly to console. ··· 435 }); 436 437 // Now that the app is live, request a crawl from upstream relays. 438 - firehose::reconnect_relays(&client, &config).await; 439 440 serve 441 .await
··· 19 #[cfg(test)] 20 mod tests; 21 22 + use account_manager::{AccountManager, SharedAccountManager}; 23 use anyhow::{Context as _, anyhow}; 24 use atrium_api::types::string::Did; 25 use atrium_crypto::keypair::{Export as _, Secp256k1Keypair}; ··· 45 use firehose::FirehoseProducer; 46 use http_cache_reqwest::{CacheMode, HttpCacheOptions, MokaManager}; 47 use rand::Rng as _; 48 + use rsky_pds::{crawlers::Crawlers, sequencer::Sequencer}; 49 use serde::{Deserialize, Serialize}; 50 use service_proxy::service_proxy; 51 use std::{ ··· 54 str::FromStr as _, 55 sync::Arc, 56 }; 57 + use tokio::{net::TcpListener, sync::RwLock}; 58 use tower_http::{cors::CorsLayer, trace::TraceLayer}; 59 use tracing::{info, warn}; 60 use uuid::Uuid; ··· 69 pub type Result<T> = std::result::Result<T, Error>; 70 /// The reqwest client type with middleware. 71 pub type Client = reqwest_middleware::ClientWithMiddleware; 72 + 73 + /// The Shared Sequencer which requests crawls from upstream relays and emits events to the firehose. 74 + pub struct SharedSequencer { 75 + /// The sequencer instance. 76 + pub sequencer: RwLock<Sequencer>, 77 + } 78 79 #[expect( 80 clippy::arbitrary_source_item_ordering, ··· 135 pub verbosity: Verbosity<InfoLevel>, 136 } 137 138 + /// The actor pools for the database connections. 139 pub struct ActorPools { 140 + /// The database connection pool for the actor's repository. 141 pub repo: Pool, 142 + /// The database connection pool for the actor's blobs. 143 pub blob: Pool, 144 } 145 ··· 157 pub struct AppState { 158 /// The application configuration. 159 pub config: AppConfig, 160 /// The main database connection pool. Used for common PDS data, like invite codes. 161 pub db: Pool, 162 /// Actor-specific database connection pools. Hashed by DID. ··· 167 /// The simple HTTP client. 168 pub simple_client: reqwest::Client, 169 /// The firehose producer. 170 + pub sequencer: Arc<SharedSequencer>, 171 + /// The account manager. 172 + pub account_manager: Arc<SharedAccountManager>, 173 174 /// The signing key. 175 pub signing_key: SigningKey, ··· 350 // conn.run_pending_migrations(MIGRATIONS) 351 // .expect("should be able to run migrations"); 352 353 + let hostname = config.host_name.clone(); 354 + let crawlers: Vec<String> = config 355 + .firehose 356 + .relays 357 + .iter() 358 + .map(|s| s.to_string()) 359 + .collect(); 360 + let sequencer = Arc::new(SharedSequencer { 361 + sequencer: RwLock::new(Sequencer::new( 362 + Crawlers::new(hostname, crawlers.clone()), 363 + None, 364 + )), 365 + }); 366 + let account_manager = SharedAccountManager { 367 + account_manager: RwLock::new(AccountManager::creator()), 368 + }; 369 370 let addr = config 371 .listen_address ··· 384 .layer(CorsLayer::permissive()) 385 .layer(TraceLayer::new_for_http()) 386 .with_state(AppState { 387 config: config.clone(), 388 db: pool.clone(), 389 db_actors: actor_pools.clone(), 390 client: client.clone(), 391 simple_client, 392 + sequencer: sequencer.clone(), 393 + account_manager: Arc::new(account_manager), 394 signing_key: skey, 395 rotation_key: rkey, 396 }); ··· 410 411 let result = conn.interact(move |conn| { 412 diesel::sql_query( 413 + "SELECT (SELECT COUNT(*) FROM account) + (SELECT COUNT(*) FROM invite_code) AS total_count", 414 ) 415 .get_result::<TotalCount>(conn) 416 }) ··· 423 if c == 0 { 424 let uuid = Uuid::new_v4().to_string(); 425 426 + use crate::models::pds as models; 427 + use crate::schema::pds::invite_code::dsl as InviteCode; 428 let uuid_clone = uuid.clone(); 429 + drop( 430 + conn.interact(move |conn| { 431 + diesel::insert_into(InviteCode::invite_code) 432 + .values(models::InviteCode { 433 + code: uuid_clone, 434 + available_uses: 1, 435 + disabled: 0, 436 + for_account: "None".to_owned(), 437 + created_by: "None".to_owned(), 438 + created_at: "None".to_owned(), 439 + }) 440 + .execute(conn) 441 + .context("failed to create new invite code") 442 }) 443 .await 444 + .expect("should be able to create invite code"), 445 + ); 446 447 // N.B: This is a sensitive message, so we're bypassing `tracing` here and 448 // logging it directly to console. ··· 466 }); 467 468 // Now that the app is live, request a crawl from upstream relays. 469 + let mut background_sequencer = sequencer.sequencer.write().await.clone(); 470 + drop(tokio::spawn( 471 + async move { background_sequencer.start().await }, 472 + )); 473 474 serve 475 .await